# PgFlow
[Hex package](https://hex.pm/packages/pgflow) | [Documentation](https://hexdocs.pm/pgflow) | [License](https://github.com/agoodway/pgflow/blob/main/LICENSE)
> Workflows, background jobs and cron in Elixir and Postgres powered by PGMQ.
A native Elixir implementation of [pgflow](https://pgflow.dev) — a PostgreSQL-based workflow engine built on [pgmq](https://github.com/pgmq/pgmq). Define multi-step DAG workflows ("flows"), one-off background jobs ("jobs"), and scheduled cron jobs and flows — all backed by the same PostgreSQL queuing infrastructure with retries, visibility timeouts, and delivery guarantees. Built on OTP with supervised workers, adaptive backoff polling, and an optional LISTEN/NOTIFY strategy for low-latency task dispatch. Compatible with the TypeScript/Deno [pgflow](https://github.com/pgflow-dev/pgflow) project, sharing the same database schema and SQL functions.
## Why PgFlow?
- **No extra infrastructure** — Runs entirely in PostgreSQL using pgmq. No Redis, no external queue service.
- **Queryable state** — All workflow state lives in SQL tables. Debug with `SELECT * FROM pgflow.runs`.
- **Automatic retries** — Failed steps retry with exponential backoff. Only failed steps retry, not the whole workflow.
- **Parallel processing** — Steps run concurrently when dependencies allow. Fan-out with `map` for array processing.
- **Cross-language** — Same flows can be processed by Elixir or Deno (Supabase) workers side-by-side.
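
To make the retry bullet concrete, here is a minimal sketch of an exponential backoff schedule. The doubling formula, the `BackoffSketch` module, and the reading of `base_delay` as seconds are assumptions for illustration; the exact calculation is defined by pgflow's SQL functions and may differ.

```elixir
defmodule BackoffSketch do
  # ASSUMED formula (not taken from PgFlow's source):
  # delay before retry n = base_delay * 2^(n - 1)
  def delay(base_delay, attempt) when attempt >= 1 do
    base_delay * Integer.pow(2, attempt - 1)
  end

  # Delays preceding each retry. The first attempt runs immediately,
  # so `max_attempts` attempts involve `max_attempts - 1` waits.
  def schedule(base_delay, max_attempts) do
    for attempt <- 1..(max_attempts - 1)//1, do: delay(base_delay, attempt)
  end
end

IO.inspect(BackoffSketch.schedule(5, 3))
IO.inspect(BackoffSketch.schedule(10, 5))
```

Under these assumptions, `base_delay: 5` with `max_attempts: 3` would wait 5 and then 10 before the two retries.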
## Further Reading
- [COMPARISON.md](docs/COMPARISON.md) — PgFlow vs Oban, Broadway, Temporal, Inngest, and others
- [ELIXIR_VS_SUPABASE.md](docs/ELIXIR_VS_SUPABASE.md) — Elixir vs Deno/TypeScript (Supabase) implementation
- [ARCHITECTURE.md](docs/ARCHITECTURE.md) — OTP supervision tree, worker model, and internals
- [LIVE_CLIENT.md](docs/LIVE_CLIENT.md) — LiveView integration for real-time flow/job tracking
## Prerequisites
- Elixir 1.17+
- PostgreSQL 17+ with:
  - **pgmq**: pgflow's queue backbone. The `mix pgflow.gen.pgmq_migration` task installs it via SQL (works on Neon, self-hosted, and any plain Postgres). Skip this step if your environment already ships pgmq (e.g. Supabase projects or managed services where pgmq is pre-enabled).
  - **pg_cron** (only for cron-scheduled flows/jobs), which needs two server-level settings before `CREATE EXTENSION pg_cron` will succeed:
    - `shared_preload_libraries = 'pg_cron'` (requires a Postgres restart)
    - `cron.database_name = '<your_app_db>'` (pg_cron's metadata lives in exactly one database; defaults to `postgres`)

    Supported on Neon, AWS RDS (PG 12.5+), Aurora (PG 12.6+), and Supabase, each with its own setup path (parameter groups, API calls, etc.; check your host's docs). If your host doesn't support pg_cron, generate the extensions migration with `mix pgflow.gen.postgres_extensions_migration --no-cron`; cron-scheduled flows/jobs become unavailable, but the rest of pgflow works.
  - Standard extensions: `citext`, `pg_trgm`, `pgcrypto`.
- An Ecto repository
The provided Docker setup (Postgres 17) includes all extensions pre-configured.
## Installation
Add `pgflow` to your dependencies in `mix.exs`:
```elixir
def deps do
  [
    {:pgflow, "~> 0.1.0"}
  ]
end
```
Then fetch dependencies:
```bash
mix deps.get
```
## Quick Start
### 1. Database Setup
**For development**, use the provided Docker Compose which builds a pre-configured Postgres image:
```bash
docker compose up -d
```
This builds a Postgres 17 image with pgmq, pg_cron, and the pgflow schema pre-loaded. Database available at `localhost:54322` (user: `postgres`, password: `postgres`, database: `pgflow_test`).
**Resetting the database:** The pgflow schema is loaded by the Docker init script on first container creation only. If you drop the database (e.g. `mix ecto.reset`), you must re-apply it:
```bash
# Destroy the Docker volume and start fresh
docker compose down -v && docker compose up -d
```
**For your application**, generate consumer migrations using the setup tasks. Each writes one wrapper migration into your app's `priv/repo/migrations/`:
```bash
# 1. Install required Postgres extensions (citext, pg_trgm, pgcrypto, pg_cron).
# Pass `--no-cron` if pg_cron isn't available on your host.
mix pgflow.gen.postgres_extensions_migration
# 2. Install pgmq (unless your Postgres already provides it as an extension —
# e.g. Supabase. On most hosts pgmq isn't native; use this task).
mix pgflow.gen.pgmq_migration
# 3. Install the pgflow schema + Elixir helper functions. Add `--dashboard`
# to also install the LiveView dashboard schema. `--no-helpers` skips
# the Elixir-binding SQL helpers (only useful if you're using a
# different client).
mix pgflow.setup
# 4. Apply everything.
mix ecto.migrate
```
The generated `setup_pgflow.exs` migration just calls `PgFlow.Migration.up/0`
and `PgFlow.HelpersMigration.up/0` — new pgflow releases bump the vendored
SQL, not your migration list.
### 2. Define a Flow
```elixir
defmodule MyApp.Flows.ProcessOrder do
  use PgFlow.Flow

  @flow queue: :process_order, max_attempts: 3, base_delay: 5, timeout: 60

  # Root step: receives the flow input. `amount` is passed through so that
  # the next step can read it.
  step :validate do
    fn input, _ctx ->
      %{order_id: input["order_id"], amount: input["amount"], valid: true}
    end
  end

  # Dependent steps receive their dependencies' outputs, keyed by step name.
  step :charge_payment, depends_on: [:validate] do
    fn deps, _ctx ->
      %{charged: true, amount: deps["validate"]["amount"]}
    end
  end

  step :send_confirmation, depends_on: [:charge_payment] do
    fn _deps, _ctx ->
      %{sent: true}
    end
  end
end
```
See `PgFlow.Flow` moduledocs for the full DSL reference (step options, map macro, handler input, error handling).
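
The `depends_on` declarations above form a DAG, and steps whose dependencies are all complete can run at the same time. The sketch below groups steps into such "waves" in plain Elixir. It is illustrative only, not PgFlow's scheduler (which tracks step state in Postgres); the `DagSketch` module and the `:fetch_inventory` step are hypothetical.

```elixir
defmodule DagSketch do
  # `steps` maps each step name to the list of steps it depends on.
  # Returns a list of waves; steps within one wave have no unmet dependencies.
  def waves(steps) when is_map(steps), do: do_waves(steps, MapSet.new(), [])

  defp do_waves(pending, _done, acc) when map_size(pending) == 0 do
    Enum.reverse(acc)
  end

  defp do_waves(pending, done, acc) do
    # A step is ready once every one of its dependencies has completed.
    ready =
      pending
      |> Enum.filter(fn {_step, deps} -> Enum.all?(deps, &MapSet.member?(done, &1)) end)
      |> Enum.map(fn {step, _deps} -> step end)
      |> Enum.sort()

    if ready == [], do: raise(ArgumentError, "cycle or unknown dependency")

    do_waves(Map.drop(pending, ready), MapSet.union(done, MapSet.new(ready)), [ready | acc])
  end
end

steps = %{
  validate: [],
  fetch_inventory: [],
  charge_payment: [:validate],
  send_confirmation: [:charge_payment, :fetch_inventory]
}

IO.inspect(DagSketch.waves(steps))
# [[:fetch_inventory, :validate], [:charge_payment], [:send_confirmation]]
```

Here `:validate` and `:fetch_inventory` share the first wave because neither depends on anything, while `:send_confirmation` waits for both of its dependencies.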
### 3. Compile the Flow to Database
Before workers can process a flow, it must be "compiled" into the database. This creates the flow record, PGMQ queue, and step definitions:
```bash
mix pgflow.gen.flow_migration MyApp.Flows.ProcessOrder
mix ecto.migrate
```
> **Note:** If you start a worker for a flow that hasn't been compiled, you'll get a helpful error message with the exact command to run.
### 4. Configure and Start
```elixir
# config/config.exs
config :my_app, MyApp.PgFlow,
  repo: MyApp.Repo,
  flows: [MyApp.Flows.ProcessOrder],
  signal_strategy: :notify # use LISTEN/NOTIFY for low-latency (requires pgmq 1.8+)
```
All options have sensible defaults — only `repo` is required. See `PgFlow.Config` for the full list (concurrency, batch size, poll intervals, recovery, etc.).
```elixir
# lib/my_app/application.ex
def start(_type, _args) do
  children = [
    MyApp.Repo,
    {PgFlow, Application.fetch_env!(:my_app, MyApp.PgFlow)}
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end
```
### 5. Trigger a Flow
```elixir
# Async — returns immediately with run_id
{:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})
# Sync — waits for completion (with optional timeout)
{:ok, run} = PgFlow.start_flow_sync(:process_order, %{"order_id" => 123}, timeout: 30_000)
# Check run status
{:ok, run} = PgFlow.get_run(run_id)
{:ok, run} = PgFlow.get_run_with_states(run_id)
```
## Background Jobs
PgFlow supports simple background jobs — one-off tasks like sending emails or processing webhooks. Jobs are single-step flows under the hood, reusing the same queuing infrastructure, retries, and dashboard visibility.
```elixir
defmodule MyApp.Jobs.SendEmail do
  use PgFlow.Job

  @job queue: :send_email, max_attempts: 5, base_delay: 10, timeout: 120

  perform :deliver do
    fn input, _ctx ->
      Mailer.send(input["to"], input["subject"], input["body"])
      %{sent: true}
    end
  end
end
```
The step name in `perform :deliver do` is optional — when omitted, it defaults to the `@job` queue/slug value.
See `PgFlow.Job` moduledocs for the full options reference.
```bash
# Compile to database
mix pgflow.gen.job_migration MyApp.Jobs.SendEmail
mix ecto.migrate
```
```elixir
# Enqueue a job
{:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com", "subject" => "Hello"})
```
## Cron Scheduling
Both flows and jobs support cron scheduling via [pg_cron](https://github.com/citusdata/pg_cron). Add a `cron` option to run on a schedule:
```elixir
@flow queue: :daily_report, cron: [schedule: "0 9 * * *", input: %{"type" => "daily"}]
```
```elixir
@job queue: :cleanup, cron: [schedule: "@hourly"]
```
The cron schedule SQL is generated automatically when you run `mix pgflow.gen.flow_migration` or `mix pgflow.gen.job_migration` and migrate.
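
For readers less familiar with cron syntax, the sketch below labels the five fields of a standard expression such as `"0 9 * * *"` (minute 0 of hour 9, every day). It is only a reading aid: pg_cron does the actual parsing, and the `CronSketch` module is hypothetical. Shorthands like `@hourly` are not five-field expressions, so this helper deliberately reports them as such rather than expanding them.

```elixir
defmodule CronSketch do
  @fields [:minute, :hour, :day_of_month, :month, :day_of_week]

  # Splits a five-field cron expression into a keyword list of labelled fields.
  def describe(expr) when is_binary(expr) do
    case String.split(expr) do
      parts when length(parts) == 5 -> {:ok, Enum.zip(@fields, parts)}
      _ -> {:error, :not_five_fields}
    end
  end
end

IO.inspect(CronSketch.describe("0 9 * * *"))
# {:ok, [minute: "0", hour: "9", day_of_month: "*", month: "*", day_of_week: "*"]}

IO.inspect(CronSketch.describe("@hourly"))
# {:error, :not_five_fields}
```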
## Mix Tasks
### One-time setup
Run these once when adding pgflow to a project. Migrations are applied via `mix ecto.migrate`.
| Task | Description |
|--------------------------------------------------|-------------------------------------------------------------|
| `mix pgflow.gen.postgres_extensions_migration` | Migration: citext, pg_trgm, pgcrypto, pg_cron |
| `mix pgflow.gen.pgmq_migration` | Migration: pgmq via SQL-only install |
| `mix pgflow.setup` | Wrapper migration: core schema + helpers |
| `mix pgflow.gen.helpers_migration` | Migration: Elixir helpers standalone (setup bundles these) |
| `mix pgflow.stamp` | Adopt an existing pgflow schema into EctoEvolver tracking |
### Per-flow / per-job
Run once per flow or job module. Each generates a migration that compiles the flow/job definition into the database.
| Task | Description |
|--------------------------------------------------|-------------------------------------------------------------|
| `mix pgflow.gen.flow_migration MyApp.Flow` | Generate migration to compile flow to database |
| `mix pgflow.gen.job_migration MyApp.Job` | Generate migration to compile job to database |
### Verification & test DB
| Task | Description |
|--------------------------------------------------|-------------------------------------------------------------|
| `mix pgflow.check_schema` | Verify pgflow database schema compatibility |
| `mix pgflow.test.setup` | Set up test database |
| `mix pgflow.test.reset` | Reset test database (teardown + setup) |
| `mix pgflow.test.teardown` | Tear down test database |
## Dashboard
PgFlow includes an optional Phoenix LiveView dashboard for monitoring workflows, jobs, workers, and cron schedules in real-time.
See [DASHBOARD.md](docs/DASHBOARD.md) for installation instructions.
## LiveView Integration
`PgFlow.LiveClient` provides a LiveView-native client for tracking flow and job runs in real-time. It manages PubSub subscriptions and applies incremental updates to `%Run{}` structs in your socket assigns:
```elixir
defmodule MyAppWeb.OrderLive do
  use MyAppWeb, :live_view

  alias PgFlow.LiveClient

  def mount(_params, _session, socket) do
    {:ok, LiveClient.init(socket, pubsub: MyApp.PubSub)}
  end

  def handle_event("process", params, socket) do
    {:ok, socket} = LiveClient.start_flow(socket, :process_order, params)
    {:noreply, socket}
  end

  def handle_info({:pgflow, _, _} = msg, socket) do
    {:noreply, LiveClient.handle_info(msg, socket)}
  end
end
```
Requires the `:pubsub` option in your PgFlow config. See [LIVE_CLIENT.md](docs/LIVE_CLIENT.md) for the full API, multiple run tracking, and struct reference.
## Demo App
See [`demo/README.md`](demo/README.md) for a Phoenix LiveView application demonstrating PgFlow with real-time flow visualization.
## Telemetry
PgFlow emits `:telemetry` events across worker, poll, task, and run lifecycles for monitoring and metrics collection. See `PgFlow.Telemetry` moduledocs for event names, measurements, and metadata.
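
As a rough sketch of how a handler could be attached with the standard `:telemetry.attach_many/4` call: the event name `[:pgflow, :task, :stop]`, the measurement and metadata keys, and the `MyApp.PgFlowLogger` module are placeholders, not confirmed PgFlow names, so substitute the events listed in `PgFlow.Telemetry`. The `Mix.install/1` line only makes the snippet runnable as a standalone script; inside an application, `:telemetry` is already a dependency.

```elixir
Mix.install([{:telemetry, "~> 1.0"}])

defmodule MyApp.PgFlowLogger do
  # :telemetry calls handlers with (event, measurements, metadata, config).
  def handle_event(event, measurements, metadata, _config) do
    IO.puts("#{inspect(event)} #{inspect(measurements)} #{inspect(metadata)}")
  end
end

# PLACEHOLDER event name; check PgFlow.Telemetry for the real ones.
events = [[:pgflow, :task, :stop]]

:ok =
  :telemetry.attach_many(
    "pgflow-logger",
    events,
    &MyApp.PgFlowLogger.handle_event/4,
    nil
  )

# Simulate an event by hand to check that the handler is wired up.
:telemetry.execute([:pgflow, :task, :stop], %{duration: 1_000}, %{flow: :process_order})
```

Passing the handler as an external capture (`&Module.function/4`) rather than an anonymous function follows the recommendation in the `:telemetry` documentation.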
## Testing
```bash
# Start the database (same as Quick Start step 1)
docker compose up -d
# Run tests
mix test
```
## Compatibility with PgFlow TypeScript/Deno
This Elixir implementation is compatible with the TypeScript/Deno version — same PostgreSQL schema, same SQL functions, same PGMQ message format. Workers can run side-by-side. See [ELIXIR_VS_SUPABASE.md](docs/ELIXIR_VS_SUPABASE.md) for a detailed comparison and schema divergences.
## License
MIT