defmodule Polymarket.WebSocket do
@moduledoc """
A WebSocket client for the Polymarket CLOB market feed.
Wraps `Mint.WebSocket` in a `GenServer`, handling the HTTP upgrade,
periodic pings, and decoding of incoming text frames.
"""
use GenServer
use TypedStruct
alias Polymarket.WebSocket.MessageHandler
require Logger
require Mint.HTTP
# `Mint.WebSocket.new/4`'s success typing is inferred as error-only (a known
# Dialyzer limitation: the `with` flow inside its private `do_new/4` hides the
# `{:ok, conn, websocket}` return), so the matching clause in `handle_responses/2`
# is flagged as unreachable even though it fires on every successful upgrade.
@dialyzer {:no_match, handle_responses: 2}
typedstruct do
field(:conn, Mint.HTTP.t())
field(:websocket, Mint.WebSocket.t())
field(:request_ref, Mint.Types.request_ref())
field(:caller, GenServer.from())
field(:status, pos_integer())
field(:resp_headers, Mint.Types.headers())
field(:closing?, boolean())
field(:timer_ref, :timer.tref())
field(:last_pong, DateTime.t())
end
# ---------------------------------------------------------------------------#
# API #
# ---------------------------------------------------------------------------#
@spec start_link(term()) :: {:ok, pid()} | {:error, term()}
def start_link(_opts) do
with {:ok, socket} <- GenServer.start_link(__MODULE__, []),
{:ok, :connected} <- GenServer.call(socket, :connect) do
message = ~S"""
{
"assets_ids": [],
"type": "market",
"custom_feature_enabled": true
}
"""
send_message(socket, message)
{:ok, socket}
end
end
@spec send_message(pid(), String.t()) :: :ok
def send_message(pid, text) do
GenServer.call(pid, {:send_text, text})
end
# ---------------------------------------------------------------------------#
# GenServer #
# ---------------------------------------------------------------------------#
@impl GenServer
def init([]) do
{:ok, timer} = :timer.send_interval(to_timeout(second: 3), :send_ping)
{:ok, %__MODULE__{timer_ref: timer}}
end
@impl GenServer
def handle_call({:send_text, text}, _from, state) do
{:ok, state} = send_frame(state, {:text, text})
{:reply, :ok, state}
end
@impl GenServer
def handle_call(:connect, from, state) do
uri = URI.parse("wss://ws-subscriptions-clob.polymarket.com/ws/market")
http_scheme =
case uri.scheme do
"ws" -> :http
"wss" -> :https
end
ws_scheme =
case uri.scheme do
"ws" -> :ws
"wss" -> :wss
end
# The endpoint is a fixed URL with no query string.
path = uri.path
with {:ok, conn} <- Mint.HTTP.connect(http_scheme, uri.host, uri.port, protocols: [:http1]),
{:ok, conn, ref} <- Mint.WebSocket.upgrade(ws_scheme, conn, path, []) do
state = %{state | conn: conn, request_ref: ref, caller: from}
{:noreply, state}
else
{:error, reason} ->
{:reply, {:error, reason}, state}
{:error, conn, reason} ->
{:reply, {:error, reason}, put_in(state.conn, conn)}
end
end
@impl GenServer
def handle_info(:send_ping, state) do
{:ok, state} = send_frame(state, {:text, "PING"})
{:noreply, state}
end
def handle_info(message, state) do
case Mint.WebSocket.stream(state.conn, message) do
{:ok, conn, responses} ->
state =
put_in(state.conn, conn)
|> handle_responses(responses)
if state.closing? do
do_close(state)
else
{:noreply, state}
end
{:error, conn, reason, _responses} ->
Logger.debug("Error: #{inspect(binding())}")
state =
put_in(state.conn, conn)
|> reply({:error, reason})
{:stop, :server_disconnected, state}
:unknown ->
{:noreply, state}
end
end
# ---------------------------------------------------------------------------#
# Helpers #
# ---------------------------------------------------------------------------#
# ---------------------------------------------------------------------------
# Handle Response
defp handle_responses(state, responses)
defp handle_responses(%{request_ref: ref} = state, [{:status, ref, status} | rest]) do
Logger.debug("Status: #{inspect(status)}")
put_in(state.status, status)
|> handle_responses(rest)
end
defp handle_responses(%{request_ref: ref} = state, [{:headers, ref, resp_headers} | rest]) do
Logger.debug("Headers: #{inspect(resp_headers)}")
put_in(state.resp_headers, resp_headers)
|> handle_responses(rest)
end
# executes
defp handle_responses(%{request_ref: ref, status: status, resp_headers: resp_headers} = state, [{:done, ref} | rest])
when is_integer(status) and is_list(resp_headers) do
Logger.debug("Done")
case Mint.WebSocket.new(state.conn, ref, status, resp_headers) do
{:ok, conn, websocket} ->
%{state | conn: conn, websocket: websocket, status: nil, resp_headers: nil}
|> reply({:ok, :connected})
|> handle_responses(rest)
{:error, conn, reason} ->
Logger.error("Error in response #{inspect(binding())}")
put_in(state.conn, conn)
|> reply({:error, reason})
end
end
defp handle_responses(%{request_ref: ref, websocket: websocket} = state, [{:data, ref, data} | rest])
when websocket != nil do
case Mint.WebSocket.decode(websocket, data) do
{:ok, websocket, frames} ->
put_in(state.websocket, websocket)
|> handle_frames(frames)
|> handle_responses(rest)
{:error, websocket, reason} ->
put_in(state.websocket, websocket)
|> reply({:error, reason})
end
end
defp handle_responses(state, [_response | rest]) do
handle_responses(state, rest)
end
defp handle_responses(state, []), do: state
# ---------------------------------------------------------------------------
# Send Frame
defp send_frame(state, frame) do
case Mint.WebSocket.encode(state.websocket, frame) do
{:ok, websocket, data} ->
state = put_in(state.websocket, websocket)
case Mint.WebSocket.stream_request_body(state.conn, state.request_ref, data) do
{:ok, conn} -> {:ok, put_in(state.conn, conn)}
{:error, conn, reason} -> {:error, put_in(state.conn, conn), reason}
end
{:error, websocket, reason} ->
{:error, put_in(state.websocket, websocket), reason}
end
end
# ---------------------------------------------------------------------------
# Receive Frame
defp handle_frames(state, frames) do
Enum.reduce(frames, state, fn
# reply to pings with pongs
{:ping, data}, state ->
Logger.debug("Received ping")
{:ok, state} = send_frame(state, {:pong, data})
state
{:close, _code, reason}, state ->
Logger.debug("Closing connection: #{inspect(reason)}")
%{state | closing?: true}
{:text, "PONG"}, state ->
Logger.debug("Received pong")
put_in(state.last_pong, DateTime.utc_now())
{:text, text}, state ->
log_message(text)
# try and decode the message to json.
case decode_message(text) do
{:ok, message} ->
Logger.debug("Received: #{inspect(message)}")
MessageHandler.handle_message(message, state)
|> process_result()
{:error, err} ->
Logger.warning("failed to decode message #{text} #{inspect(err)}")
state
end
frame, state ->
Logger.debug("Unexpected frame received: #{inspect(frame)}")
state
end)
end
# ---------------------------------------------------------------------------
# Closing
defp do_close(state) do
Logger.debug("Closing websocket #{inspect(state)}")
# Streaming a close frame may fail if the server has already closed
# for writing.
_ = send_frame(state, :close)
Mint.HTTP.close(state.conn)
{:stop, :normal, state}
end
# ---------------------------------------------------------------------------
# Reply
defp reply(state, response) do
if state.caller, do: GenServer.reply(state.caller, response)
put_in(state.caller, nil)
end
# ---------------------------------------------------------------------------
# Decoding
defp decode_message(message) do
message
|> Jason.decode(keys: :atoms)
end
# ---------------------------------------------------------------------------
# Logging for debugging
@logfile "test/fixtures/polymarket_websocket_responses.txt"
defp log_message(text) do
File.write!(@logfile, text <> "\n", [:append])
end
# ---------------------------------------------------------------------------
# Process response from handler
defp process_result({:reply, frame, state}) do
{:ok, state} = send_frame(state, frame)
state
end
defp process_result({:noreply, state}) do
state
end
end