lib/lifebelt.ex

defmodule Lifebelt do
  use GenServer

  @behaviour Oban.Plugin

  import Ecto.Query, only: [select: 3, where: 3]
  import Access, only: [all: 0]
  alias Oban.{Job, Peer, Plugin, Repo, Validation}

  @type option ::
          Plugin.option()
          | {:interval, timeout()}
          | {:rescue_after, pos_integer()}
  defmodule State do
    @moduledoc false

    defstruct [
      :conf,
      :name,
      :timer,
      interval: :timer.minutes(1),
      rescue_after: :timer.minutes(60)
    ]
  end

  @impl Plugin
  @spec start_link([option()]) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  @impl Oban.Plugin
  def validate(opts) do
    Validation.validate(opts, fn
      {:conf, _} -> :ok
      {:name, _} -> :ok
      {:interval, interval} -> Validation.validate_integer(:interval, interval)
      {:rescue_after, interval} -> Validation.validate_integer(:rescue_after, interval)
      option -> {:unknown, option, State}
    end)
  end

  @impl GenServer
  def init(opts) do
    Validation.validate!(opts, &validate/1)

    state =
      State
      |> struct!(opts)
      |> schedule_rescue()

    :telemetry.execute([:oban, :plugin, :init], %{}, %{conf: state.conf, plugin: __MODULE__})

    {:ok, state}
  end

  @impl GenServer
  def terminate(_reason, %State{timer: timer}) do
    if is_reference(timer), do: Process.cancel_timer(timer)

    :ok
  end

  @impl GenServer
  def handle_info(:rescue, %State{} = state) do
    meta = %{conf: state.conf, plugin: __MODULE__}

    :telemetry.span([:oban, :plugin], meta, fn ->
      case check_leadership_and_rescue_jobs(state) do
        {:ok, extra} when is_map(extra) ->
          {:ok, Map.merge(meta, extra)}

        error ->
          {:error, Map.put(meta, :error, error)}
      end
    end)

    {:noreply, schedule_rescue(state)}
  end

  # Scheduling

  defp schedule_rescue(state) do
    timer = Process.send_after(self(), :rescue, state.interval)

    %{state | timer: timer}
  end

  # Rescuing

  defp check_leadership_and_rescue_jobs(state) do
    if Peer.leader?(state.conf) do
      Repo.transaction(state.conf, fn ->
        time = DateTime.add(DateTime.utc_now(), -state.rescue_after, :millisecond)
        running_ids = get_running_ids(state)

        base =
          where(
            Job,
            [j],
            j.state == "executing" and j.attempted_at < ^time and j.id not in ^running_ids
          )

        {rescued_count, rescued} = transition_available(base, state)
        {discard_count, discard} = transition_discarded(base, state)

        %{
          discarded_count: discard_count,
          discarded_jobs: discard,
          rescued_count: rescued_count,
          rescued_jobs: rescued
        }
      end)
    else
      {:ok, %{}}
    end
  end

  defp get_running_ids(%State{} = state) do
    state.conf
    |> Map.get(:queues)
    |> Keyword.keys()
    |> Enum.map(&Oban.check_queue(state.conf.name, queue: &1))
    |> get_in([all(), :running])
    |> List.flatten()
  end

  defp transition_available(base, state) do
    query =
      base
      |> where([j], j.attempt < j.max_attempts)
      |> select([j], map(j, [:id, :queue, :state]))

    Repo.update_all(state.conf, query, set: [state: "available"])
  end

  defp transition_discarded(base, state) do
    query =
      base
      |> where([j], j.attempt >= j.max_attempts)
      |> select([j], map(j, [:id, :queue, :state]))

    Repo.update_all(state.conf, query, set: [state: "discarded", discarded_at: DateTime.utc_now()])
  end
end