Skip to main content

src/wa_raft_server.erl

%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.
%%%
%%% This module implements RPC of raft consensus protocol. See raft spec
%%% on https://raft.github.io/. A wa_raft instance is a participant in
%%% a consensus group. Each participant plays a certain role (follower,
%%% leader or candidate). The mission of a consensus group is to
%%% implement a replicated state machine in a distributed cluster.

-module(wa_raft_server).
-compile(warn_missing_spec_all).
-behaviour(gen_statem).
-compile({inline, [require_valid_state/1]}).

%%------------------------------------------------------------------------------
%% RAFT Server - OTP Supervision
%%------------------------------------------------------------------------------

-export([
    child_spec/1,
    start_link/1
]).

%%------------------------------------------------------------------------------
%% RAFT Server - Public APIs - RAFT Cluster Configuration
%%------------------------------------------------------------------------------

-export([
    latest_config_version/0
]).

%% Inspection of cluster configuration
-export([
    get_config_version/1,
    get_config_participants/1,
    get_config_members/1,
    get_config_full_members/1,
    get_config_witness_members/1,
    get_config_witnesses/1,
    is_data_replica/2,
    is_witness/2
]).

%% Creation and modification of cluster configuration
-export([
    make_config/0,
    make_config/1,
    make_config/2,
    make_config/3,
    normalize_config/1
]).

% Stubbing log entries for witnesses
-export([
    stub_entries_for_witness/1
]).

%% Modification of cluster configuration
-export([
    set_config_members/2,
    set_config_members/3,
    set_config_members/4
]).

%%------------------------------------------------------------------------------
%% RAFT Server - Public APIs
%%------------------------------------------------------------------------------

-export([
    get_current_config/1
]).

-export([
    status/1,
    status/2,
    membership/1
]).

%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - Local Options
%%------------------------------------------------------------------------------

-export([
    default_name/2,
    registered_name/2
]).

%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - RPC Handling
%%------------------------------------------------------------------------------

-export([
    make_rpc/3,
    parse_rpc/2
]).

%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - Commands
%%------------------------------------------------------------------------------

-export([
    commit/4,
    read/2,
    snapshot_available/3,
    adjust_config/2,
    adjust_config/3,
    adjust_config/4,
    adjust_membership/3,
    adjust_membership/4,
    refresh_config/1,
    trigger_election/1,
    trigger_election/2,
    promote/2,
    promote/3,
    resign/1,
    handover/1,
    handover/2,
    handover_candidates/1,
    is_peer_ready/2,
    disable/2,
    enable/1,
    bootstrap/4,
    notify_complete/1
]).

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation
%%------------------------------------------------------------------------------

%% General callbacks
-export([
    init/1,
    callback_mode/0,
    terminate/3
]).

%% State-specific callbacks
-export([
    stalled/3,
    leader/3,
    follower/3,
    candidate/3,
    disabled/3,
    witness/3
]).

%%------------------------------------------------------------------------------
%% RAFT Server - Test Exports
%%------------------------------------------------------------------------------

-ifdef(TEST).
-export([
    config/1,
    compute_member_quorum/2,
    leader_adjust_config/2
]).
-endif.

%%------------------------------------------------------------------------------
%% RAFT Server - Public Types
%%------------------------------------------------------------------------------

-export_type([
    state/0,
    config/0,
    config_all/0,
    membership/0,
    status/0,
    config_action/0
]).

%%------------------------------------------------------------------------------
%% RAFT Server - Internal Types
%%------------------------------------------------------------------------------

-export_type([
    election_type/0
]).

%%------------------------------------------------------------------------------

-include_lib("wa_raft/include/wa_raft.hrl").
-include_lib("wa_raft/include/wa_raft_logger.hrl").
-include_lib("wa_raft/include/wa_raft_rpc.hrl").

%%------------------------------------------------------------------------------

%% Section 5.2. Randomized election timeout for fast election and to avoid split votes
-define(ELECTION_TIMEOUT(State), {state_timeout, random_election_timeout(State), election}).

%% Timeout in milliseconds before the next heartbeat is to be sent by a RAFT leader with no pending log entries
-define(HEARTBEAT_TIMEOUT(State),    {state_timeout, ?RAFT_HEARTBEAT_INTERVAL(State#raft_state.application, State#raft_state.table), heartbeat}).
%% Timeout in milliseconds before the next heartbeat is to be sent by a RAFT leader with pending log entries
-define(COMMIT_BATCH_TIMEOUT(State), {state_timeout, ?RAFT_COMMIT_BATCH_INTERVAL(State#raft_state.application, State#raft_state.table), batch_commit}).

%%------------------------------------------------------------------------------

-define(SERVER_LOG_PREFIX, "Server[~0p, term ~0p, ~0p] ").
-define(SERVER_LOG_FORMAT(Format), ?SERVER_LOG_PREFIX Format).

-define(SERVER_LOG_ARGS(State, Data, Args), [(Data)#raft_state.name, (Data)#raft_state.current_term, require_valid_state(State) | Args]).

% elp:ignore W0002 (unused_macro) - Keeping for consistency
-define(SERVER_LOG_ERROR(Data, Format, Args), ?SERVER_LOG_ERROR(?FUNCTION_NAME, Data, Format, Args)).
-define(SERVER_LOG_ERROR(State, Data, Format, Args), ?RAFT_LOG_ERROR(?SERVER_LOG_FORMAT(Format), ?SERVER_LOG_ARGS(State, Data, Args))).

-define(SERVER_LOG_WARNING(Data, Format, Args), ?SERVER_LOG_WARNING(?FUNCTION_NAME, Data, Format, Args)).
-define(SERVER_LOG_WARNING(State, Data, Format, Args), ?RAFT_LOG_WARNING(?SERVER_LOG_FORMAT(Format), ?SERVER_LOG_ARGS(State, Data, Args))).

-define(SERVER_LOG_NOTICE(Data, Format, Args), ?SERVER_LOG_NOTICE(?FUNCTION_NAME, Data, Format, Args)).
-define(SERVER_LOG_NOTICE(State, Data, Format, Args), ?RAFT_LOG_NOTICE(?SERVER_LOG_FORMAT(Format), ?SERVER_LOG_ARGS(State, Data, Args))).

-define(SERVER_LOG_DEBUG(Data, Format, Args), ?SERVER_LOG_DEBUG(?FUNCTION_NAME, Data, Format, Args)).
-define(SERVER_LOG_DEBUG(State, Data, Format, Args), ?RAFT_LOG_DEBUG(?SERVER_LOG_FORMAT(Format), ?SERVER_LOG_ARGS(State, Data, Args))).

%%------------------------------------------------------------------------------
%% RAFT Server - Public Types
%%------------------------------------------------------------------------------

-type state() ::
    stalled |
    leader |
    follower |
    candidate |
    disabled |
    witness.

-type term_or_offset() :: wa_raft_log:log_term() | current | next | {next, Offset :: pos_integer()}.

-type peer() :: {Name :: atom(), Node :: node()}.
-type membership() :: [peer()].

-type config() ::
    #{
        version := 1,
        participants := membership(),
        membership := membership(),
        witness := membership()
    }.
-type config_all() :: config_v1_all().

-type config_v1_all() ::
    #{
        version := 1,
        participants => membership(),
        membership => membership(),
        witness => membership()
    }.

-type status() :: [status_element()].
-type status_element() ::
      {state, state()}
    | {id, atom()}
    | {peers, [{atom(), {node(), atom()}}]}
    | {partition, wa_raft:partition()}
    | {partition_path, file:filename_all()}
    | {current_term, wa_raft_log:log_term()}
    | {voted_for, node()}
    | {commit_index, wa_raft_log:log_index()}
    | {last_applied, wa_raft_log:log_index()}
    | {leader_name, atom() | undefined}
    | {leader_id, node() | undefined}
    | {pending_high, non_neg_integer()}
    | {pending_low, non_neg_integer()}
    | {pending_read, boolean()}
    | {queued, non_neg_integer()}
    | {next_indices, #{node() => wa_raft_log:log_index()}}
    | {match_indices, #{node() => wa_raft_log:log_index()}}
    | {log_module, module()}
    | {log_first, wa_raft_log:log_index()}
    | {log_last, wa_raft_log:log_index()}
    | {votes, #{node() => boolean()}}
    | {inflight_applies, non_neg_integer()}
    | {disable_reason, string()}
    | {witness, boolean()}
    | {config, config()}
    | {config_index, wa_raft_log:log_index()}.

%%------------------------------------------------------------------------------
%% RAFT Server - Internal Types
%%------------------------------------------------------------------------------

-type election_type() :: normal | force | allowed.
-type async_append_cancel_reason() ::
    wa_raft_acceptor:common_error()
    | {error, {commit_call_failed_after_submit, term()}}.

%%------------------------------------------------------------------------------
%% RAFT Server - Private Types
%%------------------------------------------------------------------------------

-type event() :: rpc() | remote(normalized_procedure()) | command() | internal_event() | timeout_type().

-type rpc() :: rpc_named() | legacy_rpc().
-type legacy_rpc() :: ?LEGACY_RAFT_RPC(atom(), wa_raft_log:log_term(), node(), undefined | tuple()).
-type rpc_named() :: ?RAFT_NAMED_RPC(atom(), wa_raft_log:log_term(), atom(), node(), undefined | tuple()).

-type command() :: commit_command() | read_command() | current_config_command() | status_command() |
                   trigger_election_command() | promote_command() | resign_command() | adjust_config_command() |
                   snapshot_available_command() | handover_candidates_command() | handover_command() |
                   enable_command() | disable_command() | bootstrap_command() | notify_complete_command() |
                   is_peer_ready_command().

-type commit_command()              :: ?COMMIT_COMMAND(gen_server:from(), wa_raft_acceptor:op(), wa_raft_acceptor:priority()).
-type read_command()                :: ?READ_COMMAND(wa_raft_acceptor:read_op()).
-type current_config_command()      :: ?CURRENT_CONFIG_COMMAND.
-type status_command()              :: ?STATUS_COMMAND.
-type trigger_election_command()    :: ?TRIGGER_ELECTION_COMMAND(term_or_offset()).
-type promote_command()             :: ?PROMOTE_COMMAND(term_or_offset(), boolean()).
-type resign_command()              :: ?RESIGN_COMMAND.
-type adjust_config_command()       :: ?ADJUST_CONFIG_COMMAND(gen_server:from() | undefined, config_action(), wa_raft_log:log_index() | undefined).
-type snapshot_available_command()  :: ?SNAPSHOT_AVAILABLE_COMMAND(string(), wa_raft_log:log_pos()).
-type handover_candidates_command() :: ?HANDOVER_CANDIDATES_COMMAND.
-type handover_command()            :: ?HANDOVER_COMMAND(node()).
-type enable_command()              :: ?ENABLE_COMMAND.
-type disable_command()             :: ?DISABLE_COMMAND(term()).
-type bootstrap_command()           :: ?BOOTSTRAP_COMMAND(wa_raft_log:log_pos(), config(), dynamic()).
-type notify_complete_command()     :: ?NOTIFY_COMPLETE_COMMAND().
-type is_peer_ready_command()       :: ?IS_PEER_READY_COMMAND(peer()).

-type internal_event() :: advance_term_event().
-type advance_term_event() :: ?ADVANCE_TERM(wa_raft_log:log_term()).

-type timeout_type() :: election | heartbeat.

-type refresh_action() :: refresh.
-type membership_action() ::
    %% Add a new peer to the cluster as a voting member or promote an existing
    %% non-voting participant to a voting member.
    {add, Peer :: peer()} |
    %% Add a new peer or existing non-voting witness participant to the
    %% cluster as a voting witness member.
    {add_witness, Peer :: peer()} |
    %% Add a new peer to the cluster as a non-voting participant.
    {add_participant, Peer :: peer()} |
    %% Promote a non-voting participant to a voting member if the participant
    %% is ready. A participant is ready if it would be eligible for a handover
    %% if it were a voting member.
    {promote_participant_if_ready, Peer :: peer()} |
    %% Remove a voting member's membership and participation or a non-voting
    %% participant's participation from the cluster.
    {remove, Peer :: peer()} |
    %% Remove a voting witness member's membership and participation from the
    %% cluster.
    {remove_witness, Peer :: peer()} |
    %% Demote an existing voting member to a non-voting participant or a
    %% voting witness member to a non-voting witness participant.
    {remove_membership, Peer :: peer()} |
    %% Demote an existing voting member or non-voting participant to a voting
    %% witness member or non-voting witness participant.
    {demote_to_witness, Peer :: peer()}.

-type config_action() :: refresh_action() | membership_action().

%%------------------------------------------------------------------------------
%% RAFT Server - OTP Supervision
%%------------------------------------------------------------------------------

-spec child_spec(Options :: #raft_options{}) -> supervisor:child_spec().
child_spec(Options) ->
    #{
        id => ?MODULE,
        start => {?MODULE, start_link, [Options]},
        restart => transient,
        shutdown => 30000,
        modules => [?MODULE]
    }.

-spec start_link(Options :: #raft_options{}) -> gen_statem:start_ret().
start_link(#raft_options{server_name = Name} = Options) ->
    gen_statem:start_link({local, Name}, ?MODULE, Options, []).

%%------------------------------------------------------------------------------
%% RAFT Server - Public APIs - RAFT Cluster Configuration
%%------------------------------------------------------------------------------

%% Returns the version number for the latest cluster configuration format that
%% is supported by the current RAFT implementation. All cluster configurations
%% returned by methods used to create or modify cluster configurations in this
%% module will return cluster configurations of this version.
-spec latest_config_version() -> pos_integer().
latest_config_version() ->
    1.

-spec get_config_version(Config :: config() | config_all()) -> pos_integer().
get_config_version(#{version := Version}) ->
    Version.

-spec get_config_participants(Config :: config() | config_all()) -> [#raft_identity{}].
get_config_participants(#{version := 1, participants := Participants}) ->
    [#raft_identity{name = Name, node = Node} || {Name, Node} <- Participants];
get_config_participants(Config) ->
    get_config_members(Config).

-spec get_config_members(Config :: config() | config_all()) -> [#raft_identity{}].
get_config_members(#{version := 1, membership := Members}) ->
    [#raft_identity{name = Name, node = Node} || {Name, Node} <- Members];
get_config_members(_) ->
    [].

-spec get_config_full_members(Config :: config() | config_all()) -> [#raft_identity{}].
get_config_full_members(#{version := 1, membership := Members, witness := Witnesses}) ->
    [#raft_identity{name = Name, node = Node} || {Name, Node} <- Members -- Witnesses];
get_config_full_members(Config) ->
    get_config_members(Config).

-spec get_config_witness_members(Config :: config() | config_all()) -> [#raft_identity{}].
get_config_witness_members(#{version := 1, membership := Members, witness := Witnesses}) ->
    MembersMap = maps:from_keys(Members, []),
    [#raft_identity{name = Name, node = Node} || {Name, Node} = Witness <- Witnesses, maps:is_key(Witness, MembersMap)];
get_config_witness_members(_) ->
    [].

-spec get_config_witnesses(Config :: config() | config_all()) -> [#raft_identity{}].
get_config_witnesses(#{version := 1, witness := Witnesses}) ->
    [#raft_identity{name = Name, node = Node} || {Name, Node} <- Witnesses];
get_config_witnesses(_) ->
    [].

-spec is_data_replica(Identity :: #raft_identity{}, Config :: config() | config_all()) -> boolean().
is_data_replica(Identity, Config) ->
    lists:member(Identity, get_config_participants(Config)) andalso not lists:member(Identity, get_config_witnesses(Config)).

-spec is_witness(Identity :: #raft_identity{}, Config :: config() | config_all()) -> boolean().
is_witness(Identity, Config) ->
    lists:member(Identity, get_config_witnesses(Config)).

%% Create a new cluster configuration with no members.
%% Without any members, this cluster configuration should not be used as
%% the active configuration for a RAFT cluster.
-spec make_config() -> config().
make_config() ->
    #{
        version => 1,
        participants => [],
        membership => [],
        witness => []
    }.

%% Create a new cluster configuration with the provided members.
-spec make_config(Members :: [#raft_identity{}]) -> config().
make_config(Members) ->
    set_config_members(Members, make_config()).

%% Create a new cluster configuration with the provided members and witnesses.
-spec make_config(Members :: [#raft_identity{}], Witnesses :: [#raft_identity{}]) -> config().
make_config(Members, Witnesses) ->
    set_config_members(Members, Witnesses, make_config()).

%% Create a new cluster configuration with the provided participants, members, and witnesses.
%% Any members that are not in the participants list will be included.
-spec make_config(Participants :: [#raft_identity{}], Members :: [#raft_identity{}], Witnesses :: [#raft_identity{}]) -> config().
make_config(Participants, Members, Witnesses) ->
    set_config_members(Participants, Members, Witnesses, make_config()).

%% Replace the set of members in the provided cluster configuration.
%% After replacement, the set of participants will be equal to the provided list of members.
%% After replacement, no witnesses will be set.
%% Will upgrade the cluster configuration to the latest version.
-spec set_config_members(Members :: [#raft_identity{}], ConfigAll :: config() | config_all()) -> config().
set_config_members(Members, ConfigAll) ->
    set_config_members(Members, [], ConfigAll).

%% Replace the set of participants, members and witnesses in the provided cluster configuration.
%% After replacement, the set of participants will be equal to the provided list of members.
%% Will upgrade the cluster configuration to the latest version.
-spec set_config_members(Members :: [#raft_identity{}], Witnesses :: [#raft_identity{}], ConfigAll :: config() | config_all()) -> config().
set_config_members(Members, Witnesses, ConfigAll) ->
    set_config_members(Members, Members, Witnesses, ConfigAll).

%% Replace the set of participants, members, and witnesses in the provided cluster configuration.
%% After replacement, the set of participants will include at least all members.
%% Will upgrade the cluster configuration to the latest version.
-spec set_config_members(Participants :: [#raft_identity{}], Members :: [#raft_identity{}], Witnesses :: [#raft_identity{}], ConfigAll :: config() | config_all()) -> config().
set_config_members(Participants, Members, Witnesses, ConfigAll) ->
    ParticipantPeers = lists:usort([{Name, Node} || #raft_identity{name = Name, node = Node} <- Participants]),
    MemberPeers = lists:usort([{Name, Node} || #raft_identity{name = Name, node = Node} <- Members]),
    WitnessPeers = lists:usort([{Name, Node} || #raft_identity{name = Name, node = Node} <- Witnesses]),
    Config = normalize_config(ConfigAll),
    Config#{
        participants => lists:umerge(ParticipantPeers, MemberPeers),
        membership => MemberPeers,
        witness => WitnessPeers
    }.

%% Attempt to upgrade any configuration from an older configuration version to the
%% latest configuration version if possible.
-spec normalize_config(ConfigAll :: config() | config_all()) -> Config :: config().
normalize_config(#{version := 1} = Config) ->
    Membership = maps:get(membership, Config, []),
    #{
        version => 1,
        participants => maps:get(participants, Config, Membership),
        membership => Membership,
        witness => maps:get(witness, Config, [])
    };
normalize_config(#{version := Version}) ->
    % All valid configurations will contain at least their own version; however,
    % we do not know how to handle configurations with newer versions.
    error({unsupported_version, Version});
normalize_config(#{}) ->
    error(no_version).

%%------------------------------------------------------------------------------
%% RAFT Server - Public APIs
%%------------------------------------------------------------------------------

-spec get_current_config(Server :: gen_statem:server_ref()) -> config().
get_current_config(Server) ->
    gen_statem:call(Server, ?CURRENT_CONFIG_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()).

-spec status(Server :: gen_statem:server_ref()) -> status().
status(Server) ->
    gen_statem:call(Server, ?STATUS_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()).

-spec status
    (Server :: gen_statem:server_ref(), Key :: atom()) -> Value :: dynamic();
    (Server :: gen_statem:server_ref(), Keys :: [atom()]) -> Value :: [dynamic()].
status(Server, Key) when is_atom(Key) ->
    hd(status(Server, [Key]));
status(Server, Keys) when is_list(Keys) ->
    case status(Server) of
        [_|_] = Status ->
            [proplists:get_value(Key, Status, undefined) || Key <- Keys];
        _ ->
            lists:duplicate(length(Keys), undefined)
    end.

-spec membership(Service :: gen_statem:server_ref()) -> undefined | [#raft_identity{}].
membership(Service) ->
    case proplists:get_value(config, status(Service), undefined) of
        undefined -> undefined;
        Config    -> get_config_members(Config)
    end.

%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - Local Options
%%------------------------------------------------------------------------------

%% Get the default name for the RAFT server associated with the provided
%% RAFT partition.
-spec default_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
default_name(Table, Partition) ->
    % elp:ignore W0023 bounded atom, one per table/partition at startup
    binary_to_atom(<<"raft_server_", (atom_to_binary(Table))/binary, "_", (integer_to_binary(Partition))/binary>>).

%% Get the registered name for the RAFT server associated with the provided
%% RAFT partition or the default name if no registration exists.
-spec registered_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
registered_name(Table, Partition) ->
    case wa_raft_part_sup:options(Table, Partition) of
        undefined -> default_name(Table, Partition);
        Options   -> Options#raft_options.server_name
    end.

%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - RPC Handling
%%------------------------------------------------------------------------------

-spec make_rpc(Self :: #raft_identity{}, Term :: wa_raft_log:log_term(), Procedure :: normalized_procedure()) -> rpc().
make_rpc(#raft_identity{name = Name, node = Node}, Term, ?PROCEDURE(Procedure, Payload)) ->
    % For compatibility with legacy versions that expect RPCs sent with no arguments to have payload 'undefined' instead of {}.
    PayloadOrUndefined = case Payload of
        {} -> undefined;
        _  -> Payload
    end,
    ?RAFT_NAMED_RPC(Procedure, Term, Name, Node, PayloadOrUndefined).

-spec parse_rpc(Self :: #raft_identity{}, RPC :: rpc()) -> {Term :: wa_raft_log:log_term(), Sender :: #raft_identity{}, Procedure :: procedure()}.
parse_rpc(_, ?RAFT_NAMED_RPC(Key, Term, SenderName, SenderNode, PayloadOrUndefined)) ->
    Payload = case PayloadOrUndefined of
        undefined -> {};
        _         -> PayloadOrUndefined
    end,
    #{Key := ?PROCEDURE(Procedure, Defaults)} = protocol(),
    {Term, #raft_identity{name = SenderName, node = SenderNode}, ?PROCEDURE(Procedure, defaultize_payload(Defaults, Payload))};
parse_rpc(#raft_identity{name = Name} = Self, ?LEGACY_RAFT_RPC(Procedure, Term, SenderId, Payload)) ->
    parse_rpc(Self, ?RAFT_NAMED_RPC(Procedure, Term, Name, SenderId, Payload)).

%%------------------------------------------------------------------------------
%% RAFT Server - Internal APIs - Commands
%%------------------------------------------------------------------------------

-spec commit(
    Server :: gen_statem:server_ref(),
    From :: gen_server:from(),
    Op :: wa_raft_acceptor:op(),
    Priority :: wa_raft_acceptor:priority()
) -> ok.
commit(Server, From, Op, Priority) ->
    gen_statem:cast(Server, ?COMMIT_COMMAND(From, Op, Priority)).

-spec read(
    Server :: gen_statem:server_ref(),
    Op :: wa_raft_acceptor:read_op()
) -> ok.
read(Server, Op) ->
    gen_statem:cast(Server, ?READ_COMMAND(Op)).

-spec snapshot_available(
    Server :: gen_statem:server_ref(),
    Root :: file:filename(),
    Position :: wa_raft_log:log_pos()
) -> ok | {error, Reason :: term()}.
snapshot_available(Server, Root, Position) ->
    % Use the storage call timeout because this command requires the RAFT
    % server to make a potentially expensive call against the RAFT storage
    % server to complete.
    gen_statem:call(Server, ?SNAPSHOT_AVAILABLE_COMMAND(Root, Position), ?RAFT_STORAGE_CALL_TIMEOUT()).

-spec adjust_config(Server :: gen_statem:server_ref(), Action :: config_action()) ->
    {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
adjust_config(Server, Action) ->
    adjust_config(Server, Action, undefined).

-spec adjust_config(
    Server :: gen_statem:server_ref(),
    Action :: config_action(),
    Index :: wa_raft_log:log_index() | undefined
) -> {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
adjust_config(Server, Action, Index) ->
    gen_statem:call(Server, ?ADJUST_CONFIG_COMMAND(undefined, Action, Index), ?RAFT_RPC_CALL_TIMEOUT()).

-spec adjust_config(
    Server :: gen_statem:server_ref(),
    From :: gen_server:from(),
    Action :: config_action(),
    Index :: wa_raft_log:log_index() | undefined
) -> ok.
adjust_config(Server, From, Action, Index) ->
    gen_statem:cast(Server, ?ADJUST_CONFIG_COMMAND(From, Action, Index)).

-spec adjust_membership(
    Server :: gen_statem:server_ref(),
    Action :: add | add_witness | remove | remove_witness,
    Peer :: peer()
) -> {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
adjust_membership(Server, Action, Peer) ->
    adjust_config(Server, {Action, Peer}).

-spec adjust_membership(
    Server :: gen_statem:server_ref(),
    Action :: add | add_witness | remove | remove_witness,
    Peer :: peer(),
    ConfigIndex :: wa_raft_log:log_index() | undefined
) -> {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
adjust_membership(Server, Action, Peer, ConfigIndex) ->
    adjust_config(Server, {Action, Peer}, ConfigIndex).

-spec refresh_config(Server :: gen_statem:server_ref()) ->
    {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
refresh_config(Server) ->
    adjust_config(Server, refresh).

%% Request the specified RAFT server to start an election in the next term.
-spec trigger_election(Server :: gen_statem:server_ref()) -> ok | {error, Reason :: term()}.
trigger_election(Server) ->
    trigger_election(Server, current).

%% Request the specified RAFT server to trigger a new election in the term *after* the specified term.
-spec trigger_election(Server :: gen_statem:server_ref(), Term :: term_or_offset()) -> ok | {error, Reason :: term()}.
trigger_election(Server, Term) ->
    gen_statem:call(Server, ?TRIGGER_ELECTION_COMMAND(Term), ?RAFT_RPC_CALL_TIMEOUT()).

%% Request the specified RAFT server to promote itself to leader of the specified term.
-spec promote(Server :: gen_statem:server_ref(), Term :: term_or_offset()) -> ok | {error, Reason :: term()}.
promote(Server, Term) ->
    promote(Server, Term, false).

-spec promote(Server :: gen_statem:server_ref(), Term :: term_or_offset(), Force :: boolean()) -> ok | {error, Reason :: term()}.
promote(Server, Term, Force) ->
    gen_statem:call(Server, ?PROMOTE_COMMAND(Term, Force), ?RAFT_RPC_CALL_TIMEOUT()).

-spec resign(Server :: gen_statem:server_ref()) -> ok | {error, Reason :: term()}.
resign(Server) ->
    gen_statem:call(Server, ?RESIGN_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()).

%% Instruct a RAFT leader to attempt a handover to a random handover candidate.
-spec handover(Server :: gen_statem:server_ref()) -> ok.
handover(Server) ->
    gen_statem:cast(Server, ?HANDOVER_COMMAND(undefined)).

%% Instruct a RAFT leader to attempt a handover to the specified peer node.
%% If an `undefined` peer node is specified, then handover to a random handover candidate.
%% Returns which peer node the handover was sent to or otherwise an error.
-spec handover(Server :: gen_statem:server_ref(), Peer :: node() | undefined) -> {ok, Peer :: node()} | {error, Reason :: term()}.
handover(Server, Peer) ->
    gen_statem:call(Server, ?HANDOVER_COMMAND(Peer), ?RAFT_RPC_CALL_TIMEOUT()).

-spec handover_candidates(Server :: gen_statem:server_ref()) -> {ok, Candidates :: [node()]} | {error, Reason :: term()}.
handover_candidates(Server) ->
    gen_statem:call(Server, ?HANDOVER_CANDIDATES_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()).

-spec is_peer_ready(Server :: gen_statem:server_ref(), Peer :: peer()) -> ok | {error, Reason :: term()}.
is_peer_ready(Server, Peer) ->
    gen_statem:call(Server, ?IS_PEER_READY_COMMAND(Peer), ?RAFT_RPC_CALL_TIMEOUT()).

-spec disable(Server :: gen_statem:server_ref(), Reason :: term()) -> ok | {error, ErrorReason :: atom()}.
disable(Server, Reason) ->
    gen_statem:call(Server, ?DISABLE_COMMAND(Reason), ?RAFT_RPC_CALL_TIMEOUT()).

-spec enable(Server :: gen_statem:server_ref()) -> ok | {error, ErrorReason :: atom()}.
enable(Server) ->
    gen_statem:call(Server, ?ENABLE_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()).

-spec bootstrap(
    Server :: gen_statem:server_ref(),
    Position :: wa_raft_log:log_pos(),
    Config :: config(),
    Data :: dynamic()
) -> ok | {error, Reason :: term()}.
bootstrap(Server, Position, Config, Data) ->
    gen_statem:call(Server, ?BOOTSTRAP_COMMAND(Position, Config, Data), ?RAFT_STORAGE_CALL_TIMEOUT()).

-spec notify_complete(Server :: gen_statem:server_ref()) -> ok.
notify_complete(Server) ->
    gen_statem:cast(Server, ?NOTIFY_COMPLETE_COMMAND()).

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Logging
%%------------------------------------------------------------------------------

-spec require_valid_state(state()) -> state().
require_valid_state(State) ->
    State.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - General Callbacks
%%------------------------------------------------------------------------------

-spec init(Options :: #raft_options{}) -> gen_statem:init_result(state()).
init(
    #raft_options{
        application = Application,
        table = Table,
        partition = Partition,
        self = Self,
        identifier = Identifier,
        database = PartitionPath,
        label_module = LabelModule,
        distribution_module = DistributionModule,
        log_name = Log,
        server_name = Name,
        storage_name = Storage
    } = Options
) ->
    process_flag(trap_exit, true),

    ?RAFT_LOG_NOTICE("Server[~0p] starting with options ~0p", [Name, Options]),

    % This increases the potential overhead of sending messages to server;
    % however, can protect the server from GC overhead
    % and other memory-related issues (most notably when receiving log entries
    % when undergoing a fast log catchup).
    ?RAFT_CONFIG(raft_server_message_queue_off_heap, true) andalso
        process_flag(message_queue_data, off_heap),

    % Open storage and the log. Both providers can reject startup on
    % recoverable corruption or unsafe disk state, so keep those as normal
    % start failures instead of crashing through a bad pattern match.
    case profile_startup_phase(Application, Partition, server_open_storage_and_log, fun() ->
             open_storage_and_log(Application, Partition, Storage, Log)
         end) of
        {ok, Last, View} ->
            Now = erlang:monotonic_time(millisecond),
            State0 = #raft_state{
                application = Application,
                name = Name,
                self = Self,
                identifier = Identifier,
                table = Table,
                partition = Partition,
                partition_path = PartitionPath,
                log_view = View,
                queues = wa_raft_queue:queues(Options),
                label_module = LabelModule,
                last_label = undefined,
                distribution_module = DistributionModule,
                storage = Storage,
                commit_index = Last#raft_log_pos.index,
                last_applied = Last#raft_log_pos.index,
                current_term = Last#raft_log_pos.term,
                state_start_ts = Now
            },

            State1 = load_config(State0),
            rand:seed(exsp, {erlang:monotonic_time(), erlang:time_offset(), erlang:unique_integer()}),
            % TODO T246543655 When we have proper error handling for data corruption
            %                 vs. stalled server then handle {error, Reason} type
            %                 returns from load_state.
            State2 = case profile_startup_phase(Application, Partition, server_durable_state_load, fun() ->
                              wa_raft_durable_state:load(State1)
                          end) of
                {ok, NewState} -> NewState;
                _              -> State1
            end,

            % 1. Begin as disabled if a disable reason is set
            % 2. Begin as stalled if there is no data
            % 3. Begin as witness if configured
            % 4. Begin as follower otherwise
            InitialState = case State2 of
                #raft_state{disable_reason = undefined, last_applied = 0} ->
                    case bootstrapped_log_or_storage(State2) of
                        true -> follower_or_witness(State2);
                        false -> stalled
                    end;
                #raft_state{disable_reason = undefined} -> follower_or_witness(State2);
                _ -> disabled
            end,

            wa_raft_info:set_current_term_info(Table, Partition, State2#raft_state.current_term, undefined, undefined),
            wa_raft_info:set_status(Table, Partition, InitialState, false, true, true),

            {ok, InitialState, State2};
        {error, Reason} ->
            ?RAFT_LOG_ERROR("Server[~0p] failed to open storage/log due to ~0P", [Name, Reason, 30]),
            {stop, Reason}
    end.

-spec open_storage_and_log(
    Application :: atom(),
    Partition :: wa_raft:partition(),
    Storage :: gen_server:server_ref(),
    Log :: gen_server:server_ref()
) ->
    {ok, wa_raft_log:log_pos(), wa_raft_log:view()} | {error, term()}.
open_storage_and_log(Application, Partition, Storage, Log) ->
    case profile_startup_phase(Application, Partition, server_storage_open, fun() ->
             wa_raft_storage:open(Storage)
         end) of
        {ok, Last} ->
            case profile_startup_phase(Application, Partition, server_log_open, fun() ->
                     wa_raft_log:open(Log, Last)
                 end) of
                {ok, View} -> {ok, Last, View};
                {error, Reason} -> {error, Reason}
            end;
        {error, Reason} ->
            {error, Reason}
    end.

-spec bootstrapped_log_or_storage(State :: #raft_state{}) -> boolean().
bootstrapped_log_or_storage(State) ->
    config_index(State) > 0.

-spec callback_mode() -> gen_statem:callback_mode_result().
callback_mode() ->
    [state_functions, state_enter].

-spec terminate(Reason :: term(), State :: state(), Data :: #raft_state{}) -> ok.
terminate(Reason, State, #raft_state{name = Name, table = Table, partition = Partition, handover = Handover} = Data0) ->
    ?SERVER_LOG_NOTICE(State, Data0, "terminating due to ~0P", [Reason, 20]),
    CancelReason =
        case Handover of
            {Peer, _, _} -> {error, {notify_redirect, Peer}};
            undefined    -> {error, not_leader}
        end,
    Data1 = cancel_async_append_in_flight(State, CancelReason, Data0),
    cancel_pending_and_queued(CancelReason, Data1),
    wa_raft_durable_state:sync(Data0),
    wa_raft_info:clear(Table, Partition, Name),
    ok.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Procedure Call Marshalling
%%------------------------------------------------------------------------------

%% A macro that destructures the identity record indicating that the
%% relevant procedure should be refactored to treat identities
%% opaquely.
-define(IDENTITY_REQUIRES_MIGRATION(Name, Node), #raft_identity{name = Name, node = Node}).

-type remote(Call) :: ?REMOTE(#raft_identity{}, Call).
-type procedure()  :: ?PROCEDURE(atom(), tuple()).

-type normalized_procedure() :: append_entries() | append_entries_response() | request_pre_vote() | pre_vote() | request_vote() | vote() | handover() | handover_failed() | notify_term().
-type append_entries()          :: ?APPEND_ENTRIES         (wa_raft_log:log_index(), wa_raft_log:log_term(), [wa_raft_log:log_entry() | binary()], wa_raft_log:log_index(), wa_raft_log:log_index()).
-type append_entries_response() :: ?APPEND_ENTRIES_RESPONSE(wa_raft_log:log_index(), boolean(), wa_raft_log:log_index(), wa_raft_log:log_index() | undefined).
-type request_pre_vote()        :: ?REQUEST_PRE_VOTE       (reference()).
-type pre_vote()                :: ?PRE_VOTE               (reference(), boolean(), wa_raft_log:log_index(), wa_raft_log:log_term()).
-type request_vote()            :: ?REQUEST_VOTE           (election_type(), wa_raft_log:log_index(), wa_raft_log:log_term()).
-type vote()                    :: ?VOTE                   (boolean()).
-type handover()                :: ?HANDOVER               (reference(), wa_raft_log:log_index(), wa_raft_log:log_term(), [wa_raft_log:log_entry() | binary()]).
-type handover_failed()         :: ?HANDOVER_FAILED        (reference()).
-type notify_term()             :: ?NOTIFY_TERM            ().

-spec protocol() -> #{atom() => procedure()}.
protocol() ->
    #{
        ?APPEND_ENTRIES          => ?APPEND_ENTRIES(0, 0, [], 0, 0),
        ?APPEND_ENTRIES_RESPONSE => ?APPEND_ENTRIES_RESPONSE(0, false, 0, undefined),
        ?REQUEST_PRE_VOTE        => ?REQUEST_PRE_VOTE(undefined),
        ?PRE_VOTE                => ?PRE_VOTE(undefined, false, 0, 0),
        ?REQUEST_VOTE            => ?REQUEST_VOTE(normal, 0, 0),
        ?VOTE                    => ?VOTE(false),
        ?HANDOVER                => ?HANDOVER(undefined, 0, 0, []),
        ?HANDOVER_FAILED         => ?HANDOVER_FAILED(undefined)
    }.

-spec handle_rpc(
    Type :: gen_statem:event_type(),
    RPC :: rpc(),
    State :: state(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).

handle_rpc(Type, ?RAFT_NAMED_RPC(Procedure, Term, SenderName, SenderNode, Payload) = Event, State, #raft_state{} = Data) ->
    handle_rpc_impl(Type, Event, Procedure, Term, #raft_identity{name = SenderName, node = SenderNode}, Payload, State, Data);
handle_rpc(Type, ?LEGACY_RAFT_RPC(Procedure, Term, SenderId, Payload) = Event, State, #raft_state{name = Name} = Data) ->
    handle_rpc_impl(Type, Event, Procedure, Term, #raft_identity{name = Name, node = SenderId}, Payload, State, Data);
handle_rpc(_, RPC, State, #raft_state{table = Table} = Data) ->
    ?RAFT_COUNT(Table, {'rpc.unrecognized', State}),
    ?SERVER_LOG_NOTICE(State, Data, "receives unknown RPC format ~0P", [RPC, 20]),
    keep_state_and_data.

-spec handle_rpc_impl(
    Type :: gen_statem:event_type(),
    Event :: rpc(),
    Key :: atom(),
    Term :: wa_raft_log:log_term(),
    Sender :: #raft_identity{},
    Payload :: undefined | tuple(),
    State :: state(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).

%% [Protocol] Undefined payload should be treated as an empty tuple
handle_rpc_impl(Type, Event, Key, Term, Sender, undefined, State, Data) ->
    handle_rpc_impl(Type, Event, Key, Term, Sender, {}, State, Data);
%% [General Rules] Discard any incoming RPCs with a term older than the current term
handle_rpc_impl(_, _, Key, Term, Sender, _, State, #raft_state{current_term = CurrentTerm} = Data) when Term < CurrentTerm ->
    ?SERVER_LOG_NOTICE(State, Data, "dropping stale ~0p from ~0p with old term ~0p.", [Key, Sender, Term]),
    State =/= disabled andalso send_rpc(Sender, ?NOTIFY_TERM(), Data),
    keep_state_and_data;
%% [PreVote RPC] Pre-vote requests are processed without checking term
handle_rpc_impl(Type, _, ?REQUEST_PRE_VOTE, _, Sender, Payload, State, Data) ->
    handle_rpc_normalization(Type, ?REQUEST_PRE_VOTE, Sender, Payload, State, Data);
%% [RequestVote RPC] RAFT servers should ignore vote requests with reason `normal`
%%                   if it knows about a currently active leader even if the vote
%%                   request has a newer term. A leader is only active if it is
%%                   replicating to peers so we check if we have recently received
%%                   a heartbeat. (4.2.3)
handle_rpc_impl(Type, Event, ?REQUEST_VOTE, Term, Sender, Payload, State, #raft_state{table = Table} = Data) when is_tuple(Payload), element(1, Payload) =:= normal ->
    case is_leader_missing(Data) of
        {true, _, _} ->
            % We have not gotten a heartbeat from the leader recently so allow this vote request
            % to go through by reraising it with the special 'allowed' election type.
            handle_rpc_impl(Type, Event, ?REQUEST_VOTE, Term, Sender, setelement(1, Payload, allowed), State, Data);
        {false, Delay, AllowedDelay} ->
            % We have gotten a heartbeat recently so drop this vote request.
            % Log this at debug level because we may end up with a lot of these when we have
            % removed a server from the cluster but not yet shut it down.
            ?RAFT_COUNT(Table, 'server.request_vote.drop'),
            ?SERVER_LOG_DEBUG(
                State,
                Data,
                "rejecting normal vote request from ~p because leader was still active ~p ms ago (allowed ~p ms).",
                [Sender, Delay, AllowedDelay]
            ),
            send_rpc(Sender, Term, ?VOTE(false), Data),
            keep_state_and_data
    end;
%% [General Rules] Advance to the newer term and reset state when seeing a newer term in an incoming RPC
handle_rpc_impl(Type, Event, _, Term, _, _, _, #raft_state{current_term = CurrentTerm}) when Term > CurrentTerm ->
    {keep_state_and_data, [{next_event, internal, ?ADVANCE_TERM(Term)}, {next_event, Type, Event}]};
%% [NotifyTerm RPC] Drop NotifyTerm RPCs with matching term
handle_rpc_impl(_, _, ?NOTIFY_TERM, _, _, _, _, #raft_state{}) ->
    keep_state_and_data;
%% [Protocol] Continue with normalization
handle_rpc_impl(Type, _, Key, _, Sender, Payload, State, Data) when is_tuple(Payload) ->
    handle_rpc_normalization(Type, Key, Sender, Payload, State, Data).

-spec handle_rpc_normalization(
    Type :: gen_statem:event_type(),
    Key :: atom(),
    Sender :: #raft_identity{},
    Payload :: undefined | tuple(),
    State :: state(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).

%% [Protocol] Convert any valid remote procedure call to the appropriate local procedure call.
handle_rpc_normalization(Type, Key, Sender, Payload, State, #raft_state{table = Table} = Data) when is_tuple(Payload) ->
    case protocol() of
        #{Key := ?PROCEDURE(Procedure, Defaults)} ->
            handle_procedure(Type, ?REMOTE(Sender, ?PROCEDURE(Procedure, defaultize_payload(Defaults, Payload))), State, Data);
        #{} ->
            ?RAFT_COUNT(Table, {'rpc.unknown', State}),
            ?SERVER_LOG_DEBUG(State, Data, "receives unknown RPC type ~0p with payload ~0P", [Key, Payload, 25]),
            keep_state_and_data
    end.

-spec handle_procedure(
    Type :: gen_statem:event_type(),
    ProcedureCall :: remote(procedure()),
    State :: state(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).

%% [AppendEntries RPC] If we haven't discovered leader for this term, record it
handle_procedure(Type, ?REMOTE(Sender, ?APPEND_ENTRIES(_, _, _, _, _)) = Procedure, State, #raft_state{leader = undefined} = Data) ->
    ?SERVER_LOG_NOTICE(State, Data, "leader is now ~0p.", [identity_to_server(Sender)]),
    NewData = Data#raft_state{leader = Sender},
    update_current_term_info(State, NewData),
    {keep_state, NewData, {next_event, Type, Procedure}};
%% [Handover][Handover RPC] If we haven't discovered leader for this term, record it
handle_procedure(Type, ?REMOTE(Sender, ?HANDOVER(_, _, _, _)) = Procedure, State, #raft_state{leader = undefined} = Data) ->
    ?SERVER_LOG_NOTICE(State, Data, "leader is now ~0p.", [identity_to_server(Sender)]),
    NewData = Data#raft_state{leader = Sender},
    update_current_term_info(State, NewData),
    {keep_state, NewData, {next_event, Type, Procedure}};
handle_procedure(Type, Procedure, _, #raft_state{}) ->
    {keep_state_and_data, {next_event, Type, Procedure}}.

-spec defaultize_payload(Defaults :: tuple(), Payload :: tuple()) -> tuple().
defaultize_payload(Defaults, Payload) ->
    defaultize_payload(Defaults, Payload, tuple_size(Defaults), tuple_size(Payload)).

-spec defaultize_payload(tuple(), tuple(), non_neg_integer(), non_neg_integer()) -> tuple().
defaultize_payload(_, Payload, N, N) ->
    Payload;
defaultize_payload(Defaults, Payload, N, M) when N > M ->
    defaultize_payload(Defaults, erlang:insert_element(M + 1, Payload, element(M + 1, Defaults)), N, M + 1);
defaultize_payload(Defaults, Payload, N, M) when N < M ->
    defaultize_payload(Defaults, erlang:delete_element(M, Payload), N, M - 1).

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Stalled State
%%------------------------------------------------------------------------------
%% The stalled state is an extension to the RAFT protocol designed to handle
%% situations in which a replica of the FSM is lost or replaced within a RAFT
%% cluster without being removed from the cluster membership. As the replica of
%% the FSM stored before the machine was lost or replaced could have been a
%% critical member of a quorum, it is important to ensure that the replacement
%% does not support a different result for any quorums before it receives a
%% fresh copy of the FSM state and log that is guaranteed to reflect any
%% quorums that the machine supported before it was lost or replaced.
%%
%% This is achieved by preventing a stalled node from participating in quorum
%% for both log entries and election. A leader of the cluster must provide a
%% fresh copy of its FSM state before the stalled node can return to normal
%% operation.
%%------------------------------------------------------------------------------

-spec stalled
    (
        Type :: enter,
        LastState :: state(),
        Data :: #raft_state{}
    ) -> gen_statem:state_enter_result(state(), #raft_state{});
    (
        Type :: gen_statem:event_type(),
        Event :: event(),
        Data :: #raft_state{}
    ) -> gen_statem:event_handler_result(state(), #raft_state{}).

stalled(enter, LastState, #raft_state{table = Table} = State) ->
    ?RAFT_COUNT(Table, 'stalled.enter'),

    ?FUNCTION_NAME =/= LastState andalso
        ?SERVER_LOG_NOTICE(State, "becomes stalled from state ~0p.", [LastState]),
    {keep_state, enter_state(?FUNCTION_NAME, State)};

%% [Internal] Advance to newer term when requested
stalled(internal, ?ADVANCE_TERM(NewTerm), #raft_state{table = Table, current_term = CurrentTerm} = State) when NewTerm > CurrentTerm ->
    ?RAFT_COUNT(Table, 'stalled.advance_term'),
    ?SERVER_LOG_NOTICE(State, "advancing to new term ~0p.", [NewTerm]),
    {repeat_state, advance_term(?FUNCTION_NAME, NewTerm, undefined, State)};

%% [Protocol] Parse any RPCs in network formats
stalled(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, Event) =:= rpc ->
    handle_rpc(Type, Event, ?FUNCTION_NAME, State);

%% [AppendEntries RPC] Stalled nodes always discard heartbeats
stalled(_, ?REMOTE(Sender, ?APPEND_ENTRIES(PrevLogIndex, _, _, _, _)), #raft_state{} = State) ->
    NewState = State#raft_state{last_quorum_ts = erlang:monotonic_time(millisecond)},
    send_rpc(Sender, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, false, 0, 0), NewState),
    {keep_state, NewState};

stalled({call, From}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{} = State) ->
    ?SERVER_LOG_WARNING(State, "cannot start an election.", []),
    {keep_state_and_data, {reply, From, {error, invalid_state}}};

stalled({call, From}, ?PROMOTE_COMMAND(_, _), #raft_state{} = State) ->
    ?SERVER_LOG_WARNING(State, "cannot be promoted to leader.", []),
    {keep_state_and_data, {reply, From, {error, invalid_state}}};

stalled(
    {call, From},
    ?BOOTSTRAP_COMMAND(#raft_log_pos{index = Index, term = Term} = Position, Config, Data),
    #raft_state{
        self = Self,
        partition_path = PartitionPath,
        storage = Storage,
        current_term = CurrentTerm,
        last_applied = LastApplied
    } = State0
) ->
    case LastApplied =:= 0 of
        true ->
            ?SERVER_LOG_NOTICE(State0, "attempting bootstrap at ~0p:~0p with config ~0p and data ~0P.", [Index, Term, Config, Data, 30]),
            Path = filename:join(PartitionPath, io_lib:format("snapshot.~0p.~0p.bootstrap.tmp", [Index, Term])),
            try
                ok = wa_raft_storage:make_empty_snapshot(Storage, Path, Position, Config, Data),
                State1 = open_snapshot(Path, Position, State0),
                AdjustedTerm = max(1, Term),
                case AdjustedTerm > CurrentTerm of
                    true ->
                        case is_single_member(Self, config(State1)) of
                            true ->
                                State2 = advance_term(?FUNCTION_NAME, AdjustedTerm, node(), State1),
                                ?SERVER_LOG_NOTICE(State2, "switching to leader as sole member after successful bootstrap.", []),
                                {next_state, leader, State2, {reply, From, ok}};
                            false ->
                                State2 = advance_term(?FUNCTION_NAME, AdjustedTerm, undefined, State1),
                                ?SERVER_LOG_NOTICE(State2, "switching to follower after successful bootstrap.", []),
                                {next_state, follower_or_witness(State2), State2, {reply, From, ok}}
                        end;
                    false ->
                        ?SERVER_LOG_NOTICE(State1, "switching to follower after successful bootstrap.", []),
                        {next_state, follower_or_witness(State1), State1, {reply, From, ok}}
                end
            catch
                _:Reason ->
                    ?SERVER_LOG_WARNING(State0, "failed to bootstrap due to ~0P.", [Reason, 20]),
                    {keep_state_and_data, {reply, From, {error, Reason}}}
            after
                try file:del_dir_r(Path)
                catch _:_ -> ok
                end
            end;
        false ->
            ?SERVER_LOG_NOTICE(State0, "at ~0p rejecting request to bootstrap with data.", [LastApplied]),
            {keep_state_and_data, {reply, From, {error, rejected}}}
    end;

stalled(
    Type,
    ?SNAPSHOT_AVAILABLE_COMMAND(Root, #raft_log_pos{index = SnapshotIndex, term = SnapshotTerm} = SnapshotPos),
    #raft_state{
        current_term = CurrentTerm,
        last_applied = LastApplied
    } = State0
) ->
    case SnapshotIndex > LastApplied orelse LastApplied =:= 0 of
        true ->
            try
                ?SERVER_LOG_NOTICE(State0, "applying snapshot at ~0p:~0p.", [SnapshotIndex, SnapshotTerm]),
                State1 = open_snapshot(Root, SnapshotPos, State0),
                State2 = case SnapshotTerm > CurrentTerm of
                    true -> advance_term(?FUNCTION_NAME, SnapshotTerm, undefined, State1);
                    false -> State1
                end,
                % At this point, we assume that we received some cluster membership configuration from
                % our peer so it is safe to transition to an operational state.
                reply(Type, ok),
                {next_state, follower_or_witness(State2), State2}
            catch
                _:Reason ->
                    ?SERVER_LOG_WARNING(State0, "failed to load available snapshot ~0p due to ~0P", [Root, Reason, 20]),
                    reply(Type, {error, Reason}),
                    keep_state_and_data
            end;
        false ->
            ?SERVER_LOG_NOTICE(State0, "at ~0p ignoring old snapshot at ~0p:~0p", [LastApplied, SnapshotIndex, SnapshotTerm]),
            reply(Type, {error, rejected}),
            keep_state_and_data
    end;

%% [Command] Defer to common handling for generic RAFT server commands
stalled(Type, ?RAFT_COMMAND(_, _) = Event, #raft_state{} = State) ->
    command(?FUNCTION_NAME, Type, Event, State);

%% [Async Log Append]
%% A stale completion can arrive after a leader stepped down. Since the client
%% request is cancelled, rollback any local append bytes so they cannot replay
%% as an unacknowledged write after restart.
stalled(info, {async_log_append_complete, Ref, Result}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_after_leadership_loss(?FUNCTION_NAME, Ref, Result, State0),
    {keep_state, State1};

stalled(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, State0),
    {keep_state, State1};

%% [Fallback] Report unhandled events
stalled(Type, Event, #raft_state{} = State) ->
    ?SERVER_LOG_WARNING(State, "did not know how to handle ~0p event ~0P", [Type, Event, 20]),
    keep_state_and_data.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Leader State
%%------------------------------------------------------------------------------
%% In a RAFT cluster, the leader of a RAFT term is a replica that has received
%% a quorum of votes from the cluster in that RAFT term, establishing it as the
%% unique coordinator for that RAFT term. The leader is responsible for
%% accepting and replicating new log entries to progress the state of the FSM.
%%------------------------------------------------------------------------------

-spec leader
    (
        Type :: enter,
        LastState :: state(),
        Data :: #raft_state{}
    ) -> gen_statem:state_enter_result(state(), #raft_state{});
    (
        Type :: gen_statem:event_type(),
        Event :: event(),
        Data :: #raft_state{}
    ) -> gen_statem:event_handler_result(state(), #raft_state{}).

leader(enter, LastState, #raft_state{table = Table, log_view = View} = State0) ->
    ?RAFT_COUNT(Table, 'leader.enter'),
    ?FUNCTION_NAME =/= LastState andalso
        ?SERVER_LOG_NOTICE(State0, "becomes leader from state ~0p.", [LastState]),

    % Setup leader state
    State1 = enter_state(?FUNCTION_NAME, State0),

    % Attempt to refresh the label state as necessary for new log entries
    LastLogIndex = wa_raft_log:last_index(View),
    State2 = case wa_raft_log:get(View, LastLogIndex) of
        {ok, {_, {_, LastLabel, _}}} ->
            State1#raft_state{last_label = LastLabel};
        {ok, {_, undefined}} ->
            % The RAFT log could have been reset (i.e. after snapshot installation).
            % In such case load the log label state from storage.
            LastLabel = load_label_state(State1),
            State1#raft_state{last_label = LastLabel};
        {ok, _} ->
            State1#raft_state{last_label = undefined}
    end,

    % At the start of a new term, the leader should append at least one log
    % entry to start the process of establishing the first quorum in the new
    % term.
    State3 = State2#raft_state{first_current_term_log_index = LastLogIndex + 1},
    State4 = case has_pending_commits(State3) of
        true ->
            % If there are pending commits, then use those for the initial
            % heartbeat. There is potential risk that the log may reject
            % these initial commits; leaving the leader with no log entry
            % in the current term; however, this should be a rare occurrence.
            State3;
        false ->
            % Otherwise, we should explicitly add a `noop` so that there is
            % something to establish quorum on.
            {LogEntry, NewState3} = make_log_entry({make_ref(), noop}, State3),
            {ok, NewView} = wa_raft_log:append(View, [LogEntry]),
            NewState3#raft_state{log_view = NewView}
    end,

    % Perform initial heartbeat and log entry resolution
    State5 = append_entries_to_followers(State4),
    State6 = update_quorum_ts(State5),
    State7 = apply_single_node_cluster(State6),
    {keep_state, State7, ?HEARTBEAT_TIMEOUT(State7)};

%% [Internal] Advance to newer term when requested
leader(
    internal,
    ?ADVANCE_TERM(NewTerm),
    #raft_state{table = Table, current_term = CurrentTerm} = State0
) when NewTerm > CurrentTerm ->
    ?RAFT_COUNT(Table, 'leader.advance_term'),
    ?SERVER_LOG_NOTICE(State0, "advancing to new term ~0p.", [NewTerm]),
    State1 = advance_term(?FUNCTION_NAME, NewTerm, undefined, State0),
    {next_state, follower_or_witness(State1), State1};

%% [Protocol] Parse any RPCs in network formats
leader(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, Event) =:= rpc ->
    handle_rpc(Type, Event, ?FUNCTION_NAME, State);

%% [AppendEntries RPC] Leaders should not act upon any incoming heartbeats (5.1, 5.2)
leader(_, ?REMOTE(_, ?APPEND_ENTRIES(_, _, _, _, _)), #raft_state{}) ->
    keep_state_and_data;

%% [Leader] Handle AppendEntries RPC responses (5.2, 5.3, 7).
%% Handle normal-case successes
leader(
    cast,
    ?REMOTE(
        ?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender,
        ?APPEND_ENTRIES_RESPONSE(_, true, FollowerMatchIndex, FollowerLastAppliedIndex)
    ),
    #raft_state{
        table = Table,
        commit_index = CommitIndex,
        next_indices = NextIndices,
        match_indices = MatchIndices,
        last_applied_indices = LastAppliedIndices,
        first_current_term_log_index = TermStartIndex
    } = State0
) ->
    StartT = erlang:monotonic_time(microsecond),
    ?SERVER_LOG_DEBUG(State0, "at commit index ~0p completed append to ~0p whose log now matches up to ~0p.",
        [CommitIndex, Sender, FollowerMatchIndex]),
    State1 = update_heartbeat_reply_ts(FollowerId, State0),

    NextIndex = maps:get(FollowerId, NextIndices, TermStartIndex),
    NewMatchIndices = MatchIndices#{FollowerId => FollowerMatchIndex},
    NewNextIndices = NextIndices#{FollowerId => max(NextIndex, FollowerMatchIndex + 1)},
    NewLastAppliedIndices = case FollowerLastAppliedIndex of
        undefined -> LastAppliedIndices;
        _ -> LastAppliedIndices#{FollowerId => FollowerLastAppliedIndex}
    end,

    State2 = State1#raft_state{
        next_indices = NewNextIndices,
        match_indices = NewMatchIndices,
        last_applied_indices = NewLastAppliedIndices
    },
    State3 = leader_advance_commit_index(State2),
    State4 = leader_apply_log(State3),
    ?RAFT_GATHER(Table, 'leader.apply.func', erlang:monotonic_time(microsecond) - StartT),
    {keep_state, maybe_heartbeat(State4), ?HEARTBEAT_TIMEOUT(State4)};

%% and failures.
leader(
    cast,
    ?REMOTE(
        ?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender,
        ?APPEND_ENTRIES_RESPONSE(_, false, FollowerEndIndex, FollowerLastAppliedIndex)
    ),
    #raft_state{
        table = Table,
        commit_index = CommitIndex,
        next_indices = NextIndices,
        last_applied_indices = LastAppliedIndices
    } = State0
) ->
    ?RAFT_COUNT(Table, 'leader.append.failure'),
    ?SERVER_LOG_DEBUG(State0, "at commit index ~0p failed append to ~0p whose log now ends at ~0p.",
        [CommitIndex, Sender, FollowerEndIndex]),
    State1 = update_heartbeat_reply_ts(FollowerId, State0),

    % Check to see if we should request a snapshot due to this failure
    leader_maybe_request_snapshot(FollowerId, FollowerEndIndex, FollowerLastAppliedIndex, State1),

    % We must trust the follower's last log index here because the follower may have
    % applied a snapshot since the last successful heartbeat. In such case, we need
    % to fast-forward the follower's next index so that we resume replication at the
    % point after the snapshot.
    NewNextIndices = NextIndices#{FollowerId => FollowerEndIndex + 1},
    NewLastAppliedIndices = case FollowerLastAppliedIndex of
        undefined -> LastAppliedIndices;
        _ -> LastAppliedIndices#{FollowerId => FollowerLastAppliedIndex}
    end,

    State2 = State1#raft_state{
        next_indices = NewNextIndices,
        last_applied_indices = NewLastAppliedIndices
    },
    State3 = leader_apply_log(State2),
    {keep_state, maybe_heartbeat(State3), ?HEARTBEAT_TIMEOUT(State3)};

%% [RequestPreVote RPC] Respond to pre-vote
leader(_, ?REMOTE(Candidate, ?REQUEST_PRE_VOTE(Ref)), #raft_state{} = Data) ->
    handle_request_pre_vote(Candidate, Ref, Data);

%% [PreVote RPC] Pre-votes are only useful to candidates
leader(_, ?REMOTE(_, ?PRE_VOTE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;

%% [RequestVote RPC] Reject any vote requests as leadership is already established (5.1, 5.2)
leader(_, ?REMOTE(Sender, ?REQUEST_VOTE(_, _, _)), #raft_state{} = Data) ->
    send_rpc(Sender, ?VOTE(false), Data),
    keep_state_and_data;

%% [Vote RPC] Votes are only useful to candidates
leader(_, ?REMOTE(_, ?VOTE(_)), #raft_state{}) ->
    keep_state_and_data;

%% [Handover][Handover RPC] We are already leader so ignore any handover requests.
leader(_, ?REMOTE(Sender, ?HANDOVER(Reference, _, _, _)), #raft_state{} = State) ->
    send_rpc(Sender, ?HANDOVER_FAILED(Reference), State),
    keep_state_and_data;

%% [Handover][HandoverFailed RPC] Our handover failed, so clear the handover status.
leader(
    _,
    ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, NodeId) = Sender, ?HANDOVER_FAILED(Reference)),
    #raft_state{
        handover = {NodeId, Reference, _}
    } = State
) ->
    ?SERVER_LOG_NOTICE(State, "resuming normal operations after failed handover to ~0p.", [Sender]),
    {keep_state, State#raft_state{handover = undefined}};

%% [Handover][HandoverFailed RPC] Got a handover failed with an unknown ID. Ignore.
leader(_, ?REMOTE(_, ?HANDOVER_FAILED(_)), #raft_state{}) ->
    keep_state_and_data;

%% [Timeout] Suspend periodic heartbeat to followers while handover is active
leader(state_timeout = Type, Event, #raft_state{handover = {Peer, _, Timeout}} = State) ->
    NowMillis = erlang:monotonic_time(millisecond),
    case NowMillis > Timeout of
        true ->
            ?SERVER_LOG_NOTICE(State, "handover to ~0p times out.", [Peer]),
            {keep_state, State#raft_state{handover = undefined}, {next_event, Type, Event}};
        false ->
            update_status(?FUNCTION_NAME, State),
            {keep_state_and_data, ?HEARTBEAT_TIMEOUT(State)}
    end;

%% [Timeout] Periodic heartbeat to followers
leader(state_timeout, _, #raft_state{table = Table} = State0) ->
    case leader_eligible(State0) of
        eligible ->
            State1 = append_entries_to_followers(State0),
            State2 = apply_single_node_cluster(State1),
            update_status(?FUNCTION_NAME, State2),
            {keep_state, State2, ?HEARTBEAT_TIMEOUT(State2)};
        Reason ->
            ?RAFT_COUNT(Table, {'leader.resign', Reason}),
            ?SERVER_LOG_NOTICE(State0, "resigns from leadership because this node is ~0p.", [Reason]),
            {next_state, follower_or_witness(State0), State0}
    end;

%% [Commit]
%%   If a handover is in progress, reject the commit immediately with a
%%   notify_redirect error so the client can redirect to the new leader.
%%   Otherwise, add the commit to the pending list and append if enough
%%   have accumulated.
leader(
    cast,
    ?COMMIT_COMMAND(From, _Op, Priority),
    #raft_state{
        table = Table,
        queues = Queues,
        handover = {Peer, _, _}
    } = _State
) ->
    ?RAFT_COUNT(Table, 'commit.rejected.handover'),
    wa_raft_queue:commit_cancelled(Queues, From, {error, {notify_redirect, Peer}}, Priority),
    keep_state_and_data;
leader(
    cast,
    ?COMMIT_COMMAND(From, Op, Priority),
    #raft_state{application = App, table = Table} = State0
) ->
    % No size limit is imposed here as the pending queue cannot grow larger
    % than the limit on the number of pending commits.
    ?RAFT_COUNT(Table, {'commit', Priority}),
    HadPending = has_pending_commits(State0),
    State1 = add_pending(From, Op, Priority, State0),
    % Single-member leaders can apply their own log immediately, but doing it
    % before this batching gate defeats raft_commit_batch_interval_ms and turns
    % every standalone durable write into its own fsync. Keep the command
    % pending until the same batch window/max-count policy used by multi-member
    % leaders decides to flush. Arm the window only for the first pending
    % commit. Re-arming it on every commit lets continuous sub-max traffic
    % postpone the append until the stream goes idle.
    PendingCount = pending_count(State1),
    case ?RAFT_COMMIT_BATCH_INTERVAL(App, Table) > 0 andalso PendingCount =< ?RAFT_COMMIT_BATCH_MAX_ENTRIES(App, Table) of
        true ->
            ?RAFT_COUNT(Table, 'commit.batch.delay'),
            case HadPending of
                true ->
                    {keep_state, State1};
                false ->
                    {keep_state, State1, ?COMMIT_BATCH_TIMEOUT(State1)}
            end;
        false ->
            State2 = append_entries_to_followers(State1),
            State3 = apply_single_node_cluster(State2),
            {keep_state, State3, ?HEARTBEAT_TIMEOUT(State3)}
    end;

%% [Strong Read]
%%   If a handover is in progress, reject the read immediately with a
%%   notify_redirect error so the client can redirect to the new leader.
leader(
    cast,
    ?READ_COMMAND({From, _}),
    #raft_state{
        table = Table,
        queues = Queues,
        handover = {Peer, _, _}
    } = _State
) ->
    ?RAFT_COUNT(Table, 'read.rejected.handover'),
    wa_raft_queue:fulfill_incomplete_read(Queues, From, {error, {notify_redirect, Peer}}),
    keep_state_and_data;
leader(
    cast,
    ?READ_COMMAND({From, Command}),
    #raft_state{
        self = Self,
        queues = Queues,
        storage = Storage,
        commit_index = CommitIndex,
        last_applied = LastApplied,
        pending_high = PendingHigh,
        pending_low = PendingLow,
        first_current_term_log_index = FirstLogIndex
    } = State0
) ->
    ReadIndex = max(CommitIndex, FirstLogIndex),
    case is_single_member(Self, config(State0)) of
        % If we are a single node cluster and we are fully-applied, then immediately dispatch.
        true when PendingHigh =:= [], PendingLow =:= [], ReadIndex =< LastApplied ->
            wa_raft_storage:apply_read(Storage, From, Command),
            {keep_state, State0};
        _ ->
            ok = wa_raft_queue:submit_read(Queues, ReadIndex, From, Command),
            % Regardless of whether or not the read index is an existing log entry, indicate that
            % a read is pending as the leader must establish a new quorum to be able to serve the
            % read request.
            {keep_state, State0#raft_state{pending_read = true}}
    end;

%% [Resign] Leader resigns by switching to follower state.
leader({call, From}, ?RESIGN_COMMAND, #raft_state{} = State) ->
    ?SERVER_LOG_NOTICE(State, "resigns.", []),
    {next_state, follower_or_witness(State), State, {reply, From, ok}};

%% [Adjust Membership] Leader attempts to commit a single-node membership change.
leader(Type, ?ADJUST_CONFIG_COMMAND(From, Action, Index), #raft_state{queues = Queues} = State0) ->
    maybe
        ok ?= leader_config_change_allowed(Index, State0),
        {ok, NewConfig} ?= leader_adjust_config(Action, State0),
        % With all checks completed, we can now attempt to append the new
        % configuration to the log. If successful, a round of heartbeats is
        % immediately started to replicate the change as soon as possible.
        {ok, #raft_log_pos{index = NewConfigIndex} = NewConfigPosition, State1} ?=
            leader_change_config(NewConfig, From, State0),

        ?SERVER_LOG_NOTICE(
            State1,
            "is attempting to change configuration from ~0p to ~0p at ~0p.",
            [config(State0), NewConfig, NewConfigIndex]
        ),
        State2 = apply_single_node_cluster(State1),
        State3 = append_entries_to_followers(State2),
        reply(Type, {ok, NewConfigPosition}),
        {keep_state, State3, ?HEARTBEAT_TIMEOUT(State3)}
    else
        {error, Reason} ->
            ?SERVER_LOG_NOTICE(
                State0,
                "failed to apply action ~0p to current configuration ~0p due to ~0P.",
                [Action, config(State0), Reason, 20]
            ),
            From =/= undefined andalso
                wa_raft_queue:commit_completed(Queues, From, {error, Reason}, high),
            reply(Type, {error, Reason}),
            keep_state_and_data
    end;

%% [Handover Candidates] Return list of handover candidates (peers that are not lagging too much)
leader({call, From}, ?HANDOVER_CANDIDATES_COMMAND, #raft_state{} = State) ->
    {keep_state_and_data, {reply, From, {ok, get_handover_candidates(State)}}};

%% [Is Peer Ready] Check if participant is caught up
leader({call, From}, ?IS_PEER_READY_COMMAND(Peer), #raft_state{} = State) ->
    Config = config(State),
    IsParticipant = lists:member(Peer, config_participants(Config)),
    IsReady = is_eligible_for_handover(Peer, State),
    Result =
        if
            not IsParticipant -> {error, not_a_participant};
            not IsReady -> {error, not_ready};
            true -> ok
        end,
    {keep_state_and_data, {reply, From, Result}};

%% [Handover] With peer 'undefined' randomly select a valid candidate to handover to
leader(Type, ?HANDOVER_COMMAND(undefined), #raft_state{} = State) ->
    case get_handover_candidates(State) of
        [] ->
            ?SERVER_LOG_NOTICE(State, "has no valid peer to handover to.", []),
            reply(Type, {error, no_valid_peer}),
            keep_state_and_data;
        Candidates ->
            Peer = lists:nth(rand:uniform(length(Candidates)), Candidates),
            leader(Type, ?HANDOVER_COMMAND(Peer), State)
    end;

%% [Handover] Handover to self results in no-op
leader(Type, ?HANDOVER_COMMAND(Peer), #raft_state{} = State) when Peer =:= node() ->
    ?SERVER_LOG_WARNING(State, "dropping handover to self.", []),
    reply(Type, {ok, Peer}),
    {keep_state, State};

%% [Handover] Attempt to start a handover to the specified peer
leader(
    Type,
    ?HANDOVER_COMMAND(Peer),
    #raft_state{
        application = App,
        table = Table,
        name = Name,
        log_view = View,
        match_indices = MatchIndices,
        handover = undefined
    } = State0
) ->
    % TODO T246543673 For the time being, assume that all members of the
    %                 cluster use the same server name.
    case is_member({Name, Peer}, config(State0)) of
        false ->
            ?SERVER_LOG_WARNING(State0, "dropping handover to unknown peer ~0p.", [Peer]),
            reply(Type, {error, invalid_peer}),
            keep_state_and_data;
        true ->
            PeerMatchIndex = maps:get(Peer, MatchIndices, 0),
            FirstIndex = wa_raft_log:first_index(View),
            PeerSendIndex = max(PeerMatchIndex + 1, FirstIndex + 1),
            LastIndex = wa_raft_log:last_index(View),
            MaxHandoverBatchSize = ?RAFT_HANDOVER_MAX_ENTRIES(App, Table),
            MaxHandoverBytes = ?RAFT_HANDOVER_MAX_BYTES(App, Table),

            case LastIndex - PeerSendIndex =< MaxHandoverBatchSize of
                true ->
                    ?RAFT_COUNT(Table, 'leader.handover'),
                    ?SERVER_LOG_NOTICE(State0, "starting handover to ~p.", [Peer]),

                    PrevLogIndex = PeerSendIndex - 1,
                    {ok, PrevLogTerm} = wa_raft_log:term(View, PrevLogIndex),
                    {ok, LogEntries} = wa_raft_log:entries(View, PeerSendIndex, MaxHandoverBatchSize, MaxHandoverBytes),

                    % The request to load the log may result in not all required log entries being loaded
                    % if we hit the byte size limit. Ensure that we have loaded all required log entries
                    % before initiating a handover.
                    case PrevLogIndex + length(LogEntries) of
                        LastIndex ->
                            Ref = make_ref(),
                            Timeout = erlang:monotonic_time(millisecond) + ?RAFT_HANDOVER_TIMEOUT(App, Table),
                            State1 = State0#raft_state{handover = {Peer, Ref, Timeout}},
                            send_rpc(?IDENTITY_REQUIRES_MIGRATION(Name, Peer), ?HANDOVER(Ref, PrevLogIndex, PrevLogTerm, LogEntries), State1),
                            reply(Type, {ok, Peer}),
                            {keep_state, State1};
                        _ ->
                            ?RAFT_COUNT(Table, 'leader.handover.oversize'),
                            ?SERVER_LOG_WARNING(State0, "handover to peer ~0p would require an oversized RPC.", [Peer]),
                            reply(Type, {error, oversize}),
                            keep_state_and_data
                    end;
                false ->
                    ?RAFT_COUNT(Table, 'leader.handover.peer_lagging'),
                    ?SERVER_LOG_WARNING(State0, "determines that peer ~0p is not eligible for handover because it is ~0p entries behind.",
                        [Peer, LastIndex - PeerSendIndex]),
                    reply(Type, {error, peer_lagging}),
                    keep_state_and_data
            end
    end;

%% [Handover] Reject starting a handover when a handover is still in progress
leader({call, From}, ?HANDOVER_COMMAND(Peer), #raft_state{handover = {Node, _, _}} = State) ->
    ?SERVER_LOG_WARNING(State, "rejecting duplicate handover to ~0p with running handover to ~0p.", [Peer, Node]),
    {keep_state_and_data, {reply, From, {error, duplicate}}};

%% [Command] Defer to common handling for generic RAFT server commands
leader(Type, ?RAFT_COMMAND(_, _) = Event, #raft_state{} = State) ->
    command(?FUNCTION_NAME, Type, Event, State);

%% [Async Log Append]
%% A feature-flagged single-member fast path writes the local durable log on a
%% background process. Completion is the only point where log_view advances and
%% entries may become visible/applied.
leader(info, {async_log_append_complete, Ref, Result}, #raft_state{} = State0) ->
    State1 = complete_async_log_append(Ref, Result, State0),
    State2 = append_entries_to_followers(State1),
    State3 = apply_single_node_cluster(State2),
    State4 = update_quorum_ts(State3),
    update_status(?FUNCTION_NAME, State4),
    {keep_state, State4, ?HEARTBEAT_TIMEOUT(State4)};

leader(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, State0),
    State2 = append_entries_to_followers(State1),
    update_status(?FUNCTION_NAME, State2),
    {keep_state, State2, ?HEARTBEAT_TIMEOUT(State2)};

%% [Fallback] Report unhandled events
leader(Type, Event, #raft_state{} = State) ->
    ?SERVER_LOG_WARNING(State, "did not know how to handle ~0p event ~0P", [Type, Event, 20]),
    keep_state_and_data.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Follower State
%%------------------------------------------------------------------------------
%% In a RAFT cluster, a follower is a replica that is receiving replicated log
%% entries from the leader of a RAFT term. The follower participates in quorum
%% decisions about log entries received from the leader by appending those log
%% entries to its own local copy of the RAFT log.
%%------------------------------------------------------------------------------

-spec follower
    (
        Type :: enter,
        LastState :: state(),
        Data :: #raft_state{}
    ) -> gen_statem:state_enter_result(state(), #raft_state{});
    (
        Type :: gen_statem:event_type(),
        Event :: event(),
        Data :: #raft_state{}
    ) -> gen_statem:event_handler_result(state(), #raft_state{}).

follower(enter, LastState, #raft_state{table = Table} = State) ->
    ?RAFT_COUNT(Table, 'follower.enter'),
    ?FUNCTION_NAME =/= LastState andalso
        ?SERVER_LOG_NOTICE(State, "becomes follower from state ~0p.", [LastState]),
    {keep_state, enter_state(?FUNCTION_NAME, State), ?ELECTION_TIMEOUT(State)};

%% [Internal] Advance to newer term when requested
follower(internal, ?ADVANCE_TERM(NewTerm), #raft_state{table = Table, current_term = CurrentTerm} = State) when NewTerm > CurrentTerm ->
    ?RAFT_COUNT(Table, 'follower.advance_term'),
    ?SERVER_LOG_NOTICE(State, "advancing to new term ~0p.", [NewTerm]),
    {repeat_state, advance_term(?FUNCTION_NAME, NewTerm, undefined, State)};

%% [Protocol] Parse any RPCs in network formats
follower(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, Event) =:= rpc ->
    handle_rpc(Type, Event, ?FUNCTION_NAME, State);

%% [AppendEntries RPC] Handle incoming heartbeats (5.2, 5.3)
follower(
    Type,
    ?REMOTE(Leader, ?APPEND_ENTRIES(PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex)),
    #raft_state{} = State
) ->
    handle_heartbeat(?FUNCTION_NAME, Type, Leader, PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex, State);

%% [AppendEntriesResponse RPC] Followers should not act upon any incoming heartbeat responses (5.2)
follower(_, ?REMOTE(_, ?APPEND_ENTRIES_RESPONSE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;

%% [RequestPreVote RPC] Respond to pre-vote
follower(_, ?REMOTE(Candidate, ?REQUEST_PRE_VOTE(Ref)), #raft_state{} = Data) ->
    handle_request_pre_vote(Candidate, Ref, Data);

%% [PreVote RPC] Pre-votes are only useful to candidates
follower(_, ?REMOTE(_, ?PRE_VOTE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;

%% [RequestVote RPC] Handle incoming vote requests (5.2)
follower(_, ?REMOTE(Candidate, ?REQUEST_VOTE(_, CandidateIndex, CandidateTerm)), #raft_state{} = State) ->
    handle_request_vote(?FUNCTION_NAME, Candidate, CandidateIndex, CandidateTerm, State);

%% [Vote RPC] Votes are only useful to candidates
follower(_, ?REMOTE(_, ?VOTE(_)), #raft_state{}) ->
    keep_state_and_data;

%% [Handover][Handover RPC] The leader is requesting this follower to take over leadership in a new term
follower(
    _,
    ?REMOTE(
        Sender,
        ?HANDOVER(Ref, PrevLogIndex, PrevLogTerm, LogEntries)
    ),
    #raft_state{
        application = App,
        table = Table,
        leader = Sender
    } = State0
) ->
    ?RAFT_COUNT(Table, 'follower.handover'),
    ?SERVER_LOG_NOTICE(State0, "evaluating handover RPC from ~0p.", [Sender]),
    case ?RAFT_LEADER_ELIGIBLE(App) andalso ?RAFT_ELECTION_WEIGHT(App) =/= 0 of
        true ->
            case append_entries(?FUNCTION_NAME, PrevLogIndex, PrevLogTerm, LogEntries, length(LogEntries), State0) of
                {ok, true, _, State1} ->
                    case is_self_witness(State1) of
                        false ->
                            ?SERVER_LOG_NOTICE(State1, "immediately starting new election due to append success during handover RPC.", []),
                            {next_state, candidate, State1#raft_state{next_election_type = force}};
                        true ->
                            ?SERVER_LOG_NOTICE(State1, "failing handover as a witness after handover append.", []),
                            send_rpc(Sender, ?HANDOVER_FAILED(Ref), State1),
                            {next_state, witness, State1}
                    end;
                {ok, false, _, State1} ->
                    ?RAFT_COUNT(Table, 'follower.handover.rejected'),
                    ?SERVER_LOG_WARNING(State1, "failing handover request because append was rejected.", []),
                    send_rpc(Sender, ?HANDOVER_FAILED(Ref), State1),
                    {keep_state, State1};
                {fatal, Reason} ->
                    ?RAFT_COUNT(Table, 'follower.handover.fatal'),
                    ?SERVER_LOG_WARNING(State0, "failing handover request because append was fatal due to ~0P.", [Reason, 30]),
                    send_rpc(Sender, ?HANDOVER_FAILED(Ref), State0),
                    {next_state, disabled, State0#raft_state{disable_reason = Reason}}
            end;
        false ->
            ?SERVER_LOG_NOTICE(State0, "not considering handover RPC due to being inelgibile for leadership.", []),
            send_rpc(Sender, ?HANDOVER_FAILED(Ref), State0),
            {keep_state, State0}
    end;

%% [Handover][HandoverFailed RPC] Followers should not act upon any incoming failed handover
follower(_, ?REMOTE(_, ?HANDOVER_FAILED(_)), #raft_state{}) ->
    keep_state_and_data;

%% [Follower] handle timeout
%% follower doesn't receive any heartbeat. starting a new election
follower(state_timeout, _, #raft_state{
        table = Table,
        leader = Leader,
        log_view = View,
        last_quorum_ts = LastQuorumTs
    } = State
) ->
    ?RAFT_COUNT(Table, 'follower.timeout'),
    case candidate_eligible(State) of
        true ->
            WaitingMs = case LastQuorumTs of
                undefined -> undefined;
                _         -> erlang:monotonic_time(millisecond) - LastQuorumTs
            end,
            ?SERVER_LOG_NOTICE(
                State,
                "times out and starts election at ~0p after waiting for leader ~0p for ~0p ms.",
                [wa_raft_log:last_index(View), identity_to_server(Leader), WaitingMs]
            ),
            {next_state, candidate, State};
        false ->
            ?SERVER_LOG_NOTICE(State, "is not timing out due to being ineligible or having zero election weight.", []),
            {repeat_state, State}
    end;

%% [Command] Defer to common handling for generic RAFT server commands
follower(Type, ?RAFT_COMMAND(_, _) = Event, #raft_state{} = State) ->
    command(?FUNCTION_NAME, Type, Event, State);

%% [Async Log Append]
%% Completion can arrive after resign/term change. Since the client request is
%% cancelled, rollback any local append bytes before releasing it as not-leader.
follower(info, {async_log_append_complete, Ref, Result}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_after_leadership_loss(?FUNCTION_NAME, Ref, Result, State0),
    {keep_state, State1};

follower(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, State0),
    {keep_state, State1};

%% [Fallback] Report unhandled events
follower(Type, Event, #raft_state{} = State) ->
    ?SERVER_LOG_WARNING(State, "did not know how to handle ~0p event ~0P", [Type, Event, 20]),
    keep_state_and_data.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Candidate State
%%------------------------------------------------------------------------------
%% In a RAFT cluster, a candidate is a replica that is attempting to become the
%% leader of a RAFT term. It is waiting for responses from the other members of
%% the RAFT cluster to determine if it has received enough votes to assume the
%% leadership of the RAFT term.
%%------------------------------------------------------------------------------

-spec candidate
    (
        Type :: enter,
        LastState :: state(),
        Data :: #raft_state{}
    ) -> gen_statem:state_enter_result(state(), #raft_state{});
    (
        Type :: gen_statem:event_type(),
        Event :: event(),
        Data :: #raft_state{}
    ) -> gen_statem:event_handler_result(state(), #raft_state{}).

%% [Enter] Node starts a new election upon entering the candidate state.
candidate(
    enter,
    LastState,
    #raft_state{
        application = App,
        table = Table,
        next_election_type = ElectionType
    } = State0
) ->
    ?RAFT_COUNT(Table, 'leader.election_started'),
    ?RAFT_COUNT(Table, 'candidate.enter'),
    ?FUNCTION_NAME =/= LastState andalso
        ?SERVER_LOG_NOTICE(State0, "becomes candidate from state ~0p.", [LastState]),

    State1 = enter_state(?FUNCTION_NAME, State0#raft_state{next_election_type = normal}),

    case ElectionType =:= normal andalso ?RAFT_ELECTION_PRE_VOTE(App, Table) of
        true ->
            ?SERVER_LOG_NOTICE(State1, "advances to new term and starts pre-vote.", []),

            % When pre-vote is enabled, a round of pre-votes must first be issued
            % before the term can be advanced.
            Ref = make_ref(),
            State2 = State1#raft_state{
                pre_vote_ref = Ref,
                pre_votes = #{node() => true}
            },

            % Request a pre-vote from all peers.
            send_rpc_to_all_members(?REQUEST_PRE_VOTE(Ref), State2),

            {keep_state, State2, ?ELECTION_TIMEOUT(State2)};
        false ->
            % Otherwise, immediately start the election.
            candidate_start_election(ElectionType, State1)
    end;

%% [Internal] Advance to newer term when requested
candidate(
    internal,
    ?ADVANCE_TERM(NewTerm),
    #raft_state{
        table = Table,
        current_term = CurrentTerm
    } = State
) when NewTerm > CurrentTerm ->
    ?RAFT_COUNT(Table, 'candidate.advance_term'),
    ?SERVER_LOG_NOTICE(State, "advancing to new term ~0p.", [NewTerm]),
    State1 = advance_term(?FUNCTION_NAME, NewTerm, undefined, State),
    {next_state, follower_or_witness(State1), State1};

%% [Protocol] Parse any RPCs in network formats
candidate(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, Event) =:= rpc ->
    handle_rpc(Type, Event, ?FUNCTION_NAME, State);

%% [AppendEntries RPC] Switch to follower because current term now has a leader (5.2, 5.3)
candidate(Type, ?REMOTE(Sender, ?APPEND_ENTRIES(_, _, _, _, _)) = Event, #raft_state{} = State) ->
    ?SERVER_LOG_NOTICE(State, "switching to follower after receiving heartbeat from ~0p.", [Sender]),
    {next_state, follower_or_witness(State), State, {next_event, Type, Event}};

%% [RequestPreVote RPC] Respond to pre-vote
candidate(_, ?REMOTE(Candidate, ?REQUEST_PRE_VOTE(Ref)), #raft_state{} = Data) ->
    handle_request_pre_vote(Candidate, Ref, Data);

%% [PreVote RPC] Check if a full pre-vote quorum is established and if so, proceed with election (9.6)
candidate(
    _,
    ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, PeerId), ?PRE_VOTE(Ref, Vote, PeerLastIndex, PeerLastTerm)),
    #raft_state{
        table = Table,
        state_start_ts = StateStart,
        log_view = View,
        pre_vote_ref = Ref,
        pre_votes = PreVotes
    } = Data
) ->
    Now = erlang:monotonic_time(millisecond),
    LastIndex = wa_raft_log:last_index(View),
    {ok, LastTerm} = wa_raft_log:term(View, LastIndex),
    Result = Vote andalso {LastTerm, LastIndex} >= {PeerLastTerm, PeerLastIndex},
    NewPreVotes = PreVotes#{PeerId => Result},
    NewData = Data#raft_state{pre_votes = NewPreVotes},
    case find_member_majority(NewPreVotes, config(NewData)) of
        {found, Outcome} ->
            % If a majority is found, then the pre-vote round is over, either accepted or rejected.
            Duration = Now - StateStart,
            case Outcome of
                true ->
                    Support = [Peer || Peer := true <- NewPreVotes],
                    ?SERVER_LOG_NOTICE(
                        NewData,
                        "is proceeding to election after ~0p ms with pre-votes from ~0p.",
                        [Duration, Support]
                    ),
                    ?RAFT_COUNT(Table, 'candidate.pre_vote.passed'),
                    candidate_start_election(normal, NewData);
                false ->
                    Opposition = [Peer || Peer := false <- NewPreVotes],
                    ?SERVER_LOG_NOTICE(
                        NewData,
                        "has failed pre-vote after ~0p ms with opposition from ~0p.",
                        [Duration, Opposition]
                    ),
                    ?RAFT_COUNT(Table, 'candidate.pre_vote.failed'),
                    {next_state, follower_or_witness(NewData), NewData}
            end;
        none ->
            {keep_state, NewData}
    end;

%% [PreVote RPC] Ignore pre-votes from other rounds
candidate(_, ?REMOTE(_, ?PRE_VOTE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;

%% [RequestVote RPC] Candidates should reject incoming vote requests as they always vote for themselves (5.2)
candidate(_, ?REMOTE(Sender, ?REQUEST_VOTE(_, _, _)), #raft_state{} = Data) ->
    send_rpc(Sender, ?VOTE(false), Data),
    keep_state_and_data;

%% [Vote RPC] Candidate receives a vote (5.2)
candidate(
    cast,
    ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, Node), ?VOTE(Vote)),
    #raft_state{
        table = Table,
        log_view = View,
        state_start_ts = StateStart,
        votes = Votes
    } = Data0
) ->
    NewVotes = Votes#{Node => Vote},
    Data1 = Data0#raft_state{votes = NewVotes},
    Data2 = update_heartbeat_reply_ts(Node, Data1),
    case find_member_majority(NewVotes, config(Data2)) of
        {found, Outcome} ->
            % If a majority is found, then the election is over, either accepted or rejected.
            Now = erlang:monotonic_time(millisecond),
            Duration = Now - StateStart,
            LastIndex = wa_raft_log:last_index(View),
            {ok, LastTerm} = wa_raft_log:term(View, LastIndex),
            case Outcome of
                true ->
                    % If the bid for leadership is accepted, then transition to leader.
                    Support = [Peer || Peer := true <- NewVotes],
                    ?SERVER_LOG_NOTICE(
                        Data2,
                        "is becoming leader after ~0p ms with log at ~0p:~0p and votes from ~0p.",
                        [Duration, LastIndex, LastTerm, Support]
                    ),
                    ?RAFT_COUNT(Table, 'candidate.elected'),
                    ?RAFT_GATHER(Table, 'candidate.election.duration', Duration),
                    {next_state, leader, Data2};
                false ->
                    % If the bid for leadership is rejected, then transition to follower.
                    % This will reset the election timeout which will, in effect, make this
                    % peer less likely to be the first candidate of the next term.
                    Opposition = [Peer || Peer := false <- NewVotes],
                    ?SERVER_LOG_NOTICE(
                        Data2,
                        "has failed to be elected after ~0p ms with log at ~0p:~0p and opposition from ~0p.",
                        [Duration, LastIndex, LastTerm, Opposition]
                    ),
                    ?RAFT_COUNT(Table, 'candidate.rejected'),
                    ?RAFT_GATHER(Table, 'candidate.election.duration', Duration),
                    {next_state, follower_or_witness(Data2), Data2}
            end;
        none ->
            % If no majority is found, the continue waiting.
            {keep_state, Data2}
    end;

%% [Handover][Handover RPC] Switch to follower because current term now has a leader (5.2, 5.3)
candidate(Type, ?REMOTE(_, ?HANDOVER(_, _, _, _)) = Event, #raft_state{} = State) ->
    ?SERVER_LOG_NOTICE(State, "ends election to handle handover.", []),
    {next_state, follower_or_witness(State), State, {next_event, Type, Event}};

%% [Handover][HandoverFailed RPC] Candidates should not act upon any incoming failed handover
candidate(_, ?REMOTE(_, ?HANDOVER_FAILED(_)), #raft_state{}) ->
    keep_state_and_data;

%% [Candidate] Handle Election Timeout during pre-vote phase (9.6)
%% Candidate doesn't get enough pre-votes after a period of time, restart or fallback
%% to follower if the local replica is no longer eligible.
candidate(state_timeout, _, #raft_state{pre_vote_ref = PreVoteRef, pre_votes = PreVotes} = State) when PreVoteRef =/= undefined ->
    ?SERVER_LOG_NOTICE(
        State,
        "pre-vote timed out with support from ~0p and opposition from ~0p.",
        [[Node || Node := true <- PreVotes], [Node || Node := false <- PreVotes]]
    ),
    case candidate_eligible(State) of
        true -> {repeat_state, State};
        false -> {next_state, follower_or_witness(State), State}
    end;

%% [Candidate] Handle Election Timeout during normal election (5.2)
%% Candidate doesn't get enough votes after a period of time, restart election or fallback
%% to follower if the local replica is no longer eligible.
candidate(state_timeout, _, #raft_state{votes = Votes} = State) ->
    ?SERVER_LOG_NOTICE(
        State,
        "election timed out with votes from ~0p and opposition from ~0p.",
        [[Node || Node := true <- Votes], [Node || Node := false <- Votes]]
    ),
    case candidate_eligible(State) of
        true -> {repeat_state, State};
        false -> {next_state, follower_or_witness(State), State}
    end;

%% [Commit] Candidates may optimistically buffer any incoming commits pending election.
candidate(
    cast = Event,
    ?COMMIT_COMMAND(From, Op, Priority) = Command,
    #raft_state{application = App, table = Table} = State
) ->
    case ?RAFT_CANDIDATE_BUFFER_REQUESTS(App, Table) of
        true -> {keep_state, add_pending(From, Op, Priority, State)};
        false -> command(?FUNCTION_NAME, Event, Command, State)
    end;

%% [Strong Read] Candidates may optimistically submit reads to storage pending election.
%% The ReadIndex is set to one past the last log index which is the index that the
%% new term will start at if the candidate wins. This is safe because an election
%% failure will cancel all reads via cancel_pending/2.
candidate(
    cast = Event,
    ?READ_COMMAND({From, ReadOp}) = Command,
    #raft_state{application = App, table = Table, queues = Queues, log_view = View} = State
) ->
    case ?RAFT_CANDIDATE_BUFFER_REQUESTS(App, Table) of
        true ->
            ReadIndex = wa_raft_log:last_index(View) + 1,
            ok = wa_raft_queue:submit_read(Queues, ReadIndex, From, ReadOp),
            {keep_state, State#raft_state{pending_read = true}};
        false ->
            command(?FUNCTION_NAME, Event, Command, State)
    end;

%% [Command] Defer to common handling for generic RAFT server commands
candidate(Type, ?RAFT_COMMAND(_, _) = Event, #raft_state{} = State) ->
    command(?FUNCTION_NAME, Type, Event, State);

%% [Async Log Append]
%% The append may finish while this node is campaigning. Preserve disk/log-view
%% consistency, but do not report the stale leader's client request as committed.
candidate(info, {async_log_append_complete, Ref, Result}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_after_leadership_loss(?FUNCTION_NAME, Ref, Result, State0),
    {keep_state, State1};

candidate(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, State0),
    {keep_state, State1};

%% [Fallback] Report unhandled events
candidate(Type, Event, #raft_state{} = State) ->
    ?SERVER_LOG_WARNING(State, "did not know how to handle ~0p event ~0P.", [Type, Event, 20]),
    keep_state_and_data.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Disabled State
%%------------------------------------------------------------------------------
%% The disabled state is an extension to the RAFT protocol used to hold any
%% replicas of an FSM that have for some reason or another identified that some
%% deficiency or malfunction that makes them unfit to either enforce any prior
%% quorum decisions or properly participate in future quorum decisions. Common
%% reasons include the detection of corruptions or inconsistencies within the
%% FSM state or RAFT log. The reason for which the replica was disabled is kept
%% in persistent so that the replica will remain disabled even when restarted.
%%------------------------------------------------------------------------------

-spec disabled
    (
        Type :: enter,
        LastState :: state(),
        Data :: #raft_state{}
    ) -> gen_statem:state_enter_result(state(), #raft_state{});
    (
        Type :: gen_statem:event_type(),
        Event :: event(),
        Data :: #raft_state{}
    ) -> gen_statem:event_handler_result(state(), #raft_state{}).

disabled(enter, LastState, #raft_state{table = Table, disable_reason = DisableReason} = State0) ->
    ?RAFT_COUNT(Table, 'disabled.enter'),
    case LastState of
        ?FUNCTION_NAME ->
            {keep_state, enter_state(?FUNCTION_NAME, State0)};
        _ ->
            ?SERVER_LOG_NOTICE(State0, "becomes disabled from state ~0p with reason ~0p.", [LastState, DisableReason]),
            State1 = case DisableReason of
                undefined -> State0#raft_state{disable_reason = "No reason specified."};
                _         -> State0
            end,
            State2 = enter_state(?FUNCTION_NAME, State1),
            wa_raft_durable_state:store(State2),
            {keep_state, State2}
    end;

%% [Internal] Advance to newer term when requested
disabled(
    internal,
    ?ADVANCE_TERM(NewTerm),
    #raft_state{
        table = Table,
        current_term = CurrentTerm
    } = State
) when NewTerm > CurrentTerm ->
    ?RAFT_COUNT(Table, 'disabled.advance_term'),
    ?SERVER_LOG_NOTICE(State, "advancing to new term ~0p.", [NewTerm]),
    {repeat_state, advance_term(?FUNCTION_NAME, NewTerm, undefined, State)};

%% [Protocol] Parse any RPCs in network formats
disabled(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, Event) =:= rpc ->
    handle_rpc(Type, Event, ?FUNCTION_NAME, State);

%% [AppendEntries RPC] Disabled servers should not act upon any incoming heartbeats as they should
%%                     behave as if dead to the cluster
disabled(_, ?REMOTE(_, ?APPEND_ENTRIES(_, _, _, _, _)), #raft_state{}) ->
    keep_state_and_data;

%% [RequestPreVote RPC]
%% Disabled servers should not act upon any pre-vote requests as they should
%% be invisible to the rest of the cluster
disabled(_, ?REMOTE(_, ?REQUEST_PRE_VOTE(_)), #raft_state{}) ->
    keep_state_and_data;

%% [PreVote RPC] Pre-votes are only useful to candidates
disabled(_, ?REMOTE(_, ?PRE_VOTE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;

%% [RequestVote RPC] Disabled servers should reject any vote requests as they should behave
%%                   as if dead to the cluster
disabled(_, ?REMOTE(Sender, ?REQUEST_VOTE(_, _, _)), #raft_state{} = Data) ->
    send_rpc(Sender, ?VOTE(false), Data),
    keep_state_and_data;

%% [Vote RPC] Votes are only useful to candidates
disabled(_, ?REMOTE(_, ?VOTE(_)), #raft_state{}) ->
    keep_state_and_data;

disabled({call, From}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{}) ->
    {keep_state_and_data, {reply, From, {error, invalid_state}}};

disabled({call, From}, ?PROMOTE_COMMAND(_, _), #raft_state{}) ->
    {keep_state_and_data, {reply, From, {error, invalid_state}}};

disabled({call, From}, ?ENABLE_COMMAND, #raft_state{} = State0) ->
    ?SERVER_LOG_NOTICE(State0, "re-enabling by request from ~0p by moving to stalled state.", [From]),
    State1 = State0#raft_state{disable_reason = undefined},
    wa_raft_durable_state:store(State1),
    {next_state, stalled, State1, {reply, From, ok}};

%% [Command] Defer to common handling for generic RAFT server commands
disabled(Type, ?RAFT_COMMAND(_, _) = Event, #raft_state{} = State) ->
    command(?FUNCTION_NAME, Type, Event, State);

%% [Async Log Append]
%% Disabled nodes must still consume a stale async append completion so the
%% append gate cannot leak across enable/restart paths.
disabled(info, {async_log_append_complete, Ref, Result}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_after_leadership_loss(?FUNCTION_NAME, Ref, Result, State0),
    {keep_state, State1};

disabled(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, State0),
    {keep_state, State1};

%% [Fallback] Report unhandled events
disabled(Type, Event, #raft_state{} = State) ->
    ?SERVER_LOG_WARNING(State, "did not know how to handle ~0p event ~0P", [Type, Event, 20]),
    keep_state_and_data.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Witness State
%%------------------------------------------------------------------------------
%% The witness state is an extension to the RAFT protocol that identifies a
%% replica as a special "witness replica" that participates in quorum decisions
%% but does not retain a full copy of the actual underlying FSM. These replicas
%% can use significantly fewer system resources to operate however it is not
%% recommended for more than 25% of the replicas in a RAFT cluster to be
%% witness replicas as having more than such a number of witness replicas can
%% result in significantly reduced chance of data durability in the face of
%% unexpected replica loss.
%%------------------------------------------------------------------------------

-spec witness
    (
        Type :: enter,
        LastState :: state(),
        Data :: #raft_state{}
    ) -> gen_statem:state_enter_result(state(), #raft_state{});
    (
        Type :: gen_statem:event_type(),
        Event :: event(),
        Data :: #raft_state{}
    ) -> gen_statem:event_handler_result(state(), #raft_state{}).

witness(enter, LastState, #raft_state{table = Table} = State) ->
    ?RAFT_COUNT(Table, 'witness.enter'),
    ?FUNCTION_NAME =/= LastState andalso
        ?SERVER_LOG_NOTICE(State, "becomes witness from state ~0p.", [LastState]),
    {keep_state, enter_state(?FUNCTION_NAME, State), ?ELECTION_TIMEOUT(State)};

%% [Internal] Advance to newer term when requested
witness(
    internal,
    ?ADVANCE_TERM(NewTerm),
    #raft_state{
        table = Table,
        current_term = CurrentTerm
    } = State
) when NewTerm > CurrentTerm ->
    ?RAFT_COUNT(Table, 'witness.advance_term'),
    ?SERVER_LOG_NOTICE(State, "advancing to new term ~0p.", [NewTerm]),
    {repeat_state, advance_term(?FUNCTION_NAME, NewTerm, undefined, State)};

%% [Protocol] Parse any RPCs in network formats
witness(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, Event) =:= rpc ->
    handle_rpc(Type, Event, ?FUNCTION_NAME, State);

%% [AppendEntries RPC] Handle incoming heartbeats (5.2, 5.3)
witness(
    Type,
    ?REMOTE(Leader, ?APPEND_ENTRIES(PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex)),
    #raft_state{} = State
) ->
    handle_heartbeat(?FUNCTION_NAME, Type, Leader, PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex, State);

%% [AppendEntriesResponse RPC] Witnesses should not act upon any incoming heartbeat responses (5.2)
witness(_, ?REMOTE(_, ?APPEND_ENTRIES_RESPONSE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;

%% [Handover][Handover RPC] Witnesses should not receive handover requests
witness(_, ?REMOTE(Sender, ?HANDOVER(Reference, _, _, _)), #raft_state{} = State) ->
    send_rpc(Sender, ?HANDOVER_FAILED(Reference), State),
    keep_state_and_data;

%% [Handover][HandoverFailed RPC] Witnesses should not act upon any incoming failed handover
witness(_, ?REMOTE(_, ?HANDOVER_FAILED(_)), #raft_state{}) ->
    keep_state_and_data;

witness({call, From}, ?TRIGGER_ELECTION_COMMAND(_), #raft_state{}) ->
    {keep_state_and_data, {reply, From, {error, invalid_state}}};

witness({call, From}, ?PROMOTE_COMMAND(_, _), #raft_state{}) ->
    {keep_state_and_data, {reply, From, {error, invalid_state}}};

%% [RequestPreVote RPC] Respond to pre-vote
witness(_, ?REMOTE(Candidate, ?REQUEST_PRE_VOTE(Ref)), #raft_state{} = Data) ->
    handle_request_pre_vote(Candidate, Ref, Data);

%% [PreVote RPC] Pre-votes are only useful to candidates
witness(_, ?REMOTE(_, ?PRE_VOTE(_, _, _, _)), #raft_state{}) ->
    keep_state_and_data;

%% [RequestVote RPC] Handle incoming vote requests (5.2)
witness(_, ?REMOTE(Candidate, ?REQUEST_VOTE(_, CandidateIndex, CandidateTerm)), #raft_state{} = State) ->
    handle_request_vote(?FUNCTION_NAME, Candidate, CandidateIndex, CandidateTerm, State);

%% [Vote RPC] Votes are only useful to candidates
witness(_, ?REMOTE(_, ?VOTE(_)), #raft_state{}) ->
    keep_state_and_data;

%% [State Timeout] Check liveness, but do not restart state.
witness(state_timeout, _, #raft_state{} = State) ->
    update_status(?FUNCTION_NAME, State),
    {keep_state_and_data, ?ELECTION_TIMEOUT(State)};

%% [Command] Defer to common handling for generic RAFT server commands
witness(Type, ?RAFT_COMMAND(_, _) = Event, #raft_state{} = State) ->
    command(?FUNCTION_NAME, Type, Event, State);

%% [Async Log Append]
%% Witness state should not normally own local log appends, but consume stale
%% completions defensively after role changes.
witness(info, {async_log_append_complete, Ref, Result}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_after_leadership_loss(?FUNCTION_NAME, Ref, Result, State0),
    {keep_state, State1};

witness(info, {'DOWN', MonitorRef, process, WorkerPid, Reason}, #raft_state{} = State0) ->
    State1 = complete_async_log_append_down(?FUNCTION_NAME, MonitorRef, WorkerPid, Reason, State0),
    {keep_state, State1};

%% [Fallback] Report unhandled events
witness(Type, Event, #raft_state{} = State) ->
    ?SERVER_LOG_WARNING(State, "did not know how to handle ~0p event ~0P", [Type, Event, 20]),
    keep_state_and_data.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Command Handlers
%%------------------------------------------------------------------------------
%% Fallbacks for command calls to the RAFT server for when there is no special
%% handling for a command defined within the state-specific callback itself.
%%------------------------------------------------------------------------------

-spec command(
    State :: state(),
    Type :: gen_statem:event_type(),
    Command :: command(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).

%% [Commit] Non-leader nodes should fail commits with {error, not_leader}.
command(
    State,
    cast,
    ?COMMIT_COMMAND(From, _Op, Priority),
    #raft_state{queues = Queues, leader = Leader} = Data
) when State =/= leader ->
    ?SERVER_LOG_WARNING(
        State,
        Data,
        "commit fails as leader is currently ~0p.",
        [identity_to_server(Leader)]
    ),
    wa_raft_queue:commit_cancelled(Queues, From, {error, not_leader}, Priority),
    keep_state_and_data;

%% [Strong Read] Non-leader nodes are not eligible for strong reads.
command(
    State,
    cast,
    ?READ_COMMAND({From, _}),
    #raft_state{queues = Queues, leader = Leader} = Data
) when State =/= leader ->
    ?SERVER_LOG_WARNING(
        State,
        Data,
        "strong read fails as leader is currently ~0p.",
        [identity_to_server(Leader)]
    ),
    wa_raft_queue:fulfill_incomplete_read(Queues, From, {error, not_leader}),
    keep_state_and_data;

%% [Notify Complete] Attempt to send more log entries to storage if applicable.
command(
    State,
    cast,
    ?NOTIFY_COMPLETE_COMMAND(),
    #raft_state{queues = Queues} = Data
) when State =:= leader; State =:= follower; State =:= witness ->
    case wa_raft_queue:apply_queue_size(Queues) of
        0 ->
            NewState = case State of
                leader -> leader_apply_log(Data);
                _ -> apply_log(State, infinity, Data)
            end,
            {keep_state, NewState};
        _ ->
            keep_state_and_data
    end;
command(_, cast, ?NOTIFY_COMPLETE_COMMAND(), #raft_state{}) ->
    keep_state_and_data;

%% [CurrentConfig] Get replica's effective RAFT cluster configuration
command(_, Type, ?CURRENT_CONFIG_COMMAND, #raft_state{} = Data) ->
    reply(Type, config(Data)),
    keep_state_and_data;

%% [Status] Get status of node.
command(State, {call, From}, ?STATUS_COMMAND, #raft_state{} = Data) ->
    {LeaderName, LeaderId} =
        case Data#raft_state.leader of
            undefined -> {undefined, undefined};
            #raft_identity{name = Name, node = Node} -> {Name, Node}
        end,
    Status = [
        {state, State},
        {id, Data#raft_state.self#raft_identity.node},
        {table, Data#raft_state.table},
        {partition, Data#raft_state.partition},
        {partition_path, Data#raft_state.partition_path},
        {current_term, Data#raft_state.current_term},
        {voted_for, Data#raft_state.voted_for},
        {commit_index, Data#raft_state.commit_index},
        {last_applied, Data#raft_state.last_applied},
        {leader_name, LeaderName},
        {leader_id, LeaderId},
        {pending_high, length(Data#raft_state.pending_high)},
        {pending_low, length(Data#raft_state.pending_low)},
        {pending_read, Data#raft_state.pending_read},
        {queued, maps:size(Data#raft_state.queued)},
        {next_indices, Data#raft_state.next_indices},
        {match_indices, Data#raft_state.match_indices},
        {log_module, wa_raft_log:provider(Data#raft_state.log_view)},
        {log_first, wa_raft_log:first_index(Data#raft_state.log_view)},
        {log_last, wa_raft_log:last_index(Data#raft_state.log_view)},
        {votes, Data#raft_state.votes},
        {inflight_applies, wa_raft_queue:apply_queue_size(Data#raft_state.table, Data#raft_state.partition)},
        {disable_reason, Data#raft_state.disable_reason},
        {config, config(Data)},
        {config_index, config_index(Data)},
        {witness, is_self_witness(Data)}
    ],
    {keep_state_and_data, {reply, From, Status}};

%% [Promote] Request full replica nodes to start a new election.
command(
    State,
    {call, From},
    ?TRIGGER_ELECTION_COMMAND(TermOrOffset),
    #raft_state{
        application = App,
        current_term = CurrentTerm
    } = Data
) when State =/= stalled, State =/= witness, State =/= disabled ->
    Term = case TermOrOffset of
        current -> CurrentTerm;
        next -> CurrentTerm + 1;
        {next, Offset} -> CurrentTerm + Offset;
        _ -> TermOrOffset
    end,
    case is_integer(Term) andalso Term >= CurrentTerm of
        true ->
            case ?RAFT_LEADER_ELIGIBLE(App) of
                true ->
                    ?SERVER_LOG_NOTICE(State, Data, "switching to candidate after promotion request.", []),
                    NewData = case Term > CurrentTerm of
                        true -> advance_term(State, Term, undefined, Data);
                        false -> Data
                    end,
                    case State of
                        candidate -> {repeat_state, NewData, {reply, From, ok}};
                        _         -> {next_state, candidate, NewData, {reply, From, ok}}
                    end;
                false ->
                    ?SERVER_LOG_WARNING(State, Data, "cannot be promoted as candidate while ineligible.", []),
                    {keep_state_and_data, {reply, From, {error, ineligible}}}
            end;
        false ->
            ?SERVER_LOG_WARNING(State, Data, "refusing to promote to current, older, or invalid term ~0p.", [Term]),
            {keep_state_and_data, {reply, From, {error, rejected}}}
    end;

%% [Promote] Non-disabled nodes check if eligible to promote and then promote to leader.
command(
    State,
    {call, From},
    ?PROMOTE_COMMAND(TermOrOffset, Force),
    #raft_state{
        application = App,
        table = Table,
        current_term = CurrentTerm,
        last_quorum_ts = LastQuorumTs,
        leader = Leader
    } = Data
) when State =/= stalled, State =/= witness, State =/= disabled ->
    Now = erlang:monotonic_time(millisecond),
    Eligible = ?RAFT_LEADER_ELIGIBLE(App),
    HeartbeatGracePeriodMs = ?RAFT_PROMOTION_GRACE_PERIOD(App, Table) * 1000,
    Term = case TermOrOffset of
        current -> CurrentTerm;
        next -> CurrentTerm + 1;
        {next, Offset} -> CurrentTerm + Offset;
        _ -> TermOrOffset
    end,
    Membership = get_config_members(config(Data)),
    Allowed = if
        % Prevent promotions to older or invalid terms
        not is_integer(Term) orelse Term < CurrentTerm ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "cannot attempt promotion to current, older, or invalid term ~0p.",
                [Term]
            ),
            invalid_term;
        Term =:= CurrentTerm andalso Leader =/= undefined ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "refusing to promote to leader of current term already led by ~0p.",
                [identity_to_server(Leader)]
            ),
            invalid_term;
        % Prevent promotions that will immediately result in a resignation.
        not Eligible ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "cannot promote to leader as the node is ineligible.",
                []
            ),
            ineligible;
        State =:= witness ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "cannot promote a witness node.",
                []
            ),
            invalid_state;
        % Prevent promotions to any operational state when there is no cluster membership configuration.
        Membership =:= [] ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "cannot promote to leader with no existing membership.",
                []
            ),
            invalid_configuration;
        Force ->
            true;
        LastQuorumTs =:= undefined ->
            true;
        Now - LastQuorumTs >= HeartbeatGracePeriodMs ->
            true;
        true ->
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "rejecting request to promote to leader as a valid heartbeat was recently received.",
                []
            ),
            rejected
    end,
    case Allowed of
        true ->
            ?SERVER_LOG_NOTICE(State, Data, "is promoting to leader of term ~0p.", [Term]),
            NewData = case Term > CurrentTerm of
                true -> advance_term(State, Term, node(), Data);
                false -> Data
            end,
            case State of
                leader -> {repeat_state, NewData, {reply, From, ok}};
                _      -> {next_state, leader, NewData, {reply, From, ok}}
            end;
        Reason ->
            {keep_state_and_data, {reply, From, {error, Reason}}}
    end;

%% [Resign] Non-leader nodes cannot resign.
command(State, {call, From}, ?RESIGN_COMMAND, #raft_state{} = Data) when State =/= leader ->
    ?SERVER_LOG_NOTICE(State, Data, "not resigning because we are not leader.", []),
    {keep_state_and_data, {reply, From, {error, not_leader}}};

%% [AdjustMembership] Non-leader nodes cannot adjust their config.
command(
    State,
    Type,
    ?ADJUST_CONFIG_COMMAND(From, Action, _),
    #raft_state{queues = Queues} = Data
) when State =/= leader ->
    ?SERVER_LOG_NOTICE(State, Data, "refusing to adjust config with action ~0p because we are not leader.", [Action]),
    From =/= undefined andalso
        wa_raft_queue:commit_cancelled(Queues, From, {error, not_leader}, high),
    reply(Type, {error, not_leader}),
    {keep_state, Data};

%% [Snapshot Available] Follower and candidate nodes might switch to stalled to install snapshot.
command(
    State,
    Type,
    ?SNAPSHOT_AVAILABLE_COMMAND(_, #raft_log_pos{index = SnapshotIndex}) = Event,
    #raft_state{
        last_applied = LastAppliedIndex
    } = Data
) when State =:= follower; State =:= candidate; State =:= witness ->
    case SnapshotIndex > LastAppliedIndex of
        true ->
            ?SERVER_LOG_NOTICE(State, Data, "at ~0p is notified of a newer snapshot at ~0p.", [LastAppliedIndex, SnapshotIndex]),
            {next_state, stalled, Data, {next_event, Type, Event}};
        false ->
            ?SERVER_LOG_NOTICE(State, Data, "at ~0p is ignoring an older snapshot at ~0p.", [LastAppliedIndex, SnapshotIndex]),
            reply(Type, {error, rejected}),
            keep_state_and_data
    end;

%% [Snapshot Available] Leader and disabled nodes should not install snapshots.
command(
    State,
    Type,
    ?SNAPSHOT_AVAILABLE_COMMAND(_, _),
    #raft_state{}
) when State =:= leader; State =:= disabled ->
    reply(Type, {error, rejected}),
    keep_state_and_data;

%% [Handover Candidates] Non-leader nodes cannot serve handovers.
command(State, {call, From}, ?HANDOVER_CANDIDATES_COMMAND, #raft_state{}) when State =/= leader ->
    {keep_state_and_data, {reply, From, {error, not_leader}}};

%% [Is Peer Ready] Non-leader nodes cannot check peer readiness.
command(State, {call, From}, ?IS_PEER_READY_COMMAND(_), #raft_state{}) when State =/= leader ->
    {keep_state_and_data, {reply, From, {error, not_leader}}};

%% [Handover] Non-leader nodes cannot serve handovers.
command(State, Type, ?HANDOVER_COMMAND(_), #raft_state{}) when State =/= leader ->
    reply(Type, {error, not_leader}),
    keep_state_and_data;

%% [Enable] Non-disabled nodes are already enabled.
command(State, {call, From}, ?ENABLE_COMMAND, #raft_state{}) when State =/= disabled ->
    {keep_state_and_data, {reply, From, {error, already_enabled}}};

%% [Disable] Any state can be disabled by setting a disable reason.
command(State, {call, From}, ?DISABLE_COMMAND(Reason), #raft_state{} = Data) ->
    ?SERVER_LOG_NOTICE(State, Data, "disabling due to ~0p.", [Reason]),
    {next_state, disabled, Data#raft_state{disable_reason = Reason}, {reply, From, ok}};

%% [Bootstrap] Non-stalled nodes are already bootstrapped.
command(_State, {call, From}, ?BOOTSTRAP_COMMAND(_Position, _Config, _Data), #raft_state{}) ->
    {keep_state_and_data, {reply, From, {error, already_bootstrapped}}};

%% [Fallback] Drop unknown command calls.
command(State, Type, Event, #raft_state{} = Data) ->
    ?SERVER_LOG_NOTICE(State, Data, "dropping unhandled ~0p command ~0P", [Type, Event, 20]),
    keep_state_and_data.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Cluster Configuration Helpers
%%------------------------------------------------------------------------------

%% Return whether or not the specified peer is a member of in the provided configuration.
%% Raises an error if the membership list is missing or empty.
-spec is_member(Peer :: peer(), Config :: config()) -> boolean().
is_member(Peer, Config) ->
    lists:member(Peer, config_membership(Config)).

-spec is_self(Peer :: peer(), Data :: #raft_state{}) -> boolean().
is_self({Name, Node}, #raft_state{self = #raft_identity{name = Name, node = Node}}) -> true;
is_self(_, _) -> false.

-spec is_self_member(Data :: #raft_state{}) -> boolean().
is_self_member(#raft_state{self = #raft_identity{name = Name, node = Node}} = Data) ->
    is_member({Name, Node}, config(Data)).

-spec is_self_witness(Data :: #raft_state{}) -> boolean().
is_self_witness(#raft_state{self = #raft_identity{name = Name, node = Node}} = Data) ->
    lists:member({Name, Node}, config_witnesses(config(Data))).

%% Returns true only if the membership of the current configuration contains exactly
%% the provided peer and that the provided peer is not specified as a witness.
-spec is_single_member(Peer :: #raft_identity{} | peer(), Config :: config()) -> IsSingleMember :: boolean().
is_single_member(#raft_identity{name = Name, node = Node}, Config) ->
    is_single_member({Name, Node}, Config);
is_single_member(Peer, #{membership := Membership, witness := Witnesses}) ->
    Membership =:= [Peer] andalso not lists:member(Peer, Witnesses).

-spec config_has_membership(Config :: config()) -> boolean().
config_has_membership(#{membership := Membership}) ->
    Membership =/= [].

%% Get the non-empty participants list from the provided config. Falls back to the
%% membership list if the participants list is missing or empty. Raises an error if
%% both the participants and membership lists are missing or empty.
-spec config_participants(Config :: config()) -> Participants :: membership().
config_participants(#{participants := Participants}) when Participants =/= [] ->
    Participants;
config_participants(Config) ->
    config_membership(Config).

%% Get the non-empty membership list from the provided config. Raises an error
%% if the membership list is missing or empty.
-spec config_membership(Config :: config()) -> Membership :: membership().
config_membership(#{membership := Membership}) when Membership =/= [] ->
    Membership;
config_membership(_) ->
    error(membership_not_set).

-spec config_membership_size(Config :: config()) -> Size :: non_neg_integer().
config_membership_size(Config) ->
    length(config_membership(Config)).

-spec config_witnesses(Config :: config()) -> Witnesses :: [peer()].
config_witnesses(#{witness := Witnesses}) ->
    Witnesses.

-spec config_participant_identities(Config :: config()) -> Peers :: [#raft_identity{}].
config_participant_identities(Config) ->
    [#raft_identity{name = Name, node = Node} || {Name, Node} <- config_participants(Config)].

-spec config_member_identities(Config :: config()) -> Peers :: [#raft_identity{}].
config_member_identities(Config) ->
    [#raft_identity{name = Name, node = Node} || {Name, Node} <- config_membership(Config)].

-spec config_full_member_identities(Config :: config()) -> Replicas :: [#raft_identity{}].
config_full_member_identities(Config) ->
    FullMembers = config_membership(Config) -- config_witnesses(Config),
    [#raft_identity{name = Name, node = Node} || {Name, Node} <- FullMembers].

%% Returns the current effective RAFT configuration. This is the most recent configuration
%% stored in either the RAFT log or the RAFT storage.
-spec config(State :: #raft_state{}) -> Config :: config().
config(#raft_state{log_view = View, cached_config = {ConfigIndex, Config}}) ->
    case wa_raft_log:config(View) of
        {ok, LogConfigIndex, LogConfig} when LogConfigIndex > ConfigIndex ->
            LogConfig;
        {ok, _, _} ->
            % This case will normally only occur when the log has leftover log entries from
            % previous incarnations of the RAFT server that have been applied but not yet
            % trimmed this incarnation.
            Config;
        not_found ->
            Config
    end;
config(#raft_state{log_view = View}) ->
    case wa_raft_log:config(View) of
        {ok, _, LogConfig} -> LogConfig;
        not_found -> make_config()
    end.

-spec config_index(State :: #raft_state{}) -> ConfigIndex :: wa_raft_log:log_index().
config_index(#raft_state{log_view = View, cached_config = {ConfigIndex, _}}) ->
    case wa_raft_log:config(View) of
        {ok, LogConfigIndex, _} ->
            % The case where the log contains a config that is already applied generally
            % only occurs after a restart as any log entries whose trim was deferred
            % will become visible again.
            max(LogConfigIndex, ConfigIndex);
        not_found ->
            ConfigIndex
    end;
config_index(#raft_state{log_view = View}) ->
    case wa_raft_log:config(View) of
        {ok, LogConfigIndex, _} -> LogConfigIndex;
        not_found -> 0
    end.

%% Loads and caches the current configuration stored in the RAFT storage.
%% This configuration is used whenever there is no newer configuration
%% available in the RAFT log and so needs to be kept in sync with what
%% the RAFT server expects is in storage.
-spec load_config(State :: #raft_state{}) -> NewState :: #raft_state{}.
load_config(#raft_state{storage = Storage} = State) ->
    case wa_raft_storage:config(Storage) of
        {ok, #raft_log_pos{index = ConfigIndex}, Config} ->
            State#raft_state{cached_config = {ConfigIndex, normalize_config(Config)}};
        undefined ->
            State#raft_state{cached_config = undefined};
        {error, Reason} ->
            error({could_not_load_config, Reason})
    end.

-spec load_label_state(State :: #raft_state{}) -> LabelState :: wa_raft_label:label().
load_label_state(#raft_state{storage = Storage, label_module = LabelModule}) when LabelModule =/= undefined ->
    case wa_raft_storage:label(Storage) of
        {ok, Label} ->
            Label;
        {error, Reason} ->
            error({failed_to_load_label_state, Reason})
    end;
load_label_state(_) ->
    undefined.

%% Add a new peer to the participants list of the provided cluster
%% configuration.
-spec config_add_participant(Peer :: peer(), Config :: config()) -> config().
config_add_participant(Peer, Config) ->
    Config#{
        participants => lists:umerge([Peer], config_participants(Config))
    }.

%% Add a new peer to the participants and membership lists of the provided
%% cluster configuration.
-spec config_add_member(Peer :: peer(), Config :: config()) -> config().
config_add_member(Peer, Config) ->
    Config#{
        participants => lists:umerge([Peer], config_participants(Config)),
        membership => lists:umerge([Peer], config_membership(Config))
    }.

%% Add a new peer to the participants, membership, and witness lists of the
%% provided cluster configuration.
-spec config_add_witness(Peer :: peer(), Config :: config()) -> config().
config_add_witness(Peer, Config) ->
    Config#{
        participants => lists:umerge([Peer], config_participants(Config)),
        membership => lists:umerge([Peer], config_membership(Config)),
        witness => lists:umerge([Peer], config_witnesses(Config))
    }.

%% Add a peer to just the witness list of the provided cluster configuration.
-spec config_add_witness_only(Peer :: peer(), Config :: config()) -> config().
config_add_witness_only(Peer, Config) ->
    Config#{
        witness => lists:umerge([Peer], config_witnesses(Config))
    }.

%% Remove a peer from the participants, membership, and witness lists of the
%% provided cluster configuration.
-spec config_remove_participant(Peer :: peer(), Config :: config()) -> config().
config_remove_participant(Peer, Config) ->
    Config#{
        participants => config_participants(Config) -- [Peer],
        membership => config_membership(Config) -- [Peer],
        witness => config_witnesses(Config) -- [Peer]
    }.

%% Remove a peer from the membership list of the provided
%% cluster configuration.
-spec config_remove_member(Peer :: peer(), Config :: config()) -> config().
config_remove_member(Peer, Config) ->
    Config#{
        membership => config_membership(Config) -- [Peer]
    }.

%% After an apply is sent to storage, check to see if it is a new configuration
%% being applied. If it is, then update the cached configuration.
-spec maybe_update_config(
    Index :: wa_raft_log:log_index(),
    Term :: wa_raft_log:log_term(),
    Op :: wa_raft_acceptor:command(),
    State :: #raft_state{}
) -> NewState :: #raft_state{}.
maybe_update_config(Index, _, {config, Config}, State) ->
    State#raft_state{cached_config = {Index, Config}};
maybe_update_config(_, _, _, State) ->
    State.

%% Convert a mapping of peers to values to a list of values for all members of the
%% cluster, only including those whose value exists in the mapping.
-spec to_member_list(
    Mapping :: #{node() => Value},
    Config :: config()
) -> Existing :: [Value].
to_member_list(Mapping, Config) ->
    [Value || {_, Node} <- config_membership(Config), {ok, Value} <- [maps:find(Node, Mapping)]].

%% Convert a mapping of peers to values to a list of values for all members of the
%% cluster, using the provided default value for any members that are not
%% represented in the mapping.
-spec to_member_list(
    Mapping :: #{node() => Value},
    Default :: Value,
    Config :: config()
) -> Normalized :: [Value].
to_member_list(Mapping, Default, Config) ->
    [maps:get(Node, Mapping, Default) || {_, Node} <- config_membership(Config)].

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Quorum and Majority
%%------------------------------------------------------------------------------

%% Compute the quorum value of the list formed by converting the given mapping
%% of peers to values to a list of values for all members of the cluster. Members
%% that are not present in the mapping do not contribute to the quorum.
-spec compute_member_quorum(
    Mapping :: #{node() => Value},
    Config :: config()
 ) -> {quorum, Quorum :: Value} | none.
compute_member_quorum(Mapping, Config) ->
    case config_has_membership(Config) of
        true -> compute_quorum(to_member_list(Mapping, Config), config_membership_size(Config));
        false -> none
    end.

%% Returns the largest value that at least a majority of the Total nodes
%% have reached or exceeded, or 'none' if a majority of values are unknown.
-spec compute_quorum(Values :: [Value], Total :: non_neg_integer()) -> {quorum, Quorum :: Value} | none.
compute_quorum(Values, Total) when length(Values) * 2 =< Total ->
    % Quorum requires a majority of values to be known.
    none;
compute_quorum(Values, Total) ->
    % Sort ascending and pick the element at index (length - Total div 2).
    % This element has at least (Total div 2 + 1) values >= it (a majority),
    % making it the largest value that a majority of nodes have reached or exceeded.
    {quorum, lists:nth(length(Values) - Total div 2, lists:sort(Values))}.

-spec find_member_majority(
    Mapping :: #{node() => Value},
    Config :: config()
) -> {found, Majority :: Value} | none.
find_member_majority(Mapping, Config) ->
    case config_has_membership(Config) of
        true -> find_majority(to_member_list(Mapping, Config), config_membership_size(Config));
        false -> none
    end.

%% Find, if it exists, the majority value of the given list fragment. The full
%% list is assumed to have the given size. Only elements of the given list
%% fragment contribute towards a majority. A value is the majority of a list
%% if more than half of the elements of the list are the value.
-spec find_majority(Values :: [Value], Total :: non_neg_integer()) -> {found, Majority :: Value} | none.
find_majority(Values, Total) ->
    find_majority_impl(Values, #{}, Total div 2 + 1).

-spec find_majority_impl(
    Values :: [Value],
    Counts :: #{Value => non_neg_integer()},
    Threshold :: non_neg_integer()
) -> {found, Majority :: Value} | none.
find_majority_impl([], _, _) ->
    none;
find_majority_impl([Value | Values], Counts, Threshold) ->
    case maps:get(Value, Counts, 0) + 1 of
        Threshold -> {found, Value};
        Count -> find_majority_impl(Values, Counts#{Value => Count}, Threshold)
    end.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Private Functions
%%------------------------------------------------------------------------------

-spec random_election_timeout(#raft_state{}) -> non_neg_integer().
random_election_timeout(#raft_state{application = App, table = Table}) ->
    Max = ?RAFT_ELECTION_TIMEOUT_MAX(App, Table),
    Min = ?RAFT_ELECTION_TIMEOUT_MIN(App, Table),
    Timeout =
        case Max > Min of
            true -> Min + rand:uniform(Max - Min);
            false -> Min
        end,
    case ?RAFT_ELECTION_WEIGHT(App) of
        Weight when Weight > 0, Weight =< ?RAFT_ELECTION_MAX_WEIGHT ->
            % higher weight, shorter timeout so it has more chances to initiate an leader election
            round(Timeout * ?RAFT_ELECTION_MAX_WEIGHT div Weight);
        _ ->
            Timeout * ?RAFT_ELECTION_DEFAULT_WEIGHT
    end.

-spec make_log_entry(Op :: wa_raft_acceptor:op(), Data :: #raft_state{}) -> {wa_raft_log:log_entry(), #raft_state{}}.
make_log_entry(Op, #raft_state{last_label = LastLabel} = Data) ->
    {Entry, NewLabel} = make_log_entry_impl(Op, LastLabel, Data),
    {Entry, Data#raft_state{last_label = NewLabel}}.

-spec make_log_entry_impl(
    Op :: wa_raft_acceptor:op(),
    Data :: #raft_state{}
) -> {wa_raft_log:log_entry(), wa_raft_label:label() | undefined}.
make_log_entry_impl(Op, #raft_state{last_label = LastLabel} = Data) ->
    make_log_entry_impl(Op, LastLabel, Data).

-spec make_log_entry_impl(
    Op :: wa_raft_acceptor:op(),
    LastLabel :: wa_raft_label:label() | undefined,
    Data :: #raft_state{}
) -> {wa_raft_log:log_entry(), wa_raft_label:label() | undefined}.
make_log_entry_impl({Key, Command}, LastLabel, #raft_state{label_module = undefined} = Data) ->
    {make_log_entry_impl(Key, LastLabel, Command, Data), LastLabel};
make_log_entry_impl({Key, Command}, LastLabel, #raft_state{label_module = LabelModule} = Data) ->
    NewLabel = case requires_new_label(Command) of
        true -> LabelModule:new_label(LastLabel, Command);
        false -> LastLabel
    end,
    {make_log_entry_impl(Key, NewLabel, Command, Data), NewLabel}.

-spec make_log_entry_impl(
    Key :: wa_raft_acceptor:key(),
    Label :: wa_raft_label:label() | undefined,
    Command :: wa_raft_acceptor:command(),
    Data :: #raft_state{}
) -> wa_raft_log:log_entry().
make_log_entry_impl(Key, undefined, Command, #raft_state{current_term = CurrentTerm}) ->
    {CurrentTerm, {Key, Command}};
make_log_entry_impl(Key, Label, Command, #raft_state{current_term = CurrentTerm}) ->
    {CurrentTerm, {Key, Label, Command}}.

-spec requires_new_label(Command :: wa_raft_acceptor:command()) -> boolean().
requires_new_label(noop) -> false;
requires_new_label({config, _}) -> false;
requires_new_label(_) -> true.

-spec apply_single_node_cluster(Data :: #raft_state{}) -> #raft_state{}.
apply_single_node_cluster(#raft_state{self = Self} = Data0) ->
    case is_single_member(Self, config(Data0)) of
        true ->
            Data1 = commit_pending(Data0, high),
            Data2 = commit_pending(Data1, low),
            Data3 = leader_advance_commit_index(Data2),
            Data4 = leader_apply_log(Data3),
            Data5 = update_quorum_ts(Data4),
            Data5;
        false ->
            Data0
    end.

%% Check if the leader should update the recorded commit index due to an
%% advancement in the log quorum. (5.4.2)
-spec leader_advance_commit_index(Data :: #raft_state{}) -> #raft_state{}.
leader_advance_commit_index(
    #raft_state{
        table = Table,
        log_view = View,
        commit_index = CommitIndex,
        match_indices = MatchIndices,
        first_current_term_log_index = TermStartIndex
    } = Data
) ->
    LastIndex = wa_raft_log:last_index(View),
    AllMatchIndices = MatchIndices#{node() => LastIndex},
    case compute_member_quorum(AllMatchIndices, config(Data)) of
        % Only log entries from the leader's current term can be committed
        % solely by counting replicas (5.4.3).
        {quorum, QuorumIndex} when QuorumIndex < TermStartIndex ->
            ?RAFT_COUNT(Table, 'apply.delay.old'),
            ?SERVER_LOG_WARNING(
                leader,
                Data,
                "unable to establish quorum at ~0p before the start of the current term at ~0p.",
                [QuorumIndex, TermStartIndex]
            ),
            Data;
        {quorum, QuorumIndex} when QuorumIndex > CommitIndex ->
            Data#raft_state{commit_index = QuorumIndex};
        _ ->
            Data
    end.

-spec leader_apply_log(Data :: #raft_state{}) -> #raft_state{}.
leader_apply_log(
    #raft_state{
        last_applied = LastApplied,
        match_indices = MatchIndices
    } = Data
) ->
    TrimIndex = lists:min(to_member_list(MatchIndices#{node() => LastApplied}, 0, config(Data))),
    apply_log(leader, TrimIndex, Data).

-spec apply_log(
    State :: state(),
    TrimIndex :: wa_raft_log:log_index() | infinity,
    Data :: #raft_state{}
) -> #raft_state{}.
apply_log(
    State,
    TrimIndex,
    #raft_state{
        application = App,
        table = Table,
        queues = Queues,
        log_view = View,
        commit_index = CommitIndex,
        last_applied = LastApplied
    } = Data0
) when CommitIndex > LastApplied ->
    StartTUsec = erlang:monotonic_time(microsecond),
    case wa_raft_queue:apply_queue_full(Queues) of
        false ->
            % Apply a limited number of log entries (both count and total byte size limited)
            LimitedIndex = min(CommitIndex, LastApplied + ?RAFT_MAX_CONSECUTIVE_APPLY_ENTRIES(App, Table)),
            LimitBytes = ?RAFT_MAX_CONSECUTIVE_APPLY_BYTES(App, Table),
            {ok, {_, #raft_state{log_view = View1, last_applied = NewLastApplied} = Data1}} = wa_raft_log:fold(View, LastApplied + 1, LimitedIndex, LimitBytes,
                fun (Index, Size, Entry, {Index, AccData}) ->
                    wa_raft_queue:reserve_apply(Queues, Size),
                    {Index + 1, apply_op(State, Index, Size, Entry, AccData)}
                end, {LastApplied + 1, Data0}),

            % Perform log trimming since we've now applied some log entries, only keeping
            % at maximum MaxRotateDelay log entries.
            MaxRotateDelay = ?RAFT_MAX_RETAINED_ENTRIES(App, Table),
            RotateIndex = max(LimitedIndex - MaxRotateDelay, min(NewLastApplied, TrimIndex)),
            RotateIndex =/= infinity orelse error(bad_state),
            Data2 =
                case maybe_rotate_log(View1, RotateIndex, Data1) of
                    {ok, View2} ->
                        Data1#raft_state{log_view = View2};
                    {error, Reason} ->
                        ?RAFT_COUNT(Table, 'log.rotate.error'),
                        ?SERVER_LOG_WARNING(
                            State,
                            Data1,
                            "failed to rotate log at ~0p due to ~0P; leaving log view unchanged.",
                            [RotateIndex, Reason, 30]
                        ),
                        Data1
                end,
            ?RAFT_GATHER(Table, 'apply_log.latency_us', erlang:monotonic_time(microsecond) - StartTUsec),
            Data2;
        true ->
            ApplyQueueSize = wa_raft_queue:apply_queue_size(Queues),
            ?RAFT_COUNT(Table, 'apply.delay'),
            ?RAFT_GATHER(Table, 'apply.queue', ApplyQueueSize),
            ?RAFT_GATHER(Table, 'apply_log.latency_us', erlang:monotonic_time(microsecond) - StartTUsec),
            Data0
    end;
apply_log(_, _, #raft_state{} = Data) ->
    Data.

-spec maybe_rotate_log(View :: wa_raft_log:view(), RotateIndex :: wa_raft_log:log_index(), Data :: #raft_state{}) ->
    {ok, wa_raft_log:view()} | {error, term()}.
maybe_rotate_log(View, _RotateIndex, #raft_state{append_in_flight = {_Ref, _WorkerPid, _MonitorRef, _Priority, _Pending, _NewLabel}}) ->
    %% Async append writes the provider from a spawned process. Log rotation is
    %% destructive for providers with segment rewrite/swap semantics, so defer
    %% it until the in-flight append has completed and the provider is quiescent.
    {ok, View};
maybe_rotate_log(View, RotateIndex, #raft_state{}) ->
    wa_raft_log:rotate(View, RotateIndex).

-spec apply_op(
    State :: state(),
    Index :: wa_raft_log:log_index(),
    Size :: non_neg_integer(),
    Entry :: wa_raft_log:log_entry() | undefined,
    Data :: #raft_state{}
) -> #raft_state{}.
apply_op(State, LogIndex, _, _, #raft_state{last_applied = LastApplied} = Data) when LogIndex =< LastApplied ->
    ?SERVER_LOG_WARNING(State, Data, "is skipping applying log entry ~0p because log entries up to ~0p are already applied.", [LogIndex, LastApplied]),
    Data;
apply_op(_, Index, Size, {Term, {Key, Command}}, #raft_state{} = Data) ->
    Record = {Index, {Term, {Key, undefined, Command}}},
    NewData = send_op_to_storage(Index, Record, Size, Data),
    maybe_update_config(Index, Term, Command, NewData);
apply_op(_, Index, Size, {Term, {_, _, Command}} = Entry, #raft_state{} = Data) ->
    Record = {Index, Entry},
    NewData = send_op_to_storage(Index, Record, Size, Data),
    maybe_update_config(Index, Term, Command, NewData);
apply_op(State, LogIndex, _, undefined, #raft_state{table = Table, log_view = View} = Data) ->
    ?RAFT_COUNT(Table, 'server.missing.log.entry'),
    ?SERVER_LOG_ERROR(State, Data, "failed to apply ~0p because log entry is missing from log covering ~0p to ~0p.",
        [LogIndex, wa_raft_log:first_index(View), wa_raft_log:last_index(View)]),
    exit({invalid_op, LogIndex});
apply_op(State, LogIndex, _, Entry, #raft_state{table = Table} = Data) ->
    ?RAFT_COUNT(Table, 'server.corrupted.log.entry'),
    ?SERVER_LOG_ERROR(State, Data, "failed to apply unrecognized entry ~0P at ~0p.", [Entry, 20, LogIndex]),
    exit({invalid_op, LogIndex, Entry}).

-spec send_op_to_storage(
    Index :: wa_raft_log:log_index(),
    Record :: wa_raft_log:log_record(),
    Size :: non_neg_integer(),
    Data :: #raft_state{}
) -> #raft_state{}.
send_op_to_storage(Index, Record, Size, #raft_state{storage = Storage, queued = Queued} = Data) ->
    {{From, Priority}, NewQueued} =
        case maps:take(Index, Queued) of
            error -> {{undefined, high}, Queued};
            Value -> Value
        end,
    wa_raft_storage:apply(Storage, From, Record, Size, Priority),
    Data#raft_state{last_applied = Index, queued = NewQueued}.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - State Management
%%------------------------------------------------------------------------------

-spec follower_or_witness(Data :: #raft_state{}) -> follower | witness.
follower_or_witness(Data) ->
    case is_self_witness(Data) of
        false -> follower;
        true -> witness
    end.

%% Setup the RAFT state upon entry into a new RAFT server state.
-spec enter_state(State :: state(), Data :: #raft_state{}) -> #raft_state{}.
enter_state(State, Data0) ->
    Now = erlang:monotonic_time(millisecond),
    Data1 = Data0#raft_state{state_start_ts = Now},
    Data2 = set_leader_upon_entry(State, Data1),
    Data3 = cancel_pending_upon_entry(State, Data2),
    Data4 = update_quorum_ts(Now, Data3),
    update_current_term_info(State, Data4),
    update_status(State, Data4),
    Data4.

%% If we are entering leader, then ensure that we are set as the current leader.
%% If we are entering any other state, then ensure that we don't have ourselves recorded as such.
-spec set_leader_upon_entry(State :: state(), Data :: #raft_state{}) -> #raft_state{}.
set_leader_upon_entry(leader, #raft_state{self = Self} = Data) ->
    Data#raft_state{leader = Self};
set_leader_upon_entry(_, #raft_state{self = Self, leader = Self} = Data) ->
    Data#raft_state{leader = undefined};
set_leader_upon_entry(_, #raft_state{} = Data) ->
    Data.

%% If we are entering leader, then keep any pending commits buffered from candidacy.
%% If we are entering any other state, then cancel all pending commits and reads.
-spec cancel_pending_upon_entry(State :: state(), Data :: #raft_state{}) -> #raft_state{}.
cancel_pending_upon_entry(leader, Data) ->
    Data;
cancel_pending_upon_entry(_, Data) ->
    cancel_pending({error, not_leader}, Data).

%% Set a new current term and voted-for peer and clear any state that is associated with the previous term.
-spec advance_term(
    State :: state(),
    NewTerm :: wa_raft_log:log_term(),
    VotedFor :: undefined | node(),
    Data :: #raft_state{}
) -> #raft_state{}.
advance_term(
    State,
    NewTerm,
    VotedFor,
    #raft_state{
        current_term = CurrentTerm
    } = Data
) when NewTerm > CurrentTerm ->
    NewData = Data#raft_state{
        current_term = NewTerm,
        voted_for = VotedFor,
        votes = #{},
        pre_vote_ref = undefined,
        pre_votes = #{},
        leader = undefined,
        next_indices = #{},
        match_indices = #{},
        last_applied_indices = #{},
        heartbeat_send_ts = #{},
        handover = undefined
    },
    update_current_term_info(State, NewData),
    ok = wa_raft_durable_state:store(NewData),
    NewData.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Elections
%%------------------------------------------------------------------------------

%% Whether or not the local replica is eligible to maintain leadership.
%% Generally, this requires that the replica is a member of cluster and is
%% eligible to be leader. When check quorum is enabled, the leader must also
%% have received heartbeat responses from a quorum of followers recently.
-spec leader_eligible(Data :: #raft_state{}) -> eligible | ineligible | stale.
leader_eligible(#raft_state{application = App, table = Table, last_quorum_ts = LastQuorumTs} = Data) ->
    case ?RAFT_LEADER_ELIGIBLE(App) andalso is_self_member(Data) of
        false ->
            ineligible;
        true ->
            case ?RAFT_LEADER_CHECK_QUORUM(App, Table) of
                false ->
                    eligible;
                true when LastQuorumTs =:= undefined ->
                    stale;
                true ->
                    GracePeriod = ?RAFT_LIVENESS_GRACE_PERIOD_MS(App, Table),
                    case erlang:monotonic_time(millisecond) - LastQuorumTs =< GracePeriod of
                        true -> eligible;
                        false -> stale
                    end
            end
    end.

%% Whether or not the local replica is eligible to start elections.
%% Generally, this requires that the replica is a member of cluster, is
%% eligible to be leader and has a non-zero election weight.
-spec candidate_eligible(Data :: #raft_state{}) -> boolean().
candidate_eligible(#raft_state{application = App} = Data) ->
    ?RAFT_LEADER_ELIGIBLE(App) andalso is_self_member(Data) andalso ?RAFT_ELECTION_WEIGHT(App) =/= 0.


-spec candidate_start_election(
    ElectionType :: election_type(),
    Data :: #raft_state{}
) -> gen_statem:state_enter_result(state(), #raft_state{}).
candidate_start_election(
    ElectionType,
    #raft_state{
        self = Self,
        log_view = View,
        current_term = CurrentTerm
    } = Data
) ->
    % Advance the term and record that we will vote for ourselves.
    NewData = advance_term(candidate, CurrentTerm + 1, node(), Data),

    % Determine the log index and term at which the election will occur.
    LastLogIndex = wa_raft_log:last_index(View),
    {ok, LastLogTerm} = wa_raft_log:term(View, LastLogIndex),

    ?SERVER_LOG_NOTICE(
        candidate,
        NewData,
        "advances to new term and starts ~0p election at ~0p:~0p.",
        [ElectionType, LastLogIndex, LastLogTerm]
    ),

    % Request a vote from all peers and also vote for ourselves.
    % Candidates always vote for themselves at the start of an election.
    send_rpc_to_all_members(?REQUEST_VOTE(ElectionType, LastLogIndex, LastLogTerm), NewData),
    send_rpc(Self, ?VOTE(true), NewData),

    {keep_state, NewData, ?ELECTION_TIMEOUT(NewData)}.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Leader Methods
%%------------------------------------------------------------------------------

-spec append_entries_to_followers(Data :: #raft_state{}) -> #raft_state{}.
append_entries_to_followers(Data0) ->
    Data1 = commit_pending(Data0, high),
    Data2 = commit_pending(Data1, low),
    Data3 = lists:foldl(fun heartbeat/2, Data2, config_participant_identities(config(Data2))),
    Data3.

-spec has_pending_commits(Data :: #raft_state{}) -> boolean().
has_pending_commits(#raft_state{pending_high = [], pending_low = []}) ->
    false;
has_pending_commits(#raft_state{}) ->
    true.

-spec pending_count(Data :: #raft_state{}) -> non_neg_integer().
pending_count(#raft_state{pending_high = PendingHigh, pending_low = PendingLow}) ->
    length(PendingHigh) + length(PendingLow).

-spec add_pending(
    From :: gen_server:from(),
    Op :: wa_raft_acceptor:op(),
    Priority :: wa_raft_acceptor:priority(),
    Data :: #raft_state{}
) -> #raft_state{}.
add_pending(From, Op, high, #raft_state{pending_high = PendingHigh} = Data) ->
    Data#raft_state{pending_high = [{From, Op} | PendingHigh]};
add_pending(From, Op, low, #raft_state{pending_low = PendingLow} = Data) ->
    Data#raft_state{pending_low = [{From, Op} | PendingLow]}.

-spec commit_pending(Data :: #raft_state{}, Priority :: wa_raft_acceptor:priority()) -> #raft_state{}.
commit_pending(#raft_state{append_in_flight = {_Ref, _WorkerPid, _MonitorRef, _AppendPriority, _Pending, _NewLabel}} = Data, _Priority) ->
    Data;
commit_pending(#raft_state{pending_high = [], pending_low = [], pending_read = false} = Data, _Priority) ->
    Data;
commit_pending(#raft_state{table = Table, log_view = View, pending_high = PendingHigh, pending_low = PendingLow, queued = Queued} = Data, Priority) ->
    Pending =
        case Priority of
            high ->
                PendingHigh;
            low ->
                PendingLow
        end,
    {Entries, NewLabel} = collect_pending(Data, Priority),
    case Entries of
        [_ | _] ->
            % If we have processed at least one new log entry
            % with the given priority, we try to append to the log.
            case should_async_log_append(Data, Priority) of
                true ->
                    start_async_log_append(Data, Priority, Pending, Entries, NewLabel);
                false ->
                    case wa_raft_log:try_append(View, Entries, Priority) of
                {ok, NewView} ->
                    % Add the newly appended log entries to the pending queue (which
                    % is kept in reverse order).
                    Last = wa_raft_log:last_index(NewView),
                    {_, NewQueued} =
                        lists:foldl(
                            fun ({From, _Op}, {Index, AccQueued}) ->
                                {Index - 1, AccQueued#{Index => {From, Priority}}}
                            end,
                            {Last, Queued},
                            Pending
                        ),
                    % We can clear pending read flag as we've successfully added at
                    % least one new log entry so the leader will proceed to replicate
                    % and establish a new quorum is it is still up to date.
                    Data1 = Data#raft_state{
                        log_view = NewView,
                        last_label = NewLabel,
                        pending_read = false,
                        queued = NewQueued
                    },
                    case Priority of
                        high ->
                            Data1#raft_state{pending_high = []};
                        low ->
                            Data1#raft_state{pending_low = []}
                    end;
                skipped ->
                    % Since the append failed, we do not advance to the new label.
                    % We cancel all pending commits and reads.
                    ?RAFT_COUNTV(Table, {'commit.skipped', Priority}, length(Entries)),
                    ?SERVER_LOG_WARNING(leader, Data, "skipped pre-heartbeat sync for ~0p log entr(ies).", [length(Entries)]),
                    cancel_pending({error, commit_stalled}, Data);
                {error, Error} ->
                    ?RAFT_COUNTV(Table, {'commit.error', Priority}, length(Entries)),
                    ?SERVER_LOG_ERROR(leader, Data, "sync failed due to ~0P.", [Error, 20]),
                    error(Error)
                    end
            end;
        _ ->
            Data
    end.

-spec should_async_log_append(Data :: #raft_state{}, Priority :: wa_raft_acceptor:priority()) -> boolean().
should_async_log_append(
    #raft_state{application = App, table = Table, self = Self, append_in_flight = undefined} = Data,
    _Priority
) ->
    ?RAFT_ASYNC_LOG_APPEND(App, Table) andalso is_single_member(Self, config(Data));
should_async_log_append(#raft_state{}, _Priority) ->
    false.

-spec start_async_log_append(
    Data :: #raft_state{},
    Priority :: wa_raft_acceptor:priority(),
    Pending :: [{gen_server:from(), wa_raft_acceptor:op()}],
    Entries :: [wa_raft_log:log_entry()],
    NewLabel :: wa_raft_label:label() | undefined
) -> #raft_state{}.
start_async_log_append(#raft_state{log_view = View} = Data0, Priority, Pending, Entries, NewLabel) ->
    Ref = make_ref(),
    Parent = self(),
    {WorkerPid, MonitorRef} = spawn_monitor(fun() ->
        Result =
            try wa_raft_log:try_append(View, Entries, Priority) of
                AppendResult -> AppendResult
            catch
                Class:Reason:Stacktrace -> {error, {Class, Reason, Stacktrace}}
            end,
        Parent ! {async_log_append_complete, Ref, Result}
    end),
    clear_pending_priority(
        Priority,
        Data0#raft_state{
            pending_read = false,
            append_in_flight = {Ref, WorkerPid, MonitorRef, Priority, Pending, NewLabel}
        }
    ).

-spec complete_async_log_append(Ref :: reference(), Result :: term(), Data :: #raft_state{}) -> #raft_state{}.
complete_async_log_append(
    Ref,
    {ok, NewView},
    #raft_state{table = Table, queued = Queued, append_in_flight = {Ref, _WorkerPid, MonitorRef, Priority, Pending, NewLabel}} = Data
) ->
    _ = erlang:demonitor(MonitorRef, [flush]),
    Last = wa_raft_log:last_index(NewView),
    {_, NewQueued} =
        lists:foldl(
            fun ({From, _Op}, {Index, AccQueued}) ->
                {Index - 1, AccQueued#{Index => {From, Priority}}}
            end,
            {Last, Queued},
            Pending
        ),
    ?RAFT_COUNTV(Table, {'commit.async_append.ok', Priority}, length(Pending)),
    Data#raft_state{
        log_view = NewView,
        last_label = NewLabel,
        queued = NewQueued,
        append_in_flight = undefined
    };
complete_async_log_append(
    Ref,
    skipped,
    #raft_state{table = Table, append_in_flight = {Ref, _WorkerPid, MonitorRef, Priority, Pending, _NewLabel}} = Data
) ->
    _ = erlang:demonitor(MonitorRef, [flush]),
    ?RAFT_COUNTV(Table, {'commit.async_append.skipped', Priority}, length(Pending)),
    ?SERVER_LOG_WARNING(leader, Data, "skipped async pre-heartbeat sync for ~0p log entr(ies).", [length(Pending)]),
    cancel_pending({error, commit_stalled}, Data#raft_state{append_in_flight = undefined});
complete_async_log_append(
    Ref,
    {error, Error},
    #raft_state{table = Table, append_in_flight = {Ref, _WorkerPid, MonitorRef, Priority, Pending, _NewLabel}} = Data
) ->
    _ = erlang:demonitor(MonitorRef, [flush]),
    ?RAFT_COUNTV(Table, {'commit.async_append.error', Priority}, length(Pending)),
    ?SERVER_LOG_ERROR(leader, Data, "async sync failed due to ~0P.", [Error, 20]),
    Data1 = cancel_async_append_pending(
        {error, {commit_call_failed_after_submit, Error}},
        Priority,
        Pending,
        Data#raft_state{append_in_flight = undefined}
    ),
    rollback_async_append_disk(leader, Data1);
complete_async_log_append(_Ref, _Result, #raft_state{} = Data) ->
    Data.

-spec complete_async_log_append_after_leadership_loss(
    StateName :: state(),
    Ref :: reference(),
    Result :: term(),
    Data :: #raft_state{}
) -> #raft_state{}.
complete_async_log_append_after_leadership_loss(
    StateName,
    Ref,
    {ok, _NewView},
    #raft_state{
        table = Table,
        append_in_flight = {Ref, _WorkerPid, MonitorRef, Priority, Pending, _NewLabel}
    } = Data
) ->
    _ = erlang:demonitor(MonitorRef, [flush]),
    ?RAFT_COUNTV(Table, {'commit.async_append.not_leader', Priority}, length(Pending)),
    ?SERVER_LOG_NOTICE(StateName, Data, "completed stale async append after leadership loss; cancelling ~0p entr(ies).", [length(Pending)]),
    Data1 = cancel_async_append_pending({error, not_leader}, Priority, Pending, Data#raft_state{
        append_in_flight = undefined
    }),
    rollback_async_append_disk(StateName, Data1);
complete_async_log_append_after_leadership_loss(
    StateName,
    Ref,
    skipped,
    #raft_state{
        table = Table,
        append_in_flight = {Ref, _WorkerPid, MonitorRef, Priority, Pending, _NewLabel}
    } = Data
) ->
    _ = erlang:demonitor(MonitorRef, [flush]),
    ?RAFT_COUNTV(Table, {'commit.async_append.not_leader.skipped', Priority}, length(Pending)),
    ?SERVER_LOG_WARNING(StateName, Data, "stale async append skipped after leadership loss for ~0p entr(ies).", [length(Pending)]),
    cancel_async_append_pending({error, not_leader}, Priority, Pending, Data#raft_state{
        append_in_flight = undefined
    });
complete_async_log_append_after_leadership_loss(
    StateName,
    Ref,
    {error, Error},
    #raft_state{
        table = Table,
        append_in_flight = {Ref, _WorkerPid, MonitorRef, Priority, Pending, _NewLabel}
    } = Data
) ->
    _ = erlang:demonitor(MonitorRef, [flush]),
    ?RAFT_COUNTV(Table, {'commit.async_append.not_leader.error', Priority}, length(Pending)),
    ?SERVER_LOG_WARNING(StateName, Data, "stale async append failed after leadership loss due to ~0P.", [Error, 20]),
    cancel_async_append_pending({error, not_leader}, Priority, Pending, Data#raft_state{
        append_in_flight = undefined
    });
complete_async_log_append_after_leadership_loss(_StateName, _Ref, _Result, #raft_state{} = Data) ->
    Data.

-spec complete_async_log_append_down(
    StateName :: state(),
    MonitorRef :: reference(),
    WorkerPid :: pid(),
    Reason :: term(),
    Data :: #raft_state{}
) -> #raft_state{}.
complete_async_log_append_down(
    StateName,
    MonitorRef,
    WorkerPid,
    Reason,
    #raft_state{
        table = Table,
        append_in_flight = {_Ref, WorkerPid, MonitorRef, Priority, Pending, _NewLabel}
    } = Data
) ->
    ?RAFT_COUNTV(Table, {'commit.async_append.down', Priority}, length(Pending)),
    ?SERVER_LOG_WARNING(StateName, Data, "async append worker exited before completion due to ~0P; cancelling ~0p entr(ies).", [Reason, 20, length(Pending)]),
    Data1 = cancel_async_append_pending({error, commit_stalled}, Priority, Pending, Data#raft_state{
        append_in_flight = undefined
    }),
    rollback_async_append_disk(StateName, Data1);
complete_async_log_append_down(_StateName, _MonitorRef, _WorkerPid, _Reason, #raft_state{} = Data) ->
    Data.

-spec cancel_async_append_in_flight(
    StateName :: state(),
    Reason :: wa_raft_acceptor:common_error(),
    Data :: #raft_state{}
) -> #raft_state{}.
cancel_async_append_in_flight(
    StateName,
    Reason,
    #raft_state{
        append_in_flight = {_Ref, WorkerPid, MonitorRef, Priority, Pending, _NewLabel}
    } = Data
) ->
    exit(WorkerPid, kill),
    case await_async_append_worker_down(WorkerPid, MonitorRef) of
        ok ->
            Data1 = cancel_async_append_pending(Reason, Priority, Pending, Data#raft_state{
                append_in_flight = undefined
            }),
            rollback_async_append_disk(StateName, Data1);
        {error, WaitReason} ->
            ?SERVER_LOG_ERROR(StateName, Data, "failed to stop async append worker due to ~0P.", [WaitReason, 20]),
            error({async_append_cancel_failed, WaitReason})
    end;
cancel_async_append_in_flight(_StateName, _Reason, #raft_state{} = Data) ->
    Data.

-spec await_async_append_worker_down(WorkerPid :: pid(), MonitorRef :: reference()) -> ok | {error, term()}.
await_async_append_worker_down(WorkerPid, MonitorRef) ->
    receive
        {'DOWN', MonitorRef, process, WorkerPid, _Reason} ->
            ok
    after 5000 ->
        _ = erlang:demonitor(MonitorRef, [flush]),
        {error, async_append_worker_kill_timeout}
    end.

-spec rollback_async_append_disk(StateName :: state(), Data :: #raft_state{}) -> #raft_state{}.
rollback_async_append_disk(StateName, #raft_state{table = Table, log_view = View} = Data) ->
    %% The async worker writes the provider before the Raft server advances
    %% log_view. If that append is later cancelled, persisted bytes beyond this
    %% view must be removed or restart can replay an unacknowledged command.
    TruncateIndex = wa_raft_log:last_index(View) + 1,
    case close_async_append_owner_writers(View) of
        ok ->
            case wa_raft_log:truncate(View, TruncateIndex) of
                {ok, NewView} ->
                    ?RAFT_COUNT(Table, 'commit.async_append.rollback.ok'),
                    Data#raft_state{log_view = NewView};
                {error, Reason} ->
                    ?RAFT_COUNT(Table, 'commit.async_append.rollback.error'),
                    ?SERVER_LOG_ERROR(StateName, Data, "failed to rollback cancelled async append at ~0p due to ~0P.", [TruncateIndex, Reason, 30]),
                    error({async_append_rollback_failed, Reason})
            end;
        {error, Reason} ->
            ?RAFT_COUNT(Table, 'commit.async_append.rollback.error'),
            ?SERVER_LOG_ERROR(StateName, Data, "failed to close caller-owned writers before async append rollback due to ~0P.", [Reason, 30]),
            error({async_append_rollback_failed, Reason})
    end.

-spec close_async_append_owner_writers(View :: wa_raft_log:view()) -> ok | {error, term()}.
close_async_append_owner_writers(View) ->
    Provider = wa_raft_log:provider(View),
    case erlang:function_exported(Provider, close_process_writers, 1) of
        true -> Provider:close_process_writers(wa_raft_log:log(View));
        false -> ok
    end.

-spec cancel_async_append_pending(
    Reason :: async_append_cancel_reason(),
    Priority :: wa_raft_acceptor:priority(),
    Pending :: [{gen_server:from(), wa_raft_acceptor:op()}],
    Data :: #raft_state{}
) -> #raft_state{}.
cancel_async_append_pending(Reason, Priority, Pending, #raft_state{queues = Queues} = Data) ->
    [
        wa_raft_queue:commit_cancelled(Queues, From, Reason, Priority)
     || {From, _Op} <- lists:reverse(Pending)
    ],
    Data.

-spec clear_pending_priority(Priority :: wa_raft_acceptor:priority(), Data :: #raft_state{}) -> #raft_state{}.
clear_pending_priority(high, #raft_state{} = Data) ->
    Data#raft_state{pending_high = []};
clear_pending_priority(low, #raft_state{} = Data) ->
    Data#raft_state{pending_low = []}.

-spec collect_pending(Data :: #raft_state{}, Priority :: wa_raft_acceptor:priority()) -> {[wa_raft_log:log_entry()], wa_raft_label:label() | undefined}.
collect_pending(#raft_state{pending_high = [], pending_low = [], pending_read = true} = Data, _Priority) ->
    % If the pending queues are empty, then we have at least one pending
    % read but no commits to go along with it.
    {ReadEntry, NewLabel} = make_log_entry_impl({?READ_OP, noop}, Data),
    {[ReadEntry], NewLabel};
collect_pending(#raft_state{last_label = LastLabel, pending_high = PendingHigh, pending_low = PendingLow} = Data, Priority) ->
    % Otherwise, the pending queues are kept in reverse order so fold
    % from the right to ensure that the log entries are labeled
    % in the correct order.
    Pending =
        case Priority of
            high ->
                PendingHigh;
            low ->
                PendingLow
        end,
    {Entries, NewLabel} = lists:foldr(
        fun ({_, Op}, {AccEntries, AccLabel}) ->
            {Entry, NewAccLabel} = make_log_entry_impl(Op, AccLabel, Data),
            {[Entry | AccEntries], NewAccLabel}
        end,
        {[], LastLabel},
        Pending
    ),
    {lists:reverse(Entries), NewLabel}.

-spec cancel_pending(Reason :: wa_raft_acceptor:common_error(), Data :: #raft_state{}) -> #raft_state{}.
cancel_pending(
    Reason,
    #raft_state{
        queues = Queues,
        storage = Storage,
        pending_high = PendingHigh,
        pending_low = PendingLow,
        pending_read = PendingRead
    } = Data
) ->
    % Pending commits are kept in reverse order. Pending reads are also cancelled.
    PendingHigh =/= [] andalso
        [wa_raft_queue:commit_cancelled(Queues, From, Reason, high) || {From, _Op} <- lists:reverse(PendingHigh)],
    PendingLow =/= [] andalso
        [wa_raft_queue:commit_cancelled(Queues, From, Reason, low) || {From, _Op} <- lists:reverse(PendingLow)],
    PendingRead andalso
        wa_raft_storage:cancel_reads(Storage, Reason),
    Data#raft_state{pending_high = [], pending_low = [], pending_read = false}.

%% Cancel all pending and queued commits and reads with the given error reason.
-spec cancel_pending_and_queued(Reason :: wa_raft_acceptor:common_error(), Data :: #raft_state{}) -> #raft_state{}.
cancel_pending_and_queued(Reason, #raft_state{queues = Queues, queued = Queued} = Data) ->
    NewData = cancel_pending(Reason, Data),
    [wa_raft_queue:commit_cancelled(Queues, From, Reason, Priority) || _ := {From, Priority} <- maps:iterator(Queued, ordered)],
    NewData#raft_state{queued = #{}}.

-spec heartbeat(Peer :: #raft_identity{}, State :: #raft_state{}) -> #raft_state{}.
heartbeat(Self, #raft_state{self = Self} = Data) ->
    % Skip sending heartbeat to self
    Data;
heartbeat(
    ?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender,
    #raft_state{
        application = App,
        table = Table,
        name = Name,
        log_view = View,
        commit_index = CommitIndex,
        next_indices = NextIndices,
        match_indices = MatchIndices,
        heartbeat_send_ts = HeartbeatSendTs,
        first_current_term_log_index = TermStartIndex
    } = State0
) ->
    FollowerNextIndex = maps:get(FollowerId, NextIndices, TermStartIndex),
    PrevLogIndex = FollowerNextIndex - 1,
    PrevLogTermRes = wa_raft_log:term(View, PrevLogIndex),
    FollowerMatchIndex = maps:get(FollowerId, MatchIndices, 0),
    FollowerMatchIndex =/= 0 andalso
        ?RAFT_GATHER(Table, 'leader.follower.lag', CommitIndex - FollowerMatchIndex),
    NowTs = erlang:monotonic_time(millisecond),
    LastFollowerHeartbeatSendTs = maps:get(FollowerId, HeartbeatSendTs, undefined),
    State1 = State0#raft_state{heartbeat_send_ts = HeartbeatSendTs#{FollowerId => NowTs}},
    LastIndex = wa_raft_log:last_index(View),
    case PrevLogTermRes of %% no log entry to replicate
        not_found ->
            ?RAFT_COUNT(Table, 'leader.heartbeat.not_ready'),
            ?SERVER_LOG_DEBUG(leader, State1, "at ~0p sends empty heartbeat to follower ~0p at ~0p.",
                [CommitIndex, FollowerId, FollowerNextIndex]),
            % Send append entries request.
            {ok, LastTerm} = wa_raft_log:term(View, LastIndex),
            send_rpc(Sender, ?APPEND_ENTRIES(LastIndex, LastTerm, [], CommitIndex, 0), State1),
            LastFollowerHeartbeatSendTs =/= undefined andalso
                ?RAFT_GATHER(Table, 'leader.heartbeat.interval_ms', erlang:monotonic_time(millisecond) - LastFollowerHeartbeatSendTs),
            State1;
        _ ->
            MaxLogEntries = ?RAFT_HEARTBEAT_MAX_ENTRIES(App, Table),
            MaxHeartbeatSize = ?RAFT_HEARTBEAT_MAX_BYTES(App, Table),
            Witnesses = config_witnesses(config(State0)),
            Entries = case lists:member({Name, FollowerId}, Witnesses) of
                true ->
                    {ok, RawEntries} = wa_raft_log:get(View, FollowerNextIndex, MaxLogEntries, MaxHeartbeatSize),
                    stub_entries_for_witness(RawEntries);
                false ->
                    {ok, RawEntries} = wa_raft_log:entries(View, FollowerNextIndex, MaxLogEntries, MaxHeartbeatSize),
                    RawEntries
            end,
            {ok, PrevLogTerm} = PrevLogTermRes,
            % track when we send out a heartbeat that is empty but also not at the end of the log
            Entries =:= [] andalso PrevLogIndex =/= LastIndex andalso ?RAFT_COUNT(Table, 'leader.heartbeat.empty'),
            ?RAFT_GATHER(Table, 'leader.heartbeat.size', length(Entries)),
            ?SERVER_LOG_DEBUG(leader, State1, "at ~0p sends heartbeat to follower ~0p at ~0p with ~0p entr(ies).",
                [CommitIndex, FollowerId, FollowerNextIndex, length(Entries)]),
            % Compute trim index.
            TrimIndex = lists:min(to_member_list(MatchIndices#{node() => LastIndex}, 0, config(State1))),
            % Send append entries request.
            CastResult = send_rpc(Sender, ?APPEND_ENTRIES(PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex), State1),
            NewNextIndices =
                case CastResult of
                    ok ->
                        % pipelining - move NextIndex after sending out logs. If a packet is lost, follower's AppendEntriesResponse
                        % will return send back its correct index
                        NextIndices#{FollowerId => PrevLogIndex + length(Entries) + 1};
                    _ ->
                        NextIndices
                end,
            LastFollowerHeartbeatSendTs =/= undefined andalso
                ?RAFT_GATHER(Table, 'leader.heartbeat.interval_ms', erlang:monotonic_time(millisecond) - LastFollowerHeartbeatSendTs),
            State1#raft_state{next_indices = NewNextIndices}
    end.

-spec stub_entries_for_witness([wa_raft_log:log_entry() | binary()]) -> [wa_raft_log:log_entry() | binary()].
stub_entries_for_witness(Entries) ->
    [stub_entry(E) || E <- Entries].

-spec stub_entry(wa_raft_log:log_entry() | binary()) -> wa_raft_log:log_entry() | binary().
stub_entry(Binary) when is_binary(Binary) ->
    Binary;
stub_entry({Term, undefined}) ->
    {Term, undefined};
stub_entry({Term, {Key, Cmd}}) ->
    {Term, {Key, stub_command(Cmd)}};
stub_entry({Term, {Key, Label, Cmd}}) ->
    {Term, {Key, Label, stub_command(Cmd)}}.

-spec stub_command(wa_raft_acceptor:command()) -> wa_raft_acceptor:command().
stub_command(noop) -> noop;
stub_command({config, _} = ConfigCmd) -> ConfigCmd;
stub_command(_) -> noop_omitted.

-spec get_handover_eligibility_match_cutoff(State :: #raft_state{}) -> wa_raft_log:log_index().
get_handover_eligibility_match_cutoff(#raft_state{application = App, table = Table, log_view = View}) ->
    wa_raft_log:last_index(View) - ?RAFT_HANDOVER_MAX_ENTRIES(App, Table).

-spec get_handover_eligibility_apply_cutoff(State :: #raft_state{}) -> wa_raft_log:log_index().
get_handover_eligibility_apply_cutoff(#raft_state{application = App, table = Table, log_view = View}) ->
    LastIndex = wa_raft_log:last_index(View),
    case ?RAFT_HANDOVER_MAX_UNAPPLIED_ENTRIES(App, Table) of
        undefined -> LastIndex - ?RAFT_HANDOVER_MAX_ENTRIES(App, Table);
        Limit -> LastIndex - Limit
    end.

-spec get_handover_candidates(State :: #raft_state{}) -> [node()].
get_handover_candidates(State) ->
    Replicas = config_full_member_identities(config(State)),
    MatchCutoffIndex = get_handover_eligibility_match_cutoff(State),
    ApplyCutoffIndex = get_handover_eligibility_apply_cutoff(State),
    [Peer || ?IDENTITY_REQUIRES_MIGRATION(_, Peer) <- Replicas,
        Peer =/= node(),
        is_eligible_for_handover_impl(Peer, MatchCutoffIndex, ApplyCutoffIndex, State)].

-spec is_eligible_for_handover(Peer :: peer(), Data :: #raft_state{}) -> boolean().
is_eligible_for_handover({_, Node}, Data) ->
    MatchCutoffIndex = get_handover_eligibility_match_cutoff(Data),
    ApplyCutoffIndex = get_handover_eligibility_apply_cutoff(Data),
    is_eligible_for_handover_impl(Node, MatchCutoffIndex, ApplyCutoffIndex, Data).

-spec is_eligible_for_handover_impl(
    Node :: node(),
    MatchCutoffIndex :: wa_raft_log:log_index(),
    ApplyCutoffIndex :: wa_raft_log:log_index(),
    Data :: #raft_state{}
) -> boolean().
is_eligible_for_handover_impl(
    Node,
    MatchCutoffIndex,
    ApplyCutoffIndex,
    #raft_state{
        match_indices = MatchIndices,
        last_applied_indices = LastAppliedIndices
    }
) ->
    % A peer whose matching index or last applied index is unknown should not be eligible for handovers.
    case {maps:find(Node, MatchIndices), maps:find(Node, LastAppliedIndices)} of
        {{ok, MatchIndex}, {ok, LastAppliedIndex}} ->
            MatchIndex >= MatchCutoffIndex andalso LastAppliedIndex >= ApplyCutoffIndex;
        _ ->
            false
    end.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Configuration Changes
%%------------------------------------------------------------------------------

-spec leader_config_change_allowed(
    Index :: wa_raft_log:log_index() | undefined,
    State :: #raft_state{}
) -> ok | {error, Reason :: term()}.
leader_config_change_allowed(
    Index,
    #raft_state{
        log_view = View,
        commit_index = CommitIndex,
        last_applied = LastApplied,
        cached_config = {CachedConfigIndex, _},
        first_current_term_log_index = TermStartIndex
    } = State
) ->
    % A leader must establish quorum on at least one log entry in the current
    % term because it is ready for a configuration change.
    QuorumReady = CommitIndex >= TermStartIndex,
    % No two configuration changes can be in the log at the same time
    % and both be not yet committed.
    ConfigIndex = case wa_raft_log:config(View) of
        {ok, LogConfigIndex, _} -> erlang:max(CachedConfigIndex, LogConfigIndex);
        _ -> CachedConfigIndex
    end,
    if
        not QuorumReady ->
            ?SERVER_LOG_NOTICE(
                leader,
                State,
                "at ~0p is not ready for a new configuration because it has not established a quorum in the current term.",
                [CommitIndex]
            ),
            {error, no_quorum};
        ConfigIndex > LastApplied ->
            ?SERVER_LOG_NOTICE(
                leader,
                State,
                "at ~0p is not ready for a new configuration because a new configuration is not yet committed at ~0p.",
                [CommitIndex, ConfigIndex]
            ),
            {error, not_ready};
        Index =:= undefined ->
            ok;
        Index =/= ConfigIndex ->
            ?SERVER_LOG_NOTICE(
                leader,
                State,
                "refuses a new configuration because the configuration at ~0p is not at the required ~0p",
                [ConfigIndex, Index]
            ),
            {error, config_index_mismatch};
        true ->
            ok
    end.

-spec leader_change_config(NewConfig :: wa_raft_server:config(), From :: undefined | gen_server:from(), State :: #raft_state{}) ->
    {ok, NewConfigPosition :: wa_raft_log:log_pos(), NewState :: #raft_state{}} | {error, Reason :: term()}.
leader_change_config(NewConfig, From, #raft_state{log_view = View, current_term = CurrentTerm} = State0) ->
    {LogEntry, State1} = make_log_entry({make_ref(), {config, NewConfig}}, State0),
    case wa_raft_log:try_append(View, [LogEntry]) of
        {ok, NewView} ->
            NewConfigIndex = wa_raft_log:last_index(NewView),
            NewConfigPosition = #raft_log_pos{index = NewConfigIndex, term = CurrentTerm},
            State2 = State1#raft_state{log_view = NewView},
            % When initiated from the commit flow (via wa_raft_acceptor), From is
            % non-undefined and needs to be stored in the queued map so that the
            % storage can reply to the caller when the config change is applied.
            State3 = case From of
                undefined ->
                    State2;
                _ ->
                    Queued = State2#raft_state.queued,
                    State2#raft_state{queued = Queued#{NewConfigIndex => {From, high}}}
            end,
            {ok, NewConfigPosition, State3};
        skipped ->
            {error, commit_stalled};
        {error, Reason} ->
            {error, Reason}
    end.

-spec leader_adjust_config(Action :: config_action(), State :: #raft_state{}) ->
    {ok, NewConfig :: config()} | {error, Reason :: atom()}.
leader_adjust_config(refresh, Data) ->
    {ok, config(Data)};
leader_adjust_config({Action, Peer}, Data) ->
    Config = config(Data),
    IsSelf = is_self(Peer, Data),
    IsReady = is_eligible_for_handover(Peer, Data),
    IsMember = lists:member(Peer, config_membership(Config)),
    IsWitness = lists:member(Peer, config_witnesses(Config)),
    IsParticipant = lists:member(Peer, config_participants(Config)),
    case Action of
        add ->
            if
                IsMember -> {error, already_member};
                IsWitness -> {error, already_witness};
                true -> {ok, config_add_member(Peer, Config)}
            end;
        add_witness ->
            if
                IsMember andalso IsWitness -> {error, already_witness};
                IsMember -> {error, already_member};
                IsParticipant andalso not IsWitness -> {error, not_a_witness};
                true -> {ok, config_add_witness(Peer, Config)}
            end;
        add_participant ->
            if
                IsMember -> {error, already_member};
                IsWitness -> {error, already_witness};
                IsParticipant -> {error, already_participating};
                true -> {ok, config_add_participant(Peer, Config)}
            end;
        promote_participant_if_ready ->
            if
                IsMember -> {error, already_member};
                IsWitness -> {error, already_witness};
                not IsParticipant -> {error, not_a_participant};
                not IsReady -> {error, not_ready};
                true -> {ok, config_add_member(Peer, Config)}
            end;
        remove ->
            if
                IsSelf -> {error, cannot_remove_self};
                not IsParticipant -> {error, not_a_participant};
                true -> {ok, config_remove_participant(Peer, Config)}
            end;
        remove_witness ->
            if
                IsSelf -> {error, cannot_remove_self};
                not IsWitness -> {error, not_a_witness};
                true -> {ok, config_remove_participant(Peer, Config)}
            end;
        remove_membership ->
            if
                IsSelf -> {error, cannot_remove_self};
                not IsMember -> {error, not_a_member};
                true -> {ok, config_remove_member(Peer, Config)}
            end;
        demote_to_witness ->
            if
                IsSelf -> {error, cannot_demote_self};
                not IsParticipant -> {error, not_a_participant};
                IsWitness -> {error, already_witness};
                true -> {ok, config_add_witness_only(Peer, Config)}
            end
    end.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Log State
%%------------------------------------------------------------------------------

-spec reset_log(Position :: wa_raft_log:log_pos(), Data :: #raft_state{}) -> #raft_state{}.
reset_log(#raft_log_pos{index = Index} = Position, #raft_state{queues = Queues, log_view = View, queued = Queued} = Data) ->
    {ok, NewView} = wa_raft_log:reset(View, Position),
    [wa_raft_queue:commit_cancelled(Queues, From, {error, cancelled}, Priority) || _ := {From, Priority} <- maps:iterator(Queued, ordered)],
    NewData = Data#raft_state{
        log_view = NewView,
        last_applied = Index,
        commit_index = Index,
        queued = #{}
    },
    load_config(NewData).

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Snapshots
%%------------------------------------------------------------------------------

-spec open_snapshot(Root :: string(), Position :: wa_raft_log:log_pos(), Data :: #raft_state{}) -> #raft_state{}.
open_snapshot(Root, Position, #raft_state{storage = Storage} = Data) ->
    ok = wa_raft_storage:open_snapshot(Storage, Root, Position),
    reset_log(Position, Data).

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Heartbeat
%%------------------------------------------------------------------------------

%% Attempt to append the log entries declared by a leader in a heartbeat,
%% apply committed but not yet applied log entries, trim the log, and reset
%% the election timeout timer as necessary.
-spec handle_heartbeat(
    State :: state(),
    Event :: gen_statem:event_type(),
    Leader :: #raft_identity{},
    PrevLogIndex :: wa_raft_log:log_index(),
    PrevLogTerm :: wa_raft_log:log_term(),
    Entries :: [wa_raft_log:log_entry() | binary()],
    CommitIndex :: wa_raft_log:log_index(),
    TrimIndex :: wa_raft_log:log_index(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).

handle_heartbeat(
    State,
    _Event,
    Leader,
    PrevLogIndex,
    PrevLogTerm,
    Entries,
    LeaderCommitIndex,
    TrimIndex,
    #raft_state{
        application = App,
        table = Table,
        queues = Queues,
        log_view = View,
        commit_index = CommitIndex,
        last_applied = LastApplied
    } = Data0
) ->
    EntryCount = length(Entries),

    ?RAFT_GATHER(Table, {'heartbeat.size', State}, EntryCount),
    EntryCount =/= 0 andalso
        ?SERVER_LOG_DEBUG(State, Data0, "considering appending ~0p log entries in range ~0p to ~0p to log ending at ~0p.",
            [EntryCount, PrevLogIndex + 1, PrevLogIndex + EntryCount, wa_raft_log:last_index(View)]),

    case append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, Data0) of
        {ok, Accepted, NewMatchIndex, Data1} ->
            AdjustedLastApplied = max(0, LastApplied - wa_raft_queue:apply_queue_size(Queues)),
            send_rpc(Leader, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, Accepted, NewMatchIndex, AdjustedLastApplied), Data1),

            Data2 = Data1#raft_state{
                last_quorum_ts = erlang:monotonic_time(millisecond),
                leader_commit_index = LeaderCommitIndex
            },
            Data3 = case Accepted of
                true ->
                    LocalTrimIndex = case ?RAFT_LOG_ROTATION_BY_TRIM_INDEX(App, Table) of
                        true  -> TrimIndex;
                        false -> infinity
                    end,
                    NewCommitIndex = max(CommitIndex, min(LeaderCommitIndex, NewMatchIndex)),
                    apply_log(State, LocalTrimIndex, Data2#raft_state{commit_index = NewCommitIndex});
                _ ->
                    Data2
            end,
            update_status(State, Data3),
            case follower_or_witness(Data3) of
                State    -> {keep_state, Data3, ?ELECTION_TIMEOUT(Data3)};
                NewState -> {next_state, NewState, Data3}
            end;
        {fatal, Reason} ->
            {next_state, disabled, Data0#raft_state{disable_reason = Reason}}
    end.

%% Append the provided range of the log entries to the local log only if the
%% term of the previous log matches the term stored by the local log,
%% otherwise, truncate the log if the term does not match or do nothing if
%% the previous log entry is not available locally. If an unrecoverable error
%% is encountered, returns a diagnostic that can be used as a reason to
%% disable the current replica.
-spec append_entries(
    State :: state(),
    PrevLogIndex :: wa_raft_log:log_index(),
    PrevLogTerm :: wa_raft_log:log_term(),
    Entries :: [wa_raft_log:log_entry() | binary()],
    EntryCount :: non_neg_integer(),
    Data :: #raft_state{}
) -> {ok, Accepted :: boolean(), NewMatchIndex :: wa_raft_log:log_index(), NewData :: #raft_state{}} | {fatal, Reason :: term()}.

append_entries(
    State,
    PrevLogIndex,
    PrevLogTerm,
    Entries,
    EntryCount,
    #raft_state{
        log_view = View0,
        table = Table,
        queues = Queues,
        commit_index = CommitIndex,
        current_term = CurrentTerm,
        leader = Leader,
        queued = Queued
    } = Data
) ->
    % Compare the incoming heartbeat with the local log to determine what
    % actions need to be taken as part of handling this heartbeat.
    case wa_raft_log:check_heartbeat(View0, PrevLogIndex, [{PrevLogTerm, undefined} | Entries]) of
        {ok, []} ->
            % No append is required as all the log entries in the heartbeat
            % are already in the local log.
            {ok, true, PrevLogIndex + EntryCount, Data};
        {ok, NewEntries} ->
            % No conflicting log entries were found in the heartbeat, but the
            % heartbeat does contain new log entries to be appended to the end
            % of the log.
            case wa_raft_log:try_append(View0, NewEntries) of
                {ok, View1} ->
                    {ok, true, PrevLogIndex + EntryCount, Data#raft_state{log_view = View1}};
                skipped ->
                    NewCount = length(NewEntries),
                    Last = wa_raft_log:last_index(View0),
                    ?SERVER_LOG_WARNING(
                        State,
                        Data,
                        "is not ready to append ~0p log entries in range ~0p to ~0p to log ending at ~0p.",
                        [NewCount, Last + 1, Last + NewCount, Last]
                    ),
                    {ok, false, Last, Data}
            end;
        {conflict, ConflictIndex, [ConflictEntry | _]} when ConflictIndex =< CommitIndex ->
            % A conflict is detected that would result in the truncation of a
            % log entry that the local replica has committed. We cannot validly
            % delete log entries that are already committed because doing so
            % may potentially cause the log entry to be no longer present on a
            % majority of replicas.
            {ok, LocalTerm} = wa_raft_log:term(View0, ConflictIndex),
            case decode_conflict_entry(ConflictEntry) of
                {ok, {ConflictTerm, _}} ->
                    ?RAFT_COUNT(Table, {'heartbeat.error.corruption.excessive_truncation', State}),
                    ?SERVER_LOG_WARNING(
                        State,
                        Data,
                        "refuses heartbeat at ~0p to ~0p that requires truncation past ~0p (term ~0p vs ~0p) when log entries up to ~0p are already committed.",
                        [PrevLogIndex, PrevLogIndex + EntryCount, ConflictIndex, ConflictTerm, LocalTerm, CommitIndex]
                    ),
                    Fatal = io_lib:format(
                        "A heartbeat at ~0p to ~0p from ~0p in term ~0p required truncating past ~0p (term ~0p vs ~0p) when log entries up to ~0p were already committed.",
                        [PrevLogIndex, PrevLogIndex + EntryCount, identity_to_server(Leader), CurrentTerm, ConflictIndex, ConflictTerm, LocalTerm, CommitIndex]
                    ),
                    {fatal, lists:flatten(Fatal)};
                {error, Reason} ->
                    ?RAFT_COUNT(Table, {'heartbeat.skip.error', State}),
                    ?SERVER_LOG_WARNING(
                        State,
                        Data,
                        "refuses heartbeat at ~0p to ~0p with malformed conflict entry due to ~0P",
                        [PrevLogIndex, PrevLogIndex + EntryCount, Reason, 30]
                    ),
                    {ok, false, wa_raft_log:last_index(View0), Data}
            end;
        {conflict, ConflictIndex, NewEntries} when ConflictIndex >= PrevLogIndex ->
            % A truncation is required as there is a conflict between the local
            % log and the incoming heartbeat.
            Last = wa_raft_log:last_index(View0),
            ?SERVER_LOG_NOTICE(
                State,
                Data,
                "handling heartbeat at ~0p by truncating local log ending at ~0p to past ~0p.",
                [PrevLogIndex, Last, ConflictIndex]
            ),
            case wa_raft_log:truncate(View0, ConflictIndex) of
                {ok, View1} ->
                    NewQueued = cancel_queued(Queues, ConflictIndex, Last, {error, not_leader}, Queued),
                    case ConflictIndex =:= PrevLogIndex of
                        true ->
                            % If the conflict precedes the heartbeat's log
                            % entries then no append can be performed.
                            {ok, false, wa_raft_log:last_index(View1), Data#raft_state{log_view = View1, queued = NewQueued}};
                        false ->
                            % Otherwise, we can replace the truncated log
                            % entries with those from the current heartbeat.
                            case wa_raft_log:try_append(View1, NewEntries) of
                                {ok, View2} ->
                                    {ok, true, PrevLogIndex + EntryCount, Data#raft_state{log_view = View2, queued = NewQueued}};
                                skipped ->
                                    NewCount = length(NewEntries),
                                    NewLast = wa_raft_log:last_index(View1),
                                    ?SERVER_LOG_WARNING(
                                        State,
                                        Data,
                                        "is not ready to append ~0p log entries in range ~0p to ~0p to log ending at ~0p.",
                                        [NewCount, NewLast + 1, NewLast + NewCount, NewLast]
                                    ),
                                    {ok, false, NewLast, Data}
                            end
                    end;
                {error, Reason} ->
                    ?RAFT_COUNT(Table, {'heartbeat.truncate.error', State}),
                    ?SERVER_LOG_WARNING(
                        State,
                        Data,
                        "fails to truncate past ~0p while handling heartbeat at ~0p to ~0p due to ~0P",
                        [ConflictIndex, PrevLogIndex, PrevLogIndex + EntryCount, Reason, 30]
                    ),
                    {ok, false, wa_raft_log:last_index(View0), Data}
            end;
        {invalid, out_of_range} ->
            % If the heartbeat is out of range (generally past the end of the
            % log) then ignore and notify the leader of what log entry is
            % required by this replica.
            ?RAFT_COUNT(Table, {'heartbeat.skip.out_of_range', State}),
            EntryCount =/= 0 andalso
                ?SERVER_LOG_WARNING(
                    State,
                    Data,
                    "refuses out of range heartbeat at ~0p to ~0p with local log covering ~0p to ~0p.",
                    [PrevLogIndex, PrevLogIndex + EntryCount, wa_raft_log:first_index(View0), wa_raft_log:last_index(View0)]
                ),
            {ok, false, wa_raft_log:last_index(View0), Data};
        {error, Reason} ->
            ?RAFT_COUNT(Table, {'heartbeat.skip.error', State}),
            ?SERVER_LOG_WARNING(
                State,
                Data,
                "fails to check heartbeat at ~0p to ~0p for validity due to ~0P",
                [PrevLogIndex, PrevLogIndex + EntryCount, Reason, 30]
            ),
            {ok, false, wa_raft_log:last_index(View0), Data}
    end.

-spec decode_conflict_entry(Entry :: wa_raft_log:log_entry() | binary()) ->
    {ok, wa_raft_log:log_entry()} | {error, term()}.
decode_conflict_entry(Entry) when is_binary(Entry) ->
    try binary_to_term(Entry, [safe]) of
        Decoded -> {ok, Decoded}
    catch
        _:Reason -> {error, {bad_entry_term, Reason}}
    end;
decode_conflict_entry(Entry) ->
    {ok, Entry}.

-spec cancel_queued(
    Queues :: wa_raft_queue:queues(),
    Start :: wa_raft_log:log_index(),
    End :: wa_raft_log:log_index(),
    Reason :: wa_raft_acceptor:commit_error() | undefined,
    Queued :: #{wa_raft_log:log_index() => {gen_server:from(), wa_raft_acceptor:priority()}}
) -> #{wa_raft_log:log_index() => {gen_server:from(), wa_raft_acceptor:priority()}}.
cancel_queued(_, Start, End, _, Queued) when Start > End; Queued =:= #{} ->
    Queued;
cancel_queued(Queues, Start, End, Reason, Queued) ->
    case maps:take(Start, Queued) of
        {{From, Priority}, NewQueued} ->
            wa_raft_queue:commit_cancelled(Queues, From, Reason, Priority),
            cancel_queued(Queues, Start + 1, End, Reason, NewQueued);
        error ->
            cancel_queued(Queues, Start + 1, End, Reason, Queued)
    end.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Vote Requests
%%------------------------------------------------------------------------------

%% [RequestPreVote RPC]
-spec handle_request_pre_vote(
    Candidate :: #raft_identity{},
    Ref :: reference(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
handle_request_pre_vote(
    Candidate,
    Ref,
    #raft_state{log_view = View} = Data
) ->
    Index = wa_raft_log:last_index(View),
    {ok, Term} = wa_raft_log:term(View, Index),
    {Missing, _, _} = is_leader_missing(Data),
    send_rpc(Candidate, ?PRE_VOTE(Ref, Missing, Index, Term), Data),
    keep_state_and_data.

%% [RequestVote RPC]
-spec handle_request_vote(
    State :: state(),
    Candidate :: #raft_identity{},
    CandidateIndex :: wa_raft_log:log_index(),
    CandidateTerm :: wa_raft_log:log_term(),
    Data :: #raft_state{}
) -> gen_statem:event_handler_result(state(), #raft_state{}).
%% A replica with an available vote in the current term should allocate its vote
%% if the candidate's log is at least as up-to-date as the local log. (5.4.1)
handle_request_vote(
    State,
    ?IDENTITY_REQUIRES_MIGRATION(_, CandidateId) = Candidate,
    CandidateIndex,
    CandidateTerm,
    #raft_state{
        log_view = View,
        voted_for = VotedFor
    } = Data
) when VotedFor =:= undefined; VotedFor =:= CandidateId ->
    Index = wa_raft_log:last_index(View),
    {ok, Term} = wa_raft_log:term(View, Index),
    case {CandidateTerm, CandidateIndex} >= {Term, Index} of
        true ->
            ?SERVER_LOG_NOTICE(
                State,
                Data,
                "decides to vote for candidate ~0p with up-to-date log at ~0p:~0p versus local log at ~0p:~0p.",
                [Candidate, CandidateIndex, CandidateTerm, Index, Term]
            ),
            case VotedFor of
                undefined ->
                    % If this vote request causes the current replica to allocate its vote, then
                    % persist the vote before responding. (Fig. 2)
                    NewData = Data#raft_state{voted_for = CandidateId},
                    wa_raft_durable_state:store(NewData),
                    send_rpc(Candidate, ?VOTE(true), NewData),
                    {keep_state, NewData};
                CandidateId ->
                    % Otherwise, the vote allocation did not change, so just send the response.
                    send_rpc(Candidate, ?VOTE(true), Data),
                    keep_state_and_data
            end;
        false ->
            ?SERVER_LOG_NOTICE(
                State,
                Data,
                "refuses to vote for candidate ~0p with outdated log at ~0p:~0p versus local log at ~0p:~0p.",
                [Candidate, CandidateIndex, CandidateTerm, Index, Term]
            ),
            send_rpc(Candidate, ?VOTE(false), Data),
            keep_state_and_data
    end;
%% A replica that was already allocated its vote to a specific candidate in the
%% current term should ignore vote requests from other candidates. (5.4.1)
handle_request_vote(State, Candidate, _, _,  #raft_state{voted_for = VotedFor} = Data) ->
    ?SERVER_LOG_NOTICE(
        State,
        Data,
        "refusing to vote for candidate ~0p after previously voting for candidate ~0p in the current term.",
        [Candidate, VotedFor]
    ),
    send_rpc(Candidate, ?VOTE(false), Data),
    keep_state_and_data.

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - Helpers
%%------------------------------------------------------------------------------

%% Generic reply function for non-RPC requests that operates based on event type.
-spec reply(Type :: enter | gen_statem:event_type(), Message :: term()) -> ok | {error, Reason :: term()}.
reply(cast, _) ->
    ok;
reply({call, From}, Message) ->
    gen_statem:reply(From, Message);
reply(Type, Message) ->
    ?RAFT_LOG_WARNING("Attempted to reply to non-reply event type ~0p with message ~0P.", [Type, Message, 20]),
    ok.

-spec send_rpc(
    Destination :: #raft_identity{},
    Procedure :: normalized_procedure(),
    State :: #raft_state{}
) -> ok | {error, term()}.
send_rpc(Destination, Procedure, #raft_state{current_term = Term} = State) ->
    send_rpc(Destination, Term, Procedure, State).

-spec send_rpc(
    Destination :: #raft_identity{},
    Term :: wa_raft_log:log_term(),
    Procedure :: normalized_procedure(),
    State :: #raft_state{}
) -> ok | {error, term()}.
send_rpc(Destination, Term, Procedure, #raft_state{self = Self} = State) ->
    cast(Destination, make_rpc(Self, Term, Procedure), State).

-spec send_rpc_to_all_members(ProcedureCall :: normalized_procedure(), State :: #raft_state{}) -> [ok | {error, term()}].
send_rpc_to_all_members(ProcedureCall, #raft_state{self = Self} = State) ->
    [send_rpc(Peer, ProcedureCall, State) || Peer <- config_member_identities(config(State)), Peer =/= Self].

-spec cast(Destination :: #raft_identity{}, RPC :: rpc(), State :: #raft_state{}) -> ok | {error, term()}.
cast(
    #raft_identity{
        name = Name,
        node = Node
    } = Destination,
    Message,
    #raft_state{
        table = Table,
        identifier = Identifier,
        distribution_module = Distribution
    }
) ->
    try
        ok = Distribution:cast({Name, Node}, Identifier, Message)
    catch
        _:E ->
            ?RAFT_COUNT(Table, {'server.cast.error', E}),
            ?RAFT_LOG_DEBUG("Cast to ~p error ~100p", [Destination, E]),
            {error, E}
    end.

-spec maybe_heartbeat(#raft_state{}) -> #raft_state{}.
maybe_heartbeat(#raft_state{table = Table} = State) ->
    case should_heartbeat(State) of
        true ->
            ?RAFT_COUNT(Table, 'leader.heartbeat'),
            append_entries_to_followers(State);
        false ->
            State
    end.

-spec should_heartbeat(#raft_state{}) -> boolean().
should_heartbeat(#raft_state{handover = Handover}) when Handover =/= undefined ->
    false;
should_heartbeat(#raft_state{application = App, table = Table, heartbeat_send_ts = HeartbeatSendTs}) ->
    Latest = lists:max(maps:values(HeartbeatSendTs)),
    Current = erlang:monotonic_time(millisecond),
    Current - Latest > ?RAFT_HEARTBEAT_INTERVAL(App, Table).

%% Update the server's current term info in `wa_raft_info`.
-spec update_current_term_info(
    State :: state(),
    Data :: #raft_state{}
) -> ok.
update_current_term_info(
    State,
    #raft_state{
        table = Table,
        partition = Partition,
        self = #raft_identity{node = Node},
        current_term = CurrentTerm,
        leader = Leader
    }
) ->
    LeaderNode = case Leader of
        undefined -> undefined;
        #raft_identity{node = N} -> N
    end,
    CandidateNode = case State of
        candidate -> Node;
        _ -> undefined
    end,
    wa_raft_info:set_current_term_info(Table, Partition, CurrentTerm, LeaderNode, CandidateNode),
    ok.

%% Update the heartbeat reply timestamp for a peer and recompute the quorum timestamp.
-spec update_heartbeat_reply_ts(Peer :: node(), Data :: #raft_state{}) -> #raft_state{}.
update_heartbeat_reply_ts(Peer, #raft_state{heartbeat_reply_ts = HeartbeatReply} = Data) ->
    NowTs = erlang:monotonic_time(millisecond),
    NewHeartbeatReply = HeartbeatReply#{Peer => NowTs},
    NewData = Data#raft_state{heartbeat_reply_ts = NewHeartbeatReply},
    update_quorum_ts(NowTs, NewData).

%% Recompute the quorum timestamp from the current heartbeat reply timestamps.
-spec update_quorum_ts(Data :: #raft_state{}) -> #raft_state{}.
update_quorum_ts(Data) ->
    update_quorum_ts(erlang:monotonic_time(millisecond), Data).

%% Recompute the quorum timestamp from the current heartbeat reply timestamps.
-spec update_quorum_ts(
    NowTs :: integer(),
    Data :: #raft_state{}
) -> #raft_state{}.
update_quorum_ts(NowTs, #raft_state{self = #raft_identity{node = Node}, heartbeat_reply_ts = HeartbeatReply} = Data) ->
    case compute_member_quorum(HeartbeatReply#{Node => NowTs}, config(Data)) of
        {quorum, QuorumTs} -> Data#raft_state{last_quorum_ts = QuorumTs};
        _                  -> Data
    end.

%% Update the server's current status in `wa_raft_info`.
%% For stalled, disabled:
%%  - Never live and always lagging and stale.
%% For leaders:
%%  - Liveness is determined based on the age of the most recent quorum
%%    determined by accepted heartbeat responses from followers.
%%  - Lagging and staleness is the opposite of liveness.
%% For followers, candidates and witnesses:
%%  - Liveness is determined based on the timeliness of the last heartbeat
%%    received from a leader
%%  - Lagging is determined based on the number of log entries that are
%%    committed but not yet applied locally
%%  - Staleness is determined based on whether the current server state
%%    supports reading and lagging.
-spec update_status(State :: state(), Data :: #raft_state{}) -> ok.
update_status(
    State,
    #raft_state{
        table = Table,
        partition = Partition
    }
) when State =:= stalled; State =:= disabled ->
    wa_raft_info:set_status(Table, Partition, State, false, true, true),
    ok;
update_status(
    State,
    #raft_state{
        application = App,
        name = Name,
        table = Table,
        partition = Partition,
        last_quorum_ts = LastQuorumTs
    } = Data
) when State =:= leader ->

    % If the quorum of the most recent timestamps of follower's acknowledgement
    % of heartbeats is too old, then the leader is considered stale.
    NowTs = erlang:monotonic_time(millisecond),
    QuorumAge = case LastQuorumTs of
        undefined -> infinity;
        _ -> NowTs - LastQuorumTs
    end,
    QuorumGrace = ?RAFT_LEADER_STALE_INTERVAL(App, Table),
    NewLive = QuorumAge =< QuorumGrace,

    % Status should always exist while server is running
    {CachedState, CachedLive, _, _} = wa_raft_info:get_status(Table, Partition),

    case NewLive of
        CachedLive ->
            ok;
        true ->
            ?SERVER_LOG_NOTICE(
                leader,
                Data,
                "is live after heartbeat quorum age drops to ~0p ms (within ~0p ms grace period)",
                [QuorumAge, QuorumGrace]
            );
        false ->
            ?SERVER_LOG_NOTICE(
                leader,
                Data,
                "is no longer live after heartbeat quorum age rises to ~0p ms (outside ~0p ms grace period)",
                [QuorumAge, QuorumGrace]
            )
    end,

    % Update status and message queue length if necessary
    State =:= CachedState andalso NewLive =:= CachedLive orelse
        wa_raft_info:set_status(Table, Partition, leader, NewLive, not NewLive, not NewLive),
    wa_raft_info:refresh_message_queue_length(Name),
    ok;
update_status(
    State,
    #raft_state{
        application = App,
        name = Name,
        table = Table,
        partition = Partition,
        last_applied = LastApplied,
        last_quorum_ts = LeaderHeartbeatTs,
        leader_commit_index = LeaderCommitIndex
    } = Data
) when State =:= follower; State =:= candidate; State =:= witness ->
    Now = erlang:monotonic_time(millisecond),
    NewLive =
        LeaderHeartbeatTs =/= undefined andalso
            LeaderHeartbeatTs + ?RAFT_LIVENESS_GRACE_PERIOD_MS(App, Table) >= Now,
    NewLagging =
        not NewLive orelse
            LeaderCommitIndex =:= undefined orelse
            LeaderCommitIndex - LastApplied >= ?RAFT_STALE_GRACE_PERIOD_ENTRIES(App, Table),
    NewStale = NewLagging orelse State =:= witness,

    % Status should always exist while server is running
    {CachedState, CachedLive, CachedLagging, CachedStale} = wa_raft_info:get_status(Table, Partition),

    case NewLive of
        CachedLive ->
            ok;
        true ->
            ?SERVER_LOG_NOTICE(State, Data, "is live", []);
        false ->
            ?SERVER_LOG_NOTICE(
                State,
                Data,
                "is no longer live after receiving leader heartbeat at ~0p",
                [LeaderHeartbeatTs]
            )
    end,

    case NewLagging of
        CachedLagging ->
            ok;
        true when LeaderCommitIndex =:= undefined ->
            ok;
        true ->
            ?SERVER_LOG_NOTICE(
                State,
                Data,
                "is stale as last applied at ~0p is ~0p behind leader's commit at ~0p.",
                [LastApplied, LeaderCommitIndex - LastApplied, LeaderCommitIndex]
            );
        false ->
            ?SERVER_LOG_NOTICE(State, Data, "catches up.", [])
    end,

    State =:= CachedState andalso NewLive =:= CachedLive andalso NewLagging =:= CachedLagging andalso NewStale =:= CachedStale orelse
        wa_raft_info:set_status(Table, Partition, State, NewLive, NewLagging, NewStale),
    wa_raft_info:refresh_message_queue_length(Name),
    ok.

%% Check if the leader has not been seen for at least the minimum election timeout.
-spec is_leader_missing(Data :: #raft_state{}) -> {Missing :: boolean(), Delay :: infinity | integer(), AllowedDelay :: integer()}.
is_leader_missing(#raft_state{application = App, table = Table, last_quorum_ts = LastQuorumTs}) ->
    AllowedDelay = ?RAFT_ELECTION_TIMEOUT_MIN(App, Table),
    Delay = case LastQuorumTs of
        undefined -> infinity;
        _         -> erlang:monotonic_time(millisecond) - LastQuorumTs
    end,
    {Delay > AllowedDelay, Delay, AllowedDelay}.

%% Check if the leader should send a storage snapshot to this follower and request
%% if necessary.
-spec leader_maybe_request_snapshot(
    Follower :: node(),
    FollowerLastIndex :: wa_raft_log:log_index(),
    FollowerLastAppliedIndex :: wa_raft_log:log_index() | undefined,
    State :: #raft_state{}
) -> ok.
leader_maybe_request_snapshot(
    Follower,
    FollowerLastIndex,
    FollowerLastAppliedIndex,
    #raft_state{log_view = View} = State
) ->
    FirstIndex = wa_raft_log:first_index(View),
    leader_should_send_snapshot(FirstIndex, FollowerLastIndex, FollowerLastAppliedIndex, State) andalso
        request_snapshot_for_follower(Follower, State),
    ok.

%% Leaders should send a storage snapshot to a follower instead of replicating
%% when the follower:
%%  * is stalled or otherwise missing their entire log,
%%  * requires a log entry that is already rotated out, or
%%  * is very far behind (past the snapshot catchup threshold).
-spec leader_should_send_snapshot(
    FirstIndex :: wa_raft_log:log_index(),
    FollowerLastIndex :: wa_raft_log:log_index(),
    FollowerLastAppliedIndex :: wa_raft_log:log_index() | undefined,
    State :: #raft_state{}
) -> boolean().
leader_should_send_snapshot(_, 0, _, _) ->
    true;
leader_should_send_snapshot(FirstIndex, FollowerLastIndex, _, _) when FirstIndex > FollowerLastIndex ->
    true;
leader_should_send_snapshot(_, _, undefined, _) ->
    false;
leader_should_send_snapshot(_, FollowerLastIndex, FollowerLastAppliedIndex, #raft_state{application = App, table = Table}) ->
    SnapshotCatchupThreshold = ?RAFT_SNAPSHOT_CATCHUP_THRESHOLD(App, Table),
    FollowerLastIndex - FollowerLastAppliedIndex > SnapshotCatchupThreshold.

%% Try to start a snapshot transport to a follower if the snapshot transport
%% service is available. If the follower is a witness or too many snapshot
%% transports have been started then no transport is created. This function
%% always performs this request asynchronously.
-spec request_snapshot_for_follower(Follower :: node(), State :: #raft_state{}) -> ok.
request_snapshot_for_follower(
    FollowerId,
    #raft_state{
        application = App,
        name = Name,
        table = Table,
        partition = Partition
    } = State
) ->
    Witness = lists:member({Name, FollowerId}, config_witnesses(config(State))),
    wa_raft_snapshot_catchup:catchup(App, Name, FollowerId, Table, Partition, Witness).

-spec profile_startup_phase(
    Application :: atom(),
    Partition :: wa_raft:partition(),
    Phase :: atom(),
    Fun :: fun(() -> Result)
) -> Result.
profile_startup_phase(Application, Partition, Phase, Fun) ->
    StartedAt = erlang:monotonic_time(),
    Result = Fun(),
    DurationUs = erlang:convert_time_unit(erlang:monotonic_time() - StartedAt, native, microsecond),
    emit_startup_phase(Application, Partition, Phase, DurationUs),
    Result.

-spec emit_startup_phase(
    Application :: atom(),
    Partition :: wa_raft:partition(),
    Phase :: atom(),
    DurationUs :: non_neg_integer()
) -> ok.
emit_startup_phase(Application, Partition, Phase, DurationUs) ->
    Callback = application:get_env(Application, waraft_startup_phase_callback, undefined),
    emit_startup_phase_callback(Callback, Application, Partition, Phase, DurationUs).

-spec emit_startup_phase_callback(
    Callback :: undefined | fun() | {module(), atom()},
    Application :: atom(),
    Partition :: wa_raft:partition(),
    Phase :: atom(),
    DurationUs :: non_neg_integer()
) -> ok.
emit_startup_phase_callback(undefined, _Application, _Partition, _Phase, _DurationUs) ->
    ok;
emit_startup_phase_callback(Callback, Application, Partition, Phase, DurationUs)
    when is_function(Callback, 4) ->
    try Callback(Partition, Phase, DurationUs, Application) of
        _ -> ok
    catch
        _:_ -> ok
    end;
emit_startup_phase_callback(Callback, Application, Partition, Phase, DurationUs)
    when is_function(Callback, 5) ->
    try Callback(Application, Partition, Phase, DurationUs, ?MODULE) of
        _ -> ok
    catch
        _:_ -> ok
    end;
emit_startup_phase_callback({Module, Function}, Application, Partition, Phase, DurationUs)
    when is_atom(Module), is_atom(Function) ->
    try erlang:apply(Module, Function, [Application, Partition, Phase, DurationUs, ?MODULE]) of
        _ -> ok
    catch
        _:_ -> ok
    end;
emit_startup_phase_callback(_Callback, _Application, _Partition, _Phase, _DurationUs) ->
    ok.

-spec identity_to_server(Identity :: #raft_identity{} | undefined) -> {Name :: atom(), Node :: node()} | undefined.
identity_to_server(undefined) ->
    undefined;
identity_to_server(#raft_identity{name = Name, node = Node}) ->
    {Name, Node}.