lib/tai/venue_adapters/bitmex/stream/connection.ex

defmodule Tai.VenueAdapters.Bitmex.Stream.Connection do
  use Tai.Venues.Streams.ConnectionAdapter
  alias Tai.VenueAdapters.Bitmex.Stream

  @type stream :: Tai.Venues.Stream.t()
  @type venue_id :: Tai.Venue.id()
  @type credential_id :: Tai.Venue.credential_id()
  @type credential :: Tai.Venue.credential()

  @spec start_link(
          endpoint: String.t(),
          stream: stream,
          credential: {credential_id, credential} | nil
        ) :: {:ok, pid} | {:error, term}
  def start_link(endpoint: endpoint, stream: stream, credential: credential) do
    routes = %{
      auth: stream.venue.id |> Stream.ProcessAuth.process_name(),
      markets: stream.venue.id |> Stream.RouteOrderBooks.to_name(),
      optional_channels: stream.venue.id |> Stream.ProcessOptionalChannels.to_name()
    }

    state = %Tai.Venues.Streams.ConnectionAdapter.State{
      venue: stream.venue.id,
      routes: routes,
      channels: stream.venue.channels,
      credential: credential,
      markets: stream.markets,
      quote_depth: stream.venue.quote_depth,
      heartbeat_interval: stream.venue.stream_heartbeat_interval,
      heartbeat_timeout: stream.venue.stream_heartbeat_timeout,
      opts: stream.venue.opts
    }

    name = process_name(stream.venue.id)
    headers = auth_headers(state.credential)
    WebSockex.start_link(endpoint, __MODULE__, state, name: name, extra_headers: headers)
  end

  @optional_channels [
    :trades,
    :connected_stats,
    :liquidations,
    :notifications,
    :funding,
    :insurance,
    :settlement
  ]
  @impl true
  def subscribe(:init, state) do
    if state.credential do
      send(self(), {:subscribe, :login})
      send(self(), {:subscribe, :margin})
      send(self(), {:subscribe, :positions})
    end

    send(self(), {:subscribe, :depth})

    state.channels
    |> Enum.each(fn c ->
      if Enum.member?(@optional_channels, c) do
        send(self(), {:subscribe, c})
      else
        TaiEvents.warning(%Tai.Events.StreamChannelInvalid{
          venue: state.venue,
          name: c,
          available: @optional_channels
        })
      end
    end)

    schedule_autocancel(0)

    {:ok, state}
  end

  @impl true
  def subscribe(:login, state) do
    msg = %{"op" => "subscribe", "args" => ["order"]} |> Jason.encode!()
    {:reply, {:text, msg}, state}
  end

  # Bitmex has an unpublished limit to websocket message lengths.
  @order_books_chunk_count 10
  @impl true
  def subscribe(:depth, state) do
    # > 25 quotes are experimental. It has performance issues causing message queue back pressure
    order_book_table = if state.quote_depth <= 25, do: "orderBookL2_25", else: "orderBookL2"

    state.markets
    |> Enum.chunk_every(@order_books_chunk_count)
    |> Enum.each(fn markets ->
      args = markets |> Enum.map(fn p -> "#{order_book_table}:#{p.venue_symbol}" end)
      msg = %{"op" => "subscribe", "args" => args}
      send(self(), {:send_msg, msg})
    end)

    {:ok, state}
  end

  @impl true
  def subscribe(:trades, state) do
    state.markets
    |> Enum.chunk_every(@order_books_chunk_count)
    |> Enum.each(fn markets ->
      args = markets |> Enum.map(fn p -> "trade:#{p.venue_symbol}" end)
      msg = %{"op" => "subscribe", "args" => args}
      send(self(), {:send_msg, msg})
    end)

    {:ok, state}
  end

  @impl true
  def subscribe(:positions, state) do
    msg = %{"op" => "subscribe", "args" => ["position"]} |> Jason.encode!()
    {:reply, {:text, msg}, state}
  end

  @impl true
  def subscribe(:margin, state) do
    msg = %{"op" => "subscribe", "args" => ["margin"]} |> Jason.encode!()
    {:reply, {:text, msg}, state}
  end

  @impl true
  def subscribe(:connected_stats, state) do
    msg = %{"op" => "subscribe", "args" => ["connected"]} |> Jason.encode!()
    {:reply, {:text, msg}, state}
  end

  @impl true
  def subscribe(:liquidations, state) do
    msg = %{"op" => "subscribe", "args" => ["liquidation"]} |> Jason.encode!()
    {:reply, {:text, msg}, state}
  end

  @impl true
  def subscribe(:notifications, state) do
    msg = %{"op" => "subscribe", "args" => ["publicNotifications"]} |> Jason.encode!()
    {:reply, {:text, msg}, state}
  end

  @impl true
  def subscribe(:funding, state) do
    msg = %{"op" => "subscribe", "args" => ["funding"]} |> Jason.encode!()
    {:reply, {:text, msg}, state}
  end

  @impl true
  def subscribe(:insurance, state) do
    msg = %{"op" => "subscribe", "args" => ["insurance"]} |> Jason.encode!()
    {:reply, {:text, msg}, state}
  end

  @impl true
  def subscribe(:settlement, state) do
    msg = %{"op" => "subscribe", "args" => ["settlement"]} |> Jason.encode!()
    {:reply, {:text, msg}, state}
  end

  # TODO: Figure out how to move this
  @impl true
  def handle_info(
        :ping_autocancel,
        %{
          opts: %{
            autocancel: %{ping_interval_ms: ping_interval_ms, cancel_after_ms: cancel_after_ms}
          }
        } = state
      )
      when ping_interval_ms > 0 and is_integer(ping_interval_ms) and
             cancel_after_ms > 0 and is_integer(cancel_after_ms) do
    schedule_autocancel(ping_interval_ms)
    msg = %{"op" => "cancelAllAfter", "args" => cancel_after_ms} |> Jason.encode!()
    {:reply, {:text, msg}, state}
  end

  @impl true
  def handle_info(:ping_autocancel, state), do: {:ok, state}

  @impl true
  def on_msg(%{"limit" => %{"remaining" => remaining}, "version" => _}, _received_at, state) do
    TaiEvents.info(%Tai.Events.BitmexStreamConnectionLimitDetails{
      venue_id: state.venue,
      remaining: remaining
    })
    {:ok, state}
  end

  @impl true
  def on_msg(%{"request" => _, "subscribe" => _} = msg, received_at, state) do
    msg |> forward(:markets, received_at, state)
    {:ok, state}
  end

  @order_book_tables ~w(orderBookL2 orderBookL2_25)
  @impl true
  def on_msg(%{"table" => table} = msg, received_at, state) when table in @order_book_tables do
    msg |> forward(:markets, received_at, state)
    {:ok, state}
  end

  @auth_tables ~w(position wallet margin order execution transact)
  @impl true
  def on_msg(%{"table" => table} = msg, received_at, state) when table in @auth_tables do
    msg |> forward(:auth, received_at, state)
    {:ok, state}
  end

  @impl true
  def on_msg(msg, received_at, state) do
    msg |> forward(:optional_channels, received_at, state)
    {:ok, state}
  end

  defp auth_headers({_credential_id, %{api_key: api_key, api_secret: api_secret}}) do
    nonce = ExBitmex.Auth.nonce()
    api_signature = ExBitmex.Auth.sign(api_secret, "GET", "/realtime", nonce, "")

    [
      "api-key": api_key,
      "api-signature": api_signature,
      "api-expires": nonce
    ]
  end

  defp auth_headers(nil), do: []

  defp schedule_autocancel(after_ms) do
    Process.send_after(self(), :ping_autocancel, after_ms)
  end

  defp forward(msg, to, received_at, state) do
    state.routes
    |> Map.fetch!(to)
    |> GenServer.cast({msg, received_at})
  end
end