# Cookbook

End-to-end recipes that combine several features. Each recipe runs on its own, against either the mock provider or a real LLM.

---

## 1. Streaming UI (SSE push)

Let the browser see every token as it arrives:

```elixir
defmodule MyApp.AgentSSE do
  use Plug.Router

  plug :match
  plug :dispatch

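  get "/agents/:id/stream" do
    # Query params are not fetched by default in a bare Plug.Router.
    conn = fetch_query_params(conn)

    conn =
      conn
      |> put_resp_content_type("text/event-stream")
      |> send_chunked(200)

    {:ok, session} = CMDC.create_agent(
      session_id: id,
      model: "anthropic:claude-sonnet-4-5"
    )

    CMDC.subscribe(session)
    CMDC.prompt(session, conn.query_params["q"])

    stream_loop(conn, session)
  end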

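  defp stream_loop(conn, session) do
    receive do
      {:cmdc_event, _sid, {:message_delta, %{delta: text}}} ->
        case chunk(conn, "data: #{Jason.encode!(%{delta: text})}\n\n") do
          {:ok, conn} ->
            stream_loop(conn, session)

          {:error, _reason} ->
            # Client disconnected: stop generating instead of crashing on a failed match.
            CMDC.abort(session, reason: "client disconnected")
            conn
        end

      {:cmdc_event, _sid, {:agent_end, _msgs, usage}} ->
        chunk(conn, "data: #{Jason.encode!(%{done: true, usage: usage})}\n\n")
        CMDC.stop(session)
        conn

      {:cmdc_event, _sid, {:tool_execution_start, name, _, _}} ->
        chunk(conn, "data: #{Jason.encode!(%{tool: name})}\n\n")
        stream_loop(conn, session)

      {:cmdc_event, _sid, _other} ->
        # Drain events this endpoint does not render so they do not pile up in the mailbox.
        stream_loop(conn, session)
    after
      30_000 ->
        CMDC.abort(session, reason: "timeout")
        conn
    end
  end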
end
```

In production, prefer the SSE / WebSocket endpoints that `cmdc_gateway` provides instead of hand-rolling your own.
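
The recipe above builds each SSE frame by hand as `data: ...\n\n`. As a minimal sketch of that framing, the helper below (a hypothetical `SSEFrame` module, not part of CMDC or Plug) formats one frame from an already encoded payload, splits multi-line payloads into one `data:` field per line, and optionally adds an `event:` name.

```elixir
defmodule SSEFrame do
  @moduledoc "Hypothetical helper: formats a single Server-Sent Events frame."

  # Every payload line becomes its own `data:` field; a blank line ends the frame.
  def encode(payload, event \\ nil) when is_binary(payload) do
    data =
      payload
      |> String.split("\n")
      |> Enum.map_join("", &("data: " <> &1 <> "\n"))

    prefix = if event, do: "event: #{event}\n", else: ""
    prefix <> data <> "\n"
  end
end

IO.inspect(SSEFrame.encode(~s({"delta":"hi"})))
IO.inspect(SSEFrame.encode("line one\nline two", "note"))
```

Keeping the framing in one function makes it easier to add named events later without touching the receive loop.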

---

## 2. HITL approval (confirming in a CLI terminal)

```elixir
{:ok, session} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  tools: [CMDC.Tool.Shell, CMDC.Tool.WriteFile],
  plugins: [
    {CMDC.Plugin.Builtin.HumanApproval, [
      tools: ["shell", "write_file"],
      permission_options: [:approve_once, :approve_always, :reject_once]
    ]}
  ]
)

CMDC.subscribe(session)
CMDC.prompt(session, "Create an Elixir Hello World project in /tmp")

handle_loop = fn loop ->
  receive do
    {:cmdc_event, _sid, {:approval_required, %{id: id, tool: tool, args: args}}} ->
      IO.puts("\n[Agent wants to run] #{tool} #{inspect(args)}")
      IO.write("(o)nce / (a)lways / (r)eject ? ")

      case IO.gets("") |> String.trim() do
        "o" -> CMDC.approve(session, id)
        "a" -> CMDC.approve(session, id, kind: :approve_always)
        _   -> CMDC.reject(session, id)
      end

      loop.(loop)

    {:cmdc_event, _sid, {:agent_end, _, _}} ->
      :done
  end
end

handle_loop.(handle_loop)
```

`approve_always` adds `{tool, command_family}` to the session allowlist, so later
tool calls of the same kind pass without another prompt.
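
To make the allowlist idea concrete, here is a minimal sketch of a session-level set keyed by `{tool, command_family}`. The `ApprovalAllowlist` module is hypothetical, and so is the way it derives a command family (the first word of a shell command); the source does not say how CMDC computes it.

```elixir
defmodule ApprovalAllowlist do
  @moduledoc "Hypothetical sketch of an approve_always allowlist."

  def new, do: MapSet.new()

  # Assumption: for shell calls, the command family is the first word of the command.
  def family("shell", %{"command" => cmd}), do: cmd |> String.split() |> List.first()
  def family(_tool, _args), do: :any

  def approve_always(set, tool, args), do: MapSet.put(set, {tool, family(tool, args)})

  def allowed?(set, tool, args), do: MapSet.member?(set, {tool, family(tool, args)})
end

allowlist =
  ApprovalAllowlist.new()
  |> ApprovalAllowlist.approve_always("shell", %{"command" => "mix new hello"})

IO.inspect(ApprovalAllowlist.allowed?(allowlist, "shell", %{"command" => "mix test"}))
IO.inspect(ApprovalAllowlist.allowed?(allowlist, "shell", %{"command" => "rm -rf /tmp/x"}))
```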

---

## 3. Long sessions that keep their memory (MemoryLoader + MemoryFlush loop)

```elixir
{:ok, session} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  working_dir: "/tmp/long-chat",
  plugins: [
    # Load AGENTS.md / MEMORY.md into the system prompt at startup
    {CMDC.Plugin.Builtin.MemoryLoader, [
      files: ["AGENTS.md", "MEMORY.md"]
    ]},

    # Before compaction, persist key facts from the messages about to be dropped
    {CMDC.Plugin.Builtin.MemoryFlush, [
      file: "MEMORY.md",
      max_facts_per_flush: 10,
      dedupe: true
    ]}
  ],
  compactor: [
    trigger: {:tokens, 50_000},
    keep: {:messages, 10}
  ]
)

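# Session 1: record a user preference
CMDC.prompt(session, "I prefer Phoenix LiveView over React for front-end work. Remember that.")
{:ok, _} = CMDC.collect_reply(session)

# Run enough turns to trigger compaction; MemoryFlush writes "user prefers LiveView" to MEMORY.md along the way
# ...

# Reopen with the same session_id (or after a BEAM restart)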
{:ok, session2} = CMDC.create_agent(
  session_id: session.id,
  ...
)

# MemoryLoader reloads MEMORY.md, so the new session still knows the preference
CMDC.prompt(session2, "Write a login page for me")
```

For a detailed example, see [`CMDC.Plugin.Builtin.MemoryFlush`](CMDC.Plugin.Builtin.MemoryFlush.html).

---

## 4. Multi-agent collaboration (dispatching via the Task tool)

The parent agent dispatches subtasks through `Tool.Task`:

```elixir
{:ok, parent} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  tools: [CMDC.Tool.Task],
  subagents: [
    %CMDC.SubAgent{
      name: "researcher",
      description: "Specialist that gathers source material",
      system_prompt: "You only gather and summarize material; do not add commentary",
      tools: [CMDC.Tool.Grep, CMDC.Tool.ReadFile]
    },
    %CMDC.SubAgent{
      name: "writer",
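      description: "Writer that turns material into an article",
      system_prompt: "Turn the supplied material into a structured report in Chinese",
      prompt_mode: :task  # the sub-agent uses a trimmed system prompt to save tokens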
    }
  ]
)

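CMDC.prompt(parent, """
Produce a research report on "Elixir in the AI agent space":
1. Have researcher gather the relevant code under lib/ in this repo
2. Have writer consolidate it into a 1,500-character article in Chinese
""")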

{:ok, report} = CMDC.collect_reply(parent)
```

Sub-agents run in separate OTP processes, so a crash does not propagate to the parent. For DAG
orchestration (debate / hierarchy / router-llm), see the `cmdc_orchestrator` sub-library.

---

## 4.1 Reasoning strategies (chained Plugin + explicit Runner)

Chained strategies constrain the prompt for the current turn of a single agent; configuring `reasoning_strategy` is enough:

```elixir
{:ok, session} =
  CMDC.create_agent(
    model: "anthropic:claude-sonnet-4-5",
    reasoning_strategy: {CMDC.Reasoning.Strategy.AoT, max_iterations: 3}
  )

CMDC.prompt(session, "Prove this algorithm's invariant and state its complexity.")
{:ok, reply} = CMDC.collect_reply(session)
```

Search, voting, refinement, and adaptive strategies spawn sub-agent branches and must be run explicitly through the Runner:

```elixir
{:ok, session} =
  CMDC.create_agent(
    model: "anthropic:claude-sonnet-4-5",
    event_buffer_size: 100
  )

CMDC.subscribe(session)

{:ok, result} =
  CMDC.Reasoning.Runner.run(
    session,
    {CMDC.Reasoning.Strategy.Adaptive, allow_search?: true},
    "Compare the three candidate fixes and pick the lowest-risk one",
    max_depth: 3,
    timeout_ms: 30_000,
    max_total_tokens: 8_000
  )

IO.inspect(result.answer)
```

The Runner broadcasts `:reasoning_thought` / `:reasoning_branch` /
`:reasoning_score` / `:reasoning_prune` / `:reasoning_done`. A trace viewer,
an eval gate, or tests can use these events to reconstruct branches, scores, revisions, and token usage.

---

## 5. Mid-run intervention (Steering)

Change course midway when the agent is heading the wrong way:

```elixir
{:ok, session} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  tools: [CMDC.Tool.Shell, CMDC.Tool.WriteFile]
)

CMDC.subscribe(session)
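CMDC.prompt(session, "Implement a simple web crawler in Python targeting example.com")

# A few seconds in, we decide to switch languages
Process.sleep(3_000)

ref = make_ref()
CMDC.steer(session, ref, """
Stop. Use Elixir with Req + Floki instead of Python.
""")

# Wait for the reply that follows the new direction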
{:ok, reply} = CMDC.collect_reply(session)
```

Any running killable tool is terminated with `brutal_kill`, and the next LLM call sees the merged
steering prompt. A Plugin can intercept malicious steering in the `:before_steering` hook.

---

## 6. Cross-process recovery (Checkpoint facade, recommended in v0.5)

```elixir
# Part 1: a normal conversation
{:ok, session} = CMDC.create_agent(
  session_id: "s-001",
  model: "anthropic:claude-sonnet-4-5"
)

CMDC.prompt(session, "Help me design an Elixir agent framework. Start with an outline.")
{:ok, _} = CMDC.collect_reply(session)

# Facade added in v0.5: take a snapshot
{:ok, snapshot} = CMDC.checkpoint!(session,
  backend: CMDC.Checkpoint.Backend.DETS,
  label: "after-outline"
)

CMDC.stop(session)


# Part 2: after a BEAM restart or on another machine, restore from the snapshot
{:ok, snapshot} = CMDC.Checkpoint.load("s-001",
  backend: CMDC.Checkpoint.Backend.DETS
)

# Facade added in v0.5: restore in one line, then continue prompting
{:ok, restored} = CMDC.resume_session!(snapshot,
  session_id: "s-001-resumed"   # optional; avoids a SessionRegistry conflict
)

CMDC.prompt(restored, "Continue. Expand chapter 2 in detail.")
{:ok, reply} = CMDC.collect_reply(restored)
```

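**Serialization strategy**: the snapshot keeps the rebuildable core state, such as messages / tools / plugins (module names) / user_data / prompt_mode.
`plugin_states` are not part of the snapshot; after resume, each plugin rebuilds its state through the `:session_start` hook.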

**Encrypting with Cloak**:

```elixir
# write path
{:ok, snap} = CMDC.checkpoint!(session)
# A Cloak vault's encrypt/1 returns {:ok, ciphertext}; the bang variant returns the bare value.
encrypted = CMDC.Checkpoint.Snapshot.redact(snap, &MyApp.Vault.encrypt!/1)
CMDC.Checkpoint.Backend.DETS.save("s-001", encrypted, [])

# read path
{:ok, snap} = CMDC.Checkpoint.load("s-001", backend: CMDC.Checkpoint.Backend.DETS)
# A Cloak vault's decrypt/1 returns {:ok, plaintext}; the bang variant returns the bare value.
decrypted = CMDC.Checkpoint.Snapshot.redact(snap, &MyApp.Vault.decrypt!/1)
{:ok, restored} = CMDC.resume_session!(decrypted, session_id: "s-001-resumed")
```

The DETS backend persists to a local file; for production, the PG backend (provided by the `cmdc_memory_pg` sub-library) is recommended.

---

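## 6.1 Automatic periodic checkpoints (AutoCheckpoint Plugin, recommended in v0.5)

This spares every integrator from writing their own "save every N turns" and "garbage-collect old snapshots" logic: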

```elixir
{:ok, session} = CMDC.create_agent(
  session_id: "s-prod",
  model: "anthropic:claude-sonnet-4-5",
  plugins: [
    {CMDC.Plugin.Builtin.AutoCheckpoint,
     backend: CMDC.Checkpoint.Backend.DETS,    # PG recommended in production
     every_n_turns: 10,
     on_tools: ["shell", "edit_file"],          # force a checkpoint after critical tools
     on_events: [:approval_required, :session_end],
     max_checkpoints: 50,                       # cap per session
     ttl_seconds: 30 * 86_400}                  # clean up after 30 days
  ]
)

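# Triggers are OR-ed: any matching condition starts an asynchronous save
# Save + GC run under CMDC.AsyncTaskSupervisor, so the gen_statem callback is never blocked
# snapshot.metadata automatically records %{trigger: :auto, plugin: __MODULE__, turn: N}

# Subscribe to the auto-save event for auditing
require Logger
CMDC.subscribe(session)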
receive do
  {:cmdc_event, _, {:auto_checkpoint_saved, %{checkpoint_id: id, trigger: t}}} ->
    Logger.info("auto checkpoint saved: #{id} (trigger: #{t})")
end
```
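
The trigger and retention rules configured above can be sketched as two pure functions. This is only an illustration under stated assumptions: the `CheckpointPolicy` module, the `saved_at` field, and the argument shapes are hypothetical, and the real plugin may order or combine the checks differently.

```elixir
defmodule CheckpointPolicy do
  @moduledoc "Hypothetical sketch of AutoCheckpoint-style trigger and GC rules."

  # OR semantics: any single matching condition triggers a save.
  def should_save?(turn, tool, event, opts) do
    (turn > 0 and rem(turn, opts.every_n_turns) == 0) or
      tool in opts.on_tools or
      event in opts.on_events
  end

  # Drop snapshots older than the TTL, then keep only the newest max_checkpoints.
  def prune(snapshots, now, opts) do
    snapshots
    |> Enum.filter(fn %{saved_at: t} -> now - t <= opts.ttl_seconds end)
    |> Enum.sort_by(& &1.saved_at, :desc)
    |> Enum.take(opts.max_checkpoints)
  end
end

opts = %{
  every_n_turns: 10,
  on_tools: ["shell", "edit_file"],
  on_events: [:approval_required, :session_end],
  max_checkpoints: 2,
  ttl_seconds: 60
}

snapshots = [%{id: 1, saved_at: 0}, %{id: 2, saved_at: 50}, %{id: 3, saved_at: 90}, %{id: 4, saved_at: 95}]

IO.inspect(CheckpointPolicy.should_save?(3, "shell", nil, opts))
IO.inspect(CheckpointPolicy.prune(snapshots, 100, opts))
```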

---

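## 6.2 Hibernate for long-lived multi-tenant sessions (new in v0.5)

With 1,500+ idle, long-lived agents in a multi-tenant deployment, a per-process heap of about 8 KB adds up to significant memory.
With the new `:hibernate_after_ms` option, an agent hibernates automatically after the idle timeout, shrinking its heap from about 8 KB to 1.5 KB (roughly 80% saved):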

```elixir
{:ok, session} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  hibernate_after_ms: 60_000   # hibernate after 1 minute without events
)
```

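**Recommended values**:

| Scenario | Value |
|---|---|
| Short sessions / single agent | `nil` (default, disabled) |
| Long sessions / resident multi-tenant main agents | `60_000` |
| Extreme multi-tenancy (>1000 idle) | `30_000` |
| Short SubAgent tasks | `nil` (they finish quickly, so hibernation gains nothing) |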

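This uses the native `:hibernate_after` option of `:gen_statem`, so it applies in every state (idle / running / streaming / executing_tools).
The companion telemetry event `[:cmdc, :agent, :hibernate, :configured]` is emitted once at agent init,
which lets you observe how many sessions have hibernation enabled.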
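
The same OTP mechanism is available on any `GenServer` through the standard `:hibernate_after` start option, which makes it easy to try outside CMDC. The sketch below uses a hypothetical `IdleWorker` module and a deliberately short 50 ms timeout. It prints the heap size before and after the idle period; the actual numbers depend on the runtime and on how much live state the process holds.

```elixir
defmodule IdleWorker do
  use GenServer

  def start_link(opts) do
    # :hibernate_after is a standard GenServer.start_link/3 option (idle time in ms).
    GenServer.start_link(__MODULE__, :ok,
      hibernate_after: Keyword.fetch!(opts, :hibernate_after_ms)
    )
  end

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call(:ping, _from, state), do: {:reply, :pong, state}
end

{:ok, pid} = IdleWorker.start_link(hibernate_after_ms: 50)
{:total_heap_size, heap_before} = Process.info(pid, :total_heap_size)

# Stay idle long enough for the hibernate timeout to fire.
Process.sleep(200)
{:total_heap_size, heap_after} = Process.info(pid, :total_heap_size)

IO.puts("heap words before: #{heap_before}, after idle: #{heap_after}")

# A hibernated process wakes up transparently on the next message.
:pong = GenServer.call(pid, :ping)
```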

---

## 6.3 Multi-tenant provider profiles (Provider Registry, v0.5.1+)

In multi-tenant SaaS deployments every tenant has its own LLM provider api_key / base_url.
v0.5.1 adds `CMDC.Provider.Registry`, which replaces assembling `provider_opts` per agent:

```elixir
# ─────────────────────────────────────────────────────────────
# Load at startup: business persistence layer → cmdc Registry
# ─────────────────────────────────────────────────────────────

defmodule MyApp.ProviderProfileStore do
  @moduledoc "The persistence layer (DB) owns all tenant profiles and loads them into the cmdc Registry at startup."

  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(_) do
    # Load every profile from the DB / config file (persisted by the business layer)
    profiles = MyApp.Repo.all(MyApp.AI.Profile)

    Enum.each(profiles, fn p ->
      :ok =
        CMDC.Provider.Registry.register(p.name,
          provider: p.provider,
          # decrypt!/1 returns the bare plaintext; a Cloak vault's decrypt/1 returns {:ok, plaintext}
          opts: [api_key: MyApp.Vault.decrypt!(p.api_key), base_url: p.base_url]
        )
    end)

    {:ok, %{}}
  end
end

# ─────────────────────────────────────────────────────────────
# The Blueprint uses the string protocol: one line replaces ~200µs of per-agent opts assembly
# ─────────────────────────────────────────────────────────────

defmodule MyApp.AI.TenantAgent do
  @behaviour CMDC.Blueprint

  def build(opts) do
    tenant_id = Keyword.fetch!(opts, :tenant_id)
    model_id = Keyword.fetch!(opts, :model_id)

    CMDC.Options.new!(
      # registry: string protocol; Agent.init resolves it and fills in provider_opts
      model: "registry:#{tenant_id}-anthropic:#{model_id}",
      tools: [CMDC.Tool.ReadFile, CMDC.Tool.Shell],
      working_dir: opts[:working_dir] || File.cwd!()
    )
  end
end

# ─────────────────────────────────────────────────────────────
# Cross-node sync: implement a Broadcaster on top of Phoenix.PubSub (recommended in production)
# ─────────────────────────────────────────────────────────────

defmodule MyApp.ProviderRegistryBroadcaster do
  @behaviour CMDC.Provider.Registry.Broadcaster

  @topic "cmdc:provider_registry"

  # Optional: called once when the Registry initializes
  def setup(_opts) do
    Phoenix.PubSub.subscribe(MyApp.PubSub, @topic)
    :ok
  end

  @impl true
  def broadcast(event) do
    # Fan out to all subscribed nodes; the PubSub handler on each remote node calls
    # send(GenServer.whereis(CMDC.Provider.Registry), {:cmdc_registry_remote, event})
    Phoenix.PubSub.broadcast(MyApp.PubSub, @topic, {:cmdc_registry_remote, event})
  end
end

# config/config.exs
config :cmdc, CMDC.Provider.Registry,
  broadcaster: MyApp.ProviderRegistryBroadcaster

# ─────────────────────────────────────────────────────────────
# Fault-tolerant checkpoint resume: retry when a profile is missing
# ─────────────────────────────────────────────────────────────

defp resume_with_retry(snap, retries \\ 1) do
  case CMDC.resume_session!(snap) do
    {:ok, session} ->
      {:ok, session}

    {:error, {:registry_profile_missing, name}} when retries > 0 ->
      # Reload the profile from the DB (another node may have deleted or migrated it)
      :ok = reload_profile_from_db(name)
      resume_with_retry(snap, retries - 1)

    {:error, _} = err ->
      err
  end
end
```

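**Profile name constraint**: the name must not contain `:`, because the parser treats `:` as a separator and would cut the name short.
Studio's existing `"tenant-#{tid}-#{provider}"` format keeps working; if an identifier contains `:`, replace it with `-`.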
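
A short sketch shows why a `:` inside the profile name is a problem. It assumes the model string has the layout `registry:<profile>:<model>` and that the first `:` after the prefix ends the profile name; the `RegistryModelString` module is hypothetical and is not CMDC's actual parser.

```elixir
defmodule RegistryModelString do
  @moduledoc "Hypothetical parser for the assumed layout registry:<profile>:<model>."

  # Replace ":" so the profile name survives parsing.
  def sanitize_profile(name), do: String.replace(name, ":", "-")

  def parse("registry:" <> rest) do
    case String.split(rest, ":", parts: 2) do
      [profile, model] when profile != "" and model != "" -> {:ok, profile, model}
      _ -> {:error, :invalid_registry_model}
    end
  end

  def parse(_other), do: {:error, :not_a_registry_model}
end

IO.inspect(RegistryModelString.parse("registry:tenant-7:claude-sonnet-4-5"))

# A ":" in the profile name shifts the split point, so part of the name lands in the model.
IO.inspect(RegistryModelString.parse("registry:tenant:7:claude-sonnet-4-5"))

safe = RegistryModelString.sanitize_profile("tenant:7")
IO.inspect(RegistryModelString.parse("registry:#{safe}:claude-sonnet-4-5"))
```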

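**Performance**: `Registry.lookup/1` is an ETS read of about 0.5-1 µs, roughly 200 times faster than assembling `provider_opts` per agent.
The full design is in `docs/dev/rfc/2026-05-provider-registry.md` (including answers to 4 design questions and a checklist of 6 review corrections).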

---

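## 6.4 Enforcing Skill `allowed_tools` allowlists (v0.5.3+ SkillGuard)

Before every tool call, the `SkillGuard` Plugin checks the `allowed_tools` allowlists of the active Skills.
Multiple Skills are combined by union, and any tool outside the allowlist is blocked.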

```elixir
# 1. Declare allowed_tools in the Skill's SKILL.md frontmatter
# ---
# name: elixir-testing
# description: Best practices for testing Elixir projects
# allowed_tools:
#   - read_file
#   - grep
#   - shell
# ---

# 2. Discover the skills and mount the SkillGuard Plugin
skills = CMDC.Skill.discover([{:project, "./.cmdc/skills"}])

{:ok, session} =
  CMDC.create_agent(
    model: "anthropic:claude-sonnet-4-5",
    tools: [CMDC.Tool.ReadFile, CMDC.Tool.Grep, CMDC.Tool.WriteFile, CMDC.Tool.Shell],
    plugins: [
      {CMDC.Plugin.Builtin.SkillGuard,
       skills: skills,
       active: ["elixir-testing"],     # defaults to all skills active
       enforce_mode: :strict}          # :strict | :warn
    ]
  )
```

**Union semantics**:
- A single active skill with `allowed_tools: ["read", "grep"]` → only these two tools are available
- Multiple active skills `[["read"], ["write", "grep"]]` → the union `["read", "write", "grep"]` is available
- **Any** active skill without `allowed_tools` → `:unrestricted`, all tools are available
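
These three rules can be written down as a small pure function. The sketch below is illustrative only: the `AllowedTools` module and the map shape of a skill are hypothetical, and the behavior for an empty list of active skills (nothing allowed) is a choice of this sketch rather than something the source specifies.

```elixir
defmodule AllowedTools do
  @moduledoc "Hypothetical sketch of the union semantics for allowed_tools."

  # Returns :unrestricted if any active skill declares no allowlist,
  # otherwise the union of all declared allowlists.
  def effective(active_skills) do
    if Enum.any?(active_skills, &is_nil(Map.get(&1, :allowed_tools))) do
      :unrestricted
    else
      active_skills
      |> Enum.flat_map(& &1.allowed_tools)
      |> MapSet.new()
    end
  end

  def allowed?(:unrestricted, _tool), do: true
  def allowed?(set, tool), do: MapSet.member?(set, tool)
end

skills = [%{name: "a", allowed_tools: ["read"]}, %{name: "b", allowed_tools: ["write", "grep"]}]
effective = AllowedTools.effective(skills)

IO.inspect(AllowedTools.allowed?(effective, "grep"))
IO.inspect(AllowedTools.allowed?(effective, "shell"))
IO.inspect(AllowedTools.effective(skills ++ [%{name: "c"}]))
```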

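**Two enforce modes**:
- `:strict` (default): block the call and log via Logger.info
- `:warn`: only log via Logger.warning without blocking (useful for observing during a migration)

**Priority `priority/0 = 20`**: runs after `SecurityGuard:10` and before business Plugins.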

---

## 7. Hooking up Langfuse / LangSmith / Tempo (Telemetry, 18 events as of v0.5.3)

CMDC only emits standard `:telemetry` events; you attach the sink:

```elixir
defmodule MyApp.LangfuseSink do
  # Logger.info/2 is a macro, so the module has to require Logger.
  require Logger

  def attach do
    :telemetry.attach_many(
      "cmdc-langfuse",
      CMDC.Telemetry.all_events(),       # v0.5.1 = 18 events (6 v0.4 + 10 v0.5 + 2 v0.5.1)
      &__MODULE__.handle_event/4,
      %{api_key: System.get_env("LANGFUSE_API_KEY")}
    )
  end

  # --- v0.4 core 6 events ---

  def handle_event([:cmdc, :llm, :request, :stop], measurements, metadata, config) do
    Langfuse.create_generation(%{
      session_id: metadata.session_id,
      model: metadata.model,
      input_tokens: metadata.tokens_in,
      output_tokens: metadata.tokens_out,
      latency_ms: measurements.duration_ms,
      api_key: config.api_key
    })
  end

  def handle_event([:cmdc, :tool, :exec, :stop], measurements, metadata, config) do
    Langfuse.create_span(%{
      session_id: metadata.session_id,
      name: "tool:#{metadata.tool}",
      duration_ms: measurements.duration_ms,
      error: metadata.error?,
      api_key: config.api_key
    })
  end

  # --- v0.5 new 10 events ---

  # One full run of the Plugin pipeline (per hook, not per plugin); shows plugin overhead
  def handle_event([:cmdc, :plugin, :pipeline, :stop], m, meta, _) do
    Metrics.histogram("cmdc.plugin.pipeline.duration_ms", m.duration_ms,
      tags: [hook: meta.hook, halted_by: meta.halted_by])
  end

  # A plugin exception rescued by the pipeline (with plugin name / hook / reason)
  def handle_event([:cmdc, :plugin, :crash], _m, meta, _) do
    Sentry.capture_message("Plugin crash: #{inspect(meta.plugin)}",
      level: "warning",
      extra: %{hook: meta.hook, reason: meta.reason, session: meta.session_id})
  end

  # Before/after a Compactor run (compaction frequency and compaction ratio)
  def handle_event([:cmdc, :compactor, :run, :stop], m, meta, _) do
    Metrics.gauge("cmdc.compactor.ratio", m.ratio,
      tags: [session: meta.session_id, strategy: meta.strategy])
  end

  # Checkpoint save/load finished (snapshot_bytes / hit?)
  def handle_event([:cmdc, :checkpoint, :save, :stop], m, meta, _) do
    Metrics.histogram("cmdc.checkpoint.save.bytes", m.snapshot_bytes,
      tags: [backend: meta.backend, label: meta.label])
  end

  def handle_event([:cmdc, :checkpoint, :load, :stop], m, meta, _) do
    Metrics.counter("cmdc.checkpoint.load",
      tags: [backend: meta.backend, hit: m.hit?, outcome: meta.outcome])
  end

  # SubAgent start/stop (sub-agent lifecycle)
  def handle_event([:cmdc, :subagent, :spawn, :start], _, meta, _) do
    Logger.info("subagent spawned: #{meta.name}",
      parent: meta.parent_session_id, sub: meta.sub_session_id)
  end

  # Hibernate configuration (once at init; shows the adoption rate)
  def handle_event([:cmdc, :agent, :hibernate, :configured], m, meta, _) do
    Metrics.gauge("cmdc.hibernate.enabled",
      if(meta.enabled?, do: 1, else: 0),
      tags: [session: meta.session_id, ms: m.ms])
  end

  def handle_event(_, _, _, _), do: :ok
end

# In application.ex or another suitable entry point
MyApp.LangfuseSink.attach()
```

The full list of all 18 events is in [`CMDC.Telemetry`](CMDC.Telemetry.html).

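> **Design note**: the 10 events added in v0.5 are **emitted directly via `:telemetry.execute` and are not bridged through the EventBus**,
> which keeps them from adding noise for per-session business subscribers (they are system-wide observability events).
> The EventBus business event `:tool_execution_metrics` remains available for Plugins / UI.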

---

## 8. Large tool results at zero token cost

Keep a `shell` tool that returns 200 KB of SQL output from blowing up the LLM context:

```elixir
backend = CMDC.Backend.Filesystem.new(
  root_dir: "/tmp/cmdc-results",
  virtual_mode: true
)

{:ok, session} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  tools: [CMDC.Tool.Shell, CMDC.Tool.ReadFile],
  plugins: [
    {CMDC.Plugin.Builtin.LargeResultOffload, [
      backend: backend,
      tool_token_limit_before_evict: 20_000  # ≈ 80KB chars
    ]}
  ]
)

CMDC.prompt(session, """
Run `cat /var/log/nginx/access.log` and work out the path distribution of 5xx errors from the output.
""")
```

When a tool returns 200 KB, the plugin writes the output to the backend under `/large_tool_results/<call_id>`.
The LLM sees a head + tail preview plus a hint: "for the full content, page through it with
`read_file(path, offset, limit)`".
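
The head + tail preview can be sketched as follows. This is an illustration only: the `ResultPreview` module, the wording of the hint, and the `keep` size are hypothetical, and the 4-characters-per-token estimate is simply the rough ratio implied by "20_000 tokens ≈ 80KB chars" in the configuration above.

```elixir
defmodule ResultPreview do
  @moduledoc "Hypothetical sketch of a head + tail preview for oversized tool output."

  # Rough heuristic: about 4 characters per token.
  @chars_per_token 4

  def over_limit?(text, token_limit) do
    String.length(text) > token_limit * @chars_per_token
  end

  # Keeps the first and last `keep` characters and points at the stored file.
  def preview(text, path, keep \\ 200) do
    total = String.length(text)

    if total <= keep * 2 do
      text
    else
      head = String.slice(text, 0, keep)
      tail = String.slice(text, -keep, keep)
      omitted = total - 2 * keep
      head <> "\n... [#{omitted} chars omitted; full output at #{path}] ...\n" <> tail
    end
  end
end

output = String.duplicate("GET /api/orders 502\n", 5_000)

if ResultPreview.over_limit?(output, 20_000) do
  IO.puts(ResultPreview.preview(output, "/large_tool_results/call_123"))
end
```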

---

## 9. Two-layer content safety

A denylist catches explicit terms; an LLM judge catches semantic violations:

```elixir
{:ok, session} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  plugins: [
    # L1: explicit term blocking (fast)
    {MyApp.SensitiveContentGuard, words: ["national ID number", "passport number", "credit card"]},

    # L2: LLM-as-Judge (deep)
    {CMDC.Plugin.Builtin.ContentPolicy, [
      judge_model: "openai:gpt-4o-mini",
      judge_provider_opts: [temperature: 0.0],
      brand_keywords: ["MyProduct"]
    ]}
  ]
)

CMDC.prompt(session, "...")
```

---

## 10. End-to-end template: a customer support agent

Combine the single-purpose recipes above into a complete demo (multi-tenancy + tools + approval + checkpoint +
telemetry):

```elixir
defmodule MyApp.SupportAgent do
  @behaviour CMDC.Blueprint

  @impl true
  def build(opts) do
    tenant_id = Keyword.fetch!(opts, :tenant_id)

    %CMDC.Options{
      model: "anthropic:claude-sonnet-4-5",
      system_prompt: "You are the customer support agent for Acme. Only answer product-related questions.",
      working_dir: "/tmp/support/#{tenant_id}",
      user_data: %{tenant_id: tenant_id, user_tier: opts[:user_tier]},

      tools: [
        MyApp.QueryDB,
        MyApp.SearchKnowledgeBase,
        MyApp.RefundOrder,
        CMDC.Tool.AskUser
      ],

      plugins: [
        # Safety
        {CMDC.Plugin.Builtin.HumanApproval, [tools: ["refund_order"]]},
        {CMDC.Plugin.Builtin.ContentPolicy, []},

        # Optimization
        {CMDC.Plugin.Builtin.MemoryLoader, [files: ["KB.md"]]},
        {CMDC.Plugin.Builtin.ModelRouter, [
          rules: [
            %{condition: {:user_tier, :free}, model: "openai:gpt-4o-mini"},
            %{condition: {:cost_gt, 0.10}, model: "openai:gpt-4o-mini"}
          ]
        ]},

        # Monitoring
        {CMDC.Plugin.Builtin.CostGuard, [max_usd: 0.50]},
        CMDC.Plugin.Builtin.EventLogger
      ],

      compactor: [trigger: {:tokens, 50_000}, keep: {:messages, 10}],
      event_buffer_size: 200  # lets the front end replay missed frames after a disconnect
    }
  end
end

# Start a session
{:ok, session} = CMDC.create_agent(MyApp.SupportAgent,
  tenant_id: "acme",
  user_tier: :pro
)
```

---

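## 11. Persisting AuditEvents in one line (v0.5.4+)

Write one agent turn (prompt → finish/abort) straight into the `audit_logs` table,
without hand-writing a translation `case` for each of the 47 EventBus events and 18 telemetry events.

### Fast path: subscribe to the `:after_turn` Plugin hook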

```elixir
defmodule MyApp.Plugins.AuditSink do
  @moduledoc "Writes every conversation turn to audit_logs."
  @behaviour CMDC.Plugin

  @impl true
  def init(opts), do: {:ok, %{repo: Keyword.fetch!(opts, :repo)}}

  @impl true
  def priority, do: 900

  @impl true
  def handle_event({:after_turn, payload}, state, ctx) do
    audit = CMDC.AuditEvent.from_turn(payload, ctx)

    state.repo.insert!(%MyApp.AuditLog{
      actor_type: Atom.to_string(audit.actor_type),
      actor_id: audit.actor_id,
      action: audit.action,
      target_type: audit.target_type,
      target_id: audit.target_id,
      occurred_at: audit.occurred_at,
      metadata: audit.metadata,
      schema_version: audit.schema_version
    })

    {:continue, state}
  end

  def handle_event(_event, state, _ctx), do: {:continue, state}
end

# Mount it
{:ok, session} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  plugins: [{MyApp.Plugins.AuditSink, repo: MyApp.Repo}]
)
```

`from_turn/2` extracts outcome / duration / token_usage / a message summary from the payload
and assembles an audit record with the conventional fields (actor / action / target / before / after / metadata / at).

### Fine-grained path: subscribe to telemetry events (optional)

```elixir
:telemetry.attach_many(
  "audit-sink-telemetry",
  CMDC.Telemetry.all_events(),
  fn name, meas, meta, _ ->
    case CMDC.AuditEvent.from_telemetry(name, meas, meta) do
      nil -> :ok
      event -> MyApp.AuditLog.insert!(event)
    end
  end,
  nil
)
```

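The lightweight v0.5.4 version covers 6 core telemetry events (`agent.turn.start/stop` /
`llm.request.start/stop` / `tool.exec.start/stop`); for every other event it returns `nil` so the caller can fall back.
The full mapping for all 18+ events is planned for v0.6.

### Postgres schema reference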

```sql
CREATE TABLE audit_logs (
  id            BIGSERIAL PRIMARY KEY,
  actor_type    VARCHAR(32) NOT NULL,
  actor_id      VARCHAR(128),
  action        VARCHAR(128) NOT NULL,
  target_type   VARCHAR(64),
  target_id     VARCHAR(128),
  occurred_at   TIMESTAMP WITH TIME ZONE NOT NULL,
  before        JSONB,
  after         JSONB,
  metadata      JSONB NOT NULL DEFAULT '{}',
  schema_version SMALLINT NOT NULL DEFAULT 1
);

CREATE INDEX idx_audit_actor ON audit_logs (actor_type, actor_id, occurred_at DESC);
CREATE INDEX idx_audit_action ON audit_logs (action, occurred_at DESC);
```

---

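## 12. Coordinating agent groups (v0.6+, subscribe_group)

### Scenario

When several agents collaborate on one task (a multi-user room, a working group, a joint-investigation task force,
a multi-agent debate, and so on), the upper-layer service needs to observe events for the whole group in real time,
rather than maintaining a `[session_id_1, session_id_2, ...]` list by hand and calling `subscribe/1` once per session.

The new `subscribe_group/1` API together with `Options.group_id` aggregates the events of every agent in a group into
a single subscriber; agents that join the group later show up in the subscription stream automatically.

### End-to-end example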

```elixir
# 1) Create a group id (or supply your own)
group_id = CMDC.EventBus.create_group()
# => "grp_rzxxxxxxxxxxxxxxxxxxxxxx"
# A business-meaningful name also works: "study-room-42" / "task-force-investigate-incident-7"

# 2) Create several agents in the same group
{:ok, alice} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  group_id: group_id,
  user_data: %{role: "researcher", agent_name: "alice"}
)

{:ok, bob} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  group_id: group_id,
  user_data: %{role: "reviewer", agent_name: "bob"}
)

# 3) Subscribe to the whole group at once
{:ok, _pid} = CMDC.EventBus.subscribe_group(group_id)

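# 4) Hand out tasks
CMDC.prompt(alice, "Research the type inference features in Elixir 1.20")
CMDC.prompt(bob, "Once alice delivers her findings, do a technical review")

# 5) Receive the aggregated stream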
defmodule GroupObserver do
  def run do
    receive do
      {:cmdc_event, sid, {:agent_end, _msgs, _usage}} ->
        IO.puts("[group] #{sid} finished")
        run()

      # Same event name as in recipe 1 (:tool_execution_start)
      {:cmdc_event, sid, {:tool_execution_start, name, _, _}} ->
        IO.puts("[group] #{sid} calls tool #{name}")
        run()

      {:cmdc_event, sid, event} ->
        IO.puts("[group] #{sid} #{inspect(elem(event, 0))}")
        run()
    end
  end
end

GroupObserver.run()
```

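### Complementary to single-session subscriptions

`subscribe_group/1` and `subscribe/1` complement each other; they are not mutually exclusive:

| Need | Use |
|---|---|
| Render one agent's streaming tokens in the UI | `subscribe(session_id)` |
| Show the progress of every agent in a room in a collaboration lobby | `subscribe_group(group_id)` |
| Gateway monitoring of all sessions (global) | `subscribe_all()` |

All three can be layered in the same process. The same event may then arrive through more than one subscription channel, so integrators deduplicate by `session_id` themselves.

### SubAgents inherit automatically

Once the parent agent sets `group_id`, every SubAgent (including subtasks dispatched through `Tool.Task`)
inherits the same group_id; there is no need to declare it on each sub-agent: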

```elixir
{:ok, supervisor} = CMDC.create_agent(
  model: "anthropic:claude-sonnet-4-5",
  group_id: "task-force-incident-7",
  subagents: [
    CMDC.SubAgent.new!(name: "log_analyzer", description: "Analyzes logs"),
    CMDC.SubAgent.new!(name: "metric_collector", description: "Collects metrics")
  ]
)

# subscribe_group sees the event stream of the main agent and all SubAgents at once
CMDC.EventBus.subscribe_group("task-force-incident-7")
```

### Phoenix / WebSocket integration

Treat the group_id as a "room":

```elixir
defmodule MyAppWeb.RoomChannel do
  use Phoenix.Channel

  def join("room:" <> room_id, _params, socket) do
    {:ok, _pid} = CMDC.EventBus.subscribe_group("study-room-" <> room_id)
    {:ok, assign(socket, :room_id, room_id)}
  end

  def handle_info({:cmdc_event, sid, event}, socket) do
    push(socket, "agent_event", %{session_id: sid, event: inspect(event)})
    {:noreply, socket}
  end
end
```

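### Performance characteristics

- **Zero extra overhead**: without a `group_id`, the `is_binary(group_id)` guard inside `EventBus.broadcast/3` skips the group dispatch branch entirely
- **Dispatch complexity**: O(number of subscribers), the same as session-level dispatch
- **Cross-node**: built on a local-only `Registry`; for cross-node delivery, use an external message bus (Phoenix.PubSub or similar)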

---

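## 13. Provider Registry + Cloak encryption at rest (v0.6+ `resolver_fn`)

### Scenario

The `api_key` in a multi-tenant provider configuration must not stay resident in plaintext:

- It is stored as ciphertext in the database (Cloak / Vault / KMS)
- The BEAM heap holds only the ciphertext plus a reference for decrypting it, and decryption happens lazily at runtime
- After a vault rotation no node restart is needed; the value is re-read once the TTL expires

v0.6+ adds a `:resolver_fn` option (an arity-1 function) to `CMDC.Provider.Registry.register/2`.
On a `lookup/1` hit it is called automatically to turn the stored opts into runtime opts. The integrator fully controls
the decryption / cache / TTL strategy; cmdc ships no built-in vault protocol or global resolver.

### End-to-end example (Cloak.AES.GCM)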

```elixir
# mix.exs: {:cloak, "~> 1.1"}

defmodule MyApp.Vault do
  use Cloak.Vault, otp_app: :my_app

  @impl true
  def init(config) do
    config =
      Keyword.put(config, :ciphers,
        default: {Cloak.Ciphers.AES.GCM,
                  tag: "AES.GCM.V1",
                  key: Base.decode64!(System.fetch_env!("CLOAK_KEY"))})

    {:ok, config}
  end
end
```

```elixir
defmodule MyApp.RegistryWithCloak do
  alias CMDC.Provider.Registry

  @cache_table :registry_cloak_cache
  @ttl_ms 5 * 60 * 1000

  def warmup do
    if :ets.whereis(@cache_table) == :undefined do
      :ets.new(@cache_table, [:set, :public, :named_table, read_concurrency: true])
    end

    for tenant <- MyApp.Repo.all(MyApp.Tenant) do
      :ok =
        Registry.register("tenant-#{tenant.id}",
          provider: tenant.provider,
          opts: [encrypted_key: tenant.encrypted_api_key, base_url: tenant.base_url],
          resolver_fn: &resolve_opts/1
        )
    end
  end

  defp resolve_opts(opts) do
    {ciphertext, rest} = Keyword.pop!(opts, :encrypted_key)
    plain = cache_get_or_decrypt(ciphertext)

    Keyword.put(rest, :api_key, plain)
  end

  defp cache_get_or_decrypt(ciphertext) do
    now = System.monotonic_time(:millisecond)

    case :ets.lookup(@cache_table, ciphertext) do
      [{^ciphertext, plain, expires_at}] when expires_at > now ->
        plain

      _ ->
        {:ok, plain} = MyApp.Vault.decrypt(ciphertext)
        :ets.insert(@cache_table, {ciphertext, plain, now + @ttl_ms})
        plain
    end
  end
end
```

```elixir
# Call once at startup in application.ex
MyApp.RegistryWithCloak.warmup()

# Business code stays exactly the same
{:ok, session} = CMDC.create_agent(
  model: "registry:tenant-7:claude-sonnet-4-5",
  user_data: %{tenant_id: 7}
)
```

### Database schema reference

```sql
CREATE TABLE tenants (
  id                BIGSERIAL PRIMARY KEY,
  provider          VARCHAR(64) NOT NULL,
  base_url          VARCHAR(512) NOT NULL,
  encrypted_api_key BYTEA NOT NULL,
  created_at        TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now()
);
```

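### Resolver error handling

When the resolver fails, `lookup/1` returns a structured error. The `create_agent` path in cmdc
turns it into `{:error, ...}` for the caller, and integrators can catch it through the EventBus / a monitor: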

```elixir
case CMDC.create_agent(model: "registry:tenant-vault-down:claude-sonnet-4-5") do
  {:ok, _session} ->
    :ok

  {:error, {:registry_resolver_failed, name, {:resolver_failed, exception}}} ->
    Logger.error("Vault unreachable for #{name}: #{Exception.message(exception)}")
    {:error, :vault_down}

  {:error, {:registry_resolver_invalid, name}} ->
    Logger.error("resolver_fn for #{name} returned non-keyword")
    {:error, :config_bug}
end
```

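### When to choose mode 1 vs mode 2

Mode 1 decrypts every key once at startup and registers plaintext opts (as in recipe 6.3); mode 2 is the lazy `resolver_fn` + cache approach from this recipe.

| Dimension | Mode 1 (decrypt everything at startup) | Mode 2 (lazy decryption + cache) |
|---|---|---|
| Implementation cost | A register loop of a few lines | ~30 lines including cache logic |
| Lookup hot path | ~500 ns | ~50 µs on first hit + ~1 µs afterwards |
| Plaintext residency | Long-term | Only within the TTL |
| Rotation | Reload on every node | Automatic when the TTL expires |
| Recommended for | The 95% case: small to mid scale / single tenant | High security / frequent rotation / multi-tenant |

**Strongly recommended**: mode 1 (simple, fastest, and failures surface early). Switch to mode 2 only when audit or
compliance requirements apply, or when the vault rotates frequently.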

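### Notes on multi-node deployment

The `:resolver_fn` closure is **valid on the local node only**: cmdc's Broadcaster sync strips it automatically,
and on remote nodes `lookup/1` takes the "no resolver" path and reads the opts directly. In a multi-node deployment,
the integrator must call `Registry.register/2` in the init code of **every node**, passing that node's
own resolver_fn closure.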

---

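## 14. Filtering episodic memory by tags (v0.6+, `:tags` as a first-class Memory field)

### Scenario

As sessions accumulate, an agent's memory store can grow past ten thousand entries. Text keyword search plus
isolation by a single `user_id` is no longer enough:

- Filter by business domain ("only search memories related to finance / tax")
- Tag by time range ("only look at memories from 2026 H1")
- Filter by originating Skill ("only look at memories produced by the data-analysis skill")

Starting with v0.6, `CMDC.Memory.entry()` promotes `:tags :: [String.t()]` to a first-class field,
and each backend handles the lookup in its own way (the ETS backend scans in memory; the PG backend uses a GIN index).
Searches filter through the `{:tags_any, [...]}` / `{:tags_all, [...]}` filters.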
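
The semantics of the two operators fit in a few lines. This is a sketch of the matching rule only, in a hypothetical `TagFilter` module; it follows the rule that an empty tag list skips filtering, and it says nothing about how the ETS or PG backends implement the lookup.

```elixir
defmodule TagFilter do
  @moduledoc "Hypothetical sketch of :tags_any / :tags_all matching."

  # An empty tag list means "no tag filter".
  def matches?(_entry_tags, {_op, []}), do: true

  # :tags_any matches when at least one wanted tag is present.
  def matches?(entry_tags, {:tags_any, wanted}), do: Enum.any?(wanted, &(&1 in entry_tags))

  # :tags_all matches only when every wanted tag is present.
  def matches?(entry_tags, {:tags_all, wanted}), do: Enum.all?(wanted, &(&1 in entry_tags))
end

tags = ["domain:finance", "topic:planning", "year:2026"]

IO.inspect(TagFilter.matches?(tags, {:tags_any, ["domain:finance", "domain:tax"]}))
IO.inspect(TagFilter.matches?(tags, {:tags_all, ["domain:finance", "year:2025"]}))
IO.inspect(TagFilter.matches?(tags, {:tags_any, []}))
```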

### End-to-end example

```elixir
# 1) Store entries with tags
{:ok, mem} = CMDC.Memory.ETS.new(:agent_memory)

CMDC.Memory.ETS.store(mem, "ep-001", %{
  content: "Solved the customer's last-year Q4 sales query with a SQL JOIN",
  user_id: "u-1",
  tags: ["domain:finance", "topic:sql", "year:2025"]
})

CMDC.Memory.ETS.store(mem, "ep-002", %{
  content: "Explained the cleanup function of React useEffect",
  user_id: "u-1",
  tags: ["domain:frontend", "topic:react"]
})

CMDC.Memory.ETS.store(mem, "ep-003", %{
  content: "Discussed this year's sales targets",
  user_id: "u-1",
  tags: ["domain:finance", "topic:planning", "year:2026"]
})

# 2) Search with tags_any (any overlap matches)
{:ok, results} = CMDC.Memory.ETS.search(mem, "",
  filters: [{:user_id, "u-1"}, {:tags_any, ["domain:finance"]}])
# => ep-001 + ep-003 (the two finance-domain memories)

# 3) Search with tags_all (every listed tag must be present)
{:ok, results} = CMDC.Memory.ETS.search(mem, "",
  filters: [{:user_id, "u-1"}, {:tags_all, ["domain:finance", "year:2026"]}])
# => only ep-003

# 4) An empty tags list skips filtering (convenient when building filters conditionally)
year = 2026  # example value; nil leaves the year filter out
maybe_year_filter = if year, do: [{:tags_any, ["year:#{year}"]}], else: []

{:ok, results} = CMDC.Memory.ETS.search(mem, "sales",
  filters: [{:user_id, "u-1"}] ++ maybe_year_filter)

### Using it with cmdc_memory_pg 0.1.2 (recommended in production)

`cmdc_memory_pg 0.1.2` implements the same `:tags_any` / `:tags_all` operators on top of a
PostgreSQL `text[]` column with a GIN index:

```elixir
# config/runtime.exs
config :cmdc, :memory_backend, CMDCMemoryPg.EpisodicMemoryBackend

# Plugin configuration
plugins: [
  {CMDC.Plugin.Builtin.EpisodicMemory,
   memory_module: CMDCMemoryPg.EpisodicMemoryBackend,
   top_k: 5,
   tag_strategy: :match_user_query}    # custom: parse the user query into a tag filter
]
```

Migration (run once when first upgrading to 0.1.2):

```bash
$ mix ecto.migrate
# Automatically applies 20260525_add_tags_to_cmdc_episodic_memories.exs
# Adds a text[] column + GIN index; existing rows all default to '{}'
```

### Working with Anthropic Agent Skills

A SKILL.md frontmatter in the Anthropic Agent Skills format can include a `tags:` field:

```yaml
---
name: data-analysis
description: Data analysis assistant
tags: [data, sql, analytics]
---
```

When loading a Skill, cmdc passes the frontmatter tags through to the Skill struct (`CMDC.Skill.tags`).
A Plugin can then inject the tags of the currently active Skills as a filter in the `before_prompt` hook:

```elixir
defmodule MyApp.Plugins.SkillTaggedMemory do
  @behaviour CMDC.Plugin

  @impl true
  def handle_event({:before_prompt, _msg}, state, ctx) do
    active_skill_tags = Map.get(ctx.user_data, :active_skill_tags, [])

    {:ok, hits} =
      CMDCMemoryPg.EpisodicMemoryBackend.search(nil, "",
        filters: [
          {:user_id, ctx.user_data.user_id},
          {:tags_any, active_skill_tags}
        ],
        limit: 5
      )

    # Inject the matching memories into the system prompt (omitted)
    {:continue, state}
  end
end
```

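### Why there is no filter DSL (aligned with RFC #1)

From v0.6 on, cmdc **explicitly does not introduce** a structured filter DSL evaluator such as
`{:and, [...]}` / `{:or, [...]}` / `{:eq, k, v}`. The existing keyword filters `[{key, value}]` stay permanently,
and their semantics are always "AND of equality checks". For more complex conditions, integrators extend their
backend implementation with private filter atoms (such as `{:date_range, from, to}`); the cmdc core library does not require a unified abstraction.

Rationale:
- Backend implementation complexity stays manageable (no general-purpose evaluator to implement)
- `tags_any` / `tags_all` plus single-field equality cover 99% of use cases
- Complex SQL queries belong in a custom backend / SQL view anyway, not in the Memory behaviour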

---

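## 15. Hot-updating Plugins in long sessions (v0.6+ `update_plugin_opts/3`)

`CMDC.update_plugin_opts/3` adjusts the runtime parameters of an already registered Plugin while a long session is in progress,
without stopping and restarting the agent. Typical scenarios:

- **Tightening or relaxing the CostGuard threshold as the user's paid tier changes**
- **Temporarily adjusting the HumanApproval allowlist during operations work**
- **Raising or lowering the EventLogger log level on the fly**
- **Using different MemoryLoader top-k settings in different task phases**

### Workflow

1. The cast returns `:ok` immediately and does not block the caller
2. Repeated updates to the same plugin follow last-write-wins (LWW) semantics: the later write overwrites the earlier one
3. Updates are flushed together when the agent enters the `idle` state; during `running` / `streaming` /
   `executing_tools` they are deferred through gen_statem `:postpone` and redelivered automatically once the agent is idle
4. Each flush first goes through the `:before_plugin_opts_update` Plugin hook
   (other Plugins can intercept with `:abort` / `:skip`; a typical use is SecurityGuard validating the change)
5. After passing the hook, the plugin's `on_config_update/2` is called (if implemented); otherwise the default replace semantics apply
6. When a single flush completes, `{:plugin_opts_updated, %{plugin, success?, error, mode}}` is emitted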
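
Steps 2 and 3 can be modeled with a map of pending updates. The sketch below is a simplified illustration in a hypothetical `PendingPluginOpts` module: it keeps one pending entry per plugin (last write wins) and only releases entries in the `:idle` state. The real agent relies on gen_statem `:postpone` rather than an explicit map, so treat this as a model of the observable behavior, not of the implementation.

```elixir
defmodule PendingPluginOpts do
  @moduledoc "Hypothetical model of LWW coalescing and flush-on-idle."

  def new, do: %{}

  # Last write wins: a later update for the same plugin replaces the earlier one.
  def put(pending, plugin, opts), do: Map.put(pending, plugin, opts)

  # Only the idle state releases updates; busy states keep them queued.
  def flush(pending, :idle), do: {Map.to_list(pending), %{}}
  def flush(pending, _busy_state), do: {[], pending}
end

pending =
  PendingPluginOpts.new()
  |> PendingPluginOpts.put(MyApp.Plugins.CostGuard, max_cost_usd: 10.0)
  |> PendingPluginOpts.put(MyApp.Plugins.CostGuard, max_cost_usd: 20.0)

IO.inspect(PendingPluginOpts.flush(pending, :streaming))
IO.inspect(PendingPluginOpts.flush(pending, :idle))
```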

### Default replace semantics (when `on_config_update/2` is not implemented)

```elixir
defmodule MyApp.Plugins.SimpleConfig do
  @behaviour CMDC.Plugin
  @impl true
  def init(opts), do: {:ok, Map.new(opts)}    # state mirrors opts
  @impl true
  def priority, do: 500
  @impl true
  def handle_event(_event, state, _ctx), do: {:continue, state}
end
```

`CMDC.update_plugin_opts(session, MyApp.Plugins.SimpleConfig, level: :debug)` then automatically applies one of two rules:

- If new_opts is a keyword list and old_state is a map: `Map.merge(old_state, Map.new(new_opts))`
- Otherwise: new_opts replaces the state directly
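
The two rules can be written as a small pure function. This is a sketch of the documented behaviour, not cmdc's actual implementation; the module and function names are hypothetical.

```elixir
defmodule DefaultMergeSketch do
  @moduledoc "Hypothetical illustration of the default replacement rules."

  # Rule 1: keyword opts on top of a map state are merged, and new values win.
  def apply_update(old_state, new_opts) when is_map(old_state) and is_list(new_opts) do
    if Keyword.keyword?(new_opts) do
      Map.merge(old_state, Map.new(new_opts))
    else
      # A plain list that is not a keyword list falls through to rule 2.
      new_opts
    end
  end

  # Rule 2: in every other case the new opts become the new state.
  def apply_update(_old_state, new_opts), do: new_opts
end
```

Note that under rule 1 any key absent from `new_opts` keeps its old value, so a partial update such as `level: :debug` leaves the rest of the state untouched.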

### Custom `on_config_update/2` (preserving accumulated runtime values)

```elixir
defmodule MyApp.Plugins.CostGuard do
  @behaviour CMDC.Plugin

  @impl true
  def init(opts) do
    {:ok,
     %{
       max_cost_usd: Keyword.get(opts, :max_cost_usd, 5.0),
       soft_warning_pct: Keyword.get(opts, :soft_warning_pct, 0.8),
       # Runtime accumulators: cost spent + warnings sent (must survive a hot update)
       accrued_usd: 0.0,
       warnings_sent: 0
     }}
  end

  @impl true
  def priority, do: 200

  @impl true
  def handle_event({:after_response, _msg}, state, ctx) do
    state = %{state | accrued_usd: ctx.cost_usd}

    cond do
      state.accrued_usd >= state.max_cost_usd ->
        {:abort, "Cost limit exceeded: $#{state.accrued_usd}/$#{state.max_cost_usd}", state}

      state.accrued_usd >= state.max_cost_usd * state.soft_warning_pct ->
        {:emit, {:cost_warning, %{accrued: state.accrued_usd}},
         %{state | warnings_sent: state.warnings_sent + 1}}

      true ->
        {:continue, state}
    end
  end

  def handle_event(_event, state, _ctx), do: {:continue, state}

  # Hot-update callback: merge the new thresholds, keep the runtime accumulators
  @impl true
  def on_config_update(new_opts, state) when is_list(new_opts) do
    new_max = Keyword.get(new_opts, :max_cost_usd, state.max_cost_usd)
    new_pct = Keyword.get(new_opts, :soft_warning_pct, state.soft_warning_pct)

    cond do
      new_max <= 0 ->
        {:error, :invalid_max_cost}

      new_pct < 0.0 or new_pct > 1.0 ->
        {:error, :invalid_warning_pct}

      true ->
        {:ok, %{state | max_cost_usd: new_max, soft_warning_pct: new_pct}}
    end
  end
end
```

The caller can adjust the thresholds at any time:

```elixir
:ok = CMDC.update_plugin_opts(session, MyApp.Plugins.CostGuard,
        max_cost_usd: 20.0, soft_warning_pct: 0.9)

# Wait for the confirmation event (assumes CMDC.subscribe(session) was called earlier)
require Logger

receive do
  {:cmdc_event, _sid,
   {:plugin_opts_updated,
    %{plugin: MyApp.Plugins.CostGuard, success?: true, mode: :callback}}} ->
    :ok

  {:cmdc_event, _sid,
   {:plugin_opts_updated,
    %{plugin: MyApp.Plugins.CostGuard, success?: false, error: reason}}} ->
    Logger.error("Hot update failed: #{inspect(reason)}")
end
```

### Intercepting with the `:before_plugin_opts_update` hook

Security-oriented Plugins such as SecurityGuard can block illegitimate hot updates:

```elixir
defmodule MyApp.Plugins.OpsGuard do
  @behaviour CMDC.Plugin
  @impl true
  def init(opts), do: {:ok, %{allowed_ops: Keyword.get(opts, :allowed_ops, [])}}
  @impl true
  def priority, do: 50   # high precedence: runs early so it intercepts first

  @impl true
  def handle_event({:before_plugin_opts_update, plugin_mod, _new_opts}, state, ctx) do
    operator = ctx.user_data[:operator_id]

    if operator in state.allowed_ops do
      {:continue, state}
    else
      {:abort, "Unauthorized operator #{operator} attempted to hot-update #{inspect(plugin_mod)}", state}
    end
  end

  def handle_event(_event, state, _ctx), do: {:continue, state}
end
```

An `update_plugin_opts/3` call triggered by an unauthorized operator is blocked, and the following event is emitted:

```elixir
{:plugin_opts_updated,
 %{plugin: MyApp.Plugins.CostGuard,
   success?: false,
   error: {:hook_aborted, "Unauthorized operator..."},
   mode: :blocked}}
```

The target plugin's state is left **completely unchanged**, which matches fail-safe semantics.

### When *not* to use hot updates

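- **Structural configuration changes** (cross-references between plugin modules, startup-order dependencies): use `CMDC.stop/1` and restart.
- **Adding or removing plugins**: this is left to a future plugin attach/detach API in the style of `attach_tool/2`
  (not covered in v0.6); for now, restarting the Agent is the safest option.
- **Strongly consistent auditing across cycles** (every turn must be audited against one uniform threshold): use the `:after_turn`
  hook to inject a static configuration version number instead.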

---

## 16. Full AuditEvent integration (v0.6+, 15 EventBus + 18 telemetry + resolver)

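v0.5.4 introduced a lightweight `AuditEvent` struct plus the high-value `from_turn/2` fast path. The converged
v0.6 release extends this to **15 core EventBus events + 18 telemetry events + actor/target/tags
resolver injection + schema_version 2 (which adds the tags field)**. This section shows a complete integration example across three subscription layers.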

### Full field list (schema v2)

| Field | Type | Description |
|---|---|---|
| `actor_type` | atom | `:agent / :user / :plugin / :tool / :system` |
| `actor_id` | binary \| nil | session_id / account_id / plugin_name / tool_name |
| `action` | binary | Namespaced action string (`"agent.session.start"`, etc.) |
| `target_type` | binary \| nil | `"agent_session" / "tool" / "approval" / "checkpoint" / ...` |
| `target_id` | binary \| nil | Target ID |
| `occurred_at` | DateTime.t() | UTC timestamp |
| `before` / `after` | term \| nil | State before and after a change (used by model_switched, etc.) |
| `metadata` | map | Additional fields (duration_ms / outcome / token_usage / ...) |
| `tags` | [String.t()] | **New in v0.6.** A resolver can inject business tags |
| `schema_version` | pos_integer | **Bumped to 2 in v0.6.** Legacy v1 consumers stay compatible by ignoring tags |

### Three-layer subscription approach

**Layer 1: Plugin `:after_turn` high-value fast path** (covers 80% of audit scenarios)

```elixir
defmodule MyApp.Plugins.TurnAuditSink do
  @behaviour CMDC.Plugin
  @impl true
  def init(opts), do: {:ok, %{repo: opts[:repo], tenant: opts[:tenant]}}
  @impl true
  def priority, do: 900

  @impl true
  def handle_event({:after_turn, payload}, state, ctx) do
    audit = CMDC.AuditEvent.from_turn(payload, ctx,
      tags_resolver: fn _evt, _ctx -> ["tenant:" <> state.tenant] end
    )

    state.repo.insert!(audit)
    {:continue, state}
  end

  def handle_event(_event, state, _ctx), do: {:continue, state}
end
```

**Layer 2: EventBus subscription, 15 fine-grained events**

```elixir
defmodule MyApp.AuditBridge do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts) do
    CMDC.subscribe(opts[:session])
    {:ok, %{repo: opts[:repo], tenant: opts[:tenant]}}
  end

  @impl true
  def handle_info({:cmdc_event, sid, event}, state) do
    ctx = %{session_id: sid, user_data: %{tenant: state.tenant}}

    case CMDC.AuditEvent.from_event(event, ctx,
           tags_resolver: fn _evt, ctx -> ["tenant:" <> ctx.user_data.tenant] end,
           actor_resolver: fn ctx, default -> resolve_actor(event, ctx, default) end
         ) do
      nil -> :ok                                # events not covered are skipped
      audit -> state.repo.insert!(audit)
    end

    {:noreply, state}
  end

  # Remap the actor of :tool_blocked / :tool_execution_* events to a business account
  defp resolve_actor({:tool_blocked, _, _, _}, ctx, _default),
    do: {:plugin, "security_guard.#{ctx.user_data.tenant}"}

  defp resolve_actor(_, _ctx, _default), do: nil   # fall back to the default
end
```

**Layer 3: Telemetry subscription, 18 fine-grained system events**

```elixir
defmodule MyApp.TelemetryAudit do
  @events [
    [:cmdc, :agent, :turn, :start],
    [:cmdc, :agent, :turn, :stop],
    [:cmdc, :llm, :request, :start],
    [:cmdc, :llm, :request, :stop],
    [:cmdc, :tool, :exec, :start],
    [:cmdc, :tool, :exec, :stop],
    [:cmdc, :plugin, :pipeline, :start],
    [:cmdc, :plugin, :pipeline, :stop],
    [:cmdc, :plugin, :crash],
    [:cmdc, :compactor, :run, :start],
    [:cmdc, :compactor, :run, :stop],
    [:cmdc, :checkpoint, :save],
    [:cmdc, :checkpoint, :load],
    [:cmdc, :subagent, :start],
    [:cmdc, :subagent, :stop],
    [:cmdc, :agent, :hibernate, :configured],
    [:cmdc, :provider, :registry, :lookup],
    [:cmdc, :provider, :registry, :register]
  ]

  def attach(repo, tenant) do
    :telemetry.attach_many(
      "cmdc-audit-#{tenant}",
      @events,
      fn name, meas, meta, _ ->
        case CMDC.AuditEvent.from_telemetry(name, meas, meta,
               tags_resolver: fn _evt, _meta -> ["tenant:#{tenant}", "via:telemetry"] end
             ) do
          nil -> :ok
          audit -> repo.insert!(audit)
        end
      end,
      nil
    )
  end
end
```

### Postgres schema reference (extended with the v2 tags field)

```sql
CREATE TABLE audit_logs (
  id              BIGSERIAL PRIMARY KEY,
  actor_type      TEXT NOT NULL,
  actor_id        TEXT,
  action          TEXT NOT NULL,
  target_type     TEXT,
  target_id       TEXT,
  occurred_at     TIMESTAMPTZ NOT NULL,
  before          JSONB,
  after           JSONB,
  metadata        JSONB NOT NULL DEFAULT '{}'::jsonb,
  tags            TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[],      -- v0.6+
  schema_version  INTEGER NOT NULL DEFAULT 2,
  inserted_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX audit_logs_actor_id_idx     ON audit_logs (actor_id);
CREATE INDEX audit_logs_action_idx       ON audit_logs (action);
CREATE INDEX audit_logs_occurred_at_idx  ON audit_logs (occurred_at DESC);
CREATE INDEX audit_logs_tags_gin_idx     ON audit_logs USING GIN (tags);    -- v0.6+
CREATE INDEX audit_logs_target_idx       ON audit_logs (target_type, target_id);
```

Example Ecto schema:

```elixir
defmodule MyApp.AuditLog do
  use Ecto.Schema
  import Ecto.Changeset

  schema "audit_logs" do
    field :actor_type, Ecto.Enum, values: [:agent, :user, :plugin, :tool, :system]
    field :actor_id, :string
    field :action, :string
    field :target_type, :string
    field :target_id, :string
    field :occurred_at, :utc_datetime
    field :before, :map
    field :after, :map
    field :metadata, :map, default: %{}
    field :tags, {:array, :string}, default: []
    field :schema_version, :integer, default: 2

    timestamps(updated_at: false)
  end

  def changeset(audit_log, %CMDC.AuditEvent{} = audit) do
    audit_log
    |> cast(Map.from_struct(audit), [
      :actor_type, :actor_id, :action, :target_type, :target_id,
      :occurred_at, :before, :after, :metadata, :tags, :schema_version
    ])
    |> validate_required([:actor_type, :action, :occurred_at])
  end
end
```

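### Handling the dual-track overlap (avoiding duplicate rows)

The EventBus event `{:tool_execution_start, ...}` and the telemetry event `[:cmdc, :tool, :exec, :start]`
**fire together** (for one tool call, the Agent both broadcasts on the EventBus and emits telemetry). If you
subscribe to both layers, the same call is persisted twice. There are two ways to handle this: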

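**Option A**: take each kind of event from one layer only (recommended, simplest)

EventBus suits business auditing better (it carries business events such as `:approval_*` / `:subagent_*`);
telemetry suits system observability better (it carries `:plugin_pipeline_*` / `:checkpoint_*` / `:hibernate_*`).
In practice a **mixed subscription** is common, with each kind of event coming from exactly one layer: the Plugin
`:after_turn` hook persists turn-level records, telemetry persists system-level records, and **EventBus
serves as a supplement** that subscribes only to business events such as `:approval_*` / `:subagent_*`.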

**Option B**: deduplicate on write

Use `action + target_id + a ±50 ms time window` as the dedup key, with a database unique constraint as a backstop.
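
A minimal in-memory sketch of the windowed dedup check follows. The module name, the millisecond timestamps, and the `"tool.exec.start"` action string used in the usage note are hypothetical. The sketch also assumes both layers produce the same `action` and `target_id` for one underlying occurrence.

```elixir
defmodule AuditDedupSketch do
  @moduledoc "Hypothetical dedup helper keyed on {action, target_id} with a time window."

  @window_ms 50

  # seen maps {action, target_id} to the timestamp (ms) of the last accepted event.
  def new, do: %{}

  # Returns {:insert, seen} for a fresh event and {:skip, seen} for a duplicate
  # that falls within the window of the last accepted event with the same key.
  def check(seen, action, target_id, occurred_at_ms) do
    key = {action, target_id}

    case Map.fetch(seen, key) do
      {:ok, last_ms} when abs(occurred_at_ms - last_ms) <= @window_ms ->
        {:skip, seen}

      _ ->
        {:insert, Map.put(seen, key, occurred_at_ms)}
    end
  end
end
```

For example, two `"tool.exec.start"` events for the same call 30 ms apart yield `:insert` then `:skip`. A database unique constraint cannot express a sliding window directly, so using it as a backstop would likely require a discretized column such as a time bucket; that detail is an assumption and is not specified above.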

### 30+ business events added incrementally based on feedback (v0.6.x patches)

For events outside the converged v0.6 set (streaming `:message_delta` / `:thinking_delta` /
`:status_update` / `:tool_calls` / `:context_overflow` / `:tool_blocked`, etc.), the conversion
returns `nil`. Integrators have two options:

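1. **Extend it yourself with `actor_resolver` / `target_resolver`**: add a case clause in the subscription layer that matches
   the custom event and build the `%CMDC.AuditEvent{}` by hand.
2. **File a request and wait for a v0.6.x patch**: events are added incrementally based on real feedback, which avoids over-engineering the full set of 47 at once.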

---

## Next steps

- [Upgrade guide](upgrading.html): compatibility boundaries visible to users across v0.2 → v0.3 → v0.4