defmodule Commanded.Middleware.Pipeline do
@moduledoc """
Pipeline is a struct used as an argument in the callback functions of modules
implementing the `Commanded.Middleware` behaviour.
This struct must be returned by each function to be used in the next
middleware based on the configured middleware chain.
## Pipeline fields
- `application` - the Commanded application.
- `assigns` - shared user data as a map.
- `causation_id` - an optional UUID used to identify the cause of the
command being dispatched.
- `correlation_id` - an optional UUID used to correlate related
commands/events together.
- `command` - command struct being dispatched.
- `command_uuid` - UUID assigned to the command being dispatched.
- `consistency` - requested dispatch consistency, either: `:eventual`
(default) or `:strong`.
- `halted` - flag indicating whether the pipeline was halted.
- `identity` - an atom specifying a field in the command containing the
aggregate's identity or a one-arity function that returns an identity
from the command being dispatched.
- `identity_prefix` - an optional prefix to the aggregate's identity. It may
be a string (e.g. "prefix-") or a zero arity function
(e.g. `&MyRouter.identity_prefix/0`).
- `metadata` - the metadata map to be persisted along with the events.
- `response` - sets the response to send back to the caller.
"""
defstruct [
:application,
:causation_id,
:correlation_id,
:command,
:command_uuid,
:consistency,
:identity,
:identity_prefix,
:metadata,
:response,
assigns: %{},
halted: false
]
alias Commanded.Middleware.Pipeline
@doc """
Puts the `key` with value equal to `value` into `assigns` map.
"""
def assign(%Pipeline{} = pipeline, key, value) when is_atom(key) do
%Pipeline{assigns: assigns} = pipeline
%Pipeline{pipeline | assigns: Map.put(assigns, key, value)}
end
@doc """
Puts the `key` with value equal to `value` into `metadata` map.
Note: Use of atom keys in metadata is deprecated in favour of binary strings.
"""
def assign_metadata(%Pipeline{} = pipeline, key, value) when is_binary(key) or is_atom(key) do
%Pipeline{metadata: metadata} = pipeline
%Pipeline{pipeline | metadata: Map.put(metadata, key, value)}
end
@doc """
Has the pipeline been halted?
"""
def halted?(%Pipeline{halted: halted}), do: halted
@doc """
Halts the pipeline by preventing further middleware downstream from being invoked.
Prevents dispatch of the command if `halt` occurs in a `before_dispatch` callback.
"""
def halt(%Pipeline{} = pipeline) do
%Pipeline{pipeline | halted: true} |> respond({:error, :halted})
end
@doc """
Extract the response from the pipeline
"""
def response(%Pipeline{response: response}), do: response
@doc """
Sets the response to be returned to the dispatch caller, unless already set.
"""
def respond(%Pipeline{response: nil} = pipeline, response) do
%Pipeline{pipeline | response: response}
end
def respond(%Pipeline{} = pipeline, _response), do: pipeline
@doc """
Executes the middleware chain.
"""
def chain(pipeline, stage, middleware)
def chain(%Pipeline{} = pipeline, _stage, []), do: pipeline
def chain(%Pipeline{halted: true} = pipeline, :before_dispatch, _middleware), do: pipeline
def chain(%Pipeline{halted: true} = pipeline, :after_dispatch, _middleware), do: pipeline
def chain(%Pipeline{} = pipeline, stage, [module | modules]) do
chain(apply(module, stage, [pipeline]), stage, modules)
end
end