lib/ex_libnice.ex

defmodule ExLibnice do
  @moduledoc """
  Module that wraps functions from [libnice](https://libnice.freedesktop.org/libnice/index.html).

  For more information about each function please refer to [libnice] documentation.
  """

  use GenServer
  use Bunch

  require Logger
  require Unifex.CNode

  defmodule State do
    @moduledoc false
    use Bunch.Access

    @type t :: %__MODULE__{
            parent: pid,
            impl: NIF | CNode,
            cnode: Unifex.CNode.t(),
            native_state: reference(),
            stream_components: %{stream_id: integer(), n_components: integer()},
            mdns_queries: %{
              query: String.t(),
              candidates: [{sdp :: String.t(), stream_id :: integer(), component_id :: integer()}]
            }
          }

    @enforce_keys [:parent, :impl]
    defstruct @enforce_keys ++
                [
                  cnode: nil,
                  native_state: nil,
                  stream_components: %{},
                  mdns_queries: %{}
                ]
  end

  @typedoc """
  Fully qualified domain name e.g. "my.domain.com".
  """
  @type fqdn() :: String.t()

  @type stun_server() :: %{server_addr: :inet.ip_address() | fqdn(), server_port: 0..65_535}

  @typedoc """
  Type describing TURN server configuration
  """
  @type relay_info :: %{
          server_addr: :inet.ip_address() | fqdn(),
          server_port: 0..65_535,
          username: String.t(),
          password: String.t(),
          relay_type: :udp | :tcp | :tls
        }

  @typedoc """
  Type describing ExLibnice configuration.

  It's a keyword list containing the following keys:
  * impl - implementation to use. Possible values are NIF and CNode.
  You can also choose `impl` via config.exs by
  ```elixir
  config :ex_libnice, impl: :NIF
  ```
  * parent - pid of calling process
  * stun_servers - list of stun servers in form of ip:port
  * controlling_mode - refer to RFC 8445 section 4 - Controlling and Controlled Agent
  * min_port..max_port - the port range to use. Pass 0..0 if you not willing to set it.
  Passed port range will be set for each newly added stream. At this moment it is not possible to
  set port range per stream.
  """
  @type opts_t :: [
          impl: NIF | CNode,
          parent: pid(),
          stun_servers: [stun_server()],
          controlling_mode: boolean(),
          port_range: 0..65_535
        ]

  @doc """
  Spawns new process responsible for interacting with `libnice`.
  """
  @spec start_link(opts :: opts_t) :: {:ok, pid()}
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc """
  Adds a new stream with `n_components` components. `name` is optional but not passing it will
  influence on SDP string format, see `generate_local_sdp/1`.
  """
  @spec add_stream(pid :: pid(), n_components :: integer(), name :: String.t()) ::
          {:ok, integer()}
          | {:error,
             :failed_to_add_stream | :invalid_stream_or_duplicate_name | :failed_to_attach_recv}
  def add_stream(pid, n_components, name \\ "") do
    GenServer.call(pid, {:add_stream, n_components, name})
  end

  @doc """
  Removes stream with id `stream_id`.
  """
  @spec remove_stream(pid :: pid(), stream_id :: integer()) :: :ok
  def remove_stream(pid, stream_id) do
    GenServer.cast(pid, {:remove_stream, stream_id})
  end

  @doc """
  Sets TURN server. Can be called multiple times for the same component to add many TURN
  servers.
  """
  @spec set_relay_info(
          pid :: pid(),
          stream_id :: integer(),
          component_id :: integer() | [integer()] | :all,
          relay_info :: relay_info() | [relay_info()]
        ) ::
          :ok
          | {:error,
             :bad_stream_id | :bad_relay_type | :failed_to_resolve_addr | :failed_to_set_turn}
  def set_relay_info(pid, stream_id, component_id, relay_info) do
    GenServer.call(pid, {:set_relay_info, stream_id, component_id, relay_info})
  end

  @doc """
  Forget all TURN servers for given component.
  """
  @spec forget_relays(pid :: pid(), stream_id :: integer(), component_id :: integer()) ::
          :ok | {:error, :component_not_found}
  def forget_relays(pid, stream_id, component_id) do
    GenServer.call(pid, {:forget_relays, stream_id, component_id})
  end

  @doc """
  Generates a SDP string containing the local candidates and credentials for all streams and
  components.

  Notice that:
  - local candidates will be present in the result SDP only if `gather_candidates/1` has been
  invoked previously
  - if a stream is created without a name the 'm' line will contain '-' mark but SDP in this form
  will not be parsable by `parse_remote_sdp/2`
  """
  @spec generate_local_sdp(pid :: pid()) :: {:ok, sdp :: String.t()}
  def generate_local_sdp(pid) do
    GenServer.call(pid, :generate_local_sdp)
  end

  @doc """
  Parses a remote SDP string setting credentials and remote candidates for proper streams and
  components.

  It is important that `m` line does not contain `-` mark but the name of the stream.
  """
  @spec parse_remote_sdp(pid :: pid(), sdp :: String.t()) ::
          {:ok, added_cand_num :: integer()} | {:error, :failed_to_parse_sdp}
  def parse_remote_sdp(pid, sdp) do
    GenServer.call(pid, {:parse_remote_sdp, sdp})
  end

  @doc """
  Returns local credentials for stream with id `stream_id`.
  """
  @spec get_local_credentials(pid :: pid(), stream_id :: integer()) ::
          {:ok, credentials :: String.t()} | {:error, :failed_to_get_credentials}
  def get_local_credentials(pid, stream_id) do
    GenServer.call(pid, {:get_local_credentials, stream_id})
  end

  @doc """
  Sets remote credentials for stream with `stream_id`.

  Credentials have to be passed in form of `ufrag pwd`.
  """
  @spec set_remote_credentials(pid :: pid(), credentials :: String.t(), stream_id :: integer()) ::
          :ok | {:error, :failed_to_set_credentials}
  def set_remote_credentials(pid, credentials, stream_id) do
    GenServer.call(pid, {:set_remote_credentials, credentials, stream_id})
  end

  @doc """
  Starts gathering candidates process for stream with id `stream_id`.

  May cause the parent process receive following messages:
  - `{:new_candidate_full, candidate}` - new local candidate
  - `{:candidate_gathering_done, stream_id}` - gathering candidates for stream with `stream_id`
  has been done
  """
  @spec gather_candidates(pid :: pid(), stream_id :: integer()) ::
          :ok | {:error, :invalid_stream_or_allocation}
  def gather_candidates(pid, stream_id) do
    GenServer.call(pid, {:gather_candidates, stream_id})
  end

  @doc """
  Indicates that all remote candidates for stream with id `stream_id` have been passed.

  After receiving this message components can change their state to `FAILED` if all their
  connectivity checks have failed. Not sending this message will cause components stay in
  `CONNECTING` state. (In fact there is a bug and components can change their state to `FAILED`
  even without sending this message. Please refer to
  [#120](https://gitlab.freedesktop.org/libnice/libnice/-/issues/120.)
  """
  @spec peer_candidate_gathering_done(pid :: pid(), stream_id :: integer()) ::
          :ok | {:error, :stream_not_found}
  def peer_candidate_gathering_done(pid, stream_id) do
    GenServer.call(pid, {:peer_candidate_gathering_done, stream_id})
  end

  @doc """
  Sets remote candidate for component with id `component_id` in stream with id `stream_id`.

  Candidate has to be passed as SDP string.
  May cause the parent process receive following messages:
  - `{:new_remote_candidate_full, candidate}` - new remote (prflx) candidate
  - `{:new_selected_pair, stream_id, component_id, lfoundation, rfoundation}` - new selected pair
  - `{:component_state_failed, stream_id, component_id}` - component with id `component_id` in
  stream with id `stream_id` has changed state to FAILED
  - `{:component_state_ready, stream_id, component_id}` - component with id `component_id` in stream
  with id `stream_id` has changed state to READY i.e. it is ready to receive and send data
  """
  @spec set_remote_candidate(
          pid :: pid(),
          candidate :: String.t(),
          stream_id :: integer(),
          component_id :: integer()
        ) :: :ok | {:error, :failed_to_parse_sdp_string | :failed_to_set}
  def set_remote_candidate(pid, candidate, stream_id, component_id) do
    GenServer.call(pid, {:set_remote_candidate, candidate, stream_id, component_id})
  end

  @doc """
  Restarts all streams.
  """
  @spec restart(pid :: pid()) :: :ok | {:error, :failed_to_restart}
  def restart(pid) do
    GenServer.call(pid, :restart)
  end

  @doc """
  Restarts stream with id `stream_id`.
  """
  @spec restart_stream(pid :: pid(), stream_id :: integer()) :: :ok | {:error, :failed_to_restart}
  def restart_stream(pid, stream_id) do
    GenServer.call(pid, {:restart_stream, stream_id})
  end

  @doc """
  Sends payload on component with id `component_id` in stream with id `stream_id`. Payload has to
  be in a binary format.
  """
  @spec send_payload(
          pid :: pid(),
          stream_id :: integer(),
          component_id :: integer(),
          payload :: binary()
        ) :: :ok | {:error, :failed_to_send}
  def send_payload(pid, stream_id, component_id, payload) do
    GenServer.call(pid, {:send_payload, stream_id, component_id, payload})
  end

  @doc """
  Stops and cleans up `ExLibnice` instance.
  """
  @spec stop(pid :: pid(), reason :: term(), timeout :: timeout()) :: :ok
  def stop(pid, reason \\ :normal, timeout \\ :infinity) do
    GenServer.stop(pid, reason, timeout)
  end

  # Server API
  @impl true
  def init(opts) do
    min_port..max_port = opts[:port_range]

    {:ok, stun_servers} = lookup_stun_servers(opts[:stun_servers])

    impl = Application.get_env(:ex_libnice, :impl) || opts[:impl] || CNode
    state = %State{parent: opts[:parent], impl: impl}

    {:ok, state} =
      call(impl, :init, [stun_servers, opts[:controlling_mode], min_port, max_port], state)

    {:ok, state}
  end

  @impl true
  def handle_call({:add_stream, n_components}, from, state) do
    handle_call({:add_stream, n_components, ""}, from, state)
  end

  @impl true
  def handle_call({:add_stream, n_components, name}, _from, state) do
    case call(state.impl, :add_stream, [n_components, name], state) do
      {{:ok, stream_id}, state} ->
        Logger.debug("New stream_id: #{stream_id}")
        {:reply, {:ok, stream_id}, put_in(state.stream_components[stream_id], n_components)}

      {{:error, cause}, state} ->
        Logger.warn("""
        Couldn't add stream with #{n_components} components and name "#{inspect(name)}": \
        #{inspect(cause)}
        """)

        {:reply, {:error, cause}, state}
    end
  end

  @impl true
  def handle_call({:set_relay_info, stream_id, component_id, relay_info}, _from, state) do
    {ret, state} =
      Bunch.listify(relay_info)
      |> Bunch.Enum.try_reduce(state, fn single_relay_info, state ->
        do_set_relay_info(state, stream_id, component_id, single_relay_info)
      end)

    {:reply, ret, state}
  end

  @impl true
  def handle_call({:forget_relays, stream_id, component_id}, _from, state) do
    case call(state.impl, :forget_relays, [stream_id, component_id], state) do
      {:ok, state} ->
        {:reply, :ok, state}

      {{:error, cause}, state} ->
        Logger.warn("""
        Couldn't forget TURN servers for component: #{inspect(component_id)} in stream: \
        #{inspect(stream_id)}, reason: #{inspect(cause)}
        """)

        {:reply, {:error, cause}, state}
    end
  end

  @impl true
  def handle_call(:generate_local_sdp, _from, state) do
    {{:ok, local_sdp}, state} = call(state.impl, :generate_local_sdp, [], state)
    Logger.debug("local sdp: #{inspect(local_sdp)}")
    {:reply, {:ok, local_sdp}, state}
  end

  @impl true
  def handle_call({:parse_remote_sdp, remote_sdp}, _from, state) do
    case call(state.impl, :parse_remote_sdp, [remote_sdp], state) do
      {{:ok, added_cand_num}, state} ->
        Logger.debug("parse_remote_sdp: ok; added #{added_cand_num} candidates")
        {:reply, {:ok, added_cand_num}, state}

      {{:error, cause}, state} ->
        Logger.warn("Couldn't parse remote sdp #{inspect(remote_sdp)}")
        {:reply, {:error, cause}, state}
    end
  end

  @impl true
  def handle_call({:get_local_credentials, stream_id}, _from, state) do
    case call(state.impl, :get_local_credentials, [stream_id], state) do
      {{:ok, credentials}, state} ->
        Logger.debug("local credentials: #{credentials}")
        {:reply, {:ok, credentials}, state}

      {{:error, cause}, state} ->
        Logger.error("get_local_credentials: #{inspect(cause)}")
        {{:error, cause}, state}
    end
  end

  @impl true
  def handle_call(
        {:set_remote_credentials, credentials, stream_id},
        _from,
        state
      ) do
    case call(state.impl, :set_remote_credentials, [credentials, stream_id], state) do
      {:ok, state} ->
        Logger.debug("set_remote_credentials: ok")
        {:reply, :ok, state}

      {{:error, cause}, state} ->
        Logger.error("set_remote_credentials: #{inspect(cause)}")
        {:reply, {:error, cause}, state}
    end
  end

  @impl true
  def handle_call({:gather_candidates, stream_id} = msg, _from, state) do
    case call(state.impl, :gather_candidates, [stream_id], state) do
      {:ok, state} ->
        Logger.debug("#{inspect(msg)}")
        {:reply, :ok, state}

      {{:error, cause}, state} ->
        Logger.error("gather_candidates: #{inspect(msg)}")
        {:reply, {:error, cause}, state}
    end
  end

  @impl true
  def handle_call(
        {:peer_candidate_gathering_done, stream_id},
        _ctx,
        state
      ) do
    case call(state.impl, :peer_candidate_gathering_done, [stream_id], state) do
      {:ok, state} ->
        Logger.debug("peer_candidate_gathering_done: ok")
        {:reply, :ok, state}

      {{:error, cause}, state} ->
        Logger.warn("peer_candidate_gathering_done: #{inspect(cause)}")
        {:reply, {:error, cause}, state}
    end
  end

  @impl true
  def handle_call(
        {:set_remote_candidate, "a=", _stream_id, _component_id},
        _from,
        state
      ) do
    Logger.debug("Empty candidate \"a=\". Should we do something with it?")
    {:reply, :ok, state}
  end

  @impl true
  def handle_call({:set_remote_candidate, candidate, stream_id, component_id}, _from, state) do
    candidate_sp = String.split(candidate, " ", parts: 6)

    withl candidate_check: 6 <- length(candidate_sp),
          do: address = Enum.at(candidate_sp, 4),
          mdns_check: true <- String.ends_with?(address, ".local") do
      if Application.get_env(:ex_libnice, :mdns, true) do
        state =
          update_in(state, [:mdns_queries, address], fn
            # if we haven't query this address yet
            nil ->
              ExLibnice.Mdns.query(address)
              [{candidate, stream_id, component_id}]

            candidates ->
              [{candidate, stream_id, component_id} | candidates]
          end)

        {:reply, :ok, state}
      else
        Logger.debug("Got mdns address but mdns client is turned off. Ignoring.")
        {:reply, :ok, state}
      end
    else
      candidate_check: _ -> {:reply, {:error, :failed_to_parse_sdp_string}, state}
      mdns_check: _ -> do_set_remote_candidate(candidate, stream_id, component_id, state)
    end
  end

  @impl true
  def handle_call(:restart, _from, state) do
    case call(state.impl, :restart, [], state) do
      {:ok, state} ->
        Logger.debug("ICE restarted")
        {:reply, :ok, state}

      {{:error, cause}, state} ->
        Logger.warn("Couldn't restart ICE")
        {:reply, {:error, cause}, state}
    end
  end

  @impl true
  def handle_call({:restart_stream, stream_id}, _from, state) do
    case call(state.impl, :restart_stream, [stream_id], state) do
      {:ok, state} ->
        Logger.debug("Stream #{inspect(stream_id)} restarted")
        {:reply, :ok, state}

      {{:error, cause}, state} ->
        Logger.warn("Couldn't restart stream #{inspect(stream_id)}")
        {:reply, {:error, cause}, state}
    end
  end

  @impl true
  def handle_call({:send_payload, stream_id, component_id, payload}, _from, state) do
    case call(state.impl, :send_payload, [stream_id, component_id, payload], state) do
      {:ok, state} ->
        {:reply, :ok, state}

      {{:error, cause}, state} ->
        Logger.warn("Couldn't send payload: #{inspect(cause)}")
        {:reply, {:error, cause}, state}
    end
  end

  @impl true
  def handle_cast({:remove_stream, stream_id}, state) do
    {:ok, state} = call(state.impl, :remove_stream, [stream_id], state)
    Logger.debug("remove_stream #{stream_id}: ok")
    {_n_components, state} = pop_in(state.stream_components[stream_id])
    {:noreply, state}
  end

  @impl true
  def handle_info({:new_candidate_full, _cand} = msg, %State{parent: parent} = state) do
    Logger.debug("#{inspect(msg)}")
    send(parent, msg)
    {:noreply, state}
  end

  @impl true
  def handle_info({:new_remote_candidate_full, _cand} = msg, %State{parent: parent} = state) do
    Logger.debug("#{inspect(msg)}")
    send(parent, msg)
    {:noreply, state}
  end

  @impl true
  def handle_info({:candidate_gathering_done, _stream_id} = msg, %State{parent: parent} = state) do
    Logger.debug("#{inspect(msg)}")
    send(parent, msg)
    {:noreply, state}
  end

  @impl true
  def handle_info(
        {:new_selected_pair, _stream_id, _component_id, _lfoundation, _rfoundation} = msg,
        %State{parent: parent} = state
      ) do
    Logger.debug("#{inspect(msg)}")
    send(parent, msg)
    {:noreply, state}
  end

  @impl true
  def handle_info(
        {:component_state_ready, _stream_id, _component_id} = msg,
        %State{parent: parent} = state
      ) do
    Logger.debug("#{inspect(msg)}")
    send(parent, msg)
    {:noreply, state}
  end

  @impl true
  def handle_info(
        {:component_state_failed, _stream_id, _component_id} = msg,
        %State{parent: parent} = state
      ) do
    Logger.warn("#{inspect(msg)}")
    send(parent, msg)
    {:noreply, state}
  end

  @impl true
  def handle_info(
        {:ice_payload, _stream_id, _component_id, _payload} = msg,
        %State{parent: parent} = state
      ) do
    send(parent, msg)
    {:noreply, state}
  end

  @impl true
  def handle_info({:mdns_response, address, ip}, state) do
    {candidates, state} = pop_in(state, [:mdns_queries, address])

    for {candidate, stream_id, component_id} <- candidates do
      candidate_parts =
        String.split(candidate, " ", parts: 6)
        |> List.replace_at(4, :inet.ntoa(ip))

      candidate = Enum.join(candidate_parts, " ")
      do_set_remote_candidate(candidate, stream_id, component_id, state)
    end

    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    Logger.warn("Unknown message #{inspect(msg)}")
    {:noreply, state}
  end

  @impl true
  def terminate(_reason, %State{native_state: nil, cnode: cnode}) do
    Logger.debug("Terminating ExLibnice instance #{inspect(self())}")
    Unifex.CNode.stop(cnode)
  end

  @impl true
  def terminate(_reason, _state) do
    Logger.debug("Terminating ExLibnice instance #{inspect(self())}")
    :ok
  end

  defp do_set_relay_info(state, stream_id, n_components, relay_info) when is_list(n_components),
    do:
      Bunch.Enum.try_reduce(n_components, state, fn n_component, state ->
        do_set_relay_info(state, stream_id, n_component, relay_info)
      end)

  defp do_set_relay_info(state, stream_id, :all, relay_info) do
    case Map.get(state.stream_components, stream_id) do
      nil ->
        Logger.warn("Couldn't set TURN servers. No stream with id #{inspect(stream_id)}")

        {{:error, :bad_stream_id}, state}

      n_components ->
        Bunch.Enum.try_reduce(1..n_components, state, fn n_component, state ->
          do_set_relay_info(state, stream_id, n_component, relay_info)
        end)
    end
  end

  defp do_set_relay_info(
         state,
         stream_id,
         component_id,
         %{server_addr: server_addr, server_port: server_port, relay_type: relay_type}
       )
       when relay_type not in [:udp, :tcp, :tls] do
    Logger.warn("""
    Couldn't set TURN server #{inspect(server_addr)} #{inspect(server_port)} \
    #{inspect(relay_type)} for component: #{inspect(component_id)} in stream: \
    #{inspect(stream_id)}, cause: bad_relay_type
    """)

    {{:error, :bad_relay_type}, state}
  end

  defp do_set_relay_info(state, stream_id, component_id, %{
         server_addr: server_addr,
         server_port: server_port,
         username: username,
         password: password,
         relay_type: relay_type
       }) do
    with {:ok, server_ip} <- lookup_addr(server_addr),
         {:ok, state} <-
           call(
             state.impl,
             :set_relay_info,
             [
               stream_id,
               component_id,
               :inet.ntoa(server_ip) |> to_string(),
               server_port,
               username,
               password,
               Atom.to_string(relay_type)
             ],
             state
           ) do
      {:ok, state}
    else
      {:error, :failed_to_lookup_addr} = err ->
        Logger.warn("""
        Couldn't lookup TURN server address #{inspect(server_addr)}
        """)

        {err, state}

      {{:error, cause}, _state} = ret ->
        Logger.warn("""
        Couldn't set TURN server #{inspect(server_addr)} #{inspect(server_port)} \
        #{inspect(relay_type)} for component: #{inspect(component_id)} in stream: \
        #{inspect(stream_id)}, cause: #{inspect(cause)}
        """)

        ret
    end
  end

  defp do_set_remote_candidate(candidate, stream_id, component_id, state) do
    case call(state.impl, :set_remote_candidate, [candidate, stream_id, component_id], state) do
      {:ok, state} ->
        Logger.debug("Set remote candidate: #{inspect(candidate)}")
        {:reply, :ok, state}

      {{:error, cause}, state} ->
        Logger.warn("Couldn't set remote candidate: #{inspect(cause)}")
        {:reply, {:error, cause}, state}
    end
  end

  defp call(NIF, :init = func, args, state) do
    {ret, native_state} = apply(ExLibnice.Native, func, args)
    {ret, %{state | native_state: native_state}}
  end

  defp call(NIF, func, args, state) do
    {ret, native_state} = apply(ExLibnice.Native, func, [state.native_state | args])
    {ret, %{state | native_state: native_state}}
  end

  defp call(CNode, func, args, %{cnode: nil} = state) do
    {:ok, cnode} = Unifex.CNode.start_link(:native)
    call(CNode, func, args, %{state | cnode: cnode})
  end

  defp call(CNode, func, args, state) do
    ret = apply(Unifex.CNode, :call, [state.cnode, func, args])
    {ret, state}
  end

  defp lookup_stun_servers(stun_servers) do
    Bunch.Enum.try_map(stun_servers, fn %{server_addr: addr, server_port: port} ->
      case lookup_addr(addr) do
        {:ok, ip} -> {:ok, "#{:inet.ntoa(ip)}:#{port}"}
        {:error, cause} -> {:error, cause, addr}
      end
    end)
  end

  defp lookup_addr({_a, _b, _c, _d} = addr), do: {:ok, addr}
  defp lookup_addr({_a, _b, _c, _d, _e, _f, _g, _h} = addr), do: {:ok, addr}

  defp lookup_addr(addr) when is_binary(addr) do
    case :inet_res.lookup(to_charlist(addr), :in, :a) do
      [] -> {:error, :failed_to_lookup_address}
      [h | _t] -> {:ok, h}
    end
  end
end