# Cyclium
> **Autonomous agent framework for Elixir** — actors monitor domains, run multi-turn episodes with budget enforcement, and produce persistent findings and typed outputs.
Cyclium is an Elixir library for building agentic systems that monitor domains, run multi-turn episodes, classify situations, and produce typed outputs. Actors declare expectations about how things should be; when triggers fire, episodes execute strategies that can gather data, call tools, synthesize with LLMs, and converge into findings and outputs. Think of it as an OTP-native agent framework where the episode — not the request — is the unit of work.
## Key features
- **Declarative Actor DSL** — Define actors, expectations, triggers, and budgets in a compact macro-based syntax
- **Strategy Pattern** — Pluggable investigation logic with a clear init → observe → converge lifecycle
- **Episode Runner** — Budget-enforced execution loop with step journaling, checkpointing, and crash recovery
- **Findings Lifecycle** — Persistent observations with raise/update/clear semantics, upsert-by-key, causality chains, TTL expiration, severity escalation, and post-raise enrichment hooks
- **Output Router** — Deduplicated, adapter-based delivery (email, Slack, webhooks) with approval gates and adapter registry
- **Event Bus** — Phoenix.PubSub-backed event system connecting actors without coupling
- **Workflow Engine** — Multi-actor coordination with dependency graphs, failure policies, retry with backoff, cross-workflow episode dedup, and cancellation cascade
- **Circuit Breaker** — Per-expectation circuit breaker with configurable thresholds, half-open recovery, and optional in-flight episode cancellation
- **Episode Sampling** — Probabilistic firing control via `sample_rate` on expectations
- **Service Level Tracking** — Declarative performance objectives with breach detection and telemetry
- **Adaptive Budgets** — Advisory budget recommendations based on historical episode usage (p95 with headroom)
- **Backpressure Controls** — Per-actor concurrency limits with queue, drop, or shed-oldest overflow policies
- **Debounce and Cooldown** — Temporal controls to coalesce rapid-fire events and enforce minimum gaps
- **Log Projection** — Materialized human-readable logs at configurable verbosity (none → full_debug)
- **Telemetry** — 36 structured telemetry events for observability
- **OTP-Native** — No Oban or external job queue required; episodes run as Tasks under DynamicSupervisor
- **Test Kit** — Assertion macros and fakes for validating actors, strategies, synthesizers, output adapters, workflows, and checkpoint migrations in host apps
- **SQL Server 2017 Compatible** — Transaction-based upserts, denormalized query columns, no JSON operators in DDL
## Who is this for?
Cyclium is designed for Elixir teams building **autonomous agent systems** where:
- Business rules define what *should* be true (SLAs, health thresholds, compliance checks)
- Episodes involve multiple steps: data gathering, LLM synthesis, tool calls, human approval
- Findings need to persist and evolve over time (raised → updated → cleared)
- Actions need deduplication, audit trails, and typed delivery through adapters
- Multiple actors need to coordinate through workflows with dependency ordering
- Real-time visibility into agent state is essential (Phoenix LiveView integration via Bus)
If you need a simple cron job or a one-shot script, Cyclium is overkill. Cyclium shines when you have ongoing, stateful processes that produce findings and outputs — and need the lifecycle, audit trail, and coordination to go with them.
## How Cyclium differs
**vs. Oban** — Oban is a job queue: enqueue work, run it, done. Cyclium is an agent framework that manages stateful, multi-turn episodes with budgets, findings, outputs, and workflows. Episodes happen to run as OTP Tasks, so you don't need Oban — but the two solve different problems. You could use both: Oban for fire-and-forget jobs, Cyclium for ongoing autonomous processes.
**vs. Sagents** — Sagents is built for interactive AI conversations where users chat with LLM-powered agents in real time. Cyclium is built for autonomous operational agents that monitor domains, classify situations, and act — with or without an LLM in the loop. Cyclium's strategies can call LLMs via `:synthesize`, but they can also run purely deterministic logic. The execution model (expectations → episodes → findings → outputs) is designed for operational workflows, not chat.
**vs. GenServers / custom OTP** — You could build all of this with raw GenServers, but Cyclium gives you the episode lifecycle (budgets, journaling, checkpoints, crash recovery), the findings system (upsert-by-key, severity, evidence), the output router (deduplication, adapters, approval gates), the workflow engine (dependency graphs, failure policies), and the event bus — all wired together with telemetry and audit trails.
## Strategy-driven vs. LLM-routed
Most agent frameworks put the LLM in the driver's seat — it decides which tool to call, when to stop, and how to recover from errors. Cyclium inverts this. The **developer is the router**: your `next_step/2` function is a deterministic state machine that decides what happens next. The LLM is a powerful tool you call at specific points via `:synthesize`, but it never controls the flow.
This means you can mix deterministic and AI-powered steps in a single episode — gather data with a tool call, classify it with an LLM, then act on the result with another tool call — all under explicit developer control with budget enforcement at every turn:
```elixir
def next_step(%{phase: :gather} = state, _ctx) do
{:tool_call, :erp_read, :search_pos, %{status: "STALLED"}}
end
def next_step(%{phase: :classify, po_data: data} = state, _ctx) do
{:synthesize, %{task: :classify_po, data: data}}
end
def next_step(%{phase: :act, classification: "vendor_delay"} = state, _ctx) do
{:tool_call, :email_write, :send_followup, build_email(state)}
end
```
The LLM is powerful, but it's not the control plane. You get repeatability, testability, and full visibility into exactly which steps ran and why — without sacrificing the ability to use AI where it adds value.
## Architecture
> The examples throughout this README use a **client health monitoring** system: a `ClientHealthActor` that evaluates client metrics (MRR, active users, support tickets) on each change and classifies health status, plus a `ClientAdvisorActor` that synthesizes an LLM-powered summary. See the [demo application](#demo-application) for the full working implementation.
### Supervision tree
```
YourApp.Supervisor
├── YourApp.Repo
├── Phoenix.PubSub
├── YourApp.Actors.ClientHealthActor (GenServer)
├── YourApp.Actors.ClientAdvisorActor (GenServer)
├── Cyclium.Supervisor
│ ├── Cyclium.ActorSupervisor (DynamicSupervisor)
│ ├── Cyclium.EpisodeSupervisor (DynamicSupervisor)
│ │ └── Cyclium.EpisodeTask (one per running episode)
│ ├── Cyclium.TaskSupervisor (Task.Supervisor)
│ ├── Cyclium.Reconciler (optional — spec change detection)
│ ├── Cyclium.WorkflowEngine (optional — multi-actor workflows)
│ └── Cyclium.Findings.FindingSweep (optional — expiration + escalation)
└── YourAppWeb.Endpoint
```
**Actors** are GenServers that subscribe to Bus events and manage episode lifecycle. They're started by the consuming app's supervision tree, above Cyclium's supervisors. When a trigger fires, the actor creates an Episode row and starts an `EpisodeTask` under the `EpisodeSupervisor`. Each task resolves a strategy from the registry and runs the episode loop.
**Key design principles:**
- Actors own concurrency limits — they track active/queued episodes in-process
- Episodes are durable — the `cyclium_episodes` table is itself a work queue
- The Bus connects everything — actors, LiveViews, and workflows all subscribe to the same event stream
- Strategies are stateless modules — all state lives in the episode's strategy state map
### Execution model
```
Bus event arrives
→ Actor.handle_info matches expectation trigger
→ Check debounce/cooldown → circuit breaker → sample_rate
→ Check concurrency (active < max?)
→ yes: create Episode row, start EpisodeTask under DynamicSupervisor
→ no: apply overflow policy (queue / drop / shed_oldest)
EpisodeTask starts
→ Resolve strategy (persistent_term registration from actor boot → registry override)
→ strategy.init(episode, trigger)
→ EpisodeRunner.execute_loop:
┌─────────────────────────────────────────────┐
│ check_budget → check_loop → increment_turn │
│ strategy.next_step(state, ctx) │
│ :done → journal, set done │
│ :converge → run converge pipeline │
│ {:tool_call} → exec tool, handle_result │
│ {:observe} → journal, handle_result │
│ {:synthesize} → journal, handle_result │
│ {:checkpoint} → save state, loop │
│ {:approval} → block, wait for human │
│ {:wait} → block, wait for external │
│ ... → loop │
└─────────────────────────────────────────────┘
Converge pipeline (post_converge):
1. Persist findings (raise/update/clear) → enrich → Bus events per finding
2. Route outputs through adapters → dedup by dedupe_key, deliver
3. Compute final episode status from delivery outcomes
4. Journal completion/failure step
5. Project log via LogProjector
6. Record service levels + check for breach
7. Record adaptive budget sample (if enabled)
8. Broadcast episode.completed/failed on Bus
9. Emit telemetry
```
## Core concepts
### Actors
An **actor** is a GenServer that owns one or more **expectations**. Each actor watches a domain (e.g., `:procurement`, `:client_health`) and fires **episodes** when triggers match.
```elixir
defmodule MyApp.Actors.ClientHealthActor do
use Cyclium.Actor
actor do
domain(:client_health)
spec_rev("v0.1.0")
max_concurrent_episodes(5)
episode_overflow(:queue)
expectation(:client_should_be_healthy,
strategy: MyApp.Strategies.ClientHealth,
trigger: {:event, "client.health_check_requested"},
subject_key: :client_id,
debounce_ms: :timer.seconds(3),
budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
)
expectation(:contract_review,
strategy: MyApp.Strategies.ContractReview,
trigger: {:schedule, :timer.hours(24)},
recovery_policy: :restart,
budget: %{max_turns: 12, max_tokens: 25_000, max_wall_ms: 120_000}
)
end
end
```
**Trigger types:**
- `{:event, "event.name"}` — fires when a matching Bus event arrives
- `{:schedule, interval_ms}` — fires on a recurring timer
- `:manual` — fires on explicit request
- `:workflow` — fires as part of a multi-actor workflow
- **List** — combine multiple triggers: `[{:event, "client.health_check_requested"}, :workflow]`
**List triggers** allow an expectation to fire from multiple sources. Event subscriptions and schedule timers are extracted from the list automatically. This is the recommended pattern when an expectation participates in a workflow but should also be independently triggerable:
```elixir
expectation(:client_should_be_healthy,
strategy: MyApp.Strategies.ClientHealth,
trigger: [{:event, "client.health_check_requested"}, :workflow],
subject_key: :client_id,
debounce_ms: :timer.seconds(3),
budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
)
```
The actor subscribes to `"client.health_check_requested"` for standalone use (with debounce), while the `:workflow` marker documents that workflows can also invoke this expectation. Workflow-triggered episodes bypass the actor GenServer entirely — the workflow engine creates episodes directly — so actor-level debounce/cooldown only applies to event triggers.
**Backpressure options** (`episode_overflow`):
- `:queue` — buffer excess episodes (default)
- `:drop` — discard when at capacity
- `:shed_oldest` — cancel the oldest queued episode to make room
**Expectation options:**
| Option | Default | Description |
|---|---|---|
| `strategy` | required | Strategy module for this expectation. Declared inline — Cyclium registers it when the actor boots |
| `trigger` | required | What fires the episode. Single trigger or list (e.g. `[{:event, "..."}, :workflow]`) |
| `synthesizer` | `nil` | Synthesizer module override for this expectation. Overrides the actor-level `synthesizer(...)` declaration |
| `filter` | `%{}` | Payload predicates — only fire when all match |
| `debounce_ms` | `nil` | Coalesce rapid events into one firing |
| `cooldown_ms` | `nil` | Minimum gap between firings |
| `subject_key` | `nil` | Payload key to scope debounce/cooldown per subject (e.g., `:client_id`) |
| `budget` | `%{max_turns: 12, max_tokens: 25_000, max_wall_ms: 120_000}` | Resource limits |
| `log_strategy` | `:timeline` | Controls materialized log verbosity AND step journal detail (see below) |
| `outputs` | `[]` | Declared output types (informational) |
| `resources` | `[]` | Declared capability dependencies (informational) |
| `audit_level` | `:standard` | Audit verbosity |
| `retention_days` | `90` | How long to keep episode data. Set higher for audit-sensitive workflows (e.g., 365). Retention is declarative — enforcement requires a scheduled cleanup job (not yet built) |
| `sample_rate` | `nil` | Float 0.0–1.0. When set, episodes fire probabilistically. `nil` or `1.0` = always fire. Force-fire bypasses sampling |
| `circuit_breaker` | `nil` | Circuit breaker config: `%{threshold: 5, half_open_after_ms: 60_000, cancel_in_flight: false}`. See [Circuit Breaker](#circuit-breaker) |
| `adaptive_budget` | `false` | When `true`, records episode resource usage for advisory budget recommendations |
| `service_levels` | `nil` | Performance objectives: `%{max_duration_ms: n, success_rate: 0.95, window_episodes: 20}`. See [Service Level Tracking](#service-level-tracking) |
| `finding_enrichment` | `nil` | Post-raise enrichment callback: `fn finding, episode -> {:ok, %{summary: ...}} end` or `{Mod, :fun}` |
| `escalation_rules` | `nil` | Time-based severity escalation: `%{"class" => [%{after_minutes: 60, escalate_to: :high}]}` |
| `finding_ttl_seconds` | `nil` | Default TTL for findings raised by this expectation. Individual findings can override with explicit `ttl_seconds` |
**Actor ID convention:** Actor IDs are **atoms in-process** and **strings in the database**. Use `identifier/1` in the DSL to set a stable actor ID that survives module renames:
```elixir
actor do
identifier(:client_health) # explicit, rename-proof
domain(:health)
# ...
end
```
If omitted, the ID is derived from the module name (`MyApp.Actors.ClientHealthActor` → `:client_health_actor`). The boundary is at episode creation — `Cyclium.Actor.Server` calls `to_string(state.actor_id)` when building the episode params. Everything upstream is atoms, everything downstream (DB, strategies, findings) is strings. **Actors with persisted episodes should always declare an explicit identifier.**
### Strategies
A **strategy** implements the investigation logic for an expectation. It's the brain of an episode — a stateless module that receives state and returns actions.
```elixir
defmodule MyApp.Strategies.ClientHealth do
@behaviour Cyclium.EpisodeRunner.Strategy
@impl true
def init(_episode, trigger) do
client_id = trigger.payload["client_id"]
{:ok, %{client_id: client_id}}
end
@impl true
def next_step(state, _episode_ctx) do
:converge # go straight to classification
end
@impl true
def handle_result(state, _step, _result) do
{:ok, state}
end
@impl true
def converge(state, _episode_ctx) do
client = MyApp.Clients.get!(state.client_id)
{class, severity, summary} = classify(client)
{:ok, %Cyclium.ConvergeResult{
classification: %{"primary" => class, "severity" => to_string(severity)},
confidence: 1.0,
summary: summary,
findings: [
{:raise, %{
actor_id: "client_health_actor",
finding_key: "client:health:#{client.id}",
class: class,
severity: severity,
confidence: 1.0,
subject: %{kind: "client", id: client.id},
subject_kind: "client",
subject_id: client.id,
summary: summary,
evidence_refs: %{"active_users" => client.active_users}
}}
],
outputs: []
}}
end
end
```
**Strategy callbacks:**
| Callback | Purpose |
|---|---|
| `init(episode, trigger)` | Initialize state from trigger data. Return `{:ok, state}` |
| `next_step(state, episode_ctx)` | Decide the next action (see table below) |
| `handle_result(state, step, result)` | Process a step's outcome. Return `{:ok, state}`, `{:retry, state}`, or `{:abort, reason}` |
| `converge(state, episode_ctx)` | Produce findings, outputs, and classification. Return `{:ok, ConvergeResult}` |
| `workflow_result(state, converge_result)` | *(optional)* Extract data to pass to downstream workflow steps |
**`next_step` return values:**
| Return | Effect |
|---|---|
| `:done` | Episode complete (skip converge phase) |
| `:converge` | Run the converge pipeline |
| `{:tool_call, capability, action, args}` | Call a registered tool capability, pass result to `handle_result` |
| `{:observe, data}` | Journal `data` as an observation step, then pass `{:ok, data}` to `handle_result`. This is a synchronous in-process action — no external system is called. Use it to feed data you've already gathered into the strategy's result-handling flow |
| `{:synthesize, prompt_ctx}` | Request LLM synthesis via app-provided `Cyclium.Synthesizer`. The synthesizer calls the LLM, and the response flows to `handle_result` |
| `{:checkpoint, phase_name}` | Save strategy state for crash recovery |
| `{:output, type, payload}` | Propose an output inline (outside converge) |
| `{:approval, request}` | Block episode until human approval |
| `{:wait, external_ref}` | Block episode until external event resolves |
**`handle_result` step kinds:**
The `step` argument passed to `handle_result/3` is an `%EpisodeStep{}` struct. Pattern-match on `kind` to distinguish which action produced the result:
| `step.kind` | Produced by | `result` on success | `result` on failure |
|---|---|---|---|
| `:tool_call` | `{:tool_call, capability, action, args}` | `{:ok, tool_return_value}` | `{:error, {error_class, detail}}` |
| `:synthesis` | `{:synthesize, prompt_ctx}` | `{:ok, llm_response}` | `{:error, {error_class, detail}}` |
| `:observation` | `{:observe, data}` | `{:ok, data}` | *(never fails — data is passed through as-is)* |
The step struct also carries `tool_name` (for `:tool_call` steps) which is useful for per-tool retry tracking:
```elixir
def handle_result(state, %{kind: :synthesis}, {:ok, result}), do: ...
def handle_result(state, %{kind: :tool_call, tool_name: "erp_read"}, {:ok, data}), do: ...
def handle_result(state, %{kind: :observation}, {:ok, data}), do: ...
```
### Multi-turn strategies
Strategies can run multiple turns before converging. `next_step` decides actions, `handle_result` absorbs outcomes — expensive work like LLM calls should be delegated to actions (`:synthesize`, `:tool_call`), not done inside `handle_result`.
**Use a `:phase` field in state to drive progression.** This is the recommended pattern — it makes flow explicit, prevents ambiguous matching in `handle_result`, and avoids accidental loops:
```elixir
defmodule MyApp.Strategies.ClientAdvisor do
@behaviour Cyclium.EpisodeRunner.Strategy
@system_prompt "You are a customer success analyst. Assess the client's health."
@impl true
def init(_episode, trigger) do
# Load data in init — next_step should be pure routing, not queries
client_id = trigger.payload["client_id"]
client = MyApp.Clients.get!(client_id)
client_data = Map.take(client, [:name, :status, :mrr])
{:ok, %{phase: :synthesize, client_id: client_id, client_data: client_data, ai_summary: nil}}
end
# --- next_step: pure routing based on phase ---
@impl true
def next_step(%{phase: :synthesize} = state, _episode_ctx) do
{:synthesize, %{
system_prompt: @system_prompt,
user_message: "Client: #{state.client_data.name}, MRR: $#{state.client_data.mrr}"
}}
end
def next_step(%{phase: :done}, _episode_ctx), do: :converge
# --- handle_result: ALWAYS guard on phase ---
@impl true
def handle_result(%{phase: :synthesize} = state, _step, {:ok, result}) do
summary = if is_map(result), do: result[:text] || result["text"] || inspect(result), else: inspect(result)
{:ok, %{state | phase: :done, ai_summary: summary}}
end
def handle_result(%{phase: :synthesize} = state, %{kind: :synthesis}, {:error, {_class, _detail}}) do
# Transient failure — retry the same step (next_step will re-emit :synthesize)
{:retry, state}
end
def handle_result(_state, _step, {:error, reason}) do
{:abort, reason}
end
@impl true
def converge(state, _episode_ctx) do
{:ok, %Cyclium.ConvergeResult{
classification: %{"primary" => "ai_summary", "severity" => "low"},
confidence: 0.9,
summary: state.ai_summary,
findings: [
{:raise, %{
actor_id: "client_advisor_actor",
finding_key: "client:advisor:#{state.client_id}",
class: "ai_summary",
severity: :low,
confidence: 0.9,
subject_kind: "client",
subject_id: state.client_id,
summary: state.ai_summary
}}
],
outputs: []
}}
end
end
```
**Key points:**
- `next_step` is pure decision-making — it returns what action to take, never does expensive work itself
- `handle_result` absorbs outcomes — it pattern-matches on step kind and result, then updates state
- `{:retry, state}` re-enters the loop with the same state, letting `next_step` retry the action
- `{:abort, reason}` immediately fails the episode with the given reason
- The `:synthesize` action delegates LLM calls to the app-provided `Cyclium.Synthesizer`, keeping the strategy free of HTTP concerns
#### Multi-step patterns and pitfalls
**Always guard `handle_result` on `:phase`.** The most common multi-step bug is an unguarded `handle_result` clause that matches too broadly, causing the strategy to cycle between phases instead of progressing:
```elixir
# BAD — matches ANY success result during any phase
def handle_result(state, _step, {:ok, result}) do
{:ok, %{state | phase: :done, ai_summary: inspect(result)}}
end
# GOOD — each clause is scoped to its phase
def handle_result(%{phase: :synthesize} = state, _step, {:ok, result}) do
{:ok, %{state | phase: :done, ai_summary: extract_text(result)}}
end
def handle_result(%{phase: :tool_result} = state, _step, {:ok, data}) do
{:ok, %{state | phase: :synthesize, gathered: data}}
end
```
Without phase guards, `handle_result` matches the wrong clause → phase doesn't advance → `next_step` re-emits the same action → loop. The budget will eventually kill it, but you'll burn turns for no reason.
**Handle the no-synthesizer passthrough.** When no `Cyclium.Synthesizer` is configured (or the registry returns `nil` for your actor), the runner passes `prompt_ctx` through as-is to `handle_result` with `{:ok, prompt_ctx}`. Your `:synthesize` phase handler must handle both the LLM response shape AND the raw passthrough:
```elixir
def handle_result(%{phase: :synthesize} = state, _step, {:ok, result}) do
summary =
cond do
is_binary(result) -> result
is_map(result) && Map.has_key?(result, :text) -> result.text
is_map(result) && Map.has_key?(result, "text") -> result["text"]
true -> inspect(result) # passthrough — no synthesizer configured
end
{:ok, %{state | phase: :done, ai_summary: summary}}
end
```
**Handle both trigger types if your actor can be triggered by workflows.** Workflow-created episodes use `%Cyclium.Trigger.Workflow{input: ...}`, not `%Cyclium.Trigger.Event{payload: ...}`:
```elixir
def init(_episode, trigger) do
client_id =
case trigger do
%Cyclium.Trigger.Event{payload: %{client_id: id}} -> id
%Cyclium.Trigger.Event{payload: payload} -> payload["client_id"]
%Cyclium.Trigger.Workflow{input: %{client_id: id}} -> id
%Cyclium.Trigger.Workflow{input: input} when is_map(input) -> input["client_id"]
_ -> nil
end
{:ok, %{phase: :gather, client_id: client_id}}
end
```
#### Loop detection
The episode runner automatically detects repeating step cycles. It fingerprints each `next_step` return value and watches for repeated patterns — a 1-step cycle (A, A, A), a 2-step cycle (A, B, A, B), or longer. When a cycle is detected, the episode fails immediately with `error_class: "loop_detected"`.
This is a safety net, not a substitute for correct phase guards. If loop detection fires, it means your strategy has a bug — fix the root cause (usually a missing phase guard on `handle_result`).
Note that consecutive steps of the **same kind but different data** are fine — the fingerprint includes the full action payload via `:erlang.phash2/1`. A dispatch strategy that emits multiple `:observe` steps with different entity data won't trigger loop detection. Only identical actions repeated in a cycle will.
### Episodes
An **episode** is one execution of a strategy. It tracks:
- Budget usage (turns, tokens, wall time)
- Step journal (every action recorded as an `EpisodeStep`)
- Classification and summary (set during converge)
- Status lifecycle: `:running` → `:done` | `:failed` | `:blocked` | `:canceled` | `:partially_failed`
Episodes run as Tasks under a DynamicSupervisor — no Oban required. The `cyclium_episodes` table serves as a durable work queue.
**Querying episodes:**
```elixir
Cyclium.Episodes.get!(episode_id)
Cyclium.Episodes.list_by_status([:running, :done, :failed])
Cyclium.Episodes.list_steps(episode_id) # step journal
Cyclium.Episodes.get_log(episode_id) # materialized log
Cyclium.Episodes.cancel(episode_id) # cancellation sequence
# List episodes by actor(s)
Cyclium.Episodes.list_by_actors(["my_actor"], limit: 20, order: :desc)
# Filter by subject — DB-level JSON filtering across trigger types
# Checks trigger_ref.payload.<key> (event-triggered) and
# trigger_ref.input.<key> (workflow-triggered) in a single query.
Cyclium.Episodes.list_by_actors_and_subject(
["client_health_actor", "client_advisor_actor"],
:client_id,
client.id,
limit: 20, order: :desc
)
```
`list_by_actors_and_subject/4` detects the repo adapter at runtime and uses the appropriate JSON text extraction — Postgres `#>>` or SQL Server `JSON_VALUE`. This is the recommended way to fetch episodes for a specific entity — avoids pulling all episodes and filtering in memory.
### Findings
A **finding** is a persistent observation about an entity. Findings have a lifecycle:
- **Raise** — create or update an active finding (upsert by `finding_key`)
- **Update** — modify mutable fields on an active finding
- **Clear** — mark a finding as resolved (idempotent)
```elixir
# In your converge/2 callback:
findings: [
{:raise, %{
actor_id: "client_health_actor",
finding_key: "client:health:123",
class: "churned",
severity: :high, # :low | :medium | :high | :critical
confidence: 1.0,
subject: %{kind: "client", id: "123"},
subject_kind: "client", # denormalized for SQL Server compat
subject_id: "123",
summary: "Client has churned",
evidence_refs: %{"status" => "churned"}
}},
{:update, "client:health:123", %{confidence: 0.8}},
{:clear, "client:health:123"},
{:clear, "client:health:123", "customer reactivated"}
]
```
**Finding key scoping — deduplicated vs. distinct:**
The `finding_key` controls deduplication. An active finding with the same key is updated in place (last-writer-wins on mutable fields). Choose your key strategy based on intent:
- **Deduplicated (default pattern):** Use a stable key like `"client:health:123"`. Repeated episodes update the same active finding — ideal for ongoing status tracking where you want one finding per subject.
- **Distinct per episode:** Include the episode ID in the key, e.g. `"po_review:PO-1955:#{episode.id}"`. Each episode creates a separate finding — useful for audit trails or point-in-time snapshots where every run should produce its own record.
The `episode_ctx` map passed to `converge/2` contains `episode_id`, so you can reference it directly:
```elixir
# Deduplicated: one active finding per client, updated each run
def converge(state, _episode_ctx) do
{:ok, %Cyclium.ConvergeResult{
findings: [{:raise, %{finding_key: "client:health:#{state.client_id}", ...}}]
}}
end
# Distinct: one finding per episode run
def converge(state, episode_ctx) do
{:ok, %Cyclium.ConvergeResult{
findings: [{:raise, %{finding_key: "po_review:#{state.po_id}:#{episode_ctx.episode_id}", ...}}]
}}
end
```
Findings are queried via `Cyclium.Findings.active_for/1`:
```elixir
Cyclium.Findings.active_for(actor: "client_health_actor")
Cyclium.Findings.active_for(subject: %{kind: "client", id: "123"})
Cyclium.Findings.active_for(finding_key: "client:health:123")
Cyclium.Findings.active_for(class: "churned")
Cyclium.Findings.active_for(caused_by: "parent:finding:key")
```
**Causality chains:** Findings can reference a parent finding via `caused_by_key`. This enables tracing root causes through chains of related findings:
```elixir
# Raise a child finding linked to a parent
{:raise, %{finding_key: "vendor:delay:PO-123", caused_by_key: "vendor:health:acme", ...}}
# Query helpers
Cyclium.Findings.caused_by("parent:key") # direct children
Cyclium.Findings.causal_chain("child:key", 10) # walk chain upward (max depth)
Cyclium.Findings.root_cause("child:key") # find root (no caused_by_key)
```
**TTL / Expiration:** Findings can auto-expire after a duration. Declare a default TTL on the expectation, or pass `ttl_seconds` / `expires_at` per finding:
```elixir
# Default TTL for all findings raised by this expectation
expectation(:check_temp,
strategy: MyApp.Strategies.TempCheck,
trigger: {:event, "sensor.updated"},
finding_ttl_seconds: 3600
)
# Or override per finding in converge:
{:raise, %{finding_key: "temp:alert:123", ttl_seconds: 7200, ...}}
```
Expired findings are cleared and active findings are escalated by `Cyclium.Findings.FindingSweep`, an optional GenServer that runs on a configurable interval:
```elixir
# config/config.exs
config :cyclium, :finding_sweep, true
config :cyclium, :finding_sweep_interval_ms, 300_000 # 5 minutes (default)
config :cyclium, :finding_sweep_batch_size, 100 # per sweep (default)
```
**Severity escalation:** Time-based rules automatically escalate finding severity based on how long a finding has been active. Declare rules on the expectation:
```elixir
expectation(:check_vendor,
strategy: MyApp.Strategies.VendorHealth,
trigger: {:event, "vendor.updated"},
escalation_rules: %{
"vendor_delay" => [
%{after_minutes: 60, escalate_to: :high},
%{after_minutes: 1440, escalate_to: :critical}
]
}
)
```
Escalation runs as part of the finding sweep cycle. Each sweep interval, **every active finding** matching a registered escalation pair (actor + expectation + classes) is loaded from the database and checked against the time-based rules. This means the sweep interval (`finding_sweep_interval_ms`) controls how often escalation is evaluated — rules with `after_minutes` granularity finer than the sweep interval won't fire any faster. For expectations with many active findings, keep this in mind when tuning the sweep interval and batch size. Application config (`config :cyclium, :escalation_rules`) is supported as a fallback.
**Post-raise enrichment:** An optional callback enriches findings immediately after they're raised. Declare it on the expectation:
```elixir
expectation(:check_health,
strategy: MyApp.Strategies.ClientHealth,
trigger: {:event, "client.updated"},
finding_enrichment: fn finding, _episode ->
{:ok, %{summary: "Enriched: #{finding.summary}", confidence: 0.95}}
end
)
# Or use a module/function tuple:
expectation(:check_health,
strategy: MyApp.Strategies.ClientHealth,
trigger: {:event, "client.updated"},
finding_enrichment: {MyApp.FindingEnricher, :enrich}
)
```
The callback receives `(finding, episode)` and returns `{:ok, %{...}}` or `:skip`. Only safe fields are applied: `evidence_refs`, `summary`, `confidence`. Errors in the callback are logged — the finding persists unchanged. Application config (`config :cyclium, :finding_enrichment`) is supported as a fallback.
### Outputs
Outputs are typed proposals produced during converge. They flow through the **Output Router**, which handles deduplication (via `dedupe_key`) and delivery through app-provided adapters.
```elixir
# In converge result:
outputs: [
%Cyclium.OutputProposal{
type: :email,
dedupe_key: "alert:client:123:#{Cyclium.Window.bucket(:h4, DateTime.utc_now())}",
payload: %{to: "team@co.com", subject: "Client 123 churned"},
requires_approval: false
}
]
```
Register adapters in config:
```elixir
config :cyclium, :output_adapters, %{
email: MyApp.Adapters.Email,
slack: MyApp.Adapters.Slack
}
```
Adapters implement `Cyclium.Output.Adapter`:
```elixir
defmodule MyApp.Adapters.Email do
@behaviour Cyclium.Output.Adapter
@impl true
def deliver(:email, payload, _ctx) do
case MyApp.Mailer.send(payload) do
:ok -> {:ok, %{message_id: "abc123"}}
{:error, reason} -> {:error, reason}
end
end
end
```
The adapter registry provides programmatic access:
```elixir
Cyclium.Output.Adapter.resolve(:email) # => MyApp.Adapters.Email
Cyclium.Output.Adapter.resolve("slack") # => MyApp.Adapters.Slack
Cyclium.Output.Adapter.all() # => [:email, :slack]
```
### Bus
The event bus connects actors, LiveViews, and workflows without coupling. It wraps Phoenix.PubSub.
```elixir
# Publish a domain event (from your app code):
Cyclium.Bus.broadcast("client.updated", %{client_id: "123"})
# Subscribe to all events (actors do this automatically):
Cyclium.Bus.subscribe()
# Subscribe to a specific event:
Cyclium.Bus.subscribe("episode.completed")
# In a LiveView or GenServer:
def handle_info({:bus, "episode.completed", payload}, socket) do
# payload contains: episode_id, actor_id, status, workflow_instance_id
end
```
**Runtime events emitted by Cyclium:**
| Category | Events |
|---|---|
| Episode lifecycle | `episode.completed`, `episode.failed`, `episode.canceled`, `episode.queued`, `episode.dropped` |
| Expectations | `expectation.triggered` |
| Findings | `finding.raised`, `finding.updated`, `finding.cleared` |
| Outputs | `output.delivered` |
| Workflows | `workflow.started`, `workflow.completed`, `workflow.failed` |
| System | `spec.updated` |
## Setup
### 1. Add dependency
```elixir
# mix.exs
def deps do
[{:cyclium, path: "../cyclium_ex"}]
end
```
Dependencies pulled in: `ecto`, `ecto_sql`, `jason`, `phoenix_pubsub`.
### 2. Run migrations
```elixir
# In a migration file:
def up do
Cyclium.Migrations.V1.up() # episodes, steps, checkpoints, findings, outputs
Cyclium.Migrations.V2.up() # episode_logs
Cyclium.Migrations.V3.up() # workflow_instances
Cyclium.Migrations.V4.up() # archived_at on episodes and findings
Cyclium.Migrations.V5.up() # unique index on episode dedupe_key
Cyclium.Migrations.V6.up() # work_claims table for lease-based coordination
# ...V7 through V13...
Cyclium.Migrations.V14.up() # trigger_requests table for deferred execution
# ...V15 through V18...
Cyclium.Migrations.V19.up() # SQL Server: convert legacy TEXT columns to nvarchar(max)
end
def down do
Cyclium.Migrations.V19.down()
Cyclium.Migrations.V14.down()
# ...V13 through V7...
Cyclium.Migrations.V6.down()
Cyclium.Migrations.V5.down()
Cyclium.Migrations.V4.down()
Cyclium.Migrations.V3.down()
Cyclium.Migrations.V2.down()
Cyclium.Migrations.V1.down()
end
```
> **Authoring migrations:** do not use bare `:text`. On `Ecto.Adapters.Tds` it
> emits SQL Server's legacy non-Unicode `TEXT` type, which silently replaces
> emoji and other non-CP1252 characters with `?`. Use `{:string, size: :max}`
> (which becomes `nvarchar(max)` on Tds and `TEXT` on Postgres/SQLite), or
> branch on `repo().__adapter__()` for finer control. V19 is the one-shot
> repair migration for columns that were already declared `:text`.
### 3. Configure
```elixir
# config.exs
config :cyclium, :repo, MyApp.Repo
# Optional: registry for strategy/synthesizer overrides (see "Strategy registry" section)
# config :cyclium, :strategy_registry, MyApp.StrategyRegistry
# Optional: episode runner (default: Cyclium.Runner.OTP)
# Use Cyclium.Runner.Deferred for trigger-only mode (see "Trigger-Only Mode" section)
config :cyclium, :runner, Cyclium.Runner.OTP
# Optional: node identity override for shared-name environments (see "Node Identity")
# config :cyclium, :node_identity, "my-unique-node-name"
# Optional: tool capabilities
config :cyclium, :capability_registry, %{
erp_read: MyApp.Tools.ERP,
vendor_api: MyApp.Tools.VendorAPI
}
# Optional: output adapters
config :cyclium, :output_adapters, %{
email: MyApp.Adapters.Email,
slack: MyApp.Adapters.Slack
}
# Optional: checkpoint schemas for versioned state migration
config :cyclium, :checkpoint_schemas, %{
{"client_health_actor", "client_should_be_healthy"} => MyApp.Checkpoints.HealthCheck
}
# Optional: enable reconciler for hot spec changes
config :cyclium, :reconciler, true
# Optional: register workflows
config :cyclium, :workflows, [MyApp.Workflows.ClientReview]
```
### 4. Declare strategies on expectations
The preferred approach is declaring the strategy module directly on each expectation in the actor DSL. Cyclium registers the mapping automatically when the actor GenServer boots — no separate registry needed:
```elixir
defmodule MyApp.Actors.ClientHealthActor do
use Cyclium.Actor
actor do
domain(:client_health)
spec_rev("v0.1.0")
synthesizer(MyApp.Synthesizers.ClientHealth) # actor-level default
max_concurrent_episodes(5)
episode_overflow(:queue)
expectation(:client_should_be_healthy,
strategy: MyApp.Strategies.ClientHealth,
trigger: {:event, "client.health_check_requested"},
subject_key: :client_id,
debounce_ms: :timer.seconds(3),
budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
)
expectation(:client_ai_summary,
strategy: MyApp.Strategies.ClientAdvisor,
synthesizer: MyApp.Synthesizers.FastSummary, # override for this expectation
trigger: {:event, "client.summary_requested"},
budget: %{max_turns: 5, max_tokens: 5_000, max_wall_ms: 60_000}
)
end
end
```
**Optional: strategy registry for overrides**
If you need to override a strategy or synthesizer without changing the actor code — for example, in a staging environment or during an A/B test — you can configure a registry:
```elixir
# config.exs (optional)
config :cyclium, :strategy_registry, MyApp.StrategyRegistry
defmodule MyApp.StrategyRegistry do
# Only add clauses for explicit overrides — anything not matched here
# falls through to the actor's declared strategy.
def strategy_for("client_health_actor", _exp), do: MyApp.Strategies.ClientHealthV2
end
```
### 5. Supervision tree
```elixir
# application.ex
children = [
MyApp.Repo,
{Phoenix.PubSub, name: MyApp.PubSub},
{Cyclium.Supervisor, pubsub: MyApp.PubSub},
MyApp.Actors.ClientHealthActor,
MyApp.Actors.ClientAdvisorActor,
MyAppWeb.Endpoint
]
```
`Cyclium.Supervisor` starts the DynamicSupervisors, TaskSupervisor, and optionally the Reconciler and WorkflowEngine.
## Budgets
Every expectation declares a budget. The runner enforces all three dimensions:
```elixir
budget: %{
max_turns: 12, # loop iterations (incremented every next_step call)
max_tokens: 25_000, # LLM token cost (incremented by tool_call results)
max_wall_ms: 120_000 # wall-clock deadline (enforced via Process.send_after)
}
```
When any limit is hit, the episode fails with `error_class: "budget_exceeded"`. Wall time is enforced asynchronously — a `:budget_wall_exceeded` message interrupts the loop even if the strategy is blocked on a tool call.
## Deduplication: actor-level vs. strategy-level
Cyclium provides three layers of temporal dedup:
**Actor-level global (`cooldown_ms`)** — enforced by the actor GenServer before an episode starts. Simple and zero-cost, but applies globally to the expectation — *all* subjects are blocked during the window.
```elixir
expectation(:client_ai_summary,
strategy: MyApp.Strategies.ClientAdvisor,
trigger: {:event, "client.summary_requested"},
cooldown_ms: :timer.minutes(5) # no advisor episodes for ANY client for 5 min
)
```
**Actor-level per-subject (`subject_key` + `debounce_ms`/`cooldown_ms`)** — when `subject_key` is set, debounce and cooldown are scoped per subject value. Each unique subject gets its own independent trailing-edge timer and cooldown window. Client A and client B are debounced independently.
```elixir
expectation(:client_ai_summary,
strategy: MyApp.Strategies.ClientAdvisor,
trigger: {:event, "client.summary_requested"},
subject_key: :client_id,
debounce_ms: :timer.seconds(10), # trailing-edge, per-client
cooldown_ms: :timer.minutes(5) # minimum gap, per-client
)
```
When `subject_key` is set but the payload doesn't contain that key, the subject value is `nil` and the key becomes `{expectation_id, nil}` — still isolated from real subjects, never crashing.
**Strategy-level (`Findings.recent?/2`)** — checked in `init/2` using the existing finding for a specific subject. This is DB-backed, so it survives actor restarts unlike the in-memory actor-level dedup.
```elixir
@summary_cooldown_ms :timer.minutes(5)
def init(_episode, trigger) do
client_id = trigger.payload["client_id"]
skip = Cyclium.Findings.recent?("client:advisor:#{client_id}", @summary_cooldown_ms)
{:ok, %{client_id: client_id, skip: skip}}
end
def next_step(%{skip: true}, _ctx), do: :done
```
**When to use which:**
| Scenario | Use |
|---|---|
| Rate-limit a high-frequency trigger globally | `cooldown_ms` on the expectation |
| Coalesce rapid events per subject before firing | `subject_key` + `debounce_ms` |
| Minimum gap between runs per subject | `subject_key` + `cooldown_ms` |
| Dedup that survives actor restarts (DB-backed) | `Findings.recent?/2` in `init/2` |
| Belt-and-suspenders | `subject_key` + `debounce_ms` as fast path, `Findings.recent?/2` for persistence across restarts |
## Workflows
Workflows coordinate multiple actors in a dependency graph. Data flows between steps via the `workflow_result/2` strategy callback and `input:` functions on downstream steps.
### Defining a workflow
```elixir
defmodule MyApp.Workflows.ClientReview do
use Cyclium.Workflow
workflow do
trigger {:event, "client.updated"}
debounce_ms :timer.seconds(3)
subject_key :client_id
step :health_check,
actor: :client_health_actor,
expectation: :client_should_be_healthy
step :ai_summary,
actor: :client_advisor_actor,
expectation: :client_ai_summary,
depends_on: [:health_check],
input: fn _trigger, prior ->
# prior[:health_check] contains the map returned by
# the health strategy's workflow_result/2 callback
%{client_id: prior[:health_check].client_id}
end
on_failure :health_check, policy: :retry, max_step_attempts: 3, backoff_ms: 5_000
on_failure :ai_summary, policy: :abort
end
end
```
**Workflow debounce:** `debounce_ms` and `subject_key` coalesce rapid events before starting the workflow. When `subject_key` is set, each unique subject value gets its own debounce window — client A and client B debounce independently. Without `subject_key`, all events for the workflow share a single timer. Each new event resets the debounce window (trailing-edge). This is useful for workflows that trigger on high-frequency events like `"entity.updated"`.
**Episode reuse (cross-workflow dedup):** By default, workflows reuse recent completed episodes when two workflows trigger the same actor + expectation + input within a 5-minute window. To disable this:
```elixir
workflow do
disable_episode_reuse
trigger {:event, "client.review_requested"}
step :health_check, actor: :client_health_actor, expectation: :client_should_be_healthy
end
```
**Cancellation cascade:** When a workflow fails, pending and retrying steps are automatically canceled. To also clear active findings raised by the workflow's episodes, set `clear_findings_on_cancel` in the workflow instance metadata.
### Passing data between steps
When a workflow step completes, the engine calls the strategy's optional `workflow_result/2` callback to extract the data that downstream steps receive via `prior`. If `workflow_result/2` is not implemented, downstream steps receive `nil` for that step's prior.
```elixir
defmodule MyApp.Strategies.ClientHealth do
@behaviour Cyclium.EpisodeRunner.Strategy
# ... init, next_step, handle_result, converge as usual ...
# Optional: extract data for downstream workflow steps
@impl true
def workflow_result(state, _converge_result) do
# This map becomes prior[:health_check] in downstream input functions
%{client_id: state.client_id, classification: state.classification}
end
end
```
### Configuration and usage
Register workflows in config:
```elixir
config :cyclium, :workflows, [MyApp.Workflows.ClientReview]
```
The `WorkflowEngine` GenServer:
- Listens for trigger events on the Bus
- Creates a `WorkflowInstance` record to track execution
- Fires steps in dependency order (DAG validated at compile time)
- Passes data between steps via `workflow_result/2` → `input` functions
- Applies failure policies per-step: `:abort` (cancel all), `:retry` (with backoff), `:pause` (wait for manual intervention)
Workflows can also be started manually:
```elixir
Cyclium.WorkflowEngine.start_workflow(
MyApp.Workflows.ClientReview,
%{client_id: "123"},
[]
)
```
### Dynamic workflows
Dynamic workflows can be defined in the database and registered at runtime — no compiled modules required. They follow the same step-dependency model as compiled workflows but use declarative input mappings instead of Elixir functions.
#### Defining a workflow in the database
Insert a row into `cyclium_workflow_definitions`:
```elixir
%Cyclium.Schemas.WorkflowDefinition{
workflow_id: "vendor_onboarding",
trigger_type: "event",
trigger_event: "vendor.registration_submitted",
steps: Jason.encode!([
%{
"id" => "compliance_check",
"actor_id" => "compliance_monitor",
"expectation" => "vendor_risk",
"depends_on" => [],
"input_map" => %{"vendor_id" => "trigger.vendor_id"},
"failure_policy" => "abort"
},
%{
"id" => "connector_setup",
"actor_id" => "integration_actor",
"expectation" => "setup_connector",
"depends_on" => ["compliance_check"],
"input_map" => %{
"vendor_id" => "trigger.vendor_id",
"risk_level" => "prior.compliance_check.classification.primary"
},
"failure_policy" => "retry",
"max_step_attempts" => 2,
"backoff_ms" => 30000
}
]),
enabled: true
}
```
#### Input mapping syntax
Dynamic workflows use dot-notation paths instead of Elixir functions:
| Path | Resolves to |
|------|-------------|
| `"trigger.order_id"` | `trigger_ref["order_id"]` |
| `"prior.validate.classification.primary"` | `prior[:validate][:classification]["primary"]` |
| `"fast"` (no prefix) | Static value `"fast"` |
#### Loading dynamic workflows
```elixir
# At startup — loads all enabled definitions
Cyclium.DynamicWorkflow.Loader.load_all()
# Load a single workflow
Cyclium.DynamicWorkflow.Loader.load("vendor_onboarding")
# Reload after updating the definition in DB
Cyclium.DynamicWorkflow.Loader.reload("vendor_onboarding")
# Unregister a workflow
Cyclium.DynamicWorkflow.Loader.unload("vendor_onboarding")
```
#### Starting dynamic workflows manually
```elixir
Cyclium.WorkflowEngine.start_dynamic_workflow(
"vendor_onboarding",
%{"vendor_id" => "v-123"}
)
```
Dynamic workflows are event-triggered (via Bus) like compiled workflows. The Watcher also listens for `workflow_definition.created/updated/disabled` events for automatic refresh.
## Circuit breaker
Per-expectation circuit breaker prevents cascading failures when a tool or external service is down. When consecutive episode failures exceed a threshold, the circuit opens and rejects new episodes. After a cooldown period, one probe episode is allowed through (half-open state) — if it succeeds, the circuit closes.
```elixir
expectation(:check_vendor_api,
strategy: MyApp.Strategies.VendorCheck,
trigger: {:event, "vendor.updated"},
circuit_breaker: %{
threshold: 5, # consecutive failures to trip
half_open_after_ms: 60_000, # cooldown before probe
cancel_in_flight: false # cancel running episodes when circuit trips
}
)
```
**States:** `:closed` (normal) → `:open` (rejecting) → `:half_open` (probe) → `:closed`
**In-flight cancellation:** When `cancel_in_flight: true`, tripping the circuit also cancels any running or blocked episodes for that actor + expectation, preventing wasted work against a known-broken dependency.
**Scope:** Circuit breakers are node-local (ETS-backed). In a cluster, each node tracks failures independently. This is intentional — a service might be unreachable from one node but fine from another. For cluster-wide coordination, combine with the Bus (circuit breaker events are broadcast).
Force-fired episodes bypass the circuit breaker check.
Query state: `Cyclium.CircuitBreaker.get_state(actor_id, expectation_id)`
## Episode sampling
Probabilistic episode firing for high-frequency triggers. Set `sample_rate` on an expectation to control what fraction of triggers actually fire episodes:
```elixir
expectation(:health_check,
strategy: MyApp.Strategies.MetricsCheck,
trigger: {:event, "metrics.updated"},
sample_rate: 0.1 # fire ~10% of triggers
)
```
- `nil` or `1.0` = always fire (default)
- `0.0` = never fire
- Sampled-out episodes emit `[:cyclium, :episode, :sampled_out]` telemetry
- Force-fired episodes bypass sampling
## Service level tracking
Declarative performance objectives with automatic breach detection. Define success rate and duration thresholds per expectation:
```elixir
expectation(:process_order,
strategy: MyApp.Strategies.OrderProcessor,
trigger: {:event, "order.created"},
service_levels: %{
max_duration_ms: 30_000, # p95 target
success_rate: 0.95, # 95% success target
window_episodes: 20 # rolling window size
}
)
```
Breaches emit `[:cyclium, :service_levels, :breach]` telemetry and a `"service_levels.breach"` Bus event with details:
```elixir
%{type: :success_rate, current: 0.85, threshold: 0.95}
%{type: :duration, current: 45_000, threshold: 30_000}
```
Query metrics: `Cyclium.ServiceLevels.metrics(actor_id, expectation_id)` returns `%{success_rate: f, p95_duration_ms: n, sample_count: n}`.
## Adaptive budgets
Advisory budget tracking based on historical episode resource usage. When enabled, Cyclium records turns, tokens, and wall time for each completed episode and recommends budgets based on p95 values with 25% headroom.
```elixir
expectation(:classify_ticket,
strategy: MyApp.Strategies.TicketClassifier,
trigger: {:event, "ticket.created"},
adaptive_budget: true
)
```
Query recommendations:
```elixir
# After enough samples (minimum 5):
Cyclium.AdaptiveBudget.recommend(actor_id, expectation_id)
# => %{max_turns: 8, max_tokens: 15_000, max_wall_ms: 25_000}
# Detailed stats:
Cyclium.AdaptiveBudget.stats(actor_id, expectation_id)
# => %{samples: 47, p50: %{...}, p95: %{...}, max: %{...}}
```
Adaptive budgets are advisory only — they do not automatically adjust episode budgets. Use the recommendations to tune your expectation configs over time.
## Logging and observability
### Log strategies
Set per-expectation via `log_strategy`. Controls both what gets stored in step journal columns (`args_redacted`, `result_ref`) and what the materialized log renders:
| Strategy | Step journal `args_redacted` | Step journal `result_ref` | Materialized log |
|---|---|---|---|
| `:none` | omitted | omitted | none |
| `:summary_only` | omitted | omitted | one-line status summary |
| `:timeline` | tool name + action only | summary/IDs only | step-by-step with timestamps |
| `:full_debug` | surviving context after earlier layers | full result payload | timeline + args, results, errors |
Use `:full_debug` for audit-sensitive workflows where you need to reconstruct exactly what context an LLM had (EOX predictions, SKU classifications). Use `:timeline` for high-frequency episodes where you want the flow visible without storing full payloads.
**Important:** `log_strategy` is the last layer in a pipeline. By the time it runs, earlier layers have already trimmed the data. "Full" in `:full_debug` means "everything that survived the earlier layers", not necessarily everything the strategy originally passed. See the pipeline below.
The materialized log is built by `LogProjector` reading back from the already-stored step rows — it never re-processes the original payload. Whatever `log_strategy` stored is exactly what appears in the log.
### Storage pipeline for synthesis steps
Every synthesis step passes through three filtering layers in order before anything reaches the database:
```
synthesis payload from next_step
│
▼
1. __transient__ stripping — keys listed in :__transient__ are removed from storage
│ but the synthesizer receives the full payload
▼
2. tool redact callbacks — redact/1 and redact_result/1 on tool steps
│ (not applicable to synthesis, but runs for tool_call steps)
▼
3. log_strategy filtering — controls final shape of args_redacted / result_ref
│
▼
cyclium_episode_steps (args_redacted, result_ref)
│
▼
cyclium_episode_logs (rendered by LogProjector from stored step rows)
```
Each layer has a different scope:
| Layer | Controlled by | Purpose |
|---|---|---|
| `__transient__` | Strategy (per synthesis call) | Pass bulk data to LLM without persisting it |
| `redact/1`, `redact_result/1` | Tool module | Trim domain-specific bulky fields from tool steps |
| `log_strategy` | Expectation | Set overall verbosity for the whole episode |
### Transient synthesis data
Sometimes a strategy needs to pass large data to the synthesizer (full record lists, raw API payloads) that the LLM needs for context but that you don't want persisted in the step journal or materialized log. Mark those keys under `:__transient__` in the synthesis payload:
```elixir
def next_step(state, _ctx) do
orders = load_orders(state.project_id)
{:synthesize, %{
project_id: state.project_id,
project_name: state.project_name,
order_count: length(orders), # small scalar — kept in storage
orders: serialize_orders(orders), # full list — synthesizer needs it, storage does not
evidence: build_evidence(orders), # small structured summary — kept in storage
__transient__: [:orders] # strip :orders before writing args_redacted
}}
end
```
The runner passes the full map (minus `:__transient__` itself) to `synthesizer.synthesize/2`, then drops the listed keys before handing off to `log_strategy` filtering. The synthesizer receives `orders`; the stored step and rendered log do not, regardless of `log_strategy`.
This is the right tool when:
- You need full detail for synthesis quality (long order lists, raw API responses, document text)
- You don't want that data in the audit trail or materialized log
- You still want other context fields (counts, summaries, IDs) persisted for debugging
It is not a substitute for `log_strategy` — if you want no storage at all for an episode type, use `:none` or `:summary_only`. `__transient__` is surgical; `log_strategy` is wholesale.
Materialized logs are stored in `cyclium_episode_logs` by `Cyclium.LogProjector` and can be queried via `Cyclium.Episodes.get_log(episode_id)`.
### Telemetry
Cyclium emits 36 structured telemetry events under the `[:cyclium, ...]` prefix. Attach a handler for development:
```elixir
Cyclium.Telemetry.attach_default_logger()
```
Key events:
| Event | Metadata |
|---|---|
| `[:cyclium, :episode, :completed]` | episode_id, actor_id, output_count, finding_count |
| `[:cyclium, :episode, :failed]` | episode_id, actor_id |
| `[:cyclium, :episode, :sampled_out]` | actor_id, expectation_id |
| `[:cyclium, :step, :tool_call]` | tool, action, episode_id |
| `[:cyclium, :step, :synthesis]` | episode_id |
| `[:cyclium, :finding, :raised]` | finding_key, actor_id, class |
| `[:cyclium, :finding, :cleared]` | finding_key, actor_id, class |
| `[:cyclium, :finding, :expired]` | count |
| `[:cyclium, :finding, :escalated]` | finding_key, actor_id, class |
| `[:cyclium, :finding_sweep, :completed]` | duration_ms, expired_count, escalated_count, node |
| `[:cyclium, :finding_sweep, :failed]` | duration_ms, node, reason |
| `[:cyclium, :output, :delivered]` | type, episode_id |
| `[:cyclium, :actor, :event_received]` | actor_id, event_type |
| `[:cyclium, :actor, :overflow]` | actor_id, policy |
| `[:cyclium, :circuit_breaker, :opened]` | actor_id, expectation_id, consecutive_failures |
| `[:cyclium, :circuit_breaker, :closed]` | actor_id, expectation_id |
| `[:cyclium, :circuit_breaker, :rejected]` | actor_id, expectation_id |
| `[:cyclium, :service_levels, :breach]` | actor_id, expectation_id, type, current, threshold |
| `[:cyclium, :workflow, :step_reused]` | workflow_id, instance_id, step_id, reused_episode_id |
Full list: `Cyclium.Telemetry.events/0`
### Step journal
Every episode action is recorded as an `EpisodeStep` with one of 16 kinds:
`tool_call`, `synthesis`, `observation`, `checkpoint`, `output_proposed`, `output_delivered`, `output_failed`, `approval_requested`, `approval_resolved`, `wait_started`, `wait_resolved`, `finding_raised`, `finding_updated`, `finding_cleared`, `episode_completed`, `episode_failed`
Each step records: `step_no`, `kind`, `tool_name`, `args_redacted`, `result_ref`, `error_class`, `error_detail`, `cost_tokens`, `cost_ms`, `created_at`.
Query steps: `Cyclium.Episodes.list_steps(episode_id)`
## Checkpointing
Strategies can save state mid-episode for crash recovery. Return `{:checkpoint, phase_name}` from `next_step/2` to persist the current state:
```elixir
def next_step(state, _ctx) do
if state.phase == :data_collected do
{:checkpoint, "data_collected"}
else
{:tool_call, :erp_read, :read_po, %{"po_id" => state.po_id}}
end
end
```
The checkpoint saves the full strategy state map to the `cyclium_episode_checkpoints` table. On resume, `EpisodeTask` loads the latest checkpoint by `checkpoint_no` and passes it to the strategy — execution continues from where it left off.
### Checkpoint schema versioning
If your strategy's state shape changes between deploys, register a checkpoint schema to migrate old checkpoints forward:
```elixir
defmodule MyApp.Checkpoints.HealthCheck do
use Cyclium.CheckpointSchema, version: 2
# Migrate from version 1 -> 2
def migrate(1, state), do: {:ok, Map.put(state, :new_field, nil)}
def migrate(2, state), do: {:ok, state}
end
```
Register in config:
```elixir
config :cyclium, :checkpoint_schemas, %{
{"client_health_actor", "client_should_be_healthy"} => MyApp.Checkpoints.HealthCheck
}
```
If migration fails, `EpisodeTask` falls back to a fresh `strategy.init/2` — the episode restarts from scratch.
### When to checkpoint vs. restart
**Use checkpoints** when:
- The strategy accumulates state across many turns that would be expensive to recompute (multi-turn LLM conversations, progressive data aggregation)
- Steps have non-idempotent side effects that shouldn't be repeated
**Use restart (no checkpoint)** when:
- The strategy re-queries all data from the DB each turn (most monitoring/health strategies)
- Side effects are idempotent (findings upsert by key, outputs are deduplicated)
- Episodes are short (< a few minutes)
Most strategies don't need checkpoints — `recovery_policy: :restart` on the expectation handles recovery by re-running the episode from scratch. See the Recovery section below.
## Recovery
Cyclium provides built-in recovery for orphaned episodes after server restarts or deploys.
### The problem
When a node shuts down (deploy, crash, scaling event), in-flight episodes are killed and left as `:running` in the database. Without recovery, these episodes stay orphaned forever.
### Distributed episode claiming
In a multi-node cluster, all nodes run the same actors and receive the same Bus events via PG2. Without coordination, every node would independently create and run an episode for every trigger — tripling (or more) the work. Cyclium uses DB-based coordination with no Redis or leader election required.
#### Episode creation: dedupe_key
When a trigger fires, every node's actor calls `maybe_fire_episode`. Before inserting, the actor generates a deterministic `dedupe_key`:
- **Schedule triggers:** `"schedule:{actor_id}:{expectation_id}:{date}"` — one episode per schedule window
- **Event triggers:** `"event:{actor_id}:{expectation_id}:{payload_hash}"` — one episode per distinct event payload
A filtered unique index on `dedupe_key` (`WHERE dedupe_key IS NOT NULL AND archived_at IS NULL`) ensures only one node's insert succeeds. The other nodes receive a constraint violation and silently skip — no episode created, no work duplicated. Archived episodes are excluded from the constraint so that re-triggered work isn't blocked by old runs.
A random jitter (0-200ms) before insert spreads winners evenly across nodes, preventing one faster node from consistently claiming all episodes:
```elixir
# In Actor.Server.maybe_fire_episode/3
Process.sleep(:rand.uniform(200))
enqueue_episode(state, episode_params)
```
The constraint violation is caught in `enqueue_episode` using the same pattern as the Output router:
```elixir
{:error, %Ecto.Changeset{} = cs} ->
if has_dedupe_violation?(cs) do
Logger.debug("[#{state.actor_id}] Dedupe skip: #{params.dedupe_key}")
end
state
```
#### Recovery claims: optimistic update
After a restart, all surviving nodes run `Cyclium.Recovery.sweep/1`. Each node sees the same list of stale episodes, but only one node can claim each episode:
```elixir
# Episodes.claim_for_recovery/1
from(e in Episode,
where: e.id == ^episode_id and e.status == :running and is_nil(e.archived_at)
)
|> repo().update_all(set: [phase: "recovering"])
```
This is an atomic `UPDATE ... WHERE` — the first node to execute it sets `phase: "recovering"` and gets `{1, _}` back. All other nodes get `{0, _}` (no rows affected) and skip. No locks, no races, no distributed coordination protocol needed.
#### Summary of coordination guarantees
| Scenario | Mechanism | Guarantee |
|----------|-----------|-----------|
| Same trigger on N nodes | `dedupe_key` unique index | Exactly one episode created |
| Same orphan on N nodes | Optimistic `UPDATE ... WHERE` | Exactly one node recovers it |
| Archived episodes | Filtered unique index excludes `archived_at IS NOT NULL` | Re-triggering is not blocked by old runs |
| Node with lower latency | Random jitter (0-200ms) | Winners distributed evenly over time |
### Recovery sweep
`Cyclium.Recovery.sweep/1` finds stale `:running` episodes and recovers them:
1. Query episodes where the most recent step journal entry is older than `stale_after_ms` (default: 2 minutes)
2. Attempt optimistic claim on each stale episode
3. If claimed, apply the expectation's `recovery_policy`:
- `:restart` — enqueue fresh (re-runs `strategy.init/2`)
- `:fail` (default) — mark `:failed` with `error_class: "orphaned"`
4. Emit `[:cyclium, :recovery, :sweep]` telemetry with counts
Policy resolution checks the compiled `:actor_registry` map first, then falls back to the `cyclium_agent_definitions` table for dynamic actors. Unknown actors default to `:fail`.
### Workflow reconciliation
`Cyclium.Recovery.reconcile_workflows/0` handles a related problem: workflow instances stuck in `:running` because the WorkflowEngine missed a Bus event during a restart.
The WorkflowEngine is purely event-driven — it advances workflows when it receives `episode.completed` or `episode.failed` Bus events. If the engine wasn't running when those events fired (e.g. during a deploy), the workflow step_states become stale and the workflow hangs.
Reconciliation fixes this by:
1. Finding all `:running` / `:blocked` workflow instances
2. For each step marked `"running"` in step_states, loading the actual episode
3. If the episode has already reached a terminal state, re-broadcasting the appropriate Bus event
4. The WorkflowEngine handles the replayed event through its normal path — no special logic needed
Call `reconcile_workflows/0` **after** `sweep/1` and after workflow configs are registered (compiled modules booted, dynamic workflows loaded).
### Setting recovery policy
Add `recovery_policy` to the expectation DSL:
```elixir
actor do
expectation(:evaluate_project,
strategy: MyApp.Strategies.ProjectHealth,
trigger: {:event, "project_health.check_requested"},
recovery_policy: :restart,
budget: %{max_turns: 5, max_tokens: 25_000, max_wall_ms: 120_000}
)
end
```
Use `:restart` for idempotent strategies that re-query data from the DB. Use `:fail` (default) for strategies with non-idempotent side effects where automatic recovery could cause harm.
### Wiring up recovery in your app
Add a delayed recovery task to your Cyclium supervisor with an `:actor_registry` map that maps actor `identifier()` strings to their modules. Cyclium looks up the `recovery_policy` from each actor's compiled expectations automatically. Dynamic actors not in the registry are resolved from the DB.
```elixir
# Maps identifier (as DB string) → actor module for recovery sweep.
# Must match the identifier() declared in each actor's DSL block.
@actor_registry %{
"project_health_actor" => MyApp.Actors.ProjectHealthActor,
"client_health_actor" => MyApp.Actors.ClientHealthActor
}
children = [
{Cyclium.Supervisor, pubsub: MyApp.PubSub},
MyApp.Actors.ProjectHealthActor,
MyApp.Actors.ClientHealthActor,
{Task, fn ->
# Wait for cluster to settle after deploy
Process.sleep(:timer.minutes(2))
Cyclium.Recovery.sweep(actor_registry: @actor_registry)
Cyclium.Recovery.reconcile_workflows()
end}
]
```
For custom policy logic, pass `:resolve_policy` instead:
```elixir
Cyclium.Recovery.sweep(
resolve_policy: fn episode ->
if episode.actor_id == "critical_actor", do: :restart, else: :fail
end
)
```
### Deploy sequence
1. Node gets SIGTERM -> 30s graceful shutdown timeout
2. Episodes trap exits, try to finish current step within remaining time
3. Episodes that don't finish stay `:running` in DB
4. After boot, each node waits 2 minutes before running recovery sweep
5. Sweep finds stale episodes via step journal recency
6. First node to claim each orphan via optimistic update handles recovery
7. Workflow reconciliation replays missed Bus events for stale workflow steps
## Step Retry Helper
`Cyclium.Strategy.Retry` provides a lightweight helper for retrying failed steps within an episode. This is distinct from workflow-level retry (`on_failure :step, :retry`) which retries entire episodes — the step retry helper retries individual steps (e.g. a synthesis call) within a single episode run.
### The problem
When a strategy calls `:synthesize` and the LLM provider returns a transient error (timeout, rate limit, 503), the strategy needs to retry. Without a helper, you'd manually track attempt counts in the state map:
```elixir
# Without helper — manual retry tracking
def handle_result(state, %{kind: :synthesis}, {:error, _}) do
if state[:synthesis_retries] < 3 do
{:retry, Map.update(state, :synthesis_retries, 1, &(&1 + 1))}
else
{:abort, "synthesis_failed"}
end
end
```
This is error-prone (forgetting to reset counters, tracking multiple step types, off-by-one errors).
### Using `Cyclium.Strategy.Retry`
```elixir
alias Cyclium.Strategy.Retry
# On success — reset the counter so future failures get fresh attempts
def handle_result(state, %{kind: :synthesis}, {:ok, result}) do
{:ok, state |> Retry.reset(:synthesis) |> Map.put(:assessment, result)}
end
# On failure — retry up to 3 times with 2-second backoff
def handle_result(state, %{kind: :synthesis} = step, {:error, _}) do
case Retry.check(state, step, max_attempts: 3, backoff_ms: 2_000) do
{:retry, new_state} -> {:retry, new_state}
{:give_up, _attempts, new_state} -> {:abort, "synthesis_failed_after_retries"}
end
end
```
When `handle_result` returns `{:retry, state}`, the runner calls `do_loop` → `next_step` again. The strategy should naturally re-emit the same step type (e.g. `:synthesize`) since its phase/state hasn't changed — only the internal `__retries` counter was updated.
### Options
| Option | Default | Description |
|--------|---------|-------------|
| `:max_attempts` | `3` | Total attempts including the original |
| `:backoff_ms` | `0` | Milliseconds to sleep before retry |
| `:step_key` | `step.kind` | Key for tracking — use custom keys to track retries per tool name or phase |
### Custom step keys
Track retries separately for different tool calls within the same episode:
```elixir
def handle_result(state, %{kind: :tool_call, tool_name: tool} = step, {:error, _}) do
case Retry.check(state, step, step_key: {:tool, tool}, max_attempts: 2) do
{:retry, new_state} -> {:retry, new_state}
{:give_up, _attempts, new_state} -> {:ok, %{new_state | phase: :skip_tool}}
end
end
```
### API reference
- `Retry.check(state, step, opts)` — returns `{:retry, state}` or `{:give_up, count, state}`
- `Retry.reset(state, key)` — clears the counter for one key (call on success)
- `Retry.reset_all(state)` — clears all retry tracking
### Retry layers summary
| Layer | Scope | Mechanism | Backoff | Limit |
|-------|-------|-----------|---------|-------|
| **Step retry** (`Strategy.Retry`) | Within one episode | `handle_result` returns `{:retry, state}` | Optional (`backoff_ms`) | `max_attempts` per step key |
| **Episode budget** | Within one episode | Runner checks `max_turns`, `max_tokens`, `max_wall_ms` | N/A | Budget exhaustion |
| **Workflow retry** (`on_failure`) | Across episodes | WorkflowEngine creates new episode | `backoff_ms` (default 5s) | `max_step_attempts` (default 3) |
| **Crash recovery** | After restart | `Recovery.sweep` re-enqueues or fails | N/A | One attempt per `recovery_policy` |
## Work Claims (Distributed Lease Coordination)
For clusters where multiple applications share the same database and actor definitions, work claims provide lease-based coordination to ensure at-most-once execution.
### How it works
1. Before executing an episode, `EpisodeTask` calls `WorkClaims.gate_acquire/3` with the episode's `dedupe_key`
2. If claimed successfully, a `Heartbeat` GenServer renews the lease periodically (every lease/3 seconds)
3. On completion, the claim is marked `:done`; on crash, `:failed`
4. If a node dies, the lease expires and another node can steal it
Work claims also coordinate trigger request dispatch — `TriggerRequests.Poller` acquires a claim per request before dispatching to prevent multiple full-mode nodes from processing the same deferred episode.
### Configuration
```elixir
# Use the built-in Ecto-based implementation:
config :cyclium, work_claims: Cyclium.WorkClaims.EctoClaims
# Or a SQL Server-optimized adapter in consuming apps:
config :cyclium, work_claims: MyApp.WorkClaims.SqlServer
# Lease duration (default: 120 seconds)
config :cyclium, work_claims_lease_seconds: 180
# Or omit work_claims entirely — no claiming, fully backwards compatible
```
When unconfigured, all `gate_*` functions return passthrough values with zero overhead.
### Writing a custom adapter
Implement the `Cyclium.WorkClaims` behaviour with 5 callbacks:
```elixir
defmodule MyApp.WorkClaims.SqlServer do
@behaviour Cyclium.WorkClaims
@impl true
def acquire(dedupe_key, owner_node, opts) do
# Use hints: ["UPDLOCK"] for SQL Server lock acquisition
# Transaction-based: read with lock, then insert or update
end
@impl true
def renew(dedupe_key, owner_node, lease_seconds), do: # ...
def complete(dedupe_key, owner_node), do: # ...
def fail(dedupe_key, owner_node, error_detail), do: # ...
def reclaim_expired(limit), do: # ...
end
```
The default `EctoClaims` implementation uses plain transactions (no lock hints) and works with any Ecto adapter. For SQL Server, a custom adapter can use `hints: ["UPDLOCK"]` on the read query inside the transaction for stronger concurrency guarantees.
### Database table
The `cyclium_work_claims` table is created by V6 migration:
| Column | Type | Notes |
|--------|------|-------|
| `dedupe_key` | string(512) | Unique — matches the episode's dedupe_key |
| `state` | string(32) | `claimed`, `done`, `failed`, `expired` |
| `owner_node` | string(255) | Node holding the lease |
| `lease_until` | utc_datetime | When the lease expires |
| `attempt` | integer | Incremented on each steal/reclaim |
### Integration with recovery
When work claims are configured, `Recovery.sweep/1` uses `gate_acquire` to coordinate across nodes before claiming orphaned episodes. This provides two layers of coordination: the work claim lease prevents concurrent execution, and the optimistic `claim_for_recovery` update prevents duplicate recovery actions.
### Lease tuning
The lease duration (`work_claims_lease_seconds`, default: 120s) controls the trade-off between availability and the "zombie window" — the time between a node crash and another node stealing the work.
| Setting | Zombie window | Heartbeat interval | Good for |
|---------|---------------|-------------------|----------|
| 60s | ~60s | ~20s | Short tasks, fast failover |
| 120s (default) | ~120s | ~40s | Most workloads |
| 300s | ~5min | ~100s | Long-running tasks, flaky networks |
**Guidelines:**
- Heartbeat fires at `lease / 3` — set the lease to at least 3x your worst-case DB round-trip time
- If your episodes typically run for minutes, 120s is fine — the heartbeat keeps the lease alive indefinitely
- If network partitions last >30s regularly, increase the lease to avoid false steals
- After a steal, the new node restarts the episode fresh (strategies should be idempotent)
### Heartbeat failure modes
The heartbeat GenServer is linked to the EpisodeTask process:
- **Heartbeat crashes** — The EpisodeTask traps the EXIT and can restart the heartbeat. The lease has margin (only 1/3 expired per interval), so a brief restart is safe.
- **EpisodeTask crashes** — The heartbeat dies with it. The rescue block marks the claim as `:failed`. If it doesn't (hard kill), the lease expires naturally and another node can steal it.
- **DB becomes unreachable** — Heartbeat renewal fails, lease expires. When DB comes back, another node may steal. This is by design — if you can't reach the DB, you can't guarantee exclusive access.
- **Lost ownership** — If `gate_renew` returns `{:error, :not_owner}` (another node stole the claim), the heartbeat stops itself.
**Idempotency guidance:** Since lease expiry can cause a second node to start the same work, strategies that perform side effects should use idempotency keys. For DB writes, use unique constraints keyed by the episode's dedupe_key or step number. For external API calls, include an idempotency header derived from the episode ID + step number.
### Telemetry events
All events are prefixed with `[:cyclium, :work_claims, ...]`:
| Event | Measurements | Metadata | Meaning |
|-------|-------------|----------|---------|
| `:acquired` | `count`, `duration_ms` | `dedupe_key`, `owner_node` | Fresh claim acquired |
| `:steal` | `count`, `duration_ms` | `dedupe_key`, `owner_node` | Expired claim reclaimed (attempt > 1) |
| `:busy` | `count`, `duration_ms` | `dedupe_key`, `owner_node` | Claim denied — another node holds it |
| `:renewed` | `count` | `dedupe_key`, `owner_node` | Heartbeat renewal succeeded |
| `:renew_failed` | `count` | `dedupe_key`, `owner_node` | Heartbeat renewal failed (lost ownership) |
| `:completed` | `count` | `dedupe_key`, `owner_node` | Work finished, claim released |
| `:failed` | `count` | `dedupe_key`, `owner_node` | Work failed, claim released |
**Key metrics to alert on:**
- `steal` rate > 0 during normal operation → nodes are dying or leases are too short
- `busy` rate proportional to node count → expected (N-1 nodes get busy per dedupe key)
- `renew_failed` > 0 → possible clock drift or DB contention
### Testing work claims
**Unit tests:** Use `Cyclium.WorkClaims.FakeClaims` — an Agent-backed in-memory implementation:
```elixir
setup do
{:ok, _} = Cyclium.WorkClaims.FakeClaims.start_link()
Application.put_env(:cyclium, :work_claims, Cyclium.WorkClaims.FakeClaims)
on_exit(fn -> Application.delete_env(:cyclium, :work_claims) end)
end
test "second acquire is busy" do
assert {:ok, _} = Cyclium.WorkClaims.gate_acquire("key:1", "node-a")
Cyclium.WorkClaims.FakeClaims.set_busy("key:2")
assert {:error, :busy} = Cyclium.WorkClaims.gate_acquire("key:2", "node-b")
end
```
**Integration tests (single node):** Configure `EctoClaims` with your test repo. Verify:
1. Two concurrent `acquire` calls on the same key — one succeeds, one gets `:busy`
2. After `complete`, a new `acquire` on the same key succeeds (reclaim)
3. After lease expiry (set a short lease), `acquire` steals from the previous owner
**Multi-node tests:** Deploy to a staging cluster with 2+ nodes. Use a test actor with a short schedule:
1. Verify only one node's episode runs (check `owner_node` in `cyclium_work_claims`)
2. Kill a node mid-episode, wait for lease expiry, verify another node steals and completes
3. Monitor telemetry — `steal` events should only appear after the kill, never during normal operation
## Node Identity
By default, Cyclium uses `node()` to identify the current BEAM instance for work claims, trigger requests, and recovery coordination. In environments where multiple instances share the same Erlang node name (e.g., dev containers all starting as `app@app`), this breaks lease semantics — every node looks like the same owner.
`Cyclium.NodeIdentity` provides a pluggable identity layer:
```elixir
# Static override — set per instance via config or env var
config :cyclium, :node_identity, "dev-jane"
# MFA callback for dynamic resolution (hostname, env var, etc.)
config :cyclium, :node_identity, {MyApp.NodeIdentity, :resolve, []}
```
When unconfigured and running in non-distributed mode (`:nonode@nohost`), a random stable identity is generated per BEAM instance and stored in `:persistent_term` — unique for the process lifetime but not across restarts.
All work claim operations (`EpisodeTask`, `Heartbeat`, `Runner.Deferred`, `TriggerRequests.Poller`) use `Cyclium.NodeIdentity.name()` instead of raw `node()`.
## Multi-Stack Deployments
A **stack** is one logical cyclium cluster that shares a database with other clusters but runs its own Elixir nodes (its own `Phoenix.PubSub`, its own in-memory `persistent_term` / ETS registries). Typical reasons to run multiple stacks against one schema: independent release cadences, blast-radius isolation, or partitioning actors across regions.
### The library contract is cluster-level
Cyclium itself exposes no per-actor DSL option for stacks. It reads `:stack_slug` once per cluster, stamps every row its actors produce with that value, and scopes Recovery to the matching slug. Which actors actually run on a given cluster is the consumer's decision, implemented in the host app's supervisor.
Episodes, workflow instances, and deferred trigger requests are stamped with the current `source_stack` at insert time. `Cyclium.Recovery.sweep/1` and `Cyclium.Recovery.reconcile_workflows/1` read `:stack_slug` by default and only scan rows from their own stack — this prevents a crashed cluster's work from being re-driven on a cluster whose `persistent_term` / PubSub state doesn't know about it.
### Partitioning actors across stacks
The simplest approach is one actor list per deployment:
```elixir
# On the stack_a cluster's host app:
config :cyclium, :stack_slug, System.get_env("CYCLIUM_STACK_SLUG") # "stack_a"
config :my_app, :cyclium_actors, [StackAOnlyActor, SharedActor]
# On the stack_b cluster's host app:
config :cyclium, :stack_slug, System.get_env("CYCLIUM_STACK_SLUG") # "stack_b"
config :my_app, :cyclium_actors, [StackBOnlyActor, SharedActor]
```
A richer approach declares the allowed stacks on each actor's child-spec and has the supervisor filter the list at init time — useful when the same release is deployed to every cluster and you want a single source of truth for which actor runs where:
```elixir
config :my_app, :cyclium_actors, [
{SharedActor, []},
{StackAOnlyActor, stacks: [:stack_a]},
{CrossStackActor, stacks: [:stack_a, :stack_b]}
]
# In the supervisor's init/1:
stack = Application.get_env(:cyclium, :stack_slug)
children = Enum.filter(configured_actors, &actor_runs_on_stack?(&1, stack))
```
Either pattern is fine — cyclium doesn't care how the list was built, only what actually gets supervised. Because each cluster has its own node processes, PubSub, and `persistent_term` cache, a stack-local actor's strategy / budget / log-strategy lookups only exist on the cluster that supervises it — which is exactly why Recovery must be stack-scoped.
### Runtime configuration
For a single release that can be deployed into multiple stacks, drive the slug from an env var in `runtime.exs`:
```elixir
# runtime.exs
config :cyclium, :stack_slug, System.get_env("CYCLIUM_STACK_SLUG")
```
Leaving `:stack_slug` unset (or `nil`) is the single-stack default: rows are stamped `NULL` and Recovery scans without a stack filter. Pre-migration rows with `NULL` `source_stack` are swept by any stack for one release so legacy episodes aren't orphaned — once all rows have been stamped, you can tighten Recovery by stamping a real slug on every cluster.
### Related config
- `:stack_slug` — identity of *this* cluster (stamps rows and scopes Recovery)
- `:trigger_poll_source_stack` — separate filter on `TriggerRequests.Poller`: which stacks' deferred requests this full-mode node will pick up (often the same value as `:stack_slug`, but can be broader)
- `Cyclium.StackSlug.current/0` — read the slug; returns `nil` when unset
## Trigger-Only Mode (Deferred Execution)
In shared environments — dev machines on a common test DB, QC/sandbox instances, CI — running cyclium actors on every node leads to competing work claims and unpredictable execution. Conversely, disabling cyclium entirely on non-processing nodes leaves the UI hobbled: episodes never fire, statuses never update.
Trigger-only mode solves both problems by decoupling event processing from episode execution.
### Three operating modes
| Mode | Actors start? | Events flow? | Episodes execute locally? | Trigger requests written? |
|------|-------------|-------------|--------------------------|--------------------------|
| `:full` | yes | yes | yes | no (direct) |
| `:trigger_only` | yes | yes | no | yes (to DB) |
| `:disabled` | no | no | no | no |
### How it works
In **`:trigger_only`** mode, the actor supervision tree starts normally — Bus subscriptions, schedule timers, debounce, circuit breakers all work. But the runner is swapped to `Cyclium.Runner.Deferred`, which writes a row to `cyclium_trigger_requests` instead of spawning a Task. The episode record is still created in the DB so the UI can display it.
On **`:full`** mode nodes, a `Cyclium.TriggerRequests.Poller` watches the trigger requests table and dispatches deferred episodes to `Runner.OTP` for local execution. The poller uses `WorkClaims.gate_acquire/3` (with dedupe key `"trigger_request:<id>"`) to coordinate dispatch across nodes — only one full-mode node will pick up a given request. If work claims are not configured, the poller falls through to passthrough mode. The poller can be scoped by `source_stack` to only pick up requests from specific stacks.
### Configuration
```elixir
# Host app config — set per environment
config :my_app, :cyclium_mode, :full # :full | :trigger_only | :disabled
# On full-mode nodes: enable the poller
config :cyclium, :trigger_poller, true
config :cyclium, :trigger_poll_interval_ms, 5_000 # default
config :cyclium, :trigger_poll_source_stack, "stack_a" # nil = pick up all
# On trigger-only nodes: runner is set automatically
# config :cyclium, :runner, Cyclium.Runner.Deferred
# config :cyclium, :stack_slug, :stack_a
```
### Deployment scenarios
**Dev machines + shared test DB:**
- QC/test node runs `:full` with the poller enabled
- Dev machines run `:trigger_only` — events flow, UI works, episodes defer to the processing node
- A dev who wants to process locally overrides to `:full` and narrows their actor list
**Sandbox / feature-branch testing:**
- Sandbox runs `:trigger_only` — UI flows complete, episode records exist, Bus events fire
- Designated processing node runs `:full` and picks up deferred triggers
**Production (unchanged):**
- All nodes run `:full` as before; work claims handle multi-node coordination
- The trigger requests table stays empty
### Database table
The `cyclium_trigger_requests` table (V14 migration):
| Column | Type | Notes |
|--------|------|-------|
| `episode_id` | binary_id | FK to `cyclium_episodes` |
| `actor_id` | string | Actor that created the trigger |
| `expectation_id` | string | Expectation that fired |
| `source_node` | string | Node identity of the trigger-only instance |
| `source_stack` | string | Stack slug for scoped polling |
| `status` | string | `pending`, `claimed`, `completed`, `expired` |
| `opts` | map | Runner options (e.g., resume flag) |
| `claimed_by` | string | Node identity of the full-mode instance |
Indexed on `(status, inserted_at)` for efficient polling.
### Runtime mode switching
`Cyclium.Mode` supports live mode changes without restart — both node-wide and per-actor:
```elixir
# Switch the whole node (via remote console, admin endpoint, etc.)
Cyclium.Mode.set(:trigger_only) # stop local execution, defer to DB
Cyclium.Mode.set(:full) # resume local execution + polling
# Per-actor override — yield one actor to another node while keeping the rest
Cyclium.Mode.set_actor_override(:client_health, :trigger_only)
Cyclium.Mode.clear_actor_override(:client_health)
Cyclium.Mode.clear_all_overrides()
# Inspect current state
Cyclium.Mode.status()
# %{node_mode: :full, overrides: %{client_health: :trigger_only}, node_identity: "..."}
```
Mode reads are ETS-backed (`read_concurrency: true`) for zero overhead in hot paths. The trigger request poller self-gates on each cycle — it only polls when the node-wide mode is `:full`.
## Dynamic Actors
Dynamic actors allow agent definitions to be stored in the database and hydrated into running supervised processes at runtime — without requiring compiled Elixir modules.
### When to use dynamic actors
- Users create custom monitoring agents through a UI
- Agent definitions are stored per-tenant
- Agent configurations change frequently without requiring code deploys
### How it works
A single `Cyclium.DynamicActor` GenServer module serves all DB-defined actors. Each instance is started with different config/expectations args under `Cyclium.ActorSupervisor`. This avoids runtime module pollution — no `Module.create/3` needed.
```
Cyclium.ActorSupervisor (DynamicSupervisor)
├── MyApp.Agents.CompiledActor (compiled, use Cyclium.Actor)
├── Cyclium.DynamicActor (from DB: "user_monitor_1")
└── Cyclium.DynamicActor (from DB: "user_monitor_2")
```
### Defining an agent in the database
Insert a row into `cyclium_agent_definitions`:
```elixir
%Cyclium.Schemas.AgentDefinition{
actor_id: "custom_health_check",
domain: "monitoring",
strategy_template: "observe_classify_converge", # built-in template
config: Jason.encode!(%{max_concurrent_episodes: 3, episode_overflow: "queue"}),
expectations: Jason.encode!([
%{
id: "check_target",
trigger: %{type: "schedule", interval_ms: 300_000},
budget: %{max_turns: 5, max_tokens: 10_000, max_wall_ms: 60_000},
log_strategy: "timeline"
}
]),
enabled: true
}
```
### Loading dynamic actors
```elixir
# At application startup — loads all enabled definitions
Cyclium.DynamicActor.Loader.load_all()
# Load a single actor
Cyclium.DynamicActor.Loader.load("custom_health_check")
# Reload after updating the definition in DB
Cyclium.DynamicActor.Loader.reload("custom_health_check")
# Stop a dynamic actor
Cyclium.DynamicActor.Loader.stop("custom_health_check")
```
### Database table
`cyclium_agent_definitions` (V7 migration):
| Column | Type | Notes |
|--------|------|-------|
| `id` | uuid | PK |
| `actor_id` | string(255) | Unique identifier |
| `domain` | string(255) | Grouping domain |
| `config` | text (JSON) | `max_concurrent_episodes`, `episode_overflow`, etc. |
| `expectations` | text (JSON) | Array of expectation definitions |
| `strategy_ref` | string(255) | Strategy module or registry lookup key |
| `strategy_template` | string(255) | Template name (e.g. `"observe_synthesize_converge"`) |
| `strategy_config` | text (JSON) | Template parameters (gatherer, system_prompt, finding_config, etc.) |
| `enabled` | boolean | Soft toggle |
| `created_by` | string(255) | User/tenant |
### Strategy templates (data-driven strategies)
Dynamic actors use **strategy templates** — built-in parameterized strategy modules that are configured via `strategy_config` JSON in the DB. The compiled app defines what data sources (gatherers) and outputs are available; the DB definition composes them.
| Template | Pattern | Use Case |
|----------|---------|----------|
| `"observe_synthesize_converge"` | Gather → LLM → Finding | Health checks, advisors, analysis |
| `"observe_classify_converge"` | Gather → Rules → Finding | Threshold/rule-based monitoring |
| `"dispatch"` | Load entities → Broadcast events | Fan-out triggers |
#### Gatherers
Gatherers are compiled modules that know how to collect domain-specific data. The compiled app implements and registers them:
```elixir
defmodule MyApp.Gatherers.ProjectData do
@behaviour Cyclium.Gatherer
@impl true
def gather(trigger_payload, _opts) do
project_id = trigger_payload["project_id"]
project = Repo.get!(Project, project_id)
orders = load_orders(project_id)
{:ok, %{project: project, orders: orders, order_count: length(orders)}}
end
end
```
Register in app config:
```elixir
config :cyclium, :gatherer_registry, %{
"project_data" => MyApp.Gatherers.ProjectData,
"client_metrics" => MyApp.Gatherers.ClientMetrics
}
```
#### Observe → Synthesize → Converge
The main workhorse. Gathers data, sends to LLM, maps result to findings:
```elixir
%AgentDefinition{
actor_id: "project_health_dynamic",
strategy_template: "observe_synthesize_converge",
strategy_config: Jason.encode!(%{
"gatherer" => "project_data",
"system_prompt" => "You are a project health analyst. Evaluate the project data and classify its health status.",
"finding_config" => %{
"actor_id_field" => "project_health_dynamic",
"finding_key_template" => "project:health:${subject_id}",
"class_field" => "class",
"severity_field" => "severity",
"summary_field" => "summary",
"subject_kind" => "project",
"subject_id_key" => "project_id"
},
"outputs" => ["email"]
}),
expectations: Jason.encode!([
%{id: "evaluate", trigger: %{type: "event", event_type: "project.check_requested"}}
])
}
```
#### Observe → Classify → Converge
Rule-based classification without LLM. Rules are evaluated in order, first match wins:
```elixir
%AgentDefinition{
actor_id: "client_risk_monitor",
strategy_template: "observe_classify_converge",
strategy_config: Jason.encode!(%{
"gatherer" => "client_metrics",
"classify_rules" => [
%{"field" => "mrr", "op" => "lt", "value" => 500, "class" => "at_risk", "severity" => "high"},
%{"field" => "last_login_days_ago", "op" => "gt", "value" => 30, "class" => "inactive", "severity" => "medium"}
],
"default_class" => "healthy",
"default_severity" => "low",
"finding_config" => %{
"finding_key_template" => "client:risk:${subject_id}",
"subject_kind" => "client",
"subject_id_key" => "client_id"
}
})
}
```
Rule operators: `lt`, `gt`, `eq`, `neq`, `in`, `not_in`.
#### Dispatch
Fan-out pattern. Calls a gatherer that returns a list of entities, broadcasts an event for each:
```elixir
%AgentDefinition{
actor_id: "project_dispatch",
strategy_template: "dispatch",
strategy_config: Jason.encode!(%{
"gatherer" => "active_projects",
"event_type" => "project.check_requested",
"entity_id_field" => "id",
"entity_payload_fields" => ["id", "name"]
})
}
```
#### Strategy resolution for dynamic actors
Dynamic actors use the same `:persistent_term` registration path as compiled actors. When a dynamic actor boots, `Loader` resolves the strategy from `strategy_template` and injects it into each expectation — `init_state_from_config` then registers it automatically. No strategy registry entries needed.
If you need to override a strategy without updating the DB record, add a `strategy_for/2` clause to your registry as usual.
Custom templates can be registered in app config:
```elixir
config :cyclium, :strategy_templates, %{
"my_custom_template" => MyApp.Strategies.CustomTemplate
}
```
### Lifecycle and draining
Safe lifecycle operations for updating dynamic actors without losing in-flight episodes.
#### Drain and reload
```elixir
# Graceful: waits for active episodes to finish, then reloads from DB
Cyclium.DynamicActor.Lifecycle.drain_and_reload("my_monitor")
# Graceful stop (waits for episodes)
Cyclium.DynamicActor.Lifecycle.drain_and_stop("my_monitor")
# Instant stop/reload (existing behavior, may lose in-flight episodes)
Cyclium.DynamicActor.Loader.stop("my_monitor")
Cyclium.DynamicActor.Loader.reload("my_monitor")
```
#### Event-driven refresh
Start the optional `Watcher` in your supervision tree for automatic refresh:
```elixir
children = [
# ... your app ...
Cyclium.DynamicActor.Watcher
]
```
Then broadcast events when definitions change:
```elixir
# Agent definitions:
Cyclium.Bus.broadcast("agent_definition.created", %{actor_id: "my_monitor"})
Cyclium.Bus.broadcast("agent_definition.updated", %{actor_id: "my_monitor"})
Cyclium.Bus.broadcast("agent_definition.disabled", %{actor_id: "my_monitor"})
# Workflow definitions:
Cyclium.Bus.broadcast("workflow_definition.created", %{workflow_id: "onboarding"})
Cyclium.Bus.broadcast("workflow_definition.updated", %{workflow_id: "onboarding"})
Cyclium.Bus.broadcast("workflow_definition.disabled", %{workflow_id: "onboarding"})
```
The Watcher handles each event appropriately — `created` loads, `updated` reloads, `disabled` stops/unloads. For actors, updates use drain-and-reload to preserve in-flight episodes.
#### Deploy patterns
**Rolling deploy:**
```elixir
# In application stop callback or shutdown hook:
Cyclium.DynamicActor.Lifecycle.stop_all(drain: true, timeout: 30_000)
```
**Blue-green:** The new instance calls `Loader.load_all()` on startup. Global name registration ensures only one instance runs per actor across the cluster.
## Dry Runs / Simulations
Dry runs let you test what an actor would do without producing real findings, outputs, or side effects. Useful for validating agent configurations and "what if" testing.
### How dry runs work
An episode with `mode: "dry_run"`:
- Runs the full strategy loop (same `next_step` → `handle_result` cycle)
- **Findings are NOT persisted** — but are journaled for inspection (optionally persistable with prefixed keys via `persist_findings` option)
- **Outputs are NOT delivered** — but output proposals are journaled
- **Tool calls and synthesis can be overridden** with mock responses
- **Steps are fully journaled** — complete audit trail of what *would have* happened
- **Episode is tagged** as `mode: "dry_run"` for filtering in UI and metrics
### Force-firing a dry run
```elixir
# Simplest: real tool calls and synthesis, skip persist
Cyclium.Episodes.force_fire("project_health_actor", "evaluate_project",
mode: :dry_run,
trigger_payload: %{project_id: 123}
)
# With mock overrides — skip real API calls
Cyclium.Episodes.force_fire("project_health_actor", "evaluate_project",
mode: :dry_run,
trigger_payload: %{project_id: 123},
overrides: %{
tool_overrides: %{"erp.get_orders" => [%{id: 1, status: "complete"}]},
synthesis_override: %{"class" => "healthy", "severity" => "low"}
}
)
```
### Override resolution (layered)
Three sources of overrides, checked in priority order:
```
fire-time overrides > expectation-level DSL > real execution
```
1. **Fire-time overrides** — passed to `force_fire/3` as `:overrides` option
2. **Expectation-level DSL** — defined in the actor definition:
```elixir
expectation(:evaluate_project,
strategy: MyApp.Strategies.ProjectHealth,
trigger: {:event, "project_health.check_requested"},
dry_run: [
tool_overrides: %{
{"erp", "get_orders"} => {:ok, %{orders: []}}
},
synthesis_override: {:ok, %{"class" => "healthy"}}
]
)
```
3. **No overrides** — real tool calls and synthesis execute normally, only findings and outputs are skipped
### Persisting findings in dry runs
By default, dry run findings are journaled but not persisted to the DB. You can opt in to persistence with prefixed keys so dry run findings don't collide with live ones:
```elixir
# Persist with default "dry_run" prefix (finding_key becomes "dry_run:po_stalled:PO-123")
Cyclium.Episodes.force_fire("po_monitor", "check_pos",
mode: :dry_run,
overrides: %{persist_findings: true}
)
# Persist with custom prefix (finding_key becomes "experiment1:po_stalled:PO-123")
Cyclium.Episodes.force_fire("po_monitor", "check_pos",
mode: :dry_run,
overrides: %{persist_findings: "experiment1"}
)
```
The prefix is also applied to `actor_id` on persisted findings, so `Findings.active_for(actor: "po_monitor")` won't return dry run findings — use `Findings.active_for(actor: "dry_run:po_monitor")` instead, or use the mode-aware helper:
```elixir
# Automatically prefixes filters when episode is a dry run with persist_findings enabled:
Cyclium.Findings.active_for_mode([actor: "po_monitor"], episode)
```
This can also be set at the expectation level in the actor DSL:
```elixir
expectation(:check_pos,
strategy: MyApp.Strategies.PoCheck,
trigger: {:schedule, 300_000},
dry_run: [persist_findings: true]
)
```
### Via the Actor GenServer
You can also fire dry runs through the actor's message interface:
```elixir
GenServer.cast(MyApp.Agents.ProjectHealth, {:force_fire, :evaluate_project, mode: :dry_run})
```
### Dry run results
The episode completes with full step journal. In the UI:
- "DRY RUN" badge on the episode
- Step timeline shows which steps used mock overrides (`_dry_run: true` in result_ref)
- Findings show what *would have* been created
- Full step-by-step debugging available
### Workflow dry runs
Workflows support dry run mode — every step episode inherits the mode and opts from the workflow instance:
```elixir
# Compiled workflow
WorkflowEngine.start_workflow(MyWorkflow, trigger_data, mode: :dry_run)
# Dynamic workflow with finding persistence
WorkflowEngine.start_dynamic_workflow("order_flow", trigger_data,
mode: :dry_run,
dry_run_opts: %{persist_findings: true}
)
```
All step episodes will run in dry run mode: findings are journaled (and optionally persisted with prefix), outputs are skipped. The workflow instance itself stores `mode` and `dry_run_opts` (V9 migration), so retries and subsequent steps also inherit the mode.
Per-step overrides allow targeting different mocks and options to individual steps:
```elixir
WorkflowEngine.start_dynamic_workflow("vendor_onboarding", trigger_data,
mode: :dry_run,
dry_run_opts: %{
persist_findings: true,
steps: %{
"compliance_check" => %{
"synthesis_override" => %{"class" => "high_risk", "severity" => "high"}
},
"connector_setup" => %{
"tool_overrides" => %{"erp.create_vendor" => %{"id" => "mock-v-001"}},
"persist_findings" => "experiment1"
}
}
}
)
```
Global keys (like `persist_findings: true`) apply to all steps. Step-specific keys override globals for that step. The `"steps"` key itself is stripped from each episode's opts.
Strategies in workflow steps can use `Findings.active_for_mode/3` to transparently query their own dry run findings when `persist_findings` is enabled.
## Tools
External capabilities are registered as tools implementing `Cyclium.Tool`. Use `use Cyclium.Tool` for sensible defaults — the only required callback is `call/3`:
```elixir
defmodule MyApp.Tools.ERP do
use Cyclium.Tool
@impl true
def call(:read_po, args, _ctx) do
case MyApp.ERP.get_po(args["po_id"]) do
{:ok, po} -> {:ok, po}
{:error, reason} -> {:error, reason}
end
end
end
```
Override optional callbacks as needed:
```elixir
defmodule MyApp.Tools.VendorAPI do
use Cyclium.Tool
@impl true
def call(:send_notification, args, _ctx), do: # ...
# Strip credentials before journaling
@impl true
def redact(args), do: Map.drop(args, ["api_key"])
# Strip large payloads from results before journaling
@impl true
def redact_result(result) when is_list(result) do
%{count: length(result), ids: Enum.map(result, & &1.id)}
end
def redact_result(result), do: result
# Mark as having side effects (affects caching/retry behavior)
@impl true
def side_effect?, do: true
# Cache results for 5 minutes
@impl true
def cache_ttl, do: :timer.minutes(5)
# Cache key scope — same PO ID returns cached result
@impl true
def cache_scope(args), do: args["po_id"]
end
```
| Callback | Default | Description |
|---|---|---|
| `call(action, args, ctx)` | *required* | Execute the tool action |
| `redact(args)` | passthrough | Strip sensitive/bulky data from args before journaling |
| `redact_result(result)` | passthrough | Strip bulky data from results before journaling |
| `side_effect?()` | `false` | Whether the action mutates external state |
| `cache_ttl()` | `:no_cache` | How long to cache results (ms) |
| `cache_scope(args)` | `""` | Cache key discriminator |
Register tools in config:
```elixir
config :cyclium, :capability_registry, %{
erp_read: MyApp.Tools.ERP,
vendor_api: MyApp.Tools.VendorAPI
}
```
Strategies invoke tools via `{:tool_call, :erp_read, :read_po, %{"po_id" => "PO-123"}}`. The `ToolExec` wrapper handles capability resolution, caching, redaction, and error classification.
## Reconciler
The optional `Cyclium.Reconciler` watches for `spec.updated` Bus events and reconciles running actors when their configuration changes at runtime:
- Sends updated config to actor GenServers
- Cancels timers for removed expectations
- Starts timers for newly added schedule expectations
- Identifies orphaned blocked episodes (expectation removed) and cancels them
Enable via config:
```elixir
config :cyclium, :reconciler, true
```
Or trigger manually:
```elixir
Cyclium.Reconciler.reconcile_actor(actor_pid, new_module)
```
## LiveView integration
Cyclium integrates with Phoenix LiveView via the Bus. Subscribe in your LiveView's mount and handle events:
```elixir
defmodule MyAppWeb.DashboardLive do
use MyAppWeb, :live_view
def mount(_params, _session, socket) do
if connected?(socket), do: Cyclium.Bus.subscribe()
{:ok, assign(socket, findings: load_findings())}
end
def handle_info({:bus, event, _payload}, socket)
when event in ["finding.raised", "finding.updated", "finding.cleared"] do
{:noreply, assign(socket, findings: load_findings())}
end
def handle_info({:bus, _event, _payload}, socket) do
{:noreply, socket}
end
end
```
## Database tables
All tables use `binary_id` primary keys and are SQL Server 2017 compatible (no JSON operators in DDL, application-layer upserts, denormalized columns for indexed queries).
| Table | Migration | Purpose |
|---|---|---|
| `cyclium_episodes` | V1 | Episode lifecycle, budget tracking, classification |
| `cyclium_episode_steps` | V1 | Step-by-step journal (16 step kinds) |
| `cyclium_episode_checkpoints` | V1 | Versioned strategy state snapshots |
| `cyclium_findings` | V1 | Persistent observations with raise/update/clear lifecycle |
| `cyclium_outputs` | V1 | Output proposals, delivery status, deduplication |
| `cyclium_episode_logs` | V2 | Materialized human-readable logs |
| `cyclium_workflow_instances` | V3, V9 | Workflow execution tracking, step states, dry run mode |
| `cyclium_work_claims` | V6 | Lease-based distributed work coordination |
| `cyclium_agent_definitions` | V7 | DB-stored actor definitions for dynamic actors |
| `cyclium_workflow_definitions` | V8 | DB-stored workflow definitions for dynamic workflows |
| `cyclium_trigger_requests` | V14 | Deferred episode execution for trigger-only nodes |
V4 adds `archived_at` to episodes and findings. V5 replaces the non-unique `dedupe_key` index on episodes with a filtered unique index (`WHERE dedupe_key IS NOT NULL AND archived_at IS NULL`) for multi-node coordination. V6 adds the `cyclium_work_claims` table for lease-based distributed coordination across clustered nodes. V7 adds `cyclium_agent_definitions` for dynamic actors and `mode`/`dry_run_opts` columns to episodes for simulation support. V8 adds `cyclium_workflow_definitions` for dynamic workflows. V10 adds `caused_by_key` (finding causality chains) and `expires_at` (TTL-based expiration) to `cyclium_findings`.
## Window helpers
`Cyclium.Window` provides clock-aligned deduplication buckets for output `dedupe_key` construction:
```elixir
Cyclium.Window.bucket(:h4, DateTime.utc_now()) # "2026-02-24T08" (4-hour windows)
Cyclium.Window.bucket(:h24, DateTime.utc_now()) # "2026-02-24" (daily)
Cyclium.Window.bucket(:h48, DateTime.utc_now()) # "2026-02-24" (every-other-day)
Cyclium.Window.bucket(:w1, DateTime.utc_now()) # "2026-W09" (ISO week)
```
Use these in `dedupe_key` to prevent duplicate outputs within a time window:
```elixir
dedupe_key: "alert:client:#{id}:#{Cyclium.Window.bucket(:h4, DateTime.utc_now())}"
```
## Batch helpers
`Cyclium.Batch` provides a lightweight struct for strategies that process data in grouped batches across multiple `:synthesize` calls. No new step types — strategies continue using `:tool_call` and `:synthesize` as normal.
```elixir
# Group items semantically (e.g., by base item so variants are compared together)
groups = Cyclium.Batch.group_by(items, & &1.base_item_id)
batch = Cyclium.Batch.init(groups)
# Or chunk by fixed size
batch = items |> Cyclium.Batch.chunk(10) |> Cyclium.Batch.init()
```
In `next_step`, drive the loop:
```elixir
case Cyclium.Batch.current_group(state.batch) do
nil -> :converge # all groups processed
{group_key, items} -> {:synthesize, build_prompt(group_key, items)}
end
```
In `handle_result`, advance:
```elixir
batch = Cyclium.Batch.advance(state.batch, parsed_result)
{:ok, %{state | batch: batch}}
```
Progress tracking via `Batch.group_count/1`, `Batch.processed_count/1`, and `Batch.done?/1`.
## Per-item episodes vs. batch processing
Batch helpers are useful when a single episode needs to process many items in groups — but they're not the only pattern. An alternative is to **fire one episode per item**, driven by domain events. This is the pattern used by the project health actor.
**The difference:**
| Approach | When to use | Episode count |
|---|---|---|
| Batch (single episode) | Scheduled sweep over a large dataset; items need cross-comparison | One episode, many `:synthesize` turns |
| Per-item (many episodes) | Event-driven re-evaluation; each item is independent | One episode per item |
**Per-item pattern — ProjectHealthActor:**
The actor listens for `"project.updated"` events. Each event carries a `project_id`, and each project gets its own independent episode. There's no need to load the full dataset — the trigger tells you which item changed.
```elixir
defmodule MyApp.Actors.ProjectHealthActor do
use Cyclium.Actor
actor do
domain(:project_health)
spec_rev("v0.1.0")
max_concurrent_episodes(5)
episode_overflow(:queue)
expectation(:project_should_be_healthy,
strategy: MyApp.Strategies.ProjectHealth,
trigger: {:event, "project.updated"},
subject_key: :project_id,
debounce_ms: :timer.seconds(2),
budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
)
end
end
```
The strategy is single-turn — load the item, classify it deterministically, emit findings:
```elixir
defmodule MyApp.Strategies.ProjectHealth do
@behaviour Cyclium.EpisodeRunner.Strategy
@impl true
def init(_episode, trigger) do
{:ok, %{project_id: trigger.payload["project_id"]}}
end
@impl true
def next_step(_state, _episode_ctx), do: :converge
@impl true
def handle_result(state, _step, _result), do: {:ok, state}
@impl true
def converge(state, episode_ctx) do
project = MyApp.Projects.get!(state.project_id)
percent_spent = project.spent / max(project.budget, 1)
days_remaining = Date.diff(project.due_date, Date.utc_today())
{class, severity, summary} = classify(project.status, percent_spent, days_remaining)
{:ok, %Cyclium.ConvergeResult{
classification: %{"primary" => class, "severity" => to_string(severity)},
confidence: 1.0,
summary: summary,
findings: [
{:raise, %{
actor_id: "project_health_actor",
finding_key: "project:health:#{project.id}:#{episode_ctx.episode_id}",
class: class,
severity: severity,
confidence: 1.0,
subject_kind: "project",
subject_id: project.id,
summary: summary,
evidence_refs: %{
"percent_spent" => percent_spent,
"days_remaining" => days_remaining
}
}}
],
outputs: []
}}
end
defp classify(:completed, _pct, _days), do: {"complete", :low, "Project completed"}
defp classify(_s, pct, _d) when pct > 1.0, do: {"over_budget", :high, "Over budget"}
defp classify(_s, pct, _d) when pct > 0.85, do: {"budget_risk", :medium, "Budget at risk"}
defp classify(_s, _pct, d) when d < 0, do: {"overdue", :high, "Past due date"}
defp classify(_s, _pct, d) when d < 3, do: {"schedule_risk", :medium, "Due date approaching"}
defp classify(_s, _pct, _d), do: {"healthy", :low, "On track"}
end
```
**Why this works well:**
- **Reactive** — evaluation happens the moment data changes, not on a polling schedule
- **Isolated** — each project gets its own episode with its own budget, journal, and findings
- **Backpressure built in** — `max_concurrent_episodes` + `debounce_ms` + `subject_key` prevent a burst of updates from overwhelming the system. Rapid updates to the same project coalesce into one episode
- **Findings create an audit trail** — the episode-scoped `finding_key` (`"project:health:#{id}:#{episode_id}"`) means each evaluation produces its own finding, giving you a point-in-time history of how health evolved
**When to reach for Batch instead:** If you need to process 500 items in one pass (e.g., a nightly SKU classification sweep), a single episode with `Cyclium.Batch` is more efficient than 500 separate episodes — fewer rows, one journal, and the ability to compare items within groups.
## Synthesizers
A **synthesizer** is the bridge between strategies and LLM infrastructure. It implements `Cyclium.Synthesizer` and is called when a strategy returns `{:synthesize, prompt_ctx}`. The synthesizer handles the actual LLM call, and its response flows back through `handle_result/3`.
```elixir
defmodule MyApp.Synthesizers.ProjectAnalysis do
@behaviour Cyclium.Synthesizer
@impl true
def synthesize(prompt_ctx, _episode_ctx) do
case MyApp.LLM.chat(prompt_ctx.system, prompt_ctx.user) do
{:ok, text} -> {:ok, %{text: text}}
{:error, reason} -> {:error, :llm_error, reason}
end
end
@impl true
def estimate_tokens(prompt_ctx) do
# Rough estimate for budget tracking
String.length(prompt_ctx.user || "") |> div(4)
end
end
```
### Attaching a synthesizer to an actor
Synthesizers can be declared at two levels:
**Actor-level** — inherited by all expectations in the actor that use `:synthesize`. Declare it when most or all expectations share the same synthesizer:
```elixir
defmodule MyApp.Actors.ProjectAdvisorActor do
use Cyclium.Actor
actor do
domain(:project_advisory)
spec_rev("v0.1.0")
synthesizer(MyApp.Synthesizers.ProjectAnalysis)
max_concurrent_episodes(3)
episode_overflow(:queue)
expectation(:project_ai_summary,
strategy: MyApp.Strategies.ProjectAdvisor,
trigger: {:event, "project.summary_requested"},
budget: %{max_turns: 5, max_tokens: 10_000, max_wall_ms: 30_000}
)
expectation(:project_risk_assessment,
strategy: MyApp.Strategies.ProjectRisk,
trigger: {:event, "project.risk_review_requested"},
budget: %{max_turns: 8, max_tokens: 15_000, max_wall_ms: 60_000}
)
end
end
```
Both expectations above use `MyApp.Synthesizers.ProjectAnalysis`.
**Expectation-level** — overrides the actor-level synthesizer for a specific expectation. Use this when one expectation needs a different model or configuration:
```elixir
actor do
domain(:project_advisory)
synthesizer(MyApp.Synthesizers.ProjectAnalysis) # default for this actor
expectation(:project_ai_summary,
strategy: MyApp.Strategies.ProjectAdvisor,
trigger: {:event, "project.summary_requested"},
synthesizer: MyApp.Synthesizers.FastSummary # override for this expectation
)
expectation(:project_risk_assessment,
strategy: MyApp.Strategies.ProjectRisk,
trigger: {:event, "project.risk_review_requested"}
# uses ProjectAnalysis (inherited from actor)
)
end
```
The strategy itself doesn't know or care which synthesizer is wired in — it just returns `{:synthesize, prompt_ctx}` and receives the result in `handle_result`:
```elixir
def next_step(%{project_data: data, ai_summary: nil}, _ctx) do
{:synthesize, %{
system: "You are a project risk analyst.",
user: "Project: #{data.name}, #{data.percent_spent * 100}% spent, #{data.days_remaining} days left"
}}
end
def handle_result(state, %{kind: :synthesis}, {:ok, %{text: text}}) do
{:ok, %{state | ai_summary: text}}
end
```
**When you don't need a synthesizer:** If your strategy is purely deterministic (like the ProjectHealthStrategy above), you don't need to declare a synthesizer at all. The synthesizer is only invoked when a strategy returns `{:synthesize, prompt_ctx}` — if your strategy never does, the synthesizer configuration is ignored.
### LLM-provided confidence
Findings have a `confidence` field (0.0–1.0). For deterministic strategies, hardcoding `1.0` is fine. But when an LLM produces the assessment, you can ask it to self-report confidence and pass that through to the finding.
**Step 1 — Add `confidence` to the tool schema in your synthesizer:**
```elixir
@tool_definition %{
type: "function",
function: %{
name: "project_health_assessment",
parameters: %{
type: "object",
properties: %{
class: %{type: "string", enum: ["healthy", "at_risk", "critical"]},
severity: %{type: "string", enum: ["low", "medium", "high", "critical"]},
summary: %{type: "string", description: "One sentence health summary"},
confidence: %{
type: "number",
minimum: 0.0,
maximum: 1.0,
description:
"How confident you are in this assessment (0.0–1.0). " <>
"Use lower values when data is sparse or ambiguous, " <>
"higher values when the evidence clearly supports the classification."
}
# ... other fields
},
required: ["class", "severity", "summary", "confidence"]
}
}
}
```
The description matters — it tells the LLM what the scale means, which produces more calibrated values than a bare `"confidence"` field.
**Step 2 — Read it in converge and pass it to the finding:**
```elixir
def converge(state, episode_ctx) do
result = state.assessment
confidence = parse_confidence(result["confidence"])
{:ok, %ConvergeResult{
classification: %{"primary" => result["class"]},
confidence: confidence,
summary: result["summary"],
findings: [
{:raise, %{
finding_key: "project:health:#{state.project_id}:#{episode_ctx.episode_id}",
confidence: confidence,
# ... other finding fields
}}
]
}}
end
defp parse_confidence(val) when is_number(val), do: max(0.0, min(val, 1.0))
defp parse_confidence(_), do: 0.5
```
The `parse_confidence/1` helper clamps to `[0.0, 1.0]` and falls back to `0.5` if the LLM returns something unexpected. The same confidence flows into both the `ConvergeResult` (episode-level) and the finding (queryable).
**When to hardcode instead:** If the classification is deterministic (rule-based, no LLM), use `confidence: 1.0`. The LLM confidence pattern is for cases where the assessment involves judgment — ambiguous data, sparse evidence, or nuanced classification where the LLM's certainty is genuinely informative.
## Test kit
Cyclium ships a test kit in `Cyclium.Test.*` that host apps can use to smoke-test their definitions without running full episodes. Import the helpers with `use`:
### Actor validation
```elixir
defmodule MyApp.Actors.ClientHealthActorTest do
use ExUnit.Case, async: true
use Cyclium.Test.ActorCase
test "actor definition is valid" do
assert_valid_actor(MyApp.Actors.ClientHealthActor)
end
test "all expectations have strategies" do
assert_strategies_defined(MyApp.Actors.ClientHealthActor)
end
test "budgets are well-formed" do
assert_budgets_valid(MyApp.Actors.ClientHealthActor)
end
test "spec_rev is set" do
assert_spec_rev_set(MyApp.Actors.ClientHealthActor)
end
end
```
### Strategy contract verification
```elixir
defmodule MyApp.Strategies.ClientHealthTest do
use ExUnit.Case, async: true
use Cyclium.Test.StrategyCase
@episode build_test_episode(actor_id: "client_health", expectation_id: "health_check")
@trigger %Cyclium.Trigger.Manual{requested_by: "test"}
test "init returns valid state" do
assert_valid_init(MyApp.Strategies.ClientHealth, @episode, @trigger)
end
test "strategy terminates within budget" do
assert_strategy_terminates(MyApp.Strategies.ClientHealth, @episode, @trigger,
max_steps: 20
)
end
end
```
### Synthesizer testing
```elixir
use Cyclium.Test.SynthesizerCase
# Contract validation
assert_valid_synthesize(MySynthesizer, prompt_ctx, episode_ctx)
assert_valid_estimate_tokens(MySynthesizer, prompt_ctx)
# FakeSynthesizer for strategy tests
{:ok, _} = Cyclium.Test.FakeSynthesizer.start_link()
Cyclium.Test.FakeSynthesizer.set_response(%{"answer" => "42"})
# ... run strategy, then inspect calls:
Cyclium.Test.FakeSynthesizer.calls()
```
### Output adapter testing
```elixir
use Cyclium.Test.OutputCase
# Contract validation
assert_valid_deliver(MyApp.Adapters.Slack, :slack, payload, ctx)
# FakeOutputAdapter for integration tests
{:ok, _} = Cyclium.Test.FakeOutputAdapter.start_link()
# ... run episode, then inspect deliveries:
Cyclium.Test.FakeOutputAdapter.deliveries()
```
### Workflow validation
```elixir
defmodule MyApp.Workflows.VendorOnboardingTest do
use ExUnit.Case, async: true
use Cyclium.Test.WorkflowCase
test "workflow is valid" do
assert_valid_workflow(MyApp.Workflows.VendorOnboarding)
end
test "all steps have failure policies" do
assert_failure_policies_complete(MyApp.Workflows.VendorOnboarding)
end
test "step inputs don't crash" do
assert_step_inputs_safe(MyApp.Workflows.VendorOnboarding,
trigger: %{"vendor_id" => "v123"}
)
end
end
```
### Checkpoint migration fuzzing
Property-based testing for `migrate/2` chains using StreamData:
```elixir
use Cyclium.Test.CheckpointMigration
# Fuzz test: generate random states, migrate through version chain, assert no crashes
assert_migration_safe(MyCheckpoint, iterations: 200)
# Specific version migration
assert_migration(MyCheckpoint, %{"old_field" => 1}, 1, 3)
# Idempotency: migrating an already-current state is a no-op
assert_migration_idempotent(MyCheckpoint, iterations: 100)
```
## Demo application
See [cyclium_ex_hapi](../cyclium_ex_hapi) for a complete Phoenix LiveView application demonstrating Cyclium:
- Client health monitoring with real-time evaluation
- LLM-powered AI advisor actor (Anthropic Claude integration)
- Simulation controls for testing different scenarios
- Episode detail view with step timeline and rendered logs
- Reactive UI via Bus event subscriptions
## Development
```bash
# Install dependencies
mix deps.get
# Run tests
mix test
# Dialyzer (static analysis)
mix dialyzer
# Compile
mix compile --warnings-as-errors
```