defmodule Oban.Notifiers.PG do
  @moduledoc """
  A PG/PG2 based notifier implementation that runs with Distributed Erlang.

  Out of the box, Oban uses PostgreSQL's `LISTEN/NOTIFY` for PubSub. For most applications, that
  is fine, but Postgres-based PubSub isn't sufficient in some circumstances. In particular,
  Postgres notifications won't work when your application connects through PGbouncer in
  _transaction_ or _statement_ mode.

  _Note: You must be using [Distributed Erlang][de] to use the PG notifier._

  ## Usage

  Specify the `PG` notifier in your Oban configuration:

  ```elixir
  config :my_app, Oban,
    notifier: Oban.Notifiers.PG,
    ...
  ```

  ## Implementation Notes

  * The notifier will use `pg` if available (OTP 23+) or fall back to `pg2` for
    older OTP releases.

  * Like the Postgres implementation, notifications are namespaced by `prefix`.

  * For compatibility, message payloads are always serialized to JSON before
    broadcast and deserialized before relay to local processes.

  [de]: https://elixir-lang.org/getting-started/mix-otp/distributed-tasks.html#our-first-distributed-code
  """

  @behaviour Oban.Notifier

  use GenServer

  alias Oban.Config

  defmodule State do
    @moduledoc false

    # `listeners` maps a listening pid to the list of channels it subscribed to.
    defstruct [:conf, :name, listeners: %{}]
  end

  @doc """
  Starts the notifier process.

  The `:name` option (default `#{inspect(__MODULE__)}`) is used to register the process; all
  remaining options are used to build the internal state struct.
  """
  @impl Oban.Notifier
  def start_link(opts) do
    {name, opts} = Keyword.pop(opts, :name, __MODULE__)

    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @doc """
  Subscribes the calling process to `channels`.

  Calling `listen/2` again from the same process adds the new channels to the ones it is
  already subscribed to.
  """
  @impl Oban.Notifier
  def listen(server, channels) do
    GenServer.call(server, {:listen, channels})
  end

  @doc """
  Unsubscribes the calling process from `channels`.
  """
  @impl Oban.Notifier
  def unlisten(server, channels) do
    GenServer.call(server, {:unlisten, channels})
  end

  @doc """
  Broadcasts every item in `payload` on `channel` to all notifiers joined under the same prefix.

  `payload` must be an enumerable. Each item is sent untouched, and the receiving notifier
  decodes it with `Jason.decode!/1`, so items are expected to be JSON encoded binaries.

  Returns `:ok`, including when there aren't any members to notify.
  """
  @impl Oban.Notifier
  def notify(server, channel, payload) do
    # Any list of members is acceptable, including an empty one. A non-list result (e.g. an
    # error tuple from `pg2`) falls through `with` and is returned to the caller unchanged.
    with %State{} = state <- GenServer.call(server, :get_state),
         pids when is_list(pids) <- members(state.conf.prefix) do
      # Build the messages once rather than once per member.
      messages = payload_to_messages(channel, payload)

      for pid <- pids, message <- messages do
        send(pid, message)
      end

      :ok
    end
  end

  ## GenServer Callbacks

  @impl GenServer
  def init(opts) do
    state = struct!(State, opts)

    start_pg()

    :ok = join(state.conf.prefix)

    {:ok, state}
  end

  @impl GenServer
  def handle_call({:listen, channels}, {pid, _tag}, %State{listeners: listeners} = state) do
    listeners =
      case listeners do
        %{^pid => current} ->
          # Already monitored: extend the subscription rather than ignoring the request.
          Map.put(listeners, pid, Enum.uniq(current ++ channels))

        _ ->
          # First subscription for this pid: monitor it so it can be dropped when it exits.
          Process.monitor(pid)

          Map.put(listeners, pid, channels)
      end

    {:reply, :ok, %{state | listeners: listeners}}
  end

  def handle_call({:unlisten, channels}, {pid, _tag}, %State{listeners: listeners} = state) do
    # The monitor is left in place when the last channel is removed; the resulting `:DOWN`
    # message is handled idempotently in `handle_info/2`.
    listeners =
      case Map.get(listeners, pid, []) -- channels do
        [] -> Map.delete(listeners, pid)
        remaining -> Map.put(listeners, pid, remaining)
      end

    {:reply, :ok, %{state | listeners: listeners}}
  end

  def handle_call(:get_state, _from, state), do: {:reply, state, state}

  @impl GenServer
  def handle_info({:notification, channel, payload}, %State{} = state) do
    decoded = Jason.decode!(payload)

    # Relay the decoded payload to every local listener subscribed to this channel.
    if in_scope?(decoded, state.conf) do
      for {pid, channels} <- state.listeners, channel in channels do
        send(pid, {:notification, channel, decoded})
      end
    end

    {:noreply, state}
  end

  def handle_info({:DOWN, _ref, :process, pid, _reason}, %State{listeners: listeners} = state) do
    # A monitored listener exited: forget it so it is no longer tracked or sent messages.
    {:noreply, %{state | listeners: Map.delete(listeners, pid)}}
  end

  def handle_info(_message, state) do
    {:noreply, state}
  end

  ## PG Helpers

  # The implementation is selected at compile time: `pg` when the module is available,
  # otherwise `pg2`.
  if Code.ensure_loaded?(:pg) do
    # The result is intentionally ignored, the scope may already be running.
    defp start_pg do
      :pg.start_link(__MODULE__)
    end

    defp members(prefix) do
      :pg.get_members(__MODULE__, prefix)
    end

    defp join(prefix) do
      :ok = :pg.join(__MODULE__, prefix, self())
    end
  else
    defp start_pg, do: :ok

    defp members(prefix) do
      :pg2.get_members(namespace(prefix))
    end

    defp join(prefix) do
      namespace = namespace(prefix)

      :ok = :pg2.create(namespace)
      :ok = :pg2.join(namespace, self())
    end

    defp namespace(prefix), do: {:oban, prefix}
  end

  ## Message Helpers

  # Wraps each payload item in the `{:notification, channel, item}` tuple that is handled
  # by `handle_info/2`.
  defp payload_to_messages(channel, payload) do
    Enum.map(payload, &{:notification, channel, &1})
  end

  # Payloads without an "ident", or with the "any" ident, are always relayed. Otherwise the
  # decision is delegated to `Config.match_ident?/2`.
  defp in_scope?(%{"ident" => "any"}, _conf), do: true
  defp in_scope?(%{"ident" => ident}, conf), do: Config.match_ident?(conf, ident)
  defp in_scope?(_payload, _conf), do: true
end