%% src/client/livery_client.erl

-module(livery_client).
-moduledoc """
A composable HTTP client: the outbound twin of the server middleware.

Build a client once with `new/1` (a transport adapter, a base URL,
default headers, and a layer stack), then call it with `get/2`, `post/3`,
`request/3,4`. Layers run outermost-first and each is
`call(Request, Next, State) -> {ok, response()} | {error, term()}`, the
same shape as server middleware, with errors threaded as values.

```erlang
Client = livery_client:new(#{
    base_url => <<"https://api.example.com">>,
    stack    => [
        livery_client:timeout(5000),
        livery_client:retry(#{max => 3}),
        livery_client:circuit_breaker(#{name => api}),
        livery_client:concurrency(50)
    ]
}),
{ok, Resp} = livery_client:get(Client, <<"/users/42">>),
200 = livery_client:status(Resp).
```

The transport is a `livery_client_adapter`; the default,
`livery_client_hackney`, speaks HTTP/1.1, HTTP/2, and HTTP/3.
""".

-export([new/1]).
-export([get/2, post/3, put/3, delete/2, request/3, request/4, run/2]).
-export([status/1, headers/1, header/2, header/3, body/1, method/1, url/1, set_header/3]).
-export([read/2, read_body/1]).
-export([stream_next/1, stop_stream/1]).
-export([timeout/1, concurrency/1, retry/1, circuit_breaker/1, balance/1]).
-export([cookie_jar/0, cookie_jar/1]).
-export([add_endpoint/2, remove_endpoint/2]).
-export([before/1, after_response/1, wrap/1]).
-export([rebase/2]).

-export_type([
    client/0, request/0, response/0, result/0, next/0, entry/0, stack/0, endpoint/0, stream_ref/0
]).

-type request() :: #{
    method := atom() | binary(),
    url := binary(),
    headers := [{binary(), binary()}],
    body := empty | {full, iodata()} | {stream, fun()},
    timeout := timeout(),
    stream := boolean(),
    stream_to := pid() | undefined,
    flow := auto | manual,
    meta := map()
}.
-type response() :: #{
    status := 100..599 | undefined,
    headers := [{binary(), binary()}],
    body := {full, binary()} | {stream, term()} | {push, stream_ref()}
}.
-opaque stream_ref() :: {livery_stream, module(), term()}.
-type result() :: {ok, response()} | {error, term()}.
-type endpoint() :: binary().
-type next() :: fun((request()) -> result()).
-type entry() :: {module(), term()} | fun((request(), next()) -> result()).
-type stack() :: [entry()].
-opaque client() :: #{
    adapter := module(),
    adapter_opts := map(),
    base_url := binary(),
    headers := [{binary(), binary()}],
    stack := stack()
}.

%%====================================================================
%% Build and call
%%====================================================================

-doc """
Build a client from an options map.

Recognised keys, all optional:

- `adapter`: transport module (default `livery_client_hackney`)
- `adapter_opts`: map passed to the adapter on every call (default `#{}`)
- `base_url`: prefix joined onto request paths (default `<<>>`)
- `headers`: default headers applied to every request (default `[]`)
- `stack`: the layers (default `[]`)

Any other key in `Opts` is ignored.
""".
-spec new(map()) -> client().
new(Opts) ->
    Pick = fun(Key, Default) -> maps:get(Key, Opts, Default) end,
    Adapter = Pick(adapter, livery_client_hackney),
    AdapterOpts = Pick(adapter_opts, #{}),
    BaseUrl = Pick(base_url, <<>>),
    Headers = Pick(headers, []),
    Stack = Pick(stack, []),
    #{
        adapter => Adapter,
        adapter_opts => AdapterOpts,
        base_url => BaseUrl,
        headers => Headers,
        stack => Stack
    }.

-doc "Send a `get` request for `Path` with default options.".
-spec get(client(), binary()) -> result().
get(Client, Path) ->
    request(Client, get, Path).

-doc "Send a `post` request for `Path` carrying `Body` as a `{full, Body}` body.".
-spec post(client(), binary(), iodata()) -> result().
post(Client, Path, Body) ->
    WithBody = #{body => {full, Body}},
    request(Client, post, Path, WithBody).

-doc "Send a `put` request for `Path` carrying `Body` as a `{full, Body}` body.".
-spec put(client(), binary(), iodata()) -> result().
put(Client, Path, Body) ->
    WithBody = #{body => {full, Body}},
    request(Client, put, Path, WithBody).

-doc "Send a `delete` request for `Path` with default options.".
-spec delete(client(), binary()) -> result().
delete(Client, Path) ->
    request(Client, delete, Path).

-doc "Send a request with an arbitrary `Method` and default options.".
-spec request(client(), atom() | binary(), binary()) -> result().
request(Client, Method, Path) ->
    NoOpts = #{},
    request(Client, Method, Path, NoOpts).

-doc """
Build a request from `Method`, `Path` and `Opts`, then run it.

`Opts` keys, all optional:

- `body`: `iodata`, `{full, _}` or `{stream, Fun}`
- `headers`: per-request headers
- `timeout`
- `stream`: `true` to receive a `{stream, Reader}` response body
- `stream_to`, `flow`: push streaming, see below
- `meta`

## Push streaming

Set `stream_to` to a pid and the response is delivered to that pid as
ordered messages. The pid can then selectively receive body chunks next to
its own control messages, so no process has to be dedicated to a pull loop.
The reply is `{ok, #{body := {push, Ref}}}` and the recipient receives

```erlang
{livery_response, Ref, {status, Status, Headers}}
{livery_response, Ref, {chunk, Binary}}      %% zero or more
{livery_response, Ref, done}
{livery_response, Ref, {error, Reason}}       %% instead of done, on failure
```

With `flow => auto` (the default) chunks are pushed as fast as the wire
allows. With `flow => manual` one chunk is sent per `stream_next/1` call,
which gives the recipient backpressure. `stop_stream/1` cancels the stream
and drops its connection.

Push streaming bypasses the layer stack; the adapter owns the connection.
""".
-spec request(client(), atom() | binary(), binary(), map()) -> result().
request(Client, Method, Path, Opts) ->
    Req = build_request(Client, Method, Path, Opts),
    run(Client, Req).

-doc """
Run a fully built request.

A request whose `stream_to` is a pid is handed straight to the adapter as a
push stream and skips the layer stack. Any other request goes through the
client's layer stack and ends at the adapter's `request/2`.
""".
-spec run(client(), request()) -> result().
run(Client, Req) ->
    #{adapter := Adapter, adapter_opts := AdapterOpts, stack := Stack} = Client,
    StreamTo = maps:get(stream_to, Req, undefined),
    if
        is_pid(StreamTo) ->
            start_push(Adapter, AdapterOpts, Req, StreamTo);
        true ->
            %% Innermost handler: the transport call that ends the stack.
            Transport = fun(Final) -> Adapter:request(Final, AdapterOpts) end,
            run_stack(Stack, Transport, Req)
    end.

%% Ask the adapter to start a push stream delivering to `Pid`, using the
%% request's `flow` (default `auto`). On success the reply is a placeholder
%% response: no status, no headers, and a `{push, Ref}` body. An adapter
%% error is returned unchanged.
start_push(Adapter, AdapterOpts, Req, Pid) ->
    Flow = maps:get(flow, Req, auto),
    Started = Adapter:stream(Req, AdapterOpts, #{stream_to => Pid, flow => Flow}),
    case Started of
        {error, _} ->
            Started;
        {ok, Ref} ->
            {ok, #{status => undefined, headers => [], body => {push, Ref}}}
    end.

%%====================================================================
%% Accessors
%%====================================================================

-doc """
The status of a response.

This is `undefined` for the placeholder response of a push stream
(`body := {push, Ref}`); there the status is delivered to the recipient in
the `{livery_response, Ref, {status, Status, Headers}}` message.
""".
-spec status(response()) -> 100..599 | undefined.
status(#{status := S}) -> S.

-doc "All headers of a request or response, in stored order.".
-spec headers(request() | response()) -> [{binary(), binary()}].
headers(#{headers := Hs}) ->
    Hs.

-doc "Value of the first header named `Name` (case-insensitive), or `undefined`.".
-spec header(binary(), request() | response()) -> binary() | undefined.
header(Name, Map) ->
    header(Name, Map, undefined).

-doc """
Value of the first header whose name equals `Name` ignoring case, or
`Default` when there is none.
""".
-spec header(binary(), request() | response(), Default) -> binary() | Default.
header(Name, #{headers := Hs}, Default) ->
    Wanted = string:lowercase(Name),
    IsOther = fun({Key, _}) -> string:lowercase(Key) =/= Wanted end,
    %% Skip non-matching headers; whatever is left starts with the first match.
    case lists:dropwhile(IsOther, Hs) of
        [{_, Value} | _] -> Value;
        [] -> Default
    end.

-doc """
The body of a response: `{full, Binary}`, `{stream, Reader}`, or
`{push, Ref}` for the placeholder response of a push stream.
""".
-spec body(response()) -> {full, binary()} | {stream, term()} | {push, stream_ref()}.
body(#{body := B}) -> B.

-doc "The method of a request, exactly as stored in its `method` key.".
-spec method(request()) -> atom() | binary().
method(#{method := Method}) ->
    Method.

-doc "The URL of a request, exactly as stored in its `url` key.".
-spec url(request()) -> binary().
url(#{url := Url}) ->
    Url.

-doc """
Set header `Name` to `Value` on a request. Every existing header with the
same name (ignoring case) is removed and the new pair is put at the front
of the header list.
""".
-spec set_header(binary(), binary(), request()) -> request().
set_header(Name, Value, #{headers := Existing} = Req) ->
    Wanted = string:lowercase(Name),
    Others = [Pair || {Key, _} = Pair <- Existing, string:lowercase(Key) =/= Wanted],
    Req#{headers := [{Name, Value} | Others]}.

%%====================================================================
%% Streamed response bodies
%%====================================================================

-doc """
Pull the next chunk of a `{stream, Reader}` response body.

The reader is an `{Adapter, State}` pair and the call is forwarded to
`Adapter:read(State, Timeout)`. `{ok, Chunk, Reader1}` and `{done, Reader1}`
carry the reader rebuilt around the adapter's new state; an
`{error, Reason}` from the adapter is returned unchanged.
""".
-spec read(term(), timeout()) -> {ok, binary(), term()} | {done, term()} | {error, term()}.
read({Adapter, State}, Timeout) ->
    Pulled = Adapter:read(State, Timeout),
    case Pulled of
        {error, _} -> Pulled;
        {done, Final} -> {done, {Adapter, Final}};
        {ok, Chunk, Next} -> {ok, Chunk, {Adapter, Next}}
    end.

-doc """
Drain a `{stream, Reader}` response body into a single binary.

Chunks are pulled with `read/2` using a 30000 timeout per chunk until the
reader reports `done`. The first `{error, Reason}` ends the drain and is
returned as is.
""".
-spec read_body(term()) -> {ok, binary()} | {error, term()}.
read_body(Reader) ->
    read_body(Reader, []).

%% `Collected` is an iolist that grows on the right, so it is already in
%% arrival order when it is flattened.
read_body(Reader, Collected) ->
    case read(Reader, 30000) of
        {error, _} = Failure -> Failure;
        {done, _Finished} -> {ok, iolist_to_binary(Collected)};
        {ok, Chunk, Next} -> read_body(Next, [Collected, Chunk])
    end.

-doc """
Request one more `{chunk, _}` message on a `flow => manual` push stream.
`Ref` is the value taken from the `{push, Ref}` response body; the call is
forwarded to the adapter module named inside it.
""".
-spec stream_next(stream_ref()) -> ok | {error, term()}.
stream_next({livery_stream, Mod, _Handle} = StreamRef) ->
    Mod:stream_next(StreamRef).

-doc """
Cancel a push stream and release its connection. The call is forwarded to
the adapter module named inside the stream reference.
""".
-spec stop_stream(stream_ref()) -> ok.
stop_stream({livery_stream, Mod, _Handle} = StreamRef) ->
    Mod:stop_stream(StreamRef).

%%====================================================================
%% Layer constructors
%%====================================================================

-doc "Stack entry for the `livery_client_timeout` layer, with `Ms` as its state.".
-spec timeout(pos_integer()) -> entry().
timeout(Ms) ->
    {livery_client_timeout, Ms}.

-doc """
Stack entry for the `livery_client_concurrency` layer. Its state is the
value returned by `livery_client_concurrency:limiter(Limit)`.
""".
-spec concurrency(non_neg_integer()) -> entry().
concurrency(Limit) ->
    Limiter = livery_client_concurrency:limiter(Limit),
    {livery_client_concurrency, Limiter}.

-doc "Stack entry for the `livery_client_retry` layer, with `Opts` as its state.".
-spec retry(map()) -> entry().
retry(Opts) ->
    {livery_client_retry, Opts}.

-doc "Stack entry for the `livery_client_circuit` layer, with `Opts` as its state.".
-spec circuit_breaker(map()) -> entry().
circuit_breaker(Opts) ->
    {livery_client_circuit, Opts}.

-doc """
Stack entry for the `livery_client_balance` layer, which spreads requests
across a pool of endpoints with passive outlier ejection and lazy half-open
recovery.

`Opts`:

- `name` (required)
- `endpoints`: base URLs, or a `{Module, Arg}` discovery pair
- `policy`: `p2c` | `round_robin`
- `eject_after`, `eject_for`, `fail_status`

With `balance` in the stack you pass paths; the chosen endpoint supplies
the host.
""".
-spec balance(map()) -> entry().
balance(Opts) ->
    {livery_client_balance, Opts}.

-doc """
Stack entry for a cookie jar (RFC 6265 client subset), built with default
options.

The jar stores the `Set-Cookie` cookies a response carries and sends the
matching ones (host, path, secure) on later requests that pass through this
layer. The store is in memory, one per jar, and shared across the client's
request processes.
""".
-spec cookie_jar() -> entry().
cookie_jar() ->
    Jar = livery_client_cookie:jar(),
    {livery_client_cookie, Jar}.

-doc """
Like `cookie_jar/0`, with options. `Opts`: `max_cookies` (default 3000),
`store` (a `livery_client_cookie_store` callback module; default ETS).
""".
-spec cookie_jar(map()) -> entry().
cookie_jar(Opts) ->
    Jar = livery_client_cookie:jar(Opts),
    {livery_client_cookie, Jar}.

-doc """
Add `Endpoint` to the balance pool called `Name` at runtime. Delegates to
`livery_client_balance_store:add/2`.
""".
-spec add_endpoint(term(), endpoint()) -> ok.
add_endpoint(Name, Endpoint) ->
    livery_client_balance_store:add(Name, Endpoint).

-doc """
Remove `Endpoint` from the balance pool called `Name` at runtime. Delegates
to `livery_client_balance_store:remove/2`.
""".
-spec remove_endpoint(term(), endpoint()) -> ok.
remove_endpoint(Name, Endpoint) ->
    livery_client_balance_store:remove(Name, Endpoint).

%%====================================================================
%% Sugar (mirrors livery_middleware, error-aware)
%%====================================================================

-doc """
A layer that rewrites the request with `Fun` and passes the rewritten
request on to the rest of the stack. The result of the rest of the stack is
returned untouched.
""".
-spec before(fun((request()) -> request())) -> entry().
before(Fun) when is_function(Fun, 1) ->
    fun(Req, Next) ->
        Rewritten = Fun(Req),
        Next(Rewritten)
    end.

-doc """
A layer that applies `Fun` to the response of a successful call. An
`{error, Reason}` from the rest of the stack passes through unchanged and
`Fun` is not called.
""".
-spec after_response(fun((response()) -> response())) -> entry().
after_response(Fun) when is_function(Fun, 1) ->
    fun(Req, Next) ->
        Outcome = Next(Req),
        case Outcome of
            {error, _} -> Outcome;
            {ok, Resp} -> {ok, Fun(Resp)}
        end
    end.

-doc """
A layer that turns an exception raised by the rest of the stack into a
result. When the inner call raises, `Fun(Class, Reason, Stacktrace)` is
called and its return value becomes the layer's result; otherwise the inner
result is returned as is.
""".
-spec wrap(fun((throw | error | exit, term(), list()) -> result())) -> entry().
wrap(Fun) when is_function(Fun, 3) ->
    fun(Req, Next) ->
        try Next(Req) of
            Outcome -> Outcome
        catch
            Class:Reason:Stack -> Fun(Class, Reason, Stack)
        end
    end.

%%====================================================================
%% Internals
%%====================================================================

%% Assemble the request map for one call. The URL comes from the client's
%% base URL and `Path`, the headers from the client defaults merged with the
%% per-request ones, and every other field from `Opts` with these defaults:
%% body `empty`, timeout 30000, stream `false`, stream_to `undefined`,
%% flow `auto`, meta `#{}`.
build_request(Client, Method, Path, Opts) ->
    Opt = fun(Key, Default) -> maps:get(Key, Opts, Default) end,
    Url = full_url(Client, Path),
    Headers = merge_headers(maps:get(headers, Client), Opt(headers, [])),
    Body = normalize_body(Opt(body, empty)),
    #{
        method => Method,
        url => Url,
        headers => Headers,
        body => Body,
        timeout => Opt(timeout, 30000),
        stream => Opt(stream, false),
        stream_to => Opt(stream_to, undefined),
        flow => Opt(flow, auto),
        meta => Opt(meta, #{})
    }.

%% Run `Req` through `Stack`, first entry outermost, ending at `Handler`.
%% The chain is composed with a right fold: starting from `Handler`, each
%% entry is wrapped around the continuation built from the entries after it,
%% so every entry receives "the rest of the stack" as its `Next`.
run_stack(Stack, Handler, Req) ->
    Compose = fun(Entry, Next) ->
        fun(R) -> call_entry(Entry, R, Next) end
    end,
    Outermost = lists:foldr(Compose, Handler, Stack),
    Outermost(Req).

%% Invoke a single stack entry. A `{Module, State}` pair is dispatched to
%% `Module:call(Req, Next, State)`; a fun of arity 2 is applied to
%% `(Req, Next)` directly.
call_entry({Mod, State}, Req, Next) when is_atom(Mod) ->
    Mod:call(Req, Next, State);
call_entry(Layer, Req, Next) when is_function(Layer, 2) ->
    Layer(Req, Next).

%% Bring a caller-supplied body into tagged form. `empty`, `{full, _}` and
%% `{stream, _}` are already tagged and pass through; any other term is
%% treated as the payload and wrapped as `{full, Term}`.
normalize_body(Body) ->
    case Body of
        empty -> empty;
        {full, _} -> Body;
        {stream, _} -> Body;
        _ -> {full, Body}
    end.

%% The URL for `Path` on this client. With an empty `base_url` the path is
%% used as given. Otherwise the base (minus one trailing slash) and the path
%% (with a leading slash added if missing) are concatenated.
full_url(Client, Path) ->
    Base = maps:get(base_url, Client),
    case Base of
        <<>> ->
            Path;
        _ ->
            Prefix = strip_trailing_slash(Base),
            Suffix = ensure_leading_slash(Path),
            <<Prefix/binary, Suffix/binary>>
    end.

-doc """
Re-target a request URL at `Endpoint`.

Whatever scheme+authority `Url` carries is dropped; its path+query is then
appended to the endpoint (minus one trailing slash), so the balancer decides
the host. Used by `livery_client_balance`.
""".
-spec rebase(endpoint(), binary()) -> binary().
rebase(Endpoint, Url) ->
    Tail = path_and_query(Url),
    Host = strip_trailing_slash(Endpoint),
    <<Host/binary, Tail/binary>>.

%% The path+query of a URL, dropping scheme+authority if present.
%%
%% `://` counts as a scheme separator only when no `/`, `?` or `#` comes
%% before it. Otherwise it belongs to the path, query or fragment of a
%% relative reference (for example `/go?to=http://other/x`) and the URL is
%% kept whole, with a leading slash added if missing.
%%
%% The authority ends at the first `/`, `?` or `#`, so an absolute URL with
%% no path keeps what follows the host: `http://host?x=1` gives `/?x=1`.
%% An absolute URL with nothing after the authority gives `/`.
path_and_query(Url) ->
    case scheme_end(Url) of
        none -> ensure_leading_slash(Url);
        Pos -> after_authority(binary_from(Url, Pos))
    end.

%% Offset just past the `://` that ends a scheme, or `none` when `Url` has
%% no scheme (no `://` at all, or only one that sits after a `/`, `?` or `#`).
scheme_end(Url) ->
    case binary:match(Url, <<"://">>) of
        nomatch ->
            none;
        {Start, Len} ->
            %% Search only the bytes in front of `://` for a delimiter.
            Prefix = {0, Start},
            case binary:match(Url, [<<"/">>, <<"?">>, <<"#">>], [{scope, Prefix}]) of
                nomatch -> Start + Len;
                _ -> none
            end
    end.

%% Everything after the authority in `Rest` (the URL with its scheme and
%% `://` removed), always starting with `/`.
after_authority(Rest) ->
    case binary:match(Rest, [<<"/">>, <<"?">>, <<"#">>]) of
        nomatch -> <<"/">>;
        {Pos, _} -> ensure_leading_slash(binary_from(Rest, Pos))
    end.

%% The tail of `Bin` starting at byte offset `Pos`.
binary_from(Bin, Pos) ->
    binary:part(Bin, Pos, byte_size(Bin) - Pos).

%% Drop one trailing `/` from `B` if it ends with one; otherwise return `B`
%% unchanged. Only a single slash is removed.
%%
%% The empty binary gets its own clause because `binary:last/1` raises
%% `badarg` on `<<>>`.
strip_trailing_slash(<<>>) ->
    <<>>;
strip_trailing_slash(B) ->
    case binary:last(B) of
        $/ -> binary:part(B, 0, byte_size(B) - 1);
        _ -> B
    end.

%% Return `P` unchanged when it already starts with `/`, otherwise `P` with
%% a `/` put in front. The empty binary becomes `<<"/">>`.
ensure_leading_slash(P) ->
    case P of
        <<"/", _/binary>> -> P;
        _ -> <<"/", P/binary>>
    end.

%% Combine client default headers with per-request ones. A default is
%% dropped when a per-request header has the same name (ignoring case); the
%% surviving defaults come first, followed by all per-request headers in
%% their given order.
merge_headers(Defaults, Extra) ->
    Overridden = [string:lowercase(Name) || {Name, _} <- Extra],
    Survives = fun(Name) -> not lists:member(string:lowercase(Name), Overridden) end,
    [Header || {Name, _} = Header <- Defaults, Survives(Name)] ++ Extra.