-- priv/pgflow_helpers/sql/versions/v01/v01_up.sql

-- Tracking view for ecto_evolver version management. The view body is a
-- placeholder; the installed version number is carried by the COMMENT ON VIEW
-- statement at the end of this migration.
CREATE OR REPLACE VIEW $SCHEMA$.extensions_version AS
SELECT 1 AS placeholder;

--SPLIT--

-- Compatibility shim: pgflow core's plpgsql functions call `realtime.send(...)`
-- at runtime (Supabase Realtime broadcast). On plain Postgres that schema
-- doesn't exist, causing `start_flow` / `complete_task` to raise. Install a
-- no-op shim so those paths succeed — the Elixir binding publishes via
-- Phoenix.PubSub instead. On Supabase where `realtime.send` is already
-- defined, we skip installation and leave the real implementation alone.
DO $$
BEGIN
  -- Only act when no realtime.send with this exact signature exists, so a
  -- genuine Supabase implementation is never shadowed or replaced.
  IF to_regprocedure('realtime.send(jsonb,text,text,boolean)') IS NULL THEN
    -- IF NOT EXISTS replaces the previous manual to_regnamespace() check:
    -- idempotent in one statement and free of the check-then-create race.
    CREATE SCHEMA IF NOT EXISTS realtime;

    -- No-op stand-in matching the Supabase Realtime signature. The inner
    -- dollar-quote tag ($body$) must differ from the DO block's $$ tag.
    CREATE FUNCTION realtime.send(
      payload jsonb, event text, topic text, private boolean DEFAULT false
    ) RETURNS void AS $body$ SELECT NULL::void; $body$ LANGUAGE sql;
  END IF;
END $$;

--SPLIT--

-- Drop any pre-existing functions that may have different param names.
-- CREATE OR REPLACE FUNCTION cannot rename an existing input parameter, so
-- each helper is dropped by signature here and re-created later in this file.
DROP FUNCTION IF EXISTS $SCHEMA$.get_flow_input(uuid);

--SPLIT--

DROP FUNCTION IF EXISTS $SCHEMA$.flow_exists(text);

--SPLIT--

DROP FUNCTION IF EXISTS $SCHEMA$.get_step_output(uuid, text);

--SPLIT--

DROP FUNCTION IF EXISTS $SCHEMA$.register_worker(uuid, text, text);

--SPLIT--

DROP FUNCTION IF EXISTS $SCHEMA$.mark_worker_stopped(uuid);

--SPLIT--

DROP FUNCTION IF EXISTS $SCHEMA$.recover_stalled_tasks(double precision);

--SPLIT--

-- READ: Get flow run input data.
-- Returns the run's `input` jsonb; yields NULL for an unknown run_id
-- (a scalar SQL function returns NULL when its query produces no rows).
CREATE FUNCTION $SCHEMA$.get_flow_input(p_run_id uuid)
RETURNS jsonb
LANGUAGE sql STABLE
SECURITY INVOKER
SET search_path = ''
AS $$
  SELECT r.input
  FROM pgflow.runs AS r
  WHERE r.run_id = p_run_id;
$$;

--SPLIT--

-- READ: Check if flow exists.
-- True when a row with the given slug is present in pgflow.flows.
CREATE FUNCTION $SCHEMA$.flow_exists(p_flow_slug text)
RETURNS boolean
LANGUAGE sql STABLE
SECURITY INVOKER
SET search_path = ''
AS $$
  SELECT EXISTS (
    SELECT 1
    FROM pgflow.flows AS f
    WHERE f.flow_slug = p_flow_slug
  );
$$;

--SPLIT--

-- READ: Get step output.
-- Returns the `output` jsonb of one step state within a run; NULL when no
-- matching (run_id, step_slug) row exists.
CREATE FUNCTION $SCHEMA$.get_step_output(p_run_id uuid, p_step_slug text)
RETURNS jsonb
LANGUAGE sql STABLE
SECURITY INVOKER
SET search_path = ''
AS $$
  SELECT ss.output
  FROM pgflow.step_states AS ss
  WHERE ss.run_id = p_run_id
    AND ss.step_slug = p_step_slug;
$$;

--SPLIT--

-- WRITE: Register or heartbeat a worker.
-- First call for a worker_id inserts the row; every later call is an upsert
-- that only refreshes last_heartbeat_at (started_at keeps its original value).
CREATE FUNCTION $SCHEMA$.register_worker(
  p_worker_id uuid, p_queue_name text, p_function_name text
)
RETURNS void
LANGUAGE sql VOLATILE
SECURITY INVOKER
SET search_path = ''
AS $$
  INSERT INTO pgflow.workers
    (worker_id, queue_name, function_name, started_at, last_heartbeat_at)
  VALUES
    (p_worker_id, p_queue_name, p_function_name, NOW(), NOW())
  ON CONFLICT (worker_id)
  DO UPDATE SET last_heartbeat_at = NOW();
$$;

--SPLIT--

-- WRITE: Mark worker as stopped.
-- clock_timestamp() records the actual wall-clock moment of the call (unlike
-- NOW(), which is frozen at transaction start).
CREATE FUNCTION $SCHEMA$.mark_worker_stopped(p_worker_id uuid)
RETURNS void
LANGUAGE sql VOLATILE
SECURITY INVOKER
SET search_path = ''
AS $$
  UPDATE pgflow.workers
  SET stopped_at = clock_timestamp()
  WHERE worker_id = p_worker_id;
$$;

--SPLIT--

-- WRITE: Recover stalled tasks + reset pgmq visibility.
-- A task is "stalled" when it is still 'started' but started_at is older than
-- p_stale_threshold seconds (its worker presumably died). Each one is flipped
-- back to 'queued', and its pgmq message visibility is reset so another worker
-- can pick it up. Returns one row: (tasks requeued, per-queue vt batches).
CREATE FUNCTION $SCHEMA$.recover_stalled_tasks(p_stale_threshold double precision)
RETURNS TABLE(recovered_count bigint, vt_batches bigint)
LANGUAGE sql VOLATILE
SECURITY INVOKER
SET search_path = ''
AS $$
  -- Requeue stalled tasks, capturing which queue (flow_slug) and message
  -- each one belongs to for the visibility reset below.
  WITH stalled AS (
    UPDATE pgflow.step_tasks
    SET status = 'queued', started_at = NULL, last_worker_id = NULL
    WHERE status = 'started'
      AND started_at < NOW() - make_interval(secs => p_stale_threshold)
    RETURNING flow_slug, message_id
  ),
  -- One set_vt_batch call per queue, passing a vt of 0 for every message
  -- (presumably "visible immediately" — see pgflow.set_vt_batch). MATERIALIZED
  -- pins this side-effecting CTE to a single evaluation.
  vt_reset AS MATERIALIZED (
    SELECT pgflow.set_vt_batch(
      s.flow_slug,
      array_agg(s.message_id),
      array_agg(0::integer)
    )
    FROM stalled s
    WHERE s.message_id IS NOT NULL
    GROUP BY s.flow_slug
  )
  -- Referencing vt_reset here is what forces it to execute.
  SELECT
    (SELECT count(*) FROM stalled)::bigint AS recovered_count,
    (SELECT count(*) FROM vt_reset)::bigint AS vt_batches;
$$;

--SPLIT--

-- SCHEMA EXTENSION: flow_type column on pgflow.flows
-- This column is an Elixir-specific extension NOT present in the upstream
-- TypeScript pgflow project. It distinguishes background jobs (single-step
-- flows) from multi-step DAG workflows in the dashboard.
-- Default 'flow' ensures backward compatibility with existing flow records.
ALTER TABLE pgflow.flows
  ADD COLUMN IF NOT EXISTS flow_type text NOT NULL DEFAULT 'flow';

--SPLIT--

-- Drop any existing flow_type constraint (may have been created with fewer values).
-- ADD CONSTRAINT has no IF NOT EXISTS clause, so drop-then-add keeps this
-- migration re-runnable.
ALTER TABLE pgflow.flows
  DROP CONSTRAINT IF EXISTS flow_type_is_valid;

--SPLIT--

-- Add constraint allowing flow types (cron scheduling is now an option on flow/job, not a separate type)
ALTER TABLE pgflow.flows
  ADD CONSTRAINT flow_type_is_valid CHECK (flow_type IN ('flow', 'job'));

--SPLIT--

-- Drop any pre-existing prune function (interval only signature).
-- Two historical signatures may exist; both are dropped before the current
-- (interval, text[]) version is created below.
DROP FUNCTION IF EXISTS $SCHEMA$.prune_data_older_than(interval);

--SPLIT--

-- Drop any pre-existing prune function (interval + text[] signature)
DROP FUNCTION IF EXISTS $SCHEMA$.prune_data_older_than(interval, text[]);

--SPLIT--

-- MAINTENANCE: Prune old run data
-- Deletes completed/failed runs and all associated data older than retention_interval
-- If p_flow_slugs is NULL or empty, prunes all flows (global cleanup)
-- If p_flow_slugs is provided, only prunes runs for those specific flows
-- Returns a single row of per-table delete counts.
-- Deletion order is load-bearing: pgmq messages are deleted while step_tasks
-- still hold their message_ids, then children (step_tasks, step_states)
-- before the parent runs rows.
CREATE FUNCTION $SCHEMA$.prune_data_older_than(
  p_retention_interval INTERVAL,
  p_flow_slugs TEXT[] DEFAULT NULL
) RETURNS TABLE(
  deleted_runs bigint,
  deleted_step_states bigint,
  deleted_step_tasks bigint,
  deleted_workers bigint
)
LANGUAGE plpgsql VOLATILE
SECURITY INVOKER
SET search_path = ''
AS $$
DECLARE
  cutoff_timestamp TIMESTAMPTZ := NOW() - p_retention_interval;
  v_deleted_runs bigint := 0;
  v_deleted_step_states bigint := 0;
  v_deleted_step_tasks bigint := 0;
  v_deleted_workers bigint := 0;
  flow_record RECORD;
  archive_table TEXT;
  dynamic_sql TEXT;
  -- True only for a non-NULL, non-empty slug list; NULL/empty means global.
  filter_flows BOOLEAN := (p_flow_slugs IS NOT NULL AND array_length(p_flow_slugs, 1) > 0);
BEGIN
  -- Delete old worker records (always global - not flow-specific)
  DELETE FROM pgflow.workers
  WHERE last_heartbeat_at < cutoff_timestamp;
  GET DIAGNOSTICS v_deleted_workers = ROW_COUNT;

  -- Delete PGMQ messages from active queues BEFORE deleting step_tasks
  -- (step_tasks.message_id is the only link from a task to its queue message).
  FOR flow_record IN
    SELECT
      r.flow_slug,
      ARRAY_AGG(st.message_id) FILTER (WHERE st.message_id IS NOT NULL) as message_ids
    FROM pgflow.runs r
    JOIN pgflow.step_tasks st ON st.run_id = r.run_id
    WHERE (
      (r.completed_at IS NOT NULL AND r.completed_at < cutoff_timestamp) OR
      (r.failed_at IS NOT NULL AND r.failed_at < cutoff_timestamp)
    )
    AND (NOT filter_flows OR r.flow_slug = ANY(p_flow_slugs))
    GROUP BY r.flow_slug
  LOOP
    IF flow_record.message_ids IS NOT NULL AND array_length(flow_record.message_ids, 1) > 0 THEN
      -- One batched delete per queue; the queue name is the flow slug.
      PERFORM pgmq.delete(flow_record.flow_slug, flow_record.message_ids);
    END IF;
  END LOOP;

  -- Delete step_tasks for old runs
  DELETE FROM pgflow.step_tasks
  WHERE run_id IN (
    SELECT run_id FROM pgflow.runs
    WHERE ((completed_at IS NOT NULL AND completed_at < cutoff_timestamp)
       OR (failed_at IS NOT NULL AND failed_at < cutoff_timestamp))
    AND (NOT filter_flows OR flow_slug = ANY(p_flow_slugs))
  );
  GET DIAGNOSTICS v_deleted_step_tasks = ROW_COUNT;

  -- Delete step_states for old runs
  DELETE FROM pgflow.step_states
  WHERE run_id IN (
    SELECT run_id FROM pgflow.runs
    WHERE ((completed_at IS NOT NULL AND completed_at < cutoff_timestamp)
       OR (failed_at IS NOT NULL AND failed_at < cutoff_timestamp))
    AND (NOT filter_flows OR flow_slug = ANY(p_flow_slugs))
  );
  GET DIAGNOSTICS v_deleted_step_states = ROW_COUNT;

  -- Delete old runs (parents; children were removed above)
  DELETE FROM pgflow.runs
  WHERE ((completed_at IS NOT NULL AND completed_at < cutoff_timestamp)
     OR (failed_at IS NOT NULL AND failed_at < cutoff_timestamp))
  AND (NOT filter_flows OR flow_slug = ANY(p_flow_slugs));
  GET DIAGNOSTICS v_deleted_runs = ROW_COUNT;

  -- Prune archived PGMQ messages (only for specified flows, or all if no filter)
  FOR flow_record IN
    SELECT DISTINCT flow_slug FROM pgflow.flows
    WHERE (NOT filter_flows OR flow_slug = ANY(p_flow_slugs))
  LOOP
    -- 'a' selects the archive table name for the queue (pgmq naming helper).
    archive_table := pgmq.format_table_name(flow_record.flow_slug, 'a');
    IF EXISTS (
      SELECT 1 FROM information_schema.tables
      WHERE table_schema = 'pgmq' AND table_name = archive_table
    ) THEN
      -- %I safely quotes the identifier; the cutoff is bound via USING,
      -- so no data values are interpolated into the SQL string.
      dynamic_sql := format('DELETE FROM pgmq.%I WHERE archived_at < $1', archive_table);
      EXECUTE dynamic_sql USING cutoff_timestamp;
    END IF;
  END LOOP;

  RETURN QUERY SELECT v_deleted_runs, v_deleted_step_states, v_deleted_step_tasks, v_deleted_workers;
END;
$$;

--SPLIT--

-- PERFORMANCE: Indexes for dashboard time-range queries
-- These indexes optimize the dashboard's filtering by started_at and per-flow queries
CREATE INDEX IF NOT EXISTS idx_runs_started_at ON pgflow.runs(started_at DESC);

--SPLIT--

-- Composite index for per-flow queries (used by crons/jobs show pages with lateral joins)
CREATE INDEX IF NOT EXISTS idx_runs_flow_started ON pgflow.runs(flow_slug, started_at DESC);

--SPLIT--

-- Index for worker health queries (ORDER BY last_heartbeat_at in dashboard, used by health status determination)
CREATE INDEX IF NOT EXISTS idx_workers_heartbeat ON pgflow.workers(last_heartbeat_at DESC);

--SPLIT--

-- Version tracking — presumably read back by ecto_evolver (see the view
-- created at the top of this file); keep the 'PgFlow version=N' format
-- in sync when adding new migration versions.
COMMENT ON VIEW $SCHEMA$.extensions_version IS 'PgFlow version=1';