%%% src/wa_raft_queue.erl

%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.
%%%
%%% This module implements tracking of pending requests and queue limits.

-module(wa_raft_queue).
-compile(warn_missing_spec_all).
-behaviour(gen_server).

%% PUBLIC API
-export([
    queues/1,
    queues/2,
    commit_queue_size/2,
    commit_queue_size/3,
    commit_queue_full/2,
    commit_queue_full/3,
    apply_queue_size/1,
    apply_queue_size/2,
    apply_queue_byte_size/1,
    apply_queue_byte_size/2,
    apply_queue_full/1,
    apply_queue_full/2
]).

%% INTERNAL API
-export([
    default_name/2,
    default_counters/0,
    default_read_queue_name/2,
    registered_name/2
]).

%% PENDING COMMIT QUEUE API
-export([
    commit_started/2,
    commit_cancelled/4,
    commit_completed/4
]).

%% PENDING READ API
-export([
    reserve_read/1,
    submit_read/4,
    query_reads/2,
    fulfill_read/3,
    fulfill_incomplete_read/3,
    fulfill_all_reads/2
]).

%% APPLY QUEUE API
-export([
    reserve_apply/2,
    fulfill_apply/2
]).

%% OTP SUPERVISION
-export([
    child_spec/1,
    start_link/1
]).

%% QUEUE SERVER CALLBACKS
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    terminate/2
]).

%% TYPES
-export_type([
    queues/0
]).

-include_lib("stdlib/include/ms_transform.hrl"). % used by ets:fun2ms
-include_lib("wa_raft/include/wa_raft.hrl").
-include_lib("wa_raft/include/wa_raft_logger.hrl").

%%-------------------------------------------------------------------

%% ETS table creation options shared by all queue tables. Tables are named
%% and public, with both read and write concurrency enabled; the read queue
%% table created in init/1 is accessed through the pending read API, which
%% operates on the table by name.
-define(RAFT_QUEUE_TABLE_OPTIONS, [named_table, public, {read_concurrency, true}, {write_concurrency, true}]).

%% Number of slots in the atomics array created by default_counters/0 and
%% reset by init/1. Must be at least as large as the highest counter index
%% defined below.
-define(RAFT_NUMBER_OF_QUEUE_SIZE_COUNTERS, 5).
%% Index into counter reference for counter tracking apply queue size
%% (number of pending applies)
-define(RAFT_APPLY_QUEUE_SIZE_COUNTER, 1).
%% Index into counter reference for counter tracking apply total byte size
-define(RAFT_APPLY_QUEUE_BYTE_SIZE_COUNTER, 2).
%% Index into counter reference for counter tracking high priority commit queue size
-define(RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER, 3).
%% Index into counter reference for counter tracking low priority commit queue size
-define(RAFT_LOW_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER, 4).
%% Index into counter reference for counter tracking read queue size
%% (reads reserved via reserve_read/1 and not yet fulfilled)
-define(RAFT_READ_QUEUE_SIZE_COUNTER, 5).

%%-------------------------------------------------------------------
%% INTERNAL TYPES
%%-------------------------------------------------------------------

%% State of the queue server process.
-record(state, {
    % Name of this queue server (taken from the queue_name option in init/1);
    % only used to tag log messages
    name :: atom()
}).

%% Handle used by callers to inspect and update the queues of one RAFT
%% partition without going through the queue server process.
-record(queues, {
    % Application passed to the ?RAFT_MAX_PENDING_* limit macros
    application :: atom(),
    % RAFT table; passed to the limit macros and to the metrics macros
    table :: wa_raft:table(),
    % Atomics array holding the queue size counters, indexed by the
    % ?RAFT_*_COUNTER macros
    counters :: atomics:atomics_ref(),
    % Name of the ETS table that stores pending reads
    reads :: atom()
}).
-opaque queues() :: #queues{}.

%%-------------------------------------------------------------------
%% PUBLIC API
%%-------------------------------------------------------------------

%% Build the queue handle for a RAFT partition from its options record by
%% copying over the application, table, counters and read queue table name.
-spec queues(Options :: #raft_options{}) -> Queues :: queues().
queues(Options) ->
    App = Options#raft_options.application,
    Table = Options#raft_options.table,
    Counters = Options#raft_options.queue_counters,
    Reads = Options#raft_options.queue_reads,
    #queues{application = App, table = Table, counters = Counters, reads = Reads}.

%% Look up the options registered for the given table and partition and build
%% the matching queue handle. Returns 'undefined' when the options lookup
%% returns 'undefined'.
-spec queues(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Queues :: queues() | undefined.
queues(Table, Partition) ->
    Options = wa_raft_part_sup:options(Table, Partition),
    case Options =:= undefined of
        true -> undefined;
        false -> queues(Options)
    end.

%% Current value of the pending commit counter for the given priority.
-spec commit_queue_size(Queues :: queues(), Priority :: wa_raft_acceptor:priority()) -> non_neg_integer().
commit_queue_size(#queues{counters = Ref}, high) ->
    Index = ?RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER,
    atomics:get(Ref, Index);
commit_queue_size(#queues{counters = Ref}, low) ->
    Index = ?RAFT_LOW_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER,
    atomics:get(Ref, Index).

%% Pending commit count of the given priority for a table/partition pair.
%% Reports 0 when no queue handle can be resolved for the partition.
-spec commit_queue_size(Table :: wa_raft:table(), Partition :: wa_raft:partition(), Priority :: wa_raft_acceptor:priority()) -> non_neg_integer().
commit_queue_size(Table, Partition, Priority) ->
    case queues(Table, Partition) of
        #queues{} = Queues -> commit_queue_size(Queues, Priority);
        undefined -> 0
    end.

%% Whether the pending commit counter for the given priority has reached the
%% limit for that priority.
-spec commit_queue_full(Queues :: queues(), Priority :: wa_raft_acceptor:priority()) -> boolean().
commit_queue_full(#queues{application = App, table = Table, counters = Ref}, high) ->
    Pending = atomics:get(Ref, ?RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER),
    Pending >= ?RAFT_MAX_PENDING_HIGH_PRIORITY_COMMITS(App, Table);
commit_queue_full(#queues{application = App, table = Table, counters = Ref}, low) ->
    Pending = atomics:get(Ref, ?RAFT_LOW_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER),
    Pending >= ?RAFT_MAX_PENDING_LOW_PRIORITY_COMMITS(App, Table).

%% Whether the commit queue of the given priority is full for a
%% table/partition pair. Reports 'false' when no queue handle can be resolved.
-spec commit_queue_full(Table :: wa_raft:table(), Partition :: wa_raft:partition(), Priority :: wa_raft_acceptor:priority()) -> boolean().
commit_queue_full(Table, Partition, Priority) ->
    case queues(Table, Partition) of
        #queues{} = Queues -> commit_queue_full(Queues, Priority);
        undefined -> false
    end.

%% Current value of the apply queue size counter.
-spec apply_queue_size(Queues :: queues()) -> non_neg_integer().
apply_queue_size(#queues{counters = Ref}) ->
    Index = ?RAFT_APPLY_QUEUE_SIZE_COUNTER,
    atomics:get(Ref, Index).

%% Apply queue size for a table/partition pair. Reports 0 when no queue
%% handle can be resolved for the partition.
-spec apply_queue_size(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> non_neg_integer().
apply_queue_size(Table, Partition) ->
    case queues(Table, Partition) of
        #queues{} = Queues -> apply_queue_size(Queues);
        undefined -> 0
    end.

%% Current value of the apply queue total byte size counter.
-spec apply_queue_byte_size(Queues :: queues()) -> non_neg_integer().
apply_queue_byte_size(#queues{counters = Ref}) ->
    Index = ?RAFT_APPLY_QUEUE_BYTE_SIZE_COUNTER,
    atomics:get(Ref, Index).

%% Apply queue byte size for a table/partition pair. Reports 0 when no queue
%% handle can be resolved for the partition.
-spec apply_queue_byte_size(wa_raft:table(), wa_raft:partition()) -> non_neg_integer().
apply_queue_byte_size(Table, Partition) ->
    case queues(Table, Partition) of
        #queues{} = Queues -> apply_queue_byte_size(Queues);
        undefined -> 0
    end.

%% Whether the apply queue is full: either the number of pending applies or
%% their total byte size has reached its limit. The byte size is only
%% inspected when the count limit has not been reached.
-spec apply_queue_full(Queues :: queues()) -> boolean().
apply_queue_full(#queues{application = App, table = Table, counters = Ref}) ->
    case atomics:get(Ref, ?RAFT_APPLY_QUEUE_SIZE_COUNTER) >= ?RAFT_MAX_PENDING_APPLIES(App, Table) of
        true ->
            true;
        false ->
            atomics:get(Ref, ?RAFT_APPLY_QUEUE_BYTE_SIZE_COUNTER) >= ?RAFT_MAX_PENDING_APPLY_BYTES(App, Table)
    end.

%% Whether the apply queue is full for a table/partition pair. Reports
%% 'false' when no queue handle can be resolved for the partition.
-spec apply_queue_full(wa_raft:table(), wa_raft:partition()) -> boolean().
apply_queue_full(Table, Partition) ->
    case queues(Table, Partition) of
        #queues{} = Queues -> apply_queue_full(Queues);
        undefined -> false
    end.

%%-------------------------------------------------------------------
%% INTERNAL API
%%-------------------------------------------------------------------

%% Default name of the RAFT queue server for a partition, of the form
%% raft_queue_<Table>_<Partition>.
-spec default_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
default_name(Table, Partition) ->
    TableBin = atom_to_binary(Table),
    PartitionBin = integer_to_binary(Partition),
    % elp:ignore W0023 bounded atom, one per table/partition at startup
    binary_to_atom(<<"raft_queue_", TableBin/binary, "_", PartitionBin/binary>>).

%% Allocate a fresh atomics array with one slot per queue size counter,
%% using the default atomics options.
-spec default_counters() -> Counters :: atomics:atomics_ref().
default_counters() ->
    Arity = ?RAFT_NUMBER_OF_QUEUE_SIZE_COUNTERS,
    Options = [],
    atomics:new(Arity, Options).

%% Default name of the read queue ETS table for a partition, of the form
%% raft_read_queue_<Table>_<Partition>.
-spec default_read_queue_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
default_read_queue_name(Table, Partition) ->
    TableBin = atom_to_binary(Table),
    PartitionBin = integer_to_binary(Partition),
    % elp:ignore W0023 bounded atom, one per table/partition at startup
    binary_to_atom(<<"raft_read_queue_", TableBin/binary, "_", PartitionBin/binary>>).

%% Name of the RAFT queue server for a partition: the queue_name stored in
%% the partition's options when options are available, otherwise the
%% default name.
-spec registered_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
registered_name(Table, Partition) ->
    Options = wa_raft_part_sup:options(Table, Partition),
    case Options =:= undefined of
        true -> default_name(Table, Partition);
        false -> Options#raft_options.queue_name
    end.

%%-------------------------------------------------------------------
%% PENDING COMMIT QUEUE API
%%-------------------------------------------------------------------

%% Try to admit a new commit of the given priority.
%%  - 'commit_queue_full' when the commit queue for the priority is at its
%%    limit (also bumps the matching error counter metric).
%%  - 'apply_queue_full' when the apply queue is at its limit.
%%  - 'ok' otherwise, after incrementing the pending commit counter for the
%%    priority and reporting the new size as a metric.
%% The limit checks and the increment are separate atomics operations.
-spec commit_started(Queues :: queues(), Priority :: wa_raft_acceptor:priority()) -> ok | apply_queue_full | commit_queue_full.
commit_started(#queues{table = Table, counters = Counters} = Queues, Priority) ->
    case commit_queue_full(Queues, Priority) of
        false ->
            case apply_queue_full(Queues) of
                false ->
                    % commit_queue_full/2 only has clauses for 'high' and 'low',
                    % so Priority is one of the two by the time we get here.
                    Index =
                        case Priority of
                            high -> ?RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER;
                            low -> ?RAFT_LOW_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER
                        end,
                    PendingCommits = atomics:add_get(Counters, Index, 1),
                    ?RAFT_GATHER(Table, {'acceptor.commit.request.pending', Priority}, PendingCommits),
                    ok;
                true ->
                    apply_queue_full
            end;
        true ->
            ?RAFT_COUNT(Table, {'acceptor.error.commit_queue_full', Priority}),
            commit_queue_full
    end.


%% Release the pending commit slot held for a commit of the given priority
%% that will not complete. When Reason is not 'undefined' it is sent to the
%% caller as the reply; with 'undefined' no reply is sent.
-spec commit_cancelled(Queues :: queues(), From :: gen_server:from(), Reason :: wa_raft_acceptor:commit_error() | undefined, Priority :: wa_raft_acceptor:priority()) -> ok.
commit_cancelled(#queues{counters = Counters}, From, Reason, Priority) ->
    Index =
        case Priority of
            high -> ?RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER;
            low -> ?RAFT_LOW_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER
        end,
    atomics:sub(Counters, Index, 1),
    case Reason of
        undefined ->
            ok;
        _ ->
            gen_server:reply(From, Reason),
            ok
    end.

%% Release the pending commit slot held for a commit of the given priority
%% and send Reply to the caller.
-spec commit_completed(Queues :: queues(), From :: gen_server:from(), Reply :: term(), Priority :: wa_raft_acceptor:priority()) -> ok.
commit_completed(#queues{counters = Counters}, From, Reply, Priority) ->
    Index =
        case Priority of
            high -> ?RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER;
            low -> ?RAFT_LOW_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER
        end,
    atomics:sub(Counters, Index, 1),
    gen_server:reply(From, Reply),
    ok.

%%-------------------------------------------------------------------
%% PENDING READ QUEUE API
%%-------------------------------------------------------------------

% Inspects the read and apply queues to check if a strong read is allowed
% to be submitted to the RAFT server currently. If so, then returns 'ok'
% and increments the read counter. Inspecting the queues and actually
% adding the read request to the table are done in two stages for reads
% because the acceptor does not have enough information to add the read
% to the ETS table directly.
%
% Returns 'read_queue_full' when the pending read counter has reached its
% limit and 'apply_queue_full' when the number of pending applies has reached
% its limit (only the apply count is inspected here, not the byte size).
%
% The limit checks and the increment are separate atomics operations, so
% concurrent callers may briefly push the counter past the limit.
-spec reserve_read(Queues :: queues()) -> ok | read_queue_full | apply_queue_full.
reserve_read(#queues{application = App, table = Table, counters = Counters}) ->
    case atomics:get(Counters, ?RAFT_READ_QUEUE_SIZE_COUNTER) >= ?RAFT_MAX_PENDING_READS(App, Table) of
        true ->
            read_queue_full;
        false ->
            case atomics:get(Counters, ?RAFT_APPLY_QUEUE_SIZE_COUNTER) >= ?RAFT_MAX_PENDING_APPLIES(App, Table) of
                true ->
                    apply_queue_full;
                false ->
                    % Report the value produced by the increment itself rather
                    % than a value read before it, so that the metric reflects
                    % the actual queue size even when other processes reserve
                    % or fulfill reads concurrently.
                    PendingReads = atomics:add_get(Counters, ?RAFT_READ_QUEUE_SIZE_COUNTER, 1),
                    ?RAFT_GATHER(Table, 'acceptor.strong_read.request.pending', PendingReads),
                    ok
            end
    end.

% Store a read request in the reads table once its ReadIndex is known, so
% that it can be handled upon applying. Each entry is keyed by the read
% index paired with a fresh reference, which keeps several reads at the
% same index distinct.
-spec submit_read(Queues :: queues(), wa_raft_log:log_index(), From :: gen_server:from(), Command :: wa_raft_acceptor:command()) -> ok.
submit_read(#queues{reads = Reads}, ReadIndex, From, Command) ->
    Key = {ReadIndex, make_ref()},
    true = ets:insert(Reads, {Key, From, Command}),
    ok.

% List the key and command of every pending read whose read index is at
% most MaxLogIndex. The caller to reply to is not included in the result.
-spec query_reads(Queues :: queues(), wa_raft_log:log_index() | infinity) -> [{{wa_raft_log:log_index(), reference()}, wa_raft_acceptor:command()}].
query_reads(#queues{reads = Reads}, MaxLogIndex) ->
    UpToIndex = ets:fun2ms(
        fun({{Index, Ref}, _, Cmd}) when Index =< MaxLogIndex ->
            {{Index, Ref}, Cmd}
        end
    ),
    ets:select(Reads, UpToIndex).

% Remove the pending read stored under Reference, release its slot in the
% read counter and send Reply to its caller. Returns 'not_found' when no
% read is stored under Reference.
-spec fulfill_read(Queues :: queues(), {wa_raft_log:log_index(), reference()}, dynamic()) -> ok | not_found.
fulfill_read(#queues{counters = Counters, reads = Reads}, Reference, Reply) ->
    case ets:take(Reads, Reference) of
        [] ->
            not_found;
        [{Reference, Caller, _}] ->
            ok = atomics:sub(Counters, ?RAFT_READ_QUEUE_SIZE_COUNTER, 1),
            gen_server:reply(Caller, Reply)
    end.

% Finish a read that was reserved through the read counter but never made
% it into the reads table (so there is no reference to take): release the
% reserved slot and send Reply straight to the caller.
-spec fulfill_incomplete_read(Queues :: queues(), gen_server:from(), wa_raft_acceptor:read_error()) -> ok.
fulfill_incomplete_read(#queues{counters = Ref}, From, Reply) ->
    ok = atomics:sub(Ref, ?RAFT_READ_QUEUE_SIZE_COUNTER, 1),
    gen_server:reply(From, Reply).

% Fulfill all pending reads with an error that indicates that the read was
% not completed. Every read removed from the reads table releases its slot
% in the read counter and has Reply sent to its caller.
-spec fulfill_all_reads(Queues :: queues(), wa_raft_acceptor:read_error()) -> ok.
fulfill_all_reads(#queues{counters = Counters, reads = Reads}, Reply) ->
    % Only the keys are selected here: ets:take/2 below returns the full
    % entry anyway, so there is no need to copy every caller and command
    % out of the table up front.
    Keys = ets:select(Reads, [{{'$1', '_', '_'}, [], ['$1']}]),
    lists:foreach(
        fun(Key) ->
            case ets:take(Reads, Key) of
                [{Key, From, _}] ->
                    atomics:sub(Counters, ?RAFT_READ_QUEUE_SIZE_COUNTER, 1),
                    gen_server:reply(From, Reply);
                [] ->
                    % Already taken by someone else since the keys were
                    % listed; nothing left to release or reply to.
                    ok
            end
        end,
        Keys
    ).

%%-------------------------------------------------------------------
%% APPLY QUEUE API
%%-------------------------------------------------------------------

% Account for one more pending apply of Size bytes: bump the apply count
% by one and the apply byte size by Size.
-spec reserve_apply(Queues :: queues(), non_neg_integer()) -> ok.
reserve_apply(#queues{counters = Ref}, Size) ->
    ok = atomics:add(Ref, ?RAFT_APPLY_QUEUE_SIZE_COUNTER, 1),
    ok = atomics:add(Ref, ?RAFT_APPLY_QUEUE_BYTE_SIZE_COUNTER, Size).

% Account for one pending apply of Size bytes being done: lower the apply
% count by one and the apply byte size by Size.
-spec fulfill_apply(Queues :: queues(), non_neg_integer()) -> ok.
fulfill_apply(#queues{counters = Ref}, Size) ->
    ok = atomics:sub(Ref, ?RAFT_APPLY_QUEUE_SIZE_COUNTER, 1),
    ok = atomics:sub(Ref, ?RAFT_APPLY_QUEUE_BYTE_SIZE_COUNTER, Size).

%%-------------------------------------------------------------------
%% OTP SUPERVISION
%%-------------------------------------------------------------------

%% Supervisor child specification for the queue server: a transient child
%% identified by this module, started through start_link/1 with the given
%% options and given 1000 ms to shut down.
-spec child_spec(Options :: #raft_options{}) -> supervisor:child_spec().
child_spec(Options) ->
    Start = {?MODULE, start_link, [Options]},
    #{
        id => ?MODULE,
        modules => [?MODULE],
        restart => transient,
        shutdown => 1000,
        start => Start
    }.

%% Start the queue server linked to the caller and registered locally under
%% the queue_name found in the options.
-spec start_link(Options :: #raft_options{}) -> gen_server:start_ret().
start_link(#raft_options{queue_name = QueueName} = Options) ->
    ServerName = {local, QueueName},
    gen_server:start_link(ServerName, ?MODULE, Options, []).

%%-------------------------------------------------------------------
%% QUEUE SERVER CALLBACKS
%%-------------------------------------------------------------------

%% Initialise the queue server: trap exits, zero every queue size counter
%% and create the ETS table that holds pending reads.
-spec init(Options :: #raft_options{}) -> {ok, #state{}}.
init(
    #raft_options{
        table = Table,
        partition = Partition,
        queue_name = Name,
        queue_counters = Counters,
        queue_reads = ReadsName
    }
) ->
    process_flag(trap_exit, true),

    ?RAFT_LOG_NOTICE(
        "Queue[~p] starting for partition ~0p/~0p with read queue ~0p",
        [Name, Table, Partition, ReadsName]
    ),

    % This process starts first among the processes supervised for a RAFT
    % partition, and the supervisor restarts all of them whenever any one of
    % them fails. A starting queue process therefore means that nothing is
    % pending anymore, so every counter goes back to zero.
    lists:foreach(
        fun(Index) -> atomics:put(Counters, Index, 0) end,
        lists:seq(1, ?RAFT_NUMBER_OF_QUEUE_SIZE_COUNTERS)
    ),

    % Pending reads are kept in an ordered set ETS table.
    ReadsOptions = [ordered_set | ?RAFT_QUEUE_TABLE_OPTIONS],
    ets:new(ReadsName, ReadsOptions),

    {ok, #state{name = Name}}.

%% The queue server does not serve any calls: log the unexpected request and
%% carry on without sending a reply.
-spec handle_call(Request :: term(), From :: gen_server:from(), State :: #state{}) -> {noreply, #state{}}.
handle_call(Request, From, #state{name = Name} = State) ->
    LogArgs = [Name, Request, 100, From],
    ?RAFT_LOG_NOTICE("Queue[~p] got unexpected request ~0P from ~0p", LogArgs),
    {noreply, State}.

%% The queue server does not serve any casts: log the unexpected message and
%% carry on with the state unchanged.
-spec handle_cast(Request :: term(), State :: #state{}) -> {noreply, #state{}}.
handle_cast(Request, #state{name = Name} = State) ->
    % The message says "cast" so that it cannot be mistaken in the logs for
    % an unexpected gen_server call, which handle_call/3 reports.
    ?RAFT_LOG_NOTICE("Queue[~p] got unexpected cast ~0P", [Name, Request, 100]),
    {noreply, State}.

%% Log why the queue server is terminating; the result of the log macro is
%% the return value.
-spec terminate(Reason :: dynamic(), State :: #state{}) -> ok.
terminate(Reason, #state{name = Name}) ->
    LogArgs = [Name, Reason, 100],
    ?RAFT_LOG_NOTICE("Queue[~p] terminating due to ~0P", LogArgs).