lib/membrane_rtc_engine/engine.ex

defmodule Membrane.RTC.Engine do
  @moduledoc """
  RTC Engine implementation.

  RTC Engine is an abstraction layer responsible for linking together different types of Endpoints.
  From the implementation point of view, RTC Engine is a `Membrane.Pipeline`.

  ## Messages

  The RTC Engine works by sending messages which notify user logic about important events.
  To receive RTC Engine messages you have to register your process so that RTC Engine will
  know where to send them.
  All messages RTC Engine can emit are described in `#{inspect(__MODULE__)}.Message` docs.

  ### Registering for messages

  Registration can be done using `register/2` e.g.

  ```elixir
  Engine.register(rtc_engine, self())
  ```

  This will register your process to receive RTC Engine messages.
  You can register multiple processes to receive messages from an RTC Engine instance.
  In such a case each message will be sent to each registered process.

  ## Endpoints

  Endpoints are `Membrane.Bin`s able to publish their own tracks and subscribe to tracks from other Endpoints.
  One can think about Endpoint as an entity responsible for handling some specific task.
  An Endpoint can be added and removed using `add_endpoint/3` and `remove_endpoint/2` respectively.

  There are two types of Endpoints:
  * Standalone Endpoints - they are in most cases spawned only once per RTC Engine instance and they are not associated with any client.
  * Client Endpoints - they are associated with some client.
  Associating Endpoint with client will send some Media Events to the Endpoint's Client Library
  e.g. one which indicates which tracks belong to which client.

  Currently RTC Engine ships with the implementation of the following Endpoints:
  * `#{inspect(__MODULE__)}.Endpoint.WebRTC` which is responsible for establishing a connection with some WebRTC
  client (mainly browser) and exchanging media with it. WebRTC Endpoint is a Client Endpoint.
  * `#{inspect(__MODULE__)}.Endpoint.HLS` which is responsible for receiving media tracks from all other Endpoints and
  saving them to files by creating HLS playlists. HLS Endpoint is a Standalone Endpoint.
  * `#{inspect(__MODULE__)}.Endpoint.RTSP` which is responsible for connecting to a remote RTSP stream source and
  sending the appropriate media track to other Endpoints. RTSP Endpoint is a Standalone Endpoint.
  * `#{inspect(__MODULE__)}.Endpoint.File` which is responsible for reading track from a file, payloading it into RTP, and
  sending it to other Endpoints. File Endpoint is a Standalone Endpoint.
  * `#{inspect(__MODULE__)}.Endpoint.SIP` which is responsible for establishing a connection with some SIP
  device (e.g. phone) and exchanging media with it. SIP Endpoint is a Standalone Endpoint.
  * `#{inspect(__MODULE__)}.Endpoint.Recording` which is responsible for saving incoming tracks to pointed storages.
  Recording Endpoint is a Standalone Endpoint.

  Each of these endpoints is available as a separate package. Refer to the appropriate package for usage examples:
  * [`membrane_rtc_engine_webrtc`](https://hexdocs.pm/membrane_rtc_engine_webrtc/readme.html)
  * [`membrane_rtc_engine_hls`](https://hexdocs.pm/membrane_rtc_engine_hls/readme.html)
  * [`membrane_rtc_engine_rtsp`](https://hexdocs.pm/membrane_rtc_engine_rtsp/readme.html)
  * [`membrane_rtc_engine_file`](https://hexdocs.pm/membrane_rtc_engine_file/readme.html)
  * [`membrane_rtc_engine_sip`](https://hexdocs.pm/membrane_rtc_engine_sip/readme.html)
  * [`membrane_rtc_engine_recording`](https://hexdocs.pm/membrane_rtc_engine_recording/readme.html)

  User can also implement custom Endpoints, see Custom Endpoints guide.

  ### Readiness state
  Each endpoint is presumed to be initially inactive and has to declare itself ready to fully join the Engine.

  Before it does, it:
  * will not receive notifications about other endpoints and their metadata
  * will not receive information about tracks
  * will not be able to publish any tracks
  * will not be able to update their metadata

  When declaring itself as ready, the endpoint also has an opportunity to set their metadata.
  To mark the endpoint as active, it has to send the `t:ready_action_t/0`.

  **Example**
  ```elixir
  @impl true
  def handle_other({:media_event, %{type: "join", metadata: metadata}}, _context, state) do
    {{:ok, notify: {:ready, metadata}}, state}
  end

  @impl true
  def handle_other(:ready, _context, state) do
    Membrane.Logger.debug("Succesfully activated the endpoint")
    {:ok, state}
  end
  ```
  """

  use Membrane.Pipeline

  import Membrane.RTC.Utils

  require Membrane.Logger

  alias Membrane.RTC.Engine.{
    DisplayManager,
    Endpoint,
    FilterTee,
    Message,
    Subscription,
    Tee,
    Track
  }

  alias Membrane.RTC.Engine.Notifications.TrackNotification

  alias Membrane.RTC.Engine.Exception.{PublishTrackError, TrackReadyError}

  @registry_name Membrane.RTC.Engine.Registry.Dispatcher

  @typedoc """
  RTC Engine configuration options.

  * `id` is used by logger. If not provided it will be generated.
  * `display_manager?` - set to `true` if you want to limit number of tracks sent from `#{inspect(__MODULE__)}.Endpoint.WebRTC` to a browser.
  * `toilet_capacity` - sets capacity of buffer between engine and endpoints. Use it when you expect bursts of data for your tracks. If not provided it will be set to 200.
  """

  @type options_t() :: [
          id: String.t(),
          trace_ctx: map(),
          display_manager?: boolean(),
          toilet_capacity: pos_integer() | nil
        ]

  defmodule State do
    @moduledoc false

    use Bunch.Access

    @enforce_keys [:id, :component_path, :trace_context]
    defstruct @enforce_keys ++
                [
                  endpoints: %{},
                  pending_endpoints: %{},
                  pending_subscriptions: [],
                  subscriptions: %{},
                  display_manager: nil,
                  toilet_capacity: 200
                ]

    @type t() :: %__MODULE__{
            id: String.t(),
            component_path: String.t(),
            trace_context: map(),
            display_manager: pid() | nil,
            endpoints: %{Endpoint.id() => Endpoint.t()},
            pending_endpoints: %{Endpoint.id() => Endpoint.t()},
            subscriptions: %{Endpoint.id() => %{Track.id() => Subscription.t()}},
            pending_subscriptions: [Subscription.t()],
            toilet_capacity: pos_integer()
          }
  end

  @typedoc """
  Endpoint configuration options.

  * `id` - assigned endpoint id. If not provided it will be generated by RTC Engine.
  * `node` - node on which endpoint should be spawned. If not provided, current node is used.
  """
  @type endpoint_options_t() :: [
          id: String.t(),
          node: node()
        ]

  @typedoc """
  Subscription options.
  """
  @type subscription_opts_t() :: Keyword.t()

  @typedoc """
  Membrane action that will cause RTC Engine to publish some message to all other endpoints.
  """
  @type publish_action_t() :: {:notify_parent, {:publish, publish_message_t()}}

  @typedoc """
  Membrane action that will mark the endpoint as ready and set its metadata.
  The Engine will respond with `t:ready_ack_msg_t/0` to acknowledge your transition to ready state.

  This action can only be used once, any further calls by an endpoint will be ignored.
  """
  @type ready_action_t() :: {:notify_parent, :ready | {:ready, metadata :: any()}}

  @typedoc """
  Membrane action that informs engine that endpoint finished processing and should be removed.
  """
  @type finished_action_t() :: {:notify_parent, :finished}

  @typedoc """
  A message that the Engine sends to the endpoint when it ackowledges its `t:ready_action_t/0`
  """
  @type ready_ack_msg_t() :: {:ready, other_endpoints :: [Endpoint.t()]}

  @typedoc """
  Membrane action that will cause RTC Engine to forward supplied message to the business logic.
  """
  @type forward_to_parent_action_t() :: {:notify_parent, {:forward_to_parent, message :: any()}}

  @typedoc """
  Membrane action that will inform RTC Engine about track readiness.
  """
  @type track_ready_action_t() ::
          {:notify_parent, {:track_ready, Track.id(), Track.variant(), Track.encoding()}}

  @typedoc """
  Membrane action that will inform RTC Engine about enabling track variant by the client.
  """
  @type enable_track_variant_action_t() ::
          {:notify_parent, {:enable_track_variant, Track.id(), Track.variant()}}

  @typedoc """
  Membrane action that will inform RTC Engine about disabling track variant by the client.
  """
  @type disable_track_variant_action_t() ::
          {:notify_parent, {:disable_track_variant, Track.id(), Track.variant()}}

  @typedoc """
  Types of messages that can be published to other Endpoints.
  """
  @type publish_message_t() ::
          {:new_tracks, [Track.t()]}
          | {:removed_tracks, [Track.t()]}
          | {:track_metadata_updated, metadata :: any()}
          | {:enable_track_variant, Track.id(), Track.variant()}
          | {:disable_track_variant, Track.id(), Track.variant()}
          | {:endpoint_metadata_updated, metadata :: any()}
          | {:tracks_priority, tracks :: list()}
          | TrackNotification.t()

  @typedoc """
  Type of messages that need to be handled by each endpoint.
  """
  @type published_message_t() ::
          {:new_tracks, [Track.t()]}
          | {:removed_tracks, [Track.t()]}
          | {:new_endpoint, Endpoint.t()}
          | {:endpoint_removed, Endpoint.id()}
          | {:track_metadata_updated, Track.t()}
          | {:track_variant_enabled, Track.t(), Track.variant()}
          | {:track_variant_disabled, Track.t(), Track.variant()}
          | {:endpoint_metadata_updated, Endpoint.t()}
          | {:tracks_priority, tracks :: list()}
          | ready_ack_msg_t()
          | TrackNotification.t()

  @spec start(options :: options_t(), process_options :: GenServer.options()) ::
          GenServer.on_start()
  def start(options, process_options) do
    do_start(:start, options, process_options)
  end

  @spec start_link(options :: options_t(), process_options :: GenServer.options()) ::
          GenServer.on_start()
  def start_link(options, process_options) do
    do_start(:start_link, options, process_options)
  end

  @doc """
  Terminates the engine.

  Accepts three options:
    * `asynchronous?` - if set to `true`, pipline termination won't be blocking and
      will be executed in the process, which pid is returned as function result. If
      set to `false`, engine termination will be blocking and will be executed in
      the process that called this function. Defaults to `false`.
    * `timeout` - tells how much time (ms) to wait for engine to get gracefully
      terminated. Defaults to 5000.
    * `force?` - if set to `true` and engine is still alive after `timeout`,
      engine will be killed using `Process.exit/2` with reason `:kill`, and function
      will return `{:error, :timeout}`. If set to `false` and engine is still alive
      after `timeout`, function will raise an error. Defaults to `false`.

  Returns:
    * `{:ok, pid}` - if option `asynchronous?: true` was passed.
    * `:ok` - if engine was gracefully terminated within `timeout`.
    * `{:error, :timeout}` - if engine was killed after a `timeout`.
  """
  @spec terminate(pid,
          timeout: timeout(),
          force?: boolean(),
          asynchronous?: boolean()
        ) :: :ok | {:ok, pid()} | {:error, :timeout}
  def terminate(engine, opts \\ []) do
    Membrane.Pipeline.terminate(engine, opts)
  end

  defp do_start(func, options, process_options) when func in [:start, :start_link] do
    id = options[:id] || "#{UUID.uuid4()}"
    display_manager? = options[:display_manager?] || false
    options = Keyword.put(options, :id, id)
    options = Keyword.put(options, :display_manager?, display_manager?)
    Membrane.Logger.info("Starting a new RTC Engine instance with id: #{id}")

    with {:ok, _supervisor, pipeline} <-
           apply(Membrane.Pipeline, func, [__MODULE__, options, process_options]) do
      {:ok, pipeline}
    end
  end

  @spec get_registry_name() :: atom()
  def get_registry_name(), do: @registry_name

  @doc """
  Adds endpoint to the RTC Engine

  For more information refer to `t:endpoint_options_t/0`.
  """
  @spec add_endpoint(
          pid :: pid(),
          endpoint :: Membrane.ChildrenSpec.child_definition(),
          opts :: endpoint_options_t()
        ) :: :ok | :error
  def add_endpoint(pid, endpoint, opts \\ []) do
    send(pid, {:add_endpoint, endpoint, opts})
    :ok
  end

  @doc """
  Removes endpoint from the RTC Engine
  """
  @spec remove_endpoint(
          pid :: pid(),
          id :: String.t()
        ) :: :ok
  def remove_endpoint(rtc_engine, id) do
    send(rtc_engine, {:remove_endpoint, id})
    :ok
  end

  @doc """
  Registers process with pid `who` for receiving messages from RTC Engine
  """
  @spec register(rtc_engine :: pid(), who :: pid()) :: :ok
  def register(rtc_engine, who \\ self()) do
    send(rtc_engine, {:register, who})
    :ok
  end

  @doc """
  Unregisters process with pid `who` from receiving messages from RTC Engine
  """
  @spec unregister(rtc_engine :: pid(), who :: pid()) :: :ok
  def unregister(rtc_engine, who \\ self()) do
    send(rtc_engine, {:unregister, who})
    :ok
  end

  @doc """
  Sends message to RTC Engine endpoint.
  If endpoint doesn't exist message is ignored
  """
  @spec message_endpoint(rtc_engine :: pid(), endpoint_id :: String.t(), message :: any()) ::
          :ok
  def message_endpoint(rtc_engine, endpoint_id, message) do
    send(rtc_engine, {:message_endpoint, {:endpoint, endpoint_id}, message})
    :ok
  end

  @doc """
  Returns list of the RTC Engine's endpoints.
  """
  @spec get_endpoints(rtc_engine :: pid()) :: [%{id: String.t(), type: atom()}]
  def get_endpoints(rtc_engine) do
    Pipeline.call(rtc_engine, :get_endpoints)
  end

  @doc """
  Returns list of the RTC Engine's tracks.
  """
  @spec get_tracks(rtc_engine :: pid()) :: [Track.t()]
  def get_tracks(rtc_engine) do
    Pipeline.call(rtc_engine, :get_tracks)
  end

  @doc """
  Returns number of forwarded tracks in RTC Engine.

  It is number of active and pending subscriptions.
  """
  @spec get_num_forwarded_tracks(rtc_engine :: pid()) :: integer()
  def get_num_forwarded_tracks(rtc_engine) do
    Pipeline.call(rtc_engine, :get_num_forwarded_tracks)
  end

  @doc """
  Subscribes an endpoint for a track.

  The endpoint will be notified about track readiness in `c:Membrane.Bin.handle_pad_added/3` callback.
  `endpoint_id` is the id of the endpoint, which wants to subscribe to the track. Possible return values are:
  * `:ok` - when endpoint subscribed on track successfully
  * `:ignored` - when subscribing was impossible because the state of the engine changed e.g:
  the track was already removed, or subscribing endpoint was removed
  """
  @spec subscribe(
          rtc_engine :: pid(),
          endpoint_id :: String.t(),
          track_id :: Track.id(),
          opts :: subscription_opts_t
        ) :: :ok | :ignored
  def subscribe(rtc_engine, endpoint_id, track_id, opts \\ []) do
    ref = make_ref()

    send(rtc_engine, {:subscribe, {self(), ref}, endpoint_id, track_id, opts})

    receive do
      {^ref, :ok} ->
        :ok

      {^ref, {:error, :invalid_track_id}} ->
        Membrane.Logger.debug("""
        Couldn't subscribe endpoint #{endpoint_id} to the track: #{track_id} (no such track). Ignoring.
        """)

        :ignored

      {^ref, {:error, :endpoint_terminating}} ->
        Membrane.Logger.debug("""
        Couldn't subscribe to the track: #{track_id} because endpoint #{endpoint_id} is already removed. Ignoring.
        """)

        :ignored

      {^ref, {:error, :endpoint_not_exist}} ->
        Membrane.Logger.error(%{
          error: :endpoint_not_exist,
          message: "Couldn't subscribe to track",
          track_id: track_id
        })

        raise "Couldn't subscribe to the track: #{inspect(track_id)}, because endpoint #{endpoint_id} doesn't exist."
    after
      5_000 ->
        Membrane.Logger.error(%{
          error: :timeout,
          message: "Couldn't subscribe to track",
          track_id: track_id
        })

        raise "Couldn't subscribe to the track: #{inspect(track_id)}, because of timeout."
    end
  end

  @impl true
  def handle_init(_ctx, options) do
    Logger.metadata(rtc_engine: options[:id])

    display_manager =
      if options[:display_manager?] do
        {:ok, pid} = DisplayManager.start_link(ets_name: options[:id], engine: self())
        pid
      else
        nil
      end

    toilet_capacity = options[:toilet_capacity] || 200
    if toilet_capacity < 0, do: raise("toilet_capacity has to be a positive integer")

    {[],
     %State{
       id: options[:id],
       component_path: Membrane.ComponentPath.get_formatted(),
       trace_context: options[:trace_ctx],
       display_manager: display_manager,
       toilet_capacity: toilet_capacity
     }}
  end

  @impl true
  def handle_terminate_request(ctx, state) do
    {actions, state} =
      Enum.flat_map_reduce(state.endpoints, state, fn {endpoint_id, _endpoint}, state ->
        {_status, actions, state} = handle_remove_endpoint(endpoint_id, ctx, state)
        {actions, state}
      end)

    {actions ++ [terminate: :normal], state}
  end

  @impl true
  def handle_info({:add_endpoint, endpoint, opts}, _ctx, state) do
    endpoint_id = opts[:id] || UUID.uuid4()
    opts = Keyword.put(opts, :id, endpoint_id)

    if Map.has_key?(state.endpoints, endpoint_id) do
      Membrane.Logger.warning(
        "Cannot add Endpoint with id #{inspect(endpoint_id)} as it already exists"
      )

      {[], state}
    else
      handle_add_endpoint(endpoint, opts, state)
    end
  end

  @impl true
  def handle_info({:remove_endpoint, id}, ctx, state) do
    case handle_remove_endpoint(id, ctx, state) do
      {:absent, [], state} ->
        Membrane.Logger.info("Endpoint #{inspect(id)} already removed")
        {[], state}

      {{:present, endpoint}, actions, state} ->
        dispatch(%Message.EndpointRemoved{endpoint_id: id, endpoint_type: endpoint.type})
        {actions, state}
    end
  end

  @impl true
  def handle_info({:register, pid}, _ctx, state) do
    Registry.register(get_registry_name(), self(), pid)
    {[], state}
  end

  @impl true
  def handle_info({:unregister, pid}, _ctx, state) do
    Registry.unregister_match(get_registry_name(), self(), pid)
    {[], state}
  end

  @impl true
  def handle_info(
        {:subscribe, {endpoint_pid, ref}, endpoint_id, track_id, opts},
        ctx,
        state
      ) do
    subscription = %Subscription{
      endpoint_id: endpoint_id,
      track_id: track_id,
      opts: opts
    }

    case validate_subscription(subscription, ctx, state) do
      :ok ->
        {spec, state} = fulfill_or_postpone_subscription(subscription, ctx, state)
        send(endpoint_pid, {ref, :ok})
        Membrane.Logger.debug("Subscription fulfilled by #{endpoint_id} on track: #{track_id}")
        {[spec: {spec, log_metadata: [rtc: state.id]}], state}

      {:error, _reason} = error ->
        send(endpoint_pid, {ref, error})
        {[], state}
    end
  end

  @impl true
  def handle_info({:track_priorities, endpoint_to_tracks}, ctx, state) do
    endpoint_msg_actions =
      for {endpoint, tracks} <- endpoint_to_tracks do
        {:notify_child, {endpoint, {:tracks_priority, tracks}}}
      end

    tee_actions =
      ctx
      |> filter_children(pattern: {:tee, _tee_name})
      |> Enum.flat_map(&[notify_child: {&1, :track_priorities_updated}])

    {endpoint_msg_actions ++ tee_actions, state}
  end

  @impl true
  def handle_info({:message_endpoint, endpoint, message}, ctx, state) do
    actions =
      if find_child(ctx, pattern: ^endpoint) != nil do
        [notify_child: {endpoint, message}]
      else
        Membrane.Logger.warning(
          "Message #{inspect(message)} sent to non exisiting endpoint #{inspect(endpoint)}. Ignoring."
        )

        []
      end

    {actions, state}
  end

  @impl true
  def handle_call(:get_endpoints, ctx, state) do
    endpoints =
      ctx.children
      |> Map.values()
      |> Enum.filter(fn child -> match?({:endpoint, _id}, child.name) end)
      |> Enum.map(fn endpoint ->
        {:endpoint, id} = endpoint.name
        %{id: id, type: endpoint.module}
      end)

    {[reply: endpoints], state}
  end

  @impl true
  def handle_call(:get_num_forwarded_tracks, _ctx, state) do
    forwarded_tracks = Map.values(state.subscriptions) |> Enum.flat_map(& &1) |> Enum.count()
    pending_forwarded_tracks = Enum.count(state.pending_subscriptions)
    {[reply: forwarded_tracks + pending_forwarded_tracks], state}
  end

  @impl true
  def handle_call(:get_tracks, _ctx, state) do
    tracks =
      state.endpoints
      |> Map.values()
      |> Enum.flat_map(&Endpoint.get_tracks/1)

    {[reply: tracks], state}
  end

  @impl true
  def handle_child_notification(
        notification,
        {:endpoint, endpoint_id},
        ctx,
        %{endpoints: endpoints, pending_endpoints: pending_endpoints} = state
      ) do
    handle_notification? =
      Map.has_key?(endpoints, endpoint_id) or
        (Map.has_key?(pending_endpoints, endpoint_id) and
           endpoint_ready_notification?(notification))

    if handle_notification? do
      handle_endpoint_notification(notification, endpoint_id, ctx, state)
    else
      Membrane.Logger.warning(
        "Ignoring notification: #{inspect(notification)}, endpoint: #{endpoint_id} is not ready"
      )

      {[], state}
    end
  end

  @impl true
  def handle_crash_group_down(endpoint_id, ctx, state) do
    case handle_remove_endpoint(endpoint_id, ctx, state) do
      {{:present, endpoint}, actions, state} ->
        dispatch(%Message.EndpointCrashed{
          endpoint_id: endpoint_id,
          endpoint_type: endpoint.type,
          reason: Map.get(ctx, :crash_reason)
        })

        {actions, state}

      {:absent, actions, state} ->
        Membrane.Logger.warning("Endpoint #{endpoint_id} crashed after removing from the state")
        {actions, state}
    end
  end

  @impl true
  def handle_child_pad_removed(_child, _pad, _ctx, state) do
    {[], state}
  end

  #
  # Endpoint Notifications
  #
  # - handle_endpoint_notification/4: Handles incoming notifications from an Endpoint, usually
  #   the WebRTC endpoint. Handles track_ready, publication of new tracks, and publication of
  #   removed tracks. Also forwards custom media events.
  #

  defp handle_endpoint_notification(:finished, endpoint_id, ctx, state) do
    Membrane.Logger.debug("Endpoint: #{endpoint_id} marked itself for removal. Trying to remove.")

    case handle_remove_endpoint(endpoint_id, ctx, state) do
      {{:present, endpoint}, actions, new_state} ->
        dispatch(%Message.EndpointRemoved{endpoint_id: endpoint_id, endpoint_type: endpoint.type})

        Membrane.Logger.debug("Endpoint #{endpoint_id} successfully removed.")

        {actions, new_state}

      {:absent, actions, new_state} ->
        Membrane.Logger.warning(
          "Endpoint #{endpoint_id} marked itself for removal but it has already been removed."
        )

        {actions, new_state}
    end
  end

  defp handle_endpoint_notification(:ready, endpoint_id, ctx, state) do
    handle_endpoint_notification({:ready, nil}, endpoint_id, ctx, state)
  end

  defp handle_endpoint_notification({:ready, metadata}, endpoint_id, _ctx, state) do
    if Map.has_key?(state.pending_endpoints, endpoint_id) do
      {new_endpoint, state} = pop_in(state, [:pending_endpoints, endpoint_id])
      new_endpoint = %{new_endpoint | metadata: metadata}

      other_endpoints = Map.values(state.endpoints)

      new_endpoint_notifications =
        state.endpoints
        |> Map.keys()
        |> Enum.map(&{:notify_child, {{:endpoint, &1}, {:new_endpoint, new_endpoint}}})

      active_tracks = get_active_tracks(state.endpoints)

      new_tracks_notification =
        if active_tracks == [] do
          []
        else
          [notify_child: {{:endpoint, endpoint_id}, {:new_tracks, active_tracks}}]
        end

      actions =
        [
          notify_child: {{:endpoint, endpoint_id}, {:ready, other_endpoints}}
        ] ++ new_tracks_notification ++ new_endpoint_notifications

      state =
        state
        |> put_in([:endpoints, endpoint_id], new_endpoint)
        |> put_in([:subscriptions, endpoint_id], %{})

      dispatch(%Message.EndpointMetadataUpdated{
        endpoint_id: endpoint_id,
        endpoint_metadata: metadata
      })

      {actions, state}
    else
      Membrane.Logger.warning(
        "Endpoint #{endpoint_id} sent a `:ready` message even though it has been already accepted. Ignoring."
      )

      {[], state}
    end
  end

  defp handle_endpoint_notification({:forward_to_parent, message}, endpoint_id, _ctx, state) do
    endpoint =
      Map.get(state.endpoints, endpoint_id) || Map.fetch!(state.pending_endpoints, endpoint_id)

    dispatch(%Message.EndpointMessage{
      endpoint_id: endpoint_id,
      endpoint_type: endpoint.type,
      message: message
    })

    {[], state}
  end

  defp handle_endpoint_notification(
         {:update_endpoint_metadata, metadata},
         endpoint_id,
         _ctx,
         state
       ) do
    endpoint = Map.get(state.endpoints, endpoint_id)

    if endpoint.metadata != metadata do
      updated_endpoint = %{endpoint | metadata: metadata}
      state = put_in(state, [:endpoints, endpoint_id], updated_endpoint)

      actions =
        state.endpoints
        |> Map.keys()
        |> Enum.map(
          &{:notify_child, {{:endpoint, &1}, {:endpoint_metadata_updated, updated_endpoint}}}
        )

      dispatch(%Message.EndpointMetadataUpdated{
        endpoint_id: endpoint_id,
        endpoint_metadata: metadata
      })

      {actions, state}
    else
      {[], state}
    end
  end

  defp handle_endpoint_notification(
         {:update_track_metadata, track_id, track_metadata},
         endpoint_id,
         _ctx,
         state
       ) do
    with {:ok, endpoint} <- Map.fetch(state.endpoints, endpoint_id),
         track when track != nil <- Endpoint.get_track_by_id(endpoint, track_id),
         true <- track.metadata != track_metadata do
      endpoint = Endpoint.update_track_metadata(endpoint, track_id, track_metadata)
      state = put_in(state, [:endpoints, endpoint_id], endpoint)

      actions =
        prepare_track_notifications(
          state.subscriptions,
          state.pending_subscriptions,
          track_id,
          {:track_metadata_updated, Endpoint.get_track_by_id(endpoint, track_id)}
        )

      dispatch(%Message.TrackMetadataUpdated{
        endpoint_id: endpoint_id,
        track_id: track_id,
        track_metadata: track_metadata
      })

      {actions, state}
    else
      _other ->
        {[], state}
    end
  end

  defp handle_endpoint_notification(
         {:enable_track_variant, track_id, variant},
         endpoint_id,
         _ctx,
         state
       ) do
    with {:ok, endpoint} <- Map.fetch(state.endpoints, endpoint_id),
         track when track != nil <- Endpoint.get_track_by_id(endpoint, track_id),
         true <- variant in track.disabled_variants do
      endpoint =
        Endpoint.update_track_disabled_variants(
          endpoint,
          track_id,
          Enum.reject(track.disabled_variants, &(&1 == variant))
        )

      state = put_in(state, [:endpoints, endpoint_id], endpoint)

      actions =
        prepare_track_notifications(
          state.subscriptions,
          state.pending_subscriptions,
          track_id,
          {:track_variant_enabled, Endpoint.get_track_by_id(endpoint, track_id), variant}
        )

      {actions, state}
    else
      _other ->
        {[], state}
    end
  end

  defp handle_endpoint_notification(
         {:disable_track_variant, track_id, variant},
         endpoint_id,
         _ctx,
         state
       ) do
    with {:ok, endpoint} <- Map.fetch(state.endpoints, endpoint_id),
         track when track != nil <- Endpoint.get_track_by_id(endpoint, track_id),
         true <- variant not in track.disabled_variants do
      endpoint =
        Endpoint.update_track_disabled_variants(
          endpoint,
          track_id,
          [variant | track.disabled_variants]
        )

      state = put_in(state, [:endpoints, endpoint_id], endpoint)

      actions =
        prepare_track_notifications(
          state.subscriptions,
          state.pending_subscriptions,
          track_id,
          {:track_variant_disabled, Endpoint.get_track_by_id(endpoint, track_id), variant}
        )

      {actions, state}
    else
      _other ->
        {[], state}
    end
  end

  defp handle_endpoint_notification(
         {:publish, %TrackNotification{track_id: track_id} = notification},
         endpoint_id,
         _ctx,
         state
       ) do
    subscribed_endpoints =
      state.subscriptions
      |> Map.values()
      |> Enum.flat_map(&Map.values/1)
      |> Enum.filter(&(&1.track_id == track_id))
      |> Enum.map(& &1.endpoint_id)

    message_actions =
      Enum.map(subscribed_endpoints, &{:notify_child, {{:endpoint, &1}, notification}})

    if Map.has_key?(state.endpoints[endpoint_id].inbound_tracks, track_id) do
      {message_actions, state}
    else
      Membrane.Logger.error("""
      Non-owner attempted to send a notification about the track. It is being ignored.
      Offending endpoint: #{inspect(endpoint_id)}
      TrackId: #{track_id}
      notification: #{inspect(notification)}
      """)
    end
  end

  defp handle_endpoint_notification(
         {:track_ready, track_id, variant, encoding},
         endpoint_id,
         _ctx,
         state
       ) do
    Membrane.Logger.info(
      "New incoming #{encoding} track #{track_id} (variant: #{variant}) from endpoint #{inspect(endpoint_id)}"
    )

    track = get_in(state, [:endpoints, endpoint_id]) |> Endpoint.get_track_by_id(track_id)

    if variant not in track.variants do
      raise TrackReadyError, track: track, variant: variant
    end

    track_link = build_track_link(variant, track, endpoint_id, state)

    # check if there are subscriptions for this track and fulfill them
    {subscriptions, pending_subscriptions} =
      Enum.split_with(state.pending_subscriptions, &(&1.track_id == track_id))

    {subscription_links, state} = fulfill_subscriptions(subscriptions, state)

    links = [track_link] ++ subscription_links
    state = %{state | pending_subscriptions: pending_subscriptions}

    state =
      update_in(
        state,
        [:endpoints, endpoint_id],
        &Endpoint.update_track_encoding(&1, track_id, encoding)
      )

    spec =
      {links,
       group: endpoint_id, crash_group_mode: :temporary, log_metadata: [rtc_engine: state.id]}

    {[spec: spec], state}
  end

  defp handle_endpoint_notification(
         {:publish, {:new_tracks, tracks}},
         endpoint_id,
         _ctx,
         state
       ) do
    Enum.each(tracks, &validate_track(&1))

    id_to_track = Map.new(tracks, &{&1.id, &1})

    state =
      update_in(
        state,
        [:endpoints, endpoint_id, :inbound_tracks],
        &Map.merge(&1, id_to_track)
      )

    Enum.each(
      tracks,
      &dispatch(%Message.TrackAdded{
        endpoint_id: endpoint_id,
        endpoint_type: state.endpoints[endpoint_id].type,
        track_id: &1.id,
        track_type: &1.type,
        track_encoding: &1.encoding,
        track_metadata: &1.metadata
      })
    )

    tracks_msgs = build_track_added_actions(tracks, endpoint_id, state)
    {tracks_msgs, state}
  end

  defp handle_endpoint_notification(
         {:publish, {:removed_tracks, tracks}},
         endpoint_id,
         ctx,
         state
       ) do
    track_ids = Enum.map(tracks, & &1.id)

    state =
      update_in(
        state,
        [:endpoints, endpoint_id, :inbound_tracks],
        &Map.drop(&1, track_ids)
      )

    tracks_msgs = build_track_removed_actions(tracks, endpoint_id, ctx, state)

    track_tees = tracks |> Enum.map(&get_track_tee(&1.id, ctx)) |> Enum.reject(&is_nil(&1))
    tees_msgs = if Enum.empty?(track_tees), do: [], else: [remove_children: track_tees]

    state = cleanup_subscriptions(track_ids, state)

    Enum.each(
      tracks,
      &dispatch(%Message.TrackRemoved{
        endpoint_id: endpoint_id,
        endpoint_type: state.endpoints[endpoint_id].type,
        track_id: &1.id,
        track_type: &1.type,
        track_encoding: &1.encoding
      })
    )

    {tracks_msgs ++ tees_msgs, state}
  end

  defp validate_track(track) do
    variants = MapSet.new(track.variants)
    supported_variants = Track.supported_variants() |> MapSet.new()

    cond do
      variants == MapSet.new([]) ->
        raise PublishTrackError, track: track

      track.type == :audio and not MapSet.equal?(variants, MapSet.new([:high])) ->
        raise PublishTrackError, track: track

      MapSet.subset?(variants, supported_variants) == false ->
        raise PublishTrackError, track: track

      true ->
        :ok
    end
  end

  #
  # Endpoint Management
  #
  # - handle_add_endpoint/3: Adds a new Endpoint based on the entry provided. Part of the
  #   implementation for the public API.
  #
  # - handle_remove_endpoint/3: Removes the given Endpoint. Part of the implementation for the
  #   public API.
  #
  # - get_active_tracks/1: Helper function for add_endpoint/3. Returns a list of Tracks that can
  #   be provided to the newly added Endpoint straight away.
  #
  # - find_children_for_endpoint/2: Convenience function to identify all Elements owned by an
  #   Endpoint, via its Tracks.
  #
  # - get_track_tee/2: Convenience function to get tee for given track
  #

  defp handle_add_endpoint(endpoint_entry, opts, state) do
    if Keyword.has_key?(opts, :endpoint_id) or Keyword.has_key?(opts, :peer_id) do
      raise("`:endpoint_id` and `:peer_id` options were removed. Use `:id` option instead.")
    end

    endpoint_id = Keyword.fetch!(opts, :id)
    endpoint_name = {:endpoint, endpoint_id}
    %endpoint_module{} = endpoint_entry

    spec = {
      child(endpoint_name, endpoint_entry),
      node: opts[:node],
      group: endpoint_id,
      crash_group_mode: :temporary,
      log_metadata: [rtc_engine: state.id]
    }

    display_manager_message =
      if state.display_manager != nil,
        do: [notify_child: {endpoint_name, {:display_manager, state.display_manager}}],
        else: []

    actions = [spec: spec] ++ display_manager_message

    endpoint = Endpoint.new(endpoint_id, endpoint_module, [])

    state =
      state
      |> put_in([:subscriptions, endpoint_id], %{})
      |> put_in([:pending_endpoints, endpoint_id], endpoint)

    dispatch(%Message.EndpointAdded{endpoint_id: endpoint_id, endpoint_type: endpoint_module})
    {actions, state}
  end

  defp handle_remove_endpoint(endpoint_id, ctx, state) do
    cond do
      Map.has_key?(state.endpoints, endpoint_id) ->
        {endpoint, state} = pop_in(state, [:endpoints, endpoint_id])

        # Clean up subscriptions of the endpoint being removed
        state =
          state
          |> update_in([:subscriptions], &Map.delete(&1, endpoint_id))
          |> update_in([:pending_subscriptions], fn subscriptions ->
            Enum.reject(subscriptions, &(&1.endpoint_id == endpoint_id))
          end)

        tracks = Enum.map(Endpoint.get_tracks(endpoint), &%Track{&1 | active?: true})
        track_ids = Enum.map(tracks, & &1.id)
        tracks_msgs = build_track_removed_actions(tracks, endpoint_id, ctx, state)

        # After building TrackRemoved actions, clean up subscriptions of other endpoints
        # which have subscribed to tracks published by the endpoint being removed
        state = cleanup_subscriptions(track_ids, state)

        endpoint_bin = ctx.children[{:endpoint, endpoint_id}]

        endpoint_removed_msgs =
          state.endpoints
          |> Map.keys()
          |> Enum.map(&{:notify_child, {{:endpoint, &1}, {:endpoint_removed, endpoint_id}}})

        if endpoint_bin == nil or endpoint_bin.terminating? do
          {{:present, endpoint}, tracks_msgs ++ endpoint_removed_msgs, state}
        else
          actions = [remove_children: find_children_for_endpoint(endpoint, ctx)]
          {{:present, endpoint}, tracks_msgs ++ endpoint_removed_msgs ++ actions, state}
        end

      Map.has_key?(state.pending_endpoints, endpoint_id) ->
        {pending_endpoint, state} = pop_in(state, [:pending_endpoints, endpoint_id])
        endpoint_bin = ctx.children[{:endpoint, endpoint_id}]

        if endpoint_bin == nil or endpoint_bin.terminating? do
          {{:present, pending_endpoint}, [], state}
        else
          {{:present, pending_endpoint}, [remove_children: {:endpoint, endpoint_id}], state}
        end

      true ->
        {:absent, [], state}
    end
  end

  defp get_active_tracks(endpoints) do
    endpoints
    |> Map.values()
    |> Enum.flat_map(&Endpoint.get_tracks/1)
    |> Enum.filter(& &1.active?)
  end

  defp find_children_for_endpoint(endpoint, ctx) do
    children =
      endpoint
      |> Endpoint.get_tracks()
      |> Enum.map(&get_track_tee(&1.id, ctx))
      |> Enum.reject(&is_nil(&1))

    [endpoint: endpoint.id] ++ children
  end

  defp get_track_tee(track_id, ctx) do
    if Map.has_key?(ctx.children, {:tee, track_id}) do
      {:tee, track_id}
    end
  end

  #
  # Track Actions
  #
  # - build_track_added_actions/3: Called when new tracks were published by the WebRTC endpoint
  #   and the Engine has been notified. Notifies all other Endpoints of the new tracks.
  #
  # - build_track_removed_actions/3: Called when the underlying endpoint was removed (either
  #   normally, or due to crash).
  #

  defp build_track_added_actions(tracks, endpoint_id, state) do
    state.endpoints
    |> Map.delete(endpoint_id)
    |> Map.keys()
    |> Enum.flat_map(fn endpoint_id ->
      [notify_child: {{:endpoint, endpoint_id}, {:new_tracks, tracks}}]
    end)
  end

  defp build_track_removed_actions(tracks, from_endpoint_id, ctx, state) do
    state.endpoints
    |> Stream.reject(&(elem(&1, 0) == from_endpoint_id))
    |> Stream.reject(&is_nil(elem(&1, 1)))
    |> Stream.filter(fn {endpoint_id, _endpoint} ->
      Map.has_key?(ctx.children, {:endpoint, endpoint_id})
    end)
    |> Enum.flat_map(fn {endpoint_id, _endpoint} ->
      tracks = Enum.filter(tracks, &subscribed?(endpoint_id, &1.id, state))

      if Enum.empty?(tracks),
        do: [],
        else: [notify_child: {{:endpoint, endpoint_id}, {:remove_tracks, tracks}}]
    end)
  end

  #
  # Track Links
  #
  # - build_track_link/4 - called when the track is ready, via notification from the WebRTC
  #   endpoint. Creates the link from the endpoint which published the track, and starts the
  #   underlying tee which is required to bring the content of the track to all subscribers.
  #
  # - build_track_tee/4 - Called by build_track_link/4; builds the correct tee depending on
  #   display manager is turned on or off
  #

  defp build_track_link(variant, track, endpoint_id, state) do
    get_child({:endpoint, endpoint_id})
    |> via_out(Pad.ref(:output, {track.id, variant}))
    |> via_in(Pad.ref(:input, {track.id, variant}),
      toilet_capacity: state.toilet_capacity
    )
    |> child({:tee, track.id}, build_track_tee(track.id, variant, track, state),
      get_if_exists: true
    )
  end

  defp build_track_tee(track_id, _variant, track, %{display_manager: dm} = state)
       when dm != nil do
    %FilterTee{
      ets_name: state.id,
      track_id: track_id,
      type: track.type,
      codec: track.encoding
    }
  end

  defp build_track_tee(_track_id, _variant, track, _state) do
    %Tee{track: track}
  end

  #
  # Track Subscriptions
  #
  # - validate_subscription/3: Validates proposed subscription, called when a new subscription
  #   is to be added, via handle_info.
  #
  # - fulfill_or_postpone_subscription/3: Called immediately upon validation of subscription,
  #   optimistically links track's tee to the subscriber if the track is ready, otherwise adds the
  #   subscription to the list of pending subscriptions
  #
  # - fulfill_subscriptions/2: Called when a new track is ready and there are pending
  #   subscriptions to the track.
  #
  # - build_subscription_links/1, build_subscription_link/1: Called by fulfill_subscriptions/2,
  #   these functions build the actual links between the tee and the endpoint subscribing for the given track.
  #
  # - cleanup_subscriptions/2: Called after building TrackRemoved actions (when removing a track or
  #   an endpoint), performs the cleanup of :subscriptions and :pending_subscriptions in state.
  #
  # - subscribed?/3: Convenience function. Checks whether a given endpoint is subscribed, or has a
  #   pending subscription, on the given track.
  #
  # - get_track/2: Convenience function. Searches for a Track with the given Track ID which is
  #   owned by one of the Endpoints in the list.
  #

  defp validate_subscription(subscription, ctx, state) do
    # checks whether subscription is correct
    track = get_track(subscription.track_id, state.endpoints)

    endpoint_id = {:endpoint, subscription.endpoint_id}

    endpoint = find_child(ctx, pattern: ^endpoint_id)
    endpoint_in_state? = Map.has_key?(state.endpoints, subscription.endpoint_id)

    cond do
      is_nil(track) -> {:error, :invalid_track_id}
      not endpoint_in_state? and is_nil(endpoint) -> {:error, :endpoint_not_exist}
      not endpoint_in_state? -> {:error, :endpoint_terminating}
      true -> :ok
    end
  end

  defp fulfill_or_postpone_subscription(subscription, ctx, state) do
    # If the tee for this track is already spawned, fulfill subscription.
    # Otherwise, save subscription as pending, we will fulfill it when the tee is linked.

    if Map.has_key?(ctx.children, {:tee, subscription.track_id}) do
      fulfill_subscriptions([subscription], state)
    else
      state = update_in(state, [:pending_subscriptions], &[subscription | &1])
      {[], state}
    end
  end

  defp fulfill_subscriptions(subscriptions, state) do
    links = build_subscription_links(subscriptions, state)

    Enum.reduce(subscriptions, {links, state}, fn subscription, {links, state} ->
      endpoint_id = subscription.endpoint_id
      track_id = subscription.track_id
      subscription = %{subscription | status: :active}
      state = put_in(state, [:subscriptions, endpoint_id, track_id], subscription)
      {links, state}
    end)
  end

  defp build_subscription_links(subscriptions, state) do
    Enum.map(subscriptions, &build_subscription_link(&1, state))
  end

  defp build_subscription_link(subscription, state) do
    get_child({:tee, subscription.track_id})
    |> via_out(Pad.ref(:output, {:endpoint, subscription.endpoint_id}))
    |> via_in(Pad.ref(:input, subscription.track_id),
      toilet_capacity: state.toilet_capacity
    )
    |> get_child({:endpoint, subscription.endpoint_id})
  end

  defp cleanup_subscriptions(removed_track_ids, state) do
    subscriptions =
      Map.new(state.subscriptions, fn {subscriber_id, subscriptions} ->
        {subscriber_id, Map.drop(subscriptions, removed_track_ids)}
      end)

    pending_subscriptions =
      Enum.reject(state.pending_subscriptions, fn subscription ->
        Enum.member?(removed_track_ids, subscription.track_id)
      end)

    %{state | subscriptions: subscriptions, pending_subscriptions: pending_subscriptions}
  end

  defp subscribed?(endpoint_id, track_id, state) do
    subscriptions = state.subscriptions[endpoint_id]

    has_pending_sub? =
      state.pending_subscriptions
      |> Stream.filter(&(&1.endpoint_id == endpoint_id))
      |> Enum.any?(&(&1.track_id == track_id))

    Map.has_key?(subscriptions, track_id) or has_pending_sub?
  end

  defp get_track(track_id, endpoints) do
    endpoints
    |> Map.values()
    |> Enum.flat_map(&Endpoint.get_tracks/1)
    |> Map.new(&{&1.id, &1})
    |> Map.get(track_id)
  end

  defp prepare_track_notifications(subscriptions, pending_subscriptions, track_id, notification) do
    subscriptions
    |> Map.values()
    |> Enum.flat_map(&Map.values/1)
    |> then(&(&1 ++ pending_subscriptions))
    |> Enum.filter(&(&1.track_id == track_id))
    |> Enum.map(& &1.endpoint_id)
    |> Enum.map(&{:notify_child, {{:endpoint, &1}, notification}})
  end

  #
  # Message Dispatch
  #
  # - dispatch/1: Internal function, dispatches the message to all registered processes within the
  #   registry
  #
  # - dispatch/2: Dispatches the Media Event to all registered processes within the registry, with
  #   the correct `to` field populated
  #
  # - brodcast/1: Convenience function, dispatches the data within a Media Event and sets the `to`
  #   field to `:broadcast`.
  #

  defp dispatch(message) do
    Registry.dispatch(get_registry_name(), self(), fn entries ->
      for {_, pid} <- entries, do: send(pid, message)
    end)
  end

  defp endpoint_ready_notification?(:ready), do: true
  defp endpoint_ready_notification?({:ready, _metadata}), do: true
  defp endpoint_ready_notification?(_notification), do: false
end