src/khepri_cluster.erl

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright © 2021-2022 VMware, Inc. or its affiliates.  All rights reserved.
%%

%% @doc Khepri service and cluster management API.
%%
%% This module provides the public API for the service and cluster management.
%% For convenience, some functions of this API are repeated in the {@link
%% khepri} module for easier access.
%%
%% == The Khepri store and the Ra cluster ==
%%
%% A Khepri store is a Ra server inside a Ra cluster. In fact, the Khepri store
%% and the Ra cluster share the same name. The only constraint is that the name
%% must be an atom, even though Ra accepts other Erlang types as cluster names.
%%
%% By default, Khepri uses `khepri' as the store ID (and thus Ra cluster name).
%% This default can be overridden using an argument to the `start()' functions
%% or the `default_store_id' application environment variable.
%%
%% Examples:
%% <ul>
%% <li>Use the default Khepri store ID:
%% <pre>{ok, khepri} = khepri:start().</pre></li>
%% <li>Override the default store ID using an argument:
%% <pre>{ok, my_store} = khepri:start("/var/lib/khepri", my_store).</pre></li>
%% <li>Override the default store ID using an application environment variable:
%% <pre>ok = application:set_env(
%%        khepri, default_store_id, my_store, [{persistent, true}]),
%%
%% {ok, my_store} = khepri:start().</pre></li>
%% </ul>
%%
%% == The data directory and the Ra system ==
%%
%% A Ra server relies on a Ra system to provide various functions and to
%% configure the directory where the data should be stored on disk.
%%
%% By default, Khepri will configure its own Ra system to write data under
%% `khepri#Nodename' in the current working directory, where `Nodename' is
%% the name of the Erlang node.
%%
%% ```
%% {ok, StoreId} = khepri:start().
%%
%% %% If the Erlang node was started without distribution (the default), the
%% %% statement above will start a Ra system called like the store (`khepri')
%% %% and will use the `khepri#nonode@nohost' directory.
%% '''
%%
%% The default data directory or Ra system name can be overridden using an
%% argument to the `start()' functions or the `default_ra_system' application
%% environment variable. Either a directory (string or binary) or the name of
%% an already running Ra system is accepted.
%%
%% Examples:
%% <ul>
%% <li>Override the default with the name of a running Ra system using an
%% argument:
%% <pre>{ok, StoreId} = khepri:start(my_ra_system).</pre></li>
%% <li>Override the default data directory using an application environment
%% variable:
%% <pre>ok = application:set_env(
%%        khepri, default_ra_system, "/var/lib/khepri", [{persistent, true}]),
%%
%% {ok, StoreId} = khepri:start().</pre></li>
%% </ul>
%%
%% Please refer to <a href="https://github.com/rabbitmq/ra">Ra
%% documentation</a> to learn more about Ra systems and Ra clusters.
%%
%% == Managing Ra cluster members ==
%%
%% A Khepri/Ra cluster can be expanded by telling a node to join a remote
%% cluster. Note that the Khepri store/Ra server to add to the cluster must be
%% running before it can join.
%%
%% ```
%% %% Start the local Khepri store.
%% {ok, StoreId} = khepri:start().
%%
%% %% Join a remote cluster.
%% ok = khepri_cluster:join(RemoteNode).
%% '''
%%
%% To remove the local Khepri store node from the cluster, it must be reset.
%%
%% ```
%% %% Reset the local Khepri store.
%% ok = khepri_cluster:reset().
%% '''

-module(khepri_cluster).

-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/assert.hrl").

-include("include/khepri.hrl").
-include("src/khepri_cluster.hrl").
-include("src/khepri_error.hrl").

-export([start/0, start/1, start/2, start/3,
         join/1, join/2,
         reset/0, reset/1, reset/2,
         stop/0, stop/1,
         members/1, members/2,
         locally_known_members/1, locally_known_members/2,
         nodes/1,
         locally_known_nodes/1,
         get_default_ra_system_or_data_dir/0,
         get_default_store_id/0,
         get_store_ids/0]).

%% Internal.
-export([node_to_member/2,
         this_member/1,
         wait_for_cluster_readiness/2,
         get_cached_leader/1,
         cache_leader/2,
         cache_leader_if_changed/3,
         clear_cached_leader/1]).

-ifdef(TEST).
-export([wait_for_ra_server_exit/1,
         generate_default_data_dir/0]).
-endif.

-dialyzer({no_underspecs, [start/1,
                           stop/0, stop/1,
                           stop_locked/1,
                           join/2,
                           wait_for_remote_cluster_readiness/3]}).

%% Guard-safe test: a Ra system is referenced by its name, which is an atom.
-define(IS_RA_SYSTEM(RaSystem), is_atom(RaSystem)).
%% Guard-safe test: a data directory is given as a list (string) or a binary.
-define(IS_DATA_DIR(DataDir), (is_list(DataDir) orelse is_binary(DataDir))).

-type incomplete_ra_server_config() :: map().
%% A Ra server config map.
%%
%% This configuration map may lack required parameters; Khepri fills them in
%% when necessary. Parameters that matter to Khepri (e.g. `machine') are
%% overridden anyway.
%%
%% @see ra_server:ra_server_config/0.

-type ra_server_config_with_cluster_name() :: #{cluster_name :=
                                                khepri:store_id()}.
%% Intermediate Ra server configuration with `cluster_name' set.
%%
%% The `cluster_name' doubles as the Khepri store ID.

-type ra_server_config_with_id_and_cn() :: #{id := ra:server_id(),
                                             cluster_name :=
                                             khepri:store_id()}.
%% Intermediate Ra server configuration with `id' and `cluster_name' set.
%%
%% `id' is the Ra server ID of the local member of the store.

-export_type([incomplete_ra_server_config/0]).

%% -------------------------------------------------------------------
%% Database management.
%% -------------------------------------------------------------------

-spec start() -> Ret when
      Ret :: khepri:ok(StoreId) | khepri:error(),
      StoreId :: khepri:store_id().
%% @doc Starts a store.
%%
%% This is equivalent to `start(DefaultRaSystem)', where `DefaultRaSystem' is
%% the value returned by {@link get_default_ra_system_or_data_dir/0}.
%%
%% The Khepri application (and its dependencies) is started first; if that
%% fails, its error is returned as is.
%%
%% @see start/1.

start() ->
    AppStartRet = application:ensure_all_started(khepri),
    case AppStartRet of
        {ok, _StartedApps} ->
            DefaultRaSystemOrDataDir = get_default_ra_system_or_data_dir(),
            start(DefaultRaSystemOrDataDir);
        _ ->
            AppStartRet
    end.

-spec start(RaSystem | DataDir) -> Ret when
      RaSystem :: atom(),
      DataDir :: file:filename_all(),
      Ret :: khepri:ok(StoreId) | khepri:error(),
      StoreId :: khepri:store_id().
%% @doc Starts a store.
%%
%% This is equivalent to `start(RaSystemOrDataDir, DefaultStoreId)', where
%% `DefaultStoreId' is the value returned by {@link get_default_store_id/0}.
%%
%% The Khepri application (and its dependencies) is started first; if that
%% fails, its error is returned as is.
%%
%% @see start/2.

start(RaSystemOrDataDir) ->
    AppStartRet = application:ensure_all_started(khepri),
    case AppStartRet of
        {ok, _StartedApps} ->
            DefaultStoreId = get_default_store_id(),
            start(RaSystemOrDataDir, DefaultStoreId);
        _ ->
            AppStartRet
    end.

-spec start(RaSystem | DataDir, StoreId | RaServerConfig) -> Ret when
      RaSystem :: atom(),
      DataDir :: file:filename_all(),
      StoreId :: khepri:store_id(),
      RaServerConfig :: incomplete_ra_server_config(),
      Ret :: khepri:ok(StoreId) | khepri:error().
%% @doc Starts a store.
%%
%% This is equivalent to `start(RaSystemOrDataDir, StoreIdOrRaServerConfig,
%% DefaultTimeout)', where `DefaultTimeout' is the value returned by {@link
%% khepri_app:get_default_timeout/0}.
%%
%% @param RaSystem the name of the Ra system.
%% @param DataDir a directory to write data.
%% @param StoreId the name of the Khepri store.
%% @param RaServerConfig the skeleton of each Ra server's configuration.
%%
%% @see start/3.

start(RaSystemOrDataDir, StoreIdOrRaServerConfig) ->
    start(RaSystemOrDataDir,
          StoreIdOrRaServerConfig,
          khepri_app:get_default_timeout()).

-spec start(RaSystem | DataDir, StoreId | RaServerConfig, Timeout) ->
    Ret when
      RaSystem :: atom(),
      DataDir :: file:filename_all(),
      StoreId :: khepri:store_id(),
      RaServerConfig :: incomplete_ra_server_config(),
      Timeout :: timeout(),
      Ret :: khepri:ok(StoreId) | khepri:error().
%% @doc Starts a store.
%%
%% The first argument is either the name of a Ra system (an atom) or a data
%% directory (a string or a binary):
%% <ul>
%% <li>A Ra system given by name must already be running when this function
%% is called.</li>
%% <li>If a data directory is given, a Ra system using this directory is
%% started (an already started one is reused). The directory will be created
%% automatically if it doesn't exist. A Ra system started this way uses
%% Khepri's default Ra system name.</li>
%% </ul>
%%
%% The second argument is either a Khepri store ID or a Ra server
%% configuration. A store ID is turned into a minimal Ra server
%% configuration. For a Ra server configuration, the store ID is taken from
%% its `cluster_name', or is the default store ID if it has none.
%%
%% If this is a new store, the Ra server is started and an election is
%% triggered so that it becomes its own leader and is ready to process
%% commands and queries.
%%
%% If the store was started in the past and stopped, it is restarted. In
%% this case, `RaServerConfig' is ignored and Ra takes care of the election
%% automatically.
%%
%% @param RaSystem the name of the Ra system.
%% @param DataDir a directory to write data.
%% @param StoreId the name of the Khepri store.
%% @param RaServerConfig the skeleton of each Ra server's configuration.
%% @param Timeout a timeout.
%%
%% @returns the ID of the started store in an "ok" tuple, or an error tuple if
%% the store couldn't be started.

start(RaSystemOrDataDir, StoreIdOrRaServerConfig, Timeout) ->
    AppStartRet = application:ensure_all_started(khepri),
    case AppStartRet of
        {ok, _StartedApps} ->
            ensure_ra_server_config_and_start(
              RaSystemOrDataDir, StoreIdOrRaServerConfig, Timeout);
        _ ->
            AppStartRet
    end.

-spec ensure_ra_server_config_and_start(
        RaSystem | DataDir, StoreId | RaServerConfig, Timeout) ->
    Ret when
      RaSystem :: atom(),
      DataDir :: file:filename_all(),
      StoreId :: khepri:store_id(),
      RaServerConfig :: incomplete_ra_server_config(),
      Timeout :: timeout(),
      Ret :: khepri:ok(StoreId) | khepri:error().
%% @private
%%
%% Normalizes the second argument into a Ra server configuration that has a
%% `cluster_name' (the store ID), then continues with
%% `verify_ra_system_and_start/3'.

ensure_ra_server_config_and_start(
  RaSystemOrDataDir, StoreIdOrRaServerConfig, Timeout)
  when (?IS_DATA_DIR(RaSystemOrDataDir) orelse
        ?IS_RA_SYSTEM(RaSystemOrDataDir)) andalso
       (?IS_STORE_ID(StoreIdOrRaServerConfig) orelse
        is_map(StoreIdOrRaServerConfig)) ->
    %% A config map whose `cluster_name' is not a valid store ID matches none
    %% of the clauses below and raises a `case_clause' error.
    RaServerConfig =
    case StoreIdOrRaServerConfig of
        #{cluster_name := ClusterName} when ?IS_STORE_ID(ClusterName) ->
            %% The config already names the store.
            StoreIdOrRaServerConfig;
        #{} when not is_map_key(cluster_name, StoreIdOrRaServerConfig) ->
            %% The config doesn't name the store: use the default store ID.
            DefaultStoreId = get_default_store_id(),
            StoreIdOrRaServerConfig#{cluster_name => DefaultStoreId};
        StoreId when ?IS_STORE_ID(StoreId) ->
            %% A bare store ID: wrap it in a minimal config.
            #{cluster_name => StoreId}
    end,
    verify_ra_system_and_start(RaSystemOrDataDir, RaServerConfig, Timeout).

-spec verify_ra_system_and_start(
        RaSystem | DataDir, RaServerConfig, Timeout) ->
    Ret when
      RaSystem :: atom(),
      DataDir :: file:filename_all(),
      RaServerConfig :: ra_server_config_with_cluster_name(),
      Timeout :: timeout(),
      Ret :: khepri:ok(StoreId) | khepri:error(),
      StoreId :: khepri:store_id().
%% @private
%%
%% Makes sure the Ra system hosting the store is available, then starts the
%% store's Ra server in it.
%%
%% A Ra system given by name is used as is. A data directory (a binary is
%% converted to a string first) leads to starting a Ra system named
%% `?DEFAULT_RA_SYSTEM_NAME' whose data and WAL live in that directory; a Ra
%% system which is already started under that name is reused.

verify_ra_system_and_start(DataDir, RaServerConfig, Timeout)
  when is_binary(DataDir) ->
    DataDirStr = unicode:characters_to_list(DataDir),
    verify_ra_system_and_start(DataDirStr, RaServerConfig, Timeout);
verify_ra_system_and_start(RaSystem, RaServerConfig, Timeout)
  when ?IS_RA_SYSTEM(RaSystem) ->
    ensure_server_started(RaSystem, RaServerConfig, Timeout);
verify_ra_system_and_start(DataDir, RaServerConfig, Timeout)
  when is_list(DataDir) ->
    RaSystem = ?DEFAULT_RA_SYSTEM_NAME,
    Defaults = ra_system:default_config(),
    Overrides = #{name => RaSystem,
                  data_dir => DataDir,
                  wal_data_dir => DataDir,
                  names => ra_system:derive_names(RaSystem)},
    RaSystemConfig = maps:merge(Defaults, Overrides),
    ?LOG_DEBUG(
       "Starting Ra system \"~s\" using data dir \"~ts\"",
       [RaSystem, DataDir]),
    case ra_system:start(RaSystemConfig) of
        {error, {already_started, _}} ->
            %% Reuse the Ra system started earlier under the same name.
            ensure_server_started(RaSystem, RaServerConfig, Timeout);
        {ok, _} ->
            ensure_server_started(RaSystem, RaServerConfig, Timeout);
        Error ->
            Error
    end.

-spec ensure_server_started(RaSystem, RaServerConfig, Timeout) -> Ret when
      RaSystem :: atom(),
      RaServerConfig :: ra_server_config_with_cluster_name(),
      Timeout :: timeout(),
      Ret :: khepri:ok(StoreId) | khepri:error(),
      StoreId :: khepri:store_id().
%% @private
%%
%% Starts or restarts the local Ra server of the store while holding the
%% store's global start lock (`server_start_lock(StoreId)'). The same lock is
%% taken by stop, join and reset, so these operations are serialized.

ensure_server_started(
  RaSystem, #{cluster_name := StoreId} = RaServerConfig, Timeout) ->
    Lock = server_start_lock(StoreId),
    global:set_lock(Lock),
    %% The lock is released whether the start returns normally or raises.
    try
        ensure_server_started_locked(RaSystem, RaServerConfig, Timeout)
    after
        global:del_lock(Lock)
    end.

-spec ensure_server_started_locked(RaSystem, RaServerConfig, Timeout) ->
    Ret when
      RaSystem :: atom(),
      RaServerConfig :: ra_server_config_with_cluster_name(),
      Timeout :: timeout(),
      Ret :: khepri:ok(StoreId) | khepri:error(),
      StoreId :: khepri:store_id().
%% @private
%%
%% Tries to restart the local Ra server of the store first. If Ra has never
%% heard of it (`name_not_registered'), a brand new server is started and an
%% election is triggered so it becomes its own leader. A server which is
%% already running is reported as a success.
%%
%% Must be called with the store's start lock held.

ensure_server_started_locked(
  RaSystem, #{cluster_name := StoreId} = RaServerConfig, Timeout) ->
    Member = this_member(StoreId),
    ServerConfig = RaServerConfig#{id => Member},
    ?LOG_DEBUG(
       "Trying to restart local Ra server for store \"~s\" "
       "in Ra system \"~s\"",
       [StoreId, RaSystem]),
    case ra:restart_server(RaSystem, Member) of
        ok ->
            ok = remember_store(RaSystem, ServerConfig),
            {ok, StoreId};
        {error, {already_started, _}} ->
            {ok, StoreId};
        {error, name_not_registered} ->
            ?LOG_DEBUG(
               "Ra server for store \"~s\" not registered in Ra system "
               "\"~s\", try to start a new one",
               [StoreId, RaSystem]),
            case do_start_server(RaSystem, ServerConfig) of
                ok ->
                    %% A new single-member cluster: elect this member.
                    ok = trigger_election(ServerConfig, Timeout),
                    {ok, StoreId};
                StartError ->
                    StartError
            end;
        RestartError ->
            RestartError
    end.

-spec do_start_server(RaSystem, RaServerConfig) -> Ret when
      RaSystem :: atom(),
      RaServerConfig :: ra_server_config_with_id_and_cn(),
      Ret :: ok | khepri:error().
%% @private
%%
%% Completes the given Ra server configuration and starts a new Ra server
%% with it in `RaSystem'. On success, the store is remembered with the
%% completed configuration; on failure, the configuration is logged and the
%% Ra error is returned.

do_start_server(RaSystem, RaServerConfig) ->
    FullConfig = complete_ra_server_config(RaServerConfig),
    StoreId = maps:get(cluster_name, FullConfig),
    ?LOG_DEBUG(
       "Starting a Ra server with the following configuration:~n~p",
       [FullConfig]),
    case ra:start_server(RaSystem, FullConfig) of
        {error, _} = Error ->
            ?LOG_ERROR(
               "Failed to start Ra server for store \"~s\" using the "
               "following Ra server configuration:~n~p",
               [StoreId, FullConfig]),
            Error;
        ok ->
            ok = remember_store(RaSystem, FullConfig),
            ?LOG_DEBUG(
               "Started Ra server for store \"~s\"",
               [StoreId]),
            ok
    end.

-spec trigger_election(Member | RaServerConfig, Timeout) -> ok when
      Member :: ra:server_id(),
      RaServerConfig :: ra_server_config_with_id_and_cn(),
      Timeout :: timeout().
%% @private
%%
%% Asks the given Ra server (or the server identified by the `id' of the
%% given configuration) to start an election. Anything other than `ok' from
%% Ra raises a `badmatch' error.

trigger_election(#{id := ServerId}, Timeout) ->
    trigger_election(ServerId, Timeout);
trigger_election({StoreId, _Node} = ServerId, Timeout) ->
    ?LOG_DEBUG("Trigger election in store \"~s\"", [StoreId]),
    ok = ra:trigger_election(ServerId, Timeout).

-spec stop() -> Ret when
      Ret :: ok | khepri:error().
%% @doc Stops a store.
%%
%% This is equivalent to `stop(DefaultStoreId)', where `DefaultStoreId' is
%% the value returned by {@link get_default_store_id/0}.
%%
%% @see stop/1.

stop() ->
    stop(get_default_store_id()).

-spec stop(StoreId) -> Ret when
      StoreId :: khepri:store_id(),
      Ret :: ok | khepri:error().
%% @doc Stops a store.
%%
%% The stop happens while holding the store's global start lock, the same
%% lock taken by start, join and reset, so these operations are serialized.
%%
%% @param StoreId the ID of the store to stop.
%%
%% @returns `ok' if it succeeds, an error tuple otherwise.

stop(StoreId) when ?IS_STORE_ID(StoreId) ->
    Lock = server_start_lock(StoreId),
    global:set_lock(Lock),
    %% The lock is released whether the stop returns normally or raises.
    try
        stop_locked(StoreId)
    after
        global:del_lock(Lock)
    end.

-spec stop_locked(StoreId) -> Ret when
      StoreId :: khepri:store_id(),
      Ret :: ok | khepri:error().
%% @private
%%
%% Stops the local Ra server of the store and waits for it to exit. A store
%% Khepri has no record of is considered already stopped. Must be called with
%% the store's start lock held.

stop_locked(StoreId) ->
    Member = this_member(StoreId),
    case get_store_prop(StoreId, ra_system) of
        {error, _} ->
            %% Khepri has no record of this store: it was stopped already.
            ok;
        {ok, RaSystem} ->
            ?LOG_DEBUG(
               "Stopping member ~0p in store \"~s\"",
               [Member, StoreId]),
            case ra:stop_server(RaSystem, Member) of
                {error, _} = Error ->
                    %% The store is deliberately not forgotten so that the
                    %% caller can try again.
                    %% TODO: Handle idempotency: if the Ra server is not
                    %% running, don't fail.
                    Error;
                ok ->
                    forget_store(StoreId),
                    wait_for_ra_server_exit(Member)
            end
    end.

-spec wait_for_ra_server_exit(Member) -> ok when
      Member :: ra:server_id().
%% @private
%%
%% Blocks until the process registered as `Member' is down, using a process
%% monitor. If no such process exists, the monitor reports `noproc' right
%% away and this returns immediately. There is no timeout: this waits for as
%% long as the process lives.

wait_for_ra_server_exit({StoreId, _} = Member) ->
    %% FIXME: This monitoring can go away when/if the following pull request
    %% in Ra is merged:
    %% https://github.com/rabbitmq/ra/pull/270
    ?LOG_DEBUG(
       "Wait for Ra server ~0p to exit in store \"~s\"",
       [Member, StoreId]),
    MonitorRef = erlang:monitor(process, Member),
    receive
        {'DOWN', MonitorRef, _Type, _Object, ExitReason} ->
            case ExitReason of
                noproc ->
                    ?LOG_DEBUG(
                       "Ra server ~0p in store \"~s\" already exited",
                       [Member, StoreId]);
                _ ->
                    ?LOG_DEBUG(
                       "Ra server ~0p in store \"~s\" exited: ~p",
                       [Member, StoreId, ExitReason])
            end,
            ok
    end.

-spec join(RemoteMember | RemoteNode) -> Ret when
      RemoteMember :: ra:server_id(),
      RemoteNode :: node(),
      Ret :: ok | khepri:error().
%% @doc Adds the local running Khepri store to a remote cluster.
%%
%% Two forms are accepted:
%% <ul>
%% <li>`join(RemoteMember)': equivalent to `join(StoreId, RemoteNode)', where
%% `StoreId' and `RemoteNode' are taken from `RemoteMember'.</li>
%% <li>`join(RemoteNode)': equivalent to `join(DefaultStoreId, RemoteNode)',
%% where `DefaultStoreId' is the value returned by {@link
%% get_default_store_id/0}.</li>
%% </ul>
%%
%% @see join/2.

join({StoreId, RemoteNode} = _RemoteMember) ->
    join(StoreId, RemoteNode);
join(RemoteNode) when is_atom(RemoteNode) ->
    join(get_default_store_id(), RemoteNode).

-spec join(
        RemoteMember | RemoteNode | StoreId, Timeout | RemoteNode) ->
    Ret when
      RemoteMember :: ra:server_id(),
      RemoteNode :: node(),
      StoreId :: khepri:store_id(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error().
%% @doc Adds the local running Khepri store to a remote cluster.
%%
%% Three forms are accepted:
%% <ul>
%% <li>`join(RemoteMember, Timeout)': equivalent to `join(StoreId,
%% RemoteNode, Timeout)', where `StoreId' and `RemoteNode' are taken from
%% `RemoteMember'.</li>
%% <li>`join(RemoteNode, Timeout)': equivalent to `join(DefaultStoreId,
%% RemoteNode, Timeout)', where `DefaultStoreId' is the value returned by
%% {@link get_default_store_id/0}.</li>
%% <li>`join(StoreId, RemoteNode)': equivalent to `join(StoreId, RemoteNode,
%% DefaultTimeout)', where `DefaultTimeout' is the value returned by {@link
%% khepri_app:get_default_timeout/0}.</li>
%% </ul>
%%
%% Because `infinity' is both an atom and a valid timeout, a second argument
%% which is a valid timeout is always interpreted as a timeout.
%%
%% @see join/3.

join({StoreId, RemoteNode} = _RemoteMember, Timeout) ->
    join(StoreId, RemoteNode, Timeout);
join(RemoteNode, Timeout)
  when is_atom(RemoteNode) andalso ?IS_TIMEOUT(Timeout) ->
    join(get_default_store_id(), RemoteNode, Timeout);
join(StoreId, RemoteNode)
  when ?IS_STORE_ID(StoreId) andalso is_atom(RemoteNode) ->
    join(StoreId, RemoteNode, khepri_app:get_default_timeout()).

-spec join(StoreId, RemoteMember | RemoteNode, Timeout) -> Ret when
      StoreId :: khepri:store_id(),
      RemoteMember :: ra:server_id(),
      RemoteNode :: node(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error().
%% @doc Adds the local running Khepri store to a remote cluster.
%%
%% The local Khepri store must have been started with {@link start/3} before
%% it can be added to a cluster. The remote store is expected to use the same
%% store ID as the local one.
%%
%% `RemoteNode' is the entry point to the remote cluster: it must be running
%% for this function to work. A cluster membership change also requires a
%% quorum, so this join depends on one being available.
%%
%% If `RemoteMember' is given instead, the remote node is taken from it and
%% its store ID must be equal to `StoreId'; otherwise the call fails with a
%% `function_clause' error.
%%
%% As part of this function, the local Khepri store is reset: it leaves the
%% cluster it is already part of (if any) and all its data is removed.
%%
%% @param StoreId the ID of the local Khepri store.
%% @param RemoteMember the Ra server ID of a member of the remote cluster.
%% @param RemoteNode the name of the remote Erlang node running Khepri to join.
%% @param Timeout the timeout.
%%
%% @returns `ok' if it succeeds, an error tuple otherwise.

join(StoreId, {StoreId, RemoteNode} = _RemoteMember, Timeout) ->
    join(StoreId, RemoteNode, Timeout);
join(StoreId, RemoteNode, Timeout) when is_atom(RemoteNode) ->
    %% Pinging the remote node first serves two purposes:
    %% 1. It checks that the node is reachable.
    %% 2. It connects both nodes before the lock is acquired, so that the
    %%    global lock is really global.
    case net_adm:ping(RemoteNode) of
        pang ->
            {error, ?khepri_error(
                       failed_to_join_remote_khepri_node,
                       #{store_id => StoreId,
                         node => RemoteNode})};
        pong ->
            Lock = server_start_lock(StoreId),
            global:set_lock(Lock),
            %% The lock is released whether the join returns or raises.
            try
                check_status_and_join_locked(StoreId, RemoteNode, Timeout)
            after
                global:del_lock(Lock)
            end
    end.

-spec check_status_and_join_locked(StoreId, RemoteNode, Timeout) ->
    Ret when
      StoreId :: khepri:store_id(),
      RemoteNode :: node(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error().
%% @private
%%
%% Verifies that the local store is running and known to Khepri, then resets
%% it and joins the cluster reachable through `RemoteNode'. Must be called
%% with the store's start lock held.
%%
%% The outcome depends on whether a process is registered under `StoreId' and
%% on the `ra_system' and `ra_server_config' properties recorded for the
%% store:
%% <ul>
%% <li>process registered and both properties available: reset and join;</li>
%% <li>any property unavailable: the property lookup error is returned,
%% whether or not a process is registered under `StoreId';</li>
%% <li>no process registered but both properties available: the state is
%% inconsistent and an exception is raised.</li>
%% </ul>

check_status_and_join_locked(StoreId, RemoteNode, Timeout) ->
    ThisMember = this_member(StoreId),
    RaServerRunning = erlang:is_pid(erlang:whereis(StoreId)),
    Prop1 = get_store_prop(StoreId, ra_system),
    Prop2 = get_store_prop(StoreId, ra_server_config),
    case {RaServerRunning, Prop1, Prop2} of
        {true, {ok, RaSystem}, {ok, RaServerConfig}} ->
            reset_and_join_locked(
              StoreId, ThisMember, RaSystem, RaServerConfig,
              RemoteNode, Timeout);
        {_, {error, _} = Error, _} ->
            %% Khepri has no usable record of this store. This also covers a
            %% process registered under `StoreId' that Khepri doesn't know
            %% about: the lookup error is reported rather than crashing with
            %% a `case_clause'.
            Error;
        {_, _, {error, _} = Error} ->
            Error;
        {false, _, _} ->
            ?LOG_ERROR(
               "Local Ra server ~0p not running for store \"~s\", "
               "but properties are still available: ~0p and ~0p",
               [ThisMember, StoreId, Prop1, Prop2]),
            erlang:error(
              ?khepri_exception(
                 ra_server_not_running_but_props_available,
                 #{store_id => StoreId,
                   this_member => ThisMember,
                   ra_system => Prop1,
                   ra_server_config => Prop2}))
    end.

-spec reset_and_join_locked(
  StoreId, ThisMember, RaSystem, RaServerConfig, RemoteNode, Timeout) ->
    Ret when
      StoreId :: khepri:store_id(),
      ThisMember :: ra:server_id(),
      RaSystem :: atom(),
      RaServerConfig :: ra_server:config(),
      RemoteNode :: node(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error(),
%% @private
%%
%% Resets the local store, restarts its Ra server without triggering an
%% election, then joins the remote cluster with whatever time is left after
%% the reset. Must be called with the store's start lock held.

reset_and_join_locked(
  StoreId, ThisMember, RaSystem, RaServerConfig, RemoteNode, Timeout) ->
    %% The local node is reset in case it is already a standalone elected
    %% leader (which is the case after a successful call to
    %% `khepri_cluster:start()') or part of a cluster, and has any data.
    %%
    %% Right after the reset, it is restarted WITHOUT the
    %% `trigger_election()' step: it must not become a leader before joining
    %% the remote node, otherwise an assertion in Ra is hit.
    %%
    %% TODO: Should we verify the cluster membership first? To avoid resetting
    %% a node which is already part of the cluster? On the other hand, such a
    %% check would not be atomic and the membership could change between the
    %% check and the reset...
    %%
    %% TODO: Do we want to provide an option to verify the state of the local
    %% node before resetting it? Like "if it has data in the Khepri database,
    %% abort". It may be difficult to make this kind of check atomic though.
    WindowStart = khepri_utils:start_timeout_window(Timeout),
    ResetRet = do_reset(RaSystem, StoreId, ThisMember, Timeout),
    case ResetRet of
        ok ->
            %% Measured before the restart, as in the rest of this module.
            RemainingTimeout =
            khepri_utils:end_timeout_window(Timeout, WindowStart),
            case do_start_server(RaSystem, RaServerConfig) of
                ok ->
                    do_join_locked(
                      StoreId, ThisMember, RemoteNode, RemainingTimeout);
                StartError ->
                    StartError
            end;
        ResetError ->
            ResetError
    end.

-spec do_join_locked(
  StoreId, ThisMember, RemoteNode, Timeout) ->
    Ret when
      StoreId :: khepri:store_id(),
      ThisMember :: ra:server_id(),
      RemoteNode :: node(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error().
%% @private
%%
%% Asks `RemoteNode' (over RPC) to add `ThisMember' to its Ra cluster.
%%
%% If the remote cluster refuses the change because it is not ready for a
%% membership change yet, this waits for the remote cluster to become ready
%% and retries with the remaining time. On any other failure, an election is
%% triggered locally so that the freshly reset local Ra server becomes usable
%% again. The failure is then returned as an `{error, _}' tuple: RPC failures
%% (`{badrpc, _}') and Ra timeouts (`{timeout, _}') are wrapped, and Ra
%% `{error, _}' tuples are returned as is.

do_join_locked(StoreId, ThisMember, RemoteNode, Timeout) ->
    RemoteMember = node_to_member(StoreId, RemoteNode),
    ?LOG_DEBUG(
       "Adding this node (~0p) to the remote node's cluster (~0p)",
       [ThisMember, RemoteMember]),
    T1 = khepri_utils:start_timeout_window(Timeout),
    Ret1 = rpc:call(
             RemoteNode,
             ra, add_member, [StoreId, ThisMember, Timeout],
             Timeout),
    Timeout1 = khepri_utils:end_timeout_window(Timeout, T1),
    case Ret1 of
        {ok, _, _Leader} ->
            ?LOG_DEBUG(
               "Cluster for store \"~s\" successfully expanded",
               [StoreId]),
            ok;
        {error, cluster_change_not_permitted} ->
            T2 = khepri_utils:start_timeout_window(Timeout1),
            ?LOG_DEBUG(
               "Remote cluster (reached through node ~0p) is not ready "
               "for a membership change yet; waiting", [RemoteNode]),
            Ret2 = wait_for_remote_cluster_readiness(
                     StoreId, RemoteNode, Timeout1),
            Timeout2 = khepri_utils:end_timeout_window(Timeout1, T2),
            case Ret2 of
                ok ->
                    do_join_locked(
                      StoreId, ThisMember, RemoteNode, Timeout2);
                Error ->
                    Error
            end;
        Error ->
            ?LOG_ERROR(
               "Failed to expand cluster for store \"~s\": ~p; aborting",
               [StoreId, Error]),
            %% After failing to join, the local Ra server is running
            %% standalone (after a reset) and needs an election to be in a
            %% working state again. The result is ignored at this point.
            %% NOTE(review): `trigger_election/2' asserts `ok' from Ra, so a
            %% failing call (e.g. an exhausted `Timeout1') would presumably
            %% still raise here despite the `_ ='.
            _ = trigger_election(ThisMember, Timeout1),
            %% Keep the `ok | {error, _}' contract: RPC failures and Ra
            %% timeouts are not `{error, _}' tuples themselves.
            case Error of
                {badrpc, _}  -> {error, Error};
                {timeout, _} -> {error, Error};
                _            -> Error
            end
    end.

-spec wait_for_cluster_readiness(StoreId, Timeout) ->
    Ret when
      StoreId :: khepri:store_id(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error(?khepri_error(
                                  timeout_waiting_for_cluster_readiness,
                                  #{store_id := StoreId})).
%% @private
%%
%% Returns `ok' once querying the cluster members of the store yields a
%% non-empty list, or a `timeout_waiting_for_cluster_readiness' error if the
%% query yields an empty list within `Timeout'.

wait_for_cluster_readiness(StoreId, Timeout) ->
    %% The assumption is that successfully listing the members implies the
    %% cluster has a quorum.
    case members(StoreId, Timeout) of
        [] ->
            {error, ?khepri_error(
                       timeout_waiting_for_cluster_readiness,
                       #{store_id => StoreId})};
        [_ | _] ->
            ok
    end.

-spec wait_for_remote_cluster_readiness(StoreId, RemoteNode, Timeout) ->
    Ret when
      StoreId :: khepri:store_id(),
      RemoteNode :: node(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error().
%% @private
%%
%% Runs `wait_for_cluster_readiness/2' on `RemoteNode' over RPC. An RPC
%% failure (`{badrpc, _}') is wrapped in an `{error, _}' tuple; any other
%% result is returned as is.

wait_for_remote_cluster_readiness(StoreId, RemoteNode, Timeout) ->
    case rpc:call(
           RemoteNode,
           khepri_cluster, wait_for_cluster_readiness, [StoreId, Timeout],
           Timeout) of
        {badrpc, _} = BadRpc ->
            {error, BadRpc};
        RemoteRet ->
            RemoteRet
    end.

-spec reset() -> Ret when
      Ret :: ok | khepri:error().
%% @doc Resets the store on this Erlang node.
%%
%% This is equivalent to `reset(DefaultStoreId)', where `DefaultStoreId' is
%% the value returned by {@link get_default_store_id/0}.
%%
%% @see reset/2.

reset() ->
    reset(get_default_store_id()).

-spec reset(StoreId | Timeout) -> Ret when
      StoreId :: khepri:store_id(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error().
%% @doc Resets the store on this Erlang node.
%%
%% Two forms are accepted:
%% <ul>
%% <li>`reset(Timeout)': equivalent to `reset(DefaultStoreId, Timeout)',
%% where `DefaultStoreId' is the value returned by {@link
%% get_default_store_id/0}.</li>
%% <li>`reset(StoreId)': equivalent to `reset(StoreId, DefaultTimeout)',
%% where `DefaultTimeout' is the value returned by {@link
%% khepri_app:get_default_timeout/0}.</li>
%% </ul>
%%
%% An argument which is a valid timeout (including `infinity') is always
%% interpreted as a timeout.
%%
%% @see reset/2.

reset(Timeout) when ?IS_TIMEOUT(Timeout) ->
    reset(get_default_store_id(), Timeout);
reset(StoreId) when ?IS_STORE_ID(StoreId) ->
    reset(StoreId, khepri_app:get_default_timeout()).

-spec reset(StoreId, Timeout) -> Ret when
      StoreId :: khepri:store_id(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error().
%% @doc Resets the store on this Erlang node.
%%
%% The local Ra server is first removed from its cluster (if it is a member
%% of one), then force-deleted, and Khepri forgets about the store. This
%% function is also the way to gracefully remove the local Khepri store node
%% from a cluster.
%%
%% The reset happens while holding the store's global start lock, the same
%% lock taken by start, stop and join.
%%
%% @param StoreId the name of the Khepri store.
%% @param Timeout the timeout.
%%
%% @returns `ok' if it succeeds, an error tuple otherwise.

reset(StoreId, Timeout) ->
    Lock = server_start_lock(StoreId),
    global:set_lock(Lock),
    %% The lock is released whether the reset returns normally or raises.
    try
        reset_locked(StoreId, Timeout)
    after
        global:del_lock(Lock)
    end.

-spec reset_locked(StoreId, Timeout) -> Ret when
      StoreId :: khepri:store_id(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error().
%% @private
%%
%% Resets the local member of the store if Khepri knows which Ra system it
%% lives in; otherwise returns the property lookup error. Must be called with
%% the store's start lock held.

reset_locked(StoreId, Timeout) ->
    Member = this_member(StoreId),
    case get_store_prop(StoreId, ra_system) of
        {error, _} = Error ->
            Error;
        {ok, RaSystem} ->
            do_reset(RaSystem, StoreId, Member, Timeout)
    end.

-spec do_reset(RaSystem, StoreId, ThisMember, Timeout) -> Ret when
      RaSystem :: atom(),
      StoreId :: khepri:store_id(),
      ThisMember :: ra:server_id(),
      Timeout :: timeout(),
      Ret :: ok | khepri:error().
%% @private
%%
%% Removes `ThisMember' from its cluster, then force-stops it. Not being a
%% member of any cluster is fine. If the cluster refuses the membership change
%% for now, this waits for the cluster to be ready and retries with the
%% remaining time. A Ra timeout is wrapped in an `{error, _}' tuple.

do_reset(RaSystem, StoreId, ThisMember, Timeout) ->
    ?LOG_DEBUG(
       "Detaching this node (~0p) in store \"~s\" from cluster (if any) "
       "before reset",
       [ThisMember, StoreId]),
    RemoveStart = khepri_utils:start_timeout_window(Timeout),
    RemoveRet = ra:remove_member(ThisMember, ThisMember, Timeout),
    RemainingTimeout = khepri_utils:end_timeout_window(Timeout, RemoveStart),
    case RemoveRet of
        {error, cluster_change_not_permitted} ->
            WaitStart = khepri_utils:start_timeout_window(RemainingTimeout),
            ?LOG_DEBUG(
               "Cluster is not ready for a membership change yet; waiting",
               []),
            WaitRet = wait_for_cluster_readiness(StoreId, RemainingTimeout),
            TimeoutAfterWait = khepri_utils:end_timeout_window(
                                 RemainingTimeout, WaitStart),
            case WaitRet of
                ok ->
                    do_reset(RaSystem, StoreId, ThisMember, TimeoutAfterWait);
                WaitError ->
                    WaitError
            end;
        {error, not_member} ->
            %% Not part of any cluster: nothing to detach from.
            force_stop(RaSystem, StoreId, ThisMember);
        {ok, _, _} ->
            force_stop(RaSystem, StoreId, ThisMember);
        {timeout, _} = TimedOut ->
            {error, TimedOut};
        {error, _} = Error ->
            Error
    end.

%% Force-deletes the local Ra server of `StoreId' and drops the store's local
%% bookkeeping (see `forget_store/1'). On success, waits for the Ra server
%% process to exit.
force_stop(RaSystem, StoreId, ThisMember) ->
    ?LOG_DEBUG(
       "Resetting member ~0p in store \"~s\"",
       [ThisMember, StoreId]),
    case ra:force_delete_server(RaSystem, ThisMember) of
        ok ->
            forget_store(StoreId),
            wait_for_ra_server_exit(ThisMember);
        {error, Reason} = Error ->
            %% With `noproc' there is no server left, so the store is
            %% forgotten anyway; the error is still reported to the caller.
            case Reason of
                noproc -> forget_store(StoreId);
                _      -> ok
            end,
            Error
    end.

-spec get_default_ra_system_or_data_dir() -> RaSystem | DataDir when
      RaSystem :: atom(),
      DataDir :: file:filename_all().
%% @doc Returns the default Ra system name or data directory.
%%
%% This is based on Khepri's `default_ra_system' application environment
%% variable. The variable can be set to:
%% <ul>
%% <li>A directory (a string or binary) where data should be stored. A new Ra
%% system called `khepri' will be initialized with this directory.</li>
%% <li>A Ra system name (an atom). In this case, the user is expected to
%% configure and start the Ra system before starting Khepri.</li>
%% </ul>
%%
%% If this application environment variable is unset, the default is to
%% configure a Ra system called `khepri' which will write data in
%% `"khepri#$NODE"' in the current working directory where `$NODE' is the
%% Erlang node name.
%%
%% Example of an Erlang configuration file for Khepri:
%% ```
%% {khepri, [{default_ra_system, "/var/db/khepri"}]}.
%% '''
%%
%% Any other value is rejected: an error is logged and the `?khepri_misuse'
%% macro is invoked with `invalid_default_ra_system_value'.
%%
%% @returns the value of the `default_ra_system' application environment
%% variable, or the default data directory when the variable is unset.

get_default_ra_system_or_data_dir() ->
    %% The default data directory is only computed when it is actually needed.
    RaSystemOrDataDir = case application:get_env(khepri, default_ra_system) of
                            {ok, Value} -> Value;
                            undefined   -> generate_default_data_dir()
                        end,
    if
        ?IS_DATA_DIR(RaSystemOrDataDir) ->
            ok;
        ?IS_RA_SYSTEM(RaSystemOrDataDir) ->
            ok;
        true ->
            ?LOG_ERROR(
               "Invalid Ra system or data directory set in "
               "`default_ra_system` application environment: ~p",
               [RaSystemOrDataDir]),
            ?khepri_misuse(
               invalid_default_ra_system_value,
               #{default_ra_system => RaSystemOrDataDir})
    end,
    RaSystemOrDataDir.

%% Builds the default data directory name, `khepri#<node name>', as a flat
%% string relative to the current working directory.
generate_default_data_dir() ->
    ThisNode = node(),
    Formatted = io_lib:format("khepri#~s", [ThisNode]),
    lists:flatten(Formatted).

-spec get_default_store_id() -> StoreId when
      StoreId :: khepri:store_id().
%% @doc Returns the default Khepri store ID.
%%
%% The value comes from Khepri's `default_store_id' application environment
%% variable, which must be an atom. When the variable is unset, the default
%% is `khepri'. An invalid value is logged and reported through the
%% `?khepri_misuse' macro.
%%
%% @returns the value of the `default_store_id' application environment
%% variable.

get_default_store_id() ->
    StoreId = application:get_env(
                khepri, default_store_id, ?DEFAULT_STORE_ID),
    case StoreId of
        _ when ?IS_STORE_ID(StoreId) ->
            ok;
        _ ->
            ?LOG_ERROR(
               "Invalid store ID set in `default_store_id` "
               "application environment: ~p",
               [StoreId]),
            ?khepri_misuse(
               invalid_default_store_id_value,
               #{default_store_id => StoreId})
    end,
    StoreId.

%% Returns the members of the store's Ra cluster using the `leader' query
%% type, from this node's member; an empty list is returned if the query
%% fails.
members(StoreId) ->
    members(StoreId, khepri_app:get_default_timeout()).

members(StoreId, Timeout) ->
    do_query_members(StoreId, this_member(StoreId), leader, Timeout).

%% Same as `members/1,2' but with the `local' query type, i.e. the member list
%% as known by this node's Ra server.
locally_known_members(StoreId) ->
    locally_known_members(StoreId, khepri_app:get_default_timeout()).

locally_known_members(StoreId, Timeout) ->
    do_query_members(StoreId, this_member(StoreId), local, Timeout).

%% Asks Ra for the members of `StoreId' through `RaServer'. `QueryType' is
%% `leader' (query the server ID directly) or `local' (query with
%% `{local, RaServer}').
%%
%% A `noproc' error while the Ra server is still alive is retried after
%% `?NOPROC_RETRY_INTERVAL', with the remaining time. Any other failure is
%% logged as a warning and yields an empty list.
do_query_members(StoreId, RaServer, QueryType, Timeout) ->
    ?LOG_DEBUG("Query members in store \"~s\"", [StoreId]),
    Window = khepri_utils:start_timeout_window(Timeout),
    RaTarget = case QueryType of
                   leader -> RaServer;
                   local  -> {local, RaServer}
               end,
    Result = ra:members(RaTarget, Timeout),
    %% The liveness check is only performed for `noproc' errors.
    ShouldRetry = Result =:= {error, noproc} andalso
                  khepri_utils:is_ra_server_alive(RaServer),
    case Result of
        {ok, Members, _} ->
            ?LOG_DEBUG(
               "Found the following members in store \"~s\": ~p",
               [StoreId, Members]),
            Members;
        _ when ShouldRetry ->
            Remaining0 = khepri_utils:end_timeout_window(Timeout, Window),
            Remaining = khepri_utils:sleep(
                          ?NOPROC_RETRY_INTERVAL, Remaining0),
            do_query_members(StoreId, RaServer, QueryType, Remaining);
        _ ->
            ?LOG_WARNING(
               "Failed to query members in store \"~s\": ~p",
               [StoreId, Result]),
            []
    end.

%% Returns the Erlang nodes hosting the members reported by `members/1'.
nodes(StoreId) ->
    Members = members(StoreId),
    [MemberNode || {_MemberName, MemberNode} <- Members].

%% Returns the Erlang nodes hosting the members reported by
%% `locally_known_members/1'.
locally_known_nodes(StoreId) ->
    Members = locally_known_members(StoreId),
    [MemberNode || {_MemberName, MemberNode} <- Members].

-spec node_to_member(StoreId, Node) -> Member when
      StoreId :: khepri:store_id(),
      Node :: node(),
      Member :: ra:server_id().
%% @doc Builds the Ra server ID `{StoreId, Node}' of the store's member on
%% `Node'.
%%
%% @private

node_to_member(StoreId, Node) ->
    Member = {StoreId, Node},
    Member.

-spec this_member(StoreId) -> Member when
      StoreId :: khepri:store_id(),
      Member :: ra:server_id().
%% @doc Returns the Ra server ID of the store's member on the local node.
%%
%% @private

this_member(StoreId) ->
    node_to_member(StoreId, node()).

%% Returns the `global' lock ID for the store (used e.g. by `reset/2'): the
%% resource is `{khepri, StoreId}' and the requester is the calling process.
server_start_lock(StoreId) ->
    Resource = {khepri, StoreId},
    {Resource, self()}.

%% Completes a caller-supplied Ra server config for a Khepri store:
%% - drops `initial_members' (with a warning);
%% - adds a default `friendly_name' if none is set;
%% - sets `uid', `log_init_args' and `machine', overriding any caller value.
complete_ra_server_config(#{cluster_name := StoreId,
                            id := Member} = RaServerConfig) ->
    %% `initial_members' is deliberately ignored: the local node must be
    %% started through `khepri_cluster:start()' instead of relying on Ra's
    %% "auto-cluster start", because Khepri also stores some Ra system and
    %% server config bookkeeping in persistent_term.
    WithoutInitialMembers =
        case maps:is_key(initial_members, RaServerConfig) of
            true ->
                ?LOG_WARNING(
                   "Initial Ra cluster members in the "
                   "following Ra server config "
                   "(`initial_members`) will be ignored: ~p",
                   [RaServerConfig]),
                maps:remove(initial_members, RaServerConfig);
            false ->
                RaServerConfig
        end,

    %% Give the Ra server a friendly name unless the caller chose one.
    WithFriendlyName =
        case maps:is_key(friendly_name, WithoutInitialMembers) of
            true ->
                WithoutInitialMembers;
            false ->
                FriendlyName = lists:flatten(
                                 io_lib:format(
                                   "Khepri store \"~s\"",
                                   [StoreId])),
                WithoutInitialMembers#{friendly_name => FriendlyName}
        end,

    UId = ra:new_uid(ra_lib:to_binary(StoreId)),
    %% The machine config is the caller's one (if any), extended with the
    %% store ID and this member's ID.
    BaseMachineConfig = maps:get(machine_config, RaServerConfig, #{}),
    MachineConfig = BaseMachineConfig#{store_id => StoreId,
                                       member => Member},
    WithFriendlyName#{uid => UId,
                      log_init_args => #{uid => UId},
                      machine => {module, khepri_machine, MachineConfig}}.

-define(PT_STORE_IDS, {khepri, store_ids}).

-spec remember_store(RaSystem, RaServerConfig) -> ok when
      RaSystem :: atom(),
      RaServerConfig :: ra_server:ra_server_config().
%% @doc Records `RaSystem' and `RaServerConfig' for the store named by the
%% config's `cluster_name', in the `persistent_term' map of known stores.
%% Any previous entry for that store is replaced.
%%
%% @private

remember_store(RaSystem, #{cluster_name := StoreId} = RaServerConfig) ->
    ?assert(maps:is_key(id, RaServerConfig)),
    KnownStores = persistent_term:get(?PT_STORE_IDS, #{}),
    Entry = #{ra_system => RaSystem,
              ra_server_config => RaServerConfig},
    ok = persistent_term:put(
           ?PT_STORE_IDS, maps:put(StoreId, Entry, KnownStores)).

-spec get_store_prop(StoreId, PropName) -> Ret when
      StoreId :: khepri:store_id(),
      PropName :: ra_system | ra_server_config,
      PropValue :: any(),
      Ret :: khepri:ok(PropValue) |
             khepri:error(?khepri_error(
                             not_a_khepri_store,
                             #{store_id := StoreId})).
%% @doc Looks up a property recorded by `remember_store/2' for `StoreId'.
%% Returns a `not_a_khepri_store' error when the store or the property is
%% unknown.
%%
%% @private

get_store_prop(StoreId, PropName) ->
    KnownStores = persistent_term:get(?PT_STORE_IDS, #{}),
    case KnownStores of
        #{StoreId := Props} when is_map_key(PropName, Props) ->
            {ok, maps:get(PropName, Props)};
        _ ->
            {error, ?khepri_error(
                       not_a_khepri_store,
                       #{store_id => StoreId})}
    end.

-spec forget_store(StoreId) -> ok when
      StoreId :: khepri:store_id().
%% @doc Drops what this node remembers about `StoreId': the `khepri_machine'
%% cache, the cached Ra leader and the store's entry in the `persistent_term'
%% map of known stores. That map is erased entirely once it becomes empty.
%%
%% @private

forget_store(StoreId) ->
    ok = khepri_machine:clear_cache(StoreId),
    ok = clear_cached_leader(StoreId),
    Remaining = maps:remove(StoreId, persistent_term:get(?PT_STORE_IDS, #{})),
    if
        map_size(Remaining) =:= 0 ->
            _ = persistent_term:erase(?PT_STORE_IDS);
        true ->
            ok = persistent_term:put(?PT_STORE_IDS, Remaining)
    end,
    ok.

-spec get_store_ids() -> [StoreId] when
      StoreId :: khepri:store_id().
%% @doc Returns the list of running stores, i.e. the IDs of the stores
%% currently recorded on this node by `remember_store/2'.

get_store_ids() ->
    KnownStores = persistent_term:get(?PT_STORE_IDS, #{}),
    maps:keys(KnownStores).

%% Cache the Ra leader ID to avoid command/query redirections from a follower
%% to the leader. The leader ID is returned after each command or query. If we
%% don't know it yet, wait for a leader election using khepri_event_handler.

-define(RA_LEADER_CACHE_KEY(StoreId), {khepri, ra_leader_cache, StoreId}).

-spec get_cached_leader(StoreId) -> Ret when
      StoreId :: khepri:store_id(),
      Ret :: LeaderId | undefined,
      LeaderId :: ra:server_id().
%% Returns the Ra leader ID cached for `StoreId', or `undefined' when nothing
%% is cached.

get_cached_leader(StoreId) ->
    persistent_term:get(?RA_LEADER_CACHE_KEY(StoreId), undefined).

-spec cache_leader(StoreId, LeaderId) -> ok when
      StoreId :: khepri:store_id(),
      LeaderId :: ra:server_id().
%% Stores `LeaderId' as the cached Ra leader of `StoreId', replacing any
%% previously cached value.

cache_leader(StoreId, LeaderId) ->
    Key = ?RA_LEADER_CACHE_KEY(StoreId),
    ok = persistent_term:put(Key, LeaderId).

-spec cache_leader_if_changed(StoreId, LeaderId, NewLeaderId) -> ok when
      StoreId :: khepri:store_id(),
      LeaderId :: ra:server_id() | undefined,
      NewLeaderId :: ra:server_id().
%% Caches `NewLeaderId' unless it is already the known leader.
%%
%% The known leader is `LeaderId'. When `LeaderId' is `undefined' (the caller
%% didn't know the leader), the value currently cached for the store is used
%% instead. The spec accepts `undefined' for `LeaderId' because the second
%% clause below handles it explicitly.

cache_leader_if_changed(_StoreId, LeaderId, LeaderId) ->
    ok;
cache_leader_if_changed(StoreId, undefined, NewLeaderId) ->
    case persistent_term:get(?RA_LEADER_CACHE_KEY(StoreId), undefined) of
        LeaderId when LeaderId =/= undefined ->
            %% Compare against the cached leader instead.
            cache_leader_if_changed(StoreId, LeaderId, NewLeaderId);
        undefined ->
            cache_leader(StoreId, NewLeaderId)
    end;
cache_leader_if_changed(StoreId, _OldLeaderId, NewLeaderId) ->
    cache_leader(StoreId, NewLeaderId).

-spec clear_cached_leader(StoreId) -> ok when
      StoreId :: khepri:store_id().
%% Removes the cached Ra leader of `StoreId', if any.

clear_cached_leader(StoreId) ->
    Key = ?RA_LEADER_CACHE_KEY(StoreId),
    %% Whether the key existed or not is irrelevant here.
    _ = persistent_term:erase(Key),
    ok.