%% @author Marc Worrell <marc@worrell.nl>
%% @copyright 2009-2017 Marc Worrell
%%
%% @doc Simple implementation of an observer/notifier. Relays events to observers of that event.
%% Also implements map and fold operations over the observers.
%% Copyright 2009-2017 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(zotonic_notifier_worker).
-author("Marc Worrell <marc@worrell.nl>").
-behaviour(gen_server).
-include_lib("kernel/include/logger.hrl").
%% gen_server exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([start_link/1]).
%% interface functions
-export([
start_tests/0,
observe/5,
detach/3,
detach_all/2,
get_observers/1,
get_observers/2,
notify_sync/4,
notify_async/4,
notify1/4,
first/4,
map/4,
foldl/5,
foldr/5,
await/4,
await_exact/4
]).
%% internal
-export([notify_observer/4]).
-define(TIMEOUT, 60000).
-record(state, {
name :: atom(),
observers :: ets:tab(), % { term(), [ {prio, zotonic_notifier:observer(), pid()} ] }
monitors :: map(), % pid() => reference()
pid2event :: map() % pid() => [ term() ]
}).
%%====================================================================
%% API
%%====================================================================
%% @doc Starts the notification server
-spec start_link(Name :: atom()) -> {ok, pid()} | {error, term()}.
start_link(Name) ->
gen_server:start_link({local, Name}, ?MODULE, Name, []).
%% @doc Start a notifier server for unit testing
-spec start_tests() -> {ok, pid()} | {error, term()}.
start_tests() ->
io:format("Starting notifier server.~n"),
start_link(test).
%%====================================================================
%% API for subscription
%%====================================================================
%% @doc Subscribe to an event. Observer is a {M,F} or pid()
-spec observe(zotonic_notifier:notifier(), zotonic_notifier:event(), zotonic_notifier:observer(),
pid(), integer()) -> ok | {error, term()}.
observe(Notifier, Event, Observer, OwnerPid, Prio) ->
gen_server:call(Notifier, {observe, Event, Observer, OwnerPid, Prio}, infinity).
%% @doc Detach all observers for the owner
-spec detach_all(zotonic_notifier:notifier(), pid()) -> ok | {error, term()}.
detach_all(Notifier, OwnerPid) when is_pid(OwnerPid) ->
gen_server:call(Notifier, {detach_all, OwnerPid}, infinity).
%% @doc Unsubscribe an owner-pid from an event.
-spec detach(zotonic_notifier:notifier(), zotonic_notifier:event(), pid()) -> ok | {error, term()}.
detach(Notifier, Event, OwnerPid) ->
gen_server:call(Notifier, {detach, Event, OwnerPid}, infinity).
%% @doc Return all observers
-spec get_observers(zotonic_notifier:notifier()) -> list().
get_observers(Notifier) ->
Table = observer_table_name(Notifier),
lists:flatten( ets:match(Table, '$1') ).
%% @doc Return all observers for a particular event
-spec get_observers(zotonic_notifier:notifier(), zotonic_notifier:event()) -> list().
get_observers(Notifier, Event) ->
Table = observer_table_name(Notifier),
case ets:lookup(Table, Event) of
[] -> [];
[{Event, Observers}] -> Observers
end.
%%====================================================================
%% API for notification
%% Calls are done in the calling process, to prevent copying of
%% possibly large contexts for small notifications.
%%====================================================================
%% @doc Cast the event to all observers. The prototype of the observer is: f(Msg, Context) -> void
notify_sync(Notifier, Event, Msg, ContextArg) ->
case get_observers(Notifier, Event) of
[] ->
ok;
Observers ->
lists:foreach(
fun(Obs) ->
notify_observer(Msg, Obs, false, ContextArg)
end,
Observers)
end.
%% @doc Cast the event to all observers. The prototype of the observer is: f(Msg, Context) -> void
notify_async(Notifier, Event, Msg, ContextArg) ->
case get_observers(Notifier, Event) of
[] ->
ok;
Observers ->
case lists:all(fun is_pid_observer/1, Observers) of
true ->
lists:foreach(
fun(Obs) ->
notify_observer(Msg, Obs, false, ContextArg)
end,
Observers);
false ->
MD = logger:get_process_metadata(),
erlang:spawn(
fun() ->
set_process_metadata(MD),
lists:foreach(
fun(Obs) ->
notify_observer(Msg, Obs, false, ContextArg)
end,
Observers)
end),
ok
end
end.
%% @doc Cast the event to the first observer. The prototype of the observer is: f(Msg, Context) -> void
notify1(Notifier, Event, Msg, ContextArg) ->
case get_observers(Notifier, Event) of
[] -> ok;
[ {_, Pid, _} = Obs | _ ] when is_pid(Pid) ->
notify_observer(Msg, Obs, false, ContextArg);
[ Obs | _ ] ->
MD = logger:get_process_metadata(),
erlang:spawn(
fun() ->
set_process_metadata(MD),
notify_observer(Msg, Obs, false, ContextArg)
end)
end.
set_process_metadata(undefined) -> ok;
set_process_metadata(MD) -> logger:set_process_metadata(MD).
%% @doc Call all observers till one returns something else than undefined.
%% The prototype of the observer is: f(Msg, Context)
first(Notifier, Event, Msg, ContextArg) ->
Observers = get_observers(Notifier, Event),
first_1(Observers, Msg, ContextArg).
first_1([], _Msg, _ContextArgs) ->
undefined;
first_1([Obs|Rest], Msg, ContextArg) ->
case notify_observer(Msg, Obs, true, ContextArg) of
undefined -> first_1(Rest, Msg, ContextArg);
continue -> first_1(Rest, Msg, ContextArg);
{continue, Msg1} -> first_1(Rest, Msg1, ContextArg);
Result -> Result
end.
%% @doc Call all observers, return the list of answers. The prototype of the
%% observer is: f(Msg, ContextArg)
map(Notifier, Event, Msg, ContextArg) ->
Observers = get_observers(Notifier, Event),
lists:map(
fun(Obs) ->
notify_observer(Msg, Obs, true, ContextArg)
end,
Observers).
%% @doc Do a fold over all observers, prio 1 observers first. The prototype of
%% the observer is: f(Msg, Acc, ContextArg)
foldl(Notifier, Event, Msg, Acc0, ContextArg) ->
Observers = get_observers(Notifier, Event),
lists:foldl(
fun(Obs, Acc) ->
notify_observer_fold(Msg, Obs, Acc, ContextArg)
end,
Acc0,
Observers).
%% @doc Do a fold over all observers, prio 1 observers last
foldr(Notifier, Event, Msg, Acc0, ContextArg) ->
Observers = get_observers(Notifier, Event),
lists:foldr(
fun(Obs, Acc) ->
notify_observer_fold(Msg, Obs, Acc, ContextArg)
end,
Acc0,
Observers).
%% @doc Subscribe once to a notification, detach after receiving the notification.
-spec await(zotonic_notifier:notifier(), zotonic_notifier:event(), atom()|tuple(), pos_integer()) ->
{ok, tuple()|atom()} |
{ok, {pid(), reference()}, tuple()|atom()} |
{error, timeout}.
await(Notifier, Event, Msg, Timeout) ->
observe(Notifier, Event, self(), self(), 1),
await_1(Notifier, Event, Msg, Timeout).
await_1(Notifier, Event, Msg, Timeout) ->
Result = await_receive(Msg, Timeout),
detach(Notifier, Event, self()),
Result.
await_receive(Msg, Timeout) when is_atom(Msg) ->
receive
Msg -> {ok, Msg};
M when is_tuple(M), element(1, M) =:= Msg -> {ok, M};
{'$gen_cast', Msg} -> {ok, Msg};
{'$gen_cast', M} when is_tuple(M), element(1, M) =:= Msg -> {ok, M};
{'$gen_call', From, Msg} -> {ok, From, Msg};
{'$gen_call', From, M} when is_tuple(M), element(1, M) =:= Msg -> {ok, From, M}
after Timeout ->
{error, timeout}
end.
-spec await_exact(zotonic_notifier:notifier(), zotonic_notifier:event(), term(), pos_integer()) ->
{ok, term()} |
{ok, {pid(), reference()}, term()} |
{error, timeout}.
await_exact(Notifier, Event, Msg, Timeout) ->
observe(Notifier, Event, self(), self(), 1),
Result = receive
Msg -> {ok, Msg};
{'$gen_cast', Msg} -> {ok, Msg};
{'$gen_call', From, Msg} -> {ok, From, Msg}
after Timeout ->
{error, timeout}
end,
detach(Notifier, Event, self()),
Result.
%%====================================================================
%% gen_server callbacks
%%====================================================================
%% @doc Initiates the server, creates a new observer list
-spec init(atom()) -> {ok, #state{}}.
init(Name) ->
logger:set_process_metadata(#{
name => Name,
module => ?MODULE
}),
Table = observer_table_name(Name),
ets:new(Table, [ named_table, set, {keypos, 1}, protected ]),
State = #state{
name = Name,
observers = Table,
monitors = #{},
pid2event = #{}
},
{ok, State}.
%% @doc Add an observer to an event
handle_call({observe, Event, Observer, OwnerPid, Prio}, _From, State) ->
do_add_observer(State#state.observers, Event, {Prio, Observer, OwnerPid}),
State1 = do_add_monitor(OwnerPid, State),
State2 = do_add_pid2event(OwnerPid, Event, State1),
{reply, ok, State2};
%% @doc Detach an observer from an event
handle_call({detach, Event, OwnerPid}, _From, State) ->
do_detach_observer(State#state.observers, Event, OwnerPid),
State1 = do_remove_pid2event(OwnerPid, Event, State),
{reply, ok, State1};
%% @doc Detach all observers owned by a pid
handle_call({detach_all, OwnerPid}, _From, State) ->
State1 = do_detach_all(OwnerPid, State),
{reply, ok, State1};
%% @doc Trap unknown calls
handle_call(Message, _From, State) ->
{stop, {unknown_call, Message}, State}.
%% @doc Trap unknown casts
handle_cast(Message, State) ->
{stop, {unknown_cast, Message}, State}.
%% @doc Handling all non call/cast messages
handle_info({'DOWN', _MonitorRef, process, Pid, _Info}, State) ->
State1 = do_detach_all(Pid, State),
{noreply, State1};
handle_info(_Info, State) ->
{noreply, State}.
-spec terminate(term(), #state{}) -> ok.
terminate(_Reason, _State) ->
ok.
%% @doc Convert process state when code is changed
-spec code_change(term(), #state{}, term()) -> {ok, #state{}}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%====================================================================
%% support functions
%%====================================================================
is_pid_observer({_Prio, Observer, _OwnerPid}) ->
is_pid(Observer).
do_detach_all(OwnerPid, #state{ observers = Table, pid2event = Ps, monitors = Ms } = State) ->
case maps:find(OwnerPid, Ps) of
{ok, Es} ->
lists:foreach(
fun(Event) ->
do_detach_observer(Table, Event, OwnerPid)
end,
Es);
error ->
ok
end,
State#state{
pid2event = maps:remove(OwnerPid, Ps),
monitors = maps:remove(OwnerPid, Ms)
}.
do_detach_observer(Table, Event, OwnerPid) ->
case ets:lookup(Table, Event) of
[] ->
ok;
[{Event, Observers}] ->
UpdatedObservers = lists:filter(
fun({_Prio, _Obs, Pid}) ->
Pid =/= OwnerPid
end,
Observers),
ets:insert(Table, {Event, UpdatedObservers})
end.
do_remove_pid2event(OwnerPid, Event, #state{ pid2event = P2E } = State) ->
case maps:find(OwnerPid, P2E) of
{ok, [ Event ]} ->
State#state{ pid2event = maps:remove(OwnerPid, P2E) };
{ok, Es} ->
Es1 = [ E || E <- Es, E =/= Event ],
State#state{ pid2event = P2E#{ OwnerPid => Es1 } };
error ->
State
end.
do_add_observer(Table, Event, {_Prio, Observer, OwnerPid} = PObs) ->
UpdatedObservers = case ets:lookup(Table, Event) of
[] ->
[ PObs ];
[{Event, Observers}] ->
% Prevent double observers, remove old observe first
OtherObservers = lists:filter(
fun({_, Obs, Pid}) ->
Obs =/= Observer orelse Pid =/= OwnerPid
end,
Observers),
lists:sort([ PObs | OtherObservers ])
end,
ets:insert(Table, {Event, UpdatedObservers}).
do_add_pid2event(Pid, Event, #state{ pid2event = P2E } = State) ->
case maps:find(Pid, P2E) of
{ok, Es} ->
case lists:member(Event, Es) of
true ->
State;
false ->
P2E1 = P2E#{ Pid => [ Event | Es ] },
State#state{ pid2event = P2E1 }
end;
error ->
P2E1 = P2E#{ Pid => [ Event ] },
State#state{ pid2event = P2E1 }
end.
do_add_monitor(Pid, #state{ monitors = Monitors } = State) ->
case maps:find(Pid, Monitors) of
{ok, _} ->
State;
error ->
MRef = erlang:monitor(process, Pid),
State#state{ monitors = Monitors#{ Pid => MRef } }
end.
% Return the name of the observer table
%
observer_table_name(zotonic_notifier) ->
'observers$zotonic_notifier';
observer_table_name(Name) when is_atom(Name) ->
list_to_atom("observers$" ++ atom_to_list(Name)).
%% @doc Notify an observer of an event
notify_observer(Msg, {_Prio, Pid, _OwnerPid}, true, ContextArg) when is_pid(Pid) ->
try
gen_server:call(Pid, {Msg, ContextArg}, ?TIMEOUT)
catch
EM:E:Trace ->
?LOG_ERROR(#{
text => <<"Error notifying observer">>,
in => zotonic_notifier,
result => EM,
reason => E,
pid => Pid,
event => Msg,
stack => Trace
}),
{error, {notify_observer, Pid, Msg, EM, E}}
end;
notify_observer(Msg, {_Prio, Pid, _OwnerPid}, false, ContextArg) when is_pid(Pid) ->
gen_server:cast(Pid, {Msg, ContextArg});
notify_observer(Msg, {_Prio, {M, F}, _OwnerPid}, _IsCall, ContextArg) ->
erlang:apply(M, F, [ Msg, ContextArg ]);
notify_observer(Msg, {_Prio, {M, F, As}, _OwnerPid}, _IsCall, ContextArg) ->
erlang:apply(M, F, As ++ [ Msg, ContextArg ]).
%% @doc Notify an observer of an event, used in fold operations. The receiving function should accept the message, the
%% accumulator and the context.
notify_observer_fold(Msg, {_Prio, Pid, _OwnerPid}, Acc, ContextArg) when is_pid(Pid) ->
try
gen_server:call(Pid, {Msg, Acc, ContextArg}, ?TIMEOUT)
catch
EM:E:Trace ->
?LOG_ERROR(#{
text => <<"Error folding observers">>,
in => zotonic_notifier,
pid => Pid,
result => EM,
reason => E,
message => Msg,
stack => Trace
}),
Acc
end;
notify_observer_fold(Msg, {_Prio, {M, F}, _OwnerPid}, Acc, ContextArg) ->
erlang:apply(M, F, [ Msg, Acc, ContextArg ]);
notify_observer_fold(Msg, {_Prio, {M, F, As}, _OwnerPid}, Acc, ContextArg) ->
erlang:apply(M, F, As ++ [ Msg, Acc, ContextArg ]).