%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright © 2021-2024 Broadcom. All Rights Reserved. The term "Broadcom"
%% refers to Broadcom Inc. and/or its subsidiaries.
%%
%% @doc Khepri service and cluster management API.
%%
%% This module provides the public API for the service and cluster management.
%% For convenience, some functions of this API are repeated in the {@link
%% khepri} module for easier access.
%%
%% == The Khepri store and the Ra cluster ==
%%
%% A Khepri store is a Ra server inside a Ra cluster. The Khepri store and the
%% Ra cluster share the same name in fact. The only constraint is that the name
%% must be an atom, even though Ra accepts other Erlang types as cluster names.
%%
%% By default, Khepri uses `khepri' as the store ID (and thus Ra cluster name).
%% This default can be overridden using an argument to the `start()' functions
%% or the `default_store_id' application environment variable.
%%
%% Examples:
%% <ul>
%% <li>Use the default Khepri store ID:
%% <pre>{ok, khepri} = khepri:start().</pre></li>
%% <li>Override the default store ID using an argument:
%% <pre>{ok, my_store} = khepri:start("/var/lib/khepri", my_store).</pre></li>
%% <li>Override the default store ID using an application environment variable:
%% <pre>ok = application:set_env(
%% khepri, default_store_id, my_store, [{persistent, true}]),
%%
%% {ok, my_store} = khepri:start().</pre></li>
%% </ul>
%%
%% == The data directory and the Ra system ==
%%
%% A Ra server relies on a Ra system to provide various functions and to
%% configure the directory where the data should be stored on disk.
%%
%% By default, Khepri will configure its own Ra system to write data under
%% `khepri#Nodename' in the current working directory, where `Nodename' is
%% the name of the Erlang node.
%%
%% ```
%% {ok, StoreId} = khepri:start().
%%
%% %% If the Erlang node was started without distribution (the default), the
%% %% statement above will start a Ra system called like the store (`khepri')
%% %% and will use the `khepri#nonode@nohost' directory.
%% '''
%%
%% The default data directory or Ra system name can be overridden using an
%% argument to the `start()' or the `default_ra_system' application environment
%% variable. Both a directory (string or binary) or the name of an already
%% running Ra system are accepted.
%%
%% Examples:
%% <ul>
%% <li>Override the default with the name of a running Ra system using an
%% argument:
%% <pre>{ok, StoreId} = khepri:start(my_ra_system).</pre></li>
%% <li>Override the default data directory using an application environment
%% variable:
%% <pre>ok = application:set_env(
%% khepri, default_ra_system, "/var/lib/khepri", [{persistent, true}]),
%%
%% {ok, StoreId} = khepri:start().</pre></li>
%% </ul>
%%
%% Please refer to <a href="https://github.com/rabbitmq/ra">Ra
%% documentation</a> to learn more about Ra systems and Ra clusters.
%%
%% == Managing Ra cluster members ==
%%
%% A Khepri/Ra cluster can be expanded by telling a node to join a remote
%% cluster. Note that the Khepri store/Ra server to add to the cluster must run
%% before it can join.
%%
%% ```
%% %% Start the local Khepri store.
%% {ok, StoreId} = khepri:start().
%%
%% %% Join a remote cluster.
%% ok = khepri_cluster:join(RemoteNode).
%% '''
%%
%% To remove the local Khepri store node from the cluster, it must be reset.
%%
%% ```
%% %% Start the local Khepri store.
%% ok = khepri_cluster:reset().
%% '''
-module(khepri_cluster).
-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/assert.hrl").
-include("include/khepri.hrl").
-include("src/khepri_cluster.hrl").
-include("src/khepri_error.hrl").
-export([start/0, start/1, start/2, start/3,
join/1, join/2,
reset/0, reset/1, reset/2,
stop/0, stop/1,
members/0, members/1, members/2,
locally_known_members/0,
locally_known_members/1,
locally_known_members/2,
nodes/0, nodes/1, nodes/2,
locally_known_nodes/0,
locally_known_nodes/1,
locally_known_nodes/2,
wait_for_leader/0, wait_for_leader/1, wait_for_leader/2,
get_default_ra_system_or_data_dir/0,
get_default_store_id/0,
get_store_ids/0,
is_store_running/1]).
%% Internal.
-export([node_to_member/2,
this_member/1]).
-ifdef(TEST).
-export([wait_for_ra_server_exit/1,
generate_default_data_dir/0]).
-endif.
-dialyzer({no_underspecs, [start/1,
stop/0, stop/1,
stop_locked/1,
join/2]}).
-define(IS_RA_SYSTEM(RaSystem), is_atom(RaSystem)).
-define(IS_RA_SERVER(RaServer), (is_tuple(RaServer) andalso
size(RaServer) =:= 2 andalso
is_atom(element(1, RaServer)) andalso
is_atom(element(2, RaServer)))).
-define(IS_DATA_DIR(DataDir), (is_list(DataDir) orelse is_binary(DataDir))).
-type incomplete_ra_server_config() :: map().
%% A Ra server config map.
%%
%% This configuration map can lack the required parameters, Khepri will fill
%% them if necessary. Important parameters for Khepri (e.g. `machine') will be
%% overridden anyway.
%%
%% @see ra_server:ra_server_config().
-type ra_server_config_with_cluster_name() :: #{cluster_name :=
khepri:store_id()}.
%% Intermediate Ra server configuration with `cluster_name' set.
-type ra_server_config_with_id_and_cn() :: #{id := ra:server_id(),
cluster_name :=
khepri:store_id()}.
%% Intermediate Ra server configuration with `id' and `cluster_name' set.
-export_type([incomplete_ra_server_config/0]).
%% -------------------------------------------------------------------
%% Database management.
%% -------------------------------------------------------------------
-spec start() -> Ret when
Ret :: khepri:ok(StoreId) | khepri:error(),
StoreId :: khepri:store_id().
%% @doc Starts a store.
%%
%% Calling this function is the same as calling `start(DefaultRaSystem)' where
%% `DefaultRaSystem' is returned by {@link
%% get_default_ra_system_or_data_dir/0}.
%%
%% @see start/1.
start() ->
case application:ensure_all_started(khepri) of
{ok, _} ->
RaSystemOrDataDir = get_default_ra_system_or_data_dir(),
start(RaSystemOrDataDir);
Error ->
Error
end.
-spec start(RaSystem | DataDir) -> Ret when
RaSystem :: atom(),
DataDir :: file:filename_all(),
Ret :: khepri:ok(StoreId) | khepri:error(),
StoreId :: khepri:store_id().
%% @doc Starts a store.
%%
%% Calling this function is the same as calling `start(RaSystemOrDataDir,
%% DefaultStoreId)' where `DefaultStoreId' is returned by {@link
%% get_default_store_id/0}.
%%
%% @see start/2.
start(RaSystemOrDataDir) ->
case application:ensure_all_started(khepri) of
{ok, _} ->
StoreId = get_default_store_id(),
start(RaSystemOrDataDir, StoreId);
Error ->
Error
end.
-spec start(RaSystem | DataDir, StoreId | RaServerConfig) -> Ret when
RaSystem :: atom(),
DataDir :: file:filename_all(),
StoreId :: khepri:store_id(),
RaServerConfig :: incomplete_ra_server_config(),
Ret :: khepri:ok(StoreId) | khepri:error().
%% @doc Starts a store.
%%
%% Calling this function is the same as calling `start(RaSystemOrDataDir,
%% StoreIdOrRaServerConfig, DefaultTimeout)' where `DefaultTimeout' is
%% returned by {@link khepri_app:get_default_timeout/0}.
%%
%% @param RaSystem the name of the Ra system.
%% @param DataDir a directory to write data.
%% @param StoreId the name of the Khepri store.
%% @param RaServerConfig the skeleton of each Ra server's configuration.
%%
%% @see start/3.
start(RaSystemOrDataDir, StoreIdOrRaServerConfig) ->
Timeout = khepri_app:get_default_timeout(),
start(RaSystemOrDataDir, StoreIdOrRaServerConfig, Timeout).
-spec start(RaSystem | DataDir, StoreId | RaServerConfig, Timeout) ->
Ret when
RaSystem :: atom(),
DataDir :: file:filename_all(),
StoreId :: khepri:store_id(),
RaServerConfig :: incomplete_ra_server_config(),
Timeout :: timeout(),
Ret :: khepri:ok(StoreId) | khepri:error().
%% @doc Starts a store.
%%
%% It accepts either a Ra system name (atom) or a data directory (string or
%% binary) as its first argument. If a Ra system name is given, that Ra system
%% must be running when this function is called. If a data directory is given,
%% a new Ra system will be started, using this directory. The directory will
%% be created automatically if it doesn't exist. The Ra system will use the
%% same name as the Khepri store.
%%
%% It accepts a Khepri store ID or a Ra server configuration as its second
%% argument. If a store ID is given, a Ra server configuration will be created
%% based on it. If a Ra server configuration is given, the name of the Khepri
%% store will be derived from it.
%%
%% If this is a new store, the Ra server is started and an election is
%% triggered so that it becomes its own leader and is ready to process
%% commands and queries.
%%
%% If the store was started in the past and stopped, it will be restarted. In
%% this case, `RaServerConfig' will be ignored. Ra will take care of the
%% eletion automatically.
%%
%% @param RaSystem the name of the Ra system.
%% @param DataDir a directory to write data.
%% @param StoreId the name of the Khepri store.
%% @param RaServerConfig the skeleton of each Ra server's configuration.
%% @param TImeout a timeout.
%%
%% @returns the ID of the started store in an "ok" tuple, or an error tuple if
%% the store couldn't be started.
start(RaSystemOrDataDir, StoreIdOrRaServerConfig, Timeout) ->
case application:ensure_all_started(khepri) of
{ok, _} ->
ensure_ra_server_config_and_start(
RaSystemOrDataDir, StoreIdOrRaServerConfig, Timeout);
Error ->
Error
end.
-spec ensure_ra_server_config_and_start(
RaSystem | DataDir, StoreId | RaServerConfig, Timeout) ->
Ret when
RaSystem :: atom(),
DataDir :: file:filename_all(),
StoreId :: khepri:store_id(),
RaServerConfig :: incomplete_ra_server_config(),
Timeout :: timeout(),
Ret :: khepri:ok(StoreId) | khepri:error().
%% @private
ensure_ra_server_config_and_start(
RaSystemOrDataDir, StoreIdOrRaServerConfig, Timeout)
when (?IS_DATA_DIR(RaSystemOrDataDir) orelse
?IS_RA_SYSTEM(RaSystemOrDataDir)) andalso
(?IS_KHEPRI_STORE_ID(StoreIdOrRaServerConfig) orelse
is_map(StoreIdOrRaServerConfig)) ->
%% If the store ID derived from `StoreIdOrRaServerConfig' is not an atom,
%% it will cause a case clause exception below.
RaServerConfig =
case StoreIdOrRaServerConfig of
_ when ?IS_KHEPRI_STORE_ID(StoreIdOrRaServerConfig) ->
#{cluster_name => StoreIdOrRaServerConfig};
#{cluster_name := CN} when ?IS_KHEPRI_STORE_ID(CN) ->
StoreIdOrRaServerConfig;
#{} when not is_map_key(cluster_name, StoreIdOrRaServerConfig) ->
StoreIdOrRaServerConfig#{
cluster_name => get_default_store_id()
}
end,
verify_ra_system_and_start(RaSystemOrDataDir, RaServerConfig, Timeout).
-spec verify_ra_system_and_start(
RaSystem | DataDir, RaServerConfig, Timeout) ->
Ret when
RaSystem :: atom(),
DataDir :: file:filename_all(),
RaServerConfig :: ra_server_config_with_cluster_name(),
Timeout :: timeout(),
Ret :: khepri:ok(StoreId) | khepri:error(),
StoreId :: khepri:store_id().
%% @private
verify_ra_system_and_start(RaSystem, RaServerConfig, Timeout)
when ?IS_RA_SYSTEM(RaSystem) ->
ensure_server_started(RaSystem, RaServerConfig, Timeout);
verify_ra_system_and_start(DataDir, RaServerConfig, Timeout)
when is_list(DataDir) ->
RaSystem = ?DEFAULT_RA_SYSTEM_NAME,
DefaultConfig = ra_system:default_config(),
RaSystemConfig = DefaultConfig#{name => RaSystem,
data_dir => DataDir,
wal_data_dir => DataDir,
names => ra_system:derive_names(RaSystem)},
?LOG_DEBUG(
"Starting Ra system \"~s\" using data dir \"~ts\"",
[RaSystem, DataDir]),
case ra_system:start(RaSystemConfig) of
{ok, _} ->
ensure_server_started(RaSystem, RaServerConfig, Timeout);
{error, {already_started, _}} ->
ensure_server_started(RaSystem, RaServerConfig, Timeout);
Error ->
Error
end;
verify_ra_system_and_start(DataDir, RaServerConfig, Timeout)
when is_binary(DataDir) ->
DataDir1 = unicode:characters_to_list(DataDir),
verify_ra_system_and_start(DataDir1, RaServerConfig, Timeout).
-spec ensure_server_started(RaSystem, RaServerConfig, Timeout) -> Ret when
RaSystem :: atom(),
RaServerConfig :: ra_server_config_with_cluster_name(),
Timeout :: timeout(),
Ret :: khepri:ok(StoreId) | khepri:error(),
StoreId :: khepri:store_id().
%% @private
ensure_server_started(
RaSystem, #{cluster_name := StoreId} = RaServerConfig, Timeout) ->
Lock = server_start_lock(StoreId),
global:set_lock(Lock),
try
Ret = ensure_server_started_locked(RaSystem, RaServerConfig, Timeout),
global:del_lock(Lock),
Ret
catch
Class:Reason:Stacktrace ->
global:del_lock(Lock),
erlang:raise(Class, Reason, Stacktrace)
end.
-spec ensure_server_started_locked(RaSystem, RaServerConfig, Timeout) ->
Ret when
RaSystem :: atom(),
RaServerConfig :: ra_server_config_with_cluster_name(),
Timeout :: timeout(),
Ret :: khepri:ok(StoreId) | khepri:error(),
StoreId :: khepri:store_id().
%% @private
ensure_server_started_locked(
RaSystem, #{cluster_name := StoreId} = RaServerConfig, Timeout) ->
ThisMember = this_member(StoreId),
RaServerConfig1 = RaServerConfig#{id => ThisMember},
?LOG_DEBUG(
"Trying to restart local Ra server for store \"~s\" "
"in Ra system \"~s\"",
[StoreId, RaSystem]),
case ra:restart_server(RaSystem, ThisMember) of
{error, name_not_registered} ->
?LOG_DEBUG(
"Ra server for store \"~s\" not registered in Ra system "
"\"~s\", try to start a new one",
[StoreId, RaSystem]),
case do_start_server(RaSystem, RaServerConfig1) of
ok ->
ok = trigger_election(RaServerConfig1, Timeout),
{ok, StoreId};
Error ->
Error
end;
ok ->
ok = remember_store(RaSystem, RaServerConfig1),
{ok, StoreId};
{error, {already_started, _}} ->
{ok, StoreId};
Error ->
Error
end.
-spec do_start_server(RaSystem, RaServerConfig) -> Ret when
RaSystem :: atom(),
RaServerConfig :: ra_server_config_with_id_and_cn(),
Ret :: ok | khepri:error().
%% @private
do_start_server(RaSystem, RaServerConfig) ->
RaServerConfig1 = complete_ra_server_config(RaServerConfig),
#{cluster_name := StoreId} = RaServerConfig1,
?LOG_DEBUG(
"Starting a Ra server with the following configuration:~n~p",
[RaServerConfig1]),
case ra:start_server(RaSystem, RaServerConfig1) of
ok ->
ok = remember_store(RaSystem, RaServerConfig1),
?LOG_DEBUG(
"Started Ra server for store \"~s\"",
[StoreId]),
ok;
{error, _} = Error ->
?LOG_ERROR(
"Failed to start Ra server for store \"~s\" using the "
"following Ra server configuration:~n~p",
[StoreId, RaServerConfig1]),
Error
end.
-spec trigger_election(Member | RaServerConfig, Timeout) -> ok when
Member :: ra:server_id(),
RaServerConfig :: ra_server_config_with_id_and_cn(),
Timeout :: timeout().
%% @private
trigger_election(#{id := Member}, Timeout) ->
trigger_election(Member, Timeout);
trigger_election({StoreId, _Node} = Member, Timeout) ->
?LOG_DEBUG("Trigger election in store \"~s\"", [StoreId]),
ok = ra:trigger_election(Member, Timeout),
ok.
-spec stop() -> Ret when
Ret :: ok | khepri:error().
%% @doc Stops a store.
%%
%% Calling this function is the same as calling `stop(DefaultStoreId)'
%% where `DefaultStoreId' is returned by {@link
%% get_default_store_id/0}.
%%
%% @see stop/1.
stop() ->
StoreId = get_default_store_id(),
stop(StoreId).
-spec stop(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: ok | khepri:error().
%% @doc Stops a store.
%%
%% @param StoreId the ID of the store to stop.
%%
%% @returns `ok' if it succeeds, an error tuple otherwise.
stop(StoreId) when ?IS_KHEPRI_STORE_ID(StoreId) ->
Lock = server_start_lock(StoreId),
global:set_lock(Lock),
try
Ret = stop_locked(StoreId),
global:del_lock(Lock),
Ret
catch
Class:Reason:Stacktrace ->
global:del_lock(Lock),
erlang:raise(Class, Reason, Stacktrace)
end.
-spec stop_locked(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: ok | khepri:error().
stop_locked(StoreId) ->
ThisMember = this_member(StoreId),
case get_store_prop(StoreId, ra_system) of
{ok, RaSystem} ->
?LOG_DEBUG(
"Stopping member ~0p in store \"~s\"",
[ThisMember, StoreId]),
case ra:stop_server(RaSystem, ThisMember) of
ok ->
forget_store(StoreId),
wait_for_ra_server_exit(ThisMember);
%% TODO: Handle idempotency: if the Ra server is not running,
%% don't fail.
{error, _} = Error ->
%% We don't call `forget_store()' in case the caller wants
%% to try again.
Error
end;
{error, _} ->
%% The store is unknown, it must have been stopped already.
ok
end.
-spec wait_for_ra_server_exit(Member) -> ok when
Member :: ra:server_id().
%% @private
wait_for_ra_server_exit({StoreId, _} = Member) ->
%% FIXME: This monitoring can go away when/if the following pull request
%% in Ra is merged:
%% https://github.com/rabbitmq/ra/pull/270
?LOG_DEBUG(
"Wait for Ra server ~0p to exit in store \"~s\"",
[Member, StoreId]),
MRef = erlang:monitor(process, Member),
receive
{'DOWN', MRef, _, _, noproc} ->
?LOG_DEBUG(
"Ra server ~0p in store \"~s\" already exited",
[Member, StoreId]),
ok;
{'DOWN', MRef, _, _, Reason} ->
?LOG_DEBUG(
"Ra server ~0p in store \"~s\" exited: ~p",
[Member, StoreId, Reason]),
ok
end.
-spec join(RemoteMember | RemoteNode) -> Ret when
RemoteMember :: ra:server_id(),
RemoteNode :: node(),
Ret :: ok | khepri:error().
%% @doc Adds the local running Khepri store to a remote cluster.
%%
%% This function accepts the following forms:
%% <ul>
%% <li>`join(RemoteNode)'. Calling it is the same as calling
%% `join(DefaultStoreId, RemoteNode)' where `DefaultStoreId' is
%% returned by {@link get_default_store_id/0}.</li>
%% <li>`join(RemoteMember)'. Calling it is the same as calling
%% `join(StoreId, RemoteNode)' where `StoreId' and `RemoteNode' are
%% derived from `RemoteMember'.</li>
%% </ul>
%%
%% @see join/2.
join(RemoteNode) when is_atom(RemoteNode) ->
StoreId = get_default_store_id(),
join(StoreId, RemoteNode);
join({StoreId, RemoteNode} = _RemoteMember) ->
join(StoreId, RemoteNode).
-spec join(
RemoteMember | RemoteNode | StoreId, Timeout | RemoteNode) ->
Ret when
RemoteMember :: ra:server_id(),
RemoteNode :: node(),
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Adds the local running Khepri store to a remote cluster.
%%
%% This function accepts the following forms:
%% <ul>
%% <li>`join(RemoteNode, Timeout)'. Calling it is the same as calling
%% `join(DefaultStoreId, RemoteNode, Timeout)' where `DefaultStoreId'
%% is returned by {@link get_default_store_id/0}.</li>
%% <li>`join(StoreId, RemoteNode)'. Calling it is the same as calling
%% `join(StoreId, RemoteNode, DefaultTimeout)' where `DefaultTimeout' is
%% returned by {@link khepri_app:get_default_timeout/0}.</li>
%% <li>`join(RemoteMember, Timeout)'. Calling it is the same as calling
%% `join(StoreId, RemoteNode, Timeout)' where `StoreId' and
%% `RemoteNode' are derived from `RemoteMember'.</li>
%% </ul>
%%
%% @see join/3.
join(RemoteNode, Timeout)
when is_atom(RemoteNode) andalso ?IS_TIMEOUT(Timeout) ->
StoreId = get_default_store_id(),
join(StoreId, RemoteNode, Timeout);
join(StoreId, RemoteNode)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso is_atom(RemoteNode) ->
Timeout = khepri_app:get_default_timeout(),
join(StoreId, RemoteNode, Timeout);
join({StoreId, RemoteNode} = _RemoteMember, Timeout) ->
join(StoreId, RemoteNode, Timeout).
-spec join(StoreId, RemoteMember | RemoteNode, Timeout) -> Ret when
StoreId :: khepri:store_id(),
RemoteMember :: ra:server_id(),
RemoteNode :: node(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Adds the local running Khepri store to a remote cluster.
%%
%% The local Khepri store must have been started with {@link start/3} before
%% it can be added to a cluster. It is also expected that the remote store ID
%% is the same as the local one.
%%
%% `RemoteNode' is the entry point to the remote cluster. It must run for this
%% function to work. A cluster membership also requires a quorum, therefore
%% this join depends on it to succeed.
%%
%% If `RemoteMember' is specified, the remote node is derived from it. At the
%% same time, the function asserts that the specified `StoreId' matches
%% the one derived from `RemoteMember'.
%%
%% As part of this function, the local Khepri store will reset. It means it
%% will leave the cluster it is already part of (if any) and all its data
%% removed. This is also the case if the node is already part of the given
%% cluster (i.e. the joining node will be reset and clustered again).
%%
%% If a clustered node loses its data directory for any reason, this function
%% can be called again to make it join the cluster again and restore its data.
%%
%% @param StoreId the ID of the local Khepri store.
%% @param RemoteNode the name of remote Erlang node running Khepri to join.
%% @param Timeout the timeout.
%%
%% @returns `ok' if it succeeds, an error tuple otherwise.
join(StoreId, RemoteNode, Timeout) when is_atom(RemoteNode) ->
%% We first ping the remote node. It serves two purposes:
%% 1. Make sure we can reach it
%% 2. Make sure they are connected before acquiring a lock, so that the
%% global lock is really global.
case net_adm:ping(RemoteNode) of
pong ->
Lock = server_start_lock(StoreId),
global:set_lock(Lock),
try
Ret = check_status_and_join_locked(
StoreId, RemoteNode, Timeout),
global:del_lock(Lock),
Ret
catch
Class:Reason:Stacktrace ->
global:del_lock(Lock),
erlang:raise(Class, Reason, Stacktrace)
end;
pang ->
Reason = ?khepri_error(
failed_to_join_remote_khepri_node,
#{store_id => StoreId,
node => RemoteNode}),
{error, Reason}
end;
join(StoreId, {StoreId, RemoteNode} = _RemoteMember, Timeout) ->
join(StoreId, RemoteNode, Timeout).
-spec check_status_and_join_locked(StoreId, RemoteNode, Timeout) ->
Ret when
StoreId :: khepri:store_id(),
RemoteNode :: node(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @private
check_status_and_join_locked(StoreId, RemoteNode, Timeout) ->
ThisMember = this_member(StoreId),
RaServerRunning = erlang:is_pid(erlang:whereis(StoreId)),
Prop1 = get_store_prop(StoreId, ra_system),
Prop2 = get_store_prop(StoreId, ra_server_config),
case {RaServerRunning, Prop1, Prop2} of
{true, {ok, RaSystem}, {ok, RaServerConfig}} ->
reset_remotely_and_join_locked(
StoreId, ThisMember, RaSystem, RaServerConfig,
RemoteNode, Timeout);
{false, {error, _} = Error, _} ->
Error;
{false, _, {error, _} = Error} ->
Error;
{false, _, _} ->
?LOG_ERROR(
"Local Ra server ~0p not running for store \"~s\", "
"but properties are still available: ~0p and ~0p",
[ThisMember, StoreId, Prop1, Prop2]),
erlang:error(
?khepri_exception(
ra_server_not_running_but_props_available,
#{store_id => StoreId,
this_member => ThisMember,
ra_system => Prop1,
ra_server_config => Prop2}))
end.
-spec reset_remotely_and_join_locked(
StoreId, ThisMember, RaSystem, RaServerConfig, RemoteNode, Timeout) ->
Ret when
StoreId :: khepri:store_id(),
ThisMember :: ra:server_id(),
RaSystem :: atom(),
RaServerConfig :: ra_server:config(),
RemoteNode :: node(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @private
reset_remotely_and_join_locked(
StoreId, ThisMember, RaSystem, RaServerConfig, RemoteNode, Timeout)
when RemoteNode =/= node() ->
%% We attempt to remove the local Ra server from the remote cluster we
%% want to join.
%%
%% This is usually a no-op because it is not part of it yet. However, if
%% the local Ra server lost its data on disc for whatever reason, the
%% cluster membership view will be inconsistent (the local Ra server won't
%% know about its former cluster anymore).
%%
%% Therefore, it is safer to ask the remote cluster to remove the local Ra
%% server, just in case. If we don't do that and the remote cluster starts
%% to send messages to the local Ra server, the local Ra server might
%% crash with a `leader_saw_append_entries_rpc_in_same_term' exception.
%%
%% TODO: Should we verify the cluster membership first? To avoid resetting
%% a node which is already part of the cluster? On the other hand, such a
%% check would not be atomic and the membership could change between the
%% check and the reset...
%%
%% TODO: Do we want to provide an option to verify the state of the local
%% node before resetting it? Like "if it has data in the Khepri database,
%% abort". It may be difficult to make this kind of check atomic though.
RemoteMember = node_to_member(StoreId, RemoteNode),
?LOG_DEBUG(
"Removing this node (~0p) from the remote node's cluster (~0p) to "
"make sure the membership view is consistent",
[ThisMember, RemoteMember]),
T1 = khepri_utils:start_timeout_window(Timeout),
Ret1 = ra:remove_member(RemoteMember, ThisMember, Timeout),
Timeout1 = khepri_utils:end_timeout_window(Timeout, T1),
case Ret1 of
{ok, _, _} ->
reset_locally_and_join_locked(
StoreId, ThisMember, RaSystem, RaServerConfig, RemoteNode,
Timeout1);
{error, not_member} ->
reset_locally_and_join_locked(
StoreId, ThisMember, RaSystem, RaServerConfig, RemoteNode,
Timeout1);
{error, cluster_change_not_permitted} ->
T2 = khepri_utils:start_timeout_window(Timeout1),
?LOG_DEBUG(
"Remote cluster (reached through node ~0p) is not ready "
"for a membership change yet; waiting...", [RemoteNode]),
Ret2 = wait_for_cluster_change_permitted(StoreId, Timeout1),
Timeout2 = khepri_utils:end_timeout_window(Timeout1, T2),
case Ret2 of
ok ->
reset_remotely_and_join_locked(
StoreId, ThisMember, RaSystem, RaServerConfig,
RemoteNode, Timeout2);
Error ->
Error
end;
{timeout, _} ->
{error, timeout};
{error, _} = Error ->
Error
end;
reset_remotely_and_join_locked(
_StoreId, _ThisMember, _RaSystem, _RaServerConfig, RemoteNode, _Timeout)
when RemoteNode =:= node() ->
ok.
-spec reset_locally_and_join_locked(
StoreId, ThisMember, RaSystem, RaServerConfig, RemoteNode, Timeout) ->
Ret when
StoreId :: khepri:store_id(),
ThisMember :: ra:server_id(),
RaSystem :: atom(),
RaServerConfig :: ra_server:config(),
RemoteNode :: node(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @private
reset_locally_and_join_locked(
StoreId, ThisMember, RaSystem, RaServerConfig, RemoteNode, Timeout) ->
%% The local node is reset in case it is already a standalone elected
%% leader (which would be the case after a successful call to
%% `khepri_cluster:start()') or part of a cluster, and have any data.
%%
%% Just after the reset, we restart it skipping the `trigger_election()'
%% step: this is required so that it does not become a leader before
%% joining the remote node. Otherwise, we hit an assertion in Ra.
T0 = khepri_utils:start_timeout_window(Timeout),
case do_reset(RaSystem, StoreId, ThisMember, Timeout) of
ok ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
case do_start_server(RaSystem, RaServerConfig) of
ok ->
do_join_locked(
StoreId, ThisMember, RemoteNode, NewTimeout);
Error ->
Error
end;
Error ->
Error
end.
-spec do_join_locked(
StoreId, ThisMember, RemoteNode, Timeout) ->
Ret when
StoreId :: khepri:store_id(),
ThisMember :: ra:server_id(),
RemoteNode :: node(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @private
do_join_locked(StoreId, ThisMember, RemoteNode, Timeout) ->
RemoteMember = node_to_member(StoreId, RemoteNode),
?LOG_DEBUG(
"Adding this node (~0p) to the remote node's cluster (~0p)",
[ThisMember, RemoteMember]),
RemoteMember = node_to_member(StoreId, RemoteNode),
T1 = khepri_utils:start_timeout_window(Timeout),
Ret1 = ra:add_member(RemoteMember, ThisMember, Timeout),
Timeout1 = khepri_utils:end_timeout_window(Timeout, T1),
case Ret1 of
{ok, _, _StoreId} ->
?LOG_DEBUG(
"Cluster for store \"~s\" successfully expanded",
[StoreId]),
ok;
{error, already_member} ->
?LOG_DEBUG(
"This node (~0p) is already a member of the remote node's "
"cluster (~0p)",
[ThisMember, RemoteMember]),
ok;
{error, cluster_change_not_permitted} ->
T2 = khepri_utils:start_timeout_window(Timeout1),
?LOG_DEBUG(
"Remote cluster (reached through node ~0p) is not ready "
"for a membership change yet; waiting...", [RemoteNode]),
Ret2 = wait_for_cluster_change_permitted(RemoteMember, Timeout1),
Timeout2 = khepri_utils:end_timeout_window(Timeout1, T2),
case Ret2 of
ok ->
do_join_locked(
StoreId, ThisMember, RemoteNode, Timeout2);
Error ->
Error
end;
Error ->
?LOG_ERROR(
"Failed to expand cluster for store \"~s\": ~p; aborting",
[StoreId, Error]),
%% After failing to join, the local Ra server is running
%% standalone (after a reset) and needs an election to be in a
%% working state again. We don't care about the result at this
%% point.
_ = trigger_election(ThisMember, Timeout1),
case Error of
{timeout, _} -> {error, timeout};
{error, _} -> Error
end
end.
wait_for_cluster_change_permitted(RaMemberOrStoreId, Timeout) ->
Ret = wait_for_leader(RaMemberOrStoreId, Timeout),
%% We wait for an additional fixed amount of time because the
%% cluster could have a leader and still not be ready to accept
%% a cluster change. This avoids too many retries that will
%% just eat resources.
timer:sleep(200),
Ret.
-spec reset() -> Ret when
Ret :: ok | khepri:error().
%% @doc Resets the store on this Erlang node.
reset() ->
StoreId = get_default_store_id(),
reset(StoreId).
-spec reset(StoreId | Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Resets the store on this Erlang node.
reset(Timeout)
when ?IS_TIMEOUT(Timeout) ->
StoreId = get_default_store_id(),
reset(StoreId, Timeout);
reset(StoreId)
when ?IS_KHEPRI_STORE_ID(StoreId) ->
Timeout = khepri_app:get_default_timeout(),
reset(StoreId, Timeout).
-spec reset(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Resets the store on this Erlang node.
%%
%% It does that by force-deleting the Ra local server.
%%
%% This function is also used to gracefully remove the local Khepri store node
%% from a cluster.
%%
%% @param StoreId the name of the Khepri store.
reset(StoreId, Timeout) ->
Lock = server_start_lock(StoreId),
global:set_lock(Lock),
try
Ret = reset_locked(StoreId, Timeout),
global:del_lock(Lock),
Ret
catch
Class:Reason:Stacktrace ->
global:del_lock(Lock),
erlang:raise(Class, Reason, Stacktrace)
end.
reset_locked(StoreId, Timeout) ->
ThisMember = this_member(StoreId),
case get_store_prop(StoreId, ra_system) of
{ok, RaSystem} -> do_reset(RaSystem, StoreId, ThisMember, Timeout);
{error, _} = Error -> Error
end.
do_reset(RaSystem, StoreId, ThisMember, Timeout) ->
?LOG_DEBUG(
"Detaching this node (~0p) in store \"~s\" from its cluster (if any) "
"before reset",
[ThisMember, StoreId]),
T1 = khepri_utils:start_timeout_window(Timeout),
Ret1 = ra:remove_member(ThisMember, ThisMember, Timeout),
Timeout1 = khepri_utils:end_timeout_window(Timeout, T1),
case Ret1 of
{ok, _, _} ->
force_stop(RaSystem, StoreId, ThisMember);
{error, not_member} ->
force_stop(RaSystem, StoreId, ThisMember);
{error, cluster_change_not_permitted} ->
T2 = khepri_utils:start_timeout_window(Timeout1),
?LOG_DEBUG(
"Cluster is not ready for a membership change yet; waiting",
[]),
try
Ret2 = wait_for_cluster_change_permitted(StoreId, Timeout1),
Timeout2 = khepri_utils:end_timeout_window(Timeout1, T2),
case Ret2 of
ok -> do_reset(RaSystem, StoreId, ThisMember, Timeout2);
Error -> Error
end
catch
exit:{normal, _} ->
?LOG_DEBUG(
"The local Ra server exited while we were waiting "
"for it to be ready for a membership change. It "
"means it was removed from the cluster by another "
"member; we can proceed with the reset."),
forget_store(StoreId),
ok
end;
{timeout, _} ->
{error, timeout};
{error, noproc} ->
?LOG_DEBUG(
"The local Ra server exited while we tried to detach it from "
"its cluster. It means it was removed from the cluster by "
"another member; we can proceed with the reset."),
forget_store(StoreId),
ok;
{error, _} = Error ->
Error
end.
force_stop(RaSystem, StoreId, ThisMember) ->
?LOG_DEBUG(
"Resetting member ~0p in store \"~s\"",
[ThisMember, StoreId]),
case ra:force_delete_server(RaSystem, ThisMember) of
ok ->
forget_store(StoreId),
wait_for_ra_server_exit(ThisMember);
{error, noproc} = Error ->
forget_store(StoreId),
Error;
{error, _} = Error ->
Error
end.
-spec get_default_ra_system_or_data_dir() -> RaSystem | DataDir when
RaSystem :: atom(),
DataDir :: file:filename_all().
%% @doc Returns the default Ra system name or data directory.
%%
%% This is based on Khepri's `default_ra_system` application environment
%% variable. The variable can be set to:
%% <ul>
%% <li>A directory (a string or binary) where data should be stored. A new Ra
%% system called `khepri` will be initialized with this directory.</li>
%% <li>A Ra system name (an atom). In this case, the user is expected to
%% configure and start the Ra system before starting Khepri.</li>
%% </ul>
%%
%% If this application environment variable is unset, the default is to
%% configure a Ra system called `khepri' which will write data in
%% `"khepri-$NODE"' in the current working directory where `$NODE' is the
%% Erlang node name.
%%
%% Example of an Erlang configuration file for Khepri:
%% ```
%% {khepri, [{default_ra_system, "/var/db/khepri"}]}.
%% '''
%%
%% @returns the value of the `default_ra_system' application environment
%% variable.
get_default_ra_system_or_data_dir() ->
RaSystemOrDataDir = application:get_env(
khepri, default_ra_system,
generate_default_data_dir()),
if
?IS_DATA_DIR(RaSystemOrDataDir) ->
ok;
?IS_RA_SYSTEM(RaSystemOrDataDir) ->
ok;
true ->
?LOG_ERROR(
"Invalid Ra system or data directory set in "
"`default_ra_system` application environment: ~p",
[RaSystemOrDataDir]),
?khepri_misuse(
invalid_default_ra_system_value,
#{default_ra_system => RaSystemOrDataDir})
end,
RaSystemOrDataDir.
generate_default_data_dir() ->
lists:flatten(io_lib:format("khepri#~s", [node()])).
-spec get_default_store_id() -> StoreId when
StoreId :: khepri:store_id().
%% @doc Returns the default Khepri store ID.
%%
%% This is based on Khepri's `default_store_id' application environment
%% variable. The variable can be set to an atom. The default is `khepri'.
%%
%% @returns the value of the `default_store_id' application environment
%% variable.
get_default_store_id() ->
StoreId = application:get_env(
khepri, default_store_id,
?DEFAULT_STORE_ID),
if
?IS_KHEPRI_STORE_ID(StoreId) ->
ok;
true ->
?LOG_ERROR(
"Invalid store ID set in `default_store_id` "
"application environment: ~p",
[StoreId]),
?khepri_misuse(
invalid_default_store_id_value,
#{default_store_id => StoreId})
end,
StoreId.
-spec members() -> Ret when
Ret :: khepri:ok(Members) | khepri:error(),
Members :: [ra:server_id(), ...].
%% @doc Returns the list of Ra members that are part of the cluster.
%%
%% Calling this function is the same as calling `members(StoreId)' with the
%% default store ID (see {@link khepri_cluster:get_default_store_id/0}).
%%
%% @see members/1.
%% @see members/2.
members() ->
StoreId = get_default_store_id(),
members(StoreId).
-spec members(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: khepri:ok(Members) | khepri:error(),
Members :: [ra:server_id(), ...].
%% @doc Returns the list of Ra members that are part of the cluster.
%%
%% Calling this function is the same as calling `members(StoreId,
%% DefaultTimeout)' where `DefaultTimeout' is returned by {@link
%% khepri_app:get_default_timeout/0}.
%%
%% @see members/2.
members(StoreId) ->
Timeout = khepri_app:get_default_timeout(),
members(StoreId, Timeout).
-spec members(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: khepri:ok(Members) | khepri:error(),
Members :: [ra:server_id(), ...].
%% @doc Returns the list of Ra members that are part of the cluster.
%%
%% The Ra leader is queried for the list of members, therefore the membership
%% view is consistent with the rest of the cluster.
%%
%% @param StoreId the ID of the store to stop.
%% @param Timeout the timeout.
%%
%% @returns an `{ok, Members}' tuple or an `{error, Reason}' tuple. `Members'
%% is a non-empty list of Ra server IDs.
members(StoreId, Timeout) ->
ThisMember = this_member(StoreId),
do_query_members(StoreId, ThisMember, leader, Timeout).
-spec locally_known_members() -> Ret when
Ret :: khepri:ok(Members) | khepri:error(),
Members :: [ra:server_id(), ...].
%% @doc Returns the list of Ra members that are part of the cluster.
%%
%% Calling this function is the same as calling
%% `locally_known_members(StoreId)' with the default store ID (see {@link
%% khepri_cluster:get_default_store_id/0}).
%%
%% @see locally_known_members/1.
%% @see locally_known_members/2.
locally_known_members() ->
StoreId = get_default_store_id(),
locally_known_members(StoreId).
-spec locally_known_members(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: khepri:ok(Members) | khepri:error(),
Members :: [ra:server_id(), ...].
%% @doc Returns the list of Ra members that are part of the cluster.
%%
%% Calling this function is the same as calling
%% `locally_known_members(StoreId, DefaultTimeout)' where `DefaultTimeout' is
%% returned by {@link khepri_app:get_default_timeout/0}.
%%
%% @see locally_known_members/2.
locally_known_members(StoreId) ->
Timeout = khepri_app:get_default_timeout(),
locally_known_members(StoreId, Timeout).
-spec locally_known_members(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: khepri:ok(Members) | khepri:error(),
Members :: [ra:server_id(), ...].
%% @doc Returns the list of Ra members that are part of the cluster.
%%
%% The function queries a locally cached value first, then queries the local
%% Ra server. Either way, the returned value may be out-of-date compared to
%% the current membership known by the Ra leader.
%%
%% @param StoreId the ID of the store to stop.
%% @param Timeout the timeout.
%%
%% @returns an `{ok, Members}' tuple or an `{error, Reason}' tuple. `Members'
%% is a non-empty list of Ra server IDs.
locally_known_members(StoreId, Timeout) ->
case ra_leaderboard:lookup_members(StoreId) of
Members when is_list(Members) ->
{ok, lists:sort(Members)};
undefined ->
ThisMember = this_member(StoreId),
do_query_members(StoreId, ThisMember, local, Timeout)
end.
-spec do_query_members(StoreId, RaServer, QueryType, Timeout) -> Ret when
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
QueryType :: leader | local,
Timeout :: timeout(),
Ret :: khepri:ok(Members) | khepri:error(),
Members :: [ra:server_id(), ...].
%% @private
do_query_members(StoreId, RaServer, QueryType, Timeout) ->
T0 = khepri_utils:start_timeout_window(Timeout),
Arg = case QueryType of
leader -> RaServer;
local -> {local, RaServer}
end,
case ra:members(Arg, Timeout) of
{ok, Members, _} ->
{ok, lists:sort(Members)};
{error, noproc} = Error
when ?HAS_TIME_LEFT(Timeout) ->
case khepri_utils:is_ra_server_alive(RaServer) of
true ->
NewTimeout0 = khepri_utils:end_timeout_window(Timeout, T0),
NewTimeout = khepri_utils:sleep(
?TRANSIENT_ERROR_RETRY_INTERVAL,
NewTimeout0),
do_query_members(
StoreId, RaServer, QueryType, NewTimeout);
false ->
?LOG_DEBUG(
"Cannot query members in store \"~s\": "
"the store is stopped or non-existent",
[StoreId]),
Error
end;
{error, Reason}
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noconnection orelse
Reason == nodedown orelse
Reason == shutdown) ->
NewTimeout0 = khepri_utils:end_timeout_window(Timeout, T0),
NewTimeout = khepri_utils:sleep(
?TRANSIENT_ERROR_RETRY_INTERVAL, NewTimeout0),
do_query_members(
StoreId, RaServer, QueryType, NewTimeout);
{timeout, _} ->
?LOG_WARNING(
"Timeout while querying members in store \"~s\"",
[StoreId]),
{error, timeout};
Error ->
?LOG_WARNING(
"Failed to query members in store \"~s\": ~p",
[StoreId, Error]),
Error
end.
-spec nodes() -> Ret when
Ret :: khepri:ok(Nodes) | khepri:error(),
Nodes :: [node(), ...].
%% @doc Returns the list of Erlang nodes that are part of the cluster.
%%
%% Calling this function is the same as calling `nodes(StoreId)' with the
%% default store ID (see {@link khepri_cluster:get_default_store_id/0}).
%%
%% @see nodes/1.
%% @see nodes/2.
nodes() ->
case members() of
{ok, Members} -> {ok, [Node || {_, Node} <- Members]};
Error -> Error
end.
-spec nodes(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: khepri:ok(Nodes) | khepri:error(),
Nodes :: [node(), ...].
%% @doc Returns the list of Erlang nodes that are part of the cluster.
%%
%% Calling this function is the same as calling `nodes(StoreId,
%% DefaultTimeout)' where `DefaultTimeout' is returned by {@link
%% khepri_app:get_default_timeout/0}.
%%
%% @see nodes/2.
nodes(StoreId) ->
case members(StoreId) of
{ok, Members} -> {ok, [Node || {_, Node} <- Members]};
Error -> Error
end.
-spec nodes(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: khepri:ok(Nodes) | khepri:error(),
Nodes :: [node(), ...].
%% @doc Returns the list of Erlang nodes that are part of the cluster.
%%
%% The Ra leader is queried for the list of members, therefore the membership
%% view is consistent with the rest of the cluster.
%%
%% @see members/2.
nodes(StoreId, Timeout) ->
case members(StoreId, Timeout) of
{ok, Members} -> {ok, [Node || {_, Node} <- Members]};
Error -> Error
end.
-spec locally_known_nodes() -> Ret when
Ret :: khepri:ok(Nodes) | khepri:error(),
Nodes :: [node(), ...].
%% @doc Returns the list of Erlang nodes that are part of the cluster.
%%
%% Calling this function is the same as calling `locally_known_nodes(StoreId)'
%% with the default store ID (see {@link
%% khepri_cluster:get_default_store_id/0}).
%%
%% @see locally_known_nodes/1.
%% @see locally_known_nodes/2.
locally_known_nodes() ->
case locally_known_members() of
{ok, Members} -> {ok, [Node || {_, Node} <- Members]};
Error -> Error
end.
-spec locally_known_nodes(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: khepri:ok(Nodes) | khepri:error(),
Nodes :: [node(), ...].
%% @doc Returns the list of Erlang nodes that are part of the cluster.
%%
%% Calling this function is the same as calling `locally_known_nodes(StoreId,
%% DefaultTimeout)' where `DefaultTimeout' is returned by {@link
%% khepri_app:get_default_timeout/0}.
%%
%% @see locally_known_nodes/2.
locally_known_nodes(StoreId) ->
case locally_known_members(StoreId) of
{ok, Members} -> {ok, [Node || {_, Node} <- Members]};
Error -> Error
end.
-spec locally_known_nodes(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: khepri:ok(Nodes) | khepri:error(),
Nodes :: [node(), ...].
%% @doc Returns the list of Erlang nodes that are part of the cluster.
%%
%% The function queries a locally cached value first, then queries the local
%% Ra server. Either way, the returned value may be out-of-date compared to
%% the current membership known by the Ra leader.
%%
%% @see locally_known_members/2.
locally_known_nodes(StoreId, Timeout) ->
case locally_known_members(StoreId, Timeout) of
{ok, Members} -> {ok, [Node || {_, Node} <- Members]};
Error -> Error
end.
-spec wait_for_leader() -> Ret when
Ret :: ok | khepri:error().
%% @doc Waits for a leader to be elected.
%%
%% Calling this function is the same as calling `wait_for_leader(StoreId)'
%% with the default store ID (see {@link
%% khepri_cluster:get_default_store_id/0}).
%%
%% @see wait_for_leader/1.
%% @see wait_for_leader/2.
wait_for_leader() ->
StoreId = get_default_store_id(),
wait_for_leader(StoreId).
-spec wait_for_leader(StoreIdOrRaServer) -> Ret when
StoreIdOrRaServer :: StoreId | RaServer,
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
Ret :: ok | khepri:error().
%% @doc Waits for a leader to be elected.
%%
%% Calling this function is the same as calling `wait_for_leader(StoreId,
%% DefaultTimeout)' where `DefaultTimeout' is returned by {@link
%% khepri_app:get_default_timeout/0}.
%%
%% @see wait_for_leader/2.
wait_for_leader(StoreIdOrRaServer) ->
Timeout = khepri_app:get_default_timeout(),
wait_for_leader(StoreIdOrRaServer, Timeout).
-spec wait_for_leader(StoreIdOrRaServer, Timeout) -> Ret when
StoreIdOrRaServer :: StoreId | RaServer,
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Waits for a leader to be elected.
%%
%% This is useful if you want to be sure the clustered store is ready before
%% issueing writes and queries. Note that there are obviously no guaranties
%% that the Raft quorum will be lost just after this call.
%%
%% @param StoreId the ID of the store that should elect a leader before this
%% call can return successfully.
%% @param Timeout the timeout.
%%
%% @returns `ok' if a leader was elected or an `{error, Reason}' tuple.
wait_for_leader(StoreId, Timeout) when is_atom(StoreId) ->
ThisMember = this_member(StoreId),
wait_for_leader(ThisMember, Timeout);
wait_for_leader(RaServer, Timeout) ->
T0 = khepri_utils:start_timeout_window(Timeout),
case ra:members(RaServer, Timeout) of
{ok, _Members, _LeaderId} ->
ok;
{error, Reason}
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse
Reason == noconnection orelse
Reason == nodedown orelse
Reason == shutdown) ->
NewTimeout0 = khepri_utils:end_timeout_window(Timeout, T0),
NewTimeout = khepri_utils:sleep(
?TRANSIENT_ERROR_RETRY_INTERVAL, NewTimeout0),
wait_for_leader(RaServer, NewTimeout);
{timeout, _} ->
{error, timeout};
{error, _} = Error ->
Error
end.
-spec node_to_member(StoreId, Node) -> Member when
StoreId :: khepri:store_id(),
Node :: node(),
Member :: ra:server_id().
%% @private
node_to_member(StoreId, Node) ->
{StoreId, Node}.
-spec this_member(StoreId) -> Member when
StoreId :: khepri:store_id(),
Member :: ra:server_id().
%% @private
this_member(StoreId) ->
ThisNode = node(),
node_to_member(StoreId, ThisNode).
server_start_lock(StoreId) ->
{{khepri, StoreId}, self()}.
complete_ra_server_config(#{cluster_name := StoreId,
id := Member} = RaServerConfig) ->
%% We warn the caller if he sets `initial_members' that the setting will
%% be ignored. The reason is we mandate the use of
%% `khepri_cluster:start()' to start the local node (and can't use on the
%% "auto-cluster start" of Ra) because we also populate a few
%% persistent_term for Ra system & server config bookkeeping.
RaServerConfig1 = case RaServerConfig of
#{initial_members := _} ->
?LOG_WARNING(
"Initial Ra cluster members in the "
"following Ra server config "
"(`initial_members`) will be ignored: ~p",
[RaServerConfig]),
maps:remove(initial_members, RaServerConfig);
_ ->
RaServerConfig
end,
%% We set a default friendly name for this Ra server if the caller didn't
%% set any.
RaServerConfig2 = case RaServerConfig1 of
#{friendly_name := _} ->
RaServerConfig1;
_ ->
FriendlyName = lists:flatten(
io_lib:format(
"Khepri store \"~s\"",
[StoreId])),
RaServerConfig1#{friendly_name => FriendlyName}
end,
UId = ra:new_uid(ra_lib:to_binary(StoreId)),
MachineConfig0 = case RaServerConfig of
#{machine_config := MachineConfig00} ->
MachineConfig00;
_ ->
#{}
end,
MachineConfig = MachineConfig0#{store_id => StoreId,
member => Member},
Machine = {module, khepri_machine, MachineConfig},
LogInitArgs0 = #{uid => UId},
LogInitArgs = case MachineConfig0 of
#{snapshot_interval := SnapshotInterval} ->
%% Ra takes a snapshot when the number of applied
%% commands is _greater than_ the interval (not
%% equal), so we need to subtract one from Khepri's
%% configured snapshot interval so that Ra snapshots
%% exactly at the interval.
MinSnapshotInterval = SnapshotInterval - 1,
LogInitArgs0#{min_snapshot_interval =>
MinSnapshotInterval};
_ ->
LogInitArgs0
end,
RaServerConfig2#{uid => UId,
log_init_args => LogInitArgs,
machine => Machine}.
-define(PT_STORE_IDS, {khepri, store_ids}).
-spec remember_store(RaSystem, RaServerConfig) -> ok when
RaSystem :: atom(),
RaServerConfig :: ra_server:ra_server_config().
%% @private
remember_store(RaSystem, #{cluster_name := StoreId} = RaServerConfig) ->
?assert(maps:is_key(id, RaServerConfig)),
StoreIds = persistent_term:get(?PT_STORE_IDS, #{}),
Props = #{ra_system => RaSystem,
ra_server_config => RaServerConfig},
StoreIds1 = StoreIds#{StoreId => Props},
persistent_term:put(?PT_STORE_IDS, StoreIds1),
ok.
-spec get_store_prop(StoreId, PropName) -> Ret when
StoreId :: khepri:store_id(),
PropName :: ra_system | ra_server_config,
PropValue :: any(),
Ret :: khepri:ok(PropValue) |
khepri:error(?khepri_error(
not_a_khepri_store,
#{store_id := StoreId})).
%% @private
get_store_prop(StoreId, PropName) ->
case persistent_term:get(?PT_STORE_IDS, #{}) of
#{StoreId := #{PropName := PropValue}} ->
{ok, PropValue};
_ ->
Reason = ?khepri_error(
not_a_khepri_store,
#{store_id => StoreId}),
{error, Reason}
end.
-spec forget_store(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @private
forget_store(StoreId) ->
ok = khepri_machine:clear_cache(StoreId),
StoreIds = persistent_term:get(?PT_STORE_IDS, #{}),
StoreIds1 = maps:remove(StoreId, StoreIds),
case maps:size(StoreIds1) of
0 -> _ = persistent_term:erase(?PT_STORE_IDS);
_ -> ok = persistent_term:put(?PT_STORE_IDS, StoreIds1)
end,
ok.
-spec get_store_ids() -> [StoreId] when
StoreId :: khepri:store_id().
%% @doc Returns the list of running stores.
get_store_ids() ->
StoreIds0 = maps:keys(persistent_term:get(?PT_STORE_IDS, #{})),
StoreIds1 = lists:filter(fun is_store_running/1, StoreIds0),
StoreIds1.
-spec is_store_running(StoreId) -> IsRunning when
StoreId :: khepri:store_id(),
IsRunning :: boolean().
%% @doc Indicates if `StoreId' is running or not.
is_store_running(StoreId) ->
%% FIXME: Ra has no API to know if a Ra server is running or not. We could
%% use a public API such as `ra:key_metrics/2', but unfortunately, it is
%% not as efficient as querying the process directly. Therefore, we bypass
%% Ra and rely on the fact that the server ID is internally a registered
%% process name and resolve it to determine if it is running.
Runs = erlang:whereis(StoreId) =/= undefined,
%% We know the real state of the Ra server. In the case the Ra server
%% stopped behind the back of Khepri, we update the cached list of running
%% stores as a side effect here.
StoreIds = persistent_term:get(?PT_STORE_IDS, #{}),
case maps:is_key(StoreId, StoreIds) of
true when Runs ->
ok;
false when not Runs ->
ok;
false when Runs ->
%% This function was called between the start of the Ra server and
%% the record of its configuration. This is a race, but that's ok.
ok;
true when not Runs ->
?LOG_DEBUG(
"Ra server for store ~s stopped behind the back of Khepri",
[StoreId]),
forget_store(StoreId)
end,
Runs.