<h1 align="center">Eventsourcing</h1>
<div align="center">
✨ <strong>Event Sourcing Library for Gleam</strong> ✨
</div>
<div align="center">
A Gleam library for building event-sourced systems with supervision trees.
</div>
<br />
<div align="center">
<a href="https://hex.pm/packages/eventsourcing">
<img src="https://img.shields.io/hexpm/v/eventsourcing"
alt="Available on Hex" />
</a>
<a href="https://hexdocs.pm/eventsourcing">
<img src="https://img.shields.io/badge/hex-docs-ffaff3"
alt="Documentation" />
</a>
</div>
---
## Table of contents
- [Introduction](#introduction)
- [Architecture](#architecture)
- [Features](#features)
- [Quick Start](#quick-start)
- [Example](#example)
  - [Define Your Domain](#define-your-domain)
  - [Command Handling](#command-handling)
  - [Event Application](#event-application)
  - [Supervised Usage (Recommended)](#supervised-usage-recommended)
  - [Async API Usage](#async-api-usage)
  - [Snapshot Configuration](#snapshot-configuration)
  - [Enhanced API Features](#enhanced-api-features)
- [Migration from v7](#migration-from-v7)
- [Philosophy](#philosophy)
- [Installation](#installation)
- [Support](#support)
- [Contributing](#contributing)
- [License](#license)
## Introduction
Eventsourcing is a Gleam library for building robust, concurrent event-sourced systems using OTP supervision trees. Event sourcing stores every change to application state as an immutable event in an append-only sequence. This gives you a complete audit trail, lets you reproduce bugs by replaying events, and lets you rebuild state after a failure.
**Version 8.0** introduces a complete architectural rewrite with supervision trees, asynchronous query processing, and enhanced fault tolerance for production-ready event-sourced systems.
## Architecture
The library is built on a supervised actor architecture:
- **Supervision Trees**: Production-ready fault tolerance with automatic actor restarts
- **Query Actors**: Queries run in separate supervised actors, so event processing does not block command handling
- **Domain Error Resilience**: Business rule violations don't crash the system
- **Asynchronous Processing**: Commands and queries are processed independently
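The domain error resilience point can be illustrated without any actors. The following is a minimal sketch, not the library's internal code: the `Command`, `DomainError`, `handle`, and `run` names are hypothetical and exist only for this example. It shows the underlying idea that business rule violations are returned as `Error` values, so a rejected command leaves the state untouched and later commands are still processed.

```gleam
import gleam/list

// Hypothetical domain for illustration only.
pub type Command {
  Deposit(amount: Int)
  Withdraw(amount: Int)
}

pub type DomainError {
  InsufficientFunds(requested: Int, available: Int)
}

// Business rules return errors as values instead of panicking.
pub fn handle(balance: Int, command: Command) -> Result(Int, DomainError) {
  case command {
    Deposit(amount) -> Ok(balance + amount)
    Withdraw(amount) if amount > balance ->
      Error(InsufficientFunds(requested: amount, available: balance))
    Withdraw(amount) -> Ok(balance - amount)
  }
}

// Process every command in order. A rejected command leaves the balance
// unchanged and is recorded; the commands after it are still handled.
pub fn run(commands: List(Command)) -> #(Int, List(DomainError)) {
  let #(balance, rejected) =
    list.fold(commands, #(0, []), fn(acc, command) {
      let #(balance, rejected) = acc
      case handle(balance, command) {
        Ok(new_balance) -> #(new_balance, rejected)
        Error(error) -> #(balance, [error, ..rejected])
      }
    })
  #(balance, list.reverse(rejected))
}

pub fn main() {
  // The oversized withdrawal is rejected; the final deposit still applies.
  run([Deposit(50), Withdraw(80), Deposit(10)])
}
```

In the supervised system the same principle applies at the actor level: a domain error is reported back as data, while only unexpected system failures trigger a restart.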
## Features
### 🏗️ **Supervision & Fault Tolerance**
- **Supervised architecture** with built-in supervision trees
- **Graceful domain error handling** - business rule violations don't crash actors
- **Automatic actor recovery** - failed components restart automatically
- **Fault isolation** - actor crashes don't cascade through the system
### ⚡ **Concurrent Processing**
- **Asynchronous query processing** - queries don't block command execution
- **Multi-aggregate support** - process multiple aggregates simultaneously
- **Actor-based isolation** - queries run in independent processes
### 📊 **Event Sourcing Core**
- **Command validation** and event generation
- **Event persistence** with multiple store implementations
- **Aggregate reconstruction** from event streams
- **Partial event loading** from specific sequence numbers
- **Snapshot support** for performance optimization
### 🔧 **Event Store Support**
- [**In-memory Store**](https://github.com/renatillas/eventsourcing_inmemory): Development and testing
- [**Postgres Store**](https://github.com/renatillas/eventsourcing_postgres): PostgreSQL persistence
- [**SQLite Store**](https://github.com/renatillas/eventsourcing_sqlite): SQLite persistence
### 🛡️ **Type Safety**
- **Comprehensive error types** with Result-based API
- **Input validation** for timeouts and frequencies
- **Type-safe event sourcing pipeline**
## Quick Start
```gleam
import eventsourcing
import eventsourcing/memory_store
import gleam/erlang/process
import gleam/list
import gleam/option.{None}
import gleam/otp/static_supervisor
// 1. Set up receivers for actors
let eventsourcing_actor_receiver = process.new_subject()
let query_actors_receiver = process.new_subject()
// 2. Create supervised event sourcing system
let assert Ok(memory_store) = memory_store.new()
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
  memory_store,
  handle: my_handle,
  apply: my_apply,
  empty_state: MyEmptyState,
  queries: [my_query],
  eventsourcing_actor_receiver:,
  query_actors_receiver:,
  snapshot_config: None,
)
// 3. Start supervisor
let assert Ok(supervisor) =
  static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(eventsourcing_spec)
  |> static_supervisor.start()
// 4. Get actors from receivers (one query actor per configured query)
let assert Ok(eventsourcing_actor) =
  process.receive(eventsourcing_actor_receiver, 2000)
let query_actors =
  list.map([my_query], fn(_) {
    let assert Ok(query_actor) = process.receive(query_actors_receiver, 1000)
    query_actor
  })
// 5. Register query actors (required after supervisor start)
eventsourcing.register_queries(eventsourcing_actor, query_actors)
// 6. Execute commands
eventsourcing.execute(eventsourcing_actor, "aggregate-123", MyCommand)
// 7. Load events and monitor system
let events_subject =
  eventsourcing.load_events(eventsourcing_actor, "aggregate-123")
let stats_subject = eventsourcing.get_system_stats(eventsourcing_actor)
```
## Example
### Define Your Domain
```gleam
pub type BankAccount {
  BankAccount(balance: Float)
  UnopenedBankAccount
}
pub type BankAccountCommand {
  OpenAccount(account_id: String)
  DepositMoney(amount: Float)
  WithDrawMoney(amount: Float)
}
pub type BankAccountEvent {
  AccountOpened(account_id: String)
  CustomerDepositedCash(amount: Float, balance: Float)
  CustomerWithdrewCash(amount: Float, balance: Float)
}
pub type BankAccountError {
  CantDepositNegativeAmount
  CantOperateOnUnopenedAccount
  CantWithdrawMoreThanCurrentBalance
}
```
### Command Handling
```gleam
pub fn handle(
  bank_account: BankAccount,
  command: BankAccountCommand,
) -> Result(List(BankAccountEvent), BankAccountError) {
  case bank_account, command {
    UnopenedBankAccount, OpenAccount(account_id) ->
      Ok([AccountOpened(account_id)])
    BankAccount(balance), DepositMoney(amount) -> {
      case amount >. 0.0 {
        True -> {
          let new_balance = balance +. amount
          Ok([CustomerDepositedCash(amount, new_balance)])
        }
        False -> Error(CantDepositNegativeAmount)
      }
    }
    BankAccount(balance), WithDrawMoney(amount) -> {
      case amount >. 0.0 && balance >=. amount {
        True -> {
          let new_balance = balance -. amount
          Ok([CustomerWithdrewCash(amount, new_balance)])
        }
        False -> Error(CantWithdrawMoreThanCurrentBalance)
      }
    }
    _, _ -> Error(CantOperateOnUnopenedAccount)
  }
}
```
### Event Application
```gleam
pub fn apply(bank_account: BankAccount, event: BankAccountEvent) -> BankAccount {
  case event {
    AccountOpened(_) -> BankAccount(0.0)
    CustomerDepositedCash(_, balance) -> BankAccount(balance)
    CustomerWithdrewCash(_, balance) -> BankAccount(balance)
  }
}
```
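Aggregate reconstruction boils down to folding the stored events over the empty state with `apply`. The sketch below repeats the bank account types so that it compiles on its own; the `replay` helper is a hypothetical name introduced for illustration and is not part of the library's API.

```gleam
import gleam/list

pub type BankAccount {
  BankAccount(balance: Float)
  UnopenedBankAccount
}

pub type BankAccountEvent {
  AccountOpened(account_id: String)
  CustomerDepositedCash(amount: Float, balance: Float)
  CustomerWithdrewCash(amount: Float, balance: Float)
}

// Same logic as the `apply` function above, repeated so the sketch is
// self-contained. The previous state is not needed because each event
// already carries the resulting balance.
pub fn apply(_account: BankAccount, event: BankAccountEvent) -> BankAccount {
  case event {
    AccountOpened(_) -> BankAccount(0.0)
    CustomerDepositedCash(_, balance) -> BankAccount(balance)
    CustomerWithdrewCash(_, balance) -> BankAccount(balance)
  }
}

// Hypothetical helper: rebuild the current state by applying every
// event in order, starting from the empty state.
pub fn replay(events: List(BankAccountEvent)) -> BankAccount {
  list.fold(events, UnopenedBankAccount, apply)
}

pub fn main() {
  // Replaying these three events yields an account with balance 70.0.
  replay([
    AccountOpened("account-123"),
    CustomerDepositedCash(100.0, 100.0),
    CustomerWithdrewCash(30.0, 70.0),
  ])
}
```

Because `apply` is a pure function, replaying the same events always produces the same state, which is what makes the event log the source of truth.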
### Supervised Usage (Recommended)
```gleam
import eventsourcing
import eventsourcing/memory_store
import gleam/erlang/process
import gleam/int
import gleam/io
import gleam/list
import gleam/option.{None}
import gleam/otp/static_supervisor
pub fn main() {
  // Define query for read model updates
  let balance_query = fn(aggregate_id, events) {
    io.println(
      "Account " <> aggregate_id <> " processed "
      <> int.to_string(list.length(events)) <> " events"
    )
  }
  // Set up communication channels
  let eventsourcing_actor_receiver = process.new_subject()
  let query_actors_receiver = process.new_subject()
  // Create supervised system
  let assert Ok(memory_store) = memory_store.new()
  let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
    memory_store,
    handle: handle,
    apply: apply,
    empty_state: UnopenedBankAccount,
    queries: [balance_query],
    eventsourcing_actor_receiver:,
    query_actors_receiver:,
    snapshot_config: None,
  )
  // Start supervision tree
  let assert Ok(_supervisor) =
    static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(eventsourcing_spec)
    |> static_supervisor.start()
  // Get actors from startup (one query actor per configured query)
  let assert Ok(eventsourcing_actor) =
    process.receive(eventsourcing_actor_receiver, 2000)
  let assert Ok(query_actors) =
    list.try_map([balance_query], fn(_) {
      process.receive(query_actors_receiver, 1000)
    })
  // Register queries (required after supervisor initialization)
  eventsourcing.register_queries(eventsourcing_actor, query_actors)
  // Execute commands - they will be processed asynchronously
  eventsourcing.execute(
    eventsourcing_actor,
    "account-123",
    OpenAccount("account-123")
  )
  eventsourcing.execute(
    eventsourcing_actor,
    "account-123",
    DepositMoney(100.0)
  )
}
```
### Async API Usage
All data operations now use an asynchronous message-passing pattern:
```gleam
import eventsourcing
import eventsourcing/memory_store
import gleam/erlang/process
import gleam/int
import gleam/io
import gleam/list
import gleam/option.{None}
import gleam/otp/static_supervisor
import gleam/string
pub fn async_example() {
  // Set up supervised system
  let assert Ok(memory_store) = memory_store.new()
  let eventsourcing_actor_receiver = process.new_subject()
  let query_actors_receiver = process.new_subject()
  let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
    memory_store,
    handle: handle,
    apply: apply,
    empty_state: UnopenedBankAccount,
    queries: [],
    eventsourcing_actor_receiver:,
    query_actors_receiver:,
    snapshot_config: None,
  )
  let assert Ok(_supervisor) =
    static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(eventsourcing_spec)
    |> static_supervisor.start()
  let assert Ok(eventsourcing_actor) =
    process.receive(eventsourcing_actor_receiver, 2000)
  // Load aggregate state asynchronously.
  // The outer Result reports a timeout, the inner one a library error.
  let load_subject =
    eventsourcing.load_aggregate(eventsourcing_actor, "account-123")
  case process.receive(load_subject, 1000) {
    Ok(Ok(aggregate)) ->
      io.println("Account loaded: " <> aggregate.aggregate_id)
    Ok(Error(eventsourcing.EntityNotFound)) -> io.println("Account not found")
    Ok(Error(other)) -> io.println("Error: " <> string.inspect(other))
    Error(_) -> io.println("Timeout waiting for response")
  }
  // Load events asynchronously
  let events_subject =
    eventsourcing.load_events_from(eventsourcing_actor, "account-123", 0)
  case process.receive(events_subject, 1000) {
    Ok(Ok(events)) ->
      io.println("Loaded " <> int.to_string(list.length(events)) <> " events")
    Ok(Error(error)) ->
      io.println("Error loading events: " <> string.inspect(error))
    Error(_) -> io.println("Timeout waiting for events")
  }
}
```
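Every load operation above is matched against the same nested shape: an outer `Result` for the timeout and an inner `Result` for the operation itself. If that becomes repetitive, a small helper can flatten the two layers. The following is a sketch only: `await`, `AwaitError`, `TimedOut`, and `Failed` are hypothetical names, not part of the library, and the helper assumes the subject carries a `Result` as in the examples above. It runs on the Erlang target.

```gleam
import gleam/erlang/process.{type Subject}

// Hypothetical error type that merges the two failure layers.
pub type AwaitError(error) {
  TimedOut
  Failed(error)
}

// Wait for a reply on `subject` and flatten the nested Result.
pub fn await(
  subject: Subject(Result(value, error)),
  timeout_ms: Int,
) -> Result(value, AwaitError(error)) {
  case process.receive(subject, timeout_ms) {
    Ok(Ok(value)) -> Ok(value)
    Ok(Error(error)) -> Error(Failed(error))
    Error(Nil) -> Error(TimedOut)
  }
}

pub fn main() {
  // Simulate a reply by sending a message to a subject we own.
  let subject: Subject(Result(String, String)) = process.new_subject()
  process.send(subject, Ok("account-123"))
  await(subject, 100)
}
```

With a helper like this, call sites can match on three cases (`Ok`, `Error(Failed(_))`, `Error(TimedOut)`) instead of repeating the nested pattern.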
### Snapshot Configuration
```gleam
// Create validated frequency (snapshots every 100 events)
let assert Ok(frequency) = eventsourcing.frequency(100)
let snapshot_config = eventsourcing.SnapshotConfig(frequency)
// Enable snapshots during supervised system setup
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
  memory_store,
  handle: handle,
  apply: apply,
  empty_state: UnopenedBankAccount,
  queries: [],
  eventsourcing_actor_receiver:,
  query_actors_receiver:,
  snapshot_config: Some(snapshot_config), // Enable snapshots
)
// Load latest snapshot asynchronously
let snapshot_subject =
  eventsourcing.get_latest_snapshot(eventsourcing_actor, "account-123")
case process.receive(snapshot_subject, 1000) {
  Ok(Ok(Some(snapshot))) -> {
    io.println(
      "Using snapshot from sequence " <> int.to_string(snapshot.sequence),
    )
  }
  Ok(Ok(None)) -> io.println("No snapshot available, loading from events")
  Ok(Error(error)) ->
    io.println("Error loading snapshot: " <> string.inspect(error))
  Error(_) -> io.println("Timeout waiting for snapshot")
}
```
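Conceptually, a snapshot lets reconstruction start from a saved state and apply only the events recorded after it, instead of replaying the full history. The sketch below shows that idea in isolation. The `Snapshot` and `Recorded` types and the `restore` function are hypothetical and simplified for illustration; the library's own snapshot and event envelope types may differ.

```gleam
import gleam/list

// Hypothetical, simplified types for illustration only.
pub type Snapshot(state) {
  Snapshot(state: state, sequence: Int)
}

pub type Recorded(event) {
  Recorded(sequence: Int, event: event)
}

// Start from the snapshot state and apply only the events whose
// sequence number is greater than the snapshot's.
pub fn restore(
  snapshot: Snapshot(state),
  events: List(Recorded(event)),
  apply: fn(state, event) -> state,
) -> state {
  events
  |> list.filter(fn(recorded) { recorded.sequence > snapshot.sequence })
  |> list.fold(snapshot.state, fn(state, recorded) {
    apply(state, recorded.event)
  })
}

pub fn main() {
  // The snapshot already covers events 1 and 2 (60 + 40 = 100),
  // so only events 3 and 4 are applied: 100 + 5 - 20 = 85.
  let snapshot = Snapshot(state: 100, sequence: 2)
  let events = [
    Recorded(1, 60),
    Recorded(2, 40),
    Recorded(3, 5),
    Recorded(4, -20),
  ]
  restore(snapshot, events, fn(total, delta) { total + delta })
}
```

A real store would typically fetch only the events after the snapshot's sequence rather than filtering in memory, which is where the performance benefit comes from.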
### Enhanced API Features
**Execute Commands with Metadata**
```gleam
// Execute with additional tracking information
eventsourcing.execute_with_metadata(
  eventsourcing_actor,
  "account-123",
  DepositMoney(100.0),
  [#("user_id", "user-456"), #("source", "mobile_app"), #("trace_id", "abc-123")]
)
```
**System Monitoring and Stats**
```gleam
// Get system health statistics
let stats_subject = eventsourcing.get_system_stats(eventsourcing_actor)
case process.receive(stats_subject, 1000) {
  Ok(stats) -> {
    io.println("Query actors: " <> int.to_string(stats.query_actors_count))
    io.println(
      "Commands processed: " <> int.to_string(stats.total_commands_processed),
    )
    io.println("Uptime: " <> int.to_string(stats.uptime_seconds) <> " seconds")
  }
  Error(_) -> io.println("Timeout getting stats")
}
// Get individual aggregate statistics
let agg_stats_subject =
  eventsourcing.get_aggregate_stats(eventsourcing_actor, "account-123")
case process.receive(agg_stats_subject, 1000) {
  Ok(Ok(stats)) -> {
    io.println("Aggregate: " <> stats.aggregate_id)
    io.println("Has snapshot: " <> string.inspect(stats.has_snapshot))
  }
  Ok(Error(error)) -> io.println("Error: " <> string.inspect(error))
  Error(_) -> io.println("Timeout")
}
```
**Comprehensive Error Handling**
The library provides enhanced error types with detailed context:
```gleam
pub type EventSourcingError(domainerror) {
  DomainError(domainerror) // Business rule violations
  EventStoreError(String) // Storage/persistence errors
  EntityNotFound // Aggregate doesn't exist
  NonPositiveArgument // Invalid input validation
  TransactionFailed // Transaction processing failed
  TransactionRolledBack // Transaction was rolled back
  ActorTimeout(operation: String, timeout_ms: Int) // Actor communication timeouts
  InvalidAggregateId(aggregate_id: String, reason: String) // Aggregate ID validation
  QueryActorFailed(query_id: String, error: String) // Query processing failures
  SnapshotError(aggregate_id: String, error: String) // Snapshot operation errors
  ConcurrencyError( // Optimistic concurrency failures
    aggregate_id: String,
    expected_sequence: Int,
    actual_sequence: Int
  )
}
// Domain errors no longer crash actors - they're handled gracefully
case result {
  Ok(_) -> io.println("Operation succeeded")
  Error(eventsourcing.DomainError(domain_error)) -> {
    io.println("Business rule violation: " <> string.inspect(domain_error))
    // Actor continues running
  }
  Error(eventsourcing.ActorTimeout(operation, timeout_ms)) -> {
    io.println(
      operation <> " timed out after " <> int.to_string(timeout_ms) <> "ms",
    )
  }
  Error(eventsourcing.ConcurrencyError(agg_id, expected, actual)) -> {
    io.println("Concurrency conflict in " <> agg_id <> ": expected seq "
      <> int.to_string(expected) <> " but was " <> int.to_string(actual))
  }
  Error(eventsourcing.EntityNotFound) -> {
    io.println("Aggregate not found - create it first")
  }
  // Catch-all so the match is exhaustive over the remaining variants
  Error(other) -> io.println("Error: " <> string.inspect(other))
}
```
## Migration from v7
### Key Changes
- **Supervision required** for production use
- **Query registration** needed after supervisor startup
- **Asynchronous execution** via actor message passing
- **Async data loading** - all load operations return subjects and require `process.receive()`
- **Enhanced error handling** with graceful domain error recovery
- **Snapshot config** now passed during system initialization
### Migration Steps
**1. Update Initialization**
```gleam
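// v7.x
let eventsourcing = eventsourcing.new(store, queries, handle, apply, empty_state)
// v8.0 - Supervised (Recommended); returns a Result wrapping the child spec
let assert Ok(eventsourcing_spec) = eventsourcing.supervised(
  store,
  handle: handle,
  apply: apply,
  empty_state: empty_state,
  queries: queries,
  eventsourcing_actor_receiver: eventsourcing_receiver,
  query_actors_receiver: query_receiver,
  snapshot_config: None,
)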
```
**2. Add Query Registration**
```gleam
// v8.0 - Required after supervisor start
eventsourcing.register_queries(eventsourcing_actor, query_actors)
```
**3. Update Command Execution**
```gleam
// v7.x
eventsourcing.execute(eventsourcing, "agg-123", command)
// v8.0
eventsourcing.execute(eventsourcing_actor, "agg-123", command)
```
**4. Update Data Loading (Now Async)**
```gleam
// v7.x - Synchronous
let assert Ok(aggregate) = eventsourcing.load_aggregate(eventsourcing, "agg-123")
// v8.0 - Asynchronous message passing
let load_subject = eventsourcing.load_aggregate(eventsourcing_actor, "agg-123")
let assert Ok(Ok(aggregate)) = process.receive(load_subject, 1000)
```
**5. Update Snapshot Configuration**
```gleam
// v7.x
let config = eventsourcing.SnapshotConfig(5)
// v8.0
let assert Ok(frequency) = eventsourcing.frequency(5)
let config = eventsourcing.SnapshotConfig(frequency)
```
## Philosophy
Eventsourcing v8 embraces OTP supervision principles to build production-ready, fault-tolerant event-sourced systems:
- **Fault Tolerance First**: Supervision trees ensure system resilience
- **Let It Crash**: System errors crash actors so they can restart cleanly; business errors do not
- **Clear Separation**: Commands, events, and queries are distinct supervised processes
- **Concurrent by Design**: Asynchronous processing maximizes throughput
- **Type Safety**: Full Gleam type safety with comprehensive error handling
- **Production Ready**: Built for real-world concurrent systems
This design makes your event-sourced systems naturally concurrent, fault-tolerant, and maintainable while preserving Gleam's excellent type safety.
## Installation
Eventsourcing is published on [Hex](https://hex.pm/packages/eventsourcing)!
You can add it to your Gleam projects from the command line:
```sh
gleam add eventsourcing
```
## Support
Eventsourcing is built by [Renatillas](https://github.com/renatillas).
Contributions are very welcome!
If you've spotted a bug, or would like to suggest a feature,
please open an issue or a pull request.
## Contributing
Contributions are welcome! Please follow these steps:
1. Fork the repository.
2. Create a new branch (`git checkout -b my-feature-branch`).
3. Make your changes and commit them (`git commit -m 'Add new feature'`).
4. Push to the branch (`git push origin my-feature-branch`).
5. Open a pull request.
Please ensure your code adheres to the project's coding standards and
includes appropriate tests.
## License
This project is licensed under the MIT License.
See the [LICENSE](LICENSE) file for details.