defmodule Jellyfish.Room do
@moduledoc """
Utilities for manipulating the rooms.
## Examples
```
iex> client = Jellyfish.Client.new()
iex> assert {:ok, %Jellyfish.Room{
...> components: [],
...> config: %{max_peers: 10, video_codec: nil},
...> peers: []
...> } = room, _jellyfish_address} = Jellyfish.Room.create(client, max_peers: 10)
iex> room == %Jellyfish.Room{
...> id: room.id,
...> components: [],
...> config: %{max_peers: 10, video_codec: nil},
...> peers: []}
true
iex> assert {:ok, %{peer: %Jellyfish.Peer{
...> status: :disconnected,
...> type: Jellyfish.Peer.WebRTC,
...> tracks: []
...> } = peer}} = Jellyfish.Room.add_peer(client, room.id, Jellyfish.Peer.WebRTC)
iex> %Jellyfish.Peer{
...> id: peer.id,
...> status: :disconnected,
...> type: Jellyfish.Peer.WebRTC,
...> tracks: [],
...> metadata: nil} == peer
true
iex> :ok = Jellyfish.Room.delete(client, room.id)
:ok
```
"""
alias Tesla.Env
alias Jellyfish.Component.{File, HLS, Recording, RTSP, SIP}
alias Jellyfish.{Client, Component, Peer, Utils}
alias Jellyfish.Exception.StructureError
@s3_keys [:access_key_id, :secret_access_key, :region, :bucket]
@subscribe_modes [:auto, :manual]
@enforce_keys [
:id,
:config,
:components,
:peers
]
defstruct @enforce_keys
@typedoc """
Id of the room, unique within Jellyfish instance.
"""
@type id :: String.t()
@typedoc """
Id of the track, unique within Jellyfish instance.
"""
@type track_id :: String.t()
@typedoc """
Peer token, created by Jellyfish. Required by client application to open connection to Jellyfish.
"""
@type peer_token :: String.t()
@typedoc """
Jellyfish response to adding a peer to room. It consists of:
* peer structure
* token used for authentication when connecting through websocket to jellyfish
* ws_url that is a websocket adress to which this specific peer have to connect
"""
@type peer_create_response :: %{peer: Peer.t(), token: peer_token(), ws_url: String.t()}
@typedoc """
Options used for creating a room.
* `:room_id` - custom room id, which uniquely identifies room. If not provided
a random UUID is generated.
* `:max_peers` - maximum number of peers present in a room simultaneously.
If set to `nil` or unspecified, the number of peers is unlimited.
* `:video_codec` - enforces specific video codec for each peer in the room.
If set to `nil` or unspecified, any codec will be accepted.
To use HLS component video codec has to be `:h264`.
* `:webhook_url` - an address of a server receiving webhook
notifications from the room.
* `:peerless_purge_timeout` - duration (in seconds) after which the room will be removed
if no peers are connected. If not provided, this feature is disabled.
* `:peer_disconnected_timeout` - duration (in seconds) after which the peer will be removed
if it is disconnected. If not provided, this feature is disabled.
"""
@type options :: [
room_id: String.t() | nil,
max_peers: non_neg_integer() | nil,
video_codec: :h264 | :vp8 | nil,
webhook_url: String.t() | nil,
peerless_purge_timeout: pos_integer() | nil,
peer_disconnected_timeout: pos_integer() | nil
]
@typedoc """
Stores information about the room.
"""
@type t :: %__MODULE__{
id: id(),
config: map(),
components: [Component.t()],
peers: [Peer.t()]
}
@doc """
Lists properties of all of the rooms.
"""
@spec get_all(Client.t()) :: {:ok, [t()]} | {:error, atom() | String.t()}
def get_all(client) do
with {:ok, data} <- Utils.make_get_request!(client, "/room"),
result <- Enum.map(data, &from_json/1) do
{:ok, result}
end
end
@doc """
Gets properties of the room with `room_id`.
"""
@spec get(Client.t(), id()) :: {:ok, t()} | {:error, atom() | String.t()}
def get(client, room_id) do
with {:ok, data} <- Utils.make_get_request!(client, "/room/#{room_id}"),
result <- from_json(data) do
{:ok, result}
end
end
@doc """
Creates a new room.
Returns an address of Jellyfish where the room was created.
When running Jellyfish in a cluster, this address might be different
than the one used in the initial call.
Therefore, it is important to call `Jellyfish.Client.update_address/2`
before subsequent operations like adding peers or components.
"""
@spec create(Client.t(), options()) :: {:ok, t(), String.t()} | {:error, atom() | String.t()}
def create(client, opts \\ []) do
with {:ok, data} <-
Utils.make_post_request!(client, "/room", %{
"roomId" => Keyword.get(opts, :room_id),
"maxPeers" => Keyword.get(opts, :max_peers),
"videoCodec" => Keyword.get(opts, :video_codec),
"webhookUrl" => Keyword.get(opts, :webhook_url),
"peerlessPurgeTimeout" => Keyword.get(opts, :peerless_purge_timeout),
"peerDisconnectedTimeout" => Keyword.get(opts, :peer_disconnected_timeout)
}),
room_json <- Map.fetch!(data, "room"),
jellyfish_address <- Map.fetch!(data, "jellyfish_address"),
result <- from_json(room_json) do
{:ok, result, jellyfish_address}
end
end
@doc """
Deletes the room with `room_id`.
"""
@spec delete(Client.t(), id()) :: :ok | {:error, atom() | String.t()}
def delete(client, room_id) do
case Tesla.delete(client.http_client, "/room/#{room_id}") do
{:ok, %Env{status: 204}} -> :ok
error -> Utils.handle_response_error(error)
end
end
@doc """
Adds a peer to the room with `room_id`.
"""
@spec add_peer(Client.t(), id(), Peer.options() | Peer.type()) ::
{:ok, peer_create_response()}
| {:error, atom() | String.t()}
def add_peer(client, room_id, peer) do
peer = if is_atom(peer), do: struct!(peer), else: peer
with {:ok, data} <-
Utils.make_post_request!(client, "/room/#{room_id}/peer", %{
"type" => Peer.string_from_options(peer),
"options" =>
Map.from_struct(peer)
|> Map.new(fn {k, v} -> {snake_case_to_camel_case(k), v} end)
}),
%{"peer" => peer, "token" => token, "peer_websocket_url" => peer_websocket_url} <- data,
result <- Peer.from_json(peer) do
{:ok, %{peer: result, token: token, ws_url: peer_websocket_url}}
end
end
@doc """
Deletes the peer with `peer_id` from the room with `room_id`.
"""
@spec delete_peer(Client.t(), id(), Peer.id()) :: :ok | {:error, atom() | String.t()}
def delete_peer(client, room_id, peer_id) do
case Tesla.delete(
client.http_client,
"/room/#{room_id}/peer/#{peer_id}"
) do
{:ok, %Env{status: 204}} -> :ok
error -> Utils.handle_response_error(error)
end
end
@doc """
Adds a component to the room with `room_id`.
"""
@spec add_component(Client.t(), id(), Component.options() | Component.type()) ::
{:ok, Component.t()} | {:error, atom() | String.t()}
def add_component(client, room_id, component) do
component = if is_atom(component), do: struct!(component), else: component
with :ok <- validate_component(component),
{:ok, %Env{status: 201, body: body}} <-
Tesla.post(
client.http_client,
"/room/#{room_id}/component",
%{
"type" => Component.string_from_options(component),
"options" =>
component
|> Map.from_struct()
|> map_snake_case_to_camel_case()
}
),
{:ok, data} <- Map.fetch(body, "data"),
result <- Component.from_json(data) do
{:ok, result}
else
:error -> raise StructureError
error -> Utils.handle_response_error(error)
end
end
@doc """
Deletes the component with `component_id` from the room with `room_id`.
"""
@spec delete_component(Client.t(), id(), Component.id()) :: :ok | {:error, atom() | String.t()}
def delete_component(client, room_id, component_id) do
case Tesla.delete(
client.http_client,
"/room/#{room_id}/component/#{component_id}"
) do
{:ok, %Env{status: 204}} -> :ok
error -> Utils.handle_response_error(error)
end
end
@doc """
Adds peers and components tracks to hls or recording component
In order to subscribe the component to peers/components, the component should be initialized with the subscribe_mode set to :manual.
This mode proves beneficial when you do not wish to record or stream all the available streams within a room.
It allows for selective addition instead – you can manually select specific streams.
For instance, you could opt to record only the stream of an event's host.
"""
@spec subscribe(Client.t(), id(), Component.id(), [Peer.id() | Component.id()]) ::
:ok | {:error, atom() | String.t()}
def subscribe(client, room_id, component_id, origins) do
with :ok <- validate_origins(origins),
{:ok, %Env{status: 201}} <-
Tesla.post(
client.http_client,
"/room/#{room_id}/component/#{component_id}/subscribe",
%{
origins: origins
}
) do
:ok
else
error -> Utils.handle_response_error(error)
end
end
@doc """
Starts a phone call from a specified component to a provided phone number.
This is asynchronous operation. In case of providing incorrect phone number you will receive a notification `ComponentCrashed`.
"""
@spec dial(Client.t(), id(), Component.id(), String.t()) ::
:ok | {:error, atom() | String.t()}
def dial(client, room_id, component_id, phone_number) do
with :ok <- validate_phone_number(phone_number),
{:ok, %Env{status: 201}} <-
Tesla.post(client.http_client, "/sip/#{room_id}/#{component_id}/call", %{
phoneNumber: phone_number
}) do
:ok
else
error -> Utils.handle_response_error(error)
end
end
@doc """
End a phone call on a specified SIP component.
This is asynchronous operation.
"""
@spec end_call(Client.t(), id(), Component.id()) ::
:ok | {:error, atom() | String.t()}
def end_call(client, room_id, component_id) do
with {:ok, %Env{status: 204}} <-
Tesla.delete(client.http_client, "/sip/#{room_id}/#{component_id}/call") do
:ok
else
error -> Utils.handle_response_error(error)
end
end
@doc false
@spec from_json(map()) :: t()
def from_json(response) do
case response do
%{
"id" => id,
"config" => %{"maxPeers" => max_peers, "videoCodec" => video_codec},
"components" => components,
"peers" => peers
} ->
%__MODULE__{
id: id,
config: %{max_peers: max_peers, video_codec: codec_to_atom(video_codec)},
components: Enum.map(components, &Component.from_json/1),
peers: Enum.map(peers, &Peer.from_json/1)
}
unknown_structure ->
raise StructureError, unknown_structure
end
end
defp validate_component(%RTSP{}), do: :ok
defp validate_component(%File{}), do: :ok
defp validate_component(%SIP{}), do: :ok
defp validate_component(%Recording{credentials: credentials}) do
case validate_s3_credentials(credentials) do
:ok -> :ok
:error -> {:error, :component_validation}
end
end
defp validate_component(%HLS{s3: s3, subscribe_mode: subscribe_mode}) do
with :ok <- validate_s3_credentials(s3),
:ok <- validate_subscribe_mode(subscribe_mode) do
:ok
else
:error -> {:error, :component_validation}
end
end
defp validate_component(_component), do: {:error, :component_validation}
defp validate_s3_credentials(%{} = credentials) do
keys = Map.keys(credentials)
if @s3_keys -- keys == [] and keys -- @s3_keys == [],
do: :ok,
else: :error
end
defp validate_s3_credentials(nil), do: :ok
defp validate_s3_credentials(_credentials), do: :error
defp validate_subscribe_mode(mode) when mode in @subscribe_modes, do: :ok
defp validate_subscribe_mode(_mode), do: :error
defp validate_origins(origins) when is_list(origins), do: :ok
defp validate_origins(_tracks), do: {:error, :origins_validation}
defp validate_phone_number(phone_number) when is_binary(phone_number), do: :ok
defp validate_phone_number(_phone_number), do: {:error, :incorrect_phone_number_type}
defp map_snake_case_to_camel_case(%{} = map),
do:
Map.new(map, fn {k, v} -> {snake_case_to_camel_case(k), map_snake_case_to_camel_case(v)} end)
defp map_snake_case_to_camel_case(value), do: value
defp snake_case_to_camel_case(atom) do
[first | rest] = Atom.to_string(atom) |> String.split("_")
rest = rest |> Enum.map(&String.capitalize/1)
Enum.join([first | rest])
end
defp codec_to_atom("h264"), do: :h264
defp codec_to_atom("vp8"), do: :vp8
defp codec_to_atom(nil), do: nil
end