lib/membrane_raw_video/parser.ex

defmodule Membrane.RawVideo.Parser do
  @moduledoc """
  Simple module responsible for splitting the incoming buffers into
  frames of raw (uncompressed) video in the desired format.

  The parser sends the proper stream_format when it moves to the playing state.
  No data analysis is done; this element simply ensures that
  the resulting packets have the proper size.
  """
  use Membrane.Filter
  alias Membrane.{Buffer, Payload}
  alias Membrane.{RawVideo, RemoteStream}

  # Input pad: accepts a `RemoteStream`; demand is expressed in bytes and
  # handled automatically (`demand_mode: :auto`).
  def_input_pad :input,
    demand_unit: :bytes,
    demand_mode: :auto,
    accepted_format: RemoteStream

  # Output pad: its accepted format is a `RawVideo` struct with `aligned: true`.
  def_output_pad :output,
    demand_mode: :auto,
    accepted_format: %RawVideo{aligned: true}

  # Element options. `pixel_format`, `width` and `height` declare no default,
  # while `framerate` defaults to `{0, 1}`.
  def_options pixel_format: [
                spec: RawVideo.pixel_format_t(),
                description: """
                Format used to encode pixels of the video frame.
                """
              ],
              width: [
                spec: pos_integer(),
                description: """
                Width of a frame in pixels.
                """
              ],
              height: [
                spec: pos_integer(),
                description: """
                Height of a frame in pixels.
                """
              ],
              framerate: [
                spec: RawVideo.framerate_t(),
                default: {0, 1},
                description: """
                Framerate of video stream. Passed forward in stream_format.
                """
              ]

  # Pixel formats accepted by `handle_init/2`; any other value makes it raise.
  @supported_formats [:I420, :I422, :I444, :RGB, :BGRA, :RGBA, :NV12, :NV21, :YV12, :AYUV, :YUY2]

  # Validates the element options and builds the initial state.
  #
  # Raises when `opts.pixel_format` is not listed in `@supported_formats`, or
  # when `RawVideo.frame_size/3` returns an error for the requested pixel
  # format and dimensions.
  #
  # Returns no actions and a state map holding:
  #   * `:stream_format`  - the `RawVideo` format later sent on the output pad,
  #   * `:timestamp`      - running timestamp of the next frame, starting at 0,
  #   * `:frame_duration` - amount added to the timestamp after each frame,
  #   * `:frame_size`     - size of a single frame, compared against the byte
  #                         length of the queued payloads,
  #   * `:queue`          - payloads received so far that do not yet form a frame.
  @impl true
  def handle_init(_ctx, opts) do
    if opts.pixel_format not in @supported_formats do
      raise """
      Unsupported frame pixel format: #{inspect(opts.pixel_format)}
      The elements supports: #{Enum.map_join(@supported_formats, ", ", &inspect/1)}
      """
    end

    frame_size =
      case RawVideo.frame_size(opts.pixel_format, opts.width, opts.height) do
        {:ok, frame_size} ->
          frame_size

        {:error, :invalid_dimensions} ->
          raise "Provided dimensions (#{opts.width}x#{opts.height}) are invalid for #{inspect(opts.pixel_format)} pixel format"

        # Any other error reason would otherwise surface as an opaque
        # CaseClauseError; report it with the offending options instead.
        {:error, reason} ->
          raise "Cannot determine frame size for #{inspect(opts.pixel_format)} pixel format " <>
                  "and dimensions #{opts.width}x#{opts.height}: #{inspect(reason)}"
      end

    stream_format = %RawVideo{
      pixel_format: opts.pixel_format,
      width: opts.width,
      height: opts.height,
      framerate: opts.framerate,
      aligned: true
    }

    # A zero numerator cannot be used as a divisor, so the duration is 0 in
    # that case; bump_timestamp/1 then leaves the timestamp untouched.
    {num, denom} = stream_format.framerate
    frame_duration = if num == 0, do: 0, else: Ratio.new(denom * Membrane.Time.second(), num)

    {[],
     %{
       stream_format: stream_format,
       timestamp: 0,
       frame_duration: frame_duration,
       frame_size: frame_size,
       queue: []
     }}
  end

  # Announces the stream format stored in the state on the :output pad once
  # the element starts playing. The state is returned unchanged.
  @impl true
  def handle_playing(_ctx, state) do
    actions = [stream_format: {:output, state.stream_format}]
    {actions, state}
  end

  # The incoming stream format is intentionally not forwarded: the format sent
  # downstream is the one kept in the state and emitted by handle_playing/2.
  @impl true
  def handle_stream_format(:input, _stream_format, _ctx, state), do: {[], state}

  # Accumulates incoming payloads and, once at least one full frame worth of
  # bytes is queued, emits one buffer per complete frame on the :output pad.
  #
  # Each emitted buffer gets the floored running timestamp as its pts, after
  # which the timestamp is advanced with bump_timestamp/1. Bytes left over
  # after the last complete frame stay queued for the next call.
  @impl true
  def handle_process_list(:input, buffers, _ctx, state) do
    %{frame_size: frame_size} = state

    incoming = Enum.map(buffers, fn %Buffer{payload: payload} -> Payload.to_binary(payload) end)

    # The queue is kept newest-first; every entry is itself iodata in arrival order.
    pending = [incoming | state.queue]

    if IO.iodata_length(pending) >= frame_size do
      {frames, leftover} =
        pending
        |> Enum.reverse()
        |> IO.iodata_to_binary()
        |> Bunch.Binary.chunk_every_rem(frame_size)

      {out_buffers, new_state} =
        Enum.map_reduce(frames, state, fn frame, acc ->
          buffer = %Buffer{payload: frame, pts: Ratio.floor(acc.timestamp)}
          {buffer, bump_timestamp(acc)}
        end)

      {[buffer: {:output, out_buffers}], %{new_state | queue: [leftover]}}
    else
      # Not enough data for a single frame yet; keep accumulating.
      {[], %{state | queue: pending}}
    end
  end

  # Advances the running timestamp by one frame duration.
  # With a zero framerate numerator the state is returned untouched.
  defp bump_timestamp(%{stream_format: %{framerate: {0, _denom}}} = state), do: state

  defp bump_timestamp(%{timestamp: current, frame_duration: step} = state) do
    # `use Ratio` keeps `+` working when either operand is a Ratio struct.
    use Ratio
    %{state | timestamp: current + step}
  end
end