%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Christopher Meiklejohn. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc This module realises the {@link partisan_peer_service_manager}
%% behaviour implementing a peer-to-peer partial mesh topology using the
%% protocol described in the paper
%% <a href="https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf">HyParView:
%% a membership protocol for reliable gossip-based broadcast</a>
%% by João Leitão, José Pereira and Luís Rodrigues.
%%
%% The following content contains abstracts from the paper.
%%
%% == Characteristics ==
%% <ul>
%% <li>Uses TCP/IP as an unreliable failure detector (unreliable because it can
%% generate false positives e.g. when the network becomes suddenly
%% congested).</li>
%% <li>It can sustain high level of node failres while ensuring connectivity
%% of the overlay. Nodes are considered "failed" when the TCP/IP connection is
%% dropped.</li>
%% <li>Nodes maintain partial views of the network. Every node will contain and
%% <em>active view</em> that forms a connected grah, and a
%% <em>passive view</em> of backup links that
%% are used to repair graph connectivity under failure. Some links to passive
%% nodes are kept open for fast replacement of failed nodes in the active
%% view. So the view is probabilistic, meaning that the protocol doesn't
%% prevent (nor detects) the cluter to be split into several subclusters with
%% no connections to each other.</li>
%% <li>HyParView sacrificies strong membership for high availability and
%% connectivity: the algorithm constantly works towards and ensures that
%% eventually the clsuter membership is a fully-connected component.
%% However, at any point in time different nodes may have different,
%% inconsistent views of the cluster membership. As a consequence, HyParView is
%% not designed to work with systems that require strong membership properties,
%% eg. consensus protocols like Paxos or Raft.</li>
%% <li>Point-to-point messaging for connected nodes with a minimum of 1 hop via
%% transitive message delivery (as not all nodes directly connected). Delivery
%% is probabilistic.</li>
%% <li>No explicit leave operation, because the overlay is able to react fast
%% enough to node failures. Hence when a node wishes to leave the system it is
%% simply treated as if the node have failed.</li>
%% <li>Scalability to up-to 2,000 nodes.</li>
%% </ul>
%%
%% == HyParView Membership Protocol ==
%%
%% == Partial View ==
%% A partial view is a small subset of the entire system (cluster) membership,
%% a set of node specifications maintained locally at each node.
%%
%% A node specification i.e. `partisan:node_spec()' allows a node to be
%% reached by other nodes.
%%
%% A membership protocol is in charge of initializing and maintaining the
%% partial views at each node in face of dynamic changes in the system. For
%% instance, when a new node joins the system, its identifier should be added
%% to the partial view of (some) other nodes and it has to create its own
%% partial view, including identifiers of nodes already in the system. Also, if
%% a node fails or leaves the system, its identifier should be removed from all
%% partial views as soon as possible.
%%
%% Partial views establish neighboring associations among nodes. Therefore,
%% partial views define an overlay network, in other words, partial views
%% establish an directed graph that captures the neighbor relation between all
%% nodes executing the protocol. In this graph nodes are represented by a
%% vertex while a neighbor relation is represented by an arc from the node who
%% contains the target node in his partial view.
%%
%% == Membership Protocol ==
%% The Hybrid Partial View (HyParView) membership protocol is in charge of
%% maintaining two distinct views at each node: a small active view, of size
%% `log(n) + c', and a larger passive view, of size `k(log(n) + c)'.
%%
%% It then selects which members of this view should be promoted to the active
%% view.
%%
%% === Active View ===
%% Each node maintains a small symmetric ctive view the size of fanout + 1.
%% Being symmetric means means that if node <b>q</b> is in the active view of
%% node <b>p</b> then node <b>p</b> is also in the active view of node <b>q</b>.
%%
%% The active views af all cluster nodes create an overlay that is used for
%% message dissemination. Each node keeps an open TCP connection to every other
%% node in its active view.
%%
%% Broadcast is performed deterministically by flooding the graph defined by
%% the active views across the cluster. When a node receives a message for the
%% first time, it broadcasts the message to all nodes of its active view (
%% except, obviously, to the node that has sent the message).
%% While this graph is generated at random, gossip is deterministic as long as
%% the graph remains unchanged.
%%
%% ==== Active View Management ====
%% A reactive strategy is used to maintain the active view. Nodes can be added
%% to the active view when they join the system. Also, nodes are removed from
%% the active view when they fail. When a node <b>p</b> suspects that one of the
%% nodes present in its active view has failed (by either disconnecting or
%% blocking), it selects a random node <b>q</b> from its passive view and attempts
%% to establish a TCP connection with <b>q</b>. If the connection fails to
%% establish, node <b>q</b> is considered failed and removed from <b>p’s</b>
%% passive view; another node <b>q′</b> is selected at random and a new attempt
%% is made.
%%
%% When the connection is established with success, p sends to q a Neighbor
%% request with its own identifier and a priority level. The priority level of
%% the request may take two values, depending on the number of nodes present in
%% the active view of p: if p has no elements in its active view the priority
%% is high; the priority is low otherwise.
%%
%% A node q that receives a high priority neighbor request will always accept
%% the request, even if it has to drop a random member from its active view (
%% again, the member that is dropped will receive a Disconnect notification).
%% If a node q receives a low priority Neighbor request, it will only accept
%% the request if it has a free slot in its active view, otherwise it will
%% refuse the request.
%%
%% If the node q accepts the Neighbor request, p will remove q’s identifier
%% from its passive view and add it to the active view. If q rejects the
%% Neighbor request, the initiator will select another node from its passive
%% view and repeat the whole procedure (without removing q from its passive
%% view).
%%
%% Each node tests its entire active view every
%% time it forwards a message. Therefore, the entire broadcast overlay is
%% implicitly tested at every broadcast, which allows a very fast failure
%% detection.
%%
%% === Passive View ===
%% In addition to the active view, each node maintains a larger passive view
%% of backup nodes that can be promoted to the active view when one of the
%% nodes in the active view fails.
%%
%% The passive view is not used for message dissemination. Instead, the goal of
%% the passive view is to maintain a list of nodes that can be used to replace
%% failed members of the active view. The passive view is maintained using a
%% cyclic strategy. Periodically, each node performs a shuffle operation with
%% one of its neighbors in order to update its passive view.
%%
%% ==== Passive View Management ====
%%
%% The passive view is maintained using a cyclic strategy. Periodically, each
%% node perform a shuffle operation with one of its peers at random. The
%% purpose of the shuffle operation is to update the passive views of the nodes
%% involved in the exchange. The node p that initiates the exchange creates an
%% exchange list with the following contents: p’s own identifier, ka nodes from
%% its active view and kp nodes from its passive view (where ka and kp are
%% protocol parameters). It then sends the list in a Shuffle request to a
%% random neighbor of its active view. Shuffle requests are propagated using a
%% random walk and have an associated “time to live”, just like the ForwardJoin
%% requests.
%%
%% A node q that receives a Shuffle request will first decrease its time to
%% live. If the time to live of the message is greater than zero and the number
%% of nodes in q’s active view is greater than 1, the node will select a random
%% node from its active view, different from the one he received this shuffle
%% message from, and simply forwards the Shuffle request. Otherwise, node q
%% accepts the Shuffle request and send back, using a temporary TCP connection,
%% a ShuffleReply message that includes a number of nodes selected at random
%% from q’s passive view equal to the number of nodes received in the Shuffle
%% request.
%%
%% Then, both nodes integrate the elements they received in the Shuffle/
%% ShuffleReply mes- sage into their passive views (naturally, they exclude
%% their own identifier and nodes that are part of the active or passive
%% views). Because the passive view has a fixed length, it might get full; in
%% that case, some identifiers will have to be removed in order to free space
%% to include the new ones. A node will first attempt to remove identifiers
%% sent to the peer. If no such identifiers remain in the passive view, it will
%% remove identifiers at random.
%%
%% == Configuration ==
%% The following are the HyParView configuration parameters managed by
%% {@link partisan_config}. The params are passed as `{hyparview, Config}'
%% where `Config' is a property list or map where the keys are the following:
%%
%% <dl>
%% <dt>`active_max_size'</dt><dd>Defaults to 6.</dd>
%% <dt>`active_min_size'</dt><dd>Defaults to 3.</dd>
%% <dt>`active_rwl'</dt><dd>Active View Random Walk Length. Defaults
%% to 6.</dd>
%% <dt>`passive_max_size'</dt><dd>Defaults to 30.</dd>
%% <dt>`passive_rwl'</dt><dd>Passive View Random Walk Length.
%% Defaults to 6.</dd>
%% <dt>`random_promotion'</dt><dd>A boolean indicating if random promotion is
%% enabled. Defaults `true'.</dd>
%% <dt>`random_promotion_interval'</dt><dd>Time after which the
%% protocol attempts to promote a node in the passive view to the active
%% view.Defaults to 5000.</dd>
%% <dt>`shuffle_interval'</dt><dd>Defaults to 10000.</dd>
%% <dt>`shuffle_k_active'</dt><dd>Number of peers to include in the
%% shuffle exchange. Defaults to 3.</dd>
%% <dt>`shuffle_k_passive'</dt><dd>Number of peers to include in the
%% shuffle exchange. Defaults to 4.</dd>
%% </dl>
%%
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_hyparview_peer_service_manager).
-behaviour(gen_server).
-behaviour(partisan_peer_service_manager).
-include("partisan_logger.hrl").
-include("partisan.hrl").
-author("Christopher S. Meiklejohn <christopher.meiklejohn@gmail.com>").
-author("Bruno Santiago Vazquez <brunosantiagovazquez@gmail.com>").
%% Defaults
-define(SHUFFLE_INTERVAL, 10000).
-define(RANDOM_PROMOTION_INTERVAL, 5000).
-record(state, {
name :: node(),
node_spec :: partisan:node_spec(),
config :: config(),
active :: active(),
passive :: passive(),
reserved :: reserved(),
out_links :: list(),
tag :: tag(),
epoch :: epoch(),
sent_message_map :: message_id_store(),
recv_message_map :: message_id_store(),
partitions :: partisan_peer_service_manager:partitions()
}).
-type t() :: #state{}.
-type active() :: sets:set(partisan:node_spec()).
-type passive() :: sets:set(partisan:node_spec()).
-type reserved() :: #{atom() := partisan:node_spec()}.
-type tag() :: atom().
%% The epoch indicates how many times the node is restarted.
-type epoch() :: non_neg_integer().
%% The epoch_count indicates how many disconnect messages are generated.
-type epoch_count() :: non_neg_integer().
-type message_id() :: {epoch(), epoch_count()}.
-type message_id_store() :: #{partisan:node_spec() := message_id()}.
-type call() :: {join, partisan:node_spec()}
| {leave, partisan:node_spec()}
| {update_members, [partisan:node_spec()]}
| {resolve_partition, reference()}
| {inject_partition,
partisan:node_spec(),
integer()}
| {reserve, tag()}
| active
| passive
| {active, tag()}
| {send_message, node(), term()}
%% | {forward_message, node(), ...}
%% | {receive_message, node(), ...}
| members
| members_for_orchestration
| get_local_state
| connections
| partitions.
-type cast() :: {join, partisan:node_spec()}
| {receive_message,
partisan:node_spec(),
partisan:channel(),
term()}
| {disconnect, partisan:node_spec()}.
%% PARTISAN_PEER_SERVICE_MANAGER CALLBACKS
-export([cast_message/2]).
-export([cast_message/3]).
-export([cast_message/4]).
-export([decode/1]).
-export([forward_message/2]).
-export([forward_message/3]).
-export([forward_message/4]).
-export([get_local_state/0]).
-export([inject_partition/2]).
-export([join/1]).
-export([leave/0]).
-export([leave/1]).
-export([members/0]).
-export([members_for_orchestration/0]).
-export([on_down/2]).
-export([on_down/3]).
-export([on_up/2]).
-export([on_up/3]).
-export([partitions/0]).
-export([receive_message/3]).
-export([reserve/1]).
-export([resolve_partition/1]).
-export([send_message/2]).
-export([start_link/0]).
-export([supports_capability/1]).
-export([sync_join/1]).
-export([update_members/1]).
%% DEBUG API
-export([active/0]).
-export([active/1]).
-export([passive/0]).
%% GEN_SERVER CALLBACKS
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
%% temporary exceptions
-export([delete_state_from_disk/0]).
-eqwalizer({nowarn_function, decode/1}).
%% -----------------------------------------------------------------------------
%% Notes on HyParView protocol
%%
%% <- `join' - A node that receives a `join' request will start by adding the
%% new node to its active view, even if it has to drop a random node from it (
%% `disconnect'). Then it will send to all other nodes in its active view a
%% `forward_join' request containing the new node identifier
%% <- `forward_join' - A message send by a node to all its active view members
%% when it accepts a join request by another node. This message will be
%% propagated in the overlay using a random walk. Associated to the join
%% procedure, there are two configuration parameters, named Active Random Walk
%% Length (ARWL), that specifies the maximum number of hops a `forward_join'
%% request is propagated, and Passive Random Walk Length (PRWL), that specifies
%% at which point in the walk the node is inserted in a passive view. To use
%% these parameters, the `forward_join' request carries a “time to live” field
%% that is initially set to ARWL and decreased at every hop.
%% <- `disconnect' - a Disconnect notification is sent to the node that has been
%% dropped from the active view. This happens when another node joins the
%% active view taking the place of the dropped one (see join).
%% <- `neighbor'
%% <- `neighbor_request'
%% <- `neighbor_rejected'
%% <- `neighbor_accepted'
%% <- `shuffle_reply'
%% <- `shuffle' - Part of the passive view maintenance. Periodically, each node
%% performs a shuffle operation with one of its neighbors in order to update
%% its passive view. One interesting aspect of our shuffle mechanism is that
%% the identifiers that are exchanged in a shuffle operation are not only from
%% the passive view: a node also sends its own identifier and some nodes
%% collected from its active view to its neighbor. This increases the
%% probability of having nodes that are active in the passive views and ensures
%% that failed nodes are eventually expunged from all passive views.
%%
%% -----------------------------------------------------------------------------
%% =============================================================================
%% PARTISAN_PEER_SERVICE_MANAGER CALLBACKS
%% =============================================================================
%% -----------------------------------------------------------------------------
%% @doc Starts the peer service manager.
%% @end
%% -----------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
Opts = [
{spawn_opt, ?PARALLEL_SIGNAL_OPTIMISATION([])}
],
gen_server:start_link({local, ?MODULE}, ?MODULE, [], Opts).
%% -----------------------------------------------------------------------------
%% @doc Returns membership list.
%% @end
%% -----------------------------------------------------------------------------
-spec members() -> [node()].
members() ->
gen_server:call(?MODULE, members, infinity).
%% -----------------------------------------------------------------------------
%% @doc Return membership list.
%% @end
%% -----------------------------------------------------------------------------
-spec members_for_orchestration() -> [partisan:node_spec()].
members_for_orchestration() ->
gen_server:call(?MODULE, members_for_orchestration, infinity).
%% -----------------------------------------------------------------------------
%% @doc Decode state.
%% @end
%% -----------------------------------------------------------------------------
-spec decode({state, sets:set(), any()} | sets:set()) -> list().
decode({state, Active, _Epoch}) ->
decode(Active);
decode(Active) ->
sets:to_list(Active).
%% -----------------------------------------------------------------------------
%% @doc Return local node's view of cluster membership.
%% @end
%% -----------------------------------------------------------------------------
-spec get_local_state() -> {state, Active :: active(), Epoch :: integer()}.
get_local_state() ->
gen_server:call(?MODULE, get_local_state, infinity).
%% -----------------------------------------------------------------------------
%% @doc Register a trigger to fire when a connection drops.
%% @end
%% -----------------------------------------------------------------------------
on_down(_Name, _Function) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Register a trigger to fire when a connection drops.
%% @end
%% -----------------------------------------------------------------------------
on_down(_Name, _Function, _Opts) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Register a trigger to fire when a connection opens.
%% @end
%% -----------------------------------------------------------------------------
on_up(_Name, _Function) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Register a trigger to fire when a connection opens.
%% @end
%% -----------------------------------------------------------------------------
on_up(_Name, _Function, _Opts) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Update membership.
%% @end
%% -----------------------------------------------------------------------------
update_members(Members) ->
gen_server:call(?MODULE, {update_members, Members}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Send message to a remote peer service manager.
%% @end
%% -----------------------------------------------------------------------------
send_message(Name, Message) ->
Cmd = {send_message, Name, Message},
gen_server:call(?MODULE, Cmd, infinity).
%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec cast_message(
Term :: partisan:any_pid() | partisan:any_name(),
Message :: partisan:message()) -> ok.
cast_message(Term, Message) ->
FullMessage = {'$gen_cast', Message},
_ = forward_message(Term, FullMessage, #{}),
ok.
%% -----------------------------------------------------------------------------
%% @doc Cast a message to a remote gen_server.
%% @end
%% -----------------------------------------------------------------------------
cast_message(Node, ServerRef, Message) ->
cast_message(Node, ServerRef, Message, #{}).
%% -----------------------------------------------------------------------------
%% @doc Cast a message to a remote gen_server.
%% @end
%% -----------------------------------------------------------------------------
cast_message(Node, ServerRef, Message, Options) ->
FullMessage = {'$gen_cast', Message},
_ = forward_message(Node, ServerRef, FullMessage, Options),
ok.
%% -----------------------------------------------------------------------------
%% @doc Gensym support for forwarding.
%% @end
%% -----------------------------------------------------------------------------
forward_message(Term, Message) ->
forward_message(Term, Message, #{}).
%% -----------------------------------------------------------------------------
%% @doc Gensym support for forwarding.
%% @end
%% -----------------------------------------------------------------------------
forward_message(PidOrName, Message, _Opts)
when is_pid(PidOrName); is_atom(PidOrName) ->
_ = erlang:send(PidOrName, Message),
ok;
forward_message({Name, Node}, Message, Opts)
when is_atom(Name), is_atom(Node) ->
case Node == partisan:node() of
true ->
_ = erlang:send(Name, Message),
ok;
false ->
forward_message(Node, Name, Message, Opts)
end;
forward_message({global, _} = ServerRef, Message, Opts) ->
?LOG_DEBUG(#{
description => "Message cannot be delivered, global not supported",
destination => ServerRef,
message => Message,
options => Opts
}),
ok;
forward_message({via, _, _} = ServerRef, Message, Opts) ->
?LOG_DEBUG(#{
description => "Message cannot be delivered, global not supported",
destination => ServerRef,
message => Message,
options => Opts
}),
ok;
forward_message(RemoteRef, Message, Opts) ->
partisan_remote_ref:is_pid(RemoteRef)
orelse partisan_remote_ref:is_name(RemoteRef)
orelse error(badarg),
Node = partisan_remote_ref:node(RemoteRef),
Target = partisan_remote_ref:target(RemoteRef),
forward_message(Node, Target, Message, Opts).
%% -----------------------------------------------------------------------------
%% @doc Forward message to registered process on the remote side.
%% @end
%% -----------------------------------------------------------------------------
forward_message(Node, ServerRef, Message, Opts) when is_list(Opts) ->
forward_message(Node, ServerRef, Message, maps:from_list(Opts));
forward_message(Node, ServerRef, Message, Opts) when is_map(Opts) ->
?LOG_TRACE(#{
description => "About to send message",
node => partisan:node(),
process => ServerRef,
message => Message
}),
FullMessage = {forward_message, Node, ServerRef, Message, Opts},
%% Attempt to fast-path, dispatching it directly to the connection process
case partisan_peer_connections:dispatch(FullMessage) of
ok ->
ok;
{error, _} ->
gen_server:call(?MODULE, FullMessage, infinity)
end.
%% -----------------------------------------------------------------------------
%% @doc Receive message from a remote manager.
%% @end
%% -----------------------------------------------------------------------------
receive_message(Peer, Channel, {forward_message, ServerRef, Msg} = Cmd) ->
case partisan_config:get(disable_fast_receive, true) of
true ->
gen_server:call(
?MODULE, {receive_message, Peer, Channel, Cmd}, infinity
);
false ->
partisan_peer_service_manager:deliver(ServerRef, Msg)
end;
receive_message(Peer, Channel, Msg) ->
?LOG_TRACE(#{
description => "Manager received message from peer",
peer_node => Peer,
channel => Channel,
message => Msg
}),
Result = gen_server:call(
?MODULE, {receive_message, Peer, Channel, Msg}, infinity
),
?LOG_TRACE(#{
description => "Processed message from peer",
peer_node => Peer,
channel => Channel,
message => Msg
}),
Result.
%% -----------------------------------------------------------------------------
%% @doc Attempt to join a remote node.
%% @end
%% -----------------------------------------------------------------------------
join(Node) ->
gen_server:call(?MODULE, {join, Node}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Attempt to join a remote node.
%% @end
%% -----------------------------------------------------------------------------
sync_join(_Node) ->
{error, not_implemented}.
%% -----------------------------------------------------------------------------
%% @doc Leave the cluster.
%% @end
%% -----------------------------------------------------------------------------
leave() ->
gen_server:call(?MODULE, {leave, partisan:node()}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Remove another node from the cluster.
%% @end
%% -----------------------------------------------------------------------------
leave(Node) ->
gen_server:call(?MODULE, {leave, Node}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Reserve a slot for the particular tag.
%% @end
%% -----------------------------------------------------------------------------
reserve(Tag) ->
gen_server:call(?MODULE, {reserve, Tag}, infinity).
%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec supports_capability(Arg :: atom()) -> boolean().
supports_capability(monitoring) ->
false;
supports_capability(_) ->
false.
%% -----------------------------------------------------------------------------
%% @doc Inject a partition.
%% @end
%% -----------------------------------------------------------------------------
inject_partition(Origin, TTL) ->
gen_server:call(?MODULE, {inject_partition, Origin, TTL}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Resolve a partition.
%% @end
%% -----------------------------------------------------------------------------
resolve_partition(Reference) ->
gen_server:call(?MODULE, {resolve_partition, Reference}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Return partitions.
%% @end
%% -----------------------------------------------------------------------------
partitions() ->
gen_server:call(?MODULE, partitions, infinity).
%% =============================================================================
%% DEBUGGING API
%% =============================================================================
%% -----------------------------------------------------------------------------
%% @doc Debugging.
%% @end
%% -----------------------------------------------------------------------------
active() ->
gen_server:call(?MODULE, active, infinity).
%% -----------------------------------------------------------------------------
%% @doc Debugging.
%% @end
%% -----------------------------------------------------------------------------
active(Tag) ->
gen_server:call(?MODULE, {active, Tag}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Debugging.
%% @end
%% -----------------------------------------------------------------------------
passive() ->
gen_server:call(?MODULE, passive, infinity).
%% =============================================================================
%% GEN_SERVER CALLBACKS
%% =============================================================================
-spec init([]) -> {ok, t()} | {stop, reservation_limit_exceeded}.
init([]) ->
%% Seed the random number generator.
partisan_config:seed(),
%% Trap connection process exits.
process_flag(trap_exit, true),
ok = partisan_peer_connections:init(),
#{name := Name} = NodeSpec = partisan:node_spec(),
%% Set logger metadata
logger:set_process_metadata(#{node => Name}),
Epoch = maybe_load_epoch_from_disk(),
Active = sets:add_element(NodeSpec, sets:new([{version, 2}])),
Passive = sets:new([{version, 2}]),
SentMessageMap = maps:new(),
RecvMessageMap = maps:new(),
Partitions = [],
#{
active_max_size := ActiveMaxSize,
active_min_size := _,
active_rwl := _,
passive_max_size := _,
passive_rwl := _,
random_promotion := _,
random_promotion_interval := _,
shuffle_interval := _,
shuffle_k_active := _,
shuffle_k_passive := _,
xbot_enabled := _,
xbot_interval := _
} = Config = partisan_config:get(hyparview),
%% Get tag, if set.
Tag = partisan_config:get(tag, undefined),
%% Reserved server slots.
Reservations = partisan_config:get(reservations, []),
Reserved = maps:from_list([{T, undefined} || T <- Reservations]),
%% Verify we don't have too many reservations.
case length(Reservations) > ActiveMaxSize of
true ->
{stop, reservation_limit_exceeded};
false ->
State = #state{
name = Name,
node_spec = NodeSpec,
config = Config,
active = Active,
passive = Passive,
reserved = Reserved,
tag = Tag,
out_links = [],
epoch = Epoch + 1,
sent_message_map = SentMessageMap,
recv_message_map = RecvMessageMap,
partitions = Partitions
},
%% Schedule periodic maintenance of the passive view.
schedule_passive_view_maintenance(State),
%% Schedule periodic execution of xbot algorithm (optimization)
%% when it is enabled
schedule_xbot_execution(State),
%% Schedule tree peers refresh.
schedule_tree_refresh(State),
%% Schedule periodic random promotion when it is enabled.
schedule_random_promotion(State),
{ok, State}
end.
-spec handle_call(call(), {pid(), term()}, t()) ->
{reply, term(), t()}.
handle_call(partitions, _From, State) ->
{reply, {ok, State#state.partitions}, State};
handle_call({leave, _Node}, _From, State) ->
{reply, {error, not_implemented}, State};
handle_call({join, #{name := _Name} = Node}, _From, State) ->
gen_server:cast(?MODULE, {join, Node}),
{reply, ok, State};
handle_call({update_members, Members}, _, #state{} = State0) ->
State = handle_update_members(Members, State0),
{reply, ok, State};
handle_call({resolve_partition, Reference}, _From, State) ->
Partitions = handle_partition_resolution(Reference, State),
{reply, ok, State#state{partitions = Partitions}};
handle_call({inject_partition, Origin, TTL}, _From, State) ->
Myself = State#state.node_spec,
Reference = make_ref(),
?LOG_DEBUG(#{
description => "Injecting partition",
origin => Origin,
node_spec => Myself,
ttl => TTL
}),
case Origin of
Myself ->
Partitions = handle_partition_injection(
Reference, Origin, TTL, State
),
{reply, {ok, Reference}, State#state{partitions = Partitions}};
_ ->
Result = do_send_message(
Origin,
{inject_partition, Reference, Origin, TTL}
),
case Result of
{error, Error} ->
{reply, {error, Error}, State};
ok ->
{reply, {ok, Reference}, State}
end
end;
handle_call({reserve, Tag}, _From, State) ->
Reserved0 = State#state.reserved,
ActiveMaxSize = config_get(active_max_size, State),
Present = maps:keys(Reserved0),
case length(Present) < ActiveMaxSize of
true ->
Reserved = case lists:member(Tag, Present) of
true ->
Reserved0;
false ->
maps:put(Tag, undefined, Reserved0)
end,
{reply, ok, State#state{reserved = Reserved}};
false ->
{reply, {error, no_available_slots}, State}
end;
handle_call(active, _From, State) ->
{reply, {ok, State#state.active}, State};
handle_call({active, Tag}, _From, State) ->
Result = case maps:find(Tag, State#state.reserved) of
{ok, #{name := Peer}} ->
{ok, Peer};
{ok, undefined} ->
{ok, undefined};
error ->
error
end,
{reply, Result, State};
handle_call(passive, _From, State) ->
{reply, {ok, State#state.passive}, State};
handle_call({send_message, Name, Msg}, _From, State) ->
Result = do_send_message(Name, Msg),
{reply, Result, State};
handle_call({forward_message, Name, ServerRef, Msg, Opts}, _From, State) ->
Partitions = State#state.partitions,
IsPartitioned = lists:any(
fun({_, #{name := N}}) ->
case N of
Name ->
true;
_ ->
false
end
end,
Partitions
),
case IsPartitioned of
true ->
{reply, {error, partitioned}, State};
false ->
Result = do_send_message(
Name,
{forward_message, ServerRef, Msg},
Opts
),
{reply, Result, State}
end;
handle_call({receive_message, _, _, _} = Cmd, _From, State) ->
%% This is important, we immediately cast the message to ourselves to
%% unblock the calling process (partisan_peer_service_server who manages
%% the socket).
%% TODO: We should consider rewriting receive_message/2 to use
%% gen_server:cast directly! Erlang guarantees the delivery
%% order
%% See Issue #5
gen_server:cast(?MODULE, Cmd),
{reply, ok, State};
handle_call(members, _From, State) ->
Active = State#state.active,
Members = members(Active),
?LOG_DEBUG(#{
description => "Node active view",
node_spec => State#state.node_spec,
members => members(Active)
}),
Nodes = [Node || #{name := Node} <- Members],
{reply, {ok, Nodes}, State};
handle_call(members_for_orchestration, _From, State) ->
{reply, {ok, members(State)}, State};
handle_call(get_local_state, _From, State) ->
Active = State#state.active,
Epoch = State#state.epoch,
{reply, {ok, {state, Active, Epoch}}, State};
handle_call(connections, _From, State) ->
%% get a list of all the client connections to the various peers of the
%% active view
Cs = lists:map(
fun(Peer) ->
Pids = partisan_peer_connections:processes(Peer),
?LOG_DEBUG(#{
description => "Peer connection processes",
peer_node => Peer,
connection_processes => Pids
}),
{Peer, Pids}
end,
peers(State)
),
{reply, {ok, Cs}, State};
handle_call(Event, _From, State) ->
?LOG_WARNING(#{description => "Unhandled call event", event => Event}),
{reply, ok, State}.
-spec handle_cast(cast(), t()) -> {noreply, t()}.
handle_cast({join, Peer}, State) ->
Myself = State#state.node_spec,
Tag = State#state.tag,
Epoch = State#state.epoch,
%% Trigger connection.
ok = partisan_peer_service_manager:connect(Peer),
?LOG_DEBUG(#{
description => "Sending JOIN message",
node => Myself,
peer_node => Peer
}),
%% Send the JOIN message to the peer.
%% REVIEW we currently ignore errors, shouldn't we return them?
_ = do_send_message(Peer, {join, Myself, Tag, Epoch}),
{noreply, State};
handle_cast({receive_message, _Peer, Channel, Message}, State) ->
handle_message(Message, Channel, State);
handle_cast({disconnect, Peer}, State0) ->
Active0 = State0#state.active,
case sets:is_element(Peer, Active0) of
true ->
%% If a member of the active view, remove it.
Active = sets:del_element(Peer, Active0),
State = add_to_passive_view(
Peer,
State0#state{active = Active}
),
ok = disconnect(Peer),
{noreply, State};
false ->
{noreply, State0}
end;
handle_cast(Event, State) ->
?LOG_WARNING(#{description => "Unhandled cast event", event => Event}),
{noreply, State}.
-spec handle_info(term(), t()) -> {noreply, t()}.
handle_info(random_promotion, State0) ->
Myself = State0#state.node_spec,
Active0 = State0#state.active,
Passive = State0#state.passive,
Reserved0 = State0#state.reserved,
ActiveMinSize0 = config_get(active_min_size, State0),
Limit = has_reached_limit({active, Active0, Reserved0}, ActiveMinSize0),
State = case Limit of
true ->
%% Do nothing if the active view reaches the ActiveMinSize.
State0;
false ->
Peer = pick_random(Passive, [Myself]),
promote_peer(Peer, State0)
end,
%% Schedule periodic random promotion.
schedule_random_promotion(State),
{noreply, State};
handle_info(tree_refresh, State) ->
%% Get lazily computed outlinks.
OutLinks = retrieve_outlinks(State#state.name),
%% Reschedule.
schedule_tree_refresh(State),
{noreply, State#state{out_links = OutLinks}};
handle_info(passive_view_maintenance, State0) ->
%% The passive view is maintained using a cyclic strategy. Periodically,
%% each node perform a shuffle operation with one of its peers at random.
%% The purpose of the shuffle operation is to update the passive views of
%% the nodes involved in the exchange. The node p that initiates the
%% exchange creates an exchange list with the following contents:
%% - p’s own identifier (node_spec()),
%% - ka nodes from its active view (shuffle_k_active), and
%% - kp nodes from its passive view (shuffle_k_passive)
%% (where ka and kp are protocol parameters).
Myself = State0#state.node_spec,
Active = State0#state.active,
Exchange = select_peers_for_exchange(State0),
%% Select random member of the active list to send the shuffle message to
State = case pick_random(Active, [Myself]) of
undefined ->
State0;
Peer ->
%% Trigger connection.
ok = partisan_peer_service_manager:connect(Peer),
%% Forward shuffle request.
ARWL = config_get(active_rwl, State0),
do_send_message(Peer, {shuffle, Exchange, ARWL, Myself}),
State0
end,
%% Reschedule.
schedule_passive_view_maintenance(State),
{noreply, State};
% handle optimization using xbot algorithm
handle_info(xbot_execution, #state{} = State) ->
Active = State#state.active,
Passive = State#state.passive,
Reserved = State#state.reserved,
ActiveMaxSize = config_get(active_max_size, State),
% check if active view is full
case is_full({active, Active, Reserved}, ActiveMaxSize) of
% if full, check for candidates and try to optimize
true ->
Candidates = pick_random(Passive, 2),
send_optimization_messages(members(Active), Candidates, State);
% in other case, do nothing
false -> ok
end,
%In any case, schedule periodic xbot execution algorithm (optimization)
ok = schedule_xbot_execution(State),
{noreply, State};
handle_info({'EXIT', Pid, Reason}, State0) when is_pid(Pid) ->
?LOG_DEBUG(#{
description => "Active view connection process died.",
process => Pid,
reason => Reason
}),
Myself = State0#state.node_spec,
Active0 = State0#state.active,
Passive0 = State0#state.passive,
%% Prune active connections from map.
try partisan_peer_connections:prune(Pid) of
{Info, _Connections} ->
Peer = partisan_peer_connections:node_spec(Info),
%% If it was in the passive view and our connection attempt failed,
%% remove from the passive view altogether.
Passive = case is_in_passive_view(Peer, Passive0) of
true ->
remove_from_passive_view(Peer, Passive0);
false ->
Passive0
end,
%% If it was in the active view and our connection attempt failed,
%% remove from the active view altogether.
{Active, RemovedFromActive} =
case is_in_active_view(Peer, Active0) of
true ->
{remove_from_active_view(Peer, Active0), true};
false ->
{Active0, false}
end,
State = case RemovedFromActive of
true ->
RandomPeer = pick_random(Passive, [Myself]),
promote_peer(
RandomPeer, State0#state{active=Active, passive=Passive}
);
false ->
State0#state{active=Active, passive=Passive}
end,
?LOG_DEBUG(#{
description => "Active view",
node_spec => Myself,
active_view => members(State)
}),
{noreply, State}
catch
error:badarg ->
{noreply, State0}
end;
handle_info(
{connected, Peer, _Channel, _Tag, _PeerEpoch, _RemoteState}, State) ->
?LOG_DEBUG(#{
description => "Node is now connected",
peer_node => Peer
}),
{noreply, State};
handle_info(Event, State) ->
?LOG_WARNING(#{description => "Unhandled info event", event => Event}),
{noreply, State}.
-spec terminate(term(), t()) -> term().
terminate(_Reason, _State) ->
ok = partisan_peer_connections:kill_all().
-spec code_change(term() | {down, term()}, t(), term()) ->
{ok, t()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% =============================================================================
%% PRIVATE
%% =============================================================================
%% @private
handle_message({resolve_partition, Reference}, _, State) ->
Partitions = handle_partition_resolution(Reference, State),
{noreply, State#state{partitions = Partitions}};
handle_message({inject_partition, Reference, Origin, TTL}, _, State) ->
Partitions = handle_partition_injection(Reference, Origin, TTL, State),
{noreply, State#state{partitions = Partitions}};
handle_message(
{join, Peer, PeerTag, PeerEpoch},
_Channel,
#state{node_spec=Myself0,
active=Active0,
tag=Tag0,
sent_message_map=SentMessageMap0,
recv_message_map=RecvMessageMap0}=State0) ->
?LOG_DEBUG(#{
description => "Node is now connected",
node_spec => Myself0,
peer_node => Peer,
peer_epoch => PeerEpoch
}),
IsAddable = is_addable(PeerEpoch, Peer, SentMessageMap0),
NotInActiveView = not sets:is_element(Peer, Active0),
State = case IsAddable andalso NotInActiveView of
true ->
?LOG_DEBUG(#{
description => "Adding peer node to the active view",
peer_node => Peer
}),
%% Establish connections.
ok = partisan_peer_service_manager:connect(Peer),
Connected = partisan_peer_connections:is_connected(Peer),
case Connected of
true ->
%% only find the peer connection will add the peer to the
%% active
%% Add to active view.
State1 = add_to_active_view(Peer, PeerTag, State0),
LastDisconnectId = get_current_id(Peer, RecvMessageMap0),
%% Send the NEIGHBOR message to origin, that will update
%% it's view.
do_send_message(
Peer,
{neighbor, Myself0, Tag0, LastDisconnectId, Peer}
),
%% Random walk for forward join.
%% Since we might have dropped peers from the active view
%% when adding this one we need to use the most up to date
%% active view, and that's the one that's currently in the
%% state also disregard the the new joiner node
Peers =
(members(State1) -- [Myself0]) -- [Peer],
ok = lists:foreach(
fun(P) ->
%% Establish connections.
ok = partisan_peer_service_manager:connect(P),
?LOG_DEBUG(#{
description =>
"Forwarding join of to active view peer",
from => Peer,
to => P
}),
ARWL = config_get(active_rwl, State1),
Message = {
forward_join,
Peer,
PeerTag,
PeerEpoch,
ARWL,
Myself0
},
do_send_message(P, Message),
ok
end,
Peers
),
?LOG_DEBUG(
fun([S]) ->
#{
description => "Active view",
node_spec => Myself0,
active_view => members(S)
}
end,
[State1]
),
%% Notify with event.
notify(State1),
State1;
false ->
State0
end;
false ->
?LOG_DEBUG(#{
description => "Peer node will not be added to the active view",
peer_node => Peer
}),
State0
end,
{noreply, State};
handle_message({neighbor, Peer, PeerTag, DisconnectId, _Sender},
_Channel,
#state{node_spec=Myself0,
sent_message_map=SentMessageMap0}=State0) ->
?LOG_DEBUG(#{
description => "Node received the NEIGHBOR message from peer",
node_spec => Myself0,
peer_node => Peer,
peer_tag => PeerTag
}),
State =
case is_addable(DisconnectId, Peer, SentMessageMap0) of
true ->
%% Establish connections.
ok = partisan_peer_service_manager:connect(Peer),
case partisan_peer_connections:is_connected(Peer) of
true ->
%% Add node into the active view.
State1 = add_to_active_view(
Peer, PeerTag, State0
),
?LOG_DEBUG(#{
description => "Active view",
node_spec => Myself0,
active_view => members(State1)
}),
State1;
false ->
State0
end;
false ->
State0
end,
%% Notify with event.
notify(State),
{noreply, State};
handle_message({forward_join, Peer, PeerTag, PeerEpoch, TTL, Sender},
_Channel,
#state{node_spec=Myself0,
active=Active0,
tag=Tag0,
sent_message_map=SentMessageMap0,
recv_message_map=RecvMessageMap0}=State0) ->
%% When a node p receives a forward_join, it performs the following steps
%% in sequence:
%% i) If the time to live is equal to zero or if the number of
%% nodes in p’s active view is equal to one, it will add the new node to
%% its active view. This step is performed even if a random node must be
%% dropped from the active view. In the later case, the node being ejected
%% from the active view receives a disconnect notification.
%% ii) If the time to live is equal to PRWL, p will insert the new node
%% into its passive view.
%% iii) The time to live field is decremented. iv) If, at this point,
%% n has not been inserted in p’s active view, p will forward the request
%% to a random node in its active view (different from the one from which
%% the request was received).
?LOG_DEBUG("
Node ~p received the FORWARD_JOIN message from ~p about ~p",
[Myself0, Sender, Peer]
),
ActiveViewSize = sets:size(Active0),
State = case TTL =:= 0 orelse ActiveViewSize =:= 1 of
true ->
?LOG_DEBUG(
"FORWARD_JOIN: ttl(~p) expired or only one peer in "
"active view (~p), "
"adding ~p tagged ~p to active view",
[TTL, ActiveViewSize, Peer, PeerTag]
),
IsAddable0 = is_addable(PeerEpoch, Peer, SentMessageMap0),
NotInActiveView0 = not sets:is_element(Peer, Active0),
case IsAddable0 andalso NotInActiveView0 of
true ->
%% Establish connections.
ok = partisan_peer_service_manager:connect(Peer),
case partisan_peer_connections:is_connected(Peer) of
true ->
%% Add to our active view.
State1 = add_to_active_view(Peer, PeerTag, State0),
LastDisconnectId = get_current_id(
Peer, RecvMessageMap0
),
%% Send neighbor message to origin, that will
%% update it's view.
Message = {
neighbor,
Myself0, Tag0, LastDisconnectId, Peer
},
do_send_message(Peer, Message),
?LOG_DEBUG(#{
description => "Active view",
node_spec => Myself0,
active_view => members(State1)
}),
State1;
false ->
State0
end;
false ->
?LOG_DEBUG(
"Peer node ~p will not be added to the active view",
[Peer]
),
State0
end;
false ->
%% If we run out of peers before we hit the PRWL, that's
%% fine, because exchanges between peers will eventually
%% repair the passive view during shuffles.
PRWL = config_get(passive_rwl, State0),
State2 = case TTL =:= PRWL of
true ->
?LOG_DEBUG(
"FORWARD_JOIN: Passive walk ttl expired, "
"adding ~p to the passive view",
[Peer]
),
add_to_passive_view(Peer, State0);
false ->
State0
end,
%% Don't forward the join to the sender, ourself, or the joining
%% peer.
case pick_random(Active0, [Sender, Myself0, Peer]) of
undefined ->
IsAddable1 = is_addable(PeerEpoch, Peer, SentMessageMap0),
NotInActiveView1 = not sets:is_element(Peer, Active0),
case IsAddable1 andalso NotInActiveView1 of
true ->
?LOG_DEBUG(
"FORWARD_JOIN: No node for forward, "
"adding ~p to active view",
[Peer]
),
%% Establish connections.
ok = partisan_peer_service_manager:connect(Peer),
case partisan_peer_connections:is_connected(Peer) of
true ->
%% Add to our active view.
State3 = add_to_active_view(
Peer, PeerTag, State2
),
LastDisconnectId = get_current_id(
Peer, RecvMessageMap0
),
%% Send neighbor message to origin, that
%% will update it's view.
Message = {
neighbor,
Myself0,
Tag0,
LastDisconnectId,
Peer
},
do_send_message(Peer, Message),
?LOG_DEBUG(#{
description => "Active view",
node_spec => Myself0,
active_view => members(State3)
}),
State3;
false ->
State0
end;
false ->
?LOG_DEBUG(
"Peer node ~p will not be added to the "
"active view",
[Peer]
),
State2
end;
Random ->
%% Establish any new connections.
ok = partisan_peer_service_manager:connect(Random),
?LOG_DEBUG("FORWARD_JOIN: forwarding to ~p", [Random]),
Message = {forward_join,
Peer,
PeerTag,
PeerEpoch,
TTL - 1,
Myself0
},
%% Forward join.
do_send_message(Random, Message),
State2
end
end,
%% Notify with event.
notify(State),
{noreply, State};
handle_message({disconnect, Peer, DisconnectId},
_Channel,
#state{node_spec=Myself0,
active=Active0,
passive=Passive,
recv_message_map=RecvMessageMap0}=State0) ->
?LOG_DEBUG("Node ~p received the DISCONNECT message from ~p with ~p",
[Myself0, Peer, DisconnectId]),
case is_valid_disconnect(Peer, DisconnectId, RecvMessageMap0) of
false ->
%% Ignore the older disconnect message.
{noreply, State0};
true ->
%% Remove from active
Active = sets:del_element(Peer, Active0),
?LOG_DEBUG(#{
description => "Active view",
node_spec => Myself0,
active_view => members(Active)
}),
%% Add to passive view.
State1 = add_to_passive_view(Peer,
State0#state{active=Active}),
%% Update the AckMessageMap.
RecvMessageMap = maps:put(Peer, DisconnectId, RecvMessageMap0),
%% Trigger disconnection.
ok = disconnect(Peer),
State =
case sets:size(Active) == 1 of
true ->
%% the peer that disconnected us just got moved to the
%% passive view, exclude it when selecting a new one to
%% move back into the active view
RandomPeer = pick_random(Passive, [Myself0, Peer]),
?LOG_DEBUG(
"Node ~p is isolated, moving random peer ~p "
"from passive to active view",
[RandomPeer, Myself0]
),
promote_peer(RandomPeer,
State1#state{recv_message_map=RecvMessageMap});
false ->
State1#state{recv_message_map=RecvMessageMap}
end,
{noreply, State}
end;
handle_message(
{neighbor_request, Peer, Priority, PeerTag, DisconnectId, Exchange},
_Channel,
#state{} = State0) ->
Myself0 = State0#state.node_spec,
Tag0 = State0#state.tag,
SentMessageMap0 = State0#state.sent_message_map,
RecvMessageMap0 = State0#state.recv_message_map,
?LOG_DEBUG(
"Node ~p received the NEIGHBOR_REQUEST message from ~p with ~p",
[Myself0, Peer, DisconnectId]
),
%% Establish connections.
ok = partisan_peer_service_manager:connect(Peer),
Exchange_Ack = select_peers_for_exchange(State0),
State2 =
case neighbor_acceptable(Priority, PeerTag, State0) of
true ->
case is_addable(DisconnectId, Peer, SentMessageMap0) of
true ->
Connected = partisan_peer_connections:is_connected(
Peer
),
case Connected of
true ->
?LOG_DEBUG(
"Node ~p accepted neighbor peer ~p",
[Myself0, Peer]
),
LastDisconnectId =
get_current_id(Peer, RecvMessageMap0),
%% Reply to acknowledge the neighbor was
%% accepted.
do_send_message(
Peer,
{
neighbor_accepted,
Myself0,
Tag0,
LastDisconnectId,
Exchange_Ack
}
),
State1 = add_to_active_view(
Peer, PeerTag, State0
),
?LOG_DEBUG(#{
description => "Active view",
node_spec => Myself0,
active_view => members(State1)
}),
State1;
false ->
%% the connections does not change, the peer
%% can not be connected
State0
end;
false ->
?LOG_DEBUG(
"Node ~p rejected neighbor peer ~p",
[Myself0, Peer]
),
%% Reply to acknowledge the neighbor was rejected.
do_send_message(
Peer, {neighbor_rejected, Myself0, Exchange_Ack}
),
State0
end;
false ->
?LOG_DEBUG(
"Node ~p rejected neighbor peer ~p",
[Myself0, Peer]
),
%% Reply to acknowledge the neighbor was rejected.
do_send_message(Peer, {neighbor_rejected, Myself0}),
State0
end,
State = merge_exchange(Exchange, State2),
%% Notify with event.
notify(State),
{noreply, State};
handle_message({neighbor_rejected, Peer, Exchange},
_Channel,
#state{node_spec=Myself0} = State0) ->
?LOG_DEBUG("Node ~p received the NEIGHBOR_REJECTED message from ~p",
[Myself0, Peer]),
%% Trigger disconnection.
ok = disconnect(Peer),
State = merge_exchange(Exchange, State0),
{noreply, State};
handle_message({neighbor_accepted, Peer, PeerTag, DisconnectId, Exchange},
_Channel,
#state{node_spec=Myself0,
sent_message_map=SentMessageMap0} = State0) ->
?LOG_DEBUG(
"Node ~p received the NEIGHBOR_ACCEPTED message from ~p with ~p",
[Myself0, Peer, DisconnectId]
),
State1 = case is_addable(DisconnectId, Peer, SentMessageMap0) of
true ->
%% Add node into the active view.
add_to_active_view(Peer, PeerTag, State0);
false ->
State0
end,
State = merge_exchange(Exchange, State1),
%% Notify with event.
notify(State),
{noreply, State};
handle_message({shuffle_reply, Exchange, _Sender}, _Channel, State0) ->
State = merge_exchange(Exchange, State0),
{noreply, State};
handle_message({shuffle, Exchange, TTL, Sender},
_Channel,
#state{node_spec=Myself,
active=Active0,
passive=Passive0}=State0) ->
?LOG_DEBUG(
"Node ~p received the SHUFFLE message from ~p",
[Myself, Sender]
),
%% Forward to random member of the active view.
State = case TTL > 0 andalso sets:size(Active0) > 1 of
true ->
State1 = case pick_random(Active0, [Sender, Myself]) of
undefined ->
State0;
Random ->
%% Trigger connection.
ok = partisan_peer_service_manager:connect(Random),
%% Forward shuffle until random walk complete.
do_send_message(
Random,
{shuffle, Exchange, TTL - 1, Myself}
),
State0
end,
State1;
false ->
%% Randomly select nodes from the passive view and respond.
ResponseExchange = shuffle(members(Passive0), length(Exchange)),
%% Trigger connection.
ok = partisan_peer_service_manager:connect(Sender),
do_send_message(
Sender,
{shuffle_reply, ResponseExchange, Myself}
),
State2 = merge_exchange(Exchange, State0),
State2
end,
{noreply, State};
handle_message({relay_message, NodeSpec, Message, TTL}, Channel, #state{} = State) ->
?LOG_TRACE(
"Node ~p received tree relay to ~p", [partisan:node(), NodeSpec]
),
OutLinks = State#state.out_links,
ActiveMembers = [P || #{name := P} <- members(State)],
Opts = #{
out_links => OutLinks,
channel => Channel
},
case lists:member(NodeSpec, ActiveMembers) of
true ->
do_send_message(NodeSpec, Message, Opts#{transitive => true});
false ->
case TTL of
0 ->
%% No longer forward.
?LOG_DEBUG(
"TTL expired, dropping message for node ~p: ~p",
[NodeSpec, Message]
),
ok;
_ ->
do_tree_forward(NodeSpec, Message, Opts, TTL),
ok
end
end,
{noreply, State};
handle_message({forward_message, ServerRef, Message}, _Channel, State) ->
partisan_peer_service_manager:deliver(ServerRef, Message),
{noreply, State};
handle_message(
{optimization_reply, true, _, Initiator, Candidate, undefined},
_Channel,
State) ->
#{name := MyName} = Initiator,
#{name := CandidateName} = Candidate,
?LOG_DEBUG(
"XBOT: Received optimization reply message at Node ~p from ~p",
[MyName, CandidateName]
),
%% Revise this behaviour, when candidate accepts immediately because it has
%% availability in his active view
%% what to do with old node?? we cannot disconnect from it because maybe it
%% will be isolated
%Check = is_in_active_view(OldNode, Active),
%if Check ->
%remove_from_active_view(OldNode, Active),
%add_to_passive_view(OldNode, State)
% do_disconnect(OldNode, State)
%end,
%promote_peer(Candidate, State),
_ = send_join(Candidate, State),
?LOG_DEBUG(
"XBOT: Finished optimization round started by Node ~p ",
[MyName]
),
{noreply, State};
handle_message(
{optimization_reply, true, OldNode, Initiator, Candidate, _},
_Channel,
#state{} = State) ->
Active = State#state.active,
#{name := InitiatorName} = Initiator,
#{name := CandidateName} = Candidate,
?LOG_DEBUG(
"XBOT: Received optimization reply message at Node ~p from ~p",
[InitiatorName, CandidateName]
),
case is_in_active_view(OldNode, Active) of
true ->
%remove_from_active_view(OldNode, Active);
do_disconnect(OldNode, State);
false ->
ok
end,
%% promote_peer(Candidate, State),
_ = send_join(Candidate, State),
?LOG_DEBUG(
"XBOT: Finished optimization round started by Node ~p ",
[InitiatorName]
),
{noreply, State};
handle_message({optimization_reply, false, _, _, _, _}, _Channel, State) ->
{noreply, State};
handle_message(
{optimization, _, OldNode, Initiator, Candidate, undefined},
_Channel,
#state{} = State) ->
Active = State#state.active,
Reserved = State#state.reserved,
ActiveMaxSize = config_get(active_max_size, State),
#{name := CandidateName} = Candidate,
#{name := InitiatorName} = Initiator,
?LOG_DEBUG(
"XBOT: Received optimization message at Node ~p from ~p",
[CandidateName, InitiatorName]
),
Check = is_full({active, Active, Reserved}, ActiveMaxSize),
if not Check ->
%add_to_active_view(Candidate, MyTag, State),
_ = send_join(Initiator, State),
ok = partisan_peer_service_manager:connect(Initiator),
Message = {
optimization_reply,
true,
OldNode,
Initiator,
Candidate,
undefined
},
_ = do_send_message(Initiator, Message),
?LOG_DEBUG(
"XBOT: Sending optimization reply message to Node ~p from ~p", [InitiatorName, CandidateName]
);
true ->
DisconnectNode = select_disconnect_node(sets:to_list(Active)),
#{name := DisconnectName} = DisconnectNode,
ok = partisan_peer_service_manager:connect(DisconnectNode),
Message = {
replace,
undefined,
OldNode,
Initiator,
Candidate,
DisconnectNode
},
_ = do_send_message(DisconnectNode, Message),
?LOG_DEBUG(
"XBOT: Sending replace message to Node ~p from ~p", [DisconnectName, CandidateName]
)
end,
{noreply, State};
handle_message(
{replace_reply, true, OldNode, Initiator, Candidate, DisconnectNode},
_Channel,
#state{} = State) ->
#{name := InitiatorName} = Initiator,
#{name := DisconnectName} = DisconnectNode,
#{name := CandidateName} = Candidate,
?LOG_DEBUG("XBOT: Received replace reply message at Node ~p from ~p", [CandidateName, DisconnectName]),
%remove_from_active_view(DisconnectNode, Active),
do_disconnect(DisconnectNode, State),
%add_to_active_view(Initiator, MyTag, State),
_ = send_join(Initiator, State),
ok = partisan_peer_service_manager:connect(Initiator),
_ = do_send_message(Initiator,{optimization_reply, true, OldNode, Initiator, Candidate, DisconnectNode}),
?LOG_DEBUG("XBOT: Sending optimization reply to Node ~p from ~p", [InitiatorName, CandidateName]),
{noreply, State};
handle_message(
{replace_reply, false, OldNode, Initiator, Candidate, DisconnectNode},_Channel,
#state{} = State) ->
#{name := InitiatorName} = Initiator,
#{name := DisconnectName} = DisconnectNode,
#{name := CandidateName} = Candidate,
?LOG_DEBUG("XBOT: Received replace reply message at Node ~p from ~p", [CandidateName, DisconnectName]),
ok = partisan_peer_service_manager:connect(Initiator),
_ = do_send_message(Initiator,{optimization_reply, false, OldNode, Initiator, Candidate, DisconnectNode}),
?LOG_DEBUG("XBOT: Sending optimization reply to Node ~p from ~p", [InitiatorName, CandidateName]),
{noreply, State};
handle_message(
{replace, _, OldNode, Initiator, Candidate, DisconnectNode},
_Channel,
#state{} = State) ->
#{name := DisconnectName} = DisconnectNode,
#{name := CandidateName} = Candidate,
#{name := OldName} = OldNode,
?LOG_DEBUG(
"XBOT: Received replace message at Node ~p from ~p",
[DisconnectName, CandidateName]
),
Check = is_better(?HYPARVIEW_XBOT_ORACLE, OldNode, Candidate),
if
not Check ->
ok = partisan_peer_service_manager:connect(Candidate),
_ = do_send_message(
Candidate,
{
replace_reply,
false,
OldNode,
Initiator,
Candidate,
DisconnectNode
}
),
?LOG_DEBUG(
"XBOT: Sending replace reply to Node ~p from ~p",
[CandidateName, DisconnectName]
);
true ->
ok = partisan_peer_service_manager:connect(OldNode),
_ = do_send_message(OldNode,{switch, undefined, OldNode, Initiator, Candidate, DisconnectNode}),
?LOG_DEBUG(
"XBOT: Sending switch to Node ~p from ~p",
[OldName, DisconnectName]
)
end,
{noreply, State};
handle_message(
{switch_reply, true, OldNode, Initiator, Candidate, DisconnectNode},
_Channel,
#state{} = State) ->
#{name := DisconnectName} = DisconnectNode,
#{name := CandidateName} = Candidate,
#{name := OldName} = OldNode,
?LOG_DEBUG("XBOT: Received switch reply message at Node ~p from ~p", [DisconnectName, OldName]),
%remove_from_active_view(Candidate, Active),
do_disconnect(Candidate, State),
%add_to_active_view(OldNode, MyTag, State),
_ = send_join(OldNode, State),
ok = partisan_peer_service_manager:connect(Candidate),
_ = do_send_message(Candidate,{replace_reply, true, OldNode, Initiator, Candidate, DisconnectNode}),
?LOG_DEBUG("XBOT: Sending replace reply to Node ~p from ~p", [CandidateName, DisconnectName]),
{noreply, State};
handle_message(
{switch_reply, false, OldNode, Initiator, Candidate, DisconnectNode},
_Channel,
#state{} = State) ->
#{name := DisconnectName} = DisconnectNode,
#{name := CandidateName} = Candidate,
#{name := OldName} = OldNode,
?LOG_DEBUG("XBOT: Received switch reply message at Node ~p from ~p", [DisconnectName, OldName]),
ok = partisan_peer_service_manager:connect(Candidate),
_ = do_send_message(Candidate, {replace_reply, false, OldNode, Initiator, Candidate, DisconnectNode}),
?LOG_DEBUG("XBOT: Sending replace reply to Node ~p from ~p", [CandidateName, DisconnectName]),
{noreply, State};
handle_message(
{switch, _, OldNode, Initiator, Candidate, DisconnectNode},
_Channel,
#state{active = Active} = State) ->
#{name := DisconnectName} = DisconnectNode,
#{name := OldName} = OldNode,
?LOG_DEBUG("XBOT: Received switch message at Node ~p from ~p", [OldName, DisconnectName]),
Check = is_in_active_view(Initiator, Active),
if Check ->
%remove_from_active_view(Initiator, Active),
do_disconnect(Initiator, State),
%add_to_active_view(DisconnectNode, MyTag, State),
_ = send_join(DisconnectNode, State),
ok = partisan_peer_service_manager:connect(DisconnectNode),
_ = do_send_message(DisconnectNode, {switch_reply, true, OldNode, Initiator, Candidate, DisconnectNode}),
?LOG_DEBUG("XBOT: Sending switch reply to Node ~p from ~p", [DisconnectName, OldName]);
true ->
ok = partisan_peer_service_manager:connect(DisconnectNode),
_ = do_send_message(DisconnectNode, {switch_reply, false, OldNode, Initiator, Candidate, DisconnectNode}),
?LOG_DEBUG("XBOT: Sending switch reply to Node ~p from ~p", [DisconnectName, OldName])
end,
{noreply, State}.
%% @private
send_join(Peer, #state{node_spec=Myself0, tag=Tag0, epoch=Epoch0}) ->
%% Trigger connection.
ok = partisan_peer_service_manager:connect(Peer),
?LOG_DEBUG("Node ~p sends the JOIN message to ~p", [Myself0, Peer]),
%% Send the JOIN message to the peer.
do_send_message(Peer, {join, Myself0, Tag0, Epoch0}).
%% @private
zero_epoch() ->
Epoch = 0,
persist_epoch(Epoch),
Epoch.
%% @private
data_root() ->
case application:get_env(partisan, partisan_data_dir) of
{ok, PRoot} ->
filename:join(PRoot, "peer_service");
undefined ->
undefined
end.
%% @private
write_state_to_disk(Epoch) ->
case data_root() of
undefined ->
ok;
Dir ->
File = filename:join(Dir, "cluster_state"),
ok = filelib:ensure_dir(File),
ok = file:write_file(File, term_to_binary(Epoch))
end.
%% @private
delete_state_from_disk() ->
case data_root() of
undefined ->
ok;
Dir ->
File = filename:join(Dir, "cluster_state"),
ok = filelib:ensure_dir(File),
case file:delete(File) of
ok ->
?LOG_DEBUG(#{
description => "Leaving cluster, removed cluster_state"
});
{error, Reason} ->
?LOG_DEBUG(
"Unable to remove cluster_state for reason ~p",
[Reason]
)
end
end.
%% @private
maybe_load_epoch_from_disk() ->
case data_root() of
undefined ->
zero_epoch();
Dir ->
case filelib:is_regular(filename:join(Dir, "cluster_state")) of
true ->
{ok, Bin} = file:read_file(filename:join(Dir, "cluster_state")),
binary_to_term(Bin);
false ->
zero_epoch()
end
end.
%% @private
persist_epoch(Epoch) ->
write_state_to_disk(Epoch).
%% @private
members(#state{active = Set}) ->
members(Set);
members(Set) ->
sets:to_list(Set).
%% @private
peers(#state{active = Set, node_spec = NodeSpec}) ->
sets:to_list(sets:del_element(NodeSpec, Set)).
%% @private
-spec disconnect(Node :: partisan:node_spec()) -> ok.
disconnect(Node) ->
try partisan_peer_connections:prune(Node) of
{_Info, Connections} ->
[
begin
Pid = partisan_peer_connections:pid(Connection),
?LOG_DEBUG(
"disconnecting node ~p by stopping connection pid ~p",
[Node, Pid]
),
unlink(Pid),
_ = catch gen_server:stop(Pid)
end
|| Connection <- Connections
],
ok
catch
error:badarg ->
ok
end.
%% @private
-spec do_send_message(
Node :: atom() | partisan:node_spec(),
Message :: partisan:message()) ->
ok | {error, disconnected} | {error, not_yet_connected} | {error, term()}.
do_send_message(Node, Message) ->
%% transitive is disabled
Opts = #{},
do_send_message(Node, Message, Opts).
%% @private
-spec do_send_message(
Node :: atom() | partisan:node_spec(),
Message :: partisan:message(),
Options :: map()) ->
ok | {error, disconnected} | {error, not_yet_connected} | {error, term()}.
do_send_message(Node, Message, Options) when is_atom(Node), is_map(Options) ->
%% TODO Shouldn't broadcast default to true?
%% otherwise we will only forward to
%% nodes in the active view that are connected.
%% Also why do we have 2 options
Broadcast = partisan_config:get(broadcast, false),
Transitive = maps:get(transitive, Options, false),
case partisan_peer_connections:dispatch_pid(Node) of
{ok, Pid} ->
%% We have a connection to the destination Node.
try
gen_server:call(Pid, {send_message, Message})
catch
Class:EReason ->
?LOG_DEBUG(
"failed to send a message to ~p due to ~p:~p",
[Node, Class, EReason]
),
{error, EReason}
end;
{error, Reason} ->
case Reason of
not_yet_connected ->
?LOG_DEBUG(#{
description =>
"Node not yet connected to peer node "
"when sending message.",
message => Message,
peer_node => Node,
options => #{
broadcast => Broadcast,
transitive => Transitive
}
});
disconnected ->
?LOG_DEBUG(#{
description =>
"Node disconnected to peer node "
"when sending message.",
message => Message,
peer_node => Node,
options => #{
broadcast => Broadcast,
transitive => Transitive
}
})
end,
%% TODO use retransmission and acks
case {Broadcast, Transitive} of
{true, true} ->
TTL = partisan_config:get(relay_ttl, ?RELAY_TTL),
do_tree_forward(Node, Message, Options, TTL);
{_, _} ->
{error, Reason}
end
end;
do_send_message(Node, Message, Options) when is_atom(Node), is_list(Options) ->
do_send_message(Node, Message, maps:from_list(Options));
do_send_message(#{name := Node}, Message, Options) ->
do_send_message(Node, Message, Options).
%% @private
pick_random(View, Omit) ->
List = members(View) -- lists:flatten([Omit]),
%% Catch exceptions where there may not be enough members.
try
Index = rand:uniform(length(List)),
lists:nth(Index, List)
catch
_:_ ->
undefined
end.
%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns a list of `K' values extracted randomly from `L'.
%% @end
%% -----------------------------------------------------------------------------
shuffle(L, K) when is_list(L) ->
%% We use maps instead of lists:sort/1 which is faster for longer lists
%% and uses less memory (erts_debug:size/1).
maps:values(
maps:from_list(
lists:sublist([{rand:uniform(), N} || N <- L], K)
)
).
%% -----------------------------------------------------------------------------
%% @private
%% @doc
%% @end
%% -----------------------------------------------------------------------------
select_peers_for_exchange(#state{} = State) ->
Myself = State#state.node_spec,
Active = State#state.active,
Passive = State#state.passive,
KActive = config_get(shuffle_k_active, State),
KPassive = config_get(shuffle_k_passive, State),
L = [Myself | shuffle(members(Active), KActive)]
++ shuffle(members(Passive), KPassive),
lists:usort(L).
%% @doc Add to the active view.
%%
%% However, interesting race condition here: if the passive random walk
%% timer exceeded and the node was added to the passive view, we might
%% also have the active random walk timer exceed *after* because of a
%% network delay; if so, we have to remove this element from the passive
%% view, otherwise it will exist in both places.
%%
add_to_active_view(#{name := Name}=Peer, Tag,
#state{active=Active0,
node_spec=Myself,
passive=Passive0,
reserved=Reserved0}=State0) ->
ActiveMaxSize = config_get(active_max_size, State0),
IsNotMyself = not (Name =:= partisan:node()),
NotInActiveView = not sets:is_element(Peer, Active0),
case IsNotMyself andalso NotInActiveView of
true ->
%% See above for more information.
Passive = remove_from_passive_view(Peer, Passive0),
State1 = State0#state{passive = Passive},
IsFull = is_full({active, Active0, Reserved0}, ActiveMaxSize),
State2 = case IsFull of
true ->
drop_random_element_from_active_view(State1);
false ->
State1
end,
?LOG_DEBUG(
"Node ~p adds ~p to active view with tag ~p",
[Myself, Peer, Tag]
),
%% Add to the active view.
Active = sets:add_element(Peer, State2#state.active),
%% Fill reserved slot if necessary.
Reserved = case maps:find(Tag, Reserved0) of
{ok, undefined} ->
?LOG_DEBUG(#{
description => "Node added to reserved slot!"
}),
maps:put(Tag, Peer, Reserved0);
{ok, _} ->
%% Slot already filled, treat this as a normal peer.
?LOG_DEBUG(#{
description =>
"Node added to active view, "
"but reserved slot already full!"
}),
Reserved0;
error ->
?LOG_DEBUG("Tag is not reserved: ~p ~p", [Tag, Reserved0]),
Reserved0
end,
State = State2#state{
active = Active,
passive = Passive,
reserved = Reserved
},
persist_epoch(State#state.epoch),
State;
false ->
State0
end.
%% -----------------------------------------------------------------------------
%% @private
%% @doc Add to the passive view.
%% @end
%% -----------------------------------------------------------------------------
add_to_passive_view(#{name := Name} = Peer, #state{} = State0) ->
Myself = State0#state.node_spec,
Active0 = State0#state.active,
Passive0 = State0#state.passive,
IsNotMyself = not (Name =:= partisan:node()),
NotInActiveView = not sets:is_element(Peer, Active0),
NotInPassiveView = not sets:is_element(Peer, Passive0),
Allowed = IsNotMyself andalso NotInActiveView andalso NotInPassiveView,
Passive = case Allowed of
true ->
PassiveMaxSize = config_get(passive_max_size, State0),
Passive1 = case is_full({passive, Passive0}, PassiveMaxSize) of
true ->
Random = pick_random(Passive0, [Myself]),
sets:del_element(Random, Passive0);
false ->
Passive0
end,
sets:add_element(Peer, Passive1);
false ->
Passive0
end,
State = State0#state{passive = Passive},
persist_epoch(State#state.epoch),
State.
%% @private
is_full({active, Active, Reserved}, MaxSize) ->
%% Find the slots that are reserved, but not filled.
Open = maps:fold(
fun
(Key, undefined, Acc) ->
[Key | Acc];
(_, _, Acc) ->
Acc
end,
[],
Reserved
),
sets:size(Active) + length(Open) >= MaxSize;
is_full({passive, Passive}, MaxSize) ->
sets:size(Passive) >= MaxSize.
%% -----------------------------------------------------------------------------
%% @private
%% @doc Process of removing a random element from the active view.
%% @end
%% -----------------------------------------------------------------------------
drop_random_element_from_active_view(
#state{node_spec=Myself0,
active=Active0,
reserved=Reserved0,
epoch=Epoch0,
sent_message_map=SentMessageMap0}=State0) ->
ReservedPeers = maps:fold(fun(_K, V, Acc) -> [V | Acc] end,
[],
Reserved0),
%% Select random peer, but omit the peers in reserved slots and omit
%% ourself from the active view.
case pick_random(Active0, [Myself0, ReservedPeers]) of
undefined ->
State0;
Peer ->
?LOG_DEBUG("Removing and disconnecting peer: ~p", [Peer]),
%% Remove from the active view.
Active = sets:del_element(Peer, Active0),
%% Add to the passive view.
State = add_to_passive_view(Peer, State0#state{active = Active}),
%% Trigger connection.
ok = partisan_peer_service_manager:connect(Peer),
%% Get next disconnect id for the peer.
NextId = get_next_id(Peer, Epoch0, SentMessageMap0),
%% Update the SentMessageMap.
SentMessageMap = maps:put(Peer, NextId, SentMessageMap0),
%% Let peer know we are disconnecting them.
do_send_message(Peer, {disconnect, Myself0, NextId}),
%% Trigger disconnection.
ok = disconnect(Peer),
?LOG_DEBUG(
fun([A]) ->
#{
description => "Active view",
node_spec => Myself0,
active_view => members(A)
}
end,
[Active]
),
State#state{sent_message_map = SentMessageMap}
end.
%% @private
remove_from_passive_view(Peer, Passive) ->
sets:del_element(Peer, Passive).
%% @private
is_in_passive_view(Peer, Passive) ->
sets:is_element(Peer, Passive).
%% @private
remove_from_active_view(Peer, Active) ->
sets:del_element(Peer, Active).
%% @private
is_in_active_view(Peer, Active) ->
sets:is_element(Peer, Active).
%% @private
neighbor_acceptable(high, _, _) ->
%% Always true.
true;
neighbor_acceptable(_, Tag, #state{} = State) ->
Reserved = State#state.reserved,
case reserved_slot_available(Tag, Reserved) of
true ->
%% Always take.
true;
_ ->
%% Otherwise, only if we have a slot available.
Active = State#state.active,
ActiveMaxSize = config_get(active_max_size, State),
not is_full({active, Active, Reserved}, ActiveMaxSize)
end.
%% @private
merge_exchange(Exchange, #state{} = State) ->
%% Remove ourself and active set members from the exchange.
Myself = State#state.node_spec,
Active = State#state.active,
ToAdd = lists:usort(Exchange -- ([Myself] ++ members(Active))),
%% Add to passive view.
lists:foldl(fun(X, P) -> add_to_passive_view(X, P) end, State, ToAdd).
%% @private
handle_update_members(Members, State) ->
merge_exchange(Members, State).
%% @private
notify(#state{active = Active}) ->
_ = catch partisan_peer_service_events:update(Active),
ok.
%% @private
reserved_slot_available(Tag, Reserved) ->
case maps:find(Tag, Reserved) of
{ok, undefined} ->
true;
_ ->
false
end.
%% %% @private
%%remove_from_reserved(Peer, Reserved) ->
%% maps:fold(fun(K, V, Acc) ->
%% case V of
%% Peer ->
%% Acc;
%% _ ->
%% maps:put(K, V, Acc)
%% end
%% end, maps:new(), Reserved).
%% @private
get_current_id(Peer, MessageMap) ->
case maps:find(Peer, MessageMap) of
{ok, Id} ->
Id;
error ->
%% Default value for the messageId:
%% {First start, No disconnect}
{1, 0}
end.
%% @private
get_next_id(Peer, MyEpoch, SentMessageMap) ->
case maps:find(Peer, SentMessageMap) of
{ok, {MyEpoch, Cnt}} ->
{MyEpoch, Cnt + 1};
error ->
{MyEpoch, 1}
end.
%% @private
is_valid_disconnect(Peer, {IdEpoch, IdCnt}, AckMessageMap) ->
case maps:find(Peer, AckMessageMap) of
error ->
true;
{ok, {Epoch, Cnt}} ->
case IdEpoch > Epoch of
true ->
true;
false ->
IdCnt > Cnt
end
end.
%% @private
is_addable({IdEpoch, IdCnt}, Peer, SentMessageMap) ->
case maps:find(Peer, SentMessageMap) of
error ->
true;
{ok, {Epoch, Cnt}} ->
case IdEpoch > Epoch of
true ->
true;
false when IdEpoch == Epoch ->
IdCnt >= Cnt;
false ->
false
end
end;
is_addable(PeerEpoch, Peer, SentMessageMap) ->
case maps:find(Peer, SentMessageMap) of
error ->
true;
{ok, {Epoch, _Cnt}} ->
PeerEpoch >= Epoch
end.
%% @private
promote_peer(undefined, State) ->
State;
promote_peer(Peer, #state{} = State) ->
Myself = State#state.node_spec,
Tag = State#state.tag,
RecvMessageMap0 = State#state.recv_message_map,
?LOG_DEBUG("Node ~p sends the NEIGHBOR_REQUEST to ~p", [Myself, Peer]),
Exchange = select_peers_for_exchange(State),
%% Trigger connection.
ok = partisan_peer_service_manager:connect(Peer),
LastDisconnectId = get_current_id(Peer, RecvMessageMap0),
do_send_message(
Peer,
{neighbor_request, Myself, high, Tag, LastDisconnectId, Exchange}
),
State.
%% @private
has_reached_limit({active, Active, Reserved}, LimitActiveSize) ->
%% Find the slots that are reserved, but not filled.
Open = maps:fold(
fun(Key, Value, Acc) ->
case Value of
undefined ->
[Key | Acc];
_ ->
Acc
end
end,
[],
Reserved
),
sets:size(Active) + length(Open) >= LimitActiveSize.
%% @private
propagate_partition_injection(Ref, Origin, TTL, Peer) ->
?LOG_DEBUG("Forwarding partition request to: ~p", [Peer]),
do_send_message(Peer, {inject_partition, Ref, Origin, TTL}).
%% @private
propagate_partition_resolution(Reference, Peer) ->
?LOG_DEBUG("Forwarding partition request to: ~p", [Peer]),
do_send_message(Peer, {resolve_partition, Reference}).
%% @private
handle_partition_injection(Reference, _Origin, TTL, #state{} = State) ->
Myself = State#state.node_spec,
Members = members(State#state.active),
Partitions0 = State#state.partitions,
%% If the TTL hasn't expired, re-forward the partition injection
%% request.
case TTL > 0 of
true ->
[
propagate_partition_injection(Reference, Myself, TTL - 1, Peer)
|| Peer <- Members
];
false ->
ok
end,
%% Update partition table marking all immediate neighbors as
%% partitioned.
Partitions0 ++ lists:map(
fun(Peer) ->
{Reference, Peer}
end,
Members
).
%% @private
handle_partition_resolution(Reference, #state{} = State) ->
Members = members(State#state.active),
Partitions0 = State#state.partitions,
%% Remove partitions.
Partitions =
lists:foldl(
fun({Ref, Peer}, Acc) ->
case Reference of
Ref ->
Acc;
_ ->
Acc ++ [{Ref, Peer}]
end
end,
[],
Partitions0
),
%% If the list hasn't changed, then don't further propagate
%% the message.
case Partitions == Partitions0 of
true ->
ok;
false ->
[
propagate_partition_resolution(Reference, Peer)
|| Peer <- Members
]
end,
Partitions.
%% @private
do_tree_forward(Node, Message, Options, TTL) ->
MyNode = partisan:node(),
?LOG_TRACE(
"Attempting to forward message ~p from ~p to ~p.",
[Message, MyNode, Node]
),
%% Preempt with user-supplied outlinks.
OutLinks = case maps:get(out_links, Options, undefined) of
undefined ->
try retrieve_outlinks(MyNode) of
Value ->
Value
catch
_:Reason ->
?LOG_INFO(#{
description => "Outlinks retrieval failed",
reason => Reason
}),
[]
end;
OL ->
OL -- [MyNode]
end,
%% Send messages, but don't attempt to forward again if we aren't
%% connected.
_ = lists:foreach(
fun(Relay) ->
?LOG_TRACE(
"Forwarding relay message ~p to node ~p "
"for node ~p from node ~p",
[Message, Relay, Node, MyNode]
),
RelayMessage = {relay_message, Node, Message, TTL - 1},
do_send_message(
Relay,
RelayMessage,
maps:without([transitive], Options)
)
end,
OutLinks
),
ok.
%% @private
retrieve_outlinks(Root) ->
?LOG_TRACE(#{description => "About to retrieve outlinks..."}),
OutLinks =
try
{EagerPeers, _LazyPeers} =
partisan_plumtree_broadcast:debug_get_peers(Root, Root, 1000),
ordsets:to_list(EagerPeers) -- [Root]
catch
_:Reason ->
?LOG_INFO(#{
description => "Request to get outlinks failed",
reason => Reason
}),
[]
end,
?LOG_TRACE("Finished getting outlinks: ~p", [OutLinks]),
OutLinks.
%% =============================================================================
%% PRIVATE CONFIG
%% =============================================================================
%% @private
config_get(Key, #state{config = C}) ->
maps:get(Key, C).
%% =============================================================================
%% PRIVATE VIEW MAINTENANCE SCHEDULING
%% =============================================================================
%% @private
schedule_tree_refresh(_State) ->
case partisan_config:get(broadcast, false) of
true ->
Time = partisan_config:get(tree_refresh, 1000),
erlang:send_after(Time, ?MODULE, tree_refresh);
false ->
ok
end.
%% @private
schedule_passive_view_maintenance(State) ->
Time = config_get(shuffle_interval, State),
erlang:send_after(Time, ?MODULE, passive_view_maintenance).
%% @private
schedule_random_promotion(#state{config = #{random_promotion := true} = C}) ->
Time = maps:get(random_promotion_interval, C),
erlang:send_after(Time, ?MODULE, random_promotion);
schedule_random_promotion(_) ->
ok.
%% =============================================================================
%% %% PRIVATE VIEW MAINTENANCE: X-BOT OPTIMIZATION
%% =============================================================================
%% @private
schedule_xbot_execution(#state{config = #{xbot_enabled := true} = C}) ->
Time = maps:get(xbot_interval, C),
erlang:send_after(Time, ?MODULE, xbot_execution);
schedule_xbot_execution(_) ->
ok.
%% -----------------------------------------------------------------------------
%% @private
%% @doc Send optimization messages when apply
%% @end
%% -----------------------------------------------------------------------------
send_optimization_messages(_, [], _) ->
ok;
send_optimization_messages(Active, L, State) ->
% check each first candidate against every node in the active view
_ = [process_candidate(Active, X, State) || X <- L],
ok.
%% -----------------------------------------------------------------------------
%% @private
%% @doc check if a candidate is valid and send message to try optimization
%% we only send an optimization messages for one node in active view, once we
%% have sent it we stop searching possibilities
%% @end
%% -----------------------------------------------------------------------------
process_candidate([], _, _) ->
ok;
process_candidate([H|T], Candidate, #state{} = State) ->
#{name := MyName} = Myself = State#state.node_spec,
#{name := CandidateName} = Candidate,
case is_better(?HYPARVIEW_XBOT_ORACLE, Candidate, H) of
true ->
ok = partisan_peer_service_manager:connect(Candidate),
%% if candidate is better that first node in active view,
%% send optimization message
Msg = {optimization, undefined, H, Myself, Candidate, undefined},
_ = do_send_message(Candidate, Msg),
?LOG_DEBUG(
"XBOT: Optimization message sent to Node ~p from ~p",
[CandidateName, MyName]
);
false ->
process_candidate(T, Candidate, State)
end.
%% -----------------------------------------------------------------------------
%% @private
%% @doc Determine if New node is better than Old node based on ping (latency)
%% @end
%% -----------------------------------------------------------------------------
is_better(latency, #{name := NewNodeName}, #{name := OldNodeName}) ->
%% TODO This only works for disterl!
is_better_node_by_latency(
timer:tc(net_adm, ping, [NewNodeName]),
timer:tc(net_adm, ping, [OldNodeName])
);
is_better(_, _, _) ->
true.
%% @private
is_better_node_by_latency({_, pang}, {_, _}) ->
%% if we do not get ping response from new node
false;
is_better_node_by_latency({_, pong}, {_, pang}) ->
%% if we cannot get response from old node but we got response from new (this should never happen, in general)
true;
is_better_node_by_latency({NewTime, pong}, {OldTime, pong}) ->
?LOG_DEBUG("XBOT: Checking is better - OldTime ~p - NewTime ~p", [OldTime, NewTime]),
%% otherwise check lower ping response
(OldTime-NewTime) > 0.
%% @private
select_disconnect_node([H | T]) ->
select_worst_in_active_view(T, H).
%% @private
select_worst_in_active_view([], Worst) ->
Worst;
select_worst_in_active_view([H | T], Worst) ->
Check = is_better(?HYPARVIEW_XBOT_ORACLE, H, Worst),
if Check -> select_worst_in_active_view(T, Worst);
true -> select_worst_in_active_view(T, H)
end.
%% -----------------------------------------------------------------------------
%% @private
%% @doc Send a disconnect message to a peer
%% @end
%% -----------------------------------------------------------------------------
do_disconnect(Peer, #state{active=Active0}=State0) ->
case sets:is_element(Peer, Active0) of
true ->
%% If a member of the active view, remove it.
Active = sets:del_element(Peer, Active0),
State = add_to_passive_view(Peer, State0#state{active=Active}),
ok = disconnect(Peer),
{noreply, State};
false ->
{noreply, State0}
end.