lib/oban/peers/global.ex

defmodule Oban.Peers.Global do
  @moduledoc """
  A cluster based peer that coordinates through a distributed registry.

  Leadership is coordinated through global locks. It requires a functional distributed Erlang
  cluster, without one global plugins (Cron, Lifeline, Stager, etc.) will not function correctly.

  ## Usage

  Specify the `Global` peer in your Oban configuration.

      config :my_app, Oban,
        peer: Oban.Peers.Global,
        ...
  """

  @behaviour Oban.Peer

  use GenServer

  alias Oban.Backoff

  defmodule State do
    @moduledoc false

    defstruct [
      :conf,
      :name,
      :timer,
      interval: :timer.seconds(30),
      leader?: false
    ]
  end

  @impl Oban.Peer
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  @impl Oban.Peer
  def leader?(pid, timeout \\ 5_000) do
    GenServer.call(pid, :leader?, timeout)
  end

  @impl GenServer
  def init(opts) do
    Process.flag(:trap_exit, true)

    {:ok, struct!(State, opts), {:continue, :start}}
  end

  @impl GenServer
  def terminate(_reason, %State{timer: timer} = state) do
    if is_reference(timer), do: Process.cancel_timer(timer)
    if state.leader?, do: :global.del_lock(key(state), nodes())

    :ok
  end

  @impl GenServer
  def handle_continue(:start, %State{} = state) do
    handle_info(:election, state)
  end

  @impl GenServer
  def handle_call(:leader?, _from, %State{} = state) do
    {:reply, state.leader?, state}
  end

  @impl GenServer
  def handle_info(:election, %State{} = state) do
    locked? = :global.set_lock(key(state), nodes(), 0)

    {:noreply, schedule_election(%{state | leader?: locked?})}
  end

  defp schedule_election(%State{interval: interval} = state) do
    time = Backoff.jitter(interval, mode: :dec)

    %{state | timer: Process.send_after(self(), :election, time)}
  end

  # Helpers

  defp key(state), do: {state.conf.name, state.conf.node}

  defp nodes, do: [Node.self() | Node.list()]
end