# Shigoto 仕事
PostgreSQL-backed background job processing for Erlang.
Shigoto (仕事, "work") is a feature-rich job queue built on PostgreSQL's `FOR UPDATE SKIP LOCKED` for safe multi-node operation. No Redis or external broker needed — if you have PostgreSQL, you have a job queue.
## Features
### Core
- **PostgreSQL-backed** — Jobs stored in PostgreSQL via [pgo](https://github.com/erleans/pgo). Transactional enqueue using the same pool as your application.
- **Multi-node safe** — `FOR UPDATE SKIP LOCKED` ensures each job is claimed by exactly one node. No coordination required.
- **Priority queues** — Higher priority jobs are claimed first. Multiple queues with independent concurrency limits.
- **Dynamic queues** — Add and remove queues at runtime without restart.
- **Exponential backoff** — Failed jobs retry with `min(attempt^4 + jitter, 1800)` second delays, or provide a custom `backoff/2` callback.
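
The default retry delay can be sketched as a pure function. This is an illustration, not Shigoto's code: the module name `backoff_sketch` is hypothetical. The formula above does not say how large the jitter is, so the sketch assumes a uniform random jitter between 0 and `Attempt^2` seconds.

```erlang
-module(backoff_sketch).
-export([delay/1, delay/2]).

%% Upper bound on any retry delay, in seconds.
-define(MAX_DELAY, 1800).

%% delay(Attempt, Jitter): deterministic form of min(Attempt^4 + Jitter, 1800).
-spec delay(pos_integer(), non_neg_integer()) -> pos_integer().
delay(Attempt, Jitter) when is_integer(Attempt), Attempt >= 1,
                            is_integer(Jitter), Jitter >= 0 ->
    min(Attempt * Attempt * Attempt * Attempt + Jitter, ?MAX_DELAY).

%% delay(Attempt): draws the jitter uniformly from 0..Attempt^2.
%% ASSUMPTION: the jitter range is not documented; this is illustrative only.
-spec delay(pos_integer()) -> pos_integer().
delay(Attempt) ->
    Jitter = rand:uniform(Attempt * Attempt + 1) - 1,
    delay(Attempt, Jitter).
```

Before jitter the delays are 1, 16, 81, 256, 625, and 1296 seconds for attempts 1 through 6; from attempt 7 onward (7^4 = 2401) every delay is capped at 1800 seconds. The jitter spreads out retries of jobs that failed at the same moment so they do not all come back at once.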
### Scheduling
- **Cron scheduling** — Built-in 5-field cron parser with leader election via advisory locks. Catches up on missed intervals after restarts.
- **Scheduled jobs** — Enqueue jobs for future execution with `scheduled_at`.
- **Job snoozing** — Workers can return `{snooze, Seconds}` to reschedule without consuming a retry attempt.
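
To show what a 5-field cron expression (minute, hour, day of month, month, day of week) means, here is a small matcher. It is a hypothetical sketch, not Shigoto's parser: `cron_field_sketch` is an invented module, and it supports only `*`, single values, ranges, steps, and comma lists. It also simplifies one rule: classic cron ORs day-of-month and day-of-week when both are restricted, while this sketch requires both to match.

```erlang
-module(cron_field_sketch).
-export([expand/3, matches/2]).

%% expand(Field, Min, Max) -> sorted list of values the field allows.
%% Supports "*", "*/Step", "A-B", "A-B/Step", "N" and comma lists.
%% Invalid or out-of-range input crashes with badmatch.
-spec expand(binary(), integer(), integer()) -> [integer()].
expand(Field, Min, Max) ->
    Parts = binary:split(Field, <<",">>, [global]),
    lists:usort(lists:flatmap(fun(P) -> expand_part(P, Min, Max) end, Parts)).

expand_part(Part, Min, Max) ->
    {Range, Step} =
        case binary:split(Part, <<"/">>) of
            [R, S] -> {R, binary_to_integer(S)};
            [R] -> {R, 1}
        end,
    {From, To} =
        case binary:split(Range, <<"-">>) of
            [<<"*">>] -> {Min, Max};
            [A, B] -> {binary_to_integer(A), binary_to_integer(B)};
            [N] -> V = binary_to_integer(N), {V, V}
        end,
    true = From >= Min andalso To =< Max andalso From =< To andalso Step >= 1,
    lists:seq(From, To, Step).

%% matches(Expr, DateTime): true if DateTime falls on the schedule.
%% Seconds are ignored. Day-of-month and day-of-week are ANDed (simplification).
-spec matches(binary(), calendar:datetime()) -> boolean().
matches(Expr, {{Y, Mo, D}, {H, Mi, _S}}) ->
    [MinF, HourF, DomF, MonF, DowF] = binary:split(Expr, <<" ">>, [global, trim_all]),
    %% calendar uses 1 = Monday .. 7 = Sunday; cron uses 0 = Sunday.
    Dow = calendar:day_of_the_week(Y, Mo, D) rem 7,
    lists:member(Mi, expand(MinF, 0, 59)) andalso
        lists:member(H, expand(HourF, 0, 23)) andalso
        lists:member(D, expand(DomF, 1, 31)) andalso
        lists:member(Mo, expand(MonF, 1, 12)) andalso
        lists:member(Dow, expand(DowF, 0, 6)).
```

For example, the `<<"0 3 * * *">>` schedule from the Quick Start configuration matches 03:00 every day, and `matches(<<"0 9 * * 1-5">>, {{2026, 3, 20}, {9, 0, 0}})` returns `true` because 20 March 2026 is a Friday.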
### Reliability
- **Job dependencies** — `depends_on` chains ensure jobs execute in order. Cycle detection prevents deadlocks.
- **Batches** — Group jobs with completion callbacks. Batch state tracks completed/discarded counts.
- **Unique jobs** — Prevent duplicates with configurable keys, states, time windows, debounce, and field replacement on conflict.
- **Stale job rescue** — Heartbeat-based detection of zombie jobs with automatic rescheduling.
- **Graceful shutdown** — Waits for in-flight jobs to finish before stopping.
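
A cycle check over `depends_on` can be done as a depth-first search over the dependency graph: a cycle exists exactly when the search reaches a job that is still on the current search path. The sketch below is a generic illustration of that technique, not Shigoto's implementation; the module `dep_cycle_sketch` and its map-of-lists input format are hypothetical.

```erlang
-module(dep_cycle_sketch).
-export([has_cycle/1]).

%% Deps maps each job id to the list of job ids it depends on.
-spec has_cycle(#{term() => [term()]}) -> boolean().
has_cycle(Deps) ->
    try
        _ = lists:foldl(fun(Node, Done) -> visit(Node, Deps, #{}, Done) end,
                        #{}, maps:keys(Deps)),
        false
    catch
        throw:cycle -> true
    end.

%% Path: nodes on the current DFS stack (reaching one again means a cycle).
%% Done: nodes whose dependencies were already fully explored.
visit(Node, Deps, Path, Done) ->
    case {maps:is_key(Node, Path), maps:is_key(Node, Done)} of
        {true, _} ->
            throw(cycle);
        {false, true} ->
            Done;
        {false, false} ->
            Path1 = Path#{Node => true},
            Done1 = lists:foldl(fun(Dep, Acc) -> visit(Dep, Deps, Path1, Acc) end,
                                Done, maps:get(Node, Deps, [])),
            Done1#{Node => true}
    end.
```

A job that (directly or indirectly) depends on itself could never become runnable, which is the deadlock such a check guards against. Shared dependencies, as in a diamond shape, are not cycles and pass the check.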
### Resilience ([seki](https://github.com/Taure/seki) integration)
- **Rate limiting** — Per-worker token bucket, sliding window, GCRA, or leaky bucket.
- **Circuit breaking** — Auto-opens on repeated failures, configurable per worker.
- **Bulkhead** — Per-worker concurrency limits (local node) and global concurrency limits (across nodes via PostgreSQL).
- **Load shedding** — CoDel-based system-level protection, shedding low-priority jobs first.
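
Seki provides the actual rate limiters; the sketch below only illustrates the token bucket idea in plain Erlang and does not use seki's API. `token_bucket_sketch` is a hypothetical module, and the caller passes the current time in milliseconds so the behavior is deterministic.

```erlang
-module(token_bucket_sketch).
-export([new/3, take/2]).

%% new(Capacity, RatePerSec, NowMs): a full bucket that refills at
%% RatePerSec tokens per second and never holds more than Capacity.
new(Capacity, RatePerSec, NowMs) ->
    #{capacity => Capacity, rate => RatePerSec, tokens => Capacity, last_ms => NowMs}.

%% take(Bucket, NowMs) -> {ok, Bucket1} | {deny, Bucket1}.
%% First refill for the time elapsed since the last call, then spend one
%% token if at least one is available.
take(#{capacity := Cap, rate := Rate, tokens := Tokens, last_ms := Last} = Bucket, NowMs) ->
    Elapsed = max(NowMs - Last, 0),
    Available = min(Cap, Tokens + Elapsed * Rate / 1000),
    case Available >= 1 of
        true -> {ok, Bucket#{tokens := Available - 1, last_ms := NowMs}};
        false -> {deny, Bucket#{tokens := Available, last_ms := NowMs}}
    end.
```

A bucket with capacity 2 and a refill rate of 1 token per second admits a burst of two jobs, then roughly one job per second after that.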
### Operations
- **Encryption** — AES-256-GCM encryption for job args at rest with key rotation support.
- **Middleware** — Composable before/after hooks for logging, metrics, authorization.
- **Telemetry** — 16+ telemetry events covering job lifecycle, queue operations, resilience, batches, and cron.
- **Health check** — `shigoto:health/0` reports pool status, job counts, stale jobs, and queue health.
- **Bulk operations** — `insert_all/1`, `cancel_by/2`, `retry_by/2` for batch operations.
- **Auto-archival** — Old jobs archived to `shigoto_jobs_archive` table, then pruned.
- **Dashboard** — [shigoto_board](https://github.com/Taure/shigoto_board) provides a real-time web dashboard.
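
AES-256-GCM encryption with key rotation can be sketched with OTP's `crypto:crypto_one_time_aead` functions. Only the `crypto` calls are real APIs here: the module name, the envelope layout (IV, then tag, then ciphertext), the associated data, and the use of `term_to_binary/1` for serialization are assumptions and need not match how Shigoto stores encrypted args. Decryption tries each key in order, newest first, mirroring the `encryption_keys` option.

```erlang
-module(args_crypto_sketch).
-export([encrypt/2, decrypt/2]).

-define(IV_BYTES, 12).          %% 96-bit IV, the recommended size for GCM
-define(TAG_BYTES, 16).         %% full 128-bit authentication tag
-define(AAD, <<"job_args">>).   %% hypothetical associated data

%% encrypt(Key, Args): Key must be a 32-byte binary (AES-256).
%% Returns a hypothetical envelope <<IV, Tag, CipherText>>.
-spec encrypt(<<_:256>>, term()) -> binary().
encrypt(Key, Args) when byte_size(Key) =:= 32 ->
    IV = crypto:strong_rand_bytes(?IV_BYTES),  %% fresh IV for every message
    {CipherText, Tag} =
        crypto:crypto_one_time_aead(aes_256_gcm, Key, IV, term_to_binary(Args),
                                    ?AAD, ?TAG_BYTES, true),
    <<IV/binary, Tag/binary, CipherText/binary>>.

%% decrypt(Keys, Envelope): tries each key (newest first) and returns the
%% first successful decryption.
-spec decrypt([binary()], binary()) -> {ok, term()} | {error, no_matching_key}.
decrypt([], _Envelope) ->
    {error, no_matching_key};
decrypt([Key | Older],
        <<IV:?IV_BYTES/binary, Tag:?TAG_BYTES/binary, CipherText/binary>> = Envelope) ->
    case crypto:crypto_one_time_aead(aes_256_gcm, Key, IV, CipherText, ?AAD, Tag, false) of
        error -> decrypt(Older, Envelope);  %% tag check failed: try the next key
        Plain -> {ok, binary_to_term(Plain, [safe])}
    end.
```

A fresh random IV is generated for every encryption, since reusing an IV with the same key breaks GCM's guarantees. After a rotation, args written under an older key still decrypt as long as that key remains in the list.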
### Testing
- **Synchronous drain** — `drain_queue/1` processes all jobs synchronously for deterministic tests.
- **74 tests** — Comprehensive test coverage across 3 Common Test suites.
## Quick Start
Add to your deps:
```erlang
{deps, [
    {shigoto, {git, "https://github.com/Taure/shigoto.git", {branch, "main"}}}
]}.
```
Configure in `sys.config`:
```erlang
{shigoto, [
    {pool, my_app_db},
    {queues, [{<<"default">>, 10}, {<<"emails">>, 5}]},
    {poll_interval, 5000},
    {cron, [
        {<<"daily_cleanup">>, <<"0 3 * * *">>, my_cleanup_worker, #{}}
    ]}
]}
```
Run the migration:
```erlang
shigoto_migration:up(my_app_db).
```
Define a worker:
```erlang
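-module(my_email_worker).
-behaviour(shigoto_worker).
-export([perform/1]).

perform(#{<<"to">> := To, <<"subject">> := Subject}) ->
    send_email(To, Subject),
    ok.

%% Placeholder: replace with your application's mail client.
send_email(To, Subject) ->
    logger:info("Sending ~ts to ~ts", [Subject, To]).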
```
Enqueue jobs:
```erlang
%% Simple insert
shigoto:insert(#{
    worker => my_email_worker,
    args => #{<<"to">> => <<"user@example.com">>, <<"subject">> => <<"Welcome">>}
}).

%% Scheduled for later
shigoto:insert(#{
    worker => my_cleanup_worker,
    scheduled_at => {{2026, 3, 20}, {3, 0, 0}}
}).

%% With priority and queue. The <<"critical">> queue must be running:
%% list it under `queues` in sys.config or add it at runtime.
shigoto:insert(#{
    worker => my_urgent_worker,
    args => #{},
    priority => 10,
    queue => <<"critical">>
}).
```
## Job Lifecycle
```
available → executing → completed
                     ↘ retryable → available (retry with backoff)
                     ↘ discarded (max attempts reached)
                     ↘ snoozed → available (rescheduled, attempt preserved)
```
Jobs can also be `cancelled` via `shigoto:cancel/2` and retried via `shigoto:retry/2`.
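
The branching out of `executing` follows from what `perform/1` returns. The mapping can be sketched as a pure function; `lifecycle_sketch` is a hypothetical module, not part of Shigoto, and it assumes `Attempt` is the 1-based number of the run that just finished.

```erlang
-module(lifecycle_sketch).
-export([next_state/3]).

%% next_state(Result, Attempt, MaxAttempts) -> next job state.
-spec next_state(ok | {error, term()} | {snooze, pos_integer()},
                 pos_integer(), pos_integer()) ->
          completed | retryable | discarded | snoozed.
next_state(ok, _Attempt, _Max) ->
    completed;
next_state({snooze, _Seconds}, _Attempt, _Max) ->
    %% Snoozing does not consume an attempt, so it never discards the job.
    snoozed;
next_state({error, _Reason}, Attempt, Max) when Attempt >= Max ->
    discarded;
next_state({error, _Reason}, _Attempt, _Max) ->
    retryable.
```

Note that a snooze on the final attempt still leads to `snoozed` rather than `discarded`.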
## Worker Callbacks
All callbacks are optional except `perform/1`:

| Callback | Default | Description |
|----------|---------|-------------|
| `perform/1` | *required* | Execute the job. Return `ok`, `{error, Reason}`, or `{snooze, Seconds}` |
| `max_attempts/0` | `3` | Maximum retry attempts before discarding |
| `queue/0` | `<<"default">>` | Default queue name |
| `priority/0` | `0` | Default priority (higher = claimed first) |
| `timeout/0` | `300000` | Execution timeout in milliseconds |
| `unique/0` | — | Uniqueness constraints |
| `tags/0` | `[]` | Default tags for filtering |
| `backoff/2` | exponential | Custom retry delay: `(Attempt, Error) -> Seconds` |
| `rate_limit/0` | — | Seki rate limiter config |
| `concurrency/0` | — | Max concurrent executions per node (seki bulkhead) |
| `global_concurrency/0` | — | Max concurrent executions across all nodes |
| `circuit_breaker/0` | — | Per-worker circuit breaker thresholds |
| `middleware/0` | `[]` | Worker-specific middleware chain |
| `on_discard/2` | — | Called when a job is permanently discarded |
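
As an illustration of the optional callbacks, here is a hypothetical worker that overrides the retry policy and uses all three return values of `perform/1`. The module name, the `<<"report_id">>` argument, and the `fetch_report/1` helper are invented for this example, and the sketch assumes the `Attempt` passed to `backoff/2` starts at 1.

```erlang
-module(my_report_worker).
-behaviour(shigoto_worker).
-export([perform/1, max_attempts/0, queue/0, timeout/0, backoff/2]).

%% Retry up to 5 times before discarding (default is 3).
max_attempts() -> 5.

%% Run on the "emails" queue from the Quick Start configuration.
queue() -> <<"emails">>.

%% 60 seconds per execution instead of the 300000 ms default.
timeout() -> 60000.

%% Linear backoff: 30 s, 60 s, 90 s, ... capped at 600 s.
backoff(Attempt, _Error) -> min(Attempt * 30, 600).

perform(#{<<"report_id">> := Id}) ->
    case fetch_report(Id) of
        {ok, _Report} -> ok;
        not_ready -> {snooze, 60};          %% retry in 60 s, attempt preserved
        {error, Reason} -> {error, Reason}  %% failed attempt; backoff/2 applies
    end.

%% Hypothetical stand-in for a real data source.
fetch_report(Id) when is_integer(Id), Id > 0 -> {ok, #{id => Id}};
fetch_report(0) -> not_ready;
fetch_report(_) -> {error, invalid_id}.
```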
## Configuration
| Option | Default | Description |
|--------|---------|-------------|
| `pool` | *required* | pgo pool name |
| `queues` | `[{<<"default">>, 10}]` | Queue names and concurrency limits |
| `poll_interval` | `5000` | Milliseconds between polling |
| `cron` | `[]` | `{Name, Schedule, Worker, Args}` tuples |
| `prune_after_days` | `14` | Days to keep completed/discarded jobs |
| `shutdown_timeout` | `15000` | Milliseconds to wait for in-flight jobs |
| `middleware` | `[]` | Global middleware chain |
| `encryption_key` | — | 32-byte AES-256-GCM key |
| `encryption_keys` | `[]` | Ordered key list for rotation (newest first) |
| `heartbeat_interval` | `30000` | Milliseconds between heartbeats used for stale job detection |
| `load_shedding` | — | CoDel config map for seki |
| `queue_weights` | `#{}` | Weighted polling distribution |
| `fair_queues` | `[]` | Queues using partition-key fair claiming |
| `notifier` | — | LISTEN/NOTIFY connection config |
## Supervision Tree
```
shigoto_sup (one_for_one)
├─ shigoto_executor_sup — simple_one_for_one for job execution
├─ shigoto_queue_sup — one gen_server per queue
│ ├─ shigoto_queue:default
│ └─ shigoto_queue:emails
├─ shigoto_cron — cron scheduling with leader election
├─ shigoto_pruner — hourly archival and cleanup
├─ shigoto_heartbeat — periodic heartbeat updates
└─ shigoto_notifier — LISTEN/NOTIFY (optional)
```
## Guides
- [Getting Started](guides/getting-started.md)
- [Workers](guides/workers.md)
- [Cron Scheduling](guides/cron.md)
- [Batches](guides/batches.md)
- [Middleware](guides/middleware.md)
- [Resilience](guides/resilience.md)
- [Job Dependencies](guides/dependencies.md)
- [Encryption](guides/encryption.md)
- [Testing](guides/testing.md)
## Ecosystem
- [shigoto_board](https://github.com/Taure/shigoto_board) — Real-time Arizona LiveView dashboard
- [opentelemetry_shigoto](https://github.com/Taure/opentelemetry_shigoto) — OpenTelemetry instrumentation
## Requirements
- Erlang/OTP 27+
- PostgreSQL 9.5+
## License
MIT