defmodule Runbox.Scenario.Template.StageBased do
@moduledoc """
Defines the behaviour implemented by scenario templates.
Scenario template implementations are main components of scenario. Runtime uses callbacks of
this behaviour to start scenario in run and to handle incoming messages. Each template
behaviour implementation is represented as separate process in runtime called `TemplateCarrier`.
Template module name structure must be always prefixed by the module name structure of the
Manifest. E.g. if you have manifest module `Scenarios.MyScenario` a template can be named
`Scenarios.MyScenario.Templates.Car` or just `Scenarios.MyScenario.Car` but not
`Scenarios.MyScenarioCar`.
Template can also be defined in the same module as the Manifest. This is useful in cases where
there is only a single template in the scenario.
Template is intended to be used to model behaviour of assets of single type per template.
Individual asset states are represented as `t:Runbox.Runtime.Stage.Unit.t/0`. Initial assets are
defined via `c:Runbox.Scenario.Template.StageBased.instances/0` callback. This callback should
return list of unit definitions (`t:Runbox.Scenario.Template.StageBased.instances_def/0`).
Templates are linked together via subscriptions provided by callback
`c:Runbox.Scenario.Template.StageBased.subscriptions/0`. Subscription is defined as tuple
containing component definition and routing rule list. It is possible to subscribe to other
templates, input and load topics. Routing rules are used to locate units when message is handled.
When routing rule succeeds, callback `c:Runbox.Scenario.Template.StageBased.handle_message/2`
is called; when routing rule fails to locate an existing unit,
`c:Runbox.Scenario.Template.StageBased.handle_asset_discovery/1` gets called instead.
Unit initialization can produce side effects via `c:Runbox.Scenario.Template.StageBased.init/2`
callback. This callback is triggered when instances are produced by
`c:Runbox.Scenario.Template.StageBased.instances/0` or
`c:Runbox.Scenario.Template.StageBased.handle_asset_discovery/1` callbacks. Units are already
registered at the time this callback is triggered, so changes in unit attributes won't
change unit registration (defined by `c:Runbox.Scenario.Template.StageBased.subscriptions/0`).
When message from subscribed publisher is handled by the `TemplateCarrier`, runtime tries to
locate asset unit using given subscription configuration. If there is such a unit, runtime
calls `c:Runbox.Scenario.Template.StageBased.handle_message/2` callback and passes the message
and the located unit as arguments. In case that no unit was found,
`c:Runbox.Scenario.Template.StageBased.handle_asset_discovery/1` gets called instead.
"""
@behaviour Runbox.Scenario.TemplateInspector
alias Runbox.Message, as: Msg
alias Runbox.Runtime.RuntimeInstruction
alias Runbox.Runtime.Stage.Unit
alias Runbox.Scenario.OutputAction
require Logger
# Initial unit definitions returned by `instances/0`: a list of
# `{:unit, id, attributes}` tuples.
@type instances_def :: [{:unit, id :: String.t(), attributes :: map}]
# Routing rule for one message type: a `:=` or `:in` function tag, a list of
# paths into the message body and a list of paths into unit attributes.
# NOTE(review): the exact matching semantics of `:=` vs `:in` are implemented
# by the runtime and are not visible in this module.
@type routing_key_definition ::
{message_type :: String.t() | atom,
{function :: := | :in, [path_to_msg_body_key :: [String.t() | atom]],
[path_to_attribute_key :: [String.t() | atom]]}}
# A subscription names its source (input topic, load topic or another template
# module) and carries the routing rules applied to messages from that source.
@type subscription_def ::
{:input_topic, topic_name :: String.t(), [routing_key_definition]}
| {:load_topic, topic_name :: String.t(), [routing_key_definition]}
| {:template, template_name :: module, [routing_key_definition]}
# Items a template callback may emit: messages, output action parameters and
# runtime instructions.
@type template_output :: [Msg.t() | OutputAction.oa_params() | RuntimeInstruction.t()]
# `handle_message/2` result: either `:reply` or `:stop`, each carrying the
# produced output and a unit.
@type handle_message_resp ::
{:reply, template_output, Unit.t()} | {:stop, template_output, Unit.t()}
# `handle_asset_discovery/1` result: output only, or output together with a unit.
@type handle_asset_discovery_resp ::
{:reply, template_output} | {:reply, template_output, Unit.t()}
# Returns the initial unit definitions of the template.
@callback instances :: instances_def()
# Optional. Receives an integer (named `timestamp` in the `init/3` wrapper of
# this module) and a unit; returns output plus a unit.
@callback init(integer, Unit.t()) :: {:ok, template_output, Unit.t()}
# Returns the subscriptions of the template.
@callback subscriptions :: [subscription_def()]
# Handles a message together with a unit.
@callback handle_message(Msg.t(), Unit.t()) :: handle_message_resp()
# Optional. Handles a message when no unit is passed along with it.
@callback handle_asset_discovery(Msg.t()) :: handle_asset_discovery_resp()
# The `init/3` and `handle_asset_discovery/2` wrappers in this module supply
# defaults when a template does not export these two callbacks.
@optional_callbacks init: 2, handle_asset_discovery: 1
@doc """
Returns the unit definitions declared by `template_module`.

Delegates directly to the `instances/0` callback of the given template.
"""
@spec instances(template :: module) :: instances_def
def instances(template_module), do: template_module.instances()
@doc """
Invokes the optional `init/2` callback of `template_module` with `timestamp` and `unit`.

When the template module cannot be loaded or does not export `init/2`, the unit is returned
unchanged together with an empty output list: `{:ok, [], unit}`.
"""
@spec init(template :: module, timestamp :: integer, Unit.t()) ::
        {:ok, template_output, Unit.t()} | any()
def init(template_module, timestamp, unit) do
  implements_init? =
    Code.ensure_loaded?(template_module) and function_exported?(template_module, :init, 2)

  case implements_init? do
    true -> template_module.init(timestamp, unit)
    false -> {:ok, [], unit}
  end
end
@doc """
Returns the subscription definitions declared by `template_module`.

Delegates directly to the `subscriptions/0` callback of the given template.
"""
@spec subscriptions(template :: module) :: [subscription_def]
def subscriptions(template_module), do: template_module.subscriptions()
@doc """
Passes `msg` and `unit` to the `handle_message/2` callback of `template_module`.

The callback result is returned as is.
"""
@spec handle_message(template :: module, Msg.t(), Unit.t()) :: handle_message_resp | any()
def handle_message(template_module, msg, unit), do: template_module.handle_message(msg, unit)
@doc """
Passes `msg` to the optional `handle_asset_discovery/1` callback of `template_module`.

When the template module cannot be loaded or does not export `handle_asset_discovery/1`,
an empty reply `{:reply, []}` is returned.
"""
@spec handle_asset_discovery(template :: module, Msg.t()) :: handle_asset_discovery_resp | any()
def handle_asset_discovery(template_module, msg) do
  cond do
    not Code.ensure_loaded?(template_module) ->
      {:reply, []}

    not function_exported?(template_module, :handle_asset_discovery, 1) ->
      {:reply, []}

    true ->
      template_module.handle_asset_discovery(msg)
  end
end
@doc """
Tells whether `template_module` declares the `Runbox.Scenario.Template.StageBased` behaviour.

Returns `false` when the module cannot be loaded, instead of raising
`UndefinedFunctionError` from the `module_info/1` call.
"""
@impl true
@spec valid_template?(module) :: boolean
def valid_template?(template_module) do
  # Guard first: calling module_info/1 on a module that does not exist raises,
  # which is not acceptable for a predicate.
  if Code.ensure_loaded?(template_module) do
    :attributes
    |> template_module.module_info()
    # A module may carry several `behaviour` attributes, each holding a list.
    |> Keyword.get_values(:behaviour)
    |> List.flatten()
    |> Enum.member?(Runbox.Scenario.Template.StageBased)
  else
    false
  end
end
@doc """
Builds the inspection info for `template_module`.

The result is a map whose `:subscriptions` key holds the value returned by the template's
`subscriptions/0` callback.
"""
@impl true
def template_info(template_module) do
  declared = template_module.subscriptions()
  %{subscriptions: declared}
end
end