Skip to main content

src/barrel_p2p_dist_gc.erl

%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
%%% Idle dist-channel garbage collector.
%%%
%%% Barrel P2P decouples Erlang dist channels from the HyParView active
%%% view: `Pid ! Msg' to any cluster node auto-connects on demand.
%%% Without bounded fan-out, every cross-cluster send leaves a channel
%%% open and the cluster drifts toward full mesh. This GC reaps idle
%%% channels so HyParView's O(log n) gossip topology actually bounds
%%% the natural connection count.
%%%
%%% Predicate (a node is reaped when ALL of these hold):
%%%   1. not in `barrel_p2p:active_view()' (HyParView would re-bind it
%%%      otherwise),
%%%   2. `quic_dist:list_streams(Node) =:= []' (no live user stream
%%%      rides this channel),
%%%   3. age >= `dist_gc_min_age_ms' (avoid reaping a channel right
%%%      after a brief send/receive that completed milliseconds ago).
%%%
%%% Always runs. Sweep period and min-age are tunable; the GC itself
%%% has no enable/disable env. Removing it would break the architectural
%%% claim of bounded fan-out, so it's not optional.

-module(barrel_p2p_dist_gc).
-behaviour(gen_server).

-export([start_link/0]).

%% Test/observability hooks
-export([sweep_now/0, get_age_ms/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-define(SERVER, ?MODULE).
-define(AGES, barrel_p2p_dist_gc_ages).

-define(DEFAULT_SWEEP_PERIOD_MS, 60000).
-define(DEFAULT_MIN_AGE_MS, 300000).

-record(state, {
    sweep_period_ms :: pos_integer(),
    min_age_ms :: pos_integer(),
    timer_ref :: reference() | undefined
}).

%%====================================================================
%% API
%%====================================================================

%% Start the GC as a gen_server registered locally under ?SERVER and
%% linked to the calling process (normally its supervisor).
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    RegisteredName = {local, ?SERVER},
    gen_server:start_link(RegisteredName, ?MODULE, [], []).

%% Trigger one sweep right away and block until the server has finished
%% it. Intended for tests that cannot wait for the periodic timer. Uses
%% the default gen_server:call/2 timeout.
-spec sweep_now() -> ok.
sweep_now() ->
    Request = sweep_now,
    gen_server:call(?SERVER, Request).

%% Milliseconds elapsed since the GC first recorded Node, or
%% `not_tracked' when the age table has no entry for it.
%%
%% An entry is written when a nodeup is observed (or at GC start for
%% nodes that were already connected) and removed on nodedown or when
%% the node is reaped. The result can be 0 when queried within the same
%% millisecond the entry was recorded, hence `non_neg_integer()'.
%%
%% Reads the named ETS table directly, so it does not go through the
%% gen_server; the table must exist (i.e. the GC must be running),
%% otherwise ets:lookup/2 raises badarg.
-spec get_age_ms(node()) -> non_neg_integer() | not_tracked.
get_age_ms(Node) ->
    case ets:lookup(?AGES, Node) of
        [{Node, Since}] ->
            erlang:monotonic_time(millisecond) - Since;
        [] ->
            not_tracked
    end.

%%====================================================================
%% gen_server callbacks
%%====================================================================

%% Initialise the GC: read tunables, create the age table, start
%% tracking node connectivity and arm the first sweep timer.
%%
%% Tunables (application env of `barrel_p2p'):
%%   dist_gc_sweep_period_ms - delay between sweeps,
%%   dist_gc_min_age_ms      - minimum tracked age before a node may be
%%                             reaped.
init([]) ->
    SweepPeriod = application:get_env(
        barrel_p2p,
        dist_gc_sweep_period_ms,
        ?DEFAULT_SWEEP_PERIOD_MS
    ),
    MinAge = application:get_env(
        barrel_p2p,
        dist_gc_min_age_ms,
        ?DEFAULT_MIN_AGE_MS
    ),
    ?AGES = ets:new(?AGES, [
        named_table,
        public,
        set,
        {read_concurrency, true}
    ]),
    %% Subscribe to nodeup/nodedown BEFORE taking the nodes() snapshot.
    %% Doing it the other way round leaves a window in which a node can
    %% connect without being in the snapshot and without producing a
    %% nodeup for us; such a node would stay untracked and therefore
    %% never become eligible for reaping. A node that shows up in both
    %% the snapshot and a queued nodeup is harmless: the nodeup handler
    %% uses insert_new and keeps the timestamp written here.
    net_kernel:monitor_nodes(true, [{node_type, visible}]),
    %% Record any node already connected at boot (we may have started
    %% after the dist channel formed). They get the current time as
    %% their first-seen, which is conservative: they won't be reaped
    %% until they pass the min-age threshold from now.
    Now = erlang:monotonic_time(millisecond),
    true = ets:insert(?AGES, [{N, Now} || N <- nodes()]),
    TRef = erlang:send_after(SweepPeriod, self(), sweep),
    {ok, #state{
        sweep_period_ms = SweepPeriod,
        min_age_ms = MinAge,
        timer_ref = TRef
    }}.

%% Synchronous requests.
%%   sweep_now - run one sweep inline and reply `ok' once it is done;
%%               the periodic timer is left untouched.
%%   anything else is answered with {error, unknown_request}.
handle_call(sweep_now, _Caller, St) ->
    do_sweep(St),
    Reply = ok,
    {reply, Reply, St};
handle_call(_Unknown, _Caller, St) ->
    Reply = {error, unknown_request},
    {reply, Reply, St}.

%% The GC defines no casts; any cast is dropped and the state is kept.
handle_cast(_Ignored, St) ->
    {noreply, St}.

%% Out-of-band messages.
%%   {nodeup, Peer, _}   - remember when Peer was first seen. insert_new
%%                         keeps an existing timestamp, so a duplicate
%%                         nodeup does not reset the age.
%%   {nodedown, Peer, _} - forget Peer.
%%   sweep               - periodic timer fired: sweep, then re-arm.
%%   anything else is ignored.
handle_info({nodeup, Peer, _InfoList}, St) ->
    FirstSeen = erlang:monotonic_time(millisecond),
    ets:insert_new(?AGES, {Peer, FirstSeen}),
    {noreply, St};
handle_info({nodedown, Peer, _InfoList}, St) ->
    ets:delete(?AGES, Peer),
    {noreply, St};
handle_info(sweep, #state{sweep_period_ms = Interval} = St) ->
    do_sweep(St),
    %% Re-arm only after the sweep has completed, so the period is
    %% measured from the end of one sweep to the start of the next.
    NextTimer = erlang:send_after(Interval, self(), sweep),
    {noreply, St#state{timer_ref = NextTimer}};
handle_info(_Other, St) ->
    {noreply, St}.

%% Best-effort unsubscribe from node monitoring on shutdown. Any failure
%% is deliberately ignored: the process is going away regardless.
terminate(_Why, _St) ->
    try net_kernel:monitor_nodes(false, [{node_type, visible}]) of
        _Result -> ok
    catch
        _Class:_Error -> ok
    end.

%%====================================================================
%% Internal
%%====================================================================

%% Run one GC pass: evaluate every currently connected visible node
%% against the reap predicate. The active view and the clock are sampled
%% once up front so all nodes in a pass are judged against the same
%% snapshot. Always returns `ok'.
do_sweep(#state{min_age_ms = Threshold}) ->
    ActiveView = active_view_safe(),
    SweepTime = erlang:monotonic_time(millisecond),
    lists:foreach(
        fun(Peer) -> maybe_reap(Peer, ActiveView, Threshold, SweepTime) end,
        nodes()
    ),
    ok.

%% Disconnect Node if, and only if, the whole GC predicate holds.
%%   Node   - connected node under consideration
%%   Active - active-view members sampled for this sweep
%%   MinAge - minimum tracked age (ms) before a node may be reaped
%%   Now    - monotonic time (ms) sampled for this sweep
%% Always returns `ok'.
maybe_reap(Node, Active, MinAge, Now) ->
    case is_reapable(Node, Active, MinAge, Now) of
        true -> reap(Node);
        false -> ok
    end.

%% True when Node is outside the active view, carries no live stream and
%% has been tracked for at least MinAge ms. The checks short-circuit in
%% that order, so the stream query is skipped for active-view members
%% and the age lookup is skipped for nodes that still have streams.
is_reapable(Node, Active, MinAge, Now) ->
    (not lists:member(Node, Active)) andalso
        (not has_live_streams(Node)) andalso
        is_old_enough(Node, MinAge, Now).

%% Tear down the dist channel to Node. The age entry is dropped and the
%% reap metric emitted only when disconnect_node/1 reports success; a
%% `false'/`ignored' result means nothing was disconnected (e.g. the
%% node went away on its own in the meantime), which must not be counted
%% as a reap.
reap(Node) ->
    case erlang:disconnect_node(Node) of
        true ->
            ets:delete(?AGES, Node),
            barrel_p2p_metrics:gc_reap(Node),
            ok;
        _NotDisconnected ->
            ok
    end.

%% Fetch the current active view, degrading to an empty list if the call
%% raises for any reason. Early-boot sweeps can run before the
%% barrel_p2p app is fully started; an empty view is preferred over
%% crashing the GC.
active_view_safe() ->
    try barrel_p2p:active_view() of
        View -> View
    catch
        _Class:_Reason -> []
    end.

%% True when quic_dist reports at least one stream for Node. If the
%% query itself raises we cannot tell, so we answer `true' to stay
%% conservative and keep the channel.
has_live_streams(Node) ->
    try quic_dist:list_streams(Node) of
        Streams when is_list(Streams) -> Streams =/= []
    catch
        _Class:_Reason -> true
    end.

%% True when Node has an entry in the age table and has been tracked for
%% at least MinAge ms as of Now. Untracked nodes are never old enough.
is_old_enough(Node, MinAge, Now) ->
    case ets:lookup(?AGES, Node) of
        [] ->
            false;
        [{Node, FirstSeen}] ->
            Age = Now - FirstSeen,
            Age >= MinAge
    end.