Skip to main content

src/barrel_p2p_service_events.erl

%%% -*- erlang -*-
%%% Copyright (c) 2026 Benoit Chesneau
%%% SPDX-License-Identifier: Apache-2.0
%%%
-module(barrel_p2p_service_events).
-behaviour(gen_server).

%% API
-export([start_link/0]).
-export([notify/1, subscribe/1, subscribe/2, unsubscribe/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-define(SERVER, ?MODULE).

-record(state, {
    subscribers = #{} :: #{pid() => {reference(), filter()}}
}).

-type filter() :: all | {name, atom() | binary()} | {pattern, binary()}.

%%====================================================================
%% API
%%====================================================================

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

-spec notify(term()) -> ok.
notify(Event) ->
    gen_server:cast(?SERVER, {notify, Event}).

-spec subscribe(pid()) -> ok.
subscribe(Pid) ->
    subscribe(Pid, all).

-spec subscribe(pid(), filter()) -> ok.
subscribe(Pid, Filter) ->
    gen_server:call(?SERVER, {subscribe, Pid, Filter}).

-spec unsubscribe(pid()) -> ok.
unsubscribe(Pid) ->
    gen_server:call(?SERVER, {unsubscribe, Pid}).

%%====================================================================
%% gen_server callbacks
%%====================================================================

init([]) ->
    {ok, #state{}}.

handle_call({subscribe, Pid, Filter}, _From, State) ->
    case maps:is_key(Pid, State#state.subscribers) of
        true ->
            %% Update filter for existing subscriber
            {Ref, _OldFilter} = maps:get(Pid, State#state.subscribers),
            Subs = maps:put(Pid, {Ref, Filter}, State#state.subscribers),
            {reply, ok, State#state{subscribers = Subs}};
        false ->
            Ref = monitor(process, Pid),
            Subs = maps:put(Pid, {Ref, Filter}, State#state.subscribers),
            {reply, ok, State#state{subscribers = Subs}}
    end;
handle_call({unsubscribe, Pid}, _From, State) ->
    case maps:take(Pid, State#state.subscribers) of
        {{Ref, _Filter}, Subs} ->
            demonitor(Ref, [flush]),
            {reply, ok, State#state{subscribers = Subs}};
        error ->
            {reply, ok, State}
    end;
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast({notify, Event}, State) ->
    maps:foreach(
        fun(Pid, {_Ref, Filter}) ->
            case matches_filter(Event, Filter) of
                true -> Pid ! {barrel_p2p_service_event, Event};
                false -> ok
            end
        end,
        State#state.subscribers
    ),
    {noreply, State};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'DOWN', Ref, process, Pid, _Reason}, State) ->
    case maps:get(Pid, State#state.subscribers, undefined) of
        {Ref, _Filter} ->
            Subs = maps:remove(Pid, State#state.subscribers),
            {noreply, State#state{subscribers = Subs}};
        _ ->
            {noreply, State}
    end;
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

%%====================================================================
%% Internal Functions
%%====================================================================

matches_filter(_Event, all) ->
    true;
matches_filter(Event, {name, Name}) ->
    get_event_name(Event) =:= Name;
matches_filter(Event, {pattern, Pattern}) ->
    case get_event_name(Event) of
        undefined ->
            false;
        Name ->
            NameBin = to_binary(Name),
            case re:run(NameBin, Pattern) of
                {match, _} -> true;
                nomatch -> false
            end
    end.

get_event_name({service_registered, Name, _Node}) -> Name;
get_event_name({service_unregistered, Name, _Node}) -> Name;
get_event_name({service_down, Name, _Node, _Reason}) -> Name;
get_event_name(_) -> undefined.

to_binary(Name) when is_atom(Name) -> atom_to_binary(Name, utf8);
to_binary(Name) when is_binary(Name) -> Name;
to_binary(Name) when is_list(Name) -> list_to_binary(Name).