defmodule Reactor.Step.Around do
@moduledoc """
Wrap the execution of a number of steps in a function.
This allows you to provide custom context and filter the provided steps as
needed.
## Options
* `fun` - a four-arity function that will be called when executing this step.
* `steps` - a list of steps which are will be provided to the above mentioned
function.
* `allow_async?` - a boolean indicating whether the nested steps can be
executed asynchronously or must remain within the current process.
## Wrapper function
Your around function will be called by this step and will be passed the
following arguments:
* `arguments` - the arguments passed to the step.
* `context` - the context passed to the step.
* `steps` - the list of steps passed in the options.
* `callback` - a 3 arity function that you can call to execute steps.
This provides you the opportunity to modify the arguments, context and list of
steps to be executed. You then can call the callback with the modified
arguments, context and steps and they will be executed in a Reactor of their
own. The callback will return `{:ok, results}` where results is a map of all
of the step results by name, or an error tuple.
You can then modify the result in any way before returning it as the return of
the around step.
## Callback function
The callback function will spawn a separate Reactor and run provided steps to
completion using `arguments` as input.
It expects the following three arguments to be passed:
1. `arguments` - a map of arguments to be used as input to the nested Reactor.
2. `context` - the context passed to the nested Reactor.
3. `steps` - the list of steps which will be executed in the nested Reactor.
## Example
You could use a function like that below to cause some steps to be executed
inside an Ecto database transaction.
```elixir
def in_transaction(arguments, context, steps, callback) do
MyApp.Repo.transaction(fn ->
case callback.(arguments, context, steps) do
{:ok, results} -> result
{:error, reason} -> raise reason
end
end)
end
```
"""
use Reactor.Step
alias Reactor.{Argument, Builder, Step}
import Reactor.Utils
@typedoc """
The type signature for the provided callback function.
"""
@type callback ::
(Reactor.inputs(), Reactor.context(), [Step.t()] -> {:ok, any} | {:error, any})
@typedoc """
The type signature for the "around" function.
"""
@type around_fun ::
(Reactor.inputs(), Reactor.context(), [Step.t()], callback ->
{:ok, any} | {:error, any})
@type options :: [function_option | steps_option | allow_async_option]
@typedoc """
The MFA or 4-arity function which this step will call.
"""
@type function_option :: {:fun, mfa | around_fun}
@typedoc """
The initial steps to pass into the "around" function.
Optional.
"""
@type steps_option :: {:steps, [Step.t()]}
@typedoc """
Should the inner Reactor be allowed to run tasks asynchronously?
Optional. Defaults to `true`.
"""
@type allow_async_option :: {:allow_async?, boolean}
@doc false
@impl true
@spec run(Reactor.inputs(), Reactor.context(), options) :: {:ok, any} | {:error, any}
def run(arguments, context, options) do
with {:ok, fun} <- capture(options),
{:ok, steps} <- fetch_steps(options),
{:ok, result} <- fun.(arguments, context, steps, &__MODULE__.around(&1, &2, &3, options)) do
{:ok, result}
else
:error -> {:error, "Missing `fun` option."}
{:error, reason} -> {:error, reason}
end
end
def around(_arguments, _context, [], _options), do: {:ok, %{}}
def around(arguments, context, steps, options) do
allow_async? = Keyword.get(options, :allow_async?, true)
name = context.current_step.name
reactor = Builder.new({__MODULE__, name})
with {:ok, reactor} <- build_inputs(reactor, arguments),
{:ok, reactor} <- build_steps(reactor, steps),
{:ok, reactor} <- build_return_step(reactor, steps),
options <-
maybe_append_result([async?: allow_async?], fn ->
case Map.fetch(context, :concurrency_key) do
{:ok, value} -> {:concurrency_key, value}
:error -> nil
end
end),
{:ok, result} <-
Reactor.run(reactor, arguments, context, options) do
{:ok, result}
else
{:error, reason} -> {:error, reason}
{:halt, reactor} -> {:halt, reactor}
end
end
defp capture(options) do
case Keyword.fetch(options, :fun) do
{:ok, fun} when is_function(fun, 4) ->
{:ok, fun}
{:ok, {m, f, []}} when is_atom(m) and is_atom(f) ->
ensure_exported(m, f, 4, fn -> {:ok, Function.capture(m, f, 4)} end)
{:ok, {m, f, a}} when is_atom(m) and is_atom(f) and is_list(a) ->
ensure_exported(m, f, length(a) + 4, fn ->
{:ok,
fn arguments, context, steps, callback ->
apply(m, f, [arguments, context, steps, callback | a])
end}
end)
_ ->
{:error,
argument_error(:options, "Expected `fun` option to be a 4 arity function", options)}
end
end
defp ensure_exported(m, f, arity, callback) do
if Code.ensure_loaded?(m) && function_exported?(m, f, arity) do
callback.()
else
{:error, "Expected `#{inspect(m)}.#{f}/#{arity}` to be exported."}
end
end
defp fetch_steps(options) do
steps = Keyword.get(options, :steps, [])
if Enum.all?(steps, &is_struct(&1, Step)) do
{:ok, steps}
else
{:error,
argument_error(
:options,
"Expected `steps` option to be a list of `Reactor.Step` structs",
options
)}
end
end
defp build_inputs(reactor, arguments) do
arguments
|> map_while_ok(&Argument.Build.build/1)
|> and_then(&{:ok, List.flatten(&1)})
|> and_then(fn arguments ->
reduce_while_ok(arguments, reactor, &Builder.add_input(&2, &1.name))
end)
end
defp build_steps(reactor, steps) do
{:ok, %{reactor | steps: Enum.concat(steps, reactor.steps)}}
end
defp build_return_step(reactor, steps) do
arguments =
steps
|> Enum.map(&Argument.from_result(&1.name, &1.name))
return_step_name = {__MODULE__, :return_step}
with {:ok, reactor} <-
Builder.add_step(reactor, return_step_name, Step.ReturnAllArguments, arguments,
async?: false,
max_retries: 0
) do
Builder.return(reactor, return_step_name)
end
end
end