-module(nhttp_h2).
-moduledoc """
HTTP/2 protocol layer.
This module implements RFC 9113 HTTP/2 connection and stream state machines.
It sits between the framing layer (nhttp_h2_frame) and the application layer,
providing:
- Connection lifecycle management (preface, settings, shutdown)
- Stream state machine (RFC 9113 Section 5.1)
- Flow control (connection and stream level)
- Header block assembly (CONTINUATION handling)
- Error handling (connection vs stream errors)
## Usage
```erlang
Conn0 = nhttp_h2:new(client),
Preface = nhttp_h2:preface(Conn0),
ok = ssl:send(Socket, Preface),
{ok, Events, Conn1} = nhttp_h2:recv(Conn0, Data),
lists:foreach(fun handle_event/1, Events),
{ok, Conn2, Frames} = nhttp_h2:send_headers(Conn1, StreamId, Headers, fin),
ok = ssl:send(Socket, Frames).
```
## Sending bodies and trailers
The HTTP/2 send surface is frame-oriented. The canonical
`t:nhttp_lib:request/0` and `t:nhttp_lib:response/0` maps can carry a `body`
and `trailers` field as a convenience for "everything is in memory", but
this layer does not consume those maps directly: the caller breaks the
exchange into discrete frame sends. The pattern is:
1. `send_headers(Conn, StreamId, Headers, nofin)` to emit pseudo-headers
plus regular headers (HEADERS + optional CONTINUATION).
2. Zero or more `send_data(Conn, StreamId, Chunk, nofin)` calls.
3. Either `send_data(Conn, StreamId, FinalChunk, fin)` to close on body,
or `send_headers(Conn, StreamId, Trailers, fin)` to close on trailers.
For a one-shot send when the body is already a single `iodata()`, call
`send_headers/4` with `nofin` followed by `send_data/4` with `fin`.
Flow-control and END_STREAM ride on the DATA frame.
""".
-include("nhttp_msg.hrl").
%%%-----------------------------------------------------------------------------
%% INLINE DIRECTIVES (PERFORMANCE OPTIMIZATION)
%%%-----------------------------------------------------------------------------
-compile({inline, [is_active_state/1]}).
-compile({inline, [transition_on_recv_end_stream/2]}).
-compile({inline, [transition_on_send_end_stream/2]}).
%%%-----------------------------------------------------------------------------
%% INITIALIZATION
%%%-----------------------------------------------------------------------------
-export([
new/1,
new/2,
preface/1,
set_peer/2
]).
%%%-----------------------------------------------------------------------------
%% RECEIVING
%%%-----------------------------------------------------------------------------
-export([
recv/2
]).
%%%-----------------------------------------------------------------------------
%% SENDING
%%%-----------------------------------------------------------------------------
-export([
send_data/4,
send_goaway/3,
send_headers/4,
send_ping/2,
send_rst_stream/3,
send_window_update/3
]).
%%%-----------------------------------------------------------------------------
%% STREAM MANAGEMENT
%%%-----------------------------------------------------------------------------
-export([
open_stream/1
]).
%%%-----------------------------------------------------------------------------
%% TYPE EXPORTS
%%%-----------------------------------------------------------------------------
-export_type([
conn/0,
error_code/0,
event/0,
fin/0,
priority/0,
recv_result/0,
role/0,
send_error/0,
send_result/0,
settings/0,
stream_state/0
]).
%%%-----------------------------------------------------------------------------
%% TYPES
%%%-----------------------------------------------------------------------------
-type fin() :: nhttp_lib:fin().
-type priority() :: #{
exclusive := boolean(),
stream_dependency := nhttp_lib:stream_id(),
weight := 1..256
}.
-type error_code() :: nhttp_lib:error_code().
-type settings() :: #{
header_table_size => non_neg_integer(),
enable_push => boolean(),
max_concurrent_streams => pos_integer() | infinity,
initial_window_size => 1..16#7fffffff,
max_frame_size => 16#4000..16#ffffff,
max_header_list_size => pos_integer() | infinity,
enable_connect_protocol => boolean()
}.
-type stream_state() ::
idle
| reserved_local
| reserved_remote
| open
| half_closed_local
| half_closed_remote
| closed.
-type role() :: nhttp_lib:role().
-type event() ::
nhttp_lib:event_common()
| {stream_closed, nhttp_lib:stream_id(), error_code()}
| {stream_refused, nhttp_lib:stream_id()}
| {window_update, nhttp_lib:stream_id(), pos_integer()}
| {settings, settings()}
| settings_ack
| {ping, binary()}
| {ping_ack, binary()}.
-type recv_result() ::
{ok, [event()], conn()}
| {ok, [event()], conn(), iodata()}
| {error, nhttp_h2_frame:decode_error()}.
-type send_error() ::
connection_closing
| {unknown_stream, nhttp_lib:stream_id()}
| {stream_closed, nhttp_lib:stream_id()}
| {stream_error, nhttp_lib:stream_id(), error_code(), binary()}.
-type send_result() ::
{ok, conn()}
| {ok, conn(), iodata()}
| {partial, conn(), iodata(), binary(), fin(), Window :: integer()}
| {error, send_error()}.
%%%-----------------------------------------------------------------------------
%% INTERNAL CONSTANTS
%%%-----------------------------------------------------------------------------
-define(H2_DEFAULT_INITIAL_WINDOW_SIZE, 65535).
-define(H2_MAX_WINDOW_SIZE, 16#7fffffff).
%%%-----------------------------------------------------------------------------
%% LOCAL MACROS (RFC 9113 SECTION 6.5.2)
%%%-----------------------------------------------------------------------------
-define(H2_DEFAULT_HEADER_TABLE_SIZE, 4096).
-define(H2_DEFAULT_MAX_FRAME_SIZE, 16384).
-define(H2_DEFAULT_MAX_HEADER_LIST_SIZE, 16384).
%%%-----------------------------------------------------------------------------
%% INTERNAL RECORDS
%%%-----------------------------------------------------------------------------
-record(h2_stream, {
id :: nhttp_lib:stream_id(),
state = idle :: stream_state(),
send_window :: integer(),
recv_window :: integer(),
header_buffer = <<>> :: binary(),
header_end_stream = false :: boolean(),
content_length = undefined :: non_neg_integer() | undefined,
recv_body_length = 0 :: non_neg_integer(),
headers_received = false :: boolean()
}).
-record(h2_conn, {
role :: role(),
state = preface :: preface | open | closing | closed,
local_settings :: settings(),
peer_settings :: settings(),
settings_acked = false :: boolean(),
streams = #{} :: #{nhttp_lib:stream_id() => #h2_stream{}},
next_stream_id :: nhttp_lib:stream_id(),
last_peer_stream_id = 0 :: nhttp_lib:stream_id(),
active_stream_count = 0 :: non_neg_integer(),
send_window = ?H2_DEFAULT_INITIAL_WINDOW_SIZE :: integer(),
recv_window = ?H2_DEFAULT_INITIAL_WINDOW_SIZE :: integer(),
hpack_enc :: nhttp_hpack:state(),
hpack_dec :: nhttp_hpack:state(),
continuation_stream = undefined :: undefined | nhttp_lib:stream_id(),
buffer = <<>> :: binary(),
goaway_sent = false :: boolean(),
goaway_received = false :: boolean(),
last_good_stream_id = 0 :: nhttp_lib:stream_id(),
peer = undefined :: undefined | nhttp_lib:peer()
}).
-opaque conn() :: #h2_conn{}.
%%%-----------------------------------------------------------------------------
%% INITIALIZATION
%%%-----------------------------------------------------------------------------
-doc "Create a new HTTP/2 connection with default settings.".
-spec new(role()) -> conn().
new(Role) ->
new(Role, default_settings()).
-doc "Create a new HTTP/2 connection with custom settings.".
-spec new(role(), settings()) -> conn().
new(Role, LocalSettings) ->
MergedSettings = maps:merge(default_settings(), LocalSettings),
HeaderTableSize = maps:get(header_table_size, MergedSettings, ?H2_DEFAULT_HEADER_TABLE_SIZE),
{ok, HpackEnc} = nhttp_hpack:new(HeaderTableSize),
{ok, HpackDec} = nhttp_hpack:new(HeaderTableSize),
#h2_conn{
role = Role,
state = preface,
local_settings = MergedSettings,
peer_settings = default_settings(),
next_stream_id = initial_stream_id(Role),
hpack_enc = HpackEnc,
hpack_dec = HpackDec
}.
-doc "Generate the connection preface for this role. Client sends: magic + SETTINGS. Server sends: SETTINGS.".
-spec preface(conn()) -> iodata().
preface(#h2_conn{role = client, local_settings = Settings}) ->
{ok, Preface} = nhttp_h2_frame:preface(),
{ok, SettingsFrame} = nhttp_h2_frame:settings(Settings),
[Preface, SettingsFrame];
preface(#h2_conn{role = server, local_settings = Settings}) ->
{ok, SettingsFrame} = nhttp_h2_frame:settings(Settings),
SettingsFrame.
-doc """
Record the peer address on the connection. Called once after the socket is
accepted (or connected) so server-built `t:nhttp_lib:request/0` maps and
client-built `t:nhttp_lib:response/0` events carry the correct remote peer.
""".
-spec set_peer(conn(), nhttp_lib:peer()) -> conn().
set_peer(#h2_conn{} = Conn, {{_, _, _, _}, Port} = Peer) when
is_integer(Port), Port >= 0, Port =< 65535
->
Conn#h2_conn{peer = Peer};
set_peer(#h2_conn{} = Conn, {{_, _, _, _, _, _, _, _}, Port} = Peer) when
is_integer(Port), Port >= 0, Port =< 65535
->
Conn#h2_conn{peer = Peer}.
%%%-----------------------------------------------------------------------------
%% RECEIVING
%%%-----------------------------------------------------------------------------
-doc "Process incoming data and return events. May return frames to send (e.g., SETTINGS_ACK, PING_ACK, WINDOW_UPDATE).".
-spec recv(conn(), binary()) -> recv_result().
recv(#h2_conn{buffer = <<>>} = Conn, Data) ->
recv_loop(Conn, Data, [], []);
recv(#h2_conn{buffer = Buffer} = Conn, Data) ->
recv_loop(Conn#h2_conn{buffer = <<>>}, <<Buffer/binary, Data/binary>>, [], []).
%%%-----------------------------------------------------------------------------
%% SENDING
%%%-----------------------------------------------------------------------------
-doc "Send DATA frame(s).".
-spec send_data(conn(), nhttp_lib:stream_id(), iodata(), fin()) -> send_result().
send_data(#h2_conn{} = Conn, StreamId, Data, EndStream) ->
case validate_send_data(Conn, StreamId) of
ok ->
do_send_data(Conn, StreamId, Data, EndStream);
{error, _} = Error ->
Error
end.
-doc "Send GOAWAY to initiate graceful shutdown.".
-spec send_goaway(conn(), error_code(), binary()) -> send_result().
send_goaway(#h2_conn{last_peer_stream_id = LastStreamId} = Conn, ErrorCode, DebugData) ->
{ok, Frame} = nhttp_h2_frame:goaway(LastStreamId, ErrorCode, DebugData),
NewConn = Conn#h2_conn{
goaway_sent = true,
state = closing,
last_good_stream_id = LastStreamId
},
{ok, NewConn, Frame}.
-doc "Send HEADERS frame for a new request/response or trailers.".
-spec send_headers(conn(), nhttp_lib:stream_id(), nhttp_lib:headers(), fin()) -> send_result().
send_headers(#h2_conn{} = Conn, StreamId, Headers, EndStream) ->
case validate_send_headers(Conn, StreamId) of
ok ->
do_send_headers(Conn, StreamId, Headers, EndStream);
{error, _} = Error ->
Error
end.
-doc """
Send PING frame with caller-supplied 8-byte opaque data. The caller is
responsible for generating the opaque value (e.g. via
`crypto:strong_rand_bytes(8)`) and matching it against the PING_ACK event.
""".
-spec send_ping(conn(), <<_:64>>) -> send_result().
send_ping(#h2_conn{} = Conn, <<_:8/binary>> = OpaqueData) ->
{ok, Frame} = nhttp_h2_frame:ping(OpaqueData),
{ok, Conn, Frame}.
-doc "Send RST_STREAM to cancel a stream.".
-spec send_rst_stream(conn(), nhttp_lib:stream_id(), error_code()) -> send_result().
send_rst_stream(#h2_conn{} = Conn, StreamId, ErrorCode) ->
{ok, Frame} = nhttp_h2_frame:rst_stream(StreamId, ErrorCode),
NewConn = close_stream(Conn, StreamId),
{ok, NewConn, Frame}.
-doc "Send WINDOW_UPDATE for connection or stream.".
-spec send_window_update(conn(), nhttp_lib:stream_id() | connection, pos_integer()) ->
send_result().
send_window_update(#h2_conn{} = Conn, connection, Increment) when
Increment > 0, Increment =< ?H2_MAX_WINDOW_SIZE
->
{ok, Frame} = nhttp_h2_frame:window_update(Increment),
NewConn = Conn#h2_conn{recv_window = Conn#h2_conn.recv_window + Increment},
{ok, NewConn, Frame};
send_window_update(#h2_conn{streams = Streams} = Conn, StreamId, Increment) when
Increment > 0, Increment =< ?H2_MAX_WINDOW_SIZE
->
case maps:get(StreamId, Streams, undefined) of
undefined ->
{error, {stream_error, StreamId, protocol_error, <<"Unknown stream">>}};
#h2_stream{recv_window = RecvWindow} = Stream ->
{ok, Frame} = nhttp_h2_frame:window_update(StreamId, Increment),
NewStream = Stream#h2_stream{recv_window = RecvWindow + Increment},
NewConn = Conn#h2_conn{streams = Streams#{StreamId => NewStream}},
{ok, NewConn, Frame}
end.
%%%-----------------------------------------------------------------------------
%% STREAM MANAGEMENT
%%%-----------------------------------------------------------------------------
-spec check_peer_max_streams(non_neg_integer(), settings()) ->
ok | {error, max_streams_reached}.
check_peer_max_streams(ActiveCount, PeerSettings) ->
case maps:get(max_concurrent_streams, PeerSettings, infinity) of
infinity -> ok;
Max when ActiveCount >= Max -> {error, max_streams_reached};
_ -> ok
end.
-doc "Open a new stream and return its ID.".
-spec open_stream(conn()) ->
{ok, nhttp_lib:stream_id(), conn()} | {error, connection_closing | max_streams_reached}.
open_stream(#h2_conn{goaway_sent = true}) ->
{error, connection_closing};
open_stream(#h2_conn{goaway_received = true}) ->
{error, connection_closing};
open_stream(
#h2_conn{
next_stream_id = StreamId,
streams = Streams,
peer_settings = PeerSettings,
active_stream_count = ActiveCount
} = Conn
) ->
case check_peer_max_streams(ActiveCount, PeerSettings) of
ok ->
InitialWindow = maps:get(
initial_window_size, PeerSettings, ?H2_DEFAULT_INITIAL_WINDOW_SIZE
),
Stream = #h2_stream{
id = StreamId,
state = idle,
send_window = InitialWindow,
recv_window = ?H2_DEFAULT_INITIAL_WINDOW_SIZE
},
NewConn = Conn#h2_conn{
next_stream_id = StreamId + 2,
streams = Streams#{StreamId => Stream}
},
{ok, StreamId, NewConn};
{error, _} = Error ->
Error
end.
%%%-----------------------------------------------------------------------------
%% INTERNAL FUNCTIONS
%%%-----------------------------------------------------------------------------
-spec check_header_block_size(conn(), non_neg_integer()) -> ok | {error, exceeded}.
check_header_block_size(#h2_conn{local_settings = Settings}, Size) ->
case maps:get(max_header_list_size, Settings, infinity) of
infinity -> ok;
Max when Size =< Max -> ok;
_ -> {error, exceeded}
end.
-spec check_stream_concurrency(conn()) -> ok | {error, at_limit}.
check_stream_concurrency(#h2_conn{
active_stream_count = Count,
local_settings = Settings
}) ->
MaxStreams = maps:get(max_concurrent_streams, Settings, 100),
case MaxStreams of
infinity -> ok;
Max when Count >= Max -> {error, at_limit};
_ -> ok
end.
-spec close_stream(conn(), nhttp_lib:stream_id()) -> conn().
close_stream(#h2_conn{streams = Streams, active_stream_count = Count} = Conn, StreamId) ->
case maps:get(StreamId, Streams, undefined) of
undefined ->
Conn;
#h2_stream{state = closed} ->
Conn#h2_conn{streams = maps:remove(StreamId, Streams)};
#h2_stream{state = OldState} ->
NewCount =
case is_active_state(OldState) of
true -> max(0, Count - 1);
false -> Count
end,
Conn#h2_conn{
streams = maps:remove(StreamId, Streams),
active_stream_count = NewCount
}
end.
-spec decode_and_emit_headers(conn(), nhttp_lib:stream_id(), fin(), binary()) ->
{ok, conn(), [event()], iodata()} | {error, nhttp_h2_frame:decode_error()}.
decode_and_emit_headers(
#h2_conn{hpack_dec = HpackDec, streams = Streams} = Conn, StreamId, EndStream, HeaderBlock
) ->
DecodeOpts = hpack_decode_opts(Conn),
case maps:is_key(StreamId, Streams) of
true ->
decode_headers_internal(Conn, StreamId, EndStream, HeaderBlock);
false ->
case validate_peer_stream_id(Conn, StreamId) of
ok ->
case check_stream_concurrency(Conn) of
ok ->
decode_headers_internal(Conn, StreamId, EndStream, HeaderBlock);
{error, at_limit} ->
_ = nhttp_hpack:decode(HeaderBlock, HpackDec, DecodeOpts),
{ok, RstFrame} = nhttp_h2_frame:rst_stream(StreamId, refused_stream),
{ok, Conn, [{stream_refused, StreamId}], RstFrame}
end;
{error, _} = Error ->
_ = nhttp_hpack:decode(HeaderBlock, HpackDec, DecodeOpts),
Error
end
end.
-spec decode_headers_internal(conn(), nhttp_lib:stream_id(), fin(), binary()) ->
{ok, conn(), [event()], iodata()} | {error, nhttp_h2_frame:decode_error()}.
decode_headers_internal(
#h2_conn{role = Role, hpack_dec = HpackDec, streams = Streams} = Conn,
StreamId,
EndStream,
HeaderBlock
) ->
DecodeOpts = hpack_decode_opts(Conn),
case nhttp_hpack:decode(HeaderBlock, HpackDec, DecodeOpts) of
{ok, Headers, NewHpackDec} ->
Stream = get_or_create_stream(Conn, StreamId),
StreamState = Stream#h2_stream.state,
case StreamState of
half_closed_remote ->
{error,
{stream_error, StreamId, stream_closed,
<<"HEADERS on half-closed (remote) stream (RFC 9113 Section 5.1)">>}};
closed ->
{error,
{connection_error, stream_closed,
<<"HEADERS on closed stream (RFC 9113 Section 5.1)">>}};
_ ->
IsTrailers = Stream#h2_stream.headers_received,
case IsTrailers andalso EndStream =:= nofin of
true ->
{error,
{stream_error, StreamId, protocol_error,
<<"Second HEADERS without END_STREAM (RFC 9113 Section 8.1)">>}};
false ->
ValidationResult = validate_decoded_headers(
Role, Headers, IsTrailers, Conn#h2_conn.local_settings
),
case ValidationResult of
ok ->
OldState = Stream#h2_stream.state,
NewState = transition_on_recv_headers(OldState, EndStream),
ContentLength =
case IsTrailers of
true -> Stream#h2_stream.content_length;
false -> nhttp_msg:extract_content_length(Headers)
end,
NewStream = Stream#h2_stream{
state = NewState,
header_buffer = <<>>,
header_end_stream = false,
content_length = ContentLength,
headers_received = true
},
ActiveCount = Conn#h2_conn.active_stream_count,
NewActiveCount = update_active_count_on_transition(
OldState, NewState, ActiveCount
),
NewConn = Conn#h2_conn{
hpack_dec = NewHpackDec,
streams = store_or_remove_stream(
Streams, StreamId, NewStream
),
last_peer_stream_id = max(
Conn#h2_conn.last_peer_stream_id, StreamId
),
active_stream_count = NewActiveCount
},
Events = build_headers_event(
Role, IsTrailers, StreamId, Headers, EndStream, NewConn
),
{ok, NewConn, Events, []};
{error, protocol_error} ->
{error,
{stream_error, StreamId, protocol_error,
<<"Invalid header field (RFC 9113 Section 8.1)">>}}
end
end
end;
{error, uppercase_header_name} ->
{error,
{stream_error, StreamId, protocol_error,
<<"Uppercase header name (RFC 9113 Section 8.2)">>}};
{error, header_list_too_large} ->
{error,
{connection_error, enhance_your_calm, <<
"Decoded header list exceeds SETTINGS_MAX_HEADER_LIST_SIZE "
"(RFC 9113 Section 10.5.1)"
>>}};
{error, HpackError} ->
{error,
{connection_error, compression_error,
iolist_to_binary(io_lib:format("HPACK decode error: ~p", [HpackError]))}}
end.
-spec default_settings() -> settings().
default_settings() ->
#{
header_table_size => ?H2_DEFAULT_HEADER_TABLE_SIZE,
initial_window_size => ?H2_DEFAULT_INITIAL_WINDOW_SIZE,
max_frame_size => ?H2_DEFAULT_MAX_FRAME_SIZE,
max_concurrent_streams => 100,
max_header_list_size => ?H2_DEFAULT_MAX_HEADER_LIST_SIZE
}.
-spec do_send_data(conn(), nhttp_lib:stream_id(), iodata(), fin()) -> send_result().
do_send_data(
#h2_conn{streams = Streams, send_window = ConnWindow, peer_settings = PeerSettings} = Conn,
StreamId,
Data,
EndStream
) ->
#h2_stream{send_window = StreamWindow} = Stream = maps:get(StreamId, Streams),
DataSize = iolist_size(Data),
EffectiveWindow = min(ConnWindow, StreamWindow),
MaxFrameSize = maps:get(max_frame_size, PeerSettings, ?H2_DEFAULT_MAX_FRAME_SIZE),
MaxSend = min(EffectiveWindow, MaxFrameSize),
case DataSize =< MaxSend of
true when EffectiveWindow > 0 ->
{ok, Frame} = nhttp_h2_frame:data(StreamId, EndStream, Data),
OldState = Stream#h2_stream.state,
NewState = transition_on_send_end_stream(OldState, EndStream),
NewStream = Stream#h2_stream{
state = NewState,
send_window = StreamWindow - DataSize
},
ActiveCount = Conn#h2_conn.active_stream_count,
NewActiveCount = update_active_count_on_transition(OldState, NewState, ActiveCount),
NewConn = Conn#h2_conn{
streams = store_or_remove_stream(Streams, StreamId, NewStream),
send_window = ConnWindow - DataSize,
active_stream_count = NewActiveCount
},
{ok, NewConn, Frame};
true when EffectiveWindow =< 0 ->
{partial, Conn, [], <<>>, EndStream, EffectiveWindow};
false when MaxSend > 0 ->
DataBin = iolist_to_binary(Data),
<<ToSend:MaxSend/binary, Remaining/binary>> = DataBin,
{ok, Frame} = nhttp_h2_frame:data(StreamId, nofin, ToSend),
NewStream = Stream#h2_stream{
send_window = StreamWindow - MaxSend
},
NewConn = Conn#h2_conn{
streams = store_or_remove_stream(Streams, StreamId, NewStream),
send_window = ConnWindow - MaxSend
},
{partial, NewConn, Frame, Remaining, EndStream, EffectiveWindow - MaxSend};
false ->
{partial, Conn, [], iolist_to_binary(Data), EndStream, EffectiveWindow}
end.
-spec do_send_headers(conn(), nhttp_lib:stream_id(), nhttp_lib:headers(), fin()) -> send_result().
do_send_headers(
#h2_conn{hpack_enc = HpackEnc, streams = Streams, peer_settings = PeerSettings} = Conn,
StreamId,
Headers,
EndStream
) ->
{ok, HeaderBlock, NewHpackEnc} = nhttp_hpack:encode(Headers, HpackEnc),
MaxFrameSize = maps:get(max_frame_size, PeerSettings, ?H2_DEFAULT_MAX_FRAME_SIZE),
{ok, Frame} = nhttp_h2_frame:headers_with_continuation(
StreamId, EndStream, HeaderBlock, MaxFrameSize
),
Stream = get_or_create_stream(Conn, StreamId),
OldState = Stream#h2_stream.state,
NewState = transition_on_send_headers(OldState, EndStream),
NewStream = Stream#h2_stream{state = NewState},
ActiveCount = Conn#h2_conn.active_stream_count,
NewActiveCount = update_active_count_on_transition(OldState, NewState, ActiveCount),
NewConn = Conn#h2_conn{
hpack_enc = NewHpackEnc,
streams = store_or_remove_stream(Streams, StreamId, NewStream),
active_stream_count = NewActiveCount
},
{ok, NewConn, Frame}.
-spec get_or_create_stream(conn(), nhttp_lib:stream_id()) -> #h2_stream{}.
get_or_create_stream(#h2_conn{streams = Streams, peer_settings = PeerSettings}, StreamId) ->
case maps:get(StreamId, Streams, undefined) of
undefined ->
InitialWindow = maps:get(
initial_window_size, PeerSettings, ?H2_DEFAULT_INITIAL_WINDOW_SIZE
),
#h2_stream{
id = StreamId,
state = idle,
send_window = InitialWindow,
recv_window = ?H2_DEFAULT_INITIAL_WINDOW_SIZE
};
Stream ->
Stream
end.
-spec header_block_too_large_error() -> {error, nhttp_h2_frame:decode_error()}.
header_block_too_large_error() ->
{error,
{connection_error, enhance_your_calm,
<<"Header block exceeds SETTINGS_MAX_HEADER_LIST_SIZE (RFC 9113 Section 10.5.1)">>}}.
-spec hpack_decode_opts(conn()) -> nhttp_hpack:decode_opts().
hpack_decode_opts(#h2_conn{local_settings = Settings}) ->
case maps:get(max_header_list_size, Settings, infinity) of
infinity -> #{};
Max -> #{max_list_size => Max}
end.
-spec initial_stream_id(role()) -> nhttp_lib:stream_id().
initial_stream_id(client) -> 1;
initial_stream_id(server) -> 2.
-spec is_active_state(stream_state()) -> boolean().
is_active_state(open) -> true;
is_active_state(half_closed_local) -> true;
is_active_state(half_closed_remote) -> true;
is_active_state(_) -> false.
-spec process_continuation(conn(), nhttp_lib:stream_id(), fin(), binary()) ->
{ok, conn(), [event()], iodata()} | {error, nhttp_h2_frame:decode_error()}.
process_continuation(
#h2_conn{continuation_stream = undefined}, StreamId, _EndHeaders, _HeaderBlock
) ->
{error,
{connection_error, protocol_error,
<<"Unexpected CONTINUATION for stream ", (integer_to_binary(StreamId))/binary>>}};
process_continuation(
#h2_conn{continuation_stream = StreamId} = Conn, StreamId, EndHeaders, HeaderBlock
) ->
#h2_stream{header_buffer = Buffer, header_end_stream = EndStream} =
Stream =
maps:get(StreamId, Conn#h2_conn.streams),
NewBuffer = <<Buffer/binary, HeaderBlock/binary>>,
case check_header_block_size(Conn, byte_size(NewBuffer)) of
ok ->
case EndHeaders of
fin ->
NewConn = Conn#h2_conn{continuation_stream = undefined},
EndStreamFin =
case EndStream of
true -> fin;
false -> nofin
end,
decode_and_emit_headers(NewConn, StreamId, EndStreamFin, NewBuffer);
nofin ->
NewStream = Stream#h2_stream{header_buffer = NewBuffer},
NewConn = Conn#h2_conn{
streams = (Conn#h2_conn.streams)#{StreamId => NewStream}
},
{ok, NewConn, [], []}
end;
{error, exceeded} ->
header_block_too_large_error()
end;
process_continuation(#h2_conn{continuation_stream = Expected}, StreamId, _EndHeaders, _HeaderBlock) ->
{error,
{connection_error, protocol_error,
<<"CONTINUATION for stream ", (integer_to_binary(StreamId))/binary,
" while expecting stream ", (integer_to_binary(Expected))/binary>>}}.
-spec process_data(conn(), nhttp_lib:stream_id(), fin(), binary()) ->
{ok, conn(), [event()], iodata()} | {error, nhttp_h2_frame:decode_error()}.
process_data(#h2_conn{streams = Streams} = Conn, StreamId, EndStream, Payload) ->
case maps:get(StreamId, Streams, undefined) of
undefined ->
{error,
{connection_error, protocol_error,
<<"DATA frame on unknown stream (RFC 9113 Section 6.1)">>}};
#h2_stream{state = State, content_length = ContentLength, recv_body_length = RecvBodyLen} =
Stream ->
case validate_recv_data(State) of
ok ->
DataLen = byte_size(Payload),
NewRecvBodyLen = RecvBodyLen + DataLen,
NewConnWindow = Conn#h2_conn.recv_window - DataLen,
NewStreamWindow = Stream#h2_stream.recv_window - DataLen,
case validate_recv_flow(NewConnWindow, NewStreamWindow, StreamId) of
ok ->
case
nhttp_msg:validate_content_length(
ContentLength, NewRecvBodyLen, EndStream
)
of
ok ->
NewState = transition_on_recv_end_stream(State, EndStream),
NewStream = Stream#h2_stream{
state = NewState,
recv_window = NewStreamWindow,
recv_body_length = NewRecvBodyLen
},
ActiveCount = Conn#h2_conn.active_stream_count,
NewActiveCount = update_active_count_on_transition(
State, NewState, ActiveCount
),
NewConn = Conn#h2_conn{
streams = store_or_remove_stream(
Streams, StreamId, NewStream
),
recv_window = NewConnWindow,
active_stream_count = NewActiveCount
},
Events = [{data, StreamId, Payload, EndStream}],
{ok, NewConn, Events, []};
{error, content_length_mismatch} ->
{error,
{stream_error, StreamId, protocol_error,
<<"Content-Length mismatch (RFC 9113 Section 8.1.2.6)">>}}
end;
{error, _} = FlowError ->
FlowError
end;
{error, Reason} ->
{error, {stream_error, StreamId, stream_closed, Reason}}
end
end.
-spec process_frame(conn(), nhttp_h2_frame:t()) ->
{ok, conn(), [event()], iodata()} | {error, nhttp_h2_frame:decode_error()}.
process_frame(#h2_conn{continuation_stream = ExpectedId}, Frame) when
ExpectedId =/= undefined,
not (is_tuple(Frame) andalso
tuple_size(Frame) >= 2 andalso
element(1, Frame) =:= continuation andalso
element(2, Frame) =:= ExpectedId)
->
{error,
{connection_error, protocol_error,
<<"Received non-CONTINUATION frame while expecting CONTINUATION (RFC 9113 Section 4.3)">>}};
process_frame(#h2_conn{role = server, state = preface} = Conn, preface) ->
{ok, Conn#h2_conn{state = open}, [], []};
process_frame(#h2_conn{role = server, state = preface}, _Frame) ->
{error,
{connection_error, protocol_error,
<<"Client must send connection preface first (RFC 9113 Section 3.4)">>}};
process_frame(
#h2_conn{role = server, streams = Streams, last_peer_stream_id = LastPeer} = Conn,
{rst_stream, StreamId, _ErrorCode}
) when StreamId band 1 =:= 1, StreamId > LastPeer ->
case maps:get(StreamId, Streams, undefined) of
undefined ->
{error,
{connection_error, protocol_error,
<<"RST_STREAM on idle stream (RFC 9113 Section 5.1)">>}};
_ ->
process_rst_stream(Conn, StreamId, protocol_error)
end;
process_frame(
#h2_conn{role = server, streams = Streams, last_peer_stream_id = LastPeer} = Conn,
{window_update, StreamId, _Increment}
) when StreamId band 1 =:= 1, StreamId > LastPeer ->
case maps:get(StreamId, Streams, undefined) of
undefined ->
{error,
{connection_error, protocol_error,
<<"WINDOW_UPDATE on idle stream (RFC 9113 Section 5.1)">>}};
_ ->
{ok, Conn, [], []}
end;
process_frame(#h2_conn{} = Conn, {settings, Settings}) ->
process_settings(Conn, Settings);
process_frame(#h2_conn{settings_acked = false} = Conn, settings_ack) ->
NewConn = Conn#h2_conn{settings_acked = true, state = open},
{ok, NewConn, [settings_ack], []};
process_frame(#h2_conn{settings_acked = true} = Conn, settings_ack) ->
{ok, Conn, [], []};
process_frame(#h2_conn{} = Conn, {ping, OpaqueData}) ->
{ok, PongFrame} = nhttp_h2_frame:ping_ack(OpaqueData),
{ok, Conn, [{ping, OpaqueData}], PongFrame};
process_frame(#h2_conn{} = Conn, {ping_ack, OpaqueData}) ->
{ok, Conn, [{ping_ack, OpaqueData}], []};
process_frame(#h2_conn{send_window = Window} = Conn, {window_update, Increment}) ->
NewWindow = Window + Increment,
case NewWindow > ?H2_MAX_WINDOW_SIZE of
true ->
{error,
{connection_error, flow_control_error,
<<"Connection window overflow (RFC 9113 Section 6.9)">>}};
false ->
{ok, Conn#h2_conn{send_window = NewWindow}, [{window_update, 0, Increment}], []}
end;
process_frame(#h2_conn{streams = Streams} = Conn, {window_update, StreamId, Increment}) ->
case maps:get(StreamId, Streams, undefined) of
undefined ->
{ok, Conn, [], []};
#h2_stream{send_window = Window, state = State} = Stream ->
case State of
closed ->
{ok, Conn, [], []};
_ ->
NewWindow = Window + Increment,
case NewWindow > ?H2_MAX_WINDOW_SIZE of
true ->
{error,
{stream_error, StreamId, flow_control_error,
<<"Stream window overflow (RFC 9113 Section 6.9)">>}};
false ->
NewStream = Stream#h2_stream{send_window = NewWindow},
NewConn = Conn#h2_conn{streams = Streams#{StreamId => NewStream}},
{ok, NewConn, [{window_update, StreamId, Increment}], []}
end
end
end;
process_frame(#h2_conn{} = Conn, {goaway, LastStreamId, ErrorCode, DebugData}) ->
NewConn = Conn#h2_conn{
goaway_received = true,
state = closing,
last_good_stream_id = LastStreamId
},
{ok, NewConn, [{goaway, LastStreamId, ErrorCode, DebugData}], []};
process_frame(#h2_conn{} = Conn, {rst_stream, StreamId, ErrorCode}) ->
process_rst_stream(Conn, StreamId, ErrorCode);
process_frame(#h2_conn{} = Conn, {data, StreamId, EndStream, Payload}) ->
process_data(Conn, StreamId, EndStream, Payload);
process_frame(#h2_conn{} = Conn, {headers, StreamId, EndStream, EndHeaders, HeaderBlock}) ->
process_headers(Conn, StreamId, EndStream, EndHeaders, HeaderBlock);
process_frame(
#h2_conn{} = Conn, {headers, StreamId, EndStream, EndHeaders, _Priority, HeaderBlock}
) ->
process_headers(Conn, StreamId, EndStream, EndHeaders, HeaderBlock);
process_frame(#h2_conn{} = Conn, {continuation, StreamId, EndHeaders, HeaderBlock}) ->
process_continuation(Conn, StreamId, EndHeaders, HeaderBlock);
process_frame(#h2_conn{} = Conn, {priority, _StreamId, _Priority}) ->
{ok, Conn, [], []};
process_frame(#h2_conn{} = Conn, {push_promise, _StreamId, _EndHeaders, PromisedId, _HeaderBlock}) ->
process_push_promise(Conn, PromisedId);
process_frame(#h2_conn{} = Conn, {unknown, _Type}) ->
{ok, Conn, [], []}.
-spec process_headers(conn(), nhttp_lib:stream_id(), fin(), fin(), binary()) ->
{ok, conn(), [event()], iodata()} | {error, nhttp_h2_frame:decode_error()}.
process_headers(
#h2_conn{continuation_stream = undefined} = Conn, StreamId, EndStream, EndHeaders, HeaderBlock
) ->
case check_header_block_size(Conn, byte_size(HeaderBlock)) of
ok ->
case EndHeaders of
fin ->
decode_and_emit_headers(Conn, StreamId, EndStream, HeaderBlock);
nofin ->
Stream = get_or_create_stream(Conn, StreamId),
NewStream = Stream#h2_stream{
header_buffer = HeaderBlock,
header_end_stream = EndStream =:= fin
},
NewConn = Conn#h2_conn{
streams = (Conn#h2_conn.streams)#{StreamId => NewStream},
continuation_stream = StreamId
},
{ok, NewConn, [], []}
end;
{error, exceeded} ->
header_block_too_large_error()
end;
process_headers(#h2_conn{continuation_stream = Expected}, _StreamId, _ES, _EH, _HB) ->
{error,
{connection_error, protocol_error,
<<"HEADERS while expecting CONTINUATION for stream ",
(integer_to_binary(Expected))/binary>>}}.
-spec process_push_promise(conn(), nhttp_lib:stream_id()) ->
{ok, conn(), [event()], iodata()} | {error, nhttp_h2_frame:decode_error()}.
process_push_promise(#h2_conn{role = client} = Conn, PromisedId) ->
{ok, RstFrame} = nhttp_h2_frame:rst_stream(PromisedId, refused_stream),
{ok, Conn, [], RstFrame};
process_push_promise(#h2_conn{role = server}, _PromisedId) ->
{error,
{connection_error, protocol_error,
<<"Server received PUSH_PROMISE (RFC 9113 Section 6.6)">>}}.
-spec process_rst_stream(conn(), nhttp_lib:stream_id(), error_code()) ->
{ok, conn(), [event()], iodata()}.
process_rst_stream(#h2_conn{streams = Streams} = Conn, StreamId, ErrorCode) ->
NewConn = close_stream(Conn, StreamId),
Event =
case maps:get(StreamId, Streams, undefined) of
undefined -> [];
_ -> [{stream_reset, StreamId, ErrorCode}]
end,
{ok, NewConn, Event, []}.
-spec process_settings(conn(), settings()) ->
{ok, conn(), [event()], iodata()} | {error, nhttp_h2_frame:decode_error()}.
process_settings(#h2_conn{peer_settings = OldSettings, streams = Streams} = Conn, NewSettings) ->
MergedSettings = maps:merge(OldSettings, NewSettings),
NewHpackEnc =
case maps:get(header_table_size, NewSettings, undefined) of
undefined ->
Conn#h2_conn.hpack_enc;
TableSize ->
{ok, UpdatedEnc} = nhttp_hpack:set_max_table_size(
TableSize, Conn#h2_conn.hpack_enc
),
UpdatedEnc
end,
{NewStreams, WindowEvents} =
case maps:get(initial_window_size, NewSettings, undefined) of
undefined ->
{Streams, []};
NewInitialWindow ->
OldInitialWindow = maps:get(
initial_window_size, OldSettings, ?H2_DEFAULT_INITIAL_WINDOW_SIZE
),
Delta = NewInitialWindow - OldInitialWindow,
UpdatedStreams = maps:map(
fun(_Id, Stream) ->
Stream#h2_stream{
send_window = Stream#h2_stream.send_window + Delta
}
end,
Streams
),
Events =
case Delta > 0 of
true ->
[{window_update, Id, Delta} || Id <- maps:keys(Streams)];
false ->
[]
end,
{UpdatedStreams, Events}
end,
NewConn = Conn#h2_conn{
peer_settings = MergedSettings,
streams = NewStreams,
hpack_enc = NewHpackEnc,
state = open
},
{ok, AckFrame} = nhttp_h2_frame:settings_ack(),
{ok, NewConn, [{settings, NewSettings}] ++ WindowEvents, AckFrame}.
-spec recv_loop(conn(), binary(), [[event()]], iodata()) -> recv_result().
recv_loop(Conn, Data, EventsAcc, ToSend) ->
MaxFrameSize = maps:get(
max_frame_size, Conn#h2_conn.local_settings, ?H2_DEFAULT_MAX_FRAME_SIZE
),
case nhttp_h2_frame:decode(Data, MaxFrameSize) of
{ok, Frame, Consumed} ->
Rest = nhttp_h2_frame:split_at(Data, Consumed),
case process_frame(Conn, Frame) of
{ok, NewConn, [], []} ->
recv_loop(NewConn, Rest, EventsAcc, ToSend);
{ok, NewConn, NewEvents, []} ->
recv_loop(NewConn, Rest, [NewEvents | EventsAcc], ToSend);
{ok, NewConn, [], FramesToSend} ->
recv_loop(NewConn, Rest, EventsAcc, [ToSend, FramesToSend]);
{ok, NewConn, NewEvents, FramesToSend} ->
recv_loop(NewConn, Rest, [NewEvents | EventsAcc], [ToSend, FramesToSend]);
{error, {stream_error, StreamId, ErrorCode, _Reason}} ->
{ok, RstFrame} = nhttp_h2_frame:rst_stream(StreamId, ErrorCode),
NewConn = close_stream(Conn, StreamId),
recv_loop(NewConn, Rest, EventsAcc, [ToSend, RstFrame]);
{error, _} = Error ->
Error
end;
{more, _} ->
case validate_preface_buffer(Conn, Data) of
ok ->
FinalConn = Conn#h2_conn{buffer = Data},
FinalEvents = lists:append(lists:reverse(EventsAcc)),
case iolist_size(ToSend) of
0 -> {ok, FinalEvents, FinalConn};
_ -> {ok, FinalEvents, FinalConn, ToSend}
end;
{error, _} = Error ->
Error
end;
{error, _} = Error ->
Error
end.
-spec store_or_remove_stream(
#{nhttp_lib:stream_id() => #h2_stream{}}, nhttp_lib:stream_id(), #h2_stream{}
) -> #{nhttp_lib:stream_id() => #h2_stream{}}.
store_or_remove_stream(Streams, StreamId, #h2_stream{state = closed}) ->
maps:remove(StreamId, Streams);
store_or_remove_stream(Streams, StreamId, Stream) ->
Streams#{StreamId => Stream}.
-spec transition_on_recv_end_stream(stream_state(), fin()) -> stream_state().
transition_on_recv_end_stream(open, fin) -> half_closed_remote;
transition_on_recv_end_stream(half_closed_local, fin) -> closed;
transition_on_recv_end_stream(State, _) -> State.
-spec transition_on_recv_headers(stream_state(), fin()) -> stream_state().
transition_on_recv_headers(idle, fin) -> half_closed_remote;
transition_on_recv_headers(idle, nofin) -> open;
transition_on_recv_headers(reserved_remote, _) -> half_closed_local;
transition_on_recv_headers(open, fin) -> half_closed_remote;
transition_on_recv_headers(half_closed_local, fin) -> closed;
transition_on_recv_headers(State, _) -> State.
-spec transition_on_send_end_stream(stream_state(), fin()) -> stream_state().
transition_on_send_end_stream(open, fin) -> half_closed_local;
transition_on_send_end_stream(half_closed_remote, fin) -> closed;
transition_on_send_end_stream(State, _) -> State.
-spec transition_on_send_headers(stream_state(), fin()) -> stream_state().
transition_on_send_headers(idle, fin) -> half_closed_local;
transition_on_send_headers(idle, nofin) -> open;
transition_on_send_headers(reserved_local, _) -> half_closed_remote;
transition_on_send_headers(open, fin) -> half_closed_local;
transition_on_send_headers(half_closed_remote, fin) -> closed;
transition_on_send_headers(State, _) -> State.
-spec update_active_count_on_transition(
stream_state(), stream_state(), non_neg_integer()
) -> non_neg_integer().
update_active_count_on_transition(OldState, NewState, Count) ->
WasActive = is_active_state(OldState),
IsActive = is_active_state(NewState),
case {WasActive, IsActive} of
{false, true} -> Count + 1;
{true, false} -> max(0, Count - 1);
_ -> Count
end.
-spec validate_decoded_headers(role(), nhttp_lib:headers(), boolean(), settings()) ->
ok | {error, protocol_error}.
validate_decoded_headers(_, Headers, true, _Settings) ->
validate_trailers(Headers);
validate_decoded_headers(server, Headers, false, Settings) ->
validate_request_headers(Headers, Settings);
validate_decoded_headers(client, Headers, false, _Settings) ->
validate_response_headers(Headers).
-spec validate_peer_stream_id(conn(), nhttp_lib:stream_id()) ->
ok | {error, {connection_error, error_code(), binary()}}.
validate_peer_stream_id(#h2_conn{role = server, last_peer_stream_id = LastPeer}, StreamId) ->
case StreamId band 1 of
0 ->
{error,
{connection_error, protocol_error,
<<"Client sent even-numbered stream ID (RFC 9113 Section 5.1.1)">>}};
1 when StreamId =< LastPeer ->
{error,
{connection_error, protocol_error,
<<"Stream ID less than or equal to previous (RFC 9113 Section 5.1.1)">>}};
1 ->
ok
end;
validate_peer_stream_id(#h2_conn{role = client, last_peer_stream_id = LastPeer}, StreamId) ->
case StreamId band 1 of
1 ->
{error,
{connection_error, protocol_error,
<<"Server sent odd-numbered stream ID (RFC 9113 Section 5.1.1)">>}};
0 when StreamId =< LastPeer, StreamId =/= 0 ->
{error,
{connection_error, protocol_error,
<<"Stream ID less than or equal to previous (RFC 9113 Section 5.1.1)">>}};
0 ->
ok
end.
-spec validate_preface_buffer(conn(), binary()) -> ok | {error, nhttp_h2_frame:decode_error()}.
validate_preface_buffer(#h2_conn{role = server, state = preface}, Data) when
byte_size(Data) >= 24
->
case Data of
<<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", _/binary>> ->
ok;
_ ->
{error,
{connection_error, protocol_error,
<<"Invalid connection preface (RFC 9113 Section 3.4)">>}}
end;
validate_preface_buffer(_, _) ->
ok.
-spec validate_recv_data(stream_state()) -> ok | {error, binary()}.
validate_recv_data(open) -> ok;
validate_recv_data(half_closed_local) -> ok;
validate_recv_data(_) -> {error, <<"DATA on closed or half-closed stream">>}.
-spec validate_recv_flow(integer(), integer(), nhttp_lib:stream_id()) ->
ok | {error, nhttp_h2_frame:decode_error()}.
validate_recv_flow(ConnWindow, _StreamWindow, _StreamId) when ConnWindow < 0 ->
{error,
{connection_error, flow_control_error,
<<"DATA exceeds connection flow-control window (RFC 9113 Section 6.9)">>}};
validate_recv_flow(_ConnWindow, StreamWindow, StreamId) when StreamWindow < 0 ->
{error,
{stream_error, StreamId, flow_control_error,
<<"DATA exceeds stream flow-control window (RFC 9113 Section 6.9)">>}};
validate_recv_flow(_ConnWindow, _StreamWindow, _StreamId) ->
ok.
-define(CONNECTION_HEADERS_SET, ?NHTTP_MSG_CONNECTION_HEADERS_SET).
-spec build_headers_event(
role(), boolean(), nhttp_lib:stream_id(), nhttp_lib:headers(), fin(), conn()
) ->
[event()].
build_headers_event(_, true, StreamId, Headers, _Fin, _Conn) ->
[{trailers, StreamId, Headers}];
build_headers_event(server, false, StreamId, Headers, Fin, Conn) ->
Request = build_request(Conn, Headers),
[{request, StreamId, Request, Fin}];
build_headers_event(client, false, StreamId, Headers, Fin, _Conn) ->
Response = build_response(Headers),
[{response, StreamId, Response, Fin}].
-spec build_request(conn(), nhttp_lib:headers()) -> nhttp_lib:request().
build_request(#h2_conn{peer = Peer}, Headers) ->
nhttp_msg:build_request(http2, Peer, Headers).
-spec build_response(nhttp_lib:headers()) -> nhttp_lib:response().
build_response(Headers) ->
nhttp_msg:build_response(http2, Headers).
-spec check_extended_connect(
binary() | undefined, binary() | undefined, binary() | undefined, settings()
) -> ok | {error, protocol_error}.
check_extended_connect(Method, Protocol, Authority, Settings) ->
case nhttp_msg:check_extended_connect(Method, Protocol, Authority, Settings) of
ok -> ok;
{error, _} -> {error, protocol_error}
end.
-spec finalise_request_headers(nhttp_msg:request_shape(), settings()) ->
ok | {error, protocol_error}.
finalise_request_headers(#{headers := Headers} = Shape, Settings) ->
case has_non_trailers_te(Headers) of
true ->
{error, protocol_error};
false ->
#{method := Method, protocol := Protocol, authority := Authority} = Shape,
check_extended_connect(Method, Protocol, Authority, Settings)
end.
-spec has_non_trailers_te(nhttp_lib:headers()) -> boolean().
has_non_trailers_te([]) -> false;
has_non_trailers_te([{<<"te">>, V} | _]) when V =/= <<"trailers">> -> true;
has_non_trailers_te([_ | Rest]) -> has_non_trailers_te(Rest).
-spec validate_request_headers(nhttp_lib:headers(), settings()) ->
ok | {error, protocol_error}.
validate_request_headers(Headers, Settings) ->
case nhttp_msg:validate_request_pseudo_shape(Headers) of
{error, _} ->
{error, protocol_error};
{ok, Shape} ->
finalise_request_headers(Shape, Settings)
end.
-spec validate_response_headers(nhttp_lib:headers()) -> ok | {error, protocol_error}.
validate_response_headers(Headers) ->
validate_response_headers(Headers, #{phase => pseudo, has_status => false}).
-spec validate_response_headers(nhttp_lib:headers(), map()) -> ok | {error, protocol_error}.
validate_response_headers([], #{has_status := true}) ->
ok;
validate_response_headers([], #{has_status := false}) ->
{error, protocol_error};
validate_response_headers(
[{<<":status">>, _} | Rest], #{phase := pseudo, has_status := false} = State
) ->
validate_response_headers(Rest, State#{has_status => true});
validate_response_headers([{<<":status">>, _} | _], #{has_status := true}) ->
{error, protocol_error};
validate_response_headers([{<<$:, _/binary>>, _} | _], _) ->
{error, protocol_error};
validate_response_headers([{Name, Value} | Rest], State) ->
case maps:is_key(Name, ?CONNECTION_HEADERS_SET) of
true ->
{error, protocol_error};
false ->
case Name of
<<"te">> when Value =/= <<"trailers">> ->
{error, protocol_error};
_ ->
validate_response_headers(Rest, State#{phase => regular})
end
end.
-spec validate_send_data(conn(), nhttp_lib:stream_id()) -> ok | {error, term()}.
validate_send_data(#h2_conn{streams = Streams}, StreamId) ->
case maps:get(StreamId, Streams, undefined) of
undefined ->
{error, {unknown_stream, StreamId}};
#h2_stream{state = State} ->
case State of
open -> ok;
half_closed_remote -> ok;
_ -> {error, {stream_closed, StreamId}}
end
end.
-spec validate_send_headers(conn(), nhttp_lib:stream_id()) -> ok | {error, term()}.
validate_send_headers(#h2_conn{goaway_sent = true}, _StreamId) ->
{error, connection_closing};
validate_send_headers(
#h2_conn{
role = Role,
streams = Streams,
next_stream_id = NextId,
last_peer_stream_id = LastPeer
},
StreamId
) ->
case maps:get(StreamId, Streams, undefined) of
undefined ->
case was_stream_used(Role, StreamId, NextId, LastPeer) of
true -> {error, {stream_closed, StreamId}};
false -> ok
end;
#h2_stream{state = State} ->
case State of
idle -> ok;
open -> ok;
half_closed_remote -> ok;
reserved_local -> ok;
_ -> {error, {stream_closed, StreamId}}
end
end.
-spec validate_trailers(nhttp_lib:headers()) -> ok | {error, protocol_error}.
validate_trailers(Headers) ->
case nhttp_msg:validate_trailers(Headers) of
ok -> ok;
{error, pseudo_in_trailers} -> {error, protocol_error}
end.
-spec was_stream_used(role(), nhttp_lib:stream_id(), nhttp_lib:stream_id(), nhttp_lib:stream_id()) ->
boolean().
was_stream_used(client, StreamId, NextId, _LastPeer) when StreamId band 1 =:= 1 ->
StreamId < NextId;
was_stream_used(server, StreamId, _NextId, LastPeer) when StreamId band 1 =:= 1 ->
StreamId =< LastPeer;
was_stream_used(client, StreamId, _NextId, LastPeer) when StreamId band 1 =:= 0 ->
StreamId =< LastPeer;
was_stream_used(server, StreamId, NextId, _LastPeer) when StreamId band 1 =:= 0 ->
StreamId < NextId;
was_stream_used(_, _, _, _) ->
false.