README.md

<p align="center">
  <img src="assets/agent_session_manager.svg" alt="AgentSessionManager" width="200">
</p>

<h1 align="center">AgentSessionManager</h1>

<p align="center">
  An Elixir library for managing AI agent sessions with state persistence, streaming events, multi-provider support, and concurrency controls.
</p>

<p align="center">
  <a href="https://hex.pm/packages/agent_session_manager"><img src="https://img.shields.io/hexpm/v/agent_session_manager.svg" alt="Hex.pm"></a>
  <a href="https://hexdocs.pm/agent_session_manager"><img src="https://img.shields.io/badge/docs-hexdocs-blue.svg" alt="Documentation"></a>
  <a href="https://github.com/nshkrdotcom/agent_session_manager/blob/main/LICENSE"><img src="https://img.shields.io/badge/license-MIT-green.svg" alt="License"></a>
</p>

---

## What It Does

AgentSessionManager provides the infrastructure layer for building applications that interact with AI agents. Rather than calling provider APIs directly, you work with sessions, runs, and events -- giving you lifecycle management, observability, and the ability to swap providers without rewriting your application.

**Key features:**

- **Session & run lifecycle** -- Create sessions, execute runs, and track state transitions with a well-defined state machine
- **Multi-provider support** -- Built-in adapters for Claude Code (Anthropic), Codex, and Amp (Sourcegraph), with a behaviour for adding your own
- **Streaming events** -- Normalized event pipeline that maps provider-specific events to a canonical format
- **Cursor-backed event streaming** -- Monotonic per-session sequence numbers with durable cursor queries (`after` / `before`)
- **Session continuity** -- Provider-agnostic transcript reconstruction and optional cross-run context replay
- **Workspace snapshots** -- Optional pre/post snapshots, diff summaries, patch capture caps, and git-only rollback on failure
- **Provider routing** -- Router-as-adapter with capability-based selection, policy ordering, and retryable failover
- **Policy enforcement** -- Real-time budget/tool governance with cancel or warn actions and `:policy_violation` events
- **Capability negotiation** -- Declare required and optional capabilities; the resolver checks provider support before execution
- **Concurrency controls** -- Configurable limits on parallel sessions and runs with slot-based tracking
- **Session server runtime** -- Optional per-session `SessionServer` with FIFO queueing, subscriptions, and strict sequential MVP execution (`max_concurrent_runs: 1`)
- **Observability** -- Telemetry integration, audit logging, and append-only event stores
- **Ports & adapters architecture** -- Clean separation between core logic and external dependencies

## Architecture Overview

```
Your Application
       |
  SessionManager         -- orchestrates lifecycle, events, capability checks
       |
  +----+----+
  |         |
Store    Adapter          -- ports (interfaces / behaviours)
  |         |
ETS/DB   Claude/Codex/Amp -- adapters (implementations)
```

The core domain types (`Session`, `Run`, `Event`, `Capability`, `Manifest`) are pure data structures with no side effects. The `SessionManager` coordinates between the storage port and the provider adapter port.

## Installation

Add `agent_session_manager` to your dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:agent_session_manager, "~> 0.5.1"}
  ]
end
```

Then run:

```bash
mix deps.get
```

## Quick Start

### One-shot (simplest)

```elixir
alias AgentSessionManager.SessionManager
alias AgentSessionManager.Adapters.{ClaudeAdapter, InMemorySessionStore}

{:ok, store} = InMemorySessionStore.start_link()
{:ok, adapter} = ClaudeAdapter.start_link(api_key: System.get_env("ANTHROPIC_API_KEY"))

{:ok, result} = SessionManager.run_once(store, adapter, %{
  messages: [%{role: "user", content: "Hello!"}]
}, event_callback: fn e -> IO.inspect(e.type) end)

IO.puts(result.output.content)
IO.inspect(result.token_usage)
# result also includes :session_id and :run_id
```

### Full lifecycle

```elixir
alias AgentSessionManager.SessionManager
alias AgentSessionManager.Adapters.{ClaudeAdapter, InMemorySessionStore}

# 1. Start the storage and adapter processes
{:ok, store} = InMemorySessionStore.start_link()
{:ok, adapter} = ClaudeAdapter.start_link(api_key: System.get_env("ANTHROPIC_API_KEY"))

# 2. Create and activate a session
{:ok, session} = SessionManager.start_session(store, adapter, %{
  agent_id: "my-agent",
  context: %{system_prompt: "You are a helpful assistant."}
})
{:ok, session} = SessionManager.activate_session(store, session.id)

# 3. Create and execute a run
{:ok, run} = SessionManager.start_run(store, adapter, session.id, %{
  messages: [%{role: "user", content: "Hello!"}]
})
{:ok, result} = SessionManager.execute_run(store, adapter, run.id)

# 4. Inspect the result
IO.puts(result.output.content)
IO.inspect(result.token_usage)

# 5. Complete the session
{:ok, _} = SessionManager.complete_session(store, session.id)
```

### Session runtime (queued, sequential)

If you need a per-session runtime that queues runs and provides await/cancel and subscriptions, use `AgentSessionManager.Runtime.SessionServer`.

**MVP behavior:** strict sequential execution only (`max_concurrent_runs: 1`).

```elixir
alias AgentSessionManager.Adapters.{ClaudeAdapter, InMemorySessionStore}
alias AgentSessionManager.Runtime.SessionServer

{:ok, store} = InMemorySessionStore.start_link()
{:ok, adapter} = ClaudeAdapter.start_link(model: "claude-haiku-4-5-20251001", tools: [])

{:ok, server} =
  SessionServer.start_link(
    store: store,
    adapter: adapter,
    session_opts: %{agent_id: "runtime-session"},
    max_concurrent_runs: 1
  )

{:ok, run_id} =
  SessionServer.submit_run(server, %{
    messages: [%{role: "user", content: "Hello!"}]
  })

{:ok, result} = SessionServer.await_run(server, run_id, 120_000)
IO.inspect(result.output)
```

## Core Concepts

### Sessions

A session is a logical container for a series of interactions with an AI agent. Sessions track state (`pending -> active -> completed/failed/cancelled`) and carry context (system prompts, metadata, tags).

```elixir
{:ok, session} = Session.new(%{agent_id: "research-agent", tags: ["research"]})
{:ok, active} = Session.update_status(session, :active)
```

### Runs

A run represents one execution within a session -- sending input to the provider and receiving output. Runs have their own lifecycle (`pending -> running -> completed/failed/cancelled/timeout`) and track token usage.

```elixir
{:ok, run} = Run.new(%{session_id: session.id, input: %{messages: messages}})
{:ok, completed} = Run.set_output(run, %{content: "Response text"})
```

### Events

Events are immutable records of things that happen during execution. They provide an audit trail and power the streaming interface.

```elixir
{:ok, event} = Event.new(%{
  type: :message_received,
  session_id: session.id,
  run_id: run.id,
  data: %{content: "Hello!", role: "assistant"}
})
```

### Cursor-Backed Streaming

Event sequence numbers are assigned at append time by the `SessionStore` implementation.
This makes event ordering durable and resumable across processes.

```elixir
alias AgentSessionManager.Ports.SessionStore

{:ok, stored_event} = SessionStore.append_event_with_sequence(store, event)
stored_event.sequence_number
# => 42

{:ok, latest} = SessionStore.get_latest_sequence(store, session.id)
# => 42
```

`SessionStore.get_events/3` supports cursor filters in addition to existing filters:

```elixir
# Cursor pagination
{:ok, page_1} = SessionStore.get_events(store, session.id, limit: 50)
cursor = List.last(page_1).sequence_number
{:ok, page_2} = SessionStore.get_events(store, session.id, after: cursor, limit: 50)

# Bounded window
{:ok, window} = SessionStore.get_events(store, session.id, after: 100, before: 200)
```

For follow/poll consumption, use `SessionManager.stream_session_events/3`:

```elixir
stream =
  SessionManager.stream_session_events(store, session.id,
    after: 0,
    limit: 100,
    poll_interval_ms: 250
  )

Enum.take(stream, 10)
```

### Session Continuity

Continuity is opt-in per run. When enabled, `SessionManager` reconstructs a
transcript from persisted events and injects it into `session.context[:transcript]`
before calling the adapter.

```elixir
{:ok, result} = SessionManager.execute_run(store, adapter, run.id,
  continuation: true,
  continuation_opts: [max_messages: 200]
)
```

Adapters consume transcript context when present. If provider-native handle reuse
is unavailable, transcript replay is the fallback path.

### Workspace Snapshots

Workspace instrumentation is also opt-in per run:

```elixir
{:ok, result} = SessionManager.execute_run(store, adapter, run.id,
  workspace: [
    enabled: true,
    path: "/path/to/workspace",
    strategy: :auto,
    capture_patch: true,
    max_patch_bytes: 1_048_576,
    rollback_on_failure: false
  ]
)
```

When enabled, `SessionManager` takes pre/post snapshots, computes diffs,
emits workspace events, and enriches run metadata with compact diff summaries.
MVP rollback is git-only. Requesting `rollback_on_failure: true` with hash backend
returns a configuration error.

### Provider Routing

`ProviderRouter` is a normal `ProviderAdapter`, so no `SessionManager` branching is required.

```elixir
alias AgentSessionManager.Routing.ProviderRouter

{:ok, router} =
  ProviderRouter.start_link(
    policy: [prefer: ["amp", "codex", "claude"], max_attempts: 3],
    cooldown_ms: 30_000
  )

:ok = ProviderRouter.register_adapter(router, "claude", claude_adapter)
:ok = ProviderRouter.register_adapter(router, "codex", codex_adapter)
:ok = ProviderRouter.register_adapter(router, "amp", amp_adapter)

{:ok, result} =
  SessionManager.execute_run(store, router, run.id,
    adapter_opts: [
      routing: [
        required_capabilities: [%{type: :tool, name: "bash"}]
      ]
    ]
  )
```

MVP health/failover behavior:

- tracks consecutive failures and last failure time per adapter
- applies cooldown-based temporary skipping
- retries/fails over only for retryable errors (`Error.retryable?/1`)
- routes `cancel/2` to the adapter currently handling the active run

### Policy Enforcement

Policy enforcement is opt-in per execution:

```elixir
alias AgentSessionManager.Policy.Policy

{:ok, policy} =
  Policy.new(
    name: "production",
    limits: [{:max_total_tokens, 8_000}, {:max_duration_ms, 120_000}],
    tool_rules: [{:deny, ["bash"]}],
    on_violation: :cancel
  )

{:ok, result} =
  SessionManager.execute_run(store, adapter, run.id, policy: policy)
```

Behavior:

- violations emit `:policy_violation` events
- `on_violation: :cancel` triggers one cancellation request and returns
  `{:error, %Error{code: :policy_violation}}` even if provider returned success
- `on_violation: :warn` preserves success and returns violation metadata in `result.policy`
- cost limits (`{:max_cost_usd, ...}`) are optional and use configured provider token rates

### Provider Adapters

Adapters implement the `ProviderAdapter` behaviour to integrate with AI providers. Each adapter handles streaming, tool calls, and cancellation for its provider.

| Adapter | Provider | Streaming | Tool Use | Cancel |
|---------|----------|-----------|----------|--------|
| `ClaudeAdapter` | Anthropic | Yes | Yes | Yes |
| `CodexAdapter` | Codex CLI | Yes | Yes | Yes |
| `AmpAdapter` | Amp (Sourcegraph) | Yes | Yes | Yes |

### Capability Negotiation

Before starting a run, you can declare what capabilities are required. The resolver checks the provider's capabilities and fails fast if requirements aren't met.

```elixir
{:ok, resolver} = CapabilityResolver.new(required: [:sampling], optional: [:tool])
{:ok, result} = CapabilityResolver.negotiate(resolver, adapter_capabilities)
# result.status => :full | :degraded
```

## Provider Event Normalization

Each provider emits events in its own format. The adapters normalize these into a canonical set:

| Normalized Event | Description |
|---|---|
| `run_started` | Execution began |
| `message_streamed` | Streaming content chunk |
| `message_received` | Complete message ready |
| `tool_call_started` | Tool invocation begins |
| `tool_call_completed` | Tool finished |
| `token_usage_updated` | Usage stats updated |
| `run_completed` | Execution finished |
| `run_failed` | Execution failed |
| `run_cancelled` | Execution cancelled |
| `workspace_snapshot_taken` | Workspace snapshot captured |
| `workspace_diff_computed` | Workspace diff summary computed |
| `policy_violation` | Policy violation detected |

## Error Handling

All operations return tagged tuples. Errors use a structured taxonomy with machine-readable codes:

```elixir
case SessionManager.start_session(store, adapter, attrs) do
  {:ok, session} -> session
  {:error, %Error{code: :validation_error, message: msg}} ->
    Logger.error("Validation failed: #{msg}")
end
```

Error codes are grouped into categories: validation, resource, provider, storage, runtime, concurrency, and tool errors. Some errors (like `provider_timeout`) are marked retryable via `Error.retryable?/1`.

## Examples

The `examples/` directory contains runnable scripts:

```bash
# Cursor examples (live providers)
mix run examples/cursor_pagination.exs --provider claude
mix run examples/cursor_follow_stream.exs --provider codex

# Feature 2 and 3 examples
mix run examples/session_continuity.exs --provider amp
mix run examples/workspace_snapshot.exs --provider claude

# Feature 4 and 5 examples
mix run examples/provider_routing.exs --provider codex
mix run examples/policy_enforcement.exs --provider claude

# Default run-all mode executes all examples for all providers
bash examples/run_all.sh

# All examples for a single live provider
bash examples/run_all.sh --provider codex
```

See `examples/README.md` for full documentation.

## Guides

The guides cover each subsystem in depth:

- [Getting Started](guides/getting_started.md) -- Installation, first session, and core workflow
- [Live Examples](guides/live_examples.md) -- Running all live examples and validating contract surfaces
- [Architecture](guides/architecture.md) -- Ports & adapters design, module map, data flow
- [Configuration](guides/configuration.md) -- Layered config system, process-local overrides
- [Sessions and Runs](guides/sessions_and_runs.md) -- Lifecycle state machines, metadata, context
- [Session Server Runtime](guides/session_server_runtime.md) -- Per-session FIFO queueing, submit/await/cancel, and limiter integration
- [Session Server Subscriptions](guides/session_server_subscriptions.md) -- Store-backed event subscriptions with cursor replay and filtering
- [Session Continuity](guides/session_continuity.md) -- Transcript reconstruction, continuation options, adapter replay behavior
- [Events and Streaming](guides/events_and_streaming.md) -- Event types, normalization, EventStream cursor
- [Cursor Streaming and Migration](guides/cursor_streaming_and_migration.md) -- Sequence assignment, cursor APIs, and custom store migration
- [Workspace Snapshots](guides/workspace_snapshots.md) -- Workspace options, snapshot/diff events, metadata, and rollback scope
- [Provider Routing](guides/provider_routing.md) -- Router-as-adapter setup, capability matching, health, failover, cancel routing
- [Policy Enforcement](guides/policy_enforcement.md) -- Policy model, runtime enforcement, final result semantics, cost checks
- [Provider Adapters](guides/provider_adapters.md) -- Using Claude/Codex/Amp adapters, writing your own
- [Capabilities](guides/capabilities.md) -- Defining capabilities, negotiation, manifests, registry
- [Concurrency](guides/concurrency.md) -- Session/run limits, slot management, control operations
- [Telemetry and Observability](guides/telemetry_and_observability.md) -- Telemetry events, audit logging, metrics
- [Error Handling](guides/error_handling.md) -- Error taxonomy, retryable errors, provider errors
- [Testing](guides/testing.md) -- Mock adapters, in-memory store, testing patterns

## Documentation

Full API documentation is available at [HexDocs](https://hexdocs.pm/agent_session_manager).

## License

AgentSessionManager is released under the [MIT License](LICENSE).