lib/membrane_webrtc_plugin/endpoint_bin.ex

defmodule Membrane.WebRTC.EndpointBin do
  @moduledoc """
  Module responsible for interacting with a WebRTC peer.

  To send or receive tracks from a WebRTC peer, specify them with
  `:inbound_tracks` and `:outbound_tracks` options, and link corresponding
  `:input` and `:output` pads with ids matching the declared tracks' ids.

  The tracks can be manipulated by sending `t:track_message/0`.

  To initiate or modify the connection, the bin sends and expects to receive
  `t:signal_message/0`.
  """
  use Membrane.Bin
  use Bunch
  use OpenTelemetryDecorator

  alias ExSDP.Media
  alias ExSDP.Attribute.{FMTP, RTPMapping}
  alias Membrane.ICE
  alias Membrane.WebRTC.{Extension, SDP, Track}
  require Membrane.Logger
  require OpenTelemetry.Tracer, as: Tracer

  # we always want to use ICE lite at the moment
  @ice_lite true

  @type signal_message ::
          {:signal, {:sdp_offer | :sdp_answer, String.t()} | {:candidate, String.t()}}

  @type track_message :: alter_tracks_message()

  @typedoc """
  Message that adds or removes tracks.
  """
  @type alter_tracks_message :: {:add_tracks, [Track.t()]} | {:remove_tracks, [Track.id()]}

  @typedoc """
  Type describing possible media flow directions.

  * `:recvonly` - only receive media from the peer
  * `:sendonly` - only send media to the peer
  * `:sendrecv` - both send and receive media from the peer
  """
  @type direction() :: :recvonly | :sendonly | :sendrecv

  def_options inbound_tracks: [
                spec: [Membrane.WebRTC.Track.t()],
                default: [],
                description: "List of initial inbound tracks"
              ],
              outbound_tracks: [
                spec: [Membrane.WebRTC.Track.t()],
                default: [],
                description: "List of initial outbound tracks"
              ],
              direction: [
                spec: direction(),
                default: :sendrecv,
                description: """
                Direction of EndpointBin. Determines whether
                EndpointBin can send, receive or both send and receive media.
                For more information refer to t:direction/0.
                """
              ],
              handshake_opts: [
                type: :list,
                spec: Keyword.t(),
                default: [],
                description: """
                Keyword list with options for handshake module. For more information please
                refer to `t:ExDTLS.opts_t/0`
                """
              ],
              rtcp_receiver_report_interval: [
                spec: Membrane.Time.t() | nil,
                default: nil,
                description:
                  "Receiver reports's generation interval, set to nil to avoid reports generation"
              ],
              rtcp_sender_report_interval: [
                spec: Membrane.Time.t() | nil,
                default: nil,
                description:
                  "Sender reports's generation interval, set to nil to avoid reports generation"
              ],
              filter_codecs: [
                spec: ({RTPMapping.t(), FMTP.t() | nil} -> boolean()),
                default: &SDP.filter_mappings(&1),
                description: "Defines function which will filter SDP m-line by codecs"
              ],
              extensions: [
                spec: [Extension.t()],
                default: [],
                description: "List of WebRTC extensions that should be enabled"
              ],
              log_metadata: [
                spec: :list,
                spec: Keyword.t(),
                default: [],
                description: "Logger metadata used for endpoint bin and all its descendants"
              ],
              integrated_turn_options: [
                spec: [ICE.Endpoint.integrated_turn_options_t()],
                default: [],
                description: "Integrated TURN Options"
              ],
              simulcast?: [
                spec: boolean(),
                default: true,
                description: """
                Whether to accept simulcast tracks or not.
                If set to `false`, simulcast tracks will be disabled i.e.
                sender will not send them.
                """
              ],
              trace_metadata: [
                spec: :list,
                default: [],
                description: "A list of tuples to merge into Otel spans"
              ],
              trace_context: [
                spec: :list | any(),
                default: [],
                description: "Trace context for otel propagation"
              ],
              telemetry_label: [
                spec: Membrane.TelemetryMetrics.label(),
                default: [],
                description: "Label passed to Membrane.TelemetryMetrics functions"
              ]

  def_input_pad :input,
    demand_unit: :buffers,
    caps: :any,
    availability: :on_request,
    options: [
      use_payloader?: [
        spec: boolean(),
        default: true,
        description: """
        Defines if incoming stream should be payloaded based on given encoding.
        Otherwise the stream is assumed  be in RTP format.
        """
      ]
    ]

  def_output_pad :output,
    demand_unit: :buffers,
    caps: :any,
    availability: :on_request,
    options: [
      extensions: [
        spec: [Membrane.RTP.SessionBin.extension_t()],
        default: [],
        description:
          "List of general extensions that will be applied to the SessionBin's output pad"
      ],
      use_depayloader?: [
        spec: boolean(),
        default: true,
        description: """
        Defines if the outgoing stream should get depayloaded.

        This option should be used as a convenience, it is not necessary as the new track notification
        returns a depayloading filter's definition that can be attached to the output pad
        to work the same way as with the option set to true.
        """
      ],
      rtcp_fir_interval: [
        spec: Membrane.Time.t() | nil,
        default: Membrane.Time.second(),
        description: """
        Defines how often FIR should be sent.

        For more information refer to RFC 5104 section 4.3.1.
        """
      ]
    ]

  defmodule State do
    @moduledoc false
    use Bunch.Access

    @typedoc """
    * `simulcast_track_ids` - list of simulcast track ids.
    * `ssrc_to_track_id` - maps inbound track ssrc to its id.
    If track is a simulcast track it might not be in this list until
    we receive its first RTP packets. This is beacuse simulcast tracks
    don't announce their SSRCs in SDP. Instead, we have to wait for
    their first RTP packets. There might be many SSRC pointing to the same track.
    """
    @type t :: %__MODULE__{
            id: String.t(),
            trace_metadata: Keyword.t(),
            log_metadata: Keyword.t(),
            inbound_tracks: %{Track.id() => Track.t()},
            outbound_tracks: %{Track.id() => Track.t()},
            endpoint_direction: Membrane.WebRTC.EndpointBin.direction(),
            rtcp_sender_report_interval: Membrane.Time.t() | nil,
            candidates: [any()],
            candidate_gathering_state: nil | :in_progress | :done,
            dtls_fingerprint: nil | {:sha256, binary()},
            simulcast_track_ids: [Track.id()],
            ssrc_to_track_id: %{RTP.ssrc_t() => Track.id()},
            filter_codecs: ({RTPMapping.t(), FMTP.t() | nil} -> boolean()),
            extensions: [Extension.t()],
            integrated_turn_servers: [any()],
            component_path: String.t(),
            simulcast?: boolean(),
            telemetry_label: Membrane.TelemetryMetrics.label(),
            ice: %{
              restarting?: boolean(),
              waiting_restart?: boolean(),
              pwd: nil | String.t(),
              ufrag: nil | String.t(),
              first?: boolean()
            }
          }

    defstruct id: "endpointBin",
              trace_metadata: [],
              log_metadata: [],
              inbound_tracks: %{},
              outbound_tracks: %{},
              endpoint_direction: :sendrecv,
              rtcp_sender_report_interval: nil,
              candidates: [],
              candidate_gathering_state: nil,
              dtls_fingerprint: nil,
              simulcast_track_ids: [],
              ssrc_to_track_id: %{},
              filter_codecs: &SDP.filter_mappings(&1),
              extensions: [],
              integrated_turn_servers: [],
              component_path: "",
              simulcast?: true,
              telemetry_label: [],
              ice: %{
                restarting?: false,
                waiting_restart?: false,
                pwd: nil,
                ufrag: nil,
                first?: true
              }
  end

  @impl true
  def handle_init(opts) do
    trace_metadata =
      Keyword.merge(opts.trace_metadata, [
        {:"library.language", :erlang},
        {:"library.name", :membrane_webrtc_plugin},
        {:"library.version", "semver:#{Application.spec(:membrane_webrtc_plugin, :vsn)}"}
      ])

    create_or_join_otel_context(opts, trace_metadata)

    children = %{
      ice: %ICE.Endpoint{
        integrated_turn_options: opts.integrated_turn_options,
        handshake_opts: opts.handshake_opts,
        telemetry_label: opts.telemetry_label
      },
      rtp: %Membrane.RTP.SessionBin{
        secure?: true,
        rtcp_receiver_report_interval: opts.rtcp_receiver_report_interval,
        rtcp_sender_report_interval: opts.rtcp_sender_report_interval
      },
      ice_funnel: Membrane.Funnel
    }

    rtp_input_ref = make_ref()

    links = [
      # always link :rtcp_receiver_output to handle FIR RTCP packets
      link(:rtp)
      |> via_out(Pad.ref(:rtcp_receiver_output, rtp_input_ref))
      |> to(:ice_funnel),
      link(:ice)
      |> via_out(Pad.ref(:output, 1))
      |> via_in(Pad.ref(:rtp_input, rtp_input_ref))
      |> to(:rtp),
      link(:ice_funnel)
      |> via_out(:output)
      |> via_in(Pad.ref(:input, 1))
      |> to(:ice)
    ]

    spec = %ParentSpec{
      children: children,
      links: links,
      log_metadata: opts.log_metadata
    }

    state =
      %State{
        id: Keyword.get(trace_metadata, :name, "endpointBin"),
        trace_metadata: trace_metadata,
        log_metadata: opts.log_metadata,
        inbound_tracks: %{},
        outbound_tracks: %{},
        endpoint_direction: opts.direction,
        rtcp_sender_report_interval: opts.rtcp_sender_report_interval,
        candidates: [],
        candidate_gathering_state: nil,
        dtls_fingerprint: nil,
        ssrc_to_track_id: %{},
        filter_codecs: opts.filter_codecs,
        integrated_turn_servers: ICE.TURNManager.get_launched_turn_servers(),
        extensions: Enum.map(opts.extensions, &if(is_struct(&1), do: &1, else: &1.new())),
        component_path: Membrane.ComponentPath.get_formatted(),
        simulcast?: opts.simulcast?,
        telemetry_label: opts.telemetry_label,
        ice: %{
          restarting?: false,
          waiting_restart?: false,
          pwd: nil,
          ufrag: nil,
          first?: true
        }
      }
      |> add_tracks(:inbound_tracks, opts.inbound_tracks)
      |> add_tracks(:outbound_tracks, opts.outbound_tracks)

    {{:ok, spec: spec}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.pad_added.input",
              include: [
                [:mapping, :clock_rate],
                [:mapping, :payload_type],
                :encoding,
                :ssrc,
                :track_id,
                [:state, :id]
              ]
            )
  def handle_pad_added(Pad.ref(:input, track_id) = pad, ctx, state) do
    # TODO: check this one
    %{use_payloader?: use_payloader?} = ctx.options

    %Track{ssrc: ssrc, encoding: encoding, rtp_mapping: mapping, extmaps: extmaps} =
      Map.fetch!(state.outbound_tracks, track_id)

    rtp_extension_mapping = Map.new(extmaps, &Extension.as_rtp_mapping(state.extensions, &1))

    options = [
      encoding: encoding,
      clock_rate: mapping.clock_rate,
      payload_type: mapping.payload_type,
      rtp_extension_mapping: rtp_extension_mapping
    ]

    encoding_specific_links =
      case encoding do
        :H264 when use_payloader? ->
          &to(&1, {:h264_parser, ssrc}, %Membrane.H264.FFmpeg.Parser{alignment: :nal})

        _other ->
          & &1
      end

    payloader =
      if use_payloader? do
        {:ok, payloader} = Membrane.RTP.PayloadFormatResolver.payloader(encoding)

        payloader
      else
        nil
      end

    # link sender reports's pad only if we are going to generate the reports
    links =
      if state.rtcp_sender_report_interval do
        [
          link(:rtp)
          |> via_out(Pad.ref(:rtcp_sender_output, ssrc))
          |> to(:ice_funnel)
        ]
      else
        []
      end ++
        [
          link_bin_input(pad)
          |> then(encoding_specific_links)
          |> via_in(Pad.ref(:input, ssrc), options: [payloader: payloader])
          |> to(:rtp)
          |> via_out(Pad.ref(:rtp_output, ssrc), options: options)
          |> to(:ice_funnel)
        ]

    {{:ok, spec: %ParentSpec{links: links}}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.pad_added.output",
              include: [
                [:rtp_mapping, :clock_rate],
                [:rtp_mapping, :payload_type],
                :ssrc,
                :track_id,
                [:state, :id]
              ]
            )
  def handle_pad_added(Pad.ref(:output, {track_id, rid}) = pad, ctx, state) do
    %Track{ssrc: ssrc, encoding: encoding, rtp_mapping: rtp_mapping, extmaps: extmaps} =
      track = Map.fetch!(state.inbound_tracks, track_id)

    # if `rid` is set, it is a request for specific encoding of simulcast track
    # choose ssrc which corresponds to given `rid`
    ssrc = if rid, do: Map.fetch!(track.rid_to_ssrc, rid), else: ssrc

    %{
      use_depayloader?: use_depayloader?,
      rtcp_fir_interval: rtcp_fir_interval
    } = ctx.options

    depayloader =
      if use_depayloader? do
        {:ok, depayloader} = Membrane.RTP.PayloadFormatResolver.depayloader(encoding)

        depayloader
      else
        nil
      end

    rtp_extensions =
      extmaps
      |> Enum.map(&Extension.as_rtp_extension(state.extensions, &1))
      |> Enum.reject(fn {_name, rtp_module} -> rtp_module == :no_rtp_module end)

    telemetry_label = state.telemetry_label ++ [track_id: "#{track_id}:#{rid}"]

    output_pad_options = [
      extensions: ctx.options.extensions,
      rtp_extensions: rtp_extensions,
      clock_rate: rtp_mapping.clock_rate,
      depayloader: depayloader,
      telemetry_label: telemetry_label,
      encoding: encoding,
      rtcp_fir_interval: rtcp_fir_interval
    ]

    spec = %ParentSpec{
      links: [
        link(:rtp)
        |> via_out(Pad.ref(:output, ssrc), options: output_pad_options)
        |> to_bin_output(pad)
      ]
    }

    state = put_in(state, [:inbound_tracks, track_id], %{track | status: :linked})

    {{:ok, spec: spec}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.notification.new_rtp_stream",
              include: [:track_id, :ssrc, [:state, :id]]
            )
  def handle_notification(
        {:new_rtp_stream, ssrc, pt, _rtp_header_extensions},
        _from,
        _ctx,
        %State{endpoint_direction: :sendonly} = _state
      ) do
    raise("""
    Received new RTP stream but EndpointBin is set to :sendonly.
    RTP stream params: SSRC: #{inspect(ssrc)}, PT: #{inspect(pt)}"
    """)
  end

  @impl true
  @decorate trace("endpoint_bin.notification.new_rtp_stream",
              include: [:track_id, :ssrc, [:state, :id]]
            )
  def handle_notification(
        {:new_rtp_stream, ssrc, _pt, rtp_header_extensions},
        _from,
        _ctx,
        state
      ) do
    track_id = Map.get(state.ssrc_to_track_id, ssrc)

    track =
      if track_id do
        Map.fetch!(state.inbound_tracks, track_id)
      else
        # search in simulcast tracks
        simulcast_tracks =
          state.inbound_tracks
          |> Enum.filter(fn {inbound_track_id, _inbound_track} ->
            inbound_track_id in state.simulcast_track_ids
          end)
          |> Enum.map(fn {_simulcast_track_id, simulcast_track} -> simulcast_track end)

        Enum.find(simulcast_tracks, fn simulcast_track ->
          resolved_rtp_extensions =
            SDP.resolve_rtp_header_extensions(
              simulcast_track,
              rtp_header_extensions,
              state.extensions
            )

          if resolved_rtp_extensions.mid == <<>>,
            do: raise("No MID extension for RTP stream #{inspect(ssrc)}")

          simulcast_track.mid == resolved_rtp_extensions.mid
        end)
      end

    resolved_rtp_extensions =
      SDP.resolve_rtp_header_extensions(track, rtp_header_extensions, state.extensions)

    # this might be nil when track is not a simulcast one
    rid = Map.get(resolved_rtp_extensions, :rid)

    state =
      if track.ssrc == ssrc do
        # casual track
        state
      else
        # simulcast track
        track = %Track{
          track
          | ssrc: [ssrc | track.ssrc],
            rid_to_ssrc: Map.put(track.rid_to_ssrc, rid, ssrc)
        }

        put_in(state, [:inbound_tracks, track.id], track)
      end

    state = put_in(state, [:ssrc_to_track_id, ssrc], track.id)
    depayloading_filter = depayloading_filter_for(track)

    notification = {:new_track, track.id, rid, track.encoding, depayloading_filter}

    {{:ok, [{:notify, notification}]}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.notification.handle_init_data",
              include: [[:state, :id]]
            )
  def handle_notification({:handshake_init_data, _component_id, fingerprint}, _from, _ctx, state) do
    {:ok, %{state | dtls_fingerprint: {:sha256, hex_dump(fingerprint)}}}
  end

  @impl true
  @decorate trace("endpoint_bin.notification.local_credentials",
              include: [[:state, :ice, :first?], [:state, :id]]
            )
  def handle_notification({:local_credentials, credentials}, _from, _ctx, state) do
    [ice_ufrag, ice_pwd] = String.split(credentials, " ")

    {actions, state} =
      if state.ice.first? and state.outbound_tracks == %{} do
        {[], state}
      else
        state = Map.update!(state, :ice, &%{&1 | first?: false})
        get_offer_data(state)
      end

    state = %{state | ice: %{state.ice | ufrag: ice_ufrag, pwd: ice_pwd}}
    {{:ok, actions}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.notification.new_candidate_full",
              include: [[:state, :id]]
            )
  def handle_notification({:new_candidate_full, cand}, _from, _ctx, state) do
    state = Map.update!(state, :candidates, &[cand | &1])
    {{:ok, notify_candidates([cand])}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.notification.candidate_gathering_done",
              include: [[:state, :id]]
            )
  def handle_notification(:candidate_gathering_done, _from, _ctx, state) do
    {:ok, %{state | candidate_gathering_state: :done}}
  end

  @impl true
  @decorate trace("endpoint_bin.notification.vad", include: [[:state, :id]])
  def handle_notification({:vad, _val} = msg, _from, _ctx, state) do
    {{:ok, notify: msg}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.notification.connection_failed",
              include: [[:state, :id]]
            )
  def handle_notification({:connection_failed, _stream_id, _component_id}, _from, _ctx, state) do
    state = %{state | ice: %{state.ice | restarting?: false}}
    {action, state} = maybe_restart_ice(state, true)
    {{:ok, action}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.notification.connection_ready",
              include: [[:state, :ice, :restarting?], [:state, :id]]
            )
  def handle_notification({:connection_ready, _stream_id, _component_id}, _from, _ctx, state)
      when state.ice.restarting? do
    outbound_tracks = Map.values(state.outbound_tracks) |> Enum.filter(&(&1.status != :pending))

    new_outbound_tracks =
      outbound_tracks
      |> Enum.filter(&(&1.status === :ready))

    negotiations = [notify: {:negotiation_done, new_outbound_tracks}]

    state = %{state | outbound_tracks: change_tracks_status(state, :ready, :linked)}

    state = %{state | ice: %{state.ice | restarting?: false}}

    {restart_action, state} = maybe_restart_ice(state)

    actions = negotiations ++ restart_action

    {{:ok, actions}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.notification.connection_ready",
              include: [[:state, :ice, :restarting?], [:state, :id]]
            )
  def handle_notification({:connection_ready, _stream_id, _component_id}, _from, _ctx, state),
    do: {:ok, state}

  @impl true
  @decorate trace("endpoint_bin.notification.integrated_turn_servers",
              include: [[:state, :id]]
            )
  def handle_notification({:udp_integrated_turn, turn}, _from, _ctx, state) do
    state = %{state | integrated_turn_servers: [turn] ++ state.integrated_turn_servers}
    {:ok, state}
  end

  @impl true
  @decorate trace("endpoint_bin.notification.ignored_notification",
              include: [[:state, :id]]
            )
  def handle_notification(_notification, _from, _ctx, state), do: {:ok, state}

  @impl true
  @decorate trace("endpoint_bin.other.sdp_offer", include: [[:state, :id]])
  def handle_other({:signal, {:sdp_offer, sdp, mid_to_track_id}}, _ctx, state) do
    {:ok, sdp} = sdp |> ExSDP.parse()

    {new_inbound_tracks, removed_inbound_tracks, inbound_tracks, outbound_tracks} =
      get_tracks_from_sdp(sdp, mid_to_track_id, state)

    state =
      removed_inbound_tracks
      |> Enum.map(fn track -> track.id end)
      |> then(fn removed_inbound_track_ids ->
        update_in(state, [:simulcast_track_ids], &(&1 -- removed_inbound_track_ids))
      end)

    state = %{
      state
      | outbound_tracks: Map.merge(state.outbound_tracks, Map.new(outbound_tracks, &{&1.id, &1})),
        inbound_tracks: Map.merge(state.inbound_tracks, Map.new(inbound_tracks, &{&1.id, &1}))
    }

    {link_notify, state} = add_inbound_tracks(new_inbound_tracks, state)

    answer =
      SDP.create_answer(
        inbound_tracks: Map.values(state.inbound_tracks),
        outbound_tracks: Map.values(state.outbound_tracks),
        ice_ufrag: state.ice.ufrag,
        ice_pwd: state.ice.pwd,
        fingerprint: state.dtls_fingerprint,
        extensions: state.extensions,
        ice_lite?: @ice_lite
      )

    {actions, state} =
      withl tracks_check: true <- state.inbound_tracks != %{} or state.outbound_tracks != %{},
            candidate_gathering_check: nil <- state.candidate_gathering_state do
        {[forward: [ice: :gather_candidates]], %{state | candidate_gathering_state: :in_progress}}
      else
        tracks_check: _ -> {[], state}
        candidate_gathering_check: _ -> {notify_candidates(state.candidates), state}
      end

    inbound_tracks = SDP.filter_simulcast_tracks(inbound_tracks)
    mid_to_track_id = Map.new(inbound_tracks ++ outbound_tracks, &{&1.mid, &1.id})

    actions =
      if Enum.empty?(removed_inbound_tracks),
        do: actions,
        else: actions ++ [notify: {:removed_tracks, removed_inbound_tracks}]

    actions =
      link_notify ++
        [notify: {:signal, {:sdp_answer, to_string(answer), mid_to_track_id}}] ++
        set_remote_credentials(sdp) ++
        actions

    {{:ok, actions}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.other.candidate", include: [[:state, :id]])
  def handle_other({:signal, {:candidate, candidate}}, _ctx, state) do
    {{:ok, forward: {:ice, {:set_remote_candidate, "a=" <> candidate, 1}}}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.other.renegotiate_tracks", include: [[:state, :id]])
  def handle_other({:signal, :renegotiate_tracks}, _ctx, state) do
    {action, state} =
      cond do
        state.ice.first? and state.ice.pwd != nil ->
          state = Map.update!(state, :ice, &%{&1 | first?: false})
          get_offer_data(state)

        state.ice.first? ->
          state = Map.update!(state, :ice, &%{&1 | first?: false})
          {[], state}

        state.ice.pwd == nil ->
          {[], state}

        true ->
          maybe_restart_ice(state, true)
      end

    {{:ok, action}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.other.add_tracks",
              include: [[:state, :component_path], [:state, :id]]
            )
  def handle_other({:add_tracks, tracks}, _ctx, state) do
    outbound_tracks = state.outbound_tracks

    tracks =
      Enum.map(tracks, fn track ->
        if Map.has_key?(outbound_tracks, track.id),
          do: track,
          else: %{track | status: :pending, mid: nil}
      end)

    state = add_tracks(state, :outbound_tracks, tracks)

    {action, state} =
      cond do
        state.ice.first? and state.ice.pwd != nil ->
          state = Map.update!(state, :ice, &%{&1 | first?: false})
          outbound_tracks = change_tracks_status(state, :pending, :ready)
          state = %{state | outbound_tracks: outbound_tracks}
          get_offer_data(state)

        state.ice.first? and state.ice.pwd == nil ->
          outbound_tracks = change_tracks_status(state, :pending, :ready)
          state = %{state | outbound_tracks: outbound_tracks}
          {[], update_in(state, [:ice, :first?], fn _old_value -> false end)}

        true ->
          maybe_restart_ice(state, true)
      end

    {{:ok, action}, state}
  end

  @impl true
  @decorate trace("endpoint_bin.other.remove_tracks", include: [[:state, :id]])
  def handle_other({:remove_tracks, tracks_to_remove}, _ctx, state) do
    outbound_tracks = state.outbound_tracks

    new_outbound_tracks =
      Enum.map(tracks_to_remove, &Map.get(outbound_tracks, &1.id))
      |> Map.new(fn track -> {track.id, %{track | status: :disabled}} end)

    {actions, state} =
      state
      |> Map.update!(:outbound_tracks, &Map.merge(&1, new_outbound_tracks))
      |> maybe_restart_ice(true)

    {{:ok, actions}, state}
  end

  defp maybe_restart_ice(state, set_waiting_restart? \\ false) do
    state =
      if set_waiting_restart?,
        do: %{state | ice: %{state.ice | waiting_restart?: true}},
        else: state

    if not state.ice.restarting? and state.ice.waiting_restart? do
      state = %{state | ice: %{state.ice | restarting?: true, waiting_restart?: false}}
      outbound_tracks = change_tracks_status(state, :pending, :ready)
      state = %{state | outbound_tracks: outbound_tracks}

      {[forward: {:ice, :restart_stream}], state}
    else
      {[], state}
    end
  end

  defp get_offer_data(state) do
    tracks_types =
      Map.values(state.outbound_tracks)
      |> Enum.filter(&(&1.status != :pending))
      |> Enum.map(& &1.type)

    media_count = %{
      audio: Enum.count(tracks_types, &(&1 == :audio)),
      video: Enum.count(tracks_types, &(&1 == :video))
    }

    actions = [notify: {:signal, {:offer_data, media_count, state.integrated_turn_servers}}]
    state = Map.update!(state, :ice, &%{&1 | restarting?: true})

    {actions, state}
  end

  defp change_tracks_status(state, prev_status, new_status) do
    state.outbound_tracks
    |> Map.values()
    |> Map.new(fn track ->
      {track.id, if(track.status === prev_status, do: %{track | status: new_status}, else: track)}
    end)
  end

  defp get_tracks_from_sdp(sdp, mid_to_track_id, state) do
    old_inbound_tracks = Map.values(state.inbound_tracks)

    outbound_tracks = Map.values(state.outbound_tracks) |> Enum.filter(&(&1.status != :pending))

    constraints = %Track.Constraints{
      codecs_filter: state.filter_codecs,
      enabled_extensions: state.extensions,
      simulcast?: state.simulcast?,
      endpoint_direction: state.endpoint_direction
    }

    SDP.get_tracks(sdp, constraints, old_inbound_tracks, outbound_tracks, mid_to_track_id)
  end

  defp add_inbound_tracks([], state) do
    {[], state}
  end

  defp add_inbound_tracks(new_tracks, state) do
    new_track_id_to_track = Map.new(new_tracks, &{&1.id, &1})
    state = Map.update!(state, :inbound_tracks, &Map.merge(&1, new_track_id_to_track))

    {new_simulcast_tracks, new_casual_tracks} = Enum.split_with(new_tracks, &(&1.ssrc == []))

    new_ssrc_to_track_id =
      Enum.into(new_casual_tracks, %{}, fn track -> {track.ssrc, track.id} end)

    new_simulcast_track_ids = Enum.map(new_simulcast_tracks, fn track -> track.id end)

    state =
      state
      |> Map.update!(:ssrc_to_track_id, &Map.merge(&1, new_ssrc_to_track_id))
      |> Map.update!(:simulcast_track_ids, &(&1 ++ new_simulcast_track_ids))

    actions = if Enum.empty?(new_tracks), do: [], else: [notify: {:new_tracks, new_tracks}]
    {actions, state}
  end

  defp add_tracks(state, _tracks_type, []), do: state

  defp add_tracks(state, tracks_type, tracks) do
    cond do
      tracks_type == :inbound_tracks and state.endpoint_direction == :sendonly ->
        raise("""
        Cannot add inbound tracks when EndpointBin is set to #{inspect(state.endpoint_direction)}.
        You can add inbound tracks only when EndpointBin is set to :recvonly or :sendrecv.
        """)

      tracks_type == :outbound_tracks and state.endpoint_direction == :recvonly ->
        raise("""
        Cannot add outbound tracks when EndpointBin is set to #{inspect(state.endpoint_direction)}.
        You can add outbound tracks only when EndpointBin is set to :sendonly or :sendrecv.
        """)

      true ->
        :ok
    end

    tracks =
      case tracks_type do
        :outbound_tracks ->
          Track.add_ssrc(
            tracks,
            Map.values(state.inbound_tracks) ++ Map.values(state.outbound_tracks)
          )

        :inbound_tracks ->
          tracks
      end

    tracks = Map.new(tracks, &{&1.id, &1})
    Map.update!(state, tracks_type, &Map.merge(&1, tracks))
  end

  defp notify_candidates(candidates) do
    Enum.flat_map(candidates, fn cand ->
      [notify: {:signal, {:candidate, cand, 0}}]
    end)
  end

  defp set_remote_credentials(sdp) do
    case List.first(sdp.media) do
      nil ->
        []

      media ->
        {_key, ice_ufrag} = Media.get_attribute(media, :ice_ufrag)
        {_key, ice_pwd} = Media.get_attribute(media, :ice_pwd)
        remote_credentials = ice_ufrag <> " " <> ice_pwd
        [forward: {:ice, {:set_remote_credentials, remote_credentials}}]
    end
  end

  defp hex_dump(digest_str) do
    digest_str
    |> :binary.bin_to_list()
    |> Enum.map_join(":", &Base.encode16(<<&1>>))
  end

  defp depayloading_filter_for(track) do
    case Membrane.RTP.PayloadFormatResolver.depayloader(track.encoding) do
      {:ok, depayloader} ->
        %Membrane.RTP.DepayloaderBin{
          depayloader: depayloader,
          clock_rate: track.rtp_mapping.clock_rate
        }

      :error ->
        nil
    end
  end

  defp create_or_join_otel_context(opts, trace_metadata) do
    case opts.trace_context do
      [] ->
        root_span = Tracer.start_span("endpoint_bin")
        parent_ctx = Tracer.set_current_span(root_span)
        otel_ctx = OpenTelemetry.Ctx.attach(parent_ctx)
        OpenTelemetry.Span.set_attributes(root_span, trace_metadata)
        OpenTelemetry.Span.end_span(root_span)
        OpenTelemetry.Ctx.attach(otel_ctx)
        [otel_ctx]

      [ctx | _] ->
        OpenTelemetry.Ctx.attach(ctx)
        [ctx]

      ctx ->
        OpenTelemetry.Ctx.attach(ctx)
        root_span = Tracer.start_span("endpoint_bin")
        parent_ctx = Tracer.set_current_span(root_span)
        otel_ctx = OpenTelemetry.Ctx.attach(parent_ctx)
        OpenTelemetry.Span.set_attributes(root_span, trace_metadata)
        OpenTelemetry.Span.end_span(root_span)
        OpenTelemetry.Ctx.attach(otel_ctx)
        otel_ctx
    end
  end
end