-module(roadrunner_conn_loop_http2).
-moduledoc false.
%% HTTP/2 (RFC 9113) connection process.
%%
%% Driven by either TLS ALPN-negotiated `h2` or a plaintext listener
%% configured with `protocols => [http2]` (RFC 7540 §3.4 prior-knowledge).
%% The dispatch decision lives in `roadrunner_conn_loop:awaiting_shoot/3`;
%% this module is transport-agnostic and reads frames via
%% `roadrunner_transport:recv/3` either way.
%%
%% Phase H8b — true multiplexing with per-stream workers. The conn
%% process owns:
%%
%% - the active-mode socket (sole reader / writer of bytes),
%% - the HPACK encoder / decoder (single context per direction
%% shared across all streams),
%% - the connection-level flow-control windows and per-stream
%% windows / pending-send queues,
%% - a `streams` map keyed by stream id, and `worker_refs` mapping
%% monitor refs back to stream ids for `'DOWN'` correlation.
%%
%% Once a request stream finishes receiving (HEADERS + body +
%% END_STREAM), the conn spawns a `roadrunner_http2_stream_worker`
%% process. The worker resolves the route, runs middleware + handler,
%% and translates the response into messages back to the conn:
%%
%% ```
%% {h2_send_headers, Worker, Ref, StreamId, Status, Headers, EndStream}
%% {h2_send_data, Worker, Ref, StreamId, Data, EndStream}
%% {h2_send_trailers, Worker, Ref, StreamId, Trailers}
%% {h2_send_response, Worker, Ref, StreamId, Status, Headers, Body}
%% {h2_worker_done, StreamId}
%% ```
%%
%% The conn replies `{h2_send_ack, Ref}` once the corresponding
%% frame(s) are on the wire — so workers are synchronously
%% back-pressured against flow control without buffering.
%%
%% Workers are spawn_monitored (NOT linked) so a handler crash
%% resets only the affected stream — `'DOWN'` triggers
%% `RST_STREAM(INTERNAL_ERROR)` and the other in-flight streams
%% keep running. `MAX_CONCURRENT_STREAMS=100` is advertised; HEADERS
%% beyond that limit get `RST_STREAM(REFUSED_STREAM)`.
%%
%% Response shapes supported:
%%
%% | shape | h2 |
%% |---|---|
%% | `{Status, Headers, Body}` (buffered) | yes |
%% | `{stream, _, _, Fun}` | yes |
%% | `{loop, _}` | yes (worker enters handle_info loop) |
%% | `{sendfile, _}` | yes (chunked DATA via the stream-response engine) |
%% | `{websocket, _, _}` | 501 (Phase H13) |
-export([enter/5]).
%% RFC 9113 §3.4 client connection preface — fixed 24 bytes.
-define(PREFACE, ~"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n").
-define(PREFACE_LEN, 24).
%% Read-deadline caps for the handshake and idle states. Tests
%% override via `persistent_term:put/2` on `{?MODULE, handshake_timeout}`
%% / `{?MODULE, idle_timeout}` so the timeout branches are exercisable
%% without forcing 10–30 s waits per case.
-define(HANDSHAKE_TIMEOUT_DEFAULT, 10_000).
-define(IDLE_TIMEOUT_DEFAULT, 30_000).
%% RFC 9113 §6.5.2 default `MAX_FRAME_SIZE`.
-define(MAX_FRAME_SIZE, 16_384).
%% RFC 9113 §6.9.2 initial window size for both the connection and
%% each stream.
-define(INITIAL_WINDOW, 65535).
%% Default refill threshold + recv-window peaks. RFC 9113 doesn't
%% mandate any of these — they're policy. The defaults match the
%% RFC 9113 §6.9.2 baseline (65535 for both windows; the threshold
%% is half of that, the original heuristic).
%% `roadrunner_listener:opts()` carries the override knobs as nested
%% `http2` sub-opts (`conn_window`, `stream_window`,
%% `window_refill_threshold`) — bumping the windows is the standard
%% tuning for non-LAN RTTs (window/RTT bounds per-stream
%% throughput). Defaults here mirror those baselines for static
%% record initialization; runtime values come from the proto_opts
%% map populated by `roadrunner_listener:normalize_protocols/1`.
-define(DEFAULT_CONN_RECV_WINDOW, 65535).
-define(DEFAULT_STREAM_RECV_WINDOW, 65535).
-define(DEFAULT_WINDOW_REFILL_THRESHOLD, 32_768).
%% Hard upper bound on a flow-control window per RFC 9113 §6.9.1
%% (signed 31-bit). Increments that would push past this are a
%% FLOW_CONTROL_ERROR.
-define(MAX_WINDOW, 16#7FFFFFFF).
%% Phase H8b: lift from 1 (serial) to 100 concurrent streams.
%% Clients exceeding this on this connection get
%% RST_STREAM(REFUSED_STREAM) on the over-limit HEADERS.
-define(MAX_CONCURRENT_STREAMS, 100).
-define(GOAWAY(LastStreamId, ErrorCode),
roadrunner_http2_frame:encode({goaway, (LastStreamId), (ErrorCode), <<>>})
).
-type stream_id() :: pos_integer().
-type stream_state() ::
open | half_closed_remote | half_closed_local | closed.
%% Pending-send entry. Only DATA frames are ever queued —
%% HEADERS / trailer HEADERS write straight to the wire (HPACK
%% encoding is window-independent). Workers are synchronous so at
%% most one entry can be pending per stream at a time; the stream
%% entry still holds them in a `queue:queue/1` to leave room for
%% future enrichment without reshaping callers.
-type pending_send() ::
{data, reference(), pid(), iodata(), boolean()}.
-type stream_entry() :: #{
state := stream_state(),
header_fragment := binary(),
end_headers := boolean(),
end_stream_seen := boolean(),
headers := undefined | roadrunner_http:headers(),
body := iolist(),
%% Cumulative byte count of received DATA payload, used to
%% validate against the request's `content-length` header at
%% END_STREAM (RFC 9113 §8.1.2.6).
body_len := non_neg_integer(),
send_window := integer(),
recv_window := non_neg_integer(),
worker_pid := undefined | pid(),
worker_ref := undefined | reference(),
pending_sends := queue:queue(pending_send())
}.
%% Connection-process state threaded through every loop function in
%% this module.
-record(loop, {
%% Transport socket handle. `enter/5` documents that this process
%% owns the socket from that point on.
socket :: roadrunner_transport:socket(),
%% Listener protocol options; also handed to every stream worker at
%% dispatch time.
proto_opts :: roadrunner_conn:proto_opts(),
%% Listener name and peer address — used for the process label and
%% copied into each request's context map.
listener_name :: atom(),
peer :: {inet:ip_address(), inet:port_number()} | undefined,
%% Start timestamp supplied by the caller of `enter/5`.
%% NOTE(review): presumably a monotonic-clock reading consumed by
%% the close telemetry — confirm against `exit_clean/1`.
start_mono :: integer(),
%% Request scheme, derived from the socket via
%% `roadrunner_conn:scheme/1` in `enter/5`.
scheme :: http | https,
%% Active-mode message tags for this transport.
msg_data :: atom(),
msg_closed :: atom(),
msg_error :: atom(),
%% Inbound bytes still to parse.
buffer = <<>> :: binary(),
%% HPACK contexts.
hpack_dec :: roadrunner_http2_hpack:context(),
hpack_enc :: roadrunner_http2_hpack:context(),
%% Highest stream id we've processed — for the LAST_STREAM_ID
%% in GOAWAY.
last_stream_id = 0 :: non_neg_integer(),
%% Pre-generated CSPRNG bytes for `request_id`, sliced 8 bytes
%% at a time. Refilled by `roadrunner_conn:generate_request_id/1`
%% in 256-byte batches so h2 conns amortize the crypto cost
%% across ~32 requests instead of paying it per dispatch.
req_id_buffer = <<>> :: binary(),
%% Connection-level flow-control windows (RFC 9113 §5.2). Send
%% window is set by peer SETTINGS / WINDOW_UPDATE. Recv window
%% starts at 65535 (RFC default) and is bumped to
%% `recv_window_peak` via an early `WINDOW_UPDATE(0, _)` in
%% `handshake/1` when the peak is greater. Refilled to the peak
%% whenever it drops below `recv_window_threshold`.
conn_send_window = 65535 :: integer(),
conn_recv_window = 65535 :: non_neg_integer(),
%% Configured peaks + refill threshold. Read from proto_opts at
%% `enter/5`. See the `?DEFAULT_*` macros above for the policy
%% baseline + the listener moduledoc for the override knobs.
recv_window_peak = ?DEFAULT_CONN_RECV_WINDOW :: pos_integer(),
stream_recv_window_peak = ?DEFAULT_STREAM_RECV_WINDOW :: pos_integer(),
recv_window_threshold = ?DEFAULT_WINDOW_REFILL_THRESHOLD :: pos_integer(),
%% Stream table, keyed by stream id.
streams = #{} :: #{stream_id() => stream_entry()},
%% Worker monitor ref → stream id, for DOWN correlation.
worker_refs = #{} :: #{reference() => stream_id()},
%% Set to a stream id while a HEADERS / PUSH_PROMISE block is
%% still being assembled (no END_HEADERS yet). The next inbound
%% frame MUST be a CONTINUATION on the same stream — anything
%% else is a connection error per RFC 9113 §6.10.
awaiting_continuation = undefined :: undefined | stream_id(),
%% Peer-advertised SETTINGS_INITIAL_WINDOW_SIZE for stream send
%% windows. Default per §6.9.2 is 65535. New streams use this
%% value; existing stream send-windows shift by the delta when
%% the peer changes the setting (§6.9.2).
peer_initial_window = 65535 :: integer(),
%% Set to `true` once a `{roadrunner_drain, _}` message has
%% been observed (Phase H9). In drain mode we've already sent
%% GOAWAY(NO_ERROR), refuse fresh streams with
%% RST_STREAM(REFUSED_STREAM), and exit as soon as the streams
%% map empties.
draining = false :: boolean()
}).
-doc """
Entry point reached from the HTTP/1.1 dispatch fork once the connection
is going to speak HTTP/2. From here on this process owns the socket and
is responsible for releasing the listener slot and firing the
`[roadrunner, listener, conn_close]` telemetry event.

Builds the initial connection state (transport message tags, fresh HPACK
contexts, flow-control knobs read from `ProtoOpts`) and starts the
handshake. Never returns.
""".
-spec enter(
    roadrunner_transport:socket(),
    roadrunner_conn:proto_opts(),
    atom(),
    {inet:ip_address(), inet:port_number()} | undefined,
    integer()
) -> no_return().
enter(Socket, ProtoOpts, ListenerName, Peer, StartMono) ->
    proc_lib:set_label({roadrunner_conn_loop_http2, ListenerName, Peer}),
    Scheme = roadrunner_conn:scheme(Socket),
    {DataTag, ClosedTag, ErrorTag} = roadrunner_transport:messages(Socket),
    %% The listener flattens the h2 window knobs onto the top level of
    %% proto_opts with an `http2_` prefix. The fallbacks mirror the
    %% listener defaults so hand-built proto_opts maps can omit them.
    Knob = fun(Key, Default) -> maps:get(Key, ProtoOpts, Default) end,
    handshake(#loop{
        socket = Socket,
        proto_opts = ProtoOpts,
        listener_name = ListenerName,
        peer = Peer,
        start_mono = StartMono,
        scheme = Scheme,
        msg_data = DataTag,
        msg_closed = ClosedTag,
        msg_error = ErrorTag,
        hpack_dec = roadrunner_http2_hpack:new_decoder(4096),
        hpack_enc = roadrunner_http2_hpack:new_encoder(4096),
        recv_window_peak = Knob(http2_conn_window, ?DEFAULT_CONN_RECV_WINDOW),
        stream_recv_window_peak = Knob(http2_stream_window, ?DEFAULT_STREAM_RECV_WINDOW),
        recv_window_threshold = Knob(
            http2_window_refill_threshold, ?DEFAULT_WINDOW_REFILL_THRESHOLD
        )
    }).
%% =============================================================================
%% Handshake — RFC 9113 §3.4
%% =============================================================================
%% Server half of the connection preface: send our SETTINGS, widen the
%% connection-level receive window if configured, then wait for the
%% client preface.
-spec handshake(#loop{}) -> no_return().
handshake(State) ->
    #loop{recv_window_peak = ConnPeak, stream_recv_window_peak = StreamPeak} = State,
    _ = send(State, server_settings_frame(StreamPeak)),
    %% SETTINGS_INITIAL_WINDOW_SIZE only governs stream-level windows
    %% (RFC 9113 §6.9.2). The connection-level window stays at 65535
    %% until an explicit WINDOW_UPDATE on stream 0, so emit one here
    %% when the configured peak is larger.
    Ready =
        if
            ConnPeak > ?INITIAL_WINDOW ->
                Bump = {window_update, 0, ConnPeak - ?INITIAL_WINDOW},
                _ = send(State, roadrunner_http2_frame:encode(Bump)),
                State#loop{conn_recv_window = ConnPeak};
            true ->
                State
        end,
    handshake_phase_preface(Ready).
%% Encode the server's initial SETTINGS frame.
%%
%% Setting ids (RFC 9113 §6.5.2): 3 = MAX_CONCURRENT_STREAMS,
%% 4 = INITIAL_WINDOW_SIZE, 5 = MAX_FRAME_SIZE. Ids 3 and 5 are always
%% sent; id 4 is prepended only when the configured stream receive peak
%% exceeds the 65535 default, so peer-opened streams start with the
%% larger allowance.
-spec server_settings_frame(pos_integer()) -> iodata().
server_settings_frame(StreamPeak) ->
    %% Filter-only comprehension: yields one element or none.
    WindowSetting = [{4, StreamPeak} || StreamPeak > ?INITIAL_WINDOW],
    Always = [{3, ?MAX_CONCURRENT_STREAMS}, {5, ?MAX_FRAME_SIZE}],
    roadrunner_http2_frame:encode({settings, 0, WindowSetting ++ Always}).
%% Read deadline (ms) for each handshake receive. Looked up in
%% persistent_term so tests can shorten it; falls back to the
%% compile-time default.
-spec handshake_timeout() -> non_neg_integer().
handshake_timeout() ->
    Key = {?MODULE, handshake_timeout},
    persistent_term:get(Key, ?HANDSHAKE_TIMEOUT_DEFAULT).
%% Idle deadline (ms) for the main receive loop. Looked up in
%% persistent_term so tests can shorten it; falls back to the
%% compile-time default.
-spec idle_timeout() -> non_neg_integer().
idle_timeout() ->
    Key = {?MODULE, idle_timeout},
    persistent_term:get(Key, ?IDLE_TIMEOUT_DEFAULT).
%% Wait for and verify the 24-byte client connection preface.
%%
%% - buffer holds at least 24 bytes and they match: strip them and move
%%   on to the client's SETTINGS frame;
%% - buffer holds at least 24 bytes and they differ: close without
%%   sending anything;
%% - fewer than 24 bytes buffered: read more and re-enter.
-spec handshake_phase_preface(#loop{}) -> no_return().
handshake_phase_preface(#loop{buffer = Buf} = State) ->
    case Buf of
        <<Candidate:?PREFACE_LEN/binary, Tail/binary>> when Candidate =:= ?PREFACE ->
            handshake_phase_settings(State#loop{buffer = Tail});
        <<_:?PREFACE_LEN/binary, _/binary>> ->
            exit_clean(State);
        _ ->
            handshake_recv(State, fun handshake_phase_preface/1)
    end.
%% Second handshake step: the first frame after the client preface must
%% be a non-ACK SETTINGS frame (RFC 9113 §3.4).
%%
%% The frame is handed to the regular `handle_frame/2` SETTINGS clause
%% rather than being ACKed blindly. The parameters a client sends in its
%% preface SETTINGS (notably SETTINGS_INITIAL_WINDOW_SIZE) are binding,
%% so they must go through the same validation and application path as
%% any later SETTINGS frame. That clause sends the ACK (or a GOAWAY on
%% invalid values) and then enters the frame loop.
%%
%% Any other frame, or a parse error, is a connection error
%% PROTOCOL_ERROR. An incomplete frame triggers another read.
-spec handshake_phase_settings(#loop{}) -> no_return().
handshake_phase_settings(#loop{buffer = Buf} = State) ->
    case roadrunner_http2_frame:parse(Buf, ?MAX_FRAME_SIZE) of
        {ok, {settings, Flags, Params}, Rest} when (Flags band 1) =:= 0 ->
            %% Pass flags as 0: only the ACK bit is meaningful and the
            %% guard has established that it is clear.
            handle_frame({settings, 0, Params}, State#loop{buffer = Rest});
        {ok, _, _} ->
            _ = send_goaway(State, protocol_error),
            exit_clean(State);
        {more, _Need} ->
            handshake_recv(State, fun handshake_phase_settings/1);
        {error, _} ->
            _ = send_goaway(State, protocol_error),
            exit_clean(State)
    end.
%% Arm the socket for one active-mode delivery and wait for it.
%%
%% On data, the bytes are appended to the buffer and `Cont` (the
%% handshake phase that asked for more) is re-entered with the updated
%% state. Socket close, socket error, and expiry of the handshake
%% deadline all end the connection without sending anything.
-spec handshake_recv(#loop{}, fun((#loop{}) -> no_return())) -> no_return().
handshake_recv(State, Cont) ->
    #loop{
        socket = Sock,
        msg_data = DataTag,
        msg_closed = ClosedTag,
        msg_error = ErrorTag,
        buffer = Pending
    } = State,
    _ = roadrunner_transport:setopts(Sock, [{active, once}]),
    Deadline = handshake_timeout(),
    receive
        {DataTag, _, Chunk} ->
            Cont(State#loop{buffer = <<Pending/binary, Chunk/binary>>});
        {ClosedTag, _} ->
            exit_clean(State);
        {ErrorTag, _, _} ->
            exit_clean(State)
    after Deadline ->
        exit_clean(State)
    end.
%% =============================================================================
%% Frame loop — active-mode socket receive + worker message dispatch
%% =============================================================================
%% Main parse loop. Exits when a drain has completed (draining with an
%% empty stream table); otherwise parses one frame from the buffer and
%% dispatches it, asks for more bytes when the buffer holds only a
%% partial frame, or fails the connection with PROTOCOL_ERROR on a
%% malformed frame.
-spec frame_loop(#loop{}) -> no_return().
frame_loop(#loop{draining = Draining, streams = Streams, buffer = Buf} = State) ->
    case Draining andalso map_size(Streams) =:= 0 of
        true ->
            %% Last in-flight stream finished or was reset by the peer.
            exit_clean(State);
        false ->
            case roadrunner_http2_frame:parse(Buf, ?MAX_FRAME_SIZE) of
                {ok, Frame, Rest} ->
                    handle_frame(Frame, State#loop{buffer = Rest});
                {more, _Need} ->
                    arm_and_recv(State);
                {error, _} ->
                    _ = send_goaway(State, protocol_error),
                    exit_clean(State)
            end
    end.
%% Re-arm the socket for a single active-mode delivery, then block in
%% the unified mailbox receive.
-spec arm_and_recv(#loop{}) -> no_return().
arm_and_recv(State) ->
    _ = roadrunner_transport:setopts(State#loop.socket, [{active, once}]),
    recv_more(State).
%% Unified mailbox dispatch: socket events, worker send requests,
%% worker DOWN signals, drain requests, and the idle-timeout `after`
%% clause.
%%
%% Socket data re-enters `frame_loop/1` with the bytes appended to the
%% buffer. Worker and control messages are handled in place and loop
%% straight back into this receive without re-arming the socket (the
%% `{active, once}` arm from `arm_and_recv/1` is still outstanding).
%% After a worker `'DOWN'` or a drain request the state is passed
%% through `maybe_exit_when_drained/1`, which ends the process once a
%% drain has nothing left in flight.
%%
%% Idle expiry closes the connection with GOAWAY(NO_ERROR). The timeout
%% is a local policy decision rather than a peer violation, so the
%% error code must not claim PROTOCOL_ERROR (RFC 9113 §7 defines
%% NO_ERROR for shutdowns that are not the result of an error).
-spec recv_more(#loop{}) -> no_return().
recv_more(
    #loop{
        msg_data = MData,
        msg_closed = MClosed,
        msg_error = MError,
        buffer = Buf
    } = State
) ->
    receive
        {MData, _, Bytes} ->
            frame_loop(State#loop{buffer = <<Buf/binary, Bytes/binary>>});
        {MClosed, _} ->
            exit_clean(State);
        {MError, _, _} ->
            _ = send_goaway(State, protocol_error),
            exit_clean(State);
        {h2_send_headers, From, Ref, StreamId, Status, Headers, EndStream} ->
            recv_more(handle_send_headers(State, From, Ref, StreamId, Status, Headers, EndStream));
        {h2_send_data, From, Ref, StreamId, Data, EndStream} ->
            recv_more(handle_send_data(State, From, Ref, StreamId, Data, EndStream));
        {h2_send_trailers, From, Ref, StreamId, Trailers} ->
            recv_more(handle_send_trailers(State, From, Ref, StreamId, Trailers));
        {h2_send_response, From, Ref, StreamId, Status, Headers, Body} ->
            recv_more(handle_send_response(State, From, Ref, StreamId, Status, Headers, Body));
        {h2_worker_done, StreamId} ->
            recv_more(handle_worker_done(State, StreamId));
        {'DOWN', MonRef, process, _Pid, Reason} ->
            recv_more(maybe_exit_when_drained(handle_worker_down(State, MonRef, Reason)));
        {roadrunner_drain, _Deadline} ->
            recv_more(maybe_exit_when_drained(start_drain(State)))
    after idle_timeout() ->
        _ = send_goaway(State, no_error),
        exit_clean(State)
    end.
%% Enter graceful-drain mode. The first call sends GOAWAY(NO_ERROR) and
%% sets the `draining` flag; later calls return the state untouched, so
%% repeated drain messages are harmless. In-flight streams are not
%% touched here.
-spec start_drain(#loop{}) -> #loop{}.
start_drain(#loop{draining = AlreadyDraining} = State) ->
    case AlreadyDraining of
        true ->
            State;
        false ->
            _ = send_goaway(State, no_error),
            State#loop{draining = true}
    end.
%% Drain completion check: when the connection is draining and the
%% stream table is empty, exit cleanly; otherwise hand the state back
%% unchanged.
%%
%% `exit_clean/1` never returns, so on the drained branch the caller's
%% follow-up call is dead code. The spec still says `#loop{}` rather
%% than `no_return()` so the type-checker sees the live path.
-spec maybe_exit_when_drained(#loop{}) -> #loop{}.
maybe_exit_when_drained(#loop{draining = Draining, streams = Streams} = State) ->
    case Draining andalso map_size(Streams) =:= 0 of
        true -> exit_clean(State);
        false -> State
    end.
%% =============================================================================
%% Per-frame dispatch — peer → server frames
%% =============================================================================
%% Dispatch one parsed inbound frame. Every clause ends by re-entering
%% `frame_loop/1` (possibly via an `on_*` helper) or by terminating the
%% connection through `exit_clean/1`, so this function never returns.
%%
%% Clause order matters: the `awaiting_continuation` guard comes first
%% so that no other frame type can slip through while a header block is
%% still being assembled.
-spec handle_frame(roadrunner_http2_frame:frame(), #loop{}) -> no_return().
%% RFC 9113 §6.10: while a HEADERS / PUSH_PROMISE block is in
%% mid-flight (no END_HEADERS yet), the next frame MUST be a
%% CONTINUATION on the same stream. Anything else is PROTOCOL_ERROR.
handle_frame(Frame, #loop{awaiting_continuation = Awaiting} = State) when
Awaiting =/= undefined
->
case Frame of
{continuation, Awaiting, Flags, Fragment} ->
on_continuation(Awaiting, Flags, Fragment, State);
_ ->
_ = send_goaway(State, protocol_error),
exit_clean(State)
end;
%% SETTINGS with the ACK flag: the peer acknowledging our SETTINGS —
%% nothing to do.
handle_frame({settings, 1, _}, State) ->
frame_loop(State);
%% Peer SETTINGS: validate, apply the initial-window-size change to the
%% state, then acknowledge. Validation / application failures are
%% connection errors with the matching error code.
handle_frame({settings, 0, Params}, State) ->
case validate_settings(Params) of
ok ->
case apply_initial_window_size(Params, State) of
{ok, State1} ->
_ = send(State1, roadrunner_http2_frame:encode({settings, 1, []})),
frame_loop(State1);
{error, flow_control_error} ->
_ = send_goaway(State, flow_control_error),
exit_clean(State)
end;
{error, {protocol_error, _}} ->
_ = send_goaway(State, protocol_error),
exit_clean(State);
{error, {flow_control_error, _}} ->
_ = send_goaway(State, flow_control_error),
exit_clean(State)
end;
%% PING with the ACK flag is ignored; a PING without it is echoed back
%% with the ACK flag and the same opaque payload.
handle_frame({ping, 1, _Data}, State) ->
frame_loop(State);
handle_frame({ping, 0, Opaque}, State) ->
_ = send(State, roadrunner_http2_frame:encode({ping, 1, Opaque})),
frame_loop(State);
%% Connection-level WINDOW_UPDATE: growing past ?MAX_WINDOW is a
%% connection error; otherwise the larger window may unblock queued
%% DATA on any stream, so flush them all.
handle_frame({window_update, 0, Inc}, State) ->
case State#loop.conn_send_window + Inc of
New when New > ?MAX_WINDOW ->
_ = send_goaway(State, flow_control_error),
exit_clean(State);
New ->
State1 = State#loop{conn_send_window = New},
frame_loop(flush_all_pending_data(State1))
end;
%% Stream-level WINDOW_UPDATE.
handle_frame({window_update, StreamId, Inc}, #loop{streams = Streams} = State) ->
case Streams of
#{StreamId := #{send_window := SW} = Stream} ->
case SW + Inc of
New when New > ?MAX_WINDOW ->
%% RFC 9113 §6.9.1: stream-level overflow is a
%% stream error, not a connection error.
_ = send_rst_stream(State, StreamId, flow_control_error),
frame_loop(remove_stream(State, StreamId));
New ->
Stream1 = Stream#{send_window := New},
State1 = State#loop{streams = Streams#{StreamId := Stream1}},
frame_loop(flush_pending_data(State1, StreamId))
end;
#{} ->
%% RFC 9113 §5.1: WINDOW_UPDATE on an idle stream is
%% PROTOCOL_ERROR. A closed-stream WU (id <=
%% last_stream_id) is silently ignored per §6.9.
case StreamId > State#loop.last_stream_id of
true ->
_ = send_goaway(State, protocol_error),
exit_clean(State);
false ->
frame_loop(State)
end
end;
handle_frame({priority, StreamId, #{stream_dependency := StreamId}}, State) ->
%% RFC 9113 §5.3.1: a stream cannot depend on itself —
%% stream-error PROTOCOL_ERROR.
_ = send_rst_stream(State, StreamId, protocol_error),
frame_loop(State);
%% Any other PRIORITY frame is accepted and ignored.
handle_frame({priority, _, _}, State) ->
frame_loop(State);
%% Peer RST_STREAM: the error code is not inspected.
handle_frame({rst_stream, StreamId, _}, #loop{streams = Streams} = State) ->
case Streams of
#{StreamId := _} ->
frame_loop(reset_stream(State, StreamId));
#{} ->
%% RFC 9113 §5.1: RST_STREAM on an idle stream is
%% PROTOCOL_ERROR (we never opened it). On a closed
%% stream (id <= last_stream_id) the receipt is a no-op
%% per §5.4 — it's the peer telling us about the
%% already-closed lifecycle.
case StreamId > State#loop.last_stream_id of
true ->
_ = send_goaway(State, protocol_error),
exit_clean(State);
false ->
frame_loop(State)
end
end;
handle_frame({goaway, _, _, _}, State) ->
%% Client is shutting down. We exit immediately; no check for
%% in-flight streams is made here.
exit_clean(State);
%% HEADERS / CONTINUATION / DATA are delegated to their own handlers.
handle_frame({headers, StreamId, Flags, Priority, Fragment}, State) ->
on_headers(StreamId, Flags, Priority, Fragment, State);
handle_frame({continuation, StreamId, Flags, Fragment}, State) ->
on_continuation(StreamId, Flags, Fragment, State);
handle_frame({data, StreamId, Flags, Payload}, State) ->
on_data(StreamId, Flags, Payload, State);
handle_frame({push_promise, _, _, _, _}, State) ->
%% Servers MUST NOT receive PUSH_PROMISE — RFC 9113 §6.6.
_ = send_goaway(State, protocol_error),
exit_clean(State);
handle_frame({unknown, _Type, _StreamId}, State) ->
%% RFC 9113 §4.1: unknown frame types MUST be ignored. The
%% awaiting_continuation guard above already rejected the
%% mid-header-block case (§6.10), so reaching here is benign.
frame_loop(State).
%% --- HEADERS / CONTINUATION ---
%% Handle an inbound HEADERS frame. Clause order encodes precedence:
%%
%%   1. priority depends on the stream itself -> stream error
%%   2. even stream id                         -> connection error
%%   3. id present in the stream table         -> trailers, or
%%                                                connection error
%%   4. id not above `last_stream_id`          -> connection error
%%   5. connection is draining                 -> REFUSED_STREAM
%%   6. stream table at the advertised limit   -> REFUSED_STREAM
%%   7. otherwise                              -> open a new stream
%%
%% Clauses 1, 5 and 6 reject the stream but keep the connection, so they
%% go through `reject_new_stream_keep_hpack/5`. HPACK state is shared by
%% every stream on the connection, and RFC 9113 §4.3 requires a field
%% block to be decompressed even when the request is discarded;
%% otherwise the decoder's dynamic table drifts and later streams decode
%% wrong headers.
on_headers(StreamId, Flags, #{stream_dependency := StreamId}, Fragment, State) ->
    %% RFC 9113 §5.3.1: a stream cannot depend on itself —
    %% stream-error PROTOCOL_ERROR. The stream is never entered into
    %% the streams map since it is being rejected.
    reject_new_stream_keep_hpack(StreamId, Flags, Fragment, protocol_error, State);
on_headers(StreamId, _Flags, _Priority, _Fragment, State) when StreamId rem 2 =:= 0 ->
    %% RFC 9113 §5.1.1: client-initiated stream IDs are odd.
    _ = send_goaway(State, protocol_error),
    exit_clean(State);
on_headers(StreamId, Flags, _Priority, Fragment, #loop{streams = Streams} = State) when
    is_map_key(StreamId, Streams)
->
    %% HEADERS for an already-open stream — only valid as a trailer
    %% block per RFC 9113 §8.1: the first HEADERS must have been
    %% finalized, the body must still be open (no END_STREAM yet), and
    %% the trailer HEADERS frame MUST set END_STREAM.
    #{StreamId := Stream} = Streams,
    case is_trailer_block(Stream, Flags) of
        true ->
            EndHeaders = (Flags band 16#04) =/= 0,
            Stream1 = Stream#{
                header_fragment := Fragment,
                end_headers := EndHeaders,
                end_stream_seen := true
            },
            State1 = update_stream(State, StreamId, Stream1),
            case EndHeaders of
                true ->
                    finalize_trailers(StreamId, State1);
                false ->
                    %% The rest of the block arrives in CONTINUATION frames.
                    frame_loop(State1#loop{awaiting_continuation = StreamId})
            end;
        false ->
            _ = send_goaway(State, protocol_error),
            exit_clean(State)
    end;
on_headers(StreamId, _Flags, _Priority, _Fragment, State) when
    StreamId =< State#loop.last_stream_id
->
    %% RFC 9113 §5.1.1: stream IDs MUST monotonically increase. A
    %% HEADERS for an id we've already advanced past (and that is not
    %% currently open) is an attempt to reopen a closed stream.
    %% STREAM_CLOSED is the nominal stream error, but with no per-stream
    %% context left to RST against it is treated as a connection error.
    _ = send_goaway(State, protocol_error),
    exit_clean(State);
on_headers(StreamId, Flags, _Priority, Fragment, #loop{draining = true} = State) ->
    %% In drain mode — refuse all new streams. The peer already saw
    %% GOAWAY(NO_ERROR); it knows new requests on this conn won't be
    %% served.
    reject_new_stream_keep_hpack(StreamId, Flags, Fragment, refused_stream, State);
on_headers(StreamId, Flags, _Priority, Fragment, #loop{streams = Streams} = State) when
    map_size(Streams) >= ?MAX_CONCURRENT_STREAMS
->
    %% Over the advertised concurrency limit — refuse the stream.
    reject_new_stream_keep_hpack(StreamId, Flags, Fragment, refused_stream, State);
on_headers(StreamId, Flags, _Priority, Fragment, State) ->
    EndHeaders = (Flags band 16#04) =/= 0,
    EndStream = (Flags band 16#01) =/= 0,
    Stream = new_stream(
        Fragment,
        EndHeaders,
        EndStream,
        State#loop.peer_initial_window,
        State#loop.stream_recv_window_peak
    ),
    Awaiting =
        case EndHeaders of
            true -> undefined;
            false -> StreamId
        end,
    State1 = State#loop{
        streams = (State#loop.streams)#{StreamId => Stream},
        last_stream_id = StreamId,
        awaiting_continuation = Awaiting
    },
    case EndHeaders of
        true -> finalize_headers(StreamId, State1);
        false -> frame_loop(State1)
    end.

%% Reject a HEADERS frame with RST_STREAM(ErrorCode) without creating a
%% stream entry, keeping the shared HPACK decoder in sync.
%%
%% With END_HEADERS set the fragment is the complete field block: decode
%% it, throw the fields away, and keep the advanced decoder context. A
%% block that fails to decode is a connection error, reported the same
%% way `finalize_headers/2` reports it.
%%
%% Without END_HEADERS the rest of the block arrives in CONTINUATION
%% frames that this connection does not track for a rejected stream, so
%% only the RST_STREAM is sent and handling is unchanged.
%% NOTE(review): fully covering that case needs per-connection state
%% for "discarding a header block" — confirm whether that is wanted.
reject_new_stream_keep_hpack(StreamId, Flags, _Fragment, ErrorCode, State) when
    (Flags band 16#04) =:= 0
->
    _ = send_rst_stream(State, StreamId, ErrorCode),
    frame_loop(State);
reject_new_stream_keep_hpack(
    StreamId, _Flags, Fragment, ErrorCode, #loop{hpack_dec = Dec} = State
) ->
    case roadrunner_http2_hpack:decode(Fragment, Dec) of
        {ok, _Discarded, Dec1} ->
            _ = send_rst_stream(State, StreamId, ErrorCode),
            frame_loop(State#loop{hpack_dec = Dec1});
        {error, _} ->
            _ = send_goaway(State, protocol_error),
            exit_clean(State)
    end.
%% Build the stream-table entry for a freshly opened stream.
%%
%% `Fragment` is the HPACK fragment carried by the opening HEADERS
%% frame, `EndHeaders` / `EndStream` are the decoded frame flags, and
%% `SendWindow` / `RecvWindow` are the initial per-stream flow-control
%% windows. The entry starts in state `open` with no decoded headers,
%% an empty body, no worker, and an empty pending-send queue.
new_stream(Fragment, EndHeaders, EndStream, SendWindow, RecvWindow) ->
    Inbound = #{
        header_fragment => Fragment,
        end_headers => EndHeaders,
        end_stream_seen => EndStream,
        headers => undefined,
        body => [],
        body_len => 0
    },
    FlowControl = #{
        send_window => SendWindow,
        recv_window => RecvWindow,
        pending_sends => queue:new()
    },
    Worker = #{worker_pid => undefined, worker_ref => undefined},
    %% The three groups have disjoint keys, so merge order is irrelevant.
    maps:merge(maps:merge(Inbound#{state => open}, FlowControl), Worker).
%% Handle a CONTINUATION frame: append its fragment to the header block
%% being assembled for `StreamId`.
%%
%% The frame is only legal when the stream exists and its current block
%% is still open (`end_headers = false`); anything else is a connection
%% error PROTOCOL_ERROR. Once END_HEADERS arrives the block is decoded
%% as the request headers if none were decoded yet, or as trailers
%% otherwise.
%%
%% Note that the accumulated fragment has no size cap in this function.
on_continuation(StreamId, Flags, Fragment, #loop{streams = Streams} = State) ->
    case maps:find(StreamId, Streams) of
        {ok, #{end_headers := false, header_fragment := SoFar, headers := Decoded} = Stream} ->
            Done = (Flags band 16#04) =/= 0,
            Stream1 = Stream#{
                header_fragment := <<SoFar/binary, Fragment/binary>>,
                end_headers := Done
            },
            State1 = State#loop{streams = Streams#{StreamId := Stream1}},
            if
                not Done ->
                    %% Still mid-block; `awaiting_continuation` stays set.
                    frame_loop(State1);
                Decoded =:= undefined ->
                    finalize_headers(StreamId, State1#loop{awaiting_continuation = undefined});
                true ->
                    finalize_trailers(StreamId, State1#loop{awaiting_continuation = undefined})
            end;
        _ ->
            _ = send_goaway(State, protocol_error),
            exit_clean(State)
    end.
%% Finish an inbound trailer block. The decoded fields are discarded
%% (request trailers are not surfaced to handlers), but the block is
%% still run through the HPACK decoder so its context advances and
%% later header blocks decode correctly. On success the request is
%% dispatched — `on_headers/5` already recorded END_STREAM for trailer
%% blocks. A decode failure is a connection error.
finalize_trailers(StreamId, State) ->
    #loop{streams = Streams, hpack_dec = Dec} = State,
    #{StreamId := Stream} = Streams,
    #{header_fragment := Block} = Stream,
    case roadrunner_http2_hpack:decode(Block, Dec) of
        {error, _} ->
            _ = send_goaway(State, protocol_error),
            exit_clean(State);
        {ok, _Trailers, Dec1} ->
            Cleared = Stream#{header_fragment := <<>>},
            dispatch_stream(StreamId, State#loop{
                hpack_dec = Dec1,
                streams = Streams#{StreamId := Cleared}
            })
    end.
%% Finish the request header block: HPACK-decode the accumulated
%% fragment and store the decoded headers on the stream entry. If
%% END_STREAM was already seen (a request with no body) the worker is
%% dispatched right away; otherwise the loop continues and waits for
%% DATA. A decode failure is a connection error.
finalize_headers(StreamId, State) ->
    #loop{streams = Streams, hpack_dec = Dec} = State,
    #{StreamId := Stream} = Streams,
    #{header_fragment := Block, end_stream_seen := BodyDone} = Stream,
    case roadrunner_http2_hpack:decode(Block, Dec) of
        {error, _} ->
            _ = send_goaway(State, protocol_error),
            exit_clean(State);
        {ok, Headers, Dec1} ->
            Decoded = Stream#{headers := Headers, header_fragment := <<>>},
            State1 = State#loop{
                hpack_dec = Dec1,
                streams = Streams#{StreamId := Decoded}
            },
            if
                BodyDone -> dispatch_stream(StreamId, State1);
                true -> frame_loop(State1)
            end
    end.
%% --- DATA ---
%% Handle an inbound DATA frame.
%%
%%   - stream id above `last_stream_id` (idle stream): connection error
%%     PROTOCOL_ERROR (RFC 9113 §5.1);
%%   - stream no longer in the table (closed): RST_STREAM(STREAM_CLOSED);
%%   - stream present but END_STREAM already received (half-closed
%%     remote): stream error STREAM_CLOSED (RFC 9113 §5.1). Without this
%%     check a late DATA frame would be appended to a request that has
%%     already been handed to a worker, and one carrying END_STREAM
%%     would dispatch the stream a second time;
%%   - otherwise: append the payload, charge both receive windows,
%%     refill them if they ran low, and dispatch on END_STREAM.
%%
%% Discarded payloads are still charged to the connection-level receive
%% window (RFC 9113 §5.1: DATA received after sending RST_STREAM is
%% counted toward the connection flow-control window). The peer has
%% already deducted those bytes from its connection send window, so
%% skipping them would leak window until the connection stalls.
on_data(StreamId, _Flags, _Payload, State) when StreamId > State#loop.last_stream_id ->
    _ = send_goaway(State, protocol_error),
    exit_clean(State);
on_data(StreamId, _Flags, Payload, #loop{streams = Streams} = State) when
    not is_map_key(StreamId, Streams)
->
    _ = send_rst_stream(State, StreamId, stream_closed),
    frame_loop(charge_discarded_data(State, byte_size(Payload)));
on_data(StreamId, Flags, Payload, #loop{streams = Streams} = State) ->
    case Streams of
        #{StreamId := #{end_stream_seen := true}} ->
            %% Same RST + remove sequence this module uses for its other
            %% server-originated stream errors.
            %% NOTE(review): assumes `remove_stream/2` also deals with the
            %% stream's running worker — confirm.
            _ = send_rst_stream(State, StreamId, stream_closed),
            State1 = remove_stream(State, StreamId),
            frame_loop(charge_discarded_data(State1, byte_size(Payload)));
        #{StreamId := #{body := Body, body_len := Len, recv_window := RW} = Stream} ->
            EndStream = (Flags band 16#01) =/= 0,
            PayloadLen = byte_size(Payload),
            Stream1 = Stream#{
                body := [Body, Payload],
                body_len := Len + PayloadLen,
                end_stream_seen := EndStream,
                recv_window := RW - PayloadLen
            },
            State1 = State#loop{
                streams = Streams#{StreamId := Stream1},
                conn_recv_window = State#loop.conn_recv_window - PayloadLen
            },
            State2 = maybe_refill_recv_windows(State1, StreamId),
            case EndStream of
                true -> dispatch_stream(StreamId, State2);
                false -> frame_loop(State2)
            end
    end.

%% Charge `Len` discarded DATA bytes to the connection-level receive
%% window and top it up if that pushed it below the refill threshold.
charge_discarded_data(#loop{conn_recv_window = Window} = State, Len) ->
    maybe_refill_conn(State#loop{conn_recv_window = Window - Len}).
%% Top up the connection-level receive window and then the receive
%% window of `StreamId`. Each is refilled only when it has dropped below
%% the configured `recv_window_threshold`.
-spec maybe_refill_recv_windows(#loop{}, stream_id()) -> #loop{}.
maybe_refill_recv_windows(State, StreamId) ->
    maybe_refill_stream(maybe_refill_conn(State), StreamId).
%% When the connection-level receive window is below the refill
%% threshold, send WINDOW_UPDATE on stream 0 for the shortfall and
%% record the window as back at its configured peak. Otherwise return
%% the state unchanged.
maybe_refill_conn(State) ->
    #loop{
        conn_recv_window = Current,
        recv_window_peak = Peak,
        recv_window_threshold = Threshold
    } = State,
    case Current < Threshold of
        true ->
            Shortfall = Peak - Current,
            Update = roadrunner_http2_frame:encode({window_update, 0, Shortfall}),
            _ = send(State, Update),
            %% Current + Shortfall is exactly Peak.
            State#loop{conn_recv_window = Current + Shortfall};
        false ->
            State
    end.
%% When the receive window of `StreamId` is below the refill threshold,
%% send a stream-level WINDOW_UPDATE for the shortfall and record the
%% window as back at the configured per-stream peak. Otherwise return
%% the state unchanged. The stream must be present in the stream table.
maybe_refill_stream(State, StreamId) ->
    #loop{
        streams = Streams,
        stream_recv_window_peak = Peak,
        recv_window_threshold = Threshold
    } = State,
    #{StreamId := #{recv_window := Current} = Stream} = Streams,
    case Current < Threshold of
        true ->
            Shortfall = Peak - Current,
            Update = roadrunner_http2_frame:encode({window_update, StreamId, Shortfall}),
            _ = send(State, Update),
            Refilled = Stream#{recv_window := Current + Shortfall},
            State#loop{streams = Streams#{StreamId := Refilled}};
        false ->
            State
    end.
%% =============================================================================
%% Stream dispatch — spawn a worker
%% =============================================================================
%% Hand a fully received request (headers + body + END_STREAM) to a
%% fresh stream worker.
%%
%% Steps:
%%   1. check the received body length against any `content-length`
%%      header — a mismatch is a stream error PROTOCOL_ERROR
%%      (RFC 9113 §8.1.2.6);
%%   2. take a request id from the pre-generated buffer;
%%   3. build the request from the decoded headers and the body iolist —
%%      a failure is likewise a stream error PROTOCOL_ERROR;
%%   4. start the worker, record its pid and monitor ref on the stream,
%%      mark the stream half-closed (remote), and drop the body from the
%%      stream entry now that the worker holds it.
%%
%% Always continues into `frame_loop/1`.
dispatch_stream(StreamId, #loop{streams = Streams} = State) ->
    #{StreamId := Stream} = Streams,
    #{headers := Headers, body_len := BodyLen, body := BodyIolist} = Stream,
    case content_length_matches(Headers, BodyLen) of
        false ->
            _ = send_rst_stream(State, StreamId, protocol_error),
            frame_loop(remove_stream(State, StreamId));
        true ->
            {RequestId, IdBuffer} =
                roadrunner_conn:generate_request_id(State#loop.req_id_buffer),
            %% The body is passed through as an iolist; handlers that
            %% need a flat binary flatten it themselves.
            Context = #{
                peer => State#loop.peer,
                scheme => State#loop.scheme,
                listener_name => State#loop.listener_name,
                request_id => RequestId
            },
            case roadrunner_http2_request:from_headers(Headers, BodyIolist, Context) of
                {error, _Reason} ->
                    _ = send_rst_stream(State, StreamId, protocol_error),
                    frame_loop(remove_stream(State#loop{req_id_buffer = IdBuffer}, StreamId));
                {ok, Req} ->
                    {WorkerPid, MonRef} = roadrunner_http2_stream_worker:start(
                        self(), StreamId, Req, State#loop.proto_opts
                    ),
                    Dispatched = Stream#{
                        worker_pid := WorkerPid,
                        worker_ref := MonRef,
                        state := half_closed_remote,
                        body := []
                    },
                    WorkerRefs = State#loop.worker_refs,
                    frame_loop(State#loop{
                        streams = Streams#{StreamId := Dispatched},
                        worker_refs = WorkerRefs#{MonRef => StreamId},
                        req_id_buffer = IdBuffer
                    })
            end
    end.
%% Verify that any client-supplied `content-length` header matches
%% the cumulative bytes received in DATA frames.
%%
%% - Absent header: always acceptable (`true`).
%% - More than one `content-length` header: rejected (`false`).
%% - Exactly one: the value must be a non-empty run of ASCII digits
%%   (RFC 9110 §8.6, `Content-Length = 1*DIGIT`) equal to `BodyLen`.
%%
%% The digits-only check runs before `binary_to_integer/1` because
%% that BIF also accepts a leading `+` or `-`; without the check a
%% peer could send `+5` or `-0` and still pass validation.
-spec content_length_matches([{binary(), binary()}], non_neg_integer()) -> boolean().
content_length_matches(Headers, BodyLen) ->
    case find_content_length(Headers, undefined) of
        none ->
            true;
        multiple ->
            false;
        V ->
            is_decimal_digits(V) andalso binary_to_integer(V) =:= BodyLen
    end.

%% Single-pass walk over the header list. Returns `none` when there
%% is no `content-length`, `multiple` as soon as a second one is
%% seen, or the sole value otherwise. The accumulator starts as
%% `undefined` and holds the first value found.
find_content_length([], undefined) -> none;
find_content_length([], V) -> V;
find_content_length([{~"content-length", _} | _], V) when V =/= undefined -> multiple;
find_content_length([{~"content-length", V} | Rest], undefined) -> find_content_length(Rest, V);
find_content_length([_ | Rest], V) -> find_content_length(Rest, V).

%% True only for a non-empty binary made up entirely of `0`..`9`.
%% Anything else (empty, signed, whitespace, non-binary) is `false`.
is_decimal_digits(<<D, Rest/binary>>) when D >= $0, D =< $9 ->
    Rest =:= <<>> orelse is_decimal_digits(Rest);
is_decimal_digits(_) ->
    false.
%% =============================================================================
%% Worker → conn message handlers
%% =============================================================================
%% Buffered-response path: the worker hands over status, headers and
%% the complete body in one message.
%%
%% - Stream no longer open: tell the worker with `h2_stream_reset`.
%% - Empty body: a single HEADERS frame carrying END_STREAM, the same
%%   wire shape as `handle_send_headers(..., true)`.
%% - Body fits one DATA frame AND the current send window: HEADERS +
%%   DATA(END_STREAM) go out in ONE transport send, halving the
%%   per-response send cost.
%% - Otherwise: HEADERS are sent now and the body is handed to
%%   `try_send_data`, which fragments / queues it and acks the worker
%%   on completion, so the worker still sees a single ack.
handle_send_response(State, From, Ref, StreamId, Status, Headers, Body) ->
    case stream_open(State, StreamId) of
        not_open ->
            _ = (From ! {h2_stream_reset, StreamId}),
            State;
        Stream ->
            BodyLen = iolist_size(Body),
            %% `andalso` short-circuits: the window lookup is skipped
            %% for empty bodies and for bodies larger than one frame.
            FitsOneFrame =
                BodyLen > 0 andalso
                    BodyLen =< ?MAX_FRAME_SIZE andalso
                    window_budget(State, Stream) >= BodyLen,
            case {BodyLen, FitsOneFrame} of
                {0, _} ->
                    HeadersOnly = encode_and_send_headers(
                        State, StreamId, Status, Headers, true
                    ),
                    _ = (From ! {h2_send_ack, Ref}),
                    HeadersOnly;
                {_, true} ->
                    Atomic = encode_and_send_response_atomic(
                        State, StreamId, Status, Headers, Body, BodyLen
                    ),
                    _ = (From ! {h2_send_ack, Ref}),
                    Atomic;
                {_, false} ->
                    WithHeaders = encode_and_send_headers(
                        State, StreamId, Status, Headers, false
                    ),
                    #{StreamId := Current} = WithHeaders#loop.streams,
                    try_send_data(WithHeaders, Current, StreamId, From, Ref, Body, true)
            end
    end.
%% Worker asked to send response HEADERS. If the stream is still open
%% the frame is written and the worker is acked with `h2_send_ack`;
%% otherwise the worker is told the stream was reset.
handle_send_headers(State, From, Ref, StreamId, Status, Headers, EndStream) ->
    case stream_open(State, StreamId) of
        #{} ->
            Sent = encode_and_send_headers(State, StreamId, Status, Headers, EndStream),
            _ = (From ! {h2_send_ack, Ref}),
            Sent;
        not_open ->
            _ = (From ! {h2_stream_reset, StreamId}),
            State
    end.
%% Worker asked to send body bytes. Open streams go through
%% `try_send_data/7`, which takes care of acking the worker; a stream
%% that is no longer open gets an `h2_stream_reset` reply instead.
handle_send_data(State, From, Ref, StreamId, Data, EndStream) ->
    case stream_open(State, StreamId) of
        #{} = Stream ->
            try_send_data(State, Stream, StreamId, From, Ref, Data, EndStream);
        not_open ->
            _ = (From ! {h2_stream_reset, StreamId}),
            State
    end.
%% Worker asked to send trailers. If the stream is still open the
%% trailer block is written and the worker is acked; otherwise the
%% worker is told the stream was reset.
handle_send_trailers(State, From, Ref, StreamId, Trailers) ->
    case stream_open(State, StreamId) of
        #{} ->
            Sent = encode_and_send_trailers(State, StreamId, Trailers),
            _ = (From ! {h2_send_ack, Ref}),
            Sent;
        not_open ->
            _ = (From ! {h2_stream_reset, StreamId}),
            State
    end.
%% Deliberate no-op: the worker exits right after this message and
%% the `'DOWN'` cleanup is what removes the stream entry.
handle_worker_done(State, _StreamId) ->
    State.
%% A monitored worker went down. Unknown monitor refs are ignored.
%% For a known ref the mapping is dropped, then:
%%   - `normal` exit: the stream entry is simply removed;
%%   - any other reason: the stream is aborted with `internal_error`.
handle_worker_down(#loop{worker_refs = Refs} = State, MonRef, Reason) ->
    case maps:take(MonRef, Refs) of
        error ->
            State;
        {StreamId, RemainingRefs} ->
            Pruned = State#loop{worker_refs = RemainingRefs},
            if
                Reason =:= normal -> remove_stream(Pruned, StreamId);
                true -> abort_stream(Pruned, StreamId, internal_error)
            end
    end.
%% Look up a stream a worker may still write to.
%%
%% Returns the stream entry (a map) on a hit, or the atom `not_open`
%% when the id is absent from the `streams` map or the entry's
%% `state` is `closed`. Workers asking to write to such a stream
%% should be told to abort. The two return shapes are disjoint
%% (map vs. atom), so callers match on them without an `{ok, _}`
%% wrapper.
stream_open(#loop{streams = Streams}, StreamId) ->
    case maps:find(StreamId, Streams) of
        {ok, #{state := closed}} -> not_open;
        {ok, Stream} -> Stream;
        error -> not_open
    end.
%% Encode and write a response HEADERS frame. HEADERS are not subject
%% to flow control, so the frame is written immediately.
%%
%% Handler-supplied header names MUST already be lowercase per
%% RFC 9113 §8.1.2 (see `roadrunner_handler:response/0`). Bytes
%% outside the RFC 9110 §5.5 field-value charset (CR/LF/NUL) crash in
%% `validate_headers/1`, matching h1's `encode_headers/1` discipline.
%%
%% Returns the state with the advanced HPACK encoder; when
%% `EndStream` is `true` the stream's send side is also marked closed.
encode_and_send_headers(
    #loop{hpack_enc = Encoder} = State, StreamId, Status, Headers, EndStream
) ->
    StatusBin = integer_to_binary(Status),
    ok = validate_headers(Headers),
    %% The frame encoder accepts iodata for the header block, so the
    %% HPACK output is passed through without flattening.
    {Block, NextEncoder} = roadrunner_http2_hpack:encode(
        [{~":status", StatusBin} | Headers], Encoder
    ),
    %% 16#04 = END_HEADERS, 16#01 = END_STREAM.
    Flags =
        case EndStream of
            true -> 16#04 bor 16#01;
            _ -> 16#04
        end,
    _ = send(
        State,
        roadrunner_http2_frame:encode({headers, StreamId, Flags, undefined, Block})
    ),
    Updated = State#loop{hpack_enc = NextEncoder},
    case EndStream of
        true -> close_stream_send_side(Updated, StreamId);
        _ -> Updated
    end.
%% HEADERS + DATA(END_STREAM) packed into ONE transport send.
%%
%% `Body` is iodata and is passed to the frame encoder as-is, so no
%% flatten happens here. The caller has already verified that
%% `BodyLen` fits in a single DATA frame AND in the current send
%% window. Header names are expected to be lowercase already (per the
%% `roadrunner_handler:response/0` contract); CR/LF/NUL anywhere in a
%% pair is rejected so it cannot reach the peer or split at an
%% h2->h1 reverse proxy.
%%
%% Returns the state with the advanced HPACK encoder, both send
%% windows reduced by `BodyLen`, and the stream's send side closed.
encode_and_send_response_atomic(
    #loop{hpack_enc = Encoder, streams = Streams} = State,
    StreamId,
    Status,
    Headers,
    Body,
    BodyLen
) ->
    #{StreamId := Stream} = Streams,
    StatusBin = integer_to_binary(Status),
    ok = validate_headers(Headers),
    {Block, NextEncoder} = roadrunner_http2_hpack:encode(
        [{~":status", StatusBin} | Headers], Encoder
    ),
    %% HEADERS carries END_HEADERS (16#04) only; END_STREAM (16#01)
    %% rides on the DATA frame that follows.
    HeadersFrame = roadrunner_http2_frame:encode(
        {headers, StreamId, 16#04, undefined, Block}
    ),
    DataFrame = roadrunner_http2_frame:encode({data, StreamId, 16#01, Body}),
    _ = send(State, [HeadersFrame, DataFrame]),
    Charged = consume_send_window(
        State#loop{hpack_enc = NextEncoder}, StreamId, Stream, BodyLen
    ),
    close_stream_send_side(Charged, StreamId).
%% Encode and write the trailer block as a HEADERS frame carrying
%% END_HEADERS | END_STREAM, then mark the stream's send side closed.
%% Trailer names are expected to be lowercase already (per
%% `roadrunner_handler:response/0`); h1 trailers run the same
%% validation in `roadrunner_stream_response`.
encode_and_send_trailers(#loop{hpack_enc = Encoder} = State, StreamId, Trailers) ->
    ok = validate_headers(Trailers),
    {Block, NextEncoder} = roadrunner_http2_hpack:encode(Trailers, Encoder),
    Flags = 16#04 bor 16#01,
    _ = send(
        State,
        roadrunner_http2_frame:encode({headers, StreamId, Flags, undefined, Block})
    ),
    close_stream_send_side(State#loop{hpack_enc = NextEncoder}, StreamId).
%% Validate every `{Name, Value}` pair before it is HPACK-encoded.
%%
%% RFC 9110 §5.5 / RFC 9113 §8.2.1: field values are VCHAR / SP /
%% HTAB only; no CTLs. HPACK is length-framed so CR/LF cannot smuggle
%% a new h2 frame, but malformed bytes would still reach the peer
%% (and any downstream h2->h1 reverse proxy, where they would split).
%% A failing check crashes hard on purpose, so a handler echoing user
%% input into a header turns into a 500, matching
%% `roadrunner_http1:encode_headers/1`.
-spec validate_headers([{binary(), binary()}]) -> ok.
validate_headers(Headers) ->
    lists:foreach(
        fun({Name, Value}) ->
            ok = roadrunner_http1:check_header_safe(Name, name),
            ok = roadrunner_http1:check_header_safe(Value, value)
        end,
        Headers
    ).
%% Flip the stream's `state` to `closed`. Later worker writes on this
%% stream are answered with `{h2_stream_reset, _}` so they unwind
%% cleanly. The stream must be present in the `streams` map.
close_stream_send_side(State, StreamId) ->
    Streams = State#loop.streams,
    #{StreamId := Stream} = Streams,
    Closed = Stream#{state := closed},
    State#loop{streams = Streams#{StreamId := Closed}}.
%% =============================================================================
%% DATA send + flow control
%% =============================================================================
%% Try to send `Data` as DATA frame(s).
%%
%% - Both windows allow everything: send it all and ack the worker.
%% - Only part fits: send what fits and queue the rest.
%% - Nothing fits: queue the whole send.
%%
%% `Data` is iodata. The single-frame branch in `send_data_chunks`
%% hands it straight to the frame encoder (which accepts iodata), so
%% the common streaming case never materialises a flat binary.
%%
%% Empty `Data` is expected only together with `EndStream = true`,
%% because workers short-circuit empty `nofin` sends. That final
%% empty DATA frame consumes no window bytes, so it bypasses the
%% window check entirely.
try_send_data(State, Stream, StreamId, From, Ref, Data, EndStream) ->
    Total = iolist_size(Data),
    case {Total, EndStream} of
        {0, true} ->
            %% Final empty DATA frame closes the stream.
            _ = send(State, roadrunner_http2_frame:encode({data, StreamId, 16#01, <<>>})),
            _ = (From ! {h2_send_ack, Ref}),
            close_stream_send_side(State, StreamId);
        _ ->
            case window_budget(State, Stream) =:= 0 of
                true ->
                    %% Window closed: park the whole send until a
                    %% WINDOW_UPDATE reopens it.
                    Queued = enqueue_pending(Stream, {data, Ref, From, Data, EndStream}),
                    update_stream(State, StreamId, Queued);
                false ->
                    send_data_chunks(
                        State, Stream, StreamId, From, Ref, Data, Total, EndStream
                    )
            end
    end.
%% Send `Data` (`Total` bytes) in as many DATA frames as the send
%% windows and `?MAX_FRAME_SIZE` allow.
%%
%% Each step takes the smallest of the remaining size, the window
%% budget and the frame-size cap:
%%   - 0 bytes available: the remainder is queued on the stream;
%%   - everything fits: one frame (with END_STREAM if requested), the
%%     worker is acked, and the send side is closed on END_STREAM;
%%   - a partial chunk: it is sent without END_STREAM and the
%%     function recurses on the rest.
send_data_chunks(State, Stream, StreamId, From, Ref, Data, Total, EndStream) ->
    Budget = window_budget(State, Stream),
    case min(?MAX_FRAME_SIZE, min(Budget, Total)) of
        0 ->
            %% Window closed mid-body: queue the remainder.
            Queued = enqueue_pending(Stream, {data, Ref, From, Data, EndStream}),
            update_stream(State, StreamId, Queued);
        Total ->
            %% Whole payload fits in one frame and in the window, so
            %% the iodata goes to the frame encoder unflattened.
            Flags =
                case EndStream of
                    true -> 16#01;
                    _ -> 0
                end,
            _ = send(State, roadrunner_http2_frame:encode({data, StreamId, Flags, Data})),
            Done = consume_send_window(State, StreamId, Stream, Total),
            _ = (From ! {h2_send_ack, Ref}),
            case EndStream of
                true -> close_stream_send_side(Done, StreamId);
                _ -> Done
            end;
        Part ->
            %% Chunking across window / MAX_FRAME_SIZE boundaries:
            %% flatten once so slices are sub-binaries. On recursion
            %% `Remainder` is already a binary and the running size is
            %% threaded as `Total - Part`, so no further
            %% `iolist_size/1` walk is needed.
            <<Chunk:Part/binary, Remainder/binary>> = iolist_to_binary(Data),
            _ = send(State, roadrunner_http2_frame:encode({data, StreamId, 0, Chunk})),
            Charged = consume_send_window(State, StreamId, Stream, Part),
            #{StreamId := Latest} = Charged#loop.streams,
            send_data_chunks(
                Charged, Latest, StreamId, From, Ref, Remainder, Total - Part, EndStream
            )
    end.
%% Bytes that may be sent on a stream right now: the smaller of the
%% connection and stream send windows, never less than zero.
window_budget(#loop{conn_send_window = ConnWindow}, #{send_window := StreamWindow}) ->
    case min(ConnWindow, StreamWindow) of
        Budget when Budget > 0 -> Budget;
        _ -> 0
    end.
%% Charge `N` sent bytes against both the connection and the stream
%% send windows. The `Stream` argument (with its window reduced) is
%% what gets written back under `StreamId`, so callers must pass the
%% current stream entry.
consume_send_window(State, StreamId, Stream, N) ->
    #loop{conn_send_window = ConnWindow, streams = Streams} = State,
    #{send_window := StreamWindow} = Stream,
    Charged = Stream#{send_window := StreamWindow - N},
    State#loop{
        conn_send_window = ConnWindow - N,
        streams = Streams#{StreamId := Charged}
    }.
%% Append `Entry` to the rear of the stream's `pending_sends` queue
%% and return the updated stream entry.
enqueue_pending(Stream, Entry) ->
    #{pending_sends := Queue} = Stream,
    Stream#{pending_sends := queue:in(Entry, Queue)}.
%% Called after a stream's send window grew: if the stream has queued
%% sends, drain them as far as the windows allow. The stream must be
%% present in the `streams` map.
flush_pending_data(#loop{streams = Streams} = State, StreamId) ->
    #{StreamId := Stream} = Streams,
    #{pending_sends := Queue} = Stream,
    case queue:is_empty(Queue) of
        false -> drain_pending(State, StreamId, Stream);
        true -> State
    end.
%% Called after the connection-level send window grew: give every
%% stream a chance to drain its pending queue. The fold walks stream
%% ids rather than stream entries, because each `flush_pending_data`
%% call re-reads the stream from the accumulated state, which an
%% earlier drain may already have changed.
flush_all_pending_data(#loop{streams = Streams} = State) ->
    StreamIds = maps:keys(Streams),
    Flush = fun(StreamId, Acc) -> flush_pending_data(Acc, StreamId) end,
    lists:foldl(Flush, State, StreamIds).
%% Drain queued sends on a single stream until the queue is empty or
%% the window forces a stop.
%%
%% Each round pops the head entry, stores the shortened queue, and
%% retries the send. If the very same entry is at the head of the
%% queue afterwards, the send was re-queued untouched (window still
%% closed) and draining stops; otherwise it continues with the
%% stream's latest entry.
drain_pending(State, StreamId, #{pending_sends := Pending} = Stream) ->
    case queue:out(Pending) of
        {empty, _} ->
            State;
        {{value, {data, Ref, From, Payload, EndStream} = Head}, Tail} ->
            Popped = Stream#{pending_sends := Tail},
            Retried = try_send_data(
                update_stream(State, StreamId, Popped),
                Popped,
                StreamId,
                From,
                Ref,
                Payload,
                EndStream
            ),
            #{StreamId := Latest} = Retried#loop.streams,
            #{pending_sends := Queue} = Latest,
            case queue:peek(Queue) of
                {value, Head} ->
                    %% Same entry re-queued: window still closed.
                    Retried;
                _ ->
                    drain_pending(Retried, StreamId, Latest)
            end
    end.
%% =============================================================================
%% Stream lifecycle helpers
%% =============================================================================
%% Replace the entry stored under `StreamId`. Uses the `:=` update,
%% so the stream id must already be present in the `streams` map.
update_stream(State, StreamId, Stream) ->
    Streams = State#loop.streams,
    State#loop{streams = Streams#{StreamId := Stream}}.
%% Peer sent RST_STREAM for a stream we still track.
%%
%% Queued sends get reset notifications so workers blocked on
%% `h2_send_ack` unwind. If a worker was already dispatched it is
%% told to bail, its monitor is removed (flushing any `'DOWN'`
%% already in the mailbox), and its ref is dropped from
%% `worker_refs`. Finally the stream entry itself is removed.
reset_stream(#loop{streams = Streams, worker_refs = Refs} = State, StreamId) ->
    #{StreamId := Doomed} = Streams,
    #{
        pending_sends := Pending,
        worker_pid := WorkerPid,
        worker_ref := WorkerRef
    } = Doomed,
    notify_pending_reset(StreamId, Pending),
    RemainingRefs =
        if
            WorkerRef =:= undefined ->
                %% RST landed before END_STREAM dispatched a worker.
                Refs;
            true ->
                _ = (WorkerPid ! {h2_stream_reset, StreamId}),
                true = demonitor(WorkerRef, [flush]),
                maps:remove(WorkerRef, Refs)
        end,
    State#loop{
        streams = maps:remove(StreamId, Streams),
        worker_refs = RemainingRefs
    }.
%% Used by `handle_worker_down/3` when a worker dies abnormally.
%% Notifies any queued senders, sends RST_STREAM(`ErrorCode`) to the
%% peer, and drops the stream entry. The stream must still be present
%% in the `streams` map.
abort_stream(State, StreamId, ErrorCode) ->
    Streams = State#loop.streams,
    #{StreamId := #{pending_sends := Pending}} = Streams,
    notify_pending_reset(StreamId, Pending),
    _ = send_rst_stream(State, StreamId, ErrorCode),
    State#loop{streams = maps:remove(StreamId, Streams)}.
%% Drop the stream entry without notifying anyone. Used when the
%% worker exited normally (handler done, all frames already on the
%% wire). Removing an id that is not present is a no-op.
remove_stream(State, StreamId) ->
    Remaining = maps:remove(StreamId, State#loop.streams),
    State#loop{streams = Remaining}.
%% Send `{h2_stream_reset, StreamId}` to the sender of every queued
%% `{data, Ref, From, Data, EndStream}` entry, front of the queue
%% first. Returns `ok`.
notify_pending_reset(StreamId, Pending) ->
    lists:foreach(
        fun({data, _Ref, From, _Data, _EndStream}) ->
            _ = (From ! {h2_stream_reset, StreamId}),
            ok
        end,
        queue:to_list(Pending)
    ).
%% =============================================================================
%% Generic helpers
%% =============================================================================
%% Decide whether a second HEADERS frame on an already-open stream is
%% a valid trailer block (RFC 9113 §8.1): the body must still be open
%% (no END_STREAM seen yet) and the trailer HEADERS frame MUST itself
%% set END_STREAM (flag bit 16#01).
%%
%% There is no check for `headers =/= undefined`: the only way to get
%% here is a duplicate HEADERS on a stream whose first HEADERS had
%% END_HEADERS=true (otherwise the awaiting_continuation guard fires
%% upstream), which guarantees `finalize_headers/2` has already run.
-spec is_trailer_block(map(), non_neg_integer()) -> boolean().
is_trailer_block(Stream, Flags) ->
    case Stream of
        #{end_stream_seen := true} -> false;
        _ -> (Flags band 16#01) =:= 16#01
    end.
%% Apply a peer change to SETTINGS_INITIAL_WINDOW_SIZE (id 4).
%%
%% RFC 9113 §6.9.2: every open stream's send window shifts by the
%% difference between the new and the old value. Overflow on any
%% stream is FLOW_CONTROL_ERROR (a connection error). Streams created
%% later pick up the latest value via `peer_initial_window`.
%%
%% RFC 9113 §6.5.3: when the same id appears more than once in a
%% SETTINGS frame the LAST value wins, hence `last_setting/3`. When
%% the id is absent the state is returned unchanged.
-spec apply_initial_window_size(
    [{non_neg_integer(), non_neg_integer()}], #loop{}
) -> {ok, #loop{}} | {error, flow_control_error}.
apply_initial_window_size(Params, #loop{} = State) ->
    case last_setting(Params, 4, undefined) of
        undefined ->
            {ok, State};
        New ->
            #loop{peer_initial_window = Old, streams = Streams} = State,
            case shift_stream_send_windows(Streams, New - Old) of
                {error, _} = Overflow ->
                    Overflow;
                {ok, Shifted} ->
                    {ok, State#loop{peer_initial_window = New, streams = Shifted}}
            end
    end.
%% Return the value of the LAST `{Id, Value}` pair in `Params`, or
%% `Default` when `Id` does not occur. Single left-to-right pass.
last_setting(Params, Id, Default) ->
    lists:foldl(
        fun
            ({Key, Value}, _Seen) when Key =:= Id -> Value;
            (_Other, Seen) -> Seen
        end,
        Default,
        Params
    ).
%% Shift every stream's `send_window` by `Delta` with a map
%% comprehension. `check_window/1` throws `flow_control_error` on
%% overflow, which short-circuits the comprehension; the throw is
%% caught here and turned into an error tuple. That is simpler than
%% threading a result tuple through the comprehension.
shift_stream_send_windows(Streams, Delta) ->
    try
        #{
            Id => Stream#{send_window := check_window(Window + Delta)}
         || Id := #{send_window := Window} = Stream <- Streams
        }
    of
        Shifted -> {ok, Shifted}
    catch
        throw:flow_control_error -> {error, flow_control_error}
    end.
%% Return `W` unchanged, or throw `flow_control_error` when it
%% exceeds `?MAX_WINDOW`. Only the upper bound is checked.
check_window(W) ->
    case W > ?MAX_WINDOW of
        true -> throw(flow_control_error);
        false -> W
    end.
%% Validate the per-id constraints on incoming SETTINGS values per
%% RFC 9113 §6.5.2:
%%   - id 2, ENABLE_PUSH: must be 0 or 1;
%%   - id 4, INITIAL_WINDOW_SIZE: at most 2^31-1;
%%   - id 5, MAX_FRAME_SIZE: within [2^14, 2^24-1].
%% Other ids carry no range constraints (or are forward-compat
%% unknowns we ignore). Stops at the first offending entry.
-spec validate_settings([{non_neg_integer(), non_neg_integer()}]) ->
    ok | {error, {protocol_error | flow_control_error, atom()}}.
validate_settings([]) ->
    ok;
validate_settings([Setting | Rest]) ->
    case Setting of
        {2, Push} when Push =/= 0, Push =/= 1 ->
            {error, {protocol_error, enable_push_value}};
        {4, Window} when Window > 16#7FFFFFFF ->
            {error, {flow_control_error, initial_window_size}};
        {5, Size} when Size < 16384; Size > 16#FFFFFF ->
            {error, {protocol_error, max_frame_size}};
        _ ->
            validate_settings(Rest)
    end.
%% Write `Data` to the connection's socket through the transport
%% layer and return the transport's result.
-spec send(#loop{}, iodata()) -> ok | {error, term()}.
send(State, Data) ->
    roadrunner_transport:send(State#loop.socket, Data).
%% Send a GOAWAY frame built from the connection's `last_stream_id`
%% and the given error code.
-spec send_goaway(#loop{}, roadrunner_http2_frame:error_code()) -> ok | {error, term()}.
send_goaway(State, ErrorCode) ->
    LastStreamId = State#loop.last_stream_id,
    send(State, ?GOAWAY(LastStreamId, ErrorCode)).
%% Send RST_STREAM for `StreamId` carrying `ErrorCode`.
-spec send_rst_stream(#loop{}, pos_integer(), roadrunner_http2_frame:error_code()) ->
    ok | {error, term()}.
send_rst_stream(State, StreamId, ErrorCode) ->
    Frame = roadrunner_http2_frame:encode({rst_stream, StreamId, ErrorCode}),
    send(State, Frame).
%% Orderly shutdown of the connection process: emit the
%% listener connection-close telemetry event, release the connection
%% slot, close the socket, then exit with reason `normal`. This path
%% always reports `requests_served` as 0.
-spec exit_clean(#loop{}) -> no_return().
exit_clean(#loop{} = State) ->
    Metadata = #{
        listener_name => State#loop.listener_name,
        peer => State#loop.peer,
        requests_served => 0
    },
    roadrunner_telemetry:listener_conn_close(State#loop.start_mono, Metadata),
    ok = roadrunner_conn:release_slot(State#loop.proto_opts),
    ok = roadrunner_transport:close(State#loop.socket),
    exit(normal).