src/erllama_cache_meta_srv.erl

%% Copyright (c) 2026 Benoit Chesneau. Licensed under the MIT License.
%% See the LICENSE file at the project root.
%%
-module(erllama_cache_meta_srv).
-moduledoc """
Sole writer for the cache meta and LRU ETS tables; arbitrates
claim/release and the reservation state machine for save
publication.

Two read-mostly ETS tables, owned by this process and `protected`
so any caller can read them without a server hop:

  erllama_cache_meta : set, key = cache_key, row layout per
                       include/erllama_cache.hrl ?POS_* constants
  erllama_cache_lru  : ordered_set, key = {LastUsedNs, cache_key},
                       value = []

Two server-internal maps in process state:

  holders      : MonRef -> {Pid, Key}; one entry per active claim
  reservations : Key -> #reservation{}; one entry per in-flight save

Plus a waiters map for `lookup_exact_or_wait/2` which defers replies
until the in-flight save publishes (or the per-call deadline fires).

The reservation state machine has two stages, `pre_link` and
`post_link`, to make crash cleanup correct: a writer that died
before `link/2` leaves no file; a writer that died after `link/2`
may have left a valid `.kvc` we can validate-and-adopt.
""".
-behaviour(gen_server).

-include("erllama_cache.hrl").

-export([
    start_link/0,
    %% Read-only (no server hop)
    lookup_exact/1,
    lookup_longest_prefix/4,
    %% Read with bounded wait for an in-flight save
    lookup_exact_or_wait/2,
    %% Claim/release (active reader of a slab)
    checkout/2,
    checkin/1,
    %% Reservation state machine (writer)
    reserve_save/3,
    check_reservation/2,
    mark_published/3,
    announce_saved/4,
    announce_saved/5,
    cancel_reservation/2,
    %% Operator/test helpers
    gc/0,
    evict_bytes/1,
    evict_bytes/2,
    dump/0,
    dump/1,
    insert_available/5,
    insert_available/6
]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-define(SERVER, ?MODULE).
-define(TBL_META, erllama_cache_meta).
-define(TBL_LRU, erllama_cache_lru).

%% Default time-to-live for a reservation when the writer is silent.
%% Set when the reservation is created and refreshed by each successful
%% check_reservation / mark_published call; announce_saved retires the
%% reservation instead of refreshing it.
-define(DEFAULT_TTL_NS, 60 * 1000 * 1000 * 1000).

%% Interval of the periodic sweep for stale reservations. Waiters are
%% expired by their own per-call timers, not by this sweep.
-define(SWEEP_INTERVAL_MS, 30 * 1000).

-record(reservation, {
    writer :: pid(),
    token :: reference(),
    monref :: reference(),
    expires_ns :: integer(),
    stage :: pre_link | post_link,
    tier :: erllama_cache:tier(),
    path :: file:name() | undefined
}).

-record(state, {
    holders :: #{reference() => {pid(), erllama_cache:cache_key()}},
    reservations :: #{erllama_cache:cache_key() => #reservation{}},
    waiters ::
        #{erllama_cache:cache_key() => [{gen_server:from(), integer(), reference()}]},
    sweep_timer :: reference() | undefined
}).

-type state() :: #state{}.

%% =============================================================================
%% Public API
%% =============================================================================

-doc "Start the cache meta server, linked to the caller and registered locally.".
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
    RegisteredName = {local, ?SERVER},
    gen_server:start_link(RegisteredName, ?MODULE, [], []).

-doc """
Read the meta row for Key straight from ETS (no server hop).

Returns `{ok, Row}` only when the row exists and its status is
`available`; any other status, a missing row, or a missing table
(`badarg` from ETS) yields `miss`.
""".
-spec lookup_exact(erllama_cache:cache_key()) ->
    {ok, tuple()} | miss.
lookup_exact(Key) ->
    %% One ets:lookup fetches status and row together, so an eviction
    %% cannot slip in between a status probe and the row read.
    Rows =
        try
            ets:lookup(?TBL_META, Key)
        catch
            error:badarg -> []
        end,
    case Rows of
        [Hit] when element(?POS_STATUS, Hit) =:= available -> {ok, Hit};
        _ -> miss
    end.

-doc """
Look up Key through the server, optionally waiting for an in-flight save.

If the row is `available` it is returned immediately. If the row is
`writing` and MaxWaitMs is greater than zero, the reply is deferred
until the row is installed or MaxWaitMs milliseconds elapse (then
`miss`). In every other case the result is `miss`.

MaxWaitMs must be a non-negative integer. The server does arithmetic
on it and uses it as a timer duration, so anything else (for example
`infinity` or a float) is rejected here with `function_clause` in the
caller instead of crashing the server that owns the cache tables.
""".
-spec lookup_exact_or_wait(erllama_cache:cache_key(), non_neg_integer()) ->
    {ok, tuple()} | miss.
lookup_exact_or_wait(Key, MaxWaitMs) when is_integer(MaxWaitMs), MaxWaitMs >= 0 ->
    %% The call itself never times out; the server enforces MaxWaitMs.
    gen_server:call(?SERVER, {lookup_or_wait, Key, MaxWaitMs}, infinity).

-doc """
Find the longest cached prefix of Tokens, probing in Stride steps.

Starts at the largest multiple of Stride that fits in Tokens and walks
backward one Stride at a time, returning `{ok, N, Row}` for the first
prefix length N whose key has an `available` row. Pure ETS reads, no
server hop. Intended for stateless callers (HTTP front-end, agent
loops) that resend a full conversation each turn and have no
parent_key to thread.

The walk stops once the prefix length drops below
`max(MinTokens, Stride)` and then returns `miss`. At most
`length(Tokens) / Stride` rows are probed; with the default
2048-token stride that is ~15 lookups for a 30k-token prompt.

The token list is encoded to a binary once; each probe takes a
`binary:part(TokensBin, 0, N*4)` sub-binary (an O(1) view) and hands
it to `erllama_cache_key:make/4`, so the per-probe cost is the key
derivation over N*4 bytes plus one ETS lookup. The elapsed time is
added to the longest-prefix latency counter.
""".
-spec lookup_longest_prefix(map(), [erllama_nif:token_id()], pos_integer(), pos_integer()) ->
    {ok, pos_integer(), tuple()} | miss.
lookup_longest_prefix(KeyMeta, Tokens, Stride, MinTokens) when
    is_integer(Stride), Stride > 0, is_integer(MinTokens), MinTokens > 0
->
    StartedAt = erlang:monotonic_time(nanosecond),
    TokenCount = length(Tokens),
    %% Largest multiple of Stride not exceeding the token count.
    FirstProbe = TokenCount - (TokenCount rem Stride),
    LowestProbe = max(MinTokens, Stride),
    EncodedTokens = erllama_cache_key:encode_tokens(Tokens),
    #{fingerprint := Fp, quant_type := QT, ctx_params_hash := CtxHash} = KeyMeta,
    Outcome = walk_prefix(Fp, QT, CtxHash, EncodedTokens, FirstProbe, Stride, LowestProbe, 0),
    SpentNs = erlang:monotonic_time(nanosecond) - StartedAt,
    erllama_cache_counters:add(?C_LONGEST_PREFIX_NS, max(SpentNs, 0)),
    Outcome.

%% Probe prefix lengths N, N - Stride, ... until one hits an available
%% row or N falls below Floor. Probes counts the lookups performed so
%% far; the final count is added to the probe counter on either exit.
walk_prefix(Fp, QT, CtxHash, Bin, N, Stride, Floor, Probes) ->
    case N < Floor of
        true ->
            erllama_cache_counters:add(?C_LONGEST_PREFIX_PROBES, Probes),
            miss;
        false ->
            %% 4 bytes per encoded token: the first N tokens are the
            %% first N * 4 bytes of the encoded binary.
            Candidate = erllama_cache_key:make(Fp, QT, CtxHash, binary:part(Bin, 0, N * 4)),
            Tried = Probes + 1,
            case lookup_exact(Candidate) of
                miss ->
                    walk_prefix(Fp, QT, CtxHash, Bin, N - Stride, Stride, Floor, Tried);
                {ok, Row} ->
                    erllama_cache_counters:add(?C_LONGEST_PREFIX_PROBES, Tried),
                    {ok, N, Row}
            end
    end.

-doc """
Claim the slab stored under Key on behalf of Pid.

On success returns `{ok, MonRef, Tier, Location, Header, Tokens}`;
MonRef is the handle to pass to `checkin/1`. Returns `{error, busy}`
when a row exists but is not `available`, and `miss` when there is no
row for Key.
""".
-spec checkout(erllama_cache:cache_key(), pid()) ->
    {ok, reference(), erllama_cache:tier(), term(), binary(), term()}
    | {error, busy}
    | miss.
checkout(Key, Pid) when is_pid(Pid) ->
    Request = {checkout, Key, Pid},
    gen_server:call(?SERVER, Request).

-doc """
Release a claim previously obtained from `checkout/2`. Always returns
`ok`, including when MonRef is not a known claim.
""".
-spec checkin(reference()) -> ok.
checkin(MonRef) when is_reference(MonRef) ->
    Request = {checkin, MonRef},
    gen_server:call(?SERVER, Request).

-doc """
Reserve Key for a save by writer Pid into Tier.

Returns `{ok, Token}` when the reservation is granted; the token must
accompany every later reservation call for this key. Returns
`{error, already_present}` if an `available` row already exists and
`{error, conflict}` if a live reservation or an eviction is in the way.
""".
-spec reserve_save(erllama_cache:cache_key(), erllama_cache:tier(), pid()) ->
    {ok, reference()} | {error, already_present | conflict}.
reserve_save(Key, Tier, Pid) when is_pid(Pid) ->
    Request = {reserve_save, Key, Tier, Pid},
    gen_server:call(?SERVER, Request).

-doc """
Confirm that the reservation for Key is still held under Token and
extend its time-to-live. Returns `{error, expired}` when no
reservation with that token exists.
""".
-spec check_reservation(erllama_cache:cache_key(), reference()) ->
    ok | {error, expired}.
check_reservation(Key, Token) when is_reference(Token) ->
    Request = {check_reservation, Key, Token},
    gen_server:call(?SERVER, Request).

-doc """
Record that the writer has published its file at Path: the reservation
moves to the `post_link` stage, remembers Path, and has its
time-to-live extended. Returns `{error, expired}` when no reservation
with that token exists.
""".
-spec mark_published(erllama_cache:cache_key(), reference(), file:name()) ->
    ok | {error, expired}.
mark_published(Key, Token, Path) when is_reference(Token) ->
    Request = {mark_published, Key, Token, Path},
    gen_server:call(?SERVER, Request).

-doc "Same as `announce_saved/5` with no encoded token binary (`undefined`).".
-spec announce_saved(
    erllama_cache:cache_key(), reference(), non_neg_integer(), binary()
) -> ok | {error, expired}.
announce_saved(Key, Token, Size, Header) ->
    NoTokensBin = undefined,
    announce_saved(Key, Token, Size, Header, NoTokensBin).

-doc """
Complete the save reserved under Token: the server installs an
`available` row for Key with the given Size, Header and TokensBin,
drops the reservation, and answers any callers waiting on Key.
Returns `{error, expired}` when no reservation with that token exists.
""".
-spec announce_saved(
    erllama_cache:cache_key(),
    reference(),
    non_neg_integer(),
    binary(),
    binary() | undefined
) -> ok | {error, expired}.
announce_saved(Key, Token, Size, Header, TokensBin) when
    is_reference(Token), is_binary(Header)
->
    Request = {announce_saved, Key, Token, Size, Header, TokensBin},
    gen_server:call(?SERVER, Request).

-doc """
Abandon the reservation held under Token: its placeholder row and the
reservation itself are removed. Always returns `ok`, including when no
matching reservation exists.
""".
-spec cancel_reservation(erllama_cache:cache_key(), reference()) -> ok.
cancel_reservation(Key, Token) when is_reference(Token) ->
    Request = {cancel_reservation, Key, Token},
    gen_server:call(?SERVER, Request).

%% Insert an `available` row directly, bypassing the reservation state
%% machine. Used by the RAM tier (which has no on-disk publish step)
%% and by the disk tier's on-start scan to register pre-existing valid
%% files. This arity records no encoded token binary (`undefined`).
-spec insert_available(
    erllama_cache:cache_key(),
    erllama_cache:tier(),
    non_neg_integer(),
    binary(),
    term()
) -> ok.
insert_available(Key, Tier, Size, Header, Location) ->
    NoTokensBin = undefined,
    insert_available(Key, Tier, Size, Header, Location, NoTokensBin).

%% Insert an `available` row for Key with an explicit Location and
%% encoded token binary. The server installs the row and then answers
%% any callers waiting on Key.
-spec insert_available(
    erllama_cache:cache_key(),
    erllama_cache:tier(),
    non_neg_integer(),
    binary(),
    term(),
    binary() | undefined
) -> ok.
insert_available(Key, Tier, Size, Header, Location, TokensBin) ->
    Request = {insert_available, Key, Tier, Size, Header, Location, TokensBin},
    gen_server:call(?SERVER, Request).

-doc """
Run a full eviction pass: the server walks the LRU table from its
first entry and evicts every `available` row with a zero refcount.
Returns the number of rows evicted.
""".
-spec gc() -> {evicted, non_neg_integer()}.
gc() ->
    Request = gc,
    gen_server:call(?SERVER, Request).

-doc """
Evict rows from every tier, oldest first, until at least TargetBytes
have been freed or no candidates remain. Returns
`{evicted, Rows, Bytes}`: the number of rows evicted and the bytes
actually freed.
""".
-spec evict_bytes(non_neg_integer()) ->
    {evicted, non_neg_integer(), non_neg_integer()}.
evict_bytes(TargetBytes) ->
    EveryTier = all,
    evict_bytes(TargetBytes, EveryTier).

-doc """
Evict oldest available rows whose tier is in Tiers until at least
TargetBytes have been freed (or no candidates remain). `Tiers = all`
matches every tier; otherwise it must be a list drawn from
`[ram, ram_file, disk]`. Returns `{evicted, Rows, Bytes}`.

A TargetBytes of 0 is a no-op that returns `{evicted, 0, 0}`; use
`gc/0` for a full pass.

Tiers is validated here: the server builds its tier filter from it
and would crash on anything that is neither `all` nor a list, so a
bad value fails in the caller with `function_clause` instead.
""".
-spec evict_bytes(non_neg_integer(), all | [erllama_cache:tier()]) ->
    {evicted, non_neg_integer(), non_neg_integer()}.
evict_bytes(TargetBytes, Tiers) when
    is_integer(TargetBytes),
    TargetBytes >= 0,
    (Tiers =:= all orelse is_list(Tiers))
->
    gen_server:call(?SERVER, {evict_bytes, TargetBytes, Tiers}).

-doc "Return every row of the meta table as a list, read directly from ETS.".
-spec dump() -> [tuple()].
dump() ->
    MetaTable = ?TBL_META,
    ets:tab2list(MetaTable).

%% Return the raw meta row for Key regardless of its status, or `miss`.
%% dump/1 wants the whole row, so a single full-row lookup is the right
%% form; probing one element first and then reading the row would cost
%% two ETS operations on the hit path for no benefit.
-spec dump(erllama_cache:cache_key()) -> {ok, tuple()} | miss.
dump(Key) ->
    Result = lookup_row(Key),
    Result.

%% Status-agnostic row fetch shared by dump/1 and notify_waiters/2:
%% `{ok, Row}` with the raw tuple when Key has a row, `miss` otherwise.
-spec lookup_row(erllama_cache:cache_key()) -> {ok, tuple()} | miss.
lookup_row(Key) ->
    case ets:lookup(?TBL_META, Key) of
        [] -> miss;
        [Found] -> {ok, Found}
    end.

%% =============================================================================
%% gen_server callbacks
%% =============================================================================

%% Create the two named, protected ETS tables (meta as a set, LRU as an
%% ordered_set), schedule the first sweep, and start with empty
%% holder / reservation / waiter maps.
-spec init([]) -> {ok, state()}.
init([]) ->
    SharedOpts = [named_table, protected, {keypos, 1}, {read_concurrency, true}],
    _ = ets:new(?TBL_META, [set | SharedOpts]),
    _ = ets:new(?TBL_LRU, [ordered_set | SharedOpts]),
    SweepTimer = erlang:send_after(?SWEEP_INTERVAL_MS, self(), sweep),
    {ok, #state{
        holders = #{},
        reservations = #{},
        waiters = #{},
        sweep_timer = SweepTimer
    }}.

%% Synchronous request dispatcher. Every mutation of the meta / LRU
%% tables and of the holder, reservation and waiter maps goes through
%% one of these clauses. Unknown requests get `{error, unknown_call}`.

%% checkout: claim an available row; busy for any other status.
handle_call({checkout, Key, Pid}, _From, S) ->
    case ets:lookup(?TBL_META, Key) of
        [Row] when element(?POS_STATUS, Row) =:= available ->
            do_checkout(Key, Pid, Row, S);
        [_NotAvailable] ->
            {reply, {error, busy}, S};
        [] ->
            {reply, miss, S}
    end;
%% checkin: drop the claim, its monitor, and one refcount unit.
handle_call({checkin, MonRef}, _From, S) ->
    Holders = S#state.holders,
    case Holders of
        #{MonRef := {_Pid, Key}} ->
            erlang:demonitor(MonRef, [flush]),
            decrement_refcount(Key),
            {reply, ok, S#state{holders = maps:remove(MonRef, Holders)}};
        #{} ->
            {reply, ok, S}
    end;
%% lookup_or_wait: answer now, or park the caller while a save is
%% in flight and a positive wait budget was given.
handle_call({lookup_or_wait, Key, MaxWaitMs}, From, S) ->
    case ets:lookup(?TBL_META, Key) of
        [Row] when element(?POS_STATUS, Row) =:= available ->
            {reply, {ok, Row}, S};
        [Row] when element(?POS_STATUS, Row) =:= writing, MaxWaitMs > 0 ->
            {noreply, add_waiter(Key, From, MaxWaitMs, S)};
        _ ->
            {reply, miss, S}
    end;
handle_call({reserve_save, Key, Tier, Pid}, _From, S) ->
    do_reserve_save(Key, Tier, Pid, S);
%% check_reservation: token must match; extends the TTL.
handle_call({check_reservation, Key, Token}, _From, S) ->
    Reservations = S#state.reservations,
    case Reservations of
        #{Key := #reservation{token = Token} = Current} ->
            Renewed = Current#reservation{
                expires_ns = monotonic_ns() + ?DEFAULT_TTL_NS
            },
            {reply, ok, S#state{reservations = Reservations#{Key => Renewed}}};
        #{} ->
            {reply, {error, expired}, S}
    end;
%% mark_published: move to post_link, remember the path, extend the TTL.
handle_call({mark_published, Key, Token, Path}, _From, S) ->
    Reservations = S#state.reservations,
    case Reservations of
        #{Key := #reservation{token = Token} = Current} ->
            Linked = Current#reservation{
                stage = post_link,
                path = Path,
                expires_ns = monotonic_ns() + ?DEFAULT_TTL_NS
            },
            {reply, ok, S#state{reservations = Reservations#{Key => Linked}}};
        #{} ->
            {reply, {error, expired}, S}
    end;
%% announce_saved: swap the placeholder for an available row, retire
%% the reservation, and wake anyone waiting on the key.
handle_call({announce_saved, Key, Token, Size, Header, TokensBin}, _From, S) ->
    Reservations = S#state.reservations,
    case Reservations of
        #{Key := #reservation{token = Token, monref = MonRef, tier = Tier, path = Path}} ->
            erlang:demonitor(MonRef, [flush]),
            Location = location_for(Tier, Path),
            install_available_row(Key, Tier, Size, Header, Location, TokensBin),
            Retired = S#state{reservations = maps:remove(Key, Reservations)},
            {reply, ok, notify_waiters(Key, Retired)};
        #{} ->
            {reply, {error, expired}, S}
    end;
%% cancel_reservation: remove the placeholder row and the reservation;
%% a non-matching token is silently ignored.
handle_call({cancel_reservation, Key, Token}, _From, S) ->
    Reservations = S#state.reservations,
    case Reservations of
        #{Key := #reservation{token = Token, monref = MonRef}} ->
            erlang:demonitor(MonRef, [flush]),
            ets:delete(?TBL_META, Key),
            {reply, ok, S#state{reservations = maps:remove(Key, Reservations)}};
        #{} ->
            {reply, ok, S}
    end;
handle_call({insert_available, Key, Tier, Size, Header, Location, TokensBin}, _From, S) ->
    install_available_row(Key, Tier, Size, Header, Location, TokensBin),
    {reply, ok, notify_waiters(Key, S)};
handle_call(gc, _From, S) ->
    {reply, {evicted, run_eviction()}, S};
handle_call({evict_bytes, 0, _Tiers}, _From, S) ->
    %% "Evict at least 0 bytes" is a no-op. Use gc/0 for full GC.
    {reply, {evicted, 0, 0}, S};
handle_call({evict_bytes, Target, Tiers}, _From, S) when Target > 0 ->
    {Rows, FreedBytes} = run_eviction_bytes(Target, tier_pred(Tiers)),
    {reply, {evicted, Rows, FreedBytes}, S};
handle_call(_Unknown, _From, S) ->
    {reply, {error, unknown_call}, S}.

%% No casts are part of this server's protocol; any cast is dropped and
%% the state is left untouched.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(_Ignored, State) ->
    {noreply, State}.

%% Out-of-band messages:
%%   'DOWN'         a monitored process died: either a claim holder
%%                  (release its refcount) or a reservation writer
%%   waiter_expire  a lookup_exact_or_wait deadline fired
%%   sweep          periodic stale-reservation sweep; re-arms itself
%% Anything else is ignored.
handle_info({'DOWN', Ref, process, _DownPid, _Reason}, S) ->
    Holders = S#state.holders,
    case Holders of
        #{Ref := {_HolderPid, Key}} ->
            decrement_refcount(Key),
            {noreply, S#state{holders = maps:remove(Ref, Holders)}};
        #{} ->
            %% Not a holder monitor, so it can only belong to a writer.
            {noreply, on_writer_down(Ref, S)}
    end;
handle_info({waiter_expire, Key, From}, S) ->
    {noreply, expire_waiter(Key, From, S)};
handle_info(sweep, S) ->
    Swept = sweep_reservations(S),
    NextTimer = erlang:send_after(?SWEEP_INTERVAL_MS, self(), sweep),
    {noreply, Swept#state{sweep_timer = NextTimer}};
handle_info(_Other, S) ->
    {noreply, S}.

%% Cancel the pending sweep timer, if there is one, on shutdown.
terminate(_Reason, #state{sweep_timer = undefined}) ->
    ok;
terminate(_Reason, #state{sweep_timer = TRef}) ->
    _ = erlang:cancel_timer(TRef),
    ok.

%% =============================================================================
%% Internal: checkout / refcount / LRU
%% =============================================================================

%% Grant a claim on an available row: monitor the claimant, move the
%% row to the fresh end of the LRU, bump hits and refcount, and record
%% the holder under the monitor reference that is returned to the
%% caller as the claim handle.
do_checkout(Key, Pid, Row, S) ->
    MonRef = erlang:monitor(process, Pid),
    NowNs = monotonic_ns(),
    %% The LRU table is keyed by {LastUsed, Key}, so "touching" a row
    %% means replacing its LRU entry.
    StaleLruKey = {element(?POS_LAST_USED, Row), Key},
    ets:delete(?TBL_LRU, StaleLruKey),
    ets:insert(?TBL_LRU, {{NowNs, Key}, []}),
    [NewHits, _NewRefcount] =
        ets:update_counter(?TBL_META, Key, [{?POS_HITS, 1}, {?POS_REFCOUNT, 1}]),
    ets:update_element(?TBL_META, Key, {?POS_LAST_USED, NowNs}),
    Loc = element(?POS_LOCATION, Row),
    %% Persist the bumped hit count for restart-survival. Best-effort:
    %% a failed write only loses the increment, with no behavioural
    %% impact this run. RAM tier has no persistent file.
    persist_hits(Loc, NewHits),
    Reply =
        {ok, MonRef, element(?POS_TIER, Row), Loc, element(?POS_HEADER_BIN, Row),
            element(?POS_TOKENS_REF, Row)},
    Holders = S#state.holders,
    {reply, Reply, S#state{holders = Holders#{MonRef => {Pid, Key}}}}.

%% Write the hit count through to the file behind a `disk` or
%% `ram_file` location; every other location shape is a no-op.
persist_hits({Tier, Path}, Hits) when Tier =:= disk; Tier =:= ram_file ->
    erllama_cache_disk_srv:touch_hits(Path, Hits);
persist_hits(_NoFile, _Hits) ->
    ok.

%% Drop one claim from Key's refcount, never going below zero. A row
%% that no longer exists (ETS `badarg`) is silently ignored.
decrement_refcount(Key) ->
    %% {Pos, Incr, Threshold, SetValue}: clamp at 0 instead of going negative.
    ClampedDecrement = {?POS_REFCOUNT, -1, 0, 0},
    try ets:update_counter(?TBL_META, Key, ClampedDecrement) of
        _Remaining -> ok
    catch
        error:badarg -> ok
    end.

%% Install (or replace) the `available` row for Key and its LRU entry.
%%
%% If a row already exists, its old LRU entry is removed and its
%% refcount is carried over to the new row. Resetting it to 0 would
%% make a slab that is still checked out look unclaimed, so an
%% eviction pass could delete it under its reader.
%%
%% The hit count is taken from Header (see hits_from_header/1).
install_available_row(Key, Tier, Size, Header, Location, TokensBin) ->
    NowNs = monotonic_ns(),
    RefCount =
        case ets:lookup(?TBL_META, Key) of
            [Old] ->
                ets:delete(?TBL_LRU, {element(?POS_LAST_USED, Old), Key}),
                element(?POS_REFCOUNT, Old);
            [] ->
                0
        end,
    Hits = hits_from_header(Header),
    %% Bias last_used by accumulated hits so a high-hit row survives
    %% an LRU walk on a freshly-restarted server, where every row's
    %% natural last_used would otherwise collapse to ~NowNs and leave
    %% the order effectively random. Each accumulated hit pushes the
    %% row 1 second forward in the LRU. Once a row is actively
    %% checked out at runtime, recency takes over.
    LastUsed = NowNs + Hits * 1_000_000_000,
    Fresh =
        {Key, Tier, Size, LastUsed, 0, available, Header, Location, TokensBin, Hits},
    %% Set the refcount by position macro rather than assuming its slot.
    Row = setelement(?POS_REFCOUNT, Fresh, RefCount),
    ets:insert(?TBL_META, Row),
    ets:insert(?TBL_LRU, {{LastUsed, Key}, []}),
    ok.

%% Read the 32-bit little-endian hit count stored at
%% ?KVC_HEADER_HITS_OFFSET in the header. RAM tier saves pass a
%% placeholder header, so a header too short to hold the field counts
%% as "no prior hits" (0).
hits_from_header(Header) ->
    case Header of
        <<_:?KVC_HEADER_HITS_OFFSET/binary, Hits:32/little, _/binary>> -> Hits;
        _TooShort -> 0
    end.

%% =============================================================================
%% Internal: reservation
%% =============================================================================

%% Decide a reserve_save request from the current status of Key's row:
%%   no row     -> create a fresh reservation
%%   writing    -> defer to handle_existing_writing_row/4
%%   available  -> already_present
%%   evicting   -> conflict
do_reserve_save(Key, Tier, Pid, S) ->
    try ets:lookup_element(?TBL_META, Key, ?POS_STATUS) of
        writing ->
            handle_existing_writing_row(Key, Tier, Pid, S);
        available ->
            {reply, {error, already_present}, S};
        evicting ->
            {reply, {error, conflict}, S}
    catch
        %% lookup_element raises badarg when the key has no row.
        error:badarg ->
            create_reservation(Key, Tier, Pid, S)
    end.

%% A `writing` row already exists for Key. With no reservation behind
%% it the row is an orphan and is replaced; a live reservation wins
%% (conflict); a stale one is cleaned up before reserving afresh.
handle_existing_writing_row(Key, Tier, Pid, S) ->
    case maps:find(Key, S#state.reservations) of
        error ->
            ets:delete(?TBL_META, Key),
            create_reservation(Key, Tier, Pid, S);
        {ok, #reservation{} = Existing} ->
            case reservation_is_live(Existing) of
                false ->
                    Cleaned = cleanup_stale_reservation(Key, Existing, S),
                    create_reservation(Key, Tier, Pid, Cleaned);
                true ->
                    {reply, {error, conflict}, S}
            end
    end.

%% A reservation is live while its writer process is alive and its
%% deadline is still in the future.
reservation_is_live(#reservation{writer = Writer, expires_ns = Deadline}) ->
    case is_process_alive(Writer) of
        true -> Deadline > monotonic_ns();
        false -> false
    end.

%% Start a new reservation for Key: monitor the writer, mint the token
%% it must present on later calls, insert a `writing` placeholder row,
%% and record the reservation in the `pre_link` stage.
create_reservation(Key, Tier, Pid, S) ->
    MonRef = erlang:monitor(process, Pid),
    Token = erlang:make_ref(),
    NowNs = monotonic_ns(),
    %% NOTE(review): this placeholder is a 9-tuple, while rows written
    %% by install_available_row/6 are 10-tuples (trailing hits field).
    %% Confirm nothing reads ?POS_HITS from a `writing` row.
    ets:insert(?TBL_META, {Key, Tier, 0, NowNs, 0, writing, <<>>, undefined, undefined}),
    Reservation = #reservation{
        writer = Pid,
        token = Token,
        monref = MonRef,
        expires_ns = NowNs + ?DEFAULT_TTL_NS,
        stage = pre_link,
        tier = Tier,
        path = undefined
    },
    Reservations = S#state.reservations,
    {reply, {ok, Token}, S#state{reservations = Reservations#{Key => Reservation}}}.

%% Tear down a reservation whose writer may still be monitored: drop
%% the monitor (flushing any queued 'DOWN'), then clean up by stage.
cleanup_stale_reservation(Key, #reservation{monref = MonRef} = Stale, S) ->
    erlang:demonitor(MonRef, [flush]),
    cleanup_by_stage(Key, Stale, S).

%% Remove a dead or stale reservation, handling leftovers by stage:
%%   pre_link   no file was published; just drop the placeholder row
%%   post_link  a file may exist at Path; adopt it as an available row
%%              if it validates, otherwise delete the file and the row
%% Returns the state without the reservation.
cleanup_by_stage(Key, #reservation{stage = pre_link}, S) ->
    Remaining = maps:remove(Key, S#state.reservations),
    ets:delete(?TBL_META, Key),
    S#state{reservations = Remaining};
cleanup_by_stage(Key, #reservation{stage = post_link, path = Path, tier = Tier}, S) ->
    Retired = S#state{reservations = maps:remove(Key, S#state.reservations)},
    case validate_and_adopt(Key, Path) of
        {error, _Reason} ->
            _ = file:delete(Path),
            ets:delete(?TBL_META, Key),
            Retired;
        {ok, Size, Header, TokensBin} ->
            Location = location_for(Tier, Path),
            install_available_row(Key, Tier, Size, Header, Location, TokensBin),
            %% The row is now available, so wake anyone waiting on it.
            notify_waiters(Key, Retired)
    end.

%% Read the file at Path and parse it against Key. On success returns
%% `{ok, FileSize, HeaderBin, TokensBin}`; a read or parse failure is
%% passed through as `{error, Reason}`.
validate_and_adopt(Key, Path) ->
    maybe
        {ok, Bin} ?= file:read_file(Path),
        {ok, Info, _Payload} ?= erllama_cache_kvc:parse(Bin, Key),
        %% A parse result without a `tokens` entry is treated as empty.
        Tokens = maps:get(tokens, Info, []),
        {ok, byte_size(Bin), header_slice(Bin), erllama_cache_key:encode_tokens(Tokens)}
    else
        {error, Reason} -> {error, Reason}
    end.

%% Return the first 48 bytes of Bin (the header portion of the file).
%% Raises badarg if Bin is shorter than that.
header_slice(Bin) ->
    binary:part(Bin, {0, 48}).

%% Build the location term stored in a row: `{ram}` for the RAM tier,
%% `{Tier, Path}` for the file-backed tiers.
location_for(ram, _Path) ->
    {ram};
location_for(Tier, Path) when Tier =:= ram_file; Tier =:= disk ->
    {Tier, Path}.

%% =============================================================================
%% Internal: writer DOWN handling
%% =============================================================================

%% A monitored process that is not a claim holder went down. If Ref
%% belongs to a reservation, clean that reservation up by stage;
%% otherwise leave the state unchanged.
on_writer_down(Ref, S) ->
    case find_reservation_by_monref(Ref, S#state.reservations) of
        none -> S;
        {Key, Reservation} -> cleanup_by_stage(Key, Reservation, S)
    end.

%% Linear scan of the reservations map for the entry whose writer
%% monitor is Ref. Returns `{Key, Reservation}` or `none`.
find_reservation_by_monref(Ref, Reservations) ->
    Matches = [
        {Key, R}
     || {Key, #reservation{monref = MonRef} = R} <- maps:to_list(Reservations),
        MonRef =:= Ref
    ],
    case Matches of
        [Hit | _] -> Hit;
        [] -> none
    end.

%% =============================================================================
%% Internal: waiters (lookup_exact_or_wait)
%% =============================================================================

%% Park caller From on Key: arm a timer that delivers
%% `{waiter_expire, Key, From}` after MaxWaitMs and queue
%% `{From, DeadlineNs, TimerRef}` in front of Key's waiter list.
add_waiter(Key, From, MaxWaitMs, S) ->
    DeadlineNs = monotonic_ns() + MaxWaitMs * 1_000_000,
    Timer = erlang:send_after(MaxWaitMs, self(), {waiter_expire, Key, From}),
    Waiters = S#state.waiters,
    Queued = [{From, DeadlineNs, Timer} | maps:get(Key, Waiters, [])],
    S#state{waiters = Waiters#{Key => Queued}}.

%% Answer every caller parked on Key with the current result of
%% lookup_row/1, cancel their expiry timers, and drop Key from the
%% waiters map. A key with no waiters leaves the state unchanged.
notify_waiters(Key, S) ->
    Waiters = S#state.waiters,
    case maps:find(Key, Waiters) of
        error ->
            S;
        {ok, Parked} ->
            Reply = lookup_row(Key),
            _ = [
                begin
                    _ = erlang:cancel_timer(Timer),
                    gen_server:reply(From, Reply)
                end
             || {From, _Deadline, Timer} <- Parked
            ],
            S#state{waiters = maps:remove(Key, Waiters)}
    end.

%% A waiter's deadline fired. If From is still parked on Key, reply
%% `miss` and remove it (dropping the key once its list is empty).
expire_waiter(Key, From, S) ->
    Waiters = S#state.waiters,
    case lists:keytake(From, 1, maps:get(Key, Waiters, [])) of
        false ->
            %% Nothing parked for this caller: it was already answered
            %% (e.g. a save published first) and the timer was
            %% cancelled in notify_waiters; this message is the one
            %% that lost the race between cancel and fire.
            S;
        {value, _Entry, Remaining} ->
            gen_server:reply(From, miss),
            Updated =
                case Remaining of
                    [] -> maps:remove(Key, Waiters);
                    [_ | _] -> Waiters#{Key => Remaining}
                end,
            S#state{waiters = Updated}
    end.

%% =============================================================================
%% Internal: sweep
%% =============================================================================

%% Collect every reservation whose deadline has passed or whose writer
%% is no longer alive, then clean each one up in turn.
sweep_reservations(S) ->
    Now = monotonic_ns(),
    IsStale = fun(#reservation{expires_ns = Deadline, writer = Writer}) ->
        Deadline =< Now orelse not is_process_alive(Writer)
    end,
    Stale = [
        Entry
     || {_Key, R} = Entry <- maps:to_list(S#state.reservations), IsStale(R)
    ],
    %% Collect first, then clean up: cleanup rewrites the reservations map.
    lists:foldr(
        fun({Key, R}, Acc) -> cleanup_stale_reservation(Key, R, Acc) end,
        S,
        Stale
    ).

%% =============================================================================
%% Internal: eviction
%% =============================================================================

%% Full eviction pass: walk the LRU table from its first key and try
%% to evict every entry. Returns the number of rows evicted.
run_eviction() ->
    run_eviction(ets:first(?TBL_LRU), 0).

run_eviction('$end_of_table', Evicted) ->
    Evicted;
run_eviction({_LastUsed, Key} = LruKey, Evicted) ->
    %% Fetch the successor first: try_evict_one may delete LruKey.
    Successor = ets:next(?TBL_LRU, LruKey),
    Gained =
        case try_evict_one(LruKey, Key) of
            {ok, _Bytes} -> 1;
            skip -> 0;
            gone -> 0
        end,
    run_eviction(Successor, Evicted + Gained).

%% Byte-targeted eviction: walk the LRU table from its first key,
%% evicting rows accepted by TierPred, and stop once at least Target
%% bytes are freed or the table is exhausted. Returns
%% `{RowsEvicted, BytesFreed}`.
run_eviction_bytes(Target, TierPred) ->
    run_eviction_bytes(ets:first(?TBL_LRU), Target, TierPred, 0, 0).

run_eviction_bytes('$end_of_table', _Target, _TierPred, Rows, Freed) ->
    {Rows, Freed};
run_eviction_bytes(_LruKey, Target, _TierPred, Rows, Freed) when
    Freed >= Target, Target > 0
->
    {Rows, Freed};
run_eviction_bytes({_LastUsed, Key} = LruKey, Target, TierPred, Rows, Freed) ->
    %% Fetch the successor first: try_evict_one may delete LruKey.
    Successor = ets:next(?TBL_LRU, LruKey),
    {MoreRows, MoreBytes} =
        case try_evict_one(LruKey, Key, TierPred) of
            {ok, Bytes} -> {1, Bytes};
            skip -> {0, 0};
            gone -> {0, 0}
        end,
    run_eviction_bytes(Successor, Target, TierPred, Rows + MoreRows, Freed + MoreBytes).

%% Try to evict the row behind one LRU entry.
%%   {ok, Size}  row was available, unclaimed and accepted by TierPred;
%%               its tier storage, LRU entry and meta row are deleted
%%   skip        row exists but is not evictable right now
%%   gone        no meta row; the dangling LRU entry is removed
%% The two-argument form accepts every tier.
try_evict_one(LruKey, Key) ->
    try_evict_one(LruKey, Key, tier_pred(all)).

try_evict_one(LruKey, Key, TierPred) ->
    case ets:lookup(?TBL_META, Key) of
        [] ->
            ets:delete(?TBL_LRU, LruKey),
            gone;
        [Row] ->
            Tier = element(?POS_TIER, Row),
            Evictable =
                TierPred(Tier) andalso
                    element(?POS_STATUS, Row) =:= available andalso
                    element(?POS_REFCOUNT, Row) =:= 0,
            case Evictable of
                false ->
                    skip;
                true ->
                    Size = element(?POS_SIZE, Row),
                    delete_from_tier(Tier, Key, element(?POS_LOCATION, Row)),
                    ets:delete(?TBL_LRU, LruKey),
                    ets:delete(?TBL_META, Key),
                    erllama_cache_counters:incr(?C_EVICTIONS),
                    {ok, Size}
            end
    end.

%% Turn a tier selection into a predicate over a row's tier: a list
%% accepts its members, `all` accepts everything.
tier_pred(Tiers) when is_list(Tiers) ->
    fun(Tier) -> lists:member(Tier, Tiers) end;
tier_pred(all) ->
    fun(_Tier) -> true end.

%% Remove a slab's backing storage: the RAM tier deletes by key, the
%% file-backed tiers delete the file named in a location tagged with
%% the same tier (best-effort; the result is ignored). Any other
%% tier/location combination is a no-op.
delete_from_tier(ram, Key, _Loc) ->
    erllama_cache_ram:delete(Key);
delete_from_tier(Tier, _Key, {Tier, Path}) when Tier =:= disk; Tier =:= ram_file ->
    _ = file:delete(Path),
    ok;
delete_from_tier(_Tier, _Key, _Loc) ->
    ok.

%% =============================================================================
%% Internal: helpers
%% =============================================================================

%% Current Erlang monotonic time in nanoseconds. Used for every
%% deadline and last-used stamp in this module.
-spec monotonic_ns() -> integer().
monotonic_ns() ->
    Unit = nanosecond,
    erlang:monotonic_time(Unit).