src/partisan_monitor.erl

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Christopher Meiklejohn.  All Rights Reserved.
%% Copyright (c) 2022 Alejandro M. Ramallo. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% -----------------------------------------------------------------------------
%% @doc This module is responsible for monitoring processes on remote nodes.
%%
%% <strong>YOU SHOULD NEVER USE the functions in this module.</strong>
%% Use the related functions in {@link partisan} instead.
%%
%% <blockquote class="warning">
%% <h4 class="warning">NOTICE</h4>
%% <p>At the moment this only works with the
%% <code class="inline">partisan_pluggable_peer_service_manager</code> backend.
%% </p>
%% <p>Also, certain partisan_peer_service_manager implementations might not
%% support the
%% <code class="inline">partisan_peer_service_manager:on_up/2</code> and
%% <code class="inline">partisan_peer_service_manager:on_down/2</code>
%%  callbacks which we need for node monitoring, so in those cases this module
%%  will not work.
%% </p>
%% </blockquote>
%%

%%
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_monitor).

-behaviour(partisan_gen_server).

-include("partisan.hrl").
-include("partisan_logger.hrl").

%% persistent_term key under which init/1 stores the remote-ref encoding of
%% this server's registered name (used by is_self/1).
-define(REF_KEY, {?MODULE, ref}).
%% persistent_term key under which init/1 stores the static ("dummy") monitor
%% reference returned for circular or self-monitoring requests.
-define(DUMMY_MREF_KEY, {?MODULE, monitor_ref}).
%% Reads the flag stored by init/1 telling whether node status subscription
%% succeeded. persistent_term:get/1 raises badarg if the key was never stored.
-define(IS_ENABLED, persistent_term:get({?MODULE, enabled})).

%% stores process_monitor()
%% local process mrefs held on behalf of remote processes
%% (created as a `set' table in init/1)
-define(PROC_MON_TAB, partisan_monitor_process_mon).
%% refs grouped by node, used to notify/cleanup on a nodedown signal
%% stores process_monitor_idx()
%% (created as a `bag' table in init/1)
-define(PROC_MON_NODE_IDX_TAB, partisan_monitor_process_mon_node_idx).
%% local cache for a remote process monitor
%% (created as a `bag' table in init/1)
-define(PROC_MON_CACHE_TAB, partisan_monitor_process_mon_cache).
%% Local pids that are monitoring an individual node
%% stores node_monitor()
%% (created as a `duplicate_bag' table in init/1)
-define(NODE_MON_TAB, partisan_monitor_node_mon).
%% Local pids that are monitoring all nodes
%% stores nodes_monitor()
%% (created as a `set' table in init/1)
-define(NODES_MON_TAB, partisan_monitor_nodes_mon).

%% Server state of the partisan_monitor process.
-record(state, {
    %% Whether monitoring is enabled. This is the result of
    %% subscribe_to_node_status/0, i.e. it depends on the
    %% partisan_peer_service_manager being used.
    enabled                     ::  boolean(),
    %% We cache a snapshot of the nodes (updated on every nodeup/nodedown
    %% signal), so that if we are terminated we can notify the subscriptions
    nodes                       ::  sets:set(node())
}).


%% Options accepted by monitor/2.
-type monitor_opts()            ::  list().
%% Options accepted by demonitor/2.
-type demonitor_opts()          ::  [flush | info].
%% Row of ?PROC_MON_TAB, keyed by the local monitor reference.
-type process_monitor()         ::  {
                                        Mref :: reference(),
                                        MPid :: pid(),
                                        Owner :: partisan_remote_ref:p()
                                                | partisan_remote_ref:n()
                                    }.
%% Row of ?PROC_MON_NODE_IDX_TAB: a monitor reference grouped by the node of
%% its owner.
-type process_monitor_idx()     ::  {node(), reference()}.
%% Row of ?PROC_MON_CACHE_TAB: local record of a monitor held by a remote
%% node on behalf of the local process `Owner'.
-type process_monitor_cache()   ::  {
                                        Node :: node(),
                                        Mref :: reference(),
                                        MPid :: partisan_remote_ref:p()
                                                | partisan_remote_ref:n(),
                                        Owner :: pid()

                                    }.
%% Row of ?NODE_MON_TAB: a local pid monitoring a single node.
-type node_monitor()            ::  {node(), pid()}.
%% Row of ?NODES_MON_TAB: a local pid subscribed to all node status changes.
%% `Hash' is erlang:phash2/1 of the options (see new_nodes_mon/2).
-type nodes_monitor()           ::  {
                                        {
                                            Owner :: pid(),
                                            Hash :: integer()
                                        },
                                        nodes_monitor_opts()
                                    }.
%% Parsed form of the monitor_nodes/2 options (see parse_nodemon_opts/1).
-type nodes_monitor_opts()      ::  {
                                        Type :: all | visible | hidden,
                                        InclReason :: boolean()
                                    }.


-export_type([monitor_opts/0]).
-export_type([demonitor_opts/0]).


% API
-export([demonitor/2]).
-export([monitor/2]).
-export([monitor_node/2]).
-export([monitor_nodes/2]).
-export([start_link/0]).

%% gen_server callbacks
-export([code_change/3]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([init/1]).
-export([terminate/2]).

-compile({no_auto_import, [monitor_node/2]}).
-compile({no_auto_import, [monitor/3]}).
-compile({no_auto_import, [demonitor/2]}).




%% =============================================================================
%% API
%% =============================================================================


%% -----------------------------------------------------------------------------
%% @doc Starts the partisan monitor server, registering it locally under the
%% name of this module.
%% @end
%% -----------------------------------------------------------------------------
start_link() ->
    SpawnOpts = ?PARALLEL_SIGNAL_OPTIMISATION([]),
    partisan_gen_server:start_link(
        {local, ?MODULE}, ?MODULE, [], [{spawn_opt, SpawnOpts}]
    ).


%% -----------------------------------------------------------------------------
%% @doc Monitors the process identified by `RemoteRef' and returns a monitor
%% reference.
%%
%% When you attempt to monitor a remote process, it is not guaranteed that
%% you will receive the DOWN message. A few reasons for not receiving the
%% message are message loss, tree reconfiguration and the node no longer being
%% reachable.
%%
%% If `RemoteRef' is local, the call is delegated to `erlang:monitor/3' with
%% `Opts'. Otherwise the request is sent to the partisan_monitor server on the
%% remote node, which currently ignores `Opts'. If there is no connection to
%% that node a DOWN signal with reason `noconnection' is sent immediately to
%% the caller.
%%
%% Requests made by this server itself, or targeting a partisan_monitor
%% server, are not monitored: a static reference is returned instead.
%%
%% Failure:
%% <ul>
%% <li>`notalive' (exit) if the call to the remote partisan_monitor server
%% exits with `noproc'.</li>
%% <li>`badarg' if `RemoteRef' is neither a partisan pid nor a partisan
%% name reference.</li>
%% </ul>
%% @end
%% -----------------------------------------------------------------------------
-spec monitor(
    RemoteRef :: partisan_remote_ref:p() | partisan_remote_ref:n(),
    Opts :: monitor_opts()) -> partisan_remote_ref:r() | no_return().

monitor(RemoteRef, Opts) ->
    IsProcessRef =
        partisan_remote_ref:is_pid(RemoteRef)
        orelse partisan_remote_ref:is_name(RemoteRef),
    IsProcessRef orelse error(badarg),

    %% We might be calling ourselves, in which case we need to skip
    %% monitoring. This occurs because we are a partisan_gen_server ourselves
    %% and partisan_gen calls this module for monitoring.
    IsCircular =
        {registered_name, ?MODULE} ==
            erlang:process_info(self(), registered_name),

    Skip =
        IsCircular
        %% Is the target a remote partisan_monitor server?
        orelse partisan_remote_ref:is_name(RemoteRef, ?MODULE)
        %% Is someone trying to monitor us?
        orelse is_self(RemoteRef),

    case Skip of
        false ->
            case partisan_remote_ref:is_local(RemoteRef) of
                false ->
                    TargetNode = partisan_remote_ref:node(RemoteRef),
                    Connected =
                        partisan_peer_connections:is_connected(TargetNode),
                    monitor(RemoteRef, Opts, {connection, Connected});

                true ->
                    LocalTarget = partisan_remote_ref:to_term(RemoteRef),
                    erlang:monitor(process, LocalTarget, Opts)
            end;

        true ->
            %% We return the static ref so that demonitor/2 can recognise it
            persistent_term:get(?DUMMY_MREF_KEY)
    end.


%% -----------------------------------------------------------------------------
%% @doc Removes the monitor identified by `RemoteRef'.
%%
%% The local cache entry for the monitor is deleted and the partisan_monitor
%% server on the node that owns the reference is asked to demonitor. The
%% boolean returned by that server is the result of this function. If the
%% remote call fails with `timeout', `noproc' or `nodedown' the function
%% returns `true'.
%%
%% When `Opts' contains `flush', at most one message of the shape
%% `{_, RemoteRef, _, _, _}' already in the caller's mailbox is discarded.
%%
%% Circular calls (made by this server, or using the static reference
%% returned by `monitor/2') return `true' without doing anything.
%%
%% Failure:
%% <ul>
%% <li>`notalive' (exit) if the call to the remote partisan_monitor server
%% exits with `noproc'.</li>
%% <li>`badarg' if `RemoteRef' is not a partisan reference or the remote
%% server replied `{error, badarg}'.</li>
%% </ul>
%% @end
%% -----------------------------------------------------------------------------
-spec demonitor(
    RemoteRef :: partisan_remote_ref:r(), Opts :: demonitor_opts()) ->
    boolean() | no_return().

demonitor(RemoteRef, Opts) ->
    partisan_remote_ref:is_reference(RemoteRef) orelse error(badarg),

    %% Is this a circular call?
    IsCircular =
        RemoteRef == persistent_term:get(?DUMMY_MREF_KEY)
        orelse {registered_name, ?MODULE} ==
            erlang:process_info(self(), registered_name),

    case IsCircular of
        false ->
            %% Drop the local cache entry first
            ok = del_process_monitor_cache(RemoteRef),

            %% Then ask the node owning the reference to demonitor
            OwnerNode = partisan_remote_ref:node(RemoteRef),
            Request = {demonitor, RemoteRef, Opts},

            case call({?MODULE, OwnerNode}, Request) of
                {ok, Result} ->
                    case lists:member(flush, Opts) of
                        false ->
                            Result;
                        true ->
                            %% Discard a pending DOWN-shaped message, if any,
                            %% without blocking
                            receive
                                {_, RemoteRef, _, _, _} ->
                                    Result
                            after
                                0 ->
                                    Result
                            end
                    end;
                {error, badarg} ->
                    error(badarg);
                {error, Reason} when Reason =:= timeout; Reason =:= noproc ->
                    true;
                {error, {nodedown, _}} ->
                    true
            end;

        true ->
            true
    end.


%% -----------------------------------------------------------------------------
%% @doc Turns monitoring of node `Node' on (`Flag' is `true') or off (`Flag'
%% is `false') for the calling process. `Node' can be a node name or a node
%% specification map, in which case its `name' is used.
%%
%% Making several calls to `monitor_node(Node, true)' for the same `Node'
%% is not an error; it results in as many independent monitoring instances as
%% the number of different calling processes i.e. If a process has made two
%% calls to `monitor_node(Node, true)' and `Node' terminates, only one
%% `nodedown' message is delivered to the process (this differs from {@link
%% erlang:monitor_node/2}).
%%
%% If there is no connection to `Node' when monitoring is turned on, the
%% request is not recorded and the message `{nodedown, Node}' is delivered
%% immediately to the calling process. As a result when using a membership
%% strategy that uses a partial view, you cannot monitor nodes that are not
%% members of the view.
%% @end
%% -----------------------------------------------------------------------------
-spec monitor_node(node() | partisan:node_spec(), boolean()) -> true.

monitor_node(#{name := Node}, Flag) ->
    monitor_node(Node, Flag);

monitor_node(Node, Flag) when is_atom(Node) ->
    Caller = self(),

    case partisan_peer_connections:is_connected(Node) of
        _ when Flag == false ->
            del_node_monitor(Node, Caller);
        false when Flag == true ->
            %% The node is down: nothing is recorded, the nodedown signal is
            %% delivered straight away.
            Caller ! {nodedown, Node},
            true;
        true when Flag == true ->
            add_node_monitor(Node, Caller)
    end.


%% -----------------------------------------------------------------------------
%% @doc The calling process subscribes or unsubscribes to node status change
%% messages. A nodeup message is delivered to all subscribing processes when a
%% new node is connected, and a nodedown message is delivered when a node is
%% disconnected.
%% If `Flag' is `true', a new subscription is started. If `Flag' is `false',
%% all previous subscriptions started with the same options are stopped. Two
%% option lists are considered the same if they contain the same set of
%% options.
%%
%% Subscriptions for `{node_type, hidden}' are accepted but ignored, as
%% Partisan has no hidden nodes. Returns `error' when monitoring is disabled.
%%
%% Notice that the following two disterl guarantees are NOT yet provided by
%% Partisan:
%% <ul>
%% <li>`nodeup' messages are delivered before delivery of any message from the
%% remote node passed through the newly established connection.</li>
%% <li>`nodedown' messages are not delivered until all messages from the remote
%% node that have been passed through the connection have been delivered.</li>
%% </ul>
%% @end
%% -----------------------------------------------------------------------------
-spec monitor_nodes(Flag :: boolean(), [partisan:monitor_nodes_opt()]) ->
    ok | error | {error, notalive | not_implemented | badarg}.

monitor_nodes(Flag, Opts0) when is_boolean(Flag), is_list(Opts0) ->
    case ?IS_ENABLED of
        false ->
            error;
        true ->
            case {parse_nodemon_opts(Opts0), Flag} of
                {{hidden, _}, _} ->
                    %% Nothing to do: there are no hidden nodes in Partisan
                    ok;
                {Opts, true} ->
                    add_nodes_mon(self(), Opts);
                {Opts, false} ->
                    del_nodes_mon(self(), Opts)
            end
    end.



%% =============================================================================
%% GEN_SERVER CALLBACKS
%% =============================================================================



%% Initialises the server: subscribes to node status signals, publishes the
%% static references in persistent_term and creates the ETS tables.
init([]) ->
    %% We trap exits so that we get a call to terminate/2 with reason
    %% `shutdown' when the supervisor terminates us, which happens when
    %% partisan_peer_service:manager() is terminated.
    erlang:process_flag(trap_exit, true),

    %% Node monitoring relies on the node status signals. Certain
    %% partisan_peer_service_manager implementations might not support the
    %% on_up and on_down events, in which case this module is disabled.
    Enabled = subscribe_to_node_status(),
    _ = persistent_term:put({?MODULE, enabled}, Enabled),

    %% gen behaviours call monitor/2 and demonitor/2, so being ourselves a
    %% gen_server that would create a deadlock due to a circular call.
    %% We use static refs for those calls to avoid calling ourselves.
    SelfRef = partisan_remote_ref:from_term(?MODULE),
    _ = persistent_term:put(?REF_KEY, SelfRef),
    DummyMref = partisan:make_ref(),
    _ = persistent_term:put(?DUMMY_MREF_KEY, DummyMref),

    CommonOpts = [
        named_table,
        public,
        {keypos, 1},
        {write_concurrency, true},
        {read_concurrency, true}
    ],

    %% Tables are created in this order; each creation asserts that the
    %% table got the expected name.
    Tables = [
        {?PROC_MON_TAB, set},
        {?PROC_MON_NODE_IDX_TAB, bag},
        {?PROC_MON_CACHE_TAB, bag},
        {?NODE_MON_TAB, duplicate_bag},
        {?NODES_MON_TAB, set}
    ],

    lists:foreach(
        fun({Tab, Type}) ->
            Tab = ets:new(Tab, [Type | CommonOpts])
        end,
        Tables
    ),

    {ok, #state{enabled = Enabled, nodes = sets:new([{version, 2}])}}.


%% Handles the synchronous requests sent by monitor/3 and demonitor/2
%% (typically from a partisan_monitor client on another node).
%%
%% Requests:
%%   {monitor, RemoteRef, Opts}   -> {ok, EncodedMref} | {error, badarg}
%%   {demonitor, RemoteRef, Opts} -> {ok, boolean()} | {error, badarg}
%%
%% Every request is answered with {error, disabled} when the server is
%% disabled, and unknown requests with {error, unsupported_call}.
%%
%% All successful replies are wrapped in an `ok' tuple, as that is the only
%% shape demonitor/2 and monitor/3 match on. In particular the "skip" cases of
%% the demonitor request reply {ok, true} (a bare `true' would crash the
%% caller with a case_clause error).
handle_call(_, _, #state{enabled = false} = State) ->
    %% Functionality disabled
    %% The peer service manager does not implement on_up/down calls which we
    %% require to implement monitors
    {reply, {error, disabled}, State};

handle_call({monitor, RemoteRef, _}, {RemoteRef, _}, State) ->
    %% A circular call (partisan_gen)
    Reply = {ok, persistent_term:get(?DUMMY_MREF_KEY)},
    {reply, Reply, State};

handle_call({monitor, RemoteRef, _Opts}, {RemotePid, _}, State) ->
    %% A remote process (RemotePid) wants to monitor a process (RemoteRef) on
    %% this node.

    %% We did this check in monitor/2, but we double check again in case
    %% someone is calling partisan_gen_server:call directly.
    Reply =
        case is_self(RemoteRef) of
            true ->
                %% The case for a process monitoring this server, this server
                %% monitoring itself or another node's monitor server monitoring
                %% this one.
                {ok, persistent_term:get(?DUMMY_MREF_KEY)};

            false ->
                %% TODO Implement {tag, UserDefinedTag} option
                try
                    Node = partisan_remote_ref:node(RemotePid),

                    %% Can be a pid or registered name
                    LocalProcess = partisan_remote_ref:to_term(RemoteRef),

                    %% We monitor the process on behalf of the remote caller
                    Mref = erlang:monitor(process, LocalProcess),

                    %% We track the ref to match the 'DOWN' signal
                    ok = add_process_monitor(
                        Node, Mref, LocalProcess, RemotePid
                    ),

                    %% We reply the encoded monitor reference
                    {ok, partisan_remote_ref:from_term(Mref)}

                catch
                    error:badarg ->
                        {error, badarg}
                end
        end,

    {reply, Reply, State};

handle_call({demonitor, RemoteRef, _}, {RemoteRef, _}, State) ->
    %% A circular call: this server monitoring itself (partisan_gen)
    {reply, {ok, true}, State};

handle_call({demonitor, RemoteRef, Opts}, {_RemotePid, _}, State) ->
    %% Remote process is requesting a demonitor
    case RemoteRef == persistent_term:get(?DUMMY_MREF_KEY) of
        true ->
            %% We skip.
            %% The case for a process monitoring this server or another node's
            %% monitor server monitoring this one: the static reference was
            %% handed out by the monitor request, so there is nothing to undo.
            {reply, {ok, true}, State};
        false ->
            Reply = do_demonitor(RemoteRef, Opts),
            {reply, Reply, State}
    end;

handle_call(_Msg, _From, State) ->
    {reply, {error, unsupported_call}, State}.


%% Casts are not part of this server's protocol: any cast is ignored and the
%% state is left untouched.
handle_cast(_Request, ServerState) ->
    {noreply, ServerState}.


%% Handles the asynchronous signals received by the server:
%%   - 'DOWN' signals for local processes monitored on behalf of remote
%%     owners, which are forwarded to the owner;
%%   - {nodeup, Node} and {nodedown, Node} signals, which are forwarded to
%%     the subscribers and used to maintain the cached set of nodes.
%% Everything is ignored while the server is disabled.
handle_info(_, #state{enabled = false} = State) ->
    %% Functionality disabled
    {noreply, State};

handle_info({'DOWN', Mref, process, _Object, Reason}, State) ->
    case take_process_mon(Mref) of
        error ->
            %% Not a monitor we are tracking
            ok;
        {Mref, Monitored, Owner} ->
            %% A local process went down: drop the index entry and forward
            %% the signal to the (remote) owner of the monitor
            OwnerNode = partisan:node(Owner),
            del_process_monitor_idx(OwnerNode, Mref),
            ok = send_process_down(Owner, Mref, Monitored, Reason)
    end,

    {noreply, State};

handle_info({nodeup, Node}, State) ->
    %% Either a net_kernel or Partisan signal

    ?LOG_DEBUG(#{
        description => "Got nodeup signal",
        node => Node
    }),

    ok = send_nodeup(Node),

    Nodes = sets:add_element(Node, State#state.nodes),

    {noreply, State#state{nodes = Nodes}};

handle_info({nodedown, Node}, State) ->
    %% Either a net_kernel or Partisan signal

    ?LOG_DEBUG(#{
        description => "Got nodedown signal",
        node => Node
    }),

    ok = send_nodedown(Node, connection_closed),

    Nodes = sets:del_element(Node, State#state.nodes),

    {noreply, State#state{nodes = Nodes}};

handle_info(_Other, State) ->
    {noreply, State}.


%% On `shutdown' the subscribers are notified via send_self_down/2. Any other
%% termination reason results in no notifications.
terminate(shutdown = Reason, State) ->
    send_self_down(Reason, State);

terminate(_Other, _State) ->
    ok.



%% No state migration is needed on code upgrade: the state is returned as is.
code_change(_FromVsn, ServerState, _Extra) ->
    {ok, ServerState}.



%% =============================================================================
%% PRIVATE
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @private
%% @doc Subscribes the calling process to node status signals, using
%% net_kernel when the `connect_disterl' option is enabled and the peer
%% service `on_up'/`on_down' callbacks otherwise. In the latter case the
%% callbacks send `{nodeup, Node}' and `{nodedown, Node}' messages to the
%% calling process.
%%
%% Returns `true' if the subscription succeeded and `false' if the peer
%% service did not accept both callbacks. Fails with
%% `{monitor_nodes_failed, Reason}' if net_kernel refuses the subscription.
%% @end
%% -----------------------------------------------------------------------------
subscribe_to_node_status() ->
    case partisan_config:get(connect_disterl, false) of
        false ->
            Server = self(),

            UpRes = partisan_peer_service:on_up(
                '_', fun(Node) -> Server ! {nodeup, Node} end
            ),

            DownRes = partisan_peer_service:on_down(
                '_', fun(Node) -> Server ! {nodedown, Node} end
            ),

            %% Not all service managers implement this capability so either
            %% result can be an error.
            {UpRes, DownRes} =:= {ok, ok};

        true ->
            case net_kernel:monitor_nodes(true) of
                ok ->
                    true;
                {error, Reason} ->
                    error({monitor_nodes_failed, Reason});
                error ->
                    error({monitor_nodes_failed, unknown})
            end
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Monitors a process on a remote node.
%%
%% The third argument is either `{connection, IsConnected}' or the reason
%% for an immediate DOWN signal:
%% <ul>
%% <li>`{connection, true}': the partisan_monitor server on the remote node is
%% asked to monitor the process and the returned reference is cached
%% locally. If the call fails with `timeout', `noproc' or `nodedown' an
%% immediate DOWN signal is sent instead. Any other error is raised.</li>
%% <li>`{connection, false}': same as reason `noconnection'.</li>
%% <li>any other term: a fresh reference is returned and a DOWN signal with
%% that reason is sent to the caller right away.</li>
%% </ul>
%% @end
%% -----------------------------------------------------------------------------
-spec monitor(
    RemoteRef :: partisan_remote_ref:p() | partisan_remote_ref:n(),
    Opts :: monitor_opts(),
    Reason :: atom() | {connection, boolean()}) ->
    partisan_remote_ref:r() | no_return().

monitor(RemoteRef, Opts, {connection, false}) ->
    monitor(RemoteRef, Opts, noconnection);

monitor(RemoteRef, Opts, {connection, true}) ->
    %% Ask the partisan_monitor process on the remote node for a monitor
    Node = partisan_remote_ref:node(RemoteRef),

    case call({?MODULE, Node}, {monitor, RemoteRef, Opts}) of
        {ok, Mref} ->
            %% We keep a local record of the remote Mref so that we can
            %% simulate a process DOWN signal when the node goes down
            ok = add_process_monitor_cache(Node, Mref, RemoteRef, self()),
            Mref;
        {error, {nodedown, _}} ->
            monitor(RemoteRef, Opts, noconnection);
        {error, Reason} when Reason =:= timeout; Reason =:= noproc ->
            monitor(RemoteRef, Opts, Reason);
        {error, Reason} ->
            ErrorInfo = #{
                cause => Reason,
                module => ?MODULE,
                function => monitor
            },
            erlang:error(
                Reason, [RemoteRef, Opts], [{error_info, ErrorInfo}]
            )
    end;

monitor(RemoteRef, _, Reason) ->
    %% We reply with a transient ref but we do not record the request as we
    %% are immediately sending a DOWN signal
    TransientRef = partisan:make_ref(),
    ok = send_process_down(self(), TransientRef, {ref, RemoteRef}, Reason),
    TransientRef.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Same as `do_demonitor(Ref, [])'.
%% @end
%% -----------------------------------------------------------------------------
do_demonitor(Ref) ->
    do_demonitor(Ref, []).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Removes the local monitor identified by `Term' (a reference or an
%% encoded reference) and forgets the bookkeeping entries associated with it.
%%
%% Returns `{ok, Bool}' where `Bool' is the result of `erlang:demonitor/2',
%% or `{error, badarg}' if anything fails along the way.
%% @end
%% -----------------------------------------------------------------------------
do_demonitor(Term, Opts) ->
    %% NOTE(review): every exception class is mapped to {error, badarg},
    %% including failures of the bookkeeping calls. Kept as is.
    try
        Mref = decode_ref(Term),
        Result = erlang:demonitor(Mref, Opts),

        case take_process_mon(Mref) of
            error ->
                ok;
            {Mref, _, Owner} ->
                OwnerNode = partisan:node(Owner),
                del_process_monitor_idx(OwnerNode, Mref)
        end,

        {ok, Result}

    catch
        _:_ ->
            {error, badarg}
    end.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Calls `ServerRef' with `Message' using `partisan_gen_server:call/2',
%% translating a `noproc' exit into a `notalive' exit. Any other exception is
%% propagated untouched.
%% @end
%% -----------------------------------------------------------------------------
call(ServerRef, Message) ->
    %% NOTE(review): only the bare exit reason `noproc' is translated; confirm
    %% partisan_gen_server:call/2 does not wrap it as {noproc, _}.
    try partisan_gen_server:call(ServerRef, Message) of
        Reply ->
            Reply
    catch
        exit:noproc ->
            exit(notalive)
    end.


%% @private
%% Returns whether the argument identifies this process: either the calling
%% process' own pid, or a remote reference identical to the reference stored
%% under ?REF_KEY by init/1.
is_self(Pid) when is_pid(Pid) ->
    self() == Pid;

is_self(RemoteRef) when is_tuple(RemoteRef); is_binary(RemoteRef) ->
    partisan_remote_ref:is_identical(
        RemoteRef, persistent_term:get(?REF_KEY)
    ).


%% @private
%% Returns a native reference: native references are returned as is, anything
%% else is decoded with partisan_remote_ref:to_term/1.
decode_ref(Term) ->
    case is_reference(Term) of
        true ->
            Term;
        false ->
            partisan_remote_ref:to_term(Term)
    end.



%% =============================================================================
%% PRIVATE: SIGNALING
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @private
%% @doc Sends a process `DOWN' signal to `Dest' using
%% `partisan:forward_message/2'.
%%
%% The monitored process can be given as `{ref, RemoteRef}', a pid, an atom
%% or a `{Name, Node}' tuple; the last three are first encoded as remote
%% references. A native monitor reference is encoded as well before the
%% signal `{'DOWN', Mref, process, RemoteRef, Reason}' is sent.
%% @end
%% -----------------------------------------------------------------------------
-spec send_process_down(
    Dest :: partisan_remote_ref:p() | partisan_remote_ref:n() | pid() | atom(),
    Mref :: reference() | partisan_remote_ref:r(),
    Term :: {ref, partisan_remote_ref:p() | partisan_remote_ref:n()}
    | pid() | atom(),
    Reason :: any()) -> ok.

send_process_down(Dest, Mref, {ref, PRef}, Reason) when is_reference(Mref) ->
    %% Native reference: encode it and try again
    Encoded = partisan_remote_ref:from_term(Mref),
    send_process_down(Dest, Encoded, {ref, PRef}, Reason);

send_process_down(Dest, Mref, {ref, PRef}, Reason) ->
    partisan:forward_message(Dest, {'DOWN', Mref, process, PRef, Reason});

send_process_down(Dest, Mref, {Name, Node}, Reason)
when is_atom(Name), is_atom(Node) ->
    PRef = partisan_remote_ref:from_term(Name, Node),
    send_process_down(Dest, Mref, {ref, PRef}, Reason);

send_process_down(Dest, Mref, Term, Reason)
when is_pid(Term); is_atom(Term) ->
    PRef = partisan_remote_ref:from_term(Term),
    send_process_down(Dest, Mref, {ref, PRef}, Reason).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Sends a nodeup signal for `Node' to every process subscribed to all
%% nodes. Subscribers that asked for the nodedown reason get the extended
%% form `{nodeup, Node, []}', the others get `{nodeup, Node}'.
%% @end
%% -----------------------------------------------------------------------------
send_nodeup(Node) when is_atom(Node) ->
    Notify =
        fun
            ({{Subscriber, _Hash}, {_Type, true}}, ok) ->
                partisan:forward_message(Subscriber, {nodeup, Node, []});
            ({{Subscriber, _Hash}, {_Type, false}}, ok) ->
                partisan:forward_message(Subscriber, {nodeup, Node})
        end,

    ets:foldl(Notify, ok, ?NODES_MON_TAB).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Handles `Node' going down, assuming it has crashed:
%% <ol>
%% <li>local processes monitoring remote processes on `Node' get a DOWN
%% signal with reason `noconnection' and the cache entry is removed;</li>
%% <li>monitors held locally on behalf of processes on `Node' are
%% removed;</li>
%% <li>local processes monitoring `Node' get `{nodedown, Node}' and their
%% monitor is removed;</li>
%% <li>processes subscribed to all nodes get the nodedown signal, including
%% `Reason' if they asked for it.</li>
%% </ol>
%% @end
%% -----------------------------------------------------------------------------
send_nodedown(Node, Reason) when is_atom(Node) ->
    %% Step 1: DOWN signals for the remote monitors cached locally
    NotifyOwner =
        fun({_, Mref, MPid, Owner}) ->
            ok = del_process_monitor_cache(Mref),
            send_process_down(Owner, Mref, {ref, MPid}, noconnection)
        end,
    lists:foreach(NotifyOwner, process_monitor_caches(Node)),

    %% Step 2: demonitor everything whose owner lives in Node (monitors
    %% originating in Node with targets in this node)
    %% REVIEW we could keep them in case the node comes back
    DropMonitor = fun({_, Ref}) -> do_demonitor(Ref) end,
    lists:foreach(DropMonitor, process_monitor_indices(Node)),

    %% Step 3: nodedown signal for the local processes monitoring Node; the
    %% monitors are removed as they are taken
    Plain = {nodedown, Node},
    Extended = {nodedown, Node, [{nodedown_reason, Reason}]},
    NodeMonitors = take_node_monitors(Node),
    _ = [
        partisan:forward_message(Pid, Plain) || {_, Pid} <- NodeMonitors
    ],

    %% Step 4: nodedown signal for the processes monitoring ALL nodes
    Broadcast =
        fun
            ({{Pid, _Hash}, {_Type, true}}, ok) ->
                partisan:forward_message(Pid, Extended);
            ({{Pid, _Hash}, {_Type, false}}, ok) ->
                partisan:forward_message(Pid, Plain)
        end,

    ets:foldl(Broadcast, ok, ?NODES_MON_TAB).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Notifies every local subscriber that this server is going down with
%% `Reason':
%% <ol>
%% <li>owners of cached remote monitors get a DOWN signal with reason
%% `noconnection';</li>
%% <li>processes monitoring individual nodes get `{nodedown, Node}';</li>
%% <li>processes subscribed to all nodes get a nodedown signal for every node
%% in the state's snapshot, including `Reason' if they asked for it.</li>
%% </ol>
%% The ETS tables are not cleaned up.
%% @end
%% -----------------------------------------------------------------------------
send_self_down(Reason, State) ->
    %% We are shutting down or crashing so we do not cleanup ets, it will be
    %% done automatically.

    %% Step 1: DOWN signals for the remote monitors cached locally
    NotifyOwner =
        fun({_, Mref, MPid, Owner}, ok) ->
            send_process_down(Owner, Mref, {ref, MPid}, noconnection)
        end,
    ets:foldl(NotifyOwner, ok, ?PROC_MON_CACHE_TAB),

    %% Step 2: nodedown for the processes monitoring individual nodes
    NotifyNodeMonitor =
        fun({Node, Pid}, ok) ->
            partisan:forward_message(Pid, {nodedown, Node}),
            ok
        end,
    ets:foldl(NotifyNodeMonitor, ok, ?NODE_MON_TAB),

    %% Step 3: nodedown for the processes monitoring ALL nodes, one signal
    %% per node in the snapshot
    ExtReason = [{nodedown_reason, Reason}],

    ForEachNode =
        fun(Pid, MakeSignal, Acc) ->
            sets:fold(
                fun(N, IAcc) ->
                    partisan:forward_message(Pid, MakeSignal(N)),
                    IAcc
                end,
                Acc,
                State#state.nodes
            )
        end,

    ets:foldl(
        fun
            ({{Pid, _Hash}, {_Type, true}}, Acc) ->
                ForEachNode(
                    Pid, fun(N) -> {nodedown, N, ExtReason} end, Acc
                );
            ({{Pid, _Hash}, {_Type, false}}, Acc) ->
                ForEachNode(Pid, fun(N) -> {nodedown, N} end, Acc)
        end,
        ok,
        ?NODES_MON_TAB
    ).



%% =============================================================================
%% PRIVATE: TYPES
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns the tuple stored in the process monitor table for monitor
%% reference `Mref', monitored process `MPid' and monitor owner `Owner'.
%%
%% At least one of `MPid' and `Owner' has to be a local process, i.e. a pid
%% or a registered name (atom); otherwise the call fails with
%% `function_clause'.
%% @end
%% -----------------------------------------------------------------------------
-spec new_process_mon
    (reference(), pid(), partisan_remote_ref:p()) ->
        process_monitor();
    (reference(), partisan_remote_ref:p(), pid()) ->
        process_monitor().

new_process_mon(Mref, MPid, Owner) when is_pid(MPid) orelse is_atom(MPid) ->
    %% The monitored process is local
    {Mref, MPid, Owner};

new_process_mon(Mref, MPid, Owner) when is_pid(Owner) orelse is_atom(Owner) ->
    %% The owner is local. The guard tests Owner on both sides; testing
    %% is_atom(MPid) here would be dead code as the previous clause already
    %% accepts every atom MPid.
    {Mref, MPid, Owner}.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns the index entry associating the monitor reference `Ref' with
%% `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec new_process_monitor_idx(Node :: node(), Ref :: reference()) ->
    process_monitor_idx().

new_process_monitor_idx(Node, Ref) ->
    Entry = {Node, Ref},
    Entry.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns the cache entry used to track locally a monitor held on
%% `Node', so that if the node goes down we have a way to send a process DOWN
%% signal to `Owner'. `Owner' must be a pid or an atom.
%% @end
%% -----------------------------------------------------------------------------
-spec new_process_monitor_cache(
    Node :: node(),
    Mref :: partisan_remote_ref:r(),
    RPid :: partisan_remote_ref:p() | partisan_remote_ref:n(),
    Owner :: pid()) -> process_monitor_cache().

new_process_monitor_cache(Node, Mref, RPid, Owner)
when is_pid(Owner); is_atom(Owner) ->
    Entry = {Node, Mref, RPid, Owner},
    Entry.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns the entry recording that process `Owner' monitors `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec new_node_monitor(node(), pid()) -> node_monitor().

new_node_monitor(Node, Owner) ->
    Entry = {Node, Owner},
    Entry.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns the entry recording a subscription of `Pid' to all node
%% status changes with the parsed options `Opts'. The key combines the pid
%% with a hash of the options, so that a process gets one entry per distinct
%% set of options.
%% @end
%% -----------------------------------------------------------------------------
-spec new_nodes_mon(pid(), nodes_monitor_opts()) -> nodes_monitor().

new_nodes_mon(Pid, Opts) when is_tuple(Opts) ->
    Key = {Pid, erlang:phash2(Opts)},
    {Key, Opts}.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Reduces a list of node monitoring options to the tuple
%% {Type, InclReason}.
%%
%% Type is the value of the first {node_type, Value} element found in the
%% list, or the atom all when there is none. InclReason is true when the atom
%% nodedown_reason is a member of the list and false otherwise.
%% @end
%% -----------------------------------------------------------------------------
-spec parse_nodemon_opts([partisan:monitor_nodes_opt()]) ->
    nodes_monitor_opts().

parse_nodemon_opts(Opts) ->
    NodeType =
        case lists:keyfind(node_type, 1, Opts) of
            false ->
                %% Option not given: fall back to the default
                all;
            {node_type, Value} ->
                Value
        end,
    {NodeType, lists:member(nodedown_reason, Opts)}.



%% =============================================================================
%% PRIVATE: STORAGE OPS
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @private
%% @doc Records a process monitor identified by Mref and indexes it under
%% Node.
%%
%% The monitor object is built with new_process_mon/3 and stored in
%% ?PROC_MON_TAB using ets:insert_new/2, whose result is ignored. Returns the
%% result of add_process_monitor_idx/2.
%% @end
%% -----------------------------------------------------------------------------
add_process_monitor(Node, Mref, LocalProcess, RemoteOwner) ->
    %% 1. Main entry: monitor ref -> {monitored pid, caller}
    Mon = new_process_mon(Mref, LocalProcess, RemoteOwner),
    _ = ets:insert_new(?PROC_MON_TAB, Mon),

    %% 2. Index entry, used to fetch all the refs associated with a remote node
    add_process_monitor_idx(Node, Mref).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Removes the process monitor stored under Mref from ?PROC_MON_TAB and
%% returns it.
%%
%% Returns the removed {Mref, _, _} tuple, or the atom error when the table
%% holds no object for Mref.
%% @end
%% -----------------------------------------------------------------------------
take_process_mon(Mref) ->
    case ets:take(?PROC_MON_TAB, Mref) of
        [] ->
            error;
        [{Mref, _, _} = Mon] ->
            Mon
    end.



%% -----------------------------------------------------------------------------
%% @private
%% @doc Stores the index entry built by new_process_monitor_idx/2 for Node and
%% Mref in ?PROC_MON_NODE_IDX_TAB.
%%
%% The entry is stored using ets:insert_new/2, whose result is ignored, so
%% this function always returns ok. Mref must be a reference.
%% @end
%% -----------------------------------------------------------------------------
add_process_monitor_idx(Node, Mref) when is_reference(Mref) ->
    Idx = new_process_monitor_idx(Node, Mref),
    _ = ets:insert_new(?PROC_MON_NODE_IDX_TAB, Idx),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Deletes the index entry for Node and Mref from
%% ?PROC_MON_NODE_IDX_TAB.
%%
%% Only the exact object {Node, Mref} is deleted (ets:delete_object/2).
%% Always returns ok. Mref must be a reference.
%% @end
%% -----------------------------------------------------------------------------
-spec del_process_monitor_idx(node(), reference()) -> ok.

del_process_monitor_idx(Node, Mref) when is_reference(Mref) ->
    %% Build the object with the same constructor used when it was added
    Idx = new_process_monitor_idx(Node, Mref),
    _ = ets:delete_object(?PROC_MON_NODE_IDX_TAB, Idx),
    ok.

%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns all the objects stored under the key Node in
%% ?PROC_MON_NODE_IDX_TAB. The table is not modified.
%% @end
%% -----------------------------------------------------------------------------
-spec process_monitor_indices(node()) -> [process_monitor_idx()].

process_monitor_indices(Node) ->
    ets:lookup(?PROC_MON_NODE_IDX_TAB, Node).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Stores the cache entry built by new_process_monitor_cache/4 in
%% ?PROC_MON_CACHE_TAB.
%%
%% The entry is stored using ets:insert_new/2, whose result is ignored, so
%% this function always returns ok.
%% @end
%% -----------------------------------------------------------------------------
add_process_monitor_cache(Node, Mref, RPid, Owner) ->
    _ = ets:insert_new(
        ?PROC_MON_CACHE_TAB,
        new_process_monitor_cache(Node, Mref, RPid, Owner)
    ),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Deletes from ?PROC_MON_CACHE_TAB every object matching
%% {Node, Mref, '_', '_'}, where Node is obtained from Mref using
%% partisan_remote_ref:node/1.
%%
%% Returns ok.
%% @end
%% -----------------------------------------------------------------------------
-spec del_process_monitor_cache(partisan_remote_ref:r()) -> ok.

del_process_monitor_cache(Mref) ->
    %% The first element of the pattern is bound to the node of the remote
    %% reference (presumably the table key -- confirm against the table
    %% definition); the remaining two elements are wildcards.
    Pattern = {partisan_remote_ref:node(Mref), Mref, '_', '_'},
    true = ets:match_delete(?PROC_MON_CACHE_TAB, Pattern),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns all the objects stored under the key Node in
%% ?PROC_MON_CACHE_TAB. The table is not modified.
%%
%% The objects in this table are the cache entries built by
%% new_process_monitor_cache/4, hence the return type.
%% @end
%% -----------------------------------------------------------------------------
-spec process_monitor_caches(node()) -> [process_monitor_cache()].

process_monitor_caches(Node) ->
    ets:lookup(?PROC_MON_CACHE_TAB, Node).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Stores the node monitor object built by new_node_monitor/2 for Node
%% and Pid in ?NODE_MON_TAB. Always returns true.
%% @end
%% -----------------------------------------------------------------------------
-spec add_node_monitor(node(), pid()) -> true.

add_node_monitor(Node, Pid) ->
    Mon = new_node_monitor(Node, Pid),
    _ = ets:insert(?NODE_MON_TAB, Mon),
    true.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Deletes the node monitor object for Node and Pid from ?NODE_MON_TAB.
%%
%% Only the exact object built by new_node_monitor/2 is deleted
%% (ets:delete_object/2). Always returns true.
%% @end
%% -----------------------------------------------------------------------------
-spec del_node_monitor(node(), pid()) -> true.

del_node_monitor(Node, Pid) ->
    Mon = new_node_monitor(Node, Pid),
    _ = ets:delete_object(?NODE_MON_TAB, Mon),
    true.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Removes from ?NODE_MON_TAB all the objects stored under the key Node
%% and returns them as a list, which is empty when there are none.
%% @end
%% -----------------------------------------------------------------------------
-spec take_node_monitors(node()) -> [node_monitor()].

take_node_monitors(Node) ->
    ets:take(?NODE_MON_TAB, Node).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Stores the nodes monitor object built by new_nodes_mon/2 for Pid and
%% Opts in ?NODES_MON_TAB.
%%
%% The object is stored using ets:insert_new/2, whose result is ignored, so
%% this function always returns ok.
%% @end
%% -----------------------------------------------------------------------------
-spec add_nodes_mon(pid(), nodes_monitor_opts()) -> ok.

add_nodes_mon(Pid, Opts) ->
    _ = ets:insert_new(?NODES_MON_TAB, new_nodes_mon(Pid, Opts)),
    ok.


%% -----------------------------------------------------------------------------
%% @private
%% @doc Deletes the nodes monitor object for Pid and Opts from
%% ?NODES_MON_TAB.
%%
%% The object to delete is rebuilt using new_nodes_mon/2 and only that exact
%% object is deleted (ets:delete_object/2). Always returns ok.
%% @end
%% -----------------------------------------------------------------------------
-spec del_nodes_mon(pid(), nodes_monitor_opts()) -> ok.

del_nodes_mon(Pid, Opts) ->
    _ = ets:delete_object(?NODES_MON_TAB, new_nodes_mon(Pid, Opts)),
    ok.