Skip to main content

src/livery.erl

-module(livery).
-moduledoc """
Public Livery facade.

Holds the user-visible API for service lifecycle plus the shared
response-emission walker that every adapter calls back into.
""".

-include("livery.hrl").

-export([
    start_listener/2,
    stop_listener/1,
    start_service/1,
    stop_service/1,
    drain/1,
    drain/2,
    which_listeners/1,
    router_handler/1,
    router_handler/2,
    dispatch/3,
    emit/3
]).

%%====================================================================
%% Service lifecycle
%%====================================================================

-doc """
Start a single-protocol listener. Useful for serving over just
one wire; for multi-protocol services with Alt-Svc, use
`start_service/1`.

`Name` selects the adapter (`livery_h1`, `livery_h2`, or
`livery_h3`). `Opts` is the adapter's `listen_opts()` map.
""".
-spec start_listener(atom(), map()) -> {ok, term()} | {error, term()}.
start_listener(livery_h1, Opts) -> livery_h1:start(Opts);
start_listener(livery_h2, Opts) -> livery_h2:start(Opts);
start_listener(livery_h3, Opts) -> livery_h3:start(Opts);
start_listener(_Name, _Opts) -> {error, unknown_adapter}.

-doc "Stop a single-protocol listener by adapter and handle.".
-spec stop_listener(
    {livery_h1, livery_h1:listener()}
    | {livery_h2, livery_h2:listener()}
    | {livery_h3, livery_h3:listener()}
    | term()
) -> ok | {error, term()}.
stop_listener({livery_h1, Ref}) -> livery_h1:stop(Ref);
stop_listener({livery_h2, Ref}) -> livery_h2:stop(Ref);
stop_listener({livery_h3, Ref}) -> livery_h3:stop(Ref);
stop_listener(_) -> {error, unknown_listener}.

-doc """
Start the full service: H3 on UDP, H2 on TLS, H1 on TCP, sharing
one middleware stack and handler, optionally advertising Alt-Svc
on H1 and H2 responses.

See `livery_service:start_link/1` for the opts shape.
""".
-spec start_service(livery_service:service_opts()) ->
    {ok, pid()} | {error, term()}.
start_service(Opts) ->
    livery_service:start_link(Opts).

-doc "Stop a running service by pid (immediate; cuts off in-flight).".
-spec stop_service(pid()) -> ok.
stop_service(Pid) when is_pid(Pid) ->
    livery_service:stop(Pid).

-doc "Gracefully drain and stop a service. See `livery_drain:drain/1`.".
-spec drain(pid()) -> ok | {error, timeout}.
drain(Pid) when is_pid(Pid) ->
    livery_drain:drain(Pid).

-doc """
Gracefully drain and stop a service: stop accepting new
connections, wait up to the timeout for in-flight requests to
finish, then stop. See `livery_drain:drain/2`.
""".
-spec drain(pid(), livery_drain:opts()) -> ok | {error, timeout}.
drain(Pid, Opts) when is_pid(Pid) ->
    livery_drain:drain(Pid, Opts).

-doc """
List the bound ports of a running service, keyed by protocol.

Returns a map containing only the protocols that were configured.
""".
-spec which_listeners(pid()) -> #{h1 | h2 | h3 => inet:port_number()}.
which_listeners(Pid) when is_pid(Pid) ->
    livery_service:which_listeners(Pid).

%%====================================================================
%% Router dispatch
%%====================================================================

-doc """
Turn a compiled router into a request handler.

Returns a `fun((livery_req:req()) -> livery_resp:resp())` that
matches the request's method and path against `Router`, sets the
captured path parameters as bindings, and invokes the matched
route handler. Unmatched paths get `404`; a path that exists for a
different method gets `405` with an `Allow` header.

Pass the result as the `handler` for a listener, or give the
router directly to `start_service/1` (which calls this for you).
""".
-spec router_handler(livery_router:router()) ->
    fun((livery_req:req()) -> livery_resp:resp()).
router_handler(Router) ->
    router_handler(Router, #{}).

-doc """
`router_handler/1` with fallbacks.

`Opts` may set `not_found => fun((Req) -> Resp)` and
`method_not_allowed => fun((Req, [Method]) -> Resp)` to override the
default `404`/`405` responses.
""".
-spec router_handler(livery_router:router(), map()) ->
    fun((livery_req:req()) -> livery_resp:resp()).
router_handler(Router, Opts) ->
    NotFound = maps:get(not_found, Opts, fun default_not_found/1),
    NotAllowed = maps:get(
        method_not_allowed,
        Opts,
        fun default_method_not_allowed/2
    ),
    fun(Req) ->
        Method = livery_req:method(Req),
        Path = livery_req:path(Req),
        case livery_router:match(Method, Path, Router) of
            {ok, Handler, Bindings, Meta} ->
                Req1 = livery_req:set_bindings(Bindings, Req),
                %% A route may carry its own middleware stack under
                %% `Meta`'s `middleware' key; it runs inside any
                %% service-level stack, just for this route.
                dispatch(route_stack(Meta), Handler, Req1);
            {error, not_found} ->
                NotFound(Req);
            {error, {method_not_allowed, Methods}} ->
                NotAllowed(Req, Methods)
        end
    end.

-spec route_stack(term()) -> livery_middleware:stack().
route_stack(Meta) when is_map(Meta) ->
    maps:get(middleware, Meta, []);
route_stack(_Meta) ->
    [].

-spec default_not_found(livery_req:req()) -> livery_resp:resp().
default_not_found(_Req) ->
    livery_resp:text(404, <<"not found">>).

-spec default_method_not_allowed(
    livery_req:req(),
    [binary() | '_']
) -> livery_resp:resp().
default_method_not_allowed(_Req, Methods) ->
    Allow = [M || M <- Methods, M =/= '_'],
    Resp = livery_resp:text(405, <<"method not allowed">>),
    livery_resp:with_header(
        <<"allow">>,
        iolist_to_binary(lists:join(<<", ">>, Allow)),
        Resp
    ).

%%====================================================================
%% Dispatch and emit
%%====================================================================

-doc """
Run a middleware stack and handler against a request value.

Pure dispatch: returns the `#livery_resp{}` value produced by the
pipeline. Adapters generally invoke this from a per-request process
and then call `emit/3` to write the response back to the wire.
""".
-spec dispatch(
    livery_middleware:stack(),
    livery_middleware:handler(),
    livery_req:req()
) -> livery_resp:resp().
dispatch(Stack, Handler, Req) ->
    livery_middleware:run(Stack, Handler, Req).

-doc """
Walk a response body variant and drive the adapter callbacks.

Called once a handler has returned. The walker emits status and
headers, then iterates the body variant (`full`, `chunked`, `sse`,
`file`, `empty`, or `upgrade`) into `Adapter:send_headers/4`,
`Adapter:send_data/3`, and `Adapter:send_trailers/2`. Errors from
the adapter are propagated by stopping the walk and returning the
error tuple.

A `{file, Path, Range}` body is streamed from the filesystem in
64 KiB chunks. `Content-Length` is set from the resolved segment
(unless the handler already set it); a byte range adds a
`Content-Range` header. A missing file emits `404`, an
unsatisfiable range emits `416`.
""".
-spec emit(module(), livery_adapter:stream(), livery_resp:resp()) ->
    ok | {error, term()}.
emit(Adapter, Stream, #livery_resp{} = Resp) ->
    Status = livery_resp:status(Resp),
    Headers = livery_resp:headers(Resp),
    Body = livery_resp:body(Resp),
    Trailers = livery_resp:trailers(Resp),
    EndOpts = drain_opts(livery_resp:early_response_drain(Resp)),
    emit_body(Adapter, Stream, Status, Headers, Body, Trailers, EndOpts).

%% Base send-opts carrying the per-response early-response drain budget.
%% `default' adds nothing, so the adapter falls back to the listener
%% budget; any other value is passed through for the adapter to honor on
%% the stream-terminating call.
drain_opts(default) -> #{};
drain_opts(Drain) -> #{early_response_drain => Drain}.

emit_body(_Adapter, _Stream, _Status, _Hs, taken_over, _Trailers, _EndOpts) ->
    %% Stream/socket was handed off (e.g. via livery_ws:upgrade/3).
    %% The adapter no longer owns it; nothing more to emit.
    ok;
emit_body(Adapter, Stream, Status, Hs, empty, _Trailers, EndOpts) ->
    Adapter:send_headers(Stream, Status, Hs, EndOpts#{end_stream => true});
emit_body(Adapter, Stream, Status, Hs, {full, IoData}, Trailers, EndOpts) ->
    HasTrailers = Trailers =/= undefined,
    case iolist_size(IoData) of
        0 when not HasTrailers ->
            Adapter:send_headers(Stream, Status, Hs, EndOpts#{end_stream => true});
        0 ->
            case Adapter:send_headers(Stream, Status, Hs, #{end_stream => false}) of
                ok -> emit_trailers(Adapter, Stream, Trailers);
                Other -> Other
            end;
        _ ->
            CanCoalesce =
                not HasTrailers andalso
                    erlang:function_exported(Adapter, send_full, 5),
            case CanCoalesce of
                true ->
                    %% Coalesce headers + body into one adapter call (and
                    %% one socket write) when the adapter supports it.
                    Adapter:send_full(Stream, Status, Hs, IoData, EndOpts#{end_stream => true});
                false ->
                    emit_full_granular(Adapter, Stream, Status, Hs, IoData, Trailers)
            end
    end;
emit_body(Adapter, Stream, Status, Hs, {chunked, Producer}, Trailers, _EndOpts) ->
    case Adapter:send_headers(Stream, Status, Hs, #{end_stream => false}) of
        ok ->
            Emit = fun(Chunk) ->
                Adapter:send_data(Stream, Chunk, #{end_stream => false})
            end,
            finish_stream(Adapter, Stream, Producer(Emit), Trailers);
        Other ->
            Other
    end;
emit_body(Adapter, Stream, Status, Hs, {sse, Producer}, Trailers, _EndOpts) ->
    case Adapter:send_headers(Stream, Status, Hs, #{end_stream => false}) of
        ok ->
            Emit = fun(Event) ->
                Adapter:send_data(
                    Stream,
                    sse_frame(Event),
                    #{end_stream => false}
                )
            end,
            finish_stream(Adapter, Stream, Producer(Emit), Trailers);
        Other ->
            Other
    end;
emit_body(Adapter, Stream, _Status, Hs, {deferred, Resolver}, _Trailers, _EndOpts) ->
    %% The resolver runs here, in the worker, before any header write,
    %% so it can still choose the status and body shape. The outer Hs
    %% carries any headers added by wrapping middleware; the decision
    %% wins on a name conflict.
    Resolved = livery_resp:resolve_deferred(Hs, Resolver()),
    emit(Adapter, Stream, Resolved);
emit_body(Adapter, Stream, Status, Hs, {file, Path, Range}, Trailers, _EndOpts) ->
    case file_segment(Path, Range) of
        {error, enoent} ->
            Adapter:send_headers(Stream, 404, [], #{end_stream => true});
        {error, range_not_satisfiable} ->
            Adapter:send_headers(Stream, 416, [], #{end_stream => true});
        {error, Reason} ->
            Adapter:reset(Stream, Reason),
            {error, Reason};
        {ok, Offset, Length, FileSize} ->
            {Status1, Hs1} = file_headers(
                Status,
                Hs,
                Offset,
                Length,
                FileSize,
                Range
            ),
            HasTrailers = Trailers =/= undefined,
            case Length of
                0 when not HasTrailers ->
                    Adapter:send_headers(
                        Stream,
                        Status1,
                        Hs1,
                        #{end_stream => true}
                    );
                0 ->
                    case
                        Adapter:send_headers(
                            Stream,
                            Status1,
                            Hs1,
                            #{end_stream => false}
                        )
                    of
                        ok -> emit_trailers(Adapter, Stream, Trailers);
                        Other -> Other
                    end;
                _ ->
                    case
                        Adapter:send_headers(
                            Stream,
                            Status1,
                            Hs1,
                            #{end_stream => false}
                        )
                    of
                        ok ->
                            stream_file(
                                Adapter,
                                Stream,
                                Path,
                                Offset,
                                Length,
                                Trailers
                            );
                        Other ->
                            Other
                    end
            end
    end;
emit_body(Adapter, Stream, _Status, _Hs, {upgrade, _Kind, _State}, _Trailers, _EndOpts) ->
    %% Upgrades are handled at the adapter level (livery_ws, livery_wt).
    Adapter:reset(Stream, upgrade_not_handled_at_emit),
    {error, not_implemented}.

%% Granular full-body emit: separate headers then body, closing the
%% stream after the body unless trailers follow. Used when the adapter
%% does not export the coalesced `send_full/5'.
emit_full_granular(Adapter, Stream, Status, Hs, IoData, Trailers) ->
    case Adapter:send_headers(Stream, Status, Hs, #{end_stream => false}) of
        ok ->
            EndStream = Trailers =:= undefined,
            case Adapter:send_data(Stream, IoData, #{end_stream => EndStream}) of
                ok -> emit_trailers(Adapter, Stream, Trailers);
                Other -> Other
            end;
        Other ->
            Other
    end.

emit_trailers(_Adapter, _Stream, undefined) ->
    ok;
emit_trailers(Adapter, Stream, Trailers) when is_list(Trailers) ->
    Adapter:send_trailers(Stream, Trailers);
emit_trailers(Adapter, Stream, Fun) when is_function(Fun, 0) ->
    Adapter:send_trailers(Stream, Fun()).

%% Close a chunked/SSE stream once its producer returns. A producer
%% that reports a failed send (the client is gone) short-circuits the
%% terminal write and surfaces the error.
finish_stream(_Adapter, _Stream, {error, _} = Err, _Trailers) ->
    Err;
finish_stream(Adapter, Stream, _ProducerResult, Trailers) ->
    close_stream(Adapter, Stream, Trailers).

close_stream(Adapter, Stream, undefined) ->
    Adapter:send_data(Stream, <<>>, #{end_stream => true});
close_stream(Adapter, Stream, Trailers) ->
    emit_trailers(Adapter, Stream, Trailers).

%%====================================================================
%% File emission
%%====================================================================

-define(FILE_CHUNK_SIZE, 65536).

-type trailers() ::
    undefined
    | [{binary(), binary()}]
    | fun(() -> [{binary(), binary()}]).

-spec file_segment(
    file:name_all(),
    undefined | {non_neg_integer(), non_neg_integer() | eof}
) ->
    {ok, non_neg_integer(), non_neg_integer(), non_neg_integer()}
    | {error, enoent | not_a_regular_file | range_not_satisfiable}.
file_segment(Path, Range) ->
    case filelib:is_regular(Path) of
        true ->
            resolve_range(Range, filelib:file_size(Path));
        false ->
            case filelib:is_file(Path) of
                true -> {error, not_a_regular_file};
                false -> {error, enoent}
            end
    end.

-spec resolve_range(
    undefined | {non_neg_integer(), non_neg_integer() | eof},
    non_neg_integer()
) ->
    {ok, non_neg_integer(), non_neg_integer(), non_neg_integer()}
    | {error, range_not_satisfiable}.
resolve_range(undefined, Size) ->
    {ok, 0, Size, Size};
resolve_range({Offset, eof}, Size) when
    is_integer(Offset), Offset >= 0, Offset =< Size
->
    {ok, Offset, Size - Offset, Size};
resolve_range({Offset, Length}, Size) when
    is_integer(Offset),
    is_integer(Length),
    Offset >= 0,
    Length >= 0,
    Offset =< Size
->
    {ok, Offset, min(Length, Size - Offset), Size};
resolve_range(_Range, _Size) ->
    {error, range_not_satisfiable}.

-spec file_headers(
    100..599,
    [{binary(), iodata()}],
    non_neg_integer(),
    non_neg_integer(),
    non_neg_integer(),
    undefined | {non_neg_integer(), non_neg_integer() | eof}
) ->
    {100..599, [{binary(), iodata()}]}.
file_headers(Status, Hs, _Offset, Length, _FileSize, undefined) ->
    {Status, set_content_length(Hs, Length)};
file_headers(Status, Hs, _Offset, 0, _FileSize, _Range) ->
    {Status, set_content_length(Hs, 0)};
file_headers(Status, Hs, Offset, Length, FileSize, _Range) ->
    Hs1 = set_content_length(Hs, Length),
    Last = Offset + Length - 1,
    CR = iolist_to_binary([
        <<"bytes ">>,
        integer_to_binary(Offset),
        <<"-">>,
        integer_to_binary(Last),
        <<"/">>,
        integer_to_binary(FileSize)
    ]),
    {Status, [{<<"content-range">>, CR} | Hs1]}.

-spec set_content_length([{binary(), iodata()}], non_neg_integer()) ->
    [{binary(), iodata()}].
set_content_length(Hs, Length) ->
    case has_header(<<"content-length">>, Hs) of
        true -> Hs;
        false -> [{<<"content-length">>, integer_to_binary(Length)} | Hs]
    end.

-spec has_header(binary(), [{binary(), iodata()}]) -> boolean().
has_header(Name, Hs) ->
    Lower = string:lowercase(Name),
    lists:any(fun({K, _}) -> string:lowercase(K) =:= Lower end, Hs).

-spec stream_file(
    module(),
    livery_adapter:stream(),
    file:name_all(),
    non_neg_integer(),
    non_neg_integer(),
    trailers()
) -> ok | {error, term()}.
stream_file(Adapter, Stream, Path, Offset, Length, Trailers) ->
    case file:open(Path, [read, raw, binary]) of
        {ok, Fd} ->
            try
                {ok, _} = file:position(Fd, Offset),
                send_file_chunks(Adapter, Stream, Fd, Length, Trailers)
            after
                file:close(Fd)
            end;
        {error, Reason} ->
            Adapter:reset(Stream, Reason),
            {error, Reason}
    end.

-spec send_file_chunks(
    module(),
    livery_adapter:stream(),
    file:io_device(),
    non_neg_integer(),
    trailers()
) ->
    ok | {error, term()}.
send_file_chunks(Adapter, Stream, Fd, Remaining, Trailers) ->
    HasTrailers = Trailers =/= undefined,
    case file:read(Fd, min(Remaining, ?FILE_CHUNK_SIZE)) of
        eof ->
            close_stream(Adapter, Stream, Trailers);
        {ok, Data} ->
            Rest = Remaining - byte_size(Data),
            Last = Rest =< 0,
            EndStream = Last andalso not HasTrailers,
            case Adapter:send_data(Stream, Data, #{end_stream => EndStream}) of
                ok when Last andalso HasTrailers ->
                    emit_trailers(Adapter, Stream, Trailers);
                ok when Last ->
                    ok;
                ok ->
                    send_file_chunks(Adapter, Stream, Fd, Rest, Trailers);
                Other ->
                    Other
            end;
        {error, Reason} ->
            Adapter:reset(Stream, Reason),
            {error, Reason}
    end.

%%====================================================================
%% SSE framing (RFC text/event-stream)
%%====================================================================

-spec sse_frame(map() | iodata()) -> iodata().
sse_frame(#{data := Data} = E) ->
    Event = maps:get(event, E, undefined),
    Id = maps:get(id, E, undefined),
    Retry = maps:get(retry, E, undefined),
    [
        maybe_field(<<"event">>, Event),
        maybe_field(<<"id">>, Id),
        maybe_field(<<"retry">>, Retry),
        data_lines(Data),
        <<"\n">>
    ];
sse_frame(IoData) ->
    [<<"data: ">>, IoData, <<"\n\n">>].

maybe_field(_, undefined) -> [];
maybe_field(Name, Value) -> [Name, <<": ">>, to_iodata(Value), <<"\n">>].

data_lines(B) when is_binary(B) ->
    [<<"data: ">>, B, <<"\n">>];
data_lines(L) when is_list(L) ->
    [<<"data: ">>, L, <<"\n">>].

to_iodata(B) when is_binary(B) -> B;
to_iodata(L) when is_list(L) -> L;
to_iodata(A) when is_atom(A) -> atom_to_binary(A);
to_iodata(I) when is_integer(I) -> integer_to_binary(I).