%%% src/barrel_p2p_shard.erl

%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
%%% Sharded service placement: given a key, agree cluster-wide on the
%%% node that should own it, and emit a churn-aware ownership event so
%%% owners can hand off / take over. This is the substrate for
%%% partitioning state across nodes.
%%%
%%% Membership is a replicated, LEASE-based live-node set (not the
%%% bounded HyParView active view, and not driven by `peer_down', which
%%% is active-view churn rather than cluster death). Each node gossips a
%%% periodic heartbeat carrying its wall-clock time; a node is "in the
%%% ring" while its lease is fresh (`Now - EmitWallMs =< member_ttl_ms').
%%% Heartbeats too far in the future are rejected so a fast clock cannot
%%% pin a dead node. This converges WITHOUT tombstones: a stale entry
%%% carried in a full-sync is already expired by its timestamp.
%%%
%%% Placement is rendezvous (HRW) hashing over the live set, bucketed
%%% into `ring_size' partitions (so ownership events are finite and a
%%% departing node moves only its own partitions). Ownership is
%%% computed deterministically as `max {phash2({Node, P}), Node}'.
%%%
%%% The live member list is published to a read-concurrency ETS table so
%%% `place/1' and friends are lock-free pure reads off the hot path.
%%% The replicated set rides a `barrel_p2p_replica' instance named
%%% `barrel_p2p_members_replica' (callback = this module).
-module(barrel_p2p_shard).
-behaviour(gen_server).
-behaviour(barrel_p2p_replica).

%% Public API (pure reads + subscription)
-export([
    place/1,
    owners/2,
    is_owner/1,
    partition/1,
    members/0,
    subscribe/1,
    unsubscribe/1
]).

%% Internal API
-export([start_link/0]).

%% barrel_p2p_replica callbacks
-export([
    replica_merge_delta/2,
    replica_apply_full_sync/2,
    replica_full_sync_snapshot/1,
    replica_remove_node/2
]).

%% Internal (invoked by the replica callbacks; owner/2 also used by tests)
-export([merge_delta/1, apply_full_sync/1, snapshot/0, owner/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-define(SERVER, ?MODULE).
-define(TAB, ?MODULE).
-define(REPLICA, barrel_p2p_members_replica).
-define(DEFAULT_RING_SIZE, 64).
-define(DEFAULT_HEARTBEAT_MS, 2000).
-define(DEFAULT_TTL_MS, 6000).
-define(DEFAULT_SKEW_MS, 5000).

%% gen_server state for the shard placement server.
-record(state, {
    %% Number of partitions keys are bucketed into (config `ring_size').
    ring_size :: pos_integer(),
    %% Interval between self heartbeats (config `member_heartbeat_ms').
    heartbeat_ms :: pos_integer(),
    %% A lease is live while `Now - Emit =< ttl_ms' (config `member_ttl_ms').
    ttl_ms :: pos_integer(),
    %% Incoming leases with `Emit > Now + skew_ms' are rejected
    %% (config `member_skew_ms').
    skew_ms :: non_neg_integer(),
    %% Node => wall-clock emit time (ms) of the newest accepted heartbeat.
    leases = #{} :: #{node() => integer()},
    %% Sorted live node list as last published to the ETS table.
    members = [] :: [node()],
    %% Partitions currently owned by this node (map used as a set).
    owned = #{} :: #{non_neg_integer() => true},
    %% Subscriber pid => monitor reference.
    subscribers = #{} :: #{pid() => reference()},
    %% Watch state returned by barrel_p2p_source_monitor.
    watch = #{} :: barrel_p2p_source_monitor:watch()
}).

%%====================================================================
%% Public API (pure, read from ETS, off the gen_server hot path)
%%====================================================================

%% @doc Resolve `Key' to the node that should own it cluster-wide: the key
%% is hashed to a partition and that partition's owner is picked among the
%% currently published members. Returns `undefined' when the member list
%% is empty.
-spec place(term()) -> node() | undefined.
place(Key) ->
    Partition = partition(Key),
    Live = members(),
    owner(Partition, Live).

%% @doc Up to `N' owner nodes for `Key' (for replicated placement), ranked
%% by descending `{phash2({Node, Partition}), Node}' so the best owner
%% comes first. Fewer than `N' nodes are returned when the live set is
%% smaller than `N'.
-spec owners(term(), pos_integer()) -> [node()].
owners(Key, N) ->
    Partition = partition(Key),
    Scored = [{erlang:phash2({Member, Partition}), Member} || Member <- members()],
    Ranked = lists:reverse(lists:sort(Scored)),
    Top = lists:sublist(Ranked, N),
    [Member || {_Score, Member} <- Top].

%% @doc `true' when `place(Key)' resolves to the local node, `false'
%% otherwise (including when no owner can be determined).
-spec is_owner(term()) -> boolean().
is_owner(Key) ->
    Self = node(),
    case place(Key) of
        Self -> true;
        _Other -> false
    end.

%% @doc Map `Key' to its ring partition, an integer in
%% `0..ring_size()-1'. Consumers (e.g. reminders) call this instead of
%% duplicating the ring/hash logic.
-spec partition(term()) -> non_neg_integer().
partition(Key) ->
    Buckets = ring_size(),
    erlang:phash2(Key, Buckets).

%% @doc The current live member set (sorted).
%%
%% Reads the list the server last published under the `members' key of
%% the ETS table. Returns `[]' when the table does not exist (server not
%% running) or when nothing is stored under that key.
%%
%% The lookup is attempted directly and a missing table is detected via
%% the `badarg' it raises. A separate existence check followed by a read
%% would leave a window in which the table can disappear (owner exit) and
%% crash the caller; this form has no such window and costs one ETS call
%% instead of two.
-spec members() -> [node()].
members() ->
    try ets:lookup(?TAB, members) of
        [{members, M}] -> M;
        [] -> []
    catch
        error:badarg -> []
    end.

%% @doc Register `Pid' to receive ownership events from the shard server.
%%
%% Events are sent as `{barrel_p2p_shard, {acquired, Partition}}' and
%% `{barrel_p2p_shard, {released, Partition}}'. The server monitors the
%% subscriber and drops it when it goes down; subscribing an already
%% registered pid is a no-op.
%%
%% The argument must be a pid. Anything else fails here, in the caller,
%% with `function_clause' rather than reaching `monitor/2' and `!' inside
%% the shared server, where a bad destination would crash the server
%% itself.
-spec subscribe(pid()) -> ok.
subscribe(Pid) when is_pid(Pid) ->
    gen_server:call(?SERVER, {subscribe, Pid}).

%% @doc Stop delivering ownership events to `Pid'. Replies `ok' whether or
%% not `Pid' was subscribed.
-spec unsubscribe(pid()) -> ok.
unsubscribe(Pid) ->
    Request = {unsubscribe, Pid},
    gen_server:call(?SERVER, Request).

%%====================================================================
%% Internal API
%%====================================================================

%% Start the shard server linked to the caller and registered locally
%% under ?SERVER.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%%====================================================================
%% barrel_p2p_replica callbacks (run in the replica process)
%%====================================================================

%% Replica callback: hand an incoming delta to the shard server.
replica_merge_delta(_ReplicaName, IncomingDelta) ->
    merge_delta(IncomingDelta).

%% Replica callback: hand a full-sync snapshot to the shard server.
replica_apply_full_sync(_ReplicaName, LeaseSnapshot) ->
    apply_full_sync(LeaseSnapshot).

%% Replica callback: fetch the snapshot to send in a full-sync.
replica_full_sync_snapshot(_ReplicaName) ->
    snapshot().

%% Replica callback: intentionally a no-op. Membership is lease-based and
%% a peer leaving the active view is not cluster death; the node drops
%% out of the live set when its lease expires.
replica_remove_node(_ReplicaName, _DepartedNode) ->
    ok.

%% Queue a gossiped OR-Map delta for merging; asynchronous (cast), so it
%% returns `ok' immediately.
-spec merge_delta(barrel_p2p_ormap:ormap()) -> ok.
merge_delta(Delta) ->
    Msg = {merge_delta, Delta},
    gen_server:cast(?SERVER, Msg).

%% Queue a full-sync lease snapshot for merging; asynchronous (cast), so
%% it returns `ok' immediately.
-spec apply_full_sync(#{node() => integer()}) -> ok.
apply_full_sync(Snapshot) ->
    Msg = {apply_full_sync, Snapshot},
    gen_server:cast(?SERVER, Msg).

%% Ask the server for its unexpired leases: `{sync, Leases}', or `empty'
%% when none are live. Synchronous call.
-spec snapshot() -> {sync, #{node() => integer()}} | empty.
snapshot() ->
    Request = snapshot,
    gen_server:call(?SERVER, Request).

%%====================================================================
%% gen_server callbacks
%%====================================================================

%% Read configuration, create and seed the ETS table, subscribe to
%% hyparview events and arm the first heartbeat timer.
init([]) ->
    Ring = cfg(ring_size, ?DEFAULT_RING_SIZE),
    BeatMs = cfg(member_heartbeat_ms, ?DEFAULT_HEARTBEAT_MS),
    TtlMs = cfg(member_ttl_ms, ?DEFAULT_TTL_MS),
    SkewMs = cfg(member_skew_ms, ?DEFAULT_SKEW_MS),
    TableOpts = [named_table, protected, set, {read_concurrency, true}],
    ?TAB = ets:new(?TAB, TableOpts),
    ets:insert(?TAB, {ring_size, Ring}),
    %% The local node is live from the start: give it a lease and publish
    %% the single-member list before any caller can reach place/1.
    Stamp = now_ms(),
    Self = node(),
    Live = [Self],
    ets:insert(?TAB, {members, Live}),
    OwnedNow = compute_owned(Live, Ring),
    %% Convergence hints: heartbeats reach the whole cluster, and a new
    %% peer triggers a full-sync (in the replica) plus an immediate beat.
    %% The source monitor keeps the subscription alive across a
    %% hyparview-events restart.
    Watch = barrel_p2p_source_monitor:start([barrel_p2p_hyparview_events]),
    %% No inline broadcast here: the replica process may not be registered
    %% yet and broadcast_update/2 is a cast that would be dropped. The
    %% first heartbeat is sent when the timer fires.
    arm_timer(BeatMs),
    State = #state{
        ring_size = Ring,
        heartbeat_ms = BeatMs,
        ttl_ms = TtlMs,
        skew_ms = SkewMs,
        leases = #{Self => Stamp},
        members = Live,
        owned = OwnedNow,
        watch = Watch
    },
    {ok, State}.

%% Synchronous requests:
%%   {subscribe, Pid}   - monitor and remember Pid (no-op if already known)
%%   {unsubscribe, Pid} - forget Pid and drop its monitor (no-op if unknown)
%%   snapshot           - unexpired leases as {sync, Map}, or `empty'
%% Anything else is answered with {error, unknown_request}.
handle_call({subscribe, Pid}, _From, #state{subscribers = Subs} = State) ->
    case Subs of
        #{Pid := _Existing} ->
            {reply, ok, State};
        #{} ->
            MRef = monitor(process, Pid),
            {reply, ok, State#state{subscribers = Subs#{Pid => MRef}}}
    end;
handle_call({unsubscribe, Pid}, _From, #state{subscribers = Subs} = State) ->
    case Subs of
        #{Pid := MRef} ->
            %% flush: discard a 'DOWN' that may already be in the mailbox.
            demonitor(MRef, [flush]),
            {reply, ok, State#state{subscribers = maps:remove(Pid, Subs)}};
        #{} ->
            {reply, ok, State}
    end;
handle_call(snapshot, _From, #state{ttl_ms = Ttl, leases = Leases} = State) ->
    Now = now_ms(),
    Fresh = maps:filter(fun(_Node, Emit) -> Now - Emit =< Ttl end, Leases),
    Reply =
        if
            map_size(Fresh) =:= 0 -> empty;
            true -> {sync, Fresh}
        end,
    {reply, Reply, State};
handle_call(_Unknown, _From, State) ->
    {reply, {error, unknown_request}, State}.

%% Ingest replicated membership data. Both ingest paths apply the same
%% plausibility rule (plausible_lease/3) before LWW-merging into the lease
%% map and recomputing the live set, so neither path can introduce a
%% non-atom member or a non-integer timestamp.
%%
%%   {merge_delta, Delta}        - gossiped OR-Map delta of {alive, EmitMs}
%%   {apply_full_sync, Snapshot} - plain #{Node => EmitMs} map
%% A non-map snapshot and any other message are ignored.
handle_cast({merge_delta, Delta}, State) ->
    Horizon = now_ms() + State#state.skew_ms,
    %% Validate the OR-Map wrapper (dots/HLCs) and the {alive, Emit} leaf
    %% first, so a malformed peer delta cannot crash absorb_clock or the
    %% shared barrel_p2p_hlc server.
    Valid = barrel_p2p_crdt_wire:accept(Delta, fun valid_alive/1),
    %% Then keep only plausible heartbeats. The time check is a FUTURE
    %% bound so a fast clock cannot pin a dead node; past-staleness is
    %% handled by the TTL, not here.
    Accepted = maps:filter(
        fun
            (Node, {value, {alive, Emit}, _Dots}) ->
                plausible_lease(Node, Emit, Horizon);
            (_Node, _Other) ->
                false
        end,
        Valid
    ),
    %% Filter BEFORE absorbing: barrel_p2p_hlc:update/1 accepts future
    %% timestamps, so absorbing a rejected far-future dot would still
    %% move our clock forward.
    ok = barrel_p2p_ormap:absorb_clock(Accepted),
    %% Every entry left in Accepted has the {value, {alive, Emit}, Dots}
    %% shape (the filter above admits nothing else).
    Leases = maps:fold(
        fun(Node, {value, {alive, Emit}, _Dots}, Acc) ->
            lww(Node, Emit, Acc)
        end,
        State#state.leases,
        Accepted
    ),
    {noreply, recompute(State#state{leases = Leases})};
handle_cast({apply_full_sync, Snapshot}, State) when is_map(Snapshot) ->
    Horizon = now_ms() + State#state.skew_ms,
    %% Snapshot carries plain wall-clock leases (no OR-Map dots), so there
    %% is nothing to absorb: check each entry, then LWW-merge.
    Leases = maps:fold(
        fun(Node, Emit, Acc) ->
            case plausible_lease(Node, Emit, Horizon) of
                true -> lww(Node, Emit, Acc);
                false -> Acc
            end
        end,
        State#state.leases,
        Snapshot
    ),
    {noreply, recompute(State#state{leases = Leases})};
handle_cast({apply_full_sync, _NotAMap}, State) ->
    {noreply, State};
handle_cast(_Msg, State) ->
    {noreply, State}.

%% Whether an incoming lease may be merged: the key must be a node atom,
%% the emit time an integer, and the emit time no later than `Horizon'
%% (local now + allowed skew).
plausible_lease(Node, Emit, Horizon) ->
    is_atom(Node) andalso is_integer(Emit) andalso Emit =< Horizon.

%% Periodic tick: refresh and gossip our lease, then re-arm the timer
%% with the configured interval.
handle_info(heartbeat, State) ->
    Next = do_heartbeat(State),
    _TimerRef = arm_timer(Next#state.heartbeat_ms),
    {noreply, Next};
%% A new peer appeared: gossip our liveness right away to speed up
%% convergence (the replica also full-syncs the member set on peer_up).
handle_info({barrel_p2p_event, {peer_up, _Peer}}, State) ->
    Next = do_heartbeat(State),
    {noreply, Next};
%% Other hyparview events are not relevant to membership.
handle_info({barrel_p2p_event, _Ignored}, State) ->
    {noreply, State};
%% A watched source (hyparview events) restarted: let the source monitor
%% retry the subscription.
handle_info({barrel_p2p_source_monitor, retry, Source}, #state{watch = Watch0} = State) ->
    Watch1 = barrel_p2p_source_monitor:retry(Source, Watch0),
    {noreply, State#state{watch = Watch1}};
%% A monitored process died: either a watched source (handled by the
%% source monitor) or a subscriber, which is then forgotten. The stored
%% reference must match so an unrelated 'DOWN' does not remove an entry.
handle_info(
    {'DOWN', MRef, process, Pid, _Reason},
    #state{watch = Watch0, subscribers = Subs} = State
) ->
    case barrel_p2p_source_monitor:down(MRef, Watch0) of
        {down, _Source, Watch1} ->
            {noreply, State#state{watch = Watch1}};
        ignore ->
            case Subs of
                #{Pid := MRef} ->
                    {noreply, State#state{subscribers = maps:remove(Pid, Subs)}};
                #{} ->
                    {noreply, State}
            end
    end;
handle_info(_Unexpected, State) ->
    {noreply, State}.

%% No cleanup is performed on shutdown; always returns `ok'.
terminate(_Why, _FinalState) ->
    ok.

%%====================================================================
%% Internal Functions
%%====================================================================

%% Stamp our own lease with the current wall-clock time, broadcast that
%% stamp through the members replica, then sweep expired leases and
%% recompute the live set.
do_heartbeat(#state{leases = Leases0} = State) ->
    Self = node(),
    Stamp = now_ms(),
    Leases1 = Leases0#{Self => Stamp},
    barrel_p2p_replica:broadcast_update(?REPLICA, {add, Self, {alive, Stamp}}),
    recompute(State#state{leases = Leases1}).

%% Drop expired leases from the local map (which bounds its size) and
%% derive the sorted live set from what remains. When the live set is
%% unchanged only the swept lease map is stored. Otherwise the new member
%% list is published to ETS, owned partitions are recomputed and
%% subscribers are told which partitions were acquired or released.
recompute(#state{ttl_ms = Ttl, members = OldMembers} = State) ->
    Now = now_ms(),
    Fresh = maps:filter(
        fun(_Node, Emit) -> Now - Emit =< Ttl end,
        State#state.leases
    ),
    Swept = State#state{leases = Fresh},
    case lists:sort(maps:keys(Fresh)) of
        OldMembers ->
            Swept;
        NewMembers ->
            ets:insert(?TAB, {members, NewMembers}),
            Owned = compute_owned(NewMembers, State#state.ring_size),
            emit_changes(State#state.owned, Owned, State#state.subscribers),
            Swept#state{members = NewMembers, owned = Owned}
    end.

%% Last-writer-wins merge of one lease: store `Emit' for `Node' only when
%% it is strictly newer than the stored value (a missing entry counts
%% as 0).
lww(Node, Emit, Acc) ->
    Current = maps:get(Node, Acc, 0),
    if
        Emit > Current -> Acc#{Node => Emit};
        true -> Acc
    end.

%% Leaf validator for a gossiped heartbeat value. Accepts exactly
%% `{alive, EmitMs}' with an integer `EmitMs'; everything else is
%% rejected. Passed to barrel_p2p_crdt_wire so malformed peer values are
%% dropped before the OR-Map merge.
valid_alive(Value) ->
    case Value of
        {alive, Emit} -> is_integer(Emit);
        _Malformed -> false
    end.

%% The set of partitions (as a `#{Partition => true}' map) whose owner
%% among `Members' is the local node, over partitions `0..RingSize-1'.
compute_owned(Members, RingSize) ->
    Self = node(),
    Partitions = lists:seq(0, RingSize - 1),
    Mine = [P || P <- Partitions, owner(P, Members) =:= Self],
    maps:from_list([{P, true} || P <- Mine]).

%% Tell subscribers how ownership changed between two owned-partition
%% sets: an `{acquired, P}' event for each partition only in `NewOwned',
%% followed by a `{released, P}' event for each partition only in
%% `OldOwned'.
emit_changes(OldOwned, NewOwned, Subs) ->
    Gained = maps:keys(maps:without(maps:keys(OldOwned), NewOwned)),
    Lost = maps:keys(maps:without(maps:keys(NewOwned), OldOwned)),
    Events = [{acquired, P} || P <- Gained] ++ [{released, P} || P <- Lost],
    lists:foreach(fun(Event) -> notify(Subs, Event) end, Events).

%% Send `{barrel_p2p_shard, Event}' to every subscriber pid in `Subs'.
notify(Subs, Event) ->
    Msg = {barrel_p2p_shard, Event},
    Deliver = fun(Pid, _MRef) -> Pid ! Msg end,
    maps:foreach(Deliver, Subs).

%% Rendezvous (HRW) choice for partition `P': the member with the largest
%% `{phash2({Node, P}), Node}' pair. Including the node in the pair makes
%% the result deterministic even when two hashes collide, so it never
%% depends on the order of `Members'. An empty member list has no owner.
owner(_P, []) ->
    undefined;
owner(P, Members) ->
    Scored = [{erlang:phash2({Candidate, P}), Candidate} || Candidate <- Members],
    Best = lists:max(Scored),
    element(2, Best).

%% The configured number of partitions, read from the ETS table. Falls
%% back to ?DEFAULT_RING_SIZE when the table does not exist (server not
%% running) or holds no `ring_size' entry.
%%
%% The lookup is attempted directly and a missing table is detected via
%% the `badarg' it raises, so the table cannot disappear between an
%% existence check and the read and crash the caller.
ring_size() ->
    try ets:lookup(?TAB, ring_size) of
        [{ring_size, R}] -> R;
        [] -> ?DEFAULT_RING_SIZE
    catch
        error:badarg -> ?DEFAULT_RING_SIZE
    end.

%% Schedule a `heartbeat' message to the calling process after `Hb'
%% milliseconds; returns the timer reference.
arm_timer(Hb) ->
    Target = self(),
    erlang:send_after(Hb, Target, heartbeat).

%% Current Erlang system time (wall clock) in milliseconds. Leases are
%% compared across nodes, hence system time rather than monotonic time.
now_ms() ->
    Unit = millisecond,
    erlang:system_time(Unit).

%% Read `Name' from the `barrel_p2p' application environment, falling
%% back to `Fallback' when it is not set.
cfg(Name, Fallback) ->
    App = barrel_p2p,
    application:get_env(App, Name, Fallback).