lib/oban/testing.ex

defmodule Oban.Testing do
  @moduledoc """
  This module simplifies testing workers and making assertions about enqueued jobs when testing in
  `:manual` mode.

  Assertions may be made on any property of a job, but you'll typically want to check by `args`,
  `queue` or `worker`. If you're using namespacing through PostgreSQL schemas, also called
  "prefixes" in Ecto, you should use the `prefix` option when doing assertions about enqueued
  jobs during testing. By default the `prefix` option is `public`.

  ## Using in Tests

  The most convenient way to use `Oban.Testing` is to `use` the module:

      use Oban.Testing, repo: MyApp.Repo

  That will define the helper functions you'll use to make assertions on the jobs that should (or
  should not) be inserted in the database while testing.

  Along with the `repo` you can also specify an alternate prefix to use in all assertions:

      use Oban.Testing, repo: MyApp.Repo, prefix: "business"

  Some example assertions:

  ```elixir
  # Assert that a job was already enqueued
  assert_enqueued worker: MyWorker, args: %{id: 1}

  # Assert that a job was enqueued or will be enqueued in the next 100ms
  assert_enqueued [worker: MyWorker, args: %{id: 1}], 100

  # Refute that a job was already enqueued
  refute_enqueued queue: "special", args: %{id: 2}

  # Refute that a job was already enqueued or would be enqueued in the next 100ms
  refute_enqueued queue: "special", args: %{id: 2}, 100

  # Make assertions on a list of all jobs matching some options
  assert [%{args: %{"id" => 1}}] = all_enqueued(worker: MyWorker)

  # Assert that no jobs are enqueued in any queues
  assert [] = all_enqueued()
  ```

  Note that the final example, using `all_enqueued/1`, returns a raw list of matching jobs and
  does not make an assertion by itself. This makes it possible to test using pattern matching at
  the expense of being more verbose.

  ## Example

  Given a simple module that enqueues a job:

  ```elixir
  defmodule MyApp.Business do
    def work(args) do
      args
      |> Oban.Job.new(worker: MyApp.Worker, queue: :special)
      |> Oban.insert!()
    end
  end
  ```

  The behaviour can be exercised in your test code:

      defmodule MyApp.BusinessTest do
        use ExUnit.Case, async: true
        use Oban.Testing, repo: MyApp.Repo

        alias MyApp.Business

        test "jobs are enqueued with provided arguments" do
          Business.work(%{id: 1, message: "Hello!"})

          assert_enqueued worker: MyApp.Worker, args: %{id: 1, message: "Hello!"}
        end
      end

  ## Matching Scheduled Jobs and Timestamps

  In order to assert a job has been scheduled at a certain time, you will need to match against
  the `scheduled_at` attribute of the enqueued job.

      in_an_hour = DateTime.add(DateTime.utc_now(), 3600, :second)
      assert_enqueued worker: MyApp.Worker, scheduled_at: in_an_hour

  By default, Oban will apply a 1 second delta to all timestamp fields of jobs, so that small
  deviations between the actual value and the expected one are ignored. You may configure this
  delta by passing a tuple of value and a `delta` option (in seconds) to corresponding keyword:

      assert_enqueued worker: MyApp.Worker, scheduled_at: {in_an_hour, delta: 10}

  ## Adding to Case Templates

  To include helpers in all of your tests you can add it to your case template:

  ```elixir
  defmodule MyApp.DataCase do
    use ExUnit.CaseTemplate

    using do
      quote do
        use Oban.Testing, repo: MyApp.Repo

        import Ecto
        import Ecto.Changeset
        import Ecto.Query
        import MyApp.DataCase

        alias MyApp.Repo
      end
    end
  end
  ```
  """
  @moduledoc since: "0.3.0"

  import ExUnit.Assertions, only: [assert: 2, refute: 2]
  import Ecto.Query, only: [limit: 2, order_by: 2, select: 2, where: 2, where: 3]

  alias Ecto.Changeset

  alias Oban.{Config, Job, Queue.Executor, Repo, Worker}

  @wait_interval 10

  @doc false
  defmacro __using__(opts) do
    repo = Keyword.fetch!(opts, :repo)
    prefix = Keyword.get(opts, :prefix, "public")

    quote do
      alias Oban.Testing

      def perform_job(worker, args, opts \\ []) do
        opts =
          opts
          |> Keyword.put_new(:repo, unquote(repo))
          |> Keyword.put_new(:prefix, unquote(prefix))

        Testing.perform_job(worker, args, opts)
      end

      def all_enqueued(opts \\ []) do
        opts = Keyword.put_new(opts, :prefix, unquote(prefix))

        Testing.all_enqueued(unquote(repo), opts)
      end

      def assert_enqueued(opts, timeout \\ :none) do
        opts = Keyword.put_new(opts, :prefix, unquote(prefix))

        if timeout == :none do
          Testing.assert_enqueued(unquote(repo), opts)
        else
          Testing.assert_enqueued(unquote(repo), opts, timeout)
        end
      end

      def refute_enqueued(opts, timeout \\ :none) do
        opts = Keyword.put_new(opts, :prefix, unquote(prefix))

        if timeout == :none do
          Testing.refute_enqueued(unquote(repo), opts)
        else
          Testing.refute_enqueued(unquote(repo), opts, timeout)
        end
      end
    end
  end

  @doc """
  Construct a job and execute it with a worker module.

  This reduces boilerplate when constructing jobs for unit tests and checks for common pitfalls.
  For example, it automatically converts `args` to string keys before calling `perform/1`,
  ensuring that perform clauses aren't erroneously trying to match atom keys.

  The helper makes the following assertions:

  * That the worker implements the `Oban.Worker` behaviour
  * That the options provided build a valid job
  * That the return is valid, e.g. `:ok`, `{:ok, value}`, `{:error, value}` etc.

  If all of the assertions pass then the function returns the result of `perform/1` for you to
  make additional assertions on.

  ## Examples

  Successfully execute a job with some string arguments:

      assert :ok = perform_job(MyWorker, %{"id" => 1})

  Successfully execute a job and assert that it returns an error tuple:

      assert {:error, _} = perform_job(MyWorker, %{"bad" => "arg"})

  Execute a job with the args keys automatically stringified:

      assert :ok = perform_job(MyWorker, %{id: 1})

  Exercise custom attempt handling within a worker by passing options:

      assert :ok = perform_job(MyWorker, %{}, attempt: 42)

  Cause a test failure because the provided worker isn't real:

      assert :ok = perform_job(Vorker, %{"id" => 1})
  """
  @doc since: "2.0.0"
  @spec perform_job(
          worker :: Worker.t(),
          args :: term(),
          opts :: [Job.option() | {:repo, module()}]
        ) ::
          Worker.result()
  def perform_job(worker, args, opts) when is_atom(worker) do
    {conf_opts, opts} = Keyword.split(opts, [:log, :prefix, :repo])

    opts = Keyword.put_new(opts, :attempt, 1)

    assert_valid_worker(worker)

    changeset =
      args
      |> worker.new(opts)
      |> Changeset.update_change(:args, &json_encode_decode/1)

    assert_valid_changeset(changeset)

    result =
      conf_opts
      |> Config.new()
      |> Executor.new(create_job(changeset), safe: false, ack: false)
      |> Executor.call()
      |> Map.fetch!(:result)

    assert_valid_result(result)

    result
  end

  @doc """
  Retrieve all currently enqueued jobs matching a set of options.

  Only jobs matching all of the provided arguments will be returned. Additionally, jobs are
  returned in descending order where the most recently enqueued job will be listed first.

  ## Examples

  Assert based on only _some_ of a job's args:

      assert [%{args: %{"id" => 1}}] = all_enqueued(worker: MyWorker)

  Assert that exactly one job was inserted for a queue:

      assert [%Oban.Job{}] = all_enqueued(queue: :alpha)

  Assert that there aren't any jobs enqueued for any queues or workers:

      assert [] = all_enqueued()
  """
  @doc since: "0.6.0"
  @spec all_enqueued(repo :: module(), opts :: Keyword.t()) :: [Job.t()]
  def all_enqueued(repo, opts) when is_list(opts) do
    {conf, opts} = extract_conf(repo, opts)

    Repo.all(conf, base_query(opts))
  end

  @doc """
  Assert that a job with particular options has been enqueued.

  Only values for the provided arguments will be checked. For example, an assertion made on
  `worker: "MyWorker"` will match _any_ jobs for that worker, regardless of the queue or args.
  """
  @doc since: "0.3.0"
  @spec assert_enqueued(repo :: module(), opts :: Keyword.t()) :: true
  def assert_enqueued(repo, [_ | _] = opts) do
    error_message = """
    Expected a job matching:

    #{inspect_opts(opts)}

    to be enqueued in the #{inspect(opts[:prefix])} schema. Instead found:

    #{inspect(available_jobs(repo, opts), pretty: true)}
    """

    assert get_job(repo, opts), error_message
  end

  @doc """
  Assert that a job with particular options is or will be enqueued within a timeout period.

  See `assert_enqueued/2` for additional details.

  ## Examples

  Assert that a job will be enqueued in the next 100ms:

      assert_enqueued [worker: MyWorker], 100
  """
  @doc since: "1.2.0"
  @spec assert_enqueued(repo :: module(), opts :: Keyword.t(), timeout :: pos_integer()) :: true
  def assert_enqueued(repo, [_ | _] = opts, timeout) when timeout > 0 do
    error_message = """
    Expected a job matching:

    #{inspect_opts(opts)}

    to be enqueued in the #{inspect(opts[:prefix])} schema within #{timeout}ms
    """

    assert wait_for_job(repo, opts, timeout), error_message
  end

  @doc """
  Refute that a job with particular options has been enqueued.

  See `assert_enqueued/2` for additional details.
  """
  @doc since: "0.3.0"
  @spec refute_enqueued(repo :: module(), opts :: Keyword.t()) :: false
  def refute_enqueued(repo, [_ | _] = opts) do
    error_message = """
    Expected no jobs matching:

    #{inspect_opts(opts)}

    to be enqueued in the #{inspect(opts[:prefix])} schema
    """

    refute get_job(repo, opts), error_message
  end

  @doc """
  Refute that a job with particular options is or will be enqueued within a timeout period.

  The minimum refute timeout is 10ms.

  See `assert_enqueued/2` for additional details.

  ## Examples

  Refute that a job will be enqueued in the next 100ms:

      refute_enqueued [worker: MyWorker], 100
  """
  @doc since: "1.2.0"
  @spec refute_enqueued(repo :: module(), opts :: Keyword.t(), timeout :: pos_integer()) :: false
  def refute_enqueued(repo, [_ | _] = opts, timeout) when timeout >= 10 do
    error_message = """
    Expected no jobs matching:

    #{inspect_opts(opts)}

    to be enqueued in the #{inspect(opts[:prefix])} schema within #{timeout}ms
    """

    refute wait_for_job(repo, opts, timeout), error_message
  end

  @doc """
  Change the testing mode within the context of a function.

  Only `:manual` and `:inline` mode are supported, as `:disabled` implies that supervised queues
  and plugins are running and this function won't start any processes.

  ## Examples

  Switch to `:manual` mode when an Oban instance is configured for `:inline` testing:

      Oban.Testing.with_testing_mode(:manual, fn ->
        Oban.insert(MyWorker.new(%{id: 123}))

        assert_enqueued worker: MyWorker, args: %{id: 123}
      end)

  Visa-versa, switch to `:inline` mode:

      Oban.Testing.with_testing_mode(:inline, fn ->
        {:ok, %Job{state: "completed"}} = Oban.insert(MyWorker.new(%{id: 123}))
      end)
  """
  @doc since: "2.12.0"
  @spec with_testing_mode(:inline | :manual, (() -> any())) :: any()
  def with_testing_mode(mode, fun) when mode in [:manual, :inline] and is_function(fun, 0) do
    Process.put(:oban_testing, mode)

    fun.()
  after
    Process.delete(:oban_testing)
  end

  # Assert Helpers

  defp inspect_opts(opts) do
    opts
    |> Map.new()
    |> Map.drop([:prefix])
    |> inspect(pretty: true)
  end

  # Perform Helpers

  defp assert_valid_worker(worker) do
    assert Code.ensure_loaded?(worker) and implements_worker?(worker), """
     Expected worker to be a module that implements the Oban.Worker behaviour, got:

    #{inspect(worker)}
    """
  end

  defp implements_worker?(worker) do
    :attributes
    |> worker.__info__()
    |> Keyword.get_values(:behaviour)
    |> List.flatten()
    |> Enum.member?(Oban.Worker)
  end

  defp assert_valid_changeset(changeset) do
    assert changeset.valid?, """
    Expected args and opts to build a valid job, got validation errors:

    #{traverse_errors(changeset)}
    """
  end

  defp traverse_errors(changeset) do
    traverser = fn {message, opts} ->
      Enum.reduce(opts, message, fn {key, value}, acc ->
        String.replace(acc, "%{#{key}}", to_string(value))
      end)
    end

    changeset
    |> Changeset.traverse_errors(traverser)
    |> Enum.map_join("\n", fn {key, val} -> "#{key}: #{val}" end)
  end

  defp json_encode_decode(map) do
    map
    |> Jason.encode!()
    |> Jason.decode!()
  end

  defp assert_valid_result(result) do
    valid? =
      case result do
        :ok -> true
        {:ok, _value} -> true
        {:cancel, _value} -> true
        {:error, _value} -> true
        {:discard, _value} -> true
        {:snooze, snooze} when is_integer(snooze) -> true
        :discard -> true
        _ -> false
      end

    assert valid?, """
    Expected result to be one of

      - `:ok`
      - `{:ok, value}`
      - `{:cancel, reason}`
      - `{:error, reason}`
      - `{:snooze, duration}

    Instead received:

    #{inspect(result, pretty: true)}
    """
  end

  # Enqueued Helpers

  defp get_job(repo, opts) do
    {conf, opts} = extract_conf(repo, opts)

    Repo.one(conf, opts |> base_query() |> limit(1) |> select([:id]))
  end

  defp wait_for_job(repo, opts, timeout) when timeout > 0 do
    case get_job(repo, opts) do
      nil ->
        Process.sleep(@wait_interval)

        wait_for_job(repo, opts, timeout - @wait_interval)

      job ->
        job
    end
  end

  defp wait_for_job(_repo, _opts, _timeout), do: nil

  defp available_jobs(repo, opts) do
    {conf, opts} = extract_conf(repo, opts)

    fields = Keyword.keys(opts)

    conf
    |> Repo.all([] |> base_query() |> select(^fields))
    |> Enum.map(&Map.take(&1, fields))
  end

  defp base_query(opts) do
    fields_with_opts = normalize_fields(opts)

    Job
    |> where([j], j.state in ["available", "scheduled"])
    |> apply_where_clauses(fields_with_opts)
    |> order_by(desc: :id)
  end

  defp extract_conf(repo, opts) do
    {conf_opts, opts} = Keyword.split(opts, [:prefix])

    conf =
      conf_opts
      |> Keyword.put(:repo, repo)
      |> Config.new()

    {conf, opts}
  end

  defp extract_field_opts({key, {value, field_opts}}, field_opts_acc) do
    {{key, value}, [{key, field_opts} | field_opts_acc]}
  end

  defp extract_field_opts({key, value}, field_opts_acc) do
    {{key, value}, field_opts_acc}
  end

  defp normalize_fields(opts) do
    {fields, field_opts} = Enum.map_reduce(opts, [], &extract_field_opts/2)

    args = Keyword.get(fields, :args, %{})
    keys = Keyword.keys(fields)

    args
    |> Job.new(fields)
    |> Changeset.apply_changes()
    |> Map.from_struct()
    |> Map.take(keys)
    |> Enum.map(fn {key, value} -> {key, value, Keyword.get(field_opts, key, [])} end)
  end

  @timestamp_fields ~W(attempted_at completed_at inserted_at scheduled_at)a
  @timestamp_default_delta_seconds 1

  defp apply_where_clauses(query, []), do: query

  defp apply_where_clauses(query, [{key, value, opts} | rest]) when key in @timestamp_fields do
    delta = Keyword.get(opts, :delta, @timestamp_default_delta_seconds)

    window_start = DateTime.add(value, -delta, :second)
    window_end = DateTime.add(value, delta, :second)

    query
    |> where([j], fragment("? BETWEEN ? AND ?", field(j, ^key), ^window_start, ^window_end))
    |> apply_where_clauses(rest)
  end

  defp apply_where_clauses(query, [{key, value, _opts} | rest]) do
    query
    |> where(^[{key, value}])
    |> apply_where_clauses(rest)
  end

  defp create_job(changeset) do
    changeset
    |> default_to_now(:attempted_at)
    |> default_to_now(:scheduled_at)
    |> Changeset.apply_action!(:insert)
  end

  defp default_to_now(changeset, field) do
    value = Changeset.get_change(changeset, field) || DateTime.utc_now()

    Changeset.put_change(changeset, field, value)
  end
end