lib/xtb_client/streaming_socket.ex

defmodule XtbClient.StreamingSocket do
  use WebSockex

  alias XtbClient.{AccountType}
  alias XtbClient.Messages

  require Logger

  @ping_interval 30 * 1000
  @rate_limit_interval 200

  @type client :: atom | pid | {atom, any} | {:via, atom, any}

  @moduledoc """
  WebSocket server used for asynchronous communication.
  
  `StreamingSocket` is being used like standard `GenServer` - could be started with `start_link/1` and supervised.
  
  After successful connection to WebSocket the flow is:
  - process schedules to itself the `ping` command (with recurring interval) - to maintain persistent connection with backend.
  """

  @doc """
  Starts a `XtbClient.StreamingSocket` process linked to the calling process.
  """
  @spec start_link(%{
          :stream_session_id => binary(),
          :type => AccountType.t(),
          :url => binary | URI.t(),
          optional(any) => any
        }) :: GenServer.on_start()
  def start_link(%{url: url, type: type, stream_session_id: _stream_session_id} = state) do
    account_type = AccountType.format_streaming(type)
    uri = URI.merge(url, account_type) |> URI.to_string()

    state =
      state
      |> Map.put(:last_sub, actual_rate())
      |> Map.put(:subscriptions, %{})

    WebSockex.start_link(uri, __MODULE__, state)
  end

  @impl WebSockex
  def handle_connect(_conn, %{stream_session_id: stream_session_id} = state) do
    ping_command = encode_streaming_command("ping", stream_session_id)
    ping_message = {:ping, {:text, ping_command}, @ping_interval}
    schedule_work(ping_message, 1)

    {:ok, state}
  end

  defp schedule_work(message, interval) do
    Process.send_after(self(), message, interval)
  end

  @doc """
  Subscribes `pid` process for messages from `method` query.
  
  ## Arguments
  - `client` pid of the streaming socket process,
  - `pid` pid of the caller awaiting for the result,
  - `ref` unique reference of the query,
  - `method` name of the query method,
  - `params` [optional] arguments of the `method`.
  
  Result of the query will be delivered to message mailbox of the `pid` process.
  """
  @spec subscribe(client(), client(), term(), binary()) :: :ok
  def subscribe(client, pid, ref, method) do
    WebSockex.cast(client, {:subscribe, {pid, ref, method}})
  end

  @spec subscribe(client(), client(), term(), binary(), map()) :: :ok
  def subscribe(client, pid, ref, method, params) do
    WebSockex.cast(client, {:subscribe, {pid, ref, method, params}})
  end

  @impl WebSockex
  def handle_cast(
        {:subscribe, {pid, ref, method}},
        %{subscriptions: subscriptions, last_sub: last_sub, stream_session_id: session_id} = state
      ) do
    last_sub = check_rate(last_sub, actual_rate())

    message = encode_streaming_command(method, session_id)
    subscriptions = Map.put(subscriptions, ref, {:subscription, pid, ref, method})

    state =
      state
      |> Map.put(:subscriptions, subscriptions)
      |> Map.put(:last_sub, last_sub)

    {:reply, {:text, message}, state}
  end

  @impl WebSockex
  def handle_cast(
        {:subscribe, {pid, ref, method, params}},
        %{subscriptions: subscriptions, last_sub: last_sub, stream_session_id: session_id} = state
      ) do
    last_sub = check_rate(last_sub, actual_rate())

    message = encode_streaming_command(method, session_id, params)
    subscriptions = Map.put(subscriptions, ref, {:subscription, pid, ref, method})

    state =
      state
      |> Map.put(:subscriptions, subscriptions)
      |> Map.put(:last_sub, last_sub)

    {:reply, {:text, message}, state}
  end

  defp check_rate(prev_rate_ms, actual_rate_ms) do
    rate_diff = actual_rate_ms - prev_rate_ms

    case rate_diff > @rate_limit_interval do
      true ->
        actual_rate_ms

      false ->
        Process.sleep(rate_diff)
        actual_rate()
    end
  end

  defp actual_rate() do
    DateTime.utc_now()
    |> DateTime.to_unix(:millisecond)
  end

  defp encode_streaming_command(type, streaming_session_id) do
    Jason.encode!(%{
      command: type,
      streamSessionId: streaming_session_id
    })
  end

  defp encode_streaming_command(type, streaming_session_id, params) do
    %{
      command: type,
      streamSessionId: streaming_session_id
    }
    |> Map.merge(Map.from_struct(params))
    |> Jason.encode!()
  end

  @impl WebSockex
  def handle_frame({:text, msg}, state) do
    resp = Jason.decode!(msg)
    handle_response(resp, state)
  end

  defp handle_response(
         %{"command" => command, "data" => data},
         %{subscriptions: subscriptions} = state
       ) do
    {:subscription, pid, ^command, method} = Map.get(subscriptions, command)

    result = Messages.decode_message(method, data)
    GenServer.cast(pid, {:stream, method, result})

    {:ok, state}
  end

  defp handle_response(%{"status" => true}, state) do
    {:ok, state}
  end

  defp handle_response(
         %{"status" => false, "errorCode" => code, "errorDescr" => message},
         state
       ) do
    Logger.error("Exception: #{inspect(%{code: code, message: message})}")

    {:close, state}
  end

  @impl WebSockex
  def handle_info({:ping, {:text, _command} = frame, interval} = message, state) do
    schedule_work(message, interval)
    {:reply, frame, state}
  end
end