src/khepri_projection.erl

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright © 2022 VMware, Inc. or its affiliates.  All rights reserved.
%%

%% @doc Khepri projections
%%
%% Projections build a replicated ETS table using tree nodes from the store
%% which match a {@link khepri_path:pattern()}. When a tree node matching a
%% projection's pattern is changed in the store, the tree node is passed
%% through the projection's {@link projection_fun()} to create record(s).
%% These records are then stored in the projection's ETS table for all members
%% in a Khepri cluster.
%%
%% Projections provide a way to query the store as fast as possible and are
%% appropriate for lookups which require low latency and/or high throughput.
%% Projection tables contain all records matching the pattern, though, so the
%% memory footprint of a projection table grows with the number of tree nodes in
%% the store matching the pattern.
%%
%% Projection ETS tables are owned by the Khepri cluster and are deleted when
%% the cluster stops.
%%
%% Updates to projection tables are immediately consistent for the member of
%% the cluster on which the change to the store is made and the leader member
%% but are eventually consistent for all other followers.

-module(khepri_projection).

-include_lib("kernel/include/logger.hrl").

-include("src/khepri_projection.hrl").
-include("src/khepri_machine.hrl").
-include("src/khepri_fun.hrl").

-export([new/2, new/3, name/1]).

 %% For internal use only
-export([init/1, trigger/4]).

-type projection() :: #khepri_projection{}.
%% A projection resource.
%%
%% Built with {@link new/3} (or {@link new/2}) and registered in a store to
%% create and maintain the projection's ETS table.
%%
%% @see khepri_projection:new/3.
%% @see khepri:register_projection/4.

-type simple_projection_fun() ::
fun((Path :: khepri_path:native_path(),
     Payload :: khepri:data()) -> Record :: tuple()).
%% A simple projection function.
%%
%% Simple projection functions only take the path and payload for a tree node
%% in the store. The record produced by the function is used to create and
%% delete objects in the ETS table.
%%
%% When the changed tree node has a payload, the record computed from the new
%% payload is inserted in the table with `ets:insert/2'. When the tree node no
%% longer has a payload but had one before, the record computed from the old
%% payload is removed with `ets:delete_object/2'. The returned `Record' must
%% therefore be an object the projection table can store.
%%
%% This function is compiled the same way as a transaction function: side
%% effects are not allowed. Additionally, for any `Path' and `Payload'
%% inputs, this function must consistently return the same `Record', so that
%% the record computed at deletion time matches the one that was inserted.

-type extended_projection_fun() ::
fun((Table :: ets:tid(),
     Path :: khepri_path:native_path(),
     OldPayload :: khepri:node_props(),
     NewPayload :: khepri:node_props()) -> any()).
%% An extended projection function.
%%
%% In some cases, a tree node in the store might correspond to many objects in
%% a projection table. Extended projection functions are allowed to call ETS
%% functions directly in order to build the projection table.
%%
%% `Table' is the projection's ETS table, looked up by the projection's name.
%% `OldPayload' and `NewPayload' are the whole {@link khepri:node_props()}
%% maps of the tree node before and after the change, not only its data.
%% Either one is an empty map if there is no tree node. For example, a newly
%% created tree node will have an empty map for `OldPayload' and a
%% {@link khepri:node_props()} map with values for `NewPayload'.
%%
%% This function is compiled like a transaction function except that calls
%% to the {@link ets} module are allowed.
%%
%% The return value of this function is ignored. Exceptions it raises are
%% caught and logged instead of being propagated.

-type projection_fun() :: simple_projection_fun() | extended_projection_fun().
%% A function that formats an entry in the tree into a record to be stored in a
%% projection.
%%
%% Projection functions may either be "simple" (arity 2) or "extended"
%% (arity 4). See {@link simple_projection_fun()} and
%% {@link extended_projection_fun()} for more information.
%%
%% The projection function is executed directly by the Ra server process. The
%% function should be as simple and fast as possible to avoid slowing down the
%% server.

-type options() :: #{type => ets:table_type(),
                     keypos => pos_integer(),
                     read_concurrency => boolean(),
                     write_concurrency => boolean() | auto,
                     compressed => boolean()}.
%% Options which control the created ETS table.
%%
%% These options are a subset of the options available to {@link ets:new/2}.
%% Refer to the {@link ets:new/2} documentation for a reference on each type
%% and available values. The table is always created as a `named_table',
%% using the projection's name.
%%
%% When a projection is created from a {@link simple_projection_fun()}, the
%% `type' option may only be `set' or `ordered_set': `bag' types are not
%% allowed. {@link extended_projection_fun()}s may use any valid {@link
%% ets:table_type()}.
%%
%% An unknown key or an invalid value makes {@link new/3} throw
%% `{unexpected_option, Key, Value}'.

-export_type([projection/0,
              projection_fun/0,
              options/0]).

-spec new(Name, ProjectionFun) -> Projection when
      Name :: atom(),
      ProjectionFun :: khepri_projection:projection_fun(),
      Projection :: khepri_projection:projection().
%% @doc Creates a new projection data structure with default options.
%%
%% This is the same as calling `new(Name, ProjectionFun, #{})': the ETS table
%% is created with the default settings of {@link new/3}.
%%
%% @see new/3

new(Name, ProjectionFun) ->
    new(Name, ProjectionFun, #{}).

-spec new(Name, ProjectionFun, Options) -> Projection when
      Name :: atom(),
      ProjectionFun :: khepri_projection:projection_fun(),
      Options :: khepri_projection:options(),
      Projection :: khepri_projection:projection().
%% @doc Creates a new projection data structure.
%%
%% This function throws `{unexpected_option, Key, Value}' when an unknown or
%% invalid {@link options()} entry is passed. For example, if the passed
%% `ProjectionFun' is a {@link simple_projection_fun()} and the `Options' map
%% sets the `type' to `bag', this function throws
%% `{unexpected_option, type, bag}' since `bag' is not valid for simple
%% projection funs.
%%
%% @param Name the name of the projection. This corresponds to the name of
%%        the ETS table which is created when the projection is registered.
%% @param ProjectionFun the function which turns paths and data into records
%%        to be stored in the projection table. It must have an arity of 2
%%        (simple) or 4 (extended).
%% @param Options options which control properties of the projection table.
%%
%% @returns a {@link projection()} resource.

new(Name, ProjectionFun, Options)
  when is_map(Options) andalso
       (is_function(ProjectionFun, 2) orelse
        is_function(ProjectionFun, 4)) ->
    %% Projection tables are always named: they are found again by the
    %% projection name when the projection is initialized and triggered.
    EtsOptions = maps:fold(fun to_ets_options/3, [named_table], Options),
    ShouldProcessFunction =
    if
        is_function(ProjectionFun, 2) ->
            %% Ensure that the type is set or ordered_set for simple projection
            %% funs.
            case maps:get(type, Options, set) of
                set ->
                    ok;
                ordered_set ->
                    ok;
                Type ->
                    throw({unexpected_option, type, Type})
            end,
            fun khepri_tx_adv:should_process_function/4;
        is_function(ProjectionFun, 4) ->
            %% Same rules as transaction functions, except that calls to the
            %% `ets' module are allowed so the fun can update its table.
            fun (ets, _F, _A, _From) ->
                    false;
                (M, F, A, From) ->
                    khepri_tx_adv:should_process_function(M, F, A, From)
            end
    end,
    FunOptions = #{ensure_instruction_is_permitted =>
                   fun khepri_tx_adv:ensure_instruction_is_permitted/1,
                   should_process_function => ShouldProcessFunction,
                   is_standalone_fun_still_needed => fun(_Params) -> true end},
    StandaloneFun = khepri_fun:to_standalone_fun(ProjectionFun, FunOptions),
    #khepri_projection{name = Name,
                       projection_fun = StandaloneFun,
                       ets_options = EtsOptions}.

-spec to_ets_options(Key, Value, Acc) -> Acc
    when
      Key :: atom(),
      Value :: atom() | pos_integer() | boolean(),
      Acc :: [tuple() | atom()].
%% @hidden
%% Folds one projection option into the accumulated list of `ets:new/2'
%% options. New entries are prepended to `Acc'.
%%
%% Throws `{unexpected_option, Key, Value}' for an unknown key or an invalid
%% value. `compressed => false' adds nothing to the list.

to_ets_options(type, Type, Acc)
  when Type =:= set orelse Type =:= ordered_set orelse
       Type =:= bag orelse Type =:= duplicate_bag ->
    [Type | Acc];
to_ets_options(keypos, Pos, Acc) when is_integer(Pos) andalso Pos >= 1 ->
    %% `is_integer/1' is required: in Erlang term order every atom, tuple,
    %% map, list and binary (and any float >= 1) compares greater than 1, so
    %% `Pos >= 1' alone would accept them and defer the failure to
    %% `ets:new/2'.
    [{keypos, Pos} | Acc];
to_ets_options(read_concurrency, ReadConcurrency, Acc)
  when is_boolean(ReadConcurrency) ->
    [{read_concurrency, ReadConcurrency} | Acc];
to_ets_options(write_concurrency, WriteConcurrency, Acc)
  when is_boolean(WriteConcurrency) orelse WriteConcurrency =:= auto ->
    [{write_concurrency, WriteConcurrency} | Acc];
to_ets_options(compressed, true, Acc) ->
    [compressed | Acc];
to_ets_options(compressed, false, Acc) ->
    Acc;
to_ets_options(Key, Value, _Acc) ->
    throw({unexpected_option, Key, Value}).

-spec name(Projection) -> Name when
      Projection :: projection(),
      Name :: atom().
%% @doc Returns the name of the given `Projection'.
%%
%% This is the name passed to {@link new/3}. It is also the name under which
%% the projection's ETS table is created.

name(#khepri_projection{name = ProjectionName}) ->
    ProjectionName.

-spec init(Projection) -> Ret when
      Projection :: projection(),
      Ret :: ok | {error, exists}.
%% @hidden
%% Initializes a projection by creating its ETS table, named after the
%% projection's `name/1' and configured with its {@link options()}.
%%
%% When a table with that name already exists, nothing is created and
%% `{error, exists}' is returned.

init(#khepri_projection{name = TableName, ets_options = TableOptions}) ->
    AlreadyExists = ets:info(TableName) =/= undefined,
    if
        AlreadyExists ->
            {error, exists};
        true ->
            _ = ets:new(TableName, TableOptions),
            ok
    end.

-spec trigger(Projection, Path, OldProps, NewProps) -> Ret when
      Projection :: projection(),
      Path :: khepri_path:native_path(),
      OldProps :: khepri:node_props(),
      NewProps :: khepri:node_props(),
      Ret :: ok.
%% @hidden
%% Applies the projection function against the entry from the tree and
%% updates the projection's ETS table accordingly.
%%
%% The table is looked up by the projection name with `ets:whereis/1'. That
%% call returns `undefined' when no such table exists, and the value is
%% passed on unchanged. Dispatch to the simple (arity 2) or extended
%% (arity 4) variant depends on the arity of the projection function.

trigger(
  #khepri_projection{name = Name, projection_fun = ProjectionFun},
  Path, OldProps, NewProps) ->
    Table = ets:whereis(Name),
    case projection_fun_arity(ProjectionFun) of
        2 ->
            trigger_simple_projection(
              Table, Name, ProjectionFun, Path, OldProps, NewProps);
        4 ->
            trigger_extended_projection(
              Table, Name, ProjectionFun, Path, OldProps, NewProps)
    end.

-spec projection_fun_arity(ProjectionFun) -> Arity when
      ProjectionFun :: khepri_fun:standalone_fun(),
      Arity :: arity().
%% @hidden
%% Returns the arity of a stored projection function.
%%
%% NOTE(review): `khepri_fun:standalone_fun()' is assumed to also admit
%% plain funs (when extraction is not performed). The second clause covers
%% that case instead of failing with `badrecord'; confirm against khepri_fun.

projection_fun_arity(#standalone_fun{arity = Arity}) ->
    Arity;
projection_fun_arity(Fun) when is_function(Fun) ->
    {arity, Arity} = erlang:fun_info(Fun, arity),
    Arity.

-spec trigger_extended_projection(
        Table, Name, StandaloneFun, Path, OldProps, NewProps) ->
    Ret when
      Table :: ets:tid(),
      Name :: atom(),
      StandaloneFun :: khepri_fun:standalone_fun(),
      Path :: khepri_path:native_path(),
      OldProps :: khepri:node_props(),
      NewProps :: khepri:node_props(),
      Ret :: ok.
%% @hidden
%% Runs an extended projection function with
%% `[Table, Path, OldProps, NewProps]' as arguments.
%%
%% The function is expected to update `Table' itself, and its return value is
%% ignored. Any exception it raises is caught and logged rather than
%% propagated. This function always returns `ok'.

trigger_extended_projection(
  Table, Name, StandaloneFun, Path, OldProps, NewProps) ->
    Args = [Table, Path, OldProps, NewProps],
    try
        khepri_fun:exec(StandaloneFun, Args)
    catch
        Class:Reason:Stacktrace ->
            Msg = io_lib:format("Failed to trigger extended projection~n"
                                "  Projection: ~ts~n"
                                "  Path:~n"
                                "    ~p~n"
                                "  Old props:~n"
                                "    ~p~n"
                                "  New props:~n"
                                "    ~p~n"
                                "  Crash:~n"
                                "    ~ts",
                                [Name, Path, OldProps, NewProps,
                                 khepri_utils:format_exception(
                                   Class, Reason, Stacktrace,
                                   #{column => 4})]),
            %% `Msg' is already formatted text. It is passed as an argument
            %% rather than as the format string so that a `~' inside it (from
            %% a path, a payload or the crash report) is never parsed as a
            %% control sequence again.
            ?LOG_ERROR("~ts", [Msg])
    end,
    ok.

-spec trigger_simple_projection(
        Table, Name, StandaloneFun, Path, OldProps, NewProps) ->
    Ret when
      Table :: ets:tid(),
      Name :: atom(),
      StandaloneFun :: khepri_fun:standalone_fun(),
      Path :: khepri_path:native_path(),
      OldProps :: khepri:node_props(),
      NewProps :: khepri:node_props(),
      Ret :: ok.
%% @hidden
%% Applies a simple projection function to a tree node change.
%%
%% If `NewProps' carries a `data' payload, the function is called with
%% `[Path, NewPayload]' and the returned record is inserted in `Table'.
%% Otherwise, if `OldProps' carries a `data' payload, the function is called
%% with `[Path, OldPayload]' and the returned record is removed from `Table'
%% with `ets:delete_object/2'. If neither carries a payload, nothing happens.
%%
%% Failures of the projection function and failures of the ETS operation are
%% both logged and otherwise ignored. Examples of ETS failures: a record that
%% is not a tuple, a tuple shorter than the key position, or a missing table.
%% None of them raises out of this function, which always returns `ok'.

trigger_simple_projection(
  Table, Name, StandaloneFun, Path, OldProps, NewProps) ->
    TryExec =
    fun(Args) ->
        try
            {ok, khepri_fun:exec(StandaloneFun, Args)}
        catch
            Class:Reason:Stacktrace ->
                %% Funs have better formatting:
                Fun = khepri_fun:to_fun(StandaloneFun),
                Exception = khepri_utils:format_exception(
                              Class, Reason, Stacktrace, #{column => 4}),
                Msg = io_lib:format("Failed to execute simple projection "
                                    "function: ~p~n"
                                    "  Projection: ~ts~n"
                                    "  Path~n"
                                    "    ~p~n"
                                    "  Old props:~n"
                                    "    ~p~n"
                                    "  New props:~n"
                                    "    ~p~n"
                                    "  Args:~n"
                                    "    ~p~n"
                                    "  Crash:~n"
                                    "    ~ts",
                                    [Fun, Name, Path, OldProps, NewProps, Args,
                                     Exception]),
                %% `Msg' is already formatted text: pass it as an argument so
                %% that a `~' inside it is never parsed as a control sequence.
                ?LOG_ERROR("~ts", [Msg]),
                error
        end
    end,
    %% `ets:insert/2' and `ets:delete_object/2' raise `badarg' on an object
    %% they cannot handle or on a missing table. The record comes from the
    %% user-provided projection function, so such failures are handled the
    %% same way as failures of the function itself: logged, then ignored.
    TryEts =
    fun(EtsFun, Record) ->
        try
            _ = EtsFun(Table, Record),
            ok
        catch
            Class:Reason:Stacktrace ->
                Exception = khepri_utils:format_exception(
                              Class, Reason, Stacktrace, #{column => 4}),
                Msg = io_lib:format("Failed to apply simple projection "
                                    "record to its ETS table~n"
                                    "  Projection: ~ts~n"
                                    "  Path:~n"
                                    "    ~p~n"
                                    "  Record:~n"
                                    "    ~p~n"
                                    "  Crash:~n"
                                    "    ~ts",
                                    [Name, Path, Record, Exception]),
                ?LOG_ERROR("~ts", [Msg]),
                ok
        end
    end,
    case {OldProps, NewProps} of
        {_, #{data := NewPayload}} ->
            case TryExec([Path, NewPayload]) of
                {ok, Record} ->
                    TryEts(fun ets:insert/2, Record);
                error ->
                    ok
            end;
        {#{data := OldPayload}, _} ->
            case TryExec([Path, OldPayload]) of
                {ok, Record} ->
                    TryEts(fun ets:delete_object/2, Record);
                error ->
                    ok
            end;
        {_, _} ->
            ok
    end,
    ok.