lib/pipeline.ex

defmodule ALF.Pipeline do
  @type t :: %__MODULE__{
          module: atom,
          components: [map],
          producer: map,
          consumer: map
        }

  defstruct module: nil,
            components: [],
            producer: nil,
            consumer: nil

  alias ALF.Components.{Switch, Clone}

  def stages_to_list(components) do
    do_stages_to_list(components, [])
  end

  def find_component_by_pid(components, pid) do
    try do
      :ok = do_find_component_by_pid(components, pid)
      nil
    catch
      component ->
        component
    end
  end

  def do_find_component_by_pid(components, pid) do
    Enum.each(components, fn component ->
      if component.pid == pid do
        throw(component)
      else
        case component do
          %Switch{branches: branches} ->
            Enum.each(branches, fn {_key, partition_comps} ->
              do_find_component_by_pid(partition_comps, pid)
            end)

          %Clone{to: to_components} ->
            do_find_component_by_pid(to_components, pid)

          _component ->
            nil
        end
      end
    end)
  end

  defp do_stages_to_list(components, list) do
    Enum.reduce(components, list, fn stage, found ->
      found ++
        case stage do
          %Switch{branches: branches} = stage ->
            [stage] ++
              Enum.reduce(branches, [], fn {_key, partition_stages}, inner_found ->
                inner_found ++ do_stages_to_list(partition_stages, [])
              end)

          %Clone{to: pipe_stages} = stage ->
            [stage] ++ do_stages_to_list(pipe_stages, [])

          stage ->
            [stage]
        end
    end)
  end
end