lib/oban/engine.ex

defmodule Oban.Engine do
  @moduledoc """
  Defines an Engine for job orchestration.

  Engines are responsible for all non-plugin database interaction, from inserting through
  executing jobs.

  Oban ships with three Engine implementations:

  1. `Basic` — The default engine for development, production, and manual testing mode.
  2. `Inline` — Designed specifically for testing, it executes jobs immediately, in-memory, as
     they are inserted.
  3. `Lite` - The engine for running Oban using SQLite3.

  > #### 🌟 SmartEngine {: .info}
  >
  > The Basic engine lacks advanced functionality such as global limits, rate limits, and
  > unique bulk insert. For those features and more, see the [`Smart` engine in Oban
  > Pro](https://getoban.pro/docs/pro/Oban.Pro.Engines.Smart.html).
  """

  alias Ecto.{Changeset, Multi}
  alias Oban.{Config, Job, Notifier}

  @type conf :: Config.t()
  @type job :: Job.t()
  @type meta :: map()
  @type opts :: Keyword.t()
  @type queryable :: Ecto.Queryable.t()
  @type running :: map()
  @type seconds :: non_neg_integer()

  @doc """
  Initialize metadata for a queue engine.

  Queue metadata is used to identify and track subsequent actions such as fetching or staging
  jobs.
  """
  @callback init(conf(), opts()) :: {:ok, meta()} | {:error, term()}

  @doc """
  Store the given key/value pair in the engine meta.
  """
  @callback put_meta(conf(), meta(), atom(), term()) :: meta()

  @doc """
  Format engine meta in a digestible format for queue inspection.
  """
  @callback check_meta(conf(), meta(), running()) :: map()

  @doc """
  Refresh a queue to indicate that it is still alive.
  """
  @callback refresh(conf(), meta()) :: meta()

  @doc """
  Prepare a queue engine for shutdown.

  The queue process is expected to stop processing new jobs after shutdown starts, though it may
  continue executing jobs that are already running.
  """
  @callback shutdown(conf(), meta()) :: meta()

  @doc """
  Insert a job into the database.
  """
  @callback insert_job(conf(), Job.changeset(), opts()) :: {:ok, Job.t()} | {:error, term()}

  @doc deprecated: "Handled automatically by engine dispatch."
  @callback insert_job(conf(), Multi.t(), Multi.name(), [Job.changeset()], opts()) :: Multi.t()

  @doc """
  Insert multiple jobs into the database.
  """
  @callback insert_all_jobs(conf(), [Job.changeset()], opts()) :: [Job.t()]

  @doc deprecated: "Handled automatically by engine dispatch."
  @callback insert_all_jobs(conf(), Multi.t(), Multi.name(), [Job.changeset()], opts()) ::
              Multi.t()

  @doc """
  Transition scheduled or retryable jobs to available prior to execution.
  """
  @callback stage_jobs(conf(), queryable(), opts()) :: {:ok, [Job.t()]}

  @doc """
  Fetch available jobs for the given queue, up to configured limits.
  """
  @callback fetch_jobs(conf(), meta(), running()) :: {:ok, {meta(), [Job.t()]}} | {:error, term()}

  @doc """
  Delete completed, cancelled and discarded jobs.
  """
  @callback prune_jobs(conf(), queryable(), opts()) :: {:ok, [Job.t()]}

  @doc """
  Record that a job completed successfully.
  """
  @callback complete_job(conf(), Job.t()) :: :ok

  @doc """
  Transition a job to `discarded` and record an optional reason that it shouldn't be ran again.
  """
  @callback discard_job(conf(), Job.t()) :: :ok

  @doc """
  Record an executing job's errors and either retry or discard it, depending on whether it has
  exhausted its available attempts.
  """
  @callback error_job(conf(), Job.t(), seconds()) :: :ok

  @doc """
  Reschedule an executing job to run some number of seconds in the future.
  """
  @callback snooze_job(conf(), Job.t(), seconds()) :: :ok

  @doc """
  Mark an `executing`, `available`, `scheduled` or `retryable` job as `cancelled` to prevent it
  from running.
  """
  @callback cancel_job(conf(), Job.t()) :: :ok

  @doc """
  Mark a job as `available`, adding attempts if already maxed out. If the job is currently
  `available`, `executing` or `scheduled` it should be ignored.
  """
  @callback retry_job(conf(), Job.t()) :: :ok

  @doc """
  Mark many `executing`, `available`, `scheduled` or `retryable` job as `cancelled` to prevent them
  from running.
  """
  @callback cancel_all_jobs(conf(), queryable()) :: {:ok, [Job.t()]}

  @doc """
  Mark many jobs as `available`, adding attempts if already maxed out. Any jobs currently
  `available`, `executing` or `scheduled` should be ignored.
  """
  @callback retry_all_jobs(conf(), queryable()) :: {:ok, [Job.t()]}

  @optional_callbacks [insert_all_jobs: 5, insert_job: 5, prune_jobs: 3, stage_jobs: 3]

  @doc false
  def init(%Config{} = conf, [_ | _] = opts) do
    with_span(:init, conf, %{opts: opts}, fn engine ->
      engine.init(conf, opts)
    end)
  end

  @doc false
  def refresh(%Config{} = conf, %{} = meta) do
    with_span(:refresh, conf, %{meta: meta}, fn engine ->
      engine.refresh(conf, meta)
    end)
  end

  @doc false
  def shutdown(%Config{} = conf, %{} = meta) do
    with_span(:shutdown, conf, %{meta: meta}, fn engine ->
      engine.shutdown(conf, meta)
    end)
  end

  @doc false
  def put_meta(%Config{} = conf, %{} = meta, key, value) when is_atom(key) do
    with_span(:put_meta, conf, %{meta: meta, key: key, value: value}, fn engine ->
      engine.put_meta(conf, meta, key, value)
    end)
  end

  @doc false
  def check_meta(%Config{} = conf, %{} = meta, %{} = running) do
    with_span(:check_meta, conf, %{meta: meta, running: running}, fn engine ->
      engine.check_meta(conf, meta, running)
    end)
  end

  @doc false
  def insert_job(%Config{} = conf, %Changeset{} = changeset, opts) do
    with_span(:insert_job, conf, %{changeset: changeset, opts: opts}, fn engine ->
      with {:ok, job} <- engine.insert_job(conf, changeset, opts) do
        notify_trigger(conf, [job])

        {:meta, {:ok, job}, %{job: job}}
      end
    end)
  end

  @doc false
  def insert_job(%Config{} = conf, %Multi{} = multi, name, fun, opts) when is_function(fun, 1) do
    Multi.run(multi, name, fn repo, changes ->
      insert_job(%{conf | repo: repo}, fun.(changes), opts)
    end)
  end

  def insert_job(%Config{} = conf, %Multi{} = multi, name, changeset, opts) do
    Multi.run(multi, name, fn repo, _changes ->
      insert_job(%{conf | repo: repo}, changeset, opts)
    end)
  end

  @doc false
  def insert_all_jobs(%Config{} = conf, changesets, opts) do
    with_span(:insert_all_jobs, conf, %{changesets: changesets, opts: opts}, fn engine ->
      jobs = engine.insert_all_jobs(conf, expand(changesets, %{}), opts)

      notify_trigger(conf, jobs)

      {:meta, jobs, %{jobs: jobs}}
    end)
  end

  @doc false
  def insert_all_jobs(%Config{} = conf, %Multi{} = multi, name, wrapper, opts) do
    Multi.run(multi, name, fn repo, changes ->
      {:ok, insert_all_jobs(%{conf | repo: repo}, expand(wrapper, changes), opts)}
    end)
  end

  @doc false
  def fetch_jobs(%Config{} = conf, %{} = meta, %{} = running) do
    with_span(:fetch_jobs, conf, %{meta: meta, running: running}, fn engine ->
      {:ok, {meta, jobs}} = engine.fetch_jobs(conf, meta, running)
      {:meta, {:ok, {meta, jobs}}, %{jobs: jobs, meta: meta}}
    end)
  end

  @doc false
  def stage_jobs(%Config{} = conf, queryable, opts) do
    conf = with_compatible_engine(conf, :stage_jobs)

    with_span(:stage_jobs, conf, %{opts: opts, queryable: queryable}, fn engine ->
      {:ok, jobs} = engine.stage_jobs(conf, queryable, opts)
      {:meta, {:ok, jobs}, %{jobs: jobs}}
    end)
  end

  @doc false
  def prune_jobs(%Config{} = conf, queryable, opts) do
    conf = with_compatible_engine(conf, :prune_jobs)

    with_span(:prune_jobs, conf, %{opts: opts, queryable: queryable}, fn engine ->
      {:ok, jobs} = engine.prune_jobs(conf, queryable, opts)
      {:meta, {:ok, jobs}, %{jobs: jobs}}
    end)
  end

  @doc false
  def complete_job(%Config{} = conf, %Job{} = job) do
    with_span(:complete_job, conf, %{job: job}, fn engine ->
      engine.complete_job(conf, job)
    end)
  end

  @doc false
  def discard_job(%Config{} = conf, %Job{} = job) do
    with_span(:discard_job, conf, %{job: job}, fn engine ->
      engine.discard_job(conf, job)
    end)
  end

  @doc false
  def error_job(%Config{} = conf, %Job{} = job, seconds) when is_integer(seconds) do
    with_span(:error_job, conf, %{job: job}, fn engine ->
      engine.error_job(conf, job, seconds)
    end)
  end

  @doc false
  def snooze_job(%Config{} = conf, %Job{} = job, seconds) when is_integer(seconds) do
    with_span(:snooze_job, conf, %{job: job}, fn engine ->
      engine.snooze_job(conf, job, seconds)
    end)
  end

  @doc false
  def cancel_job(%Config{} = conf, %Job{} = job) do
    with_span(:cancel_job, conf, %{job: job}, fn engine ->
      engine.cancel_job(conf, job)
    end)
  end

  @doc false
  def cancel_all_jobs(%Config{} = conf, queryable) do
    with_span(:cancel_all_jobs, conf, %{queryable: queryable}, fn engine ->
      with {:ok, jobs} <- engine.cancel_all_jobs(conf, queryable) do
        {:meta, {:ok, jobs}, %{jobs: jobs}}
      end
    end)
  end

  @doc false
  def retry_job(%Config{} = conf, %Job{} = job) do
    with_span(:retry_job, conf, %{job: job}, fn engine ->
      engine.retry_job(conf, job)
    end)
  end

  @doc false
  def retry_all_jobs(%Config{} = conf, queryable) do
    with_span(:retry_all_jobs, conf, %{queryable: queryable}, fn engine ->
      with {:ok, jobs} <- engine.retry_all_jobs(conf, queryable) do
        {:meta, {:ok, jobs}, %{jobs: jobs}}
      end
    end)
  end

  defp expand(fun, changes) when is_function(fun, 1), do: expand(fun.(changes), changes)
  defp expand(%{changesets: changesets}, _), do: expand(changesets, %{})
  defp expand(changesets, _) when is_struct(changesets, Stream), do: changesets
  defp expand(changesets, _) when is_list(changesets), do: changesets

  defp with_span(event, %Config{} = conf, base_meta, fun) do
    base_meta = Map.merge(base_meta, %{conf: conf, engine: conf.engine})

    :telemetry.span([:oban, :engine, event], base_meta, fn ->
      engine = Config.get_engine(conf)

      case fun.(engine) do
        {:meta, result, more_meta} ->
          {result, Map.merge(base_meta, more_meta)}

        result ->
          {result, base_meta}
      end
    end)
  end

  # External engines aren't guaranteed to implement stage_jobs/3 or prune_jobs/3, but we can
  # assume they were built for Postgres and safely fall back to the Basic engine.
  defp with_compatible_engine(%{engine: engine} = conf, function) do
    if function_exported?(engine, function, 3) do
      conf
    else
      %Config{conf | engine: Oban.Engines.Basic}
    end
  end

  defp notify_trigger(%{insert_trigger: true} = conf, jobs) do
    payload = for job <- jobs, job.state == "available", uniq: true, do: %{queue: job.queue}

    unless payload == [], do: Notifier.notify(conf, :insert, payload)
  catch
    # Insert notification timeouts aren't worth failing inserts over.
    :exit, {:timeout, _} -> :ok
  end

  defp notify_trigger(_conf, _queue), do: :skip
end