lib/y4m/buffer.ex

defmodule Y4m.Buffer do
  @moduledoc """
  Data buffer for fragmented y4m streams.
  Buffer will accept data and return either the properties or frames, once enough data is present.
  """
  use GenServer

  alias Y4m.FrameParser
  alias Y4m.HeaderParser

  @spec start_link(GenServer.options()) :: GenServer.on_start()
  def start_link(opts), do: GenServer.start_link(__MODULE__, [], opts)

  @doc """
  Push data into the buffer.
  """
  @spec push(atom | pid | {atom, any} | {:via, atom, any}, binary()) ::
          {:error, :need_more_data} | {:frames, [binary()]} | {:props, map()}
  def push(buffer, data), do: GenServer.call(buffer, {:push, data})

  defmodule State do
    defstruct collecting_header: true, buffer: "", frame_size: 0
  end

  @impl GenServer
  @spec init([]) :: {:ok, any}
  def init([]) do
    {:ok, %State{}}
  end

  @impl GenServer
  def handle_call({:push, data}, _from, state = %State{collecting_header: true}) do
    buffer = state.buffer <> data

    {reply, state} =
      case :binary.split(buffer, "\n") do
        [buffer] ->
          new_state = %State{state | buffer: buffer}
          {{:error, :need_more_data}, new_state}

        [header, rest] ->
          {:ok, props} = HeaderParser.parse(header)

          frame_size =
            FrameParser.plane_lengths(props.color_space, props.width, props.height)
            |> Enum.sum()

          new_state = %State{
            state
            | buffer: rest,
              collecting_header: false,
              frame_size: frame_size
          }

          {{:props, props}, new_state}
      end

    {:reply, reply, state}
  end

  @impl GenServer
  def handle_call({:push, data}, _from, state = %State{collecting_header: false}) do
    buffer = state.buffer <> data
    frames = split_frames(buffer, state.frame_size)

    if Enum.empty?(frames) do
      {:reply, {:error, :need_more_data}, %State{state | buffer: buffer}}
    else
      cutoff_index = length(frames) * (state.frame_size + 6)
      buffer = :binary.part(buffer, cutoff_index, byte_size(buffer) - cutoff_index)
      {:reply, {:frames, frames}, %State{state | buffer: buffer}}
    end
  end

  defp split_frames(buffer, frame_size) when byte_size(buffer) < frame_size + 6, do: []

  defp split_frames(buffer, frame_size) when byte_size(buffer) >= frame_size + 6 do
    <<"FRAME\n", buffer::binary>> = buffer
    <<frame::size(frame_size * 8), rest::binary>> = buffer

    [<<frame::size(frame_size * 8)>> | split_frames(rest, frame_size)]
  end
end