lib/membrane_rtc_engine/engine.ex

defmodule Membrane.RTC.Engine do
  @moduledoc """
  RTC Engine implementation.

  RTC Engine is an abstraction layer responsible for linking together different types of `Endpoints`.
  From the implementation point of view, RTC Engine is a `Membrane.Pipeline`.

  ## Messages

  The RTC Engine works by sending messages that notify user logic about important events.
  To receive RTC Engine messages, you have to register your process so that the RTC Engine
  knows where to send them.
  All messages RTC Engine can emit are described in `#{inspect(__MODULE__)}.Message` docs.

  ### Registering for messages

  Registration can be done using `register/2` e.g.

  ```elixir
  Engine.register(rtc_engine, self())
  ```

  This will register your process to receive RTC Engine messages.
  You can register multiple processes to receive messages from an RTC Engine instance.
  In such a case each message will be sent to each registered process.
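
  For example, a minimal sketch of a registered process (the module name and the crash
  handling are illustrative):

  ```elixir
  defmodule MyApp.RoomServer do
    use GenServer

    require Logger

    alias Membrane.RTC.Engine
    alias Membrane.RTC.Engine.Message

    @impl true
    def init(rtc_engine) do
      # register so the Engine knows where to send its messages
      :ok = Engine.register(rtc_engine, self())
      {:ok, %{rtc_engine: rtc_engine}}
    end

    @impl true
    def handle_info(%Message.EndpointCrashed{endpoint_id: id}, state) do
      Logger.warn("Endpoint crashed: " <> inspect(id))
      {:noreply, state}
    end
  end
  ```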

  ## Peers

  > ### Deprecation notice {: .warning }
  >
  > **Peers are deprecated as of version 0.10.0 and will be removed in the future**
  >
  > While Peers are still present in the RTC Engine, it's not recommended to use
  > this feature in new applications.
  >
  > Existing applications should take steps to move away from using the built-in concept of peers.

  Each peer represents a user and can carry some metadata.
  In the RTC Engine, each peer is represented by an endpoint; for the purposes of the Engine,
  a peer is simply an endpoint that we happen to store additional information about, metadata in particular.
  A peer doesn't exist without its endpoint.

  ### Adding a peer
  The only way to add a peer to the RTC Engine is to assign a `peer_id` to the endpoint representing it.
  This is done when adding an endpoint to the Engine by passing a `peer_id` option.

  **Example**
  ```elixir
  :ok = Engine.add_endpoint(rtc_engine, webrtc_endpoint, peer_id: "Peer1")
  ```

  Each peer then needs to declare itself as ready before being fully connected to the RTC Engine.

  ### Readiness state
  Each peer endpoint is presumed to be initially inactive and has to declare itself ready to fully join the Engine.

  Before it does, it:
  * will not receive notifications about other peers and their metadata
  * will not receive information about tracks
  * will not be able to publish any tracks
  * will not be able to update their metadata

  When declaring itself ready, the peer also has an opportunity to set its metadata.
  To mark the peer as active, its endpoint has to send the `t:ready_action_t/0` action.

  **Example**
  ```elixir
  @impl true
  def handle_parent_notification({:media_event, %{type: "join", metadata: metadata}}, _ctx, state) do
    {[notify_parent: {:ready, metadata}], state}
  end

  @impl true
  def handle_parent_notification({:ready, _peers_in_room}, _ctx, state) do
    Membrane.Logger.debug("Successfully activated the peer")
    {[], state}
  end
  ```

  ## Endpoints

  `Endpoints` are `Membrane.Bin`s able to publish their own tracks and subscribe to tracks from other Endpoints.
  One can think of an Endpoint as an entity responsible for handling some specific task.
  An Endpoint can be added and removed using `add_endpoint/3` and `remove_endpoint/2` respectively.

  There are two types of Endpoints:
  * Standalone Endpoints - they are in most cases spawned only once per RTC Engine instance and they are not associated with any peer.
  * Peer Endpoints - they are associated with some peer.
  Associating an Endpoint with a Peer will cause the RTC Engine to send some Media Events to the Endpoint's Client Library,
  e.g. one which indicates which tracks belong to which peer.

  Currently RTC Engine ships with the implementation of two Endpoints:
  * `#{inspect(__MODULE__)}.Endpoint.WebRTC` which is responsible for establishing a connection with some WebRTC
  peer (mainly browser) and exchanging media with it. WebRTC Endpoint is a Peer Endpoint.
  * `#{inspect(__MODULE__)}.Endpoint.HLS` which is responsible for receiving media tracks from all other Endpoints and
  saving them to files by creating HLS playlists. HLS Endpoint is a Standalone Endpoint.
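
  For illustration, assuming `rtc_engine` is the Engine pid and `webrtc_endpoint` and
  `hls_endpoint` are endpoint structs built elsewhere, the two kinds are added like this:

  ```elixir
  # Peer Endpoint - associated with a peer via `peer_id`
  :ok = Engine.add_endpoint(rtc_engine, webrtc_endpoint, peer_id: "Peer1")

  # Standalone Endpoint - no `peer_id`
  :ok = Engine.add_endpoint(rtc_engine, hls_endpoint)
  ```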

  Users can also implement custom Endpoints; see the Custom Endpoints guide.
  """

  use Membrane.Pipeline

  import Membrane.RTC.Utils

  require Membrane.Logger
  require Membrane.OpenTelemetry
  require Membrane.TelemetryMetrics

  alias Membrane.RTC.Engine.{
    DisplayManager,
    Endpoint,
    FilterTee,
    Message,
    Peer,
    Subscription,
    Tee,
    Track
  }

  alias Membrane.RTC.Engine.Notifications.TrackNotification

  alias Membrane.RTC.Engine.Exception.{PublishTrackError, TrackReadyError}

  # Membrane.Pipeline.call/3 currently has an invalid typespec
  @dialyzer {:nowarn_function, get_endpoints: 1}

  @registry_name Membrane.RTC.Engine.Registry.Dispatcher

  @life_span_id "rtc_engine.life_span"

  @typedoc """
  RTC Engine configuration options.

  * `id` is used by the logger. If not provided, it will be generated.
  * `trace_ctx` is used by OpenTelemetry. All traces from this engine will be attached to this context.
  An example function from which you can get an Otel context is `get_current/0` from `OpenTelemetry.Ctx`.
  * `telemetry_label` - label attached to the telemetry events emitted by this engine; the engine appends `room_id: id` to it.
  * `display_manager?` - set to `true` if you want to limit the number of tracks sent from `#{inspect(__MODULE__)}.Endpoint.WebRTC` to a browser.
  * `toilet_capacity` - sets the capacity of the buffer between the engine and endpoints. Use it when you expect bursts of data for your tracks. If not provided, it defaults to 200.
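
  For example, a sketch of starting an engine with these options (values are illustrative):

  ```elixir
  {:ok, pid} = Membrane.RTC.Engine.start_link([id: "room-1", toilet_capacity: 500], [])
  ```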
  """

  @type options_t() :: [
          id: String.t(),
          trace_ctx: map(),
          telemetry_label: Membrane.TelemetryMetrics.label(),
          display_manager?: boolean(),
          toilet_capacity: pos_integer() | nil
        ]

  defmodule State do
    @moduledoc false

    use Bunch.Access

    @enforce_keys [:id, :component_path, :trace_context, :telemetry_label]
    defstruct @enforce_keys ++
                [
                  peers: %{},
                  endpoints: %{},
                  pending_subscriptions: [],
                  pending_peers: %{},
                  subscriptions: %{},
                  display_manager: nil,
                  toilet_capacity: 200
                ]

    @type t() :: %__MODULE__{
            id: String.t(),
            component_path: String.t(),
            trace_context: map(),
            telemetry_label: Membrane.TelemetryMetrics.label(),
            display_manager: pid() | nil,
            peers: %{Endpoint.id() => Peer.t()},
            endpoints: %{Endpoint.id() => Endpoint.t()},
            subscriptions: %{Endpoint.id() => %{Track.id() => Subscription.t()}},
            pending_subscriptions: [Subscription.t()],
            pending_peers: %{Endpoint.id() => %{peer: Peer.t(), endpoint: Endpoint.t()}},
            toilet_capacity: pos_integer()
          }
  end

  @typedoc """
  Endpoint configuration options.

  * `peer_id` - associates the endpoint with an existing peer
  * `endpoint_id` - assigns an id to the endpoint. If not provided, it will be generated by the RTC Engine. This option cannot be used together with `peer_id`;
  endpoints associated with peers use their `peer_id` as the endpoint id.
  * `node` - node on which the endpoint should be spawned. If not provided, the current node is used.
  """
  @type endpoint_options_t() :: [
          endpoint_id: String.t(),
          peer_id: String.t(),
          node: node()
        ]

  @typedoc """
  Subscription options.
  """
  @type subscription_opts_t() :: Keyword.t()

  @typedoc """
  Membrane action that will cause RTC Engine to publish some message to all other endpoints.
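
  For example, an endpoint could publish new tracks from one of its callbacks
  (a sketch; `tracks` is a list of `Track.t()`):

  ```elixir
  {[notify_parent: {:publish, {:new_tracks, tracks}}], state}
  ```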
  """
  @type publish_action_t() :: {:notify_parent, {:publish, publish_message_t()}}

  @typedoc """
  Membrane action that will mark the peer endpoint as ready and set its metadata.
  The Engine will respond with `t:ready_ack_msg_t/0` to acknowledge the transition to the ready state.

  This action can only be used once; any further uses by an endpoint will be ignored.
  It will also be ignored for non-peer endpoints.
  """
  @type ready_action_t() :: {:notify_parent, {:ready, metadata :: any()}}

  @typedoc """
  A message that the Engine sends to the endpoint when it acknowledges its `t:ready_action_t/0`.
  """
  @type ready_ack_msg_t() ::
          {:ready,
           peers_in_room :: [
             %{
               id: Peer.id(),
               metadata: any(),
               trackIdToMetadata: %{Track.id() => any()}
             }
           ]}

  @typedoc """
  Membrane action that will cause RTC Engine to forward supplied message to the business logic.
  """
  @type forward_to_parent_action_t() :: {:notify_parent, {:forward_to_parent, message :: any()}}

  @typedoc """
  Membrane action that will inform RTC Engine about track readiness.
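
  For example (a sketch, assuming `track` is one of the endpoint's published tracks and
  `:high` is one of its variants):

  ```elixir
  {[notify_parent: {:track_ready, track.id, :high, track.encoding}], state}
  ```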
  """
  @type track_ready_action_t() ::
          {:notify_parent, {:track_ready, Track.id(), Track.variant(), Track.encoding()}}

  @typedoc """
  Types of messages that can be published to other Endpoints.
  """
  @type publish_message_t() ::
          {:new_tracks, [Track.t()]}
          | {:removed_tracks, [Track.t()]}
          | {:track_metadata_updated, metadata :: any()}
          | {:peer_metadata_updated, metadata :: any()}
          | {:tracks_priority, tracks :: list()}
          | TrackNotification.t()

  @typedoc """
  Type of messages that need to be handled by each endpoint.
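
  For example, a custom endpoint could handle `{:new_tracks, tracks}` by subscribing to the
  tracks it is interested in (a sketch; `state.rtc_engine` and `state.endpoint_id` are assumed
  to be kept in the endpoint's state):

  ```elixir
  @impl true
  def handle_parent_notification({:new_tracks, tracks}, _ctx, state) do
    Enum.each(tracks, fn track ->
      :ok = Engine.subscribe(state.rtc_engine, state.endpoint_id, track.id)
    end)

    {[], state}
  end
  ```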
  """
  @type published_message_t() ::
          {:new_tracks, [Track.t()]}
          | {:removed_tracks, [Track.t()]}
          | {:new_peer, Peer.t()}
          | {:peer_left, Peer.id()}
          | {:track_metadata_updated, Track.t()}
          | {:peer_metadata_updated, Peer.t()}
          | {:tracks_priority, tracks :: list()}
          | ready_ack_msg_t()
          | TrackNotification.t()

  @spec start(options :: options_t(), process_options :: GenServer.options()) ::
          GenServer.on_start()
  def start(options, process_options) do
    do_start(:start, options, process_options)
  end

  @spec start_link(options :: options_t(), process_options :: GenServer.options()) ::
          GenServer.on_start()
  def start_link(options, process_options) do
    do_start(:start_link, options, process_options)
  end

  defp do_start(func, options, process_options) when func in [:start, :start_link] do
    id = options[:id] || "#{UUID.uuid4()}"
    display_manager? = options[:display_manager?] || false
    options = Keyword.put(options, :id, id)
    options = Keyword.put(options, :display_manager?, display_manager?)
    Membrane.Logger.info("Starting a new RTC Engine instance with id: #{id}")

    with {:ok, _supervisor, pipeline} <-
           apply(Membrane.Pipeline, func, [__MODULE__, options, process_options]) do
      {:ok, pipeline}
    end
  end

  @spec get_registry_name() :: atom()
  def get_registry_name(), do: @registry_name

  @doc """
  Adds an endpoint to the RTC Engine.

  Raises if both `peer_id` and `endpoint_id` are specified in `opts`.
  For more information refer to `t:endpoint_options_t/0`.
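
  For example (with an illustrative id):

  ```elixir
  :ok = Engine.add_endpoint(rtc_engine, endpoint, endpoint_id: "custom-endpoint")
  ```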
  """
  @spec add_endpoint(
          pid :: pid(),
          endpoint :: Membrane.ChildrenSpec.child_definition_t(),
          opts :: endpoint_options_t()
        ) :: :ok
  def add_endpoint(pid, endpoint, opts \\ []) do
    if Keyword.has_key?(opts, :endpoint_id) and Keyword.has_key?(opts, :peer_id) do
      raise "You can't pass both option endpoint_id and peer_id"
    end

    send(pid, {:add_endpoint, endpoint, opts})
    :ok
  end

  @doc """
  Removes an endpoint from the RTC Engine.
  """
  @spec remove_endpoint(
          pid :: pid(),
          id :: String.t()
        ) :: :ok
  def remove_endpoint(rtc_engine, id) do
    send(rtc_engine, {:remove_endpoint, id})
    :ok
  end

  @doc """
  Registers the process with pid `who` for receiving messages from the RTC Engine.
  """
  @spec register(rtc_engine :: pid(), who :: pid()) :: :ok
  def register(rtc_engine, who \\ self()) do
    send(rtc_engine, {:register, who})
    :ok
  end

  @doc """
  Unregisters the process with pid `who` from receiving messages from the RTC Engine.
  """
  @spec unregister(rtc_engine :: pid(), who :: pid()) :: :ok
  def unregister(rtc_engine, who \\ self()) do
    send(rtc_engine, {:unregister, who})
    :ok
  end

  @doc """
  Sends a message to the RTC Engine endpoint with id `endpoint_id`.
  If the endpoint doesn't exist, the message is ignored.
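
  For example (a sketch; the message can be any term the target endpoint handles in its
  `c:Membrane.Bin.handle_parent_notification/3`):

  ```elixir
  :ok = Engine.message_endpoint(rtc_engine, "endpoint-id", {:custom_media_event, "payload"})
  ```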
  """
  @spec message_endpoint(rtc_engine :: pid(), endpoint_id :: String.t(), message :: any()) ::
          :ok
  def message_endpoint(rtc_engine, endpoint_id, message) do
    send(rtc_engine, {:message_endpoint, {:endpoint, endpoint_id}, message})
    :ok
  end

  @doc """
  Returns list of the RTC Engine's endpoints.
  """
  @spec get_endpoints(rtc_engine :: pid()) :: [%{id: String.t(), type: atom()}]
  def get_endpoints(rtc_engine) do
    Pipeline.call(rtc_engine, :get_endpoints)
  end

  @doc """
  Subscribes an endpoint to a track.

  The endpoint will be notified about track readiness in the `c:Membrane.Bin.handle_pad_added/3` callback.
  `endpoint_id` is the id of the endpoint that wants to subscribe to the track.
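
  Typically called from within the subscribing endpoint, e.g. while handling a
  `{:new_tracks, tracks}` message (a sketch; `engine` is the Engine pid and `track_id`
  comes from one of those tracks):

  ```elixir
  :ok = Engine.subscribe(engine, "my-endpoint-id", track_id)
  ```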
  """
  @spec subscribe(
          rtc_engine :: pid(),
          endpoint_id :: String.t(),
          track_id :: Track.id(),
          opts :: subscription_opts_t
        ) :: :ok | {:error, :timeout | :invalid_track_id}
  def subscribe(rtc_engine, endpoint_id, track_id, opts \\ []) do
    ref = make_ref()
    send(rtc_engine, {:subscribe, {self(), ref}, endpoint_id, track_id, opts})

    receive do
      {^ref, :ok} -> :ok
      {^ref, {:error, reason}} -> {:error, reason}
    after
      5_000 -> {:error, :timeout}
    end
  end

  @impl true
  def handle_init(_ctx, options) do
    Logger.metadata(rtc_engine: options[:id])

    if Keyword.has_key?(options, :trace_ctx),
      do: Membrane.OpenTelemetry.attach(options[:trace_ctx])

    start_span_opts =
      case options[:parent_span] do
        nil -> []
        parent_span -> [parent_span: parent_span]
      end

    Membrane.OpenTelemetry.start_span(@life_span_id, start_span_opts)

    display_manager =
      if options[:display_manager?] do
        {:ok, pid} = DisplayManager.start_link(ets_name: options[:id], engine: self())
        pid
      else
        nil
      end

    telemetry_label = (options[:telemetry_label] || []) ++ [room_id: options[:id]]

    toilet_capacity = options[:toilet_capacity] || 200
    if toilet_capacity < 1, do: raise("toilet_capacity has to be a positive integer")

    {[playback: :playing],
     %State{
       id: options[:id],
       component_path: Membrane.ComponentPath.get_formatted(),
       trace_context: options[:trace_ctx],
       telemetry_label: telemetry_label,
       display_manager: display_manager,
       toilet_capacity: toilet_capacity
     }}
  end

  @impl true
  def handle_terminate_request(ctx, state) do
    {actions, state} =
      Enum.flat_map_reduce(state.endpoints, state, fn {endpoint_id, _endpoint}, state ->
        {_status, actions, state} = handle_remove_endpoint(endpoint_id, ctx, state)
        {actions, state}
      end)

    {actions ++ [terminate: :normal], state}
  end

  @impl true
  def handle_info({:add_endpoint, endpoint, opts}, _ctx, state) do
    peer_id = opts[:peer_id]
    endpoint_id = opts[:endpoint_id] || opts[:peer_id]

    endpoint =
      case endpoint do
        %Endpoint.WebRTC{} ->
          %Endpoint.WebRTC{
            endpoint
            | telemetry_label: state.telemetry_label ++ [peer_id: peer_id],
              parent_span: Membrane.OpenTelemetry.get_span(@life_span_id),
              trace_context: state.trace_context
          }

        another_endpoint ->
          another_endpoint
      end

    if Map.has_key?(state.endpoints, endpoint_id) do
      Membrane.Logger.warn(
        "Cannot add Endpoint with id #{inspect(endpoint_id)} as it already exists"
      )

      {[], state}
    else
      handle_add_endpoint(endpoint, opts, state)
    end
  end

  @impl true
  def handle_info({:remove_endpoint, id}, ctx, state) do
    case handle_remove_endpoint(id, ctx, state) do
      {:absent, [], state} ->
        Membrane.Logger.info("Endpoint #{inspect(id)} already removed")
        {[], state}

      {:present, actions, state} ->
        {actions, state}
    end
  end

  @impl true
  def handle_info({:register, pid}, _ctx, state) do
    Registry.register(get_registry_name(), self(), pid)
    {[], state}
  end

  @impl true
  def handle_info({:unregister, pid}, _ctx, state) do
    Registry.unregister_match(get_registry_name(), self(), pid)
    {[], state}
  end

  @impl true
  def handle_info(
        {:subscribe, {endpoint_pid, ref}, endpoint_id, track_id, opts},
        ctx,
        state
      ) do
    subscription = %Subscription{
      endpoint_id: endpoint_id,
      track_id: track_id,
      opts: opts
    }

    case validate_subscription(subscription, state) do
      :ok ->
        {spec, state} = fulfill_or_postpone_subscription(subscription, ctx, state)
        send(endpoint_pid, {ref, :ok})
        {[spec: {spec, log_metadata: [rtc: state.id]}], state}

      {:error, _reason} = error ->
        send(endpoint_pid, {ref, error})
        {[], state}
    end
  end

  @impl true
  def handle_info({:track_priorities, endpoint_to_tracks}, ctx, state) do
    endpoint_msg_actions =
      for {endpoint, tracks} <- endpoint_to_tracks do
        {:notify_child, {endpoint, {:tracks_priority, tracks}}}
      end

    tee_actions =
      ctx
      |> filter_children(pattern: {:tee, _tee_name})
      |> Enum.flat_map(&[notify_child: {&1, :track_priorities_updated}])

    {endpoint_msg_actions ++ tee_actions, state}
  end

  @impl true
  def handle_info({:message_endpoint, endpoint, message}, ctx, state) do
    actions =
      if find_child(ctx, pattern: ^endpoint) != nil,
        do: [notify_child: {endpoint, message}],
        else: []

    {actions, state}
  end

  @impl true
  def handle_call(:get_endpoints, ctx, state) do
    # ctx.children contains tees as well as endpoints, so match on the child name
    endpoints =
      ctx.children
      |> Map.values()
      |> Enum.flat_map(fn
        %{name: {:endpoint, id}, module: module} -> [%{id: id, type: module}]
        _other -> []
      end)

    {[reply: endpoints], state}
  end

  @impl true
  def handle_child_notification(notification, {:endpoint, endpoint_id}, ctx, state) do
    if Map.has_key?(state.endpoints, endpoint_id) or
         Map.has_key?(state.pending_peers, endpoint_id) do
      handle_endpoint_notification(notification, endpoint_id, ctx, state)
    else
      {[], state}
    end
  end

  @impl true
  def handle_crash_group_down(endpoint_id, ctx, state) do
    dispatch(%Message.EndpointCrashed{endpoint_id: endpoint_id})
    {_status, actions, state} = handle_remove_endpoint(endpoint_id, ctx, state)
    {actions, state}
  end

  #
  # Endpoint Notifications
  #
  # - handle_endpoint_notification/4: Handles incoming notifications from an Endpoint, usually
  #   the WebRTC endpoint. Handles track_ready, publication of new tracks, and publication of
  #   removed tracks. Also forwards custom media events.
  #

  defp handle_endpoint_notification({:ready, metadata}, endpoint_id, _ctx, state) do
    if Map.has_key?(state.pending_peers, endpoint_id) do
      {%{peer: peer, endpoint: endpoint}, state} = pop_in(state, [:pending_peers, endpoint_id])
      peer = %{peer | metadata: metadata}

      peers_in_room =
        state.peers
        |> Map.values()
        |> Enum.map(fn peer ->
          track_id_to_metadata = Endpoint.get_active_track_metadata(state.endpoints[peer.id])

          peer
          |> Map.from_struct()
          |> Map.put(:trackIdToMetadata, track_id_to_metadata)
        end)

      new_peer_notifications =
        state.endpoints
        |> Map.keys()
        |> Enum.map(&{:notify_child, {{:endpoint, &1}, {:new_peer, peer}}})

      actions =
        [
          notify_child: {{:endpoint, endpoint_id}, {:ready, peers_in_room}},
          notify_child:
            {{:endpoint, endpoint_id}, {:new_tracks, get_active_tracks(state.endpoints)}}
        ] ++ new_peer_notifications

      state =
        state
        |> put_in([:peers, endpoint_id], peer)
        |> put_in([:endpoints, endpoint_id], endpoint)
        |> put_in([:subscriptions, endpoint_id], %{})

      {actions, state}
    else
      Membrane.Logger.warn(
        "Endpoint #{endpoint_id} sent a `:ready` message even though it's not a peer endpoint. Ignoring."
      )

      {[], state}
    end
  end

  defp handle_endpoint_notification({:forward_to_parent, message}, endpoint_id, _ctx, state) do
    dispatch(%Message.EndpointMessage{endpoint_id: endpoint_id, message: message})
    {[], state}
  end

  defp handle_endpoint_notification({:update_peer_metadata, metadata}, peer_id, _ctx, state) do
    peer = Map.get(state.peers, peer_id)

    if peer.metadata != metadata do
      updated_peer = %{peer | metadata: metadata}
      state = put_in(state, [:peers, peer_id], updated_peer)

      actions =
        state.endpoints
        |> Map.keys()
        |> Enum.map(&{:notify_child, {{:endpoint, &1}, {:peer_metadata_updated, updated_peer}}})

      {actions, state}
    else
      {[], state}
    end
  end

  defp handle_endpoint_notification(
         {:update_track_metadata, track_id, track_metadata},
         endpoint_id,
         _ctx,
         state
       ) do
    if Map.has_key?(state.endpoints, endpoint_id) do
      endpoint = Map.get(state.endpoints, endpoint_id)
      track = Endpoint.get_track_by_id(endpoint, track_id)

      if track != nil and track.metadata != track_metadata do
        endpoint = Endpoint.update_track_metadata(endpoint, track_id, track_metadata)
        state = put_in(state, [:endpoints, endpoint_id], endpoint)

        actions =
          state.subscriptions
          |> Map.values()
          |> Enum.flat_map(&Map.values/1)
          |> then(&(&1 ++ state.pending_subscriptions))
          |> Enum.filter(&(&1.track_id == track_id))
          |> Enum.map(& &1.endpoint_id)
          |> Enum.map(
            &{:notify_child,
             {{:endpoint, &1},
              {:track_metadata_updated, Endpoint.get_track_by_id(endpoint, track_id)}}}
          )

        {actions, state}
      else
        {[], state}
      end
    else
      {[], state}
    end
  end

  defp handle_endpoint_notification(
         {:publish, %TrackNotification{track_id: track_id} = notification},
         endpoint_id,
         _ctx,
         state
       ) do
    subscribed_endpoints =
      state.subscriptions
      |> Map.values()
      |> Enum.flat_map(&Map.values/1)
      |> Enum.filter(&(&1.track_id == track_id))
      |> Enum.map(& &1.endpoint_id)

    message_actions =
      Enum.map(subscribed_endpoints, &{:notify_child, {{:endpoint, &1}, notification}})

    if Map.has_key?(state.endpoints[endpoint_id].inbound_tracks, track_id) do
      {message_actions, state}
    else
      Membrane.Logger.error("""
      Non-owner attempted to send a notification about the track. It is being ignored.
      Offending endpoint: #{inspect(endpoint_id)}
      TrackId: #{track_id}
      notification: #{inspect(notification)}
      """)
    end
  end

  defp handle_endpoint_notification(
         {:track_ready, track_id, variant, encoding},
         endpoint_id,
         _ctx,
         state
       ) do
    Membrane.Logger.info(
      "New incoming #{encoding} track #{track_id} (variant: #{variant}) from endpoint #{inspect(endpoint_id)}"
    )

    track = get_in(state, [:endpoints, endpoint_id]) |> Endpoint.get_track_by_id(track_id)

    if variant not in track.variants do
      raise TrackReadyError, track: track, variant: variant
    end

    track_link = build_track_link(variant, track, endpoint_id, state)

    # check if there are subscriptions for this track and fulfill them
    {subscriptions, pending_subscriptions} =
      Enum.split_with(state.pending_subscriptions, &(&1.track_id == track_id))

    {subscription_links, state} = fulfill_subscriptions(subscriptions, state)

    links = [track_link] ++ subscription_links
    state = %{state | pending_subscriptions: pending_subscriptions}

    state =
      update_in(
        state,
        [:endpoints, endpoint_id],
        &Endpoint.update_track_encoding(&1, track_id, encoding)
      )

    spec = {links, crash_group: {endpoint_id, :temporary}, log_metadata: [rtc_engine: state.id]}
    {[spec: spec], state}
  end

  defp handle_endpoint_notification(
         {:publish, {:new_tracks, tracks}},
         endpoint_id,
         _ctx,
         state
       ) do
    Enum.each(tracks, &validate_track(&1))

    id_to_track = Map.new(tracks, &{&1.id, &1})

    state =
      update_in(
        state,
        [:endpoints, endpoint_id, :inbound_tracks],
        &Map.merge(&1, id_to_track)
      )

    tracks_msgs = build_track_added_actions(tracks, endpoint_id, state)
    {tracks_msgs, state}
  end

  defp handle_endpoint_notification(
         {:publish, {:removed_tracks, tracks}},
         endpoint_id,
         ctx,
         state
       ) do
    id_to_track = Map.new(tracks, &{&1.id, &1})

    state =
      update_in(
        state,
        [:endpoints, endpoint_id, :inbound_tracks],
        &Map.merge(&1, id_to_track)
      )

    tracks_msgs = build_track_removed_actions(tracks, endpoint_id, state)
    track_ids = Enum.map(tracks, & &1.id)
    track_tees = tracks |> Enum.map(&get_track_tee(&1.id, ctx)) |> Enum.reject(&is_nil(&1))

    subscriptions =
      Map.new(state.subscriptions, fn {endpoint_id, subscriptions} ->
        subscriptions =
          subscriptions
          |> Enum.reject(fn {track_id, _data} -> Enum.member?(track_ids, track_id) end)
          |> Map.new()

        {endpoint_id, subscriptions}
      end)

    {tracks_msgs ++ [remove_child: track_tees], %{state | subscriptions: subscriptions}}
  end

  defp validate_track(track) do
    variants = MapSet.new(track.variants)
    supported_variants = Track.supported_variants() |> MapSet.new()

    cond do
      variants == MapSet.new([]) ->
        raise PublishTrackError, track: track

      track.type == :audio and not MapSet.equal?(variants, MapSet.new([:high])) ->
        raise PublishTrackError, track: track

      MapSet.subset?(variants, supported_variants) == false ->
        raise PublishTrackError, track: track

      true ->
        :ok
    end
  end

  #
  # Endpoint Management
  #
  # - handle_add_endpoint/3: Adds a new Endpoint based on the entry provided. Part of the
  #   implementation for the public API.
  #
  # - handle_remove_endpoint/3: Removes the given Endpoint. Part of the implementation for the
  #   public API. Also called when the peer leaves.
  #
  # - get_active_tracks/1: Helper function for add_endpoint/3. Returns a list of Tracks that can
  #   be provided to the newly added Endpoint straight away.
  #
  # - find_children_for_endpoint/2: Convenience function to identify all Elements owned by an
  #   Endpoint, via its Tracks.
  #
  # - get_track_tee/2: Convenience function to get tee for given track
  #

  defp handle_add_endpoint(endpoint_entry, opts, state) do
    endpoint_id = opts[:endpoint_id] || opts[:peer_id] || UUID.uuid4()
    endpoint_name = {:endpoint, endpoint_id}
    is_peer? = Keyword.has_key?(opts, :peer_id)

    spec = {
      child(endpoint_name, endpoint_entry),
      node: opts[:node],
      crash_group: {endpoint_id, :temporary},
      log_metadata: [rtc_engine: state.id]
    }

    display_manager_message =
      if state.display_manager != nil,
        do: [notify_child: {endpoint_name, {:display_manager, state.display_manager}}],
        else: []

    # Only inform about the tracks if we're not talking about a peer
    tracks_actions =
      if is_peer? do
        []
      else
        [notify_child: {endpoint_name, {:new_tracks, get_active_tracks(state.endpoints)}}]
      end

    actions = [spec: spec] ++ display_manager_message ++ tracks_actions

    endpoint = Endpoint.new(endpoint_id, [])

    state =
      if is_peer? do
        put_in(state, [:pending_peers, endpoint_id], %{
          peer: %Peer{id: endpoint_id, metadata: nil},
          endpoint: endpoint
        })
      else
        state
        |> put_in([:subscriptions, endpoint_id], %{})
        |> put_in([:endpoints, endpoint_id], endpoint)
      end

    {actions, state}
  end

  defp handle_remove_endpoint(endpoint_id, ctx, state) do
    cond do
      Map.has_key?(state.endpoints, endpoint_id) ->
        pending_subscriptions_fun = fn subscriptions ->
          Enum.filter(subscriptions, &(&1.endpoint_id != endpoint_id))
        end

        {endpoint, state} = pop_in(state, [:endpoints, endpoint_id])
        {_, state} = pop_in(state, [:subscriptions, endpoint_id])
        state = update_in(state, [:pending_subscriptions], pending_subscriptions_fun)

        tracks = Enum.map(Endpoint.get_tracks(endpoint), &%Track{&1 | active?: true})
        tracks_msgs = build_track_removed_actions(tracks, endpoint_id, state)
        endpoint_bin = ctx.children[{:endpoint, endpoint_id}]

        peer_left_msgs =
          if Map.has_key?(state.peers, endpoint_id) do
            state.endpoints
            |> Map.keys()
            |> Enum.map(&{:notify_child, {{:endpoint, &1}, {:peer_left, endpoint_id}}})
          else
            []
          end

        {_, state} = pop_in(state, [:peers, endpoint_id])

        if endpoint_bin == nil or endpoint_bin.terminating? do
          {:present, tracks_msgs ++ peer_left_msgs, state}
        else
          actions = [remove_child: find_children_for_endpoint(endpoint, ctx)]
          {:present, tracks_msgs ++ peer_left_msgs ++ actions, state}
        end

      Map.has_key?(state.pending_peers, endpoint_id) ->
        {_pending_peer, state} = pop_in(state, [:pending_peers, endpoint_id])
        endpoint_bin = ctx.children[{:endpoint, endpoint_id}]

        if endpoint_bin == nil or endpoint_bin.terminating? do
          {:present, [], state}
        else
          {:present, [remove_child: {:endpoint, endpoint_id}], state}
        end

      true ->
        {:absent, [], state}
    end
  end

  defp get_active_tracks(endpoints) do
    endpoints
    |> Map.values()
    |> Enum.flat_map(&Endpoint.get_tracks/1)
    |> Enum.filter(& &1.active?)
  end

  defp find_children_for_endpoint(endpoint, ctx) do
    children =
      endpoint
      |> Endpoint.get_tracks()
      |> Enum.map(&get_track_tee(&1.id, ctx))
      |> Enum.reject(&is_nil(&1))

    [endpoint: endpoint.id] ++ children
  end

  defp get_track_tee(track_id, ctx) do
    if Map.has_key?(ctx.children, {:tee, track_id}) do
      {:tee, track_id}
    end
  end

  #
  # Track Actions
  #
  # - build_track_added_actions/3: Called when new tracks were published by the WebRTC endpoint
  #   and the Engine has been notified. Notifies all other Endpoints of the new tracks.
  #
  # - build_track_removed_actions/3: Called when the underlying endpoint was removed (either
  #   normally, or due to crash).
  #

  defp build_track_added_actions(tracks, endpoint_id, state) do
    state.endpoints
    |> Map.delete(endpoint_id)
    |> Map.keys()
    |> Enum.flat_map(fn endpoint_id ->
      [notify_child: {{:endpoint, endpoint_id}, {:new_tracks, tracks}}]
    end)
  end

  defp build_track_removed_actions(tracks, from_endpoint_id, state) do
    state.endpoints
    |> Stream.reject(&(elem(&1, 0) == from_endpoint_id))
    |> Stream.reject(&is_nil(elem(&1, 1)))
    |> Enum.flat_map(fn {endpoint_id, _endpoint} ->
      subscriptions = state.subscriptions[endpoint_id]
      tracks = Enum.filter(tracks, &Map.has_key?(subscriptions, &1.id))
      [notify_child: {{:endpoint, endpoint_id}, {:remove_tracks, tracks}}]
    end)
  end

  #
  # Track Links
  #
  # - build_track_link/4 - called when the track is ready, via notification from the WebRTC
  #   endpoint. Creates the link from the endpoint which published the track, and starts the
  #   underlying tee which is required to bring the content of the track to all subscribers.
  #
  # - build_track_tee/4 - Called by build_track_link/4; builds the correct tee depending on
  #   whether the display manager is turned on or off
  #

  defp build_track_link(variant, track, endpoint_id, state) do
    get_child({:endpoint, endpoint_id})
    |> via_out(Pad.ref(:output, {track.id, variant}))
    |> via_in(Pad.ref(:input, {track.id, variant}),
      toilet_capacity: state.toilet_capacity
    )
    |> child({:tee, track.id}, build_track_tee(track.id, variant, track, state),
      get_if_exists: true
    )
  end

  defp build_track_tee(track_id, _variant, track, %{display_manager: dm} = state)
       when dm != nil do
    %FilterTee{
      ets_name: state.id,
      track_id: track_id,
      type: track.type,
      codec: track.encoding
    }
  end

  defp build_track_tee(_track_id, _variant, track, _state) do
    %Tee{track: track}
  end

  #
  # Track Subscriptions
  #
  # - validate_subscription/2: Validates proposed subscription, called when a new subscription
  #   is to be added, via handle_info.
  #
  # - fulfill_or_postpone_subscription/3: Called immediately upon validation of subscription,
  #   optimistically links track's tee to the subscriber if the track is ready, otherwise adds the
  #   subscription to the list of pending subscriptions
  #
  # - fulfill_subscriptions/2: Called when a new track is ready and there are pending
  #   subscriptions to the track.
  #
  # - build_subscription_links/1, build_subscription_link/1: Called by fulfill_subscriptions/2,
  #   these functions build the actual links between the tee and the endpoint subscribing for the given track.
  #
  # - get_track/2: Convenience function. Searches for a Track with the given Track ID which is
  #   owned by one of the Endpoints in the list.
  #

  defp validate_subscription(subscription, state) do
    # checks whether subscription is correct
    track = get_track(subscription.track_id, state.endpoints)

    if track, do: :ok, else: {:error, :invalid_track_id}
  end

  defp fulfill_or_postpone_subscription(subscription, ctx, state) do
    # If the tee for this track is already spawned, fulfill subscription.
    # Otherwise, save subscription as pending, we will fulfill it when the tee is linked.

    if Map.has_key?(ctx.children, {:tee, subscription.track_id}) do
      fulfill_subscriptions([subscription], state)
    else
      state = update_in(state, [:pending_subscriptions], &[subscription | &1])
      {[], state}
    end
  end

  defp fulfill_subscriptions(subscriptions, state) do
    links = build_subscription_links(subscriptions, state)

    Enum.reduce(subscriptions, {links, state}, fn subscription, {links, state} ->
      endpoint_id = subscription.endpoint_id
      track_id = subscription.track_id
      subscription = %{subscription | status: :active}
      state = put_in(state, [:subscriptions, endpoint_id, track_id], subscription)
      {links, state}
    end)
  end

  defp build_subscription_links(subscriptions, state) do
    Enum.map(subscriptions, &build_subscription_link(&1, state))
  end

  defp build_subscription_link(subscription, state) do
    get_child({:tee, subscription.track_id})
    |> via_out(Pad.ref(:output, {:endpoint, subscription.endpoint_id}))
    |> via_in(Pad.ref(:input, subscription.track_id),
      toilet_capacity: state.toilet_capacity
    )
    |> get_child({:endpoint, subscription.endpoint_id})
  end

  defp get_track(track_id, endpoints) do
    endpoints
    |> Map.values()
    |> Enum.flat_map(&Endpoint.get_tracks/1)
    |> Map.new(&{&1.id, &1})
    |> Map.get(track_id)
  end

  #
  # Message Dispatch
  #
  # - dispatch/1: Internal function, dispatches the message to all registered processes within the
  #   registry
  #

  defp dispatch(message) do
    Registry.dispatch(get_registry_name(), self(), fn entries ->
      for {_, pid} <- entries, do: send(pid, message)
    end)
  end
end