defmodule Oban.Peers.Global do
  @moduledoc """
  A cluster based peer that coordinates through a distributed registry.

  Leadership is coordinated through global locks. It requires a functional distributed Erlang
  cluster; without one, global plugins (Cron, Lifeline, Stager, etc.) will not function correctly.

  ## Usage

  Specify the `Global` peer in your Oban configuration.

      config :my_app, Oban,
        peer: Oban.Peers.Global,
        ...
  """

  @behaviour Oban.Peer

  use GenServer

  alias Oban.{Backoff, Notifier}

  defmodule State do
    @moduledoc false

    # conf     - the Oban configuration; its `name` and `node` fields form the lock key
    # name     - the name the peer process is registered under (taken from the start options)
    # timer    - reference to the pending `:election` timer, `nil` until the first election
    # interval - base delay between elections, in milliseconds
    # leader?  - outcome of the most recent election
    defstruct [
      :conf,
      :name,
      :timer,
      interval: :timer.seconds(30),
      leader?: false
    ]
  end

  # Starts the peer, registering it under `opts[:name]` when one is given. The full option list
  # is handed to `init/1`, where it becomes the initial `State`.
  @impl Oban.Peer
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  # Returns the leadership flag recorded by the peer's most recent election. Exits if the peer
  # doesn't reply within `timeout` milliseconds.
  @impl Oban.Peer
  def leader?(pid, timeout \\ 5_000) do
    GenServer.call(pid, :leader?, timeout)
  end

  @impl GenServer
  def init(opts) do
    # Trap exits so that `terminate/2` runs on shutdown and a leader can release its lock.
    Process.flag(:trap_exit, true)

    # `struct!/2` raises on unknown keys, so unsupported options fail fast at boot. The first
    # election is deferred to `handle_continue/2` to keep `init/1` from blocking the supervisor.
    {:ok, struct!(State, opts), {:continue, :start}}
  end

  @impl GenServer
  def terminate(_reason, %State{} = state) do
    cancel_timer(state.timer)

    if state.leader? do
      # Stepping down is best effort: an exit raised while releasing the lock or while notifying
      # (e.g. because the notifier is already gone during shutdown) must not mask the original
      # termination reason.
      try do
        delete_self(state)
        notify_down(state)
      catch
        :exit, _reason -> :ok
      end
    end

    :ok
  end

  @impl GenServer
  def handle_continue(:start, %State{} = state) do
    # Receive `{:notification, :leader, payload}` messages, which is how a departing leader
    # announces itself (see `notify_down/1`).
    Notifier.listen(state.conf.name, [:leader])

    handle_info(:election, state)
  end

  @impl GenServer
  def handle_call(:leader?, _from, %State{} = state) do
    {:reply, state.leader?, state}
  end

  # Runs an election by attempting to take (or re-take) the global lock, then schedules the
  # next one. The attempt is wrapped in an `[:oban, :peer, :election]` telemetry span whose stop
  # metadata carries the new `leader` value.
  @impl GenServer
  def handle_info(:election, %State{} = state) do
    meta = %{conf: state.conf, leader: state.leader?, peer: __MODULE__}

    locked? =
      :telemetry.span([:oban, :peer, :election], meta, fn ->
        # Zero retries makes this non-blocking: it returns `false` immediately when another
        # requester holds the lock, and `true` when this requester acquires or already holds it.
        locked? = :global.set_lock(key(state), nodes(), 0)

        {locked?, %{meta | leader: locked?}}
      end)

    {:noreply, schedule_election(%{state | leader?: locked?})}
  end

  # The leader for this Oban instance stepped down, so run an election right away rather than
  # waiting for the next scheduled one. Notifications for other instances are ignored.
  def handle_info({:notification, :leader, %{"down" => name}}, %State{conf: conf} = state) do
    if name == inspect(conf.name) do
      handle_info(:election, state)
    else
      {:noreply, state}
    end
  end

  # Exits are trapped, so `{:EXIT, pid, reason}` messages from linked processes land here, as do
  # `:leader` notifications with an unrecognized payload. Ignore them instead of crashing the
  # peer with a `FunctionClauseError`.
  def handle_info(_message, state) do
    {:noreply, state}
  end

  # Helpers

  # Schedules the next election and stores the timer reference. Any pending timer is cancelled
  # first: elections triggered by a "down" notification would otherwise leave the previous
  # timer running, adding one more perpetual election loop per notification.
  defp schedule_election(%State{interval: interval, timer: timer} = state) do
    cancel_timer(timer)

    # NOTE(review): `mode: :dec` presumably only ever shortens the interval; confirm against
    # `Oban.Backoff.jitter/2`.
    time = Backoff.jitter(interval, mode: :dec)

    %{state | timer: Process.send_after(self(), :election, time)}
  end

  # Cancels a timer when there is one; the timer is `nil` before the first election.
  defp cancel_timer(timer) when is_reference(timer), do: Process.cancel_timer(timer)
  defp cancel_timer(_timer), do: false

  # Releases the leadership lock on every known node.
  defp delete_self(state) do
    :global.del_lock(key(state), nodes())
  end

  # Announces on the `:leader` channel that the leader for this Oban instance is going away.
  defp notify_down(%{conf: conf}) do
    Notifier.notify(conf, :leader, %{down: inspect(conf.name)})
  end

  # A `:global` lock id is `{resource, requester}`: the Oban instance name is the contested
  # resource and the configured node identifies who is asking for it.
  defp key(state), do: {state.conf.name, state.conf.node}

  defp nodes, do: [Node.self() | Node.list()]
end