%% src/livery_resp.erl

-module(livery_resp).
-moduledoc """
Response constructors and accessors.

Handlers return an immutable `#livery_resp{}` value. The core
emits it onto the wire by walking the body variant and driving
the adapter's `send_headers`/`send_data`/`send_trailers` calls.
Response builders here are pure: they never touch sockets.

Most builders fix the status and headers up front. When the status
is not known until the first byte (admission control, lazy probing),
use `stream_deferred/1` to choose between streaming and a one-shot
error after the handler returns but before anything is written.
""".

-include("livery.hrl").

-export([
    new/2,
    new/3,

    status/1,
    headers/1,
    body/1,
    trailers/1,
    early_response_drain/1,

    with_status/2,
    with_header/3,
    append_header/3,
    delete_header/2,
    with_trailers/2,
    with_body/2,
    with_etag/2,
    with_cache_control/2,
    with_early_response_drain/2,

    text/2,
    text/3,
    text/4,
    html/2,
    html/3,
    json/2,
    json/3,
    json/4,

    empty/1,

    stream/3,
    stream_deferred/1,
    resolve_deferred/1,
    resolve_deferred/2,
    sse/2,
    sse/3,

    ndjson/2,
    ndjson/3,

    file/2,
    file/3,

    redirect/2,
    redirect/3,

    upgrade/2
]).

-export_type([resp/0, body/0, deferred_decision/0, cache_directive/0, drain/0, resp_opts/0]).

%% Early-response inbound-drain budget. `default' keeps the listener's
%% budget, `none' disables the drain (close immediately), and a
%% `{MaxBytes, MaxMs}' tuple (either component `infinity') bounds it for
%% this one response.
-type drain() ::
    default
    | none
    | {non_neg_integer() | infinity, non_neg_integer() | infinity}.

-type resp_opts() :: #{early_response_drain => drain()}.

%% Structured `Cache-Control' directives accepted by
%% `with_cache_control/2'. Bare atoms render as valueless directives
%% (underscores become hyphens); the tuple forms carry a non-negative
%% integer that is rendered after `=' (see `cc_directive/1').
-type cache_directive() ::
    no_cache
    | no_store
    | public
    | private
    | immutable
    | must_revalidate
    | proxy_revalidate
    | no_transform
    | {max_age, non_neg_integer()}
    | {s_maxage, non_neg_integer()}
    | {stale_while_revalidate, non_neg_integer()}
    | {stale_if_error, non_neg_integer()}.

%% The response record. It is not declared in this file; presumably it
%% comes from the included `livery.hrl' -- confirm there for field
%% defaults.
-type resp() :: #livery_resp{}.
%% Header names and values are binaries. The builders in this module
%% store names lowercased (see `normalize_headers/1').
-type header_name() :: binary().
-type header_value() :: binary().
%% Streaming callback: takes one argument (the docs on `stream/3' and
%% `ndjson/3' call it `SendFun' / `Emit') and returns `ok' or
%% `{error, Reason}'.
-type producer() :: fun((term()) -> ok | {error, term()}).
%% Body variants and the builders in this module that create them:
%%   {full, IoData}        -- `new/2', `text/3', `html/3', `json/3'
%%   {chunked, Producer}   -- `stream/3', `ndjson/3'
%%   {sse, Producer}       -- `sse/3'
%%   {deferred, Resolver}  -- `stream_deferred/1'
%%   {file, Path, Range}   -- `file/3'
%%   {upgrade, Kind, St}   -- `upgrade/2'
%%   empty                 -- `empty/1', `redirect/3'
%%   taken_over            -- never constructed in this module.
%%                            NOTE(review): presumably set by code that
%%                            takes over the connection; confirm.
-type body() ::
    {full, iodata()}
    | {chunked, producer()}
    | {sse, producer()}
    | {deferred, fun(() -> deferred_decision())}
    | {file, file:name_all(), undefined | {non_neg_integer(), non_neg_integer() | eof}}
    | {upgrade, ws | wt, term()}
    | empty
    | taken_over.

%% A deferred response resolves to exactly one of these on first
%% emit, before any header is written. See `stream_deferred/1`.
%% Each shape is turned into a concrete response by `from_decision/1'.
-type deferred_decision() ::
    {stream, 100..599, [{header_name(), header_value()}], producer()}
    | {sse, 100..599, [{header_name(), header_value()}], producer()}
    | {ndjson, 100..599, [{header_name(), header_value()}], producer()}
    | {full, 100..599, [{header_name(), header_value()}], iodata()}.

%%====================================================================
%% Construction
%%====================================================================

-doc """
Response with `Status` and `Headers` and a zero-length full body.

Shorthand for `new(Status, Headers, {full, <<>>})`.
""".
-spec new(100..599, [{header_name(), header_value()}]) -> resp().
new(Status, Headers) ->
    ZeroLength = {full, <<>>},
    new(Status, Headers, ZeroLength).

-doc """
Response with `Status`, `Headers`, and an explicit body variant.

Header names are lowercased on the way in; header values and the body
are stored as given.
""".
-spec new(100..599, [{header_name(), header_value()}], body()) -> resp().
new(Status, Headers, Body) ->
    Lowered = normalize_headers(Headers),
    #livery_resp{status = Status, headers = Lowered, body = Body}.

%%====================================================================
%% Basic accessors and mutators
%%====================================================================

-doc "Status code stored in the response.".
-spec status(resp()) -> 100..599.
status(#livery_resp{status = Status}) -> Status.

-doc "Header list stored in the response, in stored order.".
-spec headers(resp()) -> [{header_name(), header_value()}].
headers(#livery_resp{headers = Headers}) -> Headers.

-doc "Body variant stored in the response.".
-spec body(resp()) -> body().
body(#livery_resp{body = Body}) -> Body.

-doc """
Trailers stored in the response: `undefined`, a list of `{Name, Value}`
pairs, or a zero-arity fun returning such a list. See `with_trailers/2`.
""".
-spec trailers(resp()) ->
    undefined
    | [{header_name(), header_value()}]
    | fun(() -> [{header_name(), header_value()}]).
trailers(#livery_resp{trailers = Trailers}) -> Trailers.

-doc "Early-response inbound-drain budget of the response. See `with_early_response_drain/2`.".
-spec early_response_drain(resp()) -> drain().
early_response_drain(#livery_resp{early_response_drain = Drain}) -> Drain.

-doc "Return `Resp` with its status replaced by `Status`.".
-spec with_status(100..599, resp()) -> resp().
with_status(Status, Resp) ->
    Resp#livery_resp{status = Status}.

-doc """
Set a header to a single value.

`Name` is lowercased; every stored entry with that exact lowercased name
is dropped and `{Name, Value}` is placed at the end of the header list.
""".
-spec with_header(header_name(), header_value(), resp()) -> resp().
with_header(Name, Value, #livery_resp{headers = Existing} = Resp) ->
    Key = lowercase(Name),
    Others = delete_all(Key, Existing),
    Resp#livery_resp{headers = Others ++ [{Key, Value}]}.

-doc """
Add a header at the end of the list without touching existing entries,
so a name may occur more than once. `Name` is lowercased.
""".
-spec append_header(header_name(), header_value(), resp()) -> resp().
append_header(Name, Value, #livery_resp{headers = Existing} = Resp) ->
    Entry = {lowercase(Name), Value},
    Resp#livery_resp{headers = Existing ++ [Entry]}.

-doc "Drop every stored header whose name equals the lowercased `Name`.".
-spec delete_header(header_name(), resp()) -> resp().
delete_header(Name, #livery_resp{headers = Existing} = Resp) ->
    Key = lowercase(Name),
    Remaining = delete_all(Key, Existing),
    Resp#livery_resp{headers = Remaining}.

-doc """
Set the response trailers.

`Trailers` is either a ready list of `{Name, Value}` pairs, a fun
`fun() -> [{Name, Value}]` that is evaluated lazily after the body has
been emitted, or `undefined`. The value is stored as given.
""".
-spec with_trailers(
    undefined
    | [{header_name(), header_value()}]
    | fun(() -> [{header_name(), header_value()}]),
    resp()
) -> resp().
with_trailers(Trailers, Resp) ->
    Updated = Resp#livery_resp{trailers = Trailers},
    Updated.

-doc """
Swap in a new body variant; status, headers, and trailers are untouched.

Intended for middleware (e.g. `livery_compress`) that rewrites the body
once the handler has produced its response.
""".
-spec with_body(body(), resp()) -> resp().
with_body(NewBody, Resp) ->
    Updated = Resp#livery_resp{body = NewBody},
    Updated.

-doc """
Set the `etag` header, replacing any existing one.

The tag goes through `format_etag/1`: already-quoted strong and weak
tags are kept as given, a bare token is wrapped in double quotes to form
a strong ETag.
""".
-spec with_etag(binary(), resp()) -> resp().
with_etag(Tag, Resp) ->
    Formatted = format_etag(Tag),
    with_header(<<"etag">>, Formatted, Resp).

-doc """
Set the `cache-control` header, replacing any existing one.

`Value` is either a binary used verbatim or a list of directives joined
with `, `. Directives are the atoms `no_cache`, `no_store`, `public`,
`private`, `immutable`, `must_revalidate`, `proxy_revalidate`,
`no_transform`, and the tuples `{max_age, Secs}`, `{s_maxage, Secs}`,
`{stale_while_revalidate, Secs}`, `{stale_if_error, Secs}`.
""".
-spec with_cache_control(binary() | [cache_directive()], resp()) -> resp().
with_cache_control(Value, Resp) ->
    Rendered = format_cache_control(Value),
    with_header(<<"cache-control">>, Rendered, Resp).

-doc """
Override the early-response inbound-drain budget for this response.

If a handler commits a response before it has read the request body
(e.g. a 413 for an oversized upload), HTTP/1.1 drains the leftover
inbound body before closing so the client gets to read the response.

- `default` keeps the listener's budget.
- `none` disables the drain; the connection closes immediately.
- `{MaxBytes, MaxMs}` (either part may be `infinity`) bounds the drain
  for this response only.

Honored on full responses; streaming responses use the listener budget.
""".
-spec with_early_response_drain(drain(), resp()) -> resp().
with_early_response_drain(Drain, Resp) ->
    Updated = Resp#livery_resp{early_response_drain = Drain},
    Updated.

%%====================================================================
%% Convenience builders
%%====================================================================

-doc "Plain-text response; `content-type` defaults to `text/plain; charset=utf-8`.".
-spec text(100..599, iodata()) -> resp().
text(Status, Body) ->
    text(Status, [], Body).

-doc """
`text/2` plus caller-supplied headers.

A `content-type` entry in `ExtraHeaders` (any letter case) suppresses
the default.
""".
-spec text(100..599, [{header_name(), header_value()}], iodata()) -> resp().
text(Status, ExtraHeaders, Body) ->
    Headers = with_default(<<"content-type">>, <<"text/plain; charset=utf-8">>, ExtraHeaders),
    new(Status, Headers, {full, Body}).

-doc "`text/3` followed by response options (e.g. `early_response_drain`).".
-spec text(100..599, [{header_name(), header_value()}], iodata(), resp_opts()) -> resp().
text(Status, ExtraHeaders, Body, Opts) ->
    Resp = text(Status, ExtraHeaders, Body),
    apply_opts(Resp, Opts).

-doc "HTML response; `content-type` defaults to `text/html; charset=utf-8`.".
-spec html(100..599, iodata()) -> resp().
html(Status, Body) ->
    html(Status, [], Body).

-doc """
`html/2` plus caller-supplied headers.

A `content-type` entry in `ExtraHeaders` (any letter case) suppresses
the default.
""".
-spec html(100..599, [{header_name(), header_value()}], iodata()) -> resp().
html(Status, ExtraHeaders, Body) ->
    Headers = with_default(<<"content-type">>, <<"text/html; charset=utf-8">>, ExtraHeaders),
    new(Status, Headers, {full, Body}).

-doc """
JSON response from bytes that are already encoded.

Livery ships no JSON codec for this builder: `Body` must be encoded
iodata. Helpers built on `thoas` or `jsx` can layer on top of it.
`content-type` defaults to `application/json`.
""".
-spec json(100..599, iodata()) -> resp().
json(Status, Body) ->
    json(Status, [], Body).

-doc """
`json/2` plus caller-supplied headers.

A `content-type` entry in `ExtraHeaders` (any letter case) suppresses
the default.
""".
-spec json(100..599, [{header_name(), header_value()}], iodata()) -> resp().
json(Status, ExtraHeaders, Body) ->
    Headers = with_default(<<"content-type">>, <<"application/json">>, ExtraHeaders),
    new(Status, Headers, {full, Body}).

-doc """
`json/3` followed by response options.

The `early_response_drain` key tunes the inbound drain for an early
reject, e.g.
`json(413, [], Body, #{early_response_drain => {16#400000, 5000}})`.
""".
-spec json(100..599, [{header_name(), header_value()}], iodata(), resp_opts()) -> resp().
json(Status, ExtraHeaders, Body, Opts) ->
    Resp = json(Status, ExtraHeaders, Body),
    apply_opts(Resp, Opts).

-doc "Response with only a status: no headers and the `empty` body variant.".
-spec empty(100..599) -> resp().
empty(Status) ->
    NoHeaders = [],
    new(Status, NoHeaders, empty).

-doc """
Chunked streaming response.

`Producer` must be a one-argument fun. It is handed a `SendFun` and
drives body chunks through it until it returns.
""".
-spec stream(100..599, [{header_name(), header_value()}], producer()) -> resp().
stream(Status, Headers, Producer) when is_function(Producer, 1) ->
    Body = {chunked, Producer},
    new(Status, Headers, Body).

-doc """
Build a response whose status, headers, and body shape are picked at the
first byte instead of at construction time.

With `stream/3`, `sse/3`, and `ndjson/3` the status and headers are fixed
when the response is built, so the status is on the wire before the
producer runs. That rules out "admit, then stream; if admission fails
before the first byte, answer with an error status": a saturated request
could only send `200 OK` followed by an in-band error frame, never `429`
with a JSON envelope.

`Resolver` is called once, in the handler's process, before any header
is written, and returns one of:

- `{stream, Status, Headers, Producer}`  -- like `stream/3`
- `{sse, Status, Headers, Producer}`     -- like `sse/3`
- `{ndjson, Status, Headers, Producer}`  -- like `ndjson/3`
- `{full, Status, Headers, Body}`        -- a one-shot full response

```erlang
livery_resp:stream_deferred(fun() ->
    case admit() of
        ok ->
            {sse, 200, [], fun(Emit) -> drive(Emit) end};
        pool_exhausted ->
            {full, 429, [{<<"content-type">>, <<"application/json">>}],
                <<"{\"error\":\"busy\"}">>}
    end
end).
```

Headers added by middleware wrapping the handler (request id, security
headers, CORS) are kept; on a name conflict the decision's own headers
win. An invalid decision crashes before any byte is sent, so it surfaces
as a clean 500.

The wrapper returned here carries status `200` and no headers of its
own; `resolve_deferred/2` builds the final response from the decision.
""".
-spec stream_deferred(fun(() -> deferred_decision())) -> resp().
stream_deferred(Resolver) when is_function(Resolver, 0) ->
    Body = {deferred, Resolver},
    new(200, [], Body).

-doc "Turn a deferred decision into a concrete response, with no outer headers.".
-spec resolve_deferred(deferred_decision()) -> resp().
resolve_deferred(Decision) ->
    NoOuterHeaders = [],
    resolve_deferred(NoOuterHeaders, Decision).

-doc """
Turn a deferred decision into a concrete response and merge
`OuterHeaders` underneath it.

`OuterHeaders` are typically the headers middleware added to the
deferred wrapper. The decision's own headers come first and win on a
name conflict; outer headers survive only for names the decision does
not set.
""".
-spec resolve_deferred([{header_name(), header_value()}], deferred_decision()) ->
    resp().
resolve_deferred(OuterHeaders, Decision) ->
    #livery_resp{headers = Own} = Concrete = from_decision(Decision),
    Concrete#livery_resp{headers = merge_under(OuterHeaders, Own)}.

%% Map each decision shape onto the builder of the same kind. Anything
%% that is not one of the four shapes fails with `function_clause'.
-spec from_decision(deferred_decision()) -> resp().
from_decision({full, Status, Headers, IoData}) ->
    %% One-shot body: no dedicated builder, so construct it directly.
    new(Status, Headers, {full, IoData});
from_decision({stream, Status, Headers, Fun}) ->
    stream(Status, Headers, Fun);
from_decision({sse, Status, Headers, Fun}) ->
    sse(Status, Headers, Fun);
from_decision({ndjson, Status, Headers, Fun}) ->
    ndjson(Status, Headers, Fun).

%% Merge two header lists, giving `Higher' (the decision) precedence.
%%
%% `Higher' is returned first and untouched. An entry from `Lower' is
%% kept, in its original relative order, only when its lowercased name
%% does not occur (lowercased) in `Higher'.
%%
%% Surviving `Lower' names are emitted lowercased. `with_header/3' and
%% `delete_header/2' compare stored names exactly against a lowercased
%% key, so a mixed-case name leaking into the merged list could later be
%% neither replaced nor deleted.
-spec merge_under(
    [{header_name(), header_value()}],
    [{header_name(), header_value()}]
) -> [{header_name(), header_value()}].
merge_under(Lower, Higher) ->
    Taken = [string:lowercase(Name) || {Name, _} <- Higher],
    Extra = [
        {LName, Value}
     || {Name, Value} <- Lower,
        %% Single-element generator: binds the lowercased name once.
        LName <- [string:lowercase(Name)],
        not lists:member(LName, Taken)
    ],
    Higher ++ Extra.

-doc "Server-Sent Events response with the default SSE headers.".
-spec sse(100..599, producer()) -> resp().
sse(Status, Producer) ->
    sse(Status, [], Producer).

-doc """
`sse/2` plus caller-supplied headers.

`content-type` defaults to `text/event-stream` and `cache-control` to
`no-cache`; an entry of the same name in `ExtraHeaders` suppresses the
corresponding default. `Producer` must be a one-argument fun.
""".
-spec sse(100..599, [{header_name(), header_value()}], producer()) -> resp().
sse(Status, ExtraHeaders, Producer) when is_function(Producer, 1) ->
    Defaults = [
        {<<"content-type">>, <<"text/event-stream">>},
        {<<"cache-control">>, <<"no-cache">>}
    ],
    Headers = lists:foldl(
        fun({Name, Value}, Acc) -> with_default(Name, Value, Acc) end,
        ExtraHeaders,
        Defaults
    ),
    new(Status, Headers, {sse, Producer}).

-doc """
Newline-delimited JSON streaming response.

`Producer` receives an `Emit` callback accepting any JSON-encodable
Erlang term. Every `Emit(Term)` encodes the term with the OTP `json`
module, appends a literal `\n`, and writes the result as one frame.
`content-type` defaults to `application/x-ndjson`.

```erlang
livery_resp:ndjson(200, fun(Emit) ->
    [Emit(#{n => N}) || N <- lists:seq(1, 5)],
    ok
end).
```

Use `stream/3` directly when the bytes are already encoded.
""".
-spec ndjson(100..599, producer()) -> resp().
ndjson(Status, Producer) ->
    ndjson(Status, [], Producer).

-doc """
`ndjson/2` plus caller-supplied headers.

A `content-type` entry in `ExtraHeaders` suppresses the default.
`Producer` must be a one-argument fun.
""".
-spec ndjson(100..599, [{header_name(), header_value()}], producer()) -> resp().
ndjson(Status, ExtraHeaders, Producer) when is_function(Producer, 1) ->
    Headers = with_default(<<"content-type">>, <<"application/x-ndjson">>, ExtraHeaders),
    new(Status, Headers, {chunked, ndjson_producer(Producer)}).

%% Adapt a term-level producer to the chunked body contract: the
%% returned fun takes the raw send callback and hands `Producer' a
%% callback that encodes one term per line before sending it.
-spec ndjson_producer(producer()) -> producer().
ndjson_producer(Producer) ->
    fun(Send) ->
        EmitLine = fun(Term) -> Send([json:encode(Term), <<"\n">>]) end,
        Producer(EmitLine)
    end.

-doc """
Serve a whole file from the filesystem.

Same as `file(Status, Path, undefined)`.
""".
-spec file(100..599, file:name_all()) -> resp().
file(Status, Path) ->
    WholeFile = undefined,
    file(Status, Path, WholeFile).

-doc """
Serve a file, optionally restricted to a byte range.

`Range` is `undefined` for the whole file or `{Offset, Length}`, where
`Length` may be `eof`. The response is built with no headers.
""".
-spec file(
    100..599,
    file:name_all(),
    undefined | {non_neg_integer(), non_neg_integer() | eof}
) -> resp().
file(Status, Path, Range) ->
    Body = {file, Path, Range},
    new(Status, [], Body).

-doc "Redirect to `Location` with the given 3xx status and no extra headers.".
-spec redirect(301 | 302 | 303 | 307 | 308, iodata()) -> resp().
redirect(Status, Location) ->
    redirect(Status, Location, []).

-doc """
`redirect/2` plus caller-supplied headers.

`Location` is flattened to a binary and placed first as the `location`
header, followed by `ExtraHeaders`. The body variant is `empty`.
""".
-spec redirect(
    301 | 302 | 303 | 307 | 308,
    iodata(),
    [{header_name(), header_value()}]
) -> resp().
redirect(Status, Location, ExtraHeaders) ->
    LocationHeader = {<<"location">>, iolist_to_binary(Location)},
    new(Status, [LocationHeader | ExtraHeaders], empty).

-doc """
Protocol upgrade response (`ws` for WebSocket, `wt` for WebTransport).

Built with status `101`, no headers, and an `{upgrade, Kind, State}`
body; `State` is carried through untouched.
""".
-spec upgrade(ws | wt, term()) -> resp().
upgrade(Kind, State) when Kind =:= ws orelse Kind =:= wt ->
    Body = {upgrade, Kind, State},
    #livery_resp{status = 101, headers = [], body = Body}.

%%====================================================================
%% Helpers
%%====================================================================

%% Apply response options to an already-built response. Only the
%% `early_response_drain' key is read; when it is absent the response is
%% returned unchanged.
-spec apply_opts(resp(), resp_opts()) -> resp().
apply_opts(Resp, Opts) ->
    apply_drain(maps:find(early_response_drain, Opts), Resp).

%% Dispatch on the `maps:find/2' result for the drain option.
-spec apply_drain({ok, drain()} | error, resp()) -> resp().
apply_drain({ok, Drain}, Resp) ->
    with_early_response_drain(Drain, Resp);
apply_drain(error, Resp) ->
    Resp.

%% Lowercase the names in `Headers' and make sure `Name' is present.
%% When no entry carries the lowercased `Name', `{Name, Default}' is
%% prepended; otherwise the lowercased list is returned as is.
-spec with_default(
    header_name(),
    header_value(),
    [{header_name(), header_value()}]
) ->
    [{header_name(), header_value()}].
with_default(Name, Default, Headers) ->
    Key = lowercase(Name),
    Lowered = normalize_headers(Headers),
    case lists:keymember(Key, 1, Lowered) of
        true -> Lowered;
        false -> [{Key, Default} | Lowered]
    end.

%% Lowercase every header name, keeping values and order. Elements that
%% are not 2-tuples are silently dropped.
-spec normalize_headers([{header_name(), header_value()}]) ->
    [{header_name(), header_value()}].
normalize_headers(Headers) ->
    lists:filtermap(
        fun
            ({Name, Value}) -> {true, {lowercase(Name), Value}};
            (_Other) -> false
        end,
        Headers
    ).

%% Lowercase a header name. Names that already consist only of `a-z',
%% `0-9', `-' and `:' are returned as the same binary; everything else
%% goes through `string:lowercase/1'.
-spec lowercase(binary()) -> binary().
lowercase(Name) when is_binary(Name) ->
    lowercase(Name, is_lower_ascii(Name)).

%% Second argument is the result of the fast-path check.
-spec lowercase(binary(), boolean()) -> binary().
lowercase(Name, true) ->
    Name;
lowercase(Name, false) ->
    iolist_to_binary(string:lowercase(Name)).

%% True when every byte is one of `a-z', `0-9', `-' or `:' (the empty
%% binary qualifies); false for any other byte or a non-binary argument.
-spec is_lower_ascii(binary()) -> boolean().
is_lower_ascii(<<C, Rest/binary>>) when
    C >= $a, C =< $z;
    C >= $0, C =< $9;
    C =:= $-;
    C =:= $:
->
    is_lower_ascii(Rest);
is_lower_ascii(<<>>) ->
    true;
is_lower_ascii(_) ->
    false.

%% Remove every pair whose first element is exactly `Key', preserving
%% the order of the rest. Elements that are not 2-tuples are dropped.
-spec delete_all(binary(), [{binary(), term()}]) -> [{binary(), term()}].
delete_all(Key, Pairs) ->
    lists:filter(
        fun
            ({Name, _}) -> Name =/= Key;
            (_Other) -> false
        end,
        Pairs
    ).

%% Shape a value as an entity-tag.
%%
%% A value starting with `W/"' (weak tag) or with `"' (strong tag) is
%% returned untouched. Anything else is treated as a bare token and
%% wrapped in double quotes, producing a strong tag.
%%
%% The weak clause requires the opening quote after `W/': a bare token
%% that merely begins with `W/' (e.g. `W/abc') is not a weak tag and is
%% wrapped like any other token instead of being emitted as a malformed
%% ETag.
-spec format_etag(binary()) -> binary().
format_etag(<<"W/\"", _/binary>> = Weak) -> Weak;
format_etag(<<$", _/binary>> = Quoted) -> Quoted;
format_etag(Tag) -> <<$", Tag/binary, $">>.

%% Render a `Cache-Control' value: a directive list is rendered element
%% by element and joined with `, '; a binary is returned verbatim.
-spec format_cache_control(binary() | [cache_directive()]) -> binary().
format_cache_control(Directives) when is_list(Directives) ->
    Rendered = lists:map(fun cc_directive/1, Directives),
    iolist_to_binary(lists:join(<<", ">>, Rendered));
format_cache_control(Verbatim) when is_binary(Verbatim) ->
    Verbatim.

%% Render one directive. Tuple forms become `name=Secs'; atom forms
%% become the directive name with hyphens in place of underscores.
-spec cc_directive(cache_directive()) -> binary().
cc_directive({max_age, Secs}) ->
    cc_seconds(<<"max-age=">>, Secs);
cc_directive({s_maxage, Secs}) ->
    cc_seconds(<<"s-maxage=">>, Secs);
cc_directive({stale_while_revalidate, Secs}) ->
    cc_seconds(<<"stale-while-revalidate=">>, Secs);
cc_directive({stale_if_error, Secs}) ->
    cc_seconds(<<"stale-if-error=">>, Secs);
cc_directive(no_cache) ->
    <<"no-cache">>;
cc_directive(no_store) ->
    <<"no-store">>;
cc_directive(public) ->
    <<"public">>;
cc_directive(private) ->
    <<"private">>;
cc_directive(immutable) ->
    <<"immutable">>;
cc_directive(must_revalidate) ->
    <<"must-revalidate">>;
cc_directive(proxy_revalidate) ->
    <<"proxy-revalidate">>;
cc_directive(no_transform) ->
    <<"no-transform">>.

%% Append the decimal rendering of `Secs' to a `name=' prefix.
-spec cc_seconds(binary(), non_neg_integer()) -> binary().
cc_seconds(Prefix, Secs) ->
    <<Prefix/binary, (integer_to_binary(Secs))/binary>>.