README.md

# Orkestra

A lightweight CQRS/ES toolkit for Elixir. Pluggable message bus, event store, and OpenTelemetry tracing built in.

Orkestra gives you the building blocks without the framework lock-in. Define commands and events with a declarative DSL, wire up handlers that auto-subscribe, and swap between in-process PubSub and distributed RabbitMQ with a single config change.

## Installation

```elixir
def deps do
  [
    {:orkestra, "~> 0.1.0"}
  ]
end
```

## Quick start

### Define a command

```elixir
defmodule MyApp.Commands.PlaceOrder do
  use Orkestra.Command

  param :product_id, :string, required: true
  param :quantity, :integer, required: true
  param :notes, :string, default: ""
end
```

### Define an event

```elixir
defmodule MyApp.Events.OrderPlaced do
  use Orkestra.Event

  field :order_id, :string, required: true
  field :product_id, :string, required: true
  field :quantity, :integer, required: true
  field :placed_at, :string, required: true
end
```

### Handle the command

```elixir
defmodule MyApp.Handlers.PlaceOrderHandler do
  use Orkestra.CommandHandler,
    command: MyApp.Commands.PlaceOrder

  @impl true
  def execute(command, metadata) do
    order_id = Ecto.UUID.generate()

    # ... your business logic ...

    {:ok, %{order_id: order_id}}
  end
end
```

### React to the event

```elixir
defmodule MyApp.Handlers.SendOrderConfirmation do
  use Orkestra.EventHandler,
    event: MyApp.Events.OrderPlaced

  @impl true
  def handle_event(event, _metadata) do
    MyApp.Mailer.send_confirmation(event.data.order_id)
    :ok
  end
end
```

### Dispatch

```elixir
alias Orkestra.{CommandEnvelope, MessageBus}

{:ok, cmd} = MyApp.Commands.PlaceOrder.new(%{
  product_id: "sku_42",
  quantity: 3
}, actor_id: "user_123", source: "web")

bus = MessageBus.impl()
:ok = bus.dispatch(CommandEnvelope.wrap(cmd, max_retries: 2))
```

### Supervision tree

```elixir
children = [
  {Phoenix.PubSub, name: MyApp.PubSub},
  Orkestra.MessageBus.PubSub,
  MyApp.Handlers.PlaceOrderHandler,
  MyApp.Handlers.SendOrderConfirmation
]
```

## Configuration

### Message bus adapter

```elixir
# In-process (dev, test, single-node)
config :orkestra, Orkestra.MessageBus,
  adapter: Orkestra.MessageBus.PubSub,
  app_prefix: MyApp

config :orkestra, Orkestra.MessageBus.PubSub,
  pubsub: MyApp.PubSub

# Distributed (production, multi-node)
config :orkestra, Orkestra.MessageBus,
  adapter: Orkestra.MessageBus.RabbitMQ,
  app_prefix: MyApp

config :orkestra, Orkestra.MessageBus.RabbitMQ,
  channel_provider: fn -> MyApp.RabbitMQ.Connection.channel() end
```

### Topic derivation

Topics are derived automatically from module names. The `app_prefix` is stripped:

```
MyApp.Orders.Commands.PlaceOrder  ->  "orders.commands.place_order"
MyApp.Orders.Events.OrderPlaced   ->  "orders.events.order_placed"
```

### Event store

```elixir
# EventStoreDB (production)
config :orkestra, Orkestra.EventStore,
  adapter: Orkestra.EventStore.EventStoreDB

config :orkestra, Orkestra.EventStore.EventStoreDB,
  connection_string: "esdb://localhost:2113?tls=false"

# In-memory (test)
config :orkestra, Orkestra.EventStore,
  adapter: Orkestra.EventStore.InMemory
```

## Core concepts

### Commands

Commands represent an intent to change the system. They are validated, dispatched to a single handler, and either succeed or fail.

```elixir
defmodule CreateAccount do
  use Orkestra.Command

  param :email, :string, required: true
  param :name, :string, required: true
  param :plan, :string, default: "free"

  @impl true
  def validate(%{email: email}) do
    if String.contains?(email, "@"), do: :ok, else: {:error, :invalid_email}
  end
end

{:ok, cmd} = CreateAccount.new(%{email: "a@b.com", name: "Alice"})
{:error, {:missing_params, [:email]}} = CreateAccount.new(%{name: "Bob"})
{:error, :invalid_email} = CreateAccount.new(%{email: "nope", name: "Eve"})
```

### Events

Events represent something that happened. They are immutable facts, never rejected. Events can be derived from commands or other events, preserving the correlation chain.

```elixir
defmodule AccountCreated do
  use Orkestra.Event

  field :account_id, :string, required: true
  field :email, :string, required: true
  field :plan, :string, required: true
end

# From a command (preserves correlation, sets causation)
{:ok, event} = AccountCreated.from_command(cmd, %{
  account_id: "acc_123",
  email: "a@b.com",
  plan: "free"
})

event.metadata.correlation_id == cmd.metadata.correlation_id  # true
event.metadata.causation_id == cmd.id                         # true
```

### Metadata

Every command and event carries metadata that flows through the pipeline:

```elixir
%Orkestra.Metadata{
  correlation_id: "abc123",    # links an entire chain of commands/events
  causation_id: "cmd_456",     # what directly caused this
  actor_id: "user_789",        # who initiated it
  actor_type: :user,           # :user | :system | :expert | :scheduler
  source: "web",               # where it originated
  issued_at: ~U[2026-03-27 12:00:00Z]
}

# Derive child metadata (preserves correlation, sets causation)
child = Orkestra.Metadata.derive(parent_metadata, "parent_id")
```

### Envelopes

Envelopes wrap commands and events with dispatch context.

**Command envelopes** track dispatch lifecycle and retries:

```elixir
env = CommandEnvelope.wrap(cmd, max_retries: 3)
env.status    # :pending -> :dispatched -> :succeeded | :failed | :rejected

CommandEnvelope.retryable?(env)  # true if failed and attempts <= max_retries
```

**Event envelopes** track delivery to multiple handlers:

```elixir
env = EventEnvelope.wrap(event) |> EventEnvelope.mark_published()
env = EventEnvelope.register_handler(env, "SendEmail")
env = EventEnvelope.register_handler(env, "UpdateIndex")
env = EventEnvelope.mark_handler_succeeded(env, "SendEmail")
env = EventEnvelope.mark_handler_failed(env, "UpdateIndex")
env.status  # :partially_handled
```

### Event handlers

Subscribe to one event, multiple events, or wildcard patterns:

```elixir
# Single event
use Orkestra.EventHandler,
  event: MyApp.Events.OrderPlaced

# Multiple events
use Orkestra.EventHandler,
  events: [MyApp.Events.OrderPlaced, MyApp.Events.OrderCancelled]

# Wildcard pattern
use Orkestra.EventHandler,
  topic: "orders.events.#"

# With retry config
use Orkestra.EventHandler,
  event: MyApp.Events.OrderPlaced,
  max_retries: 5
```

## Message bus adapters

### PubSub (in-process)

Synchronous dispatch. Commands go to one handler, events broadcast to all subscribers. Retries are immediate (recursive). Dead-lettered messages are broadcast on `"orkestra:deadletter"`.

### RabbitMQ (distributed)

Commands use exchange `orkestra.commands` with one queue per command type (competing consumers). Events use exchange `orkestra.events` with one queue per handler (fan-out).

Features:
- Retry tracking via `x-death` headers (native RabbitMQ)
- Max retries via `x-max-retries` header
- Dead letter exchange `orkestra.deadletter` with catch-all queue
- All queues declared with DLX configuration
- W3C trace context propagation in AMQP headers

## Observability

Orkestra is instrumented with OpenTelemetry out of the box.

### Span hierarchy

```
orkestra.command.dispatch       (message bus)
  orkestra.command.handle       (command handler)
    orkestra.event.publish      (message bus)
      orkestra.event.handle     (event handler)
```

Additional spans: `orkestra.retry`, `orkestra.rabbitmq.publish` (kind: producer), `orkestra.rabbitmq.consume` (kind: consumer).

### Span attributes

All spans include: `orkestra.command.type`, `orkestra.command.id`, `orkestra.correlation_id`, `orkestra.causation_id`, `orkestra.actor_id`, `orkestra.handler`.

### Structured logging

All log messages use Logger metadata instead of string interpolation:

```
[info] Command handler subscribed  handler=MyApp.HandleOrder  topic=orders.commands.place_order  orkestra=command_handler
[warning] Handler nack, requeuing  handler=MyApp.HandleOrder  attempt=2  max_retries=3  orkestra=rabbitmq
[error] Dead letter recorded  handler=MyApp.HandleOrder  reason=:timeout  orkestra=pubsub
```

Logger metadata set during handler execution: `correlation_id`, `causation_id`, `actor_id`, `trace_id`, `span_id`.

### Distributed tracing (RabbitMQ)

Trace context is injected into AMQP message headers on publish and extracted on consume, creating linked spans across nodes. Uses `OpentelemetryProcessPropagator` for context propagation across BEAM processes.


## License

MIT