# lib/oban/plugins/repeater.ex

defmodule Oban.Plugins.Repeater do
  @moduledoc """
  Repeatedly send inserted messages to all registered producers to simulate polling.

  This plugin is only necessary if you're running Oban in an environment where Postgres
  notifications don't work, notably one of:

  * Using a database connection pooler in transaction mode, e.g. pg_bouncer.
  * Integration testing within the Ecto sandbox, e.g. when developing Oban plugins.

  ## Options

  * `:interval` — the number of milliseconds between notifications. The default is `1_000ms`.
  """

  use GenServer

  alias Oban.Config

  @type option :: {:conf, Config.t()} | {:name, GenServer.name()} | {:interval, pos_integer()}

  defmodule State do
    @moduledoc false

    # `:timer` holds the reference of the pending `:notify` timer (nil until first scheduled).
    defstruct [:conf, :name, :timer, interval: :timer.seconds(1)]
  end

  @doc false
  @spec start_link([option()]) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  # Builds the state from the given options (unknown keys raise via `struct!/2`) and
  # schedules the first `:notify` message.
  @impl GenServer
  def init(opts) do
    # Trapping exits lets `terminate/2` run on shutdown so the pending timer is cancelled.
    Process.flag(:trap_exit, true)

    state = struct!(State, opts)

    {:ok, schedule_notify(state)}
  end

  # Cancels the pending `:notify` timer, if one was scheduled.
  @impl GenServer
  def terminate(_reason, %State{timer: timer}) do
    if is_reference(timer), do: Process.cancel_timer(timer)

    :ok
  end

  # Sends an `:insert` notification to every producer registered under this instance's name,
  # wrapped in an `[:oban, :plugin]` telemetry span, then schedules the next round.
  @impl GenServer
  def handle_info(:notify, %State{} = state) do
    # Selects `{queue, pid}` for each registry entry keyed as `{name, {:producer, queue}}`.
    match = [{{{state.conf.name, {:producer, :"$1"}}, :"$2", :_}, [], [{{:"$1", :"$2"}}]}]
    meta = %{conf: state.conf, plugin: __MODULE__}

    :telemetry.span([:oban, :plugin], meta, fn ->
      Oban.Registry
      |> Registry.select(match)
      |> Enum.each(fn {queue, pid} ->
        send(pid, {:notification, :insert, %{"queue" => queue}})
      end)

      {:ok, meta}
    end)

    {:noreply, schedule_notify(state)}
  end

  # Defining `handle_info/2` above replaces the catch-all injected by `use GenServer`, so
  # without this clause any other message (for instance an `{:EXIT, pid, reason}` tuple
  # delivered because exits are trapped) would raise a `FunctionClauseError` and crash the
  # plugin. Such messages are ignored and the state is left untouched.
  def handle_info(_message, %State{} = state) do
    {:noreply, state}
  end

  # Schedules the next `:notify` message after `state.interval` milliseconds and records the
  # timer reference in the state.
  defp schedule_notify(state) do
    timer = Process.send_after(self(), :notify, state.interval)

    %{state | timer: timer}
  end
end