README.md

# EventodbKit

Production-ready Elixir SDK for EventoDB with built-in resilience patterns.

EventodbKit sits on top of the lightweight [EventodbEx](https://hexdocs.pm/eventodb_ex) SDK and provides:

- **Outbox Pattern** - Local persistence of writes before sending to EventoDB
- **Consumer Position Tracking** - Automatic position management per namespace/category/consumer
- **Idempotency** - Built-in deduplication for producers and consumers
- **Background Workers** - GenServer-based outbox sender and consumer
- **Type-Safe Event Handling** - Integration with code-generated event schemas

## Installation

Add `eventodb_kit` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:eventodb_kit, "~> 0.1.0"}
  ]
end
```

## Database Setup

EventodbKit requires database tables for outbox, position tracking, and idempotency.

### Step 1: Add Migration

Create a migration in your application:

```bash
mix ecto.gen.migration add_eventodb_kit_tables
```

### Step 2: Use EventodbKit.Migration

```elixir
defmodule MyApp.Repo.Migrations.AddEventodbKitTables do
  use Ecto.Migration

  def up do
    EventodbKit.Migration.up(version: 1)
  end

  def down do
    EventodbKit.Migration.down(version: 1)
  end
end
```

### Step 3: Run Migration

```bash
mix ecto.migrate
```

## Quick Start

### Type-Safe Events with Code Generation

**For the best developer experience, use with generated event schemas:**

```elixir
# 1. Define your event registry (using generated schemas)
defmodule MyApp.Events do
  use EventodbKit.EventDispatcher
  
  register "UserCreated", MyApp.Events.UserCreated
  register "OrderPlaced", MyApp.Events.OrderPlaced
end

# 2. Use in consumers with automatic validation
defmodule MyApp.MyConsumer do
  use EventodbKit.Consumer
  
  def handle_message(message, state) do
    MyApp.Events.dispatch(message["type"], message["data"], &handle_event/2)
  end
  
  defp handle_event(MyApp.Events.UserCreated, event) do
    # event is validated struct with all fields
    IO.puts("User: #{event.user_id}")
    :ok
  end
end
```

**See the repository's CODEGEN_API.md for complete code generation integration guide.**

## Usage

### Create Client

```elixir
kit = EventodbKit.Client.new(
  "http://localhost:8080",
  token: "ns_abc123",
  repo: MyApp.Repo
)
```

### Write Operations (Outbox Pattern)

Writes go to local database first, then are sent in the background:

```elixir
# Write to outbox
{:ok, outbox_id, kit} = EventodbKit.stream_write(
  kit,
  "account-123",
  %{type: "Deposited", data: %{amount: 100}}
)

# Write with idempotency key (prevents duplicates)
{:ok, outbox_id, kit} = EventodbKit.stream_write(
  kit,
  "payment-456",
  %{
    type: "PaymentRequested",
    data: %{
      amount: 1000,
      idempotency_key: "payment_user123_invoice456"
    }
  }
)

# Transactional write with business logic
MyApp.Repo.transaction(fn ->
  lead = MyApp.Repo.insert!(%Lead{email: "test@example.com"})
  
  {:ok, _outbox_id, _kit} = EventodbKit.stream_write(
    kit,
    "partnership-#{lead.id}",
    %{type: "LeadCreated", data: %{lead_id: lead.id}}
  )
  
  lead
end)
```

### Read Operations

Read operations delegate directly to EventodbEx:

```elixir
# Stream operations
{:ok, messages, kit} = EventodbKit.stream_get(kit, "account-123")
{:ok, message, kit} = EventodbKit.stream_last(kit, "account-123")
{:ok, version, kit} = EventodbKit.stream_version(kit, "account-123")

# Category operations
{:ok, messages, kit} = EventodbKit.category_get(kit, "account", %{
  position: 0,
  batch_size: 100
})
```

### Outbox Sender (Background Worker)

Add to your application supervisor:

```elixir
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      MyApp.Repo,
      
      # Outbox sender - sends unsent messages in background
      {EventodbKit.OutboxSender, [
        namespace: "analytics",
        base_url: "http://localhost:8080",
        token: System.get_env("ANALYTICS_TOKEN"),
        repo: MyApp.Repo,
        poll_interval: 1_000,  # Poll every second
        batch_size: 100
      ]}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
```

### Consumer Pattern

Create a consumer to process events from a category:

```elixir
defmodule MyApp.PartnershipConsumer do
  use EventodbKit.Consumer
  require Logger
  
  def start_link(opts) do
    EventodbKit.Consumer.start_link(__MODULE__, opts)
  end
  
  @impl EventodbKit.Consumer
  def init(opts) do
    {:ok, %{
      namespace: Keyword.fetch!(opts, :namespace),
      category: "partnership",
      consumer_id: "singleton",
      base_url: "http://localhost:8080",
      token: Keyword.fetch!(opts, :token),
      repo: MyApp.Repo,
      poll_interval: 1_000,
      batch_size: 100
    }}
  end
  
  @impl EventodbKit.Consumer
  def handle_message(message, state) do
    case message["type"] do
      "PartnershipApplicationSubmitted" ->
        %MyApp.Lead{
          email: message["data"]["email"],
          school_name: message["data"]["school_name"]
        }
        |> MyApp.Repo.insert!()
        :ok
        
      _ ->
        Logger.warn("Unknown event type: #{message["type"]}")
        :ok
    end
  end
end

# Add to supervisor
{MyApp.PartnershipConsumer, [
  namespace: "analytics",
  token: System.get_env("ANALYTICS_TOKEN")
]}
```

### Consumer Groups

For higher throughput, use consumer groups to partition processing:

```elixir
defmodule MyApp.PartnershipConsumerGroup do
  use EventodbKit.Consumer
  
  @impl EventodbKit.Consumer
  def init(opts) do
    {:ok, %{
      namespace: Keyword.fetch!(opts, :namespace),
      category: "partnership",
      consumer_id: "member-#{Keyword.fetch!(opts, :group_member)}",
      group_member: Keyword.fetch!(opts, :group_member),
      group_size: Keyword.fetch!(opts, :group_size),
      base_url: "http://localhost:8080",
      token: Keyword.fetch!(opts, :token),
      repo: MyApp.Repo,
      poll_interval: 1_000,
      batch_size: 100
    }}
  end
  
  @impl EventodbKit.Consumer
  def handle_message(message, _state) do
    # Process message
    :ok
  end
end

# In supervisor - start 3 members
children = [
  {MyApp.PartnershipConsumerGroup, [
    namespace: "analytics",
    token: token,
    group_member: 0,
    group_size: 3
  ]},
  {MyApp.PartnershipConsumerGroup, [
    namespace: "analytics",
    token: token,
    group_member: 1,
    group_size: 3
  ]},
  {MyApp.PartnershipConsumerGroup, [
    namespace: "analytics",
    token: token,
    group_member: 2,
    group_size: 3
  ]}
]
```

## Features

### Outbox Pattern

- ✅ Writes succeed even when EventoDB is down
- ✅ Transactional consistency with business logic
- ✅ Automatic retry via background sender
- ✅ Idempotency key support

### Consumer Position Tracking

- ✅ Automatic position save/load per consumer
- ✅ Resume from last position after restart
- ✅ Support for multiple consumers per category

### Idempotency

- ✅ Producer: Prevent duplicate events with idempotency keys
- ✅ Consumer: Prevent duplicate processing via `evento_processed_events`
- ✅ Automatic deduplication

### Resilience

- ✅ EventoDB down: Writes continue (to outbox)
- ✅ EventoDB down: Consumers gracefully handle errors
- ✅ EventoDB recovery: Outbox drains automatically
- ✅ Consumer recovery: Resumes from saved position

## Database Tables

EventodbKit creates three tables:

### evento_outbox

Stores events before they're sent to EventoDB:

- `id` - UUID primary key
- `namespace` - Namespace identifier
- `stream` - Stream name
- `type` - Event type
- `data` - Event data (JSONB)
- `metadata` - Optional metadata (JSONB)
- `write_options` - Write options like expected_version (JSONB)
- `sent_at` - Timestamp when sent (NULL if not sent)
- `created_at` - Timestamp when created

### evento_consumer_positions

Tracks consumer positions:

- `namespace` - Namespace identifier (PK)
- `category` - Category name (PK)
- `consumer_id` - Consumer identifier (PK)
- `position` - Current global position
- `updated_at` - Last update timestamp

### evento_processed_events

Tracks processed events for idempotency:

- `event_id` - Event UUID (PK)
- `namespace` - Namespace identifier
- `event_type` - Event type
- `category` - Category name
- `consumer_id` - Consumer identifier
- `processed_at` - Processing timestamp

## Configuration

### Outbox Sender Options

- `:namespace` - Namespace to process (required)
- `:base_url` - EventoDB URL (required)
- `:token` - Namespace token (required)
- `:repo` - Ecto repo (required)
- `:poll_interval` - Polling interval in ms (default: 1000)
- `:batch_size` - Batch size (default: 100)

### Consumer Options

- `:namespace` - Namespace to consume from (required)
- `:category` - Category to consume (required)
- `:consumer_id` - Unique consumer ID (required)
- `:base_url` - EventoDB URL (required)
- `:token` - Namespace token (required)
- `:repo` - Ecto repo (required)
- `:poll_interval` - Polling interval in ms (default: 1000)
- `:batch_size` - Batch size (default: 100)
- `:group_member` - Group member index (optional, for consumer groups)
- `:group_size` - Total group size (optional, for consumer groups)

## Testing

```bash
# Run tests
cd clients/eventodb_kit
mix test

# Or use the test runner script
bin/run_elixir_kit_specs.sh
```

## License

MIT - see [LICENSE](LICENSE)

## Links

- [EventoDB GitHub](https://github.com/eventodb-hq/eventodb)
- [EventodbEx (low-level client)](https://hexdocs.pm/eventodb_ex)
- [EventoDB API Documentation](https://github.com/eventodb-hq/eventodb/blob/main/docs/API.md)