-- PgFlowDashboard Version 1 - Up Migration
-- Creates the pgflow_dashboard schema with views, materialized views, and functions
-- Create the dashboard schema
CREATE SCHEMA IF NOT EXISTS $SCHEMA$;
--SPLIT--
-- View: runs_with_progress
-- Shows runs with calculated progress percentage
CREATE OR REPLACE VIEW $SCHEMA$.runs_with_progress AS
SELECT
r.run_id,
r.flow_slug,
r.status,
r.input,
r.output,
r.started_at,
r.completed_at,
EXTRACT(EPOCH FROM (COALESCE(r.completed_at, NOW()) - r.started_at)) * 1000 AS duration_ms,
COALESCE(progress.total_steps, 0) AS total_steps,
COALESCE(progress.completed_steps, 0) AS completed_steps,
COALESCE(progress.failed_steps, 0) AS failed_steps,
CASE
WHEN progress.total_steps > 0
THEN ROUND((progress.completed_steps::numeric / progress.total_steps) * 100, 1)
ELSE 0
END AS progress_percent
FROM pgflow.runs r
LEFT JOIN LATERAL (
SELECT
COUNT(*) AS total_steps,
COUNT(*) FILTER (WHERE ss.status = 'completed') AS completed_steps,
COUNT(*) FILTER (WHERE ss.status = 'failed') AS failed_steps
FROM pgflow.step_states ss
WHERE ss.run_id = r.run_id
) progress ON true;
--SPLIT--
-- View: runs_view
-- Full runs view with flow_type for Ecto/LiveFilter queries
CREATE OR REPLACE VIEW $SCHEMA$.runs_view AS
SELECT
r.run_id,
r.flow_slug,
COALESCE(f.flow_type, 'flow') AS flow_type,
r.status,
r.input,
r.output,
r.started_at,
r.completed_at,
r.duration_ms,
r.total_steps,
r.completed_steps,
r.failed_steps,
r.progress_percent
FROM $SCHEMA$.runs_with_progress r
LEFT JOIN pgflow.flows f ON r.flow_slug = f.flow_slug;
--SPLIT--
-- View: workers_with_load
-- Shows workers with health status and load metrics
-- Includes flow_type to distinguish between flow workers and job workers
CREATE OR REPLACE VIEW $SCHEMA$.workers_with_load AS
SELECT
w.worker_id,
w.queue_name AS flow_slug,
COALESCE(f.flow_type, 'flow') AS flow_type,
w.last_heartbeat_at,
CASE
WHEN w.last_heartbeat_at > NOW() - INTERVAL '30 seconds' THEN 'healthy'
WHEN w.last_heartbeat_at > NOW() - INTERVAL '60 seconds' THEN 'stale'
ELSE 'dead'
END AS health_status,
COALESCE(tasks.active_count, 0) AS active_tasks,
COALESCE(tasks.completed_count, 0) AS completed_tasks_24h
FROM pgflow.workers w
LEFT JOIN pgflow.flows f ON f.flow_slug = w.queue_name
LEFT JOIN LATERAL (
SELECT
COUNT(*) FILTER (WHERE st.status = 'started') AS active_count,
COUNT(*) FILTER (
WHERE st.status = 'completed'
AND st.completed_at > NOW() - INTERVAL '24 hours'
) AS completed_count
FROM pgflow.step_tasks st
JOIN pgflow.step_states ss ON st.run_id = ss.run_id AND st.step_slug = ss.step_slug
JOIN pgflow.runs r ON ss.run_id = r.run_id
WHERE r.flow_slug = w.queue_name
) tasks ON true;
--SPLIT--
-- View: flow_stats
-- Shows flow-level statistics for the last 24 hours
-- Includes flow_type to distinguish between DAG workflows ('flow') and background jobs ('job')
CREATE OR REPLACE VIEW $SCHEMA$.flow_stats AS
SELECT
f.flow_slug AS flow_slug,
COALESCE(f.flow_type, 'flow') AS flow_type,
f.opt_max_attempts,
f.opt_base_delay,
f.opt_timeout,
COALESCE(stats.total_runs_24h, 0) AS total_runs_24h,
COALESCE(stats.completed_runs_24h, 0) AS completed_runs_24h,
COALESCE(stats.failed_runs_24h, 0) AS failed_runs_24h,
CASE
WHEN stats.total_runs_24h > 0
THEN ROUND((stats.completed_runs_24h::numeric / stats.total_runs_24h) * 100, 1)
ELSE 0
END AS success_rate_24h,
COALESCE(stats.avg_duration_ms, 0) AS avg_duration_ms,
COALESCE(stats.p95_duration_ms, 0) AS p95_duration_ms,
(SELECT COUNT(*) FROM pgflow.steps s WHERE s.flow_slug = f.flow_slug) AS step_count
FROM pgflow.flows f
LEFT JOIN LATERAL (
SELECT
COUNT(*) AS total_runs_24h,
COUNT(*) FILTER (WHERE r.status = 'completed') AS completed_runs_24h,
COUNT(*) FILTER (WHERE r.status = 'failed') AS failed_runs_24h,
AVG(EXTRACT(EPOCH FROM (r.completed_at - r.started_at)) * 1000)
FILTER (WHERE r.status = 'completed') AS avg_duration_ms,
PERCENTILE_CONT(0.95) WITHIN GROUP (
ORDER BY EXTRACT(EPOCH FROM (r.completed_at - r.started_at)) * 1000
) FILTER (WHERE r.status = 'completed') AS p95_duration_ms
FROM pgflow.runs r
WHERE r.flow_slug = f.flow_slug
AND r.started_at > NOW() - INTERVAL '24 hours'
) stats ON true;
--SPLIT--
-- View: step_states_with_tasks
-- Shows step states with task counts and dependencies
CREATE OR REPLACE VIEW $SCHEMA$.step_states_with_tasks AS
SELECT
ss.run_id,
ss.flow_slug,
ss.step_slug,
ss.status,
ss.remaining_deps,
ss.remaining_tasks,
ss.started_at,
ss.completed_at,
EXTRACT(EPOCH FROM (COALESCE(ss.completed_at, NOW()) - ss.started_at)) * 1000 AS duration_ms,
COALESCE(tasks.total_tasks, 0) AS total_tasks,
COALESCE(tasks.completed_tasks, 0) AS completed_tasks,
COALESCE(tasks.failed_tasks, 0) AS failed_tasks,
s.step_type,
COALESCE(deps.dep_slugs, ARRAY[]::text[]) AS deps
FROM pgflow.step_states ss
JOIN pgflow.steps s ON ss.flow_slug = s.flow_slug AND ss.step_slug = s.step_slug
LEFT JOIN LATERAL (
SELECT
COUNT(*) AS total_tasks,
COUNT(*) FILTER (WHERE st.status = 'completed') AS completed_tasks,
COUNT(*) FILTER (WHERE st.status = 'failed') AS failed_tasks
FROM pgflow.step_tasks st
WHERE st.run_id = ss.run_id AND st.step_slug = ss.step_slug
) tasks ON true
LEFT JOIN LATERAL (
SELECT ARRAY_AGG(d.dep_slug) AS dep_slugs
FROM pgflow.deps d
WHERE d.flow_slug = ss.flow_slug AND d.step_slug = ss.step_slug
) deps ON true;
--SPLIT--
-- Function: get_overview_metrics()
-- Returns real-time dashboard overview metrics
CREATE OR REPLACE FUNCTION $SCHEMA$.get_overview_metrics()
RETURNS TABLE (
active_workers bigint,
healthy_workers bigint,
stale_workers bigint,
total_runs_24h bigint,
completed_runs_24h bigint,
failed_runs_24h bigint,
running_runs bigint,
avg_duration_ms numeric,
queue_depth bigint
)
LANGUAGE sql
STABLE
AS $$
SELECT
(SELECT COUNT(*) FROM pgflow.workers
WHERE last_heartbeat_at > NOW() - INTERVAL '5 minutes') AS active_workers,
(SELECT COUNT(*) FROM pgflow.workers
WHERE last_heartbeat_at > NOW() - INTERVAL '30 seconds') AS healthy_workers,
(SELECT COUNT(*) FROM pgflow.workers
WHERE last_heartbeat_at BETWEEN NOW() - INTERVAL '60 seconds'
AND NOW() - INTERVAL '30 seconds') AS stale_workers,
(SELECT COUNT(*) FROM pgflow.runs
WHERE started_at > NOW() - INTERVAL '24 hours') AS total_runs_24h,
(SELECT COUNT(*) FROM pgflow.runs
WHERE status = 'completed'
AND started_at > NOW() - INTERVAL '24 hours') AS completed_runs_24h,
(SELECT COUNT(*) FROM pgflow.runs
WHERE status = 'failed'
AND started_at > NOW() - INTERVAL '24 hours') AS failed_runs_24h,
(SELECT COUNT(*) FROM pgflow.runs
WHERE status = 'started') AS running_runs,
(SELECT COALESCE(AVG(EXTRACT(EPOCH FROM (completed_at - started_at)) * 1000), 0)
FROM pgflow.runs
WHERE status = 'completed'
AND started_at > NOW() - INTERVAL '24 hours') AS avg_duration_ms,
(SELECT COUNT(*) FROM pgflow.step_tasks WHERE status = 'queued')::bigint AS queue_depth
$$;
--SPLIT--
-- =====================================================
-- Query Functions (15 total)
-- These provide a portable SQL API for dashboard data
-- =====================================================
-- Function: list_runs()
-- Lists runs with optional filters and cursor pagination
-- Uses composite cursor (started_at, run_id) to avoid skip/duplicate issues with identical timestamps
CREATE OR REPLACE FUNCTION $SCHEMA$.list_runs(
p_time_range_start timestamptz DEFAULT (NOW() - INTERVAL '24 hours'),
p_flow_slug text DEFAULT NULL,
p_status text DEFAULT NULL,
p_limit integer DEFAULT 50,
p_cursor_run_id uuid DEFAULT NULL,
p_flow_type text DEFAULT NULL
)
RETURNS TABLE (
run_id uuid,
flow_slug text,
flow_type text,
status text,
input jsonb,
output jsonb,
started_at timestamptz,
completed_at timestamptz,
duration_ms numeric,
total_steps bigint,
completed_steps bigint,
failed_steps bigint,
progress_percent numeric
)
LANGUAGE sql
STABLE
AS $$
SELECT
r.run_id, r.flow_slug, COALESCE(f.flow_type, 'flow') AS flow_type, r.status, r.input, r.output,
r.started_at, r.completed_at, r.duration_ms,
r.total_steps, r.completed_steps, r.failed_steps, r.progress_percent
FROM $SCHEMA$.runs_with_progress r
JOIN pgflow.flows f ON r.flow_slug = f.flow_slug
WHERE r.started_at > p_time_range_start
AND (p_flow_slug IS NULL OR r.flow_slug = p_flow_slug)
AND (p_status IS NULL OR r.status = p_status)
AND (p_flow_type IS NULL OR COALESCE(f.flow_type, 'flow') = p_flow_type)
AND (p_cursor_run_id IS NULL OR (r.started_at, r.run_id) < (
SELECT sub.started_at, sub.run_id FROM pgflow.runs sub WHERE sub.run_id = p_cursor_run_id
))
ORDER BY r.started_at DESC, r.run_id DESC
LIMIT p_limit
$$;
--SPLIT--
-- Function: count_runs()
-- Counts runs matching filters
CREATE OR REPLACE FUNCTION $SCHEMA$.count_runs(
p_time_range_start timestamptz DEFAULT (NOW() - INTERVAL '24 hours'),
p_flow_slug text DEFAULT NULL,
p_status text DEFAULT NULL,
p_flow_type text DEFAULT NULL
)
RETURNS bigint
LANGUAGE sql
STABLE
AS $$
SELECT COUNT(*)
FROM $SCHEMA$.runs_with_progress r
JOIN pgflow.flows f ON r.flow_slug = f.flow_slug
WHERE r.started_at > p_time_range_start
AND (p_flow_slug IS NULL OR r.flow_slug = p_flow_slug)
AND (p_status IS NULL OR r.status = p_status)
AND (p_flow_type IS NULL OR COALESCE(f.flow_type, 'flow') = p_flow_type)
$$;
--SPLIT--
-- Function: get_run()
-- Gets a single run by ID
CREATE OR REPLACE FUNCTION $SCHEMA$.get_run(p_run_id uuid)
RETURNS TABLE (
run_id uuid,
flow_slug text,
flow_type text,
status text,
input jsonb,
output jsonb,
started_at timestamptz,
completed_at timestamptz,
duration_ms numeric,
total_steps bigint,
completed_steps bigint,
failed_steps bigint,
progress_percent numeric
)
LANGUAGE sql
STABLE
AS $$
SELECT
r.run_id, r.flow_slug, COALESCE(f.flow_type, 'flow') AS flow_type, r.status, r.input, r.output,
r.started_at, r.completed_at, r.duration_ms,
r.total_steps, r.completed_steps, r.failed_steps, r.progress_percent
FROM $SCHEMA$.runs_with_progress r
JOIN pgflow.flows f ON r.flow_slug = f.flow_slug
WHERE r.run_id = p_run_id
$$;
--SPLIT--
-- Function: get_adjacent_run()
-- Gets the next or previous run for navigation
-- Uses base pgflow.runs table (not runs_with_progress) for efficiency
-- since only run_id and started_at are needed for navigation
CREATE OR REPLACE FUNCTION $SCHEMA$.get_adjacent_run(
p_run_id uuid,
p_direction text -- 'next' or 'prev'
)
RETURNS uuid
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
v_current_started_at timestamptz;
v_result uuid;
BEGIN
SELECT started_at INTO v_current_started_at
FROM pgflow.runs WHERE run_id = p_run_id;
IF v_current_started_at IS NULL THEN
RETURN NULL;
END IF;
IF p_direction = 'next' THEN
-- Next = older (DESC ordering means next is < current)
SELECT run_id INTO v_result
FROM pgflow.runs
WHERE started_at < v_current_started_at
ORDER BY started_at DESC
LIMIT 1;
ELSIF p_direction = 'prev' THEN
-- Prev = newer (ASC ordering means prev is > current)
SELECT run_id INTO v_result
FROM pgflow.runs
WHERE started_at > v_current_started_at
ORDER BY started_at ASC
LIMIT 1;
END IF;
RETURN v_result;
END;
$$;
--SPLIT--
-- Function: list_step_states()
-- Lists step states for a run
CREATE OR REPLACE FUNCTION $SCHEMA$.list_step_states(p_run_id uuid)
RETURNS TABLE (
run_id uuid,
flow_slug text,
step_slug text,
status text,
remaining_deps integer,
remaining_tasks integer,
started_at timestamptz,
completed_at timestamptz,
duration_ms numeric,
total_tasks bigint,
completed_tasks bigint,
failed_tasks bigint,
step_type text,
deps text[]
)
LANGUAGE sql
STABLE
AS $$
SELECT
s.run_id, s.flow_slug, s.step_slug, s.status,
s.remaining_deps, s.remaining_tasks,
s.started_at, s.completed_at, s.duration_ms,
s.total_tasks, s.completed_tasks, s.failed_tasks,
s.step_type, s.deps
FROM $SCHEMA$.step_states_with_tasks s
WHERE s.run_id = p_run_id
ORDER BY s.started_at ASC NULLS LAST
$$;
--SPLIT--
-- Function: list_step_tasks()
-- Lists tasks for a specific step
CREATE OR REPLACE FUNCTION $SCHEMA$.list_step_tasks(
p_run_id uuid,
p_step_slug text
)
RETURNS TABLE (
task_index integer,
status text,
attempts_count integer,
error_message text,
started_at timestamptz,
completed_at timestamptz,
output jsonb
)
LANGUAGE sql
STABLE
AS $$
SELECT
st.task_index, st.status, st.attempts_count, st.error_message,
st.started_at, st.completed_at, st.output
FROM pgflow.step_tasks st
WHERE st.run_id = p_run_id AND st.step_slug = p_step_slug
ORDER BY st.task_index ASC
$$;
--SPLIT--
-- Function: count_workers()
-- Counts workers matching filters
CREATE OR REPLACE FUNCTION $SCHEMA$.count_workers(
p_flow_slug text DEFAULT NULL,
p_health_status text DEFAULT NULL
)
RETURNS bigint
LANGUAGE sql
STABLE
AS $$
SELECT COUNT(*)
FROM $SCHEMA$.workers_with_load w
WHERE (p_flow_slug IS NULL OR w.flow_slug = p_flow_slug)
AND (p_health_status IS NULL OR w.health_status = p_health_status)
$$;
--SPLIT--
-- Function: list_workers()
-- Lists workers with optional filters and cursor pagination
-- Uses composite cursor (last_heartbeat_at, worker_id) to avoid skip/duplicate issues with identical timestamps
CREATE OR REPLACE FUNCTION $SCHEMA$.list_workers(
p_flow_slug text DEFAULT NULL,
p_health_status text DEFAULT NULL,
p_limit integer DEFAULT NULL,
p_cursor_worker_id uuid DEFAULT NULL
)
RETURNS TABLE (
worker_id uuid,
flow_slug text,
flow_type text,
last_heartbeat_at timestamptz,
health_status text,
active_tasks bigint,
completed_tasks_24h bigint
)
LANGUAGE sql
STABLE
AS $$
SELECT
w.worker_id, w.flow_slug, w.flow_type, w.last_heartbeat_at,
w.health_status, w.active_tasks, w.completed_tasks_24h
FROM $SCHEMA$.workers_with_load w
WHERE (p_flow_slug IS NULL OR w.flow_slug = p_flow_slug)
AND (p_health_status IS NULL OR w.health_status = p_health_status)
AND (p_cursor_worker_id IS NULL OR (w.last_heartbeat_at, w.worker_id) < (
SELECT sub.last_heartbeat_at, sub.worker_id FROM $SCHEMA$.workers_with_load sub
WHERE sub.worker_id = p_cursor_worker_id
))
ORDER BY w.last_heartbeat_at DESC, w.worker_id DESC
LIMIT p_limit
$$;
--SPLIT--
-- Function: get_worker()
-- Gets a single worker by ID
CREATE OR REPLACE FUNCTION $SCHEMA$.get_worker(p_worker_id uuid)
RETURNS TABLE (
worker_id uuid,
flow_slug text,
flow_type text,
last_heartbeat_at timestamptz,
health_status text,
active_tasks bigint,
completed_tasks_24h bigint
)
LANGUAGE sql
STABLE
AS $$
SELECT
w.worker_id, w.flow_slug, w.flow_type, w.last_heartbeat_at,
w.health_status, w.active_tasks, w.completed_tasks_24h
FROM $SCHEMA$.workers_with_load w
WHERE w.worker_id = p_worker_id
$$;
--SPLIT--
-- Function: list_worker_tasks()
-- Lists tasks processed by a worker
CREATE OR REPLACE FUNCTION $SCHEMA$.list_worker_tasks(
p_worker_id uuid,
p_limit integer DEFAULT 50
)
RETURNS TABLE (
run_id uuid,
step_slug text,
task_index integer,
status text,
attempts_count integer,
completed_at timestamptz,
error_message text
)
LANGUAGE sql
STABLE
AS $$
SELECT
st.run_id, st.step_slug, st.task_index, st.status,
st.attempts_count, st.completed_at, st.error_message
FROM pgflow.step_tasks st
WHERE st.last_worker_id = p_worker_id
ORDER BY COALESCE(st.completed_at, st.started_at, st.queued_at) DESC
LIMIT p_limit
$$;
--SPLIT--
-- Function: get_adjacent_worker()
-- Gets the next or previous worker for navigation
CREATE OR REPLACE FUNCTION $SCHEMA$.get_adjacent_worker(
p_worker_id uuid,
p_direction text -- 'next' or 'prev'
)
RETURNS uuid
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
v_current_heartbeat_at timestamptz;
v_result uuid;
BEGIN
SELECT last_heartbeat_at INTO v_current_heartbeat_at
FROM $SCHEMA$.workers_with_load WHERE worker_id = p_worker_id;
IF v_current_heartbeat_at IS NULL THEN
RETURN NULL;
END IF;
IF p_direction = 'next' THEN
-- Next = older heartbeat (DESC ordering)
SELECT worker_id INTO v_result
FROM $SCHEMA$.workers_with_load
WHERE last_heartbeat_at < v_current_heartbeat_at
ORDER BY last_heartbeat_at DESC
LIMIT 1;
ELSIF p_direction = 'prev' THEN
-- Prev = newer heartbeat (ASC ordering)
SELECT worker_id INTO v_result
FROM $SCHEMA$.workers_with_load
WHERE last_heartbeat_at > v_current_heartbeat_at
ORDER BY last_heartbeat_at ASC
LIMIT 1;
END IF;
RETURN v_result;
END;
$$;
--SPLIT--
-- Function: count_flows()
-- Counts all flows (excludes jobs)
CREATE OR REPLACE FUNCTION $SCHEMA$.count_flows()
RETURNS bigint
LANGUAGE sql
STABLE
AS $$
SELECT COUNT(*)
FROM $SCHEMA$.flow_stats f
WHERE f.flow_type = 'flow'
$$;
--SPLIT--
-- Function: list_flows()
-- Lists all flows with statistics (excludes jobs), with optional pagination
CREATE OR REPLACE FUNCTION $SCHEMA$.list_flows(
p_limit integer DEFAULT NULL,
p_cursor_slug text DEFAULT NULL
)
RETURNS TABLE (
flow_slug text,
opt_max_attempts integer,
opt_base_delay integer,
opt_timeout integer,
total_runs_24h bigint,
completed_runs_24h bigint,
failed_runs_24h bigint,
success_rate_24h numeric,
avg_duration_ms numeric,
p95_duration_ms numeric,
step_count bigint
)
LANGUAGE sql
STABLE
AS $$
SELECT
f.flow_slug, f.opt_max_attempts, f.opt_base_delay, f.opt_timeout,
f.total_runs_24h, f.completed_runs_24h, f.failed_runs_24h,
f.success_rate_24h, f.avg_duration_ms, f.p95_duration_ms, f.step_count
FROM $SCHEMA$.flow_stats f
WHERE f.flow_type = 'flow'
AND (p_cursor_slug IS NULL OR f.flow_slug > p_cursor_slug)
ORDER BY f.flow_slug
LIMIT p_limit
$$;
--SPLIT--
-- Function: count_jobs()
-- Counts all jobs
CREATE OR REPLACE FUNCTION $SCHEMA$.count_jobs()
RETURNS bigint
LANGUAGE sql
STABLE
AS $$
SELECT COUNT(*)
FROM $SCHEMA$.flow_stats f
WHERE f.flow_type = 'job'
$$;
--SPLIT--
-- Function: list_jobs()
-- Lists all jobs with statistics, with optional pagination
CREATE OR REPLACE FUNCTION $SCHEMA$.list_jobs(
p_limit integer DEFAULT NULL,
p_cursor_slug text DEFAULT NULL
)
RETURNS TABLE (
flow_slug text,
opt_max_attempts integer,
opt_base_delay integer,
opt_timeout integer,
total_runs_24h bigint,
completed_runs_24h bigint,
failed_runs_24h bigint,
success_rate_24h numeric,
avg_duration_ms numeric,
p95_duration_ms numeric
)
LANGUAGE sql
STABLE
AS $$
SELECT
f.flow_slug, f.opt_max_attempts, f.opt_base_delay, f.opt_timeout,
f.total_runs_24h, f.completed_runs_24h, f.failed_runs_24h,
f.success_rate_24h, f.avg_duration_ms, f.p95_duration_ms
FROM $SCHEMA$.flow_stats f
WHERE f.flow_type = 'job'
AND (p_cursor_slug IS NULL OR f.flow_slug > p_cursor_slug)
ORDER BY f.flow_slug
LIMIT p_limit
$$;
--SPLIT--
-- Function: get_flow()
-- Gets a single flow's statistics
CREATE OR REPLACE FUNCTION $SCHEMA$.get_flow(p_flow_slug text)
RETURNS TABLE (
flow_slug text,
opt_max_attempts integer,
opt_base_delay integer,
opt_timeout integer,
total_runs_24h bigint,
completed_runs_24h bigint,
failed_runs_24h bigint,
success_rate_24h numeric,
avg_duration_ms numeric,
p95_duration_ms numeric,
step_count bigint
)
LANGUAGE sql
STABLE
AS $$
SELECT
f.flow_slug, f.opt_max_attempts, f.opt_base_delay, f.opt_timeout,
f.total_runs_24h, f.completed_runs_24h, f.failed_runs_24h,
f.success_rate_24h, f.avg_duration_ms, f.p95_duration_ms, f.step_count
FROM $SCHEMA$.flow_stats f
WHERE f.flow_slug = p_flow_slug
AND f.flow_type = 'flow'
$$;
--SPLIT--
-- Function: get_job()
-- Gets a single job's statistics
CREATE OR REPLACE FUNCTION $SCHEMA$.get_job(p_flow_slug text)
RETURNS TABLE (
flow_slug text,
opt_max_attempts integer,
opt_base_delay integer,
opt_timeout integer,
total_runs_24h bigint,
completed_runs_24h bigint,
failed_runs_24h bigint,
success_rate_24h numeric,
avg_duration_ms numeric,
p95_duration_ms numeric
)
LANGUAGE sql
STABLE
AS $$
SELECT
f.flow_slug, f.opt_max_attempts, f.opt_base_delay, f.opt_timeout,
f.total_runs_24h, f.completed_runs_24h, f.failed_runs_24h,
f.success_rate_24h, f.avg_duration_ms, f.p95_duration_ms
FROM $SCHEMA$.flow_stats f
WHERE f.flow_slug = p_flow_slug
AND f.flow_type = 'job'
$$;
--SPLIT--
-- Function: count_crons()
-- Counts all scheduled flows/jobs
CREATE OR REPLACE FUNCTION $SCHEMA$.count_crons()
RETURNS bigint
LANGUAGE sql
STABLE
AS $$
SELECT COUNT(*)
FROM $SCHEMA$.flow_stats f
INNER JOIN cron.job cj ON cj.jobname = 'pgflow:' || f.flow_slug
$$;
--SPLIT--
-- Function: list_crons()
-- Lists all scheduled flows/jobs (detected by presence in cron.job table), with optional pagination
CREATE OR REPLACE FUNCTION $SCHEMA$.list_crons(
p_limit integer DEFAULT NULL,
p_cursor_slug text DEFAULT NULL
)
RETURNS TABLE (
flow_slug text,
flow_type text,
cron_expression text,
is_active boolean,
opt_max_attempts integer,
opt_base_delay integer,
opt_timeout integer,
total_runs_24h bigint,
completed_runs_24h bigint,
failed_runs_24h bigint,
success_rate_24h numeric,
avg_duration_ms numeric,
p95_duration_ms numeric,
last_run_at timestamptz,
last_run_status text
)
LANGUAGE sql
STABLE
AS $$
SELECT
f.flow_slug,
f.flow_type,
cj.schedule AS cron_expression,
COALESCE(cj.active, true) AS is_active,
f.opt_max_attempts,
f.opt_base_delay,
f.opt_timeout,
f.total_runs_24h,
f.completed_runs_24h,
f.failed_runs_24h,
f.success_rate_24h,
f.avg_duration_ms,
f.p95_duration_ms,
lr.last_run_at,
lr.last_run_status
FROM $SCHEMA$.flow_stats f
INNER JOIN cron.job cj ON cj.jobname = 'pgflow:' || f.flow_slug
LEFT JOIN LATERAL (
SELECT r.completed_at AS last_run_at, r.status AS last_run_status
FROM pgflow.runs r
WHERE r.flow_slug = f.flow_slug
ORDER BY r.started_at DESC
LIMIT 1
) lr ON true
WHERE (p_cursor_slug IS NULL OR f.flow_slug > p_cursor_slug)
ORDER BY f.flow_slug
LIMIT p_limit
$$;
--SPLIT--
-- Function: get_cron()
-- Gets a single scheduled flow/job's statistics and schedule info
CREATE OR REPLACE FUNCTION $SCHEMA$.get_cron(p_flow_slug text)
RETURNS TABLE (
flow_slug text,
flow_type text,
cron_expression text,
is_active boolean,
opt_max_attempts integer,
opt_base_delay integer,
opt_timeout integer,
total_runs_24h bigint,
completed_runs_24h bigint,
failed_runs_24h bigint,
success_rate_24h numeric,
avg_duration_ms numeric,
p95_duration_ms numeric,
last_run_at timestamptz,
last_run_status text
)
LANGUAGE sql
STABLE
AS $$
SELECT
f.flow_slug,
f.flow_type,
cj.schedule AS cron_expression,
COALESCE(cj.active, true) AS is_active,
f.opt_max_attempts,
f.opt_base_delay,
f.opt_timeout,
f.total_runs_24h,
f.completed_runs_24h,
f.failed_runs_24h,
f.success_rate_24h,
f.avg_duration_ms,
f.p95_duration_ms,
lr.last_run_at,
lr.last_run_status
FROM $SCHEMA$.flow_stats f
INNER JOIN cron.job cj ON cj.jobname = 'pgflow:' || f.flow_slug
LEFT JOIN LATERAL (
SELECT r.completed_at AS last_run_at, r.status AS last_run_status
FROM pgflow.runs r
WHERE r.flow_slug = f.flow_slug
ORDER BY r.started_at DESC
LIMIT 1
) lr ON true
WHERE f.flow_slug = p_flow_slug
$$;
--SPLIT--
-- Function: list_flow_steps()
-- Lists flow steps with dependencies (for dependency graph)
CREATE OR REPLACE FUNCTION $SCHEMA$.list_flow_steps(p_flow_slug text)
RETURNS TABLE (
step_slug text,
step_type text,
opt_max_attempts integer,
opt_base_delay integer,
opt_timeout integer,
deps text[]
)
LANGUAGE sql
STABLE
AS $$
SELECT
s.step_slug,
s.step_type,
s.opt_max_attempts,
s.opt_base_delay,
s.opt_timeout,
COALESCE(d.dep_slugs, ARRAY[]::text[]) AS deps
FROM pgflow.steps s
LEFT JOIN LATERAL (
SELECT ARRAY_AGG(dep.dep_slug) AS dep_slugs
FROM pgflow.deps dep
WHERE dep.flow_slug = s.flow_slug AND dep.step_slug = s.step_slug
) d ON true
WHERE s.flow_slug = p_flow_slug
ORDER BY s.step_slug
$$;
--SPLIT--
-- Function: get_run_history_grid()
-- Gets run history data for GitHub-style activity grid
CREATE OR REPLACE FUNCTION $SCHEMA$.get_run_history_grid(
p_flow_slug text,
p_limit integer DEFAULT 50
)
RETURNS TABLE (
run_id uuid,
started_at timestamptz,
step_slug text,
status text,
duration_ms numeric
)
LANGUAGE sql
STABLE
AS $$
WITH recent_runs AS (
SELECT r.run_id, r.started_at
FROM pgflow.runs r
WHERE r.flow_slug = p_flow_slug
ORDER BY r.started_at DESC
LIMIT p_limit
),
step_results AS (
SELECT
rr.run_id,
rr.started_at,
ss.step_slug,
ss.status,
ss.duration_ms
FROM recent_runs rr
LEFT JOIN $SCHEMA$.step_states_with_tasks ss ON rr.run_id = ss.run_id
)
SELECT sr.run_id, sr.started_at, sr.step_slug, sr.status, sr.duration_ms
FROM step_results sr
ORDER BY sr.started_at DESC, sr.step_slug
$$;
--SPLIT--
-- Version tracking comment
-- Used to track which migration version has been applied
COMMENT ON VIEW $SCHEMA$.runs_with_progress IS 'PgFlowDashboard version=1';