lib/oban/plugins/reindexer.ex

defmodule Oban.Plugins.Reindexer do
  @moduledoc """
  Periodically rebuild indexes to minimize database bloat.

  Over time various Oban indexes may grow without `VACUUM` cleaning them up properly. When this
  happens, rebuilding the indexes will release bloat.

  The plugin uses `REINDEX` with the `CONCURRENTLY` option to rebuild without taking any locks
  that prevent concurrent inserts, updates, or deletes on the table.

  Note: This plugin requires the `CONCURRENT` option, which is only available in Postgres 12 and
  above.

  ## Using the Plugin

  By default, the plugin will reindex once a day, at midnight UTC:

      config :my_app, Oban,
        plugins: [Oban.Plugins.Reindexer],
        ...

  To run on a different schedule you can provide a cron expression. For example, you could use the
  `"@weekly"` shorthand to run once a week on Sunday:

      config :my_app, Oban,
        plugins: [{Oban.Plugins.Reindexer, schedule: "@weekly"}],
        ...

  ## Options

    * `:indexes` — a list of indexes to reindex on the `oban_jobs` table. Defaults to only the
      `oban_jobs_args_index` and `oban_jobs_meta_index`.

    * `:schedule` — a cron expression that controls when to reindex. Defaults to `"@midnight"`.

    * `:timeout` - time in milliseconds to wait for each query call to finish. Defaults to 15 seconds.

    * `:timezone` — which timezone to use when evaluating the schedule. To use a timezone other than
      the default of "Etc/UTC" you *must* have a timezone database like [tz][tz] installed and
      configured.

  [tz]: https://hexdocs.pm/tz
  """

  @behaviour Oban.Plugin

  use GenServer

  alias Oban.Cron.Expression
  alias Oban.Plugins.Cron
  alias Oban.{Peer, Plugin, Repo, Validation}

  @type option :: Plugin.option() | {:schedule, binary()}

  defmodule State do
    @moduledoc false

    defstruct [
      :conf,
      :name,
      :schedule,
      :timer,
      indexes: ~w(oban_jobs_args_index oban_jobs_meta_index),
      timezone: "Etc/UTC",
      timeout: :timer.seconds(15)
    ]
  end

  @impl Plugin
  @spec start_link([option()]) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  @impl Plugin
  def validate(opts) do
    Validation.validate(opts, fn
      {:conf, _} -> :ok
      {:name, _} -> :ok
      {:indexes, indexes} -> validate_indexes(indexes)
      {:schedule, schedule} -> validate_schedule(schedule)
      {:timezone, timezone} -> Validation.validate_timezone(:timezone, timezone)
      {:timeout, timeout} -> Validation.validate_timeout(:timeout, timeout)
      option -> {:unknown, option, State}
    end)
  end

  @impl GenServer
  def init(opts) do
    Validation.validate!(opts, &validate/1)

    Process.flag(:trap_exit, true)

    opts =
      opts
      |> Keyword.put_new(:schedule, "@midnight")
      |> Keyword.update!(:schedule, &Expression.parse!/1)

    state =
      State
      |> struct!(opts)
      |> schedule_reindex()

    :telemetry.execute([:oban, :plugin, :init], %{}, %{conf: state.conf, plugin: __MODULE__})

    {:ok, state}
  end

  @impl GenServer
  def terminate(_reason, %State{timer: timer}) do
    if is_reference(timer), do: Process.cancel_timer(timer)

    :ok
  end

  @impl GenServer
  def handle_info(:reindex, %State{} = state) do
    meta = %{conf: state.conf, plugin: __MODULE__}

    :telemetry.span([:oban, :plugin], meta, fn ->
      case check_leadership_and_reindex(state) do
        :ok ->
          {:ok, meta}

        error ->
          {:error, Map.put(meta, :error, error)}
      end
    end)

    {:noreply, schedule_reindex(state)}
  end

  # Validation

  defp validate_indexes(indexes) do
    if is_list(indexes) and Enum.all?(indexes, &is_binary/1) do
      :ok
    else
      {:error, "expected :indexes to be a list of strings, got: #{inspect(indexes)}"}
    end
  end

  defp validate_schedule(schedule) do
    Expression.parse!(schedule)

    :ok
  rescue
    error in [ArgumentError] -> {:error, error}
  end

  # Scheduling

  defp schedule_reindex(state) do
    timer = Process.send_after(self(), :reindex, Cron.interval_to_next_minute())

    %{state | timer: timer}
  end

  # Reindexing

  defp check_leadership_and_reindex(state) do
    {:ok, datetime} = DateTime.now(state.timezone)

    if Peer.leader?(state.conf) and Expression.now?(state.schedule, datetime) do
      queries = [deindex_query(state) | Enum.map(state.indexes, &reindex_query(state, &1))]

      Enum.reduce_while(queries, :ok, fn query, _ ->
        case Repo.query(state.conf, query, [], timeout: state.timeout) do
          {:ok, _} -> {:cont, :ok}
          error -> {:halt, error}
        end
      end)
    end
  end

  defp reindex_query(state, index) do
    prefix = inspect(state.conf.prefix)

    "REINDEX INDEX CONCURRENTLY #{prefix}.#{index}"
  end

  defp deindex_query(state) do
    """
    DO $$
    DECLARE
      rec record;
    BEGIN
      FOR rec IN
        SELECT relname, relnamespace::regnamespace AS namespace
        FROM pg_index i
        JOIN pg_class c on c.oid = i.indexrelid
        WHERE relnamespace = '#{state.conf.prefix}'::regnamespace
          AND NOT indisvalid
          AND starts_with(relname, 'index_oban_jobs')
      LOOP
        EXECUTE format('DROP INDEX %s.%s', rec.namespace, rec.relname);
      END LOOP;
    END $$
    """
  end
end