README.md

<h1 align="center">Eventsourcing</h1>

<div align="center">
  ✨ <strong>Event Sourcing Library for Gleam</strong> ✨
</div>

<div align="center">
  A Gleam library for building event-sourced systems with supervision trees.
</div>

<br />

<div align="center">
  <a href="https://hex.pm/packages/eventsourcing">
    <img src="https://img.shields.io/hexpm/v/eventsourcing"
      alt="Available on Hex" />
  </a>
  <a href="https://hexdocs.pm/eventsourcing">
    <img src="https://img.shields.io/badge/hex-docs-ffaff3"
      alt="Documentation" />
  </a>
</div>

---

## Table of contents

- [Introduction](#introduction)
- [Architecture](#architecture)
- [Features](#features)
- [Quick Start](#quick-start)
- [Example](#example)
  - [Define Your Domain](#define-your-domain)
  - [Command Handling](#command-handling)
  - [Event Application](#event-application)
  - [Supervised Usage (Recommended)](#supervised-usage-recommended)
  - [Async API Usage](#async-api-usage)
  - [Snapshot Configuration](#snapshot-configuration)
  - [Error Handling](#error-handling)
- [Migration Guide](#migration-guide)
- [Philosophy](#philosophy)
- [Installation](#installation)
- [Support](#support)
- [Contributing](#contributing)
- [License](#license)

## Introduction

Eventsourcing is a Gleam library for building robust, concurrent event-sourced systems using OTP supervision trees. Event sourcing stores changes to application state as a sequence of immutable events, providing excellent auditability, debugging capabilities, and system resilience.

**Version 9.0** introduces API simplifications with cleaner function signatures and streamlined message passing for improved developer experience while maintaining all the production-ready features from v8.0.

## Architecture

The library is built on a supervised actor architecture:

- **Supervision Trees**: Production-ready fault tolerance with automatic actor restarts
- **Query Actors**: Event processing runs in separate supervised actors for non-blocking processing
- **Domain Error Resilience**: Business rule violations don't crash the system
- **Asynchronous Processing**: Commands and queries are processed independently

## Features

### 🏗️ **Supervision & Fault Tolerance**

- **Supervised architecture** with built-in supervision trees
- **Graceful domain error handling** - business rule violations don't crash actors
- **Automatic actor recovery** - failed components restart automatically
- **Fault isolation** - actor crashes don't cascade through the system

### ⚡ **Concurrent Processing**

- **Asynchronous query processing** - queries don't block command execution
- **Multi-aggregate support** - process multiple aggregates simultaneously  
- **Actor-based isolation** - queries run in independent processes

### 📊 **Event Sourcing Core**

- **Command validation** and event generation
- **Event persistence** with multiple store implementations
- **Aggregate reconstruction** from event streams
- **Partial event loading** from specific sequence numbers
- **Snapshot support** for performance optimization

### 🔧 **Event Store Support**

- [**In-memory Store**](https://github.com/renatillas/eventsourcing_inmemory): Development and testing
- [**Postgres Store**](https://github.com/renatillas/eventsourcing_postgres): PostgreSQL persistence  
- [**SQLite Store**](https://github.com/renatillas/eventsourcing_sqlite): SQLite persistence

### 🛡️ **Type Safety**

- **Comprehensive error types** with Result-based API
- **Input validation** for timeouts and frequencies
- **Type-safe event sourcing pipeline**

## Quick Start

```gleam
import eventsourcing
import eventsourcing/memory_store
import gleam/otp/static_supervisor
import gleam/erlang/process

// 1. Create memory store with supervision
let events_actor_name = process.new_name("events_actor")
let snapshot_actor_name = process.new_name("snapshot_actor")
let #(eventstore, memory_store_spec) = memory_store.supervised(
  events_actor_name,
  snapshot_actor_name, 
  static_supervisor.OneForOne
)

// 2. Start memory store supervisor
let assert Ok(_) = static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(memory_store_spec)
  |> static_supervisor.start()

// 3. Create event sourcing system
let name = process.new_name("eventsourcing_actor")
let queries = [#(process.new_name("my_query"), my_query)]
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
  name: name,
  eventstore: eventstore,
  handle: my_handle,
  apply: my_apply,
  empty_state: MyEmptyState,
  queries: queries,
  snapshot_config: None
)

// 4. Start event sourcing supervisor
let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(eventsourcing_spec)
  |> static_supervisor.start()

// 5. Get actor from name
let eventsourcing_actor = process.named_subject(name)

// 6. Execute commands
eventsourcing.execute(eventsourcing_actor, "aggregate-123", MyCommand)

// 7. Load events and monitor system
let events_subject = eventsourcing.load_events(eventsourcing_actor, "aggregate-123")
let stats_subject = eventsourcing.system_stats(eventsourcing_actor)
```

## Example

### Define Your Domain

```gleam
pub type BankAccount {
  BankAccount(balance: Float)
  UnopenedBankAccount
}

pub type BankAccountCommand {
  OpenAccount(account_id: String)
  DepositMoney(amount: Float)
  WithDrawMoney(amount: Float)
}

pub type BankAccountEvent {
  AccountOpened(account_id: String)
  CustomerDepositedCash(amount: Float, balance: Float)
  CustomerWithdrewCash(amount: Float, balance: Float)
}

pub type BankAccountError {
  CantDepositNegativeAmount
  CantOperateOnUnopenedAccount
  CantWithdrawMoreThanCurrentBalance
}
```

### Command Handling

```gleam
pub fn handle(
  bank_account: BankAccount,
  command: BankAccountCommand,
) -> Result(List(BankAccountEvent), BankAccountError) {
  case bank_account, command {
    UnopenedBankAccount, OpenAccount(account_id) ->
      Ok([AccountOpened(account_id)])
    BankAccount(balance), DepositMoney(amount) -> {
      case amount >. 0.0 {
        True -> {
          let new_balance = balance +. amount
          Ok([CustomerDepositedCash(amount, new_balance)])
        }
        False -> Error(CantDepositNegativeAmount)
      }
    }
    BankAccount(balance), WithDrawMoney(amount) -> {
      case amount >. 0.0 && balance >=. amount {
        True -> {
          let new_balance = balance -. amount
          Ok([CustomerWithdrewCash(amount, new_balance)])
        }
        False -> Error(CantWithdrawMoreThanCurrentBalance)
      }
    }
    _, _ -> Error(CantOperateOnUnopenedAccount)
  }
}
```

### Event Application

```gleam
pub fn apply(bank_account: BankAccount, event: BankAccountEvent) -> BankAccount {
  case event {
    AccountOpened(_) -> BankAccount(0.0)
    CustomerDepositedCash(_, balance) -> BankAccount(balance)
    CustomerWithdrewCash(_, balance) -> BankAccount(balance)
  }
}
```

### Supervised Usage (Recommended)

```gleam
import eventsourcing
import eventsourcing/memory_store
import gleam/otp/static_supervisor
import gleam/erlang/process

pub fn main() {
  // Define query for read model updates
  let balance_query = fn(aggregate_id, events) {
    io.println(
      "Account " <> aggregate_id <> " processed " 
      <> int.to_string(list.length(events)) <> " events"
    )
  }

  // 1. Create memory store with supervision
  let events_actor_name = process.new_name("events_actor")
  let snapshot_actor_name = process.new_name("snapshot_actor")
  let #(eventstore, memory_store_spec) = memory_store.supervised(
    events_actor_name,
    snapshot_actor_name,
    static_supervisor.OneForOne
  )

  // 2. Start memory store supervisor
  let assert Ok(_) = static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(memory_store_spec)
    |> static_supervisor.start()

  // 3. Create supervised event sourcing system
  let name = process.new_name("eventsourcing_actor")
  let queries = [#(process.new_name("balance_query"), balance_query)]
  let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
    name: name,
    eventstore: eventstore,
    handle: handle,
    apply: apply,
    empty_state: UnopenedBankAccount,
    queries: queries,
    snapshot_config: None
  )

  // 4. Start event sourcing supervision tree
  let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(eventsourcing_spec)
    |> static_supervisor.start()

  // 5. Get actor from name
  let eventsourcing_actor = process.named_subject(name)

  // Give time for actors to start
  process.sleep(100)

  // 6. Execute commands - they will be processed asynchronously
  eventsourcing.execute(
    eventsourcing_actor,
    "account-123",
    OpenAccount("account-123")
  )
  
  eventsourcing.execute(
    eventsourcing_actor,
    "account-123",
    DepositMoney(100.0)
  )
}
```

### Async API Usage

All data operations now use an asynchronous message-passing pattern:

```gleam
pub fn async_example() {
  // Set up supervised system (memory store setup omitted for brevity)
  let name = process.new_name("eventsourcing_actor")
  let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
    name: name,
    eventstore: eventstore, // from memory_store.supervised()
    handle: handle,
    apply: apply,
    empty_state: UnopenedBankAccount,
    queries: [],
    snapshot_config: None
  )

  let assert Ok(_supervisor) = static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(eventsourcing_spec)
    |> static_supervisor.start()

  let eventsourcing_actor = process.named_subject(name)
  process.sleep(100) // Give time for actors to start

  // Load aggregate state asynchronously
  let load_subject = eventsourcing.load_aggregate(eventsourcing_actor, "account-123")
  case process.receive(load_subject, 1000) {
    Ok(Ok(aggregate)) -> io.println("Account loaded: " <> aggregate.aggregate_id)
    Ok(Error(eventsourcing.EntityNotFound)) -> io.println("Account not found")
    Ok(Error(other)) -> io.println("Error: " <> string.inspect(other))
    Error(_) -> io.println("Timeout waiting for response")
  }
  
  // Load events asynchronously
  let events_subject = eventsourcing.load_events_from(eventsourcing_actor, "account-123", 0)
  case process.receive(events_subject, 1000) {
    Ok(Ok(events)) -> io.println("Loaded " <> int.to_string(list.length(events)) <> " events")
    Ok(Error(error)) -> io.println("Error loading events: " <> string.inspect(error))
    Error(_) -> io.println("Timeout waiting for events")
  }
}
```

### Snapshot Configuration

```gleam
// Create validated frequency (snapshots every 100 events)
let assert Ok(frequency) = eventsourcing.frequency(100)
let snapshot_config = eventsourcing.SnapshotConfig(frequency)

// Enable snapshots during supervised system setup
let name = process.new_name("eventsourcing_actor")
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
  name: name,
  eventstore: eventstore, // from memory_store.supervised()
  handle: handle,
  apply: apply,
  empty_state: UnopenedBankAccount,
  queries: [],
  snapshot_config: Some(snapshot_config) // Enable snapshots
)

// Get actor and load latest snapshot asynchronously
let eventsourcing_actor = process.named_subject(name)
let snapshot_subject = eventsourcing.latest_snapshot(eventsourcing_actor, "account-123")
case process.receive(snapshot_subject, 1000) {
  Ok(Ok(Some(snapshot))) -> {
    io.println("Using snapshot from sequence " <> int.to_string(snapshot.sequence))
  }
  Ok(Ok(None)) -> io.println("No snapshot available, loading from events")
  Ok(Error(error)) -> io.println("Error loading snapshot: " <> string.inspect(error))
  Error(_) -> io.println("Timeout waiting for snapshot")
}
```

### Enhanced API Features

**Execute Commands with Metadata**

```gleam
// Execute with additional tracking information
eventsourcing.execute_with_metadata(
  eventsourcing_actor,
  "account-123",
  DepositMoney(100.0),
  [#("user_id", "user-456"), #("source", "mobile_app"), #("trace_id", "abc-123")]
)
```

**System Monitoring and Stats**

```gleam
// Get system health statistics
let stats_subject = eventsourcing.system_stats(eventsourcing_actor)
case process.receive(stats_subject, 1000) {
  Ok(stats) -> {
    io.println("Query actors: " <> int.to_string(stats.query_actors_count))
    io.println("Commands processed: " <> int.to_string(stats.total_commands_processed))
  }
  Error(_) -> io.println("Timeout getting stats")
}

// Get individual aggregate statistics  
let agg_stats_subject = eventsourcing.aggregate_stats(eventsourcing_actor, "account-123")
case process.receive(agg_stats_subject, 1000) {
  Ok(Ok(stats)) -> {
    io.println("Aggregate: " <> stats.aggregate_id)
    io.println("Has snapshot: " <> string.inspect(stats.has_snapshot))
  }
  Ok(Error(error)) -> io.println("Error: " <> string.inspect(error))
  Error(_) -> io.println("Timeout")
}
```

## Migration Guide

### Migrating from v8 to v9

**Version 9.0** introduces API simplifications to improve the developer experience:

#### Key Changes
- **Simplified function signatures**: All public functions now accept `process.Subject(...)` directly instead of `actor.Started(...)`
- **Cleaner function names**: Removed `get_` prefixes from stats functions (`system_stats`, `aggregate_stats`, `latest_snapshot`)
- **Direct message passing**: Functions send messages directly without `.data` accessor

#### Migration Steps

**1. Update Function Calls - Remove .data accessor**
```gleam
// v8.0
eventsourcing.execute(eventsourcing_actor.data, "agg-123", command)
eventsourcing.load_aggregate(eventsourcing_actor.data, "agg-123")

// v9.0 - Direct subject passing
eventsourcing.execute(eventsourcing_actor, "agg-123", command)
eventsourcing.load_aggregate(eventsourcing_actor, "agg-123")
```

**2. Update Statistics Function Names**
```gleam
// v8.0
let stats = eventsourcing.get_system_stats(eventsourcing_actor.data)
let agg_stats = eventsourcing.get_aggregate_stats(eventsourcing_actor.data, "agg-123")
let snapshot = eventsourcing.get_latest_snapshot(eventsourcing_actor.data, "agg-123")

// v9.0 - Cleaner names
let stats = eventsourcing.system_stats(eventsourcing_actor)
let agg_stats = eventsourcing.aggregate_stats(eventsourcing_actor, "agg-123") 
let snapshot = eventsourcing.latest_snapshot(eventsourcing_actor, "agg-123")
```

**3. Add Named Actor Support**
```gleam
// v8.0
let assert Ok(spec) = eventsourcing.supervised(
  eventstore, handle: handle, apply: apply,
  empty_state: state, queries: [balance_query],
  eventsourcing_actor_receiver: receiver1,
  query_actors_receiver: receiver2,
  snapshot_config: None
)

// v9.0 - Named actors required
let assert Ok(spec) = eventsourcing.supervised(
  name: process.new_name("eventsourcing_actor"),
  eventstore: eventstore, handle: handle, apply: apply,
  empty_state: state, 
  queries: [#(process.new_name("balance_query"), balance_query)],
  snapshot_config: None
)
```

### Migrating from v7 to v9

### Key Changes

- **Supervision required** for production use
- **Query registration** needed after supervisor startup  
- **Asynchronous execution** via actor message passing
- **Async data loading** - all load operations return subjects and require `process.receive()`
- **Enhanced error handling** with graceful domain error recovery
- **Snapshot config** now passed during system initialization

### Migration Steps

**1. Update Initialization**

```gleam
// v7.x
let eventsourcing = eventsourcing.new(store, queries, handle, apply, empty_state)

// v8.0 - Supervised (Recommended)  
let eventsourcing_spec = eventsourcing.supervised(
  store, handle, apply, empty_state, queries,
  eventsourcing_receiver, query_receiver
)
```

**2. Query Registration (No Longer Needed)**

```gleam
// v8.0 - Required after supervisor start
eventsourcing.register_queries(eventsourcing_actor, query_actors)

// v9.0 - Not needed! Queries are automatically registered through supervised() function
```

**3. Update Command Execution**

```gleam  
// v7.x
eventsourcing.execute(eventsourcing, "agg-123", command)

// v8.0  
eventsourcing.execute(eventsourcing_actor, "agg-123", command)
```

**4. Update Data Loading (Now Async)**

```gleam
// v7.x - Synchronous
let assert Ok(aggregate) = eventsourcing.load_aggregate(eventsourcing, "agg-123")

// v8.0 - Asynchronous message passing
let load_subject = eventsourcing.load_aggregate(eventsourcing_actor, "agg-123")
let assert Ok(Ok(aggregate)) = process.receive(load_subject, 1000)
```

**5. Update Snapshot Configuration**

```gleam
// v7.x
let config = eventsourcing.SnapshotConfig(5)

// v8.0
let assert Ok(frequency) = eventsourcing.frequency(5)
let config = eventsourcing.SnapshotConfig(frequency)
```

## Philosophy

Eventsourcing v8 embraces OTP supervision principles to build production-ready, fault-tolerant event-sourced systems:

- **Fault Tolerance First**: Supervision trees ensure system resilience
- **Let It Crash**: System errors crash actors for clean recovery, business errors don't
- **Clear Separation**: Commands, events, and queries are distinct supervised processes  
- **Concurrent by Design**: Asynchronous processing maximizes throughput
- **Type Safety**: Full Gleam type safety with comprehensive error handling
- **Production Ready**: Built for real-world concurrent systems

This design makes your event-sourced systems naturally concurrent, fault-tolerant, and maintainable while preserving Gleam's excellent type safety.

## Installation

Eventsourcing is published on [Hex](https://hex.pm/packages/eventsourcing)!
You can add it to your Gleam projects from the command line:

```sh
gleam add eventsourcing
```

## Support

Eventsourcing is built by [Renatillas](https://github.com/renatillas).
Contributions are very welcome!
If you've spotted a bug, or would like to suggest a feature,
please open an issue or a pull request.

## Contributing

Contributions are welcome! Please follow these steps:

1. Fork the repository.
2. Create a new branch (`git checkout -b my-feature-branch`).
3. Make your changes and commit them (`git commit -m 'Add new feature'`).
4. Push to the branch (`git push origin my-feature-branch`).
5. Open a pull request.

Please ensure your code adheres to the project's coding standards and
includes appropriate tests.

## License

This project is licensed under the MIT License.
See the [LICENSE](LICENSE) file for details.