lib/membrane_rtsp/rtsp.ex

defmodule Membrane.RTSP do
  @moduledoc "RTSP Session"
  use GenServer

  import Membrane.RTSP.Logic

  alias Membrane.RTSP
  alias Membrane.RTSP.Logic.State
  alias Membrane.RTSP.{Request, Response}

  # A session is referred to by the pid of its GenServer process.
  @type t() :: pid()

  # Port used by do_start/4 when the parsed URL does not carry one.
  @default_rtsp_port 554

  @doc """
  Starts a session process linked to the caller.

  Arguments:
    * `url` - a base URL for requests. It has to parse to an `rtsp` URI with a
    host; when it has no port, the default RTSP port (554) is used
    * `transport` - a module used for executing requests over the network. For
    reference see `Membrane.RTSP.Transport`
    * `options` - a keyword list passed to the transport's `init/2` and kept
    as the execution options of the session

  Returns `{:error, :invalid_url}` without starting a process when `url` is
  not a valid `rtsp` URL.
  """
  @spec start_link(binary(), module(), Keyword.t()) :: GenServer.on_start()
  def start_link(url, transport \\ Membrane.RTSP.Transport.TCPSocket, options \\ []) do
    do_start(url, transport, options, &GenServer.start_link/2)
  end

  @doc """
  Starts a session process without linking it to the caller.

  Takes the same arguments as `start_link/3`: `url` is the base URL for
  requests, `transport` is the module used for executing requests over the
  network and `options` is a keyword list passed to the transport's `init/2`.

  Returns `{:error, :invalid_url}` without starting a process when `url` is
  not a valid `rtsp` URL.
  """
  @spec start(binary(), module(), Keyword.t()) :: GenServer.on_start()
  def start(url, transport \\ Membrane.RTSP.Transport.TCPSocket, options \\ []) do
    do_start(url, transport, options, &GenServer.start/2)
  end

  # Validates `url` and, when it is an `rtsp` URL with a host, starts the
  # session through `start_fun` (a 2-arity GenServer start function).
  # Any other URL yields `{:error, :invalid_url}` and no process is started.
  defp do_start(url, transport, options, start_fun) do
    case URI.parse(url) do
      %URI{scheme: "rtsp", host: host, port: port} = parsed when is_binary(host) ->
        # Fall back to the default port when the URL does not specify one.
        init_arg = %{
          transport: transport,
          url: %{parsed | port: port || @default_rtsp_port},
          options: options
        }

        start_fun.(__MODULE__, init_arg)

      _invalid ->
        {:error, :invalid_url}
    end
  end

  @impl true
  def init(%{url: uri, options: exec_options, transport: transport_mod}) do
    # Credentials in the URL mean authentication will be needed. Basic is
    # assumed first; if it is actually digest, it will get set when the
    # correct header is detected.
    initial_auth =
      case uri do
        %URI{userinfo: nil} -> nil
        %URI{userinfo: credentials} when is_binary(credentials) -> :basic
      end

    # The transport is initialized here; a failure stops the process with the
    # reason reported by the transport.
    with {:ok, transport} <- transport_mod.init(uri, exec_options) do
      {:ok,
       %State{
         auth: initial_auth,
         execution_options: exec_options,
         transport: transport,
         transport_module: transport_mod,
         uri: uri
       }}
    else
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  # Synchronous request: execute it, then parse the raw response.
  def handle_call({:execute, request}, _from, state) do
    with {:ok, raw} <- execute(request, state),
         {:ok, parsed, updated_state} <- parse_response(raw, state) do
      {:reply, {:ok, parsed}, updated_state}
    else
      # On failure the state from before the call is kept.
      {:error, reason} -> {:reply, {:error, reason}, state}
    end
  end

  # Hands the current transport value back to the caller.
  def handle_call(:get_transport, _from, %State{} = state) do
    {:reply, state.transport, state}
  end

  # Parses a raw response supplied by the caller instead of one read while
  # executing a request.
  def handle_call({:parse_response, raw}, _from, state) do
    with {:ok, parsed, updated_state} <- parse_response(raw, state) do
      {:reply, {:ok, parsed}, updated_state}
    else
      {:error, reason} -> {:reply, {:error, reason}, state}
    end
  end

  @impl true
  # Stops the session with reason `:normal`; this is what `close/1` casts.
  def handle_cast(:terminate, %State{} = state) do
    {:stop, :normal, state}
  end

  # Executes a request sent with `request_no_response/5`. The `false` flag is
  # presumably what tells `execute/3` not to read a response (TODO confirm in
  # Membrane.RTSP.Logic); on success the sequence number is advanced here.
  def handle_cast({:execute, request}, %State{cseq: cseq} = state) do
    case execute(request, state, false) do
      :ok ->
        {:noreply, %State{state | cseq: cseq + 1}}

      {:error, reason} ->
        # `reason` may be any term. Interpolating it directly would raise
        # Protocol.UndefinedError for e.g. a tuple and hide the real failure,
        # so it is rendered with inspect/1.
        raise "Error: #{inspect(reason)}"
    end
  end

  @impl true
  # Any other message might be meant for the transport layer, so it is
  # forwarded there and the transport's result is mapped back onto the
  # session state.
  def handle_info(msg, %State{transport_module: transport_module, transport: transport} = state) do
    transport_result = transport_module.handle_info(msg, transport)
    translate(transport_result, :transport, state)
  end

  @impl true
  # Closes the transport when the session process terminates.
  def terminate(_reason, state) do
    transport_module = state.transport_module
    transport_module.close(state.transport)
  end

  @doc """
  Executes a request with the given `method` on the session and returns its
  result.

  The call is made with an `:infinity` timeout, so it blocks until the
  session replies.
  """
  @spec request(pid(), binary(), RTSP.headers(), binary(), nil | binary()) :: Response.result()
  def request(session, method, headers \\ [], body \\ "", path \\ nil) do
    GenServer.call(
      session,
      {:execute, %Request{method: method, path: path, headers: headers, body: body}},
      :infinity
    )
  end

  @doc """
  Asks the session to execute a request with the given `method` and returns
  immediately.

  The request is delivered with a cast, so the caller gets `:ok` back without
  learning whether the request succeeded.
  """
  @spec request_no_response(pid(), binary(), RTSP.headers(), binary(), nil | binary()) :: :ok
  def request_no_response(session, method, headers \\ [], body \\ "", path \\ nil) do
    GenServer.cast(
      session,
      {:execute, %Request{method: method, path: path, headers: headers, body: body}}
    )
  end

  @doc """
  Asks the session process to stop.

  The request is sent as a cast, so `:ok` is returned right away.
  """
  @spec close(pid()) :: :ok
  def close(session) do
    GenServer.cast(session, :terminate)
  end

  # Maps a callback result produced for a part of the session state (stored
  # under `key`) back onto the whole session state. Handles both the
  # `{action, new_state}` and `{action, reply, new_state}` shapes.
  #
  # The update syntax is used instead of Map.put/3 so that a wrong `key`
  # raises instead of silently adding a field to the state struct.
  defp translate({action, new_state}, key, state) do
    {action, %{state | key => new_state}}
  end

  defp translate({action, reply, new_state}, key, state) do
    {action, reply, %{state | key => new_state}}
  end

  # Request headers: a list of two-element tuples of binaries.
  @type headers :: [{binary(), binary()}]

  @doc """
  Returns the transport value currently held by the session.
  """
  @spec get_transport(t()) :: any()
  def get_transport(session), do: GenServer.call(session, :get_transport)

  @doc """
  Sends a `GET_PARAMETER` request through `request_no_response/5`, i.e.
  without waiting for the response.
  """
  @spec get_parameter_no_response(t(), headers(), binary()) :: :ok
  def get_parameter_no_response(session, headers \\ [], body \\ "") do
    request_no_response(session, "GET_PARAMETER", headers, body)
  end

  @doc """
  Sends a `PLAY` request with an empty body through `request_no_response/5`,
  i.e. without waiting for the response.
  """
  @spec play_no_response(t(), headers()) :: :ok
  def play_no_response(session, headers \\ []) do
    request_no_response(session, "PLAY", headers, "")
  end

  @doc """
  Asks the session to parse `raw_response` and returns the parsing result.
  """
  @spec handle_response(t(), binary()) :: Response.result()
  def handle_response(session, raw_response) do
    GenServer.call(session, {:parse_response, raw_response})
  end

  @doc """
  Sends a `DESCRIBE` request with an empty body and returns the result of
  `request/5`.
  """
  @spec describe(t(), headers()) :: Response.result()
  def describe(session, headers \\ []) do
    request(session, "DESCRIBE", headers, "")
  end

  @doc """
  Sends an `ANNOUNCE` request with the given headers and body and returns the
  result of `request/5`.
  """
  @spec announce(t(), headers(), binary()) :: Response.result()
  def announce(session, headers \\ [], body \\ "") do
    request(session, "ANNOUNCE", headers, body)
  end

  @doc """
  Sends a `GET_PARAMETER` request with the given headers and body and returns
  the result of `request/5`.
  """
  @spec get_parameter(t(), headers(), binary()) :: Response.result()
  def get_parameter(session, headers \\ [], body \\ "") do
    request(session, "GET_PARAMETER", headers, body)
  end

  @doc """
  Sends an `OPTIONS` request with the given headers and returns the result of
  `request/5`.
  """
  @spec options(t(), headers()) :: Response.result()
  def options(session, headers \\ []) do
    request(session, "OPTIONS", headers)
  end

  @doc """
  Sends a `PAUSE` request with the given headers and returns the result of
  `request/5`.
  """
  @spec pause(t(), headers()) :: Response.result()
  def pause(session, headers \\ []) do
    request(session, "PAUSE", headers)
  end

  @doc """
  Sends a `PLAY` request with an empty body and returns the result of
  `request/5`.
  """
  @spec play(t(), headers()) :: Response.result()
  def play(session, headers \\ []) do
    request(session, "PLAY", headers, "")
  end

  @doc """
  Sends a `RECORD` request with the given headers and returns the result of
  `request/5`.
  """
  @spec record(t(), headers()) :: Response.result()
  def record(session, headers \\ []) do
    request(session, "RECORD", headers)
  end

  @doc """
  Sends a `SETUP` request for `path` with an empty body and returns the result
  of `request/5`.
  """
  @spec setup(t(), binary(), headers()) :: Response.result()
  def setup(session, path, headers \\ []) do
    request(session, "SETUP", headers, "", path)
  end

  @doc """
  Sends a `SET_PARAMETER` request with the given headers and body and returns
  the result of `request/5`.
  """
  @spec set_parameter(t(), headers(), binary()) :: Response.result()
  def set_parameter(session, headers \\ [], body \\ "") do
    request(session, "SET_PARAMETER", headers, body)
  end

  @doc """
  Sends a `TEARDOWN` request with the given headers and returns the result of
  `request/5`.
  """
  @spec teardown(t(), headers()) :: Response.result()
  def teardown(session, headers \\ []) do
    request(session, "TEARDOWN", headers)
  end
end