Skip to main content

src/pharos@internal@telemetry.erl

%% Telemetry glue for pharos: exports attach/6 and detach/1 plus the error and
%% event types used in their specs.
%% NOTE(review): FILEPATH points at a .gleam source, so this module looks like
%% generated output -- confirm before hand-editing.
-module(pharos@internal@telemetry).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/internal/telemetry.gleam").
-export([attach/6, detach/1]).
-export_type([attach_error/0, detach_error/0, telemetry_event/0]).

%% On OTP 27 and later MODULEDOC/DOC expand to the -moduledoc/-doc attributes.
%% On older releases they expand to an empty -compile([]) attribute so that the
%% ?MODULEDOC(...) and ?DOC(...) call sites remain valid attributes.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(false).

%% Error reasons that appear in the spec of attach/6.
-type attach_error() :: handler_already_attached | {attach_failed, binary()}.

%% Error reasons that appear in the spec of detach/1.
-type detach_error() :: handler_not_found | {detach_failed, binary()}.

%% Tagged tuple carrying an event name (a list of atoms) followed by two
%% atom-keyed dicts of dynamic values.
%% NOTE(review): the two dicts are presumably the telemetry measurements and
%% metadata, in that order -- confirm against the Gleam source.
-type telemetry_event() :: {telemetry_event,
        list(gleam@erlang@atom:atom_()),
        gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_()),
        gleam@dict:dict(gleam@erlang@atom:atom_(), gleam@dynamic:dynamic_())}.

-file("src/pharos/internal/telemetry.gleam", 123).
?DOC(false).
%% Offers one decoded measurement to every {Threshold, Manager} pair.
%%
%% For a `{windowed, Over, _, _}` threshold the measurement is looked up with
%% pharos@threshold_eval:sample_value/2; an `{ok, Value}` result is handed to
%% pharos@alert_manager:sample/2 and `{error, nil}` is ignored.
%% Every other threshold goes through pharos@threshold_eval:evaluate/2:
%% `breached` calls pharos@alert_manager:breach/1, `healthy` calls
%% pharos@alert_manager:recover/1, and `not_applicable` does nothing.
%%
%% The result is whatever gleam@list:each/2 returns (nil per the spec).
-spec route(
    pharos@measurement:measurement(),
    list({pharos@config:threshold(), pharos@alert_manager:alert_manager()})
) -> nil.
route(Decoded_measurement, Managers) ->
    Deliver = fun(Entry) ->
        route_measurement_to(Decoded_measurement, Entry)
    end,
    gleam@list:each(Managers, Deliver).

%% Handles a single {Threshold, Manager} entry on behalf of route/2.
route_measurement_to(Measurement, Entry) ->
    {Threshold, Manager} = Entry,
    case Threshold of
        {windowed, Over, _, _} ->
            Lookup = pharos@threshold_eval:sample_value(Measurement, Over),
            forward_window_sample(Lookup, Manager);

        _ ->
            Verdict = pharos@threshold_eval:evaluate(Measurement, Threshold),
            apply_measurement_verdict(Verdict, Manager)
    end.

%% Passes a successfully extracted value to the manager; a failed lookup is a
%% no-op.
forward_window_sample(Lookup, Manager) ->
    case Lookup of
        {error, nil} ->
            nil;

        {ok, Value} ->
            pharos@alert_manager:sample(Manager, Value)
    end.

%% Translates an evaluation verdict into the matching alert-manager call.
apply_measurement_verdict(Verdict, Manager) ->
    case Verdict of
        breached ->
            pharos@alert_manager:breach(Manager);

        healthy ->
            pharos@alert_manager:recover(Manager);

        not_applicable ->
            nil
    end.

-file("src/pharos/internal/telemetry.gleam", 107).
?DOC(false).
%% Pushes every metric in Metrics onto Buffer via
%% pharos@internal@hot_buffer:push/2, one at a time and in list order as
%% driven by gleam@list:each/2. The result of each push is discarded.
-spec buffer_metrics(
    pharos@internal@hot_buffer:hot_buffer(),
    list(pharos@metric:metric())
) -> nil.
buffer_metrics(Buffer, Metrics) ->
    Push = fun(Metric) -> push_and_discard(Buffer, Metric) end,
    gleam@list:each(Metrics, Push).

%% Pushes one metric and deliberately drops the push result, yielding nil.
push_and_discard(Buffer, Metric) ->
    _ = pharos@internal@hot_buffer:push(Buffer, Metric),
    nil.

-file("src/pharos/internal/telemetry.gleam", 160).
?DOC(false).
%% Offers one decoded probe sample to every probe threshold that belongs to
%% the given probe definition.
%%
%% A {Threshold, Manager} pair is considered only when the second element of
%% Threshold equals pharos@probe:id(Definition). For matching pairs the sample
%% is evaluated with pharos@threshold_eval:evaluate_probe/2: `breached` calls
%% pharos@alert_manager:breach/1, `healthy` calls
%% pharos@alert_manager:recover/1, and `not_applicable` does nothing.
%%
%% The result is whatever gleam@list:each/2 returns (nil per the spec).
-spec route_probe_sample(
    pharos@probe:probe(),
    gleam@dict:dict(binary(), float()),
    list({pharos@probe:probe_threshold(), pharos@alert_manager:alert_manager()})
) -> nil.
route_probe_sample(Definition, Sample, Probe_managers) ->
    %% The probe id does not depend on the pair being visited, so it is looked
    %% up once here instead of once per element of Probe_managers.
    %% NOTE(review): assumes pharos@probe:id/1 is a pure accessor -- confirm.
    Probe_id = pharos@probe:id(Definition),
    gleam@list:each(
        Probe_managers,
        fun(Pair) ->
            {Threshold, Manager} = Pair,
            case erlang:element(2, Threshold) =:= Probe_id of
                false ->
                    %% Threshold belongs to a different probe.
                    nil;

                true ->
                    case pharos@threshold_eval:evaluate_probe(Sample, Threshold) of
                        not_applicable ->
                            nil;

                        breached ->
                            pharos@alert_manager:breach(Manager);

                        healthy ->
                            pharos@alert_manager:recover(Manager)
                    end
            end
        end
    ).

-file("src/pharos/internal/telemetry.gleam", 149).
?DOC(false).
%% Searches Probes with gleam@list:find/2 for a probe whose
%% pharos@probe:event_name/1 is exactly equal (=:=) to Event_name.
%% Per the spec the result is {ok, Probe} or {error, nil}.
-spec find_probe(list(pharos@probe:probe()), list(gleam@erlang@atom:atom_())) -> {ok,
        pharos@probe:probe()} |
    {error, nil}.
find_probe(Probes, Event_name) ->
    Emits_event = fun(Candidate) ->
        Event_name =:= pharos@probe:event_name(Candidate)
    end,
    gleam@list:find(Probes, Emits_event).

-file("src/pharos/internal/telemetry.gleam", 248).
?DOC(false).
%% Removes duplicate event names from Items.
%%
%% Folds over Items with an initially empty accumulator; an item is prepended
%% only when gleam@list:contains/2 reports it is not already accumulated.
%% Because new items are prepended, the result is not in input order
%% (assuming gleam@list:fold/3 visits Items front to back, it is the reverse
%% of first-appearance order).
-spec dedupe(list(list(gleam@erlang@atom:atom_()))) -> list(list(gleam@erlang@atom:atom_())).
dedupe(Items) ->
    gleam@list:fold(Items, [], fun prepend_if_new/2).

%% Fold step for dedupe/1: keeps Seen unchanged when Candidate is already in
%% it, otherwise puts Candidate at the head.
prepend_if_new(Seen, Candidate) ->
    case gleam@list:contains(Seen, Candidate) of
        false ->
            [Candidate | Seen];

        true ->
            Seen
    end.

-file("src/pharos/internal/telemetry.gleam", 194).
?DOC(false).
%% Maps a statistic kind to the event name (a list of atoms) it is reported
%% under.
%%
%% A `{process_info, _, Event, _}` kind carries its own event name and that
%% name is returned unchanged. Every other kind maps to a fixed name: the four
%% `[vm, ...]` names below, and `[pharos, ...]` names for the cluster, host and
%% remaining beam kinds. `{host_disk, _}` ignores its argument.
-spec event_name_for(pharos@statistic:statistic_kind()) -> list(gleam@erlang@atom:atom_()).
event_name_for(Kind) ->
    case Kind of
        %% Kinds that carry a payload.
        {process_info, _, Event, _} ->
            Event;

        {host_disk, _} ->
            [pharos, host, disk];

        %% Kinds reported under the `vm` prefix.
        beam_memory ->
            [vm, memory];

        beam_run_queues ->
            [vm, total_run_queue_lengths];

        beam_system_counts ->
            [vm, system_counts];

        beam_persistent_term ->
            [vm, persistent_term];

        %% Kinds reported under the `pharos` prefix.
        cluster_nodes ->
            [pharos, cluster, nodes];

        host_memory ->
            [pharos, host, memory];

        host_cpu ->
            [pharos, host, cpu];

        host_network ->
            [pharos, host, network];

        beam_scheduler ->
            [pharos, vm, scheduler];

        beam_reductions ->
            [pharos, vm, reductions]
    end.

-file("src/pharos/internal/telemetry.gleam", 187).
?DOC(false).
%% Computes the list of event names a handler should be attached to.
%%
%% Built-in names come from the list stored in the second element of Config:
%% each entry's second element is passed to event_name_for/1. Custom names come
%% from pharos@probe:event_name/1 applied to every probe. The built-in names
%% followed by the custom names are then passed through dedupe/1.
-spec derive_event_names(pharos@config:config(), list(pharos@probe:probe())) -> list(list(gleam@erlang@atom:atom_())).
derive_event_names(Config, Probes) ->
    Configured_stats = erlang:element(2, Config),
    Builtin_names = gleam@list:map(Configured_stats, fun builtin_event_name/1),
    Probe_names = gleam@list:map(
        Probes,
        fun(Probe) -> pharos@probe:event_name(Probe) end
    ),
    dedupe(Builtin_names ++ Probe_names).

%% Event name for one configured statistic entry; the kind is read from the
%% entry's second element.
builtin_event_name(Stat) ->
    event_name_for(erlang:element(2, Stat)).

-file("src/pharos/internal/telemetry.gleam", 59).
?DOC(false).
%% Builds an event handler and registers it through pharos_ffi:attach_many/4
%% under Handler_id, for the event names produced by derive_event_names/2.
%%
%% For each event the handler rebuilds a `telemetry_event` tuple from elements
%% 2, 3 and 4 of the incoming event and then:
%%   * if find_probe/2 finds a probe for the event name, decodes the event
%%     with that probe, buffers the resulting metrics and routes the sample to
%%     Probe_managers;
%%   * otherwise decodes it with pharos@measurement:decode_with_unit/2 (using
%%     the seventh element of Config as the memory unit), buffers the resulting
%%     metrics and routes the measurement to Managers.
%% Events that fail to decode are ignored (the handler returns nil).
%%
%% Returns the result of pharos_ffi:attach_many/4; per the spec that is
%% {ok, nil} or {error, attach_error()}.
-spec attach(
    pharos@config:config(),
    list({pharos@config:threshold(), pharos@alert_manager:alert_manager()}),
    list({pharos@probe:probe_threshold(), pharos@alert_manager:alert_manager()}),
    list(pharos@probe:probe()),
    pharos@internal@hot_buffer:hot_buffer(),
    gleam@erlang@atom:atom_()
) -> {ok, nil} | {error, attach_error()}.
attach(Config, Managers, Probe_managers, Probes, Buffer, Handler_id) ->
    Event_names = derive_event_names(Config, Probes),
    Memory_unit = erlang:element(7, Config),
    Handler = fun(Event, _Ignored) ->
        Name = erlang:element(2, Event),
        Telemetry_event = {telemetry_event,
            Name,
            erlang:element(3, Event),
            erlang:element(4, Event)},
        %% A registered probe takes precedence over the built-in decoder.
        case find_probe(Probes, Name) of
            {error, nil} ->
                record_builtin_event(
                    Telemetry_event,
                    Memory_unit,
                    Buffer,
                    Managers
                );

            {ok, Probe} ->
                record_probe_event(
                    Probe,
                    Telemetry_event,
                    Buffer,
                    Probe_managers
                )
        end
    end,
    pharos_ffi:attach_many(Handler_id, Event_names, Handler, nil).

%% Decodes an event with its probe, buffers the sample's metrics, then routes
%% the sample to the probe managers. Undecodable events yield nil.
record_probe_event(Probe, Telemetry_event, Buffer, Probe_managers) ->
    case pharos@probe:decode(Probe, Telemetry_event) of
        {ok, Sample} ->
            Metrics = pharos@metric:from_sample(pharos@probe:id(Probe), Sample),
            buffer_metrics(Buffer, Metrics),
            route_probe_sample(Probe, Sample, Probe_managers);

        {error, _} ->
            nil
    end.

%% Decodes an event as a built-in measurement, buffers its metrics, then routes
%% the measurement to the threshold managers. Undecodable events yield nil.
record_builtin_event(Telemetry_event, Memory_unit, Buffer, Managers) ->
    case pharos@measurement:decode_with_unit(Telemetry_event, Memory_unit) of
        {ok, Measurement} ->
            Metrics = pharos@metric:from_measurement(Measurement),
            buffer_metrics(Buffer, Metrics),
            route(Measurement, Managers);

        {error, _} ->
            nil
    end.

-file("src/pharos/internal/telemetry.gleam", 115).
?DOC(false).
%% Detaches the handler registered under Handler_id by delegating to
%% pharos_ffi:detach/1 and returning its result unchanged; per the spec that is
%% {ok, nil} or {error, detach_error()}.
-spec detach(gleam@erlang@atom:atom_()) -> {ok, nil} | {error, detach_error()}.
detach(Handler_id) ->
    Outcome = pharos_ffi:detach(Handler_id),
    Outcome.