%% @author Maas-Maarten Zeeman <mmzeeman@xs4all.nl>
%% @copyright 2013-2024 Maas-Maarten Zeeman
%% @doc Module for handling request statistics.
%% @end
%% Copyright 2013-2024 Maas-Maarten Zeeman
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(z_stats).
-include_lib("zotonic_core/include/zotonic.hrl").
-behaviour(gen_server).
-export([init_system/0, init_site/1]).
-export([
start_link/1,
init/1,
handle_call/3,
handle_cast/2,
handle_info/2
]).
%% Act as a webmachine logger
-export([
log_access/1,
record_event/3,
record_count/4,
record_duration/4,
system_usage/1,
filezcache_stats/0
]).
%% State of the buffer consumer process.
-record(state, {
%% Current idle delay in msecs; set back to 0 whenever a poll found entries.
backoff = 0 :: non_neg_integer(),
%% The buffers that are polled with ringbuffer:read/1, drained in list order.
buffers :: list( atom() )
}).
%% Initial delay after start before consuming the buffers.
-define(INITIAL_DELAY, 100).
%% Backoff if no logging happening, values in msecs.
-define(BACKOFF_QUIET, 10).
-define(BACKOFF_INCREMENT, 10).
-define(BACKOFF_MAX, 2000).
%% Batch size of metrics handling per poll
-define(BATCH_SIZE, 200).
%% Tell dialyzer to not complain about an improper list in a match expression.
-dialyzer({[ no_return, nowarn_function ], setup_system_reporter/0}).
%% -------------------------------------------------------------------------------------------------
%% API functions
%% -------------------------------------------------------------------------------------------------
%% @doc Start the consumer process for the given log buffers. The process is
%% registered locally under the name of this module.
start_link(Buffers) ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, Buffers, []).
%% @doc Set up the system wide statistics: register the system metrics and
%% then subscribe the reporter that publishes them. Returns ok.
init_system() ->
    ok = create_system_metrics(),
    setup_system_reporter().
%% @doc Register (or re-register) all site specific metrics for the given site.
%% Crashes with a badmatch if one of the registrations does not return ok.
init_site(Site) ->
    Register =
        fun({Path, Stat, Options}) ->
            ok = exometer:re_register(Path, Stat, Options)
        end,
    lists:foreach(Register, site_stats(Site)).
%% @private The metrics registered for every site, as a list of
%% {Path, Stat, Options} tuples. Every path starts with [site, Site] and all
%% entries use an empty options list.
site_stats(Site) ->
    Entry = fun(Name, Stat) -> {[site, Site | Name], Stat, []} end,
    [
        % Status of the database connection pool.
        Entry(
            [db, pool],
            {function, z_db_pool, status, [z_context:new(Site)], match, {'_', {workers, working}}}
        ),
        % Size of the depcache.
        Entry(
            [depcache, size],
            {function, z_depcache, size, [z_context:new(Site)], match, size}
        ),
        % Basic statistics of the MQTT broker.
        Entry(
            [broker, session_count],
            {function, mqtt_sessions, session_count, [Site], match, count}
        ),
        Entry(
            [broker, router_info],
            {function, mqtt_sessions, router_info, [Site], value, []}
        )
    ].
%% @doc Count a single occurrence of an event. The site is given either as
%% a context or as the site name; the count is added to the spiral metric
%% [site, Site, System, What], which is created when it does not exist yet.
record_event(System, What, #context{} = Context) ->
    Site = z_context:site(Context),
    record_event(System, What, Site);
record_event(System, What, Site) when is_atom(Site) ->
    Path = [site, Site, System, What],
    ok = exometer:update_or_create(Path, 1, spiral, []).
%% @doc Add an amount (for example a number of transferred bytes) to the
%% spiral metric [site, Site, System, What]. The site is given either as a
%% context or as the site name.
record_count(System, What, Count, #context{} = Context) ->
    Site = z_context:site(Context),
    record_count(System, What, Count, Site);
record_count(System, What, Count, Site) when is_atom(Site) ->
    Path = [site, Site, System, What],
    ok = exometer:update_or_create(Path, Count, spiral, []).
%% @doc Record a duration. This counts the event itself (see record_event/3)
%% and adds the duration to the histogram [site, Site, System, What, duration].
%% The site is given either as a context or as the site name.
record_duration(System, What, Duration, #context{} = Context) ->
    record_duration(System, What, Duration, z_context:site(Context));
record_duration(System, What, Duration, Site) when is_atom(Site) ->
    ok = record_event(System, What, Site),
    HistogramPath = [site, Site, System, What, duration],
    ok = exometer:update_or_create(HistogramPath, Duration, histogram, []).
%% -------------------------------------------------------------------------------------------------
%% gen_server callbacks
%% -------------------------------------------------------------------------------------------------
%% @private Keep the list of buffers in the state and schedule the first
%% poll by returning ?INITIAL_DELAY as the gen_server timeout.
init(Buffers) ->
    InitialState = #state{ buffers = Buffers },
    {ok, InitialState, ?INITIAL_DELAY}.
%% @private This server accepts no calls; any call stops the process with
%% reason not_implemented.
handle_call(_Request, _From, State) ->
    {stop, not_implemented, State}.
%% @private This server accepts no casts; any cast stops the process with
%% reason not_implemented.
handle_cast(_Message, State) ->
    {stop, not_implemented, State}.
%% @private Poll the buffers when the gen_server timeout fires. The buffers
%% are drained one after another, starting with the first. The outcome of the
%% drain decides the timeout until the next poll: the backoff is incremented
%% when nothing was found and reset when entries were handled.
handle_info(timeout, #state{} = State) ->
    Drained = drain_buffers(State#state.buffers, 0),
    maybe_backoff(Drained, State).
%% @private Map the outcome of a drain to the gen_server return value; the
%% third element of the returned tuple is the timeout until the next poll.
%%
%% <ul>
%% <li>`{empty, 0}': nothing was read, wait ?BACKOFF_INCREMENT msecs longer
%%     than after the previous idle poll, up to at most ?BACKOFF_MAX.</li>
%% <li>`{empty, N}': entries were read and all buffers are now empty, reset
%%     the backoff and poll again after ?BACKOFF_QUIET msecs.</li>
%% <li>`{full, N}': the batch limit was reached, reset the backoff and poll
%%     again immediately.</li>
%% </ul>
maybe_backoff({empty, 0}, #state{ backoff = BackOff } = State) ->
    % min/2 makes the delay grow step by step and never exceed the cap.
    BackOff1 = erlang:min(BackOff + ?BACKOFF_INCREMENT, ?BACKOFF_MAX),
    {noreply, State#state{ backoff = BackOff1 }, BackOff1};
maybe_backoff({empty, _Count}, State) ->
    {noreply, State#state{ backoff = 0 }, ?BACKOFF_QUIET};
maybe_backoff({full, _Count}, State) ->
    {noreply, State#state{ backoff = 0 }, 0}.
%% -------------------------------------------------------------------------------------------------
%% local functions
%% -------------------------------------------------------------------------------------------------
%% @private Read entries from the buffers until either all buffers are empty
%% or ?BATCH_SIZE entries have been handled. A buffer is read until it is
%% empty before the next buffer is tried. Count is the number of entries
%% handled so far. Returns `{empty, Count}' when all buffers were drained and
%% `{full, Count}' when the batch limit stopped the drain.
drain_buffers([], Count) ->
    {empty, Count};
drain_buffers([ Current | Remaining ] = All, Count) when Count < ?BATCH_SIZE ->
    case ringbuffer:read(Current) of
        {error, empty} ->
            % This buffer is exhausted, continue with the next one.
            drain_buffers(Remaining, Count);
        {ok, {Skipped, Metric}} ->
            report_skipped(Skipped),
            log_access(Metric),
            % Stay on the same buffer until it reports empty.
            drain_buffers(All, Count + 1)
    end;
drain_buffers(_Buffers, Count) ->
    {full, Count}.
%% @private Add the skipped count returned with a buffer read to the
%% [site, zotonic, stats, skipped] metric. Nothing is recorded when the
%% count is 0.
report_skipped(0) ->
    ok;
report_skipped(Skipped) ->
    record_count(stats, skipped, Skipped, zotonic).
%% @doc Handle a single request log entry: update the cowmachine metrics and
%% notify the site with a `#http_log_access{}' message.
%%
%% The metrics update runs inside a `try ... after', so the notification is
%% also sent when updating the metrics raises. Entries without a site (or
%% with site `undefined') are ignored.
%%
%% The log entry from zotonic_listen_http_metrics is:
%% ```
%% Log = #{
%%     site => Site,
%%     reason => Reason,
%%     req_start => ReqStart,
%%     duration_total_usec => DurationTotalUsec,
%%     duration_process_usec => DurationProcessUsec,
%%     resp_status => RespStatus,
%%     resp_status_category => StatusCategory,
%%     req_bytes => ReqBodyLength,
%%     resp_bytes => RespBodyLength,
%%     http_version => cowboy_req:version(Req),
%%     method => cowboy_req:method(Req),
%%     path => cowboy_req:path(Req),
%%     user_agent => cowboy_req:header(<<"user-agent">>, Req),
%%     referer => cowboy_req:header(<<"referer">>, Req),
%%     metrics => UserData#{ peer_ip => PeerIP }
%% }
%% '''
log_access(#{
    site := Site,
    req_start := StartedAt,
    resp_status := RespStatus,
    resp_status_category := RespCategory,
    method := HttpMethod
} = LogEntry) when is_atom(Site), Site =/= undefined ->
    try
        handle_cowmachine_stats(LogEntry)
    after
        SiteContext = z_context:new(Site, 'en', <<"UTC">>),
        Notification = #http_log_access{
            timestamp = monotonic_time_to_timestamp(StartedAt),
            status = RespStatus,
            status_category = RespCategory,
            method = HttpMethod,
            metrics = LogEntry
        },
        z_notifier:notify_sync(Notification, SiteContext)
    end;
log_access(_LogEntry) ->
    ok.
%% @private Convert an Erlang monotonic time (in native time units) to a
%% `{MegaSecs, Secs, MicroSecs}' timestamp with a resolution of one second;
%% the microseconds part is always 0.
monotonic_time_to_timestamp(MonotonicTime) ->
    % Add the time offset in native units and convert once. Converting both
    % parts to seconds before adding truncates twice, which can make the
    % result one second too early.
    SystemTime = MonotonicTime + erlang:time_offset(),
    Seconds = erlang:convert_time_unit(SystemTime, native, second),
    {Seconds div 1000000, Seconds rem 1000000, 0}.
%% @private Register a single request in the cowmachine metrics of its site.
%%
%% All metrics live below [site, Site, cowmachine, DispatchRule], where the
%% dispatch rule is taken from the `metrics' map of the log entry and is
%% `unknown' if not present. Updated are:
%% <ul>
%% <li>a spiral per response status category, incremented by one;</li>
%% <li>the `duration' histogram, only for a positive numeric duration;</li>
%% <li>the `data_in' and `data_out' spirals, skipped when the byte count is
%%     `undefined' or 0.</li>
%% </ul>
handle_cowmachine_stats(#{
    site := Site,
    duration_process_usec := DurationUSec,
    resp_status_category := StatusCategory,
    req_bytes := DataIn,
    resp_bytes := DataOut,
    metrics := Metrics
}) ->
    DispatchRule = maps:get(dispatch_rule, Metrics, unknown),
    PathPrefix = [site, Site, cowmachine, DispatchRule],
    exometer:update_or_create(PathPrefix ++ [StatusCategory], 1, spiral, []),
    if
        % In the Erlang term order every atom is greater than any number, so
        % without the is_number/1 test an `undefined' duration would pass.
        is_number(DurationUSec), DurationUSec > 0 ->
            exometer:update_or_create(PathPrefix ++ [duration], DurationUSec, histogram, []);
        true ->
            ok
    end,
    case DataIn of
        undefined -> ok;
        0 -> ok;
        _ -> exometer:update_or_create(PathPrefix ++ [data_in], DataIn, spiral, [])
    end,
    case DataOut of
        undefined -> ok;
        0 -> ok;
        _ -> exometer:update_or_create(PathPrefix ++ [data_out], DataOut, spiral, [])
    end,
    ok.
%% @doc Return the usage of a system resource as a rounded percentage.
%%
%% For `atom', `port', `ets' and `process' this is the current count relative
%% to the corresponding system limit. For the `*_memory' resources this is
%% the share of that memory type in the total memory reported by the VM.
system_usage(atom) ->
    system_usage_helper(atom_count, atom_limit);
system_usage(port) ->
    system_usage_helper(port_count, port_limit);
system_usage(ets) ->
    system_usage_helper(ets_count, ets_limit);
system_usage(process) ->
    system_usage_helper(process_count, process_limit);
system_usage(ets_memory) ->
    system_memory_usage_helper(ets);
system_usage(code_memory) ->
    system_memory_usage_helper(code);
system_usage(process_memory) ->
    system_memory_usage_helper(processes);
system_usage(binary_memory) ->
    system_memory_usage_helper(binary);
system_usage(atom_memory) ->
    system_memory_usage_helper(atom).
%% @private Percentage, rounded to an integer, of the system_info value
%% `Count' relative to the system_info value `Limit'.
system_usage_helper(Count, Limit) ->
    InUse = erlang:system_info(Count),
    Maximum = erlang:system_info(Limit),
    erlang:round((InUse / Maximum) * 100).
%% @private Percentage, rounded to an integer, of the memory of the given
%% type relative to the total memory as reported by erlang:memory/1.
system_memory_usage_helper(Type) ->
    TypeBytes = erlang:memory(Type),
    TotalBytes = erlang:memory(total),
    erlang:round((TypeBytes / TotalBytes) * 100).
%% @doc Return the filezcache statistics as a list of {Key, Value} tuples,
%% restricted to the keys listed below. Keys missing from filezcache:stats/0
%% are not present in the result.
filezcache_stats() ->
    Keys = [
        bytes, max_bytes, entries,
        insert_count, delete_count,
        hit_count, miss_count, evict_count
    ],
    maps:to_list(maps:with(Keys, filezcache:stats())).
%% @private Register (or re-register) all system wide metrics. Crashes with a
%% badmatch if one of the registrations does not return ok.
create_system_metrics() ->
    Register =
        fun({Path, Stat, Options}) ->
            ok = exometer:re_register(Path, Stat, Options)
        end,
    lists:foreach(Register, system_stats()).
%% @private The system wide metrics, as a list of {Path, Stat, Options}
%% tuples. All entries are function metrics with an empty options list.
system_stats() ->
    % NOTE(review): system_usage/1 also accepts `ets', which is not listed
    % here. Confirm whether that omission is intended.
    UsageDatapoints = [
        atom, process, port, ets_memory, binary_memory,
        code_memory, process_memory, atom_memory
    ],
    SystemInfoDatapoints = [
        process_count, process_limit, port_count, port_limit,
        atom_count, atom_limit
    ],
    GcMatch = {total_coll, rec_wrd, '_'},
    IoMatch = {{'_', input}, {'_', output}},
    [
        {[erlang, memory],
            {function, erlang, memory, [], value, []},
            []},
        {[erlang, usage],
            {function, z_stats, system_usage, ['$dp'], value, UsageDatapoints},
            []},
        {[erlang, system],
            {function, erlang, system_info, ['$dp'], value, SystemInfoDatapoints},
            []},
        {[erlang, statistics],
            {function, erlang, statistics, ['$dp'], value, [run_queue]},
            []},
        {[erlang, gc],
            {function, erlang, statistics, [garbage_collection], match, GcMatch},
            []},
        {[erlang, io],
            {function, erlang, statistics, [io], match, IoMatch},
            []},
        {[statistics, filezcache],
            {function, z_stats, filezcache_stats, [], value, []},
            []}
    ].
%% @private Add the system reporter and subscribe it to all enabled metrics
%% whose path starts with `site', `erlang' or `statistics', in that order.
%% Every subscription uses the `default' datapoints and an interval of 10000.
setup_system_reporter() ->
    add_system_reporter(),
    Subscribe =
        fun(Root) ->
            % [Root | '_'] is an improper list on purpose: it is the match
            % pattern for any path that starts with Root.
            Select = {select, [{ {[Root | '_'], '_', enabled}, [], ['$_'] }]},
            ok = exometer_report:subscribe(system_reporter, Select, default, 10000)
        end,
    lists:foreach(Subscribe, [site, erlang, statistics]).
%% @private Add the system reporter, which publishes on the default mqtt
%% pool for the whole system. A reporter that is already running is fine;
%% any other result crashes with a case_clause error.
add_system_reporter() ->
    ReporterOptions = [
        {module, z_exometer_mqtt},
        {status, enabled},
        {report_bulk, true},
        {topic_prefix, [<<"$SYS">>]},
        {context, #context{ site = '-mqtt-', acl = admin }}
    ],
    case exometer_report:add_reporter(system_reporter, ReporterOptions) of
        {error, already_running} -> ok;
        ok -> ok
    end.
% datapoints() ->
% [counter, spiral, gauge, histogram, meter].
% datapoints(counter) ->[value];
% datapoints(spiral) -> [count, one];
% datapoints(gauge) -> [value];
% datapoints(histogram) -> [mean, min, max, 50, 95, 99, 999];
% datapoints(meter) -> [count, one, five, fifteen, day, mean].