lib/oban/plugins/gossip.ex

defmodule Oban.Plugins.Gossip do
  @moduledoc """
  The Gossip plugin uses PubSub to periodically exchange queue state information between all
  interested nodes. This allows Oban instances to broadcast state information regardless of which
  engine they are using, and without storing anything in the database.

  Gossip enables real-time updates across an entire cluster, and is essential to the operation of
  UIs like Oban Web.

  The Gossip plugin entirely replaced heartbeats and the legacy `oban_beats` table.

  ## Using the Plugin

  The following example demonstrates using the plugin without any configuration, which will broadcast
  the state of each local queue every 1 second:

      config :my_app, Oban,
        plugins: [Oban.Plugins.Gossip],
        ...

  Override the default options to broadcast every 5 seconds:

      config :my_app, Oban,
        plugins: [{Oban.Plugins.Gossip, interval: :timer.seconds(5)}],
        ...

  ## Options

  * `:interval` — the number of milliseconds between gossip broadcasts

  ## Instrumenting with Telemetry

  The `Oban.Plugins.Gossip` plugin adds the following metadata to the `[:oban, :plugin, :stop]` event:

  * `:gossip_count` - the number of queues that had activity broadcasted
  """

  @behaviour Oban.Plugin

  use GenServer

  alias Oban.{Notifier, Plugin, Validation}

  @type option :: Plugin.option() | {:interval, pos_integer()}

  defmodule State do
    @moduledoc false

    defstruct [:conf, :name, :timer, interval: :timer.seconds(1)]
  end

  @impl Plugin
  @spec start_link([option()]) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  @impl Plugin
  def validate(opts) do
    Validation.validate(opts, fn
      {:conf, _} -> :ok
      {:name, _} -> :ok
      {:interval, interval} -> Validation.validate_integer(:interval, interval)
      option -> {:unknown, option, State}
    end)
  end

  @impl GenServer
  def init(opts) do
    Validation.validate!(opts, &validate/1)

    Process.flag(:trap_exit, true)

    state =
      State
      |> struct!(opts)
      |> schedule_gossip()

    :telemetry.execute([:oban, :plugin, :init], %{}, %{conf: state.conf, plugin: __MODULE__})

    {:ok, state}
  end

  @impl GenServer
  def terminate(_reason, state) do
    if is_reference(state.timer), do: Process.cancel_timer(state.timer)

    :ok
  end

  @impl GenServer
  def handle_info(:gossip, %State{} = state) do
    meta = %{conf: state.conf, plugin: __MODULE__}

    match = [{{{state.conf.name, {:producer, :_}}, :"$1", :_}, [], [:"$1"]}]

    :telemetry.span([:oban, :plugin], meta, fn ->
      checks =
        Oban.Registry
        |> Registry.select(match)
        |> Enum.map(&safe_check(&1, state))
        |> Enum.reject(&is_nil/1)
        |> Enum.map(&sanitize_name/1)

      if Enum.any?(checks), do: Notifier.notify(state.conf, :gossip, checks)

      {:ok, Map.put(meta, :gossip_count, length(checks))}
    end)

    {:noreply, schedule_gossip(state)}
  end

  def handle_info(_message, state) do
    {:noreply, state}
  end

  # Scheduling

  defp schedule_gossip(state) do
    %{state | timer: Process.send_after(self(), :gossip, state.interval)}
  end

  # Checking

  defp safe_check(pid, state) do
    if Process.alive?(pid), do: GenServer.call(pid, :check, state.interval)
  catch
    :exit, _ -> nil
  end

  defp sanitize_name(%{name: name} = check) when is_binary(name), do: check
  defp sanitize_name(%{name: name} = check), do: %{check | name: inspect(name)}
end