lib/xtb_client/streaming_socket.ex

defmodule XtbClient.StreamingSocket do
  use WebSockex

  alias XtbClient.{AccountType, StreamingMessage}
  alias XtbClient.Messages

  require Logger

  @ping_interval 30 * 1000
  @rate_limit_interval 200

  @type client :: atom | pid | {atom, any} | {:via, atom, any}

  @moduledoc """
  WebSocket server used for asynchronous communication.
  
  `StreamingSocket` is being used like standard `GenServer` - could be started with `start_link/1` and supervised.
  
  After successful connection to WebSocket the flow is:
  - process schedules to itself the `ping` command (with recurring interval) - to maintain persistent connection with backend.
  """

  @doc """
  Starts a `XtbClient.StreamingSocket` process linked to the calling process.
  """
  @spec start_link(%{
          :stream_session_id => binary(),
          :type => AccountType.t(),
          :url => binary | URI.t(),
          optional(any) => any
        }) :: GenServer.on_start()
  def start_link(%{url: url, type: type, stream_session_id: _stream_session_id} = state) do
    account_type = AccountType.format_streaming(type)
    uri = URI.merge(url, account_type) |> URI.to_string()

    state =
      state
      |> Map.put(:last_sub, actual_rate())
      |> Map.put(:subscriptions, %{})

    WebSockex.start_link(uri, __MODULE__, state)
  end

  @impl WebSockex
  def handle_connect(_conn, %{stream_session_id: stream_session_id} = state) do
    ping_command = encode_streaming_command({"ping", nil}, stream_session_id)
    ping_message = {:ping, {:text, ping_command}, @ping_interval}
    schedule_work(ping_message, 1)

    {:ok, state}
  end

  defp schedule_work(message, interval) do
    Process.send_after(self(), message, interval)
  end

  @doc """
  Subscribes `pid` process for messages from `method` query.
  
  ## Arguments
  - `server` pid of the streaming socket process,
  - `caller` pid of the caller awaiting for the result,
  - `message` struct with call context, see `XtbClient.StreamingMessage`.
  
  Result of the query will be delivered to message mailbox of the `caller` process.
  """
  @spec subscribe(client(), client(), StreamingMessage.t()) :: :ok
  def subscribe(
        server,
        caller,
        %StreamingMessage{} = message
      ) do
    WebSockex.cast(server, {:subscribe, {caller, message}})
  end

  @impl WebSockex
  def handle_cast(
        {:subscribe,
         {caller,
          %StreamingMessage{
            method: method,
            response_method: response_method,
            params: params
          } = message}},
        %{subscriptions: subscriptions, last_sub: last_sub, stream_session_id: session_id} = state
      ) do
    last_sub = check_rate(last_sub, actual_rate())

    token = StreamingMessage.encode_token(message)

    subscriptions =
      Map.update(
        subscriptions,
        response_method,
        {method, %{token => caller}},
        fn {method, value} ->
          {method, Map.put(value, token, caller)}
        end
      )

    state =
      state
      |> Map.put(:subscriptions, subscriptions)
      |> Map.put(:last_sub, last_sub)

    encoded_message = encode_streaming_command({method, params}, session_id)

    {:reply, {:text, encoded_message}, state}
  end

  defp check_rate(prev_rate_ms, actual_rate_ms) do
    rate_diff = actual_rate_ms - prev_rate_ms

    case rate_diff > @rate_limit_interval do
      true ->
        actual_rate_ms

      false ->
        Process.sleep(rate_diff)
        actual_rate()
    end
  end

  defp actual_rate() do
    DateTime.utc_now()
    |> DateTime.to_unix(:millisecond)
  end

  defp encode_streaming_command({method, nil}, streaming_session_id) do
    Jason.encode!(%{
      command: method,
      streamSessionId: streaming_session_id
    })
  end

  defp encode_streaming_command({method, params}, streaming_session_id) do
    %{
      command: method,
      streamSessionId: streaming_session_id
    }
    |> Map.merge(Map.from_struct(params))
    |> Jason.encode!()
  end

  @impl WebSockex
  def handle_frame({:text, msg}, state) do
    resp = Jason.decode!(msg)
    handle_response(resp, state)
  end

  defp handle_response(
         %{"command" => response_method, "data" => data},
         %{subscriptions: subscriptions} = state
       ) do
    {method, method_subs} = Map.get(subscriptions, response_method)
    result = Messages.decode_message(method, data)

    token =
      StreamingMessage.new(method, response_method, result)
      |> StreamingMessage.encode_token()

    caller = Map.get(method_subs, token)

    GenServer.cast(caller, {:stream_result, {token, result}})

    {:ok, state}
  end

  defp handle_response(%{"status" => true}, state) do
    {:ok, state}
  end

  defp handle_response(
         %{"status" => false, "errorCode" => code, "errorDescr" => message},
         state
       ) do
    Logger.error("Exception: #{inspect(%{code: code, message: message})}")

    {:close, state}
  end

  @impl WebSockex
  def handle_info({:ping, {:text, _command} = frame, interval} = message, state) do
    schedule_work(message, interval)
    {:reply, frame, state}
  end
end