%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
-module(barrel_p2p_hyparview).
-behaviour(gen_server).
-include("barrel_p2p.hrl").
%% API
-export([start_link/1, join/1, leave/0]).
-export([active_view/0, passive_view/0]).
-export([peer_connected/2, peer_disconnected/2, peer_failed/2]).
-export([initiate_shuffle/2]).
%% Churn handling API
-export([get_churn_stats/0, cleanup_passive_view/0]).
%% Test-support: hold a partition by refusing overlay links to given nodes.
-export([block_peers/1, unblock_peers/0]).
%% Protocol handlers (called by barrel_p2p_protocol)
-export([handle_msg/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% Test exports - internal functions exposed for unit testing
-ifdef(TEST).
-export([
record_churn_event/2,
maybe_reset_churn_window/2,
record_peer_failure/2,
do_cleanup_passive_view/1,
find_eligible_passive_peer/2,
is_backoff_expired/2,
last_seen_cmp/2,
make_peer/1,
add_to_passive/3
]).
-endif.
-define(SERVER, ?MODULE).
%%====================================================================
%% API
%%====================================================================
%% @doc Start the HyParView server linked to the caller and registered
%% locally under the module name. `Config' is passed to `init/1' as is.
start_link(Config) ->
    Name = {local, ?SERVER},
    gen_server:start_link(Name, ?MODULE, Config, []).
%% @doc Test-support: add `Nodes' to the blocked set. The server drops them
%% from the active and passive views and refuses to (re)admit them until
%% `unblock_peers/0' is called. This holds a partition that
%% `disconnect_node' alone cannot, because HyParView would otherwise re-dial
%% through passive promotion. Inert in normal operation: nothing calls it
%% and the blocked set defaults empty.
-spec block_peers([node()]) -> ok.
block_peers(Nodes) ->
    Request = {block_peers, Nodes},
    gen_server:call(?SERVER, Request).
%% @doc Test-support: empty the blocked set again (heal the partition).
-spec unblock_peers() -> ok.
unblock_peers() ->
    Request = unblock_peers,
    gen_server:call(?SERVER, Request).
%% @doc Ask the local server to join the overlay through `ContactNode'
%% (synchronous call). Joining through the local node is answered with
%% `{error, cannot_join_self}'.
-spec join(node()) -> ok | {error, term()}.
join(ContactNode) ->
    Request = {join, ContactNode},
    gen_server:call(?SERVER, Request).
%% @doc Leave the overlay: the server sends a disconnect to every active
%% peer and clears both views (synchronous call).
-spec leave() -> ok.
leave() ->
    Request = leave,
    gen_server:call(?SERVER, Request).
%% @doc Return the node names currently held in the active view.
-spec active_view() -> [node()].
active_view() ->
    Request = active_view,
    gen_server:call(?SERVER, Request).
%% @doc Return the node names currently held in the passive view.
-spec passive_view() -> [node()].
passive_view() ->
    Request = passive_view,
    gen_server:call(?SERVER, Request).
%% Bridge callback: an Erlang connection to `Node' has been established.
%% Delivered asynchronously; the handle is forwarded but not inspected here.
-spec peer_connected(node(), term()) -> ok.
peer_connected(Node, DHandle) ->
    Event = {peer_connected, Node, DHandle},
    gen_server:cast(?SERVER, Event).
%% Bridge callback: the connection to `Node' was lost. The server treats
%% this as a graceful removal. Delivered asynchronously.
-spec peer_disconnected(node(), term()) -> ok.
peer_disconnected(Node, Reason) ->
    Event = {peer_disconnected, Node, Reason},
    gen_server:cast(?SERVER, Event).
%% Bridge callback: the connection (or connection attempt) to `Node'
%% failed. The server treats this as a failed removal. Delivered
%% asynchronously.
-spec peer_failed(node(), term()) -> ok.
peer_failed(Node, Reason) ->
    Event = {peer_failed, Node, Reason},
    gen_server:cast(?SERVER, Event).
%% Called by the shuffle timer: ask the server to start a shuffle round with
%% `Target', offering up to `ShuffleLength' peers. Ignored by the server
%% when `Target' is not in the active view.
-spec initiate_shuffle(node(), pos_integer()) -> ok.
initiate_shuffle(Target, ShuffleLength) ->
    Event = {initiate_shuffle, Target, ShuffleLength},
    gen_server:cast(?SERVER, Event).
%% Churn statistics for adaptive shuffle: join and leave counts of the
%% current churn window (the server resets the window first if it expired).
-spec get_churn_stats() -> {Joins :: non_neg_integer(), Leaves :: non_neg_integer()}.
get_churn_stats() ->
    Request = get_churn_stats,
    gen_server:call(?SERVER, Request).
%% Ask the server to prune the passive view (called by the cleanup worker).
%% Asynchronous: returns before the cleanup has run.
-spec cleanup_passive_view() -> ok.
cleanup_passive_view() ->
    Event = cleanup_passive_view,
    gen_server:cast(?SERVER, Event).
%% Entry point used by barrel_p2p_protocol when a HyParView message arrives
%% from `From'. The message is queued to the server and handled
%% asynchronously.
-spec handle_msg(hyparview_msg(), node()) -> ok.
handle_msg(Msg, From) ->
    Event = {protocol_msg, Msg, From},
    gen_server:cast(?SERVER, Event).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%% Build the initial view state from the `Config' map. Every option is
%% optional; the defaults are the literals below. The local node is
%% described by a `#peer{}' built from the application environment.
init(Config) ->
    Opt = fun(Key, Default) -> maps:get(Key, Config, Default) end,
    LocalPeer = #peer{
        id = node(),
        address = get_self_address(),
        port = get_self_port(),
        connected = true,
        priority = high
    },
    WindowStart = erlang:monotonic_time(millisecond),
    {ok, #view_state{
        %% View sizing and random-walk lengths
        active_size = Opt(active_size, 5),
        passive_size = Opt(passive_size, 30),
        arwl = Opt(arwl, 6),
        prwl = Opt(prwl, 3),
        %% Shuffle parameters
        shuffle_length = Opt(shuffle_length, 8),
        shuffle_period = Opt(shuffle_period, 10000),
        self = LocalPeer,
        %% Churn handling
        max_fail_count = Opt(max_fail_count, 5),
        base_backoff_ms = Opt(base_backoff_ms, 1000),
        passive_max_age_ms = Opt(passive_max_age_ms, 300000),
        churn_window_ms = Opt(churn_window_ms, 30000),
        churn_window_start = WindowStart
    }}.
%% Synchronous requests: join/leave, the test-only block list, view queries
%% and churn statistics. Anything else gets `{error, unknown_request}'.
handle_call({join, ContactNode}, _From, State) when ContactNode =:= node() ->
    {reply, {error, cannot_join_self}, State};
handle_call({join, ContactNode}, _From, State) ->
    case is_blocked(ContactNode, State) of
        true ->
            %% Blocked contact: acknowledge but do nothing.
            {reply, ok, State};
        false ->
            %% Remember the join so peer_connected can send the JOIN message.
            WithJoin = insert_pending(ContactNode, join, State#view_state.pending),
            barrel_p2p_bridge:request_connect(ContactNode),
            {reply, ok, State#view_state{pending = WithJoin}}
    end;
handle_call(leave, _From, State) ->
    %% HyParView-level leave: tell every active peer we no longer
    %% participate in gossip. Dist channels stay up; barrel_p2p_dist_gc
    %% will reap them once they go idle and carry no live user streams.
    Farewell = {disconnect, State#view_state.self},
    SayGoodbye = fun(Node, _Peer) -> barrel_p2p_protocol:send(Node, Farewell) end,
    maps:foreach(SayGoodbye, State#view_state.active_view),
    barrel_p2p_hyparview_events:notify(left),
    {reply, ok, State#view_state{active_view = #{}, passive_view = #{}}};
handle_call({block_peers, Nodes}, _From, State) ->
    %% Drop the blocked nodes from both views and notify peer_down so
    %% Plumtree stops gossiping to them. The guards (is_blocked/2) keep
    %% them out until unblock_peers/0.
    Active = State#view_state.active_view,
    lists:foreach(
        fun(N) ->
            case maps:is_key(N, Active) of
                true -> barrel_p2p_hyparview_events:notify({peer_down, N, blocked});
                false -> ok
            end
        end,
        Nodes
    ),
    {reply, ok, State#view_state{
        blocked = lists:usort(Nodes ++ State#view_state.blocked),
        active_view = maps:without(Nodes, Active),
        passive_view = maps:without(Nodes, State#view_state.passive_view)
    }};
handle_call(unblock_peers, _From, State) ->
    {reply, ok, State#view_state{blocked = []}};
handle_call(active_view, _From, State) ->
    ActiveNodes = maps:keys(State#view_state.active_view),
    {reply, ActiveNodes, State};
handle_call(passive_view, _From, State) ->
    PassiveNodes = maps:keys(State#view_state.passive_view),
    {reply, PassiveNodes, State};
handle_call(get_churn_stats, _From, State) ->
    %% Roll the window over first so an expired window reports zeros.
    Current = maybe_reset_churn_window(erlang:monotonic_time(millisecond), State),
    Stats = {Current#view_state.recent_joins, Current#view_state.recent_leaves},
    {reply, Stats, Current};
handle_call(_Request, _From, State) ->
    {reply, {error, unknown_request}, State}.
%% Asynchronous events: bridge connection notifications, passive-view
%% cleanup, shuffle initiation and HyParView protocol messages. Unknown
%% casts are ignored.
handle_cast({peer_connected, Node, _DHandle}, State) ->
    case maps:take(Node, State#view_state.pending) of
        {{join, _Ref, TimerRef}, NewPending} ->
            %% Initial join - send JOIN message
            cancel_pending_timer(TimerRef),
            Peer = make_peer(Node),
            Active = maps:put(Node, Peer, State#view_state.active_view),
            barrel_p2p_protocol:send(Node, {join, State#view_state.self}),
            barrel_p2p_hyparview_events:notify({peer_up, Node}),
            {noreply, State#view_state{active_view = Active, pending = NewPending}};
        {{Kind, _Ref, TimerRef}, NewPending} when Kind =:= connect; Kind =:= neighbor ->
            %% Outbound connect we asked for: `connect' comes from a
            %% forwarded join, `neighbor' from passive promotion
            %% (maybe_promote_passive/1 calls request_connect/1 too).
            %% Without the `neighbor' case this `case' had no matching
            %% clause and the server crashed when a promoted peer
            %% connected.
            cancel_pending_timer(TimerRef),
            Peer = make_peer(Node),
            State1 = add_to_active_view(Peer, State#view_state{pending = NewPending}),
            barrel_p2p_hyparview_events:notify({peer_up, Node}),
            {noreply, State1};
        error ->
            %% Incoming connection or unknown
            case maps:is_key(Node, State#view_state.active_view) of
                true ->
                    %% Already in active view
                    {noreply, State};
                false ->
                    Peer = make_peer(Node),
                    State1 = add_to_active_view(Peer, State),
                    barrel_p2p_hyparview_events:notify({peer_up, Node}),
                    {noreply, State1}
            end
    end;
handle_cast({peer_disconnected, Node, Reason}, State) ->
    handle_peer_removal(Node, Reason, graceful, State);
handle_cast({peer_failed, Node, Reason}, State) ->
    handle_peer_removal(Node, Reason, failed, State);
handle_cast(cleanup_passive_view, State) ->
    {noreply, do_cleanup_passive_view(State)};
handle_cast({initiate_shuffle, Target, ShuffleLength}, State) ->
    %% Only shuffle with a peer that is still in the active view.
    case maps:is_key(Target, State#view_state.active_view) of
        true ->
            Peers = random_peers(State, ShuffleLength),
            TTL = State#view_state.prwl,
            Self = State#view_state.self,
            barrel_p2p_protocol:send(Target, {shuffle, TTL, Peers, Self}),
            {noreply, State};
        false ->
            {noreply, State}
    end;
handle_cast({protocol_msg, {join, Sender}, _From}, State) ->
    {noreply, handle_join(Sender, State)};
handle_cast({protocol_msg, {forward_join, NewPeer, TTL, Sender}, _From}, State) ->
    {noreply, handle_forward_join(NewPeer, TTL, Sender, State)};
handle_cast({protocol_msg, {disconnect, Sender}, _From}, State) ->
    {noreply, handle_disconnect(Sender, State)};
handle_cast({protocol_msg, {neighbor, Priority, Sender}, _From}, State) ->
    {noreply, handle_neighbor(Priority, Sender, State)};
handle_cast({protocol_msg, {neighbor_reply, Accept, Sender}, _From}, State) ->
    {noreply, handle_neighbor_reply(Accept, Sender, State)};
handle_cast({protocol_msg, {shuffle, TTL, Peers, Sender}, _From}, State) ->
    {noreply, handle_shuffle(TTL, Peers, Sender, State)};
handle_cast({protocol_msg, {shuffle_reply, Peers, _Sender}, _From}, State) ->
    {noreply, handle_shuffle_reply(Peers, State)};
handle_cast(_Msg, State) ->
    {noreply, State}.
%% Raw messages. `{pending_timeout, Node}' is the backstop for a peer that
%% never produced peer_connected or peer_failed: the pending entry is
%% dropped so it doesn't leak across long uptime, and the metric is bumped.
%% Every other message is ignored.
handle_info({pending_timeout, Node}, State) ->
    Pending = State#view_state.pending,
    case maps:is_key(Node, Pending) of
        false ->
            %% Entry already resolved; nothing to expire.
            {noreply, State};
        true ->
            barrel_p2p_metrics:pending_timeout(Node),
            {noreply, State#view_state{pending = maps:remove(Node, Pending)}}
    end;
handle_info(_Info, State) ->
    {noreply, State}.
%% No cleanup is performed on shutdown; both arguments are ignored.
terminate(_Reason, _State) -> ok.
%%====================================================================
%% Pending-entry helpers
%%====================================================================
%% Default backstop for pending entries. Backstop must be longer
%% than `auth_handshake_timeout' (default 10s) so the happy-path
%% peer_connected/peer_failed reach us first.
-define(DEFAULT_PENDING_TIMEOUT_MS, 30000).
%% Insert a `{Kind, Ref, TimerRef}' entry for `Node' into the pending map
%% and arm its backstop timer (`pending_timeout_ms' from the barrel_p2p
%% application environment, default above). Returns the updated map.
%%
%% If `Node' already has a pending entry, its timer is cancelled first.
%% Otherwise the old timer would stay armed and its `{pending_timeout,
%% Node}' message would delete the replacement entry ahead of schedule.
insert_pending(Node, Kind, Pending) ->
    case Pending of
        #{Node := {_OldKind, _OldRef, OldTimer}} when is_reference(OldTimer) ->
            case erlang:cancel_timer(OldTimer) of
                false ->
                    %% Timer already fired (or was cancelled): drop a
                    %% timeout for this node that may sit in the mailbox.
                    receive
                        {pending_timeout, Node} -> ok
                    after 0 -> ok
                    end;
                _Remaining ->
                    ok
            end;
        _ ->
            ok
    end,
    Ref = make_ref(),
    Timeout = application:get_env(
        barrel_p2p, pending_timeout_ms, ?DEFAULT_PENDING_TIMEOUT_MS
    ),
    TimerRef = erlang:send_after(Timeout, self(), {pending_timeout, Node}),
    maps:put(Node, {Kind, Ref, TimerRef}, Pending).
%% Extract the timer reference from a pending entry. Anything that is not a
%% 3-tuple yields `undefined'.
pending_timer(Entry) when is_tuple(Entry), tuple_size(Entry) =:= 3 ->
    element(3, Entry);
pending_timer(_Other) ->
    undefined.
%% Cancel a backstop timer when given a reference; any other term (for
%% example `undefined') is accepted and ignored. Always returns `ok'.
cancel_pending_timer(MaybeTimer) ->
    case is_reference(MaybeTimer) of
        true ->
            _ = erlang:cancel_timer(MaybeTimer),
            ok;
        false ->
            ok
    end.
%%====================================================================
%% HyParView Protocol Handlers
%%====================================================================
%% Handle a JOIN from `Sender': count the churn event, admit the sender to
%% the active view, announce it, and forward the join to every other active
%% peer with TTL = ARWL. Returns the updated state.
handle_join(Sender, State) ->
    SenderId = Sender#peer.id,
    WasActive = maps:is_key(SenderId, State#view_state.active_view),
    %% Track churn event
    State0 = record_churn_event(join, State),
    %% Add sender to active view (refused when the sender is blocked)
    State1 = add_to_active_view(Sender, State0),
    IsActive = maps:is_key(SenderId, State1#view_state.active_view),
    %% Surface the new peer to subscribers (plumtree's eager_peers list,
    %% the replica drivers, etc). Without this, broadcast trees miss any
    %% peer that joined via the JOIN handshake. Only a real transition is
    %% announced: not a peer we already had, and not a blocked peer that
    %% add_to_active_view/2 refused.
    case IsActive andalso not WasActive of
        true ->
            barrel_p2p_hyparview_events:notify({peer_up, SenderId});
        false ->
            ok
    end,
    %% Forward join to all active peers (except sender)
    TTL = State1#view_state.arwl,
    Self = State1#view_state.self,
    maps:foreach(
        fun
            (Node, _P) when Node =/= SenderId ->
                barrel_p2p_protocol:send(Node, {forward_join, Sender, TTL, Self});
            (_, _) ->
                ok
        end,
        State1#view_state.active_view
    ),
    State1.
%% Handle a FORWARD_JOIN random-walk step for `NewPeer'.
%%   * A walk about ourselves is ignored, whatever the TTL.
%%   * TTL 0: the walk ends here; the peer is stored in the passive view.
%%   * Otherwise: store it in the passive view when TTL equals PRWL, then
%%     either dial it (room in the active view and not blocked) or pass the
%%     walk on to a random active peer with TTL - 1.
handle_forward_join(NewPeer, _TTL, _Sender, State) when NewPeer#peer.id =:= node() ->
    State;
handle_forward_join(NewPeer, 0, _Sender, State) ->
    Stored = add_to_passive(
        NewPeer,
        State#view_state.passive_view,
        State#view_state.passive_size
    ),
    State#view_state{passive_view = Stored};
handle_forward_join(NewPeer, TTL, Sender, State) ->
    NewId = NewPeer#peer.id,
    #view_state{active_view = Active, active_size = MaxActive, prwl = PRWL} = State,
    State1 =
        if
            TTL =:= PRWL ->
                Stored = add_to_passive(
                    NewPeer,
                    State#view_state.passive_view,
                    State#view_state.passive_size
                ),
                State#view_state{passive_view = Stored};
            true ->
                State
        end,
    HasRoom = maps:size(Active) < MaxActive,
    case HasRoom andalso not is_blocked(NewId, State1) of
        true ->
            %% Room in active view - connect to new peer
            WithConnect = insert_pending(NewId, connect, State1#view_state.pending),
            barrel_p2p_bridge:request_connect(NewId),
            State1#view_state{pending = WithConnect};
        false ->
            %% Keep walking, but never back to the sender or to the
            %% joining peer itself.
            case random_active_peer(Active, [Sender#peer.id, NewId]) of
                none ->
                    ok;
                {ok, NextHop} ->
                    Step = {forward_join, NewPeer, TTL - 1, State1#view_state.self},
                    barrel_p2p_protocol:send(NextHop, Step)
            end,
            State1
    end.
%% Handle a DISCONNECT from `Sender'. If the sender is an active peer it is
%% counted as a leave, moved to the passive view, announced as
%% `{peer_down, Id, graceful}', and a passive peer is promoted if possible.
%% A disconnect from a non-active peer leaves the state untouched.
handle_disconnect(Sender, State) ->
    SenderId = Sender#peer.id,
    case maps:is_key(SenderId, State#view_state.active_view) of
        false ->
            State;
        true ->
            Counted = record_churn_event(leave, State),
            #view_state{
                active_view = Active,
                passive_view = Passive,
                passive_size = MaxPassive
            } = Counted,
            Demoted = add_to_passive(Sender, Passive, MaxPassive),
            barrel_p2p_hyparview_events:notify({peer_down, SenderId, graceful}),
            maybe_promote_passive(Counted#view_state{
                active_view = maps:remove(SenderId, Active),
                passive_view = Demoted
            })
    end.
%% Handle a NEIGHBOR request. High-priority requests are always accepted;
%% low-priority ones only while the active view has room. The decision is
%% sent back as NEIGHBOR_REPLY before the view is updated.
handle_neighbor(Priority, Sender, State) ->
    #view_state{active_view = Active, active_size = MaxActive, self = Self} = State,
    Accept =
        case Priority of
            low -> maps:size(Active) < MaxActive;
            high -> true
        end,
    barrel_p2p_protocol:send(Sender#peer.id, {neighbor_reply, Accept, Self}),
    if
        Accept -> add_to_active_view(Sender, State);
        true -> State
    end.
%% Handle a NEIGHBOR_REPLY. An acceptance admits the replying peer to the
%% active view; a rejection makes us try another passive peer.
handle_neighbor_reply(false, _Rejecter, State) ->
    maybe_promote_passive(State);
handle_neighbor_reply(true, Accepter, State) ->
    add_to_active_view(Accepter, State).
%% Handle a SHUFFLE from `Sender'.
%%   1. Merge the received peers (minus ourselves and peers already in the
%%      active view) into the passive view.
%%   2. Reply with as many random peers of our own as we received.
%%   3. While TTL > 0, pass the shuffle on to a random active peer other
%%      than the sender, with TTL - 1 and ourselves as the new sender.
handle_shuffle(TTL, Peers, Sender, State) ->
    #view_state{
        self = Self,
        active_view = Active,
        passive_view = Passive0,
        passive_size = MaxPassive
    } = State,
    SenderId = Sender#peer.id,
    %% Filter out self from received peers
    Received = lists:filter(fun(P) -> P#peer.id =/= node() end, Peers),
    Absorb = fun(P, Acc) ->
        case maps:is_key(P#peer.id, Active) of
            true -> Acc;
            false -> add_to_passive(P, Acc, MaxPassive)
        end
    end,
    Passive = lists:foldl(Absorb, Passive0, Received),
    %% Send reply with our random peers
    Sample = random_peers(State, length(Received)),
    barrel_p2p_protocol:send(SenderId, {shuffle_reply, Sample, Self}),
    NextHop =
        case TTL > 0 of
            true -> random_active_peer(Active, [SenderId]);
            false -> none
        end,
    case NextHop of
        {ok, Target} ->
            barrel_p2p_protocol:send(Target, {shuffle, TTL - 1, Received, Self});
        none ->
            ok
    end,
    State#view_state{passive_view = Passive}.
%% Handle a SHUFFLE_REPLY: merge the offered peers into the passive view,
%% skipping ourselves and anything already in the active view.
handle_shuffle_reply(Peers, State) ->
    #view_state{
        active_view = Active,
        passive_view = Passive0,
        passive_size = MaxPassive
    } = State,
    Merge = fun(P, Acc) ->
        Id = P#peer.id,
        case Id =:= node() orelse maps:is_key(Id, Active) of
            true -> Acc;
            false -> add_to_passive(P, Acc, MaxPassive)
        end
    end,
    State#view_state{passive_view = lists:foldl(Merge, Passive0, Peers)}.
%%====================================================================
%% Internal Functions
%%====================================================================
%% React to the bridge reporting that the link to `Node' went away.
%% `Type' is `graceful' (peer_disconnected) or `failed' (peer_failed);
%% `Reason' is passed through to the `peer_down' event. Returns a
%% gen_server `{noreply, State}' tuple.
%%
%% Active peer: count one churn leave, move the peer to the passive view
%% (graceful) or record a failure against its passive entry (failed),
%% announce `peer_down' and try to promote a replacement.
%%
%% Not an active peer: no churn is recorded. The node either already left
%% through a DISCONNECT (counted in handle_disconnect/2) or was never a
%% member, so counting here would inflate `recent_leaves'. Only a matching
%% pending connection attempt is resolved.
handle_peer_removal(Node, Reason, Type, State) ->
    case maps:take(Node, State#view_state.active_view) of
        {Peer, Active} ->
            %% Track churn event
            State0 = record_churn_event(leave, State),
            Passive =
                case Type of
                    graceful ->
                        add_to_passive(
                            Peer,
                            State0#view_state.passive_view,
                            State0#view_state.passive_size
                        );
                    failed ->
                        %% Track failure in passive view if present
                        record_peer_failure(Node, State0)
                end,
            State1 = State0#view_state{active_view = Active, passive_view = Passive},
            barrel_p2p_hyparview_events:notify({peer_down, Node, Reason}),
            {noreply, maybe_promote_passive(State1)};
        error ->
            handle_pending_failure(Node, State)
    end.

%% Resolve a pending connection attempt to `Node' that ended without a
%% connection: cancel its backstop timer and record the failure. A failed
%% neighbor request additionally triggers another promotion attempt.
handle_pending_failure(Node, State) ->
    case maps:take(Node, State#view_state.pending) of
        {{neighbor, _Ref, TimerRef}, NewPending} ->
            %% Failed neighbor request - track failure and try another
            cancel_pending_timer(TimerRef),
            Passive = record_peer_failure(Node, State),
            State1 = State#view_state{pending = NewPending, passive_view = Passive},
            {noreply, maybe_promote_passive(State1)};
        {Entry, NewPending} ->
            cancel_pending_timer(pending_timer(Entry)),
            Passive = record_peer_failure(Node, State),
            {noreply, State#view_state{pending = NewPending, passive_view = Passive}};
        error ->
            {noreply, State}
    end.
%% Build the `#peer{}' for a node we just connected to. Address and port
%% are unknown at this point; the peer starts connected with low priority.
%%
%% `last_seen' is stamped in milliseconds. do_cleanup_passive_view/1
%% subtracts it from `erlang:monotonic_time(millisecond)', so a
%% native-unit stamp would make that age comparison meaningless.
make_peer(Node) ->
    #peer{
        id = Node,
        address = undefined,
        port = undefined,
        connected = true,
        priority = low,
        last_seen = erlang:monotonic_time(millisecond)
    }.
%% Admit `Peer' to the active view unless it is blocked. A blocked peer
%% must never enter the overlay, so the state is returned unchanged.
add_to_active_view(Peer, State) ->
    Refused = is_blocked(Peer#peer.id, State),
    if
        Refused -> State;
        true -> do_add_to_active_view(Peer, State)
    end.
%% Put `Peer' into the active view, evicting a random member if the view
%% is full.
%%
%% A peer that is already a member is refreshed in place. Without this
%% check, re-adding a member of a full view (for example a JOIN arriving
%% after the bridge's peer_connected already admitted the node) would
%% evict an unrelated peer. With a single-slot view it would crash with
%% `no_candidates'.
do_add_to_active_view(Peer, State) ->
    PeerId = Peer#peer.id,
    Active = State#view_state.active_view,
    HasSlot =
        maps:is_key(PeerId, Active) orelse
            maps:size(Active) < State#view_state.active_size,
    case HasSlot of
        true ->
            State#view_state{active_view = maps:put(PeerId, Peer, Active)};
        false ->
            %% Need to drop someone
            {DroppedNode, DroppedPeer} = random_active_peer_pair(Active, [PeerId]),
            %% Send HyParView-level disconnect: the peer learns we no
            %% longer treat it as an active gossip peer. The dist channel
            %% itself stays up; barrel_p2p_dist_gc reaps it later if it goes
            %% idle and carries no live user streams.
            barrel_p2p_protocol:send(DroppedNode, {disconnect, State#view_state.self}),
            barrel_p2p_hyparview_events:notify({peer_down, DroppedNode, dropped}),
            Active1 = maps:remove(DroppedNode, Active),
            Active2 = maps:put(PeerId, Peer, Active1),
            %% Move dropped to passive
            Passive = add_to_passive(
                DroppedPeer,
                State#view_state.passive_view,
                State#view_state.passive_size
            ),
            State#view_state{active_view = Active2, passive_view = Passive}
    end.
%% Store `Peer' in the passive-view map, marked as not connected, and
%% return the new map.
%%   * The local node is never stored.
%%   * An existing entry is overwritten.
%%   * When the map already holds `MaxSize' entries, a random entry is
%%     evicted to make room.
add_to_passive(Peer, Passive, MaxSize) ->
    Id = Peer#peer.id,
    if
        Id =:= node() ->
            Passive;
        is_map_key(Id, Passive) ->
            maps:put(Id, Peer#peer{connected = false}, Passive);
        map_size(Passive) >= MaxSize ->
            {Victim, _} = random_active_peer_pair(Passive, [Id]),
            Trimmed = maps:remove(Victim, Passive),
            maps:put(Id, Peer#peer{connected = false}, Trimmed);
        true ->
            maps:put(Id, Peer#peer{connected = false}, Passive)
    end.
%% If the active view has a free slot, pick an eligible passive peer, take
%% it out of the passive view, register a `neighbor' pending entry, send it
%% a NEIGHBOR request and ask the bridge to connect. The request has high
%% priority when the active view is empty, low priority otherwise. With no
%% free slot or no eligible peer the state is returned unchanged.
maybe_promote_passive(State) ->
    ActiveCount = maps:size(State#view_state.active_view),
    Candidate =
        case ActiveCount < State#view_state.active_size of
            true ->
                find_eligible_passive_peer(State, erlang:monotonic_time(millisecond));
            false ->
                none
        end,
    case Candidate of
        none ->
            State;
        {ok, Node, _Peer} ->
            Remaining = maps:remove(Node, State#view_state.passive_view),
            Priority =
                if
                    ActiveCount =:= 0 -> high;
                    true -> low
                end,
            WithNeighbor = insert_pending(Node, neighbor, State#view_state.pending),
            barrel_p2p_protocol:send(Node, {neighbor, Priority, State#view_state.self}),
            barrel_p2p_bridge:request_connect(Node),
            State#view_state{passive_view = Remaining, pending = WithNeighbor}
    end.
%% Choose the passive peer to promote. A peer is eligible when its failure
%% count is below the limit, its backoff has expired at `Now', and it is
%% not blocked. Among eligible peers the most recently seen one wins.
%% Returns `{ok, Node, Peer}' or `none'.
find_eligible_passive_peer(State, Now) ->
    #view_state{
        max_fail_count = MaxFails,
        blocked = Blocked,
        passive_view = Passive
    } = State,
    IsEligible = fun({Node, Peer}) ->
        Peer#peer.fail_count < MaxFails andalso
            is_backoff_expired(Peer, Now) andalso
            not lists:member(Node, Blocked)
    end,
    case lists:filter(IsEligible, maps:to_list(Passive)) of
        [] ->
            none;
        Eligible ->
            %% Prefer recently seen peers (more likely to be alive)
            ByRecency = fun({_, A}, {_, B}) -> last_seen_cmp(A, B) end,
            [{Best, BestPeer} | _] = lists:sort(ByRecency, Eligible),
            {ok, Best, BestPeer}
    end.
%% Test-support: true when `Node' is in the blocked set (partition held).
%% The set defaults empty, so this is always false in normal operation.
is_blocked(Node, State) ->
    BlockedNodes = State#view_state.blocked,
    lists:member(Node, BlockedNodes).
%% Ordering fun for sorting peers most-recently-seen first. A peer without
%% a `last_seen' stamp never sorts ahead of another peer.
last_seen_cmp(#peer{last_seen = SeenA}, #peer{last_seen = SeenB}) ->
    case {SeenA, SeenB} of
        {undefined, _} -> false;
        {_, undefined} -> true;
        {_, _} -> SeenA > SeenB
    end.
%% True when the peer may be contacted again at `Now': either it has no
%% backoff deadline, or the deadline has been reached.
is_backoff_expired(#peer{backoff_until = Deadline}, Now) ->
    Deadline =:= undefined orelse Now >= Deadline.
%% Pick a uniformly random key of `Active' that is not listed in
%% `Exclude'. Returns `{ok, Node}', or `none' when nothing is left.
random_active_peer(Active, Exclude) ->
    case maps:keys(Active) -- Exclude of
        [] ->
            none;
        Pool ->
            Index = rand:uniform(length(Pool)),
            {ok, lists:nth(Index, Pool)}
    end.
%% Pick a uniformly random `{Key, Value}' pair of `Map' whose key is not
%% listed in `Exclude'. Raises `error:no_candidates' when nothing is left.
random_active_peer_pair(Map, Exclude) ->
    Allowed = fun({Key, _Value}) -> not lists:member(Key, Exclude) end,
    case lists:filter(Allowed, maps:to_list(Map)) of
        [] ->
            error(no_candidates);
        Pool ->
            Index = rand:uniform(length(Pool)),
            lists:nth(Index, Pool)
    end.
%% Return up to `N' peers drawn at random from ourselves plus the active
%% and passive views. The pool is shuffled by sorting on random keys.
random_peers(State, N) ->
    #view_state{self = Self, active_view = Active, passive_view = Passive} = State,
    Pool = [Self | maps:values(Active) ++ maps:values(Passive)],
    Tagged = lists:sort([{rand:uniform(), Peer} || Peer <- Pool]),
    {_Keys, Shuffled} = lists:unzip(Tagged),
    lists:sublist(Shuffled, N).
%%====================================================================
%% Churn Handling Functions
%%====================================================================
%% Count one churn event (`join' or `leave') in the current window,
%% starting a fresh window first if the previous one has expired.
record_churn_event(Type, State) ->
    Current = maybe_reset_churn_window(erlang:monotonic_time(millisecond), State),
    case Type of
        leave ->
            Leaves = Current#view_state.recent_leaves,
            Current#view_state{recent_leaves = Leaves + 1};
        join ->
            Joins = Current#view_state.recent_joins,
            Current#view_state{recent_joins = Joins + 1}
    end.
%% Start a new churn window at `Now' (zeroing both counters) when no
%% window has been started yet or the current one is older than
%% `churn_window_ms'. Otherwise the state is returned unchanged.
maybe_reset_churn_window(Now, State) ->
    #view_state{churn_window_start = Started, churn_window_ms = Span} = State,
    Expired = Started =:= undefined orelse (Now - Started) > Span,
    if
        Expired ->
            State#view_state{
                recent_joins = 0,
                recent_leaves = 0,
                churn_window_start = Now
            };
        true ->
            State
    end.
%% Record a failed contact with `Node' and return the new passive-view map.
%% A node that is not in the passive view is ignored. Reaching
%% `max_fail_count' removes the entry; otherwise the failure count is
%% bumped and the peer is put in backoff for base * 2^fail_count
%% milliseconds, capped at 5 minutes.
record_peer_failure(Node, State) ->
    #view_state{
        passive_view = Passive,
        max_fail_count = MaxFails,
        base_backoff_ms = Base
    } = State,
    case maps:get(Node, Passive, undefined) of
        undefined ->
            Passive;
        Peer ->
            Fails = Peer#peer.fail_count + 1,
            if
                Fails >= MaxFails ->
                    %% Too many failures - remove from passive view
                    maps:remove(Node, Passive);
                true ->
                    Delay = min(Base * (1 bsl Fails), 300000),
                    Until = erlang:monotonic_time(millisecond) + Delay,
                    Passive#{Node => Peer#peer{fail_count = Fails, backoff_until = Until}}
            end
    end.
%% Prune the passive view. An entry survives when its failure count is
%% below `max_fail_count' and it is either unstamped (`last_seen' is
%% `undefined') or was seen less than `passive_max_age_ms' ago.
do_cleanup_passive_view(State) ->
    #view_state{
        passive_max_age_ms = MaxAge,
        max_fail_count = MaxFails,
        passive_view = Passive
    } = State,
    Now = erlang:monotonic_time(millisecond),
    Keep = fun(_Node, Peer) ->
        LastSeen = Peer#peer.last_seen,
        Fresh = LastSeen =:= undefined orelse (Now - LastSeen) < MaxAge,
        Healthy = Peer#peer.fail_count < MaxFails,
        Healthy andalso Fresh
    end,
    State#view_state{passive_view = maps:filter(Keep, Passive)}.
%% Address advertised for the local node: the `address' key of the
%% barrel_p2p application environment, or the loopback address when unset.
get_self_address() ->
    application:get_env(barrel_p2p, address, {127, 0, 0, 1}).
%% Port advertised for the local node: the `listen_port' key of the
%% barrel_p2p application environment, or 0 when unset.
get_self_port() ->
    application:get_env(barrel_p2p, listen_port, 0).