lib/membrane_rtsp/transport/tcp_socket.ex

defmodule Membrane.RTSP.Transport.TCPSocket do
  @moduledoc """
  This module implements the Transport behaviour and transmits requests over TCP
  Socket keeping connection until either session is closed or connection is
  closed by server.

  Supported options:
    * timeout - time after request will be deemed missing and error shall be
     returned.
  """
  use Membrane.RTSP.Transport
  import Mockery.Macro

  @connection_timeout 1000
  @tcp_receive_timeout 100

  @impl true
  def init(%URI{} = connection_info, options \\ []) do
    connection_timeout = options[:connection_timeout] || @connection_timeout

    with {:ok, socket} <- open(connection_info, connection_timeout) do
      {:ok, socket}
    else
      {:error, reason} -> {:error, reason}
    end
  end

  defp open(%URI{host: host, port: port}, connection_timeout) do
    mockable(:gen_tcp).connect(
      to_charlist(host),
      port,
      [:binary, active: true],
      connection_timeout
    )
  end

  @impl true
  def execute(request, socket, _options \\ []) do
    with :ok <- mockable(:gen_tcp).send(socket, request),
         {:ok, data} <- recv() do
      {:ok, data}
    else
      {:error, _reason} = error -> error
    end
  end

  @impl true
  def handle_info({:tcp_closed, _socket}, state) do
    {:stop, :socket_closed, state}
  end

  @impl true
  def close(_state), do: :ok

  defp recv(acc \\ <<>>) do
    receive do
      {:tcp, _socket, data} ->
        recv(acc <> data)

      {:tcp_closed, _socket} ->
        {:error, :connection_closed}
    after
      @tcp_receive_timeout ->
        {:ok, acc}
    end
  end
end