lib/oban.ex

defmodule Oban do
  @external_resource readme = Path.join([__DIR__, "../README.md"])

  @moduledoc readme
             |> File.read!()
             |> String.split("<!-- MDOC -->")
             |> Enum.fetch!(1)

  @moduledoc since: "0.1.0"

  use Supervisor

  alias Ecto.{Changeset, Multi}
  alias Oban.{Config, Engine, Job, Midwife, Notifier, Peer, Registry, Stager}
  alias Oban.Queue.{Drainer, Producer}

  @typedoc """
  The name of an Oban instance. This is used to identify instances in the internal registry for
  configuration lookup.
  """
  @type name :: term()

  @type queue_name :: atom() | binary()

  @type queue_option ::
          {:queue, queue_name()}
          | {:limit, pos_integer()}
          | {:local_only, boolean()}
          | {:node, String.t()}

  @type queue_state :: %{
          :limit => pos_integer(),
          :node => binary(),
          :paused => boolean(),
          :queue => queue_name(),
          :running => [pos_integer()],
          :started_at => DateTime.t(),
          :updated_at => DateTime.t(),
          optional(atom()) => any()
        }

  @type option ::
          {:dispatch_cooldown, pos_integer()}
          | {:engine, module()}
          | {:get_dynamic_repo, nil | (-> pid() | atom())}
          | {:log, false | Logger.level()}
          | {:name, name()}
          | {:node, String.t()}
          | {:notifier, module() | {module(), Keyword.t()}}
          | {:peer, false | module() | {module(), Keyword.t()}}
          | {:plugins, false | [module() | {module() | Keyword.t()}]}
          | {:prefix, false | String.t()}
          | {:queues, false | [{queue_name(), pos_integer() | Keyword.t()}]}
          | {:repo, module()}
          | {:shutdown_grace_period, timeout()}
          | {:stage_interval, timeout()}
          | {:testing, :disabled | :inline | :manual}

  @type drain_option ::
          {:queue, queue_name()}
          | {:with_limit, pos_integer()}
          | {:with_recursion, boolean()}
          | {:with_safety, boolean()}
          | {:with_scheduled, boolean() | DateTime.t()}

  @type drain_result :: %{
          cancelled: non_neg_integer(),
          discard: non_neg_integer(),
          failure: non_neg_integer(),
          snoozed: non_neg_integer(),
          success: non_neg_integer()
        }

  @type changeset_or_fun :: Job.changeset() | Job.changeset_fun()
  @type changesets_or_wrapper :: Job.changeset_list() | changeset_wrapper()
  @type changesets_or_wrapper_or_fun :: changesets_or_wrapper() | Job.changeset_list_fun()
  @type changeset_wrapper :: %{:changesets => Job.changeset_list(), optional(atom()) => term()}
  @type multi :: Multi.t()
  @type multi_name :: Multi.name()

  defguardp is_changeset_or_fun(cf)
            when is_struct(cf, Changeset) or is_function(cf, 1)

  defguardp is_list_or_wrapper(cw)
            when is_list(cw) or
                   (is_map(cw) and is_map_key(cw, :changesets) and is_list(cw.changesets)) or
                   is_function(cw, 1)

  @doc """
  Creates a facade for `Oban` functions and automates fetching configuration from the application
  environment.

  Facade modules support configuration via the application environment under an OTP application
  key. For example, the facade:

      defmodule MyApp.Oban do
        use Oban, otp_app: MyApp
      end

  Could be configured with:

      config :my_app, Oban, repo: MyApp.Repo

  Then you can include `MyApp.Oban` in your application's supervision tree without passing extra
  options:

      defmodule MyApp.Application do
        use Application

        def start(_type, _args) do
          children = [
            MyApp.Repo,
            MyApp.Oban
          ]

          opts = [strategy: :one_for_one, name: MyApp.Supervisor]
          Supervisor.start_link(children, opts)
        end
      end

  ### Calling Functions

  Facade modules allow you to call `Oban` functions on instances with custom names, e.g. not
  `Oban`, without passing a `t:Oban.name/0` as the first argument.

  For example, rather than calling `Oban.config/1` you'd call `MyOban.config/0`:

      MyOban.config()

  It also makes piping into Oban functions far more convenient: 

      %{some: :args}
      |> MyWorker.new()
      |> MyOban.insert()

  ### Merging Configuration

  All configuration can be provided through the `use` macro or application config, and options
  from the application supersedes those passed through `use`. Configuration is prioritized in
  order:

  1. Options passed through `use`
  2. Options pulled from the OTP app via `Application.get_env/3`
  3. Options passed through a child spec in the supervisor
  """
  defmacro __using__(opts \\ []) do
    {otp_app, child_opts} = Keyword.pop!(opts, :otp_app)

    quote do
      def child_spec(opts) do
        unquote(child_opts)
        |> Keyword.merge(Application.get_env(unquote(otp_app), __MODULE__, []))
        |> Keyword.merge(opts)
        |> Keyword.put(:name, __MODULE__)
        |> Oban.child_spec()
      end

      def cancel_all_jobs(queryable) do
        Oban.cancel_all_jobs(__MODULE__, queryable)
      end

      def cancel_job(job_or_id) do
        Oban.cancel_job(__MODULE__, job_or_id)
      end

      def check_queue(opts) do
        Oban.check_queue(__MODULE__, opts)
      end

      def config do
        Oban.config(__MODULE__)
      end

      def drain_queue(opts) do
        Oban.drain_queue(__MODULE__, opts)
      end

      def insert(changeset, opts \\ []) do
        Oban.insert(__MODULE__, changeset, opts)
      end

      def insert(multi, multi_name, changeset, opts \\ []) do
        Oban.insert(__MODULE__, multi, multi_name, changeset, opts)
      end

      def insert!(changeset, opts \\ []) do
        Oban.insert!(__MODULE__, changeset, opts)
      end

      def insert_all(changesets, opts) do
        Oban.insert_all(__MODULE__, changesets, opts)
      end

      def insert_all(multi, multi_name, changesets, opts) do
        Oban.insert_all(__MODULE__, multi, multi_name, changesets, opts)
      end

      def start_queue(opts) do
        Oban.start_queue(__MODULE__, opts)
      end

      def pause_queue(opts) do
        Oban.pause_queue(__MODULE__, opts)
      end

      def pause_all_queues(opts) do
        Oban.pause_all_queues(__MODULE__, opts)
      end

      def resume_queue(opts) do
        Oban.resume_queue(__MODULE__, opts)
      end

      def resume_all_queues(opts) do
        Oban.resume_all_queues(__MODULE__, opts)
      end

      def scale_queue(opts) do
        Oban.scale_queue(__MODULE__, opts)
      end

      def stop_queue(opts) do
        Oban.stop_queue(__MODULE__, opts)
      end

      def retry_job(job_or_id) do
        Oban.retry_job(__MODULE__, job_or_id)
      end

      def retry_all_jobs(queryable) do
        Oban.retry_all_jobs(__MODULE__, queryable)
      end
    end
  end

  @doc """
  Starts an `Oban` supervision tree linked to the current process.

  ## Options

  These options are required; without them the supervisor won't start:

  * `:repo` — specifies the Ecto repo used to insert and retrieve jobs

  ### Primary Options

  These options determine what the system does at a high level, i.e. which queues to run:

  * `:engine` — facilitates inserting, fetching, and otherwise managing jobs.

    There are three built-in engines: `Oban.Engines.Basic` for Postgres databases,
    `Oban.Engines.Lite` for SQLite3 databases, and `Oban.Engines.Inline` for simplified testing
    (only available for `:inline` testing mode).

    When the `Oban.Engines.Lite` engine is used the `:notifier` and `:peer` are automatically set
    to `PG` and `isolated` mode, respectively.

    Additional engines, such as Oban Pro's `SmartEngine` with advanced functionality for Postgres,
    are also available as an add-on.

    Defaults to the `Basic` engine for Postgres.

  * `:log` — either `false` to disable logging or a standard log level (`:error`, `:warning`,
    `:info`, `:debug`, etc.). This determines whether queries are logged or not; overriding the
    repo's configured log level. Defaults to `false`, where no queries are logged.

  * `:name` — used for supervisor registration, it must be unique across an entire VM instance.
    Defaults to `Oban` when no name is provided.

  * `:node` — used to identify the node that the supervision tree is running in. If no value is
    provided it will use the `node` name in a distributed system, or the `hostname` in an isolated
    node. See "Node Name" below.

  * `:notifier` — used to relay messages between processes, nodes, and the Postgres database.

    There are two built-in notifiers: `Oban.Notifiers.Postgres`, which uses Postgres PubSub; and
    `Oban.Notifiers.PG`, which uses process groups with distributed erlang. Defaults to the
    Postgres notifier.

  * `:peer` — used to specify which peer module to use for cluster leadership.

    There are two built-in peers: `Oban.Peers.Postgres`, which uses table-based leadership through
    the `oban_peers` table; and `Oban.Peers.Global`, which uses global locks through distributed Erlang.

    Leadership can be disabled by setting `peer: false`, but note that centralized plugins like
    `Cron` won't run without leadership.

    Defaults to the `Postgres` peer.

  * `:plugins` — a list or modules or module/option tuples that are started as children of an Oban
    supervisor. Any supervisable module is a valid plugin, i.e. a `GenServer` or an `Agent`.

  * `:prefix` — the query prefix, or schema, to use for inserting and executing jobs. An
    `oban_jobs` table must exist within the prefix. See the "Prefix Support" section in the module
    documentation for more details.

  * `:queues` — a keyword list where the keys are queue names and the values are the concurrency
    setting or a keyword list of queue options. For example, setting queues to `[default: 10,
    exports: 5]` would start the queues `default` and `exports` with a combined concurrency level
    of 15. The concurrency setting specifies how many jobs _each queue_ will run concurrently.

    Queues accept additional override options to customize their behavior, e.g. by setting
    `paused` or `dispatch_cooldown` for a specific queue.

  * `:testing` — a mode that controls how an instance is configured for testing. When set to
    `:inline` or `:manual` queues, peers, and plugins are automatically disabled. Defaults to
    `:disabled`, no test mode.

  ### Twiddly Options

  Additional options used to tune system behaviour. These are primarily useful for testing or
  troubleshooting and don't usually need modification.

  * `:dispatch_cooldown` — the minimum number of milliseconds a producer will wait before fetching
    and running more jobs. A slight cooldown period prevents a producer from flooding with
    messages and thrashing the database. The cooldown period _directly impacts_ a producer's
    throughput: jobs per second for a single queue is calculated by `(1000 / cooldown) * limit`.
    For example, with a `5ms` cooldown and a queue limit of `25` a single queue can run 5,000
    jobs/sec.

    The default is `5ms` and the minimum is `1ms`, which is likely faster than the database can
    return new jobs to run.

  * `:insert_trigger` — whether to dispatch notifications to relevant queues as jobs are inserted
    into the database. At high load, e.g. thousands or more job inserts per second, notifications
    may become a bottleneck.

    The trigger mechanism is designed to make jobs execute immediately after insert, rather than
    up to `:stage_interval` (1 second) afterwards, and it can safely be disabled to improve insert
    throughput.

    Defaults to `true`, with triggering enabled.

  * `:shutdown_grace_period` — the amount of time a queue will wait for executing jobs to complete
    before hard shutdown, specified in milliseconds. The default is `15_000`, or 15 seconds.

  * `:stage_interval` — the number of milliseconds between making scheduled jobs available and
    notifying relevant queues that jobs are available. This is directly tied to the resolution of
    `scheduled` or `retryable` jobs and how frequently the database is checked for jobs to run. To
    minimize database load, only `5_000` jobs are staged at each interval.

    Only the leader node stages jobs and notifies queues when the `:notifier's` pubsub
    notifications are functional. If pubusb messages can't get through then staging switches to a
    less efficient "local" mode in which all nodes poll for jobs to run.

    Setting the interval to `:infinity` disables staging entirely. The default is `1_000ms`.

  ## Example

  Start a stand-alone `Oban` instance:

      {:ok, pid} = Oban.start_link(repo: MyApp.Repo, queues: [default: 10])

  To start an `Oban` instance within an application's supervision tree:

      def start(_type, _args) do
        children = [MyApp.Repo, {Oban, repo: MyApp.Repo, queues: [default: 10]}]

        Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
      end

  Start multiple, named `Oban` supervisors within a supervision tree:

        children = [
          MyApp.Repo,
          {Oban, name: Oban.A, repo: MyApp.Repo, queues: [default: 10]},
          {Oban, name: Oban.B, repo: MyApp.Repo, queues: [special: 10]},
        ]

        Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)

  Start a local `Oban` instance for SQLite:

      {:ok, pid} = Oban.start_link(engine: Oban.Engines.Lite, repo: MyApp.Repo)

  ## Node Name

  When the `node` value hasn't been configured it is generated based on the environment:

  * In a distributed system the node name is used
  * In a Heroku environment the system environment's `DYNO` value is used
  * Otherwise, the system hostname is used
  """
  @doc since: "0.1.0"
  @spec start_link([option()]) :: Supervisor.on_start()
  def start_link(opts) when is_list(opts) do
    conf = Config.new(opts)

    Supervisor.start_link(__MODULE__, conf, name: Registry.via(conf.name, nil, conf))
  end

  @doc false
  @spec child_spec([option]) :: Supervisor.child_spec()
  def child_spec(opts) do
    opts
    |> super()
    |> Supervisor.child_spec(id: Keyword.get(opts, :name, __MODULE__))
  end

  @doc """
  Returns the pid of the root Oban process for the given name.

  ## Example

  Find the default instance:

      Oban.whereis(Oban)

  Find a dynamically named instance:

      Oban.whereis({:oban, 1})
  """
  @doc since: "2.2.0"
  @spec whereis(name()) :: pid() | {atom(), node()} | nil
  def whereis(name), do: Registry.whereis(name)

  @impl Supervisor
  def init(%Config{plugins: plugins} = conf) do
    children = [
      {Notifier, conf: conf, name: Registry.via(conf.name, Notifier)},
      {DynamicSupervisor, name: Registry.via(conf.name, Foreman), strategy: :one_for_one},
      {Peer, conf: conf, name: Registry.via(conf.name, Peer)},
      {Midwife, conf: conf, name: Registry.via(conf.name, Midwife)},
      {Stager, conf: conf, name: Registry.via(conf.name, Stager)}
    ]

    children = children ++ Enum.map(plugins, &plugin_child_spec(&1, conf))
    children = children ++ event_child_spec(conf)

    Supervisor.init(children, strategy: :one_for_one)
  end

  @doc """
  Retrieve the `Oban.Config` struct for a named Oban supervision tree.

  ## Example

  Retrieve the default `Oban` instance config:

      %Oban.Config{} = Oban.config()

  Retrieve the config for an instance started with a custom name:

      %Oban.Config{} = Oban.config(MyCustomOban)
  """
  @doc since: "0.2.0"
  @spec config(name()) :: Config.t()
  def config(name \\ __MODULE__), do: Registry.config(name)

  @doc false
  def insert(%Changeset{} = changeset, opts) do
    insert(__MODULE__, changeset, opts)
  end

  @doc """
  Insert a new job into the database for execution.

  This and the other `insert` variants are the recommended way to enqueue jobs because they
  support features like unique jobs.

  See the section on "Unique Jobs" for more details.

  ## Example

  Insert a single job:

      {:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}))

  Insert a job while ensuring that it is unique within the past 30 seconds:

      {:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}, unique: [period: 30]))

  Insert a job using a custom timeout:

      {:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}), timeout: 10_000)

  Insert a job using an alternative instance name:

      {:ok, job} = Oban.insert(MyOban, MyApp.Worker.new(%{id: 1}))
  """
  @doc since: "0.7.0"
  @spec insert(name(), Job.changeset(), Keyword.t()) ::
          {:ok, Job.t()} | {:error, Job.changeset() | term()}
  def insert(name \\ __MODULE__, changeset, opts \\ [])

  def insert(name, %Changeset{} = changeset, opts) do
    name
    |> config()
    |> Engine.insert_job(changeset, opts)
  end

  @spec insert(multi(), multi_name(), changeset_or_fun()) :: multi()
  def insert(%Multi{} = multi, multi_name, changeset) when is_changeset_or_fun(changeset) do
    insert(__MODULE__, multi, multi_name, changeset, [])
  end

  @doc false
  def insert(%Multi{} = multi, multi_name, changeset, opts) when is_changeset_or_fun(changeset) do
    insert(__MODULE__, multi, multi_name, changeset, opts)
  end

  @doc false
  def insert(name, multi, multi_name, changeset) when is_changeset_or_fun(changeset) do
    insert(name, multi, multi_name, changeset, [])
  end

  @doc """
  Put a job insert operation into an `Ecto.Multi`.

  Like `insert/2`, this variant is recommended over `Ecto.Multi.insert` because it supports all of
  Oban's features, i.e. unique jobs.

  See the section on "Unique Jobs" for more details.

  ## Example

      Ecto.Multi.new()
      |> Oban.insert("job-1", MyApp.Worker.new(%{id: 1}))
      |> Oban.insert("job-2", fn _ -> MyApp.Worker.new(%{id: 2}) end)
      |> MyApp.Repo.transaction()
  """
  @doc since: "0.7.0"
  @spec insert(name, multi(), multi_name(), changeset_or_fun(), Keyword.t()) :: multi()
  def insert(name, multi, multi_name, changeset, opts)
      when is_changeset_or_fun(changeset) and is_list(opts) do
    name
    |> config()
    |> Engine.insert_job(multi, multi_name, changeset, opts)
  end

  @doc false
  @spec insert!(Job.changeset(), opts :: Keyword.t()) :: Job.t()
  def insert!(%Changeset{} = changeset, opts) do
    insert!(__MODULE__, changeset, opts)
  end

  @doc """
  Similar to `insert/3`, but raises an `Ecto.InvalidChangesetError` if the job can't be inserted.

  ## Example

  Insert a single job:

      job = Oban.insert!(MyApp.Worker.new(%{id: 1}))

  Insert a job using a custom timeout:

      job = Oban.insert!(MyApp.Worker.new(%{id: 1}), timeout: 10_000)

  Insert a job using an alternative instance name:

      job = Oban.insert!(MyOban, MyApp.Worker.new(%{id: 1}))
  """
  @doc since: "0.7.0"
  @spec insert!(name(), Job.changeset()) :: Job.t()
  @spec insert!(name(), Job.changeset(), opts :: Keyword.t()) :: Job.t()
  def insert!(name \\ __MODULE__, %Changeset{} = changeset, opts \\ []) do
    case insert(name, changeset, opts) do
      {:ok, job} ->
        job

      {:error, %Changeset{} = changeset} ->
        raise Ecto.InvalidChangesetError, action: :insert, changeset: changeset

      {:error, reason} ->
        raise RuntimeError, inspect(reason)
    end
  end

  @doc false
  def insert_all(changesets, opts) when is_list_or_wrapper(changesets) do
    insert_all(__MODULE__, changesets, opts)
  end

  @doc """
  Insert multiple jobs into the database for execution.

  There are a few important differences between this function and `c:Ecto.Repo.insert_all/3`:

  1. This function always returns a list rather than a tuple of `{count, records}`
  2. This function accepts a list of changesets rather than a list of maps or keyword lists

  #### Error Handling and Rollbacks

  If `insert_all` encounters an issue, the function will raise an error based on your database
  adapter. This behavior is valuable in conjunction with `c:Ecto.Repo.transaction/2` because it
  allows for rollbacks.

  For example, an invalid changeset raises:

  `* (Ecto.InvalidChangesetError) could not perform insert because changeset is invalid.`

  > #### 🌟 Unique Jobs and Batching {: .warning}
  >
  > Only the [Smart Engine](https://getoban.pro/docs/pro/Oban.Pro.Engines.Smart.html) in [Oban
  > Pro](https://getoban.pro) supports bulk unique jobs and automatic batching. With the basic
  > engine, you must use `insert/3` for unique support.

  ## Options

  Accepts any of Ecto's "Shared Options" such as `timeout` and `log`.

  ## Example

  Insert 100 jobs with a single operation:

      1..100
      |> Enum.map(&MyApp.Worker.new(%{id: &1}))
      |> Oban.insert_all()

  Insert with a custom timeout:

      1..100
      |> Enum.map(&MyApp.Worker.new(%{id: &1}))
      |> Oban.insert_all(timeout: 10_000)

  Insert with an alternative instance name:

      changesets = Enum.map(1..100, &MyApp.Worker.new(%{id: &1}))
      jobs = Oban.insert_all(MyOban, changesets)
  """
  @doc since: "0.9.0"
  @spec insert_all(
          name() | multi(),
          changesets_or_wrapper() | multi_name(),
          Keyword.t() | changesets_or_wrapper_or_fun()
        ) :: [Job.t()] | multi()
  def insert_all(name \\ __MODULE__, changesets, opts \\ [])

  def insert_all(name, changesets, opts) when is_list_or_wrapper(changesets) and is_list(opts) do
    name
    |> config()
    |> Engine.insert_all_jobs(changesets, opts)
  end

  def insert_all(%Multi{} = multi, multi_name, changesets) when is_list_or_wrapper(changesets) do
    insert_all(__MODULE__, multi, multi_name, changesets, [])
  end

  @doc false
  def insert_all(%Multi{} = multi, multi_name, changesets, opts)
      when is_list_or_wrapper(changesets) do
    insert_all(__MODULE__, multi, multi_name, changesets, opts)
  end

  @doc false
  def insert_all(name, multi, multi_name, changesets) when is_list_or_wrapper(changesets) do
    insert_all(name, multi, multi_name, changesets, [])
  end

  @doc """
  Put an `insert_all` operation into an `Ecto.Multi`.

  This function supports the same features and has the same caveats as `insert_all/2`.

  ## Example

  Insert job changesets within a multi:

      changesets = Enum.map(0..100, &MyApp.Worker.new(%{id: &1}))

      Ecto.Multi.new()
      |> Oban.insert_all(:jobs, changesets)
      |> MyApp.Repo.transaction()

  Insert job changesets using a function:

      Ecto.Multi.new()
      |> Ecto.Multi.insert(:user, user_changeset)
      |> Oban.insert_all(:jobs, fn %{user: user} ->
        email_job = EmailWorker.new(%{id: user.id})
        staff_job = StaffWorker.new(%{id: user.id})
        [email_job, staff_job]
      end)
      |> MyApp.Repo.transaction()
  """
  @doc since: "0.9.0"
  @spec insert_all(name(), multi(), multi_name(), changesets_or_wrapper_or_fun(), Keyword.t()) ::
          multi()
  def insert_all(name, multi, multi_name, changesets, opts)
      when is_list_or_wrapper(changesets) and is_list(opts) do
    name
    |> config()
    |> Engine.insert_all_jobs(multi, multi_name, changesets, opts)
  end

  @doc """
  Synchronously execute all available jobs in a queue.

  All execution happens within the current process and it is guaranteed not to raise an error or
  exit.

  Draining a queue from within the current process is especially useful for testing. Jobs that are
  enqueued by a process when `Ecto` is in sandbox mode are only visible to that process. Calling
  `drain_queue/2` allows you to control when the jobs are executed and to wait synchronously for
  all jobs to complete.

  ## Failures & Retries

  Draining a queue uses the same execution mechanism as regular job dispatch. That means that any
  job failures or crashes are captured and result in a retry. Retries are scheduled in the future
  with backoff and won't be retried immediately.

  By default jobs are executed in `safe` mode, just as they are in production. Safe mode catches
  any errors or exits and records the formatted error in the job's `errors` array.  That means
  exceptions and crashes are _not_ bubbled up to the calling process.

  If you expect jobs to fail, would like to track failures, or need to check for specific errors
  you can pass the `with_safety: false` flag. See the "Options" section below for more details.

  ## Scheduled Jobs

  By default, `drain_queue/2` will execute all currently available jobs. In order to execute
  scheduled jobs, you may pass the `with_scheduled: true` which will cause all scheduled jobs to
  be marked as `available` beforehand. To run jobs scheduled up to a specific point in time, pass
  a `DateTime` instead.

  ## Options

  * `:queue` - a string or atom specifying the queue to drain, required

  * `:with_limit` — the maximum number of jobs to drain at once. When recursion is enabled this is
    how many jobs are processed per-iteration.

  * `:with_recursion` — whether to keep draining a queue repeatedly when jobs insert _more_ jobs

  * `:with_safety` — whether to silently catch errors when draining, defaults to `true`. When
    `false`, raised exceptions or unhandled exits are reraised (unhandled exits are wrapped in
    `Oban.CrashError`).

  * `:with_scheduled` — whether to include any scheduled jobs when draining, default `false`.
     When `true`, drains all scheduled jobs. When a `DateTime` is provided, drains all jobs
     scheduled up to, and including, that point in time.

  ## Example

  Drain a queue with three available jobs, two of which succeed and one of which fails:

      Oban.drain_queue(queue: :default)
      %{failure: 1, snoozed: 0, success: 2}

  Drain a queue including any scheduled jobs:

      Oban.drain_queue(queue: :default, with_scheduled: true)
      %{failure: 0, snoozed: 0, success: 1}

  Drain a queue including jobs scheduled up to a minute:

      Oban.drain_queue(queue: :default, with_scheduled: DateTime.add(DateTime.utc_now(), 60, :second))

  Drain a queue and assert an error is raised:

      assert_raise RuntimeError, fn -> Oban.drain_queue(queue: :risky, with_safety: false) end

  Drain a queue repeatedly until there aren't any more jobs to run. This is particularly useful
  for testing jobs that enqueue other jobs:

      Oban.drain_queue(queue: :default, with_recursion: true)
      %{failure: 1, snoozed: 0, success: 2}

  Drain only the top (by scheduled time and priority) five jobs off a queue:

      Oban.drain_queue(queue: :default, with_limit: 5)
      %{failure: 0, snoozed: 0, success: 1}

  Drain a queue recursively, only one job at a time:

      Oban.drain_queue(queue: :default, with_limit: 1, with_recursion: true)
      %{failure: 0, snoozed: 0, success: 3}
  """
  @doc since: "0.4.0"
  @spec drain_queue(name(), [drain_option()]) :: drain_result()
  def drain_queue(name \\ __MODULE__, [_ | _] = opts) do
    name
    |> config()
    |> Drainer.drain(opts)
  end

  @doc """
  Start a new supervised queue.

  By default this starts a new supervised queue across all nodes running Oban on the same database
  and prefix. You can pass the option `local_only: true` if you prefer to start the queue only on
  the local node.

  ## Options

  * `:queue` - a string or atom specifying the queue to start, required
  * `:local_only` - whether the queue will be started only on the local node, default: `false`
  * `:limit` - set the concurrency limit, required
  * `:paused` — set whether the queue starts in the "paused" state, optional

  In addition, all engine-specific queue options are passed along after validation.

  ## Example

  Start the `:priority` queue with a concurrency limit of 10 across the connected nodes.

      Oban.start_queue(queue: :priority, limit: 10)
      :ok

  Start the `:media` queue with a concurrency limit of 5 only on the local node.

      Oban.start_queue(queue: :media, limit: 5, local_only: true)
      :ok

  Start the `:media` queue on a particular node.

      Oban.start_queue(queue: :media, limit: 5, node: "worker.1")
      :ok

  Start the `:media` queue in a `paused` state.

      Oban.start_queue(queue: :media, limit: 5, paused: true)
      :ok
  """
  @doc since: "0.12.0"
  @spec start_queue(name(), opts :: Keyword.t()) :: :ok
  def start_queue(name \\ __MODULE__, [_ | _] = opts) do
    conf = config(name)

    validate_queue_opts!(opts, [:queue, :local_only])
    validate_engine_meta!(conf, opts)

    data =
      opts
      |> Map.new()
      |> Map.put(:action, :start)
      |> Map.put(:ident, scope_signal(conf, opts))

    Notifier.notify(conf, :signal, data)
  end

  @doc """
  Pause a running queue, preventing it from executing any new jobs. All running jobs will remain
  running until they are finished.

  When shutdown begins all queues are paused.

  ## Options

  * `:queue` - a string or atom specifying the queue to pause, required

  * `:local_only` - whether the queue will be paused only on the local node, default: `false`

  * `:node` - restrict pausing to a particular node

  Note: by default, Oban does not verify that the given queue exists unless `:local_only`
  is set to `true` as even if the queue does not exist locally, it might be running on
  another node.

  ## Example

  Pause the default queue:

      Oban.pause_queue(queue: :default)
      :ok

  Pause the default queue, but only on the local node:

      Oban.pause_queue(queue: :default, local_only: true)
      :ok

  Pause the default queue only on a particular node:

      Oban.pause_queue(queue: :default, node: "worker.1")
      :ok
  """
  @doc since: "0.2.0"
  @spec pause_queue(name(), opts :: [queue_option()]) :: :ok
  def pause_queue(name \\ __MODULE__, [_ | _] = opts) do
    validate_queue_opts!(opts, [:queue, :local_only, :node])
    validate_queue_exists!(name, opts)

    conf = config(name)
    data = %{action: :pause, queue: opts[:queue], ident: scope_signal(conf, opts)}

    Notifier.notify(conf, :signal, data)
  end

  @doc """
  Pause all running queues to prevent them from executing any new jobs.

  See `pause_queue/2` for options and details.

  ## Example

  Pause all queues:

      Oban.pause_all_queues()

  Pause all queues on the local node:

      Oban.pause_all_queues(local_only: true)

  Pause all queues on a specific node:

      Oban.pause_all_queues(node: "worker.1")
  """
  @doc since: "2.17.0"
  @spec pause_all_queues(name(), opts :: [local_only: boolean(), node: String.t()]) :: :ok
  def pause_all_queues(name \\ __MODULE__, opts \\ []) do
    pause_queue(name, Keyword.put(opts, :queue, :*))
  end

  @doc """
  Resume executing jobs in a paused queue.

  ## Options

  * `:queue` - a string or atom specifying the queue to resume, required

  * `:local_only` - whether the queue will be resumed only on the local node, default: `false`

  * `:node` - restrict resuming to a particular node

  Note: by default, Oban does not verify that the given queue exists unless `:local_only`
  is set to `true` as even if the queue does not exist locally, it might be running on
  another node.

  ## Example

  Resume a paused default queue:

      Oban.resume_queue(queue: :default)
      :ok

  Resume the default queue, but only on the local node:

      Oban.resume_queue(queue: :default, local_only: true)
      :ok

  Resume the default queue only on a particular node:

      Oban.resume_queue(queue: :default, node: "worker.1")
      :ok
  """
  @doc since: "0.2.0"
  @spec resume_queue(name(), opts :: [queue_option()]) :: :ok
  def resume_queue(name \\ __MODULE__, [_ | _] = opts) do
    validate_queue_opts!(opts, [:queue, :local_only, :node])
    validate_queue_exists!(name, opts)

    conf = config(name)
    data = %{action: :resume, queue: opts[:queue], ident: scope_signal(conf, opts)}

    Notifier.notify(conf, :signal, data)
  end

  @doc """
  Resume executing jobs in all paused queues.

  See `resume_queue/2` for options and details.

  ## Example

  Resume all queues:

      Oban.resume_all_queues()

  Resume all queues on the local node:

      Oban.resume_all_queues(local_only: true)

  Resume all queues on a specific node:

      Oban.resume_all_queues(node: "worker.1")
  """
  @doc since: "2.17.0"
  @spec resume_all_queues(name(), opts :: [local_only: boolean(), node: String.t()]) :: :ok
  def resume_all_queues(name \\ __MODULE__, opts \\ []) do
    resume_queue(name, Keyword.put(opts, :queue, :*))
  end

  @doc """
  Scale the concurrency for a queue.

  ## Options

  * `:queue` - a string or atom specifying the queue to scale, required

  * `:limit` — the new concurrency limit, required

  * `:local_only` — whether the queue will be scaled only on the local node, default: `false`

  * `:node` - restrict scaling to a particular node

  In addition, all engine-specific queue options are passed along after validation.

  Note: by default, Oban does not verify that the given queue exists unless `:local_only`
  is set to `true` as even if the queue does not exist locally, it might be running on
  another node.

  ## Example

  Scale a queue up, triggering immediate execution of queued jobs:

      Oban.scale_queue(queue: :default, limit: 50)
      :ok

  Scale the queue back down, allowing executing jobs to finish:

      Oban.scale_queue(queue: :default, limit: 5)
      :ok

  Scale the queue only on the local node:

      Oban.scale_queue(queue: :default, limit: 10, local_only: true)
      :ok

  Scale the queue on a particular node:

      Oban.scale_queue(queue: :default, limit: 10, node: "worker.1")
      :ok
  """
  @doc since: "0.2.0"
  @spec scale_queue(name(), opts :: [queue_option()]) :: :ok
  def scale_queue(name \\ __MODULE__, [_ | _] = opts) do
    conf = config(name)

    validate_queue_opts!(opts, [:queue, :local_only, :node])
    validate_engine_meta!(conf, opts)
    validate_queue_exists!(name, opts)

    data =
      opts
      |> Map.new()
      |> Map.put(:action, :scale)
      |> Map.put(:ident, scope_signal(conf, opts))

    Notifier.notify(conf, :signal, data)
  end

  @doc """
  Shutdown a queue's supervision tree and stop running jobs for that queue.

  By default this action will occur across all the running nodes. Still, if you prefer to stop the
  queue's supervision tree and stop running jobs for that queue only on the local node, you can
  pass the option: `local_only: true`

  The shutdown process pauses the queue first and allows current jobs to exit gracefully, provided
  they finish within the shutdown limit.

  Note: by default, Oban does not verify that the given queue exists unless `:local_only`
  is set to `true` as even if the queue does not exist locally, it might be running on
  another node.

  ## Options

  * `:queue` - a string or atom specifying the queue to stop, required

  * `:local_only` - whether the queue will be stopped only on the local node, default: `false`

  * `:node` - restrict stopping to a particular node

  ## Example

  Stop a running queue on all nodes:

      Oban.stop_queue(queue: :default)
      :ok

  Stop the queue only on the local node:

      Oban.stop_queue(queue: :media, local_only: true)
      :ok

  Stop the queue only on a particular node:

      Oban.stop_queue(queue: :media, node: "worker.1")
      :ok
  """
  @doc since: "0.12.0"
  @spec stop_queue(name(), opts :: [queue_option()]) :: :ok
  def stop_queue(name \\ __MODULE__, [_ | _] = opts) do
    validate_queue_opts!(opts, [:queue, :local_only, :node])
    validate_queue_exists!(name, opts)

    conf = config(name)
    data = %{action: :stop, queue: opts[:queue], ident: scope_signal(conf, opts)}

    Notifier.notify(conf, :signal, data)
  end

  @doc """
  Check the current state of a queue producer.

  This allows you to introspect on a queue's health by retrieving key attributes of the producer's
  state; values such as the current `limit`, the `running` job ids, and when the producer was
  started.

  ## Options

  * `:queue` - a string or atom specifying the queue to check, required

  ## Example

      Oban.check_queue(queue: :default)
      %{
        limit: 10,
        node: "me@local",
        paused: false,
        queue: "default",
        running: [100, 102],
        started_at: ~D[2020-10-07 15:31:00],
        updated_at: ~D[2020-10-07 15:31:00]
      }
  """
  @doc since: "2.2.0"
  @spec check_queue(name(), opts :: [{:queue, queue_name()}]) :: queue_state()
  def check_queue(name \\ __MODULE__, [_ | _] = opts) do
    validate_queue_opts!(opts, [:queue])
    validate_queue_exists!(name, opts)

    name
    |> Registry.via({:producer, to_string(opts[:queue])})
    |> Producer.check()
  end

  @doc """
  Sets a job as `available`, adding attempts if already maxed out. Jobs currently `available`,
  `executing` or `scheduled` are ignored. The job is scheduled for immediate execution.

  ## Example

  Retry a job:

      Oban.retry_job(job)
      :ok
  """
  @doc since: "2.2.0"
  @spec retry_job(name(), job_or_id :: Job.t() | integer()) :: :ok
  def retry_job(name \\ __MODULE__, job_or_id) do
    conf = config(name)

    if is_integer(job_or_id) do
      Engine.retry_job(conf, %Job{id: job_or_id})
    else
      Engine.retry_job(conf, job_or_id)
    end
  end

  @doc """
  Retries all jobs that match on the given queryable.

  If no queryable is given, Oban will retry all jobs that aren't currently `available` or
  `executing`. Note that regardless of constraints, it will never retry `available` or
  `executing` jobs.

  ## Example

  Retries jobs in _any state_ other than `available` or `executing`:

      Oban.retry_all_jobs(Oban.Job)
      {:ok, 9}

  Retries jobs with the `retryable` state:

      Oban.Job
      |> Ecto.Query.where(state: "retryable")
      |> Oban.retry_all_jobs()
      {:ok, 3}

  Retries all inactive jobs with priority 0

      Oban.Job
      |> Ecto.Query.where(priority: 0)
      |> Oban.retry_all_jobs()
      {:ok, 5}
  """
  @doc since: "2.9.0"
  @spec retry_all_jobs(name(), queryable :: Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
  def retry_all_jobs(name \\ __MODULE__, queryable) do
    conf = config(name)

    {:ok, jobs} = Engine.retry_all_jobs(conf, queryable)

    {:ok, length(jobs)}
  end

  @doc """
  Cancel an `executing`, `available`, `scheduled` or `retryable` job and mark it as `cancelled` to
  prevent it from running. If the job is currently `executing` it will be killed and otherwise it
  is ignored.

  If an executing job happens to fail before it can be cancelled the state is set to `cancelled`.
  However, if it manages to complete successfully then the state will still be `completed`.

  ## Example

  Cancel a job:

      Oban.cancel_job(job)
      :ok
  """
  @doc since: "1.3.0"
  @spec cancel_job(name(), job_or_id :: Job.t() | integer()) :: :ok
  def cancel_job(name \\ __MODULE__, job_or_id)

  def cancel_job(name, %Job{id: job_id}), do: cancel_job(name, job_id)

  def cancel_job(name, job_id) when is_integer(job_id) do
    import Ecto.Query, only: [where: 2]

    with {:ok, _count} <- cancel_all_jobs(name, where(Job, id: ^job_id)), do: :ok
  end

  @doc """
  Cancel many jobs based on a queryable and mark them as `cancelled` to prevent them from running.
  Any currently `executing` jobs are killed while the others are ignored.

  If executing jobs happen to fail before cancellation then the state is set to `cancelled`.
  However, any that complete successfully will remain `completed`.

  Only jobs with the statuses `executing`, `available`, `scheduled`, or `retryable` can be cancelled.

  ## Example

  Cancel all jobs:

      Oban.cancel_all_jobs(Oban.Job)
      {:ok, 9}

  Cancel all jobs for a specific worker:

      Oban.Job
      |> Ecto.Query.where(worker: "MyApp.MyWorker")
      |> Oban.cancel_all_jobs()
      {:ok, 2}
  """
  @doc since: "2.9.0"
  @spec cancel_all_jobs(name(), queryable :: Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
  def cancel_all_jobs(name \\ __MODULE__, queryable) do
    conf = config(name)

    {:ok, cancelled_jobs} = Engine.cancel_all_jobs(conf, queryable)

    payload =
      for %{id: id, state: "executing"} <- cancelled_jobs do
        %{action: :pkill, job_id: id}
      end

    Notifier.notify(conf, :signal, payload)

    {:ok, length(cancelled_jobs)}
  end

  ## Child Spec Helpers

  defp plugin_child_spec({module, opts}, conf) do
    name = Registry.via(conf.name, {:plugin, module})
    opts = Keyword.merge(opts, conf: conf, name: name)

    Supervisor.child_spec({module, opts}, id: {:plugin, module})
  end

  defp event_child_spec(conf) do
    time = %{system_time: System.system_time()}
    meta = %{pid: self(), conf: conf}

    [Task.child_spec(fn -> :telemetry.execute([:oban, :supervisor, :init], time, meta) end)]
  end

  ## Signal Helper

  defp scope_signal(conf, opts) do
    cond do
      Keyword.get(opts, :local_only) ->
        Config.to_ident(conf)

      Keyword.has_key?(opts, :node) ->
        Config.to_ident(%{conf | node: opts[:node]})

      true ->
        :any
    end
  end

  ## Validation Helpers

  defp validate_queue_opts!(opts, expected) when is_list(opts) do
    unless Keyword.has_key?(opts, :queue) do
      raise ArgumentError, "required option :queue is missing from #{inspect(opts)}"
    end

    opts
    |> Keyword.take(expected)
    |> Enum.each(&validate_queue_opts!/1)
  end

  defp validate_queue_opts!({:queue, queue}) do
    unless (is_atom(queue) and not is_nil(queue)) or (is_binary(queue) and byte_size(queue) > 0) do
      raise ArgumentError,
            "expected :queue to be a binary or atom (except `nil`), got: #{inspect(queue)}"
    end
  end

  defp validate_queue_opts!({:local_only, local_only}) do
    unless is_boolean(local_only) do
      raise ArgumentError, "expected :local_only to be a boolean, got: #{inspect(local_only)}"
    end
  end

  defp validate_queue_opts!({:node, node}) do
    unless (is_atom(node) and not is_nil(node)) or (is_binary(node) and byte_size(node) > 0) do
      raise ArgumentError,
            "expected :node to be a binary or atom (except `nil`), got: #{inspect(node)}"
    end
  end

  defp validate_queue_opts!(option) do
    raise ArgumentError, "unknown option provided #{inspect(option)}"
  end

  defp validate_engine_meta!(conf, opts) do
    opts =
      opts
      |> Keyword.drop([:local_only])
      |> Keyword.put(:validate, true)

    with {:error, error} <- conf.engine.init(conf, opts), do: raise(error)
  end

  defp validate_queue_exists!(name, opts) do
    queue = opts[:queue]
    local_only = opts[:local_only]

    if local_only && is_nil(Registry.whereis(name, {:producer, to_string(queue)})) do
      raise ArgumentError, "queue #{inspect(queue)} does not exist locally"
    end
  end
end