# 常见配方
端到端组合范例。每个配方都可独立运行(mock provider 或真实 LLM 二选一)。
---
## 1. 流式 UI(SSE 推送)
让前端浏览器实时看到每个 token:
```elixir
defmodule MyApp.AgentSSE do
use Plug.Router
plug :match
plug :dispatch
get "/agents/:id/stream" do
conn = send_chunked(conn, 200)
{:ok, session} = CMDC.create_agent(
session_id: id,
model: "anthropic:claude-sonnet-4-5"
)
CMDC.subscribe(session)
CMDC.prompt(session, conn.params["q"])
stream_loop(conn, session)
end
defp stream_loop(conn, session) do
receive do
{:cmdc_event, _sid, {:message_delta, %{delta: text}}} ->
{:ok, conn} = chunk(conn, "data: #{Jason.encode!(%{delta: text})}\n\n")
stream_loop(conn, session)
{:cmdc_event, _sid, {:agent_end, _msgs, usage}} ->
chunk(conn, "data: #{Jason.encode!(%{done: true, usage: usage})}\n\n")
CMDC.stop(session)
conn
{:cmdc_event, _sid, {:tool_execution_start, name, _, _}} ->
chunk(conn, "data: #{Jason.encode!(%{tool: name})}\n\n")
stream_loop(conn, session)
after
30_000 ->
CMDC.abort(session, reason: "timeout")
conn
end
end
end
```
生产环境推荐直接用 `cmdc_gateway` 的 SSE / WebSocket 端点,不用自己撸。
---
## 2. HITL 审批(CLI 终端确认)
```elixir
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
tools: [CMDC.Tool.Shell, CMDC.Tool.WriteFile],
plugins: [
{CMDC.Plugin.Builtin.HumanApproval, [
tools: ["shell", "write_file"],
permission_options: [:approve_once, :approve_always, :reject_once]
]}
]
)
CMDC.subscribe(session)
CMDC.prompt(session, "在 /tmp 创建一个 Elixir Hello World 项目")
handle_loop = fn loop ->
receive do
{:cmdc_event, _sid, {:approval_required, %{id: id, tool: tool, args: args}}} ->
IO.puts("\n[Agent 想跑] #{tool} #{inspect(args)}")
IO.write("(o)nce / (a)lways / (r)eject ? ")
case IO.gets("") |> String.trim() do
"o" -> CMDC.approve(session, id)
"a" -> CMDC.approve(session, id, kind: :approve_always)
_ -> CMDC.reject(session, id)
end
loop.(loop)
{:cmdc_event, _sid, {:agent_end, _, _}} ->
:done
end
end
handle_loop.(handle_loop)
```
`approve_always` 把 `{tool, command_family}` 加入 session 白名单,下次同类
工具调用直接放行。
---
## 3. 长会话不失忆(MemoryLoader + MemoryFlush 闭环)
```elixir
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
working_dir: "/tmp/long-chat",
plugins: [
# 启动时加载 AGENTS.md / MEMORY.md 到 system prompt
{CMDC.Plugin.Builtin.MemoryLoader, [
files: ["AGENTS.md", "MEMORY.md"]
]},
# 压缩前把待丢消息里的关键事实持久化
{CMDC.Plugin.Builtin.MemoryFlush, [
file: "MEMORY.md",
max_facts_per_flush: 10,
dedupe: true
]}
],
compactor: [
trigger: {:tokens, 50_000},
keep: {:messages, 10}
]
)
# 第 1 次会话:记下用户偏好
CMDC.prompt(session, "我喜欢用 Phoenix LiveView 而不是 React 写前端,记住。")
{:ok, _} = CMDC.collect_reply(session)
# 跑很多轮触发 compact,期间 MemoryFlush 把"用户偏好 LiveView"写进 MEMORY.md
# ...
# 同 session_id 重新打开(或 BEAM 重启后)
{:ok, session2} = CMDC.create_agent(
session_id: session.id,
...
)
# MemoryLoader 自动加载 MEMORY.md,新会话依然记得偏好
CMDC.prompt(session2, "帮我写个登录页")
```
详细示例见 [`CMDC.Plugin.Builtin.MemoryFlush`](CMDC.Plugin.Builtin.MemoryFlush.html)。
---
## 4. 多 Agent 协作(Task 工具派发)
父 Agent 通过 `Tool.Task` 派发子任务:
```elixir
{:ok, parent} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
tools: [CMDC.Tool.Task],
subagents: [
%CMDC.SubAgent{
name: "researcher",
description: "搜集资料的专家",
system_prompt: "你只负责搜集和总结资料,不做评论",
tools: [CMDC.Tool.Grep, CMDC.Tool.ReadFile]
},
%CMDC.SubAgent{
name: "writer",
description: "把资料整理成文章的作家",
system_prompt: "把输入的资料写成结构化的中文报告",
prompt_mode: :task # 子代理用精简 system prompt 省 token
}
]
)
CMDC.prompt(parent, """
帮我做一份"Elixir 在 AI Agent 领域应用"的调研报告:
1. 让 researcher 搜集本仓库 lib/ 下的相关代码
2. 让 writer 整合成 1500 字中文文章
""")
{:ok, report} = CMDC.collect_reply(parent)
```
子代理跑在独立 OTP 进程里,crash 不传染父进程。需要 DAG 编排(debate /
hierarchy / router-llm)见 `cmdc_orchestrator` 子库。
---
## 5. 中段干预(Steering)
发现方向不对中途换路:
```elixir
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
tools: [CMDC.Tool.Shell, CMDC.Tool.WriteFile]
)
CMDC.subscribe(session)
CMDC.prompt(session, "用 Python 实现一个简单的 Web 爬虫,目标 example.com")
# 等了几秒发现想换语言
Process.sleep(3_000)
ref = make_ref()
CMDC.steer(session, ref, """
打住——改用 Elixir 的 Req + Floki 实现,别用 Python。
""")
# 等待新方向的回复
{:ok, reply} = CMDC.collect_reply(session)
```
正在运行的 killable 工具被 brutal_kill,下次 LLM 调用看到合并后的 steering
prompt。Plugin 可在 `:before_steering` hook 拦截恶意 steering。
---
## 6. 跨进程恢复(Checkpoint Facade,v0.5 推荐)
```elixir
# 第一段:正常对话
{:ok, session} = CMDC.create_agent(
session_id: "s-001",
model: "anthropic:claude-sonnet-4-5"
)
CMDC.prompt(session, "帮我设计一个 Elixir Agent 框架,先列大纲。")
{:ok, _} = CMDC.collect_reply(session)
# v0.5 新增 facade:抓快照
{:ok, snapshot} = CMDC.checkpoint!(session,
backend: CMDC.Checkpoint.Backend.DETS,
label: "after-outline"
)
CMDC.stop(session)
# 第二段:BEAM 重启后 / 换台机器,从 snapshot 恢复
{:ok, snapshot} = CMDC.Checkpoint.load("s-001",
backend: CMDC.Checkpoint.Backend.DETS
)
# v0.5 新增 facade:一行恢复 + 续 prompt
{:ok, restored} = CMDC.resume_session!(snapshot,
session_id: "s-001-resumed" # 避免 SessionRegistry 冲突,可选
)
CMDC.prompt(restored, "继续,第 2 章详细展开。")
{:ok, reply} = CMDC.collect_reply(restored)
```
**序列化策略**:保留 messages / tools / plugins(模块名) / user_data / prompt_mode 等可重建核心状态;
plugin_states 不进 snapshot,resume 后 plugin 通过 `:session_start` hook 自动重建。
**接 Cloak 加密**:
```elixir
# write path
{:ok, snap} = CMDC.checkpoint!(session)
encrypted = CMDC.Checkpoint.Snapshot.redact(snap, &MyApp.Vault.encrypt/1)
CMDC.Checkpoint.Backend.DETS.save("s-001", encrypted, [])
# read path
{:ok, snap} = CMDC.Checkpoint.load("s-001", backend: CMDC.Checkpoint.Backend.DETS)
decrypted = CMDC.Checkpoint.Snapshot.redact(snap, &MyApp.Vault.decrypt/1)
{:ok, restored} = CMDC.resume_session!(decrypted, session_id: "s-001-resumed")
```
DETS backend 持久化到本地文件;生产推荐 PG backend(在 `cmdc_memory_pg` 子库提供)。
---
## 6.1 自动定时存档(AutoCheckpoint Plugin,v0.5 推荐)
避免每个集成方都自己写"每 N turn 自动 save"+"老 snapshot GC"逻辑:
```elixir
{:ok, session} = CMDC.create_agent(
session_id: "s-prod",
model: "anthropic:claude-sonnet-4-5",
plugins: [
{CMDC.Plugin.Builtin.AutoCheckpoint,
backend: CMDC.Checkpoint.Backend.DETS, # 生产推 PG
every_n_turns: 10,
on_tools: ["shell", "edit_file"], # 关键工具后强制存档
on_events: [:approval_required, :session_end],
max_checkpoints: 50, # 同 session 上限
ttl_seconds: 30 * 86_400} # 30 天自动清理
]
)
# Plugin 自动按 OR 触发:任一条件命中即异步 save
# save + GC 走 CMDC.AsyncTaskSupervisor 异步,不阻塞 gen_statem callback
# snapshot.metadata 自动写 %{trigger: :auto, plugin: __MODULE__, turn: N}
# 订阅自动 save 事件做审计
CMDC.subscribe(session)
receive do
{:cmdc_event, _, {:auto_checkpoint_saved, %{checkpoint_id: id, trigger: t}}} ->
Logger.info("auto checkpoint saved: #{id} (trigger: #{t})")
end
```
---
## 6.2 长会话多租户 Hibernate(v0.5 新功能)
如果你的 Agent 是 1500+ idle 常驻多租户场景,单进程 heap ~8KB 累积内存爆炸。
新增 `:hibernate_after_ms` 配置后 idle 超时自动 hibernate,heap 8KB → 1.5KB(~80% 节省):
```elixir
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
hibernate_after_ms: 60_000 # 1 分钟无事件自动 hibernate
)
```
**推荐取值**:
| 场景 | 取值 |
|---|---|
| 短会话 / 单 Agent | `nil`(默认,不启用) |
| 长会话 / 主 Agent 多租户常驻 | `60_000` |
| 极端多租户(>1000 idle) | `30_000` |
| SubAgent 短任务 | `nil`(很快结束 hibernate 无收益) |
走 `:gen_statem` 原生 `:hibernate_after` 选项,所有状态(idle / running / streaming / executing_tools)
下都会触发。配套 telemetry `[:cmdc, :agent, :hibernate, :configured]` 在 Agent init 时 emit 一次,
可观测有多少 session 启用了 hibernate。
---
## 6.3 多租户 Provider Profile(Provider Registry,v0.5.1+)
多租户 SaaS 场景下,每个租户有自己的 LLM provider api_key / base_url。
v0.5.1 新增 `CMDC.Provider.Registry` 替代 per-Agent `provider_opts` 拼装:
```elixir
# ─────────────────────────────────────────────────────────────
# 启动时灌入:业务持久层 → cmdc Registry
# ─────────────────────────────────────────────────────────────
defmodule MyApp.ProviderProfileStore do
@moduledoc "持久层(DB)维护所有租户 profile,启动时全量灌入 cmdc Registry。"
use GenServer
def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
@impl true
def init(_) do
# 从 DB / 配置文件加载所有 profile(业务层持久化)
profiles = MyApp.Repo.all(MyApp.AI.Profile)
Enum.each(profiles, fn p ->
:ok =
CMDC.Provider.Registry.register(p.name,
provider: p.provider,
opts: [api_key: MyApp.Vault.decrypt(p.api_key), base_url: p.base_url]
)
end)
{:ok, %{}}
end
end
# ─────────────────────────────────────────────────────────────
# Blueprint 用字符串协议,一行替代 200µs/Agent opts 拼装
# ─────────────────────────────────────────────────────────────
defmodule MyApp.AI.TenantAgent do
@behaviour CMDC.Blueprint
def build(opts) do
tenant_id = Keyword.fetch!(opts, :tenant_id)
model_id = Keyword.fetch!(opts, :model_id)
CMDC.Options.new!(
# registry: 字符串协议 — Agent.init 自动解析 + 透写 provider_opts
model: "registry:#{tenant_id}-anthropic:#{model_id}",
tools: [CMDC.Tool.ReadFile, CMDC.Tool.Shell],
working_dir: opts[:working_dir] || File.cwd!()
)
end
end
# ─────────────────────────────────────────────────────────────
# 跨节点同步:自实现 Broadcaster 接 Phoenix.PubSub(生产推荐)
# ─────────────────────────────────────────────────────────────
defmodule MyApp.ProviderRegistryBroadcaster do
@behaviour CMDC.Provider.Registry.Broadcaster
@topic "cmdc:provider_registry"
# 可选:Registry init 时调用一次
def setup(_opts) do
Phoenix.PubSub.subscribe(MyApp.PubSub, @topic)
:ok
end
@impl true
def broadcast(event) do
# 派发给所有订阅节点,远端节点 PubSub handler 调
# send(GenServer.whereis(CMDC.Provider.Registry), {:cmdc_registry_remote, event})
Phoenix.PubSub.broadcast(MyApp.PubSub, @topic, {:cmdc_registry_remote, event})
end
end
# config/config.exs
config :cmdc, CMDC.Provider.Registry,
broadcaster: MyApp.ProviderRegistryBroadcaster
# ─────────────────────────────────────────────────────────────
# Checkpoint resume 容错:profile 缺失自动重试
# ─────────────────────────────────────────────────────────────
defp resume_with_retry(snap, retries \\ 1) do
case CMDC.resume_session!(snap) do
{:ok, session} ->
{:ok, session}
{:error, {:registry_profile_missing, name}} when retries > 0 ->
# 重新从 DB 加载 profile(可能其它节点已删/迁移)
:ok = reload_profile_from_db(name)
resume_with_retry(snap, retries - 1)
{:error, _} = err ->
err
end
end
```
**Profile name 约束**:不能含 `:` 字符(解析时被吃掉一段)。
Studio 现有 `"tenant-#{tid}-#{provider}"` 格式继续可用;UUID 含 `:` 时替换为 `-`。
**性能**:`Registry.lookup/1` ~0.5-1 µs ETS 读,比 per-Agent `provider_opts` 拼装快 200 倍。
完整设计见 `docs/dev/rfc/2026-05-provider-registry.md`(含 4 设计问题答疑 + 6 处评审订正照搬清单)。
---
## 7. 接 Langfuse / LangSmith / Tempo(Telemetry,v0.5.1 含 18 事件)
CMDC 只发 `:telemetry` 标准事件,sink 由你挂:
```elixir
defmodule MyApp.LangfuseSink do
def attach do
:telemetry.attach_many(
"cmdc-langfuse",
CMDC.Telemetry.all_events(), # v0.5.1 = 18 events (6 v0.4 + 10 v0.5 + 2 v0.5.1)
&__MODULE__.handle_event/4,
%{api_key: System.get_env("LANGFUSE_API_KEY")}
)
end
# --- v0.4 core 6 events ---
def handle_event([:cmdc, :llm, :request, :stop], measurements, metadata, config) do
Langfuse.create_generation(%{
session_id: metadata.session_id,
model: metadata.model,
input_tokens: metadata.tokens_in,
output_tokens: metadata.tokens_out,
latency_ms: measurements.duration_ms,
api_key: config.api_key
})
end
def handle_event([:cmdc, :tool, :exec, :stop], measurements, metadata, config) do
Langfuse.create_span(%{
session_id: metadata.session_id,
name: "tool:#{metadata.tool}",
duration_ms: measurements.duration_ms,
error: metadata.error?,
api_key: config.api_key
})
end
# --- v0.5 new 10 events ---
# Plugin Pipeline 整体跑完一次(hook 粒度,非 per-plugin),看 plugin 拖累
def handle_event([:cmdc, :plugin, :pipeline, :stop], m, meta, _) do
Metrics.histogram("cmdc.plugin.pipeline.duration_ms", m.duration_ms,
tags: [hook: meta.hook, halted_by: meta.halted_by])
end
# Plugin 异常被 Pipeline rescue(含 plugin name / hook / reason)
def handle_event([:cmdc, :plugin, :crash], _m, meta, _) do
Sentry.capture_message("Plugin crash: #{inspect(meta.plugin)}",
level: "warning",
extra: %{hook: meta.hook, reason: meta.reason, session: meta.session_id})
end
# Compactor 压缩前后(看上下文压缩频率 + 压缩率)
def handle_event([:cmdc, :compactor, :run, :stop], m, meta, _) do
Metrics.gauge("cmdc.compactor.ratio", m.ratio,
tags: [session: meta.session_id, strategy: meta.strategy])
end
# Checkpoint save/load 完成(snapshot_bytes / hit?)
def handle_event([:cmdc, :checkpoint, :save, :stop], m, meta, _) do
Metrics.histogram("cmdc.checkpoint.save.bytes", m.snapshot_bytes,
tags: [backend: meta.backend, label: meta.label])
end
def handle_event([:cmdc, :checkpoint, :load, :stop], m, meta, _) do
Metrics.counter("cmdc.checkpoint.load",
tags: [backend: meta.backend, hit: m.hit?, outcome: meta.outcome])
end
# SubAgent 启停(看子代理生命周期)
def handle_event([:cmdc, :subagent, :spawn, :start], _, meta, _) do
Logger.info("subagent spawned: #{meta.name}",
parent: meta.parent_session_id, sub: meta.sub_session_id)
end
# Hibernate 配置(init 时一次,看启用率)
def handle_event([:cmdc, :agent, :hibernate, :configured], m, meta, _) do
Metrics.gauge("cmdc.hibernate.enabled",
if(meta.enabled?, do: 1, else: 0),
tags: [session: meta.session_id, ms: m.ms])
end
def handle_event(_, _, _, _), do: :ok
end
# 在 application.ex 或合适入口处
MyApp.LangfuseSink.attach()
```
16 事件完整清单见 [`CMDC.Telemetry`](CMDC.Telemetry.html)。
> **设计说明**:v0.5 新 10 事件**走 `:telemetry.execute` 直接埋点,不经 EventBus 桥接**,
> 避免 per-session 业务订阅者噪音(它们是 system-wide 可观测性事件)。
> EventBus `:tool_execution_metrics` 业务事件保留给 Plugin / UI 用。
---
## 8. 大工具结果 0 token 占用
让一个 `shell` 工具返回 200KB 的 SQL 结果不再炸 LLM 上下文:
```elixir
backend = CMDC.Backend.Filesystem.new(
root_dir: "/tmp/cmdc-results",
virtual_mode: true
)
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
tools: [CMDC.Tool.Shell, CMDC.Tool.ReadFile],
plugins: [
{CMDC.Plugin.Builtin.LargeResultOffload, [
backend: backend,
tool_token_limit_before_evict: 20_000 # ≈ 80KB chars
]}
]
)
CMDC.prompt(session, """
跑 `cat /var/log/nginx/access.log`,从输出里找 5xx 错误的 path 分布。
""")
```
工具返回 200KB 时 plugin 自动写到 backend `/large_tool_results/<call_id>`,
LLM 看到的是 head + tail preview + 引导:"如需完整内容用
`read_file(path, offset, limit)` 分页"。
---
## 9. 内容安全双层防御
denylist 拦显式 + LLM-judge 拦语义:
```elixir
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
plugins: [
# L1 显式词拦截(快)
{MyApp.SensitiveContentGuard, words: ["身份证号", "护照号", "信用卡"]},
# L2 LLM-as-Judge(深度)
{CMDC.Plugin.Builtin.ContentPolicy, [
judge_model: "openai:gpt-4o-mini",
judge_provider_opts: [temperature: 0.0],
brand_keywords: ["MyProduct"]
]}
]
)
CMDC.prompt(session, "...")
```
---
## 10. 端到端模板:客服 Agent
把上面单点配方组合成一个完整 demo(多租户 + 工具 + 审批 + checkpoint +
telemetry):
```elixir
defmodule MyApp.SupportAgent do
@behaviour CMDC.Blueprint
@impl true
def build(opts) do
tenant_id = Keyword.fetch!(opts, :tenant_id)
%CMDC.Options{
model: "anthropic:claude-sonnet-4-5",
system_prompt: "你是 Acme 公司的客服 agent,只回答与产品相关的问题。",
working_dir: "/tmp/support/#{tenant_id}",
user_data: %{tenant_id: tenant_id, user_tier: opts[:user_tier]},
tools: [
MyApp.QueryDB,
MyApp.SearchKnowledgeBase,
MyApp.RefundOrder,
CMDC.Tool.AskUser
],
plugins: [
# 安全
{CMDC.Plugin.Builtin.HumanApproval, [tools: ["refund_order"]]},
{CMDC.Plugin.Builtin.ContentPolicy, []},
# 优化
{CMDC.Plugin.Builtin.MemoryLoader, [files: ["KB.md"]]},
{CMDC.Plugin.Builtin.ModelRouter, [
rules: [
%{condition: {:user_tier, :free}, model: "openai:gpt-4o-mini"},
%{condition: {:cost_gt, 0.10}, model: "openai:gpt-4o-mini"}
]
]},
# 监控
{CMDC.Plugin.Builtin.CostGuard, [max_usd: 0.50]},
CMDC.Plugin.Builtin.EventLogger
],
compactor: [trigger: {:tokens, 50_000}, keep: {:messages, 10}],
event_buffer_size: 200 # 支持前端断网补帧
}
end
end
# 启动一个 session
{:ok, session} = CMDC.create_agent(MyApp.SupportAgent,
tenant_id: "acme",
user_tier: :pro
)
```
---
## 下一步
- [升级指南](upgrading.html) — v0.2 → v0.3 → v0.4 用户感知到的兼容边界