lib/runbox/runtime/stage/template_impl.ex

defmodule Runbox.Runtime.Stage.TemplateImpl do
  @moduledoc """
  Module encapsulates scenario template callback calls.

  Scenario defines `Toolbox.Runtime.Stage.Unit` as state carrier, these units are organized
  into `Runbox.Runtime.Stage.UnitRegistry` which is helper structure to route messages between
  units managed by one template.

  Messages are handled via `Toolbox.Scenario.Template.StageBased` callbacks. We distinguish two
  types of messages: messages to existing units and messages to new units (asset discovery).

  When no unit is registered for a message, we call the situation asset discovery and use the
  `c:Toolbox.Scenario.Template.StageBased.handle_asset_discovery/1` callback to handle the
  message. A new unit may or may not be created there; when it is, it is registered and then
  initialized via the `c:Toolbox.Scenario.Template.StageBased.init/2` callback.

  When at least one unit is registered for the message, we call
  `c:Toolbox.Scenario.Template.StageBased.handle_message/2` for each of them. A unit may be
  updated or stopped there.

  We support timeouts. Timeouts are triggered by `external` time, which is derived from the
  timestamps of incoming messages. Each timeout is registered for one unit.

  Some output actions produced by template callbacks are enriched before they are emitted:
  events and SQL executions get the `run id` filled in, notifications get the `scenario id`.

  Failures of template callbacks (raised errors, throws and exits) and malformed callback
  results are logged; the failing call then contributes no outputs and leaves the unit
  registry as it was passed to that call.
  """

  alias Runbox.Runtime.Stage.UnitRegistry
  alias Runbox.StateStore.Entity
  alias Toolbox.Message
  alias Toolbox.Runtime.RuntimeInstruction
  alias Toolbox.Runtime.RuntimeInstruction.Timeout, as: TimeoutRuntimeInstruction
  alias Toolbox.Runtime.Stage.Unit
  alias Toolbox.Scenario.OutputAction
  alias Toolbox.Scenario.Template.StageBased, as: Template

  require Logger

  @error_inspect_limit [limit: 20, printable_limit: 512]

  @doc """
  Initializes all units stored in the `state` entity via the template `init` callback.

  Returns `{outputs, state}` where `outputs` is the flattened list of outputs produced while
  initializing the units and `state` is the entity updated at `timestamp` with the resulting
  unit registry. Units whose initialization fails are logged and skipped.
  """
  def init_state(template, state, timestamp, scenario_id, run_id) do
    unit_registry = Entity.state(state)

    # each unit's outputs are prepended as one nested list and flattened once at the end
    {nested_outputs, unit_registry} =
      unit_registry
      |> UnitRegistry.units()
      |> Enum.reduce({[], unit_registry}, fn unit, {acc, registry} ->
        {unit_outputs, registry} =
          init_unit(template, registry, timestamp, unit, scenario_id, run_id)

        {[unit_outputs | acc], registry}
      end)

    {List.flatten(nested_outputs), Entity.update_state(state, timestamp, unit_registry)}
  end

  @doc """
  Handles one incoming output action or message for the template's units.

  Timeouts reached at the item's timestamp are always fired first and the processed time is
  acknowledged. Then:

    * an `OutputAction` or a `"tick"` message is used only for its timestamp and is emitted
      after the timeout outputs,
    * any other `Message` is routed to every unit registered for it, or to asset discovery
      when no unit is registered; messages whose type the registry reports as unknown produce
      no outputs.

  Returns `{outputs, state}` with a flat list of outputs and the updated state entity.
  """
  def handle_message(%OutputAction{} = oa, template, state, scenario_id, run_id, ctx) do
    # incoming output actions are only handled for their timestamp to check for timeouts
    pass_through(oa, oa.timestamp, template, state, scenario_id, run_id, ctx)
  end

  def handle_message(%Message{type: "tick"} = msg, template, state, scenario_id, run_id, ctx) do
    # tick messages are only handled for their timestamps to check for timeouts
    pass_through(msg, msg.timestamp, template, state, scenario_id, run_id, ctx)
  end

  def handle_message(%Message{} = msg, template, state, scenario_id, run_id, ctx) do
    # check for expired timeouts and handle incoming msg afterwards
    {:ok, timeout_outputs, state} =
      check_for_timeouts(template, state, msg.timestamp, scenario_id, run_id, ctx)

    unit_registry = Entity.state(state)

    {outputs, unit_registry} =
      case UnitRegistry.lookup(unit_registry, msg) do
        {:ok, []} ->
          handle_asset_discovery_message(template, unit_registry, msg, scenario_id, run_id)

        {:ok, registered_units} ->
          # outputs are nested (in unit order) and flattened once below, avoiding a
          # quadratic `++` append inside the reduce
          Enum.reduce(registered_units, {[], unit_registry}, fn unit, {acc, registry} ->
            {unit_outputs, registry} =
              handle_regular_message(template, registry, unit, msg, scenario_id, run_id)

            {[acc, unit_outputs], registry}
          end)

        {:error, :unknown_message_type} ->
          {[], unit_registry}
      end

    state = Entity.update_state(state, msg.timestamp, unit_registry)
    {List.flatten([timeout_outputs, outputs]), state}
  end

  # Fires timeouts reached at `timestamp`, moves the entity to `timestamp` without changing the
  # unit registry, and emits `item` after the timeout outputs without routing it to any unit.
  defp pass_through(item, timestamp, template, state, scenario_id, run_id, ctx) do
    {:ok, timeout_outputs, state} =
      check_for_timeouts(template, state, timestamp, scenario_id, run_id, ctx)

    state = Entity.update_state(state, timestamp, Entity.state(state))
    {List.flatten([timeout_outputs, item]), state}
  end

  # Acknowledges `timestamp` as processed on the entity. The `{Runbox, :save_entity, [ctx]}`
  # MFA is handed to the state store (presumably used to persist the entity - confirm in
  # `Entity.ack_processed_time/3`).
  defp ack_processed_time(entity, timestamp, ctx) do
    Entity.ack_processed_time(entity, timestamp, {Runbox, :save_entity, [ctx]})
  end

  # Fires all timeouts reached at `timestamp`, then acknowledges `timestamp` as processed.
  defp check_for_timeouts(template, state, timestamp, scenario_id, run_id, ctx) do
    {timeout_outputs, state} = handle_timeout(template, state, timestamp, scenario_id, run_id, ctx)
    {:ok, state} = ack_processed_time(state, timestamp, ctx)
    {:ok, timeout_outputs, state}
  end

  # Pops reached timeouts one at a time until none is left. Each timeout message is handled as
  # a regular message of its unit, after its own timestamp has been acknowledged.
  defp handle_timeout(template, state, timestamp, scenario_id, run_id, ctx, outputs0 \\ []) do
    unit_registry = Entity.state(state)

    case UnitRegistry.pop_reached_timeout(unit_registry, timestamp) do
      {:ok, %Unit{} = unit, %Message{} = timeout_msg, %UnitRegistry{} = unit_registry} ->
        {:ok, state} = ack_processed_time(state, timeout_msg.timestamp, ctx)

        {outputs, unit_registry} =
          handle_regular_message(template, unit_registry, unit, timeout_msg, scenario_id, run_id)

        state = Entity.update_state(state, timeout_msg.timestamp, unit_registry)

        handle_timeout(template, state, timestamp, scenario_id, run_id, ctx, [outputs0, outputs])

      :no_reached_timeout ->
        {List.flatten(outputs0), state}
    end
  end

  # Runs a template callback in isolation. Returns `{:ok, result}` with the callback's return
  # value, or `{:exception, kind, reason, stacktrace}` when it raised, threw or exited.
  # Wrapping successful results in `{:ok, _}` keeps a callback that itself returns an
  # error-like tuple from being mistaken for a crash.
  defp call_template(fun) do
    {:ok, fun.()}
  catch
    kind, reason -> {:exception, kind, reason, __STACKTRACE__}
  end

  defp init_unit(template, unit_registry, timestamp, unit, scenario_id, run_id) do
    case call_template(fn -> Template.init(template, timestamp, unit) end) do
      {:ok, {:ok, outputs, %Unit{} = unit}} when is_list(outputs) ->
        {:ok, unit_registry} = UnitRegistry.update(unit_registry, unit)

        {_outputs, %UnitRegistry{}} =
          process_unit_outputs(
            unit_registry,
            unit,
            outputs,
            scenario_id,
            run_id,
            template,
            "init"
          )

      {:exception, kind, reason, trace} ->
        template = inspect(template)
        e = format_exception(kind, reason, trace)

        Logger.warn("""
        TemplateCarrier has ignored initialization of unit #{template}/#{unit.id}
        Error: #{e}
        """)

        {[], unit_registry}

      {:ok, {:ok, outputs, bad_unit}} when is_list(outputs) ->
        template = inspect(template)
        bad_unit = inspect(bad_unit, @error_inspect_limit)
        Logger.warn("Unit #{template}/#{unit.id} returned bad unit during init: #{bad_unit}")
        {[], unit_registry}

      {:ok, bad_resp} ->
        template = inspect(template)
        bad_resp = inspect(bad_resp, [{:pretty, true} | @error_inspect_limit])
        Logger.warn("Unit #{template}/#{unit.id} returned bad response during init: #{bad_resp}")
        {[], unit_registry}
    end
  end

  defp handle_asset_discovery_message(template, unit_registry, msg, scenario_id, run_id) do
    case call_template(fn -> Template.handle_asset_discovery(template, msg) end) do
      {:exception, kind, reason, trace} ->
        msg = inspect(msg, [{:pretty, true} | @error_inspect_limit])
        template = inspect(template)
        e = format_exception(kind, reason, trace)

        Logger.warn("""
        TemplateCarrier has ignored discovery message of template #{template}
        Error: #{e}
        Message: #{msg}
        """)

        {[], unit_registry}

      {:ok, {:reply, outputs}} when is_list(outputs) ->
        # no unit was created, so there is nothing to register a timeout for;
        # possible timeout registrations are filtered out
        outputs =
          outputs
          |> remove_timeout_registrations()
          |> process_template_outputs(scenario_id, run_id, template)

        {outputs, unit_registry}

      {:ok, {:reply, outputs, %Unit{} = unit}} when is_list(outputs) ->
        unit_registry = UnitRegistry.register(unit_registry, unit)

        {outputs, unit_registry} =
          process_unit_outputs(
            unit_registry,
            unit,
            outputs,
            scenario_id,
            run_id,
            template,
            "asset discovery"
          )

        {init_outputs, unit_registry} =
          init_unit(template, unit_registry, msg.timestamp, unit, scenario_id, run_id)

        {[outputs, init_outputs], unit_registry}

      {:ok, {:reply, outputs, bad_unit}} when is_list(outputs) ->
        template = inspect(template)
        bad_unit = inspect(bad_unit, @error_inspect_limit)
        Logger.warn("Unit #{template} returned bad unit during asset discovery: #{bad_unit}")
        {[], unit_registry}

      {:ok, bad_resp} ->
        template = inspect(template)
        bad_resp = inspect(bad_resp, [{:pretty, true} | @error_inspect_limit])
        Logger.warn("Unit #{template} returned bad response during asset discovery: #{bad_resp}")
        {[], unit_registry}
    end
  end

  defp handle_regular_message(template, unit_registry, unit, msg, scenario_id, run_id) do
    case call_template(fn -> Template.handle_message(template, msg, unit) end) do
      {:exception, kind, reason, trace} ->
        msg = inspect(msg, [{:pretty, true} | @error_inspect_limit])
        e = format_exception(kind, reason, trace)
        template = inspect(template)
        inspected_unit = inspect(unit, @error_inspect_limit)

        Logger.warn("""
        TemplateCarrier has ignored regular message for unit #{template}/#{unit.id}
        Error: #{e}
        Message: #{msg}
        Unit: #{inspected_unit}
        """)

        {[], unit_registry}

      {:ok, {:reply, msgs, %Unit{} = unit}} when is_list(msgs) ->
        {:ok, unit_registry} = UnitRegistry.update(unit_registry, unit)

        {_msgs, %UnitRegistry{}} =
          process_unit_outputs(
            unit_registry,
            unit,
            msgs,
            scenario_id,
            run_id,
            template,
            "handle message"
          )

      {:ok, {:stop, msgs, %Unit{} = unit}} when is_list(msgs) ->
        # process the OAs, changes to the unit are not important, since the unit will be removed
        {msgs, unit_registry} =
          process_unit_outputs(
            unit_registry,
            unit,
            msgs,
            scenario_id,
            run_id,
            template,
            "handle message"
          )

        unit_registry = UnitRegistry.unregister(unit_registry, unit)
        {msgs, unit_registry}

      # only known response tags are reported as "bad unit"; unknown tags fall through to the
      # bad response clause below
      {:ok, {op, msgs, bad_unit}} when op in [:reply, :stop] and is_list(msgs) ->
        template = inspect(template)
        bad_unit = inspect(bad_unit, @error_inspect_limit)

        Logger.warn(
          "Unit #{template}/#{unit.id} returned bad unit during handle message: #{bad_unit}"
        )

        {[], unit_registry}

      {:ok, bad_resp} ->
        template = inspect(template)
        bad_resp = inspect(bad_resp, [{:pretty, true} | @error_inspect_limit])

        Logger.warn(
          "Unit #{template}/#{unit.id} returned bad response during handle message: #{bad_resp}"
        )

        {[], unit_registry}
    end
  end

  # Keeps output actions (enriched via `maybe_update_output_action/3`) and messages produced by
  # asset discovery without a unit; anything else is logged and dropped. Order is preserved.
  defp process_template_outputs(outputs, scenario_id, run_id, template) do
    Enum.flat_map(outputs, fn
      %OutputAction{} = output ->
        [maybe_update_output_action(output, scenario_id, run_id)]

      # do nothing with output messages
      %Message{} = output ->
        [output]

      bad_oa ->
        template = inspect(template)
        bad_oa = inspect(bad_oa, @error_inspect_limit)
        Logger.warn("Unit #{template} returned bad OA during asset discovery: #{bad_oa}")
        []
    end)
  end

  # Splits a unit's callback outputs: timeout runtime instructions are registered in the unit
  # registry (and not emitted), output actions are enriched, messages are kept as they are and
  # anything else is logged and dropped. Returns `{outputs_in_original_order, unit_registry}`.
  defp process_unit_outputs(unit_registry, unit, outputs, scenario_id, run_id, template, phase) do
    {outputs, unit_registry} =
      Enum.reduce(outputs, {[], unit_registry}, fn
        %RuntimeInstruction{body: %TimeoutRuntimeInstruction{}} = output,
        {outputs, unit_registry} ->
          unit_registry = register_timeout(unit_registry, unit, output)
          {outputs, unit_registry}

        %OutputAction{} = output, {outputs, unit_registry} ->
          output = maybe_update_output_action(output, scenario_id, run_id)
          {[output | outputs], unit_registry}

        %Message{} = output, {outputs, unit_registry} ->
          # do nothing with output messages
          {[output | outputs], unit_registry}

        bad_oa, state ->
          template = inspect(template)
          bad_oa = inspect(bad_oa, @error_inspect_limit)
          Logger.warn("Unit #{template}/#{unit.id} returned bad OA during #{phase}: #{bad_oa}")
          state
      end)

    {Enum.reverse(outputs), unit_registry}
  end

  defp remove_timeout_registrations(outputs) do
    Enum.reject(outputs, fn
      %RuntimeInstruction{body: %TimeoutRuntimeInstruction{}} -> true
      _ -> false
    end)
  end

  defp register_timeout(unit_registry, unit, %RuntimeInstruction{
         body: %TimeoutRuntimeInstruction{timeout: timeout, timeout_message: timeout_message}
       }) do
    UnitRegistry.register_timeout(unit_registry, unit, timeout, timeout_message)
  end

  # Fills runtime identifiers into output action bodies. `Map.put/3` is used instead of the
  # `%{map | key => value}` update syntax so that a body missing the key does not raise a
  # KeyError here, outside of the callback isolation.
  defp maybe_update_output_action(
         %OutputAction{type: :notify} = output_action,
         scenario_id,
         _run_id
       ) do
    %OutputAction{output_action | body: Map.put(output_action.body, "scenario", scenario_id)}
  end

  defp maybe_update_output_action(
         %OutputAction{type: type} = output_action,
         _scenario_id,
         run_id
       )
       when type in [:create_event, :execute_sql] do
    %OutputAction{output_action | body: Map.put(output_action.body, "run_id", run_id)}
  end

  defp maybe_update_output_action(%OutputAction{} = output_action, _scenario_id, _run_id) do
    # ignore output actions which do not need updates
    output_action
  end

  # Formats a caught failure with its real kind (`:error`, `:throw` or `:exit`) so that throws
  # and exits are not misreported as Erlang errors.
  defp format_exception(kind, reason, trace) do
    kind
    |> Exception.format(reason, trim_trace(trace))
    |> String.trim()
  end

  # Keeps only the innermost stacktrace frames up to (excluding) the first frame of this
  # module, i.e. the frames of the template callback; the runtime's own frames are dropped.
  defp trim_trace(trace) do
    Enum.take_while(trace, fn
      {module, _fun, _arity, _location} -> module != __MODULE__
      {_fun, _arity, _location} -> true
    end)
  end
end