# gabsurd
A Gleam SDK for the [Absurd](https://github.com/absurd-sql/absurd) durable workflow system. Provides type-safe database access via Parrot (sqlc) code generation and OTP worker actors for task processing.
## Overview
**gabsurd** wraps the Absurd PostgreSQL schema with two layers:
1. **Generated SQL layer** (`gabsurd/sql`) — Type-safe query functions generated by [Parrot](https://github.com/daniellionel01/parrot) (sqlc for Gleam). All database access goes through Absurd's PL/pgSQL functions.
2. **SDK layer** — Hand-written Gleam modules providing ergonomic APIs for queues, tasks, events, checkpoints, and worker actors.
## Quick Start
### 1. Install & Setup
```toml
# gleam.toml
[dependencies]
gabsurd = { path = "..." }
gleam_erlang = ">= 1.3.0 and < 2.0.0"
gleam_otp = ">= 1.2.0 and < 2.0.0"
gleam_time = ">= 1.8.0 and < 2.0.0"
```
Ensure the Absurd schema is loaded into your PostgreSQL database.
### 2. Connect to the Database
```gleam
import gabsurd/client
let assert Ok(started) = client.start("postgresql://user:pass@localhost:5432/mydb")
let db = started.data
```
### 3. Create a Queue
```gleam
import gabsurd/queue
let assert Ok(Nil) = queue.create(db, "emails")
```
### 4. Spawn Tasks
```gleam
import gleam/json
import gabsurd/task
let assert Ok(info) =
  task.spawn(
    db,
    "emails",
    "send_welcome",
    json.object([#("to", json.string("user@example.com"))]),
    task.new_options() |> task.with_max_attempts(3),
  )
```
### 5. Start a Worker
```gleam
import gleam/json
import gleam/option
import gabsurd/worker.{Complete, Handler}
let email_handler = Handler(
  task_name: "send_welcome",
  execute: fn(_ctx) {
    // Process the task...
    Complete(json.object([#("sent", json.bool(True))]))
  },
  on_error: option.None,
)
let config =
  worker.new(db, "emails", [email_handler])
  |> worker.with_poll_interval(1000)
  |> worker.with_batch_size(10)
// Start a single worker
let assert Ok(started) = worker.start(config)
```
### 6. Worker Pool with Supervisor
```gleam
import gleam/list
import gleam/otp/static_supervisor
let pool = worker.pool_child_specs("email_workers", config, 4)
// pool is a List(ChildSpecification) with 4 elements; add each one to the supervisor
let assert Ok(sup) =
  list.fold(
    pool,
    static_supervisor.new(static_supervisor.OneForOne),
    static_supervisor.add,
  )
  |> static_supervisor.start()
```
## API Reference
### `gabsurd/client`
Database connection management.
| Function | Description |
|----------|-------------|
| `start(url)` | Connect to PostgreSQL, returns `StartResult(Db)` |
### `gabsurd/queue`
Queue lifecycle management.
| Function | Description |
|----------|-------------|
| `create(db, name)` | Create an unpartitioned queue |
| `create_with_mode(db, name, mode)` | Create a queue with storage mode |
| `drop(db, name)` | Drop a queue and all its data |
| `list(db)` | List all queue names |
### `gabsurd/task`
Task lifecycle operations.
| Function | Description |
|----------|-------------|
| `spawn(db, queue, name, params, options)` | Create a new task |
| `claim(db, queue, worker_id, timeout, qty)` | Claim available tasks |
| `complete(db, queue, run_id, state)` | Mark a run as completed |
| `fail(db, queue, run_id, reason)` | Fail a run (uses queue retry policy) |
| `fail_with_retry(db, queue, run_id, reason, retry_at)` | Fail a run and schedule retry |
| `cancel(db, queue, task_id)` | Cancel a task |
| `get_result(db, queue, task_id)` | Get task result as `TaskResult` record |
| `retry(db, queue, task_id, options)` | Retry a failed task |
| `extend_claim(db, queue, run_id, extend_by)` | Manually extend a claim lease (heartbeat) |
| `schedule_run(db, queue, run_id, defer_seconds)` | Reschedule a run for future execution |
| `new_options()` | Create empty spawn options |
| `with_max_attempts(options, n)` | Set max attempts |
| `with_retry_strategy(options, strategy)` | Set retry strategy |
| `with_cancellation(options, policy)` | Set cancellation policy |
| `with_headers(options, json)` | Set headers metadata |
| `with_idempotency_key(options, key)` | Prevent duplicate spawning |
### `gabsurd/event`
Event coordination for workflows.
| Function | Description |
|----------|-------------|
| `emit(db, queue, event_name, payload)` | Emit an event |
| `await(db, queue, task_id, run_id, step, event, timeout)` | Await an event |
### `gabsurd/checkpoint`
Workflow checkpoint persistence.
| Function | Description |
|----------|-------------|
| `set(db, queue, task_id, step, state, run_id, extend_claim_by)` | Save a checkpoint (pass `claim_timeout` to extend lease) |
| `get(db, queue, task_id, step, include_pending)` | Retrieve a checkpoint, returns `Result(Option(Checkpoint), Error)` |
### `gabsurd/worker`
OTP worker actor for task processing.
| Type | Description |
|------|-------------|
| `Handler(task_name, execute, on_error)` | Handler for a specific task type |
| `HandlerResult` | Return type: `Complete(json)`, `Fail(json)`, or `Suspend` |
| `Config` | Worker configuration |
| `Worker` | Running worker handle |
| Function | Description |
|----------|-------------|
| `new(db, queue, handlers)` | Create config with defaults |
| `with_poll_interval(config, ms)` | Set poll interval (default: 5000ms) |
| `with_batch_size(config, n)` | Set tasks claimed per poll (default: 1) |
| `with_claim_timeout(config, secs)` | Set claim timeout (default: 30s) |
| `with_worker_id(config, id)` | Set worker ID |
| `with_max_backoff(config, ms)` | Set max error backoff (default: 60000ms) |
| `start(config)` | Start a worker actor |
| `stop(worker)` | Stop a worker gracefully |
| `child_spec(name, config)` | Create supervisor child spec |
| `pool_child_specs(name, config, count)` | Create N child specs for a pool |
### `gabsurd/context`
Execution context passed to worker handlers.
| Type | Description |
|------|-------------|
| `Context` | Encapsulates db, queue, claim, claim_timeout |
| `EventResult` | `Received(String)` or `Suspended` |
| Function | Description |
|----------|-------------|
| `task_id(ctx)` | Task's unique identifier |
| `run_id(ctx)` | Current run's unique identifier |
| `params(ctx)` | Task parameters as raw JSON string |
| `task_name(ctx)` | Task name |
| `attempt(ctx)` | Current attempt number |
| `step(ctx, name, decoder, run)` | Idempotent step — skips if checkpoint exists |
| `get_checkpoint(ctx, name)` | Get raw JSON state for a step |
| `set_checkpoint(ctx, name, state)` | Persist a checkpoint and extend lease |
| `heartbeat(ctx)` | Extend the claim lease |
| `await_event(ctx, event_name, timeout)` | Await an event, returns `EventResult` |
### `gabsurd/utility`
Maintenance and introspection.
| Function | Description |
|----------|-------------|
| `cleanup_queue(db, queue)` | Clean up completed/failed tasks |
| `get_schema_version(db)` | Get the installed schema version |
## Architecture
```
┌─────────────────────────────────────┐
│          Your Application           │
├─────────────────────────────────────┤
│  worker.gleam   │  task.gleam       │ ← SDK Layer (hand-written)
│  context.gleam  │  queue.gleam      │
│  event.gleam    │  checkpoint.gleam │
│  utility.gleam  │                   │
├─────────────────────────────────────┤
│        sql.gleam (generated)        │ ← Parrot codegen
├─────────────────────────────────────┤
│  param.gleam    │  client.gleam     │ ← pog driver adapter
├─────────────────────────────────────┤
│       PostgreSQL + absurd.sql       │ ← Database
└─────────────────────────────────────┘
```
## Worker Design
Workers use the **Handler Record** pattern — each task type gets a `Handler` with:
- `task_name`: Which tasks this handler processes
- `execute`: The business logic — `fn(Context) -> HandlerResult`
- `on_error`: Optional hook for logging/metrics when execute returns `Fail`
The handler receives a `Context` that encapsulates the database connection, queue,
claim details, and claim timeout. It provides `context.step(ctx, ...)` for
idempotent steps and `context.await_event(ctx, ...)` for event-driven workflows.
The `HandlerResult` type has three variants:
- `Complete(json)` — mark the task as successfully done
- `Fail(json)` — mark the task as failed (triggers retry if attempts remain)
- `Suspend` — the task is waiting for an event; skip complete/fail
The worker polls the queue using `process.send_after`, claims tasks, constructs
a `Context`, dispatches to matching handlers by `task_name`, and handles the result.
### Idempotent Steps
`context.step(ctx, name, decoder, run)` is the primary building block for durable workflows:
- On first execution: calls `run()`, persists the result as a checkpoint, extends the lease
- On retry: finds the checkpoint, decodes the stored value, skips execution
- `decoder` is a `gleam/dynamic/decode.Decoder(a)` — Gleam's `json.Json` is write-only,
so a decoder is required to recover typed values from the database
- For steps that don't need a return value, use `decode.success(Nil)`
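
The skip-on-retry behaviour described above can be modelled without a database. The following is a minimal sketch, not the SDK's implementation: the `Store` alias and this `step` function are hypothetical stand-ins, with a `Dict` playing the role of the checkpoint table and plain strings playing the role of stored JSON.

```gleam
import gleam/dict.{type Dict}

/// Toy checkpoint store: step name -> stored value.
/// Assumption: the real SDK persists JSON in PostgreSQL instead.
pub type Store =
  Dict(String, String)

/// Run `run` only if `name` has no checkpoint yet.
/// Returns the (possibly updated) store together with the step's value.
pub fn step(store: Store, name: String, run: fn() -> String) -> #(Store, String) {
  case dict.get(store, name) {
    // Retry path: a checkpoint exists, so the work is skipped.
    Ok(saved) -> #(store, saved)
    // First execution: do the work, then record the result.
    Error(Nil) -> {
      let value = run()
      #(dict.insert(store, name, value), value)
    }
  }
}

pub fn main() {
  let #(store, first) = step(dict.new(), "charge", fn() { "ch_123" })
  // On the second call the closure is never invoked.
  let #(_, second) = step(store, "charge", fn() { panic as "ran twice" })
  let assert True = first == second
}
```

The second call returns the stored value without running its closure, which is why side effects such as charging a card belong inside the step body.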
### Unknown Task Deferral
Tasks with no registered handler are deferred (rescheduled with a delay) rather than
failed. This supports rolling deployments where a new task type may arrive before
its handler code is deployed.
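
As a rough illustration of this dispatch rule, the sketch below looks up a handler by task name and falls back to deferral when none is registered. The `ToyHandler` and `Dispatch` types and the 30-second delay are assumptions made for the example; they are not the SDK's types or its actual deferral interval.

```gleam
import gleam/list

/// Hypothetical, simplified handler: takes raw params, returns a result string.
pub type ToyHandler {
  ToyHandler(task_name: String, execute: fn(String) -> String)
}

pub type Dispatch {
  Ran(result: String)
  Deferred(seconds: Int)
}

/// Assumed deferral delay, chosen only for illustration.
const defer_seconds = 30

pub fn dispatch(
  handlers: List(ToyHandler),
  task_name: String,
  params: String,
) -> Dispatch {
  case list.find(handlers, fn(h) { h.task_name == task_name }) {
    Ok(handler) -> {
      let run = handler.execute
      Ran(run(params))
    }
    // No handler registered yet: reschedule instead of failing the task.
    Error(Nil) -> Deferred(defer_seconds)
  }
}
```

Deferring keeps the task's attempt budget intact, so a task spawned by a newer deployment is not failed by an older worker.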
### Error Backoff
On transient claim errors, the worker backs off exponentially up to `max_backoff`
(default 60s), resetting on success.
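
A capped exponential backoff can be sketched in a few lines. The doubling factor and the starting delay below are assumptions for illustration; only the 60000 ms cap corresponds to the documented `max_backoff` default.

```gleam
import gleam/int

/// Next delay after a failed claim: double the current delay, capped at `max_ms`.
/// Assumption: the doubling factor is illustrative, not taken from the SDK.
pub fn next_backoff(current_ms: Int, max_ms: Int) -> Int {
  int.min(current_ms * 2, max_ms)
}

/// Delays produced by `failures` consecutive errors, starting from `start_ms`.
pub fn backoff_sequence(start_ms: Int, max_ms: Int, failures: Int) -> List(Int) {
  case failures <= 0 {
    True -> []
    False -> {
      let next = next_backoff(start_ms, max_ms)
      [next, ..backoff_sequence(next, max_ms, failures - 1)]
    }
  }
}
```

Under these assumptions, `backoff_sequence(5000, 60_000, 5)` yields `[10000, 20000, 40000, 60000, 60000]`: the delay grows until it reaches the cap and then stays there until a successful claim resets it.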
### Worker Pools
For pools, use `pool_child_specs` to generate N workers with globally unique IDs
inside a `static_supervisor`.
## Examples
### Multi-Step Workflow with Checkpoints
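Durable workflows survive crashes. Each step uses `context.step`: on retry,
completed steps are skipped. Each completed step also extends the worker's claim
lease, so the lease is renewed as the workflow progresses. A single step that runs
longer than the claim timeout can still lose its claim; call `context.heartbeat`
inside such steps.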
```gleam
import gleam/dynamic/decode
import gleam/json
import gleam/option
import gleam/result
import gabsurd/client
import gabsurd/context
import gabsurd/worker.{Complete, Fail, Handler}
let process_order = Handler(
  task_name: "process_order",
  execute: fn(ctx) {
    case order_workflow(ctx) {
      Ok(Nil) -> Complete(json.object([#("status", json.string("completed"))]))
      Error(e) -> Fail(json.string(error_to_string(e)))
    }
  },
  on_error: option.None,
)
fn order_workflow(ctx) -> Result(Nil, client.GabsurdError) {
  let params = decode_params(context.params(ctx))
  // Step 1: Charge credit card (skip if checkpoint exists)
  use _ <- result.try(context.step(ctx, "charge", decode.success(Nil), fn() {
    charge_card(params)
    json.null()
  }))
  // Step 2: Reserve inventory (skip if done)
  use _ <- result.try(context.step(ctx, "reserve", decode.success(Nil), fn() {
    reserve_inventory(params)
    json.null()
  }))
  // Step 3: Send confirmation
  use _ <- result.try(context.step(ctx, "notify", decode.success(Nil), fn() {
    send_confirmation(params)
    json.null()
  }))
  Ok(Nil)
}
```
### Steps That Pass Data Forward
```gleam
fn order_workflow(ctx) -> Result(Nil, client.GabsurdError) {
  // Charge and get back the charge_id
  use charge_id <- result.try(
    context.step(ctx, "charge", charge_id_decoder(), fn() {
      let result = charge_card(decode_params(context.params(ctx)))
      json.object([#("charge_id", json.string(result.id))])
    }),
  )
  // Use charge_id in the next step
  use _ <- result.try(context.step(ctx, "capture", decode.success(Nil), fn() {
    capture_charge(charge_id)
    json.null()
  }))
  Ok(Nil)
}
// Decoder using Gleam's continuation-based decode.field API:
fn charge_id_decoder() -> decode.Decoder(String) {
  use charge_id <- decode.field("charge_id", decode.string)
  decode.success(charge_id)
}
```
### Event-Driven Workflow
Tasks can suspend waiting for an external event (e.g., a webhook callback).
Call `context.await_event` and return `Suspend` when the task needs to sleep.
When `event.emit` fires, the task wakes up and the handler runs again.
```gleam
import gleam/dynamic/decode
import gleam/json
import gleam/option
import gabsurd/context.{Received, Suspended}
import gabsurd/worker.{Complete, Fail, Handler, Suspend}
let generate_report = Handler(
  task_name: "generate_report",
  execute: fn(ctx) {
    // Start the remote job (idempotent: skipped if the checkpoint exists)
    let started =
      context.step(ctx, "start", job_id_decoder(), fn() {
        let id = start_remote_job(context.params(ctx))
        json.object([#("job_id", json.string(id))])
      })
    // `execute` must return a HandlerResult, so match on the step's Result
    case started {
      Error(_) -> Fail(json.string("start step failed"))
      // Wait for the remote service to POST back
      Ok(job_id) ->
        case context.await_event(ctx, "report_complete_" <> job_id, 3600) {
          Ok(Received(payload)) -> Complete(json.string(payload))
          Ok(Suspended) -> Suspend
          Error(_) -> Fail(json.string("event error"))
        }
    }
  },
  on_error: option.None,
)
// Decoder for the job_id stored in the checkpoint:
fn job_id_decoder() -> decode.Decoder(String) {
  use job_id <- decode.field("job_id", decode.string)
  decode.success(job_id)
}
```
Then in your webhook HTTP handler (separate process):
```gleam
import gleam/json
import gabsurd/event
// When the remote service calls back:
let assert Ok(Nil) =
  event.emit(
    db,
    "reports",
    // must match the event name passed to await_event
    "report_complete_" <> job_id,
    json.object([#("url", json.string(download_url))]),
  )
```
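
Because the awaiting handler and the webhook must build exactly the same event name, it can help to derive the name in one place. The helper below is a hypothetical convenience for application code, not part of gabsurd.

```gleam
/// Single source of truth for the completion event's name.
/// Hypothetical helper: call it from both the worker handler and the webhook.
pub fn report_event_name(job_id: String) -> String {
  "report_complete_" <> job_id
}
```

Passing `report_event_name(job_id)` to both `context.await_event` and `event.emit` removes the risk of the two string literals drifting apart.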
### Idempotent Task Spawning
Use `with_idempotency_key` to prevent duplicate tasks — critical for
payment processing where double-spawning means double-charging.
```gleam
let assert Ok(info) =
  task.spawn(
    db,
    "payments",
    "charge_card",
    json.object([#("amount", json.int(9999))]),
    task.new_options()
      |> task.with_idempotency_key("order-" <> order_id),
  )
// If the caller crashes and retries, the same task is returned
// (info.created will be False on subsequent calls)
```
### Retry Strategies
```gleam
// Exponential backoff for flaky APIs
let assert Ok(_) =
  task.spawn(
    db,
    "integrations",
    "sync_crm",
    json.object([#("contact_id", json.string(id))]),
    task.new_options()
      |> task.with_max_attempts(5)
      |> task.with_retry_strategy(task.ExponentialRetry(
        base_seconds: 10,
        factor: 2.0,
        max_seconds: 300.0,
      )),
  )
// Retries at ~10s, 20s, 40s, 80s, capped at 300s
```
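
The retry times quoted in the comment above follow from a simple recurrence. This sketch assumes the delay before retry n is `base * factor^(n - 1)`, capped at `max`, and that `max_attempts` counts the first run; the actual schedule is computed by Absurd and may differ in detail (for example, by adding jitter).

```gleam
import gleam/float

/// Delays for `retries` consecutive retries, starting at `delay`.
fn delays(delay: Float, factor: Float, max: Float, retries: Int) -> List(Float) {
  case retries <= 0 {
    True -> []
    False -> [
      float.min(delay, max),
      ..delays(delay *. factor, factor, max, retries - 1)
    ]
  }
}

/// Assumed schedule: max_attempts includes the initial run,
/// so there are max_attempts - 1 retries.
pub fn retry_schedule(
  base: Float,
  factor: Float,
  max: Float,
  max_attempts: Int,
) -> List(Float) {
  delays(base, factor, max, max_attempts - 1)
}
```

With the values from the example, `retry_schedule(10.0, 2.0, 300.0, 5)` gives `[10.0, 20.0, 40.0, 80.0]`. The 300-second cap only takes effect from the sixth retry onward, where the uncapped delay would be 320 seconds.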
### Long-Running Tasks with Manual Heartbeats
For tasks that do a single long operation without checkpoints:
```gleam
import gleam/json
import gleam/list
import gleam/option
import gabsurd/context
import gabsurd/worker.{Complete, Handler}
let handler = Handler(
  task_name: "batch_import",
  execute: fn(ctx) {
    // `records` (a list of batches) and `process_batch` stand in for your own data and logic
    list.each(records, fn(batch) {
      process_batch(batch)
      // Keep the lease alive after each batch
      let _ = context.heartbeat(ctx)
    })
    Complete(json.object([#("imported", json.int(list.length(records)))]))
  },
  on_error: option.None,
)
```
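
If the input arrives as one flat list, it has to be split into batches before heartbeating between them. The helper below is a hypothetical sketch that uses only the standard library: `process` and `heartbeat` are passed in as callbacks, so in a real handler `heartbeat` would wrap `context.heartbeat(ctx)`.

```gleam
import gleam/list

/// Process `records` in chunks of `batch_size`, calling `heartbeat`
/// after every chunk. Returns the number of records processed.
pub fn import_in_batches(
  records: List(a),
  batch_size: Int,
  process: fn(List(a)) -> Nil,
  heartbeat: fn() -> Nil,
) -> Int {
  list.sized_chunk(records, batch_size)
  |> list.fold(0, fn(done, batch) {
    process(batch)
    // Renew the lease before starting the next chunk.
    heartbeat()
    done + list.length(batch)
  })
}
```

Choose a batch size small enough that one batch comfortably finishes within the claim timeout, since the lease is only renewed between batches.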
## Development
```bash
# Start PostgreSQL
bash bin/postgres.sh
# Reset database
bash bin/reset_db.sh
# Run tests
DATABASE_URL="postgresql://gabsurd:gabsurd@127.0.0.1:5432/gabsurd" gleam test
# Regenerate SQL (after changing queries.sql)
DATABASE_URL="..." PATH="$(nix eval --raw 'nixpkgs#postgresql_17.out')/bin:$PATH" gleam run -m parrot
```
## License
Apache-2.0