%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
%%% Generic replication driver for a gossiped OR-Map.
%%%
%%% This is the low-level public substrate behind `barrel_p2p_map' and the
%%% built-in registry / leader / sharded-placement / durable-reminder
%%% features. Use `barrel_p2p_map' for an ordinary replicated key-value map;
%%% implement this behaviour directly only when you need custom merge or
%%% snapshot semantics (e.g. leader election layers fencing tokens on top
%%% via `broadcast_custom/2'). Stability: beta.
%%%
%%% An instance is started with `start_link(#{name := atom(), callback :=
%%% module()})'. `name' is BOTH the registered process name and the
%%% Plumtree tag that scopes this instance's broadcasts, so it must be a
%%% unique atom. Instances share the Plumtree bus; each ignores payloads
%%% carrying another instance's tag. The driver handles:
%%%
%%% - broadcast add/remove deltas as OR-Map entries (`broadcast_update/2'),
%%% - route incoming deltas to the owner's merge callback,
%%% - seed peers from the active view and full-sync on start / `peer_up',
%%% - drop a node's entries on `peer_down'.
%%%
%%% The OWNER process holds the actual OR-Map (so it can run its side
%%% effects synchronously) and implements the callbacks below. The driver
%%% calls them from its own process, passing the instance `name' first so
%%% one callback module can back many instances. Start the owner BEFORE
%%% its replica instance (the callbacks cast into the owner).
%%%
%%% Wire safety: callbacks receive entries straight off gossip. Passing
%%% them to `barrel_p2p_ormap:absorb_clock/merge' unvalidated can crash the
%%% merge or the shared `barrel_p2p_hlc' server (malformed dot/HLC, empty dot
%%% map, non-map payload). An implementer that merges deltas from sources
%%% it does not fully control SHOULD validate via `barrel_p2p_crdt_wire'
%%% (the recommended helper; not enforced). Leaf-payload validation is the
%%% app's own concern. (Of the built-ins, only the reminder validates;
%%% registry/leader/shard have internal writers.)
%%%
%%% Transport: gossip rides `barrel_p2p_plumtree' + `barrel_p2p_hyparview_events'
%%% over barrel_p2p's dist carrier, so a consumer must run on barrel_p2p's
%%% distribution. A pluggable transport (for apps with their own membership)
%%% is future work.
-module(barrel_p2p_replica).
-behaviour(gen_server).
%% API
-export([start_link/1]).
-export([broadcast_update/2, broadcast_custom/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-define(SYNC_TAG, '$barrel_p2p_replica').
%% Every callback receives the instance `Name' as its first argument, so
%% one callback module can back several independently-named instances
%% (e.g. many `barrel_p2p_map' instances share the `barrel_p2p_map' module).
%% All callbacks are invoked directly from the driver process. The driver
%% does not wrap them in try/catch, so an exception raised by a callback
%% crashes the driver. The `ok' returned by the merge/apply/remove callbacks
%% is not inspected.
%% Merge an incoming delta (one or more {Key, entry}) into the owner's
%% map and run its side effects. Deltas built by broadcast_update/2 carry a
%% single key whose entry is one of:
%% - `{value, Val, #{Dot => true}}' (Dot = `{node(), HLC}') for an add,
%% - `{tombstone, HLC}' for a remove.
-callback replica_merge_delta(Name :: atom(), Delta :: barrel_p2p_ormap:ormap()) -> ok.
%% Apply a full snapshot received from a peer. The snapshot is whatever that
%% peer's replica_full_sync_snapshot/1 produced. It arrives in two cases:
%% - the peer saw this node come up,
%% - the peer is answering this instance's request_sync (initial seed,
%%   source recovery, anti-entropy).
%% The sender is not checked against the known peer list.
-callback replica_apply_full_sync(Name :: atom(), Snapshot :: term()) -> ok.
%% Produce a snapshot to send to a peer, or `empty' when there is nothing
%% to send. Called when a new peer comes up and whenever a peer requests a
%% sync.
-callback replica_full_sync_snapshot(Name :: atom()) ->
{sync, Snapshot :: term()} | empty.
%% Drop all entries owned by a node that left or failed. Invoked on every
%% peer_down event, including for nodes never recorded as peers.
-callback replica_remove_node(Name :: atom(), node()) -> ok.
%% Merge a feature-specific custom broadcast sent via broadcast_custom/2
%% (optional; when not exported, custom payloads are dropped).
-callback replica_merge_custom(Name :: atom(), Payload :: term()) -> ok.
%% Whether this callback's instances run periodic anti-entropy (optional;
%% absent or `false' = off). A module returns `true' here to make periodic
%% full-sync convergence intrinsic to its instances, with no per-instance or
%% operator opt-out; the only knob is the `replica_anti_entropy_ms' interval.
%% Implement it only for value-carrying stores with a full-state snapshot and
%% tombstone removal (the built-in reminder and `barrel_p2p_map' do); the
%% registry/leader/shard do not, so they stay off structurally. Consulted
%% once, when an instance starts.
-callback replica_anti_entropy() -> boolean().
-optional_callbacks([replica_merge_custom/2, replica_anti_entropy/0]).
-record(state, {
%% Instance name: the registered process name and the Plumtree broadcast tag.
name :: atom(),
%% Callback module implementing this behaviour.
cb :: module(),
%% Known peer nodes. Seeded from the HyParView active view, extended on
%% peer_up and pruned on peer_down. These are the targets of sync requests.
peers = [] :: [node()],
%% Subscription state for the watched event sources (barrel_p2p_plumtree
%% and barrel_p2p_hyparview_events), managed by barrel_p2p_source_monitor.
watch = #{} :: barrel_p2p_source_monitor:watch(),
%% Periodic anti-entropy interval (ms); 0 = disabled (timer never armed).
ae_ms = 0 :: non_neg_integer()
}).
%% The instance `name' is both the registered process name and the
%% Plumtree tag that scopes this instance's broadcasts. Periodic
%% anti-entropy (a full-sync PULL from a random peer, so the instance
%% reconverges after a heal even without a fresh `peer_up') is governed by
%% the callback's `replica_anti_entropy/0', not by config: a value-carrying
%% store declares it in code and there is no per-instance toggle. Only the
%% interval is configurable, via the `barrel_p2p' application env key
%% `replica_anti_entropy_ms' (default 30000 ms).
%% `callback' is the module implementing this behaviour. init/1 reads only
%% `name' and `callback'; any other keys in the map are ignored.
-type config() :: #{name := atom(), callback := module()}.
-export_type([config/0]).
%%====================================================================
%% API
%%====================================================================
%% Start a replica instance registered locally under the config's `name'.
%% The whole config map is handed to init/1. A config without a `name' key
%% fails to match this function's head before any process is started.
-spec start_link(config()) -> {ok, pid()} | {error, term()}.
start_link(Cfg = #{name := RegName}) ->
    ServerName = {local, RegName},
    gen_server:start_link(ServerName, ?MODULE, Cfg, []).
%% Queue an OR-Map `{add, Key, Value}' or `{remove, Key}' on instance
%% `Name'. The instance stamps it and broadcasts it as a delta.
%% Fire-and-forget: this is a gen_server cast, so it returns `ok' right away.
-spec broadcast_update(atom(), {add, term(), term()} | {remove, term()}) -> ok.
broadcast_update(Name, Update) ->
    Request = {broadcast, Update},
    gen_server:cast(Name, Request).
%% Broadcast a feature-specific payload on this instance's Plumtree tag.
%% Receiving instances hand it to the callback's optional
%% `replica_merge_custom/2', called as `replica_merge_custom(Name, Payload)'.
%% Instances whose callback module does not export it drop the payload.
%% Fire-and-forget: this is a gen_server cast, so it returns `ok' right away.
-spec broadcast_custom(atom(), term()) -> ok.
broadcast_custom(Name, Payload) ->
    gen_server:cast(Name, {broadcast_custom, Payload}).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%% Initialise the instance in three steps:
%% 1. Subscribe to the Plumtree and HyParView event sources through
%%    barrel_p2p_source_monitor. A crash of either source does not restart
%%    this process, so a one-shot subscribe would be silently lost.
%% 2. Queue the initial seed-and-pull.
%% 3. Arm the first anti-entropy tick when the callback module opts in.
init(#{name := Name, callback := Cb}) ->
    Sources = [barrel_p2p_plumtree, barrel_p2p_hyparview_events],
    Watch = barrel_p2p_source_monitor:start(Sources),
    %% peer_up only reports FUTURE joins. An instance started after the
    %% cluster formed (e.g. a barrel_p2p_map created at runtime) would
    %% otherwise keep peers = [] and never sync. The seed runs from a
    %% self-message so init/1 does not block.
    self() ! seed_initial_sync,
    Interval =
        case anti_entropy_enabled(Cb) of
            false -> 0;
            true -> application:get_env(barrel_p2p, replica_anti_entropy_ms, 30000)
        end,
    arm_anti_entropy(Interval),
    InitialState = #state{name = Name, cb = Cb, watch = Watch, ae_ms = Interval},
    {ok, InitialState}.
%% The replica has no synchronous API. Every call is answered with `ok' and
%% leaves the state untouched.
handle_call(_Call, _Caller, St) -> {reply, ok, St}.
%% Turn local update requests into Plumtree broadcasts on this instance's tag.
%% - An add carries a fresh dot `{node(), HLC}'.
%% - A remove travels as a tombstone stamped with the current HLC
%%   (tombstone-as-delta). The receiver's OR-Map merge resolves it against
%%   any in-flight value by HLC, so a delayed add cannot resurrect the key.
%% - Custom payloads are forwarded unchanged.
%% Unknown casts are ignored.
handle_cast({broadcast, {add, Key, Val}}, State = #state{name = Tag}) ->
    Dots = #{{node(), barrel_p2p_hlc:now()} => true},
    barrel_p2p_plumtree:broadcast(Tag, {delta, node(), #{Key => {value, Val, Dots}}}),
    {noreply, State};
handle_cast({broadcast, {remove, Key}}, State = #state{name = Tag}) ->
    Stamp = barrel_p2p_hlc:now(),
    barrel_p2p_plumtree:broadcast(Tag, {delta, node(), #{Key => {tombstone, Stamp}}}),
    {noreply, State};
handle_cast({broadcast_custom, Payload}, State = #state{name = Tag}) ->
    Envelope = {custom, node(), Payload},
    barrel_p2p_plumtree:broadcast(Tag, Envelope),
    {noreply, State};
handle_cast(_Ignored, State) ->
    {noreply, State}.
%% Raw-message dispatcher for gossip, membership events, sync traffic,
%% source-monitor notifications and timer ticks. Clause order matters: the
%% tag-matching Plumtree clause must come before the catch-all Plumtree
%% clause, and the last clause drops anything unrecognised.
%% Plumtree delivery scoped to this instance (tag =:= our name): hand the
%% payload to handle_payload/3.
handle_info({plumtree_broadcast, {MsgTag, Payload}}, #state{name = Name} = State) when
MsgTag =:= Name
->
handle_payload(Payload, Name, State);
%% Plumtree delivery for another instance. Instances share the bus, so
%% payloads tagged for someone else are ignored.
handle_info({plumtree_broadcast, _Other}, State) ->
{noreply, State};
%% A node joined. Ignore ourselves and already-known peers. Otherwise record
%% the node and push our snapshot to it through a deferred do_full_sync
%% self-message.
handle_info({barrel_p2p_event, {peer_up, Node}}, #state{peers = Peers} = State) ->
case Node =:= node() orelse lists:member(Node, Peers) of
true ->
{noreply, State};
false ->
self() ! {do_full_sync, Node},
{noreply, State#state{peers = [Node | Peers]}}
end;
%% A node left or failed: let the callback drop its entries and forget the
%% node as a peer. The callback is invoked even if Node is not in `peers'.
handle_info(
{barrel_p2p_event, {peer_down, Node, _Reason}},
#state{name = Name, cb = Cb, peers = Peers} = State
) ->
Cb:replica_remove_node(Name, Node),
{noreply, State#state{peers = lists:delete(Node, Peers)}};
%% Other membership events are not used by the replica.
handle_info({barrel_p2p_event, _Other}, State) ->
{noreply, State};
%% Seed peers from the current active view and pull their state (see init/1).
handle_info(seed_initial_sync, #state{peers = Peers} = State) ->
Seeded = lists:usort(Peers ++ active_view_peers()),
{noreply, request_sync_from_peers(State#state{peers = Seeded})};
%% Push our snapshot to a newly joined peer (queued by the peer_up clause).
%% Skipped if that peer went down meanwhile and was removed from `peers'.
handle_info({do_full_sync, Node}, #state{name = Name, cb = Cb, peers = Peers} = State) ->
case lists:member(Node, Peers) of
true ->
case Cb:replica_full_sync_snapshot(Name) of
empty -> ok;
{sync, Snap} -> send_to_peer(Name, Node, {full_sync, node(), Snap})
end;
false ->
ok
end,
{noreply, State};
%% A peer pushed its full snapshot, either after seeing us come up or in
%% answer to one of our request_sync messages. It is applied whoever the
%% sender is: the sender is not checked against `peers'.
handle_info(
{?SYNC_TAG, {full_sync, _FromNode, Snapshot}},
#state{name = Name, cb = Cb} = State
) ->
Cb:replica_apply_full_sync(Name, Snapshot),
{noreply, State};
%% A peer asks us to push our full state to it. Peers send this when they
%% seed on start, after re-subscribing to a restarted source, and on each
%% of their anti-entropy ticks.
handle_info(
{?SYNC_TAG, {request_sync, FromNode}},
#state{name = Name, cb = Cb} = State
) ->
case Cb:replica_full_sync_snapshot(Name) of
empty -> ok;
{sync, Snap} -> send_to_peer(Name, FromNode, {full_sync, node(), Snap})
end,
{noreply, State};
%% A watched source restarted: re-subscribe (with retry) and, once back,
%% pull a full sync from known peers to recover deltas missed during the
%% gap. Full sync rides direct dist messages, not plumtree, so it works
%% even while plumtree is bouncing.
%% NOTE(review): assumes `Watch' holds a key per currently-subscribed
%% source, so the pull runs only on the unsubscribed -> subscribed
%% transition. Confirm against barrel_p2p_source_monitor.
handle_info({barrel_p2p_source_monitor, retry, Source}, #state{watch = Watch} = State) ->
Was = maps:is_key(Source, Watch),
Watch1 = barrel_p2p_source_monitor:retry(Source, Watch),
State1 = State#state{watch = Watch1},
case (not Was) andalso maps:is_key(Source, Watch1) of
true -> {noreply, request_sync_from_peers(State1)};
false -> {noreply, State1}
end;
%% A monitored process went down. If barrel_p2p_source_monitor:down/2
%% recognises the monitor ref, store its updated watch state. Any other
%% 'DOWN' is ignored. No sync is requested here; recovery happens in the
%% retry clause above, presumably scheduled by the source monitor (verify).
handle_info({'DOWN', Ref, process, _Pid, _Reason}, #state{watch = Watch} = State) ->
case barrel_p2p_source_monitor:down(Ref, Watch) of
{down, _Source, Watch1} -> {noreply, State#state{watch = Watch1}};
ignore -> {noreply, State}
end;
%% Periodic anti-entropy: pull a full sync from one random peer so a node
%% that missed updates (e.g. a surviving link after a partition heal, which
%% gets no fresh peer_up) reconverges. The merge is idempotent, so repeated
%% pulls are safe; over a few ticks state propagates transitively. Each tick
%% re-arms the timer with the same interval; an interval of 0 arms nothing.
handle_info(anti_entropy, #state{name = Name, peers = Peers, ae_ms = AeMs} = State) ->
case Peers of
[] -> ok;
_ -> send_to_peer(Name, random_peer(Peers), {request_sync, node()})
end,
arm_anti_entropy(AeMs),
{noreply, State};
%% Anything else is dropped.
handle_info(_Info, State) ->
{noreply, State}.
%% No cleanup is performed on shutdown, whatever the reason.
terminate(_Why, _St) -> ok.
%%====================================================================
%% Internal Functions
%%====================================================================
%% Dispatch a Plumtree payload already known to carry this instance's tag:
%% - `{delta, FromNode, Delta}' goes to replica_merge_delta/2,
%% - `{custom, FromNode, Payload}' goes to the optional replica_merge_custom/2
%%   (dropped when the callback module does not export it),
%% - any other shape is ignored.
%% Callback return values are not inspected; the state is returned unchanged.
handle_payload({delta, _FromNode, Delta}, Name, #state{cb = Cb} = State) ->
    Cb:replica_merge_delta(Name, Delta),
    {noreply, State};
handle_payload({custom, _FromNode, Payload}, Name, #state{cb = Cb} = State) ->
    %% erlang:function_exported/3 answers false for a module that is not
    %% loaded yet (interactive code loading), which would silently drop the
    %% payload. Make sure the callback module is loaded so the check is
    %% reliable; the result is irrelevant because the check below decides.
    _ = code:ensure_loaded(Cb),
    case erlang:function_exported(Cb, replica_merge_custom, 2) of
        true -> Cb:replica_merge_custom(Name, Payload);
        false -> ok
    end,
    {noreply, State};
handle_payload(_Other, _Name, State) ->
    {noreply, State}.
%% Send `Msg', wrapped in the replica sync tag, to the process registered
%% as `Name' on `Node'. The `noconnect' option means no new distribution
%% connection is set up. Returns the result of erlang:send/3.
send_to_peer(Name, Node, Msg) ->
    Dest = {Name, Node},
    Tagged = {?SYNC_TAG, Msg},
    erlang:send(Dest, Tagged, [noconnect]).
%% Nodes in the current HyParView active view, minus this node; used to
%% seed a freshly started instance. Best-effort during early boot: any
%% exception raised while asking barrel_p2p for the view yields [].
active_view_peers() ->
    try barrel_p2p:active_view() of
        View ->
            Me = node(),
            [Peer || Peer <- View, Peer =/= Me]
    catch
        _:_ -> []
    end.
%% Send request_sync to every peer in the state's `peers' list, asking each
%% to push its full snapshot back to us. Used when seeding and after a
%% watched source comes back. Returns the state unchanged.
request_sync_from_peers(#state{name = Name, peers = Peers} = State) ->
    Request = {request_sync, node()},
    lists:foreach(fun(Peer) -> send_to_peer(Name, Peer, Request) end, Peers),
    State.
%% True when the callback module opts its instances into periodic
%% anti-entropy by exporting `replica_anti_entropy/0' and returning `true'
%% from it. False when that function is absent (the registry/leader/shard).
%% The module is loaded first: erlang:function_exported/3 reports false for
%% a module that is not loaded yet. Without this, an opted-in module would
%% silently get anti-entropy disabled under interactive code loading.
anti_entropy_enabled(Cb) ->
    _ = code:ensure_loaded(Cb),
    erlang:function_exported(Cb, replica_anti_entropy, 0) andalso Cb:replica_anti_entropy().
%% Pick one element of a non-empty peer list uniformly at random.
random_peer(Peers) ->
    Pick = rand:uniform(length(Peers)),
    lists:nth(Pick, Peers).
%% Schedule the next `anti_entropy' tick for this process after a jittered
%% delay (see jitter/1) around `Interval' milliseconds.
%% - An interval of 0 means disabled: nothing is scheduled.
%% - Any other value must be a positive integer, or this raises
%%   function_clause.
%% The jitter stays integer-only, so instances and nodes do not sync in
%% lockstep and send_after never receives a float.
arm_anti_entropy(0) ->
    ok;
arm_anti_entropy(Interval) when is_integer(Interval), Interval > 0 ->
    Delay = jitter(Interval),
    _TRef = erlang:send_after(Delay, self(), anti_entropy),
    ok.
%% Integer jitter of about +-25% around `AeMs'. With Q = AeMs div 4, the
%% result lies in [AeMs - Q + 1, AeMs + Q]. When Q is 0 (intervals below
%% 4 ms) there is no room for jitter and AeMs is returned unchanged, so
%% rand:uniform/1 is never called with 0.
jitter(AeMs) when AeMs div 4 =:= 0 ->
    AeMs;
jitter(AeMs) ->
    Quarter = AeMs div 4,
    Low = AeMs - Quarter,
    Low + rand:uniform(Quarter * 2).