Skip to main content

README.md

# CMDC Orchestrator

> CMDC 多 Agent 编排引擎 — WorkflowSpec + Run API + DAG 驱动的 AgentOps 编排。

`cmdc_orchestrator`[`cmdc`](https://hex.pm/packages/cmdc) 之上提供「图驱动」的多 Agent 编排能力:把
工作流描述成 **DAG**(节点 + 边),由执行器按拓扑顺序调度,并把上游节点的输出
自动注入到下游节点的 prompt 中。v0.4 起新增可持久化 `WorkflowSpec`,用于企业
Workflow Designer、发布校验、dry run 和 Trace Viewer 接入。v0.5 新增异步 Run API、
RunStore 事件账本、condition/signal 分支和 `output_key` 上下文合并。v0.6 补齐
`gen_statem` 可恢复执行器、运行控制 API、retry/timeout/fallback policy、
`:fork` / `:join` 聚合、`:human_task` 暂停审批恢复和 AgentOps 企业接入契约。

理论基石参考 *Agentic Design Patterns* 第 2 / 3 / 7 / 12 / 15 章(路由、并行化、
多 Agent 协作、异常处理与恢复、Inter-Agent 通信)。

## 安装

Hex 依赖:

```elixir
def deps do
  [
    {:cmdc, "~> 0.5"},
    {:cmdc_orchestrator, "~> 0.6"}
  ]
end
```

monorepo 内部通过 path 依赖:

```elixir
def deps do
  [
    {:cmdc_orchestrator, path: "../cmdc_orchestrator"}
  ]
end
```

## 核心概念

| 概念 | 说明 |
| --- | --- |
| **Orchestration** | 一个 DAG(有向无环图),定义节点和依赖 |
| **WorkflowSpec** | v0.4 新增的可序列化工作流规格,面向保存、校验、展示、发布 |
| **Node** | DAG 节点,12 种内置类型:`:agent` / `:aggregator` / `:router` / `:condition` / `:human_task` / `:fork` / `:join` / `:gate` / `:tool` / `:eval_gate` / `:debate` / `:hierarchy` |
| **Edge** | 节点间的数据流依赖 `%{from:, to:, branch?:, signal?:}` |
| **Run** | 一次编排执行,v0.5 起可通过 `start_run/2` 取得异步句柄 |
| **Runtime** | DAG 全程共享的运行时容器(v0.3+),管理 Agent 会话池与 SubAgent 生命周期 |

## WorkflowSpec(v0.4)

`WorkflowSpec` 是面向企业平台保存和发布的稳定配置层;旧 `%DAG{}` 仍可直接
`execute/2`
```elixir
spec = %{
  "workflow_id" => "wf.contract_review",
  "version" => "2026.06.001",
  "nodes" => [
    %{"id" => "draft", "type" => "agent", "config" => %{"prompt" => "审阅合同"}},
    %{"id" => "gate", "type" => "gate", "config" => %{"criteria" => ["完整"]}}
  ],
  "edges" => [%{"from" => "draft", "to" => "gate"}]
}

{:ok, workflow, warnings} = CMDCOrchestrator.validate_workflow(spec)
{:ok, dry_report} = CMDCOrchestrator.dry_run(workflow)
{:ok, dag} = CMDCOrchestrator.to_dag(workflow)
{:ok, results} = CMDCOrchestrator.execute(workflow, agent_opts)
```

校验覆盖稳定 node id、edge 引用、DAG 环路、router branch、节点 preflight、
孤岛节点 warning,以及匿名函数 / pid / reference / tuple 等不可持久化配置。

## Run API(v0.5+)

企业 Run Console / Trace Viewer 推荐使用异步 Run API:

```elixir
{:ok, run_id} =
  CMDCOrchestrator.start_run(workflow,
    input: %{"amount" => 128},
    trigger_source: :webhook,
    metadata: %{tenant_id: "t_001"}
  )

{:ok, %{run: run, node_runs: node_runs, events: events}} =
  CMDCOrchestrator.status(run_id)

{:ok, final_run} = CMDCOrchestrator.await_run(run_id, timeout: 120_000)
{:ok, events} =
  CMDCOrchestrator.events(run_id,
    type: "orchestrator.node.completed",
    limit: 100,
    after_id: last_event_id
  )
{:ok, _paused} = CMDCOrchestrator.pause_run(run_id, :operator_pause)
{:ok, _running} = CMDCOrchestrator.resume_run(run_id, idempotency_key: "resume-req-001")
{:ok, _cancelled} = CMDCOrchestrator.cancel(run_id, :user_cancelled)
```

`start_run/2` 支持 `:idempotency_key`,用于 HTTP 重试或重复点击时返回同一个 run:

```elixir
{:ok, run_id} = CMDCOrchestrator.start_run(workflow, idempotency_key: "trigger-001")
{:ok, ^run_id} = CMDCOrchestrator.start_run(workflow, idempotency_key: "trigger-001")
```

v0.6 M1 开始,Run Console 可使用:

- `pause_run/3`:当前节点结束后停在 `:paused`- `resume_run/2`:进程存在则唤醒,不存在则从 RunStore 的 DAG snapshot 重建;
- `retry_run/2`:从失败 run 的恢复点继续;
- `retry_node/3`:清掉指定节点及其下游结果后重跑;
- `rerun/2`:基于旧 run 的 DAG snapshot 创建新 run。

v0.6 M2 增加 human_task 决策 API:

```elixir
{:ok, _running} =
  CMDCOrchestrator.complete_human_task(run_id, "legal-review",
    actor_ref: "role:legal",
    idempotency_key: "approval-click-001"
  )

{:ok, _waiting} =
  CMDCOrchestrator.submit_human_task_decision(
    run_id,
    "legal-review",
    %{action: :progress, comment: "需要补充附件"},
    actor_ref: "role:legal"
  )

{:ok, _running} = CMDCOrchestrator.expire_waiting_tasks(run_id)
```

默认 RunStore 是 ETS 后端,适合开发和测试。v0.6 M1 还提供
`CMDCOrchestrator.RunStore.Checkpoint`,可复用 `CMDC.Checkpoint.Backend.ETS/DETS`
`cmdc_memory_pg` 的 checkpoint backend 做轻持久化。企业 Phoenix 平台可以实现
`CMDCOrchestrator.RunStore` behaviour,把 run、node run、event ledger 映射到
自己的 Ecto schema、租户、RBAC 和审计系统。

企业 Phoenix AgentOps 接入建议见
[`guides/agentops_integration.md`](guides/agentops_integration.md),覆盖 Workflow
Designer JSON DSL、RunService API、Approval Center、Gateway SSE 事件映射、
RunStore Ecto 注意事项、Oban/Temporal 边界和 Hive 迁移对照。

`run_sync/2` 是新 Run API 的同步 wrapper,成功时仍返回旧 `execute/2` 风格
`{:ok, results}`
```elixir
{:ok, results} = CMDCOrchestrator.run_sync(workflow, timeout: 120_000)
```

## 节点类型

### `:agent`

启动一个 `CMDC` Agent 执行配置好的 prompt,自动把 `dep_results` 拼到 prompt
末尾作为「上游节点输出」上下文。

```elixir
%{
  id: "research",
  type: :agent,
  config: %{
    prompt: "调研 AI Agent 最新进展",
    system_prompt: "你是研究员",
    max_turns: 5,
    mode: :standalone        # v0.3+::standalone | :pool | :subagent
  }
}
```

#### v0.3 新增:三种执行模式

| mode | 说明 | 适用场景 |
| --- | --- | --- |
| `:standalone`(默认) | 每次 ephemeral 会话,不复用 | 单点任务,无需历史 |
| `:pool` |`pool_key` 共享会话,对话累积 | Debater / 多轮审稿等需要"角色记忆" |
| `:subagent` | 通过 `Runtime` 注册独立长生命周期会话 | 需要 SubAgent 监督和生命周期跟踪 |

`:pool` 模式可指定 `config[:pool_key]`(默认 `node.id`);`:subagent` 模式
`Runtime` 在 DAG 结束时统一回收。

### `:aggregator`

合并多个上游结果。三种策略:`"concat"` 拼接 / `"merge"` 合并 map / `"vote"` 多数
投票。

```elixir
%{id: "merge", type: :aggregator, config: %{strategy: "concat"}}
```

### `:router`

按策略分发到不同分支,配合边的 `:branch` 标签由 Executor 自动剪枝
(ADP 第 2 章 Routing)。三种策略:

```elixir
# 1. rule —— 顺序匹配 pattern
%{
  id: "classify",
  type: :router,
  config: %{
    strategy: "rule",
    rules: [
      %{pattern: "技术", branch: "tech_branch"},
      %{pattern: "商业", branch: "biz_branch"}
    ]
  }
}

# 2. random —— 在 branches 中均匀随机
%{config: %{strategy: "random", branches: ["a", "b", "c"]}}

# 3. llm(v0.3+)—— LLM 选择 + fallback
%{
  id: "intent_router",
  type: :router,
  config: %{
    strategy: "llm",
    branches: ["weather", "news", "search"],
    fallback: "search",
    router_agent: %{name: "Router", model: "openai:gpt-4o-mini"}
  }
}
```

### `:condition`(v0.5 新增)

确定性条件节点,返回 `"true"` / `"false"` signal,下游边按 `:signal``:branch`
匹配:

```elixir
%{
  id: "risk_check",
  type: :condition,
  config: %{
    left: "{{amount}}",
    operator: "gte",
    right: 100_000,
    output_key: "risk_check"
  }
}
```

支持算子:`eq` / `neq` / `gt` / `gte` / `lt` / `lte` / `contains` /
`not_contains` / `is_truthy` / `is_falsy`
边示例:

```elixir
[
  %{from: "risk_check", to: "legal_review", signal: "true"},
  %{from: "risk_check", to: "auto_report", signal: "false"}
]
```

### `:fork` / `:join`(v0.6 M1)

`fork` 把多个分支放入同一个可恢复 run 内并发执行;`join` 聚合分支结果。分支使用
branch-local context,完成后一次性写回 run snapshot:

```elixir
%{id: "split", type: :fork, config: %{output_key: "parallel_review"}}
%{id: "legal", type: :agent, config: %{prompt: "法务审阅"}}
%{id: "finance", type: :agent, config: %{prompt: "财务审阅"}}
%{id: "join", type: :join, config: %{mode: :all, fail_strategy: :fail_fast}}
```

`join.config[:mode]` 支持 `:all` / `:any` / `:n_of_m``fail_strategy` 支持 `:fail_fast` / `:wait_all` / `:tolerate``WorkflowSpec.validate/1`
会拒绝 dangling join、join 入边不足、嵌套 fork 和多 join 歧义。

### `:human_task`(v0.6 M2)

`human_task` 把 workflow 挂起到人工审批或补录等待点。库内只保存任务描述、
`assignee_refs`、决策聚合状态、resume signal 与事件账本;审批 UI、RBAC/ABAC、
通知和企业审计表由 Phoenix 平台实现。导入 `:human_approval` / `:approval`
会归一化为 `:human_task`
```elixir
%{
  id: "legal_review",
  type: :human_task,
  config: %{
    task_id: "legal-review",
    title: "法务审批:{{risk_review.summary}}",
    assignee_refs: ["role:legal"],
    approval_mode: :quorum,
    required_count: 2,
    timeout_ms: 86_400_000,
    on_timeout: :proceed_with_default,
    default_signal: "timeout",
    default_output: %{timed_out: true},
    output_key: "legal_review"
  }
}
```

决策支持 `:approve` / `:reject` / `:request_changes` / `:progress`。终局决策会把
human_task 输出写入 `Run.completed``Run.context_data`,再按 `"approved"` /
`"rejected"` / `"request_changes"` / timeout signal 继续下游;progress 只更新
任务和事件账本,不恢复 run。`CMDCOrchestrator.AssigneeResolver` 只定义可选
解析 behaviour,通用包不查询 Accounts/Roles。

### 统一 Policy(v0.6 M1)

节点可通过 `policy` 配置执行策略:

```elixir
%{
  id: "risk_check",
  type: :tool,
  config: %{tool_name: "risk_score"},
  policy: %{
    retries: 2,
    timeout_ms: 5_000,
    backoff: :exponential,
    on_error: :emit_signal,
    signal: "needs_review"
  }
}
```

`on_error` 支持 `:fail` / `:continue` / `:skip` / `:fallback` / `:emit_signal``:fallback` 可配 `fallback_output` 返回确定性输出。

### `:gate`

质量检查点,criteria 全部通过才继续,否则中止整个 DAG。

```elixir
%{id: "review", type: :gate, config: %{criteria: ["accurate", "concise"]}}
```

### `:tool`(v0.4 新增)

直接调用已注册的 `CMDC.Tool` 模块,输入可从上游节点结果模板渲染。

```elixir
%{
  id: "rag_search",
  type: :tool,
  config: %{
    tool_name: "enterprise_rag_search",
    args: %{"query" => "{{classify.query}}"},
    output_key: "evidence"
  }
}
```

运行时通过 `agent_opts[:tool_registry]` 注入:

```elixir
CMDCOrchestrator.execute(dag,
  tool_registry: %{"enterprise_rag_search" => MyApp.Tools.RAGSearch}
)
```

### `:eval_gate`(v0.4 新增)

离线评测门禁,适合 AgentSpec / RAG preset / Workflow 发布前阻断。

```elixir
%{
  id: "release_gate",
  type: :eval_gate,
  config: %{
    metrics: %{groundedness: 0.91, unauthorized_source_count: 0},
    thresholds: %{groundedness: 0.85, unauthorized_source_count: 0}
  }
}
```

也可以配置 `gate_module` 委托给企业侧 `CMDCEval``CMDCRAGArcana.Eval.Gate` 风格模块的 `check/2`
### `:debate`(v0.3 新增)

多 Agent 辩论 + Judge 模式,对应 ADP 第 7 章 Multi-Agent Collaboration。

```elixir
%{
  id: "debate",
  type: :debate,
  config: %{
    topic: "Elixir vs Python on Agents?",
    debaters: [%{name: "ProElixir"}, %{name: "ProPython"}],
    judge: %{name: "Judge"},
    max_rounds: 3,
    consensus_fn: fn rounds -> length(rounds) >= 2 end  # 可选提前终止
  }
}
```

- 每轮内部按列表顺序串行执行,后发言者能看到本轮先发言者的内容;
- 默认每个 debater / judge 走 `:pool` 模式,自动累积自己角色的对话历史;
- 输出包含 `topic / rounds / consensus / verdict / terminated_at_round`
### `:hierarchy`(v0.3 新增)

Manager → Workers → Synthesizer 三段式协作。

```elixir
%{
  id: "hier",
  type: :hierarchy,
  config: %{
    goal: "为新 SaaS 制定市场调研报告",
    manager: %{name: "PM"},                                 # 可选
    workers: [%{name: "Researcher"}, %{name: "Analyst"}],
    synthesizer: %{name: "Writer"},                          # 可选
    worker_assign: :round_robin,                             # 或 :pairwise
    max_parallel: 4
  }
}
```

可选 `:tasks` / `:split_fn` 完全跳过 LLM 拆解,直接用静态子任务列表跑离线测试。

## 使用示例

```elixir
dag = %CMDCOrchestrator.DAG{
  nodes: [
    %{id: "research", type: :agent, config: %{prompt: "调研 AI Agent"}},
    %{id: "write", type: :agent, config: %{prompt: "写一篇 800 字博客"}},
    %{id: "review", type: :gate, config: %{criteria: ["完整"]}}
  ],
  edges: [
    %{from: "research", to: "write"},
    %{from: "write", to: "review"}
  ]
}

{:ok, results} = CMDCOrchestrator.execute(dag,
  model: "openai:gpt-4o-mini",
  api_key: System.fetch_env!("OPENAI_API_KEY")
)

results["write"]
```

执行失败时返回结构化错误,方便上层 UI 聚焦失败点:

```elixir
{:error, %{node_id: "review", reason: "门禁未通过: 完整", completed: %{...}}}
```

## 与 CMDC v0.2 的集成

`:agent` 节点支持透传 `cmdc` 0.2 的全部 Agent 选项:`:user_data` / `:prompt_mode` /
`:plugins` / `:tools` / `:provider` / `:model` 等。

v0.3 通过 `Runtime` 提供原生的会话池和 SubAgent 支持,**无需**用户手工管理
`CMDC.SubAgent.Supervisor`
- `:pool` 模式 → Runtime 内部按 `pool_key` 复用会话;
- `:subagent` 模式 → Runtime 注册独立长生命周期会话,DAG 结束统一 `CMDC.stop/1`
## 内置模板(v0.4)

```elixir
CMDCOrchestrator.Templates.names()
#=> ["contract_review", "order_delay_diagnosis", "ticket_triage", "rag_release_gate", "debate_review"]

spec = CMDCOrchestrator.Templates.get!("contract_review")
{:ok, _report} = CMDCOrchestrator.dry_run(spec)
```

模板是 JSON-ready 的 `WorkflowSpec` 草稿,可作为 Workflow Designer 初始化数据。

## Telemetry(v0.4)

执行器发出稳定事件,metadata 只包含结构化摘要,不放 prompt、chunk 或完整工具输出:

- `[:cmdc_orchestrator, :run, :start | :stop | :exception]`
- `[:cmdc_orchestrator, :node, :start | :stop | :exception]`

这些事件可由 `cmdc_gateway` 或企业 AgentOps Trace Viewer 映射为 SSE / timeline。

human_task 会额外写入 `human_task.created``human_task.decision_recorded``human_task.progress``human_task.completed``human_task.timeout` 等事件,供
Approval Center、Run Console 和 Trace Viewer 关联展示。

## 开发

```bash
mix deps.get
mix test
mix format --check-formatted
mix compile --warnings-as-errors
mix credo --strict
```

## 路线图

- v0.1 ✅ 基础 DAG + 4 种节点 + 串行执行
- v0.2 ✅ 真并行(拓扑分层 + `Task.async_stream`)+ Router 真剪枝 + AgentNode Recovery
- v0.3 ✅ Runtime + AgentNode 三模式(standalone/pool/subagent)+
  DebateNode + HierarchyNode + RouterNode LLM 策略
- v0.4 ✅ WorkflowSpec + validate/dry_run/to_dag + Node Registry + Tool/EvalGate +
  Templates + Telemetry
- v0.5 ✅ Async Run API + RunStore + event ledger + condition/signal
- v0.6 M1 ✅ 可恢复 Executor + Run Console controls + retry/timeout/fallback +
  fork/join
- v0.6 M2 ✅ human_task / 审批暂停恢复 / timeout tick helper

详见 [`CHANGELOG.md`](CHANGELOG.md)`example/multi_agent_debate_demo.exs`
## 许可

Apache-2.0