Skip to main content

lib/dust_ecto/phoenix/broadcaster.ex

defmodule DustEcto.Phoenix.Broadcaster do
  @moduledoc """
  One broadcaster process per `{schema, pubsub, topic}` triple. Holds
  a single `DustEcto.Repo.subscribe/2` registration whose callback
  broadcasts every event through Phoenix.PubSub on `topic`.

  Lifecycle:
    * Started lazily by `DustEcto.Phoenix.subscribe_to_pubsub/3`.
    * Registered in `DustEcto.Phoenix.Registry` keyed by
      `{schema, pubsub, topic}` so concurrent callers reuse the same
      broadcaster.
    * Stays alive for the application lifetime. The cost is one
      process and one entry in the SDK callback registry — both
      negligible. Cleanup is opt-in (caller can `unsubscribe/3`).

  The broadcast message shape is `{:dust_event, event}` where `event`
  is whatever `DustEcto.Repo.subscribe/2` would deliver:
  `{:upserted, struct}` or `{:deleted, slug}`.
  """

  use GenServer

  require Logger

  @doc false
  def start_link({schema, pubsub, topic} = key)
      when is_atom(schema) and is_atom(pubsub) and is_binary(topic) do
    GenServer.start_link(__MODULE__, key, name: via(key))
  end

  @doc false
  def via({schema, pubsub, topic}) do
    {:via, Registry, {DustEcto.Phoenix.Registry, {schema, pubsub, topic}}}
  end

  @impl true
  def init({schema, pubsub, topic}) do
    case DustEcto.Repo.subscribe(schema, broadcast_callback(pubsub, topic)) do
      {:ok, ref} ->
        {:ok, %{schema: schema, pubsub: pubsub, topic: topic, ref: ref}}

      {:error, %DustEcto.Error{} = err} ->
        {:stop, {:dust_subscribe_failed, err}}
    end
  end

  @impl true
  def terminate(_reason, %{ref: ref}) when is_reference(ref) do
    DustEcto.Repo.unsubscribe(ref)
    :ok
  end

  def terminate(_, _), do: :ok

  # Returns a 1-arity function suitable for DustEcto.Repo.subscribe/2.
  # We deliberately use apply/3 rather than a direct Phoenix.PubSub
  # call so this module compiles even when phoenix_pubsub isn't a
  # transitive dep — the call only ever runs if the broadcaster
  # started, which in turn requires phoenix_pubsub at runtime.
  defp broadcast_callback(pubsub, topic) do
    fn event ->
      apply(Phoenix.PubSub, :broadcast, [pubsub, topic, {:dust_event, event}])
    end
  end
end