# erl-esdb-gater

[![Hex.pm](https://img.shields.io/hexpm/v/erl_esdb_gater.svg)](https://hex.pm/packages/erl_esdb_gater)
[![Hexdocs.pm](https://img.shields.io/badge/docs-hexdocs-blue.svg)](https://hexdocs.pm/erl_esdb_gater)

Gateway for distributed access to [erl-esdb](https://github.com/macula-io/erl-esdb) event stores.

![Gateway Architecture](assets/gateway_architecture.svg)

## Overview

erl-esdb-gater is an Erlang gateway service providing:

- **Distributed Worker Registry**: pg-based registry for cluster-wide worker discovery
- **Load Balancing**: Round-robin with exponential backoff retry
- **Shared Type Definitions**: Common records for events, snapshots, and subscriptions
- **Capability-Based Security**: UCAN-inspired tokens for decentralized authorization
- **PubSub Channels**: 10 dedicated channels with priority-based delivery
- **HMAC Security**: Message signing for critical channels
- **Telemetry**: BEAM telemetry with optional OpenTelemetry exporters

## Installation

### Community Edition (hex.pm)

Add to your `rebar.config`:

```erlang
{deps, [
    {erl_esdb_gater, "0.4.0"}
]}.
```

Pure Erlang implementation with no native dependencies, so it builds anywhere Erlang/OTP runs.

### Enterprise Edition (git)

For NIF-accelerated performance (5-10x faster crypto operations):

```erlang
{deps, [
    {erl_esdb_gater, {git, "https://github.com/macula-io/erl-esdb-gater.git", {tag, "v0.4.0"}}}
]}.
```

To enable NIF acceleration:

1. Install Rust: `curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh`
2. Uncomment the NIF hooks in `rebar.config` (see comments in file)
3. Run `rebar3 compile`

The NIF provides accelerated Base58 encoding/decoding for DID operations. When unavailable, pure Erlang fallbacks are used automatically.

## Quick Start

```erlang
%% Start the application (typically started by erl-esdb)
application:ensure_all_started(erl_esdb_gater).

%% Append events to a stream
Events = [#{type => <<"user_created">>, data => #{name => <<"Alice">>}}],
{ok, Version} = esdb_gater_api:append_events(my_store, <<"users-123">>, Events).

%% Read events from a stream
{ok, EventList} = esdb_gater_api:stream_forward(my_store, <<"users-123">>, 0, 100).

%% Subscribe to PubSub channel
ok = esdb_channel_server:subscribe(esdb_channel_events, <<"user.*">>, self()).

%% Receive channel messages
receive
    {channel_message, esdb_channel_events, _Topic, Event} ->
        handle_event(Event)
end.
```

## API Reference

### Stream Operations

```erlang
%% Append events to a stream
esdb_gater_api:append_events(StoreId, StreamId, Events) ->
    {ok, NewVersion} | {error, term()}.
esdb_gater_api:append_events(StoreId, StreamId, ExpectedVersion, Events) ->
    {ok, NewVersion} | {error, term()}.

%% Read events from a stream
esdb_gater_api:get_events(StoreId, StreamId, StartVersion, Count, Direction) ->
    {ok, [Event]} | {error, term()}.
esdb_gater_api:stream_forward(StoreId, StreamId, StartVersion, Count) ->
    {ok, [Event]} | {error, term()}.
esdb_gater_api:stream_backward(StoreId, StreamId, StartVersion, Count) ->
    {ok, [Event]} | {error, term()}.

%% Stream metadata
esdb_gater_api:get_version(StoreId, StreamId) -> {ok, Version} | {error, term()}.
esdb_gater_api:stream_exists(StoreId, StreamId) -> boolean().
esdb_gater_api:get_streams(StoreId) -> {ok, [StreamId]} | {error, term()}.
```

### Subscription Operations

```erlang
%% Create a subscription
esdb_gater_api:save_subscription(StoreId, Type, Selector, Name, StartFrom, Subscriber) ->
    ok | {error, term()}.

%% Remove a subscription
esdb_gater_api:remove_subscription(StoreId, Type, Selector, Name) ->
    ok | {error, term()}.

%% Acknowledge event processing
esdb_gater_api:ack_event(StoreId, StreamId, SubscriptionName, EventNumber) ->
    ok | {error, term()}.

%% List subscriptions
esdb_gater_api:get_subscriptions(StoreId) -> {ok, [Subscription]} | {error, term()}.
```

### Snapshot Operations

```erlang
%% Record a snapshot
esdb_gater_api:record_snapshot(StoreId, SourceUuid, StreamUuid, Version, Record) ->
    ok | {error, term()}.

%% Read a snapshot
esdb_gater_api:read_snapshot(StoreId, SourceUuid, StreamUuid, Version) ->
    {ok, Snapshot} | {error, term()}.

%% Delete a snapshot
esdb_gater_api:delete_snapshot(StoreId, SourceUuid, StreamUuid, Version) ->
    ok | {error, term()}.

%% List snapshots
esdb_gater_api:list_snapshots(StoreId, SourceUuid, StreamUuid) ->
    {ok, [Snapshot]} | {error, term()}.
```

### Health

```erlang
esdb_gater_api:health() -> healthy | {degraded, Reason} | {unhealthy, Reason}.
esdb_gater_api:quick_health_check(StoreId) -> ok | {error, term()}.
```

### Channels

```erlang
%% Subscribe to a topic
esdb_channel_server:subscribe(ChannelName, Topic, Pid) -> ok.

%% Subscribe with capability token (for authorization)
esdb_channel_server:subscribe(ChannelName, Topic, Pid, CapabilityToken) ->
    ok | {error, {unauthorized, Reason}}.

%% Unsubscribe from a topic
esdb_channel_server:unsubscribe(ChannelName, Topic, Pid) -> ok.

%% Publish a message
esdb_channel_server:publish(ChannelName, Topic, Message) ->
    ok | {error, rate_limited | signature_required | invalid_signature}.

%% Publish with capability token (for authorization)
esdb_channel_server:publish(ChannelName, Topic, Message, CapabilityToken) ->
    ok | {error, {unauthorized, Reason}}.
```

### Security

```erlang
%% Sign a message with default secret
esdb_pubsub_security:sign(Message) -> SignedMessage.

%% Sign with custom secret
esdb_pubsub_security:sign(Message, Secret) -> SignedMessage.

%% Verify a signed message
esdb_pubsub_security:verify(SignedMessage) -> ok | {error, Reason}.

%% Set the default secret
esdb_pubsub_security:set_secret(Secret) -> ok.
```
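
To illustrate what HMAC signing and verification involve, here is a minimal, self-contained sketch built on OTP's `crypto:mac/4`. It is not the implementation of `esdb_pubsub_security`: the module name `hmac_sketch`, the envelope map, and the choice of SHA-256 are assumptions made for illustration only.

```erlang
%% hmac_sketch.erl: illustrative only, not part of erl-esdb-gater.
-module(hmac_sketch).
-export([sign/2, verify/2]).

%% Wrap a term in an envelope carrying an HMAC-SHA256 tag over its encoding.
sign(Message, Secret) when is_binary(Secret) ->
    Payload = term_to_binary(Message),
    #{payload => Payload, mac => crypto:mac(hmac, sha256, Secret, Payload)}.

%% Recompute the tag and compare it before decoding the payload.
verify(#{payload := Payload, mac := Mac}, Secret) when is_binary(Secret) ->
    Expected = crypto:mac(hmac, sha256, Secret, Payload),
    case equal(Expected, Mac) of
        true  -> {ok, binary_to_term(Payload, [safe])};
        false -> {error, invalid_signature}
    end.

%% Comparison that inspects every byte instead of stopping at the first mismatch.
equal(A, B) when is_binary(A), is_binary(B), byte_size(A) =:= byte_size(B) ->
    Pairs = lists:zip(binary_to_list(A), binary_to_list(B)),
    0 =:= lists:foldl(fun({X, Y}, Acc) -> Acc bor (X bxor Y) end, 0, Pairs);
equal(_, _) ->
    false.
```

Verifying with a different secret than the one used for signing yields `{error, invalid_signature}` in this sketch, which mirrors the error atom listed for `publish/3` above.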

### Retry Configuration

```erlang
%% Create custom retry config
Config = esdb_gater_retry:new_config(
    100,     %% base_delay_ms
    5000,    %% max_delay_ms
    5        %% max_attempts
),

%% Execute with custom retry
esdb_gater_api:execute(my_store, Fun, Config).
```
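
The three parameters above map naturally onto a capped exponential backoff. The sketch below shows one common way to compute the delays and drive a retry loop. The README does not state the exact formula `esdb_gater_retry` uses, so the doubling factor, the absence of jitter, the `{ok, _} | {error, _}` contract for the fun, and the module name `backoff_sketch` are all assumptions.

```erlang
%% backoff_sketch.erl: illustrative only, not part of erl-esdb-gater.
-module(backoff_sketch).
-export([delay/3, retry/4]).

%% Delay after failed attempt number Attempt (1-based):
%% BaseMs * 2^(Attempt - 1), capped at MaxMs.
delay(Attempt, BaseMs, MaxMs) when Attempt >= 1 ->
    min(BaseMs * (1 bsl (Attempt - 1)), MaxMs).

%% Call Fun/0 until it returns {ok, _} or MaxAttempts calls have been made.
retry(Fun, BaseMs, MaxMs, MaxAttempts) ->
    retry(Fun, BaseMs, MaxMs, MaxAttempts, 1).

retry(Fun, BaseMs, MaxMs, MaxAttempts, Attempt) ->
    case Fun() of
        {ok, _} = Ok ->
            Ok;
        {error, Reason} when Attempt >= MaxAttempts ->
            {error, {retries_exhausted, Reason}};
        {error, _} ->
            timer:sleep(delay(Attempt, BaseMs, MaxMs)),
            retry(Fun, BaseMs, MaxMs, MaxAttempts, Attempt + 1)
    end.
```

Under these assumptions, a base delay of 100 ms, a cap of 5000 ms, and 5 attempts would wait 100, 200, 400, and 800 ms between the five calls.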

## Channels

![PubSub Channels](assets/channels.svg)

The gateway provides 10 dedicated PubSub channels:

| Channel | Priority | Rate Limit | HMAC | Purpose |
|---------|----------|------------|------|---------|
| `esdb_channel_alerts` | critical | unlimited | required | Critical system alerts |
| `esdb_channel_security` | critical | unlimited | required | Security events |
| `esdb_channel_events` | high | unlimited | optional | Business events |
| `esdb_channel_health` | high | 100/sec | optional | Health checks |
| `esdb_channel_system` | normal | unlimited | optional | System notifications |
| `esdb_channel_metrics` | normal | 10000/sec | optional | Performance metrics |
| `esdb_channel_audit` | normal | unlimited | optional | Audit trail |
| `esdb_channel_lifecycle` | normal | unlimited | optional | Lifecycle events |
| `esdb_channel_logging` | low | 1000/sec | optional | Log messages |
| `esdb_channel_diagnostics` | low | 100/sec | optional | Diagnostic info |

### Channel Priorities

- **critical**: Immediate delivery, no rate limiting, HMAC required
- **high**: Priority delivery, minimal rate limiting
- **normal**: Standard delivery
- **low**: Background delivery, may be rate limited
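
As a rough illustration of a per-second rate limit such as `100/sec`, the following sketch keeps a fixed one-second window counter as pure state. How the channel servers actually enforce their limits is not described in this README, so the fixed-window algorithm, the state map, and the module name `rate_limit_sketch` are assumptions.

```erlang
%% rate_limit_sketch.erl: illustrative only, not part of erl-esdb-gater.
-module(rate_limit_sketch).
-export([new/1, check/2]).

%% MaxPerSec is a positive integer or the atom unlimited.
new(MaxPerSec) ->
    #{max => MaxPerSec, window => undefined, count => 0}.

%% NowMs is a non-negative millisecond timestamp,
%% for example erlang:system_time(millisecond).
%% Returns {ok | {error, rate_limited}, NewState}.
check(#{max := unlimited} = State, _NowMs) ->
    {ok, State};
check(#{max := Max, window := Window, count := Count} = State, NowMs) ->
    case NowMs div 1000 of
        Window when Count >= Max ->
            %% Same second and the budget is spent.
            {{error, rate_limited}, State};
        Window ->
            {ok, State#{count := Count + 1}};
        NewWindow ->
            %% A new second has started: reset the counter.
            {ok, State#{window := NewWindow, count := 1}}
    end.
```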

## Architecture

### Supervision Tree

![Supervision Tree](assets/supervision_tree.svg)

### Worker Registry Flow

![Worker Registry Flow](assets/worker_registry_flow.svg)

### Channel Message Flow

![Channel Message Flow](assets/channel_message_flow.svg)

## Configuration

```erlang
%% sys.config
[{erl_esdb_gater, [
    %% Cluster configuration
    {cluster, [
        {port, 45893},
        {multicast_addr, {239, 255, 0, 2}}
    ]},

    %% Retry defaults
    {retry, [
        {base_delay_ms, 100},
        {max_delay_ms, 30000},
        {max_attempts, 10}
    ]},

    %% Channel configuration
    {channels, [
        {esdb_channel_events, [
            {priority, high}
        ]},
        {esdb_channel_metrics, [
            {max_rate, 10000}
        ]}
    ]},

    %% Security
    {security, [
        {hmac_secret, <<"your_secret_here">>},
        {message_ttl_seconds, 300}
    ]},

    %% Telemetry
    {telemetry_handlers, [logger]}
]}].
```
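
For reference, values nested like the `retry` section above can be read back with the standard `application:get_env/3` and `proplists:get_value/3` calls. This is a minimal sketch of that lookup, not a description of how the library itself loads its configuration. The module name `config_sketch` is hypothetical.

```erlang
%% config_sketch.erl: illustrative only, not part of erl-esdb-gater.
-module(config_sketch).
-export([retry_setting/2]).

%% Look up Key inside the {retry, [...]} proplist of the erl_esdb_gater
%% application environment, falling back to Default when it is absent.
retry_setting(Key, Default) ->
    RetryOpts = application:get_env(erl_esdb_gater, retry, []),
    proplists:get_value(Key, RetryOpts, Default).
```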

## Telemetry Events

| Event | Measurements | Metadata |
|-------|--------------|----------|
| `[esdb_gater, worker, registered]` | system_time | store_id, node, pid |
| `[esdb_gater, worker, unregistered]` | system_time | store_id, pid |
| `[esdb_gater, worker, lookup]` | duration | store_id |
| `[esdb_gater, request, start]` | system_time | store_id, request_type |
| `[esdb_gater, request, stop]` | duration | store_id, request_type, result |
| `[esdb_gater, request, error]` | duration | store_id, request_type, reason |
| `[esdb_gater, retry, attempt]` | delay_ms, attempt | store_id, reason |
| `[esdb_gater, retry, exhausted]` | total_attempts | store_id, reason |
| `[esdb_gater, cluster, node, up]` | system_time | node, member_count |
| `[esdb_gater, cluster, node, down]` | system_time | node, member_count |
| `[esdb_gater, channel, broadcast]` | recipient_count | channel, topic |

### Attaching Handlers

```erlang
%% Attach default logger handler
ok = esdb_gater_telemetry:attach_default_handler().

%% Attach custom handler
Handler = fun(_Event, _Measurements, _Meta, _Config) ->
    %% Your custom handling
    ok
end,
ok = esdb_gater_telemetry:attach(my_handler, Handler, #{}).

%% Detach handler
ok = esdb_gater_telemetry:detach(my_handler).
```
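
A handler will usually pattern-match on the events it cares about and ignore the rest. The sketch below forwards the `duration` of `[esdb_gater, request, stop]` events, as listed in the table above, to a process of your choice. The module name `telemetry_sketch`, the `report_to` config key, and the message shape are hypothetical.

```erlang
%% telemetry_sketch.erl: illustrative only, not part of erl-esdb-gater.
-module(telemetry_sketch).
-export([handle_event/4]).

%% Four-argument handler in the same shape as the fun shown above.
%% Forwards completed-request durations to the pid given in Config.
handle_event([esdb_gater, request, stop],
             #{duration := Duration},
             #{store_id := StoreId, request_type := Type},
             #{report_to := Pid}) ->
    Pid ! {request_duration, StoreId, Type, Duration},
    ok;
handle_event(_Event, _Measurements, _Metadata, _Config) ->
    %% Ignore every other event.
    ok.
```

Assuming the third argument of `esdb_gater_telemetry:attach/3` is passed to the handler as its config, it could be attached with `esdb_gater_telemetry:attach(duration_reporter, fun telemetry_sketch:handle_event/4, #{report_to => self()})`.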

## Building

```bash
rebar3 compile         # Compile
rebar3 eunit           # Unit tests (44 tests)
rebar3 ct              # Integration tests (8 tests)
rebar3 dialyzer        # Type checking
```

## Testing

Test counts:
- **Unit tests**: 44 tests (retry, security, telemetry)
- **Integration tests**: 8 tests (channel system)
- **End-to-end tests**: 24 tests (with erl-esdb, run from erl-esdb)

```bash
rebar3 eunit                                    # All unit tests
rebar3 ct --suite=esdb_channel_SUITE            # Channel tests
```

Run the end-to-end tests from the erl-esdb repository:

```bash
cd /path/to/erl-esdb
rebar3 ct --suite=test/e2e/erl_esdb_gater_e2e_SUITE
```

## Integration with erl-esdb

erl-esdb-gater is designed to work with [erl-esdb](https://github.com/macula-io/erl-esdb) to provide load-balanced, distributed access to event stores.

### Automatic Worker Registration

When both packages are deployed on the same nodes:

1. **erl-esdb** gateway workers automatically register with **erl-esdb-gater**
2. No manual registration is required
3. Worker cleanup is automatic when nodes leave or workers crash

![Automatic Worker Registration](assets/worker_registration.svg)

### Accessing the Event Store

Use the gateway API to access erl-esdb with automatic load balancing and retry:

```erlang
%% Stream operations
%% Each result is bound to a fresh variable: rebinding an already bound
%% variable such as Events would turn the match into an assertion.
{ok, Version1} = esdb_gater_api:append_events(my_store, StreamId, Events).
{ok, Version2} = esdb_gater_api:append_events(my_store, StreamId, ExpectedVersion, Events).
{ok, Forward} = esdb_gater_api:stream_forward(my_store, StreamId, 0, 100).
{ok, Backward} = esdb_gater_api:stream_backward(my_store, StreamId, 100, 50).
{ok, Version} = esdb_gater_api:get_version(my_store, StreamId).
true = esdb_gater_api:stream_exists(my_store, StreamId).

%% Subscription operations
ok = esdb_gater_api:save_subscription(my_store, stream, StreamId, <<"my_sub">>, 0, self()).
ok = esdb_gater_api:remove_subscription(my_store, stream, StreamId, <<"my_sub">>).
ok = esdb_gater_api:ack_event(my_store, StreamId, <<"my_sub">>, EventNumber).
{ok, Subs} = esdb_gater_api:get_subscriptions(my_store).

%% Snapshot operations
ok = esdb_gater_api:record_snapshot(my_store, SourceUuid, StreamUuid, Version, Record).
{ok, Snap} = esdb_gater_api:read_snapshot(my_store, SourceUuid, StreamUuid, Version).
ok = esdb_gater_api:delete_snapshot(my_store, SourceUuid, StreamUuid, Version).
{ok, Snaps} = esdb_gater_api:list_snapshots(my_store, SourceUuid, StreamUuid).

%% Health check
healthy = esdb_gater_api:health().
ok = esdb_gater_api:quick_health_check(my_store).
```

### Deployment

erl-esdb includes erl-esdb-gater as a dependency. Starting erl-esdb automatically starts the gateway:

```erlang
%% Start erl-esdb (includes gater)
application:ensure_all_started(erl_esdb).

%% Gateway workers auto-register with the pg-based registry
%% Use the gater API for all operations
{ok, Version} = esdb_gater_api:append_events(my_store, StreamId, Events).
```

In a multi-node cluster, each node runs erl-esdb with its gateway worker. Together, the pg-based registry and the gateway API provide:
- Cluster-wide worker discovery via `pg:get_members/2`
- Eventual consistency (workers visible across all nodes)
- Automatic cleanup on node failure (pg membership)
- Load balancing with round-robin selection
- Exponential backoff retry on failures
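
The registry behaviour listed above can be sketched with OTP's `pg` module directly. This is an illustration of the general pattern, not the gateway's actual code: the scope name, the `{gateway_worker, StoreId}` group key, the caller-held counter, and the module name `registry_sketch` are all assumptions.

```erlang
%% registry_sketch.erl: illustrative only, not part of erl-esdb-gater.
-module(registry_sketch).
-export([start/0, register_worker/2, pick_worker/2]).

-define(SCOPE, registry_sketch_scope).   %% hypothetical pg scope name

%% Start the pg scope on this node; tolerate it already running.
start() ->
    case pg:start(?SCOPE) of
        {ok, _Pid} -> ok;
        {error, {already_started, _Pid}} -> ok
    end.

%% Join the group for StoreId. pg removes the pid again when it exits.
register_worker(StoreId, Pid) ->
    pg:join(?SCOPE, {gateway_worker, StoreId}, Pid).

%% Round-robin over the sorted member list using a caller-held counter.
pick_worker(StoreId, Counter) ->
    case lists:sort(pg:get_members(?SCOPE, {gateway_worker, StoreId})) of
        [] ->
            {error, no_workers};
        Members ->
            Index = (Counter rem length(Members)) + 1,
            {ok, lists:nth(Index, Members), Counter + 1}
    end.
```

In this sketch each caller keeps its own counter, so selection is round-robin per caller rather than globally coordinated across the cluster.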

## Shared Types

erl-esdb-gater provides shared type definitions used across the ecosystem. Include them in your modules:

```erlang
-include_lib("erl_esdb_gater/include/esdb_gater_types.hrl").
```

### Records

| Record | Purpose |
|--------|---------|
| `#event{}` | Event with type, data (Erlang term), and metadata |
| `#snapshot{}` | Aggregate snapshot at a specific version |
| `#subscription{}` | Subscription state and configuration |
| `#append_result{}` | Result of an append operation |

### Version Constants

| Constant | Value | Purpose |
|----------|-------|---------|
| `?NO_STREAM` | -1 | Stream must not exist (first write) |
| `?ANY_VERSION` | -2 | No version check, always append |
| `?STREAM_EXISTS` | -4 | Stream must exist |

See the [Shared Types Guide](guides/shared_types.md) for detailed usage.
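
To make the meaning of the constants concrete, here is a sketch of the check an expected version implies. The macros are redefined locally with the values from the table so the example is self-contained; real code would include `esdb_gater_types.hrl` instead. The rule that a non-negative expected version must equal the current version, the `not_found` marker, the error tuples, and the module name `version_check_sketch` are assumptions, not the library's documented behaviour.

```erlang
%% version_check_sketch.erl: illustrative only, not part of erl-esdb-gater.
-module(version_check_sketch).
-export([check/2]).

%% Values copied from the Version Constants table.
-define(NO_STREAM, -1).
-define(ANY_VERSION, -2).
-define(STREAM_EXISTS, -4).

%% Current is the stream's latest version, or not_found if it does not exist.
check(?ANY_VERSION, _Current) -> ok;
check(?NO_STREAM, not_found) -> ok;
check(?NO_STREAM, Current) -> {error, {stream_exists, Current}};
check(?STREAM_EXISTS, not_found) -> {error, stream_not_found};
check(?STREAM_EXISTS, _Current) -> ok;
check(Expected, Expected) when is_integer(Expected), Expected >= 0 -> ok;
check(Expected, Current) -> {error, {wrong_expected_version, Expected, Current}}.
```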

## Related Projects

![Ecosystem](assets/ecosystem.svg)

| Project | Description |
|---------|-------------|
| [erl-esdb](https://github.com/macula-io/erl-esdb) | Core event store built on Khepri/Ra |
| [erl-evoq](https://github.com/macula-io/erl-evoq) | CQRS/Event Sourcing framework |
| [erl-evoq-esdb](https://github.com/macula-io/erl-evoq-esdb) | Adapter connecting erl-evoq to erl-esdb |

## License

Apache-2.0