lib/oban/plugins/repeater.ex

defmodule Oban.Plugins.Repeater do
  @moduledoc """
  Repeatedly send inserted messages to all registered producers to simulate polling.

  ⚠️ This plugin is a **last resort** and only necessary if you're running Oban in an environment
  where neither Postgres nor PG notifications work. That situation should be rare, and limited to
  the following conditions:

  1. Running with a database connection pooler, e.g. PgBouncer, in transaction mode
  2. Running without clustering, i.e. without distributed Erlang

  If **both** of those criteria apply and PubSub notifications won't work at all, then the
  Repeater will force local queues to poll for jobs.

  Note that the Repeater plugin won't enable other PubSub based functionality like pausing,
  scaling, starting, and stopping queues.

  ## Using the Plugin

  Tell all local, idle queues to poll for jobs every one second:

      config :my_app, Oban,
        plugins: [Oban.Plugins.Repeater],
        ...

  Override the default interval and poll every 30 seconds:

      config :my_app, Oban,
        plugins: [{Oban.Plugins.Repeater, interval: :timer.seconds(30)}],
        ...

  ## Options

  * `:interval` — the number of milliseconds between notifications. The default is `1_000ms`.
  """

  @behaviour Oban.Plugin

  use GenServer

  alias Oban.{Plugin, Validation}

  require Logger

  @type option :: Plugin.option() | {:interval, pos_integer()}

  defmodule State do
    @moduledoc false

    # Internal plugin state:
    #
    # * `conf` — the Oban configuration passed in through the plugin options
    # * `name` — the name the plugin process is registered under
    # * `timer` — reference of the pending `:notify` timer, if one is scheduled
    # * `interval` — milliseconds between `:notify` ticks (one second unless overridden)
    defstruct conf: nil, name: nil, timer: nil, interval: :timer.seconds(1)
  end

  # Starts the plugin as a GenServer, registering it under the `:name` option when one is given.
  # The full option list is handed to `init/1` untouched.
  @impl Plugin
  @spec start_link([option()]) :: GenServer.on_start()
  def start_link(opts) do
    registered_name = opts[:name]

    GenServer.start_link(__MODULE__, opts, name: registered_name)
  end

  # Checks the plugin options one entry at a time: `:conf` and `:name` are accepted as-is,
  # `:interval` is delegated to `Validation.validate_integer/2`, and any other entry is
  # reported as an unknown option.
  @impl Plugin
  def validate(opts) do
    check_option = fn
      {:interval, value} -> Validation.validate_integer(:interval, value)
      {:name, _} -> :ok
      {:conf, _} -> :ok
      unknown -> {:error, "unknown option provided: #{inspect(unknown)}"}
    end

    Validation.validate(opts, check_option)
  end

  # Validates the options, builds the initial state, schedules the first `:notify` tick and
  # emits the `[:oban, :plugin, :init]` telemetry event.
  @impl GenServer
  def init(opts) do
    Validation.validate!(opts, &validate/1)

    # Trap exits so that `terminate/2` gets a chance to cancel the pending timer on shutdown.
    Process.flag(:trap_exit, true)

    state = schedule_notify(struct!(State, opts))
    metadata = %{conf: state.conf, plugin: __MODULE__}

    :telemetry.execute([:oban, :plugin, :init], %{}, metadata)

    {:ok, state}
  end

  # Cancels the pending `:notify` timer, if there is one, when the plugin shuts down.
  @impl GenServer
  def terminate(_reason, %State{timer: timer}) when is_reference(timer) do
    Process.cancel_timer(timer)

    :ok
  end

  def terminate(_reason, %State{}), do: :ok

  # Handles the periodic `:notify` tick.
  #
  # Every producer registered in `Oban.Registry` under this instance's name is sent an `:insert`
  # notification for its own queue, then the next tick is scheduled. The fan-out runs inside a
  # `[:oban, :plugin]` telemetry span.
  #
  # Any other message is logged and ignored. Without the catch-all clause a stray message would
  # raise a `FunctionClauseError` and crash the plugin, because defining `handle_info/2` replaces
  # the default implementation injected by `use GenServer`.
  @impl GenServer
  def handle_info(:notify, %State{} = state) do
    # Registry entries have the shape `{key, pid, value}`. Match keys of the form
    # `{oban_name, {:producer, queue}}`, binding the queue to `$1` and the pid to `$2`, and
    # return them as `{queue, pid}` tuples.
    match = [{{{state.conf.name, {:producer, :"$1"}}, :"$2", :_}, [], [{{:"$1", :"$2"}}]}]
    meta = %{conf: state.conf, plugin: __MODULE__}

    :telemetry.span([:oban, :plugin], meta, fn ->
      for {queue, pid} <- Registry.select(Oban.Registry, match) do
        send(pid, {:notification, :insert, %{"queue" => queue}})
      end

      {:ok, meta}
    end)

    {:noreply, schedule_notify(state)}
  end

  def handle_info(message, state) do
    Logger.warning("#{inspect(__MODULE__)} received unexpected message: #{inspect(message)}")

    {:noreply, state}
  end

  # Scheduling

  # Arms a timer that delivers `:notify` to this process after `state.interval` milliseconds and
  # stores the timer reference in the returned state.
  defp schedule_notify(state) do
    %{state | timer: Process.send_after(self(), :notify, state.interval)}
  end
end