defmodule Baton do
@moduledoc """
Build and insert DAG-based Oban job workflows.
Workflows link Oban jobs together with named dependencies, guaranteeing
execution order regardless of scheduling or retries. Jobs are inserted
atomically — all at once, in a single transaction alongside their workflow
metadata — and each job gates itself at runtime by checking whether its
declared dependencies have completed.
## Basic Usage
    alias Baton
    Baton.new()
    |> Baton.add(:a, FetchToken.new(%{api_key: "xyz"}))
    |> Baton.add(:b, ProcessA.new(%{url: "a.com"}), deps: [:a])
    |> Baton.add(:c, ProcessB.new(%{url: "b.com"}), deps: [:a])
    |> Baton.add(:d, Finalize.new(%{}), deps: [:b, :c])
    |> Baton.insert!()
This produces a diamond-shaped DAG: `B` and `C` run in parallel after `A`,
and `D` waits for both.
## Where state lives
Each step has two rows:
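* an `oban_jobs` row — owned by Oban; its `meta` carries only immutable
identifiers (`workflow_id`, `workflow_name` — the step's name,
`workflow_label` — the workflow's `:workflow_name` when set, plus
`workflow_debug` when `:debug` is enabled) written once and never mutated, so
the engine never races Oban's own `meta` writes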
* a `workflow_nodes` row — owned by this engine; holds dependencies, the
ignore-flags, and the step's result
Both are inserted in one transaction, so a workflow is all-or-nothing.
## Repo / Oban instance
The repo is resolved from Oban's configuration — you don't pass it. If you
run a non-default Oban instance, pass `oban: MyApp.Oban` to `insert/2` (or set
`config :baton, :oban_name, MyApp.Oban`). For backwards compatibility,
`insert/2` still accepts a repo module as its second argument and ignores it.
## Options for `new/1`
- `:workflow_id` — override the auto-generated UUIDv4 ID
- `:workflow_name` — human-readable label for the whole workflow
- `:ignore_cancelled` — apply globally to all steps (default: `false`)
- `:ignore_discarded` — apply globally to all steps (default: `false`)
- `:debug` — enable context-window capture for all steps (default: `false`)
## Options for `add/4`
- `:deps` — list of step names this job depends on (atoms or strings)
- `:ignore_cancelled` — override the workflow-level default for this step
- `:ignore_discarded` — override the workflow-level default for this step
"""
alias Baton.{Config, DAG}
alias Ecto.Changeset
# A step identifier as accepted from callers; `add/4` converts it to a string
# with `to_string/1` before storing it.
@type step_name :: atom() | String.t()
# One step as held in the builder's `jobs` list: the stringified name, the job
# changeset after the workflow meta has been merged in, the stringified
# dependency names, and the resolved per-step ignore flags.
@type entry :: %{
name: String.t(),
changeset: Changeset.t(),
deps: [String.t()],
ignore_cancelled: boolean(),
ignore_discarded: boolean()
}
# The workflow builder. `jobs` keeps entries in the order they were added.
# `model_map` is only read in this module (by `add/4`, to decide whether to
# inject a "workflow_model" arg); nothing here populates it.
@type t :: %__MODULE__{
workflow_id: String.t(),
workflow_name: String.t() | nil,
global_ignore_cancelled: boolean(),
global_ignore_discarded: boolean(),
debug: boolean(),
model_map: %{optional(atom() | String.t()) => String.t()},
jobs: [entry()]
}
# `workflow_id` and `workflow_name` have no struct-level default (they are nil
# unless set); `new/1` always fills in `workflow_id`. The remaining fields
# start out as "no global ignores, no debug, no models, no steps".
defstruct [
:workflow_id,
:workflow_name,
global_ignore_cancelled: false,
global_ignore_discarded: false,
debug: false,
model_map: %{},
jobs: []
]
# NimbleOptions schema for `new/1`. The ignore flags and `:debug` carry a
# `false` default, which is why `new/1` can read them with `Keyword.fetch!/2`.
@new_opts_schema [
workflow_id: [type: :string],
workflow_name: [type: :string],
ignore_cancelled: [type: :boolean, default: false],
ignore_discarded: [type: :boolean, default: false],
debug: [type: :boolean, default: false]
]
# NimbleOptions schema for `add/4`. The ignore flags have no default here, so
# when they are absent `add/4` falls back to the workflow-level values.
@add_opts_schema [
deps: [type: {:list, {:or, [:atom, :string]}}, default: []],
ignore_cancelled: [type: :boolean],
ignore_discarded: [type: :boolean]
]
# NimbleOptions schema for `insert/2`; `:oban` is the Oban instance name.
@insert_opts_schema [
oban: [type: :atom]
]
@doc """
Initialize a new, empty workflow.

Options are validated against the module's `new/1` schema with
`NimbleOptions.validate!/2`, which raises on unknown keys or wrongly typed
values.

## Options

- `:workflow_id` — ID to use instead of a freshly generated UUID
- `:workflow_name` — human-readable label for the whole workflow
- `:ignore_cancelled` — workflow-level default for every step (default: `false`)
- `:ignore_discarded` — workflow-level default for every step (default: `false`)
- `:debug` — stored on the workflow; `add/4` writes `"workflow_debug" => true`
  into each job's meta when it is set (default: `false`)
"""
@spec new(keyword()) :: t()
def new(opts \\ []) do
  opts = NimbleOptions.validate!(opts, @new_opts_schema)

  %__MODULE__{
    # Lazy default: a UUID is only generated when the caller did not supply an
    # ID, instead of generating one and throwing it away.
    workflow_id: Keyword.get_lazy(opts, :workflow_id, &generate_id/0),
    workflow_name: Keyword.get(opts, :workflow_name),
    # The schema gives these three a default, so fetch!/2 cannot fail here.
    global_ignore_cancelled: Keyword.fetch!(opts, :ignore_cancelled),
    global_ignore_discarded: Keyword.fetch!(opts, :ignore_discarded),
    debug: Keyword.fetch!(opts, :debug)
  }
end
@doc """
Append a named step to the workflow and return the updated workflow.

The step name and every entry of `:deps` are stored as strings. A per-step
`:ignore_cancelled` / `:ignore_discarded` falls back to the workflow-level
value when it is not given.

Raises `ArgumentError` immediately when the workflow already has a step with
the same name. The dependency graph as a whole is only validated at `insert/2`
time.
"""
@spec add(t(), step_name(), Changeset.t(), keyword()) :: t()
def add(%__MODULE__{} = workflow, name, %Changeset{} = job_changeset, opts \\ []) do
  validated = NimbleOptions.validate!(opts, @add_opts_schema)
  step = to_string(name)
  dep_names = for dep <- Keyword.fetch!(validated, :deps), do: to_string(dep)

  if Enum.any?(workflow.jobs, &(&1.name == step)) do
    raise ArgumentError, "Workflow already has a step named #{inspect(step)}"
  end

  # Meta holds identifiers only; it is written once here and not touched again
  # after insert. Dependencies, flags, and results live in the workflow_nodes
  # row instead.
  identifiers = %{"workflow_id" => workflow.workflow_id, "workflow_name" => step}
  debug_flag = if workflow.debug, do: true

  meta =
    identifiers
    |> maybe_put("workflow_label", workflow.workflow_name)
    |> maybe_put("workflow_debug", debug_flag)

  prepared = merge_meta(maybe_inject_model(job_changeset, workflow.model_map, name), meta)

  new_entry = %{
    name: step,
    changeset: prepared,
    deps: dep_names,
    # Per-step options win; otherwise inherit the workflow-level defaults.
    ignore_cancelled: Keyword.get(validated, :ignore_cancelled, workflow.global_ignore_cancelled),
    ignore_discarded: Keyword.get(validated, :ignore_discarded, workflow.global_ignore_discarded)
  }

  # Appending keeps `jobs` in insertion order.
  %{workflow | jobs: workflow.jobs ++ [new_entry]}
end
@doc """
Validate the workflow's DAG and, if it is valid, insert every job together
with its workflow node in a single transaction.

Returns `{:ok, jobs}` on success, or `{:error, {message, reason}}` when DAG
validation or the insert fails.

## Options

- `:oban` — Oban instance name (default: `Baton.Config.oban_name/0`)

A bare repo module as the second argument is still accepted for backwards
compatibility; it is ignored because the repo is resolved from Oban.
"""
@spec insert(t(), keyword() | module()) ::
        {:ok, [Oban.Job.t()]} | {:error, {String.t(), term()}}
def insert(workflow, opts_or_repo \\ [])

# Legacy call style: Baton.insert(wf, MyApp.Repo). The repo argument is dropped.
def insert(%__MODULE__{} = workflow, repo) when is_atom(repo) and repo != nil,
  do: insert(workflow, [])

def insert(%__MODULE__{} = workflow, opts) when is_list(opts) do
  validated = NimbleOptions.validate!(opts, @insert_opts_schema)

  # validate/1 performs the DAG check and already returns the
  # {:error, {message, reason}} shape, which falls straight through `with`.
  with {:ok, _order} <- validate(workflow) do
    do_insert(workflow, Keyword.get(validated, :oban, Config.oban_name()))
  end
end
@doc """
Like `insert/2`, but returns the inserted jobs directly.

Raises `ArgumentError` carrying the error message when `insert/2` returns
`{:error, {message, reason}}`.
"""
@spec insert!(t(), keyword() | module()) :: [Oban.Job.t()]
def insert!(workflow, opts_or_repo \\ [])

# Legacy call style with a repo module; the repo argument is dropped.
def insert!(%__MODULE__{} = workflow, repo) when is_atom(repo) and repo != nil,
  do: insert!(workflow, [])

def insert!(%__MODULE__{} = workflow, opts) when is_list(opts) do
  with {:ok, jobs} <- insert(workflow, opts) do
    jobs
  else
    {:error, {message, _reason}} -> raise ArgumentError, message
  end
end
@doc """
Check the workflow's DAG without inserting anything.

Returns `{:ok, topological_order}` when the graph is valid, otherwise
`{:error, {message, reason}}` where `message` is the formatted form of
`reason`.
"""
@spec validate(t()) :: {:ok, [String.t()]} | {:error, {String.t(), term()}}
def validate(%__MODULE__{} = workflow) do
  workflow
  |> build_step_list()
  |> DAG.validate()
  |> case do
    {:ok, _order} = ok -> ok
    {:error, reason} -> {:error, {DAG.format_error(reason), reason}}
  end
end
@doc """
Return the workflow's step names, in the order the steps were added.
"""
@spec step_names(t()) :: [String.t()]
def step_names(%__MODULE__{jobs: jobs}) do
  for entry <- jobs, do: entry.name
end
# ── Private ───────────────────────────────────────────────────────────────
# Inserts all jobs and their workflow nodes inside one transaction on the repo
# resolved from the given Oban instance. Returns `{:ok, jobs}` or
# `{:error, {"workflow insert failed", reason}}`.
defp do_insert(%__MODULE__{jobs: entries} = workflow, oban_name) do
  repo = Config.repo(oban_name)
  job_changesets = for entry <- entries, do: entry.changeset

  outcome =
    repo.transaction(fn ->
      # Jobs go first because the node rows need the ids Oban assigns. Both
      # inserts share this transaction, so a failure while inserting nodes
      # rolls the jobs back too: a workflow is all-or-nothing.
      inserted = Oban.insert_all(oban_name, job_changesets)
      node_rows = build_node_rows(workflow, inserted)
      {_count, _} = Baton.Nodes.insert_all(repo, node_rows)
      inserted
    end)

  case outcome do
    {:ok, _jobs} = ok -> ok
    {:error, reason} -> {:error, {"workflow insert failed", reason}}
  end
end
# Builds one workflow-node row per builder entry. Inserted jobs are matched to
# entries through the "workflow_name" meta key (the step name, unique within a
# workflow) rather than by their position in the returned list.
defp build_node_rows(%__MODULE__{jobs: entries} = workflow, jobs) do
  timestamp = DateTime.utc_now()
  jobs_by_step = Map.new(jobs, &{&1.meta["workflow_name"], &1})

  for entry <- entries do
    # fetch!/2 raises if no inserted job carries this step's name.
    inserted = Map.fetch!(jobs_by_step, entry.name)

    %{
      workflow_id: workflow.workflow_id,
      workflow_label: workflow.workflow_name,
      step_name: entry.name,
      oban_job_id: inserted.id,
      deps: entry.deps,
      ignore_cancelled: entry.ignore_cancelled,
      ignore_discarded: entry.ignore_discarded,
      result: nil,
      inserted_at: timestamp,
      updated_at: timestamp
    }
  end
end
# Reduces each builder entry to the `%{name, deps}` pair handed to
# `DAG.validate/1`.
defp build_step_list(%__MODULE__{jobs: jobs}) do
  for entry <- jobs, do: %{name: entry.name, deps: entry.deps}
end
# Merges `new_meta` into the changeset's current meta (pending change first,
# then the field value, then an empty map). Keys in `new_meta` win on conflict.
defp merge_meta(%Changeset{} = changeset, new_meta) do
  current_meta =
    cond do
      changed = Changeset.get_change(changeset, :meta) -> changed
      stored = Changeset.get_field(changeset, :meta) -> stored
      true -> %{}
    end

  merged = Map.merge(current_meta, new_meta)
  Changeset.put_change(changeset, :meta, merged)
end
# If a model map was set via `Baton.MultiModel.configure/2` and it names
# this step, inject the model into the job's args as "workflow_model" so the
# worker can read it with `MultiModel.model_for/2`. No-op when the map is empty
# or has no entry for the step.
#
# Step names and model-map keys may each be an atom or a string, so the lookup
# matches across both forms (see `lookup_model/2`).
defp maybe_inject_model(%Changeset{} = cs, model_map, _name) when map_size(model_map) == 0,
  do: cs

defp maybe_inject_model(%Changeset{} = cs, model_map, name) do
  case lookup_model(model_map, name) do
    nil ->
      cs

    model ->
      args = Changeset.get_change(cs, :args) || Changeset.get_field(cs, :args) || %{}
      Changeset.put_change(cs, :args, Map.put(args, "workflow_model", model))
  end
end

# Resolves the model configured for a step, or nil. The exact key and the
# stringified name are tried first (the original precedence). The final scan
# covers the case those two miss: a string step name whose model is stored
# under the equivalent atom key. Comparing on the atom's string form avoids
# creating atoms from step names.
defp lookup_model(model_map, name) do
  name_str = to_string(name)

  Map.get(model_map, name) ||
    Map.get(model_map, name_str) ||
    Enum.find_value(model_map, fn {key, model} ->
      if is_atom(key) and Atom.to_string(key) == name_str, do: model
    end)
end
# Puts `key => value` into `map` unless `value` is nil, in which case the map
# is returned untouched. `false` counts as a real value and is stored.
defp maybe_put(map, key, value) do
  if is_nil(value), do: map, else: Map.put(map, key, value)
end
# Produces a fresh workflow ID via `Ecto.UUID.generate/0`.
defp generate_id do
  Ecto.UUID.generate()
end
end