defmodule Reactor.Executor.StepRunner do
@moduledoc """
Run an individual step, including compensation if possible.
"""
alias Reactor.{
Error.Invalid.ArgumentSubpathError,
Error.Invalid.CompensateStepError,
Error.Invalid.MissingInputError,
Error.Invalid.MissingResultError,
Error.Invalid.RunStepError,
Error.Invalid.UndoRetriesExceededError,
Error.Invalid.UndoStepError,
Executor.ConcurrencyTracker,
Executor.Hooks,
Executor.State,
Step
}
import Reactor.Utils
import Reactor.Argument, only: :macros
require Logger
# In the future this could be moved into a step property.
@max_undo_count 5
@doc """
Collect the arguments and run a step, with compensation if required.

While the step is running, a metadata map (current step, pid, reactor and
concurrency key) is pushed onto the stack stored under the `:__reactor__`
process dictionary key. The previous stack is restored once the step has
finished, even when running the step raises, throws or exits.

If collecting the arguments, building the context or replacing the arguments
does not yield `{:ok, _}`, that non-matching result is returned unchanged and
the step is not run.
"""
@spec run(Reactor.t(), State.t(), Step.t(), ConcurrencyTracker.pool_key()) ::
        {:ok, any, [Step.t()]} | :retry | {:retry, any} | {:error | :halt, any}
def run(reactor, state, step, concurrency_key) do
  with {:ok, arguments} <- get_step_arguments(reactor, step),
       {:ok, context} <- build_context(reactor, state, step, concurrency_key),
       {:ok, arguments} <- maybe_replace_arguments(arguments, context) do
    metadata = %{
      current_step: step,
      pid: self(),
      reactor: reactor,
      concurrency_key: concurrency_key
    }

    metadata_stack = Process.get(:__reactor__, [])
    Process.put(:__reactor__, [metadata | metadata_stack])

    # `do_run/4` only rescues exceptions; a throw/exit (or a raise from inside
    # its own rescue handler) would otherwise leave stale metadata on the
    # stack of the calling process.
    try do
      do_run(reactor, step, arguments, context)
    after
      Process.put(:__reactor__, metadata_stack)
    end
  end
end
@doc """
Run a step inside a task.

Behaves like `run/4`, but first installs the supplied process contexts and
brackets the run with `:process_start` / `:process_terminate` events. The
terminate event is emitted from an `after` clause, so it fires whether or not
the run returns normally.
"""
@spec run_async(Reactor.t(), State.t(), Step.t(), ConcurrencyTracker.pool_key(), map) ::
        {:ok, any, [Step.t()]} | :retry | {:retry, any} | {:error | :halt, any}
def run_async(reactor, state, step, concurrency_key, process_contexts) do
  Hooks.set_process_contexts(process_contexts)
  emit_process_event(reactor, step, :process_start)
  run(reactor, state, step, concurrency_key)
after
  emit_process_event(reactor, step, :process_terminate)
end

# Emits a process lifecycle event tagged with the pid of the calling process.
defp emit_process_event(reactor, step, kind),
  do: Hooks.event(reactor, {kind, self()}, step, reactor.context)
@doc """
Undo a step if possible.

The step's arguments and context are rebuilt in the same way as for `run/4`,
an `:undo_start` event is emitted, and the undo is then attempted starting
from an attempt count of zero. If any preparation stage does not return
`{:ok, _}`, its result is returned unchanged and no undo is attempted.
"""
@spec undo(Reactor.t(), State.t(), Step.t(), any, ConcurrencyTracker.pool_key()) ::
        :ok | {:error, any}
def undo(reactor, state, step, value, concurrency_key) do
  with {:ok, collected} <- get_step_arguments(reactor, step),
       {:ok, context} <- build_context(reactor, state, step, concurrency_key),
       {:ok, arguments} <- maybe_replace_arguments(collected, context) do
    Hooks.event(reactor, :undo_start, step, context)
    do_undo(reactor, value, step, arguments, context, 0)
  end
end
# Attempts to undo `step`, retrying while the step asks for it.
#
# `attempts` counts the undo attempts already made. Once it reaches
# `@max_undo_count` the step is not called again; an
# `UndoRetriesExceededError` is emitted as an `:undo_error` event and returned.
defp do_undo(reactor, _value, step, _arguments, context, attempts)
     when attempts == @max_undo_count do
  exceeded = UndoRetriesExceededError.exception(step: step.name, retry_count: attempts)
  Hooks.event(reactor, {:undo_error, exceeded}, step, context)
  {:error, exceeded}
end

defp do_undo(reactor, value, step, arguments, context, attempts) do
  # Every retry re-enters with identical inputs and a bumped attempt counter.
  try_again = fn ->
    do_undo(reactor, value, step, arguments, context, attempts + 1)
  end

  case Step.undo(step, value, arguments, context) do
    :ok ->
      Hooks.event(reactor, :undo_complete, step, context)
      :ok

    {:error, reason} ->
      failure = UndoStepError.exception(step: step.name, error: reason)
      Hooks.event(reactor, {:undo_error, failure}, step, context)
      {:error, failure}

    :retry ->
      Hooks.event(reactor, :undo_retry, step, context)
      try_again.()

    {:retry, reason} ->
      Hooks.event(reactor, {:undo_retry, reason}, step, context)
      try_again.()
  end
end
# Emits `:run_start`, invokes the step and dispatches on its return value.
#
# Any exception raised while doing so (including from the result handler) is
# wrapped in a `RunStepError`, reported as a `:run_error` event and handed to
# the compensation logic.
defp do_run(reactor, step, arguments, context) do
  Hooks.event(reactor, {:run_start, arguments}, step, context)

  outcome = Step.run(step, arguments, context)
  handle_run_result(outcome, reactor, step, arguments, context)
rescue
  exception ->
    failure = RunStepError.exception(step: step, error: exception)
    Hooks.event(reactor, {:run_error, failure}, step, context)
    maybe_compensate(reactor, step, failure, arguments, context)
end
# Translates the value returned by `Step.run/3` into the runner's result,
# emitting the matching hook event on the way:
#
#   * `{:ok, value}` / `{:ok, value, steps}` - `:run_complete`, returns
#     `{:ok, value, steps}` (`steps` defaults to `[]`).
#   * `:retry` / `{:retry, reason}`          - `:run_retry`, returned as-is.
#   * `{:halt, value}`                       - `:run_halt`, returned as-is.
#   * `{:error, reason}`                     - wrapped in a `RunStepError`,
#     `:run_error` emitted, then compensation is attempted.
defp handle_run_result({:ok, value}, reactor, step, arguments, context),
  do: handle_run_result({:ok, value, []}, reactor, step, arguments, context)

defp handle_run_result({:ok, value, new_steps}, reactor, step, _arguments, context)
     when is_list(new_steps) do
  Hooks.event(reactor, {:run_complete, value}, step, context)
  {:ok, value, new_steps}
end

defp handle_run_result(:retry, reactor, step, _arguments, context) do
  Hooks.event(reactor, :run_retry, step, context)
  :retry
end

defp handle_run_result({:retry, reason} = retry, reactor, step, _arguments, context) do
  Hooks.event(reactor, {:run_retry, reason}, step, context)
  retry
end

defp handle_run_result({:halt, value} = halt, reactor, step, _arguments, context) do
  Hooks.event(reactor, {:run_halt, value}, step, context)
  halt
end

defp handle_run_result({:error, reason}, reactor, step, arguments, context) do
  failure = RunStepError.exception(step: step, error: reason)
  Hooks.event(reactor, {:run_error, failure}, step, context)
  maybe_compensate(reactor, step, failure, arguments, context)
end
# Runs compensation when the step reports the `:compensate` capability;
# otherwise the run error is returned untouched as `{:error, run_error}`.
defp maybe_compensate(reactor, step, run_error, arguments, context) do
  if Step.can?(step, :compensate),
    do: compensate(reactor, step, run_error, arguments, context),
    else: {:error, run_error}
end
# Emits `:compensate_start`, asks the step to compensate for the reason
# carried in `run_error.error`, and dispatches on what it returns.
#
# If compensation itself raises, the exception is wrapped in a
# `CompensateStepError` (with the stacktrace), reported as a
# `:compensate_error` event, logged, and returned as `{:error, wrapped}`.
defp compensate(reactor, step, run_error, arguments, context) do
  Hooks.event(reactor, {:compensate_start, run_error}, step, context)

  outcome = Step.compensate(step, run_error.error, arguments, context)
  handle_compensate_result(outcome, reactor, step, context, run_error)
rescue
  exception ->
    stacktrace = __STACKTRACE__

    wrapped =
      CompensateStepError.exception(
        reactor: reactor,
        step: step,
        error: exception,
        stacktrace: stacktrace
      )

    Hooks.event(reactor, {:compensate_error, wrapped}, step, context)

    # The wrapped error (not the raw exception) is what gets formatted.
    Logger.error(fn ->
      "Warning: step `#{inspect(step.name)}` `compensate/4` raised an error:\n" <>
        Exception.format(:error, wrapped, stacktrace)
    end)

    {:error, wrapped}
end
# Translates the value returned by `Step.compensate/4` into the runner's
# result, emitting the matching hook event. The last argument is the original
# run error:
#
#   * `:ok`               - compensation done; the run error is still returned.
#   * `{:continue, value}` - the step recovered; `value` becomes its result.
#   * `:retry`            - retry, reusing the run error as the reason.
#   * `{:retry, reason}`  - retry with the supplied reason.
#   * `{:error, reason}`  - compensation failed; wrapped in a
#     `CompensateStepError`.
defp handle_compensate_result(:ok, reactor, step, context, run_error) do
  Hooks.event(reactor, :compensate_complete, step, context)
  {:error, run_error}
end

defp handle_compensate_result({:continue, value}, reactor, step, context, _run_error) do
  Hooks.event(reactor, {:compensate_continue, value}, step, context)
  {:ok, value, []}
end

defp handle_compensate_result(:retry, reactor, step, context, run_error) do
  Hooks.event(reactor, :compensate_retry, step, context)
  {:retry, run_error}
end

defp handle_compensate_result({:retry, reason} = retry, reactor, step, context, _run_error) do
  Hooks.event(reactor, {:compensate_retry, reason}, step, context)
  retry
end

defp handle_compensate_result({:error, reason}, reactor, step, context, _run_error) do
  failure = CompensateStepError.exception(reactor: reactor, step: step, error: reason)
  Hooks.event(reactor, {:compensate_error, failure}, step, context)
  {:error, failure}
end
# Builds a map of argument name => resolved value for every argument of the
# step. Arguments named `:_` are skipped. Resolution is two-stage: fetch the
# source value, then apply the argument's sub-path (if any). A stage that
# does not return `{:ok, _}` has its result passed straight back to
# `reduce_while_ok/3`.
defp get_step_arguments(reactor, step) do
  reduce_while_ok(step.arguments, %{}, fn
    %{name: :_}, collected ->
      {:ok, collected}

    argument, collected ->
      with {:ok, fetched} <- fetch_argument(reactor, step, argument),
           {:ok, resolved} <- subpath_argument(fetched, step, argument) do
        {:ok, Map.put(collected, argument.name, resolved)}
      end
  end)
end
# Resolves the raw value an argument points at, depending on its source:
#
#   * input  - looked up by name in `reactor.context.private.inputs`;
#   * result - looked up by name in `reactor.intermediate_results`;
#   * value  - the literal carried by the argument itself.
#
# A missing input/result yields `{:error, exception}` rather than raising.
defp fetch_argument(reactor, step, argument) when is_from_input(argument) do
  case Map.fetch(reactor.context.private.inputs, argument.source.name) do
    {:ok, _input} = found ->
      found

    :error ->
      {:error, MissingInputError.exception(reactor: reactor, step: step, argument: argument)}
  end
end

defp fetch_argument(reactor, step, argument) when is_from_result(argument) do
  case Map.fetch(reactor.intermediate_results, argument.source.name) do
    {:ok, _result} = found ->
      found

    :error ->
      {:error, MissingResultError.exception(reactor: reactor, step: step, argument: argument)}
  end
end

defp fetch_argument(_reactor, _step, argument) when is_from_value(argument) do
  {:ok, argument.source.value}
end
# Applies the argument's sub-path to the fetched value when it has one;
# otherwise the value is passed through as `{:ok, value}`.
defp subpath_argument(value, step, argument) when has_sub_path(argument) do
  sub_path = argument.source.sub_path
  perform_argument_subpath(value, step, argument, sub_path, [], value)
end

defp subpath_argument(value, _step, _argument) do
  {:ok, value}
end
# Walks `remaining_path` one key at a time through nested maps/structs and
# keyword lists, starting at `intermediate_value`.
#
#   * `value`              - the original, un-traversed value (error reporting only).
#   * `remaining_path`     - keys still to be followed.
#   * `done_path`          - keys already followed, most recent first.
#   * `intermediate_value` - the value reached so far.
#
# Returns `{:ok, result}` once the path is exhausted, or
# `{:error, %ArgumentSubpathError{}}` when a key is missing or the value
# reached cannot be traversed.
#
# Error messages print the traversed path in traversal order (`done_path` is
# reversed for display). `culprit_path` is still passed in its accumulated
# (most-recent-first) order - NOTE(review): confirm which order
# `ArgumentSubpathError` expects.
defp perform_argument_subpath(
       value,
       step,
       argument,
       remaining_path,
       done_path,
       intermediate_value
     )

# Nothing left to follow: the value reached is the result.
defp perform_argument_subpath(_value, _step, _argument, [], _, result), do: {:ok, result}

# Maps and structs (structs satisfy `is_map/1`).
defp perform_argument_subpath(
       value,
       step,
       argument,
       [key | remaining_path],
       done_path,
       intermediate_value
     )
     when is_map(intermediate_value) do
  case Map.fetch(intermediate_value, key) do
    {:ok, intermediate_value} ->
      perform_argument_subpath(
        value,
        step,
        argument,
        remaining_path,
        [key | done_path],
        intermediate_value
      )

    :error ->
      type = if is_struct(intermediate_value), do: "struct", else: "map"

      {:error,
       ArgumentSubpathError.exception(
         step: step,
         argument: argument,
         culprit: intermediate_value,
         culprit_path: done_path,
         culprit_key: key,
         value: value,
         message:
           "key `#{inspect(key)}` not present in #{type} at path `#{format_done_path(done_path)}`."
       )}
  end
end

# Lists are only traversable when they are keyword lists.
defp perform_argument_subpath(
       value,
       step,
       argument,
       [key | remaining_path],
       done_path,
       intermediate_value
     )
     when is_list(intermediate_value) do
  if Keyword.keyword?(intermediate_value) do
    case Keyword.fetch(intermediate_value, key) do
      {:ok, intermediate_value} ->
        perform_argument_subpath(
          value,
          step,
          argument,
          remaining_path,
          [key | done_path],
          intermediate_value
        )

      :error ->
        {:error,
         ArgumentSubpathError.exception(
           step: step,
           argument: argument,
           culprit: intermediate_value,
           culprit_path: done_path,
           culprit_key: key,
           value: value,
           message:
             "key `#{inspect(key)}` not present in keyword list at path `#{format_done_path(done_path)}`."
         )}
    end
  else
    {:error,
     ArgumentSubpathError.exception(
       step: step,
       argument: argument,
       value: value,
       culprit: intermediate_value,
       # The key that led to this list is the most recently followed one.
       culprit_key: List.first(done_path),
       culprit_path: done_path,
       message: "list at path `#{format_done_path(done_path)}` is not a keyword list."
     )}
  end
end

# Anything else cannot be traversed while path keys remain.
defp perform_argument_subpath(
       value,
       step,
       argument,
       _remaining_path,
       done_path,
       intermediate_value
     ) do
  {:error,
   ArgumentSubpathError.exception(
     step: step,
     argument: argument,
     value: value,
     culprit: intermediate_value,
     culprit_path: done_path,
     culprit_key: List.first(done_path),
     message: "value is neither a map or keyword list."
   )}
end

# `done_path` is built by prepending, so reverse it to show the keys in the
# order they were followed.
defp format_done_path(done_path), do: done_path |> Enum.reverse() |> inspect()
# Builds the context map handed to the step.
#
# The step's own context is deep-merged with the reactor's context
# (presumably with reactor-level values taking precedence - confirm against
# `Reactor.Utils.deep_merge/2`), then overlaid with per-invocation keys:
#
#   * `:current_step`      - the step being run;
#   * `:concurrency_key`   - the concurrency pool key for this run;
#   * `:current_try`       - retry count recorded for this step in
#     `state.retries` (0 when absent);
#   * `:retries_remaining` - `max_retries - current_try`, or `:infinity`.
#
# Always returns `{:ok, context}`. A `max_retries` that is neither
# `:infinity` nor a non-negative integer raises a `CaseClauseError`.
defp build_context(reactor, state, step, concurrency_key) do
  current_try =
    state
    |> Map.get(:retries, %{})
    |> Map.get(step.ref, 0)

  retries_remaining =
    case Map.get(step, :max_retries) do
      :infinity -> :infinity
      max when is_integer(max) and max >= 0 -> max - current_try
    end

  # `Map.merge/2` lets these keys override anything coming from the merged
  # step/reactor contexts, so no further `Map.put/3` calls are needed.
  context =
    step.context
    |> deep_merge(reactor.context)
    |> Map.merge(%{
      current_step: step,
      concurrency_key: concurrency_key,
      current_try: current_try,
      retries_remaining: retries_remaining
    })

  {:ok, context}
end
# Optionally swaps the whole argument map for the value of a single argument.
#
# When `context.private.replace_arguments` names a key present in
# `arguments`, that argument's value is returned in place of the map. When it
# is `nil`, absent, or names a key that is not present, the arguments are
# returned unchanged.
defp maybe_replace_arguments(arguments, %{private: %{replace_arguments: nil}}) do
  {:ok, arguments}
end

defp maybe_replace_arguments(arguments, %{private: %{replace_arguments: key}})
     when is_map_key(arguments, key) do
  {:ok, Map.fetch!(arguments, key)}
end

defp maybe_replace_arguments(arguments, _context) do
  {:ok, arguments}
end
end