# lib/reactor/step/compose.ex

defmodule Reactor.Step.Compose do
  @moduledoc """
  A built-in step which can embed one reactor inside another.

  This is different to the `Builder.compose` and DSL `compose` methods.  Those
  methods build a new reactor by combining the steps of the two input reactors,
  whereas this step expands the provided reactor at runtime and dynamically
  inserts its steps into the running reactor.

  If emitting the reactor's steps into the current reactor would be recursive,
  then the reactor is directly executed within the step using `Reactor.run/4`.
  """

  use Reactor.Step
  alias Reactor.{Argument, Builder, Error.ComposeError, Info, Step}
  import Reactor, only: :macros
  import Reactor.Argument, only: :macros
  import Reactor.Utils

  # Step entry point.
  #
  # Reads the reactor to compose from the `:reactor` option (raising if the
  # option is absent) and checks whether its id is already present in the set
  # of composed reactors carried by the context. If it is, the reactor is run
  # directly; otherwise its steps are rewritten and emitted into the running
  # reactor. The return value is whatever the selected handler returns.
  @doc false
  @impl true
  def run(arguments, context, options) do
    reactor = Keyword.fetch!(options, :reactor)
    reactor_id = get_reactor_id(reactor)

    already_composed? =
      context
      |> get_composed_reactors()
      |> MapSet.member?(reactor_id)

    if already_composed? do
      handle_recursive_reactor(reactor, arguments, context)
    else
      handle_non_recursive_reactor(reactor, arguments, context)
    end
  end

  # Recursive case: executes the reactor directly with `Reactor.run/4`,
  # passing along the concurrency key of the current context. The result of
  # `Reactor.run/4` is returned unchanged.
  defp handle_recursive_reactor(reactor, arguments, context) do
    run_options = [concurrency_key: context.concurrency_key]

    Reactor.run(reactor, arguments, context, run_options)
  end

  # Non-recursive case for a reactor given as a module: converts it to a
  # struct with `Info.to_struct/1` and continues with the struct clause. Any
  # other result of the conversion is returned as-is.
  defp handle_non_recursive_reactor(reactor, arguments, context) when is_atom(reactor) do
    case Info.to_struct(reactor) do
      {:ok, reactor_struct} -> handle_non_recursive_reactor(reactor_struct, arguments, context)
      other -> other
    end
  end

  # Non-recursive case for a reactor struct.
  #
  # Validates the arguments and the return value of the inner reactor,
  # rewrites its steps so they are namespaced under the current step's name,
  # and appends a final step which yields the inner reactor's result.
  #
  # Returns `{:ok, nil, steps}` on success, or the first non-matching result
  # of the validation/rewrite chain.
  defp handle_non_recursive_reactor(reactor, arguments, context) do
    current_step = Map.fetch!(context, :current_step)

    with :ok <- validate_arguments_match_inputs(arguments, reactor),
         :ok <- validate_reactor_has_return(reactor),
         {:ok, rewritten_steps} <- rewrite_steps(reactor, current_step.name, arguments),
         {:ok, result_step} <- create_recursion_step(reactor, current_step.name) do
      {:ok, nil, Enum.concat(rewritten_steps, [result_step])}
    end
  end

  # Identifier used to detect recursion: a reactor given as a module is
  # identified by the module itself, a reactor struct by its `id` field.
  defp get_reactor_id(module) when is_atom(module), do: module

  defp get_reactor_id(reactor) when is_reactor(reactor) do
    reactor.id
  end

  # Returns the collection stored under `context.private.composed_reactors`
  # when that path exists and holds a non-nil value; otherwise an empty
  # `MapSet`.
  defp get_composed_reactors(%{private: %{composed_reactors: composed}})
       when not is_nil(composed),
       do: composed

  defp get_composed_reactors(_context), do: MapSet.new()

  # Ensures the inner reactor names a return value and that the value refers
  # to one of its steps.
  #
  # Returns `:ok`, or `{:error, ComposeError}` when the return is `nil` or no
  # step with that name can be found.
  defp validate_reactor_has_return(reactor) when is_nil(reactor.return),
    do:
      {:error,
       ComposeError.exception(
         inner_reactor: reactor,
         message: "The inner Reactor must have an explicit return value."
       )}

  defp validate_reactor_has_return(reactor) do
    # When the reactor carries a plan, steps may live as vertices of the plan
    # graph (`rewrite_steps/3` collects them from there as well), so the
    # return step has to be looked up in both places. Checking only
    # `reactor.steps` would reject a planned reactor before its steps could
    # ever be rewritten.
    planned_steps =
      case Map.get(reactor, :plan) do
        nil -> []
        plan -> Graph.vertices(plan)
      end

    return_step? = &(&1.name == reactor.return)

    if Enum.any?(reactor.steps, return_step?) or Enum.any?(planned_steps, return_step?) do
      :ok
    else
      {:error,
       ComposeError.exception(
         inner_reactor: reactor,
         message:
           "The inner Reactor return value does not correspond with an existing Reactor step."
       )}
    end
  end

  # Builds the final step emitted for a composed reactor. It carries the given
  # `name` and takes a single `:value` argument sourced from the result of
  # `{__MODULE__, name, reactor.return}` (the namespaced name given to the
  # inner reactor's return step), which it returns unchanged. The step is
  # created with `max_retries: 0`.
  defp create_recursion_step(reactor, name) do
    return_value = fn args, _context, _options -> {:ok, args.value} end
    step_arguments = [value: {:result, {__MODULE__, name, reactor.return}}]

    Builder.new_step(name, {Step.AnonFn, run: return_value}, step_arguments, max_retries: 0)
  end

  # Checks that every input of the inner reactor has a matching key in the
  # `arguments` map.
  #
  # Returns `:ok` when nothing is missing, otherwise `{:error, ComposeError}`
  # with a message naming the missing input(s).
  defp validate_arguments_match_inputs(arguments, reactor) do
    provided = arguments |> Map.keys() |> MapSet.new()

    missing =
      reactor.inputs
      |> MapSet.new()
      |> MapSet.difference(provided)
      |> MapSet.to_list()

    # Work out the message first so the error is constructed in one place.
    message =
      case missing do
        [] ->
          nil

        [input] ->
          "Missing argument for input `#{input}`"

        inputs ->
          listed = sentence(inputs, &"`#{&1}`", ", ", " and ")
          "Missing arguments for inputs #{listed}"
      end

    if is_nil(message) do
      :ok
    else
      {:error,
       ComposeError.exception(
         inner_reactor: reactor,
         arguments: arguments,
         message: message
       )}
    end
  end

  # Reactor with a plan: gathers the vertices of the plan graph together with
  # any entries in `steps`, clears the plan, and rewrites the combined list.
  defp rewrite_steps(reactor, name, inputs) when not is_nil(reactor.plan) do
    planned_steps = Graph.vertices(reactor.plan)
    unplanned = %{reactor | steps: Enum.concat(planned_steps, reactor.steps), plan: nil}

    rewrite_steps(unplanned, name, inputs)
  end

  # Reactor without a plan: rewrites each step in turn via `map_while_ok/2`.
  defp rewrite_steps(reactor, name, inputs),
    do: map_while_ok(reactor.steps, &rewrite_step(&1, name, inputs))

  # Rewrites a single inner step so it can live inside the composing reactor:
  # its arguments are rewritten, its name becomes `{__MODULE__, name, original}`
  # and its implementation is wrapped in `Step.ComposeWrapper`, which receives
  # the original implementation and the `[__MODULE__, name]` prefix.
  #
  # Returns `{:ok, step}`, or whatever non-ok value the argument rewrite gave.
  defp rewrite_step(step, name, inputs) do
    case map_while_ok(step.arguments, &rewrite_argument(&1, name, inputs)) do
      {:ok, rewritten_arguments} ->
        namespaced_name = {__MODULE__, name, step.name}
        wrapped_impl = {Step.ComposeWrapper, original: step.impl, prefix: [__MODULE__, name]}

        {:ok, %{step | arguments: rewritten_arguments, name: namespaced_name, impl: wrapped_impl}}

      other ->
        other
    end
  end

  # Input arguments are replaced with a static value argument holding the
  # value supplied for that input (raises if the input name is not a key of
  # `inputs`).
  defp rewrite_argument(argument, _name, inputs) when is_from_input(argument) do
    supplied_value = Map.fetch!(inputs, argument.source.name)
    rewritten = Argument.from_value(argument.name, supplied_value)

    {:ok, rewritten}
  end

  # Result arguments are re-pointed at the namespaced name of the step they
  # depend on.
  defp rewrite_argument(argument, name, _inputs) when is_from_result(argument) do
    argument_name = argument.name
    namespaced_source = {__MODULE__, name, argument.source.name}

    {:ok, Argument.from_result(argument_name, namespaced_source)}
  end

  # Static value arguments need no rewriting.
  defp rewrite_argument(argument, _name, _inputs) when is_from_value(argument) do
    {:ok, argument}
  end
end