# lib/mixer.ex

defmodule Strom.Mixer do
  @moduledoc """
  Mixes several streams into a single stream.

  A mixer is a `GenServer` that owns a buffer of events. When `stream/1` is
  called, every registered stream is consumed in its own `Task`; each task
  pushes chunks of `:chunk_every` events into the buffer. The stream returned
  by `stream/1` drains that buffer until all source streams have finished and
  the buffer is empty.

  Events coming from the same source stream keep their relative order; events
  from different source streams are interleaved in arrival order.

  Producers are slowed down (see `maybe_wait/2`) once the buffer grows beyond
  ten chunks, which gives a simple form of back-pressure.
  """

  use GenServer

  defstruct [:streams, :pid, :running, :data, :chunk_every]

  @type t :: %__MODULE__{
          streams: MapSet.t(Enumerable.t()) | [Enumerable.t()] | nil,
          pid: pid() | nil,
          running: boolean() | nil,
          data: list() | nil,
          chunk_every: pos_integer() | nil
        }

  @chunk_every 100

  # Client API

  @doc """
  Starts a mixer process (linked to the caller) for the given `streams`.

  ## Options

    * `:chunk_every` - how many events a producer sends to the mixer per
      message (defaults to `#{@chunk_every}`).

  Returns the mixer state as seen by the freshly started process.
  """
  @spec start([Enumerable.t()], keyword()) :: t()
  def start(streams, opts \\ []) when is_list(streams) and is_list(opts) do
    state = %__MODULE__{
      streams: streams,
      running: false,
      chunk_every: Keyword.get(opts, :chunk_every, @chunk_every)
    }

    {:ok, pid} = GenServer.start_link(__MODULE__, state)
    __state__(pid)
  end

  @doc """
  Starts consuming all registered streams and returns the mixed stream.

  The producers are started immediately, when this function is called; the
  returned stream is lazy and only pulls buffered data when it is enumerated.
  The stream halts once every source stream has finished and the buffer has
  been drained. The mixer process itself is left running; use `stop/1`.
  """
  @spec stream(t()) :: Enumerable.t()
  def stream(%__MODULE__{} = mixer) do
    :ok = GenServer.call(mixer.pid, :run_streams)

    Stream.resource(
      fn -> mixer end,
      fn mixer ->
        # NOTE: the server answers `{:ok, []}` while the buffer is empty but
        # producers are still running, so the consumer polls in that case.
        case GenServer.call(mixer.pid, :get_data) do
          {:ok, data} ->
            {data, mixer}

          {:error, :done} ->
            {:halt, mixer}
        end
      end,
      fn mixer -> mixer end
    )
  end

  @doc """
  Stops the mixer process. Returns `:ok`.
  """
  @spec stop(t()) :: :ok
  def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)

  @doc """
  Registers one more `stream` with the mixer.

  If the mixer is currently running, the stream starts being consumed right
  away; otherwise it is started by the next `stream/1` call. Returns the
  updated mixer state.
  """
  @spec add(t(), Enumerable.t()) :: t()
  def add(%__MODULE__{pid: pid}, stream), do: GenServer.call(pid, {:add, stream})

  @doc false
  @spec __state__(pid()) :: t()
  def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

  # Server callbacks

  @impl true
  def init(%__MODULE__{streams: streams} = mixer) do
    {:ok, %{mixer | pid: self(), data: [], streams: MapSet.new(streams)}}
  end

  @impl true
  def handle_call({:new_data, data}, _from, %__MODULE__{data: prev_data} = mixer) do
    # Append, so that chunks of the same stream are delivered in the order
    # they were produced. The buffer length is returned to the producer,
    # which uses it to decide whether to back off.
    data = prev_data ++ data

    {:reply, length(data), %{mixer | data: data}}
  end

  def handle_call(:run_streams, _from, %__MODULE__{} = mixer) do
    run_streams(mixer.streams, mixer.pid, mixer.chunk_every)

    {:reply, :ok, %{mixer | running: true}}
  end

  def handle_call({:done, stream}, _from, %__MODULE__{streams: streams} = mixer) do
    # Only forget the finished stream. `running` is left untouched because
    # other producers may still be active and `add/2` relies on the flag to
    # decide whether a new stream has to be started immediately.
    {:reply, :ok, %{mixer | streams: MapSet.delete(streams, stream)}}
  end

  def handle_call(:get_data, _from, %__MODULE__{data: [], streams: streams} = mixer) do
    if MapSet.size(streams) == 0 do
      # Nothing buffered and no producer left: the run is over.
      {:reply, {:error, :done}, %{mixer | running: false}}
    else
      # Producers are still active but have not delivered anything new yet.
      {:reply, {:ok, []}, mixer}
    end
  end

  def handle_call(:get_data, _from, %__MODULE__{data: data} = mixer) do
    {:reply, {:ok, data}, %{mixer | data: []}}
  end

  def handle_call(:stop, _from, %__MODULE__{} = mixer) do
    {:stop, :normal, :ok, %{mixer | running: false}}
  end

  def handle_call({:add, stream}, _from, %__MODULE__{streams: streams} = mixer) do
    mixer = %{mixer | streams: MapSet.put(streams, stream)}

    if mixer.running do
      async_run_stream(stream, mixer.chunk_every, mixer.pid)
    end

    {:reply, mixer, mixer}
  end

  def handle_call(:__state__, _from, mixer), do: {:reply, mixer, mixer}

  # Producer tasks are started with `Task.async/1` and never awaited, so both
  # their reply and their `:DOWN` notification arrive here as plain messages.
  @impl true
  def handle_info({_task_ref, :ok}, mixer) do
    # do nothing for now
    {:noreply, mixer}
  end

  def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, mixer) do
    # do nothing for now
    {:noreply, mixer}
  end

  # Private helpers

  # Starts one producer task per registered stream.
  defp run_streams(streams, pid, chunk_every) do
    Enum.each(streams, fn stream ->
      async_run_stream(stream, chunk_every, pid)
    end)
  end

  # Consumes `stream` in a task, pushing chunks of `chunk_every` events to the
  # mixer process `pid` and reporting `{:done, stream}` at the end.
  defp async_run_stream(stream, chunk_every, pid) do
    Task.async(fn ->
      stream
      |> Stream.chunk_every(chunk_every)
      |> Stream.each(fn chunk ->
        data_length = GenServer.call(pid, {:new_data, chunk})
        maybe_wait(data_length, chunk_every)
      end)
      |> Stream.run()

      GenServer.call(pid, {:done, stream})
    end)
  end

  # Back-pressure: once the buffer holds more than ten chunks' worth of
  # events, the producer sleeps for 2^k milliseconds, where k is the number
  # of whole multiples of that threshold currently buffered.
  defp maybe_wait(data_length, chunk_every) do
    threshold = 10 * chunk_every

    if data_length > threshold do
      overflow = div(data_length, threshold)
      Process.sleep(trunc(:math.pow(2, overflow)))
    end
  end
end