# erl-esdb

BEAM-native Event Store built on Khepri/Ra with Raft consensus.

![Architecture](assets/architecture.svg)

## Overview

erl-esdb is an Erlang implementation of a distributed event store designed for:
- **Event Sourcing**: Store and replay events with optimistic concurrency
- **Clustering**: Automatic node discovery and Raft-based replication
- **High Throughput**: Partitioned writers for concurrent stream writes
- **Edge & Datacenter**: Works on Nerves devices and Kubernetes clusters

## Features

- Event stream CRUD with versioning and optimistic concurrency
- Persistent subscriptions (stream, event type, pattern, payload matching)
- Snapshot management for aggregate state
- Emitter pools for high-throughput event delivery
- UDP multicast and Kubernetes DNS discovery
- BEAM telemetry with optional OpenTelemetry exporters

## Installation

Add to your `rebar.config`:

```erlang
{deps, [
    {erl_esdb, {git, "https://github.com/macula-io/erl-esdb.git", {branch, "main"}}}
]}.
```

## Quick Start

```erlang
%% Start the application and its dependencies
{ok, _Started} = application:ensure_all_started(erl_esdb).

%% Append events to a stream
Events = [
    #{
        event_type => <<"user_created">>,
        data => #{name => <<"Alice">>, email => <<"alice@example.com">>},
        metadata => #{correlation_id => <<"req-123">>}
    }
],
{ok, Version} = erl_esdb_streams:append(my_store, <<"user-123">>, -1, Events).

%% Read events from a stream
{ok, ReadEvents} = erl_esdb_streams:read(my_store, <<"user-123">>, 0, 100, forward).

%% Subscribe to events
Subscription = #{
    type => by_stream,
    selector => <<"user-123">>,
    subscription_name => <<"user_projection">>,
    subscriber_pid => self(),
    pool_size => 4
},
{ok, SubId} = erl_esdb_subscriptions:create(my_store, Subscription).

%% Receive events (give up after 5 seconds if nothing arrives)
receive
    {event, Event} -> io:format("Received: ~p~n", [Event])
after 5000 ->
    io:format("No event received~n")
end.
```

## API Reference

### Streams

```erlang
%% Append events (returns new version)
erl_esdb_streams:append(StoreId, StreamId, ExpectedVersion, Events) ->
    {ok, NewVersion} | {error, version_mismatch | term()}.

%% Read events
erl_esdb_streams:read(StoreId, StreamId, FromVersion, Count, Direction) ->
    {ok, [Event]} | {error, stream_not_found | term()}.

%% Check if stream exists
erl_esdb_streams:exists(StoreId, StreamId) -> boolean().

%% Get stream version
erl_esdb_streams:version(StoreId, StreamId) -> {ok, Version} | {error, term()}.

%% Delete stream (soft delete)
erl_esdb_streams:delete(StoreId, StreamId, ExpectedVersion) -> ok | {error, term()}.
```
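
Because `append/4` can return `{error, version_mismatch}`, callers usually need a retry loop around it. The following is a minimal sketch, not part of erl-esdb: `append_retry` and `append_with_retry/3` are hypothetical names, and the version lookup and append are passed in as funs so the retry logic stays independent of the store. It assumes the stream already exists, so that the version lookup succeeds.

```erlang
-module(append_retry).
-export([append_with_retry/3]).

%% Hypothetical helper, not part of erl-esdb.
%% VersionFun :: fun(() -> {ok, Version} | {error, term()})
%% AppendFun  :: fun((ExpectedVersion) -> {ok, NewVersion} | {error, term()})
%% Retries    :: maximum number of append attempts
append_with_retry(_VersionFun, _AppendFun, 0) ->
    {error, too_many_retries};
append_with_retry(VersionFun, AppendFun, Retries) when Retries > 0 ->
    case VersionFun() of
        {ok, Expected} ->
            case AppendFun(Expected) of
                {ok, NewVersion} ->
                    {ok, NewVersion};
                {error, version_mismatch} ->
                    %% Another writer appended first: re-read the version and retry.
                    append_with_retry(VersionFun, AppendFun, Retries - 1);
                {error, _} = Error ->
                    Error
            end;
        {error, _} = Error ->
            Error
    end.

%% Possible wiring against the API above (untested sketch):
%%   VersionFun = fun() -> erl_esdb_streams:version(my_store, <<"user-123">>) end,
%%   AppendFun  = fun(V) -> erl_esdb_streams:append(my_store, <<"user-123">>, V, Events) end,
%%   append_retry:append_with_retry(VersionFun, AppendFun, 3).
```

In an event-sourced aggregate, a blind retry is only safe when the new events do not depend on the state that changed; otherwise the command should be re-validated against the re-read stream before appending again.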

### Subscriptions

```erlang
%% Create subscription
erl_esdb_subscriptions:create(StoreId, SubscriptionMap) -> {ok, SubscriptionId}.

%% Delete subscription
erl_esdb_subscriptions:delete(StoreId, SubscriptionId) -> ok.

%% List subscriptions
erl_esdb_subscriptions:list(StoreId) -> {ok, [Subscription]}.

%% Subscription types:
%%   by_stream - Events from a specific stream
%%   by_event_type - Events matching event type
%%   by_pattern - Events matching stream pattern (wildcards)
%%   by_payload - Events matching payload criteria
```
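
A subscriber typically wants to drain several events rather than a single one. The sketch below collects `{event, Event}` messages (the shape shown in Quick Start) from the calling process's mailbox. `event_collector` and `collect/2` are hypothetical names, not part of erl-esdb, and the code assumes the calling process is the `subscriber_pid` given when the subscription was created.

```erlang
-module(event_collector).
-export([collect/2]).

%% Collect up to Count `{event, Event}` messages from the caller's mailbox.
%% Returns {ok, Events} once Count events have arrived, or {timeout, Events}
%% if no further event arrives within TimeoutMs of the previous one.
collect(Count, TimeoutMs) ->
    collect(Count, TimeoutMs, []).

collect(0, _TimeoutMs, Acc) ->
    {ok, lists:reverse(Acc)};
collect(Count, TimeoutMs, Acc) when Count > 0 ->
    receive
        {event, Event} ->
            collect(Count - 1, TimeoutMs, [Event | Acc])
    after TimeoutMs ->
        {timeout, lists:reverse(Acc)}
    end.
```

For a long-lived projection, a `gen_server` with a `handle_info({event, Event}, State)` clause is usually a better fit than a blocking loop like this one.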

### Snapshots

```erlang
%% Save snapshot
erl_esdb_snapshots:save(StoreId, StreamId, Version, Data, Metadata) -> ok.

%% Load latest snapshot
erl_esdb_snapshots:load(StoreId, StreamId) -> {ok, Snapshot} | {error, not_found}.

%% Load snapshot at specific version
erl_esdb_snapshots:load(StoreId, StreamId, Version) -> {ok, Snapshot} | {error, not_found}.

%% List all snapshots for stream
erl_esdb_snapshots:list(StoreId, StreamId) -> {ok, [Snapshot]}.

%% Delete snapshot
erl_esdb_snapshots:delete(StoreId, StreamId, Version) -> ok.
```

### Aggregation

```erlang
%% Fold over events with accumulator
erl_esdb_aggregator:foldl(StoreId, StreamId, Fun, InitAcc) -> {ok, FinalAcc}.

%% Fold with snapshot support (starts from latest snapshot)
erl_esdb_aggregator:foldl_from_snapshot(StoreId, StreamId, Fun, DefaultAcc) -> {ok, FinalAcc}.
```
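
The fold function is where aggregate state is rebuilt from events. The sketch below assumes `Fun` has the same shape as the callback of `lists:foldl/3`, that is `fun(Event, Acc) -> NewAcc`, and that events read back carry the same `event_type` and `data` keys as the maps in Quick Start. Neither assumption is confirmed here. `user_projection` and the `<<"email_changed">>` event type are hypothetical, and `replay/1` uses a plain list so the logic can be tried without a running store.

```erlang
-module(user_projection).
-export([apply_event/2, replay/1]).

%% Assumed callback shape: fun(Event, Acc) -> NewAcc, as with lists:foldl/3.
apply_event(#{event_type := <<"user_created">>, data := Data}, _State) ->
    %% A creation event resets the state to the event payload.
    Data#{status => active};
apply_event(#{event_type := <<"email_changed">>, data := #{email := Email}}, State) ->
    %% Hypothetical event type, used only to show an incremental update.
    State#{email => Email};
apply_event(_Unknown, State) ->
    %% Ignore event types this projection does not care about.
    State.

%% Rebuild state from an in-memory list of events.
replay(Events) ->
    lists:foldl(fun apply_event/2, #{}, Events).
```

Under the same assumptions, `fun user_projection:apply_event/2` could be passed as `Fun` to `erl_esdb_aggregator:foldl/4` with `#{}` as `InitAcc`.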

### Telemetry

```erlang
%% Attach default logger handler
erl_esdb_telemetry:attach_default_handler() -> ok.

%% Attach custom handler
erl_esdb_telemetry:attach(HandlerId, HandlerFun, Config) -> ok.

%% Detach handler
erl_esdb_telemetry:detach(HandlerId) -> ok.
```

## Configuration

```erlang
%% sys.config
[{erl_esdb, [
    {stores, [
        {my_store, [
            {data_dir, "/var/lib/erl_esdb/my_store"},
            {mode, cluster},  %% single | cluster
            {timeout, 5000}
        ]}
    ]},
    {telemetry_handlers, [logger]},
    {writer_pool_size, 10},
    {reader_pool_size, 10},

    %% Cluster discovery (cluster mode only)
    {discovery, [
        {method, multicast},  %% multicast | k8s_dns
        {port, 45892},
        {multicast_addr, {239, 255, 0, 1}},
        {secret, <<"cluster_secret">>}
    ]}
]}].
```
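
For local development, a single-node variant of the configuration above might look like the following sketch. It only changes `mode` to `single` and drops the `discovery` block, which the comment above marks as cluster mode only. Whether erl-esdb accepts a configuration without `discovery` is an assumption, and the `data_dir` path is illustrative.

```erlang
%% sys.config (single-node sketch; omitting `discovery` is an assumption)
[{erl_esdb, [
    {stores, [
        {my_store, [
            {data_dir, "/tmp/erl_esdb/my_store"},
            {mode, single},
            {timeout, 5000}
        ]}
    ]},
    {telemetry_handlers, [logger]},
    {writer_pool_size, 10},
    {reader_pool_size, 10}
]}].
```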

## Architecture

### Supervision Tree

```
erl_esdb_system_sup (rest_for_one) <per store_id>
├── erl_esdb_core_sup (one_for_all)
│   ├── erl_esdb_persistence_sup (one_for_one)
│   │   ├── erl_esdb_store (Khepri lifecycle)
│   │   ├── erl_esdb_streams_sup (partitioned writers/readers)
│   │   ├── erl_esdb_snapshots_store
│   │   └── erl_esdb_subscriptions_store
│   ├── erl_esdb_notification_sup (one_for_one)
│   │   ├── erl_esdb_leader_sup
│   │   │   ├── erl_esdb_leader_tracker
│   │   │   └── erl_esdb_leader
│   │   └── erl_esdb_emitter_sup
│   │       └── <dynamic emitter_pool children>
│   └── erl_esdb_store_mgr
├── erl_esdb_cluster_sup (one_for_one) [cluster mode]
│   ├── erl_esdb_discovery
│   ├── erl_esdb_store_coordinator
│   └── erl_esdb_node_monitor
└── erl_esdb_gateway_sup
```

### Event Flow

```
Write Request
    │
    ▼
erl_esdb_streams:append()
    │
    ▼
Partitioned Writer (hash by stream_id)
    │
    ▼
Khepri/Ra (Raft consensus)
    │
    ▼
Khepri Trigger fires
    │
    ▼
Emitter Pool broadcasts to subscribers
```
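
The "hash by stream_id" step can be illustrated with `erlang:phash2/2`, which maps any term to an integer in `0..Range-1`. This is a sketch of the general technique only: the hash function and pool layout that erl-esdb actually uses are not stated here, and `partition_sketch` is a hypothetical module.

```erlang
-module(partition_sketch).
-export([partition/2]).

%% Map a stream id to a writer index in 0..PoolSize-1.
%% The same StreamId always yields the same index for a given PoolSize.
partition(StreamId, PoolSize)
  when is_binary(StreamId), is_integer(PoolSize), PoolSize > 0 ->
    erlang:phash2(StreamId, PoolSize).
```

Routing every append for a given stream to the same writer serializes writes within that stream, while appends to different streams can proceed on different writers in parallel.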

## Telemetry Events

| Event | Measurements | Metadata |
|-------|--------------|----------|
| `[erl_esdb, stream, write, start]` | system_time | store_id, stream_id, event_count |
| `[erl_esdb, stream, write, stop]` | duration, event_count | store_id, stream_id, new_version |
| `[erl_esdb, stream, write, error]` | duration | store_id, stream_id, reason |
| `[erl_esdb, stream, read, start]` | system_time | store_id, stream_id |
| `[erl_esdb, stream, read, stop]` | duration, event_count | store_id, stream_id |
| `[erl_esdb, subscription, created]` | system_time | store_id, subscription_id, type |
| `[erl_esdb, subscription, deleted]` | system_time | store_id, subscription_id |
| `[erl_esdb, snapshot, created]` | duration, size_bytes | store_id, stream_id, version |
| `[erl_esdb, cluster, node, up]` | system_time | store_id, node, member_count |
| `[erl_esdb, cluster, node, down]` | system_time | store_id, node, reason |
| `[erl_esdb, cluster, leader, elected]` | system_time | store_id, leader |
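
A custom handler for these events could look like the sketch below. It uses the four-argument handler shape of the `telemetry` library (`EventName, Measurements, Metadata, Config`). That `erl_esdb_telemetry:attach/3` expects this shape, and that `duration` is reported in native time units, are both assumptions. `esdb_metrics_sketch` and the `report_to` config key are hypothetical.

```erlang
-module(esdb_metrics_sketch).
-export([handle_event/4]).

%% Forward write statistics to the process given in Config (hypothetical key
%% `report_to`). All other events are ignored.
handle_event([erl_esdb, stream, write, stop],
             #{duration := Duration, event_count := Count},
             #{store_id := StoreId, stream_id := StreamId},
             #{report_to := Pid}) ->
    %% Assumes duration is in native time units, as telemetry:span/3 reports it.
    Ms = erlang:convert_time_unit(Duration, native, millisecond),
    Pid ! {write_stats, StoreId, StreamId, Count, Ms},
    ok;
handle_event(_EventName, _Measurements, _Metadata, _Config) ->
    ok.
```

Under those assumptions it could be attached with `erl_esdb_telemetry:attach(my_handler, fun esdb_metrics_sketch:handle_event/4, #{report_to => self()})`. Handlers run in the process that emits the event, so they should stay cheap and hand off heavier work by message, as done here.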

## Building

```bash
rebar3 compile         # Compile
rebar3 eunit           # Unit tests
rebar3 ct              # Integration tests
rebar3 dialyzer        # Type checking
rebar3 cover           # Coverage report
```

## Testing

Test counts:
- **Unit tests**: 72 tests (aggregator, config, naming, telemetry)
- **Integration tests**: 53 tests (streams, subscriptions, snapshots, cluster)

```bash
rebar3 eunit                          # All unit tests
rebar3 ct --suite=erl_esdb_streams_SUITE   # Streams tests
rebar3 ct --suite=erl_esdb_cluster_SUITE   # Cluster tests
```

## Related Projects

- [erl-esdb-gater](https://github.com/macula-io/erl-esdb-gater) - Gateway for distributed access
- [ex-esdb](https://github.com/beam-campus/ex-esdb) - Original Elixir implementation

## License

Apache-2.0