defmodule Pixir.ACP.Translate do
@moduledoc """
Pure mapping from Pixir bus `Events` to ACP `session/update` notification params, and
from a terminal `Conversation.await/2` outcome to an ACP `PromptResponse` stopReason
(ADR 0009, §§4-5). No IO, no process state.
This is **presentation only** — the canonical Log is never altered. Streamed
`text_delta`s become `agent_message_chunk`s; the canonical `assistant_message` (the
same text) is intentionally dropped here to avoid duplication, and is re-emitted only
by the no-deltas fallback path in `Pixir.ACP.Server`. File-oriented tool calls can
include ACP locations when the Server supplies the session workspace; paths are resolved
from structured tool args, never inferred from prose.
"""
alias Pixir.Event
@type acp_sid :: String.t()
@type await_outcome :: :done | :error | :interrupted | :timeout
@doc """
Translate a Pixir `Event` into the full `session/update` params
(`%{"sessionId" => acp_sid, "update" => update}`), or `nil` when the event maps to
nothing on the wire.
"""
@spec update(Event.t(), acp_sid(), keyword()) :: map() | nil
def update(event, acp_sid, opts \\ [])
def update(%{type: :text_delta, data: %{"chunk" => chunk}}, acp_sid, _opts) do
wrap(acp_sid, %{
"sessionUpdate" => "agent_message_chunk",
"content" => text_block(chunk)
})
end
def update(%{type: :reasoning_delta, data: %{"chunk" => chunk}}, acp_sid, _opts) do
wrap(acp_sid, %{
"sessionUpdate" => "agent_thought_chunk",
"content" => text_block(chunk)
})
end
def update(%{type: :tool_call, data: %{"call_id" => id, "name" => name} = data}, acp_sid, opts) do
args = Map.get(data, "args", %{})
wrap(
acp_sid,
%{
"sessionUpdate" => "tool_call",
"toolCallId" => id,
"title" => title(name, args),
"kind" => kind(name),
"status" => "in_progress"
}
|> put_optional("locations", tool_locations(name, args, opts))
|> put_optional("rawInput", semantic_tool_input(name, args))
)
end
def update(%{type: :tool_result, data: %{"call_id" => id, "ok" => ok} = data}, acp_sid, _opts) do
wrap(
acp_sid,
%{
"sessionUpdate" => "tool_call_update",
"toolCallId" => id,
"status" => if(ok, do: "completed", else: "failed"),
"content" => [%{"type" => "content", "content" => text_block(result_text(ok, data))}]
}
|> put_optional("rawOutput", semantic_tool_output(data))
)
end
def update(%{type: :subagent_event, data: %{"subagent_id" => id} = data}, acp_sid, opts)
when is_binary(id) and id != "" do
wrap(acp_sid, subagent_update(acp_sid, data, Keyword.get(opts, :subagent_seen?, false)))
end
def update(%{type: :plan, data: %{"entries" => entries}}, acp_sid, _opts)
when is_list(entries) do
wrap(acp_sid, %{
"sessionUpdate" => "plan",
"entries" => Enum.map(entries, &plan_entry/1)
})
end
# Context pressure / remaining window gauge (ADR 0020 + this change).
# Emitted as an ephemeral bus event by Turn after provider usage is assessed. Routine
# snapshots update a live gauge; notices/recovery events add human-facing pressure
# guidance. Forwarded over ACP so clients such as T3 Code can surface an accurate
# live badge (used / remaining / tier) instead of stale numbers. This is
# presentation-only (like :status / :plan) — never part of Provider input and never
# logged.
#
# ACP wire shape:
# session/update with "sessionUpdate": "usage_update"
# fields: used, size, _meta.pixir.{presentation,tier,model,remainingTokens,ratio,...}
#
# `usage_update` is the protocol surface T3 already validates for context gauges.
# Pixir-specific pressure semantics stay under `_meta.pixir` so the canonical
# runtime vocabulary does not leak as a custom ACP update kind.
def update(%{type: :context_pressure, data: data}, acp_sid, _opts) do
input = Map.get(data, "input_tokens") || Map.get(data, "context_pressure_input_tokens")
window = Map.get(data, "window_tokens")
if non_negative_integer?(input) and non_negative_integer?(window) do
remaining = max(0, window - input)
meta =
%{
"tier" => Map.get(data, "tier"),
"model" => Map.get(data, "model"),
"remainingTokens" => remaining,
"ratio" => Map.get(data, "ratio") || Map.get(data, "context_pressure_ratio")
}
|> put_optional("presentation", Map.get(data, "presentation"))
|> put_optional("checkpointToSeq", Map.get(data, "checkpoint_to_seq"))
|> put_optional("nextActions", Map.get(data, "next_actions"))
|> put_optional("message", Map.get(data, "message"))
|> put_optional("trigger", Map.get(data, "trigger"))
wrap(acp_sid, %{
"sessionUpdate" => "usage_update",
"used" => input,
"size" => window,
"_meta" => %{"pixir" => meta}
})
end
end
# assistant_message / reasoning / user_message / permission_decision / status: no direct
# wire form (context_pressure is handled above). assistant_message is handled only
# by the Server's no-deltas fallback; status drives the stopReason via await's
# terminal return.
def update(_event, _acp_sid, _opts), do: nil
@doc """
Map a canonical History Event to a `session/update` for `session/load` REPLAY
(epic A.6) — a fuller mapping than `update/2`, which intentionally drops
user/assistant (live streaming re-emits deltas, not the canonical message).
On load there are no deltas, so clean canonical messages ARE the transcript:
`user_message`→`user_message_chunk`, clean `assistant_message`→`agent_message_chunk`,
`tool_call`/`tool_result` as in live. **Reasoning is omitted** (opaque encrypted
`rs_` items carry no displayable summary — ADR 0007; the summary text is
ephemeral and never logged). Partial assistant evidence and `turn_failed` are audit
evidence, not clean transcript content. Returns `nil` for events with no transcript
form.
"""
@spec replay(Event.t(), acp_sid(), keyword()) :: map() | nil
def replay(event, acp_sid, opts \\ [])
def replay(%{type: :user_message, data: %{"text" => text}}, acp_sid, _opts) do
wrap(acp_sid, %{"sessionUpdate" => "user_message_chunk", "content" => text_block(text)})
end
def replay(
%{type: :assistant_message, data: %{"metadata" => %{"partial" => true}}},
_acp_sid,
_opts
),
do: nil
def replay(%{type: :assistant_message, data: %{"text" => text}}, acp_sid, _opts) do
wrap(acp_sid, %{"sessionUpdate" => "agent_message_chunk", "content" => text_block(text)})
end
# tool_call / tool_result / subagent_event replay identically to the live wire form.
def replay(%{type: type} = event, acp_sid, opts)
when type in [:tool_call, :tool_result, :subagent_event] do
update(event, acp_sid, opts)
end
# turn_failed / reasoning / permission_decision / status / plan / context_pressure:
# nothing to replay as clean transcript content.
# context_pressure is a live gauge only (ephemeral by construction).
def replay(%{type: :turn_failed}, _acp_sid, _opts), do: nil
def replay(%{type: :context_pressure}, _acp_sid, _opts), do: nil
def replay(_event, _acp_sid, _opts), do: nil
@doc """
Build a one-off `agent_message_chunk` `session/update` for a piece of assistant text.
Used by the Server's fallback when a Turn streamed no `text_delta` (ADR 0009 §4).
"""
@spec message_chunk(String.t(), acp_sid()) :: map()
def message_chunk(text, acp_sid) when is_binary(text) do
wrap(acp_sid, %{"sessionUpdate" => "agent_message_chunk", "content" => text_block(text)})
end
@doc """
Map a terminal `await` outcome to an ACP stopReason. `cancel_requested?` covers the
cancel-vs-terminal race: if `session/cancel` arrived, resolve `"cancelled"` even if a
`done`/`error` slipped in first. A turn-level error is reported as content, not a
protocol error, so it resolves `"end_turn"` (ADR 0009 §5).
"""
@spec stop_reason(await_outcome(), boolean()) :: String.t()
def stop_reason(_outcome, true), do: "cancelled"
def stop_reason(:interrupted, _cancel?), do: "cancelled"
def stop_reason(:done, _cancel?), do: "end_turn"
def stop_reason(:error, _cancel?), do: "end_turn"
def stop_reason(:timeout, _cancel?), do: "end_turn"
@doc """
Build the `session/request_permission` PARAMS (A.2) from an asker request
`%{tool, args, reason, call_id}`. Offers exactly two options per decision #7:
`allow_once` / `reject_once` (ADR 0006 defers persistent allow-lists). Reuses
`title/2`/`kind/1` so the toolCall reads like the live `tool_call` update.
"""
@spec permission_request(map(), acp_sid()) :: map()
def permission_request(%{tool: tool} = request, acp_sid) do
args = Map.get(request, :args, %{})
%{
"sessionId" => acp_sid,
"toolCall" => %{
"toolCallId" => Map.get(request, :call_id, ""),
"title" => title(tool, args),
"kind" => kind(tool)
},
"options" => [
%{"optionId" => "allow", "name" => "Allow", "kind" => "allow_once"},
%{"optionId" => "reject", "name" => "Reject", "kind" => "reject_once"}
]
}
end
@doc """
Map a `RequestPermissionResponse` (the `{:ok, result}` / `{:error, _}` from
`Server.request_permission/2`) to the Executor's decision: `:allow | {:deny,
reason}`. A `selected` outcome whose optionId is an allow option → `:allow`;
a reject option, a `cancelled` outcome, or any error/malformed response →
`{:deny, reason}` (default-deny: anything we can't read as an explicit allow
is a denial).
"""
@spec permission_outcome({:ok, map()} | {:error, term()}) :: :allow | {:deny, String.t()}
def permission_outcome(
{:ok, %{"outcome" => %{"outcome" => "selected", "optionId" => "allow"}}}
),
do: :allow
def permission_outcome({:ok, %{"outcome" => %{"outcome" => "selected"}}}),
do: {:deny, "rejected by user"}
def permission_outcome({:ok, %{"outcome" => %{"outcome" => "cancelled"}}}),
do: {:deny, "cancelled"}
def permission_outcome({:error, _reason}), do: {:deny, "permission request failed"}
def permission_outcome(_other), do: {:deny, "denied"}
@doc "ACP tool `kind` for a Pixir tool registry name (cosmetic — drives only an icon)."
@spec kind(String.t()) :: String.t()
def kind("read"), do: "read"
def kind("skills_list"), do: "read"
def kind("skill_view"), do: "read"
def kind("wait_agent"), do: "read"
def kind("list_agents"), do: "read"
def kind("spawn_agent"), do: "execute"
def kind("send_input"), do: "execute"
def kind("close_agent"), do: "execute"
def kind("run_workflow"), do: "execute"
def kind("resource_view"), do: "read"
def kind("write"), do: "edit"
def kind("edit"), do: "edit"
def kind("bash"), do: "execute"
def kind(_other), do: "other"
# ── internals ──────────────────────────────────────────────────────────────
defp wrap(acp_sid, update), do: %{"sessionId" => acp_sid, "update" => update}
defp put_optional(map, _key, nil), do: map
defp put_optional(map, key, value), do: Map.put(map, key, value)
defp non_negative_integer?(value), do: is_integer(value) and value >= 0
defp semantic_tool_input(name, args)
when name in ~w(spawn_agent wait_agent list_agents close_agent send_input run_workflow) do
%{
"_meta" => %{
"pixir" => %{
"presentation" => %{
"type" => semantic_tool_type(name),
"tool" => name
}
}
},
"args" => args || %{}
}
end
defp semantic_tool_input(_name, _args), do: nil
defp tool_locations(name, args, opts)
when name in ~w(read write edit) and is_map(args) do
workspace = Keyword.get(opts, :workspace)
path = args["path"]
with true <- is_binary(path) and path != "",
{:ok, location_path} <- location_path(path, workspace) do
[%{"path" => location_path}]
else
_ -> nil
end
end
defp tool_locations(_name, _args, _opts), do: nil
defp location_path(path, workspace) when is_binary(workspace) and workspace != "" do
root = Path.expand(workspace)
abs = Path.expand(path, root)
if inside_workspace?(abs, root) do
{:ok, abs}
else
:error
end
end
defp location_path(_path, _workspace), do: :error
defp inside_workspace?(path, root), do: path == root or String.starts_with?(path, root <> "/")
defp semantic_tool_type("run_workflow"), do: "workflow_tool"
defp semantic_tool_type(_name), do: "subagent_tool"
defp semantic_tool_output(%{"subagent" => subagent}) when is_map(subagent) do
semantic_output("subagent_tool_result", %{"subagent" => subagent})
end
defp semantic_tool_output(%{"subagents" => subagents}) when is_list(subagents) do
semantic_output("subagent_tool_result", %{"subagents" => subagents})
end
defp semantic_tool_output(%{"workflow" => workflow}) when is_map(workflow) do
semantic_output("workflow_tool_result", %{"workflow" => workflow})
end
defp semantic_tool_output(_data), do: nil
defp semantic_output(type, data) do
Map.put(data, "_meta", %{
"pixir" => %{
"presentation" => %{
"type" => type
}
}
})
end
defp subagent_update(acp_sid, data, seen?) do
status = data["status"] || data["event"] || "unknown"
tool_call_id = subagent_tool_call_id(acp_sid, data["subagent_id"])
title = subagent_title(data)
detail = subagent_detail(data)
semantic = subagent_semantic_data(data)
cond do
subagent_terminal?(status, data["event"]) ->
subagent_tool_update(tool_call_id, title, subagent_acp_status(status), detail, semantic)
seen? ->
subagent_tool_update(tool_call_id, title, "in_progress", detail, semantic)
true ->
%{
"sessionUpdate" => "tool_call",
"toolCallId" => tool_call_id,
"title" => title,
"kind" => "other",
"status" => "in_progress",
"rawInput" => semantic,
"content" => [%{"type" => "content", "content" => text_block(detail)}]
}
end
end
defp subagent_tool_update(tool_call_id, title, status, detail, semantic) do
%{
"sessionUpdate" => "tool_call_update",
"toolCallId" => tool_call_id,
"title" => title,
"kind" => "other",
"status" => status,
"rawOutput" => semantic,
"content" => [%{"type" => "content", "content" => text_block(detail)}]
}
end
defp subagent_tool_call_id(acp_sid, id), do: "pixir:#{acp_sid}:subagent:#{id}"
defp subagent_title(data) do
agent = data["agent"] || "default"
id = data["subagent_id"]
"Subagent #{id} (#{agent})"
end
defp subagent_detail(data) do
status = data["status"] || data["event"] || "unknown"
summary = data["summary"]
task = data["task"]
cond do
is_binary(summary) and summary != "" -> "#{status}: #{summary}"
is_binary(task) and task != "" -> "#{status}: #{truncate(task)}"
true -> status
end
end
defp subagent_semantic_data(data) do
%{
"_meta" => %{
"pixir" => %{
"presentation" => %{
"type" => "subagent_lifecycle",
"tool" => "subagent_event"
}
}
},
"subagent" => %{
"id" => data["subagent_id"],
"child_session_id" => data["child_session_id"],
"agent" => data["agent"],
"task" => data["task"],
"depth" => data["depth"],
"workspace" => data["workspace"],
"event" => data["event"],
"status" => data["status"],
"summary" => data["summary"]
}
}
end
defp subagent_terminal?(status, event),
do:
status in ~w(completed failed cancelled timed_out closed detached) or
event in ~w(finished failed cancelled timed_out closed)
defp subagent_acp_status(status) when status in ~w(completed closed), do: "completed"
defp subagent_acp_status(_status), do: "failed"
# Normalize one plan entry to the ACP `PlanEntry` schema: `priority ∈
# {high,medium,low}` (default medium), `status ∈ {pending,in_progress,
# completed}` (default pending), and a never-empty `content` string. Tolerates
# atom or string keys/values from the bus.
@plan_priorities ~w(high medium low)
@plan_statuses ~w(pending in_progress completed)
defp plan_entry(entry) when is_map(entry) do
%{
"content" => plan_content(get_field(entry, "content")),
"priority" => clamp(get_field(entry, "priority"), @plan_priorities, "medium"),
"status" => clamp(get_field(entry, "status"), @plan_statuses, "pending")
}
end
defp plan_entry(other),
do: %{"content" => to_text(other), "priority" => "medium", "status" => "pending"}
# Read a field by its string key, falling back to the atom key. The keys are
# fixed literals (content/priority/status), so the atoms already exist — but
# guard with `to_existing_atom` so a stray non-atom key never crashes.
defp get_field(map, key) do
Map.get(map, key) ||
case safe_atom(key) do
nil -> nil
atom -> Map.get(map, atom)
end
end
defp safe_atom(key) do
String.to_existing_atom(key)
rescue
ArgumentError -> nil
end
defp plan_content(value) do
case to_text(value) |> String.trim() do
"" -> "(untitled step)"
content -> content
end
end
defp clamp(value, allowed, default) do
str = value |> to_text() |> String.trim()
if str in allowed, do: str, else: default
end
defp text_block(text), do: %{"type" => "text", "text" => to_text(text)}
# A short, schema-required, never-nil title for a tool_call.
defp title(name, args) when is_map(args) do
cond do
is_binary(args["path"]) -> "#{name} #{args["path"]}"
is_binary(args["command"]) -> "#{name}: #{truncate(args["command"])}"
is_binary(args["cmd"]) -> "#{name}: #{truncate(args["cmd"])}"
true -> name
end
end
defp title(name, _args), do: name
defp truncate(s) when is_binary(s) do
if String.length(s) > 60, do: String.slice(s, 0, 57) <> "...", else: s
end
defp result_text(true, data), do: to_text(Map.get(data, "output", ""))
defp result_text(false, %{"error" => %{kind: kind, message: message}}),
do: "#{kind}: #{message}"
defp result_text(false, %{"error" => %{"kind" => kind, "message" => message}}),
do: "#{kind}: #{message}"
defp result_text(false, data), do: to_text(Map.get(data, "output") || Map.get(data, "error"))
defp to_text(text) when is_binary(text), do: text
defp to_text(nil), do: ""
defp to_text(other), do: inspect(other)
end