Skip to main content

src/roadrunner_conn_loop.erl

-module(roadrunner_conn_loop).
-moduledoc false.

%% Tail-recursive per-connection process for HTTP/1.1.
%%
%% The lifecycle (`awaiting_shoot → reading_request → reading_body →
%% dispatching → finishing → loop or close`) is expressed as direct
%% function calls between phase functions. No `gen_statem` dispatch,
%% no per-request timer arms in the steady state.
%%
%% ## Two recv paths
%%
%% Default is **passive recv** — `gen_tcp:recv(Socket, 0, ChunkTimeout)`
%% in a loop with mailbox drain checks at `?DRAIN_CHECK_INTERVAL_MS`
%% (100 ms) granularity. Bypasses `gen_tcp_socket`'s gen_statem dispatch
%% on every recv (saves ~7 % CPU on the hot path: `gen:do_call/4`,
%% `recv_data_deliver/4`, `gen_statem:loop/3`, `nif_getopt/3`).
%%
%% When the listener sets `hibernate_after => Ms > 0`, the recv path
%% flips to **active-mode** (`{active, once}` + receive). The receive's
%% `after Ms` clause has a window to call `erlang:hibernate/3` between
%% keep-alive iterations so the conn's heap GCs and shrinks — only
%% `receive` supports hibernation; passive recv blocks the process
%% inside a NIF.
%%
%% ## Phase introspection vs hot-path cost
%%
%% The label set on the conn process via `proc_lib:set_label/1` is
%% written **at most twice per conn**: once at `init_loop` time
%% (`{roadrunner_conn, awaiting_shoot, ListenerName}`) and once at the
%% `shoot` handoff (`refine_conn_label/2` rewrites it to include the
%% peer). The phases AFTER `awaiting_shoot` (read_request, read_body,
%% dispatching, finishing) run in microseconds on the happy path — too
%% fast for an operator's `observer` snapshot to catch a specific phase
%% anyway. Per-phase label updates measured ~1.2 % CPU on hello (4
%% writes/req, each touching the process dictionary) and contributed
%% to run-to-run variance. Stuck conns still surface via the conn-entry
%% label and the `reading_request` idle window (where hibernation
%% parks the process).
%%
%% ## Stability features preserved
%%
%% Drain via `pg`-broadcast, slot tracking via atomics, full telemetry
%% pairing (`[roadrunner, request, start | stop | exception]` with
%% shared `StartMono`), HEAD body suppression, `Expect: 100-continue`,
%% anti-Slowloris rate-check on the request-read phase, all five
%% response shapes (buffered, `{stream, ...}`, `{loop, ...}`,
%% `{sendfile, ...}`, `{websocket, ...}`).

-export([start/2]).

%% Internal entries — invoked via `proc_lib:start/3` and via
%% `erlang:hibernate/3`. Must stay exported so the runtime can
%% re-enter them after wake-from-hibernate.
-export([init_loop/3, recv_request_bytes_hib/2]).

%% Loop-state record carried through every phase. Allocated once on
%% the transition out of `awaiting_shoot` and pattern-matched (not
%% reconstructed) thereafter, so the per-request hot path stays
%% allocation-light.
-record(loop_state, {
    socket :: roadrunner_transport:socket(),
    proto_opts :: roadrunner_conn:proto_opts(),
    listener_name :: atom(),
    %% Captured at `shoot` from `roadrunner_telemetry:listener_accept/1`.
    %% Paired with `listener_conn_close` in `exit_clean/2`.
    start_mono :: integer(),
    peer :: {inet:ip_address(), inet:port_number()} | undefined,
    scheme = http :: http | https,
    %% `first` for the first request on a fresh conn; `keep_alive` for
    %% subsequent loop-back iterations. Drives the timeout selection
    %% (request_timeout vs keep_alive_timeout, see `phase_timeout/1`)
    %% and the silent-vs-408 behavior in `timeout_response/1`.
    phase = first :: first | keep_alive,
    %% Cumulative successfully-served requests on this conn. Bumped
    %% in `run_pipeline/4` after each successful dispatch; checked in
    %% `buffered_finish/3` against `max_keep_alive_requests`.
    requests_served = 0 :: non_neg_integer(),
    %% Bytes received but not yet parsed. Empty on first iteration;
    %% populated mid-recv when `parse_request/1` returns `{more, _}`,
    %% AND on the keep-alive loop-back when a prior request's body
    %% drain leaves post-body bytes (RFC 9112 §9.3.2 pipelining).
    buffered = <<>> :: binary(),
    %% Pre-generated CSPRNG bytes for `request_id`. Filled lazily
    %% (on first request) by `roadrunner_conn:generate_request_id/1`
    %% — 256 bytes batched, 8 bytes sliced per request. Empty
    %% buffer triggers a refill. Threaded via the loop_state to
    %% avoid process-dictionary writes.
    req_id_buffer = <<>> :: binary(),
    %% Conn-stable values pulled from `proto_opts` once at `shoot`
    %% time so the per-request hot path doesn't re-`maps:get` them.
    %% Saves ~5 hash lookups per request.
    requests_counter :: atomics:atomics_ref() | undefined,
    dispatch :: roadrunner_conn:dispatch() | undefined,
    middlewares = [] :: roadrunner_middleware:middleware_list(),
    max_content_length = 0 :: non_neg_integer(),
    max_keep_alive_requests = 1 :: pos_integer(),
    %% Anti-slowloris on the request-read phase. `min_rate` is cached
    %% from `proto_opts.min_bytes_per_second` at `shoot`. When > 0,
    %% `recv_passive/2` and `recv_with_hibernate/3` track bytes
    %% received since `recv_phase_start` and close the conn if the
    %% running average drops below `min_rate` after a 1 s grace
    %% (matches `roadrunner_conn:rate_ok/3`).
    %% Reset at every `read_request_phase` entry (per-request window —
    %% each request gets its own grace + budget).
    min_rate = 0 :: non_neg_integer(),
    recv_phase_start = 0 :: integer(),
    recv_phase_bytes = 0 :: non_neg_integer()
}).

-spec start(roadrunner_transport:socket(), roadrunner_conn:proto_opts()) ->
    {ok, pid()}.
start(Socket, ProtoOpts) when is_map(ProtoOpts) ->
    %% `proc_lib:start/3` (not `start_link`) — the conn is deliberately
    %% NOT linked to the acceptor, so a crashing conn cannot take the
    %% acceptor pool down. The call returns once `init_loop/3` has
    %% acked; the acceptor then hands the socket over
    %% (`controlling_process`) and sends `shoot`.
    InitArgs = [self(), Socket, ProtoOpts],
    proc_lib:start(?MODULE, init_loop, InitArgs).

-doc false.
-spec init_loop(pid(), roadrunner_transport:socket(), roadrunner_conn:proto_opts()) ->
    no_return().
init_loop(Parent, Socket, ProtoOpts) ->
    %% Both options are optional: no listener name labels the conn
    %% with `undefined`, and `graceful_drain` defaults to `true`.
    Listener = maps:get(listener_name, ProtoOpts, undefined),
    DrainOpt = maps:get(graceful_drain, ProtoOpts, true),
    %% Label first, join the drain group second, and only then ack the
    %% parent — `start/2` does not return until all of that succeeded.
    proc_lib:set_label({roadrunner_conn, awaiting_shoot, Listener}),
    ok = roadrunner_conn:join_drain_group(Listener, DrainOpt),
    proc_lib:init_ack(Parent, {ok, self()}),
    awaiting_shoot(Socket, ProtoOpts, Listener).

-spec awaiting_shoot(roadrunner_transport:socket(), roadrunner_conn:proto_opts(), atom()) ->
    no_return().
awaiting_shoot(Socket, ProtoOpts, ListenerName) ->
    receive
        shoot ->
            %% The acceptor has handed the socket over, so the peer can
            %% finally be resolved and folded into the proc_lib label.
            PeerAddr = roadrunner_conn:peer(Socket),
            ok = roadrunner_conn:refine_conn_label(ProtoOpts, PeerAddr),
            ConnScheme = roadrunner_conn:scheme(Socket),
            AcceptedAt = roadrunner_telemetry:listener_accept(#{
                listener_name => ListenerName, peer => PeerAddr
            }),
            %% HTTP/2 is selected either by ALPN (`h2` on TLS) or by a
            %% listener configured with `protocols => [http2]` only
            %% (prior-knowledge h2c on plain TCP).
            UseHttp2 = http2_negotiated(Socket) orelse h2c_only(ProtoOpts),
            case UseHttp2 of
                false ->
                    %% HTTP/1.1: snapshot the conn-stable options into
                    %% the loop state once, then start reading.
                    Initial = #loop_state{
                        socket = Socket,
                        proto_opts = ProtoOpts,
                        listener_name = ListenerName,
                        start_mono = AcceptedAt,
                        peer = PeerAddr,
                        scheme = ConnScheme,
                        requests_counter = maps:get(requests_counter, ProtoOpts),
                        dispatch = maps:get(dispatch, ProtoOpts),
                        middlewares = maps:get(middlewares, ProtoOpts),
                        max_content_length = maps:get(max_content_length, ProtoOpts),
                        max_keep_alive_requests = maps:get(max_keep_alive_requests, ProtoOpts),
                        min_rate = maps:get(min_bytes_per_second, ProtoOpts)
                    },
                    read_request_phase(Initial);
                true ->
                    %% The HTTP/2 module owns the conn from here on,
                    %% including slot release and `listener_conn_close`.
                    roadrunner_conn_loop_http2:enter(
                        Socket, ProtoOpts, ListenerName, PeerAddr, AcceptedAt
                    )
            end;
        {roadrunner_drain, _Deadline} ->
            %% Drained before the handoff: `listener_accept` never fired,
            %% so there is nothing to pair a close event with — release
            %% the slot and close the socket only.
            exit_clean(Socket, ProtoOpts, undefined, undefined, ListenerName, 0, normal);
        _Unexpected ->
            %% Anything else is dropped silently (same tolerance a
            %% gen_statem gives unmatched info events) and we keep
            %% waiting for `shoot`.
            awaiting_shoot(Socket, ProtoOpts, ListenerName)
    end.

%% True iff the TLS handshake negotiated `h2`. Plain TCP and TLS
%% sessions where the client picked `http/1.1` (or sent no ALPN)
%% fall through to HTTP/1.1. h2 ALPN is advertised when the listener
%% lists `http2` in its `protocols` opt — that derivation runs in
%% `roadrunner_transport:build_tls_opts/2` (or the user can override
%% `alpn_preferred_protocols` explicitly inside `tls`).
-spec http2_negotiated(roadrunner_transport:socket()) -> boolean().
http2_negotiated(Socket) ->
    %% Only an exact `{ok, <<"h2">>}` counts; every other ALPN result
    %% (different protocol, error, no ALPN) means HTTP/1.1.
    roadrunner_transport:negotiated_alpn(Socket) =:= {ok, ~"h2"}.

%% True iff this listener accepts HTTP/2 only (`protocols => [http2]`).
%% On plain TCP this forces h2c prior-knowledge dispatch (RFC 7540
%% §3.4). On TLS the ALPN handshake already drove the dispatch via
%% `http2_negotiated/1`; this short-circuit is a no-op there.
-spec h2c_only(roadrunner_conn:proto_opts()) -> boolean().
h2c_only(ProtoOpts) ->
    %% `true` only when `protocols` is exactly the one-element list
    %% `[http2]`; a missing key or any other list yields `false`.
    case ProtoOpts of
        #{protocols := [http2]} -> true;
        _ -> false
    end.

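%% --- read_request phase ---
%%
%% Request-line/header read. Each entry into the phase:
%%
%%   1. Computes ONE absolute deadline from `phase_timeout/1` and
%%      resets the slowloris window (`recv_phase_start` /
%%      `recv_phase_bytes`).
%%   2. Parses any pipelined leftover already in `buffered` before
%%      touching the socket.
%%   3. Otherwise reads through `recv_request_bytes/2`, which picks
%%      passive recv (default) or `{active, once}` + `receive` when
%%      `hibernate_after` is a positive integer. Both paths exit
%%      cleanly on peer close / socket error and on
%%      `{roadrunner_drain, _}`, and drop stray messages.
%%
%% The deadline is enforced by the recv path itself: passive recv
%% compares against it on every `{error, timeout}` tick; the active
%% path arms a `send_after` timer for it and uses the receive's
%% `after` clause only as the hibernate trigger.
%%
%% Re-entered for every keep-alive iteration; pipelined leftover from
%% the previous request is already in `buffered` and parsed first
%% before any new recv.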
-spec read_request_phase(#loop_state{}) -> no_return().
read_request_phase(#loop_state{buffered = Pending} = S0) ->
    PhaseTimeout = phase_timeout(S0),
    %% The deadline is absolute and fixed for the whole phase: the
    %% recv paths recompute `Deadline - now` on every iteration, so a
    %% client dripping bytes cannot push the timeout further out.
    StartedAt = erlang:monotonic_time(millisecond),
    Deadline = StartedAt + PhaseTimeout,
    %% Every request (pipelined ones included) starts with a fresh
    %% slowloris window: zero bytes counted, clock restarted.
    S = S0#loop_state{recv_phase_start = StartedAt, recv_phase_bytes = 0},
    case Pending =:= <<>> of
        true ->
            %% Nothing buffered — go straight to the socket.
            recv_request_bytes(S, Deadline);
        false ->
            %% Leftover from the previous keep-alive iteration: try to
            %% parse it first; an incomplete parse falls back to recv.
            handle_request_bytes(S, Deadline)
    end.

%% First requests use `request_timeout`; keep-alive loop-backs use
%% `keep_alive_timeout` (typically shorter — connections that have
%% nothing to do drop faster).
-spec phase_timeout(#loop_state{}) -> non_neg_integer().
phase_timeout(#loop_state{phase = Phase, proto_opts = ProtoOpts}) ->
    %% Pick the option key by phase, then do a single strict lookup
    %% (a missing key crashes with `badkey`).
    OptKey =
        case Phase of
            first -> request_timeout;
            keep_alive -> keep_alive_timeout
        end,
    maps:get(OptKey, ProtoOpts).

%% Drain-detection cap when in passive recv. Each `gen_tcp:recv` call
%% is bounded to at most this many ms so the conn re-checks its
%% mailbox for `{roadrunner_drain, _}` between blocks. Trade-off:
%% shorter = lower drain latency, more recv NIF calls; longer =
%% the opposite. 100 ms matches typical ops-tooling expectations
%% for graceful-drain detection.
-define(DRAIN_CHECK_INTERVAL_MS, 100).

-spec recv_request_bytes(#loop_state{}, integer()) -> no_return().
%% `hibernate_after` set to a positive integer: arm `{active, once}`
%% and wait in a `receive` that is able to hibernate.
recv_request_bytes(#loop_state{proto_opts = #{hibernate_after := IdleMs}} = S, Deadline) when
    is_integer(IdleMs), IdleMs > 0
->
    arm_active_once(S),
    recv_with_hibernate(S, Deadline, IdleMs);
%% Everything else (option absent, zero, or not a positive integer):
%% passive recv.
recv_request_bytes(#loop_state{proto_opts = #{}} = S, Deadline) ->
    recv_passive(S, Deadline).

%% Passive-mode recv path (default — `hibernate_after` unset or 0).
%% Bypasses `gen_tcp_socket`'s gen_statem dispatch entirely — direct
%% `gen_tcp:recv` call into the kernel via the prim_socket NIF.
%%
%% Drain detection: zero-timeout receive before each blocking recv
%% drains pending `{roadrunner_drain, _}` and stray messages from
%% the mailbox. Each recv is capped at `?DRAIN_CHECK_INTERVAL_MS` so
%% drain delivered while we're blocked surfaces within ~100 ms.
%%
%% Request-timeout: tracked via the absolute `Deadline`. `{error,
%% timeout}` from recv with `Remaining =< chunk_timeout` means the
%% request_timeout actually fired; otherwise it's a drain-check tick
%% and we loop.
-spec recv_passive(#loop_state{}, integer()) -> no_return().
recv_passive(#loop_state{socket = Socket} = S, Deadline) ->
    case drain_mailbox_check() of
        ok ->
            %% Block for at most one drain-check interval, and never
            %% past the phase deadline.
            T0 = erlang:monotonic_time(millisecond),
            Left = max(0, Deadline - T0),
            Slice = min(Left, ?DRAIN_CHECK_INTERVAL_MS),
            case roadrunner_transport:recv(Socket, 0, Slice) of
                {ok, Chunk} ->
                    %% Slowloris guard: running byte total for this
                    %% phase vs. the time elapsed up to this recv call.
                    Total = S#loop_state.recv_phase_bytes + byte_size(Chunk),
                    Elapsed = T0 - S#loop_state.recv_phase_start,
                    case roadrunner_conn:rate_ok(Elapsed, Total, S#loop_state.min_rate) of
                        true ->
                            Acc = S#loop_state.buffered,
                            handle_request_bytes(
                                S#loop_state{
                                    buffered = <<Acc/binary, Chunk/binary>>,
                                    recv_phase_bytes = Total
                                },
                                Deadline
                            );
                        false ->
                            %% Too slow — close without sending a 408.
                            exit_normal(S)
                    end;
                {error, timeout} when Left > Slice ->
                    %% Only the drain-check slice ran out; the deadline
                    %% is still ahead. Re-check the mailbox and recv again.
                    recv_passive(S, Deadline);
                {error, timeout} ->
                    %% The slice covered all the time that was left, so
                    %% the request / keep-alive deadline has elapsed.
                    timeout_response(S),
                    exit_normal(S);
                {error, _Other} ->
                    %% Any other recv error (peer close, socket error):
                    %% exit silently.
                    exit_normal(S)
            end;
        drain ->
            exit_normal(S)
    end.

%% Zero-timeout drain of the mailbox. Returns `drain` if a drain
%% message was queued, otherwise drops every other message and
%% returns `ok`. Stray messages don't queue forever — drained on
%% every recv iteration.
-spec drain_mailbox_check() -> drain | ok.
drain_mailbox_check() ->
    %% Non-blocking: take the oldest queued message (if any) and
    %% classify it. A drain request stops the scan immediately; any
    %% other message is discarded and the scan continues until the
    %% mailbox is empty.
    receive
        Msg ->
            case Msg of
                {roadrunner_drain, _Deadline} -> drain;
                _Other -> drain_mailbox_check()
            end
    after 0 ->
        ok
    end.

%% Recv path with hibernation. Arms ONE `send_after` for the
%% request/keep-alive deadline and uses the receive's `after` clause
%% exclusively for the hibernate trigger. On wake (any inbound
%% message) the function re-enters via `erlang:hibernate/3`'s
%% `(?MODULE, recv_request_bytes_hib, [S, Deadline])` continuation.
-spec recv_with_hibernate(#loop_state{}, integer(), pos_integer()) -> no_return().
recv_with_hibernate(
    #loop_state{
        socket = Socket,
        buffered = Buf,
        min_rate = MinRate,
        recv_phase_start = PhaseStart,
        recv_phase_bytes = PhaseBytes
    } = S,
    Deadline,
    HibernateAfter
) ->
    {DataTag, ClosedTag, ErrorTag} = roadrunner_transport:messages(Socket),
    %% One timer per call, aimed at the phase's absolute deadline
    %% (fires immediately when the deadline is already in the past).
    Remaining = max(0, Deadline - erlang:monotonic_time(millisecond)),
    DeadlineRef = erlang:send_after(Remaining, self(), {?MODULE, deadline_fired}),
    receive
        {DataTag, _Sock, Bytes} ->
            %% Cancel AND flush: if the timer fired in the same instant
            %% the data arrived, its `deadline_fired` message is already
            %% queued. Left there, it would be picked up by the NEXT
            %% keep-alive iteration and close a healthy connection. If
            %% this request still needs bytes, the re-armed timer fires
            %% again right away because `Deadline` is absolute.
            ok = cancel_deadline_timer(DeadlineRef),
            NewBytes = PhaseBytes + byte_size(Bytes),
            Now = erlang:monotonic_time(millisecond),
            case roadrunner_conn:rate_ok(Now - PhaseStart, NewBytes, MinRate) of
                true ->
                    handle_request_bytes(
                        S#loop_state{
                            buffered = <<Buf/binary, Bytes/binary>>,
                            recv_phase_bytes = NewBytes
                        },
                        Deadline
                    );
                false ->
                    %% Below the minimum rate — silent close.
                    exit_normal(S)
            end;
        {ClosedTag, _Sock} ->
            _ = erlang:cancel_timer(DeadlineRef),
            exit_normal(S);
        {ErrorTag, _Sock, _Reason} ->
            _ = erlang:cancel_timer(DeadlineRef),
            exit_normal(S);
        {roadrunner_drain, _Deadline} ->
            _ = erlang:cancel_timer(DeadlineRef),
            exit_normal(S);
        {?MODULE, deadline_fired} ->
            timeout_response(S),
            exit_normal(S);
        _Stray ->
            %% Drop the stray message, then start over with a fresh
            %% timer. Flushing keeps at most one deadline message in
            %% flight; the re-armed timer re-delivers it if the
            %% deadline has in fact passed.
            ok = cancel_deadline_timer(DeadlineRef),
            arm_active_once(S),
            recv_with_hibernate(S, Deadline, HibernateAfter)
    after HibernateAfter ->
        %% Idle window elapsed — cancel the deadline timer (it is
        %% re-armed on wake via `recv_request_bytes/2`) and hibernate
        %% so the heap can be GC'd and shrunk. Deliberately NO flush
        %% here: a `deadline_fired` that is already queued must stay
        %% in the mailbox so it wakes the process and is handled.
        %% NOTE(review): with the timer cancelled, nothing visible in
        %% this function wakes a hibernated process when `Deadline`
        %% passes — confirm that not enforcing request/keep-alive
        %% timeouts while hibernating is intended.
        _ = erlang:cancel_timer(DeadlineRef),
        erlang:hibernate(?MODULE, recv_request_bytes_hib, [S, Deadline])
    end.

%% Cancel a deadline timer. When cancellation reports the timer as
%% already expired, remove its `deadline_fired` message(s) from the
%% mailbox so they cannot leak into a later read phase.
-spec cancel_deadline_timer(reference()) -> ok.
cancel_deadline_timer(TimerRef) ->
    case erlang:cancel_timer(TimerRef) of
        false -> flush_deadline_fired();
        _MsLeft -> ok
    end.

%% Non-blocking removal of every queued `deadline_fired` message.
-spec flush_deadline_fired() -> ok.
flush_deadline_fired() ->
    receive
        {?MODULE, deadline_fired} -> flush_deadline_fired()
    after 0 ->
        ok
    end.

%% Hibernate continuation. Re-enters `recv_request_bytes/2` so
%% the next iteration picks up wherever the recv shape demands
%% (with or without hibernation, depending on `hibernate_after`).
-doc false.
-spec recv_request_bytes_hib(#loop_state{}, integer()) -> no_return().
recv_request_bytes_hib(State, Deadline) ->
    %% Entry point named in the `erlang:hibernate/3` call; it simply
    %% resumes the normal recv dispatch with the carried-over state
    %% and the same absolute deadline.
    recv_request_bytes(State, Deadline).

%% Slowloris on first request → 408. Idle keep-alive timeout →
%% silent close (peer wasn't waiting on bytes).
-spec timeout_response(#loop_state{}) -> ok.
timeout_response(#loop_state{phase = Phase, socket = Socket}) ->
    case Phase of
        first ->
            %% First request timed out: answer with a 408. The send
            %% result is deliberately ignored — the conn closes anyway.
            _ = roadrunner_conn:send_request_timeout(Socket),
            ok;
        keep_alive ->
            %% Idle keep-alive conn: nothing is written.
            ok
    end.

%% Try parsing the accumulated buffer. On `{more, _}` we re-arm and
%% wait for more bytes; on `{ok, Req, Rest}` we've parsed a full
%% request and dispatch into the body/handler phase; on `{error, _}`
%% we send 400 and exit.
-spec handle_request_bytes(#loop_state{}, integer()) -> no_return().
handle_request_bytes(#loop_state{buffered = Pending} = S, Deadline) ->
    #loop_state{
        socket = Socket,
        listener_name = Listener,
        peer = PeerAddr,
        scheme = ConnScheme,
        requests_counter = Counter,
        req_id_buffer = IdPool
    } = S,
    case roadrunner_http1:parse_request(Pending) of
        {more, _} ->
            %% Request line / headers incomplete. Let
            %% `recv_request_bytes/2` choose the recv flavour and arm
            %% whatever it needs — arming `{active, once}` here would
            %% leak a data message into the mailbox in passive mode.
            recv_request_bytes(S, Deadline);
        {ok, Parsed, Rest} ->
            %% Full head parsed: count it, tag it with a request id and
            %% the conn-level facts, then move on to the body.
            _ = atomics:add(Counter, 1, 1),
            {RequestId, IdPool1} = roadrunner_conn:generate_request_id(IdPool),
            Req = Parsed#{
                peer => PeerAddr,
                scheme => ConnScheme,
                request_id => RequestId,
                listener_name => Listener
            },
            ok = roadrunner_conn:set_request_logger_metadata(Req),
            ok = roadrunner_conn:maybe_send_continue(Socket, Req, Rest),
            Next = S#loop_state{buffered = Rest, req_id_buffer = IdPool1},
            read_body_phase(Next, Req, Deadline);
        {error, Reason} ->
            %% Malformed request: log, emit the rejection event, answer
            %% 400 and close.
            logger:debug(#{
                msg => "roadrunner rejecting malformed request",
                peer => PeerAddr,
                listener_name => Listener,
                reason => Reason
            }),
            ok = roadrunner_telemetry:request_rejected(#{
                listener_name => Listener,
                peer => PeerAddr,
                reason => Reason
            }),
            _ = roadrunner_conn:send_bad_request(Socket),
            exit_normal(S)
    end.

%% --- read_body phase ---
%%
%% Auto-buffering reads the full body synchronously via the recv
%% closure (passive `gen_tcp:recv` with the request_timeout deadline).
%% Manual buffering builds a body_reader the handler will pull from
%% via `roadrunner_req:read_body/1,2`.
-spec read_body_phase(#loop_state{}, roadrunner_req:request(), integer()) -> no_return().
read_body_phase(#loop_state{socket = Socket, proto_opts = ProtoOpts} = S, Req, Deadline) ->
    RateFloor = maps:get(min_bytes_per_second, ProtoOpts),
    Recv = roadrunner_conn:make_recv(Socket, Deadline, RateFloor),
    #loop_state{buffered = Pending, max_content_length = Limit} = S,
    %% Shared tail for every "reject with 400" outcome below.
    Reject400 = fun(Why) ->
        ok = rejection(S, Why),
        _ = roadrunner_conn:send_bad_request(Socket),
        exit_normal(S)
    end,
    case maps:get(body_buffering, ProtoOpts) of
        manual ->
            %% The handler pulls the body itself: validate the framing
            %% and hand it a reader instead of the bytes.
            case roadrunner_conn:body_framing(Req) of
                {error, FramingReason} ->
                    Reject400(FramingReason);
                Framing ->
                    Reader = roadrunner_conn:make_body_reader(
                        Framing, Pending, Recv, Limit
                    ),
                    dispatch_phase(S, Req#{body_reader => Reader})
            end;
        auto ->
            %% Read the whole body up front.
            case roadrunner_conn:read_body(Req, Pending, Recv, Limit) of
                {ok, Body, Leftover} ->
                    %% Bytes past the body belong to the next
                    %% (pipelined) request — keep them in `buffered`.
                    dispatch_phase(S#loop_state{buffered = Leftover}, Req#{body => Body});
                {error, content_length_too_large} ->
                    _ = roadrunner_conn:drain_oversized_body(Pending, Socket, Limit),
                    ok = rejection(S, content_length_too_large),
                    _ = roadrunner_conn:send_payload_too_large(Socket),
                    exit_normal(S);
                {error, request_timeout} ->
                    _ = roadrunner_conn:send_request_timeout(Socket),
                    exit_normal(S);
                {error, slow_client} ->
                    %% Rate floor violated — close without a response.
                    exit_normal(S);
                {error, BodyReason} ->
                    Reject400(BodyReason)
            end
    end.

%% --- dispatch phase ---
%%
%% Resolve handler (single-handler or routed), build the middleware
%% pipeline, run it bracketed by request_start / request_stop |
%% request_exception telemetry. The 5 response shapes (buffered,
%% stream, loop, sendfile, websocket) dispatch to their respective
%% writers via `dispatch_response/4`.
-spec dispatch_phase(#loop_state{}, roadrunner_req:request()) -> no_return().
dispatch_phase(#loop_state{dispatch = Dispatch} = S, Req) ->
    case roadrunner_conn:resolve_handler(Dispatch, Req) of
        not_found ->
            %% No handler for this request: send the not-found
            %% response and close the conn.
            _ = roadrunner_conn:send_not_found(S#loop_state.socket),
            exit_normal(S);
        {ok, Handler, Bindings, Pipeline, _HandlerState} ->
            %% Expose the bindings on the request and run the pipeline.
            Routed = Req#{bindings => Bindings},
            run_pipeline(S, Handler, Routed, Pipeline)
    end.

-spec run_pipeline(
    #loop_state{},
    module(),
    roadrunner_req:request(),
    roadrunner_middleware:next()
) -> no_return().
run_pipeline(
    #loop_state{socket = Socket, requests_served = Served} = S, Handler, Req, Pipeline
) ->
    %% `request_start` is paired below with exactly one of
    %% `request_stop` (pipeline returned) or `request_exception`
    %% (pipeline raised).
    Meta = telemetry_metadata(Req),
    StartedAt = roadrunner_telemetry:request_start(Meta),
    try Pipeline(Req) of
        {Response, ReqOut} when is_map(ReqOut) ->
            %% Only the pipeline call itself is protected by the `try`;
            %% writing the response and the bookkeeping below are not.
            _ = dispatch_response(Socket, Handler, ReqOut, Response),
            StopInfo = #{
                status => roadrunner_conn:response_status(Response),
                response_kind => roadrunner_conn:response_kind(Response)
            },
            ok = roadrunner_telemetry:request_stop(StartedAt, Meta, StopInfo),
            Counted = S#loop_state{requests_served = Served + 1},
            finishing_phase(Counted, ReqOut, Response)
    catch
        Class:Reason:Stack ->
            %% The pipeline raised: report it, log it with the
            %% stacktrace, send the internal-error response and close.
            ok = roadrunner_telemetry:request_exception(
                StartedAt, Meta, Class, Reason
            ),
            logger:error(#{
                msg => "roadrunner handler crashed",
                handler => Handler,
                class => Class,
                reason => Reason,
                stacktrace => Stack
            }),
            _ = roadrunner_conn:send_internal_error(Socket),
            exit_normal(S)
    end.

%% Dispatches the 5 response shapes that match the
%% `roadrunner_handler:result/0` type. Stream / loop /
%% sendfile / websocket force connection close (the underlying
%% writers manage their own keep-alive semantics — generally none).
-spec dispatch_response(
    roadrunner_transport:socket(),
    module(),
    roadrunner_req:request(),
    roadrunner_handler:response()
) -> ok.
%% WebSocket upgrade: the session module takes over the socket.
dispatch_response(Socket, _Handler, Req, {websocket, WsMod, WsState}) when is_atom(WsMod) ->
    _ = roadrunner_ws_session:run(Socket, Req, WsMod, WsState),
    ok;
%% Streaming response driven by a 1-arity producer fun.
dispatch_response(Socket, _Handler, _Req, {stream, Status, Headers0, Producer}) when
    is_function(Producer, 1)
->
    _ = roadrunner_stream_response:run(Socket, Status, with_date(Headers0), Producer),
    ok;
%% Loop response: the handler module keeps being called back with its
%% loop state.
dispatch_response(Socket, Handler, _Req, {loop, Status, Headers0, LoopState}) when
    is_integer(Status)
->
    _ = roadrunner_loop_response:run(
        Socket, Status, with_date(Headers0), Handler, LoopState
    ),
    ok;
%% Sendfile: the head goes out first with an empty body; the file
%% range follows unless the request method is HEAD.
dispatch_response(
    Socket, _Handler, Req, {sendfile, Status, Headers0, {Filename, Offset, Length}}
) when
    is_integer(Status)
->
    Head = roadrunner_http1:response(Status, with_date(Headers0), ~""),
    HeadSent = roadrunner_transport:send(Socket, Head),
    _ = roadrunner_telemetry:response_send(HeadSent, sendfile_response_head),
    IsHead = roadrunner_req:method(Req) =:= ~"HEAD",
    _ =
        case IsHead of
            true ->
                ok;
            false ->
                roadrunner_telemetry:response_send(
                    roadrunner_transport:sendfile(Socket, Filename, Offset, Length),
                    sendfile_body
                )
        end,
    ok;
%% Buffered (3-tuple) response. A HEAD request gets the same status
%% and headers but an empty body (RFC 9110 §9.3.2).
dispatch_response(Socket, _Handler, Req, {Status, Headers0, Body0}) when is_integer(Status) ->
    Body =
        case Req of
            #{method := ~"HEAD"} -> ~"";
            _ -> Body0
        end,
    Payload = roadrunner_http1:response(Status, with_date(Headers0), Body),
    _ = roadrunner_telemetry:response_send(
        roadrunner_transport:send(Socket, Payload), buffered_response
    ),
    ok.

%% RFC 9110 §6.6.1: an origin server MUST emit `Date` on every
%% response. We always have a clock (BEAM `erlang:system_time/0`),
%% so inject Date unless the handler already set it. dispatch_response
%% only fires for handler-emitted responses (status ≥ 200); 1xx
%% interim responses are emitted by the framework directly without
%% routing through this path, so we don't need a 1xx skip here.
-spec with_date(roadrunner_http:headers()) -> roadrunner_http:headers().
with_date(Headers) ->
    %% Look for an existing `date` entry (exact key as written here);
    %% prepend a freshly generated one only when none is present.
    case lists:keyfind(~"date", 1, Headers) of
        false -> [{~"date", roadrunner_http:http_date_now()} | Headers];
        _Existing -> Headers
    end.

%% --- finishing phase ---
%%
%% Drain any unread manual-mode body bytes, then decide keep-alive
%% vs close. Stream / loop / sendfile / websocket all force close
%% (their writers manage their own lifecycle); only buffered
%% (3-tuple) responses are eligible for keep-alive.
%%
%% On keep-alive, recurse into `read_request_phase` with phase
%% flipped to `keep_alive`, the request-specific scratch fields
%% reset to defaults, and any pipelined leftover bytes carried in
%% `buffered` so the next iteration can parse them without waiting
%% on more inbound packets.
%% Pattern-matched on the response shape directly — saves the
%% cross-module call to `roadrunner_conn:response_kind/1` and the
%% subsequent `case` dispatch. Buffered (3-tuple) is the only shape
%% eligible for keep-alive; stream/loop/sendfile/websocket writers
%% own the wire from here and force close.
-spec finishing_phase(#loop_state{}, roadrunner_req:request(), roadrunner_handler:response()) ->
    no_return().
finishing_phase(S, Req, Response) ->
    case Response of
        {Status, Headers, _Body} when is_integer(Status) ->
            %% Buffered 3-tuple — the only shape that may keep the
            %% connection alive.
            buffered_finish(S, Req, Headers);
        _NonBuffered ->
            %% Stream / loop / sendfile / websocket: always close.
            %% Only a manual-mode request can still have unread body
            %% bytes to drain; the drain result is ignored.
            _ = drain_body_if_manual(Req),
            exit_normal(S)
    end.

-spec buffered_finish(
    #loop_state{},
    roadrunner_req:request(),
    roadrunner_http:headers()
) -> no_return().
buffered_finish(S, Req, Headers) ->
    case drain_body_if_manual(Req) of
        {error, _} ->
            %% The unread body could not be drained, so the stream
            %% position is unknown — the conn cannot be reused.
            exit_normal(S);
        {ok, ManualLeftover} ->
            Decision = roadrunner_conn:keep_alive_decision(Req, Headers),
            #loop_state{requests_served = Served, max_keep_alive_requests = Cap} = S,
            case Decision of
                keep_alive when Served < Cap ->
                    %% Loop back for the next request, carrying any
                    %% pipelined bytes that followed this one.
                    Carry = pipelined_leftover(Req, S, ManualLeftover),
                    read_request_phase(S#loop_state{
                        phase = keep_alive,
                        buffered = Carry
                    });
                keep_alive ->
                    %% Per-conn request cap reached.
                    exit_normal(S);
                close ->
                    exit_normal(S)
            end
    end.

%% Select the bytes to carry into the next keep-alive iteration.
%%
%% Manual-mode (request map has a `body_reader` key): the body_reader
%% owns its post-body leftover, returned by `drain_body/1` and passed
%% in here as the third argument.
%% Auto-mode: the leftover is stashed in the loop state's `buffered`
%% field — set by `read_body_phase` on `read_body/4` return.
-spec pipelined_leftover(roadrunner_req:request(), #loop_state{}, binary()) -> binary().
pipelined_leftover(#{body_reader := _}, _State, FromReader) ->
    FromReader;
pipelined_leftover(_AutoReq, #loop_state{buffered = Stashed}, _Unused) ->
    Stashed.

%% Drain the request body only when the request is in manual mode.
%%
%% Manual-mode requests carry a `body_reader` key and the handler may
%% have left bytes unread, so they go through the real
%% `roadrunner_conn:drain_body/1`, which consumes the body_reader.
%%
%% Auto-mode requests have no `body_reader` key — `read_body_phase`
%% has already read the whole body, and `drain_body/1`'s second clause
%% would just return `{ok, <<>>}` — so the call is skipped entirely
%% and that result is produced locally.
-spec drain_body_if_manual(roadrunner_req:request()) ->
    {ok, binary()} | {error, term()}.
drain_body_if_manual(#{body_reader := _Reader} = ManualReq) ->
    roadrunner_conn:drain_body(ManualReq);
drain_body_if_manual(_AutoReq) ->
    {ok, <<>>}.

%% Build the telemetry metadata map for a request.
%%
%% `request_id`, `peer`, `method`, `target` and `scheme` are required
%% keys of the request map (`maps:get/2` raises `{badkey, Key}` when
%% one is missing). The request's `target` is reported under the
%% `path` key. `listener_name` is optional and defaults to
%% `undefined`.
-spec telemetry_metadata(roadrunner_req:request()) -> roadrunner_telemetry:metadata().
telemetry_metadata(Req) ->
    RequestId = maps:get(request_id, Req),
    Peer = maps:get(peer, Req),
    Method = maps:get(method, Req),
    Target = maps:get(target, Req),
    Scheme = maps:get(scheme, Req),
    ListenerName = maps:get(listener_name, Req, undefined),
    #{
        listener_name => ListenerName,
        scheme => Scheme,
        path => Target,
        method => Method,
        peer => Peer,
        request_id => RequestId
    }.

%% Report a rejected request via `roadrunner_telemetry:request_rejected/1`,
%% tagging the event with the listener name and peer taken from the
%% loop state plus the supplied rejection reason.
-spec rejection(#loop_state{}, atom()) -> ok.
rejection(#loop_state{peer = Peer, listener_name = Listener}, Reason) ->
    Metadata = #{
        reason => Reason,
        peer => Peer,
        listener_name => Listener
    },
    roadrunner_telemetry:request_rejected(Metadata).

%% Switch the connection's socket to `{active, once}`.
%%
%% Returns `ok` when the option was applied. If `setopts` reports an
%% error the function does not return: it goes through `exit_normal/1`.
-spec arm_active_once(#loop_state{}) -> ok.
arm_active_once(#loop_state{socket = Sock} = State) ->
    case roadrunner_transport:setopts(Sock, [{active, once}]) of
        {error, _} ->
            %% Socket failure (peer RST during the gap, kernel-side
            %% close): `setopts` cannot succeed, so exit cleanly
            %% instead of crashing.
            exit_normal(State);
        ok ->
            ok
    end.

%% Shorthand for `exit_clean/7` with reason `normal`: reads the
%% socket, proto opts, start timestamp, peer, listener name and
%% served-request count off the loop state. Most exit paths in the
%% read/dispatch/finishing phases fire `listener_conn_close` (accept
%% already fired during `shoot`).
-spec exit_normal(#loop_state{}) -> no_return().
exit_normal(#loop_state{} = S) ->
    exit_clean(
        S#loop_state.socket,
        S#loop_state.proto_opts,
        S#loop_state.start_mono,
        S#loop_state.peer,
        S#loop_state.listener_name,
        S#loop_state.requests_served,
        normal
    ).

%% Single funnel for every clean exit path. In order, it:
%%
%%   1. fires the paired `listener_conn_close` telemetry — only when
%%      `StartMono` is set, i.e. accept already fired;
%%   2. releases the listener slot;
%%   3. closes the socket;
%%   4. exits the process with the supplied Reason.
%%
%% **Limitation**: under `exit(Pid, kill)` this funnel is skipped — the
%% runtime tears the process down without giving it a chance to run
%% any cleanup (so none of the steps above run). Same OTP semantics as
%% `terminate/3` callbacks. Bounded by `max_clients` and reset on
%% listener restart.
-spec exit_clean(
    roadrunner_transport:socket(),
    roadrunner_conn:proto_opts(),
    integer() | undefined,
    {inet:ip_address(), inet:port_number()} | undefined,
    atom(),
    non_neg_integer(),
    term()
) -> no_return().
exit_clean(Socket, ProtoOpts, StartMono, Peer, ListenerName, Served, Reason) ->
    if
        StartMono =:= undefined ->
            %% No start timestamp recorded: skip the close event.
            ok;
        true ->
            CloseMeta = #{
                requests_served => Served,
                peer => Peer,
                listener_name => ListenerName
            },
            roadrunner_telemetry:listener_conn_close(StartMono, CloseMeta)
    end,
    %% NOTE(review): both calls below are asserted with `ok =`, so any
    %% other return raises `badmatch` instead of exiting with Reason —
    %% confirm the callees always return `ok`.
    ok = roadrunner_conn:release_slot(ProtoOpts),
    ok = roadrunner_transport:close(Socket),
    exit(Reason).