lib/runbox/runtime/simple/state_loader.ex

defmodule Runbox.Runtime.Simple.StateLoader do
  @moduledoc """
  Handles run state load or initialization for simple scenario runtime.
  """

  alias Runbox.Runtime.Simple.TemplateCarrier
  alias Runbox.Runtime.Simple.Timezip
  alias Runbox.Scenario.StartParams
  alias Runbox.StateStore
  require Logger

  @behaviour Runbox.Runtime.StateLoader

  @template_id TemplateCarrier.component_name()
  @output_id :output_stream
  @timezip_id Timezip.component_name()

  @impl true
  def load_existing_state(%StartParams{} = start_params, instance_context, start_recipe) do
    with {:ok, ts, entities} <- StateStore.load_run_state(instance_context.state_store_pid) do
      {:ok, assign_states(start_recipe, start_params, ts, entities, :continue, instance_context)}
    end
  end

  @impl true
  def init_new_state(%StartParams{} = start_params, instance_context, start_recipe) do
    # Template Carrier initializes its state on its own
    entity_defs = [{@template_id, nil}, {@output_id, nil}]

    {:ok, entities} =
      StateStore.init_run_state(
        instance_context.state_store_pid,
        start_params.start_from,
        entity_defs
      )

    {:ok,
     assign_states(
       start_recipe,
       start_params,
       start_params.start_from,
       entities,
       :start,
       instance_context
     )}
  end

  defp assign_states(
         start_recipe,
         %StartParams{} = start_params,
         ts,
         entities,
         start_type,
         instance_context
       ) do
    entities = Map.new(entities, &{&1.id, &1})

    # Moving state loaders to Runbox introduced this shadow dependency of knowing scenario
    # components which are defined in Altworx.
    # This applies to input_stream, tick_timezip components.
    Enum.map(
      start_recipe,
      &assign_state(&1, start_params, ts, entities, start_type, instance_context)
    )
  end

  defp assign_state(
         %{id: @template_id, args: args} = def,
         %StartParams{} = start_params,
         ts,
         entities,
         type,
         _
       ) do
    entity = Map.get(entities, @template_id)

    config =
      args.config
      |> Map.put(:scenario_id, start_params.scenario_id)
      |> Map.put(:start_from, ts)
      |> Map.put(:start_or_continue, type)

    args =
      args
      |> Map.put(:run_id, start_params.run_id)
      |> Map.put(:config, config)
      |> Map.put(:state_entity, entity)

    %{def | args: args}
  end

  defp assign_state(
         %{id: {:input_stream, _}, args: args} = def,
         %StartParams{} = start_params,
         ts,
         _,
         type,
         ctx
       ) do
    topic = Map.get(start_params.input_topics, args.config.topic)

    # where should input stream start reading data
    ts =
      cond do
        type == :continue -> ts + 1
        args.config.type == :input_topic -> ts
        args.config.type == :load_topic -> :earliest
      end

    config =
      args.config
      |> Map.put(:topic, topic)
      |> Map.put(:start_from, ts)
      |> Map.put(:brod_client, ctx.brod_client)

    args =
      args
      |> Map.put(:run_id, start_params.run_id)
      |> Map.put(:config, config)

    %{def | args: args}
  end

  defp assign_state(
         %{id: @output_id, args: args} = def,
         %StartParams{} = start_params,
         ts,
         entities,
         _,
         _
       ) do
    entity = Map.get(entities, @output_id)

    config =
      args.config
      |> Map.put(:side_effects?, start_params.side_effects?)
      |> Map.put(:output_actions_from, start_params.output_actions_from)
      |> Map.put(:notifications_from, start_params.notifications_from)
      |> Map.put(:start_from, ts)
      |> Map.put(:scenario_id, start_params.scenario_id)

    args =
      args
      |> Map.put(:run_id, start_params.run_id)
      |> Map.put(:config, config)
      |> Map.put(:state_entity, entity)

    %{def | args: args}
  end

  defp assign_state(%{id: @timezip_id, args: args} = def, %StartParams{} = start_params, _, _, _, _) do
    config = Map.put(args.config, :scenario_id, start_params.scenario_id)

    args =
      args
      |> Map.put(:run_id, start_params.run_id)
      |> Map.put(:config, config)

    %{def | args: args}
  end

  defp assign_state(
         %{id: {:tick_timezip, logical_topic}, args: args} = def,
         %StartParams{} = start_params,
         start_from,
         _,
         _,
         _
       ) do
    {time, res} =
      :timer.tc(fn ->
        topic_map = Map.get(start_params.input_topics, logical_topic)
        physical_topic = topic_map.physical_topic

        config =
          args.config
          |> Map.put(:scenario_id, start_params.scenario_id)
          |> Map.put(:topic, physical_topic)
          |> Map.put(:start_from, start_from)

        args =
          args
          |> Map.put(:run_id, start_params.run_id)
          |> Map.put(:config, config)

        %{def | args: args}
      end)

    Logger.info(
      "Run #{start_params.run_id} fetch tick_timezip #{logical_topic} state took #{div(time, 1000)}ms."
    )

    res
  end
end