%% Copyright (c) 2026 Benoit Chesneau. Licensed under the MIT License.
%% See the LICENSE file at the project root.
%%
-module(erllama_cache_disk_srv).
-moduledoc """
Disk tier server (read_write mode). The same server also backs the
`ram_file` tier (see `start_link/3`).
One server per disk root directory. Owns no ETS tables; the disk
itself is the source of truth for slabs. Reads, writes, and deletes
all funnel through the gen_server, which keeps file ordering
sequential per directory.
Save pipeline (`save/3`):
1. Build framed bytes via `erllama_cache_kvc:build/2`.
2. Open writer-unique temp file `<hex>.kvc.<writer_id>.tmp`
with O_EXCL.
3. `prim_file:write/2` an iolist of `[Prefix, Payload]`. Multi-GB
payloads are not concatenated in BEAM memory; the IO subsystem
uses writev under the hood.
4. `prim_file:datasync/1`, then close.
5. `prim_file:make_link/2` to publish at `<hex>.kvc`. EEXIST is
handled: if the existing file is a valid KVC for this Key, we
adopt it (delete our temp). Otherwise we delete the corrupt
file and retry once.
6. `erllama_nif:fsync_dir/1` on the root.
7. Reopen the published file and parse it (validation belt-and-
braces against FS bugs).
Returns `{ok, Key, Header, Size}` on success. The caller is responsible
for the `meta_srv:reserve_save -> check_reservation -> mark_published
-> announce_saved` protocol around this call (the writer pool wires
that up; callers that must interleave the protocol with the write use
the two-phase `write_tmp/3` / `publish/1` API instead).
On startup, the server scans its directory: deletes any `*.tmp`
files (interrupted writes; safe to drop), parses every `<hex>.kvc`
header, and registers each valid file with the meta server. Files
that fail to parse are deleted.
""".
-behaviour(gen_server).
-include("erllama_cache.hrl").
-include_lib("kernel/include/file.hrl").
-export([
start_link/2,
start_link/3,
save/3,
write_tmp/3,
publish/1,
publish/4,
abort_tmp/1,
load/2,
delete/2,
dir/1,
scan/1,
touch_hits/2
]).
-export([init/1, handle_call/3, handle_cast/2]).
%% Server state. The server keeps no slab index of its own: the
%% directory contents are authoritative (see the module doc).
-record(state, {
%% Registered name the server was started under (start_link/3).
name :: atom(),
%% Tier this directory serves; `disk` unless started via
%% start_link/3 with `ram_file`.
tier :: disk | ram_file,
%% Cache directory; created by init/1 if it does not exist.
root :: file:name()
}).
-type state() :: #state{}.
%% =============================================================================
%% Public API
%% =============================================================================
%% Start a server for RootDir on the plain `disk` tier.
-spec start_link(atom(), file:name()) -> {ok, pid()} | {error, term()}.
start_link(Name, RootDir) ->
    DefaultTier = disk,
    start_link(Name, DefaultTier, RootDir).
%% Start a server registered locally as Name, managing RootDir.
%% Tier must be `disk` or `ram_file`. It is used when slabs already on
%% disk are registered with the meta server at startup.
-spec start_link(atom(), disk | ram_file, file:name()) ->
    {ok, pid()} | {error, term()}.
start_link(Name, Tier, RootDir) when Tier =:= disk orelse Tier =:= ram_file ->
    InitArgs = [Name, Tier, RootDir],
    gen_server:start_link({local, Name}, ?MODULE, InitArgs, []).
%% Run the whole save pipeline inside the server: write the temp file,
%% link it to `<hex>.kvc`, fsync the directory and validate the result.
%% The call waits with an `infinity` timeout.
-spec save(atom(), erllama_cache_kvc:build_meta(), binary()) ->
    {ok, erllama_cache:cache_key(), binary(), non_neg_integer()}
    | {error, term()}.
save(SrvName, BuildMeta, Payload) ->
    Request = {save, BuildMeta, Payload},
    gen_server:call(SrvName, Request, infinity).
%% Two-phase save for callers that need to interleave the meta
%% server's reservation protocol (check_reservation, mark_published)
%% between the temp write and the link publish. write_tmp/3 does the
%% expensive part (build prefix, write+datasync the temp file);
%% publish/1 does the atomic link, dir-fsync, and validate. Both are
%% stateless module functions that run in the caller's process and
%% take the directory directly; neither goes through this server.
%%
%%     {ok, WriteOut} = write_tmp(Root, BuildMeta, Payload),
%%     #{key := Key, final := FinalPath} = WriteOut,
%%     ok = meta_srv:check_reservation(Key, Token),
%%     {ok, Key, Header, Size} = publish(WriteOut),
%%     ok = meta_srv:mark_published(Key, Token, FinalPath),
%%     ok = meta_srv:announce_saved(Key, Token, Size, Header, TokensBin).
%%
%% On any meta-server rejection between phases, call abort_tmp/1
%% with the WriteOut to delete the temp file.
-type write_out() :: #{
    %% Cache key derived from the build metadata.
    key := erllama_cache:cache_key(),
    %% Writer-unique temp file, already written and datasynced.
    tmp := file:name(),
    %% Publish target `<hex>.kvc` inside root.
    final := file:name(),
    %% Framed prefix returned by erllama_cache_kvc:build/2.
    prefix := binary(),
    %% Payload exactly as passed to write_tmp/3.
    payload := binary(),
    %% Cache directory; fsync'd after the link.
    root := file:name()
}.
%% Phase one of a two-phase save. It derives the cache key from
%% BuildMeta, builds the framed prefix, and writes `[Prefix, Payload]`
%% to a fresh writer-unique temp file next to the final `<hex>.kvc`.
%% write_temp/3 opens that file with `exclusive` and datasyncs it.
%%
%% A build failure returns before anything touches the disk. A write
%% failure removes the temp file before returning the error.
-spec write_tmp(file:name(), erllama_cache_kvc:build_meta(), binary()) ->
    {ok, write_out()} | {error, term()}.
write_tmp(Root, BuildMeta, Payload) ->
    KeyInput = #{
        fingerprint => maps:get(fingerprint, BuildMeta),
        quant_type => maps:get(quant_type, BuildMeta),
        ctx_params_hash => maps:get(ctx_params_hash, BuildMeta),
        tokens => maps:get(tokens, BuildMeta)
    },
    Key = erllama_cache_key:make(KeyInput),
    FinalName = bin_to_hex(Key) ++ ".kvc",
    FinalPath = filename:join(Root, FinalName),
    TmpPath = filename:join(Root, FinalName ++ "." ++ writer_suffix() ++ ".tmp"),
    case erllama_cache_kvc:build(BuildMeta, Payload) of
        {error, _} = BuildErr ->
            BuildErr;
        {ok, Prefix} ->
            case write_temp(TmpPath, Prefix, Payload) of
                {error, _} = WriteErr ->
                    _ = file:delete(TmpPath),
                    WriteErr;
                ok ->
                    {ok, #{
                        key => Key,
                        tmp => TmpPath,
                        final => FinalPath,
                        prefix => Prefix,
                        payload => Payload,
                        root => Root
                    }}
            end
    end.
%% Phase two of a two-phase save. It hard-links the temp file to its
%% final `<hex>.kvc` name, fsyncs the directory and re-validates the
%% published file. Returns `{ok, Key, Header, Size}` or `{error, _}`.
-spec publish(write_out()) ->
    {ok, erllama_cache:cache_key(), binary(), non_neg_integer()}
    | {error, term()}.
publish(#{
    root := Dir,
    key := CacheKey,
    prefix := HeaderPrefix,
    payload := Body,
    tmp := TmpName,
    final := FinalName
}) ->
    link_temp(CacheKey, HeaderPrefix, Body, TmpName, FinalName, Dir).
%% Compatibility 4-arg form for the rare caller that only has paths.
%% It recovers the 48-byte header from the temp file, then links and
%% validates the same way publish/1 does.
%%
%% Only the header bytes are read. The temp file holds the whole slab
%% (possibly multi-GB), but link_temp/6 only uses the first 48 bytes of
%% its prefix (via header_of/1). Its payload argument is ignored further
%% down (finalise/5 takes it as `_Payload`), so `<<>>` is passed.
%%
%% Returns `{error, tmp_unreadable}` if the temp file cannot be opened
%% or holds fewer than 48 bytes. The temp file is left in place then.
-spec publish(file:name(), file:name(), file:name(), erllama_cache:cache_key()) ->
    {ok, erllama_cache:cache_key(), binary(), non_neg_integer()}
    | {error, term()}.
publish(Root, TmpPath, FinalPath, Key) ->
    case read_tmp_header(TmpPath) of
        {ok, Header} ->
            link_temp(Key, Header, <<>>, TmpPath, FinalPath, Root);
        error ->
            {error, tmp_unreadable}
    end.

%% Read exactly the first 48 bytes of TmpPath. Returns `error` on open
%% or read failure, and for files shorter than 48 bytes.
read_tmp_header(TmpPath) ->
    case prim_file:open(TmpPath, [read, raw, binary]) of
        {ok, Fd} ->
            try prim_file:pread(Fd, 0, 48) of
                {ok, <<Header:48/binary>>} -> {ok, Header};
                _ -> error
            after
                prim_file:close(Fd)
            end;
        {error, _} ->
            error
    end.
%% Discard the temp file of a two-phase save that the meta server
%% rejected between write_tmp/3 and publish/1. A file that is already
%% gone is not an error.
-spec abort_tmp(write_out()) -> ok.
abort_tmp(#{tmp := TmpPath}) ->
    _ = file:delete(TmpPath),
    ok.
%% Read and fully parse the slab stored for Key. Returns `miss` when no
%% file exists. A file that fails to parse is deleted and its parse
%% error is returned.
-spec load(atom(), erllama_cache:cache_key()) ->
    {ok, erllama_cache_kvc:info(), binary()} | miss | {error, term()}.
load(SrvName, Key) ->
    Request = {load, Key},
    gen_server:call(SrvName, Request, infinity).
%% Remove the `<hex>.kvc` slab stored for Key. A file that is already
%% gone counts as success. Any other unlink error is returned, so the
%% spec now includes `{error, term()}`; the previous `-> ok` spec did
%% not match what the server replies.
-spec delete(atom(), erllama_cache:cache_key()) -> ok | {error, term()}.
delete(SrvName, Key) ->
    gen_server:call(SrvName, {delete, Key}).
%% Return the root directory this server manages.
-spec dir(atom()) -> file:name().
dir(SrvName) ->
    Request = dir,
    gen_server:call(SrvName, Request).
%% Scan the directory and return `{Key, Header, Size, TokensBin}`
%% entries for `.kvc` slabs whose metadata parses:
%%   - Header is the first 48 bytes.
%%   - Size is the file size in bytes.
%%   - TokensBin is the result of erllama_cache_key:encode_tokens/1.
%%
%% Side effects: deletes every `*.tmp` file in the directory and every
%% `.kvc` file whose metadata fails to parse. Files that cannot be read
%% at all are skipped.
%%
%% NOTE(review): write_tmp/3 and publish/1 run outside this server. A
%% scan that overlaps an in-flight two-phase save deletes that writer's
%% temp file, and its publish then fails. Avoid scanning while writers
%% are active.
-spec scan(atom()) ->
    [{erllama_cache:cache_key(), binary(), non_neg_integer(), binary()}].
scan(SrvName) ->
    gen_server:call(SrvName, scan, infinity).
%% Best-effort in-place rewrite of the u32 little-endian hit_count at
%% ?KVC_HEADER_HITS_OFFSET in the on-disk header. There is no server
%% hop: the meta server is the sole caller, the write is exactly 4
%% bytes, and the file's other bytes are immutable after publish.
%% Always returns `ok`; every failure is ignored.
%%
%% Opening with `[read, write]` creates the file if it is missing.
%% Without a guard, touching a slab that was just evicted would recreate
%% it as a short junk `<hex>.kvc`. So the file must already exist as a
%% regular file large enough to hold the counter, checked both before
%% opening and (via its end position) once open. Otherwise nothing is
%% written. A file removed between the two checks can still leave an
%% empty stub. Such a stub presumably fails to parse and is then dropped
%% by the load and scan paths.
%%
%% Counts above 2^32 - 1 saturate instead of silently wrapping to a
%% small value.
-spec touch_hits(file:name(), non_neg_integer()) -> ok.
touch_hits(Path, Hits) ->
    MinSize = ?KVC_HEADER_HITS_OFFSET + 4,
    case prim_file:read_file_info(Path) of
        {ok, #file_info{type = regular, size = Size}} when Size >= MinSize ->
            pwrite_hits(Path, min(Hits, 16#FFFFFFFF), MinSize);
        _ ->
            ok
    end.

%% Open Path read-write, re-check its size, then write the counter.
pwrite_hits(Path, Hits, MinSize) ->
    case prim_file:open(Path, [read, write, raw, binary]) of
        {ok, Fd} ->
            try prim_file:position(Fd, eof) of
                {ok, Size} when Size >= MinSize ->
                    _ = prim_file:pwrite(Fd, ?KVC_HEADER_HITS_OFFSET, <<Hits:32/little>>),
                    ok;
                _ ->
                    ok
            after
                prim_file:close(Fd)
            end;
        {error, _} ->
            ok
    end.
%% =============================================================================
%% gen_server callbacks
%% =============================================================================
%% Set up the server:
%%   1. Create the root directory if needed.
%%   2. Drop `*.tmp` files left over from a previous run.
%%   3. Register every parseable `.kvc` with the meta server.
%%
%% If the directory cannot be created, init returns
%% `{stop, {cannot_create_dir, Root, Reason}}`. start_link/3 then
%% returns `{error, {cannot_create_dir, Root, Reason}}`. Previously
%% init raised an exception, whose reason arrived wrapped with a stack
%% trace.
-spec init([term()]) ->
    {ok, state()} | {stop, {cannot_create_dir, file:name(), term()}}.
init([Name, Tier, Root]) ->
    case filelib:ensure_path(Root) of
        ok ->
            %% Leftover temps come from interrupted writes; safe to drop.
            sweep_tmps(Root),
            register_existing(Tier, Root),
            {ok, #state{name = Name, tier = Tier, root = Root}};
        {error, Reason} ->
            {stop, {cannot_create_dir, Root, Reason}}
    end.
%% Every request is served against the directory in the state and never
%% changes the state. Unknown requests get `{error, unknown_call}`.
%%
%% NOTE(review): `scan` sweeps all `*.tmp` files, including those of
%% two-phase writers running write_tmp/3 / publish/1 outside this
%% server at that moment. Confirm that scans only happen while no
%% writers are active.
-spec handle_call(term(), gen_server:from(), state()) ->
    {reply, term(), state()}.
handle_call(Request, _From, State) ->
    Reply = dispatch_call(Request, State#state.root),
    {reply, Reply, State}.

%% Map one request to its reply for directory Root.
dispatch_call({save, BuildMeta, Payload}, Root) ->
    do_save(BuildMeta, Payload, Root);
dispatch_call({load, Key}, Root) ->
    do_load(Key, Root);
dispatch_call({delete, Key}, Root) ->
    do_delete(Key, Root);
dispatch_call(dir, Root) ->
    Root;
dispatch_call(scan, Root) ->
    sweep_tmps(Root),
    scan_dir(Root);
dispatch_call(_Unknown, _Root) ->
    {error, unknown_call}.
%% No casts are part of the protocol; anything received is ignored.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(_Request, State) ->
    {noreply, State}.
%% =============================================================================
%% Internal: save
%% =============================================================================
%% One-shot save run inside the server: phase one (write_tmp/3)
%% followed directly by phase two (publish/1), with no meta-server step
%% in between.
do_save(BuildMeta, Payload, Root) ->
    publish_written(write_tmp(Root, BuildMeta, Payload)).

%% Publish a successful temp write; pass any write error through.
publish_written({ok, WriteOut}) -> publish(WriteOut);
publish_written({error, _} = WriteErr) -> WriteErr.
%% Create TmpPath exclusively (fail if it exists), write the iolist
%% `[Prefix, Payload]` without concatenating it, datasync, and close.
%% Returns `ok` or the first `{error, _}` from open, write or datasync.
write_temp(TmpPath, Prefix, Payload) ->
    case prim_file:open(TmpPath, [write, exclusive, binary, raw]) of
        {error, _} = OpenErr ->
            OpenErr;
        {ok, Fd} ->
            try
                write_and_sync(Fd, [Prefix, Payload])
            after
                prim_file:close(Fd)
            end
    end.

%% Write Data to Fd, then datasync only if the write succeeded.
write_and_sync(Fd, Data) ->
    case prim_file:write(Fd, Data) of
        ok -> prim_file:datasync(Fd);
        {error, _} = WriteErr -> WriteErr
    end.
%% Publish TmpPath as FinalPath. The outcome of the link decides the
%% next step:
%%   - a fresh link is finalised (dir fsync + validation);
%%   - an existing FinalPath is resolved by handle_eexist/6;
%%   - any other link error is returned as-is (try_link/2 has already
%%     removed the temp).
link_temp(Key, Prefix, Payload, TmpPath, FinalPath, Root) ->
    Linked = try_link(TmpPath, FinalPath),
    case Linked of
        eexist ->
            handle_eexist(Key, Prefix, Payload, TmpPath, FinalPath, Root);
        ok ->
            finalise(Key, Prefix, Payload, FinalPath, Root);
        {error, _} ->
            Linked
    end.
%% Resolve a publish that found FinalPath already present.
%%
%% If the existing file validates for Key, another writer already
%% published the same slab. Drop our temp, fsync the directory, and
%% report success with our header and the existing file's size. A
%% directory fsync failure here is returned as `{error, _}`, like in
%% finalise/5, rather than crashing the caller with a badmatch.
%%
%% Otherwise the existing file is treated as corrupt, deleted, and the
%% link is retried once. If the retry hits EEXIST again (another writer
%% won the race), our temp is deleted as well rather than left behind
%% for the next sweep. Any other link error is returned unchanged
%% instead of being reported as `eexist`.
handle_eexist(Key, Prefix, Payload, TmpPath, FinalPath, Root) ->
    case validate_at(FinalPath, Key) of
        {ok, _Info, _ExistingPayload} ->
            _ = prim_file:delete(TmpPath),
            case erllama_nif:fsync_dir(unicode_path(Root)) of
                ok -> {ok, Key, header_of(Prefix), file_size(FinalPath)};
                {error, _} = SyncErr -> SyncErr
            end;
        {error, _} ->
            _ = prim_file:delete(FinalPath),
            case try_link(TmpPath, FinalPath) of
                ok ->
                    finalise(Key, Prefix, Payload, FinalPath, Root);
                eexist ->
                    %% try_link/2 keeps the temp on EEXIST; clean it up.
                    _ = prim_file:delete(TmpPath),
                    {error, eexist};
                {error, _} = LinkErr ->
                    LinkErr
            end
    end.
%% Hard-link TmpPath to FinalPath.
%%   - EEXIST returns the bare atom `eexist` and keeps the temp, so the
%%     caller can decide whether to adopt the existing file or retry.
%%   - Any other outcome (success or a different error) deletes the temp
%%     name and returns the link result.
try_link(TmpPath, FinalPath) ->
    Outcome = prim_file:make_link(TmpPath, FinalPath),
    case Outcome of
        {error, eexist} ->
            eexist;
        ok ->
            drop_tmp(TmpPath, Outcome);
        {error, _} ->
            drop_tmp(TmpPath, Outcome)
    end.

%% Remove the temp name and hand Result back unchanged.
drop_tmp(TmpPath, Result) ->
    _ = prim_file:delete(TmpPath),
    Result.
%% After a successful link:
%%   1. fsync the directory so the new entry is durable;
%%   2. re-read and validate the published file.
%% If validation fails, the published file is deleted and the parse
%% error is returned. A directory fsync error is returned as-is. The
%% payload argument is unused.
finalise(Key, Prefix, _Payload, FinalPath, Root) ->
    case erllama_nif:fsync_dir(unicode_path(Root)) of
        {error, _} = SyncErr ->
            SyncErr;
        ok ->
            case validate_at(FinalPath, Key) of
                {error, _} = BadFile ->
                    _ = prim_file:delete(FinalPath),
                    BadFile;
                {ok, _Info, _Parsed} ->
                    {ok, Key, header_of(Prefix), file_size(FinalPath)}
            end
    end.
%% =============================================================================
%% Internal: load / delete
%% =============================================================================
%% Load `<hex(Key)>.kvc` from Root and parse it against Key.
%% Returns `miss` when the file does not exist.
do_load(Key, Root) ->
    SlabPath = filename:join(Root, bin_to_hex(Key) ++ ".kvc"),
    case load_bin(SlabPath) of
        miss -> miss;
        {error, _} = ReadErr -> ReadErr;
        {ok, Bin} -> parse_or_drop(Bin, Key, SlabPath)
    end.
%% Parse Bin as the slab for Key. On a parse failure the file at Path is
%% deleted (so the next request does not repeat the same failed parse),
%% the corrupt-file counter is bumped, and the parse error is returned.
parse_or_drop(Bin, Key, Path) ->
    Parsed = erllama_cache_kvc:parse(Bin, Key),
    case Parsed of
        {ok, _Info, _Payload} ->
            Parsed;
        {error, _} ->
            _ = prim_file:delete(Path),
            erllama_cache_counters:incr(?C_CORRUPT_FILES),
            Parsed
    end.
%% Read the whole slab with ordinary file I/O. Returns `miss` for a
%% missing file and `{error, Reason}` for any other read failure.
%%
%% mmap was deliberately removed:
%%   - the process already maps a multi-GB GGUF, so restoring the cache
%%     via mmap doubles the VM pressure;
%%   - a region binary that outlives any NIF call would crash the BEAM
%%     if the cache directory were ever mutated by another process.
%% ds4 makes the same choice.
load_bin(Path) ->
    Result = file:read_file(Path),
    case Result of
        {error, enoent} -> miss;
        {error, _} -> Result;
        {ok, _} -> Result
    end.
%% Unlink `<hex(Key)>.kvc` under Root. An already-missing file counts
%% as success; other unlink errors are returned.
do_delete(Key, Root) ->
    SlabPath = filename:join(Root, bin_to_hex(Key) ++ ".kvc"),
    Outcome = prim_file:delete(SlabPath),
    case Outcome of
        {error, enoent} -> ok;
        {error, _} -> Outcome;
        ok -> ok
    end.
%% =============================================================================
%% Internal: scan
%% =============================================================================
%% Delete every `*.tmp` entry directly under Root. Listing errors (for
%% example a missing directory) and delete errors are ignored. Always
%% returns `ok`.
sweep_tmps(Root) ->
    Names =
        case file:list_dir(Root) of
            {ok, Listed} -> Listed;
            {error, _} -> []
        end,
    Temps = [Name || Name <- Names, lists:suffix(".tmp", Name)],
    lists:foreach(
        fun(Name) -> _ = prim_file:delete(filename:join(Root, Name)) end,
        Temps
    ).
%% Feed every entry under Root through scan_entry/3 and collect the
%% results. Returns `[]` if the directory cannot be listed.
scan_dir(Root) ->
    case file:list_dir(Root) of
        {error, _} ->
            [];
        {ok, Names} ->
            Step = fun(Name, Found) ->
                scan_entry(filename:join(Root, Name), Name, Found)
            end,
            lists:foldl(Step, [], Names)
    end.
%% Only `*.kvc` names are slab candidates; everything else leaves Acc
%% untouched. A name ending in ".kvc" cannot also end in ".tmp", so temp
%% files (`<hex>.kvc.<writer>.tmp`) never reach scan_kvc/2.
scan_entry(Path, Name, Acc) ->
    IsSlab = lists:suffix(".kvc", Name),
    if
        IsSlab -> scan_kvc(Path, Acc);
        true -> Acc
    end.
%% Parse one candidate slab. If it is usable, prepend
%% `{Key, Header, Size, TokensBin}` to Acc.
%%   - A file that cannot be read (permissions, a directory named
%%     `*.kvc`, ...) is skipped and left in place.
%%   - A file whose metadata does not parse is deleted.
%%   - A file whose name is not `<hex(Key)>.kvc` is skipped and left on
%%     disk, so a derivation mismatch cannot destroy data.
%%
%% Why the name check: the key is re-derived from the parsed metadata,
%% while register_existing/2, do_load/2 and do_delete/2 all address the
%% slab as `<hex(Key)>.kvc`. A valid file stored under any other name
%% (copied in by hand, or written under a different key derivation)
%% would otherwise be registered at a path that does not exist. It could
%% then never be removed through delete/2.
%%
%% NOTE(review): the whole slab is read just to parse its metadata. For
%% multi-GB payloads a prefix read would be cheaper, provided
%% erllama_cache_kvc:parse_meta/1 does not need the payload. Confirm
%% before changing.
scan_kvc(Path, Acc) ->
    case file:read_file(Path) of
        {ok, Bin} -> scan_kvc_bin(Path, Bin, Acc);
        {error, _} -> Acc
    end.

%% Metadata parse plus file-name check for an already-read slab.
scan_kvc_bin(Path, Bin, Acc) ->
    case erllama_cache_kvc:parse_meta(Bin) of
        {ok, Info} ->
            Tokens = maps:get(tokens, Info),
            Key = erllama_cache_key:make(#{
                fingerprint => maps:get(fingerprint, Info),
                quant_type => maps:get(quant_type, Info),
                ctx_params_hash => maps:get(ctx_params_hash, Info),
                tokens => Tokens
            }),
            case is_named_for(Path, Key) of
                true ->
                    Header = binary:part(Bin, 0, 48),
                    TokensBin = erllama_cache_key:encode_tokens(Tokens),
                    [{Key, Header, byte_size(Bin), TokensBin} | Acc];
                false ->
                    Acc
            end;
        {error, _} ->
            _ = prim_file:delete(Path),
            Acc
    end.

%% True when the file name of Path is exactly `<hex(Key)>.kvc`. Both
%% sides go through unicode_path/1, so string and binary paths compare
%% equal.
is_named_for(Path, Key) ->
    unicode_path(filename:basename(Path)) =:= unicode_path(bin_to_hex(Key) ++ ".kvc").
%% Announce every slab found by scan_dir/1 to the meta server as
%% available in Tier. Each is located at `{Tier, <Root>/<hex(Key)>.kvc}`.
register_existing(Tier, Root) ->
    Register = fun({Key, Header, Size, TokensBin}) ->
        SlabPath = filename:join(Root, bin_to_hex(Key) ++ ".kvc"),
        erllama_cache_meta_srv:insert_available(
            Key, Tier, Size, Header, {Tier, SlabPath}, TokensBin
        )
    end,
    lists:foreach(Register, scan_dir(Root)).
%% =============================================================================
%% Internal: helpers
%% =============================================================================
%% Read Path and run the full KVC parse against Key. A read error is
%% returned unchanged, e.g. `{error, enoent}`.
validate_at(Path, Key) ->
    Read = file:read_file(Path),
    case Read of
        {error, _} -> Read;
        {ok, Bin} -> erllama_cache_kvc:parse(Bin, Key)
    end.
%% The 48-byte header at the start of a framed prefix. Raises badarg if
%% Prefix is shorter than 48 bytes.
header_of(Prefix) ->
    binary_part(Prefix, {0, 48}).
%% Size of Path in bytes, or 0 when it cannot be stat'ed.
file_size(Path) ->
    filelib:file_size(Path).
%% Lowercase hex charlist of Bin; the stem of every slab file name.
bin_to_hex(Bin) ->
    HexBin = binary:encode_hex(Bin, lowercase),
    binary_to_list(HexBin).
%% Writer-unique component of a temp file name: `<ospid>-<pid>.<n>`.
%%   - The OS pid keeps names distinct between separate BEAM instances
%%     that share a cache root. Local pids and unique integers repeat
%%     across nodes, and a collision makes the `exclusive` open fail.
%%   - The `<`/`>` of pid_to_list/1 are stripped because those characters
%%     are not allowed in Windows file names.
writer_suffix() ->
    Pid = string:trim(pid_to_list(self()), both, "<>"),
    lists:concat([os:getpid(), "-", Pid, ".", erlang:unique_integer([positive])]).
%% Path as a UTF-8 binary, the form handed to erllama_nif:fsync_dir/1.
%% Binaries pass through untouched; charlists are encoded.
unicode_path(Path) when is_list(Path) ->
    unicode:characters_to_binary(Path, utf8);
unicode_path(Path) when is_binary(Path) ->
    Path.