lib/builder.ex

defmodule Strom.Builder do
  alias Strom.Flow
  alias Strom.DSL

  def build(components, flow_pid) when is_pid(flow_pid) do
    do_build(components, nil, flow_pid)
  end

  defp do_build(components, stream, flow_pid) when is_list(components) do
    components
    |> Enum.reduce(stream, fn component, stream ->
      case component do
        %DSL.Source{origin: origin} ->
          source = Strom.Source.start(origin)
          Flow.add_component(flow_pid, {:source, source})
          Strom.Source.stream(source)

        %DSL.Sink{origin: origin} ->
          sink = Strom.Sink.start(origin)
          Flow.add_component(flow_pid, {:sink, sink})
          Strom.Sink.stream(stream, sink)

        %DSL.Mixer{sources: sources} ->
          sources = Enum.map(sources, &do_build(&1, nil, flow_pid))
          mixer = Strom.Mixer.start(sources)
          Flow.add_component(flow_pid, {:mixer, mixer})
          Strom.Mixer.stream(mixer)

        %DSL.Pipeline{pipeline: pipeline} ->
          :ok = pipeline.start()
          Flow.add_component(flow_pid, {:pipeline, pipeline})
          pipeline.stream(stream)

        %DSL.Transform{function: function} ->
          function.(stream)

        %DSL.Splitter{branches: branches} ->
          partitions = Map.keys(branches)

          splitter = Strom.Splitter.start(stream, partitions)
          Flow.add_component(flow_pid, {:splitter, splitter})

          splitter
          |> Strom.Splitter.stream()
          |> Enum.with_index(fn str, index ->
            partition = Enum.at(partitions, index)
            branch = Map.fetch!(branches, partition)
            do_build(branch, str, flow_pid)
          end)

        %DSL.Run{} ->
          Flow.add_stream(flow_pid, stream)
          stream
      end
    end)
  end

  #  defp do_build(components, {stream, flow}) do
  #    components
  #    |> Enum.reduce({stream, flow}, fn(component, {stream, flow}) ->
  #      case component do
  #        %DSL.Source{source: source} ->
  #          stream = Strom.Source.stream(source)
  #          {stream, flow}
  #
  #        %DSL.Sink{sink: sink} ->
  #          stream = Strom.Sink.stream(stream, sink)
  #          {stream, flow}
  #
  #        %DSL.Mixer{sources: sources} ->
  #          {stream, flow} = do_build(sources, {nil, flow})
  #          stream =
  #            stream
  #            |> Strom.Mixer.new()
  #            |> Strom.Mixer.stream()
  #          {stream, flow}
  #        %DSL.Pipeline{pipeline: pipeline} ->
  #          pipeline.start()
  #          stream = pipeline.stream(stream)
  #          {stream, flow}
  #        %DSL.Splitter{branches: branches} ->
  #          partitions = Map.keys(branches)
  #          stream
  #          |> Strom.Splitter.new(partitions)
  #          |> Strom.Splitter.stream()
  #          |> Enum.with_index(fn str, index ->
  #            partition = Enum.at(partitions, index)
  #            branch = Map.fetch!(branches, partition)
  #            {stream, flow} = do_build(branch, {stream, flow})
  #          end)
  #          {stream, flow}
  #      end
  #    end)
  #  end
end