src/partisan_peer_service.erl

%% -----------------------------------------------------------------------------
%%
%% Copyright (c) 2014 Helium Systems, Inc.  All Rights Reserved.
%% Copyright (c) 2016 Christopher Meiklejohn.  All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -----------------------------------------------------------------------------

%% -----------------------------------------------------------------------------
%% @doc This modules implements the Peer Service API.
%% All functions in this module forward the invocation to the configured
%% peer service manager (option `peer_service_manager') which must be
%% one of the Partisan's managers implementing
%% {@link partisan_peer_service_manager}, i.e. one of:
%% <ul>
%% <li>`partisan_pluggable_peer_service_manager'</li>
%% <li>`partisan_client_server_peer_service_manager'</li>
%% <li>`partisan_hyparview_peer_service_manager'</li>
%% <li>`partisan_static_peer_service_manager'</li>
%% </ul>
%%
%% Each node running Partisan listens for connections on a particular IP
%% address and port. This is the information that is required when other nodes
%% wish to join this node.
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_peer_service).

-include("partisan_logger.hrl").
-include("partisan.hrl").

-export([add_sup_callback/1]).
-export([broadcast_members/0]).
-export([broadcast_members/1]).
-export([cancel_exchanges/1]).
-export([connections/0]).
-export([decode/1]).
-export([exchanges/0]).
-export([exchanges/1]).
-export([get_local_state/0]).
-export([inject_partition/2]).
-export([join/1]).
-export([leave/0]).
-export([leave/1]).
-export([manager/0]).
-export([member/1]).
-export([members/0]).
-export([members_for_orchestration/0]).
-export([on_down/2]).
-export([on_down/3]).
-export([on_up/2]).
-export([on_up/3]).
-export([partitions/0]).
-export([reserve/1]).
-export([resolve_partition/1]).
-export([stop/0]).
-export([stop/1]).
-export([sync_join/1]).
-export([update_members/1]).



%% =============================================================================
%% API
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @doc Stop
%% @end
%% -----------------------------------------------------------------------------
stop() ->
    stop("received stop request").


%% -----------------------------------------------------------------------------
%% @doc Stop
%% @end
%% -----------------------------------------------------------------------------
stop(Reason) ->
    ?LOG_NOTICE(#{
        description => "Peer service stopping",
        reason => Reason
    }),
    init:stop().


%% -----------------------------------------------------------------------------
%% @doc Return current peer service manager for this
%% @end
%% -----------------------------------------------------------------------------
-spec manager() -> module().

manager() ->
    ?PEER_SERVICE_MANAGER.


%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec join(partisan:node_spec() | node() | list) ->
    ok | {error, self_join | any()}.

join(#{name := Node} = NodeSpec) ->
    case partisan:node() of
        Node ->
            {error, self_join};
        _ ->
            ?PEER_SERVICE_MANAGER:join(NodeSpec)
    end;

join(Node) ->
    case partisan:node_spec(Node) of
        {ok, NodeSpec} ->
            join(NodeSpec);
        {error, _} = Error ->
            Error
    end.


%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec sync_join(partisan:node_spec()) ->
    ok | {error, self_join | not_implemented | any()}.

sync_join(#{name := Node} = NodeSpec) ->
    case partisan:node() of
        Node ->
            {error, self_join};
        _ ->
            ?PEER_SERVICE_MANAGER:sync_join(NodeSpec)
    end.


%% -----------------------------------------------------------------------------
%% @doc Leave the cluster. We will not be able to re-join the cluster, we must
%% be restarted first.
%% @end
%% -----------------------------------------------------------------------------
-spec leave() -> ok.

leave() ->
    ?PEER_SERVICE_MANAGER:leave().


%% -----------------------------------------------------------------------------
%% @doc Remove a node from the cluster. Subsequently calling `join
%% (NodeSpec)' will not work for the removed node. The removed node must be
%% restarted first.
%% @end
%% -----------------------------------------------------------------------------
-spec leave(partisan:node_spec()) -> ok.

leave(#{name := Node} = NodeSpec) ->
    case partisan:node() of
        Node ->
            ?PEER_SERVICE_MANAGER:leave();
        _ ->
            ?PEER_SERVICE_MANAGER:leave(NodeSpec)
    end.


%% -----------------------------------------------------------------------------
%% @doc Trigger function on connection open for a given node.
%% `Function' is a function object taking zero or a single argument, where the
%% argument is the Node name.
%%
%% At the moment, this only works when using a full-mesh topology i.e.
%% `partisan_pluggable_peer_service_manager' or
%% `partisan_static_peer_service_manager'.
%% @end
%% -----------------------------------------------------------------------------
-spec on_up(
    node() | partisan:node_spec() | any | '_',
    partisan_peer_service_manager:on_event_fun()) ->
    ok | {error, not_implemented}.

on_up(Node, Function) ->
    ?PEER_SERVICE_MANAGER:on_up(Node, Function).


%% -----------------------------------------------------------------------------
%% @doc Trigger function on connection open for a given node.
%% `Function' is a function object taking zero or a single argument, where the
%% argument is the Node name.
%%
%% At the moment, this only works when using a full-mesh topology i.e.
%% `partisan_pluggable_peer_service_manager' or
%% `partisan_static_peer_service_manager'.
%% @end
%% -----------------------------------------------------------------------------
-spec on_up(
    node() | partisan:node_spec() | any | '_',
    partisan_peer_service_manager:on_event_fun(),
    Opts :: #{channel => partisan:channel()}) ->
    ok | {error, not_implemented}.

on_up(Node, Function, Opts) ->
    ?PEER_SERVICE_MANAGER:on_up(Node, Function, Opts).


%% -----------------------------------------------------------------------------
%% @doc Trigger function on connection close for a given node.
%% `Function' is a function object taking zero or a single argument, where the
%% argument is the Node name.
%%
%% At the moment, this only works when using a full-mesh topology i.e.
%% `partisan_pluggable_peer_service_manager' or
%% `partisan_static_peer_service_manager'.
%% @end
%% -----------------------------------------------------------------------------
-spec on_down(node() | partisan:node_spec() | any | '_', function()) ->
    ok | {error, not_implemented}.

on_down(Node, Function) ->
    ?PEER_SERVICE_MANAGER:on_down(Node, Function).


%% -----------------------------------------------------------------------------
%% @doc Trigger function on connection close for a given node.
%% `Function' is a function object taking zero or a single argument, where the
%% argument is the Node name.
%%
%% At the moment, this only works when using a full-mesh topology i.e.
%% `partisan_pluggable_peer_service_manager' or
%% `partisan_static_peer_service_manager'.
%% @end
%% -----------------------------------------------------------------------------
-spec on_down(
    node() | partisan:node_spec() | any | '_',
    partisan_peer_service_manager:on_event_fun(),
    Opts :: #{channel => partisan:channel()}) ->
    ok | {error, not_implemented}.

on_down(Node, Function, Opts) ->
    ?PEER_SERVICE_MANAGER:on_down(Node, Function, Opts).


%% -----------------------------------------------------------------------------
%% @doc Return a sampling of nodes connected to this node.
%% When using a full-mesh topology i.e.
%% `partisan_pluggable_peer_service_manager' or
%% `partisan_static_peer_service_manager' this is the set of all cluster
%% members. However, if you're using other managers, the result will only be a
%% sampling of the nodes.
%% @end
%% -----------------------------------------------------------------------------
-spec member(Node :: node() | partisan:node_spec()) -> boolean().

member(Node) ->
    ?PEER_SERVICE_MANAGER:member(Node).



%% -----------------------------------------------------------------------------
%% @doc Return cluster members
%% @end
%% -----------------------------------------------------------------------------
-spec members() -> {ok, [node()]}.

members() ->
    ?PEER_SERVICE_MANAGER:members().


%% -----------------------------------------------------------------------------
%% @doc Return cluster members
%% @end
%% -----------------------------------------------------------------------------
-spec members_for_orchestration() -> [partisan:node_spec()].

members_for_orchestration() ->
    ?PEER_SERVICE_MANAGER:members_for_orchestration().


%% -----------------------------------------------------------------------------
%% @doc Return peer service connections
%% @end
%% -----------------------------------------------------------------------------
connections() ->
    {ok, partisan_peer_connections:connections()}.


%% -----------------------------------------------------------------------------
%% @doc Update cluster members with a list of node specifications.
%% @end
%% -----------------------------------------------------------------------------
-spec update_members(Members :: [partisan:node_spec()]) ->
    ok | {error, not_implemented}.

update_members(Members) ->
    ?PEER_SERVICE_MANAGER:update_members(Members).


%% -----------------------------------------------------------------------------
%% @doc Decode peer_service_manager state from an encoded form
%% @end
%% -----------------------------------------------------------------------------
-spec decode(term()) -> term().

decode(State) ->
    Manager = ?PEER_SERVICE_MANAGER,
    [P || #{name := P} <- Manager:decode(State)].


%% -----------------------------------------------------------------------------
%% @doc Reserve a slot for the particular tag.
%% @end
%% -----------------------------------------------------------------------------
-spec reserve(atom()) -> ok | {error, no_available_slots}.

reserve(Tag) ->
    ?PEER_SERVICE_MANAGER:reserve(Tag).


%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec partitions() -> {ok, partisan_peer_service_manager:partitions()} | {error, not_implemented}.

partitions() ->
    ?PEER_SERVICE_MANAGER:partitions().



%% -----------------------------------------------------------------------------
%% @doc Inject a partition.
%% @end
%% -----------------------------------------------------------------------------
-spec inject_partition(partisan:node_spec(), ttl()) ->
    {ok, reference()} | {error, not_implemented}.

inject_partition(Origin, TTL) ->
    ?PEER_SERVICE_MANAGER:inject_partition(Origin, TTL).


%% -----------------------------------------------------------------------------
%% @doc Resolve a partition.
%% @end
%% -----------------------------------------------------------------------------
-spec resolve_partition(reference()) ->
    ok | {error, not_implemented}.

resolve_partition(Reference) ->
    ?PEER_SERVICE_MANAGER:resolve_partition(Reference).


%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec get_local_state() -> term().

get_local_state() ->
    ?PEER_SERVICE_MANAGER:get_local_state().



%% -----------------------------------------------------------------------------
%% @doc Adds a supervised callback to receive peer service membership updates.
%% @end
%% -----------------------------------------------------------------------------
add_sup_callback(Function) ->
    partisan_peer_service_events:add_sup_callback(Function).




%% -----------------------------------------------------------------------------
%% @doc Returns the broadcast servers view of full cluster membership.
%% Wait indefinitely for a response is returned from the process.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_members() -> ordsets:ordset(node()).

broadcast_members() ->
    partisan_plumtree_broadcast:broadcast_members().


%% -----------------------------------------------------------------------------
%% @doc Returns the broadcast servers view of full cluster membership.
%% Waits `Timeout' ms for a response from the server.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_members(infinity | pos_integer()) -> ordsets:ordset(node()).

broadcast_members(Timeout) ->
    partisan_plumtree_broadcast:broadcast_members(Timeout).


%% -----------------------------------------------------------------------------
%% @doc return a list of exchanges, started by broadcast on thisnode, that are
%% running.
%% @end
%% -----------------------------------------------------------------------------
-spec exchanges() ->
    {ok, partisan_plumtree_broadcast:exchanges()}
    | {error, {badrpc, Reason :: any()}}.

exchanges() ->
    partisan_plumtree_broadcast:exchanges().


%% -----------------------------------------------------------------------------
%% @doc returns a list of exchanges, started by broadcast on `Node', that are
%% running.
%% @end
%% -----------------------------------------------------------------------------
-spec exchanges(node()) ->
    {ok, partisan_plumtree_broadcast:exchanges()}
    | {error, {badrpc, Reason :: any()}}.

exchanges(Node) ->
    partisan_plumtree_broadcast:exchanges(Node).


%% -----------------------------------------------------------------------------
%% @doc cancel exchanges started by this node.
%% @end
%% -----------------------------------------------------------------------------
-spec cancel_exchanges(partisan_plumtree_broadcast:selector()) ->
    partisan_plumtree_broadcast:exchanges().

cancel_exchanges(WhichExchanges) ->
    partisan_plumtree_broadcast:cancel_exchanges(WhichExchanges).