Skip to main content

src/barrel_p2p_leader.erl

%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
%%% Cluster-wide singleton / leader election.
%%%
%%% Processes campaign for a named singleton with `lead/2'. The winner
%%% is computed independently and identically on every node from the
%%% replicated candidate set: highest `priority', ties broken by the
%%% lowest node atom. No consensus is involved, matching barrel_p2p's
%%% AP/gossip posture.
%%%
%%% Each leadership term is handed a fencing token minted from the HLC
%%% and advanced past a replicated per-name high-water mark, so it is
%%% strictly monotonic within a connected partition. The leader stamps
%%% the token on writes; the protected resource rejects any operation
%%% whose token is not strictly greater than the highest it accepted.
%%%
%%% The candidate set is an OR-Map keyed `{Name, node()}', replicated
%%% through a `barrel_p2p_replica' instance (gossip deltas, full-sync on
%%% peer_up, prune on peer_down). Fencing high-water marks ride the same
%%% instance as a custom broadcast.
-module(barrel_p2p_leader).
-behaviour(gen_server).
-behaviour(barrel_p2p_replica).

-include_lib("hlc/include/hlc.hrl").

%% Registered name of this feature's replication instance.
-define(REPLICA, barrel_p2p_leader_replica).

%% Public API
-export([
    lead/1, lead/2,
    resign/1,
    leader/1,
    is_leader/1,
    fence/1,
    candidates/1
]).

%% Internal API (used by the replica callbacks below)
-export([start_link/0]).
-export([
    merge_remote/1,
    merge_fence/2,
    apply_full_sync/2,
    remove_node/1,
    get_state/0,
    high_water/1
]).

%% barrel_p2p_replica callbacks
-export([
    replica_merge_delta/2,
    replica_merge_custom/2,
    replica_apply_full_sync/2,
    replica_full_sync_snapshot/1,
    replica_remove_node/2
]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-define(SERVER, ?MODULE).

-type name() :: term().
-type fence() :: non_neg_integer().

-export_type([name/0, fence/0]).

-record(state, {
    %% This node's campaigns: Name -> {CandidatePid, MonitorRef, Priority}
    local = #{} :: #{name() => {pid(), reference(), integer()}},
    %% Replicated candidate set: {Name, node()} -> {Pid, Priority}
    cands = barrel_p2p_ormap:new() :: barrel_p2p_ormap:ormap(),
    %% Last-computed leader identity per *local* Name (transition detection)
    leaders = #{} :: #{name() => {node(), pid()}},
    %% Replicated per-name high-water mark (greatest term token observed)
    fences = #{} :: #{name() => barrel_p2p_hlc:timestamp()},
    %% Token self minted for its current term (only while self leads)
    my_fence = #{} :: #{name() => barrel_p2p_hlc:timestamp()}
}).

%%====================================================================
%% Public API
%%====================================================================

-spec lead(name()) -> {ok, {leader, fence()}} | {ok, follower} | {error, term()}.
lead(Name) ->
    lead(Name, #{}).

-spec lead(name(), map()) ->
    {ok, {leader, fence()}} | {ok, follower} | {error, term()}.
lead(Name, Opts) when is_map(Opts) ->
    Priority = maps:get(priority, Opts, 0),
    gen_server:call(?SERVER, {lead, Name, Priority}).

-spec resign(name()) -> ok.
resign(Name) ->
    gen_server:call(?SERVER, {resign, Name}).

-spec leader(name()) -> {ok, node(), pid()} | {error, no_leader}.
leader(Name) ->
    gen_server:call(?SERVER, {leader, Name}).

-spec is_leader(name()) -> boolean().
is_leader(Name) ->
    gen_server:call(?SERVER, {is_leader, Name}).

-spec fence(name()) -> {ok, fence()} | {error, not_leader}.
fence(Name) ->
    gen_server:call(?SERVER, {fence, Name}).

-spec candidates(name()) -> [node()].
candidates(Name) ->
    gen_server:call(?SERVER, {candidates, Name}).

%%====================================================================
%% Internal API
%%====================================================================

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

-spec merge_remote(barrel_p2p_ormap:ormap()) -> ok.
merge_remote(Delta) ->
    gen_server:cast(?SERVER, {merge_remote, Delta}).

-spec merge_fence(name(), barrel_p2p_hlc:timestamp()) -> ok.
merge_fence(Name, Fence) ->
    gen_server:cast(?SERVER, {merge_fence, Name, Fence}).

-spec apply_full_sync(
    barrel_p2p_ormap:ormap(),
    #{name() => barrel_p2p_hlc:timestamp()}
) -> ok.
apply_full_sync(Cands, Fences) ->
    gen_server:cast(?SERVER, {apply_full_sync, Cands, Fences}).

-spec remove_node(node()) -> ok.
remove_node(Node) ->
    gen_server:cast(?SERVER, {remove_node, Node}).

%% Returns {Cands, Fences} for a peer's full sync.
-spec get_state() -> {barrel_p2p_ormap:ormap(), #{name() => barrel_p2p_hlc:timestamp()}}.
get_state() ->
    gen_server:call(?SERVER, get_state).

%% Packed high-water token for a name, or undefined. For ops/debug.
-spec high_water(name()) -> fence() | undefined.
high_water(Name) ->
    gen_server:call(?SERVER, {high_water, Name}).

%%====================================================================
%% barrel_p2p_replica callbacks
%%====================================================================

replica_merge_delta(_Inst, Delta) ->
    merge_remote(Delta).

replica_merge_custom(_Inst, {Name, Fence}) ->
    merge_fence(Name, Fence);
replica_merge_custom(_Inst, _Other) ->
    %% Malformed custom payload (not a {Name, Fence} tuple): drop it.
    ok.

replica_apply_full_sync(_Inst, {Cands, Fences}) ->
    apply_full_sync(Cands, Fences).

replica_full_sync_snapshot(_Inst) ->
    {Cands, Fences} = get_state(),
    case barrel_p2p_ormap:is_empty(Cands) andalso map_size(Fences) =:= 0 of
        true -> empty;
        false -> {sync, {Cands, Fences}}
    end.

replica_remove_node(_Inst, Node) ->
    remove_node(Node).

%%====================================================================
%% gen_server callbacks
%%====================================================================

init([]) ->
    {ok, #state{}}.

handle_call({lead, Name, Priority}, {Pid, _Tag}, State) ->
    case maps:is_key(Name, State#state.local) of
        true ->
            {reply, {error, already_candidate}, State};
        false ->
            Ref = monitor(process, Pid),
            Key = {Name, node()},
            Val = {Pid, Priority},
            Cands = barrel_p2p_ormap:add(Key, Val, State#state.cands),
            Local = maps:put(Name, {Pid, Ref, Priority}, State#state.local),
            barrel_p2p_replica:broadcast_update(?REPLICA, {add, Key, Val}),
            State1 = State#state{cands = Cands, local = Local},
            %% We just added ourselves, so leader_of/2 is non-empty.
            {LNode, LPid} = leader_of(Name, Cands),
            case LNode =:= node() of
                true ->
                    {Fence, State2} = become_leader(Name, State1),
                    Leaders = maps:put(Name, {LNode, LPid}, State2#state.leaders),
                    {reply, {ok, {leader, Fence}}, State2#state{leaders = Leaders}};
                false ->
                    Leaders = maps:put(Name, {LNode, LPid}, State1#state.leaders),
                    {reply, {ok, follower}, State1#state{leaders = Leaders}}
            end
    end;
handle_call({resign, Name}, _From, State) ->
    {reply, ok, drop_local(Name, demonitor, State)};
handle_call({leader, Name}, _From, State) ->
    case leader_of(Name, State#state.cands) of
        none -> {reply, {error, no_leader}, State};
        {Node, Pid} -> {reply, {ok, Node, Pid}, State}
    end;
handle_call({is_leader, Name}, _From, State) ->
    Reply =
        case leader_of(Name, State#state.cands) of
            {Node, _Pid} -> Node =:= node();
            none -> false
        end,
    {reply, Reply, State};
handle_call({fence, Name}, _From, State) ->
    Reply =
        case is_self_leader(maps:get(Name, State#state.leaders, none)) of
            true ->
                case maps:get(Name, State#state.my_fence, undefined) of
                    undefined -> {error, not_leader};
                    F -> {ok, barrel_p2p_hlc:pack(F)}
                end;
            false ->
                {error, not_leader}
        end,
    {reply, Reply, State};
handle_call({candidates, Name}, _From, State) ->
    Nodes = [
        Node
     || {{N, Node}, _V} <- barrel_p2p_ormap:to_list(State#state.cands),
        N =:= Name
    ],
    {reply, lists:usort(Nodes), State};
handle_call({high_water, Name}, _From, State) ->
    Reply =
        case maps:get(Name, State#state.fences, undefined) of
            undefined -> undefined;
            F -> barrel_p2p_hlc:pack(F)
        end,
    {reply, Reply, State};
handle_call(get_state, _From, State) ->
    {reply, {State#state.cands, State#state.fences}, State};
handle_call(_Request, _From, State) ->
    {reply, {error, unknown_request}, State}.

handle_cast({merge_remote, Delta}, State) ->
    %% Validate the wrapper + candidate leaf before absorb/merge so a
    %% malformed peer delta cannot crash this gen_server or barrel_p2p_hlc.
    {Cands, _Acc} = barrel_p2p_crdt_wire:ingest(
        State#state.cands, Delta, fun valid_candidate/1
    ),
    {noreply, recompute_local(State#state{cands = Cands})};
handle_cast({merge_fence, Name, F}, State) ->
    %% A fence is an HLC timestamp; reject anything else before it reaches
    %% barrel_p2p_hlc:update/compare.
    case valid_fence(F) of
        true ->
            barrel_p2p_hlc:update(F),
            Fences = raise_fence(State#state.fences, Name, F),
            {noreply, State#state{fences = Fences}};
        false ->
            {noreply, State}
    end;
handle_cast({apply_full_sync, RemoteCands, RemoteFences}, State) ->
    {Cands, _Acc} = barrel_p2p_crdt_wire:ingest(
        State#state.cands, RemoteCands, fun valid_candidate/1
    ),
    Fences = merge_fences(State#state.fences, valid_fences(RemoteFences)),
    {noreply, recompute_local(State#state{cands = Cands, fences = Fences})};
handle_cast({remove_node, Node}, State) ->
    Cands = maps:filter(
        fun({_Name, N}, _Entry) -> N =/= Node end,
        State#state.cands
    ),
    {noreply, recompute_local(State#state{cands = Cands})};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) ->
    case find_name_by_ref(Ref, State#state.local) of
        {ok, Name} ->
            %% Monitor already cleared by the process exit; just drop.
            {noreply, drop_local(Name, no_demonitor, State)};
        error ->
            {noreply, State}
    end;
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

%%====================================================================
%% Internal Functions
%%====================================================================

%% Remove this node's candidacy for Name: tombstone the OR-Map key,
%% drop bookkeeping, broadcast the removal. `Demon' controls whether we
%% demonitor (resign) or leave it (the monitored process already died).
drop_local(Name, Demon, State) ->
    case maps:take(Name, State#state.local) of
        {{_Pid, Ref, _Prio}, Local} ->
            case Demon of
                demonitor -> demonitor(Ref, [flush]);
                no_demonitor -> ok
            end,
            Key = {Name, node()},
            Cands = barrel_p2p_ormap:remove(Key, State#state.cands),
            barrel_p2p_replica:broadcast_update(?REPLICA, {remove, Key}),
            State#state{
                local = Local,
                cands = Cands,
                leaders = maps:remove(Name, State#state.leaders),
                my_fence = maps:remove(Name, State#state.my_fence)
            };
        error ->
            State
    end.

%% Mint a fencing token for a term that self is starting: advance the
%% local HLC past the replicated high-water, take a fresh timestamp
%% (now strictly greater), raise + gossip the high-water, and record it
%% as our current term token. Returns the packed integer token.
become_leader(Name, State) ->
    HighWater = maps:get(Name, State#state.fences, undefined),
    case HighWater of
        undefined -> ok;
        _ -> barrel_p2p_hlc:update(HighWater)
    end,
    F = barrel_p2p_hlc:now(),
    Fences = raise_fence(State#state.fences, Name, F),
    MyFence = maps:put(Name, F, State#state.my_fence),
    barrel_p2p_replica:broadcast_custom(?REPLICA, {Name, F}),
    {barrel_p2p_hlc:pack(F), State#state{fences = Fences, my_fence = MyFence}}.

%% Recompute leadership for every local campaign and fire transitions.
recompute_local(State) ->
    lists:foldl(fun recompute_name/2, State, maps:keys(State#state.local)).

recompute_name(Name, State) ->
    New = leader_of(Name, State#state.cands),
    Old = maps:get(Name, State#state.leaders, none),
    case New =:= Old of
        true -> State;
        false -> apply_transition(Name, Old, New, State)
    end.

apply_transition(Name, Old, New, State) ->
    WasSelf = is_self_leader(Old),
    IsSelf = is_self_leader(New),
    State1 =
        if
            IsSelf andalso not WasSelf ->
                {Fence, S} = become_leader(Name, State),
                notify_local(Name, {elected, Fence}, S),
                S;
            WasSelf andalso not IsSelf ->
                S = State#state{
                    my_fence = maps:remove(Name, State#state.my_fence)
                },
                notify_local(Name, revoked, S),
                S;
            true ->
                State
        end,
    Leaders =
        case New of
            none -> maps:remove(Name, State1#state.leaders);
            _ -> maps:put(Name, New, State1#state.leaders)
        end,
    State1#state{leaders = Leaders}.

notify_local(Name, Msg, State) ->
    case maps:get(Name, State#state.local, undefined) of
        {Pid, _Ref, _Prio} -> Pid ! {barrel_p2p_leader, Name, Msg};
        undefined -> ok
    end.

%% Winner among live candidates for Name: highest priority, ties to the
%% lowest node atom. `none' when there are no candidates.
leader_of(Name, ORMap) ->
    Cands = [
        {Node, Pid, Prio}
     || {{N, Node}, {Pid, Prio}} <- barrel_p2p_ormap:to_list(ORMap),
        N =:= Name
    ],
    case Cands of
        [] ->
            none;
        _ ->
            [{BNode, BPid, _} | _] = lists:sort(fun cmp_cand/2, Cands),
            {BNode, BPid}
    end.

cmp_cand({Na, _Pa, Pria}, {Nb, _Pb, Prib}) ->
    {-Pria, Na} =< {-Prib, Nb}.

is_self_leader({Node, _Pid}) -> Node =:= node();
is_self_leader(none) -> false.

%% Keep the HLC-greater token for a name.
raise_fence(Fences, Name, F) ->
    case maps:get(Name, Fences, undefined) of
        undefined ->
            maps:put(Name, F, Fences);
        Old ->
            case barrel_p2p_hlc:compare(F, Old) of
                gt -> maps:put(Name, F, Fences);
                _ -> Fences
            end
    end.

merge_fences(F1, F2) ->
    maps:fold(fun(Name, F, Acc) -> raise_fence(Acc, Name, F) end, F1, F2).

%% Gossip-ingest validators (used to reject malformed peer payloads before
%% they reach the OR-Map merge or the shared barrel_p2p_hlc server).

%% A candidate OR-Map leaf value: {Pid, Priority}.
valid_candidate({Pid, Prio}) when is_pid(Pid), is_integer(Prio) -> true;
valid_candidate(_) -> false.

%% A fence is an HLC timestamp.
valid_fence(#timestamp{}) -> true;
valid_fence(_) -> false.

%% Keep only well-formed fences from a peer's fence map (non-map -> empty).
valid_fences(Fences) when is_map(Fences) ->
    maps:filter(fun(_Name, F) -> valid_fence(F) end, Fences);
valid_fences(_) ->
    #{}.

find_name_by_ref(Ref, Local) ->
    case [N || {N, {_P, R, _Pr}} <- maps:to_list(Local), R =:= Ref] of
        [Name | _] -> {ok, Name};
        [] -> error
    end.