# Building a Workflow
This guide walks through a complete Baton workflow end to end: a **fan-out /
fan-in** (diamond) DAG, the **pruning** setup that keeps its data from
accumulating, and a **LiveView** that renders progress live over PubSub.
The example is a small "research brief" pipeline:
```
          ┌──────────────────┐
          │  fetch_sources   │  (root)
          └────────┬─────────┘
     ┌─────────────┼──────────────┐     fan-out: 3 steps run in parallel
     ▼             ▼              ▼
┌──────────┐ ┌───────────┐ ┌──────────────┐
│summarize │ │ keywords  │ │ credibility  │
└────┬─────┘ └─────┬─────┘ └──────┬───────┘
     └─────────────┼──────────────┘     fan-in: waits for all three
                   ▼
          ┌──────────────────┐
          │  compile_brief   │
          └──────────────────┘
```
`fetch_sources` **fans out** to three independent analyses that run
concurrently; `compile_brief` **fans them in**, running only once all three have
finished, and reads each of their results.
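
To make the fan-out/fan-in shape concrete, here is a minimal sketch of the same diamond in plain Elixir. It uses `Task.async/1` in a single process tree instead of Baton. Treat it only as an analogy: Baton runs each step as its own Oban job and stores results on the step's node, whereas this sketch keeps everything in memory. The `DiamondSketch` module and its step bodies are hypothetical stand-ins.

```elixir
# Plain-Elixir analogy for the diamond's shape only; this is NOT how Baton
# executes a workflow (Baton runs each step as a separate Oban job).
defmodule DiamondSketch do
  # Hypothetical stand-ins for the real step bodies (no API or LLM calls).
  def fetch_sources(topic), do: %{"documents" => ["doc about #{topic}"]}

  def summarize(%{"documents" => docs}), do: %{"summary" => "#{length(docs)} doc(s)"}

  def keywords(%{"documents" => docs}) do
    words = Enum.flat_map(docs, &String.split/1)
    %{"keywords" => Enum.take(words, 3)}
  end

  def credibility(%{"documents" => _docs}), do: %{"score" => 0.8}

  def run(topic) do
    # Root step: every downstream step reads this result.
    source = fetch_sources(topic)

    # Fan-out: start all three tasks first so they run concurrently, then await them.
    results =
      [{"summarize", &summarize/1}, {"keywords", &keywords/1}, {"credibility", &credibility/1}]
      |> Enum.map(fn {name, fun} -> {name, Task.async(fn -> fun.(source) end)} end)
      |> Map.new(fn {name, task} -> {name, Task.await(task)} end)

    # Fan-in: runs only once every dependency result exists, keyed by step name.
    %{
      "summarize" => %{"summary" => summary},
      "keywords" => %{"keywords" => keywords},
      "credibility" => %{"score" => score}
    } = results

    %{"brief" => %{summary: summary, keywords: keywords, credibility: score}}
  end
end
```

The middle tier is concurrent because all three tasks start before any is awaited. Awaiting each task in the same `Enum.map/2` that starts it would run the analyses one after another.
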
## 1. Setup
Install the schema and register the plugin (see also the
[getting started guide](getting_started.md)):
```elixir
defmodule MyApp.Repo.Migrations.AddBaton do
  use Ecto.Migration

  def up, do: Baton.Migration.up()
  def down, do: Baton.Migration.down()
end
```
```elixir
# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 20],
  plugins: [
    {Oban.Plugins.Pruner, max_age: 60 * 60 * 24},
    {Baton.Plugin, interval: :timer.seconds(60)}
  ]
```
## 2. Define the steps
Each step is a `Baton.Worker`. A worker implements `perform_workflow/1` and
returns `{:ok, result}`; the result is stored on the step's node and made
available to downstream steps. Downstream steps read their dependencies'
results with `Baton.Results.get_result/2` (or `get_all_results/1`).
```elixir
defmodule MyApp.Research.FetchSources do
  use Baton.Worker, queue: :default

  @impl true
  def perform_workflow(%Oban.Job{args: %{"topic" => topic}}) do
    # Oban stores args as JSON, so the `topic:` passed to `new/1` arrives as "topic".
    # MyApp.Search.fetch/1 stands in for your own API or search client.
    {:ok, %{"documents" => MyApp.Search.fetch(topic)}}
  end
end
```
The three fan-out steps each depend on `fetch_sources` and read its result:
```elixir
defmodule MyApp.Research.Summarize do
  use Baton.Worker, queue: :default

  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"summary" => MyApp.LLM.summarize(docs)}}
  end
end

defmodule MyApp.Research.Keywords do
  use Baton.Worker, queue: :default

  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"keywords" => MyApp.LLM.keywords(docs)}}
  end
end

defmodule MyApp.Research.Credibility do
  use Baton.Worker, queue: :default

  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"score" => MyApp.LLM.credibility(docs)}}
  end
end
```
The fan-in step depends on **all three** and reads each result. With
`get_all_results/1` you get every dependency's result keyed by step name:
```elixir
defmodule MyApp.Research.CompileBrief do
  use Baton.Worker, queue: :default

  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    # One entry per dependency, keyed by step name (as a string).
    %{
      "summarize" => %{"summary" => summary},
      "keywords" => %{"keywords" => keywords},
      "credibility" => %{"score" => score}
    } = Results.get_all_results(job)

    brief = %{
      summary: summary,
      keywords: keywords,
      credibility: score
    }

    {:ok, %{"brief" => brief}}
  end
end
```
## 3. Build and insert the workflow
`deps:` is what wires the DAG. The fan-out is three steps sharing one
dependency; the fan-in is one step depending on all three. Baton validates the
graph (no cycles, all deps exist) before inserting anything.
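
The two checks named above can be sketched with Kahn's topological sort. This sketch reflects an assumption about *what* is checked (missing deps, cycles). It is not Baton's code, and `DagCheckSketch` and its `%{step => deps}` input shape are hypothetical.

```elixir
# Illustrative only: "every dep exists" and "no cycles", checked with Kahn's
# topological sort. This is NOT Baton's internal validation code.
defmodule DagCheckSketch do
  # `steps` maps each step name to its list of deps.
  # Returns {:ok, order} with a valid execution order, or {:error, reason}.
  def validate(steps) do
    missing =
      for {name, deps} <- steps, dep <- deps, not Map.has_key?(steps, dep), do: {name, dep}

    if missing == [], do: topo_sort(steps, []), else: {:error, {:missing_deps, missing}}
  end

  # Every step placed: `order` was built in reverse, so flip it.
  defp topo_sort(remaining, order) when map_size(remaining) == 0,
    do: {:ok, Enum.reverse(order)}

  defp topo_sort(remaining, order) do
    # A step is ready once none of its deps are still waiting to be placed.
    ready =
      for {name, deps} <- remaining,
          Enum.all?(deps, fn dep -> not Map.has_key?(remaining, dep) end),
          do: name

    case Enum.sort(ready) do
      # Steps remain but none is ready: a cycle blocks them (the list may also
      # include steps that sit downstream of the cycle).
      [] -> {:error, {:cycle, remaining |> Map.keys() |> Enum.sort()}}
      batch -> topo_sort(Map.drop(remaining, batch), Enum.reverse(batch) ++ order)
    end
  end
end
```

For the research-brief graph, the sort places `fetch_sources` first, then the three analyses, then `compile_brief`. Steps caught in a cycle never become ready, and that is how the sort detects the cycle.
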
```elixir
alias MyApp.Research.{FetchSources, Summarize, Keywords, Credibility, CompileBrief}

# The topic to research; any string will do.
topic = "solid-state batteries"

{:ok, jobs} =
  Baton.new(workflow_name: "research:#{topic}")
  |> Baton.add(:fetch_sources, FetchSources.new(%{topic: topic}))
  # fan-out: all three depend only on fetch_sources, so they run in parallel
  |> Baton.add(:summarize, Summarize.new(%{}), deps: [:fetch_sources])
  |> Baton.add(:keywords, Keywords.new(%{}), deps: [:fetch_sources])
  |> Baton.add(:credibility, Credibility.new(%{}), deps: [:fetch_sources])
  # fan-in: runs only after all three finish
  |> Baton.add(:compile_brief, CompileBrief.new(%{}),
    deps: [:summarize, :keywords, :credibility])
  |> Baton.insert()
```
That's it — Baton handles ordering, parallelism, and result passing. Each step
gates itself at runtime: the fan-in step snoozes until its three dependencies
have completed, then runs once.
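
The runtime gate can be pictured as a small decision over the dependencies' job states. This is a hypothetical sketch, not Baton's API, and `GateSketch` is invented for illustration. The snooze-until-completed rule comes from the text above. Treating a missing dependency as failed comes from the pruning warning later in this guide. Failing on a `discarded` or `cancelled` dependency is an assumption.

```elixir
# Hypothetical sketch of a per-step gate; not Baton's API.
# Assumptions: dependency states use Oban's state names, a dependency whose
# job can no longer be found is nil, and a discarded or cancelled dependency
# fails the gate.
defmodule GateSketch do
  @failed ~w(discarded cancelled)

  # `dep_states` maps each dependency's step name to its job state (or nil).
  def decide(dep_states) do
    states = Map.values(dep_states)

    cond do
      # A missing or permanently failed dependency can never complete.
      Enum.any?(states, fn s -> is_nil(s) or s in @failed end) -> :fail
      # Every dependency finished (vacuously true for a root step): run once.
      Enum.all?(states, &(&1 == "completed")) -> :run
      # Something is still executing, snoozed, or retryable: check again later.
      true -> :snooze
    end
  end
end
```
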
> #### Tip {: .tip}
>
> To run the *same* worker across several models and synthesize the outputs,
> use `Baton.MultiModel.fan_out/4` instead of adding the parallel steps by hand
> — see the [multi-model guide](multi_model.md).
## 4. Set up pruning
Baton's tables (`workflow_nodes`, `workflow_step_stats`, `workflow_debug_logs`,
`workflow_completions`) have no foreign key to `oban_jobs`, so Oban's `Pruner`
won't clean them up. Turn on pruning in the plugin so Baton rows are removed
once their backing Oban job has been pruned:
```elixir
plugins: [
  {Oban.Plugins.Pruner, max_age: 60 * 60 * 24},  # 24h; see the note below
  {Baton.Plugin,
   interval: :timer.seconds(60),
   prune: true,                       # off by default
   debug_log_max_age: 60 * 60 * 24}   # optional: cap debug logs at 24h (seconds)
]
```
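- `prune: true` deletes orphaned Baton rows (rows whose Oban job is gone) on
  each sweep. A row becomes orphaned only after Oban's `Pruner` deletes its
  job, so the `Pruner`'s `max_age` sets retention for Baton's tables as well.
  The result is one retention policy instead of two.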
- `debug_log_max_age` (in seconds) gives the `workflow_debug_logs` table, whose
  rows are the largest, its own age cap, typically shorter than the `Pruner`'s.
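
As a rough model of what one sweep removes, the two rules can be written as a pure function over in-memory rows. This is an illustration under assumptions. `PruneSketch`, the row fields `:table`, `:oban_job_id`, and `:inserted_at`, and the set of live job IDs are all hypothetical, and Baton's actual implementation is not shown here.

```elixir
# Illustrative model of the two pruning rules; row shape is hypothetical.
defmodule PruneSketch do
  # Returns the rows that survive one sweep.
  def sweep(rows, live_job_ids, now, debug_log_max_age) do
    Enum.reject(rows, fn row ->
      # Rule 1 (prune: true): the row's backing Oban job no longer exists.
      orphaned? = not MapSet.member?(live_job_ids, row.oban_job_id)

      # Rule 2 (debug_log_max_age): debug logs older than the cap, in seconds.
      too_old_debug_log? =
        row.table == "workflow_debug_logs" and
          DateTime.diff(now, row.inserted_at, :second) > debug_log_max_age

      orphaned? or too_old_debug_log?
    end)
  end
end
```
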
> #### The Pruner must outlast your workflows {: .warning}
>
> Set the `Pruner`'s `max_age` longer than your slowest workflow's total
> runtime. Baton resolves a dependency's result from its Oban job; if Oban prunes
> a completed step while the workflow is still running, downstream steps will
> treat that dependency as failed. A day is generous for most workflows.
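
One way to enforce this rule is a boot-time check. The helper below is hypothetical and is not part of Baton or Oban, and you supply the slowest-workflow estimate yourself. Watch the units: the `Pruner`'s `max_age` is in seconds, while `:timer.hours/1` (like `:timer.seconds/1` in the plugin config) returns milliseconds, so the sketch converts before comparing.

```elixir
# Hypothetical boot-time sanity check; not part of Baton or Oban.
defmodule RetentionCheckSketch do
  # pruner_max_age_s: the Pruner's max_age, in seconds.
  # slowest_workflow_ms: your estimate of the slowest workflow, in milliseconds.
  def check!(pruner_max_age_s, slowest_workflow_ms) do
    slowest_s = div(slowest_workflow_ms, 1000)

    if pruner_max_age_s > slowest_s do
      :ok
    else
      raise ArgumentError,
            "Pruner max_age (#{pruner_max_age_s}s) must exceed the slowest " <>
              "workflow's runtime (#{slowest_s}s)"
    end
  end
end
```

With the 24-hour `max_age` above and a three-hour slowest workflow, `RetentionCheckSketch.check!(60 * 60 * 24, :timer.hours(3))` returns `:ok`.
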
## 5. Render progress in a LiveView
Baton broadcasts every step transition over `Phoenix.PubSub`, plus a single
terminal event when the whole workflow settles. Point Baton at your app's
PubSub server:
```elixir
config :baton, pubsub: MyApp.PubSub
```
Each transition is published on two topics — `"workflow:all"` and
`"workflow:<workflow_id>"` — as `{:workflow_step_updated, payload}`. When the
last step settles, a one-shot `{:workflow_finished, payload}` is published on
the same topics. Subscribe with `Baton.Events`:
```elixir
defmodule MyAppWeb.ResearchLive do
  use MyAppWeb, :live_view

  alias Baton.Events

  @impl true
  def mount(%{"id" => workflow_id}, _session, socket) do
    # Subscribe only on the connected (WebSocket) mount, not the static render.
    if connected?(socket), do: Events.subscribe_workflow(workflow_id)

    {:ok,
     socket
     |> assign(:workflow_id, workflow_id)
     |> assign(:steps, %{})        # step_name => latest event
     |> assign(:outcome, nil)
     |> assign(:failed_steps, [])}
  end

  # A step changed state (executing / completed / retryable / discarded / ...).
  @impl true
  def handle_info({:workflow_step_updated, %{step_name: name} = event}, socket) do
    {:noreply, update(socket, :steps, &Map.put(&1, name, event))}
  end

  # Fired once, when the whole workflow is done.
  def handle_info({:workflow_finished, %{outcome: outcome, failed_steps: failed}}, socket) do
    {:noreply,
     socket
     |> assign(:outcome, outcome)  # :completed | :failed
     |> assign(:failed_steps, failed)}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <h1>Research brief <small>{@workflow_id}</small></h1>
    <p :if={@outcome}>Workflow {@outcome}</p>
    <ul>
      <li :for={{name, ev} <- Enum.sort_by(@steps, &elem(&1, 0))}>
        <strong>{name}</strong>: {ev.state}
        <span :if={ev.error}>({ev.error})</span>
      </li>
    </ul>
    """
  end
end
```
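
The publishing side can be sketched without Phoenix: each message goes to every subscriber of `"workflow:all"` and of `"workflow:<workflow_id>"`. The sketch below uses Elixir's built-in `Registry` with duplicate keys as a stand-in for `Phoenix.PubSub`. `BroadcastSketch` is hypothetical and does not describe how `Baton.Events` is implemented.

```elixir
# Stand-in for the two-topic broadcast, using Registry instead of Phoenix.PubSub.
# BroadcastSketch is hypothetical; it is not Baton.Events.
defmodule BroadcastSketch do
  @registry __MODULE__.Registry

  # Duplicate keys let many processes register under the same topic.
  def start, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Subscribe the calling process to one topic.
  def subscribe(topic), do: Registry.register(@registry, topic, nil)

  # Publish one message on both the global and the per-workflow topic.
  def broadcast(workflow_id, message) do
    for topic <- ["workflow:all", "workflow:#{workflow_id}"] do
      Registry.dispatch(@registry, topic, fn entries ->
        for {pid, _value} <- entries, do: send(pid, message)
      end)
    end

    :ok
  end
end
```

A page subscribed to `"workflow:42"` receives only that workflow's events. A dashboard subscribed to `"workflow:all"` would see every workflow.
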
The `{:workflow_step_updated, _}` payload carries `step_name`, `state`,
`worker`, `attempt`, `has_result`, and `error`; `state` is one of `"executing"`,
`"snoozed"`, `"completed"`, `"retryable"`, `"discarded"`, or `"cancelled"`.
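
The `state` strings are enough to derive a progress line for the page. The helper below is a hypothetical sketch. It counts `completed`, `discarded`, and `cancelled` as settled, since those are Oban's final job states, and treats `executing`, `snoozed`, and `retryable` as still in flight. `total` is passed in because `@steps` only holds steps that have emitted at least one event.

```elixir
# Hypothetical progress helper over the LiveView's `@steps` map; not part of Baton.
defmodule ProgressSketch do
  @settled ~w(completed discarded cancelled)

  def summarize(steps, total) do
    settled = Enum.count(steps, fn {_name, ev} -> ev.state in @settled end)
    failed = for {name, ev} <- steps, ev.state in ["discarded", "cancelled"], do: name
    %{settled: settled, total: total, failed: Enum.sort(failed)}
  end
end
```
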
> #### Seed initial state on mount {: .info}
>
> PubSub only delivers events published *after* you subscribe in `mount`. To
> reflect a workflow that's already running (or finished) when the page loads,
> seed `@steps` from `Baton.Query.get_workflow_jobs/1` in `mount`, then let
> incoming events keep it current.
>
> If a step crashes instead of returning, the terminal `{:workflow_finished, _}`
> event relies on `Baton.Plugin` being in your Oban `plugins:` list; see
> [`Baton.Plugin`](Baton.Plugin.html) and the README's *Integrating with Phoenix
> LiveView* section.
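
Seeding and live updates can share one shape. In this hypothetical sketch, the rows from `Baton.Query.get_workflow_jobs/1` are *assumed* to carry `:step_name` and `:state`, so check the real return value before adapting it. Seeded entries include `error: nil` because the template above reads `ev.error` with dot access, which raises on a missing key.

```elixir
# Hypothetical "seed, then apply events" sketch; the row shape is an assumption.
defmodule SeedSketch do
  # Build the initial `@steps` map from a snapshot of the workflow's jobs.
  def seed(rows) do
    Map.new(rows, fn row -> {row.step_name, %{state: row.state, error: nil}} end)
  end

  # Apply a live event the same way the handle_info/2 clause does: latest event wins.
  def apply_event(steps, %{step_name: name} = event), do: Map.put(steps, name, event)
end
```

Subscribing before taking the snapshot means an event published in between is delivered rather than lost. At worst, it repeats a state the snapshot already shows.
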
## What you built
- A diamond DAG with one **fan-out** point and one **fan-in** point, with
results flowing across every edge — no manual coordination.
- **Pruning** that keeps Baton's tables bounded, tied to Oban's own retention.
- A **LiveView** that updates in real time and reacts to workflow completion.
For LLM-specific concerns (cost tracking, idempotent retries, context capture,
multi-model fan-out) see `Baton.LLMWorker`, `Baton.Stats`, `Baton.Debug`, and
the [multi-model guide](multi_model.md).