%%% -*- erlang -*-
%%%
%%% This file is part of hackney released under the Apache 2 license.
%%% See the NOTICE for more information.
%%%
%%% Copyright (c) 2024-2025 Benoît Chesneau <benoitc@pm.me>
%%%
%%% @doc gen_statem process for WebSocket connections.
%%%
%%% This module implements a state machine for WebSocket connections,
%%% handling HTTP upgrade handshake and WebSocket frame exchange.
%%%
%%% States:
%%% - idle: Process started, not connected
%%% - upgrading: HTTP upgrade in progress (reserved for an async handshake; not currently entered)
%%% - connected: WebSocket ready for messages
%%% - closing: Close handshake in progress
%%% - closed: Connection terminated
-module(hackney_ws).
-behaviour(gen_statem).
%% API
-export([
start_link/1,
connect/1,
connect/2,
send/2,
recv/1,
recv/2,
setopts/2,
close/1,
close/2,
controlling_process/2,
peername/1,
sockname/1
]).
%% gen_statem callbacks
-export([
init/1,
callback_mode/0,
terminate/3,
code_change/4
]).
%% State functions
-export([
idle/3,
upgrading/3,
connected/3,
closing/3,
closed/3
]).
-define(CONNECT_TIMEOUT, 8000).
-define(RECV_TIMEOUT, infinity).
-define(CLOSE_TIMEOUT, 5000).
%% GHSA-q8jg: bound attacker-controlled buffers. A single frame's declared
%% length, the cumulative size of a fragmented message, and the handshake
%% response are all capped so a hostile server cannot drive the client to OOM.
-define(DEFAULT_MAX_FRAME_SIZE, 16#1000000). %% 16 MiB per frame
-define(DEFAULT_MAX_MESSAGE_SIZE, 16#4000000). %% 64 MiB per fragmented message
-define(MAX_HANDSHAKE_RESPONSE_SIZE, 65536). %% 64 KiB of upgrade headers
%% WebSocket frame types (for documentation)
-type ws_frame() :: {text, binary()}
| {binary, binary()}
| ping
| {ping, binary()}
| pong
| {pong, binary()}
| close
| {close, integer(), binary()}.
-export_type([ws_frame/0]).
%% State data record
-record(ws_data, {
%% Connection owner (linked via start_link, trap_exit handles owner death)
owner :: pid(),
%% Connection identity
host :: string() | binary(),
port :: inet:port_number(),
transport :: hackney_tcp | hackney_ssl,
path :: binary(),
%% Socket state
socket :: inet:socket() | ssl:sslsocket() | undefined,
buffer = <<>> :: binary(),
%% Options
connect_timeout = ?CONNECT_TIMEOUT :: timeout(),
recv_timeout = ?RECV_TIMEOUT :: timeout(),
connect_options = [] :: list(),
ssl_options = [] :: list(),
proxy = false :: false | {connect | socks5, string(), inet:port_number(),
undefined | {binary(), binary()}, tcp | ssl},
%% WebSocket options
active = false :: false | true | once,
headers = [] :: [{binary(), binary()}],
protocols = [] :: [binary()],
%% WebSocket handshake state
ws_key :: binary() | undefined,
ws_accept :: binary() | undefined,
ws_protocol :: binary() | undefined,
%% Frame parsing state (for hackney_ws_proto)
frag_state = undefined :: term(),
frag_buffer = [] :: list(), %% Accumulated fragment payloads
frag_size = 0 :: non_neg_integer(), %% Bytes buffered in frag_buffer
utf8_state = 0 :: integer(),
extensions = #{} :: map(),
%% GHSA-q8jg: buffer caps (bytes), infinity disables the cap
max_frame_size = ?DEFAULT_MAX_FRAME_SIZE :: non_neg_integer() | infinity,
max_message_size = ?DEFAULT_MAX_MESSAGE_SIZE :: non_neg_integer() | infinity,
%% Pending requests
connect_from :: {pid(), reference()} | undefined,
recv_from :: {pid(), reference()} | undefined
}).
%%====================================================================
%% API
%%====================================================================
%% @doc Start a WebSocket connection process.
%% The calling process is recorded as the connection owner and is linked to
%% the new process. The process starts in the idle state; no socket is opened
%% until connect/1,2 is called.
%% Options:
%% <ul>
%% <li>host: Target host (string or binary, required)</li>
%% <li>port: Target port (integer, required)</li>
%% <li>transport: hackney_tcp or hackney_ssl (default hackney_tcp)</li>
%% <li>path: WebSocket path (binary, default "/")</li>
%% <li>connect_timeout: Connection timeout (default 8000ms)</li>
%% <li>recv_timeout: Receive timeout (default infinity)</li>
%% <li>connect_options: Options passed to transport connect</li>
%% <li>ssl_options: Additional SSL options</li>
%% <li>proxy: false, or {connect | socks5, ProxyHost, ProxyPort, Auth, tcp | ssl}
%% where Auth is undefined or {User, Pass} (default false)</li>
%% <li>active: false | true | once (default false)</li>
%% <li>headers: Extra headers for upgrade request</li>
%% <li>protocols: Sec-WebSocket-Protocol values</li>
%% <li>max_frame_size: Largest accepted frame length in bytes, or infinity
%% (default 16 MiB)</li>
%% <li>max_message_size: Largest accepted fragmented message in bytes, or
%% infinity (default 64 MiB)</li>
%% </ul>
-spec start_link(map()) -> {ok, pid()} | {error, term()}.
start_link(Opts) when is_map(Opts) ->
gen_statem:start_link(?MODULE, [self(), Opts], []).
%% @doc Initiate WebSocket connection. Blocks until upgrade completes.
%% connect/1 uses ?CONNECT_TIMEOUT as the call timeout.
%% Returns ok once the process has reached the connected state. When the
%% transport connection or the upgrade handshake fails, {error, Reason} is
%% returned and the connection process stops with reason normal.
-spec connect(pid()) -> ok | {error, term()}.
connect(Pid) ->
connect(Pid, ?CONNECT_TIMEOUT).
%% Timeout is handed to gen_statem:call/3, i.e. it bounds how long the caller
%% waits for the reply; the socket connect itself is bounded by the
%% connect_timeout option given to start_link/1.
-spec connect(pid(), timeout()) -> ok | {error, term()}.
connect(Pid, Timeout) ->
gen_statem:call(Pid, connect, Timeout).
%% @doc Send a WebSocket frame.
%% Frame types: {text, Data}, {binary, Data}, ping, {ping, Data},
%% pong, {pong, Data}, close, {close, Code, Reason}
%% Frames are only sent in the connected state; in the other states the call
%% returns {error, not_connected | upgrading | closing | closed}. If the
%% transport send fails, the socket is closed, the process moves to the
%% closed state and {error, Reason} is returned.
%% The call is made with gen_statem:call/2, so no explicit timeout is passed.
-spec send(pid(), ws_frame()) -> ok | {error, term()}.
send(Pid, Frame) ->
gen_statem:call(Pid, {send, Frame}).
%% @doc Receive a WebSocket frame (passive mode only).
%% recv/1 waits with ?RECV_TIMEOUT (infinity) as the call timeout.
%% Returns {error, {active_mode, Mode}} when the connection is in active
%% mode, and {error, {closed, Code, Reason}} when the peer sent a close frame
%% (the close is echoed and the socket closed before the reply).
-spec recv(pid()) -> {ok, ws_frame()} | {error, term()}.
recv(Pid) ->
recv(Pid, ?RECV_TIMEOUT).
%% Timeout is the gen_statem:call/3 timeout only. The socket read performed
%% by the connection process uses the recv_timeout option given to
%% start_link/1, not this value.
-spec recv(pid(), timeout()) -> {ok, ws_frame()} | {error, term()}.
recv(Pid, Timeout) ->
gen_statem:call(Pid, recv, Timeout).
%% @doc Set socket options. Supported: [{active, true|false|once}]
%% Only the active entry of Opts is examined; any other entry is ignored.
%% A list without an active entry returns ok and changes nothing; an active
%% value other than true, false or once returns {error, badarg}.
%% Only handled in the connected state; other states reply with an error.
-spec setopts(pid(), list()) -> ok | {error, term()}.
setopts(Pid, Opts) ->
gen_statem:call(Pid, {setopts, Opts}).
%% @doc Close the WebSocket connection gracefully.
%% close/1 sends close code 1000 with an empty reason.
%% The request is a cast: the function returns ok immediately and the close
%% handshake proceeds asynchronously in the connection process.
-spec close(pid()) -> ok.
close(Pid) ->
close(Pid, {1000, <<>>}).
%% Same as close/1 with an explicit close code and reason. The cast is
%% ignored when the connection is already closing or closed.
-spec close(pid(), {integer(), binary()}) -> ok.
close(Pid, {Code, Reason}) ->
gen_statem:cast(Pid, {close, Code, Reason}).
%% @doc Assign a new controlling process.
%% In the connected state the connection process unlinks from the previous
%% owner, links to NewOwner and delivers subsequent active-mode messages to
%% NewOwner. In any other state the call returns an error tuple and the
%% owner is left unchanged.
-spec controlling_process(pid(), pid()) -> ok | {error, term()}.
controlling_process(Pid, NewOwner) ->
gen_statem:call(Pid, {controlling_process, NewOwner}).
%% @doc Return the address and port for the other end of connection.
%% The result is whatever the transport module's peername/1 returns for the
%% socket. Only answered in the connected state; other states reply with
%% {error, not_connected | upgrading | closing | closed}.
-spec peername(pid()) -> {ok, {inet:ip_address(), inet:port_number()}} | {error, term()}.
peername(Pid) ->
gen_statem:call(Pid, peername).
%% @doc Get the local address and port of the socket.
%% The result is whatever the transport module's sockname/1 returns for the
%% socket. Only answered in the connected state; other states reply with
%% {error, not_connected | upgrading | closing | closed}.
-spec sockname(pid()) -> {ok, {inet:ip_address(), inet:port_number()}} | {error, term()}.
sockname(Pid) ->
gen_statem:call(Pid, sockname).
%%====================================================================
%% gen_statem callbacks
%%====================================================================
%% @private
%% One callback function per state (idle/3, upgrading/3, connected/3,
%% closing/3, closed/3), each of which is also called with an `enter' event
%% when the state is entered.
callback_mode() ->
[state_functions, state_enter].
%% @private
%% Build the initial state data from the option map and start in `idle'.
%% `host' and `port' are mandatory (maps:get/2 fails when one is missing);
%% every other option falls back to its documented default.
init([Owner, Opts]) ->
    %% Trap exits so the death of the linked owner is delivered as an
    %% {'EXIT', Owner, Reason} message to the state functions.
    process_flag(trap_exit, true),
    Host = maps:get(host, Opts),
    Port = maps:get(port, Opts),
    Opt = fun(Key, Default) -> maps:get(Key, Opts, Default) end,
    InitialData = #ws_data{
        owner = Owner,
        host = Host,
        port = Port,
        transport = Opt(transport, hackney_tcp),
        path = Opt(path, <<"/">>),
        connect_timeout = Opt(connect_timeout, ?CONNECT_TIMEOUT),
        recv_timeout = Opt(recv_timeout, ?RECV_TIMEOUT),
        connect_options = Opt(connect_options, []),
        ssl_options = Opt(ssl_options, []),
        proxy = Opt(proxy, false),
        active = Opt(active, false),
        headers = Opt(headers, []),
        protocols = Opt(protocols, []),
        max_frame_size = Opt(max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
        max_message_size = Opt(max_message_size, ?DEFAULT_MAX_MESSAGE_SIZE)
    },
    {ok, idle, InitialData}.
%% @private
%% Release the socket (if one was ever opened) when the process stops.
%% close_socket/1 is a no-op for an undefined socket and never raises.
terminate(_Reason, _State, #ws_data{} = Data) ->
    close_socket(Data).
%% @private
%% No state migration is performed on code upgrade: the current state name
%% and state data are returned unchanged.
code_change(_OldVsn, State, Data, _Extra) ->
{ok, State, Data}.
%%====================================================================
%% State: idle
%%====================================================================
%% @private
%% idle: the process exists but no socket is open yet. The only meaningful
%% request is `connect', which opens the transport (directly or through a
%% proxy) and runs the upgrade handshake synchronously. On success the
%% machine moves to `connected'; on any failure the caller gets
%% {error, Reason} and the process stops with reason normal.
idle(enter, _PrevState, _Data) ->
    keep_state_and_data;
idle({call, Caller}, connect, Data) ->
    case open_socket(Data) of
        {error, Reason} ->
            {stop_and_reply, normal, [{reply, Caller, {error, Reason}}]};
        {ok, Socket} ->
            WithSocket = Data#ws_data{socket = Socket},
            case do_handshake(WithSocket) of
                {ok, Upgraded} ->
                    {next_state, connected, Upgraded, [{reply, Caller, ok}]};
                {error, Reason} ->
                    %% The socket is not stored in the state on this path,
                    %% so it has to be closed here.
                    close_socket(WithSocket),
                    {stop_and_reply, normal, [{reply, Caller, {error, Reason}}]}
            end
    end;
idle({call, Caller}, _Request, _Data) ->
    {keep_state_and_data, [{reply, Caller, {error, not_connected}}]};
idle(info, {'EXIT', Pid, _Reason}, #ws_data{owner = Owner}) when Pid =:= Owner ->
    %% The owner died before connecting: nothing to clean up.
    {stop, normal};
idle(_EventType, _Event, _Data) ->
    keep_state_and_data.

%% @private Open the transport connection described by the state data,
%% either directly or through the configured HTTP CONNECT / SOCKS5 proxy.
open_socket(#ws_data{host = Host, port = Port, transport = Transport,
                     connect_timeout = Timeout, connect_options = UserOpts,
                     ssl_options = SSLOpts, proxy = Proxy}) ->
    %% Only a fixed set of user-supplied socket options is accepted on top
    %% of the defaults.
    Defaults = [binary, {active, false}, {packet, 0}, {keepalive, true}, {nodelay, true}],
    Allowed = [linger, nodelay, send_timeout, send_timeout_close, raw, inet6],
    SockOpts = hackney_util:filter_options(UserOpts, Allowed, Defaults),
    %% The connect functions are given the host as a string.
    HostStr = if
        is_binary(Host) -> binary_to_list(Host);
        true -> Host
    end,
    case Proxy of
        false ->
            do_connect(Transport, HostStr, Port, SockOpts, SSLOpts, Timeout);
        {connect, ProxyHost, ProxyPort, ProxyAuth, ProxyTransport} ->
            do_connect_via_http_proxy(Transport, HostStr, Port, ProxyHost, ProxyPort,
                                      ProxyAuth, ProxyTransport, SockOpts, SSLOpts, Timeout);
        {socks5, ProxyHost, ProxyPort, ProxyAuth, ProxyTransport} ->
            do_connect_via_socks5(Transport, HostStr, Port, ProxyHost, ProxyPort,
                                  ProxyAuth, ProxyTransport, SockOpts, SSLOpts, Timeout)
    end.
%%====================================================================
%% State: upgrading (reserved for async handshake, not currently used)
%%====================================================================
%% @private
%% upgrading: every call is refused with {error, upgrading}; the process
%% stops when its owner exits and ignores everything else.
upgrading(enter, _PrevState, _Data) ->
    keep_state_and_data;
upgrading({call, Caller}, _Request, _Data) ->
    Refusal = {reply, Caller, {error, upgrading}},
    {keep_state_and_data, [Refusal]};
upgrading(info, {'EXIT', Pid, _Reason}, #ws_data{owner = Owner}) when Pid =:= Owner ->
    {stop, normal};
upgrading(_EventType, _Event, _Data) ->
    keep_state_and_data.
%%====================================================================
%% State: connected
%%====================================================================
%% @private
%% connected: the upgrade succeeded and frames can be exchanged.
%% Clause order matters: the specific call/cast/info clauses must stay ahead
%% of the catch-all clauses.
%%
%% On entry from idle/upgrading the socket is switched to the configured
%% active mode (nothing to do for passive mode).
%% NOTE(review): bytes that arrived together with the handshake response are
%% kept in #ws_data.buffer; in active mode they appear to be parsed only once
%% the next socket message arrives - confirm whether that is intended.
connected(enter, OldState, #ws_data{active = Active} = Data) when OldState =:= idle; OldState =:= upgrading ->
%% Set socket active mode if configured
case Active of
false ->
keep_state_and_data;
_ ->
_ = set_socket_active(Data, Active),
keep_state_and_data
end;
connected(enter, _OldState, _Data) ->
keep_state_and_data;
%% send: write one masked frame; a transport error closes the socket and
%% moves to closed.
connected({call, From}, {send, Frame}, Data) ->
case do_send_frame(Frame, Data) of
ok ->
{keep_state_and_data, [{reply, From, ok}]};
{error, Reason} ->
close_socket(Data),
{next_state, closed, Data, [{reply, From, {error, Reason}}]}
end;
%% recv is refused while the socket is in active mode (frames are pushed to
%% the owner as messages instead).
connected({call, From}, recv, #ws_data{active = Active}) when Active =/= false ->
{keep_state_and_data, [{reply, From, {error, {active_mode, Active}}}]};
%% Passive recv: reads from the socket inside this callback until one
%% complete frame is available, using the configured recv_timeout.
connected({call, From}, recv, #ws_data{recv_timeout = Timeout} = Data) ->
case do_recv_frame(Data, Timeout) of
{ok, Frame, Data1} ->
{keep_state, Data1, [{reply, From, {ok, Frame}}]};
{close, Code, Reason, Data1} ->
%% Server initiated close - send close response
do_send_frame({close, Code, Reason}, Data1),
close_socket(Data1),
{next_state, closed, Data1, [{reply, From, {error, {closed, Code, Reason}}}]};
{error, Reason} ->
close_socket(Data),
{next_state, closed, Data, [{reply, From, {error, Reason}}]}
end;
%% setopts: only the active option is looked at; the result of the socket
%% setopts call is deliberately ignored.
connected({call, From}, {setopts, Opts}, Data) ->
case proplists:get_value(active, Opts) of
undefined ->
{keep_state_and_data, [{reply, From, ok}]};
NewActive when NewActive =:= true; NewActive =:= false; NewActive =:= once ->
_ = set_socket_active(Data, NewActive),
{keep_state, Data#ws_data{active = NewActive}, [{reply, From, ok}]};
_ ->
{keep_state_and_data, [{reply, From, {error, badarg}}]}
end;
%% Hand the connection over: move the link from the old owner to the new one
%% so that the new owner's exit is the one that stops this process.
connected({call, From}, {controlling_process, NewOwner}, #ws_data{owner = OldOwner} = Data) ->
unlink(OldOwner),
link(NewOwner),
Data1 = Data#ws_data{owner = NewOwner},
{keep_state, Data1, [{reply, From, ok}]};
connected({call, From}, peername, #ws_data{socket = Socket, transport = Transport}) ->
Result = Transport:peername(Socket),
{keep_state_and_data, [{reply, From, Result}]};
connected({call, From}, sockname, #ws_data{socket = Socket, transport = Transport}) ->
Result = Transport:sockname(Socket),
{keep_state_and_data, [{reply, From, Result}]};
%% Any other call is unknown.
connected({call, From}, _Request, _Data) ->
{keep_state_and_data, [{reply, From, {error, badrequest}}]};
connected(cast, {close, Code, Reason}, Data) ->
%% Client-initiated close: send our close frame and wait in `closing' for
%% the peer's reply, for at most ?CLOSE_TIMEOUT ms. The send result is not
%% checked.
do_send_frame({close, Code, Reason}, Data),
{next_state, closing, Data, [{state_timeout, ?CLOSE_TIMEOUT, close_timeout}]};
connected(info, {Msg, Socket, SocketData}, #ws_data{socket = Socket} = Data)
when Msg =:= tcp; Msg =:= ssl ->
%% Active mode data
handle_active_data(SocketData, Data);
%% Peer closed the transport: tell the owner, release the socket.
connected(info, {Closed, Socket}, #ws_data{socket = Socket, owner = Owner} = Data)
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
Owner ! {hackney_ws, self(), closed},
close_socket(Data),
{next_state, closed, Data};
%% Transport error: report it to the owner as {hackney_ws_error, Pid, Reason}.
connected(info, {Error, Socket, Reason}, #ws_data{socket = Socket, owner = Owner} = Data)
when Error =:= tcp_error; Error =:= ssl_error ->
Owner ! {hackney_ws_error, self(), Reason},
close_socket(Data),
{next_state, closed, Data};
connected(info, {'EXIT', Owner, _Reason}, #ws_data{owner = Owner} = Data) ->
%% Owner died: send a close frame (best effort, result ignored), drop the
%% socket without waiting for the peer's reply, and stop.
do_send_frame(close, Data),
close_socket(Data),
{stop, normal};
%% Everything else (including messages for a stale socket) is ignored.
connected(_, _, _) ->
keep_state_and_data.
%%====================================================================
%% State: closing
%%====================================================================
%% @private
%% closing: our close frame has been sent and we wait for the peer's close
%% frame, the transport being closed, or the ?CLOSE_TIMEOUT state timeout
%% armed by the transition into this state - whichever comes first.
closing(enter, _OldState, Data) ->
    %% The peer's reply is only observed through {tcp | ssl, Socket, Bin}
    %% messages. A socket that is passive at this point (passive mode, or
    %% `once' mode after its message was consumed) would never produce one,
    %% and every graceful close would sit out the full timeout. Switch it to
    %% active for the remainder of the handshake; the result is ignored
    %% because the socket may already be unusable.
    _ = set_socket_active(Data, true),
    keep_state_and_data;
closing({call, From}, _Request, _Data) ->
    %% Every request, send and recv included, is refused while closing.
    {keep_state_and_data, [{reply, From, {error, closing}}]};
closing(cast, {close, _Code, _Reason}, _Data) ->
    %% Already closing
    keep_state_and_data;
closing(info, {Msg, Socket, SocketData}, #ws_data{socket = Socket} = Data)
  when Msg =:= tcp; Msg =:= ssl ->
    %% Look for close response
    case parse_close_response(SocketData, Data) of
        {ok, _Code, _Reason} ->
            close_socket(Data),
            {next_state, closed, Data};
        more ->
            %% Keep waiting
            keep_state_and_data
    end;
closing(info, {Closed, Socket}, #ws_data{socket = Socket} = Data)
  when Closed =:= tcp_closed; Closed =:= ssl_closed ->
    %% The peer dropped the transport instead of answering. Release our end
    %% as well (as connected/3 does); otherwise the socket stays open for as
    %% long as this process lingers in `closed'.
    close_socket(Data),
    {next_state, closed, Data};
closing(state_timeout, close_timeout, Data) ->
    %% No close reply in time: give up and drop the socket.
    close_socket(Data),
    {next_state, closed, Data};
closing(info, {'EXIT', Owner, _Reason}, #ws_data{owner = Owner}) ->
    {stop, normal};
closing(_, _, _) ->
    keep_state_and_data.
%%====================================================================
%% State: closed
%%====================================================================
%% @private
%% closed: terminal state. Calls are answered with {error, closed}, further
%% close requests are ignored, and the process stops once its owner exits.
closed(enter, _PrevState, _Data) ->
    keep_state_and_data;
closed({call, Caller}, _Request, _Data) ->
    Refusal = {reply, Caller, {error, closed}},
    {keep_state_and_data, [Refusal]};
closed(cast, {close, _Code, _Reason}, _Data) ->
    keep_state_and_data;
closed(info, {'EXIT', Pid, _Reason}, #ws_data{owner = Owner}) when Pid =:= Owner ->
    {stop, normal};
closed(_EventType, _Event, _Data) ->
    keep_state_and_data.
%%====================================================================
%% Internal functions
%%====================================================================
%% @private Connect directly to the server.
%% For hackney_tcp the TCP socket from hackney_happy:connect/4 is returned
%% as is. For hackney_ssl the TCP socket is additionally upgraded to TLS.
do_connect(hackney_tcp, Host, Port, Opts, _SSLOpts, Timeout) ->
    hackney_happy:connect(Host, Port, Opts, Timeout);
do_connect(hackney_ssl, Host, Port, Opts, SSLOpts, Timeout) ->
    case hackney_happy:connect(Host, Port, Opts, Timeout) of
        {ok, TcpSocket} ->
            tls_upgrade(TcpSocket, Host, SSLOpts, Timeout);
        NotConnected ->
            NotConnected
    end.

%% @private Run the TLS handshake on an established TCP socket. The TCP
%% socket is closed when the handshake fails so it is not leaked.
tls_upgrade(TcpSocket, Host, SSLOpts, Timeout) ->
    TlsOpts = hackney_ssl:ssl_opts(Host, SSLOpts),
    case ssl:connect(TcpSocket, TlsOpts, Timeout) of
        {ok, _TlsSocket} = Connected ->
            Connected;
        {error, _Reason} = Failed ->
            gen_tcp:close(TcpSocket),
            Failed
    end.
%% @private Connect to the target through an HTTP CONNECT proxy.
%% ProxyTransport: tcp for HTTP proxy, ssl for HTTPS proxy.
%% The caller's socket options are not used on this path. Returns
%% {ok, Socket} (the transport module reported by hackney_http_connect is
%% dropped) or {error, Reason}.
do_connect_via_http_proxy(Transport, Host, Port, ProxyHost, ProxyPort,
                          ProxyAuth, ProxyTransport, _ConnectOpts, SSLOpts, Timeout) ->
    TargetOpts = [
        {connect_host, Host},
        {connect_port, Port},
        {connect_transport, Transport},
        {proxy_transport, ProxyTransport}
    ],
    %% Proxy credentials, when configured.
    AuthOpts = case ProxyAuth of
        undefined -> [];
        {User, Pass} -> [{connect_user, User}, {connect_pass, Pass}]
    end,
    %% TLS options are only relevant for a wss:// target.
    TlsOpts = case Transport of
        hackney_ssl -> [{ssl_options, SSLOpts}];
        _ -> []
    end,
    AllOpts = TlsOpts ++ AuthOpts ++ TargetOpts,
    case hackney_http_connect:connect(ProxyHost, ProxyPort, AllOpts, Timeout) of
        {ok, {_ReportedTransport, Socket}} ->
            {ok, Socket};
        {error, _Reason} = Failed ->
            Failed
    end.
%% @private Connect to the target through a SOCKS5 proxy.
%% ProxyTransport: tcp for plain SOCKS5, ssl for SOCKS5 over TLS.
%% The caller's socket options are not used on this path. Returns
%% {ok, Socket} (the transport module reported by hackney_socks5 is dropped)
%% or {error, Reason}.
do_connect_via_socks5(Transport, Host, Port, ProxyHost, ProxyPort,
                      ProxyAuth, ProxyTransport, _ConnectOpts, SSLOpts, Timeout) ->
    ProxyOpts = [
        {socks5_host, ProxyHost},
        {socks5_port, ProxyPort},
        {socks5_transport, Transport},
        {proxy_transport, ProxyTransport}
    ],
    %% Proxy credentials, when configured.
    AuthOpts = case ProxyAuth of
        undefined -> [];
        {User, Pass} -> [{socks5_user, User}, {socks5_pass, Pass}]
    end,
    %% TLS options are only added for a hackney_ssl target.
    TlsOpts = case Transport of
        hackney_ssl -> [{ssl_options, SSLOpts}];
        _ -> []
    end,
    AllOpts = TlsOpts ++ AuthOpts ++ ProxyOpts,
    case hackney_socks5:connect(Host, Port, AllOpts, Timeout) of
        {ok, {_ReportedTransport, Socket}} ->
            {ok, Socket};
        {error, _Reason} = Failed ->
            Failed
    end.
%% @private Perform the WebSocket upgrade handshake on the open socket.
%% Builds the GET upgrade request (mandatory headers, optional
%% Sec-WebSocket-Protocol, then the caller's extra headers), validates the
%% caller-controlled parts and hands the request to do_send_handshake/3.
%% Returns {ok, Data1} or {error, Reason}.
do_handshake(#ws_data{transport = Transport, host = Host,
                      port = Port, path = Path, headers = ExtraHeaders,
                      protocols = Protocols} = Data) ->
    %% Random Sec-WebSocket-Key for this attempt
    Key = hackney_ws_proto:key(),
    HostBin = if
        is_binary(Host) -> Host;
        true -> list_to_binary(Host)
    end,
    MandatoryHeaders = [
        {<<"Host">>, host_header(HostBin, Port, Transport)},
        {<<"Upgrade">>, <<"websocket">>},
        {<<"Connection">>, <<"Upgrade">>},
        {<<"Sec-WebSocket-Key">>, Key},
        {<<"Sec-WebSocket-Version">>, <<"13">>}
    ],
    ProtocolHeaders = case Protocols of
        [] ->
            [];
        _ ->
            [{<<"Sec-WebSocket-Protocol">>, hackney_bstr:join(Protocols, <<", ">>)}]
    end,
    Headers = MandatoryHeaders ++ ProtocolHeaders ++ ExtraHeaders,
    %% GHSA-f9vr: the upgrade request is assembled by raw concatenation, so
    %% a caller-supplied host, path, sub-protocol or extra header carrying
    %% CR/LF/NUL would splice in extra header lines or rewrite the request
    %% line. Reject those bytes before anything reaches the socket.
    case valid_handshake_fields(Path, Headers) of
        {error, _} = Rejected ->
            Rejected;
        ok ->
            Request = [
                <<"GET ">>, Path, <<" HTTP/1.1\r\n">>,
                [[Name, <<": ">>, Value, <<"\r\n">>] || {Name, Value} <- Headers],
                <<"\r\n">>
            ],
            do_send_handshake(Data, Key, Request)
    end.

%% @private Value of the Host header: the port is omitted when it is the
%% default for the transport (443 for hackney_ssl, 80 otherwise).
host_header(Host, 443, hackney_ssl) ->
    Host;
host_header(Host, 80, Transport) when Transport =/= hackney_ssl ->
    Host;
host_header(Host, Port, _Transport) ->
    <<Host/binary, ":", (integer_to_binary(Port))/binary>>.
%% @private GHSA-f9vr: make sure neither the request-line path nor any
%% header name or value used in the upgrade request contains CR, LF or NUL.
%% Returns ok, or {error, invalid_handshake_header} if any field is tainted.
valid_handshake_fields(Path, Headers) ->
    %% Flatten [{Name, Value}] into [Name, Value, ...], keeping the order.
    HeaderFields = lists:foldr(fun({Name, Value}, Acc) -> [Name, Value | Acc] end,
                               [], Headers),
    case lists:any(fun has_ctl_bytes/1, [Path | HeaderFields]) of
        false -> ok;
        true -> {error, invalid_handshake_header}
    end.
%% @private True when the field contains a CR, LF or NUL byte. Lists are
%% treated as iolists and flattened first; any term that is neither a binary
%% nor a list is reported as clean.
has_ctl_bytes(Field) when is_list(Field) ->
    has_ctl_bytes(iolist_to_binary(Field));
has_ctl_bytes(Field) when is_binary(Field) ->
    case binary:match(Field, [<<"\r">>, <<"\n">>, <<0>>]) of
        nomatch -> false;
        {_Pos, _Len} -> true
    end;
has_ctl_bytes(_Other) ->
    false.
%% @private Send the assembled upgrade request and process the response.
%% Only a 101 response whose headers pass validate_handshake/2 is accepted;
%% any other status yields {error, {http_error, Status}}.
do_send_handshake(#ws_data{socket = Socket, transport = Transport} = Data, Key, Request) ->
    case Transport:send(Socket, Request) of
        {error, _} = SendError ->
            SendError;
        ok ->
            case read_handshake_response(Socket, Transport, <<>>) of
                {ok, 101, ResponseHeaders, Leftover} ->
                    finish_upgrade(ResponseHeaders, Leftover, Key, Data);
                {ok, OtherStatus, _ResponseHeaders, _Leftover} ->
                    {error, {http_error, OtherStatus}};
                {error, _} = ReadError ->
                    ReadError
            end
    end.

%% @private Validate a 101 response and record the negotiated handshake
%% values. Bytes received after the response headers are kept as the
%% initial frame buffer.
finish_upgrade(ResponseHeaders, Leftover, Key, Data) ->
    ExpectedAccept = hackney_ws_proto:encode_key(Key),
    case validate_handshake(ResponseHeaders, ExpectedAccept) of
        ok ->
            {ok, Data#ws_data{
                ws_key = Key,
                ws_accept = ExpectedAccept,
                ws_protocol = get_header_value(<<"sec-websocket-protocol">>, ResponseHeaders),
                buffer = Leftover
            }};
        {error, _} = Invalid ->
            Invalid
    end.
%% @private Read from the socket until the header terminator \r\n\r\n has
%% been received, then parse the response. Each read waits at most 10000 ms.
%% Returns {ok, Status, Headers, Rest}, where Rest is whatever followed the
%% terminator, or {error, Reason}.
read_handshake_response(Socket, Transport, Buffer) ->
    case Transport:recv(Socket, 0, 10000) of
        {error, _} = RecvError ->
            RecvError;
        {ok, Chunk} ->
            Received = <<Buffer/binary, Chunk/binary>>,
            %% Split at the first terminator only; a single-element result
            %% means the terminator has not arrived yet.
            case binary:split(Received, <<"\r\n\r\n">>) of
                [HeaderPart, Rest] ->
                    parse_handshake_response(HeaderPart, Rest);
                [_] when byte_size(Received) > ?MAX_HANDSHAKE_RESPONSE_SIZE ->
                    %% GHSA-q8jg: a server that streams bytes without ever
                    %% sending the CRLFCRLF terminator would otherwise grow
                    %% this buffer without bound.
                    {error, handshake_response_too_large};
                [_] ->
                    read_handshake_response(Socket, Transport, Received)
            end
    end.
%% @private Parse the header part of an HTTP response (everything before
%% the blank line). Returns {ok, Status, Headers, Rest} with lowercased
%% header names, or {error, Reason} when the status line is malformed.
parse_handshake_response(HeaderPart, Rest) ->
    %% binary:split/3 always yields at least one element, so the status
    %% line is always present (possibly empty).
    [StatusLine | HeaderLines] = binary:split(HeaderPart, <<"\r\n">>, [global]),
    case parse_status_line(StatusLine) of
        {ok, Status} ->
            {ok, Status, parse_headers(HeaderLines), Rest};
        {error, _} = Invalid ->
            Invalid
    end.
%% @private Parse an HTTP status line. Only HTTP/1.0 and HTTP/1.1 are
%% recognised. Returns {ok, StatusCode}, {error, invalid_status_line} or
%% {error, invalid_status_code}.
parse_status_line(<<"HTTP/1.", Minor, " ", Rest/binary>>) when Minor =:= $0; Minor =:= $1 ->
    parse_status_code(Rest);
parse_status_line(_Other) ->
    {error, invalid_status_line}.

%% @private The status code is the first three bytes, which must all be
%% ASCII digits; whatever follows them is ignored.
parse_status_code(<<A, B, C, _/binary>>) ->
    IsDigit = fun(Ch) -> Ch >= $0 andalso Ch =< $9 end,
    case lists:all(IsDigit, [A, B, C]) of
        true -> {ok, list_to_integer([A, B, C])};
        false -> {error, invalid_status_code}
    end;
parse_status_code(_TooShort) ->
    {error, invalid_status_code}.
%% @private Parse response header lines into {LowercaseName, Value} pairs,
%% in their original order. Lines without a colon are skipped.
%%
%% A field line is `Name ":" OWS Value OWS': the whitespace around the value
%% is optional. The line is therefore split at the first colon and spaces /
%% tabs around the value are trimmed. Splitting on ": " would silently drop
%% a header written as `Upgrade:websocket' and keep stray whitespace in
%% values such as Sec-WebSocket-Accept, failing an otherwise valid handshake.
parse_headers(Lines) ->
    parse_headers(Lines, []).

parse_headers([], Acc) ->
    lists:reverse(Acc);
parse_headers([Line | Rest], Acc) ->
    case binary:split(Line, <<":">>) of
        [Name, RawValue] ->
            %% Lowercase header name for case-insensitive matching
            LowerName = hackney_bstr:to_lower(Name),
            parse_headers(Rest, [{LowerName, trim_ows(RawValue)} | Acc]);
        _ ->
            parse_headers(Rest, Acc)
    end.

%% @private Strip leading and trailing spaces and horizontal tabs.
trim_ows(<<C, Rest/binary>>) when C =:= $\s; C =:= $\t ->
    trim_ows(Rest);
trim_ows(Value) ->
    trim_ows_trailing(Value, byte_size(Value)).

%% @private Drop trailing spaces/tabs from the first Size bytes of Value.
trim_ows_trailing(_Value, 0) ->
    <<>>;
trim_ows_trailing(Value, Size) ->
    case binary:at(Value, Size - 1) of
        C when C =:= $\s; C =:= $\t ->
            trim_ows_trailing(Value, Size - 1);
        _ ->
            binary:part(Value, 0, Size)
    end.
%% @private Look up a header by its (already lowercased) name. Returns the
%% value of the first matching header, or `undefined' when there is none.
get_header_value(Name, Headers) ->
    header_value(lists:keyfind(Name, 1, Headers)).

%% @private Unwrap the result of lists:keyfind/3.
header_value(false) ->
    undefined;
header_value({_Name, Value}) ->
    Value.
%% @private Validate the headers of a 101 response.
%% The Upgrade header must be present and equal "websocket" (compared case
%% insensitively) and Sec-WebSocket-Accept must equal ExpectedAccept exactly.
%% Returns ok, {error, missing_upgrade}, {error, {invalid_upgrade, Value}} or
%% {error, {invalid_accept, Received, Expected}}.
validate_handshake(Headers, ExpectedAccept) ->
    case get_header_value(<<"upgrade">>, Headers) of
        undefined ->
            {error, missing_upgrade};
        Upgrade when is_binary(Upgrade) ->
            case hackney_bstr:to_lower(Upgrade) =:= <<"websocket">> of
                false ->
                    {error, {invalid_upgrade, Upgrade}};
                true ->
                    Accept = get_header_value(<<"sec-websocket-accept">>, Headers),
                    check_accept(Accept, ExpectedAccept)
            end
    end.

%% @private Compare the received Sec-WebSocket-Accept (or `undefined' when
%% the header is absent) with the expected value.
check_accept(Accept, Accept) ->
    ok;
check_accept(Received, Expected) ->
    {error, {invalid_accept, Received, Expected}}.
%% @private Encode a frame and write it to the socket. Returns whatever the
%% transport's send/2 returns.
do_send_frame(Frame, #ws_data{} = Data) ->
    #ws_data{socket = Socket, transport = Transport, extensions = Exts} = Data,
    %% Frames sent by a client must be masked.
    Transport:send(Socket, hackney_ws_proto:masked_frame(Frame, Exts)).
%% @private Receive one WebSocket frame in passive mode, starting from the
%% bytes already buffered in the state and reading from the socket whenever
%% the header is incomplete.
%% Returns {ok, Frame, Data1}, {close, Code, Reason, Data1} or {error, Reason}.
do_recv_frame(#ws_data{buffer = Buffered} = Data, Timeout) ->
    do_recv_frame(Buffered, Data, Timeout).

do_recv_frame(Pending, #ws_data{} = Data, Timeout) ->
    #ws_data{socket = Socket, transport = Transport,
             frag_state = FragState, extensions = Exts,
             utf8_state = Utf8State, max_frame_size = MaxFrame} = Data,
    case hackney_ws_proto:parse_header(Pending, Exts, FragState) of
        error ->
            {error, invalid_frame};
        more ->
            %% Header incomplete: read more bytes and try again.
            case Transport:recv(Socket, 0, Timeout) of
                {ok, Chunk} ->
                    do_recv_frame(<<Pending/binary, Chunk/binary>>, Data, Timeout);
                {error, _} = RecvError ->
                    RecvError
            end;
        {_Type, _NextFrag, _Rsv, Len, _MaskKey, _Rest}
          when is_integer(Len), is_integer(MaxFrame), Len > MaxFrame ->
            %% GHSA-q8jg: refuse an oversized declared frame length before
            %% buffering its payload (a peer may announce up to 2^63-1).
            {error, {frame_too_big, Len}};
        {Type, NextFrag, Rsv, Len, MaskKey, Rest} ->
            parse_payload(Type, NextFrag, Rsv, Len, MaskKey, Rest, Data, Timeout, Utf8State)
    end.
%% @private Parse frame payload (passive mode), reading from the socket
%% until the whole payload is available.
%% hackney_ws_proto:parse_payload/9 signature:
%% parse_payload(Data, MaskKey, Utf8State, ParsedLen, Type, Len, FragState, Extensions, Rsv)
%% Returns:
%% {ok, Payload, Utf8State, Rest} - non-close frame completed
%% {ok, CloseCode, Payload, Utf8State, Rest} - close frame completed
%% {more, Payload, Utf8State} - need more data
%% {more, CloseCode, Payload, Utf8State} - need more data for close
%% {error, Reason}
%%
%% This function returns {ok, Frame, Data1}, {close, Code, Reason, Data1} or
%% {error, Reason}. Utf8State is the validation state at the START of the
%% frame being parsed.
parse_payload(Type, FragState1, Rsv, Len, MaskKey, Buffer,
              #ws_data{extensions = Exts, frag_buffer = FragBuffer,
                       frag_size = FragSize} = Data,
              Timeout, Utf8State) ->
    %% ParsedLen = 0: the payload is always parsed from its first byte
    case hackney_ws_proto:parse_payload(Buffer, MaskKey, Utf8State, 0, Type, Len, FragState1, Exts, Rsv) of
        {ok, CloseCode, Payload, Utf8State1, Rest} when Type =:= close ->
            %% Close frame with code
            Data1 = Data#ws_data{buffer = Rest, frag_state = undefined,
                                 frag_buffer = [], frag_size = 0,
                                 utf8_state = Utf8State1},
            {close, CloseCode, Payload, Data1};
        {ok, Payload, Utf8State1, Rest} when Type =:= ping; Type =:= pong; Type =:= close ->
            %% Control frames may be injected between the fragments of a
            %% data message (RFC 6455 section 5.4) and are never part of it.
            %% Dispatch them on their own and leave the fragmentation state
            %% and the fragments buffered so far untouched, instead of
            %% appending the control payload to the message.
            Frame = hackney_ws_proto:make_frame(Type, Payload, undefined, undefined),
            Data1 = Data#ws_data{buffer = Rest, utf8_state = Utf8State1},
            handle_received_frame(Frame, Data1, Timeout);
        {ok, Payload, Utf8State1, Rest} ->
            %% Data frame (or fragment) completed
            case FragState1 of
                {nofin, _FragType, _Rsv} ->
                    %% This is a fragment, accumulate
                    NewFragSize = FragSize + byte_size(Payload),
                    case message_within_limit(NewFragSize, Data) of
                        ok ->
                            Data1 = Data#ws_data{buffer = Rest, frag_state = FragState1,
                                                 frag_buffer = [Payload | FragBuffer],
                                                 frag_size = NewFragSize,
                                                 utf8_state = Utf8State1},
                            do_recv_frame(Rest, Data1, Timeout);
                        Err ->
                            Err
                    end;
                {fin, FragType, _Rsv} ->
                    %% Final fragment - assemble full message
                    AllPayloads = lists:reverse([Payload | FragBuffer]),
                    FullPayload = iolist_to_binary(AllPayloads),
                    Frame = hackney_ws_proto:make_frame(FragType, FullPayload, undefined, undefined),
                    Data1 = Data#ws_data{buffer = Rest, frag_state = undefined,
                                         frag_buffer = [], frag_size = 0,
                                         utf8_state = Utf8State1},
                    handle_received_frame(Frame, Data1, Timeout);
                undefined ->
                    %% Complete non-fragmented frame
                    Frame = hackney_ws_proto:make_frame(Type, Payload, undefined, undefined),
                    Data1 = Data#ws_data{buffer = Rest, frag_state = undefined,
                                         frag_buffer = [], frag_size = 0,
                                         utf8_state = Utf8State1},
                    handle_received_frame(Frame, Data1, Timeout)
            end;
        {more, _PartialPayload, _Utf8State1} ->
            %% Need more data for payload
            retry_payload(Type, FragState1, Rsv, Len, MaskKey, Buffer, Data, Timeout, Utf8State);
        {more, _CloseCode, _PartialPayload, _Utf8State1} ->
            %% Need more data for close frame payload
            retry_payload(Type, FragState1, Rsv, Len, MaskKey, Buffer, Data, Timeout, Utf8State);
        {error, Reason} ->
            {error, Reason}
    end.

%% @private Read more bytes and parse the payload again from its first byte.
%% The partial result of the previous attempt is discarded, so the retry
%% must start from the same UTF-8 state as the first attempt.
%% NOTE(review): assumes the state returned with `more' reflects the bytes
%% already validated (as in cow_ws); reusing it here would then validate the
%% prefix twice and reject valid text split inside a multi-byte character.
%% If it is returned unchanged, this is equivalent to the previous code.
retry_payload(Type, FragState1, Rsv, Len, MaskKey, Buffer,
              #ws_data{socket = Socket, transport = Transport} = Data,
              Timeout, Utf8State) ->
    case Transport:recv(Socket, 0, Timeout) of
        {ok, MoreData} ->
            parse_payload(Type, FragState1, Rsv, Len, MaskKey,
                          <<Buffer/binary, MoreData/binary>>,
                          Data, Timeout, Utf8State);
        {error, Reason} ->
            {error, Reason}
    end.
%% @private Act on a frame received in passive mode.
%% Pings are answered with a pong and hidden from the caller: reading
%% continues with the next frame. Close frames are reported as
%% {close, Code, Reason, Data} (code 1000 and an empty reason when the frame
%% carried none). Every other frame is returned as {ok, Frame, Data}.
handle_received_frame(ping, Data, Timeout) ->
    do_send_frame(pong, Data),
    do_recv_frame(Data#ws_data.buffer, Data, Timeout);
handle_received_frame({ping, Payload}, Data, Timeout) ->
    do_send_frame({pong, Payload}, Data),
    do_recv_frame(Data#ws_data.buffer, Data, Timeout);
handle_received_frame(close, Data, _Timeout) ->
    {close, 1000, <<>>, Data};
handle_received_frame({close, Code, Reason}, Data, _Timeout) ->
    {close, Code, Reason, Data};
handle_received_frame(Frame, Data, _Timeout) ->
    {ok, Frame, Data}.
%% @private Handle data delivered by the socket in active mode.
%% The bytes are appended to the buffer and every complete frame is sent to
%% the owner as {hackney_ws, self(), Frame}. A close frame is forwarded to
%% the owner, echoed to the peer and moves the machine to `closed'; a parse
%% error is reported as {hackney_ws_error, self(), Reason}.
%% Returns a gen_statem state-function result.
handle_active_data(SocketData, #ws_data{buffer = Buffer, owner = Owner, active = Active} = Data) ->
    Data1 = Data#ws_data{buffer = <<Buffer/binary, SocketData/binary>>},
    case parse_active_frames(Data1) of
        {ok, Frames, Data2} ->
            %% Deliver frames to owner
            lists:foreach(fun(Frame) ->
                Owner ! {hackney_ws, self(), Frame}
            end, Frames),
            %% Handle active mode
            case {Active, Frames} of
                {once, []} ->
                    %% The packet did not complete a frame (partial frame or
                    %% a non-final fragment), so the owner has not received
                    %% its one message yet. The socket went passive after
                    %% this delivery; re-arm it, otherwise the connection
                    %% would stall with nothing ever delivered.
                    _ = set_socket_active(Data2, once),
                    {keep_state, Data2};
                {once, _} ->
                    _ = set_socket_active(Data2, false),
                    {keep_state, Data2#ws_data{active = false}};
                {Mode, _} when Mode =:= true; Mode =:= false ->
                    {keep_state, Data2}
            end;
        {close, Code, Reason, Data2} ->
            Owner ! {hackney_ws, self(), {close, Code, Reason}},
            %% Echo the close frame (result ignored) and drop the socket.
            do_send_frame({close, Code, Reason}, Data2),
            close_socket(Data2),
            {next_state, closed, Data2};
        {error, Reason} ->
            Owner ! {hackney_ws_error, self(), Reason},
            close_socket(Data),
            {next_state, closed, Data}
    end.
%% @private Parse every complete frame currently in the buffer (active mode).
%% Never reads from the socket: an incomplete header or payload ends the
%% loop and leaves the remaining bytes buffered. Pings are answered with a
%% pong and also included in the result.
%% Returns {ok, Frames, Data1} with Frames in arrival order,
%% {close, Code, Reason, Data1} or {error, Reason}.
parse_active_frames(Data) ->
    parse_active_frames(Data, []).

parse_active_frames(#ws_data{buffer = Buffer, frag_state = FragState,
                             extensions = Exts, utf8_state = Utf8State,
                             max_frame_size = MaxFrame} = Data, Parsed) ->
    case hackney_ws_proto:parse_header(Buffer, Exts, FragState) of
        more ->
            {ok, lists:reverse(Parsed), Data};
        error ->
            {error, invalid_frame};
        {_Type, _NextFrag, _Rsv, Len, _MaskKey, _Rest}
          when is_integer(Len), is_integer(MaxFrame), Len > MaxFrame ->
            %% GHSA-q8jg: refuse an oversized declared frame length.
            {error, {frame_too_big, Len}};
        {Type, NextFrag, Rsv, Len, MaskKey, Rest} ->
            case parse_active_payload(Type, NextFrag, Rsv, Len, MaskKey, Rest, Data, Utf8State) of
                {ok, ping, Data1} ->
                    do_send_frame(pong, Data1),
                    parse_active_frames(Data1, [ping | Parsed]);
                {ok, {ping, PingData} = Ping, Data1} ->
                    do_send_frame({pong, PingData}, Data1),
                    parse_active_frames(Data1, [Ping | Parsed]);
                {ok, close, Data1} ->
                    {close, 1000, <<>>, Data1};
                {ok, {close, Code, Reason}, Data1} ->
                    {close, Code, Reason, Data1};
                {ok, Frame, Data1} ->
                    parse_active_frames(Data1, [Frame | Parsed]);
                {fragment, Data1} ->
                    %% Non-final fragment buffered; nothing to deliver yet.
                    parse_active_frames(Data1, Parsed);
                more ->
                    %% Payload incomplete: keep the whole frame buffered.
                    {ok, lists:reverse(Parsed), Data};
                {error, _} = Failed ->
                    Failed;
                error ->
                    {error, invalid_payload}
            end
    end.
%% @private GHSA-q8jg: check the cumulative size of a fragmented message
%% against max_message_size, so a peer cannot stream unbounded non-final
%% continuation frames. Returns ok, or {error, {message_too_big, Size}};
%% the limit `infinity' disables the check.
message_within_limit(Size, #ws_data{max_message_size = Max}) ->
    case Max =/= infinity andalso Size > Max of
        true -> {error, {message_too_big, Size}};
        false -> ok
    end.
%% @private Parse payload in active mode (non-blocking: the socket is never
%% read here).
%% Returns:
%% {ok, Frame, Data1} - a complete frame (or reassembled message)
%% {fragment, Data1} - a non-final fragment was buffered
%% more - the payload is not complete yet
%% {error, {message_too_big, Size}} - fragmented message over the limit
%% error - the payload could not be parsed
parse_active_payload(Type, FragState1, Rsv, Len, MaskKey, Buffer,
                     #ws_data{extensions = Exts, frag_buffer = FragBuffer,
                              frag_size = FragSize} = Data,
                     Utf8State) ->
    case hackney_ws_proto:parse_payload(Buffer, MaskKey, Utf8State, 0, Type, Len, FragState1, Exts, Rsv) of
        {ok, CloseCode, Payload, Utf8State1, Rest} when Type =:= close ->
            %% Close frame with code
            Data1 = Data#ws_data{buffer = Rest, frag_state = undefined,
                                 frag_buffer = [], frag_size = 0,
                                 utf8_state = Utf8State1},
            {ok, {close, CloseCode, Payload}, Data1};
        {ok, Payload, Utf8State1, Rest} when Type =:= ping; Type =:= pong; Type =:= close ->
            %% Control frames may be injected between the fragments of a
            %% data message (RFC 6455 section 5.4) and are never part of it.
            %% Report them on their own and leave the fragmentation state
            %% and the fragments buffered so far untouched, instead of
            %% appending the control payload to the message.
            Frame = hackney_ws_proto:make_frame(Type, Payload, undefined, undefined),
            {ok, Frame, Data#ws_data{buffer = Rest, utf8_state = Utf8State1}};
        {ok, Payload, Utf8State1, Rest} ->
            %% Data frame (or fragment) completed
            case FragState1 of
                {nofin, _FragType, _Rsv} ->
                    %% This is a fragment, accumulate
                    NewFragSize = FragSize + byte_size(Payload),
                    case message_within_limit(NewFragSize, Data) of
                        ok ->
                            Data1 = Data#ws_data{buffer = Rest, frag_state = FragState1,
                                                 frag_buffer = [Payload | FragBuffer],
                                                 frag_size = NewFragSize,
                                                 utf8_state = Utf8State1},
                            {fragment, Data1};
                        Err ->
                            Err
                    end;
                {fin, FragType, _Rsv} ->
                    %% Final fragment
                    AllPayloads = lists:reverse([Payload | FragBuffer]),
                    FullPayload = iolist_to_binary(AllPayloads),
                    Frame = hackney_ws_proto:make_frame(FragType, FullPayload, undefined, undefined),
                    Data1 = Data#ws_data{buffer = Rest, frag_state = undefined,
                                         frag_buffer = [], frag_size = 0,
                                         utf8_state = Utf8State1},
                    {ok, Frame, Data1};
                undefined ->
                    %% Complete non-fragmented frame
                    Frame = hackney_ws_proto:make_frame(Type, Payload, undefined, undefined),
                    Data1 = Data#ws_data{buffer = Rest, frag_state = undefined,
                                         frag_buffer = [], frag_size = 0,
                                         utf8_state = Utf8State1},
                    {ok, Frame, Data1}
            end;
        {more, _PartialPayload, _Utf8State1} ->
            %% The partial result is dropped; the caller keeps the whole
            %% frame buffered and parses it again once more bytes arrive.
            more;
        {more, _CloseCode, _PartialPayload, _Utf8State1} ->
            more;
        {error, _Reason} ->
            error
    end.
%% @private Look for the peer's close frame while in the closing state.
%% parse_payload(Data, MaskKey, Utf8State, ParsedLen, Type, Len, FragState, Exts, Rsv)
%% Returns {ok, Code, Reason} as soon as the pending bytes start with a
%% close frame header (code 1000 and an empty reason when its payload does
%% not parse to a complete close), and `more' for anything else.
parse_close_response(SocketData, #ws_data{buffer = Buffer, extensions = Exts}) ->
    Pending = <<Buffer/binary, SocketData/binary>>,
    case hackney_ws_proto:parse_header(Pending, Exts, undefined) of
        {close, FragState, Rsv, Len, MaskKey, Body} ->
            close_reply(hackney_ws_proto:parse_payload(Body, MaskKey, 0, 0, close, Len,
                                                       FragState, Exts, Rsv));
        _NotAClose ->
            more
    end.

%% @private Map the payload parse result of a close frame to {ok, Code, Reason}.
close_reply({ok, Code, Reason, _Utf8State, _Rest}) ->
    {ok, Code, Reason};
close_reply(_Incomplete) ->
    {ok, 1000, <<>>}.
%% @private Set the `active' option on the underlying socket, through ssl
%% for hackney_ssl connections and through inet otherwise. Returns the
%% result of the setopts call.
set_socket_active(#ws_data{socket = Socket, transport = Transport}, Mode) ->
    ActiveOpt = [{active, Mode}],
    case Transport of
        hackney_ssl -> ssl:setopts(Socket, ActiveOpt);
        _ -> inet:setopts(Socket, ActiveOpt)
    end.
%% @private Close the socket, if there is one. Always returns ok.
close_socket(#ws_data{socket = undefined}) ->
    ok;
close_socket(#ws_data{socket = Socket, transport = Transport}) ->
    %% Best effort on purpose: the socket may already be closed, and a
    %% failure to close must not take the connection process down.
    try Transport:close(Socket) of
        _ -> ok
    catch
        _:_ -> ok
    end.