%% @doc Log handler and formatter for the logger app
%%
%% It is optimized to use a fixed size ring buffer and
%% return chucks of older logs first while storing new ones.
%% It can be synched to discard old logs if they are not needed anymore.
%% If the buffer is filled, oldest logs are dropped
%% and a fake logger event is inserted to inform the user.
%% @end
-module(grisp_connect_logger_bin).
-include_lib("kernel/include/logger.hrl").
% API
-export([sync/2]).
-export([chunk/2]).
% Behaviour logger_handler callback functions
-export([adding_handler/1]).
-export([changing_config/3]).
-export([removing_handler/1]).
-export([log/2]).
-export([filter_config/1]).
% Behaviour logger_formater callback functions
-export([format/2]).
-export([check_config/4]).
% logger_h_common Callbacks
-export([init/2]).
-export([config_changed/3]).
-export([reset_state/2]).
-export([filesync/3]).
-export([write/4]).
-export([handle_info/3]).
-export([terminate/3]).
% Internal Callbacks
-export([queue_ctrl_init/1]).
-export([queue_ctrl_loop/1]).
%--- Macros --------------------------------------------------------------------
-define(CNT_KEY, {?MODULE, counters}).
-define(CNT_N, 1).
-define(CNT_SEQ, 1).
-define(DEFAULT_CONFIG, #{
count_max => 10_000,
% 10 MiB:
bytes_max => 10 * 1_024 * 1_024
}).
-define(DEFAULT_CALL_TIMEOUT, 5000).
% FixMe:
% Sending over ~30_000 bytes over WS breaks rtems I/O driver.
% We want avoid to return chunks that are bigger then that.
-define(MAX_LOG_BYTES, 30_000).
%--- API -----------------------------------------------------------------------
sync(Seq, DroppedConfirmed) when
is_integer(Seq), is_integer(DroppedConfirmed)
->
case whereis(?MODULE) of
undefined ->
?LOG_WARNING("grisp_connect log handler not setup properly"),
ok;
Pid ->
call(Pid, {sync, Seq, DroppedConfirmed})
end.
%% @doc
%% The chunk will contain Count logs at maximum.
%% while ensuring that the whole chunk is less then MaxBytes.
chunk(MaxCount, MaxBytes) ->
case whereis(?MODULE) of
undefined ->
?LOG_WARNING("grisp_connect log handler not setup properly"),
{[], 0};
Pid ->
insert_drop_event(call(Pid, {chunk, MaxCount, MaxBytes}))
end.
%--- Behaviour logger_handler Callback Callbacks -------------------------------
adding_handler(Config) ->
logger_h_common:adding_handler(Config).
changing_config(SetOrUpdate, OldConfig, NewConfig) ->
logger_h_common:changing_config(SetOrUpdate, OldConfig, NewConfig).
removing_handler(Config) ->
logger_h_common:removing_handler(Config).
log(LogEvent, Config) ->
logger_h_common:log(LogEvent, Config).
filter_config(Config) ->
logger_h_common:filter_config(Config).
%--- Behaviour logger_formater Callback Functions ------------------------------
format(Event, Config) ->
case Config of
#{stdout := {Formatter, FormatterConfig}} ->
catch io:format(Formatter:format(Event, FormatterConfig));
_Else ->
ok
end,
Encoded = base64:encode(term_to_binary(Event)),
EncodedSize = byte_size(Encoded),
case EncodedSize > ?MAX_LOG_BYTES of
true ->
NewLog = log_discard(EncodedSize),
base64:encode(term_to_binary(NewLog));
false ->
Encoded
end.
check_config(_Name, _SetOrUpdate, _OldHConfig, NewHConfig0) ->
{ok, NewHConfig0}.
%--- logger_h_common Callbacks -------------------------------------------------
init(_Name, UserConfig) ->
CleanConfig = maps:with(maps:keys(?DEFAULT_CONFIG), UserConfig),
Counters =
case persistent_term:get(?CNT_KEY, undefined) of
undefined ->
C = atomics:new(?CNT_N, [{signed, false}]),
persistent_term:put(?CNT_KEY, C),
C;
C ->
C
end,
Config = maps:merge(?DEFAULT_CONFIG, CleanConfig),
{ok, #{pid => queue_ctrl_start(Config#{counters => Counters})}}.
config_changed(_Name, NewHConfig, #{pid := Pid} = State) ->
call(Pid, {config_changed, NewHConfig}),
State.
reset_state(_Name, State) -> State.
filesync(_Name, _SyncAsync, State) ->
{ok, State}.
write(_Name, async, Bin, #{pid := Pid} = State) ->
Pid ! {log, Bin},
{ok, State};
write(_Name, sync, Bin, #{pid := Pid} = State) ->
Result = call(Pid, {log, Bin}),
{Result, State}.
handle_info(_, _, State) ->
State.
terminate(_Name, _Reason, _State) ->
ok.
%--- Internal ------------------------------------------------------------------
call(Pid, Msg) ->
MRef = monitor(process, Pid),
Pid ! {Msg, {self(), MRef}},
receive
{MRef, Result} ->
demonitor(MRef, [flush]),
Result;
{'DOWN', MRef, _Type, _Object, Reason} ->
{error, Reason}
after ?DEFAULT_CALL_TIMEOUT ->
%% If this timeout triggers we will get a stray
%% reply message in our mailbox eventually.
%% That does not really matter though as it will
%% end up in this module's handle_info and be ignored
demonitor(MRef, [flush]),
{error, {no_response, Pid}}
end.
queue_ctrl_start(Config) ->
spawn_link(fun() -> ?MODULE:queue_ctrl_init(Config) end).
queue_ctrl_init(#{counters := Counters} = Config) ->
register(?MODULE, self()),
?MODULE:queue_ctrl_loop(#{
counters => Counters,
binlog => grisp_connect_binlog:new(Config),
dropped => 0
}).
queue_ctrl_loop(S0) ->
S2 =
receive
{log, Bin} ->
insert(Bin, S0);
{{log, Bin}, From} ->
S1 = insert(Bin, S0),
reply(From, ok),
S1;
{config_changed, _Config} ->
S0;
{{sync, Seq, DroppedConfirmed}, From} ->
S1 = sync(Seq, DroppedConfirmed, S0),
reply(From, ok),
S1;
{{chunk, Count, Bytes}, From} ->
reply(From, peek(Count, Bytes, S0)),
S0
end,
?MODULE:queue_ctrl_loop(S2).
reply({From, MRef}, Msg) -> From ! {MRef, Msg}.
insert(Event, #{counters := Counters, binlog := B0, dropped := D0} = State) ->
Seq = atomics:add_get(Counters, ?CNT_SEQ, 1),
{B1, D1} = grisp_connect_binlog:insert({Seq, Event}, B0),
State#{binlog := B1, dropped := D0 + D1}.
sync(Seq, DroppedConfirmed, #{binlog := B0, dropped := Dropped} = State) ->
B1 = grisp_connect_binlog:truncate(Seq, B0),
State#{binlog := B1, dropped := Dropped - DroppedConfirmed}.
peek(Count, MaxBytes, #{binlog := B0, dropped := Dropped}) ->
Events = grisp_connect_binlog:peek(Count, B0),
{trim_to_size(Events, MaxBytes), Dropped}.
trim_to_size(Events, MaxBytes) ->
rec_trim_to_size(Events, MaxBytes, 0, []).
rec_trim_to_size([], _, _, Acc) ->
lists:reverse(Acc);
rec_trim_to_size([{_Seq, Bin} = E | TL], MaxBytes, AccSize, Acc) ->
NewSize = byte_size(Bin) + AccSize,
case NewSize > MaxBytes of
true -> lists:reverse(Acc);
false -> rec_trim_to_size(TL, MaxBytes, NewSize, [E | Acc])
end.
insert_drop_event({Events, 0}) ->
{Events, 0};
insert_drop_event({Events, Dropped}) ->
{[drop_event(Dropped) | Events], Dropped}.
log_discard(Size) ->
Meta = simulate_log_metadata(),
Report = {report, #{
event => log_discarded,
reason => too_long,
size => Size}
},
#{level => warning, meta => Meta, msg => Report}.
drop_event(Count) ->
% Create a fake logger event mimicking a real log event as much as possible
Meta = simulate_log_metadata(),
Report = {report, #{event => log_drop, count => Count}},
{null, format(#{level => warning, meta => Meta, msg => Report}, #{})}.
simulate_log_metadata() ->
% See internal function logger:add_default_metadata/2
DefaultMetadata = #{
pid => self(),
gl => group_leader(),
time => logger:timestamp()
},
PrimaryConfig = logger:get_primary_config(),
PrimaryMetadata = maps:get(logger_metadata, PrimaryConfig, #{}),
% See internal function logger:log_allowed/4
ProcessMetadata =
case logger:get_process_metadata() of
ProcMeta when is_map(ProcMeta) -> ProcMeta;
_ -> #{}
end,
lists:foldl(
fun(M, MAcc) ->
maps:merge(MAcc, M)
end,
DefaultMetadata,
[?LOCATION, PrimaryMetadata, ProcessMetadata]
).