%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.
%%%
%%% This module implements the RPCs of the RAFT consensus protocol. See the
%%% RAFT specification at https://raft.github.io/. A wa_raft instance is a
%%% participant in a consensus group. Each participant plays a certain role
%%% (follower, leader, or candidate). The mission of a consensus group is to
%%% implement a replicated state machine in a distributed cluster.
-module(wa_raft_server).
-compile(warn_missing_spec_all).
-behaviour(gen_statem).
-compile({inline, [require_valid_state/1]}).
%%------------------------------------------------------------------------------
%% RAFT Server - OTP Supervision
%%------------------------------------------------------------------------------
-export([
child_spec/1,
start_link/1
]).
%%------------------------------------------------------------------------------
%% RAFT Server - Public APIs - RAFT Cluster Configuration
%%------------------------------------------------------------------------------
-export([
latest_config_version/0
]).
%% Inspection of cluster configuration
-export([
get_config_version/1,
get_config_participants/1,
get_config_members/1,
get_config_full_members/1,
get_config_witness_members/1,
get_config_witnesses/1,
is_data_replica/2,
is_witness/2
]).
%% Creation and modification of cluster configuration
-export([
make_config/0,
make_config/1,
make_config/2,
make_config/3,
normalize_config/1
]).
% Stubbing log entries for witnesses
-export([
stub_entries_for_witness/1
]).
%% Modification of cluster configuration
-export([
set_config_members/2,
set_config_members/3,
set_config_members/4
]).
%%------------------------------------------------------------------------------
%% RAFT Server - Public APIs
%%------------------------------------------------------------------------------
-export([
get_current_config/1
]).
-export([
status/1,
status/2,
membership/1
]).
%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - Local Options
%%------------------------------------------------------------------------------
-export([
default_name/2,
registered_name/2
]).
%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - RPC Handling
%%------------------------------------------------------------------------------
-export([
make_rpc/3,
parse_rpc/2
]).
%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - Commands
%%------------------------------------------------------------------------------
-export([
commit/4,
read/2,
snapshot_available/3,
adjust_config/2,
adjust_config/3,
adjust_config/4,
adjust_membership/3,
adjust_membership/4,
refresh_config/1,
trigger_election/1,
trigger_election/2,
promote/2,
promote/3,
resign/1,
handover/1,
handover/2,
handover_candidates/1,
is_peer_ready/2,
disable/2,
enable/1,
bootstrap/4,
notify_complete/1
]).
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation
%%------------------------------------------------------------------------------
%% General callbacks
-export([
init/1,
callback_mode/0,
terminate/3
]).
%% State-specific callbacks
-export([
stalled/3,
leader/3,
follower/3,
candidate/3,
disabled/3,
witness/3
]).
%%------------------------------------------------------------------------------
%% RAFT Server - Test Exports
%%------------------------------------------------------------------------------
-ifdef(TEST).
-export([
config/1,
compute_member_quorum/2,
leader_adjust_config/2
]).
-endif.
%%------------------------------------------------------------------------------
%% RAFT Server - Public Types
%%------------------------------------------------------------------------------
-export_type([
state/0,
config/0,
config_all/0,
membership/0,
status/0,
config_action/0
]).
%%------------------------------------------------------------------------------
%% RAFT Server - Internal Types
%%------------------------------------------------------------------------------
-export_type([
election_type/0
]).
%%------------------------------------------------------------------------------
-include_lib("wa_raft/include/wa_raft.hrl").
-include_lib("wa_raft/include/wa_raft_logger.hrl").
-include_lib("wa_raft/include/wa_raft_rpc.hrl").
%%------------------------------------------------------------------------------
%% Each macro below expands to a gen_statem `state_timeout` action tuple of the
%% form {state_timeout, Milliseconds, EventContent}.
%%
%% Section 5.2. Randomized election timeout for fast election and to avoid split votes.
%% The delay comes from random_election_timeout/1 and the event content is `election`.
-define(ELECTION_TIMEOUT(State), {state_timeout, random_election_timeout(State), election}).
%% Timeout in milliseconds before the next heartbeat is to be sent by a RAFT leader with no pending log entries.
%% The interval is looked up per application and table; the event content is `heartbeat`.
-define(HEARTBEAT_TIMEOUT(State), {state_timeout, ?RAFT_HEARTBEAT_INTERVAL(State#raft_state.application, State#raft_state.table), heartbeat}).
%% Timeout in milliseconds before the next heartbeat is to be sent by a RAFT leader with pending log entries.
%% The interval is looked up per application and table; the event content is `batch_commit`.
-define(COMMIT_BATCH_TIMEOUT(State), {state_timeout, ?RAFT_COMMIT_BATCH_INTERVAL(State#raft_state.application, State#raft_state.table), batch_commit}).
%%------------------------------------------------------------------------------
%% Logging helpers. Every server log line is prefixed with the server name,
%% the current term, and the state (see ?SERVER_LOG_ARGS for the matching
%% argument order).
-define(SERVER_LOG_PREFIX, "Server[~0p, term ~0p, ~0p] ").
%% Prepends the common prefix to a caller-supplied format string.
-define(SERVER_LOG_FORMAT(Format), ?SERVER_LOG_PREFIX Format).
%% Prepends the prefix arguments (name, term, state) to the caller's arguments.
%% The state is passed through require_valid_state/1, whose spec only accepts state().
-define(SERVER_LOG_ARGS(State, Data, Args), [(Data)#raft_state.name, (Data)#raft_state.current_term, require_valid_state(State) | Args]).
%% For each log level there are two forms:
%%  * the 4-argument form takes the state explicitly;
%%  * the 3-argument form uses ?FUNCTION_NAME as the state, so it is only
%%    meaningful inside a function whose name is a state() (i.e. a state callback).
% elp:ignore W0002 (unused_macro) - Keeping for consistency
-define(SERVER_LOG_ERROR(Data, Format, Args), ?SERVER_LOG_ERROR(?FUNCTION_NAME, Data, Format, Args)).
-define(SERVER_LOG_ERROR(State, Data, Format, Args), ?RAFT_LOG_ERROR(?SERVER_LOG_FORMAT(Format), ?SERVER_LOG_ARGS(State, Data, Args))).
-define(SERVER_LOG_WARNING(Data, Format, Args), ?SERVER_LOG_WARNING(?FUNCTION_NAME, Data, Format, Args)).
-define(SERVER_LOG_WARNING(State, Data, Format, Args), ?RAFT_LOG_WARNING(?SERVER_LOG_FORMAT(Format), ?SERVER_LOG_ARGS(State, Data, Args))).
-define(SERVER_LOG_NOTICE(Data, Format, Args), ?SERVER_LOG_NOTICE(?FUNCTION_NAME, Data, Format, Args)).
-define(SERVER_LOG_NOTICE(State, Data, Format, Args), ?RAFT_LOG_NOTICE(?SERVER_LOG_FORMAT(Format), ?SERVER_LOG_ARGS(State, Data, Args))).
-define(SERVER_LOG_DEBUG(Data, Format, Args), ?SERVER_LOG_DEBUG(?FUNCTION_NAME, Data, Format, Args)).
-define(SERVER_LOG_DEBUG(State, Data, Format, Args), ?RAFT_LOG_DEBUG(?SERVER_LOG_FORMAT(Format), ?SERVER_LOG_ARGS(State, Data, Args))).
%%------------------------------------------------------------------------------
%% RAFT Server - Public Types
%%------------------------------------------------------------------------------
%% Names of the gen_statem states of the RAFT server. Each name has a
%% matching exported state callback of arity 3 in this module.
-type state() ::
stalled |
leader |
follower |
candidate |
disabled |
witness.
%% A term given either as a concrete log term or symbolically (`current`,
%% `next`, or `next` plus a positive offset). Used by the trigger-election
%% and promote commands.
-type term_or_offset() :: wa_raft_log:log_term() | current | next | {next, Offset :: pos_integer()}.
%% A peer is identified by the pair of a name and the node it lives on.
-type peer() :: {Name :: atom(), Node :: node()}.
-type membership() :: [peer()].
%% A fully-populated (normalized) version 1 cluster configuration: all of
%% `participants`, `membership` and `witness` are required keys.
-type config() ::
#{
version := 1,
participants := membership(),
membership := membership(),
witness := membership()
}.
%% Any supported cluster configuration, including partially-populated ones.
%% Use normalize_config/1 to turn a config_all() into a config().
-type config_all() :: config_v1_all().
%% A version 1 cluster configuration in which only `version` is required;
%% the peer lists are optional keys.
-type config_v1_all() ::
#{
version := 1,
participants => membership(),
membership => membership(),
witness => membership()
}.
%% The result type of status/1: a property list of tagged pairs. status/2
%% looks individual keys up in this list with proplists:get_value/3.
-type status() :: [status_element()].
%% The individual {Key, Value} pairs that may appear in a status() list.
-type status_element() ::
{state, state()}
| {id, atom()}
| {peers, [{atom(), {node(), atom()}}]}
| {partition, wa_raft:partition()}
| {partition_path, file:filename_all()}
| {current_term, wa_raft_log:log_term()}
| {voted_for, node()}
| {commit_index, wa_raft_log:log_index()}
| {last_applied, wa_raft_log:log_index()}
| {leader_name, atom() | undefined}
| {leader_id, node() | undefined}
| {pending_high, non_neg_integer()}
| {pending_low, non_neg_integer()}
| {pending_read, boolean()}
| {queued, non_neg_integer()}
| {next_indices, #{node() => wa_raft_log:log_index()}}
| {match_indices, #{node() => wa_raft_log:log_index()}}
| {log_module, module()}
| {log_first, wa_raft_log:log_index()}
| {log_last, wa_raft_log:log_index()}
| {votes, #{node() => boolean()}}
| {inflight_applies, non_neg_integer()}
| {disable_reason, string()}
| {witness, boolean()}
| {config, config()}
| {config_index, wa_raft_log:log_index()}.
%%------------------------------------------------------------------------------
%% RAFT Server - Internal Types
%%------------------------------------------------------------------------------
%% The kind of election being run; exported for use by other modules.
%% NOTE(review): the exact semantics of `normal`, `force` and `allowed` are
%% defined by the election handling code, which is outside this section.
-type election_type() :: normal | force | allowed.
%% Reasons with which an in-flight asynchronous append may be cancelled:
%% either a common acceptor error or a failure of the commit call after submit.
-type async_append_cancel_reason() ::
wa_raft_acceptor:common_error()
| {error, {commit_call_failed_after_submit, term()}}.
%%------------------------------------------------------------------------------
%% RAFT Server - Private Types
%%------------------------------------------------------------------------------
%% Union of everything the state callbacks may receive as event content:
%% RPCs, remote procedure wrappers, local commands, internal events and
%% timeout event names.
-type event() :: rpc() | remote(normalized_procedure()) | command() | internal_event() | timeout_type().
%% RPC messages in either the named format or the legacy format. The legacy
%% format carries no sender name (see parse_rpc/2).
-type rpc() :: rpc_named() | legacy_rpc().
-type legacy_rpc() :: ?LEGACY_RAFT_RPC(atom(), wa_raft_log:log_term(), node(), undefined | tuple()).
-type rpc_named() :: ?RAFT_NAMED_RPC(atom(), wa_raft_log:log_term(), atom(), node(), undefined | tuple()).
%% Local commands; each one is produced by the correspondingly named API
%% function in the "Internal APIs - Commands" / "Public APIs" sections.
-type command() :: commit_command() | read_command() | current_config_command() | status_command() |
trigger_election_command() | promote_command() | resign_command() | adjust_config_command() |
snapshot_available_command() | handover_candidates_command() | handover_command() |
enable_command() | disable_command() | bootstrap_command() | notify_complete_command() |
is_peer_ready_command().
-type commit_command() :: ?COMMIT_COMMAND(gen_server:from(), wa_raft_acceptor:op(), wa_raft_acceptor:priority()).
-type read_command() :: ?READ_COMMAND(wa_raft_acceptor:read_op()).
-type current_config_command() :: ?CURRENT_CONFIG_COMMAND.
-type status_command() :: ?STATUS_COMMAND.
-type trigger_election_command() :: ?TRIGGER_ELECTION_COMMAND(term_or_offset()).
-type promote_command() :: ?PROMOTE_COMMAND(term_or_offset(), boolean()).
-type resign_command() :: ?RESIGN_COMMAND.
% The `From` is `undefined` when the command is sent with gen_statem:call/3
% (adjust_config/3) and a caller-supplied reply target when cast (adjust_config/4).
-type adjust_config_command() :: ?ADJUST_CONFIG_COMMAND(gen_server:from() | undefined, config_action(), wa_raft_log:log_index() | undefined).
-type snapshot_available_command() :: ?SNAPSHOT_AVAILABLE_COMMAND(string(), wa_raft_log:log_pos()).
-type handover_candidates_command() :: ?HANDOVER_CANDIDATES_COMMAND.
% handover/1 and handover/2 may also pass `undefined` as the peer; `undefined`
% is an atom and therefore already covered by node().
-type handover_command() :: ?HANDOVER_COMMAND(node()).
-type enable_command() :: ?ENABLE_COMMAND.
-type disable_command() :: ?DISABLE_COMMAND(term()).
-type bootstrap_command() :: ?BOOTSTRAP_COMMAND(wa_raft_log:log_pos(), config(), dynamic()).
-type notify_complete_command() :: ?NOTIFY_COMPLETE_COMMAND().
-type is_peer_ready_command() :: ?IS_PEER_READY_COMMAND(peer()).
-type internal_event() :: advance_term_event().
-type advance_term_event() :: ?ADVANCE_TERM(wa_raft_log:log_term()).
% Event content of the state timeouts scheduled by ?ELECTION_TIMEOUT and
% ?HEARTBEAT_TIMEOUT.
% NOTE(review): ?COMMIT_BATCH_TIMEOUT schedules a `batch_commit` event that
% is not listed here - confirm whether it should be part of this type.
-type timeout_type() :: election | heartbeat.
%% The action sent by refresh_config/1.
-type refresh_action() :: refresh.
%% Actions that change the membership of a single peer in the cluster
%% configuration.
-type membership_action() ::
%% Add a new peer to the cluster as a voting member or promote an existing
%% non-voting participant to a voting member.
{add, Peer :: peer()} |
%% Add a new peer or existing non-voting witness participant to the
%% cluster as a voting witness member.
{add_witness, Peer :: peer()} |
%% Add a new peer to the cluster as a non-voting participant.
{add_participant, Peer :: peer()} |
%% Promote a non-voting participant to a voting member if the participant
%% is ready. A participant is ready if it would be eligible for a handover
%% if it were a voting member.
{promote_participant_if_ready, Peer :: peer()} |
%% Remove a voting member's membership and participation or a non-voting
%% participant's participation from the cluster.
{remove, Peer :: peer()} |
%% Remove a voting witness member's membership and participation from the
%% cluster.
{remove_witness, Peer :: peer()} |
%% Demote an existing voting member to a non-voting participant or a
%% voting witness member to a non-voting witness participant.
{remove_membership, Peer :: peer()} |
%% Demote an existing voting member or non-voting participant to a voting
%% witness member or non-voting witness participant.
{demote_to_witness, Peer :: peer()}.
%% Any action accepted by adjust_config/2,3,4.
-type config_action() :: refresh_action() | membership_action().
%%------------------------------------------------------------------------------
%% RAFT Server - OTP Supervision
%%------------------------------------------------------------------------------
%% Build the supervisor child specification for a RAFT server that is started
%% with the provided options. The child is `transient` and is given 30 seconds
%% to shut down.
-spec child_spec(Options :: #raft_options{}) -> supervisor:child_spec().
child_spec(Options) ->
    StartMFA = {?MODULE, start_link, [Options]},
    #{
        id => ?MODULE,
        start => StartMFA,
        restart => transient,
        shutdown => 30000,
        modules => [?MODULE]
    }.
%% Start and link a RAFT server, registered locally under the server name
%% given in the options.
-spec start_link(Options :: #raft_options{}) -> gen_statem:start_ret().
start_link(#raft_options{server_name = Name} = Options) ->
    Registration = {local, Name},
    gen_statem:start_link(Registration, ?MODULE, Options, []).
%%------------------------------------------------------------------------------
%% RAFT Server - Public APIs - RAFT Cluster Configuration
%%------------------------------------------------------------------------------
%% The newest cluster configuration format version that this RAFT
%% implementation supports. Every function in this module that creates or
%% modifies a cluster configuration produces a configuration of this version.
-spec latest_config_version() -> pos_integer().
latest_config_version() -> 1.
%% Read the format version stored in a cluster configuration.
-spec get_config_version(Config :: config() | config_all()) -> pos_integer().
get_config_version(#{version := ConfigVersion}) -> ConfigVersion.
%% List the participants of a cluster configuration as identities. When the
%% configuration carries no `participants` key, the members are used instead.
-spec get_config_participants(Config :: config() | config_all()) -> [#raft_identity{}].
get_config_participants(#{version := 1, participants := Peers}) ->
    [#raft_identity{name = PeerName, node = PeerNode} || {PeerName, PeerNode} <- Peers];
get_config_participants(ConfigWithoutParticipants) ->
    get_config_members(ConfigWithoutParticipants).
%% List the members of a cluster configuration as identities. A configuration
%% without a `membership` key has no members.
-spec get_config_members(Config :: config() | config_all()) -> [#raft_identity{}].
get_config_members(#{version := 1, membership := Peers}) ->
    [#raft_identity{name = PeerName, node = PeerNode} || {PeerName, PeerNode} <- Peers];
get_config_members(_ConfigWithoutMembership) ->
    [].
%% List the members of a cluster configuration that are not witnesses. When
%% the configuration lacks either the `membership` or the `witness` key, this
%% falls back to get_config_members/1.
-spec get_config_full_members(Config :: config() | config_all()) -> [#raft_identity{}].
get_config_full_members(#{version := 1, membership := Members, witness := Witnesses}) ->
    FullPeers = lists:subtract(Members, Witnesses),
    [#raft_identity{name = PeerName, node = PeerNode} || {PeerName, PeerNode} <- FullPeers];
get_config_full_members(PartialConfig) ->
    get_config_members(PartialConfig).
%% List the witnesses of a cluster configuration that are also members, in the
%% order in which they appear in the witness list. A configuration lacking
%% either the `membership` or the `witness` key has no witness members.
-spec get_config_witness_members(Config :: config() | config_all()) -> [#raft_identity{}].
get_config_witness_members(#{version := 1, membership := Members, witness := Witnesses}) ->
    % Use the members as the keys of a map for constant-time membership tests.
    MemberSet = maps:from_keys(Members, []),
    [
        #raft_identity{name = PeerName, node = PeerNode}
     || {PeerName, PeerNode} = Peer <- Witnesses, is_map_key(Peer, MemberSet)
    ];
get_config_witness_members(_PartialConfig) ->
    [].
%% List the witnesses of a cluster configuration as identities. A
%% configuration without a `witness` key has no witnesses.
-spec get_config_witnesses(Config :: config() | config_all()) -> [#raft_identity{}].
get_config_witnesses(#{version := 1, witness := Peers}) ->
    [#raft_identity{name = PeerName, node = PeerNode} || {PeerName, PeerNode} <- Peers];
get_config_witnesses(_ConfigWithoutWitnesses) ->
    [].
%% Whether the identity is a participant of the configuration that is not
%% listed as a witness.
-spec is_data_replica(Identity :: #raft_identity{}, Config :: config() | config_all()) -> boolean().
is_data_replica(Identity, Config) ->
    case lists:member(Identity, get_config_participants(Config)) of
        true -> not lists:member(Identity, get_config_witnesses(Config));
        false -> false
    end.
%% Whether the identity is listed as a witness in the configuration.
-spec is_witness(Identity :: #raft_identity{}, Config :: config() | config_all()) -> boolean().
is_witness(Identity, Config) ->
    Witnesses = get_config_witnesses(Config),
    lists:member(Identity, Witnesses).
%% Create an empty cluster configuration: no participants, no members and no
%% witnesses. Because it has no members, the result should not be used as the
%% active configuration of a RAFT cluster.
-spec make_config() -> config().
make_config() ->
    NoPeers = [],
    #{
        version => 1,
        participants => NoPeers,
        membership => NoPeers,
        witness => NoPeers
    }.
%% Create a cluster configuration whose members (and participants) are the
%% provided identities and which has no witnesses.
-spec make_config(Members :: [#raft_identity{}]) -> config().
make_config(Members) ->
    Empty = make_config(),
    set_config_members(Members, Empty).
%% Create a cluster configuration with the provided members and witnesses.
%% The participants are set to the provided members.
-spec make_config(Members :: [#raft_identity{}], Witnesses :: [#raft_identity{}]) -> config().
make_config(Members, Witnesses) ->
    Empty = make_config(),
    set_config_members(Members, Witnesses, Empty).
%% Create a cluster configuration with the provided participants, members and
%% witnesses. Members that are missing from the provided participants are
%% added to the participants of the resulting configuration.
-spec make_config(Participants :: [#raft_identity{}], Members :: [#raft_identity{}], Witnesses :: [#raft_identity{}]) -> config().
make_config(Participants, Members, Witnesses) ->
    Empty = make_config(),
    set_config_members(Participants, Members, Witnesses, Empty).
%% Replace the members of the provided cluster configuration.
%% Afterwards the participants equal the provided members and the
%% configuration has no witnesses.
%% The configuration is upgraded to the latest version.
-spec set_config_members(Members :: [#raft_identity{}], ConfigAll :: config() | config_all()) -> config().
set_config_members(Members, ConfigAll) ->
    NoWitnesses = [],
    set_config_members(Members, NoWitnesses, ConfigAll).
%% Replace the participants, members and witnesses of the provided cluster
%% configuration. Afterwards the participants equal the provided members.
%% The configuration is upgraded to the latest version.
-spec set_config_members(Members :: [#raft_identity{}], Witnesses :: [#raft_identity{}], ConfigAll :: config() | config_all()) -> config().
set_config_members(Members, Witnesses, ConfigAll) ->
    % The members double as the participants.
    Participants = Members,
    set_config_members(Participants, Members, Witnesses, ConfigAll).
%% Replace the participants, members and witnesses of the provided cluster
%% configuration. Each list is stored sorted and without duplicates, and the
%% stored participants always include every member.
%% The configuration is upgraded to the latest version.
-spec set_config_members(Participants :: [#raft_identity{}], Members :: [#raft_identity{}], Witnesses :: [#raft_identity{}], ConfigAll :: config() | config_all()) -> config().
set_config_members(Participants, Members, Witnesses, ConfigAll) ->
    % Convert identities into a sorted, duplicate-free list of {Name, Node} peers.
    ToPeers = fun(Identities) ->
        lists:usort([{PeerName, PeerNode} || #raft_identity{name = PeerName, node = PeerNode} <- Identities])
    end,
    SortedParticipants = ToPeers(Participants),
    SortedMembers = ToPeers(Members),
    SortedWitnesses = ToPeers(Witnesses),
    Normalized = normalize_config(ConfigAll),
    % Both inputs are already sorted and unique, as lists:umerge/2 requires.
    AllParticipants = lists:umerge(SortedParticipants, SortedMembers),
    Normalized#{
        participants => AllParticipants,
        membership => SortedMembers,
        witness => SortedWitnesses
    }.
%% Upgrade a configuration of any supported version to a fully-populated
%% configuration of the latest version.
%% For version 1: a missing `membership` or `witness` defaults to the empty
%% list and a missing `participants` defaults to the membership. Keys other
%% than the four known ones are dropped.
%% Raises `{unsupported_version, Version}` for any other version and
%% `no_version` when the configuration has no version at all.
-spec normalize_config(ConfigAll :: config() | config_all()) -> Config :: config().
normalize_config(#{version := 1} = ConfigV1) ->
    Members = maps:get(membership, ConfigV1, []),
    Participants = maps:get(participants, ConfigV1, Members),
    Witnesses = maps:get(witness, ConfigV1, []),
    #{
        version => 1,
        participants => Participants,
        membership => Members,
        witness => Witnesses
    };
normalize_config(#{version := UnknownVersion}) ->
    % All valid configurations will contain at least their own version; however,
    % we do not know how to handle configurations with newer versions.
    error({unsupported_version, UnknownVersion});
normalize_config(#{}) ->
    error(no_version).
%%------------------------------------------------------------------------------
%% RAFT Server - Public APIs
%%------------------------------------------------------------------------------
%% Ask the RAFT server for its current cluster configuration. This is a
%% synchronous call bounded by the RAFT RPC call timeout.
-spec get_current_config(Server :: gen_statem:server_ref()) -> config().
get_current_config(Server) ->
    Command = ?CURRENT_CONFIG_COMMAND,
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Ask the RAFT server for its full status. This is a synchronous call
%% bounded by the RAFT RPC call timeout.
-spec status(Server :: gen_statem:server_ref()) -> status().
status(Server) ->
    Command = ?STATUS_COMMAND,
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Fetch one status value (when given an atom) or several status values (when
%% given a list of atoms, returned in the same order). Keys that are absent
%% from the status are reported as `undefined`; if the server does not return
%% a non-empty status list then every value is `undefined`.
-spec status
    (Server :: gen_statem:server_ref(), Key :: atom()) -> Value :: dynamic();
    (Server :: gen_statem:server_ref(), Keys :: [atom()]) -> Value :: [dynamic()].
status(Server, Key) when is_atom(Key) ->
    % A single-key lookup always yields exactly one value.
    [Value] = status(Server, [Key]),
    Value;
status(Server, Keys) when is_list(Keys) ->
    Reply = status(Server),
    case Reply of
        [_ | _] ->
            [proplists:get_value(Key, Reply) || Key <- Keys];
        _ ->
            lists:duplicate(length(Keys), undefined)
    end.
%% Fetch the members of the current cluster configuration of the RAFT server.
%% Returns `undefined` when the server's status carries no configuration.
%%
%% The lookup goes through status/2 rather than calling proplists on the
%% result of status/1 directly, so that a status reply that is not a non-empty
%% list is handled the same way as everywhere else in this module (reported as
%% `undefined`) instead of crashing inside proplists.
-spec membership(Service :: gen_statem:server_ref()) -> undefined | [#raft_identity{}].
membership(Service) ->
    case status(Service, config) of
        undefined -> undefined;
        Config -> get_config_members(Config)
    end.
%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - Local Options
%%------------------------------------------------------------------------------
%% Compute the default name of the RAFT server for the provided RAFT
%% partition: `raft_server_<Table>_<Partition>`.
-spec default_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
default_name(Table, Partition) ->
    NameParts = ["raft_server_", atom_to_binary(Table), "_", integer_to_binary(Partition)],
    NameBinary = iolist_to_binary(NameParts),
    % elp:ignore W0023 bounded atom, one per table/partition at startup
    binary_to_atom(NameBinary).
%% Look up the name under which the RAFT server of the provided RAFT partition
%% is registered. Falls back to the default name when no options are
%% registered for the partition.
-spec registered_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
registered_name(Table, Partition) ->
    Registered = wa_raft_part_sup:options(Table, Partition),
    case Registered of
        undefined ->
            default_name(Table, Partition);
        _ ->
            Registered#raft_options.server_name
    end.
%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - RPC Handling
%%------------------------------------------------------------------------------
%% Build a named RPC message for the given procedure, sent on behalf of the
%% given identity in the given term.
-spec make_rpc(Self :: #raft_identity{}, Term :: wa_raft_log:log_term(), Procedure :: normalized_procedure()) -> rpc().
make_rpc(#raft_identity{name = Name, node = Node}, Term, ?PROCEDURE(Procedure, Payload)) ->
    % For compatibility with legacy versions that expect RPCs sent with no arguments to have payload 'undefined' instead of {}.
    WirePayload =
        if
            Payload =:= {} -> undefined;
            true -> Payload
        end,
    ?RAFT_NAMED_RPC(Procedure, Term, Name, Node, WirePayload).
%% Decode a received RPC message into its term, its sender and its procedure.
%% An `undefined` payload is treated as the empty tuple, and the payload is
%% passed through defaultize_payload/2 together with the defaults registered
%% for the procedure in protocol/0.
%% Legacy RPCs carry no sender name; the receiver's own name is used for the
%% sender and the message is then handled like a named RPC.
-spec parse_rpc(Self :: #raft_identity{}, RPC :: rpc()) -> {Term :: wa_raft_log:log_term(), Sender :: #raft_identity{}, Procedure :: procedure()}.
parse_rpc(_, ?RAFT_NAMED_RPC(Key, Term, SenderName, SenderNode, PayloadOrUndefined)) ->
    Payload =
        if
            PayloadOrUndefined =:= undefined -> {};
            true -> PayloadOrUndefined
        end,
    #{Key := ?PROCEDURE(Procedure, Defaults)} = protocol(),
    Sender = #raft_identity{name = SenderName, node = SenderNode},
    FullPayload = defaultize_payload(Defaults, Payload),
    {Term, Sender, ?PROCEDURE(Procedure, FullPayload)};
parse_rpc(#raft_identity{name = Name} = Self, ?LEGACY_RAFT_RPC(Procedure, Term, SenderId, Payload)) ->
    Named = ?RAFT_NAMED_RPC(Procedure, Term, Name, SenderId, Payload),
    parse_rpc(Self, Named).
%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - Commands
%%------------------------------------------------------------------------------
%% Send a commit command for the given operation to the RAFT server without
%% waiting for a reply. `From` is carried in the command.
-spec commit(
    Server :: gen_statem:server_ref(),
    From :: gen_server:from(),
    Op :: wa_raft_acceptor:op(),
    Priority :: wa_raft_acceptor:priority()
) -> ok.
commit(Server, From, Op, Priority) ->
    Command = ?COMMIT_COMMAND(From, Op, Priority),
    gen_statem:cast(Server, Command).
%% Send a read command for the given read operation to the RAFT server
%% without waiting for a reply.
-spec read(
    Server :: gen_statem:server_ref(),
    Op :: wa_raft_acceptor:read_op()
) -> ok.
read(Server, Op) ->
    Command = ?READ_COMMAND(Op),
    gen_statem:cast(Server, Command).
%% Tell the RAFT server that a snapshot at the given position is available
%% under the given root, and wait for the result.
-spec snapshot_available(
    Server :: gen_statem:server_ref(),
    Root :: file:filename(),
    Position :: wa_raft_log:log_pos()
) -> ok | {error, Reason :: term()}.
snapshot_available(Server, Root, Position) ->
    Command = ?SNAPSHOT_AVAILABLE_COMMAND(Root, Position),
    % Use the storage call timeout because this command requires the RAFT
    % server to make a potentially expensive call against the RAFT storage
    % server to complete.
    gen_statem:call(Server, Command, ?RAFT_STORAGE_CALL_TIMEOUT()).
%% Apply a configuration action on the RAFT server without specifying a
%% configuration index (the index is passed as `undefined`).
-spec adjust_config(Server :: gen_statem:server_ref(), Action :: config_action()) ->
    {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
adjust_config(Server, Action) ->
    NoIndex = undefined,
    adjust_config(Server, Action, NoIndex).
%% Apply a configuration action on the RAFT server and wait for the result.
%% The command carries `undefined` as its reply target because the reply is
%% delivered through the synchronous call itself.
-spec adjust_config(
    Server :: gen_statem:server_ref(),
    Action :: config_action(),
    Index :: wa_raft_log:log_index() | undefined
) -> {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
adjust_config(Server, Action, Index) ->
    Command = ?ADJUST_CONFIG_COMMAND(undefined, Action, Index),
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Send a configuration action to the RAFT server without waiting for a
%% reply. `From` is carried in the command as the reply target.
-spec adjust_config(
    Server :: gen_statem:server_ref(),
    From :: gen_server:from(),
    Action :: config_action(),
    Index :: wa_raft_log:log_index() | undefined
) -> ok.
adjust_config(Server, From, Action, Index) ->
    Command = ?ADJUST_CONFIG_COMMAND(From, Action, Index),
    gen_statem:cast(Server, Command).
%% Apply a membership action to the given peer; shorthand for
%% adjust_config/2 with the action `{Action, Peer}`.
-spec adjust_membership(
    Server :: gen_statem:server_ref(),
    Action :: add | add_witness | remove | remove_witness,
    Peer :: peer()
) -> {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
adjust_membership(Server, Action, Peer) ->
    ConfigAction = {Action, Peer},
    adjust_config(Server, ConfigAction).
%% Apply a membership action to the given peer at the given configuration
%% index; shorthand for adjust_config/3 with the action `{Action, Peer}`.
-spec adjust_membership(
    Server :: gen_statem:server_ref(),
    Action :: add | add_witness | remove | remove_witness,
    Peer :: peer(),
    ConfigIndex :: wa_raft_log:log_index() | undefined
) -> {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
adjust_membership(Server, Action, Peer, ConfigIndex) ->
    ConfigAction = {Action, Peer},
    adjust_config(Server, ConfigAction, ConfigIndex).
%% Apply the `refresh` configuration action on the RAFT server.
-spec refresh_config(Server :: gen_statem:server_ref()) ->
    {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
refresh_config(Server) ->
    Action = refresh,
    adjust_config(Server, Action).
%% Request the specified RAFT server to start an election in the next term,
%% i.e. the term after its current term.
-spec trigger_election(Server :: gen_statem:server_ref()) -> ok | {error, Reason :: term()}.
trigger_election(Server) ->
    AfterTerm = current,
    trigger_election(Server, AfterTerm).
%% Request the specified RAFT server to trigger a new election in the term
%% *after* the specified term, and wait for the result.
-spec trigger_election(Server :: gen_statem:server_ref(), Term :: term_or_offset()) -> ok | {error, Reason :: term()}.
trigger_election(Server, Term) ->
    Command = ?TRIGGER_ELECTION_COMMAND(Term),
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Request the specified RAFT server to promote itself to leader of the
%% specified term, without forcing the promotion.
-spec promote(Server :: gen_statem:server_ref(), Term :: term_or_offset()) -> ok | {error, Reason :: term()}.
promote(Server, Term) ->
    Force = false,
    promote(Server, Term, Force).
%% Request the specified RAFT server to promote itself to leader of the
%% specified term, passing along the `Force` flag, and wait for the result.
-spec promote(Server :: gen_statem:server_ref(), Term :: term_or_offset(), Force :: boolean()) -> ok | {error, Reason :: term()}.
promote(Server, Term, Force) ->
    Command = ?PROMOTE_COMMAND(Term, Force),
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Send the resign command to the RAFT server and wait for the result.
-spec resign(Server :: gen_statem:server_ref()) -> ok | {error, Reason :: term()}.
resign(Server) ->
    Command = ?RESIGN_COMMAND,
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Instruct a RAFT leader to attempt a handover to a random handover
%% candidate. The command is sent without waiting for a reply.
-spec handover(Server :: gen_statem:server_ref()) -> ok.
handover(Server) ->
    Command = ?HANDOVER_COMMAND(undefined),
    gen_statem:cast(Server, Command).
%% Instruct a RAFT leader to attempt a handover to the specified peer node and
%% wait for the result.
%% Passing `undefined` as the peer node requests a handover to a random
%% handover candidate.
%% Returns the peer node that the handover was sent to, or an error.
-spec handover(Server :: gen_statem:server_ref(), Peer :: node() | undefined) -> {ok, Peer :: node()} | {error, Reason :: term()}.
handover(Server, Peer) ->
    Command = ?HANDOVER_COMMAND(Peer),
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Ask the RAFT server for the nodes that are currently handover candidates.
-spec handover_candidates(Server :: gen_statem:server_ref()) -> {ok, Candidates :: [node()]} | {error, Reason :: term()}.
handover_candidates(Server) ->
    Command = ?HANDOVER_CANDIDATES_COMMAND,
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Ask the RAFT server whether the given peer is ready. Returns `ok` if so,
%% or an error otherwise.
-spec is_peer_ready(Server :: gen_statem:server_ref(), Peer :: peer()) -> ok | {error, Reason :: term()}.
is_peer_ready(Server, Peer) ->
    Command = ?IS_PEER_READY_COMMAND(Peer),
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Send the disable command with the given reason to the RAFT server and wait
%% for the result.
-spec disable(Server :: gen_statem:server_ref(), Reason :: term()) -> ok | {error, ErrorReason :: atom()}.
disable(Server, Reason) ->
    Command = ?DISABLE_COMMAND(Reason),
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Send the enable command to the RAFT server and wait for the result.
-spec enable(Server :: gen_statem:server_ref()) -> ok | {error, ErrorReason :: atom()}.
enable(Server) ->
    Command = ?ENABLE_COMMAND,
    gen_statem:call(Server, Command, ?RAFT_RPC_CALL_TIMEOUT()).
%% Send the bootstrap command with the given position, configuration and data
%% to the RAFT server and wait for the result. The call is bounded by the
%% storage call timeout rather than the RPC call timeout.
-spec bootstrap(
    Server :: gen_statem:server_ref(),
    Position :: wa_raft_log:log_pos(),
    Config :: config(),
    Data :: dynamic()
) -> ok | {error, Reason :: term()}.
bootstrap(Server, Position, Config, Data) ->
    Command = ?BOOTSTRAP_COMMAND(Position, Config, Data),
    gen_statem:call(Server, Command, ?RAFT_STORAGE_CALL_TIMEOUT()).
%% Send the notify-complete command to the RAFT server without waiting for a
%% reply.
-spec notify_complete(Server :: gen_statem:server_ref()) -> ok.
notify_complete(Server) ->
    Command = ?NOTIFY_COMPLETE_COMMAND(),
    gen_statem:cast(Server, Command).
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Logging
%%------------------------------------------------------------------------------
%% Identity function on states. It exists so that the state argument of the
%% server logging macros is checked against state() through this spec; the
%% function is inlined by a compile directive at the top of the module.
-spec require_valid_state(state()) -> state().
require_valid_state(State) -> State.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - General Callbacks
%%------------------------------------------------------------------------------
%% gen_statem initialization callback for the RAFT server.
%%
%% Opens the storage and the log, builds the initial #raft_state{} from the
%% last position reported by storage, loads the cluster configuration and any
%% durable state, and then picks the initial state:
%%   * `disabled` if a disable reason is set;
%%   * `stalled` if nothing has been applied yet and neither log nor storage
%%     has been bootstrapped;
%%   * otherwise whatever follower_or_witness/1 selects.
%%
%% Returns `{ok, InitialState, Data}` on success, or `{stop, Reason}` when the
%% storage or the log could not be opened.
-spec init(Options :: #raft_options{}) -> gen_statem:init_result(state()).
init(
#raft_options{
application = Application,
table = Table,
partition = Partition,
self = Self,
identifier = Identifier,
database = PartitionPath,
label_module = LabelModule,
distribution_module = DistributionModule,
log_name = Log,
server_name = Name,
storage_name = Storage
} = Options
) ->
% Trap exits so that exit signals from linked processes arrive as messages
% instead of terminating the server.
process_flag(trap_exit, true),
?RAFT_LOG_NOTICE("Server[~0p] starting with options ~0p", [Name, Options]),
% This increases the potential overhead of sending messages to server;
% however, can protect the server from GC overhead
% and other memory-related issues (most notably when receiving log entries
% when undergoing a fast log catchup).
?RAFT_CONFIG(raft_server_message_queue_off_heap, true) andalso
process_flag(message_queue_data, off_heap),
% Open storage and the log. Both providers can reject startup on
% recoverable corruption or unsafe disk state, so keep those as normal
% start failures instead of crashing through a bad pattern match.
case profile_startup_phase(Application, Partition, server_open_storage_and_log, fun() ->
open_storage_and_log(Application, Partition, Storage, Log)
end) of
{ok, Last, View} ->
Now = erlang:monotonic_time(millisecond),
% The commit index, last applied index and current term all start from the
% last position reported by storage.
State0 = #raft_state{
application = Application,
name = Name,
self = Self,
identifier = Identifier,
table = Table,
partition = Partition,
partition_path = PartitionPath,
log_view = View,
queues = wa_raft_queue:queues(Options),
label_module = LabelModule,
last_label = undefined,
distribution_module = DistributionModule,
storage = Storage,
commit_index = Last#raft_log_pos.index,
last_applied = Last#raft_log_pos.index,
current_term = Last#raft_log_pos.term,
state_start_ts = Now
},
State1 = load_config(State0),
rand:seed(exsp, {erlang:monotonic_time(), erlang:time_offset(), erlang:unique_integer()}),
% TODO T246543655 When we have proper error handling for data corruption
% vs. stalled server then handle {error, Reason} type
% returns from load_state.
% Until then, any result other than {ok, _} keeps the state as it was
% before the durable state load.
State2 = case profile_startup_phase(Application, Partition, server_durable_state_load, fun() ->
wa_raft_durable_state:load(State1)
end) of
{ok, NewState} -> NewState;
_ -> State1
end,
% 1. Begin as disabled if a disable reason is set
% 2. Begin as stalled if there is no data
% 3. Begin as witness if configured
% 4. Begin as follower otherwise
InitialState = case State2 of
#raft_state{disable_reason = undefined, last_applied = 0} ->
case bootstrapped_log_or_storage(State2) of
true -> follower_or_witness(State2);
false -> stalled
end;
#raft_state{disable_reason = undefined} -> follower_or_witness(State2);
_ -> disabled
end,
% Publish the starting term and status through wa_raft_info.
wa_raft_info:set_current_term_info(Table, Partition, State2#raft_state.current_term, undefined, undefined),
wa_raft_info:set_status(Table, Partition, InitialState, false, true, true),
{ok, InitialState, State2};
{error, Reason} ->
?RAFT_LOG_ERROR("Server[~0p] failed to open storage/log due to ~0P", [Name, Reason, 30]),
{stop, Reason}
end.
%% Opens the storage provider first and, only if that succeeds, opens the log
%% at the position reported by storage. Each step is run through
%% `profile_startup_phase/4` under its own phase name.
%%
%% Returns `{ok, Last, View}` where `Last` is the position returned by
%% `wa_raft_storage:open/1` and `View` is the view returned by
%% `wa_raft_log:open/2`. The first `{error, Reason}` reported by either step
%% is returned unchanged.
-spec open_storage_and_log(
    Application :: atom(),
    Partition :: wa_raft:partition(),
    Storage :: gen_server:server_ref(),
    Log :: gen_server:server_ref()
) ->
    {ok, wa_raft_log:log_pos(), wa_raft_log:view()} | {error, term()}.
open_storage_and_log(Application, Partition, Storage, Log) ->
    OpenStorage = fun() -> wa_raft_storage:open(Storage) end,
    case profile_startup_phase(Application, Partition, server_storage_open, OpenStorage) of
        {error, StorageReason} ->
            {error, StorageReason};
        {ok, Last} ->
            % The log is opened relative to the position storage is at.
            OpenLog = fun() -> wa_raft_log:open(Log, Last) end,
            case profile_startup_phase(Application, Partition, server_log_open, OpenLog) of
                {error, LogReason} -> {error, LogReason};
                {ok, View} -> {ok, Last, View}
            end
    end.
%% Returns `true` when the index reported by `config_index/1` for the given
%% server data is greater than zero, and `false` otherwise.
-spec bootstrapped_log_or_storage(State :: #raft_state{}) -> boolean().
bootstrapped_log_or_storage(State) ->
    ConfigIndex = config_index(State),
    ConfigIndex > 0.
%% gen_statem callback: one callback function per state (`state_functions`),
%% with state enter calls turned on (`state_enter`).
-spec callback_mode() -> gen_statem:callback_mode_result().
callback_mode() ->
    CallbackMode = state_functions,
    [CallbackMode, state_enter].
%% gen_statem callback run when the server terminates.
%%
%% In order: logs the termination reason, cancels any in-flight asynchronous
%% append and all pending/queued requests, syncs the durable state, and
%% clears this server's entry via `wa_raft_info:clear/3`.
%%
%% Cancelled requests receive `{error, {notify_redirect, Peer}}` when a
%% handover to `Peer` is recorded in the server data, and
%% `{error, not_leader}` otherwise.
-spec terminate(Reason :: term(), State :: state(), Data :: #raft_state{}) -> ok.
terminate(Reason, State, #raft_state{handover = Handover} = Data0) ->
    ?SERVER_LOG_NOTICE(State, Data0, "terminating due to ~0P", [Reason, 20]),
    CancelReason = {error, case Handover of
        {Peer, _, _} -> {notify_redirect, Peer};
        undefined -> not_leader
    end},
    Data1 = cancel_async_append_in_flight(State, CancelReason, Data0),
    cancel_pending_and_queued(CancelReason, Data1),
    % NOTE(review): the durable state is synced from Data0, i.e. the server
    % data as it was before the cancellations above - confirm this is intended.
    wa_raft_durable_state:sync(Data0),
    #raft_state{name = Name, table = Table, partition = Partition} = Data0,
    wa_raft_info:clear(Table, Partition, Name),
    ok.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Procedure Call Marshalling
%%------------------------------------------------------------------------------
%% A macro that destructures the identity record. Its use marks a procedure
%% that still depends on the internal structure of identities and should be
%% refactored to treat identities opaquely.
-define(IDENTITY_REQUIRES_MIGRATION(Name, Node), #raft_identity{name = Name, node = Node}).
%% A procedure call paired with the identity of the peer that sent it.
-type remote(Call) :: ?REMOTE(#raft_identity{}, Call).
%% A procedure call in its generic form: a procedure key and a payload tuple.
-type procedure() :: ?PROCEDURE(atom(), tuple()).
%% The union of all procedure calls in their normalized (fully populated) form.
-type normalized_procedure() :: append_entries() | append_entries_response() | request_pre_vote() | pre_vote() | request_vote() | vote() | handover() | handover_failed() | notify_term().
%% AppendEntries: the first field is the previous log index; the third field
%% is the list of log entries.
%% NOTE(review): the remaining fields look like the previous log term, the
%% leader commit index and a trim index - confirm against the macro definition.
-type append_entries() :: ?APPEND_ENTRIES (wa_raft_log:log_index(), wa_raft_log:log_term(), [wa_raft_log:log_entry() | binary()], wa_raft_log:log_index(), wa_raft_log:log_index()).
%% AppendEntries response: the second field is the success flag. The third
%% field is the follower's match index on success and the index its log ends
%% at on failure. The fourth field is the follower's last applied index, if
%% reported.
-type append_entries_response() :: ?APPEND_ENTRIES_RESPONSE(wa_raft_log:log_index(), boolean(), wa_raft_log:log_index(), wa_raft_log:log_index() | undefined).
%% Pre-vote request, identified by a reference.
-type request_pre_vote() :: ?REQUEST_PRE_VOTE (reference()).
-type pre_vote() :: ?PRE_VOTE (reference(), boolean(), wa_raft_log:log_index(), wa_raft_log:log_term()).
%% Vote request: the first field is the election type.
-type request_vote() :: ?REQUEST_VOTE (election_type(), wa_raft_log:log_index(), wa_raft_log:log_term()).
-type vote() :: ?VOTE (boolean()).
%% Handover: a reference identifying the handover, the previous log index and
%% term, and the log entries to send along with the handover.
-type handover() :: ?HANDOVER (reference(), wa_raft_log:log_index(), wa_raft_log:log_term(), [wa_raft_log:log_entry() | binary()]).
%% Handover failure notice, carrying the reference of the failed handover.
-type handover_failed() :: ?HANDOVER_FAILED (reference()).
%% Term notification; it has no payload fields.
-type notify_term() :: ?NOTIFY_TERM ().
%% Table of the procedures that can be invoked remotely. Each procedure key
%% maps to a procedure whose payload holds the default value of every field;
%% `handle_rpc_normalization/6` uses these defaults to pad or truncate the
%% payload of an incoming RPC to the expected size.
-spec protocol() -> #{atom() => procedure()}.
protocol() ->
    #{
        % Elections
        ?REQUEST_PRE_VOTE => ?REQUEST_PRE_VOTE(undefined),
        ?PRE_VOTE => ?PRE_VOTE(undefined, false, 0, 0),
        ?REQUEST_VOTE => ?REQUEST_VOTE(normal, 0, 0),
        ?VOTE => ?VOTE(false),
        % Log replication
        ?APPEND_ENTRIES => ?APPEND_ENTRIES(0, 0, [], 0, 0),
        ?APPEND_ENTRIES_RESPONSE => ?APPEND_ENTRIES_RESPONSE(0, false, 0, undefined),
        % Leadership handover
        ?HANDOVER => ?HANDOVER(undefined, 0, 0, []),
        ?HANDOVER_FAILED => ?HANDOVER_FAILED(undefined)
    }.
%% Entry point for RPCs in their network formats. Extracts the procedure key,
%% the term, the sender identity and the payload, then hands them to
%% `handle_rpc_impl/8`.
%%
%% Two formats are recognised:
%%  * the named format, which carries both the sender's name and node;
%%  * the legacy format, which carries only a sender id. That id is used as
%%    the node and the sender is given the same name as this server.
%% Anything else is counted, logged and dropped.
-spec handle_rpc(
    Type :: gen_statem:event_type(),
    RPC :: rpc(),
    State :: state(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
handle_rpc(Type, ?RAFT_NAMED_RPC(Procedure, Term, SenderName, SenderNode, Payload) = Event, State, #raft_state{} = Data) ->
    Sender = #raft_identity{name = SenderName, node = SenderNode},
    handle_rpc_impl(Type, Event, Procedure, Term, Sender, Payload, State, Data);
handle_rpc(Type, ?LEGACY_RAFT_RPC(Procedure, Term, SenderId, Payload) = Event, State, #raft_state{name = Name} = Data) ->
    Sender = #raft_identity{name = Name, node = SenderId},
    handle_rpc_impl(Type, Event, Procedure, Term, Sender, Payload, State, Data);
handle_rpc(_, RPC, State, #raft_state{table = Table} = Data) ->
    ?RAFT_COUNT(Table, {'rpc.unrecognized', State}),
    ?SERVER_LOG_NOTICE(State, Data, "receives unknown RPC format ~0P", [RPC, 20]),
    keep_state_and_data.
%% Applies the term-related rules shared by all RPCs before the payload is
%% normalized. The clauses are order-sensitive: the first matching rule wins.
%%
%%  1. An `undefined` payload is replaced with the empty tuple.
%%  2. RPCs from an older term are dropped; unless this server is disabled,
%%     the sender is sent a NotifyTerm in return.
%%  3. Pre-vote requests go straight to normalization, so a newer term in a
%%     pre-vote request never makes this server advance its term.
%%  4. `normal` vote requests are rejected while the leader is still
%%     considered active; otherwise they are reprocessed as `allowed`.
%%  5. Any other RPC from a newer term first advances the term and is then
%%     re-delivered as the same event.
%%  6. NotifyTerm RPCs for the current term are dropped.
%%  7. Everything else continues to normalization.
-spec handle_rpc_impl(
    Type :: gen_statem:event_type(),
    Event :: rpc(),
    Key :: atom(),
    Term :: wa_raft_log:log_term(),
    Sender :: #raft_identity{},
    Payload :: undefined | tuple(),
    State :: state(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
%% [Protocol] Undefined payload should be treated as an empty tuple
handle_rpc_impl(Type, Event, Key, Term, Sender, undefined, State, Data) ->
    handle_rpc_impl(Type, Event, Key, Term, Sender, {}, State, Data);
%% [General Rules] Discard any incoming RPCs with a term older than the current term
handle_rpc_impl(_, _, Key, Term, Sender, _, State, #raft_state{current_term = CurrentTerm} = Data) when Term < CurrentTerm ->
    ?SERVER_LOG_NOTICE(State, Data, "dropping stale ~0p from ~0p with old term ~0p.", [Key, Sender, Term]),
    % A disabled server stays silent; any other state tells the sender that
    % its term is out of date.
    case State of
        disabled -> ok;
        _ -> send_rpc(Sender, ?NOTIFY_TERM(), Data)
    end,
    keep_state_and_data;
%% [PreVote RPC] Pre-vote requests that were not dropped as stale above are
%%               processed without comparing their term any further
handle_rpc_impl(Type, _, ?REQUEST_PRE_VOTE, _, Sender, Payload, State, Data) ->
    handle_rpc_normalization(Type, ?REQUEST_PRE_VOTE, Sender, Payload, State, Data);
%% [RequestVote RPC] RAFT servers should ignore vote requests with reason `normal`
%%                   if it knows about a currently active leader even if the vote
%%                   request has a newer term. A leader is only active if it is
%%                   replicating to peers so we check if we have recently received
%%                   a heartbeat. (4.2.3)
handle_rpc_impl(Type, Event, ?REQUEST_VOTE, Term, Sender, Payload, State, #raft_state{table = Table} = Data) when is_tuple(Payload), element(1, Payload) =:= normal ->
    case is_leader_missing(Data) of
        {false, Delay, AllowedDelay} ->
            % We have gotten a heartbeat recently so drop this vote request.
            % Log this at debug level because we may end up with a lot of these when we have
            % removed a server from the cluster but not yet shut it down.
            ?RAFT_COUNT(Table, 'server.request_vote.drop'),
            ?SERVER_LOG_DEBUG(
                State,
                Data,
                "rejecting normal vote request from ~p because leader was still active ~p ms ago (allowed ~p ms).",
                [Sender, Delay, AllowedDelay]
            ),
            send_rpc(Sender, Term, ?VOTE(false), Data),
            keep_state_and_data;
        {true, _, _} ->
            % We have not gotten a heartbeat from the leader recently so allow this vote request
            % to go through by reraising it with the special 'allowed' election type.
            AllowedPayload = setelement(1, Payload, allowed),
            handle_rpc_impl(Type, Event, ?REQUEST_VOTE, Term, Sender, AllowedPayload, State, Data)
    end;
%% [General Rules] Advance to the newer term and reset state when seeing a newer term in an incoming RPC
handle_rpc_impl(Type, Event, _, Term, _, _, _, #raft_state{current_term = CurrentTerm}) when Term > CurrentTerm ->
    % The term is advanced first; the original event is then delivered again
    % so that it is handled in the new term.
    Actions = [
        {next_event, internal, ?ADVANCE_TERM(Term)},
        {next_event, Type, Event}
    ],
    {keep_state_and_data, Actions};
%% [NotifyTerm RPC] Drop NotifyTerm RPCs with matching term
handle_rpc_impl(_, _, ?NOTIFY_TERM, _, _, _, _, #raft_state{}) ->
    keep_state_and_data;
%% [Protocol] Continue with normalization
handle_rpc_impl(Type, _, Key, _, Sender, Payload, State, Data) when is_tuple(Payload) ->
    handle_rpc_normalization(Type, Key, Sender, Payload, State, Data).
%% Looks up the procedure key in `protocol/0`. For a known key, the payload
%% is brought to the size of the procedure's default payload (missing
%% trailing fields are filled from the defaults, surplus ones are removed)
%% and the resulting procedure call, tagged with its sender, is passed to
%% `handle_procedure/4`. Unknown keys are counted, logged and dropped.
-spec handle_rpc_normalization(
    Type :: gen_statem:event_type(),
    Key :: atom(),
    Sender :: #raft_identity{},
    Payload :: undefined | tuple(),
    State :: state(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
%% [Protocol] Convert any valid remote procedure call to the appropriate local procedure call.
handle_rpc_normalization(Type, Key, Sender, Payload, State, #raft_state{table = Table} = Data) when is_tuple(Payload) ->
    case protocol() of
        #{Key := ?PROCEDURE(Procedure, Defaults)} ->
            Normalized = defaultize_payload(Defaults, Payload),
            Call = ?REMOTE(Sender, ?PROCEDURE(Procedure, Normalized)),
            handle_procedure(Type, Call, State, Data);
        #{} ->
            ?RAFT_COUNT(Table, {'rpc.unknown', State}),
            ?SERVER_LOG_DEBUG(State, Data, "receives unknown RPC type ~0p with payload ~0P", [Key, Payload, 25]),
            keep_state_and_data
    end.
%% Dispatches a normalized remote procedure call by re-raising it as a
%% `next_event` for the current state function to handle.
%%
%% When no leader is recorded yet, an AppendEntries or Handover call makes
%% its sender the recorded leader (and the current term info is refreshed)
%% before the call is re-raised.
-spec handle_procedure(
    Type :: gen_statem:event_type(),
    ProcedureCall :: remote(procedure()),
    State :: state(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
%% [AppendEntries RPC] If we haven't discovered leader for this term, record it
handle_procedure(Type, ?REMOTE(Sender, ?APPEND_ENTRIES(_, _, _, _, _)) = Procedure, State, #raft_state{leader = undefined} = Data) ->
    ?SERVER_LOG_NOTICE(State, Data, "leader is now ~0p.", [identity_to_server(Sender)]),
    DataWithLeader = Data#raft_state{leader = Sender},
    update_current_term_info(State, DataWithLeader),
    Redispatch = {next_event, Type, Procedure},
    {keep_state, DataWithLeader, Redispatch};
%% [Handover][Handover RPC] If we haven't discovered leader for this term, record it
handle_procedure(Type, ?REMOTE(Sender, ?HANDOVER(_, _, _, _)) = Procedure, State, #raft_state{leader = undefined} = Data) ->
    ?SERVER_LOG_NOTICE(State, Data, "leader is now ~0p.", [identity_to_server(Sender)]),
    DataWithLeader = Data#raft_state{leader = Sender},
    update_current_term_info(State, DataWithLeader),
    Redispatch = {next_event, Type, Procedure},
    {keep_state, DataWithLeader, Redispatch};
%% [Protocol] Everything else is re-raised unchanged
handle_procedure(Type, Procedure, _, #raft_state{}) ->
    Redispatch = {next_event, Type, Procedure},
    {keep_state_and_data, Redispatch}.
%% Resizes `Payload` to the size of `Defaults`. A payload that is too short
%% is extended with the values found at the same positions in `Defaults`;
%% a payload that is too long has its trailing elements removed. A payload
%% of the right size is returned unchanged.
-spec defaultize_payload(Defaults :: tuple(), Payload :: tuple()) -> tuple().
defaultize_payload(Defaults, Payload) ->
    Expected = tuple_size(Defaults),
    Actual = tuple_size(Payload),
    defaultize_payload(Defaults, Payload, Expected, Actual).

%% Worker for `defaultize_payload/2`. `Expected` is the target size and
%% `Actual` is the current size of `Payload`; each step moves `Actual` one
%% element closer to `Expected`.
-spec defaultize_payload(tuple(), tuple(), non_neg_integer(), non_neg_integer()) -> tuple().
defaultize_payload(Defaults, Payload, Expected, Actual) when Expected > Actual ->
    Position = Actual + 1,
    Extended = erlang:insert_element(Position, Payload, element(Position, Defaults)),
    defaultize_payload(Defaults, Extended, Expected, Position);
defaultize_payload(Defaults, Payload, Expected, Actual) when Expected < Actual ->
    Truncated = erlang:delete_element(Actual, Payload),
    defaultize_payload(Defaults, Truncated, Expected, Actual - 1);
defaultize_payload(_, Payload, Size, Size) ->
    Payload.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Stalled State
%%------------------------------------------------------------------------------
%% The stalled state is an extension to the RAFT protocol designed to handle
%% situations in which a replica of the FSM is lost or replaced within a RAFT
%% cluster without being removed from the cluster membership. As the replica of
%% the FSM stored before the machine was lost or replaced could have been a
%% critical member of a quorum, it is important to ensure that the replacement
%% does not support a different result for any quorums before it receives a
%% fresh copy of the FSM state and log that is guaranteed to reflect any
%% quorums that the machine supported before it was lost or replaced.
%%
%% This is achieved by preventing a stalled node from participating in quorum
%% for both log entries and election. A leader of the cluster must provide a
%% fresh copy of its FSM state before the stalled node can return to normal
%% operation.
%%------------------------------------------------------------------------------
%% State function for the `stalled` state.
%%
%% Summary of the events handled here:
%%  * state enter: count, log when coming from a different state, and run
%%    `enter_state/2`;
%%  * term advancement requests for a term newer than the current one;
%%  * RPCs in network format, which are forwarded to `handle_rpc/4`;
%%  * AppendEntries, which is always answered with a failure response;
%%  * election trigger and promotion commands, which are refused with
%%    `{error, invalid_state}`;
%%  * bootstrap, accepted only while nothing has been applied yet;
%%  * snapshot availability, accepted when the snapshot is newer than what
%%    has been applied or when nothing has been applied yet;
%%  * any other RAFT command, which is forwarded to `command/4`;
%%  * completion / 'DOWN' notifications of asynchronous log appends;
%%  * everything else, which is logged and ignored.
-spec stalled
(
Type :: enter,
LastState :: state(),
Data :: #raft_state{}
) -> gen_statem:state_enter_result(state(), #raft_state{});
(
Type :: gen_statem:event_type(),
Event :: event(),
Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
stalled(enter, LastState, #raft_state{table = Table} = State) ->
?RAFT_COUNT(Table, 'stalled.enter'),
% Only log the transition when actually arriving from a different state.
?FUNCTION_NAME =/= LastState andalso
?SERVER_LOG_NOTICE(State, "becomes stalled from state ~0p.", [LastState]),
{keep_state, enter_state(?FUNCTION_NAME, State)};
%% [Internal] Advance to newer term when requested
stalled(internal, ?ADVANCE_TERM(NewTerm), #raft_state{table = Table, current_term = CurrentTerm} = State) when NewTerm > CurrentTerm ->
?RAFT_COUNT(Table, 'stalled.advance_term'),
?SERVER_LOG_NOTICE(State, "advancing to new term ~0p.", [NewTerm]),
% `repeat_state` stays in `stalled` but runs the state enter call again.
{repeat_state, advance_term(?FUNCTION_NAME, NewTerm, undefined, State)};
%% [Protocol] Parse any RPCs in network formats
stalled(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, Event) =:= rpc ->
handle_rpc(Type, Event, ?FUNCTION_NAME, State);
%% [AppendEntries RPC] Stalled nodes always discard heartbeats
stalled(_, ?REMOTE(Sender, ?APPEND_ENTRIES(PrevLogIndex, _, _, _, _)), #raft_state{} = State) ->
% Record the current monotonic time in `last_quorum_ts`, then answer with a
% failure response that reports 0 for both remaining indices.
NewState = State#raft_state{last_quorum_ts = erlang:monotonic_time(millisecond)},
send_rpc(Sender, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, false, 0, 0), NewState),
{keep_state, NewState};
%% [Election] A stalled node refuses to start an election
stalled({call, From}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{} = State) ->
?SERVER_LOG_WARNING(State, "cannot start an election.", []),
{keep_state_and_data, {reply, From, {error, invalid_state}}};
%% [Promotion] A stalled node refuses to be promoted to leader
stalled({call, From}, ?PROMOTE_COMMAND(_, _), #raft_state{} = State) ->
?SERVER_LOG_WARNING(State, "cannot be promoted to leader.", []),
{keep_state_and_data, {reply, From, {error, invalid_state}}};
%% [Bootstrap] Create and load an empty snapshot at the requested position,
%%             then leave the stalled state. Only allowed while nothing has
%%             been applied yet.
stalled(
{call, From},
?BOOTSTRAP_COMMAND(#raft_log_pos{index = Index, term = Term} = Position, Config, Data),
#raft_state{
self = Self,
partition_path = PartitionPath,
storage = Storage,
current_term = CurrentTerm,
last_applied = LastApplied
} = State0
) ->
case LastApplied =:= 0 of
true ->
?SERVER_LOG_NOTICE(State0, "attempting bootstrap at ~0p:~0p with config ~0p and data ~0P.", [Index, Term, Config, Data, 30]),
% Temporary snapshot location; it is removed again in the `after` section.
Path = filename:join(PartitionPath, io_lib:format("snapshot.~0p.~0p.bootstrap.tmp", [Index, Term])),
try
ok = wa_raft_storage:make_empty_snapshot(Storage, Path, Position, Config, Data),
State1 = open_snapshot(Path, Position, State0),
% The term used after bootstrapping is never lower than 1.
AdjustedTerm = max(1, Term),
case AdjustedTerm > CurrentTerm of
true ->
case is_single_member(Self, config(State1)) of
true ->
% Sole member of the configuration: advance the term with the
% local node recorded and go straight to leader.
State2 = advance_term(?FUNCTION_NAME, AdjustedTerm, node(), State1),
?SERVER_LOG_NOTICE(State2, "switching to leader as sole member after successful bootstrap.", []),
{next_state, leader, State2, {reply, From, ok}};
false ->
State2 = advance_term(?FUNCTION_NAME, AdjustedTerm, undefined, State1),
?SERVER_LOG_NOTICE(State2, "switching to follower after successful bootstrap.", []),
{next_state, follower_or_witness(State2), State2, {reply, From, ok}}
end;
false ->
% The current term is already at least the bootstrap term, so it is kept.
?SERVER_LOG_NOTICE(State1, "switching to follower after successful bootstrap.", []),
{next_state, follower_or_witness(State1), State1, {reply, From, ok}}
end
catch
_:Reason ->
% Any failure leaves the server stalled with its data unchanged.
?SERVER_LOG_WARNING(State0, "failed to bootstrap due to ~0P.", [Reason, 20]),
{keep_state_and_data, {reply, From, {error, Reason}}}
after
% Best-effort removal of the temporary snapshot; failures are ignored.
try file:del_dir_r(Path)
catch _:_ -> ok
end
end;
false ->
?SERVER_LOG_NOTICE(State0, "at ~0p rejecting request to bootstrap with data.", [LastApplied]),
{keep_state_and_data, {reply, From, {error, rejected}}}
end;
%% [Snapshot Available] Load the offered snapshot when it is newer than what
%%                      has been applied, or when nothing has been applied yet
stalled(
Type,
?SNAPSHOT_AVAILABLE_COMMAND(Root, #raft_log_pos{index = SnapshotIndex, term = SnapshotTerm} = SnapshotPos),
#raft_state{
current_term = CurrentTerm,
last_applied = LastApplied
} = State0
) ->
case SnapshotIndex > LastApplied orelse LastApplied =:= 0 of
true ->
try
?SERVER_LOG_NOTICE(State0, "applying snapshot at ~0p:~0p.", [SnapshotIndex, SnapshotTerm]),
State1 = open_snapshot(Root, SnapshotPos, State0),
% Only move the term forward if the snapshot's term is newer than ours.
State2 = case SnapshotTerm > CurrentTerm of
true -> advance_term(?FUNCTION_NAME, SnapshotTerm, undefined, State1);
false -> State1
end,
% At this point, we assume that we received some cluster membership configuration from
% our peer so it is safe to transition to an operational state.
reply(Type, ok),
{next_state, follower_or_witness(State2), State2}
catch
_:Reason ->
?SERVER_LOG_WARNING(State0, "failed to load available snapshot ~0p due to ~0P", [Root, Reason, 20]),
reply(Type, {error, Reason}),
keep_state_and_data
end;
false ->
?SERVER_LOG_NOTICE(State0, "at ~0p ignoring old snapshot at ~0p:~0p", [LastApplied, SnapshotIndex, SnapshotTerm]),
reply(Type, {error, rejected}),
keep_state_and_data
end;
%% [Command] Defer to common handling for generic RAFT server commands
stalled(Type, ?RAFT_COMMAND(_, _) = Event, #raft_state{} = State) ->
command(?FUNCTION_NAME, Type, Event, State);
%% [Async Log Append]
%% A stale completion can arrive after a leader stepped down. Since the client
%% request is cancelled, rollback any local append bytes so they cannot replay
%% as an unacknowledged write after restart.
stalled(info, {async_log_append_complete, Ref, Result}, #raft_state{} = State0) ->
State1 = complete_async_log_append_after_leadership_loss(?FUNCTION_NAME, Ref, Result, State0),
{keep_state, State1};
%% [Async Log Append] A monitored process has gone down; the notification is
%% handed to the async append bookkeeping.
stalled(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = State0) ->
State1 = complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, State0),
{keep_state, State1};
%% [Fallback] Report unhandled events
stalled(Type, Event, #raft_state{} = State) ->
?SERVER_LOG_WARNING(State, "did not know how to handle ~0p event ~0P", [Type, Event, 20]),
keep_state_and_data.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Leader State
%%------------------------------------------------------------------------------
%% In a RAFT cluster, the leader of a RAFT term is a replica that has received
%% a quorum of votes from the cluster in that RAFT term, establishing it as the
%% unique coordinator for that RAFT term. The leader is responsible for
%% accepting and replicating new log entries to progress the state of the FSM.
%%------------------------------------------------------------------------------
-spec leader
(
Type :: enter,
LastState :: state(),
Data :: #raft_state{}
) -> gen_statem:state_enter_result(state(), #raft_state{});
(
Type :: gen_statem:event_type(),
Event :: event(),
Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
leader(enter, LastState, #raft_state{table = Table, log_view = View} = State0) ->
?RAFT_COUNT(Table, 'leader.enter'),
?FUNCTION_NAME =/= LastState andalso
?SERVER_LOG_NOTICE(State0, "becomes leader from state ~0p.", [LastState]),
% Setup leader state
State1 = enter_state(?FUNCTION_NAME, State0),
% Attempt to refresh the label state as necessary for new log entries
LastLogIndex = wa_raft_log:last_index(View),
State2 = case wa_raft_log:get(View, LastLogIndex) of
{ok, {_, {_, LastLabel, _}}} ->
State1#raft_state{last_label = LastLabel};
{ok, {_, undefined}} ->
% The RAFT log could have been reset (i.e. after snapshot installation).
% In such case load the log label state from storage.
LastLabel = load_label_state(State1),
State1#raft_state{last_label = LastLabel};
{ok, _} ->
State1#raft_state{last_label = undefined}
end,
% At the start of a new term, the leader should append at least one log
% entry to start the process of establishing the first quorum in the new
% term.
State3 = State2#raft_state{first_current_term_log_index = LastLogIndex + 1},
State4 = case has_pending_commits(State3) of
true ->
% If there are pending commits, then use those for the initial
% heartbeat. There is potential risk that the log may reject
% these initial commits; leaving the leader with no log entry
% in the current term; however, this should be a rare occurrence.
State3;
false ->
% Otherwise, we should explicitly add a `noop` so that there is
% something to establish quorum on.
{LogEntry, NewState3} = make_log_entry({make_ref(), noop}, State3),
{ok, NewView} = wa_raft_log:append(View, [LogEntry]),
NewState3#raft_state{log_view = NewView}
end,
% Perform initial heartbeat and log entry resolution
State5 = append_entries_to_followers(State4),
State6 = update_quorum_ts(State5),
State7 = apply_single_node_cluster(State6),
{keep_state, State7, ?HEARTBEAT_TIMEOUT(State7)};
%% [Internal] Advance to newer term when requested
leader(
internal,
?ADVANCE_TERM(NewTerm),
#raft_state{table = Table, current_term = CurrentTerm} = State0
) when NewTerm > CurrentTerm ->
?RAFT_COUNT(Table, 'leader.advance_term'),
?SERVER_LOG_NOTICE(State0, "advancing to new term ~0p.", [NewTerm]),
State1 = advance_term(?FUNCTION_NAME, NewTerm, undefined, State0),
{next_state, follower_or_witness(State1), State1};
%% [Protocol] Parse any RPCs in network formats
leader(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, Event) =:= rpc ->
handle_rpc(Type, Event, ?FUNCTION_NAME, State);
%% [AppendEntries RPC] Leaders should not act upon any incoming heartbeats (5.1, 5.2)
leader(_, ?REMOTE(_, ?APPEND_ENTRIES(_, _, _, _, _)), #raft_state{}) ->
keep_state_and_data;
%% [Leader] Handle AppendEntries RPC responses (5.2, 5.3, 7).
%% Handle normal-case successes
leader(
cast,
?REMOTE(
?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender,
?APPEND_ENTRIES_RESPONSE(_, true, FollowerMatchIndex, FollowerLastAppliedIndex)
),
#raft_state{
table = Table,
commit_index = CommitIndex,
next_indices = NextIndices,
match_indices = MatchIndices,
last_applied_indices = LastAppliedIndices,
first_current_term_log_index = TermStartIndex
} = State0
) ->
StartT = erlang:monotonic_time(microsecond),
?SERVER_LOG_DEBUG(State0, "at commit index ~0p completed append to ~0p whose log now matches up to ~0p.",
[CommitIndex, Sender, FollowerMatchIndex]),
State1 = update_heartbeat_reply_ts(FollowerId, State0),
NextIndex = maps:get(FollowerId, NextIndices, TermStartIndex),
NewMatchIndices = MatchIndices#{FollowerId => FollowerMatchIndex},
NewNextIndices = NextIndices#{FollowerId => max(NextIndex, FollowerMatchIndex + 1)},
NewLastAppliedIndices = case FollowerLastAppliedIndex of
undefined -> LastAppliedIndices;
_ -> LastAppliedIndices#{FollowerId => FollowerLastAppliedIndex}
end,
State2 = State1#raft_state{
next_indices = NewNextIndices,
match_indices = NewMatchIndices,
last_applied_indices = NewLastAppliedIndices
},
State3 = leader_advance_commit_index(State2),
State4 = leader_apply_log(State3),
?RAFT_GATHER(Table, 'leader.apply.func', erlang:monotonic_time(microsecond) - StartT),
{keep_state, maybe_heartbeat(State4), ?HEARTBEAT_TIMEOUT(State4)};
%% and failures.
leader(
cast,
?REMOTE(
?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender,
?APPEND_ENTRIES_RESPONSE(_, false, FollowerEndIndex, FollowerLastAppliedIndex)
),
#raft_state{
table = Table,
commit_index = CommitIndex,
next_indices = NextIndices,
last_applied_indices = LastAppliedIndices
} = State0
) ->
?RAFT_COUNT(Table, 'leader.append.failure'),
?SERVER_LOG_DEBUG(State0, "at commit index ~0p failed append to ~0p whose log now ends at ~0p.",
[CommitIndex, Sender, FollowerEndIndex]),
State1 = update_heartbeat_reply_ts(FollowerId, State0),
% Check to see if we should request a snapshot due to this failure
leader_maybe_request_snapshot(FollowerId, FollowerEndIndex, FollowerLastAppliedIndex, State1),
% We must trust the follower's last log index here because the follower may have
% applied a snapshot since the last successful heartbeat. In such case, we need
% to fast-forward the follower's next index so that we resume replication at the
% point after the snapshot.
NewNextIndices = NextIndices#{FollowerId => FollowerEndIndex + 1},
NewLastAppliedIndices = case FollowerLastAppliedIndex of
undefined -> LastAppliedIndices;
_ -> LastAppliedIndices#{FollowerId => FollowerLastAppliedIndex}
end,
State2 = State1#raft_state{
next_indices = NewNextIndices,
last_applied_indices = NewLastAppliedIndices
},
State3 = leader_apply_log(State2),
{keep_state, maybe_heartbeat(State3), ?HEARTBEAT_TIMEOUT(State3)};
%% [RequestPreVote RPC] Respond to pre-vote
leader(_, ?REMOTE(Candidate, ?REQUEST_PRE_VOTE(Ref)), #raft_state{} = Data) ->
handle_request_pre_vote(Candidate, Ref, Data);
%% [PreVote RPC] Pre-votes are only useful to candidates
leader(_, ?REMOTE(_, ?PRE_VOTE(_, _, _, _)), #raft_state{}) ->
keep_state_and_data;
%% [RequestVote RPC] Reject any vote requests as leadership is already established (5.1, 5.2)
leader(_, ?REMOTE(Sender, ?REQUEST_VOTE(_, _, _)), #raft_state{} = Data) ->
send_rpc(Sender, ?VOTE(false), Data),
keep_state_and_data;
%% [Vote RPC] Votes are only useful to candidates
leader(_, ?REMOTE(_, ?VOTE(_)), #raft_state{}) ->
keep_state_and_data;
%% [Handover][Handover RPC] We are already leader so ignore any handover requests.
leader(_, ?REMOTE(Sender, ?HANDOVER(Reference, _, _, _)), #raft_state{} = State) ->
send_rpc(Sender, ?HANDOVER_FAILED(Reference), State),
keep_state_and_data;
%% [Handover][HandoverFailed RPC] Our handover failed, so clear the handover status.
leader(
_,
?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, NodeId) = Sender, ?HANDOVER_FAILED(Reference)),
#raft_state{
handover = {NodeId, Reference, _}
} = State
) ->
?SERVER_LOG_NOTICE(State, "resuming normal operations after failed handover to ~0p.", [Sender]),
{keep_state, State#raft_state{handover = undefined}};
%% [Handover][HandoverFailed RPC] Got a handover failed with an unknown ID. Ignore.
leader(_, ?REMOTE(_, ?HANDOVER_FAILED(_)), #raft_state{}) ->
keep_state_and_data;
%% [Timeout] Suspend periodic heartbeat to followers while handover is active
leader(state_timeout = Type, Event, #raft_state{handover = {Peer, _, Timeout}} = State) ->
NowMillis = erlang:monotonic_time(millisecond),
case NowMillis > Timeout of
true ->
?SERVER_LOG_NOTICE(State, "handover to ~0p times out.", [Peer]),
{keep_state, State#raft_state{handover = undefined}, {next_event, Type, Event}};
false ->
update_status(?FUNCTION_NAME, State),
{keep_state_and_data, ?HEARTBEAT_TIMEOUT(State)}
end;
%% [Timeout] Periodic heartbeat to followers
leader(state_timeout, _, #raft_state{table = Table} = State0) ->
case leader_eligible(State0) of
eligible ->
State1 = append_entries_to_followers(State0),
State2 = apply_single_node_cluster(State1),
update_status(?FUNCTION_NAME, State2),
{keep_state, State2, ?HEARTBEAT_TIMEOUT(State2)};
Reason ->
?RAFT_COUNT(Table, {'leader.resign', Reason}),
?SERVER_LOG_NOTICE(State0, "resigns from leadership because this node is ~0p.", [Reason]),
{next_state, follower_or_witness(State0), State0}
end;
%% [Commit]
%% If a handover is in progress, reject the commit immediately with a
%% notify_redirect error so the client can redirect to the new leader.
%% Otherwise, add the commit to the pending list and append if enough
%% have accumulated.
leader(
cast,
?COMMIT_COMMAND(From, _Op, Priority),
#raft_state{
table = Table,
queues = Queues,
handover = {Peer, _, _}
} = _State
) ->
?RAFT_COUNT(Table, 'commit.rejected.handover'),
wa_raft_queue:commit_cancelled(Queues, From, {error, {notify_redirect, Peer}}, Priority),
keep_state_and_data;
leader(
cast,
?COMMIT_COMMAND(From, Op, Priority),
#raft_state{application = App, table = Table} = State0
) ->
% No size limit is imposed here as the pending queue cannot grow larger
% than the limit on the number of pending commits.
?RAFT_COUNT(Table, {'commit', Priority}),
HadPending = has_pending_commits(State0),
State1 = add_pending(From, Op, Priority, State0),
% Single-member leaders can apply their own log immediately, but doing it
% before this batching gate defeats raft_commit_batch_interval_ms and turns
% every standalone durable write into its own fsync. Keep the command
% pending until the same batch window/max-count policy used by multi-member
% leaders decides to flush. Arm the window only for the first pending
% commit. Re-arming it on every commit lets continuous sub-max traffic
% postpone the append until the stream goes idle.
PendingCount = pending_count(State1),
case ?RAFT_COMMIT_BATCH_INTERVAL(App, Table) > 0 andalso PendingCount =< ?RAFT_COMMIT_BATCH_MAX_ENTRIES(App, Table) of
true ->
?RAFT_COUNT(Table, 'commit.batch.delay'),
case HadPending of
true ->
{keep_state, State1};
false ->
{keep_state, State1, ?COMMIT_BATCH_TIMEOUT(State1)}
end;
false ->
State2 = append_entries_to_followers(State1),
State3 = apply_single_node_cluster(State2),
{keep_state, State3, ?HEARTBEAT_TIMEOUT(State3)}
end;
%% [Strong Read]
%% If a handover is in progress, reject the read immediately with a
%% notify_redirect error so the client can redirect to the new leader.
leader(
cast,
?READ_COMMAND({From, _}),
#raft_state{
table = Table,
queues = Queues,
handover = {Peer, _, _}
} = _State
) ->
?RAFT_COUNT(Table, 'read.rejected.handover'),
wa_raft_queue:fulfill_incomplete_read(Queues, From, {error, {notify_redirect, Peer}}),
keep_state_and_data;
leader(
cast,
?READ_COMMAND({From, Command}),
#raft_state{
self = Self,
queues = Queues,
storage = Storage,
commit_index = CommitIndex,
last_applied = LastApplied,
pending_high = PendingHigh,
pending_low = PendingLow,
first_current_term_log_index = FirstLogIndex
} = State0
) ->
ReadIndex = max(CommitIndex, FirstLogIndex),
case is_single_member(Self, config(State0)) of
% If we are a single node cluster and we are fully-applied, then immediately dispatch.
true when PendingHigh =:= [], PendingLow =:= [], ReadIndex =< LastApplied ->
wa_raft_storage:apply_read(Storage, From, Command),
{keep_state, State0};
_ ->
ok = wa_raft_queue:submit_read(Queues, ReadIndex, From, Command),
% Regardless of whether or not the read index is an existing log entry, indicate that
% a read is pending as the leader must establish a new quorum to be able to serve the
% read request.
{keep_state, State0#raft_state{pending_read = true}}
end;
%% [Resign] Leader resigns by switching to follower state.
leader({call, From}, ?RESIGN_COMMAND, #raft_state{} = State) ->
?SERVER_LOG_NOTICE(State, "resigns.", []),
{next_state, follower_or_witness(State), State, {reply, From, ok}};
%% [Adjust Membership] Leader attempts to commit a single-node membership change.
leader(Type, ?ADJUST_CONFIG_COMMAND(From, Action, Index), #raft_state{queues = Queues} = State0) ->
maybe
ok ?= leader_config_change_allowed(Index, State0),
{ok, NewConfig} ?= leader_adjust_config(Action, State0),
% With all checks completed, we can now attempt to append the new
% configuration to the log. If successful, a round of heartbeats is
% immediately started to replicate the change as soon as possible.
{ok, #raft_log_pos{index = NewConfigIndex} = NewConfigPosition, State1} ?=
leader_change_config(NewConfig, From, State0),
?SERVER_LOG_NOTICE(
State1,
"is attempting to change configuration from ~0p to ~0p at ~0p.",
[config(State0), NewConfig, NewConfigIndex]
),
State2 = apply_single_node_cluster(State1),
State3 = append_entries_to_followers(State2),
reply(Type, {ok, NewConfigPosition}),
{keep_state, State3, ?HEARTBEAT_TIMEOUT(State3)}
else
{error, Reason} ->
?SERVER_LOG_NOTICE(
State0,
"failed to apply action ~0p to current configuration ~0p due to ~0P.",
[Action, config(State0), Reason, 20]
),
From =/= undefined andalso
wa_raft_queue:commit_completed(Queues, From, {error, Reason}, high),
reply(Type, {error, Reason}),
keep_state_and_data
end;
%% [Handover Candidates] Return list of handover candidates (peers that are not lagging too much)
leader({call, From}, ?HANDOVER_CANDIDATES_COMMAND, #raft_state{} = State) ->
{keep_state_and_data, {reply, From, {ok, get_handover_candidates(State)}}};
%% [Is Peer Ready] Check if participant is caught up
leader({call, From}, ?IS_PEER_READY_COMMAND(Peer), #raft_state{} = State) ->
Config = config(State),
IsParticipant = lists:member(Peer, config_participants(Config)),
IsReady = is_eligible_for_handover(Peer, State),
Result =
if
not IsParticipant -> {error, not_a_participant};
not IsReady -> {error, not_ready};
true -> ok
end,
{keep_state_and_data, {reply, From, Result}};
%% [Handover] With peer 'undefined' randomly select a valid candidate to handover to
leader(Type, ?HANDOVER_COMMAND(undefined), #raft_state{} = State) ->
case get_handover_candidates(State) of
[] ->
?SERVER_LOG_NOTICE(State, "has no valid peer to handover to.", []),
reply(Type, {error, no_valid_peer}),
keep_state_and_data;
Candidates ->
Peer = lists:nth(rand:uniform(length(Candidates)), Candidates),
leader(Type, ?HANDOVER_COMMAND(Peer), State)
end;
%% [Handover] Handover to self results in no-op
leader(Type, ?HANDOVER_COMMAND(Peer), #raft_state{} = State) when Peer =:= node() ->
?SERVER_LOG_WARNING(State, "dropping handover to self.", []),
reply(Type, {ok, Peer}),
{keep_state, State};
%% [Handover] Attempt to start a handover to the specified peer
leader(
Type,
?HANDOVER_COMMAND(Peer),
#raft_state{
application = App,
table = Table,
name = Name,
log_view = View,
match_indices = MatchIndices,
handover = undefined
} = State0
) ->
% TODO T246543673 For the time being, assume that all members of the
% cluster use the same server name.
case is_member({Name, Peer}, config(State0)) of
false ->
?SERVER_LOG_WARNING(State0, "dropping handover to unknown peer ~0p.", [Peer]),
reply(Type, {error, invalid_peer}),
keep_state_and_data;
true ->
PeerMatchIndex = maps:get(Peer, MatchIndices, 0),
FirstIndex = wa_raft_log:first_index(View),
PeerSendIndex = max(PeerMatchIndex + 1, FirstIndex + 1),
LastIndex = wa_raft_log:last_index(View),
MaxHandoverBatchSize = ?RAFT_HANDOVER_MAX_ENTRIES(App, Table),
MaxHandoverBytes = ?RAFT_HANDOVER_MAX_BYTES(App, Table),
case LastIndex - PeerSendIndex =< MaxHandoverBatchSize of
true ->
?RAFT_COUNT(Table, 'leader.handover'),
?SERVER_LOG_NOTICE(State0, "starting handover to ~p.", [Peer]),
PrevLogIndex = PeerSendIndex - 1,
{ok, PrevLogTerm} = wa_raft_log:term(View, PrevLogIndex),
{ok, LogEntries} = wa_raft_log:entries(View, PeerSendIndex, MaxHandoverBatchSize, MaxHandoverBytes),
% The request to load the log may result in not all required log entries being loaded
% if we hit the byte size limit. Ensure that we have loaded all required log entries
% before initiating a handover.
case PrevLogIndex + length(LogEntries) of
LastIndex ->
Ref = make_ref(),
Timeout = erlang:monotonic_time(millisecond) + ?RAFT_HANDOVER_TIMEOUT(App, Table),
State1 = State0#raft_state{handover = {Peer, Ref, Timeout}},
send_rpc(?IDENTITY_REQUIRES_MIGRATION(Name, Peer), ?HANDOVER(Ref, PrevLogIndex, PrevLogTerm, LogEntries), State1),
reply(Type, {ok, Peer}),
{keep_state, State1};
_ ->
?RAFT_COUNT(Table, 'leader.handover.oversize'),
?SERVER_LOG_WARNING(State0, "handover to peer ~0p would require an oversized RPC.", [Peer]),
reply(Type, {error, oversize}),
keep_state_and_data
end;
false ->
?RAFT_COUNT(Table, 'leader.handover.peer_lagging'),
?SERVER_LOG_WARNING(State0, "determines that peer ~0p is not eligible for handover because it is ~0p entries behind.",
[Peer, LastIndex - PeerSendIndex]),
reply(Type, {error, peer_lagging}),
keep_state_and_data
end
end;
%% [Handover] Reject starting a handover when a handover is still in progress
leader({call, From}, ?HANDOVER_COMMAND(Peer), #raft_state{handover = {Node, _, _}} = State) ->
?SERVER_LOG_WARNING(State, "rejecting duplicate handover to ~0p with running handover to ~0p.", [Peer, Node]),
{keep_state_and_data, {reply, From, {error, duplicate}}};
%% [Command] Defer to common handling for generic RAFT server commands
leader(Type, ?RAFT_COMMAND(_, _) = Event, #raft_state{} = State) ->
command(?FUNCTION_NAME, Type, Event, State);
%% [Async Log Append]
%% A feature-flagged single-member fast path writes the local durable log on a
%% background process. Completion is the only point where log_view advances and
%% entries may become visible/applied.
leader(info, {async_log_append_complete, Ref, Result}, #raft_state{} = State0) ->
State1 = complete_async_log_append(Ref, Result, State0),
State2 = append_entries_to_followers(State1),
State3 = apply_single_node_cluster(State2),
State4 = update_quorum_ts(State3),
update_status(?FUNCTION_NAME, State4),
{keep_state, State4, ?HEARTBEAT_TIMEOUT(State4)};
leader(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = State0) ->
State1 = complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, State0),
State2 = append_entries_to_followers(State1),
update_status(?FUNCTION_NAME, State2),
{keep_state, State2, ?HEARTBEAT_TIMEOUT(State2)};
%% [Fallback] Report unhandled events
leader(Type, Event, #raft_state{} = State) ->
?SERVER_LOG_WARNING(State, "did not know how to handle ~0p event ~0P", [Type, Event, 20]),
keep_state_and_data.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Follower State
%%------------------------------------------------------------------------------
%% In a RAFT cluster, a follower is a replica that is receiving replicated log
%% entries from the leader of a RAFT term. The follower participates in quorum
%% decisions about log entries received from the leader by appending those log
%% entries to its own local copy of the RAFT log.
%%------------------------------------------------------------------------------
%% gen_statem state callback for the follower state.
%%
%% The `enter` clause runs when the state is entered (or repeated); every other
%% clause handles exactly one kind of event. Clause order matters: the first
%% matching head wins, and any event that no clause recognizes ends up in the
%% final fallback clause, which logs it and leaves state and data untouched.
-spec follower
    (
        Type :: enter,
        LastState :: state(),
        Data :: #raft_state{}
    ) -> gen_statem:state_enter_result(state(), #raft_state{});
    (
        Type :: gen_statem:event_type(),
        Event :: event(),
        Data :: #raft_state{}
    ) -> gen_statem:event_handler_result(state(), #raft_state{}).
%% [Enter] Count the entry, log real transitions and return the election timeout action
follower(enter, LastState, #raft_state{table = Table} = State) ->
    ?RAFT_COUNT(Table, 'follower.enter'),
    % Only log when coming from a different state; repeated entries stay quiet.
    ?FUNCTION_NAME =/= LastState andalso
        ?SERVER_LOG_NOTICE(State, "becomes follower from state ~0p.", [LastState]),
    {keep_state, enter_state(?FUNCTION_NAME, State), ?ELECTION_TIMEOUT(State)};
%% [Internal] Advance to newer term when requested
follower(internal, ?ADVANCE_TERM(NewTerm), #raft_state{table = Table, current_term = CurrentTerm} = State) when NewTerm > CurrentTerm ->
    ?RAFT_COUNT(Table, 'follower.advance_term'),
    ?SERVER_LOG_NOTICE(State, "advancing to new term ~0p.", [NewTerm]),
    % repeat_state makes gen_statem repeat the state enter call with the new data.
    {repeat_state, advance_term(?FUNCTION_NAME, NewTerm, undefined, State)};
%% [Protocol] Parse any RPCs in network formats
%% Any tuple tagged `rpc` is handed to handle_rpc/4 together with the current state name.
follower(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, Event) =:= rpc ->
    handle_rpc(Type, Event, ?FUNCTION_NAME, State);
%% [AppendEntries RPC] Handle incoming heartbeats (5.2, 5.3)
follower(
    Type,
    ?REMOTE(Leader, ?APPEND_ENTRIES(PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex)),
    #raft_state{} = State
) ->
    handle_heartbeat(?FUNCTION_NAME, Type, Leader, PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex, State);
%% [AppendEntriesResponse RPC] Followers should not act upon any incoming heartbeat responses (5.2)
follower(_, ?REMOTE(_, ?APPEND_ENTRIES_RESPONSE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;
%% [RequestPreVote RPC] Respond to pre-vote
follower(_, ?REMOTE(Candidate, ?REQUEST_PRE_VOTE(Ref)), #raft_state{} = Data) ->
    handle_request_pre_vote(Candidate, Ref, Data);
%% [PreVote RPC] Pre-votes are only useful to candidates
follower(_, ?REMOTE(_, ?PRE_VOTE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;
%% [RequestVote RPC] Handle incoming vote requests (5.2)
follower(_, ?REMOTE(Candidate, ?REQUEST_VOTE(_, CandidateIndex, CandidateTerm)), #raft_state{} = State) ->
    handle_request_vote(?FUNCTION_NAME, Candidate, CandidateIndex, CandidateTerm, State);
%% [Vote RPC] Votes are only useful to candidates
follower(_, ?REMOTE(_, ?VOTE(_)), #raft_state{}) ->
    keep_state_and_data;
%% [Handover][Handover RPC] The leader is requesting this follower to take over leadership in a new term
%% The head binds `leader = Sender`, so this clause only matches a handover sent
%% by the leader this follower currently tracks; handovers from any other sender
%% fall through to the later clauses. Every failure path below answers the
%% sender with ?HANDOVER_FAILED(Ref).
follower(
    _,
    ?REMOTE(
        Sender,
        ?HANDOVER(Ref, PrevLogIndex, PrevLogTerm, LogEntries)
    ),
    #raft_state{
        application = App,
        table = Table,
        leader = Sender
    } = State0
) ->
    ?RAFT_COUNT(Table, 'follower.handover'),
    ?SERVER_LOG_NOTICE(State0, "evaluating handover RPC from ~0p.", [Sender]),
    % Only replicas that are leader-eligible and have a non-zero election weight
    % may accept a handover.
    case ?RAFT_LEADER_ELIGIBLE(App) andalso ?RAFT_ELECTION_WEIGHT(App) =/= 0 of
        true ->
            % The handover carries the log entries this follower is missing;
            % they must be appended before an election can be started.
            case append_entries(?FUNCTION_NAME, PrevLogIndex, PrevLogTerm, LogEntries, length(LogEntries), State0) of
                {ok, true, _, State1} ->
                    case is_self_witness(State1) of
                        false ->
                            ?SERVER_LOG_NOTICE(State1, "immediately starting new election due to append success during handover RPC.", []),
                            % A `force` election type is not `normal`, so the candidate
                            % enter clause skips the pre-vote round for it.
                            {next_state, candidate, State1#raft_state{next_election_type = force}};
                        true ->
                            ?SERVER_LOG_NOTICE(State1, "failing handover as a witness after handover append.", []),
                            send_rpc(Sender, ?HANDOVER_FAILED(Ref), State1),
                            {next_state, witness, State1}
                    end;
                {ok, false, _, State1} ->
                    ?RAFT_COUNT(Table, 'follower.handover.rejected'),
                    ?SERVER_LOG_WARNING(State1, "failing handover request because append was rejected.", []),
                    send_rpc(Sender, ?HANDOVER_FAILED(Ref), State1),
                    {keep_state, State1};
                {fatal, Reason} ->
                    % No updated data is returned on a fatal append, so the
                    % original data is kept and the replica is disabled with
                    % the reported reason.
                    ?RAFT_COUNT(Table, 'follower.handover.fatal'),
                    ?SERVER_LOG_WARNING(State0, "failing handover request because append was fatal due to ~0P.", [Reason, 30]),
                    send_rpc(Sender, ?HANDOVER_FAILED(Ref), State0),
                    {next_state, disabled, State0#raft_state{disable_reason = Reason}}
            end;
        false ->
            ?SERVER_LOG_NOTICE(State0, "not considering handover RPC due to being ineligible for leadership.", []),
            send_rpc(Sender, ?HANDOVER_FAILED(Ref), State0),
            {keep_state, State0}
    end;
%% [Handover][HandoverFailed RPC] Followers should not act upon any incoming failed handover
follower(_, ?REMOTE(_, ?HANDOVER_FAILED(_)), #raft_state{}) ->
    keep_state_and_data;
%% [Follower] handle timeout
%% follower doesn't receive any heartbeat. starting a new election
follower(
    state_timeout,
    _,
    #raft_state{
        table = Table,
        leader = Leader,
        log_view = View,
        last_quorum_ts = LastQuorumTs
    } = State
) ->
    ?RAFT_COUNT(Table, 'follower.timeout'),
    case candidate_eligible(State) of
        true ->
            % Time since the last recorded quorum timestamp, for the log line
            % only; `undefined` when no such timestamp has been recorded.
            WaitingMs = case LastQuorumTs of
                undefined -> undefined;
                _ -> erlang:monotonic_time(millisecond) - LastQuorumTs
            end,
            ?SERVER_LOG_NOTICE(
                State,
                "times out and starts election at ~0p after waiting for leader ~0p for ~0p ms.",
                [wa_raft_log:last_index(View), identity_to_server(Leader), WaitingMs]
            ),
            {next_state, candidate, State};
        false ->
            ?SERVER_LOG_NOTICE(State, "is not timing out due to being ineligible or having zero election weight.", []),
            % Repeating the state re-runs the enter clause, which returns a
            % fresh election timeout action.
            {repeat_state, State}
    end;
%% [Command] Defer to common handling for generic RAFT server commands
follower(Type, ?RAFT_COMMAND(_, _) = Event, #raft_state{} = State) ->
    command(?FUNCTION_NAME, Type, Event, State);
%% [Async Log Append]
%% Completion can arrive after resign/term change. Since the client request is
%% cancelled, rollback any local append bytes before releasing it as not-leader.
follower(info, {async_log_append_complete, Ref, Result}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_after_leadership_loss(?FUNCTION_NAME, Ref, Result, State0),
    {keep_state, State1};
%% Every process 'DOWN' message is passed to complete_async_log_append_down/5.
follower(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, State0),
    {keep_state, State1};
%% [Fallback] Report unhandled events
follower(Type, Event, #raft_state{} = State) ->
    ?SERVER_LOG_WARNING(State, "did not know how to handle ~0p event ~0P", [Type, Event, 20]),
    keep_state_and_data.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Candidate State
%%------------------------------------------------------------------------------
%% In a RAFT cluster, a candidate is a replica that is attempting to become the
%% leader of a RAFT term. It is waiting for responses from the other members of
%% the RAFT cluster to determine if it has received enough votes to assume the
%% leadership of the RAFT term.
%%------------------------------------------------------------------------------
%% gen_statem state callback for the candidate state.
%%
%% The `enter` clause starts either a pre-vote round or a real election; every
%% other clause handles exactly one kind of event. Clause order matters: the
%% first matching head wins, and any event that no clause recognizes ends up in
%% the final fallback clause, which logs it and leaves state and data untouched.
-spec candidate
    (
        Type :: enter,
        LastState :: state(),
        Data :: #raft_state{}
    ) -> gen_statem:state_enter_result(state(), #raft_state{});
    (
        Type :: gen_statem:event_type(),
        Event :: event(),
        Data :: #raft_state{}
    ) -> gen_statem:event_handler_result(state(), #raft_state{}).
%% [Enter] Node starts a new election upon entering the candidate state.
candidate(
    enter,
    LastState,
    #raft_state{
        application = App,
        table = Table,
        next_election_type = ElectionType
    } = State0
) ->
    ?RAFT_COUNT(Table, 'leader.election_started'),
    ?RAFT_COUNT(Table, 'candidate.enter'),
    % Only log when coming from a different state; repeated entries stay quiet.
    ?FUNCTION_NAME =/= LastState andalso
        ?SERVER_LOG_NOTICE(State0, "becomes candidate from state ~0p.", [LastState]),
    % The requested election type is consumed here: it was captured in the head
    % and the stored value is reset to `normal` for any later election.
    State1 = enter_state(?FUNCTION_NAME, State0#raft_state{next_election_type = normal}),
    case ElectionType =:= normal andalso ?RAFT_ELECTION_PRE_VOTE(App, Table) of
        true ->
            ?SERVER_LOG_NOTICE(State1, "starts pre-vote before advancing to a new term.", []),
            % When pre-vote is enabled, a round of pre-votes must first be issued
            % before the term can be advanced.
            Ref = make_ref(),
            % The fresh reference identifies this round; the local node's own
            % pre-vote is recorded up front.
            State2 = State1#raft_state{
                pre_vote_ref = Ref,
                pre_votes = #{node() => true}
            },
            % Request a pre-vote from all peers.
            send_rpc_to_all_members(?REQUEST_PRE_VOTE(Ref), State2),
            {keep_state, State2, ?ELECTION_TIMEOUT(State2)};
        false ->
            % Otherwise, immediately start the election.
            candidate_start_election(ElectionType, State1)
    end;
%% [Internal] Advance to newer term when requested
candidate(
    internal,
    ?ADVANCE_TERM(NewTerm),
    #raft_state{
        table = Table,
        current_term = CurrentTerm
    } = State
) when NewTerm > CurrentTerm ->
    ?RAFT_COUNT(Table, 'candidate.advance_term'),
    ?SERVER_LOG_NOTICE(State, "advancing to new term ~0p.", [NewTerm]),
    State1 = advance_term(?FUNCTION_NAME, NewTerm, undefined, State),
    % A newer term ends this candidacy; step down to follower or witness.
    {next_state, follower_or_witness(State1), State1};
%% [Protocol] Parse any RPCs in network formats
%% Any tuple tagged `rpc` is handed to handle_rpc/4 together with the current state name.
candidate(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, Event) =:= rpc ->
    handle_rpc(Type, Event, ?FUNCTION_NAME, State);
%% [AppendEntries RPC] Switch to follower because current term now has a leader (5.2, 5.3)
candidate(Type, ?REMOTE(Sender, ?APPEND_ENTRIES(_, _, _, _, _)) = Event, #raft_state{} = State) ->
    ?SERVER_LOG_NOTICE(State, "switching to follower after receiving heartbeat from ~0p.", [Sender]),
    % The same event is re-injected so that the new state processes the heartbeat.
    {next_state, follower_or_witness(State), State, {next_event, Type, Event}};
%% [RequestPreVote RPC] Respond to pre-vote
candidate(_, ?REMOTE(Candidate, ?REQUEST_PRE_VOTE(Ref)), #raft_state{} = Data) ->
    handle_request_pre_vote(Candidate, Ref, Data);
%% [PreVote RPC] Check if a full pre-vote quorum is established and if so, proceed with election (9.6)
%% `Ref` is bound in the event and matched again against `pre_vote_ref`, so this
%% clause only accepts pre-votes that belong to the current round.
candidate(
    _,
    ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, PeerId), ?PRE_VOTE(Ref, Vote, PeerLastIndex, PeerLastTerm)),
    #raft_state{
        table = Table,
        state_start_ts = StateStart,
        log_view = View,
        pre_vote_ref = Ref,
        pre_votes = PreVotes
    } = Data
) ->
    Now = erlang:monotonic_time(millisecond),
    LastIndex = wa_raft_log:last_index(View),
    {ok, LastTerm} = wa_raft_log:term(View, LastIndex),
    % A pre-vote counts as support only if the peer granted it and the local
    % log position (term first, then index) is not behind the one the peer reported.
    Result = Vote andalso {LastTerm, LastIndex} >= {PeerLastTerm, PeerLastIndex},
    NewPreVotes = PreVotes#{PeerId => Result},
    NewData = Data#raft_state{pre_votes = NewPreVotes},
    case find_member_majority(NewPreVotes, config(NewData)) of
        {found, Outcome} ->
            % If a majority is found, then the pre-vote round is over, either accepted or rejected.
            Duration = Now - StateStart,
            case Outcome of
                true ->
                    Support = [Peer || Peer := true <- NewPreVotes],
                    ?SERVER_LOG_NOTICE(
                        NewData,
                        "is proceeding to election after ~0p ms with pre-votes from ~0p.",
                        [Duration, Support]
                    ),
                    ?RAFT_COUNT(Table, 'candidate.pre_vote.passed'),
                    candidate_start_election(normal, NewData);
                false ->
                    Opposition = [Peer || Peer := false <- NewPreVotes],
                    ?SERVER_LOG_NOTICE(
                        NewData,
                        "has failed pre-vote after ~0p ms with opposition from ~0p.",
                        [Duration, Opposition]
                    ),
                    ?RAFT_COUNT(Table, 'candidate.pre_vote.failed'),
                    {next_state, follower_or_witness(NewData), NewData}
            end;
        none ->
            % No majority either way yet; keep collecting pre-votes.
            {keep_state, NewData}
    end;
%% [PreVote RPC] Ignore pre-votes from other rounds
candidate(_, ?REMOTE(_, ?PRE_VOTE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;
%% [RequestVote RPC] Candidates should reject incoming vote requests as they always vote for themselves (5.2)
candidate(_, ?REMOTE(Sender, ?REQUEST_VOTE(_, _, _)), #raft_state{} = Data) ->
    send_rpc(Sender, ?VOTE(false), Data),
    keep_state_and_data;
%% [Vote RPC] Candidate receives a vote (5.2)
%% Only votes delivered as `cast` events are handled by this clause.
candidate(
    cast,
    ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, Node), ?VOTE(Vote)),
    #raft_state{
        table = Table,
        log_view = View,
        state_start_ts = StateStart,
        votes = Votes
    } = Data0
) ->
    NewVotes = Votes#{Node => Vote},
    Data1 = Data0#raft_state{votes = NewVotes},
    Data2 = update_heartbeat_reply_ts(Node, Data1),
    case find_member_majority(NewVotes, config(Data2)) of
        {found, Outcome} ->
            % If a majority is found, then the election is over, either accepted or rejected.
            Now = erlang:monotonic_time(millisecond),
            Duration = Now - StateStart,
            LastIndex = wa_raft_log:last_index(View),
            {ok, LastTerm} = wa_raft_log:term(View, LastIndex),
            case Outcome of
                true ->
                    % If the bid for leadership is accepted, then transition to leader.
                    Support = [Peer || Peer := true <- NewVotes],
                    ?SERVER_LOG_NOTICE(
                        Data2,
                        "is becoming leader after ~0p ms with log at ~0p:~0p and votes from ~0p.",
                        [Duration, LastIndex, LastTerm, Support]
                    ),
                    ?RAFT_COUNT(Table, 'candidate.elected'),
                    ?RAFT_GATHER(Table, 'candidate.election.duration', Duration),
                    {next_state, leader, Data2};
                false ->
                    % If the bid for leadership is rejected, then transition to follower.
                    % This will reset the election timeout which will, in effect, make this
                    % peer less likely to be the first candidate of the next term.
                    Opposition = [Peer || Peer := false <- NewVotes],
                    ?SERVER_LOG_NOTICE(
                        Data2,
                        "has failed to be elected after ~0p ms with log at ~0p:~0p and opposition from ~0p.",
                        [Duration, LastIndex, LastTerm, Opposition]
                    ),
                    ?RAFT_COUNT(Table, 'candidate.rejected'),
                    ?RAFT_GATHER(Table, 'candidate.election.duration', Duration),
                    {next_state, follower_or_witness(Data2), Data2}
            end;
        none ->
            % If no majority is found, then continue waiting.
            {keep_state, Data2}
    end;
%% [Handover][Handover RPC] Switch to follower because current term now has a leader (5.2, 5.3)
candidate(Type, ?REMOTE(_, ?HANDOVER(_, _, _, _)) = Event, #raft_state{} = State) ->
    ?SERVER_LOG_NOTICE(State, "ends election to handle handover.", []),
    % The same event is re-injected so that the new state processes the handover.
    {next_state, follower_or_witness(State), State, {next_event, Type, Event}};
%% [Handover][HandoverFailed RPC] Candidates should not act upon any incoming failed handover
candidate(_, ?REMOTE(_, ?HANDOVER_FAILED(_)), #raft_state{}) ->
    keep_state_and_data;
%% [Candidate] Handle Election Timeout during pre-vote phase (9.6)
%% Candidate doesn't get enough pre-votes after a period of time, restart or fallback
%% to follower if the local replica is no longer eligible.
%% The guard on `pre_vote_ref` is what distinguishes this clause from the
%% normal election timeout clause below.
candidate(state_timeout, _, #raft_state{pre_vote_ref = PreVoteRef, pre_votes = PreVotes} = State) when PreVoteRef =/= undefined ->
    ?SERVER_LOG_NOTICE(
        State,
        "pre-vote timed out with support from ~0p and opposition from ~0p.",
        [[Node || Node := true <- PreVotes], [Node || Node := false <- PreVotes]]
    ),
    case candidate_eligible(State) of
        % Repeating the state re-runs the enter clause, i.e. starts over.
        true -> {repeat_state, State};
        false -> {next_state, follower_or_witness(State), State}
    end;
%% [Candidate] Handle Election Timeout during normal election (5.2)
%% Candidate doesn't get enough votes after a period of time, restart election or fallback
%% to follower if the local replica is no longer eligible.
candidate(state_timeout, _, #raft_state{votes = Votes} = State) ->
    ?SERVER_LOG_NOTICE(
        State,
        "election timed out with votes from ~0p and opposition from ~0p.",
        [[Node || Node := true <- Votes], [Node || Node := false <- Votes]]
    ),
    case candidate_eligible(State) of
        % Repeating the state re-runs the enter clause, i.e. starts over.
        true -> {repeat_state, State};
        false -> {next_state, follower_or_witness(State), State}
    end;
%% [Commit] Candidates may optimistically buffer any incoming commits pending election.
candidate(
    cast = Event,
    ?COMMIT_COMMAND(From, Op, Priority) = Command,
    #raft_state{application = App, table = Table} = State
) ->
    case ?RAFT_CANDIDATE_BUFFER_REQUESTS(App, Table) of
        true -> {keep_state, add_pending(From, Op, Priority, State)};
        % Buffering disabled: use the common command handling instead.
        false -> command(?FUNCTION_NAME, Event, Command, State)
    end;
%% [Strong Read] Candidates may optimistically submit reads to storage pending election.
%% The ReadIndex is set to one past the last log index which is the index that the
%% new term will start at if the candidate wins. This is safe because an election
%% failure will cancel all reads via cancel_pending/2.
candidate(
    cast = Event,
    ?READ_COMMAND({From, ReadOp}) = Command,
    #raft_state{application = App, table = Table, queues = Queues, log_view = View} = State
) ->
    case ?RAFT_CANDIDATE_BUFFER_REQUESTS(App, Table) of
        true ->
            ReadIndex = wa_raft_log:last_index(View) + 1,
            ok = wa_raft_queue:submit_read(Queues, ReadIndex, From, ReadOp),
            {keep_state, State#raft_state{pending_read = true}};
        false ->
            % Buffering disabled: use the common command handling instead.
            command(?FUNCTION_NAME, Event, Command, State)
    end;
%% [Command] Defer to common handling for generic RAFT server commands
candidate(Type, ?RAFT_COMMAND(_, _) = Event, #raft_state{} = State) ->
    command(?FUNCTION_NAME, Type, Event, State);
%% [Async Log Append]
%% The append may finish while this node is campaigning. Preserve disk/log-view
%% consistency, but do not report the stale leader's client request as committed.
candidate(info, {async_log_append_complete, Ref, Result}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_after_leadership_loss(?FUNCTION_NAME, Ref, Result, State0),
    {keep_state, State1};
%% Every process 'DOWN' message is passed to complete_async_log_append_down/5.
candidate(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, State0),
    {keep_state, State1};
%% [Fallback] Report unhandled events
candidate(Type, Event, #raft_state{} = State) ->
    ?SERVER_LOG_WARNING(State, "did not know how to handle ~0p event ~0P.", [Type, Event, 20]),
    keep_state_and_data.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Disabled State
%%------------------------------------------------------------------------------
%% The disabled state is an extension to the RAFT protocol used to hold any
%% replicas of an FSM that have, for some reason or another, identified some
%% deficiency or malfunction that makes them unfit to either enforce any prior
%% quorum decisions or properly participate in future quorum decisions. Common
%% reasons include the detection of corruptions or inconsistencies within the
%% FSM state or RAFT log. The reason for which the replica was disabled is kept
%% in persistent state so that the replica will remain disabled even when restarted.
%%------------------------------------------------------------------------------
%% gen_statem state callback for the disabled state. The `enter` clauses run on
%% state entry; each remaining clause handles one kind of event, and the final
%% clause logs anything that was not recognized.
-spec disabled
    (
        Type :: enter,
        LastState :: state(),
        Data :: #raft_state{}
    ) -> gen_statem:state_enter_result(state(), #raft_state{});
    (
        Type :: gen_statem:event_type(),
        Event :: event(),
        Data :: #raft_state{}
    ) -> gen_statem:event_handler_result(state(), #raft_state{}).
%% [Enter] Re-entry from the disabled state itself: count it and refresh the state data only
disabled(enter, disabled, #raft_state{table = Table} = Data) ->
    ?RAFT_COUNT(Table, 'disabled.enter'),
    {keep_state, enter_state(?FUNCTION_NAME, Data)};
%% [Enter] Entry from any other state: log it, default a missing reason and store the result
disabled(enter, PreviousState, #raft_state{table = Table, disable_reason = Why} = Data0) ->
    ?RAFT_COUNT(Table, 'disabled.enter'),
    ?SERVER_LOG_NOTICE(Data0, "becomes disabled from state ~0p with reason ~0p.", [PreviousState, Why]),
    Data1 =
        if
            Why =:= undefined -> Data0#raft_state{disable_reason = "No reason specified."};
            true -> Data0
        end,
    Data2 = enter_state(?FUNCTION_NAME, Data1),
    wa_raft_durable_state:store(Data2),
    {keep_state, Data2};
%% [Internal] Move to a newer term on request and repeat the state
disabled(
    internal,
    ?ADVANCE_TERM(NewTerm),
    #raft_state{
        table = Table,
        current_term = CurrentTerm
    } = Data
) when NewTerm > CurrentTerm ->
    ?RAFT_COUNT(Table, 'disabled.advance_term'),
    ?SERVER_LOG_NOTICE(Data, "advancing to new term ~0p.", [NewTerm]),
    NewData = advance_term(?FUNCTION_NAME, NewTerm, undefined, Data),
    {repeat_state, NewData};
%% [Protocol] Tuples tagged `rpc` are handed to handle_rpc/4
disabled(Type, Event, #raft_state{} = Data) when is_tuple(Event), element(1, Event) =:= rpc ->
    handle_rpc(Type, Event, ?FUNCTION_NAME, Data);
%% [AppendEntries RPC] Heartbeats are ignored; a disabled server behaves as if
%% dead to the cluster
disabled(_, ?REMOTE(_, ?APPEND_ENTRIES(_, _, _, _, _)), #raft_state{}) ->
    keep_state_and_data;
%% [RequestPreVote RPC] Pre-vote requests are ignored; a disabled server stays
%% invisible to the rest of the cluster
disabled(_, ?REMOTE(_, ?REQUEST_PRE_VOTE(_)), #raft_state{}) ->
    keep_state_and_data;
%% [PreVote RPC] Pre-votes only matter to candidates
disabled(_, ?REMOTE(_, ?PRE_VOTE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;
%% [RequestVote RPC] Vote requests are always answered with a rejection
disabled(_, ?REMOTE(Requester, ?REQUEST_VOTE(_, _, _)), #raft_state{} = Data) ->
    send_rpc(Requester, ?VOTE(false), Data),
    keep_state_and_data;
%% [Vote RPC] Votes only matter to candidates
disabled(_, ?REMOTE(_, ?VOTE(_)), #raft_state{}) ->
    keep_state_and_data;
%% [Trigger Election] Not available while disabled
disabled({call, Caller}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{}) ->
    {keep_state_and_data, [{reply, Caller, {error, invalid_state}}]};
%% [Promote] Not available while disabled
disabled({call, Caller}, ?PROMOTE_COMMAND(_, _), #raft_state{}) ->
    {keep_state_and_data, [{reply, Caller, {error, invalid_state}}]};
%% [Enable] Clear the stored disable reason and move to the stalled state
disabled({call, Caller}, ?ENABLE_COMMAND, #raft_state{} = Data0) ->
    ?SERVER_LOG_NOTICE(Data0, "re-enabling by request from ~0p by moving to stalled state.", [Caller]),
    Data1 = Data0#raft_state{disable_reason = undefined},
    wa_raft_durable_state:store(Data1),
    {next_state, stalled, Data1, [{reply, Caller, ok}]};
%% [Command] Generic RAFT server commands go through the common handler
disabled(Type, ?RAFT_COMMAND(_, _) = Command, #raft_state{} = Data) ->
    command(?FUNCTION_NAME, Type, Command, Data);
%% [Async Log Append]
%% A stale async append completion is still consumed while disabled so that the
%% append gate cannot leak across enable/restart paths.
disabled(info, {async_log_append_complete, Ref, Result}, #raft_state{} = Data) ->
    {keep_state, complete_async_log_append_after_leadership_loss(?FUNCTION_NAME, Ref, Result, Data)};
disabled(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = Data) ->
    {keep_state, complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, Data)};
%% [Fallback] Anything else is logged and dropped
disabled(Type, Event, #raft_state{} = Data) ->
    ?SERVER_LOG_WARNING(Data, "did not know how to handle ~0p event ~0P", [Type, Event, 20]),
    keep_state_and_data.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Witness State
%%------------------------------------------------------------------------------
%% The witness state is an extension to the RAFT protocol that identifies a
%% replica as a special "witness replica" that participates in quorum decisions
%% but does not retain a full copy of the actual underlying FSM. These replicas
%% can use significantly fewer system resources to operate however it is not
%% recommended for more than 25% of the replicas in a RAFT cluster to be
%% witness replicas as having more than such a number of witness replicas can
%% result in significantly reduced chance of data durability in the face of
%% unexpected replica loss.
%%------------------------------------------------------------------------------
%% gen_statem state callback for the witness state. The `enter` clause runs on
%% state entry; each remaining clause handles one kind of event, and the final
%% clause logs anything that was not recognized.
-spec witness
    (
        Type :: enter,
        LastState :: state(),
        Data :: #raft_state{}
    ) -> gen_statem:state_enter_result(state(), #raft_state{});
    (
        Type :: gen_statem:event_type(),
        Event :: event(),
        Data :: #raft_state{}
    ) -> gen_statem:event_handler_result(state(), #raft_state{}).
%% [Enter] Count the entry, log real transitions and return the election timeout action
witness(enter, PreviousState, #raft_state{table = Table} = Data) ->
    ?RAFT_COUNT(Table, 'witness.enter'),
    case PreviousState of
        ?FUNCTION_NAME ->
            ok;
        _ ->
            ?SERVER_LOG_NOTICE(Data, "becomes witness from state ~0p.", [PreviousState])
    end,
    {keep_state, enter_state(?FUNCTION_NAME, Data), ?ELECTION_TIMEOUT(Data)};
%% [Internal] Move to a newer term on request and repeat the state
witness(
    internal,
    ?ADVANCE_TERM(NewTerm),
    #raft_state{
        table = Table,
        current_term = CurrentTerm
    } = Data
) when NewTerm > CurrentTerm ->
    ?RAFT_COUNT(Table, 'witness.advance_term'),
    ?SERVER_LOG_NOTICE(Data, "advancing to new term ~0p.", [NewTerm]),
    NewData = advance_term(?FUNCTION_NAME, NewTerm, undefined, Data),
    {repeat_state, NewData};
%% [Protocol] Tuples tagged `rpc` are handed to handle_rpc/4
witness(Type, Event, #raft_state{} = Data) when is_tuple(Event), element(1, Event) =:= rpc ->
    handle_rpc(Type, Event, ?FUNCTION_NAME, Data);
%% [AppendEntries RPC] Heartbeats go through the shared heartbeat handler (5.2, 5.3)
witness(
    Type,
    ?REMOTE(Sender, ?APPEND_ENTRIES(PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex)),
    #raft_state{} = Data
) ->
    handle_heartbeat(?FUNCTION_NAME, Type, Sender, PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex, Data);
%% [AppendEntriesResponse RPC] Heartbeat responses are ignored by witnesses (5.2)
witness(_, ?REMOTE(_, ?APPEND_ENTRIES_RESPONSE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;
%% [Handover][Handover RPC] A witness always answers a handover request with a failure
witness(_, ?REMOTE(Requester, ?HANDOVER(HandoverRef, _, _, _)), #raft_state{} = Data) ->
    send_rpc(Requester, ?HANDOVER_FAILED(HandoverRef), Data),
    keep_state_and_data;
%% [Handover][HandoverFailed RPC] Failed handover notifications are ignored by witnesses
witness(_, ?REMOTE(_, ?HANDOVER_FAILED(_)), #raft_state{}) ->
    keep_state_and_data;
%% [Trigger Election] Not available to witnesses
witness({call, Caller}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{}) ->
    {keep_state_and_data, [{reply, Caller, {error, invalid_state}}]};
%% [Promote] Not available to witnesses
witness({call, Caller}, ?PROMOTE_COMMAND(_, _), #raft_state{}) ->
    {keep_state_and_data, [{reply, Caller, {error, invalid_state}}]};
%% [RequestPreVote RPC] Answer the pre-vote request
witness(_, ?REMOTE(Requester, ?REQUEST_PRE_VOTE(Ref)), #raft_state{} = Data) ->
    handle_request_pre_vote(Requester, Ref, Data);
%% [PreVote RPC] Pre-votes only matter to candidates
witness(_, ?REMOTE(_, ?PRE_VOTE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;
%% [RequestVote RPC] Vote requests go through the shared vote handler (5.2)
witness(_, ?REMOTE(Requester, ?REQUEST_VOTE(_, TheirIndex, TheirTerm)), #raft_state{} = Data) ->
    handle_request_vote(?FUNCTION_NAME, Requester, TheirIndex, TheirTerm, Data);
%% [Vote RPC] Votes only matter to candidates
witness(_, ?REMOTE(_, ?VOTE(_)), #raft_state{}) ->
    keep_state_and_data;
%% [State Timeout] Check liveness, but do not restart state.
witness(state_timeout, _, #raft_state{} = Data) ->
    update_status(?FUNCTION_NAME, Data),
    {keep_state_and_data, ?ELECTION_TIMEOUT(Data)};
%% [Command] Generic RAFT server commands go through the common handler
witness(Type, ?RAFT_COMMAND(_, _) = Command, #raft_state{} = Data) ->
    command(?FUNCTION_NAME, Type, Command, Data);
%% [Async Log Append]
%% A witness should not normally own local log appends, but stale completions
%% are consumed defensively after role changes.
witness(info, {async_log_append_complete, Ref, Result}, #raft_state{} = Data) ->
    {keep_state, complete_async_log_append_after_leadership_loss(?FUNCTION_NAME, Ref, Result, Data)};
witness(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = Data) ->
    {keep_state, complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, Data)};
%% [Fallback] Anything else is logged and dropped
witness(Type, Event, #raft_state{} = Data) ->
    ?SERVER_LOG_WARNING(Data, "did not know how to handle ~0p event ~0P", [Type, Event, 20]),
    keep_state_and_data.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Command Handlers
%%------------------------------------------------------------------------------
%% Fallbacks for command calls to the RAFT server for when there is no special
%% handling for a command defined within the state-specific callback itself.
%%------------------------------------------------------------------------------
%% Shared fallback handler for RAFT commands that the calling state callback
%% did not handle itself.
%%
%% Arguments:
%%   State   - name of the gen_statem state that delegated the command.
%%   Type    - gen_statem event type the command arrived with.
%%   Command - the RAFT command being handled.
%%   Data    - current server data.
%%
%% Returns a regular gen_statem event handler result. Clauses are tried in
%% order; any command that is not matched by a more specific clause is logged
%% and dropped by the final fallback clause.
-spec command(
    State :: state(),
    Type :: gen_statem:event_type(),
    Command :: command(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
%% [Commit] Non-leader nodes should fail commits with {error, not_leader}.
command(
    State,
    cast,
    ?COMMIT_COMMAND(From, _Op, Priority),
    #raft_state{queues = Queues, leader = Leader} = Data
) when State =/= leader ->
    ?SERVER_LOG_WARNING(
        State,
        Data,
        "commit fails as leader is currently ~0p.",
        [identity_to_server(Leader)]
    ),
    wa_raft_queue:commit_cancelled(Queues, From, {error, not_leader}, Priority),
    keep_state_and_data;
%% [Strong Read] Non-leader nodes are not eligible for strong reads.
command(
    State,
    cast,
    ?READ_COMMAND({From, _}),
    #raft_state{queues = Queues, leader = Leader} = Data
) when State =/= leader ->
    ?SERVER_LOG_WARNING(
        State,
        Data,
        "strong read fails as leader is currently ~0p.",
        [identity_to_server(Leader)]
    ),
    wa_raft_queue:fulfill_incomplete_read(Queues, From, {error, not_leader}),
    keep_state_and_data;
%% [Notify Complete] Attempt to send more log entries to storage if applicable.
command(
    State,
    cast,
    ?NOTIFY_COMPLETE_COMMAND(),
    #raft_state{queues = Queues} = Data
) when State =:= leader; State =:= follower; State =:= witness ->
    % Only push more entries to storage once the apply queue has fully drained.
    case wa_raft_queue:apply_queue_size(Queues) of
        0 ->
            NewState = case State of
                leader -> leader_apply_log(Data);
                _ -> apply_log(State, infinity, Data)
            end,
            {keep_state, NewState};
        _ ->
            keep_state_and_data
    end;
%% [Notify Complete] All other states ignore apply completion notifications.
command(_, cast, ?NOTIFY_COMPLETE_COMMAND(), #raft_state{}) ->
    keep_state_and_data;
%% [CurrentConfig] Get replica's effective RAFT cluster configuration
command(_, Type, ?CURRENT_CONFIG_COMMAND, #raft_state{} = Data) ->
    reply(Type, config(Data)),
    keep_state_and_data;
%% [Status] Get status of node.
command(State, {call, From}, ?STATUS_COMMAND, #raft_state{} = Data) ->
    {LeaderName, LeaderId} =
        case Data#raft_state.leader of
            undefined -> {undefined, undefined};
            #raft_identity{name = Name, node = Node} -> {Name, Node}
        end,
    Status = [
        {state, State},
        {id, Data#raft_state.self#raft_identity.node},
        {table, Data#raft_state.table},
        {partition, Data#raft_state.partition},
        {partition_path, Data#raft_state.partition_path},
        {current_term, Data#raft_state.current_term},
        {voted_for, Data#raft_state.voted_for},
        {commit_index, Data#raft_state.commit_index},
        {last_applied, Data#raft_state.last_applied},
        {leader_name, LeaderName},
        {leader_id, LeaderId},
        {pending_high, length(Data#raft_state.pending_high)},
        {pending_low, length(Data#raft_state.pending_low)},
        {pending_read, Data#raft_state.pending_read},
        {queued, maps:size(Data#raft_state.queued)},
        {next_indices, Data#raft_state.next_indices},
        {match_indices, Data#raft_state.match_indices},
        {log_module, wa_raft_log:provider(Data#raft_state.log_view)},
        {log_first, wa_raft_log:first_index(Data#raft_state.log_view)},
        {log_last, wa_raft_log:last_index(Data#raft_state.log_view)},
        {votes, Data#raft_state.votes},
        {inflight_applies, wa_raft_queue:apply_queue_size(Data#raft_state.table, Data#raft_state.partition)},
        {disable_reason, Data#raft_state.disable_reason},
        {config, config(Data)},
        {config_index, config_index(Data)},
        {witness, is_self_witness(Data)}
    ],
    {keep_state_and_data, {reply, From, Status}};
%% [Trigger Election] Request full replica nodes to start a new election.
command(
    State,
    {call, From},
    ?TRIGGER_ELECTION_COMMAND(TermOrOffset),
    #raft_state{
        application = App,
        current_term = CurrentTerm
    } = Data
) when State =/= stalled, State =/= witness, State =/= disabled ->
    % Resolve the symbolic term request into a concrete term. Anything that is
    % not one of the known symbols is passed through and validated below.
    Term = case TermOrOffset of
        current -> CurrentTerm;
        next -> CurrentTerm + 1;
        {next, Offset} -> CurrentTerm + Offset;
        _ -> TermOrOffset
    end,
    % The current term is acceptable here; only older or non-integer terms are refused.
    case is_integer(Term) andalso Term >= CurrentTerm of
        true ->
            case ?RAFT_LEADER_ELIGIBLE(App) of
                true ->
                    ?SERVER_LOG_NOTICE(State, Data, "switching to candidate after promotion request.", []),
                    NewData = case Term > CurrentTerm of
                        true -> advance_term(State, Term, undefined, Data);
                        false -> Data
                    end,
                    % A node that is already candidate re-enters the candidate state.
                    case State of
                        candidate -> {repeat_state, NewData, {reply, From, ok}};
                        _ -> {next_state, candidate, NewData, {reply, From, ok}}
                    end;
                false ->
                    ?SERVER_LOG_WARNING(State, Data, "cannot be promoted as candidate while ineligible.", []),
                    {keep_state_and_data, {reply, From, {error, ineligible}}}
            end;
        false ->
            ?SERVER_LOG_WARNING(State, Data, "refusing to promote to older or invalid term ~0p.", [Term]),
            {keep_state_and_data, {reply, From, {error, rejected}}}
    end;
%% [Promote] Non-disabled nodes check if eligible to promote and then promote to leader.
command(
    State,
    {call, From},
    ?PROMOTE_COMMAND(TermOrOffset, Force),
    #raft_state{
        application = App,
        table = Table,
        current_term = CurrentTerm,
        last_quorum_ts = LastQuorumTs,
        leader = Leader
    } = Data
) when State =/= stalled, State =/= witness, State =/= disabled ->
    Now = erlang:monotonic_time(millisecond),
    Eligible = ?RAFT_LEADER_ELIGIBLE(App),
    % Scaled by 1000 so that it can be compared against the millisecond timestamps below.
    HeartbeatGracePeriodMs = ?RAFT_PROMOTION_GRACE_PERIOD(App, Table) * 1000,
    Term = case TermOrOffset of
        current -> CurrentTerm;
        next -> CurrentTerm + 1;
        {next, Offset} -> CurrentTerm + Offset;
        _ -> TermOrOffset
    end,
    Membership = get_config_members(config(Data)),
    % The first matching condition decides the outcome: either 'true' when the
    % promotion may proceed or the error reason to reply with. Witness, stalled
    % and disabled states are already excluded by the guard of this clause, so
    % no per-state check is required here.
    Allowed = if
        % Prevent promotions to older or invalid terms
        not is_integer(Term) orelse Term < CurrentTerm ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "cannot attempt promotion to older or invalid term ~0p.",
                [Term]
            ),
            invalid_term;
        % Prevent promotions to the current term when it already has a known leader.
        Term =:= CurrentTerm andalso Leader =/= undefined ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "refusing to promote to leader of current term already led by ~0p.",
                [identity_to_server(Leader)]
            ),
            invalid_term;
        % Prevent promotions that will immediately result in a resignation.
        not Eligible ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "cannot promote to leader as the node is ineligible.",
                []
            ),
            ineligible;
        % Prevent promotions to any operational state when there is no cluster membership configuration.
        Membership =:= [] ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "cannot promote to leader with no existing membership.",
                []
            ),
            invalid_configuration;
        % A forced promotion skips the recent-quorum check below.
        Force ->
            true;
        LastQuorumTs =:= undefined ->
            true;
        Now - LastQuorumTs >= HeartbeatGracePeriodMs ->
            true;
        true ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "rejecting request to promote to leader as a valid heartbeat was recently received.",
                []
            ),
            rejected
    end,
    case Allowed of
        true ->
            ?SERVER_LOG_NOTICE(State, Data, "is promoting to leader of term ~0p.", [Term]),
            NewData = case Term > CurrentTerm of
                true -> advance_term(State, Term, node(), Data);
                false -> Data
            end,
            % A node that is already leader re-enters the leader state.
            case State of
                leader -> {repeat_state, NewData, {reply, From, ok}};
                _ -> {next_state, leader, NewData, {reply, From, ok}}
            end;
        Reason ->
            {keep_state_and_data, {reply, From, {error, Reason}}}
    end;
%% [Resign] Non-leader nodes cannot resign.
command(State, {call, From}, ?RESIGN_COMMAND, #raft_state{} = Data) when State =/= leader ->
    ?SERVER_LOG_NOTICE(State, Data, "not resigning because we are not leader.", []),
    {keep_state_and_data, {reply, From, {error, not_leader}}};
%% [AdjustConfig] Non-leader nodes cannot adjust their config.
command(
    State,
    Type,
    ?ADJUST_CONFIG_COMMAND(From, Action, _),
    #raft_state{queues = Queues} = Data
) when State =/= leader ->
    ?SERVER_LOG_NOTICE(State, Data, "refusing to adjust config with action ~0p because we are not leader.", [Action]),
    % Only requests that carry a commit reference have a queued commit to cancel.
    From =/= undefined andalso
        wa_raft_queue:commit_cancelled(Queues, From, {error, not_leader}, high),
    reply(Type, {error, not_leader}),
    {keep_state, Data};
%% [Snapshot Available] Follower, candidate and witness nodes might switch to stalled to install snapshot.
command(
    State,
    Type,
    ?SNAPSHOT_AVAILABLE_COMMAND(_, #raft_log_pos{index = SnapshotIndex}) = Event,
    #raft_state{
        last_applied = LastAppliedIndex
    } = Data
) when State =:= follower; State =:= candidate; State =:= witness ->
    case SnapshotIndex > LastAppliedIndex of
        true ->
            ?SERVER_LOG_NOTICE(State, Data, "at ~0p is notified of a newer snapshot at ~0p.", [LastAppliedIndex, SnapshotIndex]),
            % Re-deliver the same event after the transition so that it is
            % handled again in the stalled state.
            {next_state, stalled, Data, {next_event, Type, Event}};
        false ->
            ?SERVER_LOG_NOTICE(State, Data, "at ~0p is ignoring an older snapshot at ~0p.", [LastAppliedIndex, SnapshotIndex]),
            reply(Type, {error, rejected}),
            keep_state_and_data
    end;
%% [Snapshot Available] Leader and disabled nodes should not install snapshots.
command(
    State,
    Type,
    ?SNAPSHOT_AVAILABLE_COMMAND(_, _),
    #raft_state{}
) when State =:= leader; State =:= disabled ->
    reply(Type, {error, rejected}),
    keep_state_and_data;
%% [Handover Candidates] Non-leader nodes cannot serve handovers.
command(State, {call, From}, ?HANDOVER_CANDIDATES_COMMAND, #raft_state{}) when State =/= leader ->
    {keep_state_and_data, {reply, From, {error, not_leader}}};
%% [Is Peer Ready] Non-leader nodes cannot check peer readiness.
command(State, {call, From}, ?IS_PEER_READY_COMMAND(_), #raft_state{}) when State =/= leader ->
    {keep_state_and_data, {reply, From, {error, not_leader}}};
%% [Handover] Non-leader nodes cannot serve handovers.
command(State, Type, ?HANDOVER_COMMAND(_), #raft_state{}) when State =/= leader ->
    reply(Type, {error, not_leader}),
    keep_state_and_data;
%% [Enable] Non-disabled nodes are already enabled.
command(State, {call, From}, ?ENABLE_COMMAND, #raft_state{}) when State =/= disabled ->
    {keep_state_and_data, {reply, From, {error, already_enabled}}};
%% [Disable] Any state can be disabled by setting a disable reason.
command(State, {call, From}, ?DISABLE_COMMAND(Reason), #raft_state{} = Data) ->
    ?SERVER_LOG_NOTICE(State, Data, "disabling due to ~0p.", [Reason]),
    {next_state, disabled, Data#raft_state{disable_reason = Reason}, {reply, From, ok}};
%% [Bootstrap] Bootstrap requests that reach this fallback are answered as already bootstrapped.
command(_State, {call, From}, ?BOOTSTRAP_COMMAND(_Position, _Config, _Data), #raft_state{}) ->
    {keep_state_and_data, {reply, From, {error, already_bootstrapped}}};
%% [Fallback] Drop unknown command calls.
command(State, Type, Event, #raft_state{} = Data) ->
    ?SERVER_LOG_NOTICE(State, Data, "dropping unhandled ~0p command ~0P", [Type, Event, 20]),
    keep_state_and_data.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Cluster Configuration Helpers
%%------------------------------------------------------------------------------
%% Returns whether or not the specified peer is a member in the provided
%% configuration. Raises an error if the membership list is missing or empty.
-spec is_member(Peer :: peer(), Config :: config()) -> boolean().
is_member(Peer, Config) ->
    Members = config_membership(Config),
    lists:member(Peer, Members).
%% Returns true only when the given peer is exactly the {Name, Node} pair of
%% the local identity recorded in the server data.
-spec is_self(Peer :: peer(), Data :: #raft_state{}) -> boolean().
is_self(Peer, #raft_state{self = #raft_identity{name = SelfName, node = SelfNode}}) ->
    Peer =:= {SelfName, SelfNode};
is_self(_, _) ->
    false.
%% Returns whether or not the local replica is listed in the membership of the
%% current effective configuration.
-spec is_self_member(Data :: #raft_state{}) -> boolean().
is_self_member(#raft_state{self = #raft_identity{name = SelfName, node = SelfNode}} = Data) ->
    Self = {SelfName, SelfNode},
    is_member(Self, config(Data)).
%% Returns whether or not the local replica is listed as a witness in the
%% current effective configuration.
-spec is_self_witness(Data :: #raft_state{}) -> boolean().
is_self_witness(#raft_state{self = #raft_identity{name = SelfName, node = SelfNode}} = Data) ->
    Self = {SelfName, SelfNode},
    Witnesses = config_witnesses(config(Data)),
    lists:member(Self, Witnesses).
%% Returns true only if the membership of the provided configuration is exactly
%% the single provided peer and that peer is not listed as a witness. The peer
%% may be given either as an identity record or as a {Name, Node} pair.
-spec is_single_member(Peer :: #raft_identity{} | peer(), Config :: config()) -> IsSingleMember :: boolean().
is_single_member(#raft_identity{name = Name, node = Node}, Config) ->
    is_single_member({Name, Node}, Config);
is_single_member(Peer, #{membership := [Peer], witness := Witnesses}) ->
    not lists:member(Peer, Witnesses);
is_single_member(_Peer, #{membership := _, witness := _}) ->
    false.
%% Returns whether or not the membership of the provided configuration is
%% anything other than the empty list.
-spec config_has_membership(Config :: config()) -> boolean().
config_has_membership(#{membership := []}) ->
    false;
config_has_membership(#{membership := _NonEmpty}) ->
    true.
%% Returns the participants list of the provided config when it is present and
%% not empty. Otherwise, returns the membership list instead, which raises an
%% error if the membership list is also missing or empty.
-spec config_participants(Config :: config()) -> Participants :: membership().
config_participants(Config) ->
    case Config of
        #{participants := Participants} when Participants =/= [] ->
            Participants;
        _ ->
            config_membership(Config)
    end.
%% Returns the membership list of the provided config. Raises the error
%% 'membership_not_set' if the membership list is missing or empty.
-spec config_membership(Config :: config()) -> Membership :: membership().
config_membership(Config) ->
    case Config of
        #{membership := Members} when Members =/= [] ->
            Members;
        _ ->
            error(membership_not_set)
    end.
%% Returns the number of members in the provided config. Raises an error if
%% the membership list is missing or empty.
-spec config_membership_size(Config :: config()) -> Size :: non_neg_integer().
config_membership_size(Config) ->
    Members = config_membership(Config),
    length(Members).
%% Returns the witness list stored in the provided config. The config is
%% required to carry a 'witness' entry.
-spec config_witnesses(Config :: config()) -> Witnesses :: [peer()].
config_witnesses(#{witness := WitnessPeers}) ->
    WitnessPeers.
%% Returns the participants of the provided config as identity records.
-spec config_participant_identities(Config :: config()) -> Peers :: [#raft_identity{}].
config_participant_identities(Config) ->
    Participants = config_participants(Config),
    [#raft_identity{name = PeerName, node = PeerNode} || {PeerName, PeerNode} <- Participants].
%% Returns the members of the provided config as identity records.
-spec config_member_identities(Config :: config()) -> Peers :: [#raft_identity{}].
config_member_identities(Config) ->
    Members = config_membership(Config),
    [#raft_identity{name = PeerName, node = PeerNode} || {PeerName, PeerNode} <- Members].
%% Returns, as identity records, the members of the provided config that are
%% not also listed as witnesses.
-spec config_full_member_identities(Config :: config()) -> Replicas :: [#raft_identity{}].
config_full_member_identities(Config) ->
    Members = config_membership(Config),
    Witnesses = config_witnesses(Config),
    [#raft_identity{name = PeerName, node = PeerNode} || {PeerName, PeerNode} <- Members -- Witnesses].
%% Returns the current effective RAFT configuration: whichever of the config
%% reported by the RAFT log and the cached config has the higher index.
-spec config(State :: #raft_state{}) -> Config :: config().
config(#raft_state{log_view = View, cached_config = {CachedIndex, CachedConfig}}) ->
    case wa_raft_log:config(View) of
        not_found ->
            CachedConfig;
        {ok, LogIndex, LogConfig} when LogIndex > CachedIndex ->
            LogConfig;
        {ok, _, _} ->
            % The log's config is not newer than the cached one. This will normally
            % only occur when the log has leftover log entries from previous
            % incarnations of the RAFT server that have been applied but not yet
            % trimmed this incarnation.
            CachedConfig
    end;
config(#raft_state{log_view = View}) ->
    % Nothing is cached, so use the log's config or else a fresh default config.
    case wa_raft_log:config(View) of
        not_found -> make_config();
        {ok, _, LogConfig} -> LogConfig
    end.
%% Returns the log index of the current effective RAFT configuration, or 0
%% when neither a cached config nor a config in the log is available.
-spec config_index(State :: #raft_state{}) -> ConfigIndex :: wa_raft_log:log_index().
config_index(#raft_state{log_view = View, cached_config = {CachedIndex, _}}) ->
    case wa_raft_log:config(View) of
        not_found ->
            CachedIndex;
        {ok, LogIndex, _} ->
            % The case where the log contains a config that is already applied
            % generally only occurs after a restart as any log entries whose trim
            % was deferred will become visible again.
            max(LogIndex, CachedIndex)
    end;
config_index(#raft_state{log_view = View}) ->
    case wa_raft_log:config(View) of
        not_found -> 0;
        {ok, LogIndex, _} -> LogIndex
    end.
%% Reads the configuration held by the RAFT storage and caches it (normalized,
%% together with its index) in the server data. The cache is cleared when the
%% storage has no configuration. This cached configuration is used whenever
%% there is no newer configuration available in the RAFT log and so needs to
%% be kept in sync with what the RAFT server expects is in storage.
%% Raises {could_not_load_config, Reason} if the storage reports an error.
-spec load_config(State :: #raft_state{}) -> NewState :: #raft_state{}.
load_config(#raft_state{storage = Storage} = State) ->
    Cached =
        case wa_raft_storage:config(Storage) of
            undefined ->
                undefined;
            {ok, #raft_log_pos{index = StoredIndex}, StoredConfig} ->
                {StoredIndex, normalize_config(StoredConfig)};
            {error, Reason} ->
                error({could_not_load_config, Reason})
        end,
    State#raft_state{cached_config = Cached}.
%% Returns the label held by the RAFT storage when a label module is
%% configured and 'undefined' otherwise. Raises
%% {failed_to_load_label_state, Reason} if the storage reports an error.
-spec load_label_state(State :: #raft_state{}) -> LabelState :: wa_raft_label:label().
load_label_state(#raft_state{storage = Storage, label_module = LabelModule}) when LabelModule =/= undefined ->
    case wa_raft_storage:label(Storage) of
        {error, Reason} ->
            error({failed_to_load_label_state, Reason});
        {ok, StoredLabel} ->
            StoredLabel
    end;
load_label_state(_) ->
    undefined.
%% Returns the provided cluster configuration with the peer merged into its
%% participants list.
-spec config_add_participant(Peer :: peer(), Config :: config()) -> config().
config_add_participant(Peer, Config) ->
    Participants = lists:umerge([Peer], config_participants(Config)),
    Config#{participants => Participants}.
%% Returns the provided cluster configuration with the peer merged into both
%% its participants and membership lists.
-spec config_add_member(Peer :: peer(), Config :: config()) -> config().
config_add_member(Peer, Config) ->
    Participants = lists:umerge([Peer], config_participants(Config)),
    Members = lists:umerge([Peer], config_membership(Config)),
    Config#{participants => Participants, membership => Members}.
%% Returns the provided cluster configuration with the peer merged into its
%% participants, membership, and witness lists.
-spec config_add_witness(Peer :: peer(), Config :: config()) -> config().
config_add_witness(Peer, Config) ->
    Participants = lists:umerge([Peer], config_participants(Config)),
    Members = lists:umerge([Peer], config_membership(Config)),
    Witnesses = lists:umerge([Peer], config_witnesses(Config)),
    Config#{participants => Participants, membership => Members, witness => Witnesses}.
%% Returns the provided cluster configuration with the peer merged into just
%% its witness list; participants and membership are left untouched.
-spec config_add_witness_only(Peer :: peer(), Config :: config()) -> config().
config_add_witness_only(Peer, Config) ->
    Witnesses = lists:umerge([Peer], config_witnesses(Config)),
    Config#{witness => Witnesses}.
%% Returns the provided cluster configuration with the peer removed from its
%% participants, membership, and witness lists.
-spec config_remove_participant(Peer :: peer(), Config :: config()) -> config().
config_remove_participant(Peer, Config) ->
    Participants = config_participants(Config) -- [Peer],
    Members = config_membership(Config) -- [Peer],
    Witnesses = config_witnesses(Config) -- [Peer],
    Config#{participants => Participants, membership => Members, witness => Witnesses}.
%% Returns the provided cluster configuration with the peer removed from its
%% membership list only.
-spec config_remove_member(Peer :: peer(), Config :: config()) -> config().
config_remove_member(Peer, Config) ->
    Members = config_membership(Config) -- [Peer],
    Config#{membership => Members}.
%% Called after an apply is sent to storage. When the applied operation is a
%% new configuration, the cached configuration is replaced by it along with
%% the index it was applied at. Any other operation leaves the state as is.
-spec maybe_update_config(
    Index :: wa_raft_log:log_index(),
    Term :: wa_raft_log:log_term(),
    Op :: wa_raft_acceptor:command(),
    State :: #raft_state{}
) -> NewState :: #raft_state{}.
maybe_update_config(Index, _Term, Op, State) ->
    case Op of
        {config, AppliedConfig} ->
            State#raft_state{cached_config = {Index, AppliedConfig}};
        _ ->
            State
    end.
%% Collects, in membership order, the values that the mapping holds for the
%% nodes of the cluster members. Members whose node has no entry in the
%% mapping are left out of the result.
-spec to_member_list(
    Mapping :: #{node() => Value},
    Config :: config()
) -> Existing :: [Value].
to_member_list(Mapping, Config) ->
    MemberNodes = [Node || {_Name, Node} <- config_membership(Config)],
    lists:filtermap(
        fun(Node) ->
            case maps:find(Node, Mapping) of
                {ok, Value} -> {true, Value};
                error -> false
            end
        end,
        MemberNodes
    ).
%% Collects, in membership order, the values that the mapping holds for the
%% nodes of the cluster members. Members whose node has no entry in the
%% mapping contribute the provided default value instead.
-spec to_member_list(
    Mapping :: #{node() => Value},
    Default :: Value,
    Config :: config()
) -> Normalized :: [Value].
to_member_list(Mapping, Default, Config) ->
    ValueOf = fun(Node) -> maps:get(Node, Mapping, Default) end,
    [ValueOf(Node) || {_Name, Node} <- config_membership(Config)].
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Quorum and Majority
%%------------------------------------------------------------------------------
%% Computes the quorum value over the values that the mapping holds for the
%% members of the cluster. Members that are not present in the mapping do not
%% contribute to the quorum. Returns 'none' when the config has no membership.
-spec compute_member_quorum(
    Mapping :: #{node() => Value},
    Config :: config()
) -> {quorum, Quorum :: Value} | none.
compute_member_quorum(Mapping, Config) ->
    case config_has_membership(Config) of
        false ->
            none;
        true ->
            Known = to_member_list(Mapping, Config),
            compute_quorum(Known, config_membership_size(Config))
    end.
%% Returns the largest value that at least a majority of the Total nodes
%% have reached or exceeded, or 'none' if a majority of values are unknown.
-spec compute_quorum(Values :: [Value], Total :: non_neg_integer()) -> {quorum, Quorum :: Value} | none.
compute_quorum(Values, Total) ->
    Known = length(Values),
    case Known * 2 =< Total of
        true ->
            % Quorum requires strictly more than half of the values to be known.
            none;
        false ->
            % In ascending order, the element at position (Known - Total div 2)
            % has at least (Total div 2 + 1) known values that are greater than
            % or equal to it, which is a majority of the Total nodes.
            Position = Known - Total div 2,
            {quorum, lists:nth(Position, lists:sort(Values))}
    end.
%% Finds, if it exists, the value that a majority of the cluster members hold
%% in the mapping. Members that are not present in the mapping do not count
%% towards any value. Returns 'none' when the config has no membership.
-spec find_member_majority(
    Mapping :: #{node() => Value},
    Config :: config()
) -> {found, Majority :: Value} | none.
find_member_majority(Mapping, Config) ->
    case config_has_membership(Config) of
        false ->
            none;
        true ->
            Known = to_member_list(Mapping, Config),
            find_majority(Known, config_membership_size(Config))
    end.
%% Finds, if it exists, the majority value of the given list fragment. The
%% full list is assumed to have Total elements, of which only those in the
%% given fragment contribute towards a majority. A value is the majority if
%% more than half of the Total elements are that value.
-spec find_majority(Values :: [Value], Total :: non_neg_integer()) -> {found, Majority :: Value} | none.
find_majority(Values, Total) ->
    Threshold = Total div 2 + 1,
    find_majority_impl(Values, #{}, Threshold).
%% Walks the values while keeping a running count per distinct value. Returns
%% the first value whose count becomes exactly Threshold, or 'none' when the
%% values run out before any count does.
-spec find_majority_impl(
    Values :: [Value],
    Counts :: #{Value => non_neg_integer()},
    Threshold :: non_neg_integer()
) -> {found, Majority :: Value} | none.
find_majority_impl([], _Counts, _Threshold) ->
    none;
find_majority_impl([Candidate | Rest], Counts, Threshold) ->
    Seen = maps:get(Candidate, Counts, 0) + 1,
    case Seen =:= Threshold of
        true -> {found, Candidate};
        false -> find_majority_impl(Rest, Counts#{Candidate => Seen}, Threshold)
    end.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Private Functions
%%------------------------------------------------------------------------------
%% Picks an election timeout for this replica. A base timeout is drawn at
%% random above the configured minimum and up to the configured maximum (or is
%% just the minimum when the maximum does not exceed it) and is then scaled
%% according to the configured election weight.
-spec random_election_timeout(#raft_state{}) -> non_neg_integer().
random_election_timeout(#raft_state{application = App, table = Table}) ->
    Max = ?RAFT_ELECTION_TIMEOUT_MAX(App, Table),
    Min = ?RAFT_ELECTION_TIMEOUT_MIN(App, Table),
    Base =
        if
            Max > Min -> Min + rand:uniform(Max - Min);
            true -> Min
        end,
    Weight = ?RAFT_ELECTION_WEIGHT(App),
    case Weight > 0 andalso Weight =< ?RAFT_ELECTION_MAX_WEIGHT of
        true ->
            % A higher weight gives a shorter timeout and so more chances to
            % initiate a leader election.
            round(Base * ?RAFT_ELECTION_MAX_WEIGHT div Weight);
        false ->
            Base * ?RAFT_ELECTION_DEFAULT_WEIGHT
    end.
%% Builds the log entry for the given operation and returns it together with
%% the server data updated to remember the label used for the entry.
-spec make_log_entry(Op :: wa_raft_acceptor:op(), Data :: #raft_state{}) -> {wa_raft_log:log_entry(), #raft_state{}}.
make_log_entry(Op, #raft_state{last_label = PreviousLabel} = Data0) ->
    {LogEntry, Label} = make_log_entry_impl(Op, PreviousLabel, Data0),
    Data1 = Data0#raft_state{last_label = Label},
    {LogEntry, Data1}.
%% Builds the log entry for the given operation using the last label recorded
%% in the server data as the previous label.
-spec make_log_entry_impl(
    Op :: wa_raft_acceptor:op(),
    Data :: #raft_state{}
) -> {wa_raft_log:log_entry(), wa_raft_label:label() | undefined}.
make_log_entry_impl(Op, #raft_state{} = Data) ->
    make_log_entry_impl(Op, Data#raft_state.last_label, Data).
%% Builds the log entry for the given operation and returns it together with
%% the label it was built with. A new label is requested from the label module
%% only when one is configured and the command requires a new label; in all
%% other cases the previous label is carried over unchanged.
-spec make_log_entry_impl(
    Op :: wa_raft_acceptor:op(),
    LastLabel :: wa_raft_label:label() | undefined,
    Data :: #raft_state{}
) -> {wa_raft_log:log_entry(), wa_raft_label:label() | undefined}.
make_log_entry_impl({Key, Command}, LastLabel, #raft_state{label_module = LabelModule} = Data) ->
    Label =
        case LabelModule =/= undefined andalso requires_new_label(Command) of
            true -> LabelModule:new_label(LastLabel, Command);
            false -> LastLabel
        end,
    {make_log_entry_impl(Key, Label, Command, Data), Label}.
%% Builds a log entry in the current term. The entry payload carries the label
%% only when the label is not 'undefined'.
-spec make_log_entry_impl(
    Key :: wa_raft_acceptor:key(),
    Label :: wa_raft_label:label() | undefined,
    Command :: wa_raft_acceptor:command(),
    Data :: #raft_state{}
) -> wa_raft_log:log_entry().
make_log_entry_impl(Key, Label, Command, #raft_state{current_term = Term}) ->
    Payload =
        case Label of
            undefined -> {Key, Command};
            _ -> {Key, Label, Command}
        end,
    {Term, Payload}.
%% Returns whether or not the command needs a new label. Only 'noop' and
%% '{config, _}' commands do not.
-spec requires_new_label(Command :: wa_raft_acceptor:command()) -> boolean().
requires_new_label(Command) ->
    case Command of
        noop -> false;
        {config, _} -> false;
        _ -> true
    end.
%% When the local replica is the only member of the cluster (and is not a
%% witness), runs the whole commit pipeline locally: commits pending high and
%% then low priority operations, advances the commit index, applies the log
%% and refreshes the quorum timestamp. Otherwise returns the data unchanged.
-spec apply_single_node_cluster(Data :: #raft_state{}) -> #raft_state{}.
apply_single_node_cluster(#raft_state{self = Self} = Data) ->
    case is_single_member(Self, config(Data)) of
        false ->
            Data;
        true ->
            Steps = [
                fun(Acc) -> commit_pending(Acc, high) end,
                fun(Acc) -> commit_pending(Acc, low) end,
                fun leader_advance_commit_index/1,
                fun leader_apply_log/1,
                fun update_quorum_ts/1
            ],
            lists:foldl(fun(Step, Acc) -> Step(Acc) end, Data, Steps)
    end.
%% Checks whether the leader should move its recorded commit index forward
%% because the log quorum has advanced. (5.4.2) The leader's own progress is
%% taken to be the last index of its log.
-spec leader_advance_commit_index(Data :: #raft_state{}) -> #raft_state{}.
leader_advance_commit_index(
    #raft_state{
        table = Table,
        log_view = View,
        commit_index = CommitIndex,
        match_indices = MatchIndices,
        first_current_term_log_index = TermStartIndex
    } = Data
) ->
    OwnIndex = wa_raft_log:last_index(View),
    Progress = MatchIndices#{node() => OwnIndex},
    Quorum = compute_member_quorum(Progress, config(Data)),
    case Quorum of
        {quorum, Index} when Index < TermStartIndex ->
            % Only log entries from the leader's current term can be committed
            % solely by counting replicas (5.4.3).
            ?RAFT_COUNT(Table, 'apply.delay.old'),
            ?SERVER_LOG_WARNING(
                leader,
                Data,
                "unable to establish quorum at ~0p before the start of the current term at ~0p.",
                [Index, TermStartIndex]
            ),
            Data;
        {quorum, Index} when Index > CommitIndex ->
            Data#raft_state{commit_index = Index};
        _ ->
            % No quorum, or the quorum is not ahead of the commit index.
            Data
    end.
%% Applies the log as leader. The trim index handed to apply_log/3 is the
%% lowest progress among all members, where the leader's own progress is its
%% last applied index and members without a recorded match index count as 0.
-spec leader_apply_log(Data :: #raft_state{}) -> #raft_state{}.
leader_apply_log(#raft_state{last_applied = LastApplied, match_indices = MatchIndices} = Data) ->
    Progress = MatchIndices#{node() => LastApplied},
    MemberProgress = to_member_list(Progress, 0, config(Data)),
    apply_log(leader, lists:min(MemberProgress), Data).
%% Sends committed but not yet applied log entries to storage and then trims
%% the log.
%%
%% Arguments:
%%   State     - the RAFT server state on whose behalf the log is applied;
%%               passed on to apply_op/5 and used for logging.
%%   TrimIndex - upper bound for log rotation, or 'infinity' for no bound.
%%   Data      - current server data.
%%
%% Returns the server data with the last applied index and log view updated.
%% The data is returned unchanged when there is nothing new to apply or when
%% the apply queue is full.
-spec apply_log(
State :: state(),
TrimIndex :: wa_raft_log:log_index() | infinity,
Data :: #raft_state{}
) -> #raft_state{}.
apply_log(
State,
TrimIndex,
#raft_state{
application = App,
table = Table,
queues = Queues,
log_view = View,
commit_index = CommitIndex,
last_applied = LastApplied
} = Data0
) when CommitIndex > LastApplied ->
% Start of the duration reported as 'apply_log.latency_us' on both branches below.
StartTUsec = erlang:monotonic_time(microsecond),
case wa_raft_queue:apply_queue_full(Queues) of
false ->
% Apply a limited number of log entries (both count and total byte size limited)
LimitedIndex = min(CommitIndex, LastApplied + ?RAFT_MAX_CONSECUTIVE_APPLY_ENTRIES(App, Table)),
LimitBytes = ?RAFT_MAX_CONSECUTIVE_APPLY_BYTES(App, Table),
% The accumulator carries the next expected index alongside the data. The fold
% function only matches when the visited index equals that expected index.
% NOTE(review): presumably the fold can stop before LimitedIndex once LimitBytes
% is reached, which is why NewLastApplied is read back from the folded data - confirm.
{ok, {_, #raft_state{log_view = View1, last_applied = NewLastApplied} = Data1}} = wa_raft_log:fold(View, LastApplied + 1, LimitedIndex, LimitBytes,
fun (Index, Size, Entry, {Index, AccData}) ->
% Reserve apply queue space for the entry before handing it over to be applied.
wa_raft_queue:reserve_apply(Queues, Size),
{Index + 1, apply_op(State, Index, Size, Entry, AccData)}
end, {LastApplied + 1, Data0}),
% Perform log trimming since we've now applied some log entries, only keeping
% at maximum MaxRotateDelay log entries.
MaxRotateDelay = ?RAFT_MAX_RETAINED_ENTRIES(App, Table),
% Numbers order before atoms in Erlang term order, so an 'infinity' TrimIndex
% never wins the min/2 below and RotateIndex is expected to be a number.
RotateIndex = max(LimitedIndex - MaxRotateDelay, min(NewLastApplied, TrimIndex)),
RotateIndex =/= infinity orelse error(bad_state),
Data2 =
case maybe_rotate_log(View1, RotateIndex, Data1) of
{ok, View2} ->
Data1#raft_state{log_view = View2};
{error, Reason} ->
% A failed rotation is not fatal: count it, log it and keep the current log view.
?RAFT_COUNT(Table, 'log.rotate.error'),
?SERVER_LOG_WARNING(
State,
Data1,
"failed to rotate log at ~0p due to ~0P; leaving log view unchanged.",
[RotateIndex, Reason, 30]
),
Data1
end,
?RAFT_GATHER(Table, 'apply_log.latency_us', erlang:monotonic_time(microsecond) - StartTUsec),
Data2;
true ->
% The apply queue is full: record the delay and queue size and apply nothing.
ApplyQueueSize = wa_raft_queue:apply_queue_size(Queues),
?RAFT_COUNT(Table, 'apply.delay'),
?RAFT_GATHER(Table, 'apply.queue', ApplyQueueSize),
?RAFT_GATHER(Table, 'apply_log.latency_us', erlang:monotonic_time(microsecond) - StartTUsec),
Data0
end;
% The commit index is not ahead of the last applied index, so there is nothing to apply.
apply_log(_, _, #raft_state{} = Data) ->
Data.
%% Rotates the log at the given index unless an asynchronous append is
%% currently recorded as in flight, in which case the view is returned as is.
-spec maybe_rotate_log(View :: wa_raft_log:view(), RotateIndex :: wa_raft_log:log_index(), Data :: #raft_state{}) ->
    {ok, wa_raft_log:view()} | {error, term()}.
maybe_rotate_log(View, RotateIndex, #raft_state{append_in_flight = InFlight}) ->
    case InFlight of
        {_Ref, _WorkerPid, _MonitorRef, _Priority, _Pending, _NewLabel} ->
            %% Async append writes the provider from a spawned process. Log rotation is
            %% destructive for providers with segment rewrite/swap semantics, so defer
            %% it until the in-flight append has completed and the provider is quiescent.
            {ok, View};
        _ ->
            wa_raft_log:rotate(View, RotateIndex)
    end.
%% Applies a single log entry by sending it to storage and then refreshing the
%% cached config if the entry carried a new configuration.
%%   - Entries at or below the last applied index are skipped with a warning.
%%   - Entries without a label are sent to storage with an 'undefined' label.
%%   - A missing or unrecognized entry makes the process exit.
-spec apply_op(
    State :: state(),
    Index :: wa_raft_log:log_index(),
    Size :: non_neg_integer(),
    Entry :: wa_raft_log:log_entry() | undefined,
    Data :: #raft_state{}
) -> #raft_state{}.
apply_op(State, Index, _Size, _Entry, #raft_state{last_applied = LastApplied} = Data) when Index =< LastApplied ->
    ?SERVER_LOG_WARNING(State, Data, "is skipping applying log entry ~0p because log entries up to ~0p are already applied.", [Index, LastApplied]),
    Data;
apply_op(_State, Index, Size, {Term, {Key, Command}}, #raft_state{} = Data) ->
    Labelled = {Term, {Key, undefined, Command}},
    Sent = send_op_to_storage(Index, {Index, Labelled}, Size, Data),
    maybe_update_config(Index, Term, Command, Sent);
apply_op(_State, Index, Size, {Term, {_Key, _Label, Command}} = Entry, #raft_state{} = Data) ->
    Sent = send_op_to_storage(Index, {Index, Entry}, Size, Data),
    maybe_update_config(Index, Term, Command, Sent);
apply_op(State, Index, _Size, undefined, #raft_state{table = Table, log_view = View} = Data) ->
    ?RAFT_COUNT(Table, 'server.missing.log.entry'),
    ?SERVER_LOG_ERROR(State, Data, "failed to apply ~0p because log entry is missing from log covering ~0p to ~0p.",
        [Index, wa_raft_log:first_index(View), wa_raft_log:last_index(View)]),
    exit({invalid_op, Index});
apply_op(State, Index, _Size, Unrecognized, #raft_state{table = Table} = Data) ->
    ?RAFT_COUNT(Table, 'server.corrupted.log.entry'),
    ?SERVER_LOG_ERROR(State, Data, "failed to apply unrecognized entry ~0P at ~0p.", [Unrecognized, 20, Index]),
    exit({invalid_op, Index, Unrecognized}).
%% Hands the log record over to storage to be applied and records the index as
%% the last applied index. If an operation is queued for the index, it is taken
%% out of the queue and its reference and priority are passed along; otherwise
%% an 'undefined' reference and 'high' priority are used.
-spec send_op_to_storage(
    Index :: wa_raft_log:log_index(),
    Record :: wa_raft_log:log_record(),
    Size :: non_neg_integer(),
    Data :: #raft_state{}
) -> #raft_state{}.
send_op_to_storage(Index, Record, Size, #raft_state{storage = Storage, queued = Queued} = Data) ->
    Taken = maps:take(Index, Queued),
    {{From, Priority}, Remaining} =
        case Taken of
            error -> {{undefined, high}, Queued};
            _ -> Taken
        end,
    wa_raft_storage:apply(Storage, From, Record, Size, Priority),
    Data#raft_state{last_applied = Index, queued = Remaining}.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - State Management
%%------------------------------------------------------------------------------
%% Returns 'witness' when the local replica is a witness in the current
%% effective configuration and 'follower' otherwise.
-spec follower_or_witness(Data :: #raft_state{}) -> follower | witness.
follower_or_witness(Data) ->
    IsWitness = is_self_witness(Data),
    case IsWitness of
        true -> witness;
        false -> follower
    end.
%% Prepares the server data upon entry into a new RAFT server state: stamps
%% the entry time, fixes up the recorded leader, cancels pending work where
%% required and refreshes the quorum timestamp. The resulting data is also
%% used to publish the current term info and status.
-spec enter_state(State :: state(), Data :: #raft_state{}) -> #raft_state{}.
enter_state(State, Data) ->
    EntryTs = erlang:monotonic_time(millisecond),
    Stamped = Data#raft_state{state_start_ts = EntryTs},
    WithLeader = set_leader_upon_entry(State, Stamped),
    Cancelled = cancel_pending_upon_entry(State, WithLeader),
    Entered = update_quorum_ts(EntryTs, Cancelled),
    update_current_term_info(State, Entered),
    update_status(State, Entered),
    Entered.
%% When entering leader, records ourselves as the current leader. When
%% entering any other state, clears the recorded leader if it is ourselves and
%% otherwise leaves it alone.
-spec set_leader_upon_entry(State :: state(), Data :: #raft_state{}) -> #raft_state{}.
set_leader_upon_entry(State, #raft_state{self = Self, leader = Leader} = Data) ->
    case State of
        leader -> Data#raft_state{leader = Self};
        _ when Leader =:= Self -> Data#raft_state{leader = undefined};
        _ -> Data
    end.
%% When entering leader, any pending commits buffered from candidacy are kept.
%% When entering any other state, everything pending is cancelled with
%% {error, not_leader}.
-spec cancel_pending_upon_entry(State :: state(), Data :: #raft_state{}) -> #raft_state{}.
cancel_pending_upon_entry(State, Data) ->
    case State of
        leader -> Data;
        _ -> cancel_pending({error, not_leader}, Data)
    end.
%% Moves the server to a strictly newer term with the given voted-for peer and
%% resets everything that was recorded for the previous term. The new term
%% info is published and the new data is stored durably before returning.
%% Only defined for a NewTerm greater than the current term.
-spec advance_term(
    State :: state(),
    NewTerm :: wa_raft_log:log_term(),
    VotedFor :: undefined | node(),
    Data :: #raft_state{}
) -> #raft_state{}.
advance_term(State, NewTerm, VotedFor, #raft_state{current_term = CurrentTerm} = Data) when
    NewTerm > CurrentTerm
->
    Advanced = Data#raft_state{
        current_term = NewTerm,
        voted_for = VotedFor,
        leader = undefined,
        handover = undefined,
        % Votes gathered in the previous term.
        votes = #{},
        pre_vote_ref = undefined,
        pre_votes = #{},
        % Per-peer tracking kept for the previous term.
        next_indices = #{},
        match_indices = #{},
        last_applied_indices = #{},
        heartbeat_send_ts = #{}
    },
    update_current_term_info(State, Advanced),
    ok = wa_raft_durable_state:store(Advanced),
    Advanced.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Elections
%%------------------------------------------------------------------------------
%% Whether or not the local replica may keep acting as leader.
%%  - `ineligible`: leader eligibility is not enabled for the application or
%%    the local replica is not a member of the cluster.
%%  - `eligible`: check quorum is disabled, or the last quorum timestamp is
%%    within the liveness grace period.
%%  - `stale`: check quorum is enabled and either no quorum timestamp has been
%%    recorded or it is older than the liveness grace period.
-spec leader_eligible(Data :: #raft_state{}) -> eligible | ineligible | stale.
leader_eligible(#raft_state{application = App, table = Table, last_quorum_ts = LastQuorumTs} = Data) ->
    CanLead = ?RAFT_LEADER_ELIGIBLE(App) andalso is_self_member(Data),
    case CanLead of
        true ->
            case ?RAFT_LEADER_CHECK_QUORUM(App, Table) of
                true when LastQuorumTs =:= undefined ->
                    stale;
                true ->
                    GracePeriod = ?RAFT_LIVENESS_GRACE_PERIOD_MS(App, Table),
                    SinceQuorum = erlang:monotonic_time(millisecond) - LastQuorumTs,
                    if
                        SinceQuorum =< GracePeriod -> eligible;
                        true -> stale
                    end;
                false ->
                    eligible
            end;
        false ->
            ineligible
    end.
%% Whether or not the local replica may start an election. All of the
%% following must hold, checked in this order with short-circuiting:
%% leader eligibility is enabled for the application, the local replica is a
%% member of the cluster, and the configured election weight is not zero.
-spec candidate_eligible(Data :: #raft_state{}) -> boolean().
candidate_eligible(#raft_state{application = App} = Data) ->
    ?RAFT_LEADER_ELIGIBLE(App) andalso
        is_self_member(Data) andalso
        ?RAFT_ELECTION_WEIGHT(App) =/= 0.
%% Start a new election of the given type. The term is advanced by one with
%% a vote recorded for the local node, a vote request carrying the position
%% of the last local log entry is sent to all members, and a granted vote is
%% sent to ourselves. The state is kept, with the election timeout set.
-spec candidate_start_election(
    ElectionType :: election_type(),
    Data :: #raft_state{}
) -> gen_statem:state_enter_result(state(), #raft_state{}).
candidate_start_election(ElectionType, #raft_state{current_term = CurrentTerm} = Data) ->
    #raft_state{self = Self, log_view = View} = Data,
    % Enter the next term, voting for ourselves.
    NextTermData = advance_term(candidate, CurrentTerm + 1, node(), Data),
    % The election is contested at the position of the last entry in our log.
    LastLogIndex = wa_raft_log:last_index(View),
    {ok, LastLogTerm} = wa_raft_log:term(View, LastLogIndex),
    ?SERVER_LOG_NOTICE(
        candidate,
        NextTermData,
        "advances to new term and starts ~0p election at ~0p:~0p.",
        [ElectionType, LastLogIndex, LastLogTerm]
    ),
    % Ask every member for a vote; a candidate always grants its own vote.
    send_rpc_to_all_members(?REQUEST_VOTE(ElectionType, LastLogIndex, LastLogTerm), NextTermData),
    send_rpc(Self, ?VOTE(true), NextTermData),
    {keep_state, NextTermData, ?ELECTION_TIMEOUT(NextTermData)}.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Leader Methods
%%------------------------------------------------------------------------------
%% Commit pending operations (high priority first, then low priority) and
%% then run heartbeat/2 for every participant identity in the current config,
%% threading the state through each call.
-spec append_entries_to_followers(Data :: #raft_state{}) -> #raft_state{}.
append_entries_to_followers(Data) ->
    Committed = commit_pending(commit_pending(Data, high), low),
    Participants = config_participant_identities(config(Committed)),
    lists:foldl(fun heartbeat/2, Committed, Participants).
%% True when at least one commit is buffered in either the high or the low
%% priority pending list.
-spec has_pending_commits(Data :: #raft_state{}) -> boolean().
has_pending_commits(#raft_state{pending_high = PendingHigh, pending_low = PendingLow}) ->
    PendingHigh =/= [] orelse PendingLow =/= [].
%% Total number of commits buffered across both pending lists.
-spec pending_count(Data :: #raft_state{}) -> non_neg_integer().
pending_count(#raft_state{} = Data) ->
    HighCount = length(Data#raft_state.pending_high),
    LowCount = length(Data#raft_state.pending_low),
    HighCount + LowCount.
%% Buffer a commit request on the pending list for its priority. New requests
%% are pushed onto the head, so each pending list holds the newest request
%% first.
-spec add_pending(
    From :: gen_server:from(),
    Op :: wa_raft_acceptor:op(),
    Priority :: wa_raft_acceptor:priority(),
    Data :: #raft_state{}
) -> #raft_state{}.
add_pending(From, Op, high, #raft_state{} = Data) ->
    Buffered = Data#raft_state.pending_high,
    Data#raft_state{pending_high = [{From, Op} | Buffered]};
add_pending(From, Op, low, #raft_state{} = Data) ->
    Buffered = Data#raft_state.pending_low,
    Data#raft_state{pending_low = [{From, Op} | Buffered]}.
%% Try to move the pending commits of the given priority into the log.
%%
%% Nothing is done when an async append is already in flight, or when both
%% pending lists are empty and no read is pending. Otherwise the log entries
%% to append are produced by collect_pending/2 and, if there is at least one,
%% they are appended either by an async worker (when should_async_log_append/2
%% returns true) or synchronously through wa_raft_log:try_append/3.
%%
%% Outcomes of the synchronous append:
%%  - `{ok, NewView}`: the callers of the pending list are recorded in `queued`
%%    by log index, the label and log view are advanced, the pending read flag
%%    is cleared and the pending list of this priority is emptied.
%%  - `skipped`: all pending commits and reads are cancelled with
%%    `{error, commit_stalled}` and the label is not advanced.
%%  - `{error, Error}`: the error is logged and raised with error/1.
-spec commit_pending(Data :: #raft_state{}, Priority :: wa_raft_acceptor:priority()) -> #raft_state{}.
% An async append is already in flight; leave everything buffered.
commit_pending(#raft_state{append_in_flight = {_Ref, _WorkerPid, _MonitorRef, _AppendPriority, _Pending, _NewLabel}} = Data, _Priority) ->
Data;
% Nothing is buffered and no read is waiting.
commit_pending(#raft_state{pending_high = [], pending_low = [], pending_read = false} = Data, _Priority) ->
Data;
commit_pending(#raft_state{table = Table, log_view = View, pending_high = PendingHigh, pending_low = PendingLow, queued = Queued} = Data, Priority) ->
% The pending list for this priority, newest request first.
Pending =
case Priority of
high ->
PendingHigh;
low ->
PendingLow
end,
{Entries, NewLabel} = collect_pending(Data, Priority),
case Entries of
[_ | _] ->
% If we have processed at least one new log entry
% with the given priority, we try to append to the log.
case should_async_log_append(Data, Priority) of
true ->
start_async_log_append(Data, Priority, Pending, Entries, NewLabel);
false ->
case wa_raft_log:try_append(View, Entries, Priority) of
{ok, NewView} ->
% Add the newly appended log entries to the queued map. The pending
% list is kept in reverse order, so the newest request is assigned
% the last log index and indices are counted down from there.
Last = wa_raft_log:last_index(NewView),
{_, NewQueued} =
lists:foldl(
fun ({From, _Op}, {Index, AccQueued}) ->
{Index - 1, AccQueued#{Index => {From, Priority}}}
end,
{Last, Queued},
Pending
),
% We can clear pending read flag as we've successfully added at
% least one new log entry so the leader will proceed to replicate
% and establish a new quorum if it is still up to date.
Data1 = Data#raft_state{
log_view = NewView,
last_label = NewLabel,
pending_read = false,
queued = NewQueued
},
case Priority of
high ->
Data1#raft_state{pending_high = []};
low ->
Data1#raft_state{pending_low = []}
end;
skipped ->
% Since the append failed, we do not advance to the new label.
% We cancel all pending commits and reads.
?RAFT_COUNTV(Table, {'commit.skipped', Priority}, length(Entries)),
?SERVER_LOG_WARNING(leader, Data, "skipped pre-heartbeat sync for ~0p log entr(ies).", [length(Entries)]),
cancel_pending({error, commit_stalled}, Data);
{error, Error} ->
% The append failed outright; record it and crash with the error.
?RAFT_COUNTV(Table, {'commit.error', Priority}, length(Entries)),
?SERVER_LOG_ERROR(leader, Data, "sync failed due to ~0P.", [Error, 20]),
error(Error)
end
end;
_ ->
% No log entries were produced for this priority.
Data
end.
%% Whether a log append should be handed to an async worker. This requires
%% that no async append is currently in flight, that async log append is
%% enabled for the application and table, and that is_single_member/2 holds
%% for the local identity in the current config. The priority is not
%% consulted.
-spec should_async_log_append(Data :: #raft_state{}, Priority :: wa_raft_acceptor:priority()) -> boolean().
should_async_log_append(
    #raft_state{application = App, table = Table, self = Self, append_in_flight = InFlight} = Data,
    _Priority
) ->
    InFlight =:= undefined andalso
        ?RAFT_ASYNC_LOG_APPEND(App, Table) andalso
        is_single_member(Self, config(Data)).
%% Hand a log append over to a monitored worker process.
%% The worker calls wa_raft_log:try_append/3 against the current log view and
%% reports the outcome back to this process as
%% `{async_log_append_complete, Ref, Result}`; an exception raised by the
%% append is reported as `{error, {Class, Reason, Stacktrace}}`.
%% The returned state records the in-flight append (reference, worker pid,
%% monitor, priority, the callers being appended and the label to adopt on
%% success), has the pending read flag cleared and has the pending list for
%% the given priority emptied.
-spec start_async_log_append(
    Data :: #raft_state{},
    Priority :: wa_raft_acceptor:priority(),
    Pending :: [{gen_server:from(), wa_raft_acceptor:op()}],
    Entries :: [wa_raft_log:log_entry()],
    NewLabel :: wa_raft_label:label() | undefined
) -> #raft_state{}.
start_async_log_append(#raft_state{log_view = View} = Data, Priority, Pending, Entries, NewLabel) ->
    Server = self(),
    Ref = make_ref(),
    Worker = fun() ->
        Outcome =
            try
                wa_raft_log:try_append(View, Entries, Priority)
            catch
                Class:Reason:Stacktrace -> {error, {Class, Reason, Stacktrace}}
            end,
        Server ! {async_log_append_complete, Ref, Outcome}
    end,
    {WorkerPid, MonitorRef} = spawn_monitor(Worker),
    InFlight = {Ref, WorkerPid, MonitorRef, Priority, Pending, NewLabel},
    clear_pending_priority(
        Priority,
        Data#raft_state{pending_read = false, append_in_flight = InFlight}
    ).
%% Handle the result reported by the async log append worker while this
%% server is still the leader. The result is only acted upon when `Ref`
%% matches the reference of the append recorded as in flight; anything else
%% leaves the state untouched.
%%
%%  - `{ok, NewView}`: the callers of the in-flight batch are recorded in
%%    `queued` by log index, and the log view and label are advanced.
%%  - `skipped`: the callers of the in-flight batch AND everything buffered
%%    since the append started are cancelled with `{error, commit_stalled}`.
%%  - `{error, Error}`: the callers of the in-flight batch are cancelled with
%%    `{error, {commit_call_failed_after_submit, Error}}` and anything the
%%    worker may have persisted beyond the current log view is rolled back.
%%
%% In every handled case the worker monitor is removed (flushing any 'DOWN'
%% message) and the in-flight marker is cleared.
-spec complete_async_log_append(Ref :: reference(), Result :: term(), Data :: #raft_state{}) -> #raft_state{}.
complete_async_log_append(
    Ref,
    {ok, NewView},
    #raft_state{table = Table, queued = Queued, append_in_flight = {Ref, _WorkerPid, MonitorRef, Priority, Pending, NewLabel}} = Data
) ->
    _ = erlang:demonitor(MonitorRef, [flush]),
    % The in-flight batch is kept newest first, so the newest caller owns the
    % last log index and indices are counted down from there.
    Last = wa_raft_log:last_index(NewView),
    {_, NewQueued} =
        lists:foldl(
            fun ({From, _Op}, {Index, AccQueued}) ->
                {Index - 1, AccQueued#{Index => {From, Priority}}}
            end,
            {Last, Queued},
            Pending
        ),
    ?RAFT_COUNTV(Table, {'commit.async_append.ok', Priority}, length(Pending)),
    Data#raft_state{
        log_view = NewView,
        last_label = NewLabel,
        queued = NewQueued,
        append_in_flight = undefined
    };
complete_async_log_append(
    Ref,
    skipped,
    #raft_state{table = Table, append_in_flight = {Ref, _WorkerPid, MonitorRef, Priority, Pending, _NewLabel}} = Data
) ->
    _ = erlang:demonitor(MonitorRef, [flush]),
    ?RAFT_COUNTV(Table, {'commit.async_append.skipped', Priority}, length(Pending)),
    ?SERVER_LOG_WARNING(leader, Data, "skipped async pre-heartbeat sync for ~0p log entr(ies).", [length(Pending)]),
    % The in-flight batch was removed from the pending lists when the append
    % was started, so cancel_pending/2 alone would never notify its callers.
    % Cancel the in-flight batch explicitly first (it is older than anything
    % buffered since), then cancel whatever has been buffered in the meantime.
    % NOTE(review): start_async_log_append/5 clears pending_read when the
    % append starts, so reads that were pending at that point are not
    % cancelled here unless the flag has been set again since - confirm
    % whether those reads need an explicit cancel.
    Data1 = cancel_async_append_pending(
        {error, commit_stalled},
        Priority,
        Pending,
        Data#raft_state{append_in_flight = undefined}
    ),
    cancel_pending({error, commit_stalled}, Data1);
complete_async_log_append(
    Ref,
    {error, Error},
    #raft_state{table = Table, append_in_flight = {Ref, _WorkerPid, MonitorRef, Priority, Pending, _NewLabel}} = Data
) ->
    _ = erlang:demonitor(MonitorRef, [flush]),
    ?RAFT_COUNTV(Table, {'commit.async_append.error', Priority}, length(Pending)),
    ?SERVER_LOG_ERROR(leader, Data, "async sync failed due to ~0P.", [Error, 20]),
    Data1 = cancel_async_append_pending(
        {error, {commit_call_failed_after_submit, Error}},
        Priority,
        Pending,
        Data#raft_state{append_in_flight = undefined}
    ),
    rollback_async_append_disk(leader, Data1);
% Stale or unrecognized completion (no matching in-flight append): ignore.
complete_async_log_append(_Ref, _Result, #raft_state{} = Data) ->
    Data.
%% Handle the result reported by the async log append worker when this
%% server is no longer the leader. The result is only acted upon when `Ref`
%% matches the reference of the append recorded as in flight and the result
%% has one of the recognized shapes; anything else leaves the state untouched.
%%
%% For every recognized result the worker monitor is removed, the callers of
%% the in-flight batch are cancelled with `{error, not_leader}` and the
%% in-flight marker is cleared. Only a successful append additionally rolls
%% back what the worker persisted beyond the current log view.
-spec complete_async_log_append_after_leadership_loss(
    StateName :: state(),
    Ref :: reference(),
    Result :: term(),
    Data :: #raft_state{}
) -> #raft_state{}.
complete_async_log_append_after_leadership_loss(
    StateName,
    Ref,
    Result,
    #raft_state{
        table = Table,
        append_in_flight = {Ref, _WorkerPid, MonitorRef, Priority, Pending, _NewLabel}
    } = Data
) ->
    case Result of
        {ok, _NewView} ->
            _ = erlang:demonitor(MonitorRef, [flush]),
            ?RAFT_COUNTV(Table, {'commit.async_append.not_leader', Priority}, length(Pending)),
            ?SERVER_LOG_NOTICE(StateName, Data, "completed stale async append after leadership loss; cancelling ~0p entr(ies).", [length(Pending)]),
            Settled = cancel_async_append_pending(
                {error, not_leader},
                Priority,
                Pending,
                Data#raft_state{append_in_flight = undefined}
            ),
            rollback_async_append_disk(StateName, Settled);
        skipped ->
            _ = erlang:demonitor(MonitorRef, [flush]),
            ?RAFT_COUNTV(Table, {'commit.async_append.not_leader.skipped', Priority}, length(Pending)),
            ?SERVER_LOG_WARNING(StateName, Data, "stale async append skipped after leadership loss for ~0p entr(ies).", [length(Pending)]),
            cancel_async_append_pending(
                {error, not_leader},
                Priority,
                Pending,
                Data#raft_state{append_in_flight = undefined}
            );
        {error, Error} ->
            _ = erlang:demonitor(MonitorRef, [flush]),
            ?RAFT_COUNTV(Table, {'commit.async_append.not_leader.error', Priority}, length(Pending)),
            ?SERVER_LOG_WARNING(StateName, Data, "stale async append failed after leadership loss due to ~0P.", [Error, 20]),
            cancel_async_append_pending(
                {error, not_leader},
                Priority,
                Pending,
                Data#raft_state{append_in_flight = undefined}
            );
        _ ->
            % Unrecognized result for the in-flight append: ignore.
            Data
    end;
% No matching in-flight append: ignore.
complete_async_log_append_after_leadership_loss(_StateName, _Ref, _Result, #raft_state{} = Data) ->
    Data.
%% Handle the async append worker going down before its result was handled.
%% Only acted upon when both the worker pid and the monitor reference match
%% the append recorded as in flight; otherwise the state is returned as is.
%% The callers of the in-flight batch are cancelled with
%% `{error, commit_stalled}`, the in-flight marker is cleared and anything
%% persisted beyond the current log view is rolled back.
-spec complete_async_log_append_down(
    StateName :: state(),
    MonitorRef :: reference(),
    WorkerPid :: pid(),
    Reason :: term(),
    Data :: #raft_state{}
) -> #raft_state{}.
complete_async_log_append_down(
    StateName,
    MonitorRef,
    WorkerPid,
    Reason,
    #raft_state{
        table = Table,
        append_in_flight = {_Ref, WorkerPid, MonitorRef, Priority, Pending, _NewLabel}
    } = Data
) ->
    ?RAFT_COUNTV(Table, {'commit.async_append.down', Priority}, length(Pending)),
    ?SERVER_LOG_WARNING(StateName, Data, "async append worker exited before completion due to ~0P; cancelling ~0p entr(ies).", [Reason, 20, length(Pending)]),
    Idle = Data#raft_state{append_in_flight = undefined},
    Cancelled = cancel_async_append_pending({error, commit_stalled}, Priority, Pending, Idle),
    rollback_async_append_disk(StateName, Cancelled);
complete_async_log_append_down(_StateName, _MonitorRef, _WorkerPid, _Reason, #raft_state{} = Data) ->
    Data.
%% Abort the async append that is in flight, if there is one.
%% The worker is killed and its 'DOWN' message awaited. Once the worker is
%% confirmed down, the callers of the in-flight batch are cancelled with the
%% given reason, the in-flight marker is cleared and anything persisted
%% beyond the current log view is rolled back. If the worker cannot be
%% confirmed down, this raises `{async_append_cancel_failed, WaitReason}`.
%% When no append is in flight the state is returned as is.
-spec cancel_async_append_in_flight(
    StateName :: state(),
    Reason :: wa_raft_acceptor:common_error(),
    Data :: #raft_state{}
) -> #raft_state{}.
cancel_async_append_in_flight(
    StateName,
    Reason,
    #raft_state{
        append_in_flight = {_Ref, WorkerPid, MonitorRef, Priority, Pending, _NewLabel}
    } = Data
) ->
    exit(WorkerPid, kill),
    case await_async_append_worker_down(WorkerPid, MonitorRef) of
        {error, WaitReason} ->
            ?SERVER_LOG_ERROR(StateName, Data, "failed to stop async append worker due to ~0P.", [WaitReason, 20]),
            error({async_append_cancel_failed, WaitReason});
        ok ->
            Idle = Data#raft_state{append_in_flight = undefined},
            Cancelled = cancel_async_append_pending(Reason, Priority, Pending, Idle),
            rollback_async_append_disk(StateName, Cancelled)
    end;
cancel_async_append_in_flight(_StateName, _Reason, #raft_state{} = Data) ->
    Data.
%% Wait for the 'DOWN' message of the given worker and monitor.
%% Returns `ok` once it arrives. If it has not arrived within 5 seconds, the
%% monitor is removed (flushing any late 'DOWN' message) and
%% `{error, async_append_worker_kill_timeout}` is returned.
-spec await_async_append_worker_down(WorkerPid :: pid(), MonitorRef :: reference()) -> ok | {error, term()}.
await_async_append_worker_down(WorkerPid, MonitorRef) ->
    WaitMs = 5000,
    receive
        {'DOWN', MonitorRef, process, WorkerPid, _ExitReason} ->
            ok
    after WaitMs ->
        _ = erlang:demonitor(MonitorRef, [flush]),
        {error, async_append_worker_kill_timeout}
    end.
%% Remove anything an async append worker may have persisted past the end of
%% the current log view.
%%
%% The async worker writes through the log provider before the RAFT server
%% advances `log_view`. If that append is later cancelled, persisted data
%% beyond this view must be removed, otherwise a restart could replay a
%% command that was never acknowledged.
%%
%% Writers are closed via close_async_append_owner_writers/1 first and the
%% log is then truncated starting at the index right after the last index of
%% the view. A failure of either step is counted, logged and raised as
%% `{async_append_rollback_failed, Reason}`.
-spec rollback_async_append_disk(StateName :: state(), Data :: #raft_state{}) -> #raft_state{}.
rollback_async_append_disk(StateName, #raft_state{table = Table, log_view = View} = Data) ->
    FirstStaleIndex = wa_raft_log:last_index(View) + 1,
    case close_async_append_owner_writers(View) of
        {error, CloseReason} ->
            ?RAFT_COUNT(Table, 'commit.async_append.rollback.error'),
            ?SERVER_LOG_ERROR(StateName, Data, "failed to close caller-owned writers before async append rollback due to ~0P.", [CloseReason, 30]),
            error({async_append_rollback_failed, CloseReason});
        ok ->
            case wa_raft_log:truncate(View, FirstStaleIndex) of
                {error, TruncateReason} ->
                    ?RAFT_COUNT(Table, 'commit.async_append.rollback.error'),
                    ?SERVER_LOG_ERROR(StateName, Data, "failed to rollback cancelled async append at ~0p due to ~0P.", [FirstStaleIndex, TruncateReason, 30]),
                    error({async_append_rollback_failed, TruncateReason});
                {ok, TruncatedView} ->
                    ?RAFT_COUNT(Table, 'commit.async_append.rollback.ok'),
                    Data#raft_state{log_view = TruncatedView}
            end
    end.
%% Ask the log provider of the view to close its per-process writers, when
%% the provider exports `close_process_writers/1`. Providers for which that
%% function is not reported as exported are treated as having nothing to
%% close and `ok` is returned.
-spec close_async_append_owner_writers(View :: wa_raft_log:view()) -> ok | {error, term()}.
close_async_append_owner_writers(View) ->
    Provider = wa_raft_log:provider(View),
    case erlang:function_exported(Provider, close_process_writers, 1) of
        false -> ok;
        true -> Provider:close_process_writers(wa_raft_log:log(View))
    end.
%% Notify the callers of an async append batch that their commits have been
%% cancelled with the given reason. The batch is walked in reverse of its
%% stored order. The state is returned unchanged.
-spec cancel_async_append_pending(
    Reason :: async_append_cancel_reason(),
    Priority :: wa_raft_acceptor:priority(),
    Pending :: [{gen_server:from(), wa_raft_acceptor:op()}],
    Data :: #raft_state{}
) -> #raft_state{}.
cancel_async_append_pending(Reason, Priority, Pending, #raft_state{queues = Queues} = Data) ->
    lists:foreach(
        fun
            ({From, _Op}) ->
                wa_raft_queue:commit_cancelled(Queues, From, Reason, Priority);
            (_Other) ->
                % Elements of any other shape are skipped.
                ok
        end,
        lists:reverse(Pending)
    ),
    Data.
%% Empty the pending list that belongs to the given priority, leaving the
%% other pending list untouched.
-spec clear_pending_priority(Priority :: wa_raft_acceptor:priority(), Data :: #raft_state{}) -> #raft_state{}.
clear_pending_priority(low, #raft_state{} = Data) ->
    Data#raft_state{pending_low = []};
clear_pending_priority(high, #raft_state{} = Data) ->
    Data#raft_state{pending_high = []}.
%% Build the log entries to append for the given priority, together with the
%% label that results from creating them.
%%
%% When both pending lists are empty and a read is pending, a single entry
%% carrying a read no-op is produced so that the read has a log entry to go
%% along with. Otherwise one entry is produced per pending operation of the
%% given priority, in the order in which the operations were buffered, with
%% the label threaded through starting from the last label.
-spec collect_pending(Data :: #raft_state{}, Priority :: wa_raft_acceptor:priority()) -> {[wa_raft_log:log_entry()], wa_raft_label:label() | undefined}.
collect_pending(#raft_state{pending_high = [], pending_low = [], pending_read = true} = Data, _Priority) ->
    {ReadEntry, ReadLabel} = make_log_entry_impl({?READ_OP, noop}, Data),
    {[ReadEntry], ReadLabel};
collect_pending(#raft_state{last_label = LastLabel} = Data, Priority) ->
    NewestFirst =
        case Priority of
            high -> Data#raft_state.pending_high;
            low -> Data#raft_state.pending_low
        end,
    % The pending lists are kept newest first, so flip the list to create
    % (and label) the entries oldest first.
    lists:mapfoldl(
        fun ({_From, Op}, Label) ->
            make_log_entry_impl(Op, Label, Data)
        end,
        LastLabel,
        lists:reverse(NewestFirst)
    ).
%% Cancel everything that is buffered but not yet appended: the callers of
%% both pending lists are notified with the given reason (high priority
%% first, each list walked in reverse of its stored order) and, when a read
%% is pending, the storage is asked to cancel reads with the same reason.
%% The returned state has both pending lists emptied and the pending read
%% flag cleared.
-spec cancel_pending(Reason :: wa_raft_acceptor:common_error(), Data :: #raft_state{}) -> #raft_state{}.
cancel_pending(
    Reason,
    #raft_state{queues = Queues, storage = Storage, pending_read = PendingRead} = Data
) ->
    CancelAll = fun(Priority, NewestFirst) ->
        lists:foreach(
            fun
                ({From, _Op}) ->
                    wa_raft_queue:commit_cancelled(Queues, From, Reason, Priority);
                (_Other) ->
                    % Elements of any other shape are skipped.
                    ok
            end,
            lists:reverse(NewestFirst)
        )
    end,
    CancelAll(high, Data#raft_state.pending_high),
    CancelAll(low, Data#raft_state.pending_low),
    PendingRead andalso
        wa_raft_storage:cancel_reads(Storage, Reason),
    Data#raft_state{pending_high = [], pending_low = [], pending_read = false}.
%% Cancel both the buffered work (see cancel_pending/2) and every commit that
%% is recorded in `queued`, all with the given reason. Queued commits are
%% visited in key order. The returned state has an empty `queued` map.
-spec cancel_pending_and_queued(Reason :: wa_raft_acceptor:common_error(), Data :: #raft_state{}) -> #raft_state{}.
cancel_pending_and_queued(Reason, #raft_state{queues = Queues, queued = Queued} = Data) ->
    Cancelled = cancel_pending(Reason, Data),
    maps:foreach(
        fun
            (_Index, {From, Priority}) ->
                wa_raft_queue:commit_cancelled(Queues, From, Reason, Priority);
            (_Index, _Other) ->
                % Values of any other shape are skipped.
                ok
        end,
        maps:iterator(Queued, ordered)
    ),
    Cancelled#raft_state{queued = #{}}.
%% Send a heartbeat (an AppendEntries request) from the leader to one peer
%% and return the updated leader state.
%%
%% No heartbeat is sent to ourselves. For any other peer, the time of this
%% send is recorded in `heartbeat_send_ts` and the request is built from the
%% peer's next index, which defaults to the first log index of the current
%% term when nothing is recorded for the peer:
%%  - If the term of the entry preceding the next index cannot be found in the
%%    local log, an empty request anchored at the leader's last log position
%%    is sent with a trim index of 0.
%%  - Otherwise log entries starting at the next index are read, bounded by
%%    the configured maximum entry count and byte size, and sent along with
%%    the commit index and a trim index. Peers listed as witnesses receive
%%    entries passed through stub_entries_for_witness/1. When the send
%%    reports `ok`, the peer's next index is advanced past the entries sent.
-spec heartbeat(Peer :: #raft_identity{}, State :: #raft_state{}) -> #raft_state{}.
heartbeat(Self, #raft_state{self = Self} = Data) ->
% Skip sending heartbeat to self
Data;
heartbeat(
?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender,
#raft_state{
application = App,
table = Table,
name = Name,
log_view = View,
commit_index = CommitIndex,
next_indices = NextIndices,
match_indices = MatchIndices,
heartbeat_send_ts = HeartbeatSendTs,
first_current_term_log_index = TermStartIndex
} = State0
) ->
% Position from which this follower is expected to need log entries.
FollowerNextIndex = maps:get(FollowerId, NextIndices, TermStartIndex),
PrevLogIndex = FollowerNextIndex - 1,
PrevLogTermRes = wa_raft_log:term(View, PrevLogIndex),
% Only report lag for followers with a recorded (non-zero) match index.
FollowerMatchIndex = maps:get(FollowerId, MatchIndices, 0),
FollowerMatchIndex =/= 0 andalso
?RAFT_GATHER(Table, 'leader.follower.lag', CommitIndex - FollowerMatchIndex),
% Remember the previous send time for the interval metric before recording this send.
NowTs = erlang:monotonic_time(millisecond),
LastFollowerHeartbeatSendTs = maps:get(FollowerId, HeartbeatSendTs, undefined),
State1 = State0#raft_state{heartbeat_send_ts = HeartbeatSendTs#{FollowerId => NowTs}},
LastIndex = wa_raft_log:last_index(View),
case PrevLogTermRes of %% no log entry to replicate
not_found ->
?RAFT_COUNT(Table, 'leader.heartbeat.not_ready'),
?SERVER_LOG_DEBUG(leader, State1, "at ~0p sends empty heartbeat to follower ~0p at ~0p.",
[CommitIndex, FollowerId, FollowerNextIndex]),
% Send append entries request.
{ok, LastTerm} = wa_raft_log:term(View, LastIndex),
send_rpc(Sender, ?APPEND_ENTRIES(LastIndex, LastTerm, [], CommitIndex, 0), State1),
LastFollowerHeartbeatSendTs =/= undefined andalso
?RAFT_GATHER(Table, 'leader.heartbeat.interval_ms', erlang:monotonic_time(millisecond) - LastFollowerHeartbeatSendTs),
State1;
_ ->
MaxLogEntries = ?RAFT_HEARTBEAT_MAX_ENTRIES(App, Table),
MaxHeartbeatSize = ?RAFT_HEARTBEAT_MAX_BYTES(App, Table),
Witnesses = config_witnesses(config(State0)),
% Witnesses are sent stubbed entries; all other peers are sent the entries as read.
Entries = case lists:member({Name, FollowerId}, Witnesses) of
true ->
{ok, RawEntries} = wa_raft_log:get(View, FollowerNextIndex, MaxLogEntries, MaxHeartbeatSize),
stub_entries_for_witness(RawEntries);
false ->
{ok, RawEntries} = wa_raft_log:entries(View, FollowerNextIndex, MaxLogEntries, MaxHeartbeatSize),
RawEntries
end,
{ok, PrevLogTerm} = PrevLogTermRes,
% track when we send out a heartbeat that is empty but also not at the end of the log
Entries =:= [] andalso PrevLogIndex =/= LastIndex andalso ?RAFT_COUNT(Table, 'leader.heartbeat.empty'),
?RAFT_GATHER(Table, 'leader.heartbeat.size', length(Entries)),
?SERVER_LOG_DEBUG(leader, State1, "at ~0p sends heartbeat to follower ~0p at ~0p with ~0p entr(ies).",
[CommitIndex, FollowerId, FollowerNextIndex, length(Entries)]),
% Compute trim index: the smallest value produced by to_member_list/3 from
% the match indices, with the local node counted at its last log index.
TrimIndex = lists:min(to_member_list(MatchIndices#{node() => LastIndex}, 0, config(State1))),
% Send append entries request.
CastResult = send_rpc(Sender, ?APPEND_ENTRIES(PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex), State1),
NewNextIndices =
case CastResult of
ok ->
% Pipelining - move NextIndex forward right after sending out the entries.
% If a packet is lost, the follower's AppendEntriesResponse will send back
% its correct index.
NextIndices#{FollowerId => PrevLogIndex + length(Entries) + 1};
_ ->
% The send did not report ok, so the next index is left where it was.
NextIndices
end,
LastFollowerHeartbeatSendTs =/= undefined andalso
?RAFT_GATHER(Table, 'leader.heartbeat.interval_ms', erlang:monotonic_time(millisecond) - LastFollowerHeartbeatSendTs),
State1#raft_state{next_indices = NewNextIndices}
end.
%% Pass every entry of the list through stub_entry/1, preserving order.
-spec stub_entries_for_witness([wa_raft_log:log_entry() | binary()]) -> [wa_raft_log:log_entry() | binary()].
stub_entries_for_witness(Entries) ->
    lists:map(fun stub_entry/1, Entries).
%% Replace the command carried by a log entry with its stub (see
%% stub_command/1), keeping the term, key and label. Binaries and entries
%% whose payload is `undefined` are returned as they are.
-spec stub_entry(wa_raft_log:log_entry() | binary()) -> wa_raft_log:log_entry() | binary().
stub_entry(Encoded) when is_binary(Encoded) ->
    Encoded;
stub_entry({_Term, undefined} = Entry) ->
    Entry;
stub_entry({Term, {Key, Command}}) ->
    {Term, {Key, stub_command(Command)}};
stub_entry({Term, {Key, Label, Command}}) ->
    {Term, {Key, Label, stub_command(Command)}}.
%% Produce the stubbed form of a command: `noop` and `{config, _}` commands
%% are kept as they are, while any other command becomes `noop_omitted`.
-spec stub_command(wa_raft_acceptor:command()) -> wa_raft_acceptor:command().
stub_command(Command) ->
    case Command of
        noop -> noop;
        {config, _} -> Command;
        _ -> noop_omitted
    end.
%% The lowest match index a peer may have to be considered for a handover:
%% the last local log index minus the configured handover entry limit.
-spec get_handover_eligibility_match_cutoff(State :: #raft_state{}) -> wa_raft_log:log_index().
get_handover_eligibility_match_cutoff(#raft_state{application = App, table = Table, log_view = View}) ->
    LastIndex = wa_raft_log:last_index(View),
    MaxEntries = ?RAFT_HANDOVER_MAX_ENTRIES(App, Table),
    LastIndex - MaxEntries.
%% The lowest last applied index a peer may have to be considered for a
%% handover: the last local log index minus the configured limit on unapplied
%% entries. When that limit is `undefined`, the handover entry limit is used
%% in its place.
-spec get_handover_eligibility_apply_cutoff(State :: #raft_state{}) -> wa_raft_log:log_index().
get_handover_eligibility_apply_cutoff(#raft_state{application = App, table = Table, log_view = View}) ->
    LastIndex = wa_raft_log:last_index(View),
    Allowance =
        case ?RAFT_HANDOVER_MAX_UNAPPLIED_ENTRIES(App, Table) of
            undefined -> ?RAFT_HANDOVER_MAX_ENTRIES(App, Table);
            Limit -> Limit
        end,
    LastIndex - Allowance.
%% The full members of the current config, other than the local node, that
%% currently pass the handover eligibility check.
-spec get_handover_candidates(State :: #raft_state{}) -> [node()].
get_handover_candidates(State) ->
    FullMembers = config_full_member_identities(config(State)),
    MatchCutoff = get_handover_eligibility_match_cutoff(State),
    ApplyCutoff = get_handover_eligibility_apply_cutoff(State),
    [
        Peer
     || ?IDENTITY_REQUIRES_MIGRATION(_, Peer) <- FullMembers,
        Peer =/= node(),
        is_eligible_for_handover_impl(Peer, MatchCutoff, ApplyCutoff, State)
    ].
%% Whether the node of the given peer currently passes the handover
%% eligibility check, using the match and apply cutoffs derived from the
%% current state.
-spec is_eligible_for_handover(Peer :: peer(), Data :: #raft_state{}) -> boolean().
is_eligible_for_handover({_Name, Node}, Data) ->
    is_eligible_for_handover_impl(
        Node,
        get_handover_eligibility_match_cutoff(Data),
        get_handover_eligibility_apply_cutoff(Data),
        Data
    ).
%% Whether a node is eligible for a handover given the two cutoffs: both its
%% match index and its last applied index must be known and must be at least
%% the respective cutoff. A node for which either index is unknown is never
%% eligible.
-spec is_eligible_for_handover_impl(
    Node :: node(),
    MatchCutoffIndex :: wa_raft_log:log_index(),
    ApplyCutoffIndex :: wa_raft_log:log_index(),
    Data :: #raft_state{}
) -> boolean().
is_eligible_for_handover_impl(
    Node,
    MatchCutoffIndex,
    ApplyCutoffIndex,
    #raft_state{match_indices = MatchIndices, last_applied_indices = LastAppliedIndices}
) ->
    Progress = {maps:find(Node, MatchIndices), maps:find(Node, LastAppliedIndices)},
    case Progress of
        {{ok, MatchIndex}, {ok, LastAppliedIndex}} when
            MatchIndex >= MatchCutoffIndex, LastAppliedIndex >= ApplyCutoffIndex
        ->
            true;
        _ ->
            false
    end.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Configuration Changes
%%------------------------------------------------------------------------------
%% Check whether the leader may start a new configuration change.
%%
%% `Index` is the log index that the current configuration is required to be
%% at, or `undefined` when the caller has no such requirement.
%%
%% The checks are made in this order and the first one that fails decides the
%% result:
%%  - `{error, no_quorum}`: the commit index has not yet reached the first log
%%    index of the current term.
%%  - `{error, not_ready}`: the latest known configuration (the later of the
%%    cached config and the config reported by the log) has not been applied.
%%  - `{error, config_index_mismatch}`: `Index` was given and is not the index
%%    of the latest known configuration.
%% Otherwise `ok` is returned.
-spec leader_config_change_allowed(
Index :: wa_raft_log:log_index() | undefined,
State :: #raft_state{}
) -> ok | {error, Reason :: term()}.
leader_config_change_allowed(
Index,
#raft_state{
log_view = View,
commit_index = CommitIndex,
last_applied = LastApplied,
cached_config = {CachedConfigIndex, _},
first_current_term_log_index = TermStartIndex
} = State
) ->
% A leader must establish quorum on at least one log entry in the current
% term before it is ready for a configuration change.
QuorumReady = CommitIndex >= TermStartIndex,
% No two configuration changes can be in the log at the same time
% and both be not yet committed.
ConfigIndex = case wa_raft_log:config(View) of
{ok, LogConfigIndex, _} -> erlang:max(CachedConfigIndex, LogConfigIndex);
_ -> CachedConfigIndex
end,
if
not QuorumReady ->
?SERVER_LOG_NOTICE(
leader,
State,
"at ~0p is not ready for a new configuration because it has not established a quorum in the current term.",
[CommitIndex]
),
{error, no_quorum};
ConfigIndex > LastApplied ->
?SERVER_LOG_NOTICE(
leader,
State,
"at ~0p is not ready for a new configuration because a new configuration is not yet committed at ~0p.",
[CommitIndex, ConfigIndex]
),
{error, not_ready};
% The caller did not require the configuration to be at a specific index.
Index =:= undefined ->
ok;
Index =/= ConfigIndex ->
?SERVER_LOG_NOTICE(
leader,
State,
"refuses a new configuration because the configuration at ~0p is not at the required ~0p",
[ConfigIndex, Index]
),
{error, config_index_mismatch};
true ->
ok
end.
%% Append a new configuration to the log as the leader.
%% On success, returns the log position of the new configuration (its index
%% in the log and the current term) along with the updated state. A skipped
%% append is reported as `{error, commit_stalled}` and any other append error
%% is returned as is; in both cases no new state is returned.
-spec leader_change_config(NewConfig :: wa_raft_server:config(), From :: undefined | gen_server:from(), State :: #raft_state{}) ->
    {ok, NewConfigPosition :: wa_raft_log:log_pos(), NewState :: #raft_state{}} | {error, Reason :: term()}.
leader_change_config(NewConfig, From, #raft_state{log_view = View, current_term = CurrentTerm} = State) ->
    {ConfigEntry, LabeledState} = make_log_entry({make_ref(), {config, NewConfig}}, State),
    case wa_raft_log:try_append(View, [ConfigEntry]) of
        {ok, NewView} ->
            ConfigIndex = wa_raft_log:last_index(NewView),
            Queued = LabeledState#raft_state.queued,
            % When initiated from the commit flow (via wa_raft_acceptor), From
            % is not undefined and is recorded in the queued map so that the
            % storage can reply to the caller once the config change is applied.
            NewQueued =
                case From of
                    undefined -> Queued;
                    _ -> Queued#{ConfigIndex => {From, high}}
                end,
            NewState = LabeledState#raft_state{log_view = NewView, queued = NewQueued},
            {ok, #raft_log_pos{index = ConfigIndex, term = CurrentTerm}, NewState};
        skipped ->
            {error, commit_stalled};
        {error, _Reason} = Error ->
            Error
    end.
%% Compute the configuration that results from applying a config action to
%% the current configuration, or the reason why the action is refused.
%%
%% `refresh` returns the current configuration unchanged. Every other action
%% names a peer and is judged against whether that peer is ourselves, is
%% currently eligible for handover, and is listed in the membership, witness
%% and participant lists of the current configuration. Within each action the
%% conditions are checked top to bottom and the first one that holds decides
%% the error that is returned.
-spec leader_adjust_config(Action :: config_action(), State :: #raft_state{}) ->
{ok, NewConfig :: config()} | {error, Reason :: atom()}.
leader_adjust_config(refresh, Data) ->
{ok, config(Data)};
leader_adjust_config({Action, Peer}, Data) ->
Config = config(Data),
IsSelf = is_self(Peer, Data),
IsReady = is_eligible_for_handover(Peer, Data),
IsMember = lists:member(Peer, config_membership(Config)),
IsWitness = lists:member(Peer, config_witnesses(Config)),
IsParticipant = lists:member(Peer, config_participants(Config)),
case Action of
% Add the peer as a member; refused for existing members and witnesses.
add ->
if
IsMember -> {error, already_member};
IsWitness -> {error, already_witness};
true -> {ok, config_add_member(Peer, Config)}
end;
% Add the peer as a witness; refused for existing members and for
% participants that are not witnesses.
add_witness ->
if
IsMember andalso IsWitness -> {error, already_witness};
IsMember -> {error, already_member};
IsParticipant andalso not IsWitness -> {error, not_a_witness};
true -> {ok, config_add_witness(Peer, Config)}
end;
% Add the peer as a participant; refused for existing members, witnesses
% and participants.
add_participant ->
if
IsMember -> {error, already_member};
IsWitness -> {error, already_witness};
IsParticipant -> {error, already_participating};
true -> {ok, config_add_participant(Peer, Config)}
end;
% Make an existing participant a member, but only once it is eligible
% for handover.
promote_participant_if_ready ->
if
IsMember -> {error, already_member};
IsWitness -> {error, already_witness};
not IsParticipant -> {error, not_a_participant};
not IsReady -> {error, not_ready};
true -> {ok, config_add_member(Peer, Config)}
end;
% Remove a participant other than ourselves.
remove ->
if
IsSelf -> {error, cannot_remove_self};
not IsParticipant -> {error, not_a_participant};
true -> {ok, config_remove_participant(Peer, Config)}
end;
% Remove a witness other than ourselves.
remove_witness ->
if
IsSelf -> {error, cannot_remove_self};
not IsWitness -> {error, not_a_witness};
true -> {ok, config_remove_participant(Peer, Config)}
end;
% Remove the membership of a member other than ourselves.
remove_membership ->
if
IsSelf -> {error, cannot_remove_self};
not IsMember -> {error, not_a_member};
true -> {ok, config_remove_member(Peer, Config)}
end;
% Turn a participant other than ourselves that is not yet a witness into
% a witness.
demote_to_witness ->
if
IsSelf -> {error, cannot_demote_self};
not IsParticipant -> {error, not_a_participant};
IsWitness -> {error, already_witness};
true -> {ok, config_add_witness_only(Peer, Config)}
end
end.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Log State
%%------------------------------------------------------------------------------
%% Reset the log to the given position. Every commit recorded in `queued` is
%% cancelled with `{error, cancelled}` (visited in key order), the commit
%% index and last applied index are both set to the index of the position,
%% and the configuration is loaded again from the resulting state.
-spec reset_log(Position :: wa_raft_log:log_pos(), Data :: #raft_state{}) -> #raft_state{}.
reset_log(#raft_log_pos{index = Index} = Position, #raft_state{queues = Queues, log_view = View, queued = Queued} = Data) ->
    {ok, ResetView} = wa_raft_log:reset(View, Position),
    maps:foreach(
        fun
            (_QueuedIndex, {From, Priority}) ->
                wa_raft_queue:commit_cancelled(Queues, From, {error, cancelled}, Priority);
            (_QueuedIndex, _Other) ->
                % Values of any other shape are skipped.
                ok
        end,
        maps:iterator(Queued, ordered)
    ),
    load_config(Data#raft_state{
        log_view = ResetView,
        commit_index = Index,
        last_applied = Index,
        queued = #{}
    }).
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Snapshots
%%------------------------------------------------------------------------------
%% Have the storage open the snapshot found at `Root` for the given position
%% (which must succeed with `ok`) and then reset the log to that position.
-spec open_snapshot(Root :: string(), Position :: wa_raft_log:log_pos(), Data :: #raft_state{}) -> #raft_state{}.
open_snapshot(Root, Position, #raft_state{} = Data) ->
    Storage = Data#raft_state.storage,
    ok = wa_raft_storage:open_snapshot(Storage, Root, Position),
    reset_log(Position, Data).
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Heartbeat
%%------------------------------------------------------------------------------
%% Attempt to append the log entries declared by a leader in a heartbeat,
%% apply committed but not yet applied log entries, trim the log, and reset
%% the election timeout timer as necessary.
%%
%% When append_entries/6 succeeds, a response carrying whether the entries
%% were accepted, the new match index and the adjusted last applied index is
%% sent to the leader, and the time of this heartbeat and the leader's commit
%% index are recorded. Only when the entries were accepted is the commit
%% index advanced and apply_log/3 run. The replica then either stays in its
%% state with the election timeout set or moves to the state returned by
%% follower_or_witness/1 when that differs from the current state.
%%
%% When append_entries/6 reports a fatal error, the replica moves to the
%% `disabled` state with the reported reason recorded as the disable reason.
-spec handle_heartbeat(
State :: state(),
Event :: gen_statem:event_type(),
Leader :: #raft_identity{},
PrevLogIndex :: wa_raft_log:log_index(),
PrevLogTerm :: wa_raft_log:log_term(),
Entries :: [wa_raft_log:log_entry() | binary()],
CommitIndex :: wa_raft_log:log_index(),
TrimIndex :: wa_raft_log:log_index(),
Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
handle_heartbeat(
State,
_Event,
Leader,
PrevLogIndex,
PrevLogTerm,
Entries,
LeaderCommitIndex,
TrimIndex,
#raft_state{
application = App,
table = Table,
queues = Queues,
log_view = View,
commit_index = CommitIndex,
last_applied = LastApplied
} = Data0
) ->
EntryCount = length(Entries),
?RAFT_GATHER(Table, {'heartbeat.size', State}, EntryCount),
EntryCount =/= 0 andalso
?SERVER_LOG_DEBUG(State, Data0, "considering appending ~0p log entries in range ~0p to ~0p to log ending at ~0p.",
[EntryCount, PrevLogIndex + 1, PrevLogIndex + EntryCount, wa_raft_log:last_index(View)]),
case append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, Data0) of
{ok, Accepted, NewMatchIndex, Data1} ->
% Report the last applied index reduced by the current size of the apply
% queue, but never below zero.
AdjustedLastApplied = max(0, LastApplied - wa_raft_queue:apply_queue_size(Queues)),
send_rpc(Leader, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, Accepted, NewMatchIndex, AdjustedLastApplied), Data1),
Data2 = Data1#raft_state{
last_quorum_ts = erlang:monotonic_time(millisecond),
leader_commit_index = LeaderCommitIndex
},
Data3 = case Accepted of
true ->
% The leader's trim index is only passed on when log rotation by trim
% index is enabled; otherwise `infinity` is passed to apply_log/3.
LocalTrimIndex = case ?RAFT_LOG_ROTATION_BY_TRIM_INDEX(App, Table) of
true -> TrimIndex;
false -> infinity
end,
% The commit index never moves backwards and never goes beyond either the
% leader's commit index or what is known to match the leader's log.
NewCommitIndex = max(CommitIndex, min(LeaderCommitIndex, NewMatchIndex)),
apply_log(State, LocalTrimIndex, Data2#raft_state{commit_index = NewCommitIndex});
_ ->
Data2
end,
update_status(State, Data3),
% Stay in the current state if it is still the right one, otherwise move
% to the state that follower_or_witness/1 now reports.
case follower_or_witness(Data3) of
State -> {keep_state, Data3, ?ELECTION_TIMEOUT(Data3)};
NewState -> {next_state, NewState, Data3}
end;
{fatal, Reason} ->
{next_state, disabled, Data0#raft_state{disable_reason = Reason}}
end.
%% Append the provided range of the log entries to the local log only if the
%% term of the previous log matches the term stored by the local log,
%% otherwise, truncate the log if the term does not match or do nothing if
%% the previous log entry is not available locally. If an unrecoverable error
%% is encountered, returns a diagnostic that can be used as a reason to
%% disable the current replica.
%%
%% Returns `{ok, Accepted, NewMatchIndex, NewData}` where `Accepted` reports
%% whether the heartbeat's entries are now present in the local log and
%% `NewData` carries every change made to the log view and to the map of
%% queued commits, including changes made by a truncation that was not
%% followed by a successful append. Returns `{fatal, Reason}` when handling
%% the heartbeat would require truncating an already committed log entry.
-spec append_entries(
    State :: state(),
    PrevLogIndex :: wa_raft_log:log_index(),
    PrevLogTerm :: wa_raft_log:log_term(),
    Entries :: [wa_raft_log:log_entry() | binary()],
    EntryCount :: non_neg_integer(),
    Data :: #raft_state{}
) -> {ok, Accepted :: boolean(), NewMatchIndex :: wa_raft_log:log_index(), NewData :: #raft_state{}} | {fatal, Reason :: term()}.
append_entries(
    State,
    PrevLogIndex,
    PrevLogTerm,
    Entries,
    EntryCount,
    #raft_state{
        log_view = View0,
        table = Table,
        queues = Queues,
        commit_index = CommitIndex,
        current_term = CurrentTerm,
        leader = Leader,
        queued = Queued
    } = Data
) ->
    % Compare the incoming heartbeat with the local log to determine what
    % actions need to be taken as part of handling this heartbeat.
    case wa_raft_log:check_heartbeat(View0, PrevLogIndex, [{PrevLogTerm, undefined} | Entries]) of
        {ok, []} ->
            % No append is required as all the log entries in the heartbeat
            % are already in the local log.
            {ok, true, PrevLogIndex + EntryCount, Data};
        {ok, NewEntries} ->
            % No conflicting log entries were found in the heartbeat, but the
            % heartbeat does contain new log entries to be appended to the end
            % of the log.
            case wa_raft_log:try_append(View0, NewEntries) of
                {ok, View1} ->
                    {ok, true, PrevLogIndex + EntryCount, Data#raft_state{log_view = View1}};
                skipped ->
                    NewCount = length(NewEntries),
                    Last = wa_raft_log:last_index(View0),
                    ?SERVER_LOG_WARNING(
                        State,
                        Data,
                        "is not ready to append ~0p log entries in range ~0p to ~0p to log ending at ~0p.",
                        [NewCount, Last + 1, Last + NewCount, Last]
                    ),
                    {ok, false, Last, Data}
            end;
        {conflict, ConflictIndex, [ConflictEntry | _]} when ConflictIndex =< CommitIndex ->
            % A conflict is detected that would result in the truncation of a
            % log entry that the local replica has committed. We cannot validly
            % delete log entries that are already committed because doing so
            % may potentially cause the log entry to be no longer present on a
            % majority of replicas.
            {ok, LocalTerm} = wa_raft_log:term(View0, ConflictIndex),
            case decode_conflict_entry(ConflictEntry) of
                {ok, {ConflictTerm, _}} ->
                    ?RAFT_COUNT(Table, {'heartbeat.error.corruption.excessive_truncation', State}),
                    ?SERVER_LOG_WARNING(
                        State,
                        Data,
                        "refuses heartbeat at ~0p to ~0p that requires truncation past ~0p (term ~0p vs ~0p) when log entries up to ~0p are already committed.",
                        [PrevLogIndex, PrevLogIndex + EntryCount, ConflictIndex, ConflictTerm, LocalTerm, CommitIndex]
                    ),
                    Fatal = io_lib:format(
                        "A heartbeat at ~0p to ~0p from ~0p in term ~0p required truncating past ~0p (term ~0p vs ~0p) when log entries up to ~0p were already committed.",
                        [PrevLogIndex, PrevLogIndex + EntryCount, identity_to_server(Leader), CurrentTerm, ConflictIndex, ConflictTerm, LocalTerm, CommitIndex]
                    ),
                    {fatal, lists:flatten(Fatal)};
                {error, Reason} ->
                    % The conflicting entry could not be decoded, so the
                    % heartbeat is refused without disabling the replica.
                    ?RAFT_COUNT(Table, {'heartbeat.skip.error', State}),
                    ?SERVER_LOG_WARNING(
                        State,
                        Data,
                        "refuses heartbeat at ~0p to ~0p with malformed conflict entry due to ~0P",
                        [PrevLogIndex, PrevLogIndex + EntryCount, Reason, 30]
                    ),
                    {ok, false, wa_raft_log:last_index(View0), Data}
            end;
        {conflict, ConflictIndex, NewEntries} when ConflictIndex >= PrevLogIndex ->
            % A truncation is required as there is a conflict between the local
            % log and the incoming heartbeat.
            Last = wa_raft_log:last_index(View0),
            ?SERVER_LOG_NOTICE(
                State,
                Data,
                "handling heartbeat at ~0p by truncating local log ending at ~0p to past ~0p.",
                [PrevLogIndex, Last, ConflictIndex]
            ),
            case wa_raft_log:truncate(View0, ConflictIndex) of
                {ok, View1} ->
                    % From this point on the truncation has taken effect and the
                    % commits queued against the truncated range have been
                    % cancelled, so every result below must carry both the
                    % truncated view and the pruned queue.
                    NewQueued = cancel_queued(Queues, ConflictIndex, Last, {error, not_leader}, Queued),
                    TruncatedData = Data#raft_state{log_view = View1, queued = NewQueued},
                    case ConflictIndex =:= PrevLogIndex of
                        true ->
                            % If the conflict precedes the heartbeat's log
                            % entries then no append can be performed.
                            {ok, false, wa_raft_log:last_index(View1), TruncatedData};
                        false ->
                            % Otherwise, we can replace the truncated log
                            % entries with those from the current heartbeat.
                            case wa_raft_log:try_append(View1, NewEntries) of
                                {ok, View2} ->
                                    {ok, true, PrevLogIndex + EntryCount, TruncatedData#raft_state{log_view = View2}};
                                skipped ->
                                    NewCount = length(NewEntries),
                                    NewLast = wa_raft_log:last_index(View1),
                                    ?SERVER_LOG_WARNING(
                                        State,
                                        Data,
                                        "is not ready to append ~0p log entries in range ~0p to ~0p to log ending at ~0p.",
                                        [NewCount, NewLast + 1, NewLast + NewCount, NewLast]
                                    ),
                                    % Nothing was appended, but the truncation and
                                    % the cancellations still happened. Returning
                                    % the original data here would keep a stale log
                                    % view and leave already cancelled commits in
                                    % the queued map.
                                    {ok, false, NewLast, TruncatedData}
                            end
                    end;
                {error, Reason} ->
                    ?RAFT_COUNT(Table, {'heartbeat.truncate.error', State}),
                    ?SERVER_LOG_WARNING(
                        State,
                        Data,
                        "fails to truncate past ~0p while handling heartbeat at ~0p to ~0p due to ~0P",
                        [ConflictIndex, PrevLogIndex, PrevLogIndex + EntryCount, Reason, 30]
                    ),
                    {ok, false, wa_raft_log:last_index(View0), Data}
            end;
        {invalid, out_of_range} ->
            % If the heartbeat is out of range (generally past the end of the
            % log) then ignore and notify the leader of what log entry is
            % required by this replica.
            ?RAFT_COUNT(Table, {'heartbeat.skip.out_of_range', State}),
            EntryCount =/= 0 andalso
                ?SERVER_LOG_WARNING(
                    State,
                    Data,
                    "refuses out of range heartbeat at ~0p to ~0p with local log covering ~0p to ~0p.",
                    [PrevLogIndex, PrevLogIndex + EntryCount, wa_raft_log:first_index(View0), wa_raft_log:last_index(View0)]
                ),
            {ok, false, wa_raft_log:last_index(View0), Data};
        {error, Reason} ->
            ?RAFT_COUNT(Table, {'heartbeat.skip.error', State}),
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "fails to check heartbeat at ~0p to ~0p for validity due to ~0P",
                [PrevLogIndex, PrevLogIndex + EntryCount, Reason, 30]
            ),
            {ok, false, wa_raft_log:last_index(View0), Data}
    end.
%% Normalize a conflicting log entry taken from a heartbeat. Entries that are
%% not binaries are returned as they are. Binary entries are decoded with
%% `binary_to_term/2` in `safe` mode and any decoding failure is reported as
%% `{error, {bad_entry_term, Reason}}`.
-spec decode_conflict_entry(Entry :: wa_raft_log:log_entry() | binary()) ->
    {ok, wa_raft_log:log_entry()} | {error, term()}.
decode_conflict_entry(Entry) when not is_binary(Entry) ->
    {ok, Entry};
decode_conflict_entry(Encoded) ->
    try
        {ok, binary_to_term(Encoded, [safe])}
    catch
        _:Why -> {error, {bad_entry_term, Why}}
    end.
%% Cancel every queued commit whose log index lies in the inclusive range from
%% `Start` to `End` by notifying its caller with `Reason`, and return the map
%% of queued commits without the cancelled entries. The walk stops early once
%% the map of queued commits is empty.
-spec cancel_queued(
    Queues :: wa_raft_queue:queues(),
    Start :: wa_raft_log:log_index(),
    End :: wa_raft_log:log_index(),
    Reason :: wa_raft_acceptor:commit_error() | undefined,
    Queued :: #{wa_raft_log:log_index() => {gen_server:from(), wa_raft_acceptor:priority()}}
) -> #{wa_raft_log:log_index() => {gen_server:from(), wa_raft_acceptor:priority()}}.
cancel_queued(_, Start, End, _, Queued) when Start > End; Queued =:= #{} ->
    Queued;
cancel_queued(Queues, Index, End, Reason, Queued) ->
    Remaining =
        case maps:take(Index, Queued) of
            error ->
                % Nothing is queued at this index.
                Queued;
            {{From, Priority}, Rest} ->
                wa_raft_queue:commit_cancelled(Queues, From, Reason, Priority),
                Rest
        end,
    cancel_queued(Queues, Index + 1, End, Reason, Remaining).
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Vote Requests
%%------------------------------------------------------------------------------
%% [RequestPreVote RPC]
%% Answer a pre-vote request by reporting to the candidate whether the local
%% replica considers the leader missing, together with the index and term of
%% the last entry in the local log. Neither the state nor the data changes.
-spec handle_request_pre_vote(
    Candidate :: #raft_identity{},
    Ref :: reference(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
handle_request_pre_vote(Candidate, Ref, #raft_state{log_view = View} = Data) ->
    LastIndex = wa_raft_log:last_index(View),
    {ok, LastTerm} = wa_raft_log:term(View, LastIndex),
    {LeaderMissing, _Delay, _AllowedDelay} = is_leader_missing(Data),
    send_rpc(Candidate, ?PRE_VOTE(Ref, LeaderMissing, LastIndex, LastTerm), Data),
    keep_state_and_data.
%% [RequestVote RPC]
%% Decide whether to grant the local replica's vote to `Candidate`. A positive
%% vote is only ever sent after the vote allocation has been successfully
%% written to durable state.
-spec handle_request_vote(
    State :: state(),
    Candidate :: #raft_identity{},
    CandidateIndex :: wa_raft_log:log_index(),
    CandidateTerm :: wa_raft_log:log_term(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
%% A replica with an available vote in the current term should allocate its vote
%% if the candidate's log is at least as up-to-date as the local log. (5.4.1)
handle_request_vote(
    State,
    ?IDENTITY_REQUIRES_MIGRATION(_, CandidateId) = Candidate,
    CandidateIndex,
    CandidateTerm,
    #raft_state{
        log_view = View,
        voted_for = VotedFor
    } = Data
) when VotedFor =:= undefined; VotedFor =:= CandidateId ->
    Index = wa_raft_log:last_index(View),
    {ok, Term} = wa_raft_log:term(View, Index),
    % Logs are compared by the term of the last entry first and by the index
    % of the last entry second.
    case {CandidateTerm, CandidateIndex} >= {Term, Index} of
        true ->
            ?SERVER_LOG_NOTICE(
                State,
                Data,
                "decides to vote for candidate ~0p with up-to-date log at ~0p:~0p versus local log at ~0p:~0p.",
                [Candidate, CandidateIndex, CandidateTerm, Index, Term]
            ),
            case VotedFor of
                undefined ->
                    % If this vote request causes the current replica to allocate its vote, then
                    % persist the vote before responding. (Fig. 2)
                    % The store must succeed: granting a vote that was not persisted would allow
                    % this replica to vote again in the same term after a restart.
                    NewData = Data#raft_state{voted_for = CandidateId},
                    ok = wa_raft_durable_state:store(NewData),
                    send_rpc(Candidate, ?VOTE(true), NewData),
                    {keep_state, NewData};
                CandidateId ->
                    % Otherwise, the vote allocation did not change, so just send the response.
                    send_rpc(Candidate, ?VOTE(true), Data),
                    keep_state_and_data
            end;
        false ->
            ?SERVER_LOG_NOTICE(
                State,
                Data,
                "refuses to vote for candidate ~0p with outdated log at ~0p:~0p versus local log at ~0p:~0p.",
                [Candidate, CandidateIndex, CandidateTerm, Index, Term]
            ),
            send_rpc(Candidate, ?VOTE(false), Data),
            keep_state_and_data
    end;
%% A replica that has already allocated its vote to a specific candidate in the
%% current term should refuse vote requests from other candidates. (5.4.1)
handle_request_vote(State, Candidate, _, _, #raft_state{voted_for = VotedFor} = Data) ->
    ?SERVER_LOG_NOTICE(
        State,
        Data,
        "refusing to vote for candidate ~0p after previously voting for candidate ~0p in the current term.",
        [Candidate, VotedFor]
    ),
    send_rpc(Candidate, ?VOTE(false), Data),
    keep_state_and_data.
%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Helpers
%%------------------------------------------------------------------------------
%% Generic reply function for non-RPC requests that operates based on event type.
%% Calls are answered through `gen_statem:reply/2`, casts need no answer, and
%% any other event type only produces a warning in the log.
-spec reply(Type :: enter | gen_statem:event_type(), Message :: term()) -> ok | {error, Reason :: term()}.
reply({call, Caller}, Response) ->
    gen_statem:reply(Caller, Response);
reply(cast, _Response) ->
    ok;
reply(OtherType, Response) ->
    ?RAFT_LOG_WARNING("Attempted to reply to non-reply event type ~0p with message ~0P.", [OtherType, Response, 20]),
    ok.
%% Send an RPC to `Destination` tagged with the local replica's current term.
-spec send_rpc(
    Destination :: #raft_identity{},
    Procedure :: normalized_procedure(),
    State :: #raft_state{}
) -> ok | {error, term()}.
send_rpc(Destination, Procedure, #raft_state{current_term = CurrentTerm} = State) ->
    send_rpc(Destination, CurrentTerm, Procedure, State).
%% Send an RPC to `Destination` tagged with an explicitly provided term. The
%% local replica's own identity is used as the sender of the RPC.
-spec send_rpc(
    Destination :: #raft_identity{},
    Term :: wa_raft_log:log_term(),
    Procedure :: normalized_procedure(),
    State :: #raft_state{}
) -> ok | {error, term()}.
send_rpc(Destination, Term, Procedure, #raft_state{self = Sender} = State) ->
    RPC = make_rpc(Sender, Term, Procedure),
    cast(Destination, RPC, State).
%% Send the same RPC to every member identity of the current configuration
%% other than the local replica, returning the result of each individual send.
-spec send_rpc_to_all_members(ProcedureCall :: normalized_procedure(), State :: #raft_state{}) -> [ok | {error, term()}].
send_rpc_to_all_members(ProcedureCall, #raft_state{self = Self} = State) ->
    Recipients = [Member || Member <- config_member_identities(config(State)), Member =/= Self],
    [send_rpc(Recipient, ProcedureCall, State) || Recipient <- Recipients].
%% Deliver a message to another replica through the configured distribution
%% module. Any exception raised while sending, as well as any result from the
%% distribution module other than `ok`, is counted, logged at debug level and
%% returned as `{error, Reason}` instead of crashing the server.
-spec cast(Destination :: #raft_identity{}, RPC :: rpc(), State :: #raft_state{}) -> ok | {error, term()}.
cast(
    #raft_identity{name = Name, node = Node} = Destination,
    Message,
    #raft_state{table = Table, identifier = Identifier, distribution_module = Distribution}
) ->
    Target = {Name, Node},
    try
        % The match is deliberately inside the protected section so that an
        % unexpected return value is handled like any other failure.
        ok = Distribution:cast(Target, Identifier, Message)
    catch
        _:Failure ->
            ?RAFT_COUNT(Table, {'server.cast.error', Failure}),
            ?RAFT_LOG_DEBUG("Cast to ~p error ~100p", [Destination, Failure]),
            {error, Failure}
    end.
%% Replicate to followers if `should_heartbeat/1` reports that a heartbeat is
%% due, counting each heartbeat that is sent. Otherwise the state is returned
%% unchanged.
-spec maybe_heartbeat(#raft_state{}) -> #raft_state{}.
maybe_heartbeat(#raft_state{table = Table} = State) ->
    case should_heartbeat(State) of
        false ->
            State;
        true ->
            ?RAFT_COUNT(Table, 'leader.heartbeat'),
            append_entries_to_followers(State)
    end.
%% Decide whether a heartbeat is due.
%%  - Never while a handover is in progress.
%%  - Always when no heartbeat send timestamp has been recorded at all.
%%  - Otherwise, when the most recent heartbeat send timestamp is older than
%%    the configured heartbeat interval.
-spec should_heartbeat(#raft_state{}) -> boolean().
should_heartbeat(#raft_state{handover = Handover}) when Handover =/= undefined ->
    false;
should_heartbeat(#raft_state{heartbeat_send_ts = HeartbeatSendTs}) when HeartbeatSendTs =:= #{} ->
    % `lists:max/1` raises on an empty list, so the case where no send
    % timestamp exists yet is handled explicitly and treated as a due heartbeat.
    true;
should_heartbeat(#raft_state{application = App, table = Table, heartbeat_send_ts = HeartbeatSendTs}) ->
    Latest = lists:max(maps:values(HeartbeatSendTs)),
    Current = erlang:monotonic_time(millisecond),
    Current - Latest > ?RAFT_HEARTBEAT_INTERVAL(App, Table).
%% Update the server's current term info in `wa_raft_info`.
%% The leader node is taken from the known leader identity (or `undefined` when
%% no leader is known) and the candidate node is the local node only while the
%% server is in the `candidate` state.
-spec update_current_term_info(
    State :: state(),
    Data :: #raft_state{}
) -> ok.
update_current_term_info(
    State,
    #raft_state{
        table = Table,
        partition = Partition,
        self = #raft_identity{node = LocalNode},
        current_term = CurrentTerm,
        leader = Leader
    }
) ->
    LeaderNode =
        case Leader of
            #raft_identity{node = KnownLeaderNode} -> KnownLeaderNode;
            undefined -> undefined
        end,
    CandidateNode =
        if
            State =:= candidate -> LocalNode;
            true -> undefined
        end,
    wa_raft_info:set_current_term_info(Table, Partition, CurrentTerm, LeaderNode, CandidateNode),
    ok.
%% Record the current monotonic time as the heartbeat reply timestamp of `Peer`
%% and recompute the quorum timestamp using that same instant.
-spec update_heartbeat_reply_ts(Peer :: node(), Data :: #raft_state{}) -> #raft_state{}.
update_heartbeat_reply_ts(Peer, #raft_state{heartbeat_reply_ts = ReplyTsByPeer} = Data) ->
    ReceivedAt = erlang:monotonic_time(millisecond),
    update_quorum_ts(
        ReceivedAt,
        Data#raft_state{heartbeat_reply_ts = ReplyTsByPeer#{Peer => ReceivedAt}}
    ).
%% Recompute the quorum timestamp from the current heartbeat reply timestamps,
%% using the current monotonic time as the local replica's own timestamp.
-spec update_quorum_ts(Data :: #raft_state{}) -> #raft_state{}.
update_quorum_ts(Data) ->
    Now = erlang:monotonic_time(millisecond),
    update_quorum_ts(Now, Data).
%% Recompute the quorum timestamp from the current heartbeat reply timestamps,
%% with `NowTs` standing in as the timestamp of the local node. The stored
%% quorum timestamp is only replaced when a quorum is reported.
-spec update_quorum_ts(
    NowTs :: integer(),
    Data :: #raft_state{}
) -> #raft_state{}.
update_quorum_ts(
    NowTs,
    #raft_state{self = #raft_identity{node = LocalNode}, heartbeat_reply_ts = ReplyTsByPeer} = Data
) ->
    ReplyTsWithSelf = ReplyTsByPeer#{LocalNode => NowTs},
    case compute_member_quorum(ReplyTsWithSelf, config(Data)) of
        {quorum, QuorumTs} ->
            Data#raft_state{last_quorum_ts = QuorumTs};
        _ ->
            Data
    end.
%% Update the server's current status in `wa_raft_info`.
%% For stalled, disabled:
%% - Never live and always lagging and stale.
%% For leaders:
%% - Liveness is determined based on the age of the most recent quorum
%% determined by accepted heartbeat responses from followers.
%% - Lagging and staleness is the opposite of liveness.
%% For followers, candidates and witnesses:
%% - Liveness is determined based on the timeliness of the last heartbeat
%% received from a leader
%% - Lagging is determined based on the number of log entries that are
%% committed but not yet applied locally
%% - Staleness is determined based on whether the current server state
%% supports reading and lagging.
%%
%% For leaders, followers, candidates and witnesses the previously published
%% status is read back first so that liveness and lagging transitions are only
%% logged when they change and so that the status is only rewritten when at
%% least one of its components differs. The message queue length is refreshed
%% on every call for those states.
-spec update_status(State :: state(), Data :: #raft_state{}) -> ok.
update_status(
State,
#raft_state{
table = Table,
partition = Partition
}
) when State =:= stalled; State =:= disabled ->
% Published unconditionally: not live, lagging and stale.
wa_raft_info:set_status(Table, Partition, State, false, true, true),
ok;
update_status(
State,
#raft_state{
application = App,
name = Name,
table = Table,
partition = Partition,
last_quorum_ts = LastQuorumTs
} = Data
) when State =:= leader ->
% If the quorum of the most recent timestamps of follower's acknowledgement
% of heartbeats is too old, then the leader is considered stale.
NowTs = erlang:monotonic_time(millisecond),
% When no quorum has been recorded yet, the age is the atom `infinity`, which
% compares greater than any number and therefore yields "not live" below.
QuorumAge = case LastQuorumTs of
undefined -> infinity;
_ -> NowTs - LastQuorumTs
end,
QuorumGrace = ?RAFT_LEADER_STALE_INTERVAL(App, Table),
NewLive = QuorumAge =< QuorumGrace,
% Status should always exist while server is running
{CachedState, CachedLive, _, _} = wa_raft_info:get_status(Table, Partition),
% Only log when liveness differs from the cached value.
case NewLive of
CachedLive ->
ok;
true ->
?SERVER_LOG_NOTICE(
leader,
Data,
"is live after heartbeat quorum age drops to ~0p ms (within ~0p ms grace period)",
[QuorumAge, QuorumGrace]
);
false ->
?SERVER_LOG_NOTICE(
leader,
Data,
"is no longer live after heartbeat quorum age rises to ~0p ms (outside ~0p ms grace period)",
[QuorumAge, QuorumGrace]
)
end,
% Update status and message queue length if necessary
State =:= CachedState andalso NewLive =:= CachedLive orelse
wa_raft_info:set_status(Table, Partition, leader, NewLive, not NewLive, not NewLive),
wa_raft_info:refresh_message_queue_length(Name),
ok;
update_status(
State,
#raft_state{
application = App,
name = Name,
table = Table,
partition = Partition,
last_applied = LastApplied,
last_quorum_ts = LeaderHeartbeatTs,
leader_commit_index = LeaderCommitIndex
} = Data
) when State =:= follower; State =:= candidate; State =:= witness ->
Now = erlang:monotonic_time(millisecond),
% Live only if a timestamp is recorded and it is within the liveness grace period.
NewLive =
LeaderHeartbeatTs =/= undefined andalso
LeaderHeartbeatTs + ?RAFT_LIVENESS_GRACE_PERIOD_MS(App, Table) >= Now,
% Lagging when not live, when the leader's commit index is unknown, or when
% the number of entries not yet applied reaches the configured threshold.
NewLagging =
not NewLive orelse
LeaderCommitIndex =:= undefined orelse
LeaderCommitIndex - LastApplied >= ?RAFT_STALE_GRACE_PERIOD_ENTRIES(App, Table),
% Witnesses are always reported as stale.
NewStale = NewLagging orelse State =:= witness,
% Status should always exist while server is running
{CachedState, CachedLive, CachedLagging, CachedStale} = wa_raft_info:get_status(Table, Partition),
% Only log when liveness differs from the cached value.
case NewLive of
CachedLive ->
ok;
true ->
?SERVER_LOG_NOTICE(State, Data, "is live", []);
false ->
?SERVER_LOG_NOTICE(
State,
Data,
"is no longer live after receiving leader heartbeat at ~0p",
[LeaderHeartbeatTs]
)
end,
% Only log when lagging differs from the cached value. Nothing is logged when
% the leader's commit index is unknown as no distance can be computed.
case NewLagging of
CachedLagging ->
ok;
true when LeaderCommitIndex =:= undefined ->
ok;
true ->
?SERVER_LOG_NOTICE(
State,
Data,
"is stale as last applied at ~0p is ~0p behind leader's commit at ~0p.",
[LastApplied, LeaderCommitIndex - LastApplied, LeaderCommitIndex]
);
false ->
?SERVER_LOG_NOTICE(State, Data, "catches up.", [])
end,
% Rewrite the published status only when at least one component changed.
State =:= CachedState andalso NewLive =:= CachedLive andalso NewLagging =:= CachedLagging andalso NewStale =:= CachedStale orelse
wa_raft_info:set_status(Table, Partition, State, NewLive, NewLagging, NewStale),
wa_raft_info:refresh_message_queue_length(Name),
ok.
%% Check if the leader has not been seen for at least the minimum election timeout.
%% Returns whether the leader is considered missing, the measured delay in
%% milliseconds (`infinity` when no timestamp has been recorded) and the
%% allowed delay that the measurement was compared against.
-spec is_leader_missing(Data :: #raft_state{}) -> {Missing :: boolean(), Delay :: infinity | integer(), AllowedDelay :: integer()}.
is_leader_missing(#raft_state{application = App, table = Table, last_quorum_ts = LastQuorumTs}) ->
    AllowedDelay = ?RAFT_ELECTION_TIMEOUT_MIN(App, Table),
    Delay =
        if
            LastQuorumTs =:= undefined -> infinity;
            true -> erlang:monotonic_time(millisecond) - LastQuorumTs
        end,
    % The atom `infinity` compares greater than any integer.
    {Delay > AllowedDelay, Delay, AllowedDelay}.
%% Check if the leader should send a storage snapshot to this follower and
%% request one if necessary. The decision is made against the first index of
%% the leader's local log. Always returns `ok`.
-spec leader_maybe_request_snapshot(
    Follower :: node(),
    FollowerLastIndex :: wa_raft_log:log_index(),
    FollowerLastAppliedIndex :: wa_raft_log:log_index() | undefined,
    State :: #raft_state{}
) -> ok.
leader_maybe_request_snapshot(
    Follower,
    FollowerLastIndex,
    FollowerLastAppliedIndex,
    #raft_state{log_view = View} = State
) ->
    LeaderFirstIndex = wa_raft_log:first_index(View),
    case leader_should_send_snapshot(LeaderFirstIndex, FollowerLastIndex, FollowerLastAppliedIndex, State) of
        true ->
            request_snapshot_for_follower(Follower, State),
            ok;
        false ->
            ok
    end.
%% Leaders should send a storage snapshot to a follower instead of replicating
%% when the follower:
%%  * reports a last log index of zero,
%%  * reports a last log index that precedes the first index of the leader's log, or
%%  * reports a last applied index that trails its own last log index by more
%%    than the snapshot catchup threshold.
%% A follower that has not reported a last applied index is never sent a
%% snapshot on the basis of the threshold.
-spec leader_should_send_snapshot(
    FirstIndex :: wa_raft_log:log_index(),
    FollowerLastIndex :: wa_raft_log:log_index(),
    FollowerLastAppliedIndex :: wa_raft_log:log_index() | undefined,
    State :: #raft_state{}
) -> boolean().
leader_should_send_snapshot(FirstIndex, FollowerLastIndex, _, _)
    when FollowerLastIndex =:= 0; FirstIndex > FollowerLastIndex
->
    true;
leader_should_send_snapshot(_, _, undefined, _) ->
    false;
leader_should_send_snapshot(_, FollowerLastIndex, FollowerLastAppliedIndex, #raft_state{application = App, table = Table}) ->
    Threshold = ?RAFT_SNAPSHOT_CATCHUP_THRESHOLD(App, Table),
    Backlog = FollowerLastIndex - FollowerLastAppliedIndex,
    Backlog > Threshold.
%% Ask `wa_raft_snapshot_catchup` to catch up the given follower, telling it
%% whether the follower is listed as a witness in the current configuration.
%% NOTE(review): whether a transport is actually started, and whether the
%% request is handled asynchronously, is decided by `wa_raft_snapshot_catchup`
%% and cannot be confirmed from this function alone.
-spec request_snapshot_for_follower(Follower :: node(), State :: #raft_state{}) -> ok.
request_snapshot_for_follower(
    FollowerId,
    #raft_state{application = App, name = Name, table = Table, partition = Partition} = State
) ->
    Witnesses = config_witnesses(config(State)),
    IsWitness = lists:member({Name, FollowerId}, Witnesses),
    wa_raft_snapshot_catchup:catchup(App, Name, FollowerId, Table, Partition, IsWitness).
%% Run `Fun`, measure how long it took using the monotonic clock, report the
%% duration in microseconds for the given startup phase and return whatever
%% `Fun` returned. Nothing is reported if `Fun` raises.
-spec profile_startup_phase(
    Application :: atom(),
    Partition :: wa_raft:partition(),
    Phase :: atom(),
    Fun :: fun(() -> Result)
) -> Result.
profile_startup_phase(Application, Partition, Phase, Fun) ->
    Before = erlang:monotonic_time(),
    Result = Fun(),
    After = erlang:monotonic_time(),
    ElapsedUs = erlang:convert_time_unit(After - Before, native, microsecond),
    emit_startup_phase(Application, Partition, Phase, ElapsedUs),
    Result.
%% Report the duration of a startup phase to the callback configured in the
%% application environment under `waraft_startup_phase_callback`, if any.
-spec emit_startup_phase(
    Application :: atom(),
    Partition :: wa_raft:partition(),
    Phase :: atom(),
    DurationUs :: non_neg_integer()
) -> ok.
emit_startup_phase(Application, Partition, Phase, DurationUs) ->
    emit_startup_phase_callback(
        application:get_env(Application, waraft_startup_phase_callback, undefined),
        Application,
        Partition,
        Phase,
        DurationUs
    ).
%% Invoke a startup phase callback on a best-effort basis. The result of the
%% callback is discarded and any exception it raises is swallowed, so this
%% function always returns `ok`.
%%
%% Supported callback shapes and the arguments they receive:
%%  * `undefined` or any unrecognized value: nothing is called.
%%  * fun of arity 4: `(Partition, Phase, DurationUs, Application)`.
%%  * fun of arity 5: `(Application, Partition, Phase, DurationUs, ?MODULE)`.
%%  * `{Module, Function}`: `Module:Function/5` with the same arguments as
%%    a fun of arity 5.
%% Note that the application is the LAST argument for callbacks of arity 4
%% but the FIRST argument for all other callbacks.
-spec emit_startup_phase_callback(
    Callback :: undefined | fun() | {module(), atom()},
    Application :: atom(),
    Partition :: wa_raft:partition(),
    Phase :: atom(),
    DurationUs :: non_neg_integer()
) -> ok.
emit_startup_phase_callback(undefined, _Application, _Partition, _Phase, _DurationUs) ->
    ok;
emit_startup_phase_callback(Callback, Application, Partition, Phase, DurationUs)
    when is_function(Callback, 4)
->
    run_startup_phase_callback(
        fun() -> Callback(Partition, Phase, DurationUs, Application) end
    );
emit_startup_phase_callback(Callback, Application, Partition, Phase, DurationUs)
    when is_function(Callback, 5)
->
    run_startup_phase_callback(
        fun() -> Callback(Application, Partition, Phase, DurationUs, ?MODULE) end
    );
emit_startup_phase_callback({Module, Function}, Application, Partition, Phase, DurationUs)
    when is_atom(Module), is_atom(Function)
->
    run_startup_phase_callback(
        fun() -> Module:Function(Application, Partition, Phase, DurationUs, ?MODULE) end
    );
emit_startup_phase_callback(_Unsupported, _Application, _Partition, _Phase, _DurationUs) ->
    ok.

%% Run a prepared callback invocation, ignoring both its result and any
%% exception that it raises.
-spec run_startup_phase_callback(Invoke :: fun(() -> term())) -> ok.
run_startup_phase_callback(Invoke) ->
    try Invoke() of
        _ -> ok
    catch
        _:_ -> ok
    end.
%% Convert a peer identity into a `{Name, Node}` pair, passing `undefined`
%% through unchanged.
-spec identity_to_server(Identity :: #raft_identity{} | undefined) -> {Name :: atom(), Node :: node()} | undefined.
identity_to_server(#raft_identity{name = ServerName, node = ServerNode}) ->
    {ServerName, ServerNode};
identity_to_server(undefined) ->
    undefined.