Skip to main content

src/roadrunner_conn.erl

-module(roadrunner_conn).
-moduledoc false.

%% Public connection-process API and pure helpers.
%%
%% `start/2` spawns the per-connection process — `roadrunner_conn_loop`,
%% a tail-recursive loop with phase tracking via `proc_lib:set_label/1`
%% for observer / recon visibility.
%%
%% The other public functions are pure-ish helpers; many are also called
%% directly from `roadrunner_req` (manual body buffering) and from
%% `roadrunner_conn_tests.erl`'s closure-driven unit tests.
%%
%% Per-connection behavior — keep-alive (capped by
%% `max_keep_alive_requests`, idle-bound by `keep_alive_timeout`),
%% `Expect: 100-continue`, HEAD body suppression, anti-Slowloris rate
%% check (`min_bytes_per_second`), the five handler return shapes
%% (`{Status, Headers, Body}`, `{stream, ...}`, `{loop, ...}`,
%% `{sendfile, ...}`, `{websocket, ...}`) — lives in `roadrunner_conn_loop`
%% and the response-shape-specific modules (`roadrunner_stream_response`,
%% `roadrunner_loop_response`, `roadrunner_ws_session`).
%%
%% The 4xx/5xx error responses (400 on parse failure, 408 on
%% first-request silence, 413 on oversized bodies, 500 on handler
%% crashes) are emitted via the `send_*/1` helpers exported here. Idle
%% keep-alive timeouts and slow-client rate violations close the
%% connection silently — no response to a peer that wasn't going to
%% read it anyway.

-export([
    start/2,
    parse_loop/2,
    read_body/4,
    peer/1,
    try_acquire_slot/1,
    release_slot/1,
    consume_body_reader/2,
    join_drain_group/2
]).
%% Internal helpers shared with `roadrunner_conn_loop`. Marked `-doc false`
%% individually so they stay invisible to the public API surface but
%% are still reachable across the module boundary. They live here
%% (rather than inside the conn-loop module) because the closure-driven
%% unit tests in `roadrunner_conn_tests.erl` exercise the body-state
%% machinery directly through these functions.
-export([
    make_recv/3,
    rate_ok/3,
    body_framing/1,
    generate_request_id/1,
    set_request_logger_metadata/1,
    maybe_send_continue/3,
    refine_conn_label/2,
    scheme/1,
    make_body_reader/4,
    drain_body/1,
    keep_alive_decision/2,
    send_request_timeout/1,
    send_bad_request/1,
    send_payload_too_large/1,
    drain_oversized_body/3,
    send_internal_error/1,
    send_not_found/1,
    resolve_handler/2,
    response_status/1,
    response_kind/1
]).

-export_type([proto_opts/0, dispatch/0]).

-on_load(init_patterns/0).

-define(CLOSE_CP_KEY, {?MODULE, close_cp}).
-define(KEEP_ALIVE_CP_KEY, {?MODULE, keep_alive_cp}).

-type dispatch() ::
    {handler, module(), roadrunner_middleware:next(), State :: term()}
    | {router, ListenerName :: atom()}.

-type proto_opts() :: #{
    dispatch := dispatch(),
    middlewares := roadrunner_middleware:middleware_list(),
    max_content_length := non_neg_integer(),
    request_timeout := non_neg_integer(),
    keep_alive_timeout := non_neg_integer(),
    max_keep_alive_requests := pos_integer(),
    max_clients := pos_integer(),
    client_counter := atomics:atomics_ref(),
    requests_counter := atomics:atomics_ref(),
    min_bytes_per_second := non_neg_integer(),
    body_buffering := auto | manual,
    listener_name => atom(),
    %% When `false`, conns skip the per-process `pg:join` into
    %% `{roadrunner_drain, ListenerName}`. The drain group is the
    %% mechanism `roadrunner_listener:drain/2` uses to broadcast
    %% `{roadrunner_drain, Deadline}` to in-flight conns; loop /
    %% SSE / WebSocket handlers depend on it. Short-lived h1
    %% workloads can opt out for ~10% lower per-conn overhead.
    graceful_drain => boolean(),
    %% Enabled protocols as a flat atom list in user-supplied (ALPN
    %% preference) order. On plain TCP with `[http2]`,
    %% `roadrunner_conn_loop:awaiting_shoot/3` routes straight to the
    %% h2 conn loop. HTTP/2 sub-opts are flattened onto proto_opts
    %% top-level as `http2_conn_window`, `http2_stream_window`,
    %% `http2_window_refill_threshold` — see those keys below. The
    %% user-facing nested shape (`{http2, #{conn_window => N, ...}}`)
    %% is documented in `t:roadrunner_listener:opts/0`.
    protocols => [http1 | http2, ...],
    %% HTTP/2 receive-window tuning, populated by the listener only
    %% when `http2` is in the protocols list. Pattern-match these
    %% keys directly in code paths that already know http2 is
    %% enabled (e.g. `roadrunner_conn_loop_http2:enter/5`) — they're
    %% guaranteed present, defaults filled at normalization time.
    %% See `t:roadrunner_listener:opts/0` for the user-facing shape
    %% (`{http2, #{conn_window => N, stream_window => N,
    %% window_refill_threshold => N}}`) and RFC 9113 §6.5.2 / §6.9.2
    %% for the wire semantics.
    http2_conn_window => 1..16#7FFFFFFF,
    http2_stream_window => 1..16#7FFFFFFF,
    http2_window_refill_threshold => 1..16#7FFFFFFF,
    %% Optional fields the listener injects only when the user
    %% supplies them — see `roadrunner_listener:build_proto_opts/2`.
    %% Declared here so dialyzer accepts pattern matches like
    %% `#{hibernate_after := Ms}` against `proto_opts()`.
    hibernate_after => pos_integer(),
    rate_check_interval => pos_integer()
}.

-doc """
Spawn an unlinked connection process for the accepted `Socket` and the
shared `ProtoOpts` (handler module, body limits, ...).

The caller (typically `roadrunner_acceptor`) must transfer socket
ownership via `roadrunner_transport:controlling_process/2` and then
send the process the atom `shoot` to release it.
""".
-spec start(roadrunner_transport:socket(), proto_opts()) -> {ok, pid()}.
start(Socket, ProtoOpts) when is_map(ProtoOpts) ->
    {ok, _Pid} = roadrunner_conn_loop:start(Socket, ProtoOpts).

-doc """
Join the per-listener `pg` group so `roadrunner_listener:drain/2` can
broadcast a `{roadrunner_drain, Deadline}` notification to the calling
process. `pg` removes the caller automatically when the process
exits. The `pg` scope is started by `roadrunner_sup`; in tests that
drive `roadrunner_listener:start_link/2` directly without starting the
application, the scope is absent and the join is silently skipped
— drain will simply not see those conns.

Called by `roadrunner_conn_loop:init_loop/3` after the conn process
starts but before it accepts any work.
""".
-spec join_drain_group(atom(), boolean()) -> ok.
join_drain_group(_Name, false) ->
    ok;
join_drain_group(undefined, _) ->
    ok;
join_drain_group(Name, true) ->
    case whereis(pg) of
        undefined -> ok;
        _ -> pg:join({roadrunner_drain, Name}, self())
    end.

-doc """
Try to bump the live-connection counter under `max_clients`. Returns
`true` on success (caller may proceed to spawn a conn), `false` if
the cap is already met (caller must close the accepted socket).

The check is racy by a small amount: between increment and rollback
multiple acceptors may briefly observe a count slightly above the
cap, but the count is corrected immediately by the rollback. The
overshoot is at most `num_acceptors - 1` — bounded and harmless.

## Slot leak under abnormal exits

The slot is released by `roadrunner_conn_loop:exit_clean/2` on every
normal exit path (handler crash, parse error, drain stop, peer
close). Under `exit(Pid, kill)` — sent by a supervisor or by an
operator using `recon:proc_count/2`-style cleanup — the runtime
skips the cleanup funnel, so the slot is **leaked** for the lifetime
of the listener process. This is bounded:
`max_clients` accepted connections each leak at most one slot
under killing, and the listener restart resets the counter. If
leaks become a real concern under chaos-test conditions, add a
periodic reaper that compares `pg:get_members({roadrunner_drain, _})`
against the live counter and reconciles the difference.
""".
-spec try_acquire_slot(proto_opts()) -> boolean().
try_acquire_slot(#{client_counter := Ref, max_clients := Max}) ->
    case atomics:add_get(Ref, 1, 1) of
        N when N =< Max ->
            true;
        _ ->
            atomics:sub(Ref, 1, 1),
            false
    end.

-doc "Decrement the live-connection counter — paired with `try_acquire_slot/1`.".
-spec release_slot(proto_opts()) -> ok.
release_slot(#{client_counter := Ref}) ->
    _ = atomics:sub(Ref, 1, 1),
    ok.

-doc false.
-spec refine_conn_label(
    proto_opts(), {inet:ip_address(), inet:port_number()} | undefined
) -> ok.
refine_conn_label(ProtoOpts, Peer) ->
    ListenerName = maps:get(listener_name, ProtoOpts, undefined),
    proc_lib:set_label({roadrunner_conn, ListenerName, Peer}),
    ok.

%% 64 random bits in lowercase hex — collision-resistant for billions of
%% requests, short enough to embed in log lines.
%%
%% `/1` accepts a per-conn buffer of pre-generated random bytes and
%% returns `{RequestId, NewBuffer}` — caller threads the buffer
%% through its own state. The conn_loop variant uses this to amortize
%% the CSPRNG NIF call: one `crypto:strong_rand_bytes/1` per ~32
%% requests instead of one per request. Each 8-byte slice still
%% carries a full 64 bits of independent entropy — the batch boundary
%% doesn't reduce randomness.

-define(REQ_ID_BATCH_BYTES, 256).

-doc false.
-spec generate_request_id(binary()) -> {binary(), binary()}.
generate_request_id(<<Slice:8/binary, Rest/binary>>) ->
    {binary:encode_hex(Slice, lowercase), Rest};
generate_request_id(_Empty) ->
    %% Buffer drained (or never initialized) — refill with one NIF call.
    <<Slice:8/binary, Rest/binary>> = crypto:strong_rand_bytes(?REQ_ID_BATCH_BYTES),
    {binary:encode_hex(Slice, lowercase), Rest}.

%% Replaces (not merges) the conn process's logger metadata so a
%% keep-alive request never inherits the previous request's correlation.
-doc false.
-spec set_request_logger_metadata(roadrunner_req:request()) -> ok.
set_request_logger_metadata(#{
    request_id := RequestId,
    method := Method,
    target := Target,
    peer := Peer
}) ->
    logger:set_process_metadata(#{
        request_id => RequestId,
        method => Method,
        path => Target,
        peer => Peer
    }).

%% Build a recv closure with a single overall deadline plus a rolling
%% rate check. `gen_tcp:recv` with a negative timeout is undefined, so
%% we cap at 0 — which makes gen_tcp return `{error, timeout}`
%% immediately when the deadline has passed. Any timeout here is, by
%% construction, the request_timeout.
%%
%% Rate enforcement (anti-Slowloris): track total bytes received and
%% time since the first recv. After a 1-second grace, require the
%% running average to meet `MinRate` bytes/sec, otherwise return
%% `{error, slow_client}`. The state is a per-conn atomics ref — no
%% cross-process contention.
-doc false.
-spec make_recv(roadrunner_transport:socket(), integer(), non_neg_integer()) ->
    fun(() -> {ok, binary()} | {error, request_timeout | slow_client | term()}).
make_recv(Socket, Deadline, MinRate) ->
    Bytes = atomics:new(1, [{signed, false}]),
    Start = erlang:monotonic_time(millisecond),
    fun() ->
        Now = erlang:monotonic_time(millisecond),
        Remaining = max(0, Deadline - Now),
        case roadrunner_transport:recv(Socket, 0, Remaining) of
            {ok, Data} ->
                Total = atomics:add_get(Bytes, 1, byte_size(Data)),
                case rate_ok(Now - Start, Total, MinRate) of
                    true -> {ok, Data};
                    false -> {error, slow_client}
                end;
            {error, timeout} ->
                {error, request_timeout};
            {error, _} = E ->
                E
        end
    end.

-doc false.
%% A 1-second grace lets a slow handshake / TLS session start without
%% being misclassified. After that, the running average must meet the
%% minimum or the client is dropped. `MinRate = 0` falls through and
%% always passes — the inequality `Total * 1000 >= 0` is trivially true.
-spec rate_ok(integer(), non_neg_integer(), non_neg_integer()) -> boolean().
rate_ok(ElapsedMs, _Total, _MinRate) when ElapsedMs =< 1000 -> true;
rate_ok(ElapsedMs, Total, MinRate) -> Total * 1000 >= MinRate * ElapsedMs.

-doc false.
-spec peer(roadrunner_transport:socket()) -> {inet:ip_address(), inet:port_number()} | undefined.
peer(Socket) ->
    case roadrunner_transport:peername(Socket) of
        {ok, Peer} -> Peer;
        {error, _} -> undefined
    end.

-doc false.
-spec scheme(roadrunner_transport:socket()) -> http | https.
scheme({gen_tcp, _}) -> http;
scheme({ssl, _}) -> https;
scheme({fake, _}) -> http.

-doc false.
-spec resolve_handler(dispatch(), roadrunner_req:request()) ->
    {ok, module(), roadrunner_router:bindings(), roadrunner_middleware:next(), term()}
    | not_found.
resolve_handler({handler, Mod, Pipeline, State}, _Req) ->
    {ok, Mod, #{}, Pipeline, State};
resolve_handler({router, ListenerName}, Req) ->
    %% Routes are stored in `persistent_term` by `roadrunner_listener` so
    %% the lookup is O(1) and `roadrunner_listener:reload_routes/2` can
    %% atomically swap the table without bouncing the listener.
    Compiled = persistent_term:get({roadrunner_routes, ListenerName}),
    roadrunner_router:match(roadrunner_req:path(Req), Compiled).

-doc false.
-spec read_body(
    roadrunner_req:request(),
    binary(),
    fun(() -> {ok, binary()} | {error, term()}),
    non_neg_integer()
) ->
    {ok, Body :: iodata(), Leftover :: binary()}
    | {error,
        content_length_too_large
        | bad_content_length
        | bad_transfer_encoding
        | term()}.
read_body(Req, Buffered, RecvFun, MaxCL) ->
    case body_framing(Req) of
        none ->
            %% Per RFC 9112 §6.3: a request without `Content-Length`
            %% or `Transfer-Encoding` has a zero-length message body.
            %% Any leftover bytes in `Buffered` belong to a pipelined
            %% next request — preserve them as `Leftover` so the conn
            %% can feed them into the next `reading_request` parse.
            {ok, <<>>, Buffered};
        chunked ->
            read_chunked(Buffered, RecvFun, MaxCL, 0);
        {content_length, N} when N > MaxCL ->
            {error, content_length_too_large};
        {content_length, N} ->
            read_body_until(N, Buffered, RecvFun);
        {error, _} = Err ->
            Err
    end.

%% RFC 9110 §10.1.1: when a request carries `Expect: 100-continue` and
%% we're about to read a body, send `HTTP/1.1 100 Continue` so clients
%% that gate body transmission on this signal don't stall. We only do
%% this if no body bytes have already arrived in the buffer — once we
%% see body data the client clearly didn't wait, and the 100 line is
%% redundant.
-doc false.
-spec maybe_send_continue(roadrunner_transport:socket(), roadrunner_req:request(), binary()) ->
    ok.
maybe_send_continue(Socket, Req, Buffered) ->
    case Buffered =:= ~"" andalso has_continue_expectation(Req) of
        true ->
            _ = roadrunner_transport:send(Socket, ~"HTTP/1.1 100 Continue\r\n\r\n"),
            ok;
        false ->
            ok
    end.

-spec has_continue_expectation(roadrunner_req:request()) -> boolean().
has_continue_expectation(#{cached_decisions := #{expects_continue := EC}}) ->
    EC;
has_continue_expectation(Req) ->
    %% Manually-built request maps (tests, middleware) skip the parse-time
    %% precompute — fall back to the lowercase-and-compare path.
    case roadrunner_req:header(~"expect", Req) of
        undefined -> false;
        Value -> roadrunner_bin:ascii_lowercase(Value) =:= ~"100-continue"
    end.

-doc false.
-spec make_body_reader(
    none | chunked | {content_length, non_neg_integer()},
    binary(),
    fun(() -> {ok, binary()} | {error, term()}),
    non_neg_integer()
) -> roadrunner_req:body_reader().
make_body_reader(Framing, Buffered, Recv, Max) ->
    #{
        framing => Framing,
        buffered => Buffered,
        bytes_read => 0,
        pending => <<>>,
        done => false,
        recv => Recv,
        max => Max
    }.

-doc false.
-spec body_framing(roadrunner_req:request()) ->
    none
    | chunked
    | {content_length, non_neg_integer()}
    | {error, bad_content_length | bad_transfer_encoding}.
body_framing(#{cached_decisions := #{is_chunked := true}}) ->
    chunked;
body_framing(#{cached_decisions := #{has_transfer_encoding := true}}) ->
    %% Non-chunked Transfer-Encoding (e.g. `gzip`). Rejected per
    %% RFC 9112 §6.1 — we only support identity and chunked.
    {error, bad_transfer_encoding};
body_framing(#{cached_decisions := #{content_length := CL}}) ->
    %% No Transfer-Encoding header. `parse_request/1`'s `check_framing/1`
    %% already rejected TE+CL combos and inconsistent multi-CL, so the
    %% cached Content-Length is the body framing.
    case CL of
        none -> none;
        {ok, N} -> {content_length, N};
        {error, _} = Err -> Err
    end;
body_framing(Req) ->
    %% Manually-built request maps without cached_decisions — full path.
    case roadrunner_req:header(~"transfer-encoding", Req) of
        undefined ->
            case content_length(Req) of
                none -> none;
                {ok, N} -> {content_length, N};
                {error, _} = Err -> Err
            end;
        Value ->
            %% RFC 9110 §10.1.4: transfer-coding names are
            %% case-insensitive. Accept `chunked`, `Chunked`,
            %% `CHUNKED` etc. (clients in the wild send all variants).
            case roadrunner_bin:ascii_lowercase(Value) of
                ~"chunked" -> chunked;
                _ -> {error, bad_transfer_encoding}
            end
    end.

-spec read_body_until(
    non_neg_integer(),
    binary(),
    fun(() -> {ok, binary()} | {error, term()})
) ->
    {ok, iodata(), binary()} | {error, term()}.
read_body_until(N, Buffered, _RecvFun) when byte_size(Buffered) >= N ->
    <<Body:N/binary, Leftover/binary>> = Buffered,
    {ok, Body, Leftover};
read_body_until(N, Buffered, RecvFun) ->
    %% Body recursion: each level reads one recv-chunk and prepends it
    %% to the iolist returned from below on the way out. The auto-path
    %% body field is `iodata()` so handlers that only need
    %% `iolist_size/1` or want to forward the body via `gen_tcp:send/2`
    %% never pay the flatten cost. Handlers requiring a flat binary
    %% (e.g. pattern matching, `roadrunner_qs:parse/1`) call
    %% `iolist_to_binary/1` themselves.
    case read_body_until_io(N - byte_size(Buffered), RecvFun) of
        {ok, MoreIo, Leftover} ->
            {ok, [Buffered | MoreIo], Leftover};
        {error, _} = E ->
            E
    end.

-spec read_body_until_io(
    non_neg_integer(),
    fun(() -> {ok, binary()} | {error, term()})
) ->
    {ok, iolist(), binary()} | {error, term()}.
read_body_until_io(N, RecvFun) ->
    case RecvFun() of
        {ok, Data} ->
            DataSz = byte_size(Data),
            if
                DataSz >= N ->
                    <<Chunk:N/binary, Leftover/binary>> = Data,
                    {ok, [Chunk], Leftover};
                true ->
                    case read_body_until_io(N - DataSz, RecvFun) of
                        {ok, More, Leftover} -> {ok, [Data | More], Leftover};
                        {error, _} = E -> E
                    end
            end;
        {error, _} = E ->
            E
    end.

%% Read chunks until the size-0 last-chunk, concatenating decoded data
%% into the result. Caps the accumulated body at MaxCL — a malicious
%% client cannot stream unbounded chunked bytes past the configured
%% limit. Body recursion: each call returns the body of the remaining
%% chunks, the current call prepends its own data on the way out.
-spec read_chunked(
    binary(),
    fun(() -> {ok, binary()} | {error, term()}),
    non_neg_integer(),
    non_neg_integer()
) ->
    {ok, binary(), binary()} | {error, content_length_too_large | term()}.
read_chunked(Buf, RecvFun, MaxCL, Decoded) ->
    case roadrunner_http1:parse_chunk(Buf) of
        {ok, last, _Trailers, Leftover} ->
            %% Bytes after the size-0 last-chunk + trailer block are
            %% pipelined-next-request leftover; thread them up so the
            %% conn can feed them into the next parse.
            {ok, <<>>, Leftover};
        {ok, Data, Rest} ->
            NewDecoded = Decoded + byte_size(Data),
            if
                NewDecoded > MaxCL ->
                    {error, content_length_too_large};
                true ->
                    case read_chunked(Rest, RecvFun, MaxCL, NewDecoded) of
                        {ok, More, Leftover} ->
                            {ok, <<Data/binary, More/binary>>, Leftover};
                        {error, _} = E ->
                            E
                    end
            end;
        {more, _} ->
            case RecvFun() of
                {ok, More} ->
                    read_chunked(<<Buf/binary, More/binary>>, RecvFun, MaxCL, Decoded);
                {error, _} = E ->
                    E
            end;
        {error, _} = E ->
            E
    end.

%% Read and discard whatever the handler left in the manual-mode
%% `body_reader`, returning any post-body leftover bytes that belong
%% to a pipelined next request. Called only on the 4-tuple response
%% path; `roadrunner_conn_loop`'s finishing phase threads `Leftover`
%% forward into the next `reading_request` parse so pipelined
%% clients get their N+1 request seen.
-doc false.
-spec drain_body(roadrunner_req:request()) -> {ok, binary()} | {error, term()}.
drain_body(#{body_reader := BS}) ->
    case consume_body_reader(BS, all) of
        {ok, _Bytes, #{buffered := Leftover}} -> {ok, Leftover};
        {error, _} = E -> E
    end.

-doc """
Consume bytes from a manual-mode `roadrunner_req:body_reader()`. Returns either the
final tail (`{ok, Bytes, NewState}` — the body has been fully drained)
or a partial chunk (`{more, Bytes, NewState}` — more is still pending).
Used by `roadrunner_req:read_body/1,2`; not part of the public API.

`Mode` is `all` (drain to end) or `{length, N}` (read up to `N`
bytes — content-length framing only; chunked falls through to a
full read).
""".
-spec consume_body_reader(
    roadrunner_req:body_reader(), all | next_chunk | {length, non_neg_integer()}
) ->
    {ok, iodata(), roadrunner_req:body_reader()}
    | {more, iodata(), roadrunner_req:body_reader()}
    | {error, term()}.
consume_body_reader(#{framing := none} = BS, _Mode) ->
    %% Per RFC 9112 §6.3: no framing means the body is empty.
    %% Any `buffered` bytes are pipelined-next-request leftovers —
    %% preserve them in the body_reader's `buffered` field so
    %% `roadrunner_conn_loop`'s finishing phase can thread them into
    %% the next `reading_request` parse for full pipelining support.
    {ok, <<>>, BS};
consume_body_reader(
    #{framing := {content_length, N}, bytes_read := Read} = BS, _Mode
) when Read >= N ->
    {ok, <<>>, BS};
consume_body_reader(
    #{
        framing := {content_length, N},
        bytes_read := Read,
        buffered := Buf,
        recv := Recv,
        max := Max
    } = BS,
    Mode
) ->
    Remaining = N - Read,
    Want =
        case Mode of
            all -> Remaining;
            next_chunk -> Remaining;
            {length, L} -> min(Remaining, L)
        end,
    case Want > Max of
        true ->
            {error, content_length_too_large};
        false ->
            case fill_n(Want, Buf, Recv) of
                {ok, Bytes, NewBuf} ->
                    NewRead = Read + iolist_size(Bytes),
                    NewState = BS#{buffered := NewBuf, bytes_read := NewRead},
                    case NewRead >= N of
                        true -> {ok, Bytes, NewState};
                        false -> {more, Bytes, NewState}
                    end;
                {error, _} = E ->
                    E
            end
    end;
consume_body_reader(#{framing := chunked} = BS, all) ->
    %% Drain everything left: any pending decoded bytes plus all
    %% remaining chunks, accumulated in one return. Iodata stays unflattened
    %% so callers that only need `iolist_size/1` or want to forward via
    %% `gen_tcp:send/2` skip a flatten.
    chunked_collect(BS, infinity);
consume_body_reader(#{framing := chunked} = BS, {length, N}) ->
    chunked_collect(BS, N);
consume_body_reader(#{framing := chunked} = BS, next_chunk) ->
    next_chunk(BS).
%% Non-chunked framing (none, content_length) is handled by the
%% earlier clauses above — `next_chunk` is treated as a full drain
%% inside those, since there are no chunk boundaries to honor.

%% Pull decoded chunked-body bytes out of `BS` until either `Want`
%% bytes are collected or the body is fully drained. `Want` is either
%% `infinity` (drain to end — caller asked for `all`) or a positive
%% integer (caller asked for `{length, N}`). Returns
%% `{ok | more, Bytes, BS2}` with Bytes as iodata (propagated through
%% `consume_body_reader` to the public API unflattened).
-spec chunked_collect(roadrunner_req:body_reader(), infinity | non_neg_integer()) ->
    {ok, iodata(), roadrunner_req:body_reader()}
    | {more, iodata(), roadrunner_req:body_reader()}
    | {error, term()}.
chunked_collect(#{pending := Pending} = BS, Want) when
    Want =/= infinity, byte_size(Pending) >= Want
->
    %% Pending alone satisfies the request — no need to look at the
    %% wire. The body may or may not have more bytes; we always tag
    %% `more` here and let the next call detect end-of-body via the
    %% `done` clause below.
    <<Take:Want/binary, RestPending/binary>> = Pending,
    {more, [Take], BS#{pending := RestPending}};
chunked_collect(#{pending := Pending} = BS, Want) when byte_size(Pending) > 0 ->
    %% Take everything pending, then try to fill more from the wire.
    %% Cons `Pending` in front of the recursion's result on the way
    %% out — body recursion replaces the old `[Pending | Acc]` /
    %% `lists:reverse` shape.
    NewWant =
        case Want of
            infinity -> infinity;
            N -> N - byte_size(Pending)
        end,
    case chunked_collect(BS#{pending := <<>>}, NewWant) of
        {Tag, RestIo, BS2} -> {Tag, [Pending | RestIo], BS2};
        {error, _} = E -> E
    end;
chunked_collect(#{done := true} = BS, _Want) ->
    {ok, [], BS};
chunked_collect(#{buffered := Buf, recv := Recv, max := Max, bytes_read := Read} = BS, Want) ->
    case roadrunner_http1:parse_chunk(Buf) of
        {ok, Data, Rest} ->
            NewRead = Read + byte_size(Data),
            case NewRead > Max of
                true ->
                    {error, content_length_too_large};
                false ->
                    BS2 = BS#{buffered := Rest, bytes_read := NewRead, pending := Data},
                    chunked_collect(BS2, Want)
            end;
        {ok, last, _Trailers, Rest} ->
            chunked_collect(BS#{buffered := Rest, done := true}, Want);
        {more, _} ->
            case Recv() of
                {ok, More} ->
                    chunked_collect(BS#{buffered := <<Buf/binary, More/binary>>}, Want);
                {error, _} = E ->
                    E
            end;
        {error, _} = E ->
            E
    end.

%% Pull exactly one decoded chunk out of a chunked body_reader. Pending
%% bytes (left over from a length-bounded read) are returned first; if
%% pending is empty, parse the next wire chunk. End-of-body returns
%% `{ok, <<>>, BS}`.
-spec next_chunk(roadrunner_req:body_reader()) ->
    {ok, binary(), roadrunner_req:body_reader()}
    | {more, binary(), roadrunner_req:body_reader()}
    | {error, term()}.
next_chunk(#{pending := Pending} = BS) when byte_size(Pending) > 0 ->
    {more, Pending, BS#{pending := <<>>}};
next_chunk(#{done := true} = BS) ->
    {ok, <<>>, BS};
next_chunk(#{buffered := Buf, recv := Recv, max := Max, bytes_read := Read} = BS) ->
    case roadrunner_http1:parse_chunk(Buf) of
        {ok, Data, Rest} ->
            NewRead = Read + byte_size(Data),
            case NewRead > Max of
                true ->
                    {error, content_length_too_large};
                false ->
                    {more, Data, BS#{buffered := Rest, bytes_read := NewRead}}
            end;
        {ok, last, _Trailers, Rest} ->
            {ok, <<>>, BS#{buffered := Rest, done := true}};
        {more, _} ->
            case Recv() of
                {ok, More} ->
                    next_chunk(BS#{buffered := <<Buf/binary, More/binary>>});
                {error, _} = E ->
                    E
            end;
        {error, _} = E ->
            E
    end.

-spec fill_n(non_neg_integer(), binary(), fun(() -> {ok, binary()} | {error, term()})) ->
    {ok, iodata(), binary()} | {error, term()}.
fill_n(N, Buf, _Recv) when byte_size(Buf) >= N ->
    <<Bytes:N/binary, Rest/binary>> = Buf,
    {ok, Bytes, Rest};
fill_n(N, Buf, Recv) ->
    %% Body recursion that conses each recv chunk onto the iolist on
    %% the way OUT. The iolist propagates through `consume_body_reader`
    %% to the caller unflattened so callers that only need
    %% `iolist_size/1` (or want to forward via `gen_tcp:send/2`) avoid
    %% an O(total-body) copy.
    Need = N - byte_size(Buf),
    case fill_iolist(Need, Recv) of
        {ok, Iolist, Leftover} ->
            {ok, [Buf | Iolist], Leftover};
        {error, _} = E ->
            E
    end.

%% Always called with `Need >= 1` from `fill_n/3` (the
%% `byte_size(Buf) >= N` clause handles the Need = 0 case before
%% we get here). When `MoreSize == Need` exactly, the
%% `MoreSize >= Need` branch returns directly — we never recurse
%% with Need = 0, so no base clause is needed.
-spec fill_iolist(pos_integer(), fun(() -> {ok, binary()} | {error, term()})) ->
    {ok, iolist(), binary()} | {error, term()}.
fill_iolist(Need, Recv) ->
    case Recv() of
        {ok, More} ->
            MoreSize = byte_size(More),
            if
                MoreSize >= Need ->
                    <<Take:Need/binary, Leftover/binary>> = More,
                    {ok, [Take], Leftover};
                true ->
                    case fill_iolist(Need - MoreSize, Recv) of
                        {ok, Rest, Leftover} -> {ok, [More | Rest], Leftover};
                        {error, _} = E -> E
                    end
            end;
        {error, _} = E ->
            E
    end.

-spec content_length(roadrunner_req:request()) ->
    none | {ok, non_neg_integer()} | {error, bad_content_length}.
content_length(Req) ->
    case roadrunner_req:header(~"content-length", Req) of
        undefined ->
            none;
        Bin ->
            try binary_to_integer(Bin) of
                N when N >= 0 -> {ok, N};
                _ -> {error, bad_content_length}
            catch
                _:_ -> {error, bad_content_length}
            end
    end.

-doc false.
-spec parse_loop(binary(), fun(() -> {ok, binary()} | {error, term()})) ->
    {ok, roadrunner_req:request(), binary()} | {error, term()}.
parse_loop(Buf, RecvFun) ->
    case roadrunner_http1:parse_request(Buf) of
        {ok, Req, Rest} ->
            {ok, Req, Rest};
        {more, _} ->
            case RecvFun() of
                {ok, Data} -> parse_loop(<<Buf/binary, Data/binary>>, RecvFun);
                {error, _} = E -> E
            end;
        {error, _} = E ->
            E
    end.

%% Order matters — `{websocket, _, _}` is a 3-tuple too, so the
%% atom-tagged variants must precede the buffered catch-all.
-doc false.
-spec response_status(roadrunner_handler:response()) -> roadrunner_http:status().
response_status({stream, Status, _, _}) -> Status;
response_status({loop, Status, _, _}) -> Status;
response_status({sendfile, Status, _, _}) -> Status;
response_status({websocket, _, _}) -> 101;
response_status({Status, _, _}) when is_integer(Status) -> Status.

-doc false.
-spec response_kind(roadrunner_handler:response()) ->
    buffered | stream | loop | sendfile | websocket.
response_kind({stream, _, _, _}) -> stream;
response_kind({loop, _, _, _}) -> loop;
response_kind({sendfile, _, _, _}) -> sendfile;
response_kind({websocket, _, _}) -> websocket;
response_kind({_, _, _}) -> buffered.

%% HTTP/1.0 default close. HTTP/1.1 keep-alive unless either side
%% set Connection: close.
-doc false.
-spec keep_alive_decision(roadrunner_req:request(), roadrunner_http:headers()) ->
    keep_alive | close.
%% Common-case fast path: HTTP/1.1, parser-cached request `Connection`
%% empty, response has no `connection` header → `keep_alive` directly.
%% Skips the lowercase + has_token dance entirely. Most production
%% hello/echo responses hit this path.
keep_alive_decision(
    #{
        version := {1, 1},
        cached_decisions := #{connection_lower := <<>>}
    } = Req,
    RespHeaders
) when is_list(RespHeaders) ->
    case lists:keymember(~"connection", 1, RespHeaders) of
        false -> keep_alive;
        true -> keep_alive_decision_full(Req, RespHeaders)
    end;
keep_alive_decision(Req, RespHeaders) ->
    keep_alive_decision_full(Req, RespHeaders).

-spec keep_alive_decision_full(roadrunner_req:request(), roadrunner_http:headers()) ->
    keep_alive | close.
keep_alive_decision_full(Req, RespHeaders) ->
    ReqConn = req_connection_lower(Req),
    RespConn = roadrunner_bin:ascii_lowercase(resp_connection_token(RespHeaders)),
    CloseCp = persistent_term:get(?CLOSE_CP_KEY),
    case roadrunner_req:version(Req) of
        {1, 0} ->
            %% RFC 9112 §9.3 + RFC 9110 §7.6.1: HTTP/1.0 default is close, but
            %% `Connection: keep-alive` from client opts in (so long
            %% as the response doesn't force close). `andalso` short-
            %% circuits on the keep-alive check so the response-side
            %% `has_token` only fires when the client opted in.
            case
                has_token(ReqConn, persistent_term:get(?KEEP_ALIVE_CP_KEY)) andalso
                    not has_token(RespConn, CloseCp)
            of
                true -> keep_alive;
                false -> close
            end;
        {1, 1} ->
            %% `orelse` short-circuits on ReqClose = true so the
            %% response-side `has_token` only fires when the client
            %% didn't already say `close`.
            case has_token(ReqConn, CloseCp) orelse has_token(RespConn, CloseCp) of
                true -> close;
                false -> keep_alive
            end
    end.

%% Returns the request's `Connection` header value, lowercased. Reads from
%% `cached_decisions` when present (parser populates it once per request)
%% and falls back to a per-call lowercase for manually-built request maps.
-spec req_connection_lower(roadrunner_req:request()) -> binary().
req_connection_lower(#{cached_decisions := #{connection_lower := V}}) ->
    V;
req_connection_lower(Req) ->
    case roadrunner_req:header(~"connection", Req) of
        undefined -> ~"";
        V -> roadrunner_bin:ascii_lowercase(V)
    end.

-spec resp_connection_token(roadrunner_http:headers()) -> binary().
resp_connection_token(Headers) ->
    case header_value(~"connection", Headers) of
        undefined -> ~"";
        V -> V
    end.

-spec has_token(binary(), binary:cp()) -> boolean().
has_token(Value, TokenCp) ->
    binary:match(Value, TokenCp) =/= nomatch.

-spec init_patterns() -> ok.
init_patterns() ->
    persistent_term:put(?CLOSE_CP_KEY, binary:compile_pattern(~"close")),
    persistent_term:put(?KEEP_ALIVE_CP_KEY, binary:compile_pattern(~"keep-alive")),
    ok.

-spec header_value(binary(), roadrunner_http:headers()) -> binary() | undefined.
header_value(Name, Headers) ->
    case lists:keyfind(Name, 1, Headers) of
        {_, V} -> V;
        false -> undefined
    end.

-doc false.
-spec send_bad_request(roadrunner_transport:socket()) -> ok | {error, term()}.
send_bad_request(Socket) ->
    Resp = roadrunner_http1:response(
        400,
        [{~"content-length", ~"0"}, {~"connection", ~"close"}],
        ~""
    ),
    roadrunner_transport:send(Socket, Resp).

%% Drain up to `2 * MaxCL` bytes from the socket (counting the
%% already-buffered bytes), discarding them. Used to flush an
%% oversized in-flight body off the wire so the peer can read the
%% 413 we're about to send before we close. Bounded by `2 * MaxCL`
%% (memory) and a 1-second per-recv timeout (wall-clock) so a slow
%% peer can't pin us indefinitely.
-doc false.
-spec drain_oversized_body(binary(), roadrunner_transport:socket(), non_neg_integer()) -> ok.
drain_oversized_body(Buffered, Socket, MaxCL) ->
    Cap = 2 * MaxCL,
    drain_oversized_loop(Socket, byte_size(Buffered), Cap).

-spec drain_oversized_loop(
    roadrunner_transport:socket(), non_neg_integer(), non_neg_integer()
) -> ok.
drain_oversized_loop(_Socket, Read, Cap) when Read >= Cap ->
    ok;
drain_oversized_loop(Socket, Read, Cap) ->
    case roadrunner_transport:recv(Socket, 0, 1000) of
        {ok, Data} ->
            drain_oversized_loop(Socket, Read + byte_size(Data), Cap);
        {error, _} ->
            ok
    end.

-spec send_payload_too_large(roadrunner_transport:socket()) -> ok | {error, term()}.
send_payload_too_large(Socket) ->
    Resp = roadrunner_http1:response(
        413,
        [{~"content-length", ~"0"}, {~"connection", ~"close"}],
        ~""
    ),
    roadrunner_transport:send(Socket, Resp).

-doc false.
-spec send_not_found(roadrunner_transport:socket()) -> ok | {error, term()}.
send_not_found(Socket) ->
    Resp = roadrunner_http1:response(
        404,
        [{~"content-length", ~"0"}, {~"connection", ~"close"}],
        ~""
    ),
    roadrunner_transport:send(Socket, Resp).

-doc false.
-spec send_request_timeout(roadrunner_transport:socket()) -> ok | {error, term()}.
send_request_timeout(Socket) ->
    Resp = roadrunner_http1:response(
        408,
        [{~"content-length", ~"0"}, {~"connection", ~"close"}],
        ~""
    ),
    roadrunner_transport:send(Socket, Resp).

-doc false.
-spec send_internal_error(roadrunner_transport:socket()) -> ok | {error, term()}.
send_internal_error(Socket) ->
    Resp = roadrunner_http1:response(
        500,
        [{~"content-length", ~"0"}, {~"connection", ~"close"}],
        ~""
    ),
    roadrunner_transport:send(Socket, Resp).