lib/riverside/test/test_client.ex

defmodule Riverside.Test.TestClient do
  require Logger

  use GenServer

  defstruct sock: nil,
            codec: nil

  def stop(pid) do
    GenServer.call(pid, :stop)
  end

  def send_message(pid, msg) do
    GenServer.cast(pid, {:send, msg})
  end

  def test_message(%{sender: pid, message: message, receivers: receivers}, timeout \\ 1_000) do
    receivers
    |> Enum.each(fn %{receiver: receiver, tests: tests} ->
      wait_to_test(receiver, tests, timeout)
    end)

    send_message(pid, message)
  end

  def wait_to_test(pid, functions, timeout) do
    GenServer.call(pid, {:wait_to_receive, functions, timeout}, timeout + 1_000)
  end

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  def new(sock, codec) do
    %__MODULE__{sock: sock, codec: codec}
  end

  def connect(host, port, path, headers) do
    Socket.Web.connect(host, port, path: path, headers: headers)
  end

  @impl GenServer
  def init(opts) do
    host = Keyword.get(opts, :host, "localhost")
    port = Keyword.get(opts, :port, 8000)
    path = Keyword.get(opts, :path, "/")
    headers = Keyword.get(opts, :headers, [])
    codec = Keyword.get(opts, :codec, Riverside.Codec.JSON)

    case connect(host, port, path, headers) do
      {:ok, sock} ->
        start_receiver(sock)
        {:ok, new(sock, codec)}

      {:error, _reason} ->
        {:stop, :normal}
    end
  end

  @spec start_receiver(%Socket.Web{}) :: pid
  defp start_receiver(sock) do
    spawn_link(fn ->
      receiver_loop(self(), sock)
    end)
  end

  @spec receiver_loop(pid, %Socket.Web{}) :: no_return
  defp receiver_loop(parent, sock) do
    case receive_message(sock) do
      {:ok, type, data} ->
        send(parent, {:data, type, data})
        receiver_loop(parent, sock)

      {:error, :unsupported_frame} ->
        receiver_loop(parent, sock)
    end
  end

  defp receive_message(sock) do
    case Socket.Web.recv!(sock) do
      {type, data} when type in [:text, :binary] ->
        {:ok, type, data}

      _other ->
        {:error, :unsupported_frame}
    end
  end

  defp decode_message(codec, type, packet) do
    if codec.frame_type === type do
      case codec.decode(packet) do
        {:ok, value} ->
          {:ok, value}

        {:error, reason} ->
          Logger.warn("<Riverside.TestClient> failed to decode received message: #{reason}")
          {:error, :bad_format}
      end
    else
      {:error, :unsupported_frame}
    end
  end

  defp wait_to_receive(timer, [], state) do
    :erlang.cancel_timer(timer)
    {:reply, :ok, state}
  end

  defp wait_to_receive(timer, [test | rest_tests], state) do
    receive do
      {:received, msg} ->
        case test.(msg) do
          :ok ->
            wait_to_receive(timer, rest_tests, state)

          :error ->
            {:reply, {:error, :failed}, state}
        end

      {:timeout, ^timer, :timeout} ->
        {:reply, {:error, :timeout}, state}
    end
  end

  @impl GenServer
  def handle_call({:wait_to_receive, tests, timeout}, _from, state) do
    timer = :erlang.start_timer(timeout, self(), :timeout)
    wait_to_receive(timer, tests, state)
  end

  def handle_call(:stop, _from, state) do
    Socket.Web.close(state.sock)
    {:stop, :normal, :ok, state}
  end

  @impl GenServer
  def handle_info({:data, type, data}, state) do
    case decode_message(state.codec, type, data) do
      {:ok, value} ->
        send(self(), {:receive, value})
        {:noreply, state}

      {:error, _reason} ->
        {:noreply, state}
    end
  end

  @impl GenServer
  def handle_cast({:send, packet}, %{codec: codec} = state) do
    case codec.encode(packet) do
      {:ok, value} ->
        Socket.Web.send!(state.sock, {codec.frame_type, value})
        {:noreply, state}

      {:error, reason} ->
        Logger.warn("<Riverside.TestClient> failed to format message: #{reason}")
        {:noreply, state}
    end
  end

  @impl GenServer
  def terminate(_reason, _state) do
    :ok
  end
end