defmodule ALF.SyncRunner do
  @moduledoc """
  Runs an information packet (`ALF.IP`) through a pipeline with plain function calls.

  Each packet carries a `sync_path`: the list of component references (the components'
  `pid` values) it still has to visit. The runner pops one reference at a time, looks the
  component up in the pipeline and calls `sync_process/2` on the component's module.

  `Switch`, `Goto` and `Done` components are routing points: instead of continuing with
  the remaining path, they may replace the packet's `sync_path` with a freshly computed
  one (see `run_component/3`).
  """

  alias ALF.Components.{
    Goto,
    GotoPoint,
    Switch,
    Done
  }

  alias ALF.Pipeline
  alias ALF.{ErrorIP, IP}

  @doc """
  Runs `ip` through `pipeline` and returns the packets that came out of it.

  If `ip.sync_path` is `nil`, the path is first initialised so that it starts at the
  first component of `pipeline`. Packets that exhaust their path, and any `ALF.ErrorIP`
  produced on the way, are returned in the order in which they were completed.

  Raises `MatchError` if the initial path cannot be built.
  """
  @spec run([map], IP.t()) :: IP.t() | ErrorIP.t() | [IP.t() | ErrorIP.t()] | nil
  def run([first | _] = pipeline, %IP{sync_path: nil} = ip) do
    {sync_path, true} = path(pipeline, first.pid)
    run(pipeline, %{ip | sync_path: sync_path})
  end

  def run(pipeline, ip) do
    do_run(pipeline, ip, [], [])
  end

  # Work loop.
  #
  #   * second argument - the packet currently being advanced, or `nil` when the next
  #     packet has to be taken from the queue;
  #   * `queue`         - packets waiting for their turn (processed first-in, first-out);
  #   * `results`       - finished packets, accumulated in reverse order.

  # Nothing in flight and nothing queued: the run is complete.
  defp do_run(_pipeline, nil, [], results), do: Enum.reverse(results)

  defp do_run(pipeline, nil, [next_ip | queue], results) do
    do_run(pipeline, next_ip, queue, results)
  end

  # An error packet does not visit further components; it goes straight to the results.
  defp do_run(pipeline, %ErrorIP{} = error_ip, queue, results) do
    do_run(pipeline, nil, queue, [error_ip | results])
  end

  # The packet has visited every component on its path.
  defp do_run(pipeline, %IP{sync_path: []} = ip, queue, results) do
    do_run(pipeline, nil, queue, [ip | results])
  end

  defp do_run(pipeline, %IP{sync_path: [cref | rest]} = ip, queue, results) do
    component = find_component(pipeline, cref)

    case run_component(component, %{ip | sync_path: rest}, pipeline) do
      # The component produced nothing. An empty list must be handled here explicitly:
      # otherwise it would reach the catch-all clause below and be passed on as if it
      # were a packet, which no `do_run/4` clause accepts.
      nothing when nothing in [nil, []] ->
        do_run(pipeline, nil, queue, results)

      # Several packets were produced: continue with the first one and put the others
      # at the end of the queue.
      [first | others] ->
        do_run(pipeline, first, queue ++ others, results)

      produced ->
        do_run(pipeline, produced, queue, results)
    end
  end

  @doc """
  Runs a single `component` on `ip` and returns whatever should be processed next.

    * `Switch` - calls `Switch.sync_process/2`. An `ALF.ErrorIP` result is returned as is;
      any other result is used as the key into `switch.branches`, and the packet is
      re-routed to the first component of that branch. Raises `KeyError` when there is no
      such branch.
    * `Goto` - on `{true, ip}` the packet is re-routed to the `GotoPoint` named by
      `goto.to`; on `{false, ip}` the packet is returned unchanged.
    * `Done` - on `{true, ip}` the packet is re-routed to the last component of
      `pipeline`; on `{false, ip}` the packet is returned unchanged.
    * any other component - the result of its module's `sync_process/2` is returned
      untouched.
  """
  def run_component(%Switch{} = switch, ip, pipeline) do
    case Switch.sync_process(ip, switch) do
      %ErrorIP{} = error_ip ->
        error_ip

      partition ->
        [first | _] = Map.fetch!(switch.branches, partition)
        reroute(ip, pipeline, first.pid)
    end
  end

  def run_component(%Goto{} = goto, ip, pipeline) do
    case Goto.sync_process(ip, goto) do
      {false, ip} ->
        ip

      {true, ip} ->
        reroute(ip, pipeline, find_goto_point(pipeline, goto.to))
    end
  end

  def run_component(%Done{} = done, ip, pipeline) do
    case Done.sync_process(ip, done) do
      {true, ip} ->
        consumer = List.last(pipeline)
        reroute(ip, pipeline, consumer.pid)

      {false, ip} ->
        ip
    end
  end

  def run_component(component, ip, _pipeline) do
    component.__struct__.sync_process(ip, component)
  end

  # Replaces the packet's remaining path with the path that starts at `pid`.
  # The target must exist in the pipeline, hence the assertive match on `true`.
  defp reroute(ip, pipeline, pid) do
    {sync_path, true} = path(pipeline, pid)
    %{ip | sync_path: sync_path}
  end

  @doc """
  Builds the list of component references a packet has to visit starting from `pid`.

  Returns `{refs, found}`. When the component with the given `pid` is found, `refs`
  starts with `pid` and continues with every component that follows it. If the target
  sits inside a branch of a `Switch`, `refs` contains the rest of that branch followed by
  the components that come after the switch. A `Switch` that lies on the path contributes
  only its own reference; its branches are not expanded here.

  Returns `{[], false}` when no component has the given `pid`.
  """
  @spec path([map], term) :: {[term], boolean}
  def path(pipeline, pid) do
    # References are collected in reverse and flipped once at the end, so that adding a
    # reference never has to walk the whole accumulator.
    {reversed_refs, found} =
      Enum.reduce(pipeline, {[], false}, fn component, {acc, seen} ->
        cond do
          # Either this is the target, or the target was already passed and every
          # following component belongs to the path.
          seen or pid == component.pid ->
            {[component.pid | acc], true}

          # Target not seen yet: it may be hidden inside one of the switch branches.
          match?(%Switch{}, component) ->
            case branches_path(component.branches, pid) do
              {inner_path, true} -> {Enum.reverse(inner_path, acc), true}
              {_none, false} -> {acc, false}
            end

          true ->
            {acc, false}
        end
      end)

    {Enum.reverse(reversed_refs), found}
  end

  # Looks for `pid` in every branch of a switch and joins the paths of the branches
  # that contain it.
  defp branches_path(branches, pid) do
    Enum.reduce(branches, {[], false}, fn {_key, inner_pipeline}, {refs, found_in_branch} ->
      case path(inner_pipeline, pid) do
        {inner_path, true} ->
          {refs ++ inner_path, true}

        {[], false} ->
          {refs, found_in_branch}
      end
    end)
  end

  @doc """
  Looks up the component referenced by `ref` via `ALF.Pipeline.find_component_by_pid/2`.
  """
  def find_component(pipeline, ref) do
    Pipeline.find_component_by_pid(pipeline, ref)
  end

  @doc """
  Returns the `pid` of the `GotoPoint` component called `name`.

  Raises `RuntimeError` when there is no such component or when the name is ambiguous.
  """
  def find_goto_point(pipeline, name) do
    goto_points =
      pipeline
      |> Pipeline.stages_to_list()
      |> Enum.filter(&(&1.name == name and &1.__struct__ == GotoPoint))

    case goto_points do
      [goto_point] ->
        goto_point.pid

      [] ->
        raise "Goto component error: no component with name #{name}"

      several ->
        raise "Goto component error: found #{length(several)} components with name #{name}"
    end
  end
end