# lib/membrane/rtcp/receiver.ex

defmodule Membrane.RTCP.Receiver do
  @moduledoc """
  Element exchanging RTCP packets and jitter buffer statistics.

  Buffers received on the `:input` pad are forwarded unchanged to the `:output` pad.
  Apart from that, the element:

    * remembers timestamp data of the most recent `Membrane.RTCP.SenderReportPacket`
      delivered in a `Membrane.RTCPEvent` on the `:input` pad,
    * every `report_interval` (if set) sends a
      `Membrane.RTCP.ReceiverReport.StatsRequestEvent` through the `:output` pad and,
      once a `Membrane.RTCP.ReceiverReport.StatsEvent` carrying statistics arrives on
      that pad, sends a receiver report wrapped in a `Membrane.RTCPEvent` through the
      `:input` pad,
    * every `fir_interval` (if set), and whenever a `Membrane.KeyframeRequestEvent`
      arrives on the `:output` pad, sends a Full Intra Request (FIR) feedback packet
      wrapped in a `Membrane.RTCPEvent` through the `:input` pad.

  ## Options

    * `local_ssrc` - SSRC put into outgoing packets as their origin (the `ssrc` of
      receiver reports and the `origin_ssrc` of FIR packets),
    * `remote_ssrc` - SSRC of the reported stream (the `ssrc` of report blocks and the
      `target_ssrc` of FIR packets),
    * `report_interval` - period of requesting statistics, `nil` disables the timer,
    * `fir_interval` - period of sending FIR packets, `nil` disables the timer.
  """
  use Membrane.Filter

  alias Membrane.RTCPEvent
  alias Membrane.RTCP.{FeedbackPacket, SenderReportPacket}
  alias Membrane.{RTCP, RTP}
  alias Membrane.RTCP.ReceiverReport
  alias Membrane.Time

  require Membrane.Logger

  # The FIR command sequence number is an 8-bit field that is incremented modulo 256,
  # see https://tools.ietf.org/html/rfc5104#section-4.3.1.1
  @fir_seq_num_modulus 256

  def_input_pad :input, demand_unit: :buffers, caps: :any
  def_output_pad :output, caps: :any

  def_options local_ssrc: [spec: RTP.ssrc_t()],
              remote_ssrc: [spec: RTP.ssrc_t()],
              report_interval: [spec: Membrane.Time.t() | nil, default: nil],
              fir_interval: [spec: Membrane.Time.t() | nil, default: nil]

  # The state is the options map extended with:
  #   * `fir_seq_num` - sequence number of the next FIR packet to be sent,
  #   * `sr_info` - data of the last sender report; empty until the first one arrives.
  @impl true
  def handle_init(opts) do
    {:ok, Map.from_struct(opts) |> Map.merge(%{fir_seq_num: 0, sr_info: %{}})}
  end

  # Starts only the timers whose interval was configured (is not `nil`).
  @impl true
  def handle_prepared_to_playing(_ctx, state) do
    fir_timer =
      if state.fir_interval, do: [start_timer: {:fir_timer, state.fir_interval}], else: []

    report_timer =
      if state.report_interval,
        do: [start_timer: {:report_timer, state.report_interval}],
        else: []

    {{:ok, fir_timer ++ report_timer}, state}
  end

  # Stops exactly the timers that were started in `handle_prepared_to_playing/2`.
  @impl true
  def handle_playing_to_prepared(_ctx, state) do
    fir_timer = if state.fir_interval, do: [stop_timer: :fir_timer], else: []
    report_timer = if state.report_interval, do: [stop_timer: :report_timer], else: []
    {{:ok, fir_timer ++ report_timer}, state}
  end

  # Requests fresh statistics through the `:output` pad; the report itself is built
  # when the matching `StatsEvent` arrives.
  @impl true
  def handle_tick(:report_timer, _ctx, state) do
    {{:ok, event: {:output, %ReceiverReport.StatsRequestEvent{}}}, state}
  end

  @impl true
  def handle_tick(:fir_timer, _ctx, state) do
    send_fir(state)
  end

  # Sender report: keep the data needed to fill `last_sr_timestamp` and
  # `delay_since_sr` of the next receiver report. The event is not forwarded.
  @impl true
  def handle_event(:input, %RTCPEvent{rtcp: %SenderReportPacket{} = rtcp} = event, _ctx, state) do
    # Only the middle 32 bits of the 64-bit NTP timestamp are used as the last SR
    # timestamp, see https://tools.ietf.org/html/rfc3550#section-6.4.1
    <<_wallclock_ts_upper_16_bits::16, wallclock_ts_middle_32_bits::32,
      _wallclock_ts_lower_16_bits::16>> =
      Time.to_ntp_timestamp(rtcp.sender_info.wallclock_timestamp)

    sr_info = %{
      cut_wallclock_ts: wallclock_ts_middle_32_bits,
      arrival_ts: event.arrival_timestamp
    }

    {:ok, %{state | sr_info: sr_info}}
  end

  # Any other RTCP packet is logged and dropped.
  @impl true
  def handle_event(:input, %RTCPEvent{} = event, _ctx, state) do
    Membrane.Logger.warn("Received unknown RTCP packet: #{inspect(event)}")
    {:ok, state}
  end

  # No statistics available, so there is nothing to report.
  @impl true
  def handle_event(:output, %ReceiverReport.StatsEvent{stats: :no_stats}, _ctx, state) do
    {:ok, state}
  end

  # Builds a receiver report from the received statistics and sends it through the
  # `:input` pad.
  @impl true
  def handle_event(:output, %ReceiverReport.StatsEvent{stats: stats}, _ctx, state) do
    now = Time.vm_time()
    # When no sender report has been received yet, `sr_info` is empty, which makes
    # both `delay_since_sr` and `last_sr_timestamp` equal to 0.
    delay_since_sr = now - Map.get(state.sr_info, :arrival_ts, now)

    report_block = %RTCP.ReportPacketBlock{
      ssrc: state.remote_ssrc,
      fraction_lost: stats.fraction_lost,
      total_lost: stats.total_lost,
      highest_seq_num: stats.highest_seq_num,
      interarrival_jitter: trunc(stats.interarrival_jitter),
      last_sr_timestamp: Map.get(state.sr_info, :cut_wallclock_ts, 0),
      # delay_since_sr is expressed in 1/65536 seconds, see https://tools.ietf.org/html/rfc3550#section-6.4.1
      delay_since_sr: Time.to_seconds(65_536 * delay_since_sr)
    }

    packet = %RTCP.ReceiverReportPacket{ssrc: state.local_ssrc, reports: [report_block]}
    {{:ok, event: {:input, %RTCPEvent{rtcp: packet}}}, state}
  end

  # A keyframe request arriving on the `:output` pad is translated into a FIR packet.
  @impl true
  def handle_event(:output, %Membrane.KeyframeRequestEvent{}, _ctx, state) do
    send_fir(state)
  end

  # Everything else gets the default handling provided by `Membrane.Filter`.
  @impl true
  def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

  @impl true
  def handle_demand(:output, size, :buffers, _ctx, state) do
    {{:ok, demand: {:input, size}}, state}
  end

  @impl true
  def handle_process(:input, buffer, _ctx, state) do
    {{:ok, buffer: {:output, buffer}}, state}
  end

  # Builds a FIR feedback packet addressed to `remote_ssrc`, sends it wrapped in an
  # `RTCPEvent` through the `:input` pad and advances the FIR sequence number.
  defp send_fir(state) do
    rtcp = %FeedbackPacket{
      origin_ssrc: state.local_ssrc,
      payload: %FeedbackPacket.FIR{
        target_ssrc: state.remote_ssrc,
        seq_num: state.fir_seq_num
      }
    }

    event = %RTCPEvent{rtcp: rtcp}

    # Wrap the counter so that it always fits the 8-bit field of the FIR entry
    # instead of growing without bound.
    state = %{state | fir_seq_num: rem(state.fir_seq_num + 1, @fir_seq_num_modulus)}
    {{:ok, event: {:input, event}}, state}
  end
end