Skip to main content

src/wa_raft_info.erl

%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.
%%%
%%% API for accessing certain useful information about the state of local
%%% RAFT servers without requiring a status request against the RAFT server
%%% itself.

-module(wa_raft_info).
-compile(warn_missing_spec_all).

%% Public API
-export([
    get_current_term_info/2,
    get_current_term_and_leader/2,
    get_current_term/2,
    get_leader/2,
    get_status/2,
    get_state/2,
    get_live/2,
    get_lagging/2,
    get_stale/2,
    get_message_queue_length/1
]).

%% Internal API
-export([
    init_tables/0
]).

%% Internal API
-export([
    set_current_term_info/5,
    set_status/6,
    set_message_queue_length/2,
    refresh_message_queue_length/1,
    clear/3
]).

%% Local RAFT server's current status flags (state, liveness, staleness and read readiness)
-define(STATUS_KEY(Table, Partition), {status, Table, Partition}).
%% Local RAFT server's most recently known term and leader
-define(CURRENT_TERM_INFO_KEY(Table, Partition), {current_term_info, Table, Partition}).
%% Local RAFT server's message queue length
-define(MESSAGE_QUEUE_LENGTH_KEY(Name), {message_queue_length, Name}).

%%-------------------------------------------------------------------
%% RAFT Info - Public API
%%-------------------------------------------------------------------

-spec get_current_term_info(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Info | undefined when
    Info :: {Term :: wa_raft_log:log_term(), Leader :: node() | undefined, Candidate :: node() | undefined}.
get_current_term_info(Table, Partition) ->
    case ets:lookup(?MODULE, ?CURRENT_TERM_INFO_KEY(Table, Partition)) of
        [] -> undefined;
        [{_, Term, Leader, Candidate}] -> {Term, Leader, Candidate}
    end.

%% The RAFT server always sets both the known term and leader together, so that
%% the atomic read performed by this method will not return a known leader for
%% a different term.
-spec get_current_term_and_leader(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Info | undefined when
    Info :: {Term :: wa_raft_log:log_term(), Leader :: node() | undefined}.
get_current_term_and_leader(Table, Partition) ->
    case ets:lookup(?MODULE, ?CURRENT_TERM_INFO_KEY(Table, Partition)) of
        [] -> undefined;
        [{_, Term, Leader, _}] -> {Term, Leader}
    end.

-spec get_current_term(Table :: wa_raft:table(), Partition :: wa_raft:partition()) ->
    wa_raft_log:log_term() | undefined.
get_current_term(Table, Partition) ->
    ets:lookup_element(?MODULE, ?CURRENT_TERM_INFO_KEY(Table, Partition), 2, undefined).

-spec get_leader(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> node() | undefined.
get_leader(Table, Partition) ->
    ets:lookup_element(?MODULE, ?CURRENT_TERM_INFO_KEY(Table, Partition), 3, undefined).

-spec get_status(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Status | undefined when
    Status ::
        {
            State :: wa_raft_server:state(),
            Live :: boolean(),
            Lagging :: boolean(),
            Stale :: boolean()
        }.
get_status(Table, Partition) ->
    case ets:lookup(?MODULE, ?STATUS_KEY(Table, Partition)) of
        [] -> undefined;
        [{_, State, Live, Lagging, Stale}] -> {State, Live, Lagging, Stale}
    end.

-spec get_state(Table :: wa_raft:table(), Partition :: wa_raft:partition()) ->
    wa_raft_server:state() | undefined.
get_state(Table, Partition) ->
    ets:lookup_element(?MODULE, ?STATUS_KEY(Table, Partition), 2, undefined).

-spec get_live(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> boolean().
get_live(Table, Partition) ->
    ets:lookup_element(?MODULE, ?STATUS_KEY(Table, Partition), 3, false).

-spec get_lagging(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> boolean().
get_lagging(Table, Partition) ->
    ets:lookup_element(?MODULE, ?STATUS_KEY(Table, Partition), 4, true).

-spec get_stale(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> boolean().
get_stale(Table, Partition) ->
    ets:lookup_element(?MODULE, ?STATUS_KEY(Table, Partition), 5, true).

-spec get_message_queue_length(Name :: atom()) -> non_neg_integer() | undefined.
get_message_queue_length(Name) ->
    ets:lookup_element(?MODULE, ?MESSAGE_QUEUE_LENGTH_KEY(Name), 2, undefined).

%%-------------------------------------------------------------------
%% RAFT Info - Internal API
%%-------------------------------------------------------------------

-spec init_tables() -> ok.
init_tables() ->
    ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}, {read_concurrency, true}]),
    ok.

-spec set_current_term_info(
    Table :: wa_raft:table(),
    Partition :: wa_raft:partition(),
    Term :: wa_raft_log:log_term(),
    Leader :: node() | undefined,
    Candidate :: node() | undefined
) -> boolean().
set_current_term_info(Table, Partition, Term, Leader, Candidate) ->
    ets:update_element(
        ?MODULE,
        ?CURRENT_TERM_INFO_KEY(Table, Partition),
        [{2, Term}, {3, Leader}, {4, Candidate}],
        {?CURRENT_TERM_INFO_KEY(Table, Partition), Term, Leader, Candidate}
    ).

-spec set_status(
    Table :: wa_raft:table(),
    Partition :: wa_raft:partition(),
    State :: wa_raft_server:state(),
    Live :: boolean(),
    Lagging :: boolean(),
    Stale :: boolean()
) -> boolean().
set_status(Table, Partition, State, Live, Lagging, Stale) ->
    ets:update_element(
        ?MODULE,
        ?STATUS_KEY(Table, Partition),
        [{2, State}, {3, Live}, {4, Lagging}, {5, Stale}],
        {?STATUS_KEY(Table, Partition), State, Live, Lagging, Stale}
    ).

-spec set_message_queue_length(Name :: atom(), Length :: non_neg_integer()) -> boolean().
set_message_queue_length(Name, Length) ->
    ets:update_element(
        ?MODULE,
        ?MESSAGE_QUEUE_LENGTH_KEY(Name),
        {2, Length},
        {?MESSAGE_QUEUE_LENGTH_KEY(Name), Length}
    ).

-spec refresh_message_queue_length(Name :: atom()) -> boolean().
refresh_message_queue_length(Name) ->
    {message_queue_len, Length} = process_info(self(), message_queue_len),
    set_message_queue_length(Name, Length).

-spec clear(Table :: wa_raft:table(), Partition :: wa_raft:partition(), Name :: atom()) -> true.
clear(Table, Partition, Name) ->
    safe_delete(?STATUS_KEY(Table, Partition)),
    safe_delete(?MESSAGE_QUEUE_LENGTH_KEY(Name)),
    true.

-spec safe_delete(Key :: term()) -> true.
safe_delete(Key) ->
    case ets:whereis(?MODULE) of
        undefined ->
            true;
        Table ->
            try
                ets:delete(Table, Key),
                true
            catch
                error:badarg ->
                    true
            end
    end.