defmodule Planck.Agent do
@moduledoc """
OTP-based LLM agent.
Each agent is a `GenServer` that drives the LLM loop:
stream a response → collect tool calls → execute them concurrently →
append results → re-stream until the model stops.
## Roles
An agent's role is derived from its tool list at start time:
- **Orchestrator** — has a tool named `"spawn_agent"` in its list. Owns a
`team_id`; all agents sharing that `team_id` are terminated when this
agent exits.
- **Worker** — no `"spawn_agent"` tool. Receives tasks and reports back.
## Events
Subscribers receive `{:agent_event, type, payload}` messages:
| Event | Payload keys |
|---|---|
| `:turn_start` | `index` |
| `:turn_end` | `message`, `usage` |
| `:text_delta` | `text` |
| `:thinking_delta` | `text` |
| `:usage_delta` | `delta` (`input_tokens`, `output_tokens`, `cost`), `total` (`input_tokens`, `output_tokens`, `cost`), `context_tokens` |
| `:tool_start` | `id`, `name`, `args` |
| `:tool_end` | `id`, `name`, `result`, `error` |
| `:compacting` / `:compacted` | — |
| `:worker_spawned` | — |
| `:worker_exit` | `pid`, `reason` |
| `:error` | `reason` |
## Example
{:ok, pid} = DynamicSupervisor.start_child(
Planck.Agent.AgentSupervisor,
{Planck.Agent,
id: "agent-1",
model: model,
system_prompt: "You are helpful.",
tools: [read_tool]}
)
Planck.Agent.subscribe(pid)
Planck.Agent.prompt(pid, "What is in lib/app.ex?")
"""
use GenServer
require Logger
alias Planck.Agent.{AIBehaviour, Message, Session, Tool}
alias Planck.AI.Context
@typedoc "A reference to a running agent — a pid, a registered name, or a via-tuple."
@type agent :: pid() | atom() | {:via, module(), term()}
# ---------------------------------------------------------------------------
# State
# ---------------------------------------------------------------------------
@typedoc """
Internal GenServer state for an agent.
Public fields (readable via `get_state/1` or `get_info/1`):
- `id` — unique agent identifier
- `name` / `description` / `type` — display metadata set at start time
- `team_id` — registry namespace shared by all agents in the same team
- `session_id` — SQLite session this agent persists messages to; `nil` for
ephemeral agents
- `delegator_id` — id of the orchestrator that spawned this worker; `nil` for
orchestrators
- `role` — `:orchestrator` (has `spawn_agent` tool) or `:worker`
- `model` — the `Planck.AI.Model` the agent is configured to use
- `system_prompt` — prepended to every LLM context
- `cwd` — working directory for the session; used to locate `AGENTS.md`
- `messages` — full in-memory conversation history (`Message.t()` list)
- `tools` — map of tool name → `Tool.t()` available to this agent
- `status` — `:idle`, `:streaming`, or `:executing_tools`
- `turn_index` — monotonically increasing turn counter
- `usage` — accumulated `%{input_tokens, output_tokens}` for this session
- `cost` — accumulated cost in USD; never decreases (rewinding messages does not reduce it)
Internal fields (not part of the public API):
- `stream_task` / `stream_ref` — in-flight async LLM stream
- `stream_start` — length of `messages` when the current stream began; used to
detect messages appended *during* streaming that the LLM did not see
- `turn_checkpoints` — message-count stack used internally
- `pending_tool_calls` — tool calls waiting for execution after stream end
- `text_buffer` / `thinking_buffer` — partial text accumulated during streaming
- `on_compact` — optional compaction callback
- `opts` — pass-through keyword options (e.g. `tool_timeout`)
- `available_models` — model catalog used by `list_models` and `spawn_agent`
"""
@type t :: %__MODULE__{
id: String.t(),
name: String.t() | nil,
description: String.t() | nil,
type: String.t() | nil,
team_id: String.t() | nil,
session_id: String.t() | nil,
delegator_id: String.t() | nil,
role: :orchestrator | :worker,
model: Planck.AI.Model.t() | nil,
on_compact: ([Message.t()] -> {:compact, Message.t(), [Message.t()]} | :skip) | nil,
cwd: String.t(),
system_prompt: String.t(),
messages: [Message.t()],
tools: %{String.t() => Tool.t()},
opts: keyword(),
available_models: [Planck.AI.Model.t()],
status: :idle | :streaming | :executing_tools,
stream_task: Task.t() | nil,
stream_ref: reference() | nil,
stream_start: non_neg_integer(),
turn_index: non_neg_integer(),
turn_checkpoints: [non_neg_integer()],
pending_tool_calls: [map()],
text_buffer: String.t(),
thinking_buffer: String.t(),
usage: %{input_tokens: non_neg_integer(), output_tokens: non_neg_integer()},
cost: float(),
running_tools: %{String.t() => map()},
tool_results_acc: list()
}
# Field-by-field documentation lives in the @typedoc above.
defstruct [
# Identity & display metadata — fixed at start time (see init/1).
:id,
:name,
:description,
:type,
:team_id,
:session_id,
:delegator_id,
:role,
:model,
:on_compact,
# Conversation configuration and history.
cwd: "",
system_prompt: "",
messages: [],
tools: %{},
opts: [],
available_models: [],
# Loop bookkeeping — streaming fields are cleared by reset_streaming/1;
# turn_index/turn_checkpoints persist across turns.
status: :idle,
stream_task: nil,
stream_ref: nil,
stream_start: 0,
turn_index: 0,
turn_checkpoints: [],
pending_tool_calls: [],
text_buffer: "",
thinking_buffer: "",
# Accounting — accumulates for the lifetime of the agent; cost never decreases.
usage: %{input_tokens: 0, output_tokens: 0},
cost: 0.0,
running_tools: %{},
tool_results_acc: []
]
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
@doc "Start an agent under a supervisor."
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, [])
@doc false
def child_spec(opts) do
  # :temporary — a crashed agent is not restarted; supervisors only own cleanup.
  id = Keyword.fetch!(opts, :id)
  %{id: id, start: {__MODULE__, :start_link, [opts]}, restart: :temporary}
end
@doc "Send a user message and kick off the agent loop. Returns once the agent status is :streaming."
@spec prompt(agent(), String.t() | [Planck.AI.Message.content_part()], keyword()) :: :ok
def prompt(agent, content, opts \\ []), do: GenServer.call(agent, {:prompt, content, opts})

@doc """
Trigger the agent to run an LLM turn without adding a new user message.
Used after session resume when a recovery context message is already present
in the agent's history and just needs to be acted upon.
"""
@spec nudge(agent()) :: :ok
def nudge(agent), do: GenServer.cast(agent, :nudge)

@doc """
Cancel in-flight streaming and tool execution. Blocks until the agent has
returned to `:idle` (or started a follow-up turn for any queued messages).
"""
@spec abort(agent()) :: :ok
def abort(agent), do: GenServer.call(agent, :abort)
@doc """
Truncate the session to strictly before `message_id`, then reload the
agent's in-memory message history from the DB (the source of truth).
`turn_checkpoints` is rebuilt from the reloaded message list.
Only meaningful for agents with a `session_id`. A no-op for ephemeral agents.
"""
@spec rewind_to_message(agent(), pos_integer()) :: :ok
def rewind_to_message(agent, message_id), do: GenServer.cast(agent, {:rewind_to_message, message_id})

@doc "Stop the agent. Cancels any in-flight work and removes it from the supervisor."
@spec stop(agent()) :: :ok
def stop(agent), do: GenServer.stop(agent)

@doc "Synchronous state snapshot."
@spec get_state(agent()) :: map()
def get_state(agent), do: GenServer.call(agent, :get_state)

@doc "Lightweight summary: id, name, description, type, role, status, turn_index, usage."
@spec get_info(agent()) :: map()
def get_info(agent), do: GenServer.call(agent, :get_info)

@doc "Estimate the number of tokens currently in the agent's context window."
@spec estimate_tokens(agent()) :: non_neg_integer()
def estimate_tokens(agent), do: GenServer.call(agent, :estimate_tokens)

@doc "Replace the model used for subsequent LLM turns without interrupting the current state."
@spec change_model(agent(), Planck.AI.Model.t()) :: :ok
def change_model(agent, model), do: GenServer.call(agent, {:change_model, model})
@doc """
Subscribe the calling process to `{:agent_event, type, payload}` messages.
Accepts either an agent id string or a pid/name. The pid form resolves the id
via `get_info/1` — prefer passing the id directly when available.
"""
@spec subscribe(String.t() | agent()) :: :ok | {:error, term()}
def subscribe(agent_id) when is_binary(agent_id) do
  Phoenix.PubSub.subscribe(Planck.Agent.PubSub, "agent:#{agent_id}")
end

def subscribe(agent) do
  # Resolve the id synchronously, then delegate to the binary clause.
  agent |> get_info() |> Map.fetch!(:id) |> subscribe()
end
@doc "Resolve an agent id to its pid via the Registry."
@spec whereis(String.t()) :: {:ok, pid()} | {:error, :not_found}
def whereis(id) do
  entries = Registry.lookup(Planck.Agent.Registry, {:agent, id})

  case entries do
    [{pid, _value}] -> {:ok, pid}
    _ -> {:error, :not_found}
  end
end
@doc "Add a tool at runtime."
@spec add_tool(agent(), Tool.t()) :: :ok
def add_tool(agent, tool), do: GenServer.cast(agent, {:add_tool, tool})

@doc "Remove a tool by name at runtime."
@spec remove_tool(agent(), String.t()) :: :ok
def remove_tool(agent, name), do: GenServer.cast(agent, {:remove_tool, name})
# ---------------------------------------------------------------------------
# GenServer callbacks
# ---------------------------------------------------------------------------
@impl true
# Build initial state from start opts, derive the role from the tool list,
# register in the Registry, and link to the spawning orchestrator (if any).
def init(opts) do
tool_list = Keyword.get(opts, :tools, [])
tool_map = Map.new(tool_list, &{&1.name, &1})
# Role is purely structural: owning a "spawn_agent" tool makes this agent an
# orchestrator; everything else is a worker (see moduledoc "Roles").
role = if Map.has_key?(tool_map, "spawn_agent"), do: :orchestrator, else: :worker
state = %__MODULE__{
id: Keyword.fetch!(opts, :id),
name: Keyword.get(opts, :name),
description: Keyword.get(opts, :description),
type: Keyword.get(opts, :type),
team_id: Keyword.get(opts, :team_id),
session_id: Keyword.get(opts, :session_id),
delegator_id: Keyword.get(opts, :delegator_id),
role: role,
model: Keyword.fetch!(opts, :model),
cwd: Keyword.get(opts, :cwd, ""),
system_prompt: Keyword.get(opts, :system_prompt, ""),
tools: tool_map,
opts: Keyword.get(opts, :opts, []),
available_models: Keyword.get(opts, :available_models, []),
on_compact: Keyword.get(opts, :on_compact),
# usage/cost may be seeded from opts, e.g. when resuming a persisted session.
usage: Keyword.get(opts, :usage, %{input_tokens: 0, output_tokens: 0}),
cost: Keyword.get(opts, :cost, 0.0)
}
register_agent(state)
link_to_orchestrator(state)
# Orchestrators trap exits so they survive individual worker crashes.
if role == :orchestrator, do: Process.flag(:trap_exit, true)
# Notify session subscribers so UIs can refresh the agent list.
if state.delegator_id, do: broadcast(state, :worker_spawned, %{})
{:ok, state}
end
@impl true
def handle_call(event, from, state)
# Full internal state snapshot (get_state/1).
def handle_call(:get_state, _from, state) do
{:reply, state, state}
end
# Public summary map for get_info/1 — stable keys, safe to expose to UIs.
def handle_call(:get_info, _from, state) do
info = %{
id: state.id,
name: state.name,
description: state.description,
type: state.type,
role: state.role,
status: state.status,
turn_index: state.turn_index,
usage: state.usage,
cost: state.cost
}
{:reply, info, state}
end
# Swap the model for subsequent turns; an in-flight stream is not touched.
def handle_call({:change_model, model}, _from, state) do
{:reply, :ok, %{state | model: model}}
end
def handle_call(:estimate_tokens, _from, state) do
{:reply, Message.estimate_tokens(state.messages), state}
end
# Idle: start a fresh turn immediately.
def handle_call({:prompt, content, _opts}, _from, %{status: :idle} = state) do
do_prompt(content, state)
end
def handle_call({:prompt, content, _opts}, _from, state) do
# Agent is busy — append without persisting yet. Persisting now would give
# the queued message a db_id smaller than the current turn's assistant
# response, breaking edit-message truncation order. The message is flushed
# to the session in handle_continue(:run_llm) after the current turn ends.
parts = normalize_content(content)
msg = Message.new(:user, parts)
{:reply, :ok, %{state | messages: state.messages ++ [msg]}}
end
# Abort: kill the stream and tool tasks, reset streaming bookkeeping, then —
# if input was queued while busy — immediately begin a fresh turn for it.
def handle_call(:abort, _from, state) do
cancel_stream(state)
cancel_running_tools(state)
new_state = reset_streaming(state)
if has_pending_input?(new_state.messages, new_state.stream_start) do
broadcast(new_state, :turn_start, %{index: new_state.turn_index})
{:reply, :ok, %{new_state | status: :streaming}, {:continue, {:run_llm, :new_turn}}}
else
{:reply, :ok, new_state}
end
end
@impl true
def handle_cast(event, state)
# Nudge: run a turn on existing history (e.g. after session resume) — only
# honored when idle; a busy agent silently ignores it.
def handle_cast(:nudge, %{status: :idle} = state) do
broadcast(state, :turn_start, %{index: state.turn_index})
{:noreply, %{state | status: :streaming}, {:continue, {:run_llm, :new_turn}}}
end
def handle_cast(:nudge, state) do
{:noreply, state}
end
# Rewind is a no-op for ephemeral agents — there is no persisted history.
def handle_cast({:rewind_to_message, _message_id}, %{session_id: nil} = state) do
{:noreply, state}
end
# Truncate persisted history at the rewind point, then reload from the DB
# (the source of truth) so in-memory messages/checkpoints match.
def handle_cast({:rewind_to_message, message_id}, state) do
Session.truncate_after(state.session_id, message_id)
{:noreply, reload_messages_from_session(state)}
end
def handle_cast({:add_tool, tool}, state) do
{:noreply, %{state | tools: Map.put(state.tools, tool.name, tool)}}
end
def handle_cast({:remove_tool, name}, state) do
{:noreply, %{state | tools: Map.delete(state.tools, name)}}
end
@impl true
# Continuations drive the agent loop without blocking callers: :run_llm is a
# tool-result continuation, {:run_llm, :new_turn} starts a fresh turn, and
# {:execute_tools, calls} fans out tool execution.
def handle_continue(message, state)

def handle_continue(:run_llm, state), do: {:noreply, do_run_llm(state, :continuation)}

def handle_continue({:run_llm, :new_turn}, state), do: {:noreply, do_run_llm(state, :new_turn)}

def handle_continue({:execute_tools, calls}, state), do: {:noreply, start_tool_tasks(calls, state)}
@impl true
def handle_info(event, state)
# Stream events whose ref matches the current stream are folded into state.
def handle_info({:stream_event, ref, event}, %{stream_ref: ref} = state) do
{:noreply, process_event(state, event)}
end
# Events from a cancelled or superseded stream are dropped.
def handle_info({:stream_event, _stale, _event}, state) do
{:noreply, state}
end
def handle_info({:stream_done, ref}, %{stream_ref: ref} = state) do
do_stream_done(state)
end
def handle_info({:stream_done, _stale}, state) do
{:noreply, state}
end
# A tool task finished. When the last outstanding one reports in, assemble
# the tool-result message and continue the LLM loop.
def handle_info({:tool_done, call_id, name, result}, state) do
case Map.pop(state.running_tools, call_id) do
{nil, _} ->
# Stale result arriving after an abort — ignore.
{:noreply, state}
{_task, remaining} ->
error = match?({:error, _}, result)
broadcast(state, :tool_end, %{id: call_id, name: name, result: result, error: error})
# Accumulated newest-first; finish_tool_execution reverses before use.
new_results = [{call_id, result} | state.tool_results_acc]
if map_size(remaining) == 0 do
{:noreply, finish_tool_execution(new_results, state), {:continue, :run_llm}}
else
{:noreply, %{state | running_tools: remaining, tool_results_acc: new_results}}
end
end
end
# A peer/worker agent sent this agent a textual response.
def handle_info({:agent_response, response, sender}, state) do
do_agent_response(response, sender, state)
end
# Linked process exited (orchestrators trap exits) — surface to subscribers.
def handle_info({:EXIT, pid, reason}, state) do
broadcast(state, :worker_exit, %{pid: pid, reason: reason})
{:noreply, state}
end
@impl true
# Final cleanup on shutdown: stop the streaming task and kill any in-flight
# tool tasks so nothing outlives the agent. The return value is ignored by OTP.
def terminate(_reason, state) do
cancel_stream(state)
cancel_running_tools(state)
end
# ---------------------------------------------------------------------------
# Callback implementations
# ---------------------------------------------------------------------------
# Handle a prompt while idle: persist the user message, push a checkpoint at
# the pre-prompt message count, announce the turn, and hand off to the LLM loop.
@spec do_prompt(String.t() | [Planck.AI.Message.content_part()], t()) ::
        {:reply, :ok, t(), {:continue, {:run_llm, :new_turn}}}
defp do_prompt(content, state) do
  checkpoint = length(state.messages)

  msg =
    content
    |> normalize_content()
    |> then(&Message.new(:user, &1))
    |> then(&persist_message(state, &1))

  new_state = %{
    state
    | messages: state.messages ++ [msg],
      turn_checkpoints: [checkpoint | state.turn_checkpoints]
  }

  broadcast(new_state, :turn_start, %{index: new_state.turn_index})
  {:reply, :ok, %{new_state | status: :streaming}, {:continue, {:run_llm, :new_turn}}}
end
# Run one LLM streaming turn: compact history if configured, flush queued
# messages to the session, build the AI context, and spawn an async task that
# forwards stream events back to this process as {:stream_event, ref, event}
# followed by a final {:stream_done, ref}.
defp do_run_llm(state, turn_type) do
{messages, state} = apply_compact(state)
state = flush_unpersisted_messages(state)
# Only advance stream_start for fresh turns. Tool continuations keep the
# original stream_start so any user message queued during tool execution
# remains detectable by maybe_turn_start after the turn ends.
stream_start =
case turn_type do
:new_turn -> length(state.messages)
:continuation -> state.stream_start
end
ai_tools = state.tools |> Map.values() |> Enum.map(&Tool.to_ai_tool/1)
context = %Context{
system: presence(state.system_prompt),
messages: Message.to_ai_messages(messages),
tools: ai_tools
}
# A fresh ref per stream lets handle_info discard events from stale streams.
ref = make_ref()
parent = self()
{:ok, task} =
Task.Supervisor.start_child(Planck.Agent.TaskSupervisor, fn ->
AIBehaviour.client().stream(state.model, context, state.opts)
|> Enum.each(fn event -> send(parent, {:stream_event, ref, event}) end)
send(parent, {:stream_done, ref})
end)
%{
state
| stream_task: task,
stream_ref: ref,
stream_start: stream_start,
status: :streaming,
turn_index: state.turn_index + 1
}
end
@spec start_tool_tasks([map()], t()) :: t()
# Launch one supervised task per tool call; the tasks run concurrently and
# each reports back via a {:tool_done, id, name, result} message, collected
# in handle_info until all of running_tools have finished.
defp start_tool_tasks(tool_calls, state) do
parent = self()
running =
Map.new(tool_calls, fn %{id: id, name: name, args: args} ->
broadcast(state, :tool_start, %{id: id, name: name, args: args})
execute_fn = resolve_tool_fn(state.tools, state.id, name, id, args)
{:ok, pid} =
Task.Supervisor.start_child(Planck.Agent.TaskSupervisor, fn ->
send(parent, {:tool_done, id, name, execute_fn.()})
end)
{id, %{name: name, pid: pid}}
end)
%{state | running_tools: running, tool_results_acc: [], status: :executing_tools}
end
# Assemble all accumulated {call_id, result} pairs (stored newest-first) into
# a single :tool_result message, persist and append it, and flip back to
# :streaming so the follow-up LLM turn can run.
@spec finish_tool_execution(list(), t()) :: t()
defp finish_tool_execution(results, state) do
  msg =
    results
    |> Enum.reverse()
    |> build_tool_result_message()
    |> then(&persist_message(state, &1))

  %{
    state
    | messages: state.messages ++ [msg],
      pending_tool_calls: [],
      running_tools: %{},
      tool_results_acc: [],
      status: :streaming
  }
end
# Build the zero-arity closure a tool task will run. Unknown tool names
# resolve to a closure returning an {:error, _} tuple instead of crashing.
@spec resolve_tool_fn(%{String.t() => Tool.t()}, String.t(), String.t(), String.t(), map()) ::
        (-> {:ok, String.t()} | {:error, term()})
defp resolve_tool_fn(tools, agent_id, name, id, args) do
  case Map.fetch(tools, name) do
    {:ok, %Tool{execute_fn: fun}} -> fn -> safe_execute(fun, agent_id, id, args) end
    :error -> fn -> {:error, "unknown tool: #{name}"} end
  end
end
# Brutally kill every in-flight tool task; their late :tool_done messages (if
# any) are dropped by handle_info as stale. Enum.each/2 returns :ok, including
# for an empty map.
@spec cancel_running_tools(t()) :: :ok
defp cancel_running_tools(%{running_tools: tools}) do
  Enum.each(tools, fn {_id, %{pid: pid}} -> Process.exit(pid, :kill) end)
end
@spec do_stream_done(t()) ::
{:noreply, t()} | {:noreply, t(), {:continue, {:execute_tools, [map()]}}}
# The stream finished cleanly: persist and append the assistant message, then
# either end the turn (no tool calls) or continue into tool execution. Note
# :turn_end is only broadcast when there are no pending tool calls — a tool
# continuation is still part of the same logical turn.
defp do_stream_done(state) do
pending = state.pending_tool_calls
assistant_msg = build_assistant_message(state)
assistant_msg = persist_message(state, assistant_msg)
new_state =
%{state | messages: state.messages ++ [assistant_msg]}
|> reset_streaming()
case pending do
[] ->
broadcast(new_state, :turn_end, %{message: assistant_msg, usage: new_state.usage})
# Immediately start a fresh turn if input arrived during streaming.
maybe_turn_start(new_state)
calls ->
{:noreply, %{new_state | status: :executing_tools}, {:continue, {:execute_tools, calls}}}
end
end
@spec do_agent_response(String.t(), term(), t()) ::
{:noreply, t()}
| {:noreply, t(), {:continue, {:run_llm, :new_turn}}}
# A peer/worker agent reported back. The response becomes a
# {:custom, :agent_response} message (tagged with sender id/name when the
# sender shape provides them) and triggers a turn immediately if the agent is
# idle; otherwise it sits in history until maybe_turn_start picks it up.
defp do_agent_response(response, sender, state) do
metadata =
case sender do
%{id: id, name: name} -> %{sender_id: id, sender_name: name}
_ -> %{}
end
msg = Message.new({:custom, :agent_response}, [{:text, response}], metadata)
msg = persist_message(state, msg)
new_state = %{state | messages: state.messages ++ [msg]}
if state.status == :idle do
broadcast(new_state, :turn_start, %{index: new_state.turn_index})
{:noreply, %{new_state | status: :streaming}, {:continue, {:run_llm, :new_turn}}}
else
{:noreply, new_state}
end
end
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
# Link a worker to the orchestrator that spawned it, so exits propagate (the
# orchestrator traps them and broadcasts :worker_exit). No-op for agents
# without a delegator, or when the delegator can no longer be resolved.
@spec link_to_orchestrator(t()) :: :ok
defp link_to_orchestrator(%{delegator_id: nil}), do: :ok

defp link_to_orchestrator(%{delegator_id: id}) do
  case whereis(id) do
    {:ok, pid} ->
      # Process.link/1 returns `true`; normalize to :ok to honor the spec.
      Process.link(pid)
      :ok

    _ ->
      :ok
  end
end
@spec register_agent(t()) :: :ok
# Register this agent under several Registry keys:
# - {:agent, id} — global pid lookup used by whereis/1
# - {team_id, :member} — team roster entry carrying display metadata
# - {team_id, type} / {team_id, name} — convenience lookups storing the id
# NOTE(review): Registry.register/3 return values are ignored here, so a
# duplicate type/name within a team would fail silently — confirm uniqueness
# is enforced upstream.
defp register_agent(%{id: id, team_id: team_id, type: type, name: name, description: desc}) do
Registry.register(Planck.Agent.Registry, {:agent, id}, nil)
if team_id do
meta = %{id: id, type: type, name: name, description: desc}
Registry.register(Planck.Agent.Registry, {team_id, :member}, meta)
if type, do: Registry.register(Planck.Agent.Registry, {team_id, type}, id)
if name, do: Registry.register(Planck.Agent.Registry, {team_id, name}, id)
end
:ok
end
# Broadcast an event on this agent's topic and, when the agent belongs to a
# session, mirror it (tagged with agent_id) onto the session topic so
# session-level subscribers see events from all of the session's agents.
@spec broadcast(t(), atom(), map()) :: :ok
defp broadcast(%{id: id, session_id: session_id}, type, payload) do
  event = {:agent_event, type, payload}
  Phoenix.PubSub.broadcast(Planck.Agent.PubSub, "agent:#{id}", event)

  if session_id do
    session_event = {:agent_event, type, Map.put(payload, :agent_id, id)}
    Phoenix.PubSub.broadcast(Planck.Agent.PubSub, "session:#{session_id}", session_event)
  end

  # Always return :ok to honor the spec — previously this returned `nil` for
  # session-less agents (if-without-else) or PubSub's raw return value.
  :ok
end
# Persist any messages still carrying a UUID id (i.e. not yet written to the
# session), then reload the whole history from the DB so the in-memory list
# matches the canonical id-ordered sequence and turn_checkpoints is rebuilt.
# Called at the start of handle_continue(:run_llm) so queued messages are
# always written AFTER the previous turn's assistant response.
@spec flush_unpersisted_messages(t()) :: t()
defp flush_unpersisted_messages(state)

defp flush_unpersisted_messages(%__MODULE__{session_id: nil} = state), do: state

defp flush_unpersisted_messages(state) do
  case Enum.filter(state.messages, &is_binary(&1.id)) do
    [] ->
      state

    pending ->
      Enum.each(pending, &Session.append(state.session_id, state.id, &1))
      reload_messages_from_session(state)
  end
end
# Reload the agent's message history from the session DB and rebuild
# turn_checkpoints. Used after any operation that changes the canonical
# sequence (rewind, flush of queued messages).
@spec reload_messages_from_session(t()) :: t()
defp reload_messages_from_session(state) do
case Session.messages(state.session_id, agent_id: state.id) do
{:ok, rows} ->
messages = Enum.map(rows, & &1.message)
# Checkpoints are the indices of :user messages, newest-first (a stack),
# mirroring how do_prompt pushes them.
checkpoints =
messages
|> Enum.with_index()
|> Enum.filter(fn {msg, _} -> msg.role == :user end)
|> Enum.map(fn {_, idx} -> idx end)
|> Enum.reverse()
%{state | messages: messages, turn_checkpoints: checkpoints}
_ ->
# Session unavailable (e.g. no GenServer running for this session_id);
# keep current in-memory state unchanged.
state
end
end
# Persist the agent's accumulated token usage and cost to the session's
# metadata. A no-op for ephemeral agents (no session_id).
@spec persist_usage(t()) :: :ok
defp persist_usage(%{session_id: nil}), do: :ok

defp persist_usage(%{session_id: sid, id: agent_id, usage: usage, cost: cost}) do
  # Stored under a per-agent metadata key as a JSON blob.
  %{input_tokens: usage.input_tokens, output_tokens: usage.output_tokens, cost: cost}
  |> Jason.encode!()
  |> then(&Session.save_metadata(sid, %{"agent_usage:#{agent_id}" => &1}))
end
# Persist a message to the session and return it with its DB row id set in
# place of the temporary in-memory id. For ephemeral agents (no session_id),
# the message is returned unchanged.
@spec persist_message(t(), Message.t()) :: Message.t()
defp persist_message(%{session_id: nil}, msg), do: msg
defp persist_message(%{session_id: sid, id: agent_id}, msg) do
case Session.append(sid, agent_id, msg) do
# NOTE(review): `nil` presumably means the append was not persisted (e.g.
# session gone); the message keeps its in-memory id — confirm in Session.
nil -> msg
db_id -> %{msg | id: db_id}
end
end
@spec process_event(t(), Planck.AI.Stream.t()) :: t()
# Fold a single streaming event into state, re-broadcasting deltas to
# subscribers as they arrive.
defp process_event(state, {:text_delta, text}) do
broadcast(state, :text_delta, %{text: text})
%{state | text_buffer: state.text_buffer <> text}
end
defp process_event(state, {:thinking_delta, text}) do
broadcast(state, :thinking_delta, %{text: text})
%{state | thinking_buffer: state.thinking_buffer <> text}
end
# Tool calls queue in arrival order and execute only after the stream ends.
defp process_event(state, {:tool_call_complete, call}) do
%{state | pending_tool_calls: state.pending_tool_calls ++ [call]}
end
defp process_event(state, {:error, reason}) do
broadcast(state, :error, %{reason: reason})
# Resetting clears stream_ref, so this stream's eventual {:stream_done, ref}
# will no longer match and is dropped as stale.
reset_streaming(state)
end
# Final usage accounting for the stream: accumulate tokens, price the turn
# (model rates are per 1M tokens), persist, and broadcast a usage delta.
defp process_event(state, {:done, %{usage: %{input_tokens: i, output_tokens: o}}}) do
usage = %{
input_tokens: state.usage.input_tokens + i,
output_tokens: state.usage.output_tokens + o
}
turn_cost =
case state.model do
%{cost: %{input: in_rate, output: out_rate}} ->
(i * in_rate + o * out_rate) / 1_000_000
_ ->
# Model without pricing info — count tokens but charge nothing.
0.0
end
new_state = %{state | usage: usage, cost: state.cost + turn_cost}
persist_usage(new_state)
broadcast(new_state, :usage_delta, %{
delta: %{
input_tokens: i,
output_tokens: o,
cost: turn_cost
},
total: %{
input_tokens: usage.input_tokens,
output_tokens: usage.output_tokens,
cost: new_state.cost
},
context_tokens: Message.estimate_tokens(new_state.messages)
})
new_state
end
# Any other stream event is ignored.
defp process_event(state, _other), do: state
# Turn the stream buffers into one assistant message. Part order is
# [thinking?, text?, tool_calls...]; empty buffers contribute nothing.
@spec build_assistant_message(t()) :: Message.t()
defp build_assistant_message(%{
       text_buffer: text,
       thinking_buffer: thinking,
       pending_tool_calls: calls
     }) do
  tool_parts = for %{id: id, name: name, args: args} <- calls, do: {:tool_call, id, name, args}

  content =
    tool_parts
    |> prepend_if(text != "", {:text, text})
    |> prepend_if(thinking != "", {:thinking, thinking})

  Message.new(:assistant, content)
end
# Prepend `item` to `list` when `condition` holds; otherwise return `list`.
@spec prepend_if(list(), boolean(), term()) :: list()
defp prepend_if(list, condition, item)
defp prepend_if(list, false, _item), do: list
defp prepend_if(list, true, item), do: [item | list]
# 2000 lines or 50 KB, whichever is hit first — matching pi-mono's limits.
@max_tool_output_lines 2_000
@max_tool_output_bytes 50_000

# Fold {call_id, result} pairs into a single :tool_result message, rendering
# each result as text and truncating oversized output.
@spec build_tool_result_message([{String.t(), {:ok, String.t()} | {:error, term()}}]) ::
        Message.t()
defp build_tool_result_message(results) do
  parts =
    for {id, result} <- results do
      rendered =
        case result do
          {:ok, text} when is_binary(text) -> text
          {:ok, other} -> inspect(other)
          {:error, reason} when is_binary(reason) -> "Error: #{reason}"
          {:error, reason} -> "Error: #{inspect(reason)}"
        end

      {:tool_result, id, truncate_tool_output(rendered)}
    end

  Message.new(:tool_result, parts)
end
# Cap tool output at @max_tool_output_lines lines and @max_tool_output_bytes
# bytes, appending a marker when anything was cut.
@spec truncate_tool_output(String.t()) :: String.t()
defp truncate_tool_output(value) do
  lines = String.split(value, "\n")

  {value, line_truncated} =
    if length(lines) > @max_tool_output_lines do
      {lines |> Enum.take(@max_tool_output_lines) |> Enum.join("\n"), true}
    else
      {value, false}
    end

  {value, truncated} =
    if byte_size(value) > @max_tool_output_bytes do
      # Byte-level truncation can split a multi-byte UTF-8 codepoint, leaving
      # an invalid binary that downstream encoding (e.g. JSON) would reject —
      # trim any incomplete trailing sequence.
      {value |> binary_part(0, @max_tool_output_bytes) |> trim_incomplete_utf8(), true}
    else
      {value, line_truncated}
    end

  if truncated, do: value <> "\n[output truncated]", else: value
end

# Drop up to 3 trailing bytes left over from cutting a UTF-8 sequence in half
# (a codepoint is at most 4 bytes). If the binary is still invalid after that,
# it was invalid before truncation and is returned as-is.
@spec trim_incomplete_utf8(binary(), non_neg_integer()) :: binary()
defp trim_incomplete_utf8(value, tries \\ 3)
defp trim_incomplete_utf8(value, 0), do: value

defp trim_incomplete_utf8(value, tries) do
  if String.valid?(value) do
    value
  else
    trim_incomplete_utf8(binary_part(value, 0, byte_size(value) - 1), tries - 1)
  end
end
@spec apply_compact(t()) :: {[Message.t()], t()}
# Give the on_compact callback a chance to summarize history before a turn.
# Returns {messages_for_the_llm, possibly_updated_state}. Even without a
# callback, the LLM only sees messages since the last summary.
defp apply_compact(state)
defp apply_compact(%__MODULE__{on_compact: nil, messages: messages} = state) do
{messages_since_last_summary(messages), state}
end
defp apply_compact(%__MODULE__{on_compact: fun, messages: messages} = state)
when is_function(fun) do
recent = messages_since_last_summary(messages)
case fun.(recent) do
:skip ->
{recent, state}
{:compact, %Message{} = summary_msg, kept} ->
broadcast(state, :compacting, %{})
summary_msg = persist_message(state, summary_msg)
# Everything before `recent` is untouched; summary + kept replace `recent`.
prefix_len = length(messages) - length(recent)
prefix = Enum.take(messages, prefix_len)
new_messages = prefix ++ [summary_msg | kept]
new_state = %{state | messages: new_messages}
broadcast(new_state, :compacted, %{})
{[summary_msg | kept], new_state}
end
end
# After a turn ends, immediately start a fresh turn if user input (or a worker
# response) arrived while the previous stream was in flight; otherwise leave
# the agent idle.
@spec maybe_turn_start(t()) ::
        {:noreply, t()}
        | {:noreply, t(), {:continue, {:run_llm, :new_turn}}}
defp maybe_turn_start(%__MODULE__{} = state) do
  case has_pending_input?(state.messages, state.stream_start) do
    true ->
      broadcast(state, :turn_start, %{index: state.turn_index})
      {:noreply, %{state | status: :streaming}, {:continue, {:run_llm, :new_turn}}}

    false ->
      {:noreply, state}
  end
end
# Returns true if any :user or {:custom, :agent_response} message arrived
# after stream_start — those were appended during streaming and were not seen
# by the LLM.
@spec has_pending_input?([Message.t()], non_neg_integer()) :: boolean()
defp has_pending_input?(messages, stream_start) do
  messages
  |> Enum.drop(stream_start)
  |> Enum.any?(&pending_input?/1)
end

defp pending_input?(%{role: :user}), do: true
defp pending_input?(%{role: {:custom, :agent_response}}), do: true
defp pending_input?(_message), do: false
# Return the most recent summary message (if any) followed by everything after
# it; when no summary exists, the full history is returned.
@spec messages_since_last_summary([Message.t()]) :: [Message.t()]
defp messages_since_last_summary(messages) do
  reversed = Enum.reverse(messages)

  case Enum.find_index(reversed, &match?(%Message{role: {:custom, :summary}}, &1)) do
    nil ->
      messages

    idx ->
      summary = Enum.at(reversed, idx)
      recent = reversed |> Enum.take(idx) |> Enum.reverse()
      [summary | recent]
  end
end
# Terminate the in-flight LLM streaming task, if any. terminate_child/2 can
# return {:error, :not_found} when the task already finished on its own —
# normalize to :ok so the function honors its spec.
@spec cancel_stream(t()) :: :ok
defp cancel_stream(%{stream_task: nil}), do: :ok

defp cancel_stream(%{stream_task: task}) do
  Task.Supervisor.terminate_child(Planck.Agent.TaskSupervisor, task)
  :ok
end
# Clear all streaming/tool-execution bookkeeping and return the agent to
# :idle. History, usage/cost, checkpoints, and turn_index are untouched.
@spec reset_streaming(t()) :: t()
defp reset_streaming(state) do
  struct!(state,
    status: :idle,
    stream_task: nil,
    stream_ref: nil,
    text_buffer: "",
    thinking_buffer: "",
    pending_tool_calls: [],
    running_tools: %{},
    tool_results_acc: []
  )
end
@spec safe_execute(Tool.execute_fn(), String.t(), String.t(), map()) ::
{:ok, String.t()} | {:error, term()}
# Run a tool's execute_fn, converting a raised exception or a process exit
# into an {:error, _} tuple so one misbehaving tool cannot crash its task with
# an opaque reason. The broad rescue is deliberate: tool code is an external
# boundary.
defp safe_execute(fun, agent_id, id, args) do
fun.(agent_id, id, args)
rescue
e -> {:error, e}
catch
:exit, reason -> {:error, {:exit, reason}}
end
# Wrap a bare string as a single text part; pass part lists through untouched.
@spec normalize_content(String.t() | [Planck.AI.Message.content_part()]) ::
        [Planck.AI.Message.content_part()]
defp normalize_content(text) when is_binary(text) do
  [{:text, text}]
end

defp normalize_content(parts) when is_list(parts) do
  parts
end
# Collapse the empty string to nil (used so a blank system prompt is omitted).
@spec presence(String.t()) :: String.t() | nil
defp presence(str) do
  if str == "", do: nil, else: str
end
end