-module(nhttp_h3).
-moduledoc """
HTTP/3 protocol layer.
Pure functional HTTP/3 connection state machine (RFC 9114). Sits between
the QUIC transport (nquic) and the application layer. The caller manages
QUIC I/O and routes data to/from this module. Return values include
`actions` that the caller must execute via nquic.
## Usage
```erlang
H3 = nhttp_h3:new(server, #{}),
{ok, H3_1, Actions} = nhttp_h3:init_local_streams(H3, #{
control => CtrlId, encoder => EncId, decoder => DecId
}),
execute_actions(QConn, Actions),
loop(QConn, H3_1).
```
## Sending bodies and trailers
The HTTP/3 send surface is frame-oriented. The canonical
`t:nhttp_lib:request/0` and `t:nhttp_lib:response/0` maps can carry a `body`
and `trailers` field as a convenience for "everything is in memory", but
this layer does not consume those maps directly: the caller breaks the
exchange into discrete frame sends. The pattern is:
1. `send_headers(Conn, StreamId, Headers, nofin)` to emit pseudo-headers
plus regular headers (HEADERS frame, QPACK-encoded).
2. Zero or more `send_data(Conn, StreamId, Chunk, nofin)` calls.
3. Either `send_data(Conn, StreamId, FinalChunk, fin)` to close on body,
or `send_headers(Conn, StreamId, Trailers, fin)` to close on trailers.
`send_response/4` is the convenience one-shot for "headers + complete
body + END_STREAM" when the body is already a single `iodata()`.
""".
-include("nhttp_msg.hrl").
%%%-----------------------------------------------------------------------------
%% API EXPORTS
%%%-----------------------------------------------------------------------------
-export([
init_local_streams/2,
new/2,
recv/4,
send_data/4,
send_goaway/1,
send_headers/4,
send_response/4,
set_peer/2,
stream_opened/3,
stream_reset/3
]).
%%%-----------------------------------------------------------------------------
%% TYPE EXPORTS
%%%-----------------------------------------------------------------------------
-export_type([
action/0,
conn/0,
error_code/0,
event/0,
fin/0,
h3_error/0,
h3_error_code/0,
h3_settings/0,
role/0
]).
%%%-----------------------------------------------------------------------------
%% TYPES
%%%-----------------------------------------------------------------------------
%% End-of-stream flag; alias of nhttp_lib:fin(). This module uses the atoms
%% `fin` and `nofin` for it (see bool_to_fin/1).
-type fin() :: nhttp_lib:fin().
%% Connection role; this module matches on `server` and `client`.
-type role() :: nhttp_lib:role().
%% SETTINGS map, as defined by nhttp_h3_frame.
-type h3_settings() :: nhttp_h3_frame:h3_settings().
%% Symbolic error codes carried inside h3_error().
%% NOTE(review): the names appear to mirror the RFC 9114 (HTTP/3) and
%% RFC 9204 (QPACK) error code registries -- confirm against nhttp_lib.
-type h3_error_code() ::
h3_no_error
| h3_general_protocol_error
| h3_internal_error
| h3_stream_creation_error
| h3_closed_critical_stream
| h3_frame_unexpected
| h3_frame_error
| h3_excessive_load
| h3_id_error
| h3_settings_error
| h3_missing_settings
| h3_request_rejected
| h3_request_cancelled
| h3_request_incomplete
| h3_message_error
| h3_connect_error
| h3_version_fallback
| qpack_decompression_failed
| qpack_encoder_stream_error
| qpack_decoder_stream_error.
%% Error term returned by the API: either scoped to the whole connection or to
%% a single stream. The trailing binary is a human-readable reason.
-type h3_error() ::
{connection_error, h3_error_code(), binary()}
| {stream_error, nhttp_lib:stream_id(), h3_error_code(), binary()}.
%% Alias of nhttp_lib:error_code().
-type error_code() :: nhttp_lib:error_code().
%% Events handed back to the caller from recv/4 and stream_reset/3: the
%% protocol-independent events from nhttp_lib plus two HTTP/3-specific ones.
-type event() ::
nhttp_lib:event_common()
| {settings, h3_settings()}
| {push_promise, nhttp_lib:stream_id(), non_neg_integer(), nhttp_lib:headers()}.
%% Side effects the caller must perform on the QUIC transport.
%% `send` writes data on a stream, `send_fin` writes data and ends the sending
%% side. NOTE(review): no function visible in this part of the file builds a
%% `close_connection` action -- confirm where it is produced.
-type action() ::
{send, nhttp_lib:stream_id(), iodata()}
| {send_fin, nhttp_lib:stream_id(), iodata()}
| {close_connection, non_neg_integer(), binary()}.
%% Per-request-stream lifecycle state, advanced by transition_on_send/2 and
%% transition_on_recv/2.
-type stream_state() ::
open
| half_closed_local
| half_closed_remote
| closed.
%%%-----------------------------------------------------------------------------
%% CONSTANTS
%%%-----------------------------------------------------------------------------
%% Unidirectional stream type identifiers. init_local_streams/2 sends them as
%% the varint prefix of each local stream, and register_uni_stream/5 matches
%% the prefix decoded from a peer stream against them.
-define(UNI_CONTROL, 16#00).
-define(UNI_PUSH, 16#01).
-define(UNI_QPACK_ENCODER, 16#02).
-define(UNI_QPACK_DECODER, 16#03).
%%%-----------------------------------------------------------------------------
%% RECORDS
%%%-----------------------------------------------------------------------------
%% Per-request-stream bookkeeping.
-record(h3_stream, {
id :: nhttp_lib:stream_id(),
state = open :: stream_state(),
%% Set once the initial field section has been decoded and validated
%% (decode_headers/5).
headers_received = false :: boolean(),
%% Set once a trailing field section has been accepted (decode_trailers/5).
trailers_received = false :: boolean(),
%% Result of nhttp_msg:extract_content_length/1 on the initial headers.
content_length = undefined :: non_neg_integer() | undefined,
%% Running total of DATA payload bytes received on the stream.
recv_body_length = 0 :: non_neg_integer()
}).
%% Connection state. Fields without a default must be supplied by new/2.
-record(h3_conn, {
role :: role(),
%% `init` until init_local_streams/2 sets `open`; send_goaway/1 and
%% process_goaway/2 set `closing`.
state = init :: init | open | closing | closed,
local_settings :: h3_settings(),
peer_settings :: h3_settings(),
%% True once the peer's SETTINGS frame has been applied.
settings_received = false :: boolean(),
%% local_* stream IDs are stored by init_local_streams/2; peer_* stream IDs
%% are stored by register_uni_stream/5 when the peer's stream type is read.
local_control_stream :: nhttp_lib:stream_id() | undefined,
peer_control_stream :: nhttp_lib:stream_id() | undefined,
local_encoder_stream :: nhttp_lib:stream_id() | undefined,
peer_encoder_stream :: nhttp_lib:stream_id() | undefined,
local_decoder_stream :: nhttp_lib:stream_id() | undefined,
peer_decoder_stream :: nhttp_lib:stream_id() | undefined,
qpack_enc :: nhttp_qpack:encoder(),
qpack_dec :: nhttp_qpack:decoder(),
streams = #{} :: #{nhttp_lib:stream_id() => #h3_stream{}},
%% Updated through update_last_peer_stream/2 when initial headers are
%% accepted; used by the server as the GOAWAY identifier.
last_peer_stream_id = 0 :: nhttp_lib:stream_id(),
%% Raised by process_max_push_id/2 (server side).
max_push_id = 0 :: non_neg_integer(),
%% Used by the client as the GOAWAY identifier in send_goaway/1.
peer_max_push_id = 0 :: non_neg_integer(),
goaway_sent = false :: boolean(),
goaway_received = false :: boolean(),
%% NOTE(review): written both by send_goaway/1 (ID we sent) and by
%% process_goaway/2 (ID the peer sent), and read by stream_opened/3 -- the
%% two directions share this one field; confirm that is intended.
goaway_id = 0 :: non_neg_integer(),
%% Bytes received on a peer uni stream whose type varint is still incomplete.
uni_stream_bufs = #{} :: #{nhttp_lib:stream_id() => binary()},
%% Peer uni streams whose data is discarded (see add_ignored_uni/2).
ignored_uni_streams = #{} :: #{nhttp_lib:stream_id() => true},
%% Partial-frame buffers per stream; presumably maintained by
%% set_stream_buf/3 and prepend_stream_buf/3 (not visible here).
stream_bufs = #{} :: #{nhttp_lib:stream_id() => binary()},
%% Remote address recorded by set_peer/2.
peer = undefined :: undefined | nhttp_lib:peer()
}).
-opaque conn() :: #h3_conn{}.
%%%-----------------------------------------------------------------------------
%% API FUNCTIONS
%%%-----------------------------------------------------------------------------
-doc """
Bind the three locally-initiated unidirectional streams to the connection.

Stores the control, QPACK encoder and QPACK decoder stream IDs, moves the
connection to the `open` state and returns one `send` action per stream. The
control stream receives its type prefix followed by the local SETTINGS frame;
the two QPACK streams receive only their type prefix.
""".
-spec init_local_streams(conn(), #{
    control := nhttp_lib:stream_id(),
    encoder := nhttp_lib:stream_id(),
    decoder := nhttp_lib:stream_id()
}) -> {ok, conn(), [action()]}.
init_local_streams(
    #h3_conn{local_settings = LocalSettings} = Conn,
    #{control := CtrlId, encoder := EncId, decoder := DecId}
) ->
    ControlPrefix = nquic_varint:encode(?UNI_CONTROL),
    {ok, Settings} = nhttp_h3_frame:settings(LocalSettings),
    EncoderPrefix = nquic_varint:encode(?UNI_QPACK_ENCODER),
    DecoderPrefix = nquic_varint:encode(?UNI_QPACK_DECODER),
    Opened = Conn#h3_conn{
        state = open,
        local_control_stream = CtrlId,
        local_encoder_stream = EncId,
        local_decoder_stream = DecId
    },
    {ok, Opened, [
        {send, CtrlId, [ControlPrefix, Settings]},
        {send, EncId, EncoderPrefix},
        {send, DecId, DecoderPrefix}
    ]}.
-doc """
Create a new HTTP/3 connection for `Role`.

`Settings` is merged over the defaults. The QPACK decoder is created with the
local table capacity and blocked-stream limits; the encoder starts with both
limits at zero and only records the configured ceilings.
""".
-spec new(role(), h3_settings()) -> conn().
new(Role, Settings) ->
    LocalSettings = maps:merge(default_settings(), Settings),
    TableCapacity = maps:get(qpack_max_table_capacity, LocalSettings, 0),
    BlockedStreams = maps:get(qpack_blocked_streams, LocalSettings, 0),
    EncoderOpts = #{
        max_table_capacity => 0,
        configured_max_capacity => TableCapacity,
        max_blocked_streams => 0,
        configured_max_blocked => BlockedStreams
    },
    DecoderOpts = #{
        max_table_capacity => TableCapacity,
        max_blocked_streams => BlockedStreams
    },
    {ok, Encoder} = nhttp_qpack:new_encoder(EncoderOpts),
    {ok, Decoder} = nhttp_qpack:new_decoder(DecoderOpts),
    #h3_conn{
        role = Role,
        local_settings = LocalSettings,
        peer_settings = default_settings(),
        qpack_enc = Encoder,
        qpack_dec = Decoder
    }.
-doc """
Process incoming data on any QUIC stream.

The stream is classified first and the data is routed to the matching handler:
request stream, one of the peer's critical unidirectional streams, or a
unidirectional stream whose type has not been read yet. Data on an ignored
unidirectional stream is dropped.
""".
-spec recv(conn(), nhttp_lib:stream_id(), binary(), fin()) ->
    {ok, [event()], conn(), [action()]}
    | {error, h3_error()}.
recv(#h3_conn{} = Conn, StreamId, Data, Fin) ->
    Class = classify_stream(Conn, StreamId),
    case Class of
        ignored_uni ->
            {ok, [], Conn, []};
        unknown_uni ->
            recv_new_uni_stream(Conn, StreamId, Data, Fin);
        {bidi, request} ->
            recv_request_stream(Conn, StreamId, Data, Fin);
        {uni, control} ->
            recv_control_stream(Conn, StreamId, Data, Fin);
        {uni, qpack_encoder} ->
            recv_encoder_stream(Conn, StreamId, Data, Fin);
        {uni, qpack_decoder} ->
            recv_decoder_stream(Conn, StreamId, Data, Fin)
    end.
-doc """
Wrap `Data` in a DATA frame and send it on a request stream.

With `fin` the returned action is `send_fin`, otherwise `send`. The stream's
state is advanced with `transition_on_send/2`.
""".
-spec send_data(conn(), nhttp_lib:stream_id(), iodata(), fin()) ->
    {ok, conn(), [action()]} | {error, h3_error()}.
send_data(#h3_conn{} = Conn, StreamId, Data, Fin) ->
    case validate_send_state(Conn, StreamId) of
        {error, _} = Error ->
            Error;
        ok ->
            {ok, Frame} = nhttp_h3_frame:data(Data),
            Stream0 = get_or_create_stream(Conn, StreamId),
            Stream1 = Stream0#h3_stream{
                state = transition_on_send(Stream0#h3_stream.state, Fin)
            },
            Conn1 = update_stream(Conn, StreamId, Stream1),
            Tag =
                case Fin of
                    fin -> send_fin;
                    nofin -> send
                end,
            {ok, Conn1, [{Tag, StreamId, Frame}]}
    end.
-doc """
Send GOAWAY on the local control stream and mark the connection `closing`.

A server uses `last_peer_stream_id` as the GOAWAY identifier, a client uses
`peer_max_push_id`. The identifier is also stored in `goaway_id`.
""".
%% NOTE(review): RFC 9114 Section 5.2 describes the server's GOAWAY identifier
%% as the first request stream ID that will NOT be processed; confirm that
%% `last_peer_stream_id` carries that meaning for the peer.
-spec send_goaway(conn()) ->
    {ok, conn(), [action()]}.
send_goaway(#h3_conn{role = Role, local_control_stream = CtrlId} = Conn) ->
    Id =
        case Role of
            server -> Conn#h3_conn.last_peer_stream_id;
            client -> Conn#h3_conn.peer_max_push_id
        end,
    {ok, Frame} = nhttp_h3_frame:goaway(Id),
    Closing = Conn#h3_conn{
        state = closing,
        goaway_sent = true,
        goaway_id = Id
    },
    {ok, Closing, [{send, CtrlId, Frame}]}.
-doc """
QPACK-encode `Headers` and send them as a HEADERS frame on a request stream.

The same call is used for trailers: pass `fin` to end the sending side with
this frame. Encoder-stream data produced by the QPACK encoder is handed to
`build_send_actions/5` together with the frame.
""".
-spec send_headers(conn(), nhttp_lib:stream_id(), nhttp_lib:headers(), fin()) ->
    {ok, conn(), [action()]} | {error, h3_error()}.
send_headers(#h3_conn{} = Conn, StreamId, Headers, Fin) ->
    case validate_send_state(Conn, StreamId) of
        {error, _} = Error ->
            Error;
        ok ->
            {ok, Enc1, EncInstructions, Section} =
                nhttp_qpack:encode_field_section(Conn#h3_conn.qpack_enc, StreamId, Headers),
            {ok, Frame} = nhttp_h3_frame:headers(Section),
            Stream0 = get_or_create_stream(Conn, StreamId),
            Stream1 = Stream0#h3_stream{
                state = transition_on_send(Stream0#h3_stream.state, Fin)
            },
            Conn1 = update_stream(Conn#h3_conn{qpack_enc = Enc1}, StreamId, Stream1),
            Actions = build_send_actions(
                Conn#h3_conn.local_encoder_stream, EncInstructions, StreamId, Frame, Fin
            ),
            {ok, Conn1, Actions}
    end.
-doc """
Send a complete message (headers, body, end of stream) on a request stream.

The HEADERS and DATA frames are concatenated and handed to
`build_response_actions/4` as one payload, and the stream is transitioned as
for a send with `fin`. Use `send_headers/4` and `send_data/4` instead when the
body is produced in chunks or trailers are needed.
""".
-spec send_response(conn(), nhttp_lib:stream_id(), nhttp_lib:headers(), iodata()) ->
    {ok, conn(), [action()]} | {error, h3_error()}.
send_response(#h3_conn{} = Conn, StreamId, Headers, Body) ->
    case validate_send_state(Conn, StreamId) of
        {error, _} = Error ->
            Error;
        ok ->
            {ok, Enc1, EncInstructions, Section} =
                nhttp_qpack:encode_field_section(Conn#h3_conn.qpack_enc, StreamId, Headers),
            {ok, HeadersFrame} = nhttp_h3_frame:headers(Section),
            {ok, DataFrame} = nhttp_h3_frame:data(Body),
            Stream0 = get_or_create_stream(Conn, StreamId),
            Stream1 = Stream0#h3_stream{
                state = transition_on_send(Stream0#h3_stream.state, fin)
            },
            Conn1 = update_stream(Conn#h3_conn{qpack_enc = Enc1}, StreamId, Stream1),
            Actions = build_response_actions(
                Conn#h3_conn.local_encoder_stream,
                EncInstructions,
                StreamId,
                [HeadersFrame, DataFrame]
            ),
            {ok, Conn1, Actions}
    end.
-doc """
Store the remote peer address on the connection.

`Peer` must be `{Address, Port}` where `Address` is a 4-tuple (IPv4) or
8-tuple (IPv6) and `Port` is an integer in `0..65535`; anything else raises
`function_clause`. The stored value is passed on when requests are built from
received headers.
""".
-spec set_peer(conn(), nhttp_lib:peer()) -> conn().
set_peer(#h3_conn{} = Conn, {Address, Port} = Peer) when
    is_tuple(Address),
    (tuple_size(Address) =:= 4 orelse tuple_size(Address) =:= 8),
    is_integer(Port),
    Port >= 0,
    Port =< 65535
->
    Conn#h3_conn{peer = Peer}.
-doc """
Notify that the peer opened a new stream.

After a GOAWAY has been sent, a bidirectional stream whose ID is greater than
the stored GOAWAY identifier is refused with an `h3_request_rejected` stream
error. Every other stream is accepted and the connection is returned as is.
""".
-spec stream_opened(conn(), nhttp_lib:stream_id(), bidi | uni) ->
    {ok, conn()} | {error, h3_error()}.
stream_opened(#h3_conn{goaway_sent = true, goaway_id = GoawayId}, StreamId, bidi) when
    StreamId > GoawayId
->
    {error, {stream_error, StreamId, h3_request_rejected, <<"Stream opened after GOAWAY">>}};
stream_opened(#h3_conn{} = Conn, _StreamId, bidi) ->
    {ok, Conn};
stream_opened(Conn, _StreamId, uni) ->
    {ok, Conn}.
-doc """
Handle a peer stream reset.

Emits a single `stream_reset` event carrying the symbolic form of `ErrorCode`
and closes the stream in the connection state.
""".
-spec stream_reset(conn(), nhttp_lib:stream_id(), non_neg_integer()) ->
    {ok, [event()], conn()}.
stream_reset(Conn, StreamId, ErrorCode) ->
    Reason = error_code_atom(ErrorCode),
    {ok, [{stream_reset, StreamId, Reason}], close_stream(Conn, StreamId)}.
%%%-----------------------------------------------------------------------------
%% INTERNAL: STREAM CLASSIFICATION
%%%-----------------------------------------------------------------------------
%% Decide how data arriving on StreamId must be handled.
%%
%% Checked in order: the peer's registered control / QPACK encoder / QPACK
%% decoder streams, streams marked as ignored, streams with a buffered partial
%% type prefix, and finally the stream ID itself (unidirectional IDs whose type
%% is not known yet are `unknown_uni`, everything else is a request stream).
-spec classify_stream(conn(), nhttp_lib:stream_id()) ->
    {bidi, request}
    | {uni, control | qpack_encoder | qpack_decoder}
    | ignored_uni
    | unknown_uni.
classify_stream(#h3_conn{peer_control_stream = Id}, Id) when Id =/= undefined ->
    {uni, control};
classify_stream(#h3_conn{peer_encoder_stream = Id}, Id) when Id =/= undefined ->
    {uni, qpack_encoder};
classify_stream(#h3_conn{peer_decoder_stream = Id}, Id) when Id =/= undefined ->
    {uni, qpack_decoder};
classify_stream(#h3_conn{ignored_uni_streams = Ignored}, StreamId) when
    is_map_key(StreamId, Ignored)
->
    ignored_uni;
classify_stream(#h3_conn{uni_stream_bufs = Pending}, StreamId) when
    is_map_key(StreamId, Pending)
->
    unknown_uni;
classify_stream(#h3_conn{}, StreamId) ->
    case is_uni_stream(StreamId) of
        true -> unknown_uni;
        false -> {bidi, request}
    end.
%%%-----------------------------------------------------------------------------
%% INTERNAL: RECEIVING ON DIFFERENT STREAM TYPES
%%%-----------------------------------------------------------------------------
%% Handle one decoded frame received on the peer's control stream.
%%
%% Clause order matters: the first two clauses enforce that SETTINGS is the
%% first frame and appears only once; the remaining clauses dispatch on the
%% frame type. Returns the updated connection with the events and actions the
%% frame produced, or a connection error.
-spec process_control_frame(conn(), nhttp_h3_frame:t()) ->
{ok, conn(), [event()], [action()]} | {error, h3_error()}.
%% Any frame other than SETTINGS before SETTINGS has been seen.
process_control_frame(#h3_conn{settings_received = false}, Frame) when
element(1, Frame) =/= settings
->
{error,
{connection_error, h3_missing_settings,
<<"First frame on control stream must be SETTINGS (RFC 9114 Section 6.2.1)">>}};
%% A second SETTINGS frame.
process_control_frame(#h3_conn{settings_received = true}, {settings, _}) ->
{error,
{connection_error, h3_frame_unexpected,
<<"Second SETTINGS on control stream (RFC 9114 Section 7.2.4)">>}};
%% The first (and only valid) SETTINGS frame: apply it and report it.
process_control_frame(Conn, {settings, PeerSettings}) ->
NewConn = apply_peer_settings(Conn, PeerSettings),
{ok, NewConn, [{settings, PeerSettings}], []};
process_control_frame(Conn, {goaway, Id}) ->
process_goaway(Conn, Id);
process_control_frame(Conn, {max_push_id, PushId}) ->
process_max_push_id(Conn, PushId);
%% CANCEL_PUSH is surfaced to the caller without touching connection state.
process_control_frame(Conn, {cancel_push, PushId}) ->
{ok, Conn, [{cancel_push, PushId}], []};
%% Frame types that are only valid on request streams.
process_control_frame(_Conn, {data, _}) ->
{error,
{connection_error, h3_frame_unexpected,
<<"DATA on control stream (RFC 9114 Section 7.2.1)">>}};
process_control_frame(_Conn, {headers, _}) ->
{error,
{connection_error, h3_frame_unexpected,
<<"HEADERS on control stream (RFC 9114 Section 7.2.2)">>}};
process_control_frame(_Conn, {push_promise, _, _}) ->
{error,
{connection_error, h3_frame_unexpected,
<<"PUSH_PROMISE on control stream (RFC 9114 Section 7.2.5)">>}};
%% Unknown frame types are skipped.
process_control_frame(Conn, {unknown, _Type, _Payload}) ->
{ok, Conn, [], []}.
%% Handle one decoded frame received on a request (bidirectional) stream.
%%
%% IsFinal is true when this frame is the last one in the received data and the
%% data carried FIN; it is turned into the `fin`/`nofin` flag of the emitted
%% event and of the receive-side state transition.
%%
%% Frame sequencing:
%%   * the first HEADERS frame is the initial field section, a second one is
%%     the trailer section, a third one is rejected as a stream error;
%%   * DATA is only valid between the initial field section and the trailers.
%%     DATA before HEADERS, or DATA after the trailer section, is a connection
%%     error H3_FRAME_UNEXPECTED (RFC 9114 Section 4.1);
%%   * control-stream-only frames are connection errors;
%%   * unknown frame types are skipped.
-spec process_request_frame(conn(), nhttp_lib:stream_id(), nhttp_h3_frame:t(), boolean()) ->
    {ok, conn(), [event()], [action()]} | {error, h3_error()}.
process_request_frame(Conn, StreamId, {headers, FieldSection}, IsFinal) ->
    Stream = get_or_create_stream(Conn, StreamId),
    case Stream#h3_stream.headers_received of
        false ->
            decode_headers(Conn, StreamId, Stream, FieldSection, IsFinal);
        true ->
            case Stream#h3_stream.trailers_received of
                true ->
                    {error,
                        {stream_error, StreamId, h3_message_error,
                            <<"Multiple trailer sections (RFC 9114 Section 4.1)">>}};
                false ->
                    decode_trailers(Conn, StreamId, Stream, FieldSection, IsFinal)
            end
    end;
process_request_frame(Conn, StreamId, {data, Payload}, IsFinal) ->
    Stream = get_or_create_stream(Conn, StreamId),
    case {Stream#h3_stream.headers_received, Stream#h3_stream.trailers_received} of
        {false, _} ->
            {error,
                {connection_error, h3_frame_unexpected,
                    <<"DATA before HEADERS (RFC 9114 Section 4.1)">>}};
        {true, true} ->
            %% The trailer section ends the message; nothing may follow it.
            {error,
                {connection_error, h3_frame_unexpected,
                    <<"DATA after trailers (RFC 9114 Section 4.1)">>}};
        {true, false} ->
            DataLen = byte_size(Payload),
            NewRecvLen = Stream#h3_stream.recv_body_length + DataLen,
            FinAtom = bool_to_fin(IsFinal),
            case
                nhttp_msg:validate_content_length(
                    Stream#h3_stream.content_length, NewRecvLen, FinAtom
                )
            of
                ok ->
                    NewState = transition_on_recv(Stream#h3_stream.state, FinAtom),
                    NewStream = Stream#h3_stream{
                        state = NewState,
                        recv_body_length = NewRecvLen
                    },
                    NewConn = update_stream(Conn, StreamId, NewStream),
                    {ok, NewConn, [{data, StreamId, Payload, FinAtom}], []};
                {error, content_length_mismatch} ->
                    {error,
                        {stream_error, StreamId, h3_message_error,
                            <<"Content-Length mismatch (RFC 9114 Section 4.1.2)">>}}
            end
    end;
process_request_frame(_Conn, _StreamId, {cancel_push, _}, _IsFinal) ->
    {error,
        {connection_error, h3_frame_unexpected,
            <<"CANCEL_PUSH on request stream (RFC 9114 Section 7.2.3)">>}};
process_request_frame(_Conn, _StreamId, {settings, _}, _IsFinal) ->
    {error,
        {connection_error, h3_frame_unexpected,
            <<"SETTINGS on request stream (RFC 9114 Section 7.2.4)">>}};
process_request_frame(_Conn, _StreamId, {goaway, _}, _IsFinal) ->
    {error,
        {connection_error, h3_frame_unexpected,
            <<"GOAWAY on request stream (RFC 9114 Section 7.2.6)">>}};
process_request_frame(_Conn, _StreamId, {max_push_id, _}, _IsFinal) ->
    {error,
        {connection_error, h3_frame_unexpected,
            <<"MAX_PUSH_ID on request stream (RFC 9114 Section 7.2.7)">>}};
process_request_frame(Conn, StreamId, {push_promise, PushId, FieldSection}, _IsFinal) ->
    process_push_promise(Conn, StreamId, PushId, FieldSection);
process_request_frame(Conn, _StreamId, {unknown, _Type, _Payload}, _IsFinal) ->
    {ok, Conn, [], []}.
%% Decode and process every complete frame in Data (control stream).
%%
%% Events and Actions are accumulators kept in reverse order; they are reversed
%% once when no further complete frame can be decoded. Whatever is left
%% undecoded at that point is stored as the stream's buffer for the next call.
%% Decoder errors are mapped to connection errors.
-spec recv_control_frames(conn(), nhttp_lib:stream_id(), binary(), [event()], [action()]) ->
{ok, [event()], conn(), [action()]} | {error, h3_error()}.
recv_control_frames(Conn, StreamId, Data, Events, Actions) ->
case nhttp_h3_frame:decode(Data) of
{ok, Frame, Rest} ->
case process_control_frame(Conn, Frame) of
{ok, NewConn, NewEvents, NewActions} ->
recv_control_frames(
NewConn,
StreamId,
Rest,
%% lists:reverse/2 prepends the new items in reverse, keeping
%% the accumulator reversed overall.
lists:reverse(NewEvents, Events),
lists:reverse(NewActions, Actions)
);
{error, _} = E ->
E
end;
{more, _} ->
%% Incomplete frame: keep the undecoded bytes for the next call.
NewConn = set_stream_buf(Conn, StreamId, Data),
{ok, lists:reverse(Events), NewConn, lists:reverse(Actions)};
{error, h3_frame_unexpected} ->
{error,
{connection_error, h3_frame_unexpected,
<<"Forbidden H2 frame on control stream (RFC 9114 Section 7.2.8)">>}};
{error, h3_settings_error} ->
{error,
{connection_error, h3_settings_error,
<<"Forbidden H2 setting in SETTINGS (RFC 9114 Section 7.2.4.1)">>}};
{error, h3_frame_error} ->
{error,
{connection_error, h3_frame_error,
<<"Malformed frame on control stream (RFC 9114 Section 7)">>}}
end.
%% Receive data on the peer's control stream.
%%
%% The control stream must never be closed, so FIN is a connection error.
%% Otherwise the new data is appended to whatever was buffered for the stream
%% and the complete frames are processed.
-spec recv_control_stream(conn(), nhttp_lib:stream_id(), binary(), fin()) ->
    {ok, [event()], conn(), [action()]} | {error, h3_error()}.
recv_control_stream(_Conn, _StreamId, _Data, fin) ->
    {error,
        {connection_error, h3_closed_critical_stream,
            <<"Control stream closed (RFC 9114 Section 6.2.1)">>}};
recv_control_stream(Conn, StreamId, Data, nofin) ->
    Pending = prepend_stream_buf(Conn, StreamId, Data),
    recv_control_frames(Conn, StreamId, Pending, [], []).
%% Receive data on the peer's QPACK decoder stream.
%%
%% FIN is a connection error (critical stream). Empty data is a no-op. Any
%% other data is fed to the local QPACK encoder; a failure there becomes a
%% `qpack_decoder_stream_error` connection error.
-spec recv_decoder_stream(conn(), nhttp_lib:stream_id(), binary(), fin()) ->
    {ok, [event()], conn(), [action()]} | {error, h3_error()}.
recv_decoder_stream(_Conn, _StreamId, _Data, fin) ->
    {error,
        {connection_error, h3_closed_critical_stream,
            <<"QPACK decoder stream closed (RFC 9114 Section 6.2.1)">>}};
recv_decoder_stream(Conn, _StreamId, <<>>, nofin) ->
    {ok, [], Conn, []};
recv_decoder_stream(Conn, StreamId, Data, nofin) ->
    case nhttp_qpack:feed_decoder_stream(Conn#h3_conn.qpack_enc, Data) of
        {ok, Enc1} ->
            Conn1 = set_stream_buf(Conn#h3_conn{qpack_enc = Enc1}, StreamId, <<>>),
            {ok, [], Conn1, []};
        {error, Reason} ->
            Message = iolist_to_binary(
                io_lib:format("QPACK decoder stream error: ~p", [Reason])
            ),
            {error, {connection_error, qpack_decoder_stream_error, Message}}
    end.
%% Receive data on the peer's QPACK encoder stream.
%%
%% FIN is a connection error (critical stream). Empty data is a no-op. Any
%% other data is fed to the local QPACK decoder, which may unblock pending
%% field sections: their events and actions are returned, followed by the
%% actions destined for the local decoder stream.
-spec recv_encoder_stream(conn(), nhttp_lib:stream_id(), binary(), fin()) ->
    {ok, [event()], conn(), [action()]} | {error, h3_error()}.
recv_encoder_stream(_Conn, _StreamId, _Data, fin) ->
    {error,
        {connection_error, h3_closed_critical_stream,
            <<"QPACK encoder stream closed (RFC 9114 Section 6.2.1)">>}};
recv_encoder_stream(Conn, _StreamId, <<>>, nofin) ->
    {ok, [], Conn, []};
recv_encoder_stream(Conn, StreamId, Data, nofin) ->
    case nhttp_qpack:feed_encoder_stream(Conn#h3_conn.qpack_dec, Data) of
        {ok, Dec1, Unblocked} ->
            {Events, UnblockedActions} = process_unblocked(Unblocked),
            AckActions = decoder_stream_actions(
                Conn#h3_conn.local_decoder_stream, Unblocked
            ),
            Conn1 = set_stream_buf(Conn#h3_conn{qpack_dec = Dec1}, StreamId, <<>>),
            {ok, Events, Conn1, UnblockedActions ++ AckActions};
        {error, Reason} ->
            Message = iolist_to_binary(
                io_lib:format("QPACK encoder stream error: ~p", [Reason])
            ),
            {error, {connection_error, qpack_encoder_stream_error, Message}}
    end.
%% Receive data on a peer unidirectional stream whose type is not known yet.
%%
%% The stream type is a varint at the start of the stream and may arrive split
%% over several calls, so bytes are buffered in `uni_stream_bufs` until it can
%% be decoded. Once decoded, the buffer entry is dropped and the stream is
%% registered through register_uni_stream/5 with the remaining bytes.
%%
%% If the stream ends (FIN) before the type is complete, the stream is simply
%% forgotten: the buffered bytes are discarded so that a stale entry does not
%% stay in `uni_stream_bufs` for the lifetime of the connection.
-spec recv_new_uni_stream(conn(), nhttp_lib:stream_id(), binary(), fin()) ->
    {ok, [event()], conn(), [action()]} | {error, h3_error()}.
recv_new_uni_stream(#h3_conn{uni_stream_bufs = Bufs} = Conn, StreamId, Data, Fin) ->
    Combined = prepend(maps:get(StreamId, Bufs, <<>>), Data),
    Unbuffered = Conn#h3_conn{uni_stream_bufs = maps:remove(StreamId, Bufs)},
    case nquic_varint:decode(Combined) of
        {ok, Type, Rest} ->
            register_uni_stream(Unbuffered, StreamId, Type, Rest, Fin);
        {error, incomplete_binary} when Fin =:= fin ->
            %% Stream closed before its type was known: drop the partial prefix.
            {ok, [], Unbuffered, []};
        {error, incomplete_binary} when Fin =:= nofin ->
            Buffered = Conn#h3_conn{uni_stream_bufs = Bufs#{StreamId => Combined}},
            {ok, [], Buffered, []}
    end.
%% Decode and process every complete frame in Data (request stream).
%%
%% Events and Actions are accumulators kept in reverse order and reversed once
%% when no further complete frame can be decoded; the undecoded remainder is
%% stored as the stream's buffer for the next call.
%%
%% NOTE(review): when Fin is `fin` and decode/1 returns `more` (for example an
%% empty Data carrying only FIN, or a frame truncated by the end of the
%% stream), no event signals the end of the stream and no error is raised --
%% confirm whether the caller handles that case.
-spec recv_request_frames(conn(), nhttp_lib:stream_id(), binary(), fin(), [event()], [action()]) ->
{ok, [event()], conn(), [action()]} | {error, h3_error()}.
recv_request_frames(Conn, StreamId, Data, Fin, Events, Actions) ->
case nhttp_h3_frame:decode(Data) of
{ok, Frame, Rest} ->
%% The frame ends the stream only if FIN was received and nothing follows it.
IsFinal = Fin =:= fin andalso Rest =:= <<>>,
case process_request_frame(Conn, StreamId, Frame, IsFinal) of
{ok, NewConn, NewEvents, NewActions} ->
recv_request_frames(
NewConn,
StreamId,
Rest,
Fin,
%% lists:reverse/2 prepends the new items in reverse, keeping
%% the accumulator reversed overall.
lists:reverse(NewEvents, Events),
lists:reverse(NewActions, Actions)
);
{error, _} = E ->
E
end;
{more, _} ->
%% Incomplete frame: keep the undecoded bytes for the next call.
NewConn = set_stream_buf(Conn, StreamId, Data),
{ok, lists:reverse(Events), NewConn, lists:reverse(Actions)};
{error, h3_frame_unexpected} ->
{error,
{connection_error, h3_frame_unexpected,
<<"Forbidden H2 frame on request stream (RFC 9114 Section 7.2.8)">>}};
%% Any other decoder error is passed through as the connection error code.
{error, ErrorCode} ->
{error, {connection_error, ErrorCode, <<"Frame error on request stream">>}}
end.
%% Receive data on a request stream: join it with the bytes buffered from the
%% previous call and process every complete frame.
-spec recv_request_stream(conn(), nhttp_lib:stream_id(), binary(), fin()) ->
    {ok, [event()], conn(), [action()]} | {error, h3_error()}.
recv_request_stream(Conn, StreamId, Data, Fin) ->
    recv_request_frames(
        Conn, StreamId, prepend_stream_buf(Conn, StreamId, Data), Fin, [], []
    ).
%% Register a peer unidirectional stream once its type prefix has been decoded.
%%
%% Type is the decoded stream type and Rest the bytes that followed it. For the
%% three critical stream types the stream ID is recorded and Rest is handed to
%% the matching receive function; a second stream of the same type is a
%% connection error. Push streams are rejected on a server and ignored on a
%% client. Streams of any other type are ignored.
-spec register_uni_stream(conn(), nhttp_lib:stream_id(), non_neg_integer(), binary(), fin()) ->
{ok, [event()], conn(), [action()]} | {error, h3_error()}.
%% Control stream: first one is registered, a second one is an error.
register_uni_stream(
#h3_conn{peer_control_stream = undefined} = Conn,
StreamId,
?UNI_CONTROL,
Rest,
Fin
) ->
NewConn = Conn#h3_conn{peer_control_stream = StreamId},
recv_control_stream(NewConn, StreamId, Rest, Fin);
register_uni_stream(
#h3_conn{peer_control_stream = Existing},
_StreamId,
?UNI_CONTROL,
_Rest,
_Fin
) when Existing =/= undefined ->
{error,
{connection_error, h3_stream_creation_error,
<<"Duplicate control stream (RFC 9114 Section 6.2.1)">>}};
%% QPACK encoder stream.
register_uni_stream(
#h3_conn{peer_encoder_stream = undefined} = Conn,
StreamId,
?UNI_QPACK_ENCODER,
Rest,
Fin
) ->
NewConn = Conn#h3_conn{peer_encoder_stream = StreamId},
recv_encoder_stream(NewConn, StreamId, Rest, Fin);
register_uni_stream(
#h3_conn{peer_encoder_stream = Existing},
_StreamId,
?UNI_QPACK_ENCODER,
_Rest,
_Fin
) when Existing =/= undefined ->
{error,
{connection_error, h3_stream_creation_error,
<<"Duplicate QPACK encoder stream (RFC 9114 Section 6.2.1)">>}};
%% QPACK decoder stream.
register_uni_stream(
#h3_conn{peer_decoder_stream = undefined} = Conn,
StreamId,
?UNI_QPACK_DECODER,
Rest,
Fin
) ->
NewConn = Conn#h3_conn{peer_decoder_stream = StreamId},
recv_decoder_stream(NewConn, StreamId, Rest, Fin);
register_uni_stream(
#h3_conn{peer_decoder_stream = Existing},
_StreamId,
?UNI_QPACK_DECODER,
_Rest,
_Fin
) when Existing =/= undefined ->
{error,
{connection_error, h3_stream_creation_error,
<<"Duplicate QPACK decoder stream (RFC 9114 Section 6.2.1)">>}};
%% Push stream: an error on a server; on a client its data is discarded.
register_uni_stream(Conn, StreamId, ?UNI_PUSH, _Rest, _Fin) ->
case Conn#h3_conn.role of
server ->
{error,
{connection_error, h3_stream_creation_error,
<<"Server received push stream (RFC 9114 Section 6.2.2)">>}};
client ->
NewConn = add_ignored_uni(Conn, StreamId),
{ok, [], NewConn, []}
end;
%% Any other stream type: mark the stream so later data on it is dropped.
register_uni_stream(Conn, StreamId, _Type, _Rest, _Fin) ->
NewConn = add_ignored_uni(Conn, StreamId),
{ok, [], NewConn, []}.
%%%-----------------------------------------------------------------------------
%% INTERNAL: HEADER DECODE / VALIDATE
%%%-----------------------------------------------------------------------------
%% Decode and validate the initial field section of a request stream.
%%
%% On success the stream is marked as having received headers, its expected
%% content length is recorded, the last peer stream ID is updated, and a
%% `request` (server) or `response` (client) event is emitted. Data the QPACK
%% decoder wants to send is returned as an action on the local decoder stream.
%% Validation failures are stream errors; QPACK failures are connection errors.
-spec decode_headers(conn(), nhttp_lib:stream_id(), #h3_stream{}, binary(), boolean()) ->
{ok, conn(), [event()], [action()]} | {error, h3_error()}.
decode_headers(
#h3_conn{qpack_dec = Dec, local_decoder_stream = DecStreamId} = Conn,
StreamId,
Stream,
FieldSection,
IsFinal
) ->
case nhttp_qpack:decode_field_section(Dec, StreamId, FieldSection) of
{ok, NewDec, DecStreamData, Headers} ->
FinAtom = bool_to_fin(IsFinal),
case
validate_headers(
Conn#h3_conn.role, Headers, false, Conn#h3_conn.local_settings
)
of
ok ->
NewState = transition_on_recv(Stream#h3_stream.state, FinAtom),
ContentLength = nhttp_msg:extract_content_length(Headers),
NewStream = Stream#h3_stream{
state = NewState,
headers_received = true,
content_length = ContentLength
},
NewConn0 = Conn#h3_conn{qpack_dec = NewDec},
NewConn = update_stream(NewConn0, StreamId, NewStream),
NewConn1 = update_last_peer_stream(NewConn, StreamId),
%% Only emit a decoder-stream action when there is something to send.
DecActions =
case iolist_size(DecStreamData) of
0 -> [];
_ -> [{send, DecStreamId, DecStreamData}]
end,
Event = build_initial_headers_event(
Conn#h3_conn.role, StreamId, Headers, FinAtom, NewConn1
),
{ok, NewConn1, [Event], DecActions};
{error, Reason} ->
{error, {stream_error, StreamId, h3_message_error, Reason}}
end;
%% The field section cannot be decoded yet: keep the decoder state and emit
%% nothing. Unblocked sections are handled in recv_encoder_stream/4.
{blocked, NewDec} ->
NewConn = Conn#h3_conn{qpack_dec = NewDec},
{ok, NewConn, [], []};
{error, Reason} ->
{error,
{connection_error, qpack_decompression_failed,
iolist_to_binary(
io_lib:format("QPACK decode error: ~p", [Reason])
)}}
end.
%% Decode and validate the trailing field section of a request stream.
%%
%% On success the stream is marked as having received trailers and a
%% `trailers` event is emitted. Data the QPACK decoder wants to send is
%% returned as an action on the local decoder stream. Validation failures are
%% stream errors; QPACK failures are connection errors.
-spec decode_trailers(conn(), nhttp_lib:stream_id(), #h3_stream{}, binary(), boolean()) ->
{ok, conn(), [event()], [action()]} | {error, h3_error()}.
decode_trailers(
#h3_conn{qpack_dec = Dec, local_decoder_stream = DecStreamId} = Conn,
StreamId,
Stream,
FieldSection,
IsFinal
) ->
case nhttp_qpack:decode_field_section(Dec, StreamId, FieldSection) of
{ok, NewDec, DecStreamData, Trailers} ->
case
validate_headers(
Conn#h3_conn.role, Trailers, true, Conn#h3_conn.local_settings
)
of
ok ->
FinAtom = bool_to_fin(IsFinal),
NewState = transition_on_recv(Stream#h3_stream.state, FinAtom),
NewStream = Stream#h3_stream{
state = NewState,
trailers_received = true
},
NewConn0 = Conn#h3_conn{qpack_dec = NewDec},
NewConn = update_stream(NewConn0, StreamId, NewStream),
%% Only emit a decoder-stream action when there is something to send.
DecActions =
case iolist_size(DecStreamData) of
0 -> [];
_ -> [{send, DecStreamId, DecStreamData}]
end,
{ok, NewConn, [{trailers, StreamId, Trailers}], DecActions};
{error, Reason} ->
{error, {stream_error, StreamId, h3_message_error, Reason}}
end;
%% The field section cannot be decoded yet: keep the decoder state and emit
%% nothing.
{blocked, NewDec} ->
NewConn = Conn#h3_conn{qpack_dec = NewDec},
{ok, NewConn, [], []};
{error, Reason} ->
{error,
{connection_error, qpack_decompression_failed,
iolist_to_binary(
io_lib:format("QPACK decode error: ~p", [Reason])
)}}
end.
%% Validate a decoded field section.
%%
%% The third argument tells whether the section is a trailer section. Trailers
%% are validated the same way for both roles; an initial section is validated
%% as a request on a server and as a response on a client.
-spec validate_headers(role(), nhttp_lib:headers(), boolean(), h3_settings()) ->
    ok | {error, binary()}.
validate_headers(client, Fields, false, _LocalSettings) ->
    validate_response_headers(Fields);
validate_headers(server, Fields, false, LocalSettings) ->
    validate_request_headers(Fields, LocalSettings);
validate_headers(_Role, Fields, true, _LocalSettings) ->
    validate_trailers(Fields).
%% Connection-specific header names rejected in HTTP/3 responses; an alias of
%% the set defined in nhttp_msg.hrl, used as a map in
%% validate_response_headers_loop/2.
-define(H3_CONNECTION_HEADERS_SET, ?NHTTP_MSG_CONNECTION_HEADERS_SET).
%% Build the event emitted for an initial field section: a `request` event on
%% a server, a `response` event on a client.
-spec build_initial_headers_event(
    role(), nhttp_lib:stream_id(), nhttp_lib:headers(), fin(), conn()
) -> event().
build_initial_headers_event(server, StreamId, Headers, Fin, Conn) ->
    {request, StreamId, build_request(Conn, Headers), Fin};
build_initial_headers_event(client, StreamId, Headers, Fin, _Conn) ->
    {response, StreamId, build_response(Headers), Fin}.
%% Build the request map for decoded headers, tagging it with the `http3`
%% protocol and the peer address stored on the connection.
-spec build_request(conn(), nhttp_lib:headers()) -> nhttp_lib:request().
build_request(#h3_conn{} = Conn, Headers) ->
    nhttp_msg:build_request(http3, Conn#h3_conn.peer, Headers).
%% Build the response map for decoded headers, tagged with the `http3`
%% protocol.
-spec build_response(nhttp_lib:headers()) -> nhttp_lib:response().
build_response(ResponseHeaders) ->
    nhttp_msg:build_response(http3, ResponseHeaders).
%% Run the extended CONNECT check from nhttp_msg and translate its error atoms
%% into the human-readable reasons used in stream errors.
-spec check_extended_connect(
    binary() | undefined, binary() | undefined, binary() | undefined, h3_settings()
) -> ok | {error, binary()}.
check_extended_connect(Method, Protocol, Authority, Settings) ->
    Outcome = nhttp_msg:check_extended_connect(Method, Protocol, Authority, Settings),
    case Outcome of
        {error, bad_method} ->
            {error, <<":protocol pseudo-header requires :method=CONNECT (RFC 8441 Section 4)">>};
        {error, not_enabled} ->
            {error,
                <<":protocol received without SETTINGS_ENABLE_CONNECT_PROTOCOL=1 (RFC 9220 Section 3)">>};
        {error, missing_authority} ->
            {error, <<"Extended CONNECT requires :authority (RFC 9220 Section 3)">>};
        ok ->
            ok
    end.
%% Final HTTP/3-specific checks on a request whose pseudo-header shape has
%% already been validated.
%%
%% Checks, in order:
%%   1. the `te` header: RFC 9114 Section 4.2 allows it in a request only with
%%      the value "trailers"; any other value makes the request malformed;
%%   2. schemes that require an authority must carry `:authority` or `Host`;
%%   3. the extended CONNECT rules (see check_extended_connect/4).
%%
%% Returns `ok`, or `{error, Reason}` with a human-readable reason.
-spec finalise_request_headers(nhttp_msg:request_shape(), h3_settings()) ->
    ok | {error, binary()}.
finalise_request_headers(#{headers := Headers} = Shape, Settings) ->
    %% has_te/1 is the cheap pre-check; values are only inspected when a `te`
    %% header is actually present.
    case has_te(Headers) andalso not te_is_trailers_only(Headers) of
        true ->
            {error,
                <<"te header must only contain \"trailers\" in HTTP/3 (RFC 9114 Section 4.2)">>};
        false ->
            #{
                method := Method,
                scheme := Scheme,
                authority := Authority,
                host := Host,
                protocol := Protocol
            } = Shape,
            case
                requires_authority(Scheme) andalso Authority =:= undefined andalso
                    Host =:= undefined
            of
                true ->
                    {error,
                        <<"Missing :authority or Host for authority-requiring scheme (RFC 9114 Section 4.3.1.2)">>};
                false ->
                    check_extended_connect(Method, Protocol, Authority, Settings)
            end
    end.

%% True when every `te` header has the value "trailers", compared
%% case-insensitively and ignoring surrounding whitespace.
-spec te_is_trailers_only(nhttp_lib:headers()) -> boolean().
te_is_trailers_only(Headers) ->
    lists:all(
        fun
            ({<<"te">>, Value}) ->
                string:equal(string:trim(Value), <<"trailers">>, true);
            (_Other) ->
                true
        end,
        Headers
    ).
%% True when the header list contains a `te` field (name matched exactly, as
%% a lowercase binary).
-spec has_te(nhttp_lib:headers()) -> boolean().
has_te(Headers) ->
    lists:any(
        fun
            ({Name, _Value}) -> Name =:= <<"te">>;
            (_Other) -> false
        end,
        Headers
    ).
%% Translate a pseudo-header shape error from nhttp_msg into the
%% human-readable reason used in stream errors.
-spec shape_error_message(nhttp_msg:request_shape_error()) -> binary().
shape_error_message(Reason) ->
    case Reason of
        missing_required_pseudo ->
            <<"Missing required pseudo-headers (RFC 9114 Section 4.3.1)">>;
        bad_wire_scheme ->
            <<":scheme must be http or https on the wire (RFC 9114 Section 4.3.1)">>;
        authority_host_mismatch ->
            <<":authority and Host mismatch (RFC 9114 Section 4.3.1)">>;
        duplicate_pseudo ->
            <<"Duplicate pseudo-header">>;
        unknown_pseudo ->
            <<"Unknown pseudo-header">>;
        pseudo_after_regular ->
            <<"Pseudo-header after regular header">>;
        forbidden_connection_header ->
            <<"Forbidden connection-specific header">>;
        multiple_host_headers ->
            <<"Multiple Host headers">>
    end.
%% Validate the initial field section of a request: first the pseudo-header
%% shape (nhttp_msg), then the HTTP/3-specific checks on the resulting shape.
-spec validate_request_headers(nhttp_lib:headers(), h3_settings()) ->
    ok | {error, binary()}.
validate_request_headers(Headers, Settings) ->
    case nhttp_msg:validate_request_pseudo_shape(Headers) of
        {ok, Shape} ->
            finalise_request_headers(Shape, Settings);
        {error, ShapeError} ->
            {error, shape_error_message(ShapeError)}
    end.
%% Validate the initial field section of a response. Starts the scan in the
%% pseudo-header phase with no `:status` seen yet.
-spec validate_response_headers(nhttp_lib:headers()) -> ok | {error, binary()}.
validate_response_headers(Headers) ->
    Initial = #{phase => pseudo, has_status => false},
    validate_response_headers_loop(Headers, Initial).
%% Walk the response headers, tracking whether regular headers have started
%% (`phase`) and whether `:status` has been seen (`has_status`).
%%
%% Clause order matters: `:status` is accepted once, and only while still in
%% the pseudo-header phase; a `:status` that matches neither of its two clauses
%% (i.e. one that follows a regular header) falls through to the generic
%% pseudo-header clause and is rejected there.
-spec validate_response_headers_loop(nhttp_lib:headers(), map()) ->
ok | {error, binary()}.
%% End of list: valid only if `:status` was present.
validate_response_headers_loop([], #{has_status := true}) ->
ok;
validate_response_headers_loop([], #{has_status := false}) ->
{error, <<"Missing :status pseudo-header">>};
validate_response_headers_loop(
[{<<":status">>, _} | Rest], #{phase := pseudo, has_status := false} = State
) ->
validate_response_headers_loop(Rest, State#{has_status => true});
validate_response_headers_loop([{<<":status">>, _} | _], #{has_status := true}) ->
{error, <<"Duplicate :status">>};
%% Any other name starting with ':' is not allowed in a response.
validate_response_headers_loop([{<<$:, _/binary>>, _} | _], _) ->
{error, <<"Invalid pseudo-header in response">>};
%% Regular header: reject connection-specific names and `te`, otherwise switch
%% to the regular phase and continue.
validate_response_headers_loop([{Name, _Value} | Rest], State) ->
case maps:is_key(Name, ?H3_CONNECTION_HEADERS_SET) of
true ->
{error, <<"Forbidden connection-specific header">>};
false ->
case Name of
<<"te">> ->
{error, <<"te header forbidden in HTTP/3 (RFC 9114 Section 4.2)">>};
_ ->
validate_response_headers_loop(Rest, State#{phase => regular})
end
end.
%% Validate a trailer section via nhttp_msg and translate its error atom into
%% the human-readable reason used in stream errors.
-spec validate_trailers(nhttp_lib:headers()) -> ok | {error, binary()}.
validate_trailers(Trailers) ->
    case nhttp_msg:validate_trailers(Trailers) of
        {error, pseudo_in_trailers} ->
            {error, <<"Pseudo-header in trailers (RFC 9114 Section 4.1)">>};
        ok ->
            ok
    end.
%%%-----------------------------------------------------------------------------
%% INTERNAL: GOAWAY / PUSH
%%%-----------------------------------------------------------------------------
%% Handle a GOAWAY frame received on the control stream.
%%
%% A GOAWAY whose identifier is larger than the one from a previous GOAWAY is
%% a connection error. Otherwise the identifier is stored, the connection
%% moves to `closing` and a `goaway` event is emitted.
%%
%% NOTE(review): `goaway_id` is also written by send_goaway/1, so after a local
%% GOAWAY this overwrites the identifier that stream_opened/3 compares
%% against -- confirm that sharing the field is intended.
-spec process_goaway(conn(), non_neg_integer()) ->
    {ok, conn(), [event()], [action()]} | {error, h3_error()}.
process_goaway(#h3_conn{goaway_received = Seen, goaway_id = PrevId} = Conn, Id) ->
    case Seen andalso Id > PrevId of
        true ->
            {error,
                {connection_error, h3_id_error,
                    <<"GOAWAY ID must not increase (RFC 9114 Section 5.2)">>}};
        false ->
            Closing = Conn#h3_conn{
                state = closing,
                goaway_received = true,
                goaway_id = Id
            },
            {ok, Closing, [{goaway, Id, h3_no_error, <<>>}], []}
    end.
%% Handle a MAX_PUSH_ID frame received on the control stream.
%%
%% A client must never receive this frame. On a server the value may only stay
%% the same or grow; a smaller value is a connection error.
-spec process_max_push_id(conn(), non_neg_integer()) ->
    {ok, conn(), [event()], [action()]} | {error, h3_error()}.
process_max_push_id(#h3_conn{role = client}, _PushId) ->
    {error,
        {connection_error, h3_frame_unexpected,
            <<"Client received MAX_PUSH_ID (RFC 9114 Section 7.2.7)">>}};
process_max_push_id(#h3_conn{max_push_id = Current} = Conn, PushId) ->
    case PushId < Current of
        true ->
            {error,
                {connection_error, h3_id_error,
                    <<"MAX_PUSH_ID must not decrease (RFC 9114 Section 7.2.7)">>}};
        false ->
            {ok, Conn#h3_conn{max_push_id = PushId}, [], []}
    end.
%% Handle a PUSH_PROMISE frame received on a request stream.
%%
%% A server must never receive this frame. On a client the promised field
%% section is QPACK-decoded and surfaced as a `push_promise` event; data the
%% decoder wants to send is returned as an action on the local decoder stream.
%% NOTE(review): PushId is not compared against any push ID limit here --
%% confirm whether that check lives elsewhere.
-spec process_push_promise(conn(), nhttp_lib:stream_id(), non_neg_integer(), binary()) ->
{ok, conn(), [event()], [action()]} | {error, h3_error()}.
process_push_promise(#h3_conn{role = server}, _StreamId, _PushId, _FieldSection) ->
{error,
{connection_error, h3_frame_unexpected,
<<"Server received PUSH_PROMISE (RFC 9114 Section 7.2.5)">>}};
process_push_promise(
#h3_conn{qpack_dec = Dec, local_decoder_stream = DecStreamId} = Conn,
StreamId,
PushId,
FieldSection
) ->
case nhttp_qpack:decode_field_section(Dec, StreamId, FieldSection) of
{ok, NewDec, DecStreamData, Headers} ->
NewConn = Conn#h3_conn{qpack_dec = NewDec},
%% Only emit a decoder-stream action when there is something to send.
DecActions =
case iolist_size(DecStreamData) of
0 -> [];
_ -> [{send, DecStreamId, DecStreamData}]
end,
{ok, NewConn, [{push_promise, StreamId, PushId, Headers}], DecActions};
%% The field section cannot be decoded yet: keep the decoder state and emit
%% nothing.
{blocked, NewDec} ->
{ok, Conn#h3_conn{qpack_dec = NewDec}, [], []};
{error, Reason} ->
{error,
{connection_error, qpack_decompression_failed,
iolist_to_binary(
io_lib:format("QPACK decode error in PUSH_PROMISE: ~p", [Reason])
)}}
end.
%%%-----------------------------------------------------------------------------
%% INTERNAL: SETTINGS
%%%-----------------------------------------------------------------------------
%% Records the peer's SETTINGS on the connection.
%%
%% Reads `qpack_max_table_capacity` and `qpack_blocked_streams` from the
%% settings map (each defaulting to 0 when absent), hands them to
%% nhttp_qpack:reconcile_peer_limits/3 together with the current encoder, and
%% stores the resulting encoder, the settings map and `settings_received =
%% true` on the connection.
-spec apply_peer_settings(conn(), h3_settings()) -> conn().
apply_peer_settings(#h3_conn{qpack_enc = Enc0} = Conn, PeerSettings) ->
    Capacity = maps:get(qpack_max_table_capacity, PeerSettings, 0),
    BlockedStreams = maps:get(qpack_blocked_streams, PeerSettings, 0),
    Conn#h3_conn{
        peer_settings = PeerSettings,
        settings_received = true,
        qpack_enc = nhttp_qpack:reconcile_peer_limits(Capacity, BlockedStreams, Enc0)
    }.
%% Returns the default settings map: both QPACK-related settings are 0.
-spec default_settings() -> h3_settings().
default_settings() ->
    maps:from_list([
        {qpack_max_table_capacity, 0},
        {qpack_blocked_streams, 0}
    ]).
%%%-----------------------------------------------------------------------------
%% INTERNAL: STREAM HELPERS
%%%-----------------------------------------------------------------------------
%% Marks `StreamId` as an ignored unidirectional stream by adding it as a key
%% (with value `true`) to the connection's `ignored_uni_streams` map.
-spec add_ignored_uni(conn(), nhttp_lib:stream_id()) -> conn().
add_ignored_uni(Conn = #h3_conn{ignored_uni_streams = Seen}, StreamId) ->
    Conn#h3_conn{ignored_uni_streams = maps:put(StreamId, true, Seen)}.
%% Converts a boolean end-of-stream flag into the `fin | nofin` atom form.
-spec bool_to_fin(boolean()) -> fin().
bool_to_fin(IsFin) when is_boolean(IsFin) ->
    case IsFin of
        true -> fin;
        false -> nofin
    end.
%% Builds the action list for a one-shot response.
%%
%% The list always ends with `{send_fin, StreamId, Data}`. When
%% `EncStreamData` is non-empty, a `{send, EncStreamId, EncStreamData}` action
%% is placed in front of it so the encoder-stream bytes are listed first.
-spec build_response_actions(nhttp_lib:stream_id(), iodata(), nhttp_lib:stream_id(), iodata()) ->
    [action()].
build_response_actions(EncStreamId, EncStreamData, StreamId, Data) ->
    Final = {send_fin, StreamId, Data},
    case iolist_size(EncStreamData) of
        0 -> [Final];
        _ -> [{send, EncStreamId, EncStreamData}, Final]
    end.
%% Builds the action list for sending a HEADERS frame.
%%
%% A `{send, EncStreamId, EncStreamData}` action is emitted first when the
%% encoder-stream data is non-empty. It is followed by the frame itself,
%% tagged `send_fin` when `Fin` is `fin` and `send` when it is `nofin`.
-spec build_send_actions(nhttp_lib:stream_id(), iodata(), nhttp_lib:stream_id(), iodata(), fin()) ->
    [action()].
build_send_actions(EncStreamId, EncStreamData, StreamId, HeadersFrame, Fin) ->
    EncPart = [{send, EncStreamId, EncStreamData} || iolist_size(EncStreamData) > 0],
    Tag =
        case Fin of
            fin -> send_fin;
            nofin -> send
        end,
    EncPart ++ [{Tag, StreamId, HeadersFrame}].
%% Forgets a stream: removes `StreamId` from both the `streams` map and the
%% `stream_bufs` map of the connection. Removing an unknown ID is a no-op.
-spec close_stream(conn(), nhttp_lib:stream_id()) -> conn().
close_stream(Conn = #h3_conn{streams = Streams, stream_bufs = Bufs}, StreamId) ->
    RemainingStreams = maps:remove(StreamId, Streams),
    RemainingBufs = maps:remove(StreamId, Bufs),
    Conn#h3_conn{streams = RemainingStreams, stream_bufs = RemainingBufs}.
%% Collects the decoder-stream output of every unblocked entry into a single
%% `send` action on the local decoder stream.
%%
%% Returns `[]` when the decoder stream ID is `undefined`, when there are no
%% unblocked entries, or when their combined decoder-stream data is empty.
-spec decoder_stream_actions(
    nhttp_lib:stream_id() | undefined,
    [{nhttp_lib:stream_id(), iodata(), [nhttp_qpack:field_line()]}]
) -> [action()].
decoder_stream_actions(undefined, _Unblocked) ->
    [];
decoder_stream_actions(DecStreamId, Unblocked) ->
    Feedback = [Bytes || {_StreamId, Bytes, _Headers} <- Unblocked],
    %% An empty entry list yields empty feedback, so it needs no clause of its own.
    [{send, DecStreamId, Feedback} || iolist_size(Feedback) =/= 0].
%% Translates a numeric HTTP/3 or QPACK error code into its atom name.
%%
%% Codes that are not in the table are returned unchanged.
%% NOTE(review): the spec promises error_code(); confirm that type admits the
%% raw integer returned for unknown codes.
-spec error_code_atom(non_neg_integer()) -> error_code().
error_code_atom(Code) ->
    Known = #{
        16#100 => h3_no_error,
        16#101 => h3_general_protocol_error,
        16#102 => h3_internal_error,
        16#103 => h3_stream_creation_error,
        16#104 => h3_closed_critical_stream,
        16#105 => h3_frame_unexpected,
        16#106 => h3_frame_error,
        16#107 => h3_excessive_load,
        16#108 => h3_id_error,
        16#109 => h3_settings_error,
        16#10A => h3_missing_settings,
        16#10B => h3_request_rejected,
        16#10C => h3_request_cancelled,
        16#10D => h3_request_incomplete,
        16#10E => h3_message_error,
        16#10F => h3_connect_error,
        16#110 => h3_version_fallback,
        16#200 => qpack_decompression_failed,
        16#201 => qpack_encoder_stream_error,
        16#202 => qpack_decoder_stream_error
    },
    maps:get(Code, Known, Code).
%% Looks up `StreamId` in the connection's `streams` map.
%%
%% Returns the stored stream record when present; otherwise returns a fresh
%% `#h3_stream{}` carrying only the ID. The connection itself is not modified,
%% so a newly created record is not stored by this function.
-spec get_or_create_stream(conn(), nhttp_lib:stream_id()) -> #h3_stream{}.
get_or_create_stream(#h3_conn{streams = Streams}, StreamId) ->
    Fresh = #h3_stream{id = StreamId},
    case maps:get(StreamId, Streams, undefined) of
        undefined -> Fresh;
        Existing -> Existing
    end.
%% Reports whether a QUIC stream ID denotes a unidirectional stream, i.e.
%% whether bit 16#2 of the ID is set.
-spec is_uni_stream(nhttp_lib:stream_id()) -> boolean().
is_uni_stream(StreamId) ->
    (StreamId band 2) =/= 0.
%% Concatenates previously buffered bytes in front of newly received data.
%% When nothing is buffered, `Data` is returned as-is without building a new
%% binary.
-spec prepend(binary(), binary()) -> binary().
prepend(Buffered, Data) ->
    case Buffered of
        <<>> -> Data;
        _ -> <<Buffered/binary, Data/binary>>
    end.
%% Returns `Data` with any bytes buffered for `StreamId` placed in front of
%% it. A stream with no entry in `stream_bufs` is treated as having an empty
%% buffer. The connection is only read, never updated.
-spec prepend_stream_buf(conn(), nhttp_lib:stream_id(), binary()) -> binary().
prepend_stream_buf(#h3_conn{stream_bufs = Bufs}, StreamId, Data) ->
    Pending = maps:get(StreamId, Bufs, <<>>),
    prepend(Pending, Data).
%% Turns unblocked QPACK entries into `headers` events, one per entry and in
%% the same order. The action list is always empty here.
%%
%% NOTE(review): every event is tagged `nofin`; confirm that end-of-stream is
%% reported separately for streams whose HEADERS arrived with FIN while
%% blocked.
-spec process_unblocked([{nhttp_lib:stream_id(), iodata(), [nhttp_qpack:field_line()]}]) ->
    {[event()], [action()]}.
process_unblocked(Unblocked) ->
    %% An empty input simply produces an empty event list.
    {[{headers, Id, Fields, nofin} || {Id, _DecData, Fields} <- Unblocked], []}.
%% Returns `true` only for the `http` and `https` schemes (exact, lowercase
%% binaries); any other value, including `undefined`, yields `false`.
-spec requires_authority(binary() | undefined) -> boolean().
requires_authority(Scheme) ->
    Scheme =:= <<"http">> orelse Scheme =:= <<"https">>.
%% Stores the leftover bytes for `StreamId`. An empty binary removes the
%% stream's entry from `stream_bufs` instead of storing it.
-spec set_stream_buf(conn(), nhttp_lib:stream_id(), binary()) -> conn().
set_stream_buf(#h3_conn{stream_bufs = Bufs} = Conn, StreamId, Data) ->
    NewBufs =
        case Data of
            <<>> -> maps:remove(StreamId, Bufs);
            _ -> Bufs#{StreamId => Data}
        end,
    Conn#h3_conn{stream_bufs = NewBufs}.
%% Computes the stream state after receiving data with the given fin flag.
%%
%% Only a received `fin` changes state: `open` becomes `half_closed_remote`
%% and `half_closed_local` becomes `closed`. Every other combination leaves
%% the state untouched.
-spec transition_on_recv(stream_state(), fin()) -> stream_state().
transition_on_recv(State, Fin) ->
    case {State, Fin} of
        {open, fin} -> half_closed_remote;
        {half_closed_local, fin} -> closed;
        _ -> State
    end.
%% Computes the stream state after sending data with the given fin flag.
%%
%% Only a sent `fin` changes state: `open` becomes `half_closed_local` and
%% `half_closed_remote` becomes `closed`. Every other combination leaves the
%% state untouched.
-spec transition_on_send(stream_state(), fin()) -> stream_state().
transition_on_send(State, Fin) ->
    case {State, Fin} of
        {open, fin} -> half_closed_local;
        {half_closed_remote, fin} -> closed;
        _ -> State
    end.
%% Raises the connection's `last_peer_stream_id` to `StreamId` when that ID
%% is greater than the stored value; otherwise the connection is returned
%% unchanged.
-spec update_last_peer_stream(conn(), nhttp_lib:stream_id()) -> conn().
update_last_peer_stream(Conn, StreamId) ->
    case Conn of
        #h3_conn{last_peer_stream_id = Last} when StreamId > Last ->
            Conn#h3_conn{last_peer_stream_id = StreamId};
        _ ->
            Conn
    end.
%% Writes a stream record back to the connection.
%%
%% A stream whose state is `closed` is not stored: its entries in both
%% `streams` and `stream_bufs` are removed. Any other stream is stored
%% under `StreamId` in `streams`.
-spec update_stream(conn(), nhttp_lib:stream_id(), #h3_stream{}) -> conn().
update_stream(#h3_conn{streams = Streams, stream_bufs = Bufs} = Conn, StreamId, Stream) ->
    case Stream of
        #h3_stream{state = closed} ->
            Conn#h3_conn{
                streams = maps:remove(StreamId, Streams),
                stream_bufs = maps:remove(StreamId, Bufs)
            };
        _ ->
            Conn#h3_conn{streams = Streams#{StreamId => Stream}}
    end.
%% Checks whether frames may be sent on `StreamId`.
%%
%% Once `goaway_sent` is set, every send is refused with an
%% `h3_request_rejected` stream error. Otherwise sending is allowed when the
%% stream is unknown to the connection or is in the `open` or
%% `half_closed_remote` state; any other state yields an
%% `h3_frame_unexpected` stream error.
%%
%% NOTE(review): the goaway check applies to streams that are already in
%% progress as well as new ones; confirm this does not block responses on
%% in-flight requests during graceful shutdown.
-spec validate_send_state(conn(), nhttp_lib:stream_id()) -> ok | {error, h3_error()}.
validate_send_state(#h3_conn{goaway_sent = true}, StreamId) ->
    Rejected = {stream_error, StreamId, h3_request_rejected, <<"Connection closing">>},
    {error, Rejected};
validate_send_state(#h3_conn{streams = Streams}, StreamId) ->
    case maps:get(StreamId, Streams, undefined) of
        undefined ->
            ok;
        #h3_stream{state = State} when State =:= open; State =:= half_closed_remote ->
            ok;
        _NotSendable ->
            {error,
                {stream_error, StreamId, h3_frame_unexpected, <<"Stream not in sendable state">>}}
    end.