defmodule ALF.Builder do
alias ALF.Pipeline
alias ALF.Components.{
Producer,
Stage,
Goto,
DeadEnd,
GotoPoint,
Switch,
Clone,
Consumer,
Plug,
Unplug,
Decomposer,
Recomposer,
Tbd
}
@spec build(atom, pid, atom, boolean) :: {:ok, Pipeline.t()}
def build(pipeline_module, supervisor_pid, manager_name, telemetry_enabled) do
pipe_spec = pipeline_module.alf_components()
producer = start_producer(supervisor_pid, manager_name, pipeline_module, telemetry_enabled)
{last_stages, final_stages} =
do_build_pipeline(pipe_spec, [producer], supervisor_pid, [], telemetry_enabled)
consumer =
start_consumer(
supervisor_pid,
last_stages,
manager_name,
pipeline_module,
telemetry_enabled
)
{producer, consumer} = set_modules({producer, consumer}, last_stages)
pipeline = %Pipeline{producer: producer, consumer: consumer, components: final_stages}
{:ok, pipeline}
end
defp do_build_pipeline(pipe_spec, producers, supervisor_pid, final_stages, telemetry_enabled)
when is_list(pipe_spec) do
pipe_spec
|> Enum.reduce({producers, final_stages}, fn stage_spec, {prev_stages, stages} ->
case stage_spec do
%Stage{count: count} = stage ->
stage_set_ref = make_ref()
new_stages =
Enum.map(0..(count - 1), fn number ->
start_stage(
%{stage | stage_set_ref: stage_set_ref, number: number},
supervisor_pid,
prev_stages,
telemetry_enabled
)
end)
{new_stages, stages ++ new_stages}
%Goto{} = goto ->
goto = start_stage(goto, supervisor_pid, prev_stages, telemetry_enabled)
{[goto], stages ++ [goto]}
%DeadEnd{} = dead_end ->
dead_end = start_stage(dead_end, supervisor_pid, prev_stages, telemetry_enabled)
{[], stages ++ [dead_end]}
%GotoPoint{} = goto_point ->
goto_point = start_stage(goto_point, supervisor_pid, prev_stages, telemetry_enabled)
{[goto_point], stages ++ [goto_point]}
%Switch{branches: branches} = switch ->
switch = start_stage(switch, supervisor_pid, prev_stages, telemetry_enabled)
{last_stages, branches} =
Enum.reduce(branches, {[], %{}}, fn {key, inner_pipe_spec},
{all_last_stages, branches} ->
{last_stages, final_stages} =
do_build_pipeline(
inner_pipe_spec,
[{switch, partition: key}],
supervisor_pid,
[],
telemetry_enabled
)
{all_last_stages ++ last_stages, Map.put(branches, key, final_stages)}
end)
switch = %{switch | branches: branches}
{last_stages, stages ++ [switch]}
%Clone{to: pipe_stages} = clone ->
clone = start_stage(clone, supervisor_pid, prev_stages, telemetry_enabled)
{last_stages, final_stages} =
do_build_pipeline(pipe_stages, [clone], supervisor_pid, [], telemetry_enabled)
clone = %{clone | to: final_stages}
{last_stages ++ [clone], stages ++ [clone]}
%Plug{} = plug ->
plug = start_stage(plug, supervisor_pid, prev_stages, telemetry_enabled)
{[plug], stages ++ [plug]}
%Unplug{} = unplug ->
unplug = start_stage(unplug, supervisor_pid, prev_stages, telemetry_enabled)
{[unplug], stages ++ [unplug]}
%Decomposer{} = decomposer ->
decomposer = start_stage(decomposer, supervisor_pid, prev_stages, telemetry_enabled)
{[decomposer], stages ++ [decomposer]}
%Recomposer{} = recomposer ->
recomposer = start_stage(recomposer, supervisor_pid, prev_stages, telemetry_enabled)
{[recomposer], stages ++ [recomposer]}
%Tbd{} = tbd ->
tbd = start_stage(tbd, supervisor_pid, prev_stages, telemetry_enabled)
{[tbd], stages ++ [tbd]}
end
end)
end
@spec build_sync(atom, boolean) :: [map]
def build_sync(pipeline_module, telemetry_enabled) do
pipe_spec = pipeline_module.alf_components()
producer = Producer.init_sync(%Producer{pipeline_module: pipeline_module}, telemetry_enabled)
{components, last_stage_refs} = do_build_sync(pipe_spec, [producer.pid], telemetry_enabled)
consumer = Consumer.init_sync(%Consumer{pipeline_module: pipeline_module}, telemetry_enabled)
subscribed_to = Enum.map(last_stage_refs, &{&1, :sync})
consumer = %{consumer | subscribed_to: subscribed_to}
[producer | components] ++ [consumer]
end
defp do_build_sync(pipe_spec, stage_refs, telemetry_enabled) when is_list(pipe_spec) do
Enum.reduce(pipe_spec, {[], stage_refs}, fn comp, {stages, last_stage_refs} ->
subscribed_to = Enum.map(last_stage_refs, &{&1, :sync})
case comp do
%Switch{branches: branches} = switch ->
switch = switch.__struct__.init_sync(switch, telemetry_enabled)
branches =
Enum.reduce(branches, %{}, fn {key, inner_pipe_spec}, branch_pipes ->
{branch_stages, _last_ref} =
do_build_sync(inner_pipe_spec, [switch.pid], telemetry_enabled)
Map.put(branch_pipes, key, branch_stages)
end)
switch = %{switch | branches: branches, subscribed_to: subscribed_to}
last_stage_refs =
Enum.map(branches, fn {_key, stages} ->
case List.last(stages) do
nil -> nil
stage -> stage.pid
end
end)
{stages ++ [switch], last_stage_refs}
%Clone{to: pipe_stages} = clone ->
clone = clone.__struct__.init_sync(clone, telemetry_enabled)
{to_stages, _last_ref} = do_build_sync(pipe_stages, [clone.pid], telemetry_enabled)
clone = %{clone | to: to_stages, subscribed_to: subscribed_to}
{stages ++ [clone], [clone.pid]}
component ->
component = component.__struct__.init_sync(component, telemetry_enabled)
component = %{component | subscribed_to: subscribed_to}
{stages ++ [component], [component.pid]}
end
end)
end
defp start_producer(supervisor_pid, manager_name, pipeline_module, telemetry_enabled) do
producer = %Producer{
manager_name: manager_name,
pipeline_module: pipeline_module,
telemetry_enabled: telemetry_enabled
}
{:ok, producer_pid} = DynamicSupervisor.start_child(supervisor_pid, {Producer, producer})
%{producer | pid: producer_pid}
end
defp start_consumer(
supervisor_pid,
last_stages,
manager_name,
pipeline_module,
telemetry_enabled
) do
subscribe_to = subscribe_to_opts(last_stages)
consumer = %Consumer{
subscribe_to: subscribe_to,
manager_name: manager_name,
pipeline_module: pipeline_module,
telemetry_enabled: telemetry_enabled
}
{:ok, consumer_pid} = DynamicSupervisor.start_child(supervisor_pid, {Consumer, consumer})
%{consumer | pid: consumer_pid}
end
defp set_modules({producer, consumer}, last_stages) do
last_stage = hd(last_stages)
producer = %{
producer
| pipe_module: last_stage.pipeline_module,
pipeline_module: last_stage.pipeline_module
}
consumer = %{
consumer
| pipe_module: last_stage.pipeline_module,
pipeline_module: last_stage.pipeline_module
}
{producer, consumer}
end
defp start_stage(stage, supervisor_pid, prev_stages, telemetry_enabled) do
stage = %{
stage
| subscribe_to: subscribe_to_opts(prev_stages),
telemetry_enabled: telemetry_enabled
}
{:ok, stage_pid} = DynamicSupervisor.start_child(supervisor_pid, {stage.__struct__, stage})
%{stage | pid: stage_pid}
end
defp subscribe_to_opts(stages) do
Enum.map(stages, fn stage ->
case stage do
{stage, partition: key} ->
{stage.pid, max_demand: 1, cancel: :transient, partition: key}
stage ->
{stage.pid, max_demand: 1, cancel: :transient}
end
end)
end
end