Skip to main content

src/barrel_p2p_replica_log.erl

%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
%%% Disk persistence for a gossiped OR-Map: a write-ahead log plus periodic
%%% snapshots, so the state survives a full-cluster restart (every node
%%% down, then up). Used by `barrel_p2p_reminder' (always) and `barrel_p2p_map'
%%% (opt-in); each holds the full replicated OR-Map, so every node persists
%%% its own copy and the cluster reloads + re-converges on boot.
%%%
%%% Layout under `Dir' (one store per Name):
%%%   <Name>.log       - disk_log (halt, internal), one appended delta per
%%%                      mutation: a `#{Key => entry}' OR-Map fragment.
%%%   <Name>.snapshot  - the full OR-Map, written atomically (temp+rename).
%%%
%%% Recovery merges the logged deltas on top of the snapshot. Because
%%% `barrel_p2p_ormap:merge/2' is commutative, associative and idempotent,
%%% replay is order-independent and safe even when a delta is also already
%%% in the snapshot (e.g. after a crash between snapshot and truncate).
%%% A snapshot writes the full map FIRST, then truncates the log, so a
%%% crash in between only replays already-applied deltas.
%%%
%%% Recovery validates every snapshot/logged entry with
%%% `barrel_p2p_crdt_wire:accept/2' (wrapper-shape + non-map guard), so a
%%% corrupt or wrong-shaped file cannot crash `merge/2' or `absorb_clock/1'
%%% at boot; unrecognised entries are dropped, not fatal.
%%%
%%% Callers MUST `barrel_p2p_ormap:absorb_clock/1' the recovered map before
%%% using it, so the restarted node's HLC advances past every persisted
%%% dot/version and cannot mint a timestamp behind it.
%%%
%%% A `handle()' of `undefined' makes every operation a no-op, so an
%%% opt-out caller (a map with `persist => false') runs the same code path
%%% with persistence disabled.
-module(barrel_p2p_replica_log).

-export([open/2, append/2, sync/1, snapshot/2, close/1, delete/2]).

-export_type([handle/0]).

-type handle() ::
    undefined
    | #{
        name := atom(),
        log := atom(),
        snapshot := file:filename_all()
    }.

%%====================================================================
%% API
%%====================================================================

%% @doc Open (creating if needed) the store for `Name' under `Dir' and
%% return the recovered OR-Map (snapshot with the logged deltas merged on
%% top). The caller must `barrel_p2p_ormap:absorb_clock/1' the result.
-spec open(atom(), file:filename_all()) ->
    {ok, handle(), barrel_p2p_ormap:ormap()} | {error, term()}.
open(Name, Dir) ->
    case filelib:ensure_dir(filename:join(Dir, "dummy")) of
        ok ->
            Base = read_snapshot(snapshot_path(Dir, Name)),
            case open_log(Name, log_path(Dir, Name)) of
                {ok, Log} ->
                    Map = replay(Log, Base),
                    Handle = #{
                        name => Name,
                        log => Log,
                        snapshot => snapshot_path(Dir, Name)
                    },
                    {ok, Handle, Map};
                {error, _} = Err ->
                    Err
            end;
        {error, _} = Err ->
            Err
    end.

%% @doc Append a delta (the concrete OR-Map entries just minted, so replay
%% never re-mints an HLC). A non-map or empty delta is ignored.
-spec append(handle(), barrel_p2p_ormap:ormap()) -> ok.
append(undefined, _Delta) ->
    ok;
append(#{log := Log}, Delta) when is_map(Delta), map_size(Delta) > 0 ->
    disk_log:log(Log, Delta);
append(_Handle, _Delta) ->
    ok.

%% @doc Flush the log to disk (persisted-before-ack for user writes).
-spec sync(handle()) -> ok.
sync(undefined) ->
    ok;
sync(#{log := Log}) ->
    disk_log:sync(Log).

%% @doc Write the full map to the snapshot atomically, then truncate the
%% log. Bounds the log and purges GC'd tombstones from disk.
-spec snapshot(handle(), barrel_p2p_ormap:ormap()) -> ok | {error, term()}.
snapshot(undefined, _Map) ->
    ok;
snapshot(#{log := Log, snapshot := SnapPath}, Map) ->
    case barrel_p2p_file:write_secure(SnapPath, term_to_binary(Map)) of
        ok -> disk_log:truncate(Log);
        {error, _} = Err -> Err
    end.

%% @doc Close the log cleanly (called from the owner's `terminate/2').
-spec close(handle()) -> ok.
close(undefined) ->
    ok;
close(#{log := Log}) ->
    _ = disk_log:close(Log),
    ok.

%% @doc Remove the on-disk files for `Name' under `Dir'. Idempotent;
%% used by `delete_map/1' so a re-created map does not reload stale data.
-spec delete(atom(), file:filename_all()) -> ok.
delete(Name, Dir) ->
    _ = catch disk_log:close(Name),
    _ = file:delete(snapshot_path(Dir, Name)),
    _ = file:delete(log_path(Dir, Name)),
    ok.

%%====================================================================
%% Internal
%%====================================================================

open_log(Name, LogPath) ->
    case
        disk_log:open([
            {name, Name},
            {file, LogPath},
            {type, halt},
            {format, internal}
        ])
    of
        {ok, Name} ->
            {ok, Name};
        {repaired, Name, {recovered, _R}, {badbytes, _B}} ->
            %% Unclean prior shutdown: the log was truncated to the last
            %% good term. The snapshot covers the rest; carry on.
            {ok, Name};
        {error, _} = Err ->
            Err
    end.

replay(Log, Base) ->
    replay(Log, start, Base).

replay(Log, Cont, Acc) ->
    case disk_log:chunk(Log, Cont) of
        eof ->
            Acc;
        {error, _Reason} ->
            %% Best-effort: stop at the first unreadable chunk, keep what
            %% we merged so far (the snapshot is still authoritative).
            Acc;
        {Cont2, Terms} ->
            replay(Log, Cont2, apply_terms(Terms, Acc));
        {Cont2, Terms, _Bad} ->
            replay(Log, Cont2, apply_terms(Terms, Acc))
    end.

apply_terms(Terms, Acc) ->
    lists:foldl(
        fun(Delta, M) -> barrel_p2p_ormap:merge(M, validate(Delta)) end,
        Acc,
        Terms
    ).

read_snapshot(SnapPath) ->
    case file:read_file(SnapPath) of
        {ok, Bin} ->
            try binary_to_term(Bin) of
                Term -> validate(Term)
            catch
                _:_ -> barrel_p2p_ormap:new()
            end;
        {error, _} ->
            barrel_p2p_ormap:new()
    end.

%% Keep only OR-Map-shaped entries (wrapper validation via barrel_p2p_crdt_wire,
%% which also guards a non-map argument). A corrupt or wrong-shaped file then
%% cannot crash merge/2 or absorb_clock/1 at boot.
validate(Term) ->
    barrel_p2p_crdt_wire:accept(Term, fun(_) -> true end).

snapshot_path(Dir, Name) ->
    filename:join(Dir, atom_to_list(Name) ++ ".snapshot").

log_path(Dir, Name) ->
    filename:join(Dir, atom_to_list(Name) ++ ".log").