lib/gen_mix.ex

defmodule Strom.GenMix do
  use GenServer

  @buffer 1000

  defstruct pid: nil,
            opts: [],
            flow_pid: nil,
            sup_pid: nil,
            running: false,
            buffer: @buffer,
            producers: %{},
            consumers: %{}

  alias Strom.GenMix.Consumer

  def start(opts \\ [])

  def start(%__MODULE__{opts: opts} = gen_mix) when is_list(opts) do
    gen_mix = %{
      gen_mix
      | buffer: Keyword.get(opts, :buffer, @buffer)
    }

    {:ok, pid} = DynamicSupervisor.start_child(gen_mix.sup_pid, {__MODULE__, gen_mix})
    __state__(pid)
  end

  def start(opts) when is_list(opts) do
    state = %__MODULE__{
      buffer: Keyword.get(opts, :buffer, @buffer)
    }

    {:ok, pid} = GenServer.start_link(__MODULE__, state)
    __state__(pid)
  end

  def start_link(%__MODULE__{} = state) do
    GenServer.start_link(__MODULE__, state)
  end

  @impl true
  def init(%__MODULE__{} = mix) do
    {:ok, %{mix | pid: self()}}
  end

  def call(flow, %__MODULE__{} = mix, inputs, outputs)
      when is_map(flow) and is_map(inputs) and is_map(outputs) do
    input_streams =
      Enum.reduce(inputs, %{}, fn {name, fun}, acc ->
        Map.put(acc, {name, fun}, Map.fetch!(flow, name))
      end)

    sub_flow =
      outputs
      |> Enum.reduce(%{}, fn {name, fun}, flow ->
        consumer = Consumer.start({name, fun}, mix.pid)
        :ok = GenServer.call(mix.pid, {:register_consumer, {{name, fun}, consumer}})
        stream = Consumer.call(consumer)
        Map.put(flow, name, stream)
      end)

    :ok = GenServer.call(mix.pid, {:run_inputs, input_streams})

    flow
    |> Map.drop(Map.keys(inputs))
    |> Map.merge(sub_flow)
  end

  def stop(%__MODULE__{pid: pid, sup_pid: sup_pid}) do
    if sup_pid do
      :ok
    else
      GenServer.call(pid, :stop)
    end
  end

  def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

  defp run_inputs(streams, pid, buffer) do
    Enum.reduce(streams, %{}, fn {{name, fun}, stream}, acc ->
      task = async_run_stream({name, fun}, stream, buffer, pid)
      Map.put(acc, {name, fun}, task)
    end)
  end

  defp async_run_stream({name, fun}, stream, buffer, pid) do
    Task.async(fn ->
      stream
      |> Stream.chunk_every(buffer)
      |> Stream.each(fn chunk ->
        {chunk, _} = Enum.split_with(chunk, fun)
        GenServer.cast(pid, {:new_data, {name, fun}, chunk})

        receive do
          :continue ->
            flush()
        end
      end)
      |> Stream.run()

      GenServer.cast(pid, {:done, {name, fun}})
    end)
  end

  defp flush do
    receive do
      _ -> flush()
    after
      0 -> :ok
    end
  end

  @impl true
  def handle_call({:run_inputs, streams_to_mix}, _from, %__MODULE__{} = mix) do
    producers = run_inputs(streams_to_mix, mix.pid, mix.buffer)

    {:reply, :ok, %{mix | running: true, producers: producers}}
  end

  def handle_call({:register_consumer, {{name, fun}, cons}}, _from, %__MODULE__{} = mix) do
    mix = %{mix | consumers: Map.put(mix.consumers, {name, fun}, cons)}
    {:reply, :ok, mix}
  end

  def handle_call(:stop, _from, %__MODULE__{} = mix) do
    {:stop, :normal, :ok, %{mix | running: false}}
  end

  def handle_call(:__state__, _from, mix), do: {:reply, mix, mix}

  @impl true
  def handle_cast({:new_data, {_name, _fun}, chunk}, %__MODULE__{} = mix) do
    Enum.each(mix.consumers, fn {_, cons} ->
      GenServer.cast(cons.pid, {:put_data, chunk})
      GenServer.cast(cons.pid, :continue)
    end)

    {:noreply, mix}
  end

  def handle_cast({:done, {name, fun}}, %__MODULE__{} = mix) do
    mix = %{mix | producers: Map.delete(mix.producers, {name, fun})}

    if map_size(mix.producers) == 0 do
      Enum.each(mix.consumers, fn {_, cons} ->
        GenServer.cast(cons.pid, :continue)
        GenServer.cast(cons.pid, :stop)
      end)
    end

    {:noreply, mix}
  end

  def handle_cast({:consumer_got_data, {_name, _fun}}, %__MODULE__{} = mix) do
    Enum.each(mix.producers, fn {_, task} ->
      send(task.pid, :continue)
    end)

    {:noreply, mix}
  end

  @impl true
  def handle_info({_task_ref, :ok}, mix) do
    # do nothing for now
    {:noreply, mix}
  end

  def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, mix) do
    # do nothing for now
    {:noreply, mix}
  end
end