lib/runbox/runtime/stage/sandbox/start_recipe.ex

defmodule Runbox.Runtime.Stage.Sandbox.StartRecipe do
  @moduledoc """
  Transform valid component network to gen_stage definitions.
  """

  alias Runbox.Runtime.Stage.ComponentNetwork
  alias Runbox.Runtime.Stage.Sandbox.InputStream
  alias Runbox.Runtime.Stage.Sandbox.OutputStream
  alias Runbox.Runtime.Stage.TemplateCarrier
  alias Runbox.Runtime.Stage.Timezip

  @type component_network :: ComponentNetwork.t()
  @type component :: ComponentNetwork.component()
  @type subscription :: ComponentNetwork.subscription()

  @doc """
  Transform valid component network to gen_stage definitions.
  """
  @spec create(component_network) :: [map]
  def create(component_network) do
    for component <- ComponentNetwork.topology_sort(component_network) do
      component_to_child_spec(component, component_network[component])
    end
  end

  def component_name(component) do
    resolve_component_name(component.id)
  end

  defp component_to_child_spec({:input_topic, topic_name} = component, []) do
    %{
      id: component,
      mod: InputStream,
      fun: :start_link,
      args: %{config: %{topic: topic_name}}
    }
  end

  defp component_to_child_spec({:load_topic, topic_name} = component, []) do
    %{
      id: component,
      mod: InputStream,
      fun: :start_link,
      args: %{config: %{topic: topic_name}}
    }
  end

  defp component_to_child_spec({:template, template_module}, subscriptions) do
    %{
      id: {:template, template_module},
      mod: TemplateCarrier,
      fun: :start_link,
      args: %{
        config: %{
          template: template_module,
          subscribe_to: resolve_subscription_components(subscriptions)
        }
      }
    }
  end

  defp component_to_child_spec({:output_sink, :output_sink}, subscriptions) do
    %{
      id: :output_sink,
      mod: OutputStream,
      fun: :start_link,
      args: %{config: %{subscribe_to: resolve_subscription_components(subscriptions)}}
    }
  end

  defp component_to_child_spec({:timezip, subscriptions}, subscriptions) do
    %{
      id: {:timezip, subscriptions},
      mod: Timezip,
      fun: :start_link,
      args: %{config: %{subscribe_to: resolve_subscription_components(subscriptions)}}
    }
  end

  defp resolve_component_name({:template, template_module}) do
    TemplateCarrier.component_name(template_module)
  end

  defp resolve_component_name({:input_topic, topic_name}) do
    InputStream.component_name(topic_name)
  end

  defp resolve_component_name({:load_topic, topic_name}) do
    InputStream.component_name(topic_name)
  end

  defp resolve_component_name(:output_sink) do
    OutputStream.component_name()
  end

  defp resolve_component_name({:timezip, subscriptions}) do
    subscriptions
    |> resolve_subscription_components()
    |> Timezip.component_name()
  end

  defp resolve_subscription_components(subscriptions) do
    Enum.map(subscriptions, fn {component, selector_config} ->
      {resolve_component_name(component), selector_config}
    end)
  end
end