%%% @copyright (C) 2019-2023, Maxim Fedorov
%%% @doc
%%% Convenience APIs for benchmarking.
%%%
%%% This module implements the following benchmarking modes:
%%% <ul>
%%% <li>Continuous mode</li>
%%% <li>Timed (low overhead) mode</li>
%%% <li>Concurrency estimation (squeeze) mode</li>
%%% </ul>
%%%
%%% <h2>Continuous mode</h2>
%%% This is the default mode. A separate {@link erlperf_job} is started for
%%% each benchmark, iterating the supplied runner in a tight loop,
%%% bumping a counter for each iteration of each worker. `erlperf' reads
%%% this counter every second (or `sample_duration'), calculating the
%%% difference between current and previous value. This difference is
%%% called a <strong>sample</strong>.
%%%
%%% By default, `erlperf' collects 3 samples and stops, reporting the average.
%%% To give an example, if your function runs for 20 milliseconds, `erlperf'
%%% may capture samples with 48, 52 and 50 iterations. The average would be 50.
%%%
%%% This approach works well for CPU-bound calculations, but may produce
%%% unexpected results for slow functions taking longer than the sample duration.
%%% For example, `timer:sleep(2000)' with default settings yields zero throughput.
%%% Change the sample duration and the number of samples to take to
%%% avoid that.
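%%%
%%% For example, a sketch taking two 5000 ms samples, giving `timer:sleep(2000)'
%%% room to complete a few iterations per sample (numbers depend on hardware):
%%% ```
%%% erlperf:run({timer, sleep, [2000]}, #{sample_duration => 5000, samples => 2}).
%%% '''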
%%%
%%% <h2>Timed mode</h2>
%%% In this mode `erlperf' loops your code a specified number of times, measuring
%%% how long it took to complete. It is essentially what {@link timer:tc/3} does. This mode
%%% has slightly less overhead compared to continuous mode. The difference may be
%%% significant when profiling low-level ERTS primitives.
%%%
%%% This mode does not support the `concurrency' setting (concurrency is locked to 1).
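%%%
%%% For example, a sketch taking 100 timed measurements of a million iterations
%%% each, via the `{timed, Counter}' sample duration run option:
%%% ```
%%% erlperf:run({rand, uniform, []},
%%%     #{samples => 100, sample_duration => {timed, 1000000}}).
%%% '''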
%%%
%%% <h2>Concurrency estimation mode</h2>
%%% In this mode `erlperf' attempts to estimate how concurrent the supplied
%%% runner code is. The run consists of multiple passes, increasing concurrency
%%% with each pass, and stopping when total throughput is no longer growing.
%%% This mode is useful for finding concurrency bottlenecks. For example, some
%%% functions may have limited throughput because they execute remote calls
%%% served by a single process. See {@link benchmark/3} for the detailed
%%% description.
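%%%
%%% A sketch of such a run via {@link run/3}; the throughput and worker count
%%% in the comment are purely illustrative:
%%% ```
%%% %% returns {BestThroughput, Concurrency}, e.g. {4018350, 8}
%%% erlperf:run({rand, uniform, []}, #{}, #{max => 32}).
%%% '''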
%%%
%%%
%%% @end
-module(erlperf).
-author("maximfca@gmail.com").
%% Public API for single-run simple benchmarking
%% Programmatic access.
-export([
benchmark/3,
compare/2,
record/4,
run/1,
run/2,
run/3,
start/2,
time/2
]).
%% Exported for testing purposes only.
-export([report_stats/1]).
%% compare/2 accepts code map, or just the runner code
-type code() :: erlperf_job:code_map() | erlperf_job:callable().
%% Convenience type used in `run/1,2,3' and `compare/2'.
%% node isolation options:
-type isolation() :: #{
host => string()
}.
%% Node isolation settings.
%%
%% Currently, `host' selection is not supported.
-type run_options() :: #{
concurrency => pos_integer(),
sample_duration => pos_integer() | undefined | {timed, pos_integer()},
warmup => non_neg_integer(),
samples => pos_integer(),
cv => float() | undefined,
priority => erlang:priority_level(),
report => basic | extended | full,
isolation => isolation()
}.
%% Benchmarking mode selection and parameters of the benchmark run.
%%
%% <ul>
%% <li>`concurrency': number of workers to run, applies only for the continuous
%% benchmarking mode</li>
%% <li>`cv': coefficient of variation. Acceptable standard deviation for the test
%% to conclude. Not applicable to timed mode. When the value is set, the benchmark
%% keeps running until the standard deviation of the last collected `samples',
%% divided by the average (arithmetic mean), is smaller than the specified `cv'.</li>
%% <li>`isolation': request separate Erlang VM instance
%% for each job. Some benchmarks may lead to internal VM structures corruption,
%% or change global structures affecting other benchmarks when running in the
%% same VM. `host' sub-option is currently ignored.</li>
%% <li>`priority': sets the job controller process priority (defaults to `high').
%% Running with `normal' or lower priority may prevent the controller from
%% starting or stopping workers in a timely manner.</li>
%% <li>`report': applies only to continuous mode. A `basic' report contains only
%% the average value. Specify `extended' to get the list of actual samples,
%% e.g. to calculate exotic statistics. Pass `full' to receive the full report,
%% including benchmark settings and extra statistics for continuous mode: minimum,
%% maximum, average, median and 99th percentile (more metrics may be added in
%% future releases)</li>
%% <li>`samples': number of measurements to take. Default is 3. For continuous mode
%% it results in a 3-second run, when `sample_duration' is set to the default 1000 ms.
%% </li>
%% <li>`sample_duration': time, in milliseconds, between taking iteration counter
%% samples. Multiplied by `samples', this parameter defines the total benchmark
%% run duration. Default is 1000 ms. Passing `{timed, Counter}' engages timed
%% mode with `Counter' iterations taken `samples' times</li>
%% <li>`warmup': how many extra samples are collected and discarded at the beginning
%% of the continuous run</li>
%% </ul>
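%%
%% An illustrative combination of options (values are examples only):
%% ```
%% #{concurrency => 2, sample_duration => 1000, samples => 10,
%%     warmup => 1, report => full}
%% '''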
%% Concurrency test options
-type concurrency_test() :: #{
threshold => pos_integer(),
min => pos_integer(),
step => pos_integer(),
max => pos_integer()
}.
%% Concurrency estimation mode options.
%%
%% <ul>
%% <li>`min': initial number of workers, default is 1</li>
%% <li>`step': increase the number of workers by this value on each iteration, default is 1</li>
%% <li>`max': maximum number of workers, defaults to `erlang:system_info(process_limit) - 1000'</li>
%% <li>`threshold': stop the concurrency run when adding this number of workers does
%% not result in further total throughput increase. Default is 3</li>
%% </ul>
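%%
%% For example, to start at 2 workers and add 2 per pass, up to 64 (an
%% illustrative value set):
%% ```
%% #{min => 2, step => 2, max => 64, threshold => 3}
%% '''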
%% Single run result: one or multiple samples (depending on report verbosity)
-type run_result() :: non_neg_integer() | [non_neg_integer()].
%% Benchmark results.
%%
%% For continuous mode, an average (arithmetic mean) of the collected samples,
%% or a list of all samples collected.
%% Timed mode returns elapsed time (microseconds).
%% Concurrency test result (non-verbose)
-type concurrency_result() :: {QPS :: non_neg_integer(), Concurrency :: non_neg_integer()}.
%% Basic concurrency estimation report
%%
%% Only the highest throughput run is reported. `Concurrency' contains the number of
%% concurrently running workers when the best result is achieved.
%% Extended report returns all samples collected.
-type concurrency_test_result() :: concurrency_result() | {Max :: concurrency_result(), [concurrency_result()]}.
%% Concurrency estimation mode result
%%
%% Extended report contains results for all runs, starting from the minimum number
%% of workers, to the highest throughput detected, plus up to `threshold' more.
-type system_information() :: #{
os := {unix | win32, atom()},
system_version := string(),
debug => boolean(), %% true if the emulator has been debug-compiled, otherwise false
emu_type => atom(), %% see system_info(emu_type), since OTP 24
emu_flavor => atom(), %% see system_info(emu_flavor), since OTP 24
dynamic_trace => atom(),%% see system_info(dynamic_trace), since OTP 24
cpu => string()
}.
%% System information, as returned by {@link erlang:system_info/1}.
%% May also contain the CPU model name on supported operating systems.
-type run_statistics() :: #{
average => non_neg_integer(),
variance => float(),
stddev => float(),
median => non_neg_integer(),
p99 => non_neg_integer(),
best => non_neg_integer(),
worst => non_neg_integer(),
samples => [non_neg_integer()],
time => non_neg_integer(),
iteration_time => non_neg_integer()
}.
%% Results reported by a single benchmark run.
%%
%% <ul>
%% <li>`best': highest throughput for continuous mode, or lowest time for timed</li>
%% <li>`worst': lowest throughput, or highest time</li>
%% <li>`average': arithmetic mean, iterations for continuous
%% mode and microseconds for timed</li>
%% <li>`stddev': standard deviation</li>
%% <li>`median': median (50th percentile)</li>
%% <li>`p99': 99th percentile</li>
%% <li>`samples': raw samples from the run, monotonic counter for
%% continuous mode, and times measured for timed run</li>
%% <li>`time': total benchmark duration (us), may exceed `samples' * `sample_duration'
%% when `cv' is specified and results are not immediately stable</li>
%% <li>`iteration_time': approximate single iteration time (of one runner)</li>
%% </ul>
-type report() :: #{
mode := timed | continuous | concurrency,
result := run_statistics(),
history => [{Concurrency :: pos_integer(), Result :: run_statistics()}],
code := erlperf_job:code_map(),
run_options := run_options(),
concurrency_options => concurrency_test(),
system => system_information(),
sleep => sleep | busy_wait
}.
%% Full benchmark report, containing all collected samples and statistics
%%
%% <ul>
%% <li>`mode': benchmark run mode</li>
%% <li>`result': benchmark result. Concurrency estimation mode contains
%% the best result (with the highest average throughput recorded)</li>
%% <li>`code': original code</li>
%% <li>`run_options': full set of options, with all defaults filled in</li>
%% <li>`system': information about the system the benchmark is running on</li>
%% <li>`history': returned only for concurrency estimation mode,
%% contains a list of all runs with their results</li>
%% <li>`concurrency_options': returned only for concurrency estimation
%% mode, with all defaults filled in</li>
%% <li>`sleep': method used for waiting for a specified amount of time.
%% Normally set to `sleep', but may be reported as `busy_wait' if
%% `erlperf' scheduling is impacted by lock contention or another
%% problem preventing it from using precise timing</li>
%% </ul>
-export_type([code/0, isolation/0, run_options/0, concurrency_test/0, report/0,
system_information/0, run_statistics/0]).
%% Milliseconds, timeout for any remote node operation
-define(REMOTE_NODE_TIMEOUT, 10000).
%% @doc
%% Generic benchmarking suite, accepting multiple code maps, modes and options.
%%
%% `Codes' contains a list of code versions. Every element is a separate job that runs
%% in parallel with all other jobs. The same `RunOptions' are applied to all jobs.
%%
%% `ConcurrencyTestOpts' specifies options for concurrency estimation mode. Passing
%% `undefined' results in a continuous or a timed run. It is not supported to
%% run multiple jobs while doing a concurrency estimation run.
%%
%% A concurrency estimation run consists of multiple passes. The first pass is done
%% with the `min' number of workers, and subsequent passes increase concurrency by
%% `step', until `max' concurrency is reached or total job iterations stop growing
%% for `threshold' consecutive passes. To give an example, if your code is not
%% concurrent at all, and you benchmark it with `threshold' set to 3, there will be
%% 4 passes in total: first with a single worker, then 3 more, demonstrating no
%% throughput growth.
%%
%% In this mode, the job is started once before the first pass. Subsequent passes
%% only change the concurrency. All other options passed in `RunOptions' are
%% honoured, so if you set `samples' to 30, keeping the default sample duration of
%% one second, every single pass will last for 30 seconds.
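%%
%% A hedged sketch of a concurrency estimation call (a single code map, as this
%% mode does not support multiple jobs):
%% ```
%% erlperf:benchmark([#{runner => {rand, uniform, []}}],
%%     #{samples => 5}, #{threshold => 2}).
%% '''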
%% @end
-spec benchmark([erlperf_job:code_map()], RunOptions :: run_options(), undefined) -> run_result() | [run_result()] | [report()];
([erlperf_job:code_map()], RunOptions :: run_options(), concurrency_test()) -> concurrency_test_result() | [report()].
benchmark(Codes, #{isolation := Isolation} = RunOptions, ConcurrencyTestOpts) ->
erlang:is_alive() orelse erlang:error(not_alive),
%% isolation requested: need to rely on cluster_monitor and other distributed things.
{Peers, Nodes} = prepare_nodes(length(Codes)),
Opts = maps:remove(isolation, RunOptions),
try
%% no timeout here (except that rpc itself could time out)
Promises =
[erpc:send_request(Node, erlperf, run, [Code, Opts, ConcurrencyTestOpts])
|| {Node, Code} <- lists:zip(Nodes, Codes)],
%% now wait for everyone to respond
Reports = [erpc:receive_response(Promise) || Promise <- Promises],
%% if full reports were requested, restore isolation flag
case maps:get(report, RunOptions, basic) of
full ->
[maps:update_with(run_options, fun(RO) -> RO#{isolation => Isolation} end, Report)
|| Report <- Reports];
_ ->
Reports
end
catch
error:{exception, Reason, Stack} ->
erlang:raise(error, Reason, Stack)
after
stop_nodes(Peers, Nodes)
end;
%% foolproofing
benchmark([_, _ | _], _RunOptions, #{}) ->
erlang:error(not_supported);
%% No isolation requested.
%% This is the primary entry point for all benchmark jobs.
benchmark(Codes, RunOptions0, ConOpts0) ->
%% fill in all missing defaults
ConOpts = concurrency_mode_defaults(ConOpts0),
#{report := ReportType, priority := SetPrio} = RunOptions = run_options_defaults(RunOptions0),
%% elevate priority to reduce timer skew
PrevPriority = process_flag(priority, SetPrio),
Jobs = start_jobs(Codes, []),
{JobPids, Handles, _} = lists:unzip3(Jobs),
Reports =
try
benchmark_impl(JobPids, RunOptions, ConOpts, Handles)
after
stop_jobs(Jobs),
process_flag(priority, PrevPriority)
end,
%% generate statistical information from the samples returned
report(ReportType, Codes, Reports).
%% @doc
%% Comparison run: benchmark multiple jobs at the same time.
%%
%% A job is defined by either {@link erlperf_job:code_map()},
%% or just the runner {@link erlperf_job:callable(). callable}.
%% Example comparing {@link rand:uniform/0} performance
%% to {@link rand:mwc59/1}:
%% ```
%% (erlperf@ubuntu22)7> erlperf:compare([
%% {rand, uniform, []},
%% #{runner => "run(X) -> rand:mwc59(X).", init_runner => {rand, mwc59_seed, []}}
%% ], #{}).
%% [14823854,134121999]
%% '''
%%
%% See {@link benchmark/3} for `RunOptions' definition and return values.
-spec compare(Codes :: [code()], RunOptions :: run_options()) -> [run_result()] | [report()].
compare(Codes, RunOptions) ->
benchmark([code(Code) || Code <- Codes], RunOptions, undefined).
%% @doc
%% Runs a single benchmark for 3 seconds and returns the average number of iterations per second.
%%
%% Accepts either a full {@link erlperf_job:code_map()}, or just the runner
%% {@link erlperf_job:callable(). callable}.
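%%
%% For example (the reported throughput is hardware-dependent):
%% ```
%% erlperf:run({erlang, unique_integer, []}).
%% '''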
-spec run(code()) -> non_neg_integer().
run(Code) ->
[Report] = benchmark([code(Code)], #{}, undefined),
Report.
%% @doc
%% Runs a single benchmark job, returning the average number of iterations per second,
%% or a full report.
%%
%% Accepts either a full {@link erlperf_job:code_map()}, or just the runner
%% {@link erlperf_job:callable(). callable}.
%% Equivalent to returning the first result of `compare([Code], RunOptions)'.
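%%
%% For example, requesting an extended report that returns one value per
%% collected sample:
%% ```
%% erlperf:run({rand, uniform, []}, #{report => extended, samples => 5}).
%% '''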
-spec run(Code :: code(), RunOptions :: run_options()) -> run_result() | report().
run(Code, RunOptions) ->
[Report] = benchmark([code(Code)], RunOptions, undefined),
Report.
%% @doc
%% Concurrency estimation run, or an alias for quick benchmarking of an MFA tuple.
%%
%% Attempt to find concurrency characteristics of the runner code,
%% see {@link benchmark/3} for a detailed description. Accepts either a full
%% {@link erlperf_job:code_map()}, or just the runner
%% {@link erlperf_job:callable(). callable}.
%%
%% When `Module' and `Function' are atoms, and `Args' is a list, this call is
%% equivalent to `run({Module, Function, Args})'.
-spec run(code(), run_options(), concurrency_test()) -> concurrency_test_result() | report();
(module(), atom(), [term()]) -> QPS :: non_neg_integer().
run(Module, Function, Args) when is_atom(Module), is_atom(Function), is_list(Args) ->
%% this typo is so common that I decided to have this as an unofficial API
run({Module, Function, Args});
run(Code, RunOptions, ConTestOpts) ->
[Report] = benchmark([code(Code)], RunOptions, ConTestOpts),
Report.
%% @doc
%% Starts a new supervised job with the specified concurrency.
%%
%% Requires the `erlperf' application to be running. Returns the job
%% controller process identifier.
%% This function is designed for distributed benchmarking, when
%% jobs are started in different nodes, and monitored via
%% {@link erlperf_cluster_monitor}.
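%%
%% A minimal sketch, assuming the `erlperf' application is already started:
%% ```
%% {ok, _Apps} = application:ensure_all_started(erlperf),
%% Job = erlperf:start(#{runner => {rand, uniform, []}}, 2).
%% '''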
-spec start(code(), Concurrency :: non_neg_integer()) -> pid().
start(Code, Concurrency) ->
{ok, Job} = supervisor:start_child(erlperf_job_sup, [code(Code)]),
ok = erlperf_job:set_concurrency(Job, Concurrency),
Job.
%% @doc
%% Timed benchmarking mode. Iterates the runner code `Count' times and returns
%% elapsed time in microseconds.
%%
%% This method has lower overhead compared to continuous benchmarking. Running
%% multiple workers is not supported in this mode.
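%%
%% For example, measuring how long a million calls to {@link rand:uniform/0}
%% take (elapsed time varies by host):
%% ```
%% erlperf:time({rand, uniform, []}, 1000000).
%% '''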
-spec time(code(), Count :: non_neg_integer()) -> TimeUs :: non_neg_integer().
time(Code, Count) ->
[Report] = benchmark([code(Code)], #{samples => Count, sample_duration => undefined}, undefined),
Report.
%% @private
%% @doc
%% Records a call trace, so it can be used for benchmarking later.
%% Experimental, do not use.
-spec record(module(), atom(), non_neg_integer(), pos_integer()) ->
[[{module(), atom(), [term()]}]].
record(Module, Function, Arity, TimeMs) ->
TracerPid = spawn_link(fun rec_tracer/0),
TraceSpec = [{'_', [], []}],
MFA = {Module, Function, Arity},
erlang:trace_pattern(MFA, TraceSpec, [global]),
erlang:trace(all, true, [call, {tracer, TracerPid}]),
receive after TimeMs -> ok end,
erlang:trace(all, false, [call]),
erlang:trace_pattern(MFA, false, [global]),
TracerPid ! {stop, self()},
receive
{data, Samples} ->
Samples
end.
%% ===================================================================
%% Implementation details
concurrency_mode_defaults(undefined) ->
undefined;
concurrency_mode_defaults(ConOpts) ->
maps:merge(#{min => 1, step => 1, max => erlang:system_info(process_limit) - 1000, threshold => 3}, ConOpts).
run_options_defaults(RunOptions) ->
maps:merge(#{
concurrency => 1,
sample_duration => 1000,
warmup => 0,
samples => 3,
cv => undefined,
priority => high,
report => basic},
RunOptions).
%%===================================================================
%% Codification: translate from {M, F, A} to #{runner => ...} map
code(#{runner := _Runner} = Code) ->
Code;
code({M, F, A}) when is_atom(M), is_atom(F), is_list(A) ->
#{runner => {M, F, A}};
code(Fun) when is_function(Fun) ->
#{runner => Fun};
code(Text) when is_list(Text) ->
#{runner => Text}.
%%===================================================================
%% Benchmarking itself
%% OTP 25 and newer provide 'peer'; fall back to the deprecated 'slave' module on older releases.
-dialyzer({no_missing_calls, start_node/1}).
-compile({nowarn_deprecated_function, [{slave, start_link, 3}, {slave, stop, 1}]}).
-compile({nowarn_removed, [{slave, start_link, 3}, {slave, stop, 1}]}).
start_node({module, peer}) ->
{ok, _Peer, _Node} = peer:start_link(#{name => peer:random_name()});
start_node({error, nofile}) ->
OsPid = os:getpid(),
[_, HostString] = string:split(atom_to_list(node()), "@"),
Host = list_to_atom(HostString),
Args = "-setcookie " ++ atom_to_list(erlang:get_cookie()),
Uniq = erlang:unique_integer([positive]),
NodeId = list_to_atom(lists:concat(["job-", Uniq, "-", OsPid])),
{ok, Node} = slave:start_link(Host, NodeId, Args),
{ok, undefined, Node}.
prepare_nodes(HowMany) ->
%% start 'erlperf' parts on all peers
%% Cannot do this via "code:add_path" because actual *.beam files are
%% parts of the binary escript.
_ = application:load(erlperf),
{ok, ModNames} = application:get_key(erlperf, modules),
Modules = [{Mod, _Bin, _Path} = code:get_object_code(Mod) || Mod <- ModNames],
PeerPresent = code:ensure_loaded(peer),
%% start multiple nodes
lists:unzip([begin
{ok, Peer, Node} = start_node(PeerPresent),
[{module, Mod} = erpc:call(Node, code, load_binary, [Mod, Path, Bin], ?REMOTE_NODE_TIMEOUT)
|| {Mod, Bin, Path} <- Modules],
{ok, _PgPid} = erpc:call(Node, pg, start, [erlperf]),
{ok, _MonPid} = erpc:call(Node, erlperf_monitor, start, []),
{Peer, Node}
end || _ <- lists:seq(1, HowMany)]).
stop_nodes([undefined | _], Nodes) ->
[slave:stop(Node) || Node <- Nodes];
stop_nodes(Peers, _Nodes) ->
[peer:stop(Peer) || Peer <- Peers].
start_jobs([], Jobs) ->
lists:reverse(Jobs);
start_jobs([Code | Codes], Jobs) ->
try
{ok, Pid} = erlperf_job:start(Code),
Handle = erlperf_job:handle(Pid),
MonRef = monitor(process, Pid),
start_jobs(Codes, [{Pid, Handle, MonRef} | Jobs])
catch Class:Reason:Stack ->
%% stop jobs that were started
stop_jobs(Jobs),
erlang:raise(Class, Reason, Stack)
end.
stop_jobs(Jobs) ->
%% do not use gen:stop/1,2 or sys:terminate/2,3 here, as they spawn process running
%% with normal priority, and they don't get scheduled fast enough when there is severe
%% lock contention
WaitFor = [begin erlperf_job:request_stop(Pid), {Pid, Mon} end || {Pid, _, Mon} <- Jobs, is_process_alive(Pid)],
%% now wait for all monitors to fire
[receive {'DOWN', Mon, process, Pid, _R} -> ok end || {Pid, Mon} <- WaitFor].
%% Benchmark implementation. Always returns a full report (post-processing will dumb it down if needed).
%% Timed mode, backwards-compatibility conversion
benchmark_impl(Jobs, #{sample_duration := undefined, samples := Samples} = RunOptions, undefined, Handles) ->
benchmark_impl(Jobs, RunOptions#{sample_duration => {timed, Samples}, samples => 1}, undefined, Handles);
%% timed mode
benchmark_impl(Jobs, #{sample_duration := {timed, Duration}, samples := Samples, warmup := Warmup} = RunOptions, undefined, _Handles) ->
Proxies = [
spawn_monitor(
fun () ->
_Discarded = [erlperf_job:measure(Job, Duration) || _ <- lists:seq(1, Warmup)],
Times = [erlperf_job:measure(Job, Duration) || _ <- lists:seq(1, Samples)],
exit({success, Times})
end)
|| Job <- Jobs],
[case Res of
{success, TimesUs} ->
#{mode => timed, result => #{samples => TimesUs}, run_options => RunOptions};
Error ->
erlang:error(Error)
end || Res <- multicall_result(Proxies, [])];
%% Continuous mode
%% QPS considered stable when:
%% * 'warmup' done
%% * 'samples' received
%% * (optional) for the last 'samples' standard deviation must not exceed 'cv'
benchmark_impl(Jobs, #{sample_duration := Interval, cv := CV, samples := SampleCount,
warmup := Warmup, concurrency := Concurrency} = RunOptions, undefined, Handles) ->
%% TODO: turn the next sequential call into a multi-call, to make warmup time fair
[ok = erlperf_job:set_concurrency(Job, Concurrency) || Job <- Jobs],
%% warmup: intended to figure out sleep method (whether to apply busy_wait immediately)
NowTime = os:system_time(millisecond),
SleepMethod = warmup(Warmup, NowTime, NowTime + Interval, Interval, sleep),
%% remember initial counters in Before
Before = [[erlperf_job:sample(Handle)] || Handle <- Handles],
StartedAt = os:system_time(millisecond),
{Samples, TimerSkew, FinishedAt} = measure_impl(Before, Handles, StartedAt, StartedAt + Interval, Interval,
SleepMethod, SampleCount, CV),
Time = FinishedAt - StartedAt,
[#{mode => continuous, result => #{samples => lists:reverse(S), time => Time * 1000},
run_options => RunOptions, sleep => TimerSkew}
|| S <- Samples];
%% squeeze test - concurrency benchmark
benchmark_impl(Jobs, RunOptions, #{min := Min} = ConOpts, Handles) ->
[estimate_concurrency(Jobs, RunOptions, ConOpts, Handles, Min, [], {0, 0})].
%% warmup procedure: figure out if sleep/3 can work without falling back to busy wait
warmup(0, _LastSampleTime, _NextSampleTime, _Interval, Method) ->
Method;
warmup(Count, LastSampleTime, NextSampleTime, Interval, Method) ->
SleepFor = NextSampleTime - LastSampleTime,
NextMethod = sleep(Method, SleepFor, NextSampleTime),
NowTime = os:system_time(millisecond),
warmup(Count - 1, NowTime, NextSampleTime + Interval, Interval, NextMethod).
%% collected all samples, CV is not defined
measure_impl(Before, _Handles, LastSampleTime, _NextSampleTime, _Interval, SleepMethod, 0, undefined) ->
{Before, SleepMethod, LastSampleTime};
%% collected all samples, but CV is defined - check whether to collect more samples
measure_impl(Before, Handles, LastSampleTime, NextSampleTime, Interval, SleepMethod, 0, CV) ->
%% Complication: some jobs may need a long time to stabilise compared to others.
%% Decision: wait for all jobs to stabilise. Stopping completed jobs skews the measurements.
case
lists:any(
fun (Samples) ->
Normal = difference(Samples),
Len = length(Normal),
Mean = lists:sum(Normal) / Len,
StdDev = math:sqrt(lists:sum([(S - Mean) * (S - Mean) || S <- Normal]) / (Len - 1)),
StdDev / Mean > CV
end, Before)
of
false ->
{Before, SleepMethod, LastSampleTime};
true ->
%% imitate queue - drop last sample, push another in the head
%% TODO: change the behaviour to return all samples in the full report
TailLess = [lists:droplast(L) || L <- Before],
measure_impl(TailLess, Handles, LastSampleTime, NextSampleTime + Interval,
Interval, SleepMethod, 1, CV)
end;
%% LastSampleTime: system time of the last sample
%% NextSampleTime: system time when to take the next sample
%% Interval: to calculate the next NextSampleTime
%% Count: how many more samples to take
%% CV: acceptable standard deviation
measure_impl(Before, Handles, LastSampleTime, NextSampleTime, Interval, SleepMethod, Count, CV) ->
SleepFor = NextSampleTime - LastSampleTime,
NextSleepMethod = sleep(SleepMethod, SleepFor, NextSampleTime),
Counts = [erlperf_job:sample(Handle) || Handle <- Handles],
NowTime = os:system_time(millisecond),
measure_impl(merge(Counts, Before), Handles, NowTime, NextSampleTime + Interval, Interval,
NextSleepMethod, Count - 1, CV).
%% ERTS real-time properties are easily broken by lock contention (e.g. ETS misuse)
%% When it happens, even the 'max' priority process may not run for an extended
%% period of time.
sleep(sleep, SleepFor, _WaitUntil) when SleepFor > 0 ->
receive
{'DOWN', _Ref, process, Pid, Reason} ->
erlang:error({benchmark, {'EXIT', Pid, Reason}})
after SleepFor ->
sleep
end;
sleep(_Mode, _SleepFor, WaitUntil) ->
busy_wait(WaitUntil).
%% When sleep detects significant difference in the actual sleep time vs. expected,
%% loop is switched to the busy wait.
%% Once switched to busy wait, erlperf stays there until the end of the test.
busy_wait(WaitUntil) ->
receive
{'DOWN', _Ref, process, Pid, Reason} ->
erlang:error({benchmark, {'EXIT', Pid, Reason}})
after 0 ->
case os:system_time(millisecond) of
Now when Now > WaitUntil ->
busy_wait;
_ ->
busy_wait(WaitUntil)
end
end.
merge([], []) ->
[];
merge([M | T], [H | T2]) ->
[[M | H] | merge(T, T2)].
difference([_]) ->
[];
difference([S, F | Tail]) ->
[F - S | difference([F | Tail])].
%% Determine maximum throughput by measuring multiple times with different concurrency.
%% Test considered complete when either:
%% * maximum number of workers reached
%% * last 'threshold' added workers did not increase throughput
estimate_concurrency(Jobs, Options, #{threshold := Threshold, step := Step, max := Max} = ConOpts, Handles, Current, History, QMax) ->
RunOptions = Options#{concurrency => Current},
[Report] = benchmark_impl(Jobs, RunOptions, undefined, Handles),
#{result := Result0} = Report,
#{samples := Samples} = Result0,
%% calculate average QPS
QPS = lists:sum(difference(Samples)) div (length(Samples) - 1),
Result = Result0#{average => QPS},
NewHistory = [{Current, Result} | History],
%% this gives us nice round numbers (e.g. with a step of 10, we'll have [1, 10, 20, ...])
Next = (Current + Step) div Step * Step,
%% test if we are at Max concurrency, or saturated the node
case maxed(QPS, Current, QMax, Threshold) of
true ->
%% QPS is either stable or decreasing; fall back to the best run
#{sleep := SleepMethod} = Report,
{_BestMax, BestConcurrency} = QMax,
{BestConcurrency, BestResult} = lists:keyfind(BestConcurrency, 1, History),
#{mode => concurrency, result => BestResult, history => NewHistory, sleep => SleepMethod,
concurrency_options => ConOpts, run_options => Options#{concurrency => BestConcurrency}};
_NewQMax when Next > Max ->
#{sleep := SleepMethod} = Report,
#{mode => concurrency, result => Result, history => NewHistory, sleep => SleepMethod,
concurrency_options => ConOpts, run_options => RunOptions};
NewQMax ->
% need more workers
estimate_concurrency(Jobs, RunOptions, ConOpts, Handles, Next, NewHistory, NewQMax)
end.
maxed(QPS, Current, {Q, _}, _) when QPS > Q ->
{QPS, Current};
maxed(_, Current, {_, W}, Count) when Current - W >= Count ->
true;
maxed(_, _, QMax, _) ->
QMax.
multicall_result([], Acc) ->
lists:reverse(Acc);
multicall_result([{Pid, Ref} | Proxies], Acc) ->
receive
{'DOWN', Ref, process, Pid, Result} ->
multicall_result(Proxies, [Result | Acc])
end.
%%%===================================================================
%%% Tracer process, uses heap to store tracing information.
rec_tracer() ->
process_flag(message_queue_data, off_heap),
tracer_loop([]).
-spec tracer_loop([{module(), atom(), [term()]}]) -> ok.
tracer_loop(Trace) ->
receive
{trace, _Pid, call, MFA} ->
tracer_loop([MFA | Trace]);
{stop, Control} ->
Control ! {data, Trace},
ok
end.
%% Reporting: in full mode, add extra information (e.g. codes and statistics)
%% full report for concurrency estimation mode (history needs to be rewritten)
report(full, [Code], [#{mode := concurrency, history := History, result := Result, run_options := RunOpts} = Report]) ->
System = system_report(),
[Report#{system => System, code => Code,
result => process_result(Result, continuous, RunOpts, #{}),
history => [{C, process_result(R, continuous, #{report => full, concurrency => C}, #{})} || {C, R} <- History]}];
%% full reports
report(full, Codes, Reports) ->
System = system_report(),
[Report#{system => System, code => Code, result => process_result(Result, Mode, RunOptions, Report)}
|| {Code, #{mode := Mode, result := Result, run_options := RunOptions} = Report} <- lists:zip(Codes, Reports)];
report(_ReportType, _Codes, Reports) ->
[process_result(Result, Mode, RunOptions, Report)
|| #{mode := Mode, result := Result, run_options := RunOptions} = Report <- Reports].
%% Transform raw samples into requested report
process_result(#{samples := Samples}, timed, #{report := full, samples := Count,
sample_duration := {timed, Loop}}, _Report) ->
Stat = report_stats(Samples),
TotalTime = lists:sum(Samples),
Stat#{time => TotalTime, iteration_time => TotalTime * 1000 div (Count * Loop), samples => Samples};
process_result(#{samples := Samples}, timed, #{report := basic}, _Report) ->
%% timed mode, basic report
lists:sum(Samples) div length(Samples) div 1000;
process_result(#{samples := Samples}, timed, #{report := extended}, _Report) ->
%% timed mode, extended report, convert to milliseconds for backwards compatibility
[S div 1000 || S <- Samples];
process_result(#{samples := Samples, time := TimeUs}, continuous, #{report := full, concurrency := C}, _Report) ->
Stat = report_stats(difference(Samples)),
IterationTime = case lists:last(Samples) - hd(Samples) of
0 ->
infinity;
Total ->
erlang:round(TimeUs * C * 1000 div Total)
end,
Stat#{samples => Samples, time => TimeUs, iteration_time => IterationTime};
process_result(#{samples := Samples}, continuous, #{report := extended}, _Report) ->
difference(Samples);
process_result(#{samples := Samples}, continuous, #{report := basic}, _Report) ->
Diffs = difference(Samples),
lists:sum(Diffs) div length(Diffs);
process_result(#{average := Avg}, concurrency, #{report := basic, concurrency := C}, _Report) ->
{Avg, C};
process_result(#{average := Avg}, concurrency, #{report := extended, concurrency := C}, #{history := H}) ->
%% return {Best, History}
{{Avg, C}, [{A, W} || {W, #{average := A}} <- H]}.
%% @private
%% Calculates statistics over the passed samples.
%% Exported for unit-testing purposes
report_stats(Samples) ->
Sum = lists:sum(Samples),
Len = length(Samples),
Avg = Sum / Len, %% arithmetic mean
Variance = if Len < 2 -> 0; true -> lists:sum([(S - Avg) * (S - Avg) || S <- Samples]) / (Len - 1) end,
Sorted = lists:sort(Samples),
#{
average => Avg,
min => hd(Sorted),
max => lists:last(Sorted),
stddev => math:sqrt(Variance),
median => lists:nth(erlang:round(0.50 * Len), Sorted),
p99 => lists:nth(erlang:round(0.99 * Len), Sorted)
}.
system_report() ->
OSType = erlang:system_info(os_type),
Guaranteed = detect_feature([emu_type, emu_flavor, dynamic_trace], #{
os => OSType,
system_version => string:trim(erlang:system_info(system_version), trailing)
}),
try Guaranteed#{cpu => string:trim(detect_cpu(OSType), both)}
catch _:_ -> Guaranteed
end.
detect_feature([], System) ->
System;
detect_feature([F | T], System) ->
try detect_feature(T, System#{F => erlang:system_info(F)})
catch error:badarg -> detect_feature(T, System)
end.
detect_cpu({unix, freebsd}) ->
os:cmd("sysctl -n hw.model");
detect_cpu({unix, darwin}) ->
os:cmd("sysctl -n machdep.cpu.brand_string");
detect_cpu({unix, linux}) ->
{ok, Bin} = file:read_file("/proc/cpuinfo"),
linux_cpu_model(binary:split(Bin, <<"\n">>));
detect_cpu({win32, nt}) ->
[_, CPU] = string:split(os:cmd("WMIC CPU GET NAME"), "\n"),
CPU.
linux_cpu_model([<<"model name", Model/binary>>, _]) ->
[_, ModelName] = binary:split(Model, <<":">>),
binary_to_list(ModelName);
linux_cpu_model([_Skip, Tail]) ->
linux_cpu_model(binary:split(Tail, <<"\n">>)).