lib/oban/peers/global.ex

defmodule Oban.Peers.Global do
  @moduledoc """
  A cluster based peer that coordinates through a distributed registry.

  Leadership is coordinated through global locks. It requires a functional distributed Erlang
  cluster, without one global plugins (Cron, Lifeline, etc.) will not function correctly.

  ## Usage

  Specify the `Global` peer in your Oban configuration.

      config :my_app, Oban,
        peer: Oban.Peers.Global,
        ...
  """

  @behaviour Oban.Peer

  use GenServer

  alias Oban.{Backoff, Notifier}

  defmodule State do
    @moduledoc false

    defstruct [
      :conf,
      :name,
      :timer,
      interval: :timer.seconds(30),
      leader?: false
    ]
  end

  @impl Oban.Peer
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  @impl Oban.Peer
  def leader?(pid, timeout \\ 5_000) do
    GenServer.call(pid, :leader?, timeout)
  end

  @impl GenServer
  def init(opts) do
    Process.flag(:trap_exit, true)

    {:ok, struct!(State, opts), {:continue, :start}}
  end

  @impl GenServer
  def terminate(_reason, %State{timer: timer} = state) do
    if is_reference(timer), do: Process.cancel_timer(timer)

    if state.leader? do
      try do
        delete_self(state)
        notify_down(state)
      catch
        :exit, _reason -> :ok
      end
    end

    :ok
  end

  @impl GenServer
  def handle_continue(:start, %State{} = state) do
    Notifier.listen(state.conf.name, [:leader])

    handle_info(:election, state)
  end

  @impl GenServer
  def handle_call(:leader?, _from, %State{} = state) do
    {:reply, state.leader?, state}
  end

  @impl GenServer
  def handle_info(:election, %State{} = state) do
    meta = %{conf: state.conf, leader: state.leader?, peer: __MODULE__}

    locked? =
      :telemetry.span([:oban, :peer, :election], meta, fn ->
        locked? = :global.set_lock(key(state), nodes(), 0)

        {locked?, %{meta | leader: locked?}}
      end)

    {:noreply, schedule_election(%{state | leader?: locked?})}
  end

  def handle_info({:notification, :leader, %{"down" => name}}, %State{conf: conf} = state) do
    if name == inspect(conf.name) do
      handle_info(:election, state)
    else
      {:noreply, state}
    end
  end

  # Helpers

  defp schedule_election(%State{interval: interval} = state) do
    time = Backoff.jitter(interval, mode: :dec)

    %{state | timer: Process.send_after(self(), :election, time)}
  end

  defp delete_self(state) do
    :global.del_lock(key(state), nodes())
  end

  defp notify_down(%{conf: conf}) do
    Notifier.notify(conf, :leader, %{down: inspect(conf.name)})
  end

  defp key(state), do: {state.conf.name, state.conf.node}

  defp nodes, do: [Node.self() | Node.list()]
end