lib/oban/peer.ex

defmodule Oban.Peer do
  @moduledoc """
  The `Peer` module maintains leadership for a particular Oban instance within a cluster.

  Leadership is used by plugins, primarily, to prevent duplicate work across nodes. For example,
  only the leader's `Cron` plugin will try inserting new jobs. You can use peer leadership to
  extend Oban with custom plugins, or even within your own application.

  Note a few important details about how peer leadership operates:

  * Each peer checks for leadership at a 30 second interval. When the leader exits it broadcasts a
    message to all other peers to encourage another one to assume leadership.

  * Each Oban instance supervises a distinct `Oban.Peer` instance. That means that with multiple
    Oban instances on the same node one instance may be the leader, while the others aren't.

  * Without leadership, global plugins (Cron, Lifeline, Stager, etc.), will not run on any node.

  ## Available Peer Implementations

  There are two built-in peering modules:

  * `Oban.Peers.Database` — uses table-based leadership through the `oban_peers` table and works
    in any environment, with or without clustering. Only one node (per instance name) will have a
    row in the peers table, that node is the leader. This is the default.

  * `Oban.Peers.Global` — coordinates global locks through distributed Erlang, requires
    distributed Erlang.

  You can specify the peering module to use in your Oban configuration:

      config :my_app, Oban,
        peer: Oban.Peers.Database, # default value
        ...

  If in doubt, you can call `Oban.config()` to see which module is being used.

  ## Examples

  Check leadership for the default Oban instance:

      Oban.Peer.leader?()
      # => true

  That is identical to using the name `Oban`:

      Oban.Peer.leader?(Oban)
      # => true

  Check leadership for a couple of instances:

      Oban.Peer.leader?(Oban.A)
      # => true

      Oban.Peer.leader?(Oban.B)
      # => false
  """

  alias Oban.{Config, Registry}

  require Logger

  @type option :: [name: module(), conf: Config.t(), interval: timeout()]

  @type conf_or_name :: Config.t() | GenServer.name()

  @doc """
  Starts a peer instance.
  """
  @callback start_link([option()]) :: GenServer.on_start()

  @doc """
  Check whether the current peer instance leads the cluster.
  """
  @callback leader?(GenServer.server()) :: boolean()

  @doc """
  Check which node's peer instance currently leads the cluster.
  """
  @callback get_leader(GenServer.server()) :: nil | String.t()

  @doc false
  @spec child_spec(Keyword.t()) :: Supervisor.child_spec()
  def child_spec(opts) do
    %{peer: {peer, peer_opts}} = Keyword.fetch!(opts, :conf)

    opts =
      opts
      |> Keyword.merge(peer_opts)
      |> Keyword.put_new(:name, peer)

    %{id: opts[:name], start: {peer, :start_link, [opts]}}
  end

  @doc """
  Get the name and node of the instance that currently leads the cluster.

  ## Example

  Get the leader node for the default Oban instance:

      Oban.Peer.get_leader()
      "web.1"

  Get the leader node for an alternate instance named `Oban.Private`

      Oban.Peer.get_leader(Oban.Private)
      "web.1"
  """
  @spec get_leader(conf_or_name, timeout()) :: nil | String.t()
  def get_leader(conf_or_name \\ Oban, timeout \\ 5_000) do
    safe_call(conf_or_name, :get_leader, timeout, nil)
  end

  @doc """
  Check whether the current instance leads the cluster.

  ## Example

  Check leadership for the default Oban instance:

      Oban.Peer.leader?()
      # => true

  Check leadership for an alternate instance named `Oban.Private`:

      Oban.Peer.leader?(Oban.Private)
      # => true
  """
  @spec leader?(conf_or_name(), timeout()) :: boolean()
  def leader?(conf_or_name \\ Oban, timeout \\ 5_000) do
    safe_call(conf_or_name, :leader?, timeout, false)
  end

  defp safe_call(%Config{name: name, peer: {peer, _}}, fun, timeout, default) do
    case Registry.whereis(name, Oban.Peer) do
      pid when is_pid(pid) ->
        apply(peer, fun, [pid, timeout])

      _ ->
        default
    end
  catch
    :exit, {:timeout, _} = reason ->
      Logger.warning(
        message: "Oban.Peer.#{fun}/2 check failed due to #{inspect(reason)}",
        source: :oban,
        module: __MODULE__
      )

      default
  end

  defp safe_call(name, fun, timeout, default) do
    name
    |> Oban.config()
    |> safe_call(fun, timeout, default)
  end
end