%% src/erlperf_monitor.erl

%%%-------------------------------------------------------------------
%%% @copyright (C) 2019-2023, Maxim Fedorov
%%% @doc
%%% System monitor: reports scheduler, RAM, and benchmarks.
%%%
%%% Monitor is started by default when {@link erlperf} starts
%%% as an application. Monitor is not started for ad-hoc
%%% benchmarking (e.g. command-line, unless verbose logging
%%% is requested).
%%%
%%% When started, the monitor provides periodic reports
%%% about Erlang VM state, and registered jobs performance.
%%% The reports are sent to all processes that joined
%%% `{erlperf_monitor, Node}' or `cluster_monitor' process
%%% group in `erlperf' scope.
%%%
%%% Reports can be received by any process, even the shell. Run
%%% the following example in `rebar3 shell' of `erlperf':
%%% ```
%%% (erlperf@ubuntu22)1> ok = pg:join(erlperf, cluster_monitor, self()).
%%% ok
%%% (erlperf@ubuntu22)2> erlperf:run(rand, uniform, []).
%%% 14976933
%%% (erlperf@ubuntu22)4> flush().
%%% Shell got {erlperf@ubuntu22,#{dcpu => 0.0,dio => 6.42619095979426e-4,
%%%                         ets => 44,jobs => [],memory_binary => 928408,
%%%                         memory_ets => 978056,
%%%                         memory_processes => 8603392,
%%%                         memory_total => 34952096,ports => 5,
%%%                         processes => 95,
%%%                         sched_util => 0.013187335960637163,
%%% '''
%%%
%%% Note that the monitor may report differently from the benchmark
%%% run results. It is running with lower priority and may be significantly
%%% affected by scheduler starvation, timing issues, etc.
%%%
%%%
%%%
%%% @end
-module(erlperf_monitor).
-author("maximfca@gmail.com").

-behaviour(gen_server).

%% API
-export([
    start/0,
    start/1,
    start_link/0,
    start_link/1,
    register/3,
    unregister/1
]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2
]).


-include_lib("kernel/include/logger.hrl").

-define(DEFAULT_TICK_INTERVAL_MS, 1000).


-type monitor_sample() :: #{
    time := integer(),
    node := node(),
    sched_util := float(),
    dcpu := float(),
    dio := float(),
    processes := integer(),
    ports := integer(),
    ets := integer(),
    memory_total := non_neg_integer(),
    memory_processes := non_neg_integer(),
    memory_binary := non_neg_integer(),
    memory_ets := non_neg_integer(),
    jobs => [{Job :: pid(), Cycles :: non_neg_integer()}]
}.
%% Monitoring report
%%
%% <ul>
%%   <li>`time': timestamp when the report is generated, wall clock, milliseconds</li>
%%   <li>`node': originating Erlang node name</li>
%%   <li>`sched_util': normal scheduler utilisation, percentage. See {@link scheduler:utilization/1}</li>
%%   <li>`dcpu': dirty CPU scheduler utilisation, percentage.</li>
%%   <li>`dio': dirty IO scheduler utilisation, percentage</li>
%%   <li>`processes': number of processes in the VM.</li>
%%   <li>`ports': number of ports in the VM.</li>
%%   <li>`ets': number of ETS tables created in the VM.</li>
%%   <li>`memory_total': total VM memory usage, see {@link erlang:memory/1}.</li>
%%   <li>`memory_processes': processes memory usage, see {@link erlang:memory/1}.</li>
%%   <li>`memory_binary': binary memory usage.</li>
%%   <li>`memory_ets': ETS memory usage.</li>
%%   <li>`jobs': a map of job process identifier to the iterations surplus
%%    since last sample. If the sampling interval is default 1 second, the
%%    value of the map is "requests/queries per second" (RPS/QPS).</li>
%% </ul>

-type start_options() :: #{
    interval => pos_integer()
}.
%% Monitor startup options
%%
%% <ul>
%%   <li>`interval': monitoring interval, 1000 ms by default</li>
%% </ul>

-export_type([monitor_sample/0, start_options/0]).

%% @equiv start(#{interval => 1000})
-spec start() -> {ok, Pid :: pid()} | {error, Reason :: term()}.
start() ->
    Defaults = #{interval => ?DEFAULT_TICK_INTERVAL_MS},
    start(Defaults).

%% @doc
%% Starts the monitor, registered locally as `erlperf_monitor'.
%%
%% `Options' alter the monitor behaviour:
%% <ul>
%%    <li>`interval': time, in milliseconds, between sample collections</li>
%% </ul>
-spec start(Options :: start_options()) -> {ok, Pid :: pid()} | {error, Reason :: term()}.
start(Options) ->
    gen_server:start({local, ?MODULE}, ?MODULE, Options, []).

%% @equiv start_link(#{interval => 1000})
-spec start_link() -> {ok, Pid :: pid()} | {error, Reason :: term()}.
start_link() ->
    Defaults = #{interval => ?DEFAULT_TICK_INTERVAL_MS},
    start_link(Defaults).

%% @doc
%% Starts the monitor and links it to the current process. See {@link start/1}
%% for options description.
%% Spec added for consistency with the other exported API functions
%% (start/0,1, start_link/0) and for Dialyzer coverage.
-spec start_link(Options :: start_options()) -> {ok, Pid :: pid()} | {error, Reason :: term()}.
start_link(Options) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, Options, []).

%% @doc
%% Registers an {@link erlperf_job} to monitor.
%%
%% The running monitor samples every registered job on each tick and adds
%% the number of iterations performed by the job's workers to the report.
%% Intended to be called by {@link erlperf_job} to enable VM monitoring
%% while benchmarking.
%%
%% `Job' is the job process identifier; it is only used to detect when the
%% job stops, so its counters are no longer reported.
%%
%% `Handle' is the sampling handle, see {@link erlperf_job:handle/1}.
%%
%% `Initial' is the counter value at registration time; providing it for an
%% already-running job avoids reporting the accumulated total in the first
%% sample.
%%
%% Always returns `ok', even when the monitor is not running
%% (it is a `gen_server:cast').
-spec register(pid(), term(), non_neg_integer()) -> ok.
register(Job, Handle, Initial) ->
    Request = {register, Job, Handle, Initial},
    gen_server:cast(?MODULE, Request).

%% @doc
%% Removes the job from monitoring, so its performance is no longer
%% included in the reports.
%%
%% `Job' is the process identifier of the job.
-spec unregister(pid()) -> ok.
unregister(Job) ->
    Request = {unregister, Job},
    gen_server:cast(?MODULE, Request).

%%%===================================================================
%%% gen_server callbacks

%% System monitor state
-record(state, {
    % registered jobs: process id, monitor reference, sampling handle,
    % and the counter value sampled at the previous tick
    jobs :: [{pid(), reference(), Handle :: erlperf_job:handle(), Prev :: integer()}],
    % scheduler wall-time sample saved from the previous tick,
    % sorted by scheduler id
    sched_data :: [{pos_integer(), integer(), integer()}],
    % number of normal schedulers
    normal :: pos_integer(),
    % number of dirty CPU schedulers
    dcpu :: pos_integer(),
    % sampling interval, milliseconds
    tick = ?DEFAULT_TICK_INTERVAL_MS :: pos_integer(),
    % absolute monotonic time (milliseconds) of the next scheduled tick
    next_tick :: integer()
}).

%% @private
%% Initialises monitor state: enables scheduler wall-time accounting,
%% snapshots the initial scheduler statistics, and schedules the first
%% tick at an absolute monotonic time (absolute timers avoid drift).
%%
%% `interval' is declared optional in start_options() (`=>'), so default
%% it here with maps:get/3 instead of crashing with function_clause on a
%% map that omits the key.
init(Options) ->
    Tick = maps:get(interval, Options, ?DEFAULT_TICK_INTERVAL_MS),
    %% TODO: figure out if there is a way to find jobs after restart.
    %% ask a supervisor? but not all jobs are supervised...
    %% Jobs = [{Pid, erlperf_job:handle(Pid), 0} ||
    %%        {_, Pid, _, _} <- try supervisor:which_children(erlperf_job_sup) catch exit:{noproc, _} -> [] end],
    %% [monitor(process, Pid) || {Pid, _, _} <- Jobs],
    Jobs = [],
    %% enable scheduler utilisation calculation
    erlang:system_flag(scheduler_wall_time, true),
    %% first tick fires Tick ms from now; handle_tick/1 keeps advancing
    %% next_tick by Tick to maintain a steady cadence
    Next = erlang:monotonic_time(millisecond) + Tick,
    erlang:start_timer(Next, self(), tick, [{abs, true}]),
    {ok, #state{
        tick = Tick,
        jobs = Jobs,
        next_tick = Next,
        sched_data = lists:sort(erlang:statistics(scheduler_wall_time_all)),
        normal = erlang:system_info(schedulers),
        dcpu = erlang:system_info(dirty_cpu_schedulers)}
    }.

%% @private
%% The monitor serves no synchronous requests; any call is a caller error.
handle_call(_Request, _From, _State) ->
    error(notsup).

%% @private
handle_cast({register, JobPid, Handle, Initial}, #state{jobs = Monitored} = State) ->
    %% watch the job process, to drop it from reports when it exits
    MRef = monitor(process, JobPid),
    {noreply, State#state{jobs = [{JobPid, MRef, Handle, Initial} | Monitored]}};
handle_cast({unregister, JobPid}, #state{jobs = Monitored} = State) ->
    case lists:keytake(JobPid, 1, Monitored) of
        {value, {JobPid, MRef, _Handle, _Prev}, Remaining} ->
            demonitor(MRef, [flush]),
            {noreply, State#state{jobs = Remaining}};
        false ->
            %% job was never registered - nothing to do
            {noreply, State}
    end.

%% @private
handle_info({'DOWN', _MRef, process, JobPid, _Reason}, #state{jobs = Monitored} = State) ->
    %% a registered job died: stop reporting its counters
    {noreply, State#state{jobs = lists:keydelete(JobPid, 1, Monitored)}};
handle_info({timeout, _TimerRef, tick}, State) ->
    %% periodic sampling timer fired
    {noreply, handle_tick(State)}.

%%%===================================================================
%%% Internal functions

%% Collects one monitoring sample, broadcasts it to subscribers, and
%% schedules the next tick. Order matters: scheduler statistics and job
%% counters are both diffed against the baselines stored in state, and
%% the new baselines are saved in the returned state.
handle_tick(#state{sched_data = Data, normal = Normal, dcpu = Dcpu} = State) ->
    NewSched = lists:sort(erlang:statistics(scheduler_wall_time_all)),
    %% utilisation ratios (0..1) for normal, dirty CPU, dirty IO schedulers
    {NU, DU, DioU} = fold_normal(Data, NewSched, Normal, Dcpu, 0, 0),
    % add benchmarking info: per-job iteration count since the previous tick
    {Jobs, UpdatedJobs} = lists:foldl(
        fun ({Pid, MRef, Handle, Prev}, {J, Save}) ->
            Cycles =
                case erlperf_job:sample(Handle) of
                    C when is_integer(C) -> C;
                    undefined -> Prev %% job is stopped, race condition here
                end,
            {[{Pid, Cycles - Prev} | J], [{Pid, MRef, Handle, Cycles} | Save]}
        end, {[], []}, State#state.jobs),
    %% assemble the monitor_sample() report; utilisation is reported in percent
    Sample = #{
        time => erlang:system_time(millisecond),
        node => node(),
        memory_total => erlang:memory(total),
        memory_processes => erlang:memory(processes),
        memory_binary => erlang:memory(binary),
        memory_ets => erlang:memory(ets),
        sched_util => NU * 100,
        dcpu => DU * 100,
        dio => DioU * 100,
        processes => erlang:system_info(process_count),
        ports => erlang:system_info(port_count),
        ets => erlang:system_info(ets_count),
        jobs => Jobs},
    % notify local & global subscribers (pg groups in the erlperf scope)
    Subscribers = pg:get_members(erlperf, {erlperf_monitor, node()}) ++ pg:get_members(erlperf, cluster_monitor),
    [Pid ! Sample || Pid <- Subscribers],
    %% schedule the next tick at an absolute monotonic time to avoid drift
    NextTick = State#state.next_tick + State#state.tick,
    erlang:start_timer(NextTick, self(), tick, [{abs, true}]),
    %% UpdatedJobs was accumulated in reverse; restore original job order
    State#state{sched_data = NewSched, next_tick = NextTick, jobs = lists:reverse(UpdatedJobs)}.

%% Scheduler utilisation is the ratio of the active and total wall-time
%% deltas between two consecutive samples, computed per scheduler class.
%% Both lists are sorted by scheduler id: normal schedulers come first,
%% then dirty CPU schedulers, with dirty IO taking up the remainder.

%% Accumulates deltas over the normal schedulers, then hands off to dirty CPU.
fold_normal(Old, New, 0, Dcpu, ActiveAcc, TotalAcc) ->
    fold_dirty_cpu(Old, New, Dcpu, ActiveAcc / TotalAcc, 0, 0);
fold_normal([{Id, A0, T0} | OldRest], [{Id, A1, T1} | NewRest],
        Normal, Dcpu, ActiveAcc, TotalAcc) ->
    fold_normal(OldRest, NewRest, Normal - 1, Dcpu,
        ActiveAcc + A1 - A0, TotalAcc + T1 - T0).

%% Accumulates deltas over the dirty CPU schedulers, then hands off to dirty IO.
fold_dirty_cpu(Old, New, 0, NormalPct, ActiveAcc, TotalAcc) ->
    fold_dirty_io(Old, New, NormalPct, ActiveAcc / TotalAcc, 0, 0);
fold_dirty_cpu([{Id, A0, T0} | OldRest], [{Id, A1, T1} | NewRest],
        Dcpu, NormalPct, ActiveAcc, TotalAcc) ->
    fold_dirty_cpu(OldRest, NewRest, Dcpu - 1, NormalPct,
        ActiveAcc + A1 - A0, TotalAcc + T1 - T0).

%% All remaining entries belong to dirty IO schedulers.
fold_dirty_io([], [], NormalPct, DcpuPct, ActiveAcc, TotalAcc) ->
    {NormalPct, DcpuPct, ActiveAcc / TotalAcc};
fold_dirty_io([{Id, A0, T0} | OldRest], [{Id, A1, T1} | NewRest],
        NormalPct, DcpuPct, ActiveAcc, TotalAcc) ->
    fold_dirty_io(OldRest, NewRest, NormalPct, DcpuPct,
        ActiveAcc + A1 - A0, TotalAcc + T1 - T0).