defmodule Kathikon do
  @moduledoc """
  BEAM-native durable job queue and task execution platform.

  ## Examples

      # Enqueue work
      {:ok, job} = Kathikon.insert(MyApp.EmailWorker, %{"to" => "user@example.com"})

      # Schedule for later
      {:ok, job_id} = Kathikon.schedule(MyApp.DigestWorker, %{}, in: 3600)

      # Inspect
      {:ok, %{state: :completed}} = Kathikon.status(job.id)

  See the [README](readme.html) and `docs/` guides for v0.2.0+ features:
  atomic claiming, job history, dead-letter queue, scheduling, batches,
  management APIs, reporting, and v0.2.1 operations tooling (`Kathikon.Dashboard`,
  `mix kathikon.ops`).
  """

  alias Kathikon.{Job, Queue, QueueControl, Storage, Telemetry}

  # Option keys that select a scheduling mode in `schedule/3`. Every mode is
  # delegated to the same `Kathikon.Scheduler.schedule/3` call; this list only
  # decides whether the call is made at all.
  @schedule_keys [:at, :in, :cron]

  @doc """
  Inserts a job into the durable queue.

  Starts the target queue dispatcher if needed. Emits `[:kathikon, :job, :inserted]`.

  ## Options

    * `:queue` — target queue (default `:default`)
    * `:priority` — higher values claim first (default `0`)
    * `:max_attempts` — retry limit (default from config)
    * `:schedule_in` — seconds until the job becomes available
    * `:schedule_at` — `DateTime` or `NaiveDateTime` when the job becomes available

  ## Examples

      {:ok, job} = Kathikon.insert(MyApp.EmailWorker, %{"to" => "user@example.com"})

      {:ok, job} =
        Kathikon.insert(MyApp.EmailWorker, %{"to" => "user@example.com"},
          queue: :emails,
          priority: 5,
          max_attempts: 10
        )

      {:ok, job} = Kathikon.insert(MyApp.DigestWorker, %{}, schedule_in: 3600)
  """
  @spec insert(module(), map(), keyword()) :: {:ok, Job.t()} | {:error, term()}
  def insert(worker, args, opts \\ []) when is_atom(worker) and is_map(args) do
    # Options are time-zone normalized before the job is built, so
    # `:schedule_at` naive datetimes are resolved consistently.
    with {:ok, opts} <- Kathikon.Timezone.normalize_opts(opts),
         job = Job.build(worker, args, opts),
         :ok <- Queue.ensure_started(job.queue),
         {:ok, job} <- Storage.insert(job) do
      Telemetry.event([:job, :inserted], %{}, metadata(job, worker: worker))
      {:ok, job}
    end
  end

  @doc """
  Schedules a job at a datetime, after a duration, or with a cron expression.

  Returns `{:ok, job_id}` for one-time schedules or `{:ok, schedule_id}` for cron.
  Returns `{:error, :missing_schedule_option}` when none of `:at`, `:in`, or
  `:cron` is given.

  ## Options

    * `:at` — `DateTime` or `NaiveDateTime` for a one-time run
    * `:in` — seconds from now for a one-time run
    * `:cron` — 5-field cron expression for recurring schedules
    * `:queue`, `:priority`, `:max_attempts` — same as `insert/3`

  Requires `config :elixir, :time_zone_database, Tzdata.TimeZoneDatabase` for
  naive datetimes and cron.

  ## Examples

      {:ok, job_id} = Kathikon.schedule(MyApp.DigestWorker, %{}, in: 3600)

      at = DateTime.add(DateTime.utc_now(), 5, :second)
      {:ok, job_id} = Kathikon.schedule(MyApp.ReportWorker, %{}, at: at)

      {:ok, schedule_id} =
        Kathikon.schedule(MyApp.DigestWorker, %{"list" => "all"}, cron: "0 9 * * *")

  See `Kathikon.Scheduler` and `docs/scheduling.md`.
  """
  @spec schedule(module(), map(), keyword()) :: {:ok, term()} | {:error, term()}
  def schedule(worker, args, opts \\ []) do
    # One entry point handles all three modes; the Scheduler interprets which
    # option is present.
    if Enum.any?(@schedule_keys, &Keyword.has_key?(opts, &1)) do
      Kathikon.Scheduler.schedule(worker, args, opts)
    else
      {:error, :missing_schedule_option}
    end
  end

  @doc """
  Atomically claims a specific job by id and transitions it to `:running`.

  Intended for external runners and management tools. Normal application code
  should let dispatchers claim jobs automatically.

  ## Options

    * `:node` — claimant node (default `node()`)
    * `:pid` — claimant pid (default `self()`)
    * `:dispatcher_id` — optional dispatcher reference

  ## Examples

      {:ok, job} = Kathikon.claim(job_id)
      # perform work, then complete via storage callbacks or worker return

      {:error, :not_claimable} = Kathikon.claim(already_running_id)
  """
  @spec claim(String.t(), keyword()) :: {:ok, Job.t()} | {:error, term()}
  def claim(job_id, opts \\ []) do
    claimant = build_claimant(opts)

    # Claim and start are two storage steps; telemetry fires only if both succeed.
    with {:ok, claimed} <- Storage.claim_job(job_id, claimant),
         {:ok, running} <- Storage.start_job(claimed, claimant) do
      emit_claimed_and_started(running)
      {:ok, running}
    end
  end

  @doc """
  Atomically claims up to `:limit` available jobs from a queue (default 1).

  Each returned job is already in `:running` state.

  ## Examples

      {:ok, [job]} = Kathikon.claim_available(:imports)
      {:ok, jobs} = Kathikon.claim_available(:imports, limit: 5)
  """
  @spec claim_available(atom(), keyword()) :: {:ok, [Job.t()]} | {:error, term()}
  def claim_available(queue, opts \\ []) do
    limit = Keyword.get(opts, :limit, 1)
    claimant = build_claimant(opts)

    with {:ok, running} <- Storage.claim_and_start_available_jobs(queue, limit, claimant) do
      Enum.each(running, &emit_claimed_and_started/1)
      {:ok, running}
    end
  end

  @doc """
  Returns durable history events for a job.

  ## Examples

      {:ok, events} = Kathikon.history(job_id)
      Enum.map(events, & &1.event)
      #=> [:inserted, :claimed, :started, :completed]
  """
  @spec history(String.t()) :: {:ok, [map()]} | {:error, term()}
  def history(job_id), do: Storage.list_history(job_id)

  @doc """
  Returns job status suitable for dashboards.

  The result map contains the job (as `Job.to_map/1`), its current `:state`,
  and `:history_count`, the number of recorded history events.

  ## Examples

      {:ok, status} = Kathikon.status(job_id)
      status.state
      #=> :completed
      status.history_count
      #=> 4
  """
  @spec status(String.t()) :: {:ok, map()} | {:error, term()}
  def status(job_id) do
    with {:ok, job} <- Storage.fetch(job_id),
         {:ok, events} <- Storage.list_history(job_id) do
      {:ok, %{job: Job.to_map(job), history_count: length(events), state: job.state}}
    end
  end

  @doc """
  Cancels a pending job.

  Only jobs in cancellable states (`:scheduled`, `:available`, `:retryable`,
  `:claimed`) succeed. Running jobs return `{:error, :executing}`.

  ## Examples

      {:ok, job} = Kathikon.cancel(job_id)
      job.state
      #=> :cancelled

      {:ok, job} = Kathikon.cancel(job_id, :user_requested)

      {:error, :executing} = Kathikon.cancel(running_job_id)
  """
  @spec cancel(String.t(), term()) :: {:ok, Job.t()} | {:error, term()}
  def cancel(job_id, reason \\ nil) do
    case Storage.cancel_job(job_id, reason, %{}) do
      {:ok, job} = cancelled ->
        # NOTE: this event is `:cancel`, not past tense like `:inserted` /
        # `:retried`. It is kept as-is because existing telemetry handlers
        # may be attached to this exact name.
        Telemetry.event([:job, :cancel], %{}, metadata(job))
        cancelled

      # Storage reports `:running`; the public API exposes it as `:executing`.
      {:error, :running} ->
        {:error, :executing}

      other ->
        other
    end
  end

  @doc """
  Retries a retryable, failed, or dead job in place.

  The same job id is retried: availability is reset on the original record,
  nothing on it is incremented, and no new job is created. Use `rerun/2` to
  create a separate, linked job instead. `opts` are passed through to the
  storage layer.

  ## Examples

      {:ok, job} = Kathikon.retry(retryable_job_id)
      job.state
      #=> :available
  """
  @spec retry(String.t(), keyword()) :: {:ok, Job.t()} | {:error, term()}
  def retry(job_id, opts \\ []) do
    Storage.retry_job(job_id, opts)
    |> tap(fn
      {:ok, job} -> Telemetry.event([:job, :retried], %{}, metadata(job))
      _ -> :ok
    end)
  end

  @doc """
  Creates a linked rerun of a job (original is unchanged).

  The new job is inserted via `insert/3` with the original's worker and args.
  It references the original via `rerun_of` and `original_job_id`; the latter
  always points at the first job in a rerun chain. The rerun goes to the
  original's queue unless `:queue` is given in `opts`. On success, a
  `:rerun_requested` history event is recorded on the original job.

  ## Examples

      {:ok, original} = Kathikon.fetch(dead_job_id)
      {:ok, rerun} = Kathikon.rerun(original.id)
      rerun.rerun_of
      #=> original.id
  """
  @spec rerun(String.t(), keyword()) :: {:ok, Job.t()} | {:error, term()}
  def rerun(job_id, opts \\ []) do
    with {:ok, original} <- Storage.fetch(job_id) do
      rerun_opts =
        Keyword.merge(opts,
          queue: Keyword.get(opts, :queue, original.queue),
          rerun_of: job_id,
          # Propagate the chain root so reruns of reruns still point at the first job.
          original_job_id: original.original_job_id || job_id
        )

      insert(original.worker, original.args, rerun_opts)
      |> tap(fn
        # The history write is best-effort; its return value does not affect
        # the `{:ok, rerun_job}` returned to the caller.
        {:ok, rerun_job} -> record_rerun_requested(job_id, original, rerun_job)
        _ -> :ok
      end)
    end
  end

  @doc """
  Lists dead-letter jobs.

  ## Options

    * `:queue` — filter by queue

  ## Examples

      {:ok, dead} = Kathikon.dead_jobs()
      {:ok, dead} = Kathikon.dead_jobs(queue: :default)
  """
  @spec dead_jobs(keyword()) :: {:ok, [Job.t()]} | {:error, term()}
  def dead_jobs(opts \\ []), do: Storage.list_dead_jobs(opts)

  @doc """
  Retries a dead-letter job by creating a linked rerun.

  Alias for `rerun/2`.

  ## Examples

      {:ok, job} = Kathikon.retry_dead(dead_job_id)
  """
  @spec retry_dead(String.t(), keyword()) :: {:ok, Job.t()} | {:error, term()}
  def retry_dead(job_id, opts \\ []), do: rerun(job_id, opts)

  @doc """
  Discards a dead-letter job permanently.

  When `reason` is `nil`, `:dead_discarded` is recorded as the reason.

  ## Examples

      {:ok, job} = Kathikon.discard_dead(dead_job_id)
      job.state
      #=> :discarded

      {:ok, job} = Kathikon.discard_dead(dead_job_id, :no_longer_needed)
  """
  @spec discard_dead(String.t(), term()) :: {:ok, Job.t()} | {:error, term()}
  def discard_dead(job_id, reason \\ nil),
    do: Storage.discard_job(job_id, reason || :dead_discarded, %{})

  @doc """
  Pauses a queue — dispatchers stop claiming new jobs.

  ## Examples

      :ok = Kathikon.pause_queue(:emails)
  """
  @spec pause_queue(atom()) :: :ok
  def pause_queue(queue), do: QueueControl.pause(queue)

  @doc """
  Resumes a paused queue.

  ## Examples

      :ok = Kathikon.resume_queue(:emails)
  """
  @spec resume_queue(atom()) :: :ok
  def resume_queue(queue), do: QueueControl.resume(queue)

  @doc """
  Returns pause status for a queue.

  ## Examples

      %{queue: :default, paused: false} = Kathikon.queue_status(:default)
  """
  @spec queue_status(atom()) :: map()
  def queue_status(queue), do: QueueControl.status(queue)

  @doc """
  Returns child jobs for a parent or batch id.

  A job is a child when its `parent_job_id` or its `batch_id` equals `job_id`.
  This loads every stored job and filters in memory, so avoid it on hot paths
  with large job tables.

  ## Examples

      {:ok, children} = Kathikon.children(parent_job_id)
      Enum.map(children, & &1.state)
  """
  @spec children(String.t()) :: {:ok, [Job.t()]} | {:error, term()}
  def children(job_id) do
    # NOTE(review): passing `nil` would match every job that has no parent
    # and no batch; callers should pass a real id.
    with {:ok, jobs} <- Storage.list_jobs([]) do
      {:ok,
       Enum.filter(jobs, fn job -> job.parent_job_id == job_id or job.batch_id == job_id end)}
    end
  end

  @doc """
  Returns the persisted result for a completed job.

  Requires `config :kathikon, result: :store` (the default). Jobs in any other
  state return `{:error, {:invalid_state, state}}`.

  ## Examples

      {:ok, result} = Kathikon.result(completed_job_id)
      {:error, {:invalid_state, :running}} = Kathikon.result(running_job_id)
  """
  @spec result(String.t()) :: {:ok, term()} | {:error, term()}
  def result(job_id) do
    with {:ok, job} <- Storage.fetch(job_id) do
      if job.state == :completed,
        do: {:ok, job.result},
        else: {:error, {:invalid_state, job.state}}
    end
  end

  @doc """
  Returns errors recorded for a job.

  ## Examples

      {:ok, errors} = Kathikon.errors(job_id)
      List.last(errors)
      #=> %{"at" => ~U[...], "error" => "...", "attempt" => 1}
  """
  @spec errors(String.t()) :: {:ok, [map()]} | {:error, term()}
  def errors(job_id) do
    with {:ok, job} <- Storage.fetch(job_id) do
      {:ok, job.errors}
    end
  end

  @doc """
  Fetches a job by id.

  ## Examples

      {:ok, job} = Kathikon.fetch(job_id)
      job.state

      {:error, :not_found} = Kathikon.fetch("missing")
  """
  @spec fetch(String.t()) :: {:ok, Job.t()} | {:error, term()}
  def fetch(job_id), do: Storage.fetch(job_id)

  @doc """
  Lists all jobs currently in storage.

  Intended for tests, debugging, and reporting — not for hot paths in production.

  ## Examples

      Kathikon.all()
      |> Enum.count(&(&1.state == :completed))
  """
  @spec all() :: [Job.t()]
  def all, do: Storage.all()

  @doc """
  Ensures a dispatcher is running for the given queue.

  Called automatically by `insert/3` and `schedule/3`. Use when starting
  queues dynamically.

  ## Examples

      :ok = Kathikon.start_queue(:imports)
  """
  @spec start_queue(atom()) :: :ok
  def start_queue(queue) when is_atom(queue), do: Queue.ensure_started(queue)

  # Builds the claimant descriptor handed to the storage claim functions.
  # `:pid` is stored as its `inspect/1` string, while `:dispatcher_id`
  # defaults to the raw caller pid. `claimed_at` is taken when the descriptor
  # is built, before the storage call runs.
  defp build_claimant(opts) do
    %{
      node: Keyword.get(opts, :node, node()),
      pid: inspect(Keyword.get(opts, :pid, self())),
      claimed_at: DateTime.utc_now(),
      dispatcher_id: Keyword.get(opts, :dispatcher_id, self())
    }
  end

  # Emits the `:claimed` and `:started` events, in that order, for a job
  # returned by the claim-and-start storage calls.
  defp emit_claimed_and_started(%Job{} = job) do
    meta = metadata(job)
    Telemetry.event([:job, :claimed], %{}, meta)
    Telemetry.event([:job, :started], %{}, meta)
  end

  # Records a `:rerun_requested` history event on the ORIGINAL job (`job_id`),
  # linking it to the newly inserted rerun via `metadata.rerun_job_id`.
  defp record_rerun_requested(job_id, original, rerun_job) do
    Storage.insert_history_event(job_id, %{
      id: random_id(),
      job_id: job_id,
      event: :rerun_requested,
      from_state: original.state,
      to_state: rerun_job.state,
      metadata: %{rerun_job_id: rerun_job.id},
      inserted_at: DateTime.utc_now()
    })
  end

  # Telemetry metadata for a job. Entries in `extra` are inserted into the base
  # map, so they override base keys of the same name. Falls back to the local
  # node when the job has no recorded node.
  defp metadata(%Job{} = job, extra \\ []) do
    Enum.into(extra, %{
      queue: job.queue,
      job_id: job.id,
      worker: job.worker,
      state: job.state,
      attempt: job.attempts,
      node: job.node || node()
    })
  end

  # 16 lowercase hex characters built from 8 cryptographically strong random bytes.
  defp random_id do
    Base.encode16(:crypto.strong_rand_bytes(8), case: :lower)
  end
end