# lib/membrane/rtcp/parser.ex

defmodule Membrane.RTCP.Parser do
  @moduledoc """
  Element responsible for receiving raw RTCP packets, parsing them and emitting proper RTCP events.

  Each buffer received on the `:input` pad is parsed with `Membrane.RTCP.Packet.parse/1`.
  Depending on the type of every parsed packet:

    * FIR / PLI feedback packets and sender reports are sent as `Membrane.RTCPEvent`s
      through the `:output` pad,
    * every report of a receiver report packet is sent as a separate `Membrane.RTCPEvent`
      through the `:output` pad,
    * TWCC feedback is emitted as a `{:twcc_feedback, feedback}` notification,
    * REMB, BYE and SDES packets are ignored,
    * any other packet is logged and ignored.

  `Membrane.RTCPEvent`s received on the `:output` pad are serialized and sent as buffers
  through the `:receiver_report_output` pad.
  """

  use Membrane.Filter

  require Membrane.Logger
  alias Membrane.Buffer
  alias Membrane.{RemoteStream, RTCP, RTCPEvent}

  def_input_pad :input,
    caps: {RemoteStream, type: :packetized, content_format: one_of([nil, RTCP])},
    demand_mode: :auto

  def_output_pad :output, caps: RTCP, demand_mode: :auto

  def_output_pad :receiver_report_output,
    mode: :push,
    caps: {RemoteStream, type: :packetized, content_format: RTCP}

  # The element keeps no state of its own.
  @impl true
  def handle_init(_opts) do
    {:ok, %{}}
  end

  # Caps for `:receiver_report_output` do not depend on the input, so they are
  # sent as soon as the element starts playing.
  @impl true
  def handle_prepared_to_playing(_ctx, state) do
    caps = %RemoteStream{type: :packetized, content_format: RTCP}
    {{:ok, caps: {:receiver_report_output, caps}}, state}
  end

  # Whatever caps arrive on `:input`, `:output` always gets `%RTCP{}` caps.
  @impl true
  def handle_caps(:input, _caps, _ctx, state) do
    {{:ok, caps: {:output, %RTCP{}}}, state}
  end

  # Parses the buffer payload and turns every parsed packet into actions.
  # A payload that cannot be parsed is logged and dropped - it never crashes the element.
  @impl true
  def handle_process(:input, %Buffer{payload: payload, metadata: metadata}, _ctx, state) do
    case RTCP.Packet.parse(payload) do
      {:ok, packets} ->
        actions = process_packets(packets, metadata)
        {{:ok, actions}, state}

      {:error, reason} ->
        Membrane.Logger.warn("""
        Couldn't parse rtcp packet:
        #{inspect(payload, limit: :infinity)}
        Reason: #{inspect(reason)}. Ignoring packet.
        """)

        {:ok, state}
    end
  end

  # RTCP events arriving on `:output` are serialized and pushed out as buffers
  # through `:receiver_report_output`.
  @impl true
  def handle_event(:output, %RTCPEvent{} = event, _ctx, state) do
    buffer = %Buffer{payload: RTCP.Packet.serialize(event.rtcp)}
    {{:ok, buffer: {:receiver_report_output, buffer}}, state}
  end

  # Every other event gets the default handling.
  @impl true
  def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

  # Builds a flat list of actions out of all the packets parsed from a single buffer.
  defp process_packets(rtcp, metadata) do
    Enum.flat_map(rtcp, &process_rtcp(&1, metadata))
  end

  # Converts a single parsed RTCP packet into a (possibly empty) list of actions.

  # Keyframe requests (FIR / PLI) are forwarded as an event through `:output`.
  # NOTE(review): RFC 5104 (section 4.3.1.2) says that for FIR the "SSRC of media source"
  # header field is unused (set to 0) and the target SSRCs live in the FCI entries -
  # confirm that `packet.target_ssrc` is the intended SSRC for FIR packets.
  defp process_rtcp(%RTCP.FeedbackPacket{payload: %keyframe_request{}} = packet, metadata)
       when keyframe_request in [RTCP.FeedbackPacket.FIR, RTCP.FeedbackPacket.PLI] do
    event = to_rtcp_event(packet, packet.target_ssrc, metadata)
    [event: {:output, event}]
  end

  # TWCC feedback is not turned into an event - it is emitted as a notification.
  defp process_rtcp(
         %RTCP.TransportFeedbackPacket{payload: %RTCP.TransportFeedbackPacket.TWCC{} = feedback},
         _metadata
       ) do
    [notify: {:twcc_feedback, feedback}]
  end

  # Sender reports are forwarded as an event tagged with the sender's SSRC.
  defp process_rtcp(%RTCP.SenderReportPacket{ssrc: ssrc} = packet, metadata) do
    event = to_rtcp_event(packet, ssrc, metadata)
    [event: {:output, event}]
  end

  # A receiver report packet is split into one event per report,
  # each tagged with the SSRC of that report.
  defp process_rtcp(%RTCP.ReceiverReportPacket{reports: reports}, metadata) do
    Enum.map(reports, fn report ->
      event = to_rtcp_event(report, report.ssrc, metadata)
      {:event, {:output, event}}
    end)
  end

  defp process_rtcp(
         %RTCP.FeedbackPacket{
           payload: %RTCP.FeedbackPacket.AFB{message: "REMB" <> _remb_data}
         },
         _metadata
       ) do
    # maybe TODO: handle REMB extension
    # Even though we do not support REMB and do not advertise such support in SDP,
    # browsers ignore that and send REMB packets for video as part of the sender report.
    []
  end

  # BYE packets are only logged; no action is generated for them.
  defp process_rtcp(%RTCP.ByePacket{ssrcs: ssrcs}, _metadata) do
    Membrane.Logger.debug("SSRCs #{inspect(ssrcs)} are leaving (received RTCP Bye)")
    []
  end

  defp process_rtcp(%RTCP.SdesPacket{}, _metadata) do
    # We don't care about SdesPacket. It is usually included in a compound packet
    # together with SenderReportPacket or ReceiverReportPacket.
    []
  end

  # Fallback: anything not handled above is logged together with the buffer metadata
  # and otherwise ignored.
  defp process_rtcp(unknown_packet, metadata) do
    Membrane.Logger.warn("""
    Unhandled RTCP packet
    #{inspect(unknown_packet, pretty: true, limit: :infinity)}
    #{inspect(metadata, pretty: true)}
    """)

    []
  end

  # Wraps `rtcp_packet` in an `RTCPEvent` addressed to a single `ssrc`.
  # The arrival timestamp is taken from the `:arrival_ts` key of the buffer metadata.
  # Only when that key is missing is the current VM time read and used instead
  # (hence `Map.get_lazy/3` rather than an eagerly computed default).
  defp to_rtcp_event(rtcp_packet, ssrc, metadata) do
    %RTCPEvent{
      rtcp: rtcp_packet,
      ssrcs: [ssrc],
      arrival_timestamp: Map.get_lazy(metadata, :arrival_ts, &Membrane.Time.vm_time/0)
    }
  end
end