lib/jellyfish/ws_notifier.ex

defmodule Fishjam.WSNotifier do
  @moduledoc """
  Module defining a process responsible for establishing
  WebSocket connection and receiving events from Fishjam server.

  First, [configure the connection options](README.md#fishjam-connection-configuration).

  ```
  # Start the Notifier
  iex> {:ok, notifier} = Fishjam.WSNotifier.start()
  {:ok, #PID<0.301.0>}
  ```

  ```
  # Subscribe current process to server notifications.
  iex> :ok = Fishjam.WSNotifier.subscribe_server_notifications(notifier)

  # here add a room and a peer using functions from `Fishjam.Room` module
  # you should receive a notification after the peer established connection

  iex> flush()
  {:fishjam, %Fishjam.Notification.PeerConnected{
    room_id: "21604fbe-8ac8-44e6-8474-98b5f50f1863",
    peer_id: "ae07f94e-0887-44c3-81d5-bfa9eac96252"
  }}
  :ok
  ```

  When starting the Notifier, you can provide the name under which the process will be registered.
  ```
  iex> {:ok, notifier} = Fishjam.WSNotifier.start_link(name: Fishjam.WSNotifier)
  ```

  """

  use WebSockex

  require Logger

  alias Fishjam.Utils
  alias Fishjam.{Notification, ServerMessage}

  alias Fishjam.ServerMessage.{
    Authenticated,
    AuthRequest,
    MetricsReport,
    SubscribeRequest,
    SubscribeResponse
  }

  @auth_timeout 2000
  @subscribe_timeout 5000

  @typedoc """
  The reference to the `Notifier` process.
  """
  @type notifier() :: GenServer.server()

  @typedoc """
  Connection options used to connect to Fishjam server.
  """
  @type options() :: [
          server_address: String.t(),
          server_api_token: String.t(),
          secure?: boolean(),
          name: GenServer.name()
        ]

  @doc """
  Starts the Notifier process and connects to Fishjam.

  Acts like `start/1` but links to the calling process.

  See `start/1` for more information.
  """
  @spec start_link(options()) :: {:ok, pid()} | {:error, term()}
  def start_link(opts \\ []) do
    connect(:start_link, opts)
  end

  @doc """
  Starts the Notifier process and connects to Fishjam.

  To learn how to receive notifications, see `subscribe/3`.

  For information about options, see `t:Fishjam.Client.connection_options/0`.
  """
  @spec start(options()) :: {:ok, pid()} | {:error, term()}
  def start(opts \\ []) do
    connect(:start, opts)
  end

  @doc """
  Subscribes the process to receive server notifications from all the rooms.

  Notifications are sent to the process in a form of `{:fishjam, msg}`,
  where `msg` is one of structs defined under "Fishjam.Notification" section in the docs,
  for example `{:fishjam, %Fishjam.Notification.RoomCrashed{room_id: "some_id"}}`.
  """
  @spec subscribe_server_notifications(notifier()) :: :ok | {:error, atom()}
  def subscribe_server_notifications(notifier) do
    WebSockex.cast(notifier, {:subscribe_server_notifications, self()})

    receive do
      {:fishjam, {:subscribe_answer, :ok}} -> :ok
      {:error, _reason} = error -> error
    after
      @subscribe_timeout -> {:error, :timeout}
    end
  end

  @doc """
  Subscribes the process to the WebRTC metrics from all the rooms.

  Metrics are periodically sent to the process in a form of `{:fishjam, metrics_report}`,
  where `metrics_report` is the `Fishjam.MetricsReport` struct.
  """

  @spec subscribe_metrics(notifier()) :: :ok | {:error, :timeout}
  def subscribe_metrics(notifier) do
    WebSockex.cast(notifier, {:subscribe_metrics, self()})

    receive do
      {:fishjam, {:subscribe_answer, :ok}} -> :ok
    after
      @subscribe_timeout -> {:error, :timeout}
    end
  end

  @impl true
  def handle_cast({:subscribe_server_notifications, pid}, state) do
    {request, state} = subscribe_request(:server_notification, pid, state)
    {:reply, {:binary, ServerMessage.encode(request)}, state}
  end

  def handle_cast({:subscribe_metrics, pid}, state) do
    {request, state} = subscribe_request(:metrics, pid, state)
    {:reply, {:binary, ServerMessage.encode(request)}, state}
  end

  @impl true
  def handle_frame({:binary, msg}, state) do
    case ServerMessage.decode(msg) do
      %ServerMessage{content: {_type, notification}} ->
        handle_notification(notification, state)

      %ServerMessage{content: nil, __unknown_fields__: _binary} ->
        Logger.warning(
          "Can't decode received notification. This probably means that fishjam is using a different version of protobuffs."
        )

        {:ok, state}
    end
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    state =
      [:server_notification, :metrics]
      |> Enum.reduce(state, fn event_type, state ->
        update_in(state.subscriptions[event_type], &MapSet.delete(&1, pid))
      end)

    {:ok, state}
  end

  @impl true
  def terminate({:remote, 1000, "invalid token"}, state) do
    send(state.caller_pid, {:fishjam, :invalid_token})
  end

  @impl true
  def terminate(_reason, _state) do
    :ok
  end

  defp connect(fun, opts) do
    {address, api_token, secure?} = Utils.get_options_or_defaults(opts)
    address = if secure?, do: "wss://#{address}", else: "ws://#{address}"

    empty_subscriptions = %{server_notification: MapSet.new(), metrics: MapSet.new()}

    state = %{
      caller_pid: self(),
      subscriptions: empty_subscriptions,
      pending_subscriptions: empty_subscriptions
    }

    auth_msg =
      %ServerMessage{content: {:auth_request, %AuthRequest{token: api_token}}}
      |> ServerMessage.encode()

    websockex_opts = Keyword.take(opts, [:name])

    with {:ok, ws} <-
           apply(WebSockex, fun, [
             "#{address}/socket/server/websocket",
             __MODULE__,
             state,
             websockex_opts
           ]),
         :ok <- WebSockex.send_frame(ws, {:binary, auth_msg}) do
      receive do
        {:fishjam, :authenticated} ->
          {:ok, ws}

        {:fishjam, :invalid_token} ->
          {:error, :invalid_token}
      after
        @auth_timeout ->
          Process.exit(ws, :normal)
          {:error, :authentication_timeout}
      end
    else
      {:error, _reason} = error ->
        error
    end
  end

  defp handle_notification(%Authenticated{}, state) do
    send(state.caller_pid, {:fishjam, :authenticated})
    {:ok, state}
  end

  defp handle_notification(%SubscribeResponse{event_type: proto_event_type}, state) do
    event_type = from_proto_event_type(proto_event_type)

    {pending_subscriptions, state} = pop_in(state.pending_subscriptions[event_type])

    state =
      if Enum.empty?(pending_subscriptions) do
        state
      else
        pending_subscriptions
        |> Enum.reduce(state, fn
          pid, state ->
            send(pid, {:fishjam, {:subscribe_answer, :ok}})
            update_in(state.subscriptions[event_type], &MapSet.put(&1, pid))
        end)
      end

    {:ok, state}
  end

  defp handle_notification(%MetricsReport{metrics: metrics}, state) do
    notification = %Fishjam.MetricsReport{metrics: Jason.decode!(metrics)}

    state.subscriptions.metrics
    |> Enum.each(fn pid ->
      send(pid, {:fishjam, notification})
    end)

    {:ok, state}
  end

  defp handle_notification(%{room_id: _room_id} = message, state) do
    state.subscriptions.server_notification
    |> Enum.each(&send(&1, {:fishjam, Notification.to_notification(message)}))

    {:ok, state}
  end

  defp subscribe_request(event_type, caller_pid, state) do
    request = %ServerMessage{
      content:
        {:subscribe_request, %SubscribeRequest{event_type: to_proto_event_type(event_type)}}
    }

    state = update_in(state.pending_subscriptions[event_type], &MapSet.put(&1, caller_pid))

    {request, state}
  end

  defp from_proto_event_type(:EVENT_TYPE_SERVER_NOTIFICATION), do: :server_notification
  defp from_proto_event_type(:EVENT_TYPE_METRICS), do: :metrics

  defp to_proto_event_type(:server_notification), do: :EVENT_TYPE_SERVER_NOTIFICATION
  defp to_proto_event_type(:metrics), do: :EVENT_TYPE_METRICS
end