src/partisan_monitor.erl

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Christopher Meiklejohn.  All Rights Reserved.
%% Copyright (c) 2022 Alejandro M. Ramallo. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% -----------------------------------------------------------------------------
%% @doc This module is responsible for monitoring processes on remote nodes and
%% implementing the monitoring API provided by the `partisan' module which
%% follows the API provided by the Erlang modules `erlang' and `net_kernel'.
%%
%% <strong>YOU SHOULD NEVER USE the functions in this module directly.</strong>
%% Use the related functions in {@link partisan} instead.
%%
%% <blockquote class="warning">
%% <h4 class="warning">NOTICE</h4>
%% <p>At the moment this only works for
%% <code class="inline">partisan_pluggable_peer_service_manager</code> backend.
%% </p>
%% <p>Also, certain partisan_peer_service_manager implementations might not
%% support the
%% <code class="inline">partisan_peer_service_manager:on_up/2</code> and
%% <code class="inline">partisan_peer_service_manager:on_down/2</code>
%%  callbacks which we need for node monitoring, so in those cases this module
%%  will not work.
%% </p>
%% </blockquote>
%%
%% @TODO on a monitor request this server should monitor the caller so that we
%% can GC its references locally. This applies not only to monitor/2 but also
%% to monitor_node/2 and monitor_nodes/2 (which means all or part of their
%% logic should run on the server).
%% @TODO to improve on concurrency/latency we will need to have a shard of
%% servers
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_monitor).

-behaviour(partisan_gen_server).

-include("partisan.hrl").
-include("partisan_logger.hrl").

%% persistent_term key under which init/1 stores the node-wide static dummy
%% monitor reference. It is returned instead of a real monitor for requests
%% that would otherwise be circular (e.g. monitoring a partisan_monitor
%% server).
-define(DUMMY_MREF_KEY, {?MODULE, monitor_ref}).
%% Reads the flag stored by init/1 telling whether the peer service manager in
%% use supports the capabilities required for monitoring.
-define(IS_ENABLED, persistent_term:get({?MODULE, enabled})).

%% Table recording remote processes monitoring a local process.
%% Stores proc_mon_in() records.
-define(PROC_MON_IN, partisan_proc_mon_in).

%% An index over ?PROC_MON_IN:
%% refs grouped by the monitoring node, used to notify/cleanup on a nodedown
%% signal. Contains objects of type proc_mon_in_idx().
-define(PROC_MON_IN_IDX, partisan_proc_mon_in_idx).

%% Table recording local processes monitoring remote processes.
%% For every record in the remote node's ?PROC_MON_IN we have a companion
%% record in this table. This lets us send a signal to the local monitoring
%% process when the remote node crashes or the connection is lost.
%% Contains objects of type proc_mon_out().
-define(PROC_MON_OUT, partisan_proc_mon_out).

%% An index over ?PROC_MON_OUT:
%% refs grouped by node, used to notify/cleanup on a nodedown signal.
%% Contains objects of type proc_mon_out_idx().
-define(PROC_MON_OUT_IDX, partisan_proc_mon_out_idx).

%% Table recording local processes monitoring nodes.
%% Contains objects of type node_mon().
-define(NODE_MON, partisan_node_mon).

%% Local pids that are monitoring all nodes of a certain type.
%% Contains objects of type node_type_mon().
-define(NODE_TYPE_MON, partisan_node_type_mon).


-record(state, {
    %% Whether monitoring is enabled. This depends on the
    %% partisan_peer_service_manager offering support for on_up/on_down.
    enabled                         ::  boolean(),
    %% A map to store async requests
    requests                        ::  #{reference() => pid()},
    %% A cached snapshot of the nodes we are currently connected to, kept up
    %% to date from nodeup/nodedown signals. It is meant to let us notify the
    %% subscriptions if we are terminated (NOTE(review): terminate/2 does not
    %% currently use it). Depending on the peer_service_manager backend
    %% topology, this might be a partial view of the whole cluster.
    nodes                           ::  sets:set(node())
}).


%% Stored on the monitored node: a monitor this server holds on a local process
%% on behalf of a remote monitoring process.
-record(partisan_proc_mon_in, {
    %% The local monitor reference obtained by erlang:monitor/2
    ref                             ::  reference(),
    %% The local process that is being monitored (a pid or a registered name)
    monitored                       ::  pid() | atom(),
    %% A remote process monitoring a local process (monitored)
    monitor                         ::  partisan:remote_pid()
                                        | partisan:remote_name(),
    %% The channel signals should be forwarded on
    channel                         ::  partisan:channel()
}).

%% Stored on the monitoring node: a monitor a local process holds on a remote
%% process, used to simulate a DOWN signal if the remote node goes away.
-record(partisan_proc_mon_out, {
    %% The remote monitor reference (as returned by the remote
    %% partisan_monitor server)
    ref                             ::  partisan:remote_reference(),
    %% The remote process being monitored
    monitored                       ::  partisan:remote_pid()
                                        | partisan:remote_name(),
    %% A local process monitoring the remote process
    monitor                         ::  pid() | atom()
}).


%% Describes a node-type subscription (objects of ?NODE_TYPE_MON).
-record(partisan_node_type_mon, {
    %% The subscribing process paired with an integer hash
    %% (NOTE(review): presumably a hash of the subscription options; confirm)
    key                             ::  {Monitor :: pid(), Hash :: integer()},
    %% The type of nodes the subscriber is interested in
    node_type                       ::  all | visible | hidden,
    %% Presumably whether nodedown messages include the reason
    %% (cf. InclReason in node_type_mon_opts()); confirm
    nodedown_reason                 ::  boolean()
}).


%% TODO add channel
%% NOTE(review): #partisan_proc_mon_in{} already has a channel field; confirm
%% whether this TODO is still relevant.
%% Objects stored in ?PROC_MON_IN
-type proc_mon_in()                 ::  #partisan_proc_mon_in{}.
%% {MonitorNode, LocalMonitorRef}: objects stored in ?PROC_MON_IN_IDX
-type proc_mon_in_idx()             ::  {node(), reference()}.
%% Objects stored in ?PROC_MON_OUT
-type proc_mon_out()                ::  #partisan_proc_mon_out{}.
%% {MonitoredNode, RemoteMonitorRef}: objects stored in ?PROC_MON_OUT_IDX
-type proc_mon_out_idx()            ::  {node(), partisan:remote_reference()}.
%% {Node, MonitoringPid}: objects stored in ?NODE_MON
-type node_mon()                    ::  {node(), pid()}.
%% Objects stored in ?NODE_TYPE_MON
-type node_type_mon()               ::  #partisan_node_type_mon{}.
%% Presumably the parsed form of the monitor_nodes/2 options; confirm
-type node_type_mon_opts()          ::  {
                                            Type :: all | visible | hidden,
                                            InclReason :: boolean()
                                        }.


% API
-export([demonitor/2]).
-export([monitor/2]).
-export([monitor_node/2]).
-export([monitor_nodes/2]).
-export([start_link/0]).

%% gen_server callbacks
-export([code_change/3]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([init/1]).
-export([terminate/2]).

-compile({no_auto_import, [monitor_node/2]}).
-compile({no_auto_import, [monitor/3]}).
-compile({no_auto_import, [demonitor/2]}).




%% =============================================================================
%% API
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @doc Starts the `partisan_monitor' server, registered locally under this
%% module's name and spawned with the parallel signal optimisation options.
%%
%% There is one `partisan_monitor' server instance per node.
%% @end
%% -----------------------------------------------------------------------------
start_link() ->
    SpawnOpts = ?PARALLEL_SIGNAL_OPTIMISATION([]),
    partisan_gen_server:start_link(
        {local, ?MODULE}, ?MODULE, [], [{spawn_opt, SpawnOpts}]
    ).


%% -----------------------------------------------------------------------------
%% @doc Monitors `Process', a partisan remote pid or remote name that may be
%% local to this node or live on another node.
%%
%% When monitoring a remote process it is not guaranteed that you will receive
%% the `DOWN' message. Reasons for not receiving it include message loss and
%% tree reconfiguration.
%%
%% Options: for a local process `Opts' are handed to `partisan:monitor/3'. For
%% a remote process, `channel' selects the channel signals are forwarded on
%% (default `?DEFAULT_CHANNEL'). `channel_fallback' (default `true') allows
%% falling back to the default channel when the requested one is not
%% connected. If the node is not reachable, the returned reference is
%% immediately followed by a `DOWN' message with reason `noconnection'.
%%
%% Monitoring a `partisan_monitor' server returns a node-wide static dummy
%% reference for which no `DOWN' message is ever delivered.
%%
%% Failure:
%% <dl>
%% <dt>`notalive'</dt><dd>If the partisan_monitor server process is not
%% alive.</dd>
%% <dt>`not_implemented'</dt><dd>If the partisan peer service manager in use
%% doesn't support the capabilities required for monitoring.</dd>
%% <dt>`badarg'</dt><dd>If any of the arguments are invalid.</dd>
%% </dl>
%% @end
%% -----------------------------------------------------------------------------
-spec monitor(
    Process :: partisan:remote_pid() | partisan:remote_name(),
    Opts :: [partisan:monitor_opt()]) -> partisan:remote_reference() | no_return().

monitor(Process, Opts) when is_list(Opts) ->
    IsValidTarget =
        partisan_remote_ref:is_pid(Process)
            orelse partisan_remote_ref:is_name(Process),
    IsValidTarget orelse error(badarg),

    %% Requests targeting a partisan_monitor server (ours or a remote one)
    %% would be circular, since partisan_gen_server and partisan_gen use this
    %% server for monitoring. They are answered with the static dummy ref.
    case is_monitor_server(Process) of
        false ->
            case partisan_remote_ref:is_local(Process) of
                false ->
                    Node = partisan_remote_ref:node(Process),
                    Channel = get_option(channel, Opts, ?DEFAULT_CHANNEL),
                    Fallback = get_option(channel_fallback, Opts, true),
                    %% Reachable when the requested channel is connected, or
                    %% when fallback is allowed and the default one is.
                    Connected =
                        case Channel =:= ?DEFAULT_CHANNEL of
                            true ->
                                partisan_peer_connections:is_connected(Node);
                            false ->
                                partisan_peer_connections:is_connected(
                                    Node, Channel
                                ) orelse (
                                    Fallback == true andalso
                                    partisan_peer_connections:is_connected(Node)
                                )
                        end,
                    monitor(Process, Opts, {connected, Connected});

                true ->
                    %% partisan:monitor will coerce Opts to erlang monitor opts
                    partisan:monitor(
                        process,
                        partisan_remote_ref:to_pid_or_name(Process),
                        Opts
                    )
            end;

        true ->
            persistent_term:get(?DUMMY_MREF_KEY)
    end.


%% -----------------------------------------------------------------------------
%% @doc Removes the monitor identified by `MPRef', a reference returned by
%% monitor/2.
%%
%% The local bookkeeping for `MPRef' is removed first. Then the
%% `partisan_monitor' server on the node encoded in `MPRef' is asked to remove
%% the monitor. If that server cannot be reached (no connection, timeout, node
%% down) we return `true', assuming it has already cleaned up our references.
%% With the `flush' option, one pending `{_, MPRef, _, _, _}' message is
%% removed from the caller's mailbox.
%%
%% Dummy references and calls made by the monitor server itself return `true'
%% immediately.
%%
%% Failure:
%% <ul>
%% <li>`notalive' if the partisan_monitor process is not alive.</li>
%% <li>`not_implemented' if the partisan peer service manager does not support
%% the required capabilities required for monitoring.</li>
%% <li>`badarg' if any of the arguments is invalid.</li>
%% </ul>
%% @end
%% -----------------------------------------------------------------------------
-spec demonitor(
    MonitoredRef :: partisan:remote_reference(),
    Opts :: [partisan:demonitor_opt()]
    ) -> boolean() | no_return().

demonitor(MPRef, Opts) ->
    case partisan_remote_ref:is_reference(MPRef) of
        true ->
            ok;
        false ->
            erlang:error(badarg, [MPRef, Opts], [
                {error_info, #{
                    cause => #{1 => "not a partisan remote reference"}
                }}
            ])
    end,

    IsDummyRef = MPRef == persistent_term:get(?DUMMY_MREF_KEY),

    case IsDummyRef orelse is_monitor_server() of
        true ->
            true;

        false ->
            Node = partisan_remote_ref:node(MPRef),
            ok = del_proc_mon_out(MPRef),
            ok = del_proc_mon_out_idx(Node, MPRef),

            Result = call({?MODULE, Node}, {demonitor, MPRef, Opts}, 3000),

            case Result of
                {ok, Bool} ->
                    case lists:member(flush, Opts) of
                        false ->
                            ok;
                        true ->
                            receive
                                {_, MPRef, _, _, _} -> ok
                            after 0 ->
                                ok
                            end
                    end,
                    Bool;

                {error, Unreachable}
                  when Unreachable =:= noconnection;
                       Unreachable =:= timeout;
                       Unreachable =:= noproc ->
                    true;

                {error, {nodedown, _}} ->
                    true;

                {error, Reason} ->
                    erlang:error(
                        Reason,
                        [MPRef, Opts],
                        [{error_info, #{cause => Reason}}]
                    )
            end
    end.


%% -----------------------------------------------------------------------------
%% @doc Monitor the status of the node `Node'. If `Flag' is `true', monitoring
%% is turned on. If `Flag' is `false', monitoring is turned off.
%%
%% Making several calls to `monitor_node(Node, true)' for the same `Node' is
%% not an error; it results in as many independent monitoring instances as the
%% number of different calling processes. However, if a process has made two
%% calls to `monitor_node(Node, true)' and `Node' terminates, only one
%% `nodedown' message is delivered to the process (this differs from
%% {@link erlang:monitor_node/2}).
%%
%% If `Node' fails or does not exist, the message `{nodedown, Node}' is
%% delivered to the calling process. If there is no connection to `Node' when
%% monitoring is requested, the `nodedown' message is delivered immediately
%% and the request is not recorded. As a result, when using a membership
%% strategy that uses a partial view, you cannot monitor nodes that are not
%% members of the view.
%%
%% Failure:
%% <ul>
%% <li>`notalive' if the partisan_monitor process is not alive.</li>
%% <li>`not_implemented' if the partisan peer service manager does not support
%% the required capabilities required for monitoring.</li>
%% <li>`badarg' if `Node' is not a node name (or a node spec with a `name'
%% key) or `Flag' is not a boolean.</li>
%% </ul>
%%
%% This function is executed in the calling process.
%% @end
%% -----------------------------------------------------------------------------
-spec monitor_node(node() | partisan:node_spec(), boolean()) -> true.

monitor_node(#{name := Node}, Flag) ->
    monitor_node(Node, Flag);

monitor_node(Node, false) when is_atom(Node) ->
    %% Turning monitoring off does not depend on the connection status, so
    %% there is no need to look the connection up.
    del_node_monitor(Node, self());

monitor_node(Node, true) when is_atom(Node) ->
    %% TODO We need the server to monitor the caller, so that we can cleanup
    %% if the caller crashes!! Or store them in the process dictionary.
    case partisan_peer_connections:is_connected(Node) of
        true ->
            add_node_monitor(Node, self());

        false ->
            %% The node is down.
            %% We don't record the request and immediately send a
            %% nodedown signal.
            self() ! {nodedown, Node},
            true
    end;

monitor_node(Node, Flag) ->
    %% Documented failure mode for invalid arguments (previously a
    %% function_clause or case_clause crash).
    erlang:error(badarg, [Node, Flag]).


%% -----------------------------------------------------------------------------
%% @doc Subscribes (`Flag = true') or unsubscribes (`Flag = false') the
%% calling process to node status change messages. A `nodeup' message is
%% delivered to all subscribing processes when a new node is connected, and a
%% `nodedown' message is delivered when a node is disconnected.
%%
%% Unsubscribing stops all previous subscriptions started with the same
%% options. Two option lists are considered the same if they contain the same
%% set of options.
%%
%% Requests for `hidden' nodes are accepted but have no effect, because
%% Partisan has no hidden nodes. Returns `error' when monitoring is not enabled
%% for the peer service manager in use.
%%
%% Notice that the following two disterl guarantees are NOT yet provided by
%% Partisan:
%% <ul>
%% <li>`nodeup' messages are delivered before delivery of any message from the
%% remote node passed through the newly established connection.</li>
%% <li>`nodedown' messages are not delivered until all messages from the remote
%% node that have been passed through the connection have been delivered.</li>
%% </ul>
%%
%% This function is executed in the calling process.
%% @end
%% -----------------------------------------------------------------------------
-spec monitor_nodes(Flag :: boolean(), [partisan:monitor_nodes_opt()]) ->
    ok | error | {error, notalive | not_implemented | badarg}.

monitor_nodes(Flag, Opts0) when is_boolean(Flag), is_list(Opts0) ->
    case ?IS_ENABLED of
        false ->
            error;

        true ->
            case {Flag, parse_nodemon_opts(Opts0)} of
                {_, {hidden, _}} ->
                    %% Nothing to do, Partisan has no hidden nodes
                    ok;
                {true, Opts} ->
                    add_node_type_mon(self(), Opts);
                {false, Opts} ->
                    del_note_type_mon(self(), Opts)
            end
    end.



%% =============================================================================
%% GEN_SERVER CALLBACKS
%% =============================================================================



init([]) ->
    %% We trap exits so that we get a call to terminate w/reason shutdown when
    %% the supervisor terminates us. This happens when
    %% partisan_peer_service:manager() is terminated.
    erlang:process_flag(trap_exit, true),

    %% We subscribe to node status to implement node monitoring.
    %% Certain partisan_peer_service_manager implementations might not support
    %% the on_up and on_down events which we need for node monitoring, so in
    %% those cases this module will not work.
    Enabled = subscribe_to_node_status(),
    _ = persistent_term:put({?MODULE, enabled}, Enabled),

    %% Channel status subscriptions are best-effort; the result is ignored.
    _ = subscribe_to_channel_status(),

    %% partisan_gen behaviours call monitor/2 and demonitor/2, so with this
    %% server being one of them that would create a deadlock due to a circular
    %% call. We use a static dummy ref for those calls to avoid calling
    %% ourselves.
    %% The dummy ref is only created once per VM. If this server is restarted
    %% we keep the existing one, so that dummy refs already handed out are
    %% still recognised by demonitor/2. This also avoids the global GC that
    %% replacing a persistent_term value triggers.
    ok =
        case persistent_term:get(?DUMMY_MREF_KEY, undefined) of
            undefined ->
                persistent_term:put(?DUMMY_MREF_KEY, partisan:make_ref());
            _ ->
                ok
        end,

    TabOpts = [
        named_table,
        public,
        {write_concurrency, true},
        {read_concurrency, true},
        {decentralized_counters, true}
    ],

    %% Tables for process monitoring
    _ = ets:new(?PROC_MON_IN,       [set, {keypos, 2} | TabOpts]),
    _ = ets:new(?PROC_MON_IN_IDX,   [bag, {keypos, 1} | TabOpts]),
    _ = ets:new(?PROC_MON_OUT,      [set, {keypos, 2} | TabOpts]),
    _ = ets:new(?PROC_MON_OUT_IDX,  [bag, {keypos, 1} | TabOpts]),

    %% Tables for node status monitoring
    _ = ets:new(?NODE_MON,          [duplicate_bag, {keypos, 1} | TabOpts]),
    _ = ets:new(?NODE_TYPE_MON,     [set, {keypos, 2} | TabOpts]),

    State = #state{
        enabled = Enabled,
        requests = #{},
        nodes = sets:new([{version, 2}])
    },

    {ok, State}.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Handles monitor and demonitor requests coming from (possibly remote)
%% callers.
%%
%% When monitoring is disabled, monitor requests get the static dummy ref and
%% demonitor requests get `{ok, true}'. demonitor/2 expects a boolean result,
%% so it must never receive the dummy ref.
%% @end
%% -----------------------------------------------------------------------------
handle_call({demonitor, _, _}, _, #state{enabled = false} = State) ->
    %% Monitoring is disabled, so this server cannot have created any monitor.
    %% Reply with a boolean as the caller (demonitor/2) expects.
    {reply, {ok, true}, State};

handle_call(_, _, #state{enabled = false} = State) ->
    %% The peer service manager does not implement support for remote monitoring
    %% Instead of failing we return the dummy ref
    Reply = {ok, persistent_term:get(?DUMMY_MREF_KEY)},
    {reply, Reply, State};

handle_call({monitor, Monitor, _}, {Monitor, _}, State) ->
    %% A circular call (partisan_gen)
    Reply = {ok, persistent_term:get(?DUMMY_MREF_KEY)},
    {reply, Reply, State};

handle_call({monitor, Process, Opts}, {Monitor, _}, State) ->
    %% A remote process (Monitor) wants to monitor a process (Process) on
    %% this node.

    %% This must be sequential, because in case the process we want to monitor
    %% is dead, we will get the reference and immediately the DOWN signal, so
    %% we need to return the reference to the user before the signal reaches it.

    %% We did this check in monitor/2, but we double check again in case
    %% someone is calling partisan_gen_server:call directly.
    Reply =
        case is_monitor_server(Process) of
            true ->
                %% The case for a process monitoring this server, this server
                %% monitoring itself or another node's monitor server monitoring
                %% this one.
                {ok, persistent_term:get(?DUMMY_MREF_KEY)};

            false ->
                %% TODO Implement options
                %%  {tag, UserDefinedTag} option
                try
                    Node = partisan_remote_ref:node(Monitor),

                    %% Process can be a pid or registered name for a local
                    %% process
                    PidOrName = partisan_remote_ref:to_pid_or_name(Process),

                    %% We monitor the process on behalf of the remote caller.
                    %% We will handle the EXIT signal and forward it to
                    %% Monitor when it occurs.
                    Mref = erlang:monitor(process, PidOrName),

                    Channel = get_option(channel, Opts, ?DEFAULT_CHANNEL),

                    %% We track the Mref to match the 'DOWN' signal
                    ok = add_proc_mon_in(
                        Node, Channel, Mref, PidOrName, Monitor
                    ),

                    %% We reply with the encoded monitor reference
                    {ok, partisan_remote_ref:from_term(Mref)}

                catch
                    error:badarg ->
                        {error, badarg}
                end
        end,

    {reply, Reply, State};

handle_call({demonitor, RemoteRef, Opts}, {_Monitor, _}, State) ->
    %% A remote process is requesting a demonitor
    case RemoteRef == persistent_term:get(?DUMMY_MREF_KEY) of
        true ->
            %% We skip.
            %% The case for a process monitoring this server or another node's
            %% monitor server monitoring this one.
            {reply, {ok, true}, State};
        false ->
            Reply = do_demonitor(RemoteRef, Opts),
            {reply, Reply, State}
    end;

handle_call(_Msg, _From, State) ->
    {reply, {error, unsupported_call}, State}.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Handles a `DOWN' signal for a remote process, forwarded to us by the
%% remote `partisan_monitor' server. We receive it here rather than having it
%% sent straight to the monitoring process because the local ?PROC_MON_OUT
%% record and its index entry must be cleaned up as well. Delivery happens in
%% a spawned process. Other casts are ignored.
%% @end
%% -----------------------------------------------------------------------------
handle_cast({'DOWN', Mref, process, _Process, Reason}, State) ->
    Notify =
        fun() ->
            case take_proc_mon_out(Mref) of
                error ->
                    ok;
                #partisan_proc_mon_out{
                    monitor = LocalMonitor,
                    monitored = RemoteProcess
                } ->
                    ok = del_proc_mon_out_idx(
                        partisan_remote_ref:node(RemoteProcess), Mref
                    ),
                    %% The tag could become user-defined once we support it
                    _ = partisan:send(
                        LocalMonitor,
                        {'DOWN', Mref, process, RemoteProcess, Reason}
                    ),
                    ok
            end
        end,
    _ = spawn(Notify),
    {noreply, State};

handle_cast(_Msg, State) ->
    {noreply, State}.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Handles raw messages:
%% <ul>
%% <li>when monitoring is disabled, every message is ignored;</li>
%% <li>`DOWN' signals for local processes monitored on behalf of remote
%% processes are forwarded, from a spawned process, to the remote monitor;</li>
%% <li>`nodeup'/`nodedown' signals (from net_kernel or Partisan) update the
%% cached node set and are processed asynchronously;</li>
%% <li>anything else is ignored.</li>
%% </ul>
%% @end
%% -----------------------------------------------------------------------------
handle_info(_, #state{enabled = false} = State) ->
    {noreply, State};

handle_info({'DOWN', Mref, process, _Process, Reason}, State) ->
    ForwardDown =
        fun() ->
            case take_proc_mon_in(Mref) of
                error ->
                    ok;
                #partisan_proc_mon_in{
                    monitored = LocalProcess,
                    monitor = RemoteMonitor,
                    channel = Channel
                } ->
                    %% Drop the index entry kept for the monitor's node
                    del_proc_mon_in_idx(partisan:node(RemoteMonitor), Mref),
                    Monitored = partisan_remote_ref:from_term(LocalProcess),
                    ok = cast_signal(
                        RemoteMonitor,
                        'DOWN',
                        Mref,
                        %% eqwalizer:ignore Monitored
                        Monitored,
                        Reason,
                        [{channel, Channel}]
                    )
            end
        end,
    _ = spawn(ForwardDown),
    {noreply, State};

handle_info({nodeup, Node}, State) ->
    ?LOG_DEBUG(#{
        description => "Got nodeup signal",
        node => Node
    }),
    _ = spawn(fun() -> on_nodeup(Node) end),
    Nodes = sets:add_element(Node, State#state.nodes),
    {noreply, State#state{nodes = Nodes}};

handle_info({nodedown, Node}, State) ->
    ?LOG_DEBUG(#{
        description => "Got nodedown signal",
        node => Node
    }),
    _ = spawn(fun() -> on_nodedown(Node, noconnection) end),
    Nodes = sets:del_element(Node, State#state.nodes),
    {noreply, State#state{nodes = Nodes}};

handle_info(_Msg, State) ->
    {noreply, State}.


%% Nothing to clean up explicitly: the ETS tables are created by this process
%% in init/1 and go away with it.
terminate(_Reason, _State) ->
    ok.


%% No state migration is needed; the state is returned unchanged.
code_change(_OldVsn, State, _Extra) ->
    Result = {ok, State},
    Result.



%% =============================================================================
%% PRIVATE
%% =============================================================================


%% @private
%% @doc Returns the value of the first `{Key, Value}' tuple in the option list
%% `L', or `Default' when no tuple with that key is present.
get_option(Key, L, Default) ->
    case lists:keyfind(Key, 1, L) of
        false ->
            Default;
        {Key, Found} ->
            Found
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Subscribes the calling process to node up/down status signals.
%%
%% When `connect_disterl' is `true', this uses `net_kernel:monitor_nodes/1'
%% and raises `{monitor_nodes_failed, _}' if that subscription fails.
%% Otherwise it registers `on_up'/`on_down' callbacks for all nodes with
%% `partisan_peer_service'. These callbacks send `{nodeup, Node}' and
%% `{nodedown, Node}' messages to the calling process.
%%
%% Returns `true' if subscribed, or `false' if the peer service manager did not
%% return `ok' for both callbacks (not all managers support them).
%% @end
%% -----------------------------------------------------------------------------
subscribe_to_node_status() ->
    case partisan_config:get(connect_disterl, false) of
        true ->
            %% disterl mode: net_kernel delivers nodeup/nodedown to us
            case net_kernel:monitor_nodes(true) of
                ok ->
                    true;
                error ->
                    error({monitor_nodes_failed, unknown});
                {error, Reason} ->
                    error({monitor_nodes_failed, Reason})
            end;

        false ->
            Me = self(),

            %% We subscribe to the nodeup event for all nodes
            Res1 = partisan_peer_service:on_up(
                '_',
                fun(Node) -> Me ! {nodeup, Node} end
            ),

            %% We subscribe to the nodedown event for all nodes
            Res2 = partisan_peer_service:on_down(
                '_',
                fun(Node) -> Me ! {nodedown, Node} end
            ),

            %% Not all service managers implement this capability so the result
            %% can be an error.
            Res1 =:= ok andalso Res2 =:= ok
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Subscribes the calling process to channel up/down events for all
%% nodes and channels.
%%
%% When `connect_disterl' is `true' there is nothing to subscribe to and
%% `true' is returned. Otherwise it registers callbacks with
%% `partisan_peer_service'. These callbacks send `{channelup, Node, Channel}'
%% and `{channeldown, Node, Channel}' messages to the calling process. Returns
%% whether both registrations returned `ok'.
%% @end
%% -----------------------------------------------------------------------------
subscribe_to_channel_status() ->
    case partisan_config:get(connect_disterl, false) of
        true ->
            true;

        false ->
            Me = self(),

            %% We subscribe to the channelup event for all nodes
            Res1 = partisan_peer_service:on_up(
                '_',
                fun(Node, Channel) -> Me ! {channelup, Node, Channel} end,
                #{channel => '_'}
            ),

            %% We subscribe to the channeldown event for all nodes
            Res2 = partisan_peer_service:on_down(
                '_',
                fun(Node, Channel) -> Me ! {channeldown, Node, Channel} end,
                #{channel => '_'}
            ),

            %% Not all service managers implement this capability so the result
            %% can be an error.
            Res1 =:= ok andalso Res2 =:= ok
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Performs the remote part of monitor/2. It runs in the caller's process.
%%
%% With `{connected, true}' the `partisan_monitor' server on the target node is
%% asked to monitor `Process'. On success, the returned reference is recorded
%% locally so that a `DOWN' signal can be simulated if the node goes down.
%%
%% If the node is not connected, or the call fails with `timeout', `noproc' or
%% `{nodedown, _}', a fresh reference is returned and a
%% `{'DOWN', Ref, process, Process, Reason}' message is sent to the caller
%% straight away. The reason is `noconnection' for a disconnected or down
%% node. Any other error is raised.
%% @end
%% -----------------------------------------------------------------------------
-spec monitor(
    RemoteRef :: partisan:remote_pid() | partisan:remote_name(),
    Opts :: [partisan:monitor_opt()],
    Status :: {connected, boolean()} | noconnection | timeout | noproc) ->
    partisan:remote_reference() | no_return().

monitor(Process, Opts, {connected, true}) ->
    Node = partisan_remote_ref:node(Process),

    case call({?MODULE, Node}, {monitor, Process, Opts}) of
        {ok, Mref} ->
            %% Remember the remote ref so that we can simulate a DOWN signal
            %% if the node goes down
            ok = add_proc_mon_out(Mref, Process, self()),
            ok = add_proc_mon_out_idx(Node, Mref),
            Mref;

        {error, Failure} when Failure =:= timeout; Failure =:= noproc ->
            monitor(Process, Opts, Failure);

        {error, {nodedown, _}} ->
            monitor(Process, Opts, noconnection);

        {error, Reason} ->
            erlang:error(
                Reason, [Process, Opts], [{error_info, #{cause => Reason}}]
            )
    end;

monitor(Process, Opts, {connected, false}) ->
    monitor(Process, Opts, noconnection);

monitor(Process, _Opts, Reason) ->
    %% Sent from the caller's own process, so the message can only be received
    %% after the reference has been returned.
    Mref = partisan:make_ref(),
    self() ! {'DOWN', Mref, process, Process, Reason},
    Mref.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Same as do_demonitor/2 with an empty option list.
%% @end
%% -----------------------------------------------------------------------------
do_demonitor(Term) ->
    do_demonitor(Term, []).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Removes the monitor identified by `Term' (a local reference or its
%% partisan remote encoding), together with the matching ?PROC_MON_IN record
%% and its index entry.
%%
%% Returns `{ok, Result}' with the result of erlang:demonitor/2, or
%% `{error, badarg}' if any step raises.
%% @end
%% -----------------------------------------------------------------------------
do_demonitor(Term, Opts) ->
    try
        Mref = decode_ref(Term),
        Removed = erlang:demonitor(Mref, Opts),
        _ =
            case take_proc_mon_in(Mref) of
                error ->
                    ok;
                #partisan_proc_mon_in{ref = Mref, monitor = Monitor} ->
                    del_proc_mon_in_idx(partisan:node(Monitor), Mref)
            end,
        {ok, Removed}
    catch
        _:_ ->
            {error, badarg}
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Same as call/3 with a timeout of 5000 milliseconds.
%% @end
%% -----------------------------------------------------------------------------
call(ServerRef, Message) ->
    call(ServerRef, Message, 5000).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Performs a synchronous `partisan_gen_server' call. The exit reasons
%% that this module's callers handle are converted into error tuples:
%% <ul>
%% <li>`{error, timeout}' when the call times out;</li>
%% <li>`{error, notalive}' when the server process does not exist;</li>
%% <li>`{error, {nodedown, Node}}' when the server's node goes down or cannot
%% be reached. monitor/3 and demonitor/2 turn this into a `noconnection'
%% DOWN signal or a `true' result respectively, rather than crashing.</li>
%% </ul>
%% Any other exit is propagated.
%%
%% NOTE(review): assumes `partisan_gen_server:call/3' exits with
%% `{Reason, Location}' like `gen_server:call/3' does (the existing
%% timeout/noproc clauses rely on the same format).
%% @end
%% -----------------------------------------------------------------------------
call(ServerRef, Message, Timeout) ->
    try
        partisan_gen_server:call(ServerRef, Message, Timeout)
    catch
        exit:{timeout, _} ->
            {error, timeout};
        exit:{noproc, _} ->
            {error, notalive};
        exit:{{nodedown, Node}, _} ->
            {error, {nodedown, Node}}
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns `true' when the calling process is registered under `?MODULE'.
%%
%% Assumes a singleton partisan_monitor server per node.
%% @end
%% -----------------------------------------------------------------------------
is_monitor_server() ->
    case erlang:process_info(self(), registered_name) of
        {registered_name, ?MODULE} ->
            true;
        _ ->
            false
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns `true' if `Process' designates a partisan_monitor server.
%%
%% This function assumes there is a singleton partisan_monitor server per
%% node, registered as `?MODULE'.
%% <ul>
%% <li>`self()' is always reported as the server. NOTE(review): this presumes
%% the function is called from within the server; confirm with callers.</li>
%% <li>Any other native pid is the server only if it is the pid currently
%% registered as `?MODULE' on this node.</li>
%% <li>Anything else is treated as a partisan remote reference. It is the
%% server if it encodes the calling process or the registered name
%% `?MODULE'.</li>
%% </ul>
%% @end
%% -----------------------------------------------------------------------------
is_monitor_server(Process) when is_pid(Process), Process == self() ->
    true;

is_monitor_server(Process) when is_pid(Process) ->
    %% Validate. In principle this could not happen now that partisan:monitor
    %% only sends us remote pids.
    %% We must inspect Process itself, not self(). Comparing against the pid
    %% registered as ?MODULE also yields false (instead of raising badarg, as
    %% process_info/2 would) when Process lives on another node.
    Process =:= erlang:whereis(?MODULE);

is_monitor_server(Process) ->
    %% This does not catch the case where Process is the pid of a remote server.
    %% We just forward the message, because checking for the regname would also
    %% require sending a message to the remote server.
    partisan_remote_ref:is_local_pid(Process, self())
        %% Or is this a remote partisan_monitor server?
        orelse partisan_remote_ref:is_name(Process, ?MODULE).


%% @private
%% @doc Returns `Ref' unchanged when it is a native reference. Otherwise
%% decodes it with `partisan_remote_ref:to_term/1'.
decode_ref(Ref) ->
    case is_reference(Ref) of
        true ->
            Ref;
        false ->
            partisan_remote_ref:to_term(Ref)
    end.



%% =============================================================================
%% PRIVATE: SIGNALING
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @private
%% @doc Casts the signal `{Tag, Mref, process, Monitored, Reason}' to the
%% partisan_monitor server on the node of `Dest'.
%%
%% A native `Mref' is first encoded as a remote reference. A `{Name, Node}'
%% `Monitored' is first encoded as a remote name. An `exit:noproc' raised by the
%% cast is ignored.
%% @end
%% -----------------------------------------------------------------------------
-spec cast_signal(
    Dest :: partisan:remote_pid() | partisan:remote_name(),
    Tag  :: term(),
    Mref :: reference() | partisan:remote_reference(),
    Monitored :: partisan:remote_pid() | partisan:remote_name(),
    Reason :: any(),
    Opts :: [tuple()]
    ) -> ok.

cast_signal(Dest, Tag, Mref, Monitored, Reason, Opts)
when is_reference(Mref) ->
    EncodedMref = partisan_remote_ref:from_term(Mref),
    %% eqwalizer:ignore EncodedMref
    cast_signal(Dest, Tag, EncodedMref, Monitored, Reason, Opts);

cast_signal(Dest, Tag, Mref, {RegName, RegNode}, Reason, Opts)
when is_atom(RegName), is_atom(RegNode) ->
    EncodedName = partisan_remote_ref:from_term(RegName, RegNode),
    %% eqwalizer:ignore EncodedName
    cast_signal(Dest, Tag, Mref, EncodedName, Reason, Opts);

cast_signal(Dest, Tag, Mref, Monitored, Reason, Opts) ->
    Server = {?MODULE, partisan_remote_ref:node(Dest)},
    Signal = {Tag, Mref, process, Monitored, Reason},
    try partisan_gen_server:cast(Server, Signal, Opts) of
        Result ->
            Result
    catch
        exit:noproc ->
            ok
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Notifies node-type monitors that `Node' is up.
%%
%% Monitors that asked for extended info receive `{nodeup, Node, []}'. The
%% others receive `{nodeup, Node}'.
%% @end
%% -----------------------------------------------------------------------------
on_nodeup(Node) when is_atom(Node) ->
    notify_node_type_monitors({nodeup, Node}, {nodeup, Node, []}).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Handles the loss of `Node'. The steps run in this order:
%% <ol>
%% <li>Every local process monitoring a process on `Node' gets a process
%% `DOWN' with `Reason'.</li>
%% <li>Every monitor indexed under `Node' in the proc_mon_in index is
%% demonitored.</li>
%% <li>Per-node monitors of `Node' get `{nodedown, Node}'. Node-type monitors
%% get either that message or
%% `{nodedown, Node, [{nodedown_reason, Reason}]}'.</li>
%% </ol>
%% @end
%% -----------------------------------------------------------------------------
on_nodedown(Node, Reason) when is_atom(Node) ->
    %% Local processes monitoring remote processes on Node get a DOWN signal.
    ok = notify_proc_mon_out(Node, Reason),

    %% Demonitor all proc_mon_in entries for remote monitors watching processes
    %% on this node. Node sends those monitors their process DOWN itself, by
    %% running this same function, i.e. through its own notify_proc_mon_out/2
    %% call.
    InIndices = proc_mon_in_indices(Node),
    ok = lists:foreach(fun({_, Mref}) -> do_demonitor(Mref) end, InIndices),

    %% Finally, notify the individual and wildcard nodedown monitors.
    NodeDown = {nodedown, Node},
    ok = notify_node_monitors(Node, NodeDown),
    ok = notify_node_type_monitors(
        NodeDown, {nodedown, Node, [{nodedown_reason, Reason}]}
    ).



%% -----------------------------------------------------------------------------
%% @private
%% @doc Sends the process `DOWN' signal
%% `{'DOWN', Mref, process, Monitored, Reason}' to every local process
%% monitoring a remote process on `Node'. It also removes the corresponding
%% `#partisan_proc_mon_out{}' records and their `{Node, Mref}' index entries.
%%
%% Index entries are removed even when the monitor record is already gone, so
%% stale entries do not accumulate. A failed send to a registered name that no
%% longer exists does not stop the remaining monitors from being notified.
%% Always returns `ok'.
%% @end
%% -----------------------------------------------------------------------------
notify_proc_mon_out(Node, Reason) ->
    lists:foreach(
        fun({_, Mref}) ->
            %% Drop the index entry unconditionally. If the monitor record was
            %% already removed elsewhere, keeping it would leave a dangling
            %% {Node, Mref} entry that is never cleaned up.
            ok = del_proc_mon_out_idx(Node, Mref),

            case proc_mon_out(Mref) of
                [] ->
                    ok;
                [#partisan_proc_mon_out{} = M] ->
                    Monitored = M#partisan_proc_mon_out.monitored,
                    Monitor = M#partisan_proc_mon_out.monitor,

                    ok = del_proc_mon_out(M),

                    Down = {'DOWN', Mref, process, Monitored, Reason},

                    try
                        Monitor ! Down
                    catch
                        %% Monitor may be a registered name (atom) that is no
                        %% longer registered. Sending to it raises badarg,
                        %% which would otherwise abort the whole loop.
                        error:badarg ->
                            ok
                    end,
                    ok
            end
        end,
        proc_mon_out_indices(Node)
    ),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Sends `Msg' to every process registered in the node-monitor table for
%% `Node'. Always returns `ok'.
%% @end
%% -----------------------------------------------------------------------------
notify_node_monitors(Node, Msg) ->
    lists:foreach(
        fun
            ({_, Pid}) ->
                Pid ! Msg;
            (_) ->
                ok
        end,
        node_monitors(Node)
    ).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Forwards a node event to every process in the node-type monitor table.
%%
%% Entries whose `nodedown_reason' is `true' receive `ExtMsg'. Entries whose
%% `nodedown_reason' is `false' receive `Msg'. Always returns `ok'.
%%
%% NOTE(review): the `node_type' field of each entry is not consulted here.
%% Confirm whether filtering by node type is intended.
%% @end
%% -----------------------------------------------------------------------------
notify_node_type_monitors(Msg, ExtMsg) ->
    Notify =
        fun(#partisan_node_type_mon{key = {Pid, _}} = Mon, ok) ->
            Payload =
                case Mon#partisan_node_type_mon.nodedown_reason of
                    true -> ExtMsg;
                    false -> Msg
                end,
            _ = partisan:forward_message(Pid, Payload),
            ok
        end,
    _ = ets:foldl(Notify, ok, ?NODE_TYPE_MON),
    ok.



%% -----------------------------------------------------------------------------
%% @private
%% @doc Converts `monitor_nodes' options into `{NodeType, InclReason}'.
%%
%% `NodeType' comes from the first `{node_type, _}' tuple and defaults to
%% `all'. `InclReason' is `true' iff the atom `nodedown_reason' is present.
%% @end
%% -----------------------------------------------------------------------------
-spec parse_nodemon_opts([partisan:monitor_nodes_opt()]) ->
    node_type_mon_opts().

parse_nodemon_opts(Opts0) when is_list(Opts0) ->
    WithReason = lists:member(nodedown_reason, Opts0),
    NodeType =
        case lists:keyfind(node_type, 1, Opts0) of
            false -> all;
            {node_type, T} -> T
        end,
    {NodeType, WithReason}.



%% =============================================================================
%% PRIVATE: STORAGE OPS
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @private
%% @doc Builds a `#partisan_proc_mon_in{}' record for monitor reference `Mref'.
%%
%% The guard requires `Mref' to be a reference, `Monitored' to be a pid or an
%% atom, and `Channel' to be an atom.
%%
%% NOTE(review): the second spec overload types `Monitored' as
%% `partisan:remote_pid()'. The guard only accepts that if it is a native pid
%% or an atom; confirm the spec.
%% @end
%% -----------------------------------------------------------------------------
-spec new_process_mon
    (reference(), pid(), partisan:remote_pid(), partisan:channel()) ->
        proc_mon_in();
    (reference(), partisan:remote_pid(), pid(), partisan:channel()) ->
        proc_mon_in().

new_process_mon(Mref, Monitored, Monitor, Channel)
when is_reference(Mref), is_pid(Monitored), is_atom(Channel);
     is_reference(Mref), is_atom(Monitored), is_atom(Channel) ->
    #partisan_proc_mon_in{
        ref = Mref,
        monitored = Monitored,
        monitor = Monitor,
        channel = Channel
    }.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Stores a new incoming process monitor. It also indexes the monitor
%% under `Node', so that all `proc_mon_in()' objects for a node can be found.
%% @end
%% -----------------------------------------------------------------------------
add_proc_mon_in(Node, Channel, Mref, Monitored, Monitor) ->
    Record = new_process_mon(Mref, Monitored, Monitor, Channel),
    true = ets:insert(?PROC_MON_IN, Record),
    add_proc_mon_in_idx(Node, Mref).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Removes and returns the `#partisan_proc_mon_in{}' stored under `Mref'.
%% Returns `error' if there is none.
%% @end
%% -----------------------------------------------------------------------------
take_proc_mon_in(Mref) ->
    Taken = ets:take(?PROC_MON_IN, Mref),
    case Taken of
        [] ->
            error;
        [#partisan_proc_mon_in{ref = Mref} = Mon] ->
            Mon
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Builds the `{Node, Mref}' index entry for an incoming process monitor.
%% @end
%% -----------------------------------------------------------------------------
-spec new_proc_mon_in_idx(Node :: node(), Ref :: reference()) ->
    proc_mon_in_idx().

new_proc_mon_in_idx(Node, Mref) when is_atom(Node) andalso is_reference(Mref) ->
    Entry = {Node, Mref},
    Entry.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Inserts the `{Node, Mref}' index entry for an incoming monitor.
%% @end
%% -----------------------------------------------------------------------------
add_proc_mon_in_idx(Node, Mref) when is_reference(Mref) ->
    Entry = new_proc_mon_in_idx(Node, Mref),
    true = ets:insert(?PROC_MON_IN_IDX, Entry),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Deletes exactly the `{Node, Mref}' index entry; other entries for
%% `Node' are kept.
%% @end
%% -----------------------------------------------------------------------------
-spec del_proc_mon_in_idx(node(), reference()) -> ok.

del_proc_mon_in_idx(Node, Mref) when is_reference(Mref) ->
    Entry = {Node, Mref},
    true = ets:delete_object(?PROC_MON_IN_IDX, Entry),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns all incoming-monitor index entries stored under `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec proc_mon_in_indices(node()) -> [proc_mon_in_idx()].

proc_mon_in_indices(Node) ->
    Entries = ets:lookup(?PROC_MON_IN_IDX, Node),
    Entries.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Stores a `#partisan_proc_mon_out{}' record describing the monitor that
%% local `Monitor' holds on `Monitored', under monitor reference `Mref'.
%%
%% `Monitor' may be a pid or a registered name (atom). Both the guard and the
%% spec accept either.
%% @end
%% -----------------------------------------------------------------------------
-spec add_proc_mon_out(
    Mref :: partisan:remote_reference(),
    Monitored :: partisan:remote_pid() | partisan:remote_name(),
    Monitor :: pid() | atom()
 ) -> ok.

add_proc_mon_out(Mref, Monitored, Monitor)
when is_pid(Monitor); is_atom(Monitor) ->
    Obj = #partisan_proc_mon_out{
        ref = Mref,
        monitored = Monitored,
        monitor = Monitor
    },
    _ = ets:insert(?PROC_MON_OUT, Obj),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Removes and returns the `#partisan_proc_mon_out{}' stored under `Mref'.
%% Returns `error' if there is none.
%% @end
%% -----------------------------------------------------------------------------
take_proc_mon_out(Mref) ->
    Taken = ets:take(?PROC_MON_OUT, Mref),
    case Taken of
        [] ->
            error;
        [#partisan_proc_mon_out{ref = Mref} = Mon] ->
            Mon
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Deletes an outgoing process monitor. Given a record, only that exact
%% object is deleted. Given a reference, everything stored under that key is
%% deleted.
%% @end
%% -----------------------------------------------------------------------------
-spec del_proc_mon_out(proc_mon_out() | partisan:remote_reference()) -> ok.

del_proc_mon_out(#partisan_proc_mon_out{} = Record) ->
    Deleted = ets:delete_object(?PROC_MON_OUT, Record),
    true = Deleted,
    ok;

del_proc_mon_out(Key) ->
    Deleted = ets:delete(?PROC_MON_OUT, Key),
    true = Deleted,
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Looks up the outgoing process monitors stored under `Ref'.
%% @end
%% -----------------------------------------------------------------------------
-spec proc_mon_out(partisan:remote_reference()) -> [proc_mon_out()].

proc_mon_out(Ref) ->
    Found = ets:lookup(?PROC_MON_OUT, Ref),
    Found.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Indexes outgoing monitor `Mref' (an encoded remote reference, never a
%% native one) under `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec add_proc_mon_out_idx(node(), partisan:remote_reference()) -> ok.

add_proc_mon_out_idx(Node, Mref)
when is_atom(Node) andalso not is_reference(Mref) ->
    Entry = {Node, Mref},
    true = ets:insert(?PROC_MON_OUT_IDX, Entry),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Deletes exactly the `{Node, Mref}' outgoing-monitor index entry.
%% @end
%% -----------------------------------------------------------------------------
-spec del_proc_mon_out_idx(node(), partisan:remote_reference()) -> ok.

del_proc_mon_out_idx(Node, Mref)
when is_atom(Node) andalso not is_reference(Mref) ->
    Entry = {Node, Mref},
    true = ets:delete_object(?PROC_MON_OUT_IDX, Entry),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns all outgoing-monitor index entries stored under `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec proc_mon_out_indices(node()) -> [proc_mon_out_idx()].

proc_mon_out_indices(Node) when is_atom(Node) ->
    Entries = ets:lookup(?PROC_MON_OUT_IDX, Node),
    Entries.




%% -----------------------------------------------------------------------------
%% @private
%% @doc Builds the `{Node, Pid}' entry recording that `Pid' monitors `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec new_node_monitor(node(), pid()) -> node_mon().

new_node_monitor(Node, Pid) ->
    Entry = {Node, Pid},
    Entry.

%% -----------------------------------------------------------------------------
%% @private
%% @doc Records that `Pid' monitors `Node'. Always returns `true'.
%% @end
%% -----------------------------------------------------------------------------
-spec add_node_monitor(node(), pid()) -> true.

add_node_monitor(Node, Pid) ->
    Entry = new_node_monitor(Node, Pid),
    true = ets:insert(?NODE_MON, Entry).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Removes the exact `{Node, Pid}' node-monitor entry. Always returns
%% `true'.
%% @end
%% -----------------------------------------------------------------------------
-spec del_node_monitor(node(), pid()) -> true.

del_node_monitor(Node, Pid) ->
    Entry = new_node_monitor(Node, Pid),
    true = ets:delete_object(?NODE_MON, Entry).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns every `{Node, Pid}' node-monitor entry stored for `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec node_monitors(node()) -> [node_mon()].

node_monitors(Node) ->
    Entries = ets:lookup(?NODE_MON, Node),
    Entries.



%% -----------------------------------------------------------------------------
%% @private
%% @doc Builds a `#partisan_node_type_mon{}' record for `Pid'.
%%
%% The key is `{Pid, erlang:phash2(Opts)}'. As a result, the same process may
%% hold one entry per distinct option set.
%% @end
%% -----------------------------------------------------------------------------
-spec new_node_type_mon(pid(), node_type_mon_opts()) -> node_type_mon().

new_node_type_mon(Pid, {NodeType, WithReason} = Opts) ->
    Key = {Pid, erlang:phash2(Opts)},
    #partisan_node_type_mon{
        key = Key,
        node_type = NodeType,
        nodedown_reason = WithReason
    }.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Registers `Pid' as a node-type monitor with options `Opts'.
%% @end
%% -----------------------------------------------------------------------------
-spec add_node_type_mon(pid(), node_type_mon_opts()) -> ok.

add_node_type_mon(Pid, Opts) ->
    true = ets:insert(?NODE_TYPE_MON, new_node_type_mon(Pid, Opts)),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Removes the node-type monitor registered by `Pid' with exactly `Opts'.
%%
%% The name ("note" rather than "node") is kept as is for existing callers.
%% @end
%% -----------------------------------------------------------------------------
-spec del_note_type_mon(pid(), node_type_mon_opts()) -> ok.

del_note_type_mon(Pid, Opts) ->
    true = ets:delete_object(?NODE_TYPE_MON, new_node_type_mon(Pid, Opts)),
    ok.