defmodule Tai.VenueAdapters.Deribit.Stream.Connection do
use WebSockex
alias Tai.VenueAdapters.Deribit.Stream
defmodule State do
@type product :: Tai.Venues.Product.t()
@type account :: Tai.Venues.Account.t()
@type venue :: Tai.Venue.id()
@type credential_id :: Tai.Venue.credential_id()
@type channel_name :: atom
@type portfolio_channel :: String.t()
@type route :: :order_books
@type jsonrpc_id :: non_neg_integer
@type t :: %State{
venue: venue,
routes: %{required(route) => atom},
channels: [channel_name],
credential: {credential_id, map} | nil,
order_books: [product],
account_channels: %{optional(portfolio_channel) => account},
quote_depth: pos_integer,
opts: map,
last_heartbeat: pos_integer,
jsonrpc_id: jsonrpc_id,
jsonrpc_requests: %{
optional(jsonrpc_id) => pos_integer
}
}
@enforce_keys ~w[
venue
routes
channels
order_books
account_channels
quote_depth
opts
jsonrpc_id
jsonrpc_requests
]a
defstruct ~w[
venue
routes
channels
credential
order_books
account_channels
quote_depth
opts
last_heartbeat
jsonrpc_id
jsonrpc_requests
]a
end
@type stream :: Tai.Venues.Stream.t()
@type venue_id :: Tai.Venue.id()
@type credential_id :: Tai.Venue.credential_id()
@type credential :: Tai.Venue.credential()
@type venue_msg :: map
@spec start_link(
endpoint: String.t(),
stream: stream,
credential: {credential_id, credential} | nil
) :: {:ok, pid} | {:error, term}
def start_link(endpoint: endpoint, stream: stream, credential: credential) do
routes = %{
order_books: stream.venue.id |> Stream.RouteOrderBooks.to_name()
}
state = %State{
venue: stream.venue.id,
routes: routes,
channels: stream.venue.channels,
credential: credential,
order_books: stream.order_books,
account_channels: account_channels(stream.accounts),
quote_depth: stream.venue.quote_depth,
opts: stream.venue.opts,
jsonrpc_id: 1,
jsonrpc_requests: %{}
}
name = to_name(stream.venue.id)
headers = []
WebSockex.start_link(endpoint, __MODULE__, state, name: name, extra_headers: headers)
end
@spec to_name(venue_id) :: atom
def to_name(venue), do: :"#{__MODULE__}_#{venue}"
def handle_connect(_conn, state) do
TaiEvents.info(%Tai.Events.StreamConnect{venue: state.venue})
send(self(), :init_subscriptions)
{:ok, state}
end
def handle_disconnect(conn_status, state) do
TaiEvents.warn(%Tai.Events.StreamDisconnect{
venue: state.venue,
reason: conn_status.reason
})
{:ok, state}
end
def terminate(close_reason, state) do
TaiEvents.warn(%Tai.Events.StreamTerminate{venue: state.venue, reason: close_reason})
end
def handle_info(:init_subscriptions, state) do
send(self(), {:subscribe, :heartbeat})
send(self(), {:subscribe, :depth})
if state.credential, do: send(self(), {:subscribe, :authenticate})
{:ok, state}
end
@heartbeat_interval_s 10
def handle_info({:subscribe, :heartbeat}, state) do
msg =
%{
method: "public/set_heartbeat",
id: state.jsonrpc_id,
params: %{
interval: @heartbeat_interval_s
}
}
|> Jason.encode!()
state =
state
|> add_jsonrpc_request()
|> Map.put(:last_heartbeat, heartbeat_timestamp())
{:reply, {:text, msg}, state}
end
def handle_info({:subscribe, :depth}, state) do
channels = state.order_books |> Enum.map(&"book.#{&1.venue_symbol}.none.20.100ms")
msg =
%{
method: "public/subscribe",
id: state.jsonrpc_id,
params: %{
channels: channels
}
}
|> Jason.encode!()
state = state |> add_jsonrpc_request()
{:reply, {:text, msg}, state}
end
def handle_info({:subscribe, :authenticate}, state) do
data = ""
timestamp = ExDeribit.Auth.timestamp()
nonce = ExDeribit.Auth.nonce()
{_, credential} = state.credential
signature = ExDeribit.Auth.sign(credential.client_secret, timestamp, nonce, data)
msg =
%{
method: "public/auth",
id: state.jsonrpc_id,
params: %{
grant_type: "client_signature",
client_id: credential.client_id,
timestamp: timestamp,
signature: signature,
nonce: nonce,
data: data
}
}
|> Jason.encode!()
state = state |> add_jsonrpc_request()
{:reply, {:text, msg}, state}
end
def handle_frame({:text, msg}, state) do
msg
|> Jason.decode!()
|> handle_msg(state)
end
def handle_frame(_frame, state), do: {:ok, state}
defp handle_msg(
%{"id" => id, "result" => %{"access_token" => access_token}},
state
) do
msg =
%{
method: "private/subscribe",
id: state.jsonrpc_id,
params: %{
access_token: access_token,
channels: state.account_channels |> Map.keys()
}
}
|> Jason.encode!()
state =
state
|> delete_jsonrpc_request(id)
|> add_jsonrpc_request()
{:reply, {:text, msg}, state}
end
defp handle_msg(%{"id" => id, "result" => _}, state) do
state = delete_jsonrpc_request(state, id)
{:ok, state}
end
defp handle_msg(
%{
"method" => "subscription",
"params" => %{"channel" => "book." <> _channel}
} = msg,
state
) do
msg |> forward(:order_books, state)
{:ok, state}
end
@heartbeat_interval_timeout_ms 12_000
defp handle_msg(
%{
"method" => "heartbeat",
"params" => %{"type" => "heartbeat"}
},
state
) do
now = heartbeat_timestamp()
diff = now - state.last_heartbeat
state = Map.put(state, :last_heartbeat, now)
if diff > @heartbeat_interval_timeout_ms do
{:close, {1000, "heartbeat timeout"}, state}
else
{:ok, state}
end
end
defp handle_msg(
%{
"method" => "heartbeat",
"params" => %{"type" => "test_request"}
},
state
) do
msg =
%{method: "public/test", id: state.jsonrpc_id}
|> Jason.encode!()
state = state |> add_jsonrpc_request()
{:reply, {:text, msg}, state}
end
defp handle_msg(
%{
"params" => %{
"data" => %{"equity" => venue_equity},
"channel" => channel
},
"method" => "subscription"
},
state
) do
equity = Tai.Utils.Decimal.cast!(venue_equity)
account = state.account_channels |> Map.fetch!(channel)
account = %{account | equity: equity, locked: equity}
{:ok, _} = Tai.Venues.AccountStore.put(account)
{:ok, state}
end
defp handle_msg(_msg, state) do
{:ok, state}
end
defp heartbeat_timestamp, do: System.monotonic_time(:millisecond)
defp forward(msg, to, state) do
state.routes
|> Map.fetch!(to)
|> GenServer.cast({msg, Tai.Time.monotonic_time()})
end
defp add_jsonrpc_request(state) do
jsonrpc_requests =
state.jsonrpc_requests
|> Map.put(state.jsonrpc_id, System.monotonic_time())
state
|> Map.put(:jsonrpc_id, state.jsonrpc_id + 1)
|> Map.put(:jsonrpc_requests, jsonrpc_requests)
end
defp delete_jsonrpc_request(state, id) do
jsonrpc_requests = Map.delete(state.jsonrpc_requests, id)
Map.put(state, :jsonrpc_requests, jsonrpc_requests)
end
defp account_channels(accounts) do
accounts
|> Enum.map(&{&1.asset |> portfolio_channel(), &1})
|> Map.new()
end
defp portfolio_channel(asset) do
a = asset |> Atom.to_string() |> String.downcase()
"user.portfolio.#{a}"
end
end