defmodule Runbox.Runtime.Stage.Sandbox.StartRecipe do
  @moduledoc """
  Turns a valid component network into a list of stage start definitions.

  Every definition is a map with the keys `:id`, `:mod`, `:fun` and `:args`.
  `:fun` is always `:start_link` and `:args` wraps the stage configuration
  under the `:config` key.
  """

  alias Runbox.Runtime.Stage.ComponentNetwork
  alias Runbox.Runtime.Stage.Sandbox.InputStream
  alias Runbox.Runtime.Stage.Sandbox.OutputStream
  alias Runbox.Runtime.Stage.TemplateCarrier
  alias Runbox.Runtime.Stage.Timezip

  @type component_network :: ComponentNetwork.t()
  @type component :: ComponentNetwork.component()
  @type subscription :: ComponentNetwork.subscription()

  @doc """
  Builds one start definition per component of `component_network`.

  Components are visited in the order returned by
  `ComponentNetwork.topology_sort/1`, and the subscriptions of each component
  are looked up in the network itself.
  """
  @spec create(component_network) :: [map]
  def create(component_network) do
    component_network
    |> ComponentNetwork.topology_sort()
    |> Enum.map(fn component ->
      component_to_child_spec(component, component_network[component])
    end)
  end

  @doc """
  Returns the name of the component identified by `component.id`.

  The name is obtained from the `component_name` function of the stage module
  that matches the id.
  """
  @spec component_name(map) :: term
  def component_name(component), do: resolve_component_name(component.id)

  # Input and load topics are both served by InputStream; the `[]` pattern
  # requires that such components have no subscriptions.
  defp component_to_child_spec({kind, topic_name} = component, [])
       when kind in [:input_topic, :load_topic] do
    stage_spec(component, InputStream, %{topic: topic_name})
  end

  defp component_to_child_spec({:template, template_module} = component, subscriptions) do
    stage_spec(component, TemplateCarrier, %{
      template: template_module,
      subscribe_to: resolve_subscription_components(subscriptions)
    })
  end

  # The output sink definition is identified by the bare `:output_sink` atom.
  defp component_to_child_spec({:output_sink, :output_sink}, subscriptions) do
    stage_spec(:output_sink, OutputStream, subscribe_config(subscriptions))
  end

  # `subscriptions` is bound twice on purpose: the clause only matches when the
  # subscriptions embedded in the timezip id equal those stored in the network.
  defp component_to_child_spec({:timezip, subscriptions} = component, subscriptions) do
    stage_spec(component, Timezip, subscribe_config(subscriptions))
  end

  # Configuration for stages that only need to know what to subscribe to.
  defp subscribe_config(subscriptions) do
    %{subscribe_to: resolve_subscription_components(subscriptions)}
  end

  # Common shape of every start definition produced by this module.
  defp stage_spec(id, mod, config) do
    %{id: id, mod: mod, fun: :start_link, args: %{config: config}}
  end

  defp resolve_component_name({:template, template_module}),
    do: TemplateCarrier.component_name(template_module)

  defp resolve_component_name({kind, topic_name}) when kind in [:input_topic, :load_topic],
    do: InputStream.component_name(topic_name)

  defp resolve_component_name(:output_sink), do: OutputStream.component_name()

  # A timezip is named after the resolved names of the components it subscribes to.
  defp resolve_component_name({:timezip, subscriptions}) do
    resolved = resolve_subscription_components(subscriptions)
    Timezip.component_name(resolved)
  end

  # Replaces the component of every subscription with its resolved name,
  # keeping the selector configuration untouched.
  defp resolve_subscription_components(subscriptions) do
    Enum.map(subscriptions, &resolve_subscription/1)
  end

  defp resolve_subscription({component, selector_config}) do
    {resolve_component_name(component), selector_config}
  end
end