# SPDX-FileCopyrightText: 2023 ash_oban contributors <https://github.com/ash-project/ash_oban/graphs/contributors>
#
# SPDX-License-Identifier: MIT
defmodule AshOban.Test do
@moduledoc """
Helpers for testing AshOban triggers and scheduled actions.
## Setup
Use this module in your test case to get all helpers at once:
defmodule MyApp.DataCase do
use ExUnit.CaseTemplate
using do
quote do
use AshOban.Test, repo: MyApp.Repo
end
end
end
`use AshOban.Test` accepts the same options as `use Oban.Testing` and
delegates to it, so you get both Oban's own testing helpers and AshOban's
helpers in a single call.
See the [Testing Guide](/documentation/topics/testing.md) for full setup
instructions.
## Helpers
### Running triggers
`schedule_and_run_triggers/2` runs all matching jobs synchronously, which
is the most common thing you need in tests:
# Run all triggers for a resource
AshOban.Test.schedule_and_run_triggers(MyApp.MyResource)
# Run a specific trigger
AshOban.Test.schedule_and_run_triggers({MyApp.MyResource, :process})
assert %{success: 1, failure: 0} =
AshOban.Test.schedule_and_run_triggers(MyApp.MyResource)
### Asserting enqueued jobs
After an action that calls `run_oban_trigger`, use `assert_triggered/3` to
verify the job was enqueued for the right record:
{:ok, appointment} = Appointment.update(appointment, params)
assert_triggered(appointment, :create_zoom_meeting)
These are **macros** that require `all_enqueued/1` to be in scope (provided
by `use AshOban.Test` or `use Oban.Testing` directly).
### Asserting scheduler eligibility
Use `assert_would_schedule/3` and `refute_would_schedule/3` to check whether
a record's current state matches a trigger's `where` filter without running
the scheduler:
record = MyApp.create_record!()
assert_would_schedule(record, :process)
processed = MyApp.process!(record)
refute_would_schedule(processed, :process)
"""
@pro Application.compile_env(:ash_oban, :pro?)
@doc """
Sets up AshOban test helpers.
Delegates to `use Oban.Testing` so Oban's own helpers (`all_enqueued`,
`assert_enqueued`, etc.) are also available. Accepts the same options.
## Options
* `:repo` - Required. The Ecto repo to use for querying Oban jobs.
* `:prefix` - The database prefix for Oban tables. Defaults to `"public"`.
## Example
use AshOban.Test, repo: MyApp.Repo, prefix: "private"
"""
defmacro __using__(opts) do
module =
if @pro do
Oban.Pro.Testing
else
Oban.Testing
end
quote do
use unquote(module), unquote(opts)
import AshOban.Test
end
end
@doc """
Asserts that an Oban job has been enqueued for the given record and trigger.
Returns the list of matching `Oban.Job` structs so you can make further
assertions on job properties if needed.
Requires `all_enqueued/1` to be in scope — use `use AshOban.Test` or
`use Oban.Testing` to set this up.
## Options
Additional options are merged with the trigger-derived options (`:worker`,
`:args`) and forwarded to `all_enqueued/1`. Use these to narrow the
assertion:
* `:queue` - assert the job is in a specific queue
* `:state` - assert the job is in a specific state
* `:priority` - assert the job has a specific priority
## Examples
{:ok, appointment} = Appointment.update(appointment, params)
assert_triggered(appointment, :create_zoom_meeting)
# Narrow to a specific queue
assert_triggered(appointment, :create_zoom_meeting, queue: :zoom_api)
# Further assert on the returned jobs
[job] = assert_triggered(appointment, :create_zoom_meeting)
assert job.priority == 2
"""
defmacro assert_triggered(record, trigger_name, opts \\ []) do
quote do
record_val = unquote(record)
resource = record_val.__struct__
trigger_name_val = unquote(trigger_name)
trigger = AshOban.Test.__fetch_trigger__!(resource, trigger_name_val)
primary_key = Ash.Resource.Info.primary_key(resource)
pk_map = Map.new(primary_key, fn key -> {key, Map.get(record_val, key)} end)
enqueued_opts =
Keyword.merge(
[worker: trigger.worker, args: %{primary_key: pk_map}],
unquote(opts)
)
matching = all_enqueued(enqueued_opts)
ExUnit.Assertions.assert(
matching != [],
"Expected an Oban job to be enqueued for trigger #{inspect(trigger_name_val)} on " <>
"#{inspect(resource)} with primary key #{inspect(pk_map)}, but none was found"
)
matching
end
end
@doc """
Refutes that an Oban job has been enqueued for the given record and trigger.
Requires `all_enqueued/1` to be in scope — use `use AshOban.Test` or
`use Oban.Testing` to set this up.
## Options
Additional options are merged with the trigger-derived options (`:worker`,
`:args`) and forwarded to `all_enqueued/1`. Use these to narrow the
assertion:
* `:queue` - refute that the job is in a specific queue
* `:state` - refute that the job is in a specific state
* `:priority` - refute that the job has a specific priority
## Examples
refute_triggered(appointment, :create_zoom_meeting)
# Narrow to a specific queue
refute_triggered(appointment, :create_zoom_meeting, queue: :zoom_api)
"""
defmacro refute_triggered(record, trigger_name, opts \\ []) do
quote do
record_val = unquote(record)
resource = record_val.__struct__
trigger_name_val = unquote(trigger_name)
trigger = AshOban.Test.__fetch_trigger__!(resource, trigger_name_val)
primary_key = Ash.Resource.Info.primary_key(resource)
pk_map = Map.new(primary_key, fn key -> {key, Map.get(record_val, key)} end)
enqueued_opts =
Keyword.merge(
[worker: trigger.worker, args: %{primary_key: pk_map}],
unquote(opts)
)
matching = all_enqueued(enqueued_opts)
ExUnit.Assertions.assert(
matching == [],
"Expected no Oban jobs to be enqueued for trigger #{inspect(trigger_name_val)} on " <>
"#{inspect(resource)} with primary key #{inspect(pk_map)}, " <>
"but #{length(matching)} job(s) were found"
)
end
end
@doc false
def __fetch_trigger__!(resource, trigger_name) do
case AshOban.Info.oban_trigger(resource, trigger_name) do
nil ->
raise ArgumentError,
"No trigger named #{inspect(trigger_name)} found for resource #{inspect(resource)}"
trigger ->
trigger
end
end
@doc """
Schedules and runs triggers, draining queues by default.
This is a wrapper around `AshOban.schedule_and_run_triggers/2` with
`drain_queues?: true` set by default, making it suitable for use in tests
where you want jobs to execute synchronously.
## Arguments
* `resources_or_domains_or_otp_apps` - Can be any of the following:
* A resource module - runs all triggers for that resource
* A `{resource, :trigger_name}` tuple - runs a specific trigger
* A domain module - runs all triggers for all resources in that domain
* An OTP app atom - runs all triggers for all domains in that app
* A list of any of the above
## Options
* `:drain_queues?` - Whether to drain queues after scheduling. Defaults to `true`
(unlike `AshOban.schedule_and_run_triggers/2` which defaults to `false`).
* `:scheduled_actions?` - Whether to include scheduled actions. Defaults to `false`.
When passing a `{resource, :scheduled_action_name}` tuple this is automatically enabled.
* `:triggers?` - Whether to include triggers. Defaults to `true`.
* `:actor` - The actor to pass to the trigger actions. Requires an actor persister to be configured.
* `:oban` - The Oban instance to use. Defaults to `Oban`.
* `:queue` - Passed through to `Oban.drain_queue/2`.
* `:with_limit` - Passed through to `Oban.drain_queue/2`.
* `:with_recursion` - Passed through to `Oban.drain_queue/2`.
* `:with_safety` - Passed through to `Oban.drain_queue/2`.
* `:with_scheduled` - Passed through to `Oban.drain_queue/2`.
## Return Value
Returns a map with job outcome counts:
%{
success: non_neg_integer(),
failure: non_neg_integer(),
discard: non_neg_integer(),
cancelled: non_neg_integer(),
snoozed: non_neg_integer(),
queues_not_drained: [atom()]
}
## Examples
# Run all triggers for a resource and assert outcomes
assert %{success: 3, failure: 0} =
AshOban.Test.schedule_and_run_triggers(MyResource)
# Run a specific trigger with an actor
assert %{success: 2} =
AshOban.Test.schedule_and_run_triggers({MyResource, :process},
actor: %MyApp.User{id: 1}
)
"""
def schedule_and_run_triggers(resources_or_domains_or_otp_apps, opts \\ []) do
opts = Keyword.put_new(opts, :drain_queues?, true)
AshOban.schedule_and_run_triggers(resources_or_domains_or_otp_apps, opts)
end
@doc """
Asserts that the given record currently matches a trigger's `where` filter,
meaning the scheduler would enqueue a job for it if it ran now.
This does **not** check whether a job has already been enqueued — use
`assert_triggered/3` for that. Instead, it checks the record's current
state in the database against the trigger's filter expression.
Returns the matched record.
## Options
* `:actor` - The actor to use when reading the record. Defaults to `nil`.
* `:tenant` - The tenant to use when reading the record. Defaults to `nil`.
* `:domain` - The domain to use when reading. Defaults to the resource's
configured Oban domain.
## Examples
# After creating a record, assert it is eligible for scheduling
record = MyApp.create_record!()
assert_would_schedule(record, :process)
# After processing, assert it is no longer eligible
processed = MyApp.process!(record)
refute_would_schedule(processed, :process)
"""
def assert_would_schedule(%resource{} = record, trigger_name, opts \\ []) do
trigger = __fetch_trigger__!(resource, trigger_name)
primary_key = Ash.Resource.Info.primary_key(resource)
pk_filter = Map.take(record, primary_key)
result = query_for_scheduling(resource, trigger, pk_filter, opts)
ExUnit.Assertions.assert(
result != nil,
"Expected #{inspect(resource)} record with primary key #{inspect(pk_filter)} " <>
"to match the where filter for trigger #{inspect(trigger_name)}, but it does not"
)
result
end
@doc """
Refutes that the given record currently matches a trigger's `where` filter.
This is the inverse of `assert_would_schedule/3`. It asserts that the
scheduler would **not** enqueue a job for the record if it ran now.
## Options
* `:actor` - The actor to use when reading the record. Defaults to `nil`.
* `:tenant` - The tenant to use when reading the record. Defaults to `nil`.
* `:domain` - The domain to use when reading. Defaults to the resource's
configured Oban domain.
## Examples
processed = MyApp.process!(record)
refute_would_schedule(processed, :process)
"""
def refute_would_schedule(%resource{} = record, trigger_name, opts \\ []) do
trigger = __fetch_trigger__!(resource, trigger_name)
primary_key = Ash.Resource.Info.primary_key(resource)
pk_filter = Map.take(record, primary_key)
result = query_for_scheduling(resource, trigger, pk_filter, opts)
ExUnit.Assertions.assert(
result == nil,
"Expected #{inspect(resource)} record with primary key #{inspect(pk_filter)} " <>
"to NOT match the where filter for trigger #{inspect(trigger_name)}, but it does"
)
end
defp query_for_scheduling(resource, trigger, pk_filter, opts) do
domain = opts[:domain] || AshOban.Info.oban_domain!(resource)
read_action = trigger.read_action || Ash.Resource.Info.primary_action!(resource, :read).name
resource
|> Ash.Query.do_filter(pk_filter)
|> then(fn query ->
if trigger.where do
Ash.Query.do_filter(query, trigger.where)
else
query
end
end)
|> Ash.Query.for_read(read_action, %{},
authorize?: false,
actor: opts[:actor],
domain: domain
)
|> Ash.Query.set_tenant(opts[:tenant])
|> Ash.read_one!()
end
end