lib/runbox/runtime/stage/sandbox.ex

defmodule Runbox.Runtime.Stage.Sandbox do
  @moduledoc """
  Sandbox is a helper for executing runs without a started Altworx application.

  Sandbox can be used for scenarios unit testing and also for interactive
  development of scenarios. When used for unit testing, it can be used
  directly or via `Runbox.Runtime.Stage.Sandbox.TestRunner`.

  For interactive development of scenarios, run `iex --erl "-runbox mode slave" -S mix`
  from the command line in your scenarios application root directory and then use Sandbox
  functions to execute your runs (remember to `recompile` before executing a run if you
  changed the scenario code).
  """

  alias Mix.Project
  alias Runbox.RunNode
  alias Runbox.RunStartContext
  alias Runbox.Runtime.Stage.ComponentNetwork
  alias Runbox.Runtime.Stage.Sandbox.InputStream
  alias Runbox.Runtime.Stage.Sandbox.OutputStream
  alias Runbox.Runtime.Stage.Sandbox.StartRecipe
  alias Runbox.Runtime.Stage.StateGenerator
  alias Runbox.Runtime.Stage.Timezip
  alias Runbox.Scenario.Template
  alias Runbox.ScenarioRelease.SlaveFunc
  alias Runbox.ScenarioTemplate
  alias Runbox.StateStore.Entity
  alias Toolbox.Message
  alias Toolbox.Scenario.OutputAction

  # Value of `start_from` used when `execute_run/3` is called without the `:start_from` option.
  @default_start_from 0
  # State of one sandboxed run:
  #   * `run_id`       - identifier of the run, generated with `UUID.uuid1/0` in `new/4`
  #   * `scenario_id`  - id of the scenario being executed
  #   * `sup`          - run supervisor returned by `RunNode.start_run_sup/1` (set in `start/1`)
  #   * `topics`       - input messages supplied by the caller, keyed by topic name
  #   * `start_recipe` - components to start, created by `StartRecipe.create/1`
  #   * `start_ctx`    - `RunStartContext` that collects the started components
  #   * `start_from`   - timestamp the run starts from
  defstruct [
    :run_id,
    :scenario_id,
    :sup,
    :topics,
    :start_recipe,
    start_ctx: %RunStartContext{},
    start_from: @default_start_from
  ]

  # Input messages of a run: a map from topic name to the list of messages of that topic.
  @type topics :: %{topic_name => [Message.t()]}
  # Name of a runtime topic.
  @type topic_name :: String.t()

  @doc """
  Starts a new run for `scenario_id` and waits until all input messages are processed.

  Returns the output actions generated by the run execution. Input messages for each runtime
  topic used by the scenario should be defined in the `topics` argument. Undefined topics are
  considered empty. Messages in each topic must be in ascending order by `timestamp`.

  If no scenario with the given `scenario_id` is found, `{:error, :not_found}` is returned.
  If the scenario defines an `:on_start` callback (`{module, function, args}`), it is invoked
  before the run is started and must return `:ok`; any other result is returned as is and the
  run is not started.

  ## Options

  - `:start_from` - specify the start from value for the run (defaults to `0`). The value is
    used to filter out messages (as in a real run, input topics only use messages with
    `timestamp >= start_from`) and the value is provided to template's `init` callback.
  - `:modules` - list of modules containing scenarios (manifests, templates). If a list of
    modules is specified, `execute_run` does not search for the scenario in the `:scenarios`
    application release, but in the given modules. It opens the possibility to write ad-hoc
    scenarios with only a dependency on the Runbox application, for example to use it in
    [Livebook](https://livebook.dev/). To see an example of such a livebook just run
    `livebook server --home .` in the runbox root directory, go to the generated livebook
    server link in a browser and finally open the `sandbox-scenario-demo.livemd` document.
    Running docker or altworx is not needed, only the runbox app (and its deps) is required!
  """
  # NOTE(review): the doc says all output actions are returned, so the success type is
  # presumably `{:ok, [OutputAction.t()]}` - confirm against `OutputStream.wait_end_of_run/2`.
  @spec execute_run(topics, String.t(), keyword) :: {:ok, OutputAction.t()} | {:error, term}
  def execute_run(topics, scenario_id, opts \\ []) do
    # Without an explicit `:modules` option the modules of the current Mix application are used
    # (which may be `nil` when Mix is not available).
    modules = Keyword.get(opts, :modules) || modules_from_mix_app()
    start_from = Keyword.get(opts, :start_from, @default_start_from)

    with {:ok, scenario} <- find_scenario(scenario_id, modules),
         {:ok, component_network} <- create_component_network(scenario.templates),
         start_recipe = StartRecipe.create(component_network),
         sandbox = new(topics, scenario_id, start_recipe, start_from),
         sandbox = generate_components_state(sandbox),
         sandbox = append_end_of_topic_msgs(sandbox),
         :ok <- maybe_execute_on_start(scenario),
         {:ok, sandbox} <- start(sandbox) do
      # From here on the run supervisor is running, so it has to be stopped whatever happens
      # while the components are started and the run is executed.
      try do
        with {:ok, sandbox} <- start_components(sandbox),
             :ok <- start_demand(sandbox) do
          OutputStream.wait_end_of_run(
            sandbox.start_ctx.components_pids[:output_sink],
            get_end_of_topic_msgs(sandbox)
          )
        end
      after
        :ok = stop(sandbox)
      end
    end
  end

  # Builds the initial sandbox struct of a run. A fresh `run_id` is generated with
  # `UUID.uuid1/0`; `sup` and `start_ctx` keep their struct defaults.
  defp new(topics, scenario_id, start_recipe, start_from) do
    run_id = UUID.uuid1()

    %__MODULE__{
      start_from: start_from,
      start_recipe: start_recipe,
      scenario_id: scenario_id,
      topics: topics,
      run_id: run_id
    }
  end

  # Returns the modules of the current Mix application (`Application.spec(app, :modules)`).
  # Returns `nil` when `Mix.Project.config/0` is not available or the project config has no
  # `:app` entry.
  defp modules_from_mix_app do
    mix_available? = Code.ensure_loaded?(Project) and function_exported?(Project, :config, 0)

    with true <- mix_available?,
         {:ok, app} <- Keyword.fetch(Project.config(), :app) do
      Application.spec(app, :modules)
    else
      _ -> nil
    end
  end

  # Looks up the scenario whose manifest id equals `scenario_id` in the release info built
  # from `modules`. Returns `{:ok, scenario}` or `{:error, :not_found}`.
  defp find_scenario(scenario_id, modules) do
    modules
    |> SlaveFunc.release_info()
    |> Enum.find_value({:error, :not_found}, fn scenario ->
      if scenario.manifest.id == scenario_id, do: {:ok, scenario}
    end)
  end

  # Converts the scenario templates to `Template` structs and creates the component network
  # from them, with direct ticking disabled. Returns whatever `ComponentNetwork.create/2`
  # returns.
  defp create_component_network(templates) do
    component_templates =
      Enum.map(templates, fn %ScenarioTemplate{module: mod, info: info} ->
        %Template{mod: mod, info: info}
      end)

    ComponentNetwork.create(component_templates, direct_ticking: false)
  end

  # Prepares every component of the start recipe for this run (see
  # `generate_component_state/2`) and stores the prepared recipe back in the sandbox.
  defp generate_components_state(sandbox) do
    prepared =
      Enum.map(sandbox.start_recipe, fn component ->
        generate_component_state(component, sandbox)
      end)

    %{sandbox | start_recipe: prepared}
  end

  # Fills run specific data into the start arguments of a single component.
  #
  # Template components get the run id, a freshly generated state entity and a config
  # extended with the scenario id, `start_from` and `start_or_continue: :start`.
  defp generate_component_state(%{id: {:template, _template}} = component, sandbox) do
    %{run_id: run_id, scenario_id: scenario_id, start_from: start_from} = sandbox

    {id, state} = StateGenerator.generate_for_component(component.id, run_id)
    entity = Entity.new(run_id, id, :none, start_from, state)

    config =
      Map.merge(component.args.config, %{
        scenario_id: scenario_id,
        start_from: start_from,
        start_or_continue: :start
      })

    args = Map.merge(component.args, %{run_id: run_id, config: config, state_entity: entity})

    %{component | args: args}
  end

  # Timezip components only need the run id and the scenario id in their config.
  defp generate_component_state(%{id: {:timezip, _}} = component, sandbox) do
    config = Map.merge(component.args.config, %{scenario_id: sandbox.scenario_id})
    args = Map.merge(component.args, %{run_id: sandbox.run_id, config: config})

    %{component | args: args}
  end

  # Topic components get the messages the caller supplied for the topic (none when the topic
  # is not defined), filtered by `filter_messages/3`.
  defp generate_component_state(%{id: {input_topic_type, topic}} = component, sandbox)
       when input_topic_type in [:input_topic, :load_topic] do
    topic_messages = sandbox.topics[topic] || []
    messages = filter_messages(topic_messages, input_topic_type, sandbox.start_from)

    put_in(component[:args][:messages], messages)
  end

  # Any other component is left untouched.
  defp generate_component_state(component, _sandbox), do: component

  # Appends an end-of-topic marker message to the messages of every input/load topic
  # component. All markers share one timestamp: the highest last-message timestamp of the
  # supplied topics plus one.
  defp append_end_of_topic_msgs(sandbox) do
    end_timestamp = max_timestamp(sandbox.topics) + 1

    recipe =
      for component <- sandbox.start_recipe do
        case component do
          %{id: {kind, topic}} when kind in [:input_topic, :load_topic] ->
            update_in(component, [:args, :messages], fn messages ->
              messages ++ [end_of_topic_msg(topic, end_timestamp)]
            end)

          other ->
            other
        end
      end

    %{sandbox | start_recipe: recipe}
  end

  # Returns the highest timestamp among the last messages of all non-empty topics, or
  # `default_max` (current OS time in milliseconds unless given) when every topic is empty.
  # Only the last message of each topic is inspected.
  defp max_timestamp(topics, default_max \\ System.os_time(:millisecond)) do
    last_timestamps =
      for messages <- Map.values(topics), not Enum.empty?(messages) do
        Enum.at(messages, -1).timestamp
      end

    case last_timestamps do
      [] -> default_max
      timestamps -> Enum.max(timestamps)
    end
  end

  # Builds the marker message that signals the end of `topic`; its body is
  # `{:end_of_topic, topic}`.
  defp end_of_topic_msg(topic, timestamp) do
    body = {:end_of_topic, topic}
    Message.new(:sandbox, "tick", timestamp, body)
  end

  # Collects the end-of-topic marker messages of the start recipe: for every input/load topic
  # component the last message is taken if it is an `{:end_of_topic, _}` message.
  defp get_end_of_topic_msgs(sandbox) do
    for %{id: {kind, _topic}} = component <- sandbox.start_recipe,
        kind in [:input_topic, :load_topic],
        %Message{body: {:end_of_topic, _}} = marker <- [Enum.at(component.args[:messages], -1)] do
      marker
    end
  end

  # Invokes the scenario's `:on_start` callback when one is configured and returns its
  # result; returns `:ok` when the scenario has no such callback.
  defp maybe_execute_on_start(scenario) do
    on_start = scenario.opts[:on_start]

    case on_start do
      nil -> :ok
      {mod, fun, args} -> apply(mod, fun, args)
    end
  end

  # Starts the run supervisor for the sandbox run, links it to the calling process and stores
  # it in the sandbox. Any other result of `RunNode.start_run_sup/1` is returned unchanged.
  defp start(%__MODULE__{run_id: run_id} = sandbox) do
    case RunNode.start_run_sup(run_id) do
      {:ok, sup} ->
        Process.link(sup)
        {:ok, %__MODULE__{sandbox | sup: sup}}

      other ->
        other
    end
  end

  # Starts the components of the start recipe one by one, registering each started component
  # in the start context. Stops at the first `{:error, reason}` and returns it; otherwise
  # returns `{:ok, sandbox}` with the updated start context.
  defp start_components(sandbox) do
    outcome =
      Enum.reduce_while(sandbox.start_recipe, sandbox.start_ctx, fn component, ctx ->
        case start_comp(component, sandbox.sup, ctx) do
          {:error, _reason} = failure ->
            {:halt, failure}

          {:ok, {pid, name}} ->
            {:cont, RunStartContext.add_component(ctx, name, pid)}
        end
      end)

    case outcome do
      {:error, _reason} = failure -> failure
      %RunStartContext{} = ctx -> {:ok, %__MODULE__{sandbox | start_ctx: ctx}}
    end
  end

  # Starts a single component and pairs its pid with the component name from
  # `StartRecipe.component_name/1`. Anything other than `{:ok, pid}` is passed through.
  defp start_comp(component, sup, start_ctx) do
    case start_comp_(component, sup, start_ctx) do
      {:ok, pid} -> {:ok, {pid, StartRecipe.component_name(component)}}
      failure -> failure
    end
  end

  # Sandbox input/output streams are started directly under the run supervisor as transient
  # workers; the start context is passed as the last start argument.
  defp start_comp_(%{mod: stream_mod} = comp, sup, start_ctx)
       when stream_mod == InputStream or stream_mod == OutputStream do
    %{id: id, mod: mod, fun: fun, args: args} = comp
    start_mfa = {mod, fun, [args, start_ctx]}

    Supervisor.start_child(sup, %{id: id, start: start_mfa, restart: :transient, type: :worker})
  end

  # Templates and timezip components are started through `RunNode.start_run_component/3`.
  defp start_comp_(%{id: {type, _}, mod: mod} = comp, sup, start_ctx)
       when mod == Timezip or type == :template do
    RunNode.start_run_component(comp, %Runbox.RunContext{sup_pid: sup}, start_ctx)
  end

  # Switches every started input stream to `:forward` demand mode; other components are
  # skipped. Always returns `:ok`.
  defp start_demand(sandbox) do
    for {{:input_stream, _topic}, pid} <- sandbox.start_ctx.components_pids do
      GenStage.demand(pid, :forward)
    end

    :ok
  end

  # Stops the run supervisor of the sandbox.
  defp stop(%__MODULE__{sup: sup}), do: Supervisor.stop(sup)

  # Input topics drop the leading messages older than `start_from`. Only the leading run is
  # dropped, so the messages are expected to be ordered by timestamp.
  defp filter_messages(messages, :input_topic, start_from) do
    Enum.drop_while(messages, fn message -> message.timestamp < start_from end)
  end

  # Load topics always keep all of their messages.
  defp filter_messages(messages, :load_topic, _start_from), do: messages
end