guides/patterns/pipeline.md

# Pipeline

Linear N-stage transformation chain. Each stage is a distinct
single-turn agent with its own role, the output of stage N becomes
the input of stage N+1 via `GenAgent.notify/2`, and the last stage
halts with the final result on state.

## When to reach for this

You have a transformation that decomposes cleanly into a fixed
sequence of steps, each step is best modeled as its own LLM with
its own system prompt and role, and the output of one step is
exactly the input to the next. Brainstorm -> edit -> headline.
Research question -> answer -> translate -> summarize. Spec ->
design -> implementation notes.

The key difference from [Research](research.md) is that pipeline
stages are **distinct agents** with **distinct roles**, not phases
inside one agent. That matters when each step benefits from a
different system prompt, different max tokens, or potentially a
different backend.

## What it exercises in gen_agent

- **One-way cross-agent notify chain**: each stage notifies the
  next with `{:pipeline_input, text}` and then halts.
- **`handle_event/2` returning `{:prompt, text, state}`**: the
  receiving stage turns the notify into its dispatch.
- **Per-stage self-halt after one turn**: each stage is
  intentionally one-and-done via `{:halt, new_state}` from
  `handle_response/3`.
- **Nil-terminator for the last stage**: the final stage has
  `next_stage: nil` and halts without notifying anyone.
- **Failure propagation**: `handle_error/3` forwards a
  `{:pipeline_failed, reason}` notify down the chain so no
  downstream stage sits waiting forever.

## The pattern

One callback module (used for every stage; the role and
instruction are per-stage config), plus a starter that wires up
the chain.

### `Pipeline.Stage`

```elixir
defmodule Pipeline.Stage do
  use GenAgent

  defmodule State do
    defstruct [
      :name,
      :next_stage,
      :role,
      :instruction,
      :input,
      :output,
      :error,
      index: 0
    ]
  end

  @impl true
  def init_agent(opts) do
    state = %State{
      name: Keyword.fetch!(opts, :agent_name),
      next_stage: Keyword.get(opts, :next_stage),
      role: Keyword.fetch!(opts, :role),
      instruction: Keyword.fetch!(opts, :instruction),
      index: Keyword.get(opts, :index, 0)
    }

    system = "You are #{state.role}. #{state.instruction}"
    {:ok, [system: system, max_tokens: Keyword.get(opts, :max_tokens, 400)], state}
  end

  @impl true
  def handle_response(_ref, response, %State{} = state) do
    output = String.trim(response.text)
    new_state = %{state | output: output}

    case state.next_stage do
      nil ->
        # End of pipeline. Final result is on state.output.
        {:halt, new_state}

      next ->
        GenAgent.notify(next, {:pipeline_input, output})
        {:halt, new_state}
    end
  end

  @impl true
  def handle_error(_ref, reason, %State{} = state) do
    new_state = %{state | error: reason}

    case state.next_stage do
      nil ->
        {:halt, new_state}

      next ->
        GenAgent.notify(next, {:pipeline_failed, reason})
        {:halt, new_state}
    end
  end

  @impl true
  def handle_event({:pipeline_input, text}, %State{} = state) do
    {:prompt, text, %{state | input: text}}
  end

  def handle_event({:pipeline_failed, reason}, %State{} = state) do
    new_state = %{state | error: {:upstream_failed, reason}}

    case state.next_stage do
      nil -> {:halt, new_state}
      next ->
        GenAgent.notify(next, {:pipeline_failed, reason})
        {:halt, new_state}
    end
  end

  def handle_event(_other, state), do: {:noreply, state}
end
```

### Starter

```elixir
defmodule Pipeline do
  alias Pipeline.Stage

  def run(initial_input, stages_config, opts \\ []) do
    backend = Keyword.get(opts, :backend, GenAgent.Backends.Anthropic)
    id = System.unique_integer([:positive])

    # Assign unique names per stage and compute the next-stage pointer.
    stage_names =
      stages_config
      |> Enum.with_index(1)
      |> Enum.map(fn {cfg, i} -> "pipe-#{id}-#{i}-#{cfg.name}" end)

    # next_map: stage_name -> name_of_next_stage_or_nil
    next_map =
      stage_names
      |> Enum.zip(Enum.drop(stage_names, 1) ++ [nil])
      |> Map.new()

    stages_config
    |> Enum.with_index(1)
    |> Enum.each(fn {cfg, i} ->
      name = Enum.at(stage_names, i - 1)

      {:ok, _pid} = GenAgent.start_agent(Stage,
        name: name,
        agent_name: name,
        backend: backend,
        next_stage: Map.fetch!(next_map, name),
        role: cfg.role,
        instruction: cfg.instruction,
        index: i
      )
    end)

    # Kick off the first stage with the initial input.
    [first | _] = stage_names
    {:ok, _ref} = GenAgent.tell(first, initial_input)

    {:ok, %{stages: stage_names}}
  end
end
```

## Using it

```elixir
{:ok, handle} = Pipeline.run(
  "the octopus has three hearts and blue blood",
  [
    %{
      name: "brainstorm",
      role: "a creative brainstormer",
      instruction: "Given a fact, list 3 distinct angles to write about it. One per line."
    },
    %{
      name: "editor",
      role: "a sharp editor",
      instruction: "Given a list of angles, pick the most interesting and develop it into a tight paragraph."
    },
    %{
      name: "headline",
      role: "a headline writer",
      instruction: "Given a paragraph, write ONE compelling title. Output only the title."
    }
  ]
)

# Each stage notifies the next as it completes. Wait for the
# last stage to halt:
last = List.last(handle.stages)

# In practice, a small wait loop checking:
%{agent_state: %{output: output}} = GenAgent.status(last)
IO.puts(output)

# Read the trace across all stages:
Enum.map(handle.stages, fn name ->
  %{agent_state: %{input: i, output: o, role: r}} = GenAgent.status(name)
  %{stage: name, role: r, in: i, out: o}
end)

# Clean up:
Enum.each(handle.stages, &GenAgent.stop/1)
```

## Variations

- **Per-stage backend selection.** Nothing in the pattern requires
  every stage to use the same backend. Pass a `:backend` in each
  stage config and let cheap stages (brainstorm) use a faster
  model than expensive stages (synthesis).
- **Branching pipelines.** Instead of a linear chain, have one
  stage notify multiple "next" stages with the same output, then
  a later join stage collects them. You're now halfway to the
  [Supervisor](supervisor.md) shape.
- **Reusable stages.** The same callback module can be
  instantiated many times in the same pipeline with different
  roles -- e.g. two "editor" stages in sequence with different
  instructions.
- **Conditional routing.** `handle_event({:pipeline_input, text}, state)`
  can inspect `text` before deciding what to do -- dispatch,
  transform, or `{:halt, state}` early if the upstream produced
  something bad.