Skip to main content

src/wa_raft_storage_ets.erl

%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.
%%%
%%% An example implementation of the RAFT storage provider behaviour that
%%% uses ETS as a backing store. This implementation is for demonstration
%%% purposes only and should not be used in actual applications.

-module(wa_raft_storage_ets).
-compile(warn_missing_spec_all).
-behaviour(wa_raft_storage).

-export([
    storage_open/2,
    storage_close/1,
    storage_label/1,
    storage_position/1,
    storage_config/1,
    storage_apply/3,
    storage_apply/4,
    storage_apply_config/3,
    storage_read/3,
    storage_create_snapshot/2,
    storage_create_witness_snapshot/2,
    storage_open_snapshot/3,
    storage_make_empty_snapshot/5
]).

-include_lib("wa_raft/include/wa_raft.hrl").

%% Options used for the ETS table
-define(OPTIONS, [set, public, {read_concurrency, true}, {write_concurrency, true}]).

%% Filename used for the actual ETS table file in a snapshot
-define(SNAPSHOT_FILENAME, "data").

%% Tag used in keys for metadata stored on the behalf of RAFT
-define(METADATA_TAG, '$metadata').
%% Tag used for label metadata stored on behalf of RAFT.
-define(LABEL_TAG, '$label').
%% Tag used for recording the current storage position
-define(POSITION_TAG, '$position').
%% Tag used for tracking if the current storage is incomplete.
-define(INCOMPLETE_TAG, '$incomplete').

-record(state, {
    name :: atom(),
    table :: wa_raft:table(),
    partition :: wa_raft:partition(),
    self :: #raft_identity{},
    storage :: ets:table()
}).

-spec storage_open(#raft_options{}, file:filename()) -> #state{}.
storage_open(#raft_options{table = Table, partition = Partition, self = Self, storage_name = Name}, _RootDir) ->
    Storage = ets:new(Name, ?OPTIONS),
    #state{name = Name, table = Table, partition = Partition, self = Self, storage = Storage}.

-spec storage_close(#state{}) -> ok.
storage_close(#state{storage = Storage}) ->
    true = ets:delete(Storage),
    ok.

-spec storage_position(#state{}) -> wa_raft_log:log_pos().
storage_position(#state{storage = Storage}) ->
    ets:lookup_element(Storage, ?POSITION_TAG, 2, #raft_log_pos{}).

-spec storage_label(#state{}) -> {ok, Label :: wa_raft_label:label()}.
storage_label(#state{storage = Storage}) ->
    case ets:lookup(Storage, ?LABEL_TAG) of
        [{_, Label}] -> {ok, Label};
        []           -> {ok, undefined}
    end.

-spec storage_config(#state{}) -> {ok, wa_raft_log:log_pos(), wa_raft_server:config()} | undefined.
storage_config(#state{storage = Storage}) ->
    case ets:lookup(Storage, {?METADATA_TAG, config}) of
        [{_, {Version, Value}}] -> {ok, Version, Value};
        []                      -> undefined
    end.

-spec storage_incomplete(#state{}) -> boolean().
storage_incomplete(#state{storage = Storage}) ->
    ets:lookup_element(Storage, ?INCOMPLETE_TAG, 2, false).

-spec storage_apply(Command :: wa_raft_acceptor:command(), Position :: wa_raft_log:log_pos(), Label :: wa_raft_label:label(), Storage :: #state{}) -> {ok, #state{}}.
storage_apply(Command, Position, Label, #state{storage = Storage} = State) ->
    true = ets:insert(Storage, {?LABEL_TAG, Label}),
    storage_apply(Command, Position, State).

-spec storage_apply(Command :: wa_raft_acceptor:command(), Position :: wa_raft_log:log_pos(), Storage :: #state{}) -> {ok, #state{}}.
storage_apply(noop, Position, #state{storage = Storage} = State) ->
    true = ets:insert(Storage, {?POSITION_TAG, Position}),
    {ok, State};
storage_apply(noop_omitted, Position, #state{storage = Storage} = State) ->
    true = ets:insert(Storage, [{?INCOMPLETE_TAG, true}, {?POSITION_TAG, Position}]),
    {ok, State};
storage_apply({write, _Table, Key, Value}, Position, #state{storage = Storage} = State) ->
    true = ets:insert(Storage, [{Key, Value}, {?POSITION_TAG, Position}]),
    {ok, State};
storage_apply({delete, _Table, Key}, Position, #state{storage = Storage} = State) ->
    true = ets:delete(Storage, Key),
    true = ets:insert(Storage, {?POSITION_TAG, Position}),
    {ok, State}.

-spec storage_apply_config(
    Config :: wa_raft_server:config(),
    LogPos :: wa_raft_log:log_pos(),
    State :: #state{}
) -> {ok | {error, Reason :: term()}, #state{}}.
storage_apply_config(Config, LogPos, State) ->
    storage_check_config(Config, State),
    storage_apply_config(Config, LogPos, LogPos, State).

-spec storage_apply_config(
    Config :: wa_raft_server:config(),
    ConfigPos :: wa_raft_log:log_pos(),
    LogPos :: wa_raft_log:log_pos(),
    State :: #state{}
) -> {ok | {error, Reason :: term()}, #state{}}.
storage_apply_config(Config, ConfigPos, LogPos, #state{storage = Storage} = State) ->
    true = ets:insert(Storage, [{{?METADATA_TAG, config}, {ConfigPos, Config}}, {?POSITION_TAG, LogPos}]),
    {ok, State}.

-spec storage_read(Command :: wa_raft_acceptor:command(), Position :: wa_raft_log:log_pos(), State :: #state{}) -> ok | {ok, Value :: dynamic()} | not_found.
storage_read(noop, _Position, #state{}) ->
    ok;
storage_read({read, _Table, Key}, _Position, #state{storage = Storage}) ->
    case ets:lookup(Storage, Key) of
        [{_, Value}] -> {ok, Value};
        []           -> not_found
    end.

-spec storage_create_snapshot(file:filename(), #state{}) -> ok | {error, Reason :: term()}.
storage_create_snapshot(SnapshotPath, #state{storage = Storage}) ->
    case filelib:ensure_path(SnapshotPath) of
        ok              -> ets:tab2file(Storage, filename:join(SnapshotPath, ?SNAPSHOT_FILENAME));
        {error, Reason} -> {error, Reason}
    end.

-spec storage_create_witness_snapshot(file:filename(), #state{}) -> ok | {error, Reason :: term()}.
storage_create_witness_snapshot(SnapshotPath, #state{name = Name, table = Table, partition = Partition, self = Self} = State) ->
    {ok, ConfigPosition, Config} = storage_config(State),
    SnapshotPosition = storage_position(State),
    storage_make_empty_snapshot(Name, Table, Partition, Self, SnapshotPath, SnapshotPosition, Config, ConfigPosition, #{}).

-spec storage_open_snapshot(file:filename(), wa_raft_log:log_pos(), #state{}) -> {ok, #state{}} | {error, Reason :: term()}.
storage_open_snapshot(SnapshotPath, SnapshotPosition, #state{storage = Storage} = State) ->
    SnapshotData = filename:join(SnapshotPath, ?SNAPSHOT_FILENAME),
    case ets:file2tab(SnapshotData) of
        {ok, NewStorage} ->
            case ets:lookup_element(NewStorage, ?POSITION_TAG, 2, #raft_log_pos{}) of
                SnapshotPosition ->
                    NewState = State#state{storage = NewStorage},
                    storage_check_config(NewState),
                    try ets:delete(Storage) catch _:_ -> ok end,
                    {ok, NewState};
                _IncorrectPosition ->
                    try ets:delete(NewStorage) catch _:_ -> ok end,
                    {error, bad_position}
            end;
        {error, Reason} ->
            {error, Reason}
    end.

-spec storage_check_config(#state{}) -> ok.
storage_check_config(State) ->
    case storage_config(State) of
        {ok, _, Config} -> storage_check_config(Config, State);
        undefined -> ok
    end.

-spec storage_check_config(wa_raft_server:config(), #state{}) -> ok.
storage_check_config(Config, #state{self = Self} = State) ->
    case storage_incomplete(State) andalso wa_raft_server:is_data_replica(Self, Config) of
        true -> error(invalid_incomplete_replica);
        false -> ok
    end.

-spec storage_make_empty_snapshot(#raft_options{}, file:filename(), wa_raft_log:log_pos(), wa_raft_server:config(), dynamic()) -> ok | {error, Reason :: term()}.
storage_make_empty_snapshot(#raft_options{table = Table, partition = Partition, self = Self, storage_name = Name}, SnapshotPath, SnapshotPosition, Config, Data) ->
    storage_make_empty_snapshot(Name, Table, Partition, Self, SnapshotPath, SnapshotPosition, Config, SnapshotPosition, Data).

-spec storage_make_empty_snapshot(atom(), wa_raft:table(), wa_raft:partition(), #raft_identity{}, file:filename(), wa_raft_log:log_pos(), wa_raft_server:config(), wa_raft_log:log_pos(), dynamic()) -> ok | {error, Reason :: term()}.
storage_make_empty_snapshot(Name, Table, Partition, Self, SnapshotPath, SnapshotPosition, Config, ConfigPosition, _Data) ->
    Storage = ets:new(Name, ?OPTIONS),
    State = #state{name = Name, table = Table, partition = Partition, self = Self, storage = Storage},
    storage_apply_config(Config, ConfigPosition, SnapshotPosition, State),
    storage_create_snapshot(SnapshotPath, State).