# Cyclium

> **Autonomous agent framework for Elixir** — actors monitor domains, run
> multi-turn episodes with budget enforcement, and produce persistent findings
> and typed outputs.

Cyclium is an Elixir library for building agentic systems that monitor domains,
run multi-turn episodes, classify situations, and produce typed outputs. Actors
declare expectations about how things should be; when triggers fire, episodes
execute strategies that can gather data, call tools, synthesize with LLMs, and
converge into findings and outputs. Think of it as an OTP-native agent framework
where the episode — not the request — is the unit of work.

The **developer is the router**: your `next_step/2` is a deterministic state
machine that decides what happens next. The LLM is a tool you call at specific
points via `:synthesize`, never the control plane — so you get repeatability,
testability, and full visibility while still using AI where it adds value. See
[Actors & Strategies](guides/actors_and_strategies.md) for the full philosophy.
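
The "developer is the router" idea can be illustrated with plain pattern matching. This is a minimal sketch, not Cyclium's actual behaviour definition: the module name `RouterSketch`, the state keys, and the exact return tuples are assumptions made for illustration. Only the `next_step/2` name and the `:synthesize` / `:converge` step kinds come from this README.

```elixir
defmodule RouterSketch do
  @moduledoc "Hypothetical router; state keys and return shapes are assumptions."

  # No usage data gathered yet: ask for a tool call.
  # The second argument stands in for an episode context and is unused here.
  def next_step(%{usage: nil}, _ctx), do: {:tool_call, :data_source, :fetch_usage}

  # Over the limit and not yet summarized: hand off to the LLM at this one point.
  def next_step(%{usage: usage, limit: limit, summary: nil}, _ctx) when usage > limit do
    {:synthesize, :summarize_breach}
  end

  # Everything else is decided deterministically: finish the episode.
  def next_step(_state, _ctx), do: :converge
end
```

Because every transition is a function clause, the routing can be unit-tested without an LLM or a database.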

## Key features

- **Declarative Actor DSL** — Define actors, expectations, triggers, and budgets in a compact macro-based syntax
- **Strategy Pattern** — Pluggable investigation logic with a clear `init → next_step → handle_result → converge` lifecycle
- **Episode Runner** — Budget-enforced execution loop with step journaling, checkpointing, and crash recovery
- **Findings Lifecycle** — Persistent observations with raise/update/clear semantics, upsert-by-key, causality chains, TTL expiration, severity escalation, and post-raise enrichment hooks
- **Output Router** — Deduplicated, adapter-based delivery (email, Slack, webhooks) with approval gates and adapter registry
- **Event Bus** — Phoenix.PubSub-backed event system connecting actors without coupling
- **Workflow Engine** — Multi-actor coordination with dependency graphs, failure policies, retry with backoff, cross-workflow episode dedup, and cancellation cascade
- **Circuit Breaker** — Per-expectation circuit breaker with configurable thresholds, half-open recovery, and optional in-flight episode cancellation
- **Episode Sampling** — Probabilistic firing control via `sample_rate` on expectations
- **Service Level Tracking** — Declarative performance objectives with breach detection and telemetry
- **Adaptive Budgets** — Advisory budget recommendations based on historical episode usage (p95 with headroom)
- **Backpressure Controls** — Per-actor concurrency limits with queue, drop, or shed-oldest overflow policies
- **Debounce and Cooldown** — Temporal controls to coalesce rapid-fire events and enforce minimum gaps
- **Log Projection** — Materialized human-readable logs at configurable verbosity (none → full_debug)
- **Telemetry** — 36 structured telemetry events for observability
- **OTP-Native** — No Oban or external job queue required; episodes run as Tasks under DynamicSupervisor
- **Distributed Coordination** — DB-based dedup, lease-based work claims, crash recovery, and multi-stack/trigger-only deployment modes, with no Redis or leader election required
- **Test Kit** — Assertion macros and fakes for validating actors, strategies, synthesizers, output adapters, workflows, and checkpoint migrations in host apps
- **SQL Server 2017 Compatible** — Transaction-based upserts, denormalized query columns, no JSON operators in DDL
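
To make the "p95 with headroom" wording of the Adaptive Budgets bullet concrete, here is a rough sketch of one way such a recommendation could be computed. It is not Cyclium's implementation: `BudgetSketch`, the nearest-rank percentile method, and the 20% default headroom are all assumptions.

```elixir
defmodule BudgetSketch do
  @moduledoc "Hypothetical p95-with-headroom calculation using integer arithmetic."

  # Nearest-rank percentile: the smallest sample such that at least p% of
  # samples are less than or equal to it. `p` is an integer from 1 to 100.
  def percentile(samples, p)
      when is_list(samples) and samples != [] and is_integer(p) and p in 1..100 do
    sorted = Enum.sort(samples)
    # Integer ceiling of p * n / 100 avoids floating-point rounding surprises.
    rank = div(p * length(sorted) + 99, 100)
    Enum.at(sorted, rank - 1)
  end

  # Recommend the 95th percentile plus `headroom_pct` percent, rounded up.
  def recommend(samples, headroom_pct \\ 20) do
    p95 = percentile(samples, 95)
    div(p95 * (100 + headroom_pct) + 99, 100)
  end
end
```

The result is advisory, as the bullet says: a host app would compare it with the declared budget rather than apply it automatically.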

## Who is this for?

Cyclium is designed for Elixir teams building **autonomous agent systems** where:

- Business rules define what *should* be true (SLAs, health thresholds, compliance checks)
- Episodes involve multiple steps: data gathering, LLM synthesis, tool calls, human approval
- Findings need to persist and evolve over time (raised → updated → cleared)
- Actions need deduplication, audit trails, and typed delivery through adapters
- Multiple actors need to coordinate through workflows with dependency ordering
- Real-time visibility into agent state is essential (Phoenix LiveView integration via Bus)

If you need a simple cron job or a one-shot script, Cyclium is overkill. It
shines when you have ongoing, stateful processes that produce findings and
outputs — and need the lifecycle, audit trail, and coordination to go with them.

## How Cyclium differs

- **vs. Oban** — Oban is a job queue: enqueue work, run it, done. Cyclium manages
  stateful, multi-turn episodes with budgets, findings, outputs, and workflows.
  Episodes happen to run as OTP Tasks, so you don't need Oban — but the two solve
  different problems and can be used together.
- **vs. chat-agent frameworks** — Those are built for interactive, LLM-routed
  conversations. Cyclium is built for autonomous operational agents that monitor
  domains and act, with or without an LLM in the loop, under deterministic
  developer control.
- **vs. raw GenServers** — You could build all of this yourself, but Cyclium
  gives you the episode lifecycle, findings system, output router, workflow
  engine, and event bus — wired together with telemetry and audit trails.

## Architecture

> The guides use a generic **resource monitoring** example: a `ResourceMonitor`
> actor that evaluates a resource's usage against a limit and classifies status,
> plus a `ResourceAdvisorActor` that produces an LLM-powered summary. See the
> [demo application](#demo-application) for a full working implementation.

### Supervision tree

```
YourApp.Supervisor
├── YourApp.Repo
├── Phoenix.PubSub
├── YourApp.Actors.ResourceMonitor (GenServer)
├── YourApp.Actors.ResourceAdvisorActor (GenServer)
├── Cyclium.Supervisor
│   ├── Cyclium.ActorSupervisor (DynamicSupervisor)
│   ├── Cyclium.EpisodeSupervisor (DynamicSupervisor)
│   │   └── Cyclium.EpisodeTask (one per running episode)
│   ├── Cyclium.TaskSupervisor (Task.Supervisor)
│   ├── Cyclium.Reconciler (optional — spec change detection)
│   ├── Cyclium.WorkflowEngine (optional — multi-actor workflows)
│   └── Cyclium.Findings.FindingSweep (optional — expiration + escalation)
└── YourAppWeb.Endpoint
```

**Actors** are GenServers that subscribe to Bus events and manage episode
lifecycle. They are started by the consuming app's own supervision tree, as
siblings of `Cyclium.Supervisor` rather than as its children. When a trigger
fires, the actor creates an Episode row and starts an `EpisodeTask` under the
`EpisodeSupervisor`. Each task resolves a strategy from the registry and runs
the episode loop.

**Key design principles:**

- Actors own concurrency limits — they track active/queued episodes in-process
- Episodes are durable — the `cyclium_episodes` table is itself a work queue
- The Bus connects everything — actors, LiveViews, and workflows all subscribe to the same event stream
- Strategies are stateless modules — all state lives in the episode's strategy state map

### Execution model

```
Bus event arrives
  → Actor.handle_info matches expectation trigger
  → Check debounce/cooldown → circuit breaker → sample_rate
  → Check concurrency (active < max?)
    → yes: create Episode row, start EpisodeTask under DynamicSupervisor
    → no:  apply overflow policy (queue / drop / shed_oldest)

EpisodeTask starts
  → Resolve strategy (persistent_term registration from actor boot → registry override)
  → strategy.init(episode, trigger)
  → EpisodeRunner.execute_loop:

    ┌─────────────────────────────────────────────┐
    │  check_budget → check_loop → increment_turn │
    │  strategy.next_step(state, ctx)             │
    │    :done         → journal, set done        │
    │    :converge     → run converge pipeline    │
    │    {:tool_call}  → exec tool, handle_result │
    │    {:observe}    → journal, handle_result   │
    │    {:synthesize} → journal, handle_result   │
    │    {:checkpoint} → save state, loop         │
    │    {:approval}   → block, wait for human    │
    │    {:wait}       → block, wait for external │
    │    ...           → loop                     │
    └─────────────────────────────────────────────┘

  Converge pipeline (post_converge):
    1. Persist findings (raise/update/clear) → enrich → Bus events per finding
    2. Route outputs through adapters → dedup by dedupe_key, deliver
    3. Compute final episode status from delivery outcomes
    4. Journal completion/failure step
    5. Project log via LogProjector
    6. Record service levels + check for breach
    7. Record adaptive budget sample (if enabled)
    8. Broadcast episode.completed/failed on Bus
    9. Emit telemetry
```
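
The loop in the diagram can be approximated in a few lines of plain Elixir. This is a deliberately reduced sketch, not `EpisodeRunner` itself: it handles only `:converge` and a hypothetical `{:observe, key, value}` step, enforces only a turn budget, and takes `next_step` as a one-argument function for brevity. `LoopSketch` and its return tuples are assumptions.

```elixir
defmodule LoopSketch do
  @moduledoc "Hypothetical budget-enforced episode loop with an in-memory journal."

  def run(state, next_step, %{max_turns: max_turns}) do
    loop(state, next_step, max_turns, 0, [])
  end

  # The budget is checked before every turn, mirroring check_budget in the diagram.
  defp loop(state, _next_step, max_turns, turn, journal) when turn >= max_turns do
    {:budget_exceeded, state, Enum.reverse(journal)}
  end

  defp loop(state, next_step, max_turns, turn, journal) do
    case next_step.(state) do
      :converge ->
        {:converged, state, Enum.reverse([:converge | journal])}

      {:observe, key, value} ->
        # Journal the step, fold the result into state, and take another turn.
        new_state = Map.put(state, key, value)
        loop(new_state, next_step, max_turns, turn + 1, [{:observe, key} | journal])
    end
  end
end
```

Since the strategy state is threaded through the loop as a plain map, the strategy function itself stays stateless, in line with the design principles above.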

## Core concepts

| Concept | What it is |
|---|---|
| **Actor** | A GenServer that owns expectations and fires episodes when triggers match. → [Actors & Strategies](guides/actors_and_strategies.md) |
| **Expectation** | A declaration of what *should* be true, with a strategy, trigger, and budget |
| **Strategy** | The brain of an episode — a stateless `init → next_step → handle_result → converge` module |
| **Episode** | One execution of a strategy, with budget tracking, a step journal, and a status lifecycle |
| **Finding** | A persistent observation with raise/update/clear semantics. → [Findings & Outputs](guides/findings_and_outputs.md) |
| **Output** | A typed proposal delivered through deduplicated adapters |
| **Bus** | A Phoenix.PubSub event stream connecting actors, LiveViews, and workflows |
| **Workflow** | Multi-actor coordination over a dependency graph. → [Workflows](guides/workflows.md) |
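
As an illustration of the Finding row, the raise/update/clear semantics with upsert-by-key can be modelled on an in-memory map. This is a sketch under assumptions, not the persistence layer: `FindingsSketch`, the operation tuples, and the `:status` values are hypothetical, and real findings live in the `cyclium_findings` table.

```elixir
defmodule FindingsSketch do
  @moduledoc "Hypothetical in-memory model of raise/update/clear keyed by finding key."

  # Raising a key that is absent (or previously cleared) yields :raised;
  # raising a key that is still active merges the new attributes as :updated.
  def apply_op(findings, {:raise, key, attrs}) do
    Map.update(findings, key, Map.put(attrs, :status, :raised), fn existing ->
      status = if existing.status == :cleared, do: :raised, else: :updated
      existing |> Map.merge(attrs) |> Map.put(:status, status)
    end)
  end

  # Clearing keeps the record but marks it :cleared; unknown keys are ignored.
  def apply_op(findings, {:clear, key}) do
    case findings do
      %{^key => existing} -> Map.put(findings, key, %{existing | status: :cleared})
      _ -> findings
    end
  end
end
```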

## Guides

| Guide | Covers |
|---|---|
| [Actors & Strategies](guides/actors_and_strategies.md) | Actor DSL, expectation options, strategy lifecycle, multi-turn strategies, episodes, budgets, deduplication, tools, synthesizers |
| [Findings & Outputs](guides/findings_and_outputs.md) | Findings lifecycle, causality chains, TTL/escalation/enrichment, output router & adapters, Bus events, LiveView, Window helpers |
| [Workflows](guides/workflows.md) | Compiled and dynamic (DB-defined) workflows, data passing, failure policies |
| [Dynamic Actors](guides/dynamic_actors.md) | DB-stored actor definitions, strategy templates, gatherers, lifecycle & draining |
| [Observability](guides/observability.md) | Log strategies, synthesis storage pipeline, telemetry, step journal, sampling, service levels, adaptive budgets |
| [Distributed Ops](guides/distributed_ops.md) | Crash recovery, work claims, node identity, multi-stack deployments, trigger-only mode |
| [Advanced](guides/advanced.md) | Checkpointing, circuit breaker, step retry, reconciler, dry runs, batch & per-item processing, test kit |
| [Interactive Actors](guides/interactive_actors.md) | Human-in-the-loop episodes (approvals, waits) |
| [Conversation UI](guides/conversation_ui.md) | Building conversational front-ends over Cyclium |

## Setup

### 1. Add dependency

```elixir
# mix.exs
def deps do
  [{:cyclium, path: "../cyclium_ex"}]
end
```

Dependencies pulled in: `ecto`, `ecto_sql`, `jason`, `phoenix_pubsub`.

### 2. Run migrations

```elixir
# In a migration file:
def up do
  Cyclium.Migrations.V1.up()   # episodes, steps, checkpoints, findings, outputs
  Cyclium.Migrations.V2.up()   # episode_logs
  Cyclium.Migrations.V3.up()   # workflow_instances
  Cyclium.Migrations.V4.up()   # archived_at on episodes and findings
  Cyclium.Migrations.V5.up()   # unique index on episode dedupe_key
  Cyclium.Migrations.V6.up()   # work_claims table for lease-based coordination
  # ...V7 through V13...
  Cyclium.Migrations.V14.up()  # trigger_requests table for deferred execution
  # ...V15 through V18...
  Cyclium.Migrations.V19.up()  # SQL Server: convert legacy TEXT columns to nvarchar(max)
end

def down do
  Cyclium.Migrations.V19.down()
  # ...V18 through V15...
  Cyclium.Migrations.V14.down()
  # ...V13 through V7...
  Cyclium.Migrations.V6.down()
  Cyclium.Migrations.V5.down()
  Cyclium.Migrations.V4.down()
  Cyclium.Migrations.V3.down()
  Cyclium.Migrations.V2.down()
  Cyclium.Migrations.V1.down()
end
```

> **Authoring migrations:** do not use bare `:text`. On `Ecto.Adapters.Tds` it
> emits SQL Server's legacy non-Unicode `TEXT` type, which silently replaces
> emoji and other non-CP1252 characters with `?`. Use `{:string, size: :max}`
> (which becomes `nvarchar(max)` on Tds and `TEXT` on Postgres/SQLite), or
> branch on `repo().__adapter__()` for finer control. V19 is the one-shot
> repair migration for columns that were already declared `:text`.
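
For host-app migrations, the adapter branch mentioned above could be sketched as follows. The module name, the `widgets` table, and the `notes` column are hypothetical; `repo/0`, `alter/2`, and `add/3` are standard `Ecto.Migration` functions. Treat this as a starting point to verify against your adapters rather than a prescribed pattern.

```elixir
defmodule MyApp.Repo.Migrations.AddNotesToWidgets do
  use Ecto.Migration

  def change do
    {type, opts} = unicode_text()

    alter table(:widgets) do
      # Resolves to nvarchar(max) on SQL Server and to :text elsewhere.
      add :notes, type, opts
    end
  end

  # Pick a Unicode-safe long-text column type for the adapter in use.
  defp unicode_text do
    case repo().__adapter__() do
      Ecto.Adapters.Tds -> {:string, [size: :max]}
      _other -> {:text, []}
    end
  end
end
```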

### 3. Configure

```elixir
# config.exs
config :cyclium, :repo, MyApp.Repo

# Optional: registry for strategy/synthesizer overrides (see Actors & Strategies guide)
# config :cyclium, :strategy_registry, MyApp.StrategyRegistry

# Optional: episode runner (default: Cyclium.Runner.OTP)
# Use Cyclium.Runner.Deferred for trigger-only mode (see Distributed Ops guide)
config :cyclium, :runner, Cyclium.Runner.OTP

# Optional: node identity override for shared-name environments (see Distributed Ops guide)
# config :cyclium, :node_identity, "my-unique-node-name"

# Optional: tool capabilities
config :cyclium, :capability_registry, %{
  data_source: MyApp.Tools.DataSource,
  notifier: MyApp.Tools.Notifier
}

# Optional: output adapters
config :cyclium, :output_adapters, %{
  email: MyApp.Adapters.Email,
  slack: MyApp.Adapters.Slack
}

# Optional: checkpoint schemas for versioned state migration
config :cyclium, :checkpoint_schemas, %{
  {"resource_monitor", "check_resource_limits"} => MyApp.Checkpoints.ResourceCheck
}

# Optional: enable reconciler for hot spec changes
config :cyclium, :reconciler, true

# Optional: register workflows
config :cyclium, :workflows, [MyApp.Workflows.ResourceReview]
```

### 4. Declare strategies on expectations

The preferred approach is declaring the strategy module directly on each
expectation in the actor DSL. Cyclium registers the mapping automatically when the
actor GenServer boots — no separate registry needed:

```elixir
defmodule MyApp.Actors.ResourceMonitor do
  use Cyclium.Actor

  actor do
    domain(:resource)
    spec_rev("v0.1.0")
    synthesizer(MyApp.Synthesizers.ResourceAnalysis)  # actor-level default
    max_concurrent_episodes(5)
    episode_overflow(:queue)

    expectation(:check_resource_limits,
      strategy: MyApp.Strategies.ResourceLimits,
      trigger: {:event, "resource.updated"},
      subject_key: :resource_id,
      debounce_ms: :timer.seconds(3),
      budget: %{max_turns: 3, max_tokens: 1_000, max_wall_ms: 10_000}
    )

    expectation(:resource_advisory,
      strategy: MyApp.Strategies.ResourceAdvisor,
      synthesizer: MyApp.Synthesizers.FastSummary,  # override for this expectation
      trigger: {:event, "resource.advisory_requested"},
      budget: %{max_turns: 5, max_tokens: 5_000, max_wall_ms: 60_000}
    )
  end
end
```

See the [Actors & Strategies guide](guides/actors_and_strategies.md) for the
optional strategy registry used for environment/A-B overrides.

### 5. Supervision tree

```elixir
# application.ex
children = [
  MyApp.Repo,
  {Phoenix.PubSub, name: MyApp.PubSub},
  {Cyclium.Supervisor, pubsub: MyApp.PubSub},
  MyApp.Actors.ResourceMonitor,
  MyApp.Actors.ResourceAdvisorActor,
  MyAppWeb.Endpoint
]
```

`Cyclium.Supervisor` starts the DynamicSupervisors, the TaskSupervisor, and,
when enabled, the Reconciler, WorkflowEngine, and FindingSweep.

## Database tables

All tables use `binary_id` primary keys and are SQL Server 2017 compatible (no
JSON operators in DDL, application-layer upserts, denormalized columns for indexed
queries).

| Table | Migration | Purpose |
|---|---|---|
| `cyclium_episodes` | V1 | Episode lifecycle, budget tracking, classification |
| `cyclium_episode_steps` | V1 | Step-by-step journal (16 step kinds) |
| `cyclium_episode_checkpoints` | V1 | Versioned strategy state snapshots |
| `cyclium_findings` | V1 | Persistent observations with raise/update/clear lifecycle |
| `cyclium_outputs` | V1 | Output proposals, delivery status, deduplication |
| `cyclium_episode_logs` | V2 | Materialized human-readable logs |
| `cyclium_workflow_instances` | V3, V9 | Workflow execution tracking, step states, dry run mode |
| `cyclium_work_claims` | V6 | Lease-based distributed work coordination |
| `cyclium_agent_definitions` | V7 | DB-stored actor definitions for dynamic actors |
| `cyclium_workflow_definitions` | V8 | DB-stored workflow definitions for dynamic workflows |
| `cyclium_trigger_requests` | V14 | Deferred episode execution for trigger-only nodes |

V4 adds `archived_at` to episodes and findings. V5 replaces the non-unique
`dedupe_key` index on episodes with a filtered unique index
(`WHERE dedupe_key IS NOT NULL AND archived_at IS NULL`) for multi-node
coordination. V6 adds the `cyclium_work_claims` table. V7 adds
`cyclium_agent_definitions` and `mode`/`dry_run_opts` columns to episodes. V8 adds
`cyclium_workflow_definitions`. V10 adds `caused_by_key` (finding causality
chains) and `expires_at` (TTL-based expiration) to `cyclium_findings`.

## Demo application

See [cyclium_ex_hapi](../cyclium_ex_hapi) for a complete Phoenix LiveView
application demonstrating Cyclium:

- Real-time evaluation with deterministic and LLM-powered actors
- Simulation controls for testing different scenarios
- Episode detail view with step timeline and rendered logs
- Reactive UI via Bus event subscriptions

## Development

```bash
# Install dependencies
mix deps.get

# Run tests
mix test

# Dialyzer (static analysis)
mix dialyzer

# Compile, treating warnings as errors
mix compile --warnings-as-errors
```