%% src/livery_body.erl

-module(livery_body).
-moduledoc """
Streaming request-body reader.

The adapter delivers body chunks, trailers, end-of-body, reset, and
error notifications as messages to the per-request process:

- `{livery_body, Ref, {data, IoData}}`
- `{livery_body, Ref, {trailers, Headers}}`
- `{livery_body, Ref, eof}`
- `{livery_body, Ref, {reset, Reason}}`
- `{livery_body, Ref, {error, Reason}}`

A reader is a small value held by the handler that knows the
stream reference and tracks terminal state. `read/2` returns the
next chunk, blocking on the mailbox up to a caller-supplied
timeout. `read_all/1,2,3` drains the entire body, subject to a byte cap.

Backpressure is per-chunk: the handler reads one chunk per call,
so the engine can size its windows accordingly. Real demand
signaling lands with the H1 adapter; this module exposes
`signal_demand/2` as the hook.
""".

-export([
    new/0,
    new/1,
    new/2,
    ref/1,
    source/1,
    ended/1,
    trailers/1,
    read/2,
    read_all/1,
    read_all/2,
    read_all/3,
    discard/1,
    discard/2,
    signal_demand/2,
    account/3
]).

%% Default ceiling for read_all/1,2 (16 MiB); read_all/3 overrides it.
-define(DEFAULT_MAX_BODY, 16 * 1024 * 1024).

-export_type([reader/0, read_result/0, error_reason/0]).

%% Reader state threaded through every read call. Each read returns a
%% (possibly updated) copy; callers must keep using the returned value.
-record(reader, {
    %% Stream reference; `read/2` only receives messages tagged with it.
    ref :: reference(),
    %% Process that `signal_demand/2` sends demand hints to, or
    %% `undefined` when no source was registered.
    source :: pid() | undefined,
    %% Set to `true` once `eof` or trailers have been received; from then
    %% on `read/2` answers `{done, _}` without touching the mailbox.
    ended = false :: boolean(),
    %% Trailer headers taken from a `{trailers, Headers}` message, or
    %% `undefined` if none were received.
    trailers :: undefined | [{binary(), binary()}],
    %% Recorded failure: set on a reset or error message and when
    %% `read_all` exceeds its byte cap. Timeouts are not recorded here.
    error :: undefined | error_reason()
}).

%% Opaque so that callers go through the accessors instead of the record.
-opaque reader() :: #reader{}.

%% Reasons carried by `{error, Reason, Reader}` results.
%%
%% - `timeout`: no matching message arrived within the caller's timeout.
%% - `{limit, max_size}`: `read_all` saw more bytes than its cap allows.
%% - `{client_reset, Reason}`: built from a `{reset, Reason}` message.
%% - `body_too_large`: not produced by the code in this module;
%%   NOTE(review): presumably forwarded from an adapter via an
%%   `{error, Reason}` message -- confirm against the adapters.
-type error_reason() ::
    timeout
    | body_too_large
    | {limit, max_size}
    | {client_reset, term()}.

%% Outcome of a single `read/2` call.
%%
%% - `{ok, Chunk, Reader}`: one data chunk was received.
%% - `{done, Reader}`: the body has ended (eof or trailers).
%% - `{error, Reason, Reader}`: timeout, reset, or another failure.
-type read_result() ::
    {ok, iodata(), reader()}
    | {done, reader()}
    | {error, error_reason(), reader()}.

%%====================================================================
%% Construction
%%====================================================================

-doc "Build a reader around a newly created reference; no demand source is attached.".
-spec new() -> reader().
new() ->
    StreamRef = make_ref(),
    #reader{ref = StreamRef}.

-doc "Build a reader for an existing stream reference; no demand source is attached.".
-spec new(reference()) -> reader().
new(StreamRef) when is_reference(StreamRef) ->
    #reader{ref = StreamRef, source = undefined}.

-doc "Build a reader for a stream reference together with the adapter pid that receives demand hints.".
-spec new(reference(), pid()) -> reader().
new(StreamRef, AdapterPid) when is_pid(AdapterPid), is_reference(StreamRef) ->
    #reader{source = AdapterPid, ref = StreamRef}.

%%====================================================================
%% Accessors
%%====================================================================

-doc "The stream reference this reader listens on.".
-spec ref(reader()) -> reference().
ref(#reader{} = Reader) ->
    Reader#reader.ref.

-doc "The adapter pid registered for demand hints, or `undefined` when there is none.".
-spec source(reader()) -> pid() | undefined.
source(#reader{} = Reader) ->
    Reader#reader.source.

-doc "Whether the end of the body (eof or trailers) has been observed.".
-spec ended(reader()) -> boolean().
ended(#reader{} = Reader) ->
    Reader#reader.ended.

-doc "Trailer headers captured while reading, or `undefined` when none were received.".
-spec trailers(reader()) -> undefined | [{binary(), binary()}].
trailers(#reader{} = Reader) ->
    Reader#reader.trailers.

%%====================================================================
%% Reading
%%====================================================================

-doc """
Fetch the next body event for this reader, waiting at most `Timeout`.

A reader that has already ended answers `{done, Reader}`, and one that
carries a recorded error answers `{error, Reason, Reader}`; neither looks
at the mailbox. A timeout is reported but not recorded in the reader, so
the call can simply be retried.
""".
-spec read(reader(), timeout()) -> read_result().
read(#reader{ended = true} = Reader, _Timeout) ->
    {done, Reader};
read(#reader{error = undefined} = Reader, Timeout) ->
    await_body_event(Reader, Timeout);
read(#reader{error = Recorded} = Reader, _Timeout) ->
    {error, Recorded, Reader}.

%% Selectively receive the next message tagged with the reader's stream
%% reference and translate it into a read result. Messages of any other
%% shape are left in the mailbox.
-spec await_body_event(reader(), timeout()) -> read_result().
await_body_event(#reader{ref = StreamRef} = Reader, Timeout) ->
    receive
        {livery_body, StreamRef, eof} ->
            {done, Reader#reader{ended = true}};
        {livery_body, StreamRef, {trailers, Headers}} ->
            %% Trailers also terminate the body.
            {done, Reader#reader{ended = true, trailers = Headers}};
        {livery_body, StreamRef, {data, Bytes}} ->
            {ok, Bytes, Reader};
        {livery_body, StreamRef, {reset, Why}} ->
            Failure = {client_reset, Why},
            {error, Failure, Reader#reader{error = Failure}};
        {livery_body, StreamRef, {error, Why}} ->
            {error, Why, Reader#reader{error = Why}}
    after Timeout ->
        {error, timeout, Reader}
    end.

-doc """
Read the body to its end and return it as a single binary.

Uses a 5000 ms per-chunk timeout and the default 16 MiB cap. A body that
grows past the cap produces `{error, {limit, max_size}, _}`, which keeps a
handler from exhausting memory on an unbounded client body. Call
`read_all/3` to raise the cap or turn it off.
""".
-spec read_all(reader()) -> {ok, binary(), reader()} | {error, error_reason(), reader()}.
read_all(Reader) ->
    read_all(Reader, 5000).

-doc "Like `read_all/1`, but the caller chooses the per-chunk timeout; the 16 MiB cap still applies.".
-spec read_all(reader(), timeout()) ->
    {ok, binary(), reader()} | {error, error_reason(), reader()}.
read_all(Reader, ChunkTimeout) ->
    Cap = ?DEFAULT_MAX_BODY,
    read_all(Reader, ChunkTimeout, Cap).

-doc "Like `read_all/2`, but the caller also chooses the byte cap; pass `infinity` to read without one.".
-spec read_all(reader(), timeout(), non_neg_integer() | infinity) ->
    {ok, binary(), reader()} | {error, error_reason(), reader()}.
read_all(Reader, ChunkTimeout, Cap) ->
    BytesSoFar = 0,
    ChunksSoFar = [],
    read_all_loop(Reader, ChunkTimeout, Cap, BytesSoFar, ChunksSoFar).

%% Read one chunk at a time until the body ends, an error occurs, or the
%% byte cap is exceeded. Chunks are collected newest-first and reversed
%% once at the end.
-spec read_all_loop(
    reader(), timeout(), non_neg_integer() | infinity, non_neg_integer(), [iodata()]
) ->
    {ok, binary(), reader()} | {error, error_reason(), reader()}.
read_all_loop(Reader, ChunkTimeout, Cap, Seen, Chunks) ->
    absorb_read(read(Reader, ChunkTimeout), ChunkTimeout, Cap, Seen, Chunks).

%% Fold a single read result into the accumulated state.
absorb_read({done, Reader}, _ChunkTimeout, _Cap, _Seen, Chunks) ->
    {ok, iolist_to_binary(lists:reverse(Chunks)), Reader};
absorb_read({error, Reason, Reader}, _ChunkTimeout, _Cap, _Seen, _Chunks) ->
    {error, Reason, Reader};
absorb_read({ok, Chunk, Reader}, ChunkTimeout, Cap, Seen, Chunks) ->
    Total = Seen + iolist_size(Chunk),
    if
        is_integer(Cap), Total > Cap ->
            %% Record the failure in the reader so later reads report it too.
            Failure = {limit, max_size},
            {error, Failure, Reader#reader{error = Failure}};
        true ->
            read_all_loop(Reader, ChunkTimeout, Cap, Total, [Chunk | Chunks])
    end.

-doc "Throw away whatever is left of the body, waiting up to 1000 ms for each chunk.".
-spec discard(reader()) -> {ok, reader()}.
discard(Reader) ->
    PerChunkTimeout = 1000,
    discard(Reader, PerChunkTimeout).

-doc """
Like `discard/1`, but the caller chooses the per-chunk timeout.

Stops at the end of the body or at the first error (a timeout included);
in every case the result is `{ok, Reader}`.
""".
-spec discard(reader(), timeout()) -> {ok, reader()}.
discard(Reader, Timeout) ->
    discard_next(read(Reader, Timeout), Timeout).

%% Keep reading while chunks arrive; any terminal result ends the drain.
discard_next({ok, _Dropped, Reader}, Timeout) ->
    discard(Reader, Timeout);
discard_next({done, Reader}, _Timeout) ->
    {ok, Reader};
discard_next({error, _Reason, Reader}, _Timeout) ->
    {ok, Reader}.

%%====================================================================
%% Backpressure hook
%%====================================================================

-doc """
Tell the adapter that the handler can take `N` more bytes.

Sends `{livery_body_demand, Ref, N}` to the reader's source pid. Does
nothing when the reader has no source. Adapters are expected to map the
hint onto engine-level demand (h1 read_size, h2 flow control, h3 receive
credit).
""".
-spec signal_demand(reader(), non_neg_integer()) -> ok.
signal_demand(#reader{source = undefined}, _Count) ->
    ok;
signal_demand(#reader{ref = StreamRef, source = Adapter}, Count) when
    is_integer(Count), Count >= 0, is_pid(Adapter)
->
    Demand = {livery_body_demand, StreamRef, Count},
    _ = erlang:send(Adapter, Demand),
    ok.

%%====================================================================
%% Ingestion ceiling
%%====================================================================

-doc """
Account `Chunk` against a running byte total and a ceiling.

Used by the adapter translators to bound how much request body they
forward into the per-request worker's mailbox.

- `Bytes` is the total so far, or `aborted` once the stream has been cut.
- `Chunk` is the iodata about to be forwarded.
- `Max` is the ceiling in bytes, or `infinity` to disable it.

Returns `{ok, NewTotal}` while the total stays at or below `Max`, `over`
once the ceiling is exceeded, and `aborted` whenever `Bytes` is `aborted`
so later chunks are ignored. With `Max = infinity` the total is still
counted, so `NewTotal` always reflects the bytes seen so far.
""".
-spec account(non_neg_integer() | aborted, iodata(), non_neg_integer() | infinity) ->
    {ok, non_neg_integer()} | over | aborted.
account(aborted, _Chunk, _Max) ->
    aborted;
account(Bytes, Chunk, infinity) ->
    %% No ceiling to enforce, but keep counting so the threaded total
    %% stays truthful instead of collapsing to 0.
    {ok, Bytes + iolist_size(Chunk)};
account(Bytes, Chunk, Max) ->
    case Bytes + iolist_size(Chunk) of
        Total when Total > Max -> over;
        Total -> {ok, Total}
    end.