src/khepri_event_handler.erl

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright © 2022-2024 Broadcom. All Rights Reserved. The term "Broadcom"
%% refers to Broadcom Inc. and/or its subsidiaries.
%%

%% @hidden

-module(khepri_event_handler).
-behaviour(gen_server).

-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/assert.hrl").

-include("include/khepri.hrl").
-include("src/khepri_machine.hrl").

-export([start_link/0,
         handle_triggered_sprocs/2]).

-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

-define(SERVER, ?MODULE).

-define(SILENCE_ERROR_FOR, 10000).

-record(?MODULE, {trigger_crashes = #{}}).

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

handle_triggered_sprocs(_StoreId, []) ->
    ok;
handle_triggered_sprocs(StoreId, TriggeredStoredProcs) ->
    gen_server:cast(?SERVER, {?FUNCTION_NAME, StoreId, TriggeredStoredProcs}).

init(_) ->
    erlang:process_flag(trap_exit, true),
    State = #?MODULE{},
    {ok, State}.

handle_call(Request, From, State) ->
    ?LOG_WARNING(
       "Unhandled handle_call request from ~0p: ~p",
       [From, Request]),
    {State1, Timeout} = log_accumulated_trigger_crashes(State),
    {reply, ok, State1, Timeout}.

handle_cast(
  {handle_triggered_sprocs, StoreId, TriggeredStoredProcs},
  #?MODULE{trigger_crashes = Crashes} = State) ->
    State1 =
    lists:foldl(
      fun
          (#triggered{id = TriggerId,
                      sproc = StoredProc,
                      event_filter = EventFilter,
                      props = Props},
           S) ->
              Args = [Props],
              try
                  %% TODO: Be flexible and accept a function with an arity of
                  %% 0.
                  _ = khepri_sproc:run(StoredProc, Args),
                  S
              catch
                  Class:Reason:Stacktrace ->
                      Key = {Class, Reason, Stacktrace},
                      case Crashes of
                          #{Key := {Timestamp, Count, Msg}} ->
                              Crashes1 = Crashes#{
                                           Key => {Timestamp, Count + 1, Msg}},
                              S#?MODULE{trigger_crashes = Crashes1};
                          _ ->
                              Msg = io_lib:format(
                                      "Triggered stored procedure crash~n"
                                      "  Store ID: ~s~n"
                                      "  Trigger ID: ~s~n"
                                      "  Event filter:~n"
                                      "    ~p~n"
                                      "  Event props:~n"
                                      "    ~p~n"
                                      "  Crash:~n"
                                      "    ~ts",
                                      [StoreId, TriggerId, EventFilter, Props,
                                       khepri_utils:format_exception(
                                         Class, Reason, Stacktrace,
                                         #{column => 4})]),
                              ?LOG_ERROR(Msg, []),

                              Timestamp = erlang:monotonic_time(millisecond),
                              Crashes1 = Crashes#{
                                           Key => {Timestamp, 1, Msg}},
                              S#?MODULE{trigger_crashes = Crashes1}
                      end
              end
      end, State, TriggeredStoredProcs),
    _ = khepri_machine:ack_triggers_execution(StoreId, TriggeredStoredProcs),
    {State2, Timeout} = log_accumulated_trigger_crashes(State1),
    {noreply, State2, Timeout};
handle_cast(Request, State) ->
    ?LOG_WARNING("Unhandled handle_cast request: ~p", [Request]),
    {State1, Timeout} = log_accumulated_trigger_crashes(State),
    {noreply, State1, Timeout}.

handle_info(timeout, State) ->
    {State1, Timeout} = log_accumulated_trigger_crashes(State),
    {noreply, State1, Timeout};
handle_info(Msg, State) ->
    ?LOG_WARNING("Unhandled handle_info message: ~p", [Msg]),
    {State1, Timeout} = log_accumulated_trigger_crashes(State),
    {noreply, State1, Timeout}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

log_accumulated_trigger_crashes(
  #?MODULE{trigger_crashes = Crashes} = State)
  when Crashes =:= #{} ->
    {State, infinity};
log_accumulated_trigger_crashes(
  #?MODULE{trigger_crashes = Crashes} = State) ->
    Now = erlang:monotonic_time(millisecond),
    Crashes1 = maps:filter(
                 fun
                     (_Key, {Timestamp, Count, Msg})
                       when Now - Timestamp >= ?SILENCE_ERROR_FOR andalso
                            Msg > 1 ->
                         ?LOG_ERROR(
                            "~ts~n"
                            "(this crash occurred ~b times in the last ~b "
                            "seconds)",
                            [Msg, Count, (Now - Timestamp) div 1000]),
                         false;
                     (_Key, _Value) ->
                         true
                 end, Crashes),
    State1 = State#?MODULE{trigger_crashes = Crashes1},
    case Crashes =:= #{} of
        true  -> {State1, infinity};
        false -> {State1, ?SILENCE_ERROR_FOR}
    end.