lib/membrane_rtc_engine/engine.ex

defmodule Membrane.RTC.Engine do
  @moduledoc """
  RTC Engine implementation.

  RTC Engine is an abstraction layer responsible for linking together different types of `Endpoints`.
  From the implementation point of view, RTC Engine is a `Membrane.Pipeline`.

  ## Messages

  The RTC Engine works by sending messages which notify user logic about important events like
  "There is a new peer, would you like to to accept it?".
  To receive RTC Engine messages you have to register your process so that RTC Engine will
  know where to send them.
  All messages RTC Engine can emit are described in `#{inspect(__MODULE__)}.Message` docs.

  ### Registering for messages

  Registration can be done using `register/2` e.g.

  ```elixir
  Engine.register(rtc_engine, self())
  ```

  This will register your process to receive RTC Engine messages.
  If your process implements `GenServer` behavior then all messages can be handled
  by `c:GenServer.handle_info/2`, e.g.

  ```elixir
  @impl true
  def handle_info(%Message.NewPeer{rtc_engine: rtc_engine, peer: peer}, state) do
    Engine.accept_peer(rtc_engine, peer.id)
    {:noreply, state}
  end
  ```

  You can register multiple processes to receive messages from an RTC Engine instance.
  In such a case each message will be sent to each registered process.

  ## Client Libraries

  RTC Engine allows creating Client Libraries that can send and receive media tracks from it.
  The current version of RTC Engine ships with WebRTC Client Library which connects to the RTC Engine
  via WebRTC standard.
  Communication with Client Libraries is done using `Media Events`.
  Media Events are control messages which notify about e.g. new peer joining to the RTC Engine.
  When Client Library receives Media Event it can invoke some callbacks.
  In the case of WebRTC Client Library, these are e.g. `onPeerJoined` or `onTrackAdded`.
  When RTC Engine receives Media Event it can emit some messages e.g. `t:#{inspect(__MODULE__)}.Message.NewPeer.t/0`.
  More about Media Events can be read in subsequent sections.
  Below there is a figure showing the architecture of the RTC Engine working in conjunction with some Client Library.

  ```txt
      +--------------------------------- media events -----------------------------+
      |                                (signaling layer)                           |
      |                                                                            |
      |                                                                            |
  +--------+                 +---------+             +--------+               +---------+
  | user   | <-   media   -> | Client  |             |  RTC   | <- media   -> | user    |
  | client |      events     | Library | <- media -> | Engine |    events     | backend |
  | logic  | <- callbacks -  |         |             |        | - messages -> | logic   |
  +--------+                 +---------+             +--------+               +---------+
  ```



  ### Media Events

  Media Events are blackbox messages that carry data important for the
  RTC Engine and its Client Library, but not for the user.
  There are two types of Media Events:
  * Internal Media Events - generic, protocol-agnostic Media Events sent by RTC Engine itself.
  Example Internal Media Events are `peerJoined`, `peerLeft`, `tracksAdded` or `tracksRemoved`.
  * Custom Media Events - they can be used to send custom data from Client Library to some Endpoint inside RTC Engine
  and vice versa. In the case of WebRTC Client Library, these are `sdpOffer`, `sdpAnswer`, or `iceCandidate`.

  An application is obligated to transport Media Events from an RTC Engine instance to
  its Client Library, and vice versa.

  When the RTC Engine needs to send a Media Event to a specific client, registered processes will
  receive `t:#{inspect(__MODULE__)}.Message.MediaEvent.t/0` message with `to` field indicating where this Media Event
  should be sent to.
  This can be either `:broadcast`, when the event should be sent to all peers, or `peer_id`
  when the messages should be sent to the specified peer. The `event` is encoded in binary format,
  so it is ready to send without modification.

  Feeding an RTC Engine instance with Media Events from a Client Library can be done using `receive_media_event/2`.
  Assuming the user process is a GenServer, the Media Event can be received by `c:GenServer.handle_info/2` and
  conveyed to the RTC Engine in the following way:

  ```elixir
  @impl true
  def handle_info({:media_event, from, event} = msg, state) do
    Engine.receive_media_event(state.rtc_engine, from, event)
    {:noreply, state}
  end
  ```

  What is important, Membrane RTC Engine doesn't impose usage of any specific transport layer for carrying
  Media Events through the network.
  You can e.g. use Phoenix and its channels.
  This can look like this:

  ```elixir
  @impl true
  def handle_in("mediaEvent", %{"data" => event}, socket) do
    Engine.receive_media_event(socket.assigns.room, socket.assigns.peer_id, event)
    {:noreply, socket}
  end
  ```

  ## Peers

  Each peer represents some user that can possess some metadata.
  A Peer can be added in two ways:
  * by sending proper Media Event from a Client Library
  * using `add_peer/3`

  Adding a peer will cause RTC Engine to emit Media Event which will notify connected clients about new peer.

  ### Peer id

  Peer ids must be assigned by application code. This is not done by the RTC Engine or its client library.
  Ids can be assigned when a peer initializes its signaling layer.

  ## Endpoints

  `Endpoints` are `Membrane.Bin`s able to publish their own tracks and subscribe for tracks from other Endpoints.
  One can think about Endpoint as an entity responsible for handling some specific task.
  An Endpoint can be added and removed using `add_endpoint/3` and `remove_endpoint/2` respectively.

  There are two types of Endpoints:
  * Standalone Endpoints - they are in most cases spawned only once per RTC Engine instance and they are not associated with any peer.
  * Peer Endpoints - they are associated with some peer.
  Associating Endpoint with Peer will cause RTC Engine to send some Media Events to the Enpoint's Client Library
  e.g. one which indicates which tracks belong to which peer.

  Currently RTC Engine ships with the implementation of two Endpoints:
  * `#{inspect(__MODULE__)}.Endpoint.WebRTC` which is responsible for establishing a connection with some WebRTC
  peer (mainly browser) and exchanging media with it. WebRTC Endpoint is a Peer Endpoint.
  * `#{inspect(__MODULE__)}.Endpoint.HLS` which is responsible for receiving media tracks from all other Endpoints and
  saving them to files by creating HLS playlists. HLS Endpoint is a Standalone Endpoint.

  User can also implement custom Endpoints.

  ### Implementing custom RTC Engine Endpoint

  Each RTC Engine Endpoint has to:
  * implement `Membrane.Bin` behavior
  * specify input, output, or both input and output pads depending on what it is intended to do.
  For example, if Endpoint will not publish any tracks but only subscribe for tracks from other Endpoints it can specify only input pads.
  Pads should have the following form

  ```elixir
    def_input_pad :input,
      demand_unit: :buffers,
      caps: <caps>,
      availability: :on_request

    def_output_pad :output,
      demand_unit: :buffers,
      caps: <caps>,
      availability: :on_request
  ```

  Where `caps` are `t:Membrane.Caps.t/0` or `:any`.

  * publish for some tracks using actions `t:publish_action_t/0` and subscribe for some tracks using
  function `#{inspect(__MODULE__)}.subscribe/5`. The first will cause RTC Engine to send a message in
  form of `{:new_tracks, tracks}` where `tracks` is a list of `t:#{inspect(__MODULE__)}.Track.t/0` to all other Endpoints.
  When an Endpoint receives such a message it can subscribe for new tracks by
  using `#{inspect(__MODULE__)}.subscribe/5` function. An Endpoint will be notified about track readiness
  it subscribed for in `c:Membrane.Bin.handle_pad_added/3` callback. An example implementation of `handle_pad_added`
  callback can look like this

  ```elixir
    @impl true
    def handle_pad_added(Pad.ref(:input, _track_id) = pad, _ctx, state) do
      links = [
        link_bin_input(pad)
        |> via_in(pad)
        |> to(:my_element)
      ]

      {{:ok, spec: %ParentSpec{links: links}}, state}
    end
  ```

  Where `:my_element` is a custom Membrane element responsible for processing track.

  Endpoint will be also notified when some tracks it subscribed for are removed with
  `{:removed_tracks, tracks}` message where `tracks` is a list of `t:#{inspect(__MODULE__)}.Track.t/0`.
  """
  use Membrane.Pipeline
  use OpenTelemetryDecorator
  import Membrane.RTC.Utils

  alias Membrane.RTC.Engine.{
    Endpoint,
    MediaEvent,
    Message,
    Track,
    Peer,
    DisplayManager,
    Subscription
  }

  alias Membrane.RTC.Engine
  alias Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastTee

  require Membrane.Logger

  @registry_name Membrane.RTC.Engine.Registry.Dispatcher

  @typedoc """
  RTC Engine configuration options.

  * `id` is used by logger. If not provided it will be generated.
  * `trace_ctx` is used by OpenTelemetry. All traces from this engine will be attached to this context.
  Example function from which you can get Otel Context is `get_current/0` from `OpenTelemetry.Ctx`.
  * `display_manager?` - set to `true` if you want to limit number of tracks sent from `#{inspect(__MODULE__)}.Endpoint.WebRTC` to a browser.
  """

  @type options_t() :: [
          id: String.t(),
          trace_ctx: map(),
          telemetry_label: Membrane.TelemetryMetrics.label(),
          display_manager?: boolean()
        ]

  @typedoc """
  Endpoint configuration options.

  * `peer_id` - associate endpoint with exisiting peer
  * `endpoint_id` - assign endpoint id. If not provided it will be generated by RTC Engine. This option cannot be used together with `peer_id`.
  Endpoints associated with peers have the id `peer_id`.
  * `node` - node on which endpoint should be spawned. If not provided, current node is used.
  """
  @type endpoint_options_t() :: [
          endpoint_id: String.t(),
          peer_id: String.t(),
          node: node()
        ]

  @typedoc """
  Subscription options.

  * `default_simulcast_encoding` - initial encoding that
  endpoint making subscription wants to receive.
  This option has no effect for audio tracks and video tracks
  that are not simulcast.
  """
  @type subscription_opts_t() :: [default_simulcast_encoding: String.t()]

  @typedoc """
  Membrane action that will cause RTC Engine to publish some message to all other endpoints.
  """
  @type publish_action_t() :: {:notify, {:publish, publish_message_t()}}

  @typedoc """
  Membrane action that will inform RTC Engine about track readiness.
  """
  @type track_ready_action_t() ::
          {:notify,
           {:track_ready, Track.id(), Track.encoding(),
            depayloading_filter :: Membrane.ParentSpec.child_spec_t()}}

  @typedoc """
  Membrane action that will generate Custom Media Event.
  """
  @type custom_media_event_action_t() :: {:notify, {:custom_media_event, data :: binary()}}

  @typedoc """
  Types of messages that can be published to other Endpoints.
  """
  @type publish_message_t() :: {:new_tracks, [Track.t()]} | {:removed_tracks, [Track.t()]}

  @spec start(options :: options_t(), process_options :: GenServer.options()) ::
          GenServer.on_start()
  def start(options, process_options) do
    do_start(:start, options, process_options)
  end

  @spec start_link(options :: options_t(), process_options :: GenServer.options()) ::
          GenServer.on_start()
  def start_link(options, process_options) do
    do_start(:start_link, options, process_options)
  end

  defp do_start(func, options, process_options) when func in [:start, :start_link] do
    id = options[:id] || "#{UUID.uuid4()}"
    display_manager? = options[:display_manager?] || false
    options = Keyword.put(options, :id, id)
    options = Keyword.put(options, :display_manager?, display_manager?)

    Membrane.Logger.info("Starting a new RTC Engine instance with id: #{id}")

    apply(Membrane.Pipeline, func, [
      __MODULE__,
      options,
      process_options
    ])
  end

  @spec get_registry_name() :: atom()
  def get_registry_name(), do: @registry_name

  @doc """
  Adds endpoint to the RTC Engine

  Returns `:error` when there are both `peer_id` and `endpoint_id` specified in `opts`.
  For more information refer to `t:endpoint_options_t/0`.
  """
  @spec add_endpoint(
          pid :: pid(),
          endpoint :: Membrane.ParentSpec.child_spec_t(),
          opts :: endpoint_options_t()
        ) :: :ok | :error
  def add_endpoint(pid, endpoint, opts \\ []) do
    if Keyword.has_key?(opts, :endpoint_id) and
         Keyword.has_key?(opts, :peer_id) do
      raise "You can't pass both option endpoint_id and peer_id"
    else
      send(pid, {:add_endpoint, endpoint, opts})
      :ok
    end
  end

  @doc """
  Removes endpoint from the RTC Engine
  """
  @spec remove_endpoint(
          pid :: pid(),
          id :: String.t()
        ) :: :ok
  def remove_endpoint(rtc_engine, id) do
    send(rtc_engine, {:remove_endpoint, id})
    :ok
  end

  @doc """
  Adds peer to the RTC Engine
  """
  @spec add_peer(pid :: pid(), peer :: Peer.t()) :: :ok
  def add_peer(pid, peer) do
    send(pid, {:add_peer, peer})
    :ok
  end

  @doc """
  Removes peer from RTC Engine.

  If reason is other than `nil`, RTC Engine will inform client library about peer removal with passed reason.
  """
  @spec remove_peer(rtc_engine :: pid(), peer_id :: any(), reason :: String.t() | nil) :: :ok
  def remove_peer(rtc_engine, peer_id, reason \\ nil) do
    send(rtc_engine, {:remove_peer, peer_id, reason})
    :ok
  end

  @doc """
  Allows peer for joining to the RTC Engine
  """
  @spec accept_peer(
          pid :: pid(),
          peer_id :: String.t()
        ) :: :ok
  def accept_peer(pid, peer_id) do
    send(pid, {:accept_new_peer, peer_id})
    :ok
  end

  @doc """
  Deny peer from joining to the RTC Engine.
  """
  @spec deny_peer(pid :: pid(), peer_id :: String.t()) :: :ok
  def deny_peer(pid, peer_id) do
    send(pid, {:deny_new_peer, peer_id})
    :ok
  end

  @doc """
  The same as `deny_peer/2` but allows for passing any data that will be returned to the client.

  This can be used for passing reason of peer refusal.
  """
  @spec deny_peer(pid :: pid(), peer_id :: String.t(), data: any()) :: :ok
  def deny_peer(pid, peer_id, data) do
    send(pid, {:deny_new_peer, peer_id, data})
    :ok
  end

  @doc """
  Registers process with pid `who` for receiving messages from RTC Engine
  """
  @spec register(rtc_engine :: pid(), who :: pid()) :: :ok
  def register(rtc_engine, who \\ self()) do
    send(rtc_engine, {:register, who})
    :ok
  end

  @doc """
  Unregisters process with pid `who` from receiving messages from RTC Engine
  """
  @spec unregister(rtc_engine :: pid(), who :: pid()) :: :ok
  def unregister(rtc_engine, who \\ self()) do
    send(rtc_engine, {:unregister, who})
    :ok
  end

  @doc """
  Sends Media Event to RTC Engine.
  """
  @spec receive_media_event(rtc_engine :: pid(), media_event :: {:media_event, pid(), any()}) ::
          :ok
  def receive_media_event(rtc_engine, media_event) do
    send(rtc_engine, media_event)
    :ok
  end

  @doc """
  Subscribes endpoint for tracks.

  Endpoint  will be notified about track readiness in `c:Membrane.Bin.handle_pad_added/3` callback.
  `tracks` is a list in form of pairs `{track_id, track_format}`, where `track_id` is id of track this endpoint subscribes for
  and `track_format` is the format of track that this endpoint is willing to receive.
  If `track_format` is `:raw` Endpoint will receive track in `t:#{inspect(__MODULE__)}.Track.encoding/0` format.
  Endpoint_id is a an id of endpoint, which want to subscribe on tracks.
  """
  @spec subscribe(
          rtc_engine :: pid(),
          endpoint_id :: String.t(),
          track_id :: Track.id(),
          format :: atom(),
          opts :: subscription_opts_t
        ) ::
          :ok
          | {:error,
             :timeout
             | :invalid_track_id
             | :invalid_track_format
             | :invalid_default_simulcast_encoding}
  def subscribe(rtc_engine, endpoint_id, track_id, format, opts \\ []) do
    ref = make_ref()
    send(rtc_engine, {:subscribe, {self(), ref}, endpoint_id, track_id, format, opts})

    receive do
      {^ref, :ok} ->
        :ok

      {^ref, {:error, reason}} ->
        {:error, reason}
    after
      5_000 ->
        {:error, :timeout}
    end
  end

  @impl true
  def handle_init(options) do
    trace_ctx =
      if Keyword.has_key?(options, :trace_ctx) do
        OpenTelemetry.Ctx.attach(options[:trace_ctx])
      else
        Membrane.RTC.Utils.create_otel_context("rtc:#{options[:id]}")
      end

    display_manager =
      if options[:display_manager?] do
        {:ok, pid} = DisplayManager.start_link(ets_name: options[:id], engine: self())
        pid
      else
        nil
      end

    telemetry_label = (options[:telemetry_label] || []) ++ [room_id: options[:id]]

    {{:ok, playback: :playing},
     %{
       id: options[:id],
       component_path: Membrane.ComponentPath.get_formatted(),
       trace_context: trace_ctx,
       telemetry_label: telemetry_label,
       peers: %{},
       endpoints: %{},
       pending_subscriptions: [],
       filters: %{},
       subscriptions: %{},
       display_manager: display_manager
     }}
  end

  @impl true
  def handle_playing_to_prepared(ctx, state) do
    {actions, state} =
      state.peers
      |> Map.keys()
      |> Enum.reduce({[], state}, fn peer_id, {all_actions, state} ->
        {actions, state} = handle_remove_peer(peer_id, "playback_finished", ctx, state)
        {all_actions ++ actions, state}
      end)

    {{:ok, actions}, state}
  end

  @impl true
  @decorate trace("engine.other.register", include: [[:state, :id]])
  def handle_other({:register, pid}, _ctx, state) do
    Registry.register(get_registry_name(), self(), pid)
    {:ok, state}
  end

  @impl true
  @decorate trace("engine.other.unregister", include: [[:state, :id]])
  def handle_other({:unregister, pid}, _ctx, state) do
    Registry.unregister_match(get_registry_name(), self(), pid)
    {:ok, state}
  end

  @impl true
  @decorate trace("engine.other.tracks_priority", include: [[:state, :id]])
  def handle_other({:track_priorities, endpoint_to_tracks}, ctx, state) do
    _msgs =
      Enum.map(endpoint_to_tracks, fn {{:endpoint, endpoint_id}, tracks} ->
        MediaEvent.create_tracks_priority_event(tracks)
        |> then(&%Message.MediaEvent{rtc_engine: self(), to: endpoint_id, data: &1})
        |> dispatch()
      end)

    tee_actions =
      ctx
      |> filter_children(pattern: {:tee, _tee_name})
      |> Enum.flat_map(&[forward: {&1, :track_priorities_updated}])

    {{:ok, tee_actions}, state}
  end

  @impl true
  @decorate trace("engine.other.remove_peer", include: [[:state, :id]])
  def handle_other({:remove_peer, id, reason}, ctx, state) do
    {actions, state} = handle_remove_peer(id, reason, ctx, state)
    {{:ok, actions}, state}
  end

  @impl true
  @decorate trace("engine.other.add_endpoint", include: [[:state, :component_path], [:state, :id]])
  def handle_other({:add_endpoint, endpoint, opts}, _ctx, state) do
    peer_id = opts[:peer_id]
    endpoint_id = opts[:endpoint_id] || opts[:peer_id]

    endpoint =
      case endpoint do
        %Endpoint.WebRTC{} ->
          telemetry_label = state.telemetry_label ++ [peer_id: peer_id]
          %Endpoint.WebRTC{endpoint | telemetry_label: telemetry_label}

        another_endpoint ->
          another_endpoint
      end

    cond do
      Map.has_key?(state.endpoints, endpoint_id) ->
        Membrane.Logger.warn(
          "Cannot add Endpoint with id #{inspect(endpoint_id)} as it already exists"
        )

        {:ok, state}

      peer_id != nil and !Map.has_key?(state.peers, peer_id) ->
        Membrane.Logger.warn(
          "Cannot attach Endpoint to peer with id #{peer_id} as such peer does not exist"
        )

        {:ok, state}

      true ->
        {actions, state} = setup_endpoint(endpoint, opts, state)
        {{:ok, actions}, state}
    end
  end

  @impl true
  @decorate trace("engine.other.add_peer", include: [[:state, :id]])
  def handle_other({:add_peer, peer}, _ctx, state) do
    {actions, state} = do_accept_new_peer(peer, state)
    {{:ok, actions}, state}
  end

  @impl true
  @decorate trace("engine.other.remove_endpoint", include: [[:state, :id]])
  def handle_other({:remove_endpoint, id}, ctx, state) do
    case(do_remove_endpoint(id, ctx, state)) do
      {:absent, [], state} ->
        Membrane.Logger.info("Endpoint #{inspect(id)} already removed")
        {:ok, state}

      {:present, actions, state} ->
        {{:ok, actions}, state}
    end
  end

  @decorate trace("engine.other.subscribe", include: [[:state, :id]])
  def handle_other(
        {:subscribe, {endpoint_pid, ref}, endpoint_id, track_id, format, opts},
        ctx,
        state
      ) do
    subscription = %Subscription{
      endpoint_id: endpoint_id,
      track_id: track_id,
      format: format,
      opts: opts
    }

    case check_subscription(subscription, state) do
      :ok ->
        {links, state} = try_fulfill_subscription(subscription, ctx, state)
        parent_spec = %ParentSpec{links: links, log_metadata: [rtc: state.id]}
        send(endpoint_pid, {ref, :ok})
        {{:ok, [spec: parent_spec]}, state}

      {:error, _reason} = error ->
        send(endpoint_pid, {ref, error})
        {:ok, state}
    end
  end

  @impl true
  @decorate trace("engine.other.media_event", include: [[:state, :id]])
  def handle_other({:media_event, from, data}, ctx, state) do
    case MediaEvent.deserialize(data) do
      {:ok, event} ->
        if event.type == :join or Map.has_key?(state.peers, from) do
          {actions, state} = handle_media_event(event, from, ctx, state)
          {{:ok, actions}, state}
        else
          Membrane.Logger.warn("Received media event from unknown peer id: #{inspect(from)}")
          {:ok, state}
        end

      {:error, :invalid_media_event} ->
        Membrane.Logger.warn("Invalid media event #{inspect(data)}")
        {:ok, state}
    end
  end

  @impl true
  def handle_crash_group_down(endpoint_id, ctx, state) do
    if Map.has_key?(state.peers, endpoint_id) do
      MediaEvent.create_peer_removed_event(endpoint_id, "Internal server error.")
      |> then(&%Message.MediaEvent{rtc_engine: self(), to: endpoint_id, data: &1})
      |> dispatch()
    end

    %Message.EndpointCrashed{endpoint_id: endpoint_id}
    |> dispatch()

    {_status, actions, state} = do_remove_endpoint(endpoint_id, ctx, state)

    {{:ok, actions}, state}
  end

  defp handle_media_event(%{type: :join, data: data}, peer_id, _ctx, state) do
    peer = Peer.new(peer_id, data.metadata || %{})
    dispatch(%Message.NewPeer{rtc_engine: self(), peer: peer})

    receive do
      {:accept_new_peer, ^peer_id} ->
        do_accept_new_peer(peer, state)

      {:accept_new_peer, peer_id} ->
        Membrane.Logger.warn("Unknown peer id passed for acceptance: #{inspect(peer_id)}")
        {[], state}

      {:deny_new_peer, peer_id} ->
        MediaEvent.create_peer_denied_event()
        |> then(&%Message.MediaEvent{rtc_engine: self(), to: peer_id, data: &1})
        |> dispatch()

        {[], state}

      {:deny_new_peer, peer_id, data: data} ->
        MediaEvent.create_peer_denied_event(data)
        |> then(&%Message.MediaEvent{rtc_engine: self(), to: peer_id, data: &1})
        |> dispatch()

        {[], state}
    end
  end

  defp handle_media_event(%{type: :custom, data: event}, peer_id, ctx, state) do
    actions = forward({:endpoint, peer_id}, {:custom_media_event, event}, ctx)

    {actions, state}
  end

  defp handle_media_event(%{type: :leave}, peer_id, ctx, state) do
    %Message.PeerLeft{rtc_engine: self(), peer: state.peers[peer_id]}
    |> dispatch()

    handle_remove_peer(peer_id, nil, ctx, state)
  end

  defp handle_media_event(
         %{type: :update_peer_metadata, data: %{metadata: metadata}},
         peer_id,
         _ctx,
         state
       ) do
    peer = Map.get(state.peers, peer_id)

    if peer.metadata != metadata do
      updated_peer = %{peer | metadata: metadata}
      state = put_in(state, [:peers, peer_id], updated_peer)

      MediaEvent.create_peer_updated_event(updated_peer)
      |> then(&%Message.MediaEvent{rtc_engine: self(), to: :broadcast, data: &1})
      |> dispatch()

      {[], state}
    else
      {[], state}
    end
  end

  defp handle_media_event(
         %{
           type: :update_track_metadata,
           data: %{track_id: track_id, track_metadata: track_metadata}
         },
         endpoint_id,
         _ctx,
         state
       ) do
    if Map.has_key?(state.endpoints, endpoint_id) do
      endpoint = Map.get(state.endpoints, endpoint_id)
      track = Endpoint.get_track_by_id(endpoint, track_id)

      if track != nil and track.metadata != track_metadata do
        endpoint = Endpoint.update_track_metadata(endpoint, track_id, track_metadata)
        state = put_in(state, [:endpoints, endpoint_id], endpoint)

        MediaEvent.create_track_updated_event(endpoint_id, track_id, track_metadata)
        |> then(&%Message.MediaEvent{rtc_engine: self(), to: :broadcast, data: &1})
        |> dispatch()

        {[], state}
      else
        {[], state}
      end
    else
      {[], state}
    end
  end

  defp handle_media_event(
         %{
           type: :select_encoding,
           data: %{peer_id: peer_id, track_id: track_id, encoding: encoding}
         },
         requester,
         _ctx,
         state
       ) do
    endpoint = Map.fetch!(state.endpoints, peer_id)
    subscription = get_in(state, [:subscriptions, requester, track_id])
    video_track = Endpoint.get_track_by_id(endpoint, track_id)

    cond do
      subscription == nil ->
        Membrane.Logger.warn("""
        Endpoint #{inspect(requester)} requested encoding #{inspect(encoding)} for
        track #{inspect(track_id)} belonging to peer #{inspect(peer_id)} but
        given endpoint is not subscribed for this track. Ignoring.
        """)

        {[], state}

      video_track == nil ->
        Membrane.Logger.warn("""
        Endpoint #{inspect(requester)} requested encoding #{inspect(encoding)} for
        track #{inspect(track_id)} belonging to peer #{inspect(peer_id)} but
        given peer does not have this track.
        Peer tracks: #{inspect(Endpoint.get_tracks(endpoint) |> Enum.map(& &1.id))}
        Ignoring.
        """)

        {[], state}

      encoding not in video_track.simulcast_encodings ->
        Membrane.Logger.warn("""
        Endpoint #{inspect(requester)} requested encoding #{inspect(encoding)} for
        track #{inspect(track_id)} belonging to peer #{inspect(peer_id)} but
        given track does not have this encoding.
        Track encodings: #{inspect(video_track.simulcast_encodings)}
        Ignoring.
        """)

        {[], state}

      true ->
        tee = {:tee, track_id}
        actions = [forward: {tee, {:select_encoding, {requester, encoding}}}]
        {actions, state}
    end
  end

  @impl true
  def handle_notification(notifcation, {:endpoint, endpoint_id} = from, ctx, state) do
    if Map.has_key?(state.endpoints, endpoint_id) do
      do_handle_notification(notifcation, from, ctx, state)
    else
      {:ok, state}
    end
  end

  @impl true
  def handle_notification(
        {:encoding_switched, receiver_endpoint_id, encoding},
        {:tee, track_id},
        _ctx,
        state
      ) do
    # send event that endpoint with id `sender_endpoint_id` is sending encoding `encoding` for track
    # `track_id` now

    {sender_endpoint_id, _endpoint} =
      Enum.find(state.endpoints, fn {_endpoint_id, endpoint} ->
        Endpoint.get_track_by_id(endpoint, track_id) != nil
      end)

    MediaEvent.create_encoding_switched_event(sender_endpoint_id, track_id, encoding)
    |> then(&%Message.MediaEvent{rtc_engine: self(), to: receiver_endpoint_id, data: &1})
    |> dispatch()

    {:ok, state}
  end

  # NOTE: When `payload_and_depayload_tracks?` options is set to false we may still want to depayload
  # some streams just in one place to e.g. dump them to HLS or perform any actions on depayloaded
  # media without adding payload/depayload elements to all EndpointBins (performing unnecessary work).
  #
  # To do that one just need to apply `depayloading_filter` after the tee element on which filter's the notification arrived.
  @decorate trace("engine.notification.track_ready",
              include: [:track_id, :encoding, [:state, :id]]
            )
  defp do_handle_notification(
         {:track_ready, track_id, rid, encoding, depayloading_filter},
         {:endpoint, endpoint_id},
         ctx,
         state
       ) do
    Membrane.Logger.info(
      "New incoming #{encoding} track #{track_id} from endpoint #{inspect(endpoint_id)}"
    )

    state = put_in(state, [:filters, track_id], depayloading_filter)
    track = state.endpoints |> Map.fetch!(endpoint_id) |> Endpoint.get_track_by_id(track_id)
    {tee_links, state} = create_and_link_tee(track_id, rid, track, endpoint_id, ctx, state)

    # check if there are subscriptions for this track and fulfill them
    {pending_track_subscriptions, pending_rest_subscriptions} =
      Enum.split_with(state.pending_subscriptions, &(&1.track_id == track.id))

    {subscription_links, state} =
      Enum.flat_map_reduce(pending_track_subscriptions, state, fn subscription, state ->
        fulfill_subscription(subscription, ctx, state)
      end)

    links = tee_links ++ subscription_links
    state = %{state | pending_subscriptions: pending_rest_subscriptions}

    state =
      update_in(
        state,
        [:endpoints, endpoint_id],
        &Endpoint.update_track_encoding(&1, track_id, encoding)
      )

    spec = %ParentSpec{
      links: links,
      crash_group: {endpoint_id, :temporary},
      log_metadata: [rtc: state.id]
    }

    {{:ok, spec: spec}, state}
  end

  @decorate trace("engine.notification.publish.new_tracks", include: [:endpoint_id, [:state, :id]])
  defp do_handle_notification(
         {:publish, {:new_tracks, tracks}},
         {:endpoint, endpoint_id},
         _ctx,
         state
       ) do
    id_to_track = Map.new(tracks, &{&1.id, &1})

    state =
      update_in(
        state,
        [:endpoints, endpoint_id, :inbound_tracks],
        &Map.merge(&1, id_to_track)
      )

    tracks_msgs = do_publish({:new_tracks, tracks}, {:endpoint, endpoint_id}, state)

    endpoint = get_in(state, [:endpoints, endpoint_id])
    track_id_to_track_metadata = Endpoint.get_active_track_metadata(endpoint)

    MediaEvent.create_tracks_added_event(endpoint_id, track_id_to_track_metadata)
    |> then(&%Message.MediaEvent{rtc_engine: self(), to: :broadcast, data: &1})
    |> dispatch()

    {{:ok, tracks_msgs}, state}
  end

  @decorate trace("engine.notification.publish.removed_tracks",
              include: [:endpoint_id, [:state, :id]]
            )
  defp do_handle_notification(
         {:publish, {:removed_tracks, tracks}},
         {:endpoint, endpoint_id},
         ctx,
         state
       ) do
    id_to_track = Map.new(tracks, &{&1.id, &1})

    state =
      update_in(
        state,
        [:endpoints, endpoint_id, :inbound_tracks],
        &Map.merge(&1, id_to_track)
      )

    tracks_msgs = do_publish({:remove_tracks, tracks}, {:endpoint, endpoint_id}, state)

    track_ids = Enum.map(tracks, & &1.id)

    MediaEvent.create_tracks_removed_event(endpoint_id, track_ids)
    |> then(&%Message.MediaEvent{rtc_engine: self(), to: :broadcast, data: &1})
    |> dispatch()

    tracks_children = Enum.flat_map(tracks, &get_track_elements(&1.id, ctx))

    {{:ok, tracks_msgs ++ [remove_child: tracks_children]}, state}
  end

  @decorate trace("engine.notification.custom_media_event", include: [[:state, :id]])
  defp do_handle_notification({:custom_media_event, data}, {:endpoint, peer_id}, _ctx, state) do
    MediaEvent.create_custom_event(data)
    |> then(&%Message.MediaEvent{rtc_engine: self(), to: peer_id, data: &1})
    |> dispatch()

    {:ok, state}
  end

  defp create_and_link_tee(track_id, rid, track, endpoint_id, ctx, state) do
    telemetry_label =
      state.telemetry_label ++
        [
          peer_id: endpoint_id,
          track_id: "#{track_id}:#{rid}"
        ]

    tee =
      cond do
        rid != nil ->
          %SimulcastTee{track: track}

        state.display_manager != nil ->
          %Engine.FilterTee{
            ets_name: state.id,
            track_id: track_id,
            type: track.type,
            codec: track.encoding,
            telemetry_label: telemetry_label
          }

        true ->
          %Engine.PushOutputTee{
            codec: track.encoding,
            telemetry_label: telemetry_label
          }
      end

    # spawn tee if it doesn't exist
    tee_link =
      if Map.has_key?(ctx.children, {:tee, track_id}) do
        &to(&1, {:tee, track_id})
      else
        &to(&1, {:tee, track_id}, tee)
      end

    endpoint_to_tee_links = [
      if rid do
        link({:endpoint, endpoint_id})
        |> via_out(Pad.ref(:output, {track_id, rid}))
        |> via_in(Pad.ref(:input, {track_id, rid}),
          options: [telemetry_label: telemetry_label]
        )
        |> then(&tee_link.(&1))
      else
        link({:endpoint, endpoint_id})
        |> via_out(Pad.ref(:output, {track_id, rid}))
        |> then(&tee_link.(&1))
      end
    ]

    {endpoint_to_tee_links, state}
  end

  defp check_subscription(subscription, state) do
    # checks whether subscription is correct
    track = get_track(subscription.track_id, state.endpoints)
    default_simulcast_encoding = subscription.opts[:default_simulcast_encoding]

    cond do
      track == nil ->
        {:error, :invalid_track_id}

      subscription.format not in track.format ->
        {:error, :invalid_format}

      # check if subscribed for existing simulcast encoding if simulcast is used
      track.simulcast_encodings != [] and default_simulcast_encoding != nil and
          default_simulcast_encoding not in track.simulcast_encodings ->
        {:error, :invalid_default_simulcast_encoding}

      true ->
        :ok
    end
  end

  defp try_fulfill_subscription(subscription, ctx, state) do
    # if tee for this track is already spawned, fulfill subscription
    # otherwise, save subscription as pending, we will fulfill it
    # when tee appears
    if Map.has_key?(ctx.children, {:tee, subscription.track_id}) do
      fulfill_subscription(subscription, ctx, state)
    else
      state = update_in(state, [:pending_subscriptions], &[subscription | &1])
      {[], state}
    end
  end

  defp fulfill_subscription(%Subscription{format: :raw} = subscription, ctx, state) do
    raw_format_links =
      if Map.has_key?(ctx.children, {:raw_format_tee, subscription.track_id}) do
        []
      else
        prepare_raw_format_links(subscription.track_id, state)
      end

    {links, state} = do_fulfill_subscription(subscription, :raw_format_tee, state)

    {raw_format_links ++ links, state}
  end

  defp fulfill_subscription(%Subscription{format: _remote_format} = subscription, _ctx, state) do
    do_fulfill_subscription(subscription, :tee, state)
  end

  defp do_fulfill_subscription(subscription, tee_kind, state) do
    links = prepare_track_to_endpoint_links(subscription, tee_kind, state)
    subscription = %Subscription{subscription | status: :active}
    endpoint_id = subscription.endpoint_id
    track_id = subscription.track_id
    state = put_in(state, [:subscriptions, endpoint_id, track_id], subscription)
    {links, state}
  end

  defp prepare_raw_format_links(track_id, state) do
    [
      link({:tee, track_id})
      |> to({:raw_format_filter, track_id}, get_in(state, [:filters, track_id]))
      |> to({:raw_format_tee, track_id}, Engine.PushOutputTee)
    ]
  end

  defp prepare_track_to_endpoint_links(subscription, :tee, state) do
    # if someone subscribed for simulcast track, prepare options
    # for SimulcastTee
    track = get_track(subscription.track_id, state.endpoints)

    options =
      if track.type == :video and track.simulcast_encodings != [] do
        [default_simulcast_encoding: subscription.opts[:default_simulcast_encoding]]
      else
        []
      end

    [
      link({:tee, subscription.track_id})
      |> via_out(Pad.ref(:output, {:endpoint, subscription.endpoint_id}), options: options)
      |> via_in(Pad.ref(:input, subscription.track_id))
      |> to({:endpoint, subscription.endpoint_id})
    ]
  end

  defp prepare_track_to_endpoint_links(subscription, tee_kind, _state) do
    [
      link({tee_kind, subscription.track_id})
      |> via_out(Pad.ref(:output, {:endpoint, subscription.endpoint_id}))
      |> via_in(Pad.ref(:input, subscription.track_id))
      |> to({:endpoint, subscription.endpoint_id})
    ]
  end

  defp dispatch(msg) do
    Registry.dispatch(get_registry_name(), self(), fn entries ->
      for {_, pid} <- entries, do: send(pid, msg)
    end)
  end

  defp do_accept_new_peer(peer, state) do
    if Map.has_key?(state.peers, peer.id) do
      Membrane.Logger.warn("Peer with id: #{inspect(peer.id)} has already been added")
      {[], state}
    else
      state = put_in(state, [:peers, peer.id], peer)

      MediaEvent.create_peer_accepted_event(
        peer.id,
        Map.delete(state.peers, peer.id),
        state.endpoints
      )
      |> then(&%Message.MediaEvent{rtc_engine: self(), to: peer.id, data: &1})
      |> dispatch()

      MediaEvent.create_peer_joined_event(peer)
      |> then(&%Message.MediaEvent{rtc_engine: self(), to: :broadcast, data: &1})
      |> dispatch()

      {[], state}
    end
  end

  defp setup_endpoint(endpoint_entry, opts, state) do
    inbound_tracks = []

    outbound_tracks = state.endpoints |> get_outbound_tracks() |> Enum.filter(& &1.active?)

    endpoint_id = opts[:endpoint_id] || opts[:peer_id] || "#{UUID.uuid4()}"
    endpoint = Endpoint.new(endpoint_id, inbound_tracks)

    endpoint_name = {:endpoint, endpoint_id}

    children = %{
      endpoint_name => endpoint_entry
    }

    action = [
      forward: {endpoint_name, {:display_manager, state.display_manager}},
      forward: {endpoint_name, {:new_tracks, outbound_tracks}}
    ]

    state = put_in(state, [:subscriptions, endpoint_id], %{})

    spec = %ParentSpec{
      node: opts[:node],
      children: children,
      crash_group: {endpoint_id, :temporary},
      log_metadata: [rtc: state.id]
    }

    state = put_in(state.endpoints[endpoint_id], endpoint)

    {[spec: spec] ++ action, state}
  end

  defp get_outbound_tracks(endpoints),
    do: Enum.flat_map(endpoints, fn {_id, endpoint} -> Endpoint.get_tracks(endpoint) end)

  defp get_track(track_id, endpoints) do
    endpoints
    |> Map.values()
    |> Enum.flat_map(&Endpoint.get_tracks/1)
    |> Map.new(&{&1.id, &1})
    |> Map.get(track_id)
  end

  defp handle_remove_peer(peer_id, reason, ctx, state) do
    case do_remove_peer(peer_id, reason, ctx, state) do
      {:absent, [], state} ->
        Membrane.Logger.info("Peer #{inspect(peer_id)} already removed")
        {[], state}

      {:present, actions, state} ->
        MediaEvent.create_peer_left_event(peer_id)
        |> then(&%Message.MediaEvent{rtc_engine: self(), to: :broadcast, data: &1})
        |> dispatch()

        send_if_not_nil(state.display_manager, {:unregister_endpoint, {:endpoint, peer_id}})

        {actions, state}
    end
  end

  defp do_remove_peer(peer_id, reason, ctx, state) do
    if Map.has_key?(state.peers, peer_id) do
      unless reason == nil,
        do:
          MediaEvent.create_peer_removed_event(peer_id, reason)
          |> then(&%Message.MediaEvent{rtc_engine: self(), to: peer_id, data: &1})
          |> dispatch()

      {_peer, state} = pop_in(state, [:peers, peer_id])
      {_status, actions, state} = do_remove_endpoint(peer_id, ctx, state)
      {:present, actions, state}
    else
      {:absent, [], state}
    end
  end

  defp do_remove_endpoint(endpoint_id, ctx, state) do
    if Map.has_key?(state.endpoints, endpoint_id) do
      {endpoint, state} = pop_in(state, [:endpoints, endpoint_id])
      {_subscriptions, state} = pop_in(state, [:subscriptions, endpoint_id])

      state =
        update_in(state, [:pending_subscriptions], fn subscriptions ->
          Enum.filter(subscriptions, &(&1.endpoint_id != endpoint_id))
        end)

      tracks = Enum.map(Endpoint.get_tracks(endpoint), &%Track{&1 | active?: true})

      tracks_msgs = do_publish({:remove_tracks, tracks}, {:endpoint, endpoint_id}, state)

      endpoint_bin = ctx.children[{:endpoint, endpoint_id}]

      actions =
        if endpoint_bin == nil or endpoint_bin.terminating? do
          []
        else
          [remove_child: find_children_for_endpoint(endpoint, ctx)]
        end

      {:present, tracks_msgs ++ actions, state}
    else
      {:absent, [], state}
    end
  end

  defp find_children_for_endpoint(endpoint, ctx) do
    children =
      endpoint
      |> Endpoint.get_tracks()
      |> Enum.flat_map(fn track -> get_track_elements(track.id, ctx) end)

    [endpoint: endpoint.id] ++ children
  end

  defp get_track_elements(track_id, ctx) do
    [
      tee: track_id,
      raw_format_filter: track_id,
      raw_format_tee: track_id
    ]
    |> Enum.filter(&Map.has_key?(ctx.children, &1))
  end

  defp do_publish({_, []} = _tracks, _endpoint_bin, _state), do: []

  defp do_publish({:new_tracks, _tracks} = msg, endpoint_bin_name, state) do
    Enum.flat_map(state.endpoints, fn {endpoint_id, endpoint} ->
      current_endpoint_bin_name = {:endpoint, endpoint_id}

      if current_endpoint_bin_name != endpoint_bin_name and not is_nil(endpoint) do
        [forward: {current_endpoint_bin_name, msg}]
      else
        []
      end
    end)
  end

  defp do_publish({:remove_tracks, tracks}, endpoint_bin_name, state) do
    Enum.flat_map(state.endpoints, fn {endpoint_id, endpoint} ->
      current_endpoint_bin_name = {:endpoint, endpoint_id}

      has_subscription_on_track = fn track_id ->
        state.subscriptions
        |> Map.fetch!(endpoint_id)
        |> Map.has_key?(track_id)
      end

      tracks_to_remove = Enum.filter(tracks, &has_subscription_on_track.(&1.id))
      msg = {:remove_tracks, tracks_to_remove}

      if current_endpoint_bin_name != endpoint_bin_name and not is_nil(endpoint) do
        [forward: {current_endpoint_bin_name, msg}]
      else
        []
      end
    end)
  end

  defp do_publish(msg, _endpoint_bin_name, _state) do
    Membrane.Logger.warn(
      "Requested unknown message type to be published by RTC Engine #{inspect(msg)}. Ignoring."
    )

    []
  end
end