lib/runbox/runtime/simple/component_network.ex

defmodule Runbox.Runtime.Simple.ComponentNetwork do
  @moduledoc group: :internal
  @moduledoc """
  Component network builds and validates template dependency network for Simple scenario runtime.

  The component network for Simple scenarios is quite simple.
  * first you have input topics, usually followed by tick timezip for each of them
  * following is a timezip zipping all input topics into a single stream
  * the stream is processed by the template which is the heart of the scenario, it produces a stream of output actions
  * finally there's an output sink executing the output actions
  """

  @behaviour Runbox.Runtime.ComponentNetwork

  @type timezip :: {:timezip, to_zip :: [component()]}
  @type template :: {:template, module()}
  @type tick_timezip :: {:tick_timezip, topic :: String.t()}
  @type input :: {:input_topic | :load_topic, topic :: String.t()}
  @type output_sink() :: :output_sink
  @type component :: template() | input() | tick_timezip() | output_sink() | timezip()
  @type t :: [component()]

  @impl true
  def convert_to_network([template]) do
    template
  end

  @impl true
  def create([template], _opts) do
    network =
      List.flatten([
        inputs(template.info.topics),
        [{:template, template.module}, :output_sink]
      ])

    {:ok, network}
  end

  def create([], _) do
    {:error, :no_template_found}
  end

  def create(_, _) do
    {:error, :multiple_templates_not_supported}
  end

  @impl true
  def input_topics(network) do
    network
    |> Enum.filter(fn
      {:input_topic, _} -> true
      {:load_topic, _} -> true
      _ -> false
    end)
    |> Enum.map(fn {_, topic} -> topic end)
  end

  # generate topics, optionally with tick timezips and a single timezip at the end, if needed
  defp inputs([topic]), do: input_with_tick_timezip(topic)

  defp inputs(topics) do
    topic_components = Enum.map(topics, &input_with_tick_timezip/1)
    timezip = [{:timezip, Enum.map(topic_components, &List.last/1)}]

    List.flatten(topic_components) ++ timezip
  end

  defp input_with_tick_timezip({_, logical_topic} = topic),
    do: [topic, {:tick_timezip, logical_topic}]
end