lib/runbox/runtime/stage/template_carrier.ex

defmodule Runbox.Runtime.Stage.TemplateCarrier do
  @moduledoc """
  Module implements producer_consumer GenStage behaviour, scenario template is used to handle events

  Every scenario template is represented in stage based runtime by this module. TemplateCarrier uses
  `Runbox.Runtime.Stage.UnitRegistry` as its state (packed in `Runbox.StateStore.Entity`
  envelope for `StateStore` usage). `UnitRegistry` provides functionality to lookup units via
  subscriptions defined in scenario template.

  If unit is located for given message,
  `c:Toolbox.Scenario.Template.StageBased.handle_message/2` is used to handle this message. When
  there is no registered unit for given routing key,
  `c:Toolbox.Scenario.Template.StageBased.handle_asset_discovery/1` is used instead.

  The state is updated in `Runbox.StateStore.Entity` with
  `Runbox.StateStore.Entity.update_state/3` after every message is processed. Additionally,
  before a message is processed it is confirmed that all messages with lower timestamps were
  processed. This is done with `Runbox.StateStore.Entity.ack_processed_time/3` and can result in
  saving the state in state store.
  """

  use GenStage
  require Logger

  alias Runbox.RunStartContext
  alias Runbox.Runtime.Stage.SelectorBuilder
  alias Runbox.Runtime.Stage.TemplateImpl
  alias Toolbox.Message

  defmodule State do
    @moduledoc false

    defstruct [:run_id, :scenario_id, :state_entity, :template, :start_from, :runbox_ctx]
  end

  def component_name(template) do
    {:template, template}
  end

  def start_link(args, runbox_ctx, start_ctx) do
    GenStage.start_link(
      __MODULE__,
      {args, runbox_ctx, start_ctx}
    )
  end

  @impl true
  def init({args, runbox_ctx, start_ctx}) do
    %{
      run_id: run_id,
      config: %{
        template: template,
        subscribe_to: sub_defs,
        scenario_id: scenario_id,
        start_from: start_from,
        start_or_continue: start_or_continue
      },
      state_entity: state_entity
    } = args

    subscribe_to =
      Enum.map(sub_defs, fn {component, subscription_properties} ->
        opts =
          case Keyword.get(subscription_properties, :selector) do
            :none_after_timezip -> []
            _ -> [selector: SelectorBuilder.build(subscription_properties)]
          end

        {RunStartContext.component_pid(start_ctx, component), opts}
      end)

    if start_or_continue == :start do
      send(self(), :bootstrap)
    end

    Logger.metadata(run_id: run_id, scenario_id: scenario_id)

    {:producer_consumer,
     %State{
       run_id: run_id,
       scenario_id: scenario_id,
       state_entity: state_entity,
       template: template,
       start_from: start_from,
       runbox_ctx: runbox_ctx
     }, subscribe_to: subscribe_to, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_info(:bootstrap, %State{} = state) do
    {outputs, state_entity} =
      TemplateImpl.init_state(
        state.template,
        state.state_entity,
        state.start_from,
        state.scenario_id,
        state.run_id
      )

    {:noreply, outputs, %State{state | state_entity: state_entity}}
  end

  @impl true
  def handle_events(msgs, _from, %State{} = state) do
    last_msg = List.last(msgs)

    {msgs, state_entity} =
      Enum.reduce(msgs, {[], state.state_entity}, fn in_msg, {o_msgs0, entity} ->
        {o_msgs, entity} =
          TemplateImpl.handle_message(
            in_msg,
            state.template,
            entity,
            state.scenario_id,
            state.run_id,
            state.runbox_ctx
          )

        {[o_msgs0 | o_msgs], entity}
      end)

    # Adding tick to the end to prevent TimeZip deadlock.  The deadlock can occur in a situation
    # where a TemplateCarrier directly before TimeZip discards all messages. If the TemplateCarrier
    # doesn't receive any ticks or OAs the output of the GenStage is completely empty, which can
    # block the TimeZip.
    # Originally we tried to prevent this situation by adding a tick to every bulk in InputStream,
    # but that turned out to be insufficient, since the frequency of these ticks can be reduced
    # inside the scenario to arbitrary level (common with a Splitter pattern, where many output
    # messages are created from single input message).
    msgs =
      if last_msg do
        [msgs, %Message{type: "tick", timestamp: last_msg.timestamp}]
      else
        msgs
      end

    {:noreply, List.flatten(msgs), %State{state | state_entity: state_entity}}
  end
end