# Supervisor
Coordinator + dynamic worker pool. One coordinator plans sub-tasks
via its own LLM turn, **spawns N worker agents from inside its own
`handle_response/3` callback**, notifies each worker with its
sub-task, collects results as they arrive via notify, and
self-chains a synthesis turn.
## When to reach for this
A task decomposes into N independent sub-tasks where N is decided
by the LLM (or by runtime conditions), each sub-task gets its own
agent, and the coordinator has to aggregate their outputs into a
final answer. You want fan-out for parallelism and fan-in for the
synthesis step. Classic map/reduce, but every worker is an agent.
This is the richest cross-agent pattern in this collection, and it
is built from the others: the coordinator is a
[Research](research.md)-style self-chaining agent whose planning
turn spawns a pool of one-shot workers, each of which is
essentially a single-item [Pipeline](pipeline.md) stage.
## What it exercises in gen_agent
- **Dynamic `GenAgent.start_agent/2` called from inside a running
callback.** The coordinator's planning-phase `handle_response`
spawns workers on the fly. They join the shared `GenAgent`
supervision tree and are live from the moment they start.
- **Fan-out via notify**: the coordinator notifies each worker
with its sub-task immediately after spawning. Workers sit idle
until they receive the notify.
- **Fan-in via notify**: each worker notifies the coordinator
with its result (or failure). The coordinator's `handle_event/2`
accumulates results into a map.
- **Multi-phase coordinator state machine** with an LLM turn at
each end (planning -> dispatch -> collect -> synthesize).
- **Self-halt workers**: each worker halts via `{:halt, state}`
after its single turn so the coordinator doesn't have to track
or stop them explicitly.
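
The fan-out and fan-in bullets above can be sketched with plain BEAM processes standing in for agents. This is a minimal illustration under stated assumptions, not gen_agent code: `FanSketch`, `run/2`, and the `work_fun` argument are hypothetical names, `send/2` stands in for `GenAgent.notify`, and the 5-second timeout is an addition that the coordinator shown later does not have.

```elixir
defmodule FanSketch do
  # Hypothetical stand-in for the coordinator: fan out one process per
  # sub-task, then fan in tagged messages until every worker has reported.
  def run(sub_tasks, work_fun) do
    coordinator = self()

    names =
      sub_tasks
      |> Enum.with_index(1)
      |> Enum.map(fn {task, i} ->
        name = "worker-#{i}"

        # Each "worker" handles exactly one sub-task, reports back, and exits.
        spawn(fn ->
          message =
            try do
              {:worker_result, name, work_fun.(task)}
            rescue
              error -> {:worker_failed, name, Exception.message(error)}
            end

          send(coordinator, message)
        end)

        name
      end)

    collect(length(names), %{}, %{})
  end

  # Fan-in: done once every worker has reported a result or a failure.
  defp collect(0, results, failures), do: {results, failures}

  defp collect(pending, results, failures) do
    receive do
      {:worker_result, name, text} ->
        collect(pending - 1, Map.put(results, name, text), failures)

      {:worker_failed, name, reason} ->
        collect(pending - 1, results, Map.put(failures, name, reason))
    after
      # Assumption for the sketch: give up and return what has arrived so far.
      5_000 -> {results, failures}
    end
  end
end
```

Calling `FanSketch.run(["a", "b"], &String.upcase/1)` returns a `{results, failures}` tuple keyed by worker name, which is the same shape the coordinator accumulates in its `results` and `failures` maps.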
## The pattern
Two callback modules: a `Coordinator` that owns the phase state
machine and spawns workers, and a `Worker` that is one-shot and
notifies the coordinator with its result.
### `Supervisor.Coordinator`
```elixir
defmodule Supervisor.Coordinator do
  use GenAgent

  alias Supervisor.Worker

  defmodule State do
    defstruct [
      :topic,
      :max_workers,
      :coordinator_name,
      :final_output,
      :error,
      phase: :planning,
      sub_tasks: [],
      workers: [],
      results: %{},
      failures: %{}
    ]
  end

  @impl true
  def init_agent(opts) do
    state = %State{
      topic: Keyword.fetch!(opts, :topic),
      max_workers: Keyword.get(opts, :max_workers, 3),
      coordinator_name: Keyword.fetch!(opts, :coordinator_name)
    }

    system = """
    You are a research coordinator.
    When asked to plan sub-tasks, output them one per line, no
    numbering or bullets -- just plain sub-task text, one per line.
    When asked to synthesize worker results, write a coherent
    2-3 paragraph answer that weaves together the findings.
    """

    {:ok, [system: system, max_tokens: Keyword.get(opts, :max_tokens, 600)], state}
  end

  # Phase :planning -> spawn workers, notify each, transition to :collecting.
  @impl true
  def handle_response(_ref, response, %State{phase: :planning} = state) do
    sub_tasks =
      response.text
      |> String.split("\n")
      |> Enum.map(&String.trim/1)
      |> Enum.reject(&(&1 == ""))
      |> Enum.take(state.max_workers)

    case sub_tasks do
      [] ->
        # The plan was empty: fail without spawning any workers.
        {:halt, %{state | phase: :failed, error: :no_sub_tasks}}

      _ ->
        workers = spawn_workers(state.coordinator_name, sub_tasks)

        # Fan-out: workers and sub_tasks are in the same order, so zip pairs them.
        workers
        |> Enum.zip(sub_tasks)
        |> Enum.each(fn {worker, task} ->
          GenAgent.notify(worker, {:sub_task, task})
        end)

        {:noreply, %{state | sub_tasks: sub_tasks, workers: workers, phase: :collecting}}
    end
  end

  # Phase :synthesizing -> terminal halt with the final answer.
  def handle_response(_ref, response, %State{phase: :synthesizing} = state) do
    {:halt, %{state | final_output: String.trim(response.text), phase: :done}}
  end

  # Phase :collecting -> accumulate worker results, self-chain synthesis
  # once everyone has reported.
  @impl true
  def handle_event({:worker_result, worker_name, text}, %State{phase: :collecting} = state) do
    maybe_synthesize(%{state | results: Map.put(state.results, worker_name, text)})
  end

  def handle_event({:worker_failed, worker_name, reason}, %State{phase: :collecting} = state) do
    maybe_synthesize(%{state | failures: Map.put(state.failures, worker_name, reason)})
  end

  # Ignore unknown events and anything that arrives outside :collecting.
  def handle_event(_other, state), do: {:noreply, state}

  @impl true
  def handle_error(_ref, reason, %State{} = state) do
    {:halt, %{state | error: reason, phase: :failed}}
  end

  defp maybe_synthesize(%State{} = state) do
    received = map_size(state.results) + map_size(state.failures)

    cond do
      # Still waiting on at least one worker.
      received < length(state.workers) ->
        {:noreply, state}

      # Everyone reported, but nobody succeeded.
      state.results == %{} ->
        {:halt, %{state | phase: :failed, error: :all_workers_failed}}

      # Everyone reported and at least one result exists: self-chain synthesis.
      true ->
        {:prompt, synthesis_prompt(state), %{state | phase: :synthesizing}}
    end
  end

  defp synthesis_prompt(%State{} = state) do
    sections =
      state.sub_tasks
      |> Enum.with_index()
      |> Enum.map_join("\n\n", fn {task, i} ->
        worker = Enum.at(state.workers, i)
        result = Map.get(state.results, worker, "(worker failed)")
        "Sub-task: #{task}\nResult: #{result}"
      end)

    """
    Your workers have reported on all sub-tasks for the topic:
    #{state.topic}
    Here is what each worker returned:
    #{sections}
    Synthesize these into a cohesive 2-paragraph answer.
    """
  end

  # Starts one worker per sub-task and returns the worker names in order.
  defp spawn_workers(coordinator_name, sub_tasks) do
    sub_tasks
    |> Enum.with_index(1)
    |> Enum.map(fn {_task, i} ->
      worker_name = "#{coordinator_name}-worker-#{i}"

      {:ok, _pid} =
        GenAgent.start_agent(Worker,
          name: worker_name,
          backend: GenAgent.Backends.Anthropic,
          worker_name: worker_name,
          supervisor: coordinator_name
        )

      worker_name
    end)
  end
end
```
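
The planning clause trusts the model to obey the "one per line, no numbering or bullets" instruction. If that trust turns out to be misplaced, a slightly more defensive parse might look like the sketch below. `PlanParser` and `parse/2` are hypothetical names, not part of gen_agent or of the coordinator above, and the set of list markers it strips is an assumption about how a model might deviate.

```elixir
defmodule PlanParser do
  # Hypothetical helper: turns raw planning text into at most
  # `max_workers` clean sub-task strings.
  def parse(text, max_workers) do
    text
    |> String.split("\n")
    |> Enum.map(&String.trim/1)
    |> Enum.map(&strip_marker/1)
    |> Enum.reject(&(&1 == ""))
    # Drop exact duplicates so two workers never get the same sub-task.
    |> Enum.uniq()
    |> Enum.take(max_workers)
  end

  # Removes one leading list marker ("-", "*", "•", "1.", "1)") followed
  # by whitespace. Lines without such a marker are returned unchanged.
  defp strip_marker(line) do
    Regex.replace(~r/^(?:[-*•]|\d+[.)])\s+/u, line, "")
  end
end
```

Swapping this in would only change how `sub_tasks` is computed in the `:planning` clause; the spawn, notify, and phase transition steps stay the same.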
### `Supervisor.Worker`
```elixir
defmodule Supervisor.Worker do
  use GenAgent

  defmodule State do
    defstruct [:name, :supervisor, :task, :result, :error]
  end

  @impl true
  def init_agent(opts) do
    state = %State{
      name: Keyword.fetch!(opts, :worker_name),
      supervisor: Keyword.fetch!(opts, :supervisor)
    }

    system = """
    You are a research worker. You will be given exactly one
    sub-task. Answer it in 2-3 concise sentences. No preamble.
    """

    {:ok, [system: system, max_tokens: 300], state}
  end

  @impl true
  def handle_event({:sub_task, task}, %State{} = state) do
    {:prompt, task, %{state | task: task}}
  end

  def handle_event(_other, state), do: {:noreply, state}

  @impl true
  def handle_response(_ref, response, %State{} = state) do
    result = String.trim(response.text)
    GenAgent.notify(state.supervisor, {:worker_result, state.name, result})
    {:halt, %{state | result: result}}
  end

  @impl true
  def handle_error(_ref, reason, %State{} = state) do
    GenAgent.notify(state.supervisor, {:worker_failed, state.name, reason})
    {:halt, %{state | error: reason}}
  end
end
```
## Using it
```elixir
name = "coord-#{System.unique_integer([:positive])}"

{:ok, _pid} =
  GenAgent.start_agent(Supervisor.Coordinator,
    name: name,
    backend: GenAgent.Backends.Anthropic,
    topic: "why do octopuses have three hearts?",
    max_workers: 3,
    coordinator_name: name
  )

# Kick off the planning turn.
{:ok, _ref} =
  GenAgent.tell(name, "Break the topic into 3 specific sub-questions. One per line.")

# The coordinator will plan, spawn 3 workers, dispatch their
# sub-tasks, wait for responses, synthesize, and halt. The caller
# just watches.

# Once the coordinator has reached phase: :done, read the final output.
# Before that point final_output is still nil.
%{agent_state: %{final_output: output}} = GenAgent.status(name)
IO.puts(output)

GenAgent.stop(name)
```
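
The snippet above reads `final_output` without showing how the caller knows the coordinator has finished. One way to wait is to poll the status until the phase is terminal. The sketch below is a hypothetical helper (`AwaitSketch` and `await_terminal/3` are not gen_agent functions). It takes a zero-arity function so it stays independent of the library, and it assumes only the `%{agent_state: %{phase: ...}}` shape that the usage example and the coordinator's `State` struct imply.

```elixir
defmodule AwaitSketch do
  # Hypothetical helper. `status_fun` is any zero-arity function that
  # returns a map shaped like %{agent_state: %{phase: phase}}.
  def await_terminal(status_fun, timeout_ms \\ 60_000, interval_ms \\ 250) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    poll(status_fun, deadline, interval_ms)
  end

  defp poll(status_fun, deadline, interval_ms) do
    case status_fun.() do
      %{agent_state: %{phase: :done} = state} ->
        {:done, state}

      %{agent_state: %{phase: :failed} = state} ->
        {:failed, state}

      _still_running ->
        if System.monotonic_time(:millisecond) >= deadline do
          :timeout
        else
          # Sleep between polls, then check the status again.
          Process.sleep(interval_ms)
          poll(status_fun, deadline, interval_ms)
        end
    end
  end
end
```

With the coordinator from the example it could be called as `AwaitSketch.await_terminal(fn -> GenAgent.status(name) end)`, matching on `{:done, state}` to read `state.final_output` and on `{:failed, state}` to inspect `state.error`.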
## Variations
- **Bounded concurrency.** For very large N, instead of spawning
N workers, spawn K and keep the remaining sub-tasks queued in
the coordinator: when one worker reports and halts, the
coordinator spawns a replacement and notifies it with the next
sub-task. See [Pool](pool.md) for a cleaner version of this
shape.
- **Heterogeneous workers.** Different sub-tasks can get
different worker modules. The coordinator's `spawn_workers`
function decides which module to instantiate based on the
sub-task content.
- **Partial success.** The current `maybe_synthesize` only
proceeds if at least one worker succeeded. You could instead
require a quorum (e.g. 2/3) or fail the whole run if any
worker failed.
- **Nested coordinators.** Any worker could itself be a
coordinator that fans out further. The shared supervision tree
doesn't care -- each level just spawns agents into it.
- **Streaming synthesis.** Instead of waiting for all workers
before synthesizing, the coordinator could start synthesis
once the first K results are in, fold later results into its
state as they arrive, and produce a final synthesis when
everything is complete. Requires a more complex phase machine.
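
The partial-success variation can be sketched as a pure decision function that mirrors the three branches of `maybe_synthesize`. The names here (`QuorumSketch`, `decide/4`) are hypothetical, and the quorum is passed as a `{numerator, denominator}` tuple to avoid floating-point rounding; that representation is a choice made for this sketch, not something the pattern prescribes.

```elixir
defmodule QuorumSketch do
  # Hypothetical decision function. `total` is the number of workers,
  # `succeeded` and `failed` are how many have reported each outcome,
  # and `{num, den}` is the fraction of workers that must succeed.
  def decide(total, succeeded, failed, {num, den})
      when total > 0 and num >= 0 and den > 0 do
    # Integer ceiling of total * num / den, never less than one success.
    needed = max(1, div(total * num + den - 1, den))

    cond do
      # Same first branch as maybe_synthesize: wait for every report.
      succeeded + failed < total -> :wait
      succeeded >= needed -> :synthesize
      true -> :fail
    end
  end
end
```

Inside the coordinator, `total` would be `length(state.workers)`, `succeeded` would be `map_size(state.results)`, and `failed` would be `map_size(state.failures)`; `:wait`, `:synthesize`, and `:fail` map onto the existing `{:noreply, ...}`, `{:prompt, ...}`, and `{:halt, ...}` returns. A quorum of `{1, 1}` gives the "fail if any worker failed" behaviour.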