defmodule Pgmq do
@moduledoc """
Thin wrapper over the pgmq extension
Provides APIs for sending, reading, archiving and deleting messages.
### Use-macros
You can `use Pgmq` for the convenience of having a standardized repo and less
convoluted function calls. By defining:
```
# lib/my_app/pgmq.ex
defmodule MyApp.Pgmq do
use Pgmq, repo: MyApp.Repo
end
```
You can then call `MyApp.Pgmq.send_message("myqueue", "hello")`, without passing
in the `MyApp.Repo`
"""
alias Pgmq.Message
@typedoc "Queue name"
@type queue :: String.t()
@typedoc "An Ecto repository"
@type repo :: Ecto.Repo.t()
@default_max_poll_seconds 5
@default_poll_interval_ms 250
defmacro __using__(opts) do
repo = Keyword.fetch!(opts, :repo)
quote do
@spec create_queue(Pgmq.queue(), Keyword.t()) :: :ok
def create_queue(queue, opts \\ []), do: Pgmq.create_queue(unquote(repo), queue, opts)
@spec drop_queue(Pgmq.queue()) :: :ok
def drop_queue(queue), do: Pgmq.drop_queue(unquote(repo), queue)
@spec send_message(Pgmq.queue(), binary()) :: {:ok, integer()} | {:error, term()}
def send_message(queue, encoded_message) do
Pgmq.send_message(unquote(repo), queue, encoded_message)
end
@spec send_messages(Pgmq.queue(), [binary()]) :: {:ok, [integer()]} | {:error, term()}
def send_messages(queue, encoded_messages) do
Pgmq.send_messages(unquote(repo), queue, encoded_messages)
end
@spec read_message(Pgmq.queue(), integer()) :: Pgmq.Message.t() | nil
def read_message(queue, visibility_timeout_seconds) do
Pgmq.read_message(unquote(repo), queue, visibility_timeout_seconds)
end
@spec read_messages(
Pgmq.queue(),
visibility_timeout_seconds :: integer(),
count :: integer()
) :: [Pgmq.Message.t()]
def read_messages(queue, visibility_timeout_seconds, count) do
Pgmq.read_messages(unquote(repo), queue, visibility_timeout_seconds, count)
end
@spec read_messages_with_poll(
Pgmq.queue(),
visibility_timeout_seconds :: integer(),
count :: integer(),
opts :: Keyword.t()
) :: [Pgmq.Message.t()]
def read_messages_with_poll(
queue,
count,
visibility_timeout_seconds,
opts \\ []
) do
Pgmq.read_messages_with_poll(
unquote(repo),
queue,
visibility_timeout_seconds,
opts
)
end
@spec archive_messages(Pgmq.queue(), messages :: [Pgmq.Message.t()] | [integer()]) :: :ok
def archive_messages(queue, messages) do
Pgmq.archive_messages(unquote(repo), queue, messages)
end
@spec delete_messages(Pgmq.queue(), messages :: [Pgmq.Message.t()] | [integer()]) :: :ok
def delete_messages(queue, messages) do
Pgmq.delete_messages(unquote(repo), queue, messages)
end
@doc """
Returns a list of queue names
"""
@spec list_queues() :: [
%{
queue_name: String.t(),
is_partitioned: boolean(),
is_unlogged: boolean(),
created_at: DateTime.t()
}
]
def list_queues() do
Pgmq.list_queues(unquote(repo))
end
@doc """
Sets the visibility timeout of a message for X seconds from now
Accepts either a message or a message id.
"""
@spec set_message_vt(Pgmq.queue(), Pgmq.Message.t() | integer(), integer()) :: :ok
def set_message_vt(queue, message, vt) do
Pgmq.set_message_vt(unquote(repo), queue, message, vt)
end
@doc """
Reads a message and instantly deletes it from the queue
If there are no messages in the queue, returns `nil`.
"""
@spec pop_message(Pgmq.queue()) :: Pgmq.Message.t() | nil
def pop_message(queue) do
Pgmq.pop_message(unquote(repo), queue)
end
end
end
@doc """
Creates a queue in the database
Notice that the queue name must:
- have less than 48 characters
- start with a letter
- have only letters, numbers, and `_`
Accepts the following options:
- `:unlogged`: Boolean indicating if the queue should be unlogged. Unlogged
queues are faster to write to, but data may be lost in database crashes or
unclean exits. Can't be used together with `:partitioned`.
- `:partitioned`: indicates if the queue is partitioned. Defaults to `false`. Requires
`pg_partman` extension.
- `:partition_interval:` interval to partition the queue, required if `:partitioned`
is true.
- `:retention_interval:` interval for partition retention, required if `:partitioned`
is true.
"""
@spec create_queue(repo, queue, opts :: Keyword.t()) :: :ok | {:error, atom}
def create_queue(repo, queue, opts \\ []) do
if Keyword.get(opts, :partitioned, false) do
if Keyword.get(opts, :unlogged), do: raise("Partitioned queues can't be unlogged")
partition_interval = Keyword.fetch!(opts, :partition_interval)
retention_interval = Keyword.fetch!(opts, :retention_interval)
repo.query!("SELECT FROM pgmq.create_partitioned($1, $2, $3)", [
queue,
partition_interval,
retention_interval
])
else
if Keyword.get(opts, :unlogged) do
%Postgrex.Result{num_rows: 1} =
repo.query!("SELECT FROM pgmq.create_unlogged($1)", [queue])
else
%Postgrex.Result{num_rows: 1} = repo.query!("SELECT FROM pgmq.create($1)", [queue])
end
end
:ok
end
@doc """
Deletes a queue from the database
"""
@spec drop_queue(repo, queue) :: :ok | {:error, atom}
def drop_queue(repo, queue) do
%Postgrex.Result{num_rows: 1} = repo.query!("SELECT FROM pgmq.drop_queue($1)", [queue])
:ok
end
@doc """
Sends one message to a queue
"""
@spec send_message(repo, queue, encoded_message :: binary) ::
{:ok, Message.t()} | {:error, term}
def send_message(repo, queue, encoded_message) do
case repo.query!("SELECT * FROM pgmq.send($1, $2)", [queue, encoded_message]) do
%Postgrex.Result{rows: [[message_id]]} -> {:ok, message_id}
result -> {:error, {:sending_error, result}}
end
end
@doc """
Sends a message batch to a queue
"""
@spec send_messages(repo, queue, encoded_messages :: [binary]) ::
{:ok, Message.t()} | {:error, term}
def send_messages(repo, queue, encoded_messages) do
case repo.query!("SELECT * FROM pgmq.send_batch($1, $2)", [queue, encoded_messages]) do
%Postgrex.Result{rows: message_ids} -> {:ok, List.flatten(message_ids)}
result -> {:error, {:sending_error, result}}
end
end
@doc """
Reads one message from a queue
Returns immediately. If there are no messages in the queue, returns `nil`.
Messages read through this function are guaranteed not to be read by
other calls for `visibility_timeout_seconds`.
"""
@spec read_message(repo, queue, visibility_timeout_seconds :: integer) :: Message.t() | nil
def read_message(repo, queue, visibility_timeout_seconds) do
%Postgrex.Result{rows: rows} =
repo.query!("SELECT * FROM pgmq.read($1, $2, 1)", [queue, visibility_timeout_seconds])
case rows do
[] -> nil
[row] -> Message.from_row(row)
end
end
@doc """
Reads a batch of messages from a queue
Messages read through this function are guaranteed not to be read by
other calls for `visibility_timeout_seconds`.
"""
@spec read_messages(repo, queue, visibility_timeout_seconds :: integer, count :: integer) :: [
Message.t()
]
def read_messages(repo, queue, visibility_timeout_seconds, count) do
%Postgrex.Result{rows: rows} =
repo.query!("SELECT * FROM pgmq.read($1, $2, $3)", [
queue,
visibility_timeout_seconds,
count
])
Enum.map(rows, &Message.from_row/1)
end
@doc """
Reads a batch of messages from a queue, but waits if no messages are available
Accepts two options:
- `:max_poll_seconds`: the maximum duration of the poll. Defaults to 5.
- `:poll_interval_ms`: dictates how often the poll is made database
side. Defaults to 250. Can be tuned for lower latency or less database load,
depending on your needs.
When there are messages available in the queue, returns immediately.
Otherwise, blocks until at least one message is available, or `max_poll_seconds`
is reached.
Notice that this function may put significant burden on the connection pool,
as it may hold the connection for several seconds if there's no activity in
the queue.
Messages read through this function are guaranteed not to be read by
other calls for `visibility_timeout_seconds`.
"""
@spec read_messages_with_poll(
repo,
queue,
visibility_timeout_seconds :: integer,
count :: integer,
opts :: Keyword.t()
) :: [Message.t()]
def read_messages_with_poll(
repo,
queue,
visibility_timeout_seconds,
count,
opts \\ []
) do
max_poll_seconds = Keyword.get(opts, :max_poll_seconds, @default_max_poll_seconds)
poll_interval_ms = Keyword.get(opts, :poll_interval_ms, @default_poll_interval_ms)
%Postgrex.Result{rows: rows} =
repo.query!("SELECT * FROM pgmq.read_with_poll($1, $2, $3, $4, $5)", [
queue,
visibility_timeout_seconds,
count,
max_poll_seconds,
poll_interval_ms
])
Enum.map(rows, &Message.from_row/1)
end
@doc """
Archives list of messages, removing them from the queue and putting
them into the archive
This function can receive a list of either `Message.t()` or message ids. Mixed
lists aren't allowed.
"""
@spec archive_messages(repo, queue, [message_id :: integer] | [message :: Message.t()]) :: :ok
def archive_messages(repo, queue, [%Message{} | _] = messages) do
message_ids = Enum.map(messages, fn m -> m.id end)
archive_messages(repo, queue, message_ids)
end
def archive_messages(repo, queue, [message_id]) do
%Postgrex.Result{rows: [[true]]} =
repo.query!("SELECT * FROM pgmq.archive($1, $2::bigint)", [queue, message_id])
:ok
end
def archive_messages(repo, queue, message_ids) do
%Postgrex.Result{} =
repo.query!("SELECT * FROM pgmq.archive($1, $2::bigint[])", [queue, message_ids])
:ok
end
@doc """
Deletes a batch of messages, removing them from the queue
This function can receive a list of either `Message.t()` or message ids. Mixed
lists aren't allowed.
"""
@spec delete_messages(repo, queue, [message_id :: integer] | [Message.t()]) :: :ok
def delete_messages(repo, queue, [%Message{} | _] = messages) do
message_ids = Enum.map(messages, fn m -> m.id end)
delete_messages(repo, queue, message_ids)
end
def delete_messages(repo, queue, [message_id]) do
%Postgrex.Result{rows: [[true]]} =
repo.query!("SELECT * FROM pgmq.delete($1::text, $2::bigint)", [queue, message_id])
:ok
end
def delete_messages(repo, queue, message_ids) do
%Postgrex.Result{} =
repo.query!("SELECT * FROM pgmq.delete($1::text, $2::bigint[])", [queue, message_ids])
:ok
end
@doc """
Returns a list of queues
"""
@spec list_queues(repo) :: [
%{
queue_name: String.t(),
is_partitioned: boolean(),
is_unlogged: boolean(),
created_at: DateTime.t()
}
]
def list_queues(repo) do
%Postgrex.Result{
columns: ["queue_name", "is_partitioned", "is_unlogged", "created_at"],
rows: queues
} = repo.query!("SELECT * FROM pgmq.list_queues()", [])
Enum.map(queues, fn [queue_name, is_partitioned, is_unlogged, created_at] ->
%{
queue_name: queue_name,
is_partitioned: is_partitioned,
is_unlogged: is_unlogged,
created_at: created_at
}
end)
end
@doc """
Returns a list of queues with stats
"""
@spec get_metrics_all(repo) :: [
%{
queue_name: String.t(),
queue_length: pos_integer(),
newest_msg_age_sec: pos_integer() | nil,
oldest_msg_age_sec: pos_integer() | nil,
total_messages: pos_integer(),
scrape_time: DateTime.t()
}
]
def get_metrics_all(repo) do
%Postgrex.Result{rows: queues} = repo.query!("SELECT * FROM pgmq.metrics_all()", [])
Enum.map(queues, fn [
queue_name,
queue_length,
newest_msg_age_sec,
oldest_msg_age_sec,
total_messages,
scrape_time
] ->
%{
queue_name: queue_name,
queue_length: queue_length,
newest_msg_age_sec: newest_msg_age_sec,
oldest_msg_age_sec: oldest_msg_age_sec,
total_messages: total_messages,
scrape_time: scrape_time
}
end)
end
@doc """
Returns metrics for a single queue
"""
@spec get_metrics(repo, queue) :: [
%{
queue_name: String.t(),
queue_length: pos_integer(),
newest_msg_age_sec: pos_integer() | nil,
oldest_msg_age_sec: pos_integer() | nil,
total_messages: pos_integer(),
scrape_time: DateTime.t()
}
]
def get_metrics(repo, queue) do
%Postgrex.Result{rows: [result]} = repo.query!("SELECT * FROM pgmq.metrics($1)", [queue])
[
queue_name,
queue_length,
newest_msg_age_sec,
oldest_msg_age_sec,
total_messages,
scrape_time
] = result
%{
queue_name: queue_name,
queue_length: queue_length,
newest_msg_age_sec: newest_msg_age_sec,
oldest_msg_age_sec: oldest_msg_age_sec,
total_messages: total_messages,
scrape_time: scrape_time
}
end
@doc """
Sets the visibility timeout of a message for X seconds from now
"""
@spec set_message_vt(repo, queue, Message.t() | integer(), visibility_timeout :: integer()) ::
:ok
def set_message_vt(repo, queue, %Message{id: message_id}, visibility_timeout) do
set_message_vt(repo, queue, message_id, visibility_timeout)
end
def set_message_vt(repo, queue, message_id, visibility_timeout) do
%Postgrex.Result{rows: [_]} =
repo.query!("SELECT * FROM pgmq.set_vt($1, $2, $3)", [queue, message_id, visibility_timeout])
:ok
end
@doc """
Reads a message and instantly deletes it from the queue
If there are no messages in the queue, returns `nil`.
"""
@spec pop_message(repo, queue) :: Message.t() | nil
def pop_message(repo, queue) do
case repo.query!("SELECT * FROM pgmq.pop($1)", [queue]) do
%Postgrex.Result{rows: [columns]} ->
Message.from_row(columns)
%Postgrex.Result{rows: []} ->
nil
end
end
end