lib/gen_mix.ex

defmodule Strom.GenMix do
  @moduledoc """
  Generic functionality used by `Strom.Mixer` and `Strom.Splitter`.
  """

  use GenServer

  @buffer 1000

  defstruct pid: nil,
            inputs: [],
            outputs: [],
            opts: [],
            running: false,
            buffer: @buffer,
            producers: %{},
            consumers: %{}

  alias Strom.GenMix.Consumer

  def start(%__MODULE__{opts: opts} = gen_mix) when is_list(opts) do
    gen_mix = %{
      gen_mix
      | buffer: Keyword.get(opts, :buffer, @buffer)
    }

    start_link(gen_mix)
  end

  def start_link(%__MODULE__{} = state) do
    GenServer.start_link(__MODULE__, state)
  end

  @impl true
  def init(%__MODULE__{} = mix) do
    {:ok, %{mix | pid: self()}}
  end

  def call(flow, pid) do
    GenServer.call(pid, {:call, flow})
  end

  def stop(pid) do
    GenServer.call(pid, :stop)
  end

  defp run_inputs(streams, pid, buffer) do
    Enum.reduce(streams, %{}, fn {{name, fun}, stream}, acc ->
      task = async_run_stream({name, fun}, stream, buffer, pid)
      Map.put(acc, {name, fun}, task)
    end)
  end

  defp async_run_stream({name, fun}, stream, buffer, pid) do
    Task.async(fn ->
      stream
      |> Stream.chunk_every(buffer)
      |> Stream.each(fn chunk ->
        {chunk, _} = Enum.split_with(chunk, fun)
        GenServer.cast(pid, {:new_data, {name, fun}, chunk})

        receive do
          :continue ->
            flush()
        end
      end)
      |> Stream.run()

      GenServer.cast(pid, {:done, {name, fun}})
    end)
  end

  defp flush do
    receive do
      _ -> flush()
    after
      0 -> :ok
    end
  end

  @impl true
  def handle_call({:call, flow}, _from, %__MODULE__{} = mix) do
    input_streams =
      Enum.reduce(mix.inputs, %{}, fn {name, fun}, acc ->
        Map.put(acc, {name, fun}, Map.fetch!(flow, name))
      end)

    {sub_flow, mix} =
      mix.outputs
      |> Enum.reduce({%{}, mix}, fn {name, fun}, {flow, mix} ->
        consumer = Consumer.start({name, fun}, mix.pid)

        mix = %{mix | consumers: Map.put(mix.consumers, {name, fun}, consumer)}

        stream = Consumer.call(consumer)
        {Map.put(flow, name, stream), mix}
      end)

    producers = run_inputs(input_streams, mix.pid, mix.buffer)

    flow =
      flow
      |> Map.drop(Map.keys(mix.inputs))
      |> Map.merge(sub_flow)

    {:reply, flow, %{mix | running: true, producers: producers}}
  end

  def handle_call(:stop, _from, %__MODULE__{} = mix) do
    {:stop, :normal, :ok, %{mix | running: false}}
  end

  @impl true
  def handle_cast({:new_data, {_name, _fun}, chunk}, %__MODULE__{} = mix) do
    Enum.each(mix.consumers, fn {_, cons} ->
      GenServer.cast(cons.pid, {:put_data, chunk})
      GenServer.cast(cons.pid, :continue)
    end)

    {:noreply, mix}
  end

  def handle_cast({:done, {name, fun}}, %__MODULE__{} = mix) do
    mix = %{mix | producers: Map.delete(mix.producers, {name, fun})}

    if map_size(mix.producers) == 0 do
      Enum.each(mix.consumers, fn {_, cons} ->
        GenServer.cast(cons.pid, :continue)
        GenServer.cast(cons.pid, :stop)
      end)
    end

    {:noreply, mix}
  end

  def handle_cast({:consumer_got_data, {_name, _fun}}, %__MODULE__{} = mix) do
    Enum.each(mix.producers, fn {_, task} ->
      send(task.pid, :continue)
    end)

    {:noreply, mix}
  end

  @impl true
  def handle_info({_task_ref, :ok}, mix) do
    # do nothing for now
    {:noreply, mix}
  end

  def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, mix) do
    # do nothing for now
    {:noreply, mix}
  end
end