defmodule PromEx.Plugins.Beam do
@moduledoc """
Telemetry metrics for the BEAM.
This plugin captures metrics regarding the Erlang Virtual Machine (i.e the BEAM). Specifically, it captures metrics
regarding the CPU topology, system limits, VM feature support, scheduler information, memory utilization, distribution
traffic, and other internal metrics.
This plugin supports the following options:
- `poll_rate`: This is option is OPTIONAL and is the rate at which poll metrics are refreshed (default is 5 seconds).
- `metric_prefix`: This option is OPTIONAL and is used to override the default metric prefix of
`[otp_app, :prom_ex, :beam]`. If this changes you will also want to set `beam_metric_prefix`
in your `dashboard_assigns` to the snakecase version of your prefix, the default
`beam_metric_prefix` is `{otp_app}_prom_ex_beam`.
- `duration_unit`: This is an OPTIONAL option and is a `Telemetry.Metrics.time_unit()`. It can be one of:
`:second | :millisecond | :microsecond | :nanosecond`. It is `:millisecond` by default.
This plugin exposes the following metric groups:
- `:beam_memory_polling_metrics`
- `:beam_internal_polling_metrics`
- `:beam_cpu_topology_manual_metrics`
- `:beam_system_limits_manual_metrics`
- `:beam_system_info_manual_metrics`
- `:beam_scheduler_manual_metrics`
To use plugin in your application, add the following to your PromEx module:
```
defmodule MyApp.PromEx do
use PromEx, otp_app: :web_app
@impl true
def plugins do
[
...
PromEx.Plugins.Beam
]
end
@impl true
def dashboards do
[
...
{:prom_ex, "beam.json"}
]
end
end
```
This plugin exposes manual metrics so be sure to configure the PromEx `:delay_manual_start` as necessary.
"""
use PromEx.Plugin
@memory_event [:prom_ex, :plugin, :beam, :memory]
@impl true
def polling_metrics(opts) do
poll_rate = Keyword.get(opts, :poll_rate, 5_000)
otp_app = Keyword.fetch!(opts, :otp_app)
metric_prefix = Keyword.get(opts, :metric_prefix, PromEx.metric_prefix(otp_app, :beam))
duration_unit = Keyword.get(opts, :duration_unit, :millisecond)
# TODO: Investigate Microstate accounting metrics
# http://erlang.org/doc/man/erlang.html#statistics_microstate_accounting
# TODO: Add a metrics group for allocators
# https://erlang.org/doc/man/erts_alloc.html
# :erlang.system_info(:allocator)
[
memory_metrics(metric_prefix, poll_rate),
mnesia_metrics(metric_prefix, poll_rate),
distribution_metrics(metric_prefix, poll_rate),
beam_internal_metrics(metric_prefix, poll_rate, duration_unit)
]
end
@impl true
def manual_metrics(opts) do
otp_app = Keyword.fetch!(opts, :otp_app)
metric_prefix = Keyword.get(opts, :metric_prefix, PromEx.metric_prefix(otp_app, :beam))
[
beam_cpu_topology_info(metric_prefix),
beam_system_limits_info(metric_prefix),
beam_system_info(metric_prefix),
beam_scheduler_info(metric_prefix)
]
end
defp distribution_metrics(_metric_prefix, poll_rate) do
Polling.build(
:beam_distribution_polling_metrics,
poll_rate,
{__MODULE__, :execute_distribution_metrics, []},
[]
)
end
defp mnesia_metrics(_metric_prefix, poll_rate) do
Polling.build(
:beam_mnesia_polling_metrics,
poll_rate,
{__MODULE__, :execute_mnesia_metrics, []},
[]
)
end
defp beam_internal_metrics(metric_prefix, poll_rate, duration_unit) do
duration_unit_plural = String.to_atom("#{duration_unit}s")
Polling.build(
:beam_internal_polling_metrics,
poll_rate,
{__MODULE__, :execute_internal_metrics, []},
[
last_value(
metric_prefix ++ [:stats, :active_task, :count],
event_name: [:prom_ex, :plugin, :beam, :active_task, :count],
description: "The number of processes and ports that are ready to run, or are currently running.",
measurement: :count,
tags: [:type]
),
last_value(
metric_prefix ++ [:stats, :run_queue, :count],
event_name: [:prom_ex, :plugin, :beam, :run_queue, :count],
description: "The number of processes and ports that are ready to run and are in the run queue.",
measurement: :count,
tags: [:type]
),
last_value(
metric_prefix ++ [:stats, :context_switch, :count],
event_name: [:prom_ex, :plugin, :beam, :context_switch, :count],
description: "The total number of context switches since the system started.",
measurement: :count
),
last_value(
metric_prefix ++ [:stats, :reduction, :count],
event_name: [:prom_ex, :plugin, :beam, :reduction, :count],
description: "The total number of reductions since the system started.",
measurement: :count
),
last_value(
metric_prefix ++ [:stats, :gc, :count],
event_name: [:prom_ex, :plugin, :beam, :gc, :count],
description: "The total number of garbage collections since the system started.",
measurement: :count
),
last_value(
metric_prefix ++ [:stats, :gc, :reclaimed, :bytes],
event_name: [:prom_ex, :plugin, :beam, :gc, :bytes_reclaimed],
description: "The total number of bytes reclaimed since the system started.",
measurement: :count,
unit: :byte
),
last_value(
metric_prefix ++ [:stats, :port_io, :byte, :count],
event_name: [:prom_ex, :plugin, :beam, :port_io, :count],
description: "The total number of bytes sent and received through ports since the system started.",
measurement: :count,
tags: [:type],
unit: :byte
),
last_value(
metric_prefix ++ [:stats, :uptime, duration_unit_plural, :count],
event_name: [:prom_ex, :plugin, :beam, :uptime, :count],
description:
"The total number of wall clock #{duration_unit_plural} that have passed since the system started.",
measurement: :count,
unit: duration_unit
),
last_value(
metric_prefix ++ [:stats, :port, :count],
event_name: [:prom_ex, :plugin, :beam, :port, :count],
description: "A count of how many ports are currently active.",
measurement: :count
),
last_value(
metric_prefix ++ [:stats, :process, :count],
event_name: [:prom_ex, :plugin, :beam, :process, :count],
description: "A count of how many Erlang processes are currently running.",
measurement: :count
),
last_value(
metric_prefix ++ [:stats, :atom, :count],
event_name: [:prom_ex, :plugin, :beam, :atom, :count],
description: "A count of how many atoms are currently allocated.",
measurement: :count
),
last_value(
metric_prefix ++ [:stats, :ets, :count],
event_name: [:prom_ex, :plugin, :beam, :ets, :count],
description: "A count of how many ETS tables currently exist.",
measurement: :count
)
]
)
end
defp beam_system_info(metric_prefix) do
Manual.build(
:beam_system_info_manual_metrics,
{__MODULE__, :execute_system_info, []},
[
last_value(
metric_prefix ++ [:system, :version, :info],
event_name: [:prom_ex, :plugin, :beam, :version],
description: "The OTP release major version.",
measurement: :version
),
last_value(
metric_prefix ++ [:system, :smp_support, :info],
event_name: [:prom_ex, :plugin, :beam, :smp_support],
description: "Whether the BEAM instance has been compiled with SMP support.",
measurement: :enabled
),
last_value(
metric_prefix ++ [:system, :jit_support, :info],
event_name: [:prom_ex, :plugin, :beam, :jit_support],
description: "Whether the BEAM instance is running with the JIT compiler.",
measurement: :enabled
),
last_value(
metric_prefix ++ [:system, :thread_support, :info],
event_name: [:prom_ex, :plugin, :beam, :thread_support],
description: "Whether the BEAM instance has been compiled with threading support.",
measurement: :enabled
),
last_value(
metric_prefix ++ [:system, :time_correction_support, :info],
event_name: [:prom_ex, :plugin, :beam, :time_correction_support],
description: "Whether the BEAM instance has time correction support.",
measurement: :enabled
),
last_value(
metric_prefix ++ [:system, :word_size_bytes, :info],
event_name: [:prom_ex, :plugin, :beam, :word_size_bytes],
description: "The size of Erlang term words in bytes.",
measurement: :size
)
]
)
end
defp beam_scheduler_info(metric_prefix) do
Manual.build(
:beam_scheduler_manual_metrics,
{__MODULE__, :execute_scheduler_info, []},
[
last_value(
metric_prefix ++ [:system, :dirty_cpu_schedulers, :info],
event_name: [:prom_ex, :plugin, :beam, :dirty_cpu_schedulers],
description: "The total number of dirty CPU scheduler threads used by the BEAM.",
measurement: :quantity
),
last_value(
metric_prefix ++ [:system, :dirty_cpu_schedulers_online, :info],
event_name: [:prom_ex, :plugin, :beam, :dirty_cpu_schedulers_online],
description: "The total number of dirty CPU schedulers that are online.",
measurement: :quantity
),
last_value(
metric_prefix ++ [:system, :dirty_io_schedulers, :info],
event_name: [:prom_ex, :plugin, :beam, :dirty_io_schedulers],
description: "The total number of dirty I/O schedulers used to execute I/O bound native functions.",
measurement: :quantity
),
last_value(
metric_prefix ++ [:system, :schedulers, :info],
event_name: [:prom_ex, :plugin, :beam, :schedulers],
description: "The number of scheduler threads in use by the BEAM.",
measurement: :quantity
),
last_value(
metric_prefix ++ [:system, :schedulers_online, :info],
event_name: [:prom_ex, :plugin, :beam, :schedulers_online],
description: "The number of scheduler threads that are online.",
measurement: :quantity
)
]
)
end
defp beam_cpu_topology_info(metric_prefix) do
Manual.build(
:beam_cpu_topology_manual_metrics,
{__MODULE__, :execute_cpu_topology_info, []},
[
last_value(
metric_prefix ++ [:system, :logical_processors, :info],
event_name: [:prom_ex, :plugin, :beam, :logical_processors],
description: "The total number of logical processors on the host machine.",
measurement: :quantity
),
last_value(
metric_prefix ++ [:system, :logical_processors_available, :info],
event_name: [:prom_ex, :plugin, :beam, :logical_processors_available],
description: "The total number of logical processors available to the BEAM.",
measurement: :quantity
),
last_value(
metric_prefix ++ [:system, :logical_processors_online, :info],
event_name: [:prom_ex, :plugin, :beam, :logical_processors_online],
description: "The total number of logical processors online on the host machine.",
measurement: :quantity
)
]
)
end
defp beam_system_limits_info(metric_prefix) do
Manual.build(
:beam_system_limits_manual_metrics,
{__MODULE__, :execute_system_limits_info, []},
[
last_value(
metric_prefix ++ [:system, :ets_limit, :info],
event_name: [:prom_ex, :plugin, :beam, :ets_limit],
description:
"The maximum number of ETS tables allowed (this is partially obsolete given that the number of ETS tables is limited by available memory).",
measurement: :limit
),
last_value(
metric_prefix ++ [:system, :port_limit, :info],
event_name: [:prom_ex, :plugin, :beam, :port_limit],
description: "The maximum number of ports that can simultaneously exist on the BEAM instance.",
measurement: :limit
),
last_value(
metric_prefix ++ [:system, :process_limit, :info],
event_name: [:prom_ex, :plugin, :beam, :process_limit],
description: "The maximum number of processes that can simultaneously exist on the BEAM instance.",
measurement: :limit
),
last_value(
metric_prefix ++ [:system, :thread_pool_size, :info],
event_name: [:prom_ex, :plugin, :beam, :thread_pool_size],
description: "The number of async threads in the async threads pool used for async driver calls.",
measurement: :size
),
last_value(
metric_prefix ++ [:system, :atom_limit, :info],
event_name: [:prom_ex, :plugin, :beam, :atom_limit],
description: "The maximum number of atoms allowed.",
measurement: :limit
)
]
)
end
defp memory_metrics(metric_prefix, poll_rate) do
Polling.build(
:beam_memory_polling_metrics,
poll_rate,
{__MODULE__, :execute_memory_metrics, []},
[
# Capture the total memory allocated to the entire Erlang VM (or BEAM for short)
last_value(
metric_prefix ++ [:memory, :allocated, :bytes],
event_name: @memory_event,
description: "The total amount of memory currently allocated.",
measurement: :total,
unit: :byte
),
# Capture the total memory allocated to atoms
last_value(
metric_prefix ++ [:memory, :atom, :total, :bytes],
event_name: @memory_event,
description: "The total amount of memory currently allocated for atoms.",
measurement: :atom,
unit: :byte
),
# Capture the total memory allocated to binaries
last_value(
metric_prefix ++ [:memory, :binary, :total, :bytes],
event_name: @memory_event,
description: "The total amount of memory currently allocated for binaries.",
measurement: :binary,
unit: :byte
),
# Capture the total memory allocated to Erlang code
last_value(
metric_prefix ++ [:memory, :code, :total, :bytes],
event_name: @memory_event,
description: "The total amount of memory currently allocated for Erlang code.",
measurement: :code,
unit: :byte
),
# Capture the total memory allocated to ETS tables
last_value(
metric_prefix ++ [:memory, :ets, :total, :bytes],
event_name: @memory_event,
description: "The total amount of memory currently allocated for ETS tables.",
measurement: :ets,
unit: :byte
),
# Capture the total memory allocated to Erlang processes
last_value(
metric_prefix ++ [:memory, :processes, :total, :bytes],
event_name: @memory_event,
description: "The total amount of memory currently allocated to Erlang processes.",
measurement: :processes,
unit: :byte
),
# Capture the total memory allocated to :persistent_term
last_value(
metric_prefix ++ [:memory, :persistent_term, :total, :bytes],
event_name: @memory_event,
description: "The total amount of memory currently allocated to Erlang :persistent_term.",
measurement: :persistent_term,
unit: :byte
)
]
)
end
@doc false
def execute_memory_metrics do
%{memory: persistent_term_memory} = :persistent_term.info()
memory_measurements =
:erlang.memory()
|> Map.new()
|> Map.put(:persistent_term, persistent_term_memory)
:telemetry.execute(@memory_event, memory_measurements, %{})
end
@doc false
def execute_distribution_metrics do
end
@doc false
def execute_internal_metrics do
total_active_tasks = :erlang.statistics(:total_active_tasks)
total_active_tasks_all = :erlang.statistics(:total_active_tasks_all)
total_run_queue_lengths = :erlang.statistics(:total_run_queue_lengths)
total_run_queue_lengths_all = :erlang.statistics(:total_run_queue_lengths_all)
dirty_active_tasks = total_active_tasks_all - total_active_tasks
dirty_run_queue_lengths = total_run_queue_lengths_all - total_run_queue_lengths
{context_switches, _} = :erlang.statistics(:context_switches)
{total_reductions, _} = :erlang.statistics(:reductions)
word_size = :erlang.system_info(:wordsize)
{number_of_gcs, words_reclaimed, _} = :erlang.statistics(:garbage_collection)
bytes_reclaimed = words_reclaimed * word_size
{{:input, input_port_bytes}, {:output, output_port_bytes}} = :erlang.statistics(:io)
{wall_clock_time, _} = :erlang.statistics(:wall_clock)
:telemetry.execute([:prom_ex, :plugin, :beam, :port, :count], %{count: :erlang.system_info(:port_count)})
:telemetry.execute([:prom_ex, :plugin, :beam, :process, :count], %{count: :erlang.system_info(:process_count)})
:telemetry.execute([:prom_ex, :plugin, :beam, :atom, :count], %{count: :erlang.system_info(:atom_count)})
:telemetry.execute([:prom_ex, :plugin, :beam, :ets, :count], %{count: :erlang.system_info(:ets_count)})
:telemetry.execute([:prom_ex, :plugin, :beam, :active_task, :count], %{count: total_active_tasks}, %{type: :normal})
:telemetry.execute([:prom_ex, :plugin, :beam, :active_task, :count], %{count: dirty_active_tasks}, %{type: :dirty})
:telemetry.execute([:prom_ex, :plugin, :beam, :context_switch, :count], %{count: context_switches})
:telemetry.execute([:prom_ex, :plugin, :beam, :reduction, :count], %{count: total_reductions})
:telemetry.execute([:prom_ex, :plugin, :beam, :gc, :count], %{count: number_of_gcs})
:telemetry.execute([:prom_ex, :plugin, :beam, :gc, :bytes_reclaimed], %{count: bytes_reclaimed})
:telemetry.execute([:prom_ex, :plugin, :beam, :port_io, :count], %{count: input_port_bytes}, %{type: :input})
:telemetry.execute([:prom_ex, :plugin, :beam, :port_io, :count], %{count: output_port_bytes}, %{type: :output})
:telemetry.execute([:prom_ex, :plugin, :beam, :uptime, :count], %{count: wall_clock_time})
:telemetry.execute([:prom_ex, :plugin, :beam, :run_queue, :count], %{count: total_run_queue_lengths}, %{
type: :normal
})
:telemetry.execute([:prom_ex, :plugin, :beam, :run_queue, :count], %{count: dirty_run_queue_lengths}, %{
type: :dirty
})
end
@doc false
def execute_mnesia_metrics do
# https://github.com/deadtrickster/prometheus.erl/blob/master/src/collectors/mnesia/prometheus_mnesia_collector.erl
end
@doc false
def execute_system_limits_info do
:telemetry.execute([:prom_ex, :plugin, :beam, :ets_limit], %{limit: :erlang.system_info(:ets_limit)})
:telemetry.execute([:prom_ex, :plugin, :beam, :port_limit], %{limit: :erlang.system_info(:port_limit)})
:telemetry.execute([:prom_ex, :plugin, :beam, :process_limit], %{limit: :erlang.system_info(:process_limit)})
:telemetry.execute(
[:prom_ex, :plugin, :beam, :thread_pool_size],
%{size: :erlang.system_info(:thread_pool_size)},
%{}
)
:telemetry.execute([:prom_ex, :plugin, :beam, :atom_limit], %{limit: :erlang.system_info(:atom_limit)}, %{})
end
@doc false
def execute_system_info do
smp_enabled = if(:erlang.system_info(:smp_support), do: 1, else: 0)
thread_support_enabled = if(:erlang.system_info(:threads), do: 1, else: 0)
time_correction_enabled = if(:erlang.system_info(:time_correction), do: 1, else: 0)
word_size = :erlang.system_info(:wordsize)
version = :otp_release |> :erlang.system_info() |> :erlang.list_to_binary() |> String.to_integer()
jit_enabled =
try do
if :erlang.system_info(:emu_flavor) == :jit, do: 1, else: 0
rescue
_error ->
0
end
:telemetry.execute([:prom_ex, :plugin, :beam, :smp_support], %{enabled: smp_enabled}, %{})
:telemetry.execute([:prom_ex, :plugin, :beam, :jit_support], %{enabled: jit_enabled}, %{})
:telemetry.execute([:prom_ex, :plugin, :beam, :thread_support], %{enabled: thread_support_enabled}, %{})
:telemetry.execute([:prom_ex, :plugin, :beam, :time_correction_support], %{enabled: time_correction_enabled}, %{})
:telemetry.execute([:prom_ex, :plugin, :beam, :word_size_bytes], %{size: word_size}, %{})
:telemetry.execute([:prom_ex, :plugin, :beam, :version], %{version: version}, %{})
end
@doc false
def execute_cpu_topology_info do
:telemetry.execute(
[:prom_ex, :plugin, :beam, :logical_processors],
%{quantity: :erlang.system_info(:logical_processors)},
%{}
)
:telemetry.execute(
[:prom_ex, :plugin, :beam, :logical_processors_available],
%{quantity: :erlang.system_info(:logical_processors_available)},
%{}
)
:telemetry.execute(
[:prom_ex, :plugin, :beam, :logical_processors_online],
%{quantity: :erlang.system_info(:logical_processors_online)},
%{}
)
end
@doc false
def execute_scheduler_info do
:telemetry.execute(
[:prom_ex, :plugin, :beam, :dirty_cpu_schedulers],
%{quantity: :erlang.system_info(:dirty_cpu_schedulers)},
%{}
)
:telemetry.execute(
[:prom_ex, :plugin, :beam, :dirty_cpu_schedulers_online],
%{quantity: :erlang.system_info(:dirty_cpu_schedulers_online)},
%{}
)
:telemetry.execute(
[:prom_ex, :plugin, :beam, :dirty_io_schedulers],
%{quantity: :erlang.system_info(:dirty_io_schedulers)},
%{}
)
:telemetry.execute(
[:prom_ex, :plugin, :beam, :schedulers],
%{quantity: :erlang.system_info(:schedulers)},
%{}
)
:telemetry.execute(
[:prom_ex, :plugin, :beam, :schedulers_online],
%{quantity: :erlang.system_info(:schedulers_online)},
%{}
)
end
end