lib/oban/notifiers/phoenix.ex

defmodule Oban.Notifiers.Phoenix do
  @external_resource readme = Path.join([__DIR__, "../../../README.md"])

  @moduledoc readme
             |> File.read!()
             |> String.split("<!-- MDOC -->")
             |> Enum.fetch!(1)

  @behaviour Oban.Notifier

  use GenServer

  alias Oban.Notifier
  alias Phoenix.PubSub

  @enforce_keys [:conf, :pubsub]
  defstruct @enforce_keys

  @doc false
  def child_spec(opts), do: super(opts)

  @impl Notifier
  def start_link(opts) do
    {name, opts} = Keyword.pop(opts, :name, __MODULE__)

    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @impl Notifier
  def listen(server, channels) do
    with {:ok, %{pubsub: pubsub}} <- GenServer.call(server, :get_state) do
      for channel <- channels, do: PubSub.subscribe(pubsub, to_string(channel))

      :ok
    end
  end

  @impl Notifier
  def unlisten(server, channels) do
    with {:ok, %{pubsub: pubsub}} <- GenServer.call(server, :get_state) do
      for channel <- channels, do: PubSub.unsubscribe(pubsub, to_string(channel))

      :ok
    end
  end

  @impl Notifier
  def notify(server, channel, payload) do
    with {:ok, %{conf: conf, pubsub: pubsub}} <- GenServer.call(server, :get_state) do
      PubSub.broadcast(pubsub, to_string(channel), {conf, channel, payload}, __MODULE__)

      :ok
    end
  end

  @impl GenServer
  def init(opts) do
    {:ok, struct!(__MODULE__, opts)}
  end

  @impl GenServer
  def handle_call(:get_state, _from, state) do
    {:reply, {:ok, state}, state}
  end

  @doc false
  def dispatch(entries, _from, {conf, channel, payload}) do
    pids = Enum.map(entries, &elem(&1, 0))

    for message <- payload, do: Notifier.relay(conf, pids, channel, message)
  end
end