lib/parallel_stream/producer.ex

defmodule ParallelStream.Producer do
  @moduledoc """
  Producing stage of a parallel stream.

  The input enumerable is cut into chunks, every item of a chunk is sent to a
  shared input queue together with its position and one of the output queues,
  and a list of `{outqueue, index}` pairs is emitted per chunk.
  """

  alias ParallelStream.Defaults
  alias ParallelStream.Workers
  alias ParallelStream.Executor

  @doc """
  Same as `build!/4`, with `ParallelStream.Executor` passed as the executor.
  """
  def build!(stream, fun, options), do: build!(stream, fun, Executor, options)

  @doc """
  Wraps `stream` in a lazy stream that dispatches its items to workers.

  ## Arguments

    * `stream` - the enumerable whose items are dispatched.
    * `fun` - handed unchanged to `ParallelStream.Workers.build!/3`
      (presumably the function the workers apply; confirm in `Workers`).
    * `executor` - handed unchanged to `ParallelStream.Workers.build!/3`.
    * `options` - keyword list, see below.

  ## Options

    * `:num_workers` - number passed to `ParallelStream.Workers.build!/3` and
      used as the modulus when picking an output queue. Defaults to
      `ParallelStream.Defaults.num_workers/0`.
    * `:worker_work_ratio` - multiplied by `:num_workers` to obtain the chunk
      size. Defaults to `ParallelStream.Defaults.worker_work_ratio/0`.

  ## Emitted elements

  One element per chunk: a list of `{outqueue, index}` tuples, in item order,
  where `index` is the zero-based position of the item in `stream`. For each
  item the message `{index, item, outqueue}` is sent to the input queue.

  When enumeration ends, `:halt` is sent to the input queue, to every output
  queue and to every worker.
  """
  def build!(stream, fun, executor, options) do
    ratio = Keyword.get(options, :worker_work_ratio, Defaults.worker_work_ratio())
    num_workers = Keyword.get(options, :num_workers, Defaults.num_workers())
    batch_size = num_workers * ratio

    stream
    |> Stream.chunk_every(batch_size, batch_size, [])
    |> Stream.transform(
      fn -> start_workers(num_workers, fun, executor) end,
      fn batch, state -> dispatch(batch, state, num_workers, batch_size) end,
      &shutdown/1
    )
  end

  # Builds the queues and workers and pairs them with the running item offset,
  # which starts at zero.
  defp start_workers(num_workers, fun, executor) do
    {inqueue, workers, outqueues} = Workers.build!(num_workers, fun, executor)

    {inqueue, workers, outqueues, 0}
  end

  # Sends every item of one chunk to the input queue and emits the chunk's
  # `{outqueue, index}` pairs as a single stream element.
  defp dispatch(batch, {inqueue, workers, outqueues, offset}, num_workers, batch_size) do
    routed =
      for {item, position} <- Enum.with_index(batch) do
        # Output queues are assigned round-robin by position within the chunk.
        target = Enum.at(outqueues, rem(position, num_workers))
        send(inqueue, {offset + position, item, target})

        {target, offset + position}
      end

    # Only the final chunk can be shorter than `batch_size`, so advancing by
    # the nominal size keeps the offset equal to the global item position.
    {[routed], {inqueue, workers, outqueues, offset + batch_size}}
  end

  # Tells the input queue, then every output queue, then every worker to halt.
  defp shutdown({inqueue, workers, outqueues, _offset}) do
    send(inqueue, :halt)
    Enum.each(outqueues, &send(&1, :halt))
    Enum.each(workers, &send(&1, :halt))
  end
end