%% -------------------------------------------------------------------
%%
%% Copyright (c) 2019 Christopher Meiklejohn. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% -----------------------------------------------------------------------------
%% @doc This module realises the {@link partisan_peer_service_manager}
%% behaviour implementing a peer sampling service with a pluggable overlay
%% topology by delegating the topology definition to a callback module
%% implementing the {@link partisan_peer_service_strategy} behaviour.
%%
%% == Characteristics ==
%% <ul>
%% <li>Uses TCP/IP.</li>
%% <li>All nodes communicate and maintain connections with all other nodes.</li>
%% <li>Nodes periodically send heartbeat messages. The service considers a node
%% "failed" when it misses X heartbeats.</li>
%% <li>Point-to-point messaging with a single network hop.</li>
%% <li>Eventually consistent membership maintained in a CRDT and replicated
%% using gossip.</li>
%% <li>Scalability limited to hundreds of nodes (60-200 nodes).</li>
%% </ul>
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_pluggable_peer_service_manager).
-behaviour(gen_server).
-behaviour(partisan_peer_service_manager).
-include("partisan_logger.hrl").
-include("partisan.hrl").
%% Builds a `sets' set from the list `L', explicitly requesting the version 2
%% representation.
-define(SET_FROM_LIST(L), sets:from_list(L, [{version, 2}])).
%% Guard-safe test that is true when `X' is a fun of arity 0, 1 or 2. Used by
%% on_up/3 and on_down/3 to validate the callback argument.
-define(IS_ON_EVENT_FUN(X), (
is_function(X, 0) orelse is_function(X, 1) orelse is_function(X, 2)
)).
%% Test builds always enable the interposition macros defined below.
-ifdef(TEST).
-define(INTERPOSITION, true).
-endif.
-ifdef(INTERPOSITION).
%% Calls every pre-interposition fun in the map `Funs' with the single
%% argument `{Type, Node, Msg}', where `Type' is the operation being
%% intercepted (`forward_message' or `receive_message'). The funs are run for
%% their side effects only; their return value is discarded.
%% returns ok
-define(FIRE_PRE_INTERPOSITIONS(Type, Node, Msg, Funs),
    maps:fold(
        fun(_Name, Fun, ok) ->
            ?LOG_DEBUG(
                "Firing pre-interposition fun for message: ~p",
                [Msg]
            ),
            %% Pass the caller-supplied Type so that receive-side hooks are
            %% not mislabelled as forward_message.
            Fun({Type, Node, Msg}),
            ok
        end,
        ok,
        Funs
    )
).
%% Threads the message through every interposition fun in the map `Funs':
%% each fun is called with `{Type, Node, M}' where `M' is the value produced
%% by the previously applied fun (initially `Msg'). The value returned by the
%% last fun is the result of the macro.
%% returns message
-define(FIRE_INTERPOSITIONS(Type, Node, Msg, Funs),
maps:fold(
fun(_Name, Fun, M) ->
?LOG_DEBUG(
"Firing interposition fun for message: ~p",
[M]
),
Fun({Type, Node, M})
end,
Msg,
Funs
)
).
%% Calls every post-interposition fun in the map `Funs' with two arguments:
%% `{Type, Node, Msg0}' and `{Type, Node, Msg1}' (at the call sites in this
%% file, the message before and after interposition). The funs' return values
%% are discarded.
%% returns ok
-define(FIRE_POST_INTERPOSITIONS(Type, Node, Msg0, Msg1, Funs),
maps:fold(
fun(_Name, Fun, ok) ->
?LOG_DEBUG(
"Firing post-interposition fun for messages: [~p, ~p]",
[Msg0, Msg1]
),
Fun(
{Type, Node, Msg0},
{Type, Node, Msg1}
),
ok
end,
ok,
Funs
)
).
-else.
%% No-op variants used when INTERPOSITION is not defined: no fun is ever
%% invoked.
%% returns ok
-define(FIRE_PRE_INTERPOSITIONS(Type, Node, Msg, Funs),
begin
%% We do this so that we avoid the compiler warning us Funs is not
%% used
true = is_map(Funs),
ok
end
).
%% Leaves the message untouched.
%% returns message
-define(FIRE_INTERPOSITIONS(Type, Node, Msg, Funs),
Msg
).
%% Does nothing.
%% returns ok
-define(FIRE_POST_INTERPOSITIONS(Type, Node, Msg0, Msg1, Funs),
ok
).
-endif.
%% gen_server state of the peer service manager.
-record(state, {
%% Name of the local node (set from partisan:node() in init/1)
name :: node(),
%% Specification of the local node (set from partisan:node_spec() in init/1)
node_spec :: partisan:node_spec(),
actor :: partisan:actor(),
%% Local vector clock, starts as partisan_vclock:fresh()
vclock :: partisan_vclock:vclock(),
%% A materialised view of the membership_strategy_state as a list
members :: [partisan:node_spec()],
%% The nodes we still need to establish connections with
pending :: [partisan:node_spec()],
%% Callback module read from the `membership_strategy' configuration key
membership_strategy :: atom(),
%% Opaque state owned by the membership strategy
membership_strategy_state :: term(),
%% Set to true once the local node has asked to leave the cluster
leaving = false :: boolean(),
distance_metrics :: map(),
%% Callers of sync_join/1 that are still waiting for a reply
%% NOTE(review): presumably keyed by the peer being joined - confirm
sync_joins :: #{partisan:node_spec() => sets:set(from())},
out_links :: [term()],
%% Callbacks registered through on_down/3 without a `channel' option
down_funs :: node_subs(),
%% Callbacks registered through on_down/3 with a `channel' option
channel_down_funs :: channel_subs(),
%% Callbacks registered through on_up/3 without a `channel' option
up_funs :: node_subs(),
%% Callbacks registered through on_up/3 with a `channel' option
channel_up_funs :: channel_subs(),
%% Interposition funs, keyed by the name given at registration
pre_interposition_funs :: interposition_map(x_interpos_fun()),
interposition_funs :: interposition_map(interpos_fun()),
post_interposition_funs :: interposition_map(x_interpos_fun())
}).
%% The gen_server state.
-type t() :: #state{}.
%% The `From' argument of a gen_server call.
-type from() :: {pid(), atom()}.
-type on_event_fun() :: partisan_peer_service_manager:on_event_fun().
%% Node subscriptions: '_' is the key used when subscribing to `any' node.
-type node_subs() :: #{'_' | node() => on_event_fun()}.
%% Per-channel subscriptions, keyed by `{NodeOrWildcard, Channel}'.
-type channel_subs() :: #{
{'_' | node(), partisan:channel()} =>
on_event_fun()
}.
%% Interposition funs keyed by the name supplied when they were added.
-type interposition_map(T) :: #{any() => T}.
%% Argument handed to interposition funs: operation, node and message.
-type interpos_arg() :: {receive_message, node(), any()}
| {forward_message, node(), any()}.
-type interpos_fun() :: fun((interpos_arg()) -> interpos_arg()).
-type x_interpos_fun() :: fun((interpos_arg()) -> ok).
-type tag() :: atom().
%% Messages the server expects in handle_info/2.
-type info() :: connections
| retransmit
| periodic
| instrumentation
| distance
| tree_refresh
| {'EXIT', partisan:any_pid(), any()}
| {
connected,
partisan:node_spec(),
partisan:channel(),
tag(),
t()
}.
%% API
-export([member/1]).
%% PARTISAN_PEER_SERVICE_MANAGER CALLBACKS
-export([add_interposition_fun/2]).
-export([add_post_interposition_fun/2]).
-export([add_pre_interposition_fun/2]).
-export([cast_message/2]).
-export([cast_message/3]).
-export([cast_message/4]).
-export([decode/1]).
-export([forward_message/2]).
-export([forward_message/3]).
-export([forward_message/4]).
-export([get_interposition_funs/0]).
-export([get_local_state/0]).
-export([get_pre_interposition_funs/0]).
-export([inject_partition/2]).
-export([join/1]).
-export([leave/0]).
-export([leave/1]).
-export([members/0]).
-export([members_for_orchestration/0]).
-export([on_down/2]).
-export([on_down/3]).
-export([on_up/2]).
-export([on_up/3]).
-export([partitions/0]).
-export([receive_message/3]).
-export([remove_interposition_fun/1]).
-export([remove_post_interposition_fun/1]).
-export([remove_pre_interposition_fun/1]).
-export([reserve/1]).
-export([resolve_partition/1]).
-export([send_message/2]).
-export([start_link/0]).
-export([supports_capability/1]).
-export([sync_join/1]).
-export([update_members/1]).
%% gen_server callbacks
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
%% =============================================================================
%% API
%% =============================================================================
%% -----------------------------------------------------------------------------
%% @doc Tells whether a node is currently in the membership list.
%%
%% The argument is either a node name or a node specification map, in which
%% case only its `name' key is used. The question is answered by the manager
%% process through a synchronous call without timeout: `true' when a member
%% with that name exists, `false' otherwise.
%% @end
%% -----------------------------------------------------------------------------
member(NodeName) when is_atom(NodeName) ->
    Request = {member, NodeName},
    gen_server:call(?MODULE, Request, infinity);

member(#{name := NodeName}) ->
    member(NodeName).
%% =============================================================================
%% PARTISAN_PEER_SERVICE_MANAGER CALLBACKS
%% =============================================================================
%% -----------------------------------------------------------------------------
%% @doc Starts the manager as a gen_server registered locally under the module
%% name and links it to the calling process.
%%
%% The process is spawned with the options produced by the
%% `PARALLEL_SIGNAL_OPTIMISATION' macro.
%% @end
%% -----------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.

start_link() ->
    SpawnOpts = ?PARALLEL_SIGNAL_OPTIMISATION([]),
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], [{spawn_opt, SpawnOpts}]).
%% -----------------------------------------------------------------------------
%% @doc Returns `{ok, Names}' where `Names' holds the node name of every
%% current member, as known by the manager process.
%% @end
%% -----------------------------------------------------------------------------
members() ->
    Request = members,
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Returns `{ok, Members}' where `Members' is the manager's membership
%% list as it is kept in its state (full member entries rather than just node
%% names).
%% @end
%% -----------------------------------------------------------------------------
members_for_orchestration() ->
    Request = members_for_orchestration,
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Replaces the membership with `Members'.
%%
%% The manager compares `Members' against its current membership, issues a
%% leave for every node that is gone and a join for every new node. Nothing
%% happens while the local node is leaving the cluster. Always replies `ok'.
%% @end
%% -----------------------------------------------------------------------------
update_members(Members) ->
    Request = {update_members, Members},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Returns `{ok, State}' where `State' is the membership strategy state
%% held by the local manager, i.e. the local view of the cluster membership.
%% @end
%% -----------------------------------------------------------------------------
get_local_state() ->
    Request = get_local_state,
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Registers `Fun' to be triggered on connection close for a node.
%% Equivalent to calling `on_down/3' with an empty options map.
%% @end
%% -----------------------------------------------------------------------------
on_down(Arg, Fun) ->
    NoOpts = #{},
    on_down(Arg, Fun, NoOpts).
%% -----------------------------------------------------------------------------
%% @doc Registers `Fun' to be triggered on connection close for a node.
%%
%% The first argument is a node name, a node specification map (only its
%% `name' key is used) or the atom `any', which is stored as the wildcard
%% `` '_' ''. `Fun' must be a function object of arity 0, 1 or 2. When `Opts'
%% contains a `channel' key holding an atom the subscription is recorded for
%% that node and channel pair, otherwise for the node alone.
%%
%% Fails with `function_clause' when the arguments do not satisfy the above.
%% @end
%% -----------------------------------------------------------------------------
on_down(any, Fun, Opts) ->
    on_down('_', Fun, Opts);

on_down(#{name := NodeName}, Fun, Opts) ->
    on_down(NodeName, Fun, Opts);

on_down(NodeName, Fun, Opts)
when is_atom(NodeName), is_map(Opts), ?IS_ON_EVENT_FUN(Fun) ->
    Request = {on_down, NodeName, Fun, Opts},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Registers `Fun' to be triggered on connection open for a node.
%% Equivalent to calling `on_up/3' with an empty options map.
%% @end
%% -----------------------------------------------------------------------------
on_up(Arg, Fun) ->
    NoOpts = #{},
    on_up(Arg, Fun, NoOpts).
%% -----------------------------------------------------------------------------
%% @doc Registers `Fun' to be triggered on connection open for a node.
%%
%% The first argument is a node name, a node specification map (only its
%% `name' key is used) or the atom `any', which is stored as the wildcard
%% `` '_' ''. `Fun' must be a function object of arity 0, 1 or 2. When `Opts'
%% contains a `channel' key holding an atom the subscription is recorded for
%% that node and channel pair, otherwise for the node alone.
%%
%% Fails with `function_clause' when the arguments do not satisfy the above.
%% @end
%% -----------------------------------------------------------------------------
on_up(any, Fun, Opts) ->
    on_up('_', Fun, Opts);

on_up(#{name := NodeName}, Fun, Opts) ->
    on_up(NodeName, Fun, Opts);

on_up(NodeName, Fun, Opts)
when is_atom(NodeName), is_map(Opts), ?IS_ON_EVENT_FUN(Fun) ->
    Request = {on_up, NodeName, Fun, Opts},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Asks the manager to join the node described by `NodeSpec', a map that
%% must contain a `name' key. A request to join the local node itself is
%% ignored. Replies `ok'.
%% @end
%% -----------------------------------------------------------------------------
join(#{name := _} = NodeSpec) ->
    Request = {join, NodeSpec},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Synchronous variant of `join/1'.
%%
%% `NodeSpec' must be a map containing a `name' key. Joining the local node
%% itself replies `ok' straight away. For any other node the manager starts
%% the join and postpones its reply, so the caller blocks (there is no
%% timeout) until the manager eventually answers.
%% @end
%% -----------------------------------------------------------------------------
sync_join(#{name := _} = NodeSpec) ->
    Request = {sync_join, NodeSpec},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Makes the local node leave the cluster.
%%
%% After replying `ok' the manager process stops itself in order to clean up.
%% @end
%% -----------------------------------------------------------------------------
leave() ->
    LocalSpec = partisan:node_spec(),
    gen_server:call(?MODULE, {leave, LocalSpec}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Removes the node described by `NodeSpec' (a map containing a `name'
%% key) from the cluster. When `NodeSpec' names the local node this behaves
%% like `leave/0'. Replies `ok'.
%% @end
%% -----------------------------------------------------------------------------
leave(#{name := _} = NodeSpec) ->
    Request = {leave, NodeSpec},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Sends `Message' to the peer service manager of `Node'.
%%
%% The local manager schedules the delivery using the default partition key
%% and replies `ok'.
%% @end
%% -----------------------------------------------------------------------------
send_message(Node, Message) ->
    %% TODO maybe deprecate, not used by Partisan and we can always do
    %% partisan_rpc:call/4
    gen_server:call(?MODULE, {send_message, Node, Message}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Casts `Message' to the gen_server identified by `Term'.
%%
%% The message is wrapped as `{'$gen_cast', Message}' and handed to
%% `forward_message/3' with empty options. The outcome of the forwarding is
%% ignored; the function always returns `ok'.
%% @end
%% -----------------------------------------------------------------------------
-spec cast_message(
    Term :: partisan:any_pid() | partisan:any_name(),
    Message :: partisan:message()) -> ok.

cast_message(Term, Message) ->
    Cast = {'$gen_cast', Message},
    _ = forward_message(Term, Cast, #{}),
    ok.
%% -----------------------------------------------------------------------------
%% @doc Casts `Message' to the gen_server `ServerRef' on `Node'.
%% Equivalent to calling `cast_message/4' with an empty options map.
%% @end
%% -----------------------------------------------------------------------------
cast_message(Node, ServerRef, Message) ->
    NoOpts = #{},
    cast_message(Node, ServerRef, Message, NoOpts).
%% -----------------------------------------------------------------------------
%% @doc Casts `Message' to the gen_server `ServerRef' on `Node'.
%%
%% The message is wrapped as `{'$gen_cast', Message}' and handed to
%% `forward_message/4' together with `Options'. The outcome of the forwarding
%% is ignored; the function always returns `ok'.
%% @end
%% -----------------------------------------------------------------------------
cast_message(Node, ServerRef, Message, Options) ->
    %% TODO maybe deprecate, since we have partisan_gen_server:cast
    %% and partisan_gen_statem:cast ?
    Cast = {'$gen_cast', Message},
    _ = forward_message(Node, ServerRef, Cast, Options),
    ok.
%% -----------------------------------------------------------------------------
%% @doc Forwards `Message' to the process identified by `Term'.
%% Equivalent to calling `forward_message/3' with an empty options map.
%% @end
%% -----------------------------------------------------------------------------
forward_message(Term, Message) ->
    NoOpts = #{},
    forward_message(Term, Message, NoOpts).
%% -----------------------------------------------------------------------------
%% @doc Forwards `Message' to the process identified by the first argument.
%%
%% The destination can be:
%% <ul>
%% <li>a local pid or a locally registered name: the message is sent with
%% `erlang:send/2';</li>
%% <li>`{global, Name}' or `{via, Module, Name}': the message is handed to
%% `partisan_peer_service_manager:deliver/2';</li>
%% <li>`{Name, Node}' (both atoms): sent locally when `Node' is the local
%% node, otherwise forwarded with `forward_message/4';</li>
%% <li>a partisan remote reference to a pid or a name: forwarded to its node
%% and target with `forward_message/4'.</li>
%% </ul>
%%
%% The `{global, Name}' clause is matched before the `{Name, Node}' clause.
%% Otherwise a global reference whose name is an atom would be mistaken for a
%% process registered as `global' on a node called `Name'.
%%
%% Fails with `badarg' when the destination is none of the above.
%% @end
%% -----------------------------------------------------------------------------
forward_message(PidOrName, Message, _Opts)
when is_pid(PidOrName); is_atom(PidOrName) ->
    _ = erlang:send(PidOrName, Message),
    ok;

forward_message({global, _} = ServerRef, Message, _Opts) ->
    %% Will do nothing if disterl is not enabled as we currently do not have
    %% partisan_global
    partisan_peer_service_manager:deliver(ServerRef, Message);

forward_message({via, _, _} = ServerRef, Message, _Opts) ->
    partisan_peer_service_manager:deliver(ServerRef, Message);

forward_message({Name, Node}, Message, Opts)
when is_atom(Name), is_atom(Node) ->
    case Node == partisan:node() of
        true ->
            _ = erlang:send(Name, Message),
            ok;
        false ->
            forward_message(Node, Name, Message, Opts)
    end;

forward_message(RemoteRef, Message, Opts) ->
    %% Only remote references to a pid or a name can be forwarded
    partisan_remote_ref:is_pid(RemoteRef)
        orelse partisan_remote_ref:is_name(RemoteRef)
        orelse error(badarg),

    Node = partisan_remote_ref:node(RemoteRef),
    Target = partisan_remote_ref:target(RemoteRef),

    forward_message(Node, Target, Message, Opts).
%% -----------------------------------------------------------------------------
%% @doc Forwards `Message' to the process `ServerRef' on `Node'.
%%
%% `Opts' is a map or a property list (converted with `maps:from_list/1').
%%
%% When `Node' is the local node, or when the `connect_disterl' configuration
%% flag is set, the message is delivered directly with
%% `partisan_peer_service_manager:deliver/2'.
%%
%% Otherwise `Opts' is merged over the configured `forward_options' and the
%% following keys are read from the result:
%% <ul>
%% <li>`partition_key': defaults to `?DEFAULT_PARTITION_KEY';</li>
%% <li>`clock': a clock provided by the sender, `undefined' when absent;</li>
%% <li>`ack': whether the message needs an acknowledgement, default
%% `false';</li>
%% <li>`causal_label': when present the message uses causal delivery.</li>
%% </ul>
%%
%% The message is dispatched straight to a connection process (fast forward)
%% only when the `disable_fast_forward' configuration flag is not set, no
%% acknowledgement is required and no causal label is present. In every other
%% case, or when the direct dispatch does not return `ok', the request is
%% serialised through the manager process and its reply is returned.
%% @end
%% -----------------------------------------------------------------------------
forward_message(Node, ServerRef, Message, Opts) when is_list(Opts) ->
    forward_message(Node, ServerRef, Message, maps:from_list(Opts));

forward_message(Node, ServerRef, Message, Opts) when is_map(Opts) ->
    %% TODO ServerRef here can be atom(), pid(), partisan_ref(), {via, _, _},
    %% {Name, Node} or anything !!!!

    %% If attempting to forward to the local node or using disterl, bypass.
    Bypass =
        Node =:= partisan:node()
        orelse partisan_config:get(connect_disterl, false),

    case Bypass of
        true ->
            partisan_peer_service_manager:deliver(ServerRef, Message);

        false ->
            %% Get forwarding options and combine with message
            %% specific options.
            FwdOpts = maps:merge(
                partisan_config:get(forward_options, #{}), Opts
            ),

            %% Attempt to get the partition key, if possible.
            PartitionKey = maps:get(
                partition_key, FwdOpts, ?DEFAULT_PARTITION_KEY
            ),

            %% Use a clock provided by the sender,
            %% otherwise, use a generated one.
            Clock = maps:get(clock, FwdOpts, undefined),

            PaddedMessage = partisan_util:maybe_pad_term(Message),

            Cmd = {
                forward_message,
                Node,
                Clock,
                PartitionKey,
                ServerRef,
                PaddedMessage,
                FwdOpts
            },

            %% Is fast forward disabled?
            DisableFastForward =
                partisan_config:get(disable_fast_forward, false),

            %% Needs ack?
            NeedsAck = maps:get(ack, FwdOpts, false),

            %% Use causal delivery? True only when a causal label was given.
            CausalDelivery =
                maps:get(causal_label, FwdOpts, undefined) =/= undefined,

            %% Should we use fast forwarding?
            %%
            %% Conditions:
            %% - fastforward is not disabled
            %% - not labeled for causal delivery
            %% - message does not need acknowledgement
            FastForward =
                not DisableFastForward
                andalso not NeedsAck
                andalso not CausalDelivery,

            %% Attempt to fast-path, dispatching it directly to the connection
            %% process
            case FastForward andalso partisan_peer_connections:dispatch(Cmd) of
                ok ->
                    ok;
                _ ->
                    %% FastForward == false or {error, _} from dispatch
                    %% We do a serialized execution as Opts might require
                    %% retransmission
                    gen_server:call(?MODULE, Cmd, infinity)
            end
    end.
%% -----------------------------------------------------------------------------
%% @doc Handles a message received from the manager of a remote node.
%%
%% Depending on the shape of the message:
%% <ul>
%% <li>a five element `forward_message' tuple is processed by the manager
%% process;</li>
%% <li>a padded message is unwrapped and handled again;</li>
%% <li>a causal message is handed to the causality backend;</li>
%% <li>a plain `{forward_message, ServerRef, Msg}' is delivered to `ServerRef'
%% directly from the calling process, unless the `disable_fast_receive'
%% configuration flag is set, in which case it is serialised through the
%% manager process;</li>
%% <li>anything else is processed by the manager process.</li>
%% </ul>
%% @end
%% -----------------------------------------------------------------------------
receive_message(
    Node,
    Channel,
    {forward_message, _SrcNode, _Clock, _ServerRef, _Msg} = Cmd
) ->
    %% Process the message and generate the acknowledgement.
    Request = {receive_message, Node, Channel, Cmd},
    gen_server:call(?MODULE, Request, infinity);

receive_message(
    Node,
    Channel,
    {forward_message, ServerRef, {'$partisan_padded', _Padding, Msg}}
) ->
    Unpadded = {forward_message, ServerRef, Msg},
    receive_message(Node, Channel, Unpadded);

receive_message(
    _Node,
    _Channel,
    {forward_message, _ServerRef, {causal, Label, _, _, _, _, _} = Msg}
) ->
    partisan_causality_backend:receive_message(Label, Msg);

receive_message(Node, Channel, {forward_message, ServerRef, Msg} = Cmd) ->
    %% The destination of this message lives in this node.
    case partisan_config:get(disable_fast_receive, false) of
        false ->
            %% Concurrent execution
            partisan_peer_service_manager:deliver(ServerRef, Msg);
        true ->
            %% Serialize execution
            Request = {receive_message, Node, Channel, Cmd},
            gen_server:call(?MODULE, Request, infinity)
    end;

receive_message(Node, Channel, Msg) ->
    Request = {receive_message, Node, Channel, Msg},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Decodes a membership state. For this manager the state needs no
%% decoding, so the argument is returned unchanged.
%% @end
%% -----------------------------------------------------------------------------
decode(Membership) ->
    Membership.
%% -----------------------------------------------------------------------------
%% @doc Reserves a slot for the given tag.
%%
%% This manager does not keep slots: the server always replies
%% `{error, no_available_slots}'.
%% @end
%% -----------------------------------------------------------------------------
reserve(Tag) ->
    Request = {reserve, Tag},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Tells whether this manager supports a capability. `monitoring' is the
%% only supported one; `false' is returned for anything else.
%% @end
%% -----------------------------------------------------------------------------
-spec supports_capability(Arg :: atom()) -> boolean().

supports_capability(Capability) ->
    Capability =:= monitoring.
%% -----------------------------------------------------------------------------
%% @doc Registers the pre-interposition function `Fun' under `Name',
%% replacing any function previously registered under the same name.
%% @end
%% -----------------------------------------------------------------------------
-spec add_pre_interposition_fun(any(), x_interpos_fun()) -> ok.

add_pre_interposition_fun(Name, Fun) ->
    Request = {add_pre_interposition_fun, Name, Fun},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Returns `{ok, Funs}' where `Funs' maps each registered name to its
%% pre-interposition function.
%%
%% The result is wrapped in an `ok' tuple because that is what the manager
%% process replies to this request.
%% @end
%% -----------------------------------------------------------------------------
-spec get_pre_interposition_funs() ->
    {ok, interposition_map(x_interpos_fun())}.

get_pre_interposition_funs() ->
    gen_server:call(?MODULE, get_pre_interposition_funs, infinity).
%% -----------------------------------------------------------------------------
%% @doc Removes the pre-interposition function registered under `Name'.
%% Replies `ok' whether or not such a function existed.
%% @end
%% -----------------------------------------------------------------------------
-spec remove_pre_interposition_fun(any()) -> ok.

remove_pre_interposition_fun(Name) ->
    Request = {remove_pre_interposition_fun, Name},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Registers the interposition function `InterpositionFun' under `Name',
%% replacing any function previously registered under the same name.
%% @end
%% -----------------------------------------------------------------------------
-spec add_interposition_fun(any(), interpos_fun()) -> ok.

add_interposition_fun(Name, InterpositionFun) ->
    Request = {add_interposition_fun, Name, InterpositionFun},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Returns `{ok, Funs}' where `Funs' maps each registered name to its
%% interposition function.
%%
%% The result is wrapped in an `ok' tuple because that is what the manager
%% process replies to this request.
%% @end
%% -----------------------------------------------------------------------------
-spec get_interposition_funs() -> {ok, interposition_map(interpos_fun())}.

get_interposition_funs() ->
    gen_server:call(?MODULE, get_interposition_funs, infinity).
%% -----------------------------------------------------------------------------
%% @doc Removes the interposition function registered under `Name'.
%% Replies `ok' whether or not such a function existed.
%% @end
%% -----------------------------------------------------------------------------
-spec remove_interposition_fun(any()) -> ok.

remove_interposition_fun(Name) ->
    Request = {remove_interposition_fun, Name},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Registers the post-interposition function `PostInterpositionFun'
%% under `Name', replacing any function previously registered under the same
%% name.
%% @end
%% -----------------------------------------------------------------------------
-spec add_post_interposition_fun(any(), x_interpos_fun()) -> ok.

add_post_interposition_fun(Name, PostInterpositionFun) ->
    Request = {add_post_interposition_fun, Name, PostInterpositionFun},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Removes the post-interposition function registered under `Name'.
%% Replies `ok' whether or not such a function existed.
%% @end
%% -----------------------------------------------------------------------------
-spec remove_post_interposition_fun(any()) -> ok.

remove_post_interposition_fun(Name) ->
    Request = {remove_post_interposition_fun, Name},
    gen_server:call(?MODULE, Request, infinity).
%% -----------------------------------------------------------------------------
%% @doc Injects a partition. Not supported by this manager: both arguments
%% are ignored and `{error, not_implemented}' is returned.
%% @end
%% -----------------------------------------------------------------------------
inject_partition(_Origin, _TTL) ->
    Reason = not_implemented,
    {error, Reason}.
%% -----------------------------------------------------------------------------
%% @doc Resolves a partition. Not supported by this manager: the argument is
%% ignored and `{error, not_implemented}' is returned.
%% @end
%% -----------------------------------------------------------------------------
resolve_partition(_Reference) ->
    Reason = not_implemented,
    {error, Reason}.
%% -----------------------------------------------------------------------------
%% @doc Returns the known partitions. Not supported by this manager:
%% `{error, not_implemented}' is always returned.
%% @end
%% -----------------------------------------------------------------------------
partitions() ->
    Reason = not_implemented,
    {error, Reason}.
%% =============================================================================
%% GEN_SERVER CALLBACKS
%% =============================================================================
%% Initialises the manager: seeds the random number generator, optionally
%% prepares the binary padding term, starts trapping exits, schedules all the
%% periodic jobs, creates the connections table and initialises the
%% configured membership strategy. Returns the initial server state.
-spec init([]) -> {ok, t()}.

init([]) ->
    %% Seed the random number generator.
    partisan_config:seed(),

    %% When binary padding is enabled, use a 64-byte binary to force shared
    %% heap usage to cut down on copying.
    _ = partisan_config:get(binary_padding, false) =:= true
        andalso partisan_config:set(binary_padding_term, rand_bits(512)),

    %% Process connection exits.
    process_flag(trap_exit, true),

    %% Periodic jobs: periodic, instrumentation, distance metric,
    %% connections, retransmission and tree peers refresh.
    schedule_periodic(),
    schedule_instrumentation(),
    schedule_distance(),
    schedule_connections(),
    schedule_retransmit(),
    schedule_tree_refresh(),

    LocalName = partisan:node(),
    LocalActor = gen_actor(LocalName),
    InitialClock = partisan_vclock:fresh(),

    %% We init the connections table and we become owners, if we crash the
    %% table will be destroyed.
    ok = partisan_peer_connections:init(),

    Strategy = partisan_config:get(membership_strategy),
    {ok, InitialMembers, StrategyState} =
        partisan_membership_strategy:init(Strategy, LocalActor),

    LocalSpec = partisan:node_spec(),

    InitialState = #state{
        %% Identity
        name = LocalName,
        node_spec = LocalSpec,
        actor = LocalActor,
        vclock = InitialClock,
        %% Membership
        members = InitialMembers,
        membership_strategy = Strategy,
        membership_strategy_state = StrategyState,
        pending = [],
        sync_joins = #{},
        out_links = [],
        distance_metrics = #{},
        %% Subscriptions
        up_funs = #{},
        channel_up_funs = #{},
        down_funs = #{},
        channel_down_funs = #{},
        %% Interposition
        pre_interposition_funs = #{},
        interposition_funs = #{},
        post_interposition_funs = #{}
    },

    {ok, InitialState}.
%% Handles the synchronous requests sent by the API functions of this module.
%% Most clauses reply immediately. The forward_message, receive_message,
%% sync_join and self-leave requests return `noreply': sync_join,
%% forward_message and receive_message pass `From' on so that the reply can
%% be produced later, while self-leave replies explicitly before stopping.

%% This manager keeps no slots, so a reservation always fails.
handle_call({reserve, _Tag}, _From, State) ->
{reply, {error, no_available_slots}, State};
%% Subscriptions carrying an atom `channel' option are stored per
%% {Node, Channel}; all others are stored per node.
handle_call({on_up, Name, Fun, #{channel := Channel}}, _From, State)
when is_atom(Channel) ->
Funs0 = State#state.channel_up_funs,
Funs = partisan_util:maps_append({Name, Channel}, Fun, Funs0),
{reply, ok, State#state{channel_up_funs = Funs}};
handle_call({on_up, Name, Fun, _}, _From, State) ->
Funs0 = State#state.up_funs,
Funs = partisan_util:maps_append(Name, Fun, Funs0),
{reply, ok, State#state{up_funs = Funs}};
handle_call({on_down, Name, Fun, #{channel := Channel}}, _From, State)
when is_atom(Channel) ->
Funs0 = State#state.channel_down_funs,
Funs = partisan_util:maps_append({Name, Channel}, Fun, Funs0),
{reply, ok, State#state{channel_down_funs = Funs}};
handle_call({on_down, Name, Fun, _}, _From, State) ->
Funs0 = State#state.down_funs,
Funs = partisan_util:maps_append(Name, Fun, Funs0),
{reply, ok, State#state{down_funs = Funs}};
%% Interposition funs are kept in maps keyed by Name: adding under an existing
%% name replaces the previous fun, removing an unknown name is a no-op.
handle_call({add_pre_interposition_fun, Name, Fun}, _From, #state{} = State) ->
Funs = maps:put(Name, Fun, State#state.pre_interposition_funs),
{reply, ok, State#state{pre_interposition_funs = Funs}};
handle_call({remove_pre_interposition_fun, Name}, _From, #state{} = State) ->
Funs = maps:remove(Name, State#state.pre_interposition_funs),
{reply, ok, State#state{pre_interposition_funs = Funs}};
handle_call({add_interposition_fun, Name, Fun}, _From, #state{} = State) ->
Funs = maps:put(Name, Fun, State#state.interposition_funs),
{reply, ok, State#state{interposition_funs = Funs}};
handle_call({remove_interposition_fun, Name}, _From, #state{} = State) ->
Funs = maps:remove(Name, State#state.interposition_funs),
{reply, ok, State#state{interposition_funs = Funs}};
%% Note that both getters reply with the map wrapped in an `ok' tuple.
handle_call(get_interposition_funs, _From, #state{} = State) ->
{reply, {ok, State#state.interposition_funs}, State};
handle_call(get_pre_interposition_funs, _From, #state{} = State) ->
{reply, {ok, State#state.pre_interposition_funs}, State};
handle_call({add_post_interposition_fun, Name, Fun}, _From, #state{} = State) ->
Funs = maps:put(Name, Fun, State#state.post_interposition_funs),
{reply, ok, State#state{post_interposition_funs =Funs}};
handle_call({remove_post_interposition_fun, Name}, _From, #state{}=State) ->
Funs = maps:remove(Name, State#state.post_interposition_funs),
{reply, ok, State#state{post_interposition_funs = Funs}};
handle_call({update_members, _}, _, #state{leaving = true} = State) ->
%% We are leaving so do nothing
{reply, ok, State};
handle_call({update_members, Members}, _From, #state{} = State0) ->
%% For compatibility with external membership services.
Mod = State0#state.membership_strategy,
MState = State0#state.membership_strategy_state,
%% The strategy tells us which nodes are new and which ones are gone
{Joiners, Leavers} = Mod:compare(Members, MState),
%% Issue leaves.
State1 = lists:foldl(
fun(NodeSpec , S) -> internal_leave(NodeSpec, S) end,
State0,
Leavers
),
%% Issue joins.
State = lists:foldl(
fun(NodeSpec, S) -> internal_join(NodeSpec, undefined, S) end,
State1,
Joiners
),
%% Finally schedule the removal of connections
%% We do this async because internal_leave will schedule the sending of
%% membership update messages
LeavingNodes = [Node || #{name := Node} <- Leavers],
gen_server:cast(?MODULE, {kill_connections, LeavingNodes}),
{reply, ok, State};
handle_call({leave, #{name := Name} = NodeSpec}, From, State0) ->
%% Perform leave.
State = internal_leave(NodeSpec, State0),
case Name == State0#state.name of
true ->
%% Self leave
gen_server:reply(From, ok),
%% We need to stop (to cleanup all connections and state) or do it
%% manually. However, we cannot do it straight away as
%% internal_leave/2 has send some async messages we need to process
%% (casts to ourself) so we cast ourselves a shutdown message.
gen_server:cast(?MODULE, stop),
{noreply, State#state{leaving = true}};
false ->
%% Another node left: only its connections need to be removed
gen_server:cast(?MODULE, {kill_connections, [NodeSpec]}),
{reply, ok, State}
end;
handle_call({join, #{name := N}}, _From, #state{name = N} = State0) ->
%% Ignoring self join.
{reply, ok, State0};
handle_call({join, NodeSpec}, _From, State0) ->
State = internal_join(NodeSpec, undefined, State0),
{reply, ok, State};
handle_call({sync_join, #{name := N}}, _From, #state{name = N} = State0) ->
%% Ignoring self join.
{reply, ok, State0};
handle_call({sync_join, NodeSpec}, From, State0) ->
?LOG_DEBUG(#{
description => "Starting synchronous join with peer",
node => State0#state.name,
peer => NodeSpec
}),
%% From is handed to internal_join/3 and no reply is sent here, so the
%% caller stays blocked until a reply is sent elsewhere.
State = internal_join(NodeSpec, From, State0),
{noreply, State};
handle_call({send_message, Node, Message}, _From, State) ->
schedule_self_message_delivery(
Node,
Message,
?DEFAULT_PARTITION_KEY,
State
),
{reply, ok, State};
handle_call(
{forward_message, Node, Clock, PartitionKey, ServerRef, Msg, Opts},
From,
State) ->
%% Run all interposition functions.
DeliveryFun =
fun() ->
%% Fire pre-interposition functions.
ok = ?FIRE_PRE_INTERPOSITIONS(
forward_message, Node, Msg, State#state.pre_interposition_funs
),
%% Once pre-interposition returns, then schedule for delivery.
%% From travels with the command so that the caller can be answered
%% when the cast is processed.
Cmd = {
forward_message,
From,
Node,
Clock,
PartitionKey,
ServerRef,
Msg,
Opts
},
gen_server:cast(?MODULE, Cmd)
end,
case partisan_config:get(replaying, false) of
false ->
%% Fire all pre-interposition functions, and then deliver,
%% preserving serial order of messages.
DeliveryFun();
true ->
%% Allow the system to proceed, and the message will be delivered
%% once pre-interposition is done.
spawn_link(DeliveryFun)
end,
{noreply, State};
handle_call({receive_message, Node, Channel, Msg}, From, State) ->
DeliveryFun = fun() ->
ok = ?FIRE_PRE_INTERPOSITIONS(
receive_message, Node, Msg, State#state.pre_interposition_funs
),
%% Once pre-interposition returns, then schedule for delivery.
gen_server:cast(?MODULE, {receive_message, Node, Channel, From, Msg})
end,
case partisan_config:get(replaying, false) of
false ->
%% Fire all pre-interposition functions, and then deliver,
%% preserving serial order of messages.
DeliveryFun();
true ->
%% Allow the system to proceed, and the message will be delivered
%% once pre-interposition is done.
spawn_link(DeliveryFun)
end,
{noreply, State};
%% Replies with the member entries exactly as kept in the state.
handle_call(members_for_orchestration, _From, State) ->
{reply, {ok, State#state.members}, State};
%% Replies with the node names only.
handle_call(members, _From, State) ->
Members = [Node || #{name := Node} <- State#state.members],
{reply, {ok, Members}, State};
handle_call({member, Node}, _From, State) ->
IsMember = lists:any(
fun(#{name := X}) -> X =:= Node end,
State#state.members
),
{reply, IsMember, State};
handle_call(get_local_state, _From, State) ->
{reply, {ok, State#state.membership_strategy_state}, State};
%% Unknown requests are logged and still answered with `ok'.
handle_call(Event, _From, State) ->
?LOG_WARNING(#{description => "Unhandled call event", event => Event}),
{reply, ok, State}.
%% -----------------------------------------------------------------------------
%% @private
%% @doc gen_server callback for asynchronous requests.
%%
%% Handled requests:
%% <ul>
%% <li>`stop' - stops the server with reason `normal'.</li>
%% <li>`{kill_connections, Nodes}' - delegates to `kill_connections/2'.</li>
%% <li>`{receive_message, Node, Channel, From, Msg}' - runs the
%% `receive_message' interposition functions on an incoming message. If the
%% result is `undefined' the message is dropped (the caller still gets `ok'),
%% if it is a delay tuple the request is re-enqueued, otherwise the message is
%% handed to `handle_message/4'.</li>
%% <li>`{forward_message, From, Node, Clock, PartitionKey, ServerRef, Msg,
%% Opts}' - runs the `forward_message' interposition functions on an outgoing
%% message. If the result is `undefined' nothing is sent, if it is a delay
%% tuple the request is re-enqueued, otherwise the message is sent to `Node'.
%% When the `ack' option is `true' the message is stored in the
%% acknowledgement backend on its first transmission (that is, when the
%% `retransmission' option is not set) so that the `retransmit' timer can
%% resend it until it is acknowledged.</li>
%% </ul>
%% Any other request is logged and ignored.
%% @end
%% -----------------------------------------------------------------------------
-spec handle_cast(term(), t()) ->
    {noreply, t()}
    | {stop, normal, t()}.

handle_cast(stop, State) ->
    %% We send ourselves this message when we left the cluster
    %% We stop to cleanup, supervisor will start us again,
    %% terminate/1 will kill all connections.
    {stop, normal, State};

handle_cast({kill_connections, Nodes}, State) ->
    ok = kill_connections(Nodes, State),
    {noreply, State};

handle_cast({receive_message, Node, Channel, From, Msg0}, State) ->
    %% Filter messages using interposition functions.
    %% eqwalizer:ignore Node
    Msg1 = ?FIRE_INTERPOSITIONS(
        receive_message, Node, Msg0, State#state.interposition_funs
    ),

    %% eqwalizer:ignore Node
    ok = ?FIRE_POST_INTERPOSITIONS(
        receive_message, Node, Msg0, Msg1, State#state.post_interposition_funs
    ),

    case Msg1 of
        undefined ->
            %% Interposition dropped the message; unblock the caller anyway.
            %% eqwalizer:ignore From
            gen_server:reply(From, ok),
            {noreply, State};

        {'$delay', Msg} ->
            ?LOG_DEBUG(
                "Delaying receive_message due to interposition result: ~p",
                [Msg]
            ),
            %% Put the (possibly rewritten) message back on our own queue.
            gen_server:cast(
                ?MODULE, {receive_message, Node, Channel, From, Msg}
            ),
            {noreply, State};

        _ ->
            handle_message(Msg1, From, Channel, State)
    end;

handle_cast(
    {forward_message, From, Node, Clock, PartitionKey, ServerRef, Msg0, Opts},
    State) ->
    #state{
        vclock = VClock0
    } = State,

    %% eqwalizer:ignore Node
    Msg = ?FIRE_INTERPOSITIONS(
        forward_message, Node, Msg0, State#state.interposition_funs
    ),

    %% Increment the clock.
    VClock = partisan_vclock:increment(State#state.name, VClock0),

    %% Are we using causality?
    %% eqwalizer:ignore Opts
    CausalLabel = maps:get(causal_label, Opts, undefined),

    %% Use local information for message unless it's a causal message.
    {MsgClock, FullMessage} =
        case CausalLabel of
            undefined ->
                %% Generate a message clock or use the provided clock.
                LocalClock = case Clock of
                    undefined ->
                        {undefined, VClock};
                    Clock ->
                        Clock
                end,

                {LocalClock, Msg};

            CausalLabel ->
                case Clock of
                    undefined ->
                        %% First time through.
                        %% We don't have a clock yet,
                        %% get one using the causality backend.
                        {ok, LocalClock0, CausalMessage} =
                            partisan_causality_backend:emit(
                                CausalLabel, Node, ServerRef, Msg
                            ),

                        %% Wrap the clock with a scope.
                        %% TODO: Maybe do this wrapping inside of the causality
                        %% backend.
                        LocalClock = {CausalLabel, LocalClock0},

                        %% Return clock and wrapped message.
                        {LocalClock, CausalMessage};

                    _ ->
                        %% Retransmission.
                        %% Get the clock and message we used last time.
                        {ok, LocalClock, CausalMessage} =
                            partisan_causality_backend:reemit(
                                CausalLabel, Clock
                            ),

                        %% Return clock and wrapped message.
                        {LocalClock, CausalMessage}
                end
        end,

    case Msg of
        undefined ->
            %% Interposition dropped the message: nothing is sent now.
            %% Store for reliability, if necessary.
            case maps:get(ack, Opts, false) of
                true ->
                    %% Acknowledgements.
                    %% Store only on the first transmission, exactly as the
                    %% regular send path below does. A retransmission is
                    %% already held by the acknowledgement backend.
                    case maps:get(retransmission, Opts, false) of
                        false ->
                            RescheduleableMessage = {
                                forward_message,
                                From,
                                Node,
                                MsgClock,
                                PartitionKey,
                                ServerRef,
                                Msg0,
                                Opts
                            },
                            partisan_acknowledgement_backend:store(
                                MsgClock, RescheduleableMessage
                            );
                        true ->
                            ok
                    end;
                false ->
                    ok
            end,

            ok = ?FIRE_POST_INTERPOSITIONS(
                forward_message,
                Node,
                Msg0,
                FullMessage,
                State#state.post_interposition_funs
            ),

            ?LOG_DEBUG(
                "~p: Message ~p after send interposition is: ~p",
                [State#state.name, Msg0, FullMessage]
            ),

            maybe_reply(From, ok),

            {noreply, State#state{vclock = VClock}};

        {'$delay', NewMessage} ->
            ?LOG_DEBUG(
                "Delaying receive_message due to interposition result: ~p",
                [NewMessage]
            ),
            %% Re-enqueue the request with the original clock; the vclock
            %% increment computed above is discarded on purpose (State is
            %% returned unchanged).
            gen_server:cast(
                ?MODULE,
                {
                    forward_message,
                    From,
                    Node,
                    Clock,
                    PartitionKey,
                    ServerRef,
                    NewMessage,
                    Opts
                }
            ),
            {noreply, State};

        _ ->
            %% Store for reliability, if necessary.
            Result = case maps:get(ack, Opts, false) of
                false ->
                    %% Tracing.
                    WrappedMessage = {forward_message, ServerRef, FullMessage},

                    ok = ?FIRE_POST_INTERPOSITIONS(
                        forward_message,
                        Node,
                        {forward_message, ServerRef, Msg0},
                        WrappedMessage,
                        State#state.post_interposition_funs
                    ),

                    %% Send message along.
                    do_send_message(
                        Node,
                        PartitionKey,
                        WrappedMessage,
                        Opts,
                        State
                    );

                true ->
                    %% Tracing.
                    WrappedMessage = {
                        forward_message,
                        State#state.name,
                        MsgClock,
                        ServerRef,
                        FullMessage
                    },

                    ?LOG_DEBUG(
                        "should acknowledge message: ~p", [WrappedMessage]
                    ),

                    ok = ?FIRE_POST_INTERPOSITIONS(
                        forward_message,
                        Node,
                        {
                            forward_message,
                            State#state.name,
                            MsgClock,
                            ServerRef,
                            Msg0
                        },
                        WrappedMessage,
                        State#state.post_interposition_funs
                    ),

                    ?LOG_DEBUG(
                        "~p: Sending message ~p with clock: ~p",
                        [State#state.name, Msg, MsgClock]
                    ),
                    ?LOG_DEBUG(
                        "~p: Message after send interposition is: ~p",
                        [State#state.name, Msg]
                    ),

                    %% Acknowledgements.
                    %% Only the first transmission is stored.
                    case maps:get(retransmission, Opts, false) of
                        false ->
                            RescheduleableMessage = {
                                forward_message,
                                From,
                                Node,
                                MsgClock,
                                PartitionKey,
                                ServerRef,
                                Msg0,
                                Opts
                            },
                            partisan_acknowledgement_backend:store(
                                MsgClock, RescheduleableMessage
                            );
                        true ->
                            ok
                    end,

                    %% Send message along.
                    do_send_message(
                        Node,
                        PartitionKey,
                        WrappedMessage,
                        Opts,
                        State
                    )
            end,

            %% Report the send result to the caller, if there is one.
            maybe_reply(From, Result),

            {noreply, State#state{vclock = VClock}}
    end;

handle_cast(Event, State) ->
    ?LOG_WARNING(#{description => "Unhandled cast event", event => Event}),
    {noreply, State}.
%% -----------------------------------------------------------------------------
%% @private
%% @doc gen_server callback for out-of-band messages.
%%
%% Handles the timer messages `tree_refresh', `distance', `instrumentation',
%% `periodic', `retransmit' and `connections', the `EXIT' signal of a
%% connection process and the `connected' notification. Any other message is
%% passed to `handle_message/4' with no caller (`undefined') and the default
%% channel.
%% @end
%% -----------------------------------------------------------------------------
-spec handle_info(info(), t()) -> {noreply, t()}.
%% tree_refresh: cache the current out-links in the state and re-arm the
%% timer through schedule_tree_refresh/0.
handle_info(tree_refresh, State) ->
%% Get lazily computed outlinks.
OutLinks = retrieve_outlinks(),
%% Reschedule.
schedule_tree_refresh(),
{noreply, State#state{out_links = OutLinks}};
%% distance: send a timestamped `ping' to every member on the membership
%% channel; the matching `pong' is processed in handle_message/4.
handle_info(distance, State0) ->
%% Establish any new connections.
State = establish_connections(State0),
%% Record time.
Time = erlang:timestamp(),
%% Send distance requests.
ok = lists:foreach(
fun(Peer) ->
schedule_self_message_delivery(
Peer,
{ping, State0#state.node_spec, Peer, Time},
?DEFAULT_PARTITION_KEY,
State,
#{channel => ?MEMBERSHIP_CHANNEL}
)
end,
State#state.members
),
schedule_distance(),
{noreply, State};
%% instrumentation: log the length of our own message queue.
handle_info(instrumentation, State) ->
MessageQueueLen = process_info(self(), message_queue_len),
?LOG_DEBUG("message_queue_len: ~p", [MessageQueueLen]),
schedule_instrumentation(),
{noreply, State};
%% periodic: let the membership strategy run its periodic step, send the
%% messages it produced and adopt the membership it returned.
handle_info(periodic, #state{} = State0) ->
#state{
membership_strategy=MStrategy,
membership_strategy_state=MState0
} = State0,
{ok, Members, OutgoingMessages, MState} =
partisan_membership_strategy:periodic(MStrategy, MState0),
%% Send outgoing messages.
%% State0 is enough here: the delivery helper only reads the
%% pre-interposition funs from the state.
ok = lists:foreach(
fun({Node, Message}) ->
schedule_self_message_delivery(
Node,
Message,
?DEFAULT_PARTITION_KEY,
State0,
#{channel => ?MEMBERSHIP_CHANNEL}
)
end,
OutgoingMessages
),
State1 = State0#state{
members = Members,
membership_strategy_state = MState
},
%% Establish any new connections.
State = establish_connections(State1),
schedule_periodic(),
{noreply, State};
%% retransmit: re-enqueue every message the acknowledgement backend still
%% reports as outstanding, flagged with `retransmission => true'.
handle_info(retransmit, State) ->
RetransmitFun = fun({_, {forward_message, From, Node, Clock, PartitionKey, ServerRef, Message, Options}}) ->
?LOG_DEBUG(
"~p no acknowledgement yet, "
"restranmitting message ~p with clock ~p to ~p",
[State#state.name, Message, Clock, Node]
),
ok = ?FIRE_PRE_INTERPOSITIONS(
forward_message, Node, Message, State#state.pre_interposition_funs
),
%% Schedule message for redelivery.
RetryOptions = Options#{retransmission => true},
gen_server:cast(
?MODULE,
{
forward_message,
From,
Node,
Clock,
PartitionKey,
ServerRef,
Message,
RetryOptions
}
)
end,
{ok, Outstanding} = partisan_acknowledgement_backend:outstanding(),
case partisan_config:get(replaying, false) of
false ->
%% Fire all pre-interposition functions, and then deliver,
%% preserving serial order of messages.
lists:foreach(RetransmitFun, Outstanding);
true ->
%% Allow the system to proceed, and the message will be delivered
%% once pre-interposition is done.
%% One linked process is spawned per outstanding message.
lists:foreach(
fun(OutstandingMessage) ->
spawn_link(fun() ->
RetransmitFun(OutstandingMessage)
end)
end,
Outstanding
)
end,
%% Reschedule retransmission.
schedule_retransmit(),
{noreply, State};
%% connections: (re)connect to members and pending nodes and answer the
%% sync joins whose target node is now fully connected.
handle_info(connections, State0) ->
%% TODO #244 move connection establishing to a helper process as these tasks
%% interleave with message forwarding. Also consider having a process per
%% channel or channel connection.
State1 = establish_connections(State0),
%% Advance sync_join's if we have enough open connections to remote host.
State = maybe_reply_sync_joins(State1),
schedule_connections(),
{noreply, State};
%% EXIT: a linked process terminated. If it was a known connection, remove
%% it and fire the channel-down and/or node-down notifications when it was
%% the last connection of its kind.
handle_info({'EXIT', Pid, Reason}, State0)->
?LOG_DEBUG(#{
description => "Connection closed",
reason => Reason
}),
%% A connection has closed, prune it from the connections table
%% eqwalizer:ignore Pid
try partisan_peer_connections:prune(Pid) of
{Info, [Connection]} ->
NodeSpec = partisan_peer_connections:node_spec(Info),
#{name := Node} = NodeSpec,
Channel = partisan_peer_connections:channel(Connection),
%% No connection left on this channel: notify channel-down subscribers.
case partisan_peer_connections:count(Node, Channel) of
0 ->
ok = down(NodeSpec, Channel, State0);
_ ->
ok
end,
State =
case partisan_peer_connections:count(Info) of
0 ->
%% This was the last connection so the node is down.
%% We notify all subscribers.
ok = down(NodeSpec, State0),
%% If still a member we add it to pending, so that we
%% can compute the on_up signal
maybe_append_pending(NodeSpec, State0);
_ ->
State0
end,
{noreply, State}
catch
error:badarg ->
%% Weird, connection pid did not exist
{noreply, State0}
end;
%% connected: a connection to NodeSpec was established. Only nodes that are
%% currently pending trigger a membership join; for any other node the
%% state is left untouched (apart from answering sync joins).
handle_info(
{connected, NodeSpec, _Channel, _Tag, RemoteState}, State0) ->
#state{
pending = Pending0,
members = Members0,
membership_strategy = MStrategy,
membership_strategy_state = MState0
} = State0,
?LOG_DEBUG(#{
description => "Node connected!",
node => NodeSpec,
pending => Pending0,
membership => State0#state.members
}),
State1 = case lists:member(NodeSpec, Pending0) of
true ->
%% Move out of pending.
Pending = Pending0 -- [NodeSpec],
%% Update membership by joining with remote membership.
{ok, Members, OutgoingMessages, MState} =
partisan_membership_strategy:join(
MStrategy, NodeSpec, RemoteState, MState0
),
%% Gossip the new membership.
lists:foreach(
fun({Node, Message}) ->
schedule_self_message_delivery(
Node,
Message,
?DEFAULT_PARTITION_KEY,
State0,
#{channel => ?MEMBERSHIP_CHANNEL}
)
end,
OutgoingMessages
),
%% Notify event handlers
%% (only when the membership actually changed)
ok = case Members == Members0 of
true ->
ok;
false ->
partisan_peer_service_events:update(Members)
end,
%% notify subscribers
up(NodeSpec, State0),
State0#state{
pending = Pending,
members = Members,
membership_strategy_state = MState
};
false ->
State0
end,
%% Notify for sync join.
State = maybe_reply_sync_joins(State1),
{noreply, State};
%% Anything else is treated as a protocol message without a caller.
handle_info(Msg, State) ->
handle_message(Msg, undefined, ?DEFAULT_CHANNEL, State).
%% -----------------------------------------------------------------------------
%% @private
%% @doc gen_server callback invoked when the server is about to stop.
%% Whatever the reason, asks `partisan_peer_connections' to kill all
%% connections and asserts that it answered `ok'.
%% @end
%% -----------------------------------------------------------------------------
-spec terminate(term(), t()) -> term().

terminate(_Reason, #state{} = _State) ->
    ok = partisan_peer_connections:kill_all(),
    ok.
%% -----------------------------------------------------------------------------
%% @private
%% @doc gen_server callback for hot code upgrades. No state migration is
%% performed: the state is returned as is.
%% @end
%% -----------------------------------------------------------------------------
-spec code_change(term() | {down, term()}, t(), term()) ->
    {ok, t()}.

code_change(_Vsn, CurrentState, _Extra) ->
    Result = {ok, CurrentState},
    Result.
%% =============================================================================
%% PRIVATE
%% =============================================================================
%% @private
%% Builds an actor identifier: the SHA-1 digest of the node name (an atom)
%% followed by the decimal text of a fresh positive unique integer.
gen_actor(Name) ->
    Suffix = integer_to_list(erlang:unique_integer([positive])),
    crypto:hash(sha, atom_to_list(Name) ++ Suffix).
%% -----------------------------------------------------------------------------
%% @private
%% @doc Connects to every member and every pending node (ourselves excluded)
%% and prunes the node specifications reported as stale while doing so.
%% Returns the state updated by `prune/2'.
%% @end
%% -----------------------------------------------------------------------------
-spec establish_connections(t()) -> t().

establish_connections(State) ->
    %% Members first, then nodes that are still waiting to join.
    Targets = State#state.members ++ State#state.pending,

    Connect = fun
        (#{name := Name}, Stale) when Name == State#state.name ->
            %% Never connect to ourselves.
            Stale;
        (#{name := _} = NodeSpec, Stale) ->
            %% connect/2 hands back the specs it found to be invalid: a
            %% connection to the same node already exists on another IP
            %% address. Those are dropped from the membership locally, no
            %% leave/join gossip is needed.
            {ok, StaleSpecs} = partisan_peer_service_manager:connect(
                NodeSpec, #{prune => true}
            ),
            %% Collected as a list of lists, flattened once below.
            [StaleSpecs | Stale]
    end,

    StaleLists = lists:foldl(Connect, [], Targets),
    prune(lists:append(StaleLists), State).
%% -----------------------------------------------------------------------------
%% @private
%% @doc Removes the given stale node specifications from the membership by
%% delegating to the membership strategy, and stores the resulting members and
%% strategy state. With nothing to prune the state is returned untouched.
%% @end
%% -----------------------------------------------------------------------------
prune([], State) ->
    State;

prune(StaleSpecs, State) ->
    {ok, Members, StrategyState} = partisan_membership_strategy:prune(
        State#state.membership_strategy,
        StaleSpecs,
        State#state.membership_strategy_state
    ),
    State#state{members = Members, membership_strategy_state = StrategyState}.
%% @private
%% Disconnects from `Nodes' through partisan_peer_service_manager, handing
%% it a callback that fires the `down' notification for a node.
kill_connections(Nodes, State) ->
    partisan_peer_service_manager:disconnect(
        Nodes,
        fun(Node) -> ok = down(Node, State) end
    ).
%% @private
%% Dispatches a protocol message. `From' is the gen_server caller to answer
%% with `ok' once the message is processed, or `undefined' when nobody is
%% waiting. Clause order matters: the acknowledged (5-tuple) forms of
%% `forward_message' are tried before the plain (3-tuple) forms, and the
%% causal variant of each is tried before the generic one.
%%
%% ping: answer with a `pong' carrying the original timestamp.
handle_message(
{ping, SrcNode, DestNode, SrcTime}, From, _Channel, #state{} = State0) ->
%% Establish any new connections.
State = establish_connections(State0),
%% Send ping response.
schedule_self_message_delivery(
SrcNode,
{pong, SrcNode, DestNode, SrcTime},
?DEFAULT_PARTITION_KEY,
State,
%% we coerce channel, regardless of _Channel
#{channel => ?MEMBERSHIP_CHANNEL}
),
maybe_reply(From, ok),
{noreply, State};
%% pong: record the round-trip time (microseconds, per timer:now_diff/2)
%% for DestNode.
handle_message(
{pong, SrcNode, DestNode, SrcTime}, From, _Channel, #state{} = State) ->
%% Compute difference.
DistanceMetrics0 = State#state.distance_metrics,
ArrivalTime = erlang:timestamp(),
Difference = timer:now_diff(ArrivalTime, SrcTime),
?LOG_TRACE(
"Updating distance metric for node ~p => ~p communication: ~p",
[SrcNode, DestNode, Difference]
),
%% Update differences.
DistanceMetrics = maps:put(DestNode, Difference, DistanceMetrics0),
%% Store in pdict.
put(distance_metrics, DistanceMetrics),
maybe_reply(From, ok),
{noreply, State#state{distance_metrics=DistanceMetrics}};
%% membership_strategy: feed the message to the membership strategy, adopt
%% the membership it returns, gossip its outgoing messages and reconcile
%% connections. Stops the server when we are no longer a member.
handle_message(
{membership_strategy, ProtocolMsg}, From, _Channel, #state{} = State0) ->
#state{
members = Members0,
membership_strategy = MStrategy,
membership_strategy_state = MState0
} = State0,
%% Process the protocol message.
{ok, Members, OutgoingMessages, MState} =
partisan_membership_strategy:handle_message(
MStrategy, ProtocolMsg, MState0
),
%% Update users of the peer service.
case Members == Members0 of
true ->
ok;
false ->
partisan_peer_service_events:update(Members)
end,
%% Send outgoing messages.
lists:foreach(
fun({Node, Message}) ->
schedule_self_message_delivery(
Node,
Message,
?DEFAULT_PARTITION_KEY,
State0,
%% we coerce channel, regardless of _Channel
#{channel => ?MEMBERSHIP_CHANNEL}
)
end,
OutgoingMessages
),
State1 = State0#state{
members = Members,
membership_strategy_state = MState
},
%% Recompute which members still lack a connection and which connected
%% nodes are no longer members.
{Pending, LeavingNodes} = pending_leavers(State1),
State2 = State1#state{pending = Pending},
%% Establish any new connections.
State = establish_connections(State2),
%% Connections to leavers are closed asynchronously (see handle_cast/2).
gen_server:cast(?MODULE, {kill_connections, LeavingNodes}),
case lists:member(State#state.node_spec, Members) of
false ->
?LOG_INFO(#{
description => "Shutting down: membership doesn't contain us",
reason => "We've been removed from the cluster."
}),
?LOG_DEBUG(#{
membership => Members
}),
%% Shutdown if we've been removed from the cluster.
%% NOTE(review): From is not answered on this path.
{stop, normal, State};
true ->
maybe_reply(From, ok),
{noreply, State}
end;
%% Causal and acknowledged messages.
%% The acknowledgement is sent before the message is handed over.
handle_message(
{forward_message, SrcNode, MsgClock, ServerRef, Msg},
From,
Channel,
#state{} = State) when is_tuple(Msg) andalso element(1, Msg) == causal ->
{causal, Label, _, _, _, _, _} = Msg,
%% Send message acknowledgement.
send_acknowledgement(SrcNode, Channel, MsgClock, State),
case partisan_causality_backend:is_causal_message(Msg) of
true ->
partisan_causality_backend:receive_message(Label, Msg);
false ->
%% Attempt message delivery.
partisan_peer_service_manager:deliver(ServerRef, Msg)
end,
maybe_reply(From, ok),
{noreply, State};
%% Acknowledged messages.
handle_message(
{forward_message, SrcNode, MsgClock, ServerRef, Msg},
From,
Channel,
#state{} = State) ->
%% Send message acknowledgement.
send_acknowledgement(SrcNode, Channel, MsgClock, State),
partisan_peer_service_manager:deliver(ServerRef, Msg),
maybe_reply(From, ok),
{noreply, State};
%% Causal messages.
%% (not acknowledged: the message carries no source node nor clock)
handle_message(
{forward_message, ServerRef, {causal, Label, _, _, _, _, _} = Msg},
From,
_Channel,
State) ->
case partisan_causality_backend:is_causal_message(Msg) of
true ->
partisan_causality_backend:receive_message(Label, Msg);
false ->
%% Attempt message delivery.
partisan_peer_service_manager:deliver(ServerRef, Msg)
end,
maybe_reply(From, ok),
{noreply, State};
%% Best-effort messages.
%% TODO: Maybe remove me.
handle_message({forward_message, ServerRef, Msg},
From,
_Channel,
State) ->
partisan_peer_service_manager:deliver(ServerRef, Msg),
maybe_reply(From, ok),
{noreply, State};
%% ack: the peer acknowledged the message identified by MsgClock; report
%% it to the acknowledgement backend.
handle_message({ack, MsgClock},
From,
_Channel,
State) ->
partisan_acknowledgement_backend:ack(MsgClock),
maybe_reply(From, ok),
{noreply, State};
%% Unknown message: logged and dropped.
%% NOTE(review): From is not answered on this path.
handle_message(Msg,
_From,
_Channel,
State) ->
?LOG_WARNING(#{description => "Unhandled message", message => Msg}),
{noreply, State}.
%% @private
%% Arms the `distance' timer when `distance_enabled' is set in the
%% configuration (interval: `distance_interval', 10000 ms by default).
%% Returns the timer reference, or `ok' when disabled.
schedule_distance() ->
    case partisan_config:get(distance_enabled, false) of
        false ->
            ok;
        true ->
            Interval = partisan_config:get(distance_interval, 10000),
            erlang:send_after(Interval, ?MODULE, distance)
    end.
%% @private
%% Arms the one second `instrumentation' timer when the `instrumentation'
%% configuration option is exactly `true'; any other value disables it.
schedule_instrumentation() ->
    Enabled = partisan_config:get(instrumentation, false),
    if
        Enabled =:= true ->
            erlang:send_after(1000, ?MODULE, instrumentation);
        true ->
            ok
    end.
%% @private
%% Arms the `periodic' timer when `periodic_enabled' is set in the
%% configuration. The interval is read from `periodic_interval'.
%% Returns the timer reference, or `ok' when disabled.
schedule_periodic() ->
    case partisan_config:get(periodic_enabled, false) of
        false ->
            ok;
        true ->
            Interval = partisan_config:get(
                periodic_interval, ?PERIODIC_INTERVAL
            ),
            erlang:send_after(Interval, ?MODULE, periodic)
    end.
%% @private
%% Arms the `retransmit' timer unconditionally. The interval is read from
%% `retransmit_interval' (1000 ms by default).
schedule_retransmit() ->
    erlang:send_after(
        partisan_config:get(retransmit_interval, 1000),
        ?MODULE,
        retransmit
    ).
%% @private
%% Arms the `connections' timer unconditionally. The interval is read from
%% `connection_interval' (1000 ms by default).
schedule_connections() ->
    erlang:send_after(
        partisan_config:get(connection_interval, 1000),
        ?MODULE,
        connections
    ).
%% @private
%% Hands `Message' to a connection process for `Node' on the channel given in
%% `Options' (default channel when absent).
%%
%% Without a usable connection the outcome depends on the `broadcast'
%% configuration option:
%% - enabled: the message is relayed through the broadcast tree when the
%%   `transitive' option is set, otherwise it is silently dropped (`ok');
%% - disabled: `{error, disconnected | not_yet_connected}' is returned.
do_send_message(Node, PartitionKey, Message, Options, State) ->
    Channel = maps:get(channel, Options, ?DEFAULT_CHANNEL),

    case partisan_peer_connections:dispatch_pid(Node, Channel, PartitionKey) of
        {ok, ConnPid} ->
            gen_server:cast(ConnPid, {send_message, Message});

        {error, Reason} ->
            %% Either the connection was lost or it never existed.
            case partisan_config:get(broadcast, false) of
                false ->
                    case Reason of
                        disconnected ->
                            ?LOG_TRACE(
                                "Node ~p was connected, "
                                "but is now disconnected!",
                                [Node]
                            );
                        not_yet_connected ->
                            ?LOG_TRACE(
                                "Node ~p not yet connected!",
                                [Node]
                            )
                    end,
                    {error, Reason};

                true ->
                    case maps:get(transitive, Options, false) of
                        false ->
                            ok;
                        true ->
                            ?LOG_DEBUG(
                                "Performing tree forward from node ~p "
                                "to node ~p and message: ~p",
                                [State#state.name, Node, Message]
                            ),
                            RelayTTL = partisan_config:get(
                                relay_ttl, ?RELAY_TTL
                            ),
                            do_tree_forward(
                                Node,
                                PartitionKey,
                                Message,
                                Options,
                                RelayTTL,
                                State
                            )
                    end
            end
    end.
%% @private
%% Fires the `up' callbacks registered for the given node (name or node
%% specification). There is no per-channel up/3 counterpart yet.
up(NodeOrSpec, State) ->
    UpFuns = State#state.up_funs,
    apply_funs(NodeOrSpec, UpFuns).
%% @private
%% Fires the `down' callbacks registered for the given node (name or node
%% specification).
down(NodeOrSpec, State) ->
    DownFuns = State#state.down_funs,
    apply_funs(NodeOrSpec, DownFuns).
%% @private
%% Fires the channel-down callbacks registered for the given node.
%% TODO use Channel: for now it is ignored, so the callbacks are not told
%% which channel went down.
down(NodeSpec, _Channel, State) ->
    ChannelDownFuns = State#state.channel_down_funs,
    apply_funs(NodeSpec, ChannelDownFuns).
%% @private
%% Invokes the callbacks that `Mapping' holds for a node: first those stored
%% under the wildcard key '_', then those stored under the node name.
%% Callbacks of arity 0 are called without arguments, callbacks of arity 1
%% receive the node name. Whatever a callback returns or raises is
%% discarded. Accepts a node name or a node specification map.
apply_funs(#{name := Node}, Mapping) ->
    apply_funs(Node, Mapping);

apply_funs(Node, Mapping) when is_atom(Node) ->
    ?LOG_DEBUG(#{
        description => "Node status change notification",
        node => Node,
        funs => Mapping
    }),

    Wildcard = maps:get('_', Mapping, []),
    Specific = maps:get(Node, Mapping, []),

    Notify = fun(F) ->
        case erlang:fun_info(F, arity) of
            {arity, 0} -> catch F();
            {arity, 1} -> catch F(Node)
        end
    end,

    ok = lists:foreach(Notify, Wildcard ++ Specific),
    ok.
%% @private
%% Compares membership with the currently connected node specifications.
%% Returns `{Pending, Leavers}' where `Pending' are the members or pending
%% nodes we are not connected to, and `Leavers' are the connected nodes that
%% are not members (any more).
pending_leavers(#state{} = State) ->
    MemberSet = ?SET_FROM_LIST(State#state.members),
    PendingSet = ?SET_FROM_LIST(State#state.pending),
    ConnectedSet = ?SET_FROM_LIST(partisan_peer_connections:node_specs()),

    %% Everything we want a connection to, minus what we already have.
    Wanted = sets:union(MemberSet, PendingSet),
    StillPending = sets:subtract(Wanted, ConnectedSet),

    %% Everything we are connected to, minus the members.
    Leaving = sets:subtract(ConnectedSet, MemberSet),

    {sets:to_list(StillPending), sets:to_list(Leaving)}.
%% @private
%% Removes `Node' from the membership through the membership strategy,
%% reconciles connections, sends the messages produced by the strategy on the
%% membership channel and publishes the new membership.
%% Returns the updated state.
internal_leave(#{name := Name} = Node, State0) ->
    #state{
        membership_strategy = Strategy,
        membership_strategy_state = StrategyState0
    } = State0,

    ?LOG_DEBUG(#{
        description => "Processing leave",
        leaving_node => Name
    }),

    {ok, Members, Outgoing, StrategyState} =
        partisan_membership_strategy:leave(Strategy, Node, StrategyState0),

    ?LOG_DEBUG(#{
        description => "Processing leave",
        leaving_node => Name,
        outgoing_messages => Outgoing,
        new_membership => Members
    }),

    %% Establishing connections also prunes node specs that are no longer
    %% valid, so the membership in State may differ from Members.
    State = establish_connections(
        State0#state{
            members = Members,
            membership_strategy_state = StrategyState
        }
    ),

    %% Transmit the strategy's outgoing messages, addressed by node name.
    Send = fun({#{name := Peername}, Message}) ->
        schedule_self_message_delivery(
            Peername,
            Message,
            ?DEFAULT_PARTITION_KEY,
            State,
            #{channel => ?MEMBERSHIP_CHANNEL}
        )
    end,
    lists:foreach(Send, Outgoing),

    partisan_peer_service_events:update(State#state.members),

    State.
%% @private
%% Starts joining `NodeSpec': the node is appended to the pending list, the
%% caller `From' (if any) is registered as waiting for the join, and
%% connections are established. Returns the updated state.
internal_join(#{name := Node} = NodeSpec, From, #state{} = State0) ->
    ok = partisan_util:maybe_connect_disterl(Node),

    %% Pause first so that nodes do not all connect at the same time.
    avoid_rush(),

    %% The node stays pending until the connection is reported.
    State1 = State0#state{pending = State0#state.pending ++ [NodeSpec]},
    State2 = maybe_add_sync_join(NodeSpec, From, State1),

    establish_connections(State2).
%% @private
%% Registers `From' as a caller waiting for the join of `NodeSpec' to
%% complete. `sync_joins' maps a node specification to the set of waiting
%% callers. Nothing is registered when there is no caller (`undefined').
maybe_add_sync_join(_, undefined, State) ->
    State;

maybe_add_sync_join(NodeSpec, From, State) ->
    %% Used when NodeSpec has no waiting callers yet.
    Initial = sets:from_list([From], [{version, 2}]),

    SyncJoins = maps:update_with(
        NodeSpec,
        fun(Waiting) -> sets:add_element(From, Waiting) end,
        Initial,
        State#state.sync_joins
    ),

    State#state{sync_joins = SyncJoins}.
%% @private
%% Answers `ok' to every caller waiting on a node that is now fully
%% connected and forgets those nodes. Nodes that are not fully connected yet
%% are kept, together with their waiting callers.
maybe_reply_sync_joins(State) ->
    Remaining = maps:fold(
        fun(#{name := Node} = NodeSpec, Waiting, Acc) ->
            case partisan:is_fully_connected(NodeSpec) of
                false ->
                    %% Not there yet: keep waiting.
                    Acc#{NodeSpec => Waiting};
                true ->
                    ?LOG_DEBUG("Node ~p is now fully connected.", [Node]),
                    lists:foreach(
                        fun(Caller) -> gen_server:reply(Caller, ok) end,
                        sets:to_list(Waiting)
                    ),
                    %% Answered: the entry is dropped.
                    Acc
            end
        end,
        #{},
        State#state.sync_joins
    ),

    State#state{sync_joins = Remaining}.
%% @private
%% Returns a bitstring holding `BitLen' cryptographically strong random
%% bits. Whole bytes are requested and the surplus bits are thrown away.
rand_bits(BitLen) ->
    ByteCount = (BitLen + 7) div 8,
    Random = crypto:strong_rand_bytes(ByteCount),
    <<Bits:BitLen/bits, _Surplus/bits>> = Random,
    Bits.
%% @private
%% Blocks the calling process before it connects, to avoid a rush on
%% connections. The pause is `connection_jitter' milliseconds, or a random
%% value between 1 and `connection_jitter' when the `jitter' option is set.
avoid_rush() ->
    MaxDelay = partisan_config:get(connection_jitter, ?CONNECTION_JITTER),

    Delay =
        case partisan_config:get(jitter, false) of
            true -> rand:uniform(MaxDelay);
            false -> MaxDelay
        end,

    timer:sleep(Delay).
%% @private
%% Relays `Message', destined to `Node', through our out-links: every
%% out-link receives `{relay_message, Node, Message, TTL - 1}'. The out-links
%% are the ones given in the `out_links' option (ourselves removed) or, when
%% absent, the ones returned by `retrieve_outlinks/0'. Always returns `ok'.
do_tree_forward(Node, PartitionKey, Message, Opts, TTL, State) ->
    ?LOG_TRACE(
        "Attempting to forward message ~p from ~p to ~p.",
        [Message, State#state.name, Node]
    ),

    %% User-supplied out-links take precedence over the computed ones.
    OutLinks =
        case maps:get(out_links, Opts, undefined) of
            undefined ->
                try
                    retrieve_outlinks()
                catch
                    _:Reason ->
                        ?LOG_ERROR(#{
                            description => "Outlinks retrieval failed",
                            reason => Reason
                        }),
                        []
                end;
            UserOutLinks ->
                UserOutLinks -- [State#state.name]
        end,

    %% The relayed copy is not transitive: a peer that is not connected to
    %% the destination must not try to forward it again.
    RelayMessage = {relay_message, Node, Message, TTL - 1},
    RelayOpts = maps:without([transitive], Opts),

    Relay = fun(Peer) ->
        ?LOG_TRACE(
            "Forwarding relay message ~p to node ~p "
            "for node ~p from node ~p",
            [Message, Peer, Node, State#state.name]
        ),
        schedule_self_message_delivery(
            Peer,
            RelayMessage,
            PartitionKey,
            State,
            RelayOpts
        )
    end,
    ok = lists:foreach(Relay, OutLinks),

    ok.
%% @private
%% Returns the eager peers that the plumtree broadcast module reports for
%% the tree rooted at this node, without this node itself. Any failure while
%% asking (1000 is passed as the last argument of the request) is logged and
%% yields an empty list.
retrieve_outlinks() ->
    ?LOG_TRACE(#{description => "About to retrieve outlinks..."}),

    Self = partisan:node(),

    Links =
        try
            {Eager, _Lazy} =
                partisan_plumtree_broadcast:debug_get_peers(Self, Self, 1000),
            lists:delete(Self, ordsets:to_list(Eager))
        catch
            _:Reason ->
                ?LOG_INFO(#{
                    description => "Request to get outlinks failed",
                    reason => Reason
                }),
                []
        end,

    ?LOG_TRACE("Finished getting outlinks: ~p", [Links]),

    Links.
%% @private
%% Arms the `tree_refresh' timer when the `broadcast' configuration option
%% is set (interval: `tree_refresh', 1000 ms by default).
%% Returns the timer reference, or `ok' when disabled.
schedule_tree_refresh() ->
    case partisan_config:get(broadcast, false) of
        false ->
            ok;
        true ->
            Interval = partisan_config:get(tree_refresh, 1000),
            erlang:send_after(Interval, ?MODULE, tree_refresh)
    end.
%% @private
%% Same as schedule_self_message_delivery/5 with no options.
schedule_self_message_delivery(Node, Message, PartitionKey, State) ->
    NoOptions = #{},
    schedule_self_message_delivery(
        Node, Message, PartitionKey, State, NoOptions
    ).
%% @private
%% Fires the `forward_message' pre-interposition functions for `Message' and
%% then enqueues a `forward_message' request to ourselves, with no caller and
%% no clock. When the `replaying' configuration option is set this happens in
%% a linked process so that the server is not blocked by the pre-interposition
%% functions; otherwise it happens inline, which keeps messages in order.
%% Always returns `ok'.
schedule_self_message_delivery(Node, Message, PartitionKey, State, Options) ->
    PreFuns = State#state.pre_interposition_funs,

    Request = {
        forward_message,
        undefined,      % from
        Node,
        undefined,      % clock
        PartitionKey,
        ?MODULE,        % ServerRef
        Message,
        Options
    },

    Deliver = fun() ->
        ok = ?FIRE_PRE_INTERPOSITIONS(forward_message, Node, Message, PreFuns),
        %% Pre-interposition is done: hand the message over for delivery.
        gen_server:cast(?MODULE, Request)
    end,

    _ = case partisan_config:get(replaying, false) of
        true ->
            %% Do not block: deliver once pre-interposition returns.
            spawn_link(Deliver);
        false ->
            %% Inline, preserving the serial order of messages.
            Deliver()
    end,

    ok.
%% @private
%% Sends `{ack, MsgClock}' to `Node' on the given channel, using the default
%% partition key.
send_acknowledgement(Node, Channel, MsgClock, State) ->
    Ack = {ack, MsgClock},
    AckOptions = #{channel => Channel},
    schedule_self_message_delivery(
        Node, Ack, ?DEFAULT_PARTITION_KEY, State, AckOptions
    ).
%% @private
%% Answers a gen_server caller, unless there is none (`undefined').
maybe_reply(undefined, _Response) ->
    ok;

maybe_reply(From, Response) ->
    gen_server:reply(From, Response).
%% @private
%% Appends `NodeSpec' to the pending list, but only while it is still a
%% member. Otherwise the state is returned unchanged.
maybe_append_pending(NodeSpec, #state{} = State) ->
    case lists:member(NodeSpec, State#state.members) of
        true ->
            State#state{pending = State#state.pending ++ [NodeSpec]};
        false ->
            State
    end.