# erl-esdb-gater
Gateway for distributed access to erl-esdb event stores.

## Overview
erl-esdb-gater is an Erlang gateway service providing:
- **Distributed Worker Registry**: pg-based registry for cluster-wide worker discovery
- **Load Balancing**: Round-robin with exponential backoff retry
- **PubSub Channels**: 10 dedicated channels with priority-based delivery
- **HMAC Security**: Message signing for critical channels
- **Telemetry**: BEAM telemetry with optional OpenTelemetry exporters
## Installation
Add to your `rebar.config`:
```erlang
{deps, [
    {erl_esdb_gater, {git, "https://github.com/macula-io/erl-esdb-gater.git", {branch, "main"}}}
]}.
```
## Quick Start
```erlang
%% Start the application
{ok, _Started} = application:ensure_all_started(erl_esdb_gater).

%% Register a worker for a store. WorkerPid must be a process (e.g. a
%% gen_server) that handles the {append, StreamId, Events} call below;
%% the shell process itself cannot answer gen_server calls.
ok = esdb_gater_api:register_worker(my_store, WorkerPid).

%% Execute an operation with automatic retry and load balancing
%% (StreamId and Events are example placeholders)
StreamId = <<"user-123">>,
Events = [#{type => user_created, data => #{name => <<"Alice">>}}],
{ok, Result} = esdb_gater_api:execute(my_store, fun(Worker) ->
    gen_server:call(Worker, {append, StreamId, Events})
end).

%% Subscribe to the events channel
Topic = <<"events.user.created">>,
ok = esdb_channel_server:subscribe(esdb_channel_events, Topic, self()).

%% Receive events (handle_event/1 is your own function)
receive
    {channel_message, esdb_channel_events, Topic, Event} ->
        handle_event(Event)
end.

%% Publish to the alerts channel (requires an HMAC signature)
SignedMsg = esdb_pubsub_security:sign(#{type => alert, level => critical}),
ok = esdb_channel_server:publish(esdb_channel_alerts, <<"alerts.critical">>, SignedMsg).
```
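Any process registered as a worker has to answer the `gen_server:call(Worker, {append, StreamId, Events})` request made in the Quick Start. A minimal sketch of such a worker follows; the module name `demo_store_worker`, the extra `{read, StreamId}` request and the in-memory map storage are hypothetical and not part of erl-esdb or erl-esdb-gater (when erl-esdb is deployed, its own gateway workers fill this role and register automatically).

```erlang
-module(demo_store_worker).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Hypothetical worker: keeps each stream's events in an in-memory map.
start_link() ->
    gen_server:start_link(?MODULE, #{}, []).

init(Streams) ->
    {ok, Streams}.

%% Append events to a stream and reply with the stream's new length.
handle_call({append, StreamId, Events}, _From, Streams) when is_list(Events) ->
    Updated = maps:get(StreamId, Streams, []) ++ Events,
    {reply, {ok, length(Updated)}, Streams#{StreamId => Updated}};
%% Read back all events of a stream (empty list if the stream is unknown).
handle_call({read, StreamId}, _From, Streams) ->
    {reply, {ok, maps:get(StreamId, Streams, [])}, Streams};
handle_call(_Other, _From, Streams) ->
    {reply, {error, unknown_request}, Streams}.

handle_cast(_Msg, Streams) ->
    {noreply, Streams}.
```

Start it with `{ok, WorkerPid} = demo_store_worker:start_link()` and pass `WorkerPid` to `esdb_gater_api:register_worker/2`.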
## API Reference
### Worker Registry
```erlang
%% Register a worker for a store
esdb_gater_api:register_worker(StoreId, Pid) -> ok | {error, term()}.
%% Unregister a worker
esdb_gater_api:unregister_worker(StoreId, Pid) -> ok | {error, term()}.
%% Get all workers for a store
esdb_gater_api:get_workers(StoreId) -> {ok, [{Node, Pid}]} | {error, term()}.
%% Get local node workers only
esdb_gater_api:get_local_workers(StoreId) -> [Pid].
```
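The registry is described as pg-based. As a rough sketch of how such a registry can map store ids onto OTP `pg` groups, mirroring the return shapes listed above (the module `pg_registry_sketch`, the scope name `gater_sketch_scope` and the `{worker, StoreId}` group key are assumptions for illustration, not erl-esdb-gater's internals):

```erlang
-module(pg_registry_sketch).
-export([start/0, register_worker/2, unregister_worker/2,
         get_workers/1, get_local_workers/1]).

%% Hypothetical scope name; every node in the cluster must start the same scope.
-define(SCOPE, gater_sketch_scope).

%% Start the pg scope (normally done under a supervisor).
start() ->
    case pg:start_link(?SCOPE) of
        {ok, Pid} -> {ok, Pid};
        {error, {already_started, Pid}} -> {ok, Pid}
    end.

%% One pg group per store. pg propagates membership to all nodes in the
%% scope and removes a member automatically when its process exits.
%% Pid must be a process on the local node.
register_worker(StoreId, Pid) ->
    pg:join(?SCOPE, {worker, StoreId}, Pid).

unregister_worker(StoreId, Pid) ->
    pg:leave(?SCOPE, {worker, StoreId}, Pid).

%% All workers in the cluster, tagged with the node they run on.
get_workers(StoreId) ->
    Pids = pg:get_members(?SCOPE, {worker, StoreId}),
    {ok, [{node(P), P} || P <- Pids]}.

%% Workers on this node only.
get_local_workers(StoreId) ->
    pg:get_local_members(?SCOPE, {worker, StoreId}).
```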
### Operations
```erlang
%% Execute operation with load balancing and retry
esdb_gater_api:execute(StoreId, Fun) -> {ok, Result} | {error, Reason}.
esdb_gater_api:execute(StoreId, Fun, RetryConfig) -> {ok, Result} | {error, Reason}.
%% Fire and forget (no retry)
esdb_gater_api:cast(StoreId, Fun) -> ok | {error, no_workers}.
```
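`execute/2,3` spreads calls across a store's workers round-robin. How the rotation position is tracked is not documented here; one lock-free option, sketched with OTP's `atomics` module (the module `round_robin_sketch` and its functions are hypothetical), is a shared counter that each call increments before indexing into the worker list:

```erlang
-module(round_robin_sketch).
-export([new/0, pick/2]).

%% One unsigned 64-bit counter shared by all callers on this node;
%% unsigned so that wrap-around goes back to 0 instead of negative.
new() ->
    atomics:new(1, [{signed, false}]).

%% Pick the next worker from the list, or report that none are registered.
pick(_Ref, []) ->
    {error, no_workers};
pick(Ref, Workers) ->
    N = atomics:add_get(Ref, 1, 1),
    {ok, lists:nth((N rem length(Workers)) + 1, Workers)}.
```

Because `atomics:add_get/3` increments and reads in a single step, concurrent callers never see the same counter value, although a change in the worker list between calls shifts the rotation.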
### Channels
```erlang
%% Subscribe to a topic
esdb_channel_server:subscribe(ChannelName, Topic, Pid) -> ok.
%% Unsubscribe from a topic
esdb_channel_server:unsubscribe(ChannelName, Topic, Pid) -> ok.
%% Publish a message
esdb_channel_server:publish(ChannelName, Topic, Message) ->
ok | {error, rate_limited | signature_required | invalid_signature}.
```
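Subscribers receive `{channel_message, Channel, Topic, Message}` tuples. A minimal subscriber loop might look like the sketch below; the module name and the forward-to-owner behaviour are hypothetical, and the `esdb_channel_server:subscribe/3` call is left to the caller so the sketch runs without the gateway:

```erlang
-module(channel_subscriber_sketch).
-export([start/1, loop/1]).

%% Spawn a subscriber that forwards every channel message to Owner.
%% With the gateway running, pass the returned pid to
%% esdb_channel_server:subscribe(ChannelName, Topic, Pid).
start(Owner) ->
    spawn(?MODULE, loop, [Owner]).

loop(Owner) ->
    receive
        {channel_message, Channel, Topic, Message} ->
            Owner ! {received, Channel, Topic, Message},
            loop(Owner);
        stop ->
            ok;
        _Other ->
            %% Drop unrelated messages so the mailbox does not grow.
            loop(Owner)
    end.
```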
### Security
```erlang
%% Sign a message with default secret
esdb_pubsub_security:sign(Message) -> SignedMessage.
%% Sign with custom secret
esdb_pubsub_security:sign(Message, Secret) -> SignedMessage.
%% Verify a signed message
esdb_pubsub_security:verify(SignedMessage) -> ok | {error, Reason}.
%% Set the default secret
esdb_pubsub_security:set_secret(Secret) -> ok.
```
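The signed-message format is not documented here. To illustrate HMAC signing with a freshness check (in the spirit of the `hmac_secret` and `message_ttl_seconds` settings under Configuration), the sketch below uses OTP's `crypto:mac/4` with SHA-256. The `#{payload, timestamp, signature}` map layout, the error atoms and the module name are assumptions, not the library's wire format:

```erlang
-module(hmac_sketch).
-export([sign/2, verify/3]).

%% Sign Payload with Secret, embedding the signing time in seconds.
sign(Payload, Secret) ->
    Ts = erlang:system_time(second),
    #{payload => Payload, timestamp => Ts, signature => mac(Payload, Ts, Secret)}.

%% Check the signature, then reject messages older than TtlSeconds.
verify(#{payload := Payload, timestamp := Ts, signature := Sig}, Secret, TtlSeconds) ->
    Age = erlang:system_time(second) - Ts,
    case equal(Sig, mac(Payload, Ts, Secret)) of
        false -> {error, invalid_signature};
        true when Age > TtlSeconds -> {error, expired};
        true -> ok
    end;
verify(_Unsigned, _Secret, _TtlSeconds) ->
    {error, signature_required}.

%% HMAC-SHA256 over the serialized {Payload, Timestamp} pair.
mac(Payload, Ts, Secret) ->
    crypto:mac(hmac, sha256, Secret, term_to_binary({Payload, Ts})).

%% Constant-time comparison: timing does not reveal how many bytes matched.
equal(A, B) when byte_size(A) =:= byte_size(B) ->
    equal(A, B, 0);
equal(_, _) ->
    false.

equal(<<X, RestA/binary>>, <<Y, RestB/binary>>, Acc) ->
    equal(RestA, RestB, Acc bor (X bxor Y));
equal(<<>>, <<>>, Acc) ->
    Acc =:= 0.
```

`term_to_binary/1` output is not guaranteed to be byte-identical across OTP versions, so a signature that must verify on other nodes needs a canonical encoding of the payload.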
### Retry Configuration
```erlang
%% Create a custom retry config
Config = esdb_gater_retry:new_config(
    100,   %% base_delay_ms
    5000,  %% max_delay_ms
    5      %% max_attempts
),
%% Execute with the custom retry config
esdb_gater_api:execute(my_store, Fun, Config).
```
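The README does not state the exact backoff formula or whether jitter is added. Assuming the common `min(MaxDelay, BaseDelay * 2^(N-1))` without jitter, `new_config(100, 5000, 5)` would wait 100, 200, 400 and 800 ms between its five attempts. The hypothetical `backoff_sketch` module below implements that assumption:

```erlang
-module(backoff_sketch).
-export([delay/3, run/4]).

%% Delay before retrying after failed attempt number Attempt (1-based):
%% BaseMs * 2^(Attempt - 1), capped at MaxMs.
delay(Attempt, BaseMs, MaxMs) when Attempt >= 1 ->
    min(MaxMs, BaseMs * (1 bsl (Attempt - 1))).

%% Call Fun() until it returns {ok, _} or MaxAttempts calls have failed.
run(Fun, BaseMs, MaxMs, MaxAttempts) ->
    run(Fun, BaseMs, MaxMs, MaxAttempts, 1).

run(Fun, BaseMs, MaxMs, MaxAttempts, Attempt) ->
    case Fun() of
        {ok, _} = Ok ->
            Ok;
        {error, Reason} when Attempt >= MaxAttempts ->
            {error, {retries_exhausted, Reason}};
        {error, _Reason} ->
            timer:sleep(delay(Attempt, BaseMs, MaxMs)),
            run(Fun, BaseMs, MaxMs, MaxAttempts, Attempt + 1)
    end.
```

Adding random jitter to each delay is a common refinement that keeps many clients from retrying in lockstep.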
## Channels

The gateway provides 10 dedicated PubSub channels:
| Channel | Priority | Rate Limit | HMAC | Purpose |
|---------|----------|------------|------|---------|
| `esdb_channel_alerts` | critical | unlimited | required | Critical system alerts |
| `esdb_channel_security` | critical | unlimited | required | Security events |
| `esdb_channel_events` | high | unlimited | optional | Business events |
| `esdb_channel_health` | high | 100/sec | optional | Health checks |
| `esdb_channel_system` | normal | unlimited | optional | System notifications |
| `esdb_channel_metrics` | normal | 10000/sec | optional | Performance metrics |
| `esdb_channel_audit` | normal | unlimited | optional | Audit trail |
| `esdb_channel_lifecycle` | normal | unlimited | optional | Lifecycle events |
| `esdb_channel_logging` | low | 1000/sec | optional | Log messages |
| `esdb_channel_diagnostics` | low | 100/sec | optional | Diagnostic info |
### Channel Priorities
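- **critical**: Immediate delivery, no rate limiting, HMAC signature required
- **high**: Priority delivery; rate limited only where the table lists a limit (e.g. `esdb_channel_health`)
- **normal**: Standard delivery; rate limited only where the table lists a limit (e.g. `esdb_channel_metrics`)
- **low**: Background delivery, rate limited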
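Limits such as `100/sec` can be enforced in several ways, and the algorithm the channels actually use is not specified here. A fixed one-second window counter is one simple option, sketched as a pure function (the module `rate_limit_sketch` and its state shape are hypothetical):

```erlang
-module(rate_limit_sketch).
-export([new/0, check/3]).

%% State is {WindowSecond, CountInWindow}; Limit is a positive integer
%% (messages per second) or the atom unlimited.
new() ->
    {undefined, 0}.

check(_Now, unlimited, State) ->
    {allow, State};
%% Same second, still under the limit: admit and count the message.
check(Now, Limit, {Now, Count}) when Count < Limit ->
    {allow, {Now, Count + 1}};
%% Same second, limit reached: a caller would report {error, rate_limited}.
check(Now, _Limit, {Now, Count}) ->
    {deny, {Now, Count}};
%% A new second opens a fresh window.
check(Now, _Limit, _OldWindow) ->
    {allow, {Now, 1}}.
```

Pass `erlang:system_time(second)` as `Now`. A fixed window can admit up to twice the limit across a window boundary; a token bucket smooths that out at the cost of a little more state.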
## Architecture
### Supervision Tree
```
erl_esdb_gater_sup (one_for_one)
├── esdb_gater_cluster_sup (rest_for_one)
│   ├── esdb_gater_worker_registry (pg-based)
│   └── esdb_gater_cluster_monitor
└── esdb_channel_sup (one_for_one)
    ├── esdb_channel_alerts
    ├── esdb_channel_security
    ├── esdb_channel_events
    ├── esdb_channel_health
    ├── esdb_channel_system
    ├── esdb_channel_metrics
    ├── esdb_channel_audit
    ├── esdb_channel_lifecycle
    ├── esdb_channel_logging
    └── esdb_channel_diagnostics
```
### Worker Registry Flow
```
Register Worker
    │
    ▼
esdb_gater_api:register_worker()
    │
    ▼
esdb_gater_worker_registry (gen_server)
    │
    ▼
pg:join() (broadcasts to all cluster nodes)
    │
    ▼
Process monitor attached

Execute Operation
    │
    ▼
esdb_gater_api:execute()
    │
    ▼
Get workers → Select (round-robin)
    │
    ▼
Call worker with timeout
    │
    ├──────────────────┐
    ▼                  ▼
 Success            Failure
    │                  │
    ▼                  ▼
Return result      Exponential backoff → Retry
```
### Channel Message Flow
```
Publisher
    │
    ▼
esdb_channel_server:publish()
    │
    ├─→ Rate limit check
    │
    ├─→ HMAC verification (if required)
    │
    ▼
pg:get_members() for topic
    │
    ▼
Broadcast to all subscribers
    │
    ▼
{channel_message, Channel, Topic, Message}
```
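The last two steps of this flow, looking up a topic's subscribers and sending to each, can be sketched with OTP `pg` as follows. The scope `channel_sketch_scope`, the `{Channel, Topic}` group key and the module are assumptions, and the rate-limit and HMAC checks are omitted; the returned count corresponds to what the `[esdb_gater, channel, broadcast]` telemetry event reports as `recipient_count`:

```erlang
-module(broadcast_sketch).
-export([start/0, subscribe/3, broadcast/3]).

%% Hypothetical scope name for channel subscriptions.
-define(SCOPE, channel_sketch_scope).

start() ->
    case pg:start_link(?SCOPE) of
        {ok, Pid} -> {ok, Pid};
        {error, {already_started, Pid}} -> {ok, Pid}
    end.

%% One pg group per {Channel, Topic}; Pid must be a local process.
subscribe(Channel, Topic, Pid) ->
    pg:join(?SCOPE, {Channel, Topic}, Pid).

%% Send the message to every subscriber in the cluster and return how many
%% subscribers it was sent to (delivery itself is asynchronous).
broadcast(Channel, Topic, Message) ->
    Members = pg:get_members(?SCOPE, {Channel, Topic}),
    lists:foreach(fun(Pid) -> Pid ! {channel_message, Channel, Topic, Message} end,
                  Members),
    length(Members).
```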
## Configuration
```erlang
%% sys.config
[{erl_esdb_gater, [
    %% Cluster configuration
    {cluster, [
        {port, 45893},
        {multicast_addr, {239, 255, 0, 2}}
    ]},
    %% Retry defaults
    {retry, [
        {base_delay_ms, 100},
        {max_delay_ms, 30000},
        {max_attempts, 10}
    ]},
    %% Channel configuration
    {channels, [
        {esdb_channel_events, [
            {priority, high}
        ]},
        {esdb_channel_metrics, [
            {max_rate, 10000}
        ]}
    ]},
    %% Security
    {security, [
        {hmac_secret, <<"your_secret_here">>},
        {message_ttl_seconds, 300}
    ]},
    %% Telemetry
    {telemetry_handlers, [logger]}
]}].
```
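Application code can read these settings with the standard `application:get_env/3`. The sketch below pulls the retry defaults and falls back to the values shown above when a key is missing; the module name is hypothetical, and whether erl-esdb-gater itself applies the same fallbacks is not stated here:

```erlang
-module(gater_config_sketch).
-export([retry_defaults/0]).

%% Returns {BaseDelayMs, MaxDelayMs, MaxAttempts} from the erl_esdb_gater
%% application environment, using the sys.config example values as fallbacks.
retry_defaults() ->
    Retry = application:get_env(erl_esdb_gater, retry, []),
    {proplists:get_value(base_delay_ms, Retry, 100),
     proplists:get_value(max_delay_ms, Retry, 30000),
     proplists:get_value(max_attempts, Retry, 10)}.
```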
## Telemetry Events
| Event | Measurements | Metadata |
|-------|--------------|----------|
| `[esdb_gater, worker, registered]` | system_time | store_id, node, pid |
| `[esdb_gater, worker, unregistered]` | system_time | store_id, pid |
| `[esdb_gater, worker, lookup]` | duration | store_id |
| `[esdb_gater, request, start]` | system_time | store_id, request_type |
| `[esdb_gater, request, stop]` | duration | store_id, request_type, result |
| `[esdb_gater, request, error]` | duration | store_id, request_type, reason |
| `[esdb_gater, retry, attempt]` | delay_ms, attempt | store_id, reason |
| `[esdb_gater, retry, exhausted]` | total_attempts | store_id, reason |
| `[esdb_gater, cluster, node, up]` | system_time | node, member_count |
| `[esdb_gater, cluster, node, down]` | system_time | node, member_count |
| `[esdb_gater, channel, broadcast]` | recipient_count | channel, topic |
### Attaching Handlers
```erlang
%% Attach the default logger handler
ok = esdb_gater_telemetry:attach_default_handler().

%% Attach a custom handler
Handler = fun(_Event, _Measurements, _Meta, _Config) ->
    %% Your custom handling
    ok
end,
ok = esdb_gater_telemetry:attach(my_handler, Handler, #{}).

%% Detach the handler
ok = esdb_gater_telemetry:detach(my_handler).
```
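A handler receives the event name, measurements and metadata listed in the Telemetry Events table. As a sketch of a four-argument handler, matching the shape of the `Handler` fun above, that pattern-matches the two retry events and logs them through OTP `logger` (the module name and log wording are hypothetical):

```erlang
-module(gater_telemetry_sketch).
-export([handle_event/4, describe/3]).

%% Handler entry point; ignores events that describe/3 does not cover.
handle_event(Event, Measurements, Meta, _Config) ->
    case describe(Event, Measurements, Meta) of
        ignore -> ok;
        Text -> logger:info("~ts", [Text])
    end.

%% Turn selected gateway events into a human-readable line.
describe([esdb_gater, retry, attempt], #{attempt := N, delay_ms := D},
         #{store_id := Store, reason := Reason}) ->
    io_lib:format("store ~p: retry ~p in ~p ms (~p)", [Store, N, D, Reason]);
describe([esdb_gater, retry, exhausted], #{total_attempts := N},
         #{store_id := Store, reason := Reason}) ->
    io_lib:format("store ~p: gave up after ~p attempts (~p)", [Store, N, Reason]);
describe(_Event, _Measurements, _Meta) ->
    ignore.
```

Pass `fun gater_telemetry_sketch:handle_event/4` as `Handler` to `esdb_gater_telemetry:attach/3`.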
## Building
```bash
rebar3 compile # Compile
rebar3 eunit # Unit tests (44 tests)
rebar3 ct # Integration tests (8 tests)
rebar3 dialyzer # Type checking
```
## Testing
Test counts:
- **Unit tests**: 44 tests (retry, security, telemetry)
- **Integration tests**: 8 tests (channel system)
- **End-to-end tests**: 24 tests (with erl-esdb, run from erl-esdb)
```bash
rebar3 eunit # All unit tests
rebar3 ct --suite=esdb_channel_SUITE # Channel tests
```
Run e2e tests from erl-esdb:
```bash
cd /path/to/erl-esdb
rebar3 ct --suite=test/e2e/erl_esdb_gater_e2e_SUITE
```
## Integration with erl-esdb
erl-esdb-gater is designed to work with [erl-esdb](https://github.com/macula-io/erl-esdb) to provide load-balanced, distributed access to event stores.
### Automatic Worker Registration
When both packages are deployed on the same nodes:
1. **erl-esdb** gateway workers automatically register with **erl-esdb-gater**
2. No manual registration is required
3. Worker cleanup is automatic when nodes leave or workers crash
```
┌─────────────────────────────────────────────────────────────────────┐
│  Client Applications                                                │
│                                                                     │
│  esdb_gater_api:call(my_store, {append, StreamId, -1, Events})      │
└───────────────────────────────┬─────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────────┐
│  erl-esdb-gater                                                     │
│                                                                     │
│  Worker Registry ──────► Load Balancer ──────► Retry Handler        │
│    (pg groups)           (round-robin)     (exponential backoff)    │
└───────────────────────────────┬─────────────────────────────────────┘
                                │
          ┌─────────────────────┼─────────────────────┐
          ▼                     ▼                     ▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│  erl-esdb Node 1  │ │  erl-esdb Node 2  │ │  erl-esdb Node 3  │
│  Gateway Worker   │ │  Gateway Worker   │ │  Gateway Worker   │
│   (registered)    │ │   (registered)    │ │   (registered)    │
└───────────────────┘ └───────────────────┘ └───────────────────┘
```
### Accessing the Event Store
Use the gateway API to access erl-esdb with automatic load balancing and retry:
```erlang
%% Stream operations
{ok, Result} = esdb_gater_api:call(my_store, {append, StreamId, ExpectedVersion, Events}).
{ok, Events} = esdb_gater_api:call(my_store, {read, StreamId, Start, Count, Direction}).
{ok, Version} = esdb_gater_api:call(my_store, {get_version, StreamId}).
{ok, Exists} = esdb_gater_api:call(my_store, {stream_exists, StreamId}).
%% Subscription operations
{ok, SubKey} = esdb_gater_api:call(my_store, {subscribe, Type, Selector, Name}).
{ok, SubKey} = esdb_gater_api:call(my_store, {subscribe, Type, Selector, Name, Opts}).
ok = esdb_gater_api:call(my_store, {unsubscribe, SubKey}).
{ok, Subs} = esdb_gater_api:call(my_store, list_subscriptions).
%% Snapshot operations
ok = esdb_gater_api:call(my_store, {write_snapshot, StreamId, Version, Data}).
ok = esdb_gater_api:call(my_store, {write_snapshot, StreamId, Version, Data, Metadata}).
{ok, Snap} = esdb_gater_api:call(my_store, {read_snapshot, StreamId}).
ok = esdb_gater_api:call(my_store, {delete_snapshot, StreamId, Version}).
%% Aggregation
{ok, Acc} = esdb_gater_api:call(my_store, {aggregator_foldl, StreamId, Fun, InitAcc}).
%% Health check
pong = esdb_gater_api:call(my_store, ping).
```
### Deployment
erl-esdb includes erl-esdb-gater as a dependency. Starting erl-esdb automatically starts the gateway:
```erlang
%% Start erl-esdb (includes gater)
application:ensure_all_started(erl_esdb).
%% Gateway workers auto-register with the pg-based registry
%% Use the gater API for all operations
{ok, Version} = esdb_gater_api:call(my_store, {append, StreamId, -1, Events}).
```
In a multi-node cluster, each node runs erl-esdb with its gateway worker. The pg-based registry provides:
- Cluster-wide worker discovery via `pg:get_members/2`
- Eventual consistency (a newly registered worker becomes visible on every node once pg has propagated the membership change)
- Automatic cleanup on node failure (pg membership)
- Load balancing with round-robin selection
- Exponential backoff retry on failures
## Related Projects
- [erl-esdb](https://github.com/macula-io/erl-esdb) - Core event store
- [ex-esdb](https://github.com/beam-campus/ex-esdb) - Original Elixir implementation
## License
Apache-2.0