Skip to main content

src/barrel_p2p_router.erl

%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
-module(barrel_p2p_router).
-behaviour(gen_server).

-include("barrel_p2p.hrl").
-include_lib("hlc/include/hlc.hrl").

%% API
-export([start_link/0]).
-export([find_route/1, find_service/1]).
-export([cache_route/2, invalidate_route/1, invalidate_all/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-define(SERVER, ?MODULE).
-define(ROUTE_CACHE, barrel_p2p_route_cache).
-define(ROUTE_TAG, '$barrel_p2p_route').

%% Route cache TTL (30 minutes in milliseconds)
-define(CACHE_TTL_MS, 1800000).

%% Default routing TTL (max hops)
-define(DEFAULT_TTL, 5).

%% Timeout for remote lookups (ms)
-define(LOOKUP_TIMEOUT, 5000).

-record(state, {
    in_flight = 0 :: non_neg_integer(),
    max_in_flight :: pos_integer()
}).

%% Cap on concurrent route_request handlers. A peer flood used to
%% spawn an unbounded number of helpers; the gen_server now drops
%% excess requests with a metric.
-define(DEFAULT_MAX_IN_FLIGHT, 256).

%% Default route-cache sweep period. Entries are also evicted lazily
%% by get_cached_route/1; the sweep is a backstop for keys that are
%% inserted once and never looked up again.
-define(DEFAULT_SWEEP_PERIOD_MS, 60000).

%%====================================================================
%% API
%%====================================================================

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%% Find route to a target node through the overlay
-spec find_route(node()) -> {direct, node()} | {via, node()} | no_route.
find_route(Target) when Target =:= node() ->
    {direct, Target};
find_route(Target) ->
    ActiveView = barrel_p2p:active_view(),
    case lists:member(Target, ActiveView) of
        true ->
            {direct, Target};
        false ->
            case get_cached_route(Target) of
                {ok, ViaNode} ->
                    %% Verify cached route is still valid
                    case lists:member(ViaNode, ActiveView) of
                        true ->
                            {via, ViaNode};
                        false ->
                            invalidate_route(Target),
                            find_next_hop(Target, ActiveView)
                    end;
                not_found ->
                    find_next_hop(Target, ActiveView)
            end
    end.

%% Find a service by name through overlay routing
-spec find_service(atom() | binary()) -> {found, node(), pid()} | {error, term()}.
find_service(ServiceName) ->
    Req = #route_req{
        service_name = ServiceName,
        ttl = ?DEFAULT_TTL,
        origin = node(),
        visited = [node()]
    },
    route_lookup(Req).

%% Cache a successful route
-spec cache_route(atom() | binary(), node()) -> ok.
cache_route(ServiceName, ViaNode) ->
    gen_server:cast(?SERVER, {cache_route, ServiceName, ViaNode}).

%% Invalidate cached route for a service
-spec invalidate_route(atom() | binary() | node()) -> ok.
invalidate_route(Key) ->
    gen_server:cast(?SERVER, {invalidate, Key}).

%% Invalidate all cached routes
-spec invalidate_all() -> ok.
invalidate_all() ->
    gen_server:cast(?SERVER, invalidate_all).

%%====================================================================
%% gen_server callbacks
%%====================================================================

init([]) ->
    ?ROUTE_CACHE = ets:new(?ROUTE_CACHE, [
        named_table,
        public,
        set,
        {read_concurrency, true}
    ]),
    Max = application:get_env(
        barrel_p2p, router_max_in_flight, ?DEFAULT_MAX_IN_FLIGHT
    ),
    schedule_sweep(),
    {ok, #state{max_in_flight = Max}}.

handle_call(_Request, _From, State) ->
    {reply, {error, unknown_request}, State}.

handle_cast({cache_route, ServiceName, ViaNode}, State) ->
    HLC = barrel_p2p_hlc:now(),
    ets:insert(?ROUTE_CACHE, {ServiceName, ViaNode, HLC}),
    {noreply, State};
handle_cast({invalidate, Key}, State) ->
    ets:delete(?ROUTE_CACHE, Key),
    {noreply, State};
handle_cast(invalidate_all, State) ->
    ets:delete_all_objects(?ROUTE_CACHE),
    {noreply, State};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(
    {?ROUTE_TAG, {route_request, _Req, ReplyTo}},
    #state{in_flight = N, max_in_flight = Max} = State
) when N >= Max ->
    %% Shed load: refuse over-cap requests rather than spawn an
    %% unbounded helper per inbound message.
    barrel_p2p_metrics:router_request_dropped(),
    reply_dropped(ReplyTo),
    {noreply, State};
handle_info(
    {?ROUTE_TAG, {route_request, Req, ReplyTo}},
    #state{in_flight = N} = State
) ->
    {_Pid, _Ref} = spawn_monitor(
        fun() -> handle_route_request(Req, ReplyTo) end
    ),
    {noreply, State#state{in_flight = N + 1}};
handle_info(
    {'DOWN', _Ref, process, _Pid, _Reason},
    #state{in_flight = N} = State
) when N > 0 ->
    {noreply, State#state{in_flight = N - 1}};
handle_info(sweep_cache, State) ->
    sweep_cache(),
    schedule_sweep(),
    {noreply, State};
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

%%====================================================================
%% Internal Functions
%%====================================================================

%% Get cached route if fresh (uses HLC wall time for clock-skew-tolerant TTL)
get_cached_route(Key) ->
    case ets:lookup(?ROUTE_CACHE, Key) of
        [{_, ViaNode, CacheHLC}] ->
            CacheWall = barrel_p2p_hlc:wall_time(CacheHLC),
            NowWall = barrel_p2p_hlc:wall_time(barrel_p2p_hlc:now()),
            case NowWall - CacheWall < ?CACHE_TTL_MS of
                true ->
                    {ok, ViaNode};
                false ->
                    ets:delete(?ROUTE_CACHE, Key),
                    not_found
            end;
        [] ->
            not_found
    end.

%% Find next hop to forward to
find_next_hop(_Target, []) ->
    no_route;
find_next_hop(_Target, ActiveView) ->
    %% Random selection from active view
    NextHop = lists:nth(rand:uniform(length(ActiveView)), ActiveView),
    {via, NextHop}.

%% Route lookup through overlay
route_lookup(#route_req{ttl = 0}) ->
    {error, ttl_expired};
route_lookup(#route_req{service_name = Name, visited = Visited} = Req) ->
    %% Check local first
    case barrel_p2p_registry:lookup_local(Name) of
        {ok, Pid} ->
            {found, node(), Pid};
        {error, not_found} ->
            %% Forward to active peers
            ActiveView = barrel_p2p:active_view(),
            %% Exclude visited nodes and origin to prevent backtracking
            Candidates = ActiveView -- Visited,
            forward_to_peers(Req, Candidates)
    end.

%% Forward route request to peers
forward_to_peers(_Req, []) ->
    {error, not_found};
forward_to_peers(Req, Candidates) ->
    %% Try peers in random order
    Shuffled = shuffle(Candidates),
    forward_to_peers_sequential(Req, Shuffled).

forward_to_peers_sequential(_Req, []) ->
    {error, not_found};
forward_to_peers_sequential(Req, [Peer | Rest]) ->
    NewReq = Req#route_req{
        ttl = Req#route_req.ttl - 1,
        visited = [node() | Req#route_req.visited]
    },
    case send_route_request(Peer, NewReq) of
        {found, Node, Pid} ->
            %% Cache the route
            cache_route(Req#route_req.service_name, Peer),
            {found, Node, Pid};
        {error, _} ->
            forward_to_peers_sequential(Req, Rest)
    end.

%% Send route request to remote node
send_route_request(Node, Req) ->
    ReplyRef = make_ref(),
    ReplyTo = {self(), ReplyRef},
    erlang:send({?SERVER, Node}, {?ROUTE_TAG, {route_request, Req, ReplyTo}}, [noconnect]),
    receive
        {ReplyRef, Result} -> Result
    after ?LOOKUP_TIMEOUT ->
        {error, timeout}
    end.

%% Handle incoming route request (runs in spawned process)
handle_route_request(Req, {Caller, Ref}) ->
    Result = route_lookup(Req),
    %% Send reply back to caller
    erlang:send(Caller, {Ref, Result}, [noconnect]).

%% Tell the caller we shed the request. Caller treats it as a
%% lookup error and falls through to the next candidate.
reply_dropped({Caller, Ref}) ->
    erlang:send(Caller, {Ref, {error, overloaded}}, [noconnect]).

%% Shuffle a list randomly. Callers only pass non-empty lists.
shuffle(List) ->
    [X || {_, X} <- lists:sort([{rand:uniform(), E} || E <- List])].

%% Schedule the next periodic sweep of the route cache. A backstop
%% for entries that get inserted once and never re-read.
schedule_sweep() ->
    Period = application:get_env(
        barrel_p2p, route_cache_sweep_period_ms, ?DEFAULT_SWEEP_PERIOD_MS
    ),
    erlang:send_after(Period, self(), sweep_cache),
    ok.

%% Drop every entry whose wall-time age exceeds the cache TTL.
sweep_cache() ->
    NowWall = barrel_p2p_hlc:wall_time(barrel_p2p_hlc:now()),
    Expired = ets:foldl(
        fun({Key, _Via, CacheHLC}, Acc) ->
            case NowWall - barrel_p2p_hlc:wall_time(CacheHLC) >= ?CACHE_TTL_MS of
                true -> [Key | Acc];
                false -> Acc
            end
        end,
        [],
        ?ROUTE_CACHE
    ),
    lists:foreach(fun(K) -> ets:delete(?ROUTE_CACHE, K) end, Expired),
    ok.