defmodule Runbox.Runtime.Stage.Sandbox do
@moduledoc """
Sandbox is a helper for executing runs without a started Altworx application.
Sandbox can be used for unit testing of scenarios and also for interactive
development of scenarios. When used for unit testing, it can be used
directly or via `Runbox.Runtime.Stage.Sandbox.TestRunner`.
For interactive development of scenarios, run `iex --erl "-runbox mode slave" -S mix`
from the command line in your scenarios application root directory and then use Sandbox
functions to execute your runs (of course, use `recompile` before run execution if you
changed the scenario code).
"""
alias Mix.Project
alias Runbox.RunNode
alias Runbox.RunStartContext
alias Runbox.Runtime.Stage.ComponentNetwork
alias Runbox.Runtime.Stage.Sandbox.InputStream
alias Runbox.Runtime.Stage.Sandbox.OutputStream
alias Runbox.Runtime.Stage.Sandbox.StartRecipe
alias Runbox.Runtime.Stage.StateGenerator
alias Runbox.Runtime.Stage.Timezip
alias Runbox.Scenario.Template
alias Runbox.ScenarioRelease.SlaveFunc
alias Runbox.ScenarioTemplate
alias Runbox.StateStore.Entity
alias Toolbox.Message
alias Toolbox.Scenario.OutputAction
# Value used for `start_from` when `execute_run/3` is called without the `:start_from` option.
@default_start_from 0
# State of a single sandbox run, threaded through the private helpers of this module.
defstruct [
# Identifier of the run; generated with `UUID.uuid1/0` in `new/4`.
:run_id,
# Identifier of the scenario being executed, as passed to `execute_run/3`.
:scenario_id,
# Pid of the run supervisor returned by `RunNode.start_run_sup/1`; set in `start/1`.
:sup,
# Input messages keyed by topic name, as passed to `execute_run/3`.
:topics,
# Enumerable of component descriptions built by `StartRecipe.create/1`.
:start_recipe,
# Start context that accumulates the started components in `start_components/1`.
start_ctx: %RunStartContext{},
# Lower timestamp bound; `:input_topic` messages with a smaller timestamp are dropped.
start_from: @default_start_from
]
# Map from topic name to the list of messages to be fed into that topic.
@type topics :: %{topic_name => [Message.t()]}
# Name of a runtime topic.
@type topic_name :: String.t()
@doc """
Starts a new run for `scenario_id` and waits until all input messages are processed.

Returns all output actions generated by the run execution. Input messages for each runtime
topic used by the scenario should be defined in the `topics` argument. Undefined topics are
considered empty. Messages in each topic must be in ascending order by `timestamp`.

## Options

  * `:start_from` - specifies the start from value for the run. The value is used to filter
    out messages (as in a real run, input topics only use messages with
    `timestamp >= start_from`) and the value is provided to the template's `init` callback.
    Defaults to `#{@default_start_from}`.

  * `:modules` - list of modules containing scenarios (manifests, templates). If a list of
    modules is specified, `execute_run` does not search for the scenario in the `:scenarios`
    application release, but in the given modules. This opens the possibility to write ad-hoc
    scenarios with the only dependency being the Runbox application, for example to use it in
    [Livebook](https://livebook.dev/). To see an example of such a livebook just run
    `livebook server --home .` in the runbox root directory, go to the generated livebook
    server link in a browser and finally open the `sandbox-scenario-demo.livemd` document.
    Running docker or altworx is not needed, only the runbox app (and its deps) is required!

## Return value

On success the result of `OutputStream.wait_end_of_run/2` is returned. If no scenario with
the given `scenario_id` is found, `{:error, :not_found}` is returned. Any other step that
does not succeed (building the component network, the scenario `:on_start` hook returning
something other than `:ok`, starting the run supervisor or its components) has its result
returned unchanged.

The run supervisor, once started, is always stopped before this function returns.
"""
# The third argument is a keyword list of options, not a bare list of modules.
# NOTE(review): the docs speak about output actions in plural, so the success value is
# presumably a list of actions - confirm against `OutputStream.wait_end_of_run/2`.
@spec execute_run(topics, String.t(), keyword) :: {:ok, OutputAction.t()} | {:error, term}
def execute_run(topics, scenario_id, opts \\ []) do
  # An explicit `modules: nil` behaves the same as the option not being given at all.
  modules = Keyword.get(opts, :modules) || modules_from_mix_app()
  start_from = Keyword.get(opts, :start_from, @default_start_from)

  with {:ok, scenario} <- find_scenario(scenario_id, modules),
       {:ok, component_network} <- create_component_network(scenario.templates),
       start_recipe = StartRecipe.create(component_network),
       sandbox = new(topics, scenario_id, start_recipe, start_from),
       sandbox = generate_components_state(sandbox),
       sandbox = append_end_of_topic_msgs(sandbox),
       :ok <- maybe_execute_on_start(scenario),
       {:ok, sandbox} <- start(sandbox) do
    # From here on the run supervisor is alive, so it has to be stopped no matter how
    # the component start-up or the run itself ends.
    try do
      with {:ok, sandbox} <- start_components(sandbox),
           :ok <- start_demand(sandbox) do
        OutputStream.wait_end_of_run(
          sandbox.start_ctx.components_pids[:output_sink],
          get_end_of_topic_msgs(sandbox)
        )
      end
    after
      :ok = stop(sandbox)
    end
  end
end
# Builds the initial sandbox struct for a run; a fresh run id is generated on every call.
defp new(topics, scenario_id, start_recipe, start_from) do
  struct!(__MODULE__,
    run_id: UUID.uuid1(),
    topics: topics,
    scenario_id: scenario_id,
    start_recipe: start_recipe,
    start_from: start_from
  )
end
# Returns the modules of the current Mix project's application.
# Yields `nil` when Mix is not available or the project config has no `:app` entry.
defp modules_from_mix_app do
  mix_available? = Code.ensure_loaded?(Project) and function_exported?(Project, :config, 0)

  with true <- mix_available?,
       {:ok, app} <- Keyword.fetch(Project.config(), :app) do
    Application.spec(app, :modules)
  else
    _ -> nil
  end
end
# Looks up the scenario whose manifest id equals `scenario_id` in the release info
# built from `modules`. Returns `{:ok, scenario}` or `{:error, :not_found}`.
defp find_scenario(scenario_id, modules) do
  found =
    modules
    |> SlaveFunc.release_info()
    |> Enum.find(fn scenario -> scenario.manifest.id == scenario_id end)

  case found do
    nil -> {:error, :not_found}
    scenario -> {:ok, scenario}
  end
end
# Converts scenario templates to `Template` structs and builds the component network
# from them with direct ticking turned off.
defp create_component_network(templates) do
  network_templates =
    Enum.map(templates, fn %ScenarioTemplate{module: mod, info: info} ->
      %Template{mod: mod, info: info}
    end)

  ComponentNetwork.create(network_templates, direct_ticking: false)
end
# Runs every component of the start recipe through `generate_component_state/2`.
defp generate_components_state(sandbox) do
  prepared_recipe =
    Enum.map(sandbox.start_recipe, fn component ->
      generate_component_state(component, sandbox)
    end)

  %{sandbox | start_recipe: prepared_recipe}
end
# Template components get run-specific config, the run id and a freshly generated
# state entity.
defp generate_component_state(%{id: {:template, _}} = component, sandbox) do
  %{run_id: run_id, start_from: start_from} = sandbox
  {state_id, initial_state} = StateGenerator.generate_for_component(component.id, run_id)
  state_entity = Entity.new(run_id, state_id, :none, start_from, initial_state)

  template_config =
    Map.merge(component.args.config, %{
      scenario_id: sandbox.scenario_id,
      start_from: start_from,
      start_or_continue: :start
    })

  template_args =
    Map.merge(component.args, %{
      run_id: run_id,
      config: template_config,
      state_entity: state_entity
    })

  %{component | args: template_args}
end

# Timezip components only need the scenario id in their config and the run id.
defp generate_component_state(%{id: {:timezip, _}} = component, sandbox) do
  timezip_config = Map.put(component.args.config, :scenario_id, sandbox.scenario_id)
  timezip_args = Map.merge(component.args, %{run_id: sandbox.run_id, config: timezip_config})

  %{component | args: timezip_args}
end

# Input and load topics are given the messages defined for them in the sandbox;
# topics without any definition are treated as empty.
defp generate_component_state(%{id: {kind, topic}} = component, sandbox)
     when kind == :input_topic or kind == :load_topic do
  topic_messages = sandbox.topics[topic] || []
  usable_messages = filter_messages(topic_messages, kind, sandbox.start_from)

  put_in(component, [:args, :messages], usable_messages)
end

# Any other component is passed through untouched.
defp generate_component_state(component, _sandbox), do: component
# Appends an end-of-topic marker message to the messages of every input/load topic
# component. The marker timestamp is one greater than `max_timestamp/1` of the topics.
defp append_end_of_topic_msgs(sandbox) do
  end_timestamp = max_timestamp(sandbox.topics) + 1

  terminated_recipe =
    Enum.map(sandbox.start_recipe, fn
      %{id: {kind, topic}} = component when kind in [:input_topic, :load_topic] ->
        update_in(component, [:args, :messages], fn messages ->
          messages ++ [end_of_topic_msg(topic, end_timestamp)]
        end)

      component ->
        component
    end)

  %{sandbox | start_recipe: terminated_recipe}
end
# Returns the greatest timestamp among the last messages of all non-empty topics.
# When every topic is empty (or there are no topics), `default_max` is returned instead.
defp max_timestamp(topics, default_max \\ System.os_time(:millisecond)) do
  last_timestamps =
    for messages <- Map.values(topics), not Enum.empty?(messages) do
      Enum.at(messages, -1).timestamp
    end

  Enum.max(last_timestamps, fn -> default_max end)
end
# Builds the marker message that is appended as the last message of `topic`.
defp end_of_topic_msg(topic, timestamp),
  do: Message.new(:sandbox, "tick", timestamp, {:end_of_topic, topic})
# Collects the end-of-topic marker of every input/load topic component, i.e. the last
# message of the component when that message carries an `{:end_of_topic, _}` body.
defp get_end_of_topic_msgs(sandbox) do
  for %{id: {kind, _topic}} = component <- sandbox.start_recipe,
      kind in [:input_topic, :load_topic],
      %Message{body: {:end_of_topic, _}} = marker <- [Enum.at(component.args[:messages], -1)] do
    marker
  end
end
# Invokes the scenario's `:on_start` hook (an `{module, function, args}` tuple) when one
# is configured and returns its result; returns `:ok` when there is no hook.
defp maybe_execute_on_start(scenario) do
  on_start_hook = scenario.opts[:on_start]

  case on_start_hook do
    nil -> :ok
    {mod, fun, args} -> apply(mod, fun, args)
  end
end
# Starts the run supervisor, links it to the calling process and stores its pid in the
# sandbox. Anything other than `{:ok, sup}` from the supervisor start is returned as is.
defp start(%__MODULE__{run_id: run_id} = sandbox) do
  case RunNode.start_run_sup(run_id) do
    {:ok, sup} ->
      Process.link(sup)
      {:ok, %__MODULE__{sandbox | sup: sup}}

    other ->
      other
  end
end
# Starts the components of the start recipe one by one, registering each started
# component in the start context. Stops at the first `{:error, reason}` and returns it.
defp start_components(sandbox) do
  outcome =
    Enum.reduce_while(sandbox.start_recipe, sandbox.start_ctx, fn component, ctx ->
      case start_comp(component, sandbox.sup, ctx) do
        {:ok, {pid, name}} -> {:cont, RunStartContext.add_component(ctx, name, pid)}
        {:error, _reason} = failure -> {:halt, failure}
      end
    end)

  case outcome do
    %RunStartContext{} = final_ctx -> {:ok, %__MODULE__{sandbox | start_ctx: final_ctx}}
    {:error, _reason} = failure -> failure
  end
end
# Starts a single component and pairs the resulting pid with the component's name.
# A result other than `{:ok, pid}` is returned unchanged.
defp start_comp(component, sup, start_ctx) do
  case start_comp_(component, sup, start_ctx) do
    {:ok, pid} -> {:ok, {pid, StartRecipe.component_name(component)}}
    other -> other
  end
end
# Sandbox input/output streams are started directly as transient workers under the
# run supervisor.
defp start_comp_(%{mod: stream_mod} = comp, sup, start_ctx)
     when stream_mod == InputStream or stream_mod == OutputStream do
  %{id: id, mod: mod, fun: fun, args: args} = comp

  Supervisor.start_child(sup, %{
    id: id,
    type: :worker,
    start: {mod, fun, [args, start_ctx]},
    restart: :transient
  })
end

# Templates and timezip components are started through `RunNode.start_run_component/3`.
defp start_comp_(%{id: {type, _}, mod: mod} = comp, sup, start_ctx)
     when type == :template or mod == Timezip do
  RunNode.start_run_component(comp, %Runbox.RunContext{sup_pid: sup}, start_ctx)
end
# Switches every started input stream to `:forward` demand mode. Always returns `:ok`.
defp start_demand(sandbox) do
  for {{:input_stream, _topic}, pid} <- sandbox.start_ctx.components_pids do
    GenStage.demand(pid, :forward)
  end

  :ok
end
# Stops the run supervisor stored in the sandbox.
defp stop(%__MODULE__{sup: sup}), do: Supervisor.stop(sup)
# Load topics keep all their messages regardless of `start_from`.
defp filter_messages(messages, :load_topic, _start_from), do: messages

# Input topics skip the leading run of messages older than `start_from`.
defp filter_messages(messages, :input_topic, start_from) do
  {_too_old, remaining} =
    Enum.split_while(messages, fn message -> message.timestamp < start_from end)

  remaining
end
end