defmodule Commanded.Assertions.EventAssertions do
@moduledoc """
Provides test assertion and wait for event functions to help test applications
built using Commanded.
The default assert and refute receive timeouts are one second.
You can override the default timeout in config (e.g. `config/test.exs`):
config :commanded,
assert_receive_event_timeout: 1_000,
refute_receive_event_timeout: 1_000
"""
import ExUnit.Assertions
alias Commanded.EventStore
alias Commanded.EventStore.RecordedEvent
@doc """
Assert that events matching their respective predicates have a matching
correlation id.
Useful when there is a chain of events that is connected through event handlers.
## Example
assert_correlated(
BankApp,
BankAccountOpened, fn opened -> opened.id == 1 end,
InitialAmountDeposited, fn deposited -> deposited.id == 2 end
)
"""
def assert_correlated(application, event_type_a, predicate_a, event_type_b, predicate_b) do
assert_receive_event(application, event_type_a, predicate_a, fn _event_a, metadata_a ->
assert_receive_event(application, event_type_b, predicate_b, fn _event_b, metadata_b ->
assert metadata_a.correlation_id == metadata_b.correlation_id
end)
end)
end
@doc """
Assert that an event of the given event type is published.
Verify that event using the assertion function.
## Example
assert_receive_event(BankApp, BankAccountOpened, fn opened ->
assert opened.account_number == "ACC123"
end)
"""
def assert_receive_event(application, event_type, assertion_fn)
when is_function(assertion_fn, 1) or is_function(assertion_fn, 2) do
assert_receive_event(application, event_type, fn _event -> true end, assertion_fn)
end
@doc """
Assert that an event of the given event type, matching the predicate, is
published. Verify that event using the assertion function.
## Example
assert_receive_event(BankApp, BankAccountOpened,
fn opened -> opened.account_number == "ACC123" end,
fn opened ->
assert opened.balance == 1_000
end)
"""
def assert_receive_event(application, event_type, predicate_fn, assertion_fn)
when is_function(assertion_fn, 1) or is_function(assertion_fn, 2) do
unless Code.ensure_loaded?(event_type) do
raise ExUnit.AssertionError, "Event #{inspect(event_type)} not found"
end
with_subscription(application, fn subscription ->
do_assert_receive(application, subscription, event_type, predicate_fn, assertion_fn)
end)
end
@doc """
Refute that an event of the given type has been received.
An optional predicate may be provided to filter events matching the refuted
type.
## Examples
Refute that `ExampleEvent` is produced by given anonymous function:
refute_receive_event(ExampleApp, ExampleEvent, fn ->
:ok = MyApp.dispatch(command)
end)
Refute that `ExampleEvent` is produced by `some_func/0` function:
refute_receive_event(ExampleApp, ExampleEvent, &some_func/0)
Refute that `ExampleEvent` matching given `event_matches?/1` predicate funtion
is produced by `some_func/0` function:
refute_receive_event(ExampleApp, ExampleEvent, &some_func/0,
predicate: &event_matches?/1
)
Refute that `ExampleEvent` matching given anonymous predicate funtion
is produced by `some_func/0` function:
refute_receive_event(ExampleApp, ExampleEvent, &some_func/0,
predicate: fn event -> event.value == 1 end
)
Refute that `ExampleEvent` produced by `some_func/0` function is published to
a given stream:
refute_receive_event(ExampleApp, ExampleEvent, &some_func/0,
predicate: fn event -> event.value == 1 end,
stream: "foo-1234"
)
"""
def refute_receive_event(application, event_type, refute_fn, opts \\ [])
when is_function(refute_fn, 0) do
predicate_fn = Keyword.get(opts, :predicate) || fn _event -> true end
timeout = Keyword.get(opts, :timeout, default_refute_receive_timeout())
subscription_opts = Keyword.take(opts, [:stream]) |> Keyword.put(:start_from, :current)
reply_to = self()
ref = make_ref()
# Start a task to subscribe and verify received events
task =
Task.async(fn ->
with_subscription(
application,
fn subscription ->
send(reply_to, {:subscribed, ref})
do_refute_receive_event(application, subscription, event_type, predicate_fn)
end,
subscription_opts
)
end)
# Wait until subscription has subscribed before executing refute function,
# otherwise we might not receive a matching event.
assert_receive {:subscribed, ^ref}, default_receive_timeout()
refute_fn.()
case Task.yield(task, timeout) || Task.shutdown(task) do
{:ok, :ok} -> :ok
{:ok, {:error, event}} -> flunk("Unexpectedly received event: " <> inspect(event))
{:exit, error} -> flunk("Encountered an error: " <> inspect(error))
nil -> :ok
end
end
@doc """
Wait for an event of the given event type to be published.
## Examples
wait_for_event(BankApp, BankAccountOpened)
"""
def wait_for_event(application, event_type) do
wait_for_event(application, event_type, fn _event -> true end)
end
@doc """
Wait for an event of the given event type, matching the predicate, to be
published.
## Examples
wait_for_event(BankApp, BankAccountOpened, fn opened ->
opened.account_number == "ACC123"
end)
"""
def wait_for_event(application, event_type, predicate_fn) when is_function(predicate_fn) do
with_subscription(application, fn subscription ->
do_wait_for_event(application, subscription, event_type, predicate_fn)
end)
end
@doc false
def with_subscription(application, callback_fn, opts \\ [])
when is_function(callback_fn, 1) do
subscription_name = UUID.uuid4()
stream = Keyword.get(opts, :stream, :all)
start_from = Keyword.get(opts, :start_from, :origin)
{:ok, subscription} =
EventStore.subscribe_to(application, stream, subscription_name, self(), start_from)
assert_receive {:subscribed, ^subscription}, default_receive_timeout()
try do
callback_fn.(subscription)
after
:ok = EventStore.unsubscribe(application, subscription)
:ok = EventStore.delete_subscription(application, stream, subscription_name)
end
end
defp do_assert_receive(application, subscription, event_type, predicate_fn, assertion_fn) do
assert_receive {:events, received_events}, default_receive_timeout()
case find_expected_event(received_events, event_type, predicate_fn) do
%RecordedEvent{data: data} = expected_event ->
args =
cond do
is_function(assertion_fn, 1) -> [data]
is_function(assertion_fn, 2) -> [data, expected_event]
end
apply(assertion_fn, args)
nil ->
:ok = ack_events(application, subscription, received_events)
do_assert_receive(application, subscription, event_type, predicate_fn, assertion_fn)
end
end
defp do_refute_receive_event(application, subscription, event_type, predicate_fn) do
receive do
{:events, events} ->
case find_expected_event(events, event_type, predicate_fn) do
%RecordedEvent{data: data} ->
{:error, data}
nil ->
:ok = ack_events(application, subscription, events)
do_refute_receive_event(application, subscription, event_type, predicate_fn)
end
end
end
defp do_wait_for_event(application, subscription, event_type, predicate_fn) do
assert_receive {:events, received_events}, default_receive_timeout()
case find_expected_event(received_events, event_type, predicate_fn) do
%RecordedEvent{} = expected_event ->
expected_event
nil ->
:ok = ack_events(application, subscription, received_events)
do_wait_for_event(application, subscription, event_type, predicate_fn)
end
end
defp find_expected_event(received_events, event_type, predicate_fn) do
Enum.find(received_events, fn
%RecordedEvent{data: %{__struct__: ^event_type} = data} = received_event ->
args =
cond do
is_function(predicate_fn, 1) -> [data]
is_function(predicate_fn, 2) -> [data, received_event]
end
apply(predicate_fn, args)
%RecordedEvent{} ->
false
end)
end
defp ack_events(_application, _subscription, []), do: :ok
defp ack_events(application, subscription, [event]),
do: EventStore.ack_event(application, subscription, event)
defp ack_events(application, subscription, [_event | events]),
do: ack_events(application, subscription, events)
defp default_receive_timeout,
do: Application.get_env(:commanded, :assert_receive_event_timeout, 1_000)
defp default_refute_receive_timeout,
do: Application.get_env(:commanded, :refute_receive_event_timeout, 1_000)
end