lib/baz_venue_adapter_open_sea/web_socket.ex

defmodule BazVenueAdapterOpenSea.WebSocket do
  use WebSockex
  require Logger

  defmodule State do
    @type venue :: Baz.Venues.Venue.t()
    @type t :: %State{
            venue: venue,
            base_endpoint: String.t(),
            api_key: String.t(),
            heartbeat_interval: pos_integer,
            heartbeat_timeout: pos_integer,
            heartbeat_timer: reference | nil,
            heartbeat_timeout_timer: reference | nil,
            compression: :unzip | :gunzip | nil,
            opts: map
          }

    @enforce_keys ~w[
      venue
      heartbeat_interval
      heartbeat_timeout
      opts
    ]a
    defstruct ~w[
      venue
      base_endpoint
      api_key
      heartbeat_interval
      heartbeat_timeout
      heartbeat_timer
      heartbeat_timeout_timer
      compression
      opts
    ]a
  end

  @type venue :: Baz.Venues.Venue.t()

  @base_endpoint "wss://stream.openseabeta.com/socket/websocket"

  @spec start_link(venue: venue) :: {:ok, pid}
  def start_link(venue: venue) do
    api_key = Map.get(venue.credentials, :web_socket_api_key)
    endpoint = "#{@base_endpoint}?token=#{api_key}"

    state = %State{
      venue: venue,
      base_endpoint: endpoint,
      api_key: api_key,
      heartbeat_interval: 5000,
      heartbeat_timeout: 10_000,
      compression: :gunzip,
      opts: %{}
    }

    WebSockex.start_link(endpoint, __MODULE__, state)
  end

  @impl true
  def handle_connect(_conn, state) do
    Logger.info("venue adapter web socket connected name=#{state.venue.name}")
    Process.flag(:trap_exit, true)
    send(self(), {:heartbeat, :start})

    {:ok, state}
  end

  @impl true
  def handle_disconnect(_conn_status, state) do
    {:ok, state}
  end

  @impl true
  def terminate(close_reason, state) do
    Logger.info(
      "venue adapter web socket terminate name=#{state.venue.name}, close_reason: #{inspect(close_reason)}"
    )

    :ok
  end

  @impl true
  def handle_frame({:binary, <<43, 200, 207, 75, 7, 0>> = pong}, state) do
    msg_received_at = received_at()

    :zlib
    |> apply(state.compression, [pong])
    |> on_msg(msg_received_at, state)
  end

  @impl true
  def handle_frame({:binary, compressed_data}, state) do
    msg_received_at = received_at()

    :zlib
    |> apply(state.compression, [compressed_data])
    |> Jason.decode!()
    |> on_msg(msg_received_at, state)
  end

  @impl true
  def handle_frame({:text, msg}, state) do
    msg_received_at = received_at()

    msg
    |> Jason.decode!()
    |> on_msg(msg_received_at, state)
  end

  @impl true
  def handle_pong(:pong, state) do
    state =
      state
      |> cancel_heartbeat_timeout()
      |> schedule_heartbeat()

    {:ok, state}
  end

  @impl true
  def handle_info({:EXIT, _pid, :normal}, state) do
    {:ok, state}
  end

  @impl true
  def handle_info({:heartbeat, :start}, state) do
    {:ok, schedule_heartbeat(state)}
  end

  @impl true
  def handle_info({:heartbeat, :ping}, state) do
    {:reply, :ping, schedule_heartbeat_timeout(state)}
  end

  @impl true
  def handle_info({:heartbeat, :timeout}, state) do
    {:close, {1000, "heartbeat timeout"}, state}
  end

  @impl true
  def handle_info({:send_msg, msg}, state) do
    json_msg = Jason.encode!(msg)
    {:reply, {:text, json_msg}, state}
  end

  @impl true
  def handle_info({:subscribe_collection_events, slugs}, state) do
    msg = %{
      "topic" => "collection:#{slugs}",
      "event" => "phx_join",
      "payload" => %{},
      "ref" => 0
    }

    json_msg = Jason.encode!(msg)

    {:reply, {:text, json_msg}, state}
  end

  def on_msg(msg, _, state) do
    try do
      case build_event_attrs({state.venue, msg}) do
        {:ok, attrs} ->
          Baz.CollectionEvents.create_collection_event(attrs)

        {:error, reason} ->
            Logger.error("build_event_attrs error: #{inspect(reason)}")
      end
    rescue
      e ->
        case e do
          %KeyError{} ->
            Logger.error("rescue key error: #{inspect(e)}")

          _ ->
            Logger.error("rescue error: #{inspect(e)}")
        end
    end

    {:ok, state}
  end

  def subscribe_collection_events(pid, slugs) do
    send(pid, {:subscribe_collection_events, slugs})
  end

  def subscribe(_, state), do: {:ok, state}

  defp build_event_attrs({venue, msg}) do
    case msg do
      %{"event" => _event, "payload" => payload, "ref" => _ref, "topic" => _topic} ->
        case payload do
          %{"response" => _, "status" => _} ->
            {:error, :ignore}

          %{"payload" => inner_payload} ->
            event_type = Map.fetch!(payload, "event_type")
            collection = Map.fetch!(inner_payload, "collection")
            slug = Map.fetch!(collection, "slug")
            event_timestamp = Map.fetch!(inner_payload, "event_timestamp")
            item = Map.fetch!(inner_payload, "item")
            nft_id = Map.fetch!(item, "nft_id")
            token_id = nft_id |> String.split("/") |> List.last()

            attrs = %{
              venue: venue.name,
              slug: slug,
              token_id: token_id,
              source_id: nft_id,
              event_type: event_type,
              event_timestamp: event_timestamp
            }

            {:ok, attrs}
        end

      _ ->
        {:error, :not_event_msg}
    end
  end

  defp received_at, do: System.monotonic_time(:microsecond)

  defp schedule_heartbeat(state) do
    timer = Process.send_after(self(), {:heartbeat, :ping}, state.heartbeat_interval)
    %{state | heartbeat_timer: timer}
  end

  defp schedule_heartbeat_timeout(state) do
    timer = Process.send_after(self(), {:heartbeat, :timeout}, state.heartbeat_timeout)
    %{state | heartbeat_timeout_timer: timer}
  end

  defp cancel_heartbeat_timeout(state) do
    Process.cancel_timer(state.heartbeat_timeout_timer)
    %{state | heartbeat_timeout_timer: nil}
  end
end