Skip to main content

lib/runbox/scenario/template/stage_based.ex

defmodule Runbox.Scenario.Template.StageBased do
  @moduledoc """
  Defines the behaviour used by scenarios as a template.

  Scenario template implementations are the main components of a scenario. Runtime uses callbacks
  of this behaviour to start a scenario in a run and to handle incoming messages. Each template
  behaviour implementation is represented as a separate process in runtime called
  `TemplateCarrier`.

  Template module name structure must always be prefixed by the module name structure of the
  Manifest. E.g. if you have manifest module `Scenarios.MyScenario` a template can be named
  `Scenarios.MyScenario.Templates.Car` or just `Scenarios.MyScenario.Car` but not
  `Scenarios.MyScenarioCar`.

  Template can also be defined in the same module as the Manifest. This is useful in cases where
  there is only a single template in the scenario.

  Template is intended to be used to model behaviour of assets of a single type per template.
  Individual asset states are represented as `t:Runbox.Runtime.Stage.Unit.t/0`. Initial assets are
  defined via `c:Runbox.Scenario.Template.StageBased.instances/0` callback. This callback should
  return a list of unit definitions (`t:Runbox.Scenario.Template.StageBased.instances_def/0`).

  Templates are linked together via subscriptions provided by callback
  `c:Runbox.Scenario.Template.StageBased.subscriptions/0`. A subscription is defined as a tuple
  containing component definition and routing rule list. It is possible to subscribe to other
  templates, input and load topics. Routing rules are used to locate units when a message is
  handled. When a routing rule succeeds, callback
  `c:Runbox.Scenario.Template.StageBased.handle_message/2` is called; when a routing rule fails to
  locate an existing unit, `c:Runbox.Scenario.Template.StageBased.handle_asset_discovery/1` gets
  called instead.

  Unit initialization can produce side effects via `c:Runbox.Scenario.Template.StageBased.init/2`
  callback. This callback is triggered when instances are produced by
  `c:Runbox.Scenario.Template.StageBased.instances/0` or
  `c:Runbox.Scenario.Template.StageBased.handle_asset_discovery/1` callbacks. Units are already
  registered at the time when this callback is triggered, so changes in unit attributes won't
  change unit registration (defined by `c:Runbox.Scenario.Template.StageBased.subscriptions/0`).

  When a message from a subscribed publisher is handled by the `TemplateCarrier`, runtime tries to
  locate the asset unit using the given subscription configuration. If there is such a unit,
  runtime calls `c:Runbox.Scenario.Template.StageBased.handle_message/2` callback and passes the
  message and the located unit as arguments. In case that no unit was found,
  `c:Runbox.Scenario.Template.StageBased.handle_asset_discovery/1` gets called instead.

  The callbacks `c:Runbox.Scenario.Template.StageBased.init/2` and
  `c:Runbox.Scenario.Template.StageBased.handle_asset_discovery/1` are optional; the dispatch
  functions of this module (`init/3`, `handle_asset_discovery/2`) fall back to a neutral result
  when a template does not implement them.
  """

  @behaviour Runbox.Scenario.TemplateInspector

  alias Runbox.Message, as: Msg
  alias Runbox.Runtime.RuntimeInstruction
  alias Runbox.Runtime.Stage.Unit
  alias Runbox.Scenario.OutputAction

  require Logger

  @type instances_def :: [{:unit, id :: String.t(), attributes :: map}]

  @type routing_key_definition ::
          {message_type :: String.t() | atom,
           {function :: := | :in, [path_to_msg_body_key :: [String.t() | atom]],
            [path_to_attribute_key :: [String.t() | atom]]}}

  @type subscription_def ::
          {:input_topic, topic_name :: String.t(), [routing_key_definition]}
          | {:load_topic, topic_name :: String.t(), [routing_key_definition]}
          | {:template, template_name :: module, [routing_key_definition]}

  @type template_output :: [Msg.t() | OutputAction.oa_params() | RuntimeInstruction.t()]

  @type handle_message_resp ::
          {:reply, template_output, Unit.t()} | {:stop, template_output, Unit.t()}

  @type handle_asset_discovery_resp ::
          {:reply, template_output} | {:reply, template_output, Unit.t()}

  @callback instances :: instances_def()
  @callback init(integer, Unit.t()) :: {:ok, template_output, Unit.t()}
  @callback subscriptions :: [subscription_def()]
  @callback handle_message(Msg.t(), Unit.t()) :: handle_message_resp()
  @callback handle_asset_discovery(Msg.t()) :: handle_asset_discovery_resp()

  @optional_callbacks init: 2, handle_asset_discovery: 1

  @doc """
  Returns the list of asset unit definitions declared by `template_module`.

  Delegates to `template_module.instances/0`.
  """
  @spec instances(template :: module) :: instances_def
  def instances(template_module) do
    template_module.instances()
  end

  @doc """
  Initializes `unit` through the optional `init/2` callback of `template_module`.

  When the template exports `init/2`, it is invoked with `timestamp` and `unit` and its result is
  returned as is. Otherwise `{:ok, [], unit}` is returned, i.e. no output and the unit unchanged.
  """
  @spec init(template :: module, timestamp :: integer, Unit.t()) ::
          {:ok, template_output, Unit.t()} | any()
  def init(template_module, timestamp, unit) do
    if implements?(template_module, :init, 2) do
      template_module.init(timestamp, unit)
    else
      {:ok, [], unit}
    end
  end

  @doc """
  Returns subscriptions defined in `template_module`.

  Delegates to `template_module.subscriptions/0`.
  """
  @spec subscriptions(template :: module) :: [subscription_def]
  def subscriptions(template_module) do
    template_module.subscriptions()
  end

  @doc """
  Invokes `template_module.handle_message/2` with given `msg` and `unit`.

  The callback result is returned as is.
  """
  @spec handle_message(template :: module, Msg.t(), Unit.t()) :: handle_message_resp | any()
  def handle_message(template_module, msg, unit) do
    template_module.handle_message(msg, unit)
  end

  @doc """
  Invokes the optional `template_module.handle_asset_discovery/1` with given `msg`.

  When the template does not export `handle_asset_discovery/1`, `{:reply, []}` is returned.
  """
  @spec handle_asset_discovery(template :: module, Msg.t()) :: handle_asset_discovery_resp | any()
  def handle_asset_discovery(template_module, msg) do
    if implements?(template_module, :handle_asset_discovery, 1) do
      template_module.handle_asset_discovery(msg)
    else
      {:reply, []}
    end
  end

  @doc """
  Tells whether `template_module` declares this module as one of its behaviours.

  Returns `false` (instead of raising) when `template_module` cannot be loaded.
  """
  @impl true
  @spec valid_template?(template :: module) :: boolean
  def valid_template?(template_module) do
    # Guard with ensure_loaded? so that a missing module yields `false` rather than an
    # UndefinedFunctionError from the `module_info/1` call below.
    Code.ensure_loaded?(template_module) and
      __MODULE__ in declared_behaviours(template_module)
  end

  @doc """
  Returns template information: a map with the template's subscriptions under `:subscriptions`.
  """
  @impl true
  @spec template_info(template :: module) :: %{subscriptions: [subscription_def]}
  def template_info(template_module) do
    %{subscriptions: subscriptions(template_module)}
  end

  # Tells whether `module` can be loaded and exports `fun/arity`. The module is loaded first
  # because `function_exported?/3` itself does not load modules.
  defp implements?(module, fun, arity) do
    Code.ensure_loaded?(module) and function_exported?(module, fun, arity)
  end

  # Collects all behaviours listed in the persisted `:behaviour` attributes of a loaded module.
  defp declared_behaviours(module) do
    :attributes
    |> module.module_info()
    |> Keyword.get_values(:behaviour)
    |> List.flatten()
  end
end