defmodule Runbox.Runtime.Stage.TemplateCarrier do
@moduledoc """
Implements the `producer_consumer` GenStage behaviour; a scenario template is used to handle events.
Every scenario template is represented in stage based runtime by this module. TemplateCarrier uses
`Runbox.Runtime.Stage.UnitRegistry` as its state (packed in `Runbox.StateStore.Entity`
envelope for `StateStore` usage). `UnitRegistry` provides functionality to lookup units via
subscriptions defined in scenario template.
If a unit is located for a given message,
`c:Toolbox.Scenario.Template.StageBased.handle_message/2` is used to handle this message. When
there is no registered unit for the given routing key,
`c:Toolbox.Scenario.Template.StageBased.handle_asset_discovery/1` is used instead.
The state is updated in `Runbox.StateStore.Entity` with
`Runbox.StateStore.Entity.update_state/3` after every message is processed. Additionally,
before a message is processed it is confirmed that all messages with lower timestamps were
processed. This is done with `Runbox.StateStore.Entity.ack_processed_time/3` and can result in
saving the state in state store.
"""
use GenStage
require Logger
alias Runbox.RunStartContext
alias Runbox.Runtime.Stage.SelectorBuilder
alias Runbox.Runtime.Stage.TemplateImpl
alias Toolbox.Message
defmodule State do
  @moduledoc false

  # State carried by the stage process between callbacks. Every field defaults to `nil`;
  # the enclosing module fills all of them when it builds the struct in its `init/1`.
  defstruct run_id: nil,
            scenario_id: nil,
            state_entity: nil,
            template: nil,
            start_from: nil,
            runbox_ctx: nil
end
@doc """
Builds the component name under which the carrier of `template` is known.

Returns the tuple `{:template, template}`.
"""
@spec component_name(term()) :: {:template, term()}
def component_name(template), do: {:template, template}
@doc """
Starts the stage as a process linked to the caller.

The three arguments are bundled into the tuple `{args, runbox_ctx, start_ctx}`, which is handed
to `init/1` as its single argument. The result of `GenStage.start_link/2` is returned unchanged.
"""
def start_link(args, runbox_ctx, start_ctx) do
  init_arg = {args, runbox_ctx, start_ctx}
  GenStage.start_link(__MODULE__, init_arg)
end
# GenStage callback that sets the process up as a `:producer_consumer` stage.
#
# `args` must be a map with `:run_id`, `:state_entity` and a `:config` map holding `:template`,
# `:subscribe_to`, `:scenario_id`, `:start_from` and `:start_or_continue`; any other shape
# fails the match below. Every entry of `:subscribe_to` becomes one upstream subscription, and
# events are dispatched downstream through `GenStage.BroadcastDispatcher`.
@impl true
def init({args, runbox_ctx, start_ctx}) do
  %{
    run_id: run_id,
    state_entity: state_entity,
    config: %{
      template: template,
      subscribe_to: sub_defs,
      scenario_id: scenario_id,
      start_from: start_from,
      start_or_continue: start_or_continue
    }
  } = args

  subscribe_to = Enum.map(sub_defs, &build_subscription(&1, start_ctx))

  # Only a fresh start schedules the `:bootstrap` message for this process; for any other
  # value of `start_or_continue` nothing is sent.
  case start_or_continue do
    :start -> send(self(), :bootstrap)
    _ -> :ok
  end

  Logger.metadata(run_id: run_id, scenario_id: scenario_id)

  initial_state = %State{
    run_id: run_id,
    scenario_id: scenario_id,
    state_entity: state_entity,
    template: template,
    start_from: start_from,
    runbox_ctx: runbox_ctx
  }

  {:producer_consumer, initial_state,
   subscribe_to: subscribe_to, dispatcher: GenStage.BroadcastDispatcher}
end

# Converts one `{component, subscription_properties}` definition into a `subscribe_to` entry:
# the value returned by `RunStartContext.component_pid/2` paired with the subscription options.
# The options are built first, the component lookup happens second.
defp build_subscription({component, subscription_properties}, start_ctx) do
  opts = subscription_opts(subscription_properties)
  {RunStartContext.component_pid(start_ctx, component), opts}
end

# A `:selector` of `:none_after_timezip` means the subscription gets no options at all;
# otherwise a selector is built from the whole property list.
defp subscription_opts(subscription_properties) do
  if Keyword.get(subscription_properties, :selector) == :none_after_timezip do
    []
  else
    [selector: SelectorBuilder.build(subscription_properties)]
  end
end
# Handles the `:bootstrap` message that `init/1` sends to the process itself on a fresh start.
#
# Delegates to `TemplateImpl.init_state/5`, emits the outputs it returns as events and keeps
# the returned entity as the new `state_entity`.
@impl true
def handle_info(:bootstrap, %State{} = state) do
  %State{
    template: template,
    state_entity: current_entity,
    start_from: start_from,
    scenario_id: scenario_id,
    run_id: run_id
  } = state

  {outputs, initialized_entity} =
    TemplateImpl.init_state(template, current_entity, start_from, scenario_id, run_id)

  {:noreply, outputs, %State{state | state_entity: initialized_entity}}
end
# Processes a batch of incoming messages in order.
#
# Each message goes through `TemplateImpl.handle_message/6` together with the entity returned
# for the previous message, so the entity is threaded through the whole batch. The outputs of
# all messages are emitted in processing order, followed by one "tick" message when the batch
# was not empty. The entity left after the last message becomes the new `state_entity`.
@impl true
def handle_events(msgs, _from, %State{} = state) do
  {outputs_per_msg, state_entity} =
    Enum.map_reduce(msgs, state.state_entity, fn in_msg, entity ->
      {produced, updated_entity} =
        TemplateImpl.handle_message(
          in_msg,
          state.template,
          entity,
          state.scenario_id,
          state.run_id,
          state.runbox_ctx
        )

      {produced, updated_entity}
    end)

  # A tick is appended to the end of the output to prevent a TimeZip deadlock. The deadlock can
  # occur when a TemplateCarrier placed directly before TimeZip discards all messages: if the
  # TemplateCarrier receives no ticks or OAs, the output of the GenStage is completely empty,
  # which can block the TimeZip.
  # Originally this was addressed by adding a tick to every bulk in InputStream, but that
  # turned out to be insufficient, because the frequency of those ticks can be reduced to an
  # arbitrary level inside the scenario (common with a Splitter pattern, where many output
  # messages are created from a single input message).
  trailing_tick =
    case List.last(msgs) do
      # Empty batch (or a falsy last element): no tick is emitted.
      last_msg when last_msg in [nil, false] -> []
      last_msg -> [%Message{type: "tick", timestamp: last_msg.timestamp}]
    end

  # The per-message outputs are nested one level deep, so flatten once at the very end.
  out_msgs = List.flatten([outputs_per_msg, trailing_tick])

  {:noreply, out_msgs, %State{state | state_entity: state_entity}}
end
end