lib/membrane/rtcp/receiver.ex

defmodule Membrane.RTCP.Receiver do
  @moduledoc """
  Element exchanging RTCP packets and RTCP receiver statistics.
  """
  use Membrane.Filter

  require Membrane.Logger
  require Membrane.TelemetryMetrics
  alias Membrane.RTCP.ReceiverReport
  alias Membrane.RTCP.{FeedbackPacket, SenderReportPacket}
  alias Membrane.RTCPEvent
  alias Membrane.Time
  alias Membrane.{RTCP, RTP}

  def_input_pad :input, caps: :any, demand_mode: :auto
  def_output_pad :output, caps: :any, demand_mode: :auto

  def_options local_ssrc: [spec: RTP.ssrc_t()],
              remote_ssrc: [spec: RTP.ssrc_t()],
              report_interval: [spec: Membrane.Time.t() | nil, default: nil],
              telemetry_label: [spec: Membrane.TelemetryMetrics.label(), default: []]

  @fir_throttle_duration Application.compile_env(
                           :membrane_rtp_plugin,
                           :fir_throttle_duration_ms,
                           500
                         )
                         |> Membrane.Time.milliseconds()

  @fir_telemetry_event [Membrane.RTP, :rtcp, :fir, :sent]

  @impl true
  def handle_init(opts) do
    Membrane.TelemetryMetrics.register(@fir_telemetry_event, opts.telemetry_label)

    state =
      opts
      |> Map.from_struct()
      |> Map.merge(%{fir_seq_num: 0, last_fir_timestamp: 0, sr_info: %{}})

    {:ok, state}
  end

  @impl true
  def handle_prepared_to_playing(_ctx, state) do
    report_timer =
      if state.report_interval,
        do: [start_timer: {:report_timer, state.report_interval}],
        else: []

    {{:ok, report_timer}, state}
  end

  @impl true
  def handle_playing_to_prepared(_ctx, state) do
    report_timer = if state.report_interval, do: [stop_timer: :report_timer], else: []
    {{:ok, report_timer}, state}
  end

  @impl true
  def handle_tick(:report_timer, _ctx, state) do
    {{:ok, event: {:output, %ReceiverReport.StatsRequestEvent{}}}, state}
  end

  @impl true
  def handle_event(:input, %RTCPEvent{rtcp: %SenderReportPacket{} = rtcp} = event, _ctx, state) do
    <<_wallclock_ts_upper_16_bits::16, wallclock_ts_middle_32_bits::32,
      _wallclock_ts_lower_16_bits::16>> =
      Time.to_ntp_timestamp(rtcp.sender_info.wallclock_timestamp)

    sr_info = %{
      cut_wallclock_ts: wallclock_ts_middle_32_bits,
      arrival_ts: event.arrival_timestamp
    }

    {:ok, %{state | sr_info: sr_info}}
  end

  @impl true
  def handle_event(:input, %RTCPEvent{} = event, _ctx, state) do
    Membrane.Logger.error("Unexpected RTCPEvent: #{inspect(event)}")
    {:ok, state}
  end

  @impl true
  def handle_event(:output, %ReceiverReport.StatsEvent{stats: :no_stats}, _ctx, state) do
    {:ok, state}
  end

  @impl true
  def handle_event(:output, %ReceiverReport.StatsEvent{stats: stats}, _ctx, state) do
    now = Time.vm_time()
    delay_since_sr = now - Map.get(state.sr_info, :arrival_ts, now)

    report_block = %RTCP.ReportPacketBlock{
      ssrc: state.remote_ssrc,
      fraction_lost: stats.fraction_lost,
      total_lost: stats.total_lost,
      highest_seq_num: stats.highest_seq_num,
      interarrival_jitter: trunc(stats.interarrival_jitter),
      last_sr_timestamp: Map.get(state.sr_info, :cut_wallclock_ts, 0),
      # delay_since_sr is expressed in 1/65536 seconds, see https://tools.ietf.org/html/rfc3550#section-6.4.1
      delay_since_sr: Time.to_seconds(65_536 * delay_since_sr)
    }

    packet = %RTCP.ReceiverReportPacket{ssrc: state.local_ssrc, reports: [report_block]}
    {{:ok, event: {:input, %RTCPEvent{rtcp: packet}}}, state}
  end

  @impl true
  def handle_event(:output, %Membrane.KeyframeRequestEvent{}, _ctx, state) do
    send_fir(state)
  end

  @impl true
  def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

  @impl true
  def handle_process(:input, buffer, _ctx, state) do
    {{:ok, buffer: {:output, buffer}}, state}
  end

  defp send_fir(state) do
    now = Time.vm_time()

    if now - state.last_fir_timestamp > @fir_throttle_duration do
      rtcp = %FeedbackPacket{
        origin_ssrc: state.local_ssrc,
        payload: %FeedbackPacket.FIR{
          target_ssrc: state.remote_ssrc,
          seq_num: state.fir_seq_num
        }
      }

      Membrane.TelemetryMetrics.execute(@fir_telemetry_event, %{}, %{}, state.telemetry_label)

      event = %RTCPEvent{rtcp: rtcp}
      state = %{state | fir_seq_num: state.fir_seq_num + 1, last_fir_timestamp: now}
      Membrane.Logger.info("Sending FIR to #{state.remote_ssrc}")
      {{:ok, event: {:input, event}}, state}
    else
      Membrane.Logger.debug("Not sending FIR to #{state.remote_ssrc} due to throttling")
      {:ok, state}
    end
  end
end