-module(pharos@internal@poller).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/pharos/internal/poller.gleam").
-export([jitter_spread/2, child_specs/2, probe_child_specs/2]).
-export_type([poller_start_error/0, start_option/0, poller_measurement/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(false).
%% Error shapes matched on the result of `pharos_ffi:start_poller/1` in
%% poller_child/3: either the poller ignored startup, or it failed with a
%% binary reason.
-type poller_start_error() :: poller_ignored | {poller_start_failed, binary()}.
%% One entry of the option list handed to `pharos_ffi:start_poller/1`.
%% poller_child/3 always supplies `period` (the bucket's Interval_ms) and
%% `measurements`; `init_delay` is only added by init_delay_options/2.
-type start_option() :: {period, integer()} |
{init_delay, integer()} |
{measurements, list(poller_measurement())}.
%% A single measurement for the poller, as produced by lower_kind/1 and
%% lower_probe/1:
%%   * `{builtin, Name}`
%%   * `{process_info_spec, Name, Event, Keys}`
%%   * `{custom, Module, Function, Args}`
-type poller_measurement() :: {builtin, gleam@erlang@atom:atom_()} |
{process_info_spec,
gleam@erlang@atom:atom_(),
list(gleam@erlang@atom:atom_()),
list(gleam@erlang@atom:atom_())} |
{custom,
gleam@erlang@atom:atom_(),
gleam@erlang@atom:atom_(),
list(gleam@dynamic:dynamic_())}.
-file("src/pharos/internal/poller.gleam", 193).
?DOC(false).
%% Translates a statistic kind into the measurement term given to the poller.
%%
%% The four BEAM built-ins become `{builtin, Name}`, a `process_info`
%% statistic becomes `{process_info_spec, Name, Event, Keys}` with its fields
%% carried over unchanged, and every remaining kind becomes a
%% `{custom, pharos_ffi, Function, Args}` callback. Only `host_disk` passes an
%% argument (its path); all other callbacks take an empty argument list.
-spec lower_kind(pharos@statistic:statistic_kind()) -> poller_measurement().
lower_kind(beam_memory) ->
    {builtin, memory};
lower_kind(beam_run_queues) ->
    {builtin, total_run_queue_lengths};
lower_kind(beam_system_counts) ->
    {builtin, system_counts};
lower_kind(beam_persistent_term) ->
    {builtin, persistent_term};
lower_kind({process_info, Name, Event, Keys}) ->
    {process_info_spec, Name, Event, Keys};
lower_kind(cluster_nodes) ->
    {custom, pharos_ffi, emit_cluster_nodes, []};
lower_kind(host_memory) ->
    {custom, pharos_ffi, emit_host_memory, []};
lower_kind({host_disk, Path}) ->
    {custom, pharos_ffi, emit_host_disk, [gleam_stdlib:identity(Path)]};
lower_kind(host_cpu) ->
    {custom, pharos_ffi, emit_host_cpu, []};
lower_kind(host_network) ->
    {custom, pharos_ffi, emit_host_network, []};
lower_kind(beam_scheduler) ->
    {custom, pharos_ffi, emit_beam_scheduler, []};
lower_kind(beam_reductions) ->
    {custom, pharos_ffi, emit_beam_reductions, []}.
-file("src/pharos/internal/poller.gleam", 144).
?DOC(false).
%% Size of the jitter window for a polling interval.
%%
%% Scales `Interval_ms` by `Ratio`, rounds to the nearest integer and clamps
%% the result at zero, so a negative ratio yields 0 rather than a negative
%% spread.
-spec jitter_spread(integer(), float()) -> integer().
jitter_spread(Interval_ms, Ratio) ->
    Scaled = erlang:round(erlang:float(Interval_ms) * Ratio),
    erlang:max(Scaled, 0).
-file("src/pharos/internal/poller.gleam", 126).
?DOC(false).
%% Builds the optional `init_delay` start option.
%%
%% Returns `[]` when no jitter ratio is configured or when the computed
%% spread is 0; otherwise returns a single `{init_delay, Delay}` entry whose
%% delay is drawn with `gleam@int:random/1` from the spread.
-spec init_delay_options(integer(), gleam@option:option(float())) -> list(start_option()).
init_delay_options(_Interval_ms, none) ->
    [];
init_delay_options(Interval_ms, {some, Ratio}) ->
    case jitter_spread(Interval_ms, Ratio) of
        0 ->
            [];
        Window ->
            [{init_delay, gleam@int:random(Window)}]
    end.
-file("src/pharos/internal/poller.gleam", 102).
?DOC(false).
%% Wraps one poller in a supervisor worker child specification.
%%
%% The start fun passes `period`, `measurements` and (when applicable) an
%% `init_delay` to `pharos_ffi:start_poller/1`. A successful start is reported
%% as `{ok, {started, Pid, Pid}}`; both failure shapes are reported as
%% `{error, {init_failed, Message}}`.
-spec poller_child(
    integer(),
    list(poller_measurement()),
    gleam@option:option(float())
) -> gleam@otp@supervision:child_specification(gleam@erlang@process:pid_()).
poller_child(Interval_ms, Measurements, Jitter) ->
    Start = fun() ->
        %% The options are computed when the start fun runs, not when the
        %% child specification is built.
        Options = [
            {period, Interval_ms},
            {measurements, Measurements}
            | init_delay_options(Interval_ms, Jitter)
        ],
        case pharos_ffi:start_poller(Options) of
            {ok, Pid} ->
                {ok, {started, Pid, Pid}};
            {error, {poller_start_failed, Reason}} ->
                {error, {init_failed, Reason}};
            {error, poller_ignored} ->
                {error, {init_failed, <<"telemetry_poller ignored startup"/utf8>>}}
        end
    end,
    gleam@otp@supervision:worker(Start).
-file("src/pharos/internal/poller.gleam", 152).
?DOC(false).
%% Groups statistic kinds by their polling interval.
%%
%% Returns one `{Interval_ms, Kinds}` pair per distinct interval. Kinds are
%% prepended as they are encountered, so each bucket lists them in reverse
%% input order. The order of the pairs is whatever `maps:to_list/1` yields.
-spec bucket_by_interval(list(pharos@statistic:statistic())) -> list({integer(),
    list(pharos@statistic:statistic_kind())}).
bucket_by_interval(Statistics) ->
    Collect = fun(Stat, Buckets) ->
        {statistic, Kind, Interval_ms} = Stat,
        Earlier = maps:get(Interval_ms, Buckets, []),
        maps:put(Interval_ms, [Kind | Earlier], Buckets)
    end,
    maps:to_list(lists:foldl(Collect, #{}, Statistics)).
-file("src/pharos/internal/poller.gleam", 76).
?DOC(false).
%% Builds one poller child specification per distinct statistic interval.
%%
%% Statistics are bucketed by interval, each bucket's kinds are lowered to
%% poller measurements, and the bucket becomes a single poller child that
%% shares the given jitter setting.
-spec child_specs(
    list(pharos@statistic:statistic()),
    gleam@option:option(float())
) -> list(gleam@otp@supervision:child_specification(gleam@erlang@process:pid_())).
child_specs(Statistics, Jitter) ->
    [
        poller_child(Interval_ms, [lower_kind(Kind) || Kind <- Kinds], Jitter)
     || {Interval_ms, Kinds} <- bucket_by_interval(Statistics)
    ].
-file("src/pharos/internal/poller.gleam", 186).
?DOC(false).
%% Converts a probe definition into a poller measurement.
%%
%% The probe's source is expected to be a `{custom_mfa, Module, Function,
%% Args}` tuple, which is re-tagged as `{custom, Module, Function, Args}`.
-spec lower_probe(pharos@probe:probe()) -> poller_measurement().
lower_probe(Definition) ->
    {custom_mfa, Module, Function, Args} = pharos@probe:source(Definition),
    {custom, Module, Function, Args}.
-file("src/pharos/internal/poller.gleam", 167).
?DOC(false).
%% Groups lowered probe measurements by the probes' polling interval.
%%
%% Returns one `{Interval_ms, Measurements}` pair per distinct interval.
%% Measurements are prepended as probes are encountered, so each bucket lists
%% them in reverse input order. The order of the pairs is whatever
%% `maps:to_list/1` yields.
-spec bucket_probes_by_interval(list(pharos@probe:probe())) -> list({integer(),
    list(poller_measurement())}).
bucket_probes_by_interval(Probes) ->
    Collect = fun(Probe, Buckets) ->
        Interval_ms = pharos@probe:interval_ms(Probe),
        Earlier = maps:get(Interval_ms, Buckets, []),
        maps:put(Interval_ms, [lower_probe(Probe) | Earlier], Buckets)
    end,
    maps:to_list(lists:foldl(Collect, #{}, Probes)).
-file("src/pharos/internal/poller.gleam", 91).
?DOC(false).
%% Builds one poller child specification per distinct probe interval.
%%
%% Probes are bucketed by interval (already lowered to measurements) and each
%% bucket becomes a single poller child that shares the given jitter setting.
-spec probe_child_specs(
    list(pharos@probe:probe()),
    gleam@option:option(float())
) -> list(gleam@otp@supervision:child_specification(gleam@erlang@process:pid_())).
probe_child_specs(Probes, Jitter) ->
    [
        poller_child(Interval_ms, Measurements, Jitter)
     || {Interval_ms, Measurements} <- bucket_probes_by_interval(Probes)
    ].