-- Generated by `mix pgflow.sync_upstream` — DO NOT EDIT BY HAND.
-- Source: pgflow-dev/pgflow@5c132f3cd0c220cb4213fe12ef6ac1799f219a77
-- Generated: 2026-04-23T16:29:27.934506Z
-- NOTE(review): "$SCHEMA$" appears to be a placeholder substituted by the
-- `mix pgflow.sync_upstream` generator before execution; as written it is not
-- executable SQL (reads as an unterminated dollar-quote) — confirm against the
-- generator's templating step.
CREATE SCHEMA IF NOT EXISTS $SCHEMA$
--SPLIT--
-- source: 20250429164909_pgflow_initial.sql
-- Add new schema named "pgflow"
-- Dedicated schema holding every pgflow object (tables, types, functions).
CREATE SCHEMA IF NOT EXISTS pgflow
--SPLIT--
-- Create "read_with_poll" function
-- Blocking read helper (adapted from pgmq): polls the queue's backing table
-- for up to `max_poll_seconds`, sleeping `poll_interval_ms` between empty
-- attempts, and returns up to `qty` visible messages with their visibility
-- timeout advanced by `vt` seconds.  `conditional` optionally restricts the
-- read to messages whose jsonb payload contains it (via the @> operator).
CREATE FUNCTION "pgflow"."read_with_poll" ("queue_name" text, "vt" integer, "qty" integer, "max_poll_seconds" integer DEFAULT 5, "poll_interval_ms" integer DEFAULT 100, "conditional" jsonb DEFAULT '{}') RETURNS SETOF pgmq.message_record LANGUAGE plpgsql AS $$
DECLARE
r pgmq.message_record;
stop_at TIMESTAMP;
sql TEXT;
-- Physical queue table name, e.g. pgmq."q_<queue_name>".
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
-- Absolute deadline for the polling loop.
stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);
LOOP
-- Give up (return an empty set) once the deadline has passed.
IF (SELECT clock_timestamp() >= stop_at) THEN
RETURN;
END IF;
-- Build the locking read dynamically.  Arguments consumed in order:
-- %I -> qtable, %L -> conditional, %2$L -> conditional again (positional),
-- %I -> qtable, %L -> the new visibility-timeout interval.  $1 is bound
-- to `qty` via EXECUTE ... USING below.
sql := FORMAT(
$QUERY$
WITH cte AS
(
SELECT msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp() AND CASE
WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
ELSE 1
END = 1
ORDER BY msg_id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE pgmq.%I m
SET
vt = clock_timestamp() + %L,
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);
FOR r IN
EXECUTE sql USING qty
LOOP
RETURN NEXT r;
END LOOP;
-- FOUND reflects the FOR loop above: true if at least one row was read.
IF FOUND THEN
RETURN;
ELSE
-- Nothing visible yet: wait before the next attempt.
PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
END IF;
END LOOP;
END;
$$
--SPLIT--
-- Create composite type "step_task_record"
-- Row shape handed to workers when they poll for runnable tasks.
CREATE TYPE "pgflow"."step_task_record" AS (
  "flow_slug" text,
  "run_id" uuid,
  "step_slug" text,
  "input" jsonb,
  "msg_id" bigint
)
--SPLIT--
-- Create "is_valid_slug" function
-- Validates flow/step identifiers: a slug must be a non-empty,
-- identifier-like string of at most 128 characters and must not be one of
-- the reserved words.
CREATE FUNCTION "pgflow"."is_valid_slug" ("slug" text) RETURNS boolean LANGUAGE plpgsql IMMUTABLE AS $$
BEGIN
  RETURN (
    slug IS NOT NULL
    AND slug <> ''
    AND length(slug) <= 128
    AND slug ~ '^[a-zA-Z_][a-zA-Z0-9_]*$'
    AND slug NOT IN ('run') -- reserved words
  );
END;
$$
--SPLIT--
-- Create "flows" table
-- Flow definitions with default retry/backoff/timeout options for their steps.
CREATE TABLE "pgflow"."flows" (
  "flow_slug" text NOT NULL,
  "opt_max_attempts" integer NOT NULL DEFAULT 3,
  "opt_base_delay" integer NOT NULL DEFAULT 1,
  "opt_timeout" integer NOT NULL DEFAULT 60,
  "created_at" timestamptz NOT NULL DEFAULT now(),
  PRIMARY KEY ("flow_slug"),
  CONSTRAINT "opt_base_delay_is_nonnegative" CHECK (opt_base_delay >= 0),
  CONSTRAINT "opt_max_attempts_is_nonnegative" CHECK (opt_max_attempts >= 0),
  CONSTRAINT "opt_timeout_is_positive" CHECK (opt_timeout > 0),
  CONSTRAINT "slug_is_valid" CHECK (pgflow.is_valid_slug(flow_slug))
)
--SPLIT--
-- Create "steps" table
-- Nodes of a flow's DAG; NULL opt_* columns fall back to the flow's defaults.
CREATE TABLE "pgflow"."steps" (
  "flow_slug" text NOT NULL,
  "step_slug" text NOT NULL,
  "step_type" text NOT NULL DEFAULT 'single',
  "step_index" integer NOT NULL DEFAULT 0,
  "deps_count" integer NOT NULL DEFAULT 0,
  "opt_max_attempts" integer NULL,
  "opt_base_delay" integer NULL,
  "opt_timeout" integer NULL,
  "created_at" timestamptz NOT NULL DEFAULT now(),
  PRIMARY KEY ("flow_slug", "step_slug"),
  CONSTRAINT "steps_flow_slug_step_index_key" UNIQUE ("flow_slug", "step_index"),
  CONSTRAINT "steps_flow_slug_fkey" FOREIGN KEY ("flow_slug") REFERENCES "pgflow"."flows" ("flow_slug") ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT "opt_base_delay_is_nonnegative" CHECK ((opt_base_delay IS NULL) OR (opt_base_delay >= 0)),
  CONSTRAINT "opt_max_attempts_is_nonnegative" CHECK ((opt_max_attempts IS NULL) OR (opt_max_attempts >= 0)),
  CONSTRAINT "opt_timeout_is_positive" CHECK ((opt_timeout IS NULL) OR (opt_timeout > 0)),
  CONSTRAINT "steps_deps_count_check" CHECK (deps_count >= 0),
  CONSTRAINT "steps_step_slug_check" CHECK (pgflow.is_valid_slug(step_slug)),
  CONSTRAINT "steps_step_type_check" CHECK (step_type = 'single'::text)
)
--SPLIT--
-- Create "deps" table
-- DAG edges: step_slug depends on dep_slug within the same flow.
CREATE TABLE "pgflow"."deps" (
  "flow_slug" text NOT NULL,
  "dep_slug" text NOT NULL,
  "step_slug" text NOT NULL,
  "created_at" timestamptz NOT NULL DEFAULT now(),
  PRIMARY KEY ("flow_slug", "dep_slug", "step_slug"),
  CONSTRAINT "deps_flow_slug_dep_slug_fkey" FOREIGN KEY ("flow_slug", "dep_slug") REFERENCES "pgflow"."steps" ("flow_slug", "step_slug") ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT "deps_flow_slug_fkey" FOREIGN KEY ("flow_slug") REFERENCES "pgflow"."flows" ("flow_slug") ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT "deps_flow_slug_step_slug_fkey" FOREIGN KEY ("flow_slug", "step_slug") REFERENCES "pgflow"."steps" ("flow_slug", "step_slug") ON UPDATE NO ACTION ON DELETE NO ACTION,
  -- No self-dependencies.
  CONSTRAINT "deps_check" CHECK (dep_slug <> step_slug)
)
--SPLIT--
-- Create index "idx_deps_by_flow_dep" to table: "deps"
-- Lookup of edges by the dependency side (who depends on dep_slug).
CREATE INDEX "idx_deps_by_flow_dep"
  ON "pgflow"."deps" ("flow_slug", "dep_slug")
--SPLIT--
-- Create index "idx_deps_by_flow_step" to table: "deps"
-- Lookup of edges by the dependent side (what step_slug depends on).
CREATE INDEX "idx_deps_by_flow_step"
  ON "pgflow"."deps" ("flow_slug", "step_slug")
--SPLIT--
-- Create "runs" table
-- One execution of a flow; remaining_steps counts steps not yet completed.
CREATE TABLE "pgflow"."runs" (
  "run_id" uuid NOT NULL DEFAULT gen_random_uuid(),
  "flow_slug" text NOT NULL,
  "status" text NOT NULL DEFAULT 'started',
  "input" jsonb NOT NULL,
  "output" jsonb NULL,
  "remaining_steps" integer NOT NULL DEFAULT 0,
  "started_at" timestamptz NOT NULL DEFAULT now(),
  "completed_at" timestamptz NULL,
  "failed_at" timestamptz NULL,
  PRIMARY KEY ("run_id"),
  CONSTRAINT "runs_flow_slug_fkey" FOREIGN KEY ("flow_slug") REFERENCES "pgflow"."flows" ("flow_slug") ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT "completed_at_is_after_started_at" CHECK ((completed_at IS NULL) OR (completed_at >= started_at)),
  -- A run cannot be both completed and failed.
  CONSTRAINT "completed_at_or_failed_at" CHECK (NOT ((completed_at IS NOT NULL) AND (failed_at IS NOT NULL))),
  CONSTRAINT "failed_at_is_after_started_at" CHECK ((failed_at IS NULL) OR (failed_at >= started_at)),
  CONSTRAINT "runs_remaining_steps_check" CHECK (remaining_steps >= 0),
  CONSTRAINT "status_is_valid" CHECK (status = ANY (ARRAY['started'::text, 'failed'::text, 'completed'::text]))
)
--SPLIT--
-- Create index "idx_runs_flow_slug" to table: "runs"
-- Runs filtered by their flow.
CREATE INDEX "idx_runs_flow_slug"
  ON "pgflow"."runs" ("flow_slug")
--SPLIT--
-- Create index "idx_runs_status" to table: "runs"
-- Runs filtered by lifecycle status.
CREATE INDEX "idx_runs_status"
  ON "pgflow"."runs" ("status")
--SPLIT--
-- Create "step_states" table
-- Per-run state of each step; remaining_deps gates readiness,
-- remaining_tasks gates completion.
CREATE TABLE "pgflow"."step_states" (
  "flow_slug" text NOT NULL,
  "run_id" uuid NOT NULL,
  "step_slug" text NOT NULL,
  "status" text NOT NULL DEFAULT 'created',
  "remaining_tasks" integer NOT NULL DEFAULT 1,
  "remaining_deps" integer NOT NULL DEFAULT 0,
  "created_at" timestamptz NOT NULL DEFAULT now(),
  "started_at" timestamptz NULL,
  "completed_at" timestamptz NULL,
  "failed_at" timestamptz NULL,
  PRIMARY KEY ("run_id", "step_slug"),
  CONSTRAINT "step_states_flow_slug_fkey" FOREIGN KEY ("flow_slug") REFERENCES "pgflow"."flows" ("flow_slug") ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT "step_states_flow_slug_step_slug_fkey" FOREIGN KEY ("flow_slug", "step_slug") REFERENCES "pgflow"."steps" ("flow_slug", "step_slug") ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT "step_states_run_id_fkey" FOREIGN KEY ("run_id") REFERENCES "pgflow"."runs" ("run_id") ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT "completed_at_is_after_started_at" CHECK ((completed_at IS NULL) OR (completed_at >= started_at)),
  CONSTRAINT "completed_at_or_failed_at" CHECK (NOT ((completed_at IS NOT NULL) AND (failed_at IS NOT NULL))),
  CONSTRAINT "failed_at_is_after_started_at" CHECK ((failed_at IS NULL) OR (failed_at >= started_at)),
  CONSTRAINT "started_at_is_after_created_at" CHECK ((started_at IS NULL) OR (started_at >= created_at)),
  -- A completed step must have drained all its tasks.
  CONSTRAINT "status_and_remaining_tasks_match" CHECK ((status <> 'completed'::text) OR (remaining_tasks = 0)),
  CONSTRAINT "status_is_valid" CHECK (status = ANY (ARRAY['created'::text, 'started'::text, 'completed'::text, 'failed'::text])),
  CONSTRAINT "step_states_remaining_deps_check" CHECK (remaining_deps >= 0),
  CONSTRAINT "step_states_remaining_tasks_check" CHECK (remaining_tasks >= 0)
)
--SPLIT--
-- Create index "idx_step_states_failed" to table: "step_states"
-- Partial index: only failed step states.
CREATE INDEX "idx_step_states_failed"
  ON "pgflow"."step_states" ("run_id", "step_slug")
  WHERE (status = 'failed'::text)
--SPLIT--
-- Create index "idx_step_states_flow_slug" to table: "step_states"
-- Step states filtered by flow.
CREATE INDEX "idx_step_states_flow_slug"
  ON "pgflow"."step_states" ("flow_slug")
--SPLIT--
-- Create index "idx_step_states_ready" to table: "step_states"
-- Partial index matching start_ready_steps' predicate:
-- created steps whose dependencies are all satisfied.
CREATE INDEX "idx_step_states_ready"
  ON "pgflow"."step_states" ("run_id", "status", "remaining_deps")
  WHERE ((status = 'created'::text) AND (remaining_deps = 0))
--SPLIT--
-- Create "step_tasks" table
-- Individual units of work for a step; message_id links to the pgmq message.
CREATE TABLE "pgflow"."step_tasks" (
  "flow_slug" text NOT NULL,
  "run_id" uuid NOT NULL,
  "step_slug" text NOT NULL,
  "message_id" bigint NULL,
  "task_index" integer NOT NULL DEFAULT 0,
  "status" text NOT NULL DEFAULT 'queued',
  "attempts_count" integer NOT NULL DEFAULT 0,
  "error_message" text NULL,
  "output" jsonb NULL,
  "queued_at" timestamptz NOT NULL DEFAULT now(),
  "completed_at" timestamptz NULL,
  "failed_at" timestamptz NULL,
  PRIMARY KEY ("run_id", "step_slug", "task_index"),
  CONSTRAINT "step_tasks_flow_slug_fkey" FOREIGN KEY ("flow_slug") REFERENCES "pgflow"."flows" ("flow_slug") ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT "step_tasks_run_id_fkey" FOREIGN KEY ("run_id") REFERENCES "pgflow"."runs" ("run_id") ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT "step_tasks_run_id_step_slug_fkey" FOREIGN KEY ("run_id", "step_slug") REFERENCES "pgflow"."step_states" ("run_id", "step_slug") ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT "attempts_count_nonnegative" CHECK (attempts_count >= 0),
  CONSTRAINT "completed_at_is_after_queued_at" CHECK ((completed_at IS NULL) OR (completed_at >= queued_at)),
  CONSTRAINT "completed_at_or_failed_at" CHECK (NOT ((completed_at IS NOT NULL) AND (failed_at IS NOT NULL))),
  CONSTRAINT "failed_at_is_after_queued_at" CHECK ((failed_at IS NULL) OR (failed_at >= queued_at)),
  -- Fan-out is not supported at this point in the schema's history.
  CONSTRAINT "only_single_task_per_step" CHECK (task_index = 0),
  CONSTRAINT "output_valid_only_for_completed" CHECK ((output IS NULL) OR (status = 'completed'::text)),
  CONSTRAINT "valid_status" CHECK (status = ANY (ARRAY['queued'::text, 'completed'::text, 'failed'::text]))
)
--SPLIT--
-- Create index "idx_step_tasks_completed" to table: "step_tasks"
-- Partial index: completed tasks per step.
CREATE INDEX "idx_step_tasks_completed"
  ON "pgflow"."step_tasks" ("run_id", "step_slug")
  WHERE (status = 'completed'::text)
--SPLIT--
-- Create index "idx_step_tasks_failed" to table: "step_tasks"
-- Partial index: failed tasks per step.
CREATE INDEX "idx_step_tasks_failed"
  ON "pgflow"."step_tasks" ("run_id", "step_slug")
  WHERE (status = 'failed'::text)
--SPLIT--
-- Create index "idx_step_tasks_flow_run_step" to table: "step_tasks"
-- Covering lookup by flow, run and step.
CREATE INDEX "idx_step_tasks_flow_run_step"
  ON "pgflow"."step_tasks" ("flow_slug", "run_id", "step_slug")
--SPLIT--
-- Create index "idx_step_tasks_message_id" to table: "step_tasks"
-- Reverse lookup from a pgmq message to its task.
CREATE INDEX "idx_step_tasks_message_id"
  ON "pgflow"."step_tasks" ("message_id")
--SPLIT--
-- Create index "idx_step_tasks_queued" to table: "step_tasks"
-- Partial index: still-queued tasks per step.
CREATE INDEX "idx_step_tasks_queued"
  ON "pgflow"."step_tasks" ("run_id", "step_slug")
  WHERE (status = 'queued'::text)
--SPLIT--
-- Create "poll_for_tasks" function
-- Worker entry point: polls the queue, claims matching queued step_tasks,
-- bumps their attempt counter, assembles each task's input (run input under
-- the 'run' key merged with completed dependency outputs), and extends each
-- message's visibility timeout to the step/flow timeout plus a 2s grace.
-- NOTE(review): this single-statement version is replaced later in this file
-- (20250517072017 migration) by a two-statement plpgsql version.
CREATE FUNCTION "pgflow"."poll_for_tasks" ("queue_name" text, "vt" integer, "qty" integer, "max_poll_seconds" integer DEFAULT 5, "poll_interval_ms" integer DEFAULT 100) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
-- Messages claimed from the queue during this poll window.
with read_messages as (
select *
from pgflow.read_with_poll(
queue_name,
vt,
qty,
max_poll_seconds,
poll_interval_ms
)
),
-- Queued step_tasks matching the claimed messages.
-- NOTE(review): the WHERE equality duplicates the join condition above —
-- harmless but redundant.
tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
join read_messages as message on message.msg_id = task.message_id
where task.message_id = message.msg_id
and task.status = 'queued'
),
-- Data-modifying CTE: record the delivery attempt (always executed).
increment_attempts as (
update pgflow.step_tasks
set attempts_count = attempts_count + 1
from tasks
where step_tasks.message_id = tasks.message_id
and status = 'queued'
),
-- Run inputs for the claimed tasks.
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
-- Outputs of each task's completed dependencies.
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
dep_task.output as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.step_tasks dep_task on
dep_task.run_id = st.run_id and
dep_task.step_slug = dep.dep_slug and
dep_task.status = 'completed'
),
-- One jsonb object per task: {dep_slug: dep_output, ...}.
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output
from deps d
group by d.run_id, d.step_slug
),
-- Effective visibility delay: step timeout (or flow fallback) + 2s grace.
timeouts as (
select
task.message_id,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
)
select
st.flow_slug,
st.run_id,
st.step_slug,
jsonb_build_object('run', r.input) ||
coalesce(dep_out.deps_output, '{}'::jsonb) as input,
st.message_id as msg_id
from tasks st
join runs r on st.run_id = r.run_id
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
cross join lateral (
-- TODO: this is slow because it calls set_vt for each row, and set_vt
-- builds dynamic query from string every time it is called
-- implement set_vt_batch(msgs_ids bigint[], vt_delays int[])
select pgmq.set_vt(queue_name, st.message_id,
(select t.vt_delay from timeouts t where t.message_id = st.message_id)
)
) set_vt;
$$
--SPLIT--
-- Create "add_step" function
-- Registers a step in a flow's DAG together with its dependency edges.
-- Idempotent: re-adding an existing step performs a dummy upsert so RETURNING
-- still yields the row, and duplicate dep edges are silently skipped.
-- NOTE(review): on conflict, deps_count and the opt_* overrides of an
-- existing step are NOT updated — confirm this matches upstream intent.
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[], "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer) RETURNS "pgflow"."steps" LANGUAGE sql SET "search_path" = '' AS $$
WITH
-- Next free position in the flow's step ordering.
next_index AS (
SELECT COALESCE(MAX(step_index) + 1, 0) as idx
FROM pgflow.steps
WHERE flow_slug = add_step.flow_slug
),
-- Insert the step; the no-op DO UPDATE makes RETURNING work on conflict.
create_step AS (
INSERT INTO pgflow.steps (flow_slug, step_slug, step_index, deps_count, opt_max_attempts, opt_base_delay, opt_timeout)
SELECT add_step.flow_slug, add_step.step_slug, idx, COALESCE(array_length(deps_slugs, 1), 0), max_attempts, base_delay, timeout
FROM next_index
ON CONFLICT (flow_slug, step_slug)
DO UPDATE SET step_slug = pgflow.steps.step_slug
RETURNING *
),
-- Insert one dependency edge per slug, ignoring pre-existing edges.
insert_deps AS (
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
FROM unnest(deps_slugs) AS d(dep_slug)
ON CONFLICT (flow_slug, dep_slug, step_slug) DO NOTHING
RETURNING 1
)
-- Return the created step
SELECT * FROM create_step;
$$
--SPLIT--
-- Create "add_step" function
-- Convenience overload for steps without dependencies: delegates to the
-- dependency-aware variant with an empty deps array.
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer) RETURNS "pgflow"."steps" LANGUAGE sql SET "search_path" = '' AS $$
SELECT *
FROM pgflow.add_step(
  flow_slug,
  step_slug,
  ARRAY[]::text[],
  max_attempts,
  base_delay,
  timeout
);
$$
--SPLIT--
-- Create "calculate_retry_delay" function
-- Exponential backoff: delay (seconds) = floor(base_delay * 2^attempts_count).
CREATE FUNCTION "pgflow"."calculate_retry_delay" ("base_delay" numeric, "attempts_count" integer) RETURNS integer LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $$
  select floor(base_delay * power(2, attempts_count))::int
$$
--SPLIT--
-- Create "maybe_complete_run" function
-- Marks a run completed (and aggregates its output) if and only if every step
-- has finished: guarded by remaining_steps = 0 and status != 'completed', so
-- calling it early or twice is a safe no-op.
CREATE FUNCTION "pgflow"."maybe_complete_run" ("run_id" uuid) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$
-- Update run status to completed and set output when there are no remaining steps
-- All done in a single declarative SQL statement
UPDATE pgflow.runs
SET
status = 'completed',
completed_at = now(),
output = (
-- Get outputs from final steps (steps that are not dependencies for other steps)
-- i.e. the run's output is {step_slug: output} over the DAG's leaf steps.
SELECT jsonb_object_agg(st.step_slug, st.output)
FROM pgflow.step_tasks st
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
JOIN pgflow.runs r ON r.run_id = ss.run_id AND r.flow_slug = ss.flow_slug
WHERE st.run_id = maybe_complete_run.run_id
AND st.status = 'completed'
AND NOT EXISTS (
SELECT 1
FROM pgflow.deps d
WHERE d.flow_slug = ss.flow_slug
AND d.dep_slug = ss.step_slug
)
)
WHERE pgflow.runs.run_id = maybe_complete_run.run_id
AND pgflow.runs.remaining_steps = 0
AND pgflow.runs.status != 'completed';
$$
--SPLIT--
-- Create "start_ready_steps" function
-- Transitions every ready step of a run ('created' with zero remaining deps)
-- to 'started', enqueues one pgmq message per step on the flow's queue, and
-- creates the corresponding step_tasks row carrying the message id.
CREATE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$
-- Lock the ready steps; deterministic ORDER BY keeps lock order stable
-- across concurrent callers.
WITH ready_steps AS (
SELECT *
FROM pgflow.step_states AS step_state
WHERE step_state.run_id = start_ready_steps.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
ORDER BY step_state.step_slug
FOR UPDATE
),
-- Flip the locked steps to 'started'.
started_step_states AS (
UPDATE pgflow.step_states
SET status = 'started',
started_at = now()
FROM ready_steps
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
AND pgflow.step_states.step_slug = ready_steps.step_slug
RETURNING pgflow.step_states.*
),
-- Enqueue one message per started step; the queue is named after the flow.
sent_messages AS (
SELECT
started_step.flow_slug,
started_step.run_id,
started_step.step_slug,
pgmq.send(started_step.flow_slug, jsonb_build_object(
'flow_slug', started_step.flow_slug,
'run_id', started_step.run_id,
'step_slug', started_step.step_slug,
'task_index', 0
)) AS msg_id
FROM started_step_states AS started_step
)
-- Record the task (status defaults to 'queued', task_index to 0).
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, message_id)
SELECT
sent_messages.flow_slug,
sent_messages.run_id,
sent_messages.step_slug,
sent_messages.msg_id
FROM sent_messages;
$$
--SPLIT--
-- Create "complete_task" function
-- Records a task's successful output, cascades completion through the DAG
-- (step state -> dependents' remaining_deps -> run's remaining_steps),
-- archives the task's queue message, starts any newly-ready steps, and
-- completes the run if it was the last step.  Returns the updated task row.
CREATE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
begin
-- Lock run then step in a fixed order to avoid deadlocks with fail_task.
WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = complete_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
FOR UPDATE
),
-- Persist the task's result.
task AS (
UPDATE pgflow.step_tasks
SET
status = 'completed',
completed_at = now(),
output = complete_task.output
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index
RETURNING *
),
-- Decrement the step's task counter; complete the step when it hits zero.
step_state AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement
ELSE 'started'
END,
completed_at = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement
ELSE NULL
END,
remaining_tasks = pgflow.step_states.remaining_tasks - 1
FROM task
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
RETURNING pgflow.step_states.*
),
-- Find all dependent steps if the current step was completed
dependent_steps AS (
SELECT d.step_slug AS dependent_step_slug
FROM pgflow.deps d
JOIN step_state s ON s.status = 'completed' AND d.flow_slug = s.flow_slug
WHERE d.dep_slug = complete_task.step_slug
ORDER BY d.step_slug -- Ensure consistent ordering
),
-- Lock dependent steps before updating
dependent_steps_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug IN (SELECT dependent_step_slug FROM dependent_steps)
FOR UPDATE
),
-- Update all dependent steps
dependent_steps_update AS (
UPDATE pgflow.step_states
SET remaining_deps = pgflow.step_states.remaining_deps - 1
FROM dependent_steps
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = dependent_steps.dependent_step_slug
)
-- Only decrement remaining_steps, don't update status
-- (maybe_complete_run below owns the run-completion transition).
UPDATE pgflow.runs
SET remaining_steps = pgflow.runs.remaining_steps - 1
FROM step_state
WHERE pgflow.runs.run_id = complete_task.run_id
AND step_state.status = 'completed';
-- Remove the message from the active queue (kept in pgmq's archive table).
PERFORM pgmq.archive(
queue_name => (SELECT run.flow_slug FROM pgflow.runs AS run WHERE run.run_id = complete_task.run_id),
msg_id => (SELECT message_id FROM pgflow.step_tasks AS step_task
WHERE step_task.run_id = complete_task.run_id
AND step_task.step_slug = complete_task.step_slug
AND step_task.task_index = complete_task.task_index)
);
-- Cascade: enqueue newly-ready steps, then complete the run if it's done.
PERFORM pgflow.start_ready_steps(complete_task.run_id);
PERFORM pgflow.maybe_complete_run(complete_task.run_id);
-- Return the task as persisted after all updates above.
RETURN QUERY SELECT *
FROM pgflow.step_tasks AS step_task
WHERE step_task.run_id = complete_task.run_id
AND step_task.step_slug = complete_task.step_slug
AND step_task.task_index = complete_task.task_index;
end;
$$
--SPLIT--
-- Create "create_flow" function
-- Idempotently registers a flow and its pgmq queue (queue name = flow_slug).
-- Re-running for an existing flow returns the existing row unchanged.
-- NOTE(review): existing flows keep their stored opt_* values; new option
-- arguments are ignored on conflict — confirm this matches upstream intent.
CREATE FUNCTION "pgflow"."create_flow" ("flow_slug" text, "max_attempts" integer DEFAULT 3, "base_delay" integer DEFAULT 5, "timeout" integer DEFAULT 60) RETURNS "pgflow"."flows" LANGUAGE sql SET "search_path" = '' AS $$
WITH
flow_upsert AS (
INSERT INTO pgflow.flows (flow_slug, opt_max_attempts, opt_base_delay, opt_timeout)
VALUES (flow_slug, max_attempts, base_delay, timeout)
ON CONFLICT (flow_slug) DO UPDATE
SET flow_slug = pgflow.flows.flow_slug -- Dummy update
RETURNING *
),
-- Create the queue only if it doesn't exist yet.
ensure_queue AS (
SELECT pgmq.create(flow_slug)
WHERE NOT EXISTS (
SELECT 1 FROM pgmq.list_queues() WHERE queue_name = flow_slug
)
)
-- The join forces ensure_queue to be evaluated even when it yields no rows.
SELECT f.*
FROM flow_upsert f
LEFT JOIN (SELECT 1 FROM ensure_queue) _dummy ON true; -- Left join ensures flow is returned
$$
--SPLIT--
-- Create "fail_task" function
-- Records a task failure: requeues the task with exponential backoff while
-- attempts remain, otherwise marks task, step state and run as failed and
-- archives the queue message.  Returns the updated task row.
CREATE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
begin
-- Lock run then step in a fixed order to avoid deadlocks with complete_task.
WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
FOR UPDATE
),
flow_info AS (
SELECT r.flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id
),
-- Effective retry settings: step-level overrides, else flow defaults.
config AS (
SELECT
COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN flow_info fi ON fi.flow_slug = s.flow_slug
WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug
),
-- Requeue while attempts remain; otherwise mark the task failed.
fail_or_retry_task as (
UPDATE pgflow.step_tasks as task
SET
status = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued'
ELSE 'failed'
END,
failed_at = CASE
WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now()
ELSE NULL
END,
error_message = fail_task.error_message
WHERE task.run_id = fail_task.run_id
AND task.step_slug = fail_task.step_slug
AND task.task_index = fail_task.task_index
AND task.status = 'queued'
RETURNING *
),
-- Propagate a permanent task failure to the step state.
maybe_fail_step AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed'
ELSE pgflow.step_states.status
END,
failed_at = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now()
ELSE NULL
END
FROM fail_or_retry_task
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
)
-- Propagate a failed step to the run.
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END
WHERE pgflow.runs.run_id = fail_task.run_id;
-- For queued tasks: delay the message for retry with exponential backoff
PERFORM (
WITH retry_config AS (
SELECT
COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN pgflow.runs r ON r.flow_slug = f.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug
),
queued_tasks AS (
SELECT
r.flow_slug,
st.message_id,
pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'queued'
)
SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay)
FROM queued_tasks qt
WHERE EXISTS (SELECT 1 FROM queued_tasks)
);
-- For failed tasks: archive the message
PERFORM (
WITH failed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'failed'
)
SELECT pgmq.archive(ft.flow_slug, ft.message_id)
FROM failed_tasks ft
WHERE EXISTS (SELECT 1 FROM failed_tasks)
);
-- Return the task as persisted after all updates above.
return query select *
from pgflow.step_tasks st
where st.run_id = fail_task.run_id
and st.step_slug = fail_task.step_slug
and st.task_index = fail_task.task_index;
end;
$$
--SPLIT--
-- Create "start_flow" function
-- Starts a new run of a flow: creates the run row (remaining_steps = number
-- of steps), one step_states row per step seeded with its deps_count, then
-- immediately enqueues all steps that have no dependencies.  Returns the run.
CREATE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
v_created_run pgflow.runs%ROWTYPE;
begin
WITH
-- All steps that make up this flow.
flow_steps AS (
SELECT steps.flow_slug, steps.step_slug, steps.deps_count
FROM pgflow.steps
WHERE steps.flow_slug = start_flow.flow_slug
),
created_run AS (
INSERT INTO pgflow.runs (flow_slug, input, remaining_steps)
VALUES (
start_flow.flow_slug,
start_flow.input,
(SELECT count(*) FROM flow_steps)
)
RETURNING *
),
-- Seed per-step state; remaining_deps starts at the step's deps_count.
-- (Data-modifying CTEs execute even though this one is never read.)
created_step_states AS (
INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps)
SELECT
fs.flow_slug,
(SELECT run_id FROM created_run),
fs.step_slug,
fs.deps_count
FROM flow_steps fs
)
SELECT * FROM created_run INTO v_created_run;
-- Enqueue the root steps (those with zero remaining deps).
PERFORM pgflow.start_ready_steps(v_created_run.run_id);
RETURN QUERY SELECT * FROM pgflow.runs where run_id = v_created_run.run_id;
end;
$$
--SPLIT--
-- Create "workers" table
-- Registry of worker processes and their heartbeat/lifecycle timestamps.
CREATE TABLE "pgflow"."workers" (
  "worker_id" uuid NOT NULL,
  "queue_name" text NOT NULL,
  "function_name" text NOT NULL,
  "started_at" timestamptz NOT NULL DEFAULT now(),
  "stopped_at" timestamptz NULL,
  "last_heartbeat_at" timestamptz NOT NULL DEFAULT now(),
  PRIMARY KEY ("worker_id")
)
--SPLIT--
-- Create index "idx_workers_queue_name" to table: "workers"
-- Workers filtered by the queue they serve.
CREATE INDEX "idx_workers_queue_name"
  ON "pgflow"."workers" ("queue_name")
--SPLIT--
-- source: 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql
-- Modify "poll_for_tasks" function
-- Replacement for the original single-statement poll_for_tasks: splits the
-- poll into two statements so the task-processing query runs under a fresh
-- snapshot and can see step_tasks rows committed while the poll was blocking.
CREATE OR REPLACE FUNCTION "pgflow"."poll_for_tasks" ("queue_name" text, "vt" integer, "qty" integer, "max_poll_seconds" integer DEFAULT 5, "poll_interval_ms" integer DEFAULT 100) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
msg_ids bigint[];
begin
-- First statement: Read messages and capture their IDs
-- This gets its own snapshot and can see newly committed messages
select array_agg(msg_id)
into msg_ids
from pgflow.read_with_poll(
queue_name,
vt,
qty,
max_poll_seconds,
poll_interval_ms
);
-- If no messages were read, return empty set
if msg_ids is null or array_length(msg_ids, 1) is null then
return;
end if;
-- Second statement: Process tasks with fresh snapshot
-- This can now see step_tasks that were committed during the poll
return query
-- Queued step_tasks matching the claimed messages.
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
where task.message_id = any(msg_ids)
and task.status = 'queued'
),
-- Data-modifying CTE: record the delivery attempt (always executed).
increment_attempts as (
update pgflow.step_tasks
set attempts_count = attempts_count + 1
from tasks
where step_tasks.message_id = tasks.message_id
and status = 'queued'
),
-- Run inputs for the claimed tasks.
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
-- Outputs of each task's completed dependencies.
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
dep_task.output as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.step_tasks dep_task on
dep_task.run_id = st.run_id and
dep_task.step_slug = dep.dep_slug and
dep_task.status = 'completed'
),
-- One jsonb object per task: {dep_slug: dep_output, ...}.
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output
from deps d
group by d.run_id, d.step_slug
),
-- Effective visibility delay: step timeout (or flow fallback) + 2s grace.
timeouts as (
select
task.message_id,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
)
select
st.flow_slug,
st.run_id,
st.step_slug,
jsonb_build_object('run', r.input) ||
coalesce(dep_out.deps_output, '{}'::jsonb) as input,
st.message_id as msg_id
from tasks st
join runs r on st.run_id = r.run_id
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
cross join lateral (
-- TODO: this is slow because it calls set_vt for each row, and set_vt
-- builds dynamic query from string every time it is called
-- implement set_vt_batch(msgs_ids bigint[], vt_delays int[])
select pgmq.set_vt(queue_name, st.message_id,
(select t.vt_delay from timeouts t where t.message_id = st.message_id)
)
) set_vt;
end;
$$
--SPLIT--
-- source: 20250609105135_pgflow_add_start_tasks_and_started_status.sql
-- Create index "idx_workers_heartbeat" to table: "workers"
-- Supports queries that filter/sort workers by their last heartbeat time.
create index "idx_workers_heartbeat" on "pgflow"."workers" ("last_heartbeat_at")
--SPLIT--
-- Modify "step_tasks" table
-- Introduces the 'started' task status, its timestamp/ordering invariants,
-- and tracks which worker most recently picked the task up.
alter table "pgflow"."step_tasks" drop constraint "valid_status",
add constraint "valid_status" check (
status = ANY(array['queued'::text, 'started'::text, 'completed'::text, 'failed'::text])
),
add constraint "completed_at_is_after_started_at" check (
(completed_at is null) or (started_at is null) or (completed_at >= started_at)
),
add constraint "failed_at_is_after_started_at" check (
(failed_at is null) or (started_at is null) or (failed_at >= started_at)
),
add constraint "started_at_is_after_queued_at" check ((started_at is null) or (started_at >= queued_at)),
add column "started_at" timestamptz null,
add column "last_worker_id" uuid null,
-- Keep the task row when its worker disappears; just clear the reference.
add constraint "step_tasks_last_worker_id_fkey" foreign key ("last_worker_id") references "pgflow"."workers" (
"worker_id"
) on update no action on delete set null
--SPLIT--
-- Create index "idx_step_tasks_last_worker" to table: "step_tasks"
-- Partial index: only in-flight ('started') tasks are looked up by worker.
create index "idx_step_tasks_last_worker" on "pgflow"."step_tasks" ("last_worker_id") where (status = 'started'::text)
--SPLIT--
-- Create index "idx_step_tasks_queued_msg" to table: "step_tasks"
-- Partial index backing the message_id = any(...) lookups on queued tasks.
create index "idx_step_tasks_queued_msg" on "pgflow"."step_tasks" ("message_id") where (status = 'queued'::text)
--SPLIT--
-- Create index "idx_step_tasks_started" to table: "step_tasks"
-- Partial index on start time of in-flight tasks (e.g. stale-task scans).
create index "idx_step_tasks_started" on "pgflow"."step_tasks" ("started_at") where (status = 'started'::text)
--SPLIT--
-- Modify "complete_task" function
-- Mark a 'started' task as completed and cascade the bookkeeping:
--   * store the task output and completion time,
--   * decrement the step's remaining_tasks (step becomes 'completed' at 0),
--   * decrement remaining_deps on dependent steps and remaining_steps on the run,
--   * archive the pgmq message, then start newly-ready steps and maybe
--     complete the run.
-- Returns the affected step_task row.
-- Locks are taken run first, then step, then dependents in slug order (see
-- the ORDER BY below), keeping lock acquisition order consistent across calls.
create or replace function "pgflow"."complete_task"(
"run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb
) returns setof "pgflow"."step_tasks" language plpgsql set "search_path"
= '' as $$
begin
WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = complete_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
FOR UPDATE
),
-- Only a task currently in 'started' can be completed; otherwise this CTE
-- is empty and the downstream updates become no-ops.
task AS (
UPDATE pgflow.step_tasks
SET
status = 'completed',
completed_at = now(),
output = complete_task.output
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index
AND pgflow.step_tasks.status = 'started'
RETURNING *
),
step_state AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement
ELSE 'started'
END,
completed_at = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement
ELSE NULL
END,
remaining_tasks = pgflow.step_states.remaining_tasks - 1
FROM task
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
RETURNING pgflow.step_states.*
),
-- Find all dependent steps if the current step was completed
dependent_steps AS (
SELECT d.step_slug AS dependent_step_slug
FROM pgflow.deps d
JOIN step_state s ON s.status = 'completed' AND d.flow_slug = s.flow_slug
WHERE d.dep_slug = complete_task.step_slug
ORDER BY d.step_slug -- Ensure consistent ordering
),
-- Lock dependent steps before updating
dependent_steps_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug IN (SELECT dependent_step_slug FROM dependent_steps)
FOR UPDATE
),
-- Update all dependent steps
dependent_steps_update AS (
UPDATE pgflow.step_states
SET remaining_deps = pgflow.step_states.remaining_deps - 1
FROM dependent_steps
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = dependent_steps.dependent_step_slug
)
-- Only decrement remaining_steps, don't update status
UPDATE pgflow.runs
SET remaining_steps = pgflow.runs.remaining_steps - 1
FROM step_state
WHERE pgflow.runs.run_id = complete_task.run_id
AND step_state.status = 'completed';
-- For completed tasks: archive the message
-- (queue name == flow_slug; the EXISTS guard keeps this a no-op when the
-- task was not actually completed above)
PERFORM (
WITH completed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.status = 'completed'
)
SELECT pgmq.archive(ct.flow_slug, ct.message_id)
FROM completed_tasks ct
WHERE EXISTS (SELECT 1 FROM completed_tasks)
);
PERFORM pgflow.start_ready_steps(complete_task.run_id);
PERFORM pgflow.maybe_complete_run(complete_task.run_id);
RETURN QUERY SELECT *
FROM pgflow.step_tasks AS step_task
WHERE step_task.run_id = complete_task.run_id
AND step_task.step_slug = complete_task.step_slug
AND step_task.task_index = complete_task.task_index;
end;
$$
--SPLIT--
-- Modify "fail_task" function
-- Handle a task failure reported by a worker: requeue the task for retry
-- while attempts remain (per-step opt_max_attempts overriding the flow's),
-- otherwise mark the task, its step state and the run as failed. Afterwards
-- either delay the queue message for retry (exponential backoff) or archive it.
-- Returns the affected step_task row.
create or replace function "pgflow"."fail_task"(
"run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text
) returns setof "pgflow"."step_tasks" language plpgsql set "search_path"
= '' as $$
begin
WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
FOR UPDATE
),
flow_info AS (
SELECT r.flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id
),
-- Effective retry configuration: step-level options win over flow-level.
config AS (
SELECT
COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN flow_info fi ON fi.flow_slug = s.flow_slug
WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug
),
-- Retry iff attempts_count is still below the max; a retried task goes back
-- to 'queued' with started_at cleared, a spent one becomes 'failed'.
fail_or_retry_task as (
UPDATE pgflow.step_tasks as task
SET
status = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued'
ELSE 'failed'
END,
failed_at = CASE
WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now()
ELSE NULL
END,
started_at = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL
ELSE task.started_at
END,
error_message = fail_task.error_message
WHERE task.run_id = fail_task.run_id
AND task.step_slug = fail_task.step_slug
AND task.task_index = fail_task.task_index
AND task.status = 'started'
RETURNING *
),
-- Fail the step state only when the task itself was permanently failed.
maybe_fail_step AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed'
ELSE pgflow.step_states.status
END,
failed_at = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now()
ELSE NULL
END
FROM fail_or_retry_task
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
)
-- A failed step fails the whole run.
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END
WHERE pgflow.runs.run_id = fail_task.run_id;
-- For queued tasks: delay the message for retry with exponential backoff
PERFORM (
WITH retry_config AS (
SELECT
COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN pgflow.runs r ON r.flow_slug = f.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug
),
queued_tasks AS (
SELECT
r.flow_slug,
st.message_id,
pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'queued'
)
SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay)
FROM queued_tasks qt
WHERE EXISTS (SELECT 1 FROM queued_tasks)
);
-- For failed tasks: archive the message
PERFORM (
WITH failed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'failed'
)
SELECT pgmq.archive(ft.flow_slug, ft.message_id)
FROM failed_tasks ft
WHERE EXISTS (SELECT 1 FROM failed_tasks)
);
return query select *
from pgflow.step_tasks st
where st.run_id = fail_task.run_id
and st.step_slug = fail_task.step_slug
and st.task_index = fail_task.task_index;
end;
$$
--SPLIT--
-- Modify "poll_for_tasks" function
create or replace function "pgflow"."poll_for_tasks"(
"queue_name" text,
"vt" integer,
"qty" integer,
"max_poll_seconds" integer default 5,
"poll_interval_ms" integer default 100
) returns setof "pgflow"."step_task_record" language plpgsql set "search_path"
= '' as $$
begin
-- Deprecated no-op stub. The signature is kept so that old workers calling
-- poll_for_tasks keep working; they simply receive no tasks. The replacement
-- is the two-phase read_with_poll + start_tasks flow, installed by running
-- 'npx pgflow install'.
raise notice 'DEPRECATED: poll_for_tasks is deprecated and will be removed. Please update pgflow via "npx pgflow install".';
-- Always yield an empty result set.
return;
end;
$$
--SPLIT--
-- Create "start_tasks" function
-- Claim queued tasks for a worker (phase 2 of the two-phase poll). For each
-- message id that maps to a queued step_task of this flow:
--   * increments attempts_count, marks the task 'started' and records the worker,
--   * extends the message visibility timeout to the handler timeout + 2s grace,
--   * returns (flow_slug, run_id, step_slug, input, msg_id), where input is
--     the run input under 'run' merged with completed dependency outputs.
create function "pgflow"."start_tasks"(
"flow_slug" text, "msg_ids" bigint [], "worker_id" uuid
) returns setof "pgflow"."step_task_record" language sql set "search_path"
= '' as $$
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
),
-- Data-modifying CTE: executed exactly once even though the final query
-- never references it.
start_tasks_update as (
update pgflow.step_tasks
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
last_worker_id = worker_id
from tasks
where step_tasks.message_id = tasks.message_id
and step_tasks.flow_slug = tasks.flow_slug
and step_tasks.status = 'queued'
),
-- Run rows supplying the base 'run' input for each claimed task.
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
-- One row per (task, completed dependency task) pair.
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
dep_task.output as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.step_tasks dep_task on
dep_task.run_id = st.run_id and
dep_task.step_slug = dep.dep_slug and
dep_task.status = 'completed'
),
-- Fold each task's dependency outputs into one jsonb object keyed by dep slug.
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output
from deps d
group by d.run_id, d.step_slug
),
-- Visibility-timeout delay per message: step timeout overrides flow timeout,
-- plus a 2-second grace period.
timeouts as (
select
task.message_id,
task.flow_slug,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
)
select
st.flow_slug,
st.run_id,
st.step_slug,
jsonb_build_object('run', r.input) ||
coalesce(dep_out.deps_output, '{}'::jsonb) as input,
st.message_id as msg_id
from tasks st
join runs r on st.run_id = r.run_id
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
-- Lateral subquery in FROM: runs once per returned row to push out that
-- message's visibility timeout (replaced by a batched call in the next
-- revision of this function).
cross join lateral (
-- TODO: this is slow because it calls set_vt for each row, and set_vt
-- builds dynamic query from string every time it is called
-- implement set_vt_batch(msgs_ids bigint[], vt_delays int[])
select pgmq.set_vt(t.flow_slug, st.message_id, t.vt_delay)
from timeouts t
where t.message_id = st.message_id
and t.flow_slug = st.flow_slug
) set_vt
$$
--SPLIT--
-- source: 20250610180554_pgflow_add_set_vt_batch_and_use_it_in_start_tasks.sql
-- Create "set_vt_batch" function
-- Batch version of pgmq.set_vt: updates the visibility timeout of many
-- messages with a single UPDATE instead of one dynamic statement per message.
--
-- Parameters:
--   queue_name - pgmq queue whose underlying "q"-table is updated
--   msg_ids    - message ids to update
--   vt_offsets - per-message offsets in seconds, positionally matched to msg_ids
-- Returns: the updated pgmq.message_record rows (empty set when nothing to do).
-- Raises: an exception when the two arrays differ in length.
CREATE FUNCTION "pgflow"."set_vt_batch" ("queue_name" text, "msg_ids" bigint[], "vt_offsets" integer[]) RETURNS SETOF pgmq.message_record LANGUAGE plpgsql AS $$
DECLARE
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
sql TEXT;
BEGIN
/* ---------- safety checks ---------------------------------------------------- */
-- cardinality() is used instead of array_length(): array_length('{}', 1)
-- returns NULL (never 0), so an "array_length(...) = 0" test can never
-- detect an empty array and the empty-input early-return would be dead code.
IF msg_ids IS NULL OR vt_offsets IS NULL OR cardinality(msg_ids) = 0 THEN
RETURN; -- nothing to do, return empty set
END IF;
IF cardinality(msg_ids) IS DISTINCT FROM cardinality(vt_offsets) THEN
RAISE EXCEPTION
'msg_ids length (%) must equal vt_offsets length (%)',
cardinality(msg_ids), cardinality(vt_offsets);
END IF;
/* ---------- dynamic statement ------------------------------------------------ */
/* One UPDATE joins with the unnested arrays; unnest($1), unnest($2) zip the
   two arrays positionally into (msg_id, vt_offset) pairs. */
sql := format(
$FMT$
WITH input (msg_id, vt_offset) AS (
SELECT unnest($1)::bigint
, unnest($2)::int
)
UPDATE pgmq.%I q
SET vt = clock_timestamp() + make_interval(secs => input.vt_offset),
read_ct = read_ct -- no change, but keeps RETURNING list aligned
FROM input
WHERE q.msg_id = input.msg_id
RETURNING q.msg_id,
q.read_ct,
q.enqueued_at,
q.vt,
q.message
$FMT$,
qtable
);
RETURN QUERY EXECUTE sql USING msg_ids, vt_offsets;
END;
$$
--SPLIT--
-- Modify "start_tasks" function
-- Claim queued tasks for a worker and build their handler inputs.
-- For each message id that maps to a queued step_task of this flow:
--   * increments attempts_count, marks the task 'started' and records the worker,
--   * extends the message visibility timeout to the step/flow timeout + 2s
--     grace in one batched pgmq update (pgflow.set_vt_batch),
--   * returns (flow_slug, run_id, step_slug, input, msg_id), where input is
--     the run input under 'run' merged with completed dependency outputs.
--
-- FIX: the set_vt_batch CTE is now referenced by the final query. PostgreSQL
-- never evaluates a plain-SELECT CTE that the primary query does not
-- reference, so previously the pgmq.set_vt_batch call silently never ran —
-- the same CTE pitfall this file later fixes for fail_task's realtime event
-- (migration 20250619195327). Wrapping the call in count(*) guarantees the
-- CTE yields exactly one row, so cross joining it preserves row multiplicity.
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
),
-- Data-modifying CTE: executed exactly once even though unreferenced below.
start_tasks_update as (
update pgflow.step_tasks
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
last_worker_id = worker_id
from tasks
where step_tasks.message_id = tasks.message_id
and step_tasks.flow_slug = tasks.flow_slug
and step_tasks.status = 'queued'
),
-- Run rows supplying the base 'run' input for each claimed task.
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
-- One row per (task, completed dependency task) pair.
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
dep_task.output as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.step_tasks dep_task on
dep_task.run_id = st.run_id and
dep_task.step_slug = dep.dep_slug and
dep_task.status = 'completed'
),
-- Fold each task's dependency outputs into one jsonb object keyed by dep slug.
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output
from deps d
group by d.run_id, d.step_slug
),
-- Visibility-timeout delay per message: step timeout overrides flow timeout,
-- plus a 2-second grace period.
timeouts as (
select
task.message_id,
task.flow_slug,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all claimed messages. count(*) makes
-- this CTE always produce exactly one row, no matter how many messages
-- set_vt_batch touched (including zero).
set_vt_batch as (
select count(*) as messages_updated
from (
select pgflow.set_vt_batch(
start_tasks.flow_slug,
array_agg(t.message_id order by t.message_id),
array_agg(t.vt_delay order by t.message_id)
)
from timeouts t
) as batch_result
)
select
st.flow_slug,
st.run_id,
st.step_slug,
jsonb_build_object('run', r.input) ||
coalesce(dep_out.deps_output, '{}'::jsonb) as input,
st.message_id as msg_id
from tasks st
-- single-row CTE: forces evaluation of set_vt_batch without changing results
cross join set_vt_batch
join runs r on st.run_id = r.run_id
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
$$
--SPLIT--
-- source: 20250614124241_pgflow_add_realtime.sql
-- Modify "step_states" table
-- Store the terminal error on the step state itself so clients do not have
-- to look up the failed step_task row.
ALTER TABLE "pgflow"."step_states" ADD COLUMN "error_message" text NULL
--SPLIT--
-- Create index "idx_step_states_run_id" to table: "step_states"
-- Speeds up per-run step_states lookups (e.g. the get_run_with_states join).
CREATE INDEX "idx_step_states_run_id" ON "pgflow"."step_states" ("run_id")
--SPLIT--
-- Modify "maybe_complete_run" function
-- Complete a run when all of its steps are done: sets status/completed_at,
-- assembles the run output from the outputs of its final steps, and
-- broadcasts a 'run:completed' realtime event. A run qualifies only when
-- remaining_steps reached 0 and it is not already completed, so repeated
-- calls are harmless no-ops.
CREATE OR REPLACE FUNCTION "pgflow"."maybe_complete_run" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
v_completed_run pgflow.runs%ROWTYPE;
begin
-- Update run status to completed and set output when there are no remaining steps
WITH run_output AS (
-- Get outputs from final steps (steps that are not dependencies for other steps)
-- i.e. steps whose slug never appears as dep_slug in pgflow.deps; their
-- completed task outputs are keyed by step_slug in the run output object.
SELECT jsonb_object_agg(st.step_slug, st.output) as final_output
FROM pgflow.step_tasks st
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
JOIN pgflow.runs r ON r.run_id = ss.run_id AND r.flow_slug = ss.flow_slug
WHERE st.run_id = maybe_complete_run.run_id
AND st.status = 'completed'
AND NOT EXISTS (
SELECT 1
FROM pgflow.deps d
WHERE d.flow_slug = ss.flow_slug
AND d.dep_slug = ss.step_slug
)
)
UPDATE pgflow.runs
SET
status = 'completed',
completed_at = now(),
output = (SELECT final_output FROM run_output)
WHERE pgflow.runs.run_id = maybe_complete_run.run_id
AND pgflow.runs.remaining_steps = 0
AND pgflow.runs.status != 'completed'
RETURNING * INTO v_completed_run;
-- Only send broadcast if run was completed
-- (v_completed_run is all-NULL when the UPDATE matched no row)
IF v_completed_run.run_id IS NOT NULL THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:completed',
'run_id', v_completed_run.run_id,
'flow_slug', v_completed_run.flow_slug,
'status', 'completed',
'output', v_completed_run.output,
'completed_at', v_completed_run.completed_at
),
'run:completed',
concat('pgflow:run:', v_completed_run.run_id),
false
);
END IF;
end;
$$
--SPLIT--
-- Modify "complete_task" function
-- Mark a 'started' task as completed and cascade the bookkeeping (same flow
-- as the previous revision), additionally broadcasting a 'step:completed'
-- realtime event when the whole step finished.
-- Locks are taken run first, then step, then dependents in slug order.
-- Returns the affected step_task row.
CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
v_step_state pgflow.step_states%ROWTYPE;
begin
WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = complete_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
FOR UPDATE
),
-- Only a task currently in 'started' can be completed; otherwise this CTE
-- is empty and the downstream updates become no-ops.
task AS (
UPDATE pgflow.step_tasks
SET
status = 'completed',
completed_at = now(),
output = complete_task.output
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index
AND pgflow.step_tasks.status = 'started'
RETURNING *
),
step_state AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement
ELSE 'started'
END,
completed_at = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement
ELSE NULL
END,
remaining_tasks = pgflow.step_states.remaining_tasks - 1
FROM task
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
RETURNING pgflow.step_states.*
),
-- Find all dependent steps if the current step was completed
dependent_steps AS (
SELECT d.step_slug AS dependent_step_slug
FROM pgflow.deps d
JOIN step_state s ON s.status = 'completed' AND d.flow_slug = s.flow_slug
WHERE d.dep_slug = complete_task.step_slug
ORDER BY d.step_slug -- Ensure consistent ordering
),
-- Lock dependent steps before updating
dependent_steps_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug IN (SELECT dependent_step_slug FROM dependent_steps)
FOR UPDATE
),
-- Update all dependent steps
dependent_steps_update AS (
UPDATE pgflow.step_states
SET remaining_deps = pgflow.step_states.remaining_deps - 1
FROM dependent_steps
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = dependent_steps.dependent_step_slug
)
-- Only decrement remaining_steps, don't update status
UPDATE pgflow.runs
SET remaining_steps = pgflow.runs.remaining_steps - 1
FROM step_state
WHERE pgflow.runs.run_id = complete_task.run_id
AND step_state.status = 'completed';
-- Get the updated step state for broadcasting
SELECT * INTO v_step_state FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug;
-- Send broadcast event for step completed if the step is completed
IF v_step_state.status = 'completed' THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:completed',
'run_id', complete_task.run_id,
'step_slug', complete_task.step_slug,
'status', 'completed',
'output', complete_task.output,
'completed_at', v_step_state.completed_at
),
concat('step:', complete_task.step_slug, ':completed'),
concat('pgflow:run:', complete_task.run_id),
false
);
END IF;
-- For completed tasks: archive the message
-- (queue name == flow_slug; no-op when the task was not completed above)
PERFORM (
WITH completed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.status = 'completed'
)
SELECT pgmq.archive(ct.flow_slug, ct.message_id)
FROM completed_tasks ct
WHERE EXISTS (SELECT 1 FROM completed_tasks)
);
PERFORM pgflow.start_ready_steps(complete_task.run_id);
PERFORM pgflow.maybe_complete_run(complete_task.run_id);
RETURN QUERY SELECT *
FROM pgflow.step_tasks AS step_task
WHERE step_task.run_id = complete_task.run_id
AND step_task.step_slug = complete_task.step_slug
AND step_task.task_index = complete_task.task_index;
end;
$$
--SPLIT--
-- Modify "fail_task" function
-- Handle a task failure: requeue for retry while attempts remain, otherwise
-- mark task/step/run failed (now also persisting error_message on the step
-- state), broadcast step/run failure events, then delay or archive the
-- message. Returns the affected step_task row.
CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
v_run_failed boolean;
begin
WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
FOR UPDATE
),
flow_info AS (
SELECT r.flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id
),
-- Effective retry configuration: step-level options win over flow-level.
config AS (
SELECT
COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN flow_info fi ON fi.flow_slug = s.flow_slug
WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug
),
-- Retry iff attempts_count is still below the max; a retried task goes back
-- to 'queued' with started_at cleared, a spent one becomes 'failed'.
fail_or_retry_task as (
UPDATE pgflow.step_tasks as task
SET
status = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued'
ELSE 'failed'
END,
failed_at = CASE
WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now()
ELSE NULL
END,
started_at = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL
ELSE task.started_at
END,
error_message = fail_task.error_message
WHERE task.run_id = fail_task.run_id
AND task.step_slug = fail_task.step_slug
AND task.task_index = fail_task.task_index
AND task.status = 'started'
RETURNING *
),
maybe_fail_step AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed'
ELSE pgflow.step_states.status
END,
failed_at = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now()
ELSE NULL
END,
error_message = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message
ELSE NULL
END
FROM fail_or_retry_task
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
),
-- Send broadcast event for step failed if necessary
-- NOTE(review): this is a plain SELECT CTE that the final UPDATE never
-- references; PostgreSQL does not evaluate unreferenced non-data-modifying
-- CTEs, so this step:failed broadcast never fires. The follow-up migration
-- 20250619195327_pgflow_fix_fail_task_missing_realtime_event (later in this
-- file) replaces it with a procedural IF block.
broadcast_step_failed AS (
SELECT
realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', fail_task.run_id,
'step_slug', fail_task.step_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
concat('step:', fail_task.step_slug, ':failed'),
concat('pgflow:run:', fail_task.run_id),
false
)
FROM maybe_fail_step
WHERE maybe_fail_step.status = 'failed'
)
-- Only decrement remaining_steps, don't update status
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING (status = 'failed') INTO v_run_failed;
-- Send broadcast event for run failure if the run was failed
IF v_run_failed THEN
DECLARE
v_flow_slug text;
BEGIN
SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id;
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', fail_task.run_id,
'flow_slug', v_flow_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', fail_task.run_id),
false
);
END;
END IF;
-- For queued tasks: delay the message for retry with exponential backoff
PERFORM (
WITH retry_config AS (
SELECT
COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN pgflow.runs r ON r.flow_slug = f.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug
),
queued_tasks AS (
SELECT
r.flow_slug,
st.message_id,
pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'queued'
)
SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay)
FROM queued_tasks qt
WHERE EXISTS (SELECT 1 FROM queued_tasks)
);
-- For failed tasks: archive the message
PERFORM (
WITH failed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'failed'
)
SELECT pgmq.archive(ft.flow_slug, ft.message_id)
FROM failed_tasks ft
WHERE EXISTS (SELECT 1 FROM failed_tasks)
);
return query select *
from pgflow.step_tasks st
where st.run_id = fail_task.run_id
and st.step_slug = fail_task.step_slug
and st.task_index = fail_task.task_index;
end;
$$
--SPLIT--
-- Create "get_run_with_states" function
-- Return a run plus all of its step states as one jsonb document:
--   { "run": {...}, "steps": [ {...}, ... ] }
-- "steps" is [] when no step_states exist; the result is NULL for an unknown
-- run_id (no matching row).
-- SECURITY DEFINER hardening: pin an empty search_path so object resolution
-- inside the function cannot be influenced by the caller's search_path. All
-- references below are schema-qualified (built-ins resolve via pg_catalog,
-- which is always searched), so the empty path is safe.
CREATE FUNCTION "pgflow"."get_run_with_states" ("run_id" uuid) RETURNS jsonb LANGUAGE sql SECURITY DEFINER SET "search_path" = '' AS $$
SELECT jsonb_build_object(
'run', to_jsonb(r),
'steps', COALESCE(jsonb_agg(to_jsonb(s)) FILTER (WHERE s.run_id IS NOT NULL), '[]'::jsonb)
)
FROM pgflow.runs r
LEFT JOIN pgflow.step_states s ON s.run_id = r.run_id
WHERE r.run_id = get_run_with_states.run_id
GROUP BY r.run_id;
$$
--SPLIT--
-- Create "start_flow" function
-- Start a new run of a flow: insert the run row (remaining_steps = total step
-- count), create one step_state per step (remaining_deps taken from the
-- step's deps_count), broadcast a 'run:started' realtime event, and enqueue
-- the root steps via start_ready_steps. An explicit run_id may be supplied;
-- otherwise one is generated. Returns the created run row.
CREATE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
v_created_run pgflow.runs%ROWTYPE;
begin
WITH
flow_steps AS (
SELECT steps.flow_slug, steps.step_slug, steps.deps_count
FROM pgflow.steps
WHERE steps.flow_slug = start_flow.flow_slug
),
created_run AS (
INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps)
VALUES (
COALESCE(start_flow.run_id, gen_random_uuid()),
start_flow.flow_slug,
start_flow.input,
(SELECT count(*) FROM flow_steps)
)
RETURNING *
),
-- Data-modifying CTE: executed even though the final SELECT does not
-- reference it.
created_step_states AS (
INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps)
SELECT
fs.flow_slug,
(SELECT created_run.run_id FROM created_run),
fs.step_slug,
fs.deps_count
FROM flow_steps fs
)
SELECT * FROM created_run INTO v_created_run;
-- Send broadcast event for run started
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:started',
'run_id', v_created_run.run_id,
'flow_slug', v_created_run.flow_slug,
'input', v_created_run.input,
'status', 'started',
'remaining_steps', v_created_run.remaining_steps,
'started_at', v_created_run.started_at
),
'run:started',
concat('pgflow:run:', v_created_run.run_id),
false
);
PERFORM pgflow.start_ready_steps(v_created_run.run_id);
RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id;
end;
$$
--SPLIT--
-- Create "start_flow_with_states" function
-- Convenience wrapper: start a flow run and return the full initial state
-- (run + step states) as jsonb in one call, via pgflow.get_run_with_states.
-- SECURITY DEFINER hardening: pin an empty search_path so object resolution
-- cannot be influenced by the caller's search_path; both calls below are
-- schema-qualified, so the empty path is safe.
CREATE FUNCTION "pgflow"."start_flow_with_states" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = '' AS $$
DECLARE
v_run_id UUID;
BEGIN
-- Start the flow using existing function
SELECT r.run_id INTO v_run_id FROM pgflow.start_flow(
start_flow_with_states.flow_slug,
start_flow_with_states.input,
start_flow_with_states.run_id
) AS r LIMIT 1;
-- Use get_run_with_states to return the complete state
RETURN pgflow.get_run_with_states(v_run_id);
END;
$$
--SPLIT--
-- Drop "start_flow" function
-- Removes the old two-argument overload; the three-argument version created
-- above ("run_id" uuid DEFAULT NULL) remains and is call-compatible with
-- existing two-argument call sites.
DROP FUNCTION "pgflow"."start_flow" (text, jsonb)
--SPLIT--
-- source: 20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql
-- Modify "fail_task" function
-- Handle a task failure: requeue for retry while attempts remain, otherwise
-- mark task/step/run as failed; then delay the message for retry
-- (exponential backoff) or archive it. Returns the affected step_task row.
-- This revision sends the step:failed / run:failed realtime events from
-- procedural IF blocks instead of an unreferenced SELECT CTE (which
-- PostgreSQL never evaluates), fixing the previously missing step:failed
-- broadcast — see the migration name in the source comment above.
CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
v_run_failed boolean;
v_step_failed boolean;
begin
WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
FOR UPDATE
),
flow_info AS (
SELECT r.flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id
),
-- Effective retry configuration: step-level options win over flow-level.
config AS (
SELECT
COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN flow_info fi ON fi.flow_slug = s.flow_slug
WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug
),
-- Retry iff attempts_count is still below the max; a retried task goes back
-- to 'queued' with started_at cleared, a spent one becomes 'failed'.
fail_or_retry_task as (
UPDATE pgflow.step_tasks as task
SET
status = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued'
ELSE 'failed'
END,
failed_at = CASE
WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now()
ELSE NULL
END,
started_at = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL
ELSE task.started_at
END,
error_message = fail_task.error_message
WHERE task.run_id = fail_task.run_id
AND task.step_slug = fail_task.step_slug
AND task.task_index = fail_task.task_index
AND task.status = 'started'
RETURNING *
),
-- Fail the step state (persisting error_message) only when the task itself
-- was permanently failed above.
maybe_fail_step AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed'
ELSE pgflow.step_states.status
END,
failed_at = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now()
ELSE NULL
END,
error_message = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message
ELSE NULL
END
FROM fail_or_retry_task
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
)
-- Update run status
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING (status = 'failed') INTO v_run_failed;
-- Check if step failed by querying the step_states table
SELECT (status = 'failed') INTO v_step_failed
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;
-- Send broadcast event for step failure if the step was failed
IF v_step_failed THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', fail_task.run_id,
'step_slug', fail_task.step_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
concat('step:', fail_task.step_slug, ':failed'),
concat('pgflow:run:', fail_task.run_id),
false
);
END IF;
-- Send broadcast event for run failure if the run was failed
IF v_run_failed THEN
DECLARE
v_flow_slug text;
BEGIN
SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id;
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', fail_task.run_id,
'flow_slug', v_flow_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', fail_task.run_id),
false
);
END;
END IF;
-- For queued tasks: delay the message for retry with exponential backoff
PERFORM (
WITH retry_config AS (
SELECT
COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN pgflow.runs r ON r.flow_slug = f.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug
),
queued_tasks AS (
SELECT
r.flow_slug,
st.message_id,
pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'queued'
)
SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay)
FROM queued_tasks qt
WHERE EXISTS (SELECT 1 FROM queued_tasks)
);
-- For failed tasks: archive the message
PERFORM (
WITH failed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'failed'
)
SELECT pgmq.archive(ft.flow_slug, ft.message_id)
FROM failed_tasks ft
WHERE EXISTS (SELECT 1 FROM failed_tasks)
);
return query select *
from pgflow.step_tasks st
where st.run_id = fail_task.run_id
and st.step_slug = fail_task.step_slug
and st.task_index = fail_task.task_index;
end;
$$
--SPLIT--
-- source: 20250627090700_pgflow_fix_function_search_paths.sql
-- Pin an empty search_path on each function so identifier resolution inside
-- them cannot be influenced by the caller's search_path (object references in
-- these functions are schema-qualified; pg_catalog is still searched
-- implicitly). Addresses the "function search path mutable" security lint.
-- Add "calculate_retry_delay" function configuration parameter
ALTER FUNCTION "pgflow"."calculate_retry_delay" SET "search_path" = ''
--SPLIT--
-- Add "is_valid_slug" function configuration parameter
ALTER FUNCTION "pgflow"."is_valid_slug" SET "search_path" = ''
--SPLIT--
-- Add "read_with_poll" function configuration parameter
ALTER FUNCTION "pgflow"."read_with_poll" SET "search_path" = ''
--SPLIT--
-- source: 20250707210212_pgflow_add_opt_start_delay.sql
-- Modify "steps" table
-- Adds per-step opt_start_delay (NULL = no delay), applied when the step's
-- first task messages are enqueued (passed to pgmq.send_batch as the delay
-- argument in start_ready_steps). NOTE(review): the CHECK constraint is
-- listed before the ADD COLUMN it references; Postgres resolves all
-- subcommands of one ALTER TABLE together, but confirm on the target version.
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "opt_start_delay_is_nonnegative" CHECK ((opt_start_delay IS NULL) OR (opt_start_delay >= 0)), ADD COLUMN "opt_start_delay" integer NULL
--SPLIT--
-- Create "add_step" function
-- Registers a step and its dependency edges for a flow. Idempotent:
-- re-registering an existing step returns the existing row unchanged and
-- duplicate deps are ignored. step_index is assigned sequentially per flow.
-- NULL opt_* arguments mean "inherit the flow-level default" (resolved via
-- COALESCE(s.opt_*, f.opt_*) at execution time, e.g. in fail_task).
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[], "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer) RETURNS "pgflow"."steps" LANGUAGE sql SET "search_path" = '' AS $$
WITH
-- Next sequential step_index for this flow (0 when the flow has no steps).
next_index AS (
SELECT COALESCE(MAX(step_index) + 1, 0) as idx
FROM pgflow.steps
WHERE flow_slug = add_step.flow_slug
),
-- Insert the step. The no-op DO UPDATE (step_slug = itself) makes
-- RETURNING * yield the row even when the step already existed.
create_step AS (
INSERT INTO pgflow.steps (flow_slug, step_slug, step_index, deps_count, opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay)
SELECT add_step.flow_slug, add_step.step_slug, idx, COALESCE(array_length(deps_slugs, 1), 0), max_attempts, base_delay, timeout, start_delay
FROM next_index
ON CONFLICT (flow_slug, step_slug)
DO UPDATE SET step_slug = pgflow.steps.step_slug
RETURNING *
),
-- Record dependency edges (dep_slug = parent, step_slug = child); duplicates
-- are ignored so re-registration is a no-op.
insert_deps AS (
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
FROM unnest(deps_slugs) AS d(dep_slug)
ON CONFLICT (flow_slug, dep_slug, step_slug) DO NOTHING
RETURNING 1
)
-- Return the created step
SELECT * FROM create_step;
$$
--SPLIT--
-- Drop "add_step" function
-- Drop the two pre-start_delay overloads, identified by their full argument
-- signatures to disambiguate between overloads; only the new start_delay-aware
-- variants remain after this migration.
DROP FUNCTION "pgflow"."add_step" (text, text, integer, integer, integer)
--SPLIT--
-- Drop "add_step" function
DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer)
--SPLIT--
-- Create "add_step" function
-- Convenience overload for steps with no dependencies: delegates to the
-- 7-argument variant with an empty deps array.
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer) RETURNS "pgflow"."steps" LANGUAGE sql SET "search_path" = '' AS $$
-- Call the original function with an empty array
SELECT * FROM pgflow.add_step(flow_slug, step_slug, ARRAY[]::text[], max_attempts, base_delay, timeout, start_delay);
$$
--SPLIT--
-- source: 20250719205006_pgflow_worker_deprecation.sql
-- Rename a column from "stopped_at" to "deprecated_at"
-- Pure rename: data and column type are preserved, only the identifier changes.
ALTER TABLE "pgflow"."workers" RENAME COLUMN "stopped_at" TO "deprecated_at"
--SPLIT--
-- source: 20251006073122_pgflow_add_map_step_type.sql
-- Modify "step_task_record" composite type
-- Append task_index (which element of a map step's input a task handles).
-- The attribute is appended last, keeping existing attribute positions stable
-- for positional consumers of the composite type.
ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "task_index" integer
--SPLIT--
-- Modify "step_states" table - Step 1: Drop old constraint and NOT NULL
-- Relax remaining_tasks so it may be NULL (new semantics: NULL = step not
-- started yet) and add initial_tasks, the step's task fan-out (NULL = not yet
-- known, e.g. a map step waiting for its parent's array). Replacement
-- constraints are added in Step 2, after the data-migration UPDATEs between
-- these two ALTERs have backfilled existing rows.
ALTER TABLE "pgflow"."step_states"
DROP CONSTRAINT "step_states_remaining_tasks_check",
ALTER COLUMN "remaining_tasks" DROP NOT NULL,
ALTER COLUMN "remaining_tasks" DROP DEFAULT,
ADD COLUMN "initial_tasks" integer NULL
--SPLIT--
-- AUTOMATIC DATA MIGRATION: Prepare existing data for new constraints
-- This runs AFTER dropping NOT NULL but BEFORE adding new constraints
-- All old steps had exactly 1 task (enforced by old only_single_task_per_step constraint)
-- Backfill initial_tasks = 1 for all existing steps
-- (Old schema enforced exactly 1 task per step, so all steps had initial_tasks=1)
-- The IS NULL guard makes this statement safe to re-run.
UPDATE "pgflow"."step_states"
SET "initial_tasks" = 1
WHERE "initial_tasks" IS NULL
--SPLIT--
-- Set remaining_tasks to NULL for 'created' status
-- (New semantics: NULL = not started, old semantics: 1 = not started)
-- Required by the remaining_tasks_state_consistency constraint added below;
-- the IS NOT NULL guard makes this statement safe to re-run.
UPDATE "pgflow"."step_states"
SET "remaining_tasks" = NULL
WHERE "status" = 'created' AND "remaining_tasks" IS NOT NULL
--SPLIT--
-- Modify "step_states" table - Step 2: Add new constraints
-- initial_tasks_known_when_started: a step may only be 'started' once its
--   task fan-out has been resolved.
-- remaining_tasks_state_consistency: remaining_tasks stays NULL while the
--   step is still 'created'.
-- step_states_initial_tasks_check: fan-out is never negative (0 is legal:
--   an empty map step completes without spawning tasks).
ALTER TABLE "pgflow"."step_states"
ADD CONSTRAINT "initial_tasks_known_when_started" CHECK ((status <> 'started'::text) OR (initial_tasks IS NOT NULL)),
ADD CONSTRAINT "remaining_tasks_state_consistency" CHECK ((remaining_tasks IS NULL) OR (status <> 'created'::text)),
ADD CONSTRAINT "step_states_initial_tasks_check" CHECK ((initial_tasks IS NULL) OR (initial_tasks >= 0))
--SPLIT--
-- Modify "step_tasks" table
-- Drop only_single_task_per_step so map steps can fan out to many tasks, and
-- widen the output check so failed tasks may retain an output (complete_task
-- stores the offending output on type-violation failures).
ALTER TABLE "pgflow"."step_tasks" DROP CONSTRAINT "only_single_task_per_step", DROP CONSTRAINT "output_valid_only_for_completed", ADD CONSTRAINT "output_valid_only_for_completed" CHECK ((output IS NULL) OR (status = ANY (ARRAY['completed'::text, 'failed'::text])))
--SPLIT--
-- Modify "steps" table
-- Extend the allowed step types with 'map' (one task per element of the
-- input array) alongside the original 'single'.
ALTER TABLE "pgflow"."steps" DROP CONSTRAINT "steps_step_type_check", ADD CONSTRAINT "steps_step_type_check" CHECK (step_type = ANY (ARRAY['single'::text, 'map'::text]))
--SPLIT--
-- Modify "maybe_complete_run" function
-- Completes a run when no steps remain: sets status/completed_at, computes
-- the run output from its leaf steps (steps nothing depends on), and
-- broadcasts a 'run:completed' realtime event. Safe to call speculatively
-- after every task completion: the guarded UPDATE matches no row while
-- remaining_steps > 0 or the run is already completed, making it a no-op.
CREATE OR REPLACE FUNCTION "pgflow"."maybe_complete_run" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
-- Row captured by RETURNING below; all-NULL fields when nothing was updated.
v_completed_run pgflow.runs%ROWTYPE;
begin
-- ==========================================
-- CHECK AND COMPLETE RUN IF FINISHED
-- ==========================================
-- ---------- Complete run if all steps done ----------
UPDATE pgflow.runs
SET
status = 'completed',
completed_at = now(),
-- Only compute expensive aggregation when actually completing the run
output = (
-- ---------- Gather outputs from leaf steps ----------
-- Leaf steps = steps with no dependents
-- For map steps: aggregate all task outputs into array
-- For single steps: use the single task output
SELECT jsonb_object_agg(
step_slug,
CASE
WHEN step_type = 'map' THEN aggregated_output
ELSE single_output
END
)
FROM (
SELECT DISTINCT
leaf_state.step_slug,
leaf_step.step_type,
-- For map steps: aggregate all task outputs
-- (COALESCE to '[]' so an empty map yields an empty array, not NULL)
CASE WHEN leaf_step.step_type = 'map' THEN
(SELECT COALESCE(jsonb_agg(leaf_task.output ORDER BY leaf_task.task_index), '[]'::jsonb)
FROM pgflow.step_tasks leaf_task
WHERE leaf_task.run_id = leaf_state.run_id
AND leaf_task.step_slug = leaf_state.step_slug
AND leaf_task.status = 'completed')
END as aggregated_output,
-- For single steps: get the single output
CASE WHEN leaf_step.step_type = 'single' THEN
(SELECT leaf_task.output
FROM pgflow.step_tasks leaf_task
WHERE leaf_task.run_id = leaf_state.run_id
AND leaf_task.step_slug = leaf_state.step_slug
AND leaf_task.status = 'completed'
LIMIT 1)
END as single_output
FROM pgflow.step_states leaf_state
JOIN pgflow.steps leaf_step ON leaf_step.flow_slug = leaf_state.flow_slug AND leaf_step.step_slug = leaf_state.step_slug
WHERE leaf_state.run_id = maybe_complete_run.run_id
AND leaf_state.status = 'completed'
-- A leaf step is one that appears as nobody's dependency (dep_slug)
AND NOT EXISTS (
SELECT 1
FROM pgflow.deps dep
WHERE dep.flow_slug = leaf_state.flow_slug
AND dep.dep_slug = leaf_state.step_slug
)
) leaf_outputs
)
WHERE pgflow.runs.run_id = maybe_complete_run.run_id
AND pgflow.runs.remaining_steps = 0
AND pgflow.runs.status != 'completed'
RETURNING * INTO v_completed_run;
-- ==========================================
-- BROADCAST COMPLETION EVENT
-- ==========================================
-- v_completed_run.run_id is NULL when the UPDATE matched no row, so this
-- guard doubles as the "did we just complete the run?" check: the event is
-- broadcast exactly once, by the call that performed the transition.
IF v_completed_run.run_id IS NOT NULL THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:completed',
'run_id', v_completed_run.run_id,
'flow_slug', v_completed_run.flow_slug,
'status', 'completed',
'output', v_completed_run.output,
'completed_at', v_completed_run.completed_at
),
'run:completed',
concat('pgflow:run:', v_completed_run.run_id),
false
);
END IF;
end;
$$
--SPLIT--
-- Modify "start_ready_steps" function
-- Starts every step of a run whose dependencies are satisfied: empty map
-- steps (initial_tasks = 0) complete immediately without spawning tasks;
-- all other ready steps are marked 'started', get one queued task per
-- task_index, and a 'step:started' event is broadcast. No-op on failed runs.
-- The whole transition is one statement (a single CTE chain), so it is atomic.
CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
begin
-- ==========================================
-- GUARD: No mutations on failed runs
-- ==========================================
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = start_ready_steps.run_id AND pgflow.runs.status = 'failed') THEN
RETURN;
END IF;
-- ==========================================
-- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0)
-- ==========================================
-- These complete immediately without spawning tasks
WITH empty_map_steps AS (
SELECT step_state.*
FROM pgflow.step_states AS step_state
JOIN pgflow.steps AS step
ON step.flow_slug = step_state.flow_slug
AND step.step_slug = step_state.step_slug
WHERE step_state.run_id = start_ready_steps.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step.step_type = 'map'
AND step_state.initial_tasks = 0
-- Deterministic lock order (by step_slug) to avoid deadlocks between
-- concurrent callers locking the same run's step_states
ORDER BY step_state.step_slug
FOR UPDATE OF step_state
),
-- ---------- Complete empty map steps ----------
completed_empty_steps AS (
UPDATE pgflow.step_states
SET status = 'completed',
started_at = now(),
completed_at = now(),
remaining_tasks = 0
FROM empty_map_steps
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
AND pgflow.step_states.step_slug = empty_map_steps.step_slug
RETURNING pgflow.step_states.*
),
-- ---------- Broadcast completion events ----------
broadcast_empty_completed AS (
SELECT
realtime.send(
jsonb_build_object(
'event_type', 'step:completed',
'run_id', completed_step.run_id,
'step_slug', completed_step.step_slug,
'status', 'completed',
'started_at', completed_step.started_at,
'completed_at', completed_step.completed_at,
'remaining_tasks', 0,
'remaining_deps', 0,
'output', '[]'::jsonb
),
concat('step:', completed_step.step_slug, ':completed'),
concat('pgflow:run:', completed_step.run_id),
false
)
FROM completed_empty_steps AS completed_step
),
-- ==========================================
-- HANDLE NORMAL STEPS (initial_tasks > 0)
-- ==========================================
-- ---------- Find ready steps ----------
-- Steps with no remaining deps and known task count
ready_steps AS (
SELECT *
FROM pgflow.step_states AS step_state
WHERE step_state.run_id = start_ready_steps.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count
AND step_state.initial_tasks > 0 -- Don't start taskless steps
-- Exclude empty map steps already handled
AND NOT EXISTS (
SELECT 1 FROM empty_map_steps
WHERE empty_map_steps.run_id = step_state.run_id
AND empty_map_steps.step_slug = step_state.step_slug
)
ORDER BY step_state.step_slug
FOR UPDATE
),
-- ---------- Mark steps as started ----------
started_step_states AS (
UPDATE pgflow.step_states
SET status = 'started',
started_at = now(),
remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting
FROM ready_steps
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
AND pgflow.step_states.step_slug = ready_steps.step_slug
RETURNING pgflow.step_states.*
),
-- ==========================================
-- TASK GENERATION AND QUEUE MESSAGES
-- ==========================================
-- ---------- Generate tasks and batch messages ----------
-- Single steps: 1 task (index 0)
-- Map steps: N tasks (indices 0..N-1)
message_batches AS (
SELECT
started_step.flow_slug,
started_step.run_id,
started_step.step_slug,
COALESCE(step.opt_start_delay, 0) as delay,
array_agg(
jsonb_build_object(
'flow_slug', started_step.flow_slug,
'run_id', started_step.run_id,
'step_slug', started_step.step_slug,
'task_index', task_idx.task_index
) ORDER BY task_idx.task_index
) AS messages,
array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices
FROM started_step_states AS started_step
JOIN pgflow.steps AS step
ON step.flow_slug = started_step.flow_slug
AND step.step_slug = started_step.step_slug
-- Generate task indices from 0 to initial_tasks-1
CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index)
GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay
),
-- ---------- Send messages to queue ----------
-- Uses batch sending for performance with large arrays
sent_messages AS (
SELECT
mb.flow_slug,
mb.run_id,
mb.step_slug,
task_indices.task_index,
msg_ids.msg_id
FROM message_batches mb
CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord)
CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord)
-- Pair message N with task index N via ORDINALITY; assumes pgmq.send_batch
-- returns msg_ids in input order -- TODO(review): confirm against pgmq docs
WHERE task_indices.idx_ord = msg_ids.msg_ord
),
-- ---------- Broadcast step:started events ----------
broadcast_events AS (
SELECT
realtime.send(
jsonb_build_object(
'event_type', 'step:started',
'run_id', started_step.run_id,
'step_slug', started_step.step_slug,
'status', 'started',
'started_at', started_step.started_at,
'remaining_tasks', started_step.remaining_tasks,
'remaining_deps', started_step.remaining_deps
),
concat('step:', started_step.step_slug, ':started'),
concat('pgflow:run:', started_step.run_id),
false
)
FROM started_step_states AS started_step
)
-- ==========================================
-- RECORD TASKS IN DATABASE
-- ==========================================
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id)
SELECT
sent_messages.flow_slug,
sent_messages.run_id,
sent_messages.step_slug,
sent_messages.task_index,
sent_messages.msg_id
FROM sent_messages;
end;
$$
--SPLIT--
-- Create "cascade_complete_taskless_steps" function
-- Iteratively completes "taskless" steps (initial_tasks = 0, e.g. map steps
-- fed an empty array) whose dependencies are all satisfied, propagating
-- completion and empty-array fan-out (initial_tasks = 0) to dependent map
-- steps, until a fixpoint is reached. Returns the total number of steps
-- completed across all iterations.
--
-- Fix: pin SET "search_path" = '' like every other function in this schema
-- (cf. the 20250627 search-path migration above) — this was the only
-- function missing it. All object references in the body are already
-- schema-qualified and pg_catalog remains implicitly searched, so behavior
-- is unchanged; this only removes the caller's ability to influence
-- identifier resolution (the "function search path mutable" lint).
CREATE FUNCTION "pgflow"."cascade_complete_taskless_steps" ("run_id" uuid) RETURNS integer LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
-- Total steps completed across all waves (returned to the caller).
v_total_completed int := 0;
-- Steps completed in the current wave.
v_iteration_completed int;
v_iterations int := 0;
-- Hard cap on cascade depth; guards against runaway loops if the deps
-- graph data is ever cyclic or corrupted.
v_max_iterations int := 50;
BEGIN
-- ==========================================
-- ITERATIVE CASCADE COMPLETION
-- ==========================================
-- Completes taskless steps in waves until none remain
LOOP
-- ---------- Safety check ----------
v_iterations := v_iterations + 1;
IF v_iterations > v_max_iterations THEN
RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations;
END IF;
-- ==========================================
-- COMPLETE READY TASKLESS STEPS
-- ==========================================
WITH completed AS (
-- ---------- Complete taskless steps ----------
-- Steps with initial_tasks=0 and no remaining deps
UPDATE pgflow.step_states ss
SET status = 'completed',
started_at = now(),
completed_at = now(),
remaining_tasks = 0
FROM pgflow.steps s
WHERE ss.run_id = cascade_complete_taskless_steps.run_id
AND ss.flow_slug = s.flow_slug
AND ss.step_slug = s.step_slug
AND ss.status = 'created'
AND ss.remaining_deps = 0
AND ss.initial_tasks = 0
-- Ordering within a wave is irrelevant: the outer LOOP re-scans until
-- no step qualifies, so dependents complete in a later wave.
RETURNING ss.*
),
-- ---------- Update dependent steps ----------
-- Propagate completion and empty arrays to dependents
dep_updates AS (
UPDATE pgflow.step_states ss
SET remaining_deps = ss.remaining_deps - dep_count.count,
-- If the dependent is a map step and its dependency completed with 0 tasks,
-- set its initial_tasks to 0 as well
initial_tasks = CASE
WHEN s.step_type = 'map' AND dep_count.has_zero_tasks
THEN 0 -- Empty array propagation
ELSE ss.initial_tasks -- Keep existing value (including NULL)
END
FROM (
-- Aggregate dependency updates per dependent step
SELECT
d.flow_slug,
d.step_slug as dependent_slug,
COUNT(*) as count,
BOOL_OR(c.initial_tasks = 0) as has_zero_tasks
FROM completed c
JOIN pgflow.deps d ON d.flow_slug = c.flow_slug
AND d.dep_slug = c.step_slug
GROUP BY d.flow_slug, d.step_slug
) dep_count,
pgflow.steps s
WHERE ss.run_id = cascade_complete_taskless_steps.run_id
AND ss.flow_slug = dep_count.flow_slug
AND ss.step_slug = dep_count.dependent_slug
AND s.flow_slug = ss.flow_slug
AND s.step_slug = ss.step_slug
),
-- ---------- Update run counters ----------
-- Only decrement remaining_steps; let maybe_complete_run handle finalization
run_updates AS (
UPDATE pgflow.runs r
SET remaining_steps = r.remaining_steps - c.completed_count
FROM (SELECT COUNT(*) AS completed_count FROM completed) c
WHERE r.run_id = cascade_complete_taskless_steps.run_id
AND c.completed_count > 0
)
-- ---------- Check iteration results ----------
SELECT COUNT(*) INTO v_iteration_completed FROM completed;
EXIT WHEN v_iteration_completed = 0; -- No more steps to complete
v_total_completed := v_total_completed + v_iteration_completed;
END LOOP;
RETURN v_total_completed;
END;
$$
--SPLIT--
-- Modify "complete_task" function
-- Marks one started task 'completed' and stores its output, decrements step
-- and run counters, resolves NULL initial_tasks on dependent map steps, and
-- on single->map type violations (non-array output feeding a map) fails the
-- whole run. On step completion it cascades taskless steps, broadcasts a
-- step:completed event, archives the queue message, starts ready steps and
-- tries to complete the run. Returns the affected task row(s).
CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
v_step_state pgflow.step_states%ROWTYPE;
-- Non-NULL when a dependent map step would receive a non-array input.
v_dependent_map_slug text;
v_run_record pgflow.runs%ROWTYPE;
v_step_record pgflow.step_states%ROWTYPE;
begin
-- ==========================================
-- GUARD: No mutations on failed runs
-- ==========================================
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
RETURN;
END IF;
-- ==========================================
-- LOCK ACQUISITION AND TYPE VALIDATION
-- ==========================================
-- Acquire locks first to prevent race conditions
SELECT * INTO v_run_record FROM pgflow.runs
WHERE pgflow.runs.run_id = complete_task.run_id
FOR UPDATE;
SELECT * INTO v_step_record FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
FOR UPDATE;
-- Check for type violations AFTER acquiring locks
-- A violation exists when this (single) step feeds a map step whose task
-- count is still unknown, but the output is not a JSON array.
SELECT child_step.step_slug INTO v_dependent_map_slug
FROM pgflow.deps dependency
JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug
AND child_step.step_slug = dependency.step_slug
JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug
AND parent_step.step_slug = dependency.dep_slug
JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug
AND child_state.step_slug = child_step.step_slug
WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step
AND dependency.flow_slug = v_run_record.flow_slug
AND parent_step.step_type = 'single' -- Only validate single steps
AND child_step.step_type = 'map'
AND child_state.run_id = complete_task.run_id
AND child_state.initial_tasks IS NULL
AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array')
LIMIT 1;
-- Handle type violation if detected
IF v_dependent_map_slug IS NOT NULL THEN
-- Mark run as failed immediately
UPDATE pgflow.runs
SET status = 'failed',
failed_at = now()
WHERE pgflow.runs.run_id = complete_task.run_id;
-- Archive all active messages (both queued and started) to prevent orphaned messages
PERFORM pgmq.archive(
v_run_record.flow_slug,
array_agg(st.message_id)
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
HAVING count(*) > 0; -- Only call archive if there are messages to archive
-- Mark current task as failed and store the output
UPDATE pgflow.step_tasks
SET status = 'failed',
failed_at = now(),
output = complete_task.output, -- Store the output that caused the violation
error_message = '[TYPE_VIOLATION] Produced ' ||
CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END ||
' instead of array'
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
-- Mark step state as failed
UPDATE pgflow.step_states
SET status = 'failed',
failed_at = now(),
error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug ||
' expects array input but dependency ' || complete_task.step_slug ||
' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug;
-- Archive the current task's message (it was started, now failed)
-- NOTE(review): the bulk archive above already included this message (the
-- task was still 'started' at that point), so this call looks redundant;
-- presumably pgmq.archive on an already-archived id is a harmless no-op --
-- confirm against pgmq docs.
PERFORM pgmq.archive(
v_run_record.flow_slug,
st.message_id -- Single message, use scalar form
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.message_id IS NOT NULL;
-- Return empty result
RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;
RETURN;
END IF;
-- ==========================================
-- MAIN CTE CHAIN: Update task and propagate changes
-- ==========================================
WITH
-- ---------- Task completion ----------
-- Update the task record with completion status and output
task AS (
UPDATE pgflow.step_tasks
SET
status = 'completed',
completed_at = now(),
output = complete_task.output
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index
AND pgflow.step_tasks.status = 'started'
RETURNING *
),
-- ---------- Step state update ----------
-- Decrement remaining_tasks and potentially mark step as completed
step_state AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement
ELSE 'started'
END,
completed_at = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement
ELSE NULL
END,
remaining_tasks = pgflow.step_states.remaining_tasks - 1
FROM task
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
RETURNING pgflow.step_states.*
),
-- ---------- Dependency resolution ----------
-- Find all child steps that depend on the completed parent step (only if parent completed)
child_steps AS (
SELECT deps.step_slug AS child_step_slug
FROM pgflow.deps deps
JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug
WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child
ORDER BY deps.step_slug -- Ensure consistent ordering
),
-- ---------- Lock child steps ----------
-- Acquire locks on all child steps before updating them
child_steps_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps)
FOR UPDATE
),
-- ---------- Update child steps ----------
-- Decrement remaining_deps and resolve NULL initial_tasks for map steps
child_steps_update AS (
UPDATE pgflow.step_states child_state
SET remaining_deps = child_state.remaining_deps - 1,
-- Resolve NULL initial_tasks for child map steps
-- This is where child maps learn their array size from the parent
-- This CTE only runs when the parent step is complete (see child_steps JOIN)
initial_tasks = CASE
WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN
CASE
WHEN parent_step.step_type = 'map' THEN
-- Map->map: Count all completed tasks from parent map
-- We add 1 because the current task is being completed in this transaction
-- but isn't yet visible as 'completed' in the step_tasks table
-- TODO: Refactor to use future column step_states.total_tasks
-- Would eliminate the COUNT query and just use parent_state.total_tasks
(SELECT COUNT(*)::int + 1
FROM pgflow.step_tasks parent_tasks
WHERE parent_tasks.run_id = complete_task.run_id
AND parent_tasks.step_slug = complete_task.step_slug
AND parent_tasks.status = 'completed'
AND parent_tasks.task_index != complete_task.task_index)
ELSE
-- Single->map: Use output array length (single steps complete immediately)
CASE
WHEN complete_task.output IS NOT NULL
AND jsonb_typeof(complete_task.output) = 'array' THEN
jsonb_array_length(complete_task.output)
ELSE NULL -- Keep NULL if not an array
END
END
ELSE child_state.initial_tasks -- Keep existing value (including NULL)
END
FROM child_steps children
JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id)
AND child_step.step_slug = children.child_step_slug
JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id)
AND parent_step.step_slug = complete_task.step_slug
WHERE child_state.run_id = complete_task.run_id
AND child_state.step_slug = children.child_step_slug
)
-- ---------- Update run remaining_steps ----------
-- Decrement the run's remaining_steps counter if step completed
UPDATE pgflow.runs
SET remaining_steps = pgflow.runs.remaining_steps - 1
FROM step_state
WHERE pgflow.runs.run_id = complete_task.run_id
AND step_state.status = 'completed';
-- ==========================================
-- POST-COMPLETION ACTIONS
-- ==========================================
-- ---------- Get updated state for broadcasting ----------
SELECT * INTO v_step_state FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug;
-- ---------- Handle step completion ----------
IF v_step_state.status = 'completed' THEN
-- Cascade complete any taskless steps that are now ready
PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id);
-- Broadcast step:completed event
-- For map steps, aggregate all task outputs; for single steps, use the task output
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:completed',
'run_id', complete_task.run_id,
'step_slug', complete_task.step_slug,
'status', 'completed',
'output', CASE
WHEN (SELECT s.step_type FROM pgflow.steps s
WHERE s.flow_slug = v_step_state.flow_slug
AND s.step_slug = complete_task.step_slug) = 'map' THEN
-- Aggregate all task outputs for map steps
(SELECT COALESCE(jsonb_agg(st.output ORDER BY st.task_index), '[]'::jsonb)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.status = 'completed')
ELSE
-- Single step: use the individual task output
complete_task.output
END,
'completed_at', v_step_state.completed_at
),
concat('step:', complete_task.step_slug, ':completed'),
concat('pgflow:run:', complete_task.run_id),
false
);
END IF;
-- ---------- Archive completed task message ----------
-- Move message from active queue to archive table
PERFORM (
WITH completed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.status = 'completed'
)
SELECT pgmq.archive(ct.flow_slug, ct.message_id)
FROM completed_tasks ct
WHERE EXISTS (SELECT 1 FROM completed_tasks)
);
-- ---------- Trigger next steps ----------
-- Start any steps that are now ready (deps satisfied)
PERFORM pgflow.start_ready_steps(complete_task.run_id);
-- Check if the entire run is complete
PERFORM pgflow.maybe_complete_run(complete_task.run_id);
-- ---------- Return completed task ----------
RETURN QUERY SELECT *
FROM pgflow.step_tasks AS step_task
WHERE step_task.run_id = complete_task.run_id
AND step_task.step_slug = complete_task.step_slug
AND step_task.task_index = complete_task.task_index;
end;
$$
--SPLIT--
-- Modify "fail_task" function
-- Records a task failure. While attempts remain, the task is re-queued with
-- exponential backoff (pgflow.calculate_retry_delay); otherwise the task, its
-- step, and the run are marked failed, step:failed / run:failed events are
-- broadcast, and queue messages are archived (the failed task's message, and
-- every active message when the run fails). Returns the task row.
CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
v_run_failed boolean;
v_step_failed boolean;
begin
-- If run is already failed, no retries allowed
-- (the task is failed terminally and its message archived)
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id AND pgflow.runs.status = 'failed') THEN
UPDATE pgflow.step_tasks
SET status = 'failed',
failed_at = now(),
error_message = fail_task.error_message
WHERE pgflow.step_tasks.run_id = fail_task.run_id
AND pgflow.step_tasks.step_slug = fail_task.step_slug
AND pgflow.step_tasks.task_index = fail_task.task_index
AND pgflow.step_tasks.status = 'started';
-- Archive the task's message
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = fail_task.run_id
AND pgflow.step_tasks.step_slug = fail_task.step_slug
AND pgflow.step_tasks.task_index = fail_task.task_index;
RETURN;
END IF;
-- Single atomic statement: lock run + step rows, decide retry vs terminal
-- failure for the task, and propagate failure to the step and the run.
WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
FOR UPDATE
),
flow_info AS (
SELECT r.flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id
),
-- Effective retry settings: step-level opt_* overrides flow-level default.
config AS (
SELECT
COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN flow_info fi ON fi.flow_slug = s.flow_slug
WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug
),
fail_or_retry_task as (
UPDATE pgflow.step_tasks as task
SET
status = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued'
ELSE 'failed'
END,
failed_at = CASE
WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now()
ELSE NULL
END,
started_at = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL
ELSE task.started_at
END,
error_message = fail_task.error_message
WHERE task.run_id = fail_task.run_id
AND task.step_slug = fail_task.step_slug
AND task.task_index = fail_task.task_index
AND task.status = 'started'
RETURNING *
),
maybe_fail_step AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed'
ELSE pgflow.step_states.status
END,
-- On retry the ELSE branches reset failed_at/error_message to NULL,
-- clearing stale failure info from a previous attempt.
failed_at = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now()
ELSE NULL
END,
error_message = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message
ELSE NULL
END
FROM fail_or_retry_task
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
)
-- Update run status
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING (status = 'failed') INTO v_run_failed;
-- Check if step failed by querying the step_states table
SELECT (status = 'failed') INTO v_step_failed
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;
-- Send broadcast event for step failure if the step was failed
IF v_step_failed THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', fail_task.run_id,
'step_slug', fail_task.step_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
concat('step:', fail_task.step_slug, ':failed'),
concat('pgflow:run:', fail_task.run_id),
false
);
END IF;
-- Send broadcast event for run failure if the run was failed
IF v_run_failed THEN
-- Nested block only to scope v_flow_slug for the event payload
DECLARE
v_flow_slug text;
BEGIN
SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id;
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', fail_task.run_id,
'flow_slug', v_flow_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', fail_task.run_id),
false
);
END;
END IF;
-- Archive all active messages (both queued and started) when run fails
IF v_run_failed THEN
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;
END IF;
-- For queued tasks: delay the message for retry with exponential backoff
PERFORM (
WITH retry_config AS (
SELECT
COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN pgflow.runs r ON r.flow_slug = f.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug
),
queued_tasks AS (
SELECT
r.flow_slug,
st.message_id,
pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'queued'
)
SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay)
FROM queued_tasks qt
WHERE EXISTS (SELECT 1 FROM queued_tasks)
);
-- For failed tasks: archive the message
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'failed'
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;
return query select *
from pgflow.step_tasks st
where st.run_id = fail_task.run_id
and st.step_slug = fail_task.step_slug
and st.task_index = fail_task.task_index;
end;
$$
--SPLIT--
-- Modify "start_flow" function
--
-- Starts a new run of a flow: inserts the run row, creates one step_state
-- per step (pre-computing initial_tasks where it is knowable), broadcasts
-- the run:started event, then completes/starts whatever steps are
-- immediately ready.
--
-- Parameters:
--   flow_slug - slug of the flow definition to run (must exist in pgflow.flows)
--   input     - run input payload; must be a JSON array when the flow has
--               root map steps (validated below)
--   run_id    - optional caller-supplied run id; a random UUID is generated when NULL
-- Returns: the newly created pgflow.runs row (re-read at the end so post-creation
--          updates such as cascade completion are reflected).
CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
-- Run row created by the main CTE chain, captured for broadcasting.
v_created_run pgflow.runs%ROWTYPE;
-- Number of root map steps (map steps with zero dependencies) in the flow.
v_root_map_count int;
begin
-- ==========================================
-- VALIDATION: Root map array input
-- ==========================================
-- Root map steps fan out over the run input, so the input must be an array.
WITH root_maps AS (
SELECT step_slug
FROM pgflow.steps
WHERE steps.flow_slug = start_flow.flow_slug
AND steps.step_type = 'map'
AND steps.deps_count = 0
)
SELECT COUNT(*) INTO v_root_map_count FROM root_maps;
-- If we have root map steps, validate that input is an array
IF v_root_map_count > 0 THEN
-- First check for NULL (should be caught by NOT NULL constraint, but be defensive)
IF start_flow.input IS NULL THEN
RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug;
END IF;
-- Then check if it's not an array
IF jsonb_typeof(start_flow.input) != 'array' THEN
RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)',
start_flow.flow_slug, jsonb_typeof(start_flow.input);
END IF;
END IF;
-- ==========================================
-- MAIN CTE CHAIN: Create run and step states
-- ==========================================
WITH
-- ---------- Gather flow metadata ----------
flow_steps AS (
SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count
FROM pgflow.steps
WHERE steps.flow_slug = start_flow.flow_slug
),
-- ---------- Create run record ----------
-- remaining_steps starts at the full step count and is decremented as
-- steps complete (see cascade_complete_taskless_steps / complete_task).
created_run AS (
INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps)
VALUES (
COALESCE(start_flow.run_id, gen_random_uuid()),
start_flow.flow_slug,
start_flow.input,
(SELECT count(*) FROM flow_steps)
)
RETURNING *
),
-- ---------- Create step states ----------
-- Sets initial_tasks: known for root maps, NULL for dependent maps
created_step_states AS (
INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks)
SELECT
fs.flow_slug,
(SELECT created_run.run_id FROM created_run),
fs.step_slug,
fs.deps_count,
-- Updated logic for initial_tasks:
CASE
WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN
-- Root map: get array length from input
CASE
WHEN jsonb_typeof(start_flow.input) = 'array' THEN
jsonb_array_length(start_flow.input)
ELSE
1
END
WHEN fs.step_type = 'map' AND fs.deps_count > 0 THEN
-- Dependent map: unknown until dependencies complete
NULL
ELSE
-- Single steps: always 1 task
1
END
FROM flow_steps fs
)
-- Capture the inserted run row into the local variable (plpgsql allows
-- INTO after the FROM clause; this reads the created_run CTE exactly once).
SELECT * FROM created_run INTO v_created_run;
-- ==========================================
-- POST-CREATION ACTIONS
-- ==========================================
-- ---------- Broadcast run:started event ----------
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:started',
'run_id', v_created_run.run_id,
'flow_slug', v_created_run.flow_slug,
'input', v_created_run.input,
'status', 'started',
'remaining_steps', v_created_run.remaining_steps,
'started_at', v_created_run.started_at
),
'run:started',
concat('pgflow:run:', v_created_run.run_id),
false
);
-- ---------- Complete taskless steps ----------
-- Handle empty array maps that should auto-complete
PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id);
-- ---------- Start initial steps ----------
-- Start root steps (those with no dependencies)
PERFORM pgflow.start_ready_steps(v_created_run.run_id);
-- ---------- Check for run completion ----------
-- If cascade completed all steps (zero-task flows), finalize the run
PERFORM pgflow.maybe_complete_run(v_created_run.run_id);
-- Re-read the run so counters updated by the calls above are returned.
RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id;
end;
$$
--SPLIT--
-- Modify "start_tasks" function
--
-- Claims a batch of queued messages for a worker: flips the matching
-- step_tasks rows to 'started', bumps attempts_count, extends each
-- message's visibility timeout past the handler timeout, and returns one
-- step_task_record per claimed task with its fully constructed handler input.
--
-- Parameters:
--   flow_slug - queue/flow the messages belong to
--   msg_ids   - pgmq message ids the worker read from the queue
--   worker_id - recorded on each claimed task as last_worker_id
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
-- Tasks eligible to start: queued tasks matching the given message ids.
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
join pgflow.runs r on r.run_id = task.run_id
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
-- MVP: Don't start tasks on failed runs
and r.status != 'failed'
),
-- Mark claimed tasks as started. Data-modifying CTEs are always executed
-- exactly once, even though nothing below reads this CTE's output.
start_tasks_update as (
update pgflow.step_tasks
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
last_worker_id = worker_id
from tasks
where step_tasks.message_id = tasks.message_id
and step_tasks.flow_slug = tasks.flow_slug
and step_tasks.status = 'queued'
),
-- Run inputs for all runs that own a claimed task.
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
-- One row per (claimed task, dependency) with that dependency's output.
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
-- Aggregate map outputs or use single output
CASE
WHEN dep_step.step_type = 'map' THEN
-- Aggregate all task outputs ordered by task_index
-- Use COALESCE to return empty array if no tasks
(SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb)
FROM pgflow.step_tasks dt
WHERE dt.run_id = st.run_id
AND dt.step_slug = dep.dep_slug
AND dt.status = 'completed')
ELSE
-- Single step: use the single task output
dep_task.output
END as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug
left join pgflow.step_tasks dep_task on
dep_task.run_id = st.run_id and
dep_task.step_slug = dep.dep_slug and
dep_task.status = 'completed'
and dep_step.step_type = 'single' -- Only join for single steps
),
-- Collapse deps into one JSON object per task: {"dep_slug": output, ...}.
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
count(*) as dep_count
from deps d
group by d.run_id, d.step_slug
),
-- Visibility-timeout delay per message: handler timeout (step override or
-- flow default) plus a 2-second buffer — presumably a grace period so the
-- worker can report completion before the message becomes visible again.
timeouts as (
select
task.message_id,
task.flow_slug,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all messages
-- NOTE(review): this CTE is not referenced by the final SELECT. Unlike
-- data-modifying CTEs, a plain SELECT CTE is only evaluated as far as the
-- outer query demands its output — confirm pgflow.set_vt_batch is actually
-- invoked here (see PostgreSQL docs on WITH queries).
set_vt_batch as (
select pgflow.set_vt_batch(
start_tasks.flow_slug,
array_agg(t.message_id order by t.message_id),
array_agg(t.vt_delay order by t.message_id)
)
from timeouts t
)
select
st.flow_slug,
st.run_id,
st.step_slug,
-- ==========================================
-- INPUT CONSTRUCTION LOGIC
-- ==========================================
-- This nested CASE statement determines how to construct the input
-- for each task based on the step type (map vs non-map).
--
-- The fundamental difference:
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
-- - Non-map steps: Receive structured objects with named keys
--   (e.g., {"run": {...}, "dependency1": {...}})
-- ==========================================
CASE
-- -------------------- MAP STEPS --------------------
-- Map steps process arrays element-by-element.
-- Each task receives ONE element from the array at its task_index position.
WHEN step.step_type = 'map' THEN
-- Map steps get raw array elements without any wrapper object
CASE
-- ROOT MAP: Gets array from run input
-- Example: run input = [1, 2, 3]
--   task 0 gets: 1
--   task 1 gets: 2
--   task 2 gets: 3
WHEN step.deps_count = 0 THEN
-- Root map (deps_count = 0): no dependencies, reads from run input.
-- Extract the element at task_index from the run's input array.
-- Note: If run input is not an array, this will return NULL
-- and the flow will fail (validated in start_flow).
jsonb_array_element(r.input, st.task_index)
-- DEPENDENT MAP: Gets array from its single dependency
-- Example: dependency output = ["a", "b", "c"]
--   task 0 gets: "a"
--   task 1 gets: "b"
--   task 2 gets: "c"
ELSE
-- Has dependencies (should be exactly 1 for map steps).
-- Extract the element at task_index from the dependency's output array.
--
-- Why the subquery with jsonb_each?
-- - The dependency outputs a raw array: [1, 2, 3]
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
-- - We need to unwrap and get just the array value
-- - Map steps have exactly 1 dependency (enforced by add_step)
-- - So jsonb_each will return exactly 1 row
-- - We extract the 'value' which is the raw array [1, 2, 3]
-- - Then get the element at task_index from that array
(SELECT jsonb_array_element(value, st.task_index)
FROM jsonb_each(dep_out.deps_output)
LIMIT 1)
END
-- -------------------- NON-MAP STEPS --------------------
-- Regular (non-map) steps receive ALL inputs as a structured object.
-- This includes the original run input plus all dependency outputs.
ELSE
-- Non-map steps get structured input with named keys
-- Example output: {
--   "run": {"original": "input"},
--   "step1": {"output": "from_step1"},
--   "step2": {"output": "from_step2"}
-- }
--
-- Build object with 'run' key containing original input
jsonb_build_object('run', r.input) ||
-- Merge with deps_output which already has dependency outputs
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
-- If no dependencies, defaults to empty object
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id,
st.task_index as task_index
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
step.flow_slug = st.flow_slug and
step.step_slug = st.step_slug
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
$$
--SPLIT--
-- Create "add_step" function
--
-- Registers a step in a flow definition and wires up its dependencies.
-- Idempotent: re-adding an existing step is a no-op (the ON CONFLICT
-- branch performs a dummy update purely so RETURNING yields the row),
-- and duplicate dependency rows are silently skipped.
--
-- Parameters:
--   flow_slug    - flow the step belongs to
--   step_slug    - unique slug of the new step within the flow
--   deps_slugs   - slugs of parent steps this step depends on (default none)
--   max_attempts / base_delay / timeout / start_delay - optional per-step
--                  overrides of the flow-level runtime options (NULL = inherit)
--   step_type    - 'single' (default) or 'map'
-- Returns: the pgflow.steps row for the step.
-- Raises: when a map step declares more than one dependency.
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
-- Row inserted (or already present) for this step.
v_step pgflow.steps;
-- Position of the new step within the flow (0-based, append order).
v_next_index int;
-- Dependency count, with NULL array treated as zero deps.
v_deps_count int := COALESCE(array_length(add_step.deps_slugs, 1), 0);
BEGIN
-- Map steps fan out over exactly one array source, so they accept:
--   0 deps (root map - iterates the flow input array), or
--   1 dep  (dependent map - iterates that dependency's output array).
IF COALESCE(add_step.step_type, 'single') = 'map' AND v_deps_count > 1 THEN
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
add_step.step_slug,
v_deps_count,
array_to_string(add_step.deps_slugs, ', ');
END IF;
-- Append position: one past the highest existing index, or 0 for the first step.
SELECT COALESCE(MAX(existing.step_index) + 1, 0)
INTO v_next_index
FROM pgflow.steps existing
WHERE existing.flow_slug = add_step.flow_slug;
-- Insert the step; on re-add, perform a no-op update so RETURNING still
-- hands back the existing row.
INSERT INTO pgflow.steps (
flow_slug, step_slug, step_type, step_index, deps_count,
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay
)
VALUES (
add_step.flow_slug,
add_step.step_slug,
COALESCE(add_step.step_type, 'single'),
v_next_index,
v_deps_count,
add_step.max_attempts,
add_step.base_delay,
add_step.timeout,
add_step.start_delay
)
ON CONFLICT ON CONSTRAINT steps_pkey
DO UPDATE SET step_slug = EXCLUDED.step_slug
RETURNING * INTO v_step;
-- Record one dependency edge per parent slug, skipping duplicates.
IF add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 THEN
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
SELECT add_step.flow_slug, parent.dep_slug, add_step.step_slug
FROM unnest(add_step.deps_slugs) AS parent(dep_slug)
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;
END IF;
RETURN v_step;
END;
$$
--SPLIT--
-- Drop "add_step" function
-- Removes the legacy 6-argument overload
-- (flow_slug, step_slug, max_attempts, base_delay, timeout, start_delay),
-- superseded by the deps-aware 8-argument version created earlier in this migration.
DROP FUNCTION "pgflow"."add_step" (text, text, integer, integer, integer, integer)
--SPLIT--
-- Drop "add_step" function
-- Removes the legacy 7-argument overload (the deps_slugs variant without
-- step_type), superseded by the 8-argument version created earlier in this migration.
DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer)
--SPLIT--
-- source: 20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql
-- Modify "cascade_complete_taskless_steps" function
--
-- Repeatedly completes steps of a run that are ready but have zero tasks
-- (e.g. map steps over empty arrays), propagating completion and the
-- empty-array marker (initial_tasks = 0) to dependent steps wave by wave
-- until no such step remains.
--
-- Parameters:
--   run_id - run whose taskless steps should be cascaded to completion
-- Returns: total number of steps completed across all waves.
-- Raises: after v_max_iterations waves, as a guard against a dependency cycle.
CREATE OR REPLACE FUNCTION "pgflow"."cascade_complete_taskless_steps" ("run_id" uuid) RETURNS integer LANGUAGE plpgsql AS $$
DECLARE
-- Running total of steps completed over all iterations.
v_total_completed int := 0;
-- Steps completed in the current wave; loop exits when this hits 0.
v_iteration_completed int;
-- Iteration counter for the safety limit below.
v_iterations int := 0;
v_max_iterations int := 50;
BEGIN
-- ==========================================
-- ITERATIVE CASCADE COMPLETION
-- ==========================================
-- Completes taskless steps in waves until none remain
LOOP
-- ---------- Safety check ----------
v_iterations := v_iterations + 1;
IF v_iterations > v_max_iterations THEN
RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations;
END IF;
-- ==========================================
-- COMPLETE READY TASKLESS STEPS
-- ==========================================
WITH
-- ---------- Find steps to complete in topological order ----------
-- Ready = 'created' status, no remaining deps, and a known zero task count.
steps_to_complete AS (
SELECT ss.run_id, ss.step_slug
FROM pgflow.step_states ss
JOIN pgflow.steps s ON s.flow_slug = ss.flow_slug AND s.step_slug = ss.step_slug
WHERE ss.run_id = cascade_complete_taskless_steps.run_id
AND ss.status = 'created'
AND ss.remaining_deps = 0
AND ss.initial_tasks = 0
-- Process in topological order to ensure proper cascade
ORDER BY s.step_index
),
completed AS (
-- ---------- Complete taskless steps ----------
-- Steps with initial_tasks=0 and no remaining deps
UPDATE pgflow.step_states ss
SET status = 'completed',
started_at = now(),
completed_at = now(),
remaining_tasks = 0
FROM steps_to_complete stc
WHERE ss.run_id = stc.run_id
AND ss.step_slug = stc.step_slug
RETURNING
ss.*,
-- Broadcast step:completed event atomically with the UPDATE
-- Using RETURNING ensures this executes during row processing
-- and cannot be optimized away by the query planner
realtime.send(
jsonb_build_object(
'event_type', 'step:completed',
'run_id', ss.run_id,
'step_slug', ss.step_slug,
'status', 'completed',
'started_at', ss.started_at,
'completed_at', ss.completed_at,
'remaining_tasks', 0,
'remaining_deps', 0,
'output', '[]'::jsonb
),
concat('step:', ss.step_slug, ':completed'),
concat('pgflow:run:', ss.run_id),
false
) as _broadcast_result -- Prefix with _ to indicate internal use only
),
-- ---------- Update dependent steps ----------
-- Propagate completion and empty arrays to dependents
dep_updates AS (
UPDATE pgflow.step_states ss
SET remaining_deps = ss.remaining_deps - dep_count.count,
-- If the dependent is a map step and its dependency completed with 0 tasks,
-- set its initial_tasks to 0 as well
initial_tasks = CASE
WHEN s.step_type = 'map' AND dep_count.has_zero_tasks
THEN 0 -- Empty array propagation
ELSE ss.initial_tasks -- Keep existing value (including NULL)
END
FROM (
-- Aggregate dependency updates per dependent step
-- NOTE(review): every row in `completed` has initial_tasks = 0 (see the
-- steps_to_complete filter), so has_zero_tasks is always true here;
-- BOOL_OR is presumably kept for symmetry/future-proofing — confirm.
SELECT
d.flow_slug,
d.step_slug as dependent_slug,
COUNT(*) as count,
BOOL_OR(c.initial_tasks = 0) as has_zero_tasks
FROM completed c
JOIN pgflow.deps d ON d.flow_slug = c.flow_slug
AND d.dep_slug = c.step_slug
GROUP BY d.flow_slug, d.step_slug
) dep_count,
pgflow.steps s
WHERE ss.run_id = cascade_complete_taskless_steps.run_id
AND ss.flow_slug = dep_count.flow_slug
AND ss.step_slug = dep_count.dependent_slug
AND s.flow_slug = ss.flow_slug
AND s.step_slug = ss.step_slug
),
-- ---------- Update run counters ----------
-- Only decrement remaining_steps; let maybe_complete_run handle finalization
run_updates AS (
UPDATE pgflow.runs r
SET remaining_steps = r.remaining_steps - c.completed_count
FROM (SELECT COUNT(*) AS completed_count FROM completed) c
WHERE r.run_id = cascade_complete_taskless_steps.run_id
AND c.completed_count > 0
)
-- ---------- Check iteration results ----------
SELECT COUNT(*) INTO v_iteration_completed FROM completed;
EXIT WHEN v_iteration_completed = 0; -- No more steps to complete
v_total_completed := v_total_completed + v_iteration_completed;
END LOOP;
RETURN v_total_completed;
END;
$$
--SPLIT--
-- Modify "start_ready_steps" function
--
-- Starts every step of a run whose dependencies are all satisfied:
-- empty map steps (initial_tasks = 0) are completed immediately without
-- spawning tasks, other ready steps are flipped to 'started', their tasks
-- generated (1 for single steps, N for map steps), queue messages
-- batch-sent via pgmq, and the matching step_tasks rows recorded.
-- No-op when the run has already failed.
--
-- Parameters:
--   run_id - run whose ready steps should be started
CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
begin
-- ==========================================
-- GUARD: No mutations on failed runs
-- ==========================================
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = start_ready_steps.run_id AND pgflow.runs.status = 'failed') THEN
RETURN;
END IF;
-- ==========================================
-- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0)
-- ==========================================
-- These complete immediately without spawning tasks
WITH empty_map_steps AS (
SELECT step_state.*
FROM pgflow.step_states AS step_state
JOIN pgflow.steps AS step
ON step.flow_slug = step_state.flow_slug
AND step.step_slug = step_state.step_slug
WHERE step_state.run_id = start_ready_steps.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step.step_type = 'map'
AND step_state.initial_tasks = 0
ORDER BY step_state.step_slug
FOR UPDATE OF step_state
),
-- ---------- Complete empty map steps ----------
-- NOTE(review): completing an empty map here does not decrement its
-- dependents' remaining_deps or the run's remaining_steps — presumably
-- cascade_complete_taskless_steps handles that propagation before this
-- function runs; confirm call ordering in the callers.
completed_empty_steps AS (
UPDATE pgflow.step_states
SET status = 'completed',
started_at = now(),
completed_at = now(),
remaining_tasks = 0
FROM empty_map_steps
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
AND pgflow.step_states.step_slug = empty_map_steps.step_slug
RETURNING
pgflow.step_states.*,
-- Broadcast step:completed event atomically with the UPDATE
-- Using RETURNING ensures this executes during row processing
-- and cannot be optimized away by the query planner
realtime.send(
jsonb_build_object(
'event_type', 'step:completed',
'run_id', pgflow.step_states.run_id,
'step_slug', pgflow.step_states.step_slug,
'status', 'completed',
'started_at', pgflow.step_states.started_at,
'completed_at', pgflow.step_states.completed_at,
'remaining_tasks', 0,
'remaining_deps', 0,
'output', '[]'::jsonb
),
concat('step:', pgflow.step_states.step_slug, ':completed'),
concat('pgflow:run:', pgflow.step_states.run_id),
false
) as _broadcast_completed -- Prefix with _ to indicate internal use only
),
-- ==========================================
-- HANDLE NORMAL STEPS (initial_tasks > 0)
-- ==========================================
-- ---------- Find ready steps ----------
-- Steps with no remaining deps and known task count
ready_steps AS (
SELECT *
FROM pgflow.step_states AS step_state
WHERE step_state.run_id = start_ready_steps.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count
AND step_state.initial_tasks > 0 -- Don't start taskless steps
-- Exclude empty map steps already handled
AND NOT EXISTS (
SELECT 1 FROM empty_map_steps
WHERE empty_map_steps.run_id = step_state.run_id
AND empty_map_steps.step_slug = step_state.step_slug
)
ORDER BY step_state.step_slug
FOR UPDATE
),
-- ---------- Mark steps as started ----------
started_step_states AS (
UPDATE pgflow.step_states
SET status = 'started',
started_at = now(),
remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting
FROM ready_steps
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
AND pgflow.step_states.step_slug = ready_steps.step_slug
RETURNING pgflow.step_states.*,
-- Broadcast step:started event atomically with the UPDATE
-- Using RETURNING ensures this executes during row processing
-- and cannot be optimized away by the query planner
realtime.send(
jsonb_build_object(
'event_type', 'step:started',
'run_id', pgflow.step_states.run_id,
'step_slug', pgflow.step_states.step_slug,
'status', 'started',
'started_at', pgflow.step_states.started_at,
'remaining_tasks', pgflow.step_states.remaining_tasks,
'remaining_deps', pgflow.step_states.remaining_deps
),
concat('step:', pgflow.step_states.step_slug, ':started'),
concat('pgflow:run:', pgflow.step_states.run_id),
false
) as _broadcast_result -- Prefix with _ to indicate internal use only
),
-- ==========================================
-- TASK GENERATION AND QUEUE MESSAGES
-- ==========================================
-- ---------- Generate tasks and batch messages ----------
-- Single steps: 1 task (index 0)
-- Map steps: N tasks (indices 0..N-1)
message_batches AS (
SELECT
started_step.flow_slug,
started_step.run_id,
started_step.step_slug,
COALESCE(step.opt_start_delay, 0) as delay,
array_agg(
jsonb_build_object(
'flow_slug', started_step.flow_slug,
'run_id', started_step.run_id,
'step_slug', started_step.step_slug,
'task_index', task_idx.task_index
) ORDER BY task_idx.task_index
) AS messages,
array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices
FROM started_step_states AS started_step
JOIN pgflow.steps AS step
ON step.flow_slug = started_step.flow_slug
AND step.step_slug = started_step.step_slug
-- Generate task indices from 0 to initial_tasks-1
CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index)
GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay
),
-- ---------- Send messages to queue ----------
-- Uses batch sending for performance with large arrays
-- The WITH ORDINALITY pair + equality on ordinals pairs the Nth task_index
-- with the Nth message id returned by pgmq.send_batch.
sent_messages AS (
SELECT
mb.flow_slug,
mb.run_id,
mb.step_slug,
task_indices.task_index,
msg_ids.msg_id
FROM message_batches mb
CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord)
CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord)
WHERE task_indices.idx_ord = msg_ids.msg_ord
)
-- ==========================================
-- RECORD TASKS IN DATABASE
-- ==========================================
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id)
SELECT
sent_messages.flow_slug,
sent_messages.run_id,
sent_messages.step_slug,
sent_messages.task_index,
sent_messages.msg_id
FROM sent_messages;
-- ==========================================
-- BROADCAST REALTIME EVENTS
-- ==========================================
-- Note: Both step:completed events for empty maps and step:started events
-- are now broadcast atomically in their respective CTEs using RETURNING pattern.
-- This ensures correct ordering, prevents duplicate broadcasts, and guarantees
-- that events are sent for exactly the rows that were updated.
end;
$$
--SPLIT--
-- Modify "complete_task" function
CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
v_step_state pgflow.step_states%ROWTYPE;
v_dependent_map_slug text;
v_run_record pgflow.runs%ROWTYPE;
v_step_record pgflow.step_states%ROWTYPE;
begin
-- ==========================================
-- GUARD: No mutations on failed runs
-- ==========================================
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
RETURN;
END IF;
-- ==========================================
-- LOCK ACQUISITION AND TYPE VALIDATION
-- ==========================================
-- Acquire locks first to prevent race conditions
SELECT * INTO v_run_record FROM pgflow.runs
WHERE pgflow.runs.run_id = complete_task.run_id
FOR UPDATE;
SELECT * INTO v_step_record FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
FOR UPDATE;
-- Check for type violations AFTER acquiring locks
SELECT child_step.step_slug INTO v_dependent_map_slug
FROM pgflow.deps dependency
JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug
AND child_step.step_slug = dependency.step_slug
JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug
AND parent_step.step_slug = dependency.dep_slug
JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug
AND child_state.step_slug = child_step.step_slug
WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step
AND dependency.flow_slug = v_run_record.flow_slug
AND parent_step.step_type = 'single' -- Only validate single steps
AND child_step.step_type = 'map'
AND child_state.run_id = complete_task.run_id
AND child_state.initial_tasks IS NULL
AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array')
LIMIT 1;
-- Handle type violation if detected
IF v_dependent_map_slug IS NOT NULL THEN
-- Mark run as failed immediately
UPDATE pgflow.runs
SET status = 'failed',
failed_at = now()
WHERE pgflow.runs.run_id = complete_task.run_id;
-- Broadcast run:failed event
-- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function)
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', complete_task.run_id,
'flow_slug', v_run_record.flow_slug,
'status', 'failed',
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', complete_task.run_id),
false
);
-- Archive all active messages (both queued and started) to prevent orphaned messages
PERFORM pgmq.archive(
v_run_record.flow_slug,
array_agg(st.message_id)
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
HAVING count(*) > 0; -- Only call archive if there are messages to archive
-- Mark current task as failed and store the output
UPDATE pgflow.step_tasks
SET status = 'failed',
failed_at = now(),
output = complete_task.output, -- Store the output that caused the violation
error_message = '[TYPE_VIOLATION] Produced ' ||
CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END ||
' instead of array'
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
-- Mark step state as failed
UPDATE pgflow.step_states
SET status = 'failed',
failed_at = now(),
error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug ||
' expects array input but dependency ' || complete_task.step_slug ||
' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug;
-- Broadcast step:failed event
-- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function)
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', complete_task.run_id,
'step_slug', complete_task.step_slug,
'status', 'failed',
'error_message', '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug ||
' expects array input but dependency ' || complete_task.step_slug ||
' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END,
'failed_at', now()
),
concat('step:', complete_task.step_slug, ':failed'),
concat('pgflow:run:', complete_task.run_id),
false
);
-- Archive the current task's message (it was started, now failed)
PERFORM pgmq.archive(
v_run_record.flow_slug,
st.message_id -- Single message, use scalar form
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.message_id IS NOT NULL;
-- Return empty result
RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;
RETURN;
END IF;
-- ==========================================
-- MAIN CTE CHAIN: Update task and propagate changes
-- ==========================================
WITH
-- ---------- Task completion ----------
-- Update the task record with completion status and output
task AS (
UPDATE pgflow.step_tasks
SET
status = 'completed',
completed_at = now(),
output = complete_task.output
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index
AND pgflow.step_tasks.status = 'started'
RETURNING *
),
-- ---------- Step state update ----------
-- Decrement remaining_tasks and potentially mark step as completed
step_state AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement
ELSE 'started'
END,
completed_at = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement
ELSE NULL
END,
remaining_tasks = pgflow.step_states.remaining_tasks - 1
FROM task
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
RETURNING pgflow.step_states.*
),
-- ---------- Dependency resolution ----------
-- Find all child steps that depend on the completed parent step (only if parent completed)
child_steps AS (
SELECT deps.step_slug AS child_step_slug
FROM pgflow.deps deps
JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug
WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child
ORDER BY deps.step_slug -- Ensure consistent ordering
),
-- ---------- Lock child steps ----------
-- Acquire locks on all child steps before updating them
child_steps_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps)
FOR UPDATE
),
-- ---------- Update child steps ----------
-- Decrement remaining_deps and resolve NULL initial_tasks for map steps
child_steps_update AS (
UPDATE pgflow.step_states child_state
SET remaining_deps = child_state.remaining_deps - 1,
-- Resolve NULL initial_tasks for child map steps
-- This is where child maps learn their array size from the parent
-- This CTE only runs when the parent step is complete (see child_steps JOIN)
initial_tasks = CASE
WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN
CASE
WHEN parent_step.step_type = 'map' THEN
-- Map->map: Count all completed tasks from parent map
-- We add 1 because the current task is being completed in this transaction
-- but isn't yet visible as 'completed' in the step_tasks table
-- TODO: Refactor to use future column step_states.total_tasks
-- Would eliminate the COUNT query and just use parent_state.total_tasks
(SELECT COUNT(*)::int + 1
FROM pgflow.step_tasks parent_tasks
WHERE parent_tasks.run_id = complete_task.run_id
AND parent_tasks.step_slug = complete_task.step_slug
AND parent_tasks.status = 'completed'
AND parent_tasks.task_index != complete_task.task_index)
ELSE
-- Single->map: Use output array length (single steps complete immediately)
CASE
WHEN complete_task.output IS NOT NULL
AND jsonb_typeof(complete_task.output) = 'array' THEN
jsonb_array_length(complete_task.output)
ELSE NULL -- Keep NULL if not an array
END
END
ELSE child_state.initial_tasks -- Keep existing value (including NULL)
END
FROM child_steps children
JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id)
AND child_step.step_slug = children.child_step_slug
JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id)
AND parent_step.step_slug = complete_task.step_slug
WHERE child_state.run_id = complete_task.run_id
AND child_state.step_slug = children.child_step_slug
)
-- ---------- Update run remaining_steps ----------
-- Decrement the run's remaining_steps counter if step completed
UPDATE pgflow.runs
SET remaining_steps = pgflow.runs.remaining_steps - 1
FROM step_state
WHERE pgflow.runs.run_id = complete_task.run_id
AND step_state.status = 'completed';
-- ==========================================
-- POST-COMPLETION ACTIONS
-- ==========================================
-- ---------- Get updated state for broadcasting ----------
SELECT * INTO v_step_state FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug;
-- ---------- Handle step completion ----------
IF v_step_state.status = 'completed' THEN
-- Broadcast step:completed event FIRST (before cascade)
-- This ensures parent broadcasts before its dependent children
-- For map steps, aggregate all task outputs; for single steps, use the task output
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:completed',
'run_id', complete_task.run_id,
'step_slug', complete_task.step_slug,
'status', 'completed',
'output', CASE
WHEN (SELECT s.step_type FROM pgflow.steps s
WHERE s.flow_slug = v_step_state.flow_slug
AND s.step_slug = complete_task.step_slug) = 'map' THEN
-- Aggregate all task outputs for map steps
(SELECT COALESCE(jsonb_agg(st.output ORDER BY st.task_index), '[]'::jsonb)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.status = 'completed')
ELSE
-- Single step: use the individual task output
complete_task.output
END,
'completed_at', v_step_state.completed_at
),
concat('step:', complete_task.step_slug, ':completed'),
concat('pgflow:run:', complete_task.run_id),
false
);
-- THEN cascade complete any taskless steps that are now ready
-- This ensures dependent children broadcast AFTER their parent
PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id);
END IF;
-- ---------- Archive completed task message ----------
-- Move message from active queue to archive table
PERFORM (
WITH completed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.status = 'completed'
)
SELECT pgmq.archive(ct.flow_slug, ct.message_id)
FROM completed_tasks ct
WHERE EXISTS (SELECT 1 FROM completed_tasks)
);
-- ---------- Trigger next steps ----------
-- Start any steps that are now ready (deps satisfied)
PERFORM pgflow.start_ready_steps(complete_task.run_id);
-- Check if the entire run is complete
PERFORM pgflow.maybe_complete_run(complete_task.run_id);
-- ---------- Return completed task ----------
RETURN QUERY SELECT *
FROM pgflow.step_tasks AS step_task
WHERE step_task.run_id = complete_task.run_id
AND step_task.step_slug = complete_task.step_slug
AND step_task.task_index = complete_task.task_index;
end;
$$
--SPLIT--
-- source: 20251104080523_pgflow_upgrade_pgmq_1_5_1.sql
-- Migration tested 2025-11-02:
-- Successfully verified that this migration fails on pgmq 1.4.4 (Supabase CLI < 2.50.3)
-- with clear error message guiding users to upgrade pgmq to 1.5.0+
--
-- Compatibility check: Ensure pgmq.message_record has headers column (pgmq 1.5.0+)
DO $$
BEGIN
  -- Compatibility gate: pgflow 0.8.0+ requires pgmq 1.5.0+, which is
  -- detectable by the presence of the "headers" attribute on the
  -- pgmq.message_record composite type. Abort the migration early with an
  -- actionable message instead of failing later at runtime.
  IF NOT EXISTS (
    SELECT 1
    FROM pg_type t
    JOIN pg_namespace n ON t.typnamespace = n.oid
    JOIN pg_attribute a ON a.attrelid = t.typrelid
    WHERE n.nspname = 'pgmq'
      AND t.typname = 'message_record'
      AND a.attname = 'headers'
      AND a.attnum > 0          -- exclude system attributes
      AND NOT a.attisdropped    -- exclude dropped attributes
  ) THEN
    RAISE EXCEPTION E'INCOMPATIBLE PGMQ VERSION DETECTED\n\n'
      'This migration is part of pgflow 0.8.0+, which requires pgmq 1.5.0 or higher.\n'
      'The pgmq.message_record type is missing the "headers" column, which indicates you are running pgmq < 1.5.0.\n\n'
      'pgflow 0.8.0+ is NOT compatible with pgmq versions below 1.5.0.\n\n'
      'Action required:\n'
      ' - If using Supabase: Ensure you are running a recent version that includes pgmq 1.5.0+\n'
      ' - If self-hosting: Upgrade pgmq to version 1.5.0 or higher before running this migration\n\n'
      'Migration aborted to prevent runtime failures.';
  END IF;
END $$
--SPLIT--
-- Modify "set_vt_batch" function
-- Must drop first because we're changing the return type from SETOF to TABLE
-- (CREATE OR REPLACE cannot change a function's return type; IF EXISTS keeps
-- this safe on databases where the old signature was never created)
DROP FUNCTION IF EXISTS "pgflow"."set_vt_batch"(text, bigint[], integer[])
--SPLIT--
CREATE FUNCTION "pgflow"."set_vt_batch" (
"queue_name" text,
"msg_ids" bigint[],
"vt_offsets" integer[]
)
RETURNS TABLE(
msg_id bigint,
read_ct integer,
enqueued_at timestamp with time zone,
vt timestamp with time zone,
message jsonb,
headers jsonb
)
LANGUAGE plpgsql AS $$
-- Batch visibility-timeout update: message msg_ids[i] becomes visible again
-- vt_offsets[i] seconds from now. Returns the updated queue rows.
-- Raises when the two arrays have different lengths; returns an empty set
-- for NULL or empty input.
DECLARE
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
sql TEXT;
BEGIN
/* ---------- safety checks ---------------------------------------------------- */
IF msg_ids IS NULL OR vt_offsets IS NULL THEN
RETURN; -- nothing to do, return empty set
END IF;
-- Length mismatch is checked before the empty-batch shortcut so that an
-- empty msg_ids paired with a non-empty vt_offsets is still an error.
-- IS DISTINCT FROM keeps the comparison NULL-safe.
IF cardinality(msg_ids) IS DISTINCT FROM cardinality(vt_offsets) THEN
RAISE EXCEPTION
'msg_ids length (%) must equal vt_offsets length (%)',
cardinality(msg_ids), cardinality(vt_offsets);
END IF;
-- FIX: the previous guard tested array_length(msg_ids, 1) = 0, which can
-- never be true because array_length() returns NULL (not 0) for empty
-- arrays; cardinality() returns 0, so the empty batch now short-circuits
-- instead of building and executing a no-op dynamic UPDATE.
IF cardinality(msg_ids) = 0 THEN
RETURN; -- empty batch, return empty set
END IF;
/* ---------- dynamic statement ------------------------------------------------ */
/* One UPDATE joins with the unnested arrays */
sql := format(
$FMT$
WITH input (msg_id, vt_offset) AS (
SELECT unnest($1)::bigint
, unnest($2)::int
)
UPDATE pgmq.%I q
SET vt = clock_timestamp() + make_interval(secs => input.vt_offset),
read_ct = read_ct -- no change, but keeps RETURNING list aligned
FROM input
WHERE q.msg_id = input.msg_id
RETURNING q.msg_id,
q.read_ct,
q.enqueued_at,
q.vt,
q.message,
q.headers
$FMT$,
qtable
);
RETURN QUERY EXECUTE sql USING msg_ids, vt_offsets;
END;
$$
--SPLIT--
-- Drop "read_with_poll" function
-- No argument list given: this form is valid only while exactly one overload
-- of read_with_poll exists in the pgflow schema, which holds at this point
-- in the migration chain.
DROP FUNCTION "pgflow"."read_with_poll"
--SPLIT--
-- source: 20251130000000_pgflow_auto_compilation.sql
-- Modify "create_flow" function
-- Idempotently upserts a flow row (NULL options fall back to hard-coded
-- defaults) and lazily creates its backing pgmq queue (queue name == flow
-- slug). Always returns the pgflow.flows row, whether inserted or existing.
CREATE OR REPLACE FUNCTION "pgflow"."create_flow" ("flow_slug" text, "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer) RETURNS "pgflow"."flows" LANGUAGE sql SET "search_path" = '' AS $$
WITH
defaults AS (
-- Fallback option values applied when the caller passes NULL
SELECT 3 AS def_max_attempts, 5 AS def_base_delay, 60 AS def_timeout
),
flow_upsert AS (
INSERT INTO pgflow.flows (flow_slug, opt_max_attempts, opt_base_delay, opt_timeout)
SELECT
flow_slug,
COALESCE(max_attempts, defaults.def_max_attempts),
COALESCE(base_delay, defaults.def_base_delay),
COALESCE(timeout, defaults.def_timeout)
FROM defaults
ON CONFLICT (flow_slug) DO UPDATE
SET flow_slug = pgflow.flows.flow_slug -- Dummy update
RETURNING *
),
ensure_queue AS (
-- Side-effecting SELECT CTE: creates the pgmq queue only when missing.
-- It must stay referenced by the final query (via the LEFT JOIN below) so
-- the planner actually evaluates it.
SELECT pgmq.create(flow_slug)
WHERE NOT EXISTS (
SELECT 1 FROM pgmq.list_queues() WHERE queue_name = flow_slug
)
)
SELECT f.*
FROM flow_upsert f
LEFT JOIN (SELECT 1 FROM ensure_queue) _dummy ON true; -- Left join ensures flow is returned
$$
--SPLIT--
-- Create "_compare_flow_shapes" function
CREATE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$
-- Compares two flow-shape documents ({"steps": [...]}) positionally and
-- returns a human-readable list of differences. An empty array means the
-- shapes match. Step order is significant (index defines topology).
DECLARE
v_differences text[] := '{}';
v_local_steps jsonb;
v_db_steps jsonb;
v_local_count int;
v_db_count int;
v_max_count int;
v_idx int;
v_local_step jsonb;
v_db_step jsonb;
v_local_deps text;
v_db_deps text;
BEGIN
v_local_steps := p_local->'steps';
v_db_steps := p_db->'steps';
-- COALESCE guards against a missing "steps" key (->'steps' yields NULL)
v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb));
v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb));
-- Compare step counts
IF v_local_count != v_db_count THEN
v_differences := array_append(
v_differences,
format('Step count differs: %s vs %s', v_local_count, v_db_count)
);
END IF;
-- Compare steps by index (loop body is skipped when both shapes are empty)
v_max_count := GREATEST(v_local_count, v_db_count);
FOR v_idx IN 0..(v_max_count - 1) LOOP
v_local_step := v_local_steps->v_idx;
v_db_step := v_db_steps->v_idx;
IF v_local_step IS NULL THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: missing in first shape (second has '%s')$$,
v_idx,
v_db_step->>'slug'
)
);
ELSIF v_db_step IS NULL THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: missing in second shape (first has '%s')$$,
v_idx,
v_local_step->>'slug'
)
);
ELSE
-- Compare slug
-- FIX: use IS DISTINCT FROM instead of != so that a missing key (NULL
-- from ->>) on either side is reported as a difference; with != the
-- comparison evaluated to NULL and the mismatch was silently skipped.
IF v_local_step->>'slug' IS DISTINCT FROM v_db_step->>'slug' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: slug differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'slug',
v_db_step->>'slug'
)
);
END IF;
-- Compare step type (NULL-safe for the same reason as slug above)
IF v_local_step->>'stepType' IS DISTINCT FROM v_db_step->>'stepType' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: type differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'stepType',
v_db_step->>'stepType'
)
);
END IF;
-- Compare dependencies (convert arrays to sorted comma-separated strings
-- so ordering inside the shape documents does not matter)
SELECT string_agg(dep, ', ' ORDER BY dep)
INTO v_local_deps
FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep;
SELECT string_agg(dep, ', ' ORDER BY dep)
INTO v_db_deps
FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep;
-- string_agg over zero rows yields NULL; COALESCE maps that to ''
IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: dependencies differ [%s] vs [%s]$$,
v_idx,
COALESCE(v_local_deps, ''),
COALESCE(v_db_deps, '')
)
);
END IF;
END IF;
END LOOP;
RETURN v_differences;
END;
$BODY$
--SPLIT--
-- Create "_create_flow_from_shape" function
CREATE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
-- Compiles a flow from its serialized shape document: creates the flow row,
-- then registers every step in array order (order defines step_index).
-- NULL options at flow or step level mean "use the default / inherit".
DECLARE
step_def jsonb;
step_opts jsonb;
dep_slugs text[];
BEGIN
-- Flow-level options live under "options" (absent key yields NULL ints)
PERFORM pgflow.create_flow(
p_flow_slug,
(p_shape->'options'->>'maxAttempts')::int,
(p_shape->'options'->>'baseDelay')::int,
(p_shape->'options'->>'timeout')::int
);
-- Walk the steps array in document order
FOR step_def IN SELECT * FROM jsonb_array_elements(p_shape->'steps') LOOP
step_opts := step_def->'options';
-- add_step expects text[]; dependencies arrive as a jsonb array
SELECT COALESCE(array_agg(dep), '{}')
INTO dep_slugs
FROM jsonb_array_elements_text(COALESCE(step_def->'dependencies', '[]'::jsonb)) AS dep;
PERFORM pgflow.add_step(
flow_slug => p_flow_slug,
step_slug => step_def->>'slug',
deps_slugs => dep_slugs,
max_attempts => (step_opts->>'maxAttempts')::int,
base_delay => (step_opts->>'baseDelay')::int,
timeout => (step_opts->>'timeout')::int,
start_delay => (step_opts->>'startDelay')::int,
step_type => step_def->>'stepType'
);
END LOOP;
END;
$$
--SPLIT--
-- Create "_get_flow_shape" function
CREATE FUNCTION "pgflow"."_get_flow_shape" ("p_flow_slug" text) RETURNS jsonb LANGUAGE sql STABLE SET "search_path" = '' AS $$
-- Serializes a flow's compiled structure as {"steps": [...]}, ordered by
-- step_index, with each step's dependency slugs sorted alphabetically.
-- Yields {"steps": []} when the flow has no steps (or does not exist).
SELECT jsonb_build_object(
'steps',
COALESCE(jsonb_agg(shaped.step_json ORDER BY shaped.step_index), '[]'::jsonb)
)
FROM (
SELECT
step.step_index,
jsonb_build_object(
'slug', step.step_slug,
'stepType', step.step_type,
'dependencies', COALESCE(
(
SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug)
FROM pgflow.deps AS dep
WHERE dep.flow_slug = step.flow_slug
AND dep.step_slug = step.step_slug
),
'[]'::jsonb
)
) AS step_json
FROM pgflow.steps AS step
WHERE step.flow_slug = p_flow_slug
) AS shaped;
$$
--SPLIT--
-- Create "delete_flow_and_data" function
-- Destructive: removes the flow definition AND all of its runtime data.
-- Used by ensure_flow_compiled() when recompiling on local/dev stacks.
CREATE FUNCTION "pgflow"."delete_flow_and_data" ("p_flow_slug" text) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
BEGIN
-- Drop queue and archive table (pgmq)
-- NOTE(review): presumably pgmq.drop_queue raises if the queue is absent;
-- create_flow always provisions the queue, so that should not occur — confirm.
PERFORM pgmq.drop_queue(p_flow_slug);
-- Delete all associated data in the correct order (respecting FK constraints)
-- Children first: tasks -> states -> runs, then deps -> steps -> flows
DELETE FROM pgflow.step_tasks AS task WHERE task.flow_slug = p_flow_slug;
DELETE FROM pgflow.step_states AS state WHERE state.flow_slug = p_flow_slug;
DELETE FROM pgflow.runs AS run WHERE run.flow_slug = p_flow_slug;
DELETE FROM pgflow.deps AS dep WHERE dep.flow_slug = p_flow_slug;
DELETE FROM pgflow.steps AS step WHERE step.flow_slug = p_flow_slug;
DELETE FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug;
END;
$$
--SPLIT--
-- Create "ensure_flow_compiled" function
CREATE FUNCTION "pgflow"."ensure_flow_compiled" ("p_flow_slug" text, "p_shape" jsonb, "p_mode" text DEFAULT 'production') RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$
-- Ensures the DB-compiled flow matches the supplied shape.
-- Returns {"status": ..., "differences": [...]} where status is one of
-- 'compiled' (flow was missing), 'verified' (shapes match),
-- 'recompiled' (dev mode rebuilt it) or 'mismatch' (production refusal).
DECLARE
db_shape jsonb;
shape_diffs text[];
BEGIN
-- One compiler at a time per flow: transaction-scoped advisory lock keyed
-- on (1, deterministic hash of the slug), released at commit/rollback.
PERFORM pg_advisory_xact_lock(1, hashtext(p_flow_slug));
-- Unknown flow: first compilation is allowed in every mode.
IF NOT EXISTS (SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug) THEN
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
END IF;
-- Flow exists: diff the requested shape against what the database holds.
db_shape := pgflow._get_flow_shape(p_flow_slug);
shape_diffs := pgflow._compare_flow_shapes(p_shape, db_shape);
IF array_length(shape_diffs, 1) IS NULL THEN
-- array_length is NULL for an empty array => no differences found
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
END IF;
-- Shapes diverge: development rebuilds destructively, production refuses.
IF p_mode = 'development' THEN
PERFORM pgflow.delete_flow_and_data(p_flow_slug);
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(shape_diffs));
END IF;
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(shape_diffs));
END;
$$
--SPLIT--
-- Modify "workers" table
-- stopped_at is written by mark_worker_stopped() on graceful shutdown;
-- NULL means the worker never announced a stop.
ALTER TABLE "pgflow"."workers" ADD COLUMN "stopped_at" timestamptz NULL
--SPLIT--
-- Create "worker_functions" table
-- One row per worker edge function; column semantics are also recorded via
-- the COMMENT ON statements that follow.
CREATE TABLE "pgflow"."worker_functions" (
"function_name" text NOT NULL,
"enabled" boolean NOT NULL DEFAULT true,
-- minimum spacing between invocation attempts; CHECK enforces >= 1 second
"debounce" interval NOT NULL DEFAULT '00:00:06'::interval,
"last_invoked_at" timestamptz NULL,
"created_at" timestamptz NOT NULL DEFAULT now(),
"updated_at" timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY ("function_name"),
CONSTRAINT "worker_functions_debounce_check" CHECK (debounce >= '00:00:01'::interval)
)
--SPLIT--
-- Set comment to table: "worker_functions"
COMMENT ON TABLE "pgflow"."worker_functions" IS 'Registry of edge functions that run pgflow workers, used by ensure_workers() cron'
--SPLIT--
-- Set comment to column: "function_name" on table: "worker_functions"
COMMENT ON COLUMN "pgflow"."worker_functions"."function_name" IS 'Name of the Supabase Edge Function'
--SPLIT--
-- Set comment to column: "enabled" on table: "worker_functions"
COMMENT ON COLUMN "pgflow"."worker_functions"."enabled" IS 'Whether ensure_workers() should ping this function'
--SPLIT--
-- Set comment to column: "debounce" on table: "worker_functions"
COMMENT ON COLUMN "pgflow"."worker_functions"."debounce" IS 'Minimum interval between invocation attempts for this function'
--SPLIT--
-- Set comment to column: "last_invoked_at" on table: "worker_functions"
COMMENT ON COLUMN "pgflow"."worker_functions"."last_invoked_at" IS 'When ensure_workers() last pinged this function (used for debouncing)'
--SPLIT--
-- Create "is_local" function
CREATE FUNCTION "pgflow"."is_local" () RETURNS boolean LANGUAGE sql STABLE PARALLEL SAFE SET "search_path" = '' AS $$
-- True only on a local Supabase stack, detected by the well-known local
-- development JWT secret. current_setting(..., true) yields NULL when the
-- GUC is absent; IS NOT DISTINCT FROM maps that NULL to false, which is
-- exactly what coalesce(<equality>, false) produced in the original form.
select current_setting('app.settings.jwt_secret', true)
is not distinct from 'super-secret-jwt-token-with-at-least-32-characters-long'
$$
--SPLIT--
-- Create "ensure_flow_compiled" function
CREATE FUNCTION "pgflow"."ensure_flow_compiled" ("flow_slug" text, "shape" jsonb) RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$
-- Ensures the DB-compiled flow matches the supplied shape, auto-detecting
-- the environment via pgflow.is_local(). Returns {"status", "differences"}:
-- 'compiled' | 'verified' | 'recompiled' (local only) | 'mismatch'.
-- Parameter references are function-qualified throughout because
-- "flow_slug" also names a column on pgflow.flows.
DECLARE
db_shape jsonb;
shape_diffs text[];
BEGIN
-- Serialize concurrent compilation attempts for the same flow with a
-- transaction-scoped advisory lock keyed on (1, hash of slug).
PERFORM pg_advisory_xact_lock(1, hashtext(ensure_flow_compiled.flow_slug));
-- Unknown flow: first compilation, allowed in every environment.
IF NOT EXISTS (
SELECT 1 FROM pgflow.flows AS flow
WHERE flow.flow_slug = ensure_flow_compiled.flow_slug
) THEN
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
END IF;
-- Diff the requested shape against what is already compiled.
db_shape := pgflow._get_flow_shape(ensure_flow_compiled.flow_slug);
shape_diffs := pgflow._compare_flow_shapes(ensure_flow_compiled.shape, db_shape);
IF array_length(shape_diffs, 1) IS NULL THEN
-- Empty diff array (array_length yields NULL): shapes match.
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
END IF;
-- Shapes diverge: local stacks rebuild destructively, production reports.
IF pgflow.is_local() THEN
PERFORM pgflow.delete_flow_and_data(ensure_flow_compiled.flow_slug);
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(shape_diffs));
END IF;
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(shape_diffs));
END;
$$
--SPLIT--
-- Create "mark_worker_stopped" function
CREATE FUNCTION "pgflow"."mark_worker_stopped" ("worker_id" uuid) RETURNS void LANGUAGE sql AS $$
-- Records a graceful-shutdown timestamp for one worker. The parameter is
-- function-qualified to disambiguate it from the workers.worker_id column.
update pgflow.workers as w
set stopped_at = clock_timestamp()
where w.worker_id = mark_worker_stopped.worker_id;
$$
--SPLIT--
-- Set comment to function: "mark_worker_stopped" (catalog metadata only)
COMMENT ON FUNCTION "pgflow"."mark_worker_stopped" IS 'Marks a worker as stopped for graceful shutdown. Called by workers on beforeunload.'
--SPLIT--
-- Create "track_worker_function" function
CREATE FUNCTION "pgflow"."track_worker_function" ("function_name" text) RETURNS void LANGUAGE sql AS $$
-- Upsert registration: the first call inserts the row (created_at defaults
-- to now()); subsequent calls only bump updated_at as a liveness signal.
insert into pgflow.worker_functions as wf (function_name, updated_at)
values (track_worker_function.function_name, clock_timestamp())
on conflict (function_name) do update
set updated_at = clock_timestamp();
$$
--SPLIT--
-- Set comment to function: "track_worker_function" (catalog metadata only)
COMMENT ON FUNCTION "pgflow"."track_worker_function" IS 'Registers an edge function for monitoring. Called by workers on startup.'
--SPLIT--
-- Drop "ensure_flow_compiled" function
-- Removes the 3-parameter (text, jsonb, text) p_mode variant, superseded by
-- the 2-parameter variant that auto-detects the environment via is_local().
DROP FUNCTION "pgflow"."ensure_flow_compiled" (text, jsonb, text)
--SPLIT--
-- source: 20251212100113_pgflow_allow_data_loss_parameter.sql
-- Drop old 2-parameter version before creating new 3-parameter version
-- (IF EXISTS keeps this safe if the 2-argument variant is absent)
DROP FUNCTION IF EXISTS "pgflow"."ensure_flow_compiled" (text, jsonb)
--SPLIT--
-- Create "ensure_flow_compiled" function with allow_data_loss parameter
CREATE FUNCTION "pgflow"."ensure_flow_compiled" ("flow_slug" text, "shape" jsonb, "allow_data_loss" boolean DEFAULT false) RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$
-- Ensures the DB-compiled flow matches the supplied shape.
-- Returns {"status", "differences"}: 'compiled' (flow was missing),
-- 'verified' (shapes match), 'recompiled' (local stack or explicit
-- allow_data_loss => destructive rebuild), 'mismatch' (refused).
-- "flow_slug" references are function-qualified because the name also
-- belongs to a column on pgflow.flows.
DECLARE
db_shape jsonb;
shape_diffs text[];
BEGIN
-- One compiler per flow at a time: transaction-scoped advisory lock keyed
-- on (1, deterministic hash of the slug).
PERFORM pg_advisory_xact_lock(1, hashtext(ensure_flow_compiled.flow_slug));
-- Unknown flow: first compilation is permitted everywhere.
IF NOT EXISTS (
SELECT 1 FROM pgflow.flows AS flow
WHERE flow.flow_slug = ensure_flow_compiled.flow_slug
) THEN
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
END IF;
-- Diff the requested shape against the database's compiled shape.
db_shape := pgflow._get_flow_shape(ensure_flow_compiled.flow_slug);
shape_diffs := pgflow._compare_flow_shapes(ensure_flow_compiled.shape, db_shape);
IF array_length(shape_diffs, 1) IS NULL THEN
-- Empty diff array (array_length yields NULL): nothing to do.
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
END IF;
-- Shapes diverge. Destructive recompilation only on local stacks or when
-- the caller explicitly opted into data loss; otherwise just report.
IF pgflow.is_local() OR allow_data_loss THEN
PERFORM pgflow.delete_flow_and_data(ensure_flow_compiled.flow_slug);
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(shape_diffs));
END IF;
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(shape_diffs));
END;
$$
--SPLIT--
-- source: 20251225163110_pgflow_add_flow_input_column.sql
-- Modify "step_task_record" composite type
-- New attribute carries the run's original input to workers; start_tasks
-- populates it only for root non-map steps (NULL otherwise).
ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "flow_input" jsonb
--SPLIT--
-- Modify "start_tasks" function
-- Claims queued tasks for the given queue messages and returns one
-- step_task_record per claimed task, with the handler input constructed
-- according to step type. Side effects happen inside the CTE pipeline:
-- step_tasks rows are marked 'started' and visibility timeouts are pushed
-- past the step timeout.
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
-- Claimable tasks: queued, matching the supplied message ids, on runs that
-- have not failed.
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
join pgflow.runs r on r.run_id = task.run_id
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
-- MVP: Don't start tasks on failed runs
and r.status != 'failed'
),
-- Data-modifying CTE: guaranteed to execute exactly once even though the
-- final query never reads from it. Marks claimed tasks as started and
-- records the claiming worker.
start_tasks_update as (
update pgflow.step_tasks
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
last_worker_id = worker_id
from tasks
where step_tasks.message_id = tasks.message_id
and step_tasks.flow_slug = tasks.flow_slug
and step_tasks.status = 'queued'
),
-- Run inputs for the claimed tasks (needed for root-step input construction)
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
-- One row per (task, dependency): the dependency's effective output
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
-- Aggregate map outputs or use single output
CASE
WHEN dep_step.step_type = 'map' THEN
-- Aggregate all task outputs ordered by task_index
-- Use COALESCE to return empty array if no tasks
(SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb)
FROM pgflow.step_tasks dt
WHERE dt.run_id = st.run_id
AND dt.step_slug = dep.dep_slug
AND dt.status = 'completed')
ELSE
-- Single step: use the single task output
dep_task.output
END as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug
left join pgflow.step_tasks dep_task on
dep_task.run_id = st.run_id and
dep_task.step_slug = dep.dep_slug and
dep_task.status = 'completed'
and dep_step.step_type = 'single' -- Only join for single steps
),
-- Collapse dependency outputs into one object per task: {dep_slug: output}
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
count(*) as dep_count
from deps d
group by d.run_id, d.step_slug
),
-- Per-message visibility delay: handler timeout plus 2 seconds of slack
timeouts as (
select
task.message_id,
task.flow_slug,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all messages
-- NOTE(review): this is a plain SELECT CTE and is never referenced by the
-- final query; PostgreSQL may skip unreferenced SELECT-only CTEs (only
-- data-modifying CTEs are guaranteed to execute). Confirm set_vt_batch
-- actually runs here — if not, reference this CTE from the final SELECT.
set_vt_batch as (
select pgflow.set_vt_batch(
start_tasks.flow_slug,
array_agg(t.message_id order by t.message_id),
array_agg(t.vt_delay order by t.message_id)
)
from timeouts t
)
select
st.flow_slug,
st.run_id,
st.step_slug,
-- ==========================================
-- INPUT CONSTRUCTION LOGIC
-- ==========================================
-- This nested CASE statement determines how to construct the input
-- for each task based on the step type (map vs non-map).
--
-- The fundamental difference:
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
-- - Non-map steps: Receive structured objects with named keys
-- (e.g., {"run": {...}, "dependency1": {...}})
-- ==========================================
CASE
-- -------------------- MAP STEPS --------------------
-- Map steps process arrays element-by-element.
-- Each task receives ONE element from the array at its task_index position.
WHEN step.step_type = 'map' THEN
-- Map steps get raw array elements without any wrapper object
CASE
-- ROOT MAP: Gets array from run input
-- Example: run input = [1, 2, 3]
-- task 0 gets: 1
-- task 1 gets: 2
-- task 2 gets: 3
WHEN step.deps_count = 0 THEN
-- Root map (deps_count = 0): no dependencies, reads from run input.
-- Extract the element at task_index from the run's input array.
-- Note: If run input is not an array, this will return NULL
-- and the flow will fail (validated in start_flow).
jsonb_array_element(r.input, st.task_index)
-- DEPENDENT MAP: Gets array from its single dependency
-- Example: dependency output = ["a", "b", "c"]
-- task 0 gets: "a"
-- task 1 gets: "b"
-- task 2 gets: "c"
ELSE
-- Has dependencies (should be exactly 1 for map steps).
-- Extract the element at task_index from the dependency's output array.
--
-- Why the subquery with jsonb_each?
-- - The dependency outputs a raw array: [1, 2, 3]
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
-- - We need to unwrap and get just the array value
-- - Map steps have exactly 1 dependency (enforced by add_step)
-- - So jsonb_each will return exactly 1 row
-- - We extract the 'value' which is the raw array [1, 2, 3]
-- - Then get the element at task_index from that array
(SELECT jsonb_array_element(value, st.task_index)
FROM jsonb_each(dep_out.deps_output)
LIMIT 1)
END
-- -------------------- NON-MAP STEPS --------------------
-- Regular (non-map) steps receive dependency outputs as a structured object.
-- Root steps (no dependencies) get empty object - they access flowInput via context.
-- Dependent steps get only their dependency outputs.
ELSE
-- Non-map steps get structured input with dependency keys only
-- Example for dependent step: {
-- "step1": {"output": "from_step1"},
-- "step2": {"output": "from_step2"}
-- }
-- Example for root step: {}
--
-- Note: flow_input is available separately in the returned record
-- for workers to access via context.flowInput
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id,
st.task_index as task_index,
-- flow_input: Original run input for worker context
-- Only included for root non-map steps to avoid data duplication.
-- Root map steps: flowInput IS the array, useless to include
-- Dependent steps: lazy load via ctx.flowInput when needed
CASE
WHEN step.step_type != 'map' AND step.deps_count = 0
THEN r.input
ELSE NULL
END as flow_input
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
step.flow_slug = st.flow_slug and
step.step_slug = st.step_slug
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
$$
--SPLIT--
-- source: 20260103145141_pgflow_step_output_storage.sql
-- Modify "step_states" table
ALTER TABLE "pgflow"."step_states" ADD CONSTRAINT "output_only_for_completed_or_null" CHECK ((output IS NULL) OR (status = 'completed'::text)), ADD COLUMN "output" jsonb NULL
--SPLIT--
-- ==========================================
-- DATA BACKFILL: store outputs for already-completed single steps
-- ==========================================
-- Safe to run online: only rows that are already 'completed' are touched,
-- so active workflows are unaffected.
-- Single steps have exactly one task; copy its output verbatim.
UPDATE pgflow.step_states state
SET output = (
  SELECT task.output
  FROM pgflow.step_tasks task
  WHERE task.step_slug = state.step_slug
    AND task.run_id = state.run_id
    AND task.status = 'completed'
  LIMIT 1
)
FROM pgflow.steps step_def
WHERE state.status = 'completed'
  AND step_def.step_type = 'single'
  AND step_def.flow_slug = state.flow_slug
  AND step_def.step_slug = state.step_slug
--SPLIT--
-- Backfill map steps: gather every completed task output into a single
-- JSON array ordered by task_index. Maps with no completed tasks get '[]'.
UPDATE pgflow.step_states state
SET output = COALESCE(
  (SELECT jsonb_agg(task.output ORDER BY task.task_index)
   FROM pgflow.step_tasks task
   WHERE task.step_slug = state.step_slug
     AND task.run_id = state.run_id
     AND task.status = 'completed'),
  '[]'::jsonb
)
FROM pgflow.steps step_def
WHERE state.status = 'completed'
  AND step_def.step_type = 'map'
  AND step_def.flow_slug = state.flow_slug
  AND step_def.step_slug = state.step_slug
--SPLIT--
-- Modify "cascade_complete_taskless_steps" function
CREATE OR REPLACE FUNCTION "pgflow"."cascade_complete_taskless_steps" ("run_id" uuid) RETURNS integer LANGUAGE plpgsql AS $$
DECLARE
v_total_completed int := 0;
v_iteration_completed int;
v_iterations int := 0;
v_max_iterations int := 50;
BEGIN
-- ==========================================
-- ITERATIVE CASCADE COMPLETION
-- ==========================================
-- Completes taskless steps in waves until none remain
LOOP
-- ---------- Safety check ----------
v_iterations := v_iterations + 1;
IF v_iterations > v_max_iterations THEN
RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations;
END IF;
-- ==========================================
-- COMPLETE READY TASKLESS STEPS
-- ==========================================
WITH
-- ---------- Find steps to complete in topological order ----------
steps_to_complete AS (
SELECT ss.run_id, ss.flow_slug, ss.step_slug, s.step_type
FROM pgflow.step_states ss
JOIN pgflow.steps s ON s.flow_slug = ss.flow_slug AND s.step_slug = ss.step_slug
WHERE ss.run_id = cascade_complete_taskless_steps.run_id
AND ss.status = 'created'
AND ss.remaining_deps = 0
AND ss.initial_tasks = 0
-- Process in topological order to ensure proper cascade
ORDER BY s.step_index
),
completed AS (
-- ---------- Complete taskless steps ----------
-- Steps with initial_tasks=0 and no remaining deps
-- Store output atomically: map steps get [], single steps get NULL
UPDATE pgflow.step_states ss
SET status = 'completed',
started_at = now(),
completed_at = now(),
remaining_tasks = 0,
-- Set output based on step type
output = CASE
WHEN stc.step_type = 'map' THEN '[]'::jsonb
ELSE NULL -- Single steps get NULL (for future conditional execution)
END
FROM steps_to_complete stc
WHERE ss.run_id = stc.run_id
AND ss.step_slug = stc.step_slug
RETURNING
ss.*,
-- Broadcast step:completed event atomically with the UPDATE
-- Using RETURNING ensures this executes during row processing
-- and cannot be optimized away by the query planner
realtime.send(
jsonb_build_object(
'event_type', 'step:completed',
'run_id', ss.run_id,
'step_slug', ss.step_slug,
'status', 'completed',
'started_at', ss.started_at,
'completed_at', ss.completed_at,
'remaining_tasks', 0,
'remaining_deps', 0,
'output', ss.output -- Use stored output instead of hardcoded []
),
concat('step:', ss.step_slug, ':completed'),
concat('pgflow:run:', ss.run_id),
false
) as _broadcast_result -- Prefix with _ to indicate internal use only
),
-- ---------- Update dependent steps ----------
-- Propagate completion and empty arrays to dependents
dep_updates AS (
UPDATE pgflow.step_states ss
SET remaining_deps = ss.remaining_deps - dep_count.count,
-- If the dependent is a map step and its dependency completed with 0 tasks,
-- set its initial_tasks to 0 as well
initial_tasks = CASE
WHEN s.step_type = 'map' AND dep_count.has_zero_tasks
THEN 0 -- Empty array propagation
ELSE ss.initial_tasks -- Keep existing value (including NULL)
END
FROM (
-- Aggregate dependency updates per dependent step
SELECT
d.flow_slug,
d.step_slug as dependent_slug,
COUNT(*) as count,
BOOL_OR(c.initial_tasks = 0) as has_zero_tasks
FROM completed c
JOIN pgflow.deps d ON d.flow_slug = c.flow_slug
AND d.dep_slug = c.step_slug
GROUP BY d.flow_slug, d.step_slug
) dep_count,
pgflow.steps s
WHERE ss.run_id = cascade_complete_taskless_steps.run_id
AND ss.flow_slug = dep_count.flow_slug
AND ss.step_slug = dep_count.dependent_slug
AND s.flow_slug = ss.flow_slug
AND s.step_slug = ss.step_slug
),
-- ---------- Update run counters ----------
-- Only decrement remaining_steps; let maybe_complete_run handle finalization
run_updates AS (
UPDATE pgflow.runs r
SET remaining_steps = r.remaining_steps - c.completed_count
FROM (SELECT COUNT(*) AS completed_count FROM completed) c
WHERE r.run_id = cascade_complete_taskless_steps.run_id
AND c.completed_count > 0
)
-- ---------- Check iteration results ----------
SELECT COUNT(*) INTO v_iteration_completed FROM completed;
EXIT WHEN v_iteration_completed = 0; -- No more steps to complete
v_total_completed := v_total_completed + v_iteration_completed;
END LOOP;
RETURN v_total_completed;
END;
$$
--SPLIT--
-- Modify "maybe_complete_run" function
CREATE OR REPLACE FUNCTION "pgflow"."maybe_complete_run" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
v_completed_run pgflow.runs%ROWTYPE;
begin
-- ==========================================
-- CHECK AND COMPLETE RUN IF FINISHED
-- ==========================================
-- ---------- Complete run if all steps done ----------
UPDATE pgflow.runs
SET
status = 'completed',
completed_at = now(),
-- Gather outputs from leaf steps (already stored in step_states.output by writers)
output = (
-- Leaf steps = steps with no dependents
SELECT jsonb_object_agg(
leaf_state.step_slug,
leaf_state.output -- Already aggregated by writers
)
FROM pgflow.step_states leaf_state
WHERE leaf_state.run_id = maybe_complete_run.run_id
AND leaf_state.status = 'completed'
AND NOT EXISTS (
SELECT 1
FROM pgflow.deps dep
WHERE dep.flow_slug = leaf_state.flow_slug
AND dep.dep_slug = leaf_state.step_slug
)
)
WHERE pgflow.runs.run_id = maybe_complete_run.run_id
AND pgflow.runs.remaining_steps = 0
AND pgflow.runs.status != 'completed'
RETURNING * INTO v_completed_run;
-- ==========================================
-- BROADCAST COMPLETION EVENT
-- ==========================================
IF v_completed_run.run_id IS NOT NULL THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:completed',
'run_id', v_completed_run.run_id,
'flow_slug', v_completed_run.flow_slug,
'status', 'completed',
'output', v_completed_run.output,
'completed_at', v_completed_run.completed_at
),
'run:completed',
concat('pgflow:run:', v_completed_run.run_id),
false
);
END IF;
end;
$$
--SPLIT--
-- Modify "start_ready_steps" function
CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
begin
-- ==========================================
-- GUARD: No mutations on failed runs
-- ==========================================
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = start_ready_steps.run_id AND pgflow.runs.status = 'failed') THEN
RETURN;
END IF;
-- ==========================================
-- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0)
-- ==========================================
-- These complete immediately without spawning tasks
WITH empty_map_steps AS (
SELECT step_state.*
FROM pgflow.step_states AS step_state
JOIN pgflow.steps AS step
ON step.flow_slug = step_state.flow_slug
AND step.step_slug = step_state.step_slug
WHERE step_state.run_id = start_ready_steps.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step.step_type = 'map'
AND step_state.initial_tasks = 0
ORDER BY step_state.step_slug
FOR UPDATE OF step_state
),
-- ---------- Complete empty map steps ----------
completed_empty_steps AS (
UPDATE pgflow.step_states
SET status = 'completed',
started_at = now(),
completed_at = now(),
remaining_tasks = 0,
output = '[]'::jsonb -- Empty map produces empty array output
FROM empty_map_steps
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
AND pgflow.step_states.step_slug = empty_map_steps.step_slug
RETURNING
pgflow.step_states.*,
-- Broadcast step:completed event atomically with the UPDATE
-- Using RETURNING ensures this executes during row processing
-- and cannot be optimized away by the query planner
realtime.send(
jsonb_build_object(
'event_type', 'step:completed',
'run_id', pgflow.step_states.run_id,
'step_slug', pgflow.step_states.step_slug,
'status', 'completed',
'started_at', pgflow.step_states.started_at,
'completed_at', pgflow.step_states.completed_at,
'remaining_tasks', 0,
'remaining_deps', 0,
'output', pgflow.step_states.output -- Use stored output instead of hardcoded []
),
concat('step:', pgflow.step_states.step_slug, ':completed'),
concat('pgflow:run:', pgflow.step_states.run_id),
false
) as _broadcast_completed -- Prefix with _ to indicate internal use only
),
-- ==========================================
-- HANDLE NORMAL STEPS (initial_tasks > 0)
-- ==========================================
-- ---------- Find ready steps ----------
-- Steps with no remaining deps and known task count
ready_steps AS (
SELECT *
FROM pgflow.step_states AS step_state
WHERE step_state.run_id = start_ready_steps.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count
AND step_state.initial_tasks > 0 -- Don't start taskless steps
-- Exclude empty map steps already handled
AND NOT EXISTS (
SELECT 1 FROM empty_map_steps
WHERE empty_map_steps.run_id = step_state.run_id
AND empty_map_steps.step_slug = step_state.step_slug
)
ORDER BY step_state.step_slug
FOR UPDATE
),
-- ---------- Mark steps as started ----------
started_step_states AS (
UPDATE pgflow.step_states
SET status = 'started',
started_at = now(),
remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting
FROM ready_steps
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
AND pgflow.step_states.step_slug = ready_steps.step_slug
RETURNING pgflow.step_states.*,
-- Broadcast step:started event atomically with the UPDATE
-- Using RETURNING ensures this executes during row processing
-- and cannot be optimized away by the query planner
realtime.send(
jsonb_build_object(
'event_type', 'step:started',
'run_id', pgflow.step_states.run_id,
'step_slug', pgflow.step_states.step_slug,
'status', 'started',
'started_at', pgflow.step_states.started_at,
'remaining_tasks', pgflow.step_states.remaining_tasks,
'remaining_deps', pgflow.step_states.remaining_deps
),
concat('step:', pgflow.step_states.step_slug, ':started'),
concat('pgflow:run:', pgflow.step_states.run_id),
false
) as _broadcast_result -- Prefix with _ to indicate internal use only
),
-- ==========================================
-- TASK GENERATION AND QUEUE MESSAGES
-- ==========================================
-- ---------- Generate tasks and batch messages ----------
-- Single steps: 1 task (index 0)
-- Map steps: N tasks (indices 0..N-1)
message_batches AS (
SELECT
started_step.flow_slug,
started_step.run_id,
started_step.step_slug,
COALESCE(step.opt_start_delay, 0) as delay,
array_agg(
jsonb_build_object(
'flow_slug', started_step.flow_slug,
'run_id', started_step.run_id,
'step_slug', started_step.step_slug,
'task_index', task_idx.task_index
) ORDER BY task_idx.task_index
) AS messages,
array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices
FROM started_step_states AS started_step
JOIN pgflow.steps AS step
ON step.flow_slug = started_step.flow_slug
AND step.step_slug = started_step.step_slug
-- Generate task indices from 0 to initial_tasks-1
CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index)
GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay
),
-- ---------- Send messages to queue ----------
-- Uses batch sending for performance with large arrays
sent_messages AS (
SELECT
mb.flow_slug,
mb.run_id,
mb.step_slug,
task_indices.task_index,
msg_ids.msg_id
FROM message_batches mb
CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord)
CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord)
WHERE task_indices.idx_ord = msg_ids.msg_ord
)
-- ==========================================
-- RECORD TASKS IN DATABASE
-- ==========================================
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id)
SELECT
sent_messages.flow_slug,
sent_messages.run_id,
sent_messages.step_slug,
sent_messages.task_index,
sent_messages.msg_id
FROM sent_messages;
-- ==========================================
-- BROADCAST REALTIME EVENTS
-- ==========================================
-- Note: Both step:completed events for empty maps and step:started events
-- are now broadcast atomically in their respective CTEs using RETURNING pattern.
-- This ensures correct ordering, prevents duplicate broadcasts, and guarantees
-- that events are sent for exactly the rows that were updated.
end;
$$
--SPLIT--
-- Modify "complete_task" function
CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
v_step_state pgflow.step_states%ROWTYPE;
v_dependent_map_slug text;
v_run_record pgflow.runs%ROWTYPE;
v_step_record pgflow.step_states%ROWTYPE;
begin
-- ==========================================
-- GUARD: No mutations on failed runs
-- ==========================================
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
RETURN;
END IF;
-- ==========================================
-- LOCK ACQUISITION AND TYPE VALIDATION
-- ==========================================
-- Acquire locks first to prevent race conditions
SELECT * INTO v_run_record FROM pgflow.runs
WHERE pgflow.runs.run_id = complete_task.run_id
FOR UPDATE;
SELECT * INTO v_step_record FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
FOR UPDATE;
-- Check for type violations AFTER acquiring locks
SELECT child_step.step_slug INTO v_dependent_map_slug
FROM pgflow.deps dependency
JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug
AND child_step.step_slug = dependency.step_slug
JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug
AND parent_step.step_slug = dependency.dep_slug
JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug
AND child_state.step_slug = child_step.step_slug
WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step
AND dependency.flow_slug = v_run_record.flow_slug
AND parent_step.step_type = 'single' -- Only validate single steps
AND child_step.step_type = 'map'
AND child_state.run_id = complete_task.run_id
AND child_state.initial_tasks IS NULL
AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array')
LIMIT 1;
-- Handle type violation if detected
IF v_dependent_map_slug IS NOT NULL THEN
-- Mark run as failed immediately
UPDATE pgflow.runs
SET status = 'failed',
failed_at = now()
WHERE pgflow.runs.run_id = complete_task.run_id;
-- Broadcast run:failed event
-- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function)
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', complete_task.run_id,
'flow_slug', v_run_record.flow_slug,
'status', 'failed',
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', complete_task.run_id),
false
);
-- Archive all active messages (both queued and started) to prevent orphaned messages
PERFORM pgmq.archive(
v_run_record.flow_slug,
array_agg(st.message_id)
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
HAVING count(*) > 0; -- Only call archive if there are messages to archive
-- Mark current task as failed and store the output
UPDATE pgflow.step_tasks
SET status = 'failed',
failed_at = now(),
output = complete_task.output, -- Store the output that caused the violation
error_message = '[TYPE_VIOLATION] Produced ' ||
CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END ||
' instead of array'
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
-- Mark step state as failed
UPDATE pgflow.step_states
SET status = 'failed',
failed_at = now(),
error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug ||
' expects array input but dependency ' || complete_task.step_slug ||
' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug;
-- Broadcast step:failed event
-- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function)
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', complete_task.run_id,
'step_slug', complete_task.step_slug,
'status', 'failed',
'error_message', '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug ||
' expects array input but dependency ' || complete_task.step_slug ||
' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END,
'failed_at', now()
),
concat('step:', complete_task.step_slug, ':failed'),
concat('pgflow:run:', complete_task.run_id),
false
);
-- Archive the current task's message (it was started, now failed)
PERFORM pgmq.archive(
v_run_record.flow_slug,
st.message_id -- Single message, use scalar form
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.message_id IS NOT NULL;
-- Return empty result
RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;
RETURN;
END IF;
-- ==========================================
-- MAIN CTE CHAIN: Update task and propagate changes
-- ==========================================
WITH
-- ---------- Task completion ----------
-- Update the task record with completion status and output
task AS (
UPDATE pgflow.step_tasks
SET
status = 'completed',
completed_at = now(),
output = complete_task.output
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index
AND pgflow.step_tasks.status = 'started'
RETURNING *
),
-- ---------- Get step type for output handling ----------
step_def AS (
SELECT step.step_type
FROM pgflow.steps step
JOIN pgflow.runs run ON run.flow_slug = step.flow_slug
WHERE run.run_id = complete_task.run_id
AND step.step_slug = complete_task.step_slug
),
-- ---------- Step state update ----------
-- Decrement remaining_tasks and potentially mark step as completed
-- Also store output atomically with status transition to completed
step_state AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement
ELSE 'started'
END,
completed_at = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement
ELSE NULL
END,
remaining_tasks = pgflow.step_states.remaining_tasks - 1,
-- Store output atomically with completion (only when remaining_tasks = 1, meaning step completes)
output = CASE
-- Single step: store task output directly when completing
WHEN (SELECT step_type FROM step_def) = 'single' AND pgflow.step_states.remaining_tasks = 1 THEN
complete_task.output
-- Map step: aggregate on completion (ordered by task_index)
WHEN (SELECT step_type FROM step_def) = 'map' AND pgflow.step_states.remaining_tasks = 1 THEN
(SELECT COALESCE(jsonb_agg(all_outputs.output ORDER BY all_outputs.task_index), '[]'::jsonb)
FROM (
-- All previously completed tasks
SELECT st.output, st.task_index
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.status = 'completed'
UNION ALL
-- Current task being completed (not yet visible as completed in snapshot)
SELECT complete_task.output, complete_task.task_index
) all_outputs)
ELSE pgflow.step_states.output
END
FROM task
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
RETURNING pgflow.step_states.*
),
-- ---------- Dependency resolution ----------
-- Find all child steps that depend on the completed parent step (only if parent completed)
child_steps AS (
SELECT deps.step_slug AS child_step_slug
FROM pgflow.deps deps
JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug
WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child
ORDER BY deps.step_slug -- Ensure consistent ordering
),
-- ---------- Lock child steps ----------
-- Acquire locks on all child steps before updating them
child_steps_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps)
FOR UPDATE
),
-- ---------- Update child steps ----------
-- Decrement remaining_deps and resolve NULL initial_tasks for map steps
child_steps_update AS (
UPDATE pgflow.step_states child_state
SET remaining_deps = child_state.remaining_deps - 1,
-- Resolve NULL initial_tasks for child map steps
-- This is where child maps learn their array size from the parent
-- This CTE only runs when the parent step is complete (see child_steps JOIN)
initial_tasks = CASE
WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN
CASE
WHEN parent_step.step_type = 'map' THEN
-- Map->map: Count all completed tasks from parent map
-- We add 1 because the current task is being completed in this transaction
-- but isn't yet visible as 'completed' in the step_tasks table
-- TODO: Refactor to use future column step_states.total_tasks
-- Would eliminate the COUNT query and just use parent_state.total_tasks
(SELECT COUNT(*)::int + 1
FROM pgflow.step_tasks parent_tasks
WHERE parent_tasks.run_id = complete_task.run_id
AND parent_tasks.step_slug = complete_task.step_slug
AND parent_tasks.status = 'completed'
AND parent_tasks.task_index != complete_task.task_index)
ELSE
-- Single->map: Use output array length (single steps complete immediately)
CASE
WHEN complete_task.output IS NOT NULL
AND jsonb_typeof(complete_task.output) = 'array' THEN
jsonb_array_length(complete_task.output)
ELSE NULL -- Keep NULL if not an array
END
END
ELSE child_state.initial_tasks -- Keep existing value (including NULL)
END
FROM child_steps children
JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id)
AND child_step.step_slug = children.child_step_slug
JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id)
AND parent_step.step_slug = complete_task.step_slug
WHERE child_state.run_id = complete_task.run_id
AND child_state.step_slug = children.child_step_slug
)
-- ---------- Update run remaining_steps ----------
-- Decrement the run's remaining_steps counter if step completed
UPDATE pgflow.runs
SET remaining_steps = pgflow.runs.remaining_steps - 1
FROM step_state
WHERE pgflow.runs.run_id = complete_task.run_id
AND step_state.status = 'completed';
-- ==========================================
-- POST-COMPLETION ACTIONS
-- ==========================================
-- ---------- Get updated state for broadcasting ----------
SELECT * INTO v_step_state FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug;
-- ---------- Handle step completion ----------
IF v_step_state.status = 'completed' THEN
-- Broadcast step:completed event FIRST (before cascade)
-- This ensures parent broadcasts before its dependent children
-- Use stored output from step_states (set atomically during status transition)
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:completed',
'run_id', complete_task.run_id,
'step_slug', complete_task.step_slug,
'status', 'completed',
'output', v_step_state.output, -- Use stored output instead of re-aggregating
'completed_at', v_step_state.completed_at
),
concat('step:', complete_task.step_slug, ':completed'),
concat('pgflow:run:', complete_task.run_id),
false
);
-- THEN cascade complete any taskless steps that are now ready
-- This ensures dependent children broadcast AFTER their parent
PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id);
END IF;
-- ---------- Archive completed task message ----------
-- Move message from active queue to archive table
PERFORM (
WITH completed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.status = 'completed'
)
SELECT pgmq.archive(ct.flow_slug, ct.message_id)
FROM completed_tasks ct
WHERE EXISTS (SELECT 1 FROM completed_tasks)
);
-- ---------- Trigger next steps ----------
-- Start any steps that are now ready (deps satisfied)
PERFORM pgflow.start_ready_steps(complete_task.run_id);
-- Check if the entire run is complete
PERFORM pgflow.maybe_complete_run(complete_task.run_id);
-- ---------- Return completed task ----------
RETURN QUERY SELECT *
FROM pgflow.step_tasks AS step_task
WHERE step_task.run_id = complete_task.run_id
AND step_task.step_slug = complete_task.step_slug
AND step_task.task_index = complete_task.task_index;
end;
$$
--SPLIT--
-- Modify "start_tasks" function
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
join pgflow.runs r on r.run_id = task.run_id
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
-- MVP: Don't start tasks on failed runs
and r.status != 'failed'
),
start_tasks_update as (
update pgflow.step_tasks
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
last_worker_id = worker_id
from tasks
where step_tasks.message_id = tasks.message_id
and step_tasks.flow_slug = tasks.flow_slug
and step_tasks.status = 'queued'
),
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
-- Read output directly from step_states (already aggregated by writers)
dep_state.output as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.step_states dep_state on
dep_state.run_id = st.run_id and
dep_state.step_slug = dep.dep_slug
),
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
count(*) as dep_count
from deps d
group by d.run_id, d.step_slug
),
timeouts as (
select
task.message_id,
task.flow_slug,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all messages
set_vt_batch as (
select pgflow.set_vt_batch(
start_tasks.flow_slug,
array_agg(t.message_id order by t.message_id),
array_agg(t.vt_delay order by t.message_id)
)
from timeouts t
)
select
st.flow_slug,
st.run_id,
st.step_slug,
-- ==========================================
-- INPUT CONSTRUCTION LOGIC
-- ==========================================
-- This nested CASE statement determines how to construct the input
-- for each task based on the step type (map vs non-map).
--
-- The fundamental difference:
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
-- - Non-map steps: Receive structured objects with named keys
-- (e.g., {"run": {...}, "dependency1": {...}})
-- ==========================================
CASE
-- -------------------- MAP STEPS --------------------
-- Map steps process arrays element-by-element.
-- Each task receives ONE element from the array at its task_index position.
WHEN step.step_type = 'map' THEN
-- Map steps get raw array elements without any wrapper object
CASE
-- ROOT MAP: Gets array from run input
-- Example: run input = [1, 2, 3]
-- task 0 gets: 1
-- task 1 gets: 2
-- task 2 gets: 3
WHEN step.deps_count = 0 THEN
-- Root map (deps_count = 0): no dependencies, reads from run input.
-- Extract the element at task_index from the run's input array.
-- Note: If run input is not an array, this will return NULL
-- and the flow will fail (validated in start_flow).
jsonb_array_element(r.input, st.task_index)
-- DEPENDENT MAP: Gets array from its single dependency
-- Example: dependency output = ["a", "b", "c"]
-- task 0 gets: "a"
-- task 1 gets: "b"
-- task 2 gets: "c"
ELSE
-- Has dependencies (should be exactly 1 for map steps).
-- Extract the element at task_index from the dependency's output array.
--
-- Why the subquery with jsonb_each?
-- - The dependency outputs a raw array: [1, 2, 3]
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
-- - We need to unwrap and get just the array value
-- - Map steps have exactly 1 dependency (enforced by add_step)
-- - So jsonb_each will return exactly 1 row
-- - We extract the 'value' which is the raw array [1, 2, 3]
-- - Then get the element at task_index from that array
(SELECT jsonb_array_element(value, st.task_index)
FROM jsonb_each(dep_out.deps_output)
LIMIT 1)
END
-- -------------------- NON-MAP STEPS --------------------
-- Regular (non-map) steps receive dependency outputs as a structured object.
-- Root steps (no dependencies) get empty object - they access flowInput via context.
-- Dependent steps get only their dependency outputs.
ELSE
-- Non-map steps get structured input with dependency keys only
-- Example for dependent step: {
-- "step1": {"output": "from_step1"},
-- "step2": {"output": "from_step2"}
-- }
-- Example for root step: {}
--
-- Note: flow_input is available separately in the returned record
-- for workers to access via context.flowInput
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id,
st.task_index as task_index,
-- flow_input: Original run input for worker context
-- Only included for root non-map steps to avoid data duplication.
-- Root map steps: flowInput IS the array, useless to include
-- Dependent steps: lazy load via ctx.flowInput when needed
CASE
WHEN step.step_type != 'map' AND step.deps_count = 0
THEN r.input
ELSE NULL
END as flow_input
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
step.flow_slug = st.flow_slug and
step.step_slug = st.step_slug
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
$$
--SPLIT--
-- source: 20260120205547_pgflow_requeue_stalled_tasks.sql
-- Modify "step_tasks" table
-- Bookkeeping columns consumed by pgflow.requeue_stalled_tasks():
--   requeued_count         - how many times this task's message has been requeued
--   last_requeued_at       - timestamp of the most recent requeue (NULL if never)
--   permanently_stalled_at - set once the task exceeded the requeue budget and
--                            its message was archived
ALTER TABLE "pgflow"."step_tasks" ADD COLUMN "requeued_count" integer NOT NULL DEFAULT 0, ADD COLUMN "last_requeued_at" timestamptz NULL, ADD COLUMN "permanently_stalled_at" timestamptz NULL
--SPLIT--
-- Create "requeue_stalled_tasks" function
CREATE FUNCTION "pgflow"."requeue_stalled_tasks" () RETURNS integer LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = '' AS $$
-- Recovers tasks whose worker died mid-execution.
--
-- A task counts as stalled when it has been in status 'started' for longer
-- than its flow's opt_timeout plus a 30-second grace buffer. Stalled tasks
-- are requeued (message made visible again) up to max_requeues times; past
-- that budget the message is archived and the task is flagged via
-- permanently_stalled_at while its status intentionally stays 'started'.
--
-- Returns: number of tasks requeued by this invocation (archived tasks are
-- not counted).
--
-- SECURITY DEFINER with an empty search_path: runs with the owner's
-- privileges, so every object reference below is schema-qualified.
declare
-- Number of tasks successfully requeued in this invocation.
result_count int := 0;
-- Requeue budget per task; beyond this the message is archived instead.
max_requeues constant int := 3;
begin
-- Find and requeue stalled tasks (where started_at > timeout + 30s buffer)
-- Tasks with requeued_count >= max_requeues will have their message archived
-- but status left as 'started' for easy identification via requeued_count column
with stalled_tasks as (
select
st.run_id,
st.step_slug,
st.task_index,
st.message_id,
r.flow_slug,
st.requeued_count,
f.opt_timeout
from pgflow.step_tasks st
join pgflow.runs r on r.run_id = st.run_id
join pgflow.flows f on f.flow_slug = r.flow_slug
where st.status = 'started'
and st.permanently_stalled_at is null
and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
-- Skip rows another invocation is already handling; lock only step_tasks.
for update of st skip locked
),
-- Separate tasks that can be requeued from those that exceeded max requeues
to_requeue as (
select * from stalled_tasks where requeued_count < max_requeues
),
to_archive as (
select * from stalled_tasks where requeued_count >= max_requeues
),
-- Update tasks that will be requeued
requeued as (
update pgflow.step_tasks st
set
status = 'queued',
started_at = null,
last_worker_id = null,
requeued_count = st.requeued_count + 1,
last_requeued_at = now()
from to_requeue tr
where st.run_id = tr.run_id
and st.step_slug = tr.step_slug
and st.task_index = tr.task_index
returning tr.flow_slug as queue_name, tr.message_id
),
-- Make requeued messages visible immediately (batched per queue)
visibility_reset as (
select pgflow.set_vt_batch(
r.queue_name,
array_agg(r.message_id),
array_agg(0) -- all offsets are 0 (immediate visibility)
)
from requeued r
where r.message_id is not null
group by r.queue_name
),
-- Mark tasks as permanently stalled before archiving
mark_permanently_stalled as (
update pgflow.step_tasks st
set permanently_stalled_at = now()
from to_archive ta
where st.run_id = ta.run_id
and st.step_slug = ta.step_slug
and st.task_index = ta.task_index
returning st.run_id
),
-- Archive messages for tasks that exceeded max requeues (batched per queue)
archived as (
select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
from to_archive ta
where ta.message_id is not null
group by ta.flow_slug
),
-- Force execution of visibility_reset CTE
_vr as (select count(*) from visibility_reset),
-- Force execution of mark_permanently_stalled CTE
_mps as (select count(*) from mark_permanently_stalled),
-- Force execution of archived CTE
_ar as (select count(*) from archived)
-- Cross join against the forcing CTEs: each aggregate CTE yields exactly one
-- row, so count(*) here still equals the number of requeued rows (n*1*1*1),
-- while referencing them guarantees their side effects execute.
select count(*) into result_count
from requeued, _vr, _mps, _ar;
return result_count;
end;
$$
--SPLIT--
-- source: 20260214181656_pgflow_step_conditions.sql
-- Modify "step_states" table
-- Introduces the 'skipped' step-state status used by conditional steps:
--   - new columns skip_reason / skipped_at
--   - "status_is_valid" extended with 'skipped'
--   - "completed_at_or_failed_at" replaced by a three-way exclusivity check:
--     at most one of completed_at / failed_at / skipped_at may be set
--   - skip_reason must be present exactly when status = 'skipped'
--   - remaining_tasks may be NULL only for 'created' or 'skipped' states
ALTER TABLE "pgflow"."step_states" DROP CONSTRAINT "completed_at_or_failed_at", DROP CONSTRAINT "remaining_tasks_state_consistency", ADD CONSTRAINT "remaining_tasks_state_consistency" CHECK ((remaining_tasks IS NULL) OR (status <> ALL (ARRAY['created'::text, 'skipped'::text]))), DROP CONSTRAINT "status_is_valid", ADD CONSTRAINT "status_is_valid" CHECK (status = ANY (ARRAY['created'::text, 'started'::text, 'completed'::text, 'failed'::text, 'skipped'::text])), ADD CONSTRAINT "completed_at_or_failed_at_or_skipped_at" CHECK (((
CASE
WHEN (completed_at IS NOT NULL) THEN 1
ELSE 0
END +
CASE
WHEN (failed_at IS NOT NULL) THEN 1
ELSE 0
END) +
CASE
WHEN (skipped_at IS NOT NULL) THEN 1
ELSE 0
END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipped'::text) AND (skip_reason IS NOT NULL)) OR ((status <> 'skipped'::text) AND (skip_reason IS NULL))), ADD CONSTRAINT "skipped_at_is_after_created_at" CHECK ((skipped_at IS NULL) OR (skipped_at >= created_at)), ADD COLUMN "skip_reason" text NULL, ADD COLUMN "skipped_at" timestamptz NULL
--SPLIT--
-- Create index "idx_step_states_skipped" to table: "step_states"
-- Partial index covering only rows with status = 'skipped'; keeps lookups of
-- a run's skipped steps cheap without indexing non-skipped rows.
CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text)
--SPLIT--
-- Modify "steps" table
-- Adds per-step condition configuration:
--   required_input_pattern / forbidden_input_pattern - jsonb containment
--     patterns evaluated against the step's input; must be jsonb objects
--   when_unmet     - what to do when the condition is not met
--                    ('fail' | 'skip' | 'skip-cascade', default 'skip')
--   when_exhausted - what to do when retries are exhausted
--                    ('fail' | 'skip' | 'skip-cascade', default 'fail')
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "forbidden_input_pattern_is_object" CHECK ((forbidden_input_pattern IS NULL) OR (jsonb_typeof(forbidden_input_pattern) = 'object'::text)), ADD CONSTRAINT "required_input_pattern_is_object" CHECK ((required_input_pattern IS NULL) OR (jsonb_typeof(required_input_pattern) = 'object'::text)), ADD CONSTRAINT "when_exhausted_is_valid" CHECK (when_exhausted = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "required_input_pattern" jsonb NULL, ADD COLUMN "forbidden_input_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_exhausted" text NOT NULL DEFAULT 'fail'
--SPLIT--
-- Modify "_compare_flow_shapes" function
CREATE OR REPLACE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$
-- Compares two flow "shapes" and returns a list of human-readable
-- differences; an empty array means the shapes are structurally identical.
--
-- Parameters:
--   p_local - shape compiled locally ("first shape" in messages)
--   p_db    - shape stored in the database ("second shape" in messages)
--
-- Only structural properties that affect DAG execution are compared:
-- step count, per-index slug, step type, dependencies, whenUnmet,
-- whenExhausted, and the required/forbidden input patterns.
--
-- Fix: scalar comparisons use IS DISTINCT FROM instead of != so that a key
-- missing on one side (NULL) is reported as a difference rather than being
-- silently swallowed by three-valued logic (NULL != x yields NULL, which IF
-- treats as false). whenUnmet/whenExhausted are coalesced to the same
-- defaults _create_flow_from_shape applies ('skip' / 'fail'), so a shape
-- that merely omits the default value is not flagged.
DECLARE
v_differences text[] := '{}';
v_local_steps jsonb;
v_db_steps jsonb;
v_local_count int;
v_db_count int;
v_max_count int;
v_idx int;
v_local_step jsonb;
v_db_step jsonb;
v_local_deps text;
v_db_deps text;
BEGIN
v_local_steps := p_local->'steps';
v_db_steps := p_db->'steps';
v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb));
v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb));
-- Compare step counts
IF v_local_count != v_db_count THEN
v_differences := array_append(
v_differences,
format('Step count differs: %s vs %s', v_local_count, v_db_count)
);
END IF;
-- Compare steps by index (loop body does not execute when both are empty)
v_max_count := GREATEST(v_local_count, v_db_count);
FOR v_idx IN 0..(v_max_count - 1) LOOP
v_local_step := v_local_steps->v_idx;
v_db_step := v_db_steps->v_idx;
IF v_local_step IS NULL THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: missing in first shape (second has '%s')$$,
v_idx,
v_db_step->>'slug'
)
);
ELSIF v_db_step IS NULL THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: missing in second shape (first has '%s')$$,
v_idx,
v_local_step->>'slug'
)
);
ELSE
-- Compare slug (IS DISTINCT FROM: a missing slug counts as a difference)
IF v_local_step->>'slug' IS DISTINCT FROM v_db_step->>'slug' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: slug differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'slug',
v_db_step->>'slug'
)
);
END IF;
-- Compare step type (IS DISTINCT FROM: a missing type counts as a difference)
IF v_local_step->>'stepType' IS DISTINCT FROM v_db_step->>'stepType' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: type differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'stepType',
v_db_step->>'stepType'
)
);
END IF;
-- Compare dependencies (convert arrays to comma-separated strings;
-- slugs cannot contain commas, so the joined form is unambiguous)
SELECT string_agg(dep, ', ' ORDER BY dep)
INTO v_local_deps
FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep;
SELECT string_agg(dep, ', ' ORDER BY dep)
INTO v_db_deps
FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep;
IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: dependencies differ [%s] vs [%s]$$,
v_idx,
COALESCE(v_local_deps, ''),
COALESCE(v_db_deps, '')
)
);
END IF;
-- Compare whenUnmet (structural - affects DAG execution semantics)
-- Missing keys default to 'skip', matching _create_flow_from_shape.
IF COALESCE(v_local_step->>'whenUnmet', 'skip') IS DISTINCT FROM COALESCE(v_db_step->>'whenUnmet', 'skip') THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: whenUnmet differs '%s' vs '%s'$$,
v_idx,
COALESCE(v_local_step->>'whenUnmet', 'skip'),
COALESCE(v_db_step->>'whenUnmet', 'skip')
)
);
END IF;
-- Compare whenExhausted (structural - affects DAG execution semantics)
-- Missing keys default to 'fail', matching _create_flow_from_shape.
IF COALESCE(v_local_step->>'whenExhausted', 'fail') IS DISTINCT FROM COALESCE(v_db_step->>'whenExhausted', 'fail') THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: whenExhausted differs '%s' vs '%s'$$,
v_idx,
COALESCE(v_local_step->>'whenExhausted', 'fail'),
COALESCE(v_db_step->>'whenExhausted', 'fail')
)
);
END IF;
-- Compare requiredInputPattern (structural - affects DAG execution semantics)
-- Uses -> (jsonb) not ->> (text) to properly compare wrapper objects
IF v_local_step->'requiredInputPattern' IS DISTINCT FROM v_db_step->'requiredInputPattern' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: requiredInputPattern differs '%s' vs '%s'$$,
v_idx,
v_local_step->'requiredInputPattern',
v_db_step->'requiredInputPattern'
)
);
END IF;
-- Compare forbiddenInputPattern (structural - affects DAG execution semantics)
-- Uses -> (jsonb) not ->> (text) to properly compare wrapper objects
IF v_local_step->'forbiddenInputPattern' IS DISTINCT FROM v_db_step->'forbiddenInputPattern' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: forbiddenInputPattern differs '%s' vs '%s'$$,
v_idx,
v_local_step->'forbiddenInputPattern',
v_db_step->'forbiddenInputPattern'
)
);
END IF;
END IF;
END LOOP;
RETURN v_differences;
END;
$BODY$
--SPLIT--
-- Create "add_step" function
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "required_input_pattern" jsonb DEFAULT NULL::jsonb, "forbidden_input_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_exhausted" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
-- Effective step type: 'single' when the caller passed NULL.
v_step_type text := COALESCE(add_step.step_type, 'single');
-- Number of declared dependencies (0 for a NULL or empty array).
v_deps_count int := COALESCE(array_length(add_step.deps_slugs, 1), 0);
-- Row inserted into (or already present in) pgflow.steps, returned to caller.
v_step pgflow.steps;
-- Zero-based position of this step within the flow.
v_step_index int;
BEGIN
-- A map step consumes exactly one array: either the run input (0 deps,
-- "root map") or the output of a single dependency ("dependent map").
-- Anything beyond one dependency is therefore rejected up front.
IF v_step_type = 'map' AND v_deps_count > 1 THEN
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
add_step.step_slug,
v_deps_count,
array_to_string(add_step.deps_slugs, ', ');
END IF;
-- Append after the highest existing index (0 when this is the first step).
SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO v_step_index
FROM pgflow.steps s
WHERE s.flow_slug = add_step.flow_slug;
-- Register the step. The no-op DO UPDATE on conflict makes re-registration
-- idempotent while still letting RETURNING hand back the existing row.
INSERT INTO pgflow.steps (
flow_slug, step_slug, step_type, step_index, deps_count,
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
required_input_pattern, forbidden_input_pattern, when_unmet, when_exhausted
)
VALUES (
add_step.flow_slug,
add_step.step_slug,
v_step_type,
v_step_index,
v_deps_count,
add_step.max_attempts,
add_step.base_delay,
add_step.timeout,
add_step.start_delay,
add_step.required_input_pattern,
add_step.forbidden_input_pattern,
add_step.when_unmet,
add_step.when_exhausted
)
ON CONFLICT ON CONSTRAINT steps_pkey
DO UPDATE SET step_slug = EXCLUDED.step_slug
RETURNING * INTO v_step;
-- Register dependency edges; duplicates are ignored for idempotency.
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;
RETURN v_step;
END;
$$
--SPLIT--
-- Modify "_create_flow_from_shape" function
CREATE OR REPLACE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
-- Materializes a flow definition from its jsonb "shape": creates the flow
-- row, then adds every step (with options, dependencies, and condition
-- settings) in array order so step_index mirrors the shape's ordering.
DECLARE
-- Current step definition while iterating over p_shape->'steps'.
v_step_def jsonb;
-- Options object of the current step (NULL when the shape omits it).
v_step_opts jsonb;
-- Flow-level options object (NULL when the shape omits it).
v_flow_opts jsonb;
-- Dependency slugs of the current step as a text array.
v_dep_slugs text[];
BEGIN
v_flow_opts := p_shape->'options';
-- Register the flow itself; NULL option values fall back to defaults.
PERFORM pgflow.create_flow(
p_flow_slug,
(v_flow_opts->>'maxAttempts')::int,
(v_flow_opts->>'baseDelay')::int,
(v_flow_opts->>'timeout')::int
);
-- Add each step in shape order.
FOR v_step_def IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
LOOP
v_step_opts := v_step_def->'options';
-- jsonb array of dependency slugs -> text[] ('{}' when absent).
SELECT COALESCE(array_agg(dep), '{}')
INTO v_dep_slugs
FROM jsonb_array_elements_text(COALESCE(v_step_def->'dependencies', '[]'::jsonb)) AS dep;
-- NULL option values mean "use default / inherit from flow".
-- Patterns arrive in a {defined, value} wrapper: only unwrap the value
-- when the shape explicitly marked the pattern as defined.
PERFORM pgflow.add_step(
flow_slug => p_flow_slug,
step_slug => v_step_def->>'slug',
deps_slugs => v_dep_slugs,
max_attempts => (v_step_opts->>'maxAttempts')::int,
base_delay => (v_step_opts->>'baseDelay')::int,
timeout => (v_step_opts->>'timeout')::int,
start_delay => (v_step_opts->>'startDelay')::int,
step_type => v_step_def->>'stepType',
when_unmet => COALESCE(v_step_def->>'whenUnmet', 'skip'),
when_exhausted => COALESCE(v_step_def->>'whenExhausted', 'fail'),
required_input_pattern => CASE
WHEN (v_step_def->'requiredInputPattern'->>'defined')::boolean
THEN v_step_def->'requiredInputPattern'->'value'
ELSE NULL
END,
forbidden_input_pattern => CASE
WHEN (v_step_def->'forbiddenInputPattern'->>'defined')::boolean
THEN v_step_def->'forbiddenInputPattern'->'value'
ELSE NULL
END
);
END LOOP;
END;
$$
--SPLIT--
-- Modify "_get_flow_shape" function
CREATE OR REPLACE FUNCTION "pgflow"."_get_flow_shape" ("p_flow_slug" text) RETURNS jsonb LANGUAGE sql STABLE SET "search_path" = '' AS $$
-- Serializes a flow's structural definition into the jsonb "shape" format
-- consumed by _compare_flow_shapes / _create_flow_from_shape.
-- Always returns {"steps": [...]}; the array is empty when the flow has no
-- steps (or does not exist).
SELECT jsonb_build_object(
'steps',
COALESCE(
jsonb_agg(
jsonb_build_object(
'slug', s.step_slug,
'stepType', s.step_type,
-- Dependency slugs, sorted for a canonical representation.
'dependencies', COALESCE(
(
SELECT jsonb_agg(d.dep_slug ORDER BY d.dep_slug)
FROM pgflow.deps AS d
WHERE d.flow_slug = s.flow_slug
AND d.step_slug = s.step_slug
),
'[]'::jsonb
),
'whenUnmet', s.when_unmet,
'whenExhausted', s.when_exhausted,
-- Patterns use a {defined, value} wrapper so "not configured"
-- (defined=false) stays distinguishable from any configured value.
'requiredInputPattern', CASE
WHEN s.required_input_pattern IS NOT NULL
THEN jsonb_build_object('defined', true, 'value', s.required_input_pattern)
ELSE '{"defined": false}'::jsonb
END,
'forbiddenInputPattern', CASE
WHEN s.forbidden_input_pattern IS NOT NULL
THEN jsonb_build_object('defined', true, 'value', s.forbidden_input_pattern)
ELSE '{"defined": false}'::jsonb
END
)
-- Steps appear in creation order.
ORDER BY s.step_index
),
'[]'::jsonb
)
)
FROM pgflow.steps AS s
WHERE s.flow_slug = p_flow_slug;
$$
--SPLIT--
-- Create "_cascade_force_skip_steps" function
CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$
-- Skips the given step and, transitively, every step that depends on it
-- (directly or indirectly) within one run. The trigger step records the
-- caller-provided skip_reason; downstream steps get 'dependency_skipped'.
-- Also archives queued/started task messages for the skipped steps, emits
-- step:skipped realtime events, and decrements runs.remaining_steps.
--
-- Returns: number of step_states actually transitioned to 'skipped'
-- (already-terminal steps are left untouched, making the call idempotent).
--
-- NOTE(review): unlike sibling functions this one does not SET search_path
-- = ''; presumably safe because every reference below is schema-qualified —
-- confirm against upstream.
DECLARE
v_flow_slug text;
v_total_skipped int := 0;
BEGIN
-- Get flow_slug for this run
SELECT r.flow_slug INTO v_flow_slug
FROM pgflow.runs r
WHERE r.run_id = _cascade_force_skip_steps.run_id;
IF v_flow_slug IS NULL THEN
RAISE EXCEPTION 'Run not found: %', _cascade_force_skip_steps.run_id;
END IF;
-- ==========================================
-- SKIP STEPS IN TOPOLOGICAL ORDER
-- ==========================================
-- Use recursive CTE to find all downstream dependents,
-- then skip them in topological order (by step_index)
WITH RECURSIVE
-- ---------- Find all downstream steps ----------
downstream_steps AS (
-- Base case: the trigger step
SELECT
s.flow_slug,
s.step_slug,
s.step_index,
_cascade_force_skip_steps.skip_reason AS reason -- Original reason for trigger step
FROM pgflow.steps s
WHERE s.flow_slug = v_flow_slug
AND s.step_slug = _cascade_force_skip_steps.step_slug
UNION ALL
-- Recursive case: steps that depend on already-found steps
SELECT
s.flow_slug,
s.step_slug,
s.step_index,
'dependency_skipped'::text AS reason -- Downstream steps get this reason
FROM pgflow.steps s
JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug
JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug
),
-- ---------- Deduplicate and order by step_index ----------
-- A step reachable via several paths appears multiple times in the
-- recursive result; keep one row per slug.
steps_to_skip AS (
SELECT DISTINCT ON (ds.step_slug)
ds.flow_slug,
ds.step_slug,
ds.step_index,
ds.reason
FROM downstream_steps ds
ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason)
),
-- ---------- Skip the steps ----------
skipped AS (
UPDATE pgflow.step_states ss
SET status = 'skipped',
skip_reason = sts.reason,
skipped_at = now(),
remaining_tasks = NULL -- Clear remaining_tasks for skipped steps
FROM steps_to_skip sts
WHERE ss.run_id = _cascade_force_skip_steps.run_id
AND ss.step_slug = sts.step_slug
AND ss.status IN ('created', 'started') -- Only skip non-terminal steps
RETURNING
ss.*,
-- Broadcast step:skipped event
-- (in RETURNING so it runs once per updated row and cannot be
-- optimized away)
realtime.send(
jsonb_build_object(
'event_type', 'step:skipped',
'run_id', ss.run_id,
'flow_slug', ss.flow_slug,
'step_slug', ss.step_slug,
'status', 'skipped',
'skip_reason', ss.skip_reason,
'skipped_at', ss.skipped_at
),
concat('step:', ss.step_slug, ':skipped'),
concat('pgflow:run:', ss.run_id),
false
) as _broadcast_result
),
-- ---------- Archive queued/started task messages for skipped steps ----------
-- HAVING guards the aggregate so pgmq.archive is not called with NULL.
archived_messages AS (
SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
FROM pgflow.step_tasks st
WHERE st.run_id = _cascade_force_skip_steps.run_id
AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk)
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
HAVING COUNT(st.message_id) > 0
),
-- ---------- Update run counters ----------
run_updates AS (
UPDATE pgflow.runs r
SET remaining_steps = r.remaining_steps - skipped_count.count
FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
WHERE r.run_id = _cascade_force_skip_steps.run_id
AND skipped_count.count > 0
)
-- LEFT JOIN against archived_messages forces that CTE to execute even
-- though its value is not used.
SELECT skipped_count.count
INTO v_total_skipped
FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
LEFT JOIN archived_messages ON true;
RETURN v_total_skipped;
END;
$$
--SPLIT--
-- Create "cascade_resolve_conditions" function
CREATE FUNCTION "pgflow"."cascade_resolve_conditions" ("run_id" uuid) RETURNS boolean LANGUAGE plpgsql SET "search_path" = '' AS $$
-- Evaluates required/forbidden input-pattern conditions for every ready
-- step of a run ('created', remaining_deps = 0) and resolves unmet
-- conditions according to each step's when_unmet mode:
--   'fail'         -> fail the step and the whole run, return false
--   'skip'         -> skip the step, unblock its dependents
--   'skip-cascade' -> skip the step and force-skip all downstream steps
-- Iterates until convergence, since skipping a step may make its
-- dependents ready for evaluation. Returns true unless the run failed.
DECLARE
-- Original run input; root steps (deps_count = 0) are matched against it.
v_run_input jsonb;
-- Current run status, re-checked after cascades.
v_run_status text;
-- First (topologically) step whose unmet condition has when_unmet='fail'.
v_first_fail record;
-- Convergence-loop safety counters.
v_iteration_count int := 0;
v_max_iterations int := 50;
-- Steps skipped by phase 1b in the current iteration.
v_processed_count int;
BEGIN
-- ==========================================
-- GUARD: Early return if run is already terminal
-- ==========================================
SELECT r.status, r.input INTO v_run_status, v_run_input
FROM pgflow.runs r
WHERE r.run_id = cascade_resolve_conditions.run_id;
IF v_run_status IN ('failed', 'completed') THEN
RETURN v_run_status != 'failed';
END IF;
-- ==========================================
-- ITERATE UNTIL CONVERGENCE
-- ==========================================
-- After skipping steps, dependents may become ready and need evaluation.
-- Loop until no more steps are processed.
LOOP
v_iteration_count := v_iteration_count + 1;
IF v_iteration_count > v_max_iterations THEN
RAISE EXCEPTION 'cascade_resolve_conditions exceeded safety limit of % iterations', v_max_iterations;
END IF;
v_processed_count := 0;
-- ==========================================
-- PHASE 1a: CHECK FOR FAIL CONDITIONS
-- ==========================================
-- Find first step (by topological order) with unmet condition and 'fail' mode.
-- Condition is unmet when:
-- (required_input_pattern is set AND input does NOT contain it) OR
-- (forbidden_input_pattern is set AND input DOES contain it)
WITH steps_with_conditions AS (
SELECT
step_state.flow_slug,
step_state.step_slug,
step.required_input_pattern,
step.forbidden_input_pattern,
step.when_unmet,
step.deps_count,
step.step_index
FROM pgflow.step_states AS step_state
JOIN pgflow.steps AS step
ON step.flow_slug = step_state.flow_slug
AND step.step_slug = step_state.step_slug
WHERE step_state.run_id = cascade_resolve_conditions.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND (step.required_input_pattern IS NOT NULL OR step.forbidden_input_pattern IS NOT NULL)
),
step_deps_output AS (
SELECT
swc.step_slug,
jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
FROM steps_with_conditions swc
JOIN pgflow.deps dep ON dep.flow_slug = swc.flow_slug AND dep.step_slug = swc.step_slug
JOIN pgflow.step_states dep_state
ON dep_state.run_id = cascade_resolve_conditions.run_id
AND dep_state.step_slug = dep.dep_slug
AND dep_state.status = 'completed' -- Only completed deps (not skipped)
WHERE swc.deps_count > 0
GROUP BY swc.step_slug
),
condition_evaluations AS (
SELECT
swc.*,
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
(swc.required_input_pattern IS NULL OR
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.required_input_pattern)
AND
(swc.forbidden_input_pattern IS NULL OR
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.forbidden_input_pattern))
AS condition_met
FROM steps_with_conditions swc
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
)
SELECT
flow_slug,
step_slug,
required_input_pattern,
forbidden_input_pattern
INTO v_first_fail
FROM condition_evaluations
WHERE NOT condition_met AND when_unmet = 'fail'
ORDER BY step_index
LIMIT 1;
-- Handle fail mode: fail step and run, return false
-- Note: Cannot use "v_first_fail IS NOT NULL" because records with NULL fields
-- evaluate to NULL in IS NOT NULL checks. Use FOUND instead.
IF FOUND THEN
UPDATE pgflow.step_states
SET status = 'failed',
failed_at = now(),
error_message = 'Condition not met'
WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id
AND pgflow.step_states.step_slug = v_first_fail.step_slug;
UPDATE pgflow.runs
SET status = 'failed',
failed_at = now()
WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id;
-- Broadcast step:failed and run:failed events.
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', cascade_resolve_conditions.run_id,
'step_slug', v_first_fail.step_slug,
'status', 'failed',
'error_message', 'Condition not met',
'failed_at', now()
),
concat('step:', v_first_fail.step_slug, ':failed'),
concat('pgflow:run:', cascade_resolve_conditions.run_id),
false
);
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', cascade_resolve_conditions.run_id,
'flow_slug', v_first_fail.flow_slug,
'status', 'failed',
'error_message', 'Condition not met',
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', cascade_resolve_conditions.run_id),
false
);
-- Archive all outstanding messages for the failed run (batched per queue).
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = cascade_resolve_conditions.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;
RETURN false;
END IF;
-- ==========================================
-- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation)
-- ==========================================
-- Skip steps with unmet conditions and whenUnmet='skip'.
-- Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
WITH steps_with_conditions AS (
SELECT
step_state.flow_slug,
step_state.step_slug,
step.required_input_pattern,
step.forbidden_input_pattern,
step.when_unmet,
step.deps_count,
step.step_index
FROM pgflow.step_states AS step_state
JOIN pgflow.steps AS step
ON step.flow_slug = step_state.flow_slug
AND step.step_slug = step_state.step_slug
WHERE step_state.run_id = cascade_resolve_conditions.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND (step.required_input_pattern IS NOT NULL OR step.forbidden_input_pattern IS NOT NULL)
),
step_deps_output AS (
SELECT
swc.step_slug,
jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
FROM steps_with_conditions swc
JOIN pgflow.deps dep ON dep.flow_slug = swc.flow_slug AND dep.step_slug = swc.step_slug
JOIN pgflow.step_states dep_state
ON dep_state.run_id = cascade_resolve_conditions.run_id
AND dep_state.step_slug = dep.dep_slug
AND dep_state.status = 'completed' -- Only completed deps (not skipped)
WHERE swc.deps_count > 0
GROUP BY swc.step_slug
),
condition_evaluations AS (
SELECT
swc.*,
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
(swc.required_input_pattern IS NULL OR
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.required_input_pattern)
AND
(swc.forbidden_input_pattern IS NULL OR
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.forbidden_input_pattern))
AS condition_met
FROM steps_with_conditions swc
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
),
unmet_skip_steps AS (
SELECT * FROM condition_evaluations
WHERE NOT condition_met AND when_unmet = 'skip'
),
skipped_steps AS (
UPDATE pgflow.step_states ss
SET status = 'skipped',
skip_reason = 'condition_unmet',
skipped_at = now()
FROM unmet_skip_steps uss
WHERE ss.run_id = cascade_resolve_conditions.run_id
AND ss.step_slug = uss.step_slug
AND ss.status = 'created'
RETURNING
ss.*,
realtime.send(
jsonb_build_object(
'event_type', 'step:skipped',
'run_id', ss.run_id,
'flow_slug', ss.flow_slug,
'step_slug', ss.step_slug,
'status', 'skipped',
'skip_reason', 'condition_unmet',
'skipped_at', ss.skipped_at
),
concat('step:', ss.step_slug, ':skipped'),
concat('pgflow:run:', ss.run_id),
false
) AS _broadcast_result
),
-- NEW: Update dependent steps (decrement remaining_deps by count of skipped parents, set initial_tasks=0 for maps)
skipped_parent_counts AS (
-- Count how many skipped parents each child has
SELECT
dep.step_slug AS child_step_slug,
dep.flow_slug AS child_flow_slug,
COUNT(*) AS skipped_parent_count
FROM skipped_steps parent
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
GROUP BY dep.step_slug, dep.flow_slug
),
dependent_updates AS (
UPDATE pgflow.step_states child_state
SET remaining_deps = child_state.remaining_deps - spc.skipped_parent_count,
-- If child is a map step and this skipped step is its only dependency,
-- set initial_tasks = 0 (skipped dep = empty array)
initial_tasks = CASE
WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
ELSE child_state.initial_tasks
END
FROM skipped_parent_counts spc
JOIN pgflow.steps child_step ON child_step.flow_slug = spc.child_flow_slug AND child_step.step_slug = spc.child_step_slug
WHERE child_state.run_id = cascade_resolve_conditions.run_id
AND child_state.step_slug = spc.child_step_slug
),
run_update AS (
UPDATE pgflow.runs r
SET remaining_steps = r.remaining_steps - (SELECT COUNT(*) FROM skipped_steps)
WHERE r.run_id = cascade_resolve_conditions.run_id
AND (SELECT COUNT(*) FROM skipped_steps) > 0
)
SELECT COUNT(*)::int INTO v_processed_count FROM skipped_steps;
-- ==========================================
-- PHASE 1c: HANDLE SKIP-CASCADE CONDITIONS
-- ==========================================
-- Call _cascade_force_skip_steps for each step with unmet condition and whenUnmet='skip-cascade'.
-- Process in topological order; _cascade_force_skip_steps is idempotent.
PERFORM pgflow._cascade_force_skip_steps(cascade_resolve_conditions.run_id, ready_step.step_slug, 'condition_unmet')
FROM pgflow.step_states AS ready_step
JOIN pgflow.steps AS step
ON step.flow_slug = ready_step.flow_slug
AND step.step_slug = ready_step.step_slug
-- Aggregate completed-dependency outputs; the ON clause makes the lateral
-- join apply only to steps that actually have dependencies.
LEFT JOIN LATERAL (
SELECT jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
FROM pgflow.deps dep
JOIN pgflow.step_states dep_state
ON dep_state.run_id = cascade_resolve_conditions.run_id
AND dep_state.step_slug = dep.dep_slug
AND dep_state.status = 'completed' -- Only completed deps (not skipped)
WHERE dep.flow_slug = ready_step.flow_slug
AND dep.step_slug = ready_step.step_slug
) AS agg_deps ON step.deps_count > 0
WHERE ready_step.run_id = cascade_resolve_conditions.run_id
AND ready_step.status = 'created'
AND ready_step.remaining_deps = 0
AND (step.required_input_pattern IS NOT NULL OR step.forbidden_input_pattern IS NOT NULL)
AND step.when_unmet = 'skip-cascade'
-- Condition is NOT met when: (if fails) OR (ifNot fails)
AND NOT (
(step.required_input_pattern IS NULL OR
CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.required_input_pattern)
AND
(step.forbidden_input_pattern IS NULL OR
NOT (CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.forbidden_input_pattern))
)
ORDER BY step.step_index;
-- Check if run was failed during cascade (e.g., if _cascade_force_skip_steps triggers fail)
SELECT r.status INTO v_run_status
FROM pgflow.runs r
WHERE r.run_id = cascade_resolve_conditions.run_id;
IF v_run_status IN ('failed', 'completed') THEN
RETURN v_run_status != 'failed';
END IF;
-- Exit loop if no steps were processed in this iteration
-- NOTE(review): v_processed_count only counts phase-1b skips; skip-cascade
-- work in phase 1c does not keep the loop going. Presumably safe because
-- _cascade_force_skip_steps already skips all downstream steps
-- transitively - confirm against upstream tests.
EXIT WHEN v_processed_count = 0;
END LOOP;
RETURN true;
END;
$$
--SPLIT--
-- Modify "start_ready_steps" function
CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
-- Transitions every ready step of a run to 'started', generates its tasks
-- (1 task for single steps, N tasks for map steps), sends the matching
-- pgmq messages in per-step batches, and records the tasks in
-- pgflow.step_tasks. The whole pipeline runs as one WITH statement, so the
-- state update, realtime broadcast, message send, and task insert happen
-- within a single statement. No-op for terminal runs.
BEGIN
-- ==========================================
-- GUARD: No mutations on terminal runs
-- ==========================================
IF EXISTS (
SELECT 1 FROM pgflow.runs
WHERE pgflow.runs.run_id = start_ready_steps.run_id
AND pgflow.runs.status IN ('failed', 'completed')
) THEN
RETURN;
END IF;
-- ==========================================
-- PHASE 1: START READY STEPS
-- ==========================================
-- NOTE: Condition evaluation and empty map handling are done by
-- cascade_resolve_conditions() and cascade_complete_taskless_steps()
-- which are called before this function.
WITH
-- ---------- Find ready steps ----------
-- Steps with no remaining deps and known task count
ready_steps AS (
SELECT *
FROM pgflow.step_states AS step_state
WHERE step_state.run_id = start_ready_steps.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step_state.initial_tasks IS NOT NULL -- Cannot start with unknown count
AND step_state.initial_tasks > 0 -- Don't start taskless steps (handled by cascade_complete_taskless_steps)
ORDER BY step_state.step_slug
FOR UPDATE
),
-- ---------- Mark steps as started ----------
started_step_states AS (
UPDATE pgflow.step_states
SET status = 'started',
started_at = now(),
remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting
FROM ready_steps
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
AND pgflow.step_states.step_slug = ready_steps.step_slug
RETURNING pgflow.step_states.*,
-- Broadcast step:started event atomically with the UPDATE
-- Using RETURNING ensures this executes during row processing
-- and cannot be optimized away by the query planner
realtime.send(
jsonb_build_object(
'event_type', 'step:started',
'run_id', pgflow.step_states.run_id,
'step_slug', pgflow.step_states.step_slug,
'status', 'started',
'started_at', pgflow.step_states.started_at,
'remaining_tasks', pgflow.step_states.remaining_tasks,
'remaining_deps', pgflow.step_states.remaining_deps
),
concat('step:', pgflow.step_states.step_slug, ':started'),
concat('pgflow:run:', pgflow.step_states.run_id),
false
) as _broadcast_result -- Prefix with _ to indicate internal use only
),
-- ==========================================
-- PHASE 2: TASK GENERATION AND QUEUE MESSAGES
-- ==========================================
-- ---------- Generate tasks and batch messages ----------
-- Single steps: 1 task (index 0)
-- Map steps: N tasks (indices 0..N-1)
message_batches AS (
SELECT
started_step.flow_slug,
started_step.run_id,
started_step.step_slug,
COALESCE(step.opt_start_delay, 0) as delay,
array_agg(
jsonb_build_object(
'flow_slug', started_step.flow_slug,
'run_id', started_step.run_id,
'step_slug', started_step.step_slug,
'task_index', task_idx.task_index
) ORDER BY task_idx.task_index
) AS messages,
array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices
FROM started_step_states AS started_step
JOIN pgflow.steps AS step
ON step.flow_slug = started_step.flow_slug
AND step.step_slug = started_step.step_slug
-- Generate task indices from 0 to initial_tasks-1
CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index)
GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay
),
-- ---------- Send messages to queue ----------
-- Uses batch sending for performance with large arrays
-- Pairs each task_index with its message id by matching the two WITH
-- ORDINALITY positions.
-- NOTE(review): pgmq.send_batch is volatile and appears in a LATERAL cross
-- join alongside the unnest; whether it is evaluated once per batch or
-- once per task_index row depends on the chosen join order. Presumably
-- covered by upstream tests (ROWS FROM () would pin a single evaluation) -
-- confirm.
sent_messages AS (
SELECT
mb.flow_slug,
mb.run_id,
mb.step_slug,
task_indices.task_index,
msg_ids.msg_id
FROM message_batches mb
CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord)
CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord)
WHERE task_indices.idx_ord = msg_ids.msg_ord
)
-- ==========================================
-- PHASE 3: RECORD TASKS IN DATABASE
-- ==========================================
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id)
SELECT
sent_messages.flow_slug,
sent_messages.run_id,
sent_messages.step_slug,
sent_messages.task_index,
sent_messages.msg_id
FROM sent_messages;
END;
$$
--SPLIT--
-- Modify "complete_task" function
CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
-- Completes a single task of a step within a run.
--
-- Flow:
--   1. Guards: a failed run is a no-op; a late callback (step no longer
--      'started') only archives the message and returns the task row.
--   2. Type validation: a 'single' parent feeding a 'map' child with NULL
--      initial_tasks must produce a JSON array; otherwise the run is
--      failed immediately and all active messages are archived.
--   3. Main CTE chain: mark the task completed, decrement step/run
--      counters, store the step output atomically with the status
--      transition, and resolve child-step dependencies (including
--      resolving NULL initial_tasks for dependent map steps).
--   4. Post-completion: broadcast realtime events, resolve conditions,
--      cascade taskless steps, archive the queue message, start
--      newly-ready steps, and check for run completion.
--
-- API contract: always returns the affected step_tasks row(s).
declare
v_step_state pgflow.step_states%ROWTYPE; -- step state re-read after the CTE chain, used for broadcasting
v_dependent_map_slug text; -- slug of a dependent map step whose array-input contract was violated (if any)
v_run_record pgflow.runs%ROWTYPE; -- run row, locked at function start
v_step_record pgflow.step_states%ROWTYPE; -- step state row, locked at function start
begin
-- ==========================================
-- GUARD: No mutations on failed runs
-- ==========================================
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
RETURN;
END IF;
-- ==========================================
-- LOCK ACQUISITION AND TYPE VALIDATION
-- ==========================================
-- Acquire locks first to prevent race conditions
-- (run row before step row; keep this order consistent with other
-- functions in this file to avoid deadlocks)
SELECT * INTO v_run_record FROM pgflow.runs
WHERE pgflow.runs.run_id = complete_task.run_id
FOR UPDATE;
SELECT * INTO v_step_record FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
FOR UPDATE;
-- ==========================================
-- GUARD: Late callback - step not started
-- ==========================================
-- If the step is not in 'started' state, this is a late callback.
-- Do not mutate step_states or runs, archive message, return task row.
IF v_step_record.status != 'started' THEN
-- Archive the task message if present (prevents stuck work)
PERFORM pgmq.archive(
v_run_record.flow_slug,
st.message_id
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.message_id IS NOT NULL;
-- Return the current task row without any mutations
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
RETURN;
END IF;
-- Check for type violations AFTER acquiring locks
-- Finds a dependent map step (NULL initial_tasks) whose 'single' parent
-- is this completing step while the output is not a JSON array.
SELECT child_step.step_slug INTO v_dependent_map_slug
FROM pgflow.deps dependency
JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug
AND child_step.step_slug = dependency.step_slug
JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug
AND parent_step.step_slug = dependency.dep_slug
JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug
AND child_state.step_slug = child_step.step_slug
WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step
AND dependency.flow_slug = v_run_record.flow_slug
AND parent_step.step_type = 'single' -- Only validate single steps
AND child_step.step_type = 'map'
AND child_state.run_id = complete_task.run_id
AND child_state.initial_tasks IS NULL
AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array')
LIMIT 1;
-- Handle type violation if detected
IF v_dependent_map_slug IS NOT NULL THEN
-- Mark run as failed immediately
UPDATE pgflow.runs
SET status = 'failed',
failed_at = now()
WHERE pgflow.runs.run_id = complete_task.run_id;
-- Broadcast run:failed event
-- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function)
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', complete_task.run_id,
'flow_slug', v_run_record.flow_slug,
'status', 'failed',
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', complete_task.run_id),
false
);
-- Archive all active messages (both queued and started) to prevent orphaned messages
PERFORM pgmq.archive(
v_run_record.flow_slug,
array_agg(st.message_id)
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
HAVING count(*) > 0; -- Only call archive if there are messages to archive
-- Mark current task as failed and store the output
UPDATE pgflow.step_tasks
SET status = 'failed',
failed_at = now(),
output = complete_task.output, -- Store the output that caused the violation
error_message = '[TYPE_VIOLATION] Produced ' ||
CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END ||
' instead of array'
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
-- Mark step state as failed
UPDATE pgflow.step_states
SET status = 'failed',
failed_at = now(),
error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug ||
' expects array input but dependency ' || complete_task.step_slug ||
' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug;
-- Broadcast step:failed event
-- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function)
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', complete_task.run_id,
'step_slug', complete_task.step_slug,
'status', 'failed',
'error_message', '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug ||
' expects array input but dependency ' || complete_task.step_slug ||
' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null'
ELSE jsonb_typeof(complete_task.output) END,
'failed_at', now()
),
concat('step:', complete_task.step_slug, ':failed'),
concat('pgflow:run:', complete_task.run_id),
false
);
-- Archive the current task's message (it was started, now failed)
PERFORM pgmq.archive(
v_run_record.flow_slug,
st.message_id -- Single message, use scalar form
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.message_id IS NOT NULL;
-- Return the failed task row (API contract: always return task row)
RETURN QUERY
SELECT * FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index;
RETURN;
END IF;
-- ==========================================
-- MAIN CTE CHAIN: Update task and propagate changes
-- ==========================================
WITH
-- ---------- Task completion ----------
-- Update the task record with completion status and output
task AS (
UPDATE pgflow.step_tasks
SET
status = 'completed',
completed_at = now(),
output = complete_task.output
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index
AND pgflow.step_tasks.status = 'started'
RETURNING *
),
-- ---------- Get step type for output handling ----------
step_def AS (
SELECT step.step_type
FROM pgflow.steps step
JOIN pgflow.runs run ON run.flow_slug = step.flow_slug
WHERE run.run_id = complete_task.run_id
AND step.step_slug = complete_task.step_slug
),
-- ---------- Step state update ----------
-- Decrement remaining_tasks and potentially mark step as completed
-- Also store output atomically with status transition to completed
step_state AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement
ELSE 'started'
END,
completed_at = CASE
WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement
ELSE NULL
END,
remaining_tasks = pgflow.step_states.remaining_tasks - 1,
-- Store output atomically with completion (only when remaining_tasks = 1, meaning step completes)
output = CASE
-- Single step: store task output directly when completing
WHEN (SELECT step_type FROM step_def) = 'single' AND pgflow.step_states.remaining_tasks = 1 THEN
complete_task.output
-- Map step: aggregate on completion (ordered by task_index)
WHEN (SELECT step_type FROM step_def) = 'map' AND pgflow.step_states.remaining_tasks = 1 THEN
(SELECT COALESCE(jsonb_agg(all_outputs.output ORDER BY all_outputs.task_index), '[]'::jsonb)
FROM (
-- All previously completed tasks
SELECT st.output, st.task_index
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.status = 'completed'
UNION ALL
-- Current task being completed (not yet visible as completed in snapshot)
SELECT complete_task.output, complete_task.task_index
) all_outputs)
ELSE pgflow.step_states.output
END
FROM task
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
RETURNING pgflow.step_states.*
),
-- ---------- Dependency resolution ----------
-- Find all child steps that depend on the completed parent step (only if parent completed)
child_steps AS (
SELECT deps.step_slug AS child_step_slug
FROM pgflow.deps deps
JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug
WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child
ORDER BY deps.step_slug -- Ensure consistent ordering
),
-- ---------- Lock child steps ----------
-- Acquire locks on all child steps before updating them
child_steps_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps)
FOR UPDATE
),
-- ---------- Update child steps ----------
-- Decrement remaining_deps and resolve NULL initial_tasks for map steps
child_steps_update AS (
UPDATE pgflow.step_states child_state
SET remaining_deps = child_state.remaining_deps - 1,
-- Resolve NULL initial_tasks for child map steps
-- This is where child maps learn their array size from the parent
-- This CTE only runs when the parent step is complete (see child_steps JOIN)
initial_tasks = CASE
WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN
CASE
WHEN parent_step.step_type = 'map' THEN
-- Map->map: Count all completed tasks from parent map
-- We add 1 because the current task is being completed in this transaction
-- but isn't yet visible as 'completed' in the step_tasks table
-- TODO: Refactor to use future column step_states.total_tasks
-- Would eliminate the COUNT query and just use parent_state.total_tasks
(SELECT COUNT(*)::int + 1
FROM pgflow.step_tasks parent_tasks
WHERE parent_tasks.run_id = complete_task.run_id
AND parent_tasks.step_slug = complete_task.step_slug
AND parent_tasks.status = 'completed'
AND parent_tasks.task_index != complete_task.task_index)
ELSE
-- Single->map: Use output array length (single steps complete immediately)
CASE
WHEN complete_task.output IS NOT NULL
AND jsonb_typeof(complete_task.output) = 'array' THEN
jsonb_array_length(complete_task.output)
ELSE NULL -- Keep NULL if not an array
END
END
ELSE child_state.initial_tasks -- Keep existing value (including NULL)
END
FROM child_steps children
JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id)
AND child_step.step_slug = children.child_step_slug
JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id)
AND parent_step.step_slug = complete_task.step_slug
WHERE child_state.run_id = complete_task.run_id
AND child_state.step_slug = children.child_step_slug
)
-- ---------- Update run remaining_steps ----------
-- Decrement the run's remaining_steps counter if step completed
UPDATE pgflow.runs
SET remaining_steps = pgflow.runs.remaining_steps - 1
FROM step_state
WHERE pgflow.runs.run_id = complete_task.run_id
AND step_state.status = 'completed';
-- ==========================================
-- POST-COMPLETION ACTIONS
-- ==========================================
-- ---------- Get updated state for broadcasting ----------
SELECT * INTO v_step_state FROM pgflow.step_states
WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug;
-- ---------- Handle step completion ----------
IF v_step_state.status = 'completed' THEN
-- Broadcast step:completed event FIRST (before cascade)
-- This ensures parent broadcasts before its dependent children
-- Use stored output from step_states (set atomically during status transition)
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:completed',
'run_id', complete_task.run_id,
'step_slug', complete_task.step_slug,
'status', 'completed',
'output', v_step_state.output, -- Use stored output instead of re-aggregating
'completed_at', v_step_state.completed_at
),
concat('step:', complete_task.step_slug, ':completed'),
concat('pgflow:run:', complete_task.run_id),
false
);
-- THEN evaluate conditions on newly-ready dependent steps
-- This must happen before cascade_complete_taskless_steps so that
-- skipped steps can set initial_tasks=0 for their map dependents
IF NOT pgflow.cascade_resolve_conditions(complete_task.run_id) THEN
-- Run was failed due to a condition with when_unmet='fail'
-- Archive the current task's message before returning
PERFORM pgmq.archive(
(SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id),
(SELECT st.message_id FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index)
);
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
RETURN;
END IF;
-- THEN cascade complete any taskless steps that are now ready
-- This ensures dependent children broadcast AFTER their parent
PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id);
END IF;
-- ---------- Archive completed task message ----------
-- Move message from active queue to archive table
PERFORM (
WITH completed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.status = 'completed'
)
SELECT pgmq.archive(ct.flow_slug, ct.message_id)
FROM completed_tasks ct
WHERE EXISTS (SELECT 1 FROM completed_tasks)
);
-- ---------- Trigger next steps ----------
-- Start any steps that are now ready (deps satisfied)
PERFORM pgflow.start_ready_steps(complete_task.run_id);
-- Check if the entire run is complete
PERFORM pgflow.maybe_complete_run(complete_task.run_id);
-- ---------- Return completed task ----------
RETURN QUERY SELECT *
FROM pgflow.step_tasks AS step_task
WHERE step_task.run_id = complete_task.run_id
AND step_task.step_slug = complete_task.step_slug
AND step_task.task_index = complete_task.task_index;
end;
$$
--SPLIT--
-- Create "_archive_task_message" function
-- Archives the pgmq message(s) belonging to a single task of a step in a
-- run. Silently does nothing when the task carries no message_id.
CREATE FUNCTION "pgflow"."_archive_task_message" ("p_run_id" uuid, "p_step_slug" text, "p_task_index" integer) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$
WITH task_messages AS (
  -- Collect the non-null message ids for the targeted task,
  -- paired with the owning run's queue name (flow_slug)
  SELECT run.flow_slug, task.message_id
  FROM pgflow.step_tasks AS task
  JOIN pgflow.runs AS run ON run.run_id = task.run_id
  WHERE task.run_id = p_run_id
    AND task.step_slug = p_step_slug
    AND task.task_index = p_task_index
    AND task.message_id IS NOT NULL
)
-- Batch-archive per flow; HAVING guards against calling pgmq.archive
-- with an empty array
SELECT pgmq.archive(tm.flow_slug, ARRAY_AGG(tm.message_id))
FROM task_messages AS tm
GROUP BY tm.flow_slug
HAVING COUNT(tm.message_id) > 0;
$$
--SPLIT--
-- Modify "fail_task" function
CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
-- Fails (or re-queues for retry) a single task of a step within a run.
--
-- Flow:
--   1. Guard: if the run is already failed, mark the task failed,
--      archive its message, and return (no retries on failed runs).
--   2. Late-callback guard: under row locks, a step no longer 'started'
--      only gets its message archived; no state mutation.
--   3. Main CTE chain: re-queue the task if attempts remain; otherwise
--      fail it and fail/skip the step per its when_exhausted mode
--      ('fail' | 'skip' | 'skip-cascade'), updating the run accordingly.
--   4. Post-processing: broadcast step/run events, cascade or propagate
--      skips to dependents, delay re-queued messages with exponential
--      backoff, and archive messages of terminally failed tasks.
--
-- API contract: always returns the affected step_tasks row(s).
DECLARE
v_run_failed boolean; -- run transitioned to 'failed' in this call
v_step_failed boolean; -- step transitioned to 'failed' in this call
v_step_skipped boolean; -- step transitioned to 'skipped' in this call
v_when_exhausted text; -- step's when_exhausted mode for skip handling
v_task_exhausted boolean; -- task has no retries left
v_flow_slug_for_deps text; -- flow slug used when decrementing dependents on plain 'skip'
v_prev_step_status text; -- step status captured under lock, before the CTE chain
v_flow_slug text; -- flow slug captured under lock
begin
-- If run is already failed, no retries allowed
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id AND pgflow.runs.status = 'failed') THEN
UPDATE pgflow.step_tasks
SET status = 'failed',
failed_at = now(),
error_message = fail_task.error_message
WHERE pgflow.step_tasks.run_id = fail_task.run_id
AND pgflow.step_tasks.step_slug = fail_task.step_slug
AND pgflow.step_tasks.task_index = fail_task.task_index
AND pgflow.step_tasks.status = 'started';
-- Archive the task's message
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = fail_task.run_id
AND pgflow.step_tasks.step_slug = fail_task.step_slug
AND pgflow.step_tasks.task_index = fail_task.task_index;
RETURN;
END IF;
-- Late callback guard: lock run + step rows and use current step status
-- under lock so concurrent fail_task calls cannot read stale status.
SELECT ss.status, r.flow_slug INTO v_prev_step_status, v_flow_slug
FROM pgflow.runs r
JOIN pgflow.step_states ss ON ss.run_id = r.run_id
WHERE ss.run_id = fail_task.run_id
AND ss.step_slug = fail_task.step_slug
FOR UPDATE OF r, ss;
IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
-- Archive the task message if present
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.message_id IS NOT NULL
HAVING COUNT(st.message_id) > 0;
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = fail_task.run_id
AND pgflow.step_tasks.step_slug = fail_task.step_slug
AND pgflow.step_tasks.task_index = fail_task.task_index;
RETURN;
END IF;
WITH flow_info AS (
SELECT r.flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id
),
-- Resolve retry/backoff config: step-level options override flow-level
config AS (
SELECT
COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay,
s.when_exhausted
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN flow_info fi ON fi.flow_slug = s.flow_slug
WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug
),
fail_or_retry_task as (
UPDATE pgflow.step_tasks as task
SET
status = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued'
ELSE 'failed'
END,
failed_at = CASE
WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now()
ELSE NULL
END,
started_at = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL
ELSE task.started_at
END,
error_message = fail_task.error_message
WHERE task.run_id = fail_task.run_id
AND task.step_slug = fail_task.step_slug
AND task.task_index = fail_task.task_index
AND task.status = 'started'
RETURNING *
),
-- Determine if task exhausted retries and get when_exhausted mode
task_status AS (
SELECT
(select status from fail_or_retry_task) AS new_task_status,
(select when_exhausted from config) AS when_exhausted_mode,
-- Task is exhausted when it's failed (no more retries)
((select status from fail_or_retry_task) = 'failed') AS is_exhausted
),
maybe_fail_step AS (
UPDATE pgflow.step_states
SET
-- Status logic:
-- - If task not exhausted (retrying): keep current status
-- - If exhausted AND when_exhausted='fail': set to 'failed'
-- - If exhausted AND when_exhausted IN ('skip', 'skip-cascade'): set to 'skipped'
status = CASE
WHEN NOT (select is_exhausted from task_status) THEN pgflow.step_states.status
WHEN (select when_exhausted_mode from task_status) = 'fail' THEN 'failed'
ELSE 'skipped' -- skip or skip-cascade
END,
failed_at = CASE
WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) = 'fail' THEN now()
ELSE NULL
END,
error_message = CASE
WHEN (select is_exhausted from task_status) THEN fail_task.error_message
ELSE NULL
END,
skip_reason = CASE
WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) IN ('skip', 'skip-cascade') THEN 'handler_failed'
ELSE pgflow.step_states.skip_reason
END,
skipped_at = CASE
WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) IN ('skip', 'skip-cascade') THEN now()
ELSE pgflow.step_states.skipped_at
END,
-- Clear remaining_tasks when skipping (required by remaining_tasks_state_consistency constraint)
remaining_tasks = CASE
WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) IN ('skip', 'skip-cascade') THEN NULL
ELSE pgflow.step_states.remaining_tasks
END
FROM fail_or_retry_task
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
),
run_update AS (
-- Update run status: only fail when when_exhausted='fail' and step was failed
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END,
-- Decrement remaining_steps only on FIRST transition to skipped
-- (not when step was already skipped and a second task fails)
-- Uses PL/pgSQL variable captured before CTE chain
remaining_steps = CASE
WHEN (select status from maybe_fail_step) = 'skipped'
AND v_prev_step_status != 'skipped'
THEN pgflow.runs.remaining_steps - 1
ELSE pgflow.runs.remaining_steps
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING pgflow.runs.status
)
SELECT
COALESCE((SELECT status = 'failed' FROM run_update), false),
COALESCE((SELECT status = 'failed' FROM maybe_fail_step), false),
COALESCE((SELECT status = 'skipped' FROM maybe_fail_step), false),
COALESCE((SELECT is_exhausted FROM task_status), false)
INTO v_run_failed, v_step_failed, v_step_skipped, v_task_exhausted;
-- Capture when_exhausted mode for later skip handling
SELECT s.when_exhausted INTO v_when_exhausted
FROM pgflow.steps s
JOIN pgflow.runs r ON r.flow_slug = s.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug;
-- Send broadcast event for step failure if the step was failed
IF v_task_exhausted AND v_step_failed THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', fail_task.run_id,
'step_slug', fail_task.step_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
concat('step:', fail_task.step_slug, ':failed'),
concat('pgflow:run:', fail_task.run_id),
false
);
END IF;
-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
IF v_task_exhausted AND v_step_skipped THEN
-- Archive all queued/started sibling task messages for this step
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;
-- Send broadcast event for step skipped
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:skipped',
'run_id', fail_task.run_id,
'step_slug', fail_task.step_slug,
'status', 'skipped',
'skip_reason', 'handler_failed',
'error_message', fail_task.error_message,
'skipped_at', now()
),
concat('step:', fail_task.step_slug, ':skipped'),
concat('pgflow:run:', fail_task.run_id),
false
);
-- For skip-cascade: cascade skip to all downstream dependents
IF v_when_exhausted = 'skip-cascade' THEN
PERFORM pgflow._cascade_force_skip_steps(fail_task.run_id, fail_task.step_slug, 'handler_failed');
ELSE
-- For plain 'skip': decrement remaining_deps on dependent steps
-- (This mirrors the pattern in cascade_resolve_conditions.sql for when_unmet='skip')
SELECT flow_slug INTO v_flow_slug_for_deps
FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id;
UPDATE pgflow.step_states AS child_state
SET remaining_deps = child_state.remaining_deps - 1,
-- If child is a map step and this skipped step is its only dependency,
-- set initial_tasks = 0 (skipped dep = empty array)
initial_tasks = CASE
WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
ELSE child_state.initial_tasks
END
FROM pgflow.deps AS dep
JOIN pgflow.steps AS child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
WHERE child_state.run_id = fail_task.run_id
AND dep.flow_slug = v_flow_slug_for_deps
AND dep.dep_slug = fail_task.step_slug
AND child_state.step_slug = dep.step_slug;
-- Evaluate conditions on newly-ready dependent steps
-- This must happen before cascade_complete_taskless_steps so that
-- skipped steps can set initial_tasks=0 for their map dependents
IF NOT pgflow.cascade_resolve_conditions(fail_task.run_id) THEN
-- Run was failed due to a condition with when_unmet='fail'
-- Archive the failed task's message before returning
PERFORM pgflow._archive_task_message(fail_task.run_id, fail_task.step_slug, fail_task.task_index);
-- Return the task row (API contract)
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = fail_task.run_id
AND pgflow.step_tasks.step_slug = fail_task.step_slug
AND pgflow.step_tasks.task_index = fail_task.task_index;
RETURN;
END IF;
-- Auto-complete taskless steps (e.g., map steps with initial_tasks=0 from skipped dep)
PERFORM pgflow.cascade_complete_taskless_steps(fail_task.run_id);
-- Start steps that became ready after condition resolution and taskless completion
PERFORM pgflow.start_ready_steps(fail_task.run_id);
END IF;
-- Try to complete the run (remaining_steps may now be 0)
PERFORM pgflow.maybe_complete_run(fail_task.run_id);
END IF;
-- Send broadcast event for run failure if the run was failed
IF v_run_failed THEN
DECLARE
-- NOTE(review): this re-declares v_flow_slug, shadowing the outer
-- variable already populated under lock earlier in this function.
-- Harmless but redundant — confirm before removing.
v_flow_slug text;
BEGIN
SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id;
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', fail_task.run_id,
'flow_slug', v_flow_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', fail_task.run_id),
false
);
END;
END IF;
-- Archive all active messages (both queued and started) when run fails
IF v_run_failed THEN
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;
END IF;
-- For queued tasks: delay the message for retry with exponential backoff
PERFORM (
WITH retry_config AS (
SELECT
COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN pgflow.runs r ON r.flow_slug = f.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug
),
queued_tasks AS (
SELECT
r.flow_slug,
st.message_id,
pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'queued'
)
SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay)
FROM queued_tasks qt
WHERE EXISTS (SELECT 1 FROM queued_tasks)
);
-- For failed tasks: archive the message
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'failed'
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;
return query select *
from pgflow.step_tasks st
where st.run_id = fail_task.run_id
and st.step_slug = fail_task.step_slug
and st.task_index = fail_task.task_index;
end;
$$
--SPLIT--
-- Modify "start_flow" function
CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
v_created_run pgflow.runs%ROWTYPE;
v_root_map_count int;
begin
-- ==========================================
-- VALIDATION: Root map array input
-- ==========================================
WITH root_maps AS (
SELECT step_slug
FROM pgflow.steps
WHERE steps.flow_slug = start_flow.flow_slug
AND steps.step_type = 'map'
AND steps.deps_count = 0
)
SELECT COUNT(*) INTO v_root_map_count FROM root_maps;
-- If we have root map steps, validate that input is an array
IF v_root_map_count > 0 THEN
-- First check for NULL (should be caught by NOT NULL constraint, but be defensive)
IF start_flow.input IS NULL THEN
RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug;
END IF;
-- Then check if it's not an array
IF jsonb_typeof(start_flow.input) != 'array' THEN
RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)',
start_flow.flow_slug, jsonb_typeof(start_flow.input);
END IF;
END IF;
-- ==========================================
-- MAIN CTE CHAIN: Create run and step states
-- ==========================================
WITH
-- ---------- Gather flow metadata ----------
flow_steps AS (
SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count
FROM pgflow.steps
WHERE steps.flow_slug = start_flow.flow_slug
),
-- ---------- Create run record ----------
created_run AS (
INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps)
VALUES (
COALESCE(start_flow.run_id, gen_random_uuid()),
start_flow.flow_slug,
start_flow.input,
(SELECT count(*) FROM flow_steps)
)
RETURNING *
),
-- ---------- Create step states ----------
-- Sets initial_tasks: known for root maps, NULL for dependent maps
created_step_states AS (
INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks)
SELECT
fs.flow_slug,
(SELECT created_run.run_id FROM created_run),
fs.step_slug,
fs.deps_count,
-- Updated logic for initial_tasks:
CASE
WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN
-- Root map: get array length from input
CASE
WHEN jsonb_typeof(start_flow.input) = 'array' THEN
jsonb_array_length(start_flow.input)
ELSE
1
END
WHEN fs.step_type = 'map' AND fs.deps_count > 0 THEN
-- Dependent map: unknown until dependencies complete
NULL
ELSE
-- Single steps: always 1 task
1
END
FROM flow_steps fs
)
SELECT * FROM created_run INTO v_created_run;
-- ==========================================
-- POST-CREATION ACTIONS
-- ==========================================
-- ---------- Broadcast run:started event ----------
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:started',
'run_id', v_created_run.run_id,
'flow_slug', v_created_run.flow_slug,
'input', v_created_run.input,
'status', 'started',
'remaining_steps', v_created_run.remaining_steps,
'started_at', v_created_run.started_at
),
'run:started',
concat('pgflow:run:', v_created_run.run_id),
false
);
-- ---------- Evaluate conditions on ready steps ----------
-- Skip steps with unmet conditions, propagate to dependents
IF NOT pgflow.cascade_resolve_conditions(v_created_run.run_id) THEN
-- Run was failed due to a condition with when_unmet='fail'
RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id;
RETURN;
END IF;
-- ---------- Complete taskless steps ----------
-- Handle empty array maps that should auto-complete
PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id);
-- ---------- Start initial steps ----------
-- Start root steps (those with no dependencies)
PERFORM pgflow.start_ready_steps(v_created_run.run_id);
-- ---------- Check for run completion ----------
-- If cascade completed all steps (zero-task flows), finalize the run
PERFORM pgflow.maybe_complete_run(v_created_run.run_id);
RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id;
end;
$$
--SPLIT--
-- Modify "start_tasks" function
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
join pgflow.runs r on r.run_id = task.run_id
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
and r.status = 'started'
and exists (
select 1
from pgflow.step_states ss
where ss.run_id = task.run_id
and ss.step_slug = task.step_slug
and ss.status = 'started'
)
),
start_tasks_update as (
update pgflow.step_tasks
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
last_worker_id = worker_id
from tasks
where step_tasks.message_id = tasks.message_id
and step_tasks.flow_slug = tasks.flow_slug
and step_tasks.status = 'queued'
),
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
-- Read output directly from step_states (already aggregated by writers)
dep_state.output as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.step_states dep_state on
dep_state.run_id = st.run_id and
dep_state.step_slug = dep.dep_slug and
dep_state.status = 'completed' -- Only include completed deps (not skipped)
),
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
count(*) as dep_count
from deps d
group by d.run_id, d.step_slug
),
timeouts as (
select
task.message_id,
task.flow_slug,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all messages
set_vt_batch as (
select pgflow.set_vt_batch(
start_tasks.flow_slug,
array_agg(t.message_id order by t.message_id),
array_agg(t.vt_delay order by t.message_id)
)
from timeouts t
)
select
st.flow_slug,
st.run_id,
st.step_slug,
-- ==========================================
-- INPUT CONSTRUCTION LOGIC
-- ==========================================
-- This nested CASE statement determines how to construct the input
-- for each task based on the step type (map vs non-map).
--
-- The fundamental difference:
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
-- - Non-map steps: Receive structured objects with named keys
-- (e.g., {"run": {...}, "dependency1": {...}})
-- ==========================================
CASE
-- -------------------- MAP STEPS --------------------
-- Map steps process arrays element-by-element.
-- Each task receives ONE element from the array at its task_index position.
WHEN step.step_type = 'map' THEN
-- Map steps get raw array elements without any wrapper object
CASE
-- ROOT MAP: Gets array from run input
-- Example: run input = [1, 2, 3]
-- task 0 gets: 1
-- task 1 gets: 2
-- task 2 gets: 3
WHEN step.deps_count = 0 THEN
-- Root map (deps_count = 0): no dependencies, reads from run input.
-- Extract the element at task_index from the run's input array.
-- Note: If run input is not an array, this will return NULL
-- and the flow will fail (validated in start_flow).
jsonb_array_element(r.input, st.task_index)
-- DEPENDENT MAP: Gets array from its single dependency
-- Example: dependency output = ["a", "b", "c"]
-- task 0 gets: "a"
-- task 1 gets: "b"
-- task 2 gets: "c"
ELSE
-- Has dependencies (should be exactly 1 for map steps).
-- Extract the element at task_index from the dependency's output array.
--
-- Why the subquery with jsonb_each?
-- - The dependency outputs a raw array: [1, 2, 3]
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
-- - We need to unwrap and get just the array value
-- - Map steps have exactly 1 dependency (enforced by add_step)
-- - So jsonb_each will return exactly 1 row
-- - We extract the 'value' which is the raw array [1, 2, 3]
-- - Then get the element at task_index from that array
(SELECT jsonb_array_element(value, st.task_index)
FROM jsonb_each(dep_out.deps_output)
LIMIT 1)
END
-- -------------------- NON-MAP STEPS --------------------
-- Regular (non-map) steps receive dependency outputs as a structured object.
-- Root steps (no dependencies) get empty object - they access flowInput via context.
-- Dependent steps get only their dependency outputs.
ELSE
-- Non-map steps get structured input with dependency keys only
-- Example for dependent step: {
-- "step1": {"output": "from_step1"},
-- "step2": {"output": "from_step2"}
-- }
-- Example for root step: {}
--
-- Note: flow_input is available separately in the returned record
-- for workers to access via context.flowInput
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id,
st.task_index as task_index,
-- flow_input: Original run input for worker context
-- Only included for root non-map steps to avoid data duplication.
-- Root map steps: flowInput IS the array, useless to include
-- Dependent steps: lazy load via ctx.flowInput when needed
CASE
WHEN step.step_type != 'map' AND step.deps_count = 0
THEN r.input
ELSE NULL
END as flow_input
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
step.flow_slug = st.flow_slug and
step.step_slug = st.step_slug
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
$$
--SPLIT--
-- Drop "add_step" function
DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer, text)
--SPLIT--
-- EctoEvolver tracking object (commented with `<label> version=1` after up/0).
-- Placeholder view whose only purpose is to mark that this synced migration
-- has been applied; it carries no data. $SCHEMA$ is a template token
-- (see the file header) — presumably substituted with the target schema name
-- before execution; verify against the sync tooling.
CREATE OR REPLACE VIEW $SCHEMA$.pgflow_version AS SELECT 1 AS placeholder