Skip to main content

src/roadrunner_http2_stream_worker.erl

-module(roadrunner_http2_stream_worker).
-moduledoc false.

%% Per-stream worker process for HTTP/2 dispatch (Phase H8b).
%%
%% Spawned by `roadrunner_conn_loop_http2` once a request stream
%% finishes receiving (HEADERS + body + END_STREAM). The worker:
%%
%% 1. Resolves the route + middleware stack.
%% 2. Calls the handler.
%% 3. Translates the response shape (`{Status, Headers, Body}` or
%%    `{stream, _, _, Fun}`) into messages back to the conn process,
%%    which is the single owner of HPACK encoder state and frame I/O.
%%
%% Worker → conn protocol (synchronous round-trip per write so the
%% worker doesn't outpace flow control or the wire):
%%
%% ```
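%% Worker → Conn: {h2_send_response, Worker, Ref, StreamId, Status, Headers, Body}
%% Conn   → Worker: {h2_send_ack, Ref}                       %% buffered response
%%
%% Worker → Conn: {h2_send_headers, Worker, Ref, StreamId, Status, Headers, EndStream}
%% Conn   → Worker: {h2_send_ack, Ref}                       %% frame written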
%%
%% Worker → Conn: {h2_send_data, Worker, Ref, StreamId, Data, EndStream}
%% Conn   → Worker: {h2_send_ack, Ref}                       %% frame written
%%                   %% (delayed if the conn or stream send window was
%%                   %% closed — drained on the next WINDOW_UPDATE)
%%
%% Worker → Conn: {h2_send_trailers, Worker, Ref, StreamId, Trailers}
%% Conn   → Worker: {h2_send_ack, Ref}
%%
%% Worker → Conn: {h2_worker_done, StreamId}                 %% normal exit
%% ```
%%
%% If the peer cancels the stream (`RST_STREAM` or worker-level error
%% on the conn side), the conn sends `{h2_stream_reset, StreamId}` —
%% the worker observes this on its next sync round and exits without
%% emitting further frames.
%%
%% Crash isolation: workers are spawn_monitored (NOT linked) so a
%% handler crash resets only the affected stream — the conn
%% observes the `'DOWN'` and emits
%% `RST_STREAM(INTERNAL_ERROR)`, leaving in-flight peers untouched.

-export([start/4]).
-export([init/4]).

-doc """
Start a dispatch worker for `StreamId` under a monitor held by the
caller (the conn process).

The new process enters `init/4` with the given arguments, runs `Req`
through the dispatch pipeline and reports frame messages back to
`ConnPid`. The returned `{Pid, MonitorRef}` pair lets the conn map
the eventual `'DOWN'` message back to the stream id.

A monitor is used instead of a link on purpose: a crashing handler
must only cost its own stream (the conn answers the `'DOWN'` with
`RST_STREAM(INTERNAL_ERROR)`), never the whole connection and the
other streams multiplexed on it.
""".
-spec start(pid(), pos_integer(), roadrunner_req:request(), map()) ->
    {pid(), reference()}.
start(ConnPid, StreamId, Req, ProtoOpts) ->
    InitArgs = [ConnPid, StreamId, Req, ProtoOpts],
    erlang:spawn_monitor(?MODULE, init, InitArgs).

-doc false.
-spec init(pid(), pos_integer(), roadrunner_req:request(), map()) -> ok.
init(ConnPid, StreamId, Req, ProtoOpts) ->
    %% Label the process with this module's name and the stream id.
    Label = {?MODULE, StreamId},
    proc_lib:set_label(Label),
    %% Same as the h1 path: install request-scoped logger metadata so
    %% `?LOG_*` calls made by middleware/handlers are correlated by
    %% `request_id`. This has to happen here because the handler runs
    %% in this worker process rather than on the conn.
    ok = roadrunner_conn:set_request_logger_metadata(Req),
    run_handler(ConnPid, StreamId, Req, ProtoOpts),
    %% Tell the conn this stream's worker finished normally.
    erlang:send(ConnPid, {h2_worker_done, StreamId}),
    ok.

%% Resolve the route for `Req` and either run its pipeline or answer
%% 404, bracketing the work with request start/stop telemetry.
run_handler(ConnPid, StreamId, Req, ProtoOpts) ->
    %% `dispatch` is installed by listener init, so it is always there.
    %% A matched route carries `Pipeline`, a ready-made `next()` fun
    %% (listener mws ++ per-route mws, `state` injected up front when
    %% attached, terminating in `fun Handler:handle/1`). It is composed
    %% once at compile / `reload_routes/2` time, so serving a request
    %% is just a call — no per-request closure allocation.
    #{dispatch := Dispatch} = ProtoOpts,
    Meta = telemetry_metadata(Req),
    Started = roadrunner_telemetry:request_start(Meta),
    case roadrunner_conn:resolve_handler(Dispatch, Req) of
        not_found ->
            PlainText = [{~"content-type", ~"text/plain"}],
            send_buffered(ConnPid, StreamId, 404, PlainText, ~"Not Found"),
            StopInfo = #{status => 404, response_kind => buffered},
            ok = roadrunner_telemetry:request_stop(Started, Meta, StopInfo);
        {ok, Handler, Bindings, Pipeline, _State} ->
            BoundReq = Req#{bindings => Bindings},
            invoke(ConnPid, StreamId, Handler, Pipeline, BoundReq, Meta, Started)
    end.

%% Run the route pipeline and emit whatever response it produced.
%%
%% Only `Pipeline(Req)` itself is protected by the `try`: an exception
%% raised there is reported (telemetry + log) and answered with a 500.
%% Failures while emitting the response happen in the `of` body and
%% are therefore not caught here.
invoke(ConnPid, StreamId, Handler, Pipeline, Req, Meta, Started) ->
    try Pipeline(Req) of
        {Response, _Req2} ->
            emit_handler_response(ConnPid, StreamId, Handler, Response),
            Status = roadrunner_conn:response_status(Response),
            Kind = roadrunner_conn:response_kind(Response),
            StopInfo = #{status => Status, response_kind => Kind},
            ok = roadrunner_telemetry:request_stop(Started, Meta, StopInfo)
    catch
        Class:Reason:Stack ->
            ok = roadrunner_telemetry:request_exception(Started, Meta, Class, Reason),
            CrashReport = #{
                msg => "roadrunner h2 handler crashed",
                handler => Handler,
                class => Class,
                reason => Reason,
                stacktrace => Stack
            },
            logger:error(CrashReport),
            PlainText = [{~"content-type", ~"text/plain"}],
            send_buffered(ConnPid, StreamId, 500, PlainText, ~"Internal Server Error")
    end.

-spec telemetry_metadata(roadrunner_req:request()) -> roadrunner_telemetry:metadata().
telemetry_metadata(#{
    listener_name := Listener,
    scheme := Scheme,
    target := Path,
    method := Verb,
    peer := PeerAddr,
    request_id := ReqId
}) ->
    %% All six keys matched in the head are mandatory: a request that
    %% lacks any of them fails the clause. They are expected to be
    %% populated by `roadrunner_http2_request:build/6`. Destructuring
    %% in the head stands in for one `maps:get` call per key. Note the
    %% single rename: the request's `target` is reported as `path`.
    #{
        listener_name => Listener,
        scheme => Scheme,
        path => Path,
        method => Verb,
        peer => PeerAddr,
        request_id => ReqId
    }.

%% Translate a handler response into conn messages. One clause per
%% supported response shape; anything else fails with function_clause.
emit_handler_response(ConnPid, StreamId, _Handler, {Code, Hdrs, Payload}) when
    is_integer(Code, 100, 599)
->
    %% Plain buffered response.
    send_buffered(ConnPid, StreamId, Code, Hdrs, Payload);
emit_handler_response(ConnPid, StreamId, _Handler, {stream, Code, Hdrs, StreamFun}) when
    is_integer(Code, 100, 599), is_function(StreamFun, 1)
->
    %% Handler-driven streaming: the fun receives the send callback.
    roadrunner_http2_stream_response:run(ConnPid, StreamId, Code, Hdrs, StreamFun);
emit_handler_response(ConnPid, StreamId, Handler, {loop, Code, Hdrs, LoopState}) when
    is_integer(Code, 100, 599)
->
    %% Counterpart of h1's `{loop, _}` path: the worker enters a
    %% selective-receive loop (on the handler's own mailbox), routes
    %% every non-OTP message through `Handler:handle_info/3`, and
    %% pushes DATA frames via Push. `{stop, _}` makes the worker emit
    %% an empty DATA + END_STREAM and exit.
    roadrunner_http2_loop_response:run(
        ConnPid, StreamId, Code, Hdrs, {Handler, LoopState}
    );
emit_handler_response(
    ConnPid, StreamId, _Handler, {sendfile, Code, Hdrs, {File, Offset, Length}}
) when is_integer(Code, 100, 599) ->
    %% There is no kernel sendfile path on h2, so the file is served
    %% through the ordinary streaming machinery, which already takes
    %% care of HEADERS, DATA framing, MAX_FRAME_SIZE chunking and
    %% per-stream / conn-level flow control.
    StreamFun = sendfile_stream_fun(File, Offset, Length),
    roadrunner_http2_stream_response:run(ConnPid, StreamId, Code, Hdrs, StreamFun);
emit_handler_response(ConnPid, StreamId, _Handler, {websocket, _, _}) ->
    emit_501(ConnPid, StreamId).

%% Build the stream fun that reads `Length` bytes of `File` starting at
%% `Offset` and hands them to the send callback. A failed open or seek
%% crashes the worker through the assertive matches; the conn process
%% then RST_STREAM-s the stream, matching h1. The file is closed on
%% every exit path once it has been opened.
sendfile_stream_fun(File, Offset, Length) ->
    fun(Send) ->
        {ok, IoDev} = file:open(File, [read, raw, binary]),
        try
            {ok, _} = file:position(IoDev, Offset),
            sendfile_loop(IoDev, Length, Send)
        after
            _ = file:close(IoDev)
        end
    end.

%% Upper bound on the bytes read (and therefore handed to `Send`) per
%% iteration. 16384 is the default SETTINGS_MAX_FRAME_SIZE
%% (RFC 9113 §6.5.2); reading more would only make the framer split
%% the chunk again, so reads are capped at that floor.
-define(SENDFILE_CHUNK_SIZE, 16384).

%% Read `Remaining` bytes from `IoDev` in chunks of at most
%% ?SENDFILE_CHUNK_SIZE and pass each one to `Send`. The chunk that
%% brings the remaining count to zero is sent with `fin`, every
%% earlier chunk with `nofin`; a zero-length request sends a single
%% empty `fin` chunk. Anything other than `{ok, Bin}` from
%% `file:read/2` (e.g. `eof` on a short file) fails the match and
%% crashes the caller.
sendfile_loop(_IoDev, 0, Send) ->
    Send(<<>>, fin);
sendfile_loop(IoDev, Remaining, Send) ->
    {ok, Chunk} = file:read(IoDev, min(Remaining, ?SENDFILE_CHUNK_SIZE)),
    sendfile_emit(IoDev, Remaining - byte_size(Chunk), Chunk, Send).

%% Send one chunk: finish the stream when nothing is left, otherwise
%% send it as a non-final chunk and continue reading.
sendfile_emit(_IoDev, 0, Chunk, Send) ->
    Send(Chunk, fin);
sendfile_emit(IoDev, Left, Chunk, Send) ->
    Send(Chunk, nofin),
    sendfile_loop(IoDev, Left, Send).

%% Answer the stream with a buffered plain-text 501 for response
%% shapes the h2 path cannot serve.
emit_501(ConnPid, StreamId) ->
    Headers = [{~"content-type", ~"text/plain"}],
    Body = ~"HTTP/2 does not yet support this response shape",
    send_buffered(ConnPid, StreamId, 501, Headers, Body).

%% Buffered response: a single conn-side message that emits
%% HEADERS + (optional) DATA in one `ssl:send/2` for the common
%% case where the body fits in a single DATA frame AND the
%% stream's flow-control window. Conn-side handler falls back to
%% the two-step (HEADERS, then queued DATA) path for large bodies
%% or constrained windows; the worker still sees a single sync
%% round-trip in either case.
%%
%% Returns `ok` once the conn acknowledged the write. Exits with
%% `stream_reset` if the stream was reset meanwhile, or with
%% `conn_down` if the conn process is gone.
send_buffered(ConnPid, StreamId, Status, Headers, Body) ->
    sync(ConnPid, fun(Ref) ->
        _ = (ConnPid ! {h2_send_response, self(), Ref, StreamId, Status, Headers, Body}),
        ok
    end).

%% One synchronous worker → conn round-trip.
%%
%% The worker is only monitored by the conn, not linked to it, so a
%% conn that dies before acking would otherwise leave this process
%% blocked in `receive` forever. To avoid that leak the conn is
%% monitored for the duration of the round-trip and the monitor
%% reference doubles as the request/ack correlation ref. If the conn
%% is already dead when the monitor is set up, the `'DOWN'` message
%% is delivered immediately.
%%
%% `SendFun(Ref)` must send the request carrying `Ref` and return `ok`.
sync(ConnPid, SendFun) ->
    Ref = erlang:monitor(process, ConnPid),
    ok = SendFun(Ref),
    receive
        {h2_send_ack, Ref} ->
            %% `flush` drops a 'DOWN' that raced with the ack so it
            %% cannot linger in the handler's mailbox.
            true = erlang:demonitor(Ref, [flush]),
            ok;
        {h2_stream_reset, _StreamId} ->
            exit(stream_reset);
        {'DOWN', Ref, process, _Pid, _Reason} ->
            exit(conn_down)
    end.