Skip to main content

src/wa_raft_log.erl

%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.
%%%
%%% This module is the interface for raft log. It defines the callbacks
%%% required by the specific log implementations.

-module(wa_raft_log).
-compile(warn_missing_spec_all).
-behaviour(gen_server).

%% OTP supervision
-export([
    child_spec/1,
    start_link/1
]).

%% APIs for writing new log data
-export([
    append/2,
    try_append/2,
    try_append/3,
    check_heartbeat/3
]).

%% APIs for accessing log data
-export([
    first_index/1,
    last_index/1,

    fold/5,
    fold/6,
    fold_binary/5,
    fold_binary/6,
    fold_terms/5,

    term/2,
    get/2,
    get/3,
    get/4,
    get_terms/3,

    entries/3,
    entries/4,

    config/1
]).

%% APIs for managing logs and log data
-export([
    open/2,
    reset/2,
    truncate/2,
    trim/2,
    rotate/2, rotate/4,
    flush/1
]).

%% Internal API
-export([
    default_name/2,
    registered_name/2
]).

%% Internal API
-export([
    log/1,
    log_name/1,
    provider/1
]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    terminate/2
]).

-export_type([
    log/0,
    log_name/0,
    log_pos/0,
    log_op/0,
    log_index/0,
    log_term/0,
    log_entry/0,
    log_record/0,
    view/0
]).

-include_lib("wa_raft/include/wa_raft.hrl").
-include_lib("wa_raft/include/wa_raft_logger.hrl").

%% Atom indicating that the provider has not been opened yet.
-define(PROVIDER_NOT_OPENED, '$not_opened').

%% A view of a RAFT log that is backed by a particular log provider. The
%% view keeps track of its own logical first and last indices as well as a
%% cached configuration entry (the log index it was stored at together with
%% the configuration itself) so that the RAFT server is always able to access
%% a consistent view of the RAFT log given simple RAFT log provider
%% implementations.
-record(log_view, {
    % The RAFT log backing this view.
    log :: log(),
    % Logical first log index visible through this view.
    first = 0 :: log_index(),
    % Logical last log index visible through this view.
    last = 0 :: log_index(),
    % Cached {Index, Config} returned by config/1 for this view, or
    % 'undefined' when no configuration is cached.
    config :: undefined | {log_index(), wa_raft_server:config()}
}).

%% The state stored by the RAFT log server which is
%% responsible for synchronizing destructive operations
%% on the RAFT log with operations that are performed
%% asynchronously to the RAFT server.
-record(log_state, {
    % The RAFT log managed by this server.
    log :: log(),
    % Opaque provider state; holds ?PROVIDER_NOT_OPENED until the
    % provider has been opened.
    state = ?PROVIDER_NOT_OPENED :: term()
}).

%% A RAFT log, described by its #raft_log{} record.
-type log() :: #raft_log{}.
%% Name under which a RAFT log server is locally registered.
-type log_name() :: atom().
%% Index of an entry within a RAFT log.
-type log_index() :: non_neg_integer().
%% RAFT term in which a log entry was created.
-type log_term() :: non_neg_integer().
%% Position (index and term) within a RAFT log.
-type log_pos() :: #raft_log_pos{}.
%% Operation carried by a log entry. 'undefined' is used for entries that
%% carry no op (e.g. the initial entry created when a log is reset).
-type log_op() ::
    undefined
    | {wa_raft_acceptor:key(), wa_raft_acceptor:command()}
    | {wa_raft_acceptor:key(), wa_raft_label:label(), wa_raft_acceptor:command()}.
%% A log entry: the term it was created in and its operation.
-type log_entry() :: {log_term(), log_op()}.
%% A log entry paired with its log index.
-type log_record() :: {log_index(), log_entry()}.

%% A view of a RAFT log.
-opaque view() :: #log_view{}.

%%-------------------------------------------------------------------
%% RAFT log provider interface for accessing log data
%%-------------------------------------------------------------------

%% Gets the first index of the RAFT log. If there are no log entries,
%% then return 'undefined'.
-callback first_index(Log :: log()) -> undefined | FirstIndex :: log_index() | {error, Reason :: term()}.

%% Gets the last index of the RAFT log. If there are no log entries,
%% then return 'undefined'.
-callback last_index(Log :: log()) -> undefined | LastIndex :: log_index() | {error, Reason :: term()}.

%% Call the provided combining function on successive log entries in the
%% specified log starting from the specified start index (inclusive) and ending
%% at the specified end index (also inclusive) or until the total cumulative
%% size of log entries for which the combining function has been called upon
%% is at least the specified size limit. The combining function must always be
%% called with log entries in log order starting with the first log entry that
%% exists within the provided range. The combining function should not be
%% called for those log indices within the provided range that do not have a
%% stored log entry. It is suggested that the external term size of the entire
%% log entry be used as the size provided to the combining function and for
%% tracking the size limit; however, implementations are free to use any value.
%%
%% The combining function may raise an error. Implementations should take care
%% to release any resources held in the case that the fold needs to terminate
%% early.
-callback fold(Log :: log(),
               Start :: log_index(),
               End :: log_index(),
               SizeLimit :: non_neg_integer() | infinity,
               Func :: fun((Index :: log_index(), Size :: non_neg_integer(), Entry :: log_entry(), Acc) -> Acc),
               Acc) -> {ok, Acc} | {error, Reason :: term()}.

%% Call the provided combining function on the external term format of
%% successive log entries in the specified log starting from the specified
%% start index (inclusive) and ending at the specified end index (also
%% inclusive) or until the total cumulative size of log entries for which the
%% combining function has been called upon is at least the specified size
%% limit. The combining function must always be called with log entries in log
%% order starting with the first log entry that exists within the provided
%% range. The combining function should not be called for those log indices
%% within the provided range that do not have a stored log entry. The byte
%% size of the binary provided to the combining function must be used as the
%% size of the entry for tracking of the size limit.
%%
%% The combining function may raise an error. Implementations should take care
%% to release any resources held in the case that the fold needs to terminate
%% early.
-callback fold_binary(
    Log :: log(),
    Start :: log_index(),
    End :: log_index(),
    SizeLimit :: non_neg_integer() | infinity,
    Func :: fun((Index :: log_index(), Entry :: binary(), Acc) -> Acc),
    Acc
) -> {ok, Acc} | {error, Reason :: term()}.
-optional_callbacks([fold_binary/6]).

%% Call the provided combining function on the log term of successive log
%% entries in the specified log starting from the specified start index
%% (inclusive) and ending at the specified end index (also inclusive). The
%% combining function must always be called with log entries in log order
%% starting with the first log entry that exists within the provided range. The
%% combining function should not be called for those log indices within the
%% provided range that do not have a stored log entry.
%%
%% The combining function may raise an error. Implementations should take care
%% to release any resources held in the case that the fold needs to terminate
%% early.
-callback fold_terms(Log :: log(),
                     Start :: log_index(),
                     End :: log_index(),
                     Func :: fun((Index :: log_index(), Term :: log_term(), Acc) -> Acc),
                     Acc) -> {ok, Acc} | {error, Reason :: term()}.

%% Get a single log entry at the specified index. This API is specified
%% separately because some implementations may have more efficient ways to
%% get log entries when only one log entry is required. If the log entry
%% does not exist, then return 'not_found'.
-callback get(Log :: log(), Index :: log_index()) -> {ok, Entry :: log_entry()} | not_found | {error, Reason :: term()}.

%% Get only the term of a specific log entry. This API is specified
%% separately because some implementations may have more efficient ways to
%% get just the term of a particular log entry. If the log entry does not
%% exist, then return 'not_found'.
-callback term(Log :: log(), Index :: log_index()) -> {ok, Term :: log_term()} | not_found | {error, Reason :: term()}.

%% Get the most recent configuration stored in the log. Log providers
%% should ensure that configuration operations are indexed so that this
%% call does not require a scan of the log.
-callback config(Log :: log()) -> {ok, Index :: log_index(), Config :: wa_raft_server:config()} | not_found | {error, Reason :: term()}.

%%-------------------------------------------------------------------
%% RAFT log provider interface for writing new log data
%%-------------------------------------------------------------------

%% Append new log entries to the end of the RAFT log.
%%  - This function should never overwrite existing log entries.
%%  - If the new log entries were written successfully, return 'ok'.
%%  - Log entries may be provided either as terms directly or as a
%%    binary encoding a log entry in external term format.
%%  - In 'strict' mode, the append should always succeed or return an
%%    error on failure.
%%  - In 'relaxed' mode, if there are transient conditions that would
%%    make it difficult to append to the log without blocking, then
%%    the append should be skipped and 'skipped' returned. Otherwise,
%%    the same conditions as the 'strict' mode apply.
-callback append(View :: view(), Entries :: [log_entry() | binary()], Mode :: strict | relaxed, Priority :: wa_raft_acceptor:priority()) ->
    ok | skipped | {error, Reason :: term()}.

%%-------------------------------------------------------------------
%% RAFT log provider interface for managing underlying RAFT log
%%-------------------------------------------------------------------

%% Perform any first time setup operations before opening the RAFT log.
%% This function is called from the RAFT log server and is only called
%% once per incarnation of a RAFT partition.
%% If this setup fails such that the log is not usable, implementations
%% should raise an error or exit to interrupt the startup process.
-callback init(Log :: wa_raft_log:log()) -> ok.

%% Open the RAFT log and return a state term that will be provided to
%% subsequent calls made from the RAFT log server. During the opening
%% process, the log will be inspected to see if it contains a record
%% corresponding to the last applied index of the storage backing this
%% RAFT partition and whether or not the term of this entry matches
%% that reported by the storage. If so, then opening proceeds normally.
%% If there is a mismatch, then the log will be reinitialized using
%% `reset/3`.
%% If this setup fails such that the log is not usable, implementations
%% should raise an error or exit to interrupt the opening process.
-callback open(Log :: wa_raft_log:log()) -> {ok, State :: term()} | {error, Reason :: term()}.

%% Close the RAFT log and release any resources used by it. This
%% is called when the RAFT log server is terminating.
-callback close(Log :: log(), State :: term()) -> term().

%% Completely clear the RAFT log and setup a new log with an initial entry
%% at the provided index with the provided term and an undefined op.
-callback reset(Log :: log(), Position :: log_pos(), State :: term()) -> {ok, NewState :: term()} | {error, Reason :: term()}.

%% Truncate the RAFT log at the given index so that all log entries
%% at and after the provided index are completely deleted from the
%% RAFT log. If the truncation failed but the log state was not
%% changed, then an error can be returned. Otherwise, an error should
%% be raised.
-callback truncate(Log :: log(), Index :: log_index(), State :: term()) -> {ok, NewState :: term()} | {error, Reason :: term()}.

%% Optionally, trim the RAFT log up to the given index.
%%  - This means that all log entries before the given index can be
%%    deleted (both term and op information can be removed) and the
%%    log entry at the given index can have its op removed (keeping
%%    only the term information).
%%  - Implementations are not required to always trim the log to exactly
%%    the provided index but must not trim past the provided index and
%%    must always ensure that if the log were to be reloaded from disk
%%    at any time that the log always remains contiguous, meaning that
%%    only the first entry in the log can be missing op information and
%%    that the indices of all log entries in the log are contiguous.
-callback trim(Log :: log(), Index :: log_index(), State :: term()) -> {ok, NewState :: term()} | {error, Reason :: term()}.

%% Flush log to disk on a best-effort basis. The return value is
%% ignored.
-callback flush(Log :: log()) -> term().

%%-------------------------------------------------------------------
%% OTP supervision
%%-------------------------------------------------------------------

%% Build the supervisor child specification for the RAFT log server described
%% by the provided options. The server is permanent (always restarted) and is
%% given 30000 ms to shut down.
-spec child_spec(Options :: #raft_options{}) -> supervisor:child_spec().
child_spec(Options) ->
    StartMFA = {?MODULE, start_link, [Options]},
    #{
        id => ?MODULE,
        start => StartMFA,
        restart => permanent,
        shutdown => 30000,
        modules => [?MODULE]
    }.

%% Start the RAFT log server linked to the caller. The server is registered
%% locally under the log name taken from the provided options so that it can
%% be addressed by name (e.g. by open/2).
-spec start_link(Options :: #raft_options{}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link(#raft_options{log_name = LogName} = RaftOptions) ->
    ServerName = {local, LogName},
    gen_server:start_link(ServerName, ?MODULE, RaftOptions, []).

%%-------------------------------------------------------------------
%% APIs for writing new log data
%%-------------------------------------------------------------------

%% Append new log entries to the end of the log. This is a 'strict',
%% high-priority append: it either succeeds or returns an error and never
%% reports 'skipped'.
-spec append(View :: view(), Entries :: [log_entry() | binary()]) ->
    {ok, NewView :: view()} | {error, Reason :: term()}.
append(LogView, NewEntries) ->
    % eqwalizer:ignore - strict append cannot return skipped
    append(LogView, NewEntries, strict, high).

%% Attempt a high-priority append of new log entries to the end of the log.
%% If the append cannot be serviced immediately, 'skipped' is returned
%% instead.
-spec try_append(View :: view(), Entries :: [log_entry() | binary()]) ->
    {ok, NewView :: view()} | skipped | {error, Reason :: term()}.
try_append(LogView, NewEntries) ->
    try_append(LogView, NewEntries, high).

%% Attempt to append new log entries at the given priority using the
%% provider's 'relaxed' mode, in which the provider may decline the append
%% and return 'skipped' rather than blocking.
-spec try_append(View :: view(), Entries :: [log_entry() | binary()], Priority :: wa_raft_acceptor:priority()) ->
    {ok, NewView :: view()} | skipped | {error, Reason :: term()}.
try_append(LogView, NewEntries, Prio) ->
    append(LogView, NewEntries, relaxed, Prio).

%% Append new log entries to the end of the log with the provided mode and
%% priority. On success, the returned view has its last index advanced by the
%% number of appended entries and its cached configuration refreshed. A
%% 'skipped' result from the provider is only accepted in 'relaxed' mode; in
%% 'strict' mode it raises a case_clause error.
-spec append(View :: view(), Entries :: [log_entry() | binary()], Mode :: strict | relaxed, Priority :: wa_raft_acceptor:priority()) ->
    {ok, NewView :: view()} | skipped | {error, Reason :: term()}.
append(#log_view{log = #raft_log{table = Table}, last = OldLast} = View, Entries, Mode, Priority) ->
    ?RAFT_COUNT(Table, 'log.append'),
    Mod = provider(View),
    case Mod:append(View, Entries, Mode, Priority) of
        {error, _} = Error ->
            ?RAFT_COUNT(Table, 'log.append.error'),
            Error;
        skipped when Mode =:= relaxed ->
            ?RAFT_COUNT(Table, 'log.append.skipped'),
            skipped;
        ok ->
            ?RAFT_COUNT(Table, 'log.append.ok'),
            NewLast = OldLast + length(Entries),
            {ok, refresh_config(View#log_view{last = NewLast})}
    end.

%% Compare the provided heartbeat log entries to the local log at the provided
%% starting position in preparation for an append operation:
%%  * If the provided starting position is not positive, is before the start
%%    of the log view, or is past the end of the log view, the comparison
%%    fails and `{invalid, out_of_range}` is returned.
%%  * If there is a conflict between the provided heartbeat log entries and any
%%    local log entries due to a term mismatch, then the comparison will fail
%%    with a `conflict` tuple that contains the log index of the first log
%%    entry with a conflicting term and the list containing the corresponding
%%    heartbeat log entry and all subsequent heartbeat log entries.
%%  * If the provider reports a local log entry at an index other than the
%%    next expected one, `{error, {missing, Index}}` is returned. If a
%%    heartbeat entry provided as a binary cannot be decoded,
%%    `{error, {bad_entry_term, Reason}}` is returned. Provider fold errors
%%    are returned as `{error, Reason}`.
%%  * Otherwise, the comparison will succeed. Any new log entries not already
%%    in the local log will be returned.
-spec check_heartbeat(View :: view(), Start :: log_index(), Entries :: [log_entry() | binary()]) ->
    {ok, NewEntries :: [log_entry() | binary()]} |
    {conflict, ConflictIndex :: log_index(), NewEntries :: [log_entry() | binary()]} |
    {invalid, out_of_range} |
    {error, term()}.
%% The starting position must be positive and within [First, Last] of the view.
check_heartbeat(#log_view{first = First, last = Last}, Start, _Entries) when Start =< 0; Start < First; Start > Last ->
    {invalid, out_of_range};
check_heartbeat(#log_view{log = #raft_log{table = Table} = Log, last = Last}, Start, Entries) ->
    Provider = provider(Log),
    % Index that the last heartbeat entry would occupy in the local log.
    End = Start + length(Entries) - 1,
    % Fold the local terms through the provider directly (not through the
    % view). check_heartbeat_terms/3 consumes one heartbeat entry per local
    % term and throws to abort the fold on a conflict, gap or decode failure.
    try Provider:fold_terms(Log, Start, End, fun check_heartbeat_terms/3, {Start, Entries}) of
        % The fold should not terminate early if the provider is well-behaved.
        % Every heartbeat entry matched an existing local entry.
        {ok, {Next, []}} when Next =:= End + 1 ->
            {ok, []};
        % The local log ended at Last; the remaining heartbeat entries are new.
        {ok, {Next, NewEntries}} when Next =:= Last + 1 ->
            {ok, NewEntries};
        {error, Reason} ->
            ?RAFT_COUNT(Table, 'log.heartbeat.error'),
            {error, Reason}
        % Any other fold result raises a try_clause error.
    catch
        throw:{conflict, ConflictIndex, ConflictEntries} ->
            {conflict, ConflictIndex, ConflictEntries};
        throw:{missing, Index} ->
            ?RAFT_COUNT(Table, 'log.heartbeat.corruption'),
            {error, {missing, Index}};
        throw:{bad_entry_term, Reason} ->
            ?RAFT_COUNT(Table, 'log.heartbeat.error'),
            {error, {bad_entry_term, Reason}}
    end.

%% Fold function used by check_heartbeat/3. The accumulator holds the next
%% expected log index and the heartbeat entries not yet compared. Each local
%% (Index, Term) pair consumes one heartbeat entry when both the index and the
%% term match. Heartbeat entries encoded as binaries are decoded on demand.
%% Throws {conflict, Index, Entries} on a term mismatch, {missing, Expected}
%% when the provider skips an index, and {bad_entry_term, Reason} when a
%% binary entry cannot be decoded.
-spec check_heartbeat_terms(Index :: wa_raft_log:log_index(), Term :: wa_raft_log:log_term(), Acc) -> Acc
    when Acc :: {Next :: wa_raft_log:log_index(), Entries :: [log_entry() | binary()]}.
check_heartbeat_terms(Index, Term, {Next, [Encoded | Rest]}) when is_binary(Encoded) ->
    Decoded =
        try
            binary_to_term(Encoded, [safe])
        catch
            _:DecodeError -> throw({bad_entry_term, DecodeError})
        end,
    check_heartbeat_terms(Index, Term, {Next, [Decoded | Rest]});
check_heartbeat_terms(Expected, LocalTerm, {Expected, [{LocalTerm, _Op} | Remaining]}) ->
    {Expected + 1, Remaining};
check_heartbeat_terms(Expected, _LocalTerm, {Expected, [_ | _] = Conflicting}) ->
    throw({conflict, Expected, Conflicting});
check_heartbeat_terms(_Index, _LocalTerm, {Expected, [_ | _]}) ->
    throw({missing, Expected}).

%%-------------------------------------------------------------------
%% APIs for accessing log data
%%-------------------------------------------------------------------

%% Gets the first index of the RAFT log. A log view answers with the logical
%% first index it tracks; a raw log is answered by its log provider.
%% NOTE(review): the provider callback contract allows 'undefined' (empty
%% log) or {error, Reason} for raw logs, which this spec does not list.
-spec first_index(LogOrView :: log() | view()) -> FirstIndex :: log_index().
first_index(#log_view{first = ViewFirst}) ->
    ViewFirst;
first_index(RaftLog) ->
    (provider(RaftLog)):first_index(RaftLog).

%% Gets the last index of the RAFT log. A log view answers with the logical
%% last index it tracks; a raw log is answered by its log provider.
%% NOTE(review): the provider callback contract allows 'undefined' (empty
%% log) or {error, Reason} for raw logs, which this spec does not list.
-spec last_index(LogOrView :: log() | view()) -> LastIndex :: log_index().
last_index(#log_view{last = ViewLast}) ->
    ViewLast;
last_index(RaftLog) ->
    (provider(RaftLog)):last_index(RaftLog).

%% Fold over the log entries between First and Last (both inclusive) without
%% any limit on the total size of the entries visited. Equivalent to fold/6
%% with a size limit of 'infinity'.
-spec fold(LogOrView :: log() | view(),
           First :: log_index(),
           Last :: log_index() | infinity,
           Func :: fun((Index :: log_index(), Entry :: log_entry(), Acc) -> Acc),
           Acc) -> {ok, Acc} | {error, term()}.
fold(LogOrView, From, To, Fun, Acc0) ->
    NoSizeLimit = infinity,
    fold(LogOrView, From, To, NoSizeLimit, Fun, Acc0).

%% Call the provided combining function on successive log entries in the
%% specified log starting from the specified start index (inclusive) and ending
%% at the specified end index (also inclusive). The combining function will
%% always be called with log entries in log order starting with the first log
%% entry that exists within the provided range. The combining function will
%% not be called for those log indices within the provided range that do not
%% have a stored log entry. The size provided to the combining function when
%% requested is determined by the underlying log provider.
%%
%% The requested range is clamped to the first and last indices of the log or
%% view before the provider is asked to fold. Returns {ok, AccOut} or the
%% provider's {error, Reason}.
-spec fold(
    LogOrView :: log() | view(),
    First :: log_index(),
    Last :: log_index() | infinity,
    SizeLimit :: non_neg_integer() | infinity,
    Func ::
        fun((Index :: log_index(), Entry :: log_entry(), Acc) -> Acc)
        | fun((Index :: log_index(), Size :: non_neg_integer(), Entry :: log_entry(), Acc) -> Acc),
    Acc
) -> {ok, Acc} | {error, Reason :: term()}.
fold(LogOrView, RawFirst, RawLast, SizeLimit, Func, Acc) ->
    #raft_log{table = Table} = Log = log(LogOrView),
    % 'infinity' compares greater than any integer in Erlang term order, so
    % min/2 picks the log's last index when no explicit end is requested.
    First = max(RawFirst, first_index(LogOrView)),
    Last = min(RawLast, last_index(LogOrView)),
    ?RAFT_COUNT(Table, 'log.fold'),
    % A request lying past the end of the log leaves First > Last; report
    % zero entries rather than a negative total.
    ?RAFT_COUNTV(Table, 'log.fold.total', max(0, Last - First + 1)),
    % Providers always receive a 4-ary combining function (with entry size).
    AdjFunc = if
        is_function(Func, 3) -> fun (Index, _Size, Entry, InnerAcc) -> Func(Index, Entry, InnerAcc) end;
        is_function(Func, 4) -> Func
    end,
    Provider = provider(Log),
    case Provider:fold(Log, First, Last, SizeLimit, AdjFunc, Acc) of
        {ok, AccOut} ->
            {ok, AccOut};
        {error, Reason} ->
            ?RAFT_COUNT(Table, 'log.fold.error'),
            {error, Reason}
    end.

%% Fold over the external term format of the log entries between First and
%% Last (both inclusive) without any limit on the total size of the entries
%% visited. Equivalent to fold_binary/6 with a size limit of 'infinity'.
-spec fold_binary(
    LogOrView :: log() | view(),
    First :: log_index(),
    Last :: log_index() | infinity,
    Func :: fun((Index :: log_index(), Entry :: binary(), Acc) -> Acc),
    Acc
) -> {ok, Acc} | {error, term()}.
fold_binary(LogOrView, From, To, Fun, Acc0) ->
    NoSizeLimit = infinity,
    fold_binary(LogOrView, From, To, NoSizeLimit, Fun, Acc0).

%% Call the provided combining function on the external term format of
%% successive log entries in the specified log starting from the specified
%% start index (inclusive) and ending at the specified end index (also
%% inclusive). The combining function will always be called with log entries
%% in log order starting with the first log entry that exists within the
%% provided range. The combining function will not be called for those log
%% indices within the provided range that do not have a stored log entry. The
%% byte size of each entry binary is used when applying the size limit.
%%
%% fold_binary/6 is an optional provider callback; calling this against a
%% provider that does not export it fails with an 'undef' error, so callers
%% should check for it first (as entries/4 does).
-spec fold_binary(
    LogOrView :: log() | view(),
    First :: log_index(),
    Last :: log_index() | infinity,
    SizeLimit :: non_neg_integer() | infinity,
    Func :: fun((Index :: log_index(), Entry :: binary(), Acc) -> Acc),
    Acc
) -> {ok, Acc} | {error, term()}.
fold_binary(LogOrView, RawFirst, RawLast, SizeLimit, Func, Acc) ->
    #raft_log{table = Table} = Log = log(LogOrView),
    % 'infinity' compares greater than any integer in Erlang term order, so
    % min/2 picks the log's last index when no explicit end is requested.
    First = max(RawFirst, first_index(LogOrView)),
    Last = min(RawLast, last_index(LogOrView)),
    ?RAFT_COUNT(Table, 'log.fold_binary'),
    % A request lying past the end of the log leaves First > Last; report
    % zero entries rather than a negative total.
    ?RAFT_COUNTV(Table, 'log.fold_binary.total', max(0, Last - First + 1)),
    Provider = provider(Log),
    case Provider:fold_binary(Log, First, Last, SizeLimit, Func, Acc) of
        {ok, AccOut} ->
            {ok, AccOut};
        {error, Reason} ->
            ?RAFT_COUNT(Table, 'log.fold_binary.error'),
            {error, Reason}
    end.

%% Folds over the terms in the log view of raw entries from the log provider
%% between the provided first and last log indices (inclusive).
%% If there exists a log term between the provided first and last indices then
%% the accumulator function will be called on at least that term.
%% This API provides no validation of the log indices and term passed by the
%% provider to the callback function.
%%
%% For a raw log, the range is clamped to the bounds reported by the provider.
%% A provider reporting 'undefined' bounds (an empty log) yields {ok, Acc}
%% without folding; an {error, Reason} bound is returned as-is.
-spec fold_terms(LogOrView :: log() | view(),
                 First :: log_index(),
                 Last :: log_index(),
                 Func :: fun((Index :: log_index(), Term :: log_term(), Acc) -> Acc),
                 Acc) ->
    {ok, Acc} | {error, term()}.
fold_terms(#log_view{log = Log, first = LogFirst, last = LogLast}, First, Last, Func, Acc) ->
    fold_terms_impl(Log, max(First, LogFirst), min(Last, LogLast), Func, Acc);
fold_terms(Log, First, Last, Func, Acc) ->
    Provider = provider(Log),
    LogFirst = Provider:first_index(Log),
    LogLast = Provider:last_index(Log),
    case {LogFirst, LogLast} of
        {{error, Reason}, _} ->
            {error, Reason};
        {_, {error, Reason}} ->
            {error, Reason};
        {_, _} when is_integer(LogFirst), is_integer(LogLast) ->
            fold_terms_impl(Log, max(First, LogFirst), min(Last, LogLast), Func, Acc);
        {_, _} ->
            % Without this check, 'undefined' (which sorts after every integer)
            % would leak through max/2 and crash the range arithmetic.
            {ok, Acc}
    end.

%% Fold over the terms of the log entries between First and Last (inclusive)
%% using the log provider, recording fold metrics for the log's table.
%% Returns {ok, AccOut} or the provider's {error, Reason}.
-spec fold_terms_impl(
    Log :: log(),
    First :: log_index(),
    Last :: log_index(),
    Func :: fun((Index :: log_index(), Term :: log_term(), Acc) -> Acc),
    Acc :: term()
) -> {ok, Acc} | {error, term()}.
fold_terms_impl(#raft_log{table = Table} = Log, First, Last, Func, AccIn) ->
    ?RAFT_COUNT(Table, 'log.fold_terms'),
    % A request lying past the end of the log leaves First > Last; report
    % zero entries rather than a negative total.
    ?RAFT_COUNTV(Table, 'log.fold_terms.total', max(0, Last - First + 1)),
    Provider = provider(Log),
    case Provider:fold_terms(Log, First, Last, Func, AccIn) of
        {ok, AccOut} ->
            {ok, AccOut};
        {error, Reason} ->
            ?RAFT_COUNT(Table, 'log.fold_terms.error'),
            {error, Reason}
    end.

%% Gets the term of the log entry at the provided log index. A log view
%% answers 'not_found' for any index outside of its [first, last] range, even
%% if the underlying provider still stores that entry. All other lookups are
%% delegated to the log provider.
-spec term(LogOrView :: log() | view(), Index :: log_index()) -> {ok, Term :: log_term()} | not_found | {error, term()}.
term(#log_view{first = ViewFirst, last = ViewLast}, Index) when Index < ViewFirst; Index > ViewLast ->
    not_found;
term(#log_view{log = Log}, Index) ->
    term(Log, Index);
term(Log, Index) ->
    Provider = provider(Log),
    Provider:term(Log, Index).

%% Gets the log entry at the provided log index. A log view answers
%% 'not_found' for any index outside of its [first, last] range, even if the
%% underlying provider still stores that entry. All other lookups are counted
%% and delegated to the log provider.
-spec get(LogOrView :: log() | view(), Index :: log_index()) -> {ok, Entry :: log_entry()} | not_found | {error, term()}.
get(#log_view{first = ViewFirst, last = ViewLast}, Index) when Index < ViewFirst; Index > ViewLast ->
    not_found;
get(#log_view{log = Log}, Index) ->
    get(Log, Index);
get(#raft_log{table = Table} = Log, Index) ->
    ?RAFT_COUNT(Table, 'log.get'),
    Provider = provider(Log),
    Provider:get(Log, Index).

%% Fetch a contiguous range of at most CountLimit log entries beginning at
%% Start, with no limit on their total size. When using a log view, only those
%% log entries that fall within the provided view will be returned.
-spec get(LogOrView :: log() | view(), Start :: log_index(), CountLimit :: non_neg_integer()) ->
    {ok, Entries :: [log_entry()]} | {error, term()}.
get(Source, From, MaxCount) ->
    NoSizeLimit = infinity,
    get(Source, From, MaxCount, NoSizeLimit).

%% Fetch a contiguous range of log entries containing up to the specified
%% number of log entries or the specified maximum total number of bytes (based
%% on the byte sizes reported by the underlying log provider) starting at the
%% provided index. If log entries exist at the provided starting index, then
%% at least one log entry will be returned. When using a log view, only those
%% log entries that fall within the provided view will be returned.
%%
%% The returned entries must be contiguous starting at Start; any gap is
%% logged and reported as {error, corruption}.
%% NOTE(review): fold/6 clamps the range to the first index of the log or
%% view, so a Start before that index also produces {error, corruption}.
-spec get(
    LogOrView :: log() | view(),
    Start :: log_index(),
    CountLimit :: non_neg_integer(),
    SizeLimit :: non_neg_integer() | infinity
) -> {ok, Entries :: [log_entry()]} | {error, term()}.
get(LogOrView, Start, CountLimit, SizeLimit) ->
    End = Start + CountLimit - 1,
    InitAcc = {Start, []},
    try fold(LogOrView, Start, End, SizeLimit, fun get_method/3, InitAcc) of
        {ok, {_NextIndex, Reversed}} ->
            {ok, lists:reverse(Reversed)};
        {error, _} = Error ->
            Error
    catch
        throw:{missing, MissingIndex} ->
            ?RAFT_LOG_WARNING(
                "[~0p] detected missing log entry ~0p while folding range ~0p ~~ ~0p",
                [log_name(LogOrView), MissingIndex, Start, End]
            ),
            {error, corruption}
    end.

%% Fold function used by get/4. Collects entries in reverse order while
%% checking that each one arrives at the next expected index; throws
%% {missing, Expected} on the first gap.
-spec get_method(Index :: log_index(), Entry :: log_entry(), Acc) -> Acc when
    Acc :: {AccIndex :: log_index(), AccEntries :: [log_entry()]}.
get_method(Index, Entry, {Expected, Collected}) when Index =:= Expected ->
    {Expected + 1, [Entry | Collected]};
get_method(_Index, _Entry, {Expected, _Collected}) ->
    throw({missing, Expected}).

%% Fetch the terms of up to CountLimit contiguous log entries starting at
%% Start. When using a log view, only terms of entries within the view are
%% considered. Any gap in the entries reported by the provider is logged and
%% reported as {error, corruption}.
-spec get_terms(LogOrView :: log() | view(), Start :: log_index(), CountLimit :: non_neg_integer()) ->
    {ok, Terms :: [wa_raft_log:log_term()]} | {error, term()}.
get_terms(LogOrView, Start, CountLimit) ->
    End = Start + CountLimit - 1,
    InitAcc = {Start, []},
    try fold_terms(LogOrView, Start, End, fun get_terms_method/3, InitAcc) of
        {ok, {_NextIndex, Reversed}} ->
            {ok, lists:reverse(Reversed)};
        {error, _} = Error ->
            Error
    catch
        throw:{missing, MissingIndex} ->
            ?RAFT_LOG_WARNING(
                "[~0p] detected missing log entry ~0p while folding range ~0p ~~ ~0p for terms",
                [log_name(LogOrView), MissingIndex, Start, End]
            ),
            {error, corruption}
    end.

%% Fold function used by get_terms/3. Collects terms in reverse order while
%% checking that each one arrives at the next expected index; throws
%% {missing, Expected} on the first gap.
-spec get_terms_method(Index :: log_index(), Term :: log_term(), Acc) -> Acc when
    Acc :: {AccIndex :: log_index(), AccTerms :: [log_term()]}.
get_terms_method(Index, Term, {Expected, Collected}) when Index =:= Expected ->
    {Expected + 1, [Term | Collected]};
get_terms_method(_Index, _Term, {Expected, _Collected}) ->
    throw({missing, Expected}).

%% Produce up to CountLimit log entries starting at Start, in a format suitable
%% for inclusion in a heartbeat to a peer, with no limit on their total size.
%% When using a log view, only those log entries that fall within the provided
%% view will be returned.
-spec entries(LogOrView :: log() | view(), Start :: log_index(), CountLimit :: non_neg_integer()) ->
    {ok, Entries :: [log_entry() | binary()]} | {error, term()}.
entries(Source, From, MaxCount) ->
    NoSizeLimit = infinity,
    entries(Source, From, MaxCount, NoSizeLimit).

%% Produce a list of log entries in a format appropriate for inclusion within
%% a heartbeat to a peer containing up to the specified number of log entries
%% or the specified maximum total number of bytes (based on the byte sizes
%% reported by the underlying log provider when returning log entries or the
%% byte size of each log entry binary when returning binaries) starting at the
%% provided index. If log entries exist at the provided starting index, then
%% at least one log entry will be returned. When using a log view, only those
%% log entries that fall within the provided view will be returned.
%%
%% Entries are returned as external-term-format binaries when the provider
%% exports the optional fold_binary/6 callback and binary heartbeat entries
%% are enabled for this table; otherwise they are returned as terms.
%% NOTE(review): erlang:function_exported/3 only returns true for modules that
%% are already loaded. Any gap in the returned entries (including a Start
%% before the first index of the log or view) yields {error, corruption}.
-spec entries(
    LogOrView :: log() | view(),
    Start :: log_index(),
    CountLimit :: non_neg_integer(),
    SizeLimit :: non_neg_integer() | infinity
) -> {ok, Entries :: [log_entry() | binary()]} | {error, term()}.
entries(LogOrView, Start, CountLimit, SizeLimit) ->
    App = app(LogOrView),
    Table = table(LogOrView),
    Provider = provider(LogOrView),
    End = Start + CountLimit - 1,
    InitAcc = {Start, []},
    try
        case erlang:function_exported(Provider, fold_binary, 6) andalso ?RAFT_LOG_HEARTBEAT_BINARY_ENTRIES(App, Table) of
            false -> fold(LogOrView, Start, End, SizeLimit, fun entries_method/3, InitAcc);
            true -> fold_binary(LogOrView, Start, End, SizeLimit, fun entries_method/3, InitAcc)
        end
    of
        {ok, {_NextIndex, Reversed}} ->
            {ok, lists:reverse(Reversed)};
        {error, _} = Error ->
            Error
    catch
        throw:{missing, MissingIndex} ->
            ?RAFT_LOG_WARNING(
                "[~0p] detected missing log entry ~0p while folding range ~0p ~~ ~0p for heartbeat",
                [log_name(LogOrView), MissingIndex, Start, End]
            ),
            {error, corruption}
    end.

%% Fold function used by entries/4. Collects entries (terms or binaries) in
%% reverse order while checking that each one arrives at the next expected
%% index; throws {missing, Expected} on the first gap.
-spec entries_method(Index :: log_index(), Entry :: log_entry() | binary(), Acc) -> Acc when
    Acc :: {AccIndex :: log_index(), AccEntries :: [log_entry() | binary()]}.
entries_method(Index, Entry, {Expected, Collected}) when Index =:= Expected ->
    {Expected + 1, [Entry | Collected]};
entries_method(_Index, _Entry, {Expected, _Collected}) ->
    throw({missing, Expected}).

%% Gets the most recent configuration stored in the log together with the log
%% index at which it was stored.
%%  - For a log view, the configuration cached in the view is returned. If no
%%    configuration is cached, or the cached configuration was stored before
%%    the first index of the view, 'not_found' is returned.
%%  - For a raw log, the log provider is queried, and any configuration found
%%    is passed through wa_raft_server:normalize_config/1. 'not_found' and
%%    provider errors ({error, Reason}) are returned unchanged.
-spec config(LogOrView :: log() | view()) ->
    {ok, Index :: log_index(), Config :: wa_raft_server:config()} | not_found | {error, term()}.
config(#log_view{config = undefined}) ->
    not_found;
config(#log_view{first = First, config = {Index, _}}) when First > Index ->
    % After trims, it is possible that we have a cached config from before the start
    % of the log view. Don't return the cached config in this case.
    not_found;
config(#log_view{config = {Index, Config}}) ->
    {ok, Index, Config};
config(Log) ->
    Provider = provider(Log),
    case Provider:config(Log) of
        {ok, Index, Config} -> {ok, Index, wa_raft_server:normalize_config(Config)};
        % 'not_found' and {error, Reason} from the provider are passed through.
        Other -> Other
    end.

%%-------------------------------------------------------------------
%% APIs for managing logs and log data
%%-------------------------------------------------------------------

%% Open the log served by the given process (pid or registered name) at the
%% provided position. When the log does not contain that position it is reset
%% so that it does. Otherwise it is opened unchanged, and it may hold entries
%% both before and after the position. The call waits indefinitely for a reply.
-spec open(PidOrName :: pid() | log_name(), Position :: log_pos()) -> {ok, View :: view()} | {error, term()}.
open(PidOrName, Position) ->
    Request = {open, Position},
    gen_server:call(PidOrName, Request, infinity).

%% Reset the log behind the given view so that it holds only the provided
%% position. The entry stored at that position carries 'undefined' data.
-spec reset(View :: view(), Position :: log_pos()) -> {ok, NewView :: view()} | {error, term()}.
reset(#log_view{log = Log} = View, Position) ->
    Server = log_name(Log),
    gen_server:call(Server, {reset, Position, View}, infinity).

%% Truncate the log by deleting every entry at or after the given index.
%% All data for the affected indices must be removed by this operation.
-spec truncate(View :: view(), Index :: log_index()) -> {ok, NewView :: view()} | {error, term()}.
truncate(#log_view{log = Log} = View, Index) ->
    Server = log_name(Log),
    gen_server:call(Server, {truncate, Index, View}, infinity).

%% Trim the log by removing entries that precede the given index. The
%% provider may defer part of the physical removal to later trims. The call
%% is synchronous, so providers that rewrite durable segments can keep the
%% in-memory view consistent with what is actually persisted. The view's first
%% index only advances when the server reports success.
-spec trim(View :: view(), Index :: log_index()) -> {ok, NewView :: view()} | {error, term()}.
trim(#log_view{log = Log, first = First} = View, Index) ->
    Reply = gen_server:call(log_name(Log), {trim, Index}, infinity),
    case Reply of
        {error, _Reason} = Error ->
            Error;
        ok ->
            {ok, View#log_view{first = max(Index, First)}}
    end.

%% Batched trimming (rotation) of the log using the rotation interval and
%% keep count configured for the view's application and table.
%% The interval (from 'raft_max_log_records_per_file') is how many extra
%% entries may accumulate before a trim is issued. The keep count (from
%% 'raft_max_log_records') is how many entries before the given index are
%% retained by rotation.
-spec rotate(View :: view(), Index :: log_index()) -> {ok, NewView :: view()} | {error, term()}.
rotate(#log_view{log = #raft_log{application = App, table = Table}} = View, Index) ->
    Interval = ?RAFT_LOG_ROTATION_INTERVAL(App, Table),
    rotate(View, Index, Interval, ?RAFT_LOG_ROTATION_KEEP(App, Table, Interval)).

%% Batched trimming (rotation): keep `Keep` entries before `Index` and only
%% issue a trim once at least `Interval` entries beyond that would be removed.
%% Every invocation is counted under 'log.rotate', whether or not it trims.
-spec rotate(View :: view(), Index :: log_index(), Interval :: pos_integer(), Keep :: non_neg_integer()) ->
    {ok, NewView :: view()} | {error, term()}.
rotate(#log_view{log = #raft_log{table = Table}, first = First} = View, Index, Interval, Keep) ->
    ?RAFT_COUNT(Table, 'log.rotate'),
    % Written as a guard so that a non-numeric first index simply means
    % "nothing to trim" instead of raising.
    if
        Index - Keep - First >= Interval -> trim(View, Index - Keep);
        true -> {ok, View}
    end.

%% Ask the log server to flush any log data not yet on disk. This is an
%% asynchronous request and always returns 'ok'.
-spec flush(LogOrView :: log() | view()) -> ok.
flush(LogOrView) ->
    gen_server:cast(log_name(LogOrView), flush).

%%-------------------------------------------------------------------
%% Internal API
%%-------------------------------------------------------------------

%% Default registered name of the RAFT log server for a partition, in the
%% form raft_log_<Table>_<Partition>.
-spec default_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
default_name(Table, Partition) ->
    NameBin = iolist_to_binary(["raft_log_", atom_to_binary(Table), "_", integer_to_binary(Partition)]),
    % elp:ignore W0023 bounded atom, one per table/partition at startup
    binary_to_atom(NameBin).

%% Registered name of the RAFT log server for a partition. Uses the name from
%% the partition's options when they are available and the default name
%% otherwise.
-spec registered_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
registered_name(Table, Partition) ->
    Options = wa_raft_part_sup:options(Table, Partition),
    if
        Options =:= undefined -> default_name(Table, Partition);
        true -> Options#raft_options.log_name
    end.

%% Application that owns the given log or view.
-spec app(LogOrView :: log() | view()) -> App :: atom().
app(LogOrView) ->
    Log = log(LogOrView),
    Log#raft_log.application.

%% RAFT table served by the given log or view.
-spec table(LogOrView :: log() | view()) -> Table :: wa_raft:table().
table(LogOrView) ->
    Log = log(LogOrView),
    Log#raft_log.table.

%% Unwrap a view to its underlying log; a log is returned unchanged.
-spec log(LogOrView :: log() | view()) -> Log :: log().
log(#raft_log{} = Log) ->
    Log;
log(#log_view{log = Inner}) ->
    Inner.

%% Registered name of the log server behind the given log or view.
-spec log_name(LogOrView :: log() | view()) -> Name :: log_name().
log_name(#raft_log{name = LogName}) ->
    LogName;
log_name(#log_view{log = #raft_log{name = LogName}}) ->
    LogName.

%% Callback module implementing storage for the given log or view.
-spec provider(LogOrView :: log() | view()) -> Provider :: module().
provider(#raft_log{provider = Module}) ->
    Module;
provider(#log_view{log = #raft_log{provider = Module}}) ->
    Module.

%% Re-read the latest configuration from the provider and cache it, already
%% normalized, on the view. When the provider reports none, the cache is
%% cleared.
-spec refresh_config(View :: view()) -> NewView :: view().
refresh_config(#log_view{log = Log} = View) ->
    Module = provider(Log),
    Cached = case Module:config(Log) of
        {ok, Index, RawConfig} -> {Index, wa_raft_server:normalize_config(RawConfig)};
        not_found -> undefined
    end,
    View#log_view{config = Cached}.

%%-------------------------------------------------------------------
%% gen_server Callbacks
%%-------------------------------------------------------------------

%% gen_server init: build the log descriptor from the partition options and
%% let the provider perform its one-time initialization. The provider itself
%% is opened later, by an {open, Position} call.
-spec init(Options :: #raft_options{}) -> {ok, State :: #log_state{}}.
init(#raft_options{application = App, table = Tab, partition = Part, log_name = LogName, log_module = Module}) ->
    % Trap exits so that a shutdown from the supervisor runs terminate/2,
    % which is where the provider gets closed.
    process_flag(trap_exit, true),
    Log = #raft_log{name = LogName, application = App, table = Tab, partition = Part, provider = Module},
    ok = Module:init(Log),
    {ok, #log_state{log = Log}}.

%% gen_server call handler for the synchronous log management operations.
%% Failed operations reply with {error, Reason} and keep the previous state.
-spec handle_call(Request, From :: term(), State :: #log_state{}) ->
    {reply, Reply :: term(), NewState :: #log_state{}} |
    {noreply, NewState :: #log_state{}}
    when Request ::
        {open, Position :: log_pos()} |
        {reset, Position :: log_pos(), View :: view()} |
        {truncate, Index :: log_index(), View :: view()} |
        {trim, Index :: log_index()}.
handle_call({open, Position}, _From, State0) ->
    {Reply, State1} = handle_open(Position, State0),
    {reply, Reply, State1};
handle_call({reset, Position, View}, _From, State) ->
    reply_with_view(handle_reset(Position, View, State), State);
handle_call({truncate, Index, View}, _From, State) ->
    reply_with_view(handle_truncate(Index, View, State), State);
handle_call({trim, Index}, _From, #log_state{log = Log} = State0) ->
    case handle_trim(Index, State0) of
        {error, Reason} ->
            ?RAFT_LOG_WARNING("[~p] failed to trim log due to ~p", [log_name(Log), Reason]),
            {reply, {error, Reason}, State0};
        {ok, State1} ->
            {reply, ok, State1}
    end;
handle_call(Request, From, #log_state{log = Log} = State) ->
    % Unrecognized calls are logged and no reply is sent.
    ?RAFT_LOG_NOTICE("[~p] got unrecognized call ~p from ~p", [log_name(Log), Request, From]),
    {noreply, State}.

%% Turn the result of a view-modifying operation (reset or truncate) into a
%% gen_server reply. On failure the server state is left untouched.
-spec reply_with_view(Result :: {ok, view(), #log_state{}} | {error, term()}, State :: #log_state{}) ->
    {reply, {ok, view()} | {error, term()}, #log_state{}}.
reply_with_view({ok, NewView, NewState}, _State) ->
    {reply, {ok, NewView}, NewState};
reply_with_view({error, Reason}, State) ->
    {reply, {error, Reason}, State}.

%% gen_server cast handler: asynchronous flush requests and (legacy)
%% asynchronous trims. Trim failures are logged and the state is kept.
-spec handle_cast(Request, State :: #log_state{}) -> {noreply, NewState :: #log_state{}}
    when Request :: flush | {trim, Index :: log_index()}.
handle_cast(flush, #log_state{log = #raft_log{table = Table} = Log} = State) ->
    ?RAFT_COUNT(Table, 'log.flush'),
    Module = provider(Log),
    Module:flush(Log),
    {noreply, State};
handle_cast({trim, Index}, #log_state{log = Log} = State0) ->
    State1 = case handle_trim(Index, State0) of
        {ok, Trimmed} ->
            Trimmed;
        {error, Reason} ->
            ?RAFT_LOG_WARNING("[~p] failed to trim log due to ~p", [log_name(Log), Reason]),
            State0
    end,
    {noreply, State1};
handle_cast(Request, #log_state{log = Log} = State) ->
    ?RAFT_LOG_NOTICE("[~p] got unrecognized cast ~p", [log_name(Log), Request]),
    {noreply, State}.

%% gen_server terminate: log the shutdown and close the provider if it was
%% opened. The return value is ignored by gen_server.
-spec terminate(Reason :: term(), State :: #log_state{}) -> term().
terminate(Reason, #log_state{log = Log, state = State}) ->
    Provider = provider(Log),
    % Identify the log by its registered name, as every other message in this
    % module does, rather than dumping the whole #raft_log{} record.
    ?RAFT_LOG_NOTICE("[~p] terminating due to ~p", [log_name(Log), Reason]),
    State =/= ?PROVIDER_NOT_OPENED andalso Provider:close(Log, State).

%%-------------------------------------------------------------------
%% RAFT Log Server Logic
%%-------------------------------------------------------------------

%% Open the provider and build a view of the log positioned at `Position`.
%% If the entry at the position exists with the expected term, the log is
%% opened as-is and the view covers the provider's first..last indices.
%% If the term mismatches, or the missing-position helper asks for it, the
%% log is reset to the position. Any other result from Provider:get/2 is
%% returned to the caller unchanged.
%% Returns the reply for the caller together with the new server state.
-spec handle_open(Position :: log_pos(), State :: #log_state{}) ->
    {{ok, NewView :: view()} | {error, Reason :: term()}, NewState :: #log_state{}}.
handle_open(#raft_log_pos{index = Index, term = Term} = Position,
            #log_state{log = #raft_log{name = Name, table = Table, provider = Provider} = Log} = State0) ->
    ?RAFT_COUNT(Table, 'log.open'),
    ?RAFT_LOG_NOTICE("[~p] opening log at position ~p:~p", [Name, Index, Term]),
    case Provider:open(Log) of
        {ok, ProviderState} ->
            % Decide what to do with the existing log contents. `Term` is already
            % bound, so the first clause only matches an entry whose term equals
            % the expected term.
            Action = case Provider:get(Log, Index) of
                {ok, {Term, _Op}} ->
                    none;
                {ok, {MismatchTerm, _Op}} ->
                    ?RAFT_LOG_WARNING(
                        "[~p] resetting log due to expecting term ~p at ~p but log contains term ~p",
                        [Name, Term, Index, MismatchTerm]
                    ),
                    reset;
                not_found ->
                    action_for_missing_open_position(Log, Provider, Position);
                Other ->
                    {failed, Other}
            end,

            % The provider is open from here on, so its state is kept even if
            % the reset below fails.
            State1 = State0#log_state{state = ProviderState},
            View0 = #log_view{log = Log},
            case Action of
                none ->
                    ?RAFT_COUNT(Table, 'log.open.normal'),
                    % Take the view bounds from the provider; when a bound is
                    % unknown, the view keeps its record default for it.
                    View1 = case Provider:first_index(Log) of
                        undefined ->
                            ?RAFT_LOG_WARNING(
                                "[~p] opened log normally but the first index was not set",
                                [Name]
                            ),
                            View0;
                        FirstIndex ->
                            View0#log_view{first = FirstIndex}
                    end,
                    View2 = case Provider:last_index(Log) of
                        undefined ->
                            ?RAFT_LOG_WARNING(
                                "[~p] opened log normally but the last index was not set",
                                [Name]
                            ),
                            View1;
                        LastIndex ->
                            View1#log_view{last = LastIndex}
                    end,
                    View3 = refresh_config(View2),
                    {{ok, View3}, State1};
                reset ->
                    ?RAFT_COUNT(Table, 'log.open.reset'),
                    case handle_reset(Position, View0, State1) of
                        {ok, View1, State2} ->
                            {{ok, View1}, State2};
                        {error, Reason} ->
                            ?RAFT_COUNT(Table, 'log.open.reset.error'),
                            {{error, Reason}, State1}
                    end;
                {failed, Return} ->
                    % Return is the unexpected Provider:get/2 result
                    % (presumably {error, _}); it is passed through as the reply.
                    ?RAFT_COUNT(Table, 'log.open.error'),
                    {Return, State1}
            end;
        {error, Reason} ->
            ?RAFT_COUNT(Table, 'log.open.error'),
            % Mark the provider as not opened so that reset/truncate/trim
            % reject requests with {error, not_open}.
            {{error, Reason}, State0#log_state{state = ?PROVIDER_NOT_OPENED}}
    end.

%% Decide what to do when the log has no entry at the position it is being
%% opened at. open/2 promises that a log which does not contain the requested
%% position is reset to include it, so this always asks for a reset.
%% The previous special case for position 0:0 opened the log unchanged when
%% its first index was greater than 0. That left the view without the entry
%% at the requested position, along with any entries between that position
%% and the log's first index, breaking the promise above.
-spec action_for_missing_open_position(Log :: log(), Provider :: module(), Position :: log_pos()) ->
    reset.
action_for_missing_open_position(Log, _Provider, #raft_log_pos{index = Index, term = Term}) ->
    ?RAFT_LOG_WARNING(
        "[~p] resetting log due to missing entry at position ~p:~p",
        [log_name(Log), Index, Term]
    ),
    reset.

%% Reset the provider so that the log holds only `Position`. On success the
%% view is narrowed to that single index and its cached config is cleared.
%% Fails with not_open before the provider is opened. Fails with
%% invalid_position for index 0 paired with a non-zero term.
-spec handle_reset(Position :: log_pos(), View :: view(), State :: #log_state{}) ->
    {ok, NewView :: view(), NewState :: #log_state{}} | {error, Reason :: term()}.
handle_reset(_Position, _View, #log_state{state = ?PROVIDER_NOT_OPENED}) ->
    {error, not_open};
handle_reset(#raft_log_pos{index = 0, term = Term}, _View, #log_state{log = Log}) when Term =/= 0 ->
    ?RAFT_LOG_ERROR("[~p] rejects reset to index 0 with non-zero term ~p", [log_name(Log), Term]),
    {error, invalid_position};
handle_reset(#raft_log_pos{index = Index, term = Term} = Position, View,
             #log_state{log = #raft_log{table = Table} = Log, state = ProviderState} = State) ->
    ?RAFT_COUNT(Table, 'log.reset'),
    ?RAFT_LOG_NOTICE("[~p] resetting log to position ~p:~p", [log_name(Log), Index, Term]),
    Module = provider(Log),
    case Module:reset(Log, Position, ProviderState) of
        {error, Reason} ->
            ?RAFT_COUNT(Table, 'log.reset.error'),
            {error, Reason};
        {ok, ResetState} ->
            NewView = View#log_view{first = Index, last = Index, config = undefined},
            {ok, NewView, State#log_state{state = ResetState}}
    end.

%% Delete every entry at or after `Index`. The first entry of the view is
%% never removed this way, so Index =< first is rejected as invalid_position.
%% On success the view's last index is clamped and its config is re-read.
-spec handle_truncate(Index :: log_index(), View :: view(), State :: #log_state{}) ->
    {ok, NewView :: view(), NewState :: #log_state{}} | {error, Reason :: term()}.
handle_truncate(_Index, _View, #log_state{state = ?PROVIDER_NOT_OPENED}) ->
    {error, not_open};
handle_truncate(Index, #log_view{first = First}, #log_state{log = Log}) when Index =< First ->
    ?RAFT_LOG_ERROR("[~p] rejects log deletion by truncation to ~p for log starting at ~p", [log_name(Log), Index, First]),
    {error, invalid_position};
handle_truncate(Index, #log_view{last = Last} = View, #log_state{log = #raft_log{table = Table} = Log, state = ProviderState} = State) ->
    ?RAFT_COUNT(Table, 'log.truncate'),
    ?RAFT_LOG_NOTICE("[~p] truncating log from ~p to past ~p", [log_name(Log), Last, Index]),
    Module = provider(Log),
    case Module:truncate(Log, Index, ProviderState) of
        {error, Reason} ->
            ?RAFT_COUNT(Table, 'log.truncate.error'),
            {error, Reason};
        {ok, TruncatedState} ->
            Clamped = View#log_view{last = min(Last, Index - 1)},
            NewView = refresh_config(Clamped),
            {ok, NewView, State#log_state{state = TruncatedState}}
    end.

%% Ask the provider to trim entries before `Index`. The public
%% wa_raft_log:trim/2 only advances the view after this succeeds. Returning an
%% error therefore keeps the old view, and the server can retry rotation once
%% the storage fault clears.
-spec handle_trim(Index :: log_index(), State :: #log_state{}) ->
    {ok, NewState :: #log_state{}} | {error, Reason :: term()}.
handle_trim(_Index, #log_state{state = ?PROVIDER_NOT_OPENED}) ->
    {error, not_open};
handle_trim(Index, #log_state{log = #raft_log{table = Table} = Log, state = ProviderState} = State) ->
    ?RAFT_COUNT(Table, 'log.trim'),
    ?RAFT_LOG_DEBUG("[~p] trimming log to ~p", [log_name(Log), Index]),
    Module = provider(Log),
    case Module:trim(Log, Index, ProviderState) of
        {error, Reason} ->
            ?RAFT_COUNT(Table, 'log.trim.error'),
            {error, Reason};
        {ok, TrimmedState} ->
            {ok, State#log_state{state = TrimmedState}}
    end.