%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright © 2021-2024 Broadcom. All Rights Reserved. The term "Broadcom"
%% refers to Broadcom Inc. and/or its subsidiaries.
%%
%% @doc
%% Khepri private low-level API.
%%
%% This module exposes the private "low-level" API to the Khepri database and
%% state machine. Main functions correspond to Ra commands implemented by the
%% state machine. All functions in {@link khepri} are built on top of this
%% module.
%%
%% This module is private. The documentation is still visible because it may
%% help understand some implementation details. However, this module should
%% never be called directly outside of Khepri.
%%
%% == State machine history ==
%%
%% <table>
%% <tr><th>Version</th><th>What changed</th></tr>
%% <tr>
%% <td style="text-align: right; vertical-align: top;">0</td>
%% <td>Initial version</td>
%% </tr>
%% <tr>
%% <td style="text-align: right; vertical-align: top;">1</td>
%% <td>
%% <ul>
%% <li>Added deduplication mechanism:
%% <ul>
%% <li>new command option `protect_against_dups'</li>
%% <li>new commands `#dedup{}' and `#dedup_ack{}'</li>
%% <li>new state field `dedups'</li>
%% </ul></li>
%% <li>Added command `#unregister_projections{}'. The previous
%% `#unregister_projection{}' is still supported for backward-compatibility but
%% it is no longer created.</li>
%% </ul>
%% </td>
%% </tr>
%% </table>
-module(khepri_machine).
-behaviour(ra_machine).
-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("horus/include/horus.hrl").
-include("include/khepri.hrl").
-include("src/khepri_cluster.hrl").
-include("src/khepri_error.hrl").
-include("src/khepri_evf.hrl").
-include("src/khepri_machine.hrl").
-include("src/khepri_ret.hrl").
-include("src/khepri_tx.hrl").
-include("src/khepri_projection.hrl").
-export([fold/5,
fence/2,
put/4,
delete/3,
transaction/5,
register_trigger/5,
register_projection/4,
unregister_projections/3]).
-export([get_keep_while_conds_state/2,
get_projections_state/2]).
%% ra_machine callbacks.
-export([init/1,
init_aux/1,
handle_aux/6,
apply/3,
state_enter/2,
snapshot_installed/4,
overview/1,
version/0,
which_module/1]).
%% For internal use only.
-export([clear_cache/1,
ack_triggers_execution/2,
split_query_options/1,
split_command_options/1,
split_put_options/1,
insert_or_update_node/6,
delete_matching_nodes/4,
handle_tx_exception/1,
process_query/3,
process_command/3]).
%% Internal functions to access the opaque #khepri_machine{} state.
-export([is_state/1,
ensure_is_state/1,
get_tree/1,
get_root/1,
get_keep_while_conds/1,
get_keep_while_conds_revidx/1,
get_triggers/1,
get_emitted_triggers/1,
get_projections/1,
has_projection/2,
get_metrics/1,
get_dedups/1]).
-ifdef(TEST).
-export([make_virgin_state/1,
convert_state/3,
set_tree/2]).
-endif.
-compile({no_auto_import, [apply/3]}).
-type props() :: #{payload_version := khepri:payload_version(),
child_list_version := khepri:child_list_version()}.
%% Properties attached to each node in the tree structure.
-type triggered() :: #triggered{}.
-type command() :: #put{} |
#delete{} |
#tx{} |
#register_trigger{} |
#ack_triggered{} |
#register_projection{} |
#unregister_projections{} |
#dedup{} |
#dedup_ack{}.
%% Commands specific to this Ra machine.
-type old_command() :: #unregister_projection{}.
%% Old commands that are still accepted by the Ra machine but never created.
%%
%% Even though Khepri no longer creates these commands, they may still be
%% present in existing Ra log files and thus be applied after an ugprade of
%% Khepri.
%%
%% We keep them supported for backward-compatibility.
-type machine_init_args() :: #{store_id := khepri:store_id(),
member := ra:server_id(),
snapshot_interval => non_neg_integer(),
commands => [command()],
atom() => any()}.
%% Structure passed to {@link init/1}.
-type machine_config() :: #config{}.
%% Configuration record, holding read-only or rarely changing fields.
%% State machine's internal state record.
-record(khepri_machine,
{config = #config{} :: khepri_machine:machine_config(),
tree = #tree{} :: khepri_tree:tree(),
triggers = #{} :: khepri_machine:triggers_map(),
emitted_triggers = [] :: [khepri_machine:triggered()],
projections = khepri_pattern_tree:empty() ::
khepri_machine:projection_tree(),
metrics = #{} :: khepri_machine:metrics(),
%% Added in machine version 1.
dedups = #{} :: khepri_machine:dedups_map()}).
-opaque state_v1() :: #khepri_machine{}.
%% State of this Ra state machine, version 1.
-type state() :: state_v1() | khepri_machine_v0:state().
%% State of this Ra state machine.
-type triggers_map() :: #{khepri:trigger_id() =>
#{sproc := khepri_path:native_path(),
event_filter := khepri_evf:event_filter()}}.
%% Internal triggers map in the machine state.
-type metrics() :: #{applied_command_count => non_neg_integer()}.
%% Internal state machine metrics.
-type dedups_map() :: #{reference() => {any(), integer()}}.
%% Map to handle command deduplication.
-type aux_state() :: #khepri_machine_aux{}.
%% Auxiliary state of this Ra state machine.
-type query_fun() :: fun((state()) -> any()).
%% Function representing a query and used {@link process_query/3}.
-type common_ret() :: khepri:ok(khepri_adv:node_props_map()) |
khepri:error().
-type tx_ret() :: khepri:ok(khepri_tx:tx_fun_result()) |
khepri_tx:tx_abort() |
no_return().
-type async_ret() :: ok.
-type projection_tree() :: khepri_pattern_tree:tree(
[khepri_projection:projection()]).
%% A pattern tree that holds all registered projections in the machine's state.
-type projection_map() :: #{khepri_projection:name() => khepri_path:pattern()}.
%% A mapping between the names of projections and patterns to which each
%% projection is registered.
-export_type([common_ret/0,
tx_ret/0,
async_ret/0,
machine_init_args/0,
state/0,
state_v1/0,
machine_config/0,
triggers_map/0,
metrics/0,
dedups_map/0,
props/0,
triggered/0,
projection_tree/0,
projection_map/0,
command/0,
old_command/0]).
-define(PROJECTION_PROPS_TO_RETURN, [payload_version,
child_list_version,
child_list_length,
child_names,
payload]).
-define(DEFAULT_RA_COMMAND_CORRELATION, no_correlation).
-define(DEFAULT_RA_COMMAND_PRIORITY, normal).
-define(IS_RA_COMMAND_CORRELATION(Correlation),
(is_integer(Correlation) orelse is_reference(Correlation))).
-define(IS_RA_COMMAND_PRIORITY(Priority),
(Priority =:= normal orelse Priority =:= low)).
%% -------------------------------------------------------------------
%% Machine protocol.
%% -------------------------------------------------------------------
%% TODO: Verify arguments carefully to avoid the construction of an invalid
%% command.
-spec fold(StoreId, PathPattern, Fun, Acc, Options) -> Ret when
StoreId :: khepri:store_id(),
PathPattern :: khepri_path:pattern(),
Fun :: khepri:fold_fun(),
Acc :: khepri:fold_acc(),
Options :: khepri:query_options() | khepri:tree_options(),
Ret :: khepri:ok(NewAcc) | khepri:error(),
NewAcc :: Acc.
%% @doc Returns all tree nodes matching the given path pattern.
%%
%% @param StoreId the name of the Ra cluster.
%% @param PathPattern the path (or path pattern) to the nodes to get.
%% @param Options query options such as `favor'.
%%
%% @returns an `{ok, NodePropsMap}' tuple with a map with zero, one or more
%% entries, or an `{error, Reason}' tuple.
fold(StoreId, PathPattern, Fun, Acc, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso
is_function(Fun, 3) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
{QueryOptions, TreeOptions} = split_query_options(Options),
Query = fun(State) ->
Tree = get_tree(State),
try
khepri_tree:fold(
Tree, PathPattern1, Fun, Acc, TreeOptions)
catch
Class:Reason:Stacktrace ->
{exception, Class, Reason, Stacktrace}
end
end,
case process_query(StoreId, Query, QueryOptions) of
{exception, _, _, _} = Exception -> handle_tx_exception(Exception);
Ret -> Ret
end.
-spec fence(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%%
%% @param StoreId the name of the Ra cluster
%% @param Timeout the time limit after which the call returns with an error.
%%
%% @returns `ok' or an `{error, Reason}' tuple.
fence(StoreId, Timeout) ->
QueryFun = fun erlang:is_tuple/1,
Options = #{favor => consistency,
timeout => Timeout},
case process_query(StoreId, QueryFun, Options) of
true -> ok;
Other when Other =/= false -> Other
end.
-spec put(StoreId, PathPattern, Payload, Options) -> Ret when
StoreId :: khepri:store_id(),
PathPattern :: khepri_path:pattern(),
Payload :: khepri_payload:payload(),
Options :: khepri:command_options() |
khepri:tree_options() |
khepri:put_options(),
Ret :: khepri_machine:common_ret() | khepri_machine:async_ret().
%% @doc Creates or modifies a specific tree node in the tree structure.
%%
%% @param StoreId the name of the Ra cluster.
%% @param PathPattern the path (or path pattern) to the node to create or
%% modify.
%% @param Payload the payload to put in the specified node.
%% @param Options command, tree and put options.
%%
%% @returns in the case of a synchronous put, an `{ok, NodePropsMap}' tuple
%% with a map with zero, one or more entries, or an `{error, Reason}' tuple;
%% in the case of an asynchronous put, always `ok' (the actual return value
%% may be sent by a message if a correlation ID was specified).
%%
%% @private
put(StoreId, PathPattern, Payload, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso ?IS_KHEPRI_PAYLOAD(Payload) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
Payload1 = khepri_payload:prepare(Payload),
{CommandOptions, TreeAndPutOptions} = split_command_options(Options),
Command = #put{path = PathPattern1,
payload = Payload1,
options = TreeAndPutOptions},
process_command(StoreId, Command, CommandOptions);
put(_StoreId, PathPattern, Payload, _Options) ->
?khepri_misuse(invalid_payload, #{path => PathPattern,
payload => Payload}).
-spec delete(StoreId, PathPattern, Options) -> Ret when
StoreId :: khepri:store_id(),
PathPattern :: khepri_path:pattern(),
Options :: khepri:command_options() | khepri:tree_options(),
Ret :: khepri_machine:common_ret() | khepri_machine:async_ret().
%% @doc Deletes all tree nodes matching the path pattern.
%%
%% @param StoreId the name of the Ra cluster.
%% @param PathPattern the path (or path pattern) to the nodes to delete.
%% @param Options command options such as the command type.
%%
%% @returns in the case of a synchronous delete, an `{ok, NodePropsMap}' tuple
%% with a map with zero, one or more entries, or an `{error, Reason}' tuple;
%% in the case of an asynchronous put, always `ok' (the actual return value
%% may be sent by a message if a correlation ID was specified).
delete(StoreId, PathPattern, Options) when ?IS_KHEPRI_STORE_ID(StoreId) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
{CommandOptions, TreeOptions} = split_command_options(Options),
%% TODO: Ensure `PutOptions' are not set this map.
Command = #delete{path = PathPattern1,
options = TreeOptions},
process_command(StoreId, Command, CommandOptions).
-spec transaction(StoreId, FunOrPath, Args, ReadWrite, Options) -> Ret when
StoreId :: khepri:store_id(),
FunOrPath :: Fun | PathPattern,
Fun :: khepri_tx:tx_fun(),
PathPattern :: khepri_path:pattern(),
Args :: list(),
ReadWrite :: ro | rw | auto,
Options :: khepri:command_options() | khepri:query_options(),
Ret :: khepri_machine:tx_ret() | khepri_machine:async_ret().
%% @doc Runs a transaction and returns the result.
%%
%% @param StoreId the name of the Ra cluster.
%% @param FunOrPath an arbitrary anonymous function or a path pattern pointing
%% to a stored procedure.
%% @param Args a list of arguments to pass to `FunOrPath'.
%% @param ReadWrite the read/write or read-only nature of the transaction.
%% @param Options command options such as the command type.
%%
%% @returns in the case of a synchronous transaction, `{ok, Result}' where
%% `Result' is the return value of `FunOrPath', or `{error, Reason}' if the
%% anonymous function was aborted; in the case of an asynchronous transaction,
%% always `ok' (the actual return value may be sent by a message if a
%% correlation ID was specified).
transaction(StoreId, Fun, Args, auto = ReadWrite, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso
is_list(Args) andalso
is_function(Fun, length(Args)) andalso
is_map(Options) ->
case khepri_tx_adv:to_standalone_fun(Fun, ReadWrite) of
StandaloneFun when ?IS_HORUS_STANDALONE_FUN(StandaloneFun) ->
readwrite_transaction(StoreId, StandaloneFun, Args, Options);
_ ->
readonly_transaction(StoreId, Fun, Args, Options)
end;
transaction(StoreId, PathPattern, Args, auto, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso
?IS_KHEPRI_PATH_PATTERN(PathPattern) andalso
is_list(Args) andalso
is_map(Options) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
readwrite_transaction(StoreId, PathPattern1, Args, Options);
transaction(StoreId, Fun, Args, rw = ReadWrite, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso
is_list(Args) andalso
is_function(Fun, length(Args)) andalso
is_map(Options) ->
StandaloneFun = khepri_tx_adv:to_standalone_fun(Fun, ReadWrite),
readwrite_transaction(StoreId, StandaloneFun, Args, Options);
transaction(StoreId, PathPattern, Args, rw, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso
?IS_KHEPRI_PATH_PATTERN(PathPattern) andalso
is_list(Args) andalso
is_map(Options) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
readwrite_transaction(StoreId, PathPattern1, Args, Options);
transaction(StoreId, Fun, Args, ro, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso
is_list(Args) andalso
is_function(Fun, length(Args)) andalso
is_map(Options) ->
readonly_transaction(StoreId, Fun, Args, Options);
transaction(StoreId, PathPattern, Args, ro, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso
?IS_KHEPRI_PATH_PATTERN(PathPattern) andalso
is_list(Args) andalso
is_map(Options) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
readonly_transaction(StoreId, PathPattern1, Args, Options);
transaction(StoreId, Fun, Args, ReadWrite, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso
is_function(Fun) andalso
is_list(Args) andalso
is_atom(ReadWrite) andalso
is_map(Options) ->
{arity, Arity} = erlang:fun_info(Fun, arity),
?khepri_misuse(
denied_tx_fun_with_invalid_args,
#{'fun' => Fun, arity => Arity, args => Args}).
-spec readonly_transaction(StoreId, FunOrPath, Args, Options) -> Ret when
StoreId :: khepri:store_id(),
FunOrPath :: Fun | PathPattern,
Fun :: khepri_tx:tx_fun(),
PathPattern :: khepri_path:pattern(),
Args :: list(),
Options :: khepri:query_options(),
Ret :: khepri_machine:tx_ret().
readonly_transaction(StoreId, Fun, Args, Options)
when is_list(Args) andalso is_function(Fun, length(Args)) ->
Query = fun(State) ->
%% It is a read-only transaction, therefore we assert that
%% the state is unchanged and that there are no side
%% effects.
{State, Ret, []} = khepri_tx_adv:run(
State, Fun, Args, false),
Ret
end,
case process_query(StoreId, Query, Options) of
{exception, _, _, _} = Exception ->
handle_tx_exception(Exception);
Ret ->
{ok, Ret}
end;
readonly_transaction(StoreId, PathPattern, Args, Options)
when ?IS_KHEPRI_PATH_PATTERN(PathPattern) andalso is_list(Args) ->
Query = fun(State) ->
%% It is a read-only transaction, therefore we assert that
%% the state is unchanged and that there are no side
%% effects.
{State, Ret, []} = locate_sproc_and_execute_tx(
State, PathPattern, Args, false),
Ret
end,
case process_query(StoreId, Query, Options) of
{exception, _, _, _} = Exception ->
handle_tx_exception(Exception);
Ret ->
{ok, Ret}
end.
-spec readwrite_transaction(StoreId, FunOrPath, Args, Options) -> Ret when
StoreId :: khepri:store_id(),
FunOrPath :: Fun | PathPattern,
Fun :: horus:horus_fun(),
PathPattern :: khepri_path:pattern(),
Args :: list(),
Options :: khepri:command_options(),
Ret :: khepri_machine:tx_ret() | khepri_machine:async_ret().
readwrite_transaction(
StoreId, Fun, Args, Options)
when is_list(Args) andalso
(is_function(Fun, length(Args)) orelse
?IS_HORUS_STANDALONE_FUN(Fun, length(Args))) ->
readwrite_transaction1(StoreId, Fun, Args, Options);
readwrite_transaction(
StoreId, PathPattern, Args, Options)
when ?IS_KHEPRI_PATH_PATTERN(PathPattern) andalso is_list(Args) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
readwrite_transaction1(StoreId, PathPattern1, Args, Options).
readwrite_transaction1(StoreId, StandaloneFunOrPath, Args, Options) ->
Command = #tx{'fun' = StandaloneFunOrPath, args = Args},
Options1 = maps:merge(#{protect_against_dups => true}, Options),
case process_command(StoreId, Command, Options1) of
{exception, _, _, _} = Exception ->
handle_tx_exception(Exception);
ok = Ret ->
CommandType = select_command_type(Options),
case CommandType of
sync -> {ok, Ret};
{async, _, _} -> Ret
end;
Ret ->
{ok, Ret}
end.
handle_tx_exception(
{exception, _, ?TX_ABORT(Reason), _}) ->
{error, Reason};
handle_tx_exception(
{exception, error, ?khepri_exception(_, _) = Reason, _Stacktrace}) ->
%% If the exception is a programming misuse of Khepri, we
%% re-throw a new exception instead of using `erlang:raise()'.
%%
%% The reason is that the default stacktrace is limited to 8 frames by
%% default (see `erlang:system_flag(backtrace_depth, Depth)' to reconfigure
%% it). Most if not all of those 8 frames might be taken by Khepri's
%% internal calls, making the stacktrace uninformative to the caller.
%%
%% By throwing a new exception, we increase the chance that there
%% is a frame pointing to the transaction function.
?khepri_misuse(Reason);
handle_tx_exception(
{exception, Class, Reason, Stacktrace}) ->
erlang:raise(Class, Reason, Stacktrace).
-spec register_trigger(
StoreId, TriggerId, EventFilter, StoredProcPath, Options) ->
Ret when
StoreId :: khepri:store_id(),
TriggerId :: khepri:trigger_id(),
EventFilter :: khepri_evf:event_filter() | khepri_path:pattern(),
StoredProcPath :: khepri_path:path(),
Options :: khepri:command_options(),
Ret :: ok | khepri:error().
%% @doc Registers a trigger.
%%
%% @param StoreId the name of the Ra cluster.
%% @param TriggerId the name of the trigger.
%% @param EventFilter the event filter used to associate an event with a
%% stored procedure.
%% @param StoredProcPath the path to the stored procedure to execute when the
%% corresponding event occurs.
%%
%% @returns `ok' if the trigger was registered, an `{error, Reason}' tuple
%% otherwise.
register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) ->
EventFilter1 = khepri_evf:wrap(EventFilter),
StoredProcPath1 = khepri_path:from_string(StoredProcPath),
khepri_path:ensure_is_valid(StoredProcPath1),
Command = #register_trigger{id = TriggerId,
sproc = StoredProcPath1,
event_filter = EventFilter1},
process_command(StoreId, Command, Options).
-spec register_projection(StoreId, PathPattern, Projection, Options) ->
Ret when
StoreId :: khepri:store_id(),
PathPattern :: khepri_path:pattern(),
Projection :: khepri_projection:projection(),
Options :: khepri:command_options(),
Ret :: ok | khepri:error().
%% @doc Registers a projection.
%%
%% @param StoreId the name of the Ra cluster.
%% @param PathPattern the pattern of tree nodes which should be projected.
%% @param Projection the projection record created with {@link
%% khepri_projection:new/3}.
%% @param Options command options such as the command type.
%%
%% @returns `ok' if the projection was registered, an `{error, Reason}' tuple
%% otherwise.
register_projection(
StoreId, PathPattern0,
#khepri_projection{name = Name,
projection_fun = ProjectionFun,
ets_options = EtsOptions} = Projection,
Options)
when is_atom(Name) andalso
is_list(EtsOptions) andalso
(?IS_HORUS_STANDALONE_FUN(ProjectionFun) orelse
ProjectionFun =:= copy) ->
PathPattern = khepri_path:from_string(PathPattern0),
khepri_path:ensure_is_valid(PathPattern),
Command = #register_projection{pattern = PathPattern,
projection = Projection},
process_command(StoreId, Command, Options).
-spec unregister_projections(StoreId, Names, Options) -> Ret when
StoreId :: khepri:store_id(),
Names :: all | [khepri_projection:name()],
Options :: khepri:command_options(),
Ret :: khepri:ok(khepri_machine:projection_map()) | khepri:error().
%% @doc Removes the given projections from the store.
%%
%% `Names' may either be a list of projection names to remove or the atom
%% `all'. When `all' is passed, every projection in the store is removed.
%%
%% @param StoreId the name of the Ra cluster.
%% @param Names the names of projections to unregister or the atom `all' to
%% remove all projections.
%% @param Options command options such as the command type.
%%
%% @returns `{ok, ProjectionMap}' if the command succeeds, `{error, Reason}'
%% otherwise. The `ProjectionMap' is a map with projection names ({@link
%% khepri_projection:name()} keys associated to the pattern to which each
%% projection was registered.
unregister_projections(StoreId, Names, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso
(Names =:= all orelse is_list(Names)) andalso
is_map(Options) ->
Command = #unregister_projections{names = Names},
process_command(StoreId, Command, Options).
-spec ack_triggers_execution(StoreId, TriggeredStoredProcs) ->
Ret when
StoreId :: khepri:store_id(),
TriggeredStoredProcs :: [triggered()],
Ret :: ok | khepri:error().
%% @doc Acknowledges the execution of a trigger.
%%
%% This is part of a mechanism to ensure that a trigger is executed at least
%% once.
%%
%% @private
ack_triggers_execution(StoreId, TriggeredStoredProcs)
when ?IS_KHEPRI_STORE_ID(StoreId) ->
Command = #ack_triggered{triggered = TriggeredStoredProcs},
process_command(StoreId, Command, #{async => true}).
-spec get_keep_while_conds_state(StoreId, Options) -> Ret when
StoreId :: khepri:store_id(),
Options :: khepri:query_options(),
Ret :: khepri:ok(khepri_tree:keep_while_conds_map()) | khepri:error().
%% @doc Returns the `keep_while' conditions internal state.
%%
%% The returned state consists of all the `keep_while' condition set so far.
%% However, it doesn't include the reverse index.
%%
%% @param StoreId the name of the Ra cluster.
%%
%% @returns the `keep_while' conditions internal state.
%%
%% @private
get_keep_while_conds_state(StoreId, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) ->
Query = fun(State) ->
KeepWhileConds = get_keep_while_conds(State),
{ok, KeepWhileConds}
end,
process_query(StoreId, Query, Options).
-spec get_projections_state(StoreId, Options) -> Ret when
StoreId :: khepri:store_id(),
Options :: khepri:query_options(),
Ret :: khepri:ok(ProjectionState) | khepri:error(),
ProjectionState :: khepri_pattern_tree:tree(Projection),
Projection :: khepri_projection:projection().
%% @doc Returns the `projections' internal state.
%%
%% The returned state is a pattern tree containing the projections registered
%% in the store. (See {@link khepri_pattern_tree:tree()} and {@link
%% khepri_projection:projection()}.)
%%
%% @see khepri_pattern_tree.
%% @see khepri_projection.
%%
%% @private
get_projections_state(StoreId, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) ->
Query = fun(State) ->
Projections = get_projections(State),
{ok, Projections}
end,
process_query(StoreId, Query, Options).
-spec split_query_options(Options) -> {QueryOptions, TreeOptions} when
Options :: QueryOptions | TreeOptions,
QueryOptions :: khepri:query_options(),
TreeOptions :: khepri:tree_options().
%% @private
split_query_options(Options) ->
Options1 = set_default_options(Options),
maps:fold(
fun
(Option, Value, {Q, T}) when
Option =:= condition orelse
Option =:= timeout orelse
Option =:= favor ->
Q1 = Q#{Option => Value},
{Q1, T};
(props_to_return, [], {Q, T}) ->
{Q, T};
(Option, Value, {Q, T}) when
Option =:= expect_specific_node orelse
Option =:= props_to_return orelse
Option =:= include_root_props ->
T1 = T#{Option => Value},
{Q, T1}
end, {#{}, #{}}, Options1).
-spec split_command_options(Options) ->
{CommandOptions, TreeAndPutOptions} when
Options :: CommandOptions | TreeAndPutOptions,
CommandOptions :: khepri:command_options(),
TreeAndPutOptions :: khepri:tree_options() | khepri:put_options().
%% @private
split_command_options(Options) ->
Options1 = set_default_options(Options),
maps:fold(
fun
(Option, Value, {C, TP}) when
Option =:= reply_from orelse
Option =:= timeout orelse
Option =:= async ->
C1 = C#{Option => Value},
{C1, TP};
(props_to_return, [], Acc) ->
Acc;
(Option, Value, {C, TP}) when
Option =:= expect_specific_node orelse
Option =:= props_to_return orelse
Option =:= include_root_props ->
TP1 = TP#{Option => Value},
{C, TP1};
(keep_while, KeepWhile, {C, TP}) ->
%% `keep_while' is kept in `TreeAndPutOptions' here. The state
%% machine will extract it in `apply()'.
KeepWhile1 = khepri_condition:ensure_native_keep_while(
KeepWhile),
TP1 = TP#{keep_while => KeepWhile1},
{C, TP1}
end, {#{}, #{}}, Options1).
-spec split_put_options(TreeAndPutOptions) -> {TreeOptions, PutOptions} when
TreeAndPutOptions :: TreeOptions | PutOptions,
TreeOptions :: khepri:tree_options(),
PutOptions :: khepri:put_options().
%% @private
split_put_options(TreeAndPutOptions) ->
maps:fold(
fun
(keep_while, KeepWhile, {T, P}) ->
P1 = P#{keep_while => KeepWhile},
{T, P1};
(Option, Value, {T, P}) ->
T1 = T#{Option => Value},
{T1, P}
end, {#{}, #{}}, TreeAndPutOptions).
set_default_options(Options) ->
%% By default, return payload-related properties. The caller can set
%% `props_to_return' to an empty map to get a minimal return value.
Options1 = case Options of
#{props_to_return := _} ->
Options;
_ ->
Options#{props_to_return => ?DEFAULT_PROPS_TO_RETURN}
end,
Options1.
-spec process_command(StoreId, Command, Options) -> Ret when
StoreId :: khepri:store_id(),
Command :: command(),
Options :: khepri:command_options(),
Ret :: any().
%% @doc Processes a command which is appended to the Ra log and processed by
%% this state machine code.
%%
%% `Command' may modify the state of the machine.
%%
%% The command associated code is executed in the context of the state machine
%% process on each Ra members.
%%
%% @param StoreId the name of the Ra cluster.
%%
%% @returns the result of the command or an "error" tuple.
%%
%% @private
process_command(StoreId, Command, Options) ->
CommandType = select_command_type(Options),
case CommandType of
sync ->
process_sync_command(StoreId, Command, Options);
{async, Correlation, Priority} ->
process_async_command(
StoreId, Command, Correlation, Priority)
end.
process_sync_command(
StoreId, Command, #{protect_against_dups := true} = Options) ->
MacVer = version(),
case effective_version(StoreId) of
{ok, EffectiveMacVer} when EffectiveMacVer >= MacVer ->
%% When `protect_against_dups' is true, we wrap the command inside
%% a #dedup{} one to give it a unique reference. This is used for
%% non-idempotent commands which could be replayed when there is a
%% change of leadership in the Ra cluster. The state machine uses
%% this reference to remember what it replied to a given
%% reference.
%%
%% `do_process_sync_command/3' is responsible for the retry loop
%% like with any other commands. Once it returned, we cast a
%% #dedup_ack{} command with the same reference to let the machine
%% know it can forget about that reference and the associated
%% reply.
CommandRef = make_ref(),
%% We can't keep a dedup entry forever in the machine state if the
%% initial caller never acknowledges the reply. Therefore we set
%% an expiration time after which the entry will be dropped.
%%
%% We base this expiry on the command's timeout. If it's
%% `infinity' we default to 15 minutes from now.
Now = erlang:system_time(millisecond),
Expiry = case get_timeout(Options) of
infinity -> Now + 15 * 60 * 1000;
Timeout -> Now + Timeout
end,
DedupCommand = #dedup{ref = CommandRef,
expiry = Expiry,
command = Command},
DedupAck = #dedup_ack{ref = CommandRef},
Ret = do_process_sync_command(
StoreId, DedupCommand, Options),
%% We acknowledge that we received the reply and all duplicates
%% can be ignored.
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
_ = ra:pipeline_command(RaServer, DedupAck),
Ret;
_ ->
do_process_sync_command(StoreId, Command, Options)
end;
process_sync_command(
StoreId, Command, Options) ->
do_process_sync_command(StoreId, Command, Options).
do_process_sync_command(StoreId, Command, Options) ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
Timeout = get_timeout(Options),
ReplyFrom = maps:get(reply_from, Options, {member, RaServer}),
CommandOptions = #{timeout => Timeout, reply_from => ReplyFrom},
T0 = khepri_utils:start_timeout_window(Timeout),
Dest = case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
sending_command_remotely(StoreId),
LeaderId;
undefined ->
sending_sync_command_locally(StoreId),
RaServer
end,
case ra:process_command(Dest, Command, CommandOptions) of
{ok, Ret, _LeaderId} ->
?raise_exception_if_any(Ret);
{timeout, _LeaderId} ->
{error, timeout};
{error, Reason} = Error
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse Reason == nodedown orelse
Reason == shutdown) ->
%% We retry the command if either:
%% - the command was sent directly to a remote server, or
%% - the command was sent to the local server and it is still
%% alive.
ShouldRetry = (Dest =/= RaServer orelse
khepri_utils:is_ra_server_alive(RaServer)),
case ShouldRetry of
true ->
%% The follower doesn't know about the new leader yet.
%% Retry again after waiting a bit.
NewTimeout0 = khepri_utils:end_timeout_window(Timeout, T0),
NewTimeout = khepri_utils:sleep(
?TRANSIENT_ERROR_RETRY_INTERVAL,
NewTimeout0),
Options1 = Options#{timeout => NewTimeout},
do_process_sync_command(StoreId, Command, Options1);
false ->
Error
end;
{error, _} = Error ->
Error
end.
process_async_command(
StoreId, Command, ?DEFAULT_RA_COMMAND_CORRELATION = Correlation, Priority) ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
sending_async_command_locally(StoreId),
ra:pipeline_command(RaServer, Command, Correlation, Priority);
process_async_command(
StoreId, Command, Correlation, Priority) ->
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
sending_command_remotely(StoreId),
ra:pipeline_command(LeaderId, Command, Correlation, Priority);
undefined ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
sending_async_command_locally(StoreId),
ra:pipeline_command(RaServer, Command, Correlation, Priority)
end.
-spec select_command_type(Options) -> CommandType when
Options :: khepri:command_options(),
CommandType :: sync | {async, Correlation, Priority},
Correlation :: ra_server:command_correlation() |
?DEFAULT_RA_COMMAND_CORRELATION,
Priority :: ra_server:command_priority().
%% @doc Selects the command type depending on what the caller wants.
%%
%% @private
select_command_type(Options) when not is_map_key(async, Options) ->
sync;
select_command_type(#{async := false}) ->
sync;
select_command_type(#{async := true}) ->
{async, ?DEFAULT_RA_COMMAND_CORRELATION, ?DEFAULT_RA_COMMAND_PRIORITY};
select_command_type(#{async := Correlation})
when ?IS_RA_COMMAND_CORRELATION(Correlation) ->
{async, Correlation, ?DEFAULT_RA_COMMAND_PRIORITY};
select_command_type(#{async := Priority})
when ?IS_RA_COMMAND_PRIORITY(Priority) ->
{async, ?DEFAULT_RA_COMMAND_CORRELATION, Priority};
select_command_type(#{async := {Correlation, Priority}})
when ?IS_RA_COMMAND_CORRELATION(Correlation) andalso
?IS_RA_COMMAND_PRIORITY(Priority) ->
{async, Correlation, Priority}.
-spec process_query(StoreId, QueryFun, Options) -> Ret when
StoreId :: khepri:store_id(),
QueryFun :: query_fun(),
Options :: khepri:query_options(),
Ret :: any().
%% @doc Processes a query which is by the Ra leader.
%%
%% The `QueryFun' function takes the machine state as an argument and can
%% return anything. However, the machine state is never modified. The query
%% does not go through the Ra log and is not replicated.
%%
%% The `QueryFun' function is executed from a process on the leader Ra member.
%%
%% @param StoreId the name of the Ra cluster.
%%
%% @returns the result of the query or an "error" tuple.
%%
%% @private
process_query(StoreId, QueryFun, #{condition := _} = Options) ->
%% `condition' takes precedence over `favor'.
Options1 = maps:remove(favor, Options),
process_query1(StoreId, QueryFun, Options1);
process_query(StoreId, QueryFun, Options) ->
Favor = maps:get(favor, Options, low_latency),
Timeout = get_timeout(Options),
Options1 = maps:remove(favor, Options),
Options2 = Options1#{timeout => Timeout},
case Favor of
low_latency ->
process_query1(StoreId, QueryFun, Options2);
consistency ->
case add_applied_condition(StoreId, Options2) of
{ok, Options3} ->
process_query1(StoreId, QueryFun, Options3);
{error, _} = Error ->
Error
end
end.
process_query1(StoreId, QueryFun, Options) ->
sending_query_locally(StoreId),
LocalServerId = {StoreId, node()},
case ra:local_query(LocalServerId, QueryFun, Options) of
{ok, {_RaIdxTerm, Ret}, _NewLeaderId} ->
?raise_exception_if_any(Ret);
{timeout, _LeaderId} ->
{error, timeout};
{error, _} = Error ->
Error
end.
-spec add_applied_condition(StoreId, Options) -> NewOptions when
StoreId :: khepri:store_id(),
Options :: khepri:query_options(),
NewOptions :: khepri:ok(khepri:query_options()) | khepri:error().
%% @private
add_applied_condition(StoreId, Options) ->
Timeout = get_timeout(Options),
add_applied_condition1(StoreId, Options, Timeout).
add_applied_condition1(StoreId, Options, Timeout) ->
%% The `applied' condition permits that a query is only evaluated after
%% the given index is applied on the local node. This is useful to enforce
%% the order of operations between updates and queries. We have to follow
%% several steps to prepare that condition.
%%
%% If the last message from the calling process to the local Ra server was
%% an async command or if it never sent a command yet, we first send an
%% arbitrary query to the local Ra server. This is to make sure that
%% previously submitted pipelined commands were processed by that server.
%%
%% For instance, if there was a pipelined command without any correlation
%% ID, it ensures it was forwarded to the leader. Likewise for a
%% synchronous command without the `reply_from => local' option.
%%
%% We can't have this guaranty for pipelined commands with a correlation
%% because the caller is responsible for receiving the rejection from the
%% follower and handle the redirect to the leader.
case can_skip_fence_preliminary_query(StoreId) of
true ->
add_applied_condition2(StoreId, Options, Timeout);
false ->
T0 = khepri_utils:start_timeout_window(Timeout),
QueryFun = fun erlang:is_tuple/1,
case process_query1(StoreId, QueryFun, Timeout) of
true ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition2(StoreId, Options, NewTimeout);
Other when Other =/= false ->
Other
end
end.
add_applied_condition2(StoreId, Options, Timeout) ->
%% After the previous local query or sync command if there was one, there
%% is a great chance that the leader was cached, though not 100%
%% guarantied.
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
add_applied_condition3(StoreId, Options, LeaderId, Timeout);
undefined ->
%% If the leader is unknown, executing a preliminary query should
%% tell us who the leader is.
ask_fence_preliminary_query(StoreId),
add_applied_condition1(StoreId, Options, Timeout)
end.
add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
%% We query the leader to know the last index it committed in which term.
%%
%% We pay attention to its state because a map is still returned even if
%% the Ra server is stopped.
T0 = khepri_utils:start_timeout_window(Timeout),
try ra:key_metrics(LeaderId, Timeout) of
#{last_index := LastIndex, term := Term, state := State}
when State =/= noproc andalso State =/= unknown ->
NewTimeout1 = khepri_utils:end_timeout_window(Timeout, T0),
%% Now that we know the last committed index of the leader, we can
%% perform an arbitrary query on the local server. The query will
%% wait for that same index to be applied locally before it is
%% executed.
%%
%% We don't care about the result of that query. We just want to
%% block until the latest commands are applied locally.
Condition = {applied, {LastIndex, Term}},
Options1 = Options#{condition => Condition,
timeout => NewTimeout1},
{ok, Options1};
_ ->
timer:sleep(200),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout)
catch
error:{erpc, timeout} ->
{error, timeout};
error:{erpc, noconnection} ->
timer:sleep(200),
NewTimeout2 = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout2)
end.
-spec get_timeout(Options) -> Timeout when
Options :: khepri:command_options() | khepri:query_options(),
Timeout :: timeout().
%% @private
get_timeout(#{timeout := Timeout}) -> Timeout;
get_timeout(_) -> khepri_app:get_default_timeout().
-spec clear_cache(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Clears the cached data for the given `StoreId'.
%%
%% @private
clear_cache(_StoreId) ->
ok.
-define(CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
{?MODULE, can_skip_fence_preliminary_query, StoreId}).
-spec sending_sync_command_locally(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that a synchronous command is about to be sent locally.
%%
%% After that, we know we don't need a fence preliminary query.
sending_sync_command_locally(StoreId) ->
Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
_ = erlang:put(Key, true),
ok.
-spec sending_query_locally(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that a query is about to be executed locally.
%%
%% After that, we know we don't need a fence preliminary query.
sending_query_locally(StoreId) ->
%% Same behavior as a local sync command.
sending_sync_command_locally(StoreId).
-spec sending_async_command_locally(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that an asynchronous command is about to be sent locally.
sending_async_command_locally(StoreId) ->
Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
_ = erlang:erase(Key),
ok.
-spec sending_command_remotely(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that a command is about to be sent to a remote store.
sending_command_remotely(StoreId) ->
%% Same behavior as a local async command.
sending_async_command_locally(StoreId).
-spec ask_fence_preliminary_query(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Explicitly requests that a call to {@link
%% can_skip_fence_preliminary_query/1} returns `true'.
ask_fence_preliminary_query(StoreId) ->
%% Same behavior as a local async command.
sending_async_command_locally(StoreId).
-spec can_skip_fence_preliminary_query(StoreId) -> LastMsgWasSync when
StoreId :: khepri:store_id(),
LastMsgWasSync :: boolean().
%% @doc Indicates if the calling process sent a synchronous command or a query
%% before this call.
%%
%% @returns `true' if the calling process sent a synchrorous command or a query
%% to the given store before this call, `false' if the calling process never
%% sent anything to the given store, if the last message was an asynchrorous
%% command, or if the last message was sent to a remote store.
can_skip_fence_preliminary_query(StoreId) ->
Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
erlang:get(Key) =:= true.
%% -------------------------------------------------------------------
%% ra_machine callbacks.
%% -------------------------------------------------------------------
-spec init(Params) -> State when
Params :: machine_init_args(),
State :: state().
%% @private
init(Params) ->
%% Initialize the state.
State = khepri_machine_v0:init(Params),
%% Create initial "schema" if provided.
Commands = maps:get(commands, Params, []),
State3 = lists:foldl(
fun(Command, State1) ->
Meta = #{index => 0,
term => 0,
system_time => 0},
{S, _, _} = apply(Meta, Command, State1),
S
end, State, Commands),
reset_applied_command_count(State3).
-spec init_aux(StoreId :: khepri:store_id()) -> aux_state().
%% @private
init_aux(StoreId) ->
#khepri_machine_aux{store_id = StoreId}.
-spec handle_aux(RaState, Type, Command, AuxState, LogState, MachineState) ->
{no_reply, AuxState, LogState} when
RaState :: ra_server:ra_state(),
Type :: {call, ra:from()} | cast,
Command :: term(),
AuxState :: aux_state(),
LogState :: ra_log:state(),
MachineState :: state().
%% @private
handle_aux(
_RaState, cast,
#trigger_projection{path = Path,
old_props = OldProps,
new_props = NewProps,
projection = Projection},
AuxState, LogState, _MachineState) ->
khepri_projection:trigger(Projection, Path, OldProps, NewProps),
{no_reply, AuxState, LogState};
handle_aux(
_RaState, cast, restore_projections, AuxState, LogState,
State) ->
Tree = get_tree(State),
ProjectionTree = get_projections(State),
khepri_pattern_tree:foreach(
ProjectionTree,
fun(PathPattern, Projections) ->
[restore_projection(Projection, Tree, PathPattern) ||
Projection <- Projections]
end),
{no_reply, AuxState, LogState};
handle_aux(
_RaState, cast,
#restore_projection{projection = Projection, pattern = PathPattern},
AuxState, LogState, State) ->
Tree = get_tree(State),
ok = restore_projection(Projection, Tree, PathPattern),
{no_reply, AuxState, LogState};
handle_aux(_RaState, _Type, _Command, AuxState, LogState, _MachineState) ->
{no_reply, AuxState, LogState}.
restore_projection(Projection, Tree, PathPattern) ->
_ = khepri_projection:init(Projection),
TreeOptions = #{props_to_return => ?PROJECTION_PROPS_TO_RETURN,
include_root_props => true},
case khepri_tree:find_matching_nodes(Tree, PathPattern, TreeOptions) of
{ok, MatchingNodes} ->
maps:foreach(fun(Path, Props) ->
khepri_projection:trigger(
Projection, Path, #{}, Props)
end, MatchingNodes);
Error ->
?LOG_DEBUG(
"Failed to recover projection ~s due to an error: ~p",
[khepri_projection:name(Projection), Error],
#{domain => [khepri, ra_machine]}),
ok
end.
-spec apply(Meta, Command, State) -> {State, Ret, SideEffects} when
Meta :: ra_machine:command_meta_data(),
Command :: command() | old_command(),
State :: state(),
Ret :: any(),
SideEffects :: ra_machine:effects().
%% @private
apply(
Meta,
#put{path = PathPattern, payload = Payload, options = TreeAndPutOptions},
State) ->
{TreeOptions, PutOptions} = split_put_options(TreeAndPutOptions),
Ret = insert_or_update_node(
State, PathPattern, Payload, PutOptions, TreeOptions, []),
post_apply(Ret, Meta);
apply(
Meta,
#delete{path = PathPattern, options = TreeOptions},
State) ->
Ret = delete_matching_nodes(State, PathPattern, TreeOptions, []),
post_apply(Ret, Meta);
apply(
Meta,
#tx{'fun' = StandaloneFun, args = Args},
State) when ?IS_HORUS_FUN(StandaloneFun) ->
Ret = khepri_tx_adv:run(State, StandaloneFun, Args, true),
post_apply(Ret, Meta);
apply(
Meta,
#tx{'fun' = PathPattern, args = Args},
State) when ?IS_KHEPRI_PATH_PATTERN(PathPattern) ->
Ret = locate_sproc_and_execute_tx(State, PathPattern, Args, true),
post_apply(Ret, Meta);
apply(
Meta,
#register_trigger{id = TriggerId,
sproc = StoredProcPath,
event_filter = EventFilter},
State) ->
Triggers = get_triggers(State),
StoredProcPath1 = khepri_path:realpath(StoredProcPath),
EventFilter1 = case EventFilter of
#evf_tree{path = Path} ->
Path1 = khepri_path:realpath(Path),
EventFilter#evf_tree{path = Path1}
end,
Triggers1 = Triggers#{TriggerId => #{sproc => StoredProcPath1,
event_filter => EventFilter1}},
State1 = set_triggers(State, Triggers1),
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(
Meta,
#ack_triggered{triggered = ProcessedTriggers},
State) ->
EmittedTriggers = get_emitted_triggers(State),
EmittedTriggers1 = EmittedTriggers -- ProcessedTriggers,
State1 = set_emitted_triggers(State, EmittedTriggers1),
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(
Meta,
#register_projection{pattern = PathPattern, projection = Projection},
State) ->
ProjectionName = khepri_projection:name(Projection),
ProjectionTree = get_projections(State),
case has_projection(ProjectionTree, ProjectionName) of
true ->
Info = #{name => ProjectionName},
Reason = ?khepri_error(projection_already_exists, Info),
Reply = {error, Reason},
Ret = {State, Reply},
post_apply(Ret, Meta);
false ->
ProjectionTree1 = khepri_pattern_tree:update(
ProjectionTree,
PathPattern,
fun (?NO_PAYLOAD) ->
[Projection];
(Projections) ->
[Projection | Projections]
end),
%% The new projection has been registered so the cached compiled
%% projection tree needs to be erased.
clear_compiled_projection_tree(),
State1 = set_projections(State, ProjectionTree1),
AuxEffect = #restore_projection{projection = Projection,
pattern = PathPattern},
Effects = [{aux, AuxEffect}],
Ret = {State1, ok, Effects},
post_apply(Ret, Meta)
end;
apply(
Meta,
#unregister_projections{names = Names},
State) ->
RemoveProjection = case Names of
all ->
fun(_Name) -> true end;
_ when is_list(Names) ->
Names1 = sets:from_list(Names, [{version, 2}]),
fun(Name) -> sets:is_element(Name, Names1) end
end,
ProjectionTree = get_projections(State),
{ProjectionTree1, RemovedProjectionsMap} =
khepri_pattern_tree:map_fold(
fun(Pattern, Projections, Acc) ->
{RemovedProjections, Projections1} =
lists:partition(
fun(Projection) ->
Name = khepri_projection:name(Projection),
RemoveProjection(Name)
end, Projections),
Acc2 = lists:foldl(
fun(Projection, Acc1) ->
Name = khepri_projection:name(Projection),
_ = khepri_projection:delete(Projection),
maps:put(Name, Pattern, Acc1)
end, Acc, RemovedProjections),
Projections2 = case Projections1 of
[] ->
?NO_PAYLOAD;
_ ->
Projections1
end,
{Projections2, Acc2}
end, #{}, ProjectionTree),
State1 = set_projections(State, ProjectionTree1),
clear_compiled_projection_tree(),
Reply = {ok, RemovedProjectionsMap},
Ret = {State1, Reply},
post_apply(Ret, Meta);
apply(
Meta,
#unregister_projection{name = Name},
State) ->
%% This command was replaced by `#unregister_projections{}'. Therefore,
%% convert it and recurse.
%%
%% For backward-compatibility; see {@link old_command()}.
NewCommand = #unregister_projections{names = [Name]},
apply(Meta, NewCommand, State);
apply(
#{machine_version := MacVer} = Meta,
#dedup{ref = CommandRef, expiry = Expiry, command = Command},
State)
when is_reference(CommandRef) andalso
is_integer(Expiry) andalso
MacVer >= 1 ->
Dedups = get_dedups(State),
case Dedups of
#{CommandRef := {Reply, _Expiry}} ->
Ret = {State, Reply},
post_apply(Ret, Meta);
_ ->
{State1, Reply, SideEffects} = apply(Meta, Command, State),
Dedups1 = Dedups#{CommandRef => {Reply, Expiry}},
State2 = set_dedups(State1, Dedups1),
{State2, Reply, SideEffects}
end;
apply(
#{machine_version := MacVer} = Meta,
#dedup_ack{ref = CommandRef},
State)
when is_reference(CommandRef) andalso
MacVer >= 1 ->
Dedups = get_dedups(State),
State1 = case Dedups of
#{CommandRef := _} ->
Dedups1 = maps:remove(CommandRef, Dedups),
set_dedups(State, Dedups1);
_ ->
State
end,
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) ->
NewState = convert_state(OldState, OldMacVer, NewMacVer),
Ret = {NewState, ok},
post_apply(Ret, Meta);
apply(#{machine_version := MacVer} = Meta, UnknownCommand, State) ->
Error = ?khepri_exception(
unknown_khepri_state_machine_command,
#{command => UnknownCommand,
machine_version => MacVer}),
Reply = {error, Error},
SideEffects = [{mod_call, logger, error,
["Unknown Khepri state machine command with machine "
"version ~b:~n~p",
[MacVer, UnknownCommand],
#{domain => [khepri, ra_machine],
mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY},
file => ?FILE,
line => ?LINE}]}],
Ret = {State, Reply, SideEffects},
post_apply(Ret, Meta).
-spec post_apply(ApplyRet, Meta) -> {State, Result, SideEffects} when
ApplyRet :: {State, Result} | {State, Result, SideEffects},
State :: state(),
Result :: any(),
Meta :: ra_machine:command_meta_data(),
SideEffects :: ra_machine:effects().
%% @private
post_apply({State, Result}, Meta) ->
post_apply({State, Result, []}, Meta);
post_apply({_State, _Result, _SideEffects} = Ret, Meta) ->
Ret1 = bump_applied_command_count(Ret, Meta),
Ret2 = drop_expired_dedups(Ret1, Meta),
Ret2.
-spec bump_applied_command_count(ApplyRet, Meta) ->
{State, Result, SideEffects} when
ApplyRet :: {State, Result, SideEffects},
State :: state(),
Result :: any(),
Meta :: ra_machine:command_meta_data(),
SideEffects :: ra_machine:effects().
%% @private
bump_applied_command_count(
{State, Result, SideEffects},
#{index := RaftIndex}) ->
#config{snapshot_interval = SnapshotInterval} = get_config(State),
Metrics = get_metrics(State),
AppliedCmdCount0 = maps:get(applied_command_count, Metrics, 0),
AppliedCmdCount = AppliedCmdCount0 + 1,
case AppliedCmdCount < SnapshotInterval of
true ->
Metrics1 = Metrics#{applied_command_count => AppliedCmdCount},
State1 = set_metrics(State, Metrics1),
{State1, Result, SideEffects};
false ->
?LOG_DEBUG(
"Move release cursor after ~b commands applied "
"(>= ~b commands)",
[AppliedCmdCount, SnapshotInterval],
#{domain => [khepri, ra_machine]}),
State1 = reset_applied_command_count(State),
ReleaseCursor = {release_cursor, RaftIndex, State1},
SideEffects1 = [ReleaseCursor | SideEffects],
{State1, Result, SideEffects1}
end.
reset_applied_command_count(State) ->
Metrics = get_metrics(State),
Metrics1 = maps:remove(applied_command_count, Metrics),
set_metrics(State, Metrics1).
-spec drop_expired_dedups(ApplyRet, Meta) ->
{State, Result, SideEffects} when
ApplyRet :: {State, Result, SideEffects},
State :: state(),
Result :: any(),
Meta :: ra_machine:command_meta_data(),
SideEffects :: ra_machine:effects().
%% @private
drop_expired_dedups(
{State, Result, SideEffects},
#{system_time := Timestamp}) ->
Dedups = get_dedups(State),
Dedups1 = maps:filter(
fun(_CommandRef, {_Reply, Expiry}) ->
Expiry >= Timestamp
end, Dedups),
State1 = set_dedups(State, Dedups1),
{State1, Result, SideEffects}.
%% @private
state_enter(leader, State) ->
SideEffects1 = emitted_triggers_to_side_effects(State),
SideEffects1;
state_enter(recovered, _State) ->
SideEffect = {aux, restore_projections},
[SideEffect];
state_enter(_StateName, _State) ->
[].
%% @private
snapshot_installed(
#{machine_version := NewMacVer}, NewState,
#{machine_version := OldMacVer}, OldState) ->
%% A snapshot might be installed on a follower member who has fallen
%% sufficiently far behind in replication of the log from the leader. When
%% a member installs a snapshot it needs to update its projections: new
%% projections may have been registered since the snapshot or old ones
%% unregistered. Projections which did not change need to be triggered
%% with the new changes to state, similar to the `restore_projections' aux
%% effect. Also see `update_projections/2'.
%%
%% Note that the snapshot installation might bump the effective machine
%% version so we need to convert the old state to the new machine version.
OldState1 = convert_state(OldState, OldMacVer, NewMacVer),
ok = update_projections(OldState1, NewState),
ok = clear_compiled_projection_tree(),
[].
%% @private
emitted_triggers_to_side_effects(State) ->
#config{store_id = StoreId} = get_config(State),
EmittedTriggers = get_emitted_triggers(State),
case EmittedTriggers of
[_ | _] ->
SideEffect = {mod_call,
khepri_event_handler,
handle_triggered_sprocs,
[StoreId, EmittedTriggers]},
[SideEffect];
[] ->
[]
end.
-spec overview(State) -> Overview when
State :: khepri_machine:state(),
Overview :: #{store_id := StoreId,
tree := NodeTree,
triggers := Triggers,
keep_while_conds := KeepWhileConds},
StoreId :: khepri:store_id(),
NodeTree :: khepri_utils:display_tree(),
Triggers :: khepri_machine:triggers_map(),
KeepWhileConds :: khepri_tree:keep_while_conds_map().
%% @private
overview(State) ->
#config{store_id = StoreId} = get_config(State),
Tree = get_tree(State),
KeepWhileConds = get_keep_while_conds(State),
Triggers = get_triggers(State),
TreeOptions = #{props_to_return => [payload,
payload_version,
child_list_version,
child_list_length],
include_root_props => true},
{ok, NodePropsMap} = khepri_tree:find_matching_nodes(
Tree, [?KHEPRI_WILDCARD_STAR_STAR], TreeOptions),
MapFun = fun
(#{sproc := Sproc} = Props) ->
Props#{sproc => horus:to_fun(Sproc)};
(Props) ->
Props
end,
NodeTree = khepri_utils:flat_struct_to_tree(NodePropsMap, MapFun),
#{store_id => StoreId,
tree => NodeTree,
triggers => Triggers,
keep_while_conds => KeepWhileConds}.
-spec version() -> MacVer when
MacVer :: 1.
%% @doc Returns the state machine version.
version() ->
1.
-spec which_module(MacVer) -> Module when
MacVer :: 1 | 0,
Module :: ?MODULE.
%% @doc Returns the state machine module corresponding to the given version.
which_module(1) -> ?MODULE;
which_module(0) -> ?MODULE.
-spec effective_version(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: khepri:ok(EffectiveMacVer) | khepri:error(),
EffectiveMacVer :: ra_machine:version().
%% @doc Returns the effective state machine version of the local Ra server.
effective_version(StoreId) ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
case ra_counters:counters(RaServer, [effective_machine_version]) of
#{effective_machine_version := EffectiveMacVer} ->
{ok, EffectiveMacVer};
_ ->
case ra:member_overview(RaServer) of
{ok, #{effective_machine_version := EffectiveMacVer}, _} ->
{ok, EffectiveMacVer};
{error, _} = Error ->
Reason = ?khepri_error(
effective_machine_version_not_defined,
#{store_id => StoreId,
ra_server => RaServer,
error => Error}),
{error, Reason}
end
end.
%% -------------------------------------------------------------------
%% Internal functions.
%% -------------------------------------------------------------------
locate_sproc_and_execute_tx(State, PathPattern, Args, AllowUpdates) ->
Tree = get_tree(State),
TreeOptions = #{expect_specific_node => true,
props_to_return => [raw_payload]},
{StandaloneFun, Args1} =
case khepri_tree:find_matching_nodes(Tree, PathPattern, TreeOptions) of
{ok, Result} ->
case maps:values(Result) of
[#{raw_payload := #p_sproc{
sproc = StoredProc,
is_valid_as_tx_fun = ReadWrite}}]
when AllowUpdates andalso
(ReadWrite =:= ro orelse ReadWrite =:= rw) ->
{StoredProc, Args};
[#{raw_payload := #p_sproc{
sproc = StoredProc,
is_valid_as_tx_fun = ro}}]
when not AllowUpdates ->
{StoredProc, Args};
[#{raw_payload := #p_sproc{}}] ->
Reason = ?khepri_error(
sproc_invalid_as_tx_fun,
#{path => PathPattern}),
{fun failed_to_locate_sproc/1, [Reason]};
_ ->
Reason = ?khepri_error(
no_sproc_at_given_path,
#{path => PathPattern}),
{fun failed_to_locate_sproc/1, [Reason]}
end;
{error, Reason} ->
{fun failed_to_locate_sproc/1, [Reason]}
end,
khepri_tx_adv:run(State, StandaloneFun, Args1, AllowUpdates).
-spec failed_to_locate_sproc(Reason) -> no_return() when
Reason :: any().
%% @private
failed_to_locate_sproc(Reason) ->
khepri_tx:abort(Reason).
-spec insert_or_update_node(
State, PathPattern, Payload, PutOptions, TreeOptions, SideEffects) ->
Ret when
State :: state(),
PathPattern :: khepri_path:native_pattern(),
Payload :: khepri_payload:payload(),
PutOptions :: khepri:put_options(),
TreeOptions :: khepri:tree_options(),
SideEffects :: ra_machine:effects(),
Ret :: {State, Result, ra_machine:effects()},
Result :: khepri_machine:common_ret().
%% @private
insert_or_update_node(
State, PathPattern, Payload, PutOptions, TreeOptions, SideEffects) ->
Tree = get_tree(State),
Ret1 = khepri_tree:insert_or_update_node(
Tree, PathPattern, Payload, PutOptions, TreeOptions),
case Ret1 of
{ok, Tree1, AppliedChanges, Ret2} ->
State1 = set_tree(State, Tree1),
{State2, SideEffects1} = add_tree_change_side_effects(
State, State1, Ret2, AppliedChanges,
SideEffects),
{State2, {ok, Ret2}, SideEffects1};
Error ->
{State, Error, SideEffects}
end.
-spec delete_matching_nodes(State, PathPattern, TreeOptions, SideEffects) ->
Ret when
State :: state(),
PathPattern :: khepri_path:native_pattern(),
TreeOptions :: khepri:tree_options(),
SideEffects :: ra_machine:effects(),
Ret :: {State, Result, ra_machine:effects()},
Result :: khepri_machine:common_ret().
%% @private
delete_matching_nodes(State, PathPattern, TreeOptions, SideEffects) ->
Tree = get_tree(State),
Ret = khepri_tree:delete_matching_nodes(
Tree, PathPattern, #{}, TreeOptions),
case Ret of
{ok, Tree1, AppliedChanges, Ret2} ->
State1 = set_tree(State, Tree1),
{State2, SideEffects1} = add_tree_change_side_effects(
State, State1, Ret2, AppliedChanges,
SideEffects),
{State2, {ok, Ret2}, SideEffects1};
Error ->
{State, Error, SideEffects}
end.
add_tree_change_side_effects(
InitialState, NewState, Ret, KeepWhileAftermath, SideEffects) ->
%% We make a map where for each affected tree node, we indicate the type
%% of change.
Changes0 = maps:merge(Ret, KeepWhileAftermath),
Changes = maps:map(
fun
(_, NodeProps) when NodeProps =:= #{} -> create;
(_, #{} = _NodeProps) -> update;
(_, delete) -> delete
end, Changes0),
NewSideEffects = create_projection_side_effects(
InitialState, NewState, Changes),
{NewState1, NewSideEffects1} = add_trigger_side_effects(
InitialState, NewState, Changes,
NewSideEffects),
SideEffects1 = lists:reverse(NewSideEffects1, SideEffects),
{NewState1, SideEffects1}.
create_projection_side_effects(InitialState, NewState, Changes) ->
InitialTree = get_tree(InitialState),
NewTree = get_tree(NewState),
ProjectionTree0 = get_projections(NewState),
ProjectionTree = get_compiled_projection_tree(ProjectionTree0),
maps:fold(
fun(Path, Change, Effects) ->
create_projection_side_effects1(
InitialTree, NewTree, ProjectionTree, Path, Change, Effects)
end, [], Changes).
create_projection_side_effects1(
InitialTree, NewTree, ProjectionTree, Path, delete = Change, Effects) ->
%% Deletion changes recursively delete the subtree below the deleted tree
%% node. Find any children in the tree that were also deleted by this
%% change and trigger any necessary projections for those children.
ChildrenFindOptions = #{props_to_return => ?PROJECTION_PROPS_TO_RETURN,
expect_specific_node => false},
ChildrenPattern = Path ++ [?KHEPRI_WILDCARD_STAR_STAR],
EffectsForChildrenFun =
fun(ChildPath, _NodeProps, EffectAcc) ->
create_projection_side_effects2(
InitialTree, NewTree, ProjectionTree,
ChildPath, Change, EffectAcc)
end,
{ok, Effects1} = khepri_tree:fold(
InitialTree, ChildrenPattern,
EffectsForChildrenFun, Effects,
ChildrenFindOptions),
%% Also trigger a change for the deleted path itself.
create_projection_side_effects2(
InitialTree, NewTree, ProjectionTree, Path, Change, Effects1);
create_projection_side_effects1(
InitialTree, NewTree, ProjectionTree, Path, Change, Effects) ->
create_projection_side_effects2(
InitialTree, NewTree, ProjectionTree, Path, Change, Effects).
create_projection_side_effects2(
InitialTree, NewTree, ProjectionTree, Path, Change, Effects) ->
PatternMatchingTree = case Change of
create ->
NewTree;
update ->
NewTree;
delete ->
InitialTree
end,
khepri_pattern_tree:fold_matching(
ProjectionTree,
PatternMatchingTree,
Path,
fun(_PathPattern, Projections, Effects1) ->
lists:foldl(
fun(Projection, Effects2) ->
evaluate_projection(
InitialTree, NewTree, Path, Projection, Effects2)
end, Effects1, Projections)
end,
Effects).
-spec evaluate_projection(InitialTree, NewTree, Path, Projection, Effects) ->
Ret when
InitialTree :: khepri_tree:tree(),
NewTree :: khepri_tree:tree(),
Path :: khepri_path:native_path(),
Projection :: khepri_projection:projection(),
Effects :: ra_machine:effects(),
Ret :: ra_machine:effects().
%% @private
evaluate_projection(
InitialTree, NewTree, Path, Projection, Effects) ->
FindOptions = #{props_to_return => ?PROJECTION_PROPS_TO_RETURN,
expect_specific_node => true},
InitialRet = khepri_tree:find_matching_nodes(
InitialTree, Path, FindOptions),
InitialProps = case InitialRet of
{ok, #{Path := InitialProps0}} ->
InitialProps0;
_ ->
#{}
end,
NewRet = khepri_tree:find_matching_nodes(
NewTree, Path, FindOptions),
NewProps = case NewRet of
{ok, #{Path := NewProps0}} ->
NewProps0;
_ ->
#{}
end,
Trigger = #trigger_projection{path = Path,
old_props = InitialProps,
new_props = NewProps,
projection = Projection},
Effect = {aux, Trigger},
[Effect | Effects].
add_trigger_side_effects(InitialState, NewState, Changes, SideEffects) ->
%% We want to consider the new state (with the updated tree), but we want
%% to use triggers from the initial state, in case they were updated too.
%% In other words, we want to evaluate triggers in the state they were at
%% the time the change to the tree was requested.
Triggers = get_triggers(InitialState),
case Triggers =:= #{} of
true ->
{NewState, SideEffects};
false ->
EmittedTriggers = get_emitted_triggers(InitialState),
#config{store_id = StoreId} = get_config(NewState),
Tree = get_tree(NewState),
TriggeredStoredProcs = list_triggered_sprocs(
Tree, Changes, Triggers),
%% We record the list of triggered stored procedures in the state
%% machine's state. This is used to guaranty at-least-once
%% execution of the trigger: the event handler process is supposed
%% to ack when it executed the triggered stored procedure. If the
%% Ra cluster changes leader in between, we know that we need to
%% retry the execution.
%%
%% This could lead to multiple execution of the same trigger,
%% therefore the stored procedure must be idempotent.
NewState1 = set_emitted_triggers(
NewState, EmittedTriggers ++ TriggeredStoredProcs),
%% We still emit a `mod_call' effect to wake up the event handler
%% process so it doesn't have to poll the internal list.
SideEffect = {mod_call,
khepri_event_handler,
handle_triggered_sprocs,
[StoreId, TriggeredStoredProcs]},
{NewState1, [SideEffect | SideEffects]}
end.
list_triggered_sprocs(Tree, Changes, Triggers) ->
TriggeredStoredProcs =
maps:fold(
fun(Path, Change, TSP) ->
% For each change, we evaluate each trigger.
maps:fold(
fun(TriggerId, TriggerProps, TSP1) ->
evaluate_trigger(
Tree, Path, Change, TriggerId, TriggerProps, TSP1)
end, TSP, Triggers)
end, [], Changes),
sort_triggered_sprocs(TriggeredStoredProcs).
evaluate_trigger(
Tree, Path, Change, TriggerId,
#{sproc := StoredProcPath,
event_filter := #evf_tree{path = PathPattern,
props = EventFilterProps} = EventFilter},
TriggeredStoredProcs) ->
%% For each trigger based on a tree event:
%% 1. we verify the path of the changed tree node matches the monitored
%% path pattern in the event filter.
%% 2. we verify the type of change matches the change filter in the
%% event filter.
PathMatches = khepri_tree:does_path_match(Path, PathPattern, Tree),
DefaultWatchedChanges = [create, update, delete],
WatchedChanges = case EventFilterProps of
#{on_actions := []} ->
DefaultWatchedChanges;
#{on_actions := OnActions}
when is_list(OnActions) ->
OnActions;
_ ->
DefaultWatchedChanges
end,
ChangeMatches = lists:member(Change, WatchedChanges),
case PathMatches andalso ChangeMatches of
true ->
%% We then locate the stored procedure. If the path doesn't point
%% to an existing tree node, or if this tree node is not a stored
%% procedure, the trigger is ignored.
%%
%% TODO: Should we return an error or at least log something? This
%% could be considered noise if the trigger exists regardless of
%% the presence of the stored procedure on purpose (for instance
%% the caller code is being updated).
case find_stored_proc(Tree, StoredProcPath) of
undefined ->
TriggeredStoredProcs;
StoredProc ->
%% TODO: Use a record to format
%% stored procedure arguments?
EventProps = #{path => Path,
on_action => Change},
Triggered = #triggered{
id = TriggerId,
event_filter = EventFilter,
sproc = StoredProc,
props = EventProps},
[Triggered | TriggeredStoredProcs]
end;
false ->
TriggeredStoredProcs
end;
evaluate_trigger(
_Root, _Path, _Change, _TriggerId, _TriggerProps, TriggeredStoredProcs) ->
TriggeredStoredProcs.
find_stored_proc(Tree, StoredProcPath) ->
TreeOptions = #{expect_specific_node => true,
props_to_return => [payload,
payload_version,
child_list_version,
child_list_length]},
Ret = khepri_tree:find_matching_nodes(
Tree, StoredProcPath, TreeOptions),
%% Non-existing nodes and nodes which are not stored procedures are
%% ignored.
case Ret of
{ok, #{StoredProcPath := #{sproc := StoredProc}}} -> StoredProc;
_ -> undefined
end.
sort_triggered_sprocs(TriggeredStoredProcs) ->
%% We first sort by priority, then by triggered ID if priorities are equal.
%% The priority can be any integer (even negative integers). The default
%% priority is 0.
%%
%% A higher priority (a greater integer) means that the triggered stored
%% procedure will be executed before another one with lower priority
%% (smaller integer).
%%
%% If the priorities are equal, a trigger with an ID earlier in
%% alphabetical order will be executed before another one with an ID later
%% in alphabetical order.
lists:sort(
fun(#triggered{id = IdA, event_filter = EventFilterA},
#triggered{id = IdB, event_filter = EventFilterB}) ->
PrioA = khepri_evf:get_priority(EventFilterA),
PrioB = khepri_evf:get_priority(EventFilterB),
if
PrioA =:= PrioB -> IdA =< IdB;
true -> PrioA > PrioB
end
end,
TriggeredStoredProcs).
-spec get_compiled_projection_tree(ProjectionTree) -> CompiledProjectionTree
when
ProjectionTree :: khepri_machine:projection_tree(),
CompiledProjectionTree :: khepri_machine:projection_tree().
%% @doc Gets the compiled version of the projection pattern tree.
%%
%% The pattern tree for projections must be compiled before it can be queried
%% into for changes to the store via {@link khepri_pattern_tree:fold/5}.
%%
%% This compiled pattern tree is cached in the process dictionary for the Ra
%% server process. If the cached value does not exist, `SourceProjectionTree'
%% is compiled with {@link khepri_pattern_tree:compile/1} and stored for
%% future lookups.
%%
%% @private
get_compiled_projection_tree(SourceProjectionTree) ->
case get(compiled_projection_tree) of
undefined ->
CompiledProjectionTree = khepri_pattern_tree:compile(
SourceProjectionTree),
put(compiled_projection_tree, CompiledProjectionTree),
CompiledProjectionTree;
CompiledProjectionTree ->
CompiledProjectionTree
end.
-spec clear_compiled_projection_tree() -> ok.
%% @doc Erases the cached projection tree.
%%
%% This function should be called whenever the projection tree is changed:
%% whenever a projection is registered or unregistered.
%%
%% @see get_compiled_projection_tree/1.
%%
%% @private
clear_compiled_projection_tree() ->
erase(compiled_projection_tree),
ok.
%% -------------------------------------------------------------------
%% State record management functions.
%% -------------------------------------------------------------------
-spec is_state(State) -> IsState when
State :: khepri_machine:state(),
IsState :: boolean().
%% @doc Tells if the given argument is a valid state.
%%
%% @private
is_state(State) ->
is_record(State, khepri_machine) orelse khepri_machine_v0:is_state(State).
-spec ensure_is_state(State) -> ok when
State :: khepri_machine:state().
%% @doc Throws an exception if the given argument is not a valid state.
%%
%% @private
ensure_is_state(State) ->
?assert(is_state(State)),
ok.
-spec get_config(State) -> Config when
State :: khepri_machine:state(),
Config :: khepri_machine:machine_config().
%% @doc Returns the config from the given state.
%%
%% @private
get_config(#khepri_machine{config = Config}) ->
Config;
get_config(State) ->
khepri_machine_v0:get_config(State).
-spec get_tree(State) -> Tree when
State :: khepri_machine:state(),
Tree :: khepri_tree:tree().
%% @doc Returns the tree from the given state.
%%
%% @private
get_tree(#khepri_machine{tree = Tree}) ->
Tree;
get_tree(State) ->
khepri_machine_v0:get_tree(State).
-spec set_tree(State, Tree) -> NewState when
State :: khepri_machine:state(),
Tree :: khepri_tree:tree(),
NewState :: khepri_machine:state().
%% @doc Sets the tree in the given state.
%%
%% @private
set_tree(#khepri_machine{} = State, Tree) ->
State#khepri_machine{tree = Tree};
set_tree(State, Tree) ->
khepri_machine_v0:set_tree(State, Tree).
-spec get_root(State) -> Root when
State :: khepri_machine:state(),
Root :: khepri_tree:tree_node().
%% @doc Returns the root of the tree from the given state.
%%
%% @private
get_root(State) ->
#tree{root = Root} = get_tree(State),
Root.
-spec get_keep_while_conds(State) -> KeepWhileConds when
State :: khepri_machine:state(),
KeepWhileConds :: khepri_tree:keep_while_conds_map().
%% @doc Returns the `keep_while' conditions in the tree from the given state.
%%
%% @private
get_keep_while_conds(State) ->
#tree{keep_while_conds = KeepWhileConds} = get_tree(State),
KeepWhileConds.
-spec get_keep_while_conds_revidx(State) -> KeepWhileCondsRevIdx when
State :: khepri_machine:state(),
KeepWhileCondsRevIdx :: khepri_tree:keep_while_conds_revidx().
%% @doc Returns the `keep_while' conditions reverse index in the tree from the
%% given state.
%%
%% @private
get_keep_while_conds_revidx(State) ->
#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx} = get_tree(State),
KeepWhileCondsRevIdx.
-spec get_triggers(State) -> Triggers when
State :: khepri_machine:state(),
Triggers :: khepri_machine:triggers_map().
%% @doc Returns the triggers from the given state.
%%
%% @private
get_triggers(#khepri_machine{triggers = Triggers}) ->
Triggers;
get_triggers(State) ->
khepri_machine_v0:get_triggers(State).
-spec set_triggers(State, Triggers) -> NewState when
State :: khepri_machine:state(),
Triggers :: khepri_machine:triggers_map(),
NewState :: khepri_machine:state().
%% @doc Sets the triggers in the given state.
%%
%% @private
set_triggers(#khepri_machine{} = State, Triggers) ->
State#khepri_machine{triggers = Triggers};
set_triggers(State, Triggers) ->
khepri_machine_v0:set_triggers(State, Triggers).
-spec get_emitted_triggers(State) -> EmittedTriggers when
State :: khepri_machine:state(),
EmittedTriggers :: [khepri_machine:triggered()].
%% @doc Returns the emitted_triggers from the given state.
%%
%% @private
get_emitted_triggers(#khepri_machine{emitted_triggers = EmittedTriggers}) ->
EmittedTriggers;
get_emitted_triggers(State) ->
khepri_machine_v0:get_emitted_triggers(State).
-spec set_emitted_triggers(State, EmittedTriggers) -> NewState when
State :: khepri_machine:state(),
EmittedTriggers :: [khepri_machine:triggered()],
NewState :: khepri_machine:state().
%% @doc Sets the emitted_triggers in the given state.
%%
%% @private
set_emitted_triggers(#khepri_machine{} = State, EmittedTriggers) ->
State#khepri_machine{emitted_triggers = EmittedTriggers};
set_emitted_triggers(State, EmittedTriggers) ->
khepri_machine_v0:set_emitted_triggers(State, EmittedTriggers).
-spec get_projections(State) -> Projections when
State :: khepri_machine:state(),
Projections :: khepri_machine:projection_tree().
%% @doc Returns the projections from the given state.
%%
%% @private
get_projections(#khepri_machine{projections = Projections}) ->
Projections;
get_projections(State) ->
khepri_machine_v0:get_projections(State).
-spec has_projection(ProjectionTree, ProjectionName) -> boolean() when
ProjectionTree :: khepri_machine:projection_tree(),
ProjectionName :: khepri_projection:name().
%% @doc Determines if the given projection tree contains a projection.
%%
%% Two projections are considered equal if they have the same name.
%%
%% @private
has_projection(ProjectionTree, Name) when is_atom(Name) ->
khepri_pattern_tree:any(
ProjectionTree,
fun(Projections) ->
lists:any(
fun(#khepri_projection{name = N}) ->
N =:= Name
end, Projections)
end).
-spec set_projections(State, Projections) -> NewState when
State :: khepri_machine:state(),
Projections :: khepri_machine:projection_tree(),
NewState :: khepri_machine:state().
%% @doc Sets the projections in the given state.
%%
%% @private
set_projections(#khepri_machine{} = State, Projections) ->
State#khepri_machine{projections = Projections};
set_projections(State, Projections) ->
khepri_machine_v0:set_projections(State, Projections).
-spec get_metrics(State) -> Metrics when
State :: khepri_machine:state(),
Metrics :: khepri_machine:metrics().
%% @doc Returns the metrics from the given state.
%%
%% @private
get_metrics(#khepri_machine{metrics = Metrics}) ->
Metrics;
get_metrics(State) ->
khepri_machine_v0:get_metrics(State).
-spec set_metrics(State, Metrics) -> NewState when
State :: khepri_machine:state(),
Metrics :: khepri_machine:metrics(),
NewState :: khepri_machine:state().
%% @doc Sets the metrics in the given state.
%%
%% @private
set_metrics(#khepri_machine{} = State, Metrics) ->
State#khepri_machine{metrics = Metrics};
set_metrics(State, Metrics) ->
khepri_machine_v0:set_metrics(State, Metrics).
-spec get_dedups(State) -> Dedups when
State :: khepri_machine:state(),
Dedups :: khepri_machine:dedups_map().
%% @doc Returns the dedups from the given state.
%%
%% @private
get_dedups(#khepri_machine{dedups = Dedups}) ->
Dedups;
get_dedups(_State) ->
#{}.
-spec set_dedups(State, Dedups) -> NewState when
State :: khepri_machine:state(),
Dedups :: khepri_machine:dedups_map(),
NewState :: khepri_machine:state().
%% @doc Sets the dedups in the given state.
%%
%% @private
set_dedups(#khepri_machine{} = State, Dedups) ->
State#khepri_machine{dedups = Dedups};
set_dedups(State, _Dedups) ->
State.
-ifdef(TEST).
-spec make_virgin_state(Params) -> State when
Params :: khepri_machine:machine_init_args(),
State :: khepri_machine_v0:state().
make_virgin_state(Params) ->
khepri_machine_v0:init(Params).
-endif.
-spec convert_state(OldState, OldMacVer, NewMacVer) -> NewState when
OldState :: khepri_machine_v0:state(),
OldMacVer :: ra_machine:version(),
NewMacVer :: ra_machine:version(),
NewState :: khepri_machine:state().
%% @doc Converts a state to a newer version.
%%
%% @private
convert_state(State, MacVer, MacVer) ->
State;
convert_state(State, 0, 1) ->
%% To go from version 0 to version 1, we add the `dedups' fields at the
%% end of the record. The default value is an empty map.
?assert(khepri_machine_v0:is_state(State)),
Fields0 = khepri_machine_v0:state_to_list(State),
Fields1 = Fields0 ++ [#{}],
State1 = list_to_tuple(Fields1),
?assert(is_state(State1)),
State1.
-spec update_projections(OldState, NewState) -> ok when
OldState :: khepri_machine:state(),
NewState :: khepri_machine:state().
%% @doc Updates the machine's projections to account for changes between two
%% states.
%%
%% This is used when installing a projection - the state will jump from the
%% given `OldState' before the snapshot was installed to the given `NewState'
%% after. When we swap states we need to update the projections: the records
%% in the projection tables themselves but also which projection tables exist.
%% The changes glossed over by the snapshot may include projection
%% registrations and unregistrations so we need to initialize new projections
%% and delete unregistered ones, and we need to ensure that the projection
%% tables are up to date for any projections which didn't change.
%%
%% @private
update_projections(OldState, NewState) ->
OldTree = get_tree(OldState),
OldProjections = set_of_projections(get_projections(OldState)),
NewTree = get_tree(NewState),
NewProjections = set_of_projections(get_projections(NewState)),
CommonProjections = sets:intersection(OldProjections, NewProjections),
DeletedProjections = sets:subtract(OldProjections, CommonProjections),
CreatedProjections = sets:subtract(NewProjections, CommonProjections),
%% Tear down any projections which were unregistered.
sets:fold(
fun({_Pattern, Projection}, _Acc) ->
_ = khepri_projection:delete(Projection),
ok
end, ok, DeletedProjections),
%% Initialize any new projections which were registered.
sets:fold(
fun({Pattern, Projection}, _Acc) ->
ok = restore_projection(Projection, NewTree, Pattern)
end, ok, CreatedProjections),
%% Update in-place any projections which were not changed themselves (i.e.
%% the projection name, function and pattern) between old and new states.
%% To do this we will find the matching nodes in the old and new tree for
%% the projection's pattern and trigger the projection based on each
%% matching path's old and new properties.
sets:fold(
fun({Pattern, Projection}, _Acc) ->
ok = update_projection(Pattern, Projection, OldTree, NewTree)
end, ok, CommonProjections),
ok.
-spec set_of_projections(ProjectionTree) -> Projections when
ProjectionTree :: khepri_machine:projection_tree(),
Element :: {khepri_path:native_pattern(),
khepri_projection:projection()},
Projections :: sets:set(Element).
%% Folds the set of projections in a projection tree into a version 2 {@link
%% sets:set()}.
%%
%% @private
set_of_projections(ProjectionTree) ->
khepri_pattern_tree:fold(
ProjectionTree,
fun(Pattern, Projections, Acc) ->
lists:foldl(
fun(Projection, Acc1) ->
Entry = {Pattern, Projection},
sets:add_element(Entry, Acc1)
end, Acc, Projections)
end, sets:new([{version, 2}])).
update_projection(Pattern, Projection, OldTree, NewTree) ->
TreeOptions = #{props_to_return => ?PROJECTION_PROPS_TO_RETURN,
include_root_props => true},
case khepri_tree:find_matching_nodes(OldTree, Pattern, TreeOptions) of
{ok, OldMatchingNodes} ->
Result = khepri_tree:find_matching_nodes(
NewTree, Pattern, TreeOptions),
case Result of
{ok, NewMatchingNodes} ->
Updates = diff_matching_nodes(
OldMatchingNodes, NewMatchingNodes),
maps:foreach(
fun(Path, {OldProps, NewProps}) ->
khepri_projection:trigger(
Projection, Path, OldProps, NewProps)
end, Updates);
Error ->
?LOG_DEBUG(
"Failed to refresh projection ~s due to an error "
"finding matching nodes in the new tree: ~p",
[khepri_projection:name(Projection), Error],
#{domain => [khepri, ra_machine]})
end;
Error ->
?LOG_DEBUG(
"Failed to refresh projection ~s due to an error finding "
"matching nodes in the old tree: ~p",
[khepri_projection:name(Projection), Error],
#{domain => [khepri, ra_machine]})
end.
-spec diff_matching_nodes(OldNodeProps, NewNodeProps) -> Changes when
OldNodeProps :: khepri_adv:node_props_map(),
NewNodeProps :: khepri_adv:node_props_map(),
OldProps :: khepri:node_props(),
NewProps :: khepri:node_props(),
Changes :: #{khepri_path:native_path() => {OldProps, NewProps}}.
%% @private
diff_matching_nodes(OldNodeProps, NewNodeProps) ->
CommonProps = maps:intersect_with(
fun(_Path, OldProps, NewProps) -> {OldProps, NewProps} end,
OldNodeProps, NewNodeProps),
CommonPaths = maps:keys(CommonProps),
AllProps = maps:fold(
fun(Path, OldProps, Acc) ->
Acc#{Path => {OldProps, #{}}}
end, CommonProps, maps:without(CommonPaths, OldNodeProps)),
maps:fold(
fun(Path, NewProps, Acc) ->
Acc#{Path => {#{}, NewProps}}
end, AllProps, maps:without(CommonPaths, NewNodeProps)).