defmodule Runbox.Runtime.Stage.TemplateImpl do
@moduledoc """
This module encapsulates calls to scenario template callbacks.
A scenario defines `Toolbox.Runtime.Stage.Unit` as its state carrier. These units are organized
into `Runbox.Runtime.Stage.UnitRegistry`, a helper structure used to route messages between
the units managed by one template.
Messages are handled via `Runbox.Scenario.Template.StageBased` callbacks. We distinguish two
types of messages: a message to an existing unit and a message to a new unit (asset discovery).
When no registered unit is found to handle a message, we call this situation asset discovery
and use the `c:Runbox.Scenario.Template.StageBased.handle_asset_discovery/1` callback to handle
the message. A new unit may or may not be created here, but when it is, it needs to be
initialized via the `c:Runbox.Scenario.Template.StageBased.init/2` callback.
When there is at least one unit which is registered to the message, we use callback
`c:Runbox.Scenario.Template.StageBased.handle_message/2`. Unit may be updated or stopped here.
Timeouts can be defined. They are triggered by `external` time, which is derived from
incoming message timestamps. Each timeout is registered for a single unit.
Some output actions produced by the template callbacks need to be further updated: events
require `run id` to be filled in and notifications require `scenario id` to be filled in.
"""
alias Runbox.Runtime.OutputAction
alias Runbox.Runtime.RuntimeInstruction
alias Runbox.Runtime.RuntimeInstruction.Timeout, as: TimeoutRuntimeInstruction
alias Runbox.Runtime.Stage.UnitRegistry
alias Runbox.Scenario.Template.StageBased, as: Template
alias Runbox.StateStore.Entity
alias Toolbox.Message
alias Toolbox.Runtime.Stage.Unit
require Logger
# Options passed to `inspect/2` when terms are embedded in warning logs of this module,
# so that inspected values stay bounded in size.
@error_inspect_limit [limit: 20, printable_limit: 512]
@doc """
Runs `init_unit/6` for every unit found in the unit registry stored in `state`.

Returns `{outputs, state}` where `outputs` is the flattened list of outputs collected from all
units and `state` is the entity updated at `timestamp` with the resulting unit registry.
Output groups are prepended while iterating, so the outputs of later units precede the outputs
of earlier units in the result.
"""
def init_state(template, state, timestamp, scenario_id, run_id) do
  initial_registry = Entity.state(state)
  units = UnitRegistry.units(initial_registry)

  {collected, final_registry} =
    for unit <- units, reduce: {[], initial_registry} do
      {acc, registry} ->
        {unit_outputs, registry} =
          init_unit(template, registry, timestamp, unit, scenario_id, run_id)

        {[unit_outputs | acc], registry}
    end

  updated_state = Entity.update_state(state, timestamp, final_registry)
  {List.flatten(collected), updated_state}
end
# Acknowledges `timestamp` as processed on `entity` by delegating to
# `Entity.ack_processed_time/3`. The third argument is the `{Runbox, :save_entity, [ctx]}`
# tuple (presumably an MFA used to persist the entity - confirm against `Entity`).
defp ack_processed_time(entity, timestamp, ctx) do
  save_callback = {Runbox, :save_entity, [ctx]}
  Entity.ack_processed_time(entity, timestamp, save_callback)
end
@doc """
Handles one incoming item for the template and returns `{outputs, state}`.

Every clause first fires the timeouts reached by the item's timestamp and acknowledges that
timestamp as processed (see `check_for_timeouts/6`). Afterwards:

  * an `%OutputAction{}` or a `"tick"` message is passed through unchanged, placed after the
    timeout outputs,
  * any other `%Message{}` is routed to every unit registered for it, or to asset discovery
    when no unit is registered; a message of unknown type adds no outputs beyond the timeout
    outputs.

The returned `outputs` list is flat and `state` is the entity updated at the item's timestamp.
"""
def handle_message(%OutputAction{} = oa, template, state, scenario_id, run_id, ctx) do
  # incoming output actions are only handled for their timestamp to check for timeouts
  {:ok, timeout_outputs, state} =
    check_for_timeouts(template, state, oa.timestamp, scenario_id, run_id, ctx)

  state = Entity.update_state(state, oa.timestamp, Entity.state(state))
  {List.flatten([timeout_outputs, oa]), state}
end

def handle_message(%Message{type: "tick"} = msg, template, state, scenario_id, run_id, ctx) do
  # tick messages are only handled for their timestamps to check for timeouts
  {:ok, timeout_outputs, state} =
    check_for_timeouts(template, state, msg.timestamp, scenario_id, run_id, ctx)

  state = Entity.update_state(state, msg.timestamp, Entity.state(state))
  {List.flatten([timeout_outputs, msg]), state}
end

def handle_message(%Message{} = msg, template, state, scenario_id, run_id, ctx) do
  # check for expired timeouts and handle incoming msg afterwards
  {:ok, timeout_outputs, state} =
    check_for_timeouts(template, state, msg.timestamp, scenario_id, run_id, ctx)

  unit_registry = Entity.state(state)

  {outputs, unit_registry} =
    case UnitRegistry.lookup(unit_registry, msg) do
      {:ok, []} ->
        handle_asset_discovery_message(template, unit_registry, msg, scenario_id, run_id)

      {:ok, registered_units} ->
        # Concatenates the per-unit outputs in unit order while threading the registry through,
        # in a single pass - no repeated `++` copies of the growing accumulator.
        Enum.flat_map_reduce(registered_units, unit_registry, fn unit, registry ->
          handle_regular_message(template, registry, unit, msg, scenario_id, run_id)
        end)

      {:error, :unknown_message_type} ->
        {[], unit_registry}
    end

  state = Entity.update_state(state, msg.timestamp, unit_registry)
  {List.flatten([timeout_outputs, outputs]), state}
end
# Fires every timeout reached by `timestamp` and then acknowledges `timestamp` as processed.
# Returns `{:ok, timeout_outputs, state}`; raises a `MatchError` when the acknowledgement does
# not return `{:ok, state}`.
defp check_for_timeouts(template, state, timestamp, scenario_id, run_id, ctx) do
  {expired_outputs, state_after_timeouts} =
    handle_timeout(template, state, timestamp, scenario_id, run_id, ctx)

  {:ok, acked_state} = ack_processed_time(state_after_timeouts, timestamp, ctx)
  {:ok, expired_outputs, acked_state}
end
# Repeatedly pops timeouts reached by `timestamp` from the unit registry and delivers each
# timeout message to its unit through `handle_regular_message/6`. Before a timeout message is
# handled its own timestamp is acknowledged as processed, and afterwards the state is updated
# at that timestamp. Recursion ends when `UnitRegistry.pop_reached_timeout/2` reports
# `:no_reached_timeout`; the result is `{flat_outputs, state}`.
defp handle_timeout(template, state, timestamp, scenario_id, run_id, ctx, outputs0 \\ []) do
  state
  |> Entity.state()
  |> UnitRegistry.pop_reached_timeout(timestamp)
  |> case do
    :no_reached_timeout ->
      {List.flatten(outputs0), state}

    {:ok, %Unit{} = unit, %Message{} = timeout_msg, %UnitRegistry{} = popped_registry} ->
      fired_at = timeout_msg.timestamp
      {:ok, acked_state} = ack_processed_time(state, fired_at, ctx)

      {fired_outputs, registry} =
        handle_regular_message(template, popped_registry, unit, timeout_msg, scenario_id, run_id)

      next_state = Entity.update_state(acked_state, fired_at, registry)

      handle_timeout(
        template,
        next_state,
        timestamp,
        scenario_id,
        run_id,
        ctx,
        [outputs0, fired_outputs]
      )
  end
end
# Calls `Template.init/3` for `unit` and applies the result to the registry.
#
# Returns `{outputs, unit_registry}`:
#   * `{:ok, outputs, %Unit{}}` - the unit is updated in the registry and its outputs are
#     processed with phase "init",
#   * a raised exception or thrown value, a non-unit third element, or any other response -
#     a warning is logged and `{[], unit_registry}` is returned with the registry untouched.
defp init_unit(template, unit_registry, timestamp, unit, scenario_id, run_id) do
  callback_result =
    try do
      Template.init(template, timestamp, unit)
    rescue
      exception -> {:error, exception, __STACKTRACE__}
    catch
      thrown -> {:error, thrown, __STACKTRACE__}
    end

  case callback_result do
    {:ok, outputs, %Unit{} = initialized_unit} when is_list(outputs) ->
      {:ok, updated_registry} = UnitRegistry.update(unit_registry, initialized_unit)

      {processed, %UnitRegistry{} = final_registry} =
        process_unit_outputs(
          timestamp,
          updated_registry,
          initialized_unit,
          outputs,
          scenario_id,
          run_id,
          template,
          "init"
        )

      {processed, final_registry}

    {:error, e, trace} ->
      template = inspect(template)
      e = format_exception(e, trace)

      Logger.warning("""
      TemplateCarrier has ignored initialization of unit #{template}/#{unit.id}
      Error: #{e}
      """)

      {[], unit_registry}

    {:ok, outputs, bad_unit} when is_list(outputs) ->
      template = inspect(template)
      bad_unit = inspect(bad_unit, @error_inspect_limit)
      Logger.warning("Unit #{template}/#{unit.id} returned bad unit during init: #{bad_unit}")
      {[], unit_registry}

    bad_resp ->
      template = inspect(template)
      bad_resp = inspect(bad_resp, [pretty: true] ++ @error_inspect_limit)
      Logger.warning("Unit #{template}/#{unit.id} returned bad response during init: #{bad_resp}")
      {[], unit_registry}
  end
end
# Offers `msg` to `Template.handle_asset_discovery/2`; used when no unit is registered for it.
#
# Returns `{outputs, unit_registry}` (outputs may be nested, callers flatten them):
#   * `{:reply, outputs}` - timeout registrations are dropped and the remaining outputs are
#     post-processed; the registry is unchanged,
#   * `{:reply, outputs, %Unit{}}` - the unit is registered, its outputs are processed with
#     phase "asset discovery", and the unit is then initialized via `init_unit/6`,
#   * a raised exception or thrown value, a non-unit third element, or any other response -
#     a warning is logged and `{[], unit_registry}` is returned.
defp handle_asset_discovery_message(template, unit_registry, msg, scenario_id, run_id) do
  discovery_result =
    try do
      Template.handle_asset_discovery(template, msg)
    rescue
      exception -> {:error, exception, __STACKTRACE__}
    catch
      thrown -> {:error, thrown, __STACKTRACE__}
    end

  case discovery_result do
    {:error, e, trace} ->
      msg = inspect(msg, [pretty: true] ++ @error_inspect_limit)
      template = inspect(template)
      e = format_exception(e, trace)

      Logger.warning("""
      TemplateCarrier has ignored discovery message of template #{template}
      Error: #{e}
      Message: #{msg}
      """)

      {[], unit_registry}

    {:reply, outputs} when is_list(outputs) ->
      # no unit was created, so there is nothing a timeout could be registered for;
      # timeout registrations are filtered out before the outputs are processed
      without_timeouts = remove_timeout_registrations(outputs)

      processed =
        process_template_outputs(without_timeouts, msg.timestamp, scenario_id, run_id, template)

      {processed, unit_registry}

    {:reply, outputs, %Unit{} = discovered_unit} when is_list(outputs) ->
      registry_with_unit = UnitRegistry.register(unit_registry, discovered_unit)

      {discovery_outputs, registry_after_outputs} =
        process_unit_outputs(
          msg.timestamp,
          registry_with_unit,
          discovered_unit,
          outputs,
          scenario_id,
          run_id,
          template,
          "asset discovery"
        )

      {init_outputs, registry_after_init} =
        init_unit(
          template,
          registry_after_outputs,
          msg.timestamp,
          discovered_unit,
          scenario_id,
          run_id
        )

      {[discovery_outputs, init_outputs], registry_after_init}

    {:reply, outputs, bad_unit} when is_list(outputs) ->
      template = inspect(template)
      bad_unit = inspect(bad_unit, @error_inspect_limit)
      Logger.warning("Unit #{template} returned bad unit during asset discovery: #{bad_unit}")
      {[], unit_registry}

    bad_resp ->
      template = inspect(template)
      bad_resp = inspect(bad_resp, [pretty: true] ++ @error_inspect_limit)
      Logger.warning("Unit #{template} returned bad response during asset discovery: #{bad_resp}")
      {[], unit_registry}
  end
end
# Delivers `msg` to `unit` through `Template.handle_message/3`.
#
# Returns `{outputs, unit_registry}`:
#   * `{:reply, msgs, %Unit{}}` - the unit is updated in the registry and `msgs` are processed,
#   * `{:stop, msgs, %Unit{}}` - `msgs` are processed and the unit is then unregistered,
#   * a raised exception or thrown value, a three-element reply whose last element is not a
#     unit, or any other response - a warning is logged and `{[], unit_registry}` is returned.
defp handle_regular_message(template, unit_registry, unit, msg, scenario_id, run_id) do
  callback_result =
    try do
      Template.handle_message(template, msg, unit)
    rescue
      exception -> {:error, exception, __STACKTRACE__}
    catch
      thrown -> {:error, thrown, __STACKTRACE__}
    end

  case callback_result do
    {:error, e, trace} ->
      msg = inspect(msg, [pretty: true] ++ @error_inspect_limit)
      e = format_exception(e, trace)
      template = inspect(template)
      inspected_unit = inspect(unit, @error_inspect_limit)

      Logger.warning("""
      TemplateCarrier has ignored regular message for unit #{template}/#{unit.id}
      Error: #{e}
      Message: #{msg}
      Unit: #{inspected_unit}
      """)

      {[], unit_registry}

    {:reply, msgs, %Unit{} = updated_unit} when is_list(msgs) ->
      {:ok, updated_registry} = UnitRegistry.update(unit_registry, updated_unit)

      {processed, %UnitRegistry{} = final_registry} =
        process_unit_outputs(
          msg.timestamp,
          updated_registry,
          updated_unit,
          msgs,
          scenario_id,
          run_id,
          template,
          "handle message"
        )

      {processed, final_registry}

    {:stop, msgs, %Unit{} = stopped_unit} when is_list(msgs) ->
      # the outputs still have to be processed; the unit itself is not stored because it is
      # removed from the registry right afterwards
      {processed, registry_after_outputs} =
        process_unit_outputs(
          msg.timestamp,
          unit_registry,
          stopped_unit,
          msgs,
          scenario_id,
          run_id,
          template,
          "handle message"
        )

      {processed, UnitRegistry.unregister(registry_after_outputs, stopped_unit)}

    {_op, msgs, bad_unit} when is_list(msgs) ->
      template = inspect(template)
      bad_unit = inspect(bad_unit, @error_inspect_limit)

      Logger.warning(
        "Unit #{template}/#{unit.id} returned bad unit during handle message: #{bad_unit}"
      )

      {[], unit_registry}

    bad_resp ->
      template = inspect(template)
      bad_resp = inspect(bad_resp, [pretty: true] ++ @error_inspect_limit)

      Logger.warning(
        "Unit #{template}/#{unit.id} returned bad response during handle message: #{bad_resp}"
      )

      {[], unit_registry}
  end
end
# Post-processes outputs that are not tied to any unit. Output-action bodies are wrapped via
# `create_output_actions/4`; anything that is then neither an `%OutputAction{}` nor a
# `%Message{}` is logged and dropped; messages get their timestamp aligned with `timestamp`.
# The two passes are kept separate so all "bad OA" warnings are logged before any timestamp
# warning.
defp process_template_outputs(outputs, timestamp, scenario_id, run_id, template) do
  converted = create_output_actions(outputs, timestamp, scenario_id, run_id)

  accepted =
    Enum.reject(converted, fn
      %OutputAction{} ->
        false

      %Message{} ->
        false

      bad_oa ->
        template = inspect(template)
        bad_oa = inspect(bad_oa, @error_inspect_limit)
        Logger.warning("Unit #{template} returned bad OA during asset discovery: #{bad_oa}")
        true
    end)

  Enum.map(accepted, fn
    %Message{} = message -> maybe_update_output_timestamp(message, timestamp)
    %OutputAction{} = action -> action
  end)
end
# Post-processes outputs produced on behalf of `unit` and returns `{outputs, unit_registry}`.
# Output-action bodies are wrapped via `create_output_actions/4`. Timeout runtime instructions
# are not emitted: they are registered for `unit` in the registry (after moving timeouts set
# in the past to `timestamp`). Output actions are kept, messages are kept with their timestamp
# aligned with `timestamp`, and anything else is logged with `phase` and dropped. The original
# order of kept outputs is preserved.
defp process_unit_outputs(
       timestamp,
       unit_registry,
       unit,
       outputs,
       scenario_id,
       run_id,
       template,
       phase
     ) do
  converted = create_output_actions(outputs, timestamp, scenario_id, run_id)

  {reversed_outputs, final_registry} =
    Enum.reduce(converted, {[], unit_registry}, fn item, {kept, registry} = acc ->
      case item do
        %RuntimeInstruction{body: %TimeoutRuntimeInstruction{}} ->
          fixed_instruction = maybe_fix_timeout_in_the_past(item, timestamp)
          {kept, register_timeout(registry, unit, fixed_instruction)}

        %OutputAction{} ->
          {[item | kept], registry}

        %Message{} ->
          {[maybe_update_output_timestamp(item, timestamp) | kept], registry}

        bad_oa ->
          template = inspect(template)
          bad_oa = inspect(bad_oa, @error_inspect_limit)
          Logger.warning("Unit #{template}/#{unit.id} returned bad OA during #{phase}: #{bad_oa}")
          acc
      end
    end)

  # outputs were prepended during the fold, restore their original order
  {Enum.reverse(reversed_outputs), final_registry}
end
# Wraps every element recognised by `OutputAction.is_oa_body?/1` into an output action via
# `OutputAction.new/4`; all other elements are passed through untouched.
defp create_output_actions(outputs, timestamp, scenario_id, run_id) do
  for output <- outputs do
    if OutputAction.is_oa_body?(output),
      do: OutputAction.new(output, timestamp, scenario_id, run_id),
      else: output
  end
end
# Ensures a message produced by the template carries the source `timestamp`. A message whose
# timestamp already equals it is returned as is; otherwise a warning is logged and the
# message is returned with its timestamp replaced.
defp maybe_update_output_timestamp(%Message{} = msg, timestamp) do
  if msg.timestamp === timestamp do
    msg
  else
    Logger.warning(
      "Template produced message #{inspect(msg)} with different timestamp than source timestamp, message timestamp changed to #{timestamp}"
    )

    %{msg | timestamp: timestamp}
  end
end
# Moves a timeout registered before `timestamp` to `timestamp` itself and logs a warning.
# Any other argument is returned unchanged - this includes timeouts set to the current
# timestamp or later, which are allowed.
defp maybe_fix_timeout_in_the_past(
       %RuntimeInstruction{
         body:
           %TimeoutRuntimeInstruction{timeout_message: %Message{timestamp: timeout}} =
             timeout_registration
       } = runtime_instruction,
       timestamp
     )
     when timestamp > timeout do
  Logger.warning(
    "Timeout registered to the past #{inspect(timeout_registration.timeout_message)}, changed to current timestamp #{timestamp}"
  )

  moved_message = %{timeout_registration.timeout_message | timestamp: timestamp}
  moved_body = %{timeout_registration | timeout_message: moved_message}
  %{runtime_instruction | body: moved_body}
end

defp maybe_fix_timeout_in_the_past(runtime_instruction, _timestamp), do: runtime_instruction
# Drops every timeout runtime instruction from `outputs`, keeping all other elements in order.
defp remove_timeout_registrations(outputs) do
  Enum.reject(outputs, &match?(%RuntimeInstruction{body: %TimeoutRuntimeInstruction{}}, &1))
end
# Registers the timeout carried by the runtime instruction for `unit`, keyed by the timestamp
# of its timeout message, and returns the value of `UnitRegistry.register_timeout/4`.
defp register_timeout(
       unit_registry,
       unit,
       %RuntimeInstruction{body: %TimeoutRuntimeInstruction{} = registration}
     ) do
  timeout_message = registration.timeout_message

  UnitRegistry.register_timeout(
    unit_registry,
    unit,
    timeout_message.timestamp,
    timeout_message
  )
end
# Formats `e` as an `:error` together with the stacktrace shortened by `trim_trace/1`,
# with surrounding whitespace removed.
defp format_exception(e, trace) do
  relevant_trace = trim_trace(trace)
  formatted = Exception.format(:error, e, relevant_trace)
  String.trim(formatted)
end
# Keeps the leading stacktrace entries up to (not including) the first entry whose file path
# starts with "lib/runtime/". Entries without a `:file` in their location are kept, since a
# missing file is treated as an empty path.
defp trim_trace(trace) do
  file_of = fn
    {_mod, _fun, _arity, location} -> to_string(location[:file])
    {_fun, _arity, location} -> to_string(location[:file])
  end

  Enum.take_while(trace, fn entry ->
    not String.starts_with?(file_of.(entry), "lib/runtime/")
  end)
end
end