Skip to main content

README.md

# CMDC Orchestrator

> CMDC 多 Agent 编排引擎 — WorkflowSpec + Run API + DAG 驱动的 AgentOps 编排。

`cmdc_orchestrator`[`cmdc`](https://hex.pm/packages/cmdc) 之上提供「图驱动」的多 Agent 编排能力:把
工作流描述成 **DAG**(节点 + 边),由执行器按拓扑顺序调度,并把上游节点的输出
自动注入到下游节点的 prompt 中。v0.4 起新增可持久化 `WorkflowSpec`,用于企业
Workflow Designer、发布校验、dry run 和 Trace Viewer 接入。v0.5 新增异步 Run API、
RunStore 事件账本、condition/signal 分支和 `output_key` 上下文合并。

理论基石参考 *Agentic Design Patterns* 第 2 / 3 / 7 / 12 / 15 章(路由、并行化、
多 Agent 协作、异常处理与恢复、Inter-Agent 通信)。

## 安装

monorepo 内部通过 path 依赖:

```elixir
def deps do
  [
    {:cmdc_orchestrator, path: "../cmdc_orchestrator"}
  ]
end
```

## 核心概念

| 概念 | 说明 |
| --- | --- |
| **Orchestration** | 一个 DAG(有向无环图),定义节点和依赖 |
| **WorkflowSpec** | v0.4 新增的可序列化工作流规格,面向保存、校验、展示、发布 |
| **Node** | DAG 节点,9 种内置类型:`:agent` / `:aggregator` / `:router` / `:condition` / `:gate` / `:tool` / `:eval_gate` / `:debate` / `:hierarchy` |
| **Edge** | 节点间的数据流依赖 `%{from:, to:, branch?:, signal?:}` |
| **Run** | 一次编排执行,v0.5 起可通过 `start_run/2` 取得异步句柄 |
| **Runtime** | DAG 全程共享的运行时容器(v0.3+),管理 Agent 会话池与 SubAgent 生命周期 |

## WorkflowSpec(v0.4)

`WorkflowSpec` 是面向企业平台保存和发布的稳定配置层;旧 `%DAG{}` 仍可直接
`execute/2`
```elixir
spec = %{
  "workflow_id" => "wf.contract_review",
  "version" => "2026.06.001",
  "nodes" => [
    %{"id" => "draft", "type" => "agent", "config" => %{"prompt" => "审阅合同"}},
    %{"id" => "gate", "type" => "gate", "config" => %{"criteria" => ["完整"]}}
  ],
  "edges" => [%{"from" => "draft", "to" => "gate"}]
}

{:ok, workflow, warnings} = CMDCOrchestrator.validate_workflow(spec)
{:ok, dry_report} = CMDCOrchestrator.dry_run(workflow)
{:ok, dag} = CMDCOrchestrator.to_dag(workflow)
{:ok, results} = CMDCOrchestrator.execute(workflow, agent_opts)
```

校验覆盖稳定 node id、edge 引用、DAG 环路、router branch、节点 preflight、
孤岛节点 warning,以及匿名函数 / pid / reference / tuple 等不可持久化配置。

## Run API(v0.5)

企业 Run Console / Trace Viewer 推荐使用异步 Run API:

```elixir
{:ok, run_id} =
  CMDCOrchestrator.start_run(workflow,
    input: %{"amount" => 128},
    trigger_source: :webhook,
    metadata: %{tenant_id: "t_001"}
  )

{:ok, %{run: run, node_runs: node_runs, events: events}} =
  CMDCOrchestrator.status(run_id)

{:ok, final_run} = CMDCOrchestrator.await_run(run_id, timeout: 120_000)
{:ok, events} = CMDCOrchestrator.events(run_id, type: "orchestrator.node.completed")
{:ok, _cancelled} = CMDCOrchestrator.cancel(run_id, :user_cancelled)
```

默认 RunStore 是 ETS 后端,适合开发和测试。企业 Phoenix 平台可以实现
`CMDCOrchestrator.RunStore` behaviour,把 run、node run、event ledger 映射到
自己的 Ecto schema、租户、RBAC 和审计系统。

`run_sync/2` 是新 Run API 的同步 wrapper,成功时仍返回旧 `execute/2` 风格
`{:ok, results}`
```elixir
{:ok, results} = CMDCOrchestrator.run_sync(workflow, timeout: 120_000)
```

## 节点类型

### `:agent`

启动一个 `CMDC` Agent 执行配置好的 prompt,自动把 `dep_results` 拼到 prompt
末尾作为「上游节点输出」上下文。

```elixir
%{
  id: "research",
  type: :agent,
  config: %{
    prompt: "调研 AI Agent 最新进展",
    system_prompt: "你是研究员",
    max_turns: 5,
    mode: :standalone        # v0.3+::standalone | :pool | :subagent
  }
}
```

#### v0.3 新增:三种执行模式

| mode | 说明 | 适用场景 |
| --- | --- | --- |
| `:standalone`(默认) | 每次 ephemeral 会话,不复用 | 单点任务,无需历史 |
| `:pool` |`pool_key` 共享会话,对话累积 | Debater / 多轮审稿等需要"角色记忆" |
| `:subagent` | 通过 `Runtime` 注册独立长生命周期会话 | 需要 SubAgent 监督和生命周期跟踪 |

`:pool` 模式可指定 `config[:pool_key]`(默认 `node.id`);`:subagent` 模式
`Runtime` 在 DAG 结束时统一回收。

### `:aggregator`

合并多个上游结果。三种策略:`"concat"` 拼接 / `"merge"` 合并 map / `"vote"` 多数
投票。

```elixir
%{id: "merge", type: :aggregator, config: %{strategy: "concat"}}
```

### `:router`

按策略分发到不同分支,配合边的 `:branch` 标签由 Executor 自动剪枝
(ADP 第 2 章 Routing)。三种策略:

```elixir
# 1. rule —— 顺序匹配 pattern
%{
  id: "classify",
  type: :router,
  config: %{
    strategy: "rule",
    rules: [
      %{pattern: "技术", branch: "tech_branch"},
      %{pattern: "商业", branch: "biz_branch"}
    ]
  }
}

# 2. random —— 在 branches 中均匀随机
%{config: %{strategy: "random", branches: ["a", "b", "c"]}}

# 3. llm(v0.3+)—— LLM 选择 + fallback
%{
  id: "intent_router",
  type: :router,
  config: %{
    strategy: "llm",
    branches: ["weather", "news", "search"],
    fallback: "search",
    router_agent: %{name: "Router", model: "openai:gpt-4o-mini"}
  }
}
```

### `:condition`(v0.5 新增)

确定性条件节点,返回 `"true"` / `"false"` signal,下游边按 `:signal``:branch`
匹配:

```elixir
%{
  id: "risk_check",
  type: :condition,
  config: %{
    left: "{{amount}}",
    operator: "gte",
    right: 100_000,
    output_key: "risk_check"
  }
}
```

支持算子:`eq` / `neq` / `gt` / `gte` / `lt` / `lte` / `contains` /
`not_contains` / `is_truthy` / `is_falsy`
边示例:

```elixir
[
  %{from: "risk_check", to: "legal_review", signal: "true"},
  %{from: "risk_check", to: "auto_report", signal: "false"}
]
```

### `:gate`

质量检查点,criteria 全部通过才继续,否则中止整个 DAG。

```elixir
%{id: "review", type: :gate, config: %{criteria: ["accurate", "concise"]}}
```

### `:tool`(v0.4 新增)

直接调用已注册的 `CMDC.Tool` 模块,输入可从上游节点结果模板渲染。

```elixir
%{
  id: "rag_search",
  type: :tool,
  config: %{
    tool_name: "enterprise_rag_search",
    args: %{"query" => "{{classify.query}}"},
    output_key: "evidence"
  }
}
```

运行时通过 `agent_opts[:tool_registry]` 注入:

```elixir
CMDCOrchestrator.execute(dag,
  tool_registry: %{"enterprise_rag_search" => MyApp.Tools.RAGSearch}
)
```

### `:eval_gate`(v0.4 新增)

离线评测门禁,适合 AgentSpec / RAG preset / Workflow 发布前阻断。

```elixir
%{
  id: "release_gate",
  type: :eval_gate,
  config: %{
    metrics: %{groundedness: 0.91, unauthorized_source_count: 0},
    thresholds: %{groundedness: 0.85, unauthorized_source_count: 0}
  }
}
```

也可以配置 `gate_module` 委托给企业侧 `CMDCEval``CMDCRAGArcana.Eval.Gate` 风格模块的 `check/2`
### `:debate`(v0.3 新增)

多 Agent 辩论 + Judge 模式,对应 ADP 第 7 章 Multi-Agent Collaboration。

```elixir
%{
  id: "debate",
  type: :debate,
  config: %{
    topic: "Elixir vs Python on Agents?",
    debaters: [%{name: "ProElixir"}, %{name: "ProPython"}],
    judge: %{name: "Judge"},
    max_rounds: 3,
    consensus_fn: fn rounds -> length(rounds) >= 2 end  # 可选提前终止
  }
}
```

- 每轮内部按列表顺序串行执行,后发言者能看到本轮先发言者的内容;
- 默认每个 debater / judge 走 `:pool` 模式,自动累积自己角色的对话历史;
- 输出包含 `topic / rounds / consensus / verdict / terminated_at_round`
### `:hierarchy`(v0.3 新增)

Manager → Workers → Synthesizer 三段式协作。

```elixir
%{
  id: "hier",
  type: :hierarchy,
  config: %{
    goal: "为新 SaaS 制定市场调研报告",
    manager: %{name: "PM"},                                 # 可选
    workers: [%{name: "Researcher"}, %{name: "Analyst"}],
    synthesizer: %{name: "Writer"},                          # 可选
    worker_assign: :round_robin,                             # 或 :pairwise
    max_parallel: 4
  }
}
```

可选 `:tasks` / `:split_fn` 完全跳过 LLM 拆解,直接用静态子任务列表跑离线测试。

## 使用示例

```elixir
dag = %CMDCOrchestrator.DAG{
  nodes: [
    %{id: "research", type: :agent, config: %{prompt: "调研 AI Agent"}},
    %{id: "write", type: :agent, config: %{prompt: "写一篇 800 字博客"}},
    %{id: "review", type: :gate, config: %{criteria: ["完整"]}}
  ],
  edges: [
    %{from: "research", to: "write"},
    %{from: "write", to: "review"}
  ]
}

{:ok, results} = CMDCOrchestrator.execute(dag,
  model: "openai:gpt-4o-mini",
  api_key: System.fetch_env!("OPENAI_API_KEY")
)

results["write"]
```

执行失败时返回结构化错误,方便上层 UI 聚焦失败点:

```elixir
{:error, %{node_id: "review", reason: "门禁未通过: 完整", completed: %{...}}}
```

## 与 CMDC v0.2 的集成

`:agent` 节点支持透传 `cmdc` 0.2 的全部 Agent 选项:`:user_data` / `:prompt_mode` /
`:plugins` / `:tools` / `:provider` / `:model` 等。

v0.3 通过 `Runtime` 提供原生的会话池和 SubAgent 支持,**无需**用户手工管理
`CMDC.SubAgent.Supervisor`
- `:pool` 模式 → Runtime 内部按 `pool_key` 复用会话;
- `:subagent` 模式 → Runtime 注册独立长生命周期会话,DAG 结束统一 `CMDC.stop/1`
## 内置模板(v0.4)

```elixir
CMDCOrchestrator.Templates.names()
#=> ["contract_review", "order_delay_diagnosis", "ticket_triage", "rag_release_gate", "debate_review"]

spec = CMDCOrchestrator.Templates.get!("contract_review")
{:ok, _report} = CMDCOrchestrator.dry_run(spec)
```

模板是 JSON-ready 的 `WorkflowSpec` 草稿,可作为 Workflow Designer 初始化数据。

## Telemetry(v0.4)

执行器发出稳定事件,metadata 只包含结构化摘要,不放 prompt、chunk 或完整工具输出:

- `[:cmdc_orchestrator, :run, :start | :stop | :exception]`
- `[:cmdc_orchestrator, :node, :start | :stop | :exception]`

这些事件可由 `cmdc_gateway` 或企业 AgentOps Trace Viewer 映射为 SSE / timeline。

**计划中**
- v0.5:Async Run API、RunStore、event replay、cancel/status;
- v0.6:gen_statem 可恢复执行器、checkpoint、retry/timeout/fallback、human_task。

## 开发

```bash
mix deps.get
mix test
mix format --check-formatted
mix compile --warnings-as-errors
mix credo --strict
```

## 路线图

- v0.1 ✅ 基础 DAG + 4 种节点 + 串行执行
- v0.2 ✅ 真并行(拓扑分层 + `Task.async_stream`)+ Router 真剪枝 + AgentNode Recovery
- v0.3 ✅ Runtime + AgentNode 三模式(standalone/pool/subagent)+
  DebateNode + HierarchyNode + RouterNode LLM 策略
- v0.4 ✅ WorkflowSpec + validate/dry_run/to_dag + Node Registry + Tool/EvalGate +
  Templates + Telemetry
- v0.5 计划:Async Run API + RunStore 接缝
- v0.6 计划:可恢复 Executor + human_task / 审批暂停恢复

详见 [`CHANGELOG.md`](CHANGELOG.md)`example/multi_agent_debate_demo.exs`
## 许可

Apache-2.0