defmodule Redix.PubSub do
@moduledoc """
Interface for the Redis pub/sub functionality.
The rest of this documentation will assume the reader knows how pub/sub works
in Redis and knows the meaning of the following Redis commands:
* `SUBSCRIBE` and `UNSUBSCRIBE`
* `PSUBSCRIBE` and `PUNSUBSCRIBE`
* `PUBLISH`
## Usage
Each `Redix.PubSub` process is able to subscribe to/unsubscribe from multiple
Redis channels/patterns, and is able to handle multiple Elixir processes subscribing
each to different channels/patterns.
A `Redix.PubSub` process can be started via `Redix.PubSub.start_link/2`; such
a process holds a single TCP (or SSL) connection to the Redis server.
`Redix.PubSub` has a message-oriented API. Subscribe operations are synchronous and return
a reference that can then be used to match on all messages sent by the `Redix.PubSub` process.
When `Redix.PubSub` registers a subscriptions, the subscriber process will receive a
confirmation message:
{:ok, pubsub} = Redix.PubSub.start_link()
{:ok, ref} = Redix.PubSub.subscribe(pubsub, "my_channel", self())
receive do message -> message end
#=> {:redix_pubsub, ^pubsub, ^ref, :subscribed, %{channel: "my_channel"}}
When the `:subscribed` message is received, it's guaranteed that the `Redix.PubSub` process has
subscribed to the given channel. This means that after a subscription, messages published to
a channel are delivered to all Elixir processes subscribed to that channel via `Redix.PubSub`:
# Someone publishes "hello" on "my_channel"
receive do message -> message end
#=> {:redix_pubsub, ^pubsub, ^ref, :message, %{channel: "my_channel", payload: "hello"}}
It's advised to wait for the subscription confirmation for a channel before doing any
other operation involving that channel.
Note that unsubscription confirmations are delivered right away even if the `Redix.PubSub`
process is still subscribed to the given channel: this is by design, as once a process
is unsubscribed from a channel it won't receive messages anyways, even if the `Redix.PubSub`
process still receives them.
Messages are also delivered as a confirmation of an unsubscription as well as when the
`Redix.PubSub` connection goes down. See the "Messages" section below.
## Messages
Most of the communication with a PubSub connection is done via (Elixir) messages: the
subscribers of these messages will be the processes specified at subscription time (in
`subscribe/3` or `psubscribe/3`). All `Redix.PubSub` messages have the same form: they're a
five-element tuple that looks like this:
{:redix_pubsub, pubsub_pid, subscription_ref, message_type, message_properties}
where:
* `pubsub_pid` is the pid of the `Redix.PubSub` process that sent this message.
* `subscription_ref` is the reference returned by `subscribe/3` or `psubscribe/3`.
* `message_type` is the type of this message, such as `:subscribed` for subscription
confirmations, `:message` for pub/sub messages, and so on.
* `message_properties` is a map of data related to that that varies based on `message_type`.
Given this format, it's easy to match on all Redix pub/sub messages for a subscription
as `{:redix_pubsub, _, ^subscription_ref, _, _}`.
### List of possible message types and properties
The following is a comprehensive list of possible message types alongside the properties
that each can have.
* `:subscribe` - sent as confirmation of subscription to a channel (via `subscribe/3` or
after a disconnection and reconnection). One `:subscribe` message is received for every
channel a process subscribed to. `:subscribe` messages have the following properties:
* `:channel` - the channel the process has been subscribed to.
* `:psubscribe` - sent as confirmation of subscription to a pattern (via `psubscribe/3` or
after a disconnection and reconnection). One `:psubscribe` message is received for every
pattern a process subscribed to. `:psubscribe` messages have the following properties:
* `:pattern` - the pattern the process has been subscribed to.
* `:unsubscribe` - sent as confirmation of unsubscription from a channel (via
`unsubscribe/3`). `:unsubscribe` messages are received for every channel a
process unsubscribes from. `:unsubscribe` messages havethe following properties:
* `:channel` - the channel the process has unsubscribed from.
* `:punsubscribe` - sent as confirmation of unsubscription from a pattern (via
`unsubscribe/3`). `:unsubscribe` messages are received for every pattern a
process unsubscribes from. `:unsubscribe` messages havethe following properties:
* `:pattern` - the pattern the process has unsubscribed from.
* `:message` - sent to subscribers to a given channel when a message is published on
that channel. `:message` messages have the following properties:
* `:channel` - the channel the message was published on
* `:payload` - the contents of the message
* `:pmessage` - sent to subscribers to a given pattern when a message is published on
a channel that matches that pattern. `:pmessage` messages have the following properties:
* `:channel` - the channel the message was published on
* `:pattern` - the original pattern that matched the channel
* `:payload` - the contents of the message
* `:disconnected` messages - sent to all subscribers to all channels/patterns when the
connection to Redis is interrupted. `:disconnected` messages have the following properties:
* `:error` - the reason for the disconnection, a `Redix.ConnectionError`
exception struct (that can be raised or turned into a message through
`Exception.message/1`).
## Reconnections
`Redix.PubSub` tries to be resilient to failures: when the connection with
Redis is interrupted (for whatever reason), it will try to reconnect to the
Redis server. When a disconnection happens, `Redix.PubSub` will notify all
clients subscribed to all channels with a `{:redix_pubsub, pid, subscription_ref, :disconnected,
_}` message (more on the format of messages above). When the connection goes
back up, `Redix.PubSub` takes care of actually re-subscribing to the
appropriate channels on the Redis server and subscribers are notified with a
`{:redix_pubsub, pid, subscription_ref, :subscribed | :psubscribed, _}` message, the same as
when a client subscribes to a channel/pattern.
Note that if `exit_on_disconnection: true` is passed to
`Redix.PubSub.start_link/2`, the `Redix.PubSub` process will exit and not send
any `:disconnected` messages to subscribed clients.
## Sentinel support
Works exactly the same as for normal `Redix` connections. See the documentation for `Redix`
for more information.
## Examples
This is an example of a workflow using the PubSub functionality; it uses
[Redix](https://github.com/whatyouhide/redix) as a Redis client for publishing
messages.
{:ok, pubsub} = Redix.PubSub.start_link()
{:ok, client} = Redix.start_link()
Redix.PubSub.subscribe(pubsub, "my_channel", self())
#=> {:ok, ref}
# We wait for the subscription confirmation
receive do
{:redix_pubsub, ^pubsub, ^ref, :subscribed, %{channel: "my_channel"}} -> :ok
end
Redix.command!(client, ~w(PUBLISH my_channel hello)
receive do
{:redix_pubsub, ^pubsub, ^ref, :message, %{channel: "my_channel"} = properties} ->
properties.payload
end
#=> "hello"
Redix.PubSub.unsubscribe(pubsub, "foo", self())
#=> :ok
# We wait for the unsubscription confirmation
receive do
{:redix_pubsub, ^pubsub, ^ref, :unsubscribed, _} -> :ok
end
"""
@type subscriber() :: pid() | port() | atom() | {atom(), node()}
@type connection() :: GenServer.server()
alias Redix.StartOptions
@doc """
Starts a pub/sub connection to Redis.
This function returns `{:ok, pid}` if the PubSub process is started successfully.
The actual TCP/SSL connection to the Redis server may happen either synchronously,
before `start_link/2` returns, or asynchronously: this behaviour is decided by
the `:sync_connect` option (see below).
This function accepts one argument, either a Redis URI as a string or a list of options.
## Redis URI
In case `uri_or_opts` is a Redis URI, it must be in the form:
redis://[:password@]host[:port][/db]
Here are some examples of valid URIs:
redis://localhost
redis://:secret@localhost:6397
redis://username:secret@localhost:6397
redis://example.com:6380/1
The only mandatory thing when using URIs is the host. All other elements are optional
and their default value can be found in the "Options" section below.
In earlier versions of Redix, the username in the URI was ignored. Redis 6 introduced [ACL
support](https://redis.io/topics/acl). Now, Redix supports usernames as well.
## Options
The following options can be used to specify the connection:
#{StartOptions.options_docs()}
## Examples
iex> Redix.PubSub.start_link()
{:ok, #PID<...>}
iex> Redix.PubSub.start_link(host: "example.com", port: 9999, password: "secret")
{:ok, #PID<...>}
iex> Redix.PubSub.start_link([database: 3], [name: :redix_3])
{:ok, #PID<...>}
"""
@spec start_link(String.t() | keyword()) :: {:ok, pid()} | :ignore | {:error, term()}
def start_link(uri_or_opts \\ [])
def start_link(uri) when is_binary(uri) do
uri |> Redix.URI.to_start_options() |> start_link()
end
def start_link(opts) when is_list(opts) do
opts = StartOptions.sanitize(opts)
{gen_statem_opts, opts} = Keyword.split(opts, [:hibernate_after, :debug, :spawn_opt])
case Keyword.fetch(opts, :name) do
:error ->
:gen_statem.start_link(Redix.PubSub.Connection, opts, gen_statem_opts)
{:ok, atom} when is_atom(atom) ->
:gen_statem.start_link({:local, atom}, Redix.PubSub.Connection, opts, gen_statem_opts)
{:ok, {:global, _term} = tuple} ->
:gen_statem.start_link(tuple, Redix.PubSub.Connection, opts, gen_statem_opts)
{:ok, {:via, via_module, _term} = tuple} when is_atom(via_module) ->
:gen_statem.start_link(tuple, Redix.PubSub.Connection, opts, gen_statem_opts)
{:ok, other} ->
raise ArgumentError, """
expected :name option to be one of the following:
* nil
* atom
* {:global, term}
* {:via, module, term}
Got: #{inspect(other)}
"""
end
end
@doc """
Same as `start_link/1` but using both a Redis URI and a list of options.
In this case, options specified in `opts` have precedence over values specified by `uri`.
For example, if `uri` is `redix://example1.com` but `opts` is `[host: "example2.com"]`, then
`example2.com` will be used as the host when connecting.
"""
@spec start_link(String.t(), keyword()) :: {:ok, pid()} | :ignore | {:error, term()}
def start_link(uri, opts) when is_binary(uri) and is_list(opts) do
uri |> Redix.URI.to_start_options() |> Keyword.merge(opts) |> start_link()
end
@doc """
Stops the given pub/sub process.
This function is synchronous and blocks until the given pub/sub connection
frees all its resources and disconnects from the Redis server. `timeout` can
be passed to limit the amount of time allowed for the connection to exit; if
it doesn't exit in the given interval, this call exits.
## Examples
iex> Redix.PubSub.stop(conn)
:ok
"""
@spec stop(connection()) :: :ok
def stop(conn, timeout \\ :infinity) do
:gen_statem.stop(conn, :normal, timeout)
end
@doc """
Subscribes `subscriber` to the given channel or list of channels.
Subscribes `subscriber` (which can be anything that can be passed to `send/2`)
to `channels`, which can be a single channel or a list of channels.
For each of the channels in `channels` which `subscriber` successfully
subscribes to, a message will be sent to `subscriber` with this form:
{:redix_pubsub, pid, subscription_ref, :subscribed, %{channel: channel}}
See the documentation for `Redix.PubSub` for more information about the format
of messages.
## Examples
iex> Redix.PubSub.subscribe(conn, ["foo", "bar"], self())
{:ok, subscription_ref}
iex> flush()
{:redix_pubsub, ^conn, ^subscription_ref, :subscribed, %{channel: "foo"}}
{:redix_pubsub, ^conn, ^subscription_ref, :subscribed, %{channel: "bar"}}
:ok
"""
@spec subscribe(connection(), String.t() | [String.t()], subscriber) :: {:ok, reference()}
def subscribe(conn, channels, subscriber \\ self())
when is_binary(channels) or is_list(channels) do
:gen_statem.call(conn, {:subscribe, List.wrap(channels), subscriber})
end
@doc """
Subscribes `subscriber` to the given pattern or list of patterns.
Works like `subscribe/3` but subscribing `subscriber` to a pattern (or list of
patterns) instead of regular channels.
Upon successful subscription to each of the `patterns`, a message will be sent
to `subscriber` with the following form:
{:redix_pubsub, pid, ^subscription_ref, :psubscribed, %{pattern: pattern}}
See the documentation for `Redix.PubSub` for more information about the format
of messages.
## Examples
iex> Redix.psubscribe(conn, "ba*", self())
:ok
iex> flush()
{:redix_pubsub, ^conn, ^subscription_ref, :psubscribe, %{pattern: "ba*"}}
:ok
"""
@spec psubscribe(connection(), String.t() | [String.t()], subscriber) :: {:ok, reference}
def psubscribe(conn, patterns, subscriber \\ self())
when is_binary(patterns) or is_list(patterns) do
:gen_statem.call(conn, {:psubscribe, List.wrap(patterns), subscriber})
end
@doc """
Unsubscribes `subscriber` from the given channel or list of channels.
This function basically "undoes" what `subscribe/3` does: it unsubscribes
`subscriber` from the given channel or list of channels.
Upon successful unsubscription from each of the `channels`, a message will be
sent to `subscriber` with the following form:
{:redix_pubsub, pid, ^subscription_ref, :unsubscribed, %{channel: channel}}
See the documentation for `Redix.PubSub` for more information about the format
of messages.
## Examples
iex> Redix.unsubscribe(conn, ["foo", "bar"], self())
:ok
iex> flush()
{:redix_pubsub, ^conn, ^subscription_ref, :unsubscribed, %{channel: "foo"}}
{:redix_pubsub, ^conn, ^subscription_ref, :unsubscribed, %{channel: "bar"}}
:ok
"""
@spec unsubscribe(connection(), String.t() | [String.t()], subscriber) :: :ok
def unsubscribe(conn, channels, subscriber \\ self())
when is_binary(channels) or is_list(channels) do
:gen_statem.call(conn, {:unsubscribe, List.wrap(channels), subscriber})
end
@doc """
Unsubscribes `subscriber` from the given pattern or list of patterns.
This function basically "undoes" what `psubscribe/3` does: it unsubscribes
`subscriber` from the given pattern or list of patterns.
Upon successful unsubscription from each of the `patterns`, a message will be
sent to `subscriber` with the following form:
{:redix_pubsub, pid, ^subscription_ref, :punsubscribed, %{pattern: pattern}}
See the documentation for `Redix.PubSub` for more information about the format
of messages.
## Examples
iex> Redix.punsubscribe(conn, "foo_*", self())
:ok
iex> flush()
{:redix_pubsub, ^conn, ^subscription_ref, :punsubscribed, %{pattern: "foo_*"}}
:ok
"""
@spec punsubscribe(connection(), String.t() | [String.t()], subscriber) :: :ok
def punsubscribe(conn, patterns, subscriber \\ self())
when is_binary(patterns) or is_list(patterns) do
:gen_statem.call(conn, {:punsubscribe, List.wrap(patterns), subscriber})
end
end