lib/oban/job.ex

defmodule Oban.Job do
  @moduledoc """
  A Job is an Ecto schema used for asynchronous execution.

  Job changesets are created by your application code and inserted into the database for
  asynchronous execution. Jobs can be inserted along with other application data as part of a
  transaction, which guarantees that jobs will only be triggered from a successful transaction.
  """
  @moduledoc since: "0.1.0"

  use Ecto.Schema

  import Ecto.Changeset

  @type args :: map()
  @type errors :: [%{at: DateTime.t(), attempt: pos_integer(), error: binary()}]
  @type tags :: [binary()]

  # NOTE: `unique_field`, `unique_state`, `replace_option` and `replace_by_state_option` are
  # themselves *list* types. They are referenced directly below, without wrapping them in another
  # list, because `[unique_field()]` would describe a list of lists and reject valid options such
  # as `fields: [:worker]`.

  # The job fields that may take part in a uniqueness check.
  @type unique_field :: [:args | :meta | :queue | :worker]

  # Number of seconds a job is considered unique for, or `:infinity`.
  @type unique_period :: pos_integer() | :infinity

  # The job states that may take part in a uniqueness check.
  @type unique_state :: [
          :available
          | :cancelled
          | :completed
          | :discarded
          | :executing
          | :retryable
          | :scheduled
        ]

  @type unique_option ::
          {:fields, unique_field()}
          | {:keys, [atom()]}
          | {:period, unique_period()}
          | {:states, unique_state()}
          | {:timestamp, :inserted_at | :scheduled_at}

  # Fields that may be replaced when a unique conflict is detected.
  @type replace_option :: [
          :args
          | :max_attempts
          | :meta
          | :priority
          | :queue
          | :scheduled_at
          | :tags
          | :worker
        ]

  # Replaceable fields keyed by the state of the conflicting job.
  @type replace_by_state_option :: [
          {:available, replace_option()}
          | {:cancelled, replace_option()}
          | {:completed, replace_option()}
          | {:discarded, replace_option()}
          | {:executing, replace_option()}
          | {:retryable, replace_option()}
          | {:scheduled, replace_option()}
        ]

  # Either a bare number of seconds or a `{amount, unit}` tuple.
  @type schedule_in_option ::
          pos_integer()
          | {pos_integer(),
             :second
             | :seconds
             | :minute
             | :minutes
             | :hour
             | :hours
             | :day
             | :days
             | :week
             | :weeks}

  # Priority is validated in `new/2` as an integer from 0 to 3, so `0` must be representable.
  @type option ::
          {:args, args()}
          | {:max_attempts, pos_integer()}
          | {:meta, map()}
          | {:priority, 0..3}
          | {:queue, atom() | binary()}
          | {:schedule_in, schedule_in_option()}
          | {:replace_args, boolean()}
          | {:replace, replace_option() | replace_by_state_option()}
          | {:scheduled_at, DateTime.t()}
          | {:tags, tags()}
          | {:unique, [unique_option()]}
          | {:worker, atom() | binary()}

  @type t :: %__MODULE__{
          id: pos_integer(),
          state: binary(),
          queue: binary(),
          worker: binary(),
          args: args(),
          errors: errors(),
          tags: tags(),
          attempt: non_neg_integer(),
          attempted_by: [binary()] | nil,
          max_attempts: pos_integer(),
          meta: map(),
          priority: non_neg_integer(),
          inserted_at: DateTime.t(),
          scheduled_at: DateTime.t(),
          attempted_at: DateTime.t() | nil,
          completed_at: DateTime.t() | nil,
          discarded_at: DateTime.t() | nil,
          cancelled_at: DateTime.t() | nil,
          conf: Oban.Config.t() | nil,
          conflict?: boolean(),
          replace: replace_option() | replace_by_state_option() | nil,
          # `:keys` and `:timestamp` are optional here so maps built without them still conform;
          # `new/2` always fills them in with defaults.
          unique:
            %{
              required(:fields) => unique_field(),
              required(:period) => unique_period(),
              required(:states) => unique_state(),
              optional(:keys) => [atom()],
              optional(:timestamp) => :inserted_at | :scheduled_at
            }
            | nil,
          unsaved_error:
            %{
              kind: Exception.kind(),
              reason: term(),
              stacktrace: Exception.stacktrace()
            }
            | nil
        }

  @type changeset :: Ecto.Changeset.t(t())
  @type changeset_fun :: (map() -> changeset())
  @type changeset_list :: [changeset()]
  @type changeset_list_fun :: (map() -> changeset_list())

  # Ecto schema for the "oban_jobs" source. Defaults declared here are applied to freshly built
  # `%Oban.Job{}` structs; fields without a default start out as `nil`.
  schema "oban_jobs" do
    field :state, :string, default: "available"
    field :queue, :string, default: "default"
    field :worker, :string
    field :args, :map
    field :meta, :map, default: %{}
    field :tags, {:array, :string}, default: []
    field :errors, {:array, :map}, default: []
    field :attempt, :integer, default: 0
    field :attempted_by, {:array, :string}
    field :max_attempts, :integer, default: 20
    field :priority, :integer

    # Lifecycle timestamps, all stored with microsecond precision.
    field :attempted_at, :utc_datetime_usec
    field :cancelled_at, :utc_datetime_usec
    field :completed_at, :utc_datetime_usec
    field :discarded_at, :utc_datetime_usec
    field :inserted_at, :utc_datetime_usec
    field :scheduled_at, :utc_datetime_usec

    # Virtual fields carry in-memory information on the struct and are not columns of the
    # table. `:replace` and `:unique` are populated by `new/2`.
    field :conf, :map, virtual: true
    field :conflict?, :boolean, virtual: true, default: false
    field :replace, {:array, :any}, virtual: true
    field :unique, :map, virtual: true
    field :unsaved_error, :map, virtual: true
  end

  # Fields that `new/2` casts from the given options and that `to_map/1` keeps in its output.
  @permitted_params ~w(
    args
    attempt
    attempted_by
    attempted_at
    completed_at
    discarded_at
    cancelled_at
    errors
    inserted_at
    max_attempts
    meta
    priority
    queue
    scheduled_at
    state
    tags
    worker
  )a

  # Options accepted by `new/2` that are not cast directly; each is translated into changes by a
  # dedicated `put_*` helper.
  @virtual_params ~w(replace replace_args schedule_in unique)a

  # Fields that must be present for a job changeset to be valid.
  @required_params ~w(worker args)a

  # Fields that may be named in the `:replace` option; checked by `validate_replace/1`.
  @replace_options ~w(args max_attempts meta priority queue scheduled_at tags worker)a

  @doc """
  Construct a new job changeset ready for insertion into the database.

  ## Options

    * `:max_attempts` — the maximum number of times a job can be retried if there are errors
      during execution
    * `:meta` — a map containing additional information about the job
    * `:priority` — a numerical indicator from 0 to 3 of how important this job is relative to
      other jobs in the same queue. The lower the number, the higher priority the job.
    * `:queue` — a named queue to push the job into. Jobs may be pushed into any queue, regardless
      of whether jobs are currently being processed for the queue.
    * `:replace` — a list of fields to replace on a unique conflict, either for every state or,
      as a keyword list, per state
    * `:scheduled_at` — a time in the future after which the job should be executed
    * `:schedule_in` — the number of seconds until the job should be executed or a tuple containing
      a number and unit, e.g. `{5, :minutes}`
    * `:tags` — a list of tags to group and organize related jobs, i.e. to identify scheduled jobs
    * `:unique` — a keyword list of options specifying how uniqueness will be calculated. The
      options define which fields will be used, for how long, with which keys, and for which states.
    * `:worker` — a module to execute the job in. The module must implement the `Oban.Worker`
      behaviour.

  ## Examples

  Insert a job with the `:default` queue:

      %{id: 1, user_id: 2}
      |> Oban.Job.new(queue: :default, worker: MyApp.Worker)
      |> Oban.insert()

  Generate a pre-configured job for `MyApp.Worker`:

      MyApp.Worker.new(%{id: 1, user_id: 2})

  Schedule a job to run in 5 seconds:

      MyApp.Worker.new(%{id: 1}, schedule_in: 5)

  Schedule a job to run in 5 minutes:

      MyApp.Worker.new(%{id: 1}, schedule_in: {5, :minutes})

  Insert a job, ensuring that it is unique within the past minute:

      MyApp.Worker.new(%{id: 1}, unique: [period: 60])

  Insert a unique job where the period is compared to the `scheduled_at` timestamp rather than
  `inserted_at`:

      MyApp.Worker.new(%{id: 1}, unique: [period: 60, timestamp: :scheduled_at])

  Insert a unique job based only on the worker field, and within multiple states:

      fields = [:worker]
      states = [:available, :scheduled, :executing, :retryable, :completed]

      MyApp.Worker.new(%{id: 1}, unique: [fields: fields, period: 60, states: states])

  Insert a unique job considering only the worker and specified keys in the args:

      keys = [:account_id, :url]
      args = %{account_id: 1, url: "https://example.com"}

      MyApp.Worker.new(args, unique: [fields: [:args, :worker], keys: keys])

  Insert a unique job considering only specified keys in the meta:

      unique = [fields: [:meta], keys: [:slug]]

      MyApp.Worker.new(%{id: 1}, meta: %{slug: "unique-key"}, unique: unique)
  """
  @doc since: "0.1.0"
  @spec new(args(), [option()]) :: changeset()
  def new(args, opts \\ []) when is_map(args) and is_list(opts) do
    # Fold the args into the options so everything is cast from a single params map. Queue and
    # worker values are passed through their conversion functions and tags are normalized.
    raw_params = Map.new(Keyword.put(opts, :args, args))
    with_queue = coerce_field(raw_params, :queue, &to_string/1)
    with_worker = coerce_field(with_queue, :worker, &Oban.Worker.to_string/1)
    params = normalize_tags(with_worker)

    # Cast the permitted fields and flag any option that is neither permitted nor virtual.
    casted =
      %__MODULE__{}
      |> cast(params, @permitted_params)
      |> validate_keys(params, @permitted_params ++ @virtual_params)
      |> validate_required(@required_params)

    # Virtual options are not part of the cast, each one is translated into changes explicitly.
    prepared =
      casted
      |> put_scheduling(Map.get(params, :schedule_in))
      |> put_uniqueness(Map.get(params, :unique))
      |> put_replace(Map.get(params, :replace), Map.get(params, :replace_args))
      |> put_state()

    prepared
    |> validate_length(:queue, min: 1, max: 128)
    |> validate_length(:worker, min: 1, max: 128)
    |> validate_number(:max_attempts, greater_than: 0)
    |> validate_number(:priority, greater_than: -1, less_than: 4)
    |> validate_replace()
    |> check_constraint(:attempt, name: :attempt_range)
    |> check_constraint(:max_attempts, name: :positive_max_attempts)
    |> check_constraint(:priority, name: :priority_range)
  end

  # Defaults applied by `put_uniqueness/2` for any unique option the caller leaves out.
  @unique_fields ~w(args queue worker)a
  @unique_period 60
  @unique_states ~w(scheduled available executing retryable completed)a
  @unique_timestamp :inserted_at

  @doc """
  A canonical list of all possible job states.

  This may be used to build up `:unique` options without duplicating states in application code.

  ## Examples

      iex> Oban.Job.states() -- [:completed, :discarded]
      [:scheduled, :available, :executing, :retryable, :cancelled]

  ## Job State Transitions

  * `:scheduled`—Jobs inserted with `scheduled_at` in the future are `:scheduled`. After the
    `scheduled_at` time has elapsed the `Oban.Plugins.Stager` will transition them to `:available`
  * `:available`—Jobs without a future `scheduled_at` timestamp are inserted as `:available` and may
    execute immediately
  * `:executing`—Available jobs may be run, at which point they are `:executing`
  * `:retryable`—Jobs that fail and haven't exceeded their max attempts are transitioned to
    `:retryable` and rescheduled until after a backoff period. Once the backoff has elapsed the
    `Oban.Plugins.Stager` will transition them back to `:available`
  * `:completed`—Jobs that finish executing successfully are marked `:completed`
  * `:discarded`—Jobs that fail and exhaust their max attempts, or return a `:discard` tuple during
    execution, are marked `:discarded`
  * `:cancelled`—Jobs that are cancelled intentionally

  """
  @doc since: "2.1.0"
  @spec states() :: [atom()]
  # The default unique states come first, followed by the two states excluded from that default.
  def states, do: @unique_states ++ [:discarded, :cancelled]

  @doc """
  Turn a Job changeset into a plain map that can be inserted into the database.

  The changeset is applied with `Ecto.Changeset.apply_action!/2`, so it must be valid. Only
  permitted job fields with a non-`nil` value are included in the result.

  ## Examples

  Convert a worker generated changeset into a plain map:

      %{id: 123}
      |> MyApp.Worker.new()
      |> Oban.Job.to_map()
  """
  @doc since: "0.9.0"
  @spec to_map(Ecto.Changeset.t(t())) :: map()
  def to_map(%Ecto.Changeset{} = changeset) do
    job = Ecto.Changeset.apply_action!(changeset, :insert)

    for {key, val} <- Map.from_struct(job),
        key in @permitted_params,
        not is_nil(val),
        into: %{},
        do: {key, val}
  end

  @doc """
  Normalize, blame, and format a job's `unsaved_error` into the stored error format.

  Formatted errors are stored in a job's `errors` field.

  The job must carry an `unsaved_error` map with `:kind`, `:reason`, and `:stacktrace` keys. A job
  without one (the field is `nil`) raises a `MatchError`.

  The returned map contains the job's current `:attempt`, the time of formatting as `:at`, and
  the formatted `:error` message.
  """
  @doc since: "2.14.0"
  @spec format_attempt(t()) :: %{at: DateTime.t(), attempt: non_neg_integer(), error: binary()}
  def format_attempt(%__MODULE__{attempt: attempt, unsaved_error: unsaved}) do
    %{kind: kind, reason: reason, stacktrace: stacktrace} = unsaved

    # Blaming may attach extra debugging information to the exception and stacktrace before
    # they are rendered into a single message.
    {blamed, stacktrace} = Exception.blame(kind, reason, stacktrace)

    message = Exception.format(kind, blamed, stacktrace)

    %{attempt: attempt, at: DateTime.utc_now(), error: message}
  end

  # Applies `fun` to the value stored under `field` when that value is a binary or a non-nil
  # atom. Missing keys and any other kind of value leave the params untouched.
  defp coerce_field(params, field, fun) do
    case params do
      %{^field => value} when is_binary(value) or (is_atom(value) and value != nil) ->
        %{params | field => fun.(value)}

      _other ->
        params
    end
  end

  @doc false
  # Checks a single `{key, value}` unique option:
  #
  #   * `:fields` — a non-empty list drawn from `:meta` and the default unique fields. `--`
  #     removes one occurrence per element, so a repeated field is rejected.
  #   * `:keys` — a (possibly empty) list of atoms
  #   * `:period` — `:infinity` or a positive integer
  #   * `:states` — a non-empty list drawn from `states/0`, without repeats
  #   * `:timestamp` — `:inserted_at` or `:scheduled_at`
  #
  # Anything else, including unknown keys, is invalid.
  @spec valid_unique_opt?({atom(), term()}) :: boolean()
  def valid_unique_opt?({:fields, [_ | _] = fields}), do: fields -- [:meta | @unique_fields] == []
  def valid_unique_opt?({:keys, keys}) when is_list(keys), do: Enum.all?(keys, &is_atom/1)
  def valid_unique_opt?({:period, :infinity}), do: true
  def valid_unique_opt?({:period, period}), do: is_integer(period) and period > 0
  def valid_unique_opt?({:states, [_ | _] = states}), do: states -- states() == []
  def valid_unique_opt?({:timestamp, stamp}), do: stamp in ~w(inserted_at scheduled_at)a
  def valid_unique_opt?(_option), do: false

  # Units accepted in a `{amount, unit}` value for `:schedule_in`, in singular and plural form.
  @time_units ~w(
    second
    seconds
    minute
    minutes
    hour
    hours
    day
    days
    week
    weeks
  )a

  # True for an integer number of seconds or an `{integer, unit}` pair with a known unit.
  #
  # The tuple size is checked explicitly: without it a longer tuple such as `{5, :minutes, :x}`
  # would pass the guard and then fail to match any `to_timestamp/1` clause.
  defguardp is_timestampable(value)
            when is_integer(value) or
                   (is_tuple(value) and tuple_size(value) == 2 and
                      is_integer(elem(value, 0)) and elem(value, 1) in @time_units)

  # Stores the normalized `:replace` option on the changeset. Nothing is changed when neither
  # `:replace` nor `replace_args: true` was given.
  defp put_replace(changeset, replace, replace_args) do
    case expand_replace(replace, replace_args) do
      {:ok, expanded} -> put_change(changeset, :replace, expanded)
      :skip -> changeset
    end
  end

  # A flat list of fields (detected by an atom as its first element) is expanded to apply to
  # every state, with `:args` prepended when `replace_args` is `true`. Any other list is stored
  # exactly as given.
  defp expand_replace(nil, true), do: {:ok, replace_in_all_states([:args])}

  defp expand_replace([first | _] = fields, true) when is_atom(first) do
    {:ok, replace_in_all_states([:args | fields])}
  end

  defp expand_replace([first | _] = fields, _replace_args) when is_atom(first) do
    {:ok, replace_in_all_states(fields)}
  end

  defp expand_replace(by_state, _replace_args) when is_list(by_state), do: {:ok, by_state}
  defp expand_replace(_replace, _replace_args), do: :skip

  # Pairs every known job state with the same list of fields.
  defp replace_in_all_states(fields), do: Enum.map(states(), &{&1, fields})

  # Translates the `:schedule_in` option into a `:scheduled_at` change. A missing option leaves
  # the changeset alone, and anything that isn't timestampable is reported as an error.
  defp put_scheduling(changeset, nil), do: changeset

  defp put_scheduling(changeset, schedule_in) when is_timestampable(schedule_in) do
    put_change(changeset, :scheduled_at, to_timestamp(schedule_in))
  end

  defp put_scheduling(changeset, _invalid) do
    add_error(changeset, :schedule_in, "invalid value")
  end

  # Marks the job as "scheduled" when a `:scheduled_at` struct was set and the caller did not
  # choose a state explicitly.
  defp put_state(changeset) do
    explicit_state = get_change(changeset, :state)
    scheduled_at = get_change(changeset, :scheduled_at)

    if is_nil(explicit_state) and match?(%_{}, scheduled_at) do
      put_change(changeset, :state, "scheduled")
    else
      changeset
    end
  end

  # Builds the `:unique` change from the caller's options, filling in the module defaults for
  # anything that was omitted.
  #
  #   * `nil` — uniqueness wasn't requested, the changeset is returned as is
  #   * a non-empty list of pairs — merged with the defaults and validated option by option
  #   * anything else — reported as an "invalid unique options" error
  defp put_uniqueness(changeset, nil), do: changeset

  defp put_uniqueness(changeset, [_ | _] = opts) do
    # `Map.new/1` raises when an element isn't a two-element tuple (e.g. `unique: [:worker]`).
    # Such lists are reported as a changeset error rather than crashing `new/2`.
    if Enum.all?(opts, &match?({_key, _value}, &1)) do
      unique =
        opts
        |> Map.new()
        |> Map.put_new(:fields, @unique_fields)
        |> Map.put_new(:keys, [])
        |> Map.put_new(:period, @unique_period)
        |> Map.put_new(:states, @unique_states)
        |> Map.put_new(:timestamp, @unique_timestamp)

      case validate_unique_opts(unique) do
        :ok ->
          put_change(changeset, :unique, unique)

        {:error, field, value} ->
          add_error(changeset, :unique, "invalid unique option for #{field}, #{inspect(value)}")
      end
    else
      add_error(changeset, :unique, "invalid unique options")
    end
  end

  defp put_uniqueness(changeset, _value) do
    add_error(changeset, :unique, "invalid unique options")
  end

  # Converts every tag to a trimmed, lowercase string, dropping blank entries and duplicates
  # while keeping the order of first appearance. Params without a non-empty `:tags` list are
  # returned unchanged.
  defp normalize_tags(%{tags: [_ | _] = tags} = params) do
    cleaned =
      for tag <- tags,
          normalized = tag |> to_string() |> String.trim() |> String.downcase(),
          normalized != "",
          uniq: true,
          do: normalized

    %{params | tags: cleaned}
  end

  defp normalize_tags(params), do: params

  # Converts a number of seconds, or an `{amount, unit}` tuple, into a UTC timestamp that far
  # from the current time. Singular and plural unit names are interchangeable.
  defp to_timestamp(seconds) when is_integer(seconds) do
    DateTime.add(DateTime.utc_now(), seconds, :second)
  end

  defp to_timestamp({amount, unit}) when unit in [:second, :seconds] do
    to_timestamp(amount)
  end

  defp to_timestamp({amount, unit}) when unit in [:minute, :minutes] do
    to_timestamp(amount * 60)
  end

  defp to_timestamp({amount, unit}) when unit in [:hour, :hours] do
    to_timestamp(amount * 60 * 60)
  end

  defp to_timestamp({amount, unit}) when unit in [:day, :days] do
    to_timestamp(amount * 24 * 60 * 60)
  end

  defp to_timestamp({amount, unit}) when unit in [:week, :weeks] do
    to_timestamp(amount * 7 * 24 * 60 * 60)
  end

  # Validation

  # Adds a `:base` error for every param whose key, compared as a string, is not among the
  # allowed keys.
  defp validate_keys(changeset, params, keys) do
    allowed = Enum.map(keys, &to_string/1)

    for {key, _val} <- params, to_string(key) not in allowed, reduce: changeset do
      acc -> add_error(acc, :base, "unknown option #{inspect(key)} provided")
    end
  end

  # Validates the normalized `:replace` change, which is expected to be a list of
  # `{state, fields}` pairs.
  #
  #   * entries that aren't pairs (e.g. `replace: ["args"]`) add an "is invalid" error, where
  #     extracting keys and values from them would otherwise raise
  #   * a state outside of `states/0` adds a "has an invalid state" error
  #   * a field outside of the replaceable fields adds a "has an invalid field" error
  #
  # A changeset without a `:replace` change is returned untouched.
  defp validate_replace(changeset) do
    case get_change(changeset, :replace) do
      replace when is_list(replace) ->
        cond do
          not Enum.all?(replace, &match?({_state, _fields}, &1)) ->
            add_error(changeset, :replace, "is invalid")

          Enum.any?(replace, fn {state, _fields} -> state not in states() end) ->
            add_error(changeset, :replace, "has an invalid state")

          invalid_replace_field?(replace) ->
            add_error(changeset, :replace, "has an invalid field")

          true ->
            changeset
        end

      _ ->
        changeset
    end
  end

  # True when any field listed for any state is not one of the replaceable fields.
  defp invalid_replace_field?(replace) do
    replace
    |> Enum.map(fn {_state, fields} -> fields end)
    |> List.flatten()
    |> Enum.any?(&(&1 not in @replace_options))
  end

  # Returns `:ok` when every unique option is valid, otherwise an `{:error, key, value}` tuple
  # describing the first invalid option encountered.
  defp validate_unique_opts(unique) do
    case Enum.find(unique, fn option -> not valid_unique_opt?(option) end) do
      nil -> :ok
      {key, val} -> {:error, key, val}
    end
  end
end