lib/oban/repo.ex

defmodule Oban.Repo do
  @moduledoc """
  Wrappers around `Ecto.Repo` callbacks.

  These functions should be used when working with an Ecto repo inside a plugin. They resolve
  the correct repo instance, and set the schema prefix and the log level, according to the Oban
  configuration.

  ## Options

  Every wrapper that accepts options, except `transaction/3`, overwrites the `:log`, `:prefix`,
  and `:telemetry_options` options with values taken from the Oban configuration, so those keys
  can't be customized by the caller. `transaction/3` only overwrites `:log`. All other options
  are passed through untouched.

  ## Dynamic Repos

  When the configuration has a zero arity `:get_dynamic_repo` function that returns a non-`nil`
  instance, the wrapped call runs with that instance set through
  `c:Ecto.Repo.put_dynamic_repo/1`, unless the previously set instance is already inside a
  transaction. The previously set instance is always restored afterwards.
  """

  alias Ecto.{Changeset, Multi, Query, Queryable, Schema}
  alias Oban.Config

  @doc "Wraps `c:Ecto.Repo.all/2`."
  @doc since: "2.2.0"
  @spec all(Config.t(), Queryable.t(), Keyword.t()) :: [Schema.t()]
  def all(conf, queryable, opts \\ []) do
    with_dynamic_repo(conf, fn -> conf.repo.all(queryable, query_opts(conf, opts)) end)
  end

  @doc "Wraps `c:Ecto.Repo.checkout/2`."
  @doc since: "2.2.0"
  @spec checkout(Config.t(), (() -> result), Keyword.t()) :: result when result: var
  def checkout(conf, function, opts \\ []) do
    with_dynamic_repo(conf, fn -> conf.repo.checkout(function, query_opts(conf, opts)) end)
  end

  @doc "Wraps `c:Ecto.Repo.config/0`."
  @doc since: "2.2.0"
  @spec config(Config.t()) :: Keyword.t()
  def config(conf) do
    with_dynamic_repo(conf, fn -> conf.repo.config() end)
  end

  @doc "Wraps `c:Ecto.Repo.delete/2`."
  @doc since: "2.4.0"
  @spec delete(
          Config.t(),
          struct_or_changeset :: Schema.t() | Changeset.t(),
          opts :: Keyword.t()
        ) :: {:ok, Schema.t()} | {:error, Changeset.t()}
  def delete(conf, struct_or_changeset, opts \\ []) do
    with_dynamic_repo(conf, fn ->
      conf.repo.delete(struct_or_changeset, query_opts(conf, opts))
    end)
  end

  @doc "Wraps `c:Ecto.Repo.delete_all/2`."
  @doc since: "2.2.0"
  @spec delete_all(Config.t(), Queryable.t(), Keyword.t()) :: {integer(), nil | [term()]}
  def delete_all(conf, queryable, opts \\ []) do
    with_dynamic_repo(conf, fn -> conf.repo.delete_all(queryable, query_opts(conf, opts)) end)
  end

  @doc "Wraps `c:Ecto.Repo.insert/2`."
  @doc since: "2.2.0"
  @spec insert(Config.t(), Schema.t() | Changeset.t(), Keyword.t()) ::
          {:ok, Schema.t()} | {:error, Changeset.t()}
  def insert(conf, struct_or_changeset, opts \\ []) do
    with_dynamic_repo(conf, fn ->
      conf.repo.insert(struct_or_changeset, query_opts(conf, opts))
    end)
  end

  @doc "Wraps `c:Ecto.Repo.insert_all/3`."
  @doc since: "2.2.0"
  @spec insert_all(
          Config.t(),
          binary() | {binary(), module()} | module(),
          [map() | [{atom(), term() | Query.t()}]],
          Keyword.t()
        ) :: {integer(), nil | [term()]}
  def insert_all(conf, schema_or_source, entries, opts \\ []) do
    with_dynamic_repo(conf, fn ->
      conf.repo.insert_all(schema_or_source, entries, query_opts(conf, opts))
    end)
  end

  @doc "Wraps `c:Ecto.Repo.one/2`."
  @doc since: "2.2.0"
  @spec one(Config.t(), Queryable.t(), Keyword.t()) :: Schema.t() | nil
  def one(conf, queryable, opts \\ []) do
    with_dynamic_repo(conf, fn -> conf.repo.one(queryable, query_opts(conf, opts)) end)
  end

  @doc "Wraps `Ecto.Adapters.SQL.Repo.query/4`."
  @doc since: "2.2.0"
  @spec query(Config.t(), String.t(), [term()], Keyword.t()) ::
          {:ok,
           %{
             :rows => nil | [[term()] | binary()],
             :num_rows => non_neg_integer(),
             optional(atom()) => any()
           }}
          | {:error, Exception.t()}
  def query(conf, sql, params \\ [], opts \\ []) do
    with_dynamic_repo(conf, fn -> conf.repo.query(sql, params, query_opts(conf, opts)) end)
  end

  @doc "Wraps `c:Ecto.Repo.stream/2`."
  @doc since: "2.9.0"
  @spec stream(Config.t(), Queryable.t(), Keyword.t()) :: Enum.t()
  def stream(conf, queryable, opts \\ []) do
    with_dynamic_repo(conf, fn -> conf.repo.stream(queryable, query_opts(conf, opts)) end)
  end

  @doc """
  Wraps `c:Ecto.Repo.transaction/2`.

  Only the `:log` option is overwritten from the Oban configuration; the prefix and telemetry
  options are not added to transaction options.
  """
  @doc since: "2.2.0"
  @spec transaction(Config.t(), (... -> any()) | Multi.t(), opts :: Keyword.t()) ::
          {:ok, any()}
          | {:error, any()}
          | {:error, Multi.name(), any(), %{required(Multi.name()) => any()}}
  def transaction(conf, fun_or_multi, opts \\ []) do
    with_dynamic_repo(conf, fn ->
      conf.repo.transaction(fun_or_multi, default_opts(conf, opts))
    end)
  end

  @doc """
  Wraps `Ecto.Adapters.SQL.Repo.to_sql/2`.

  The configured prefix is applied to the query before the SQL is generated. Like the other
  wrappers, the call runs against the dynamic repo instance when one is configured.
  """
  @doc since: "2.2.0"
  @spec to_sql(Config.t(), :all | :update_all | :delete_all, Queryable.t()) ::
          {String.t(), [term()]}
  def to_sql(conf, kind, queryable) do
    query = put_query_prefix(conf, queryable)

    # Go through with_dynamic_repo/2 like every other wrapper, otherwise the SQL would be
    # generated against whichever dynamic repo the calling process currently has set.
    with_dynamic_repo(conf, fn -> conf.repo.to_sql(kind, query) end)
  end

  @doc "Wraps `c:Ecto.Repo.update/2`."
  @doc since: "2.2.0"
  @spec update(Config.t(), Changeset.t(), Keyword.t()) ::
          {:ok, Schema.t()} | {:error, Changeset.t()}
  def update(conf, changeset, opts \\ []) do
    with_dynamic_repo(conf, fn -> conf.repo.update(changeset, query_opts(conf, opts)) end)
  end

  @doc "Wraps `c:Ecto.Repo.update_all/3`."
  @doc since: "2.2.0"
  @spec update_all(Config.t(), Queryable.t(), Keyword.t(), Keyword.t()) ::
          {integer(), nil | [term()]}
  def update_all(conf, queryable, updates, opts \\ []) do
    with_dynamic_repo(conf, fn ->
      conf.repo.update_all(queryable, updates, query_opts(conf, opts))
    end)
  end

  # Runs `fun` with the configured dynamic repo instance active. Without a `:get_dynamic_repo`
  # function, or when that function returns `nil`, `fun` is invoked directly and the process'
  # dynamic repo is left alone.
  @doc false
  @spec with_dynamic_repo(Config.t(), (() -> result)) :: result when result: var
  def with_dynamic_repo(conf, fun) do
    case get_dynamic_repo(conf) do
      nil ->
        fun.()

      instance ->
        prev_instance = conf.repo.get_dynamic_repo()
        maybe_run_in_transaction(conf, fun, instance, prev_instance)
    end
  end

  # Only a zero arity function under `:get_dynamic_repo` is considered; anything else means
  # there isn't a dynamic repo to switch to.
  defp get_dynamic_repo(%{get_dynamic_repo: fun}) when is_function(fun, 0), do: fun.()
  defp get_dynamic_repo(_conf), do: nil

  # Switches to `instance` unless the previously set instance is already in a transaction, runs
  # `fun`, and always restores the previously set instance, even when `fun` raises.
  defp maybe_run_in_transaction(conf, fun, instance, prev_instance) do
    if not in_transaction?(conf, prev_instance) do
      conf.repo.put_dynamic_repo(instance)
    end

    fun.()
  after
    conf.repo.put_dynamic_repo(prev_instance)
  end

  # A pid instance defers to the repo's own transaction check.
  defp in_transaction?(conf, instance) when is_pid(instance), do: conf.repo.in_transaction?()

  # A named instance is only checked when the name currently resolves to a local pid.
  defp in_transaction?(conf, instance) when is_atom(instance) do
    case GenServer.whereis(instance) do
      pid when is_pid(pid) ->
        in_transaction?(conf, pid)

      _ ->
        false
    end
  end

  defp in_transaction?(_, _), do: false

  # Applies the configured prefix to the query. A conf without a `:prefix` key leaves the
  # queryable untouched.
  defp put_query_prefix(%{prefix: prefix}, queryable) do
    queryable
    |> Queryable.to_query()
    |> Map.put(:prefix, prefix)
  end

  defp put_query_prefix(_conf, queryable), do: queryable

  # The configured log level always replaces a caller supplied `:log` option.
  defp default_opts(conf, opts) do
    Keyword.put(opts, :log, conf.log)
  end

  # Builds on `default_opts/2`, additionally replacing `:prefix` and `:telemetry_options` with
  # values from the configuration.
  defp query_opts(conf, opts) do
    conf
    |> default_opts(opts)
    |> Keyword.put(:prefix, conf.prefix)
    |> Keyword.put(:telemetry_options, oban_conf: conf)
  end
end