defmodule Reactor.Step.Group do
@moduledoc """
Wrap the execution of a number of steps with before/after functions.
Unlike `Reactor.Step.Around`, this step doesn't need to run a nested Reactor
instance, but instead can emit the steps directly into the parent Reactor.
## Options
* `before` - a three-arity function that will be called before running any
child steps.
* `after` - a three-arity function that will be called after running any
emitted steps.
* `allow_async?` - a boolean indicating whether the emitted steps can be
executed asynchronously or must remain within the current process.
## Before function
The before function will be passed the following arguments:
1. `arguments` - the values of any step arguments needed by the group.
2. `context` - the Reactor context.
3. `steps` - the list of steps passed in the options.
This provides you the opportunity to modify the arguments, context and list of
steps to be executed.
The successful return value should be `{:ok, arguments, context, steps}`. The
returned arguments will be used to provide any `input` arguments to nested
steps.
### Example
```elixir
def no_time_travel(arguments, context, steps) do
steps = steps
|> Enum.filter(&(&1.name == :program_time_circuits))
arguments = arguments
|> Map.delete(:destination_time)
{:ok, arguments, context, steps}
end
```
## After function
The after function will be called with a single argument; a map of the nested
step results.
The successful return value should be `{:ok, any}` where `any` will be treated
as the result of the group.
```elixir
def find_current_year(results) do
case Map.fetch(results, :time_circuits) do
{:ok, %{present_time: present_time}} -> {:ok, present_time.year}
_ -> {:error, "Unable to read the present time from the time circuits"}
end
end
```
"""
use Reactor.Step
require Reactor.Argument
alias Reactor.{Argument, Builder, Step}
import Reactor.Utils
@typedoc """
The before function.
"""
@type before_fun ::
(Reactor.inputs(), Reactor.context(), [Step.t()] ->
{:ok, Reactor.inputs(), Reactor.context(), [Step.t()]} | {:error, any})
@typedoc """
The after function.
"""
@type after_fun :: (Reactor.inputs() -> {:ok, any} | {:error, any})
@type options :: [before_option | after_option | allow_async_option | steps_option]
@typedoc """
The MFA or 3-arity function which this step will call before running any
steps.
"""
@type before_option :: {:before, mfa | before_fun}
@typedoc """
The MFA or 1-arity function which this step will call after successfully
running the steps.
"""
@type after_option :: {:after, mfa | after_fun}
@typedoc """
The initial steps to pass into the "before" function.
Optional.
"""
@type steps_option :: {:steps, [Step.t()]}
@typedoc """
Should the emitted steps be allowed to run asynchronously?
Optional. Defaults to `true`.
"""
@type allow_async_option :: {:allow_async?, boolean}
@doc false
@impl true
@spec run(Reactor.inputs(), Reactor.context(), options) ::
{:ok, any, [Step.t()]} | {:error, any}
def run(arguments, context, options) do
allow_async? = Keyword.get(options, :allow_async?, true)
name = context.current_step.name
reactor = Builder.new({__MODULE__, name})
with {:ok, before_fun} <- capture_before_fun(options),
{:ok, after_fun} <- capture_after_fun(options),
{:ok, steps} <- fetch_steps(options),
{:ok, arguments, context, steps} <- before_fun.(arguments, context, steps),
{:ok, reactor} <- build_inputs(reactor, arguments),
{:ok, reactor} <- build_steps(reactor, steps),
{:ok, reactor} <- build_return_step(reactor, steps),
options <-
maybe_append_result([async?: allow_async?], fn ->
case Map.fetch(context, :concurrency_key) do
{:ok, value} -> {:concurrency_key, value}
:error -> nil
end
end),
{:ok, inner_result} <- Reactor.run(reactor, arguments, context, options),
{:ok, result} <- after_fun.(inner_result) do
{:ok, result}
else
{:error, reason} -> {:error, reason}
{:halt, reactor} -> {:halt, reactor}
end
end
defp capture_before_fun(options) do
case Keyword.fetch(options, :before) do
{:ok, fun} when is_function(fun, 3) ->
{:ok, fun}
{:ok, {m, f, []}} when is_atom(m) and is_atom(f) ->
ensure_exported(m, f, 3, fn -> {:ok, Function.capture(m, f, 3)} end)
{:ok, {m, f, a}} when is_atom(m) and is_atom(f) and is_list(a) ->
ensure_exported(m, f, length(a) + 3, fn ->
{:ok, fn arguments, context, steps -> apply(m, f, [arguments, context, steps | a]) end}
end)
_ ->
{:error,
argument_error(:options, "Expected `before` option to be a 3 arity function", options)}
end
end
defp capture_after_fun(options) do
case Keyword.fetch(options, :after) do
{:ok, fun} when is_function(fun, 1) ->
{:ok, fun}
{:ok, {m, f, []}} when is_atom(m) and is_atom(f) ->
ensure_exported(m, f, 1, fn -> {:ok, Function.capture(m, f, 1)} end)
{:ok, {m, f, a}} when is_atom(m) and is_atom(f) and is_list(a) ->
ensure_exported(m, f, length(a) + 1, fn ->
{:ok, fn results -> apply(m, f, [results | a]) end}
end)
_ ->
{:error,
argument_error(:options, "Expected `after` option to be a 1 arity function", options)}
end
end
defp ensure_exported(m, f, arity, callback) do
if Code.ensure_loaded?(m) && function_exported?(m, f, arity) do
callback.()
else
{:error, "Expected `#{inspect(m)}.#{f}/#{arity}` to be exported."}
end
end
defp fetch_steps(options) do
steps = Keyword.get(options, :steps, [])
if Enum.all?(steps, &is_struct(&1, Step)) do
{:ok, steps}
else
{:error,
argument_error(
:options,
"Expected `steps` option to be a list of `Reactor.Step` structs",
options
)}
end
end
defp build_inputs(reactor, arguments) do
arguments
|> map_while_ok(&Argument.Build.build/1)
|> and_then(&{:ok, List.flatten(&1)})
|> and_then(fn arguments ->
reduce_while_ok(arguments, reactor, &Builder.add_input(&2, &1.name))
end)
end
defp build_steps(reactor, steps) do
{:ok, %{reactor | steps: Enum.concat(steps, reactor.steps)}}
end
defp build_return_step(reactor, steps) do
arguments =
steps
|> Enum.map(&Argument.from_result(&1.name, &1.name))
return_step_name = {__MODULE__, :return_step}
with {:ok, reactor} <-
Builder.add_step(reactor, return_step_name, Step.ReturnAllArguments, arguments,
async?: false,
max_retries: 0
) do
Builder.return(reactor, return_step_name)
end
end
end