defmodule BazVenueAdapterOpenSea.WebSocket do
use WebSockex
require Logger
defmodule State do
  @moduledoc """
  State held by the web socket process.

  `venue`, `heartbeat_interval`, `heartbeat_timeout` and `opts` must be supplied
  when the struct is built; every other field defaults to `nil`.
  """

  @type venue :: Baz.Venues.Venue.t()

  @type t :: %__MODULE__{
          venue: venue,
          base_endpoint: String.t(),
          api_key: String.t(),
          heartbeat_interval: pos_integer,
          heartbeat_timeout: pos_integer,
          heartbeat_timer: reference | nil,
          heartbeat_timeout_timer: reference | nil,
          compression: :unzip | :gunzip | nil,
          opts: map
        }

  # Keys that callers are required to provide at construction time.
  @enforce_keys [:venue, :heartbeat_interval, :heartbeat_timeout, :opts]

  defstruct [
    :venue,
    :base_endpoint,
    :api_key,
    :heartbeat_interval,
    :heartbeat_timeout,
    :heartbeat_timer,
    :heartbeat_timeout_timer,
    :compression,
    :opts
  ]
end
@type venue :: Baz.Venues.Venue.t()

# Streaming endpoint; the API key is appended to it as the `token` query parameter.
@base_endpoint "wss://stream.openseabeta.com/socket/websocket"

@doc """
Starts a web socket process for `venue`, linked to the caller.

The API key is read from `venue.credentials` under `:web_socket_api_key`. When
the key is absent `Map.get/2` yields `nil`, which interpolates to an empty
token in the connection URL.

Returns whatever `WebSockex.start_link/3` returns: `{:ok, pid}` on success or
`{:error, reason}` when the connection cannot be established.
"""
@spec start_link(venue: venue) :: {:ok, pid} | {:error, term}
def start_link(venue: venue) do
  api_key = Map.get(venue.credentials, :web_socket_api_key)
  endpoint = "#{@base_endpoint}?token=#{api_key}"

  state = %State{
    venue: venue,
    base_endpoint: endpoint,
    api_key: api_key,
    heartbeat_interval: 5000,
    heartbeat_timeout: 10_000,
    compression: :gunzip,
    opts: %{}
  }

  WebSockex.start_link(endpoint, __MODULE__, state)
end
# Runs once the socket is connected: logs the venue name, starts trapping exits
# and posts the message that kicks off the heartbeat cycle.
@impl true
def handle_connect(_conn, state) do
  Logger.info("venue adapter web socket connected name=#{state.venue.name}")

  # Linked-process exits now arrive as `{:EXIT, pid, reason}` messages.
  _previous_flag = Process.flag(:trap_exit, true)

  heartbeat_kickoff = {:heartbeat, :start}
  send(self(), heartbeat_kickoff)

  {:ok, state}
end
# Accepts the disconnect without touching the state or asking for a reconnect.
@impl true
def handle_disconnect(_conn_status, state), do: {:ok, state}
# Logs the venue name together with the close reason, then returns `:ok`.
@impl true
def terminate(close_reason, state) do
  Logger.info(
    "venue adapter web socket terminate name=#{state.venue.name}, " <>
      "close_reason: #{inspect(close_reason)}"
  )

  :ok
end
# Frame handling. Every frame is stamped with a monotonic receive time and then
# handed to `on_msg/3`.
#
# Fixed six-byte binary frame. The bytes are a raw deflate stream (no gzip or
# zlib wrapper; presumably the text "pong"), so they are always inflated with
# `:zlib.unzip/1`. Using the configured `state.compression` here would raise a
# zlib data error whenever it is `:gunzip`.
@impl true
def handle_frame({:binary, <<43, 200, 207, 75, 7, 0>> = pong}, state) do
  msg_received_at = received_at()

  pong
  |> :zlib.unzip()
  |> on_msg(msg_received_at, state)
end

# Any other binary frame: decompress according to `state.compression`, then
# decode the JSON document. `Jason.decode!/1` raises on malformed JSON.
@impl true
def handle_frame({:binary, compressed_data}, state) do
  msg_received_at = received_at()

  compressed_data
  |> decompress_frame(state.compression)
  |> Jason.decode!()
  |> on_msg(msg_received_at, state)
end

# Text frame: the payload is already a JSON string.
@impl true
def handle_frame({:text, msg}, state) do
  msg_received_at = received_at()

  msg
  |> Jason.decode!()
  |> on_msg(msg_received_at, state)
end

# Applies the configured `:zlib` function to `data`. A `nil` setting means the
# payload is not compressed and is passed through unchanged.
defp decompress_frame(data, nil), do: data
defp decompress_frame(data, fun) when is_atom(fun), do: apply(:zlib, fun, [data])
# A pong answers the outstanding ping: drop the pending timeout and arm the
# next ping. Pongs may arrive bare (`:pong`) or carrying a payload
# (`{:pong, binary}`); both forms are accepted so a payload-bearing pong does
# not raise a FunctionClauseError.
@impl true
def handle_pong(:pong, state), do: {:ok, restart_heartbeat(state)}
def handle_pong({:pong, _payload}, state), do: {:ok, restart_heartbeat(state)}

# Cancels the pending heartbeat timeout and schedules the next ping.
defp restart_heartbeat(state) do
  state
  |> cancel_heartbeat_timeout()
  |> schedule_heartbeat()
end
# Mailbox messages handled by the socket process.
#
#   {:EXIT, pid, :normal}                 - a linked process exited normally; ignored
#   {:heartbeat, :start}                  - arm the first ping timer
#   {:heartbeat, :ping}                   - send a ping frame and arm the timeout timer
#   {:heartbeat, :timeout}                - close the connection
#   {:send_msg, msg}                      - JSON-encode `msg` and send it as a text frame
#   {:subscribe_collection_events, slugs} - send a "phx_join" for "collection:<slugs>"
@impl true
def handle_info({:EXIT, _pid, :normal}, state), do: {:ok, state}

@impl true
def handle_info({:heartbeat, :start}, state) do
  with_ping_timer = schedule_heartbeat(state)
  {:ok, with_ping_timer}
end

@impl true
def handle_info({:heartbeat, :ping}, state) do
  awaiting_pong = schedule_heartbeat_timeout(state)
  {:reply, :ping, awaiting_pong}
end

@impl true
def handle_info({:heartbeat, :timeout}, state),
  do: {:close, {1000, "heartbeat timeout"}, state}

@impl true
def handle_info({:send_msg, msg}, state), do: json_text_reply(msg, state)

@impl true
def handle_info({:subscribe_collection_events, slugs}, state) do
  join_request = %{
    "event" => "phx_join",
    "topic" => "collection:#{slugs}",
    "ref" => 0,
    "payload" => %{}
  }

  json_text_reply(join_request, state)
end

# Encodes `payload` as JSON and wraps it in a text-frame reply tuple.
defp json_text_reply(payload, state) do
  {:reply, {:text, Jason.encode!(payload)}, state}
end
@doc """
Handles one decoded message from the socket.

Event attributes are built from `msg` and, when that succeeds, handed to
`Baz.CollectionEvents.create_collection_event/1`. Messages tagged
`{:error, :ignore}` are dropped silently; any other build error, and any
exception raised along the way, is logged at error level.

The second argument (the receive timestamp) is currently unused.

Always returns `{:ok, state}` with the state unchanged.
"""
def on_msg(msg, _, state) do
  try do
    case build_event_attrs({state.venue, msg}) do
      {:ok, attrs} ->
        Baz.CollectionEvents.create_collection_event(attrs)

      {:error, :ignore} ->
        # Explicitly ignorable traffic, so it must not be reported as an error.
        :ok

      {:error, reason} ->
        Logger.error("build_event_attrs error: #{inspect(reason)}")
    end
  rescue
    e in KeyError ->
      Logger.error("rescue key error: #{inspect(e)}")

    e ->
      Logger.error("rescue error: #{inspect(e)}")
  end

  {:ok, state}
end
@doc """
Asks the socket process `pid` to join the collection event topic for `slugs`.

Posts `{:subscribe_collection_events, slugs}` to `pid` and returns that message.

NOTE(review): the socket process interpolates `slugs` into a single
`"collection:..."` topic string, so a list of slugs would be concatenated.
Confirm that callers pass one slug.
"""
def subscribe_collection_events(pid, slugs) do
  request = {:subscribe_collection_events, slugs}
  send(pid, request)
end
@doc """
No-op: ignores its first argument and returns `{:ok, state}` unchanged.
"""
def subscribe(_ignored, state) do
  {:ok, state}
end
# Turns a decoded socket message into the attribute map for a collection event.
#
# Returns:
#   {:ok, attrs}                    - the message carried a complete event
#   {:error, :ignore}               - the payload has "response" and "status" keys
#   {:error, :not_event_msg}        - the message lacks the event/payload/ref/topic envelope
#   {:error, :unsupported_payload}  - the payload has no nested "payload"
#   {:error, {:missing_key, key}}   - a required field is absent
#   {:error, {:invalid_nft_id, v}}  - "nft_id" is not a string
#
# Every failure is reported through the return value, so unexpected payload
# shapes never raise.
defp build_event_attrs(
       {venue, %{"event" => _event, "payload" => payload, "ref" => _ref, "topic" => _topic}}
     ) do
  event_attrs_from_payload(venue, payload)
end

defp build_event_attrs({_venue, _msg}), do: {:error, :not_event_msg}

# A payload with "response" and "status" carries no event data.
defp event_attrs_from_payload(_venue, %{"response" => _, "status" => _}), do: {:error, :ignore}

defp event_attrs_from_payload(venue, %{"payload" => inner_payload} = payload) do
  with {:ok, event_type} <- fetch_event_field(payload, "event_type"),
       {:ok, collection} <- fetch_event_field(inner_payload, "collection"),
       {:ok, slug} <- fetch_event_field(collection, "slug"),
       {:ok, event_timestamp} <- fetch_event_field(inner_payload, "event_timestamp"),
       {:ok, item} <- fetch_event_field(inner_payload, "item"),
       {:ok, nft_id} <- fetch_event_field(item, "nft_id"),
       {:ok, token_id} <- token_id_from_nft_id(nft_id) do
    {:ok,
     %{
       venue: venue.name,
       slug: slug,
       token_id: token_id,
       source_id: nft_id,
       event_type: event_type,
       event_timestamp: event_timestamp
     }}
  end
end

defp event_attrs_from_payload(_venue, _payload), do: {:error, :unsupported_payload}

# Looks up `key`, reporting a tagged error when it is absent or `container` is not a map.
defp fetch_event_field(container, key) when is_map(container) do
  case Map.fetch(container, key) do
    {:ok, _value} = found -> found
    :error -> {:error, {:missing_key, key}}
  end
end

defp fetch_event_field(_container, key), do: {:error, {:missing_key, key}}

# The token id is the last "/"-separated segment of the nft id.
defp token_id_from_nft_id(nft_id) when is_binary(nft_id) do
  {:ok, nft_id |> String.split("/") |> List.last()}
end

defp token_id_from_nft_id(nft_id), do: {:error, {:invalid_nft_id, nft_id}}
# Monotonic clock reading in microseconds, taken when a frame is received.
defp received_at do
  System.monotonic_time(:microsecond)
end
# Arms a timer that delivers `{:heartbeat, :ping}` to this process after
# `heartbeat_interval` milliseconds and records its reference in the state.
defp schedule_heartbeat(%{heartbeat_interval: delay} = state) do
  %{state | heartbeat_timer: Process.send_after(self(), {:heartbeat, :ping}, delay)}
end
# Arms a timer that delivers `{:heartbeat, :timeout}` to this process after
# `heartbeat_timeout` milliseconds and records its reference in the state.
defp schedule_heartbeat_timeout(%{heartbeat_timeout: delay} = state) do
  %{state | heartbeat_timeout_timer: Process.send_after(self(), {:heartbeat, :timeout}, delay)}
end
# Cancels the pending heartbeat timeout timer, if any, and clears it from the state.
#
# With no timer armed the state is returned untouched, because
# `Process.cancel_timer/1` raises an ArgumentError when given `nil`.
defp cancel_heartbeat_timeout(%{heartbeat_timeout_timer: nil} = state), do: state

defp cancel_heartbeat_timeout(%{heartbeat_timeout_timer: timer} = state) do
  Process.cancel_timer(timer)
  %{state | heartbeat_timeout_timer: nil}
end
end