lib/membrane/rtp/stream_receive_bin.ex

defmodule Membrane.RTP.StreamReceiveBin do
  @moduledoc """
  This bin gets a parsed RTP stream on input and outputs raw media stream.

  Its responsibility is to depayload the RTP stream and compensate the
  jitter.
  """

  use Membrane.Bin

  alias Membrane.{RTCP, RTP, SRTP}

  def_options srtp_policies: [
                spec: [ExLibSRTP.Policy.t()],
                default: []
              ],
              secure?: [
                spec: boolean(),
                default: false
              ],
              extensions: [
                spec: [RTP.SessionBin.extension_t()],
                default: []
              ],
              clock_rate: [
                spec: RTP.clock_rate_t()
              ],
              depayloader: [spec: module() | nil],
              local_ssrc: [spec: RTP.ssrc_t()],
              remote_ssrc: [spec: RTP.ssrc_t()],
              rtcp_report_interval: [spec: Membrane.Time.t() | nil],
              telemetry_label: [
                spec: [{atom(), any()}],
                default: []
              ]

  def_input_pad :input, accepted_format: _any, demand_unit: :buffers
  def_output_pad :output, accepted_format: _any, demand_unit: :buffers

  @impl true
  def handle_init(_ctx, opts) do
    if opts.secure? and not Code.ensure_loaded?(ExLibSRTP),
      do: raise("Optional dependency :ex_libsrtp is required when using secure? option")

    add_decryptor =
      &child(&1, :decryptor, struct(SRTP.Decryptor, %{policies: opts.srtp_policies}))

    add_depayloader_bin =
      &child(&1, :depayloader, %RTP.DepayloaderBin{
        depayloader: opts.depayloader,
        clock_rate: opts.clock_rate
      })

    structure =
      bin_input()
      |> add_extensions(opts.extensions)
      |> child(:rtcp_receiver, %RTCP.Receiver{
        local_ssrc: opts.local_ssrc,
        remote_ssrc: opts.remote_ssrc,
        report_interval: opts.rtcp_report_interval,
        telemetry_label: opts.telemetry_label
      })
      |> child(:packet_tracker, %RTP.InboundPacketTracker{
        clock_rate: opts.clock_rate,
        repair_sequence_numbers?: true
      })
      |> then(if opts.secure?, do: add_decryptor, else: & &1)
      |> then(if opts.depayloader, do: add_depayloader_bin, else: & &1)
      |> bin_output()

    {[spec: structure], %{}}
  end

  defp add_extensions(link_builder, extensions) do
    Enum.reduce(extensions, link_builder, fn {extension_name, extension}, builder ->
      builder |> child(extension_name, extension)
    end)
  end
end