Skip to main content

src/livery_h1.erl

-module(livery_h1).
-moduledoc """
HTTP/1.1 adapter on top of the `h1` library.

Starts an `h1` server bound to a Livery middleware stack and
handler. For every inbound request the adapter:

1. Builds a `#livery_req{}` from the method, path, and headers
   delivered by `h1`.
2. Spawns a `livery_req_proc` worker under `livery_req_sup`.
3. Spawns a small translator that turns `{h1, Conn, _}` body and
   trailer events into the `{livery_body, Ref, _}` shape the
   worker reads via `livery_body`.
4. Registers the translator as the `h1` stream handler so the
   engine never blocks on the worker.

Response emission goes through `livery:emit/3` and lands on the
adapter callbacks (`send_headers/4`, `send_full/5`, `send_data/3`,
`send_trailers/2`, `reset/2`), which in turn call into
`h1:send_response/4`, `h1:respond/5`, `h1:send_data/3,4`, and
`h1:send_trailers/3`. Full bodies coalesce into a single
content-length write via `send_full/5` -> `h1:respond/5`.
""".

-behaviour(livery_adapter).

-include("livery.hrl").

%% Default request-body ceiling (16 MiB). Bounds how much body the
%% translator forwards into the worker mailbox; override per listener
%% with the `max_body' option (`infinity' disables it).
-define(DEFAULT_MAX_BODY, 16 * 1024 * 1024).

%% Public API
-export([start/1, accept_ws/4]).

%% livery_adapter callbacks
-export([
    start/3,
    stop/1,
    send_headers/4,
    send_full/5,
    send_data/3,
    send_trailers/2,
    reset/2,
    peer_info/1,
    capabilities/1
]).

-export_type([listen_opts/0, listener/0, stream/0]).

-type listener() :: h1:server_ref().
-type stream() :: {h1:connection(), h1:stream_id()}.

-type listen_opts() :: #{
    port => inet:port_number(),
    %% Bind address. An IPv6 8-tuple selects the inet6 family.
    ip => inet:ip_address(),
    %% Bind the IPv6 wildcard (`::') when no explicit `ip' is given.
    inet6 => boolean(),
    transport => tcp | ssl,
    cert => binary() | string(),
    key => binary() | string(),
    cacerts => [binary()],
    ssl_opts => [ssl:tls_server_option()],
    acceptors => pos_integer(),
    max_body => non_neg_integer() | infinity,
    %% Slow-client guards, passed through to `h1'. They have finite
    %% defaults there (idle_timeout 300000 ms, request_timeout 60000 ms),
    %% so a slow-loris client is already bounded; tighten for an
    %% edge-facing listener.
    idle_timeout => timeout(),
    request_timeout => timeout(),
    handshake_timeout => timeout(),
    %% Early-response inbound-drain budget (lingering close): when a
    %% handler responds before the request body is fully read, h1 drains
    %% the leftover body before closing so the client reads the response.
    %% `{MaxBytes, MaxMs}' (either component `infinity'), `0' disables.
    %% Defaults to h1's `{infinity, 30000}'. `lingering_timeout => Ms' is
    %% the time-only form.
    early_response_drain => 0 | {non_neg_integer() | infinity, non_neg_integer() | infinity},
    lingering_timeout => timeout(),
    max_keepalive_requests => pos_integer() | infinity,
    %% Shared service config, readable in handlers via livery_req:config/1.
    config => term(),
    stack := livery_middleware:stack(),
    handler := livery_middleware:handler()
}.

%%====================================================================
%% Public API
%%====================================================================

-doc """
Start a listener with the given options.

`Opts` must include `stack` and `handler`. `port` defaults to 0
(random port). `transport` defaults to `tcp`. Returns the same
listener handle the `h1` library does, suitable for passing to
`stop/1` or to `h1:server_port/1`.
""".
-spec start(listen_opts()) -> {ok, listener()} | {error, term()}.
start(Opts) when is_map(Opts) ->
    start(undefined, Opts, #{}).

%%====================================================================
%% livery_adapter callbacks
%%====================================================================

-spec start(atom() | undefined, listen_opts(), map()) ->
    {ok, listener()} | {error, term()}.
start(_Name, Opts, _StartOpts) ->
    Stack = maps:get(stack, Opts),
    Handler = maps:get(handler, Opts),
    Port = maps:get(port, Opts, 0),
    H1Opts = build_h1_opts(Opts, Stack, Handler),
    h1:start_server(Port, H1Opts).

-spec stop(listener()) -> ok.
stop(Listener) ->
    _ = h1:stop_server(Listener),
    ok.

-spec send_headers(
    stream(),
    100..599,
    [{binary(), binary()}],
    livery_adapter:send_opts()
) ->
    livery_adapter:send_result().
send_headers({Conn, StreamId}, Status, Headers, Opts) ->
    closed_guard(fun() ->
        case {maps:get(end_stream, Opts, false), drain_opts(Opts)} of
            {true, {ok, RespOpts}} ->
                %% Empty-body early response with a per-response drain budget:
                %% commit it via respond/6 so h1 honors the budget on close.
                h1:respond(Conn, StreamId, Status, Headers, <<>>, RespOpts);
            {EndStream, _} ->
                case h1:send_response(Conn, StreamId, Status, Headers) of
                    ok when EndStream -> h1:send_data(Conn, StreamId, <<>>, true);
                    ok -> ok;
                    Other -> Other
                end
        end
    end).

-spec send_full(
    stream(),
    100..599,
    [{binary(), binary()}],
    iodata(),
    livery_adapter:send_opts()
) ->
    livery_adapter:send_result().
%% Coalesce a full response (headers + body) into a single content-length
%% write via h1:respond/5, instead of the granular send_headers/send_data
%% path which frames the body as chunked. livery:emit/3 picks this up
%% because the callback is exported. A per-response early-response drain
%% budget routes through respond/6 so h1 honors it on an early close.
send_full({Conn, StreamId}, Status, Headers, IoData, Opts) ->
    closed_guard(fun() ->
        case drain_opts(Opts) of
            {ok, RespOpts} -> h1:respond(Conn, StreamId, Status, Headers, IoData, RespOpts);
            none -> h1:respond(Conn, StreamId, Status, Headers, IoData)
        end
    end).

-spec send_data(stream(), iodata(), livery_adapter:send_opts()) ->
    livery_adapter:send_result().
send_data({Conn, StreamId}, IoData, Opts) ->
    EndStream = maps:get(end_stream, Opts, false),
    closed_guard(fun() -> h1:send_data(Conn, StreamId, IoData, EndStream) end).

-spec send_trailers(stream(), [{binary(), binary()}]) ->
    livery_adapter:send_result().
send_trailers({Conn, StreamId}, Trailers) ->
    closed_guard(fun() -> h1:send_trailers(Conn, StreamId, Trailers) end).

%% A send to a connection whose process has gone away (peer closed) exits
%% the underlying gen_statem:call with noproc/normal/shutdown. Map that to
%% {error, closed} so livery:emit/3 treats it as a normal disconnect, not a
%% handler crash. See livery_h2:closed_guard/1 for the full rationale.
-spec closed_guard(fun(() -> R)) -> R | {error, closed}.
closed_guard(Fun) ->
    try
        Fun()
    catch
        exit:{noproc, _} -> {error, closed};
        exit:{normal, _} -> {error, closed};
        exit:{{shutdown, _}, _} -> {error, closed}
    end.

-spec reset(stream(), term()) -> ok.
reset({Conn, StreamId}, Reason) ->
    _ = h1:cancel_stream(Conn, StreamId, Reason),
    ok.

-spec peer_info(stream()) -> livery_adapter:peer_info().
peer_info({_Conn, _StreamId}) ->
    %% The h1 library does not surface the peer address, so it stays
    %% undefined here.
    #{peer => undefined, tls => undefined, alpn => <<"http/1.1">>}.

-spec capabilities(listener()) -> livery_adapter:capabilities().
capabilities(_Listener) ->
    #{
        trailers => true,
        extended_connect => false,
        datagrams => false,
        capsules => false
    }.

%% Translate the per-response `early_response_drain' send-opt into the
%% h1:respond/6 options map. Absent or `default' yields `none' (h1 uses
%% the listener budget); any other value produces the per-response map.
-spec drain_opts(livery_adapter:send_opts()) -> {ok, h1:respond_opts()} | none.
drain_opts(Opts) ->
    case maps:get(early_response_drain, Opts, default) of
        default -> none;
        Drain -> {ok, #{early_response_drain => to_h1(Drain)}}
    end.

%% `none' disables the drain (h1's `0'); a `{MaxBytes, MaxMs}' budget
%% passes straight through.
-spec to_h1(none | {non_neg_integer() | infinity, non_neg_integer() | infinity}) ->
    h1:early_response_drain().
to_h1(none) -> 0;
to_h1({MaxBytes, MaxMs}) -> {MaxBytes, MaxMs}.

%%====================================================================
%% WebSocket handoff (called by livery_ws:upgrade/3)
%%====================================================================

-doc """
Hand the stream's socket to the `ws` library to run a WebSocket
session.

Validates the RFC 6455 handshake headers, replies 101 via
`h1:accept_upgrade/3`, takes ownership of the raw socket, and
calls `ws:accept/5` with the supplied handler module and opts.

Returns `{ok, SessionPid}` on success or `{error, _}` on a bad
handshake or socket transfer failure.
""".
-spec accept_ws(
    stream(),
    livery_req:req(),
    module(),
    term()
) ->
    {ok, pid()} | {error, term()}.
accept_ws({Conn, StreamId}, Req, HandlerMod, Opts) ->
    Headers = livery_req:headers(Req),
    case ws_h1_upgrade:validate_request(Headers) of
        {ok, Info} ->
            RespHeaders = ws_h1_upgrade:response_headers(Info),
            case h1:accept_upgrade(Conn, StreamId, RespHeaders) of
                {ok, Socket, _BufferedBytes} ->
                    WsReq = build_ws_req(Req),
                    ws:accept(
                        ws_transport(Req),
                        Socket,
                        WsReq,
                        HandlerMod,
                        Opts
                    );
                {error, Reason} ->
                    {error, {accept_upgrade_failed, Reason}}
            end;
        {error, Reason} ->
            {error, {bad_request, Reason}}
    end.

-spec build_ws_req(livery_req:req()) -> map().
build_ws_req(Req) ->
    #{
        method => livery_req:method(Req),
        path => livery_req:path(Req),
        query => livery_req:query(Req),
        headers => livery_req:headers(Req)
    }.

%% Pick the ws transport from the connection's security, which the
%% listener records on the request (an ssl listener sets `tls'). This
%% reads the listener's own configuration rather than inspecting the
%% opaque socket representation. Mirrors `ws_client', which selects the
%% transport from the `ws://' / `wss://' scheme.
-spec ws_transport(livery_req:req()) -> ws_transport_gen_tcp | ws_transport_ssl.
ws_transport(Req) ->
    case livery_req:tls(Req) of
        undefined -> ws_transport_gen_tcp;
        _ -> ws_transport_ssl
    end.

%%====================================================================
%% Internals: per-request dispatch
%%====================================================================

-spec build_h1_opts(
    listen_opts(),
    livery_middleware:stack(),
    livery_middleware:handler()
) -> map().
build_h1_opts(Opts, Stack, Handler) ->
    MaxBody = maps:get(max_body, Opts, ?DEFAULT_MAX_BODY),
    Transport = maps:get(transport, Opts, tcp),
    Base = #{
        transport => Transport,
        handler => make_handler_fun(
            Stack, Handler, MaxBody, maps:get(config, Opts, undefined), Transport
        )
    },
    copy_keys(
        [
            ip,
            inet6,
            cert,
            key,
            cacerts,
            ssl_opts,
            acceptors,
            handshake_timeout,
            idle_timeout,
            request_timeout,
            early_response_drain,
            lingering_timeout,
            max_keepalive_requests
        ],
        Opts,
        Base
    ).

-spec copy_keys([atom()], map(), map()) -> map().
copy_keys([], _Src, Dst) ->
    Dst;
copy_keys([K | Rest], Src, Dst) ->
    case maps:find(K, Src) of
        {ok, V} -> copy_keys(Rest, Src, maps:put(K, V, Dst));
        error -> copy_keys(Rest, Src, Dst)
    end.

-spec make_handler_fun(
    livery_middleware:stack(),
    livery_middleware:handler(),
    non_neg_integer() | infinity,
    term(),
    tcp | ssl
) ->
    fun(
        (
            h1:connection(),
            h1:stream_id(),
            binary(),
            binary(),
            h1:headers()
        ) -> ok
    ).
make_handler_fun(Stack, Handler, MaxBody, Config, Transport) ->
    fun(Conn, StreamId, Method, Path, Headers) ->
        dispatch_request(
            Conn,
            StreamId,
            Method,
            Path,
            Headers,
            Stack,
            Handler,
            MaxBody,
            Config,
            Transport
        )
    end.

-spec dispatch_request(
    h1:connection(),
    h1:stream_id(),
    binary(),
    binary(),
    h1:headers(),
    livery_middleware:stack(),
    livery_middleware:handler(),
    non_neg_integer() | infinity,
    term(),
    tcp | ssl
) -> ok.
dispatch_request(
    Conn, StreamId, Method, Path, Headers, Stack, Handler, MaxBody, Config, Transport
) ->
    %% This function runs in the per-request worker spawned by
    %% h1_server. Body and trailer events arrive as
    %% `{h1_stream, StreamId, _}' messages in this process's
    %% mailbox.
    BodyRef = make_ref(),
    DiscRef = make_ref(),
    Reader = livery_body:new(BodyRef),
    {RawPath, RawQuery} = split_query(Path),
    Req = build_req(Conn, StreamId, Method, RawPath, Headers, Reader, Transport),
    %% The dispatch process (this one) runs the translator loop, so it
    %% is the disconnect notifier the handler registers with.
    Req1 = Req#livery_req{
        raw_query = RawQuery,
        notifier_pid = self(),
        disc_ref = DiscRef,
        config = Config
    },
    case
        livery_req_sup:start_request(#{
            adapter => ?MODULE,
            stream => {Conn, StreamId},
            req => Req1,
            stack => Stack,
            handler => Handler
        })
    of
        {ok, WorkerPid} ->
            WMRef = erlang:monitor(process, WorkerPid),
            %% Monitor the h1 connection process too, so a client
            %% disconnect fires even when the handler is not reading the
            %% body or emitting.
            CMRef = erlang:monitor(process, Conn),
            translate_until_done(
                StreamId, BodyRef, DiscRef, Conn, WorkerPid, WMRef, CMRef, [], false, MaxBody, 0
            );
        {error, _} ->
            reject_overload({Conn, StreamId})
    end.

%% No worker slot available (concurrency cap reached): answer 503 and
%% serve the next request instead of crashing the stream handler.
-spec reject_overload(stream()) -> ok.
reject_overload(Stream) ->
    _ = send_headers(
        Stream,
        503,
        [{<<"content-type">>, <<"text/plain; charset=utf-8">>}],
        #{end_stream => true}
    ),
    ok.

-spec build_req(
    h1:connection(),
    h1:stream_id(),
    binary(),
    binary(),
    h1:headers(),
    livery_body:reader(),
    tcp | ssl
) -> livery_req:req().
build_req(Conn, StreamId, Method, Path, Headers, Reader, Transport) ->
    livery_req:new(#{
        protocol => h1,
        method => Method,
        path => Path,
        headers => Headers,
        body => {stream, Reader},
        adapter => ?MODULE,
        stream => {Conn, StreamId},
        engine_pid => Conn,
        tls => tls_info(Transport)
    }).

%% Mark TLS connections so handlers and the WebSocket handoff can tell a
%% secure listener from a plaintext one. h1 does not surface certificate
%% details, so this is an empty map rather than undefined.
-spec tls_info(tcp | ssl) -> undefined | map().
tls_info(ssl) -> #{};
tls_info(_) -> undefined.

-spec split_query(binary()) -> {binary(), binary()}.
split_query(Path) ->
    case binary:split(Path, <<"?">>) of
        [P, Q] -> {P, Q};
        [P] -> {P, <<>>}
    end.

%%====================================================================
%% Translator: h1 messages -> livery_body protocol
%%====================================================================

%% Run inside the h1-spawned worker. Forward body and trailer events
%% to the livery_req_proc until it exits (the worker is monitored).
%% Returning from this function lets h1_server pump the next request
%% on the same connection.
-spec translate_until_done(
    h1:stream_id(),
    reference(),
    reference(),
    pid(),
    pid(),
    reference(),
    reference(),
    [fun(() -> term())],
    boolean(),
    non_neg_integer() | infinity,
    non_neg_integer() | aborted
) -> ok.
translate_until_done(
    StreamId, BodyRef, DiscRef, Conn, WorkerPid, WMRef, CMRef, Cbs, Fired, Max, Bytes
) ->
    Loop = fun(Cbs1, Fired1, Bytes1) ->
        translate_until_done(
            StreamId, BodyRef, DiscRef, Conn, WorkerPid, WMRef, CMRef, Cbs1, Fired1, Max, Bytes1
        )
    end,
    receive
        {h1_stream, StreamId, {data, <<>>, true}} ->
            WorkerPid ! {livery_body, BodyRef, eof},
            Loop(Cbs, Fired, Bytes);
        {h1_stream, StreamId, {data, Chunk, true}} ->
            case livery_body:account(Bytes, Chunk, Max) of
                {ok, Bytes1} ->
                    WorkerPid ! {livery_body, BodyRef, {data, Chunk}},
                    WorkerPid ! {livery_body, BodyRef, eof},
                    Loop(Cbs, Fired, Bytes1);
                _ ->
                    abort_body(WorkerPid, BodyRef),
                    Loop(Cbs, Fired, aborted)
            end;
        {h1_stream, StreamId, {data, Chunk, false}} ->
            case livery_body:account(Bytes, Chunk, Max) of
                {ok, Bytes1} ->
                    WorkerPid ! {livery_body, BodyRef, {data, Chunk}},
                    Loop(Cbs, Fired, Bytes1);
                aborted ->
                    Loop(Cbs, Fired, aborted);
                over ->
                    abort_body(WorkerPid, BodyRef),
                    Loop(Cbs, Fired, aborted)
            end;
        {h1_stream, StreamId, {trailers, Headers}} ->
            WorkerPid ! {livery_body, BodyRef, {trailers, Headers}},
            Loop(Cbs, Fired, Bytes);
        {h1_stream, StreamId, {stream_reset, Reason}} ->
            WorkerPid ! {livery_body, BodyRef, {reset, Reason}},
            Loop(Cbs, livery_disconnect:fire_once(Fired, WorkerPid, DiscRef, Reason, Cbs), Bytes);
        {'DOWN', CMRef, process, Conn, Reason} ->
            Loop(
                Cbs,
                livery_disconnect:fire_once(
                    Fired, WorkerPid, DiscRef, {connection_closed, Reason}, Cbs
                ),
                Bytes
            );
        {livery_on_disconnect, DiscRef, Fun} ->
            Loop(livery_disconnect:register(Fired, Fun, Cbs), Fired, Bytes);
        {'DOWN', WMRef, process, WorkerPid, _Reason} ->
            ok
    end.

%% Signal the worker that the body exceeded `max_body' and switch the
%% translator to drop-mode (the caller loops with `aborted'), which
%% bounds the worker mailbox. We deliberately do NOT reset the stream:
%% the handler can still commit a response (typically 413) and h1's
%% early-response drain reads and discards the rest of the inbound body
%% before closing, so the client reads the response instead of an RST.
%% The leftover inbound is bounded by h1's drain budget and the
%% request timeout; the worker mailbox is bounded by drop-mode.
-spec abort_body(pid(), reference()) -> ok.
abort_body(WorkerPid, BodyRef) ->
    WorkerPid ! {livery_body, BodyRef, {error, body_too_large}},
    ok.