%% src/middleware/livery_concurrency.erl

-module(livery_concurrency).
-moduledoc """
Concurrency-limit / load-shedding middleware (admission control).

Caps the number of in-flight requests. Over the limit it sheds load
immediately with `503 Service Unavailable` (the default; status, body and
`retry-after` are configurable via `limiter/2`) and never calls the handler;
at or under the limit the request proceeds and the slot is released when
the handler returns.

Build the stack entry with the `limiter/1,2` factory, which creates the
shared counter once:

```erlang
Stack = [
    {livery_concurrency, livery_concurrency:limiter(1000)}
    %% ... handler runs only while < 1000 requests are in flight
].
```

The counter is a lock-free `atomics` cell shared across the request
processes (no extra process). A global limiter is one factory call in
the service stack; per-route limiters are independent factory calls.

Scope: a slot is held from admission until the handler RETURNS its
response. Body streaming happens after that (outside the middleware
stack), so the slot does not cover the duration of a streamed/SSE body.
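The limit is approximate under a burst: a request that increments past
the limit decrements again, and until it does, a concurrent request may
see the inflated count and be shed even though fewer than `Limit` requests
are actually admitted. The error is only ever towards shedding, never
towards admitting more than `Limit`, which is acceptable for load-shedding.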
""".
-behaviour(livery_middleware).

-export([limiter/1, limiter/2, call/3]).

-export_type([state/0]).

%% Middleware State built by `limiter/1,2` and consumed by `call/3`.
-type state() :: #{
    %% Shared in-flight request counter (cell 1 of the atomics array).
    ref := atomics:atomics_ref(),
    %% Highest in-flight count that is still admitted.
    limit := non_neg_integer(),
    %% Status code of the shed response.
    status := 100..599,
    %% Body of the shed response.
    body := iodata(),
    %% `retry-after` header value for the shed response: seconds, a literal
    %% binary, or `undefined` to omit the header.
    retry_after := non_neg_integer() | binary() | undefined
}.

-doc """
Build the limiter State for a stack entry, capping in-flight requests at
`Limit` and using the default shed response (no options).
""".
-spec limiter(non_neg_integer()) -> state().
limiter(Limit) ->
    NoOpts = #{},
    limiter(Limit, NoOpts).

-doc """
`limiter/1` with options.

Creates the shared in-flight counter (a single unsigned `atomics` cell) and
packs it together with the shed-response settings.

Options (all optional):

- `status` — status of the shed response (default `503`).
- `body` — body of the shed response (default `<<"service unavailable">>`).
- `retry_after` — `retry-after` header value: seconds as a non-negative
  integer, a literal binary, or `undefined` for no header (default
  `undefined`).

Raises `error({badarg, {retry_after, Value}})` when `retry_after` has none
of the accepted forms, so a misconfigured limiter fails when the stack is
built instead of on the first shed request.
""".
-spec limiter(non_neg_integer(), map()) -> state().
limiter(Limit, Opts) when is_integer(Limit), Limit >= 0 ->
    %% Validate before allocating the counter so a bad option leaves nothing behind.
    RetryAfter = checked_retry_after(maps:get(retry_after, Opts, undefined)),
    #{
        ref => atomics:new(1, [{signed, false}]),
        limit => Limit,
        status => maps:get(status, Opts, 503),
        body => maps:get(body, Opts, <<"service unavailable">>),
        retry_after => RetryAfter
    }.

%% Returns the `retry_after` option unchanged when it has an accepted form;
%% anything else is a configuration error and is raised as `badarg`.
-spec checked_retry_after(term()) -> non_neg_integer() | binary() | undefined.
checked_retry_after(undefined) -> undefined;
checked_retry_after(Secs) when is_integer(Secs), Secs >= 0 -> Secs;
checked_retry_after(Literal) when is_binary(Literal) -> Literal;
checked_retry_after(Other) -> error({badarg, {retry_after, Other}}).

-doc """
Admit the request while the in-flight count is within the limit; otherwise
shed it with the configured response (503 by default) without calling `Next`.
""".
-spec call(livery_req:req(), livery_middleware:next(), state()) ->
    livery_resp:resp().
call(Req, Next, #{ref := Counter, limit := Max} = State) ->
    %% Claim a slot first and inspect the resulting count afterwards: one
    %% atomic step, no separate read-then-write.
    InFlight = atomics:add_get(Counter, 1, 1),
    case InFlight =< Max of
        true ->
            %% `after` gives the slot back whether the handler returns or raises.
            %% NOTE(review): an exit signal that kills this process would skip
            %% the release — confirm whether that matters for this service.
            try
                Next(Req)
            after
                atomics:sub(Counter, 1, 1)
            end;
        false ->
            %% Over the limit: undo our own increment, then shed.
            atomics:sub(Counter, 1, 1),
            overloaded(State)
    end.

%% Builds the shed response from the limiter State: a text response with the
%% configured status and body, plus a `retry-after` header when one is set.
-spec overloaded(state()) -> livery_resp:resp().
overloaded(#{status := Status, body := Body} = State) ->
    Shed = livery_resp:text(Status, Body),
    add_retry_after(maps:get(retry_after, State), Shed).

%% Leaves the response untouched for `undefined`, otherwise attaches the header.
-spec add_retry_after(non_neg_integer() | binary() | undefined, livery_resp:resp()) ->
    livery_resp:resp().
add_retry_after(undefined, Resp) ->
    Resp;
add_retry_after(Value, Resp) ->
    livery_resp:with_header(<<"retry-after">>, retry_value(Value), Resp).

%% Renders a `retry_after` setting as the header value: a binary is used
%% verbatim, an integer number of seconds is formatted in decimal.
-spec retry_value(non_neg_integer() | binary()) -> binary().
retry_value(Literal) when is_binary(Literal) -> Literal;
retry_value(Seconds) when is_integer(Seconds) -> integer_to_binary(Seconds).