Skip to main content

src/middleware/livery_ratelimit_store.erl

-module(livery_ratelimit_store).
-moduledoc """
Owner and store for `livery_ratelimit` token buckets.

A supervised gen_server that owns one public named ETS table and reaps
idle buckets on a timer. The per-request token-bucket decision
(`check/5`) runs in the calling process directly against the public
table (lock-free CAS), so the gen_server is never on the hot path - it
only owns the table and runs cleanup. The table is `public` so requests
update buckets without serializing through the owner; this is safe
because Livery runs no untrusted in-VM code, and making it `protected`
would force every check through the owner and reintroduce exactly the
single-process bottleneck the lock-free design avoids.

Each row is `{{Name, KeyDigest}, Tokens, LastMicros, Cap, Rate}`.
`KeyDigest` is a SHA-256 of the rate-limit key, so raw bearer tokens are
never stored. `Cap`/`Rate` are denormalized so the sweep can compute
refill without the limiter config.

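The table is bounded: once it holds `ratelimit_max_keys` rows
(application environment, default 1,000,000) new keys are shed
(`{deny, undefined}`) rather than inserted, so a flood of distinct keys
cannot grow the table without limit. The bound is soft: the size check
and the insert are separate steps, so concurrent first requests for
different keys can overshoot it slightly. Once a minute, buckets that are
(or by now would be) fully refilled are reaped, so the bound is released
as load drops; buckets of limiters with `Rate =< 0` do not refill, so the
sweep does not reclaim them.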
""".
-behaviour(gen_server).

%% Public API: start the owner, the per-request decision, a manual sweep.
-export([start_link/0, check/5, sweep/0]).
%% gen_server callbacks (the optional terminate/2 and code_change/3 are omitted).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Name of the named, public ETS table that stores every bucket row.
-define(TABLE, livery_ratelimit).
%% Sweep period in milliseconds (the unit erlang:send_after/3 takes): one minute.
-define(CLEANUP_INTERVAL, 60000).
%% Used when the `ratelimit_max_keys` application env is not set.
-define(DEFAULT_MAX_KEYS, 1000000).

%% The server keeps no state of its own; all data lives in the ETS table.
-record(state, {}).
-type state() :: #state{}.

%% Outcome of check/5:
%%   {allow, RemainingTokens, ResetSecs} - a token was consumed;
%%   {deny, RetryAfterSecs}              - no token was available.
%% Either seconds field is `undefined` when no finite estimate applies
%% (e.g. `Rate =< 0`, or a new key shed because the table is full).
-type result() ::
    {allow, float(), non_neg_integer() | undefined}
    | {deny, non_neg_integer() | undefined}.

-export_type([result/0]).

%%====================================================================
%% API
%%====================================================================

-doc "Start the table owner, registered locally under the module's name.".
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
    ServerName = {local, ?MODULE},
    InitArg = [],
    gen_server:start_link(ServerName, ?MODULE, InitArg, []).

-doc """
Token-bucket decision for one request against bucket `{Name, KeyDigest}`.

`Cap` is the bucket size in tokens and `Rate` the refill rate in tokens
per second; `Now` is a microsecond timestamp, expected to come from the
same clock the periodic sweep uses (`erlang:monotonic_time(microsecond)`).

Returns `{allow, RemainingTokens, ResetSecs}` when a token was consumed
(`ResetSecs` = whole seconds until the bucket is full again) or
`{deny, RetryAfterSecs}` when none was available. Either seconds value is
`undefined` when `Rate =< 0` (no refill), and a brand-new key is denied
with `undefined` while the table is at its `ratelimit_max_keys` bound.

Runs entirely in the calling process against the public table; the owner
process is never contacted.
""".
-spec check(term(), binary(), non_neg_integer(), number(), integer()) -> result().
check(Name, KeyDigest, Cap, Rate, Now) ->
    BucketId = {Name, KeyDigest},
    do_check(BucketId, Cap, Rate, Now).

-doc """
Synchronously reap every bucket that is (or by now would be) fully
refilled, returning how many rows were deleted. The pass runs inside the
owner process, on the same clock as the periodic sweep.
""".
-spec sweep() -> non_neg_integer().
sweep() ->
    %% 5000 ms is gen_server's default call timeout, written out explicitly.
    gen_server:call(?MODULE, sweep, 5000).

%%====================================================================
%% Token bucket (runs in the caller against the public table)
%%====================================================================

%% Look up the bucket for `Id`, refill it up to `Now` (never above `Cap`)
%% and hand the decision to decide_new/5 (no row yet) or decide_existing/6.
-spec do_check({term(), binary()}, non_neg_integer(), number(), integer()) ->
    result().
do_check(Id, Cap, Rate, Now) ->
    CapF = float(Cap),
    case ets:lookup(?TABLE, Id) of
        [{_Key, StoredTokens, StoredAt, _StoredCap, _StoredRate} = Row] ->
            %% Refill uses the caller's current Cap/Rate, not the stored copies.
            ElapsedSecs = (Now - StoredAt) / 1.0e6,
            Refilled = min(CapF, StoredTokens + ElapsedSecs * Rate),
            decide_existing(Id, Row, Refilled, Now, Cap, Rate);
        [] ->
            %% A key without a row starts from a full bucket.
            decide_new(Id, CapF, Now, Cap, Rate)
    end.

%% Decide for a key that has no row yet. `Tokens` is the full capacity
%% (`float(Cap)`) of a fresh bucket; on success the row is created holding
%% one token less.
-spec decide_new({term(), binary()}, float(), integer(), non_neg_integer(), number()) ->
    result().
decide_new(Id, Tokens, Now, Cap, Rate) when Tokens >= 1.0 ->
    case at_capacity() of
        true ->
            %% Shed new keys rather than grow the table without bound.
            {deny, undefined};
        false ->
            Remaining = Tokens - 1.0,
            case ets:insert_new(?TABLE, {Id, Remaining, Now, Cap, Rate}) of
                true ->
                    {allow, Remaining, reset_secs(Remaining, Cap, Rate)};
                false ->
                    %% Another caller created the row first; decide again
                    %% against the row it wrote.
                    do_check(Id, Cap, Rate, Now)
            end
    end;
decide_new(_Id, _Tokens, _Now, _Cap, _Rate) ->
    %% A fresh bucket holds `Cap` tokens, so reaching this clause means
    %% `Cap < 1`: the bucket can never hold a whole token and every request
    %% is denied. Report no retry time instead of a finite Retry-After that
    %% would only lead the client into another denial. No row is inserted.
    {deny, undefined}.

%% True once the table holds at least `ratelimit_max_keys` rows (falling
%% back to ?DEFAULT_MAX_KEYS). The env is read on every call rather than
%% cached, so the limit can be changed at runtime.
-spec at_capacity() -> boolean().
at_capacity() ->
    Rows = ets:info(?TABLE, size),
    Limit = application:get_env(livery, ratelimit_max_keys, ?DEFAULT_MAX_KEYS),
    Rows >= Limit.

%% Decide for a key whose current row is `Old`. `Refilled` is that row's
%% token count after refilling up to `Now`, already clamped to `Cap`.
%% The write is a compare-and-swap: select_replace/2 only replaces the row
%% if it still equals `Old`, and a lost race re-runs the whole decision.
-spec decide_existing(
    {term(), binary()}, tuple(), float(), integer(), non_neg_integer(), number()
) -> result().
decide_existing(Id, Old, Refilled, Now, Cap, Rate) when Refilled >= 1.0 ->
    Remaining = Refilled - 1.0,
    New = {Id, Remaining, Now, Cap, Rate},
    %% `{const, New}` stops ETS from interpreting New as a match expression.
    case ets:select_replace(?TABLE, [{Old, [], [{const, New}]}]) of
        1 -> {allow, Remaining, reset_secs(Remaining, Cap, Rate)};
        0 -> do_check(Id, Cap, Rate, Now)
    end;
decide_existing(_Id, _Old, _Refilled, _Now, Cap, _Rate) when Cap < 1 ->
    %% The limiter's capacity is now below one token (the row was written
    %% under a larger Cap), so the bucket can never refill to a whole token.
    %% No finite Retry-After would be truthful.
    {deny, undefined};
decide_existing(_Id, _Old, Refilled, _Now, _Cap, Rate) ->
    {deny, retry_secs(Refilled, Rate)}.

%% Whole seconds until a bucket holding `Tokens` is back to `Cap`, rounded
%% up; `undefined` when `Rate` is not positive (the bucket never refills).
-spec reset_secs(float(), non_neg_integer(), number()) ->
    non_neg_integer() | undefined.
reset_secs(Tokens, Cap, Rate) ->
    case Rate > 0 of
        true ->
            Missing = float(Cap) - Tokens,
            erlang:ceil(Missing / Rate);
        false ->
            undefined
    end.

%% Whole seconds until a bucket holding `Tokens` (< 1) regains one full
%% token, rounded up; `undefined` when `Rate` is not positive.
-spec retry_secs(float(), number()) -> non_neg_integer() | undefined.
retry_secs(Tokens, Rate) ->
    if
        Rate > 0 ->
            Deficit = 1.0 - Tokens,
            erlang:ceil(Deficit / Rate);
        true ->
            undefined
    end.

%%====================================================================
%% gen_server
%%====================================================================

%% Create the bucket table and arm the first periodic sweep. This process
%% owns the table and no heir is set, so the table dies with the server.
-spec init([]) -> {ok, state()}.
init([]) ->
    TableOpts = [
        named_table,
        public,
        set,
        {read_concurrency, true},
        {write_concurrency, true}
    ],
    _Tab = ets:new(?TABLE, TableOpts),
    _TRef = schedule_cleanup(),
    {ok, #state{}}.

%% `sweep` runs a reap pass now and replies with the number of rows
%% removed; any other request is answered with `ok` and otherwise ignored.
-spec handle_call(term(), gen_server:from(), state()) -> {reply, term(), state()}.
handle_call(sweep, _From, State) ->
    Removed = do_sweep(now_micros()),
    {reply, Removed, State};
handle_call(_Unknown, _From, State) ->
    {reply, ok, State}.

%% No casts are part of this server's protocol; any that arrive are dropped.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(_Ignored, State) ->
    {noreply, State}.

%% `cleanup` is the periodic timer tick: reap, then re-arm the timer, so
%% the next pass fires ?CLEANUP_INTERVAL ms after this one finished.
%% Any other message is dropped.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(cleanup, State) ->
    _Reaped = do_sweep(now_micros()),
    _TRef = schedule_cleanup(),
    {noreply, State};
handle_info(_Unexpected, State) ->
    {noreply, State}.

%%====================================================================
%% Cleanup
%%====================================================================

%% Arm a one-shot timer that delivers `cleanup` to the calling process
%% after ?CLEANUP_INTERVAL milliseconds; returns the timer reference.
-spec schedule_cleanup() -> reference().
schedule_cleanup() ->
    Owner = self(),
    erlang:send_after(?CLEANUP_INTERVAL, Owner, cleanup).

%% Reap every bucket that is (or by now would be) fully refilled, i.e.
%% `Tokens + (Now - Last) / 1e6 * Rate >= Cap`, and return how many rows
%% were deleted. A fully refilled bucket behaves exactly like a missing one
%% (a new key starts full), so reaping never grants extra budget and never
%% resets a hot or exhausted key, whose refill is still below Cap.
-spec do_sweep(integer()) -> non_neg_integer().
do_sweep(Now) ->
    %% Row layout: {Id, Tokens = '$2', Last = '$3', Cap = '$4', Rate = '$5'}.
    Head = {'_', '$2', '$3', '$4', '$5'},
    ElapsedSecs = {'/', {'-', float(Now), '$3'}, 1.0e6},
    Refilled = {'+', '$2', {'*', ElapsedSecs, '$5'}},
    IsFull = {'>=', Refilled, '$4'},
    ets:select_delete(?TABLE, [{Head, [IsFull], [true]}]).

%% Current Erlang monotonic time in microseconds. It is unaffected by
%% wall-clock jumps, so only differences between readings are meaningful.
-spec now_micros() -> integer().
now_micros() ->
    erlang:convert_time_unit(erlang:monotonic_time(), native, microsecond).