# ObanEvents
A lightweight, persistent event handling library for Elixir applications built on top of [Oban](https://github.com/sorentwo/oban).
## Features
- 🔒 **Persistent** - Handler jobs survive application restarts
- 🔄 **Retryable** - Automatic retries on failure via Oban
- âš¡ **Async** - Non-blocking event emission
- 🔗 **Transactional** - Job creation works within database transactions
- 📊 **Observable** - Track processing via Oban Web UI
- ✅ **Validated** - Events are validated at compile time
- 🎯 **Decoupled** - Event emitters don't know about handlers
## Installation
Add `oban_events` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:oban_events, "~> 1.0"},
{:oban, "~> 2.0"}
]
end
```
## Quick Start
### 1. Define Your Events
Create a module that uses `ObanEvents`, pass your Oban instance and optional global job config (defaults: `queue: :oban_events`, `max_attempts: 3`, `priority: 2`), then define your events and handlers:
```elixir
defmodule MyApp.Events do
use ObanEvents,
oban: {MyApp.Oban, queue: :myapp_events, max_attempts: 3}
@events %{
user_created: [
{MyApp.EmailHandler, oban: [priority: 0, max_attempts: 5, tags: ["critical"]]},
MyApp.AnalyticsHandler
],
user_updated: [MyApp.CacheHandler],
order_placed: [MyApp.NotificationHandler]
}
end
```
### 2. Create Event Handlers
Implement the `ObanEvents.Handler` behaviour. Handlers receive an `Event` struct with your data plus metadata:
```elixir
defmodule MyApp.EmailHandler do
use ObanEvents.Handler
require Logger
@impl true
def handle_event(:user_created, %Event{data: data, idempotency_key: key}) do
%{"user_id" => user_id, "email" => email} = data
# Use idempotency_key for outbox pattern
case OutboxEmail.insert(%{
idempotency_key: key,
user_id: user_id,
email: email,
template: :welcome,
status: :pending
}, on_conflict: :nothing, conflict_target: :idempotency_key) do
{:ok, _} ->
Logger.info("Welcome email queued for #{email}")
:ok
{:error, _} ->
Logger.info("Welcome email already queued for #{email}")
:ok
end
end
end
```
### 3. Emit Events
Emit events from your application code, preferably within transactions:
```elixir
defmodule MyApp.Accounts do
alias MyApp.{Repo, Events}
def create_user(attrs) do
Repo.transact(fn ->
with {:ok, user} <- Repo.insert(changeset),
{:ok, _jobs} <- Events.emit(:user_created, %{
user_id: user.id,
email: user.email
}) do
{:ok, user}
end
end)
end
end
```
## How it works
```mermaid
flowchart TD
A[Business Logic] -->|1. emit event + data| B[Events.emit]
B -->|2. lookup handlers| C[Create Oban jobs]
C -->|3. persist dispatch jobs| D[Oban processes jobs]
D -->|4. dispatch| E[EmailHandler]
D -->|4. dispatch| F[AnalyticsHandler]
```
## Event Metadata
Handlers receive an `ObanEvents.Event` struct containing your data plus metadata for deduplication, tracing, and correlation:
```elixir
%Event{
data: %{"user_id" => 123}, # Your event data
event_id: "01933b7e-8a3f-7f6f-9e42-6c8f3a0b2d1e", # Unique per emit
idempotency_key: "01933b7e-9f2c-7a1b-8d4e-3c5f6a7b8c9d", # Unique per job
causation_id: "01933b7e-7f4a-7c2d-9b3e-4d5f6a7b8c9d", # Optional: parent event_id
correlation_id: "01933b7e-6e3b-7d5f-8c4e-5d6f7a8b9c0d" # Optional: business operation grouping
}
```
### Metadata Fields
#### `event_id` - Emit Identifier
- **Generated**: Once per `emit` call (UUIDv7)
- **Shared**: All handlers for the same emit get the same `event_id`
- **Use**: Track which emit created jobs, pass as `causation_id` in child emits
```elixir
def handle_event(:user_created, %Event{event_id: id, data: data}) do
# Emit child events, linking them to parent
Events.emit(:send_welcome_email, data, causation_id: id)
Events.emit(:create_user_profile, data, causation_id: id)
end
```
#### `idempotency_key` - Job Deduplication
- **Generated**: Unique per handler job (UUIDv7)
- **Use**: Outbox pattern, prevent duplicate processing on retry
```elixir
def handle_event(:send_email, %Event{idempotency_key: key, data: data}) do
# Atomic insert - if key exists, we already processed this
OutboxEmail.insert(%{
idempotency_key: key,
user_id: data["user_id"],
status: :pending
}, on_conflict: :nothing, conflict_target: :idempotency_key)
end
```
#### `causation_id` - Event Chains (Optional)
- **Provided**: User passes parent's `event_id` when emitting
- **Use**: Build audit trails, understand "why did this happen?"
```elixir
# Event chain:
user_registered
event_id: "01933b7e-1111-...", causation_id: nil
└─> send_welcome_email
event_id: "01933b7e-2222-...", causation_id: "01933b7e-1111-..."
└─> email_delivered
event_id: "01933b7e-3333-...", causation_id: "01933b7e-2222-..."
```
#### `correlation_id` - Business Operation Grouping (Optional)
- **Provided**: User generates and passes to related emits
- **Use**: Group all events from a single business operation
```elixir
# User upgrades subscription - many events happen
correlation_id = UUIDv7.generate()
Events.emit(:subscription_upgraded, data, correlation_id: correlation_id)
Events.emit(:old_subscription_cancelled, data, correlation_id: correlation_id)
Events.emit(:payment_processed, data, correlation_id: correlation_id)
Events.emit(:invoice_generated, data, correlation_id: correlation_id)
# In logs: filter by correlation_id to see all related events
```
### When to use what
| Metadata | Always Auto-Generated | User Can Override | Primary Use Case |
|----------|----------------------|-------------------|------------------|
| `data` | No | Yes (required) | Your event payload |
| `event_id` | Yes | No | Event causation tracking |
| `idempotency_key` | Yes | No | Prevent duplicate processing |
| `causation_id` | No | Yes (optional) | Build event chains |
| `correlation_id` | No | Yes (optional) | Group business operations |
## Configuration options
### Global configuration
Configure the Oban instance and default job options:
```elixir
defmodule MyApp.Events do
# Simple: just specify the Oban instance (uses all defaults)
use ObanEvents, oban: MyApp.Oban
# Or: specify Oban instance + custom options
use ObanEvents,
oban: {MyApp.Oban, queue: :my_events, max_attempts: 5, priority: 1, tags: ["myapp"]}
@events %{
# ...
}
end
```
**Default Oban options:**
- `queue`: `:oban_events`
- `max_attempts`: `3`
- `priority`: `2` (0-9, lower is higher priority)
- `tags`: `[]`
### Per-handler configuration
Override global options for specific handlers using tuple syntax:
```elixir
@events %{
user_created: [
# Critical handler with higher priority and more retries
{MyApp.EmailHandler, oban: [priority: 0, max_attempts: 10, tags: ["critical", "email"]]},
# Low-priority handler in different queue
{MyApp.AnalyticsHandler, oban: [queue: :analytics, priority: 3]},
# Delayed handler - send reminder email after 24 hours
{MyApp.ReminderHandler, oban: [schedule_in: {24, :hours}]},
# Handler using all global defaults
MyApp.NotificationHandler
]
}
```
**Per-handler options:**
Any [Oban.Job.new/2](https://hexdocs.pm/oban/Oban.Job.html) option can be passed under the `:oban` key. Common options:
- `queue` - Override queue (atom)
- `max_attempts` - Override retry count (integer)
- `priority` - Override priority (0-9, lower is higher)
- `tags` - Override tags (list of strings)
- `scheduled_at` - Schedule for future execution (DateTime)
- `schedule_in` - Delay execution (integer seconds or `{n, :unit}` tuple)
- `meta` - Additional job metadata (map)
- `unique` - Uniqueness constraints (keyword list)
## API
Your events module provides these functions:
### `emit/2`
Emit an event to all registered handlers.
```elixir
@spec emit(atom(), map()) :: {:ok, [Oban.Job.t()]}
# Raises ArgumentError if event is not registered
MyApp.Events.emit(:user_created, %{user_id: 123, email: "user@example.com"})
```
### `get_handlers!/1`
Get all handlers registered for an event.
```elixir
@spec get_handlers!(atom()) :: [module()]
MyApp.Events.get_handlers!(:user_created)
# => [MyApp.EmailHandler, MyApp.AnalyticsHandler]
```
### `all_events/0`
List all registered event names.
```elixir
@spec all_events() :: [atom()]
MyApp.Events.all_events()
# => [:user_created, :user_updated, :order_placed]
```
### `registered?/1`
Check if an event is registered.
```elixir
@spec registered?(atom()) :: boolean()
MyApp.Events.registered?(:user_created)
# => true
```
## Handler implementation
Handlers must implement the `handle_event/2` callback:
```elixir
defmodule MyApp.AnalyticsHandler do
use ObanEvents.Handler
@impl true
def handle_event(:user_created, %Event{data: data}) do
%{"user_id" => user_id} = data
MyApp.Analytics.track("User Created", user_id: user_id)
:ok
end
@impl true
def handle_event(:user_updated, %Event{data: data}) do
%{"user_id" => user_id, "changes" => changes} = data
MyApp.Analytics.track("User Updated", user_id: user_id, changes: changes)
:ok
end
end
```
### Return values
Handlers should return:
- `:ok` - Event processed successfully
- `{:ok, result}` - Event processed successfully with a result
- `{:error, reason}` - Event processing failed (will trigger Oban retry)
Handlers may also raise exceptions, which will trigger Oban's retry mechanism.
## Best practices
### 1. Always Use Transactions
Emit events within transactions to ensure atomicity:
```elixir
Repo.transact(fn ->
with {:ok, user} <- Repo.insert(changeset),
{:ok, _jobs} <- Events.emit(:user_created, %{user_id: user.id}) do
{:ok, user}
end
end)
# If insert fails, emit never happens ✅
# If emit fails, transaction rolls back ✅
```
### 2. Make Handlers Idempotent
**Critical:** Handlers may be executed multiple times due to:
- Oban retries on failure
- Application restarts during processing
- Infrastructure issues (database failover, network timeouts)
Design handlers to be safe when run multiple times with the same event:
```elixir
# Example 1: Outbox pattern for external services (RECOMMENDED)
def handle_event(:send_welcome_email, %Event{
data: data,
idempotency_key: key,
event_id: trace_id
}) do
%{"user_id" => user_id, "email" => email} = data
# Write to outbox table with idempotency_key
# Separate worker processes the outbox
OutboxEmail.insert(%{
idempotency_key: key, # Prevents duplicates on retry
trace_id: trace_id, # For tracing/correlation
user_id: user_id,
email: email,
template: :welcome,
status: :pending
}, on_conflict: :nothing, conflict_target: :idempotency_key)
:ok
end
# Example 2: Database operations - use upserts
def handle_event(:create_user_profile, %Event{data: data}) do
%{"user_id" => user_id} = data
%UserProfile{user_id: user_id}
|> Repo.insert(
on_conflict: :nothing,
conflict_target: :user_id
)
:ok
end
# Example 3: Direct external API call (use event_id for tracing)
# Note: Prefer outbox pattern for critical operations
def handle_event(:track_analytics, %Event{
data: data,
event_id: trace_id,
idempotency_key: key
}) do
# Pass event_id as trace/correlation ID to external service
AnalyticsAPI.track(
event: "user_signup",
properties: data,
trace_id: trace_id, # Links logs across systems
idempotency_key: key # Service can deduplicate
)
:ok
end
# Example 4: Multiple database changes - use Ecto.Multi
def handle_event(:subscription_upgraded, %Event{data: data}) do
%{"user_id" => user_id, "new_plan" => new_plan, "old_plan" => old_plan} = data
# Use Ecto.Multi for multiple related operations
Multi.new()
|> Multi.run(:upgrade, fn _repo, _changes ->
Subscriptions.upgrade(user_id, new_plan)
end)
|> Multi.run(:credits, fn _repo, _changes ->
Credits.add_bonus(user_id, new_plan)
end)
|> Multi.insert(:audit, Audit.changeset(%{
user_id: user_id,
old_plan: old_plan,
new_plan: new_plan
}))
|> Repo.transaction()
|> case do
{:ok, _} -> :ok
{:error, _failed_operation, _failed_value, _changes} ->
{:error, :subscription_upgrade_failed}
end
end
# Example 5: Increment operations - use atomic updates
def handle_event(:page_view, %Event{data: %{"page_id" => page_id}}) do
# Atomic operation - naturally idempotent
Repo.update_all(
from(p in Page, where: p.id == ^page_id),
inc: [view_count: 1]
)
:ok
end
```
### 3. Include All Necessary Data
Don't rely on database lookups for data that might change:
```elixir
# Good: Include all data needed
Events.emit(:status_changed, %{
record_id: record.id,
old_status: old_status,
new_status: record.status
})
# Bad: Handler has to query DB (old_status might be wrong)
Events.emit(:status_changed, %{
record_id: record.id
})
```
### 4. Use JSON-Serializable Data
Event data must be JSON-serializable (no PIDs, refs, or functions):
```elixir
# Good
Events.emit(:user_created, %{
user_id: user.id,
email: user.email,
amount: Decimal.to_string(user.balance)
})
# Bad: Full struct is not reliably serializable
Events.emit(:user_created, %{user: user})
```
### 5. Return Errors for Retriable Failures
```elixir
def handle_event(:send_notification, %Event{data: data}) do
case NotificationService.send(data) do
{:ok, _} -> :ok
{:error, :rate_limited} -> {:error, :rate_limited} # Will retry
{:error, :invalid_data} -> :ok # Don't retry invalid data
end
end
```
### 6. Handle permanently failed jobs
When a handler exhausts all retry attempts, you may want to take action like alerting, logging to a dead letter queue, or notifying administrators. Override `handle_exhausted/4` in your events module:
```elixir
defmodule MyApp.Events do
use ObanEvents, oban: MyApp.Oban
@events %{
user_created: [MyApp.EmailHandler]
}
# Called when a handler fails after max_attempts
def handle_exhausted(event_name, handler_module, event, error) do
# Log to your error tracking service
Sentry.capture_message("Event handler permanently failed",
extra: %{
event: event_name,
handler: handler_module,
event_id: event.event_id,
error: error
}
)
# Store in dead letter queue for manual review
DeadLetterQueue.insert(%{
event_name: event_name,
handler: handler_module,
event_data: event.data,
event_id: event.event_id,
error: inspect(error),
failed_at: DateTime.utc_now()
})
:ok
end
end
```
**Default behavior:**
If you don't override `handle_exhausted/4`, the worker will log a warning with the event details. This ensures failures are always logged even without custom handling.
**The callback receives:**
- `event_name` - The event that failed (atom)
- `handler_module` - The handler that failed (module)
- `event` - The full Event struct with all metadata
- `error` - The error reason from the final attempt
## Handler management
### Renaming or removing handler modules
Handler module names are serialized to the database as fully-qualified Elixir module atoms (e.g., `"Elixir.MyApp.EmailHandler"`). When you rename or remove a handler module, existing queued jobs will still reference the old module name and will fail when Oban tries to execute them.
**Safe migration strategy:**
Use a module alias/stub to maintain backward compatibility during the transition:
**Example 1: Renaming a handler**
```elixir
# After renaming MyApp.EmailHandler to MyApp.Notifications.EmailHandler
# 1. Create the new module with your desired name
defmodule MyApp.Notifications.EmailHandler do
use ObanEvents.Handler
@impl true
def handle_event(:user_created, %Event{data: data}) do
# Your handler logic
:ok
end
end
# 2. Keep the old module as an alias
defmodule MyApp.EmailHandler do
@moduledoc false
defdelegate handle_event(event, event_struct), to: MyApp.Notifications.EmailHandler
end
# 3. Update your @events to use the new name
defmodule MyApp.Events do
use ObanEvents
@events %{
user_created: [
MyApp.Notifications.EmailHandler # New jobs use new name
]
}
end
```
**Example 2: Removing a handler**
```elixir
# Removing MyApp.OldFeatureHandler
# 1. Remove from @events (new jobs won't be created)
defmodule MyApp.Events do
use ObanEvents
@events %{
user_created: [
# MyApp.OldFeatureHandler removed
MyApp.EmailHandler
]
}
end
# 2. Keep a stub module to handle old queued jobs gracefully
defmodule MyApp.OldFeatureHandler do
use ObanEvents.Handler
@impl true
def handle_event(_event, _event_struct) do
# Either process gracefully or just return :ok to discard
:ok
end
end
```
**How this works:**
1. **Existing queued jobs** continue to work (either delegated or gracefully handled)
2. **New jobs** use the updated handler list
3. **Zero downtime** - no jobs fail during the transition
**Cleanup:**
After all old jobs have processed (check Oban Web UI), you can safely remove the alias/stub module. This typically takes as long as your retry window (default: a few hours with exponential backoff).
## Testing
### Testing event emission
```elixir
use Oban.Testing, repo: MyApp.Repo
test "emits user_created event" do
{:ok, user} = Accounts.create_user(%{email: "test@example.com"})
assert_enqueued(
worker: ObanEvents.DispatchWorker,
args: %{
"event_name" => "user_created",
"handler_module" => "Elixir.MyApp.EmailHandler",
"data" => %{"user_id" => user.id}
}
)
end
```
### Testing handlers
Use `ObanEvents.Testing` to easily create Event structs for testing:
```elixir
defmodule MyApp.EmailHandlerTest do
use ExUnit.Case
import ObanEvents.Testing
test "sends welcome email" do
event = build_event(%{"user_id" => 123, "email" => "test@example.com"})
assert :ok = MyApp.EmailHandler.handle_event(:user_created, event)
assert_email_sent(to: "test@example.com", subject: "Welcome!")
end
test "with custom metadata" do
event = build_event(
%{"user_id" => 123},
causation_id: "parent-event-id",
correlation_id: "test-correlation"
)
assert :ok = MyApp.EmailHandler.handle_event(:user_created, event)
end
end
```
### Testing with Oban inline mode
For integration tests, configure Oban to execute jobs inline:
```elixir
# config/test.exs
config :my_app, Oban,
testing: :inline,
queues: false,
plugins: false
# In test
test "creates user and sends email" do
{:ok, user} = Accounts.create_user(%{email: "test@example.com"})
# Email sent immediately in test mode
assert_email_sent(to: "test@example.com")
end
```
## Observability
### Oban Web UI
View event processing history, errors, and retries:
```elixir
# In router.ex (development only)
import Phoenix.LiveDashboard.Router
scope "/dev" do
pipe_through :browser
live_dashboard "/dashboard", metrics: MyAppWeb.Telemetry
forward "/oban", Oban.Web.Router
end
```
### Logging
ObanEvents logs successful event processing at debug level and failures at error level:
```
[debug] Processing event: user_created with handler: MyApp.EmailHandler
[debug] Event processed successfully: user_created by MyApp.EmailHandler
[error] Event handler failed: user_created by MyApp.EmailHandler, error: :network_timeout
```
To enable debug logs at runtime use:
```elixir
iex> Logger.configure(level: :debug)
```
## Troubleshooting
### Events not processing
**Check Oban queue configuration:**
```elixir
# config/config.exs
config :my_app, Oban,
repo: MyApp.Repo,
queues: [
events: 10 # Make sure your queue is configured
]
```
**Check job status in database:**
```sql
SELECT * FROM oban_jobs
WHERE queue = 'events'
ORDER BY inserted_at DESC
LIMIT 10;
```
### Events not emitted
**Verify event is registered:**
```elixir
iex> MyApp.Events.registered?(:user_created)
true
iex> MyApp.Events.all_events()
[:user_created, :user_updated, ...]
```
**Check transaction succeeded:**
Add logging to verify the transaction completes:
```elixir
Repo.transact(fn ->
with {:ok, user} <- Repo.insert(changeset),
{:ok, jobs} <- Events.emit(:user_created, data) do
Logger.info("Emitted #{length(jobs)} jobs")
{:ok, user}
end
end)
```
### Handler failures
**View errors in Oban Web UI** at `/dev/oban`
**Check application logs** for handler errors
**Manually retry failed job:**
```elixir
iex> job = Oban.Job |> Repo.get(job_id)
iex> Oban.retry_job(job)
```
## Examples
See the [test suite](test/) for complete examples of:
- Event emission
- Handler implementation
- Transaction behavior
- Testing patterns
## Disclaimer
ObanEvents is an independent, community-built library and is not an official Oban project.
## Credits
Built with [Oban](https://github.com/sorentwo/oban) by Parker Selbert.