Skip to main content

src/client/livery_client_balance_store.erl

-module(livery_client_balance_store).
-moduledoc """
Owns the ETS table backing the client load balancers.

A `gen_server` that creates a public named table at startup; the
selection logic in `livery_client_balance` reads and writes it directly
(no per-request round trip through this process). One pool per `name`,
with one row per endpoint:

    {{ep, Name, Endpoint}, Status, Fails, Until, Inflight}

`Status` is `up | ejected`. `Inflight` is a per-endpoint `atomics` ref
holding the in-flight request count (the P2C load metric). `Until` is an
`erlang:monotonic_time(millisecond)` deadline: while ejected, the
endpoint rejoins only after `Until` has passed, and recovery is a single
probe leased with an atomic compare-and-swap so concurrent callers cannot
all probe at once. A `{{meta, Name}, RoundRobinRef}` row marks the pool's
existence (for create-once `ensure/2`) and carries the round-robin
counter.
""".
-behaviour(gen_server).

-export([start_link/0, ensure/2, add/2, remove/2, reset/1]).
-export([pick/3, record/5, release/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-define(TABLE, ?MODULE).
-record(state, {}).
-type state() :: #state{}.
-type policy() :: p2c | round_robin.

-doc "Start the table-owning server, registered locally under the module name.".
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

-doc """
Create the pool `Name` from `Endpoints` the first time it is seen.

When the pool's meta row is already present nothing happens, so calling
this lazily on every request cannot re-add an endpoint that was taken out
with `remove/2`. The existence check is a plain ETS read in the calling
process; the gen_server is only contacted when the pool looks absent.
""".
-spec ensure(term(), [binary()]) -> ok.
ensure(Name, Endpoints) ->
    PoolKnown = ets:member(?TABLE, {meta, Name}),
    seed_unless_known(PoolKnown, Name, Endpoints).

%% Pool already registered: nothing to do. Otherwise let the server seed it.
seed_unless_known(true, _Name, _Endpoints) ->
    ok;
seed_unless_known(false, Name, Endpoints) ->
    gen_server:call(?MODULE, {ensure, Name, Endpoints}).

-doc "Add `Endpoint` to pool `Name` at runtime (serialised through the server).".
-spec add(term(), binary()) -> ok.
add(Name, Endpoint) ->
    Request = {add, Name, Endpoint},
    gen_server:call(?MODULE, Request).

-doc "Take `Endpoint` out of pool `Name` at runtime (serialised through the server).".
-spec remove(term(), binary()) -> ok.
remove(Name, Endpoint) ->
    Request = {remove, Name, Endpoint},
    gen_server:call(?MODULE, Request).

-doc "Drop pool `Name` completely: its endpoint rows and its meta row.".
-spec reset(term()) -> ok.
reset(Name) ->
    Request = {reset, Name},
    gen_server:call(?MODULE, Request).

-doc """
Choose an endpoint of pool `Name` for one request and bump its in-flight
counter.

The result is `{ok, Endpoint, Token}`, where `Token` must be handed to
`release/1` once the request completes, or `{error, no_endpoint}` when the
pool has no rows or no healthy endpoint. An ejected endpoint whose
deadline has passed takes priority: one caller wins a CAS lease and sends
the single probe. Everyone else selects among `up` endpoints according to
`Policy`.
""".
-spec pick(term(), policy(), non_neg_integer()) ->
    {ok, binary(), atomics:atomics_ref()} | {error, no_endpoint}.
pick(Name, Policy, EjectFor) ->
    case ets:select(?TABLE, all_spec(Name)) of
        [_ | _] = Rows ->
            %% A due probe wins over regular selection.
            case lease_probe(Name, Rows, now_ms(), EjectFor) of
                none ->
                    pick_up(Name, Policy, Rows);
                {ok, _Endpoint, _Token} = Leased ->
                    Leased
            end;
        [] ->
            {error, no_endpoint}
    end.

-doc """
Give back the in-flight slot obtained from `pick/3`. Operates on the
token's counter only, so it is safe after the endpoint row was removed.
""".
-spec release(atomics:atomics_ref()) -> ok.
release(Token) ->
    %% atomics:sub/3 itself returns ok.
    atomics:sub(Token, 1, 1).

-doc """
Report how a request against `Endpoint` of pool `Name` went.

`ok` clears the failure streak and brings an ejected endpoint back;
`err` extends the streak and ejects once it reaches `EjectAfter`, or
re-ejects when a probe failed. If the endpoint's row no longer exists
(it was removed while the request was in flight) nothing happens.
""".
-spec record(term(), binary(), ok | err, pos_integer(), non_neg_integer()) -> ok.
record(Name, Endpoint, Outcome, EjectAfter, EjectFor) ->
    RowKey = {ep, Name, Endpoint},
    case ets:lookup(?TABLE, RowKey) of
        [{RowKey, Status, Fails, _Deadline, Inflight}] ->
            apply_outcome(RowKey, Status, Fails, Inflight, Outcome, EjectAfter, EjectFor);
        [] ->
            ok
    end.

%%====================================================================
%% gen_server
%%====================================================================

%% Create the shared table. It is public so callers can read and write it
%% without going through this process, and named so they can find it.
-spec init([]) -> {ok, state()}.
init([]) ->
    TableOpts = [named_table, public, set, {write_concurrency, true}],
    _Table = ets:new(?TABLE, TableOpts),
    {ok, #state{}}.

%% Serialised pool mutations. Every request is answered with `ok`.
%%
%%   {ensure, Name, Endpoints}  seed the pool unless its meta row exists
%%   {add, Name, Endpoint}      insert a fresh `up` row unless already present
%%   {remove, Name, Endpoint}   delete the endpoint row
%%   {reset, Name}              delete all endpoint rows and the meta row
%%
%% Unknown requests are answered with `ok` and otherwise ignored.
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, ok, state()}.
handle_call({ensure, Name, Endpoints}, _From, State) ->
    case ets:member(?TABLE, {meta, Name}) of
        true ->
            ok;
        false ->
            %% Publish the meta row and every endpoint row in ONE insert.
            %% ets:insert/2 with a list is atomic and isolated, so a
            %% concurrent ensure/2 whose lock-free ets:member/2 check
            %% sees the meta row is guaranteed to also see the full pool;
            %% inserting the meta row first would let it pick/3 from an
            %% empty or half-seeded pool.
            Meta = {{meta, Name}, atomics:new(1, [])},
            Rows = [{{ep, Name, Ep}, up, 0, 0, atomics:new(1, [])} || Ep <- Endpoints],
            true = ets:insert(?TABLE, [Meta | Rows])
    end,
    {reply, ok, State};
handle_call({add, Name, Endpoint}, _From, State) ->
    %% Keep an existing row (and its health and in-flight counter) untouched.
    case ets:member(?TABLE, {ep, Name, Endpoint}) of
        true -> ok;
        false -> insert_endpoint(Name, Endpoint)
    end,
    {reply, ok, State};
handle_call({remove, Name, Endpoint}, _From, State) ->
    true = ets:delete(?TABLE, {ep, Name, Endpoint}),
    {reply, ok, State};
handle_call({reset, Name}, _From, State) ->
    %% Endpoint rows go first, the meta row last.
    true = ets:match_delete(?TABLE, {{ep, Name, '_'}, '_', '_', '_', '_'}),
    true = ets:delete(?TABLE, {meta, Name}),
    {reply, ok, State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% This server defines no casts; anything received is dropped.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(_Ignored, State) ->
    {noreply, State}.

%% Stray messages are drained and dropped so the mailbox cannot grow.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(_Ignored, State) ->
    {noreply, State}.

%%====================================================================
%% Selection
%%====================================================================

%% Match spec selecting every endpoint row of pool Name, returning each as
%% {Endpoint, Status, Fails, Until, Inflight} (the key's Name is dropped).
all_spec(Name) ->
    Head = {{ep, Name, '$1'}, '$2', '$3', '$4', '$5'},
    Guards = [],
    %% The double braces build a tuple result rather than a function call.
    Result = {{'$1', '$2', '$3', '$4', '$5'}},
    [{Head, Guards, [Result]}].

%% Walk the rows in order looking for an ejected endpoint whose deadline is
%% due, and try to claim its probe by CAS-ing the deadline forward. The
%% caller whose CAS replaces the row owns the probe and gets
%% {ok, Endpoint, InflightRef}; a caller that loses (0 replacements) keeps
%% scanning. Returns none when no probe could be claimed.
lease_probe(_Name, [], _Now, _EjectFor) ->
    none;
lease_probe(Name, [Row | Others], Now, EjectFor) ->
    case Row of
        {Endpoint, ejected, _Fails, Deadline, Inflight} when Deadline =< Now ->
            case cas_lease(Name, Endpoint, Now, EjectFor) of
                0 ->
                    %% Someone else took this probe; try the next row.
                    lease_probe(Name, Others, Now, EjectFor);
                1 ->
                    atomics:add(Inflight, 1, 1),
                    {ok, Endpoint, Inflight}
            end;
        _NotDue ->
            lease_probe(Name, Others, Now, EjectFor)
    end.

%% Compare-and-swap the ejection deadline of endpoint Ep in pool Name:
%% the row is rewritten with deadline Now + EjectFor only if it is still
%% `ejected` and its stored deadline is =< Now. Returns the number of rows
%% ets:select_replace/2 replaced, which lease_probe/4 treats as 1 (this
%% caller won the lease) or 0 (row changed, gone, or leased by another).
%%
%% Bind the whole row in vars and keep the key untouched ('$1'): a bare
%% tuple in a match-spec body is read as an action, so a tuple Name cannot
%% be reconstructed there. {const, Key} injects it literally in the guard.
%%
%% NOTE(review): the key is constrained only by the guard, not by the match
%% head, so this presumably visits every row of the table rather than doing
%% a keyed lookup -- confirm the cost is acceptable with many pools.
cas_lease(Name, Ep, Now, EjectFor) ->
    Key = {ep, Name, Ep},
    NewUntil = Now + EjectFor,
    ets:select_replace(?TABLE, [
        {
            %% '$1' = key, '$2' = fails, '$3' = deadline, '$4' = in-flight ref.
            {'$1', ejected, '$2', '$3', '$4'},
            %% Exactly this endpoint, and its deadline has passed.
            [{'=:=', '$1', {const, Key}}, {'=<', '$3', Now}],
            %% Same row with only the deadline moved forward.
            [{{'$1', ejected, '$2', NewUntil, '$4'}}]
        }
    ]).

%% Select among the endpoints currently marked `up`, sampling each one's
%% in-flight count first. The winner's counter is incremented and returned
%% as the release token; with no `up` endpoint the result is
%% {error, no_endpoint}.
pick_up(Name, Policy, Rows) ->
    Healthy = [
        {Endpoint, Inflight, load(Inflight)}
     || {Endpoint, up, _Fails, _Until, Inflight} <- Rows
    ],
    case Healthy of
        [_ | _] ->
            {Chosen, Token} = choose(Policy, Name, Healthy),
            atomics:add(Token, 1, 1),
            {ok, Chosen, Token};
        [] ->
            {error, no_endpoint}
    end.

%% Pick one {Endpoint, InflightRef} out of the non-empty list Ups of
%% {Endpoint, InflightRef, Load} triples.
%%
%% round_robin advances the pool's shared counter (kept in the meta row)
%% and indexes Ups with it. Any other policy is treated as P2C: two
%% distinct entries are drawn at random and the less loaded one wins; a
%% single-entry list is returned as is.
choose(round_robin, Name, Ups) ->
    N = length(Ups),
    Slot =
        case ets:lookup(?TABLE, {meta, Name}) of
            [{_, RRRef}] ->
                (atomics:add_get(RRRef, 1, 1) rem N) + 1;
            [] ->
                %% No meta row: the pool was populated through add/2 only,
                %% or reset/1 deleted it after pick/3 read the rows. Do not
                %% crash the caller on a badmatch; spread picks randomly.
                rand:uniform(N)
        end,
    {Ep, Ref, _Load} = lists:nth(Slot, Ups),
    {Ep, Ref};
choose(_P2c, _Name, [{Ep, Ref, _Load}]) ->
    {Ep, Ref};
choose(_P2c, _Name, Ups) ->
    N = length(Ups),
    I = rand:uniform(N),
    %% Draw J from the N - 1 remaining positions, skipping over I, so the
    %% two candidates are always distinct.
    J0 = rand:uniform(N - 1),
    J =
        case J0 >= I of
            true -> J0 + 1;
            false -> J0
        end,
    {Ep, Ref, _} = lower_load(lists:nth(I, Ups), lists:nth(J, Ups)),
    {Ep, Ref}.

%% Of two {Endpoint, Ref, Load} triples return the one with the smaller
%% load; on a tie the first argument wins.
lower_load({_, _, LoadA} = First, {_, _, LoadB}) when LoadA =< LoadB ->
    First;
lower_load({_, _, _}, {_, _, _} = Second) ->
    Second.

%% Current in-flight request count stored in slot 1 of the counter.
load(Inflight) ->
    atomics:get(Inflight, 1).

%%====================================================================
%% Outcome accounting
%%====================================================================

%% Fold one request outcome into the endpoint row at Key, given the
%% Status and Fails that record/5 just read:
%%
%%   ok                      -> up, streak 0, no deadline
%%   err while up            -> streak + 1, or ejected for EjectFor ms once
%%                              the streak reaches EjectAfter
%%   err while ejected       -> stays ejected, deadline pushed to
%%                              now + EjectFor (a failed probe)
%%
%% Only the status, streak and deadline fields are written, in place. The
%% row is never re-inserted, so an endpoint removed between the lookup and
%% this write stays removed, and the in-flight ref stored in the table is
%% never replaced by the (possibly stale) one the caller read.
%%
%% NOTE(review): the streak is still a read-then-write, so concurrent
%% errors for the same endpoint can be counted once -- confirm acceptable.
apply_outcome(Key, _Status, _Fails, _Ref, ok, _EjectAfter, _EjectFor) ->
    set_health(Key, up, 0, 0);
apply_outcome(Key, up, Fails, _Ref, err, EjectAfter, EjectFor) when Fails + 1 >= EjectAfter ->
    set_health(Key, ejected, 0, now_ms() + EjectFor);
apply_outcome(Key, up, Fails, _Ref, err, _EjectAfter, _EjectFor) ->
    set_health(Key, up, Fails + 1, 0);
apply_outcome(Key, ejected, Fails, _Ref, err, _EjectAfter, EjectFor) ->
    set_health(Key, ejected, Fails, now_ms() + EjectFor).

%% Overwrite the health fields (positions 2..4) of an existing row.
set_health(Key, Status, Fails, Until) ->
    %% update_element/3 returns false when the row is gone; that is the
    %% documented "removed mid-flight" no-op, so the result is ignored.
    _ = ets:update_element(?TABLE, Key, [{2, Status}, {3, Fails}, {4, Until}]),
    ok.

%%====================================================================
%% Internals
%%====================================================================

%% Write a fresh row for Endpoint in pool Name: status `up`, no failures,
%% no deadline, and a new single-slot in-flight counter. An existing row
%% under the same key is replaced.
insert_endpoint(Name, Endpoint) ->
    Inflight = atomics:new(1, []),
    Row = {{ep, Name, Endpoint}, up, 0, 0, Inflight},
    true = ets:insert(?TABLE, Row),
    ok.

%% Monotonic clock reading in milliseconds; the time base for all
%% ejection deadlines in the table.
now_ms() ->
    Unit = millisecond,
    erlang:monotonic_time(Unit).