lib/oban/repo.ex

defmodule Oban.Repo do
  @moduledoc """
  Wrappers around `Ecto.Repo` and `Ecto.Adapters.SQL` callbacks.

  Each function resolves the correct repo instance and sets options such as `prefix` and `log`
  according to `Oban.Config`.

  > #### Meant for Extending Oban {: .warning}
  >
  > These functions should only be used when working with a repo inside engines, plugins, or other
  > extensions for Oban. Favor using your application's repo directly when querying `Oban.Job`
  > from your workers.

  ## Examples

  The first argument for every function must be an `Oban.Config` struct. Many functions pass
  configuration around as a `conf` key, and it can always be fetched with `Oban.config/1`. This
  demonstrates fetching the default instance config and querying all jobs:

      Oban
      |> Oban.config()
      |> Oban.Repo.all(Oban.Job)
  """

  @moduledoc since: "2.2.0"

  alias Oban.{Backoff, Config}

  # `{name, arity}` pairs for repo callbacks that take no trailing options list. A wrapper is
  # generated for each entry below; the arity listed here is that of the wrapped repo callback,
  # so the generated wrapper takes one more argument (the leading `Oban.Config`).
  @callbacks_without_opts [
    config: 0,
    default_options: 1,
    get_dynamic_repo: 0,
    in_transaction?: 0,
    load: 2,
    put_dynamic_repo: 1,
    rollback: 1
  ]

  # `{name, arity}` pairs for repo callbacks whose last argument is an options list. The arity
  # listed here includes that options argument: each generated wrapper takes the leading
  # `Oban.Config`, `arity - 1` positional arguments, and a trailing `opts` that defaults to `[]`.
  @callbacks_with_opts [
    aggregate: 3,
    all: 2,
    checkout: 2,
    delete!: 2,
    delete: 2,
    delete_all: 2,
    exists?: 2,
    get!: 3,
    get: 3,
    get_by!: 3,
    get_by: 3,
    insert!: 2,
    insert: 2,
    insert_all: 3,
    insert_or_update!: 2,
    insert_or_update: 2,
    one!: 2,
    one: 2,
    preload: 3,
    reload!: 2,
    reload: 2,
    stream: 2,
    update!: 2,
    update: 2,
    update_all: 3
  ]

  # Default retry settings for `transaction/3`; options supplied by the caller are merged over
  # these. `delay` and `expected_delay` are in milliseconds (documented as `500ms` and `10ms`).
  @retry_opts delay: 500, retry: 5, expected_delay: 10, expected_retry: 20

  for {name, arity} <- @callbacks_without_opts do
    # The generated head is `name(conf, arg1, ..., argN)`, where N is the wrapped callback's arity.
    conf_var = Macro.var(:conf, __MODULE__)
    params = [conf_var | Macro.generate_arguments(arity, __MODULE__)]

    @doc """
    Wraps `c:Ecto.Repo.#{name}/#{arity}` with an additional `Oban.Config` argument.

    The config is the first argument and the remaining arguments are passed to the repo as given.
    """
    def unquote(name)(unquote_splicing(params)) do
      __dispatch__(unquote(name), unquote(params))
    end
  end

  for {name, arity} <- @callbacks_with_opts do
    # The listed arity counts the callback's trailing options, which are declared separately in
    # the head below so that they can default to `[]`.
    positional = Macro.generate_arguments(arity - 1, __MODULE__)
    params = [Macro.var(:conf, __MODULE__) | positional]

    @doc """
    Wraps `c:Ecto.Repo.#{name}/#{arity}` with an additional `Oban.Config` argument.

    The config is the first argument. Any `opts` given are merged over the values from
    `default_options/1` before they are passed to the repo.
    """
    def unquote(name)(unquote_splicing(params), opts \\ []) do
      __dispatch__(unquote(name), unquote(params), opts)
    end
  end

  # Manually Defined

  @doc """
  Builds the options that every option-taking query receives, based on an `Oban.Config`.

  The result always contains `:log` (from the config), `oban: true`, and `:telemetry_options`
  carrying the config as `:oban_conf`. A leading `:prefix` entry is added only when the config
  has a prefix set.
  """
  @doc since: "2.14.0"
  def default_options(conf) do
    shared = [log: conf.log, oban: true, telemetry_options: [oban_conf: conf]]

    case conf.prefix do
      unset when unset in [nil, false] -> shared
      prefix -> [{:prefix, prefix} | shared]
    end
  end

  @doc """
  Wraps `Ecto.Adapters.SQL.Repo.query/4` with an added `Oban.Config` argument.

  `params` and `opts` both default to an empty list. The given `opts` are merged over the values
  from `default_options/1` before the statement is passed to the repo.
  """
  @doc since: "2.2.0"
  def query(conf, statement, params \\ [], opts \\ []),
    do: __dispatch__(:query, [conf, statement, params], opts)

  @doc """
  Wraps `Ecto.Adapters.SQL.Repo.query!/4` with an added `Oban.Config` argument.

  `params` and `opts` both default to an empty list. The given `opts` are merged over the values
  from `default_options/1` before the statement is passed to the repo.
  """
  @doc since: "2.17.0"
  def query!(conf, statement, params \\ [], opts \\ []),
    do: __dispatch__(:query!, [conf, statement, params], opts)

  @doc """
  Wraps `Ecto.Adapters.SQL.Repo.to_sql/2` with an added `Oban.Config` argument.

  The `queryable` is converted to a query and its prefix is replaced with the prefix from `conf`
  before the SQL is generated. As with the other wrappers in this module, the call is made
  against the repo resolved by the config's `get_dynamic_repo`, when one is set.

  Returns whatever the repo's `to_sql/2` returns for the given `kind` and query.
  """
  @doc since: "2.2.0"
  def to_sql(conf, kind, queryable) do
    query = Ecto.Queryable.to_query(queryable)

    # The repo function is called with only the kind and the query, so there is no options list
    # to carry the prefix; it is set on the query itself instead.
    prefixed = %{query | prefix: conf.prefix}

    # Dispatching, rather than calling `conf.repo` directly, keeps this consistent with every
    # other wrapper: the configured dynamic repo is put in place for the duration of the call.
    __dispatch__(:to_sql, [conf, kind, prefixed])
  end

  @doc """
  Wraps `c:Ecto.Repo.transaction/2` with an additional `Oban.Config` argument and automatic
  retries with backoff.

  The transaction is attempted again when it raises a `DBConnection.ConnectionError`,
  `Postgrex.Error`, or `MyXQL.Error`. Once the applicable attempt limit is reached the last error
  is re-raised with its original stacktrace. Any other exception is never retried.

  ## Options

  Backoff helpers, in addition to the standard transaction options:

  * `delay` — the base time to sleep before retrying after an unexpected error; it is multiplied
    by the number of attempts made so far and then jittered, defaults to `500ms`
  * `retry` — the attempt limit for unexpected errors; the error is re-raised once this many
    attempts have been made, defaults to `5`
  * `expected_delay` — the time to sleep, before jitter, after an expected error, e.g.
    `serialization_failure` or `lock_not_available`, defaults to `10ms`
  * `expected_retry` — the attempt limit for expected errors, defaults to `20`
  """
  @doc since: "2.18.1"
  def transaction(conf, fun_or_multi, opts \\ []),
    do: transaction(conf, fun_or_multi, opts, 1)

  # Runs the transaction and, on a database error, either sleeps and tries again or re-raises.
  # `attempt` is the 1-based number of the attempt currently being made.
  defp transaction(conf, fun_or_multi, opts, attempt) do
    __dispatch__(:transaction, [conf, fun_or_multi], opts)
  rescue
    error in [DBConnection.ConnectionError, Postgrex.Error, MyXQL.Error] ->
      # Caller options win over the defaults. The merged list is also what the next attempt
      # receives, exactly as if the caller had supplied every retry option.
      merged = Keyword.merge(@retry_opts, opts)

      pause =
        cond do
          expected_error?(error) and attempt < merged[:expected_retry] ->
            merged[:expected_delay]

          attempt < merged[:retry] ->
            attempt * merged[:delay]

          true ->
            # Out of attempts: surface the error with the stacktrace of the failed call.
            reraise error, __STACKTRACE__
        end

      jittery_sleep(pause)

      transaction(conf, fun_or_multi, merged, attempt + 1)
  end

  # An error is "expected" when it is a struct whose `postgres` field carries one of the codes
  # below; anything else, including non-struct terms, is not.
  defp expected_error?(%_{postgres: %{code: code}})
       when code in [:lock_not_available, :serialization_failure],
       do: true

  defp expected_error?(_error), do: false

  # Suspends the calling process for `delay` after it has been passed through `Backoff.jitter/1`.
  defp jittery_sleep(delay) do
    jittered = Backoff.jitter(delay)

    Process.sleep(jittered)
  end

  # Calls the repo function `name` with `args`. The head of the argument list must be an
  # `Oban.Config`; it selects the repo and is not forwarded to the repo function.
  defp __dispatch__(name, [%Config{} = conf | args]), do: with_dynamic_repo(conf, name, args)

  # Same as `__dispatch__/2`, for repo functions whose last argument is an options list. The
  # caller's `opts` take precedence over the defaults derived from the config.
  defp __dispatch__(name, [%Config{} = conf | args], opts) when is_list(opts) do
    merged = Keyword.merge(default_options(conf), opts)

    with_dynamic_repo(conf, name, args ++ [merged])
  end

  # Applies the repo function `name` to `args`. When the config has a `get_dynamic_repo` resolver
  # (a zero-arity function or a tuple), the resolved repo is put in place for the duration of the
  # call and the previously active one is restored afterwards, even if the call raises.
  defp with_dynamic_repo(%{get_dynamic_repo: resolver} = conf, name, args)
       when is_function(resolver, 0) or is_tuple(resolver) do
    repo = conf.repo
    previous = repo.get_dynamic_repo()
    target = resolve_dynamic_repo(resolver)

    try do
      # The switch is skipped when the previous instance is in a transaction, so the call
      # runs against the instance that was already active.
      if not in_transaction?(conf, previous), do: repo.put_dynamic_repo(target)

      apply(repo, name, args)
    after
      repo.put_dynamic_repo(previous)
    end
  end

  defp with_dynamic_repo(conf, name, args) do
    repo = conf.repo

    apply(repo, name, args)
  end

  # A three-element tuple is applied as `{module, function, args}`; anything else is invoked as
  # a zero-arity function.
  defp resolve_dynamic_repo({module, func, mfa_args}), do: apply(module, func, mfa_args)
  defp resolve_dynamic_repo(fun), do: fun.()

  # Whether the repo reports being in a transaction for the given dynamic repo `instance`.
  # The repo is only asked when `instance` is a pid, or an atom that `GenServer.whereis/1`
  # resolves to a pid. Every other instance is reported as not in a transaction.
  defp in_transaction?(conf, instance) do
    cond do
      is_pid(instance) ->
        conf.repo.in_transaction?()

      is_atom(instance) ->
        is_pid(GenServer.whereis(instance)) and conf.repo.in_transaction?()

      true ->
        false
    end
  end
end