lib/membrane/rtp/parser.ex

defmodule Membrane.RTP.Parser do
  @moduledoc """
  Identifies RTP/RTCP packets, then tries to parse RTP packet (parsing header and preparing payload)
  and forwards RTCP packet to `:rtcp_output` pad unchanged.

  ## Encrypted packets
  In case of SRTP/SRTCP the parser tries to parse just the header of the RTP packet as the packet's payload
  is encrypted and must be passed as a whole to the decryptor. The whole packet remains unchanged but
  the parsed header gets  attached to `Membrane.Buffer`'s metadata.

  SRTP is treated the same as RTCP and all packets gets forwarded to `:rtcp_output` pad.

  ## Parsed packets
  In both cases, encrypted and unencryptd, parsed header is put into the metadata field in `Membrane.Buffer` under `:rtp` key.
  with the following metadata `:timestamp`, `:sequence_number`, `:ssrc`, `:payload_type`,
  `:marker`, `:extension`. See `Membrane.RTP.Header` for their meaning and specifications.
  """

  use Membrane.Filter

  require Membrane.Logger
  alias Membrane.Buffer
  alias Membrane.{RemoteStream, RTCP, RTCPEvent, RTP}

  @metadata_fields [
    :timestamp,
    :sequence_number,
    :ssrc,
    :csrcs,
    :payload_type,
    :marker,
    :extensions
  ]

  def_options secure?: [
                type: :boolean,
                default: false,
                description: """
                Specifies whether Parser should expect packets that are encrypted or not.
                Requires adding [srtp](https://github.com/membraneframework/elixir_libsrtp) dependency to work.
                """
              ]

  def_input_pad :input,
    accepted_format:
      %RemoteStream{type: :packetized, content_format: cf} when cf in [nil, RTP, RTCP],
    demand_mode: :auto

  def_output_pad :output, accepted_format: RTP, demand_mode: :auto

  def_output_pad :rtcp_output,
    mode: :push,
    accepted_format: %RemoteStream{content_format: RTCP, type: :packetized},
    availability: :on_request

  @impl true
  def handle_init(_ctx, opts) do
    {[], %{rtcp_output_pad: nil, secure?: opts.secure?}}
  end

  @impl true
  def handle_stream_format(:input, _stream_format, _ctx, state) do
    {[stream_format: {:output, %RTP{}}], state}
  end

  @impl true
  def handle_pad_added(Pad.ref(:rtcp_output, _ref) = pad, %{playback_state: :playing}, state) do
    actions = [stream_format: {pad, %RemoteStream{content_format: RTCP, type: :packetized}}]
    {actions, %{state | rtcp_output_pad: pad}}
  end

  @impl true
  def handle_pad_added(Pad.ref(:rtcp_output, _ref) = pad, _ctx, state) do
    {[], %{state | rtcp_output_pad: pad}}
  end

  @impl true
  def handle_playing(_ctx, %{rtcp_output_pad: pad} = state) when pad != nil do
    actions = [stream_format: {pad, %RemoteStream{content_format: RTCP, type: :packetized}}]
    {actions, state}
  end

  @impl true
  def handle_playing(ctx, state) do
    super(ctx, state)
  end

  @impl true
  def handle_process(:input, %Buffer{payload: payload, metadata: metadata} = buffer, _ctx, state) do
    with :rtp <- RTP.Packet.identify(payload),
         {:ok,
          %{packet: packet, padding_size: padding_size, total_header_size: total_header_size}} <-
           RTP.Packet.parse(payload, state.secure?) do
      %RTP.Packet{payload: payload, header: header} = packet

      rtp =
        header
        |> Map.take(@metadata_fields)
        |> Map.merge(%{padding_size: padding_size, total_header_size: total_header_size})

      metadata = Map.put(metadata, :rtp, rtp)
      {[buffer: {:output, %Buffer{payload: payload, metadata: metadata}}], state}
    else
      :rtcp ->
        case state.rtcp_output_pad do
          nil -> {[], state}
          pad -> {[buffer: {pad, buffer}], state}
        end

      {:error, reason} ->
        Membrane.Logger.debug("""
        Couldn't parse rtp packet:
        #{inspect(payload, limit: :infinity)}
        Reason: #{inspect(reason)}. Ignoring packet.
        """)

        {[], state}
    end
  end

  @impl true
  def handle_event(:output, %RTCPEvent{} = event, _ctx, state) do
    case state.rtcp_output_pad do
      nil ->
        {[], state}

      pad ->
        {[event: {pad, event}], state}
    end
  end

  @impl true
  def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)
end