Skip to main content

guides/aggregates-and-event-sourcing.md

# Aggregates & event sourcing

An **aggregate** is a consistency boundary: a single entity (an account, an order, a
user) that decides whether a command is allowed and what should happen as a result.
In `X3m.System` an aggregate is **event-sourced** by default — its state is not stored
directly but rebuilt by replaying the events it has produced. (You can also persist
state directly instead; see [State persistence](#state-persistence-event-sourcing-or-not).)

This guide builds a small bank-account aggregate. It assumes you're comfortable with
the [messaging layer](messaging.md); aggregates plug into the same routers and
dispatcher.

## The moving parts

| Piece | Responsibility |
|---|---|
| `X3m.System.Aggregate` | decides how a command is handled and how events change state |
| `X3m.System.MessageHandler` | loads the aggregate, runs the command, persists events, replies |
| `X3m.System.Aggregate.Repo` | reads and writes the event stream (you implement it) |
| `X3m.System.Router` | routes a service call to the message handler |

How they collaborate when a command is dispatched — either the aggregate is already in
memory, or it must be rehydrated from its event stream first — then the new events are
persisted:

```mermaid
sequenceDiagram
  participant D as Dispatcher
  participant R as Router
  participant MH as MessageHandler
  participant Repo as Aggregate.Repo
  participant Agg as Aggregate
  D->>R: command message
  R->>R: authorize/1
  R->>MH: command message
  alt aggregate already in memory (pid registered)
    MH->>Agg: handle_msg(message, current state)
  else not running (when_pid_is_not_registered/3)
    MH->>Repo: load event stream
    Repo-->>MH: past events
    MH->>Agg: apply_event/2 per event (rebuild state)
    MH->>Agg: handle_msg(message, rebuilt state)
  end
  Agg-->>MH: {:block, message + events, state}
  Note over Agg: blocked — no further commands until commit + apply
  MH->>Repo: save_events
  MH->>Agg: apply_event/2 for new events (new state), then unblock
  MH-->>D: response, e.g. {:created, id, version}
```

With `:block`, the aggregate process holds further commands until the message handler has
committed and applied these events — so commands are serialized per aggregate and never run
against not-yet-persisted state. (`:noblock` returns immediately; nothing is persisted.)

## 1. The aggregate

`use X3m.System.Aggregate` and declare the starting state, one command handler per
command, and one `apply_event/2` clause per event:

```elixir
defmodule MyApp.Accounts.Aggregate do
  use X3m.System.Aggregate
  alias X3m.System.Message, as: SysMsg
  alias MyApp.Accounts.{Commands, Events, State}

  @impl X3m.System.Aggregate
  def initial_state, do: %State{}

  handle_msg :open_account, &Commands.Open.new/2, &handle_open/2
  handle_msg :deposit, &Commands.Deposit.new/2, &handle_deposit/2

  defp handle_open(%SysMsg{request: %Commands.Open{} = cmd} = msg, %State{status: :new} = state) do
    event = %Events.Opened{id: cmd.id, owner: cmd.owner}

    msg =
      msg
      |> SysMsg.add_event(event)
      |> SysMsg.created(cmd.id)

    {:block, msg, state}
  end

  defp handle_deposit(%SysMsg{request: %Commands.Deposit{} = cmd} = msg, %State{} = state) do
    event = %Events.Deposited{id: state.id, amount: cmd.amount}

    msg =
      msg
      |> SysMsg.add_event(event)
      |> SysMsg.ok()

    {:block, msg, state}
  end

  def apply_event(%Events.Opened{} = e, %State{} = state),
    do: %State{state | id: e.id, owner: e.owner, status: :open}

  def apply_event(%Events.Deposited{} = e, %State{} = state),
    do: %State{state | balance: state.balance + e.amount}
end
```

### Command handlers and the block/noblock contract

`handle_msg/3` takes a **validate** function and a **process** function:

- The validate function (`Commands.Open.new/2`) casts `message.raw_request` into a
  structured request and stores it with `X3m.System.Message.put_request/2`. If the
  request is invalid, `put_request/2` halts the message with a `:validation_error` and
  the process function never runs.
- The process function returns one of:
  - `{:block, message, state}` — there are events to persist. The message handler saves
    `message.events`, commits, and only then replies.
  - `{:noblock, message, state}` — nothing to persist; the response is returned as-is
    (e.g. an idempotent no-op or a rejected command).

A validate function looks like this (using a plain struct here; an `Ecto.Changeset`
works too, since `put_request/2` checks `valid?`):

```elixir
defmodule MyApp.Accounts.Commands.Open do
  defstruct [:id, :owner, valid?: true]

  alias X3m.System.Message, as: SysMsg

  def new(%SysMsg{raw_request: raw} = msg, _state) do
    cmd = %__MODULE__{
      id: raw["id"],
      owner: raw["owner"],
      valid?: is_binary(raw["id"]) and is_binary(raw["owner"])
    }

    SysMsg.put_request(cmd, msg)
  end
end
```

Use the two-argument `handle_msg/2` when a command needs no separate validation step —
pass a single function that returns the `{:block | :noblock, message, state}` tuple.

### Applying events

`apply_event/2` is the only place state changes. It runs both when a command produces a
new event and when the aggregate is rehydrated from history, so it must be pure. Events
your aggregate doesn't recognise fall through to a default clause that leaves state
unchanged.

### Idempotency

The macros skip a command whose `message.id` was already processed, returning `:ok`
without re-running it. Override `processed_message_id/1` to pull the originating id out
of your event metadata so this survives restarts.

## 2. The message handler

The message handler wires the aggregate to a persistence backend and exposes one
function per command:

```elixir
defmodule MyApp.Accounts.MessageHandler do
  use X3m.System.MessageHandler,
    aggregate_mod: MyApp.Accounts.Aggregate,
    aggregate_repo: MyApp.Accounts.AggregateRepo,
    stream: "accounts",
    pid_facade_mod: X3m.System.AggregatePidFacade,
    event_metadata: %{app_version: "1.0.0"}

  on_new_aggregate :open_account            # id read from raw_request["id"]
  on_aggregate :deposit, id: "account_id"   # id read from raw_request["account_id"]
end
```

Pick the macro that matches the command's intent:

- `on_new_aggregate` — creates a fresh aggregate. The id may be generated if missing.
- `on_aggregate` — loads an existing aggregate; responds `{:error, :not_found}` if there
  is no stream for that id.
- `on_maybe_new_aggregate` — loads the aggregate, creating it if it doesn't exist yet.

Each macro reads the aggregate id from `message.raw_request` (under `"id"` by default,
or the `:id` option), loads or spawns the aggregate process, runs the command, persists
any events, and enriches the response with the new version — `{:ok, version}` or
`{:created, id, version}`.

`:event_metadata` is merged into the metadata stored with every event (handy for
schema/app versions). `:commit_timeout` (per macro) bounds how long persistence may
take.

## 3. The event store (`Aggregate.Repo`)

`X3m.System` does not ship a concrete store — you implement `X3m.System.Aggregate.Repo`
against whatever you use (EventStoreDB, Postgres, an in-memory store for tests):

```elixir
defmodule MyApp.Accounts.AggregateRepo do
  use X3m.System.Aggregate.Repo

  @impl true
  def has?(stream_name), do: # ...

  @impl true
  def stream_events(stream_name, start_at, per_page), do: # ... enumerable of {event, number, metadata}

  @impl true
  def delete_stream(stream_name, hard_delete?, expected_version), do: :ok

  @impl true
  def save_events(stream_name, message, events_metadata), do: {:ok, _last_event_number}
end
```

Stream names are built from the handler's `:stream` option and the aggregate id, e.g.
`"accounts-acc-1"`.

## 4. Wiring it into your app

Register the router's services, and start the per-node aggregate processes by adding
`X3m.System.LocalAggregatesSupervision` to your supervision tree. First, list the
aggregate modules to run locally:

```elixir
defmodule MyApp.LocalAggregates do
  use X3m.System.LocalAggregates, [MyApp.Accounts.Aggregate]
end
```

Then in your application:

```elixir
def start(_type, _args) do
  :ok = MyApp.Router.register_services()

  children = [
    MyApp.Accounts.AggregateRepo,
    {X3m.System.LocalAggregatesSupervision, [MyApp.LocalAggregates, MyApp]}
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
```

Now a dispatch flows end to end:

```elixir
:open_account
|> X3m.System.Message.new(raw_request: %{"id" => "acc-1", "owner" => "Ada"})
|> X3m.System.Dispatcher.dispatch()
#=> %X3m.System.Message{response: {:created, "acc-1", 0}, ...}
```

## Testing the aggregate

Because an aggregate is just a module — no process, no event store, no dispatch — you
test it by calling its command functions directly and replaying the events they return.
`X3m.System.Aggregate.TestSupport` gives you the given/when/then pieces:
`command_message/3` builds the command message, `state_from_events/3` rebuilds the
*given* state from prior events, and `apply_events/4` folds the events a command *emitted*
back onto that state so you can assert the result:

```elixir
defmodule MyApp.Accounts.AggregateTest do
  use ExUnit.Case, async: true

  import X3m.System.Aggregate.TestSupport
  alias MyApp.Accounts.{Aggregate, Events, State}
  alias X3m.System.Message, as: SysMsg

  test "open_account emits Opened and responds :created" do
    # given
    state = X3m.System.Aggregate.initial_state(Aggregate)
    # when
    msg = command_message(:open_account, %{"id" => "acc-1", "owner" => "Ada"})

    assert {:block, %SysMsg{events: events, response: {:created, "acc-1"}}, _state} =
             Aggregate.open_account(msg, state)

    assert [%Events.Opened{id: "acc-1", owner: "Ada"}] = events

    # then — replay the emitted events to assert the resulting state
    assert %{client_state: %State{status: :open, owner: "Ada"}} =
             apply_events(Aggregate, state, events)
  end

  test "open_account with a missing owner is a validation error" do
    state = X3m.System.Aggregate.initial_state(Aggregate)
    msg = command_message(:open_account, %{"id" => "acc-1"})

    assert {:noblock,
            %SysMsg{events: [], response: {:validation_error, _request}, halted?: true},
            ^state} = Aggregate.open_account(msg, state)
  end

  test "deposit adds to the balance of an open account" do
    # given an already-opened account
    state = state_from_events(Aggregate, [%Events.Opened{id: "acc-1", owner: "Ada"}])

    # when
    deposit = command_message(:deposit, %{"account_id" => "acc-1", "amount" => 100})

    assert {:block, %SysMsg{events: events, response: :ok}, _state} =
             Aggregate.deposit(deposit, state)

    # then
    assert [%Events.Deposited{amount: 100}] = events
    assert %{client_state: %State{balance: 100}} = apply_events(Aggregate, state, events)
  end
end
```

A few things to note:

- The functions you call (`open_account/2`, `deposit/2`) are the ones `handle_msg`
  generates — named after the message and taking `(message, state)`.
- `command_message/3` builds the message (with `raw_request` and `assigns`),
  `state_from_events/3` is the *given* state, and `apply_events/4` is the *then* step.
- A command's validate function receives `(message, state)`, so it can validate against
  state — e.g. rejecting a deposit into a closed account with a `validation_error`.
- A command may emit events *and* return an error (`{:block, error, state}`) — e.g.
  recording a rejection while replying `{:error, _}` — and a single command may emit
  several events (closing an account can return the remaining balance and then close it).
  So assert the emitted events, not just the response.
- A failed validation comes back as `{:noblock, message, state}` with `message.events ==
  []`, `halted?: true` and a `{:validation_error, request}` response (authorization
  guards look the same with an `{:error, _}` response) — no setup required to assert.
- Calling a successful handler returns the events but does **not** apply them; state
  changes happen in `apply_event/2` at commit time. Replaying the returned events with
  `apply_events(events, version, state)` both gives you the resulting state to assert on
  and exercises your `apply_event/2` clauses.
- No message handler, repo, or running process is involved, so these are plain, fast,
  `async: true` unit tests.

## Testing the full lifecycle

The tests above cover your command logic. To exercise the *runtime* — loading and spawning
the aggregate process, persisting events, idempotency, and your handler's **overrides**
(`save_events/1`, `save_state/3`, `when_pid_is_not_registered/3`, `:unload_aggregate_on`) —
you need a running aggregate and a repo. In tests, back it with an **in-memory
`Aggregate.Repo`** instead of a real event store:

```elixir
defmodule MyApp.Accounts.InMemoryEventStore do
  @moduledoc false
  use X3m.System.Aggregate.Repo
  use Agent

  alias X3m.System.Message

  def start_link(_opts \\ []),
    do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  def reset(),
    do: Agent.update(__MODULE__, fn _state -> %{} end)

  @impl true
  def has?(stream_name),
    do: Agent.get(__MODULE__, &Map.has_key?(&1, stream_name))

  @impl true
  def stream_events(stream_name, _start_at \\ 0, _per_page \\ 1_000),
    do: Agent.get(__MODULE__, &Map.get(&1, stream_name, []))

  @impl true
  def delete_stream(stream_name, _hard_delete?, _expected_version) do
    Agent.update(__MODULE__, &Map.delete(&1, stream_name))
    :ok
  end

  @impl true
  def save_events(stream_name, %Message{} = message, metadata) do
    __MODULE__
    |> Agent.get_and_update(fn streams ->
      existing = Map.get(streams, stream_name, [])
      current = length(existing) - 1

      # optimistic concurrency: the expected version is the aggregate's loaded version
      # (-1 for a new aggregate), so a "create" against an existing stream is rejected.
      if message.aggregate_meta.version != current do
        {{:error, :wrong_expected_version, current}, streams}
      else
        # stamp message.id into metadata so idempotency survives a rehydrate
        meta = Map.put(metadata, :message_id, message.id)

        appended =
          message.events
          |> Enum.with_index(current + 1)
          |> Enum.map(fn {event, number} -> {event, number, meta} end)

        last = current + length(message.events)
        {{:ok, last}, Map.put(streams, stream_name, existing ++ appended)}
      end
    end)
  end
end
```

Start it alongside the aggregate's supervision tree in `test/test_helper.exs`:

```elixir
{:ok, _} = MyApp.Accounts.InMemoryEventStore.start_link()

{:ok, _} =
  X3m.System.LocalAggregatesSupervision.start_link([MyApp.LocalAggregates, MyApp])
```

Point a test message handler at the in-memory store, then drive it by calling the generated
functions directly and asserting on both the reply and the stored events:

```elixir
defmodule MyApp.Accounts.MessageHandlerTest do
  # the pid facade and registry are named per aggregate module
  use ExUnit.Case, async: false

  import X3m.System.Aggregate.TestSupport
  alias MyApp.Accounts.{MessageHandler, InMemoryEventStore, Events}

  setup do
    InMemoryEventStore.reset()
    :ok
  end

  test "opens an account and persists the event" do
    id = UUID.uuid4()
    msg = command_message(:open_account, %{"id" => id, "owner" => "Ada"})

    assert {:reply, replied} = MessageHandler.open_account(msg)
    assert {:created, ^id, 0} == replied.response
    assert [{%Events.Opened{}, 0, _}] = InMemoryEventStore.stream_events("accounts-" <> id)
  end
end
```

Notes:

- Use a fresh aggregate id per test; reuse an id only when you are deliberately testing
  rehydration or idempotency (unload the process — `AggregatePidFacade.exit_process/3` — to
  force the next command to reload).
- `save_events/3` stamps `message.id` into each event's metadata so idempotency survives a
  rehydrate (the aggregate re-reads it via `processed_message_id/1` while replaying).
- This is how you test *your* overrides — `save_events/1` reacting to or filtering events,
  `:unload_aggregate_on` rules, the save/load callbacks below — rather than the library
  internals. For a non-event-sourced handler, point `save_state/3` and
  `when_pid_is_not_registered/3` at a plain in-memory store the same way.

## Validating without persisting (`dry_run`)

`X3m.System.Dispatcher.validate/1` dispatches the command with `dry_run: true`: the
aggregate runs the command exactly as it would normally, but the message handler does
**not** persist any events and the aggregate's `rollback/2` callback is invoked so it
can undo any side effects. The response tells you whether the command *would* have
succeeded. If a command performs side effects during handling (e.g. reserving a unique
value), implement the `commit/2` and `rollback/2` callbacks on the aggregate.

## State persistence: event sourcing or not

Event sourcing is the default, but how an aggregate's state is saved and restored is
governed by two overridable message-handler callbacks:

- `save_state/3` — called after each successful commit. Override it to persist the
  aggregate's state.
- `when_pid_is_not_registered/3` — called when a command targets an aggregate whose
  process isn't currently running. Override it to decide how that process is hydrated.

Together they support a spectrum:

- **Pure event sourcing (default):** `when_pid_is_not_registered/3` replays the event
  stream through `apply_event/2`; `save_state/3` does nothing.
- **Snapshots on top of events:** `save_state/3` stores a periodic snapshot;
  `when_pid_is_not_registered/3` loads the latest snapshot and replays only the events
  after it.
- **Non-event-sourced (state-based) aggregates:** `save_state/3` persists the full current
  state (it receives the wrapped `%X3m.System.Aggregate.State{}`, so you can store its
  `client_state` and `version`); `when_pid_is_not_registered/3` loads that row, spawns the
  process and seeds it with `X3m.System.GenAggregate.set_state(pid, loaded_state, version)` —
  no event replay at all. `set_state/3` rebuilds the process's `client_state` through your
  aggregate's `set_state/1` callback (the state-based analogue of `initial_state/0` — "for
  this loaded data, give me back your state"; defaults to the identity) and restores the
  version.

Your command logic (`handle_msg`) is identical in all three cases; only how state is
saved and restored differs.

To unload idle aggregates and reclaim memory, pass `:unload_aggregate_on` to
`use X3m.System.MessageHandler` with rules keyed on emitted events or current state.

When several nodes can host the same aggregate, see [Distribution](distribution.md) for
how a request is routed to the node where the aggregate already lives.