%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
-module(barrel_p2p_plumtree).
-behaviour(gen_server).
%% Plumtree: Epidemic Broadcast Trees
%% Efficient reliable broadcast over HyParView overlay
%%
%% Key concepts:
%% - Eager peers: Push messages immediately (fast path)
%% - Lazy peers: Send IHAVEs, request via GRAFT (recovery path)
%% - Self-healing: GRAFT repairs missing messages
%% - O(n) messages vs O(n²) for flooding
-include("barrel_p2p.hrl").
-include_lib("hlc/include/hlc.hrl").
%% API
-export([start_link/0]).
-export([broadcast/2, broadcast/3]).
-export([subscribe/1, unsubscribe/1]).
%% Internal API (used by sync)
-export([get_stats/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% Locally registered name of this server; peers address it as
%% {?SERVER, Node} when sending Plumtree messages.
-define(SERVER, ?MODULE).
%% Outer tag of every peer-to-peer Plumtree message ({?PLUMTREE_TAG, Body}),
%% matched in handle_info/2.
-define(PLUMTREE_TAG, '$barrel_p2p_plumtree').
%% Time to wait before requesting via GRAFT
%% (milliseconds: the erlang:send_after/3 delay started when an IHAVE
%% announces a message we have not received yet).
-define(IHAVE_TIMEOUT, 1000).
%% Keep messages for 5 minutes
%% (subtracted from barrel_p2p_hlc:wall_time/1 in the cleanup handler;
%% NOTE(review): "5 minutes" assumes HLC wall time is in milliseconds).
-define(MESSAGE_TTL, 300000).
%% Cleanup old messages every minute
%% (milliseconds: erlang:send_after/3 delay of the periodic cleanup).
-define(CLEANUP_INTERVAL, 60000).
%% Server state.
-record(state, {
%% Peer classification
%% Push immediately: these peers receive the full GOSSIP payload
eager_peers = [] :: [node()],
%% Send IHAVEs only: these peers are told the MsgId and may GRAFT it
lazy_peers = [] :: [node()],
%% Message tracking (uses HLC timestamps for clock-skew-tolerant TTL)
%% MsgId -> {{Tag, Payload}, HLC}. Serves both as the duplicate filter
%% and as the source for answering GRAFT requests until cleanup evicts it.
received = #{} :: #{binary() => {term(), barrel_p2p_hlc:timestamp()}},
%% MsgId -> {Sender, TimerRef}
%% Sender is the first node whose IHAVE for MsgId arrived while we lacked
%% it; TimerRef is the pending graft_timeout timer.
pending_ihaves = #{} :: #{binary() => {node(), reference()}},
%% Subscribers for delivered messages (Pid -> monitor reference)
subscribers = #{} :: #{pid() => reference()},
%% Keeps our own hyparview-events subscription alive across a restart
watch = #{} :: barrel_p2p_source_monitor:watch(),
%% Stats
%% (message counters reported by get_stats/0)
gossip_sent = 0 :: non_neg_integer(),
gossip_received = 0 :: non_neg_integer(),
ihave_sent = 0 :: non_neg_integer(),
graft_sent = 0 :: non_neg_integer(),
prune_sent = 0 :: non_neg_integer()
}).
%%====================================================================
%% API
%%====================================================================
%% Start the Plumtree server linked to the caller and register it locally
%% as ?SERVER, the name remote peers send to as {?SERVER, Node}.
start_link() ->
    Registration = {local, ?SERVER},
    gen_server:start_link(Registration, ?MODULE, [], []).
%% Broadcast Payload under Tag to all nodes with a freshly generated
%% message id. The request is cast to the local server, so this returns
%% ok immediately; local subscribers receive the message as well.
-spec broadcast(term(), term()) -> ok.
broadcast(Tag, Payload) ->
    broadcast(Tag, Payload, generate_msg_id()).
%% Broadcast Payload under Tag using a caller-chosen MsgId. Asynchronous
%% (gen_server cast). The server ignores an id that is already in its
%% message cache, so repeating a broadcast with the same MsgId has no
%% further effect while the id is cached.
-spec broadcast(term(), term(), binary()) -> ok.
broadcast(Tag, Payload, MsgId) ->
    Request = {broadcast, Tag, Payload, MsgId},
    gen_server:cast(?SERVER, Request).
%% Register Pid to receive every delivered broadcast as
%% {plumtree_broadcast, {Tag, Payload}}. Subscribing twice is a no-op;
%% the server monitors Pid and forgets it when it exits.
-spec subscribe(pid()) -> ok.
subscribe(Pid) ->
    Request = {subscribe, Pid},
    gen_server:call(?SERVER, Request).
%% Stop delivering broadcasts to Pid and drop the server's monitor on it.
%% Unsubscribing a pid that is not subscribed is a no-op.
-spec unsubscribe(pid()) -> ok.
unsubscribe(Pid) ->
    Request = {unsubscribe, Pid},
    gen_server:call(?SERVER, Request).
%% Fetch a snapshot of the server's statistics: eager/lazy peer counts,
%% cached-message and pending-IHAVE counts, and the message counters.
-spec get_stats() -> map().
get_stats() ->
    Request = get_stats,
    gen_server:call(?SERVER, Request).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%% gen_server init.
%%
%% The HyParView event watch is started before the active view is read.
%% peer_up/peer_down events keep the eager/lazy lists correct afterwards,
%% and the watch keeps that subscription alive if the events process
%% restarts. Every node in the current active view starts as an eager
%% peer, and the first periodic cache cleanup is scheduled.
init([]) ->
    Sources = [barrel_p2p_hyparview_events],
    Watch = barrel_p2p_source_monitor:start(Sources),
    InitialEager = barrel_p2p:active_view(),
    _CleanupTimer = erlang:send_after(?CLEANUP_INTERVAL, self(), cleanup),
    {ok, #state{eager_peers = InitialEager, watch = Watch}}.
%% Synchronous requests.
%%
%% {subscribe, Pid}   - monitor Pid and record it as a subscriber; a
%%                      repeated subscribe keeps the existing monitor.
%% {unsubscribe, Pid} - forget Pid and demonitor it (flushing any queued
%%                      'DOWN'); unknown pids are ignored.
%% get_stats          - map of peer/cache sizes and message counters.
%% Any other request is answered with ok.
handle_call({subscribe, Pid}, _From, #state{subscribers = Subs} = State) ->
    case Subs of
        #{Pid := _ExistingRef} ->
            {reply, ok, State};
        #{} ->
            MRef = monitor(process, Pid),
            {reply, ok, State#state{subscribers = Subs#{Pid => MRef}}}
    end;
handle_call({unsubscribe, Pid}, _From, #state{subscribers = Subs} = State) ->
    Remaining =
        case maps:take(Pid, Subs) of
            {MRef, Rest} ->
                demonitor(MRef, [flush]),
                Rest;
            error ->
                Subs
        end,
    {reply, ok, State#state{subscribers = Remaining}};
handle_call(get_stats, _From, State) ->
    #state{
        eager_peers = Eager,
        lazy_peers = Lazy,
        received = Cache,
        pending_ihaves = Pending,
        gossip_sent = GossipSent,
        gossip_received = GossipReceived,
        ihave_sent = IHaveSent,
        graft_sent = GraftSent,
        prune_sent = PruneSent
    } = State,
    Stats = #{
        eager_peers => length(Eager),
        lazy_peers => length(Lazy),
        cached_messages => map_size(Cache),
        pending_ihaves => map_size(Pending),
        gossip_sent => GossipSent,
        gossip_received => GossipReceived,
        ihave_sent => IHaveSent,
        graft_sent => GraftSent,
        prune_sent => PruneSent
    },
    {reply, Stats, State};
handle_call(_Unknown, _From, State) ->
    {reply, ok, State}.
%% Asynchronous requests.
%%
%% {broadcast, Tag, Payload, MsgId} originates a message from this node.
%% If MsgId is already cached it is ignored. Otherwise the message is
%% stamped with the current HLC, delivered to local subscribers, pushed
%% as GOSSIP to every eager peer, announced via IHAVE to every lazy peer,
%% and cached. Other casts are ignored.
handle_cast({broadcast, _Tag, _Payload, MsgId}, #state{received = Cache} = State)
        when is_map_key(MsgId, Cache) ->
    {noreply, State};
handle_cast({broadcast, Tag, Payload, MsgId}, #state{received = Cache} = State) ->
    Message = {Tag, Payload},
    Stamped = {Message, barrel_p2p_hlc:now()},
    deliver_to_subscribers(Message, State#state.subscribers),
    AfterPush = send_gossip(MsgId, Tag, Payload, node(), State#state.eager_peers, State),
    AfterAnnounce = send_ihaves(MsgId, State#state.lazy_peers, AfterPush),
    {noreply, AfterAnnounce#state{received = Cache#{MsgId => Stamped}}};
handle_cast(_Other, State) ->
    {noreply, State}.
%% Receive GOSSIP message (full payload pushed by Sender).
%%
%% New MsgId: cache it with an HLC timestamp, cancel any pending IHAVE
%% timer for it, deliver it locally, forward it to every peer except
%% Sender (GOSSIP to eager peers, IHAVE to lazy peers) and promote Sender
%% to eager.
%%
%% Duplicate MsgId: the link to Sender is redundant. As in Plumtree, the
%% receiver demotes Sender to lazy locally *and* sends PRUNE so that
%% Sender demotes us too. Sending only the PRUNE left Sender eager on our
%% side, so we kept pushing full payloads over a link already known to be
%% redundant.
handle_info({?PLUMTREE_TAG, {gossip, MsgId, Tag, Payload, Sender}}, State) ->
    barrel_p2p_metrics:gossip_received(Sender),
    State2 = State#state{gossip_received = State#state.gossip_received + 1},
    case maps:is_key(MsgId, State2#state.received) of
        true ->
            send_prune(Sender, State2),
            State3 = move_to_lazy(Sender, State2),
            {noreply, State3#state{prune_sent = State3#state.prune_sent + 1}};
        false ->
            HLC = barrel_p2p_hlc:now(),
            Received = maps:put(MsgId, {{Tag, Payload}, HLC}, State2#state.received),
            %% If the graft_timeout fired just before this cancel, it is
            %% harmless: its handler will find MsgId in received.
            State3 = cancel_pending_ihave(MsgId, State2#state{received = Received}),
            deliver_to_subscribers({Tag, Payload}, State3#state.subscribers),
            %% Never echo the message back to the node it came from.
            EagerPeers = State3#state.eager_peers -- [Sender],
            LazyPeers = State3#state.lazy_peers -- [Sender],
            State4 = send_gossip(MsgId, Tag, Payload, node(), EagerPeers, State3),
            State5 = send_ihaves(MsgId, LazyPeers, State4),
            %% Sender delivered the payload first: keep it on the eager path.
            {noreply, ensure_eager(Sender, State5)}
    end;
%% Receive IHAVE notification: Sender announces that it has MsgId. If we
%% neither have it nor are already waiting for it, start a GRAFT timer.
%% This gives the eager path ?IHAVE_TIMEOUT ms to deliver the payload
%% first.
handle_info({?PLUMTREE_TAG, {ihave, MsgId, Sender}}, State) ->
    Received = State#state.received,
    Pending = State#state.pending_ihaves,
    case maps:is_key(MsgId, Received) orelse maps:is_key(MsgId, Pending) of
        true ->
            {noreply, State};
        false ->
            TimerRef = erlang:send_after(
                ?IHAVE_TIMEOUT, self(), {graft_timeout, MsgId, Sender}
            ),
            {noreply, State#state{
                pending_ihaves = maps:put(MsgId, {Sender, TimerRef}, Pending)
            }}
    end;
%% GRAFT timeout: the pending entry is always dropped. If the message
%% still has not arrived, ask the announcing node to send it.
%% NOTE(review): only the first announcer is recorded, and a lost GRAFT is
%% not retried; the Plumtree paper keeps a queue of announcers instead.
handle_info({graft_timeout, MsgId, Sender}, State) ->
    Pending = maps:remove(MsgId, State#state.pending_ihaves),
    case maps:is_key(MsgId, State#state.received) of
        true ->
            %% Already received via another path.
            {noreply, State#state{pending_ihaves = Pending}};
        false ->
            send_graft(MsgId, Sender),
            {noreply, State#state{
                pending_ihaves = Pending,
                graft_sent = State#state.graft_sent + 1
            }}
    end;
%% Receive GRAFT request: Sender is missing MsgId. Promote Sender to eager
%% and, if MsgId is still cached, push the payload back to it.
handle_info({?PLUMTREE_TAG, {graft, MsgId, Sender}}, State) ->
    State2 = ensure_eager(Sender, State),
    case maps:get(MsgId, State2#state.received, undefined) of
        {{Tag, Payload}, _Time} ->
            %% Keep the state returned by send_gossip/6 so that the reply is
            %% counted in gossip_sent, just as it is in barrel_p2p_metrics.
            State3 = send_gossip(MsgId, Tag, Payload, node(), [Sender], State2),
            {noreply, State3};
        undefined ->
            %% Not cached (never received or already evicted by cleanup).
            {noreply, State2}
    end;
%% Receive PRUNE request: Sender found our push redundant, so demote it
%% from eager to lazy.
handle_info({?PLUMTREE_TAG, {prune, Sender}}, State) ->
    {noreply, move_to_lazy(Sender, State)};
%% HyParView events: a new active peer starts out eager.
handle_info({barrel_p2p_event, {peer_up, Node}}, State) ->
    {noreply, ensure_eager(Node, State)};
handle_info({barrel_p2p_event, {peer_down, Node, _Reason}}, State) ->
    %% Peer left - remove from all lists. barrel_p2p_hyparview emits the
    %% 3-tuple form unconditionally; matching only on the 2-tuple let
    %% the broadcast tree keep dead peers in eager/lazy.
    EagerPeers = State#state.eager_peers -- [Node],
    LazyPeers = State#state.lazy_peers -- [Node],
    {noreply, State#state{eager_peers = EagerPeers, lazy_peers = LazyPeers}};
%% Re-subscribe if a watched source (hyparview events) restarted.
handle_info({barrel_p2p_source_monitor, retry, Source}, State) ->
    {noreply, State#state{
        watch = barrel_p2p_source_monitor:retry(Source, State#state.watch)
    }};
%% A watched source or a subscriber went down. Watched sources are
%% handled first. Otherwise the subscriber is dropped, but only if Ref is
%% the monitor we hold for it.
handle_info({'DOWN', Ref, process, Pid, _Reason}, State) ->
    case barrel_p2p_source_monitor:down(Ref, State#state.watch) of
        {down, _Source, Watch} ->
            {noreply, State#state{watch = Watch}};
        ignore ->
            case maps:get(Pid, State#state.subscribers, undefined) of
                Ref ->
                    Subs = maps:remove(Pid, State#state.subscribers),
                    {noreply, State#state{subscribers = Subs}};
                _ ->
                    {noreply, State}
            end
    end;
%% Periodic cleanup: evict cached messages whose HLC wall time is not
%% newer than now - ?MESSAGE_TTL, then reschedule.
handle_info(cleanup, State) ->
    NowWall = barrel_p2p_hlc:wall_time(barrel_p2p_hlc:now()),
    Cutoff = NowWall - ?MESSAGE_TTL,
    Received = maps:filter(
        fun(_MsgId, {_Payload, HLC}) ->
            barrel_p2p_hlc:wall_time(HLC) > Cutoff
        end,
        State#state.received
    ),
    erlang:send_after(?CLEANUP_INTERVAL, self(), cleanup),
    {noreply, State#state{received = Received}};
handle_info(_Info, State) ->
    {noreply, State}.
%% No explicit cleanup is performed on shutdown.
terminate(_Why, _FinalState) ->
    ok.
%%====================================================================
%% Internal Functions
%%====================================================================
%% Produce a fresh 32-byte message id. It is the SHA-256 of
%% {node(), HLC wall time, HLC logical counter, random integer in
%% 1..16#FFFFFFFF}. Because the id is a hash, it carries no ordering
%% information. The HLC reading and the node name make collisions
%% between ids unlikely, and the random part separates ids generated
%% with identical HLC readings.
generate_msg_id() ->
    Stamp = barrel_p2p_hlc:now(),
    WallPart = barrel_p2p_hlc:wall_time(Stamp),
    LogicalPart = barrel_p2p_hlc:logical(Stamp),
    Nonce = rand:uniform(16#FFFFFFFF),
    Seed = term_to_binary({node(), WallPart, LogicalPart, Nonce}),
    crypto:hash(sha256, Seed).
%% Push a full GOSSIP message to the ?SERVER process on every node in
%% Peers. Sends use the nosuspend option: when the distribution channel
%% would block, the message is dropped rather than stalling this server,
%% and it is still counted. Reports the count to barrel_p2p_metrics and
%% returns State with gossip_sent increased by it.
send_gossip(MsgId, Tag, Payload, Origin, Peers, State) ->
    Gossip = {?PLUMTREE_TAG, {gossip, MsgId, Tag, Payload, Origin}},
    Count = lists:foldl(
        fun(Peer, Acc) ->
            erlang:send({?SERVER, Peer}, Gossip, [nosuspend]),
            Acc + 1
        end,
        0,
        Peers
    ),
    barrel_p2p_metrics:gossip_sent(Count),
    State#state{gossip_sent = State#state.gossip_sent + Count}.
%% Announce MsgId with an IHAVE, naming this node as the holder, to the
%% ?SERVER process on every node in Peers (nosuspend, best-effort).
%% Reports the count to barrel_p2p_metrics and returns State with
%% ihave_sent increased by it.
send_ihaves(MsgId, Peers, State) ->
    IHave = {?PLUMTREE_TAG, {ihave, MsgId, node()}},
    _ = [erlang:send({?SERVER, Peer}, IHave, [nosuspend]) || Peer <- Peers],
    Announced = length(Peers),
    barrel_p2p_metrics:ihave_sent(Announced),
    State#state{ihave_sent = State#state.ihave_sent + Announced}.
%% Ask Peer to resend MsgId. A peer running this module also promotes us
%% to eager on receipt. The send is best-effort (nosuspend); the result
%% is whatever barrel_p2p_metrics:graft_sent/1 returns.
send_graft(MsgId, Peer) ->
    Dest = {?SERVER, Peer},
    erlang:send(Dest, {?PLUMTREE_TAG, {graft, MsgId, node()}}, [nosuspend]),
    barrel_p2p_metrics:graft_sent(Peer).
%% Tell Peer that its eager push to us was redundant, so it should demote
%% us to lazy. Best-effort (nosuspend). The state argument is unused.
send_prune(Peer, _State) ->
    Dest = {?SERVER, Peer},
    erlang:send(Dest, {?PLUMTREE_TAG, {prune, node()}}, [nosuspend]),
    barrel_p2p_metrics:prune_sent(Peer).
%% Make Node an eager peer and remove it from the lazy list. This is a
%% no-op for the local node or for a node that is already eager.
ensure_eager(Node, State) when Node =:= node() ->
    State;
ensure_eager(Node, #state{eager_peers = Eager, lazy_peers = Lazy} = State) ->
    case lists:member(Node, Eager) of
        false ->
            State#state{
                eager_peers = [Node | Eager],
                lazy_peers = lists:delete(Node, Lazy)
            };
        true ->
            State
    end.
%% Demote Node from eager to lazy. The local node, and any node that is
%% not currently eager, are left untouched.
move_to_lazy(Node, State) when Node =:= node() ->
    State;
move_to_lazy(Node, #state{eager_peers = Eager, lazy_peers = Lazy} = State) ->
    case lists:member(Node, Eager) of
        false ->
            State;
        true ->
            State#state{
                eager_peers = lists:delete(Node, Eager),
                lazy_peers = [Node | Lazy]
            }
    end.
%% Forget the pending IHAVE for MsgId, if there is one, and cancel its
%% GRAFT timer. erlang:cancel_timer/1 does not remove a timeout message
%% that is already queued. Callers store MsgId in received first, so a
%% late graft_timeout finds the message and sends no GRAFT.
cancel_pending_ihave(MsgId, #state{pending_ihaves = Pending} = State) ->
    case Pending of
        #{MsgId := Entry} ->
            {_Sender, TRef} = Entry,
            erlang:cancel_timer(TRef),
            State#state{pending_ihaves = maps:remove(MsgId, Pending)};
        #{} ->
            State
    end.
%% Send {plumtree_broadcast, Message} to every pid that is a key of
%% Subscribers (the map values, monitor refs, are not used). Returns ok.
deliver_to_subscribers(Message, Subscribers) ->
    Notification = {plumtree_broadcast, Message},
    _ = [Pid ! Notification || Pid <- maps:keys(Subscribers)],
    ok.