-module(pharos@measurement).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/measurement.gleam").
-export([decode_with_unit/2, decode/1]).
-export_type([memory_unit/0, telemetry_event/0, beam_memory_stats/0, run_queue_stats/0, system_count_stats/0, persistent_term_stats/0, process_info_stats/0, cluster_node_stats/0, host_probe_status/0, host_memory_stats/0, host_disk_stats/0, host_cpu_stats/0, host_network_stats/0, scheduler_stats/0, reduction_stats/0, measurement/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" Decoded measurement values.\n"
"\n"
" `telemetry_poller` and pharos's custom emitters dispatch raw telemetry\n"
" events; `decode/1` and `decode_with_unit/2` lift them into typed\n"
" `Measurement` records. Memory values arrive in bytes and are scaled to\n"
" the requested `MemoryUnit` (`mb` when `decode/1` is used).\n"
).
%% Unit that byte counts are scaled to before being stored in a stats tuple.
%% to_unit/2 divides by 1024 for kb, 1024*1024 for mb and 1024*1024*1024 for gb.
-type memory_unit() :: kb | mb | gb.
%% Raw telemetry event. Positional fields, as read by decode_with_unit/2:
%% event name (list of atoms), measurements map, metadata map.
-type telemetry_event() :: {telemetry_event,
list(gleam@erlang@atom:atom_()),
gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())}.
%% Built by decode_beam_memory/2. Fields after the tag, in order: unit, total,
%% processes, processes_used, system, atom, atom_used, binary, code, ets.
%% Every float is the corresponding byte count scaled to the unit.
-type beam_memory_stats() :: {beam_memory_stats,
memory_unit(),
float(),
float(),
float(),
float(),
float(),
float(),
float(),
float(),
float()}.
%% Built by decode_run_queues/1. Fields in order: total, cpu, io.
-type run_queue_stats() :: {run_queue_stats, integer(), integer(), integer()}.
%% Built by decode_system_counts/1. Fields in order: process_count, atom_count,
%% port_count, process_limit, atom_limit, port_limit.
-type system_count_stats() :: {system_count_stats,
integer(),
integer(),
integer(),
integer(),
integer(),
integer()}.
%% Built by decode_persistent_term/2. Fields in order: count, unit, memory
%% (the `memory` byte count scaled to the unit).
-type persistent_term_stats() :: {persistent_term_stats,
integer(),
memory_unit(),
float()}.
%% Built by decode_process_info/2. Fields in order: the `name` atom taken from
%% the event metadata, then the event's measurements map passed through as-is.
-type process_info_stats() :: {process_info_stats,
gleam@erlang@atom:atom_(),
gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())}.
%% Built by decode_cluster_nodes/1. Fields in order: the `self` string and the
%% `nodes` string list, both read from the event metadata.
-type cluster_node_stats() :: {cluster_node_stats, binary(), list(binary())}.
%% Result of decode_status/1: `unimplemented` only when the metadata `status`
%% value decodes to that atom, `implemented` in every other case.
-type host_probe_status() :: implemented | unimplemented.
%% Built by decode_host_memory/3. Fields in order: probe status, unit, total,
%% used, available (byte counts scaled to the unit).
-type host_memory_stats() :: {host_memory_stats,
host_probe_status(),
memory_unit(),
float(),
float(),
float()}.
%% Built by decode_host_disk/3. Fields in order: probe status, unit, total,
%% used, available (byte counts scaled to the unit).
-type host_disk_stats() :: {host_disk_stats,
host_probe_status(),
memory_unit(),
float(),
float(),
float()}.
%% Built by decode_host_cpu/1. Fields in order: util, load1, load5, load15.
-type host_cpu_stats() :: {host_cpu_stats, float(), float(), float(), float()}.
%% Built by decode_host_network/1. Fields in order: rx_bytes_per_sec,
%% tx_bytes_per_sec, rx_packets_per_sec, tx_packets_per_sec.
-type host_network_stats() :: {host_network_stats,
float(),
float(),
float(),
float()}.
%% Built by decode_scheduler/1 from the `utilization` measurement.
-type scheduler_stats() :: {scheduler_stats, float()}.
%% Built by decode_reductions/1 from the `count` measurement.
-type reduction_stats() :: {reduction_stats, integer()}.
%% Tagged union returned (inside `{ok, _}`) by decode/1 and decode_with_unit/2;
%% each variant wraps the stats tuple for one kind of event.
-type measurement() :: {beam_memory, beam_memory_stats()} |
{beam_run_queues, run_queue_stats()} |
{beam_system_counts, system_count_stats()} |
{beam_persistent_term, persistent_term_stats()} |
{process_info, process_info_stats()} |
{cluster_nodes, cluster_node_stats()} |
{host_memory, host_memory_stats()} |
{host_disk, host_disk_stats()} |
{host_cpu, host_cpu_stats()} |
{host_network, host_network_stats()} |
{beam_scheduler, scheduler_stats()} |
{beam_reductions, reduction_stats()}.
%% Scale a byte count to the requested memory unit using 1024-based multiples.
%% The count is widened to a float before dividing, so the result is always a
%% float.
-file("src/pharos/measurement.gleam", 261).
-spec to_unit(integer(), memory_unit()) -> float().
to_unit(Bytes, Unit) ->
    Bytes_per_unit =
        case Unit of
            kb -> 1024.0;
            mb -> 1048576.0;
            gb -> 1073741824.0
        end,
    erlang:float(Bytes) / Bytes_per_unit.
%% Build a process_info_stats tuple from an event's measurements and metadata.
%% The metadata must hold an atom under the `name` key; the measurements map is
%% carried through unchanged. Returns `{error, Message}` when the key is absent
%% or its value does not decode as an atom.
-file("src/pharos/measurement.gleam", 347).
-spec decode_process_info(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())
) -> {ok, process_info_stats()} | {error, binary()}.
decode_process_info(Measurements, Metadata) ->
    case gleam_stdlib:map_get(Metadata, name) of
        {ok, Raw_name} ->
            Atom_decoder = gleam@erlang@atom:decoder(),
            case gleam@dynamic@decode:run(Raw_name, Atom_decoder) of
                {ok, Name} ->
                    {ok, {process_info_stats, Name, Measurements}};
                {error, _} ->
                    {error,
                        <<"expected atom for 'name' in process_info metadata"/utf8>>}
            end;
        {error, nil} ->
            {error, <<"process_info event missing 'name' in metadata"/utf8>>}
    end.
%% Look up Key (given as a binary, converted to an atom for the lookup) in the
%% measurements map and decode the value as an integer. The error message names
%% the key and says whether it was missing or had the wrong type.
-file("src/pharos/measurement.gleam", 232).
-spec get_int(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    binary()
) -> {ok, integer()} | {error, binary()}.
get_int(Measurements, Key) ->
    case gleam_stdlib:map_get(Measurements, erlang:binary_to_atom(Key)) of
        {ok, Raw} ->
            Int_decoder = {decoder, fun gleam@dynamic@decode:decode_int/1},
            case gleam@dynamic@decode:run(Raw, Int_decoder) of
                {ok, Int} ->
                    {ok, Int};
                {error, _} ->
                    {error, <<"expected int for key: "/utf8, Key/binary>>}
            end;
        {error, nil} ->
            {error, <<"missing key: "/utf8, Key/binary>>}
    end.
%% Decode a reductions event: reads the integer `count` measurement and wraps
%% it in a reduction_stats tuple. Any get_int/2 error is returned unchanged.
-file("src/pharos/measurement.gleam", 440).
-spec decode_reductions(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())
) -> {ok, reduction_stats()} | {error, binary()}.
decode_reductions(Measurements) ->
    case get_int(Measurements, <<"count"/utf8>>) of
        {ok, Count} -> {ok, {reduction_stats, Count}};
        {error, Reason} -> {error, Reason}
    end.
%% Look up Key in the measurements map and decode it as a float. The float
%% decoder is tried first; if it fails, the integer decoder is tried and its
%% result is widened with erlang:float/1.
-file("src/pharos/measurement.gleam", 246).
?DOC(
    " Decode a numeric measurement as a `Float`, accepting an integer (widened)\n"
    " or a float. Used for rates / percentages that may arrive as either.\n"
).
-spec get_number(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    binary()
) -> {ok, float()} | {error, binary()}.
get_number(Measurements, Key) ->
    case gleam_stdlib:map_get(Measurements, erlang:binary_to_atom(Key)) of
        {ok, Raw} ->
            Float_decoder = {decoder, fun gleam@dynamic@decode:decode_float/1},
            Widened_int_decoder = gleam@dynamic@decode:map(
                {decoder, fun gleam@dynamic@decode:decode_int/1},
                fun erlang:float/1
            ),
            Number_decoder = gleam@dynamic@decode:one_of(
                Float_decoder,
                [Widened_int_decoder]
            ),
            case gleam@dynamic@decode:run(Raw, Number_decoder) of
                {ok, Number} ->
                    {ok, Number};
                {error, _} ->
                    {error, <<"expected number for key: "/utf8, Key/binary>>}
            end;
        {error, nil} ->
            {error, <<"missing key: "/utf8, Key/binary>>}
    end.
%% Decode a scheduler event: reads the numeric `utilization` measurement and
%% wraps it in a scheduler_stats tuple. Any get_number/2 error is returned
%% unchanged.
-file("src/pharos/measurement.gleam", 433).
-spec decode_scheduler(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())
) -> {ok, scheduler_stats()} | {error, binary()}.
decode_scheduler(Measurements) ->
    case get_number(Measurements, <<"utilization"/utf8>>) of
        {ok, Utilization} -> {ok, {scheduler_stats, Utilization}};
        {error, Reason} -> {error, Reason}
    end.
%% Decode a host network event into a host_network_stats tuple. The four rate
%% measurements are read in the order listed in Keys; the first key that is
%% missing or non-numeric stops the walk and its error is returned.
-file("src/pharos/measurement.gleam", 418).
-spec decode_host_network(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())
) -> {ok, host_network_stats()} | {error, binary()}.
decode_host_network(Measurements) ->
    Keys = [
        <<"rx_bytes_per_sec"/utf8>>,
        <<"tx_bytes_per_sec"/utf8>>,
        <<"rx_packets_per_sec"/utf8>>,
        <<"tx_packets_per_sec"/utf8>>
    ],
    %% Once a failure is in the accumulator the remaining keys are skipped.
    Read_field = fun
        (_Key, {error, _} = Failed) ->
            Failed;
        (Key, {ok, Seen}) ->
            case get_number(Measurements, Key) of
                {ok, Value} -> {ok, [Value | Seen]};
                {error, _} = Failed -> Failed
            end
    end,
    case lists:foldl(Read_field, {ok, []}, Keys) of
        {ok, Reversed} ->
            %% Values were consed on, so restore key order before naming them.
            [Rx_bytes, Tx_bytes, Rx_packets, Tx_packets] =
                lists:reverse(Reversed),
            {ok, {host_network_stats, Rx_bytes, Tx_bytes, Rx_packets, Tx_packets}};
        {error, Reason} ->
            {error, Reason}
    end.
%% Decode a host CPU event into a host_cpu_stats tuple. Reads `util`, `load1`,
%% `load5` and `load15` in that order as numbers; the first key that fails to
%% decode stops the walk and its error is returned.
-file("src/pharos/measurement.gleam", 408).
-spec decode_host_cpu(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())
) -> {ok, host_cpu_stats()} | {error, binary()}.
decode_host_cpu(Measurements) ->
    Keys = [
        <<"util"/utf8>>,
        <<"load1"/utf8>>,
        <<"load5"/utf8>>,
        <<"load15"/utf8>>
    ],
    %% Once a failure is in the accumulator the remaining keys are skipped.
    Read_field = fun
        (_Key, {error, _} = Failed) ->
            Failed;
        (Key, {ok, Seen}) ->
            case get_number(Measurements, Key) of
                {ok, Value} -> {ok, [Value | Seen]};
                {error, _} = Failed -> Failed
            end
    end,
    case lists:foldl(Read_field, {ok, []}, Keys) of
        {ok, Reversed} ->
            %% Values were consed on, so restore key order before naming them.
            [Util, Load1, Load5, Load15] = lists:reverse(Reversed),
            {ok, {host_cpu_stats, Util, Load1, Load5, Load15}};
        {error, Reason} ->
            {error, Reason}
    end.
%% Read the integer byte count stored under Key and scale it to Unit with
%% to_unit/2. Any get_int/2 error is returned unchanged.
-file("src/pharos/measurement.gleam", 270).
-spec get_bytes(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    binary(),
    memory_unit()
) -> {ok, float()} | {error, binary()}.
get_bytes(Measurements, Key, Unit) ->
    case get_int(Measurements, Key) of
        {ok, Bytes} -> {ok, to_unit(Bytes, Unit)};
        {error, Reason} -> {error, Reason}
    end.
%% Derive the probe status from event metadata. Only a `status` entry that
%% decodes to the atom `unimplemented` yields `unimplemented`; a missing key,
%% a non-atom value, or any other atom all fall back to `implemented`.
-file("src/pharos/measurement.gleam", 447).
-spec decode_status(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())
) -> host_probe_status().
decode_status(Metadata) ->
    case gleam_stdlib:map_get(Metadata, status) of
        {error, nil} ->
            implemented;
        {ok, Raw_status} ->
            Atom_decoder = gleam@erlang@atom:decoder(),
            case gleam@dynamic@decode:run(Raw_status, Atom_decoder) of
                {ok, unimplemented} -> unimplemented;
                {ok, _Other_atom} -> implemented;
                {error, _} -> implemented
            end
    end.
%% Decode a host disk event into a host_disk_stats tuple. The probe status
%% comes from the metadata; `total_bytes`, `used_bytes` and `available_bytes`
%% are read in that order and scaled to Unit. The first key that fails to
%% decode stops the walk and its error is returned.
-file("src/pharos/measurement.gleam", 390).
-spec decode_host_disk(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    memory_unit()
) -> {ok, host_disk_stats()} | {error, binary()}.
decode_host_disk(Measurements, Metadata, Unit) ->
    Status = decode_status(Metadata),
    Keys = [
        <<"total_bytes"/utf8>>,
        <<"used_bytes"/utf8>>,
        <<"available_bytes"/utf8>>
    ],
    %% Once a failure is in the accumulator the remaining keys are skipped.
    Read_field = fun
        (_Key, {error, _} = Failed) ->
            Failed;
        (Key, {ok, Seen}) ->
            case get_bytes(Measurements, Key, Unit) of
                {ok, Scaled} -> {ok, [Scaled | Seen]};
                {error, _} = Failed -> Failed
            end
    end,
    case lists:foldl(Read_field, {ok, []}, Keys) of
        {ok, Reversed} ->
            %% Values were consed on, so restore key order before naming them.
            [Total, Used, Available] = lists:reverse(Reversed),
            {ok, {host_disk_stats, Status, Unit, Total, Used, Available}};
        {error, Reason} ->
            {error, Reason}
    end.
%% Decode a host memory event into a host_memory_stats tuple. The probe status
%% comes from the metadata; `total_bytes`, `used_bytes` and `available_bytes`
%% are read in that order and scaled to Unit. The first key that fails to
%% decode stops the walk and its error is returned.
-file("src/pharos/measurement.gleam", 372).
-spec decode_host_memory(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    memory_unit()
) -> {ok, host_memory_stats()} | {error, binary()}.
decode_host_memory(Measurements, Metadata, Unit) ->
    Status = decode_status(Metadata),
    Keys = [
        <<"total_bytes"/utf8>>,
        <<"used_bytes"/utf8>>,
        <<"available_bytes"/utf8>>
    ],
    %% Once a failure is in the accumulator the remaining keys are skipped.
    Read_field = fun
        (_Key, {error, _} = Failed) ->
            Failed;
        (Key, {ok, Seen}) ->
            case get_bytes(Measurements, Key, Unit) of
                {ok, Scaled} -> {ok, [Scaled | Seen]};
                {error, _} = Failed -> Failed
            end
    end,
    case lists:foldl(Read_field, {ok, []}, Keys) of
        {ok, Reversed} ->
            %% Values were consed on, so restore key order before naming them.
            [Total, Used, Available] = lists:reverse(Reversed),
            {ok, {host_memory_stats, Status, Unit, Total, Used, Available}};
        {error, Reason} ->
            {error, Reason}
    end.
%% Look up Key (a binary, converted to an atom for the lookup) in Values and
%% decode the value as a list of strings. The error message names the key and
%% says whether it was missing or had the wrong type.
-file("src/pharos/measurement.gleam", 474).
-spec get_string_list(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    binary()
) -> {ok, list(binary())} | {error, binary()}.
get_string_list(Values, Key) ->
    case gleam_stdlib:map_get(Values, erlang:binary_to_atom(Key)) of
        {ok, Raw} ->
            String_decoder = {decoder, fun gleam@dynamic@decode:decode_string/1},
            List_decoder = gleam@dynamic@decode:list(String_decoder),
            case gleam@dynamic@decode:run(Raw, List_decoder) of
                {ok, Strings} ->
                    {ok, Strings};
                {error, _} ->
                    {error,
                        <<"expected list of strings for key: "/utf8, Key/binary>>}
            end;
        {error, nil} ->
            {error, <<"missing key: "/utf8, Key/binary>>}
    end.
%% Look up Key (a binary, converted to an atom for the lookup) in Values and
%% decode the value as a string. The error message names the key and says
%% whether it was missing or had the wrong type.
-file("src/pharos/measurement.gleam", 462).
-spec get_string(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    binary()
) -> {ok, binary()} | {error, binary()}.
get_string(Values, Key) ->
    case gleam_stdlib:map_get(Values, erlang:binary_to_atom(Key)) of
        {ok, Raw} ->
            String_decoder = {decoder, fun gleam@dynamic@decode:decode_string/1},
            case gleam@dynamic@decode:run(Raw, String_decoder) of
                {ok, String} ->
                    {ok, String};
                {error, _} ->
                    {error, <<"expected string for key: "/utf8, Key/binary>>}
            end;
        {error, nil} ->
            {error, <<"missing key: "/utf8, Key/binary>>}
    end.
%% Decode a cluster nodes event from its metadata: `self` must be a string and
%% `nodes` a list of strings. `self` is checked first, so its error wins when
%% both keys are bad.
-file("src/pharos/measurement.gleam", 364).
-spec decode_cluster_nodes(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())
) -> {ok, cluster_node_stats()} | {error, binary()}.
decode_cluster_nodes(Metadata) ->
    case get_string(Metadata, <<"self"/utf8>>) of
        {error, Self_error} ->
            {error, Self_error};
        {ok, Self} ->
            case get_string_list(Metadata, <<"nodes"/utf8>>) of
                {ok, Nodes} -> {ok, {cluster_node_stats, Self, Nodes}};
                {error, Nodes_error} -> {error, Nodes_error}
            end
    end.
%% Decode a persistent_term event: the integer `count` plus the `memory` byte
%% count scaled to Unit. `count` is checked first, so its error wins when both
%% keys are bad.
-file("src/pharos/measurement.gleam", 338).
-spec decode_persistent_term(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    memory_unit()
) -> {ok, persistent_term_stats()} | {error, binary()}.
decode_persistent_term(Measurements, Unit) ->
    case get_int(Measurements, <<"count"/utf8>>) of
        {error, Count_error} ->
            {error, Count_error};
        {ok, Count} ->
            case get_bytes(Measurements, <<"memory"/utf8>>, Unit) of
                {ok, Memory} ->
                    {ok, {persistent_term_stats, Count, Unit, Memory}};
                {error, Memory_error} ->
                    {error, Memory_error}
            end
    end.
%% Decode a system counts event into a system_count_stats tuple. The six
%% integer measurements are read in the order listed in Keys; the first key
%% that fails to decode stops the walk and its error is returned.
-file("src/pharos/measurement.gleam", 319).
-spec decode_system_counts(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())
) -> {ok, system_count_stats()} | {error, binary()}.
decode_system_counts(Measurements) ->
    Keys = [
        <<"process_count"/utf8>>,
        <<"atom_count"/utf8>>,
        <<"port_count"/utf8>>,
        <<"process_limit"/utf8>>,
        <<"atom_limit"/utf8>>,
        <<"port_limit"/utf8>>
    ],
    %% Once a failure is in the accumulator the remaining keys are skipped.
    Read_field = fun
        (_Key, {error, _} = Failed) ->
            Failed;
        (Key, {ok, Seen}) ->
            case get_int(Measurements, Key) of
                {ok, Value} -> {ok, [Value | Seen]};
                {error, _} = Failed -> Failed
            end
    end,
    case lists:foldl(Read_field, {ok, []}, Keys) of
        {ok, Reversed} ->
            %% Values were consed on, so restore key order before naming them.
            [
                Process_count,
                Atom_count,
                Port_count,
                Process_limit,
                Atom_limit,
                Port_limit
            ] = lists:reverse(Reversed),
            {ok,
                {system_count_stats,
                    Process_count,
                    Atom_count,
                    Port_count,
                    Process_limit,
                    Atom_limit,
                    Port_limit}};
        {error, Reason} ->
            {error, Reason}
    end.
%% Decode a run queue lengths event into a run_queue_stats tuple. Reads the
%% integer `total`, `cpu` and `io` measurements in that order; the first key
%% that fails to decode stops the walk and its error is returned.
-file("src/pharos/measurement.gleam", 310).
-spec decode_run_queues(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())
) -> {ok, run_queue_stats()} | {error, binary()}.
decode_run_queues(Measurements) ->
    Keys = [<<"total"/utf8>>, <<"cpu"/utf8>>, <<"io"/utf8>>],
    %% Once a failure is in the accumulator the remaining keys are skipped.
    Read_field = fun
        (_Key, {error, _} = Failed) ->
            Failed;
        (Key, {ok, Seen}) ->
            case get_int(Measurements, Key) of
                {ok, Value} -> {ok, [Value | Seen]};
                {error, _} = Failed -> Failed
            end
    end,
    case lists:foldl(Read_field, {ok, []}, Keys) of
        {ok, Reversed} ->
            %% Values were consed on, so restore key order before naming them.
            [Total, Cpu, Io] = lists:reverse(Reversed),
            {ok, {run_queue_stats, Total, Cpu, Io}};
        {error, Reason} ->
            {error, Reason}
    end.
%% Decode a VM memory event into a beam_memory_stats tuple. The nine byte
%% counts are read in the order listed in Keys and each is scaled to Unit; the
%% first key that fails to decode stops the walk and its error is returned.
-file("src/pharos/measurement.gleam", 279).
-spec decode_beam_memory(
    gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
    memory_unit()
) -> {ok, beam_memory_stats()} | {error, binary()}.
decode_beam_memory(Measurements, Unit) ->
    Keys = [
        <<"total"/utf8>>,
        <<"processes"/utf8>>,
        <<"processes_used"/utf8>>,
        <<"system"/utf8>>,
        <<"atom"/utf8>>,
        <<"atom_used"/utf8>>,
        <<"binary"/utf8>>,
        <<"code"/utf8>>,
        <<"ets"/utf8>>
    ],
    %% Once a failure is in the accumulator the remaining keys are skipped.
    Read_field = fun
        (_Key, {error, _} = Failed) ->
            Failed;
        (Key, {ok, Seen}) ->
            case get_bytes(Measurements, Key, Unit) of
                {ok, Scaled} -> {ok, [Scaled | Seen]};
                {error, _} = Failed -> Failed
            end
    end,
    case lists:foldl(Read_field, {ok, []}, Keys) of
        {ok, Reversed} ->
            %% Values were consed on, so restore key order before naming them.
            [
                Total,
                Processes,
                Processes_used,
                System,
                Atom_value,
                Atom_used,
                Binary,
                Code,
                Ets
            ] = lists:reverse(Reversed),
            {ok,
                {beam_memory_stats,
                    Unit,
                    Total,
                    Processes,
                    Processes_used,
                    System,
                    Atom_value,
                    Atom_used,
                    Binary,
                    Code,
                    Ets}};
        {error, Reason} ->
            {error, Reason}
    end.
%% Dispatch a telemetry event to the decoder for its event name and wrap the
%% decoded stats in the matching measurement tag.
%%
%% Event is read positionally: element 2 is the event name (list of atoms),
%% element 3 the measurements map, element 4 the metadata map. Unit is passed
%% to the decoders that scale byte counts.
%%
%% The event name is matched directly against atom literals, so no per-event
%% list of binaries is built just to choose a branch. Any name that is not
%% listed is handed to decode_process_info/2, which fails with an error unless
%% the metadata carries an atom under `name`.
%%
%% Returns `{ok, {Tag, Stats}}` or the decoder's `{error, Message}` unchanged.
-file("src/pharos/measurement.gleam", 173).
?DOC(
    " Decode a `TelemetryEvent` into a typed `Measurement`, scaling any memory\n"
    " values to `unit`.\n"
).
-spec decode_with_unit(telemetry_event(), memory_unit()) -> {ok, measurement()} |
    {error, binary()}.
decode_with_unit(Event, Unit) ->
    Measurements = erlang:element(3, Event),
    Metadata = erlang:element(4, Event),
    {Tag, Decoded} =
        case erlang:element(2, Event) of
            [vm, memory] ->
                {beam_memory, decode_beam_memory(Measurements, Unit)};
            [vm, total_run_queue_lengths] ->
                {beam_run_queues, decode_run_queues(Measurements)};
            [vm, system_counts] ->
                {beam_system_counts, decode_system_counts(Measurements)};
            [vm, persistent_term] ->
                {beam_persistent_term,
                    decode_persistent_term(Measurements, Unit)};
            [pharos, cluster, nodes] ->
                {cluster_nodes, decode_cluster_nodes(Metadata)};
            [pharos, host, memory] ->
                {host_memory, decode_host_memory(Measurements, Metadata, Unit)};
            [pharos, host, disk] ->
                {host_disk, decode_host_disk(Measurements, Metadata, Unit)};
            [pharos, host, cpu] ->
                {host_cpu, decode_host_cpu(Measurements)};
            [pharos, host, network] ->
                {host_network, decode_host_network(Measurements)};
            [pharos, vm, scheduler] ->
                {beam_scheduler, decode_scheduler(Measurements)};
            [pharos, vm, reductions] ->
                {beam_reductions, decode_reductions(Measurements)};
            _ ->
                %% Unlisted names are treated as process-info events.
                {process_info, decode_process_info(Measurements, Metadata)}
        end,
    %% Tag successful results once here; errors pass through untouched.
    case Decoded of
        {ok, Stats} -> {ok, {Tag, Stats}};
        {error, Reason} -> {error, Reason}
    end.
%% Convenience entry point: same as decode_with_unit/2 with the unit fixed
%% to `mb`.
-file("src/pharos/measurement.gleam", 167).
?DOC(
    " Decode a `TelemetryEvent` into a typed `Measurement`, scaling any memory\n"
    " values to `Mb` by default.\n"
).
-spec decode(telemetry_event()) -> {ok, measurement()} | {error, binary()}.
decode(Event) ->
    Default_unit = mb,
    decode_with_unit(Event, Default_unit).