src/khepri_machine.erl

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright © 2021-2022 VMware, Inc. or its affiliates.  All rights reserved.
%%

%% @doc
%% Khepri private low-level API.
%%
%% This module exposes the private "low-level" API to the Khepri database and
%% state machine. Main functions correspond to Ra commands implemented by the
%% state machine. All functions in {@link khepri} are built on top of this
%% module.
%%
%% This module is private. The documentation is still visible because it may
%% help understand some implementation details. However, this module should
%% never be called directly outside of Khepri.

-module(khepri_machine).
-behaviour(ra_machine).

-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/assert.hrl").

-include("include/khepri.hrl").
-include("src/khepri_cluster.hrl").
-include("src/khepri_error.hrl").
-include("src/khepri_evf.hrl").
-include("src/khepri_fun.hrl").
-include("src/khepri_machine.hrl").
-include("src/khepri_ret.hrl").
-include("src/khepri_tx.hrl").
-include("src/khepri_projection.hrl").

-export([fold/5,
         put/4,
         delete/3,
         transaction/5,
         register_trigger/5,
         register_projection/4]).
-export([get_keep_while_conds_state/2,
         get_projections_state/2]).

%% ra_machine callbacks.
-export([init/1,
         init_aux/1,
         handle_aux/6,
         apply/3,
         state_enter/2,
         overview/1]).

%% For internal use only.
-export([clear_cache/1,
         ack_triggers_execution/2,
         split_query_options/1,
         split_command_options/1,
         split_put_options/1,
         find_matching_nodes/3,
         find_matching_nodes/5,
         collect_node_props_cb/3,
         count_node_cb/3,
         insert_or_update_node/5,
         delete_matching_nodes/3,
         handle_tx_exception/1,
         process_query/3,
         process_command/3,
         walk_down_the_tree/6]).

-ifdef(TEST).
-export([are_keep_while_conditions_met/2,
         get_root/1,
         get_keep_while_conds/1,
         get_keep_while_conds_revidx/1,
         get_last_consistent_call_atomics/1]).
-endif.

-compile({no_auto_import, [apply/3]}).

-type tree_node() :: #node{}.
%% A node in the tree structure.

-type props() :: #{payload_version := khepri:payload_version(),
                   child_list_version := khepri:child_list_version()}.
%% Properties attached to each node in the tree structure.

-type triggered() :: #triggered{}.
%% Record describing a triggered stored procedure; a list of them is passed
%% to {@link ack_triggers_execution/2}.

-type command() :: #put{} |
                   #delete{} |
                   #tx{} |
                   #register_trigger{} |
                   #ack_triggered{} |
                   #register_projection{}.
%% Commands specific to this Ra machine.

-type machine_init_args() :: #{store_id := khepri:store_id(),
                               member := ra:server_id(),
                               snapshot_interval => non_neg_integer(),
                               commands => [command()],
                               atom() => any()}.
%% Structure passed to {@link init/1}.

-type machine_config() :: #config{}.
%% Configuration record, holding read-only or rarely changing fields.

-type keep_while_conds_map() :: #{khepri_path:native_path() =>
                                  khepri_condition:native_keep_while()}.
%% Per-node `keep_while' conditions.

-type keep_while_conds_revidx() :: #{khepri_path:native_path() =>
                                     #{khepri_path:native_path() => ok}}.
%% Internal reverse index of the keep_while conditions. If node A depends on a
%% condition on node B, then this reverse index will have a "node B => node A"
%% entry.

-type applied_changes() :: #{khepri_path:native_path() =>
                             khepri:node_props() | delete}.
%% Internal index of the per-node changes which happened during a traversal.
%% This is used when the tree is walked back up to determine the list of tree
%% nodes to remove after some keep_while condition evaluates to false.

-type projections_map() :: #{khepri_projection:projection() =>
                             khepri_path:native_pattern()}.
%% Internal mapping between {@link khepri_projection:projection()} records and
%% the native path patterns which trigger updates to each projection.

-type state() :: #?MODULE{}.
%% State of this Ra state machine.

-type aux_state() :: #khepri_machine_aux{}.
%% Auxiliary state of this Ra state machine.

-type query_fun() :: fun((state()) -> any()).
%% Function representing a query and used by {@link process_query/3}.

-type walk_down_the_tree_extra() :: #{keep_while_conds =>
                                      keep_while_conds_map(),
                                      keep_while_conds_revidx =>
                                      keep_while_conds_revidx(),
                                      applied_changes =>
                                      applied_changes()}.
%% Map of optional indexes (all keys are optional). NOTE(review): presumably
%% the extra state threaded through {@link walk_down_the_tree/6} — confirm.

-type walk_down_the_tree_fun() ::
    fun((khepri_path:native_path(),
         tree_node() | {interrupted, any(), map()},
         Acc :: any()) ->
        ok(tree_node() | keep | delete, any()) |
        khepri:error()).
%% Function called to handle a node found (or an error) and used in {@link
%% walk_down_the_tree/6}.

-type ok(Type1, Type2) :: {ok, Type1, Type2}.
-type ok(Type1, Type2, Type3) :: {ok, Type1, Type2, Type3}.
%% Success tuples carrying two or three values after the `ok' tag.

-type common_ret() :: khepri:ok(khepri_adv:node_props_map()) |
                      khepri:error().
%% Return value of synchronous {@link put/4} and {@link delete/3} calls.

-type tx_ret() :: khepri:ok(khepri_tx:tx_fun_result()) |
                  khepri_tx:tx_abort() |
                  no_return().
%% Return value of a synchronous transaction. `no_return()' accounts for
%% exceptions re-raised in the caller by {@link handle_tx_exception/1}.

-type async_ret() :: ok.
%% Return value of any command submitted asynchronously.

-export_type([common_ret/0,
              tx_ret/0,
              async_ret/0,

              state/0,
              machine_config/0,
              tree_node/0,
              props/0,
              triggered/0,
              keep_while_conds_map/0,
              keep_while_conds_revidx/0,
              projections_map/0]).

%% Evaluates to `true' when `Timeout' is `infinity' or still greater than
%% zero, i.e. when a retry is still allowed.
-define(HAS_TIME_LEFT(Timeout), (Timeout =:= infinity orelse Timeout > 0)).

%% List of node properties. NOTE(review): presumably requested when
%% evaluating projections; its users are outside this part of the file.
-define(PROJECTION_PROPS_TO_RETURN, [payload_version,
                                     child_list_version,
                                     child_list_length,
                                     child_names,
                                     payload]).

%% -------------------------------------------------------------------
%% Machine protocol.
%% -------------------------------------------------------------------

%% TODO: Verify arguments carefully to avoid the construction of an invalid
%% command.

-spec fold(StoreId, PathPattern, Fun, Acc, Options) -> Ret when
      StoreId :: khepri:store_id(),
      PathPattern :: khepri_path:pattern(),
      Fun :: khepri:fold_fun(),
      Acc :: khepri:fold_acc(),
      Options :: khepri:query_options() | khepri:tree_options(),
      Ret :: khepri:ok(NewAcc) | khepri:error(),
      NewAcc :: Acc.
%% @doc Folds over all tree nodes matching the given path pattern.
%%
%% The fold is executed as a query: it does not go through the Ra log. Any
%% exception raised while walking the tree is caught inside the query,
%% returned as a value and passed to {@link handle_tx_exception/1} in the
%% calling process. There it is either re-raised or, for a transaction abort,
%% converted to `{error, Reason}'.
%%
%% @param StoreId the name of the Ra cluster.
%% @param PathPattern the path (or path pattern) to the nodes to fold over.
%% @param Fun the 3-arity fold function, passed together with `Acc' to {@link
%%        find_matching_nodes/5}.
%% @param Acc the initial accumulator.
%% @param Options query options such as `favor', and tree options.
%%
%% @returns an `{ok, NewAcc}' tuple with the final accumulator, or an
%% `{error, Reason}' tuple.

fold(StoreId, PathPattern, Fun, Acc, Options)
  when ?IS_STORE_ID(StoreId) andalso
       is_function(Fun, 3) ->
    PathPattern1 = khepri_path:from_string(PathPattern),
    khepri_path:ensure_is_valid(PathPattern1),
    {QueryOptions, TreeOptions} = split_query_options(Options),
    Query = fun(#?MODULE{root = Root}) ->
                    %% Exceptions are returned as a value instead of being
                    %% raised so they can be handled in the caller's process.
                    try
                        find_matching_nodes(
                          Root, PathPattern1, Fun, Acc, TreeOptions)
                    catch
                        Class:Reason:Stacktrace ->
                            {exception, Class, Reason, Stacktrace}
                    end
            end,
    case process_query(StoreId, Query, QueryOptions) of
        {exception, _, _, _} = Exception -> handle_tx_exception(Exception);
        Ret                              -> Ret
    end.

-spec put(StoreId, PathPattern, Payload, Options) -> Ret when
      StoreId :: khepri:store_id(),
      PathPattern :: khepri_path:pattern(),
      Payload :: khepri_payload:payload(),
      Options :: khepri:command_options() |
                 khepri:tree_options() |
                 khepri:put_options(),
      Ret :: khepri_machine:common_ret() | khepri_machine:async_ret().
%% @doc Creates or modifies a specific tree node in the tree structure.
%%
%% The path is converted to its native form and validated, and the payload is
%% prepared. The options are then split in two. Command options control how
%% the Ra command is submitted. Tree and put options travel inside the
%% `#put{}' command.
%%
%% @param StoreId the name of the Ra cluster.
%% @param PathPattern the path (or path pattern) to the node to create or
%%        modify.
%% @param Payload the payload to put in the specified node.
%% @param Options command, tree and put options.
%%
%% @returns in the case of a synchronous put, an `{ok, NodePropsMap}' tuple
%% with a map with zero, one or more entries, or an `{error, Reason}' tuple;
%% in the case of an asynchronous put, always `ok' (the actual return value
%% may be sent by a message if a correlation ID was specified).
%%
%% @private

put(StoreId, PathPattern, Payload, Options)
  when ?IS_STORE_ID(StoreId) andalso ?IS_KHEPRI_PAYLOAD(Payload) ->
    NativePattern = khepri_path:from_string(PathPattern),
    khepri_path:ensure_is_valid(NativePattern),
    PreparedPayload = khepri_payload:prepare(Payload),
    {CommandOptions, TreeAndPutOptions} = split_command_options(Options),
    process_command(
      StoreId,
      #put{path = NativePattern,
           payload = PreparedPayload,
           options = TreeAndPutOptions},
      CommandOptions);
put(_StoreId, PathPattern, Payload, _Options) ->
    %% NOTE(review): this clause is also reached when `StoreId' fails
    %% `?IS_STORE_ID' while the payload is valid. The reported reason,
    %% `invalid_payload', is then misleading.
    ?khepri_misuse(invalid_payload, #{path => PathPattern,
                                      payload => Payload}).

-spec delete(StoreId, PathPattern, Options) -> Ret when
      StoreId :: khepri:store_id(),
      PathPattern :: khepri_path:pattern(),
      Options :: khepri:command_options() | khepri:tree_options(),
      Ret :: khepri_machine:common_ret() | khepri_machine:async_ret().
%% @doc Deletes all tree nodes matching the path pattern.
%%
%% @param StoreId the name of the Ra cluster.
%% @param PathPattern the path (or path pattern) to the nodes to delete.
%% @param Options command options such as the command type, and tree options.
%%
%% @returns in the case of a synchronous delete, an `{ok, NodePropsMap}' tuple
%% with a map with zero, one or more entries, or an `{error, Reason}' tuple;
%% in the case of an asynchronous delete, always `ok' (the actual return value
%% may be sent by a message if a correlation ID was specified).

delete(StoreId, PathPattern, Options) when ?IS_STORE_ID(StoreId) ->
    PathPattern1 = khepri_path:from_string(PathPattern),
    khepri_path:ensure_is_valid(PathPattern1),
    %% Command options drive how the command is submitted; only the tree
    %% options are embedded in the Ra command itself.
    {CommandOptions, TreeOptions} = split_command_options(Options),
    %% TODO: Ensure `Extra' is unset.
    Command = #delete{path = PathPattern1,
                      options = TreeOptions},
    process_command(StoreId, Command, CommandOptions).

-spec transaction(StoreId, FunOrPath, Args, ReadWrite, Options) -> Ret when
      StoreId :: khepri:store_id(),
      FunOrPath :: Fun | PathPattern,
      Fun :: khepri_tx:tx_fun(),
      PathPattern :: khepri_path:pattern(),
      Args :: list(),
      ReadWrite :: ro | rw | auto,
      Options :: khepri:command_options() | khepri:query_options(),
      Ret :: khepri_machine:tx_ret() | khepri_machine:async_ret().
%% @doc Runs a transaction and returns the result.
%%
%% Read-only transactions are executed as queries. Read-write transactions
%% are submitted as Ra commands. With `auto', an anonymous function is
%% classified by `khepri_tx_adv:to_standalone_fun/2': it runs read-write if a
%% standalone function comes back, and read-only otherwise. A path pattern to
%% a stored procedure runs read-write unless `ro' is requested.
%%
%% @param StoreId the name of the Ra cluster.
%% @param FunOrPath an arbitrary anonymous function or a path pattern pointing
%%        to a stored procedure.
%% @param Args a list of arguments to pass to `FunOrPath'.
%% @param ReadWrite the read/write or read-only nature of the transaction.
%% @param Options command options such as the command type.
%%
%% @returns in the case of a synchronous transaction, `{ok, Result}' where
%% `Result' is the return value of `FunOrPath', or `{error, Reason}' if the
%% anonymous function was aborted; in the case of an asynchronous transaction,
%% always `ok' (the actual return value may be sent by a message if a
%% correlation ID was specified).

transaction(StoreId, Fun, Args, auto = ReadWrite, Options)
  when ?IS_STORE_ID(StoreId) andalso
       is_list(Args) andalso
       is_function(Fun, length(Args)) andalso
       is_map(Options) ->
    case khepri_tx_adv:to_standalone_fun(Fun, ReadWrite) of
        #standalone_fun{} = StandaloneFun ->
            readwrite_transaction(StoreId, StandaloneFun, Args, Options);
        _ ->
            readonly_transaction(StoreId, Fun, Args, Options)
    end;
transaction(StoreId, Fun, Args, rw = ReadWrite, Options)
  when ?IS_STORE_ID(StoreId) andalso
       is_list(Args) andalso
       is_function(Fun, length(Args)) andalso
       is_map(Options) ->
    readwrite_transaction(
      StoreId, khepri_tx_adv:to_standalone_fun(Fun, ReadWrite), Args, Options);
transaction(StoreId, Fun, Args, ro, Options)
  when ?IS_STORE_ID(StoreId) andalso
       is_list(Args) andalso
       is_function(Fun, length(Args)) andalso
       is_map(Options) ->
    readonly_transaction(StoreId, Fun, Args, Options);
transaction(StoreId, PathPattern, Args, ReadWrite, Options)
  when ?IS_STORE_ID(StoreId) andalso
       ?IS_KHEPRI_PATH_PATTERN(PathPattern) andalso
       is_list(Args) andalso
       is_map(Options) andalso
       (ReadWrite =:= auto orelse ReadWrite =:= rw orelse ReadWrite =:= ro) ->
    %% Stored procedures: only an explicit `ro' selects the query path.
    NativePattern = khepri_path:from_string(PathPattern),
    khepri_path:ensure_is_valid(NativePattern),
    case ReadWrite of
        ro -> readonly_transaction(StoreId, NativePattern, Args, Options);
        _  -> readwrite_transaction(StoreId, NativePattern, Args, Options)
    end;
transaction(StoreId, Fun, Args, ReadWrite, Options)
  when ?IS_STORE_ID(StoreId) andalso
       is_function(Fun) andalso
       is_list(Args) andalso
       is_atom(ReadWrite) andalso
       is_map(Options) ->
    %% Reached for a function whose arity does not match `Args', and also for
    %% an unsupported `ReadWrite' atom.
    {arity, Arity} = erlang:fun_info(Fun, arity),
    ?khepri_misuse(
       denied_tx_fun_with_invalid_args,
       #{'fun' => Fun, arity => Arity, args => Args}).

-spec readonly_transaction(StoreId, FunOrPath, Args, Options) -> Ret when
      StoreId :: khepri:store_id(),
      FunOrPath :: Fun | PathPattern,
      Fun :: khepri_tx:tx_fun(),
      PathPattern :: khepri_path:pattern(),
      Args :: list(),
      Options :: khepri:query_options(),
      Ret :: khepri_machine:tx_ret().
%% Runs a read-only transaction, either an anonymous function or a stored
%% procedure located by path, as a query.

readonly_transaction(StoreId, Fun, Args, Options)
  when is_list(Args) andalso is_function(Fun, length(Args)) ->
    run_readonly_tx_query(
      StoreId,
      fun(State) -> khepri_tx_adv:run(State, Fun, Args, false) end,
      Options);
readonly_transaction(StoreId, PathPattern, Args, Options)
  when ?IS_KHEPRI_PATH_PATTERN(PathPattern) andalso is_list(Args) ->
    run_readonly_tx_query(
      StoreId,
      fun(State) ->
              locate_sproc_and_execute_tx(State, PathPattern, Args, false)
      end,
      Options).

%% Wraps `RunTx' (state -> {NewState, Result, SideEffects}) in a query fun.
%% Inside the query, it asserts that the transaction left the state unchanged
%% and produced no side effects. It then maps the query result to
%% `{ok, Result}' or hands an exception tuple to handle_tx_exception/1.
run_readonly_tx_query(StoreId, RunTx, Options) ->
    Query = fun(State) ->
                    %% Read-only: the state must come back untouched and the
                    %% side-effect list must be empty.
                    {State, TxResult, []} = RunTx(State),
                    TxResult
            end,
    case process_query(StoreId, Query, Options) of
        {exception, _, _, _} = Exception ->
            handle_tx_exception(Exception);
        Ret ->
            {ok, Ret}
    end.

-spec readwrite_transaction(StoreId, FunOrPath, Args, Options) -> Ret when
      StoreId :: khepri:store_id(),
      FunOrPath :: StandaloneFun | Fun | PathPattern,
      StandaloneFun :: khepri_fun:standalone_fun(),
      Fun :: khepri_tx:tx_fun(),
      PathPattern :: khepri_path:pattern(),
      Args :: list(),
      Options :: khepri:command_options(),
      Ret :: khepri_machine:tx_ret() | khepri_machine:async_ret().
%% @doc Submits a read-write transaction as a Ra command.
%%
%% `FunOrPath' may be a standalone function or a regular function, in both
%% cases with an arity matching `Args', or a path pattern to a stored
%% procedure. The second clause accepts regular functions, so the spec lists
%% them explicitly.
%%
%% @private

readwrite_transaction(
  StoreId, #standalone_fun{arity = Arity} = StandaloneFun, Args, Options)
  when is_list(Args) andalso length(Args) =:= Arity ->
    readwrite_transaction1(StoreId, StandaloneFun, Args, Options);
readwrite_transaction(
  StoreId, Fun, Args, Options)
  when is_list(Args) andalso is_function(Fun, length(Args)) ->
    readwrite_transaction1(StoreId, Fun, Args, Options);
readwrite_transaction(
  StoreId, PathPattern, Args, Options)
  when ?IS_KHEPRI_PATH_PATTERN(PathPattern) andalso is_list(Args) ->
    %% transaction/5 already converted and validated the path. Doing it again
    %% keeps this function safe when it is called with a raw pattern.
    PathPattern1 = khepri_path:from_string(PathPattern),
    khepri_path:ensure_is_valid(PathPattern1),
    readwrite_transaction1(StoreId, PathPattern1, Args, Options).

%% Builds the `#tx{}' command, submits it, and normalizes the reply:
%% - an exception tuple is handed to handle_tx_exception/1;
%% - `ok' means either an async submission (returned as is) or a sync
%%   transaction whose result is `ok' (returned as `{ok, ok}');
%% - anything else is wrapped in `{ok, _}'.
%%
%% NOTE(review): for a sync call, process_sync_command/3 can also return Ra
%% errors such as `{error, {timeout, _}}'. Those errors reach the last branch
%% and come back as `{ok, {error, ...}}', which cannot be told apart from a
%% transaction function returning an error — confirm the intended contract.
readwrite_transaction1(StoreId, StandaloneFunOrPath, Args, Options) ->
    Result = process_command(
               StoreId,
               #tx{'fun' = StandaloneFunOrPath, args = Args},
               Options),
    case Result of
        {exception, _, _, _} ->
            handle_tx_exception(Result);
        ok ->
            case select_command_type(Options) of
                {async, _, _} -> ok;
                sync          -> {ok, ok}
            end;
        _ ->
            {ok, Result}
    end.

%% @doc Turns an `{exception, Class, Reason, Stacktrace}' tuple returned by a
%% query or command back into the caller-visible outcome.
%%
%% The outcome depends on the exception:
%% <ul>
%% <li>A transaction abort becomes `{error, AbortReason}'.</li>
%% <li>A Khepri misuse error is raised again as a new exception.</li>
%% <li>Anything else is re-raised with its original stacktrace.</li>
%% </ul>
%%
%% @private

handle_tx_exception({exception, Class, Reason, Stacktrace}) ->
    case {Class, Reason} of
        {_, ?TX_ABORT(AbortReason)} ->
            {error, AbortReason};
        {error, ?khepri_exception(_, _)} ->
            %% If the exception is a programming misuse of Khepri, we
            %% re-throw a new exception instead of using `erlang:raise()'.
            %%
            %% The default stacktrace is limited to 8 frames (see
            %% `erlang:system_flag(backtrace_depth, Depth)' to reconfigure
            %% it). Khepri's internal calls may take most or all of those
            %% frames, which makes the stacktrace uninformative to the
            %% caller.
            %%
            %% By throwing a new exception, we increase the chance that there
            %% is a frame pointing to the transaction function.
            ?khepri_misuse(Reason);
        _ ->
            erlang:raise(Class, Reason, Stacktrace)
    end.

-spec register_trigger(
        StoreId, TriggerId, EventFilter, StoredProcPath, Options) ->
    Ret when
      StoreId :: khepri:store_id(),
      TriggerId :: khepri:trigger_id(),
      EventFilter :: khepri_evf:event_filter() | khepri_path:pattern(),
      StoredProcPath :: khepri_path:path(),
      Options :: khepri:command_options(),
      Ret :: ok | khepri:error().
%% @doc Registers a trigger.
%%
%% The event filter is wrapped with `khepri_evf:wrap/1'. The stored procedure
%% path is converted to its native form and validated before the
%% `#register_trigger{}' command is submitted.
%%
%% @param StoreId the name of the Ra cluster.
%% @param TriggerId the name of the trigger.
%% @param EventFilter the event filter used to associate an event with a
%%        stored procedure.
%% @param StoredProcPath the path to the stored procedure to execute when the
%%        corresponding event occurs.
%% @param Options command options such as the command type.
%%
%% @returns `ok' if the trigger was registered, an `{error, Reason}' tuple
%% otherwise.

register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath, Options)
  when ?IS_STORE_ID(StoreId) ->
    WrappedFilter = khepri_evf:wrap(EventFilter),
    NativeSprocPath = khepri_path:from_string(StoredProcPath),
    khepri_path:ensure_is_valid(NativeSprocPath),
    process_command(
      StoreId,
      #register_trigger{id = TriggerId,
                        sproc = NativeSprocPath,
                        event_filter = WrappedFilter},
      Options).

-spec register_projection(StoreId, PathPattern, Projection, Options) ->
    Ret when
      StoreId :: khepri:store_id(),
      PathPattern :: khepri_path:pattern(),
      Projection :: khepri_projection:projection(),
      Options :: khepri:command_options(),
      Ret :: ok | khepri:error().
%% @doc Registers a projection.
%%
%% Only projections with an atom name, a standalone projection function and
%% a list of ETS options are accepted. `reply_from' is always forced to
%% `local', overriding any value given by the caller.
%%
%% @param StoreId the name of the Ra cluster.
%% @param PathPattern the pattern of tree nodes which should be projected.
%% @param Projection the projection record created with {@link
%%        khepri_projection:new/3}.
%% @param Options command options such as the command type.
%%
%% @returns `ok' if the projection was registered, an `{error, Reason}' tuple
%% otherwise.

register_projection(
  StoreId, PathPattern0,
  #khepri_projection{name = Name,
                     projection_fun = #standalone_fun{},
                     ets_options = EtsOptions} = Projection,
  Options0) when is_atom(Name) andalso is_list(EtsOptions) ->
    %% NOTE(review): presumably forced so that the reply comes from the local
    %% member once it has applied the command — confirm.
    LocalOptions = Options0#{reply_from => local},
    NativePattern = khepri_path:from_string(PathPattern0),
    khepri_path:ensure_is_valid(NativePattern),
    process_command(
      StoreId,
      #register_projection{pattern = NativePattern,
                           projection = Projection},
      LocalOptions).

-spec ack_triggers_execution(StoreId, TriggeredStoredProcs) ->
    Ret when
      StoreId :: khepri:store_id(),
      TriggeredStoredProcs :: [triggered()],
      Ret :: ok | khepri:error().
%% @doc Acknowledges the execution of a trigger.
%%
%% This is part of a mechanism to ensure that a trigger is executed at least
%% once. The acknowledgement is always submitted as an asynchronous command.
%%
%% @private

ack_triggers_execution(StoreId, TriggeredStoredProcs)
  when ?IS_STORE_ID(StoreId) ->
    process_command(
      StoreId,
      #ack_triggered{triggered = TriggeredStoredProcs},
      #{async => true}).

-spec get_keep_while_conds_state(StoreId, Options) -> Ret when
      StoreId :: khepri:store_id(),
      Options :: khepri:query_options(),
      Ret :: khepri:ok(keep_while_conds_map()) | khepri:error().
%% @doc Returns the `keep_while' conditions internal state.
%%
%% The returned state consists of all the `keep_while' conditions set so far.
%% However, it doesn't include the reverse index. The query always favors
%% consistency, whatever `favor' value the caller passed.
%%
%% @param StoreId the name of the Ra cluster.
%% @param Options query options such as `timeout'.
%%
%% @returns the `keep_while' conditions internal state.
%%
%% @private

get_keep_while_conds_state(StoreId, Options)
  when ?IS_STORE_ID(StoreId) ->
    ConsistentOptions = maps:put(favor, consistency, Options),
    process_query(
      StoreId,
      fun(#?MODULE{keep_while_conds = Conds}) -> {ok, Conds} end,
      ConsistentOptions).

-spec get_projections_state(StoreId, Options) -> Ret when
      StoreId :: khepri:store_id(),
      Options :: khepri:query_options(),
      Ret :: khepri:ok(projections_map()) | khepri:error().
%% @doc Returns the `projections' internal state.
%%
%% The returned state consists of the mapping between path pattern and
%% {@link khepri_projection:projection()} records. The query always favors
%% consistency, whatever `favor' value the caller passed.
%%
%% @see khepri_projection.
%%
%% @private

get_projections_state(StoreId, Options)
  when ?IS_STORE_ID(StoreId) ->
    ConsistentOptions = maps:put(favor, consistency, Options),
    process_query(
      StoreId,
      fun(#?MODULE{projections = ProjectionsMap}) -> {ok, ProjectionsMap} end,
      ConsistentOptions).

-spec split_query_options(Options) -> {QueryOptions, TreeOptions} when
      Options :: QueryOptions | TreeOptions,
      QueryOptions :: khepri:query_options(),
      TreeOptions :: khepri:tree_options().
%% @doc Splits mixed options into query options (`timeout', `favor') and
%% tree options (`expect_specific_node', `props_to_return',
%% `include_root_props'). Defaults are applied first. An empty
%% `props_to_return' list is dropped, and any other key makes the fold
%% function fail with `function_clause'.
%%
%% @private

split_query_options(Options) ->
    maps:fold(
      fun
          (props_to_return, [], Split) ->
              Split;
          (Key, Value, {QueryOpts, TreeOpts}) when
                Key =:= timeout orelse
                Key =:= favor ->
              {QueryOpts#{Key => Value}, TreeOpts};
          (Key, Value, {QueryOpts, TreeOpts}) when
                Key =:= expect_specific_node orelse
                Key =:= props_to_return orelse
                Key =:= include_root_props ->
              {QueryOpts, TreeOpts#{Key => Value}}
      end, {#{}, #{}}, set_default_options(Options)).

-spec split_command_options(Options) ->
    {CommandOptions, TreeAndPutOptions} when
      Options :: CommandOptions | TreeAndPutOptions,
      CommandOptions :: khepri:command_options(),
      TreeAndPutOptions :: khepri:tree_options() | khepri:put_options().
%% @doc Splits mixed options into command options (`reply_from', `timeout',
%% `async') and tree/put options. Defaults are applied first. An empty
%% `props_to_return' list is dropped. `keep_while' is converted to its native
%% form, and any unknown key makes the fold function fail with
%% `function_clause'.
%%
%% @private

split_command_options(Options) ->
    Classify =
        fun
            (keep_while, KeepWhile, {CmdOpts, TreePutOpts}) ->
                %% `keep_while' is kept with the tree/put options here. The
                %% state machine will extract it in `apply()'.
                NativeKeepWhile = khepri_condition:ensure_native_keep_while(
                                    KeepWhile),
                {CmdOpts, TreePutOpts#{keep_while => NativeKeepWhile}};
            (props_to_return, [], Split) ->
                Split;
            (Key, Value, {CmdOpts, TreePutOpts}) when
                  Key =:= reply_from orelse
                  Key =:= timeout orelse
                  Key =:= async ->
                {CmdOpts#{Key => Value}, TreePutOpts};
            (Key, Value, {CmdOpts, TreePutOpts}) when
                  Key =:= expect_specific_node orelse
                  Key =:= props_to_return orelse
                  Key =:= include_root_props ->
                {CmdOpts, TreePutOpts#{Key => Value}}
        end,
    maps:fold(Classify, {#{}, #{}}, set_default_options(Options)).

-spec split_put_options(TreeAndPutOptions) -> {TreeOptions, PutOptions} when
      TreeAndPutOptions :: TreeOptions | PutOptions,
      TreeOptions :: khepri:tree_options(),
      PutOptions :: khepri:put_options().
%% @doc Separates `keep_while', the only put-specific option, from the tree
%% options. All other keys are returned unchanged as tree options.
%%
%% @private

split_put_options(TreeAndPutOptions) ->
    PutOnlyKeys = [keep_while],
    {maps:without(PutOnlyKeys, TreeAndPutOptions),
     maps:with(PutOnlyKeys, TreeAndPutOptions)}.

-define(DEFAULT_PROPS_TO_RETURN, [payload,
                                  payload_version]).

%% Adds `props_to_return => ?DEFAULT_PROPS_TO_RETURN' unless the caller
%% already set `props_to_return'.
%%
%% By default, payload-related properties are returned. The caller can set
%% `props_to_return' to an empty list (`[]') to get a minimal return value;
%% the split functions then drop the option entirely.
set_default_options(#{props_to_return := _} = Options) ->
    Options;
set_default_options(Options) ->
    Options#{props_to_return => ?DEFAULT_PROPS_TO_RETURN}.

-spec process_command(StoreId, Command, Options) -> Ret when
      StoreId :: khepri:store_id(),
      Command :: command(),
      Options :: khepri:command_options(),
      Ret :: any().
%% @doc Processes a command which is appended to the Ra log and processed by
%% this state machine code.
%%
%% `Command' may modify the state of the machine.
%%
%% The command associated code is executed in the context of the state machine
%% process on each Ra member. The `async' option, interpreted by
%% select_command_type/1, decides whether the command is submitted
%% synchronously or pipelined.
%%
%% @param StoreId the name of the Ra cluster.
%% @param Command the command to submit.
%% @param Options command options such as `async' and `timeout'.
%%
%% @returns the result of the command or an "error" tuple.
%%
%% @private

process_command(StoreId, Command, Options) ->
    case select_command_type(Options) of
        {async, Correlation, Priority} ->
            process_async_command(StoreId, Command, Correlation, Priority);
        sync ->
            process_sync_command(StoreId, Command, Options)
    end.

%% Submits `Command' with `ra:process_command/3' and waits for the reply.
%%
%% The command goes to the Ra server returned by
%% use_leader_or_local_ra_server/2 for the cached leader, if there is one.
%% A `noproc' or `nodedown' error is retried while there is time left:
%% - with a cached leader, the cache entry is cleared and the command is
%%   retried immediately;
%% - without one, the command is retried after `?NOPROC_RETRY_INTERVAL' if
%%   the targeted Ra server is still alive.
%% The remaining timeout from khepri_utils:end_timeout_window/2 is used for
%% the retry.
process_sync_command(StoreId, Command, Options) ->
    Timeout = get_timeout(Options),
    RaCommandOptions = #{timeout => Timeout,
                         reply_from => maps:get(reply_from, Options, leader)},
    T0 = khepri_utils:start_timeout_window(Timeout),
    LeaderId = khepri_cluster:get_cached_leader(StoreId),
    RaServer = use_leader_or_local_ra_server(StoreId, LeaderId),
    case ra:process_command(RaServer, Command, RaCommandOptions) of
        {ok, Ret, NewLeaderId} ->
            khepri_cluster:cache_leader_if_changed(
              StoreId, LeaderId, NewLeaderId),
            just_did_consistent_call(StoreId),
            ?raise_exception_if_any(Ret);
        {timeout, _} = TimedOut ->
            {error, TimedOut};
        {error, Reason} = Error ->
            IsTransient = Reason =:= noproc orelse Reason =:= nodedown,
            CanRetry = ?HAS_TIME_LEFT(Timeout) andalso IsTransient,
            if
                not CanRetry ->
                    Error;
                LeaderId =/= undefined ->
                    %% The cached leader is no more: forget it and retry.
                    khepri_cluster:clear_cached_leader(StoreId),
                    Remaining = khepri_utils:end_timeout_window(Timeout, T0),
                    process_sync_command(
                      StoreId, Command, Options#{timeout => Remaining});
                true ->
                    case khepri_utils:is_ra_server_alive(RaServer) of
                        false ->
                            Error;
                        true ->
                            %% The follower doesn't know about the new leader
                            %% yet: wait a bit, then retry.
                            Remaining0 = khepri_utils:end_timeout_window(
                                           Timeout, T0),
                            Remaining = khepri_utils:sleep(
                                          ?NOPROC_RETRY_INTERVAL, Remaining0),
                            process_sync_command(
                              StoreId, Command, Options#{timeout => Remaining})
                    end
            end
    end.

%% Pipelines `Command' to the Ra server of `StoreId' on the local node,
%% without waiting for it to be applied.
process_async_command(StoreId, Command, Correlation, Priority) ->
    ra:pipeline_command({StoreId, node()}, Command, Correlation, Priority).

-spec select_command_type(Options) -> CommandType when
      Options :: khepri:command_options(),
      CommandType :: sync | {async, Correlation, Priority},
      Correlation :: ra_server:command_correlation(),
      Priority :: ra_server:command_priority().
%% @doc Selects the command type depending on what the caller wants.
%%
%% The `async' option may be missing or `false' (sync), `true', a
%% correlation (integer or reference), a priority (`normal' or `low'), or a
%% `{Correlation, Priority}' tuple. Missing parts fall back to
%% `no_correlation' and `low'. Any other value fails with
%% `function_clause'.
%%
%% @private

-define(DEFAULT_RA_COMMAND_CORRELATION, no_correlation).
-define(DEFAULT_RA_COMMAND_PRIORITY, low).
-define(IS_RA_COMMAND_CORRELATION(Correlation),
        (is_integer(Correlation) orelse is_reference(Correlation))).
-define(IS_RA_COMMAND_PRIORITY(Priority),
        (Priority =:= normal orelse Priority =:= low)).

select_command_type(#{async := {Correlation, Priority}})
  when ?IS_RA_COMMAND_CORRELATION(Correlation) andalso
       ?IS_RA_COMMAND_PRIORITY(Priority) ->
    {async, Correlation, Priority};
select_command_type(#{async := Priority})
  when ?IS_RA_COMMAND_PRIORITY(Priority) ->
    {async, ?DEFAULT_RA_COMMAND_CORRELATION, Priority};
select_command_type(#{async := Correlation})
  when ?IS_RA_COMMAND_CORRELATION(Correlation) ->
    {async, Correlation, ?DEFAULT_RA_COMMAND_PRIORITY};
select_command_type(#{async := true}) ->
    {async, ?DEFAULT_RA_COMMAND_CORRELATION, ?DEFAULT_RA_COMMAND_PRIORITY};
select_command_type(#{async := false}) ->
    sync;
select_command_type(Options) when not is_map_key(async, Options) ->
    sync.

-spec process_query(StoreId, QueryFun, Options) -> Ret when
      StoreId :: khepri:store_id(),
      QueryFun :: query_fun(),
      Options :: khepri:query_options(),
      Ret :: any().
%% @doc Processes a query, either on the local Ra member or through the Ra
%% leader, depending on the query type selected from `Options'.
%%
%% The `QueryFun' function takes the machine state as an argument and can
%% return anything. However, the machine state is never modified. The query
%% does not go through the Ra log and is not replicated.
%%
%% For the `local' query type, `QueryFun' is executed with
%% `ra:local_query/3' on the local Ra member. For the `leader' and
%% `consistent' query types, it is executed with `ra:leader_query/3' or
%% `ra:consistent_query/3'.
%%
%% @param StoreId the name of the Ra cluster.
%% @param QueryFun the function to run against the machine state.
%% @param Options query options such as `favor' and `timeout'.
%%
%% @returns the result of the query or an "error" tuple.
%%
%% @private

process_query(StoreId, QueryFun, Options) ->
    QueryType = select_query_type(StoreId, Options),
    Timeout = get_timeout(Options),
    case QueryType of
        local -> process_local_query(StoreId, QueryFun, Timeout);
        _     -> process_non_local_query(StoreId, QueryFun, QueryType, Timeout)
    end.

-spec process_local_query(StoreId, QueryFun, Timeout) -> Ret when
      StoreId :: khepri:store_id(),
      QueryFun :: query_fun(),
      Timeout :: timeout(),
      Ret :: any().
%% @doc Runs `QueryFun' as a local query on this node's Ra member.
%%
%% The response is interpreted by `process_query_response/7' with the query
%% type `local' and `IsLeader' set to `false'. Because of the query type,
%% `noproc'/`nodedown' errors are returned without any retry.
%%
%% @private

process_local_query(StoreId, QueryFun, Timeout) ->
    ThisMember = {StoreId, node()},
    Response = ra:local_query(ThisMember, QueryFun, Timeout),
    process_query_response(
      StoreId, ThisMember, false, QueryFun, local, Timeout, Response).

-spec process_non_local_query(StoreId, QueryFun, QueryType, Timeout) ->
    Ret when
      StoreId :: khepri:store_id(),
      QueryFun :: query_fun(),
      QueryType :: leader | consistent,
      Timeout :: timeout(),
      Ret :: any().
%% @doc Runs a leader or consistent query.
%%
%% The query is sent to the cached leader if there is one, otherwise to the
%% local Ra member. The time spent waiting for Ra is deducted from `Timeout'
%% before the response is handed to `process_query_response/7', so retries
%% only use the remaining time.
%%
%% @private

process_non_local_query(StoreId, QueryFun, QueryType, Timeout)
  when QueryType =:= leader orelse
       QueryType =:= consistent ->
    WindowStart = khepri_utils:start_timeout_window(Timeout),
    CachedLeaderId = khepri_cluster:get_cached_leader(StoreId),
    RaServer = use_leader_or_local_ra_server(StoreId, CachedLeaderId),
    Response =
        case QueryType of
            consistent -> ra:consistent_query(RaServer, QueryFun, Timeout);
            leader     -> ra:leader_query(RaServer, QueryFun, Timeout)
        end,
    RemainingTimeout = khepri_utils:end_timeout_window(Timeout, WindowStart),
    %% TODO: If the consistent query times out while the caller favors
    %% `compromise', should we retry with a local query to never block the
    %% query and let the caller continue?
    IsLeader = CachedLeaderId =/= undefined,
    process_query_response(
      StoreId, RaServer, IsLeader, QueryFun, QueryType,
      RemainingTimeout, Response).

-spec process_query_response(
        StoreId, RaServer, IsLeader, QueryFun, QueryType, Timeout,
        Response) ->
    Ret when
      StoreId :: khepri:store_id(),
      RaServer :: ra:server_id(),
      IsLeader :: boolean(),
      QueryFun :: query_fun(),
      QueryType :: local | leader | consistent,
      Timeout :: timeout(),
      Response :: {ok, {RaIndex, any()}, NewLeaderId} |
                  {ok, any(), NewLeaderId} |
                  {error, any()} |
                  {timeout, ra:server_id()},
      RaIndex :: ra:index(),
      NewLeaderId :: ra:server_id(),
      Ret :: any().
%% @doc Interprets the response of a Ra query.
%%
%% <ul>
%% <li>On success, the cached leader is refreshed from the leader reported
%% by Ra. A successful consistent query is also recorded with
%% `just_did_consistent_call/1'. The query result is passed through the
%% `raise_exception_if_any' macro and returned.</li>
%% <li>A `{timeout, ServerId}' response becomes
%% `{error, {timeout, ServerId}}'.</li>
%% <li>On `noproc' or `nodedown' for a non-local query with time left: if
%% the query targeted the cached leader, the cache entry is cleared and the
%% query is retried immediately. Otherwise, if the queried member is still
%% alive, the query is retried after `?NOPROC_RETRY_INTERVAL'.</li>
%% <li>Any other error is returned as is.</li>
%% </ul>
%%
%% @private

process_query_response(
  StoreId, RaServer, IsLeader, _QueryFun, consistent, _Timeout,
  {ok, Ret, NewLeaderId}) ->
    %% Consistent queries return the bare result (no Raft index wrapper).
    cache_leader_after_query(StoreId, RaServer, IsLeader, NewLeaderId),
    just_did_consistent_call(StoreId),
    ?raise_exception_if_any(Ret);
process_query_response(
  StoreId, RaServer, IsLeader, _QueryFun, _QueryType, _Timeout,
  {ok, {_RaIndex, Ret}, NewLeaderId}) ->
    cache_leader_after_query(StoreId, RaServer, IsLeader, NewLeaderId),
    ?raise_exception_if_any(Ret);
process_query_response(
  _StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Timeout,
  {timeout, _} = TimedOut) ->
    {error, TimedOut};
process_query_response(
  StoreId, _RaServer, true = _IsLeader, QueryFun, QueryType, Timeout,
  {error, Reason})
  when QueryType =/= local andalso ?HAS_TIME_LEFT(Timeout) andalso
       (Reason =:= noproc orelse Reason =:= nodedown) ->
    %% The cached leader is gone: forget it and retry right away. The next
    %% attempt falls back to the local member and may time out eventually.
    khepri_cluster:clear_cached_leader(StoreId),
    process_non_local_query(StoreId, QueryFun, QueryType, Timeout);
process_query_response(
  StoreId, RaServer, false = _IsLeader, QueryFun, QueryType, Timeout,
  {error, Reason} = Error)
  when QueryType =/= local andalso ?HAS_TIME_LEFT(Timeout) andalso
       (Reason =:= noproc orelse Reason =:= nodedown) ->
    case khepri_utils:is_ra_server_alive(RaServer) of
        false ->
            Error;
        true ->
            %% The member is up but doesn't know about the new leader yet:
            %% wait a bit, then retry with whatever time is left.
            NewTimeout = khepri_utils:sleep(?NOPROC_RETRY_INTERVAL, Timeout),
            process_non_local_query(StoreId, QueryFun, QueryType, NewTimeout)
    end;
process_query_response(
  _StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Timeout,
  {error, _} = Error) ->
    Error.

-spec cache_leader_after_query(StoreId, RaServer, IsLeader, NewLeaderId) ->
    any() when
      StoreId :: khepri:store_id(),
      RaServer :: ra:server_id(),
      IsLeader :: boolean(),
      NewLeaderId :: ra:server_id().
%% @doc Updates the cached leader after a successful query.
%%
%% When the query was sent to the cached leader (`IsLeader' is `true'), this
%% delegates to `khepri_cluster:cache_leader_if_changed/3' with the queried
%% server. Otherwise the reported leader is stored with
%% `khepri_cluster:cache_leader/2'. The return value is ignored by callers.
%%
%% @private

cache_leader_after_query(StoreId, RaServer, true, NewLeaderId) ->
    khepri_cluster:cache_leader_if_changed(StoreId, RaServer, NewLeaderId);
cache_leader_after_query(StoreId, _RaServer, false, NewLeaderId) ->
    khepri_cluster:cache_leader(StoreId, NewLeaderId).

-spec select_query_type(StoreId, Options) -> QueryType when
      StoreId :: khepri:store_id(),
      Options :: khepri:query_options(),
      QueryType :: local | leader | consistent.
%% @doc Selects the query type depending on what the caller favors.
%%
%% The `favor' option defaults to `compromise' when it is not set.
%%
%% @private

select_query_type(StoreId, Options) ->
    Favor = case Options of
                #{favor := Wanted} -> Wanted;
                _                  -> compromise
            end,
    do_select_query_type(StoreId, Favor).

-define(
   LAST_CONSISTENT_CALL_TS_REF(StoreId),
   {khepri, last_consistent_call_ts_ref, StoreId}).

%% @doc Maps the caller's `favor' to a query type.
%%
%% <ul>
%% <li>`consistency' always selects a consistent query.</li>
%% <li>`low_latency' always selects a local query.</li>
%% <li>`compromise' selects a leader query if the last successful command or
%% consistent query (recorded by `just_did_consistent_call/1') happened less
%% than `consistent_query_interval_in_compromise' seconds ago. That interval
%% is read from the `khepri' application environment and defaults to 10. A
%% consistent query is selected otherwise, or when nothing was recorded
%% yet.</li>
%% </ul>
%%
%% Timestamps come from `erlang:monotonic_time/1', not the system clock. The
%% system clock can be adjusted (NTP, manual change), and a backward jump
%% would keep the store on leader queries, skipping the periodic consistency
%% check, until the clock caught up. A recorded timestamp in the future
%% cannot come from the monotonic clock, so it is treated as stale.
%%
%% @private

do_select_query_type(StoreId, compromise) ->
    Idx = 1,
    case get_last_consistent_call_atomics(StoreId) of
        undefined ->
            consistent;
        AtomicsRef ->
            Last = atomics:get(AtomicsRef, Idx),
            Now = erlang:monotonic_time(second),
            Elapsed = Now - Last,
            ConsistentAgainAfter = application:get_env(
                                     khepri,
                                     consistent_query_interval_in_compromise,
                                     10),
            if
                Elapsed >= 0 andalso Elapsed < ConsistentAgainAfter ->
                    leader;
                true ->
                    consistent
            end
    end;
do_select_query_type(_StoreId, consistency) ->
    consistent;
do_select_query_type(_StoreId, low_latency) ->
    local.

%% @doc Records that a command or consistent query just succeeded.
%%
%% This timestamp is used by the `compromise' query strategy to perform a
%% consistent query from time to time, and leader queries the rest of the
%% time.
%%
%% The monotonic time, in seconds, is stored in an `atomics' array. The
%% array is signed by default, so negative monotonic values are fine. The
%% reference of that array is stored in a persistent term. The timestamp is
%% not stored directly in the persistent term because persistent terms are
%% not suited for frequent writes: the reference is stored once and only the
%% `atomics' is updated afterwards.
%%
%% @private

just_did_consistent_call(StoreId) ->
    Idx = 1,
    Now = erlang:monotonic_time(second),
    case get_last_consistent_call_atomics(StoreId) of
        undefined ->
            %% Fill the `atomics' before publishing it, so a concurrent reader
            %% never mistakes its initial zero value for a timestamp.
            AtomicsRef = atomics:new(1, []),
            ok = atomics:put(AtomicsRef, Idx, Now),
            Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId),
            persistent_term:put(Key, AtomicsRef);
        AtomicsRef ->
            ok = atomics:put(AtomicsRef, Idx, Now)
    end,
    ok.

%% @doc Returns the `atomics' reference holding the last consistent call
%% timestamp for `StoreId', or `undefined' if none was recorded yet.
%%
%% @private

get_last_consistent_call_atomics(StoreId) ->
    Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId),
    persistent_term:get(Key, undefined).

-spec get_timeout(Options) -> Timeout when
      Options :: khepri:command_options() | khepri:query_options(),
      Timeout :: timeout().
%% @doc Returns the `timeout' option, or the application-wide default
%% timeout from `khepri_app:get_default_timeout/0' when it is not set.
%%
%% @private

get_timeout(Options) ->
    case Options of
        #{timeout := Timeout} ->
            Timeout;
        _ ->
            %% Only looked up when the caller did not pass a timeout.
            khepri_app:get_default_timeout()
    end.

%% @doc Returns `LeaderId' when a leader is known, otherwise the Ra member
%% of `StoreId' running on this node.
%%
%% @private

use_leader_or_local_ra_server(StoreId, undefined) ->
    khepri_cluster:node_to_member(StoreId, node());
use_leader_or_local_ra_server(_StoreId, LeaderId) ->
    LeaderId.

-spec clear_cache(StoreId) -> ok when
      StoreId :: khepri:store_id().
%% @doc Clears the cached data for the given `StoreId'.
%%
%% This forgets the reference to the "last consistent call" timestamp, so
%% the next `compromise' query selects a consistent query.
%%
%% @private

clear_cache(StoreId) ->
    Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId),
    _WasErased = persistent_term:erase(Key),
    ok.

%% -------------------------------------------------------------------
%% ra_machine callbacks.
%% -------------------------------------------------------------------

-spec init(Params) -> State when
      Params :: machine_init_args(),
      State :: state().
%% @doc Builds the initial machine state.
%%
%% `store_id' and `member' are required. `snapshot_interval' is optional;
%% when absent, the `#config{}' record default is used. The optional
%% `commands' list (the initial "schema") is applied in order, using
%% metadata with index 0, term 0 and system time 0. The replies and side
%% effects of these commands are discarded. The applied command counter is
%% reset afterwards, so these commands do not count toward the snapshot
%% interval.
%%
%% @private

init(#{store_id := StoreId,
       member := Member} = Params) ->
    BaseConfig = #config{store_id = StoreId,
                         member = Member},
    Config = case Params of
                 #{snapshot_interval := SnapshotInterval} ->
                     BaseConfig#config{snapshot_interval = SnapshotInterval};
                 _ ->
                     BaseConfig
             end,
    InitialState = #?MODULE{config = Config},

    %% The same metadata is used for every initial command.
    InitMeta = #{index => 0,
                 term => 0,
                 system_time => 0},
    ApplyInitCommand = fun(Command, AccState) ->
                               {NextState, _Reply, _SideEffects} =
                                   apply(InitMeta, Command, AccState),
                               NextState
                       end,
    InitCommands = maps:get(commands, Params, []),
    FinalState = lists:foldl(ApplyInitCommand, InitialState, InitCommands),
    reset_applied_command_count(FinalState).

-spec init_aux(StoreId :: khepri:store_id()) -> aux_state().
%% @doc Returns the initial aux state, which only records the store ID.
%%
%% @private

init_aux(StoreId) ->
    AuxState = #khepri_machine_aux{store_id = StoreId},
    AuxState.

-spec handle_aux(RaState, Type, Command, AuxState, LogState, MachineState) ->
    {no_reply, AuxState, LogState} when
      RaState :: ra_server:ra_state(),
      Type :: {call, ra:from()} | cast,
      Command :: term(),
      AuxState :: aux_state(),
      LogState :: ra_log:state(),
      MachineState :: state().
%% @doc Handles aux commands.
%%
%% <ul>
%% <li>`#trigger_projection{}' (cast): forwards the old and new properties
%% of `Path' to `khepri_projection:trigger/4'.</li>
%% <li>`restore_projections' (cast): for every registered projection,
%% replays the nodes currently matching its path pattern with
%% `restore_projection/3'.</li>
%% </ul>
%%
%% Any other aux command is ignored. No reply is ever sent, and the aux and
%% log states are returned unchanged.
%%
%% @private

handle_aux(
  _RaState, cast,
  #trigger_projection{path = Path,
                      old_props = OldProps,
                      new_props = NewProps,
                      projection = Projection},
  AuxState, LogState, _MachineState) ->
    khepri_projection:trigger(Projection, Path, OldProps, NewProps),
    {no_reply, AuxState, LogState};
handle_aux(
  _RaState, cast, restore_projections, AuxState, LogState,
  #?MODULE{projections = Projections, root = Root}) ->
    RestoreOne = fun(Projection, PathPattern) ->
                         restore_projection(Projection, Root, PathPattern)
                 end,
    maps:foreach(RestoreOne, Projections),
    {no_reply, AuxState, LogState};
handle_aux(_RaState, _Type, _Command, AuxState, LogState, _MachineState) ->
    {no_reply, AuxState, LogState}.

%% @doc Replays the existing tree nodes matching `PathPattern' into
%% `Projection'.
%%
%% Each matching node (the root included, if it matches) triggers the
%% projection with empty old properties and its current properties as the
%% new ones. If the lookup fails, the error is logged at debug level and
%% ignored. The function always returns `ok'.
%%
%% @private

restore_projection(Projection, Root, PathPattern) ->
    TreeOptions = #{props_to_return => ?PROJECTION_PROPS_TO_RETURN,
                    include_root_props => true},
    case find_matching_nodes(Root, PathPattern, TreeOptions) of
        {ok, MatchingNodes} ->
            maps:foreach(fun(Path, Props) ->
                                 khepri_projection:trigger(
                                   Projection, Path, #{}, Props)
                         end, MatchingNodes);
        Error ->
            %% `?LOG_DEBUG/3' takes `(Format, Args, Metadata)': `Args' must be
            %% a list and `Metadata' a map. Previously the projection name
            %% and the error were passed directly as these two arguments.
            %% That could not be formatted, and logger does not accept
            %% non-map metadata. This runs from `apply/3' and `handle_aux/6',
            %% so it must not fail when debug logging is enabled.
            ?LOG_DEBUG(
               "Failed to recover projection ~s due to an error: ~p",
               [khepri_projection:name(Projection), Error],
               #{domain => [khepri, ra_machine]}),
            ok
    end.

-spec apply(Meta, Command, State) -> {State, Ret, SideEffects} when
      Meta :: ra_machine:command_meta_data(),
      Command :: command(),
      State :: state(),
      Ret :: any(),
      SideEffects :: ra_machine:effects().
%% @doc Applies a command to the machine state.
%%
%% Each clause handles one command record. Every clause hands its
%% `{State, Ret}' or `{State, Ret, SideEffects}' result to
%% `bump_applied_command_count/2'. That function counts the applied command
%% and may prepend a `release_cursor' effect.
%%
%% @private

%% TODO: Handle unknown/invalid commands.
%% A command matching none of the clauses below currently fails with a
%% `function_clause' error.
apply(
  Meta,
  #put{path = PathPattern, payload = Payload, options = TreeAndPutOptions},
  State) ->
    %% Separate the tree options from the put-specific options (e.g.
    %% `keep_while') before inserting/updating the matching node(s).
    {TreeOptions, Extra} = split_put_options(TreeAndPutOptions),
    Ret = insert_or_update_node(
            State, PathPattern, Payload, Extra, TreeOptions),
    bump_applied_command_count(Ret, Meta);
apply(
  Meta,
  #delete{path = PathPattern, options = TreeOptions},
  State) ->
    Ret = delete_matching_nodes(State, PathPattern, TreeOptions),
    bump_applied_command_count(Ret, Meta);
apply(
  Meta,
  #tx{'fun' = StandaloneFun, args = Args},
  State) when ?IS_STANDALONE_FUN(StandaloneFun) ->
    %% Transaction given as a standalone function. The last argument
    %% (`true') is the "allow updates" flag, as in
    %% `locate_sproc_and_execute_tx/4'.
    Ret = khepri_tx_adv:run(State, StandaloneFun, Args, true),
    bump_applied_command_count(Ret, Meta);
apply(
  Meta,
  #tx{'fun' = PathPattern, args = Args},
  State) when ?IS_KHEPRI_PATH_PATTERN(PathPattern) ->
    %% Transaction given as the path of a stored procedure in the tree.
    Ret = locate_sproc_and_execute_tx(State, PathPattern, Args, true),
    bump_applied_command_count(Ret, Meta);
apply(
  Meta,
  #register_trigger{id = TriggerId,
                    sproc = StoredProcPath,
                    event_filter = EventFilter},
  #?MODULE{triggers = Triggers} = State) ->
    %% Paths are normalized with `khepri_path:realpath/1' before being
    %% stored. Only `#evf_tree{}' event filters are handled; any other filter
    %% fails with a `case_clause' error. Registering with an existing
    %% `TriggerId' replaces the previous trigger.
    StoredProcPath1 = khepri_path:realpath(StoredProcPath),
    EventFilter1 = case EventFilter of
                       #evf_tree{path = Path} ->
                           Path1 = khepri_path:realpath(Path),
                           EventFilter#evf_tree{path = Path1}
                   end,
    Triggers1 = Triggers#{TriggerId => #{sproc => StoredProcPath1,
                                         event_filter => EventFilter1}},
    State1 = State#?MODULE{triggers = Triggers1},
    Ret = {State1, ok},
    bump_applied_command_count(Ret, Meta);
apply(
  Meta,
  #ack_triggered{triggered = ProcessedTriggers},
  #?MODULE{emitted_triggers = EmittedTriggers} = State) ->
    %% Acknowledged triggers are removed from the emitted list with list
    %% subtraction, which removes one occurrence per acknowledged element.
    EmittedTriggers1 = EmittedTriggers -- ProcessedTriggers,
    State1 = State#?MODULE{emitted_triggers = EmittedTriggers1},
    Ret = {State1, ok},
    bump_applied_command_count(Ret, Meta);
apply(
  Meta,
  #register_projection{pattern = PathPattern, projection = Projection},
  #?MODULE{projections = Projections, root = Root} = State) ->
    %% `khepri_projection:init/1' is called while applying this command.
    %% Only when it returns `ok' are the existing nodes matching
    %% `PathPattern' replayed into the projection and the projection
    %% recorded. Either way, its return value is the command's reply.
    Reply = khepri_projection:init(Projection),
    State1 = case Reply of
                 ok ->
                     restore_projection(Projection, Root, PathPattern),
                     Projections1 = Projections#{Projection => PathPattern},
                     State#?MODULE{projections = Projections1};
                 _  ->
                     State
             end,
    Ret = {State1, Reply},
    bump_applied_command_count(Ret, Meta).

-spec bump_applied_command_count(ApplyRet, Meta) ->
    {State, Ret, SideEffects} when
      ApplyRet :: {State, Ret} | {State, Ret, SideEffects},
      State :: state(),
      Ret :: any(),
      Meta :: ra_machine:command_meta_data(),
      SideEffects :: ra_machine:effects().
%% @doc Counts the command that was just applied.
%%
%% A two-element result is first normalized to a three-element one with no
%% side effects. While the incremented counter stays below
%% `snapshot_interval', it is stored in the state's metrics. Once it reaches
%% the interval, the counter is reset and a
%% `{release_cursor, RaftIndex, ResetState}' effect is prepended to the side
%% effects.
%%
%% @private

bump_applied_command_count({State, Result}, Meta) ->
    bump_applied_command_count({State, Result, []}, Meta);
bump_applied_command_count(
  {#?MODULE{config = #config{snapshot_interval = SnapshotInterval},
            metrics = Metrics} = State,
   Result,
   SideEffects},
  #{index := RaftIndex}) ->
    AppliedCmdCount = maps:get(applied_command_count, Metrics, 0) + 1,
    if
        AppliedCmdCount < SnapshotInterval ->
            State1 = State#?MODULE{
                       metrics = Metrics#{applied_command_count =>
                                          AppliedCmdCount}},
            {State1, Result, SideEffects};
        true ->
            ?LOG_DEBUG(
               "Move release cursor after ~b commands applied "
               "(>= ~b commands)",
               [AppliedCmdCount, SnapshotInterval],
               #{domain => [khepri, ra_machine]}),
            State1 = reset_applied_command_count(State),
            {State1, Result, [{release_cursor, RaftIndex, State1} | SideEffects]}
    end.

%% @doc Drops the applied command counter from the state's metrics.
%%
%% @private

reset_applied_command_count(#?MODULE{metrics = Metrics} = State) ->
    State#?MODULE{metrics = maps:without([applied_command_count], Metrics)}.

%% @doc Returns the side effects to run when the Ra member enters a state.
%%
%% When becoming `leader', pending emitted triggers are handed over again
%% (see `emitted_triggers_to_side_effects/1'). When `recovered', a
%% `restore_projections' aux command is scheduled. Nothing is done for other
%% states.
%%
%% @private

state_enter(leader, State) ->
    emitted_triggers_to_side_effects(State);
state_enter(recovered, _State) ->
    [{aux, restore_projections}];
state_enter(_StateName, _State) ->
    [].

%% @doc Turns the list of emitted triggers into a single `mod_call' side
%% effect calling `khepri_event_handler:handle_triggered_sprocs/2'.
%%
%% Returns an empty list when there are no emitted triggers.
%%
%% @private

emitted_triggers_to_side_effects(
  #?MODULE{config = #config{store_id = StoreId},
           emitted_triggers = [_ | _] = EmittedTriggers}) ->
    Args = [StoreId, EmittedTriggers],
    [{mod_call, khepri_event_handler, handle_triggered_sprocs, Args}];
emitted_triggers_to_side_effects(_State) ->
    [].

%% @doc Returns a map describing the machine state.
%%
%% The map contains the store ID, the triggers, the keep_while conditions
%% and the whole tree. The tree is rebuilt with
%% `khepri_utils:flat_struct_to_tree/2' from the properties of every node,
%% the root included. Stored procedures are converted back to funs with
%% `khepri_fun:to_fun/1'.
%%
%% @private

overview(#?MODULE{config = #config{store_id = StoreId},
                  root = Root,
                  triggers = Triggers,
                  keep_while_conds = KeepWhileConds}) ->
    PropsToReturn = [payload,
                     payload_version,
                     child_list_version,
                     child_list_length],
    TreeOptions = #{props_to_return => PropsToReturn,
                    include_root_props => true},
    {ok, NodePropsMap} = find_matching_nodes(
                           Root, [?KHEPRI_WILDCARD_STAR_STAR], TreeOptions),
    SprocToFun = fun(Props) ->
                         case Props of
                             #{sproc := Sproc} ->
                                 Props#{sproc => khepri_fun:to_fun(Sproc)};
                             _ ->
                                 Props
                         end
                 end,
    #{store_id => StoreId,
      tree => khepri_utils:flat_struct_to_tree(NodePropsMap, SprocToFun),
      triggers => Triggers,
      keep_while_conds => KeepWhileConds}.

%% -------------------------------------------------------------------
%% Internal functions.
%% -------------------------------------------------------------------

-spec create_node_record(Payload) -> Node when
      Payload :: khepri_payload:payload(),
      Node :: tree_node().
%% @doc Builds a new tree node holding `Payload' with the initial node
%% properties (`?INIT_NODE_PROPS'). Child nodes keep their record default.
%%
%% @private

create_node_record(Payload) ->
    #node{payload = Payload,
          props = ?INIT_NODE_PROPS}.

-spec set_node_payload(Node, Payload) -> Node when
      Node :: tree_node(),
      Payload :: khepri_payload:payload().
%% @doc Sets the payload of `Node'.
%%
%% If the new payload is exactly equal to the current one, the node is
%% returned unchanged. Otherwise the payload is replaced and
%% `payload_version' is incremented.
%%
%% @private

set_node_payload(#node{payload = CurrentPayload} = Node, NewPayload)
  when CurrentPayload =:= NewPayload ->
    Node;
set_node_payload(
  #node{props = #{payload_version := PayloadVersion} = Props} = Node,
  NewPayload) ->
    NewProps = Props#{payload_version => PayloadVersion + 1},
    Node#node{payload = NewPayload, props = NewProps}.

-spec remove_node_payload(Node) -> Node when
      Node :: tree_node().
%% @doc Removes the payload of `Node'.
%%
%% A node without payload is returned unchanged. Otherwise the payload is
%% replaced by `khepri_payload:none()' and `payload_version' is
%% incremented.
%%
%% @private

remove_node_payload(#node{payload = ?NO_PAYLOAD} = Node) ->
    Node;
remove_node_payload(
  #node{props = #{payload_version := PayloadVersion} = Props} = Node) ->
    NewProps = Props#{payload_version => PayloadVersion + 1},
    Node#node{payload = khepri_payload:none(), props = NewProps}.

-spec add_node_child(Node, ChildName, Child) -> Node when
      Node :: tree_node(),
      Child :: tree_node(),
      ChildName :: khepri_path:component().
%% @doc Stores `Child' under `ChildName' and increments
%% `child_list_version'.
%%
%% @private

add_node_child(#node{props = #{child_list_version := ChildListVersion} = Props,
                     child_nodes = Children} = Node,
               ChildName, Child) ->
    NewProps = Props#{child_list_version => ChildListVersion + 1},
    NewChildren = Children#{ChildName => Child},
    Node#node{child_nodes = NewChildren, props = NewProps}.

-spec update_node_child(Node, ChildName, Child) -> Node when
      Node :: tree_node(),
      Child :: tree_node(),
      ChildName :: khepri_path:component().
%% @doc Stores `Child' under `ChildName' without touching
%% `child_list_version', unlike `add_node_child/3'.
%%
%% @private

update_node_child(#node{child_nodes = Children} = Node, ChildName, Child) ->
    Node#node{child_nodes = maps:put(ChildName, Child, Children)}.

-spec remove_node_child(Node, ChildName) -> Node when
      Node :: tree_node(),
      ChildName :: khepri_path:component().
%% @doc Removes the child `ChildName', which must exist (asserted), and
%% increments `child_list_version'.
%%
%% @private

remove_node_child(#node{props = #{child_list_version := ChildListVersion} = Props,
                        child_nodes = Children} = Node,
                  ChildName) ->
    ?assert(maps:is_key(ChildName, Children)),
    NewProps = Props#{child_list_version => ChildListVersion + 1},
    Node#node{child_nodes = maps:remove(ChildName, Children),
              props = NewProps}.

-spec remove_node_child_nodes(Node) -> Node when
      Node :: tree_node().
%% @doc Removes all children of `Node'.
%%
%% A node without children is returned unchanged. Otherwise
%% `child_list_version' is incremented.
%%
%% @private

remove_node_child_nodes(#node{child_nodes = Children} = Node)
  when map_size(Children) =:= 0 ->
    Node;
remove_node_child_nodes(
  #node{props = #{child_list_version := ChildListVersion} = Props} = Node) ->
    NewProps = Props#{child_list_version => ChildListVersion + 1},
    Node#node{child_nodes = #{}, props = NewProps}.

-spec gather_node_props(Node, TreeOptions) -> NodeProps when
      Node :: tree_node(),
      TreeOptions :: khepri:tree_options(),
      NodeProps :: khepri:node_props().
%% @doc Collects the node properties listed in the `props_to_return' tree
%% option.
%%
%% `payload' yields `data' or `sproc' depending on the payload type, and
%% nothing for other payloads. `has_payload' yields `has_data' or
%% `is_sproc'. An unknown property name fails with `function_clause'.
%% Without `props_to_return', an empty map is returned.
%%
%% @private

gather_node_props(#node{props = #{payload_version := PayloadVersion,
                                  child_list_version := ChildListVersion},
                        payload = Payload,
                        child_nodes = Children},
                  #{props_to_return := WantedProps}) ->
    AddProp =
        fun
            (payload, Props) ->
                case Payload of
                    #p_data{data = Data}  -> Props#{data => Data};
                    #p_sproc{sproc = Fun} -> Props#{sproc => Fun};
                    _                     -> Props
                end;
            (has_payload, Props) ->
                case Payload of
                    #p_data{}  -> Props#{has_data => true};
                    #p_sproc{} -> Props#{is_sproc => true};
                    _          -> Props
                end;
            (raw_payload, Props) ->
                Props#{raw_payload => Payload};
            (payload_version, Props) ->
                Props#{payload_version => PayloadVersion};
            (child_list_version, Props) ->
                Props#{child_list_version => ChildListVersion};
            (child_list_length, Props) ->
                Props#{child_list_length => maps:size(Children)};
            (child_names, Props) ->
                Props#{child_names => maps:keys(Children)}
        end,
    lists:foldl(AddProp, #{}, WantedProps);
gather_node_props(#node{}, _TreeOptions) ->
    #{}.

%% @doc Merges the properties of the old and new versions of a node.
%%
%% Properties come from the new node, except `data' and `sproc', which are
%% dropped from the new node's properties. These are therefore only present
%% if the old node had them. A missing old node (`undefined') contributes
%% nothing.
%%
%% @private

gather_node_props_from_old_and_new_nodes(OldNode, NewNode, TreeOptions) ->
    OldProps = case OldNode of
                   undefined -> #{};
                   _         -> gather_node_props(OldNode, TreeOptions)
               end,
    NewProps = maps:without(
                 [data, sproc], gather_node_props(NewNode, TreeOptions)),
    maps:merge(OldProps, NewProps).

%% @doc Gathers the default set of properties (`?DEFAULT_PROPS_TO_RETURN')
%% of `Node', for inclusion in error details.
%%
%% @private

gather_node_props_for_error(Node) ->
    ErrorTreeOptions = #{props_to_return => ?DEFAULT_PROPS_TO_RETURN},
    gather_node_props(Node, ErrorTreeOptions).

-spec to_absolute_keep_while(BasePath, KeepWhile) -> KeepWhile when
      BasePath :: khepri_path:native_path(),
      KeepWhile :: khepri_condition:native_keep_while().
%% @doc Rewrites every path of `KeepWhile' as an absolute path, resolved
%% relative to `BasePath' with `khepri_path:abspath/2'. The conditions are
%% kept as is.
%%
%% @private

to_absolute_keep_while(BasePath, KeepWhile) ->
    MakeAbsolute = fun(Path, Condition, AbsKeepWhile) ->
                           AbsPath = khepri_path:abspath(Path, BasePath),
                           AbsKeepWhile#{AbsPath => Condition}
                   end,
    maps:fold(MakeAbsolute, #{}, KeepWhile).

-spec are_keep_while_conditions_met(Node, KeepWhile) -> Ret when
      Node :: tree_node(),
      KeepWhile :: khepri_condition:native_keep_while(),
      Ret :: true | {false, any()}.
%% @doc Checks every condition of `KeepWhile' against the tree under
%% `Root'.
%%
%% An empty `KeepWhile' is always met. Otherwise, for each path, the
%% matching nodes are looked up. A path matching no node, or a lookup error,
%% makes the check fail with `{false, Reason}'. Once a condition is not met,
%% the remaining ones are skipped and that result is returned.
%%
%% @private

are_keep_while_conditions_met(_Root, KeepWhile)
  when map_size(KeepWhile) =:= 0 ->
    true;
are_keep_while_conditions_met(Root, KeepWhile) ->
    TreeOptions = #{props_to_return => [payload,
                                        payload_version,
                                        child_list_version,
                                        child_list_length]},
    CheckPath =
        fun
            (Path, Condition, true) ->
                case find_matching_nodes(Root, Path, TreeOptions) of
                    {error, Reason} ->
                        {false, Reason};
                    {ok, Matches} when Matches =/= #{} ->
                        are_keep_while_conditions_met1(Matches, Condition);
                    {ok, _NoMatches} ->
                        {false, {pattern_matches_no_nodes, Path}}
                end;
            (_Path, _Condition, NotMet) ->
                NotMet
        end,
    maps:fold(CheckPath, true, KeepWhile).

%% @doc Checks `Condition' against each matched node of `Result' (a map of
%% path to node properties) with `khepri_condition:is_met/3'.
%%
%% Returns `true' if every node satisfies it (including when `Result' is
%% empty). Otherwise it returns the first non-`true' result, and the
%% remaining nodes are skipped.
%%
%% @private

are_keep_while_conditions_met1(Result, Condition) ->
    CheckNode = fun
                    (Path, NodeProps, true) ->
                        khepri_condition:is_met(Condition, Path, NodeProps);
                    (_Path, _NodeProps, NotMet) ->
                        NotMet
                end,
    maps:fold(CheckNode, true, Result).

%% @doc Checks the keep_while condition a node may have on itself.
%%
%% This only applies when `keep_while_conds' holds an entry for `Path' that
%% itself has a condition on `Path'. Otherwise there is nothing to check and
%% `true' is returned.
%%
%% @private

is_keep_while_condition_met_on_self(Path, Node, Extra) ->
    case Extra of
        #{keep_while_conds := #{Path := #{Path := Condition}}} ->
            khepri_condition:is_met(Condition, Path, Node);
        _ ->
            true
    end.

%% @doc Records the keep_while conditions of the node at `Path' in `Extra'.
%%
%% `KeepWhile' is made absolute relative to `Path'. The reverse index is
%% updated first, then the conditions map, and both are stored back in
%% `Extra'.
%%
%% @private

update_keep_while_conds_extra(
  #{keep_while_conds := KeepWhileConds,
    keep_while_conds_revidx := KeepWhileCondsRevIdx} = Extra,
  Path, KeepWhile) ->
    AbsKeepWhile = to_absolute_keep_while(Path, KeepWhile),
    NewRevIdx = update_keep_while_conds_revidx(
                  KeepWhileConds, KeepWhileCondsRevIdx, Path, AbsKeepWhile),
    NewConds = KeepWhileConds#{Path => AbsKeepWhile},
    Extra#{keep_while_conds => NewConds,
           keep_while_conds_revidx => NewRevIdx}.

-spec update_keep_while_conds_revidx(
        KeepWhileConds, KeepWhileCondsRevIdx, Watcher, KeepWhile) ->
    KeepWhileCondsRevIdx when
      KeepWhileConds :: keep_while_conds_map(),
      KeepWhileCondsRevIdx :: keep_while_conds_revidx(),
      Watcher :: khepri_path:native_path(),
      KeepWhile :: khepri_condition:native_keep_while().
%% @doc Updates the keep_while reverse index for `Watcher'.
%%
%% The reverse index maps each watched path to the set of its watchers
%% (`#{Watcher => ok}'). `Watcher' is first removed from every path it
%% watched according to its previous conditions in `KeepWhileConds'. A
%% watched path left with no watcher is dropped from the index. `Watcher' is
%% then added to every path of the new `KeepWhile'.
%%
%% Returns the updated reverse index. The previous spec declared the
%% conditions map as the return type, which did not match the value
%% actually returned.
%%
%% Every path watched according to `KeepWhileConds' is expected to be
%% present in the index; a missing entry fails with a `badkey' error.
%%
%% @private

update_keep_while_conds_revidx(
  KeepWhileConds, KeepWhileCondsRevIdx, Watcher, KeepWhile) ->
    %% First, remove the watcher from the paths it used to watch.
    OldWatcheds = maps:get(Watcher, KeepWhileConds, #{}),
    KeepWhileCondsRevIdx1 = maps:fold(
                              fun(Watched, _, KWRevIdx) ->
                                      Watchers = maps:get(Watched, KWRevIdx),
                                      Watchers1 = maps:remove(Watcher, Watchers),
                                      case maps:size(Watchers1) of
                                          0 -> maps:remove(Watched, KWRevIdx);
                                          _ -> KWRevIdx#{Watched => Watchers1}
                                      end
                              end, KeepWhileCondsRevIdx, OldWatcheds),
    %% Then, record the paths watched by the new keep_while.
    maps:fold(
      fun(Watched, _, KWRevIdx) ->
              Watchers = maps:get(Watched, KWRevIdx, #{}),
              Watchers1 = Watchers#{Watcher => ok},
              KWRevIdx#{Watched => Watchers1}
      end, KeepWhileCondsRevIdx1, KeepWhile).

%% @doc Looks up the stored procedure at `PathPattern' and runs it as a
%% transaction with `khepri_tx_adv:run/4'.
%%
%% When `AllowUpdates' is `true', stored procedures flagged `ro' or `rw' as
%% transaction functions are accepted; when it is `false', only `ro' ones
%% are. If the path holds no usable stored procedure, or the lookup fails,
%% the transaction runs `failed_to_locate_sproc/1' with the corresponding
%% error instead, which aborts it.
%%
%% @private

locate_sproc_and_execute_tx(
  #?MODULE{root = Root} = State, PathPattern, Args, AllowUpdates) ->
    TreeOptions = #{expect_specific_node => true,
                    props_to_return => [raw_payload]},
    LookupRet = find_matching_nodes(Root, PathPattern, TreeOptions),
    {TxFun, TxArgs} =
        case LookupRet of
            {error, LookupError} ->
                {fun failed_to_locate_sproc/1, [LookupError]};
            {ok, NodePropsMap} ->
                case maps:values(NodePropsMap) of
                    [#{raw_payload := #p_sproc{
                                         sproc = StoredProc,
                                         is_valid_as_tx_fun = Validity}}]
                      when AllowUpdates andalso
                           (Validity =:= ro orelse Validity =:= rw) ->
                        {StoredProc, Args};
                    [#{raw_payload := #p_sproc{
                                         sproc = StoredProc,
                                         is_valid_as_tx_fun = ro}}]
                      when not AllowUpdates ->
                        {StoredProc, Args};
                    [#{raw_payload := #p_sproc{}}] ->
                        InvalidError = ?khepri_error(
                                          sproc_invalid_as_tx_fun,
                                          #{path => PathPattern}),
                        {fun failed_to_locate_sproc/1, [InvalidError]};
                    _ ->
                        MissingError = ?khepri_error(
                                          no_sproc_at_given_path,
                                          #{path => PathPattern}),
                        {fun failed_to_locate_sproc/1, [MissingError]}
                end
        end,
    khepri_tx_adv:run(State, TxFun, TxArgs, AllowUpdates).

-spec failed_to_locate_sproc(Reason) -> no_return() when
      Reason :: any().
%% @doc Transaction function used in place of a stored procedure that could
%% not be located or used.
%%
%% `locate_sproc_and_execute_tx/4' runs it as the transaction function with
%% the error as its only argument. It aborts the transaction with
%% `khepri_tx:abort/1', passing `Reason' through.
%%
%% @private

failed_to_locate_sproc(Reason) ->
    khepri_tx:abort(Reason).

-spec find_matching_nodes(Root, PathPattern, TreeOptions) ->
    Ret when
      Root :: tree_node(),
      PathPattern :: khepri_path:native_pattern(),
      TreeOptions :: khepri:tree_options(),
      Ret :: khepri_machine:common_ret().
%% @doc Returns the properties of the nodes matching `PathPattern' as a map
%% of path to node properties, wrapped in `{ok, Map}', or an error.
%%
%% @private

find_matching_nodes(Root, PathPattern, TreeOptions) ->
    Collect = fun collect_node_props_cb/3,
    find_matching_nodes(Root, PathPattern, Collect, #{}, TreeOptions).

-spec collect_node_props_cb(Path, NodeProps, Map) ->
    Ret when
      Path :: khepri_path:native_path(),
      NodeProps :: khepri:node_props(),
      Map :: khepri_adv:node_props_map(),
      Ret :: Map.
%% @doc Fold callback storing `NodeProps' under `Path' in the accumulator
%% map. A non-map accumulator fails with `function_clause'.
%%
%% @private

collect_node_props_cb(Path, NodeProps, Map) when is_map(Map) ->
    maps:put(Path, NodeProps, Map).

-spec count_node_cb(Path, NodeProps, Count) ->
    Ret when
      Path :: khepri_path:native_path(),
      NodeProps :: khepri:node_props(),
      Count :: non_neg_integer(),
      Ret :: Count.
%% @doc Fold callback counting matching nodes. The path and properties are
%% ignored. A non-integer accumulator fails with `function_clause'.
%%
%% @private

count_node_cb(_Path, _NodeProps, Count) when is_integer(Count) ->
    1 + Count.

-spec find_matching_nodes(Root, PathPattern, Fun, Acc, TreeOptions) ->
    Ret when
      Root :: tree_node(),
      PathPattern :: khepri_path:native_pattern(),
      Fun :: khepri:fold_fun(),
      Acc :: khepri:fold_acc(),
      TreeOptions :: khepri:tree_options(),
      Ret :: khepri:ok(Acc) | khepri:error().
%% @doc Folds `Fun(Path, NodeProps, Acc)' over the nodes matching
%% `PathPattern'.
%%
%% The tree is walked with `walk_down_the_tree/6', and
%% `find_matching_nodes_cb/5' is applied to each visited node. The walk must
%% leave the tree untouched, which is asserted. Any error from the walk is
%% returned as is.
%%
%% @private

find_matching_nodes(Root, PathPattern, Fun, Acc, TreeOptions) ->
    VisitNode = fun(Path, NodeOrInterruption, WalkAcc) ->
                        find_matching_nodes_cb(
                          Path, NodeOrInterruption, Fun, WalkAcc, TreeOptions)
                end,
    case walk_down_the_tree(
           Root, PathPattern, TreeOptions, #{}, VisitNode, Acc) of
        {ok, UnchangedRoot, _, FinalAcc} ->
            ?assertEqual(Root, UnchangedRoot),
            {ok, FinalAcc};
        Error ->
            Error
    end.

%% @doc Per-node callback used by `find_matching_nodes/5'.
%%
%% For a matching node, its properties are gathered and folded into `Acc'
%% with `Fun'; the node is kept.
%%
%% When the walk is interrupted because a node was not found, the outcome
%% depends on the `expect_specific_node' tree option. When it is `true', a
%% `node_not_found' Khepri error is returned. Otherwise the interruption is
%% ignored: `Acc' is returned unchanged and the walk continues. Any other
%% interruption is ignored the same way.
%%
%% @private

find_matching_nodes_cb(Path, #node{} = Node, Fun, Acc, TreeOptions) ->
    NodeProps = gather_node_props(Node, TreeOptions),
    {ok, keep, Fun(Path, NodeProps, Acc)};
find_matching_nodes_cb(
  _Path,
  {interrupted, node_not_found = Reason, Info},
  _Fun, _Acc,
  #{expect_specific_node := true}) ->
    Error = ?khepri_error(Reason, Info),
    {error, Error};
find_matching_nodes_cb(
  _Path, {interrupted, _Reason, _Info}, _Fun, Acc, _TreeOptions) ->
    {ok, keep, Acc}.

-spec insert_or_update_node(State, PathPattern, Payload, Extra, TreeOptions) ->
    Ret when
      State :: state(),
      PathPattern :: khepri_path:native_pattern(),
      Payload :: khepri_payload:payload(),
      Extra :: khepri:put_options(),
      TreeOptions :: khepri:tree_options(),
      Ret :: {State, Result} | {State, Result, ra_machine:effects()},
      Result :: khepri_machine:common_ret().
%% @doc Creates or updates the tree node(s) matching `PathPattern' with
%% `Payload'.
%%
%% Missing parent tree nodes are created on the way, without a payload. On
%% success, the result maps each created or updated path to its properties,
%% and the projection and trigger side effects of the change are returned as
%% well. On error, `State' is returned unmodified.
%%
%% When `Extra' has a `keep_while' option, the conditions it places on other
%% tree nodes must already be met for the node to be inserted, and the
%% conditions are then recorded for the resolved path.
%%
%% @private

insert_or_update_node(
  #?MODULE{root = Root,
           keep_while_conds = KeepWhileConds,
           keep_while_conds_revidx = KeepWhileCondsRevIdx} = State,
  PathPattern, Payload,
  #{keep_while := KeepWhile},
  TreeOptions) ->
    %% The walk accumulator starts as `{undefined, [], #{}}' and becomes
    %% `{updated, LastVisitedPath, Result}' as soon as `Fun' is called once.
    Fun = fun(Path, Node, {_, _, Result}) ->
                  Ret = insert_or_update_node_cb(
                          Path, Node, Payload, TreeOptions, Result),
                  case Ret of
                      {ok, Node1, Result1} when Result1 =/= #{} ->
                          %% At least one target node received the payload:
                          %% check the keep_while conditions targeting other
                          %% tree nodes against the tree as it was before
                          %% this command (a condition on `Path' itself is
                          %% not checked here).
                          AbsKeepWhile = to_absolute_keep_while(
                                           Path, KeepWhile),
                          KeepWhileOnOthers = maps:remove(Path, AbsKeepWhile),
                          KWMet = are_keep_while_conditions_met(
                                    Root, KeepWhileOnOthers),
                          case KWMet of
                              true ->
                                  {ok, Node1, {updated, Path, Result1}};
                              {false, Reason} ->
                                  %% The keep_while condition is not met. We
                                  %% can't insert the node and return an
                                  %% error instead.
                                  NodeName = case Path of
                                                 [] -> ?KHEPRI_ROOT_NODE;
                                                 _  -> lists:last(Path)
                                             end,
                                  Reason1 = ?khepri_error(
                                               keep_while_conditions_not_met,
                                               #{node_name => NodeName,
                                                 node_path => Path,
                                                 keep_while_reason => Reason}),
                                  {error, Reason1}
                          end;
                      {ok, Node1, Result1} ->
                          %% A parent node created on the way to the target.
                          {ok, Node1, {updated, Path, Result1}};
                      Error ->
                          Error
                  end
          end,
    Ret1 = walk_down_the_tree(
             Root, PathPattern, TreeOptions,
             #{keep_while_conds => KeepWhileConds,
               keep_while_conds_revidx => KeepWhileCondsRevIdx,
               applied_changes => #{}},
             Fun, {undefined, [], #{}}),
    case Ret1 of
        {ok, Root1, #{applied_changes := AppliedChanges} = Extra,
         {updated, ResolvedPath, Ret2}} ->
            %% NOTE(review): only the last visited path gets the keep_while
            %% conditions recorded; confirm this is intended when the pattern
            %% matches several tree nodes.
            #{keep_while_conds := KeepWhileConds1,
              keep_while_conds_revidx := KeepWhileCondsRevIdx1} =
            update_keep_while_conds_extra(Extra, ResolvedPath, KeepWhile),
            State1 = State#?MODULE{
                              root = Root1,
                              keep_while_conds = KeepWhileConds1,
                              keep_while_conds_revidx = KeepWhileCondsRevIdx1},
            {State2, SideEffects} = create_tree_change_side_effects(
                                      State, State1, Ret2, AppliedChanges),
            {State2, {ok, Ret2}, SideEffects};
        {ok, _, _, {undefined, _, _}} ->
            %% `Fun' was never called: no tree node matched `PathPattern'
            %% (this can happen when the pattern isn't expected to target a
            %% specific node). The tree is unchanged, so there is nothing to
            %% record and no side effect to emit. Without this clause, this
            %% legitimate outcome would trip the assertion below, or be
            %% returned as the command result when assertions are disabled.
            {State, {ok, #{}}};
        Error ->
            ?assertMatch({error, _}, Error),
            {State, Error}
    end;
insert_or_update_node(
  #?MODULE{root = Root,
           keep_while_conds = KeepWhileConds,
           keep_while_conds_revidx = KeepWhileCondsRevIdx} = State,
  PathPattern, Payload,
  _Extra, TreeOptions) ->
    %% Without keep_while conditions, the walk accumulator is directly the
    %% result map.
    Fun = fun(Path, Node, Result) ->
                  insert_or_update_node_cb(
                    Path, Node, Payload, TreeOptions, Result)
          end,
    Ret1 = walk_down_the_tree(
             Root, PathPattern, TreeOptions,
             #{keep_while_conds => KeepWhileConds,
               keep_while_conds_revidx => KeepWhileCondsRevIdx,
               applied_changes => #{}},
             Fun, #{}),
    case Ret1 of
        {ok, Root1, #{keep_while_conds := KeepWhileConds1,
                      keep_while_conds_revidx := KeepWhileCondsRevIdx1,
                      applied_changes := AppliedChanges},
         Ret2} ->
            State1 = State#?MODULE{root = Root1,
                                   keep_while_conds = KeepWhileConds1,
                                   keep_while_conds_revidx =
                                   KeepWhileCondsRevIdx1},
            {State2, SideEffects} = create_tree_change_side_effects(
                                      State, State1, Ret2, AppliedChanges),
            {State2, {ok, Ret2}, SideEffects};
        Error ->
            ?assertMatch({error, _}, Error),
            {State, Error}
    end.

%% Walk callback used by `insert_or_update_node/5'.
%%
%% `Result' maps the path of every tree node which received `Payload' during
%% this walk to the properties collected for it. A path already present in
%% `Result' is left untouched.
insert_or_update_node_cb(
  Path, #node{} = Node, Payload, TreeOptions, Result) ->
    case Result of
        #{Path := _} ->
            %% This tree node was already updated during this walk.
            {ok, Node, Result};
        _ ->
            %% Properties come from the updated `#node{}', except for the
            %% payload which is taken from the previous version.
            UpdatedNode = set_node_payload(Node, Payload),
            Props = gather_node_props_from_old_and_new_nodes(
                      Node, UpdatedNode, TreeOptions),
            {ok, UpdatedNode, Result#{Path => Props}}
    end;
insert_or_update_node_cb(
  Path, {interrupted, node_not_found = Reason, Info}, Payload, TreeOptions,
  Result) ->
    %% Only the targeted tree node receives `Payload'; the missing parents
    %% created on the way to it get no payload.
    IsTarget = maps:get(node_is_target, Info),
    case {can_continue_update_after_node_not_found(Info), IsTarget} of
        {true, true} ->
            NewNode = create_node_record(Payload),
            Props = gather_node_props_from_old_and_new_nodes(
                      undefined, NewNode, TreeOptions),
            {ok, NewNode, Result#{Path => Props}};
        {true, _} ->
            {ok, create_node_record(khepri_payload:none()), Result};
        {false, _} ->
            {error, ?khepri_error(Reason, Info)}
    end;
insert_or_update_node_cb(_Path, {interrupted, Reason, Info}, _Payload,
                         _TreeOptions, _Result) ->
    %% Any other interruption aborts the update.
    {error, ?khepri_error(Reason, Info)}.

%% Tells if a missing tree node may be created during an update.
%%
%% The decision is based on the condition found in the interruption info, or
%% on the plain node name when there is no condition.
can_continue_update_after_node_not_found(Info) ->
    Component = case Info of
                    #{condition := Condition} -> Condition;
                    #{node_name := NodeName}  -> NodeName
                end,
    can_continue_update_after_node_not_found1(Component).

%% Tells if `Component' allows a missing tree node to be created.
%%
%% Plain node names always do, as does `#if_node_exists{exists = false}'.
%% Compound conditions are evaluated recursively: `#if_all{}' needs every
%% sub-condition to allow it, `#if_any{}' needs at least one. Any other
%% condition forbids it.
can_continue_update_after_node_not_found1(Component)
  when ?IS_KHEPRI_PATH_COMPONENT(Component) ->
    true;
can_continue_update_after_node_not_found1(#if_node_exists{exists = Exists}) ->
    Exists =:= false;
can_continue_update_after_node_not_found1(#if_all{conditions = SubConds}) ->
    lists:all(fun can_continue_update_after_node_not_found1/1, SubConds);
can_continue_update_after_node_not_found1(#if_any{conditions = SubConds}) ->
    lists:any(fun can_continue_update_after_node_not_found1/1, SubConds);
can_continue_update_after_node_not_found1(_Condition) ->
    false.

-spec delete_matching_nodes(State, PathPattern, TreeOptions) -> Ret when
      State :: state(),
      PathPattern :: khepri_path:native_pattern(),
      TreeOptions :: khepri:tree_options(),
      Ret :: {State, Result} | {State, Result, ra_machine:effects()},
      Result :: khepri_machine:common_ret().
%% @doc Deletes the tree nodes matching `PathPattern'.
%%
%% On success, the result maps each deleted path to the properties of the
%% tree node, and the projection and trigger side effects of the deletion are
%% returned. On error, `State' is returned unmodified.
%%
%% @private

delete_matching_nodes(
  #?MODULE{root = Root,
           keep_while_conds = KeepWhileConds,
           keep_while_conds_revidx = KeepWhileCondsRevIdx} = State,
  PathPattern,
  TreeOptions) ->
    InitialExtra = #{keep_while_conds => KeepWhileConds,
                     keep_while_conds_revidx => KeepWhileCondsRevIdx,
                     applied_changes => #{}},
    WalkRet = do_delete_matching_nodes(
                PathPattern, Root, InitialExtra, TreeOptions),
    case WalkRet of
        {ok, NewRoot,
         #{keep_while_conds := NewKeepWhileConds,
           keep_while_conds_revidx := NewKeepWhileCondsRevIdx,
           applied_changes := AppliedChanges},
         Deleted} ->
            NewState = State#?MODULE{
                                root = NewRoot,
                                keep_while_conds = NewKeepWhileConds,
                                keep_while_conds_revidx =
                                NewKeepWhileCondsRevIdx},
            {NewState1, SideEffects} = create_tree_change_side_effects(
                                         State, NewState, Deleted,
                                         AppliedChanges),
            {NewState1, {ok, Deleted}, SideEffects};
        Error ->
            %% The walk failed: the state is left as is.
            {State, Error}
    end.

%% Walks the tree along `PathPattern' and removes every matching tree node
%% (the root node is emptied instead). The properties of removed nodes are
%% collected in the walk accumulator, keyed by path.
do_delete_matching_nodes(PathPattern, Root, Extra, TreeOptions) ->
    walk_down_the_tree(
      Root, PathPattern, TreeOptions, Extra,
      fun(Path, NodeOrInterruption, Deleted) ->
              delete_matching_nodes_cb(
                Path, NodeOrInterruption, TreeOptions, Deleted)
      end,
      #{}).

%% Walk callback used by `do_delete_matching_nodes/4'.
%%
%% The properties of each matching tree node, taken before its removal, are
%% recorded in `Result' under its path. The root node (path `[]') can't be
%% removed: its payload and child nodes are dropped instead. Interruptions
%% are ignored.
delete_matching_nodes_cb([] = Path, #node{} = RootNode, TreeOptions, Result) ->
    Props = gather_node_props(RootNode, TreeOptions),
    EmptiedRoot = remove_node_child_nodes(remove_node_payload(RootNode)),
    {ok, EmptiedRoot, Result#{Path => Props}};
delete_matching_nodes_cb(Path, #node{} = Node, TreeOptions, Result) ->
    {ok, delete, Result#{Path => gather_node_props(Node, TreeOptions)}};
delete_matching_nodes_cb(_Path, {interrupted, _Reason, _Info}, _TreeOptions,
                         Result) ->
    {ok, keep, Result}.

%% Computes the projection and trigger side effects of a tree change.
%%
%% Returns the new state (as updated by the trigger evaluation) along with
%% the projection effects followed by the trigger effects.
create_tree_change_side_effects(
  InitialState, NewState, Ret, KeepWhileAftermath) ->
    %% Classify every affected tree node by type of change. On the same path,
    %% the entry from `KeepWhileAftermath' overrides the one from `Ret'.
    %% An empty property map is reported as a creation, any other map as an
    %% update.
    ChangeTypes = maps:map(
                    fun(_Path, delete) ->
                            delete;
                       (_Path, NodeProps) when NodeProps =:= #{} ->
                            create;
                       (_Path, #{}) ->
                            update
                    end,
                    maps:merge(Ret, KeepWhileAftermath)),
    ProjectionEffects = create_projection_side_effects(
                          InitialState, NewState, ChangeTypes),
    {NewState1, TriggerEffects} = create_trigger_side_effects(
                                    InitialState, NewState, ChangeTypes),
    {NewState1, ProjectionEffects ++ TriggerEffects}.

%% Returns the `aux' effects updating projections for the given `Changes'.
%%
%% Every change is checked against every projection registered in the new
%% state.
create_projection_side_effects(
  #?MODULE{root = InitialRoot} = _InitialState,
  #?MODULE{projections = Projections, root = NewRoot} = _NewState,
  Changes) ->
    %% Note: the order in which updates are applied to projections should not
    %% be relied on.
    EvaluateChange =
    fun(Path, Change, Acc) ->
            maps:fold(
              fun(Projection, Pattern, Acc1) ->
                      evaluate_projection(
                        InitialRoot, NewRoot,
                        Path, Change, Pattern, Projection, Acc1)
              end, Acc, Projections)
    end,
    ReversedEffects = maps:fold(EvaluateChange, [], Changes),
    lists:reverse(ReversedEffects).

%% Prepends an `{aux, #trigger_projection{}}' effect to `Effects' if `Path'
%% matches the projection's `Pattern'.
%%
%% Creations and updates are matched against the new tree, deletions against
%% the initial tree (where the deleted node still exists). The effect carries
%% the tree node properties before and after the change; `#{}' is used when
%% the lookup in the corresponding tree fails (e.g. the node is absent).
evaluate_projection(
  InitialRoot, NewRoot, Path, Change, Pattern, Projection, Effects) ->
    PathMatchingRoot = case Change of
                           delete ->
                               InitialRoot;
                           _ when Change =:= create orelse Change =:= update ->
                               NewRoot
                       end,
    case does_path_match(Path, Pattern, [], PathMatchingRoot) of
        false ->
            Effects;
        true ->
            FindOptions = #{props_to_return => ?PROJECTION_PROPS_TO_RETURN,
                            expect_specific_node => true},
            PropsIn = fun(TreeRoot) ->
                              case find_matching_nodes(
                                     TreeRoot, Path, FindOptions) of
                                  {ok, #{Path := FoundProps}} -> FoundProps;
                                  _                           -> #{}
                              end
                      end,
            OldProps = PropsIn(InitialRoot),
            NewProps = PropsIn(NewRoot),
            Trigger = #trigger_projection{path = Path,
                                          old_props = OldProps,
                                          new_props = NewProps,
                                          projection = Projection},
            [{aux, Trigger} | Effects]
    end.

%% Evaluates the triggers against `Changes' and returns the updated state
%% along with the effect waking up the event handler.
%%
%% Triggers are taken from the initial state, while the tree is taken from
%% the new state: triggers are evaluated as they were when the change to the
%% tree was requested, even if the same command updated them.
create_trigger_side_effects(
  #?MODULE{triggers = Triggers} = _InitialState, NewState, _Changes)
  when map_size(Triggers) =:= 0 ->
    %% No triggers registered: nothing to evaluate, no effect.
    {NewState, []};
create_trigger_side_effects(
  #?MODULE{triggers = Triggers,
           emitted_triggers = EmittedTriggers} = _InitialState,
  #?MODULE{config = #config{store_id = StoreId},
           root = Root} = NewState,
  Changes) ->
    Triggered = list_triggered_sprocs(Root, Changes, Triggers),

    %% The triggered stored procedures are remembered in the state machine's
    %% state to guaranty at-least-once execution: the event handler process
    %% is supposed to ack each execution, and if the Ra cluster changes
    %% leader in between, we know the execution must be retried.
    %%
    %% The same trigger may thus be executed more than once, therefore the
    %% stored procedure must be idempotent.
    NewState1 = NewState#?MODULE{
                           emitted_triggers = EmittedTriggers ++ Triggered},

    %% The `mod_call' effect wakes up the event handler process so it doesn't
    %% have to poll the internal list.
    WakeUp = {mod_call,
              khepri_event_handler,
              handle_triggered_sprocs,
              [StoreId, Triggered]},
    {NewState1, [WakeUp]}.

%% Returns the stored procedures fired by `Changes', in execution order.
list_triggered_sprocs(Root, Changes, Triggers) ->
    %% Every change is evaluated against every trigger.
    CollectForChange =
    fun(Path, Change, Acc) ->
            maps:fold(
              fun(TriggerId, TriggerProps, Acc1) ->
                      evaluate_trigger(
                        Root, Path, Change, TriggerId, TriggerProps, Acc1)
              end, Acc, Triggers)
    end,
    Unsorted = maps:fold(CollectForChange, [], Changes),
    sort_triggered_sprocs(Unsorted).

%% Prepends a `#triggered{}' record to `TriggeredStoredProcs' if the trigger
%% described by `TriggerProps' is fired by the `Change' made to `Path'.
%%
%% Only triggers with a tree event filter (`#evf_tree{}') are evaluated:
%%   1. the changed path must match the filter's path pattern, and
%%   2. the change type must be listed in the filter's `on_actions' (all of
%%      `create', `update' and `delete' when that list is empty or absent).
%% The trigger is a no-op if its stored procedure path doesn't point to a
%% stored procedure in `Root'.
%%
%% NOTE(review): `Root' is the tree after the change, even for `delete'
%% changes, so a condition in the filter's path pattern may need a tree node
%% which no longer exists. `evaluate_projection/7' uses the initial tree for
%% deletions; confirm whether triggers should do the same.
evaluate_trigger(
  Root, Path, Change, TriggerId,
  #{sproc := StoredProcPath,
    event_filter := #evf_tree{path = PathPattern,
                              props = EventFilterProps} = EventFilter},
  TriggeredStoredProcs) ->
    PathMatches = does_path_match(Path, PathPattern, [], Root),
    WatchedChanges = case EventFilterProps of
                         #{on_actions := [_ | _] = OnActions} ->
                             OnActions;
                         _ ->
                             [create, update, delete]
                     end,
    ChangeMatches = lists:member(Change, WatchedChanges),
    %% The stored procedure is only looked up for a matching event.
    %%
    %% TODO: Should we return an error or at least log something when it is
    %% missing? This could be considered noise if the trigger exists
    %% regardless of the presence of the stored procedure on purpose (for
    %% instance the caller code is being updated).
    StoredProc = case PathMatches andalso ChangeMatches of
                     true  -> find_stored_proc(Root, StoredProcPath);
                     false -> undefined
                 end,
    case StoredProc of
        undefined ->
            TriggeredStoredProcs;
        _ ->
            %% TODO: Use a record to format stored procedure arguments?
            Triggered = #triggered{id = TriggerId,
                                   event_filter = EventFilter,
                                   sproc = StoredProc,
                                   props = #{path => Path,
                                             on_action => Change}},
            [Triggered | TriggeredStoredProcs]
    end;
evaluate_trigger(
  _Root, _Path, _Change, _TriggerId, _TriggerProps, TriggeredStoredProcs) ->
    %% Any other trigger (e.g. without a tree event filter) is ignored.
    TriggeredStoredProcs.

%% Tells if the tree node path `Path' matches `PathPattern'.
%%
%% Literal path components are compared directly. For conditions, the tree
%% node at the corresponding path is queried in `Root' so the condition can
%% be evaluated against its properties; a condition which applies to
%% grandchildren may consume several path components.
%%
%% If that tree node can't be found in `Root' (for instance because the change
%% being evaluated removed it from the tree passed by the caller), the
%% condition can't be evaluated and the path is considered as not matching.
%% Previously, this case crashed with a `badmatch'.
does_path_match(PathRest, PathRest, _ReversedPath, _Root) ->
    true;
does_path_match([], _PathPatternRest, _ReversedPath, _Root) ->
    false;
does_path_match(_PathRest, [], _ReversedPath, _Root) ->
    false;
does_path_match(
  [Component | Path], [Component | PathPattern], ReversedPath, Root)
  when ?IS_KHEPRI_PATH_COMPONENT(Component) ->
    does_path_match(Path, PathPattern, [Component | ReversedPath], Root);
does_path_match(
  [Component | _Path], [Condition | _PathPattern], _ReversedPath, _Root)
  when ?IS_KHEPRI_PATH_COMPONENT(Component) andalso
       ?IS_KHEPRI_PATH_COMPONENT(Condition) ->
    false;
does_path_match(
  [Component | Path], [Condition | PathPattern], ReversedPath, Root) ->
    ReversedPath1 = [Component | ReversedPath],
    CurrentPath = lists:reverse(ReversedPath1),
    TreeOptions = #{expect_specific_node => true,
                    props_to_return => [payload,
                                        payload_version,
                                        child_list_version,
                                        child_list_length]},
    %% Query the tree node, required to evaluate the condition.
    case find_matching_nodes(Root, CurrentPath, TreeOptions) of
        {ok, #{CurrentPath := Node}} ->
            case khepri_condition:is_met(Condition, Component, Node) of
                true ->
                    %% Either the condition also covers the next component
                    %% (grandchildren), or the rest of the pattern must match
                    %% the rest of the path.
                    (khepri_condition:applies_to_grandchildren(Condition)
                     andalso does_path_match(
                               Path, [Condition | PathPattern],
                               ReversedPath1, Root))
                    orelse does_path_match(
                             Path, PathPattern, ReversedPath1, Root);
                {false, _} ->
                    false
            end;
        _ ->
            %% The tree node is missing from `Root': the condition can't be
            %% evaluated, so the path doesn't match.
            false
    end.

%% Returns the stored procedure stored at `StoredProcPath' in `Root', or
%% `undefined' if there is none.
find_stored_proc(Root, StoredProcPath) ->
    FindOptions = #{expect_specific_node => true,
                    props_to_return => [payload,
                                        payload_version,
                                        child_list_version,
                                        child_list_length]},
    case find_matching_nodes(Root, StoredProcPath, FindOptions) of
        {ok, #{StoredProcPath := #{sproc := StoredProc}}} ->
            StoredProc;
        _ ->
            %% Missing tree node, or a tree node which doesn't hold a stored
            %% procedure: both are ignored.
            undefined
    end.

%% Orders triggered stored procedures by execution order.
%%
%% A greater priority (any integer, possibly negative; 0 by default) runs
%% first. With equal priorities, trigger IDs are compared in ascending term
%% order, so an ID earlier in alphabetical order runs first.
sort_triggered_sprocs(TriggeredStoredProcs) ->
    RunsFirst = fun(#triggered{id = IdA, event_filter = EventFilterA},
                    #triggered{id = IdB, event_filter = EventFilterB}) ->
                        PrioA = khepri_evf:get_priority(EventFilterA),
                        PrioB = khepri_evf:get_priority(EventFilterB),
                        case PrioA =:= PrioB of
                            true  -> IdA =< IdB;
                            false -> PrioA > PrioB
                        end
                end,
    lists:sort(RunsFirst, TriggeredStoredProcs).

%% -------

-spec walk_down_the_tree(
        Root, PathPattern, TreeOptions, Extra, Fun, FunAcc) -> Ret when
      Root :: tree_node(),
      PathPattern :: khepri_path:native_pattern(),
      TreeOptions :: khepri:tree_options(),
      Extra :: walk_down_the_tree_extra(),
      Fun :: walk_down_the_tree_fun(),
      FunAcc :: any(),
      Node :: tree_node(),
      Ret :: ok(Node, Extra, FunAcc) | khepri:error().
%% @private

walk_down_the_tree(Root, PathPattern, TreeOptions, Extra, Fun, FunAcc) ->
    CompiledPathPattern = khepri_path:compile(PathPattern),
    %% `expect_specific_node' is always set from here on, so the clauses of
    %% `walk_down_the_tree1/8' can match on it: anything but `true'
    %% (including a missing key) is normalized to `false'.
    TreeOptions1 = case maps:get(expect_specific_node, TreeOptions, false) of
                       true -> TreeOptions;
                       _    -> TreeOptions#{expect_specific_node => false}
                   end,
    %% Reversed path of the tree node currently on.
    ReversedPath = [],
    %% Parents to update when going back up in the tail-recursive walk.
    ReversedParentTree = [],
    walk_down_the_tree1(
      Root, CompiledPathPattern, TreeOptions1,
      ReversedPath, ReversedParentTree,
      Extra, Fun, FunAcc).

-spec walk_down_the_tree1(
        Root, CompiledPathPattern, TreeOptions,
        ReversedPath, ReversedParentTree, Extra, Fun, FunAcc) -> Ret when
      Root :: tree_node(),
      CompiledPathPattern :: khepri_path:native_pattern(),
      TreeOptions :: khepri:tree_options(),
      ReversedPath :: khepri_path:native_pattern(),
      ReversedParentTree :: [Node | {Node, child_created}],
      Extra :: walk_down_the_tree_extra(),
      Fun :: walk_down_the_tree_fun(),
      FunAcc :: any(),
      Node :: tree_node(),
      Ret :: ok(Node, Extra, FunAcc) | khepri:error().
%% @private

walk_down_the_tree1(
  CurrentNode,
  [?KHEPRI_ROOT_NODE | PathPattern],
  TreeOptions, ReversedPath, ReversedParentTree, Extra, Fun, FunAcc) ->
    ?assertEqual([], ReversedPath),
    ?assertEqual([], ReversedParentTree),
    walk_down_the_tree1(
      CurrentNode, PathPattern, TreeOptions,
      ReversedPath,
      ReversedParentTree,
      Extra, Fun, FunAcc);
walk_down_the_tree1(
  CurrentNode,
  [?THIS_KHEPRI_NODE | PathPattern],
  TreeOptions, ReversedPath, ReversedParentTree, Extra, Fun, FunAcc) ->
    walk_down_the_tree1(
      CurrentNode, PathPattern, TreeOptions,
      ReversedPath, ReversedParentTree, Extra, Fun, FunAcc);
walk_down_the_tree1(
  _CurrentNode,
  [?PARENT_KHEPRI_NODE | PathPattern],
  TreeOptions,
  [_CurrentName | ReversedPath], [ParentNode0 | ReversedParentTree],
  Extra, Fun, FunAcc) ->
    ParentNode = case ParentNode0 of
                     {PN, child_created} -> PN;
                     _                   -> ParentNode0
                 end,
    walk_down_the_tree1(
      ParentNode, PathPattern, TreeOptions,
      ReversedPath, ReversedParentTree, Extra, Fun, FunAcc);
walk_down_the_tree1(
  CurrentNode,
  [?PARENT_KHEPRI_NODE | PathPattern],
  TreeOptions,
  [] = ReversedPath, [] = ReversedParentTree,
  Extra, Fun, FunAcc) ->
    %% The path tries to go above the root node, like "cd /..". In this case,
    %% we stay on the root node.
    walk_down_the_tree1(
      CurrentNode, PathPattern, TreeOptions,
      ReversedPath, ReversedParentTree, Extra, Fun, FunAcc);
walk_down_the_tree1(
  #node{child_nodes = Children} = CurrentNode,
  [ChildName | PathPattern],
  TreeOptions, ReversedPath, ReversedParentTree, Extra, Fun, FunAcc)
  when ?IS_KHEPRI_NODE_ID(ChildName) ->
    case Children of
        #{ChildName := Child} ->
            walk_down_the_tree1(
              Child, PathPattern, TreeOptions,
              [ChildName | ReversedPath],
              [CurrentNode | ReversedParentTree],
              Extra, Fun, FunAcc);
        _ ->
            interrupted_walk_down(
              node_not_found,
              #{node_name => ChildName,
                node_path => lists:reverse([ChildName | ReversedPath])},
              PathPattern, TreeOptions,
              [ChildName | ReversedPath],
              [CurrentNode | ReversedParentTree],
              Extra, Fun, FunAcc)
    end;
walk_down_the_tree1(
  #node{child_nodes = Children} = CurrentNode,
  [Condition | PathPattern], #{expect_specific_node := true} = TreeOptions,
  ReversedPath, ReversedParentTree, Extra, Fun, FunAcc)
  when ?IS_KHEPRI_CONDITION(Condition) ->
    %% We distinguish the case where the condition must be verified against the
    %% current node (i.e. the node name is ?KHEPRI_ROOT_NODE or
    %% ?THIS_KHEPRI_NODE in the condition) instead of its child nodes.
    SpecificNode = khepri_path:component_targets_specific_node(Condition),
    case SpecificNode of
        {true, NodeName}
          when NodeName =:= ?KHEPRI_ROOT_NODE orelse
               NodeName =:= ?THIS_KHEPRI_NODE ->
            CurrentName = special_component_to_node_name(
                            NodeName, ReversedPath),
            CondMet = khepri_condition:is_met(
                        Condition, CurrentName, CurrentNode),
            case CondMet of
                true ->
                    walk_down_the_tree1(
                      CurrentNode, PathPattern, TreeOptions,
                      ReversedPath, ReversedParentTree,
                      Extra, Fun, FunAcc);
                {false, Cond} ->
                    interrupted_walk_down(
                      mismatching_node,
                      #{node_name => CurrentName,
                        node_path => lists:reverse(ReversedPath),
                        node_props => gather_node_props_for_error(CurrentNode),
                        condition => Cond},
                      PathPattern, TreeOptions, ReversedPath,
                      ReversedParentTree, Extra, Fun, FunAcc)
            end;
        {true, ChildName} when ChildName =/= ?PARENT_KHEPRI_NODE ->
            case Children of
                #{ChildName := Child} ->
                    CondMet = khepri_condition:is_met(
                                Condition, ChildName, Child),
                    case CondMet of
                        true ->
                            walk_down_the_tree1(
                              Child, PathPattern, TreeOptions,
                              [ChildName | ReversedPath],
                              [CurrentNode | ReversedParentTree],
                              Extra, Fun, FunAcc);
                        {false, Cond} ->
                            interrupted_walk_down(
                              mismatching_node,
                              #{node_name => ChildName,
                                node_path => lists:reverse(
                                               [ChildName | ReversedPath]),
                                node_props => gather_node_props_for_error(
                                                Child),
                                condition => Cond},
                              PathPattern, TreeOptions,
                              [ChildName | ReversedPath],
                              [CurrentNode | ReversedParentTree],
                              Extra, Fun, FunAcc)
                    end;
                _ ->
                    interrupted_walk_down(
                      node_not_found,
                      #{node_name => ChildName,
                        node_path => lists:reverse([ChildName | ReversedPath]),
                        condition => Condition},
                      PathPattern, TreeOptions,
                      [ChildName | ReversedPath],
                      [CurrentNode | ReversedParentTree],
                      Extra, Fun, FunAcc)
            end;
        {true, ?PARENT_KHEPRI_NODE} ->
            %% TODO: Support calling Fun() with parent node based on
            %% conditions on child nodes.
            BadPathPattern =
            lists:reverse(ReversedPath, [Condition | PathPattern]),
            Exception = ?khepri_exception(
                           condition_targets_parent_node,
                           #{path => BadPathPattern,
                             condition => Condition}),
            {error, Exception};
        false ->
            %% The caller expects that the path matches a single specific node
            %% (no matter if it exists or not), but the condition could match
            %% several nodes.
            BadPathPattern =
            lists:reverse(ReversedPath, [Condition | PathPattern]),
            Exception = ?khepri_exception(
                           possibly_matching_many_nodes_denied,
                           #{path => BadPathPattern}),
            {error, Exception}
    end;
walk_down_the_tree1(
  #node{child_nodes = Children} = CurrentNode,
  [Condition | PathPattern] = WholePathPattern,
  #{expect_specific_node := false} = TreeOptions,
  ReversedPath, ReversedParentTree, Extra, Fun, FunAcc)
  when ?IS_KHEPRI_CONDITION(Condition) ->
    %% Like with "expect_specific_node =:= true" function clause above, We
    %% distinguish the case where the condition must be verified against the
    %% current node (i.e. the node name is ?KHEPRI_ROOT_NODE or
    %% ?THIS_KHEPRI_NODE in the condition) instead of its child nodes.
    SpecificNode = khepri_path:component_targets_specific_node(Condition),
    case SpecificNode of
        {true, NodeName}
          when NodeName =:= ?KHEPRI_ROOT_NODE orelse
               NodeName =:= ?THIS_KHEPRI_NODE ->
            CurrentName = special_component_to_node_name(
                            NodeName, ReversedPath),
            CondMet = khepri_condition:is_met(
                        Condition, CurrentName, CurrentNode),
            case CondMet of
                true ->
                    walk_down_the_tree1(
                      CurrentNode, PathPattern, TreeOptions,
                      ReversedPath, ReversedParentTree,
                      Extra, Fun, FunAcc);
                {false, _} ->
                    StartingNode = starting_node_in_rev_parent_tree(
                                     ReversedParentTree, CurrentNode),
                    {ok, StartingNode, Extra, FunAcc}
            end;
        {true, ?PARENT_KHEPRI_NODE} ->
            %% TODO: Support calling Fun() with parent node based on
            %% conditions on child nodes.
            BadPathPattern =
            lists:reverse(ReversedPath, [Condition | PathPattern]),
            Exception = ?khepri_exception(
                           condition_targets_parent_node,
                           #{path => BadPathPattern,
                             condition => Condition}),
            {error, Exception};
        _ ->
            %% There is a special case if the current node is the root node.
            %% The caller can request that the root node's properties are
            %% included. This is true by default if the path is `[]'. This
            %% allows to get its props and payload atomically in a single
            %% query.
            IsRoot = ReversedPath =:= [],
            IncludeRootProps = maps:get(
                                 include_root_props, TreeOptions, false),
            Ret0 = case IsRoot andalso IncludeRootProps of
                       true ->
                           walk_down_the_tree1(
                             CurrentNode, [], TreeOptions,
                             [], [],
                             Extra, Fun, FunAcc);
                       _ ->
                           {ok, CurrentNode, Extra, FunAcc}
                   end,

            %% The result of the first part (the special case for the root
            %% node if relevant) is used as a starting point for handling all
            %% child nodes.
            Ret1 = maps:fold(
                     fun
                         (ChildName, Child, {ok, CurNode, Extra1, FunAcc1}) ->
                             handle_branch(
                               CurNode, ChildName, Child,
                               WholePathPattern, TreeOptions,
                               ReversedPath,
                               Extra1, Fun, FunAcc1);
                         (_, _, Error) ->
                             Error
                     end, Ret0, Children),
            case Ret1 of
                {ok, CurrentNode, Extra, FunAcc2} ->
                    %% The current node didn't change, no need to update the
                    %% tree and evaluate keep_while conditions.
                    StartingNode = starting_node_in_rev_parent_tree(
                                     ReversedParentTree, CurrentNode),
                    {ok, StartingNode, Extra, FunAcc2};
                {ok, CurrentNode1, Extra2, FunAcc2} ->
                    CurrentNode2 = case CurrentNode1 of
                                       CurrentNode ->
                                           CurrentNode;
                                       delete ->
                                           CurrentNode1;
                                       _ ->
                                           %% Because of the loop, payload &
                                           %% child list versions may have
                                           %% been increased multiple times.
                                           %% We want them to increase once
                                           %% for the whole (atomic)
                                           %% operation.
                                           squash_version_bumps(
                                             CurrentNode, CurrentNode1)
                                   end,
                    walk_back_up_the_tree(
                      CurrentNode2, ReversedPath, ReversedParentTree,
                      Extra2, FunAcc2);
                Error ->
                    Error
            end
    end;
walk_down_the_tree1(
  #node{} = CurrentNode, [], _,
  ReversedPath, ReversedParentTree, Extra, Fun, FunAcc) ->
    CurrentPath = lists:reverse(ReversedPath),
    case Fun(CurrentPath, CurrentNode, FunAcc) of
        {ok, keep, FunAcc1} ->
            StartingNode = starting_node_in_rev_parent_tree(
                             ReversedParentTree, CurrentNode),
            {ok, StartingNode, Extra, FunAcc1};
        {ok, delete, FunAcc1} ->
            walk_back_up_the_tree(
              delete, ReversedPath, ReversedParentTree, Extra, FunAcc1);
        {ok, #node{} = CurrentNode1, FunAcc1} ->
            walk_back_up_the_tree(
              CurrentNode1, ReversedPath, ReversedParentTree, Extra, FunAcc1);
        Error ->
            Error
    end.

-spec special_component_to_node_name(SpecialComponent, ReversedPath) ->
    NodeName when
      SpecialComponent :: ?KHEPRI_ROOT_NODE | ?THIS_KHEPRI_NODE,
      ReversedPath :: khepri_path:native_path(),
      NodeName :: khepri_path:component().
%% @doc Resolves a special path component into a concrete node name.
%%
%% `?THIS_KHEPRI_NODE' resolves to the head of `ReversedPath' (the last
%% component of the path walked so far), or to the root node name when
%% `ReversedPath' is empty. `?KHEPRI_ROOT_NODE' is only accepted with an empty
%% `ReversedPath'; any other combination raises `function_clause'.
%%
%% @private

special_component_to_node_name(?THIS_KHEPRI_NODE, [CurrentName | _]) ->
    CurrentName;
special_component_to_node_name(?THIS_KHEPRI_NODE, []) ->
    ?KHEPRI_ROOT_NODE;
special_component_to_node_name(?KHEPRI_ROOT_NODE, []) ->
    ?KHEPRI_ROOT_NODE.

-spec starting_node_in_rev_parent_tree(ReversedParentTree) -> Node when
      Node :: tree_node(),
      ReversedParentTree :: [Node | {Node, child_created}].
%% @doc Returns the node the walk started from, i.e. the last element of
%% `ReversedParentTree'.
%%
%% Parent trees may contain `{Node, child_created}' markers, recorded when a
%% child of `Node' was created during the walk. The starting node itself can
%% carry such a marker, so it is stripped and a bare tree node is always
%% returned. Raises if `ReversedParentTree' is empty.
%%
%% @private

starting_node_in_rev_parent_tree(ReversedParentTree) ->
    case lists:last(ReversedParentTree) of
        {StartingNode, child_created} ->
            %% A tree node record has more than one field, so it never has
            %% this 2-tuple shape: this only matches the creation marker.
            StartingNode;
        StartingNode ->
            StartingNode
    end.

-spec starting_node_in_rev_parent_tree(ReversedParentTree, Node) -> Node when
      Node :: tree_node(),
      ReversedParentTree :: [Node].
%% @doc Same as `starting_node_in_rev_parent_tree/1', except that an empty
%% parent tree means `CurrentNode' is itself the starting node, which is then
%% returned.
%%
%% @private

starting_node_in_rev_parent_tree(ReversedParentTree, CurrentNode) ->
    case ReversedParentTree of
        [] ->
            CurrentNode;
        _ ->
            starting_node_in_rev_parent_tree(ReversedParentTree)
    end.

-spec handle_branch(
        Node, ChildName, Child, WholePathPattern, TreeOptions,
        ReversedPath, Extra, Fun, FunAcc) -> Ret when
      Node :: tree_node(),
      ChildName :: khepri_path:component(),
      Child :: tree_node(),
      WholePathPattern :: khepri_path:native_pattern(),
      TreeOptions :: khepri:tree_options(),
      ReversedPath :: khepri_path:native_path(),
      Extra :: walk_down_the_tree_extra(),
      Fun :: walk_down_the_tree_fun(),
      FunAcc :: any(),
      Ret :: ok(Node, Extra, FunAcc) | khepri:error().
%% @doc Applies the head condition of `WholePathPattern' to one child of
%% `CurrentNode'.
%%
%% If the condition is met by the child, the walk continues into the child
%% with the rest of the pattern. Then, if the child still exists and the
%% condition also applies to grandchildren (as reported by
%% `khepri_condition:applies_to_grandchildren/1'), the whole pattern is
%% applied again one level down. That second pass starts from the child as
%% stored in the updated parent.
%%
%% Returns the (possibly updated) `CurrentNode' with the updated `Extra' and
%% `FunAcc', or an error.
%%
%% @private

handle_branch(
  CurrentNode, ChildName, Child,
  [Condition | PathPattern] = WholePathPattern,
  TreeOptions, ReversedPath, Extra, Fun, FunAcc) ->
    CondMet = khepri_condition:is_met(Condition, ChildName, Child),
    Ret = case CondMet of
              true ->
                  walk_down_the_tree1(
                    Child, PathPattern, TreeOptions,
                    [ChildName | ReversedPath],
                    [CurrentNode],
                    Extra, Fun, FunAcc);
              {false, _} ->
                  {ok, CurrentNode, Extra, FunAcc}
          end,
    case Ret of
        {ok, #node{child_nodes = #{ChildName := Child1}} = CurrentNode1,
         Extra1, FunAcc1} ->
            case khepri_condition:applies_to_grandchildren(Condition) of
                false ->
                    Ret;
                true ->
                    %% Use the child as stored in the updated parent, not
                    %% the original `Child': the first pass may have
                    %% modified it. Walking down from the stale copy would
                    %% overwrite those changes when walking back up.
                    walk_down_the_tree1(
                      Child1, WholePathPattern, TreeOptions,
                      [ChildName | ReversedPath],
                      [CurrentNode1],
                      Extra1, Fun, FunAcc1)
            end;
        {ok, _CurrentNode1, _Extra1, _FunAcc1} ->
            %% The child node is gone, no need to test if the condition
            %% applies to it or recurse.
            Ret;
        Error ->
            Error
    end.

-spec interrupted_walk_down(
        Reason, Info, PathPattern, TreeOptions,
        ReversedPath, ReversedParentTree,
        Extra, Fun, FunAcc) -> Ret when
      Reason :: mismatching_node | node_not_found,
      Info :: map(),
      PathPattern :: khepri_path:native_pattern(),
      TreeOptions :: khepri:tree_options(),
      ReversedPath :: khepri_path:native_path(),
      Node :: tree_node(),
      ReversedParentTree :: [Node | {Node, child_created}],
      Extra :: walk_down_the_tree_extra(),
      Fun :: walk_down_the_tree_fun(),
      FunAcc :: any(),
      Ret :: ok(Node, Extra, FunAcc) | khepri:error().
%% @doc Handles a walk that cannot continue down the tree because the node at
%% `ReversedPath' is missing (`node_not_found') or does not match
%% (`mismatching_node').
%%
%% `Fun' is called with that node's path and
%% `{interrupted, Reason, Info}'. `Info' is extended with a `node_is_target'
%% flag, which is true when `khepri_path:realpath(PathPattern)' is empty
%% (presumably: no pattern component left after this node).
%%
%% The result of `Fun' decides what happens next:
%% - `keep' or `delete': the tree is left untouched and the starting node is
%%   returned.
%% - a new tree node: if `PathPattern' is empty, the walk goes back up from
%%   it. Otherwise it gets a `keep_while' condition (keep while it has a
%%   payload or child nodes) and the walk continues down from it.
%% - anything else is returned as is.
%%
%% @private

interrupted_walk_down(
  Reason, Info, PathPattern, TreeOptions, ReversedPath, ReversedParentTree,
  Extra, Fun, FunAcc) ->
    NodePath = lists:reverse(ReversedPath),
    IsTarget = khepri_path:realpath(PathPattern) =:= [],
    Info1 = Info#{node_is_target => IsTarget},
    ErrorTuple = {interrupted, Reason, Info1},
    case Fun(NodePath, ErrorTuple, FunAcc) of
        {ok, ToDo, FunAcc1}
          when ToDo =:= keep orelse ToDo =:= delete ->
            %% Nothing was created or changed: `delete' is handled exactly
            %% like `keep' here, there is no node to delete.
            ?assertNotEqual([], ReversedParentTree),
            StartingNode = starting_node_in_rev_parent_tree(
                             ReversedParentTree),
            {ok, StartingNode, Extra, FunAcc1};
        {ok, #node{} = NewNode, FunAcc1} ->
            ReversedParentTree1 =
            case Reason of
                node_not_found ->
                    %% We record the fact the child is a new node. This is used
                    %% to reset the child's stats if it got new payload or
                    %% child nodes at the same time.
                    [{hd(ReversedParentTree), child_created}
                     | tl(ReversedParentTree)];
                _ ->
                    %% The node existed but did not match: `Fun' replaced
                    %% it, so its parent is not flagged as `child_created'.
                    ReversedParentTree
            end,
            case PathPattern of
                [] ->
                    %% We reached the target node. We could call
                    %% walk_down_the_tree1() again, but it would call Fun() a
                    %% second time.
                    walk_back_up_the_tree(
                      NewNode, ReversedPath, ReversedParentTree1,
                      Extra, FunAcc1);
                _ ->
                    %% We created a tree node automatically on our way to the
                    %% target. We want to add a `keep_while' condition for it
                    %% so it is automatically reclaimed when it becomes
                    %% useless (i.e., no payload and no child nodes).
                    Cond = #if_any{conditions =
                                   [#if_child_list_length{count = {gt, 0}},
                                    #if_has_payload{has_payload = true}]},
                    KeepWhile = #{NodePath => Cond},
                    Extra1 = update_keep_while_conds_extra(
                               Extra, NodePath, KeepWhile),

                    %% Continue with the remaining pattern, starting from the
                    %% new node.
                    walk_down_the_tree1(
                      NewNode, PathPattern, TreeOptions,
                      ReversedPath, ReversedParentTree1,
                      Extra1, Fun, FunAcc1)
            end;
        Error ->
            %% Any other result from `Fun' (e.g. an error) is propagated.
            Error
    end.

-spec reset_versions(Node) -> Node when
      Node :: tree_node().
%% @doc Sets the payload version and the child list version of a node back to
%% their initial values (`?INIT_DATA_VERSION' and
%% `?INIT_CHILD_LIST_VERSION'). All other props are kept.
%%
%% @private

reset_versions(#node{props = Props} = Node) ->
    InitialVersions = #{payload_version => ?INIT_DATA_VERSION,
                        child_list_version => ?INIT_CHILD_LIST_VERSION},
    Node#node{props = maps:merge(Props, InitialVersions)}.

-spec squash_version_bumps(OldNode, NewNode) -> Node when
      OldNode :: tree_node(),
      NewNode :: tree_node(),
      Node :: tree_node().
%% @doc Collapses several version bumps into a single one.
%%
%% `NewNode' may be the result of several successive updates of `OldNode'.
%% For both the payload version and the child list version, the returned node
%% carries the old version plus one if the new version is greater, or the old
%% version otherwise. Everything else comes from `NewNode'.
%%
%% @private

squash_version_bumps(
  #node{props = #{payload_version := OldPayloadVersion,
                  child_list_version := OldChildListVersion}},
  #node{props = #{payload_version := NewPayloadVersion,
                  child_list_version := NewChildListVersion} = Props}
  = NewNode) ->
    %% At most one bump per version, whatever the number of intermediate
    %% updates. When nothing increased, the old values are written back,
    %% which leaves `NewNode' unchanged.
    BumpOnce = fun
                   (Old, New) when New > Old -> Old + 1;
                   (Old, _New) -> Old
               end,
    Props1 = Props#{
               payload_version =>
                   BumpOnce(OldPayloadVersion, NewPayloadVersion),
               child_list_version =>
                   BumpOnce(OldChildListVersion, NewChildListVersion)},
    NewNode#node{props = Props1}.

-spec walk_back_up_the_tree(
  Child, ReversedPath, ReversedParentTree, Extra, FunAcc) -> Ret when
      Node :: tree_node(),
      Child :: Node | delete,
      ReversedPath :: khepri_path:native_path(),
      ReversedParentTree :: [Node | {Node, child_created}],
      Extra :: walk_down_the_tree_extra(),
      FunAcc :: any(),
      Ret :: ok(Node, Extra, FunAcc).
%% @doc Starts walking back up the tree from `Child' (an updated node or
%% `delete'), with no applied changes recorded yet.
%%
%% @private

walk_back_up_the_tree(
  Child, ReversedPath, ReversedParentTree, Extra, FunAcc) ->
    NoAppliedChangesYet = #{},
    walk_back_up_the_tree(
      Child, ReversedPath, ReversedParentTree,
      Extra, NoAppliedChangesYet, FunAcc).

-spec walk_back_up_the_tree(
  Child, ReversedPath, ReversedParentTree, Extra, AppliedChanges,
  FunAcc) -> Ret when
      Node :: tree_node(),
      Child :: Node | delete,
      ReversedPath :: khepri_path:native_path(),
      ReversedParentTree :: [Node | {Node, child_created}],
      Extra :: walk_down_the_tree_extra(),
      AppliedChanges :: #{khepri_path:native_path() =>
                          khepri:node_props() | delete},
      FunAcc :: any(),
      Ret :: ok(Node, Extra, FunAcc).
%% @doc Rebuilds the tree from a modified (or deleted) node up to the node the
%% walk started from.
%%
%% At each level, `Child' is removed from, added to or updated in its parent.
%% The parent then becomes the next `Child'. Deletions and updates are
%% recorded in `AppliedChanges': `delete' for a removed path, or the node
%% props of an updated path. Newly created children get their versions reset
%% and are not recorded.
%%
%% When the root is reached (empty path and parent tree), the changes are
%% merged into `Extra' and the `keep_while' conditions are evaluated by
%% `handle_applied_changes/3'. When only the parent tree is exhausted (the
%% walk started below the root, e.g. from `handle_branch/9'), the changes are
%% merged and the current node is returned without that evaluation.
%%
%% @private

walk_back_up_the_tree(
  delete,
  [ChildName | ReversedPath] = WholeReversedPath,
  [ParentNode | ReversedParentTree], Extra, AppliedChanges, FunAcc) ->
    %% Evaluate keep_while of nodes which depended on ChildName (it is
    %% removed) at the end of walk_back_up_the_tree().
    Path = lists:reverse(WholeReversedPath),
    AppliedChanges1 = AppliedChanges#{Path => delete},

    %% Evaluate keep_while of parent node on itself right now (its child_count
    %% has changed).
    ParentNode1 = remove_node_child(ParentNode, ChildName),
    handle_keep_while_for_parent_update(
      ParentNode1, ReversedPath, ReversedParentTree,
      Extra, AppliedChanges1, FunAcc);
walk_back_up_the_tree(
  Child,
  [ChildName | ReversedPath],
  [{ParentNode, child_created} | ReversedParentTree],
  Extra, AppliedChanges, FunAcc) ->
    %% No keep_while to evaluate, the child is new and no nodes depend on it
    %% at this stage.
    %% FIXME: Perhaps there is a condition in a if_any{}?
    Child1 = reset_versions(Child),

    %% Evaluate keep_while of parent node on itself right now (its child_count
    %% has changed).
    ParentNode1 = add_node_child(ParentNode, ChildName, Child1),
    handle_keep_while_for_parent_update(
      ParentNode1, ReversedPath, ReversedParentTree,
      Extra, AppliedChanges, FunAcc);
walk_back_up_the_tree(
  Child,
  [ChildName | ReversedPath] = WholeReversedPath,
  [ParentNode | ReversedParentTree],
  Extra, AppliedChanges, FunAcc) ->
    %% Evaluate keep_while of nodes which depend on ChildName (it is
    %% modified) at the end of walk_back_up_the_tree().
    Path = lists:reverse(WholeReversedPath),
    TreeOptions = #{props_to_return => [payload,
                                        payload_version,
                                        child_list_version,
                                        child_list_length]},
    NodeProps = gather_node_props(Child, TreeOptions),
    AppliedChanges1 = AppliedChanges#{Path => NodeProps},

    %% No need to evaluate keep_while of ParentNode, its child_count is
    %% unchanged.
    ParentNode1 = update_node_child(ParentNode, ChildName, Child),
    walk_back_up_the_tree(
      ParentNode1, ReversedPath, ReversedParentTree,
      Extra, AppliedChanges1, FunAcc);
walk_back_up_the_tree(
  StartingNode,
  [], %% <-- We reached the root (i.e. not in a branch, see handle_branch())
  [], Extra, AppliedChanges, FunAcc) ->
    Extra1 = merge_applied_changes(Extra, AppliedChanges),
    handle_applied_changes(StartingNode, Extra1, FunAcc);
walk_back_up_the_tree(
  StartingNode,
  _ReversedPath,
  [], Extra, AppliedChanges, FunAcc) ->
    %% The walk started below the root: keep_while conditions are evaluated
    %% later, once the walk from the root comes back up.
    Extra1 = merge_applied_changes(Extra, AppliedChanges),
    {ok, StartingNode, Extra1, FunAcc}.

%% Continues walking back up with `ParentNode', whose child list just changed,
%% unless the parent must now be deleted.
%%
%% A parent whose own parent entry is marked `child_created' (i.e. the parent
%% itself was just created) is always kept. Otherwise, its own keep_while
%% condition is re-evaluated. If it is no longer met, the parent is deleted.
%% Nodes depending on it are then handled by the recursion.
handle_keep_while_for_parent_update(
  ParentNode, ReversedPath, ReversedParentTree,
  Extra, AppliedChanges, FunAcc) ->
    ParentOrDelete =
        case ReversedParentTree of
            [{_GrandParentNode, child_created} | _] ->
                %% Freshly created node: don't get rid of it right away.
                ParentNode;
            _ ->
                ParentPath = lists:reverse(ReversedPath),
                case is_keep_while_condition_met_on_self(
                       ParentPath, ParentNode, Extra) of
                    true ->
                        ParentNode;
                    {false, _Reason} ->
                        delete
                end
        end,
    walk_back_up_the_tree(
      ParentOrDelete, ReversedPath, ReversedParentTree,
      Extra, AppliedChanges, FunAcc).

%% Merges `AppliedChanges' into the `applied_changes' map stored in `Extra',
%% which is treated as empty when absent.
%%
%% A `delete' always wins: it replaces earlier props recorded for the same
%% path, and props never replace an earlier `delete'.
merge_applied_changes(Extra, AppliedChanges) ->
    Previous = maps:get(applied_changes, Extra, #{}),
    MergeOne =
        fun(Path, Change, Acc) ->
                KeepPrevious = Change =/= delete andalso
                               maps:get(Path, Acc, undefined) =:= delete,
                case KeepPrevious of
                    true  -> Acc;
                    false -> Acc#{Path => Change}
                end
        end,
    Merged = maps:fold(MergeOne, Previous, AppliedChanges),
    Extra#{applied_changes => Merged}.

%% Finalizes a walk once it has come back up to the root: evaluates the
%% keep_while conditions impacted by the recorded `applied_changes' and
%% removes the nodes whose conditions are no longer met.
%%
%% With no recorded change, the root and `Extra' are returned as is.
%% NOTE(review): `applied_changes' is not reset in `Extra1' before calling
%% remove_expired_nodes/4. Confirm that re-seeing these changes in nested
%% deletions is intended, or handled elsewhere.
handle_applied_changes(
  Root,
  #{applied_changes := AppliedChanges} = Extra,
  FunAcc)
  when AppliedChanges =:= #{} ->
    {ok, Root, Extra, FunAcc};
handle_applied_changes(
  Root,
  #{keep_while_conds := KeepWhileConds,
    keep_while_conds_revidx := KeepWhileCondsRevIdx,
    applied_changes := AppliedChanges} = Extra,
  FunAcc) ->
    %% Watchers whose keep_while conditions fail after these changes.
    ToDelete = eval_keep_while_conditions(
                 AppliedChanges, KeepWhileConds, KeepWhileCondsRevIdx,
                 Root),

    %% Drop the keep_while conditions of removed nodes and update the reverse
    %% index accordingly. The reverse index is updated from the conditions
    %% map as it was *before* this path was dropped, with an empty new
    %% condition map. Presumably this lets it find and purge the removed
    %% path's old entries (verify against update_keep_while_conds_revidx/4).
    {KeepWhileConds1,
     KeepWhileCondsRevIdx1} = maps:fold(
                            fun
                                (RemovedPath, delete, {KW, KWRevIdx}) ->
                                    KW1 = maps:remove(RemovedPath, KW),
                                    KWRevIdx1 = update_keep_while_conds_revidx(
                                                  KW, KWRevIdx,
                                                  RemovedPath, #{}),
                                    {KW1, KWRevIdx1};
                                (_, _, Acc) ->
                                    Acc
                            end, {KeepWhileConds, KeepWhileCondsRevIdx},
                            AppliedChanges),
    Extra1 = Extra#{keep_while_conds => KeepWhileConds1,
                    keep_while_conds_revidx => KeepWhileCondsRevIdx1},

    %% Skip paths already deleted or covered by an ancestor's deletion, then
    %% delete the rest.
    ToDelete1 = filter_and_sort_paths_to_delete(ToDelete, AppliedChanges),
    remove_expired_nodes(ToDelete1, Root, Extra1, FunAcc).

%% Turns the list of applied changes into the set of watcher paths to delete,
%% returned as a map of `Path => delete'.
%%
%% - A `delete' entry means that node is gone, along with its whole subtree
%%   (descendants are not listed in `AppliedChanges'). Every watcher of a
%%   path equal to or below the removed path has all of its keep_while
%%   conditions re-evaluated against `Root'.
%% - Any other entry carries the updated node's props. Only watchers of that
%%   exact path are re-checked.
eval_keep_while_conditions(
  AppliedChanges, KeepWhileConds, KeepWhileCondsRevIdx, Root) ->
    EvalRemoval =
        fun(RemovedPath, Acc0) ->
                maps:fold(
                  fun(WatchedPath, Watchers, Acc) ->
                          case lists:prefix(RemovedPath, WatchedPath) of
                              false ->
                                  Acc;
                              true ->
                                  eval_keep_while_conditions_after_removal(
                                    Watchers, KeepWhileConds, Root, Acc)
                          end
                  end, Acc0, KeepWhileCondsRevIdx)
        end,
    EvalUpdate =
        fun(UpdatedPath, NodeProps, Acc) ->
                case KeepWhileCondsRevIdx of
                    #{UpdatedPath := Watchers} ->
                        eval_keep_while_conditions_after_update(
                          UpdatedPath, NodeProps,
                          Watchers, KeepWhileConds, Root, Acc);
                    _ ->
                        Acc
                end
        end,
    maps:fold(
      fun
          (Path, delete, Acc) -> EvalRemoval(Path, Acc);
          (Path, NodeProps, Acc) -> EvalUpdate(Path, NodeProps, Acc)
      end, #{}, AppliedChanges).

%% For each watcher of `UpdatedPath', checks its condition on that path
%% against the node's new props. Only if that check fails are all of the
%% watcher's keep_while conditions re-evaluated against `Root'. The watcher is
%% added to `ToDelete' if they are not met.
eval_keep_while_conditions_after_update(
  UpdatedPath, NodeProps, Watchers, KeepWhileConds, Root, ToDelete) ->
    CheckWatcher =
        fun(Watcher, ok, Acc) ->
                KeepWhile = maps:get(Watcher, KeepWhileConds),
                StillMet =
                    case khepri_condition:is_met(
                           maps:get(UpdatedPath, KeepWhile),
                           UpdatedPath, NodeProps) of
                        true ->
                            true;
                        {false, _} ->
                            case are_keep_while_conditions_met(
                                   Root, KeepWhile) of
                                true       -> true;
                                {false, _} -> false
                            end
                    end,
                case StillMet of
                    true  -> Acc;
                    false -> Acc#{Watcher => delete}
                end
        end,
    maps:fold(CheckWatcher, ToDelete, Watchers).

%% Re-evaluates all keep_while conditions of each watcher against `Root' and
%% adds the watchers whose conditions are no longer met to `ToDelete'.
eval_keep_while_conditions_after_removal(
  Watchers, KeepWhileConds, Root, ToDelete) ->
    MarkIfExpired =
        fun(Watcher, ok, Acc) ->
                KeepWhile = maps:get(Watcher, KeepWhileConds),
                case are_keep_while_conditions_met(Root, KeepWhile) of
                    {false, _} -> Acc#{Watcher => delete};
                    true       -> Acc
                end
        end,
    maps:fold(MarkIfExpired, ToDelete, Watchers).

%% Reduces `ToDelete' to the paths that actually need an explicit deletion.
%%
%% A path is skipped if it is already recorded as deleted in
%% `AppliedChanges', or if one of its ancestors is kept in the result
%% (deleting that ancestor removes it too). Paths are processed shortest first
%% (ties broken by term order), so ancestors are always decided before their
%% descendants. The result comes from `maps:keys/1' of the kept set.
filter_and_sort_paths_to_delete(ToDelete, AppliedChanges) ->
    ByLength = lists:sort([{length(Path), Path} || Path <- maps:keys(ToDelete)]),
    SortedPaths = [Path || {_Length, Path} <- ByLength],
    Keep = fun(Path, Kept) ->
                   Skip = case AppliedChanges of
                              #{Path := delete} ->
                                  true;
                              _ ->
                                  is_parent_being_removed(Path, Kept)
                          end,
                   case Skip of
                       true  -> Kept;
                       false -> Kept#{Path => delete}
                   end
           end,
    maps:keys(lists:foldl(Keep, #{}, SortedPaths)).

%% Tells if any proper ancestor of `Path' (including the root path `[]') is a
%% key of `Map'. The empty path has no ancestor.
is_parent_being_removed([], _Map) ->
    false;
is_parent_being_removed(Path, Map) ->
    is_parent_being_removed1(lists:reverse(Path), Map).

%% Same as above, on a reversed path: drops the last component, then checks
%% each shorter prefix in turn.
is_parent_being_removed1([], _Map) ->
    false;
is_parent_being_removed1([_LastComponent | ReversedAncestor], Map) ->
    maps:is_key(lists:reverse(ReversedAncestor), Map)
    orelse is_parent_being_removed1(ReversedAncestor, Map).

%% Deletes each path of `PathsToDelete', in order, threading the updated root
%% and `Extra' through every deletion. `FunAcc' is returned untouched. A
%% deletion that does not return `{ok, ...}' crashes with `case_clause'.
remove_expired_nodes(PathsToDelete, Root, Extra, FunAcc) ->
    {Root1, Extra1} =
        lists:foldl(
          fun(PathToDelete, {CurRoot, CurExtra}) ->
                  case do_delete_matching_nodes(
                         PathToDelete, CurRoot, CurExtra, #{}) of
                      {ok, NewRoot, NewExtra, _DeleteAcc} ->
                          {NewRoot, NewExtra}
                  end
          end, {Root, Extra}, PathsToDelete),
    {ok, Root1, Extra1, FunAcc}.

-ifdef(TEST).
%% Test-only accessors for fields of the state machine record.

get_root(#?MODULE{root = TreeRoot}) ->
    TreeRoot.

get_keep_while_conds(#?MODULE{keep_while_conds = Conds}) ->
    Conds.

get_keep_while_conds_revidx(
  #?MODULE{keep_while_conds_revidx = RevIdx}) ->
    RevIdx.
-endif.