src/partisan.erl

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2015 Helium Systems, Inc. All Rights Reserved.
%% Copyright (c) 2016 Christopher Meiklejohn. All Rights Reserved.
%% Copyright (c) 2022 Alejandro M. Ramallo. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(partisan).

-include("partisan.hrl").
-include("partisan_logger.hrl").

%% Options accepted by monitor/3 for process monitors: the erlang:monitor/3
%% options plus a Partisan channel selector.
-type monitor_opt()         ::  erlang:monitor_option()
                                | {channel, partisan:channel()}.
%% Options accepted by demonitor/2.
-type demonitor_opt()       ::  flush | info.
%% Options accepted by monitor_nodes/2.
-type monitor_nodes_opt()   ::  nodedown_reason
                                | {node_type, visible | hidden | all}
                                | {channel, channel()}
                                | {channel_fallback, boolean()}.
-type send_dst()            ::  server_ref() | reference() | port().
%% Identifier of a process to monitor: either a native Erlang identifier or a
%% Partisan remote reference (encoded pid or registered name).
-type monitor_process_id()  ::  erlang:monitor_process_identifier()
                                | partisan_remote_ref:p()
                                | partisan_remote_ref:n().

%% Aliases re-exported from partisan_peer_service_manager.
-type server_ref()          ::  partisan_peer_service_manager:server_ref().
-type forward_opts()        ::  partisan_peer_service_manager:forward_opts().
%% Argument accepted by nodes/1.
-type node_type()           ::  this | known | visible | connected | hidden
                                | all.
-type channel()             ::  atom().
%% Per-channel options; `parallelism' is mandatory, the other keys are optional.
-type channel_opts()        ::  #{
                                    parallelism := non_neg_integer(),
                                    monotonic => boolean(),
                                    compression => boolean() | 0..9
                                }.
-type actor()               ::  binary().
-type listen_addr()         ::  #{
                                    ip := inet:ip_address(),
                                    port := non_neg_integer()
                                }.
%% Node specification as returned by node_spec/0; all three keys are mandatory.
-type node_spec()           ::  #{
                                    name := node(),
                                    listen_addrs := [listen_addr()],
                                    channels := #{channel() => channel_opts()}
                                }.

-type message()             ::  term().
-type time()                ::  non_neg_integer().
-type send_after_dst()      ::  pid()
                                | (RegName :: atom())
                                | (Pid :: partisan_remote_ref:p())
                                | (RegName :: partisan_remote_ref:n())
                                | {RegName :: atom(), Node :: node()}.
-type send_after_opts()     ::  forward_opts() | [{abs, boolean()}].
%% Timer reference; opaque so that callers do not depend on its shape.
-opaque tref()              ::  reference() | {tref_type(), reference()}.
-type tref_type()           ::  'once' | 'interval' | 'instant' | 'send_local'.


-export_type([actor/0]).
-export_type([channel/0]).
-export_type([channel_opts/0]).
-export_type([demonitor_opt/0]).
-export_type([forward_opts/0]).
-export_type([listen_addr/0]).
-export_type([message/0]).
-export_type([monitor_nodes_opt/0]).
-export_type([monitor_opt/0]).
-export_type([node_spec/0]).
-export_type([node_type/0]).
-export_type([server_ref/0]).
-export_type([tref/0]).


%% API
-export([start/0]).
-export([stop/0]).

-export([broadcast/2]).
-export([cancel_timer/1]).
-export([cancel_timer/2]).
-export([cast_message/2]).
-export([cast_message/3]).
-export([cast_message/4]).
-export([channel_opts/1]).
-export([default_channel/0]).
-export([demonitor/1]).
-export([demonitor/2]).
-export([disconnect_node/1]).
-export([exit/2]).
-export([forward_message/2]).
-export([forward_message/3]).
-export([forward_message/4]).
-export([is_alive/0]).
-export([is_connected/1]).
-export([is_connected/2]).
-export([is_fully_connected/1]).
-export([is_local/1]).
-export([is_local_name/1]).
-export([is_local_name/2]).
-export([is_local_pid/1]).
-export([is_local_pid/2]).
-export([is_local_reference/1]).
-export([is_local_reference/2]).
-export([is_pid/1]).
-export([is_process_alive/1]).
-export([is_reference/1]).
-export([is_self/1]).
-export([make_ref/0]).
-export([monitor/1]).
-export([monitor/2]).
-export([monitor/3]).
-export([monitor_node/2]).
-export([monitor_nodes/1]).
-export([monitor_nodes/2]).
-export([node/0]).
-export([node/1]).
-export([node_spec/0]).
-export([node_spec/1]).
-export([node_spec/2]).
-export([nodes/0]).
-export([nodes/1]).
-export([nodestring/0]).
-export([process_info/1]).
-export([process_info/2]).
-export([self/0]).
-export([self/1]).
-export([send/2]).
-export([send/3]).
-export([send_after/3]).
-export([send_after/4]).
-export([whereis/1]).


-export([spawn/2]).
-export([spawn/4]).
-export([spawn_monitor/2]).
-export([spawn_monitor/4]).


%% This module defines functions whose name/arity clash with auto-imported
%% BIFs. Suppressing the auto-import makes unqualified calls resolve to the
%% local definitions; the BIFs are always called with the `erlang:' prefix.
-compile({no_auto_import, [demonitor/2]}).
-compile({no_auto_import, [is_pid/1]}).
-compile({no_auto_import, [is_process_alive/1]}).
%% The function exported by this module is is_reference/1 (there is no
%% is_reference/0 BIF nor local function), so the arity must be 1 for the
%% directive to have any effect.
-compile({no_auto_import, [is_reference/1]}).
-compile({no_auto_import, [make_ref/0]}).
-compile({no_auto_import, [monitor/2]}).
-compile({no_auto_import, [monitor/3]}).
-compile({no_auto_import, [monitor_node/2]}).
%% monitor_node/3 is defined in this module and called unqualified by
%% monitor_node/2, so it needs the same treatment as monitor_node/2.
-compile({no_auto_import, [monitor_node/3]}).
-compile({no_auto_import, [node/0]}).
-compile({no_auto_import, [node/1]}).
-compile({no_auto_import, [nodes/1]}).
-compile({no_auto_import, [process_info/2]}).
-compile({no_auto_import, [self/0]}).
-compile({no_auto_import, [whereis/1]}).
-compile({no_auto_import, [spawn/2]}).
-compile({no_auto_import, [spawn/4]}).
-compile({no_auto_import, [spawn_monitor/2]}).
-compile({no_auto_import, [spawn_monitor/4]}).



%% =============================================================================
%% API
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @doc Starts the `partisan' application and every application it depends on
%% that is not yet running.
%%
%% Returns the result of {@link application:ensure_all_started/1}, i.e.
%% `{ok, Started}' with the list of applications that were started by this
%% call, or `{error, Reason}'.
%% @end
%% -----------------------------------------------------------------------------
-spec start() -> {ok, Started :: [atom()]} | {error, Reason :: term()}.

start() ->
    application:ensure_all_started(partisan).


%% -----------------------------------------------------------------------------
%% @doc Stops the `partisan' application.
%%
%% Returns the result of {@link application:stop/1}, i.e. `ok' or
%% `{error, Reason}'.
%% @end
%% -----------------------------------------------------------------------------
-spec stop() -> ok | {error, Reason :: term()}.

stop() ->
    application:stop(partisan).



%% -----------------------------------------------------------------------------
%% @doc Returns a new partisan remote reference wrapping a freshly created
%% Erlang reference.
%% This is the same as calling
%% `partisan_remote_ref:from_term(erlang:make_ref())'.
%% @end
%% -----------------------------------------------------------------------------
-spec make_ref() -> partisan_remote_ref:r().

make_ref() ->
    partisan_remote_ref:from_term(erlang:make_ref()).


%% -----------------------------------------------------------------------------
%% @doc Returns the partisan encoded pid for the calling process.
%% This is equivalent to calling
%% `partisan_remote_ref:from_term(erlang:self())'.
%%
%% Notice that this call is more expensive than its Erlang counterpart, so you
%% might want to cache the result in your process state.
%% See function {@link self/1}, which allows you to cache the result in the
%% process dictionary, and take notice of its warning about doing so in an
%% Erlang Shell process.
%% @end
%% -----------------------------------------------------------------------------
-spec self() -> partisan_remote_ref:p().

self() ->
    partisan_remote_ref:from_term(erlang:self()).


%% -----------------------------------------------------------------------------
%% @doc Returns the partisan encoded pid for the calling process, optionally
%% memoising it.
%%
%% <ul>
%% <li>`self([])' encodes the pid on every call; it is equivalent to
%% `partisan_remote_ref:from_term(erlang:self())'.</li>
%% <li>`self([cache])' encodes the pid the first time it is called by a
%% process, stores the result in the process dictionary under the key
%% `{partisan, self}' and returns the stored value on subsequent calls.</li>
%% </ul>
%%
%% <blockquote class="warning">
%% <h4 class="warning">NOTICE</h4>
%% <p>You SHOULD avoid using the `cache' option in the Erlang Shell. When an
%% Erlang Shell process crashes, the contents of its dictionary are copied to
%% the new shell process, and thus you will end up with a remote reference
%% that points to the old (dead) process.
%% </p>
%% </blockquote>
%% @end
%% -----------------------------------------------------------------------------
-spec self(Opts :: [cache]) -> partisan_remote_ref:p().

self([cache]) ->
    CacheKey = {?MODULE, ?FUNCTION_NAME},

    case get(CacheKey) of
        undefined ->
            %% First call from this process: encode once and remember it
            Encoded = partisan_remote_ref:from_term(erlang:self()),
            _ = put(CacheKey, Encoded),
            Encoded;
        Cached ->
            Cached
    end;

self([]) ->
    partisan_remote_ref:from_term(erlang:self()).


%% -----------------------------------------------------------------------------
%% @deprecated Use monitor/2 instead.
%% @doc Monitors the process identified by `Term'.
%% Equivalent to calling `monitor(process, Term)'.
%% @end
%% -----------------------------------------------------------------------------
monitor(Term) ->
    monitor(process, Term).


%% -----------------------------------------------------------------------------
%% @doc Sends a monitor request of type `Type' to the entity identified by
%% `Item'. If the monitored entity does not exist or it changes monitored
%% state, the caller of `monitor/2' is notified by a message on the following
%% format:
%% `{Tag, MonitorRef, Type, Object, Info}'
%%
%% This is the Partisan's equivalent to {@link erlang:monitor/2}.
%% It is the same as calling `monitor(Type, Item, [])'; see {@link monitor/3}
%% for the details.
%%
%% NOTE(review): this comment previously stated that the call fails with
%% `notalive' when the `partisan_monitor' server is not alive, whereas the
%% documentation of monitor/3 states that a reference is returned together
%% with an immediate 'DOWN' signal with reason `notalive'. Confirm which one
%% applies against `partisan_monitor'.
%% @end
%% -----------------------------------------------------------------------------
-spec monitor
    (process, monitor_process_id()) ->
        reference() | partisan_remote_ref:r() | no_return();
    (port, erlang:monitor_port_identifier()) ->
        reference() |  no_return();
    (time_offset, clock_service) ->
        reference() | no_return().

monitor(Type, Item) ->
    monitor(Type, Item, []).


%% -----------------------------------------------------------------------------
%% @doc Sends a monitor request of type `Type' to the entity identified by
%% `Item'. If the monitored entity does not exist or it changes monitored
%% state, the caller of `monitor/3' is notified by a message on the following
%% format:
%%
%% `{Tag, MonitorRef, Type, Object, Info}'
%%
%% where it differs from the message sent by {@link erlang:monitor/3} only when
%% the monitor is handled by `partisan_monitor' (see below), in which case:
%% <ul>
%% <li>`MonitorRef' is a {@link partisan_remote_ref:r()}</li>
%% <li>`Object' is a {@link partisan_remote_ref:p()} or {@link
%% partisan_remote_ref:n()}</li>
%% </ul>
%%
%% This is the Partisan's equivalent to {@link erlang:monitor/3}. It differs
%% from the Erlang implementation only when monitoring a `process'. For all
%% other cases (monitoring a `port' or `time_offset') this function calls
%% `erlang:monitor/3'.
%%
%% As opposed to {@link erlang:monitor/3} this function does not support
%% aliases.
%%
%% === Monitoring a `process' ===
%% Creates a monitor between the current process and another process
%% identified by `Item'. The request is dispatched as follows:
%% <ul>
%% <li>an atom `RegisteredName' or a `pid()' is monitored using
%% `erlang:monitor/3';</li>
%% <li>a tuple `{RegisteredName, Node}' where `Node' is the local partisan
%% node is monitored using `erlang:monitor/3' on `RegisteredName';</li>
%% <li>a tuple `{RegisteredName, Node}' for another node is monitored using
%% `erlang:monitor/3' on `{RegisteredName, Node}' when the configuration
%% option `connect_disterl' is `true', and using `partisan_monitor'
%% otherwise;</li>
%% <li>any other term is assumed to be a partisan remote reference and is
%% handed to `partisan_monitor'.</li>
%% </ul>
%%
%% A process monitor by name resolves the `RegisteredName' to `pid()' or
%% `port()' only once at the moment of monitor instantiation, later changes to
%% the name registration will not affect the existing monitor.
%%
%% Failure: if the `partisan_monitor' server is not alive, a reference will be
%% returned with an immediate 'DOWN' signal with reason `notalive'.
%% @end
%% -----------------------------------------------------------------------------
-spec monitor
    (process, monitor_process_id(), [monitor_opt()]) ->
        reference() | partisan_remote_ref:r();
    (port, erlang:monitor_port_identifier(), [erlang:monitor_option()]) ->
        reference();
    (time_offset, clock_service, [erlang:monitor_option()]) ->
        reference().

monitor(process, RegisteredName, Opts) when is_atom(RegisteredName) ->
    erlang:monitor(process, RegisteredName, to_erl_opts(Opts));

monitor(process, Pid, Opts) when erlang:is_pid(Pid) ->
    erlang:monitor(process, Pid, to_erl_opts(Opts));

monitor(process, {RegisteredName, Node}, Opts)
when is_atom(RegisteredName) ->
    case partisan:node() =:= Node of
        true ->
            %% The name is registered on this node, a plain local monitor
            %% works regardless of the distribution being used.
            erlang:monitor(process, RegisteredName, to_erl_opts(Opts));
        false ->
            case partisan_config:get(connect_disterl, false) of
                true ->
                    %% Keep the node in the identifier, otherwise we would
                    %% monitor the process registered under the same name on
                    %% the local node instead of the remote one.
                    erlang:monitor(
                        process, {RegisteredName, Node}, to_erl_opts(Opts)
                    );
                false ->
                    Ref = partisan_remote_ref:from_term(RegisteredName, Node),
                    partisan_monitor:monitor(Ref, Opts)
            end
    end;

monitor(process, RemoteRef, Opts) ->
    %% Atoms, pids and {Name, Node} tuples were handled by the clauses above
    partisan_monitor:monitor(RemoteRef, Opts);

monitor(Type, Term, Opts) when Type == port orelse Type == time_offset ->
    erlang:monitor(Type, Term, to_erl_opts(Opts)).


%% -----------------------------------------------------------------------------
%% @doc Turns off the monitor identified by `Ref'.
%% Equivalent to calling `demonitor(Ref, [])'; see {@link demonitor/2}.
%% @end
%% -----------------------------------------------------------------------------
-spec demonitor(MonitorRef :: reference() | partisan_remote_ref:r()) -> true.

demonitor(Ref) ->
    demonitor(Ref, []).


%% -----------------------------------------------------------------------------
%% @doc Turns off the monitor identified by `Ref' using the options in `Opts'.
%%
%% When `Ref' is a native Erlang reference the call is delegated to
%% {@link erlang:demonitor/2}, otherwise `Ref' is handed to
%% `partisan_monitor:demonitor/2'.
%% @end
%% -----------------------------------------------------------------------------
-spec demonitor(
    MonitorRef :: reference() | partisan_remote_ref:r(),
    OptionList :: [demonitor_opt()]) -> boolean().

demonitor(Ref, Opts) ->
    case erlang:is_reference(Ref) of
        true ->
            erlang:demonitor(Ref, Opts);
        false ->
            %% partisan_monitor:demonitor will raise a badarg if Ref is not
            %% valid
            partisan_monitor:demonitor(Ref, Opts)
    end.


%% -----------------------------------------------------------------------------
%% @doc Monitor the status of the node `Node'. If `Flag' is `true', monitoring
%% is turned on. If `Flag' is `false', monitoring is turned off.
%%
%% `Node' can be given as a node name or as a `node_spec()', in which case the
%% value of its `name' key is used.
%%
%% Making several calls to `monitor_node(Node, true)' for the same `Node' is
%% not an error; it results in as many independent monitoring instances as
%% the number of different calling processes i.e. if a process has made two
%% calls to `monitor_node(Node, true)' and `Node' terminates, only one
%% `nodedown' message is delivered to the process (this differs from {@link
%% erlang:monitor_node/2}).
%%
%% If `Node' fails or does not exist, the message `{nodedown, Node}' is
%% delivered to the calling process. If there is no connection to `Node', a
%% `nodedown' message is delivered. As a result when using a membership
%% strategy that uses a partial view, you can not monitor nodes that are not
%% members of the view.
%%
%% If `Node' is the caller's node, the function returns `false'.
%% @end
%% -----------------------------------------------------------------------------
-spec monitor_node(node() | node_spec(), boolean()) -> boolean().

monitor_node(#{name := Node}, Flag) ->
    monitor_node(Node, Flag, []);

monitor_node(Node, Flag) ->
    monitor_node(Node, Flag, []).


%% -----------------------------------------------------------------------------
%% @doc Monitor the status of the node `Node'. If `Flag' is `true', monitoring
%% is turned on. If `Flag' is `false', monitoring is turned off.
%%
%% When the configuration option `connect_disterl' is `true' the request is
%% delegated to {@link erlang:monitor_node/3} and `Opts' is passed along.
%% Otherwise the request is handled by `partisan_monitor:monitor_node/2' and
%% `Opts' is ignored.
%%
%% Making several calls to `monitor_node(Node, true)' for the same `Node' is
%% not an error; it results in as many independent monitoring instances as
%% the number of different calling processes i.e. if a process has made two
%% calls to `monitor_node(Node, true)' and `Node' terminates, only one
%% `nodedown' message is delivered to the process (this differs from {@link
%% erlang:monitor_node/2}).
%%
%% If `Node' fails or does not exist, the message `{nodedown, Node}' is
%% delivered to the calling process. If there is no connection to `Node', a
%% `nodedown' message is delivered. As a result when using a membership
%% strategy that uses a partial view, you can not monitor nodes that are not
%% members of the view.
%%
%% If `Node' is the caller's node, the function returns `false'.
%% @end
%% -----------------------------------------------------------------------------
-spec monitor_node(
    Node :: node() | node_spec(),
    Flag :: boolean(),
    Options :: [allow_passive_connect]) -> boolean().

monitor_node(Node, Flag, Opts) ->
    case partisan_config:get(connect_disterl, false) of
        true ->
            erlang:monitor_node(Node, Flag, Opts);
        false ->
            %% No opts for partisan_monitor
            partisan_monitor:monitor_node(Node, Flag)
    end.


%% -----------------------------------------------------------------------------
%% @doc Subscribes (`Flag' is `true') or unsubscribes (`Flag' is `false') the
%% calling process to node status change messages.
%% Equivalent to calling `monitor_nodes(Flag, [])'; see
%% {@link monitor_nodes/2}.
%% @end
%% -----------------------------------------------------------------------------
-spec monitor_nodes(Flag :: boolean()) -> ok | error | {error, term()}.

monitor_nodes(Flag) ->
    monitor_nodes(Flag, []).


%% -----------------------------------------------------------------------------
%% @doc The calling process subscribes or unsubscribes to node status change
%% messages. A `nodeup' message is delivered to all subscribing processes when
%% a new node is connected, and a `nodedown' message is delivered when a node
%% is disconnected.
%%
%% The subscription is handled by {@link net_kernel:monitor_nodes/2} when the
%% configuration option `connect_disterl' is `true', and by
%% `partisan_monitor:monitor_nodes/2' otherwise (the default).
%% @end
%% -----------------------------------------------------------------------------
-spec monitor_nodes(Flag :: boolean(), [monitor_nodes_opt()]) ->
    ok | error | {error, term()}.

monitor_nodes(Flag, Opts) ->
    UseDisterl = partisan_config:get(connect_disterl, false),

    case UseDisterl of
        false ->
            partisan_monitor:monitor_nodes(Flag, Opts);
        true ->
            %% Returns ok | error | {error, term()}.
            net_kernel:monitor_nodes(Flag, Opts)
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if `Arg' originates from the local node.
%%
%% The origin of `Arg' is obtained using {@link node/1}. `Arg' is considered
%% local when its origin is either `nonode@nohost' or the name returned by
%% {@link node/0}.
%% @end
%% -----------------------------------------------------------------------------
-spec is_local(Arg) -> Result when
    Arg :: pid() | port() | reference()
            | partisan_remote_ref:p()
            | partisan_remote_ref:r(),
    Result :: boolean().

is_local(Arg) ->
    case node(Arg) of
        'nonode@nohost' ->
            true;
        Origin ->
            Origin =:= node()
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if `Arg' is a name local to this node.
%%
%% An atom is always considered a local name. Any other term is checked using
%% `partisan_remote_ref:is_local_name/1'.
%% @end
%% -----------------------------------------------------------------------------
-spec is_local_name(Arg :: atom() | partisan_remote_ref:n()) ->
    boolean() | no_return().

is_local_name(Arg) ->
    is_atom(Arg) orelse partisan_remote_ref:is_local_name(Arg).


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if `Arg' refers to the local name `Name'.
%%
%% When `Name' is an atom the result is `true' if `Arg' is that same atom,
%% otherwise it is the result of `partisan_remote_ref:is_local_name/2'.
%%
%% When `Name' is not an atom the function returns `false' provided `Arg' is
%% a local name (see {@link is_local_name/1}), and fails with `badarg'
%% otherwise.
%% @end
%% -----------------------------------------------------------------------------
-spec is_local_name(
    Arg :: atom() | partisan_remote_ref:n(), Name :: atom()) ->
    boolean() | no_return().

is_local_name(Arg, Name) when is_atom(Name) ->
    Arg =:= Name orelse partisan_remote_ref:is_local_name(Arg, Name);

is_local_name(Arg, _NotAnAtom) ->
    %% Name cannot match, but Arg still has to be a valid local name
    is_local_name(Arg) orelse error(badarg),
    false.


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if `Arg' is a process identifier of the local node.
%%
%% A native `pid()' is checked using {@link is_local/1}. Any other term is
%% checked using `partisan_remote_ref:is_local_pid/1'.
%% @end
%% -----------------------------------------------------------------------------
-spec is_local_pid(Arg :: pid() | partisan_remote_ref:p()) ->
    boolean() | no_return().

is_local_pid(Arg) ->
    case erlang:is_pid(Arg) of
        true ->
            is_local(Arg);
        false ->
            partisan_remote_ref:is_local_pid(Arg)
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if `Arg' refers to the local process `Pid'.
%%
%% When `Pid' is a native `pid()' and `Arg' is that same pid, the result is
%% the one of {@link is_local/1} for `Pid'. When `Pid' is a native `pid()' and
%% `Arg' is any other term, the result is the one of
%% `partisan_remote_ref:is_local_pid/2'.
%%
%% When `Pid' is not a native `pid()' the function returns `false' provided
%% `Arg' is a local pid (see {@link is_local_pid/1}), and fails with `badarg'
%% otherwise.
%% @end
%% -----------------------------------------------------------------------------
-spec is_local_pid(
    Arg :: pid() | partisan_remote_ref:p(), Pid :: pid()) ->
    boolean() | no_return().

is_local_pid(Arg, Pid) when erlang:is_pid(Pid) ->
    case Arg =:= Pid of
        true ->
            is_local(Pid);
        false ->
            partisan_remote_ref:is_local_pid(Arg, Pid)
    end;

is_local_pid(Arg, _NotAPid) ->
    %% Pid cannot match, but Arg still has to be a valid local pid
    is_local_pid(Arg) orelse error(badarg),
    false.


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if `Arg' is a reference created on the local node.
%%
%% A native `reference()' is checked using {@link is_local/1}. Any other term
%% is checked using `partisan_remote_ref:is_local_reference/1'.
%% @end
%% -----------------------------------------------------------------------------
-spec is_local_reference(Arg :: reference() | partisan_remote_ref:r()) ->
    boolean() | no_return().

is_local_reference(Arg) ->
    case erlang:is_reference(Arg) of
        true ->
            is_local(Arg);
        false ->
            partisan_remote_ref:is_local_reference(Arg)
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if `Arg' refers to the local reference `LocalRef'.
%%
%% When `LocalRef' is a native `reference()' and `Arg' is that same reference,
%% the result is the one of {@link is_local/1} for `LocalRef'. When
%% `LocalRef' is a native `reference()' and `Arg' is any other term, the
%% result is the one of `partisan_remote_ref:is_local_reference/2'.
%%
%% When `LocalRef' is not a native `reference()' the function returns `false'
%% provided `Arg' is a local reference (see {@link is_local_reference/1}), and
%% fails with `badarg' otherwise.
%% @end
%% -----------------------------------------------------------------------------
-spec is_local_reference(
    Arg :: reference() | partisan_remote_ref:r(), LocalRef :: reference()) ->
    boolean() | no_return().

is_local_reference(Arg, Ref) when erlang:is_reference(Ref) ->
    case Arg =:= Ref of
        true ->
            is_local(Ref);
        false ->
            partisan_remote_ref:is_local_reference(Arg, Ref)
    end;

is_local_reference(Arg, _NotARef) ->
    %% LocalRef cannot match, but Arg still has to be a valid local reference
    is_local_reference(Arg) orelse error(badarg),
    false.


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if `Arg' identifies the calling process.
%%
%% A native `pid()' is compared against `erlang:self()'. Any other term is
%% checked using `partisan_remote_ref:is_local_pid/2' against `erlang:self()'.
%% @end
%% -----------------------------------------------------------------------------
-spec is_self(Arg) -> Result when
    Arg :: pid() | partisan_remote_ref:p(),
    Result :: boolean().

is_self(Arg) ->
    Caller = erlang:self(),

    case erlang:is_pid(Arg) of
        true ->
            Arg =:= Caller;
        false ->
            partisan_remote_ref:is_local_pid(Arg, Caller)
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns the name of the local node.
%% The value is read from the partisan configuration option `name'.
%% @end
%% -----------------------------------------------------------------------------
-spec node() -> node().

node() ->
    partisan_config:get(name).


%% -----------------------------------------------------------------------------
%% @doc Returns the name of the local node as a binary string.
%% The value is read from the partisan configuration option `nodestring'.
%% @end
%% -----------------------------------------------------------------------------
-spec nodestring() -> binary().

nodestring() ->
    partisan_config:get(nodestring).


%% -----------------------------------------------------------------------------
%% @doc Returns the node where `Arg' originates. `Arg' can be a process
%% identifier, a reference, a port or a partisan remote reference.
%%
%% For native identifiers the result is based on `erlang:node/1':
%% <ul>
%% <li>when the configuration option `connect_disterl' is `true' the value of
%% `erlang:node/1' is returned as is;</li>
%% <li>when it is `false', `nonode@nohost' is replaced by the partisan node
%% name (see {@link node/0}) and any other value is returned as is.</li>
%% </ul>
%%
%% For any other term the result is the one of `partisan_remote_ref:node/1'.
%% @end
%% -----------------------------------------------------------------------------
-spec node(Term) -> Result when
    Term :: pid() | port() | reference()
            | partisan_remote_ref:p()
            | partisan_remote_ref:r(),
    Result :: node() | no_return().

node(Arg)
when erlang:is_pid(Arg) orelse erlang:is_reference(Arg) orelse is_port(Arg) ->
    Origin = erlang:node(Arg),

    case partisan_config:get(connect_disterl) of
        false when Origin =:= 'nonode@nohost' ->
            %% Erlang distribution is not in use, so this is a local term:
            %% return the partisan node
            node();
        false ->
            %% This is the case when the user has assigned a nodename
            %% via vm.args but disabled erlang distribution
            %% In this case erlang:node() == partisan:node()
            Origin;
        true ->
            %% If node is down we will get 'nonode@nohost' and we should return
            %% this value, even if partisan:node() has been set
            Origin
    end;

node(RemoteRef) ->
    partisan_remote_ref:node(RemoteRef).


%% -----------------------------------------------------------------------------
%% @doc Returns a list of all visible nodes connected to this node.
%% Equivalent to {@link erlang:nodes/0}.
%% Same as calling `nodes(visible)'.
%%
%% NOTE(review): this comment previously stated that when `connect_disterl'
%% is `true' this function will NOT return the disterl nodes. However,
%% {@link nodes/1} delegates to `erlang:nodes/1' in that case and to
%% `partisan_peer_connections:nodes/1' otherwise. Confirm which behaviour is
%% the intended one.
%% @end
%% -----------------------------------------------------------------------------
-spec nodes() -> [node()].

nodes() ->
    nodes(visible).


%% -----------------------------------------------------------------------------
%% @doc Returns the list of nodes selected by `Arg'.
%%
%% The list is obtained from {@link erlang:nodes/1} when the configuration
%% option `connect_disterl' is `true', and from
%% `partisan_peer_connections:nodes/1' otherwise.
%% @end
%% -----------------------------------------------------------------------------
-spec nodes(Arg :: node_type()) -> [node()].

nodes(Arg) ->
    UseDisterl = partisan_config:get(connect_disterl),

    case UseDisterl of
        false ->
            partisan_peer_connections:nodes(Arg);
        true ->
            erlang:nodes(Arg)
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if there is a connection to the node identified by
%% `NodeOrSpec'.
%% This is the result of calling `partisan_peer_connections:is_connected/1'.
%% @end
%% -----------------------------------------------------------------------------
-spec is_connected(NodeOrSpec :: node_spec() | node()) -> boolean().

is_connected(NodeOrSpec) ->
    partisan_peer_connections:is_connected(NodeOrSpec).


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if there is a connection to the node identified by
%% `NodeOrSpec' for channel `Channel'.
%% This is the result of calling `partisan_peer_connections:is_connected/2'.
%% @end
%% -----------------------------------------------------------------------------
-spec is_connected(NodeOrSpec :: node_spec() | node(), Channel :: channel()) ->
    boolean().

is_connected(NodeOrSpec, Channel) ->
    partisan_peer_connections:is_connected(NodeOrSpec, Channel).


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if the node identified by `NodeOrSpec' is fully
%% connected.
%% This is the result of calling
%% `partisan_peer_connections:is_fully_connected/1', which defines what
%% "fully connected" means.
%% @end
%% -----------------------------------------------------------------------------
-spec is_fully_connected(NodeOrSpec :: node_spec() | node()) -> boolean().

is_fully_connected(NodeOrSpec) ->
    partisan_peer_connections:is_fully_connected(NodeOrSpec).


%% -----------------------------------------------------------------------------
%% @doc Forces the disconnection of node `Node' by making it leave the cluster.
%%
%% If `Node' is the local node, the result is the one of
%% `partisan_peer_service:leave/0'.
%%
%% Otherwise the node specification for `Node' is resolved using
%% {@link node_spec/1} and passed to `partisan_peer_service:leave/1', in which
%% case the function returns `true'. If the specification cannot be resolved
%% (either because `node_spec/1' returned an error or because it raised an
%% exception) the function returns `false'.
%% @end
%% -----------------------------------------------------------------------------
-spec disconnect_node(Node :: node()) -> boolean() | ignored.

disconnect_node(Node) ->
    case node() of
        Node ->
            partisan_peer_service:leave();
        _Other ->
            %% Only the resolution of the node spec is protected, a failure
            %% while leaving is not caught.
            try node_spec(Node) of
                {error, _} ->
                    false;
                {ok, Spec} ->
                    ok = partisan_peer_service:leave(Spec),
                    true
            catch
                _:_ ->
                    false
            end
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if the local node is alive (that is, if the node can be
%% part of a distributed system), otherwise `false'.
%%
%% The node is considered alive when a process is registered locally under the
%% name defined by the `PEER_SERVICE_MANAGER' macro.
%% @end
%% -----------------------------------------------------------------------------
-spec is_alive() -> boolean().

is_alive() ->
    case erlang:whereis(?PEER_SERVICE_MANAGER) of
        undefined ->
            false;
        _Registered ->
            true
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns the process identifier or port identifier registered locally
%% under the name `Arg', or `undefined' if the name is not registered.
%%
%% `Arg' can be an atom or a partisan remote reference for a registered name.
%% In the latter case the reference is converted using
%% `partisan_remote_ref:to_term/1' and the call fails with `badarg' if the
%% result is not an atom.
%%
%% Fails with `badarg' if `Arg' is a remote reference for another node.
%% @end
%% -----------------------------------------------------------------------------
-spec whereis(Arg :: atom() | partisan_remote_ref:n()) ->
    pid() | port() | undefined | no_return().

whereis(Arg) when is_atom(Arg) ->
    erlang:whereis(Arg);

whereis(Arg) ->
    case partisan_remote_ref:to_term(Arg) of
        Name when is_atom(Name) ->
            erlang:whereis(Name);
        _ ->
            error(badarg)
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns information about the local process identified by `Arg', or
%% `undefined' if the process is not alive.
%% See {@link erlang:process_info/1}.
%%
%% `Arg' can be a native `pid()' or a partisan remote reference. In the latter
%% case the reference is converted using `partisan_remote_ref:to_term/1'.
%%
%% Fails with `badarg' if `Arg' cannot be converted or if the result of the
%% conversion is not a `pid()'.
%% @end
%% -----------------------------------------------------------------------------
-spec process_info(Arg :: pid() | partisan_remote_ref:p()) ->
    [tuple()] | undefined.

process_info(Arg) when erlang:is_pid(Arg) ->
    erlang:process_info(Arg);

process_info(Arg) ->
    try partisan_remote_ref:to_term(Arg) of
        Term when erlang:is_pid(Term) ->
            erlang:process_info(Term);
        _ ->
            %% The `of' section is not covered by the `catch' below, so we
            %% have to raise the error ourselves (a throw would escape as is)
            error(badarg)
    catch
        _:_ ->
            error(badarg)
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns the information selected by `ItemOrItems' about the local
%% process identified by `Arg', or `undefined' if the process is not alive.
%% See {@link erlang:process_info/2}.
%%
%% When `ItemOrItems' is a single item the result is a single tuple, when it
%% is a list of items the result is a list of tuples.
%%
%% `Arg' can be a native `pid()' or a partisan remote reference. In the latter
%% case the reference is converted using `partisan_remote_ref:to_term/1'.
%%
%% Fails with `badarg' if `Arg' cannot be converted or if the result of the
%% conversion is not a `pid()'.
%% @end
%% -----------------------------------------------------------------------------
-spec process_info(
    Arg :: pid() | partisan_remote_ref:p(),
    Item :: atom() | [atom()]) -> tuple() | [tuple()] | undefined.

process_info(Arg, ItemOrItems) when erlang:is_pid(Arg) ->
    erlang:process_info(Arg, ItemOrItems);

process_info(Arg, ItemOrItems) ->
    try partisan_remote_ref:to_term(Arg) of
        Term when erlang:is_pid(Term) ->
            erlang:process_info(Term, ItemOrItems);
        _ ->
            %% The `of' section is not covered by the `catch' below, so we
            %% have to raise the error ourselves (a throw would escape as is)
            error(badarg)
    catch
        _:_ ->
            error(badarg)
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns the node specification of the local node.
%% This is the information required when other nodes wish to join this node
%% (See {@link partisan_peer_service:join/1}).
%%
%% The specification is built from the local node name (see {@link node/0})
%% and the values of the configuration options `listen_addrs' and `channels'.
%%
%% Notice that the values of the keys must be sorted for the peer service to be
%% able to compare node specifications, and prevent duplicates in the
%% membership view data structure. This is important in case you find
%% yourself building this representation manually in order to implement a
%% particular orchestration strategy. As Erlang maps are naturally sorted, the
%% only property that you need to keep sorted is `listen_addrs' as it is
%% implemented as a list.
%% @end
%% -----------------------------------------------------------------------------
-spec node_spec() -> node_spec().

node_spec() ->
    Name = node(),
    %% Channels and ListenAddrs are sorted already
    ListenAddrs = partisan_config:get(listen_addrs),
    Channels = partisan_config:get(channels),

    #{name => Name, listen_addrs => ListenAddrs, channels => Channels}.


%% -----------------------------------------------------------------------------
%% @doc Returns the tuple `{ok, node_spec()}' for the node named `Node', or
%% the tuple `{error, Reason}'.
%%
%% This is the same as calling `node_spec(Node, #{})', see
%% {@link node_spec/2} for the details on how the specification is resolved.
%%
%% NOTE(review): this comment previously stated that the RPC is implemented
%% using the `rpc' module when `connect_disterl' is `true', and that the
%% function falls back to assuming that the node runs on the same host. Confirm
%% against node_spec/2 whether any of that still applies.
%%
%% You should only use this function when distributed erlang is enabled
%% (configuration option `connect_disterl' is `true') or if the node is running
%% on the same host and you are using this for testing purposes as there is not
%% much sense in running a partisan cluster on a single host.
%% @end
%% -----------------------------------------------------------------------------
-spec node_spec(node()) -> {ok, node_spec()} | {error, Reason :: any()}.

node_spec(Node) when is_atom(Node) ->
    node_spec(Node, #{}).


%% -----------------------------------------------------------------------------
%% @doc Returns the tuple `{ok, node_spec()}' for the node named `Node' or the
%% tuple `{error, Reason}'.
%%
%% `Node' can also be given as a string, in which case it is first converted
%% to a node name.
%%
%% The specification is resolved as follows:
%% <ul>
%% <li>If `Node' is the local node, the result of {@link node_spec/0} is
%% returned.</li>
%% <li>If there is a partisan connection to `Node', the cached specification
%% that was used for creating the connection is returned.</li>
%% <li>If no connection is present (the case for a p2p topology), the
%% specification is retrieved from the remote node using {@link partisan_rpc}.
%% The call is bounded by the option `rpc_timeout' (defaults to `5000'). This
%% alternative requires the partisan configuration `forward_opts' to have
%% `broadcast' and `transitive' enabled.</li>
%% </ul>
%%
%% NOTICE: At the moment partisan_rpc might not work correctly w/ a p2p
%% topology.
%% @end
%% -----------------------------------------------------------------------------
-spec node_spec(
    Node :: list() | node(),
    Opts :: #{rpc_timeout => timeout()}) ->
    {ok, node_spec()} | {error, Reason :: any()}.

node_spec(Node, Opts) when is_list(Node) ->
    node_spec(list_to_node(Node), Opts);

node_spec(Node, Opts) when is_atom(Node), is_map(Opts) ->
    case partisan:node() =:= Node of
        true ->
            {ok, partisan:node_spec()};

        false ->
            case is_connected(Node) of
                false ->
                    %% Not connected, ask the remote node for its own spec
                    Timeout = maps:get(rpc_timeout, Opts, 5000),

                    case partisan_rpc:call(Node, ?MODULE, node_spec, [], Timeout) of
                        {badrpc, Reason} ->
                            {error, Reason};
                        #{name := Node} = Spec ->
                            {ok, Spec}
                    end;

                true ->
                    %% Use the spec cached when the connection was created
                    {ok, Info} = partisan_peer_connections:info(Node),
                    {ok, partisan_peer_connections:node_spec(Info)}
            end
    end.


%% -----------------------------------------------------------------------------
%% @doc Returns the name of the default channel, i.e. the value of the
%% `DEFAULT_CHANNEL' macro.
%% @end
%% -----------------------------------------------------------------------------
-spec default_channel() -> channel().

default_channel() ->
    ?DEFAULT_CHANNEL.


%% -----------------------------------------------------------------------------
%% @doc Returns the options of the channel named `Name', as returned by
%% `partisan_config:channel_opts/1'.
%% Fails if a channel named `Name' doesn't exist.
%% @end
%% -----------------------------------------------------------------------------
-spec channel_opts(Name :: channel()) -> channel_opts() | no_return().

channel_opts(Name) when is_atom(Name) ->
    partisan_config:channel_opts(Name).


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if the argument is a native pid (as per
%% `erlang:is_pid/1'). For any other term, returns the result of
%% `partisan_remote_ref:is_pid/1'.
%% @end
%% -----------------------------------------------------------------------------
-spec is_pid(pid() | partisan_remote_ref:p()) ->
    boolean() | no_return().

is_pid(Term) ->
    erlang:is_pid(Term) orelse partisan_remote_ref:is_pid(Term).


%% -----------------------------------------------------------------------------
%% @doc Returns `true' if the argument is a native reference (as per
%% `erlang:is_reference/1'). For any other term, returns the result of
%% `partisan_remote_ref:is_reference/1'.
%% @end
%% -----------------------------------------------------------------------------
-spec is_reference(reference() | partisan_remote_ref:r()) ->
    boolean() | no_return().

is_reference(Term) ->
    erlang:is_reference(Term) orelse partisan_remote_ref:is_reference(Term).


%% -----------------------------------------------------------------------------
%% @doc Tells whether the process identified by the argument is alive.
%%
%% For a native pid this is the result of `erlang:is_process_alive/1'. For a
%% remote reference that `partisan_remote_ref:is_local/1' reports as local, the
%% reference is converted to a term and checked the same way. Otherwise the
%% check is performed on the reference's node by calling this same function
%% through `partisan_rpc:call/5' with a timeout of 5000, and the result of that
%% call is returned unchanged.
%% @end
%% -----------------------------------------------------------------------------
-spec is_process_alive(pid() | partisan_remote_ref:p()) ->
    boolean() | no_return().

is_process_alive(Pid) when erlang:is_pid(Pid) ->
    erlang:is_process_alive(Pid);

is_process_alive(RemoteRef) ->
    case partisan_remote_ref:is_local(RemoteRef) of
        false ->
            %% Ask the node that owns the process
            TargetNode = node(RemoteRef),
            Args = [RemoteRef],
            partisan_rpc:call(
                TargetNode, ?MODULE, is_process_alive, Args, 5000
            );
        true ->
            LocalPid = partisan_remote_ref:to_term(RemoteRef),
            erlang:is_process_alive(LocalPid)
    end.


%% -----------------------------------------------------------------------------
%% @doc Sends an exit signal with reason `Reason' to the process identified by
%% the first argument.
%%
%% For a native pid this calls `erlang:exit/2' directly. For any other term,
%% it is first converted with `partisan_remote_ref:to_term/1': a resulting pid
%% is passed to `erlang:exit/2' and a resulting atom is resolved with
%% `whereis/1' before doing the same. If the conversion fails with `badarg'
%% the reference is considered not local and the call is forwarded to the
%% reference's node through `partisan_rpc:call/5' with a timeout of 5000.
%% @end
%% -----------------------------------------------------------------------------
-spec exit(Pid :: pid() | partisan_remote_ref:p(), Reason :: term()) -> true.

exit(Pid, Reason) when erlang:is_pid(Pid) ->
    erlang:exit(Pid, Reason);

exit(RemoteRef, Reason) ->
    try partisan_remote_ref:to_term(RemoteRef) of
        RegName when is_atom(RegName) ->
            Registered = whereis(RegName),
            erlang:exit(Registered, Reason);
        LocalPid when erlang:is_pid(LocalPid) ->
            erlang:exit(LocalPid, Reason)
    catch
        error:badarg ->
            %% Not local
            TargetNode = node(RemoteRef),
            Args = [RemoteRef, Reason],
            partisan_rpc:call(TargetNode, ?MODULE, exit, Args, 5000)
    end.


%% -----------------------------------------------------------------------------
%% @doc Equivalent to calling `send(Dest, Msg, [])'.
%% @end
%% -----------------------------------------------------------------------------
-spec send(Dest :: send_dst(), Msg :: message()) -> ok.

send(Dest, Msg) ->
    send(Dest, Msg, []).


%% -----------------------------------------------------------------------------
%% @doc Sends `Msg' to `Dest'.
%%
%% When `Dest' is a pid of the local node, an atom, or a `{RegName, Node}'
%% tuple where `Node' is the local node, the message is sent with
%% `erlang:send/3' after removing the `channel' option from `Opts'.
%%
%% When `Dest' is a `{RegName, Node}' tuple for another node, the message is
%% forwarded with `forward_message/4'. Any other destination is handed to
%% `forward_message/3'.
%% @end
%% -----------------------------------------------------------------------------
-spec send(Dest :: send_dst(), Msg :: message(), Opts :: forward_opts()) -> ok.


send(Dest, Msg, Opts)
when erlang:is_pid(Dest), erlang:node(Dest) == erlang:node(); is_atom(Dest) ->
    %% Local pid or registered name: plain Erlang send
    ErlOpts = to_erl_opts(Opts),
    erlang:send(Dest, Msg, ErlOpts);

send({RegName, Node} = Dest, Msg, Opts)
when is_atom(RegName), is_atom(Node) ->
    case Node == erlang:node() of
        true ->
            %% Name registered on this node: plain Erlang send
            erlang:send(Dest, Msg, to_erl_opts(Opts));
        false ->
            forward_message(Node, RegName, Msg, Opts)
    end;

send(Dest, Msg, Opts) ->
    forward_message(Dest, Msg, Opts).


%% -----------------------------------------------------------------------------
%% @doc Equivalent to calling `send_after(Time, Dest, Msg, [])'.
%%
%% Returns whatever `send_after/4' returns for those arguments.
%% @end
%% -----------------------------------------------------------------------------
-spec send_after(
    Time :: time(),
    Destination :: send_after_dst(),
    Msg :: message()) -> TRef :: reference().

send_after(Time, Dest, Msg) ->
    send_after(Time, Dest, Msg, []).


%% -----------------------------------------------------------------------------
%% @doc Equivalent to the native `erlang:send_after/4'.
%%
%% It calls the native implementation for local destinations, i.e. a pid of the
%% local node, an atom, or a `{RegName, Node}' tuple where `Node' is the local
%% node.
%%
%% For remote destinations (terms for which `partisan_remote_ref:is_type/1'
%% returns `true') it spawns a local process that creates an alias, starts a
%% timer and sends `Msg' with `send/3' when the timer fires. The returned
%% reference is that alias, which `cancel_timer/1,2' uses to deliver the
%% cancellation request, so this is less efficient than the native
%% implementation.
%%
%% Fails with `badarg' if `Time' is not valid, `Opts' is not a list or `Dest'
%% is none of the supported destinations. Fails with `timeout' if the spawned
%% process does not report its alias within 3000 milliseconds.
%% @end
%% -----------------------------------------------------------------------------
-spec send_after(
    Time :: time(),
    Destination :: send_after_dst(),
    Message :: message(),
    Opts :: send_after_opts()) -> TRef :: reference().

send_after(Time, Dest, Msg, Opts)
when ?IS_VALID_TIME(Time) andalso is_list(Opts) andalso
(
    (erlang:is_pid(Dest) andalso erlang:node(Dest) == erlang:node())
    orelse is_atom(Dest)
) ->
    %% Local send
    erlang:send_after(Time, Dest, Msg, Opts);


send_after(Time, {RegName, Node} = Dest, Msg, Opts)
when ?IS_VALID_TIME(Time) andalso
is_list(Opts) andalso
is_atom(RegName) andalso
is_atom(Node) andalso
Node == erlang:node() ->
    %% Local send
    erlang:send_after(Time, Dest, Msg, Opts);

send_after(Time, Dest, Msg, Opts)
when ?IS_VALID_TIME(Time) andalso is_list(Opts) ->
    case partisan_remote_ref:is_type(Dest) of
        true ->
            Caller = erlang:self(),
            Pid = spawn(
                fun() ->
                    %% Setup an alias which the canceller can use to send us a
                    %% cancel message
                    Alias = alias(),
                    Me = erlang:self(),

                    Caller ! {send_after_init, Me, Alias},

                    %% Set the timer and wait for the timeout or cancel message
                    TimerRef = erlang:start_timer(Time, Me, send),

                    try
                        receive
                            {timeout, TimerRef, send} ->
                                %% Best effort: a failed send is ignored
                                catch partisan:send(Dest, Msg, Opts),
                                ok;

                            {cancel, Alias, Canceller, WantsInfo} ->
                                CancelOpts = [{info, WantsInfo}],
                                Result = erlang:cancel_timer(
                                    TimerRef, CancelOpts
                                ),
                                %% Whether to reply is decided by the `info'
                                %% flag carried in the cancel request, not by
                                %% the options given to send_after/4. The
                                %% canceller only waits for a reply when it
                                %% asked for one.
                                case WantsInfo of
                                    true ->
                                        Canceled = {
                                            send_after_canceled, Alias, Result
                                        },
                                        Canceller ! Canceled;
                                    false ->
                                        ok
                                end
                        end
                    catch
                        _:_ ->
                            ok
                    after
                        unalias(Alias)
                    end

                end
            ),

            %% Wait for the helper process to hand us its alias, which is the
            %% timer reference returned to the caller
            receive
                {send_after_init, Pid, Ref} ->
                    Ref
            after
                3000 ->
                    error(timeout)
            end;

        false ->
            Info = #{
                cause => #{
                    2 =>
                        "should be a local pid, local registered name, "
                        "a tuple containing registered name and node, "
                        "or a partisan_remote_ref for a remote pid or "
                        "registered name."
                }
            },
            erlang:error(
                badarg, [Time, Dest, Msg, Opts], [{error_info, Info}]
            )
    end;

send_after(Time, Dest, Msg, Opts) when not ?IS_VALID_TIME(Time) ->
    Info = #{cause => #{1 => "should be a pos_integer"}},
    erlang:error(
        badarg, [Time, Dest, Msg, Opts], [{error_info, Info}]
    );

send_after(Time, Dest, Msg, Opts) when not is_list(Opts) ->
    Info = #{cause => #{4 => "should be a list"}},
    erlang:error(
        badarg, [Time, Dest, Msg, Opts], [{error_info, Info}]
    ).


%% -----------------------------------------------------------------------------
%% @doc Equivalent to calling `cancel_timer(TRef, [])'.
%% @end
%% -----------------------------------------------------------------------------
-spec cancel_timer(TRef :: tref()) ->
    ok | time() | false.

cancel_timer(TRef) ->
    cancel_timer(TRef, []).


%% Cancels the timer identified by `Ref'.
%%
%% When the `async' option in `Opts' is `true' the cancellation is carried out
%% by a spawned process and `ok' is returned immediately. Otherwise (the
%% default) it is carried out in the calling process and its result is
%% returned.
-spec cancel_timer(Ref :: reference(), Opts :: list()) ->
    ok | time() | false.

cancel_timer(Ref, Opts) when erlang:is_reference(Ref), is_list(Opts) ->
    Caller = erlang:self(),

    case partisan_util:get(async, Opts, false) of
        false ->
            do_cancel_timer(Ref, Opts, Caller);

        true ->
            Worker = fun() -> do_cancel_timer(Ref, Opts, Caller) end,
            _ = spawn(Worker),
            ok
    end.


%% -----------------------------------------------------------------------------
%% @doc Spawns a process that executes `Fun' on node `Node' and returns a
%% reference to it.
%%
%% If `Node' is the local node the process is created with `erlang:spawn/1'.
%% Otherwise `erlang:spawn/1' is invoked on `Node' through
%% `partisan_rpc:call/5' with a timeout of 5000.
%%
%% If the RPC fails, a warning including the failure reason is logged and a
%% local process that exits immediately is spawned in its place. Its exit
%% reason is `noconnection' when the RPC failed with `disconnected' or
%% `not_yet_connected', and `noproc' otherwise.
%% @end
%% -----------------------------------------------------------------------------
-spec spawn(Node :: node(), Fun :: fun(() -> any())) -> partisan_remote_ref:p().

spawn(Node, Fun) ->
    case Node == node() of
        true ->
            Pid = erlang:spawn(Fun),
            partisan_remote_ref:from_term(Pid);
        false ->
            case partisan_rpc:call(Node, erlang, spawn, [Fun], 5000) of
                {badrpc, Reason} ->
                    %% Log the reason too, otherwise the failure cannot be
                    %% diagnosed from the log entry
                    ?LOG_WARNING(#{
                        description =>
                            "Can not start erlang:apply/1 on remote node",
                        node => Node,
                        reason => Reason
                    }),
                    %% Return a reference to a process that is already
                    %% terminating, so that monitors on it fire
                    Pid = erlang:spawn(
                        fun() ->
                            erlang:exit(process_exit_reason(Reason))
                        end
                    ),
                    partisan_remote_ref:from_term(Pid);

                Pid when erlang:is_pid(Pid) ->
                    partisan_remote_ref:from_term(Pid, Node);

                Encoded ->
                    %% We assume it is a partisan_remote_ref, this is the case
                    %% when pid_encoding is enabled
                    Encoded
            end
    end.



%% -----------------------------------------------------------------------------
%% @doc Spawns a process that executes `Module:Function(Args...)' on node
%% `Node' and returns a reference to it.
%%
%% If `Node' is the local node the process is created with `erlang:spawn/3'.
%% Otherwise `erlang:spawn/3' is invoked on `Node' through
%% `partisan_rpc:call/5' with a timeout of 5000.
%%
%% If the RPC fails, a warning including the failure reason is logged and a
%% local process that exits immediately is spawned in its place. Its exit
%% reason is `noconnection' when the RPC failed with `disconnected' or
%% `not_yet_connected', and `noproc' otherwise.
%% @end
%% -----------------------------------------------------------------------------
-spec spawn(
    Node :: node(),
    Mod :: module(),
    Function :: atom(),
    Args :: [term()]) -> partisan_remote_ref:p().

spawn(Node, Module, Function, Args) ->
    case Node == node() of
        true ->
            Pid = erlang:spawn(Module, Function, Args),
            partisan_remote_ref:from_term(Pid);
        false ->
            SpawnArgs = [Module, Function, Args],
            case partisan_rpc:call(Node, erlang, spawn, SpawnArgs, 5000) of
                {badrpc, Reason} ->
                    %% Log the reason too, otherwise the failure cannot be
                    %% diagnosed from the log entry
                    ?LOG_WARNING(#{
                        description =>
                            "Can not start erlang:apply/1 on remote node",
                        node => Node,
                        reason => Reason
                    }),
                    %% Return a reference to a process that is already
                    %% terminating, so that monitors on it fire
                    Pid = erlang:spawn(
                        fun() ->
                            erlang:exit(process_exit_reason(Reason))
                        end
                    ),
                    partisan_remote_ref:from_term(Pid);

                Pid when erlang:is_pid(Pid) ->
                    partisan_remote_ref:from_term(Pid, Node);

                Encoded ->
                    %% We assume it is a partisan_remote_ref, this is the case
                    %% when pid_encoding is enabled
                    Encoded
            end
    end.


%% -----------------------------------------------------------------------------
%% @doc Spawns a process that executes `Fun' on node `Node' using `spawn/2'
%% and starts monitoring it using `monitor/2'.
%%
%% Returns the value returned by `monitor(process, Pid)'.
%%
%% NOTE(review): unlike `erlang:spawn_monitor/2', the identifier of the
%% spawned process is not part of the return value, and the `-spec' declares a
%% process reference as return type -- confirm this is intended.
%% @end
%% -----------------------------------------------------------------------------
-spec spawn_monitor(Node :: node(), Fun :: fun(() -> any())) ->
    partisan_remote_ref:p().

spawn_monitor(Node, Fun) ->
    Pid = spawn(Node, Fun),
    monitor(process, Pid).



%% -----------------------------------------------------------------------------
%% @doc Spawns a process that executes `Module:Function(Args...)' on node
%% `Node' using `spawn/4' and starts monitoring it using `monitor/2'.
%%
%% Returns the value returned by `monitor(process, Pid)'.
%%
%% NOTE(review): unlike `erlang:spawn_monitor/4', the identifier of the
%% spawned process is not part of the return value, and the `-spec' declares a
%% process reference as return type -- confirm this is intended.
%% @end
%% -----------------------------------------------------------------------------
-spec spawn_monitor(
    Node :: node(), Mod :: module(), Function :: atom(), Args :: [term()]) -> partisan_remote_ref:p().

spawn_monitor(Node, Module, Function, Args) ->
    Pid = spawn(Node, Module, Function, Args),
    monitor(process, Pid).



%% -----------------------------------------------------------------------------
%% @doc Cast message to a remote ref.
%%
%% Delegates to `cast_message/2' of the module given by the
%% `PEER_SERVICE_MANAGER' macro.
%% @end
%% -----------------------------------------------------------------------------
-spec cast_message(
    ServerRef :: server_ref(),
    Msg :: message()) -> ok.

cast_message(Term, Message) ->
    ?PEER_SERVICE_MANAGER:cast_message(Term, Message).


%% -----------------------------------------------------------------------------
%% @doc Cast message to registered process on the remote side.
%%
%% Delegates to `cast_message/3' of the module given by the
%% `PEER_SERVICE_MANAGER' macro, passing `Opts' through unchanged.
%% @end
%% -----------------------------------------------------------------------------
-spec cast_message(
    ServerRef :: server_ref(),
    Msg :: message(),
    Opts :: forward_opts()) -> ok.

cast_message(ServerRef, Msg, Opts) ->
    ?PEER_SERVICE_MANAGER:cast_message(ServerRef, Msg, Opts).


%% -----------------------------------------------------------------------------
%% @doc Cast message to registered process on the remote side.
%%
%% Delegates to `cast_message/4' of the module given by the
%% `PEER_SERVICE_MANAGER' macro, passing all arguments through unchanged.
%% @end
%% -----------------------------------------------------------------------------
-spec cast_message(
    Node :: node(),
    ServerRef :: server_ref(),
    Msg :: message(),
    Opts :: forward_opts()) -> ok.

cast_message(Node, ServerRef, Message, Options) ->
    ?PEER_SERVICE_MANAGER:cast_message(Node, ServerRef, Message, Options).


%% -----------------------------------------------------------------------------
%% @doc Forward message to registered process on the remote side.
%%
%% Delegates to `forward_message/2' of the module given by the
%% `PEER_SERVICE_MANAGER' macro.
%% @end
%% -----------------------------------------------------------------------------
-spec forward_message(
    ServerRef :: server_ref(),
    Msg :: message()) -> ok.

forward_message(ServerRef, Message) ->
    ?PEER_SERVICE_MANAGER:forward_message(ServerRef, Message).


%% -----------------------------------------------------------------------------
%% @doc Forward message to registered process on the remote side.
%%
%% Delegates to `forward_message/3' of the module given by the
%% `PEER_SERVICE_MANAGER' macro, passing `Opts' through unchanged.
%% @end
%% -----------------------------------------------------------------------------
-spec forward_message(
    ServerRef :: server_ref(),
    Msg :: message(),
    Opts :: forward_opts()) -> ok.

forward_message(ServerRef, Message, Opts) ->
    ?PEER_SERVICE_MANAGER:forward_message(ServerRef, Message, Opts).


%% -----------------------------------------------------------------------------
%% @doc Forward message to registered process on the remote side.
%%
%% Delegates to `forward_message/4' of the module given by the
%% `PEER_SERVICE_MANAGER' macro, passing all arguments through unchanged.
%% @end
%% -----------------------------------------------------------------------------
-spec forward_message(
    Node :: node(),
    ServerRef :: server_ref(),
    Msg :: message(),
    Opts :: forward_opts()) -> ok.

forward_message(Node, ServerRef, Message, Opts) ->
    ?PEER_SERVICE_MANAGER:forward_message(Node, ServerRef, Message, Opts).


%% -----------------------------------------------------------------------------
%% @doc Broadcasts a message originating from this node.
%%
%% The message will be delivered to each node at least once. The `Mod' passed
%% is responsible for handling the message on remote nodes as well as providing
%% some other information both locally and on other nodes.
%% `Mod' must be loaded on all members of the cluster and implement the
%% `partisan_plumtree_broadcast_handler' behaviour.
%%
%% Delegates to `partisan_plumtree_broadcast:broadcast/2'.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast(any(), module()) -> ok.

broadcast(Broadcast, Mod) ->
    partisan_plumtree_broadcast:broadcast(Broadcast, Mod).



%% =============================================================================
%% PRIVATE
%% =============================================================================



%% @private
%% Converts a (possibly deep) character list into a node name atom.
list_to_node(NodeStr) ->
    Flat = lists:flatten(NodeStr),
    erlang:list_to_atom(Flat).


%% @private
%% Converts a list of send options into the options handed to
%% `erlang:send/3' by removing the partisan-specific `channel' option.
%%
%% Every entry whose first element is `channel' is removed (not only the first
%% one), so that a duplicated option is not passed on to `erlang:send/3'. The
%% relative order of the remaining options is preserved.
to_erl_opts(Opts0) when is_list(Opts0) ->
    [Opt || Opt <- Opts0, not is_channel_opt(Opt)].


%% @private
%% Returns `true' for tuples of any size whose first element is `channel'.
is_channel_opt(Opt) when is_tuple(Opt), tuple_size(Opt) >= 1 ->
    element(1, Opt) == channel;

is_channel_opt(_) ->
    false.


%% @private
%% Resolves the process that should get the cancellation result (the calling
%% process when `undefined' is given) and then waits for the result if needed.
maybe_wait_for_cancel_timer_result(Ref, Info, undefined) ->
    maybe_wait_for_cancel_timer_result(Ref, Info, erlang:self());

maybe_wait_for_cancel_timer_result(Ref, Info, Pid) when erlang:is_pid(Pid) ->
    IsSelf = Pid == erlang:self(),
    maybe_wait_for_cancel_timer_result(Ref, Info, Pid, IsSelf).


%% @private
%% When no information was requested returns `ok' without waiting. Otherwise
%% waits up to 500 milliseconds for the `send_after_canceled' message tagged
%% with `Ref', using `false' as the result when none arrives. The result is
%% returned when the last argument is `true'; in any other case it is sent to
%% `Pid' as `{cancel_timer, Ref, Result}'.
maybe_wait_for_cancel_timer_result(_Ref, false, _Pid, _IsSelf) ->
    ok;

maybe_wait_for_cancel_timer_result(Ref, true, Pid, IsSelf) ->
    Outcome =
        receive
            {send_after_canceled, Ref, Result} ->
                Result
        after
            500 ->
                false
        end,

    case IsSelf of
        true ->
            Outcome;
        _ ->
            Pid ! {cancel_timer, Ref, Outcome}
    end.


%% @private
%% Cancels the timer identified by `Ref' on behalf of process `Pid'.
%%
%% The native `erlang:cancel_timer/1' is tried first. When it returns `false'
%% the reference might be the alias returned by `send_after/4' for a remote
%% destination, so a cancel request is sent to it and the outcome is handled
%% by `maybe_wait_for_cancel_timer_result/3'.
%%
%% When the native timer was cancelled and the `info' option (default `true')
%% is enabled, the result is returned if `Pid' is the calling process
%% (synchronous cancellation), or sent to `Pid' as
%% `{cancel_timer, Ref, Result}' if this runs in another process (asynchronous
%% cancellation). When `info' is disabled `ok' is returned.
do_cancel_timer(Ref, Opts, Pid) ->
    Me = erlang:self(),
    Info = partisan_util:get(info, Opts, true),

    case erlang:cancel_timer(Ref) of
        false ->
            %% Timer not found, maybe is our custom solution for send_after
            Ref ! {cancel, Ref, Me, Info},
            maybe_wait_for_cancel_timer_result(Ref, Info, Pid);

        Result when Info == true, Pid =:= Me ->
            %% Synchronous call: hand the result back as the return value
            %% instead of posting a message to ourselves
            Result;

        Result when Info == true, erlang:is_pid(Pid) ->
            %% Asynchronous call: notify the process that requested it
            Pid ! {cancel_timer, Ref, Result};

        Result when Info == true ->
            Result;

        _ ->
            ok
    end.


%% @private
%% Maps the reason of a failed RPC to a process exit reason: connection
%% related failures become `noconnection', anything else becomes `noproc'.
process_exit_reason(Reason)
when Reason =:= disconnected; Reason =:= not_yet_connected ->
    noconnection;

process_exit_reason(_Other) ->
    noproc.