Skip to main content

src/wa_raft_durable_state.erl

%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.
%%%
%%% This module implements functions for storing / loading persistent state.

-module(wa_raft_durable_state).
-compile(warn_missing_spec_all).

-include_lib("wa_raft/include/wa_raft.hrl").
-include_lib("wa_raft/include/wa_raft_logger.hrl").

-export([
    load/1,
    store/1,
    sync/1
]).

-spec load(StateIn :: #raft_state{}) -> {ok, StateOut :: #raft_state{}} | no_state | {error, Reason :: term()}.
load(#raft_state{name = Name, partition_path = PartitionPath} = State) ->
    StateItems = [
        {current_term,   fun is_integer/1, fun (V, S) -> S#raft_state{current_term = V} end,   required},
        {voted_for,      fun is_atom/1,    fun (V, S) -> S#raft_state{voted_for = V} end,      required},
        {disable_reason, undefined,        fun (V, S) -> S#raft_state{disable_reason = V} end, undefined}
    ],
    StateFile = filename:join(PartitionPath, ?STATE_FILE_NAME),
    case file:consult(StateFile) of
        {ok, [{crc, SavedCRC} | StateTerms]} ->
            case erlang:crc32(term_to_binary(StateTerms, [{minor_version, 1}, deterministic])) of
                SavedCRC ->
                    try
                        {ok, lists:foldl(
                            fun ({Item, Validator, Updater, Default}, StateN) ->
                                    case proplists:lookup(Item, StateTerms) of
                                        none when Default =:= required ->
                                            ?RAFT_LOG_ERROR("~p read state file but cannot find ~p.", [Name, Item]),
                                            throw({error, {missing, Item}});
                                        none ->
                                            Updater(Default, StateN);
                                        {Item, Value} ->
                                            case Validator =:= undefined orelse Validator(Value) of
                                                true ->
                                                    Updater(Value, StateN);
                                                false ->
                                                    ?RAFT_LOG_ERROR("~p read state file but ~p has an invalid value `~p`.", [Name, Item, Value]),
                                                    throw({error, {invalid, Item}})
                                            end
                                    end
                            end, State, StateItems)}
                    catch
                        throw:{error, Reason} -> {error, Reason}
                    end;
                InvalidCRC ->
                    ?RAFT_LOG_ERROR("~p read state file but CRCs did not match. (saved crc: ~p, computed crc: ~p)", [Name, SavedCRC, InvalidCRC]),
                    {error, invalid_crc}
            end;
        {ok, _} ->
            ?RAFT_LOG_ERROR("~p read state file but no CRC was found", [Name]),
            {error, no_crc};
        {error, enoent} ->
            ?RAFT_LOG_NOTICE("~p is not loading non-existent state file.", [Name]),
            no_state;
        {error, Reason} ->
            ?RAFT_LOG_ERROR("~p could not read state file due to ~p.", [Name, Reason]),
            {error, Reason}
    end.

-spec store(#raft_state{}) -> ok | {error, Reason :: term()}.
store(#raft_state{name = Name, table = Table, partition_path = PartitionPath, current_term = CurrentTerm, voted_for = VotedFor, disable_reason = DisableReason}) ->
    StateList = [
        {current_term, CurrentTerm},
        {voted_for, VotedFor},
        {disable_reason, DisableReason}
    ],
    StateListWithCRC = [{crc, erlang:crc32(term_to_binary(StateList, [{minor_version, 1}, deterministic]))} | StateList],
    StateIO = [io_lib:format("~p.~n", [Term]) || Term <- StateListWithCRC],
    StateFile = filename:join(PartitionPath, ?STATE_FILE_NAME),
    StateFileTemp = [StateFile, ".temp"],
    case filelib:ensure_dir(StateFile) of
        ok ->
            case prim_file:write_file(StateFileTemp, StateIO) of
                ok ->
                    case file:rename(StateFileTemp, StateFile) of
                        ok ->
                            ok;
                        {error, Reason} ->
                            ?RAFT_COUNT(Table, {'server.persist_state.error.rename', Reason}),
                            ?RAFT_LOG_ERROR("~p failed to rename temporary state file due to ~p.", [Name, Reason]),
                            {error, {rename, Reason}}
                    end;
                {error, Reason} ->
                    ?RAFT_COUNT(Table, {'server.persist_state.error.write', Reason}),
                    ?RAFT_LOG_ERROR("~p failed to write current state to temporary file due to ~p.", [Name, Reason]),
                    {error, {write, Reason}}
            end;
        {error, Reason} ->
            ?RAFT_COUNT(Table, {'server.persist_state.error.ensure_dir', Reason}),
            ?RAFT_LOG_ERROR("~p failed to ensure directory exists due to ~p.", [Name, Reason]),
            {error, {ensure_dir, Reason}}
    end.

-spec sync(StateIn :: #raft_state{}) -> ok.
sync(#raft_state{partition_path = PartitionPath}) ->
    StateFile = filename:join(PartitionPath, ?STATE_FILE_NAME),
    case prim_file:open(StateFile, [read, binary]) of
        {ok, Fd} ->
            prim_file:sync(Fd),
            prim_file:close(Fd),
            ok;
        _ ->
            ok
    end.