Skip to main content

lib/baton.ex

defmodule Baton do
  @moduledoc """
  Build and insert DAG-based Oban job workflows.

  Workflows link Oban jobs together with named dependencies, guaranteeing
  execution order regardless of scheduling or retries. Jobs are inserted
  atomically — all at once, in a single transaction alongside their workflow
  metadata — and each job gates itself at runtime by checking whether its
  declared dependencies have completed.

  ## Basic Usage

      Baton.new()
      |> Baton.add(:a, FetchToken.new(%{api_key: "xyz"}))
      |> Baton.add(:b, ProcessA.new(%{url: "a.com"}), deps: [:a])
      |> Baton.add(:c, ProcessB.new(%{url: "b.com"}), deps: [:a])
      |> Baton.add(:d, Finalize.new(%{}), deps: [:b, :c])
      |> Baton.insert!()

  This produces a diamond-shaped DAG: `B` and `C` run in parallel after `A`,
  and `D` waits for both.

  Step names may be atoms or strings; they are normalized with `to_string/1`,
  so `:a` and `"a"` refer to the same step.

  ## Where state lives

  Each step has two rows:

    * an `oban_jobs` row — owned by Oban; its `meta` carries only immutable
      values written once at build time and never mutated, so the engine never
      races Oban's own `meta` writes: `workflow_id`, `workflow_name` (the
      step's name), `workflow_label` (the workflow's `:workflow_name`, when
      set) and `workflow_debug` (only when `:debug` is enabled)
    * a `workflow_nodes` row — owned by this engine; holds dependencies, the
      `ignore_cancelled` / `ignore_discarded` flags, and the step's result

  Both are inserted in one transaction, so a workflow is all-or-nothing.

  ## Repo / Oban instance

  The repo is resolved from Oban's configuration — you don't pass it. If you
  run a non-default Oban instance, pass `oban: MyApp.Oban` to `insert/2` (or set
  `config :baton, :oban_name, MyApp.Oban`). For backwards compatibility,
  `insert/2` still accepts a repo module as its second argument and ignores it.

  ## Options for `new/1`

    - `:workflow_id` — override the auto-generated UUIDv4 ID
    - `:workflow_name` — human-readable label for the whole workflow
    - `:ignore_cancelled` — apply globally to all steps (default: `false`)
    - `:ignore_discarded` — apply globally to all steps (default: `false`)
    - `:debug` — enable context-window capture for all steps (default: `false`)

  ## Options for `add/4`

    - `:deps` — list of step names this job depends on (atoms or strings;
      repeated names are collapsed into one dependency)
    - `:ignore_cancelled` — override the workflow-level default for this step
    - `:ignore_discarded` — override the workflow-level default for this step
  """

  alias Baton.{Config, DAG}
  alias Ecto.Changeset

  @type step_name :: atom() | String.t()

  @type entry :: %{
          name: String.t(),
          changeset: Changeset.t(),
          deps: [String.t()],
          ignore_cancelled: boolean(),
          ignore_discarded: boolean()
        }

  @type t :: %__MODULE__{
          workflow_id: String.t(),
          workflow_name: String.t() | nil,
          global_ignore_cancelled: boolean(),
          global_ignore_discarded: boolean(),
          debug: boolean(),
          model_map: %{optional(atom() | String.t()) => String.t()},
          jobs: [entry()]
        }

  defstruct [
    :workflow_id,
    :workflow_name,
    global_ignore_cancelled: false,
    global_ignore_discarded: false,
    debug: false,
    model_map: %{},
    jobs: []
  ]

  @new_opts_schema [
    workflow_id: [type: :string],
    workflow_name: [type: :string],
    ignore_cancelled: [type: :boolean, default: false],
    ignore_discarded: [type: :boolean, default: false],
    debug: [type: :boolean, default: false]
  ]

  @add_opts_schema [
    deps: [type: {:list, {:or, [:atom, :string]}}, default: []],
    ignore_cancelled: [type: :boolean],
    ignore_discarded: [type: :boolean]
  ]

  @insert_opts_schema [
    oban: [type: :atom]
  ]

  @doc """
  Initialize a new workflow.

  See "Options for `new/1`" in the module docs. A UUID is generated for
  `workflow_id` only when the caller does not supply one.
  """
  @spec new(keyword()) :: t()
  def new(opts \\ []) do
    opts = NimbleOptions.validate!(opts, @new_opts_schema)

    %__MODULE__{
      # Lazy default: don't generate (and discard) a UUID when one was given.
      workflow_id: Keyword.get_lazy(opts, :workflow_id, &generate_id/0),
      workflow_name: Keyword.get(opts, :workflow_name),
      global_ignore_cancelled: Keyword.fetch!(opts, :ignore_cancelled),
      global_ignore_discarded: Keyword.fetch!(opts, :ignore_discarded),
      debug: Keyword.fetch!(opts, :debug)
    }
  end

  @doc """
  Add a named step to the workflow.

  `name` and every entry in `:deps` are normalized with `to_string/1`;
  dependencies that normalize to the same name are kept once.

  Raises `ArgumentError` immediately on a duplicate step name. Full cycle
  detection runs at `validate/1` / `insert/2` time.
  """
  @spec add(t(), step_name(), Changeset.t(), keyword()) :: t()
  def add(%__MODULE__{} = workflow, name, %Changeset{} = job_changeset, opts \\ []) do
    opts = NimbleOptions.validate!(opts, @add_opts_schema)
    name_str = to_string(name)

    if Enum.any?(workflow.jobs, &(&1.name == name_str)) do
      raise ArgumentError, "Workflow already has a step named #{inspect(name_str)}"
    end

    # `deps: [:a, "a"]` names one dependency twice once normalized; keep a
    # single copy so the stored dependency list matches the DAG's edges.
    deps =
      opts
      |> Keyword.fetch!(:deps)
      |> Enum.map(&to_string/1)
      |> Enum.uniq()

    # Per-step flags fall back to the workflow-wide defaults set in `new/1`.
    ignore_cancelled = Keyword.get(opts, :ignore_cancelled, workflow.global_ignore_cancelled)
    ignore_discarded = Keyword.get(opts, :ignore_discarded, workflow.global_ignore_discarded)

    # meta carries ONLY immutable values. It is written once here and never
    # mutated after insert, so it can never race Oban's own writes to meta.
    # Dependencies, flags, and results live in the workflow_nodes row instead.
    # Note: the "workflow_name" meta key holds the STEP name; the workflow's
    # human-readable name goes under "workflow_label".
    meta =
      %{"workflow_id" => workflow.workflow_id, "workflow_name" => name_str}
      |> maybe_put("workflow_label", workflow.workflow_name)
      |> maybe_put("workflow_debug", if(workflow.debug, do: true))

    changeset =
      job_changeset
      |> maybe_inject_model(workflow.model_map, name)
      |> merge_meta(meta)

    entry = %{
      name: name_str,
      changeset: changeset,
      deps: deps,
      ignore_cancelled: ignore_cancelled,
      ignore_discarded: ignore_discarded
    }

    # Appended (not prepended) so `jobs` stays in insertion order.
    %{workflow | jobs: workflow.jobs ++ [entry]}
  end

  @doc """
  Validate the DAG, then insert all jobs and their workflow nodes in one
  transaction.

  Returns `{:ok, jobs}` or `{:error, {message, reason}}`. An empty workflow
  returns `{:ok, []}` without touching the database. Exceptions raised while
  inserting roll the transaction back and propagate to the caller.

  ## Options

    - `:oban` — Oban instance name (default: `Baton.Config.oban_name/0`;
      an explicit `nil` also selects the default)

  For backwards compatibility, passing a repo module as the second argument is
  accepted and ignored — the repo now comes from Oban.
  """
  @spec insert(t(), keyword() | module()) ::
          {:ok, [Oban.Job.t()]} | {:error, {String.t(), term()}}
  def insert(workflow, opts_or_repo \\ [])

  def insert(%__MODULE__{} = workflow, repo) when is_atom(repo) and not is_nil(repo) do
    # Legacy call style: Baton.insert(wf, MyApp.Repo). Repo now comes from Oban.
    insert(workflow, [])
  end

  def insert(%__MODULE__{} = workflow, opts) when is_list(opts) do
    opts = NimbleOptions.validate!(opts, @insert_opts_schema)

    # NimbleOptions' :atom type accepts nil, so `oban: nil` must fall back to
    # the configured instance rather than being used as an instance name.
    with {:ok, _order} <- validate(workflow) do
      do_insert(workflow, opts[:oban] || Config.oban_name())
    end
  end

  @doc "Like `insert/2` but returns the jobs directly or raises `ArgumentError` on failure."
  @spec insert!(t(), keyword() | module()) :: [Oban.Job.t()]
  def insert!(workflow, opts_or_repo \\ [])

  def insert!(%__MODULE__{} = workflow, repo) when is_atom(repo) and not is_nil(repo) do
    # Legacy call style; the repo argument is ignored (see `insert/2`).
    insert!(workflow, [])
  end

  def insert!(%__MODULE__{} = workflow, opts) when is_list(opts) do
    case insert(workflow, opts) do
      {:ok, jobs} -> jobs
      {:error, {message, _reason}} -> raise ArgumentError, message
    end
  end

  @doc """
  Validate the workflow DAG without inserting.

  Returns `{:ok, topological_order}` or `{:error, {message, reason}}`.
  """
  @spec validate(t()) :: {:ok, [String.t()]} | {:error, {String.t(), term()}}
  def validate(%__MODULE__{} = workflow) do
    case DAG.validate(build_step_list(workflow)) do
      {:ok, _order} = ok -> ok
      {:error, reason} -> {:error, {DAG.format_error(reason), reason}}
    end
  end

  @doc "Step names in the order they were added."
  @spec step_names(t()) :: [String.t()]
  def step_names(%__MODULE__{jobs: jobs}), do: Enum.map(jobs, & &1.name)

  # ── Private ───────────────────────────────────────────────────────────────

  # Nothing to insert: skip the repo lookup and the transaction entirely.
  defp do_insert(%__MODULE__{jobs: []}, _oban_name), do: {:ok, []}

  defp do_insert(%__MODULE__{} = workflow, oban_name) do
    repo = Config.repo(oban_name)
    changesets = Enum.map(workflow.jobs, & &1.changeset)

    result =
      repo.transaction(fn ->
        # Jobs first — Oban assigns ids. Inside this transaction, Oban's insert
        # runs on the same connection, so a failure inserting nodes rolls the
        # jobs back too: a workflow is all-or-nothing.
        jobs = Oban.insert_all(oban_name, changesets)
        {_count, _} = Baton.Nodes.insert_all(repo, build_node_rows(workflow, jobs))
        jobs
      end)

    case result do
      {:ok, jobs} -> {:ok, jobs}
      {:error, reason} -> {:error, {"workflow insert failed", reason}}
    end
  end

  # Correlate returned jobs back to builder entries by step name (unique within
  # a workflow, stored under meta "workflow_name"), rather than relying on
  # insert ordering. A missing job raises KeyError, which aborts the enclosing
  # transaction.
  defp build_node_rows(%__MODULE__{} = workflow, jobs) do
    now = DateTime.utc_now()
    job_by_name = Map.new(jobs, fn job -> {job.meta["workflow_name"], job} end)

    Enum.map(workflow.jobs, fn entry ->
      job = Map.fetch!(job_by_name, entry.name)

      %{
        workflow_id: workflow.workflow_id,
        workflow_label: workflow.workflow_name,
        step_name: entry.name,
        oban_job_id: job.id,
        deps: entry.deps,
        ignore_cancelled: entry.ignore_cancelled,
        ignore_discarded: entry.ignore_discarded,
        result: nil,
        inserted_at: now,
        updated_at: now
      }
    end)
  end

  # Minimal shape consumed by DAG.validate/1.
  defp build_step_list(%__MODULE__{jobs: jobs}) do
    Enum.map(jobs, fn entry -> %{name: entry.name, deps: entry.deps} end)
  end

  # Merge our keys over any meta the caller already set on the job changeset.
  # `get_field/2` already prefers a pending change over the struct's value.
  defp merge_meta(%Changeset{} = cs, new_meta) do
    existing = Changeset.get_field(cs, :meta) || %{}
    Changeset.put_change(cs, :meta, Map.merge(existing, new_meta))
  end

  # If a model map was set via `Baton.MultiModel.configure/2` and it names
  # this step, inject the model into the job's args as "workflow_model" so the
  # worker can read it with `MultiModel.model_for/2`. No-op for the common case
  # of an empty map.
  defp maybe_inject_model(%Changeset{} = cs, model_map, _name) when map_size(model_map) == 0,
    do: cs

  defp maybe_inject_model(%Changeset{} = cs, model_map, name) do
    case lookup_model(model_map, name) do
      nil ->
        cs

      model ->
        args = Changeset.get_field(cs, :args) || %{}
        Changeset.put_change(cs, :args, Map.put(args, "workflow_model", model))
    end
  end

  # Exact key first, then its string form (the original precedence), then any
  # key whose string form matches — so a map keyed by `:a` also applies to a
  # step added as `"a"`, mirroring how step names are normalized in `add/4`.
  defp lookup_model(model_map, name) do
    name_str = to_string(name)

    Map.get(model_map, name) ||
      Map.get(model_map, name_str) ||
      Enum.find_value(model_map, fn {key, model} -> to_string(key) == name_str && model end)
  end

  defp maybe_put(map, _key, nil), do: map
  defp maybe_put(map, key, value), do: Map.put(map, key, value)

  defp generate_id, do: Ecto.UUID.generate()
end