# CMDC — AI Complete Reference (llms-full.txt)
# Elixir Agent Kernel v0.2.1 | OTP gen_statem | Hex: {:cmdc, "~> 0.2"}
# Quick reference: llm.txt | HexDocs: https://hexdocs.pm/cmdc
#
# This file contains everything an AI needs to correctly use, extend,
# and integrate with CMDC. Read this file to understand the full API,
# event protocol, Plugin/Tool development patterns, and known pitfalls.
################################################################################
# 1. ARCHITECTURE OVERVIEW
################################################################################
# CMDC is an Elixir Agent Kernel library built on OTP gen_statem.
# It provides a complete agent loop: prompt → LLM stream → tool execution → reply.
#
# Module dependency layers (only downward dependencies allowed):
#
# L0 Foundation Options / Config / Context / Message / Event / EventBus / SubAgent
# L1 Abstractions Blueprint / Plugin / Tool / Sandbox / Skill / Provider
# L2 Agent Kernel Agent (gen_statem) + State + Stream + ToolRunner
# + BasePrompt + SystemPrompt + Compactor + ArgTruncator
# L3 Integration SessionServer + MCP.* + Memory.ETS + SubAgent.Supervisor + Gateway
# L4 Facade CMDC (public API entry point)
#
# Process tree per session:
#
# SessionServer (Supervisor)
# ├── Agent (gen_statem) — 4-state loop: idle → running → streaming → executing_tools
# └── SubAgent.Supervisor (DynamicSupervisor)
# └── child Agent processes (isolated, :temporary restart strategy)
################################################################################
# 2. PUBLIC API — CMDC MODULE
################################################################################
# All operations target a session pid (CMDC.SessionServer process).
# Events are decoupled via CMDC.EventBus (Registry-based PubSub).
# --- Types ---
# @type session :: pid() | String.t()
# @type create_opts :: keyword() | CMDC.Options.t()
#
# session 入参 — v0.2 起所有 CMDC.* API 同时接受 SessionServer pid 和 session_id 字符串。
# 字符串会通过 CMDC.SessionRegistry(unique :via 注册表)反查 pid。
# 反查不命中时,除 subscribe/unsubscribe/user_respond/session_id 外都会 raise ArgumentError。
#
# 反查辅助:
# CMDC.SessionServer.whereis(session_id) :: pid() | nil
#
# 公开 API 入参矩阵:
# create_agent : keyword | Options (返回 pid)
# prompt : pid | string (raise if missing)
# collect_reply : pid | string (raise if missing)
# subscribe : pid | string (string 提前订阅合理,不 raise)
# unsubscribe : pid | string (同上)
# status : pid | string (raise if missing)
# abort : pid | string (raise if missing)
# steer : pid | string (raise if missing)
# approve : pid | string (raise if missing)
# reject : pid | string (raise if missing)
# stop : pid | string (raise if missing)
# agent_pid : pid | string (raise if missing)
# session_id : pid | string (string 直接返回,不查注册表)
# user_respond : pid | string (string 直接 broadcast,不查注册表)
# --- create_agent/1 ---
# @spec create_agent(create_opts()) :: {:ok, session()} | {:error, term()}
#
# Three invocation styles:
#
# Style 1: keyword list (recommended)
# Style 1 example: the commonly used create_agent/1 options as a keyword list.
# Only :model is required; each trailing comment gives the option's default.
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5", # required — "provider:model_id"
tools: [CMDC.Tool.ReadFile, CMDC.Tool.Shell], # optional, default []
plugins: [CMDC.Plugin.Builtin.SecurityGuard], # optional, default []
system_prompt: "You are a coding assistant.", # optional, nil = BasePrompt only
working_dir: "/my/project", # optional, default "."
provider_opts: [api_key: "sk-..."], # optional, passed to req_llm
max_turns: 50, # optional, default 100
max_tokens: nil, # optional, nil = provider decides
skills_dirs: ["./skills"], # optional, SKILL.md discovery
memory: ["AGENTS.md"], # optional, memory file paths
sandbox: nil, # optional, Sandbox module or nil
subagents: [], # optional, [SubAgent.t()]
response_format: nil, # optional, JSON Schema map or nil
user_data: %{tenant_id: "t-1"}, # optional, business context map → ctx.user_data
messages: [], # optional, [%CMDC.Message{}] prior history; default []
session_id: "my-session-123" # optional, auto-generated if omitted
)
# Style 2: Options struct
# Style 2: build and validate a CMDC.Options struct first, then pass it in.
opts = CMDC.Options.new!(model: "openai:gpt-4o", tools: [CMDC.Tool.Shell])
{:ok, session} = CMDC.create_agent(opts)
# Style 3: Blueprint
# (assumes MyApp.CodingAgent is an app module implementing CMDC.Blueprint — TODO confirm)
{:ok, session} = CMDC.create_agent(blueprint: MyApp.CodingAgent)
# --- prompt/2 ---
# @spec prompt(session(), String.t()) :: %{queued: boolean()}
# queued: false = immediate processing, true = Agent is busy, message enqueued
# Matches only the non-queued case: if the Agent is busy, prompt/2 returns
# %{queued: true} and this match raises MatchError.
%{queued: false} = CMDC.prompt(session, "Refactor this module")
# --- collect_reply/2 ---
# @spec collect_reply(session(), keyword()) :: {:ok, String.t()} | {:error, :timeout} | {:error, term()}
# Blocks until Agent returns to idle. Subscribes to EventBus internally.
{:ok, reply} = CMDC.collect_reply(session, timeout: 60_000)
# --- stop/2 ---
# @spec stop(session(), keyword()) :: :ok
# Options: :reason (default :normal), :timeout (default 5_000)
CMDC.stop(session)
# --- abort/1, abort/2 --- (v0.2 RFC B6)
# @spec abort(session(), keyword()) :: :ok
#
# Options:
# :reason — 中止原因;事件变 {:agent_abort, reason}(不传则裸 :agent_abort)
# :clear_queue — boolean,默认 true。清空 pending prompt 队列,每条 emit {:prompt_dropped, text}
# :kill_tools — :all | :killable | :none,默认 :killable
# :all — brutal_kill 全部 in-flight 工具(含 immune)
# :killable — 只杀非 immune 工具,与 Steering 一致
# :none — 不杀,让工具自然完成(适合"温柔停止")
# 每杀一个工具 emit {:tool_killed, %{name, call_id, reason}}
#
# 4 状态行为表:
# :idle → :agent_abort(no-op,但仍 emit 便于订阅方对账)
# :running → cancel stream + emit
# :streaming → cancel stream + emit
# :executing_tools → 按 :kill_tools 模式杀工具 + cancel stream + emit
#
# 事件保证::agent_abort 100ms 内到达订阅方(BEAM 调度延迟,本地 EventBus 通常 <10ms)
# 多次 abort 幂等。
# Defaults: bare :agent_abort event, prompt queue cleared, only non-immune tools killed.
CMDC.abort(session)
# Emits {:agent_abort, :user_cancelled} instead of the bare atom.
CMDC.abort(session, reason: :user_cancelled)
# Also brutal_kills interrupt-immune tools.
CMDC.abort(session, kill_tools: :all)
# "Gentle stop": in-flight tools run to completion and queued prompts are kept.
CMDC.abort(session, kill_tools: :none, clear_queue: false)
# --- switch_model/2 + switch_model/3 --- (v0.2 RFC C8 + v0.3 RFC 11G #A17)
# @spec switch_model(session(), CMDC.Provider.model()) :: :ok
# @spec switch_model(session(), CMDC.Provider.model(), keyword()) :: :ok
#
# 运行期切换 LLM 模型,不重启 Agent;messages / tools / plugin 状态全部保留。
#
# 行为:
# :idle → 立即更新 state.model,emit {:model_switched, %{from, to, provider_opts_changed?}}
# :running/streaming/executing_tools
# → state.model 立即更新;当前轮继续用旧模型;下一轮用新模型
# 同 model + 无 provider_opts → no-op,**不发事件**(订阅方无需去重)
#
# v0.3 选项:
# :provider_opts :: keyword()
# 与 model 一并替换 state.config.provider_opts;典型场景:Anthropic → OpenAI 自建网关
# 需同时改 base_url / api_key / timeout_ms
# 传 nil 或不传 → 保留现有 provider_opts,事件 provider_opts_changed? = false
#
# 兼容性警告:
# - system_prompt 兼容性由 Provider 层处理(OpenAI system 消息 vs Anthropic system 字段)
# - 上下文窗口差异:从 200k 切到 8k 模型时,调用方需先评估是否要 compact
# - tool_calling schema 由 Provider 适配
#
# 触发方式:
# 1) 调用方主动:
# CMDC.switch_model(session, "openai:gpt-4o-mini")
# CMDC.switch_model(session, "openai:gpt-4o-mini",
# provider_opts: [base_url: "https://api.fallback.com", api_key: "sk-..."])
# 2) Plugin Action:在 :before_request / :after_response / :after_tool / :after_tool_batch /
# :before_tool 钩子中返回 {:switch_model, model_string, state}
# 4 元组同步切 provider_opts:{:switch_model, model_string, state, provider_opts: [...]}
# 多个 plugin 同时返回 → priority 较大者(后执行)的值生效
# 注意::on_tool_error 在 Task retry 内触发,无 state 上下文,switch_model 被忽略;
# 若需基于工具失败切 model,请用 :after_tool 钩子并匹配 {:error, _} result
#
# 事件:{:model_switched, %{from: old_model, to: new_model, provider_opts_changed?: boolean()}}
# Swap only the model; existing provider_opts are kept (provider_opts_changed?: false).
CMDC.switch_model(session, "openai:gpt-4o-mini")
# Swap the model and replace provider_opts in one step (e.g. fail over to another gateway).
CMDC.switch_model(session, "openai:gpt-4o-mini",
provider_opts: [base_url: "https://api.fallback.com/v1", api_key: "sk-fallback", timeout_ms: 90_000]
)
# --- messages/1 --- (v0.2)
# @spec messages(session()) :: [CMDC.Message.t()]
# 完整对话历史(含 system / user / assistant / tool_result),按时间正序。
# 适合:切模型前查看历史 / 审计 / 持久化。
# Full conversation history (system/user/assistant/tool_result), oldest first.
CMDC.messages(session)
# --- attach_tool/2 + detach_tool/2 --- (v0.2 RFC C9)
# @spec attach_tool(session(), module()) :: :ok | {:error, :already_attached | :invalid_tool}
# @spec detach_tool(session(), String.t()) :: :ok | {:error, :not_found}
#
# 运行期挂载/卸载工具,**下一次** LLM 请求生效(重生成 tools schema)。
# 不重启 Agent,messages / plugin 状态全保留。
#
# attach 失败原因:
# :already_attached — 已存在同名 tool(按 tool.name() 比较)
# :invalid_tool — 模块未实现 CMDC.Tool 必需回调(name/0, description/0, parameters/0, execute/2)
#
# detach 失败原因:
# :not_found — 不存在同名 tool
#
# 事件:
# {:tool_attached, name :: String.t()} — attach 成功
# {:tool_detached, name :: String.t()} — detach 成功
# {:tool_call_unknown, name, call_id} — LLM 调用了不在 tools 列表的工具
# (通常因 detach 已发生但 streaming 已经引用了旧 tool)
# Agent 会自动注入 {:error, "tool not found: ..."}
# synthetic tool_result 让 LLM 在下一轮自我纠正
#
# 已 in-flight 工具不受 detach 影响:仍会跑完并返回结果。
#
# 适用场景:
# - MCP 热加载:用户对话中说"装个 GitHub MCP",无需 stop+create+restore_messages
# - Skill 进化:升级一个 Skill 不必重启会话
# - 基于上下文动态扩展:根据 LLM 当前任务挂载相关工具
# Attach by module; the new tool appears in the schema of the next LLM request.
:ok = CMDC.attach_tool(session, MyApp.Tools.GitHubMCP)
# Detach by tool name (the String.t() returned by the tool's name/0), not by module.
:ok = CMDC.detach_tool(session, "shell")
# --- EventBus Ring Buffer + subscribe(:since) --- (v0.2 RFC C10)
# @spec subscribe(session(), keyword()) :: {:ok, pid()} | {:error, term()}
#
# 默认关闭,零内存开销。开启方法:在 create_agent 时传 event_buffer_size > 0。
# Turn on the per-session event ring buffer (off by default) so a reconnecting
# subscriber can replay missed events via subscribe(session, since: idx).
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
event_buffer_size: 100 # FIFO ring buffer holding the 100 most recent events
)
#
# 适用场景:WebSocket / Phoenix Channel 短暂断线重连不丢事件
#
# 断线前记录 last_index:
# EventBus functions are keyed by the session_id string, not the SessionServer pid,
# so resolve it first (session_id/1 accepts either form).
sid = CMDC.session_id(session)
last_idx = CMDC.EventBus.last_index(sid) # nil when the event buffer is disabled
#
# 重连后 replay:
# Replays buffered events with index > last_idx (captured before the disconnect),
# then keeps delivering live events to the calling process.
{:ok, _} = CMDC.subscribe(session, since: last_idx)
# replay last_idx < event_idx <= now 的所有事件,按时序投递到当前进程
# 之后的实时事件正常订阅
#
# Buffer 满后行为:
# - new_idx > buffer_size 时自动 select_delete idx <= (new_idx - buffer_size)
# - subscribe(since: 0) 在 buffer 已 wrap 时仅 replay 还在 buffer 内的部分
# - 不抛异常,不发警告
#
# 典型容量建议:
# - 50 — IM 网关,约覆盖 0.5-1 秒 stream chunk 风暴,每会话 < 10KB
# - 100 — UI 客户端,约覆盖 1-2 秒 stream chunk 风暴
# - 200 — 监控/审计,约覆盖一次 turn 的全部事件
# - 500 — 长任务调试,配合 download_replay 做 dry-run
#
# 注意:
# - Buffer 仅活在该 session 所在 BEAM 节点的 ETS(:cmdc_event_buffer ordered_set)
# - 不跨节点;生产环境多节点请配合 Phoenix.PubSub / 外部存储做长期回放
# - Agent terminate 时自动调用 EventBus.disable_buffer(sid) 清理
# - subscribe(since: nil) 等同 subscribe(session)(不 replay 任何)
# - subscribe(since: N) 当 N >= last_index 时 0-replay,仅订阅未来事件
#
# 工具 API:
# Buffer inspection/management helpers. `sid` is presumably the session_id string
# (e.g. from CMDC.session_id(session)) — confirm against CMDC.EventBus docs.
CMDC.EventBus.last_index(sid) # non_neg_integer | nil
CMDC.EventBus.buffer_size(sid) # number of events currently buffered (for monitoring)
CMDC.EventBus.enable_buffer(sid, n) # normally called by Agent.init; rarely needed manually
CMDC.EventBus.disable_buffer(sid) # normally called by Agent.terminate
# --- monitor/1 + demonitor/2 --- (v0.2 RFC C12)
# @spec monitor(session()) :: reference()
# @spec demonitor(session(), reference()) :: :ok
#
# Agent 崩溃监控 + 结构化 reason。替代 Process.monitor 以避免集成方自行解析
# :shutdown / {:shutdown, _} / {exception, stacktrace} 等 raw reason。
#
# Returns a CMDC-owned reference; on Agent exit the caller receives
# {:cmdc_down, ref, session_id, structured_reason}.
ref = CMDC.monitor(session)
# 接受 pid (SessionServer) / session_id string / Agent pid 任一
# 返回 CMDC 自维护的 reference(),与 Process.monitor 的 ref 不同
#
# Agent 退出时观察者进程收到:
# {:cmdc_down, ref, session_id, structured_reason}
#
# ## 结构化 reason 表
#
# | reason 原值 | structured_reason |
# |-------------------------------------|-----------------------------|
# | :normal | :normal |
# | :shutdown | :normal |
# | {:shutdown, any()} | :normal |
# | %RuntimeError{} / 其他 exception struct | {:exception, exception} |
# | {exception, stacktrace} | {:exception, exception} |
# | 其他 atom / term | 原样透传 |
#
# ## 预留扩展(state.exit_reason override)
#
# 未来可由 CMDC 内部分支显式设置 state.exit_reason 来覆盖 raw reason,
# 支持以下结构化原因(v0.2 起定义,实现按需渐进):
#
# :max_turns_exceeded — 超出 Options.max_turns
# :provider_timeout — Provider.stream 超时
# {:plugin_aborted, name, why} — Plugin 返回 :abort
#
# 取消监控:
# Must go through CMDC.demonitor/2; Process.demonitor/1 cannot cancel this ref.
:ok = CMDC.demonitor(session, ref)
#
# 注意:
# - CMDC.monitor 返回的 ref **不能** 用 Process.demonitor/1 取消,必须用 CMDC.demonitor/2
# - Agent 崩溃时通过 terminate/3 callback 广播,brutal_kill (:kill) 不会触发
# - 多个观察者独立订阅,每个得到自己的 ref
# - Agent.init 有 Process.flag(:trap_exit, true),确保 supervisor shutdown 也调用 terminate
# --- PromptMode(v0.2 Phase 10B)---
#
# @type prompt_mode :: :full | :task | :minimal | :none
#
# 控制 `CMDC.Agent.SystemPrompt.build/1` 注入的段落密度,用于 SubAgent
# 或长会话 system prompt token 降本。
#
# ## 4 模式注入矩阵
#
# | 模式 | user_sp | BasePrompt | Identity | tools 清单 | Skills | Memory |
# |------|---------|------------|----------|-----------|--------|--------|
# | :full | ✓ | 完整基座 (~3.5KB) | ✓ | name + description | ✓ | ✓ |
# | :task | ✓ | 精简基座 (~500B) | ✓ | name + description | ✗ | ✗ |
# | :minimal | ✓ | ✗ | ✓ | 仅 name 列表 | ✗ | ✗ |
# | :none | ✓ | ✗ | ✗ | ✗ | ✗ | ✗ |
#
# ## 典型用途与节省量(9 个工具 + 2 skills + 1 memory 条目)
#
# | 模式 | 字节数 | 估算 tokens | vs :full | 推荐场景 |
# |----------|-------|-------------|----------|---------|
# | :full | 3783 | ~945 | 100% | 主 Agent / 单次短会话 |
# | :task | 1829 | ~457 | 48% | SubAgent 默认、多步任务 |
# | :minimal | 238 | ~59 | 6% | 快速路由 / 分类子任务 |
# | :none | 54 | ~13 | 1% | 完全由 system_prompt 接管 |
#
# ## 使用
#
# 主 Agent 层:
# Main agent: :full is already the default prompt_mode; it is spelled out for clarity.
{:ok, session} =
  CMDC.create_agent(
    model: "anthropic:claude-sonnet-4-5",
    prompt_mode: :full, # default
    system_prompt: "你是 Elixir 专家。",
    # Concrete tool modules instead of a `[...]` placeholder, so the snippet compiles as-is.
    tools: [CMDC.Tool.ReadFile, CMDC.Tool.Shell],
    memory: ["AGENTS.md"]
  )
# SubAgent 层(默认 :task,无需显式写):
# Coding sub-agent: prompt_mode is omitted, so the SubAgent default (:task) applies.
CMDC.SubAgent.new!(
name: "coder",
description: "专门写 Elixir 代码的子代理"
# prompt_mode: :task — the default; roughly halves the system prompt tokens
)
# Fast routing sub-agent (minimal prompt):
CMDC.SubAgent.new!(
name: "router",
description: "把任务分类到对应子代理",
prompt_mode: :minimal, # lists tool names only, no BasePrompt
tools: [ClassifyTool] # placeholder for an app-defined tool module
)
# Sub-agent with a fully custom system_prompt (skips every built-in prompt section):
CMDC.SubAgent.new!(
name: "custom",
system_prompt: "完全自定义的 system prompt...",
prompt_mode: :none
)
# ## 传递链
# Options.prompt_mode → Agent.State.prompt_mode → CMDC.Context.prompt_mode
# → Tool.Task 派发子代理时从 Context 提取为父 Options
# → SubAgent.to_options 合并(sub.prompt_mode 非 nil 覆盖,nil 继承父)
#
# ## 注意
# - SubAgent.prompt_mode 默认就是 :task(SDK 级默认省 token)
# - 显式传 nil 才继承父 Agent 的 prompt_mode
# - `:task` / `:minimal` 仍保留 user 自定义 system_prompt (用于 blueprint/hand-written prompt)
# --- MemoryFlush(v0.2 Phase 10C)---
#
# P2 Plugin — 压缩(Compact)前提取关键事实并追加到 working_dir/MEMORY.md,
# 实现"长会话不失忆"。对标 GoClaw memoryflushmiddleware。
#
# ## 工作原理
#
# 1. Agent 触发 Compactor 前,emit `:before_compact` Plugin Pipeline
# 2. MemoryFlush.handle_event({:before_compact, messages}, state, ctx)
# 3. 调用 extract_fn(messages, opts) 提取 N 条 facts
# - 默认启发式(无 LLM 依赖,便于单测)
# - 可覆盖为真实 LLM 提取函数
# 4. sha256 去重 → 追加到 working_dir/MEMORY.md
# 5. emit :memory_flushed 内部细分事件
# 6. emit :plugin_event 通用事件(RFC D14),payload:
# %{kind: :memory_flush, facts, count, session_id, occurred_at, file, v: 1}
#
# ## 配置
#
# {CMDC.Plugin.Builtin.MemoryFlush,
# file: "MEMORY.md", # 默认 "MEMORY.md"(相对 working_dir)
# max_facts_per_flush: 10, # 单次 flush 最多条数
# extract_fn: &MyApp.extract/2,
# dedupe: true # sha256 去重(默认开启)
# }
#
# ## 闭环:与 MemoryLoader 配合
#
# 会话 A:MemoryFlush → 追加 working_dir/MEMORY.md
# ↓
# 会话 B(可跨进程 / 跨节点):
# MemoryLoader.handle_event(:session_start, ...) 读 MEMORY.md
# → emit {:memory_contents, %{"MEMORY.md" => ...}}
# → state.memory_contents 更新
# → SystemPrompt.build/1 在 `:full` 模式下把 memory_contents 渲染进 system prompt
#
# 建议 Agent 同时注册这两个 Plugin:
#
# Register both plugins: MemoryFlush appends facts to working_dir/MEMORY.md before
# compaction, and MemoryLoader reads that file back at :session_start.
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
working_dir: "/project",
plugins: [
CMDC.Plugin.Builtin.MemoryLoader,
CMDC.Plugin.Builtin.MemoryFlush
]
)
#
# ## 失败降级
#
# - extract_fn 抛异常:Logger.warning,不阻塞 compact,返回 :continue
# - extract_fn 返回 {:error, _}:同上
# - 文件写入失败:不阻塞 compact,返回 :continue
#
# "MemoryFlush 出问题也不影响主 Agent Loop" — 副作用隔离。
#
# ## 自定义 LLM 提取器
#
# extract_fn: fn messages, opts ->
#   prompt = """
#   从对话历史中提取最多 #{opts[:max_facts]} 条关键事实。
#   只输出事实文本,每条一行。
#   """
#   case ReqLLM.chat(model: "...", messages: messages, system: prompt) do
#     {:ok, %{content: text}} ->
#       # 按行切分并丢弃空行,避免把空字符串当成 fact 写入 MEMORY.md
#       facts = text |> String.split("\n", trim: true) |> Enum.map(&String.trim/1) |> Enum.reject(&(&1 == ""))
#       {:ok, facts}
#     {:error, reason} -> {:error, reason}
#   end
# end
#
# ## RFC D14 — :plugin_event 事件规范
#
# 设计目标:集成方(Hive / 企业应用)订阅单个通用事件就能拿到全部 Plugin 推送
# 到 Agent EventBus 的业务事件,避免为每个 Plugin 写订阅逻辑。
#
# payload schema(带 version 字段便于演进):
# %{
# kind: atom(), # :memory_flush / 未来其他 Plugin 自定义
# v: pos_integer(), # payload schema 版本,当前 1
# session_id: String.t(),
# occurred_at: integer(), # System.system_time(:millisecond)
# ...Plugin-specific fields # kind=:memory_flush 时: facts, count, file
# }
#
# 集成示例 — 把 flushed facts 写入数据库:
#
# CMDC.subscribe(session)
# receive do
# {:cmdc_event, _sid, {:plugin_event, %{kind: :memory_flush} = payload}} ->
# MyApp.Memory.save_long_term(payload.session_id, payload.facts, payload.occurred_at)
# end
# --- subscribe/1, unsubscribe/1 ---
# @spec subscribe(session()) :: {:ok, pid()} | {:error, term()}
# @spec unsubscribe(session()) :: :ok
# After subscribe, caller receives {:cmdc_event, session_id, event_tuple}
# Subscribes the calling process; events then arrive as {:cmdc_event, session_id, event}.
CMDC.subscribe(session)
CMDC.unsubscribe(session)
# --- approve/3, reject/3 ---
# @spec approve(session(), String.t(), keyword()) :: :ok
# @spec reject(session(), String.t(), keyword()) :: :ok
# approval_id comes from {:approval_required, approval_map}.id field
#
# Options:
# :auto_resume — boolean
# approve default: true (Agent idle 时自动开新 turn 让 LLM 重试被拦截工具)
# reject default: false (默认让 Agent 留在 idle 等下次 prompt)
#
# Auto-resume 触发时会广播 {:agent_resumed, %{trigger, approval_id}}
# trigger ∈ :tool_approved | :tool_rejected | :tool_approval_timeout
#
# v0.1.x 下需要的 sleep+re-prompt hack 已经 ❌ 不再需要
# approval_id is the :id field of the {:approval_required, approval_map} event.
CMDC.approve(session, approval_id) # default: automatically resumes with a new turn
CMDC.approve(session, approval_id, auto_resume: false) # legacy behaviour: only releases pending_approvals, no new turn
CMDC.reject(session, approval_id) # stays idle until the user's next prompt
CMDC.reject(session, approval_id, auto_resume: true) # reject, then resume at once so the LLM can change plan
# --- user_respond/3 ---
# @spec user_respond(session(), String.t(), term()) :: :ok
# 应答 CMDC.Tool.AskUser 工具的等待请求(与 approve/reject 对称的公开 API)
# AskUser 工具会广播 {:ask_user, sid, question, options, ref}
# 应答方式:CMDC.user_respond(session, ref, response)
# session 入参:pid 或 session_id 字符串
# response:任意可序列化的应答(字符串、字符串列表、map 等)
# `ref` is the last element of the {:ask_user, sid, question, options, ref} event.
CMDC.user_respond(session, ref, "我的回答")
CMDC.user_respond("my-sid", ref, ["A", "B"]) # multi-select; a session_id string also works
# --- status/1 ---
# @spec status(session()) :: %{
# state: :idle | :running | :streaming | :executing_tools,
# session_id: String.t(),
# model: String.t(), # v0.2 RFC C8
# turns: non_neg_integer(),
# tool_calls: non_neg_integer(),
# total_tokens: non_neg_integer(),
# uptime_ms: non_neg_integer(),
# timestamp_ms: integer(),
# pending_tools: [%{name: String.t(), call_id: String.t(), args: map(), started_at_ms: integer()}],
# pending_approvals: [%{id: String.t(), tool: String.t(), args: map(),
# session_id: String.t(), requested_at: integer(),
# hint: String.t()}],
# queues: %{prompt_queue: non_neg_integer(),
# steering_queue: non_neg_integer()}
# }
#
# v0.2 新增 :pending_tools / :pending_approvals / :queues 三个字段,便于 UI / 监控 /
# 健康探针在不订阅 EventBus 的前提下取到运行期可观测信息。
#
# - :pending_tools — 当前正在执行的工具(Task 已 spawn、未完成),typically empty
# when state == :idle / :streaming
# 每项含 `started_at_ms`(v0.3 RFC 11G #A15);
# 配合 `:tool_execution_metrics` 事件可统计 tool 耗时分布,
# 监控方无需自己埋点
# - :pending_approvals — 来自启用了 awaiting_approvals 字段的 Plugin(如
# CMDC.Plugin.Builtin.HumanApproval)的实时聚合
# - :queues — :prompt_queue 是 pending_messages 长度(busy 时排队的 prompt)
# :steering_queue 是 steering 队列长度
# Runtime snapshot with the map shape described above; no EventBus subscription needed.
CMDC.status(session)
# --- agent_pid/1 ---
# @spec agent_pid(session()) :: pid() | nil
CMDC.agent_pid(session)
# --- session_id/1 ---
# @spec session_id(session()) :: String.t()
CMDC.session_id(session)
# --- steer/2 — Mid-execution Soft Interrupt(中段软中断) ---
# @spec steer(session(), String.t()) ::
# {:ok, reference()} | {:error, :queue_full | :rejected | :invalid_text}
#
# 在 Agent 执行**中段**追加一条新指引,无需先 abort 再 prompt,无需丢弃中间产出。
# 详见 docs/dev/steering-design.md 与本文档第 17.5 节。
#
# 状态行为:
# :idle → 等同 prompt/2,立刻进入新 turn(仅 emit :steering_applied)
# :running → 入 :steering_queue,下个 turn 间隙合并注入
# :streaming → 入 :steering_queue,**不打断** LLM 流;stream_done 后注入
# :executing_tools → 入 :steering_queue,下次 collect_result 时把所有非
# interrupt_immune_tools 的 in-flight 工具 brutal_kill,
# 全部清空后注入合并 [Steering] message
#
# 返回:
# {:ok, ref} — 已入队(不等同已生效;生效信号通过 :steering_applied 事件广播)
# {:error, :queue_full} — :steering_queue 已满(默认 max_steering_queue=3)
# {:error, :rejected} — Plugin 拦截 :before_steering 返回 :abort
# {:error, :invalid_text} — 入参非合法字符串(空字符串或非 binary)
#
# 示例:
CMDC.subscribe(session)
CMDC.prompt(session, "搜索 Elixir gen_statem 教程,分析每个并整理成文档")
# Some time later the user changes their mind — no abort needed.
{:ok, _ref} = CMDC.steer(session, "改成只看官方 hexdocs,不要第三方教程")

# {:ok, ref} only means "queued"; the effect is signalled by :steering_applied.
# Bound the wait so the caller is not blocked forever if that event never arrives
# (e.g. the session is aborted or stopped before the queue is drained).
receive do
  {:cmdc_event, _sid, {:steering_applied, %{count: n}}} ->
    IO.puts("已注入 #{n} 条 steering,agent 将基于新指引重新规划")
after
  60_000 -> :steering_not_applied
end
################################################################################
# 3. OPTIONS — CMDC.Options
################################################################################
# Typed struct with NimbleOptions runtime validation.
# @enforce_keys [:model]
#
# @type plugin_spec :: module() | {module(), keyword()}
# @type t :: %CMDC.Options{
# model: String.t(), # required — "provider:model_id"
# tools: [module()], # default []
# system_prompt: String.t() | nil, # default nil
# plugins: [plugin_spec()], # default []
# subagents: [map()], # default []
# skills_dirs: [String.t()], # default []
# memory: [String.t()], # default []
# sandbox: module() | nil, # default nil
# response_format: map() | nil, # default nil
# provider_opts: keyword(), # default []
# max_turns: pos_integer(), # default 100
# max_tokens: pos_integer() | nil, # default nil
# working_dir: String.t(), # default "."
# max_steering_queue: pos_integer(), # default 3 — Steering queue 上限
# interrupt_immune_tools: [String.t()], # 默认 6 个写/副作用工具白名单
# user_data: map(), # default %{} — 业务上下文透传
# messages: [CMDC.Message.t()] # default [] — 启动时灌入历史
# }
#
# Steering 字段解释:
# :max_steering_queue — 同时排队的 steering 指令数上限。溢出时新 steer 返回
# {:error, :queue_full} + emit :steering_received
# (status=:rejected_full)。
# :interrupt_immune_tools — Steering 触发 brutal_kill 时**不**会被杀的工具名白名单。
# 默认 ~w(write_file edit_file shell git_commit
# notebook_edit ask_user)
# 覆盖时**整体替换**(不是追加)。
#
# History injection (`:messages`)字段解释:
# :messages — 历史消息列表(按时间顺序,最早在前),元素必须是 %CMDC.Message{}。
# Agent.init 时**直接写入** state.messages(内部按反序存储,
# State.append_messages/2 自动 reverse),跳过让 LLM 把
# "<history>...</history>" 当 prompt 重读一遍的轮次。
#
# 适用场景:idle 休眠唤醒 / 跨进程恢复 / 灾难恢复 / 历史快照重播。
# 不传 / 传 [] 时行为完全等同旧版(首次 prompt 之前 messages 为空)。
#
# 注意:
# - 与 :system_prompt 解耦——SystemPrompt.build/1 始终把
# blueprint_system_prompt 拼在 system 消息里注入,
# 调用方**不要**在 :messages 里再放 :system 消息。
# - 长度无内置上限;CMDC 不会静默截断。如需控制 prompt_tokens,
# 请在调用方按需要 summarize 或保留最近 N 条。
# - 灌入后第一次 prompt/2 → LLM 看到的 messages =
# [system, ...history, new_user_prompt],自然续接。
# Construction:
# new!/1 raises on invalid options; new/1 returns a tagged tuple instead
# (presumably {:error, _} on failure — confirm in CMDC.Options docs).
opts = CMDC.Options.new!(model: "anthropic:claude-sonnet-4-5", tools: [CMDC.Tool.Shell])
{:ok, opts} = CMDC.Options.new(model: "openai:gpt-4o")
CMDC.Options.schema() # => NimbleOptions.t()
CMDC.Options.merge(opts, max_turns: 20) # => new Options
# Construction with prior history (idle-resume / cross-process restore):
# Restore a session with prior history (idle-resume / cross-process restore).
restored =
  CMDC.Options.new!(
    model: "anthropic:claude-sonnet-4-5",
    system_prompt: "你是一名严谨的 Elixir 审计员。",
    # Oldest message first. Do not put a :system message here: system_prompt is
    # injected separately. Uses the documented Message.user/1 and Message.assistant/1
    # factories (see the CMDC.Message section).
    messages: [
      CMDC.Message.user("帮我审核这段代码"),
      CMDC.Message.assistant("好的,请贴上来"),
      CMDC.Message.user("def hello, do: :world")
    ]
  )

{:ok, session} = CMDC.create_agent(restored)
CMDC.prompt(session, "继续:这段代码的复杂度如何?")
# 第一次 prompt → provider 看到的 messages =
# [system_prompt(blueprint+user), 3 条已存档 user/assistant 消息, 新 user prompt]
# 不会浪费一次 LLM 调用让模型 "self-explain history"
################################################################################
# 4. EVENT PROTOCOL
################################################################################
# All events broadcast via CMDC.EventBus as:
# {:cmdc_event, session_id :: String.t(), event :: CMDC.Event.t()}
#
# CMDC.Event.valid?/1 accepts atoms and tuples; CMDC.Event.all_types/0 returns all type atoms.
# --- Session / Lifecycle ---
# :agent_start — Agent 开始处理本轮 prompt
# {:agent_end, messages, token_usage} — 本轮完成,回到 idle;token_usage: %{prompt_tokens, completion_tokens, total_tokens}
# {:agent_abort, reason} | :agent_abort — Agent 被中止(含/不含 reason)
# {:prompt_received, text} — 收到用户 prompt
# {:prompt_queued, text} — Agent 忙,prompt 已入队
# {:prompt_rejected, reason} — prompt 被 Plugin 拒绝
# --- Streaming ---
# :message_start — LLM 开始生成文本(首个 content chunk 前)
# {:message_delta, %{delta: text}} — 流式 token 片段
# {:response_complete, %CMDC.Message{}} — 本轮响应完整结束(含 tool_calls 已解析)
# :thinking_start — 思考链开始(部分模型)
# {:thinking_delta, %{delta: text}} — 思考链 token 片段
# {:status_update, text} — LLM 响应中 <status> 标签内容
# {:title_generated, title} — LLM 响应中 <title> 标签内容(限 60 字符)
# --- Provider / Request ---
# {:request_start, %{model: m, messages: count}} — LLM 请求即将发起
# {:stream_error, reason} — 流式响应出错
# {:stream_stalled, elapsed_s} — 流式疑似卡住(超 stall 阈值)
# {:retry, attempt, delay_ms, reason} — 即将重试 Provider 请求
# {:context_overflow, reason} — 上下文长度超限,尝试压缩
# --- Tool Execution ---
# {:tool_calls, count} — 本轮 LLM 请求了 count 个工具
# {:tool_execution_start, name, call_id, args} — 工具开始执行
# {:tool_execution_end, name, call_id, result} — 工具完成,result: {:ok, text} | {:error, text}
# {:tool_execution_metrics, name, call_id, meta} — v0.3 RFC 11G #A15 工具耗时指标;
# 紧跟 :tool_execution_start:meta = %{started_at_ms: integer()}
# 紧跟 :tool_execution_end: meta = %{started_at_ms, ended_at_ms, duration_ms}
# 适合做 tool 耗时 P99 监控 / 慢工具告警;订阅方无需自己埋点
# {:tool_blocked, name, call_id, reason} — 工具被 Plugin block_tool 拦截
# {:loop_detected, %{type: type, ...}} — 循环检测触发
# type: :repeat_pattern | :file_loop_warn | :file_loop_abort
# --- HITL (Human-in-the-loop) ---
# {:approval_required, approval_map} — 等待人类审批
# {:approval_resolved, approval_map} — 审批已决定
# approval_map: %{
# id: String.t(), — 审批 ID,传给 CMDC.approve/reject
# tool: String.t(), — 被拦截的工具名
# args: map(), — 工具参数
# session_id: String.t(),
# hint: String.t(), — 用户提示文本
# requested_at: integer(), — 毫秒时间戳
# status: :approved | :rejected | :timeout # 仅 approval_resolved 有此字段
# }
#
# {:agent_resumed, %{trigger, approval_id}} — approve/reject auto_resume 触发,Agent 已续 turn
# trigger: :tool_approved | :tool_rejected | :tool_approval_timeout
# approval_id: String.t() — 触发续 turn 的审批 ID
# 仅在 Agent 处于 idle 且 approve/reject 的 :auto_resume 为 true 时广播
# (approve 默认 true / reject 默认 false / approval timeout 默认 true)
# --- Agent 提问 ---
# {:ask_user, session_id, question, options, ref} — Agent 主动提问
# {:user_responded, session_id, ref, response} — 用户回答
# --- Planning ---
# {:todo_change, session_id, todos :: [map()]}
# --- Context Compaction ---
# {:compact_start, session_id}
# {:compact_end, session_id, removed_count :: non_neg_integer()}
# --- SubAgent ---
# {:subagent_start, session_id, child_session_id, description}
# {:subagent_end, session_id, child_session_id, result :: {:ok, text} | {:error, reason}}
# {:sub_agent_event, call_id, child_session_id, event} — 子代理内部事件透传
# --- Plugin 自定义事件 ---
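# {:plugin_event, name :: atom(), payload :: term()}
# 注:RFC D14 通用格式为 2 元组 {:plugin_event, %{kind: atom(), v: 1, session_id, occurred_at, ...}}
# (见 MemoryFlush 一节的订阅示例);两种形状的关系待确认,订阅方匹配时请以实际版本为准。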
# --- Agent 干预 ---
# {:intervention, prompt} — 循环检测/before_finish 注入干预
# {:stop_blocked, prompt} — abort 被 Plugin 拦截,注入继续 prompt
# --- Steering 中段干预(详见第 17.5 节) ---
# {:steering_received, %{ref: reference(), text: String.t(),
# queued_at: integer(),
# status: :queued | :rejected_full | :rejected_by_plugin}}
# — CMDC.steer/2 的处理结果广播。
# status=:queued — 已成功入 :steering_queue
# status=:rejected_full — queue 溢出,已拒绝
# status=:rejected_by_plugin — Plugin :before_steering 返回 :abort
# 注:idle 状态下 steer 不入 queue(透传到 prompt),故**不**广播此事件,
# 直接广播 :steering_applied 表示"立刻生效"。
#
# {:steering_applied, %{refs: [reference()], count: pos_integer()}}
# — :steering_queue 被合并为 1 条 [Steering] user message 注入下个 turn。
# refs 是按时间顺序的所有 ref 列表,count == length(refs)。
#
# {:tool_skipped_for_steering, %{name: String.t(), call_id: String.t(),
# reason: :killed_by_steering | :pending_dispatch}}
# — 工具因 steering 被取消。
# reason=:killed_by_steering — Task.shutdown(:brutal_kill) 已发出
# reason=:pending_dispatch — 预留:v0.3 加 Tool.interruptible? 后细分
# --- Error ---
# {:error, session_id, reason}
# --- EventBus API ---
# CMDC.EventBus.subscribe(session_id) # => {:ok, pid}
# CMDC.EventBus.subscribe_all() # subscribe to all sessions
# CMDC.EventBus.broadcast(session_id, event)
# CMDC.EventBus.unsubscribe(session_id)
# CMDC.EventBus.unsubscribe_all()
# --- Consuming events pattern ---
CMDC.subscribe(session)
CMDC.prompt(session, "hello")

# Recursive receive loop: prints streamed text and tool activity, auto-approves HITL
# requests, and returns :done on :agent_end, {:error, {:aborted, reason}} on abort,
# or {:error, :timeout} after 60 s without any CMDC event.
loop = fn loop_fn ->
  receive do
    {:cmdc_event, _sid, {:message_delta, %{delta: chunk}}} when is_binary(chunk) ->
      IO.write(chunk)
      loop_fn.(loop_fn)

    {:cmdc_event, _sid, {:tool_execution_start, name, _call_id, _args}} ->
      IO.puts("[tool] #{name} started")
      loop_fn.(loop_fn)

    {:cmdc_event, _sid, {:tool_execution_end, name, _call_id, _result}} ->
      IO.puts("[tool] #{name} done")
      loop_fn.(loop_fn)

    {:cmdc_event, _sid, {:approval_required, %{id: approval_id, tool: tool, args: args}}} ->
      IO.inspect({tool, args}, label: "Approve?")
      CMDC.approve(session, approval_id)
      loop_fn.(loop_fn)

    {:cmdc_event, _sid, {:agent_end, _messages, _usage}} ->
      :done

    # abort emits either {:agent_abort, reason} or the bare :agent_abort atom;
    # without these clauses an aborted run would only end via the 60 s timeout.
    {:cmdc_event, _sid, {:agent_abort, reason}} ->
      {:error, {:aborted, reason}}

    {:cmdc_event, _sid, :agent_abort} ->
      {:error, {:aborted, nil}}

    # Drain every other event (:agent_start, :message_start, :response_complete, ...)
    # so unmatched messages don't accumulate in the mailbox.
    {:cmdc_event, _sid, _other} ->
      loop_fn.(loop_fn)
  after
    60_000 -> {:error, :timeout}
  end
end

loop.(loop)
################################################################################
# 5. MESSAGE — CMDC.Message
################################################################################
# @derive Jason.Encoder
# @enforce_keys [:id, :role]
#
# @type role :: :system | :user | :assistant | :tool_result
# @type tool_call :: %{call_id: String.t(), name: String.t(), arguments: map()}
# @type t :: %CMDC.Message{
# id: String.t(),
# parent_id: String.t() | nil,
# role: role(),
# content: String.t() | nil,
# thinking: String.t() | nil,
# tool_calls: [tool_call()] | nil,
# call_id: String.t() | nil,
# name: String.t() | nil,
# is_error: boolean(), # default false
# metadata: map() | nil
# }
# Factory functions:
# The factories live in CMDC.Message; alias it so the short `Message.` calls resolve.
alias CMDC.Message

Message.system("You are helpful.")
msg = Message.user("Hello")
Message.assistant("Sure!", [%{call_id: "c1", name: "read_file", arguments: %{"path" => "a.ex"}}])
Message.assistant("Plain text response.") # no tool_calls
Message.tool_result("c1", "file contents here", false) # (call_id, output, is_error)
Message.to_map(msg) # => map for serialization
################################################################################
# 6. CONTEXT — CMDC.Context
################################################################################
# Passed to Tool.execute/2 and Plugin.handle_event/3 as execution context.
# @enforce_keys [:session_id, :working_dir, :model]
#
# @type t :: %CMDC.Context{
# session_id: String.t(),
# working_dir: String.t(),
# model: String.t(),
# sandbox: module() | nil,
# tools: [module()], # default []
# subagents: [map()], # default []
# config: CMDC.Config.t() | nil,
# todos: [map()], # default []
# memory_contents: %{String.t() => String.t()}, # default %{}
# user_data: map(), # default %{} — verbatim from Options.user_data
# turn: non_neg_integer(), # default 0
# total_tokens: non_neg_integer(), # default 0
# cost_usd: float(), # default 0.0
# last_assistant_reply: String.t() | nil # v0.3 Phase 11D — 最后一条 assistant message 的文本
# # Plugin/Tool 直接读取无需翻 messages
# }
# Construction (usually automatic):
# Normally CMDC builds the Context itself. `agent_state` (presumably a
# CMDC.Agent.State) and `options`/`session_id` are placeholders here.
ctx = CMDC.Context.from_state(agent_state)
ctx = CMDC.Context.from_options(options, session_id)
# user_data — business context passthrough(多租户/操作者身份/数据范围)
# - Source: Options.user_data 或 SubAgent.user_data(非 nil 时覆盖父级,整张替换不合并)
# - Lifecycle: 写一次(init),运行期不可变;不参与 Compactor,不会丢
# - Tool 端:可以用 pattern match 直接在 execute/2 函数头取出
# - Plugin 端:handle_event/3 第 3 参 ctx 也含同一份 user_data
# - SubAgent:未声明 SubAgent.user_data 时**继承**父;声明时**完全替换**
# 示例:多租户订单查询
defmodule MyApp.Tools.FetchOrders do
  @moduledoc "Lists orders scoped to the tenant carried in `ctx.user_data`."
  @behaviour CMDC.Tool

  # `from/2` is an Ecto.Query macro; without this import the module does not compile.
  import Ecto.Query, only: [from: 2]

  alias MyApp.Repo

  @impl true
  def name, do: "fetch_orders"

  @impl true
  def description, do: "List orders for the active tenant"

  @impl true
  def parameters, do: %{"type" => "object", "properties" => %{}}

  # Tenant scope comes from user_data, which is set once at init and is immutable,
  # so the LLM cannot widen the query through tool arguments.
  @impl true
  def execute(_args, %CMDC.Context{user_data: %{tenant_id: tenant_id}} = ctx) do
    rows =
      Repo.all(
        from o in MyApp.Order,
          where: o.tenant_id == ^tenant_id and o.workspace == ^ctx.working_dir
      )

    # assumes MyApp.Order derives Jason.Encoder — TODO confirm in the schema module
    {:ok, Jason.encode!(rows)}
  end

  # user_data defaults to %{}; without a tenant_id, refuse explicitly rather than
  # crash the tool task with a FunctionClauseError.
  def execute(_args, _ctx), do: {:error, "fetch_orders requires user_data.tenant_id"}
end
# 启动:
tenant_data = %{tenant_id: "acme-1", operator_id: "u-9", data_scope: :restricted}

{:ok, session} =
  CMDC.create_agent(
    model: "anthropic:claude-sonnet-4-5",
    tools: [MyApp.Tools.FetchOrders],
    user_data: tenant_data
  )

# user_data may also be passed as a keyword list; it is normalized into a map:
CMDC.create_agent(model: "...", user_data: [tenant_id: "t-1"])
# => ctx.user_data == %{tenant_id: "t-1"}
################################################################################
# 7. TOOL DEVELOPMENT
################################################################################
# All tools implement @behaviour CMDC.Tool.
#
# Required callbacks:
# @callback name() :: String.t()
# @callback description() :: String.t()
# @callback parameters() :: map() # JSON Schema object
# @callback execute(map(), CMDC.Context.t()) :: result()
#
# Optional callbacks:
# @callback description(tool_context()) :: String.t() # context-aware description
# @callback meta(map()) :: String.t() # short summary for logs
#
# @type result :: {:ok, String.t()} | {:error, String.t()} | {:effect, term()}
# @type tool_context :: %{optional(:working_dir) => String.t(), optional(:shell) => atom()}
# --- Example: Custom Tool ---
defmodule MyApp.Tool.HttpGet do
  @moduledoc "Fetch a URL and return the response body."
  @behaviour CMDC.Tool

  @impl true
  def name, do: "http_get"

  @impl true
  def description, do: "Fetch content from a URL via HTTP GET."

  @impl true
  def parameters do
    %{
      "type" => "object",
      "properties" => %{
        "url" => %{"type" => "string", "description" => "The URL to fetch"}
      },
      "required" => ["url"]
    }
  end

  # Tool results must be strings. By default Req decodes JSON (and other known
  # formats) into maps/lists, so `decode_body: false` keeps `body` a binary.
  # Any 2xx status is treated as success, not just 200.
  @impl true
  def execute(%{"url" => url}, _ctx) when is_binary(url) do
    case Req.get(url, decode_body: false) do
      {:ok, %{status: status, body: body}} when status in 200..299 -> {:ok, body}
      {:ok, %{status: status}} -> {:error, "HTTP #{status}"}
      {:error, reason} -> {:error, inspect(reason)}
    end
  end

  # The LLM may omit or mistype `url`; report it as a tool error instead of raising.
  def execute(_args, _ctx), do: {:error, "missing required string argument: url"}
end
# --- Sandbox proxy pattern (for file/exec tools) ---
# If ctx.sandbox is set, delegate to sandbox; otherwise use Sandbox.Local:
# Place inside a file/exec tool module. Delegates to ctx.sandbox when one is set,
# otherwise falls back to CMDC.Sandbox.Local with the same options.
def execute(%{"path" => path}, ctx) do
  sandbox_opts = [working_dir: ctx.working_dir]

  case Map.get(ctx, :sandbox) do
    nil -> CMDC.Sandbox.Local.read_file(path, sandbox_opts)
    sandbox -> sandbox.read_file(path, sandbox_opts)
  end
end
# --- Tool utility functions ---
alias CMDC.Tool
# JSON-schema list for the LLM => [%{name, description, parameters}]
Tool.to_schema([Tool.ReadFile, Tool.Shell])
# Look a tool module up by its name => {:ok, CMDC.Tool.ReadFile}
Tool.find([Tool.ReadFile], "read_file")
# Description rendered for a specific tool context
Tool.description(Tool.Shell, %{working_dir: "/p"})
# --- Built-in Tool Reference ---
#
# CMDC.Tool.ReadFile
# name: "read_file"
# params: path (required), offset (integer), limit (integer)
# Returns file contents with line numbers. Supports pagination.
#
# CMDC.Tool.WriteFile
# name: "write_file"
# params: path (required), content (required)
# Creates parent directories automatically.
#
# CMDC.Tool.EditFile
# name: "edit_file"
# params: path (required), old_string (required), new_string (required)
# Exact string replacement. old_string must appear exactly once.
#
# CMDC.Tool.Shell
# name: "shell" (varies by OS: "bash", "zsh", "powershell")
# params: command (required), timeout (integer, default 30000ms)
# Large output auto-saved to temp file, returns file path.
# @type shell :: :sh | :bash | :zsh | :cmd | :powershell
#
# CMDC.Tool.Grep
# name: "grep"
# params: pattern (required), path, include (glob filter), context_lines, max_results
# Regex search with line numbers.
#
# CMDC.Tool.ListDir
# name: "list_dir"
# params: path (optional, defaults to working_dir)
# Returns files/dirs with type and size.
#
# CMDC.Tool.Glob
# name: "glob"
# params: pattern (required), path (optional)
# File pattern matching with wildcards.
#
# CMDC.Tool.Task
# name: "task"
# params: description (required), subagent_type (optional)
# Spawns a child Agent under SubAgent.Supervisor.
# Child runs in isolation; last assistant reply returned as tool result.
# Broadcasts :subagent_start/:subagent_end events.
#
# CMDC.Tool.WriteTodos
# name: "write_todos"
# params: todos (required) — array of %{id, content, status}
# status enum: "pending" | "in_progress" | "completed" | "cancelled"
# Updates Context.todos, broadcasts :todo_change event.
#
# CMDC.Tool.AskUser
# name: "ask_user"
# params: question (required), options (optional array of %{id, label})
# Broadcasts :ask_user event, waits for :user_responded.
#
# CMDC.Tool.CompactConversation
# name: "compact_conversation"
# params: reason (optional)
# Triggers immediate context compaction via Compactor.
################################################################################
# 8. PLUGIN DEVELOPMENT
################################################################################
# All plugins implement @behaviour CMDC.Plugin.
#
# Required callbacks:
# @callback init(keyword()) :: {:ok, plugin_state()} | {:error, term()}
# @callback priority() :: non_neg_integer() # lower = runs first (0-999)
# @callback handle_event(event(), plugin_state(), CMDC.Context.t()) :: action()
#
# Optional callbacks:
# @callback on_session_end(plugin_state(), CMDC.Context.t()) :: :ok
# @callback describe() :: String.t()
# --- Plugin Events (13 hooks) ---
# :session_start — Agent session started
# :session_end — Agent session ending(无 payload,向后兼容)
# {:after_turn, payload} — Agent 即将回 idle 前(v0.3 RFC 11G #A16)
# payload :: %{
# outcome: :finished | :aborted,
# abort_reason: term() | nil,
# messages_diff: [%CMDC.Message{}], # 本轮 prompt cycle 新增消息(按时间序)
# token_usage_diff: %CMDC.TokenUsage{}, # 本轮增量
# started_at_ms: integer(),
# ended_at_ms: integer(),
# duration_ms: non_neg_integer()
# }
# 与 :session_end 区别::session_end 保持现状(不破坏老 Plugin 如 EventLogger);
# :after_turn 带结构化 payload,**新 Plugin 推荐使用** — 典型场景:
# - 把本轮对话写入审计 / 长期记忆 / 计费系统
# - Plugin emit `{:knowledge_extracted, %{facts, account_id}}` 给独立 GenServer 入库
# {:before_prompt, text} — before user prompt is processed
# {:before_request, [Message.t()]} — before messages sent to LLM
# {:after_response, Message.t()} — after LLM response received
# {:before_tool, name, args} — before tool execution
# {:on_tool_error, name, call_id, error, attempt} — tool failed, before retry
# {:after_tool, name, call_id, result} — after tool execution
# {:after_tool_batch, results} — after all tools in batch complete
# :before_finish — before Agent returns to idle
# {:before_compact, [Message.t()]} — before context compaction
# {:before_steering, text} — CMDC.steer/2 触发,可继续/修改/拒绝
# actions:
# {:continue, state} — 放行
# {:intervene, new_text, state} — 用 new_text 替换原 steer 文本
# {:abort, reason, state} — 拒绝 steer,return {:error, :rejected}
# + emit :steering_received status=:rejected_by_plugin
# {:emit, {type, data}, state} — 加广播自定义事件(不影响 steer 流程)
# --- Plugin Actions (8 types) ---
# {:continue, state} — pass through, no modification
# {:intervene, prompt :: String.t(), state} — inject extra prompt into conversation
# {:abort, reason :: String.t(), state} — abort the entire agent run
# {:skip, state} — skip current operation
# {:block_tool, reason :: String.t(), state} — block the tool call (before_tool only)
# {:replace_tool_args, new_args :: map(), state} — replace tool arguments (before_tool only)
# {:emit, {type :: atom(), data :: term()}, state} — emit a custom event
# {:switch_model, model_string :: String.t(), state} — runtime model switch (v0.2 RFC C8)
# {:switch_model, model_string :: String.t(), state, opts :: keyword()}
# — v0.3 RFC 11G #A17 同步切 provider_opts;
# opts: [provider_opts: [base_url, api_key, timeout_ms]]
# --- Hook × Action 兼容性矩阵(v0.2 RFC B7 + C8)---
# 详见 docs/dev/plugin-cookbook.md(含完整矩阵表 + 5 个 Cookbook 实现)
#
# Hook \ Action | continue | intervene | abort | skip | block_tool | replace_tool_args | emit | switch_model
# :session_start | ✓ | - | ✓ | - | - | - | ✓ | -
# :session_end | ✓ | - | - | - | - | - | ✓ | -
# {:after_turn, p} | ✓ | - | - | - | - | - | ✓ | -
# :before_prompt | ✓ | ✓ | ✓ | ✓ | - | - | ✓ | -
# :before_request | ✓ | ✓ | ✓ | ✓ | - | - | ✓ | ✓
# :after_response | ✓ | ✓ | ✓ | ✓ | - | - | ✓ | ✓
# :before_tool | ✓ | - | ✓ | - | ✓ | ✓ | ✓ | ✓
# :on_tool_error | ✓ | - | ✓ | ✓ | - | - | ✓ | -*
# :after_tool | ✓ | ✓ | ✓ | - | - | - | ✓ | ✓
# :after_tool_batch | ✓ | ✓ | ✓ | - | - | - | ✓ | ✓
# :before_finish | ✓ | ✓ | ✓ | - | - | - | ✓ | -
# :before_compact | ✓ | - | - | ✓ | - | - | ✓ | -
# :before_steering | ✓ | ✓ | ✓ | - | - | - | ✓ | -
# *: :on_tool_error 在 Task 内 retry 循环触发,无 Agent state 上下文,switch_model 被忽略
# 若需基于工具失败切 model,请在 :after_tool 钩子内基于 result 是 {:error, _} 决定
#
# 关键约束:
# - block_tool / replace_tool_args 仅 :before_tool 有效,其他 hook 返回会被忽略并 warn
# - intervene 在 :before_prompt / :before_request / :after_response /
# :after_tool / :after_tool_batch / :before_finish / :before_steering
# 都会追加一条 user message 到 messages 末尾
# - skip 在 :before_compact 是"跳过这次压缩"语义;其他 hook 是"跳过后续 plugin"
# - abort 是终极停止,立刻把 Agent 推回 idle + emit :agent_abort
# - switch_model 累积式(最后一次生效),不短路;同 model 切换 → no-op 不发事件
# 多个 plugin 同时返回 → priority 较大者(后执行)的值生效
# --- 5 个 Cookbook 场景(docs/dev/plugin-cookbook.md)---
# 1. DangerousCommandGuard (priority 50) — :before_tool + block_tool 拦截 rm -rf 等
# 2. PromptAuditLog (priority 200) — :before_prompt + emit 审计 user 输入
# 3. BudgetGuard (priority 100) — :after_response + abort 控制成本上限
# 4. ToolFallbackGuard (priority 250) — :on_tool_error + emit :tool_degraded
# 5. SteeringContentGuard (priority 60) — :before_steering + abort 拦截 prompt injection
# --- Plugin.Pipeline execution ---
# Plugins sorted by priority (ascending). Pipeline runs each plugin in order.
# Short-circuit actions (abort, block_tool, skip) halt the pipeline immediately.
# The halted_by module and halt_reason are recorded in the result.
#
# Pipeline.run(plugin_entries, event, ctx)
# => {:ok, %{action: atom, plugin_states: map, interventions: list,
# emitted_events: list, replaced_args: map | nil,
# halted_by: module | nil, halt_reason: String.t | nil}}
# --- Example: Custom Plugin ---
defmodule MyApp.Plugins.RateLimit do
  @moduledoc "Rate-limit tool calls to max N per minute."
  @behaviour CMDC.Plugin

  # Length of the sliding window, in milliseconds.
  @window_ms 60_000

  @impl true
  def init(opts) do
    state = %{
      max_per_minute: Keyword.get(opts, :max_per_minute, 30),
      calls: [],
      window_ms: @window_ms
    }

    {:ok, state}
  end

  @impl true
  def priority, do: 20

  @impl true
  def handle_event({:before_tool, _name, _args}, state, _ctx) do
    now = System.monotonic_time(:millisecond)
    cutoff = now - state.window_ms
    # Drop timestamps that have slid out of the window.
    in_window = Enum.reject(state.calls, &(&1 <= cutoff))

    if Enum.count(in_window) < state.max_per_minute do
      # Under the limit: record this call and let the tool run.
      {:continue, %{state | calls: [now | in_window]}}
    else
      # Blocked calls are not recorded, so they do not extend the limit.
      {:block_tool, "Rate limit exceeded (#{state.max_per_minute}/min)", state}
    end
  end

  # Every other hook passes through untouched.
  def handle_event(_event, state, _ctx), do: {:continue, state}
end
# Usage:
rate_limit_spec = {MyApp.Plugins.RateLimit, max_per_minute: 10}

CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  plugins: [rate_limit_spec]
)
# --- Built-in Plugin Reference ---
#
# CMDC.Plugin.Builtin.SecurityGuard (priority 10)
# Hooks: before_tool
# Blocks dangerous file paths and shell commands via configurable blacklists.
# Default blacklists cover /etc/passwd, rm -rf /, sudo, etc.
# Config: blacklisted_paths, blacklisted_commands, allowed_dirs
#
# CMDC.Plugin.Builtin.HumanApproval (priority 15)
# Hooks: before_prompt, before_tool
# Emits {:approval_required, ...} event and waits for approve/reject.
# Config: tools (list of tool names requiring approval)
# Also handles extended events: {:tool_approved, id}, {:tool_rejected, id}
#
# CMDC.Plugin.Builtin.EventLogger (priority 50)
# Hooks: session_start, session_end, before_tool, after_tool, after_tool_batch,
# before_request, after_response, before_finish
# Writes JSON Lines log file per session.
# Config: log_dir (default: config.logs_dir)
#
# CMDC.Plugin.Builtin.MemoryLoader (priority 100)
# Hooks: session_start, after_tool
# Loads AGENTS.md files at session start → injects <agent_memory> into prompt.
# Auto-reloads when write_file/edit_file modifies a memory file.
# Config: auto_reload (default true)
#
# CMDC.Plugin.Builtin.PatchToolCalls (priority 120)
# Hooks: before_request
# Detects assistant messages with tool_calls that lack corresponding tool_result
# messages, and synthesizes placeholder responses to prevent LLM errors.
#
# CMDC.Plugin.Builtin.PromptCache (priority 130)
# Hooks: before_request
# Adds cache_control markers for Anthropic prompt caching.
# Automatically skipped for non-Anthropic models.
#
# CMDC.Plugin.Builtin.ModelRouter (priority 300, v0.2 Phase 10D)
# Hooks: before_request
# 按规则在请求前自动切换模型,返回 {:switch_model, new_model, state} action。
# v0.2 adds the business-oriented conditions below (8 condition forms), on top of the 3 runtime conditions:
#
# Runtime (v0.1):
# {:turn_gt, n} — 对话轮次 > n
# {:cost_gt, usd} — 累计成本 > usd
# {:tokens_gt, n} — 累计 tokens > n
#
# Business (v0.2):
# {:token_budget_lt, n} — user_data[:token_budget] - total_tokens < n
# {:task_complexity, v} — user_data[:task_complexity] == v(:simple/:normal/:complex)
# {:task_complexity_in, vs} — user_data[:task_complexity] in vs
# {:time_of_day_in, ranges} — 当前 UTC 小时在任一 Range(如 [22..23, 0..6])
# {:user_tier, tier} — user_data[:user_tier] == tier
# {:user_tier_in, tiers} — user_data[:user_tier] in tiers
# {:user_data, key, value} — user_data[key] == value
# {:user_data, key, op, v} — op ∈ :eq/:gt/:lt/:gte/:lte
#
# Config:
# rules — 从上到下顺序匹配,命中第一条停止
# default_model — (当前未命中规则时保持不变;仅用于初始 current_model 推断)
# now_fn — 默认 &DateTime.utc_now/0,测试可注入
#
# 举例:
#
{CMDC.Plugin.Builtin.ModelRouter,
 [
   default_model: "anthropic:claude-sonnet-4-5",
   # Evaluated top to bottom; the first matching rule wins.
   rules: [
     # runtime: accumulated cost above $0.50
     %{condition: {:cost_gt, 0.5}, model: "openai:gpt-4.1-mini"},
     # business: fewer than 5_000 tokens left in user_data[:token_budget]
     %{condition: {:token_budget_lt, 5_000}, model: "openai:gpt-4.1-mini"},
     # business: user_data[:task_complexity] == :complex
     %{condition: {:task_complexity, :complex}, model: "anthropic:claude-opus-4"},
     # business: current UTC hour is 22-23 or 0-6
     %{condition: {:time_of_day_in, [22..23, 0..6]}, model: "openai:gpt-4.1-mini"},
     # business: user_data[:user_tier] == :free
     %{condition: {:user_tier, :free}, model: "openai:gpt-4.1-mini"},
     # generic: user_data[:region] == "eu-west"
     %{condition: {:user_data, :region, "eu-west"}, model: "mistral:mistral-large"},
     # generic with operator: user_data[:priority] > 5
     %{condition: {:user_data, :priority, :gt, 5}, model: "anthropic:claude-opus-4"}
   ]
 ]}
#
# 命中后 Pipeline 把 model_switch 累加到 acc;Agent Loop 在 apply 时:
# 1. req_llm 用新 model
# 2. EventBus 广播 {:model_switched, %{from, to, reason: :model_router, turn, ts}}
# 3. 下一次 :before_request 时 Plugin 的 current_model 记录为新 model
#
# 健壮性保证:
# - evaluate_condition 内部异常 rescue 成 false(不崩插件)
# - user_data 为 nil / 非 map 时 user_data-based 条件自动 false
# - 未知 condition tuple 返回 false
# CMDC.Plugin.Builtin.Planning (priority 200, v0.3 Phase 11D — ADP Ch6 Planning Pattern)
# Hooks: before_prompt, after_response
# plan-first 规划插件:对较长 prompt 先要 LLM 输出 markdown checklist 作为计划,
# 解析为 %CMDC.Plan{} 并持续注入 system prompt(ADP Ch6 "持续感知"原则)。
#
# Config:
# plan_first — default true;是否在长 prompt 上自动触发规划
# min_prompt_length — default 20;prompt 短于此长度不触发
# max_plan_attempts — default 2;checklist 解析失败时的重试次数(兜底放弃不阻塞)
#
# 流程:
# 1. :before_prompt, text → len(text) >= min_prompt_length && !awaiting_plan && plan == nil
# → {:intervene, directive, %{awaiting_plan: true, last_goal: text}}
# directive 让 LLM 按照 markdown checklist 格式输出步骤。
# 2. :after_response, msg + awaiting_plan == true
# → Plan.from_markdown(msg.content, last_goal)
# → 成功:emit [{:plan_generated, plan}, {:update_system_context, :plan, Plan.to_prompt_section(plan)}]
# → 失败:attempts +1;未到 max_plan_attempts 则重新 intervene;到了则放弃不阻塞
#
# Emits: {:plugin_event, :plan_generated, %CMDC.Plan{}}
# Side-effect: {:update_system_context, :plan, text} → Agent.state.dynamic_context_sections[:plan]
# 下一轮 system prompt 在 base + 用户 prompt + Blueprint + Skills + Memory 之后附加此段
#
# CMDC.Plugin.Builtin.Reflection (priority 400, v0.3 Phase 11D — ADP Ch4+Ch7 Producer-Reviewer)
# Hooks: session_start, before_finish, after_response
# 反思循环插件:Agent 即将完成前触发评审,未通过则注入修改指令让 Agent 继续改进,
# 直到通过或达到 max_reviews。修掉 v0.2 的 iteration-reset 死循环 bug。
#
# Config:
# reviewer_prompt — Reviewer 的人设提示(默认"严格的质量评审员")
# reviewer_model — 可选,SubAgent 模式下用不同模型评审
# reviewer_subagent — default false;true = 启独立 SubAgent(ADP 推荐)
# criteria — 评审维度列表 ["准确性", "完整性", "可读性"]
# max_reviews — default 3;最大 review 轮数(max_iterations 作为历史别名仍生效)
# pass_signal — 通过信号,支持 String.t() 或 %Regex{};默认 "APPROVED"
#
# 状态机(存于 plugin_state):
# :idle ──before_finish──> :reviewing ──after_response(pass)──> :done
# │
# ├─after_response(fail)──> :reviewing (保持)
# └─max_reviews 达到──────> :done(强制通过)
# :session_start 重置 phase=:idle, iteration=0(每个会话独立)
#
# 模式 A · 内联自评(reviewer_subagent: false):
# :before_finish → intervene(review_prompt) → 主 Agent LLM 自评
# :after_response 检查 pass_signal 是否命中 → :done
# 轻量零成本,但 ADP Ch4 警告有"认知偏差"
#
# 模式 B · SubAgent 分离(reviewer_subagent: true,推荐):
# :before_finish → spawn_reviewer_subagent:
# 1. CMDC.Agent.start_link(reviewer_model, reviewer_prompt)
# 2. CMDC.Agent.prompt(reviewer_pid, build_review_prompt(last_reply))
# 3. 阻塞收 {:agent_end, msgs, _usage} 事件拿到 critique
# 4. GenServer.stop(reviewer_pid)
# critique 含 pass_signal → emit :reflection_approved,phase=:done
# critique 不含 → intervene(build_revise_prompt(critique)),iteration+1
# SubAgent 启动失败 / timeout → 自动降级到模式 A(内联自评),永不阻塞主会话
#
# Emits: {:plugin_event, :reflection_approved, %{critique, iteration}}
#
# 典型组合(ADP 书推荐):
# plugins: [
# {CMDC.Plugin.Builtin.Planning, plan_first: true},
# {CMDC.Plugin.Builtin.Reflection, reviewer_subagent: true,
# reviewer_model: "qwen3-max", pass_signal: ~r/APPROVED|已通过/}
# ]
# → "先规划 → 执行 → 独立评审 → 修订 → 交付" 闭环
# --- Plugin.Registry ---
# Manages plugin registration, deduplication, and priority ordering.
# CMDC.Plugin.Registry.from_specs([SecurityGuard, {HumanApproval, tools: ["shell"]}])
# => %Registry{entries: [{SecurityGuard, state}, {HumanApproval, state}]}
################################################################################
# 8A. PLAN — CMDC.Plan (v0.3 Phase 11D — ADP Ch6 Planning Pattern 数据结构)
################################################################################
# Plan 是 Agent 执行规划的显式数据结构。Planning plugin 创建并维护它,Reflection/其他
# plugin/工具也可以读它做决策。@derive Jason.Encoder,可直接序列化传给前端 UI。
#
# struct fields:
# goal :: String.t()
# steps :: [%CMDC.Plan.Step{}]
# status :: :draft | :approved | :in_progress | :completed
# created_at :: DateTime.t()
# approved_at :: DateTime.t() | nil
# metadata :: map() — last_replanned_at 等辅助信息
#
# Step struct fields:
# id :: String.t() # "step-1", "step-2"...
# description :: String.t()
# status :: :pending | :in_progress | :completed | :failed | :skipped
# started_at :: DateTime.t() | nil
# finished_at :: DateTime.t() | nil
# result :: any() # step_completed 时携带
# notes :: String.t() | nil # annotate_step 写入
#
# 构造:
alias CMDC.Plan

plan = Plan.new("部署到生产", ["跑测试", "打镜像", "推到集群"])
# from_markdown parses "- [ ] first step" checklist lines
{:ok, plan} = Plan.from_markdown("- [ ] 调研\n- [x] 实现", "优化冷启动")
#
# Step transitions (railway style: {:ok, plan} | {:error, :not_found}).
# This listing does not rebind `plan`; in real code chain the results (e.g. with `with`).
Plan.step_started(plan, "step-1")
# third argument: the step's result
Plan.step_completed(plan, "step-1", "all 1040 tests green")
Plan.step_failed(plan, "step-2", "docker daemon unreachable")
Plan.step_skipped(plan, "step-3", "已手动跳过")
Plan.annotate_step(plan, "step-1", "花了 14 分钟")
# status → :approved, approved_at = now
Plan.approve(plan)
#
# Replanning (ADP Ch6 adaptability requirement):
# full replan; steps are renumbered from step-1
Plan.replace_steps(plan, ["重写的 step1", "重写的 step2"])
# append; becomes step-N+1 automatically
Plan.add_step(plan, "追加的步骤")
# insert_step inserts after the anchor by default; pass before: true to insert before it
Plan.insert_step(plan, "step-2", "中间新步骤") # => {:ok, plan}
Plan.insert_step(plan, "step-2", "再前面一步", before: true) # => {:ok, plan}
Plan.remove_step(plan, "step-3") # => {:ok, plan} | {:error, :not_found}
# Every replan operation stamps plan.metadata[:last_replanned_at]
#
# Queries / rendering:
Plan.get_step(plan, "step-2")
# first :pending / :in_progress step
Plan.current_step(plan)
Plan.progress(plan)
# => %{total: 3, completed: 1, failed: 0, skipped: 0, pending: 2, in_progress: 0, pct: 33.33}
# true when every step is in a terminal state (completed/failed/skipped)
Plan.all_finished?(plan)
# true when every step is completed
Plan.all_completed?(plan)
Plan.to_markdown(plan) # => "## Goal\n优化冷启动\n\n- [x] step-1 ..."
# => a section ready to append to the system prompt (includes progress)
Plan.to_prompt_section(plan)
################################################################################
# 8B. AGENT DYNAMIC CONTEXT (v0.3 Phase 11D)
################################################################################
# Plugin 可以通过 emit 事件向 Agent 的下一轮 system prompt 注入 / 更新文本段落:
#
# {:emit, {:update_system_context, key, text}, state} # 注入(覆盖 / 新增)
# {:emit, {:update_system_context, key, nil}, state} # 删除 key
#
# 对应 Agent 状态字段:
# CMDC.Agent.State.dynamic_context_sections :: %{atom() => String.t()}
#
# CMDC.Agent.SystemPrompt 在 :full / :task / :minimal 三种模式下都会把这些段落(按 key
# 字典序排序)合并到 system prompt 末尾,位于 BasePrompt + 用户 system_prompt + Blueprint +
# Skills + Memory 之后。
#
# 典型使用者:CMDC.Plugin.Builtin.Planning 每次生成/更新 plan 都 emit
# {:update_system_context, :plan, Plan.to_prompt_section(plan)},让后续所有 LLM 调用都
# 自动感知到最新的 plan 进度——这是 ADP Ch6 "持续感知(continuous perception)"原则的
# 直接实现。
################################################################################
# 9. BLUEPRINT — CMDC.Blueprint
################################################################################
# Blueprint = declarative, reusable Agent configuration.
# @callback build(keyword()) :: CMDC.Options.t()
#
# Struct fields mirror Options: name, model, system_prompt, tools, plugins,
# subagents, skills_dirs, memory, sandbox, provider_opts, max_turns,
# max_tokens, working_dir
# @enforce_keys [:name, :model]
# --- Blueprint via behaviour ---
defmodule MyApp.CodingAgent do
  # Blueprint for an Elixir coding agent with file-editing and shell tools.
  use CMDC.Blueprint

  @default_model "anthropic:claude-sonnet-4-5"
  @tools [CMDC.Tool.ReadFile, CMDC.Tool.WriteFile, CMDC.Tool.EditFile, CMDC.Tool.Shell]

  @impl true
  def build(opts) do
    # A nil or missing :model falls back to the default.
    model = opts[:model] || @default_model

    %CMDC.Options{
      model: model,
      system_prompt: "You are an expert Elixir developer.",
      tools: @tools,
      plugins: [CMDC.Plugin.Builtin.SecurityGuard],
      skills_dirs: ["priv/skills"],
      max_turns: 50
    }
  end
end
anthropic_key = System.get_env("ANTHROPIC_API_KEY")

{:ok, session} =
  CMDC.create_agent(
    blueprint: MyApp.CodingAgent,
    provider_opts: [api_key: anthropic_key]
  )
# --- Blueprint via struct pipeline ---
blueprint =
  %CMDC.Blueprint{name: "coder", model: "anthropic:claude-sonnet-4-5"}
  |> CMDC.Blueprint.add_tools([CMDC.Tool.ReadFile, CMDC.Tool.Shell])
  |> CMDC.Blueprint.add_plugins([CMDC.Plugin.Builtin.SecurityGuard])

# Overrides passed here are applied on top of the blueprint's fields.
opts = CMDC.Blueprint.to_options(blueprint, provider_opts: [api_key: "sk-..."])
{:ok, session} = CMDC.create_agent(opts)
# Struct manipulation:
# Blueprint.override(bp, fields)
# Blueprint.add_tools(bp, tools)
# Blueprint.add_plugins(bp, plugins)
# Blueprint.add_skills_dirs(bp, dirs)
# Blueprint.add_memory(bp, paths)
# Blueprint.add_subagents(bp, subagents)
# Blueprint.to_options(bp, overrides \\ [])
# --- Blueprint.Base (built-in) ---
# Default plugins: SecurityGuard + EventLogger + PatchToolCalls
base_opts = [
  model: "anthropic:claude-sonnet-4-5",
  tools: [CMDC.Tool.ReadFile, CMDC.Tool.Shell],
  extra_plugins: [CMDC.Plugin.Builtin.HumanApproval]
]

opts = CMDC.Blueprint.Base.build(base_opts)
# => [SecurityGuard, EventLogger, PatchToolCalls]
CMDC.Blueprint.Base.default_plugins()
################################################################################
# 10. SUBAGENT — CMDC.SubAgent
################################################################################
# Declarative specification for child agents spawned by Tool.Task.
# @enforce_keys [:name]
#
# @type t :: %CMDC.SubAgent{
# name: String.t(), # required
# description: String.t() | nil, # nil fields inherit from parent
# system_prompt: String.t() | nil,
# model: String.t() | nil,
# tools: [module()] | nil,
# plugins: [plugin_spec()] | nil,
# skills_dirs: [String.t()] | nil,
# user_data: map() | nil # nil = inherit parent; non-nil = full replace
# }
sa =
  CMDC.SubAgent.new!(
    name: "researcher",
    description: "Specialized in web research",
    model: "openai:gpt-4o",
    tools: [CMDC.Tool.Shell, CMDC.Tool.ReadFile]
  )
# SubAgent context isolation:
# INHERITS: working_dir, sandbox, model (if nil), tools (if nil), user_data (if nil)
# DOES NOT INHERIT: messages, todos, memory_contents, tool_call_hashes
# INPUT: single user message (task description)
# OUTPUT: last assistant reply text → returned as ToolMessage to parent
# SubAgent.to_options(subagent, parent_options) → merged Options
# Child Agent runs under SubAgent.Supervisor as :temporary process.
# Crash does NOT affect parent Agent.
# Override per-subagent user_data(替换不合并,常用于切换数据范围/降权操作者)
narrow_scope = %{tenant_id: "acme-1", data_scope: :read_only}

narrow =
  CMDC.SubAgent.new!(
    name: "narrowed",
    description: "Read-only narrow data scope",
    # non-nil user_data replaces the parent's map entirely (no merge)
    user_data: narrow_scope
  )
# 父 Agent 的 user_data 含 :operator_id / :role 等字段在 narrow 子代理中**不可见**。
################################################################################
# 11. SANDBOX — CMDC.Sandbox
################################################################################
# File/command execution abstraction layer.
# @behaviour with 8 callbacks:
#
# @callback read_file(path, opts) :: {:ok, String.t()} | {:error, String.t()}
# @callback write_file(path, content, opts):: :ok | {:error, String.t()}
# @callback edit_file(path, old, new, opts):: {:ok, count} | {:error, :not_found | :not_unique | String.t()}
# @callback list_dir(path, opts) :: {:ok, [dir_entry()]} | {:error, String.t()}
# @callback file_exists?(path, opts) :: boolean()
# @callback grep(pattern, path, opts) :: {:ok, [grep_match()]} | {:error, String.t()}
# @callback glob(pattern, path, opts) :: {:ok, [glob_match()]} | {:error, String.t()}
# @callback execute(command, opts) :: {:ok, String.t()} | {:error, String.t()}
#
# Types:
# @type dir_entry :: %{name: String.t(), type: :file | :directory, size: non_neg_integer() | nil}
# @type grep_match :: %{file: String.t(), line: non_neg_integer(), content: String.t()}
# @type glob_match :: %{path: String.t(), type: :file | :directory}
#
# Common opts keys: :working_dir, :timeout, :offset, :limit
# Default implementation: CMDC.Sandbox.Local (direct OS execution)
################################################################################
# 12. SKILL — CMDC.Skill
################################################################################
# Skills = SKILL.md files with YAML frontmatter metadata.
# Auto-discovered from skills_dirs, injected into system prompt.
#
# @type t :: %CMDC.Skill{
# name: String.t(),
# description: String.t(),
# path: String.t(),
# allowed_tools: [String.t()] | nil,
# content: String.t() | nil
# }
#
# SKILL.md format:
# ---
# name: my-skill
# description: Does something useful
# allowed_tools: [read_file, shell] # optional
# ---
# Skill content here...
skills = CMDC.Skill.discover(["./skills", "~/.cmdc/skills"])
CMDC.Skill.to_prompt_snippet(skills) # generates prompt section
# find/2 returns nil for an unknown name, so check the result before load/1.
skill = CMDC.Skill.find(skills, "my-skill") # => %Skill{} | nil
{:ok, skill} = CMDC.Skill.load(skill) # reads file content
################################################################################
# 13. PROVIDER — CMDC.Provider
################################################################################
# Thin wrapper around req_llm for LLM API calls.
# NOT a behaviour — direct function module.
#
# @type model :: String.t() | map()
#
# Provider.stream(model, messages, tools, opts)
# => {:ok, %{bridge_pid: pid}} | {:error, term()}
# Starts a StreamBridge process that converts ReqLLM.StreamResponse
# into gen_statem messages:
# {:cmdc_stream_chunk, chunk}
# :cmdc_stream_done
# {:cmdc_stream_error, reason}
#
# Provider.convert_messages(messages, provider \\ :openai)
# => [ReqLLM.Message.t()]
#
# Provider.convert_tools(tool_modules)
# => [map()] — JSON Schema tool definitions
################################################################################
# 14. CONFIG — CMDC.Config
################################################################################
# @type provider_entry :: {module(), keyword()}
# @type t :: %CMDC.Config{
# data_dir: String.t(), # default ~/.cmdc
# default_model: String.t(), # default "anthropic:claude-sonnet-4-5"
# default_tools: [module()], # default []
# provider: module() | nil,
# provider_opts: keyword(), # default []
# providers: [provider_entry()] # default []
# }
provider_opts = [api_key: "sk-..."]
config = CMDC.Config.new!(provider_opts: provider_opts)
CMDC.Config.resolve_provider(config, "anthropic:claude-sonnet-4-5")
# Directory helpers (example values assume the default data_dir ~/.cmdc):
CMDC.Config.data_dir(config) # => "~/.cmdc"
CMDC.Config.sessions_dir(config) # => "~/.cmdc/sessions"
CMDC.Config.logs_dir(config) # => "~/.cmdc/logs"
# Creates the data, sessions and logs directories:
CMDC.Config.ensure_dirs!(config)
################################################################################
# 15. MCP INTEGRATION
################################################################################
# Model Context Protocol support via anubis_mcp.
# MCP tools are dynamically wrapped as CMDC.Tool behaviour modules.
# Discovery:
# working_dir: the directory to scan; the current directory is used here as an example.
working_dir = File.cwd!()
servers = CMDC.MCP.Config.discover(working_dir)
# Searches: .cursor/mcp.json, .vscode/mcp.json, mcp.json, etc.
modules = CMDC.MCP.Bridge.discover_tool_modules(servers)
# Each MCP tool → dynamically generated module implementing CMDC.Tool
# Client API:
CMDC.MCP.Client.await_ready("server-name", 5_000)
CMDC.MCP.Client.server_list_tools("server-name")
CMDC.MCP.Client.server_call_tool("server-name", "tool_name", %{"arg" => "val"})
CMDC.MCP.Client.server_list_resources("server-name")
CMDC.MCP.Client.server_read_resource("server-name", "resource://uri")
# Supervisor: bind the started pid so the queries below target it.
# NOTE(review): assumes the conventional OTP start_link return {:ok, pid} — confirm.
# opts: the supervisor's start options (keyword list).
{:ok, supervisor} = CMDC.MCP.Supervisor.start_link(opts)
CMDC.MCP.Supervisor.running_count(supervisor)
CMDC.MCP.Supervisor.running_clients(supervisor)
################################################################################
# 16. AGENT INTERNALS
################################################################################
# --- Agent gen_statem states ---
# :idle → waiting for prompt
# :running → processing prompt, preparing LLM request
# :streaming → receiving LLM stream response
# :executing_tools → running tool calls in parallel
#
# State transitions:
# idle → running (prompt/2) → streaming (Provider.stream) → idle (no tools)
# → executing_tools (has tools) → running (next turn)
# --- Agent.State fields (notable) ---
# session_id, model, working_dir, status, messages (reversed),
# plugins, plugin_states, tools, disabled_tools,
# pending_tool_tasks, tool_results, current_text, current_tool_calls,
# current_thinking, token_usage, turn_count, tool_call_count, cost_usd,
# tool_call_hashes, sandbox, subagents, todos, memory_contents,
# retry_count, max_retries (3), overflow_detected, pending_messages
# --- ToolRunner ---
# Executes tool calls in parallel via Task.async.
# Built-in loop detection: tracks hash of recent tool call sequences.
# Three detection paths:
# 1. Exact duplicate hash in last N calls
# 2. Repeating pattern detection (min_repeat_pattern)
# 3. Consecutive failure threshold (max_consecutive_failures)
#
# Tool retry: on {:error, _}, retries up to :tool_max_retries times (default 0).
# Before each retry, runs {:on_tool_error, name, call_id, error, attempt} pipeline.
# Plugin can return {:continue, state} to allow retry, {:abort, _, state} or {:skip, state} to give up.
# Retry happens inside Task.async — does NOT block other concurrent tools.
# Config keys (in state.config): :tool_max_retries (default 0), :tool_retry_delay_ms (default 500)
#
# ToolRunner.execute_batch(tool_calls, state)
# => {:next_state, :executing_tools, state} — tools dispatched
# => {:next_turn, state} — all results collected, continue
# => {:abort, state} — loop detected or error
# --- Compactor ---
# Auto-triggered when context exceeds token threshold.
# Strategy: usage-based (primary) + character estimation (fallback, chars/4).
# Compactor.maybe_compact(state) => {:compacted, state} | {:skip, state}
# Compactor.force_compact(state) => {:compacted, state} | {:skip, state}
# Compactor.estimate_tokens(state) => non_neg_integer()
#
# ArgTruncator: pre-truncates large tool-call arguments (write_file, edit_file, execute)
# before they are sent to the LLM, preventing context overflow from oversized arguments.
# ArgTruncator.truncate(state) => {state, changed?}
# --- SystemPrompt assembly order ---
# 1. User system_prompt (blueprint_system_prompt)
# 2. BasePrompt (core behavior + tool usage guidelines)
# 3. Blueprint identity (name + purpose)
# 4. Skills section (discovered SKILL.md metadata)
# 5. Memory section (<agent_memory> tags from loaded files)
# 6. Dynamic context sections (State.dynamic_context_sections, sorted by key; v0.3)
# --- Emitter ---
# Emitter.broadcast(state, event) — broadcasts via EventBus
# Emitter.recent(session_id, limit \\ 50) — recent events from ETS ring buffer
# Emitter.clear(session_id) — clear event buffer
################################################################################
# 17. MEMORY — CMDC.Memory.ETS
################################################################################
# ETS-based key-value memory store.
# @type store :: atom()
#
# {:ok, store} = CMDC.Memory.ETS.new(:my_memory)
# CMDC.Memory.ETS.store(store, "key1", %{text: "important fact"})
# {:ok, results} = CMDC.Memory.ETS.search(store, "important")
# {:ok, all} = CMDC.Memory.ETS.list(store)
# CMDC.Memory.ETS.delete(store, "key1")
################################################################################
# 17.5 STEERING — 中段软中断(Mid-Execution Soft Interrupt)
################################################################################
# Steering 是 v0.2 引入的关键能力:用户在 Agent 已开始执行后,无需先 abort 再
# prompt,无需丢弃中间产出,**直接追加新指引**让 Agent 调头。
#
# 与现有 API 的区别:
#
# API 对话历史 执行进度 典型场景
# ---------- ------------ --------------- --------------------------------
# prompt/2 新增 user msg 必须 idle 发送新任务
# abort/1 不变 强制清零→idle 紧急停止(丢失所有中间产出)
# steer/2 新增 user msg 保留中间产出 改方向 / 追加约束 / 修正 LLM 误解
# (前缀 [Steering]) (**核心能力**:不丢弃已完成的工具结果)
#
# 使用建议:
# - 短指令、需要立刻生效:steer/2
# - 需要重置上下文:abort/1 后 prompt/2
# - 需要分支多次实验:换 SubAgent 或 fork session
#
# 详细架构与决策:docs/dev/steering-design.md
#
# --- 4 状态行为表 ---
#
# gen_statem 状态 收到 steer 后行为
# ──────────────── ─────────────────────────────────────────────────────────
# :idle 透传到 prompt 路径(等同 prompt/2)。
# 不入 :steering_queue,仅 emit :steering_applied。
# 返回 :ok,立刻进入 :running。
#
# :running 入 :steering_queue,emit :steering_received status=:queued。
# 下个 turn 的 dispatch 阶段(无新 tool_calls 时)才注入。
# 返回 :ok。
#
# :streaming 入 :steering_queue,emit :steering_received status=:queued。
# **不打断 LLM 流**(避免跨网络/状态污染)。
# stream_done → tool_calls 为空 → 下个 turn 间隙注入。
# 返回 :ok。
#
# :executing_tools 入 :steering_queue,emit :steering_received status=:queued。
# 下次 ToolRunner.collect_result(任意工具完成时)触发
# maybe_apply_steering:
# (a) 把 in-flight 工具按 interrupt_immune_tools 分组;
# (b) 对 killable 组 Task.shutdown(:brutal_kill);
# 每个被杀工具:emit :tool_skipped_for_steering
# + 追加 [Skipped: steering] tool_result
# 防止 LLM 混乱(tool_call 必须配 tool_result);
# (c) immune 组**继续等**自然完成(保护副作用工具);
# (d) 全部清空后 inject_steering:合并 queue → 注入
# [Steering] user message → emit :steering_applied
# → next_turn。
# 返回 :ok。
#
# --- 注入消息格式 ---
#
# 当 queue 为 1 条时:
# [Steering] 改成只看官方 hexdocs,不要第三方教程
#
# 当 queue 为多条(按 queued_at 顺序合并 + 编号)时:
# [Steering] 用户在 Agent 执行中段追加了以下指引:
#
# 1. first hint
# 2. second hint
# 3. third hint
#
# 编号让 LLM 清楚先后顺序,避免在指令冲突时不知所措。
#
# --- Race Condition: steer 与 tool finish 同时 ---
#
# 因 gen_statem 串行处理 mailbox,所有事件按 FIFO 顺序:
#
# 场景 A: steer 先入 mailbox,tool result 后到
# → handle_steer 入 queue, reply :ok
# → 处理 tool result → collect_result → maybe_apply_steering 触发
#
# 场景 B: tool result 先入 mailbox,steer 后到
# → collect_result → maybe_apply_steering(queue 空 → :none)
# → handle_steer 入 queue, reply :ok
# → 等下次 collect_result 触发
#
# 两种场景都正确收敛,无 panic、无丢消息(gen_statem 同步保证)。
#
# --- Plugin 拦截示例 ---
#
defmodule MyApp.Plugins.SteerAuditor do
  @moduledoc "Audits steering messages and rejects those containing blacklisted words."
  @behaviour CMDC.Plugin

  @impl true
  def init(opts) do
    blacklist = Keyword.get(opts, :blacklist, [])
    {:ok, %{blacklist: blacklist}}
  end

  # Lower numbers run earlier (the pipeline sorts by ascending priority).
  @impl true
  def priority, do: 5

  @impl true
  def handle_event({:before_steering, text}, state, _ctx) do
    flagged? = Enum.any?(state.blacklist, fn word -> String.contains?(text, word) end)

    case flagged? do
      # abort rejects the steer call itself
      true -> {:abort, "steering 包含敏感词", state}
      # otherwise broadcast an audit event; the steer proceeds unchanged
      false -> {:emit, {:steering_audit, %{text: text}}, state}
    end
  end

  # All other hooks pass through.
  def handle_event(_event, state, _ctx), do: {:continue, state}
end
# --- 完整示例:用 steer 中途切换查询主题 ---
{:ok, session} =
  CMDC.create_agent(
    model: "anthropic:claude-sonnet-4-5",
    tools: [CMDC.Tool.WebSearch],
    plugins: [CMDC.Plugin.Builtin.SecurityGuard],
    max_steering_queue: 5,
    interrupt_immune_tools: ~w(write_file edit_file shell git_commit notebook_edit ask_user)
  )

CMDC.subscribe(session)

CMDC.prompt(
  session,
  "请并行调用 web_search 三次:分别查 'Erlang OTP'、'Elixir Phoenix'、'BEAM VM'"
)

# Wait for the first tool_execution_start, then change direction right away.
# The wait is bounded: if the model answers without calling any tool, an
# unbounded receive would block this process forever. steer/2 is accepted in
# every agent state (it behaves like prompt/2 when the agent is :idle), so we
# steer either way.
receive do
  {:cmdc_event, _sid, {:tool_execution_start, _name, _call_id, _args}} -> :ok
after
  30_000 -> :no_tool_started
end

# NOTE(review): the state table above says steer/2 returns :ok, while this
# example matches {:ok, ref}; confirm the actual return shape of CMDC.steer/2.
{:ok, _ref} = CMDC.steer(session, "改主意了,只查 'Elixir 1.19 新特性' 一个主题")
# 事件流大致为:
# :steering_received status=:queued ref=#Reference<...>
# :tool_skipped_for_steering name="web_search" call_id="c2" reason=:killed_by_steering
# :tool_skipped_for_steering name="web_search" call_id="c3" reason=:killed_by_steering
# ↑ 第一个 web_search 已完成的不杀,剩下两个被 brutal_kill
# :steering_applied count=1 refs=[ref]
# :request_start ... ← LLM 基于新指引重新规划
# :message_delta "Elixir 1.19 ..."
defmodule SteerLoop do
  @moduledoc """
  Example event loop for the steering walkthrough: prints steering progress
  and streamed text until the agent finishes.
  """

  @doc """
  Consumes `{:cmdc_event, sid, event}` messages from the calling process's
  mailbox and prints steering-related events.

  Returns `:ok` when `:agent_end` arrives, or `:timeout` when no
  `:cmdc_event` message arrives for 120 seconds.
  """
  def run(session) do
    receive do
      {:cmdc_event, _sid, {:steering_received, %{status: status}}} ->
        IO.puts("steering #{status}")
        run(session)

      {:cmdc_event, _sid, {:tool_skipped_for_steering, %{name: n, call_id: c}}} ->
        IO.puts("killed: #{n} (#{c})")
        run(session)

      {:cmdc_event, _sid, {:steering_applied, %{count: n}}} ->
        IO.puts("applied #{n} steering(s), regenerating...")
        run(session)

      {:cmdc_event, _sid, {:message_delta, %{delta: chunk}}} when is_binary(chunk) ->
        IO.write(chunk)
        run(session)

      {:cmdc_event, _sid, {:agent_end, _, _}} ->
        :ok

      # Drain every other event (agent_start, tool_execution_start, ...).
      # Without this clause they stay in the mailbox forever and each later
      # selective receive has to rescan them.
      {:cmdc_event, _sid, _other} ->
        run(session)
    after
      120_000 -> :timeout
    end
  end
end
# Block this process in SteerLoop's receive loop until it returns
# (:ok once :agent_end arrives, or :timeout when its 120 s receive timeout fires).
SteerLoop.run(session)
################################################################################
# 18. GATEWAY — CMDC.Gateway
################################################################################
# Outbound event reporting contract.
# @callback report_event(session_id, event) :: :ok
# @callback report_agent_state(session_id, state_info) :: :ok
#
# Default: CMDC.Gateway.Local — broadcasts via EventBus.
################################################################################
# 19. SESSION SERVER
################################################################################
# Per-session Supervisor tree.
# SessionServer.start_link(opts) — starts Supervisor with Agent + SubAgent.Supervisor
# SessionServer.agent(session_server) — returns Agent pid
# SessionServer.sub_agent_supervisor(session_server) — returns SubAgent.Supervisor pid
# SessionServer.prompt(session_server, text) — delegates to Agent.prompt/2
################################################################################
# 20. KNOWN PITFALLS & GOTCHAS
################################################################################
# 1. TOKEN COUNTING
# Usage data is ONLY available in the last stream chunk (stream_done).
# Do NOT attempt to accumulate tokens during streaming.
# Fallback: character count / 4 (rough estimate).
# 2. SANDBOX PROXY PATTERN
# All file/exec tools MUST check ctx.sandbox:
# when not is_nil(sandbox) → sandbox.read_file(...)
# else → CMDC.Sandbox.Local.read_file(...)
# SecurityGuard handles policy; Sandbox handles execution.
# Avoid double-validating paths in both layers.
# 3. SUBAGENT ISOLATION
# Child inherits: working_dir, sandbox; plus model, tools and user_data only when
# the child spec leaves them nil (a non-nil value in the child spec wins).
# Child does NOT inherit: messages, todos, memory_contents, tool_call_hashes
# Child crash → {:error, ...} returned to parent ToolRunner (no restart, :temporary)
# user_data override: SubAgent.user_data 非 nil 时**整张替换**父级,**不**做 map merge。
# 如果你希望子代理在父级 user_data 基础上叠加几个字段,必须在 SubAgent
# spec 里手动 Map.merge(parent_user_data, child_extra) 后传入。
# 4. MEMORY PLUGIN WRITE-BACK
# MemoryLoader only LOADS and INJECTS memory into prompts.
# Writing back is done by the Agent using edit_file tool naturally.
# Do NOT auto-write-back in the plugin layer.
# 5. EVENTBUS IS THE ONLY EXTERNAL CONTRACT
# Never use send(pid, msg) to bypass EventBus.
# All external communication goes through Emitter.emit → EventBus.broadcast.
# 6. PLUGIN PRIORITY ORDERING
# Lower number = runs first.
# SecurityGuard (10) → HumanApproval (15) → EventLogger (50)
# → MemoryLoader (100) → PatchToolCalls (120) → PromptCache (130)
# Custom plugins: use 0-9 for "must run first", 200+ for "run after builtins".
# 7. MESSAGE LIST IS REVERSED
# Agent.State.messages stores messages in reverse chronological order.
# Use Enum.reverse(state.messages) when reading conversation history.
# 8. NIMBLEOPTIONS VALIDATION
# CMDC.Options.new!/1 raises on invalid input.
# CMDC.Options.new/1 returns {:ok, t} | {:error, ValidationError}.
# The error message is human-readable and includes field name + expected type.
# 9. COLLECT_REPLY SUBSCRIBES INTERNALLY
# collect_reply/2 subscribes to EventBus and unsubscribes after.
# If you're already subscribed, you may receive duplicate events.
# Best practice: use EITHER subscribe+receive OR collect_reply, not both.
# 10. BLUEPRINT VS OPTIONS PRECEDENCE
# When using blueprint: module, the blueprint's build/1 output provides
# base options. Keyword args passed to create_agent override blueprint values.
# Blueprint is evaluated first, then overrides are merged.
# 11. STREAMING MESSAGE PROTOCOL
# Agent gen_statem receives:
# {:cmdc_stream_chunk, chunk} — from StreamBridge
# :cmdc_stream_done — from StreamBridge
# {:cmdc_stream_error, reason} — from StreamBridge
# These are internal; external consumers use EventBus events.
# 12. TOOL RESULT SIZE
# Shell tool auto-saves large output (>10KB) to temp file.
# ArgTruncator truncates large write_file/edit_file args in history
# to prevent context overflow.
# 13. MAX_TURNS PROTECTION
# When turn_count reaches max_turns, Agent auto-stops with
# {:error, :max_turns_reached}. Default is 100.
# Set max_turns: nil in Options for unlimited (use with caution).
# 14. PROMPT QUEUEING
# If Agent is busy (streaming/executing_tools), prompt/2 queues the message.
# Returns %{queued: true}. Message processed after current task completes.
# 15. STEER vs PROMPT vs ABORT
# - 改方向:用 steer/2(保留中间产出,注入 [Steering] user message)
# - 全新任务:等 idle 后 prompt/2,或先 abort/1 + prompt/2
# - 紧急停止:abort/1(清空进度,回到 idle)
# steer/2 在 :idle 状态自动等同 prompt/2,无需调用方判断状态。
# interrupt_immune_tools 默认覆盖 6 个写/副作用工具——传 [] 可彻底关闭白名单
# (所有工具都可被 brutal_kill),但**强烈不推荐**:会丢失正在写的文件状态。
# 16. USER_DATA IS PASSTHROUGH ONLY
# CMDC 不解析、不校验、不修改 ctx.user_data 的内容;只做不可变透传。
# - **不要**用 user_data 携带运行时可变状态(CMDC 在 turn 间不会同步更新)。
# - **不要**把巨大的对象塞进去(每次 Tool/Plugin 都会拷贝整张 map)。
# - 推荐:tenant_id / operator_id / role / data_scope 等几十字节身份字段。
# - 业务侧需要根据 user_data 鉴权,请在自己的 Tool.execute 或自定义 Plugin
# 的 :before_tool 钩子里实现,CMDC 不会替你做这层判断。
# 17. RESTORING MESSAGES VIA :messages
# create_agent(messages: [...]) 灌入历史时:
# - **不要**在 messages 里放 :system 消息——SystemPrompt.build 会拼自己的 system,
# 否则 LLM 会同时看到两条 system 消息(行为未定义)。
# - 元素必须是 %CMDC.Message{},**不接受裸 map**(Options.new! 会抛 ValidationError)。
# 正确做法:CMDC.Message.user/1 / .assistant/3 / .tool_result/3 工厂构造。
# - CMDC 不会静默截断长 messages list;超长会一次性把所有 prompt_tokens 全付掉。
# 灌入前请按业务需要 summarize 或保留最近 N 轮(典型阈值:保留最近 20-40 条)。
# - 每个 tool_call 都必须有对应的 tool_result **一一配对**(否则 Repair.ensure_tool_results
# 会注入占位"[Tool result missing]",导致意外行为)。如果灌入的是半截 turn,建议截断到
# 上一个完整的 user / assistant 边界。
# - 灌入后 turn_count 仍从 0 开始计——max_turns 控制的是后续真实轮次,
# 不会把历史 message 数算在内。
# 18. APPROVE/REJECT AUTO-RESUME 行为差异
# CMDC.approve(session, id) 默认 auto_resume: true
# CMDC.reject(session, id) 默认 auto_resume: false
# 这是有意为之的非对称默认值:
# - approve 的语义是"放行后让 Agent 继续",几乎所有场景都希望自动续 turn
# —— v0.1.x 的 sleep+re-prompt hack 已经废弃,请用单行 approve。
# - reject 的语义是"拒绝这次执行",多数场景下让 Agent 留在 idle 等用户进一步指令更安全;
# 如果硬塞一个 turn,LLM 可能还会反复尝试同一个工具(HumanApproval 又会再拦截)造成死循环。
# 只有当业务场景明确希望"被拒后让 LLM 走另一条规划分支"时,才传 reject(..., auto_resume: true)。
# - auto_resume 仅在 Agent 当前处于 :idle 时才真正触发;其他状态(:running / :streaming /
# :executing_tools)下 auto_resume 选项不生效(no-op),不会重复 run_turn。
# - 触发时广播 {:agent_resumed, %{trigger, approval_id}},trigger 反映来源
# (:tool_approved / :tool_rejected / :tool_approval_timeout)。集成方可根据 approval_id
# 做精准跟踪:知道是"哪一次审批触发的"续 turn。
# - approval timeout(HumanApproval Plugin 的 timeout_ms 触发)默认也走 auto_resume:
# 与 approve 同样的默认值,避免审批"超时但 Agent 永远卡在 idle"的灵异状况。
# 19. SESSION 入参 — pid 与 session_id 字符串互通
# v0.2 起所有 CMDC.* 公开 API 同时接受 pid 和 session_id 字符串,背后通过
# CMDC.SessionRegistry(unique :via 注册)反查。
# - 字符串 → pid 反查不命中时,prompt/status/abort/steer/approve/reject/stop/agent_pid
# 会 raise ArgumentError,**不会**返回 :error。这是有意为之,避免 silent failure。
# - subscribe / unsubscribe / user_respond 三个例外不查注册表:
# * subscribe(string) 即使 session 还没启动也合法(用户可以提前订阅以拦截 :agent_start)
# * unsubscribe(string) 同样直接按 session_id 取消订阅,不验证 session 存活
# * user_respond(string, ref, response) 直接 broadcast,不验证 session 存活
# - 同一 session_id 重复 create_agent 会返回 {:error, {:already_started, pid}}
# —— 这是 Registry :unique 的语义保证,可以安全用作幂等 create-or-resume 模式。
# - 不要混用 pid 与 string 做 hash key — 进程重启(restart)后 pid 会变(GC 不会改变 pid),string 是稳定 ID。
# 建议跨进程/跨节点的状态存 session_id 字符串,运行期热路径直接传 pid。
# - 反查辅助 CMDC.SessionServer.whereis(session_id) 在未命中时返回 nil(不抛异常),
# 适合做"session 还活着吗"的轻量探针。
################################################################################
# 21. COMPLETE EXAMPLE — Full Agent Integration
################################################################################
# --- Setup ---
# Tools the agent may call: file reading/writing/editing, shell, and search.
tools = [
  CMDC.Tool.ReadFile,
  CMDC.Tool.WriteFile,
  CMDC.Tool.EditFile,
  CMDC.Tool.Shell,
  CMDC.Tool.Grep,
  CMDC.Tool.Glob
]

# Plugin stack; HumanApproval gates only the "shell" tool here.
plugins = [
  CMDC.Plugin.Builtin.SecurityGuard,
  CMDC.Plugin.Builtin.EventLogger,
  {CMDC.Plugin.Builtin.HumanApproval, tools: ["shell"]},
  CMDC.Plugin.Builtin.MemoryLoader,
  CMDC.Plugin.Builtin.PatchToolCalls
]

api_key = System.get_env("ANTHROPIC_API_KEY")

{:ok, session} =
  CMDC.create_agent(
    model: "anthropic:claude-sonnet-4-5",
    tools: tools,
    plugins: plugins,
    system_prompt: "You are a senior Elixir developer. Follow OTP conventions.",
    working_dir: "/path/to/project",
    skills_dirs: ["./skills"],
    memory: ["AGENTS.md"],
    provider_opts: [api_key: api_key],
    max_turns: 50
  )

# --- Subscribe this process to the session's events ---
CMDC.subscribe(session)

# --- Kick off the task ---
CMDC.prompt(session, "Refactor the User module to use typed structs")
# --- Event loop ---
defmodule EventLoop do
  @moduledoc """
  Example event loop for the full-integration walkthrough: streams text,
  announces tool calls, auto-approves gated tools and reports the outcome.
  """

  @doc """
  Consumes `{:cmdc_event, sid, event}` messages from the calling process's
  mailbox until the agent ends, reports an error, or no event arrives for
  120 seconds. Each terminal branch prints a line and returns `:ok`.
  """
  def run(session) do
    receive do
      {:cmdc_event, _sid, {:message_delta, %{delta: chunk}}} when is_binary(chunk) ->
        IO.write(chunk)
        run(session)

      {:cmdc_event, _sid, {:tool_execution_start, name, _call_id, _args}} ->
        IO.puts("\n⚙ #{name}")
        run(session)

      {:cmdc_event, _sid, {:approval_required, %{id: ref, tool: tool, args: args}}} ->
        IO.puts("\n🔒 Approve #{tool}? #{inspect(args)}")
        # auto_resume defaults to true: no sleep + re-prompt is needed, the
        # agent resumes its turn on its own after approve/2.
        CMDC.approve(session, ref)
        run(session)

      {:cmdc_event, _sid, {:agent_end, _msgs, usage}} ->
        IO.puts("\n✓ Done. Tokens: #{inspect(usage)}")

      # Repeating `sid` makes this clause match only when the error's embedded
      # id equals the envelope id. `^sid` would not compile here because `sid`
      # is not bound before the pattern.
      {:cmdc_event, sid, {:error, sid, reason}} ->
        IO.puts("\n✗ Error: #{inspect(reason)}")

      # Drain all other events so they do not accumulate in the mailbox.
      {:cmdc_event, _sid, _other} ->
        run(session)
    after
      120_000 -> IO.puts("Timeout")
    end
  end
end
# Run the event loop. The `after` clause guarantees the session is stopped
# even if the loop raises, so the SessionServer is never leaked.
try do
  EventLoop.run(session)
after
  # --- Cleanup ---
  CMDC.stop(session)
end