-module(pharos@internal@telemetry).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/internal/telemetry.gleam").
-export([attach/6, detach/1]).
-export_type([attach_error/0, detach_error/0, telemetry_event/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(false).
%% Error reasons that attach/6 may return inside `{error, _}` (see its spec).
-type attach_error() :: handler_already_attached | {attach_failed, binary()}.
%% Error reasons that detach/1 may return inside `{error, _}` (see its spec).
-type detach_error() :: handler_not_found | {detach_failed, binary()}.
%% Tagged 4-tuple: an event name (list of atoms) followed by two atom-keyed
%% dicts of dynamic values. attach/6 builds it from elements 2..4 of the
%% event it receives.
%% NOTE(review): the two dicts are presumably measurements and metadata, in
%% that order - confirm against the Gleam source.
-type telemetry_event() :: {telemetry_event,
list(gleam@erlang@atom:atom_()),
gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())}.
-file("src/pharos/internal/telemetry.gleam", 123).
?DOC(false).
%% Feeds one decoded measurement to every {Threshold, Manager} pair.
%%
%% * `{windowed, Over, _, _}` thresholds: the value extracted by
%%   pharos@threshold_eval:sample_value/2 is handed to the manager through
%%   pharos@alert_manager:sample/2; an `{error, nil}` extraction is ignored.
%% * Any other threshold: it is evaluated against the measurement and the
%%   manager is told to breach or recover; `not_applicable` is ignored.
%%
%% Returns whatever gleam@list:each/2 returns (`nil` per the spec).
-spec route(
    pharos@measurement:measurement(),
    list({pharos@config:threshold(), pharos@alert_manager:alert_manager()})
) -> nil.
route(Decoded_measurement, Managers) ->
    Notify = fun
        ({{windowed, Over, _, _}, Manager}) ->
            Extracted = pharos@threshold_eval:sample_value(
                Decoded_measurement,
                Over
            ),
            case Extracted of
                {ok, Value} ->
                    pharos@alert_manager:sample(Manager, Value);
                {error, nil} ->
                    nil
            end;
        ({Threshold, Manager}) ->
            Verdict = pharos@threshold_eval:evaluate(
                Decoded_measurement,
                Threshold
            ),
            case Verdict of
                not_applicable ->
                    nil;
                breached ->
                    pharos@alert_manager:breach(Manager);
                healthy ->
                    pharos@alert_manager:recover(Manager)
            end
    end,
    gleam@list:each(Managers, Notify).
-file("src/pharos/internal/telemetry.gleam", 107).
?DOC(false).
%% Pushes every metric in Metrics onto Buffer, one at a time, in list order.
%% The result of each individual push is deliberately discarded; the
%% per-element callback always yields `nil`.
-spec buffer_metrics(
    pharos@internal@hot_buffer:hot_buffer(),
    list(pharos@metric:metric())
) -> nil.
buffer_metrics(Buffer, Metrics) ->
    Push_one = fun(Metric) ->
        %% Push outcome is intentionally ignored.
        _ = pharos@internal@hot_buffer:push(Buffer, Metric),
        nil
    end,
    gleam@list:each(Metrics, Push_one).
-file("src/pharos/internal/telemetry.gleam", 160).
?DOC(false).
%% Routes a decoded probe sample to the managers whose threshold belongs to
%% the given probe definition.
%%
%% A pair is considered only when element 2 of its threshold is exactly
%% equal to pharos@probe:id(Definition). For those pairs the sample is
%% evaluated with pharos@threshold_eval:evaluate_probe/2 and the manager is
%% told to breach or recover; `not_applicable` and non-matching pairs
%% yield `nil`.
-spec route_probe_sample(
    pharos@probe:probe(),
    gleam@dict:dict(binary(), float()),
    list({pharos@probe:probe_threshold(), pharos@alert_manager:alert_manager()})
) -> nil.
route_probe_sample(Definition, Sample, Probe_managers) ->
    Notify = fun({Threshold, Manager}) ->
        Owner_id = erlang:element(2, Threshold),
        %% Matching against the already-bound Owner_id is an exact-equality
        %% test, the same comparison `=:=` performs.
        case pharos@probe:id(Definition) of
            Owner_id ->
                Verdict = pharos@threshold_eval:evaluate_probe(
                    Sample,
                    Threshold
                ),
                case Verdict of
                    not_applicable ->
                        nil;
                    breached ->
                        pharos@alert_manager:breach(Manager);
                    healthy ->
                        pharos@alert_manager:recover(Manager)
                end;
            _ ->
                nil
        end
    end,
    gleam@list:each(Probe_managers, Notify).
-file("src/pharos/internal/telemetry.gleam", 149).
?DOC(false).
%% Looks up the first probe in Probes whose pharos@probe:event_name/1 is
%% exactly equal to Event_name. The result is whatever gleam@list:find/2
%% returns: `{ok, Probe}` or `{error, nil}` per the spec.
-spec find_probe(list(pharos@probe:probe()), list(gleam@erlang@atom:atom_())) ->
    {ok, pharos@probe:probe()} | {error, nil}.
find_probe(Probes, Event_name) ->
    Emits_event = fun(Candidate) ->
        pharos@probe:event_name(Candidate) =:= Event_name
    end,
    gleam@list:find(Probes, Emits_event).
-file("src/pharos/internal/telemetry.gleam", 248).
?DOC(false).
%% Removes duplicate event names, keeping the first occurrence of each.
%% New items are prepended to the accumulator, so the result lists the
%% distinct items in reverse order of first appearance.
-spec dedupe(list(list(gleam@erlang@atom:atom_()))) ->
    list(list(gleam@erlang@atom:atom_())).
dedupe(Items) ->
    Keep_unseen = fun(Seen, Candidate) ->
        Already_seen = gleam@list:contains(Seen, Candidate),
        if
            Already_seen -> Seen;
            true -> [Candidate | Seen]
        end
    end,
    gleam@list:fold(Items, [], Keep_unseen).
-file("src/pharos/internal/telemetry.gleam", 194).
?DOC(false).
%% Maps a statistic kind to the telemetry event name (a list of atoms) that
%% carries its measurements.
%%
%% Every kind except `process_info` has a fixed name; a
%% `{process_info, _, Event, _}` kind carries its own event name in its
%% third element, which is returned unchanged.
%%
%% The names are written as atom literals rather than being converted from
%% constant binaries with erlang:binary_to_atom/1 on every call; the
%% resulting terms are identical.
-spec event_name_for(pharos@statistic:statistic_kind()) ->
    list(gleam@erlang@atom:atom_()).
event_name_for(beam_memory) ->
    [vm, memory];
event_name_for(beam_run_queues) ->
    [vm, total_run_queue_lengths];
event_name_for(beam_system_counts) ->
    [vm, system_counts];
event_name_for(beam_persistent_term) ->
    [vm, persistent_term];
event_name_for({process_info, _, Event, _}) ->
    Event;
event_name_for(cluster_nodes) ->
    [pharos, cluster, nodes];
event_name_for(host_memory) ->
    [pharos, host, memory];
event_name_for({host_disk, _}) ->
    [pharos, host, disk];
event_name_for(host_cpu) ->
    [pharos, host, cpu];
event_name_for(host_network) ->
    [pharos, host, network];
event_name_for(beam_scheduler) ->
    [pharos, vm, scheduler];
event_name_for(beam_reductions) ->
    [pharos, vm, reductions].
-file("src/pharos/internal/telemetry.gleam", 187).
?DOC(false).
%% Builds the de-duplicated list of telemetry event names to subscribe to.
%%
%% Built-in names come from applying event_name_for/1 to element 2 of each
%% entry in element 2 of Config; custom names come from
%% pharos@probe:event_name/1 on each probe. Built-in names are placed ahead
%% of custom ones before dedupe/1 is applied.
-spec derive_event_names(pharos@config:config(), list(pharos@probe:probe())) ->
    list(list(gleam@erlang@atom:atom_())).
derive_event_names(Config, Probes) ->
    Builtin = [
        event_name_for(erlang:element(2, Stat))
     || Stat <- erlang:element(2, Config)
    ],
    Custom = [pharos@probe:event_name(Probe) || Probe <- Probes],
    dedupe(Builtin ++ Custom).
-file("src/pharos/internal/telemetry.gleam", 59).
?DOC(false).
%% Registers a single handler, identified by Handler_id, for every event
%% name derived from Config and Probes (see derive_event_names/2), using
%% pharos_ffi:attach_many/4.
%%
%% For each incoming event the handler rewraps elements 2..4 of the event
%% as a `telemetry_event` tuple and then:
%%   * if a probe is registered for the event name, decodes the event with
%%     that probe, buffers the resulting metrics and routes the sample to
%%     Probe_managers;
%%   * otherwise decodes it as a built-in measurement (passing element 7 of
%%     Config as the memory unit), buffers the resulting metrics and routes
%%     the measurement to Managers.
%% Events that fail to decode are ignored (the handler returns `nil`).
%%
%% Returns the result of pharos_ffi:attach_many/4.
-spec attach(
    pharos@config:config(),
    list({pharos@config:threshold(), pharos@alert_manager:alert_manager()}),
    list({pharos@probe:probe_threshold(), pharos@alert_manager:alert_manager()}),
    list(pharos@probe:probe()),
    pharos@internal@hot_buffer:hot_buffer(),
    gleam@erlang@atom:atom_()
) -> {ok, nil} | {error, attach_error()}.
attach(Config, Managers, Probe_managers, Probes, Buffer, Handler_id) ->
    Event_names = derive_event_names(Config, Probes),
    Memory_unit = erlang:element(7, Config),
    On_event = fun(Event, _) ->
        Name = erlang:element(2, Event),
        Telemetry_event =
            {telemetry_event, Name, erlang:element(3, Event), erlang:element(4, Event)},
        case find_probe(Probes, Name) of
            {ok, Probe} ->
                handle_probe_event(Probe, Telemetry_event, Buffer, Probe_managers);
            {error, nil} ->
                handle_measurement_event(
                    Telemetry_event,
                    Memory_unit,
                    Buffer,
                    Managers
                )
        end
    end,
    pharos_ffi:attach_many(Handler_id, Event_names, On_event, nil).

%% Decodes an event with its probe; on success buffers the sample's metrics
%% and routes the sample to the probe managers. Decode errors yield `nil`.
-spec handle_probe_event(
    pharos@probe:probe(),
    telemetry_event(),
    pharos@internal@hot_buffer:hot_buffer(),
    list({pharos@probe:probe_threshold(), pharos@alert_manager:alert_manager()})
) -> nil.
handle_probe_event(Probe, Telemetry_event, Buffer, Probe_managers) ->
    case pharos@probe:decode(Probe, Telemetry_event) of
        {error, _} ->
            nil;
        {ok, Sample} ->
            Metrics = pharos@metric:from_sample(pharos@probe:id(Probe), Sample),
            buffer_metrics(Buffer, Metrics),
            route_probe_sample(Probe, Sample, Probe_managers)
    end.

%% Decodes an event as a built-in measurement; on success buffers its
%% metrics and routes the measurement to the threshold managers. Decode
%% errors yield `nil`.
-spec handle_measurement_event(
    telemetry_event(),
    term(),
    pharos@internal@hot_buffer:hot_buffer(),
    list({pharos@config:threshold(), pharos@alert_manager:alert_manager()})
) -> nil.
handle_measurement_event(Telemetry_event, Memory_unit, Buffer, Managers) ->
    case pharos@measurement:decode_with_unit(Telemetry_event, Memory_unit) of
        {ok, Decoded_measurement} ->
            Metrics = pharos@metric:from_measurement(Decoded_measurement),
            buffer_metrics(Buffer, Metrics),
            route(Decoded_measurement, Managers);
        {error, _} ->
            nil
    end.
-file("src/pharos/internal/telemetry.gleam", 115).
?DOC(false).
%% Removes the handler registered under Handler_id by delegating to
%% pharos_ffi:detach/1 and returning its result unchanged
%% (`{ok, nil}` or `{error, detach_error()}` per the spec).
-spec detach(gleam@erlang@atom:atom_()) -> {ok, nil} | {error, detach_error()}.
detach(Handler_id) ->
pharos_ffi:detach(Handler_id).