
# erl-esdb-gater

Gateway for distributed access to erl-esdb event stores.

![Gateway Architecture](assets/gateway_architecture.svg)

## Overview

erl-esdb-gater is an Erlang gateway service providing:
- **Distributed Worker Registry**: pg-based registry for cluster-wide worker discovery
- **Load Balancing**: Round-robin with exponential backoff retry
- **PubSub Channels**: 10 dedicated channels with priority-based delivery
- **HMAC Security**: Message signing for critical channels
- **Telemetry**: BEAM telemetry with optional OpenTelemetry exporters

## Installation

Add to your `rebar.config`:

```erlang
{deps, [
    {erl_esdb_gater, {git, "https://github.com/macula-io/erl-esdb-gater.git", {branch, "main"}}}
]}.
```

## Quick Start

```erlang
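%% Start the application and its dependencies
{ok, _Started} = application:ensure_all_started(erl_esdb_gater).

%% Register a worker for a store. WorkerPid is a gen_server that answers
%% the calls made below; the calling process cannot serve its own calls.
ok = esdb_gater_api:register_worker(my_store, WorkerPid).

%% Execute an operation with automatic retry and load balancing
%% (StreamId and Events are assumed to be bound already)
{ok, Result} = esdb_gater_api:execute(my_store, fun(Worker) ->
    gen_server:call(Worker, {append, StreamId, Events})
end).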

%% Subscribe to events channel
Topic = <<"events.user.created">>,
ok = esdb_channel_server:subscribe(esdb_channel_events, Topic, self()).

%% Receive events
receive
    {channel_message, esdb_channel_events, Topic, Event} ->
        handle_event(Event)
end.

%% Publish to alerts channel (requires HMAC)
SignedMsg = esdb_pubsub_security:sign(#{type => alert, level => critical}),
ok = esdb_channel_server:publish(esdb_channel_alerts, <<"alerts.critical">>, SignedMsg).
```
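
The Quick Start assumes a worker process that answers `gen_server:call/2`. As a minimal sketch of such a worker, the module below keeps streams in memory. The module name `demo_store_worker`, its reply shape, and the in-memory storage are illustrative assumptions and not part of erl-esdb-gater.

```erlang
-module(demo_store_worker).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Start an unregistered worker; the caller registers the returned pid.
start_link() ->
    gen_server:start_link(?MODULE, [], []).

init([]) ->
    %% State maps StreamId => list of events, oldest first.
    {ok, #{}}.

handle_call({append, StreamId, Events}, _From, Streams) ->
    Updated = maps:get(StreamId, Streams, []) ++ Events,
    %% Hypothetical reply: the new stream length stands in for a version.
    {reply, {ok, length(Updated)}, Streams#{StreamId => Updated}};
handle_call(_Other, _From, Streams) ->
    {reply, {error, unsupported}, Streams}.

handle_cast(_Msg, Streams) ->
    {noreply, Streams}.
```

The pid returned by `demo_store_worker:start_link()` can then be passed to `esdb_gater_api:register_worker/2`.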

## API Reference

### Worker Registry

```erlang
%% Register a worker for a store
esdb_gater_api:register_worker(StoreId, Pid) -> ok | {error, term()}.

%% Unregister a worker
esdb_gater_api:unregister_worker(StoreId, Pid) -> ok | {error, term()}.

%% Get all workers for a store
esdb_gater_api:get_workers(StoreId) -> {ok, [{Node, Pid}]} | {error, term()}.

%% Get local node workers only
esdb_gater_api:get_local_workers(StoreId) -> [Pid].
```
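
To illustrate how a pg-based registry can back these four calls, here is a small sketch built directly on OTP's `pg` module (OTP 23 or later). The module name, the scope name, and the `{workers, StoreId}` group key are assumptions made for the example; the actual internals of `esdb_gater_worker_registry` may differ.

```erlang
-module(pg_registry_sketch).
-export([start/0, register_worker/2, unregister_worker/2,
         get_workers/1, get_local_workers/1]).

%% Hypothetical scope name, chosen for this sketch only.
-define(SCOPE, pg_registry_sketch_scope).

%% Start the pg scope once per node.
start() ->
    case pg:start(?SCOPE) of
        {ok, _Pid} -> ok;
        {error, {already_started, _Pid}} -> ok
    end.

%% pg only accepts pids that live on the local node.
register_worker(StoreId, Pid) when is_pid(Pid) ->
    pg:join(?SCOPE, {workers, StoreId}, Pid).

unregister_worker(StoreId, Pid) ->
    case pg:leave(?SCOPE, {workers, StoreId}, Pid) of
        ok -> ok;
        not_joined -> {error, not_registered}
    end.

%% Members from every connected node that runs the same scope.
get_workers(StoreId) ->
    Members = pg:get_members(?SCOPE, {workers, StoreId}),
    {ok, [{node(Pid), Pid} || Pid <- Members]}.

%% Members on this node only.
get_local_workers(StoreId) ->
    pg:get_local_members(?SCOPE, {workers, StoreId}).
```

Because `pg` removes a process from its groups when the process exits, a registry built this way needs no explicit cleanup for crashed workers.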

### Operations

```erlang
%% Execute operation with load balancing and retry
esdb_gater_api:execute(StoreId, Fun) -> {ok, Result} | {error, Reason}.
esdb_gater_api:execute(StoreId, Fun, RetryConfig) -> {ok, Result} | {error, Reason}.

%% Fire and forget (no retry)
esdb_gater_api:cast(StoreId, Fun) -> ok | {error, no_workers}.
```

### Channels

```erlang
%% Subscribe to a topic
esdb_channel_server:subscribe(ChannelName, Topic, Pid) -> ok.

%% Unsubscribe from a topic
esdb_channel_server:unsubscribe(ChannelName, Topic, Pid) -> ok.

%% Publish a message
esdb_channel_server:publish(ChannelName, Topic, Message) ->
    ok | {error, rate_limited | signature_required | invalid_signature}.
```

### Security

```erlang
%% Sign a message with default secret
esdb_pubsub_security:sign(Message) -> SignedMessage.

%% Sign with custom secret
esdb_pubsub_security:sign(Message, Secret) -> SignedMessage.

%% Verify a signed message
esdb_pubsub_security:verify(SignedMessage) -> ok | {error, Reason}.

%% Set the default secret
esdb_pubsub_security:set_secret(Secret) -> ok.
```
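
For readers unfamiliar with HMAC signing, the following sketch shows one way to sign and verify a message with HMAC-SHA256 and a timestamp. It is not the wire format of `esdb_pubsub_security`, which this README does not specify; the map layout, the `expired` error, and the module name are assumptions. `crypto:hash_equals/2` requires OTP 25 or later.

```erlang
-module(hmac_sketch).
-export([sign/2, verify/3]).

%% Wrap Message with a timestamp and an HMAC-SHA256 over both.
sign(Message, Secret) when is_binary(Secret) ->
    Timestamp = erlang:system_time(second),
    #{payload => Message,
      timestamp => Timestamp,
      signature => mac(Message, Timestamp, Secret)}.

%% Check the signature first, then reject messages older than TtlSeconds.
verify(#{payload := Message, timestamp := Timestamp, signature := Sig},
       Secret, TtlSeconds) when is_binary(Sig), is_integer(Timestamp) ->
    Expected = mac(Message, Timestamp, Secret),
    Age = erlang:system_time(second) - Timestamp,
    %% hash_equals/2 compares in constant time but needs equal sizes.
    case byte_size(Sig) =:= byte_size(Expected)
         andalso crypto:hash_equals(Sig, Expected) of
        false -> {error, invalid_signature};
        true when Age > TtlSeconds -> {error, expired};
        true -> ok
    end;
verify(_Unsigned, _Secret, _TtlSeconds) ->
    {error, signature_required}.

%% The MAC covers payload and timestamp, so neither can be altered alone.
mac(Message, Timestamp, Secret) ->
    crypto:mac(hmac, sha256, Secret, term_to_binary({Message, Timestamp})).
```

Signing the timestamp together with the payload is what makes a TTL check meaningful: an attacker cannot refresh the timestamp of a captured message without invalidating the signature.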

### Retry Configuration

```erlang
%% Create custom retry config
Config = esdb_gater_retry:new_config(
    100,     %% base_delay_ms
    5000,    %% max_delay_ms
    5        %% max_attempts
),

%% Execute with custom retry
esdb_gater_api:execute(my_store, Fun, Config).
```
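
To get a feel for what these three parameters mean, the sketch below computes a capped exponential backoff schedule. The doubling formula is the common textbook form and is an assumption here; `esdb_gater_retry` may use a different multiplier or add jitter.

```erlang
-module(backoff_sketch).
-export([delay/3, schedule/3]).

%% Delay before retry number Attempt (1-based):
%% BaseMs * 2^(Attempt - 1), capped at MaxMs.
delay(BaseMs, MaxMs, Attempt) when Attempt >= 1 ->
    min(BaseMs bsl (Attempt - 1), MaxMs).

%% Waits between consecutive attempts: MaxAttempts attempts
%% leave MaxAttempts - 1 waits.
schedule(BaseMs, MaxMs, MaxAttempts) when MaxAttempts >= 1 ->
    [delay(BaseMs, MaxMs, N) || N <- lists:seq(1, MaxAttempts - 1)].
```

Under this assumed formula, `backoff_sketch:schedule(100, 5000, 5)` returns `[100, 200, 400, 800]`, so the 5000 ms cap only comes into play from the seventh attempt onward.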

## Channels

![PubSub Channels](assets/channels.svg)

The gateway provides 10 dedicated PubSub channels:

| Channel | Priority | Rate Limit | HMAC | Purpose |
|---------|----------|------------|------|---------|
| `esdb_channel_alerts` | critical | unlimited | required | Critical system alerts |
| `esdb_channel_security` | critical | unlimited | required | Security events |
| `esdb_channel_events` | high | unlimited | optional | Business events |
| `esdb_channel_health` | high | 100/sec | optional | Health checks |
| `esdb_channel_system` | normal | unlimited | optional | System notifications |
| `esdb_channel_metrics` | normal | 10000/sec | optional | Performance metrics |
| `esdb_channel_audit` | normal | unlimited | optional | Audit trail |
| `esdb_channel_lifecycle` | normal | unlimited | optional | Lifecycle events |
| `esdb_channel_logging` | low | 1000/sec | optional | Log messages |
| `esdb_channel_diagnostics` | low | 100/sec | optional | Diagnostic info |

### Channel Priorities

- **critical**: Immediate delivery, no rate limiting, HMAC required
- **high**: Priority delivery; only `esdb_channel_health` is rate limited (100/sec)
- **normal**: Standard delivery; only `esdb_channel_metrics` is rate limited (10000/sec)
- **low**: Background delivery; both channels are rate limited
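
The per-second limits in the table can be pictured with a fixed-window counter. This is a sketch of one possible technique, not the limiter the channel servers actually use, which this README does not describe; the module name and state layout are made up for the example.

```erlang
-module(rate_limit_sketch).
-export([new/1, check/2]).

%% MaxPerSec is a positive integer or the atom unlimited.
new(MaxPerSec) ->
    #{max => MaxPerSec, window => 0, count => 0}.

%% NowMs is a non-negative millisecond timestamp supplied by the caller,
%% for example erlang:system_time(millisecond).
check(_NowMs, #{max := unlimited} = State) ->
    {ok, State};
check(NowMs, #{max := Max, window := Window, count := Count} = State) ->
    case NowMs div 1000 of
        Window when Count < Max ->
            %% Same one-second window and still under the limit.
            {ok, State#{count := Count + 1}};
        Window ->
            {{error, rate_limited}, State};
        NewWindow ->
            %% A new window starts; this message is the first in it.
            {ok, State#{window := NewWindow, count := 1}}
    end.
```

A channel such as `esdb_channel_health` would start from `rate_limit_sketch:new(100)` and thread the returned state through each publish. Passing the timestamp in explicitly keeps the function pure and easy to test.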

## Architecture

### Supervision Tree

```
erl_esdb_gater_sup (one_for_one)
├── esdb_gater_cluster_sup (rest_for_one)
│   ├── esdb_gater_worker_registry (pg-based)
│   └── esdb_gater_cluster_monitor
└── esdb_channel_sup (one_for_one)
    ├── esdb_channel_alerts
    ├── esdb_channel_security
    ├── esdb_channel_events
    ├── esdb_channel_health
    ├── esdb_channel_system
    ├── esdb_channel_metrics
    ├── esdb_channel_audit
    ├── esdb_channel_lifecycle
    ├── esdb_channel_logging
    └── esdb_channel_diagnostics
```

### Worker Registry Flow

```
Register Worker
    │
    ▼
esdb_gater_api:register_worker()
    │
    ▼
esdb_gater_worker_registry (gen_server)
    │
    ▼
pg:join() (broadcasts to all cluster nodes)
    │
    ▼
Process monitor attached

Execute Operation
    │
    ▼
esdb_gater_api:execute()
    │
    ▼
Get workers → Select (round-robin)
    │
    ▼
Call worker with timeout
    │                     │
    ▼                     ▼
Success              Failure
    │                     │
    ▼                     ▼
Return result     Exponential backoff → Retry
```
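
The "Select (round-robin)" step in the flow above can be sketched with a shared atomic counter, so that concurrent callers each receive a distinct ticket. The module name and the use of `atomics` are assumptions for illustration; the gateway's own selection code may keep its position differently.

```erlang
-module(round_robin_sketch).
-export([new/0, next/2]).

%% One unsigned 64-bit counter, safe to share between processes.
new() ->
    atomics:new(1, [{signed, false}]).

%% Pick the next worker, wrapping around the end of the list.
next(_Counter, []) ->
    {error, no_workers};
next(Counter, Workers) ->
    %% add_get/3 increments and reads in one atomic step.
    Ticket = atomics:add_get(Counter, 1, 1) - 1,
    {ok, lists:nth((Ticket rem length(Workers)) + 1, Workers)}.
```

The worker list is passed in on every call because group membership can change between requests; the counter alone carries the rotation state.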

### Channel Message Flow

```
Publisher
    │
    ▼
esdb_channel_server:publish()
    │
    ├─→ Rate limit check
    │
    ├─→ HMAC verification (if required)
    │
    ▼
pg:get_members() for topic
    │
    ▼
Broadcast to all subscribers
    │
    ▼
{channel_message, Channel, Topic, Message}
```
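
The publish flow above is a sequence of checks followed by a broadcast. The sketch below models it with a list of check functions; the module name and the `Checks` and `Subscribers` arguments are hypothetical, and the subscriber list stands in for the `pg:get_members()` lookup.

```erlang
-module(publish_pipeline_sketch).
-export([publish/5]).

%% Checks is a list of fun(Message) -> ok | {error, Reason}, run in order,
%% e.g. a rate limit check followed by an HMAC check.
publish(Channel, Topic, Message, Checks, Subscribers) ->
    case run_checks(Checks, Message) of
        ok ->
            %% Every subscriber receives the documented message tuple.
            lists:foreach(
              fun(Pid) -> Pid ! {channel_message, Channel, Topic, Message} end,
              Subscribers),
            ok;
        {error, _Reason} = Error ->
            Error
    end.

%% Stop at the first failing check; nothing is sent in that case.
run_checks([], _Message) ->
    ok;
run_checks([Check | Rest], Message) ->
    case Check(Message) of
        ok -> run_checks(Rest, Message);
        {error, _Reason} = Error -> Error
    end.
```

Running the checks before the member lookup means a rejected message costs no fan-out work at all.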

## Configuration

```erlang
%% sys.config
[{erl_esdb_gater, [
    %% Cluster configuration
    {cluster, [
        {port, 45893},
        {multicast_addr, {239, 255, 0, 2}}
    ]},

    %% Retry defaults
    {retry, [
        {base_delay_ms, 100},
        {max_delay_ms, 30000},
        {max_attempts, 10}
    ]},

    %% Channel configuration
    {channels, [
        {esdb_channel_events, [
            {priority, high}
        ]},
        {esdb_channel_metrics, [
            {max_rate, 10000}
        ]}
    ]},

    %% Security
    {security, [
        {hmac_secret, <<"your_secret_here">>},
        {message_ttl_seconds, 300}
    ]},

    %% Telemetry
    {telemetry_handlers, [logger]}
]}].
```
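
Application code can read the same settings with `application:get_env/3`. The sketch below reads the `retry` section and falls back to the values shown above for missing keys. Whether erl-esdb-gater merges defaults in this way is an assumption, and `config_sketch` is a hypothetical module name.

```erlang
-module(config_sketch).
-export([retry_config/0]).

%% Fallback values, copied from the sys.config example.
-define(DEFAULTS, [{base_delay_ms, 100},
                   {max_delay_ms, 30000},
                   {max_attempts, 10}]).

%% Return the retry settings as a map, filling gaps from ?DEFAULTS.
retry_config() ->
    Configured = application:get_env(erl_esdb_gater, retry, []),
    maps:from_list(
      [{Key, proplists:get_value(Key, Configured, Default)}
       || {Key, Default} <- ?DEFAULTS]).
```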

## Telemetry Events

| Event | Measurements | Metadata |
|-------|--------------|----------|
| `[esdb_gater, worker, registered]` | system_time | store_id, node, pid |
| `[esdb_gater, worker, unregistered]` | system_time | store_id, pid |
| `[esdb_gater, worker, lookup]` | duration | store_id |
| `[esdb_gater, request, start]` | system_time | store_id, request_type |
| `[esdb_gater, request, stop]` | duration | store_id, request_type, result |
| `[esdb_gater, request, error]` | duration | store_id, request_type, reason |
| `[esdb_gater, retry, attempt]` | delay_ms, attempt | store_id, reason |
| `[esdb_gater, retry, exhausted]` | total_attempts | store_id, reason |
| `[esdb_gater, cluster, node, up]` | system_time | node, member_count |
| `[esdb_gater, cluster, node, down]` | system_time | node, member_count |
| `[esdb_gater, channel, broadcast]` | recipient_count | channel, topic |

### Attaching Handlers

```erlang
%% Attach default logger handler
ok = esdb_gater_telemetry:attach_default_handler().

%% Attach custom handler
Handler = fun(_Event, _Measurements, _Meta, _Config) ->
    %% Your custom handling (drop the underscore from any argument you use)
    ok
end,
ok = esdb_gater_telemetry:attach(my_handler, Handler, #{}).

%% Detach handler
ok = esdb_gater_telemetry:detach(my_handler).
```
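
A handler is often easier to maintain as a named function in its own module. The sketch below logs a warning when retries are exhausted and ignores every other event. It assumes that measurements and metadata arrive as maps keyed by the atoms in the table above, as is conventional for telemetry; the module name is hypothetical.

```erlang
-module(retry_alert_handler).
-export([handle_event/4]).

%% Log a warning when a store runs out of retries.
handle_event([esdb_gater, retry, exhausted],
             #{total_attempts := Attempts},
             #{store_id := StoreId, reason := Reason},
             _Config) ->
    logger:warning("store ~p gave up after ~p attempts: ~p",
                   [StoreId, Attempts, Reason]),
    ok;
%% Ignore all other events so the handler never crashes on them.
handle_event(_Event, _Measurements, _Metadata, _Config) ->
    ok.
```

It could then be attached with `esdb_gater_telemetry:attach(retry_alerts, fun retry_alert_handler:handle_event/4, #{})`, assuming `attach/3` subscribes the handler to the events listed above.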

## Building

```bash
rebar3 compile         # Compile
rebar3 eunit           # Unit tests (44 tests)
rebar3 ct              # Integration tests (8 tests)
rebar3 dialyzer        # Type checking
```

## Testing

Test counts:
- **Unit tests**: 44 tests (retry, security, telemetry)
- **Integration tests**: 8 tests (channel system)
- **End-to-end tests**: 24 tests (with erl-esdb, run from erl-esdb)

```bash
rebar3 eunit                                    # All unit tests
rebar3 ct --suite=esdb_channel_SUITE            # Channel tests
```

Run the end-to-end tests from the erl-esdb repository:

```bash
cd /path/to/erl-esdb
rebar3 ct --suite=test/e2e/erl_esdb_gater_e2e_SUITE
```

## Integration with erl-esdb

erl-esdb-gater is designed to work with [erl-esdb](https://github.com/macula-io/erl-esdb) to provide load-balanced, distributed access to event stores.

### Automatic Worker Registration

When both packages are deployed on the same nodes:

1. **erl-esdb** gateway workers automatically register with **erl-esdb-gater**
2. No manual registration is required
3. Worker cleanup is automatic when nodes leave or workers crash

```
┌─────────────────────────────────────────────────────────────────────┐
│                        Client Applications                          │
│                                                                     │
│   esdb_gater_api:call(my_store, {append, StreamId, -1, Events})     │
└──────────────────────────────┬──────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      erl-esdb-gater                                 │
│                                                                     │
│   Worker Registry ──────► Load Balancer ──────► Retry Handler       │
│   (pg groups)            (round-robin)       (exponential backoff)  │
└──────────────────────────────┬──────────────────────────────────────┘
                               │
            ┌──────────────────┼──────────────────┐
            ▼                  ▼                  ▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│ erl-esdb Node 1   │ │ erl-esdb Node 2   │ │ erl-esdb Node 3   │
│ Gateway Worker    │ │ Gateway Worker    │ │ Gateway Worker    │
│   (registered)    │ │   (registered)    │ │   (registered)    │
└───────────────────┘ └───────────────────┘ └───────────────────┘
```

### Accessing the Event Store

Use the gateway API to access erl-esdb with automatic load balancing and retry:

```erlang
%% Stream operations
{ok, Result} = esdb_gater_api:call(my_store, {append, StreamId, ExpectedVersion, Events}).
{ok, Events} = esdb_gater_api:call(my_store, {read, StreamId, Start, Count, Direction}).
{ok, Version} = esdb_gater_api:call(my_store, {get_version, StreamId}).
{ok, Exists} = esdb_gater_api:call(my_store, {stream_exists, StreamId}).

%% Subscription operations
{ok, SubKey} = esdb_gater_api:call(my_store, {subscribe, Type, Selector, Name}).
{ok, SubKey} = esdb_gater_api:call(my_store, {subscribe, Type, Selector, Name, Opts}).
ok = esdb_gater_api:call(my_store, {unsubscribe, SubKey}).
{ok, Subs} = esdb_gater_api:call(my_store, list_subscriptions).

%% Snapshot operations
ok = esdb_gater_api:call(my_store, {write_snapshot, StreamId, Version, Data}).
ok = esdb_gater_api:call(my_store, {write_snapshot, StreamId, Version, Data, Metadata}).
{ok, Snap} = esdb_gater_api:call(my_store, {read_snapshot, StreamId}).
ok = esdb_gater_api:call(my_store, {delete_snapshot, StreamId, Version}).

%% Aggregation
{ok, Acc} = esdb_gater_api:call(my_store, {aggregator_foldl, StreamId, Fun, InitAcc}).

%% Health check
pong = esdb_gater_api:call(my_store, ping).
```

### Deployment

erl-esdb includes erl-esdb-gater as a dependency. Starting erl-esdb automatically starts the gateway:

```erlang
%% Start erl-esdb (includes gater)
application:ensure_all_started(erl_esdb).

%% Gateway workers auto-register with the pg-based registry
%% Use the gater API for all operations
{ok, Version} = esdb_gater_api:call(my_store, {append, StreamId, -1, Events}).
```

In a multi-node cluster, each node runs erl-esdb with its gateway worker. The pg-based registry provides:
- Cluster-wide worker discovery via `pg:get_members/2`
- Eventually consistent membership (a newly registered worker becomes visible on all nodes after a short propagation delay)
- Automatic cleanup on node failure (pg drops the members of a disconnected node)
- Load balancing with round-robin selection
- Exponential backoff retry on failures

## Related Projects

- [erl-esdb](https://github.com/macula-io/erl-esdb) - Core event store
- [ex-esdb](https://github.com/beam-campus/ex-esdb) - Original Elixir implementation

## License

Apache-2.0