defmodule Reactor.Executor do
@moduledoc """
The Reactor executor.
The executor handles the main loop of running a Reactor.
The algorithm is somewhat confusing, so here it is in pseudocode:
1. Find any async tasks (from a previous loop) which are completed. Either
recurse or continue if none are found.
2. Find any async steps in the plan which are ready to run (they have no
in-edges in the graph) and start as many as possible (given the constraints
of `max_concurrency`). Either start over, or continue if none are found.
3. Find a single synchronous step which is ready to run and execute it. If
there was one then recurse, otherwise continue.
4. Check if there are no more steps left in the plan (there are zero
vertices). If so, collect the return value and exit, otherwise recurse.
Whenever a step is run, whether run synchronously or asynchronously, the
following happens:
1. When the step is successful:
a. If the step is undoable (ie `Step.can?(module, :undo)?` returns `true`)
then the step and the result are stored in the Reactor's undo stack.
b. If the result is depended upon by another step (the graph has out-edges
for the step) _or_ the step is asking the reactor to halt then the
result is stored in the Reactor's intermediate results.
c. The step is removed from the graph (along with it's out-edges, freeing
up it's dependents to run).
2. When the step is unsuccessful (returns an error tuple or raises):
a. If the step can be compensated then compensation is attempted up to
five times before giving up.
b. The reactor iterates it's undo stack calling undo on each step.
3. When a step or compensation asks for a retry then the step is placed back
in the graph to be run again next iteration.
"""
alias Reactor.{Executor, Planner}
@doc """
Run a reactor.
Provided a Reactor which has been planned and the correct inputs, then run
the Reactor until completion, halting or failure.
You probably shouldn't call this directly, but use `Reactor.run/4` instead.
"""
@spec run(Reactor.t(), Reactor.inputs(), Reactor.context(), Reactor.options()) ::
{:ok, any} | {:halted, Reactor.t()} | {:error, any}
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
def run(reactor, _inputs, _context, _options) when is_nil(reactor.return),
do: {:error, ArgumentError.exception("`reactor` has no return value")}
def run(reactor, inputs, context, options) when reactor.state in ~w[pending halted]a do
case Executor.Init.init(reactor, inputs, context, options) do
{:ok, reactor, state} -> execute(reactor, state)
{:error, reason} -> {:error, reason}
end
end
def run(_reactor, _inputs, _context, _options),
do: {:error, ArgumentError.exception("`reactor` is not in `pending` or `halted` state")}
defp execute(reactor, state) when state.max_iterations == 0 do
{reactor, _status} = Executor.Async.collect_remaining_tasks_for_shutdown(reactor, state)
{:halted, %{reactor | state: :halted}}
end
defp execute(reactor, state) do
with {:continue, reactor, state} <- handle_unplanned_steps(reactor, state),
{:continue, reactor, state} <- handle_completed_async_steps(reactor, state),
{:continue, reactor, state} <- start_ready_async_steps(reactor, state),
{:continue, reactor, state} <- run_ready_sync_step(reactor, state),
{:continue, reactor} <- all_done(reactor) do
execute(reactor, subtract_iteration(state))
else
{:recurse, reactor, state} ->
execute(reactor, subtract_iteration(state))
{:undo, reactor, state} ->
handle_undo(reactor, state)
{:halt, reactor, _state} ->
{:halted, %{reactor | state: :halted}}
{:ok, result} ->
{:ok, result}
{:error, reason} ->
{:error, reason}
end
end
defp handle_unplanned_steps(reactor, state) when reactor.steps == [],
do: {:continue, reactor, state}
defp handle_unplanned_steps(reactor, state) do
case Planner.plan(reactor) do
{:ok, reactor} -> {:recurse, reactor, state}
{:error, reason} -> {:undo, reactor, %{state | errors: [reason | state.errors]}}
end
end
defp handle_completed_async_steps(reactor, state) do
Executor.Async.handle_completed_steps(reactor, state)
end
defp start_ready_async_steps(reactor, state)
when map_size(state.current_tasks) == state.max_concurrency,
do: {:continue, reactor, state}
defp start_ready_async_steps(reactor, state) do
steps = find_ready_async_steps(reactor)
Executor.Async.start_steps(reactor, state, steps)
end
defp run_ready_sync_step(reactor, state) do
step = find_ready_sync_step(reactor)
Executor.Sync.run(reactor, state, step)
end
defp subtract_iteration(state) when state.max_iterations == :infinity, do: state
defp subtract_iteration(state) when state.max_iterations > 0,
do: %{state | max_iterations: state.max_iterations - 1}
defp handle_undo(reactor, state) do
handle_undo(%{reactor | state: :failed, undo: []}, state, Enum.reverse(reactor.undo))
end
defp handle_undo(_reactor, state, []), do: {:error, state.errors}
defp handle_undo(reactor, state, [{step, value} | tail]) do
case Executor.StepRunner.undo(reactor, step, value) do
:ok -> handle_undo(reactor, state, tail)
{:error, reason} -> handle_undo(reactor, %{state | errors: [reason | state.errors]}, tail)
end
end
defp all_done(reactor) do
with 0 <- Graph.num_vertices(reactor.plan),
{:ok, value} <- Map.fetch(reactor.intermediate_results, reactor.return) do
{:ok, value}
else
:error -> {:error, "Unable to find result for `#{inspect(reactor.return)}` step"}
n when is_integer(n) -> {:continue, reactor}
end
end
defp find_ready_async_steps(reactor) do
reactor
|> find_ready_steps(& &1.async?)
|> Enum.to_list()
end
defp find_ready_sync_step(reactor) do
reactor
|> find_ready_steps(&(&1.async? == false))
|> Enum.at(0)
end
defp find_ready_steps(reactor, predicate) when is_function(predicate, 1) do
reactor.plan
|> Graph.vertices()
|> Stream.filter(&(Graph.in_degree(reactor.plan, &1) == 0))
|> Stream.reject(&is_struct(&1, Task))
|> Stream.filter(predicate)
end
end