lib/membrane_webrtc_plugin/endpoint_bin.ex

defmodule Membrane.WebRTC.EndpointBin do
  @moduledoc """
  Module responsible for interacting with a WebRTC peer.

  New tracks are specified by SDP negotiation conducted by messages
  `t:signal_message/0`, and then linking corresponding
  `:input` and `:output` pads with ids reported via `t:new_track_notification/0`

  The tracks can be manipulated by sending `t:track_message/0`.

  To initiate or modify the connection, the bin sends and expects to receive
  `t:signal_message/0`.
  """
  use Membrane.Bin
  use Bunch

  require Membrane.Logger
  require Membrane.OpenTelemetry

  require Membrane.TelemetryMetrics

  alias __MODULE__.TracksState
  alias ExSDP.Media
  alias ExSDP.Attribute.{FMTP, RTPMapping}
  alias Membrane.ICE
  alias Membrane.RTP
  alias Membrane.WebRTC.{Extension, SDP, Track}
  alias Membrane.WebRTC.Utils

  # we always want to use ICE lite at the moment
  @ice_lite true

  @life_span_id "endpoint_bin.life_span"
  @ice_restart_span_id "endpoint_bin.ice_restart"

  @sdp_offer_event [Membrane.WebRTC, :sdp, :offer]
  @sdp_answer_event [Membrane.WebRTC, :sdp, :answer]

  # Notification sent to the parent when a new inbound media RTP stream is detected.
  # NOTE(review): the last element is typed as `module()`, but the tuple is built with
  # the result of `depayloading_filter_for/1`, which returns a
  # `%Membrane.RTP.DepayloaderBin{}` struct or `nil` - confirm and adjust the type.
  @type new_track_notification ::
          {:new_track, Track.id(), nil | Track.rid(), RTP.ssrc_t(), Track.encoding_key(),
           depayloading_filter :: module()}
  # Signalling message exchanged between the bin and its parent.
  # NOTE(review): the handlers in this module use more shapes than listed here, e.g.
  # `{:sdp_offer, sdp, mid_to_track_id}`, `{:sdp_answer, sdp, mid_to_track_id}`,
  # `{:candidate, candidate, 0}`, `{:offer_data, media_count, turn_servers}` and
  # `:renegotiate_tracks` - this type looks outdated; confirm before relying on it.
  @type signal_message ::
          {:signal, {:sdp_offer | :sdp_answer, String.t()} | {:candidate, String.t()}}

  # Messages the parent can send to manipulate tracks.
  @type track_message :: alter_tracks_message()

  @typedoc """
  Message that adds or removes tracks.
  """
  @type alter_tracks_message :: {:add_tracks, [Track.t()]} | {:remove_tracks, [Track.t()]}

  @typedoc """
  Type describing possible media flow directions.

  * `:recvonly` - only receive media from the peer
  * `:sendonly` - only send media to the peer
  * `:sendrecv` - both send and receive media from the peer
  """
  @type direction() :: :recvonly | :sendonly | :sendrecv

  # Options accepted by the bin. They are read in `handle_init/2`, which copies most of
  # them into `State` and forwards the rest to the `:ice` and `:rtp` children.
  def_options direction: [
                spec: direction(),
                default: :sendrecv,
                description: """
                Direction of EndpointBin. Determines whether
                EndpointBin can send, receive or both send and receive media.
                For more information refer to t:direction/0.
                """
              ],
              handshake_opts: [
                type: :list,
                spec: Keyword.t(),
                default: [],
                description: """
                Keyword list with options for handshake module. For more information please
                refer to `t:ExDTLS.opts_t/0`
                """
              ],
              rtcp_receiver_report_interval: [
                spec: Membrane.Time.t() | nil,
                default: nil,
                description:
                  "Receiver reports's generation interval, set to nil to avoid reports generation"
              ],
              rtcp_sender_report_interval: [
                spec: Membrane.Time.t() | nil,
                default: nil,
                description:
                  "Sender reports's generation interval, set to nil to avoid reports generation"
              ],
              filter_codecs: [
                spec: (Membrane.WebRTC.Track.Encoding.t() -> boolean()),
                default: &SDP.filter_encodings(&1),
                description: "Defines function which will filter SDP m-line by codecs"
              ],
              extensions: [
                spec: [Extension.t()],
                default: [],
                description: "List of WebRTC extensions that should be enabled"
              ],
              # A single `spec:` entry - a duplicated key would make the first one
              # (the literal atom `:list`) win over the intended `Keyword.t()`.
              log_metadata: [
                spec: Keyword.t(),
                default: [],
                description: "Logger metadata used for endpoint bin and all its descendants"
              ],
              integrated_turn_options: [
                spec: [ICE.Endpoint.integrated_turn_options_t()],
                default: [],
                description: "Integrated TURN Options"
              ],
              simulcast?: [
                spec: boolean(),
                default: true,
                description: """
                Whether to accept simulcast tracks or not.
                If set to `false`, simulcast tracks will be disabled i.e.
                sender will not send them.
                """
              ],
              # Merged with `Keyword.merge/2` in `handle_init/2`, hence a keyword list.
              trace_metadata: [
                spec: Keyword.t(),
                default: [],
                description: "A list of tuples to merge into Otel spans"
              ],
              trace_context: [
                spec: :list | any(),
                default: [],
                description: "Trace context for otel propagation"
              ],
              parent_span: [
                spec: :opentelemetry.span_ctx() | nil,
                default: nil,
                description: "Parent span of #{@life_span_id}"
              ],
              telemetry_label: [
                spec: Membrane.TelemetryMetrics.label(),
                default: [],
                description: "Label passed to Membrane.TelemetryMetrics functions"
              ]

  # Dynamic input pad; one instance per outbound track. The pad is referenced as
  # `Pad.ref(:input, track_id)` and linked in `handle_pad_added/3`.
  def_input_pad :input,
    demand_unit: :buffers,
    accepted_format: _any,
    availability: :on_request,
    options: [
      use_payloader?: [
        spec: boolean(),
        default: true,
        description: """
        Defines if incoming stream should be payloaded based on given encoding.
        Otherwise the stream is assumed to be in RTP format.
        """
      ]
    ]

  # Dynamic output pad; one instance per inbound track (and simulcast encoding).
  # The pad is referenced as `Pad.ref(:output, {track_id, rid})` and linked in
  # `handle_pad_added/3`, which forwards these options to the `:rtp` child's output pad.
  def_output_pad :output,
    demand_unit: :buffers,
    accepted_format: _any,
    availability: :on_request,
    options: [
      extensions: [
        spec: [Membrane.RTP.SessionBin.extension_t()],
        default: [],
        description:
          "List of general extensions that will be applied to the SessionBin's output pad"
      ],
      use_depayloader?: [
        spec: boolean(),
        default: true,
        description: """
        Defines if the outgoing stream should get depayloaded.

        This option should be used as a convenience, it is not necessary as the new track notification
        returns a depayloading filter's definition that can be attached to the output pad
        to work the same way as with the option set to true.
        """
      ]
    ]

  defmodule State do
    @moduledoc false
    # Internal state of the endpoint bin. It is created in `handle_init/2` and threaded
    # through every callback of the enclosing module.
    use Bunch.Access

    alias Membrane.WebRTC.EndpointBin.TracksState

    # `pending_rtx` is keyed by `{track_id, rid}` and stores the SSRC of an RTX stream
    # that showed up before the media stream it repairs (see `handle_new_rtx_stream/4`
    # and `handle_new_rtp_stream/4` in the enclosing module).
    @type t :: %__MODULE__{
            id: String.t(),
            trace_metadata: Keyword.t(),
            log_metadata: Keyword.t(),
            tracks: TracksState.t(),
            endpoint_direction: Membrane.WebRTC.EndpointBin.direction(),
            rtcp_sender_report_interval: Membrane.Time.t() | nil,
            candidates: [any()],
            candidate_gathering_state: nil | :in_progress | :done,
            dtls_fingerprint: nil | {:sha256, binary()},
            pending_rtx: %{{Track.id(), Track.rid()} => RTP.ssrc_t()},
            filter_codecs: ({RTPMapping.t(), FMTP.t() | nil} -> boolean()),
            extensions: [Extension.t()],
            integrated_turn_servers: [any()],
            component_path: String.t(),
            simulcast?: boolean(),
            telemetry_label: Membrane.TelemetryMetrics.label(),
            ice: %{
              restarting?: boolean(),
              waiting_restart?: boolean(),
              pwd: nil | String.t(),
              ufrag: nil | String.t(),
              first?: boolean()
            }
          }

    defstruct id: "endpointBin",
              trace_metadata: [],
              log_metadata: [],
              tracks: %TracksState{},
              endpoint_direction: :sendrecv,
              rtcp_sender_report_interval: nil,
              candidates: [],
              candidate_gathering_state: nil,
              dtls_fingerprint: nil,
              pending_rtx: %{},
              filter_codecs: &SDP.filter_encodings(&1),
              extensions: [],
              integrated_turn_servers: [],
              component_path: "",
              simulcast?: true,
              telemetry_label: [],
              # ICE negotiation flags and the local credentials reported by the `:ice` child
              ice: %{
                restarting?: false,
                waiting_restart?: false,
                pwd: nil,
                ufrag: nil,
                first?: true
              }
  end

  # Registers the SDP telemetry events, opens the life span, and spawns the static
  # children (`:ice`, `:rtp`, `:ice_funnel`) together with the links between them.
  @impl true
  def handle_init(_ctx, %__MODULE__{} = options) do
    Membrane.TelemetryMetrics.register(@sdp_offer_event, options.telemetry_label)
    Membrane.TelemetryMetrics.register(@sdp_answer_event, options.telemetry_label)

    library_metadata = [
      {:"library.language", :erlang},
      {:"library.name", :membrane_webrtc_plugin},
      {:"library.version", "semver:#{Application.spec(:membrane_webrtc_plugin, :vsn)}"}
    ]

    trace_metadata = Keyword.merge(options.trace_metadata, library_metadata)

    if options.trace_context != [] do
      Membrane.OpenTelemetry.attach(options.trace_context)
    end

    span_opts =
      if options.parent_span do
        [parent_span: options.parent_span]
      else
        []
      end

    Membrane.OpenTelemetry.start_span(@life_span_id, span_opts)
    Membrane.OpenTelemetry.set_attributes(@life_span_id, trace_metadata)

    rtcp_ref = make_ref()

    ice_endpoint = %ICE.Endpoint{
      integrated_turn_options: options.integrated_turn_options,
      handshake_opts: options.handshake_opts,
      telemetry_label: options.telemetry_label,
      trace_context: options.trace_context,
      parent_span: Membrane.OpenTelemetry.get_span(@life_span_id)
    }

    session_bin = %Membrane.RTP.SessionBin{
      secure?: true,
      rtcp_receiver_report_interval: options.rtcp_receiver_report_interval,
      rtcp_sender_report_interval: options.rtcp_sender_report_interval
    }

    # the :rtcp_receiver_output link is always present so that FIR RTCP packets are handled
    rtcp_receiver_link =
      get_child(:rtp)
      |> via_out(Pad.ref(:rtcp_receiver_output, rtcp_ref))
      |> get_child(:ice_funnel)

    ice_to_rtp_link =
      get_child(:ice)
      |> via_out(Pad.ref(:output, 1))
      |> via_in(Pad.ref(:rtp_input, rtcp_ref))
      |> get_child(:rtp)

    funnel_to_ice_link =
      get_child(:ice_funnel)
      |> via_out(:output)
      |> via_in(Pad.ref(:input, 1))
      |> get_child(:ice)

    children = [
      child(:ice, ice_endpoint),
      child(:rtp, session_bin),
      child(:ice_funnel, Membrane.Funnel),
      rtcp_receiver_link,
      ice_to_rtp_link,
      funnel_to_ice_link
    ]

    # extensions may be given either as structs or as modules exposing `new/0`
    extensions =
      for extension <- options.extensions do
        if is_struct(extension), do: extension, else: extension.new()
      end

    state = %State{
      id: Keyword.get(trace_metadata, :name, "endpointBin"),
      trace_metadata: trace_metadata,
      log_metadata: options.log_metadata,
      endpoint_direction: options.direction,
      rtcp_sender_report_interval: options.rtcp_sender_report_interval,
      filter_codecs: options.filter_codecs,
      integrated_turn_servers: ICE.TURNManager.get_launched_turn_servers(),
      extensions: extensions,
      component_path: Membrane.ComponentPath.get_formatted(),
      simulcast?: options.simulcast?,
      telemetry_label: options.telemetry_label
    }

    {[spec: {children, log_metadata: options.log_metadata}], state}
  end

  # Links a new `:input` pad (an outbound track) through the `:rtp` child to the
  # `:ice_funnel`, optionally payloading the stream on the way.
  @impl true
  def handle_pad_added(Pad.ref(:input, track_id) = pad, ctx, state) do
    # TODO: check this one
    %{use_payloader?: payload?} = ctx.options

    track = Map.fetch!(state.tracks.outbound, track_id)

    %Track{
      ssrc: ssrc,
      selected_encoding_key: encoding_key,
      selected_encoding: encoding,
      extmaps: extmaps
    } = track

    extension_mapping =
      Map.new(extmaps, fn extmap -> Extension.as_rtp_mapping(state.extensions, extmap) end)

    rtp_output_options = [
      encoding: encoding_key,
      clock_rate: encoding.clock_rate,
      payload_type: encoding.payload_type,
      rtp_extension_mapping: extension_mapping
    ]

    # H264 needs a parser in front of the payloader; other encodings are linked directly
    maybe_parse =
      case {encoding_key, payload?} do
        {:H264, true} ->
          fn builder ->
            child(builder, {:h264_parser, ssrc}, %Membrane.H264.FFmpeg.Parser{alignment: :nal})
          end

        _other ->
          fn builder -> builder end
      end

    payloader =
      if payload? do
        {:ok, module} = Membrane.RTP.PayloadFormatResolver.payloader(encoding_key)
        module
      end

    session_input_options = [
      payloader: payloader,
      rtp_extensions: to_rtp_extensions(extmaps, :outbound, state),
      telemetry_label: state.telemetry_label ++ [track_id: "#{track_id}"]
    ]

    rtcp_sender_link =
      get_child(:rtp)
      |> via_out(Pad.ref(:rtcp_sender_output, ssrc))
      |> get_child(:ice_funnel)

    media_link =
      bin_input(pad)
      |> then(maybe_parse)
      |> via_in(Pad.ref(:input, ssrc), options: session_input_options)
      |> get_child(:rtp)
      |> via_out(Pad.ref(:rtp_output, ssrc), options: rtp_output_options)
      |> get_child(:ice_funnel)

    {[spec: [rtcp_sender_link, media_link]], state}
  end

  # Links a new `:output` pad (an inbound track, optionally a specific simulcast
  # encoding) to the matching output of the `:rtp` child and marks the track as linked.
  @impl true
  def handle_pad_added(Pad.ref(:output, {track_id, rid}) = pad, ctx, state) do
    track = Map.fetch!(state.tracks.inbound, track_id)

    %Track{
      ssrc: track_ssrc,
      selected_encoding_key: encoding_key,
      selected_encoding: encoding,
      extmaps: extmaps
    } = track

    # a non-nil `rid` requests one particular encoding of a simulcast track,
    # so the ssrc assigned to that `rid` has to be used
    ssrc =
      if rid do
        Map.fetch!(track.rid_to_ssrc, rid)
      else
        track_ssrc
      end

    %{use_depayloader?: depayload?} = ctx.options

    depayloader =
      if depayload? do
        {:ok, module} = Membrane.RTP.PayloadFormatResolver.depayloader(encoding_key)
        module
      end

    label = state.telemetry_label ++ [track_id: "#{track_id}:#{rid}"]

    rtp_output_options = [
      extensions: ctx.options.extensions,
      rtp_extensions: to_rtp_extensions(extmaps, :inbound, state),
      clock_rate: encoding.clock_rate,
      depayloader: depayloader,
      telemetry_label: label,
      encoding: encoding_key
    ]

    link =
      get_child(:rtp)
      |> via_out(Pad.ref(:output, ssrc), options: rtp_output_options)
      |> bin_output(pad)

    linked_state = put_in(state.tracks.inbound[track_id].status, :linked)

    {[spec: link], linked_state}
  end

  # A `:sendonly` endpoint must never receive media, so an incoming RTP stream is
  # treated as a fatal error. Raises `RuntimeError` with the stream parameters.
  @impl true
  def handle_child_notification(
        {:new_rtp_stream, ssrc, pt, _rtp_header_extensions},
        _from,
        _ctx,
        %State{endpoint_direction: :sendonly} = _state
      ) do
    raise """
    Received new RTP stream but EndpointBin is set to :sendonly.
    RTP stream params: SSRC: #{inspect(ssrc)}, PT: #{inspect(pt)}
    """
  end

  # Identifies which track a new inbound RTP stream belongs to, registers it in the
  # tracks state and dispatches to the RTX or media specific handler.
  @impl true
  def handle_child_notification(
        {:new_rtp_stream, ssrc, payload_type, header_extensions},
        _from,
        _ctx,
        %State{tracks: tracks, extensions: extensions} = state
      ) do
    stream_info =
      TracksState.identify_inbound_stream(tracks, ssrc, payload_type, header_extensions, extensions)

    registered_tracks = TracksState.register_stream(tracks, ssrc, stream_info)
    state = %State{state | tracks: registered_tracks}

    {_cast_type, encoding_type, rid, track_id} = stream_info

    case encoding_type do
      :media -> handle_new_rtp_stream(ssrc, track_id, rid, state)
      :rtx -> handle_new_rtx_stream(ssrc, track_id, rid, state)
    end
  end

  # Stores the DTLS fingerprint reported by the handshake as a colon-separated hex string.
  @impl true
  def handle_child_notification(
        {:handshake_init_data, _component_id, fingerprint},
        _from,
        _ctx,
        state
      ) do
    dtls_fingerprint = {:sha256, hex_dump(fingerprint)}
    state = %{state | dtls_fingerprint: dtls_fingerprint}

    {[], state}
  end

  # Saves the local ICE credentials (given as "ufrag pwd"). Unless this is the first
  # negotiation with no outbound tracks, it also asks the parent for an SDP offer.
  @impl true
  def handle_child_notification({:local_credentials, credentials}, _from, _ctx, state) do
    [ufrag, pwd] = String.split(credentials, " ")

    nothing_to_offer? = state.ice.first? and state.tracks.outbound == %{}

    {actions, state} =
      if nothing_to_offer? do
        {[], state}
      else
        get_offer_data(%{state | ice: %{state.ice | first?: false}})
      end

    state = Map.update!(state, :ice, &%{&1 | ufrag: ufrag, pwd: pwd})

    {actions, state}
  end

  # Remembers a freshly gathered local candidate and forwards it to the parent.
  @impl true
  def handle_child_notification({:new_candidate_full, cand}, _from, _ctx, state) do
    Membrane.OpenTelemetry.add_event(@ice_restart_span_id, :local_candidate, candidate: cand)

    actions = notify_candidates([cand])
    state = %{state | candidates: [cand | state.candidates]}

    {actions, state}
  end

  # Marks local candidate gathering as finished.
  @impl true
  def handle_child_notification(:candidate_gathering_done, _from, _ctx, state) do
    Membrane.OpenTelemetry.add_event(@ice_restart_span_id, :candidate_gathering_done, [])

    state = %{state | candidate_gathering_state: :done}
    {[], state}
  end

  # The ICE connection failed: closes the current restart span, clears the
  # `restarting?` flag and requests another ICE restart.
  @impl true
  def handle_child_notification(
        {:connection_failed, _stream_id, _component_id},
        _from,
        _ctx,
        state
      ) do
    Membrane.OpenTelemetry.add_event(@ice_restart_span_id, :connection_failed)
    Membrane.OpenTelemetry.end_span(@ice_restart_span_id)

    state
    |> Map.update!(:ice, &%{&1 | restarting?: false})
    |> maybe_restart_ice(true)
  end

  # The connection became ready while an ICE restart was in progress: the negotiation
  # is finished, so `:ready` outbound tracks are reported to the parent and marked as
  # `:linked`; another restart is started right away if one was queued meanwhile.
  @impl true
  def handle_child_notification({:connection_ready, stream_id, component_id}, _from, _ctx, state)
      when state.ice.restarting? do
    Membrane.OpenTelemetry.add_event(@ice_restart_span_id, :connection_ready,
      stream_id: stream_id,
      component_id: component_id
    )

    Membrane.OpenTelemetry.end_span(@ice_restart_span_id)

    negotiated_tracks =
      for track <- Map.values(state.tracks.outbound), track.status === :ready, do: track

    {restart_actions, state} =
      state
      |> Map.update!(:tracks, &TracksState.change_outbound_status(&1, :ready, :linked))
      |> put_in([:ice, :restarting?], false)
      |> maybe_restart_ice()

    {[notify_parent: {:negotiation_done, negotiated_tracks}] ++ restart_actions, state}
  end

  # The connection became ready outside of an ICE restart - nothing to do.
  @impl true
  def handle_child_notification(
        {:connection_ready, _stream_id, _component_id},
        _from,
        _ctx,
        state
      ) do
    {[], state}
  end

  # Prepends a newly reported integrated TURN server to the list kept in the state.
  @impl true
  def handle_child_notification({:udp_integrated_turn, turn}, _from, _ctx, state) do
    turn_servers = [turn | state.integrated_turn_servers]
    {[], %{state | integrated_turn_servers: turn_servers}}
  end

  # Passes bandwidth estimation reports through to the parent unchanged.
  @impl true
  def handle_child_notification({:bandwidth_estimation, estimation}, _from, _ctx, state) do
    actions = [notify_parent: {:bandwidth_estimation, estimation}]
    {actions, state}
  end

  # Catch-all clause: any child notification not matched by the clauses above is
  # logged as a warning and otherwise ignored (no actions, state unchanged).
  @impl true
  def handle_child_notification(notification, from, _ctx, state) do
    Membrane.Logger.warn("Ignoring child #{inspect(from)} notification #{inspect(notification)}")
    {[], state}
  end

  # Handles an SDP offer from the remote peer.
  #
  # Parses the offer, updates the tracks state with the tracks it describes, builds an
  # SDP answer and returns actions that (in order):
  #   * notify the parent and the `:rtp` child about new inbound tracks,
  #   * send the SDP answer (with a mid -> track id mapping) to the parent,
  #   * pass remote ICE credentials to the `:ice` child,
  #   * start candidate gathering or re-send already gathered candidates,
  #   * notify the parent about removed inbound tracks, if any.
  #
  # Raises if the SDP cannot be parsed (`ExSDP.parse!/1`).
  @impl true
  def handle_parent_notification({:signal, {:sdp_offer, raw_sdp, mid_to_track_id}}, _ctx, state) do
    # telemetry receives an anonymized copy of the offer
    Membrane.TelemetryMetrics.execute(
      @sdp_offer_event,
      %{sdp: Utils.anonymize_sdp(raw_sdp)},
      %{},
      state.telemetry_label
    )

    Membrane.OpenTelemetry.add_event(@ice_restart_span_id, :sdp_offer, sdp: raw_sdp)

    sdp = ExSDP.parse!(raw_sdp)

    {new_inbound_tracks, removed_inbound_tracks, inbound_tracks, outbound_tracks} =
      get_tracks_from_sdp(sdp, mid_to_track_id, state)

    # attach every non-empty group of tracks to the restart span
    for {label, tracks} <- [
          new_inbound_tracks: new_inbound_tracks,
          removed_inbound_tracks: removed_inbound_tracks,
          inbound_tracks: inbound_tracks,
          outbound_tracks: outbound_tracks
        ] do
      if tracks != [] do
        Membrane.OpenTelemetry.set_attribute(
          @ice_restart_span_id,
          label,
          Enum.map(tracks, &Track.to_otel_data/1)
        )
      end
    end

    tracks_state =
      state.tracks
      |> TracksState.update(outbound: outbound_tracks, inbound: inbound_tracks)
      |> TracksState.add_inbound_tracks(new_inbound_tracks)

    state = %State{state | tracks: tracks_state}

    new_actions = new_tracks_actions(new_inbound_tracks)

    answer =
      SDP.create_answer(
        inbound_tracks: inbound_tracks,
        outbound_tracks: outbound_tracks,
        ice_ufrag: state.ice.ufrag,
        ice_pwd: state.ice.pwd,
        fingerprint: state.dtls_fingerprint,
        extensions: state.extensions,
        ice_lite?: @ice_lite
      )

    # Candidate gathering is requested only once, and only when there are any tracks.
    # If gathering has been requested before, the candidates collected so far are re-sent.
    {actions, state} =
      withl tracks_check: false <- TracksState.empty?(state.tracks),
            candidate_gathering_check: nil <- state.candidate_gathering_state do
        {[notify_child: {:ice, :gather_candidates}],
         %{state | candidate_gathering_state: :in_progress}}
      else
        tracks_check: _ -> {[], state}
        candidate_gathering_check: _ -> {notify_candidates(state.candidates), state}
      end

    # the mapping returned to the parent is built from the filtered inbound tracks
    # and the outbound tracks
    inbound_tracks = SDP.filter_simulcast_tracks(inbound_tracks)
    mid_to_track_id = Map.new(inbound_tracks ++ outbound_tracks, &{&1.mid, &1.id})

    actions =
      if Enum.empty?(removed_inbound_tracks),
        do: actions,
        else: actions ++ [notify_parent: {:removed_tracks, removed_inbound_tracks}]

    # serialize the answer once and reuse the string for telemetry, signalling and tracing
    answer_str = to_string(answer)

    Membrane.TelemetryMetrics.execute(
      @sdp_answer_event,
      %{sdp: Utils.anonymize_sdp(answer_str)},
      %{},
      state.telemetry_label
    )

    actions =
      new_actions ++
        [notify_parent: {:signal, {:sdp_answer, answer_str, mid_to_track_id}}] ++
        set_remote_credentials(sdp) ++
        actions

    Membrane.OpenTelemetry.add_event(@ice_restart_span_id, :sdp_answer, sdp: answer_str)

    {actions, state}
  end

  # A remote ICE candidate is only recorded on the restart span; it is not used otherwise.
  @impl true
  def handle_parent_notification({:signal, {:candidate, candidate}}, _ctx, state) do
    sdp_attribute = "a=" <> candidate

    Membrane.OpenTelemetry.add_event(@ice_restart_span_id, :remote_candidate,
      candidate: sdp_attribute
    )

    # TODO: decide what to do with this candidate
    {[], state}
  end

  # Starts track renegotiation. What happens depends on whether this is the first
  # negotiation (`ice.first?`) and whether local ICE credentials are known (`ice.pwd`).
  @impl true
  def handle_parent_notification({:signal, :renegotiate_tracks}, _ctx, state) do
    case {state.ice.first?, state.ice.pwd} do
      {true, nil} ->
        # credentials unknown yet - only note that the first negotiation was requested
        {[], %{state | ice: %{state.ice | first?: false}}}

      {true, _pwd} ->
        get_offer_data(%{state | ice: %{state.ice | first?: false}})

      {false, nil} ->
        {[], state}

      {false, _pwd} ->
        maybe_restart_ice(state, true)
    end
  end

  # Adding outbound tracks makes no sense for an endpoint that only receives media.
  @impl true
  def handle_parent_notification({:add_tracks, _tracks}, _ctx, %State{
        endpoint_direction: :recvonly
      }) do
    raise """
    Cannot add outbound tracks when EndpointBin is set to :recvonly.
    You can add outbound tracks only when EndpointBin is set to :sendonly or :sendrecv.
    """
  end

  # Registers new outbound tracks. During the first negotiation the tracks become
  # `:ready` immediately and an offer is requested once local ICE credentials are
  # known; later additions go through an ICE restart.
  def handle_parent_notification({:add_tracks, tracks}, _ctx, state) do
    tracks_state = TracksState.add_outbound_tracks(state.tracks, tracks)
    state = %State{state | tracks: tracks_state}

    if state.ice.first? do
      state =
        state
        |> put_in([:ice, :first?], false)
        |> Map.update!(:tracks, &TracksState.change_outbound_status(&1, :pending, :ready))

      case state do
        %{ice: %{pwd: nil}} -> {[], state}
        _credentials_known -> get_offer_data(state)
      end
    else
      maybe_restart_ice(state, true)
    end
  end

  # Disables the given tracks and requests an ICE restart to renegotiate.
  @impl true
  def handle_parent_notification({:remove_tracks, tracks_to_remove}, _ctx, state) do
    disable = fn tracks_state -> TracksState.disable_tracks(tracks_state, tracks_to_remove) end
    state = Map.update!(state, :tracks, disable)

    maybe_restart_ice(state, true)
  end

  # Starts an ICE restart if one is requested and none is currently running.
  #
  # With `set_waiting_restart?` set to `true` a restart is queued first. A queued
  # restart is started only when no other restart is in progress; in that case
  # `:pending` outbound tracks become `:ready` and the `:ice` child is asked to restart.
  defp maybe_restart_ice(state, set_waiting_restart? \\ false) do
    state =
      if set_waiting_restart? do
        Map.update!(state, :ice, &%{&1 | waiting_restart?: true})
      else
        state
      end

    can_restart? = not state.ice.restarting? and state.ice.waiting_restart?

    if can_restart? do
      Membrane.OpenTelemetry.start_span(@ice_restart_span_id, parent_id: @life_span_id)

      restarting_state =
        state
        |> put_in([:ice, :restarting?], true)
        |> put_in([:ice, :waiting_restart?], false)
        |> Map.update!(:tracks, &TracksState.change_outbound_status(&1, :pending, :ready))

      {[notify_child: {:ice, :restart_stream}], restarting_state}
    else
      {[], state}
    end
  end

  # Builds the `:offer_data` signal for the parent (number of non-pending outbound
  # audio and video tracks plus known TURN servers) and marks ICE as restarting,
  # opening the restart span if it has not been opened yet.
  defp get_offer_data(state) do
    media_types =
      for track <- Map.values(state.tracks.outbound), track.status != :pending, do: track.type

    audio_count = Enum.count(media_types, fn type -> type == :audio end)
    video_count = Enum.count(media_types, fn type -> type == :video end)
    media_count = %{audio: audio_count, video: video_count}

    offer_data = {:offer_data, media_count, state.integrated_turn_servers}

    if not state.ice.restarting? do
      Membrane.OpenTelemetry.start_span(@ice_restart_span_id, parent_id: @life_span_id)
    end

    state = %{state | ice: %{state.ice | restarting?: true}}

    {[notify_parent: {:signal, offer_data}], state}
  end

  # Extracts tracks from a parsed SDP offer, taking into account the endpoint's
  # constraints, the inbound tracks known so far and the non-pending outbound tracks.
  defp get_tracks_from_sdp(sdp, mid_to_track_id, state) do
    known_inbound = Map.values(state.tracks.inbound)

    negotiable_outbound =
      for track <- Map.values(state.tracks.outbound), track.status != :pending, do: track

    constraints = %Track.Constraints{
      codecs_filter: state.filter_codecs,
      enabled_extensions: state.extensions,
      simulcast?: state.simulcast?,
      endpoint_direction: state.endpoint_direction
    }

    SDP.get_tracks(sdp, constraints, known_inbound, negotiable_outbound, mid_to_track_id)
  end

  # Returns actions announcing new inbound tracks: stream information for the `:rtp`
  # child (accepted SSRCs and, for simulcast tracks, header extensions required per
  # payload type) followed by a `:new_tracks` notification for the parent.
  defp new_tracks_actions([]), do: []

  defp new_tracks_actions(new_tracks) do
    accepted_ssrcs =
      Enum.flat_map(new_tracks, fn track ->
        List.wrap(track.ssrc) ++ List.wrap(track.rtx_ssrc)
      end)

    required_extensions =
      new_tracks
      |> Enum.filter(&Track.simulcast?/1)
      |> Enum.flat_map(&simulcast_required_extensions/1)
      |> Map.new()

    streams_info = %Membrane.RTP.SSRCRouter.StreamsInfo{
      require_extensions: required_extensions,
      accept_ssrcs: accepted_ssrcs
    }

    [notify_child: {:rtp, streams_info}] ++ [notify_parent: {:new_tracks, new_tracks}]
  end

  # Maps payload types of a simulcast track to ids of header extensions that have to be
  # present in its packets. Raises when a required extension is not enabled.
  defp simulcast_required_extensions(track) do
    find_extmap = fn uri -> Enum.find(track.extmaps, &(&1.uri == uri)) end

    mid_ext = find_extmap.(Extension.Mid.uri())
    rid_ext = find_extmap.(Extension.Rid.uri())
    repaired_rid_ext = find_extmap.(Extension.RepairedRid.uri())

    if is_nil(mid_ext) or is_nil(rid_ext) do
      raise "Simulcast without mid or rid extensions is not supported!"
    end

    encoding = track.selected_encoding
    media_requirements = %{encoding.payload_type => [mid_ext.id, rid_ext.id]}

    cond do
      is_nil(encoding.rtx) ->
        media_requirements

      is_nil(repaired_rid_ext) ->
        raise "RTX simulcast requires RepairedRid extension to be enabled!"

      true ->
        rtx_requirements = [mid_ext.id, repaired_rid_ext.id]
        Map.put(media_requirements, encoding.rtx.payload_type, rtx_requirements)
    end
  end

  # Builds one `notify_parent` action per candidate, in the order given.
  defp notify_candidates(candidates) do
    for cand <- candidates do
      {:notify_parent, {:signal, {:candidate, cand, 0}}}
    end
  end

  # Reads the remote ICE credentials from the first media section of the SDP and
  # returns an action passing them to the `:ice` child; no action when the SDP has no media.
  defp set_remote_credentials(sdp) do
    first_media = List.first(sdp.media)

    if is_nil(first_media) do
      []
    else
      {_ufrag_key, ufrag} = Media.get_attribute(first_media, :ice_ufrag)
      {_pwd_key, pwd} = Media.get_attribute(first_media, :ice_pwd)

      [notify_child: {:ice, {:set_remote_credentials, ufrag <> " " <> pwd}}]
    end
  end

  # Formats a binary as uppercase hex bytes separated by colons, e.g. <<1, 171>> -> "01:AB".
  defp hex_dump(digest_str) do
    hex_bytes =
      digest_str
      |> :binary.bin_to_list()
      |> Enum.map(fn byte -> Base.encode16(<<byte>>) end)

    Enum.join(hex_bytes, ":")
  end

  # Handles a new RTX (retransmission) stream.
  #
  # Without a `rid` the track is not simulcast and the RTX stream repairs `track.ssrc`.
  # With a `rid` the original SSRC is looked up in `track.rid_to_ssrc`; if it is not
  # known yet, the RTX SSRC is put into `pending_rtx` until the media stream shows up.
  defp handle_new_rtx_stream(ssrc, track_id, nil, state) do
    track = Map.fetch!(state.tracks.inbound, track_id)

    {rtx_info_actions(ssrc, track.ssrc, track, state.extensions), state}
  end

  defp handle_new_rtx_stream(ssrc, track_id, rid, state) do
    track = Map.fetch!(state.tracks.inbound, track_id)

    case Map.fetch(track.rid_to_ssrc, rid) do
      {:ok, original_ssrc} ->
        {rtx_info_actions(ssrc, original_ssrc, track, state.extensions), state}

      :error ->
        state = put_in(state.pending_rtx[{track_id, rid}], ssrc)
        {[], state}
    end
  end

  # Handles a new media stream: notifies the parent about the new track and, if an RTX
  # stream for the same `{track_id, rid}` was waiting in `pending_rtx`, also sends its
  # RTX info to the `:rtp` child.
  defp handle_new_rtp_stream(ssrc, track_id, rid, state) do
    track = Map.fetch!(state.tracks.inbound, track_id)

    new_track =
      {:new_track, track_id, rid, ssrc, track.selected_encoding_key,
       depayloading_filter_for(track)}

    {waiting_rtx_ssrc, state} = pop_in(state.pending_rtx[{track_id, rid}])

    rtx_actions =
      if is_nil(waiting_rtx_ssrc) do
        []
      else
        rtx_info_actions(waiting_rtx_ssrc, ssrc, track, state.extensions)
      end

    {[notify_parent: new_track] ++ rtx_actions, state}
  end

  # Builds the action that tells the `:rtp` child which stream the RTX stream `ssrc` repairs.
  defp rtx_info_actions(ssrc, original_ssrc, track, extensions) do
    extension_ids =
      for extmap <- track.extmaps, into: %{} do
        Extension.as_rtp_mapping(extensions, extmap)
      end

    info = %Membrane.RTP.SessionBin.RTXInfo{
      ssrc: ssrc,
      original_ssrc: original_ssrc,
      original_payload_type: track.selected_encoding.payload_type,
      rid_id: Map.get(extension_ids, :rid),
      repaired_rid_id: Map.get(extension_ids, :repaired_rid)
    }

    [notify_child: {:rtp, info}]
  end

  # Returns a depayloader bin definition for the track's selected encoding, or `nil`
  # when no depayloader is known for that encoding.
  defp depayloading_filter_for(track) do
    resolved = Membrane.RTP.PayloadFormatResolver.depayloader(track.selected_encoding_key)

    case resolved do
      :error ->
        nil

      {:ok, depayloader} ->
        clock_rate = track.selected_encoding.clock_rate
        %Membrane.RTP.DepayloaderBin{depayloader: depayloader, clock_rate: clock_rate}
    end
  end

  # Converts extmaps to RTP extension entries, dropping those without an RTP module.
  defp to_rtp_extensions(extmaps, track_type, state) do
    extmaps
    |> Enum.map(fn extmap ->
      Extension.as_rtp_extension(state.extensions, extmap, track_type)
    end)
    |> Enum.filter(fn {_name, rtp_module} -> rtp_module != :no_rtp_module end)
  end
end