# erl-esdb-gater

Gateway for distributed access to erl-esdb event stores.

![Gateway Architecture](assets/gateway_architecture.svg)

## Overview

erl-esdb-gater is an Erlang gateway service providing:
- **Distributed Worker Registry**: Ra-based registry for worker discovery across nodes
- **Load Balancing**: Round-robin with exponential backoff retry
- **PubSub Channels**: 10 dedicated channels with priority-based delivery
- **HMAC Security**: Message signing for critical channels
- **Telemetry**: BEAM telemetry with optional OpenTelemetry exporters

## Installation

Add to your `rebar.config`:

```erlang
{deps, [
    {erl_esdb_gater, {git, "https://github.com/macula-io/erl-esdb-gater.git", {branch, "main"}}}
]}.
```

## Quick Start

```erlang
%% Start the application
{ok, _Started} = application:ensure_all_started(erl_esdb_gater).

%% Register a worker for a store. WorkerPid must be a separate process
%% (for example, your own gen_server) that handles {append, StreamId, Events};
%% the calling process cannot serve its own gen_server:call/2.
ok = esdb_gater_api:register_worker(my_store, WorkerPid).

%% Execute an operation with automatic retry and load balancing
StreamId = <<"user-123">>,
Events = [#{type => user_created}],
{ok, Result} = esdb_gater_api:execute(my_store, fun(Worker) ->
    gen_server:call(Worker, {append, StreamId, Events})
end).

%% Subscribe to the events channel
Topic = <<"events.user.created">>,
ok = esdb_channel_server:subscribe(esdb_channel_events, Topic, self()).

%% Receive events (give up after 5 seconds instead of blocking forever)
receive
    {channel_message, esdb_channel_events, Topic, Event} ->
        io:format("received ~p~n", [Event])
after 5000 ->
    timeout
end.

%% Publish to the alerts channel (requires HMAC)
SignedMsg = esdb_pubsub_security:sign(#{type => alert, level => critical}),
ok = esdb_channel_server:publish(esdb_channel_alerts, <<"alerts.critical">>, SignedMsg).
```

## API Reference

### Worker Registry

```erlang
%% Register a worker for a store
esdb_gater_api:register_worker(StoreId, Pid) -> ok | {error, term()}.

%% Unregister a worker
esdb_gater_api:unregister_worker(StoreId, Pid) -> ok | {error, term()}.

%% Get all workers for a store
esdb_gater_api:get_workers(StoreId) -> {ok, [{Node, Pid}]} | {error, term()}.

%% Get local node workers only
esdb_gater_api:get_local_workers(StoreId) -> [Pid].
```
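Registered workers are plain pids, so a worker can be any process that answers the calls your `execute/2` funs make. A minimal sketch, assuming a hypothetical `my_store_worker` module that handles the `{append, StreamId, Events}` call from Quick Start by keeping events in memory (a real worker would forward them to erl-esdb):

```erlang
-module(my_store_worker).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Hypothetical worker: keeps appended events in a map keyed by stream id.
start_link() ->
    gen_server:start_link(?MODULE, #{}, []).

init(Streams) ->
    {ok, Streams}.

%% Append Events to StreamId and reply with the new stream length.
handle_call({append, StreamId, Events}, _From, Streams) ->
    Updated = maps:get(StreamId, Streams, []) ++ Events,
    {reply, {ok, length(Updated)}, Streams#{StreamId => Updated}};
handle_call(_Other, _From, Streams) ->
    {reply, {error, unknown_call}, Streams}.

handle_cast(_Msg, Streams) ->
    {noreply, Streams}.
```

To use it, start one and register its pid: `{ok, Pid} = my_store_worker:start_link(), ok = esdb_gater_api:register_worker(my_store, Pid).`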

### Operations

```erlang
%% Execute operation with load balancing and retry
esdb_gater_api:execute(StoreId, Fun) -> {ok, Result} | {error, Reason}.
esdb_gater_api:execute(StoreId, Fun, RetryConfig) -> {ok, Result} | {error, Reason}.

%% Fire and forget (no retry)
esdb_gater_api:cast(StoreId, Fun) -> ok | {error, no_workers}.
```

### Channels

```erlang
%% Subscribe to a topic
esdb_channel_server:subscribe(ChannelName, Topic, Pid) -> ok.

%% Unsubscribe from a topic
esdb_channel_server:unsubscribe(ChannelName, Topic, Pid) -> ok.

%% Publish a message
esdb_channel_server:publish(ChannelName, Topic, Message) ->
    ok | {error, rate_limited | signature_required | invalid_signature}.
```
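A long-lived subscriber is usually a dedicated process with a receive loop. A sketch under these assumptions: the `my_subscriber` module and its handler-fun interface are hypothetical; only `subscribe/3`, `unsubscribe/3` and the `{channel_message, Channel, Topic, Message}` shape come from the API above.

```erlang
-module(my_subscriber).

-export([start/3, loop/2]).

%% Spawn a process that subscribes to Topic on Channel, passes every
%% message to HandlerFun(Topic, Message), and unsubscribes when told to stop.
start(Channel, Topic, HandlerFun) ->
    spawn(fun() ->
        ok = esdb_channel_server:subscribe(Channel, Topic, self()),
        loop(Channel, HandlerFun),
        esdb_channel_server:unsubscribe(Channel, Topic, self())
    end).

%% Receive loop: handle messages from our channel until 'stop' arrives.
loop(Channel, HandlerFun) ->
    receive
        {channel_message, Channel, Topic, Message} ->
            HandlerFun(Topic, Message),
            loop(Channel, HandlerFun);
        stop ->
            ok
    end.
```

For example, `Pid = my_subscriber:start(esdb_channel_events, <<"events.user.created">>, fun(T, E) -> io:format("~s: ~p~n", [T, E]) end)`, and later `Pid ! stop`.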

### Security

```erlang
%% Sign a message with the default secret
esdb_pubsub_security:sign(Message) -> SignedMessage.

%% Sign with a custom secret
esdb_pubsub_security:sign(Message, Secret) -> SignedMessage.

%% Verify a signed message
esdb_pubsub_security:verify(SignedMessage) -> ok | {error, Reason}.

%% Set the default secret
esdb_pubsub_security:set_secret(Secret) -> ok.
```
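The format of `SignedMessage` is not documented here. To show what HMAC signing buys you, here is a standalone sketch using OTP's `crypto:mac/4` with HMAC-SHA256. The `hmac_sketch` module, the `payload`/`signature` map keys and the encoding are assumptions and do not describe `esdb_pubsub_security`'s wire format; the error atoms mirror the ones `publish/3` documents.

```erlang
-module(hmac_sketch).

-export([sign/2, verify/2]).

%% Wrap Payload with an HMAC-SHA256 tag over its deterministic external
%% term encoding (term_to_binary/2 with [deterministic], OTP 24.1+).
sign(Payload, Secret) ->
    #{payload => Payload, signature => mac(Payload, Secret)}.

%% Recompute the tag and compare in constant time. crypto:hash_equals/2
%% (OTP 25+) requires equal-size binaries, hence the size check first.
verify(#{payload := Payload, signature := Signature}, Secret)
  when is_binary(Signature) ->
    Expected = mac(Payload, Secret),
    case byte_size(Signature) =:= byte_size(Expected)
         andalso crypto:hash_equals(Expected, Signature) of
        true -> ok;
        false -> {error, invalid_signature}
    end;
verify(_Unsigned, _Secret) ->
    {error, signature_required}.

mac(Payload, Secret) ->
    crypto:mac(hmac, sha256, Secret, term_to_binary(Payload, [deterministic])).
```

Any change to the payload or a different secret produces a different tag, which is why receivers can reject tampered or forged messages on the critical channels.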

### Retry Configuration

```erlang
%% Create custom retry config
Config = esdb_gater_retry:new_config(
    100,     %% base_delay_ms
    5000,    %% max_delay_ms
    5        %% max_attempts
),

%% Execute with custom retry; Fun is a fun(Worker) -> Result, as in Quick Start
esdb_gater_api:execute(my_store, Fun, Config).
```
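The exact backoff formula is not spelled out here. Assuming the common doubling scheme, retry *n* waits `min(BaseDelay * 2^(n-1), MaxDelay)` ms, so a base of 100 and a maximum of 5000 would give 100, 200, 400, 800, 1600 and 3200 ms and then stay at 5000 ms. The real implementation may differ, for example by adding jitter. A sketch with a hypothetical module name:

```erlang
-module(backoff_sketch).

-export([delay/3, delays/3]).

%% Delay in ms before retry number Retry (1-based): doubles each time,
%% capped at MaxDelay. Assumes integer delays.
delay(Retry, BaseDelay, MaxDelay) when Retry >= 1 ->
    min(BaseDelay bsl (Retry - 1), MaxDelay).

%% Delays for retries 1..Retries.
delays(Retries, BaseDelay, MaxDelay) ->
    [delay(N, BaseDelay, MaxDelay) || N <- lists:seq(1, Retries)].
```

Whether `max_attempts` counts the first call or only the retries determines how many of these delays are actually used.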

## Channels

![PubSub Channels](assets/channels.svg)

The gateway provides 10 dedicated PubSub channels:

| Channel | Priority | Rate Limit | HMAC | Purpose |
|---------|----------|------------|------|---------|
| `esdb_channel_alerts` | critical | unlimited | required | Critical system alerts |
| `esdb_channel_security` | critical | unlimited | required | Security events |
| `esdb_channel_events` | high | unlimited | optional | Business events |
| `esdb_channel_health` | high | 100/sec | optional | Health checks |
| `esdb_channel_system` | normal | unlimited | optional | System notifications |
| `esdb_channel_metrics` | normal | 10000/sec | optional | Performance metrics |
| `esdb_channel_audit` | normal | unlimited | optional | Audit trail |
| `esdb_channel_lifecycle` | normal | unlimited | optional | Lifecycle events |
| `esdb_channel_logging` | low | 1000/sec | optional | Log messages |
| `esdb_channel_diagnostics` | low | 100/sec | optional | Diagnostic info |
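
The table gives limits such as 100/sec without describing how they are enforced. One simple scheme is a fixed one-second window counter. The sketch below (hypothetical `rate_window` module, pure functions, caller supplies the clock in milliseconds) illustrates the idea and is not necessarily what `esdb_channel_server` does:

```erlang
-module(rate_window).

-export([new/1, check/2]).

%% State: limit per second, current window (whole seconds), count in window.
new(MaxPerSec) when is_integer(MaxPerSec), MaxPerSec > 0 ->
    #{limit => MaxPerSec, window => undefined, count => 0}.

%% check(NowMs, State) -> {ok, State1} | {error, rate_limited, State}.
check(NowMs, #{limit := Limit, window := Window, count := Count} = State) ->
    Current = NowMs div 1000,
    case Current =:= Window of
        false ->
            %% New window: reset the counter and accept this message.
            {ok, State#{window := Current, count := 1}};
        true when Count < Limit ->
            {ok, State#{count := Count + 1}};
        true ->
            {error, rate_limited, State}
    end.
```

In a server process, `NowMs` could come from `erlang:system_time(millisecond)`. A fixed window is easy to reason about but can admit up to twice the limit across a window boundary; a token bucket smooths that out.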

### Channel Priorities

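- **critical**: Immediate delivery, no rate limiting, HMAC required
- **high**: Priority delivery
- **normal**: Standard delivery
- **low**: Background delivery

Rate limits are set per channel rather than per priority level (for example, `esdb_channel_health` is high priority but capped at 100/sec); see the table above.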

## Architecture

### Supervision Tree

```
erl_esdb_gater_sup (one_for_one)
├── esdb_gater_cluster_sup (rest_for_one)
│   ├── esdb_gater_worker_registry (Ra-based)
│   └── esdb_gater_cluster_monitor
├── esdb_channel_sup (one_for_one)
│   ├── esdb_channel_alerts
│   ├── esdb_channel_security
│   ├── esdb_channel_events
│   ├── esdb_channel_health
│   ├── esdb_channel_system
│   ├── esdb_channel_metrics
│   ├── esdb_channel_audit
│   ├── esdb_channel_lifecycle
│   ├── esdb_channel_logging
│   └── esdb_channel_diagnostics
└── esdb_gater_api (worker)
```

### Worker Registry Flow

```
Register Worker
    │
    ▼
esdb_gater_api:register_worker()
    │
    ▼
esdb_gater_worker_registry (gen_server)
    │
    ▼
Ra consensus (writes to distributed state)
    │
    ▼
Process monitor attached

Execute Operation
    │
    ▼
esdb_gater_api:execute()
    │
    ▼
Get workers → Select (round-robin)
    │
    ▼
Call worker with timeout
    │                     │
    ▼                     ▼
Success              Failure
    │                     │
    ▼                     ▼
Return result     Exponential backoff → Retry
```
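The execute path in the diagram can be sketched without the gateway: pick workers round-robin from a shared counter, run the fun, and on failure sleep with a doubling delay before trying the next worker. The `execute_sketch` module, the use of `atomics` for the counter, and moving to the next worker on each retry are assumptions; the real `esdb_gater_api:execute/3` also emits telemetry and may select workers differently.

```erlang
-module(execute_sketch).

-export([execute/6]).

%% execute(Counter, Workers, Fun, BaseDelay, MaxDelay, MaxAttempts)
%%   -> {ok, Result} | {error, Reason}
%% Counter: atomics:new(1, [{signed, false}]), shared across requests.
%% Workers: worker pids (any terms work). Fun(Worker) returns a result or
%% raises/exits on failure, e.g. when gen_server:call/2 times out.
execute(_Counter, [], _Fun, _Base, _Max, _MaxAttempts) ->
    {error, no_workers};
execute(Counter, Workers, Fun, Base, Max, MaxAttempts) ->
    attempt(Counter, Workers, Fun, Base, Max, MaxAttempts, 1).

attempt(Counter, Workers, Fun, Base, Max, MaxAttempts, N) ->
    %% Round-robin: every attempt bumps the shared counter and takes the
    %% next worker in rotation.
    Index = atomics:add_get(Counter, 1, 1),
    Worker = lists:nth((Index - 1) rem length(Workers) + 1, Workers),
    try Fun(Worker) of
        Result -> {ok, Result}
    catch
        _:_ when N < MaxAttempts ->
            %% Exponential backoff: BaseDelay * 2^(N-1), capped at MaxDelay.
            timer:sleep(min(Base bsl (N - 1), Max)),
            attempt(Counter, Workers, Fun, Base, Max, MaxAttempts, N + 1);
        _:Reason ->
            {error, Reason}
    end.
```

Advancing the counter on every attempt means a retry normally lands on a different worker, so a single crashed or overloaded worker does not consume all attempts.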

### Channel Message Flow

```
Publisher
    │
    ▼
esdb_channel_server:publish()
    │
    ├─→ Rate limit check
    │
    ├─→ HMAC verification (if required)
    │
    ▼
pg:get_members() for topic
    │
    ▼
Broadcast to all subscribers
    │
    ▼
{channel_message, Channel, Topic, Message}
```
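The diagram resolves subscribers with OTP's `pg` module. A sketch of that fan-out, assuming one `pg` scope per channel and the topic binary as the group name (the `pg_broadcast_sketch` module and this naming scheme are assumptions; the gateway's actual scopes and groups are not documented here):

```erlang
-module(pg_broadcast_sketch).

-export([subscribe/3, broadcast/3]).

%% Join Pid (a local process) to the Topic group in the Channel scope.
%% The scope must already be running, e.g. via pg:start_link(Channel).
subscribe(Channel, Topic, Pid) ->
    pg:join(Channel, Topic, Pid).

%% Send {channel_message, Channel, Topic, Message} to every member of the
%% Topic group, on any connected node, and return the recipient count.
broadcast(Channel, Topic, Message) ->
    Members = pg:get_members(Channel, Topic),
    lists:foreach(
        fun(Pid) -> Pid ! {channel_message, Channel, Topic, Message} end,
        Members),
    length(Members).
```

`pg` removes members automatically when their processes exit, so subscribers that crash stop receiving messages without an explicit unsubscribe.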

## Configuration

```erlang
%% sys.config
[{erl_esdb_gater, [
    %% Cluster configuration
    {cluster, [
        {ra_cluster_name, esdb_gater_registry}
    ]},

    %% Retry defaults
    {retry, [
        {base_delay_ms, 100},
        {max_delay_ms, 5000},
        {max_attempts, 5}
    ]},

    %% Channel configuration
    {channels, [
        {esdb_channel_events, [
            {priority, high}
        ]},
        {esdb_channel_metrics, [
            {max_rate, 10000}
        ]}
    ]},

    %% Security
    {security, [
        {hmac_secret, <<"your_secret_here">>},
        {message_ttl_seconds, 300}
    ]},

    %% Telemetry
    {telemetry_handlers, [logger]}
]}].
```
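These settings live in the application environment. A sketch of reading the retry defaults back with `application:get_env/3` and `proplists`, falling back to the example values above when a key is missing (the `gater_config_sketch` module is hypothetical; the gateway reads its own config internally):

```erlang
-module(gater_config_sketch).

-export([retry_defaults/0]).

%% Return {BaseDelayMs, MaxDelayMs, MaxAttempts} from the erl_esdb_gater
%% `retry` entry, using the example values above as fallbacks.
retry_defaults() ->
    Retry = application:get_env(erl_esdb_gater, retry, []),
    {proplists:get_value(base_delay_ms, Retry, 100),
     proplists:get_value(max_delay_ms, Retry, 5000),
     proplists:get_value(max_attempts, Retry, 5)}.
```

The result can feed the retry API directly: `{B, M, A} = gater_config_sketch:retry_defaults(), Config = esdb_gater_retry:new_config(B, M, A).`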

## Telemetry Events

| Event | Measurements | Metadata |
|-------|--------------|----------|
| `[esdb_gater, worker, registered]` | system_time | store_id, node, pid |
| `[esdb_gater, worker, unregistered]` | system_time | store_id, pid |
| `[esdb_gater, worker, lookup]` | duration | store_id |
| `[esdb_gater, request, start]` | system_time | store_id, request_type |
| `[esdb_gater, request, stop]` | duration | store_id, request_type, result |
| `[esdb_gater, request, error]` | duration | store_id, request_type, reason |
| `[esdb_gater, retry, attempt]` | delay_ms, attempt | store_id, reason |
| `[esdb_gater, retry, exhausted]` | total_attempts | store_id, reason |
| `[esdb_gater, cluster, node, up]` | system_time | node, member_count |
| `[esdb_gater, cluster, node, down]` | system_time | node, member_count |
| `[esdb_gater, channel, broadcast]` | recipient_count | channel, topic |

### Attaching Handlers

```erlang
%% Attach default logger handler
ok = esdb_gater_telemetry:attach_default_handler().

%% Attach custom handler
Handler = fun(Event, Measurements, Meta, _Config) ->
    %% Your custom handling, e.g. log the event
    logger:info("~p ~p ~p", [Event, Measurements, Meta])
end,
ok = esdb_gater_telemetry:attach(my_handler, Handler, #{}).

%% Detach handler
ok = esdb_gater_telemetry:detach(my_handler).
```
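If the gateway emits these events through the `telemetry` library (an assumption based on the "BEAM telemetry" note in the overview), you can also attach directly with `telemetry:attach_many/4`. telemetry recommends passing an external function capture rather than an anonymous fun. A sketch with a hypothetical `my_gater_metrics` module that logs request outcomes:

```erlang
-module(my_gater_metrics).

-export([attach/0, handle_event/4]).

-include_lib("kernel/include/logger.hrl").

%% Attach one handler to the request stop and error events.
attach() ->
    telemetry:attach_many(
        <<"my-gater-metrics">>,
        [[esdb_gater, request, stop], [esdb_gater, request, error]],
        fun ?MODULE:handle_event/4,
        #{}).

%% Called by telemetry for each event: log outcome, store and duration
%% (in whatever unit the gateway reports).
handle_event([esdb_gater, request, Outcome], #{duration := Duration}, Meta, _Config) ->
    ?LOG_INFO("request ~p for ~p took ~p",
              [Outcome, maps:get(store_id, Meta, undefined), Duration]);
handle_event(_Event, _Measurements, _Meta, _Config) ->
    ok.
```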

## Building

```bash
rebar3 compile         # Compile
rebar3 eunit           # Unit tests (44 tests)
rebar3 ct              # Integration tests (8 tests)
rebar3 dialyzer        # Type checking
```

## Testing

Test counts:
- **Unit tests**: 44 tests (retry, security, telemetry)
- **Integration tests**: 8 tests (channel system)

```bash
rebar3 eunit                                    # All unit tests
rebar3 ct --suite=esdb_channel_SUITE            # Channel tests
```

## Related Projects

- [erl-esdb](https://github.com/macula-io/erl-esdb) - Core event store
- [ex-esdb](https://github.com/beam-campus/ex-esdb) - Original Elixir implementation

## License

Apache-2.0