defmodule Oban.Peer do
@moduledoc """
The `Peer` module maintains leadership for a particular Oban instance within a cluster.
Leadership is used by plugins, primarily, to prevent duplicate work across nodes. For example,
only the leader's `Cron` plugin will insert new jobs. You can use peer leadership to extend Oban
with custom plugins, or even within your own application.
Note a few important details about how peer leadership operates:
* Leadership is coordinated through the `oban_peers` table in your database. It doesn't require
distributed Erlang or any other interconnectivity.
* Each peer checks for leadership at a 30 second interval. When the leader exits it broadcasts a
message to all other peers to encourage another one to assume leadership.
* Each Oban instance supervises a distinct `Oban.Peer` instance. That means that with multiple
Oban instances on the same node one instance may be the leader, while the others aren't.
* Without leadership some plugins may not run on any node.
## Examples
Check leadership for the default Oban instance:
Oban.Peer.leader?()
# => true
That is identical to using the name `Oban`:
Oban.Peer.leader?(Oban)
# => true
Check leadership for a couple of instances:
Oban.Peer.leader?(Oban.A)
# => true
Oban.Peer.leader?(Oban.B)
# => false
"""
use GenServer
import Ecto.Query, only: [where: 2, where: 3]
alias Oban.{Backoff, Config, Notifier, Registry, Repo}
require Logger
# Options accepted by `start_link/1` (and forwarded into the `State` struct by `init/1`):
#
#   * `:name` — passed to `GenServer.start_link/3` as the process name and used by
#     `child_spec/1` as the supervisor child id.
#   * `:conf` — required; `init/1` fetches it with `Keyword.fetch!/2`. The peer is not
#     started (`:ignore`) when the conf has an empty `plugins` list.
#   * `:interval` — overrides the default `interval` of `State`. It is the base delay
#     between elections and the lifetime (in milliseconds) written to `expires_at`.
@type option ::
{:name, module()}
| {:conf, Config.t()}
| {:interval, timeout()}
defmodule State do
  @moduledoc false

  # Internal GenServer state for a peer.
  #
  #   * `conf`         — the instance configuration supplied through the `:conf` option.
  #   * `name`         — the name the peer process was started with.
  #   * `timer`        — reference of the pending `:election` timer, if any.
  #   * `interval`     — base election interval in milliseconds. The default matches the
  #                      30 second interval stated in the `Oban.Peer` module documentation.
  #   * `leader?`      — whether the last election made this peer the leader.
  #   * `leader_boost` — divisor applied to `interval` while leading, so that the leader
  #                      re-elects itself more frequently than followers check.
  defstruct [
    :conf,
    :name,
    :timer,
    interval: :timer.seconds(30),
    leader?: false,
    leader_boost: 2
  ]
end
@doc """
Check whether the current instance leads the cluster.

Accepts an Oban instance name, a `Config` struct, or the pid of a running peer. Returns
`false` when no peer process can be located for the given instance, e.g. because the peer
was never started (`init/1` returns `:ignore` for a configuration without plugins).

## Example

Check leadership for the default Oban instance:

    Oban.Peer.leader?()
    # => true

Check leadership for an alternate instance named `Oban.Private`:

    Oban.Peer.leader?(Oban.Private)
    # => true
"""
@spec leader?(Config.t() | GenServer.server()) :: boolean()
def leader?(name \\ Oban)

def leader?(%Config{name: name}), do: leader?(name)

def leader?(pid) when is_pid(pid), do: GenServer.call(pid, :leader?)

def leader?(name) do
  # Only recurse when the lookup produced a pid. Feeding any other lookup result back into
  # this clause would look it up again as if it were an instance name and never terminate.
  case Registry.whereis(name, __MODULE__) do
    pid when is_pid(pid) -> leader?(pid)
    _ -> false
  end
end
@doc false
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
def child_spec(opts) do
  # Use the configured name as the child id, falling back to this module when no
  # `:name` option was given.
  id = Keyword.get(opts, :name, __MODULE__)
  default_spec = super(opts)

  Supervisor.child_spec(default_spec, id: id)
end
@doc false
@spec start_link([option]) :: GenServer.on_start()
def start_link(opts) do
  # The full option list doubles as the init argument; `:name` is additionally used to
  # register the process.
  server_opts = [name: opts[:name]]

  GenServer.start_link(__MODULE__, opts, server_opts)
end
@impl GenServer
def init(opts) do
  # Trap exits so `terminate/2` runs on shutdown and leadership can be released.
  Process.flag(:trap_exit, true)

  conf = Keyword.fetch!(opts, :conf)

  # Don't start a peer at all when the configuration has no plugins.
  if match?(%{plugins: []}, conf) do
    :ignore
  else
    # The first election is deferred to `handle_continue/2`.
    {:ok, struct!(State, opts), {:continue, :start}}
  end
end
@impl GenServer
def terminate(_reason, %State{} = state) do
  # Stop the pending election, if one was scheduled.
  if is_reference(state.timer), do: Process.cancel_timer(state.timer)

  # Only the leader has a record to remove and followers to notify.
  if state.leader?, do: step_down(state)

  :ok
end

# Removes this peer's record and announces the departure in a single transaction.
# Exits raised while doing so are swallowed, since the process is shutting down anyway.
defp step_down(%State{conf: conf} = state) do
  Repo.transaction(conf, fn ->
    delete_self(state)
    notify_down(state)
  end)
catch
  :exit, _reason -> :ok
end
@impl GenServer
def handle_continue(:start, %State{conf: conf} = state) do
  # Subscribe to leader notifications before running the first election.
  Notifier.listen(conf.name, [:leader])

  handle_info(:election, state)
end
# Runs an election: expired peer records are purged and this peer attempts to insert (or,
# when already leading, refresh) its own record inside one transaction. The next election
# is scheduled afterwards regardless of the outcome.
#
# A `Postgrex.Error` drops leadership rather than crashing the peer. When the error is an
# undefined `oban_peers` table, a warning explaining the missing migration is logged.
@impl GenServer
def handle_info(:election, %State{} = state) do
  {:ok, state} =
    Repo.transaction(state.conf, fn ->
      state
      |> delete_expired_peers()
      |> upsert_peer()
    end)

  {:noreply, schedule_election(state)}
rescue
  error in [Postgrex.Error] ->
    # Match structurally instead of reaching through `error.postgres.code`: the `postgres`
    # field may be `nil`, and raising from inside the rescue would take the peer down.
    if match?(%{postgres: %{code: :undefined_table}}, error) do
      Logger.warn("""
      The `oban_peers` table is undefined and leadership is disabled.
      Run migrations up to v11 to restore peer leadership. In the meantime, distributed plugins
      (e.g. Cron, Pruner, Stager) will not run on any nodes.
      """)
    end

    # `state` here is the state the callback was invoked with; bindings made in the body
    # above are not visible in the rescue clause.
    {:noreply, schedule_election(%{state | leader?: false})}
end
# Reacts to a leader announcing that it is going down. When the announcement is for this
# peer's Oban instance, an election is scheduled immediately; announcements for other
# instances are ignored.
def handle_info({:notification, :leader, %{"down" => name}}, %State{conf: conf} = state) do
  if name == inspect(conf.name) do
    # Replace the pending election rather than adding a second one. Every `:election`
    # message schedules its successor, so a leftover timer would start an additional
    # election chain that never ends.
    if is_reference(state.timer), do: Process.cancel_timer(state.timer)

    # A zero interval is used only to compute the delay for this one election. Keep just
    # the resulting timer so the configured interval stays in place for later elections
    # and for the `expires_at` written by subsequent upserts.
    %State{timer: timer} = schedule_election(%{state | interval: 0})

    {:noreply, %{state | timer: timer}}
  else
    {:noreply, state}
  end
end
# Replies with the leadership flag recorded by the most recent election.
@impl GenServer
def handle_call(:leader?, _from, %State{} = state), do: {:reply, state.leader?, state}
# Helpers
# Schedules the next `:election` message and stores the timer reference in the state.
# While leading, the base delay is the interval divided by `leader_boost`; otherwise it is
# the full interval. The base is passed through `Backoff.jitter/2` in `:dec` mode.
defp schedule_election(%State{} = state) do
  base_delay =
    if state.leader? do
      div(state.interval, state.leader_boost)
    else
      state.interval
    end

  delay = Backoff.jitter(base_delay, mode: :dec)
  timer = Process.send_after(self(), :election, delay)

  %{state | timer: timer}
end
# Deletes peer records for this Oban instance whose `expires_at` is in the past, then
# returns the state unchanged so the call can be used in a pipeline.
defp delete_expired_peers(%State{conf: conf} = state) do
  instance = inspect(conf.name)
  now = DateTime.utc_now()

  expired = where("oban_peers", [p], p.name == ^instance and p.expires_at < ^now)

  Repo.delete_all(conf, expired)

  state
end
# Deletes the peer record matching both this Oban instance's name and this node.
defp delete_self(%State{conf: conf}) do
  own_record = where("oban_peers", name: ^inspect(conf.name), node: ^conf.node)

  Repo.delete_all(conf, own_record)
end
# Broadcasts on the `:leader` channel that this instance's leader is going down.
defp notify_down(%State{conf: conf}) do
  payload = %{down: inspect(conf.name)}

  Notifier.notify(conf, :leader, payload)
end
# Attempts to claim or retain leadership by inserting this peer's record.
#
# A current leader refreshes `expires_at` on a name conflict. A non-leader inserts with
# `on_conflict: :nothing`. The number of rows reported by the insert determines the new
# `leader?` flag: one row means leader, zero rows means follower.
defp upsert_peer(%State{conf: conf, interval: interval} = state) do
  now = DateTime.utc_now()
  expiry = DateTime.add(now, interval, :millisecond)

  conflict_opts =
    if state.leader? do
      [conflict_target: :name, on_conflict: [set: [expires_at: expiry]]]
    else
      [on_conflict: :nothing]
    end

  row = %{
    name: inspect(conf.name),
    node: conf.node,
    started_at: now,
    expires_at: expiry
  }

  case Repo.insert_all(conf, "oban_peers", [row], conflict_opts) do
    {0, _} -> %{state | leader?: false}
    {1, _} -> %{state | leader?: true}
  end
end
end