# AgentOps 集成契约
本指南面向企业 Phoenix AgentOps 平台接入 `cmdc_orchestrator 0.6`。目标是把
Workflow Designer、Run Console、Approval Center、Trace Viewer、Skill Registry
接到稳定 runtime contract 上,而不是把 Phoenix / Ecto / Oban / RBAC 实现下沉到
通用包。
## 边界
`cmdc_orchestrator` 负责:
- JSON-safe `WorkflowSpec`、节点/边校验、dry run。
- Run / NodeRun / RunEvent 账本结构与 `RunStore` behaviour。
- 可恢复 `RunExecutor`、pause/resume/retry/rerun/cancel。
- `:fork` / `:join`、retry/timeout/fallback/on_error policy。
- `:human_task` 等待、决策聚合、timeout tick 与 resume signal。
企业 Phoenix 平台负责:
- Workflow / Version / Run / Event / HumanTask 的 Ecto schema、索引与租户字段。
- Workflow Designer UI、审批中心、通知、RBAC/ABAC、业务审计表。
- Oban 定时触发、timeout tick、重试任务、通知任务。
- Gateway/SSE/WebSocket endpoint 与 Trace Viewer 展示。
- Skill / Tool / Agent / RAG 业务注册表。
## 推荐 Ecto 表
这些表是建议形状,不是库内 schema。字段名可按平台规范调整,但语义建议保持一致。
| 表 | 关键字段 | 说明 |
|---|---|---|
| `agent_workflows` | `id`, `tenant_id`, `slug`, `name`, `status`, `current_version_id` | Workflow 业务入口。 |
| `agent_workflow_versions` | `id`, `workflow_id`, `version`, `spec`, `ui_metadata`, `published_by`, `published_at` | 保存 JSON DSL 与画布布局。 |
| `agent_workflow_runs` | `id`, `tenant_id`, `workflow_id`, `workflow_version_id`, `status`, `trigger_source`, `trigger_ref`, `current_node_id`, `context_data`, `completed`, `pruned`, `signal_history`, `resume_cursor`, `retry_counters`, `branch_states`, `human_tasks`, `lock_version`, `claim_owner`, `claim_expires_at`, `started_at`, `finished_at` | 映射 `%CMDCOrchestrator.Run{}`。 |
| `agent_workflow_node_runs` | `id`, `run_id`, `node_id`, `node_type`, `status`, `signal`, `attempts`, `input_snapshot`, `output_data`, `actor_ref`, `external_ref`, `started_at`, `finished_at` | 映射 `%CMDCOrchestrator.NodeRun{}`。 |
| `agent_workflow_run_events` | `id`, `run_id`, `seq`, `type`, `node_id`, `trace_id`, `span_id`, `parent_id`, `payload`, `timestamp` | 映射 `%CMDCOrchestrator.RunEvent{}`;`(run_id, seq)` 唯一。 |
| `agent_workflow_idempotency_keys` | `scope`, `key`, `value`, `inserted_at` | HTTP 重试、重复点击、审批幂等。 |
推荐索引:
- `agent_workflows(tenant_id, slug)` unique。
- `agent_workflow_versions(workflow_id, version)` unique。
- `agent_workflow_runs(tenant_id, workflow_id, status, started_at desc)`。
- `agent_workflow_runs(claim_owner, claim_expires_at)`。
- `agent_workflow_node_runs(run_id, node_id)` unique。
- `agent_workflow_run_events(run_id, seq)` unique。
- `agent_workflow_run_events(run_id, type, node_id, seq)`。
- `agent_workflow_idempotency_keys(scope, key)` unique。
## Ecto RunStore 注意事项
Ecto backend 应实现 `CMDCOrchestrator.RunStore` behaviour。关键点:
- `compare_and_update_run/4` 必须用 `lock_version` 或数据库条件更新做 CAS。
- `claim_run/4` 应在事务里锁定 run 行,等价于 `SELECT ... FOR UPDATE`。
- `claim_expires_at` 过期后允许其它 executor 接管。
- `put_idempotency/4` 使用 unique index;冲突时返回已有 value。
- `append_event/3` 在同一事务内递增 run-local `seq`。
- `list_events/2` 支持 `:limit`,`:after_id` 或 `:after_seq`,给 Trace Viewer 做分页。
- payload 保存前继续依赖 `RunEvent.sanitize/1`,不要把 prompt/chunk/raw result 入库。
- Phoenix 业务审计不要写进 `RunEvent` 原始 payload;用平台审计表关联 `run_id` / `event_id`。
## Workflow Designer JSON DSL
Designer 保存 `WorkflowSpec` 原文,画布信息放在 `metadata.ui` 或平台自己的
`ui_metadata` 字段。节点 ID 必须稳定,节点名称和 label 只作展示。
```json
{
"workflow_id": "contract_review",
"version": "2026.06.001",
"mode": "async",
"nodes": [
{
"id": "risk_check",
"type": "condition",
"label": "风险判断",
"config": {"left": "{{amount}}", "operator": "gte", "right": 100000}
},
{
"id": "legal_review",
"type": "human_task",
"label": "法务审批",
"config": {
"title": "合同法务审批",
"assignee_refs": ["role:legal"],
"approval_mode": "any_of",
"timeout_ms": 86400000,
"on_timeout": "fail"
}
}
],
"edges": [
{"from": "risk_check", "to": "legal_review", "signal": "true"}
],
"metadata": {
"ui": {
"nodes": {
"risk_check": {"x": 160, "y": 80},
"legal_review": {"x": 420, "y": 80}
}
}
}
}
```
保存前调用:
```elixir
{:ok, spec, warnings} = CMDCOrchestrator.validate_workflow(params["spec"])
{:ok, preview} = CMDCOrchestrator.dry_run(spec, context_data: params["context_data"])
```
`dry_run/2` 不触发 Tool / Agent / human_task 副作用,适合画布校验和路径解释。
## RunService API 映射
| Phoenix API | Orchestrator 调用 | 说明 |
|---|---|---|
| `POST /workflows/:id/validate` | `validate_workflow/1` | 画布校验。 |
| `POST /workflows/:id/dry-run` | `dry_run/2` | 路径预演。 |
| `POST /workflows/:id/runs` | `start_run/2` | 传 `input`, `trigger_source`, `idempotency_key`。 |
| `GET /runs/:id` | `status/2` | 返回 run/node_runs/events 快照。 |
| `GET /runs/:id/events` | `events/2` | 支持 `limit`, `after_id`, `type`, `node_id`。 |
| `POST /runs/:id/pause` | `pause_run/3` | 操作员暂停。 |
| `POST /runs/:id/resume` | `resume_run/2` | 从 waiting/paused/failed 恢复。 |
| `POST /runs/:id/retry` | `retry_run/2` | 从失败恢复点继续。 |
| `POST /runs/:id/nodes/:node_id/retry` | `retry_node/3` | 清掉该节点及下游结果后重跑。 |
| `POST /runs/:id/rerun` | `rerun/2` | 基于旧 run 的 DAG snapshot 创建新 run。 |
| `POST /runs/:id/cancel` | `cancel/3` | 取消未完成 run。 |
Run 列表、统计、成功率、P95、租户成本建议由 Phoenix 按 Ecto 表聚合,不要放进通用包。
## Approval Center 映射
`human_task` task_id 与 `correlation_id` 是审批中心的关联键。平台可以把
`run.human_tasks` 同步到自己的审批表,但最终决策应回写 runtime:
```elixir
CMDCOrchestrator.submit_human_task_decision(run_id, task_id, %{
action: :approve,
actor_ref: "account:#{current_account.id}",
payload: %{"approval_id" => approval.id},
comment: params["comment"]
}, idempotency_key: params["request_id"])
```
定时超时由平台调度:
```elixir
CMDCOrchestrator.expire_waiting_tasks(run_id, now: DateTime.utc_now())
```
不要在 `cmdc_orchestrator` 内查 Accounts/Roles/RBAC。`AssigneeResolver` 只约定
`assignee_refs` 解析边界,具体人员、通知和审计由 Phoenix 平台处理。
## Trace Viewer / Gateway 事件映射
推荐 Gateway SSE/WebSocket 输出字段:
```json
{
"event": "workflow.node.completed",
"run_id": "run_x",
"node_id": "risk_check",
"trace_id": "trace_x",
"span_id": "run_x:risk_check:orchestrator.node.completed",
"timestamp": "2026-06-01T12:00:00Z",
"payload": {"signal": "true"}
}
```
| RunEvent.type | Gateway event | UI 用途 |
|---|---|---|
| `run.started` | `workflow.run.started` | Timeline 起点。 |
| `run.resumed` | `workflow.run.resumed` | 恢复标记。 |
| `run.paused` | `workflow.run.paused` | 操作员暂停。 |
| `run.waiting` | `workflow.run.waiting` | 等待人工或外部事件。 |
| `run.completed` | `workflow.run.completed` | 成功终态。 |
| `run.failed` | `workflow.run.failed` | 失败终态。 |
| `run.cancelled` | `workflow.run.cancelled` | 取消终态。 |
| `orchestrator.node.started` | `workflow.node.started` | 节点执行中。 |
| `orchestrator.node.completed` | `workflow.node.completed` | 节点成功和 signal。 |
| `orchestrator.node.failed` | `workflow.node.failed` | 节点失败原因。 |
| `orchestrator.node.skipped` | `workflow.node.skipped` | 分支剪枝。 |
| `human_task.created` | `workflow.human_task.created` | Approval Center 创建入口。 |
| `human_task.progress` | `workflow.human_task.progress` | 会签/进度更新。 |
| `human_task.completed` | `workflow.human_task.completed` | 审批终局。 |
| `human_task.timeout` | `workflow.human_task.timeout` | 超时处理。 |
Gateway 只做事件翻译和敏感字段裁剪,不持有 workflow 业务状态。
## Skill Registry 接入
`:skill` 节点建议作为 adapter 层实现,不要让 orchestrator 直接依赖
`cmdc_skill_engine`。节点配置只保存 stable ref:
```json
{
"id": "write_report",
"type": "skill",
"config": {
"skill_ref": "contract.report_writer",
"version": "1.2.0",
"input": {"risk": "{{risk_review}}"}
}
}
```
企业平台在执行上下文里注册自定义 node 或 Tool adapter,并在 Skill Registry 做版本、
质量分和发布门禁。
## Oban / Temporal 边界
| 场景 | 建议 |
|---|---|
| 定时触发 workflow | Oban cron 扫描业务 workflow,调用 `start_run/2`。 |
| human_task timeout tick | Oban worker 调 `expire_waiting_tasks/2`。 |
| 发送审批通知/催办 | Oban worker,不在 runtime 内做。 |
| 数小时到数天的普通审批 | `cmdc_orchestrator` + Ecto RunStore + Oban tick 足够。 |
| 跨系统、跨周、强补偿长事务 | Temporal 可作为外层,内部 Agentic 编排仍交给 `cmdc_orchestrator`。 |
## Hive 迁移对照
Hive 的 Workflow/Step/Run/StepRun、validate/dry_run、approval waiting/resume、
fork/join、retry/cancel 对产品语义有参考价值。迁移时只搬语义:
| Hive | AgentOps + cmdc_orchestrator |
|---|---|
| `Workflow` | Phoenix `agent_workflows` + `agent_workflow_versions.spec`。 |
| `WorkflowStep` | `WorkflowSpec.nodes` / `edges`。 |
| `WorkflowRun` | `%CMDCOrchestrator.Run{}` + Phoenix run table。 |
| `WorkflowStepRun` | `%CMDCOrchestrator.NodeRun{}` + node run table。 |
| `Engine.broadcast` | `RunEvent` -> Gateway SSE/WebSocket。 |
| `Approval` step | `:human_task` + Approval Center。 |
| `WorkflowScheduler` | Oban worker 调 `start_run/2`。 |
| `WorkflowReminder` | Oban worker 调 `expire_waiting_tasks/2` + 平台通知。 |
不要复制 Hive controller、schema、channel 或 React Flow 代码到通用包。
## 发布接入检查表
- WorkflowSpec JSON 不含匿名函数、pid、reference、tuple。
- 所有 node id 稳定,改名只改 label/name。
- Ecto RunStore 覆盖 CAS、claim lease、idempotency、event seq。
- Trace Viewer 使用 `events/2` 分页,不一次性加载完整长 run。
- Approval Center 所有按钮传 `idempotency_key`。
- Oban timeout tick 可重复执行且幂等。
- Gateway SSE 默认不输出 prompt、chunks、raw results。
- `cmdc_test` fake runtime 覆盖 CI;`cmdc_eval` WorkflowEval gate 覆盖发布门禁。