lib/phoenix/pubsub/pg2.ex

defmodule Phoenix.PubSub.PG2 do
  @moduledoc """
  Phoenix PubSub adapter based on `:pg`/`:pg2`.

  It runs on Distributed Erlang and is the default adapter.
  """

  @behaviour Phoenix.PubSub.Adapter
  use Supervisor

  ## Adapter callbacks

  @impl true
  def node_name(_), do: node()

  @impl true
  def broadcast(adapter_name, topic, message, dispatcher) do
    case pg_members(group(adapter_name)) do
      {:error, {:no_such_group, _}} ->
        {:error, :no_such_group}

      pids ->
        message = forward_to_local(topic, message, dispatcher)

        for pid <- pids, node(pid) != node() do
          send(pid, message)
        end

        :ok
    end
  end

  @impl true
  def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
    send({group(adapter_name), node_name}, {:forward_to_local, topic, message, dispatcher})
    :ok
  end

  defp forward_to_local(topic, message, dispatcher) do
    {:forward_to_local, topic, message, dispatcher}
  end

  defp group(adapter_name) do
    groups = :persistent_term.get(adapter_name)
    elem(groups, :erlang.phash2(self(), tuple_size(groups)))
  end

  if Code.ensure_loaded?(:pg) do
    defp pg_members(group) do
      :pg.get_members(Phoenix.PubSub, group)
    end
  else
    defp pg_members(group) do
      :pg2.get_members({:phx, group})
    end
  end

  ## Supervisor callbacks

  @doc false
  def start_link(opts) do
    name = Keyword.fetch!(opts, :name)
    pool_size = Keyword.get(opts, :pool_size, 1)
    adapter_name = Keyword.fetch!(opts, :adapter_name)
    Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size}, name: :"#{adapter_name}_supervisor")
  end

  @impl true
  def init({name, adapter_name, pool_size}) do
    [_ | groups] =
      for number <- 1..pool_size do
        :"#{adapter_name}_#{number}"
      end

    # Use `adapter_name` for the first in the pool for backwards compatability
    # with v2.0 when the pool_size is 1.
    groups = [adapter_name | groups]

    :persistent_term.put(adapter_name, List.to_tuple(groups))

    children =
      for group <- groups do
        Supervisor.child_spec({Phoenix.PubSub.PG2Worker, {name, group}}, id: group)
      end

    Supervisor.init(children, strategy: :one_for_one)
  end
end

defmodule Phoenix.PubSub.PG2Worker do
  @moduledoc false
  use GenServer

  @doc false
  def start_link({name, group}) do
    GenServer.start_link(__MODULE__, {name, group}, name: group)
  end

  @impl true
  def init({name, group}) do
    :ok = pg_join(group)
    {:ok, name}
  end

  @impl true
  def handle_info({:forward_to_local, topic, message, dispatcher}, pubsub) do
    Phoenix.PubSub.local_broadcast(pubsub, topic, message, dispatcher)
    {:noreply, pubsub}
  end

  @impl true
  def handle_info(_, pubsub) do
    {:noreply, pubsub}
  end

  if Code.ensure_loaded?(:pg) do
    defp pg_join(group) do
      :ok = :pg.join(Phoenix.PubSub, group, self())
    end
  else
    defp pg_join(group) do
      namespace = {:phx, group}
      :ok = :pg2.create(namespace)
      :ok = :pg2.join(namespace, self())
      :ok
    end
  end
end