defmodule Reactor.Executor do
@moduledoc """
The Reactor executor.
The executor handles the main loop of running a Reactor.
The algorithm is somewhat confusing, so here it is in pseudocode:
1. Find any async tasks (from a previous loop) which are completed. Either
recurse or continue if none are found.
2. Find any async steps in the plan which are ready to run (they have no
in-edges in the graph) and start as many as possible (given the constraints
of `max_concurrency` and the state of the concurrency pool). Either start
over, or continue if none are found.
3. Find a single synchronous step which is ready to run and execute it. If
there was one then recurse, otherwise continue.
4. Check if there are no more steps left in the plan (there are zero
vertices). If so, collect the return value and exit, otherwise recurse.
Whenever a step is run, whether run synchronously or asynchronously, the
following happens:
1. When the step is successful:
a. If the step is undoable (ie `Step.can?(module, :undo)?` returns `true`)
then the step and the result are stored in the Reactor's undo stack.
b. If the result is depended upon by another step (the graph has out-edges
for the step) _or_ the step is asking the reactor to halt then the
result is stored in the Reactor's intermediate results.
c. The step is removed from the graph (along with it's out-edges, freeing
up it's dependents to run).
2. When the step is unsuccessful (returns an error tuple or raises):
a. If the step can be compensated then compensation is attempted up to five
times before giving up.
b. The reactor iterates it's undo stack calling undo on each step.
3. When a step or compensation asks for a retry then the step is placed back
in the graph to be run again next iteration.
"""
alias Reactor.Executor.ConcurrencyTracker
alias Reactor.{Executor, Planner, Step}
@doc """
Run a reactor.
Provided a Reactor which has been planned and the correct inputs, then run
the Reactor until completion, halting or failure.
You probably shouldn't call this directly, but use `Reactor.run/4` instead.
"""
@spec run(Reactor.t(), Reactor.inputs(), Reactor.context(), Reactor.options()) ::
{:ok, any} | {:halted, Reactor.t()} | {:error, any}
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
def run(reactor, _inputs, _context, _options) when is_nil(reactor.return),
do: {:error, ArgumentError.exception("`reactor` has no return value")}
def run(reactor, inputs, context, options) when reactor.state in ~w[pending halted]a do
with {:ok, context} <- Executor.Hooks.init(reactor, context),
{:ok, reactor, state} <- Executor.Init.init(reactor, inputs, context, options) do
execute(reactor, state)
end
end
def run(_reactor, _inputs, _context, _options),
do: {:error, ArgumentError.exception("`reactor` is not in `pending` or `halted` state")}
defp execute(reactor, state) when state.max_iterations == 0 do
{reactor, _status} = Executor.Async.collect_remaining_tasks_for_shutdown(reactor, state)
maybe_release_pool(state)
{:halted, %{reactor | state: :halted}}
end
defp execute(reactor, state) do
with {:continue, reactor, state} <- maybe_timeout(reactor, state),
{:continue, reactor, state} <- handle_unplanned_steps(reactor, state),
{:continue, reactor, state} <- handle_completed_async_steps(reactor, state),
{:continue, ready_steps} <- find_ready_steps(reactor, state),
{:continue, reactor, state} <- start_ready_async_steps(reactor, state, ready_steps),
{:continue, reactor, state} <- run_ready_sync_step(reactor, state, ready_steps),
{:continue, reactor, state} <- maybe_run_any_step_sync(reactor, state, ready_steps),
{:continue, reactor} <- all_done(reactor) do
execute(reactor, subtract_iteration(state))
else
{:recurse, reactor, state} ->
execute(reactor, subtract_iteration(state))
{:undo, reactor, state} ->
handle_undo(reactor, state)
{:halt, reactor, _state} ->
maybe_release_pool(state)
case Executor.Hooks.halt(reactor, reactor.context) do
{:ok, context} -> {:halted, %{reactor | context: context, state: :halted}}
{:error, reason} -> {:error, reason}
end
{:ok, result} ->
maybe_release_pool(state)
Executor.Hooks.complete(reactor, result, reactor.context)
{:error, reason} ->
maybe_release_pool(state)
Executor.Hooks.error(reactor, reason, reactor.context)
end
end
defp maybe_timeout(reactor, state) when state.timeout == :infinity,
do: {:continue, reactor, state}
defp maybe_timeout(reactor, state) do
if DateTime.diff(DateTime.utc_now(), state.started_at, :millisecond) >= state.timeout do
{reactor, _status} = Executor.Async.collect_remaining_tasks_for_shutdown(reactor, state)
{:halt, reactor, state}
else
{:continue, reactor, state}
end
end
defp handle_unplanned_steps(reactor, state) when reactor.steps == [],
do: {:continue, reactor, state}
defp handle_unplanned_steps(reactor, state) do
case Planner.plan(reactor) do
{:ok, reactor} -> {:recurse, reactor, state}
{:error, reason} -> {:undo, reactor, %{state | errors: [reason | state.errors]}}
end
end
defp handle_completed_async_steps(reactor, state) when state.async? == false,
do: {:continue, reactor, state}
defp handle_completed_async_steps(reactor, state),
do: Executor.Async.handle_completed_steps(reactor, state)
defp start_ready_async_steps(reactor, state, _) when state.async? == false,
do: {:continue, reactor, state}
defp start_ready_async_steps(reactor, state, []), do: {:continue, reactor, state}
defp start_ready_async_steps(reactor, state, steps) do
steps = Enum.filter(steps, &Step.async?/1)
Executor.Async.start_steps(reactor, state, steps)
end
defp run_ready_sync_step(reactor, state, []), do: {:continue, reactor, state}
defp run_ready_sync_step(reactor, state, [step | _]) when state.async? == false,
do: Executor.Sync.run(reactor, state, step)
defp run_ready_sync_step(reactor, state, steps) do
steps
|> Enum.find(&(!Step.async?(&1)))
|> case do
nil ->
{:continue, reactor, state}
step ->
Executor.Sync.run(reactor, state, step)
end
end
# This seems a little unintuitive, but this is what allows reactors who are
# sharing a concurrency pool to move forward even then there's no concurrency
# left without deadlocking.
#
# It's a complicated scenario, so let's lay out the pieces:
#
# 1. When a new reactor is started it allocates a concurrency pool using
# `Reactor.Executor.ConcurrencyTracker` **unless** it is explicitly passed
# a `concurrency_key` option.
# 2. Every time a reactor runs an async step it starts a `Task` and consumes a
# space in the concurrency pool (if possible).
# 3. Every task that is started has it's concurrency key stored in it's
# process dictionary (actually a stack of them because we may be multiple
# nested reactors deep).
# 4. If that async step then turns around and runs a new reactor with shared
# concurrency then that reactor is already consuming a concurrency slot and
# may not be able to allocate any more slots for it's tasks.
#
# This situation can lead to a deadlock where we have multiple reactors all in
# a tight loop trying to start tasks but none of them able to proceed.
#
# We detect this situation by:
#
# 1. We are unable to start any async steps (`start_ready_async_steps/3`
# returns `:continue`).
# 2. We are unable to start any sync steps (`run_ready_sync_step/3` returns
# `:continue`).
# 3. We have any steps which can be run (ie async ones which we couldn't
# start).
# 4. Our concurrency key is in the process dictionary.
#
# If all four of these conditions are met we pick the first step and run it
# synchronously. This is fine because the reactor process itself is a task in
# another reactor so in effect is still running asynchronously.
defp maybe_run_any_step_sync(reactor, state, []), do: {:continue, reactor, state}
defp maybe_run_any_step_sync(reactor, state, [step | _]) do
:__reactor__
|> Process.get([])
|> Enum.any?(&(&1.concurrency_key == state.concurrency_key))
|> if do
Executor.Sync.run(reactor, state, step)
else
{:continue, reactor, state}
end
end
defp subtract_iteration(state) when state.max_iterations == :infinity, do: state
defp subtract_iteration(state) when state.max_iterations > 0,
do: %{state | max_iterations: state.max_iterations - 1}
defp handle_undo(reactor, state) do
handle_undo(%{reactor | state: :failed, undo: []}, state, Enum.reverse(reactor.undo))
end
defp handle_undo(reactor, state, []) do
Executor.Hooks.error(reactor, state.errors, reactor.context)
end
defp handle_undo(reactor, state, [{step, value} | tail]) do
case Executor.StepRunner.undo(reactor, state, step, value, state.concurrency_key) do
:ok -> handle_undo(reactor, state, tail)
{:error, reason} -> handle_undo(reactor, %{state | errors: [reason | state.errors]}, tail)
end
end
defp all_done(reactor) do
with 0 <- Graph.num_vertices(reactor.plan),
{:ok, value} <- Map.fetch(reactor.intermediate_results, reactor.return) do
{:ok, value}
else
:error -> {:error, "Unable to find result for `#{inspect(reactor.return)}` step"}
n when is_integer(n) -> {:continue, reactor}
end
end
defp find_ready_steps(reactor, state) when state.max_concurrency > 0 do
steps =
reactor.plan
|> Graph.vertices()
|> Stream.filter(fn
step when is_struct(step, Step) -> Graph.in_degree(reactor.plan, step) == 0
_ -> false
end)
|> Enum.take(state.max_concurrency)
{:continue, steps}
end
defp find_ready_steps(reactor, _state) do
step =
reactor.plan
|> Graph.vertices()
|> Enum.find(fn
step when is_struct(step, Step) -> Graph.in_degree(reactor.plan, step) == 0
_ -> false
end)
{:continue, [step]}
end
defp maybe_release_pool(state) when state.pool_owner == true do
ConcurrencyTracker.release_pool(state.concurrency_key)
end
defp maybe_release_pool(_), do: :ok
end