%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.
%%%
%%% This module is an implementation of a completely in-memory RAFT
%%% log provider that uses ETS as a backing store for the log data.
%%% This module is only suitable as a log provider for a fully
%%% in-memory RAFT cluster and should not be used when any durability
%%% guarantees are required against node shutdown.
-module(wa_raft_log_ets).
-compile(warn_missing_spec_all).
-behaviour(wa_raft_log).
%% RAFT log provider interface for accessing log data
-export([
first_index/1,
last_index/1,
fold/6,
fold_terms/5,
get/2,
term/2,
config/1
]).
%% RAFT log provider interface for writing new log data
-export([
append/4
]).
%% RAFT log provider interface for managing underlying RAFT log
-export([
init/1,
open/1,
close/2,
reset/3,
truncate/3,
trim/3,
flush/1
]).
-include_lib("wa_raft/include/wa_raft.hrl").
-type state() :: undefined.
%%-------------------------------------------------------------------
%% RAFT log provider interface for accessing log data
%%-------------------------------------------------------------------
%% Returns the first key of the ETS table named by the log, or `undefined`
%% when the table is empty. The table is created as an `ordered_set` in
%% init/1, so the first key is the smallest stored index.
-spec first_index(Log :: wa_raft_log:log()) -> undefined | wa_raft_log:log_index().
first_index(#raft_log{name = Table}) ->
    first_index_or_undefined(ets:first(Table)).

%% Maps the ETS end-of-table marker to `undefined`; any other key is
%% returned untouched.
-spec first_index_or_undefined(Key :: wa_raft_log:log_index() | '$end_of_table') ->
    undefined | wa_raft_log:log_index().
first_index_or_undefined('$end_of_table') -> undefined;
first_index_or_undefined(Key) -> Key.
%% Returns the last key of the ETS table named by the log, or `undefined`
%% when the table is empty. The table is created as an `ordered_set` in
%% init/1, so the last key is the largest stored index.
-spec last_index(Log :: wa_raft_log:log()) -> undefined | wa_raft_log:log_index().
last_index(#raft_log{name = Table}) ->
    last_index_or_undefined(ets:last(Table)).

%% Maps the ETS end-of-table marker to `undefined`; any other key is
%% returned untouched.
-spec last_index_or_undefined(Key :: wa_raft_log:log_index() | '$end_of_table') ->
    undefined | wa_raft_log:log_index().
last_index_or_undefined('$end_of_table') -> undefined;
last_index_or_undefined(Key) -> Key.
%% Folds `Func` over the stored entries whose indices lie between `Start`
%% and `End` (inclusive), threading `Acc0` through each call. `Func`
%% receives the index, the entry's external size and the entry itself.
%% The running total of entry sizes starts at zero and the fold stops once
%% it reaches `SizeLimit`; the limit is checked before each index is
%% visited, so the entry that crosses the limit is still folded.
-spec fold(
    Log :: wa_raft_log:log(),
    Start :: wa_raft_log:log_index() | '$end_of_table',
    End :: wa_raft_log:log_index(),
    SizeLimit :: non_neg_integer() | infinity,
    Func :: fun((Index :: wa_raft_log:log_index(), Size :: non_neg_integer(), Entry :: wa_raft_log:log_entry(), Acc) -> Acc),
    Acc
) -> {ok, Acc}.
fold(Log, Start, End, SizeLimit, Func, Acc0) ->
    InitialSize = 0,
    fold_impl(Log, Start, End, InitialSize, SizeLimit, Func, Acc0).
%% Worker loop behind fold/6. `Size` is the sum of the external sizes of
%% the entries folded so far.
%%
%% Termination relies on Erlang term order, in which numbers sort before
%% atoms: a `'$end_of_table'` cursor is greater than any numeric `End`,
%% and a numeric `Size` never reaches the atom `infinity`.
-spec fold_impl(
    Log :: wa_raft_log:log(),
    Start :: wa_raft_log:log_index() | '$end_of_table',
    End :: wa_raft_log:log_index(),
    Size :: non_neg_integer(),
    SizeLimit :: non_neg_integer() | infinity,
    Func :: fun((Index :: wa_raft_log:log_index(), Size :: non_neg_integer(), Entry :: wa_raft_log:log_entry(), Acc) -> Acc),
    Acc
) -> {ok, Acc}.
fold_impl(_Log, Start, End, _Size, _SizeLimit, _Func, Acc) when End < Start ->
    %% The cursor has moved past the requested range (or off the table).
    {ok, Acc};
fold_impl(_Log, _Start, _End, Size, SizeLimit, _Func, Acc) when Size >= SizeLimit ->
    %% The size budget has been used up.
    {ok, Acc};
fold_impl(#raft_log{name = Table} = Log, Index, End, Size, SizeLimit, Func, Acc) ->
    Found = ets:lookup(Table, Index),
    %% The successor is read before the callback runs, and is valid even
    %% when nothing is stored at Index.
    Next = ets:next(Table, Index),
    {NewSize, NewAcc} =
        case Found of
            [{Index, Entry}] ->
                EntrySize = erlang:external_size(Entry),
                {Size + EntrySize, Func(Index, EntrySize, Entry, Acc)};
            [] ->
                %% Nothing stored at this index: skip it without charging size.
                {Size, Acc}
        end,
    fold_impl(Log, Next, End, NewSize, SizeLimit, Func, NewAcc).
%% Folds `Func` over the terms of the stored entries whose indices lie
%% between `Start` and `End` (inclusive), threading `Acc0` through each
%% call. `Func` receives the index and the term of the entry only.
-spec fold_terms(
    Log :: wa_raft_log:log(),
    Start :: wa_raft_log:log_index() | '$end_of_table',
    End :: wa_raft_log:log_index(),
    Func :: fun((Index :: wa_raft_log:log_index(), Term :: wa_raft_log:log_term(), Acc) -> Acc),
    Acc
) -> {ok, Acc}.
fold_terms(Log, Start, End, Func, Acc0) ->
    fold_terms_impl(Log, Start, End, Func, Acc0).
%% Worker loop behind fold_terms/5. Walks the table key by key and stops
%% once the cursor is greater than `End`; a `'$end_of_table'` cursor also
%% stops the loop because atoms sort after numbers in Erlang term order.
-spec fold_terms_impl(
    Log :: wa_raft_log:log(),
    Start :: wa_raft_log:log_index() | '$end_of_table',
    End :: wa_raft_log:log_index(),
    Func :: fun((Index :: wa_raft_log:log_index(), Term :: wa_raft_log:log_term(), Acc) -> Acc),
    Acc
) -> {ok, Acc}.
fold_terms_impl(_Log, Start, End, _Func, Acc) when End < Start ->
    {ok, Acc};
fold_terms_impl(#raft_log{name = Table} = Log, Index, End, Func, Acc) ->
    Found = ets:lookup(Table, Index),
    %% The successor is read before the callback runs, and is valid even
    %% when nothing is stored at Index.
    Next = ets:next(Table, Index),
    NewAcc =
        case Found of
            [{Index, {Term, _Op}}] -> Func(Index, Term, Acc);
            %% Nothing stored at this index: leave the accumulator as is.
            [] -> Acc
        end,
    fold_terms_impl(Log, Next, End, Func, NewAcc).
%% Looks up the entry stored at `Index`. Returns `{ok, Entry}` when the
%% table holds an object under that key and `not_found` otherwise.
-spec get(Log :: wa_raft_log:log(), Index :: wa_raft_log:log_index()) ->
    {ok, Entry :: wa_raft_log:log_entry()} | not_found.
get(#raft_log{name = Table}, Index) ->
    case ets:lookup(Table, Index) of
        [] -> not_found;
        [{Index, Stored}] -> {ok, Stored}
    end.
%% Returns the term of the entry stored at `Index`, i.e. the first element
%% of the `{Term, Op}` entry returned by get/2, or `not_found` when no
%% entry is stored at that index.
-spec term(Log :: wa_raft_log:log(), Index :: wa_raft_log:log_index()) ->
    {ok, Term :: wa_raft_log:log_term()} | not_found.
term(Log, Index) ->
    case get(Log, Index) of
        not_found -> not_found;
        {ok, {EntryTerm, _Op}} -> {ok, EntryTerm}
    end.
%% Finds the most recent configuration stored in the log. The table is
%% scanned in reverse key order for the first object shaped like
%% `{Index, {_, {_, {config, Config}}}}`; returns `{ok, Index, Config}` for
%% that object, or `not_found` when no object matches.
-spec config(Log :: wa_raft_log:log()) ->
    {ok, Index :: wa_raft_log:log_index(), Entry :: wa_raft_server:config()} | not_found.
config(#raft_log{name = Table}) ->
    ConfigEntry = {'$1', {'_', {'_', {config, '$2'}}}},
    MatchSpec = [{ConfigEntry, [], [{{'$1', '$2'}}]}],
    %% A limit of 1 stops the reverse scan at the first match.
    case ets:select_reverse(Table, MatchSpec, 1) of
        {[{Index, Config}], _Continuation} -> {ok, Index, Config};
        _ -> not_found
    end.
%%-------------------------------------------------------------------
%% RAFT log provider interface for writing new log data
%%-------------------------------------------------------------------
%% Appends `Entries` to the log directly after the view's last index.
%% Binary entries are decoded first; if any of them fails to decode the
%% error is returned and nothing is inserted. Otherwise all records are
%% written with a single `ets:insert/2` call. `Mode` and `Priority` are
%% ignored by this provider.
-spec append(
    View :: wa_raft_log:view(),
    Entries :: [wa_raft_log:log_entry() | binary()],
    Mode :: strict | relaxed,
    Priority :: wa_raft_acceptor:priority()
) -> ok | {error, term()}.
append(View, Entries, _Mode, _Priority) ->
    Table = wa_raft_log:log_name(View),
    NextIndex = wa_raft_log:last_index(View) + 1,
    case append_decode(NextIndex, Entries) of
        {error, _Reason} = Error ->
            Error;
        {ok, Records} ->
            true = ets:insert(Table, Records),
            ok
    end.
%% Turns `Entries` into `{Index, Entry}` records numbered consecutively
%% from `Index`, decoding binary entries along the way. Returns the first
%% decode error encountered instead of a partial result.
-spec append_decode(
    Index :: wa_raft_log:log_index(),
    Entries :: [wa_raft_log:log_entry() | binary()]
) -> {ok, [{wa_raft_log:log_index(), wa_raft_log:log_entry()}]} | {error, term()}.
append_decode(FirstIndex, Entries) ->
    NoRecords = [],
    append_decode(FirstIndex, Entries, NoRecords).
%% Accumulator loop behind append_decode/2. Records are collected in
%% reverse and flipped once the input is exhausted; the first entry that
%% fails to decode aborts the loop with its error.
-spec append_decode(
    Index :: wa_raft_log:log_index(),
    Entries :: [wa_raft_log:log_entry() | binary()],
    Acc :: [{wa_raft_log:log_index(), wa_raft_log:log_entry()}]
) -> {ok, [{wa_raft_log:log_index(), wa_raft_log:log_entry()}]} | {error, term()}.
append_decode(Index, [Raw | Rest], Acc) ->
    case decode_append_entry(Raw) of
        {error, _Reason} = Error ->
            Error;
        {ok, Decoded} ->
            append_decode(Index + 1, Rest, [{Index, Decoded} | Acc])
    end;
append_decode(_Index, [], Acc) ->
    {ok, lists:reverse(Acc)}.
%% Normalises one entry handed to append/4. Non-binary entries are passed
%% through unchanged. Binary entries are decoded with
%% `binary_to_term/2` using the `safe` option; any exception raised by the
%% decode is reported as `{error, {bad_entry_term, Reason}}`.
-spec decode_append_entry(Entry :: wa_raft_log:log_entry() | binary()) ->
    {ok, wa_raft_log:log_entry()} | {error, term()}.
decode_append_entry(Entry) when not is_binary(Entry) ->
    {ok, Entry};
decode_append_entry(Encoded) ->
    try
        {ok, binary_to_term(Encoded, [safe])}
    catch
        _:Reason -> {error, {bad_entry_term, Reason}}
    end.
%%-------------------------------------------------------------------
%% RAFT log provider interface for managing underlying RAFT log
%%-------------------------------------------------------------------
%% Creates the ETS table that backs this log: a public, named
%% `ordered_set` registered under the log's name. The call raises if the
%% table cannot be created (for example when the name is already taken).
-spec init(Log :: wa_raft_log:log()) -> ok.
init(#raft_log{name = LogName}) ->
    %% With `named_table` the call returns the table name. Matching on it
    %% keeps this call assertive like the other ETS updates in this module
    %% instead of silently discarding the result.
    LogName = ets:new(LogName, [ordered_set, public, named_table]),
    ok.
%% Opens the log. This provider keeps no per-log state, so the returned
%% state is always `undefined` and the log argument is not inspected.
-spec open(Log :: wa_raft_log:log()) -> {ok, State :: state()}.
open(_Log) ->
    State = undefined,
    {ok, State}.
%% Closes the log. Nothing is released here: both arguments are ignored
%% and the call always returns `ok`.
-spec close(
    Log :: wa_raft_log:log(),
    State :: state()
) -> ok.
close(_Log, _State) -> ok.
%% Resets the log to `Position`: every object is removed from the table
%% and a single entry is stored at the position's index, carrying the
%% position's term and an `undefined` payload. The state is returned
%% unchanged.
-spec reset(
    Log :: wa_raft_log:log(),
    Position :: wa_raft_log:log_pos(),
    State :: state()
) -> {ok, NewState :: state()}.
reset(#raft_log{name = Table}, #raft_log_pos{index = Index, term = Term}, State) ->
    Marker = {Index, {Term, undefined}},
    true = ets:delete_all_objects(Table),
    true = ets:insert(Table, Marker),
    {ok, State}.
%% Deletes the entry at `Index` and every entry after it, one key at a
%% time, until the end of the table is reached. The state is returned
%% unchanged. Passing `'$end_of_table'` deletes nothing.
-spec truncate(
    Log :: wa_raft_log:log(),
    Index :: wa_raft_log:log_index() | '$end_of_table',
    State :: state()
) -> {ok, NewState :: state()}.
truncate(#raft_log{name = Table} = Log, Index, State) when Index =/= '$end_of_table' ->
    true = ets:delete(Table, Index),
    %% The table is created as an ordered_set in init/1, so the successor
    %% of a key that was just deleted can still be looked up.
    Following = ets:next(Table, Index),
    truncate(Log, Following, State);
truncate(_Log, '$end_of_table', State) ->
    {ok, State}.
%% Deletes every entry stored below `Index`; the entry at `Index` itself
%% is kept. The state is returned unchanged.
-spec trim(
    Log :: wa_raft_log:log(),
    Index :: wa_raft_log:log_index(),
    State :: state()
) -> {ok, NewState :: state()}.
trim(Log, Index, State) ->
    HighestTrimmed = Index - 1,
    ok = trim_impl(Log, HighestTrimmed),
    {ok, State}.
%% Worker loop behind trim/3. Deletes the entry at `Index`, then keeps
%% stepping to the previous key and deleting it until the start of the
%% table is passed.
-spec trim_impl(
    Log :: wa_raft_log:log(),
    Index :: wa_raft_log:log_index() | '$end_of_table'
) -> ok.
trim_impl(#raft_log{name = Table} = Log, Index) when Index =/= '$end_of_table' ->
    true = ets:delete(Table, Index),
    %% The table is created as an ordered_set in init/1, so the predecessor
    %% of a key that was just deleted can still be looked up.
    Previous = ets:prev(Table, Index),
    trim_impl(Log, Previous);
trim_impl(_Log, '$end_of_table') ->
    ok.
%% Flushes the log. This provider does nothing here: the argument is
%% ignored and the call always returns `ok`.
-spec flush(Log :: wa_raft_log:log()) -> ok.
flush(_Log) -> ok.