defmodule Instruments.RateTracker do
  @moduledoc """
  RateTracker will track how often you are reporting metrics that are not backed
  by a "fast" implementation.

  RateTracker is designed to catch cases where you have inadvertently reported
  a metric "too" frequently, as some metrics require hitting statsd directly for
  every reported value. Doing so in hot loops can result in your
  application slowing significantly.

  ## How it works

  `track/2` increments a counter in a public, named ETS table that belongs to
  the scheduler the caller is currently running on. The tracker process
  periodically reads every table, resets the counters it read, and invokes the
  subscribed callbacks for each metric whose per-second rate is above the
  configured threshold.

  ## Configuration

  All keys live under the `:instruments` application and are read at runtime:

    * `:rate_tracker_report_interval` - milliseconds between reports
      (default `10_000`).
    * `:rate_tracker_report_jitter_range` - a range of milliseconds from which a
      random offset is added to every interval (default `-500..500`).
    * `:rate_tracker_callback_threshold` - per-second rate above which
      callbacks are invoked. When unset (`nil`) callbacks are never invoked.
  """

  use GenServer

  @table_prefix :instruments_rate_tracker

  # `table_name/1` clauses are generated at compile time for scheduler ids
  # 1..@max_tables, so no atoms are created at runtime. A VM started with more
  # schedulers than this has no matching clause for the higher ids.
  @max_tables 128

  @compile {:inline, get_table_key: 2}

  @type t :: %__MODULE__{
          last_update_time: float(),
          callbacks: [callback()],
          table_count: non_neg_integer()
        }

  @typedoc """
  Function invoked with the metric key (`{name, options}`) and its per-second
  rate whenever that rate exceeds the configured threshold.
  """
  @type callback :: ({String.t(), Statix.options()}, float() -> term())

  @enforce_keys [:last_update_time, :table_count]
  defstruct [
    :last_update_time,
    :table_count,
    callbacks: []
  ]

  @doc """
  Starts the rate tracker and registers it under the module name.

  The argument is ignored; it only exists so the module can be used as a
  child spec.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_ \\ []) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    table_count = :erlang.system_info(:schedulers)

    # One table per scheduler, so concurrent `track/2` calls running on
    # different schedulers write to different tables.
    for scheduler_id <- 1..table_count do
      :ets.new(table_name(scheduler_id), [:named_table, :public, :set])
    end

    schedule_report()

    {:ok,
     %__MODULE__{
       last_update_time: time(),
       table_count: table_count,
       callbacks: []
     }}
  end

  ## Public

  @doc false
  @spec track(iodata) :: :ok
  @spec track(iodata, Statix.options()) :: :ok
  def track(name, options \\ []) do
    table_key = get_table_key(name, options)
    # The default object `{table_key, 0}` is inserted when the key is not yet present.
    :ets.update_counter(current_table(), table_key, 1, {table_key, 0})
    :ok
  end

  @doc """
  Add a callback to be notified that you are reporting a metric "too" frequently.

  In order to receive notifications, you must set `:instruments` ->
  `:rate_tracker_callback_threshold` to the per-second rate that you want to be
  notified at. This value will be different for every system, and will require
  experimentation to determine. You can use `dump_rates()` in a remote console
  to see what values are currently tracked for your metrics.

  The callback is invoked inside the tracker process with the metric key and
  its per-second rate, so it should be short-lived.
  """
  @spec subscribe(callback()) :: :ok
  def subscribe(callback) do
    GenServer.cast(__MODULE__, {:subscribe, callback})
  end

  @doc """
  Dump the currently tracked rates.

  Returns a list of `{{name, options}, per_second_rate}` tuples computed from
  the counts accumulated since the last report. Metrics whose count is zero
  are omitted. Unlike the values passed to callbacks, these rates are not
  multiplied by the metric's `:sample_rate`.
  """
  @spec dump_rates() :: [{{String.t(), Keyword.t()}, float()}]
  def dump_rates() do
    GenServer.call(__MODULE__, :dump_rates)
  end

  ## GenServer callbacks

  @impl true
  def handle_call(:dump_rates, _from, %__MODULE__{} = state) do
    time_since_report = time() - state.last_update_time
    rates = do_dump_rates(state, time_since_report)
    {:reply, rates, state}
  end

  @impl true
  def handle_cast({:subscribe, callback}, %__MODULE__{} = state) do
    state = %__MODULE__{state | callbacks: [callback | state.callbacks]}
    {:noreply, state}
  end

  @impl true
  def handle_info(:report, %__MODULE__{} = state) do
    report_time = time()
    time_since_report = report_time - state.last_update_time
    threshold = Application.get_env(:instruments, :rate_tracker_callback_threshold, nil)

    # Extraordinarily unlikely to be zero, but if it is for some reason, we'll just skip this
    # and let the next report get it
    if time_since_report > 0 do
      counts = dump_and_clear_counts(state)
      do_report(state, counts, time_since_report, threshold)
    end

    schedule_report()
    {:noreply, %__MODULE__{state | last_update_time: report_time}}
  end

  ## Private

  # `time/0` returns a float, so the elapsed time is a float as well; a literal
  # `0` pattern would never match `0.0`. The guard keeps us from dividing by
  # zero and mirrors the `time_since_report > 0` check in `handle_info/2`.
  defp do_dump_rates(_state, time_since_report) when time_since_report <= 0 do
    []
  end

  defp do_dump_rates(state, time_since_report) do
    aggregated =
      1..state.table_count
      |> Enum.flat_map(fn scheduler_id -> :ets.tab2list(table_name(scheduler_id)) end)
      |> aggregate_stats()

    # Counters are reset to zero rather than deleted on report, so skip the
    # metrics that have not been tracked since then.
    for {key, count} <- aggregated, count != 0 do
      {key, count / time_since_report}
    end
  end

  # Builds the ETS key for a metric. Tags are sorted so the same set of tags
  # given in a different order is counted under a single key.
  defp get_table_key(name, []) do
    {name, []}
  end

  defp get_table_key(name, options) do
    case Keyword.get(options, :tags) do
      # Only lists with two or more tags can be out of order.
      [_, _ | _] = tags ->
        {name, Keyword.replace!(options, :tags, Enum.sort(tags))}

      _ ->
        {name, options}
    end
  end

  defp sample_rate_for_key({_name, opts}) do
    Keyword.get(opts, :sample_rate, 1)
  end

  # Schedules the next `:report` message. The interval and jitter are read at
  # runtime so configuration changes do not require recompiling this module.
  defp schedule_report() do
    interval_ms = Application.get_env(:instruments, :rate_tracker_report_interval, 10_000)

    jitter_range_ms =
      Application.get_env(:instruments, :rate_tracker_report_jitter_range, -500..500)

    # `Process.send_after/3` rejects negative delays, which a small interval
    # combined with negative jitter could otherwise produce.
    wait_time = max(interval_ms + Enum.random(jitter_range_ms), 0)
    Process.send_after(self(), :report, wait_time)
  end

  # Monotonic time in seconds, as a float.
  defp time() do
    # Dividing so we can get the fractional part
    System.monotonic_time(:microsecond) / 1_000_000
  end

  # The table that belongs to the scheduler the calling process is running on.
  defp current_table() do
    table_name(:erlang.system_info(:scheduler_id))
  end

  # Without a configured threshold there is nothing to compare against, so no
  # callbacks are invoked.
  defp do_report(%__MODULE__{} = _state, _aggregated_counts, _time_since_report, nil) do
    :ok
  end

  defp do_report(%__MODULE__{} = state, aggregated_counts, time_since_report, threshold) do
    Enum.each(aggregated_counts, fn {key, num_tracked} ->
      # Sampling correction is technically approximate (we don't know if Statix or another
      # underlying lib will report this differently)
      tracked_per_second = num_tracked / time_since_report * sample_rate_for_key(key)

      if tracked_per_second > threshold do
        Enum.each(state.callbacks, fn callback -> callback.(key, tracked_per_second) end)
      end
    end)
  end

  # Reads every table and subtracts exactly what was read from each counter.
  # Increments that land between the read and the subtraction are therefore
  # kept for the next report instead of being lost.
  defp dump_and_clear_counts(%__MODULE__{} = state) do
    1..state.table_count
    |> Enum.flat_map(fn scheduler_id ->
      table_name = table_name(scheduler_id)
      table_data = :ets.tab2list(table_name)

      Enum.each(table_data, fn {key, val} ->
        :ets.update_counter(table_name, key, -val)
      end)

      table_data
    end)
    |> aggregate_stats()
  end

  # Sums the per-table counts of each key into a single map.
  defp aggregate_stats(table_data) do
    Enum.reduce(table_data, %{}, fn {key, val}, acc ->
      Map.update(acc, key, val, &(&1 + val))
    end)
  end

  # Generates one `table_name/1` clause per possible scheduler id.
  for scheduler_id <- 1..@max_tables do
    defp table_name(unquote(scheduler_id)) do
      unquote(:"#{@table_prefix}_#{scheduler_id}")
    end
  end
end