%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
-module(barrel_p2p_service_proxy).
-behaviour(gen_server).
%% Proxy that forwards messages to remote service through overlay
%% Registered with global for transparency: global:whereis_name returns proxy pid
-include("barrel_p2p.hrl").
%% API
-export([start_link/2]).
-export([relay/3, relay/4]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% Timeout handed to the gen_server:call/3 and rpc:call/5 requests made
%% by this module (both APIs interpret the value as milliseconds).
-define(CALL_TIMEOUT, 10000).
%% NOTE(review): not referenced anywhere in the visible part of this
%% module -- confirm whether it is still needed.
-define(RELAY_TAG, '$barrel_p2p_relay').
%% Default cap on concurrent overlay-cast helper processes. Before the
%% cap existed, every cast sent through a `{via, NextHop}' route
%% spawned a helper with no limit on how many could run at once.
%% Can be overridden with the `proxy_cast_max_in_flight' application
%% environment key of `barrel_p2p'.
-define(DEFAULT_MAX_IN_FLIGHT, 32).
%% Maximum overlay hops a relayed call may traverse. Matches the
%% route-lookup TTL in barrel_p2p_router.
-define(DEFAULT_TTL, 5).
%% Per-proxy gen_server state.
-record(state, {
%% Name the service is addressed by: used as `{Name, TargetNode}' when
%% forwarding calls and casts, and matched against `service_down'
%% events.
name :: atom() | binary(),
%% Node hosting the service; every route lookup is made for this node.
target_node :: node(),
%% Number of overlay-cast helper processes currently counted as
%% running. Incremented when a helper is spawned, decremented when a
%% 'DOWN' message that does not belong to `watch' arrives.
in_flight = 0 :: non_neg_integer(),
%% Upper bound on `in_flight'; overlay casts arriving at the cap are
%% dropped.
max_in_flight :: pos_integer(),
%% Subscription state returned by barrel_p2p_source_monitor.
watch = #{} :: barrel_p2p_source_monitor:watch()
}).
%%====================================================================
%% API
%%====================================================================
%% Start a proxy process for the service `Name' hosted on `TargetNode'.
%% The process is linked to the caller and is started without a
%% registered name.
-spec start_link(atom() | binary(), node()) -> {ok, pid()} | {error, term()}.
start_link(Name, TargetNode) ->
    InitArgs = [Name, TargetNode],
    gen_server:start_link(?MODULE, InitArgs, []).
%% Entry point for relaying a call towards `TargetNode'. Seeds the hop
%% context with the default TTL and a visited list holding only the
%% local node, then hands over to relay/4. relay/4 is also the function
%% each relay hop invokes on the next node, with the TTL and visited
%% list carried along.
-spec relay(atom() | binary(), node(), term()) -> term().
relay(Name, TargetNode, Request) ->
    InitialCtx = #{visited => [node()], ttl => ?DEFAULT_TTL},
    relay(Name, TargetNode, Request, InitialCtx).
%% Relay `Request' one step closer to `TargetNode'.
%%
%% `Ctx' carries `ttl' (remaining hop budget) and `visited' (nodes that
%% have already forwarded this request). Outcomes:
%%   * `{error, ttl_expired}' when the budget is zero or negative;
%%   * the result of gen_server:call/3 when the router reports a direct
%%     route (exits raised by that call are not caught here);
%%   * `{error, relay_loop}' when the proposed next hop already appears
%%     in `visited';
%%   * otherwise whatever rpc:call/5 yields for relay/4 on the next
%%     hop, `{badrpc, _}' included;
%%   * `{error, no_route}' when the router knows no route.
-spec relay(atom() | binary(), node(), term(), map()) -> term().
relay(_Name, _TargetNode, _Request, #{ttl := Budget}) when Budget =< 0 ->
    {error, ttl_expired};
relay(Name, TargetNode, Request, #{visited := Seen} = Ctx) ->
    case barrel_p2p_router:find_route(TargetNode) of
        no_route ->
            {error, no_route};
        {direct, _} ->
            gen_server:call({Name, TargetNode}, Request, ?CALL_TIMEOUT);
        {via, NextHop} ->
            case lists:member(NextHop, Seen) of
                false ->
                    %% Spend one hop and record this node before handing over.
                    Remaining = maps:get(ttl, Ctx) - 1,
                    HopCtx = Ctx#{ttl => Remaining, visited => [node() | Seen]},
                    HopArgs = [Name, TargetNode, Request, HopCtx],
                    rpc:call(NextHop, ?MODULE, relay, HopArgs, ?CALL_TIMEOUT);
                true ->
                    {error, relay_loop}
            end
    end.
%%====================================================================
%% gen_server callbacks
%%====================================================================
%% gen_server init: subscribe to service events and read the cast cap.
%%
%% The subscription targets `barrel_p2p_service_events' so the proxy
%% hears when the remote service dies. barrel_p2p:subscribe/1 is not
%% used because it routes to HyParView events, which never carry
%% service_* notifications. Going through barrel_p2p_source_monitor
%% keeps the subscription alive across a service-events restart.
init([Name, TargetNode]) ->
    Sources = [barrel_p2p_service_events],
    Subscription = barrel_p2p_source_monitor:start(Sources),
    CastCap = application:get_env(
        barrel_p2p, proxy_cast_max_in_flight, ?DEFAULT_MAX_IN_FLIGHT
    ),
    InitialState = #state{
        name = Name,
        target_node = TargetNode,
        max_in_flight = CastCap,
        watch = Subscription
    },
    {ok, InitialState}.
%% Forward a synchronous request to the remote service and reply with
%% whatever forward_call/3 returns. Forwarding runs inside the proxy
%% process, so the proxy handles nothing else until it returns.
handle_call(Request, _From, State = #state{name = Service, target_node = Node}) ->
    {reply, forward_call(Service, Node, Request), State}.
%% Forward an asynchronous request to the remote service. The state
%% returned by forward_cast/4 is kept because an overlay cast may have
%% taken an in-flight slot.
handle_cast(Request, State = #state{name = Service, target_node = Node}) ->
    {noreply, forward_cast(Service, Node, Request, State)}.
%% Out-of-band messages handled by the proxy:
%%   * service events -- a `service_down' for this proxy's own service
%%     name stops the proxy normally; every other event is ignored;
%%   * source-monitor retry ticks -- re-subscribe to a watched source
%%     (service events) that restarted;
%%   * 'DOWN' messages -- first offered to the source monitor; a ref it
%%     does not own is taken to be a finished overlay-cast helper and
%%     frees one in-flight slot;
%%   * anything else is dropped.
handle_info({barrel_p2p_service_event, Event}, #state{name = Name} = State) ->
    case Event of
        {service_down, Name, _Node, _Reason} ->
            %% The service died on the remote node: the proxy has
            %% nothing left to forward to.
            {stop, normal, State};
        _Unrelated ->
            {noreply, State}
    end;
handle_info({barrel_p2p_source_monitor, retry, Source}, #state{watch = Watch0} = State) ->
    Watch1 = barrel_p2p_source_monitor:retry(Source, Watch0),
    {noreply, State#state{watch = Watch1}};
handle_info({'DOWN', MonRef, process, _Pid, _Reason}, #state{watch = Watch0} = State) ->
    case barrel_p2p_source_monitor:down(MonRef, Watch0) of
        ignore ->
            {noreply, release_in_flight_slot(State)};
        {down, _Source, Watch1} ->
            {noreply, State#state{watch = Watch1}}
    end;
handle_info(_Info, State) ->
    {noreply, State}.

%% Give back one in-flight slot; the counter never drops below zero.
release_in_flight_slot(#state{in_flight = Count} = State) when Count > 0 ->
    State#state{in_flight = Count - 1};
release_in_flight_slot(State) ->
    State.
%% gen_server terminate: drop the global registration for this proxy's
%% service name, but only while the name still resolves to this very
%% process. global:unregister_name/1 removes a name no matter which pid
%% holds it, so an unconditional call could wipe out a registration
%% that has since moved to another process.
%%
%% Always returns `ok'.
terminate(_Reason, #state{name = Name}) ->
    %% Best effort: shutting down must not fail because `global' is
    %% unavailable, hence the deliberately broad catch.
    try
        case global:whereis_name(Name) =:= self() of
            true -> global:unregister_name(Name);
            false -> ok
        end
    catch
        _:_ -> ok
    end,
    ok.
%%====================================================================
%% Internal Functions
%%====================================================================
%% Deliver a synchronous request to `Name' on `Target', choosing the
%% transport from the router's answer:
%%   * direct route -> gen_server:call/3 straight to the service;
%%   * `{via, NextHop}' -> relay/4 executed on the next hop via rpc;
%%   * no route -> `{error, no_route}'.
forward_call(Name, Target, Request) ->
    case barrel_p2p_router:find_route(Target) of
        no_route ->
            {error, no_route};
        {direct, _} ->
            call_direct(Name, Target, Request);
        {via, NextHop} ->
            call_via_overlay(NextHop, Name, Target, Request)
    end.

%% Call the service over an existing direct connection. Only the three
%% exits below are translated into error tuples; any other exit
%% propagates to the caller.
call_direct(Name, Target, Request) ->
    try
        gen_server:call({Name, Target}, Request, ?CALL_TIMEOUT)
    catch
        exit:{timeout, _} -> {error, timeout};
        exit:{{nodedown, _}, _} -> {error, node_down};
        exit:{noproc, _} -> {error, service_not_found}
    end.

%% Route the call through the overlay. The hop starts with a fresh TTL
%% and a visited list seeded with the local node. A `{badrpc, Reason}'
%% result is reported as `{error, Reason}'; anything else is returned
%% unchanged.
call_via_overlay(NextHop, Name, Target, Request) ->
    HopCtx = #{ttl => ?DEFAULT_TTL, visited => [node()]},
    RelayArgs = [Name, Target, Request, HopCtx],
    case rpc:call(NextHop, ?MODULE, relay, RelayArgs, ?CALL_TIMEOUT) of
        {badrpc, Reason} -> {error, Reason};
        Reply -> Reply
    end.
%% Deliver an asynchronous request to `Name' on `Target' and return the
%% (possibly updated) proxy state. A direct route casts straight to the
%% service; a `{via, NextHop}' route goes through a bounded helper
%% process; with no route the request is silently discarded.
forward_cast(Name, Target, Request, State) ->
    Route = barrel_p2p_router:find_route(Target),
    case Route of
        no_route ->
            State;
        {via, NextHop} ->
            spawn_overlay_cast(NextHop, Name, Target, Request, State);
        {direct, _} ->
            gen_server:cast({Name, Target}, Request),
            State
    end.
%% Fire-and-forget overlay cast, bounded by `max_in_flight'.
%%
%% When the number of running helpers has reached the cap, the cast is
%% dropped and a metric is bumped. Otherwise a monitored helper process
%% asks `NextHop' (via rpc) to run the gen_server:cast/2 towards
%% `{Name, Target}', and one in-flight slot is taken. The slot is given
%% back when the helper's 'DOWN' message reaches handle_info/2.
%%
%% The rpc uses ?CALL_TIMEOUT like every other remote call in this
%% module: without it a stalled next hop would keep the helper -- and
%% its slot -- alive until the distribution layer notices the failure,
%% and once all slots are pinned every further cast is dropped.
%%
%% Returns the updated state.
spawn_overlay_cast(
    _NextHop,
    _Name,
    _Target,
    _Request,
    #state{in_flight = N, max_in_flight = Max} = State
) when
    N >= Max
->
    barrel_p2p_metrics:proxy_cast_dropped(),
    State;
spawn_overlay_cast(
    NextHop,
    Name,
    Target,
    Request,
    #state{in_flight = N} = State
) ->
    {_Pid, _Ref} = spawn_monitor(fun() ->
        rpc:call(
            NextHop,
            gen_server,
            cast,
            [{Name, Target}, Request],
            ?CALL_TIMEOUT
        )
    end),
    State#state{in_flight = N + 1}.