# erl-esdb-gater
Gateway for distributed access to erl-esdb event stores.

## Overview
erl-esdb-gater is an Erlang gateway service providing:
- **Distributed Worker Registry**: Ra-based registry for worker discovery across nodes
- **Load Balancing**: Round-robin with exponential backoff retry
- **PubSub Channels**: 10 dedicated channels with priority-based delivery
- **HMAC Security**: Message signing for critical channels
- **Telemetry**: BEAM telemetry with optional OpenTelemetry exporters
## Installation
Add to your `rebar.config`:
```erlang
{deps, [
    {erl_esdb_gater, {git, "https://github.com/macula-io/erl-esdb-gater.git", {branch, "main"}}}
]}.
```
## Quick Start
```erlang
%% Start the application and its dependencies
{ok, _Started} = application:ensure_all_started(erl_esdb_gater).

%% Register a worker for a store (self() stands in for the pid of a
%% process that handles the calls, typically a gen_server)
ok = esdb_gater_api:register_worker(my_store, self()).

%% Execute an operation with automatic retry and load balancing
%% (StreamId and Events must already be bound)
{ok, Result} = esdb_gater_api:execute(my_store, fun(Worker) ->
    gen_server:call(Worker, {append, StreamId, Events})
end).

%% Subscribe to events channel
Topic = <<"events.user.created">>,
ok = esdb_channel_server:subscribe(esdb_channel_events, Topic, self()).

%% Receive events (handle_event/1 is your own callback)
receive
    {channel_message, esdb_channel_events, Topic, Event} ->
        handle_event(Event)
end.

%% Publish to alerts channel (requires HMAC)
SignedMsg = esdb_pubsub_security:sign(#{type => alert, level => critical}),
ok = esdb_channel_server:publish(esdb_channel_alerts, <<"alerts.critical">>, SignedMsg).
```
## API Reference
### Worker Registry
```erlang
%% Register a worker for a store
esdb_gater_api:register_worker(StoreId, Pid) -> ok | {error, term()}.
%% Unregister a worker
esdb_gater_api:unregister_worker(StoreId, Pid) -> ok | {error, term()}.
%% Get all workers for a store
esdb_gater_api:get_workers(StoreId) -> {ok, [{Node, Pid}]} | {error, term()}.
%% Get local node workers only
esdb_gater_api:get_local_workers(StoreId) -> [Pid].
```
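
Any process that answers the calls made inside `execute/2` can act as a worker. As a rough illustration, the sketch below is a minimal `gen_server` that handles the `{append, StreamId, Events}` call used in the Quick Start. The module name `store_worker_sketch`, the in-memory map, and the `{ok, Length}` reply are assumptions made for this example; a real worker would delegate to an erl-esdb store.

```erlang
-module(store_worker_sketch).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Hypothetical worker: keeps appended events in memory only.
start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% State maps each stream id to its events, oldest first.
init([]) ->
    {ok, #{}}.

%% Append events to a stream and reply with the new stream length.
handle_call({append, StreamId, Events}, _From, Streams) when is_list(Events) ->
    Updated = maps:get(StreamId, Streams, []) ++ Events,
    {reply, {ok, length(Updated)}, Streams#{StreamId => Updated}};
handle_call(_Other, _From, Streams) ->
    {reply, {error, unsupported}, Streams}.

handle_cast(_Msg, Streams) ->
    {noreply, Streams}.

handle_info(_Info, Streams) ->
    {noreply, Streams}.
```

After `{ok, Pid} = store_worker_sketch:start_link()`, passing `Pid` to `esdb_gater_api:register_worker(my_store, Pid)` would make it a candidate for `execute/2`.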
### Operations
```erlang
%% Execute operation with load balancing and retry
esdb_gater_api:execute(StoreId, Fun) -> {ok, Result} | {error, Reason}.
esdb_gater_api:execute(StoreId, Fun, RetryConfig) -> {ok, Result} | {error, Reason}.
%% Fire and forget (no retry)
esdb_gater_api:cast(StoreId, Fun) -> ok | {error, no_workers}.
```
### Channels
```erlang
%% Subscribe to a topic
esdb_channel_server:subscribe(ChannelName, Topic, Pid) -> ok.
%% Unsubscribe from a topic
esdb_channel_server:unsubscribe(ChannelName, Topic, Pid) -> ok.
%% Publish a message
esdb_channel_server:publish(ChannelName, Topic, Message) ->
    ok | {error, rate_limited | signature_required | invalid_signature}.
```
### Security
```erlang
%% Sign a message with default secret
esdb_pubsub_security:sign(Message) -> SignedMessage.
%% Sign with custom secret
esdb_pubsub_security:sign(Message, Secret) -> SignedMessage.
%% Verify a signed message
esdb_pubsub_security:verify(SignedMessage) -> ok | {error, Reason}.
%% Set the default secret
esdb_pubsub_security:set_secret(Secret) -> ok.
```
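
The wire format of a signed message is not documented here. To show what HMAC signing with a time-to-live involves, the sketch below uses OTP's `crypto:mac/4` with SHA-256. The module name `hmac_sketch`, the envelope keys (`payload`, `timestamp`, `signature`), and the `{error, expired}` result are assumptions for illustration, not the `esdb_pubsub_security` format.

```erlang
-module(hmac_sketch).
-export([sign/2, verify/3]).

%% Wrap a term in a signed envelope. The MAC covers the serialized
%% payload and the signing time, so neither can be altered unnoticed.
sign(Message, Secret) when is_binary(Secret) ->
    Payload = term_to_binary(Message),
    Timestamp = erlang:system_time(second),
    #{payload => Payload,
      timestamp => Timestamp,
      signature => mac(Payload, Timestamp, Secret)}.

%% Check the signature first, then the age of the message.
verify(#{payload := Payload, timestamp := Ts, signature := Sig}, Secret, TtlSeconds) ->
    Expected = mac(Payload, Ts, Secret),
    Age = erlang:system_time(second) - Ts,
    case {constant_time_equal(Expected, Sig), Age =< TtlSeconds} of
        {false, _} -> {error, invalid_signature};
        {true, false} -> {error, expired};
        {true, true} -> {ok, binary_to_term(Payload, [safe])}
    end;
verify(_Unsigned, _Secret, _TtlSeconds) ->
    {error, signature_required}.

mac(Payload, Timestamp, Secret) ->
    crypto:mac(hmac, sha256, Secret, <<Timestamp:64, Payload/binary>>).

%% Compare two binaries without returning early on the first mismatch.
constant_time_equal(A, B) when byte_size(A) =:= byte_size(B) ->
    Pairs = lists:zip(binary_to_list(A), binary_to_list(B)),
    0 =:= lists:foldl(fun({X, Y}, Acc) -> Acc bor (X bxor Y) end, 0, Pairs);
constant_time_equal(_A, _B) ->
    false.
```

Signing the serialized bytes rather than the term itself means the verifier never has to re-serialize a map and hope for the same byte order.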
### Retry Configuration
```erlang
%% Create custom retry config
Config = esdb_gater_retry:new_config(
    100,   %% base_delay_ms
    5000,  %% max_delay_ms
    5      %% max_attempts
),

%% Execute with custom retry (Fun is the operation fun, as in execute/2)
esdb_gater_api:execute(my_store, Fun, Config).
```
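
To make the three retry parameters concrete, here is a minimal sketch of an exponential backoff schedule. It assumes the delay doubles on each retry and is capped at `max_delay_ms`; the exact formula used by `esdb_gater_retry` (and whether it adds jitter) is not stated here, and the module name `backoff_sketch` is hypothetical.

```erlang
-module(backoff_sketch).
-export([delay/3, schedule/3]).

%% Delay in milliseconds before retry number Attempt (1-based):
%% BaseMs * 2^(Attempt - 1), capped at MaxMs.
-spec delay(pos_integer(), pos_integer(), pos_integer()) -> pos_integer().
delay(Attempt, BaseMs, MaxMs) when Attempt >= 1 ->
    min(MaxMs, BaseMs bsl (Attempt - 1)).

%% Delays between consecutive attempts. MaxAttempts attempts leave
%% room for MaxAttempts - 1 waits.
-spec schedule(pos_integer(), pos_integer(), pos_integer()) -> [pos_integer()].
schedule(BaseMs, MaxMs, MaxAttempts) when MaxAttempts >= 1 ->
    [delay(N, BaseMs, MaxMs) || N <- lists:seq(1, MaxAttempts - 1)].
```

Under these assumptions, `backoff_sketch:schedule(100, 5000, 5)` yields `[100, 200, 400, 800]`, and the cap only takes effect from the seventh retry onward.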
## Channels

The gateway provides 10 dedicated PubSub channels:

| Channel | Priority | Rate Limit | HMAC | Purpose |
|---------|----------|------------|------|---------|
| `esdb_channel_alerts` | critical | unlimited | required | Critical system alerts |
| `esdb_channel_security` | critical | unlimited | required | Security events |
| `esdb_channel_events` | high | unlimited | optional | Business events |
| `esdb_channel_health` | high | 100/sec | optional | Health checks |
| `esdb_channel_system` | normal | unlimited | optional | System notifications |
| `esdb_channel_metrics` | normal | 10000/sec | optional | Performance metrics |
| `esdb_channel_audit` | normal | unlimited | optional | Audit trail |
| `esdb_channel_lifecycle` | normal | unlimited | optional | Lifecycle events |
| `esdb_channel_logging` | low | 1000/sec | optional | Log messages |
| `esdb_channel_diagnostics` | low | 100/sec | optional | Diagnostic info |
### Channel Priorities
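- **critical**: Immediate delivery, no rate limiting, HMAC required
- **high**: Priority delivery; rate limits are set per channel (`esdb_channel_events` is unlimited, `esdb_channel_health` is capped at 100/sec)
- **normal**: Standard delivery; only `esdb_channel_metrics` is rate limited (10000/sec)
- **low**: Background delivery, rate limited (100 to 1000/sec)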
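
The rate-limiting algorithm behind the per-second limits in the table is not specified. One simple way to enforce such a limit is a fixed one-second window counter, sketched below as a pure function over an explicit state. The module name `rate_limit_sketch` and the state layout are assumptions for illustration only.

```erlang
-module(rate_limit_sketch).
-export([new/1, check/2]).

%% Create limiter state for a channel: 'unlimited' or a per-second cap.
new(unlimited) ->
    unlimited;
new(MaxPerSec) when is_integer(MaxPerSec), MaxPerSec > 0 ->
    #{max => MaxPerSec, window => 0, count => 0}.

%% Decide whether a message arriving at NowMs (milliseconds) may pass.
%% Returns the verdict together with the limiter state to use next.
check(unlimited, _NowMs) ->
    {ok, unlimited};
check(#{max := Max, window := Window, count := Count} = State, NowMs) ->
    case NowMs div 1000 of
        Window when Count < Max ->
            %% Same second, still under the cap.
            {ok, State#{count := Count + 1}};
        Window ->
            %% Same second, cap reached.
            {{error, rate_limited}, State};
        NewWindow ->
            %% A new second has started: reset the counter.
            {ok, State#{window := NewWindow, count := 1}}
    end.
```

A fixed window allows short bursts at window boundaries; a token bucket would smooth those out at the cost of slightly more state.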
## Architecture
### Supervision Tree
```
erl_esdb_gater_sup (one_for_one)
├── esdb_gater_cluster_sup (rest_for_one)
│   ├── esdb_gater_worker_registry (Ra-based)
│   └── esdb_gater_cluster_monitor
├── esdb_channel_sup (one_for_one)
│   ├── esdb_channel_alerts
│   ├── esdb_channel_security
│   ├── esdb_channel_events
│   ├── esdb_channel_health
│   ├── esdb_channel_system
│   ├── esdb_channel_metrics
│   ├── esdb_channel_audit
│   ├── esdb_channel_lifecycle
│   ├── esdb_channel_logging
│   └── esdb_channel_diagnostics
└── esdb_gater_api (worker)
```
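
The `rest_for_one` strategy on the cluster supervisor means that a registry crash also restarts the cluster monitor started after it, while a monitor crash leaves the registry untouched. A minimal sketch of such a supervisor follows; the module name `cluster_sup_sketch`, the restart intensity, and the assumption that each child exports `start_link/0` are illustrative and not taken from the project source.

```erlang
-module(cluster_sup_sketch).
-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Children are started in list order. With rest_for_one, a crashed
%% child and every child listed after it are restarted.
init([]) ->
    SupFlags = #{strategy => rest_for_one,
                 intensity => 5,   %% assumed: at most 5 restarts ...
                 period => 10},    %% ... within 10 seconds
    Children = [
        worker(esdb_gater_worker_registry),
        worker(esdb_gater_cluster_monitor)
    ],
    {ok, {SupFlags, Children}}.

%% Child spec for a permanent worker, assuming Module:start_link/0 exists.
worker(Module) ->
    #{id => Module,
      start => {Module, start_link, []},
      restart => permanent,
      type => worker}.
```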
### Worker Registry Flow
```
Register Worker
      │
      ▼
esdb_gater_api:register_worker()
      │
      ▼
esdb_gater_worker_registry (gen_server)
      │
      ▼
Ra consensus (writes to distributed state)
      │
      ▼
Process monitor attached

Execute Operation
      │
      ▼
esdb_gater_api:execute()
      │
      ▼
Get workers → Select (round-robin)
      │
      ▼
Call worker with timeout
      │                │
      ▼                ▼
   Success          Failure
      │                │
      ▼                ▼
Return result    Exponential backoff → Retry
```
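
The execute path can be sketched as a small loop: pick a worker round-robin, run the operation, and move on to the next worker on failure. This is an illustration under stated assumptions, not the gateway's code: the module name `round_robin_sketch` is hypothetical, the operation is assumed to signal failure by returning `{error, Reason}` or raising, and the backoff sleep between attempts is left out to keep the example short.

```erlang
-module(round_robin_sketch).
-export([pick/2, execute/3]).

%% Pick the worker for request number N (0-based), wrapping around.
pick([], _N) ->
    {error, no_workers};
pick(Workers, N) when N >= 0 ->
    {ok, lists:nth((N rem length(Workers)) + 1, Workers)}.

%% Run Fun against successive workers until one succeeds or
%% MaxAttempts attempts have been made.
execute(Workers, Fun, MaxAttempts) ->
    execute(Workers, Fun, 0, MaxAttempts, no_workers).

execute(_Workers, _Fun, N, Max, LastReason) when N >= Max ->
    {error, LastReason};
execute(Workers, Fun, N, Max, _LastReason) ->
    case pick(Workers, N) of
        {error, Reason} ->
            {error, Reason};
        {ok, Worker} ->
            try Fun(Worker) of
                {error, Reason} ->
                    execute(Workers, Fun, N + 1, Max, Reason);
                {ok, _} = Ok ->
                    Ok;
                Other ->
                    {ok, Other}
            catch
                Class:Reason ->
                    execute(Workers, Fun, N + 1, Max, {Class, Reason})
            end
    end.
```

A production loop would sleep between attempts (for example with the exponential schedule configured under Retry Configuration) and refresh the worker list, since workers may register or die between retries.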
### Channel Message Flow
```
Publisher
    │
    ▼
esdb_channel_server:publish()
    │
    ├─→ Rate limit check
    │
    ├─→ HMAC verification (if required)
    │
    ▼
pg:get_members() for topic
    │
    ▼
Broadcast to all subscribers
    │
    ▼
{channel_message, Channel, Topic, Message}
```
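
The last three steps of this flow map directly onto OTP's `pg` process groups. The sketch below shows the idea with the default `pg` scope; the module name `pg_broadcast_sketch` and the use of `{Channel, Topic}` as the group name are assumptions, and the rate-limit and HMAC checks are omitted.

```erlang
-module(pg_broadcast_sketch).
-export([subscribe/3, publish/3, demo/0]).

%% Join the process group that represents this channel/topic pair.
subscribe(Channel, Topic, Pid) ->
    pg:join({Channel, Topic}, Pid).

%% Send the message to every current group member and return how
%% many recipients there were.
publish(Channel, Topic, Message) ->
    Members = pg:get_members({Channel, Topic}),
    [Pid ! {channel_message, Channel, Topic, Message} || Pid <- Members],
    length(Members).

%% Subscribe the calling process, publish once, and read the message back.
demo() ->
    %% Start the default pg scope unless it is already running.
    case pg:start(pg) of
        {ok, _} -> ok;
        {error, {already_started, _}} -> ok
    end,
    Channel = esdb_channel_events,
    Topic = <<"events.user.created">>,
    ok = subscribe(Channel, Topic, self()),
    Count = publish(Channel, Topic, hello),
    Received =
        receive
            {channel_message, Channel, Topic, Msg} -> Msg
        after 1000 ->
            timeout
        end,
    ok = pg:leave({Channel, Topic}, self()),
    {Count, Received}.
```

Because `pg` removes members automatically when they exit, subscribers that crash do not need explicit cleanup.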
## Configuration
```erlang
%% sys.config
[{erl_esdb_gater, [
    %% Cluster configuration
    {cluster, [
        {ra_cluster_name, esdb_gater_registry}
    ]},

    %% Retry defaults
    {retry, [
        {base_delay_ms, 100},
        {max_delay_ms, 5000},
        {max_attempts, 5}
    ]},

    %% Channel configuration
    {channels, [
        {esdb_channel_events, [
            {priority, high}
        ]},
        {esdb_channel_metrics, [
            {max_rate, 10000}
        ]}
    ]},

    %% Security
    {security, [
        {hmac_secret, <<"your_secret_here">>},
        {message_ttl_seconds, 300}
    ]},

    %% Telemetry
    {telemetry_handlers, [logger]}
]}].
```
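
Values in this file are ordinary OTP application environment entries, so they can be inspected with `application:get_env/3`. The helper below is a small sketch for reading one retry setting with a fallback; the module name `config_sketch` is hypothetical, and how the gateway itself reads these keys is not shown here.

```erlang
-module(config_sketch).
-export([retry_setting/2]).

%% Look up one key from the {retry, [...]} section of the
%% erl_esdb_gater environment, falling back to Default when the
%% section or the key is missing.
-spec retry_setting(atom(), term()) -> term().
retry_setting(Key, Default) ->
    Retry = application:get_env(erl_esdb_gater, retry, []),
    proplists:get_value(Key, Retry, Default).
```

For example, with the configuration above loaded, `config_sketch:retry_setting(max_attempts, 3)` returns `5`.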
## Telemetry Events
| Event | Measurements | Metadata |
|-------|--------------|----------|
| `[esdb_gater, worker, registered]` | system_time | store_id, node, pid |
| `[esdb_gater, worker, unregistered]` | system_time | store_id, pid |
| `[esdb_gater, worker, lookup]` | duration | store_id |
| `[esdb_gater, request, start]` | system_time | store_id, request_type |
| `[esdb_gater, request, stop]` | duration | store_id, request_type, result |
| `[esdb_gater, request, error]` | duration | store_id, request_type, reason |
| `[esdb_gater, retry, attempt]` | delay_ms, attempt | store_id, reason |
| `[esdb_gater, retry, exhausted]` | total_attempts | store_id, reason |
| `[esdb_gater, cluster, node, up]` | system_time | node, member_count |
| `[esdb_gater, cluster, node, down]` | system_time | node, member_count |
| `[esdb_gater, channel, broadcast]` | recipient_count | channel, topic |
### Attaching Handlers
```erlang
%% Attach default logger handler
ok = esdb_gater_telemetry:attach_default_handler().

%% Attach custom handler
Handler = fun(_Event, _Measurements, _Meta, _Config) ->
    %% Your custom handling
    ok
end,
ok = esdb_gater_telemetry:attach(my_handler, Handler, #{}).

%% Detach handler
ok = esdb_gater_telemetry:detach(my_handler).
```
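
A custom handler usually pattern-matches on the event name and pulls the fields it needs from the measurements and metadata maps. The sketch below does this for two events from the table above. The module name `telemetry_handler_sketch` and the log wording are assumptions; durations are printed as raw values because their unit is not stated here.

```erlang
-module(telemetry_handler_sketch).
-export([handle_event/4, describe/3]).

%% Four-argument handler in the shape expected by telemetry:
%% format the event and hand it to OTP's logger.
handle_event(Event, Measurements, Meta, _Config) ->
    logger:notice("~ts", [describe(Event, Measurements, Meta)]).

%% Turn selected events into human-readable text.
describe([esdb_gater, retry, exhausted],
         #{total_attempts := Attempts}, #{store_id := Store}) ->
    io_lib:format("store ~p: retries exhausted after ~p attempts",
                  [Store, Attempts]);
describe([esdb_gater, request, stop],
         #{duration := Duration},
         #{store_id := Store, request_type := Type, result := Result}) ->
    io_lib:format("store ~p: ~p finished with ~p (duration ~p)",
                  [Store, Type, Result, Duration]);
describe(Event, _Measurements, _Meta) ->
    io_lib:format("unhandled event ~p", [Event]).
```

It can be passed as `fun telemetry_handler_sketch:handle_event/4` in place of the anonymous `Handler` fun; a named external fun also survives code reloads, which anonymous funs do not.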
## Building
```bash
rebar3 compile # Compile
rebar3 eunit # Unit tests (44 tests)
rebar3 ct # Integration tests (8 tests)
rebar3 dialyzer # Type checking
```
## Testing
Test counts:
- **Unit tests**: 44 tests (retry, security, telemetry)
- **Integration tests**: 8 tests (channel system)
```bash
rebar3 eunit # All unit tests
rebar3 ct --suite=esdb_channel_SUITE # Channel tests
```
## Related Projects
- [erl-esdb](https://github.com/macula-io/erl-esdb) - Core event store
- [ex-esdb](https://github.com/beam-campus/ex-esdb) - Original Elixir implementation
## License
Apache-2.0