lib/membrane/rtp/stream_receive_bin.ex

defmodule Membrane.RTP.StreamReceiveBin do
  @moduledoc """
  This bin gets a parsed RTP stream on input and outputs raw media stream.

  Its responsibility is to depayload the RTP stream and compensate the
  jitter.
  """

  use Membrane.Bin

  alias Membrane.{ParentSpec, RTCP, RTP, SRTP}

  def_options srtp_policies: [
                spec: [ExLibSRTP.Policy.t()],
                default: []
              ],
              secure?: [
                type: :boolean,
                default: false
              ],
              extensions: [
                spec: [RTP.SessionBin.extension_t()],
                default: []
              ],
              clock_rate: [
                type: :integer,
                spec: RTP.clock_rate_t()
              ],
              depayloader: [spec: module() | nil],
              local_ssrc: [spec: RTP.ssrc_t()],
              remote_ssrc: [spec: RTP.ssrc_t()],
              rtcp_report_interval: [spec: Membrane.Time.t() | nil],
              telemetry_label: [
                spec: [{atom(), any()}],
                default: []
              ]

  def_input_pad :input, demand_unit: :buffers, caps: :any
  def_output_pad :output, caps: :any, demand_unit: :buffers

  @impl true
  def handle_init(opts) do
    if opts.secure? and not Code.ensure_loaded?(ExLibSRTP),
      do: raise("Optional dependency :ex_libsrtp is required when using secure? option")

    maybe_link_decryptor =
      &to(&1, :decryptor, struct(SRTP.Decryptor, %{policies: opts.srtp_policies}))

    maybe_link_depayloader_bin =
      &to(&1, :depayloader, %RTP.DepayloaderBin{
        depayloader: opts.depayloader,
        clock_rate: opts.clock_rate
      })

    links = [
      link_bin_input()
      |> to_extensions(opts.extensions)
      |> to(:rtcp_receiver, %RTCP.Receiver{
        local_ssrc: opts.local_ssrc,
        remote_ssrc: opts.remote_ssrc,
        report_interval: opts.rtcp_report_interval,
        telemetry_label: opts.telemetry_label
      })
      |> to(:packet_tracker, %RTP.InboundPacketTracker{
        clock_rate: opts.clock_rate,
        repair_sequence_numbers?: true
      })
      |> then(if opts.secure?, do: maybe_link_decryptor, else: & &1)
      |> then(if opts.depayloader, do: maybe_link_depayloader_bin, else: & &1)
      |> to_bin_output()
    ]

    spec = %ParentSpec{
      links: links
    }

    {{:ok, spec: spec}, %{}}
  end

  defp to_extensions(link_builder, extensions) do
    Enum.reduce(extensions, link_builder, fn {extension_name, extension}, builder ->
      builder |> to(extension_name, extension)
    end)
  end
end