defmodule Runbox.Runtime.Simple.StateLoader do
@moduledoc """
Handles run state load or initialization for simple scenario runtime.
"""
alias Runbox.Runtime.Simple.TemplateCarrier
alias Runbox.Runtime.Simple.Timezip
alias Runbox.Scenario.StartParams
alias Runbox.StateStore
require Logger
@behaviour Runbox.Runtime.StateLoader
@template_id TemplateCarrier.component_name()
@output_id :output_stream
@timezip_id Timezip.component_name()
@impl true
def load_existing_state(%StartParams{} = start_params, instance_context, start_recipe) do
with {:ok, ts, entities} <- StateStore.load_run_state(instance_context.state_store_pid) do
{:ok, assign_states(start_recipe, start_params, ts, entities, :continue, instance_context)}
end
end
@impl true
def init_new_state(%StartParams{} = start_params, instance_context, start_recipe) do
# Template Carrier initializes its state on its own
entity_defs = [{@template_id, nil}, {@output_id, nil}]
{:ok, entities} =
StateStore.init_run_state(
instance_context.state_store_pid,
start_params.start_from,
entity_defs
)
{:ok,
assign_states(
start_recipe,
start_params,
start_params.start_from,
entities,
:start,
instance_context
)}
end
defp assign_states(
start_recipe,
%StartParams{} = start_params,
ts,
entities,
start_type,
instance_context
) do
entities = Map.new(entities, &{&1.id, &1})
# Moving state loaders to Runbox introduced this shadow dependency of knowing scenario
# components which are defined in Altworx.
# This applies to input_stream, tick_timezip components.
Enum.map(
start_recipe,
&assign_state(&1, start_params, ts, entities, start_type, instance_context)
)
end
defp assign_state(
%{id: @template_id, args: args} = def,
%StartParams{} = start_params,
ts,
entities,
type,
_
) do
entity = Map.get(entities, @template_id)
config =
args.config
|> Map.put(:scenario_id, start_params.scenario_id)
|> Map.put(:start_from, ts)
|> Map.put(:start_or_continue, type)
args =
args
|> Map.put(:run_id, start_params.run_id)
|> Map.put(:config, config)
|> Map.put(:state_entity, entity)
%{def | args: args}
end
defp assign_state(
%{id: {:input_stream, _}, args: args} = def,
%StartParams{} = start_params,
ts,
_,
type,
ctx
) do
topic = Map.get(start_params.input_topics, args.config.topic)
# where should input stream start reading data
ts =
cond do
type == :continue -> ts + 1
args.config.type == :input_topic -> ts
args.config.type == :load_topic -> :earliest
end
config =
args.config
|> Map.put(:topic, topic)
|> Map.put(:start_from, ts)
|> Map.put(:brod_client, ctx.brod_client)
args =
args
|> Map.put(:run_id, start_params.run_id)
|> Map.put(:config, config)
%{def | args: args}
end
defp assign_state(
%{id: @output_id, args: args} = def,
%StartParams{} = start_params,
ts,
entities,
_,
_
) do
entity = Map.get(entities, @output_id)
config =
args.config
|> Map.put(:side_effects?, start_params.side_effects?)
|> Map.put(:output_actions_from, start_params.output_actions_from)
|> Map.put(:notifications_from, start_params.notifications_from)
|> Map.put(:start_from, ts)
|> Map.put(:scenario_id, start_params.scenario_id)
args =
args
|> Map.put(:run_id, start_params.run_id)
|> Map.put(:config, config)
|> Map.put(:state_entity, entity)
%{def | args: args}
end
defp assign_state(%{id: @timezip_id, args: args} = def, %StartParams{} = start_params, _, _, _, _) do
config = Map.put(args.config, :scenario_id, start_params.scenario_id)
args =
args
|> Map.put(:run_id, start_params.run_id)
|> Map.put(:config, config)
%{def | args: args}
end
defp assign_state(
%{id: {:tick_timezip, logical_topic}, args: args} = def,
%StartParams{} = start_params,
start_from,
_,
_,
_
) do
{time, res} =
:timer.tc(fn ->
topic_map = Map.get(start_params.input_topics, logical_topic)
physical_topic = topic_map.physical_topic
config =
args.config
|> Map.put(:scenario_id, start_params.scenario_id)
|> Map.put(:topic, physical_topic)
|> Map.put(:start_from, start_from)
args =
args
|> Map.put(:run_id, start_params.run_id)
|> Map.put(:config, config)
%{def | args: args}
end)
Logger.info(
"Run #{start_params.run_id} fetch tick_timezip #{logical_topic} state took #{div(time, 1000)}ms."
)
res
end
end