%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Helium Systems, Inc. All Rights Reserved.
%% Copyright (c) 2016 Christopher Meiklejohn. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% -----------------------------------------------------------------------------
%% @doc This module realises the {@link partisan_peer_service_manager}
%% behaviour implementing client-server topology where clients communicate with
%% a single server and servers form a full-mesh topology.
%%
%% == Characteristics ==
%% <ul>
%% <li>Uses TCP/IP.</li>
%% <li>Client nodes communicate and maintain connections with server nodes.
%% They refuse connections from other clients but refer them to a server
%% node.</li>
%% <li>Server nodes communicate and maintain connections with all other server
%% nodes.</li>
%% <li>Nodes periodically send heartbeat messages. The service considers a node
%% "failed" when it misses X heartbeats.</li>
%% <li>Point-to-point messaging through the server (server as relay).</li>
%% <li>Eventually consistent membership maintained in a CRDT and replicated
%% using gossip.</li>
%% <li>Scalability limited to hundres of nodes (60-200 nodes).</li>
%% </ul>
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_client_server_peer_service_manager).
-author("Christopher S. Meiklejohn <christopher.meiklejohn@gmail.com>").
-behaviour(gen_server).
-behaviour(partisan_peer_service_manager).
-include("partisan_logger.hrl").
-include("partisan.hrl").
-define(IS_ON_EVENT_FUN(X), (
is_function(X, 0) orelse is_function(X, 1) orelse is_function(X, 2)
)).
-record(state, {
tag :: tag(),
pending :: pending(),
membership :: membership(),
down_functions = #{} :: #{'_' | node() => on_event_fun()},
up_functions = #{} :: #{'_' | node() => on_event_fun()}
}).
-type on_event_fun() :: partisan_peer_service_manager:on_event_fun().
-type pending() :: sets:set(partisan:node_spec()).
-type membership() :: sets:set(partisan:node_spec()).
-type tag() :: atom().
-type state() :: #state{}.
-type call() :: {on_up | on_down, node(), on_event_fun()}
| {reserve, term()}
| {leave, partisan:node_spec()}
| {join, partisan:node_spec()}
| {send_message, node(), term()}
%% | {forward_message, node(), term(), ...}
| {receive_message, partisan:channel(), term()}
| members
| members_for_orchestration
| get_local_state.
-type cast() :: {join, partisan:node_spec()}
| {kill_connections, [node()]}.
%% partisan_peer_service_manager callbacks
-export([cast_message/2]).
-export([cast_message/3]).
-export([cast_message/4]).
-export([decode/1]).
-export([forward_message/2]).
-export([forward_message/3]).
-export([forward_message/4]).
-export([get_local_state/0]).
-export([inject_partition/2]).
-export([join/1]).
-export([leave/0]).
-export([leave/1]).
-export([members/0]).
-export([members_for_orchestration/0]).
-export([on_down/2]).
-export([on_up/2]).
-export([on_down/3]).
-export([on_up/3]).
-export([partitions/0]).
-export([receive_message/3]).
-export([reserve/1]).
-export([resolve_partition/1]).
-export([send_message/2]).
-export([start_link/0]).
-export([supports_capability/1]).
-export([sync_join/1]).
-export([update_members/1]).
%% gen_server callbacks
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
%% =============================================================================
%% PARTISAN_PEER_SERVICE_MANAGER CALLBACKS
%% =============================================================================
%% -----------------------------------------------------------------------------
%% @doc Same as start_link([]).
%% @end
%% -----------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
Opts = [
{spawn_opt, ?PARALLEL_SIGNAL_OPTIMISATION([])}
],
gen_server:start_link({local, ?MODULE}, ?MODULE, [], Opts).
%% -----------------------------------------------------------------------------
%% @doc Return membership list.
%% @end
%% -----------------------------------------------------------------------------
members() ->
gen_server:call(?MODULE, members, infinity).
%% -----------------------------------------------------------------------------
%% @doc Return membership list.
%% @end
%% -----------------------------------------------------------------------------
members_for_orchestration() ->
gen_server:call(?MODULE, members_for_orchestration, infinity).
%% -----------------------------------------------------------------------------
%% @doc Update membership.
%% @end
%% -----------------------------------------------------------------------------
update_members(_Nodes) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Return local node's view of cluster membership.
%% @end
%% -----------------------------------------------------------------------------
get_local_state() ->
gen_server:call(?MODULE, get_local_state, infinity).
%% -----------------------------------------------------------------------------
%% @doc Trigger function on connection close for a given node.
%% `Fun' is a function object taking zero or a single argument, where the
%% argument is the Node name.
%% @end
%% -----------------------------------------------------------------------------
on_down(Arg, Fun) ->
on_down(Arg, Fun, #{}).
%% -----------------------------------------------------------------------------
%% @doc Trigger function on connection close for a given node.
%% `Fun' is a function object taking zero or a single argument, where the
%% argument is the Node name.
%% @end
%% -----------------------------------------------------------------------------
on_down(_Node, _Fun, _Opts) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Trigger function on connection open for a given node.
%% `Fun' is a function object taking zero or a single argument, where the
%% argument is the Node name.
%% @end
%% -----------------------------------------------------------------------------
on_up(Arg, Fun) ->
on_up(Arg, Fun, #{}).
%% -----------------------------------------------------------------------------
%% @doc Trigger function on connection open for a given node.
%% `Fun' is a function object taking zero or a single argument, where the
%% argument is the Node name.
%% @end
%% -----------------------------------------------------------------------------
on_up(_Node, _Fun, _O_pts) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Send message to a remote manager.
%% @end
%% -----------------------------------------------------------------------------
send_message(Name, Message) ->
gen_server:call(?MODULE, {send_message, Name, Message}, infinity).
%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec cast_message(
Term :: partisan:any_pid() | partisan:any_name(),
MEssage :: partisan:message()) -> ok.
cast_message(Term, Message) ->
FullMessage = {'$gen_cast', Message},
_ = forward_message(Term, FullMessage, #{}),
ok.
%% -----------------------------------------------------------------------------
%% @doc Cast a message to a remote gen_server.
%% @end
%% -----------------------------------------------------------------------------
cast_message(Node, ServerRef, Message) ->
cast_message(Node, ServerRef, Message, #{}).
%% -----------------------------------------------------------------------------
%% @doc Cast a message to a remote gen_server.
%% @end
%% -----------------------------------------------------------------------------
cast_message(Node, ServerRef, Message, Options) ->
FullMessage = {'$gen_cast', Message},
_ = forward_message(Node, ServerRef, FullMessage, Options),
ok.
%% -----------------------------------------------------------------------------
%% @doc Gensym support for forwarding.
%% @end
%% -----------------------------------------------------------------------------
forward_message(Term, Message) ->
forward_message(Term, Message, #{}).
%% -----------------------------------------------------------------------------
%% @doc Gensym support for forwarding.
%% @end
%% -----------------------------------------------------------------------------
forward_message(Pid, Message, Opts) when is_pid(Pid) ->
forward_message(partisan:node(Pid), Pid, Message, Opts);
forward_message(Name, Message, Opts) when is_atom(Name) ->
forward_message(partisan:node(), Name, Message, Opts);
forward_message({Name, Node}, Message, Opts) ->
forward_message(Node, Name, Message, Opts);
forward_message(RemoteRef, Message, Opts) ->
partisan_remote_ref:is_pid(RemoteRef)
orelse partisan_remote_ref:is_name(RemoteRef)
orelse error(badarg),
Node = partisan_remote_ref:node(RemoteRef),
Target = partisan_remote_ref:target(RemoteRef),
forward_message(Node, Target, Message, Opts).
%% -----------------------------------------------------------------------------
%% @doc Forward message to registered process on the remote side.
%% @end
%% -----------------------------------------------------------------------------
forward_message(Node, ServerRef, Message, Opts) when is_list(Opts) ->
forward_message(Node, ServerRef, Message, maps:from_list(Opts));
forward_message(Node, ServerRef, Message, Opts) when is_map(Opts) ->
%% We ignore Opts.channel
gen_server:call(
?MODULE,
{forward_message, Node, ServerRef, Message, Opts},
infinity
).
%% -----------------------------------------------------------------------------
%% @doc Receive message from a remote manager.
%% @end
%% -----------------------------------------------------------------------------
receive_message(_Peer, Channel, Message) ->
gen_server:call(?MODULE, {receive_message, Channel, Message}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Attempt to join a remote node.
%% @end
%% -----------------------------------------------------------------------------
join(Node) ->
gen_server:call(?MODULE, {join, Node}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Attempt to join a remote node.
%% @end
%% -----------------------------------------------------------------------------
sync_join(_Node) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Leave the cluster.
%% @end
%% -----------------------------------------------------------------------------
leave() ->
gen_server:call(?MODULE, {leave, partisan:node_spec()}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Remove another node from the cluster.
%% @end
%% -----------------------------------------------------------------------------
leave(#{name := _} = NodeSpec) ->
gen_server:call(?MODULE, {leave, NodeSpec}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Decode state.
%% @end
%% -----------------------------------------------------------------------------
decode(State) ->
sets:to_list(State).
%% -----------------------------------------------------------------------------
%% @doc Reserve a slot for the particular tag.
%% @end
%% -----------------------------------------------------------------------------
reserve(Tag) ->
gen_server:call(?MODULE, {reserve, Tag}, infinity).
%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec supports_capability(Arg :: atom()) -> boolean().
supports_capability(monitoring) ->
false;
supports_capability(_) ->
false.
%% -----------------------------------------------------------------------------
%% @doc Inject a partition.
%% @end
%% -----------------------------------------------------------------------------
inject_partition(_Origin, _TTL) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Resolve a partition.
%% @end
%% -----------------------------------------------------------------------------
resolve_partition(_Reference) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Return partitions.
%% @end
%% -----------------------------------------------------------------------------
partitions() ->
{error, not_implemented}.
%% =============================================================================
%% GEN_SERVER CALLBACKS
%% =============================================================================
-spec init([]) -> {ok, state()}.
init([]) ->
%% Seed the random number generator.
partisan_config:seed(),
%% Process connection exits.
process_flag(trap_exit, true),
ok = partisan_peer_connections:init(),
Membership = maybe_load_state_from_disk(),
Myself = partisan:node_spec(),
logger:set_process_metadata(#{
node => Myself
}),
%% Get tag, if set.
Tag = partisan_config:get(tag, undefined),
State = #state{
tag = Tag,
pending = sets:new([{version, 2}]),
membership = Membership
},
{ok, State}.
-spec handle_call(call(), {pid(), term()}, state()) ->
{reply, term(), state()}
| {reply, term(), state(), timeout()}
| {reply, term(), state(), hibernate}
| {reply, term(), state(), {continue, term()}}
| {noreply, state()} | {noreply, state(), timeout()}
| {noreply, state(), hibernate}
| {noreply, state(), {continue, term()}}
| {stop, term(), term(), state()}
| {stop, term(), state()}.
handle_call(
{on_up, Name, Function},
_From,
#state{up_functions = UpFunctions0} = State) ->
UpFunctions = partisan_util:maps_append(Name, Function, UpFunctions0),
{reply, ok, State#state{up_functions = UpFunctions}};
handle_call(
{on_down, Name, Function},
_From,
#state{down_functions = DownFunctions0} = State) ->
DownFunctions = partisan_util:maps_append(Name, Function, DownFunctions0),
{reply, ok, State#state{down_functions = DownFunctions}};
handle_call({reserve, _Tag}, _From, State) ->
{reply, {error, no_available_slots}, State};
handle_call({leave, #{name := Peer}}, _From, #state{} = State0) ->
Membership0 = State0#state.membership,
Pending = State0#state.pending,
%% Node may exist in the membership on multiple ports, so we need to
%% remove all.
Membership = sets:fold(
fun
(#{name := Node} = NodeSpec, Acc) when Node == Peer ->
sets:del_element(NodeSpec, Acc);
(#{name := Node}, Acc) ->
%% call the net_kernel:disconnect(Node) function to leave
%% erlang network explicitly
%% TODO why do we do this here? If Node is myself then I would
%% not be able to call the subsequent peers, also, shouldn't be
%% enough to disconnect myself from Peer (every other peer will
%% do the same)?
_ = rpc:call(Peer, net_kernel, disconnect, [Node]),
Acc
end,
Membership0,
Membership0
),
%% Remove state and shutdown if we are removing ourselves.
case partisan:node() of
Peer ->
delete_state_from_disk(),
%% Shutdown; connections terminated on shutdown.
State = State0#state{membership = Membership},
{stop, normal, State};
_ ->
%% TODO maybe we need to do the following here (see prev TODO)
%% _ = net_kernel:disconnect(Peer),
ok = partisan_peer_service_manager:disconnect(Peer),
NewPending = sets:filter(
fun(#{name := Node}) -> Node =/= Peer end,
Pending
),
State = State0#state{
membership = Membership,
pending = NewPending
},
{reply, ok, State}
end;
handle_call({join, #{name := _} = Spec}, _From, State) ->
gen_server:cast(?MODULE, {join, Spec}),
{reply, ok, State};
handle_call({send_message, Name, Msg}, _From, State) ->
Result = do_send_message(Name, Msg),
{reply, Result, State};
handle_call({forward_message, Name, ServerRef, Msg, _Opts}, _From, State) ->
Result = do_send_message(Name, {forward_message, ServerRef, Msg}),
{reply, Result, State};
handle_call({receive_message, Channel, Msg}, _From, State) ->
handle_message(Msg, Channel, State);
handle_call(members, _From, State) ->
Members = [Node || #{name := Node} <- members(State#state.membership)],
{reply, {ok, Members}, State};
handle_call(members_for_orchestration, _From, State) ->
{reply, {ok, members(State#state.membership)}, State};
handle_call(get_local_state, _From, State) ->
{reply, {ok, State#state.membership}, State};
handle_call(Msg, _From, State) ->
?LOG_WARNING(#{
description => "Unhandled call messages",
module => ?MODULE,
messages => Msg
}),
{reply, ok, State}.
-spec handle_cast(cast(), state()) -> {noreply, state()}.
handle_cast({join, #{name := Node} = Spec}, #state{} = State) ->
%% Attempt to join via disterl for control messages during testing.
ok = partisan_util:maybe_connect_disterl(Node),
%% Trigger connection.
ok = partisan_peer_service_manager:connect(Spec),
%% Add to set of pending connections as we will get a 'connected'
%% message asynchronously (see handle_info)
Pending = sets:add_element(Spec, State#state.pending),
{noreply, State#state{pending = Pending}};
handle_cast({kill_connections, Nodes}, State) ->
ok = kill_connections(Nodes, State),
{noreply, State};
handle_cast(Msg, State) ->
?LOG_WARNING(#{
description => "Unhandled cast messages",
module => ?MODULE,
messages => Msg
}),
{noreply, State}.
handle_info({'EXIT', From, Reason}, State0) ->
?LOG_DEBUG(#{
description => "Connection closed",
reason => Reason
}),
%% A connection has closed, prune it from the connections table
try partisan_peer_connections:prune(From) of
{Info, [_Connection]} ->
State =
case partisan_peer_connections:count(Info) of
0 ->
%% This was the last connection so the node is down.
NodeSpec = partisan_peer_connections:node_spec(Info),
%% We notify all subscribers.
ok = down(NodeSpec, State0),
%% If still a member we add it to pending, so that we
%% can compute the on_up signal
maybe_add_pending(NodeSpec, State0);
_ ->
State0
end,
{noreply, State}
catch
error:badarg ->
%% Weird, connection pid did not exist
{noreply, State0}
end;
handle_info(
{connected, NodeSpec, _Channel, TheirTag, _RemoteState},
#state{} = State0) ->
Pending0 = State0#state.pending,
Membership0 = State0#state.membership,
OurTag = State0#state.tag,
case sets:is_element(NodeSpec, Pending0) of
true ->
case accept_join_with_tag(OurTag, TheirTag) of
true ->
%% Move out of pending.
Pending = sets:del_element(NodeSpec, Pending0),
%% Add to our membership.
Membership = sets:add_element(NodeSpec, Membership0),
%% Notify event handlers
ok = case Membership == Membership0 of
true ->
ok;
false ->
partisan_peer_service_events:update(Membership)
end,
?LOG_DEBUG(#{
description => ">>>>>>>>>>>>>>> Notify subscribers."
}),
%% notify subscribers
up(NodeSpec, State0),
?LOG_DEBUG(#{
description => ">>>>>>>>>>>>>>> Establish more conns."
}),
%% Establish any new connections.
ok = establish_connections(Pending, Membership),
%% Compute count.
Count = sets:size(Membership),
?LOG_DEBUG(#{
description => "Join ACCEPTED.",
peer => NodeSpec,
our_tag => OurTag,
their_tag => TheirTag,
view_member_count => Count
}),
State = State0#state{
pending = Pending,
membership = Membership
},
{noreply, State};
false ->
?LOG_INFO(#{
description => "Join REFUSED, keeping membership.",
peer => NodeSpec,
our_tag => OurTag,
their_tag => TheirTag,
view_membership => Membership0
}),
{noreply, State0}
end;
false ->
{noreply, State0}
end;
handle_info(Event, State) ->
?LOG_WARNING(#{description => "Unhandled info event", event => Event}),
{noreply, State}.
%% @private
-spec terminate(term(), state()) -> term().
terminate(_Reason, #state{}) ->
Fun = fun(_NodeInfo, Connections) ->
lists:foreach(
fun(C) ->
Pid = partisan_peer_connections:pid(C),
catch gen_server:stop(Pid, normal, infinity),
ok
end,
Connections
)
end,
ok = partisan_peer_connections:foreach(Fun),
ok.
%% @private
-spec code_change(term() | {down, term()}, state(), term()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% =============================================================================
%% PRIVATE
%% =============================================================================
%% @private
empty_membership() ->
LocalState = sets:add_element(
partisan:node_spec(),
sets:new([{version, 2}])
),
persist_state(LocalState),
LocalState.
%% @private
data_root() ->
case application:get_env(partisan, partisan_data_dir) of
{ok, PRoot} ->
filename:join(PRoot, "default_peer_service");
undefined ->
undefined
end.
%% @private
write_state_to_disk(State) ->
case data_root() of
undefined ->
ok;
Dir ->
File = filename:join(Dir, "cluster_state"),
ok = filelib:ensure_dir(File),
ok = file:write_file(File, term_to_binary(State))
end.
%% @private
delete_state_from_disk() ->
case data_root() of
undefined ->
ok;
Dir ->
File = filename:join(Dir, "cluster_state"),
ok = filelib:ensure_dir(File),
case file:delete(File) of
ok ->
?LOG_INFO(#{
description => "Leaving cluster, removed cluster_state"
});
{error, Reason} ->
?LOG_INFO(#{
description => "Unable to remove cluster_state",
reason => Reason
})
end
end.
%% @private
maybe_load_state_from_disk() ->
case data_root() of
undefined ->
empty_membership();
Dir ->
case filelib:is_regular(filename:join(Dir, "cluster_state")) of
true ->
{ok, Bin} = file:read_file(filename:join(Dir, "cluster_state")),
binary_to_term(Bin);
false ->
empty_membership()
end
end.
%% @private
persist_state(State) ->
write_state_to_disk(State).
%% @private
members(Membership) ->
sets:to_list(Membership).
%% @private
up(NodeOrSpec, State) ->
apply_funs(NodeOrSpec, State#state.up_functions).
%% @private
down(NodeOrSpec, State) ->
apply_funs(NodeOrSpec, State#state.down_functions).
%% @private
apply_funs(Node, Mapping) when is_atom(Node) ->
?LOG_DEBUG(#{
description => "Node status change notification",
node => Node,
funs => Mapping
}),
Funs = lists:append(
%% Notify functions matching the wildcard '_'
maps:get('_', Mapping, []),
%% Notify functions matching Node
maps:get(Node, Mapping, [])
),
_ = [
begin
case erlang:fun_info(F, arity) of
{arity, 0} -> catch F();
{arity, 1} -> catch F(Node)
end
end || F <- Funs
],
ok;
apply_funs(#{name := Node}, Mapping) ->
apply_funs(Node, Mapping).
%% @private
-spec peers(sets:set(partisan:node_spec())) -> [partisan:node_spec()].
peers(Set) ->
sets:to_list(without_myself(Set)).
%% @private
-spec without_myself(sets:set(partisan:node_spec())) ->
sets:set(partisan:node_spec()).
without_myself(Set) ->
Exclude = sets:from_list([partisan:node_spec()]),
sets:subtract(Set, Exclude).
%% @private
establish_connections(Pending, Membership) ->
%% Reconnect disconnected members and members waiting to join.
Peers = peers(sets:union(Membership, Pending)),
lists:foreach(fun partisan_peer_service_manager:connect/1, Peers).
%% @private
kill_connections(Nodes, State) ->
Fun = fun(Node) ->
ok = down(Node, State)
end,
partisan_peer_service_manager:disconnect(Nodes, Fun).
%% @private
handle_message({forward_message, ServerRef, Message}, _Channel, State) ->
partisan_peer_service_manager:deliver(ServerRef, Message),
{reply, ok, State}.
%% @private
-spec do_send_message(
Node :: atom() | partisan:node_spec(), Message :: term())->
ok | {error, disconnected | not_yet_connected | notalive}.
do_send_message(Node, Message) ->
%% Find a connection for the remote node, if we have one.
case partisan_peer_connections:dispatch_pid(Node) of
{ok, Pid} ->
gen_server:cast(Pid, {send_message, Message});
{error, _} = Error ->
Error
end.
%% @private
accept_join_with_tag(server, server) ->
true;
accept_join_with_tag(server, client) ->
true;
accept_join_with_tag(client, server) ->
true;
accept_join_with_tag(client, client) ->
false;
accept_join_with_tag(undefined, undefined) ->
true;
accept_join_with_tag(undefined, _) ->
false.
%% @private
maybe_add_pending(NodeSpec, #state{} = State) ->
Pending0 = State#state.pending,
Membership = State#state.membership,
Pending = case sets:is_element(NodeSpec, Membership) of
true ->
sets:add_element(NodeSpec, Pending0);
false ->
Pending0
end,
State#state{pending = Pending}.