# HephaestusOban
Oban-based runner adapter for [Hephaestus](https://github.com/lucas-stellet/hephaestus) workflow engine.
Turns each workflow step into a durable Oban job with retry/backoff, advisory lock serialization, and zero-contention parallel execution via an auxiliary `step_results` table.
## Installation
Add to your `mix.exs`:
```elixir
def deps do
[
{:hephaestus_oban, "~> 0.1.0"}
]
end
```
## Setup
### 1. Generate and run migrations
Requires [HephaestusEcto](https://github.com/lucas-stellet/hephaestus_ecto) migration to be run first (FK reference).
```bash
mix hephaestus_ecto.gen.migration
mix hephaestus_oban.gen.migration
mix ecto.migrate
```
### 2. Configure your workflow engine
```elixir
defmodule MyApp.Hephaestus do
use Hephaestus,
storage: {HephaestusEcto.Storage, repo: MyApp.Repo},
runner: {HephaestusOban.Runner, oban: MyApp.Oban}
end
```
### 3. Add to your supervision tree
```elixir
children = [
MyApp.Repo,
{Oban, name: MyApp.Oban, repo: MyApp.Repo, queues: [hephaestus: 10]},
MyApp.Hephaestus
]
```
## How it works
### Architecture
```
start_instance(Workflow, context)
|
+-- Instance.new() --> persist via HephaestusEcto.Storage
+-- Oban.insert(AdvanceWorker)
|
v
AdvanceWorker (single Instance writer, advisory lock)
| Engine.advance() --> active_steps: {StepA, StepB, StepC}
+-- enqueue 3x ExecuteStepWorker
|
+-- ExecuteStepWorker(StepA) --+
+-- ExecuteStepWorker(StepB) --+ parallel, zero contention
+-- ExecuteStepWorker(StepC) --+
|
each: execute step |
INSERT step_results |
enqueue AdvanceWorker |
v
AdvanceWorker (serialized via unique + advisory lock)
| apply step_results --> Engine.complete_step + activate_transitions
| persist Instance
|
+-- :completed --> done
+-- :waiting --> awaits ResumeWorker
+-- active --> enqueue ExecuteStepWorkers (next wave)
```
### Three workers
| Worker | Role | Writes to Instance? |
|--------|------|---------------------|
| **AdvanceWorker** | Orchestrator. Reads step_results, applies Engine transitions, persists Instance. Serialized per instance via Oban unique + `pg_advisory_xact_lock`. | Yes (single writer) |
| **ExecuteStepWorker** | Executes a single step. Writes result to `step_results` table, enqueues AdvanceWorker. Idempotent via existence check. | No |
| **ResumeWorker** | Handles external events and durable timers. Writes to `step_results`, enqueues AdvanceWorker. | No |
### Concurrency model
ExecuteStepWorkers run in parallel during fan-out. They never write to the Instance directly — each inserts its own row into `hephaestus_step_results` (zero contention). The AdvanceWorker is the **single writer** for the Instance, serialized via Oban unique constraint + PostgreSQL advisory lock. All Instance mutations happen atomically inside a `Repo.transaction` with `pg_advisory_xact_lock`.
### Failure handling
When an ExecuteStepWorker exhausts all retries (discarded by Oban), the `FailureHandler` telemetry listener detects it and enqueues an AdvanceWorker, which marks the workflow as `:failed` and cancels remaining pending jobs.
### Retry configuration
Retry config resolves with most-specific-wins priority:
1. `Step.retry_config/0` — per-step override (optional callback)
2. `Workflow.default_retry_config/0` — per-workflow default (optional callback)
3. Library default — `%{max_attempts: 5, backoff: :exponential, max_backoff: 60_000}`
### Async steps and durable timers
```elixir
# Step returns {:async} --> instance moves to :waiting
# Resume with external event:
MyApp.Hephaestus.resume(instance_id, :payment_confirmed)
# Schedule a durable timer (survives VM restarts):
MyApp.Hephaestus.schedule_resume(instance_id, :wait_step, 30_000)
# Returns {:ok, job_id} — cancellable via Oban.cancel_job/1
```
## Database schema
### step_results table
Auxiliary table for zero-contention parallel step execution:
```
hephaestus_step_results
+-- id UUID (primary key)
+-- instance_id UUID (FK -> workflow_instances, ON DELETE CASCADE)
+-- step_ref STRING (module name)
+-- event STRING (step event or "__async__" sentinel)
+-- context_updates JSONB (step output data)
+-- processed BOOLEAN (consumed by AdvanceWorker)
+-- inserted_at TIMESTAMP
```
Indexes:
- Partial index on `instance_id WHERE NOT processed` — fast pending lookups
- Partial unique index on `(instance_id, step_ref) WHERE NOT processed` — idempotent inserts via `ON CONFLICT DO NOTHING`
## Queue configuration
```elixir
# Default: single queue for all workers
{Oban, queues: [hephaestus: 10]}
# Advanced: separate queues for orchestration vs execution
{Oban, queues: [hephaestus_advance: 5, hephaestus_execute: 20]}
```
The `hephaestus: 10` means up to 10 Oban jobs run concurrently. In a fan-out of 20 steps, only 10 execute at once — the rest wait. Adjust based on your workload.
## Error handling
| Scenario | Handler | Outcome |
|----------|---------|---------|
| Step returns `{:error, reason}` | Oban retry with backoff | Retried up to max_attempts |
| Step exhausts all retries | FailureHandler (telemetry) | Workflow marked `:failed` |
| Step crashes/raises | Oban catches, treats as error | Same retry flow |
| AdvanceWorker fails | Oban retry | Idempotent — re-applies unprocessed step_results |
| ResumeWorker fails | Oban retry | Idempotent — INSERT deduplicated via unique index |
| DB connection lost | Oban retry | All workers are idempotent |
## Requirements
- Elixir ~> 1.19
- Oban >= 2.14
- PostgreSQL (for advisory locks and JSONB)
- [Hephaestus](https://github.com/lucas-stellet/hephaestus) ~> 0.1
- [HephaestusEcto](https://github.com/lucas-stellet/hephaestus_ecto) ~> 0.1
## License
MIT