%% -----------------------------------------------------------------------------
%%
%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -----------------------------------------------------------------------------
%% -----------------------------------------------------------------------------
%% @doc This module implements the
%% <a href="https://www.dpss.inesc-id.pt/~ler/reports/srds07.pdf">
%% Plumtree Protocol
%% </a>
%% which is the component that materialises Partisan's gossip scheme.
%%
%% The component is implemented as as a {@link gen_server} that has two
%% functions:
%% <dl>
%% <dt><b>Tree construction</b></dt>
%% <dd>This component is in charge of selecting which links of the random
%% overlay network will be used to forward the message payload using an eager
%% push strategy. It implements a tree construction mechanisms that is as
%% simple as possible, with minimal overhead in terms of control messages.</dd>
%% <dt><b>Tree repair</b></dt>
%% <dd>This component is in charge of repairing the tree when failures occur.
%% The process ensures that, despite failures, all nodes remain covered by the
%% spanning tree. Therefore, it should be able to detect and heal partitions of
%% the tree. The overhead imposed by this operation should also be as low as
%% possible.</dd>
%% </dl>
%%
%% == Overview ==
%% The protocol operates as any pure gossip protocol, in the sense that, in
%% order to broadcast a message, each node gossips with `f' nodes provided by a
%% peer sampling service (where `f' is the protocol fanout). However, each node
%% uses a combination of eager push and lazy push gossip.
%%
%% <em>Eager push</em> is used just for a subset of the `f' nodes, while
%% <em>lazy push</em> is used for the remaining nodes. The links used for eager
%% push are selected in such a way that their closure effectively builds a
%% broadcast tree embedded in the random overlay network. Lazy push links are
%% used to ensure gossip reliability when nodes fail and also to quickly heal
%% the broadcast tree.
%% Furthermore, the set of (random) peers is not changed at each gossip round.
%% Instead, the same peers are used until failures are detected.
%%
%% The protocol uses a Partisan Channel (See @{link broadcast_channel}), i.e.
%% based on TCP, to support the message exchange, as they
%% offer extra reliability and an additional source of failure detection.
%%
%% Plumtree depends on an overlay network which is maintained by a peer
%% sampling service ({@link partisan_peer_service_manager}) and relies on the
%% peer sampling services/s exhibiting a property: <em>Symmetric (partial)
%% views</em>. If the links that form the spanning tree are symmetric,
%% then the tree may be shared by multiple sources. Symmetric partial views
%% render the task of creating bi-directional trees easier, and reduce the
%% amount of peers that each node has to maintain.
%%
%%
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_plumtree_broadcast).
-behaviour(gen_server).
-include("partisan.hrl").
-include("partisan_logger.hrl").
-define(EVENT_MANAGER, partisan_peer_service_events).
-define(SERVER, ?MODULE).
-type message_id() :: any().
-type message_round() :: non_neg_integer().
%% Lazy messages that have not been acked. Messages are added to
%% this set when a node is sent a lazy message (or when it should be
%% sent one sometime in the future). Messages are removed when the lazy
%% pushes are acknowledged via graft or ignores. Entries are keyed by their
%% destination
%% These are stored in the ?PLUMTREE_OUTSTANDING ets table under using nodename
%% as key.
%% PLUMTREE_OUTSTANDING is created and owned by partisan_sup
-type outstanding() :: {message_id(), module(), message_round(), node()}.
-type exchange() :: {module(), node(), reference(), pid()}.
-type exchanges() :: [exchange()].
-type selector() :: all
| {peer, node()}
| {mod, module()}
| reference()
| pid().
-type opts() :: opts_map() | opts_list().
-type opts_map() :: #{
lazy_tick_period => non_neg_integer(),
exchange_tick_period => non_neg_integer()
}.
-type opts_list() :: [
{lazy_tick_period, non_neg_integer()}
| {exchange_tick_period, non_neg_integer()}
].
-record(state, {
%% Initially trees rooted at each node are the same.
%% Portions of that tree belonging to this node are
%% shared in this set.
common_eagers :: nodeset(),
%% Initially trees rooted at each node share the same lazy links.
%% Typically this set will contain a single element. However, it may
%% contain more in large clusters and may be empty for clusters with
%% less than three nodes.
common_lazys :: nodeset(),
%% A mapping of sender node (root of each broadcast tree)
%% to this node's portion of the tree. Elements are
%% added to this structure as messages rooted at a node
%% propagate to this node. Nodes that are never the
%% root of a message will never have a key added to
%% `eager_sets'
eager_sets :: #{node() := nodeset()},
%% A Mapping of sender node (root of each spanning tree)
%% to this node's set of lazy peers. Elements are added
%% to this structure as messages rooted at a node
%% propagate to this node. Nodes that are never the root
%% of a message will never have a key added to `lazy_sets'
lazy_sets :: #{node() := nodeset()},
%% Set of registered modules that may handle messages that
%% have been broadcast
mods :: [module()],
%% List of outstanding exchanges
exchanges :: exchanges(),
%% Set of all known members. Used to determine
%% which members have joined and left during a membership update
all_members :: nodeset(),
%% Lazy tick period in milliseconds. On every tick all outstanding
%% lazy pushes are sent out
lazy_tick_period :: non_neg_integer(),
%% Exchange tick period in milliseconds that may or may not occur
exchange_tick_period :: non_neg_integer()
}).
-type state() :: #state{}.
-type nodeset() :: ordsets:ordset(node()).
%% API
-export([broadcast/2]).
-export([broadcast_channel/1]).
-export([broadcast_members/0]).
-export([broadcast_members/1]).
-export([cancel_exchanges/1]).
-export([exchanges/0]).
-export([exchanges/1]).
-export([exchanges/2]).
-export([start_link/0]).
-export([start_link/5]).
-export([update/1]).
%% Debug API
-export([get_peers/1]).
-export([get_eager_peers/1]).
-export([get_lazy_peers/1]).
-export([debug_get_peers/2]).
-export([debug_get_peers/3]).
-export([debug_get_tree/2]).
-export([debug_get_tree/3]).
%% gen_server callbacks
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
-eqwalizer({nowarn_function, start_link/5}).
%% =============================================================================
%% API
%% =============================================================================
%% -----------------------------------------------------------------------------
%% @doc Starts the broadcast server on this node.
%%
%% The initial membership list is fetched from the configured @{link
%% partisan_peer_service}.
%%
%% If the node is a singleton then the initial eager and lazy sets are empty.
%% If there are two nodes, each will be in the others
%% eager set and the lazy sets will be empty. When number of members is less
%% than 5, each node will initially have one other node in its eager set and
%% lazy set. If there are more than five nodes each node will have at most two
%% other nodes in its eager set and one in its lazy set, initially.
%%
%% In addition, after the broadcast server is started, all callbacks defined in
%% the configuration option `broadcast_mods' are registered.
%% By default the list of callbacks includes the module
%% {@link partisan_plumtree_backend} which is used by to generate membership
%% updates as the ring changes.
%%
%% @TODO we should spawn 1 broadcast server per channel and or channel
%% partition
%% @end
%% -----------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
LazyTickPeriod = partisan_config:get(
lazy_tick_period, ?DEFAULT_LAZY_TICK_PERIOD
),
ExchangeTickPeriod = partisan_config:get(
exchange_tick_period, ?DEFAULT_EXCHANGE_TICK_PERIOD
),
Opts = #{
lazy_tick_period => LazyTickPeriod,
exchange_tick_period => ExchangeTickPeriod
},
{ok, Members} = partisan_peer_service:members(),
?LOG_DEBUG("Peer sampling service members: ~p", [Members]),
%% The peer service has already sampled the members, we start off
%% with pure gossip (ie. all members are in the eager push list and lazy
%% list is empty)
InitEagers = Members,
InitLazys = [],
?LOG_DEBUG("Init peers, eager: ~p, lazy: ~p", [InitEagers, InitLazys]),
Mods = partisan_config:get(broadcast_mods, []),
start_link(Members, InitEagers, InitLazys, Mods, Opts).
%% -----------------------------------------------------------------------------
%% @doc Starts the broadcast server on this node.
%% `Members' must be a list of all members known to this node when starting
%% the broadcast server.
%% `Eagers' are the initial peers of this node for all broadcast trees.
%% `Lazys' is a list of random peers not in `Eagers' that will be used
%% as the initial lazy peer shared by all trees for this node. If the number
%% of nodes in the cluster is less than 3, `Lazys' should be an empty list.
%% `Eagers' and `Lazys' must also be subsets of `Members'. `Mods' is
%% a list of modules that may be handlers for broadcasted messages. All modules
%% in `Mods' should implement the `partisan_plumtree_broadcast_handler'
%% behaviour.
%%
%% `Opts' is a proplist or map with the following possible options:
%% <ul>
%% <li> `lazy_tick_period :: non_neg_integer()' - Flush all outstanding lazy pushes period (in milliseconds)</li>
%% <li> `exchange_tick_period :: non_neg_integer()' - Possibly perform an exchange period (in milliseconds)</li>
%% </ul>
%%
%% NOTE: When starting the server using start_link/2 no automatic membership
%% update from ring_events is registered. Use {@link start_link/0}.
%% @end
%% -----------------------------------------------------------------------------
-spec start_link(
Members :: [node()],
Eagers :: [node()],
Lazys :: [node()],
Mods :: [module()],
Opts :: opts()) ->
{ok, pid()} | ignore | {error, term()}.
start_link(Members, Eagers, Lazys, Mods, Opts)
when is_list(Members), is_list(Eagers), is_list(Lazys), is_list(Mods),
is_list(Opts) ->
start_link(Members, Eagers, Lazys, Mods, maps:from_list(Opts));
start_link(Members, Eagers, Lazys, Mods, Opts)
when is_list(Members), is_list(Eagers), is_list(Lazys), is_list(Mods),
is_map(Opts) ->
Args = [Members, Eagers, Lazys, Mods, Opts],
StartOpts = [
{spawn_opt, ?PARALLEL_SIGNAL_OPTIMISATION([])}
],
gen_server:start_link({local, ?SERVER}, ?MODULE, Args, StartOpts).
%% -----------------------------------------------------------------------------
%% @doc Broadcasts a message originating from this node.
%% The message will be delivered to each node at least once. The `Mod' passed
%% must be loaded on all members of the cluster and implement the
%% `partisan_plumtree_broadcast_handler' behaviour which is responsible for
%% handling the message on remote nodes as well as providing some other
%% information both locally and on other nodes.
%%
%% The broadcast will be sent over the channel defined by
%% {@link broadcast_channel/1}.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast(any(), module()) -> ok.
broadcast(Broadcast, Mod) ->
{MessageId, Payload} = Mod:broadcast_data(Broadcast),
gen_server:cast(?SERVER, {broadcast, MessageId, Payload, Mod}).
%% -----------------------------------------------------------------------------
%% @doc Returns the channel to be used when sending broadcasting a message
%% on behalf of module `Mod'.
%%
%% The channel defined by the callback `Mod:broadcast_channel()' or default
%% channel i.e. {@link partisan:default_channel/0} if the callback is not
%% implemented.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_channel(Mod :: module()) -> partisan:channel().
broadcast_channel(Mod) ->
case erlang:function_exported(Mod, broadcast_channel, 0) of
true ->
Mod:broadcast_channel();
false ->
?DEFAULT_CHANNEL
end.
%% -----------------------------------------------------------------------------
%% @doc Notifies broadcast server of membership update
%% This is the function is added to partisan_peer_service_events using
%% partisan_peer_service:add_sup_callback(fun ?MODULE:update/1),
%% @end
%% -----------------------------------------------------------------------------
-spec update([node()]) -> ok.
update(LocalState0) ->
LocalState = partisan_peer_service:decode(LocalState0),
gen_server:cast(?SERVER, {update, LocalState}).
%% -----------------------------------------------------------------------------
%% @doc Returns the broadcast servers view of full cluster membership.
%% Wait indefinitely for a response is returned from the process.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_members() -> nodeset().
broadcast_members() ->
broadcast_members(infinity).
%% -----------------------------------------------------------------------------
%% @doc Returns the broadcast servers view of full cluster membership.
%% Waits `Timeout' ms for a response from the server.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_members(infinity | pos_integer()) -> nodeset().
broadcast_members(Timeout) ->
gen_server:call(?SERVER, broadcast_members, Timeout).
%% -----------------------------------------------------------------------------
%% @doc return a list of exchanges, started by broadcast on this node, that are
%% running.
%% @end
%% -----------------------------------------------------------------------------
-spec exchanges() -> {ok, exchanges()}.
exchanges() ->
gen_server:call(?SERVER, exchanges, infinity).
%% -----------------------------------------------------------------------------
%% @doc Returns a list of running exchanges, started on `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec exchanges(node()) ->
{ok, exchanges()}
| {error, {badrpc, Reason :: any()}}.
exchanges(Node) ->
exchanges(Node, infinity).
%% -----------------------------------------------------------------------------
%% @doc Returns a list of running exchanges, started on `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec exchanges(node(), timeout()) ->
{ok, exchanges()}
| {error, {badrpc, Reason :: any()}}.
exchanges(Node, Timeout) ->
%% This will not work because gen_server uses disterl
%% TODO reconsider turning this server into a partisan_gen_serv
%% gen_server:call({?SERVER, Node}, exchanges, infinity).
case partisan_rpc:call(Node, ?SERVER, exchanges, [], Timeout) of
{ok, _} = OK ->
%% eqwalizer:ignore
OK;
{badrpc, _} = Reason ->
{error, Reason}
end.
%% -----------------------------------------------------------------------------
%% @doc Cancel exchanges started by this node.
%% @end
%% -----------------------------------------------------------------------------
-spec cancel_exchanges(selector()) -> exchanges().
cancel_exchanges(Selector) ->
gen_server:call(?SERVER, {cancel_exchanges, Selector}, infinity).
%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec get_peers(Root :: node()) -> list().
get_peers(Root) ->
gen_server:call(?SERVER, {get_peers, Root}).
%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec get_eager_peers(Root :: node()) -> list().
get_eager_peers(Root) ->
gen_server:call(?SERVER, {get_eager_peers, Root}).
%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec get_lazy_peers(Root :: node()) -> list().
get_lazy_peers(Root) ->
gen_server:call(?SERVER, {get_lazy_peers, Root}).
%% =============================================================================
%% GEN_SERVER CALLBACKS
%% =============================================================================
-spec init(list()) -> {ok, state()}.
init([Members, InitEagers0, InitLazys0, Mods, Opts]) ->
%% We subscribe to the membership change events
partisan_peer_service:add_sup_callback(fun ?MODULE:update/1),
LazyTickPeriod = maps:get(lazy_tick_period, Opts),
ExchangeTickPeriod = maps:get(exchange_tick_period, Opts),
schedule_lazy_tick(LazyTickPeriod),
schedule_exchange_tick(ExchangeTickPeriod),
State1 = #state{
all_members = ordsets:new(),
common_lazys = ordsets:new(),
common_eagers = ordsets:new(),
eager_sets = maps:new(),
lazy_sets = maps:new(),
mods = lists:usort(Mods),
exchanges = [],
lazy_tick_period = LazyTickPeriod,
exchange_tick_period = ExchangeTickPeriod
},
AllMembers = ordsets:from_list(Members),
InitEagers = ordsets:from_list(InitEagers0),
InitLazys = ordsets:from_list(InitLazys0),
%% We finish initialising the state
State2 = reset_peers(AllMembers, InitEagers, InitLazys, State1),
{ok, State2}.
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
handle_call({get_peers, Root}, _From, State) ->
EagerPeers = all_peers(
Root, State#state.eager_sets, State#state.common_eagers
),
LazyPeers = all_peers(
Root, State#state.lazy_sets, State#state.common_lazys
),
{reply, {EagerPeers, LazyPeers}, State};
handle_call({get_eager_peers, Root}, _From, State) ->
EagerPeers = all_peers(
Root, State#state.eager_sets, State#state.common_eagers
),
{reply, EagerPeers, State};
handle_call({get_lazy_peers, Root}, _From, State) ->
LazyPeers = all_peers(
Root, State#state.lazy_sets, State#state.common_lazys
),
{reply, LazyPeers, State};
handle_call(broadcast_members, _From, State=#state{all_members=AllMembers}) ->
{reply, AllMembers, State};
handle_call(exchanges, _From, State=#state{exchanges=Exchanges}) ->
{reply, Exchanges, State};
handle_call({cancel_exchanges, WhichExchanges}, _From, State) ->
Cancelled = cancel_exchanges(WhichExchanges, State#state.exchanges),
{reply, Cancelled, State}.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast({broadcast, MessageId, Message, Mod}, State) ->
?LOG_DEBUG("received {broadcast, ~p, Msg, ~p}", [MessageId, Mod]),
State1 = eager_push(MessageId, Message, Mod, State),
State2 = schedule_lazy_push(MessageId, Mod, State1),
{noreply, State2};
handle_cast({broadcast, MessageId, Message, Mod, Round, Root, From}, State) ->
?LOG_DEBUG(
"received {broadcast, ~p, Msg, ~p, ~p, ~p, ~p}",
[MessageId, Mod, Round, Root, From]
),
Valid = Mod:merge(MessageId, Message),
State1 = handle_broadcast(Valid, MessageId, Message, Mod, Round, Root, From, State),
{noreply, State1};
handle_cast({prune, Root, From}, State) ->
?LOG_DEBUG("received ~p", [{prune, Root, From}]),
?LOG_DEBUG("moving peer ~p from eager to lazy", [From]),
State1 = add_lazy(From, Root, State),
{noreply, State1};
handle_cast({i_have, MessageId, Mod, Round, Root, From}, State) ->
?LOG_DEBUG("received ~p", [{i_have, MessageId, Mod, Round, Root, From}]),
Stale = Mod:is_stale(MessageId),
State1 = handle_ihave(Stale, MessageId, Mod, Round, Root, From, State),
{noreply, State1};
handle_cast({ignored_i_have, MessageId, Mod, Round, Root, From}, State) ->
?LOG_DEBUG(#{
description => "received ~p",
message => {ignored_i_have, MessageId, Mod, Round, Root, From}
}),
ok = ack_outstanding(MessageId, Mod, Round, Root, From),
{noreply, State};
handle_cast({graft, MessageId, Mod, Round, Root, From}, State) ->
?LOG_DEBUG("received ~p", [{graft, MessageId, Mod, Round, Root, From}]),
Result = Mod:graft(MessageId),
?LOG_DEBUG("graft(~p): ~p", [MessageId, Result]),
State1 = handle_graft(Result, MessageId, Mod, Round, Root, From, State),
{noreply, State1};
handle_cast({update, MemberList}, #state{} = State) when is_list(MemberList) ->
?LOG_DEBUG("received ~p", [{update, MemberList}]),
#state{
all_members = AllMembers,
common_eagers = EagerPeers0,
common_lazys = LazyPeers
} = State,
Members = ordsets:from_list(MemberList),
New = ordsets:subtract(Members, AllMembers),
Removed = ordsets:subtract(AllMembers, Members),
?LOG_DEBUG("new members: ~p", [ordsets:to_list(New)]),
?LOG_DEBUG("removed members: ~p", [ordsets:to_list(Removed)]),
State1 = case ordsets:size(New) > 0 of
false ->
State;
true ->
%% as per the paper (page 9):
%% "When a new member is detected, it is simply added to the set
%% of eagerPushPeers"
EagerPeers = ordsets:union(EagerPeers0, New),
?LOG_DEBUG(
"new peers, eager: ~p, lazy: ~p", [EagerPeers, LazyPeers]
),
%% eqwalizer:ignore Members
reset_peers(Members, EagerPeers, LazyPeers, State)
end,
State2 = neighbors_down(Removed, State1),
{noreply, State2}.
-spec handle_info(
'exchange_tick' | 'lazy_tick' | {'DOWN', _, 'process', _, _}, state()) ->
{noreply, state()}.
handle_info(lazy_tick, #state{lazy_tick_period = Period} = State) ->
ok = send_lazy(),
schedule_lazy_tick(Period),
{noreply, State};
handle_info(exchange_tick, #state{exchange_tick_period = Period} = State) ->
State1 = maybe_exchange(State),
schedule_exchange_tick(Period),
{noreply, State1};
handle_info(
{'DOWN', Ref, process, _Pid, _Reason}, State=#state{exchanges=Exchanges}) ->
%% An exchange has terminated
Exchanges1 = lists:keydelete(Ref, 3, Exchanges),
{noreply, State#state{exchanges=Exchanges1}};
handle_info({gen_event_EXIT, {?EVENT_MANAGER, _}, Reason}, State)
when Reason == normal; Reason == shutdown ->
{noreply, State};
handle_info({gen_event_EXIT, {?EVENT_MANAGER, _}, {swapped, _, _}}, State) ->
{noreply, State};
handle_info({gen_event_EXIT, {?EVENT_MANAGER, _}, Reason}, State) ->
?LOG_INFO(#{
description => "Event handler terminated. Adding new handler.",
reason => Reason,
manager => ?EVENT_MANAGER
}),
partisan_peer_service:add_sup_callback(fun ?MODULE:update/1),
{noreply, State};
handle_info(Event, State) ->
?LOG_INFO(#{description => "Unhandled info event", event => Event}),
{noreply, State}.
-spec terminate(term(), state()) -> term().
terminate(_Reason, _State) ->
ok.
-spec code_change(term() | {down, term()}, state(), term()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% =============================================================================
%% DEBUG API
%% =============================================================================
%% @doc return the peers for `Node' for the tree rooted at `Root'.
%% Wait indefinitely for a response is returned from the process
-spec debug_get_peers(node(), node()) ->
{nodeset(), nodeset()} | no_return().
debug_get_peers(Node, Root) ->
debug_get_peers(Node, Root, infinity).
%% @doc return the peers for `Node' for the tree rooted at `Root'.
%% Waits `Timeout' ms for a response from the server
-spec debug_get_peers(node(), node(), infinity | pos_integer()) ->
{nodeset(), nodeset()} | no_return().
debug_get_peers(Node, Root, Timeout) ->
%% This will not work because gen_server uses disterl
%% gen_server:call({?SERVER, Node}, {get_peers, Root}, Timeout).
%% TODO reconsider turning this server into a partisan_gen_server
case partisan_rpc:call(Node, ?MODULE, get_peers, [Root], Timeout) of
{badrpc, Reason} ->
error(Reason);
{_, _} = Result ->
%% eqwalizer:ignore
Result
end.
%% -----------------------------------------------------------------------------
%% @doc return peers for all `Nodes' for tree rooted at `Root'
%% Wait indefinitely for a response is returned from the process
%% @end
%% -----------------------------------------------------------------------------
-spec debug_get_tree(node(), [node()]) ->
[{node(), {nodeset(), nodeset()} | down}].
debug_get_tree(Root, Nodes) ->
debug_get_tree(Root, Nodes, infinity).
%% -----------------------------------------------------------------------------
%% @doc return peers for all `Nodes' for tree rooted at `Root'
%% Wait `Timeout' for a response is returned from the process
%% @end
%% -----------------------------------------------------------------------------
-spec debug_get_tree(node(), [node()], timeout()) ->
[{node(), {nodeset(), nodeset()} | down}].
debug_get_tree(Root, Nodes, Timeout) ->
[
begin
try
{Node, debug_get_peers(Node, Root, Timeout)}
catch
_:Reason ->
?LOG_INFO(#{
description =>
"Call to get remote root tree failed.",
peer => Node,
root => Root,
reason => Reason
}),
{Node, down}
end
end
|| Node <- Nodes
].
%% =============================================================================
%% PRIVATE
%% =============================================================================
%% @private
handle_broadcast(
false, _MessageId, _Message, Mod, _Round, Root, From, State) ->
%% stale msg
%% remove sender from eager and set as lazy
?LOG_DEBUG("moving peer ~p from eager to lazy", [From]),
State1 = add_lazy(From, Root, State),
ok = send({prune, Root, partisan:node()}, Mod, From),
State1;
handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State) ->
%% valid msg
%% remove sender from lazy and set as eager
State1 = add_eager(From, Root, State),
State2 = eager_push(MessageId, Message, Mod, Round + 1, Root, From, State1),
schedule_lazy_push(MessageId, Mod, Round + 1, Root, From, State2).
%% @private
handle_ihave(true, MessageId, Mod, Round, Root, From, State) ->
%% stale i_have
ok = send(
{ignored_i_have, MessageId, Mod, Round, Root, partisan:node()},
Mod,
From
),
State;
handle_ihave(false, MessageId, Mod, Round, Root, From, State) ->
%% valid i_have
%% TODO: don't graft immediately
ok = send(
{graft, MessageId, Mod, Round, Root, partisan:node()}, Mod, From
),
add_eager(From, Root, State).
%% @private
handle_graft(stale, MessageId, Mod, Round, Root, From, State) ->
%% There has been a subsequent broadcast that is causally newer than this
%% message according to Mod. We ack the outstanding message since the
%% outstanding entry for the newer message exists.
ok = ack_outstanding(MessageId, Mod, Round, Root, From),
State;
handle_graft({ok, Message}, MessageId, Mod, Round, Root, From, State) ->
%% We don't ack outstanding here because the broadcast may fail to be
%% delivered.
%% Instead we will allow the i_have to be sent once more and let the
%% subsequent ignore serve as the ack.
State1 = add_eager(From, Root, State),
ok = send(
{broadcast, MessageId, Message, Mod, Round, Root, partisan:node()},
Mod,
From
),
State1;
handle_graft({error, Reason}, _MessageId, Mod, _Round, _Root, _From, State) ->
?LOG_ERROR(#{
description => "Unable to graft message",
callback_mod => Mod,
reason => Reason
}),
State.
%% @private
neighbors_down(Removed, #state{} = State) ->
#state{
all_members = AllMembers,
common_eagers = CommonEagers,
eager_sets = EagerSets,
common_lazys = CommonLazys,
lazy_sets = LazySets
} = State,
NewAllMembers = ordsets:subtract(AllMembers, Removed),
NewCommonEagers = ordsets:subtract(CommonEagers, Removed),
NewCommonLazys = ordsets:subtract(CommonLazys, Removed),
%% TODO: once we have delayed grafting need to remove timers
NewEagerSets = maps:from_list([
{Root, ordsets:subtract(Existing, Removed)}
|| {Root, Existing} <- maps:to_list(EagerSets)
]),
NewLazySets = maps:from_list([
{Root, ordsets:subtract(Existing, Removed)}
|| {Root, Existing} <- maps:to_list(LazySets)
]),
%% delete outstanding messages to removed peers
ok = ordsets:fold(
fun(Peer, Acc) ->
%% PLUMTREE_OUTSTANDING is a duplicate bag, so delete will delete
%% all messages for the removed Peer
_ = ets:delete(?PLUMTREE_OUTSTANDING, Peer),
Acc
end,
ok,
Removed
),
State#state{
all_members = NewAllMembers,
common_eagers = NewCommonEagers,
common_lazys = NewCommonLazys,
eager_sets = NewEagerSets,
lazy_sets = NewLazySets
}.
%% @private
eager_push(MessageId, Message, Mod, State) ->
eager_push(
MessageId, Message, Mod, 0, partisan:node(), partisan:node(), State
).
%% @private
eager_push(MessageId, Message, Mod, Round, Root, From, State) ->
Peers = eager_peers(Root, From, State),
?LOG_DEBUG("eager push to peers: ~p", [Peers]),
ok = send(
{broadcast, MessageId, Message, Mod, Round, Root, partisan:node()},
Mod,
Peers
),
State.
%% @private
schedule_lazy_push(MessageId, Mod, State) ->
schedule_lazy_push(
MessageId, Mod, 0, partisan:node(), partisan:node(), State
).
%% @private
schedule_lazy_push(MessageId, Mod, Round, Root, From, State) ->
Peers = lazy_peers(Root, From, State),
?LOG_DEBUG(
"scheduling lazy push to peers ~p: ~p",
[Peers, {MessageId, Mod, Round, Root, From}]
),
ok = add_all_outstanding(MessageId, Mod, Round, Root, Peers),
State.
%% @private
send_lazy() ->
_ = ets:foldl(
fun
({Peer, Message}, {Peer, true} = Acc) ->
ok = send_lazy(Message, Peer),
Acc;
({Peer, _}, {Peer, false} = Acc) ->
%% We skip sending
Acc;
({Peer, Message}, _) ->
case partisan:is_connected(Peer) of
true ->
%% This will send even when Mod:broadcast_channel is
%% not connected as it will use the default channel.
%% TODO make this option configurable so that we can
%% ask Partisan to skip in case broadcast_channel is
%% not connected.
ok = send_lazy(Message, Peer),
{Peer, true};
false ->
%% We skip sending
{Peer, false}
end
end,
undefined,
?PLUMTREE_OUTSTANDING
),
ok.
%% @private
-spec send_lazy(outstanding(), node()) -> ok.
send_lazy({MessageId, Mod, Round, Root}, Peer) ->
?LOG_DEBUG(#{
description => "sending lazy push ~p",
message => {i_have, MessageId, Mod, Round, Root, partisan:node()}
}),
send({i_have, MessageId, Mod, Round, Root, partisan:node()}, Mod, Peer).
%% @private
maybe_exchange(State) ->
Root = random_root(State),
Peer = random_peer(Root, State),
maybe_exchange(Peer, State).
maybe_exchange(undefined, State) ->
State;
maybe_exchange(_, #state{mods = []} = State) ->
State;
maybe_exchange(Peer, State) ->
%% limit the number of exchanges this node can start concurrently.
%% the exchange must (currently?) implement any "inbound" concurrency limits
Limit = partisan_config:get(broadcast_start_exchange_limit),
case length(State#state.exchanges) >= Limit of
true ->
State;
false ->
maybe_exchange(Peer, State, State#state.mods)
end.
%% @private
maybe_exchange(_Peer, State, []) ->
State;
maybe_exchange(Peer, #state{mods = [_|Mods]} = State, [H|T]) ->
%% We place the current Mod at the end of the list i.e. results in a
%% roundrobin algorithm for when limit =/= length(Mods)
NewState = State#state{mods = Mods ++ [H]},
case lists:keyfind(H, 1, State#state.exchanges) of
{H, _, _, _} ->
%% We skip current Mod as there is already an exchange for it
?LOG_DEBUG(
"Ignoring exchange request for ~p with ~p, "
"there is already another exchange running "
"for the same handler.",
[H, Peer]
),
maybe_exchange(Peer, NewState, T);
false ->
maybe_exchange(Peer, exchange(Peer, State, H), T)
end.
%% @private
exchange(Peer, #state{exchanges = Exchanges} = State, Mod) ->
case Mod:exchange(Peer) of
ignore ->
?LOG_DEBUG(
"~p ignored exchange request with ~p.", [Mod, Peer]
),
State;
{ok, Pid} ->
?LOG_DEBUG(
"Started ~p exchange with ~p (~p).", [Mod, Peer, Pid]
),
Ref = monitor(process, Pid),
State#state{exchanges = [{Mod, Peer, Ref, Pid} | Exchanges]};
{error, _Reason} ->
State
end.
%% @private
cancel_exchanges(all, Exchanges) ->
kill_exchanges(Exchanges);
cancel_exchanges(WhichProc, Exchanges)
when is_reference(WhichProc) orelse is_pid(WhichProc) ->
KeyPos = case is_reference(WhichProc) of
true -> 3;
false -> 4
end,
case lists:keyfind(WhichProc, KeyPos, Exchanges) of
false ->
[];
Exchange ->
kill_exchange(Exchange),
[Exchange]
end;
cancel_exchanges(Which, Exchanges) ->
Filter = exchange_filter(Which),
ToCancel = [Ex || Ex <- Exchanges, Filter(Ex)],
kill_exchanges(ToCancel).
%% @private
kill_exchanges(Exchanges) ->
_ = [kill_exchange(Exchange) || Exchange <- Exchanges],
Exchanges.
%% @private
kill_exchange({_, _, _, ExchangePid}) ->
exit(ExchangePid, cancel_exchange).
%% @private
exchange_filter({peer, Peer}) ->
fun({_, ExchangePeer, _, _}) ->
Peer =:= ExchangePeer
end;
exchange_filter({mod, Mod}) ->
fun({ExchangeMod, _, _, _}) ->
Mod =:= ExchangeMod
end.
%% -----------------------------------------------------------------------------
%% @private
%% @doc picks random root uniformly
%% @end
%% -----------------------------------------------------------------------------
random_root(#state{all_members=Members}) ->
random_other_node(Members).
%% -----------------------------------------------------------------------------
%% @doc picks random peer favoring peers not in eager or lazy set and ensuring
%% peer is not this node
%% @end
%% -----------------------------------------------------------------------------
random_peer(Root, State=#state{all_members=All}) ->
Node = partisan:node(),
Mode = partisan_config:get(exchange_selection, optimized),
Other = case Mode of
normal ->
%% Normal; randomly select a peer from the known membership at
%% this node.
ordsets:del_element(Node, All);
optimized ->
%% Optimized; attempt to find a peer that's not in the broadcast
%% tree, to increase probability of selecting a lagging node.
Eagers = all_eager_peers(Root, State),
Lazys = all_lazy_peers(Root, State),
Union = ordsets:union([Eagers, Lazys]),
ordsets:del_element(Node, ordsets:subtract(All, Union))
end,
case ordsets:size(Other) of
0 ->
random_other_node(ordsets:del_element(Node, All));
_ ->
random_other_node(Other)
end.
%% -----------------------------------------------------------------------------
%% @private
%% @doc picks random node from ordset
%% @end
%% -----------------------------------------------------------------------------
random_other_node(OrdSet) ->
Size = ordsets:size(OrdSet),
case Size of
0 -> undefined;
_ ->
lists:nth(rand:uniform(Size), ordsets:to_list(OrdSet))
end.
%% @private
ack_outstanding(MessageId, Mod, Round, Root, From) ->
true = ets:delete_object(
?PLUMTREE_OUTSTANDING, {From, {MessageId, Mod, Round, Root}}
),
ok.
%% @private
add_all_outstanding(MessageId, Mod, Round, Root, Peers) ->
Message = {MessageId, Mod, Round, Root},
Objects = [{Peer, Message} || Peer <- ordsets:to_list(Peers)],
true = ets:insert(?PLUMTREE_OUTSTANDING, Objects),
ok.
%% @private
add_eager(From, Root, State) ->
update_peers(From, Root, fun ordsets:add_element/2, fun ordsets:del_element/2, State).
%% @private
add_lazy(From, Root, State) ->
update_peers(From, Root, fun ordsets:del_element/2, fun ordsets:add_element/2, State).
%% @private
update_peers(From, Root, EagerUpdate, LazyUpdate, State) ->
CurrentEagers = all_eager_peers(Root, State),
CurrentLazys = all_lazy_peers(Root, State),
NewEagers = EagerUpdate(From, CurrentEagers),
NewLazys = LazyUpdate(From, CurrentLazys),
set_peers(Root, NewEagers, NewLazys, State).
%% @private
set_peers(Root, Eagers, Lazys, #state{} = State) ->
#state{eager_sets = EagerSets, lazy_sets = LazySets} = State,
NewEagers = maps:put(Root, Eagers, EagerSets),
NewLazys = maps:put(Root, Lazys, LazySets),
State#state{eager_sets = NewEagers, lazy_sets = NewLazys}.
%% @private
all_eager_peers(Root, State) ->
all_peers(Root, State#state.eager_sets, State#state.common_eagers).
%% @private
all_lazy_peers(Root, State) ->
all_peers(Root, State#state.lazy_sets, State#state.common_lazys).
%% @private
eager_peers(Root, From, #state{} = State) ->
#state{eager_sets = EagerSets, common_eagers = CommonEagers} = State,
all_filtered_peers(Root, From, EagerSets, CommonEagers).
%% @private
lazy_peers(Root, From, #state{} = State) ->
#state{lazy_sets = LazySets, common_lazys = CommonLazys} = State,
all_filtered_peers(Root, From, LazySets, CommonLazys).
%% @private
all_filtered_peers(Root, From, Sets, Common) ->
All = all_peers(Root, Sets, Common),
ordsets:del_element(From, All).
%% @private
all_peers(Root, Sets, Default) ->
case maps:find(Root, Sets) of
{ok, Peers} -> Peers;
error -> Default
end.
%% @private
-spec send(
Msg :: partisan:message(),
Mod :: module(),
Peers :: [node()] | node()) -> ok.
send(Msg, Mod, Peers) when is_list(Peers) ->
_ = [send(Msg, Mod, P) || P <- Peers],
ok;
send(Msg, Mod, Peer) ->
instrument_transmission(Msg, Mod),
Opts = #{channel => broadcast_channel(Mod)},
partisan:cast_message(Peer, ?SERVER, Msg, Opts).
%% @private
schedule_lazy_tick(Period) ->
schedule_tick(lazy_tick, lazy_tick_period, Period).
%% @private
schedule_exchange_tick(Period) ->
schedule_tick(exchange_tick, exchange_tick_period, Period).
%% @private
schedule_tick(Message, Timer, Default) ->
TickMs = partisan_config:get(Timer, Default),
erlang:send_after(TickMs, ?MODULE, Message).
%% @private
-spec reset_peers(nodeset(), nodeset(), nodeset(), state()) -> state().
reset_peers(AllMembers, EagerPeers, LazyPeers, State) ->
MyNode = partisan:node(),
State#state{
common_eagers = ordsets:del_element(MyNode, EagerPeers),
common_lazys = ordsets:del_element(MyNode, LazyPeers),
eager_sets = maps:new(),
lazy_sets = maps:new(),
all_members = AllMembers
}.
%% @private
instrument_transmission(Message, Mod) ->
case partisan_config:get(transmission_logging_mfa, undefined) of
undefined ->
ok;
{Module, Function, Args} ->
ToLog = try
Mod:extract_log_type_and_payload(Message)
catch
_:Error ->
?LOG_INFO(
"Couldn't extract log type and payload. Reason ~p",
[Error]
),
[]
end,
lists:foreach(
fun({Type, Payload}) ->
erlang:apply(Module, Function, Args ++ [Type, Payload])
end,
ToLog
)
end.