%% src/pharos@metric.erl

-module(pharos@metric).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/metric.gleam").
-export([new/2, labelled/3, with_label/3, from_measurement/1, from_sample/2]).
-export_type([metric/0]).

%% Documentation attribute shims. On OTP 27 and later the macros expand to
%% real `-moduledoc` / `-doc` attributes. On older releases they expand to an
%% empty `-compile([])` attribute instead, so the doc string argument is
%% simply dropped and the module still compiles.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " The canonical metric record.\n"
    "\n"
    " `Measurement` (BEAM/host) and `MetricSample` (custom probes) are the\n"
    " *typed* shapes the collection and threshold lanes work with. `Metric` is\n"
    " the *flat, transport-ready* shape the resilient pipeline buffers and ships:\n"
    " a single named scalar with labels and a capture timestamp. One typed\n"
    " measurement fans out into several `Metric`s (one per numeric field).\n"
    "\n"
    " Keeping `Metric` deliberately flat (name + value + labels + timestamp)\n"
    " means the hot buffer, spillover store, and sinks never need to know about\n"
    " the BEAM/host/probe taxonomy: they move opaque scalars.\n"
).

%% Tuple representation of a metric, tagged with the atom `metric`.
%% The field meanings noted below follow the argument names used by the
%% constructors in this module (`new/2`, `labelled/3`).
-type metric() :: {metric,
        % Metric name.
        binary(),
        % Scalar value.
        float(),
        % Labels (key -> value).
        gleam@dict:dict(binary(), binary()),
        % Capture timestamp; the constructors here fill it from
        % pharos_ffi:now_ms() (presumably milliseconds -- TODO confirm).
        integer()}.

-file("src/pharos/metric.gleam", 39).
?DOC(" Create an unlabelled metric whose timestamp is taken at call time.\n").
-spec new(binary(), float()) -> metric().
new(Name, Value) ->
    %% Timestamp comes from the project's clock helper.
    Ts = pharos_ffi:now_ms(),
    {metric, Name, Value, #{}, Ts}.

-file("src/pharos/metric.gleam", 49).
?DOC(" Create a metric carrying the given labels, timestamped at call time.\n").
-spec labelled(binary(), float(), gleam@dict:dict(binary(), binary())) -> metric().
labelled(Name, Value, Labels) ->
    %% The labels are stored as given; only the timestamp is generated here.
    Ts = pharos_ffi:now_ms(),
    {metric, Name, Value, Labels, Ts}.

-file("src/pharos/metric.gleam", 58).
?DOC(" Set one label on a metric, replacing any value already stored under that key.\n").
-spec with_label(metric(), binary(), binary()) -> metric().
with_label({metric, Name, Reading, Labels, Ts}, Key, Value) ->
    %% Name, value and timestamp are carried over untouched; only the label
    %% dictionary is updated.
    {metric, Name, Reading, gleam@dict:insert(Labels, Key, Value), Ts}.

-file("src/pharos/metric.gleam", 151).
-spec at(integer(), binary(), float()) -> metric().
at(Ts, Name, Value) ->
    %% Unlabelled metric with a caller-supplied timestamp, so several metrics
    %% can share one capture time.
    {metric, Name, Value, #{}, Ts}.

-file("src/pharos/metric.gleam", 70).
?DOC(
    " Expand a decoded `Measurement` into one `Metric` per numeric field.\n"
    " Every metric produced by a single call carries the same timestamp.\n"
    " `ProcessInfo` measurements, and host memory/disk probes whose status is\n"
    " `Unimplemented`, produce an empty list.\n"
).
-spec from_measurement(pharos@measurement:measurement()) -> list(metric()).
from_measurement(Measurement) ->
    %% Read the clock once so the whole batch shares a single timestamp.
    Ts = pharos_ffi:now_ms(),
    [at(Ts, Name, Value) || {Name, Value} <- measurement_fields(Measurement)].

%% Map a measurement to its ordered list of {MetricName, Value} pairs.
%% Integer-valued fields are converted with erlang:float/1; the remaining
%% fields are passed through unchanged.
-spec measurement_fields(pharos@measurement:measurement()) ->
    list({binary(), float()}).
measurement_fields({beam_memory, Mem}) ->
    [
        {<<"vm.memory.total"/utf8>>, erlang:element(3, Mem)},
        {<<"vm.memory.processes"/utf8>>, erlang:element(4, Mem)},
        {<<"vm.memory.system"/utf8>>, erlang:element(6, Mem)},
        {<<"vm.memory.binary"/utf8>>, erlang:element(9, Mem)},
        {<<"vm.memory.ets"/utf8>>, erlang:element(11, Mem)},
        {<<"vm.memory.atom"/utf8>>, erlang:element(7, Mem)},
        {<<"vm.memory.code"/utf8>>, erlang:element(10, Mem)}
    ];
measurement_fields({beam_run_queues, Queues}) ->
    [
        {<<"vm.run_queue.total"/utf8>>, erlang:float(erlang:element(2, Queues))},
        {<<"vm.run_queue.cpu"/utf8>>, erlang:float(erlang:element(3, Queues))},
        {<<"vm.run_queue.io"/utf8>>, erlang:float(erlang:element(4, Queues))}
    ];
measurement_fields({beam_system_counts, Counts}) ->
    [
        {<<"vm.system.process_count"/utf8>>, erlang:float(erlang:element(2, Counts))},
        {<<"vm.system.atom_count"/utf8>>, erlang:float(erlang:element(3, Counts))},
        {<<"vm.system.port_count"/utf8>>, erlang:float(erlang:element(4, Counts))}
    ];
measurement_fields({beam_persistent_term, Terms}) ->
    [
        {<<"vm.persistent_term.count"/utf8>>, erlang:float(erlang:element(2, Terms))},
        {<<"vm.persistent_term.memory"/utf8>>, erlang:element(4, Terms)}
    ];
measurement_fields({cluster_nodes, Cluster}) ->
    %% The metric is the length of the list held in field 3.
    NodeCount = erlang:length(erlang:element(3, Cluster)),
    [{<<"cluster.nodes.count"/utf8>>, erlang:float(NodeCount)}];
measurement_fields({host_memory, Host}) ->
    host_memory_fields(erlang:element(2, Host), Host);
measurement_fields({host_disk, Disk}) ->
    host_disk_fields(erlang:element(2, Disk), Disk);
measurement_fields({host_cpu, Cpu}) ->
    [
        {<<"host.cpu.util"/utf8>>, erlang:element(2, Cpu)},
        {<<"host.cpu.load1"/utf8>>, erlang:element(3, Cpu)},
        {<<"host.cpu.load5"/utf8>>, erlang:element(4, Cpu)},
        {<<"host.cpu.load15"/utf8>>, erlang:element(5, Cpu)}
    ];
measurement_fields({host_network, Net}) ->
    [
        {<<"host.network.rx_bytes_per_sec"/utf8>>, erlang:element(2, Net)},
        {<<"host.network.tx_bytes_per_sec"/utf8>>, erlang:element(3, Net)},
        {<<"host.network.rx_packets_per_sec"/utf8>>, erlang:element(4, Net)},
        {<<"host.network.tx_packets_per_sec"/utf8>>, erlang:element(5, Net)}
    ];
measurement_fields({beam_scheduler, Sched}) ->
    [{<<"vm.scheduler.utilization"/utf8>>, erlang:element(2, Sched)}];
measurement_fields({beam_reductions, Reds}) ->
    [{<<"vm.reductions.count"/utf8>>, erlang:float(erlang:element(2, Reds))}];
measurement_fields({process_info, _}) ->
    %% No numeric fields are extracted from process info.
    [].

%% Host memory fields, gated on the probe status stored in field 2.
-spec host_memory_fields(atom(), tuple()) -> list({binary(), float()}).
host_memory_fields(unimplemented, _Host) ->
    [];
host_memory_fields(implemented, Host) ->
    [
        {<<"host.memory.total"/utf8>>, erlang:element(4, Host)},
        {<<"host.memory.used"/utf8>>, erlang:element(5, Host)},
        {<<"host.memory.available"/utf8>>, erlang:element(6, Host)}
    ].

%% Host disk fields, gated on the probe status stored in field 2.
-spec host_disk_fields(atom(), tuple()) -> list({binary(), float()}).
host_disk_fields(unimplemented, _Disk) ->
    [];
host_disk_fields(implemented, Disk) ->
    [
        {<<"host.disk.total"/utf8>>, erlang:element(4, Disk)},
        {<<"host.disk.used"/utf8>>, erlang:element(5, Disk)},
        {<<"host.disk.available"/utf8>>, erlang:element(6, Disk)}
    ].

-file("src/pharos/metric.gleam", 139).
?DOC(
    " Turn a custom probe sample (a field -> value dictionary) into one\n"
    " `Metric` per entry, each named `probe.<probe_id>.<field>` and all\n"
    " sharing one timestamp.\n"
).
-spec from_sample(binary(), gleam@dict:dict(binary(), float())) -> list(metric()).
from_sample(Probe_id, Sample) ->
    %% Read the clock once so every metric from this sample shares a timestamp.
    Ts = pharos_ffi:now_ms(),
    [
        at(
            Ts,
            <<"probe."/utf8, Probe_id/binary, "."/utf8, Field/binary>>,
            Value
        )
     || {Field, Value} <- maps:to_list(Sample)
    ].