Skip to main content

src/barrel_p2p_map.erl

%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
%%% Public replicated key-value map.
%%%
%%% A named, gossiped, last-write-wins map for small cluster-wide
%%% control-plane state (config, feature flags, routing/placement tables,
%%% presence). Built on the `barrel_p2p_replica' substrate: writes broadcast
%%% OR-Map deltas, peers converge eventually, and a freshly started map
%%% pulls existing state from peers.
%%%
%%% Each named map is one owner gen_server (the sole writer; holds the
%%% OR-Map and a lock-free ETS read cache) plus its own `barrel_p2p_replica'
%%% instance. Reads (`get'/`keys'/`to_list') hit the ETS table directly and
%%% never block on writes.
%%%
%%% A map is NODE-LOCAL: `new/2' starts it on the calling node only. To be
%%% cluster-wide it must run on every participating node - declare it in the
%%% `replicated_maps' app env (started on every node at boot) or call
%%% `new/2' on each node. Not for bulk data, high write rates, or data
%%% needing custom conflict resolution (use the `barrel_p2p_replica' behaviour
%%% directly for that).
%%%
%%% State is in-memory + gossip by default (a full-cluster restart loses it).
%%% Pass `persist => true' to back the map with a `barrel_p2p_replica_log' WAL +
%%% snapshot (under `barrel_p2p_map_data_dir', default `data/maps'): writes are
%%% fsynced and the map is recovered on boot, so it is durable across a
%%% full-cluster restart. As with reminders, persisted values must be
%%% restart-safe data (no pids/ports/refs/funs).

-module(barrel_p2p_map).
-behaviour(gen_server).
-behaviour(barrel_p2p_replica).

%% Public API
-export([
    new/1, new/2,
    delete_map/1,
    put/3,
    remove/2,
    delete/2,
    get/2,
    keys/1,
    to_list/1,
    subscribe/1, subscribe/2,
    unsubscribe/1, unsubscribe/2
]).

%% Name helpers (used by the instance supervisor).
-export([owner_name/1, replica_name/1, tab_name/1]).

%% Owner process start.
-export([start_link/2]).

%% barrel_p2p_replica callbacks.
-export([
    replica_merge_delta/2,
    replica_apply_full_sync/2,
    replica_full_sync_snapshot/1,
    replica_remove_node/2,
    replica_anti_entropy/0
]).

%% gen_server callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

%% Prefixes that derive the per-map registered names from the user-facing
%% map name: owner gen_server, replica instance, ETS read-cache table and
%% persistent store (WAL + snapshot).
-define(OWNER_PREFIX, "barrel_p2p_map$").
-define(REPLICA_PREFIX, "barrel_p2p_map_replica$").
-define(TAB_PREFIX, "barrel_p2p_map_tab$").
-define(STORE_PREFIX, "barrel_p2p_map_store$").
%% Directory for persisted maps when the barrel_p2p_map_data_dir app env
%% key is not set.
-define(DEFAULT_MAP_DIR, "data/maps").

%% Per-map options accepted by new/2. Every key is optional.
%%   validator          - 1-arity predicate fun, or {Module, Function} called
%%                        as Module:Function(Value). A put whose value is not
%%                        accepted (any result other than `true', or an
%%                        exception) returns {error, invalid_value}.
%%   tombstone_ttl_ms   - tombstone age (ms) used as the GC cutoff by the
%%                        periodic scan; default: app env
%%                        barrel_p2p_map_tombstone_ttl_ms, else 3600000.
%%   scan_ms            - period (ms) of the GC/snapshot scan; default: app
%%                        env barrel_p2p_map_scan_ms, else 1000.
%%   prune_on_peer_down - locally drop entries whose every dot is tagged
%%                        with a departed node (default false).
%%   persist            - back the map with an on-disk WAL + snapshot
%%                        (default false).
-type opts() :: #{
    validator => fun((term()) -> boolean()) | {module(), atom()},
    tombstone_ttl_ms => non_neg_integer(),
    scan_ms => pos_integer(),
    prune_on_peer_down => boolean(),
    persist => boolean()
}.
-export_type([opts/0]).

%% Owner process state: one record per named map, mutated only by the
%% owner gen_server.
-record(state, {
    %% User-facing map name, as passed to new/2.
    name :: atom(),
    %% Registered name of this map's barrel_p2p_replica instance.
    replica :: atom(),
    %% ETS read cache. It is created as a `named_table', so ets:new/2
    %% returns the table NAME (an atom), not a tid - hence ets:table().
    tab :: ets:table(),
    %% Authoritative OR-Map (live values and tombstones).
    map :: barrel_p2p_ormap:ormap(),
    %% Normalised validator used for local puts and incoming merges.
    validator :: fun((term()) -> boolean()),
    %% Period of the GC/snapshot scan, in milliseconds.
    scan_ms :: pos_integer(),
    %% Tombstone age (ms) used as the GC cutoff by the scan.
    tombstone_ttl_ms :: non_neg_integer(),
    %% Whether entries owned only by a departed node are pruned locally.
    prune :: boolean(),
    %% Change-event subscribers and their monitor references.
    subscribers = #{} :: #{pid() => reference()},
    %% Optional disk persistence (opt-in via `persist'): WAL+snapshot handle
    %% (`undefined' when persistence is off or failed to open) and a dirty
    %% flag so the scan only snapshots on churn.
    log = undefined :: undefined | barrel_p2p_replica_log:handle(),
    dirty = false :: boolean()
}).

%%====================================================================
%% Public API
%%====================================================================

%% @doc Start a replicated map named `Name' on this node with default
%% options; shorthand for `new(Name, #{})'. Idempotent.
-spec new(atom()) -> {ok, pid()} | {error, term()}.
new(Name) ->
    DefaultOpts = #{},
    new(Name, DefaultOpts).

%% @doc Start a replicated map named `Name' on this node with `Opts'.
%% Returns `{error, invalid_map_name}' unless `Name' is an atom and `Opts'
%% is a map; otherwise the map supervisor starts (or finds) the instance.
-spec new(atom(), opts()) -> {ok, pid()} | {error, term()}.
new(Name, Opts) ->
    case is_atom(Name) andalso is_map(Opts) of
        true -> barrel_p2p_map_sup:start_map(Name, Opts);
        false -> {error, invalid_map_name}
    end.

%% @doc Stop the map on THIS node and remove its persisted files.
%%
%% Node-local: peers keep their copies; this is not a cluster-wide erase.
%% Removing the files means a later `new/2' starts empty instead of
%% reloading stale data. Ordering matters: `stop_map/1' is synchronous (it
%% waits for the owner to terminate, which closes the log), so the file
%% delete runs strictly after the log is closed and cannot race a
%% `terminate/2' write. For a map that never persisted the delete is a
%% no-op.
-spec delete_map(atom()) -> ok.
delete_map(Name) when is_atom(Name) ->
    ok = barrel_p2p_map_sup:stop_map(Name),
    DataDir = cfg(barrel_p2p_map_data_dir, ?DEFAULT_MAP_DIR),
    barrel_p2p_replica_log:delete(store_name(Name), DataDir).

%% @doc Store `Value' under `Key' in map `Name'. The owner applies the
%% write; `{error, invalid_value}' means the map's validator rejected
%% `Value', `{error, no_such_map}' that the map is not running here.
-spec put(atom(), term(), term()) -> ok | {error, invalid_value | no_such_map}.
put(Name, Key, Value) ->
    Request = {put, Key, Value},
    call(Name, Request).

%% @doc Remove `Key' from map `Name'; `{error, no_such_map}' when the map
%% is not running on this node.
-spec remove(atom(), term()) -> ok | {error, no_such_map}.
remove(Name, Key) ->
    Request = {remove, Key},
    call(Name, Request).

%% @doc Same as `remove/2': drop `Key' from map `Name'.
-spec delete(atom(), term()) -> ok | {error, no_such_map}.
delete(Name, Key) ->
    remove(Name, Key).

%% @doc Look up the live value of `Key' in map `Name' (lock-free ETS read).
%%
%% Returns `not_found' when the key is absent or the map is not running.
%% The table is owned by the map's owner process, so it can disappear
%% between the `tab/1' lookup and the read (owner crash, concurrent
%% `delete_map/1'). ETS reports that race as `badarg', which is mapped to
%% `not_found' instead of crashing the reader.
-spec get(atom(), term()) -> {ok, term()} | not_found.
get(Name, Key) ->
    case tab(Name) of
        undefined ->
            not_found;
        Tab ->
            try ets:lookup(Tab, Key) of
                [{_, Value}] -> {ok, Value};
                [] -> not_found
            catch
                error:badarg -> not_found
            end
    end.

%% @doc Live keys of map `Name' (lock-free ETS read); `[]' when the map is
%% not running. A match spec returns only the key column, so stored values
%% are never copied out of ETS just to be discarded. A table that vanishes
%% mid-read (owner crash, concurrent `delete_map/1') also yields `[]'
%% rather than a `badarg' crash in the caller.
-spec keys(atom()) -> [term()].
keys(Name) ->
    case tab(Name) of
        undefined ->
            [];
        Tab ->
            try
                ets:select(Tab, [{{'$1', '_'}, [], ['$1']}])
            catch
                error:badarg -> []
            end
    end.

%% @doc Live key/value pairs of map `Name' (lock-free ETS read); `[]' when
%% the map is not running. A table that vanishes mid-read (owner crash,
%% concurrent `delete_map/1') also yields `[]' rather than a `badarg'
%% crash in the caller.
-spec to_list(atom()) -> [{term(), term()}].
to_list(Name) ->
    case tab(Name) of
        undefined ->
            [];
        Tab ->
            try
                ets:tab2list(Tab)
            catch
                error:badarg -> []
            end
    end.

%% @doc Subscribe the calling process to change events for map `Name'.
%% Events arrive as `{barrel_p2p_map, Name, Event}' messages, where Event
%% is `{put, Key, Value}' or `{remove, Key}'.
-spec subscribe(atom()) -> ok | {error, no_such_map}.
subscribe(Name) ->
    Me = self(),
    subscribe(Name, Me).

%% @doc Subscribe `Pid' to change events for map `Name'. The owner monitors
%% `Pid' and forgets it once it exits; subscribing twice is a no-op.
-spec subscribe(atom(), pid()) -> ok | {error, no_such_map}.
subscribe(Name, Pid) when is_pid(Pid) ->
    Request = {subscribe, Pid},
    call(Name, Request).

%% @doc Unsubscribe the calling process from map `Name'.
-spec unsubscribe(atom()) -> ok | {error, no_such_map}.
unsubscribe(Name) ->
    Me = self(),
    unsubscribe(Name, Me).

%% @doc Unsubscribe `Pid' from map `Name'; a no-op if it was not subscribed.
-spec unsubscribe(atom(), pid()) -> ok | {error, no_such_map}.
unsubscribe(Name, Pid) when is_pid(Pid) ->
    Request = {unsubscribe, Pid},
    call(Name, Request).

%%====================================================================
%% Name helpers
%%====================================================================

%% Registered name of the owner gen_server for map `Name'.
owner_name(Name) ->
    derived(?OWNER_PREFIX, Name).
%% Registered name of the barrel_p2p_replica instance for map `Name'.
replica_name(Name) ->
    derived(?REPLICA_PREFIX, Name).
%% Name of the named ETS read-cache table for map `Name'.
tab_name(Name) ->
    derived(?TAB_PREFIX, Name).
%% Name of the persistent store (WAL + snapshot) for map `Name'.
store_name(Name) ->
    derived(?STORE_PREFIX, Name).

%% Build the atom `Prefix ++ Name'.
%% NOTE(review): list_to_atom/1 creates a new, never-collected atom for
%% every distinct Name seen, including lookups of maps that do not exist
%% (via tab/1 and call/2).
derived(Prefix, Name) ->
    list_to_atom(lists:append(Prefix, atom_to_list(Name))).

%% Map a replica instance name back to its owner's registered name by
%% swapping prefixes. The barrel_p2p_replica callbacks run inside the
%% replica process and only receive the replica name. The instance
%% supervisor starts the owner before the replica, so the owner atom
%% already exists and list_to_existing_atom/1 is safe. A name without the
%% replica prefix fails with badmatch.
owner_of(ReplicaName) ->
    ReplicaChars = atom_to_list(ReplicaName),
    ?REPLICA_PREFIX ++ MapChars = ReplicaChars,
    OwnerChars = ?OWNER_PREFIX ++ MapChars,
    list_to_existing_atom(OwnerChars).

%%====================================================================
%% barrel_p2p_replica callbacks (run in the replica process)
%%====================================================================

%% Gossip delta: handed to the owner asynchronously for merging.
replica_merge_delta(ReplicaName, Delta) ->
    Owner = owner_of(ReplicaName),
    gen_server:cast(Owner, {merge, Delta}).

%% Full-sync payloads are merged exactly like deltas.
replica_apply_full_sync(ReplicaName, Snapshot) ->
    Owner = owner_of(ReplicaName),
    gen_server:cast(Owner, {merge, Snapshot}).

%% Synchronous: returns the owner's snapshot reply (`empty' or
%% `{sync, Map}').
replica_full_sync_snapshot(ReplicaName) ->
    Owner = owner_of(ReplicaName),
    gen_server:call(Owner, snapshot).

%% A peer left; the owner decides whether to prune (prune_on_peer_down).
replica_remove_node(ReplicaName, Node) ->
    Owner = owner_of(ReplicaName),
    gen_server:cast(Owner, {remove_node, Node}).

%% Anti-entropy is always on for replicated maps. Their values must
%% reconverge after a partition heals even when no fresh peer_up arrives,
%% so there is deliberately no opt-out.
replica_anti_entropy() -> true.

%%====================================================================
%% Owner gen_server
%%====================================================================

%% Start the owner gen_server for map `Name', registered locally under
%% owner_name(Name). Presumably invoked by the per-map instance supervisor.
start_link(Name, Opts) ->
    Registered = {local, owner_name(Name)},
    gen_server:start_link(Registered, ?MODULE, {Name, Opts}, []).

init({Name, Opts}) ->
    %% Trap exits so terminate/2 runs on supervisor shutdown and closes the
    %% disk_log cleanly (the log is linked to this process when opened).
    process_flag(trap_exit, true),
    %% Protected named table: this process is the only writer, while any
    %% process can read it by name (the lock-free read path).
    TableOpts = [named_table, protected, set, {read_concurrency, true}],
    Tab = ets:new(tab_name(Name), TableOpts),
    %% Per-map options take precedence over application-env defaults.
    ScanMs = opt(scan_ms, Opts, cfg(barrel_p2p_map_scan_ms, 1000)),
    TtlMs = opt(tombstone_ttl_ms, Opts, cfg(barrel_p2p_map_tombstone_ttl_ms, 3600000)),
    %% Optional disk recovery: the recovered OR-Map seeds both the state
    %% and the ETS read cache (open_store/2 also absorbs its clock so the
    %% HLC stays monotonic).
    {Log, Map0} = open_store(Name, maps:get(persist, Opts, false)),
    seed_ets(Tab, Map0),
    arm_scan(ScanMs),
    Validator = normalise_validator(maps:get(validator, Opts, undefined)),
    State = #state{
        name = Name,
        replica = replica_name(Name),
        tab = Tab,
        map = Map0,
        validator = Validator,
        scan_ms = ScanMs,
        tombstone_ttl_ms = TtlMs,
        prune = maps:get(prune_on_peer_down, Opts, false),
        log = Log
    },
    {ok, State}.

%% Return `{Log, InitialMap}'. Without `persist' there is no log handle
%% (log calls then do nothing) and the map starts empty. With `persist' the
%% store is opened and its recovered OR-Map returned. A failed open is
%% logged and degrades to in-memory only; it is never fatal.
open_store(_Name, false) ->
    {undefined, barrel_p2p_ormap:new()};
open_store(Name, true) ->
    DataDir = cfg(barrel_p2p_map_data_dir, ?DEFAULT_MAP_DIR),
    StoreName = store_name(Name),
    case barrel_p2p_replica_log:open(StoreName, DataDir) of
        {error, Reason} ->
            logger:warning(
                "barrel_p2p_map ~p: persistence disabled, open "
                "failed: ~p",
                [Name, Reason]
            ),
            open_store(Name, false);
        {ok, Log, Recovered} ->
            ok = barrel_p2p_ormap:absorb_clock(Recovered),
            {Log, Recovered}
    end.

%% Load every live {Key, Value} pair of `Map' into the ETS read cache in a
%% single insert.
seed_ets(Tab, Map) ->
    Pairs = barrel_p2p_ormap:to_list(Map),
    true = ets:insert(Tab, Pairs),
    ok.

%% Synchronous owner requests. This process is the sole writer of both the
%% OR-Map and the protected ETS read cache.
%%
%% {put, K, V}: validate V. On success, apply it to the OR-Map and ETS,
%%   notify subscribers, broadcast `{add, K, V}', then append the entry to
%%   the WAL and fsync before replying `ok'.
%% {remove, K}: apply the OR-Map remove, broadcast it and persist it
%%   (fsynced). The `{remove, K}' event is emitted only if K was live in
%%   ETS, mirroring project_key/2, which never notifies no-op changes.
%%   Removing an absent key is not a change subscribers should see.
%% {subscribe, Pid} / {unsubscribe, Pid}: idempotent. Subscribers are
%%   monitored so dead ones are dropped.
%% snapshot: `empty' or `{sync, Map}' for the replica's full sync.
handle_call({put, Key, Value}, _From, State) ->
    case run_validator(State#state.validator, Value) of
        true ->
            Map = barrel_p2p_ormap:add(Key, Value, State#state.map),
            ets:insert(State#state.tab, {Key, Value}),
            notify(State, {put, Key, Value}),
            barrel_p2p_replica:broadcast_update(State#state.replica, {add, Key, Value}),
            {reply, ok, persist_key(Key, sync, State#state{map = Map})};
        false ->
            {reply, {error, invalid_value}, State}
    end;
handle_call({remove, Key}, _From, #state{tab = Tab} = State) ->
    Map = barrel_p2p_ormap:remove(Key, State#state.map),
    case ets:member(Tab, Key) of
        true ->
            ets:delete(Tab, Key),
            notify(State, {remove, Key});
        false ->
            ok
    end,
    barrel_p2p_replica:broadcast_update(State#state.replica, {remove, Key}),
    {reply, ok, persist_key(Key, sync, State#state{map = Map})};
handle_call({subscribe, Pid}, _From, State) ->
    case maps:is_key(Pid, State#state.subscribers) of
        true ->
            {reply, ok, State};
        false ->
            Ref = monitor(process, Pid),
            {reply, ok, State#state{
                subscribers = maps:put(Pid, Ref, State#state.subscribers)
            }}
    end;
handle_call({unsubscribe, Pid}, _From, State) ->
    case maps:take(Pid, State#state.subscribers) of
        {Ref, Subs} ->
            demonitor(Ref, [flush]),
            {reply, ok, State#state{subscribers = Subs}};
        error ->
            {reply, ok, State}
    end;
handle_call(snapshot, _From, State) ->
    Reply =
        case barrel_p2p_ormap:is_empty(State#state.map) of
            true -> empty;
            false -> {sync, State#state.map}
        end,
    {reply, Reply, State};
handle_call(_Req, _From, State) ->
    {reply, {error, unknown_request}, State}.

%% Gossip deltas and full-sync payloads both arrive as `{merge, _}'.
%% barrel_p2p_crdt_wire:ingest/3 (given the map's validator) merges the
%% payload and returns the new OR-Map plus a map keyed by the accepted
%% keys. Those keys are appended to the WAL without fsync (durability comes
%% from the scan snapshot), then projected into ETS, emitting a diff event
%% for each change.
handle_cast({merge, Incoming}, State) ->
    #state{map = Current, validator = Validator} = State,
    {Merged, Accepted} = barrel_p2p_crdt_wire:ingest(Current, Incoming, Validator),
    AcceptedKeys = maps:keys(Accepted),
    Persisted = persist_keys(AcceptedKeys, nosync, State#state{map = Merged}),
    {noreply, project_keys(AcceptedKeys, Persisted)};
handle_cast({remove_node, Node}, #state{prune = true, map = Map} = State) ->
    %% Local-only prune: drop every key whose live dots all belong to Node.
    Doomed = lists:filter(
        fun(Key) -> owned_only_by(Key, Node, Map) end,
        barrel_p2p_ormap:keys(Map)
    ),
    {noreply, lists:foldl(fun drop_key/2, State, Doomed)};
handle_cast(_Msg, State) ->
    %% remove_node with pruning disabled, or an unknown cast: ignore.
    {noreply, State}.

%% Periodic maintenance tick. Garbage-collects tombstones older than the
%% configured TTL so the replicated store stays bounded. When the store
%% changed since the last successful snapshot (including a GC that dropped
%% tombstones), it is snapshotted to disk, which truncates the WAL. A
%% failed snapshot is logged and leaves the store dirty so the next tick
%% retries, instead of silently forgetting the unsnapshotted changes.
handle_info(scan, State) ->
    Cutoff = now_ms() - State#state.tombstone_ttl_ms,
    Old = State#state.map,
    Map = barrel_p2p_ormap:gc_tombstones(Old, Cutoff),
    Changed = State#state.dirty orelse map_size(Map) =/= map_size(Old),
    Dirty =
        case Changed of
            true -> snapshot_failed(State#state.name, State#state.log, Map);
            false -> false
        end,
    arm_scan(State#state.scan_ms),
    {noreply, State#state{map = Map, dirty = Dirty}};
handle_info({'DOWN', Ref, process, Pid, _Reason}, State) ->
    %% Drop a subscriber only when the DOWN belongs to its own monitor.
    case maps:get(Pid, State#state.subscribers, undefined) of
        Ref -> {noreply, State#state{subscribers = maps:remove(Pid, State#state.subscribers)}};
        _ -> {noreply, State}
    end;
handle_info(_Info, State) ->
    {noreply, State}.

%% Snapshot `Map' through `Log'. Returns `true' when the snapshot failed
%% (the store must stay dirty) and `false' otherwise. Persistence off
%% (`undefined' log) has nothing to snapshot. Only an explicit
%% `{error, _}' counts as a failure.
snapshot_failed(_Name, undefined, _Map) ->
    false;
snapshot_failed(Name, Log, Map) ->
    case barrel_p2p_replica_log:snapshot(Log, Map) of
        {error, Reason} ->
            logger:warning(
                "barrel_p2p_map ~p: snapshot failed, will retry: ~p",
                [Name, Reason]
            ),
            true;
        _ ->
            false
    end.

%% Close the persistence log (a no-op when persistence is off). Runs on
%% supervisor shutdown because init/1 traps exits.
terminate(_Reason, #state{log = Log}) ->
    _ = barrel_p2p_replica_log:close(Log),
    ok.

%%====================================================================
%% Internal
%%====================================================================

%% Re-project each of `Keys' from the OR-Map into ETS, emitting one diff
%% event per key whose visible value changed.
project_keys(Keys, State) ->
    lists:foldl(fun project_key/2, State, Keys).

%% Bring the ETS row for `Key' in line with the OR-Map. It is inserted or
%% updated (emitting `{put, Key, Value}') when the visible value differs,
%% and deleted (emitting `{remove, Key}') when the key is no longer live.
%% Unchanged keys emit nothing. The state itself is returned unchanged.
project_key(Key, #state{tab = Tab, map = Map} = State) ->
    Current = barrel_p2p_ormap:get(Key, Map),
    Cached = ets:lookup(Tab, Key),
    case {Current, Cached} of
        {{ok, Value}, [{_, Value}]} ->
            State;
        {{ok, Value}, _} ->
            ets:insert(Tab, {Key, Value}),
            notify(State, {put, Key, Value}),
            State;
        {not_found, []} ->
            State;
        {not_found, _} ->
            ets:delete(Tab, Key),
            notify(State, {remove, Key}),
            State
    end.

%% Hard-remove `Key' from the OR-Map and ETS for the local-only
%% prune_on_peer_down path. There is no tombstone, broadcast or WAL entry.
%% Subscribers are notified, and the store is marked dirty so the next
%% scan snapshots the prune to disk.
drop_key(Key, #state{map = Map, tab = Tab} = State) ->
    Pruned = maps:remove(Key, Map),
    true = ets:delete(Tab, Key),
    notify(State, {remove, Key}),
    State#state{map = Pruned, dirty = true}.

%% True when `Key' holds a live value whose every dot is tagged with
%% `Node' (vacuously true for an empty dot set). Tombstones and missing
%% keys are never considered owned.
owned_only_by(Key, Node, Map) ->
    case barrel_p2p_ormap:get_entry(Key, Map) of
        {ok, {value, _Value, Dots}} ->
            not lists:any(fun({Writer, _Hlc}) -> Writer =/= Node end, maps:keys(Dots));
        _Other ->
            false
    end.

%% Send `{barrel_p2p_map, Name, Event}' to every current subscriber.
notify(#state{name = Name, subscribers = Subs}, Event) ->
    Msg = {barrel_p2p_map, Name, Event},
    _ = [Pid ! Msg || Pid <- maps:keys(Subs)],
    ok.

%% Persist the current OR-Map entry (value or tombstone) for one key.
%% `sync' forces it to disk before returning (user-facing writes);
%% `nosync' leaves durability to the next scan snapshot (gossip merges).
%% A no-op when persistence is off (log = undefined).
persist_key(Key, Sync, State) ->
    Keys = [Key],
    persist_keys(Keys, Sync, State).

%% Append the current OR-Map entries for `Keys' to the WAL as one delta and
%% mark the store dirty. `sync' fsyncs before returning; `nosync' defers
%% durability to the scan snapshot.
%% When none of the keys has an entry the delta is empty, so nothing is
%% appended or synced and the dirty flag is left unchanged. Merges that
%% accept nothing (e.g. periodic anti-entropy rounds at steady state) would
%% otherwise hand an empty delta to the log on every round.
persist_keys(Keys, Sync, #state{log = Log, map = Map} = State) ->
    case entries_of(Keys, Map) of
        Delta when map_size(Delta) =:= 0 ->
            State;
        Delta ->
            ok = barrel_p2p_replica_log:append(Log, Delta),
            case Sync of
                sync -> ok = barrel_p2p_replica_log:sync(Log);
                nosync -> ok
            end,
            State#state{dirty = true}
    end.

%% Build a `#{Key => Entry}' delta holding the OR-Map entry (value or
%% tombstone) of each key that has one. Keys unknown to the map are skipped.
entries_of(Keys, Map) ->
    Pairs = lists:filtermap(
        fun(Key) ->
            case barrel_p2p_ormap:get_entry(Key, Map) of
                {ok, Entry} -> {true, {Key, Entry}};
                not_found -> false
            end
        end,
        Keys
    ),
    maps:from_list(Pairs).

%% Synchronous request to the owner of map `Name'. A map that is not
%% running (no registered owner, or a nodedown exit) is reported as
%% `{error, no_such_map}'. Any other exit, such as a timeout, propagates.
call(Name, Msg) ->
    Owner = owner_name(Name),
    try gen_server:call(Owner, Msg) of
        Reply -> Reply
    catch
        exit:{noproc, _} -> {error, no_such_map};
        exit:{{nodedown, _}, _} -> {error, no_such_map}
    end.

%% The ETS read-cache table of map `Name', or `undefined' when no such
%% table exists (map not running on this node).
tab(Name) ->
    TabName = tab_name(Name),
    ets:whereis(TabName).

%% Turn the `validator' option into a 1-arity predicate. `undefined'
%% accepts everything, and an `{M, F}' pair is wrapped to call `M:F(Value)'.
%% Any other shape fails with function_clause.
normalise_validator(undefined) ->
    fun(_Value) -> true end;
normalise_validator({Mod, Fun}) ->
    fun(Value) -> Mod:Fun(Value) end;
normalise_validator(Fun) when is_function(Fun, 1) ->
    Fun.

%% Apply a validator, accepting only an exact `true'. Validators are user
%% code, so an exception or a non-`true' result deliberately counts as a
%% rejection instead of crashing the owner.
run_validator(Fun, Value) ->
    try Fun(Value) of
        true -> true;
        _NotTrue -> false
    catch
        _:_ -> false
    end.

%% Schedule the next `scan' tick to this process after `Scan' ms.
arm_scan(Scan) ->
    Self = self(),
    erlang:send_after(Scan, Self, scan).

%% Erlang system time in milliseconds (base for tombstone GC cutoffs).
now_ms() ->
    erlang:system_time(millisecond).

%% Read `Key' from the barrel_p2p application environment, or `Default'.
cfg(Key, Default) ->
    case application:get_env(barrel_p2p, Key) of
        {ok, Value} -> Value;
        undefined -> Default
    end.

%% Fetch `Key' from the per-map options, falling back to `Default'.
opt(Key, Opts, Default) ->
    case maps:find(Key, Opts) of
        {ok, Value} -> Value;
        error -> Default
    end.