defmodule Oban.Notifiers.Phoenix do
  @external_resource readme = Path.join([__DIR__, "../../../README.md"])

  # The module documentation is taken from the README at compile time: the
  # file is split on the "<!-- MDOC -->" marker and the second segment is used.
  @moduledoc readme
             |> File.read!()
             |> String.split("<!-- MDOC -->")
             |> Enum.fetch!(1)

  @behaviour Oban.Notifier

  use GenServer

  alias Oban.Notifier
  alias Phoenix.PubSub

  # `conf` is read for its `name` (and `node` in the legacy dispatch clause);
  # `pubsub` is handed to every `Phoenix.PubSub` call as the first argument.
  defstruct [:conf, :pubsub]

  @doc false
  def child_spec(opts), do: super(opts)

  # Starts the notifier process. The `:name` option (defaulting to this
  # module) becomes the GenServer name; every remaining option must be a
  # field of this struct, because the struct built by `struct!/2` is the
  # process state.
  @impl Notifier
  def start_link(opts) do
    {name, opts} = Keyword.pop(opts, :name, __MODULE__)

    GenServer.start_link(__MODULE__, struct!(__MODULE__, opts), name: name)
  end

  # Subscribes to the topic of each channel (the channel converted with
  # `to_string/1`), tagging the subscription with this module as metadata.
  # Returns `:ok`, or the `{:error, exception}` produced by the state lookup.
  @impl Notifier
  def listen(server, channels) do
    with {:ok, %{pubsub: pubsub}} <- get_state(server) do
      Enum.each(channels, fn channel ->
        PubSub.subscribe(pubsub, to_string(channel), metadata: __MODULE__)
      end)

      :ok
    end
  end

  # Removes the subscription for the topic of each channel.
  # Returns `:ok`, or the `{:error, exception}` produced by the state lookup.
  @impl Notifier
  def unlisten(server, channels) do
    with {:ok, %{pubsub: pubsub}} <- get_state(server) do
      Enum.each(channels, fn channel ->
        PubSub.unsubscribe(pubsub, to_string(channel))
      end)

      :ok
    end
  end

  # Broadcasts `{conf.name, channel, payload}` on the channel's topic, naming
  # this module as the dispatcher so that delivery goes through `dispatch/3`
  # below. The result of the broadcast itself is intentionally not inspected;
  # `:ok` is returned whenever the notifier state could be found.
  @impl Notifier
  def notify(server, channel, payload) do
    with {:ok, %{conf: conf, pubsub: pubsub}} <- get_state(server) do
      PubSub.broadcast(
        pubsub,
        to_string(channel),
        {conf.name, channel, payload},
        __MODULE__
      )

      :ok
    end
  end

  # Publishes the state in `Oban.Registry` so the callbacks above, which are
  # handed only the server reference, can read it back with `get_state/1`.
  @impl GenServer
  def init(state) do
    put_state(state)

    {:ok, state}
  end

  # Dispatcher invoked for broadcast messages. The first element of every
  # entry is taken as the subscriber pid, and each message in `payload` is
  # relayed to those pids through `Oban.Notifier.relay/4`. The list of relay
  # results is returned.
  @doc false
  def dispatch(entries, :none, {name, channel, payload}) do
    pids = Enum.map(entries, &elem(&1, 0))
    conf = Oban.config(name)

    for message <- payload, do: Notifier.relay(conf, pids, channel, message)
  end

  # Backward compatibility with 0.2.1
  def dispatch(entries, :none, {name, _node, channel, payload}) do
    dispatch(entries, :none, {name, channel, payload})
  end

  # Backward compatibility with < 0.2.0
  def dispatch(entries, _from, {conf, channel, payload}) do
    dispatch(entries, :none, {conf.name, conf.node, channel, payload})
  end

  # Replaces the value stored under `{conf.name, Oban.Notifier}` with the
  # notifier state. Called from `init/1`, i.e. from within the notifier
  # process itself. The return value of the update is not checked.
  defp put_state(state) do
    Registry.update_value(Oban.Registry, {state.conf.name, Oban.Notifier}, fn _ -> state end)
  end

  # Resolves the registry key owned by `server` and reads the state stored
  # under it. Returns `{:ok, state}` or `{:error, exception}`.
  defp get_state(server) do
    case Registry.keys(Oban.Registry, server) do
      [name] ->
        case Oban.Registry.lookup(name) do
          {_pid, state} -> {:ok, state}
          nil -> {:error, RuntimeError.exception("no notifier running as #{inspect(name)}")}
        end

      # `Registry.keys/2` yields an empty list when the process is dead or
      # owns no keys; report that as an error tuple rather than failing the
      # caller with a MatchError.
      [] ->
        {:error, RuntimeError.exception("no notifier running as #{inspect(server)}")}
    end
  end
end