<p align="center">
  <img src="guides/assets/baton-logo.svg" alt="Baton" width="320">
</p>

<p align="center">
  <a href="https://hex.pm/packages/baton"><img src="https://img.shields.io/hexpm/v/baton.svg" alt="Hex.pm"></a>
  <a href="https://hexdocs.pm/baton"><img src="https://img.shields.io/badge/hex-docs-lightgreen.svg" alt="Documentation"></a>
  <a href="https://github.com/RudeWalrus/baton/actions/workflows/ci.yml"><img src="https://github.com/RudeWalrus/baton/actions/workflows/ci.yml/badge.svg" alt="CI"></a>
  <a href="https://github.com/RudeWalrus/baton/blob/main/LICENSE"><img src="https://img.shields.io/badge/license-MIT-blue.svg" alt="License"></a>
</p>

DAG-based job workflows for [Oban](https://github.com/oban-bg/oban): dependency
ordering, fan-out/synthesis, result passing, retry idempotency, and per-step LLM
cost tracking — built entirely on Oban OSS, no Oban Pro required.

## Features

- **Directed acyclic graphs** of Oban jobs with named dependencies, validated
  for cycles before insertion (Kahn's algorithm).
- **Self-gating execution** — each job checks its dependencies at runtime and
  snoozes, proceeds, or cancels accordingly. No external scheduler.
- **Completion-triggered rescheduling** so downstream steps start promptly
  instead of waiting out a snooze timer.
- **Result passing** between steps, stored in the engine's own table (never in
  `oban_jobs.meta`).
- **Retry idempotency** — a retried step that already produced a result returns
  it without re-running side effects (important for paid LLM calls).
- **Multi-model fan-out** — run the same step across several models and
  synthesize the results.
- **Observability** — telemetry for every transition, optional per-step token/
  cost stats, optional full context-window capture, and live step events over
  `Phoenix.PubSub` for building a LiveView dashboard.
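
The cycle check named in the first bullet can be pictured with a small, standalone sketch of Kahn's algorithm. This is not Baton's code: the `DagSketch` module and the `%{step => [deps]}` input shape are assumptions made for illustration, and the sketch assumes every dependency also appears as a key in the map.

```elixir
defmodule DagSketch do
  @moduledoc "Illustrative only: Kahn's algorithm over a %{step => [deps]} map."

  # Returns {:ok, ordered_steps} or {:error, {:cycle, stuck_steps}}.
  def topo_sort(graph) do
    # In-degree of a step = number of distinct dependencies it still waits on.
    in_degree = Map.new(graph, fn {step, deps} -> {step, length(Enum.uniq(deps))} end)
    ready = for {step, 0} <- in_degree, do: step
    walk(Enum.sort(ready), in_degree, graph, [])
  end

  # Queue is empty: either every step was emitted, or the remainder never
  # became ready because it sits on (or downstream of) a cycle.
  defp walk([], in_degree, _graph, acc) do
    stuck = for {step, n} <- in_degree, n > 0, do: step

    if stuck == [] do
      {:ok, Enum.reverse(acc)}
    else
      {:error, {:cycle, Enum.sort(stuck)}}
    end
  end

  defp walk([step | rest], in_degree, graph, acc) do
    # Emitting `step` satisfies one dependency of each step that lists it.
    dependents = for {s, deps} <- graph, step in deps, do: s

    {in_degree, released} =
      Enum.reduce(Enum.sort(dependents), {in_degree, []}, fn s, {deg, rel} ->
        deg = Map.update!(deg, s, &(&1 - 1))
        if deg[s] == 0, do: {deg, [s | rel]}, else: {deg, rel}
      end)

    walk(rest ++ Enum.reverse(released), in_degree, graph, [step | acc])
  end
end

# DagSketch.topo_sort(%{fetch: [], parse: [:fetch], store: [:parse]})
# #=> {:ok, [:fetch, :parse, :store]}
# DagSketch.topo_sort(%{a: [:b], b: [:a]})
# #=> {:error, {:cycle, [:a, :b]}}
```

Rejecting the graph before any job is inserted means a cyclic workflow fails fast instead of leaving steps snoozing forever.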

## Installation

```elixir
def deps do
  [
    {:baton, "~> 0.1"},
    {:oban, "~> 2.17"}
  ]
end
```

Add the schema via a migration:

```elixir
defmodule MyApp.Repo.Migrations.AddBaton do
  use Ecto.Migration
  # Omit :version to install the latest schema. The migration is idempotent
  # (create_if_not_exists), so to upgrade an existing install you can ship a new
  # migration that simply calls Baton.Migration.up/0 again.
  def up,   do: Baton.Migration.up()
  def down, do: Baton.Migration.down()
end
```

Configure (the repo is inherited from Oban automatically):

```elixir
config :baton,
  oban_name: Oban,
  pubsub: MyApp.PubSub,            # only for live events/dashboard
  pricing: MyApp.LLMPricing        # only if tracking cost

config :my_app, Oban,
  plugins: [
    {Oban.Plugins.Pruner, max_age: 60 * 60 * 24},  # seconds; must outlast your longest workflow
    {Baton.Plugin, interval: :timer.seconds(60)}
  ],
  queues: [default: 20]
```

### Data retention

Baton's tables (`workflow_nodes`, `workflow_step_stats`,
`workflow_debug_logs`, `workflow_completions`) have **no foreign key** to
`oban_jobs`, so Oban's `Pruner` does not clean them up — left alone they grow
without bound. Enable pruning on `Baton.Plugin` to delete Baton rows
once their backing Oban job has been pruned:

```elixir
{Baton.Plugin,
  interval: :timer.seconds(60),
  prune: true,                       # off by default
  debug_log_max_age: 24 * 60 * 60}   # optional: cap debug logs at 24h (seconds)
```

This piggybacks on Oban's `Pruner`, so there's a single retention policy. For
this to be safe, the `Pruner`'s `max_age` (in seconds; Oban's default is 60)
must exceed your longest workflow's runtime, which Baton already requires for
correct dependency gating. Set it generously, e.g.
`{Oban.Plugins.Pruner, max_age: 60 * 60 * 24}` for 24 hours.

## Usage

```elixir
defmodule MyApp.Steps.Fetch do
  use Baton.Worker, queue: :default

  @impl true
  def perform_workflow(%Oban.Job{args: %{"url" => url}}) do
    # fetch/1 is a placeholder for your own HTTP helper; it is not defined here.
    {:ok, %{body: fetch(url)}}
  end
end

Baton.new(workflow_name: "ingest")
|> Baton.add(:fetch, MyApp.Steps.Fetch.new(%{url: "https://example.com"}))
|> Baton.add(:parse, MyApp.Steps.Parse.new(%{}), deps: [:fetch])
|> Baton.add(:store, MyApp.Steps.Store.new(%{}), deps: [:parse])
|> Baton.insert!()
```

See the [getting started guide](guides/getting_started.md), the
[building a workflow guide](guides/building_a_workflow.md) (fan-out/fan-in,
pruning, and a live-updating LiveView), and the
[multi-model guide](guides/multi_model.md).

## Integrating with Phoenix LiveView

Baton ships **no LiveView of its own**. Instead, every step transition is
broadcast over `Phoenix.PubSub`, so you render progress however you like. (The
same transitions are also emitted as telemetry — see `Baton.Telemetry` — if
you'd rather not use Phoenix at all.)

### 1. Point Baton at your PubSub

A Phoenix app already starts one in its supervision tree (`{Phoenix.PubSub,
name: MyApp.PubSub}`). Tell Baton to use it:

```elixir
config :baton, pubsub: MyApp.PubSub
```

If `:pubsub` is left unset, broadcasting is a no-op and the engine runs fine
without Phoenix — only telemetry is emitted.

### 2. Topics and message shape

Each transition is published on **two topics** so views can subscribe at the
granularity they need:

- `"workflow:all"` — every event from every workflow (index views)
- `"workflow:<workflow_id>"` — one workflow's events (detail views)

Don't build these strings by hand; use the helpers in `Baton.Events`. Every
step transition is delivered as:

```elixir
{:workflow_step_updated, %{
  workflow_id:    "uuid",
  workflow_label: "patent:US11234567B2",  # the :workflow_name you passed to new/1
  step_name:      "assess_quality",
  worker:         "MyApp.Patent.AssessQuality",
  state:          "completed",  # see below
  job_id:         123,
  attempt:        1,
  has_result:     true,
  error:          nil,          # an error string on failure, else nil
  timestamp:      ~U[2026-06-14 18:00:00Z]
}}
```

`state` is one of `"executing"`, `"snoozed"`, `"completed"`, `"retryable"`,
`"discarded"`, or `"cancelled"`.

When the **last** step in a workflow settles, a single terminal event is
published on the same two topics:

```elixir
{:workflow_finished, %{
  workflow_id:    "uuid",
  workflow_label: "patent:US11234567B2",
  outcome:        :completed,    # or :failed
  failed_steps:   [],            # step names that were cancelled/discarded
  timestamp:      ~U[2026-06-14 18:00:24Z]
}}
```

Use it to flip the page to a done state, redirect, or fire a notification
without polling. (The same signal is available as
`[:baton, :workflow, :finished]` telemetry if you're not using PubSub.)
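
For apps that skip PubSub, a telemetry handler for that event might look like the sketch below. `:telemetry.attach/4` and the four-argument handler are the standard `:telemetry` API; everything else is an assumption for illustration: the `MyApp.WorkflowTelemetry` module, the handler id, the `:workflow_done` message, and in particular the metadata keys (`:workflow_id`, `:outcome`), which are guessed to mirror the PubSub payload. Check `Baton.Telemetry` for the actual metadata.

```elixir
defmodule MyApp.WorkflowTelemetry do
  # Hypothetical module: forwards finished-workflow telemetry to a process.

  # Call once at application start. :telemetry.attach/4 returns :ok, or
  # {:error, :already_exists} if the handler id is already taken.
  def attach(notify_pid) do
    :telemetry.attach(
      "my-app-baton-workflow-finished",
      [:baton, :workflow, :finished],
      &__MODULE__.handle_event/4,
      %{notify: notify_pid}
    )
  end

  # Assumed metadata keys (:workflow_id, :outcome); missing keys become nil.
  def handle_event([:baton, :workflow, :finished], _measurements, metadata, %{notify: pid}) do
    send(pid, {:workflow_done, Map.get(metadata, :workflow_id), Map.get(metadata, :outcome)})
    :ok
  end
end
```

The handler is a remote function capture rather than an anonymous function, which `:telemetry` recommends for performance.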

> **Requires `Baton.Plugin` for crash-case coverage.** When a step fails by
> returning `{:error, reason}`, the finished event fires immediately. If a step
> *hard-crashes* (raises or exits) or is killed by Oban, the worker never gets
> the chance to publish it. In that case `Baton.Plugin`'s periodic sweep detects
> the settled workflow and broadcasts `:workflow_finished` with
> `outcome: :failed` as a backstop, typically within one sweep interval. Make
> sure the plugin is in your Oban `plugins:` list (see
> [Installation](#installation)); without it, workflows that die from a hard
> crash won't emit a terminal event.

### 3. Subscribe in a LiveView

```elixir
defmodule MyAppWeb.WorkflowLive do
  use MyAppWeb, :live_view
  alias Baton.Events

  def mount(%{"id" => workflow_id}, _session, socket) do
    if connected?(socket), do: Events.subscribe_workflow(workflow_id)
    {:ok, assign(socket, workflow_id: workflow_id, steps: %{})}
  end

  def handle_info({:workflow_step_updated, %{step_name: name} = event}, socket) do
    {:noreply, update(socket, :steps, &Map.put(&1, name, event))}
  end

  # ... render @steps ...
end
```

For an index of all running workflows, subscribe with `Events.subscribe_all/0`
and key your state by `event.workflow_id`. A complete, copy-paste pair of
detail and index LiveViews lives in
[`examples/my_app/live/workflow_live.ex`](examples/my_app/live/workflow_live.ex).

### Seeding initial state

PubSub only delivers events that occur *after* the view subscribes in `mount`,
so steps that changed state before the user opened the page never arrive as
events. Seed `@steps` from the database on mount using `Baton.Query`, then let
incoming events keep it current. Also handle `{:workflow_finished, _}` to react
when the whole workflow is done.
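
One way to keep both message types in a single place is a small pure reducer that each `handle_info/2` clause delegates to. The sketch below is an illustration, not part of Baton: `WorkflowViewState` and its functions are hypothetical names, and treating `"completed"`, `"discarded"`, and `"cancelled"` as settled states is an assumption based on the state list above. The database seed is left out because `Baton.Query`'s functions are not listed in this README.

```elixir
defmodule WorkflowViewState do
  # Hypothetical helper: folds Baton events into plain view state.

  # Assumed set of states after which a step will not run again.
  @settled ~w(completed discarded cancelled)

  def new, do: %{steps: %{}, outcome: nil}

  # Latest event per step wins, keyed by step name.
  def apply_event(state, {:workflow_step_updated, %{step_name: name} = event}) do
    put_in(state, [:steps, name], event)
  end

  # Terminal event: record :completed or :failed for the whole workflow.
  def apply_event(state, {:workflow_finished, %{outcome: outcome}}) do
    %{state | outcome: outcome}
  end

  # Ignore anything else so unrelated messages do not crash the view.
  def apply_event(state, _other), do: state

  # Names of steps whose latest known state is settled, sorted for stable output.
  def settled_steps(state) do
    Enum.sort(for {name, %{state: s}} <- state.steps, s in @settled, do: name)
  end
end
```

In a LiveView, each `handle_info/2` clause would call `WorkflowViewState.apply_event/2` on an assign and return `{:noreply, socket}`. Keeping the fold pure makes it testable without mounting a view.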

## How it compares to Oban Pro Workflow

Baton covers DAG ordering, fan-out/fan-in, dynamic workflows, result
passing, and dependency-failure cascading. It adds cycle detection, retry
idempotency, multi-model fan-out, and LLM cost tracking. The main mechanical
difference is that completion uses snooze-based gating plus an opportunistic
reschedule rather than Pro's event-driven completion; correctness does not
depend on the reschedule.
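
The snooze-based gating can be sketched as a pure decision over the current states of a step's dependencies. This is a simplified illustration rather than Baton's implementation: `GateSketch`, the `%{dep_name => state}` input, and the 5-second default are assumptions. The `{:snooze, seconds}` and `{:cancel, reason}` tuples are the shapes Oban accepts as return values from `perform/1`.

```elixir
defmodule GateSketch do
  # Hypothetical helper: decides what a step should do given its dependencies.

  # States in which a dependency can no longer produce a result.
  @failed ["discarded", "cancelled"]

  # `dep_states` maps each dependency name to its current state string.
  def decide(dep_states, snooze_seconds \\ 5) do
    states = Map.values(dep_states)

    cond do
      # Any failed dependency cascades: this step can never run.
      Enum.any?(states, &(&1 in @failed)) -> {:cancel, :dependency_failed}
      # All dependencies done (or there are none): run the step body.
      Enum.all?(states, &(&1 == "completed")) -> :proceed
      # Otherwise check again later.
      true -> {:snooze, snooze_seconds}
    end
  end
end
```

Because every step re-evaluates its dependencies each time it wakes, a missed or delayed reschedule only costs latency (up to one snooze interval), which is why correctness does not depend on it.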

## License

MIT — see [LICENSE](https://github.com/RudeWalrus/baton/blob/main/LICENSE).