# gabsurd

A Gleam SDK for the [Absurd](https://github.com/absurd-sql/absurd) durable workflow system. It provides type-safe database access through Parrot (sqlc) code generation, plus OTP worker actors for task processing.

## Overview

**gabsurd** wraps the Absurd PostgreSQL schema with two layers:

1. **Generated SQL layer** (`gabsurd/sql`) — Type-safe query functions generated by [Parrot](https://github.com/daniellionel01/parrot) (sqlc for Gleam). All database access goes through Absurd's PL/pgSQL functions.
2. **SDK layer** — Hand-written Gleam modules providing ergonomic APIs for queues, tasks, events, checkpoints, and worker actors.

## Quick Start

### 1. Install & Setup

```toml
# gleam.toml
[dependencies]
gabsurd = { path = "..." }
gleam_erlang = ">= 1.3.0 and < 2.0.0"
gleam_otp = ">= 1.2.0 and < 2.0.0"
gleam_time = ">= 1.8.0 and < 2.0.0"
```

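Ensure the Absurd schema is loaded into your PostgreSQL database. The examples below also import `gleam/json`, which comes from the `gleam_json` package, so add that package as a direct dependency too.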

### 2. Connect to the Database

```gleam
import gabsurd/client

let assert Ok(started) = client.start("postgresql://user:pass@localhost:5432/mydb")
let db = started.data
```

### 3. Create a Queue

```gleam
import gabsurd/queue

let assert Ok(Nil) = queue.create(db, "emails")
```

### 4. Spawn Tasks

```gleam
import gleam/json
import gabsurd/task

let assert Ok(info) =
  task.spawn(
    db,
    "emails",
    "send_welcome",
    json.object([#("to", json.string("user@example.com"))]),
    task.new_options() |> task.with_max_attempts(3),
  )
```

### 5. Start a Worker

```gleam
import gleam/json
import gleam/option
import gabsurd/worker.{Complete, Handler}

let email_handler = Handler(
  task_name: "send_welcome",
  // The context is unused in this minimal handler, hence the underscore
  execute: fn(_ctx) {
    // Process the task...
    Complete(json.object([#("sent", json.bool(True))]))
  },
  on_error: option.None,
)

let config =
  worker.new(db, "emails", [email_handler])
  |> worker.with_poll_interval(1000)
  |> worker.with_batch_size(10)

// Start a single worker
let assert Ok(started) = worker.start(config)
```

### 6. Worker Pool with Supervisor

```gleam
import gleam/list
import gleam/otp/static_supervisor
import gabsurd/worker

// pool_child_specs returns a List(ChildSpecification), one entry per worker
let pool = worker.pool_child_specs("email_workers", config, 4)

// Add every worker spec to a one-for-one supervisor, then start it
let assert Ok(sup) =
  pool
  |> list.fold(
    static_supervisor.new(static_supervisor.OneForOne),
    static_supervisor.add,
  )
  |> static_supervisor.start()
```

## API Reference

### `gabsurd/client`

Database connection management.

| Function | Description |
|----------|-------------|
| `start(url)` | Connect to PostgreSQL, returns `StartResult(Db)` |

### `gabsurd/queue`

Queue lifecycle management.

| Function | Description |
|----------|-------------|
| `create(db, name)` | Create an unpartitioned queue |
| `create_with_mode(db, name, mode)` | Create a queue with an explicit storage mode |
| `drop(db, name)` | Drop a queue and all its data |
| `list(db)` | List all queue names |

### `gabsurd/task`

Task lifecycle operations.

| Function | Description |
|----------|-------------|
| `spawn(db, queue, name, params, options)` | Create a new task |
| `claim(db, queue, worker_id, timeout, qty)` | Claim available tasks |
| `complete(db, queue, run_id, state)` | Mark a run as completed |
| `fail(db, queue, run_id, reason)` | Fail a run (uses queue retry policy) |
| `fail_with_retry(db, queue, run_id, reason, retry_at)` | Fail a run and schedule retry |
| `cancel(db, queue, task_id)` | Cancel a task |
| `get_result(db, queue, task_id)` | Get task result as `TaskResult` record |
| `retry(db, queue, task_id, options)` | Retry a failed task |
| `extend_claim(db, queue, run_id, extend_by)` | Manually extend a claim lease (heartbeat) |
| `schedule_run(db, queue, run_id, defer_seconds)` | Reschedule a run for future execution |
| `new_options()` | Create empty spawn options |
| `with_max_attempts(options, n)` | Set max attempts |
| `with_retry_strategy(options, strategy)` | Set retry strategy |
| `with_cancellation(options, policy)` | Set cancellation policy |
| `with_headers(options, json)` | Set headers metadata |
| `with_idempotency_key(options, key)` | Prevent duplicate spawning |

### `gabsurd/event`

Event coordination for workflows.

| Function | Description |
|----------|-------------|
| `emit(db, queue, event_name, payload)` | Emit an event |
| `await(db, queue, task_id, run_id, step, event, timeout)` | Await an event |

### `gabsurd/checkpoint`

Workflow checkpoint persistence.

| Function | Description |
|----------|-------------|
| `set(db, queue, task_id, step, state, run_id, extend_claim_by)` | Save a checkpoint (pass `claim_timeout` to extend lease) |
| `get(db, queue, task_id, step, include_pending)` | Retrieve a checkpoint, returns `Result(Option(Checkpoint), Error)` |

### `gabsurd/worker`

OTP worker actor for task processing.

| Type | Description |
|------|-------------|
| `Handler(task_name, execute, on_error)` | Handler for a specific task type |
| `HandlerResult` | Return type: `Complete(json)`, `Fail(json)`, or `Suspend` |
| `Config` | Worker configuration |
| `Worker` | Running worker handle |

| Function | Description |
|----------|-------------|
| `new(db, queue, handlers)` | Create config with defaults |
| `with_poll_interval(config, ms)` | Set poll interval (default: 5000ms) |
| `with_batch_size(config, n)` | Set tasks claimed per poll (default: 1) |
| `with_claim_timeout(config, secs)` | Set claim timeout (default: 30s) |
| `with_worker_id(config, id)` | Set worker ID |
| `with_max_backoff(config, ms)` | Set max error backoff (default: 60000ms) |
| `start(config)` | Start a worker actor |
| `stop(worker)` | Stop a worker gracefully |
| `child_spec(name, config)` | Create supervisor child spec |
| `pool_child_specs(name, config, count)` | Create N child specs for a pool |

### `gabsurd/context`

Execution context passed to worker handlers.

| Type | Description |
|------|-------------|
| `Context` | Encapsulates db, queue, claim, claim_timeout |
| `EventResult` | `Received(String)` or `Suspended` |

| Function | Description |
|----------|-------------|
| `task_id(ctx)` | Task's unique identifier |
| `run_id(ctx)` | Current run's unique identifier |
| `params(ctx)` | Task parameters as raw JSON string |
| `task_name(ctx)` | Task name |
| `attempt(ctx)` | Current attempt number |
| `step(ctx, name, decoder, run)` | Idempotent step — skips if checkpoint exists |
| `get_checkpoint(ctx, name)` | Get raw JSON state for a step |
| `set_checkpoint(ctx, name, state)` | Persist a checkpoint and extend lease |
| `heartbeat(ctx)` | Extend the claim lease |
| `await_event(ctx, event_name, timeout)` | Await an event, returns `EventResult` |

### `gabsurd/utility`

Maintenance and introspection.

| Function | Description |
|----------|-------------|
| `cleanup_queue(db, queue)` | Clean up completed/failed tasks |
| `get_schema_version(db)` | Get the installed schema version |

## Architecture

```
┌─────────────────────────────────────┐
│           Your Application          │
├─────────────────────────────────────┤
│   worker.gleam   │ task.gleam       │  ← SDK Layer (hand-written)
│   context.gleam  │ queue.gleam      │
│   event.gleam    │ checkpoint.gleam │
│   utility.gleam  │                  │
├─────────────────────────────────────┤
│          sql.gleam (generated)      │  ← Parrot codegen
├─────────────────────────────────────┤
│   param.gleam    │ client.gleam     │  ← pog driver adapter
├─────────────────────────────────────┤
│        PostgreSQL + absurd.sql      │  ← Database
└─────────────────────────────────────┘
```

## Worker Design

Workers use the **Handler Record** pattern — each task type gets a `Handler` with:
- `task_name`: Which tasks this handler processes
- `execute`: The business logic — `fn(Context) -> HandlerResult`
- `on_error`: Optional hook for logging/metrics when execute returns `Fail`

The handler receives a `Context` that encapsulates the database connection, queue,
claim details, and claim timeout. It provides `context.step(ctx, ...)` for
idempotent steps and `context.await_event(ctx, ...)` for event-driven workflows.

The `HandlerResult` type has three variants:
- `Complete(json)` — mark the task as successfully done
- `Fail(json)` — mark the task as failed (triggers retry if attempts remain)
- `Suspend` — the task is waiting for an event; skip complete/fail

The worker polls the queue using `process.send_after`, claims tasks, constructs
a `Context`, dispatches to matching handlers by `task_name`, and handles the result.
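The result-handling step can be modelled as a pure function. This is a simplified sketch, not gabsurd's code: `Outcome` mirrors `HandlerResult` but carries `String` payloads instead of `json.Json` so it needs only the Gleam standard library, and `Action` and `action_for` are hypothetical names.

```gleam
/// Simplified stand-in for worker.HandlerResult (String instead of json.Json).
pub type Outcome {
  Complete(String)
  Fail(String)
  Suspend
}

/// Hypothetical description of what the worker does after a handler returns.
pub type Action {
  CompleteRun(state: String)
  FailRun(reason: String)
  LeaveRun
}

pub fn action_for(outcome: Outcome) -> Action {
  case outcome {
    // Mark the run completed with the handler's state
    Complete(state) -> CompleteRun(state)
    // Fail the run; whether it retries is left to the retry policy
    Fail(reason) -> FailRun(reason)
    // Suspended runs wait for event.emit, so neither complete nor fail is called
    Suspend -> LeaveRun
  }
}
```

Keeping the mapping total over all three variants means the compiler flags any new `HandlerResult` variant that the worker forgets to handle.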

### Idempotent Steps

`context.step(ctx, name, decoder, run)` is the primary building block for durable workflows:
- On first execution: calls `run()`, persists the result as a checkpoint, extends the lease
- On retry: finds the checkpoint, decodes the stored value, skips execution
- `decoder` is a `gleam/dynamic/decode.Decoder(a)` — Gleam's `json.Json` is write-only,
  so a decoder is required to recover typed values from the database
- For steps that don't need a return value, use `decode.success(Nil)`
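To make the replay behaviour concrete, here is a toy in-memory model of it. It is not gabsurd's implementation: a `Dict(String, String)` stands in for Absurd's checkpoint table, values stay as strings so no decoder is needed, and `Store` and `model_step` are hypothetical names.

```gleam
import gleam/dict.{type Dict}

/// Hypothetical stand-in for the checkpoint table: step name -> stored state.
pub type Store =
  Dict(String, String)

/// Toy model of context.step: if a checkpoint exists for `name`, return the
/// stored value without calling `run`; otherwise call `run`, store its result,
/// and return it. The Bool reports whether `run` actually executed.
pub fn model_step(
  store: Store,
  name: String,
  run: fn() -> String,
) -> #(Store, String, Bool) {
  case dict.get(store, name) {
    // Replay: checkpoint found, so the side effect is skipped
    Ok(saved) -> #(store, saved, False)
    // First execution: run the step and persist its result
    Error(Nil) -> {
      let value = run()
      #(dict.insert(store, name, value), value, True)
    }
  }
}
```

On a second attempt the stored value wins even if `run` would now produce something different, which is exactly what makes a step like "charge" safe to retry.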

### Unknown Task Deferral

Tasks with no registered handler are deferred (rescheduled with a delay) rather than
failed. This supports rolling deployments where a new task type may arrive before
its handler code is deployed.
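Handler lookup with the deferral fallback can be sketched as a pure function. `SimpleHandler`, `Dispatch`, and the `defer_seconds` argument are hypothetical; in the real worker the deferral is a reschedule with a delay, plausibly along the lines of `task.schedule_run`, but that is an assumption.

```gleam
import gleam/list

/// Hypothetical, simplified handler: only the name matters for dispatch.
pub type SimpleHandler {
  SimpleHandler(task_name: String, execute: fn(String) -> String)
}

pub type Dispatch {
  /// A handler matched; carries the handler's output
  Ran(output: String)
  /// No handler registered: reschedule instead of failing
  Deferred(task_name: String, delay_seconds: Int)
}

/// Toy dispatcher; `defer_seconds` is an assumed parameter, not a
/// documented gabsurd setting.
pub fn dispatch(
  handlers: List(SimpleHandler),
  task_name: String,
  params: String,
  defer_seconds: Int,
) -> Dispatch {
  case list.find(handlers, fn(h) { h.task_name == task_name }) {
    Ok(handler) -> Ran(handler.execute(params))
    Error(Nil) -> Deferred(task_name, defer_seconds)
  }
}
```

Because an unknown task is deferred rather than failed, it does not consume a retry attempt while the new handler is still rolling out.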

### Error Backoff

On transient claim errors, the worker backs off exponentially up to `max_backoff`
(default 60s), resetting on success.
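A minimal sketch of capped exponential backoff with reset. Only the cap (`max_backoff`) comes from the description above; the starting delay (`base_ms`) and the doubling factor are assumptions, and `update_backoff` is a hypothetical helper, not part of `gabsurd/worker`.

```gleam
import gleam/int

/// Next backoff in milliseconds after a claim attempt.
/// current_ms = 0 means "not currently backing off".
pub fn update_backoff(
  current_ms: Int,
  claim_ok: Bool,
  base_ms: Int,
  max_backoff_ms: Int,
) -> Int {
  case claim_ok, current_ms {
    // Success resets the backoff
    True, _ -> 0
    // First error: start from the (assumed) base delay
    False, 0 -> int.min(base_ms, max_backoff_ms)
    // Repeated errors: double, never exceeding the cap
    False, _ -> int.min(current_ms * 2, max_backoff_ms)
  }
}
```

With a 1000 ms base and the default 60000 ms cap, consecutive failures give 1000, 2000, 4000, and so on, until the delay is pinned at 60000; the next successful claim drops it back to 0.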

### Worker Pools

Use `pool_child_specs` to generate N workers with globally unique IDs and run them inside a `static_supervisor`.

## Examples

### Multi-Step Workflow with Checkpoints

Durable workflows survive crashes. Each step uses `context.step`, so on retry the completed steps are skipped. Each completed step also extends the worker's claim lease, so the task keeps its claim as long as no single step outlasts the claim timeout.

```gleam
import gleam/dynamic/decode
import gleam/json
import gleam/option
import gleam/result
import gabsurd/client
import gabsurd/context
import gabsurd/worker.{Complete, Fail, Handler}

pub fn process_order_handler() {
  Handler(
    task_name: "process_order",
    execute: fn(ctx) {
      case order_workflow(ctx) {
        Ok(Nil) -> Complete(json.object([#("status", json.string("completed"))]))
        Error(e) -> Fail(json.string(error_to_string(e)))
      }
    },
    on_error: option.None,
  )
}

// decode_params, charge_card, reserve_inventory, send_confirmation and
// error_to_string are application-defined helpers.
fn order_workflow(ctx: context.Context) -> Result(Nil, client.GabsurdError) {
  let params = decode_params(context.params(ctx))

  // Step 1: Charge the credit card (skipped if its checkpoint exists)
  use _ <- result.try(context.step(ctx, "charge", decode.success(Nil), fn() {
    charge_card(params)
    json.null()
  }))

  // Step 2: Reserve inventory (skipped if already done)
  use _ <- result.try(context.step(ctx, "reserve", decode.success(Nil), fn() {
    reserve_inventory(params)
    json.null()
  }))

  // Step 3: Send the confirmation
  use _ <- result.try(context.step(ctx, "notify", decode.success(Nil), fn() {
    send_confirmation(params)
    json.null()
  }))

  Ok(Nil)
}
```

### Steps That Pass Data Forward

```gleam
// Same imports as the previous example; decode_params, charge_card and
// capture_charge are application-defined helpers.
fn order_workflow(ctx: context.Context) -> Result(Nil, client.GabsurdError) {
  // Charge the card and checkpoint the resulting charge_id
  use charge_id <- result.try(
    context.step(ctx, "charge", charge_id_decoder(), fn() {
      let charge = charge_card(decode_params(context.params(ctx)))
      json.object([#("charge_id", json.string(charge.id))])
    }),
  )

  // On retry, charge_id is decoded from the checkpoint instead of re-charging
  use _ <- result.try(context.step(ctx, "capture", decode.success(Nil), fn() {
    capture_charge(charge_id)
    json.null()
  }))

  Ok(Nil)
}

// Decoder for the checkpointed {"charge_id": ...} object, using the
// continuation-based decode.field API:
fn charge_id_decoder() -> decode.Decoder(String) {
  use charge_id <- decode.field("charge_id", decode.string)
  decode.success(charge_id)
}
```

### Event-Driven Workflow

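Tasks can suspend while waiting for an external event (e.g., a webhook callback).
Call `context.await_event` and return `Suspend` when it reports `Suspended`.
When `event.emit` is called with the awaited event name, the task wakes up and the
handler runs again from the top; steps that already completed are skipped via their checkpoints.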

```gleam
import gleam/dynamic/decode
import gleam/json
import gleam/option
import gabsurd/context.{Received, Suspended}
import gabsurd/worker.{Complete, Fail, Handler, Suspend}

// start_remote_job is an application-defined helper.
pub fn generate_report_handler() {
  Handler(
    task_name: "generate_report",
    execute: fn(ctx) {
      // Start the remote job (idempotent: skipped if its checkpoint exists)
      let started =
        context.step(ctx, "start", job_id_decoder(), fn() {
          let id = start_remote_job(context.params(ctx))
          json.object([#("job_id", json.string(id))])
        })

      // execute returns a HandlerResult, so match on the step's Result
      // instead of using `use ... <- result.try`
      case started {
        Error(_) -> Fail(json.string("failed to start remote job"))
        // Wait for the remote service to POST back
        Ok(job_id) ->
          case context.await_event(ctx, "report_complete_" <> job_id, 3600) {
            Ok(Received(payload)) -> Complete(json.string(payload))
            Ok(Suspended) -> Suspend
            Error(_) -> Fail(json.string("event error"))
          }
      }
    },
    on_error: option.None,
  )
}

// Decoder for the job_id stored in the checkpoint:
fn job_id_decoder() -> decode.Decoder(String) {
  use job_id <- decode.field("job_id", decode.string)
  decode.success(job_id)
}
```

Then in your webhook HTTP handler (separate process):

```gleam
import gleam/json
import gabsurd/event

// When the remote service calls back (db, job_id and download_url come
// from your webhook handler):
let assert Ok(Nil) = event.emit(
  db, "reports",
  "report_complete_" <> job_id,  // must match the event name passed to await_event
  json.object([#("url", json.string(download_url))]),
)
```

### Idempotent Task Spawning

Use `with_idempotency_key` to prevent duplicate tasks — critical for
payment processing where double-spawning means double-charging.

```gleam
let assert Ok(info) =
  task.spawn(
    db, "payments", "charge_card",
    json.object([#("amount", json.int(9999))]),
    task.new_options()
    |> task.with_idempotency_key("order-" <> order_id),
  )
// If the caller crashes and retries, the same task is returned
// (info.created will be False on subsequent calls)
```

### Retry Strategies

```gleam
// Exponential backoff for flaky APIs
let assert Ok(_) =
  task.spawn(
    db, "integrations", "sync_crm",
    json.object([#("contact_id", json.string(id))]),
    task.new_options()
    |> task.with_max_attempts(5)
    |> task.with_retry_strategy(task.ExponentialRetry(
      base_seconds: 10, factor: 2.0, max_seconds: 300.0,
    )),
  )
// Retries at ~10s, 20s, 40s, 80s, capped at 300s
```
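The `~10s, 20s, 40s, 80s` comment is consistent with delay = base_seconds × factor^(n − 1), capped at max_seconds, where n counts retries (four retries for `with_max_attempts(5)`). The sketch below computes that schedule under this assumed formula; Absurd's own calculation may differ (for example, by adding jitter), and `retry_delay` and `retry_schedule` are hypothetical helpers, not part of `gabsurd/task`.

```gleam
import gleam/float
import gleam/int
import gleam/list

/// Assumed delay before retry `n` (1-based): base * factor^(n - 1), capped.
pub fn retry_delay(
  base_seconds: Int,
  factor: Float,
  max_seconds: Float,
  n: Int,
) -> Float {
  // With a positive factor and a whole exponent, float.power cannot fail
  let assert Ok(growth) = float.power(factor, int.to_float(n - 1))
  float.min(int.to_float(base_seconds) *. growth, max_seconds)
}

/// Delays for every retry permitted by max_attempts (attempt 1 is not a retry).
pub fn retry_schedule(
  base_seconds: Int,
  factor: Float,
  max_seconds: Float,
  max_attempts: Int,
) -> List(Float) {
  count_up(1, max_attempts - 1)
  |> list.map(fn(n) { retry_delay(base_seconds, factor, max_seconds, n) })
}

// Inclusive integer range [from, to]; empty when from > to.
fn count_up(from: Int, to: Int) -> List(Int) {
  case from > to {
    True -> []
    False -> [from, ..count_up(from + 1, to)]
  }
}
```

For example, `retry_schedule(10, 2.0, 300.0, 5)` evaluates to `[10.0, 20.0, 40.0, 80.0]`; the 300-second cap only starts to matter from the sixth retry onward.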

### Long-Running Tasks with Manual Heartbeats

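For long-running tasks that don't use checkpoints, call `context.heartbeat` periodically so the claim lease does not expire:

```gleam
import gleam/json
import gleam/list
import gleam/option
import gabsurd/context
import gabsurd/worker.{Complete, Handler}

// load_batches and process_batch are application-defined helpers.
pub fn batch_import_handler() {
  Handler(
    task_name: "batch_import",
    execute: fn(ctx) {
      let batches = load_batches(context.params(ctx))
      list.each(batches, fn(batch) {
        process_batch(batch)
        // Keep the lease alive after each batch
        let _ = context.heartbeat(ctx)
      })
      Complete(json.object([#("batches", json.int(list.length(batches)))]))
    },
    on_error: option.None,
  )
}
```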

## Development

```bash
# Start PostgreSQL
bash bin/postgres.sh

# Reset database
bash bin/reset_db.sh

# Run tests
DATABASE_URL="postgresql://gabsurd:gabsurd@127.0.0.1:5432/gabsurd" gleam test

# Regenerate SQL (after changing queries.sql)
DATABASE_URL="..." PATH="$(nix eval --raw 'nixpkgs#postgresql_17.out')/bin:$PATH" gleam run -m parrot
```

## License

Apache-2.0