defmodule Kathikon.Storage do
@moduledoc """
Storage behaviour and facade for job persistence.
Application code should prefer `Kathikon.insert/3` and management APIs.
Configure the backend via `config :kathikon, storage_backend: module`.
Default: `Kathikon.Storage.Mnesia`.
## Examples
:ok = Kathikon.Storage.setup()
# Tests and Livebook embedding
Kathikon.Storage.reset!()
See `docs/storage.md`.
"""
alias Kathikon.Job
@backend_key :storage_backend
@storage_override {:kathikon, :storage_override}
@type job :: Job.t() | map()
@callback setup() :: :ok
@callback clear_jobs!() :: :ok
@callback reset!() :: :ok
@callback insert_job(job()) :: {:ok, String.t()} | {:ok, Job.t()} | {:error, term()}
@callback get_job(String.t()) :: {:ok, map()} | {:ok, Job.t()} | {:error, :not_found | term()}
@callback update_job(String.t(), map()) ::
{:ok, map()} | {:ok, Job.t()} | {:error, :not_found | term()}
@callback claim_job(String.t(), map()) ::
{:ok, map()}
| {:ok, Job.t()}
| {:error, :not_found | :already_claimed | :not_claimable | term()}
@callback claim_available_jobs(atom(), pos_integer(), map()) ::
{:ok, [map()]} | {:ok, [Job.t()]} | {:error, term()}
@callback claim_and_start_available_jobs(atom(), pos_integer(), map()) ::
{:ok, [map()]} | {:ok, [Job.t()]} | {:error, term()}
@callback defer_job(String.t(), DateTime.t(), map()) ::
{:ok, map()} | {:ok, Job.t()} | {:error, term()}
@callback complete_job(String.t(), term(), map()) ::
{:ok, map()} | {:ok, Job.t()} | {:error, term()}
@callback fail_job(String.t(), term(), map()) ::
{:ok, map()} | {:ok, Job.t()} | {:error, term()}
@callback retry_job(String.t(), keyword()) :: {:ok, map()} | {:ok, Job.t()} | {:error, term()}
@callback discard_job(String.t(), term(), map()) ::
{:ok, map()} | {:ok, Job.t()} | {:error, term()}
@callback cancel_job(String.t(), term(), map()) ::
{:ok, map()} | {:ok, Job.t()} | {:error, term()}
@callback list_jobs(keyword()) :: {:ok, [map()]} | {:ok, [Job.t()]} | {:error, term()}
@callback list_jobs_page(keyword()) ::
{:ok, %{jobs: [map()] | [Job.t()], total: non_neg_integer()}} | {:error, term()}
@callback insert_history_event(String.t(), map()) :: :ok | {:error, term()}
@callback list_history(String.t()) :: {:ok, [map()]} | {:error, term()}
@callback move_to_dead_letter(String.t(), term(), map()) ::
{:ok, map()} | {:ok, Job.t()} | {:error, term()}
@callback list_dead_jobs(keyword()) :: {:ok, [map()]} | {:ok, [Job.t()]} | {:error, term()}
@callback insert(Job.t()) :: {:ok, Job.t()} | {:error, term()}
@callback update(Job.t()) :: {:ok, Job.t()} | {:error, term()}
@callback fetch(String.t()) :: {:ok, Job.t()} | {:error, term()}
@callback claim(atom(), DateTime.t()) :: {:ok, Job.t()} | :not_found | {:error, term()}
@callback promote_scheduled(DateTime.t()) :: non_neg_integer()
@callback prunable_jobs(DateTime.t()) :: [Job.t()]
@callback delete(String.t()) :: :ok
@callback all() :: [Job.t()]
@callback start_job(Job.t(), map(), DateTime.t()) :: {:ok, Job.t()} | {:error, term()}
@optional_callbacks [
start_job: 3,
claim_and_start_available_jobs: 3,
defer_job: 3,
insert: 1,
update: 1,
fetch: 1,
claim: 2,
promote_scheduled: 1,
prunable_jobs: 1,
delete: 1,
all: 0,
list_jobs_page: 1
]
@doc """
Ensures the storage backend schema and tables exist.
Called automatically on application start. Call explicitly when embedding
Kathikon in scripts, Livebook, or tests without the full application.
## Examples
:ok = Kathikon.Storage.setup()
"""
@spec setup() :: :ok
def setup, do: backend_module().setup()
@doc false
@spec clear_jobs!() :: :ok
def clear_jobs!, do: backend_module().clear_jobs!()
@doc false
@spec reset!() :: :ok
def reset!, do: backend_module().reset!()
@doc false
@spec backend() :: module()
def backend, do: backend_module()
@doc false
@spec backend(module()) :: :ok
def backend(module) when is_atom(module) do
Application.put_env(:kathikon, @backend_key, module)
end
@doc false
@spec set_test_backend!(module()) :: :ok
def set_test_backend!(module) when is_atom(module) do
Process.put(@storage_override, module)
:ok
end
@doc false
@spec clear_test_backend!() :: :ok
def clear_test_backend! do
Process.delete(@storage_override)
:ok
end
@doc false
@spec with_backend(module(), (-> term())) :: term()
def with_backend(module, fun) when is_atom(module) and is_function(fun, 0) do
previous = Process.get(@storage_override)
Process.put(@storage_override, module)
try do
fun.()
after
case previous do
nil -> Process.delete(@storage_override)
mod -> Process.put(@storage_override, mod)
end
end
end
@doc false
def start_job(job, claimant, now \\ DateTime.utc_now()),
do: backend_module().start_job(job, claimant, now)
@doc false
def insert(job), do: backend_module().insert(job)
@doc false
def update(job), do: backend_module().update(job)
@doc false
def fetch(id), do: backend_module().fetch(id)
@doc false
def claim(queue, now), do: backend_module().claim(queue, now)
@doc false
def promote_scheduled(now), do: backend_module().promote_scheduled(now)
@doc false
def prunable_jobs(cutoff), do: backend_module().prunable_jobs(cutoff)
@doc false
def delete(id), do: backend_module().delete(id)
@doc false
def all, do: backend_module().all()
@doc false
def insert_job(job), do: backend_module().insert_job(job)
@doc false
def get_job(id), do: backend_module().get_job(id)
@doc false
def update_job(id, changes), do: backend_module().update_job(id, changes)
@doc false
def claim_job(id, claimant), do: backend_module().claim_job(id, claimant)
@doc false
def claim_available_jobs(queue, limit, claimant),
do: backend_module().claim_available_jobs(queue, limit, claimant)
@doc false
def claim_and_start_available_jobs(queue, limit, claimant),
do: backend_module().claim_and_start_available_jobs(queue, limit, claimant)
@doc false
def defer_job(id, scheduled_at, metadata),
do: backend_module().defer_job(id, scheduled_at, metadata)
@doc false
def fetch_batch(batch_id), do: backend_module().fetch_batch(batch_id)
@doc false
def write_batch(batch), do: backend_module().write_batch(batch)
@doc false
def start_batch(parent_id, child_jobs, batch_attrs),
do: backend_module().start_batch(parent_id, child_jobs, batch_attrs)
@doc false
def record_batch_child_finished(child_job),
do: backend_module().record_batch_child_finished(child_job)
@doc false
def complete_job(id, result, metadata), do: backend_module().complete_job(id, result, metadata)
@doc false
def fail_job(id, error, metadata), do: backend_module().fail_job(id, error, metadata)
@doc false
def retry_job(id, opts \\ []), do: backend_module().retry_job(id, opts)
@doc false
def discard_job(id, reason, metadata), do: backend_module().discard_job(id, reason, metadata)
@doc false
def cancel_job(id, reason, metadata), do: backend_module().cancel_job(id, reason, metadata)
@doc false
def list_jobs(opts \\ []), do: backend_module().list_jobs(opts)
@doc false
def list_jobs_page(opts \\ []) do
mod = backend_module()
if function_exported?(mod, :list_jobs_page, 1) do
mod.list_jobs_page(opts)
else
list_jobs_page_fallback(opts)
end
end
@doc false
def insert_history_event(job_id, event),
do: backend_module().insert_history_event(job_id, event)
@doc false
def list_history(job_id), do: backend_module().list_history(job_id)
@doc false
def move_to_dead_letter(id, reason, metadata),
do: backend_module().move_to_dead_letter(id, reason, metadata)
@doc false
def list_dead_jobs(opts \\ []), do: backend_module().list_dead_jobs(opts)
defp list_jobs_page_fallback(opts) do
queue = Keyword.get(opts, :queue)
states = Keyword.get(opts, :states)
limit = Keyword.get(opts, :limit, 50)
offset = Keyword.get(opts, :offset, 0)
order = Keyword.get(opts, :order, :newest)
with {:ok, jobs} <- list_jobs(if queue, do: [queue: queue], else: []) do
filtered =
jobs
|> filter_jobs_by_states(states)
|> sort_jobs_for_page(order)
page = filtered |> Enum.drop(offset) |> Enum.take(limit)
{:ok, %{jobs: page, total: length(filtered)}}
end
end
defp filter_jobs_by_states(jobs, nil), do: jobs
defp filter_jobs_by_states(jobs, states), do: Enum.filter(jobs, &(&1.state in states))
defp sort_jobs_for_page(jobs, :newest) do
Enum.sort_by(jobs, &job_page_sort_time/1, {:desc, DateTime})
end
defp sort_jobs_for_page(jobs, :oldest) do
Enum.sort_by(jobs, &job_page_sort_time/1, DateTime)
end
defp job_page_sort_time(job) do
job.inserted_at || job.available_at || DateTime.utc_now()
end
defp backend_module do
Process.get(@storage_override) ||
Application.get_env(:kathikon, @backend_key, Kathikon.Storage.Mnesia)
end
end