-module(livery_body).
-moduledoc """
Streaming request-body reader.
The adapter delivers body chunks, trailers, end-of-body, and reset
notifications as messages to the per-request process:
- `{livery_body, Ref, {data, IoData}}`
- `{livery_body, Ref, {trailers, Headers}}`
- `{livery_body, Ref, eof}`
- `{livery_body, Ref, {reset, Reason}}`
A reader is a small value held by the handler that knows the
stream reference and tracks terminal state. `read/2` returns the
next chunk, blocking on the mailbox up to a caller-supplied
timeout. `read_all/1,2` drains the entire body.
Backpressure is per-chunk: the handler reads one chunk per call,
so the engine can size its windows accordingly. Real demand
signaling lands with the H1 adapter; this module exposes
`signal_demand/2` as the hook.
""".
-export([
new/0,
new/1,
new/2,
ref/1,
source/1,
ended/1,
trailers/1,
read/2,
read_all/1,
read_all/2,
read_all/3,
discard/1,
discard/2,
signal_demand/2,
account/3
]).
%% Default ceiling for read_all/1,2 (16 MiB); read_all/3 overrides it.
-define(DEFAULT_MAX_BODY, 16 * 1024 * 1024).
-export_type([reader/0, read_result/0, error_reason/0]).
-record(reader, {
ref :: reference(),
source :: pid() | undefined,
ended = false :: boolean(),
trailers :: undefined | [{binary(), binary()}],
error :: undefined | error_reason()
}).
-opaque reader() :: #reader{}.
-type error_reason() ::
timeout
| body_too_large
| {limit, max_size}
| {client_reset, term()}.
-type read_result() ::
{ok, iodata(), reader()}
| {done, reader()}
| {error, error_reason(), reader()}.
%%====================================================================
%% Construction
%%====================================================================
-doc "Reader with a fresh reference and no demand source.".
-spec new() -> reader().
new() ->
#reader{ref = make_ref()}.
-doc "Reader for the given reference, no demand source.".
-spec new(reference()) -> reader().
new(Ref) when is_reference(Ref) ->
#reader{ref = Ref}.
-doc "Reader bound to a reference and an adapter source pid.".
-spec new(reference(), pid()) -> reader().
new(Ref, Source) when is_reference(Ref), is_pid(Source) ->
#reader{ref = Ref, source = Source}.
%%====================================================================
%% Accessors
%%====================================================================
-spec ref(reader()) -> reference().
ref(#reader{ref = R}) -> R.
-spec source(reader()) -> pid() | undefined.
source(#reader{source = S}) -> S.
-spec ended(reader()) -> boolean().
ended(#reader{ended = E}) -> E.
-spec trailers(reader()) -> undefined | [{binary(), binary()}].
trailers(#reader{trailers = T}) -> T.
%%====================================================================
%% Reading
%%====================================================================
-doc "Read one chunk from the body, blocking up to `Timeout`.".
-spec read(reader(), timeout()) -> read_result().
read(#reader{ended = true} = R, _Timeout) ->
{done, R};
read(#reader{error = E} = R, _Timeout) when E =/= undefined ->
{error, E, R};
read(#reader{ref = Ref} = R, Timeout) ->
receive
{livery_body, Ref, {data, Chunk}} ->
{ok, Chunk, R};
{livery_body, Ref, eof} ->
{done, R#reader{ended = true}};
{livery_body, Ref, {trailers, Hs}} ->
{done, R#reader{ended = true, trailers = Hs}};
{livery_body, Ref, {reset, Reason}} ->
E = {client_reset, Reason},
{error, E, R#reader{error = E}};
{livery_body, Ref, {error, Reason}} ->
{error, Reason, R#reader{error = Reason}}
after Timeout ->
{error, timeout, R}
end.
-doc """
Drain the entire body, returning the concatenated bytes.
Buffers at most 16 MiB by default; a larger body yields
`{error, {limit, max_size}, _}` so a handler reading an unbounded client
body cannot exhaust memory. Use `read_all/3` to raise or disable the cap.
""".
-spec read_all(reader()) -> {ok, binary(), reader()} | {error, error_reason(), reader()}.
read_all(R) ->
read_all(R, 5000, ?DEFAULT_MAX_BODY).
-doc "`read_all/1` with an explicit per-chunk timeout (16 MiB cap).".
-spec read_all(reader(), timeout()) ->
{ok, binary(), reader()} | {error, error_reason(), reader()}.
read_all(R, Timeout) ->
read_all(R, Timeout, ?DEFAULT_MAX_BODY).
-doc "`read_all/2` with an explicit byte cap (`infinity` disables it).".
-spec read_all(reader(), timeout(), non_neg_integer() | infinity) ->
{ok, binary(), reader()} | {error, error_reason(), reader()}.
read_all(R, Timeout, Max) ->
read_all_loop(R, Timeout, Max, 0, []).
-spec read_all_loop(
reader(), timeout(), non_neg_integer() | infinity, non_neg_integer(), [iodata()]
) ->
{ok, binary(), reader()} | {error, error_reason(), reader()}.
read_all_loop(R, Timeout, Max, Seen, Acc) ->
case read(R, Timeout) of
{ok, Chunk, R1} ->
Seen1 = Seen + iolist_size(Chunk),
case is_integer(Max) andalso Seen1 > Max of
true ->
E = {limit, max_size},
{error, E, R1#reader{error = E}};
false ->
read_all_loop(R1, Timeout, Max, Seen1, [Chunk | Acc])
end;
{done, R1} ->
{ok, iolist_to_binary(lists:reverse(Acc)), R1};
{error, E, R1} ->
{error, E, R1}
end.
-doc "Drop the remainder of the body.".
-spec discard(reader()) -> {ok, reader()}.
discard(R) -> discard(R, 1000).
-doc "`discard/1` with an explicit per-chunk timeout.".
-spec discard(reader(), timeout()) -> {ok, reader()}.
discard(R, Timeout) ->
case read(R, Timeout) of
{ok, _, R1} -> discard(R1, Timeout);
{done, R1} -> {ok, R1};
{error, _, R1} -> {ok, R1}
end.
%%====================================================================
%% Backpressure hook
%%====================================================================
-doc """
Hint to the adapter that the handler is ready for more bytes.
A no-op when no source pid is registered. Real adapters will
translate this into engine-level demand (h1 read_size, h2 flow
control, h3 receive credit).
""".
-spec signal_demand(reader(), non_neg_integer()) -> ok.
signal_demand(#reader{source = undefined}, _N) ->
ok;
signal_demand(#reader{source = Pid, ref = Ref}, N) when
is_pid(Pid), is_integer(N), N >= 0
->
Pid ! {livery_body_demand, Ref, N},
ok.
%%====================================================================
%% Ingestion ceiling
%%====================================================================
-doc """
Account `Chunk` against a running byte total and a ceiling.
Used by the adapter translators to bound how much request body they
forward into the per-request worker's mailbox. Returns `{ok, NewTotal}`
while under `Max`, `over` once the ceiling is exceeded, and threads an
`aborted` sentinel through once the stream has been cut so later chunks
are ignored. `Max` may be `infinity` to disable the ceiling.
""".
-spec account(non_neg_integer() | aborted, iodata(), non_neg_integer() | infinity) ->
{ok, non_neg_integer()} | over | aborted.
account(aborted, _Chunk, _Max) ->
aborted;
account(_Bytes, _Chunk, infinity) ->
{ok, 0};
account(Bytes, Chunk, Max) ->
Total = Bytes + iolist_size(Chunk),
case Total > Max of
true -> over;
false -> {ok, Total}
end.