%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
-module(barrel_p2p_registry).
-behaviour(gen_server).
-behaviour(barrel_p2p_replica).
-include("barrel_p2p.hrl").
-include_lib("hlc/include/hlc.hrl").
%% Registered name of this feature's replication instance.
-define(REPLICA, barrel_p2p_registry_replica).
%% API
-export([start_link/0]).
-export([register_service/2, register_service/3, unregister_service/1]).
-export([lookup/1, lookup_local/1, list_services/0]).
-export([get_all_local/0, get_local_ormap/0]).
-export([overlay_lookup/1]).
-export([ensure_proxy/2, get_proxy/1]).
%% Internal API (used by sync)
-export([merge_remote/1, remove_node_entries/1, remove_entry/2]).
%% barrel_p2p_replica callbacks
-export([
replica_merge_delta/2,
replica_apply_full_sync/2,
replica_full_sync_snapshot/1,
replica_remove_node/2
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-define(SERVER, ?MODULE).
-record(state, {
%% Local services: {name, node} -> #service_entry{} (OR-Map)
local = barrel_p2p_ormap:new() :: barrel_p2p_ormap:ormap(),
%% Remote services: {name, node} -> #service_entry{} (OR-Map)
remote = barrel_p2p_ormap:new() :: barrel_p2p_ormap:ormap(),
%% Monitor refs: ref -> {name, pid}
monitors = #{} :: #{reference() => {atom() | binary(), pid()}}
}).
%%====================================================================
%% API
%%====================================================================
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec register_service(atom() | binary(), map()) -> ok | {error, term()}.
register_service(Name, Meta) ->
gen_server:call(?SERVER, {register, Name, Meta}).
%% @doc Register a service with a specific pid (for via callbacks)
-spec register_service(atom() | binary(), pid(), map()) -> ok | {error, term()}.
register_service(Name, Pid, Meta) when is_pid(Pid) ->
gen_server:call(?SERVER, {register_pid, Name, Pid, Meta}).
-spec unregister_service(atom() | binary()) -> ok.
unregister_service(Name) ->
gen_server:call(?SERVER, {unregister, Name}).
-spec lookup(atom() | binary()) -> {ok, [#service_entry{}]} | {error, not_found}.
lookup(Name) ->
gen_server:call(?SERVER, {lookup, Name}).
-spec lookup_local(atom() | binary()) -> {ok, pid()} | {error, not_found}.
lookup_local(Name) ->
gen_server:call(?SERVER, {lookup_local, Name}).
-spec list_services() -> [atom() | binary()].
list_services() ->
gen_server:call(?SERVER, list_services).
-spec get_all_local() -> [#service_entry{}].
get_all_local() ->
gen_server:call(?SERVER, get_all_local).
%% Get the local OR-Map for full sync
-spec get_local_ormap() -> barrel_p2p_ormap:ormap().
get_local_ormap() ->
gen_server:call(?SERVER, get_local_ormap).
%% Lookup using overlay routing (when not found locally/remotely).
%% Normalises the router's `{found, Node, Pid}' shape to the contract
%% callers expect; any other router return is reported as not_found.
-spec overlay_lookup(atom() | binary()) -> {ok, node(), pid()} | {error, not_found}.
overlay_lookup(Name) ->
case barrel_p2p_router:find_service(Name) of
{found, Node, Pid} -> {ok, Node, Pid};
_ -> {error, not_found}
end.
%% Ensure a proxy exists for a remote service
-spec ensure_proxy(atom() | binary(), node()) -> {ok, pid()} | {error, term()}.
ensure_proxy(Name, TargetNode) ->
barrel_p2p_proxy_sup:start_proxy(Name, TargetNode).
%% Get existing proxy for a service
-spec get_proxy(atom() | binary()) -> {ok, pid()} | not_found.
get_proxy(Name) ->
barrel_p2p_proxy_sup:get_proxy(Name).
%% Internal API
%% Merge remote OR-Map delta
-spec merge_remote(barrel_p2p_ormap:ormap()) -> ok.
merge_remote(DeltaMap) ->
gen_server:cast(?SERVER, {merge_remote, DeltaMap}).
-spec remove_node_entries(node()) -> ok.
remove_node_entries(Node) ->
gen_server:cast(?SERVER, {remove_node, Node}).
-spec remove_entry(atom() | binary(), node()) -> ok.
remove_entry(Name, Node) ->
gen_server:cast(?SERVER, {remove_entry, Name, Node}).
%%====================================================================
%% gen_server callbacks
%%====================================================================
init([]) ->
{ok, #state{}}.
handle_call({register, Name, Meta}, {Pid, _}, State) ->
do_register(Name, Pid, Meta, State);
handle_call({register_pid, Name, Pid, Meta}, _From, State) ->
do_register(Name, Pid, Meta, State);
handle_call({unregister, Name}, _From, State) ->
Key = {Name, node()},
case barrel_p2p_ormap:get(Key, State#state.local) of
{ok, _Entry} ->
Local = barrel_p2p_ormap:remove(Key, State#state.local),
%% Find and remove monitor
MonitorRef = find_monitor_for_name(Name, State#state.monitors),
Monitors =
case MonitorRef of
undefined ->
State#state.monitors;
Ref ->
demonitor(Ref, [flush]),
maps:remove(Ref, State#state.monitors)
end,
%% Broadcast removal
barrel_p2p_replica:broadcast_update(?REPLICA, {remove, Key}),
%% Emit service event
barrel_p2p_service_events:notify({service_unregistered, Name, node()}),
{reply, ok, State#state{local = Local, monitors = Monitors}};
not_found ->
{reply, ok, State}
end;
handle_call({lookup, Name}, _From, State) ->
%% Collect entries from local and remote OR-Maps
LocalEntries = collect_entries_by_name(Name, State#state.local),
RemoteEntries = collect_entries_by_name(Name, State#state.remote),
case LocalEntries ++ RemoteEntries of
[] -> {reply, {error, not_found}, State};
Entries -> {reply, {ok, Entries}, State}
end;
handle_call({lookup_local, Name}, _From, State) ->
Key = {Name, node()},
case barrel_p2p_ormap:get(Key, State#state.local) of
not_found -> {reply, {error, not_found}, State};
{ok, #service_entry{pid = Pid}} -> {reply, {ok, Pid}, State}
end;
handle_call(list_services, _From, State) ->
LocalNames = [N || {N, _Node} <- barrel_p2p_ormap:keys(State#state.local)],
RemoteNames = [N || {N, _Node} <- barrel_p2p_ormap:keys(State#state.remote)],
{reply, lists:usort(LocalNames ++ RemoteNames), State};
handle_call(get_all_local, _From, State) ->
Entries = [E || {_Key, E} <- barrel_p2p_ormap:to_list(State#state.local)],
{reply, Entries, State};
handle_call(get_local_ormap, _From, State) ->
{reply, State#state.local, State};
handle_call(_Request, _From, State) ->
{reply, {error, unknown_request}, State}.
handle_cast({merge_remote, DeltaMap}, State) ->
%% Validate the wrapper (dots/HLCs) and the service-entry leaf before
%% absorbing the clock and merging, so a malformed peer delta cannot
%% crash this gen_server or the shared barrel_p2p_hlc server. Only valid
%% entries are absorbed/merged; the rest are dropped.
{Remote, _Accepted} = barrel_p2p_crdt_wire:ingest(
State#state.remote, DeltaMap, fun valid_service_entry/1
),
{noreply, State#state{remote = Remote}};
handle_cast({remove_node, Node}, State) ->
%% Remove all entries from the specified node
Remote = maps:filter(
fun({_Name, N}, _Entry) ->
N =/= Node
end,
State#state.remote
),
%% Invalidate routes to this node
barrel_p2p_router:invalidate_route(Node),
{noreply, State#state{remote = Remote}};
handle_cast({remove_entry, Name, Node}, State) ->
%% Remove specific entry from remote cache
Key = {Name, Node},
Remote = barrel_p2p_ormap:remove(Key, State#state.remote),
%% Invalidate route cache for this service
barrel_p2p_router:invalidate_route(Name),
{noreply, State#state{remote = Remote}};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', Ref, process, Pid, Reason}, State) ->
case maps:take(Ref, State#state.monitors) of
{{Name, Pid}, Monitors} ->
Key = {Name, node()},
Local = barrel_p2p_ormap:remove(Key, State#state.local),
barrel_p2p_replica:broadcast_update(?REPLICA, {remove, Key}),
%% Emit service down event
barrel_p2p_service_events:notify({service_down, Name, node(), Reason}),
{noreply, State#state{local = Local, monitors = Monitors}};
error ->
{noreply, State}
end;
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
%%====================================================================
%% barrel_p2p_replica callbacks
%%====================================================================
%% Merge a peer's delta into the remote map, then surface the change
%% to service-event subscribers. A broadcast delta is single-key, so
%% the key's node is the originating node.
replica_merge_delta(_Name, Delta) ->
merge_remote(Delta),
%% Emit events only for entries that pass validation (the same set the
%% merge accepts); iterating the raw Delta would crash on a malformed
%% entry that does not match {value,_,_} | {tombstone,_}.
Accepted = barrel_p2p_crdt_wire:accept(Delta, fun valid_service_entry/1),
maps:foreach(
fun
({Name, Node}, {value, _Entry, _Dots}) ->
barrel_p2p_service_events:notify({service_registered, Name, Node});
({Name, Node}, {tombstone, _HLC}) ->
barrel_p2p_router:invalidate_route(Name),
barrel_p2p_service_events:notify({service_unregistered, Name, Node})
end,
Accepted
),
ok.
%% A full sync carries a peer's local map; merge it silently (no
%% per-entry events, matching the prior behaviour).
replica_apply_full_sync(_Name, RemoteORMap) ->
merge_remote(RemoteORMap),
ok.
replica_full_sync_snapshot(_Name) ->
ORMap = get_local_ormap(),
case barrel_p2p_ormap:is_empty(ORMap) of
true -> empty;
false -> {sync, ORMap}
end.
replica_remove_node(_Name, Node) ->
remove_node_entries(Node),
barrel_p2p_router:invalidate_route(Node),
ok.
%%====================================================================
%% Internal Functions
%%====================================================================
%% Leaf validator for gossiped service entries: a well-formed
%% #service_entry{}. Used by barrel_p2p_crdt_wire to reject malformed peer
%% values before they reach the OR-Map merge.
valid_service_entry(#service_entry{name = N, pid = P, node = Nd, meta = M}) when
(is_atom(N) orelse is_binary(N)) andalso
is_pid(P) andalso is_atom(Nd) andalso is_map(M)
->
true;
valid_service_entry(_) ->
false.
%% Common registration logic for both register and register_pid
do_register(Name, Pid, Meta, State) ->
Key = {Name, node()},
case barrel_p2p_ormap:get(Key, State#state.local) of
{ok, _} ->
{reply, {error, already_registered}, State};
not_found ->
Entry = #service_entry{name = Name, pid = Pid, node = node(), meta = Meta},
Local = barrel_p2p_ormap:add(Key, Entry, State#state.local),
Ref = monitor(process, Pid),
Monitors = maps:put(Ref, {Name, Pid}, State#state.monitors),
%% Broadcast delta via sync
barrel_p2p_replica:broadcast_update(?REPLICA, {add, Key, Entry}),
%% Emit service event
barrel_p2p_service_events:notify({service_registered, Name, node()}),
{reply, ok, State#state{local = Local, monitors = Monitors}}
end.
find_monitor_for_name(Name, Monitors) ->
case [Ref || {Ref, {N, _Pid}} <- maps:to_list(Monitors), N =:= Name] of
[Ref | _] -> Ref;
[] -> undefined
end.
%% Collect all entries matching a service name from an OR-Map
collect_entries_by_name(Name, ORMap) ->
[Entry || {{N, _Node}, Entry} <- barrel_p2p_ormap:to_list(ORMap), N =:= Name].