lib/membrane_rtp_vp8/depayloader.ex

defmodule Membrane.RTP.VP8.Depayloader do
  @moduledoc """
  Depayloads VP8 frames from RTP packets according to: https://tools.ietf.org/html/rfc7741
  """

  use Membrane.Filter

  require Membrane.Logger

  alias Membrane.RTP.VP8.Frame
  alias Membrane.{Buffer, RemoteStream, RTP}
  alias Membrane.Event.Discontinuity
  alias Membrane.VP8

  @type sequence_number :: 0..65_535

  def_input_pad :input, accepted_format: RTP
  def_output_pad :output, accepted_format: %RemoteStream{content_format: VP8, type: :packetized}

  defmodule State do
    @moduledoc false

    @type t :: %__MODULE__{
            frame_acc: Frame.t(),
            first_buffer: nil | Buffer.t()
          }
    defstruct frame_acc: %Frame{}, first_buffer: nil
  end

  @impl true
  def handle_init(_ctx, _options), do: {[], %State{}}

  @impl true
  def handle_stream_format(:input, _format, _context, state) do
    stream_format = %RemoteStream{content_format: VP8, type: :packetized}
    {[stream_format: {:output, stream_format}], state}
  end

  @impl true
  def handle_event(:input, %Discontinuity{} = event, _ctx, state),
    do: {[forward: event], %State{state | frame_acc: %Frame{}}}

  @impl true
  def handle_end_of_stream(:input, _ctx, %State{first_buffer: nil} = state) do
    {[end_of_stream: :output], state}
  end

  @impl true
  def handle_end_of_stream(:input, _ctx, state) do
    {frame, _acc} = Frame.flush(state.frame_acc)

    {[
       buffer: {:output, %Buffer{state.first_buffer | payload: frame}},
       end_of_stream: :output
     ], %State{}}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    state = %{state | first_buffer: state.first_buffer || buffer}

    case parse_buffer(buffer, state) do
      {:ok, actions, new_state} ->
        {actions, new_state}

      {:error, reason} ->
        log_malformed_buffer(buffer, reason)
        {[], %State{}}
    end
  end

  defp parse_buffer(buffer, state) do
    case Frame.parse(buffer, state.frame_acc) do
      {:ok, :incomplete, acc} ->
        {:ok, [], %State{state | frame_acc: acc}}

      {:ok, frame, acc} ->
        {:ok, [buffer: {:output, %{state.first_buffer | payload: frame}}],
         %State{state | frame_acc: acc, first_buffer: buffer}}

      {:error, _} = error ->
        error
    end
  end

  defp log_malformed_buffer(packet, reason) do
    Membrane.Logger.warning("""
    An error occurred while parsing RTP packet.
    Reason: #{reason}
    Packet: #{inspect(packet, limit: :infinity)}
    """)
  end
end