%%% src/barrel_p2p_reminder.erl

%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
%%% Durable reminders: cluster-wide, replicated timers that fire exactly
%%% once in steady state and best-effort under churn (see `Guarantee'
%%% below). `erlang:send_after/3' dies with the node; a reminder set
%%% here survives the node that armed it, because the reminder store is
%%% replicated and the firing node is chosen by placement.
%%%
%%% A reminder is `Key => {FireAtMs, Payload, H}' in a `barrel_p2p_replica'
%%% OR-Map (instance `barrel_p2p_reminder_replica'). `H' is a version HLC
%%% generated once when the reminder is set; it is identical on every
%%% node and names this exact reminder version (the OR-Map dot HLCs are
%%% separate and only resolve merges).
%%%
%%% The OWNER of a reminder is `barrel_p2p_shard:place(Key)'. Only the
%%% owner arms a local `erlang:send_after' and fires. Timers are
%%% VERSIONED HINTS: when `{fire, Key, H}' arrives we re-read the live
%%% entry and fire only if it still exists, still names version `H', this
%%% node is still the owner, and the fire time has been reached. That
%%% makes a stale timer harmless after a re-`remind' (new `H'), a
%%% `cancel_reminder/1', an ownership change, or a cancel that raced an
%%% already-queued timeout.
%%%
%%% Firing is TOMBSTONE-FIRST: write + gossip the tombstone, then deliver
%%% `{barrel_p2p_reminder, Key, Payload, Fence}' (Fence = pack(H)) to LOCAL
%%% subscribers. Committing "fired" before delivering biases toward not
%%% double-firing.
%%%
%%% Guarantee: exactly once in steady state (member set converged);
%%% best-effort under churn (two nodes can briefly both own a key and
%%% both fire before the tombstone propagates) or a crash at the fire
%%% instant (a crash between tombstone and delivery drops the fire; a
%%% crash before the tombstone gossips can let a survivor fire again).
%%% The delivered Fence lets a handler dedup if it wants at-least-once
%%% with idempotency. Fire time is wall-clock ("cluster-time"), subject
%%% to the same skew caveat as membership.
%%%
%%% Persistence: the reminder store is written to disk (a `barrel_p2p_replica_log'
%%% WAL + snapshot under `reminder_data_dir', default `data/reminders'), so
%%% reminders survive a FULL-cluster restart, not just an individual node's
%%% death. A `remind'/`cancel' is fsynced before it returns. On boot each
%%% node recovers its store and the cluster re-converges.
%%%
%%% Payload must be RESTART-SAFE DATA: a self-contained value with no pids,
%%% ports, refs, or funs. This is not new with persistence - a reminder is
%%% already delivered on whatever node OWNS the key at fire time, not where
%%% it was set, so a `Payload' carrying a local pid/ref is already a stale
%%% reference cross-node. Persistence merely extends that to "after a
%%% restart": a pid/ref/fun reloaded from disk points at a process or code
%%% version that no longer exists. Pass an id or descriptor the handler can
%%% resolve locally, not a live reference.
-module(barrel_p2p_reminder).
-behaviour(gen_server).
-behaviour(barrel_p2p_replica).

-include_lib("hlc/include/hlc.hrl").

%% Registered name of this feature's replication instance.
-define(REPLICA, barrel_p2p_reminder_replica).

%% Public API
-export([
    remind/3,
    remind_after/3,
    cancel_reminder/1,
    subscribe/1,
    unsubscribe/1
]).

%% Internal API
-export([start_link/0]).

%% barrel_p2p_replica callbacks
-export([
    replica_merge_delta/2,
    replica_apply_full_sync/2,
    replica_full_sync_snapshot/1,
    replica_remove_node/2,
    replica_anti_entropy/0
]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

%% Locally registered name of this gen_server (the module name).
-define(SERVER, ?MODULE).
%% Default interval, in milliseconds, between periodic `scan' sweeps;
%% overridable through the `reminder_scan_ms' application env.
-define(DEFAULT_SCAN_MS, 1000).
%% Default tombstone age, in milliseconds (one hour), used to compute the
%% GC cutoff during a scan; overridable through `reminder_tombstone_ttl_ms'.
-define(DEFAULT_TOMBSTONE_TTL_MS, (60 * 60 * 1000)).
%% Cap on a single send_after delay. A far-future reminder arms for at
%% most this long; the periodic scan re-arms it as the fire time nears.
%% Stays well under erlang's ~49-day send_after ceiling.
-define(MAX_TIMER_MS, (4 * 24 * 60 * 60 * 1000)).
%% Persistence: disk_log/snapshot store name and default directory
%% (the directory is overridable through `reminder_data_dir').
-define(STORE, barrel_p2p_reminder).
-define(DEFAULT_DATA_DIR, "data/reminders").

%% Reminder identifier: the OR-Map key, also the term handed to
%% `barrel_p2p_shard:place/1' to decide which node owns the reminder.
-type key() :: term().
%% Caller-supplied value delivered to subscribers when the reminder fires.
-type payload() :: term().
%% Result of `barrel_p2p_hlc:pack/1' applied to the reminder's version HLC;
%% delivered as the last element of the fire message.
-type fence() :: non_neg_integer().

-export_type([fence/0]).

%% gen_server state.
-record(state, {
    %% Delay, in milliseconds, used to schedule the next `scan' message.
    scan_ms :: pos_integer(),
    %% Subtracted from the current time to get the tombstone GC cutoff.
    tombstone_ttl_ms :: non_neg_integer(),
    %% The replicated reminder store: Key -> {FireAtMs, Payload, VersionHLC}.
    reminders = barrel_p2p_ormap:new() :: barrel_p2p_ormap:ormap(),
    %% Locally armed timers: Key -> {TimerRef, VersionHLC}
    timers = #{} :: #{key() => {reference(), barrel_p2p_hlc:timestamp()}},
    %% Subscribers to fire notifications: Pid -> monitor reference.
    subscribers = #{} :: #{pid() => reference()},
    %% Source-monitor watch started on `barrel_p2p_shard' in init/1.
    watch = #{} :: barrel_p2p_source_monitor:watch(),
    %% Disk persistence: the WAL+snapshot handle, and whether the store
    %% changed since the last snapshot (so the scan only snapshots on churn).
    %% `log' is `undefined' when opening the store failed.
    log = undefined :: barrel_p2p_replica_log:handle(),
    dirty = false :: boolean()
}).

%%====================================================================
%% Public API
%%====================================================================

%% @doc Set a reminder for `Key' to fire at absolute wall-clock
%% `FireAtMs' (milliseconds, `erlang:system_time(millisecond)' scale),
%% delivering `Payload' on whichever node owns `Key' at fire time.
%% Re-setting an existing `Key' replaces it. A `FireAtMs' in the past is
%% armed with a zero delay. Stability: beta.
%%
%% `FireAtMs' must be an integer; anything else fails in the caller with
%% `function_clause'. Without the guard a non-integer fire time would be
%% stored and acknowledged with `ok' yet never armed, because the arming
%% path only accepts integer fire times.
-spec remind(key(), integer(), payload()) -> ok.
remind(Key, FireAtMs, Payload) when is_integer(FireAtMs) ->
    gen_server:call(?SERVER, {remind, Key, FireAtMs, Payload}).

%% @doc Like `remind/3' but `DelayMs' from now; converted to an absolute
%% target so every node agrees on the fire time. The conversion happens
%% in the server when the request is handled. Stability: beta.
%%
%% `DelayMs' must be an integer; anything else fails in the caller with
%% `function_clause'. Without the guard a non-numeric delay would raise
%% `badarith' inside the server while computing the target, taking the
%% shared reminder process down, and a float would produce a fire time
%% that is never armed.
-spec remind_after(key(), non_neg_integer(), payload()) -> ok.
remind_after(Key, DelayMs, Payload) when is_integer(DelayMs) ->
    gen_server:call(?SERVER, {remind_after, Key, DelayMs, Payload}).

%% @doc Cancel the pending reminder stored under `Key'. The removal is
%% applied locally, gossiped to the other replicas and persisted before
%% this call returns. Stability: beta.
-spec cancel_reminder(key()) -> ok.
cancel_reminder(Key) ->
    Request = {cancel, Key},
    gen_server:call(?SERVER, Request).

%% @doc Subscribe `Pid' to the reminders fired by this node's server.
%% Each fire is delivered as `{barrel_p2p_reminder, Key, Payload, Fence}'.
%% The subscriber is monitored and dropped when it goes down; subscribing
%% a pid that is already subscribed is a no-op.
%%
%% `Pid' must be a pid; anything else fails in the caller with
%% `function_clause'. The server monitors the term and later sends to it,
%% so a non-pid would either crash the server in `monitor/2' or (for a
%% registered name) leave an entry that the 'DOWN' handler can never match
%% and remove.
-spec subscribe(pid()) -> ok.
subscribe(Pid) when is_pid(Pid) ->
    gen_server:call(?SERVER, {subscribe, Pid}).

%% @doc Remove `Pid' from the fire subscribers and drop its monitor.
%% Unsubscribing a pid that is not subscribed still returns `ok'.
-spec unsubscribe(pid()) -> ok.
unsubscribe(Pid) ->
    Request = {unsubscribe, Pid},
    gen_server:call(?SERVER, Request).

%%====================================================================
%% Internal API
%%====================================================================

%% Start the reminder server, linked to the caller and registered locally
%% under the module name.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%%====================================================================
%% barrel_p2p_replica callbacks (run in the replica process)
%%====================================================================

%% Hand a gossiped delta to the reminder server asynchronously; the merge
%% itself happens in the server process.
replica_merge_delta(_Replica, Delta) ->
    Msg = {merge_delta, Delta},
    gen_server:cast(?SERVER, Msg).

%% Hand a full-sync snapshot received from a peer to the reminder server
%% asynchronously.
replica_apply_full_sync(_Replica, Snapshot) ->
    Msg = {apply_full_sync, Snapshot},
    gen_server:cast(?SERVER, Msg).

%% Ask the reminder server (synchronously) for its current store; the
%% reply is `empty' or `{sync, OrMap}'.
replica_full_sync_snapshot(_Replica) ->
    Request = snapshot,
    gen_server:call(?SERVER, Request).

%% Deliberately a no-op: reminders have to outlive the node that armed
%% them, so a peer dropping out of the active view must not remove any
%% entry. Ownership changes take care of who fires.
replica_remove_node(_Replica, _DepartedNode) ->
    ok.

%% Always `true'. Reminders are value-carrying and must reconverge after
%% a heal even without a fresh peer_up, so anti-entropy is intrinsic (no
%% opt-out).
replica_anti_entropy() ->
    true.

%%====================================================================
%% gen_server callbacks
%%====================================================================

%% Start-up: read the configuration, recover the persisted store, watch
%% the shard for ownership transitions and schedule the first scan.
init([]) ->
    %% Exits are trapped so that terminate/2 runs on a supervisor shutdown
    %% and gets the chance to close the log; any stray exit message ends
    %% up in the catch-all handle_info clause.
    process_flag(trap_exit, true),
    ScanMs = cfg(reminder_scan_ms, ?DEFAULT_SCAN_MS),
    TombstoneTtlMs = cfg(reminder_tombstone_ttl_ms, ?DEFAULT_TOMBSTONE_TTL_MS),
    %% Whatever was persisted before the restart becomes the initial store
    %% (an empty store when persistence could not be opened).
    {LogHandle, Recovered} = open_store(),
    %% The shard reports `acquired'/`released' partitions; the watch keeps
    %% that subscription alive across a shard restart.
    ShardWatch = barrel_p2p_source_monitor:start([barrel_p2p_shard]),
    %% No timer is armed here for recovered reminders: the first scan
    %% reconciles every key and arms the ones this node owns.
    _ScanTimer = arm_scan(ScanMs),
    InitialState = #state{
        scan_ms = ScanMs,
        tombstone_ttl_ms = TombstoneTtlMs,
        watch = ShardWatch,
        reminders = Recovered,
        log = LogHandle
    },
    {ok, InitialState}.

%% Open the on-disk store under the configured directory and return
%% `{LogHandle, RecoveredOrMap}'. On success the recovered map is passed
%% to `barrel_p2p_ormap:absorb_clock/1' before it is returned. A failed
%% open is only logged: the result is then `{undefined, EmptyOrMap}' and
%% the server keeps running without a log handle.
open_store() ->
    DataDir = cfg(reminder_data_dir, ?DEFAULT_DATA_DIR),
    Opened = barrel_p2p_replica_log:open(?STORE, DataDir),
    case Opened of
        {error, Reason} ->
            logger:warning(
                "barrel_p2p_reminder: persistence disabled, open "
                "failed: ~p",
                [Reason]
            ),
            {undefined, barrel_p2p_ormap:new()};
        {ok, Log, Recovered} ->
            ok = barrel_p2p_ormap:absorb_clock(Recovered),
            {Log, Recovered}
    end.

%% Synchronous requests: the public API plus the replica's full-sync
%% snapshot request. Unknown requests are answered with an error tuple
%% rather than crashing the server.
handle_call({remind, Key, FireAtMs, Payload}, _From, State) ->
    NewState = do_remind(Key, FireAtMs, Payload, State),
    {reply, ok, NewState};
handle_call({remind_after, Key, DelayMs, Payload}, _From, State) ->
    %% The relative delay becomes an absolute wall-clock target here, once,
    %% so the stored value is the same on every replica.
    FireAtMs = now_ms() + DelayMs,
    {reply, ok, do_remind(Key, FireAtMs, Payload, State)};
handle_call({cancel, Key}, _From, #state{reminders = Before} = State) ->
    %% Same sequence as a fire: remove locally, gossip, persist with fsync,
    %% and only then drop the local timer and reply.
    After = barrel_p2p_ormap:remove(Key, Before),
    barrel_p2p_replica:broadcast_update(?REPLICA, {remove, Key}),
    Persisted = persist_key(Key, sync, State#state{reminders = After}),
    {reply, ok, disarm(Key, Persisted)};
handle_call({subscribe, Pid}, _From, #state{subscribers = Subs} = State) when
    is_map_key(Pid, Subs)
->
    %% Already subscribed: keep the existing monitor.
    {reply, ok, State};
handle_call({subscribe, Pid}, _From, #state{subscribers = Subs} = State) ->
    MonRef = monitor(process, Pid),
    {reply, ok, State#state{subscribers = Subs#{Pid => MonRef}}};
handle_call({unsubscribe, Pid}, _From, #state{subscribers = Subs} = State) ->
    case maps:take(Pid, Subs) of
        error ->
            {reply, ok, State};
        {MonRef, Remaining} ->
            %% `flush' also discards a 'DOWN' that may already be queued.
            demonitor(MonRef, [flush]),
            {reply, ok, State#state{subscribers = Remaining}}
    end;
handle_call(snapshot, _From, #state{reminders = Reminders} = State) ->
    case barrel_p2p_ormap:is_empty(Reminders) of
        true -> {reply, empty, State};
        false -> {reply, {sync, Reminders}, State}
    end;
handle_call(_Request, _From, State) ->
    {reply, {error, unknown_request}, State}.

%% Asynchronous requests from the replica callbacks. A delta and a full
%% snapshot go through the same validating ingest path; anything else is
%% ignored.
handle_cast({merge_delta, Delta}, State) ->
    Merged = ingest(Delta, State),
    {noreply, Merged};
handle_cast({apply_full_sync, Snapshot}, State) ->
    Merged = ingest(Snapshot, State),
    {noreply, Merged};
handle_cast(_Unknown, State) ->
    {noreply, State}.

%% A timer armed for version `H' of `Key' expired. The timer is only a
%% hint: forget it, then re-validate against the live store before firing.
handle_info({fire, Key, H}, State0) ->
    State = clear_spent_timer(Key, H, State0),
    NewState =
        case should_fire(Key, H, State) of
            {true, Payload} -> fire(Key, Payload, H, State);
            false -> State
        end,
    {noreply, NewState};
%% Partition P changed hands. Gaining it (this is how a survivor picks up
%% a dead owner's un-fired reminders) and losing it are handled the same
%% way: reconcile every key of the partition, which arms what this node
%% now owns and disarms what it does not.
handle_info({barrel_p2p_shard, {Transition, P}}, State) when
    Transition =:= acquired; Transition =:= released
->
    {noreply, reconcile_keys(keys_in_partition(P, State), State)};
%% Periodic safety sweep: reconcile every key (re-arms what was missed or
%% capped, disarms what is no longer owned), GC old tombstones, snapshot
%% if anything changed, then schedule the next sweep.
handle_info(scan, State0) ->
    AllKeys = barrel_p2p_ormap:keys(State0#state.reminders),
    State1 = reconcile_keys(AllKeys, State0),
    #state{
        reminders = Before,
        tombstone_ttl_ms = TtlMs,
        dirty = WasDirty,
        log = Log
    } = State1,
    After = barrel_p2p_ormap:gc_tombstones(Before, now_ms() - TtlMs),
    %% A GC that dropped tombstones counts as a change too, so they get
    %% purged from disk; a fully idle store takes no snapshot.
    Changed = WasDirty orelse map_size(After) =/= map_size(Before),
    ok = snapshot_if(Changed, Log, After),
    arm_scan(State1#state.scan_ms),
    {noreply, State1#state{reminders = After, dirty = false}};
%% The source monitor asks for a retry of a watched source (the shard).
handle_info({barrel_p2p_source_monitor, retry, Source}, #state{watch = Watch} = State) ->
    Retried = barrel_p2p_source_monitor:retry(Source, Watch),
    {noreply, State#state{watch = Retried}};
%% A monitored process went down: either a watched source or a subscriber.
handle_info({'DOWN', Ref, process, Pid, _Reason}, State) ->
    case barrel_p2p_source_monitor:down(Ref, State#state.watch) of
        {down, _Source, Watch} ->
            {noreply, State#state{watch = Watch}};
        ignore ->
            {noreply, forget_subscriber(Pid, Ref, State)}
    end;
handle_info(_Info, State) ->
    {noreply, State}.

%% Take a snapshot of `Reminders' when the store changed; the snapshot's
%% own result is deliberately ignored.
snapshot_if(true, Log, Reminders) ->
    _ = barrel_p2p_replica_log:snapshot(Log, Reminders),
    ok;
snapshot_if(false, _Log, _Reminders) ->
    ok.

%% Drop `Pid' from the subscribers, but only when `Ref' is the monitor
%% recorded for it.
forget_subscriber(Pid, Ref, #state{subscribers = Subs} = State) ->
    case Subs of
        #{Pid := Ref} -> State#state{subscribers = maps:remove(Pid, Subs)};
        _ -> State
    end.

%% Close the persistence log on shutdown; the result of the close is
%% ignored.
terminate(_Reason, #state{log = Log}) ->
    _ = barrel_p2p_replica_log:close(Log),
    ok.

%%====================================================================
%% Internal Functions
%%====================================================================

%% Merge a delta or snapshot received from a peer. Validation comes
%% first: barrel_p2p_crdt_wire checks the wrapper and applies our
%% reminder-specific leaf check (valid_leaf/1) on top, and only what it
%% accepted is merged. The accepted keys are then persisted and their
%% local timers reconciled.
ingest(Incoming, #state{reminders = Current} = State) ->
    {Merged, Accepted} =
        barrel_p2p_crdt_wire:ingest(Current, Incoming, fun valid_leaf/1),
    AcceptedKeys = maps:keys(Accepted),
    %% State learned from peers is persisted as well, without an fsync:
    %% this runs at gossip rate and the scan snapshot debounces it.
    Persisted = persist_keys(AcceptedKeys, nosync, State#state{reminders = Merged}),
    reconcile_keys(AcceptedKeys, Persisted).

%% Leaf check for incoming entries: true only for a 3-tuple whose first
%% element is an integer fire time and whose last element is a
%% `#timestamp{}' version HLC. The payload itself is not inspected.
valid_leaf({FireAt, _Payload, Version}) ->
    is_integer(FireAt) andalso is_record(Version, timestamp);
valid_leaf(_) ->
    false.

%% Store a new version of the reminder: stamp it with a fresh version
%% HLC, insert it locally, gossip the add, persist it with an fsync, and
%% finally arm (or disarm) the local timer depending on ownership.
-spec do_remind(key(), integer(), payload(), #state{}) -> #state{}.
do_remind(Key, FireAtMs, Payload, #state{reminders = Before} = State) ->
    Version = barrel_p2p_hlc:now(),
    Entry = {FireAtMs, Payload, Version},
    After = barrel_p2p_ormap:add(Key, Entry, Before),
    barrel_p2p_replica:broadcast_update(?REPLICA, {add, Key, Entry}),
    Persisted = persist_key(Key, sync, State#state{reminders = After}),
    reconcile_key(Key, Persisted).

%% Reconcile each key in turn, threading the state through. After
%% reconciliation a key that is live and owned by this node is armed for
%% its current version; every other key has no local timer.
reconcile_keys([], State) ->
    State;
reconcile_keys([Key | Rest], State) ->
    reconcile_keys(Rest, reconcile_key(Key, State)).

%% Align the local timer for `Key' with the store: arm it when the entry
%% is a live, well-formed reminder placed on this node, disarm it in
%% every other case.
reconcile_key(Key, State) ->
    case barrel_p2p_ormap:get(Key, State#state.reminders) of
        {ok, {FireAt, _Payload, #timestamp{} = Version}} when is_integer(FireAt) ->
            Owner = barrel_p2p_shard:place(Key),
            if
                Owner =:= node() -> arm(Key, FireAt, Version, State);
                true -> disarm(Key, State)
            end;
        _NotLive ->
            %% Tombstoned, absent, or a malformed value: nothing may stay
            %% armed for this key.
            disarm(Key, State)
    end.

%% Make sure a timer for version `H' of `Key' is armed. A timer already
%% armed for `H' is kept untouched; a timer for another version is
%% cancelled and replaced; with no timer, a new one is armed.
arm(Key, FireAt, H, #state{timers = Timers} = State) ->
    case maps:find(Key, Timers) of
        {ok, {_Ref, H}} ->
            State;
        {ok, {StaleRef, _OtherVersion}} ->
            erlang:cancel_timer(StaleRef),
            do_arm(Key, FireAt, H, State);
        error ->
            do_arm(Key, FireAt, H, State)
    end.

%% Start the timer that will deliver `{fire, Key, H}' to this process and
%% record it. The delay is the time left until `FireAt', never negative
%% and never longer than ?MAX_TIMER_MS.
do_arm(Key, FireAt, H, #state{timers = Timers} = State) ->
    Remaining = FireAt - now_ms(),
    Delay = min(max(0, Remaining), ?MAX_TIMER_MS),
    TimerRef = erlang:send_after(Delay, self(), {fire, Key, H}),
    State#state{timers = Timers#{Key => {TimerRef, H}}}.

%% Cancel and forget the local timer for `Key', if there is one. A
%% timeout message that is already queued is not removed here; it is
%% filtered out later by the version check done at fire time.
disarm(Key, #state{timers = Timers} = State) ->
    case maps:find(Key, Timers) of
        {ok, {TimerRef, _Version}} ->
            erlang:cancel_timer(TimerRef),
            State#state{timers = maps:remove(Key, Timers)};
        error ->
            State
    end.

%% Forget the timer whose timeout just arrived, provided the recorded
%% timer is the one armed for this very version. If a re-remind has
%% replaced it in the meantime, the newer timer is left alone.
clear_spent_timer(Key, H, #state{timers = Timers} = State) ->
    case Timers of
        #{Key := {_Ref, H}} -> State#state{timers = maps:remove(Key, Timers)};
        _ -> State
    end.

%% Decide whether the timeout for version `H' of `Key' may fire. Returns
%% `{true, Payload}' only when the live entry still carries version `H',
%% the key is placed on this node, and the wall clock has reached the
%% fire time; `false' otherwise.
-spec should_fire(key(), barrel_p2p_hlc:timestamp(), #state{}) ->
    {true, payload()} | false.
should_fire(Key, H, State) ->
    case barrel_p2p_ormap:get(Key, State#state.reminders) of
        {ok, {FireAt, Payload, H}} ->
            IsOwner = barrel_p2p_shard:place(Key) =:= node(),
            %% The clock is only consulted when this node owns the key.
            Due = IsOwner andalso now_ms() >= FireAt,
            if
                Due -> {true, Payload};
                true -> false
            end;
        _Other ->
            false
    end.

%% Fire a reminder, tombstone first: remove the entry, gossip the
%% removal and persist it with an fsync, and only then deliver to the
%% subscribers. Recording the fire before delivering means a restart
%% cannot fire the same reminder again.
fire(Key, Payload, H, #state{reminders = Live} = State) ->
    Tombstoned = barrel_p2p_ormap:remove(Key, Live),
    barrel_p2p_replica:broadcast_update(?REPLICA, {remove, Key}),
    Committed = persist_key(Key, sync, State#state{reminders = Tombstoned}),
    %% The fence handed to subscribers is the packed version HLC.
    Fence = barrel_p2p_hlc:pack(H),
    notify(Committed#state.subscribers, Key, Payload, Fence),
    Committed.

%% Send `{barrel_p2p_reminder, Key, Payload, Fence}' to every subscriber
%% pid in `Subs' (a map keyed by pid). Returns `ok'.
notify(Subs, Key, Payload, Fence) ->
    Msg = {barrel_p2p_reminder, Key, Payload, Fence},
    Deliver = fun(Pid, _MonRef) -> Pid ! Msg end,
    maps:foreach(Deliver, Subs).

%% Single-key form of persist_keys/3: append the current entry of `Key'
%% (value or tombstone) to the WAL and mark the store dirty. With `sync'
%% the write is forced to disk before returning, which is what the
%% user-facing writes and fires use so they are durable before the reply.
persist_key(Key, Sync, State) ->
    Keys = [Key],
    persist_keys(Keys, Sync, State).

%% Append the current entries (values or tombstones) of `Keys' to the WAL
%% and mark the store dirty so that the next scan takes a snapshot.
%%
%% `Sync' is `sync' to force the log to disk before returning, or
%% `nosync' to skip that step.
%%
%% When none of `Keys' has an entry there is nothing to record, so the
%% append and the fsync are skipped and the state is returned unchanged.
%% This matters on the gossip path: an ingest that accepted nothing used
%% to call append with an empty delta anyway, and since an empty delta
%% never sets `dirty', no later snapshot would be triggered to clean up
%% after it.
persist_keys(Keys, Sync, #state{log = Log, reminders = Reminders} = State) ->
    case entries_of(Keys, Reminders) of
        Delta when map_size(Delta) =:= 0 ->
            State;
        Delta ->
            ok = barrel_p2p_replica_log:append(Log, Delta),
            case Sync of
                sync -> ok = barrel_p2p_replica_log:sync(Log);
                nosync -> ok
            end,
            State#state{dirty = true}
    end.

%% Collect the current OR-Map entries of `Keys' into a `Key => Entry'
%% map. Keys without an entry (`not_found') are left out.
entries_of(Keys, Map) ->
    Lookup = fun(Key) ->
        case barrel_p2p_ormap:get_entry(Key, Map) of
            {ok, Entry} -> [{Key, Entry}];
            not_found -> []
        end
    end,
    maps:from_list(lists:flatmap(Lookup, Keys)).

%% All keys of the reminder store that `barrel_p2p_shard:partition/1'
%% maps to partition `P'.
keys_in_partition(P, State) ->
    InPartition = fun(Key) -> barrel_p2p_shard:partition(Key) =:= P end,
    lists:filter(InPartition, barrel_p2p_ormap:keys(State#state.reminders)).

%% Schedule the next `scan' message to this process in `Scan'
%% milliseconds. Returns the timer reference.
arm_scan(Scan) ->
    Self = self(),
    erlang:send_after(Scan, Self, scan).

%% Current wall-clock time in milliseconds (system time, not monotonic),
%% the scale in which fire times are expressed.
now_ms() ->
    Unit = millisecond,
    erlang:system_time(Unit).

%% Read `Key' from the `barrel_p2p' application environment, falling
%% back to `Default' when it is not set.
cfg(Key, Default) ->
    App = barrel_p2p,
    application:get_env(App, Key, Default).