-module(lightspeed@pipeline@telemetry).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/telemetry.gleam").
-export([zero/0, delta/5, apply/2, valid/1, label/1, metrics/3]).
-export_type([snapshot/0, delta/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Pipeline telemetry contracts for lag/throughput/retry/dead-letter tracking.\n").
%% Telemetry snapshot. Fields after the `snapshot` tag, in tuple order (names
%% as used by label/1 and metrics/3): lag_ms, throughput (records per second),
%% retry total, dead-letter total.
-type snapshot() :: {snapshot, integer(), integer(), integer(), integer()}.
%% One telemetry delta. Fields after the `delta` tag, in the order delta/5
%% stores them: processed_records, lag_ms, retry_count, dead_letter_count,
%% elapsed_ms.
-type delta() :: {delta, integer(), integer(), integer(), integer(), integer()}.
-file("src/lightspeed/pipeline/telemetry.gleam", 28).
%% Returns the starting snapshot: every counter and gauge is 0.
?DOC(" Build a zeroed telemetry snapshot.\n").
-spec zero() -> snapshot().
zero() ->
    {snapshot,
        0,   % lag_ms
        0,   % throughput (records per second)
        0,   % retries
        0}.  % dead letters
-file("src/lightspeed/pipeline/telemetry.gleam", 38).
%% Packs the five measurements into a `delta` tuple, in argument order.
%% No validation or clamping happens here; values are stored as given.
?DOC(" Build one telemetry delta.\n").
-spec delta(integer(), integer(), integer(), integer(), integer()) -> delta().
delta(ProcessedRecords, LagMs, RetryCount, DeadLetterCount, ElapsedMs) ->
    {delta, ProcessedRecords, LagMs, RetryCount, DeadLetterCount, ElapsedMs}.
-file("src/lightspeed/pipeline/telemetry.gleam", 132).
%% Larger of two integers; when both compare equal the left one is returned.
-spec max(integer(), integer()) -> integer().
max(Left, Right) when Left >= Right ->
    Left;
max(_Left, Right) ->
    Right.
-file("src/lightspeed/pipeline/telemetry.gleam", 117).
%% Throughput in records per second derived from a single delta.
%%
%% - Elapsed_ms =< 0: there is no usable time window, so the processed-record
%%   count itself is returned, clamped at 0.
%% - Elapsed_ms > 0: (Processed * 1000) div Elapsed_ms, clamped at 0.
%%   `div` is integer division, so fractional rates are truncated.
%%
%% The second clause is only reached with a strictly positive Elapsed_ms, so
%% the divisor can never be 0 and needs neither a lower clamp nor a
%% zero-divisor fallback.
-spec throughput_from_delta(delta()) -> integer().
throughput_from_delta({delta, Processed, _Lag_ms, _Retries, _Dead_letters, Elapsed_ms})
  when Elapsed_ms =< 0 ->
    max(0, Processed);
throughput_from_delta({delta, Processed, _Lag_ms, _Retries, _Dead_letters, Elapsed_ms}) ->
    max(0, (Processed * 1000) div Elapsed_ms).
-file("src/lightspeed/pipeline/telemetry.gleam", 55).
%% Folds one delta into a snapshot:
%%   - lag: the larger of the snapshot's lag and the delta's lag;
%%   - throughput: recomputed from the delta alone (the snapshot's previous
%%     throughput is not consulted);
%%   - retries / dead letters: snapshot total plus delta count, clamped at 0.
?DOC(" Apply one delta and return a new snapshot.\n").
-spec apply(snapshot(), delta()) -> snapshot().
apply(Snapshot, Delta) ->
    Lag_ms = max(erlang:element(2, Snapshot), erlang:element(3, Delta)),
    Throughput = throughput_from_delta(Delta),
    Retry_sum = erlang:element(4, Snapshot) + erlang:element(4, Delta),
    Retries = max(0, Retry_sum),
    Dead_letter_sum = erlang:element(5, Snapshot) + erlang:element(5, Delta),
    Dead_letters = max(0, Dead_letter_sum),
    {snapshot, Lag_ms, Throughput, Retries, Dead_letters}.
-file("src/lightspeed/pipeline/telemetry.gleam", 68).
%% A snapshot is valid when each of its four numeric fields (tuple positions
%% 2 through 5) is non-negative. Fields are checked in order and checking
%% stops at the first negative one.
?DOC(" Snapshot invariants.\n").
-spec valid(snapshot()) -> boolean().
valid(Snapshot) ->
    Non_negative = fun(Index) -> erlang:element(Index, Snapshot) >= 0 end,
    lists:all(Non_negative, [2, 3, 4, 5]).
-file("src/lightspeed/pipeline/telemetry.gleam", 76).
%% Renders the snapshot as a single binary of the form
%%   lag_ms=N|throughput_rps=N|retries=N|dead_letters=N
%% where each N is the decimal rendering of tuple positions 2..5 in order.
?DOC(" Stable snapshot label.\n").
-spec label(snapshot()) -> binary().
label(Snapshot) ->
    Field = fun(Index) ->
        erlang:integer_to_binary(erlang:element(Index, Snapshot))
    end,
    <<"lag_ms="/utf8, (Field(2))/binary,
      "|throughput_rps="/utf8, (Field(3))/binary,
      "|retries="/utf8, (Field(4))/binary,
      "|dead_letters="/utf8, (Field(5))/binary>>.
-file("src/lightspeed/pipeline/telemetry.gleam", 125).
%% Builds the two tags attached to every metric point: `pipeline` carrying
%% the pipeline name, then `run_id` carrying the run id, in that order.
-spec metric_tags(binary(), binary()) -> list(lightspeed@ops@telemetry:tag()).
metric_tags(Pipeline_name, Run_id) ->
    Pairs = [
        {<<"pipeline"/utf8>>, Pipeline_name},
        {<<"run_id"/utf8>>, Run_id}
    ],
    [{tag, Key, Value} || {Key, Value} <- Pairs].
-file("src/lightspeed/pipeline/telemetry.gleam", 88).
%% Produces four metric points from a snapshot, in this order:
%%   gauge   lightspeed.pipeline.lag_ms                      (position 2)
%%   gauge   lightspeed.pipeline.throughput_records_per_sec  (position 3)
%%   counter lightspeed.pipeline.retry_total                 (position 4)
%%   counter lightspeed.pipeline.dead_letter_total           (position 5)
%% Every point carries the same pipeline/run_id tag list.
?DOC(" Convert one snapshot to metric points.\n").
-spec metrics(snapshot(), binary(), binary()) -> list(lightspeed@ops@telemetry:metric()).
metrics(Snapshot, Pipeline_name, Run_id) ->
    Tags = metric_tags(Pipeline_name, Run_id),
    %% {Kind, Metric name, Snapshot tuple position holding the value}
    Descriptors = [
        {gauge, <<"lightspeed.pipeline.lag_ms"/utf8>>, 2},
        {gauge, <<"lightspeed.pipeline.throughput_records_per_sec"/utf8>>, 3},
        {counter, <<"lightspeed.pipeline.retry_total"/utf8>>, 4},
        {counter, <<"lightspeed.pipeline.dead_letter_total"/utf8>>, 5}
    ],
    [{Kind, Name, erlang:element(Position, Snapshot), Tags}
     || {Kind, Name, Position} <- Descriptors].