defmodule Oban.Worker do
@moduledoc """
Defines a behavior and macro to guide the creation of worker modules.
Worker modules do the work of processing a job. At a minimum they must define a `c:perform/1`
function, which is called with the full `Oban.Job` struct.
## Defining Workers
Worker modules are defined by using `Oban.Worker`. A bare `use Oban.Worker` invocation sets up a
worker with these defaults:
* `:max_attempts` — 20
* `:priority` — 0
* `:queue` — `:default`
* `:unique` — no uniqueness set
The following example defines a worker module to process jobs in the `events` queue. It then
dials down the priority from 0 to 1, limits retrying on failures to 10, adds a "business" tag,
and ensures that duplicate jobs aren't enqueued within a 30 second period:
defmodule MyApp.Workers.Business do
use Oban.Worker,
queue: :events,
priority: 1,
max_attempts: 10,
tags: ["business"],
unique: [period: 30]
@impl Oban.Worker
def perform(%Oban.Job{attempt: attempt}) when attempt > 3 do
IO.inspect(attempt)
end
def perform(job) do
IO.inspect(job.args)
end
end
The `c:perform/1` function receives an `Oban.Job` struct as argument. This allows workers to
change the behavior of `c:perform/1` based on attributes of the job, e.g. the number of
execution attempts or when it was inserted.
The value returned from `c:perform/1` can control whether the job is a success or a failure:
* `:ok` or `{:ok, value}` — the job is successful; for success tuples the `value` is ignored
* `:discard` or `{:discard, reason}` — discard the job and prevent it from being retried again.
An error is recorded using the optional reason, though the job is still successful
* `{:error, error}` — the job failed, record the error and schedule a retry if possible
* `{:snooze, seconds}` — mark the job as `snoozed` and schedule it to run again `seconds` in the
future. See [Snoozing](#module-snoozing-jobs) for more details.
In addition to explicit return values, any _unhandled exception_, _exit_ or _throw_ will fail
the job and schedule a retry if possible.
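The list of return values above can be summarized as a small lookup. This is only an illustrative sketch: `ReturnValueSketch` and its outcome atoms are hypothetical names chosen here, not part of Oban, and the real executor does considerably more than this.

```elixir
defmodule ReturnValueSketch do
  # Hypothetical helper, not part of Oban: maps a perform/1 return value to
  # the outcome described in the list above.
  def outcome(:ok), do: :success
  def outcome({:ok, _ignored}), do: :success
  def outcome(:discard), do: :discard
  def outcome({:discard, _reason}), do: :discard
  def outcome({:error, _error}), do: :retry_if_possible

  # Snoozing carries the delay along so the job can be rescheduled.
  def outcome({:snooze, seconds}) when is_integer(seconds) and seconds > 0 do
    {:snooze, seconds}
  end
end
```

Calling `ReturnValueSketch.outcome({:ok, :anything})` returns `:success`, which reflects that the value in a success tuple is ignored.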
As an example of error tuple handling, this worker will return an error tuple when the `value`
is not greater than one:
defmodule MyApp.Workers.ErrorExample do
use Oban.Worker
@impl Worker
def perform(%{args: %{"value" => value}}) do
if value > 1 do
:ok
else
{:error, "invalid value given: " <> inspect(value)}
end
end
end
The error tuple is wrapped in an `Oban.PerformError` with a formatted message. The error tuple
itself is available through the exception's `:reason` field.
## Enqueuing Jobs
All workers implement a `c:new/2` function that converts an args map into a job changeset
suitable for inserting into the database for later execution:
%{in_the: "business", of_doing: "business"}
|> MyApp.Workers.Business.new()
|> Oban.insert()
The worker's defaults may be overridden by passing options:
%{vote_for: "none of the above"}
|> MyApp.Workers.Business.new(queue: "special", max_attempts: 5)
|> Oban.insert()
Uniqueness options may also be overridden by passing options:
%{expensive: "business"}
|> MyApp.Workers.Business.new(unique: [period: 120, fields: [:worker]])
|> Oban.insert()
Note that when both the worker and the call define a non-empty `unique` list, the two lists are
merged key by key, with the passed options taking precedence.
See `Oban.Job` for all available options.
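As a rough sketch of how overrides combine with a worker's defaults, the snippet below reuses the resolver shape of the `merge_opts/2` helper defined in this module. `MergeSketch` and the sample option lists are hypothetical and exist only for illustration.

```elixir
defmodule MergeSketch do
  # Same resolver shape as the merge_opts/2 helper in this module: two
  # non-empty :unique lists are merged, every other key is replaced.
  def merge_opts(base_opts, opts) do
    Keyword.merge(base_opts, opts, fn
      :unique, [_ | _] = base, [_ | _] = override -> Keyword.merge(base, override)
      _key, _base, override -> override
    end)
  end
end

defaults = [queue: :events, max_attempts: 10, unique: [period: 30]]

merged = MergeSketch.merge_opts(defaults, max_attempts: 5, unique: [fields: [:worker]])

# :max_attempts is replaced, while :unique keeps :period alongside :fields
IO.inspect(merged[:max_attempts])
IO.inspect(merged[:unique])
```

Under this resolver a plain option such as `:max_attempts` is replaced outright, while an override that only sets `fields` still inherits the default `period`.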
## Customizing Backoff
When jobs fail they may be retried again in the future using a backoff algorithm. By default the
backoff is exponential with a fixed padding of 15 seconds and a small amount of jitter. The
jitter helps to prevent jobs that fail simultaneously from consistently retrying at the same
time. The default backoff is clamped to a maximum of 12 days, the equivalent of the 20th
attempt.
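To make the numbers concrete, here is a sketch of the delay before jitter is applied. It follows the same arithmetic as the default `backoff/1` in this module; `DefaultBackoffSketch` is a hypothetical name, and the real implementation adds jitter on top of the value shown.

```elixir
defmodule DefaultBackoffSketch do
  @max_for_backoff 20
  @backoff_base 15

  # Un-jittered delay in seconds. Attempts are scaled into the 1..20 range
  # when a job allows more than 20 attempts.
  def base_delay(attempt, max_attempts) do
    clamped_attempt =
      if max_attempts <= @max_for_backoff do
        attempt
      else
        round(attempt / max_attempts * @max_for_backoff)
      end

    trunc(:math.pow(2, clamped_attempt) + @backoff_base)
  end
end

DefaultBackoffSketch.base_delay(1, 20)
# => 17
DefaultBackoffSketch.base_delay(20, 20)
# => 1_048_591 seconds, a little over 12 days
DefaultBackoffSketch.base_delay(40, 40)
# => 1_048_591, since attempt 40 of 40 is scaled down to 20
```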
If the default strategy is too aggressive or otherwise unsuited to your app's workload you can
define a custom backoff function using the `c:backoff/1` callback.
The following worker defines a `c:backoff/1` function that delays retries using a variant of the
historic Resque/Sidekiq algorithm:
defmodule MyApp.SidekiqBackoffWorker do
use Oban.Worker
@impl Worker
def backoff(%Job{attempt: attempt}) do
trunc(:math.pow(attempt, 4) + 15 + :rand.uniform(30) * attempt)
end
@impl Worker
def perform(_job) do
:ok
end
end
Here are some alternative backoff strategies to consider:
* **constant** — delay by a fixed number of seconds, e.g. 1→15, 2→15, 3→15
* **linear** — delay for the same number of seconds as the current attempt, e.g. 1→1, 2→2, 3→3
* **squared** — delay by attempt number squared, e.g. 1→1, 2→4, 3→9
* **sidekiq** — delay by a base amount plus some jitter, e.g. 1→32, 2→61, 3→135
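The strategies listed above could be written as plain functions along these lines. `BackoffStrategies` is a hypothetical helper module, and the constant of 15 seconds is just the value used in the list, not a recommendation.

```elixir
defmodule BackoffStrategies do
  # Hypothetical helpers; each takes the attempt number and returns seconds.
  def constant(_attempt), do: 15
  def linear(attempt), do: attempt
  def squared(attempt), do: attempt * attempt

  # Same formula as MyApp.SidekiqBackoffWorker above; the jitter term makes
  # the result vary between calls.
  def sidekiq(attempt) do
    trunc(:math.pow(attempt, 4) + 15 + :rand.uniform(30) * attempt)
  end
end

BackoffStrategies.constant(3)
# => 15
BackoffStrategies.linear(2)
# => 2
BackoffStrategies.squared(3)
# => 9
```

A worker could then delegate from its `c:backoff/1` callback, for example by passing the job's `attempt` to `BackoffStrategies.squared/1`.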
### Contextual Backoff
Any error, exit or throw is temporarily recorded in the job's `unsaved_error` map. The unsaved
error map can be used by `c:backoff/1` to calculate a custom backoff based on the exact error
that failed the job. In this example the `c:backoff/1` callback checks to see if the error was
due to rate limiting and adjusts the backoff accordingly:
defmodule MyApp.ApiWorker do
use Oban.Worker
@five_minutes 5 * 60
@impl Worker
def perform(%{args: args}) do
MyApp.make_external_api_call(args)
end
@impl Worker
def backoff(%Job{attempt: attempt, unsaved_error: unsaved_error}) do
%{kind: _, reason: reason, stacktrace: _} = unsaved_error
case reason do
%MyApp.ApiError{status: 429} -> @five_minutes
_ -> trunc(:math.pow(attempt, 4))
end
end
end
## Snoozing Jobs
When returning `{:snooze, snooze_time}` in `c:perform/1`, the job is postponed for at
least `snooze_time` seconds. Snoozing is done by incrementing the job's `max_attempts` field and
scheduling execution for `snooze_time` seconds in the future.
Snoozing does not change the number of retries remaining on the job, but it does increment the
`attempt` number each time the job snoozes, which affects the default exponential backoff
algorithm. In the example below the `c:backoff/1` callback compensates for snoozing:
defmodule MyApp.SnoozingWorker do
@max_attempts 20
use Oban.Worker, max_attempts: @max_attempts
@impl Worker
def backoff(%Job{} = job) do
corrected_attempt = @max_attempts - (job.max_attempts - job.attempt)
Worker.backoff(%{job | attempt: corrected_attempt})
end
@impl Worker
def perform(job) do
if MyApp.something?(job), do: :ok, else: {:snooze, 60}
end
end
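The correction in `MyApp.SnoozingWorker` is easier to follow with concrete numbers. The sketch below isolates the arithmetic in a hypothetical `SnoozeMathSketch` module, assuming that each snooze adds one to `max_attempts` and each execution adds one to `attempt`, as described above.

```elixir
defmodule SnoozeMathSketch do
  # Mirrors the correction in MyApp.SnoozingWorker: `configured_max` is the
  # worker's original max_attempts, the other two values come from the job.
  def corrected_attempt(configured_max, job_max_attempts, job_attempt) do
    configured_max - (job_max_attempts - job_attempt)
  end
end

# A job configured with 20 attempts that has executed 5 times, 3 of which
# snoozed, carries max_attempts: 23 and attempt: 5.
SnoozeMathSketch.corrected_attempt(20, 23, 5)
# => 2
```

The corrected value counts only the executions that did not snooze, so the backoff grows as if the snoozes had never happened.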
## Limiting Execution Time
By default, individual jobs may execute indefinitely. If this is undesirable you may define a
timeout in milliseconds with the `c:timeout/1` callback on your worker module.
For example, to limit a worker's execution time to 30 seconds:
defmodule MyApp.Worker do
use Oban.Worker
@impl Oban.Worker
def perform(_job) do
something_that_may_take_a_long_time()
:ok
end
@impl Oban.Worker
def timeout(_job), do: :timer.seconds(30)
end
The `c:timeout/1` function accepts an `Oban.Job` struct, so you can customize the timeout using
any job attributes.
Define the `timeout` value through job args:
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
Define the `timeout` based on the number of attempts:
def timeout(%_{attempt: attempt}), do: attempt * :timer.seconds(5)
"""
@moduledoc since: "0.1.0"
import Kernel, except: [to_string: 1]
alias Oban.{Backoff, Job}
@type t :: module()
@type result ::
:ok
| :discard
| {:discard, reason :: term()}
| {:ok, ignored :: term()}
| {:error, reason :: term()}
| {:snooze, seconds :: pos_integer()}
@doc """
Build a job changeset for this worker with optional overrides.
See `Oban.Job.new/2` for the available options.
"""
@callback new(args :: Job.args(), opts :: [Job.option()]) :: Job.changeset()
@doc """
Calculate the execution backoff.
In this context backoff specifies the number of seconds to wait before retrying a failed job.
Defaults to an exponential algorithm with a minimum delay of 15 seconds and a small amount of
jitter.
"""
@callback backoff(job :: Job.t()) :: pos_integer()
@doc """
Set a job's maximum execution time in milliseconds.
Jobs that exceed the time limit are considered a failure and may be retried.
Defaults to `:infinity`.
"""
@callback timeout(job :: Job.t()) :: :infinity | pos_integer()
@doc """
The `c:perform/1` function is called to execute a job.
Each `c:perform/1` function should return `:ok` or a success tuple. When it returns an error
tuple, raises an uncaught exception, or throws, the error is recorded and the job may be retried
if there are any attempts remaining.
Note that the `args` map provided to `perform/1` will _always_ have string keys, regardless of
the key type when the job was enqueued. The `args` are stored as `jsonb` in PostgreSQL and the
serialization process automatically stringifies all keys.
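The practical consequence is that pattern matches in `perform/1` need string keys. The sketch below imitates the effect with a hypothetical `StringKeysSketch.stringify/1` helper that converts top-level keys with `to_string/1`. It is a stand-in for the JSON round trip, not Oban's serializer.

```elixir
defmodule StringKeysSketch do
  # Stand-in for the JSON round trip: stringify the top-level keys the way
  # stored args come back from the database.
  def stringify(args) when is_map(args) do
    Map.new(args, fn {key, value} -> {to_string(key), value} end)
  end
end

stored = StringKeysSketch.stringify(%{id: 1, action: "sync"})

# Matching on string keys succeeds; the atom key :id is no longer present.
%{"id" => id} = stored
IO.inspect(id)
IO.inspect(Map.has_key?(stored, :id))
```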
"""
@callback perform(job :: Job.t()) :: result()
@max_for_backoff 20
@backoff_base 15
@doc false
defmacro __using__(opts) do
quote location: :keep do
alias Oban.{Job, Worker}
@after_compile Worker
@behaviour Worker
@doc false
def __opts__ do
Keyword.put(unquote(opts), :worker, to_string(__MODULE__))
end
@impl Worker
def new(args, opts \\ []) when is_map(args) and is_list(opts) do
Job.new(args, Worker.merge_opts(__opts__(), opts))
end
@impl Worker
def backoff(%Job{} = job) do
Worker.backoff(job)
end
@impl Worker
def timeout(%Job{} = job) do
Worker.timeout(job)
end
defoverridable Worker
end
end
@doc false
defmacro __after_compile__(%{module: module}, _bytecode) do
Enum.each(module.__opts__(), &validate_opt!/1)
end
@doc false
def merge_opts(base_opts, opts) do
Keyword.merge(base_opts, opts, fn
:unique, [_ | _] = opts_1, [_ | _] = opts_2 ->
Keyword.merge(opts_1, opts_2)
_key, _opts, opts_2 ->
opts_2
end)
end
@doc false
def backoff(%Job{attempt: attempt, max_attempts: max_attempts}) do
clamped_attempt =
if max_attempts <= @max_for_backoff do
attempt
else
round(attempt / max_attempts * @max_for_backoff)
end
time = trunc(:math.pow(2, clamped_attempt) + @backoff_base)
Backoff.jitter(time)
end
@doc false
def timeout(%Job{} = _job) do
:infinity
end
@doc """
Return a string representation of a worker module.
This is particularly useful for normalizing worker names when building custom Ecto queries.
## Examples
iex> Oban.Worker.to_string(MyApp.SomeWorker)
"MyApp.SomeWorker"
iex> Oban.Worker.to_string(Elixir.MyApp.SomeWorker)
"MyApp.SomeWorker"
iex> Oban.Worker.to_string("Elixir.MyApp.SomeWorker")
"MyApp.SomeWorker"
"""
@doc since: "2.1.0"
@spec to_string(module() | String.t()) :: String.t()
def to_string(worker) when is_atom(worker) and not is_nil(worker) do
worker
|> Kernel.to_string()
|> to_string()
end
def to_string("Elixir." <> val), do: val
def to_string(worker) when is_binary(worker), do: worker
@doc """
Resolve a module from a worker string.
## Examples
iex> Oban.Worker.from_string("Oban.Integration.Worker")
{:ok, Oban.Integration.Worker}
iex> defmodule NotAWorker, do: []
...> Oban.Worker.from_string("NotAWorker")
{:error, %RuntimeError{message: "module is not a worker: NotAWorker"}}
iex> Oban.Worker.from_string("RandomWorker")
{:error, %RuntimeError{message: "unknown worker: RandomWorker"}}
"""
@doc since: "2.3.0"
@spec from_string(String.t()) :: {:ok, module()} | {:error, Exception.t()}
def from_string(worker_name) when is_binary(worker_name) do
module =
worker_name
|> String.split(".")
|> Module.safe_concat()
if Code.ensure_loaded?(module) && function_exported?(module, :perform, 1) do
{:ok, module}
else
{:error, %RuntimeError{message: "module is not a worker: #{inspect(module)}"}}
end
rescue
ArgumentError ->
{:error, %RuntimeError{message: "unknown worker: #{worker_name}"}}
end
# Validation Helpers
defp validate_opt!({:max_attempts, max_attempts}) do
unless is_integer(max_attempts) and max_attempts > 0 do
raise ArgumentError,
"expected :max_attempts to be a positive integer, got: #{inspect(max_attempts)}"
end
end
defp validate_opt!({:priority, priority}) do
unless is_integer(priority) and priority > -1 and priority < 4 do
raise ArgumentError,
"expected :priority to be an integer from 0 to 3, got: #{inspect(priority)}"
end
end
defp validate_opt!({:queue, queue}) do
unless is_atom(queue) or is_binary(queue) do
raise ArgumentError, "expected :queue to be an atom or a binary, got: #{inspect(queue)}"
end
end
defp validate_opt!({:tags, tags}) do
unless is_list(tags) and Enum.all?(tags, &is_binary/1) do
raise ArgumentError, "expected :tags to be a list of strings, got: #{inspect(tags)}"
end
end
defp validate_opt!({:unique, unique}) do
unless is_list(unique) and Enum.all?(unique, &Job.valid_unique_opt?/1) do
raise ArgumentError, "unexpected unique options: #{inspect(unique)}"
end
end
defp validate_opt!({:worker, worker}) do
unless is_binary(worker) do
raise ArgumentError, "expected :worker to be a binary, got: #{inspect(worker)}"
end
end
defp validate_opt!(option) do
raise ArgumentError, "unknown option provided #{inspect(option)}"
end
end