lib/oban/testing.ex

defmodule Oban.Testing do
  @moduledoc """
  This module simplifies testing workers and making assertions about enqueued jobs when testing in
  `:manual` mode.

  Assertions may be made on any property of a job, but you'll typically want to check by `args`,
  `queue` or `worker`.

  ## Usage

  The most convenient way to use `Oban.Testing` is to `use` the module:

      use Oban.Testing, repo: MyApp.Repo

  That will define the helper functions you'll use to make assertions on the jobs that should (or
  should not) be inserted in the database while testing.

  If you're using namespacing through Postgres schemas, also called "prefixes" in Ecto, you
  should set the `prefix` option:

      use Oban.Testing, repo: MyApp.Repo, prefix: "business"

  Unless overridden, the default `prefix` is `public`.

  ### Adding to Case Templates

  To include helpers in all of your tests you can add it to your case template:

  ```elixir
  defmodule MyApp.DataCase do
    use ExUnit.CaseTemplate

    using do
      quote do
        use Oban.Testing, repo: MyApp.Repo

        import Ecto
        import Ecto.Changeset
        import Ecto.Query
        import MyApp.DataCase

        alias MyApp.Repo
      end
    end
  end
  ```

  ## Examples

  After the test helpers are imported, you can make assertions about enqueued (available or
  scheduled) jobs in your tests.

  Here are a few examples that demonstrate what's possible:

  ```elixir
  # Assert that a job was already enqueued
  assert_enqueued worker: MyWorker, args: %{id: 1}

  # Assert that a job was enqueued or will be enqueued in the next 100ms
  assert_enqueued [worker: MyWorker, args: %{id: 1}], 100

  # Refute that a job was already enqueued
  refute_enqueued queue: "special", args: %{id: 2}

  # Refute that a job was already enqueued or would be enqueued in the next 100ms
  refute_enqueued queue: "special", args: %{id: 2}, 100

  # Make assertions on a list of all jobs matching some options
  assert [%{args: %{"id" => 1}}] = all_enqueued(worker: MyWorker)

  # Assert that no jobs are enqueued in any queues
  assert [] = all_enqueued()
  ```

  Note that the final example, using `all_enqueued/1`, returns a raw list of matching jobs and
  does not make an assertion by itself. This makes it possible to test using pattern matching at
  the expense of being more verbose.

  See the docs for `assert_enqueued/1,2`, `refute_enqueued/1,2`, and `all_enqueued/1` for more
  examples.

  ## Matching Timestamps

  In order to assert a job has been scheduled at a certain time, you will need to match against
  the `scheduled_at` attribute of the enqueued job.

      in_an_hour = DateTime.add(DateTime.utc_now(), 3600, :second)
      assert_enqueued worker: MyApp.Worker, scheduled_at: in_an_hour

  By default, Oban will apply a 1 second delta to all timestamp fields of jobs, so that small
  deviations between the actual value and the expected one are ignored. You may configure this
  delta by passing a tuple of value and a `delta` option (in seconds) to corresponding keyword:

      assert_enqueued worker: MyApp.Worker, scheduled_at: {in_an_hour, delta: 10}
  """

  @moduledoc since: "0.3.0"

  import ExUnit.Assertions, only: [assert: 2, refute: 2, flunk: 1]
  import Ecto.Query, only: [order_by: 2, select: 3, where: 2, where: 3]

  alias Ecto.Changeset

  alias Oban.{Config, Job, Queue.Executor, Repo, Worker}

  @type perform_opts :: Job.option() | Oban.option()

  @conf_keys []
             |> Config.new()
             |> Map.from_struct()
             |> Map.keys()

  @json_fields ~w(args meta)a
  @timestamp_fields ~w(attempted_at cancelled_at completed_at discarded_at inserted_at scheduled_at)a
  @wait_interval 10

  @doc false
  defmacro __using__(repo_opts) do
    repo_opts = Keyword.put_new(repo_opts, :prefix, false)

    unless Keyword.has_key?(repo_opts, :repo) do
      raise ArgumentError, "testing requires a :repo option to be set"
    end

    quote do
      alias Oban.Testing

      def perform_job(worker, args, opts \\ []) do
        opts = Keyword.merge(unquote(repo_opts), opts)

        Testing.perform_job(worker, args, opts)
      end

      def all_enqueued(opts \\ []) do
        unquote(repo_opts)
        |> Keyword.merge(opts)
        |> Testing.all_enqueued()
      end

      def assert_enqueued(opts, timeout \\ :none) do
        opts = Keyword.merge(unquote(repo_opts), opts)

        if timeout == :none do
          Testing.assert_enqueued(opts)
        else
          Testing.assert_enqueued(opts, timeout)
        end
      end

      def refute_enqueued(opts, timeout \\ :none) do
        opts = Keyword.merge(unquote(repo_opts), opts)

        if timeout == :none do
          Testing.refute_enqueued(opts)
        else
          Testing.refute_enqueued(opts, timeout)
        end
      end
    end
  end

  @doc """
  Construct a job and execute it with a worker module.

  This reduces boilerplate when constructing jobs for unit tests and checks for common pitfalls.
  For example, it automatically converts `args` to string keys before calling `perform/1`,
  ensuring that perform clauses aren't erroneously trying to match atom keys.

  The helper makes the following assertions:

  * That the worker implements the `Oban.Worker` behaviour
  * That the options provided build a valid job
  * That the return is valid, e.g. `:ok`, `{:ok, value}`, `{:error, value}` etc.

  If all of the assertions pass then the function returns the result of `perform/1` for you to
  make additional assertions on.

  ## Examples

  Successfully execute a job with some string arguments:

      assert :ok = perform_job(MyWorker, %{"id" => 1})

  Successfully execute a job and assert that it returns an error tuple:

      assert {:error, _} = perform_job(MyWorker, %{"bad" => "arg"})

  Execute a job with the args keys automatically stringified:

      assert :ok = perform_job(MyWorker, %{id: 1})

  Exercise custom attempt handling within a worker by passing options:

      assert :ok = perform_job(MyWorker, %{}, attempt: 42)

  Cause a test failure because the provided worker isn't real:

      assert :ok = perform_job(Vorker, %{"id" => 1})
  """
  @doc since: "2.0.0"
  @spec perform_job(worker :: Worker.t(), args :: term(), [perform_opts()]) :: Worker.result()
  def perform_job(worker, args, opts) when is_atom(worker) do
    assert_valid_worker(worker)

    {conf_opts, opts} = Keyword.split(opts, @conf_keys)

    utc_now = DateTime.utc_now()

    changeset =
      args
      |> worker.new(opts)
      |> Changeset.put_change(:id, System.unique_integer([:positive]))
      |> Changeset.update_change(:args, &json_recode/1)
      |> Changeset.update_change(:meta, &json_recode/1)
      |> put_new_change(:attempt, 1)
      |> put_new_change(:attempted_at, utc_now)
      |> put_new_change(:scheduled_at, utc_now)
      |> put_new_change(:inserted_at, utc_now)

    assert_valid_changeset(changeset)

    job = Changeset.apply_action!(changeset, :insert)

    result =
      conf_opts
      |> Config.new()
      |> Executor.new(job, safe: false, ack: false)
      |> Executor.call()
      |> Map.fetch!(:result)

    assert_valid_result(result)

    result
  end

  @doc """
  Retrieve all currently enqueued jobs matching a set of options.

  Only jobs matching all of the provided arguments will be returned. Additionally, jobs are
  returned in descending order where the most recently enqueued job will be listed first.

  ## Examples

  Assert based on only _some_ of a job's args:

      assert [%{args: %{"id" => 1}}] = all_enqueued(worker: MyWorker)

  Assert that exactly one job was inserted for a queue:

      assert [%Oban.Job{}] = all_enqueued(queue: :alpha)

  Assert that there aren't any jobs enqueued for any queues or workers:

      assert [] = all_enqueued()
  """
  @spec all_enqueued(opts :: Keyword.t()) :: [Job.t()]
  def all_enqueued(opts) when is_list(opts) do
    {conf, opts} = extract_conf(opts)

    filter_jobs(conf, opts)
  end

  @doc false
  @spec all_enqueued(repo :: module(), opts :: Keyword.t()) :: [Job.t()]
  def all_enqueued(repo, opts) do
    opts
    |> Keyword.put(:repo, repo)
    |> all_enqueued()
  end

  @doc """
  Assert that a job with matching fields is enqueued.

  Only values for the provided fields are checked. For example, an assertion made on `worker:
  "MyWorker"` will match _any_ jobs for that worker, regardless of every other field.

  ## Examples

  Assert that a job is enqueued for a certain worker and args:

      assert_enqueued worker: MyWorker, args: %{id: 1}

  Assert that a job is enqueued for a particular queue and priority:

      assert_enqueued queue: :business, priority: 3

  Assert that a job's args deeply match:

      assert_enqueued args: %{config: %{enabled: true}}

  Use the `:_` wildcard to assert that a job's meta has a key with any value:

      assert_enqueued meta: %{batch_id: :_}
  """
  @doc since: "0.3.0"
  @spec assert_enqueued(opts :: Keyword.t()) :: true
  def assert_enqueued([_ | _] = opts) do
    {conf, opts} = extract_conf(opts)

    if job_exists?(conf, opts) do
      true
    else
      flunk("""
      Expected a job matching:

      #{inspect_opts(opts)}

      to be enqueued. Instead found:

      #{inspect(available_jobs(conf, opts), charlists: :as_lists, pretty: true)}
      """)
    end
  end

  @doc false
  @spec assert_enqueued(repo :: module(), opts :: Keyword.t()) :: true
  def assert_enqueued(repo, opts) when is_list(opts) do
    opts
    |> Keyword.put(:repo, repo)
    |> assert_enqueued()
  end

  @doc """
  Assert that a job with particular options is or will be enqueued within a timeout period.

  See `assert_enqueued/1` for additional details.

  ## Examples

  Assert that a job will be enqueued in the next 100ms:

      assert_enqueued [worker: MyWorker], 100
  """
  @doc since: "1.2.0"
  @spec assert_enqueued(opts :: Keyword.t(), timeout :: timeout()) :: true
  def assert_enqueued([_ | _] = opts, timeout) when timeout > 0 do
    {conf, opts} = extract_conf(opts)

    error_message = """
    Expected a job matching:

    #{inspect_opts(opts)}

    to be enqueued within #{timeout}ms
    """

    assert wait_for_job(conf, opts, timeout), error_message
  end

  @doc false
  def assert_enqueued(repo, [_ | _] = opts, timeout) when timeout > 0 do
    opts
    |> Keyword.put(:repo, repo)
    |> assert_enqueued(timeout)
  end

  @doc """
  Refute that a job with particular options has been enqueued.

  See `assert_enqueued/1` for additional details.

  ## Examples

  Refute that a job is enqueued for a certain worker:

      refute_enqueued worker: MyWorker

  Refute that a job is enqueued for a certain worker and args:

      refute_enqueued worker: MyWorker, args: %{id: 1}

  Refute that a job's nested args match:

      refute_enqueued args: %{config: %{enabled: false}}

  Use the `:_` wildcard to refute that a job's meta has a key with any value:

      refute_enqueued meta: %{batch_id: :_}
  """
  @doc since: "0.3.0"
  @spec refute_enqueued(opts :: Keyword.t()) :: false
  def refute_enqueued([_ | _] = opts) do
    {conf, opts} = extract_conf(opts)

    error_message = """
    Expected no jobs matching:

    #{inspect_opts(opts)}

    to be enqueued.
    """

    refute job_exists?(conf, opts), error_message
  end

  @doc false
  def refute_enqueued(repo, [_ | _] = opts) do
    opts
    |> Keyword.put(:repo, repo)
    |> refute_enqueued()
  end

  @doc """
  Refute that a job with particular options is or will be enqueued within a timeout period.

  See `assert_enqueued/1` for additional details.

  ## Examples

  Refute that a job will be enqueued in the next 100ms:

      refute_enqueued [worker: MyWorker], 100
  """
  @doc since: "1.2.0"
  @spec refute_enqueued(opts :: Keyword.t(), timeout :: timeout()) :: false
  def refute_enqueued([_ | _] = opts, timeout) when is_integer(timeout) do
    {conf, opts} = extract_conf(opts)

    error_message = """
    Expected no jobs matching:

    #{inspect_opts(opts)}

    to be enqueued within #{timeout}ms
    """

    refute wait_for_job(conf, opts, timeout), error_message
  end

  def refute_enqueued(repo, [_ | _] = opts, timeout) do
    opts
    |> Keyword.put(:repo, repo)
    |> refute_enqueued(timeout)
  end

  @doc """
  Change the testing mode within the context of a function.

  Only `:manual` and `:inline` mode are supported, as `:disabled` implies that supervised queues
  and plugins are running and this function won't start any processes.

  ## Examples

  Switch to `:manual` mode when an Oban instance is configured for `:inline` testing:

      Oban.Testing.with_testing_mode(:manual, fn ->
        Oban.insert(MyWorker.new(%{id: 123}))

        assert_enqueued worker: MyWorker, args: %{id: 123}
      end)

  Visa-versa, switch to `:inline` mode:

      Oban.Testing.with_testing_mode(:inline, fn ->
        {:ok, %Job{state: "completed"}} = Oban.insert(MyWorker.new(%{id: 123}))
      end)
  """
  @doc since: "2.12.0"
  @spec with_testing_mode(:inline | :manual, (-> any())) :: any()
  def with_testing_mode(mode, fun) when mode in [:manual, :inline] and is_function(fun, 0) do
    Process.put(:oban_testing, mode)

    fun.()
  after
    Process.delete(:oban_testing)
  end

  # Assert Helpers

  defp inspect_opts(opts) do
    opts
    |> Map.new()
    |> inspect(charlists: :as_lists, pretty: true)
  end

  # Perform Helpers

  defp put_new_change(changeset, key, value) do
    case Changeset.fetch_change(changeset, key) do
      {:ok, _} ->
        changeset

      :error ->
        Changeset.put_change(changeset, key, value)
    end
  end

  defp assert_valid_worker(worker) do
    assert Code.ensure_loaded?(worker) and implements_worker?(worker), """
     Expected worker to be a module that implements the Oban.Worker behaviour, got:

    #{inspect(worker)}
    """
  end

  defp implements_worker?(worker) do
    :attributes
    |> worker.__info__()
    |> Keyword.get_values(:behaviour)
    |> List.flatten()
    |> Enum.member?(Oban.Worker)
  end

  defp assert_valid_changeset(changeset) do
    assert changeset.valid?, """
    Expected args and opts to build a valid job, got validation errors:

    #{traverse_errors(changeset)}
    """
  end

  defp traverse_errors(changeset) do
    traverser = fn {message, opts} ->
      Enum.reduce(opts, message, fn {key, value}, acc ->
        String.replace(acc, "%{#{key}}", to_string(value))
      end)
    end

    changeset
    |> Changeset.traverse_errors(traverser)
    |> Enum.map_join("\n", fn {key, val} -> "#{key}: #{val}" end)
  end

  defp json_recode(map) do
    map
    |> Jason.encode!()
    |> Jason.decode!()
  end

  defp assert_valid_result(result) do
    valid? =
      case result do
        :ok -> true
        {:ok, _value} -> true
        {:cancel, _value} -> true
        {:error, _value} -> true
        {:discard, _value} -> true
        {:snooze, snooze} when is_integer(snooze) -> true
        :discard -> true
        _ -> false
      end

    assert valid?, """
    Expected result to be one of

      - `:ok`
      - `{:ok, value}`
      - `{:cancel, reason}`
      - `{:error, reason}`
      - `{:snooze, duration}

    Instead received:

    #{inspect(result, pretty: true)}
    """
  end

  # Enqueued Helpers

  defp extract_conf(opts) do
    {conf_opts, opts} = Keyword.split(opts, @conf_keys)

    {Config.new(conf_opts), opts}
  end

  defp job_exists?(conf, opts) do
    conf
    |> filter_jobs(opts)
    |> Enum.any?()
  end

  defp wait_for_job(conf, opts, timeout) when timeout > 0 do
    if job_exists?(conf, opts) do
      true
    else
      Process.sleep(@wait_interval)

      wait_for_job(conf, opts, timeout - @wait_interval)
    end
  end

  defp wait_for_job(_conf, _opts, _timeout), do: false

  defp available_jobs(conf, opts) do
    query =
      []
      |> base_query()
      |> select([j], map(j, ^Keyword.keys(opts)))

    Repo.all(conf, query)
  end

  defp base_query(opts) do
    query =
      Job
      |> where([j], j.state in ["available", "scheduled"])
      |> order_by(desc: :id)

    opts
    |> Keyword.drop(~w(args meta)a)
    |> Enum.reduce(query, &conditions/2)
  end

  defp conditions({:queue, queue}, query), do: where(query, queue: ^to_string(queue))
  defp conditions({:state, state}, query), do: where(query, state: ^to_string(state))
  defp conditions({:worker, worker}, query), do: where(query, worker: ^Worker.to_string(worker))

  defp conditions({key, val}, query) when key in @timestamp_fields do
    {time, delta} =
      case val do
        {time, delta: delta} -> {time, delta}
        time -> {time, 1}
      end

    {:ok, time} = Ecto.Type.cast(:utc_datetime, time)

    begin = DateTime.add(time, -delta, :second)
    until = DateTime.add(time, +delta, :second)

    where(query, [j], fragment("? BETWEEN ? AND ?", field(j, ^key), ^begin, ^until))
  end

  defp conditions({key, value}, query), do: where(query, ^[{key, value}])

  defp filter_jobs(conf, opts) do
    {json_opts, base_opts} = Keyword.split(opts, @json_fields)

    json_opts = Keyword.new(json_opts, fn {key, val} -> {key, json_recode(val)} end)

    conf
    |> Repo.all(base_query(base_opts))
    |> Enum.filter(fn job -> Enum.all?(json_opts, &contains?(&1, job)) end)
  end

  defp contains?({key, "_"}, data) when is_map_key(data, key), do: true

  defp contains?({key, val}, data) when is_map(val) do
    case Map.get(data, key) do
      nil -> false
      sub -> Enum.all?(val, &contains?(&1, sub))
    end
  end

  defp contains?({key, val}, data), do: Map.get(data, key) == val
end