------------------------------------------------------------
-- Schema, tables, records, privileges, indexes, etc
------------------------------------------------------------
-- The `pgmq` schema only has to be created by hand for a plain SQL install.
-- When installed as an extension, postgres creates the schema automatically
-- because it is declared in the extension control file.
DO
$do$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgmq') THEN
        CREATE SCHEMA IF NOT EXISTS pgmq;
    END IF;
END
$do$;
-- Registry of queues: one row per queue name, with its creation-time metadata
CREATE TABLE IF NOT EXISTS pgmq.meta (
    queue_name     varchar     NOT NULL UNIQUE,
    is_partitioned boolean     NOT NULL,
    is_unlogged    boolean     NOT NULL,
    created_at     timestamptz NOT NULL DEFAULT now()
);
-- Per-queue throttling state for insert notifications
CREATE UNLOGGED TABLE IF NOT EXISTS pgmq.notify_insert_throttle (
    -- Queue name (without 'q_' prefix); the row is removed together with its pgmq.meta row
    queue_name varchar NOT NULL UNIQUE
        CONSTRAINT notify_insert_throttle_meta_queue_name_fk
            REFERENCES pgmq.meta (queue_name)
            ON DELETE CASCADE,
    -- Min milliseconds between notifications (0 = no throttling)
    throttle_interval_ms integer NOT NULL DEFAULT 0,
    -- Timestamp of last sent notification; starts at the Unix epoch
    last_notified_at timestamptz NOT NULL DEFAULT to_timestamp(0)
);

-- Partial index restricted to the queues that have throttling enabled
CREATE INDEX IF NOT EXISTS idx_notify_throttle_active
    ON pgmq.notify_insert_throttle (queue_name, last_notified_at)
    WHERE throttle_interval_ms > 0;
-- Bindings between routing-key patterns and queues.
-- Each (pattern, queue_name) pair can be stored only once, and a binding is
-- deleted automatically when its queue's row is deleted from pgmq.meta.
CREATE TABLE IF NOT EXISTS pgmq.topic_bindings
(
pattern text NOT NULL, -- Wildcard pattern for routing key matching (* = one segment, # = zero or more segments)
queue_name text NOT NULL -- Name of the queue that receives messages when pattern matches
CONSTRAINT topic_bindings_meta_queue_name_fk
REFERENCES pgmq.meta (queue_name)
ON DELETE CASCADE,
bound_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, -- Timestamp when the binding was created
compiled_regex text GENERATED ALWAYS AS (
-- Pre-compile the pattern to regex for faster matching
-- This avoids runtime compilation on every send_topic call
-- Steps: backslash-escape regex metacharacters found in the pattern, then
-- rewrite '*' to '[^.]+' and '#' to '.*', and anchor the result with '^' and '$'
'^' ||
replace(
replace(
regexp_replace(pattern, '([.+?{}()|\[\]\\^$])', '\\\1', 'g'),
'*', '[^.]+'
),
'#', '.*'
) || '$'
) STORED, -- Computed column: stores the compiled regex pattern
CONSTRAINT topic_bindings_unique_pattern_queue UNIQUE (pattern, queue_name)
);
-- Create covering index for better performance when scanning patterns
-- Includes queue_name and compiled_regex to allow index-only scans (no table access needed)
CREATE INDEX IF NOT EXISTS idx_topic_bindings_covering ON pgmq.topic_bindings (pattern) INCLUDE (queue_name, compiled_regex);
-- When pgmq is installed as an extension, register its configuration tables
-- (pgmq.meta, pgmq.notify_insert_throttle, pgmq.topic_bindings) so that
-- `pg_dump` includes their contents
DO
$do$
DECLARE
    config_table text;
BEGIN
    IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgmq') THEN
        FOREACH config_table IN ARRAY ARRAY[
            'pgmq.meta',
            'pgmq.notify_insert_throttle',
            'pgmq.topic_bindings'
        ]
        LOOP
            -- An empty filter condition means every row of the table is dumped
            PERFORM pg_catalog.pg_extension_config_dump(config_table::regclass, '');
        END LOOP;
    END IF;
END
$do$;
-- Give pg_monitor read access to everything in the pgmq schema:
-- first the objects that exist now ...
GRANT USAGE
    ON SCHEMA pgmq
    TO pg_monitor;
GRANT SELECT
    ON ALL TABLES IN SCHEMA pgmq
    TO pg_monitor;
GRANT SELECT
    ON ALL SEQUENCES IN SCHEMA pgmq
    TO pg_monitor;
-- ... then the tables and sequences created in the schema later on
ALTER DEFAULT PRIVILEGES IN SCHEMA pgmq
    GRANT SELECT ON TABLES TO pg_monitor;
ALTER DEFAULT PRIVILEGES IN SCHEMA pgmq
    GRANT SELECT ON SEQUENCES TO pg_monitor;
-- Shape of a single queue message; most pgmq functions that return messages
-- return rows of this type
CREATE TYPE pgmq.message_record AS (
    msg_id       bigint,
    read_ct      integer,
    enqueued_at  timestamptz,
    last_read_at timestamptz,
    vt           timestamptz,
    message      jsonb,
    headers      jsonb
);
-- Shape of a queue's metadata row, mirroring the columns of pgmq.meta
CREATE TYPE pgmq.queue_record AS (
    queue_name     varchar,
    is_partitioned boolean,
    is_unlogged    boolean,
    created_at     timestamptz
);
------------------------------------------------------------
-- Functions
------------------------------------------------------------
-- Prevents race conditions between concurrent operations on the same queue
-- by taking an advisory lock keyed on the queue name.
-- The lock is transaction-level, so it is held until the transaction ends;
-- a race condition would still exist if it were released before commit.
CREATE FUNCTION pgmq.acquire_queue_lock(queue_name TEXT)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    -- 32-bit hash of the prefixed queue name, used as the advisory lock key
    lock_key integer := hashtext('pgmq.queue_' || queue_name);
BEGIN
    PERFORM pg_advisory_xact_lock(lock_key);
END;
$$;
-- read_grouped_rr (read_grouped_round_robin)
-- Reads up to `qty` visible messages from `queue_name`, preserving FIFO order
-- within each group and interleaving across groups (layered round-robin):
-- the first available message of every eligible group, then the second, etc.
--   queue_name: queue to read from (resolved to its table by pgmq.format_table_name)
--   vt:         visibility timeout in seconds applied to every returned message
--   qty:        maximum number of messages to return
-- A message's group is headers->>'x-pgmq-group'; messages without that header
-- share the group '_default_fifo_group'.
-- A group is eligible only when its oldest message (lowest msg_id, visible or
-- not) is currently visible and this transaction obtains the group's
-- transaction-level advisory lock.
-- Returned rows have read_ct incremented and last_read_at / vt updated, and
-- are ordered by their round-robin selection order.
-- NOTE(review): the advisory lock key is hashed from the group key alone, so
-- equally named groups in different queues appear to share one lock - confirm
-- that this is intended.
CREATE FUNCTION pgmq.read_grouped_rr(
queue_name TEXT,
vt INTEGER,
qty INTEGER
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
WITH fifo_groups AS (
-- Determine the absolute head (oldest) message id per FIFO group, regardless of visibility
SELECT
COALESCE(headers->>'x-pgmq-group', '_default_fifo_group') AS fifo_key,
MIN(msg_id) AS head_msg_id
FROM pgmq.%1$I
GROUP BY COALESCE(headers->>'x-pgmq-group', '_default_fifo_group')
),
eligible_groups AS (
-- Only groups whose head message is currently visible
-- Acquire a transaction-level advisory lock per group to prevent concurrent selection
SELECT
g.fifo_key,
g.head_msg_id,
ROW_NUMBER() OVER (ORDER BY g.head_msg_id) AS group_priority
FROM fifo_groups g
JOIN pgmq.%2$I h ON h.msg_id = g.head_msg_id
WHERE h.vt <= clock_timestamp()
AND pg_try_advisory_xact_lock(pg_catalog.hashtextextended(g.fifo_key, 0))
),
available_messages AS (
-- All currently visible messages starting at the head for each eligible group
SELECT
m.msg_id,
eg.group_priority,
ROW_NUMBER() OVER (
PARTITION BY eg.fifo_key
ORDER BY m.msg_id
) AS msg_rank_in_group
FROM pgmq.%3$I m
JOIN eligible_groups eg
ON COALESCE(m.headers->>'x-pgmq-group', '_default_fifo_group') = eg.fifo_key
WHERE m.vt <= clock_timestamp()
AND m.msg_id >= eg.head_msg_id
),
ordered_messages AS (
-- Layered round-robin: take rank 1 of all groups by group_priority, then rank 2, etc.
-- Assign selection order before locking
SELECT msg_id, ROW_NUMBER() OVER (ORDER BY msg_rank_in_group, group_priority) as selection_order
FROM available_messages
),
selected_messages AS (
-- Lock the messages in the correct order, preserving selection_order
SELECT om.msg_id, om.selection_order
FROM ordered_messages om
JOIN pgmq.%4$I m ON m.msg_id = om.msg_id
WHERE om.selection_order <= $1
ORDER BY om.selection_order
FOR UPDATE OF m SKIP LOCKED
),
updated_messages AS (
UPDATE pgmq.%5$I m
SET
vt = clock_timestamp() + %6$L,
read_ct = read_ct + 1,
last_read_at = clock_timestamp()
FROM selected_messages sm
WHERE m.msg_id = sm.msg_id
AND m.vt <= clock_timestamp() -- final guard to avoid duplicate reads under races
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.last_read_at, m.vt, m.message, m.headers, sm.selection_order
)
SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM updated_messages
ORDER BY selection_order;
$QUERY$,
qtable, qtable, qtable, qtable, qtable, make_interval(secs => vt)
);
RETURN QUERY EXECUTE sql USING qty;
END;
$$ LANGUAGE plpgsql;
-- read_grouped_rr_with_poll
-- Reads messages using round-robin layering across groups, with polling support.
-- Calls pgmq.read_grouped_rr repeatedly until it returns at least one message
-- or the polling deadline passes.
--   queue_name:       queue to read from
--   vt:               visibility timeout in seconds for the returned messages
--   qty:              maximum number of messages to return
--   max_poll_seconds: how long to keep polling before giving up (default 5)
--   poll_interval_ms: sleep between two attempts, in milliseconds (default 100)
-- Returns the messages of the first non-empty read, or no rows on timeout.
-- The deadline is checked before every attempt, so no read is attempted once
-- it has already passed.
CREATE FUNCTION pgmq.read_grouped_rr_with_poll(
    queue_name TEXT,
    vt INTEGER,
    qty INTEGER,
    max_poll_seconds INTEGER DEFAULT 5,
    poll_interval_ms INTEGER DEFAULT 100
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
    r pgmq.message_record;
    -- Must be timestamptz: a plain TIMESTAMP would be converted to local time
    -- and back on every comparison with clock_timestamp(), which is ambiguous
    -- around DST transitions and breaks if the session TimeZone changes.
    stop_at TIMESTAMP WITH TIME ZONE;
BEGIN
    stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);
    LOOP
        IF clock_timestamp() >= stop_at THEN
            RETURN;
        END IF;

        FOR r IN
            SELECT * FROM pgmq.read_grouped_rr(queue_name, vt, qty)
        LOOP
            RETURN NEXT r;
        END LOOP;

        -- FOUND is true when the FOR loop above iterated at least once
        IF FOUND THEN
            RETURN;
        ELSE
            PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
-- Builds the lower-cased table name `<prefix>_<queue_name>` for a queue.
-- Raises an exception when the queue name contains `$`, `;`, `--` or a
-- single quote.
CREATE FUNCTION pgmq.format_table_name(queue_name text, prefix text)
RETURNS TEXT
LANGUAGE plpgsql
AS $$
DECLARE
    -- Regular expression matching any of the rejected character sequences
    forbidden CONSTANT text := '\$|;|--|''';
BEGIN
    IF queue_name ~ forbidden THEN
        RAISE EXCEPTION 'queue name contains invalid characters: $, ;, --, or \''';
    END IF;

    RETURN lower(prefix || '_' || queue_name);
END;
$$;
-- read
-- Reads up to `qty` visible messages from a queue in msg_id order and hides
-- them for `vt` seconds by pushing their visibility timeout into the future.
-- When `conditional` is not the empty object, only messages whose body
-- contains it (jsonb @>) are considered.
-- Rows locked by other transactions are skipped instead of waited for.
CREATE FUNCTION pgmq.read(
    queue_name TEXT,
    vt INTEGER,
    qty INTEGER,
    conditional JSONB DEFAULT '{}'
)
RETURNS SETOF pgmq.message_record
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    hide_for INTERVAL := make_interval(secs => vt);
    stmt TEXT;
BEGIN
    -- %1$I = queue table, %2$L = conditional filter, %3$L = visibility interval;
    -- $1 is bound to qty at execution time
    stmt := format(
        $QUERY$
        WITH cte AS
        (
            SELECT msg_id
            FROM pgmq.%1$I
            WHERE vt <= clock_timestamp() AND CASE
                WHEN %2$L != '{}'::jsonb THEN (message @> %2$L)::integer
                ELSE 1
            END = 1
            ORDER BY msg_id ASC
            LIMIT $1
            FOR UPDATE SKIP LOCKED
        )
        UPDATE pgmq.%1$I m
        SET
            last_read_at = clock_timestamp(),
            vt = clock_timestamp() + %3$L,
            read_ct = read_ct + 1
        FROM cte
        WHERE m.msg_id = cte.msg_id
        RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.last_read_at, m.vt, m.message, m.headers;
        $QUERY$,
        queue_tbl, conditional, hide_for
    );

    RETURN QUERY EXECUTE stmt USING qty;
END;
$$;
-- read_grouped
-- Reads up to `qty` visible messages with AWS SQS FIFO-style batch retrieval:
-- groups are prioritised by their oldest visible message, and the batch is
-- filled from the highest-priority group first, so as many messages as
-- possible come from the same message group.
--   queue_name: queue to read from (resolved to its table by pgmq.format_table_name)
--   vt:         visibility timeout in seconds applied to every returned message
--   qty:        maximum number of messages to return
-- A message's group is headers->>'x-pgmq-group'; messages without that header
-- share the group '_default_fifo_group'.
-- A group is skipped when its first visible message is row-locked by another
-- transaction, or when an earlier message of the group is still invisible
-- (vt in the future), i.e. currently being processed.
-- Returned rows have read_ct incremented and last_read_at / vt updated.
-- NOTE(review): selected_messages applies FOR UPDATE SKIP LOCKED to a query
-- that reads only from a CTE - confirm whether that clause locks any queue rows.
CREATE FUNCTION pgmq.read_grouped(
queue_name TEXT,
vt INTEGER,
qty INTEGER
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
WITH fifo_groups AS (
-- Find the minimum msg_id for each FIFO group that's ready to be processed
SELECT
COALESCE(headers->>'x-pgmq-group', '_default_fifo_group') as fifo_key,
MIN(msg_id) as min_msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp()
GROUP BY COALESCE(headers->>'x-pgmq-group', '_default_fifo_group')
),
locked_groups AS (
-- Lock the first available message in each FIFO group
SELECT
m.msg_id,
fg.fifo_key
FROM pgmq.%I m
INNER JOIN fifo_groups fg ON
COALESCE(m.headers->>'x-pgmq-group', '_default_fifo_group') = fg.fifo_key
AND m.msg_id = fg.min_msg_id
WHERE m.vt <= clock_timestamp()
ORDER BY m.msg_id ASC
FOR UPDATE SKIP LOCKED
),
group_priorities AS (
-- Assign priority to groups based on their oldest message
SELECT
fifo_key,
msg_id as min_msg_id,
ROW_NUMBER() OVER (ORDER BY msg_id) as group_priority
FROM locked_groups
),
filtered_groups as (
SELECT * FROM group_priorities gp
WHERE NOT EXISTS (
-- Ensure no earlier message in this group is currently being processed
SELECT 1
FROM pgmq.%I m2
WHERE COALESCE(m2.headers->>'x-pgmq-group', '_default_fifo_group') = gp.fifo_key
AND m2.vt > clock_timestamp()
AND m2.msg_id < gp.min_msg_id
)
),
available_messages as (
SELECT gp.fifo_key, t.msg_id,gp.group_priority,
ROW_NUMBER() OVER (PARTITION BY gp.fifo_key ORDER BY t.msg_id) as msg_rank_in_group
FROM filtered_groups gp
CROSS JOIN LATERAL (
SELECT *
FROM pgmq.%I t
WHERE COALESCE(t.headers->>'x-pgmq-group', '_default_fifo_group') = gp.fifo_key
AND t.vt <= clock_timestamp()
ORDER BY msg_id
LIMIT $1 -- tip to limit query impact, we know we need at most qty in each group
) t
ORDER BY gp.group_priority
),
batch_selection AS (
-- Select messages to fill batch, prioritizing earliest group
SELECT
msg_id,
ROW_NUMBER() OVER (ORDER BY group_priority, msg_rank_in_group) as overall_rank
FROM available_messages
),
selected_messages AS (
-- Limit to requested quantity
SELECT msg_id
FROM batch_selection
WHERE overall_rank <= $1
ORDER BY msg_id
FOR UPDATE SKIP LOCKED
)
UPDATE pgmq.%I m
SET
vt = clock_timestamp() + %L,
read_ct = read_ct + 1,
last_read_at = clock_timestamp()
FROM selected_messages sm
WHERE m.msg_id = sm.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.last_read_at, m.vt, m.message, m.headers;
$QUERY$,
qtable, qtable, qtable, qtable, qtable, make_interval(secs => vt)
);
RETURN QUERY EXECUTE sql USING qty;
END;
$$ LANGUAGE plpgsql;
-- read_grouped_with_poll
-- Reads messages with AWS SQS FIFO-style batch retrieval behavior, with polling support.
-- Calls pgmq.read_grouped repeatedly until it returns at least one message
-- or the polling deadline passes.
--   queue_name:       queue to read from
--   vt:               visibility timeout in seconds for the returned messages
--   qty:              maximum number of messages to return
--   max_poll_seconds: how long to keep polling before giving up (default 5)
--   poll_interval_ms: sleep between two attempts, in milliseconds (default 100)
-- Returns the messages of the first non-empty read, or no rows on timeout.
-- The deadline is checked before every attempt, so no read is attempted once
-- it has already passed.
CREATE FUNCTION pgmq.read_grouped_with_poll(
    queue_name TEXT,
    vt INTEGER,
    qty INTEGER,
    max_poll_seconds INTEGER DEFAULT 5,
    poll_interval_ms INTEGER DEFAULT 100
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
    r pgmq.message_record;
    -- Must be timestamptz: a plain TIMESTAMP would be converted to local time
    -- and back on every comparison with clock_timestamp(), which is ambiguous
    -- around DST transitions and breaks if the session TimeZone changes.
    stop_at TIMESTAMP WITH TIME ZONE;
BEGIN
    stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);
    LOOP
        IF clock_timestamp() >= stop_at THEN
            RETURN;
        END IF;

        FOR r IN
            SELECT * FROM pgmq.read_grouped(queue_name, vt, qty)
        LOOP
            RETURN NEXT r;
        END LOOP;

        -- FOUND is true when the FOR loop above iterated at least once
        IF FOUND THEN
            RETURN;
        ELSE
            PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
---- read_with_poll
---- Reads a number of messages from a queue, setting a visibility timeout on
---- them, and keeps polling until at least one message is available or the
---- polling deadline passes.
----   queue_name:       queue to read from
----   vt:               visibility timeout in seconds for the returned messages
----   qty:              maximum number of messages to return
----   max_poll_seconds: how long to keep polling before giving up (default 5)
----   poll_interval_ms: sleep between two attempts, in milliseconds (default 100)
----   conditional:      when not the empty object, only messages whose body
----                     contains it (jsonb @>) are considered
---- Returns the messages of the first non-empty read, or no rows on timeout.
CREATE FUNCTION pgmq.read_with_poll(
    queue_name TEXT,
    vt INTEGER,
    qty INTEGER,
    max_poll_seconds INTEGER DEFAULT 5,
    poll_interval_ms INTEGER DEFAULT 100,
    conditional JSONB DEFAULT '{}'
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
    r pgmq.message_record;
    -- Must be timestamptz: a plain TIMESTAMP would be converted to local time
    -- and back on every comparison with clock_timestamp(), which is ambiguous
    -- around DST transitions and breaks if the session TimeZone changes.
    stop_at TIMESTAMP WITH TIME ZONE;
    sql TEXT;
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
    stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);

    -- The statement text does not change between attempts, so it is built
    -- once here instead of on every poll iteration.
    sql := FORMAT(
        $QUERY$
        WITH cte AS
        (
            SELECT msg_id
            FROM pgmq.%I
            WHERE vt <= clock_timestamp() AND CASE
                WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
                ELSE 1
            END = 1
            ORDER BY msg_id ASC
            LIMIT $1
            FOR UPDATE SKIP LOCKED
        )
        UPDATE pgmq.%I m
        SET
            last_read_at = clock_timestamp(),
            vt = clock_timestamp() + %L,
            read_ct = read_ct + 1
        FROM cte
        WHERE m.msg_id = cte.msg_id
        RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.last_read_at, m.vt, m.message, m.headers;
        $QUERY$,
        qtable, conditional, qtable, make_interval(secs => vt)
    );

    LOOP
        IF clock_timestamp() >= stop_at THEN
            RETURN;
        END IF;

        FOR r IN
            EXECUTE sql USING qty
        LOOP
            RETURN NEXT r;
        END LOOP;

        -- FOUND is true when the FOR loop above iterated at least once
        IF FOUND THEN
            RETURN;
        ELSE
            PERFORM pg_sleep(poll_interval_ms::numeric / 1000);
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
---- archive
---- Removes a message from the queue and sends it to the archive, where it's
---- saved permanently.
---- Returns true when a message with the given id was moved, false otherwise.
CREATE FUNCTION pgmq.archive(
    queue_name TEXT,
    msg_id BIGINT
)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    archive_tbl TEXT := pgmq.format_table_name(queue_name, 'a');
    moved_id BIGINT;
    stmt TEXT;
BEGIN
    -- %1$I = queue table, %2$I = archive table; $1 is bound to msg_id
    stmt := format(
        $QUERY$
        WITH archived AS (
            DELETE FROM pgmq.%1$I
            WHERE msg_id = $1
            RETURNING msg_id, vt, read_ct, enqueued_at, last_read_at, message, headers
        )
        INSERT INTO pgmq.%2$I (msg_id, vt, read_ct, enqueued_at, last_read_at, message, headers)
        SELECT msg_id, vt, read_ct, enqueued_at, last_read_at, message, headers
        FROM archived
        RETURNING msg_id;
        $QUERY$,
        queue_tbl, archive_tbl
    );

    EXECUTE stmt INTO moved_id USING msg_id;

    -- moved_id stays NULL when no row was deleted from the queue
    RETURN moved_id IS NOT NULL;
END;
$$;
---- archive
---- Removes an array of message ids from the queue and sends them to the
---- archive, where these messages will be saved permanently.
---- Returns the ids of the messages that were actually moved.
CREATE FUNCTION pgmq.archive(
    queue_name TEXT,
    msg_ids BIGINT[]
)
RETURNS SETOF BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    archive_tbl TEXT := pgmq.format_table_name(queue_name, 'a');
    stmt TEXT;
BEGIN
    -- %1$I = queue table, %2$I = archive table; $1 is bound to msg_ids
    stmt := format(
        $QUERY$
        WITH archived AS (
            DELETE FROM pgmq.%1$I
            WHERE msg_id = ANY($1)
            RETURNING msg_id, vt, read_ct, enqueued_at, last_read_at, message, headers
        )
        INSERT INTO pgmq.%2$I (msg_id, vt, read_ct, enqueued_at, last_read_at, message, headers)
        SELECT msg_id, vt, read_ct, enqueued_at, last_read_at, message, headers
        FROM archived
        RETURNING msg_id;
        $QUERY$,
        queue_tbl, archive_tbl
    );

    RETURN QUERY EXECUTE stmt USING msg_ids;
END;
$$;
---- delete
---- Deletes a message id from the queue permanently.
---- Returns true when a message with the given id was deleted, false otherwise.
CREATE FUNCTION pgmq.delete(
    queue_name TEXT,
    msg_id BIGINT
)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    deleted_id BIGINT;
    stmt TEXT;
BEGIN
    stmt := format(
        $QUERY$
        DELETE FROM pgmq.%1$I
        WHERE msg_id = $1
        RETURNING msg_id
        $QUERY$,
        queue_tbl
    );

    EXECUTE stmt INTO deleted_id USING msg_id;

    -- deleted_id stays NULL when no row matched
    RETURN deleted_id IS NOT NULL;
END;
$$;
---- delete
---- Deletes an array of message ids from the queue permanently.
---- Returns the ids of the messages that were actually deleted.
CREATE FUNCTION pgmq.delete(
    queue_name TEXT,
    msg_ids BIGINT[]
)
RETURNS SETOF BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    stmt TEXT;
BEGIN
    stmt := format(
        $QUERY$
        DELETE FROM pgmq.%1$I
        WHERE msg_id = ANY($1)
        RETURNING msg_id
        $QUERY$,
        queue_tbl
    );

    RETURN QUERY EXECUTE stmt USING msg_ids;
END;
$$;
-- send: actual implementation
-- Inserts one message with the given headers into the queue; `delay` is the
-- timestamp stored as the message's vt (when it becomes visible).
-- Returns the msg_id of the new message.
CREATE FUNCTION pgmq.send(
    queue_name TEXT,
    msg JSONB,
    headers JSONB,
    delay TIMESTAMP WITH TIME ZONE
)
RETURNS SETOF BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    stmt TEXT;
BEGIN
    -- $1 = visible-at timestamp, $2 = message body, $3 = headers
    stmt := format(
        $QUERY$
        INSERT INTO pgmq.%1$I (vt, message, headers)
        VALUES ($1, $2, $3)
        RETURNING msg_id;
        $QUERY$,
        queue_tbl
    );

    RETURN QUERY EXECUTE stmt USING delay, msg, headers;
END;
$$;
-- send: 2 args, no delay or headers
-- The message is sent without headers and is visible immediately.
CREATE FUNCTION pgmq.send(
    queue_name TEXT,
    msg JSONB
)
RETURNS SETOF BIGINT
LANGUAGE sql
AS $$
    SELECT sent.msg_id
    FROM pgmq.send(queue_name, msg, NULL::jsonb, clock_timestamp()) AS sent(msg_id);
$$;
-- send: 3 args with headers
-- The message is sent with the given headers and is visible immediately.
CREATE FUNCTION pgmq.send(
    queue_name TEXT,
    msg JSONB,
    headers JSONB
)
RETURNS SETOF BIGINT
LANGUAGE sql
AS $$
    SELECT sent.msg_id
    FROM pgmq.send(queue_name, msg, headers, clock_timestamp()) AS sent(msg_id);
$$;
-- send: 3 args with integer delay
-- The message is sent without headers and becomes visible `delay` seconds from now.
CREATE FUNCTION pgmq.send(
    queue_name TEXT,
    msg JSONB,
    delay INTEGER
)
RETURNS SETOF BIGINT
LANGUAGE sql
AS $$
    SELECT sent.msg_id
    FROM pgmq.send(
        queue_name,
        msg,
        NULL::jsonb,
        clock_timestamp() + make_interval(secs => delay)
    ) AS sent(msg_id);
$$;
-- send: 3 args with timestamp
-- The message is sent without headers and becomes visible at `delay`.
CREATE FUNCTION pgmq.send(
    queue_name TEXT,
    msg JSONB,
    delay TIMESTAMP WITH TIME ZONE
)
RETURNS SETOF BIGINT
LANGUAGE sql
AS $$
    SELECT sent.msg_id
    FROM pgmq.send(queue_name, msg, NULL::jsonb, delay) AS sent(msg_id);
$$;
-- send: 4 args with integer delay
-- The message is sent with the given headers and becomes visible `delay`
-- seconds from now.
CREATE FUNCTION pgmq.send(
    queue_name TEXT,
    msg JSONB,
    headers JSONB,
    delay INTEGER
)
RETURNS SETOF BIGINT
LANGUAGE sql
AS $$
    SELECT sent.msg_id
    FROM pgmq.send(
        queue_name,
        msg,
        headers,
        clock_timestamp() + make_interval(secs => delay)
    ) AS sent(msg_id);
$$;
-- _validate_batch_params: Private function to validate batch parameters
-- Raises an exception when `msgs` is NULL or empty, or when `headers` is
-- provided (not NULL) and its length differs from the length of `msgs`.
CREATE FUNCTION pgmq._validate_batch_params(
    msgs JSONB[],
    headers JSONB[]
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    -- array_length returns NULL for NULL and for empty arrays; both count as 0
    msg_count INTEGER := COALESCE(array_length(msgs, 1), 0);
    header_count INTEGER := COALESCE(array_length(headers, 1), 0);
BEGIN
    IF msg_count = 0 THEN
        RAISE EXCEPTION 'msgs cannot be NULL or empty';
    END IF;

    IF headers IS NOT NULL AND header_count <> msg_count THEN
        RAISE EXCEPTION 'headers array length (%) must match msgs array length (%)',
            header_count, msg_count;
    END IF;
END;
$$;
-- _send_batch: Private function that performs the actual batch insert without validation
-- Inserts one row per element of `msgs`, pairing it with the element of
-- `headers` at the same position; every row gets `delay` as its vt.
-- A NULL `headers` array is treated as an empty array.
-- Returns the msg_ids of the inserted messages.
CREATE FUNCTION pgmq._send_batch(
    queue_name TEXT,
    msgs JSONB[],
    headers JSONB[],
    delay TIMESTAMP WITH TIME ZONE
)
RETURNS SETOF BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    stmt TEXT;
BEGIN
    -- $1 = visible-at timestamp, $2 = message bodies, $3 = headers
    stmt := format(
        $QUERY$
        INSERT INTO pgmq.%1$I (vt, message, headers)
        SELECT $1, unnest($2), unnest(coalesce($3, ARRAY[]::jsonb[]))
        RETURNING msg_id;
        $QUERY$,
        queue_tbl
    );

    RETURN QUERY EXECUTE stmt USING delay, msgs, headers;
END;
$$;
-- send_batch: Public function with validation
-- Validates the arrays with pgmq._validate_batch_params, then inserts the
-- messages through pgmq._send_batch and returns their msg_ids.
CREATE FUNCTION pgmq.send_batch(
    queue_name TEXT,
    msgs JSONB[],
    headers JSONB[],
    delay TIMESTAMP WITH TIME ZONE
)
RETURNS SETOF BIGINT
LANGUAGE plpgsql
AS $$
BEGIN
    PERFORM pgmq._validate_batch_params(msgs, headers);

    RETURN QUERY
        SELECT batch.msg_id
        FROM pgmq._send_batch(queue_name, msgs, headers, delay) AS batch(msg_id);
END;
$$;
-- send batch: 2 args
-- The messages are sent without headers and are visible immediately.
CREATE FUNCTION pgmq.send_batch(
    queue_name TEXT,
    msgs JSONB[]
)
RETURNS SETOF BIGINT
LANGUAGE sql
AS $$
    SELECT sent.msg_id
    FROM pgmq.send_batch(queue_name, msgs, NULL::jsonb[], clock_timestamp()) AS sent(msg_id);
$$;
-- send batch: 3 args with headers
-- The messages are sent with the given headers and are visible immediately.
CREATE FUNCTION pgmq.send_batch(
    queue_name TEXT,
    msgs JSONB[],
    headers JSONB[]
)
RETURNS SETOF BIGINT
LANGUAGE sql
AS $$
    SELECT sent.msg_id
    FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp()) AS sent(msg_id);
$$;
-- send batch: 3 args with integer delay
-- The messages are sent without headers and become visible `delay` seconds from now.
CREATE FUNCTION pgmq.send_batch(
    queue_name TEXT,
    msgs JSONB[],
    delay INTEGER
)
RETURNS SETOF BIGINT
LANGUAGE sql
AS $$
    SELECT sent.msg_id
    FROM pgmq.send_batch(
        queue_name,
        msgs,
        NULL::jsonb[],
        clock_timestamp() + make_interval(secs => delay)
    ) AS sent(msg_id);
$$;
-- send batch: 3 args with timestamp
-- The messages are sent without headers and become visible at `delay`.
CREATE FUNCTION pgmq.send_batch(
    queue_name TEXT,
    msgs JSONB[],
    delay TIMESTAMP WITH TIME ZONE
)
RETURNS SETOF BIGINT
LANGUAGE sql
AS $$
    SELECT sent.msg_id
    FROM pgmq.send_batch(queue_name, msgs, NULL::jsonb[], delay) AS sent(msg_id);
$$;
-- send_batch: 4 args with integer delay
-- The messages are sent with the given headers and become visible `delay`
-- seconds from now.
CREATE FUNCTION pgmq.send_batch(
    queue_name TEXT,
    msgs JSONB[],
    headers JSONB[],
    delay INTEGER
)
RETURNS SETOF BIGINT
LANGUAGE sql
AS $$
    SELECT sent.msg_id
    FROM pgmq.send_batch(
        queue_name,
        msgs,
        headers,
        clock_timestamp() + make_interval(secs => delay)
    ) AS sent(msg_id);
$$;
-- Row type returned by pgmq.metrics() and pgmq.metrics_all()
CREATE TYPE pgmq.metrics_result AS (
    queue_name           text,
    queue_length         bigint,
    newest_msg_age_sec   integer,
    oldest_msg_age_sec   integer,
    total_messages       bigint,
    scrape_time          timestamptz,
    queue_visible_length bigint
);
-- get metrics for a single queue
-- Returns one pgmq.metrics_result row: the number of messages in the queue
-- table, how many of them are currently visible, the age in seconds of the
-- newest and oldest message, the value of the queue's msg_id sequence
-- (0 if it was never used) as total_messages, and the scrape time.
CREATE FUNCTION pgmq.metrics(queue_name TEXT)
RETURNS pgmq.metrics_result
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    stmt TEXT;
    metrics_row pgmq.metrics_result;
BEGIN
    -- %1$I = queue table, %2$I = its msg_id sequence, %3$L = queue name as given
    stmt := format(
        $QUERY$
        WITH q_summary AS (
            SELECT
                count(*) AS queue_length,
                count(*) FILTER (WHERE vt <= NOW()) AS queue_visible_length,
                EXTRACT(epoch FROM (NOW() - max(enqueued_at)))::int AS newest_msg_age_sec,
                EXTRACT(epoch FROM (NOW() - min(enqueued_at)))::int AS oldest_msg_age_sec,
                NOW() AS scrape_time
            FROM pgmq.%1$I
        ),
        all_metrics AS (
            SELECT CASE
                WHEN is_called THEN last_value ELSE 0
            END AS total_messages
            FROM pgmq.%2$I
        )
        SELECT
            %3$L AS queue_name,
            q_summary.queue_length,
            q_summary.newest_msg_age_sec,
            q_summary.oldest_msg_age_sec,
            all_metrics.total_messages,
            q_summary.scrape_time,
            q_summary.queue_visible_length
        FROM q_summary
        CROSS JOIN all_metrics
        $QUERY$,
        queue_tbl, queue_tbl || '_msg_id_seq', queue_name
    );

    EXECUTE stmt INTO metrics_row;
    RETURN metrics_row;
END;
$$;
-- get metrics for all queues
-- Returns one pgmq.metrics_result row for every queue listed in pgmq.meta.
CREATE FUNCTION pgmq.metrics_all()
RETURNS SETOF pgmq.metrics_result
LANGUAGE plpgsql
AS $$
DECLARE
    listed_queue TEXT;
BEGIN
    FOR listed_queue IN
        SELECT m.queue_name FROM pgmq.meta AS m
    LOOP
        RETURN NEXT pgmq.metrics(listed_queue);
    END LOOP;
END;
$$;
-- list queues
-- Returns one pgmq.queue_record row for every queue registered in pgmq.meta.
CREATE FUNCTION pgmq."list_queues"()
RETURNS SETOF pgmq.queue_record AS $$
BEGIN
    -- Columns are listed explicitly, in pgmq.queue_record order, so the result
    -- keeps matching the return type even if pgmq.meta gains or reorders columns.
    RETURN QUERY
        SELECT
            m.queue_name,
            m.is_partitioned,
            m.is_unlogged,
            m.created_at
        FROM pgmq.meta AS m;
END
$$ LANGUAGE plpgsql;
-- purge queue, deleting all entries in it.
-- Returns the number of rows that were counted in the queue table right
-- before it was truncated.
CREATE OR REPLACE FUNCTION pgmq."purge_queue"(queue_name TEXT)
RETURNS BIGINT AS $$
DECLARE
    -- BIGINT to match both count(*) and the declared return type; an INTEGER
    -- here fails with "integer out of range" for more than 2^31 - 1 rows.
    deleted_count BIGINT;
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
    -- Get the row count before truncating
    EXECUTE format('SELECT count(*) FROM pgmq.%I', qtable) INTO deleted_count;
    -- Use TRUNCATE for better performance on large tables
    EXECUTE format('TRUNCATE TABLE pgmq.%I', qtable);
    -- Return the number of purged rows
    RETURN deleted_count;
END
$$ LANGUAGE plpgsql;
-- detach_archive: deprecated no-op, formerly used to unassign an archive so
-- it could be kept when a queue is deleted.
-- It only emits a deprecation warning.
CREATE FUNCTION pgmq.detach_archive(queue_name TEXT)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
    -- The result is discarded; the call is kept because it still rejects
    -- queue names that contain invalid characters.
    PERFORM pgmq.format_table_name(queue_name, 'a');

    RAISE WARNING 'detach_archive(queue_name) is deprecated and is a no-op. It will be removed in PGMQ v2.0. Archive tables are no longer member objects.';
END
$$;
-- pop: implementation
-- Deletes up to `qty` visible messages, lowest msg_id first, and returns them.
-- Rows locked by other transactions are skipped instead of waited for.
CREATE FUNCTION pgmq.pop(queue_name TEXT, qty INTEGER DEFAULT 1)
RETURNS SETOF pgmq.message_record
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    stmt TEXT;
BEGIN
    -- %1$I = queue table; $1 is bound to qty
    stmt := format(
        $QUERY$
        WITH cte AS
        (
            SELECT msg_id
            FROM pgmq.%1$I
            WHERE vt <= clock_timestamp()
            ORDER BY msg_id ASC
            LIMIT $1
            FOR UPDATE SKIP LOCKED
        )
        DELETE from pgmq.%1$I
        WHERE msg_id IN (select msg_id from cte)
        RETURNING msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers;
        $QUERY$,
        queue_tbl
    );

    RETURN QUERY EXECUTE stmt USING qty;
END;
$$;
-- Sets the vt of one message to the given timestamp and returns the message.
-- Returns no rows when the queue has no message with that msg_id.
CREATE FUNCTION pgmq.set_vt(queue_name TEXT, msg_id BIGINT, vt TIMESTAMP WITH TIME ZONE)
RETURNS SETOF pgmq.message_record
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    stmt TEXT;
BEGIN
    -- $1 = new vt, $2 = id of the message to update
    stmt := format(
        $QUERY$
        UPDATE pgmq.%1$I
        SET vt = $1
        WHERE msg_id = $2
        RETURNING msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers;
        $QUERY$,
        queue_tbl
    );

    RETURN QUERY EXECUTE stmt USING vt, msg_id;
END;
$$;
-- Sets the vt of one message to `vt` seconds from now and returns the message.
CREATE FUNCTION pgmq.set_vt(queue_name TEXT, msg_id BIGINT, vt INTEGER)
RETURNS SETOF pgmq.message_record
LANGUAGE sql
AS $$
    SELECT updated.*
    FROM pgmq.set_vt(
        queue_name,
        msg_id,
        clock_timestamp() + make_interval(secs => vt)
    ) AS updated;
$$;
-- Sets the vt of multiple messages to the given timestamp and returns them.
-- Ids that match no message in the queue are ignored.
CREATE FUNCTION pgmq.set_vt(
    queue_name TEXT,
    msg_ids BIGINT[],
    vt TIMESTAMP WITH TIME ZONE
)
RETURNS SETOF pgmq.message_record
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    stmt TEXT;
BEGIN
    -- $1 = new vt, $2 = ids of the messages to update
    stmt := format(
        $QUERY$
        UPDATE pgmq.%1$I
        SET vt = $1
        WHERE msg_id = ANY($2)
        RETURNING msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers;
        $QUERY$,
        queue_tbl
    );

    RETURN QUERY EXECUTE stmt USING vt, msg_ids;
END;
$$;
-- Sets the vt of multiple messages to `vt` seconds from now and returns them.
CREATE FUNCTION pgmq.set_vt(
    queue_name TEXT,
    msg_ids BIGINT[],
    vt INTEGER
)
RETURNS SETOF pgmq.message_record
LANGUAGE sql
AS $$
    SELECT updated.*
    FROM pgmq.set_vt(
        queue_name,
        msg_ids,
        clock_timestamp() + make_interval(secs => vt)
    ) AS updated;
$$;
-- Returns the name of the schema in which the pg_partman extension is
-- installed, or NULL when pg_partman is not installed.
-- The name is returned raw (unquoted): callers interpolate it with format's
-- %I, which does the quoting. Casting through regnamespace would already
-- quote names that need it, so such names would end up quoted twice.
CREATE FUNCTION pgmq._get_pg_partman_schema()
RETURNS TEXT AS $$
    SELECT
        n.nspname::text
    FROM
        pg_catalog.pg_extension AS e
        INNER JOIN pg_catalog.pg_namespace AS n
            ON n.oid = e.extnamespace
    WHERE
        e.extname = 'pg_partman';
$$ LANGUAGE SQL;
-- Deprecated two-argument form of drop_queue.
-- `partitioned` is ignored; the call is forwarded to drop_queue(queue_name)
-- after a deprecation warning.
-- Returns what drop_queue(queue_name) returns, so a queue that does not
-- exist is reported as FALSE instead of an unconditional TRUE.
CREATE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN)
RETURNS BOOLEAN AS $$
BEGIN
    RAISE WARNING 'drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead';
    RETURN pgmq.drop_queue(queue_name);
END;
$$ LANGUAGE plpgsql;
-- Drops a queue: its queue table, its archive table, its pgmq.meta row and,
-- for partitioned queues, its pg_partman part_config rows.
-- Takes the queue's advisory lock first.
-- Returns FALSE (with a NOTICE) when the queue table does not exist,
-- TRUE otherwise.
CREATE FUNCTION pgmq.drop_queue(queue_name TEXT)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    archive_tbl TEXT := pgmq.format_table_name(queue_name, 'a');
    was_partitioned BOOLEAN;
    queue_tbl_exists BOOLEAN;
    meta_tbl_exists BOOLEAN;
BEGIN
    PERFORM pgmq.acquire_queue_lock(queue_name);

    -- Remember the partitioning flag before the meta row is deleted below
    EXECUTE format('SELECT is_partitioned FROM pgmq.meta WHERE queue_name = %L', queue_name)
    INTO was_partitioned;

    -- check if the queue exists
    SELECT EXISTS (
        SELECT 1
        FROM information_schema.tables AS t
        WHERE t.table_schema = 'pgmq'
          AND t.table_name = queue_tbl
    ) INTO queue_tbl_exists;

    IF NOT queue_tbl_exists THEN
        RAISE NOTICE 'pgmq queue `%` does not exist', queue_name;
        RETURN FALSE;
    END IF;

    EXECUTE format('DROP TABLE IF EXISTS pgmq.%I', queue_tbl);
    EXECUTE format('DROP TABLE IF EXISTS pgmq.%I', archive_tbl);

    SELECT EXISTS (
        SELECT 1
        FROM information_schema.tables AS t
        WHERE t.table_schema = 'pgmq'
          AND t.table_name = 'meta'
    ) INTO meta_tbl_exists;

    IF meta_tbl_exists THEN
        EXECUTE format('DELETE FROM pgmq.meta WHERE queue_name = %L', queue_name);
    END IF;

    -- Skipped when the flag is false or when no meta row was found (NULL)
    IF was_partitioned THEN
        EXECUTE format(
            'DELETE FROM %I.part_config where parent_table in (%L, %L)',
            pgmq._get_pg_partman_schema(),
            'pgmq.' || queue_tbl,
            'pgmq.' || archive_tbl
        );
    END IF;

    RETURN TRUE;
END;
$$;
-- Raises an exception when the queue name is longer than 47 characters.
-- A complete table identifier must be <= 63:
-- https://www.postgresql.org/docs/17/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
-- e.g. template_pgmq_q_my_queue is an identifier for my_queue when partitioned:
-- template_pgmq_q_ (16) + <a max length queue name> (47) = 63
CREATE FUNCTION pgmq.validate_queue_name(queue_name TEXT)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    max_queue_name_length CONSTANT INTEGER := 47;
BEGIN
    IF length(queue_name) > max_queue_name_length THEN
        RAISE EXCEPTION 'queue name is too long, maximum length is 47 characters';
    END IF;
END;
$$;
-- Returns true when a relation named `table_name` has a dependency on the
-- pgmq extension recorded in pg_depend, false otherwise (including when the
-- extension is not installed or no such relation exists).
-- The lookup is a join rather than a scalar subquery on pg_class, so the
-- function does not fail when the same relation name exists in more than
-- one schema.
CREATE FUNCTION pgmq._belongs_to_pgmq(table_name TEXT)
RETURNS BOOLEAN AS $$
BEGIN
    RETURN EXISTS (
        SELECT 1
        FROM pg_catalog.pg_depend AS d
        INNER JOIN pg_catalog.pg_extension AS e
            ON e.oid = d.refobjid
        INNER JOIN pg_catalog.pg_class AS c
            ON c.oid = d.objid
        WHERE e.extname = 'pgmq'
          -- OIDs are only unique per catalog, so pin both sides of the
          -- dependency to the catalogs they are joined against
          AND d.refclassid = 'pg_catalog.pg_extension'::regclass
          AND d.classid = 'pg_catalog.pg_class'::regclass
          AND c.relname = _belongs_to_pgmq.table_name
    );
END;
$$ LANGUAGE plpgsql;
-- Creates a regular (logged, non-partitioned) queue: the queue table, the
-- archive table, an index on each, and the queue's row in pgmq.meta.
-- Every step uses IF NOT EXISTS / ON CONFLICT DO NOTHING, so existing
-- objects are left as they are.
-- Raises an exception when the queue name is too long or contains invalid
-- characters.
CREATE FUNCTION pgmq.create_non_partitioned(queue_name TEXT)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    archive_tbl TEXT := pgmq.format_table_name(queue_name, 'a');
BEGIN
    PERFORM pgmq.validate_queue_name(queue_name);
    PERFORM pgmq.acquire_queue_lock(queue_name);

    -- Queue table
    EXECUTE format(
        $QUERY$
        CREATE TABLE IF NOT EXISTS pgmq.%1$I (
            msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
            read_ct INT DEFAULT 0 NOT NULL,
            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            last_read_at TIMESTAMP WITH TIME ZONE,
            vt TIMESTAMP WITH TIME ZONE NOT NULL,
            message JSONB,
            headers JSONB
        )
        $QUERY$,
        queue_tbl
    );

    -- Archive table: same columns plus archived_at, msg_id is not generated
    EXECUTE format(
        $QUERY$
        CREATE TABLE IF NOT EXISTS pgmq.%1$I (
            msg_id BIGINT PRIMARY KEY,
            read_ct INT DEFAULT 0 NOT NULL,
            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            last_read_at TIMESTAMP WITH TIME ZONE,
            archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            vt TIMESTAMP WITH TIME ZONE NOT NULL,
            message JSONB,
            headers JSONB
        );
        $QUERY$,
        archive_tbl
    );

    -- Index on the visibility timeout of the queue table
    EXECUTE format(
        'CREATE INDEX IF NOT EXISTS %1$I ON pgmq.%2$I (vt ASC);',
        queue_tbl || '_vt_idx', queue_tbl
    );

    -- Index on the archive timestamp of the archive table
    EXECUTE format(
        'CREATE INDEX IF NOT EXISTS %1$I ON pgmq.%2$I (archived_at);',
        'archived_at_idx_' || queue_name, archive_tbl
    );

    -- Register the queue; an already registered queue is left untouched
    EXECUTE format(
        $QUERY$
        INSERT INTO pgmq.meta (queue_name, is_partitioned, is_unlogged)
        VALUES (%1$L, false, false)
        ON CONFLICT
        DO NOTHING;
        $QUERY$,
        queue_name
    );
END;
$$;
-- Creates an unlogged queue: an UNLOGGED queue table, a regular archive
-- table, an index on each, and the queue's row in pgmq.meta (is_unlogged = true).
-- Every step uses IF NOT EXISTS / ON CONFLICT DO NOTHING, so existing
-- objects are left as they are.
-- Raises an exception when the queue name is too long or contains invalid
-- characters.
CREATE FUNCTION pgmq.create_unlogged(queue_name TEXT)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    queue_tbl TEXT := pgmq.format_table_name(queue_name, 'q');
    archive_tbl TEXT := pgmq.format_table_name(queue_name, 'a');
BEGIN
    PERFORM pgmq.validate_queue_name(queue_name);
    PERFORM pgmq.acquire_queue_lock(queue_name);

    -- Queue table (unlogged)
    EXECUTE format(
        $QUERY$
        CREATE UNLOGGED TABLE IF NOT EXISTS pgmq.%1$I (
            msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
            read_ct INT DEFAULT 0 NOT NULL,
            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            last_read_at TIMESTAMP WITH TIME ZONE,
            vt TIMESTAMP WITH TIME ZONE NOT NULL,
            message JSONB,
            headers JSONB
        )
        $QUERY$,
        queue_tbl
    );

    -- Archive table: same columns plus archived_at, msg_id is not generated;
    -- unlike the queue table it is created as a regular (logged) table
    EXECUTE format(
        $QUERY$
        CREATE TABLE IF NOT EXISTS pgmq.%1$I (
            msg_id BIGINT PRIMARY KEY,
            read_ct INT DEFAULT 0 NOT NULL,
            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            last_read_at TIMESTAMP WITH TIME ZONE,
            archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            vt TIMESTAMP WITH TIME ZONE NOT NULL,
            message JSONB,
            headers JSONB
        );
        $QUERY$,
        archive_tbl
    );

    -- Index on the visibility timeout of the queue table
    EXECUTE format(
        'CREATE INDEX IF NOT EXISTS %1$I ON pgmq.%2$I (vt ASC);',
        queue_tbl || '_vt_idx', queue_tbl
    );

    -- Index on the archive timestamp of the archive table
    EXECUTE format(
        'CREATE INDEX IF NOT EXISTS %1$I ON pgmq.%2$I (archived_at);',
        'archived_at_idx_' || queue_name, archive_tbl
    );

    -- Register the queue; an already registered queue is left untouched
    EXECUTE format(
        $QUERY$
        INSERT INTO pgmq.meta (queue_name, is_partitioned, is_unlogged)
        VALUES (%1$L, false, true)
        ON CONFLICT
        DO NOTHING;
        $QUERY$,
        queue_name
    );
END;
$$;
-- Picks the partitioning column for a partition interval given as text:
-- 'msg_id' when the interval casts to an integer, 'enqueued_at' when the
-- cast raises an error.
CREATE FUNCTION pgmq._get_partition_col(partition_interval TEXT)
RETURNS TEXT
LANGUAGE plpgsql
AS $$
DECLARE
    as_integer INTEGER;
BEGIN
    -- Only the success or failure of this cast matters; the value is unused
    as_integer := partition_interval::INTEGER;
    RETURN 'msg_id';
EXCEPTION
    WHEN others THEN
        RETURN 'enqueued_at';
END;
$$;
-- Returns true when an extension with the given name is installed in the
-- current database (i.e. has a row in pg_extension), false otherwise.
CREATE FUNCTION pgmq._extension_exists(extension_name TEXT)
RETURNS BOOLEAN
AS $$
    SELECT EXISTS (
        SELECT 1
        FROM pg_extension AS ext
        WHERE ext.extname = extension_name
    );
$$ LANGUAGE sql;
-- Raises an exception unless the pg_partman extension is installed;
-- partitioned queues cannot be created without it.
CREATE FUNCTION pgmq._ensure_pg_partman_installed()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    partman_installed BOOLEAN := pgmq._extension_exists('pg_partman');
BEGIN
    IF NOT partman_installed THEN
        RAISE EXCEPTION 'pg_partman is required for partitioned queues';
    END IF;
END;
$$;
-- _get_pg_partman_major_version
-- Returns the part of pg_partman's extversion before the first '.', cast to
-- INT. Yields NULL when pg_extension has no pg_partman row.
CREATE FUNCTION pgmq._get_pg_partman_major_version()
RETURNS INT
LANGUAGE SQL
AS $$
    SELECT CAST(split_part(ext.extversion, '.', 1) AS INT)
    FROM pg_extension AS ext
    WHERE ext.extname = 'pg_partman'
$$;
-- create_partitioned
-- Creates a range-partitioned queue table and archive table managed by
-- pg_partman, configures retention for both, and registers the queue in
-- pgmq.meta with is_partitioned = true.
--   queue_name:         queue to create (checked by pgmq.validate_queue_name).
--   partition_interval: passed to pg_partman as p_interval. The partition
--                       column comes from pgmq._get_partition_col; when that
--                       is 'enqueued_at' the archive table is partitioned by
--                       'archived_at' instead.
--   retention_interval: written to part_config.retention for both tables.
-- Raises (via pgmq._ensure_pg_partman_installed) when pg_partman is missing.
CREATE FUNCTION pgmq.create_partitioned(
    queue_name TEXT,
    partition_interval TEXT DEFAULT '10000',
    retention_interval TEXT DEFAULT '100000'
)
RETURNS void AS $$
DECLARE
    queue_part_col TEXT;
    archive_part_col TEXT;
    queue_table TEXT := pgmq.format_table_name(queue_name, 'q');
    archive_table TEXT := pgmq.format_table_name(queue_name, 'a');
    fq_queue_table TEXT := 'pgmq.' || queue_table;
    fq_archive_table TEXT := 'pgmq.' || archive_table;
BEGIN
    PERFORM pgmq.validate_queue_name(queue_name);
    PERFORM pgmq.acquire_queue_lock(queue_name);
    PERFORM pgmq._ensure_pg_partman_installed();

    queue_part_col := pgmq._get_partition_col(partition_interval);

    -- Partitioned parent table for the queue.
    EXECUTE format(
        $QUERY$
        CREATE TABLE IF NOT EXISTS pgmq.%I (
            msg_id BIGINT GENERATED ALWAYS AS IDENTITY,
            read_ct INT DEFAULT 0 NOT NULL,
            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            last_read_at TIMESTAMP WITH TIME ZONE,
            vt TIMESTAMP WITH TIME ZONE NOT NULL,
            message JSONB,
            headers JSONB
        ) PARTITION BY RANGE (%I)
        $QUERY$,
        queue_table, queue_part_col
    );

    -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
    -- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
    -- p_type is 'range' when the pg_partman major version is 5, 'native' otherwise.
    EXECUTE format(
        $QUERY$
        SELECT %I.create_parent(
            p_parent_table := %L,
            p_control := %L,
            p_interval := %L,
            p_type := case
                when pgmq._get_pg_partman_major_version() = 5 then 'range'
                else 'native'
            end
        )
        $QUERY$,
        pgmq._get_pg_partman_schema(),
        fq_queue_table,
        queue_part_col,
        partition_interval
    );

    -- Index on the queue table's partition column.
    EXECUTE format(
        $QUERY$
        CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (%I);
        $QUERY$,
        queue_table || '_part_idx', queue_table, queue_part_col
    );

    -- Retention settings for the queue table.
    EXECUTE format(
        $QUERY$
        UPDATE %I.part_config
        SET
            retention = %L,
            retention_keep_table = false,
            retention_keep_index = true,
            automatic_maintenance = 'on'
        WHERE parent_table = %L;
        $QUERY$,
        pgmq._get_pg_partman_schema(),
        retention_interval,
        fq_queue_table
    );

    -- Register the queue; an existing meta row is left untouched.
    EXECUTE format(
        $QUERY$
        INSERT INTO pgmq.meta (queue_name, is_partitioned, is_unlogged)
        VALUES (%L, true, false)
        ON CONFLICT
        DO NOTHING;
        $QUERY$,
        queue_name
    );

    -- The archive table partitions by archived_at when the queue uses enqueued_at.
    archive_part_col := CASE
        WHEN queue_part_col = 'enqueued_at' THEN 'archived_at'
        ELSE queue_part_col
    END;

    -- Partitioned parent table for the archive.
    EXECUTE format(
        $QUERY$
        CREATE TABLE IF NOT EXISTS pgmq.%I (
            msg_id BIGINT NOT NULL,
            read_ct INT DEFAULT 0 NOT NULL,
            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            last_read_at TIMESTAMP WITH TIME ZONE,
            archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            vt TIMESTAMP WITH TIME ZONE NOT NULL,
            message JSONB,
            headers JSONB
        ) PARTITION BY RANGE (%I);
        $QUERY$,
        archive_table, archive_part_col
    );

    -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
    -- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
    EXECUTE format(
        $QUERY$
        SELECT %I.create_parent(
            p_parent_table := %L,
            p_control := %L,
            p_interval := %L,
            p_type := case
                when pgmq._get_pg_partman_major_version() = 5 then 'range'
                else 'native'
            end
        )
        $QUERY$,
        pgmq._get_pg_partman_schema(),
        fq_archive_table,
        archive_part_col,
        partition_interval
    );

    -- Retention settings for the archive table.
    EXECUTE format(
        $QUERY$
        UPDATE %I.part_config
        SET
            retention = %L,
            retention_keep_table = false,
            retention_keep_index = true,
            automatic_maintenance = 'on'
        WHERE parent_table = %L;
        $QUERY$,
        pgmq._get_pg_partman_schema(),
        retention_interval,
        fq_archive_table
    );

    -- Index on the archive table's archived_at column.
    EXECUTE format(
        $QUERY$
        CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (archived_at);
        $QUERY$,
        'archived_at_idx_' || queue_name, archive_table
    );
END;
$$ LANGUAGE plpgsql;
-- create
-- Default queue constructor: delegates to pgmq.create_non_partitioned.
CREATE FUNCTION pgmq.create(queue_name TEXT)
RETURNS void AS $$
BEGIN
    PERFORM pgmq.create_non_partitioned(queue_name);
    RETURN;
END;
$$ LANGUAGE plpgsql;
-- _create_fifo_index_if_not_exists
-- Internal helper: creates a GIN index named <queue table>_fifo_idx on the
-- headers column of the queue's table, unless an index of that name exists.
CREATE OR REPLACE FUNCTION pgmq._create_fifo_index_if_not_exists(queue_name TEXT)
RETURNS void AS $$
DECLARE
    queue_table TEXT := pgmq.format_table_name(queue_name, 'q');
    fifo_index TEXT := queue_table || '_fifo_idx';
BEGIN
    -- GIN index over headers, intended to speed up FIFO key lookups
    EXECUTE format(
        $QUERY$
        CREATE INDEX IF NOT EXISTS %I ON pgmq.%I USING GIN (headers);
        $QUERY$,
        fifo_index, queue_table
    );
END;
$$ LANGUAGE plpgsql;
-- create_fifo_index
-- Public entry point for adding the FIFO GIN index on a queue's headers
-- column; forwards to pgmq._create_fifo_index_if_not_exists.
CREATE FUNCTION pgmq.create_fifo_index(queue_name TEXT)
RETURNS void AS $$
BEGIN
    PERFORM pgmq._create_fifo_index_if_not_exists(queue_name);
    RETURN;
END;
$$ LANGUAGE plpgsql;
-- create_fifo_indexes_all
-- Calls pgmq.create_fifo_index once for every queue listed in pgmq.meta.
CREATE FUNCTION pgmq.create_fifo_indexes_all()
RETURNS void AS $$
DECLARE
    current_queue TEXT;
BEGIN
    FOR current_queue IN
        SELECT m.queue_name
        FROM pgmq.meta AS m
    LOOP
        PERFORM pgmq.create_fifo_index(current_queue);
    END LOOP;
END;
$$ LANGUAGE plpgsql;
-- convert_archive_partitioned
-- Converts an existing non-partitioned archive table into a pg_partman-managed
-- range-partitioned table. The current table is renamed to <archive>_old and a
-- new partitioned parent with the original name is created LIKE it; this
-- function does not copy rows from the _old table into the new one.
--   table_name:         queue name whose archive table should be converted.
--   partition_interval: passed to pg_partman as p_interval; also decides the
--                       partition column (msg_id when it casts to INTEGER,
--                       otherwise archived_at).
--   retention_interval: written to part_config.retention.
--   leading_partition:  NOTE(review): not referenced anywhere in this body;
--                       kept so existing callers keep working.
-- Emits a NOTICE and returns without changes when the archive table is already
-- partitioned or does not exist in the pgmq schema.
CREATE OR REPLACE FUNCTION pgmq.convert_archive_partitioned(
    table_name TEXT,
    partition_interval TEXT DEFAULT '10000',
    retention_interval TEXT DEFAULT '100000',
    leading_partition INT DEFAULT 10
)
RETURNS void AS $$
DECLARE
    a_table_name TEXT := pgmq.format_table_name(table_name, 'a');
    a_table_name_old TEXT := pgmq.format_table_name(table_name, 'a') || '_old';
    qualified_a_table_name TEXT := format('pgmq.%I', a_table_name);
    -- Same expression the queue-creation functions pass to %I for this index.
    archive_idx_name TEXT := 'archived_at_idx_' || table_name;
    partition_col TEXT;
    a_partition_col TEXT;
BEGIN
    -- Only look at relations in the pgmq schema; a same-named table elsewhere
    -- must not influence the outcome.
    PERFORM c.relkind
    FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = 'pgmq'
      AND c.relname = a_table_name
      AND c.relkind = 'p';
    IF FOUND THEN
        RAISE NOTICE 'Table % is already partitioned', a_table_name;
        RETURN;
    END IF;

    PERFORM c.relkind
    FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = 'pgmq'
      AND c.relname = a_table_name
      AND c.relkind = 'r';
    IF NOT FOUND THEN
        RAISE NOTICE 'Table % does not exist', a_table_name;
        RETURN;
    END IF;

    SELECT pgmq._get_partition_col(partition_interval) INTO partition_col;

    -- For archive tables, use archived_at for time-based partitioning
    IF partition_col = 'enqueued_at' THEN
        a_partition_col := 'archived_at';
    ELSE
        a_partition_col := partition_col;
    END IF;

    -- Identifiers are quoted with %I so names that need quoting still resolve.
    EXECUTE format('ALTER TABLE %s RENAME TO %I', qualified_a_table_name, a_table_name_old);

    -- When partitioning by time (archived_at), we need to exclude constraints and indexes
    -- because the existing PRIMARY KEY on msg_id alone is incompatible with partitioning by archived_at.
    -- When partitioning by msg_id, we can keep all constraints including PRIMARY KEY.
    IF a_partition_col = 'archived_at' THEN
        EXECUTE format(
            'CREATE TABLE pgmq.%I (LIKE pgmq.%I including defaults including generated including storage including comments) PARTITION BY RANGE (%I)',
            a_table_name, a_table_name_old, a_partition_col
        );
    ELSE
        EXECUTE format(
            'CREATE TABLE pgmq.%I (LIKE pgmq.%I including all) PARTITION BY RANGE (%I)',
            a_table_name, a_table_name_old, a_partition_col
        );
    END IF;

    -- Free the index name for the new parent by renaming the old index.
    EXECUTE format(
        'ALTER INDEX pgmq.%I RENAME TO %I',
        archive_idx_name, archive_idx_name || '_old'
    );
    EXECUTE format(
        'CREATE INDEX %I ON %s (archived_at)',
        archive_idx_name, qualified_a_table_name
    );

    -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
    -- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
    EXECUTE format(
        $QUERY$
        SELECT %I.create_parent(
            p_parent_table := %L,
            p_control := %L,
            p_interval := %L,
            p_type := case
                when pgmq._get_pg_partman_major_version() = 5 then 'range'
                else 'native'
            end
        )
        $QUERY$,
        pgmq._get_pg_partman_schema(),
        qualified_a_table_name,
        a_partition_col,
        partition_interval
    );

    -- Retention settings for the converted archive table.
    EXECUTE format(
        $QUERY$
        UPDATE %I.part_config
        SET
            retention = %L,
            retention_keep_table = false,
            retention_keep_index = false,
            infinite_time_partitions = true
        WHERE
            parent_table = %L;
        $QUERY$,
        pgmq._get_pg_partman_schema(),
        retention_interval,
        qualified_a_table_name
    );
END;
$$ LANGUAGE plpgsql;
-- notify_queue_listeners
-- Trigger function that sends a NOTIFY on channel
-- 'pgmq.<table name>.<operation>' subject to the queue's throttle settings.
-- The throttle row's last_notified_at is advanced in the same UPDATE that
-- decides whether to notify; if that UPDATE touches no row (no throttle row
-- for the queue, or the interval has not elapsed) nothing is sent.
CREATE OR REPLACE FUNCTION pgmq.notify_queue_listeners()
RETURNS TRIGGER AS $$
DECLARE
    source_queue TEXT; -- trigger table name with its first two characters removed
BEGIN
    source_queue := substring(TG_TABLE_NAME from 3);

    UPDATE pgmq.notify_insert_throttle AS nit
    SET last_notified_at = clock_timestamp()
    WHERE nit.queue_name = source_queue
      AND (
          nit.throttle_interval_ms = 0 -- throttling disabled
          OR clock_timestamp() - nit.last_notified_at >=
             (nit.throttle_interval_ms * INTERVAL '1 millisecond') -- interval elapsed
      );

    -- FOUND is true only when the UPDATE above changed a row
    IF FOUND THEN
        PERFORM pg_notify('pgmq.' || TG_TABLE_NAME || '.' || TG_OP, NULL);
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- enable_notify_insert
-- Turns on insert notifications for a queue: records the throttle interval in
-- pgmq.notify_insert_throttle and (re)creates the deferrable constraint
-- trigger that calls pgmq.notify_queue_listeners after each inserted row.
--   queue_name:           queue whose table receives the trigger.
--   throttle_interval_ms: minimum milliseconds between notifications; must be
--                         non-negative.
-- Raises when the interval is negative or the queue table is not found in
-- schema pgmq.
CREATE OR REPLACE FUNCTION pgmq.enable_notify_insert(queue_name TEXT, throttle_interval_ms INTEGER DEFAULT 250)
RETURNS void AS $$
DECLARE
    queue_table TEXT := pgmq.format_table_name(queue_name, 'q');
    -- Local copies keep the parameters distinct from same-named table columns.
    target_queue TEXT := queue_name;
    min_interval_ms INTEGER := throttle_interval_ms;
    queue_table_found BOOLEAN;
BEGIN
    -- Reject negative throttle intervals
    IF min_interval_ms < 0 THEN
        RAISE EXCEPTION 'throttle_interval_ms must be non-negative';
    END IF;

    -- The queue table must already exist
    SELECT EXISTS (
        SELECT 1
        FROM information_schema.tables AS t
        WHERE t.table_schema = 'pgmq'
          AND t.table_name = queue_table
    )
    INTO queue_table_found;

    IF NOT queue_table_found THEN
        RAISE EXCEPTION 'Queue "%" does not exist. Create it first using pgmq.create()', target_queue;
    END IF;

    -- Start from a clean state: drop any existing trigger and throttle row
    PERFORM pgmq.disable_notify_insert(target_queue);

    INSERT INTO pgmq.notify_insert_throttle (queue_name, throttle_interval_ms)
    VALUES (target_queue, min_interval_ms)
    ON CONFLICT ON CONSTRAINT notify_insert_throttle_queue_name_key DO UPDATE
        SET throttle_interval_ms = EXCLUDED.throttle_interval_ms,
            last_notified_at = to_timestamp(0);

    EXECUTE format(
        $QUERY$
        CREATE CONSTRAINT TRIGGER trigger_notify_queue_insert_listeners
        AFTER INSERT ON pgmq.%I
        DEFERRABLE FOR EACH ROW
        EXECUTE PROCEDURE pgmq.notify_queue_listeners()
        $QUERY$,
        queue_table
    );
END;
$$ LANGUAGE plpgsql;
-- disable_notify_insert
-- Turns off insert notifications for a queue: drops the notification trigger
-- from the queue table (if present) and deletes the queue's row from
-- pgmq.notify_insert_throttle.
CREATE OR REPLACE FUNCTION pgmq.disable_notify_insert(queue_name TEXT)
RETURNS void AS $$
DECLARE
    queue_table TEXT := pgmq.format_table_name(queue_name, 'q');
    -- Local copy keeps the parameter distinct from the queue_name column.
    target_queue TEXT := queue_name;
BEGIN
    EXECUTE format(
        $QUERY$
        DROP TRIGGER IF EXISTS trigger_notify_queue_insert_listeners ON pgmq.%I;
        $QUERY$,
        queue_table
    );

    DELETE FROM pgmq.notify_insert_throttle AS throttle
    WHERE throttle.queue_name = target_queue;
END;
$$ LANGUAGE plpgsql;
-- list_notify_insert_throttles
-- Returns every row of pgmq.notify_insert_throttle, ordered by queue name.
CREATE OR REPLACE FUNCTION pgmq.list_notify_insert_throttles()
    RETURNS TABLE
            (
                queue_name           text,
                throttle_interval_ms integer,
                last_notified_at     TIMESTAMP WITH TIME ZONE
            )
    LANGUAGE sql
    STABLE
AS
$$
SELECT
    nit.queue_name,
    nit.throttle_interval_ms,
    nit.last_notified_at
FROM pgmq.notify_insert_throttle AS nit
ORDER BY nit.queue_name;
$$;
-- update_notify_insert
-- Changes the throttle interval of a queue that already has insert
-- notifications enabled, and resets last_notified_at to the epoch.
--   queue_name:           queue to update; must exist in pgmq.meta and have a
--                         row in pgmq.notify_insert_throttle.
--   throttle_interval_ms: new minimum milliseconds between notifications;
--                         must be non-negative.
CREATE OR REPLACE FUNCTION pgmq.update_notify_insert(queue_name text, throttle_interval_ms integer)
    RETURNS void
    LANGUAGE plpgsql
AS
$$
BEGIN
    IF throttle_interval_ms < 0 THEN
        RAISE EXCEPTION 'throttle_interval_ms must be non-negative, got: %', throttle_interval_ms;
    END IF;

    -- The queue must be registered
    PERFORM 1
    FROM pgmq.meta AS m
    WHERE m.queue_name = update_notify_insert.queue_name;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'Queue "%" does not exist. Create the queue first using pgmq.create()', queue_name;
    END IF;

    -- Notifications must already be enabled for it
    PERFORM 1
    FROM pgmq.notify_insert_throttle AS nit
    WHERE nit.queue_name = update_notify_insert.queue_name;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'Queue "%" does not have notify_insert enabled. Enable it first using pgmq.enable_notify_insert()', queue_name;
    END IF;

    UPDATE pgmq.notify_insert_throttle AS nit
    SET throttle_interval_ms = update_notify_insert.throttle_interval_ms,
        last_notified_at = to_timestamp(0)
    WHERE nit.queue_name = update_notify_insert.queue_name;
END;
$$;
-- validate_routing_key
-- Returns true when routing_key is well formed; otherwise raises an exception
-- describing the first rule that failed. Rules, in the order checked:
--   1. not NULL and not empty
--   2. at most 255 characters
--   3. only letters, digits, '.', '_' and '-'
--   4. no leading dot, no trailing dot, no two dots in a row
CREATE OR REPLACE FUNCTION pgmq.validate_routing_key(routing_key text)
    RETURNS boolean
    LANGUAGE plpgsql
    IMMUTABLE
AS
$$
DECLARE
    key_length integer := length(routing_key);
BEGIN
    -- Accepted:
    --   "logs.error"
    --   "app.user-service.auth"
    --   "system_events.db.connection_failed"
    --
    -- Rejected:
    --   ""             empty
    --   ".logs.error"  leading dot
    --   "logs.error."  trailing dot
    --   "logs..error"  consecutive dots
    --   "logs.error!"  character outside the allowed set
    --   "logs error"   space
    --   "logs.*"       wildcards belong in patterns, not routing keys

    IF coalesce(routing_key, '') = '' THEN
        RAISE EXCEPTION 'routing_key cannot be NULL or empty';
    END IF;

    IF key_length > 255 THEN
        RAISE EXCEPTION 'routing_key length cannot exceed 255 characters, got % characters', key_length;
    END IF;

    IF NOT (routing_key ~ '^[a-zA-Z0-9._-]+$') THEN
        RAISE EXCEPTION 'routing_key contains invalid characters. Only alphanumeric, dots, hyphens, and underscores are allowed. Got: %', routing_key;
    END IF;

    IF left(routing_key, 1) = '.' THEN
        RAISE EXCEPTION 'routing_key cannot start with a dot. Got: %', routing_key;
    END IF;

    IF right(routing_key, 1) = '.' THEN
        RAISE EXCEPTION 'routing_key cannot end with a dot. Got: %', routing_key;
    END IF;

    IF strpos(routing_key, '..') > 0 THEN
        RAISE EXCEPTION 'routing_key cannot contain consecutive dots. Got: %', routing_key;
    END IF;

    RETURN true;
END;
$$;
-- validate_topic_pattern
-- Returns true when pattern is a well-formed topic binding pattern; otherwise
-- raises an exception describing the first rule that failed. Rules, in the
-- order checked:
--   1. not NULL and not empty
--   2. at most 255 characters
--   3. only letters, digits, '.', '_', '-', '*' and '#'
--   4. no leading dot, no trailing dot, no two dots in a row
--   5. no '**', no '##', no '*#' and no '#*'
CREATE OR REPLACE FUNCTION pgmq.validate_topic_pattern(pattern text)
    RETURNS boolean
    LANGUAGE plpgsql
    IMMUTABLE
AS
$$
DECLARE
    pattern_length integer := length(pattern);
BEGIN
    -- Wildcards: '*' stands for one segment, '#' for zero or more segments.
    --
    -- Accepted:
    --   "logs.*"   e.g. logs.error, logs.info
    --   "logs.#"   e.g. logs.error, logs.api.error
    --   "*.error"  e.g. app.error, db.error
    --   "#.error"  e.g. app.error, x.y.error
    --   "app.*.#"  both wildcards, separated by dots
    --   "#"        catch-all
    --
    -- Rejected:
    --   ".logs.*"      leading dot
    --   "logs.*."      trailing dot
    --   "logs..error"  consecutive dots
    --   "logs.**"      consecutive stars
    --   "logs.##"      consecutive hashes
    --   "logs.*#"      adjacent wildcards
    --   "logs.error!"  character outside the allowed set

    IF coalesce(pattern, '') = '' THEN
        RAISE EXCEPTION 'pattern cannot be NULL or empty';
    END IF;

    IF pattern_length > 255 THEN
        RAISE EXCEPTION 'pattern length cannot exceed 255 characters, got % characters', pattern_length;
    END IF;

    IF NOT (pattern ~ '^[a-zA-Z0-9._\-*#]+$') THEN
        RAISE EXCEPTION 'pattern contains invalid characters. Only alphanumeric, dots, hyphens, underscores, *, and # are allowed. Got: %', pattern;
    END IF;

    IF left(pattern, 1) = '.' THEN
        RAISE EXCEPTION 'pattern cannot start with a dot. Got: %', pattern;
    END IF;

    IF right(pattern, 1) = '.' THEN
        RAISE EXCEPTION 'pattern cannot end with a dot. Got: %', pattern;
    END IF;

    IF strpos(pattern, '..') > 0 THEN
        RAISE EXCEPTION 'pattern cannot contain consecutive dots. Got: %', pattern;
    END IF;

    IF strpos(pattern, '**') > 0 THEN
        RAISE EXCEPTION 'pattern cannot contain consecutive stars (**). Use # for multi-segment matching. Got: %', pattern;
    END IF;

    IF strpos(pattern, '##') > 0 THEN
        RAISE EXCEPTION 'pattern cannot contain consecutive hashes (##). A single # already matches zero or more segments. Got: %', pattern;
    END IF;

    IF strpos(pattern, '*#') > 0 OR strpos(pattern, '#*') > 0 THEN
        RAISE EXCEPTION 'pattern cannot contain adjacent wildcards (*# or #*). Separate wildcards with dots. Got: %', pattern;
    END IF;

    RETURN true;
END;
$$;
-- bind_topic
-- Binds a topic pattern to a queue by inserting a row into
-- pgmq.topic_bindings. An already existing (pattern, queue) binding is left
-- as is.
--   pattern:    topic pattern; checked by pgmq.validate_topic_pattern.
--   queue_name: target queue; must be non-empty and present in pgmq.meta.
CREATE OR REPLACE FUNCTION pgmq.bind_topic(pattern text, queue_name text)
    RETURNS void
    LANGUAGE plpgsql
AS
$$
BEGIN
    PERFORM pgmq.validate_topic_pattern(pattern);

    IF coalesce(queue_name, '') = '' THEN
        RAISE EXCEPTION 'queue_name cannot be NULL or empty';
    END IF;

    PERFORM 1
    FROM pgmq.meta AS m
    WHERE m.queue_name = bind_topic.queue_name;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'Queue "%" does not exist. Create the queue first using pgmq.create()', queue_name;
    END IF;

    INSERT INTO pgmq.topic_bindings (pattern, queue_name)
    VALUES (bind_topic.pattern, bind_topic.queue_name)
    ON CONFLICT ON CONSTRAINT topic_bindings_unique_pattern_queue DO NOTHING;
END;
$$;
-- unbind_topic
-- Removes the binding between a pattern and a queue from pgmq.topic_bindings.
-- Returns true when a binding was deleted and false when none matched.
-- Raises when either argument is NULL or empty.
CREATE OR REPLACE FUNCTION pgmq.unbind_topic(pattern text, queue_name text)
    RETURNS boolean
    LANGUAGE plpgsql
AS
$$
BEGIN
    IF coalesce(pattern, '') = '' THEN
        RAISE EXCEPTION 'pattern cannot be NULL or empty';
    END IF;

    IF coalesce(queue_name, '') = '' THEN
        RAISE EXCEPTION 'queue_name cannot be NULL or empty';
    END IF;

    DELETE FROM pgmq.topic_bindings AS tb
    WHERE tb.pattern = unbind_topic.pattern
      AND tb.queue_name = unbind_topic.queue_name;

    -- FOUND is true exactly when the DELETE removed at least one row
    RETURN FOUND;
END;
$$;
-- test_routing
-- Dry run for topic routing: returns the bindings whose compiled_regex
-- matches routing_key, ordered by pattern. Sends nothing.
-- Raises (via pgmq.validate_routing_key) when routing_key is malformed.
CREATE OR REPLACE FUNCTION pgmq.test_routing(routing_key text)
    RETURNS TABLE
            (
                pattern        text,
                queue_name     text,
                compiled_regex text
            )
    LANGUAGE plpgsql
    STABLE
AS
$$
BEGIN
    PERFORM pgmq.validate_routing_key(routing_key);

    RETURN QUERY
        SELECT
            tb.pattern,
            tb.queue_name,
            tb.compiled_regex
        FROM pgmq.topic_bindings AS tb
        WHERE test_routing.routing_key ~ tb.compiled_regex
        ORDER BY tb.pattern;
END;
$$;
-- send_topic (routing_key, msg, headers, delay)
-- Sends msg to every queue that has at least one binding whose compiled_regex
-- matches routing_key, calling pgmq.send once per distinct queue.
--   routing_key: checked by pgmq.validate_routing_key.
--   msg:         message body; must not be NULL.
--   headers:     passed through to pgmq.send.
--   delay:       passed through to pgmq.send; must not be negative.
-- Returns the number of queues the message was sent to.
CREATE OR REPLACE FUNCTION pgmq.send_topic(routing_key text, msg jsonb, headers jsonb, delay integer)
    RETURNS integer
    LANGUAGE plpgsql
    VOLATILE
AS
$$
DECLARE
    target_queue text;
    delivered    integer := 0;
BEGIN
    PERFORM pgmq.validate_routing_key(routing_key);

    IF msg IS NULL THEN
        RAISE EXCEPTION 'msg cannot be NULL';
    END IF;

    IF delay < 0 THEN
        RAISE EXCEPTION 'delay cannot be negative, got: %', delay;
    END IF;

    -- Matching is done in SQL; DISTINCT collapses several matching bindings
    -- of one queue into a single send, and ORDER BY fixes the send order.
    -- An error in any send propagates to the caller.
    FOR target_queue IN
        SELECT DISTINCT tb.queue_name
        FROM pgmq.topic_bindings AS tb
        WHERE send_topic.routing_key ~ tb.compiled_regex
        ORDER BY tb.queue_name
    LOOP
        PERFORM pgmq.send(target_queue, msg, headers, delay);
        delivered := delivered + 1;
    END LOOP;

    RETURN delivered;
END;
$$;
-- send_topic (routing_key, msg)
-- Convenience overload: forwards to the four-argument send_topic with NULL
-- headers and a delay of 0. Returns that call's result.
CREATE OR REPLACE FUNCTION pgmq.send_topic(routing_key text, msg jsonb)
    RETURNS integer
    LANGUAGE plpgsql
    VOLATILE
AS
$$
DECLARE
    delivered integer;
BEGIN
    delivered := pgmq.send_topic(routing_key, msg, NULL::jsonb, 0);
    RETURN delivered;
END;
$$;
-- send_topic (routing_key, msg, delay)
-- Convenience overload: forwards to the four-argument send_topic with NULL
-- headers and the given delay. Returns that call's result.
CREATE OR REPLACE FUNCTION pgmq.send_topic(routing_key text, msg jsonb, delay integer)
    RETURNS integer
    LANGUAGE plpgsql
    VOLATILE
AS
$$
DECLARE
    delivered integer;
BEGIN
    delivered := pgmq.send_topic(routing_key, msg, NULL::jsonb, delay);
    RETURN delivered;
END;
$$;
-- list_topic_bindings ()
-- Returns every row of pgmq.topic_bindings, newest binding first; ties are
-- ordered by pattern and then queue name.
CREATE OR REPLACE FUNCTION pgmq.list_topic_bindings()
    RETURNS TABLE
            (
                pattern        text,
                queue_name     text,
                bound_at       TIMESTAMP WITH TIME ZONE,
                compiled_regex text
            )
    LANGUAGE sql
    STABLE
AS
$$
SELECT
    tb.pattern,
    tb.queue_name,
    tb.bound_at,
    tb.compiled_regex
FROM pgmq.topic_bindings AS tb
ORDER BY tb.bound_at DESC, tb.pattern, tb.queue_name;
$$;
-- list_topic_bindings (queue_name)
-- Returns the bindings of a single queue, newest binding first; ties are
-- ordered by pattern.
CREATE OR REPLACE FUNCTION pgmq.list_topic_bindings(queue_name text)
    RETURNS TABLE
            (
                pattern        text,
                queue_name     text,
                bound_at       TIMESTAMP WITH TIME ZONE,
                compiled_regex text
            )
    LANGUAGE sql
    STABLE
AS
$$
SELECT
    tb.pattern,
    tb.queue_name,
    tb.bound_at,
    tb.compiled_regex
FROM pgmq.topic_bindings AS tb
-- the function-qualified name refers to the input parameter
WHERE tb.queue_name = list_topic_bindings.queue_name
ORDER BY tb.bound_at DESC, tb.pattern;
$$;
-- send_batch_topic: Base implementation with TIMESTAMP WITH TIME ZONE delay
-- Sends the whole batch to every queue that has at least one binding whose
-- compiled_regex matches routing_key.
--   routing_key: checked by pgmq.validate_routing_key.
--   msgs/headers: checked once by pgmq._validate_batch_params, then passed to
--                 pgmq._send_batch for each matching queue.
--   delay:        passed through to pgmq._send_batch.
-- Returns one (queue_name, msg_id) row per message id produced by
-- pgmq._send_batch, grouped by queue in queue-name order.
CREATE OR REPLACE FUNCTION pgmq.send_batch_topic(
    routing_key text,
    msgs jsonb[],
    headers jsonb[],
    delay TIMESTAMP WITH TIME ZONE
)
    RETURNS TABLE(queue_name text, msg_id bigint)
    LANGUAGE plpgsql
    VOLATILE
AS
$$
DECLARE
    target_queue text;
BEGIN
    PERFORM pgmq.validate_routing_key(routing_key);

    -- Batch arguments are validated a single time, not once per queue
    PERFORM pgmq._validate_batch_params(msgs, headers);

    -- DISTINCT collapses several matching bindings of one queue into a single
    -- send; ORDER BY fixes the send order. An error in any send propagates to
    -- the caller.
    FOR target_queue IN
        SELECT DISTINCT tb.queue_name
        FROM pgmq.topic_bindings AS tb
        WHERE send_batch_topic.routing_key ~ tb.compiled_regex
        ORDER BY tb.queue_name
    LOOP
        -- The private _send_batch is called directly so validation is not repeated
        RETURN QUERY
            SELECT target_queue, sent.msg_id
            FROM pgmq._send_batch(target_queue, msgs, headers, delay) AS sent(msg_id);
    END LOOP;

    RETURN;
END;
$$;
-- send_batch_topic: 2 args (routing_key, msgs)
-- Forwards to the base implementation with NULL headers and
-- clock_timestamp() as the delay.
CREATE OR REPLACE FUNCTION pgmq.send_batch_topic(
    routing_key text,
    msgs jsonb[]
)
    RETURNS TABLE(queue_name text, msg_id bigint)
    LANGUAGE sql
    VOLATILE
AS
$$
SELECT
    sent.queue_name,
    sent.msg_id
FROM pgmq.send_batch_topic(routing_key, msgs, NULL::jsonb[], clock_timestamp()) AS sent;
$$;
-- send_batch_topic: 3 args with headers
-- Forwards to the base implementation with the given headers and
-- clock_timestamp() as the delay.
CREATE OR REPLACE FUNCTION pgmq.send_batch_topic(
    routing_key text,
    msgs jsonb[],
    headers jsonb[]
)
    RETURNS TABLE(queue_name text, msg_id bigint)
    LANGUAGE sql
    VOLATILE
AS
$$
SELECT
    sent.queue_name,
    sent.msg_id
FROM pgmq.send_batch_topic(routing_key, msgs, headers, clock_timestamp()) AS sent;
$$;
-- send_batch_topic: 3 args with integer delay
-- Forwards to the base implementation with NULL headers; the integer delay is
-- interpreted as seconds and added to clock_timestamp().
CREATE OR REPLACE FUNCTION pgmq.send_batch_topic(
    routing_key text,
    msgs jsonb[],
    delay integer
)
    RETURNS TABLE(queue_name text, msg_id bigint)
    LANGUAGE sql
    VOLATILE
AS
$$
SELECT
    sent.queue_name,
    sent.msg_id
FROM pgmq.send_batch_topic(
    routing_key,
    msgs,
    NULL::jsonb[],
    clock_timestamp() + make_interval(secs => delay)
) AS sent;
$$;
-- send_batch_topic: 3 args with timestamp delay
-- Forwards to the base implementation with NULL headers and the given
-- timestamp as the delay.
CREATE OR REPLACE FUNCTION pgmq.send_batch_topic(
    routing_key text,
    msgs jsonb[],
    delay TIMESTAMP WITH TIME ZONE
)
    RETURNS TABLE(queue_name text, msg_id bigint)
    LANGUAGE sql
    VOLATILE
AS
$$
SELECT
    sent.queue_name,
    sent.msg_id
FROM pgmq.send_batch_topic(routing_key, msgs, NULL::jsonb[], delay) AS sent;
$$;
-- send_batch_topic: 4 args with integer delay
-- Forwards to the base implementation with the given headers; the integer
-- delay is interpreted as seconds and added to clock_timestamp().
CREATE OR REPLACE FUNCTION pgmq.send_batch_topic(
    routing_key text,
    msgs jsonb[],
    headers jsonb[],
    delay integer
)
    RETURNS TABLE(queue_name text, msg_id bigint)
    LANGUAGE sql
    VOLATILE
AS
$$
SELECT
    sent.queue_name,
    sent.msg_id
FROM pgmq.send_batch_topic(
    routing_key,
    msgs,
    headers,
    clock_timestamp() + make_interval(secs => delay)
) AS sent;
$$;