lib/oban/engine.ex

defmodule Oban.Engine do
  @moduledoc """
  Defines an Engine for job orchestration.

  Engines are responsible for all non-plugin database interaction, from inserting through
  executing jobs.

  Oban ships with two Engine implementations:

  1. `Basic` — The default engine for development, production, and manual testing mode.
  2. `Inline` — Designed specifically for testing, it executes jobs immediately, in-memory, as
     they are inserted.

  > #### 🌟 SmartEngine {: .info}
  >
  > The Basic engine lacks advanced functionality such as global limits, rate limits, and
  > unique bulk insert. For those features and more, see the [`SmartEngine` in Oban
  > Pro](https://getoban.pro/docs/pro/smart_engine.html).
  """

  alias Ecto.{Changeset, Multi}
  alias Oban.{Config, Job}

  @type conf :: Config.t()
  @type job :: Job.t()
  @type meta :: map()
  @type opts :: Keyword.t()
  @type queryable :: Ecto.Queryable.t()
  @type running :: map()
  @type seconds :: pos_integer()

  @doc """
  Initialize metadata for a queue engine.

  Queue metadata is used to identify and track subsequent actions such as fetching or staging
  jobs.
  """
  @callback init(conf(), opts()) :: {:ok, meta()} | {:error, term()}

  @doc """
  Store the given key/value pair in the engine meta.
  """
  @callback put_meta(conf(), meta(), atom(), term()) :: meta()

  @doc """
  Format engine meta in a digestible format for queue inspection.
  """
  @callback check_meta(conf(), meta(), running()) :: map()

  @doc """
  Refresh a queue to indicate that it is still alive.
  """
  @callback refresh(conf(), meta()) :: meta()

  @doc """
  Prepare a queue engine for shutdown.

  The queue process is expected to stop processing new jobs after shutdown starts, though it may
  continue executing jobs that are already running.
  """
  @callback shutdown(conf(), meta()) :: meta()

  @doc """
  Insert a job into the database.
  """
  @callback insert_job(conf(), Job.changeset(), opts()) :: {:ok, Job.t()} | {:error, term()}

  @doc """
  Insert a job within an `Ecto.Multi`.
  """
  @callback insert_job(conf(), Multi.t(), Multi.name(), Oban.changeset_or_fun(), opts()) ::
              Multi.t()

  @doc """
  Insert multiple jobs into the database.
  """
  @callback insert_all_jobs(conf(), Oban.changesets_or_wrapper(), opts()) :: [Job.t()]

  @doc """
  Insert multiple jobs within an `Ecto.Multi`
  """
  @callback insert_all_jobs(
              conf(),
              Multi.t(),
              Multi.name(),
              Oban.changesets_or_wrapper_or_fun(),
              opts()
            ) :: Multi.t()

  @doc """
  Fetch available jobs for the given queue, up to configured limits.
  """
  @callback fetch_jobs(conf(), meta(), running()) :: {:ok, {meta(), [Job.t()]}} | {:error, term()}

  @doc """
  Record that a job completed successfully.
  """
  @callback complete_job(conf(), Job.t()) :: :ok

  @doc """
  Transition a job to `discarded` and record an optional reason that it shouldn't be ran again.
  """
  @callback discard_job(conf(), Job.t()) :: :ok

  @doc """
  Record an executing job's errors and either retry or discard it, depending on whether it has
  exhausted its available attempts.
  """
  @callback error_job(conf(), Job.t(), seconds()) :: :ok

  @doc """
  Reschedule an executing job to run some number of seconds in the future.
  """
  @callback snooze_job(conf(), Job.t(), seconds()) :: :ok

  @doc """
  Mark an `executing`, `available`, `scheduled` or `retryable` job as `cancelled` to prevent it
  from running.
  """
  @callback cancel_job(conf(), Job.t()) :: :ok

  @doc """
  Mark many `executing`, `available`, `scheduled` or `retryable` job as `cancelled` to prevent them
  from running.
  """
  @callback cancel_all_jobs(conf(), queryable()) :: {:ok, {non_neg_integer(), [Job.t()]}}

  @doc """
  Mark a job as `available`, adding attempts if already maxed out. If the job is currently
  `available`, `executing` or `scheduled` it should be ignored.
  """
  @callback retry_job(conf(), Job.t()) :: :ok

  @doc """
  Mark many jobs as `available`, adding attempts if already maxed out. Any jobs currently
  `available`, `executing` or `scheduled` should be ignored.
  """
  @callback retry_all_jobs(conf(), queryable()) :: {:ok, non_neg_integer()}

  @doc false
  def init(%Config{} = conf, [_ | _] = opts) do
    with_span(:init, conf, fn engine ->
      engine.init(conf, opts)
    end)
  end

  @doc false
  def refresh(%Config{} = conf, %{} = meta) do
    with_span(:refresh, conf, fn engine ->
      engine.refresh(conf, meta)
    end)
  end

  @doc false
  def shutdown(%Config{} = conf, %{} = meta) do
    with_span(:shutdown, conf, fn engine ->
      engine.shutdown(conf, meta)
    end)
  end

  @doc false
  def put_meta(%Config{} = conf, %{} = meta, key, value) when is_atom(key) do
    with_span(:put_meta, conf, fn engine ->
      engine.put_meta(conf, meta, key, value)
    end)
  end

  @doc false
  def check_meta(%Config{} = conf, %{} = meta, %{} = running) do
    with_span(:check_meta, conf, fn engine ->
      engine.check_meta(conf, meta, running)
    end)
  end

  @doc false
  def insert_job(%Config{} = conf, %Changeset{} = changeset, opts) do
    meta = %{changeset: changeset, opts: opts}

    with_span(:insert_job, conf, meta, fn engine ->
      engine.insert_job(conf, changeset, opts)
    end)
  end

  @doc false
  def insert_job(%Config{} = conf, %Multi{} = multi, name, changeset, opts) do
    meta = %{changeset: changeset, opts: opts}

    with_span(:insert_job, conf, meta, fn engine ->
      engine.insert_job(conf, multi, name, changeset, opts)
    end)
  end

  @doc false
  def insert_all_jobs(%Config{} = conf, changesets, opts) do
    meta = %{changesets: changesets, opts: opts}

    with_span(:insert_all_jobs, conf, meta, fn engine ->
      engine.insert_all_jobs(conf, changesets, opts)
    end)
  end

  @doc false
  def insert_all_jobs(%Config{} = conf, %Multi{} = multi, name, changesets, opts) do
    meta = %{changesets: changesets, opts: opts}

    with_span(:insert_all_jobs, conf, meta, fn engine ->
      engine.insert_all_jobs(conf, multi, name, changesets, opts)
    end)
  end

  @doc false
  def fetch_jobs(%Config{} = conf, %{} = meta, %{} = running) do
    with_span(:fetch_jobs, conf, fn engine ->
      engine.fetch_jobs(conf, meta, running)
    end)
  end

  @doc false
  def complete_job(%Config{} = conf, %Job{} = job) do
    with_span(:complete_job, conf, %{job: job}, fn engine ->
      engine.complete_job(conf, job)
    end)
  end

  @doc false
  def discard_job(%Config{} = conf, %Job{} = job) do
    with_span(:discard_job, conf, %{job: job}, fn engine ->
      engine.discard_job(conf, job)
    end)
  end

  @doc false
  def error_job(%Config{} = conf, %Job{} = job, seconds) when is_integer(seconds) do
    with_span(:error_job, conf, %{job: job}, fn engine ->
      engine.error_job(conf, job, seconds)
    end)
  end

  @doc false
  def snooze_job(%Config{} = conf, %Job{} = job, seconds) when is_integer(seconds) do
    with_span(:snooze_job, conf, %{job: job}, fn engine ->
      engine.snooze_job(conf, job, seconds)
    end)
  end

  @doc false
  def cancel_job(%Config{} = conf, %Job{} = job) do
    with_span(:cancel_job, conf, %{job: job}, fn engine ->
      engine.cancel_job(conf, job)
    end)
  end

  @doc false
  def cancel_all_jobs(%Config{} = conf, queryable) do
    with_span(:cancel_all_jobs, conf, fn engine ->
      engine.cancel_all_jobs(conf, queryable)
    end)
  end

  @doc false
  def retry_job(%Config{} = conf, %Job{} = job) do
    with_span(:retry_job, conf, %{job: job}, fn engine ->
      engine.retry_job(conf, job)
    end)
  end

  @doc false
  def retry_all_jobs(%Config{} = conf, queryable) do
    with_span(:retry_all_jobs, conf, fn engine ->
      engine.retry_all_jobs(conf, queryable)
    end)
  end

  defp with_span(event, %Config{} = conf, tele_meta \\ %{}, fun) do
    tele_meta = Map.merge(tele_meta, %{conf: conf, engine: conf.engine})

    :telemetry.span([:oban, :engine, event], tele_meta, fn ->
      engine = Config.get_engine(conf)

      {fun.(engine), tele_meta}
    end)
  end
end