%% src/partisan_peer_service.erl

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Helium Systems, Inc.  All Rights Reserved.
%% Copyright (c) 2016 Christopher Meiklejohn.  All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(partisan_peer_service).

-export([join/1,
         join/2,
         join/3,
         attempt_join/1,
         sync_join/1,
         sync_join/2,
         sync_join/3,
         attempt_sync_join/1,
         leave/1,
         decode/1,
         update_members/1,
         stop/0,
         stop/1,
         members/0,
         connections/0,
         manager/0,
         add_sup_callback/1,
         cast_message/3,
         forward_message/3]).

-include("partisan.hrl").

%% @doc Return the module currently configured as the peer service manager.
%% The module name is looked up in `partisan_config' under the
%% `partisan_peer_service_manager' key, with
%% `?DEFAULT_PEER_SERVICE_MANAGER' passed as the default value.
manager() ->
    Key = partisan_peer_service_manager,
    Fallback = ?DEFAULT_PEER_SERVICE_MANAGER,
    partisan_config:get(Key, Fallback).

%% @doc Join the cluster through `Node'.
%% Shorthand for `join(Node, true)'.
join(Node) ->
    Auto = true,
    join(Node, Auto).

%% @doc Join the cluster through `Node' using the manager's `sync_join'.
%% Shorthand for `sync_join(Node, true)'.
sync_join(Node) ->
    Auto = true,
    sync_join(Node, Auto).

%% @doc Join the cluster through `Node', accepting several node formats.
%%
%% A string (possibly a deep list) is flattened and converted to an atom,
%% then handled like any other atom. An atom is checked against the local
%% node name in `join/3' so that self-joins are rejected. Any other term is
%% handed directly to `attempt_join/1'.
%%
%% `Auto' is only passed along; none of the join clauses in this module
%% inspect it.
join(NodeStr, Auto) when is_list(NodeStr) ->
    Name = erlang:list_to_atom(lists:flatten(NodeStr)),
    join(Name, Auto);
join(Node, Auto) when is_atom(Node) ->
    Self = partisan_peer_service_manager:mynode(),
    join(Self, Node, Auto);
join(NodeSpec, _Auto) ->
    attempt_join(NodeSpec).

%% @doc Synchronously join the cluster through `Node', accepting several
%% node formats.
%%
%% A string (possibly a deep list) is flattened and converted to an atom,
%% then handled like any other atom. An atom is checked against the local
%% node name in `sync_join/3' so that self-joins are rejected. Any other
%% term is handed directly to `attempt_sync_join/1'.
%%
%% `Auto' is only passed along; none of the sync_join clauses in this
%% module inspect it.
sync_join(NodeStr, Auto) when is_list(NodeStr) ->
    Name = erlang:list_to_atom(lists:flatten(NodeStr)),
    sync_join(Name, Auto);
sync_join(Node, Auto) when is_atom(Node) ->
    Self = partisan_peer_service_manager:mynode(),
    sync_join(Self, Node, Auto);
sync_join(NodeSpec, _Auto) ->
    attempt_sync_join(NodeSpec).

%% @doc Initiate a join of the local node (first argument) to a peer
%% (second argument).
%% Returns `{error, self_join}' when both arguments are the same term,
%% since a node cannot join itself; otherwise returns whatever
%% `attempt_join/1' returns for the peer. The third argument is ignored.
join(Self, Peer, _Auto) when Self =:= Peer ->
    {error, self_join};
join(_Self, Peer, _Auto) ->
    attempt_join(Peer).

%% @doc Initiate a synchronous join of the local node (first argument) to a
%% peer (second argument).
%% Returns `{error, self_join}' when both arguments are the same term,
%% since a node cannot join itself; otherwise returns whatever
%% `attempt_sync_join/1' returns for the peer. The third argument is
%% ignored.
sync_join(Self, Peer, _Auto) when Self =:= Peer ->
    {error, self_join};
sync_join(_Self, Peer, _Auto) ->
    attempt_sync_join(Peer).

%% @doc Return the node members, as reported by the configured peer
%% service manager's `members/0'.
members() ->
    (manager()):members().

%% @doc Return the node connections, as reported by the configured peer
%% service manager's `connections/0'.
connections() ->
    (manager()):connections().

%% @doc Update the cluster members by passing `Nodes' to the configured
%% peer service manager's `update_members/1'.
update_members(Nodes) ->
    (manager()):update_members(Nodes).

%% @doc Add a callback by handing `Callback' to
%% `partisan_peer_service_events:add_sup_callback/1'.
add_sup_callback(Callback) ->
    partisan_peer_service_events:add_sup_callback(Callback).

%% @doc Cast `Message' to a registered process on the remote side.
%% The three arguments are passed unchanged to the configured peer service
%% manager's `cast_message/3'.
cast_message(Name, ServerRef, Message) ->
    (manager()):cast_message(Name, ServerRef, Message).

%% @doc Forward `Message' to a registered process on the remote side.
%% The three arguments are passed unchanged to the configured peer service
%% manager's `forward_message/3'.
forward_message(Name, ServerRef, Message) ->
    (manager()):forward_message(Name, ServerRef, Message).

%% @private
%% Decode `State' with the configured peer service manager and return the
%% `name' of every decoded element that is a map with a `name' key.
%% Elements that do not match that shape are skipped.
decode(State) ->
    Decoded = (manager()):decode(State),
    [Name || #{name := Name} <- Decoded].

%% @private
%% Ask the configured peer service manager to join through `Node'.
%%
%% Given a bare node name, the remote node's `listen_addrs' setting is
%% fetched with `rpc:call/4' and combined with the name into a node
%% specification map. If the remote call fails, `{error, {badrpc, Reason}}'
%% is returned and the manager is not contacted; otherwise the `badrpc'
%% tuple would be stored as the node's listen addresses.
%%
%% Given a map with a `name' key, the map is passed to the manager's
%% `join/1' as is and that call's result is returned.
attempt_join(Node) when is_atom(Node) ->
    case rpc:call(Node, partisan_config, get, [listen_addrs]) of
        {badrpc, Reason} ->
            {error, {badrpc, Reason}};
        ListenAddrs ->
            attempt_join(#{name => Node, listen_addrs => ListenAddrs})
    end;
attempt_join(#{name := _Name} = Node) ->
    Manager = manager(),
    Manager:join(Node).

%% @private
%% Ask the configured peer service manager to synchronously join through
%% `Node'.
%%
%% Given a bare node name, the remote node's `listen_addrs' setting is
%% fetched with `rpc:call/4' and combined with the name into a node
%% specification map. If the remote call fails, `{error, {badrpc, Reason}}'
%% is returned and the manager is not contacted; otherwise the `badrpc'
%% tuple would be stored as the node's listen addresses.
%%
%% Given a map with a `name' key, the map is passed to the manager's
%% `sync_join/1' as is and that call's result is returned.
attempt_sync_join(Node) when is_atom(Node) ->
    case rpc:call(Node, partisan_config, get, [listen_addrs]) of
        {badrpc, Reason} ->
            {error, {badrpc, Reason}};
        ListenAddrs ->
            attempt_sync_join(#{name => Node, listen_addrs => ListenAddrs})
    end;
attempt_sync_join(#{name := _Name} = Node) ->
    Manager = manager(),
    Manager:sync_join(Node).

%% @doc Attempt to leave the cluster by passing `Node' to the configured
%% peer service manager's `leave/1'.
leave(Node) ->
    (manager()):leave(Node).

%% @doc Stop, using a generic reason string.
%% Equivalent to calling `stop/1' with `"received stop request"'.
stop() ->
    Reason = "received stop request",
    stop(Reason).

%% @doc Stop for a given reason.
%% `Reason' is logged at notice level through lager (formatted with `~p'),
%% after which `init:stop/0' is called and its result returned.
stop(Reason) ->
    lager:notice("~p", [Reason]),
    %% init:stop/0 shuts down the entire Erlang runtime system, not only
    %% this application.
    init:stop().