Skip to main content

src/pharos@internal@poller.erl

%% Helpers that build poller child specifications. FILEPATH and the `-file`
%% attributes in this module refer to src/pharos/internal/poller.gleam.
-module(pharos@internal@poller).
%% Compile options: `no_auto_import` turns off BIF auto-import (which is why
%% calls in this module carry an explicit `erlang:` prefix), three warning
%% classes are silenced, and inlining is enabled.
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/internal/poller.gleam").
%% Exported functions.
-export([jitter_spread/2, child_specs/2, probe_child_specs/2]).
%% Exported types.
-export_type([poller_start_error/0, start_option/0, poller_measurement/0]).

%% Documentation macros. On OTP 27 and later, MODULEDOC/DOC expand to the
%% `-moduledoc` / `-doc` attributes; on earlier releases they expand to a
%% `-compile` attribute with an empty option list instead.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

%% The module documentation is set to `false`.
?MODULEDOC(false).

%% Error reasons matched on the result of `pharos_ffi:start_poller/1` in
%% `poller_child/3`.
-type poller_start_error() :: poller_ignored | {poller_start_failed, binary()}.

%% One entry of the option list passed to `pharos_ffi:start_poller/1`.
-type start_option() :: {period, integer()} |
    {init_delay, integer()} |
    {measurements, list(poller_measurement())}.

%% One measurement entry of the `measurements` option. Three shapes exist:
%% `builtin` with a single atom, `process_info_spec` with an atom and two atom
%% lists, and `custom` with two atoms and a list of dynamic values.
-type poller_measurement() :: {builtin, gleam@erlang@atom:atom_()} |
    {process_info_spec,
        gleam@erlang@atom:atom_(),
        list(gleam@erlang@atom:atom_()),
        list(gleam@erlang@atom:atom_())} |
    {custom,
        gleam@erlang@atom:atom_(),
        gleam@erlang@atom:atom_(),
        list(gleam@dynamic:dynamic_())}.

%% Lowers a statistic kind to the measurement term that ends up in the
%% poller's `measurements` option.
%%
%% - `beam_memory`, `beam_run_queues`, `beam_system_counts` and
%%   `beam_persistent_term` become `{builtin, Name}`.
%% - `{process_info, Name, Event, Keys}` is re-tagged as `process_info_spec`.
%% - Every other kind becomes `{custom, pharos_ffi, Function, Args}`; only
%%   `{host_disk, Path}` supplies an argument.
%%
%% A kind outside this list raises `case_clause`.
-file("src/pharos/internal/poller.gleam", 193).
?DOC(false).
-spec lower_kind(pharos@statistic:statistic_kind()) -> poller_measurement().
lower_kind(Kind) ->
    case Kind of
        %% Kinds that carry data.
        {process_info, Name, Event, Keys} ->
            {process_info_spec, Name, Event, Keys};

        {host_disk, Path} ->
            lower_kind_ffi(
                <<"emit_host_disk"/utf8>>,
                [gleam_stdlib:identity(Path)]
            );

        %% Kinds lowered to `builtin` measurements.
        beam_memory ->
            lower_kind_builtin(<<"memory"/utf8>>);

        beam_run_queues ->
            lower_kind_builtin(<<"total_run_queue_lengths"/utf8>>);

        beam_system_counts ->
            lower_kind_builtin(<<"system_counts"/utf8>>);

        beam_persistent_term ->
            lower_kind_builtin(<<"persistent_term"/utf8>>);

        %% Kinds lowered to argument-less `pharos_ffi` calls.
        cluster_nodes ->
            lower_kind_ffi(<<"emit_cluster_nodes"/utf8>>, []);

        host_memory ->
            lower_kind_ffi(<<"emit_host_memory"/utf8>>, []);

        host_cpu ->
            lower_kind_ffi(<<"emit_host_cpu"/utf8>>, []);

        host_network ->
            lower_kind_ffi(<<"emit_host_network"/utf8>>, []);

        beam_scheduler ->
            lower_kind_ffi(<<"emit_beam_scheduler"/utf8>>, []);

        beam_reductions ->
            lower_kind_ffi(<<"emit_beam_reductions"/utf8>>, [])
    end.

%% Wraps the atom named by `Name` in a `builtin` measurement.
-spec lower_kind_builtin(binary()) -> poller_measurement().
lower_kind_builtin(Name) ->
    {builtin, erlang:binary_to_atom(Name)}.

%% Builds a `custom` measurement that targets `pharos_ffi:Function` with `Args`.
-spec lower_kind_ffi(binary(), list(gleam@dynamic:dynamic_())) -> poller_measurement().
lower_kind_ffi(Function, Args) ->
    {custom,
        erlang:binary_to_atom(<<"pharos_ffi"/utf8>>),
        erlang:binary_to_atom(Function),
        Args}.

%% Computes the jitter spread for a polling interval: `Interval_ms` is
%% converted to a float, multiplied by `Ratio`, rounded to an integer, and the
%% larger of that value and 0 (via `gleam@int:max/2`) is returned.
-file("src/pharos/internal/poller.gleam", 144).
?DOC(false).
-spec jitter_spread(integer(), float()) -> integer().
jitter_spread(Interval_ms, Ratio) ->
    Scaled = erlang:float(Interval_ms) * Ratio,
    Rounded = erlang:round(Scaled),
    gleam@int:max(Rounded, 0).

%% Produces the optional `init_delay` start option.
%%
%% Returns `[]` when `Jitter` is `none` or when the spread computed by
%% `jitter_spread/2` is 0; otherwise returns a one-element list whose delay is
%% the result of `gleam@int:random(Spread)`.
-file("src/pharos/internal/poller.gleam", 126).
?DOC(false).
-spec init_delay_options(integer(), gleam@option:option(float())) -> list(start_option()).
init_delay_options(Interval_ms, Jitter) ->
    %% Treat "no jitter configured" the same as a zero spread.
    Spread = case Jitter of
        {some, Ratio} ->
            jitter_spread(Interval_ms, Ratio);

        none ->
            0
    end,
    case Spread of
        0 ->
            [];

        _ ->
            [{init_delay, gleam@int:random(Spread)}]
    end.

%% Builds a worker child specification (via `gleam@otp@supervision:worker/1`)
%% whose start function launches a poller with `pharos_ffi:start_poller/1`.
%%
%% The option list holds `period`, `measurements` and whatever
%% `init_delay_options/2` contributes. It is assembled inside the start
%% function, so it is re-evaluated each time that function is invoked.
%%
%% Start results are mapped as follows:
%% - `{ok, Pid}` becomes `{ok, {started, Pid, Pid}}`;
%% - `{error, poller_ignored}` and `{error, {poller_start_failed, Reason}}`
%%   become `{error, {init_failed, Message}}`.
-file("src/pharos/internal/poller.gleam", 102).
?DOC(false).
-spec poller_child(
    integer(),
    list(poller_measurement()),
    gleam@option:option(float())
) -> gleam@otp@supervision:child_specification(gleam@erlang@process:pid_()).
poller_child(Interval_ms, Measurements, Jitter) ->
    Start = fun() ->
        Fixed = [{period, Interval_ms}, {measurements, Measurements}],
        Options = Fixed ++ init_delay_options(Interval_ms, Jitter),
        case pharos_ffi:start_poller(Options) of
            {ok, Poller} ->
                {ok, {started, Poller, Poller}};

            {error, poller_ignored} ->
                Message = <<"telemetry_poller ignored startup"/utf8>>,
                {error, {init_failed, Message}};

            {error, {poller_start_failed, Message}} ->
                {error, {init_failed, Message}}
        end
    end,
    gleam@otp@supervision:worker(Start).

%% Groups statistic kinds by their polling interval.
%%
%% Folds over `Statistics`, keeping a map from `Interval_ms` to a list of
%% kinds; each kind is prepended to the list already stored for its interval.
%% The map is returned as a list of `{Interval_ms, Kinds}` pairs via
%% `maps:to_list/1`.
-file("src/pharos/internal/poller.gleam", 152).
?DOC(false).
-spec bucket_by_interval(list(pharos@statistic:statistic())) -> list({integer(),
    list(pharos@statistic:statistic_kind())}).
bucket_by_interval(Statistics) ->
    Collect = fun(Buckets, Statistic) ->
        {statistic, Kind, Interval_ms} = Statistic,
        Seen = case gleam_stdlib:map_get(Buckets, Interval_ms) of
            {error, nil} ->
                [];

            {ok, Kinds} ->
                Kinds
        end,
        gleam@dict:insert(Buckets, Interval_ms, [Kind | Seen])
    end,
    Buckets_by_interval = gleam@list:fold(Statistics, maps:new(), Collect),
    maps:to_list(Buckets_by_interval).

%% Builds one poller child specification per distinct statistic interval.
%%
%% Statistics are bucketed with `bucket_by_interval/1`; the kinds in each
%% bucket are lowered with `lower_kind/1` and handed, together with the
%% interval and `Jitter`, to `poller_child/3`.
-file("src/pharos/internal/poller.gleam", 76).
?DOC(false).
-spec child_specs(
    list(pharos@statistic:statistic()),
    gleam@option:option(float())
) -> list(gleam@otp@supervision:child_specification(gleam@erlang@process:pid_())).
child_specs(Statistics, Jitter) ->
    To_child = fun(Bucket) ->
        {Interval_ms, Kinds} = Bucket,
        Measurements = gleam@list:map(Kinds, fun lower_kind/1),
        poller_child(Interval_ms, Measurements, Jitter)
    end,
    gleam@list:map(bucket_by_interval(Statistics), To_child).

%% Lowers a probe to a `custom` measurement by re-tagging the
%% `{custom_mfa, Module, Function, Args}` term returned by
%% `pharos@probe:source/1`. Any other source shape raises `case_clause`.
-file("src/pharos/internal/poller.gleam", 186).
?DOC(false).
-spec lower_probe(pharos@probe:probe()) -> poller_measurement().
lower_probe(Definition) ->
    Source = pharos@probe:source(Definition),
    case Source of
        {custom_mfa, Mod, Fun, Arguments} ->
            {custom, Mod, Fun, Arguments}
    end.

%% Groups lowered probes by their polling interval.
%%
%% Folds over `Probes`, keeping a map from the value of
%% `pharos@probe:interval_ms/1` to a list of measurements; each probe is
%% lowered with `lower_probe/1` and prepended to the list already stored for
%% its interval. The map is returned as a list of
%% `{Interval_ms, Measurements}` pairs via `maps:to_list/1`.
-file("src/pharos/internal/poller.gleam", 167).
?DOC(false).
-spec bucket_probes_by_interval(list(pharos@probe:probe())) -> list({integer(),
    list(poller_measurement())}).
bucket_probes_by_interval(Probes) ->
    Collect = fun(Buckets, Probe) ->
        Interval_ms = pharos@probe:interval_ms(Probe),
        Seen = case gleam_stdlib:map_get(Buckets, Interval_ms) of
            {error, nil} ->
                [];

            {ok, Measurements} ->
                Measurements
        end,
        gleam@dict:insert(Buckets, Interval_ms, [lower_probe(Probe) | Seen])
    end,
    Buckets_by_interval = gleam@list:fold(Probes, maps:new(), Collect),
    maps:to_list(Buckets_by_interval).

%% Builds one poller child specification per distinct probe interval.
%%
%% Probes are bucketed (and lowered) by `bucket_probes_by_interval/1`; each
%% bucket's interval and measurements are handed, together with `Jitter`, to
%% `poller_child/3`.
-file("src/pharos/internal/poller.gleam", 91).
?DOC(false).
-spec probe_child_specs(
    list(pharos@probe:probe()),
    gleam@option:option(float())
) -> list(gleam@otp@supervision:child_specification(gleam@erlang@process:pid_())).
probe_child_specs(Probes, Jitter) ->
    To_child = fun(Bucket) ->
        {Interval_ms, Measurements} = Bucket,
        poller_child(Interval_ms, Measurements, Jitter)
    end,
    gleam@list:map(bucket_probes_by_interval(Probes), To_child).