lib/sync_runner.ex

defmodule ALF.SyncRunner do
  alias ALF.Components.{
    Goto,
    GotoPoint,
    Switch,
    Done
  }

  alias ALF.Pipeline
  alias ALF.{ErrorIP, IP}

  @spec run([map], IP.t()) :: IP.t() | ErrorIP.t() | [IP.t() | ErrorIP.t()] | nil
  def run([first | _] = pipeline, %IP{sync_path: nil} = ip) do
    {path, true} = path(pipeline, first.pid)
    ip = %{ip | sync_path: path}
    run(pipeline, ip)
  end

  def run(pipeline, ip) do
    do_run(pipeline, ip, [], [])
  end

  defp do_run(pipeline, %IP{sync_path: sync_path} = ip, queue, results) do
    case sync_path do
      [] ->
        do_run(pipeline, nil, queue, [ip | results])

      [cref | rest] ->
        ip = %{ip | sync_path: rest}
        component = find_component(pipeline, cref)

        case run_component(component, ip, pipeline) do
          nil ->
            do_run(pipeline, nil, queue, results)

          [ip | ips] ->
            do_run(pipeline, ip, queue ++ ips, results)

          ip ->
            do_run(pipeline, ip, queue, results)
        end
    end
  end

  defp do_run(_pipeline, nil, [], results), do: Enum.reverse(results)

  defp do_run(pipeline, nil, [ip | ips], results) do
    do_run(pipeline, ip, ips, results)
  end

  defp do_run(pipeline, %ErrorIP{} = error_ip, queue, results) do
    do_run(pipeline, nil, queue, [error_ip | results])
  end

  def run_component(component, ip, pipeline) do
    case component do
      %Switch{} = switch ->
        case switch.__struct__.sync_process(ip, switch) do
          %ErrorIP{} = error_ip ->
            error_ip

          partition ->
            [first | _] = Map.fetch!(switch.branches, partition)
            {sync_path, true} = path(pipeline, first.pid)
            %{ip | sync_path: sync_path}
        end

      %Goto{} = goto ->
        case goto.__struct__.sync_process(ip, component) do
          {false, ip} ->
            ip

          {true, ip} ->
            goto_point = find_goto_point(pipeline, goto.to)
            {sync_path, true} = path(pipeline, goto_point)
            %{ip | sync_path: sync_path}
        end

      %Done{} = done ->
        case done.__struct__.sync_process(ip, component) do
          {true, ip} ->
            consumer = List.last(pipeline)
            {sync_path, true} = path(pipeline, consumer.pid)
            %{ip | sync_path: sync_path}

          {false, ip} ->
            ip
        end

      component ->
        component.__struct__.sync_process(ip, component)
    end
  end

  def path(pipeline, pid) do
    pipeline
    |> Enum.reduce({[], false}, fn component, {ref_list, found} ->
      case component do
        %Switch{branches: branches} = switch ->
          if found or pid == switch.pid do
            {ref_list ++ [switch.pid], true}
          else
            {inner_path, found_inside} =
              Enum.reduce(branches, {[], false}, fn {_key, inner_pipeline},
                                                    {ref_list, found_in_branch} ->
                case path(inner_pipeline, pid) do
                  {path, true} ->
                    {ref_list ++ path, true}

                  {[], false} ->
                    {ref_list, found_in_branch}
                end
              end)

            if found_inside do
              {ref_list ++ inner_path, true}
            else
              {ref_list, false}
            end
          end

        component ->
          if found or pid == component.pid do
            {ref_list ++ [component.pid], true}
          else
            {ref_list, false}
          end
      end
    end)
  end

  def find_component(pipeline, ref) do
    Pipeline.find_component_by_pid(pipeline, ref)
  end

  def find_goto_point(pipeline, name) do
    case Enum.filter(
           Pipeline.stages_to_list(pipeline),
           &(&1.name == name and &1.__struct__ == GotoPoint)
         ) do
      [component] ->
        component.pid

      [_component | _other] = components ->
        raise "Goto component error: found #{Enum.count(components)} components with name #{name}"

      [] ->
        raise "Goto component error: no component with name #{name}"
    end
  end
end