Skip to main content

lib/echo_pubsub.ex

defmodule EchoPubSub do
  @moduledoc """
  Phoenix PubSub adapter using :pg with at-least-once delivery.

  To start it, list it in your supervision tree as:

      {Phoenix.PubSub, name: MyApp.PubSub, adapter: EchoPubSub}

  You will also need to add `:echo_pubsub` to your deps:

      defp deps do
        [{:echo_pubsub, "~> 0.1.0"}]
      end

  ## Options

    * `:name` - The required name to register the PubSub processes, ie: `MyApp.PubSub`
    * `:pool_size` - The number of producers and workers to run on each node, allowing concurrent message delivery.
    * `:buffer_size` - The amount of messages to maintain in memory
    * `:batch_interval` - The interval in milliseconds to batch messages before sending (default: 200)
    * `:call_timeout` - The timeout in milliseconds for GenServer calls to remote nodes (default: 5000)
    * `:capacity_warning_threshold` - Buffer fill ratio (0.0-1.0) that triggers warning (default: 0.4)
    * `:capacity_warning_interval` - Minimum seconds between warnings (default: 60)

  ## Configuration

  Options can also be set via Application config per PubSub name:

      # config/config.exs
      config :echo_pubsub, MyApp.PubSub,
        pool_size: 2,
        buffer_size: 50_000,
        batch_interval: 100,
        call_timeout: 5000

  Options passed directly to the supervisor take precedence over config values.

  ## Implementation

  The in memory buffer is a ring buffer, meaning that a constant number of messages are maintained and once
  the buffer is full, new messages overwrite the oldest message in the buffer.

  This means that if a node in the cluster is disconnected long enough that when it reconnects, its cursor
  points to a message that no longer exists, it will receive a special message over pubsub: `{:cursor_expired, node@host}`

  Applications are encouraged to handle and act on this message to get to a valid state, such as reloading all state from
  a source of truth like the db or another node. While this technically means that we don't guarentee every node will
  receive every message, we can guerentee that there are no gaps in messages. Recipt of message 3 guarentees you've
  received messages 1 and 2.

  """
  @behaviour Phoenix.PubSub.Adapter

  use Supervisor

  alias EchoPubSub.Producer
  alias EchoPubSub.Worker
  alias Phoenix.PubSub.Adapter

  ## Adapter callbacks

  @impl Adapter
  def node_name(_), do: node()

  @impl Adapter
  def broadcast(adapter_name, topic, message, dispatcher) do
    group = group(adapter_name)
    message = forward_to_local(topic, message, dispatcher)

    Producer.buffer_and_send(group, message)
  end

  @impl Adapter
  def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
    GenServer.call(
      {Module.concat(group(adapter_name), :Worker), node_name},
      {:forward_to_local, topic, message, dispatcher}
    )
  end

  defp forward_to_local(topic, message, dispatcher) do
    {:forward_to_local, topic, message, dispatcher}
  end

  defp group(adapter_name) do
    groups = :persistent_term.get(adapter_name)
    elem(groups, :erlang.phash2(self(), tuple_size(groups)))
  end

  ## Supervisor callbacks

  @doc false
  def start_link(opts) do
    adapter_name = Keyword.fetch!(opts, :adapter_name)
    Supervisor.start_link(__MODULE__, opts, name: Module.concat(adapter_name, Supervisor))
  end

  @impl Supervisor
  def init(opts) do
    name = Keyword.fetch!(opts, :name)
    adapter_name = Keyword.fetch!(opts, :adapter_name)

    # Read from Application config, with opts taking precedence
    config = Application.get_env(:echo_pubsub, name, [])
    pool_size = Keyword.get(opts, :pool_size) || Keyword.get(config, :pool_size, 1)
    buffer_size = Keyword.get(opts, :buffer_size) || Keyword.get(config, :buffer_size, 10_000)

    batch_interval =
      Keyword.get(opts, :batch_interval) || Keyword.get(config, :batch_interval, 200)

    call_timeout =
      Keyword.get(opts, :call_timeout) || Keyword.get(config, :call_timeout, 5000)

    capacity_warning_threshold =
      Keyword.get(opts, :capacity_warning_threshold) ||
        Keyword.get(config, :capacity_warning_threshold, 0.4)

    capacity_warning_interval =
      Keyword.get(opts, :capacity_warning_interval) ||
        Keyword.get(config, :capacity_warning_interval, 60)

    [_ | groups] =
      for number <- 1..pool_size do
        :"#{adapter_name}#{number}"
      end

    # Use `adapter_name` for the first in the pool for backwards compatability
    # with v2.0 when the pool_size is 1.
    groups = [adapter_name | groups]

    :persistent_term.put(adapter_name, List.to_tuple(groups))

    children =
      Enum.flat_map(groups, fn group ->
        producer_id = Module.concat(group, :Producer)

        [
          Supervisor.child_spec(
            {Producer,
             {buffer_size, batch_interval, call_timeout, capacity_warning_threshold,
              capacity_warning_interval, group}},
            id: producer_id
          ),
          Supervisor.child_spec({Worker, {name, group}}, id: Module.concat(group, :Worker))
        ]
      end)

    Supervisor.init(children, strategy: :one_for_one)
  end
end