src/lightspeed@pipeline@telemetry.erl

-module(lightspeed@pipeline@telemetry).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/telemetry.gleam").
-export([zero/0, delta/5, apply/2, valid/1, label/1, metrics/3]).
-export_type([snapshot/0, delta/0]).

%% Documentation attribute shims.
%% On OTP 27 and newer, ?MODULEDOC/1 and ?DOC/1 expand to the native
%% -moduledoc / -doc attributes. On older releases they expand to an empty
%% -compile([]) attribute, so the doc strings are simply dropped while the
%% module still compiles.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Pipeline telemetry contracts for lag/throughput/retry/dead-letter tracking.\n").

%% Accumulated telemetry state. Field order, as rendered by label/1:
%%   {snapshot, LagMs, ThroughputRps, Retries, DeadLetters}
-type snapshot() :: {snapshot, integer(), integer(), integer(), integer()}.

%% One telemetry increment. Field order, as built by delta/5:
%%   {delta, ProcessedRecords, LagMs, RetryCount, DeadLetterCount, ElapsedMs}
-type delta() :: {delta, integer(), integer(), integer(), integer(), integer()}.

-file("src/lightspeed/pipeline/telemetry.gleam", 28).
?DOC(" Build a zeroed telemetry snapshot.\n").
-spec zero() -> snapshot().
%% Starting point for accumulation: a snapshot whose four counters are all 0.
zero() -> {snapshot, 0, 0, 0, 0}.

-file("src/lightspeed/pipeline/telemetry.gleam", 38).
?DOC(" Build one telemetry delta.\n").
-spec delta(integer(), integer(), integer(), integer(), integer()) -> delta().
%% Pack the five measurements into a `delta` tuple, in argument order.
%% No validation or clamping happens here; values are stored as given.
delta(Processed, LagMs, Retries, DeadLetters, ElapsedMs) ->
    {delta, Processed, LagMs, Retries, DeadLetters, ElapsedMs}.

-file("src/lightspeed/pipeline/telemetry.gleam", 132).
-spec max(integer(), integer()) -> integer().
%% Larger of the two arguments; when they compare equal the left one is returned.
max(Left, Right) when Left >= Right ->
    Left;
max(_Left, Right) ->
    Right.

-file("src/lightspeed/pipeline/telemetry.gleam", 117).
-spec throughput_from_delta(delta()) -> integer().
%% Records-per-second figure derived from a single delta.
%%
%% * Elapsed time =< 0: no rate can be computed, so the processed-record
%%   count itself is returned.
%% * Otherwise: processed records scaled from milliseconds to seconds with
%%   integer (truncating) division.
%%
%% In both cases the result is clamped so it is never negative.
throughput_from_delta({delta, Processed, _Lag, _Retries, _DeadLetters, Elapsed})
  when Elapsed =< 0 ->
    max(0, Processed);
throughput_from_delta({delta, Processed, _Lag, _Retries, _DeadLetters, Elapsed}) ->
    %% Elapsed is strictly positive in this clause, so the division is safe
    %% without any extra clamp or zero-denominator check.
    max(0, (Processed * 1000) div Elapsed).

-file("src/lightspeed/pipeline/telemetry.gleam", 55).
?DOC(" Apply one delta and return a new snapshot.\n").
-spec apply(snapshot(), delta()) -> snapshot().
%% Fold one delta into a snapshot and return the resulting snapshot:
%%   * lag          - the larger of the snapshot's lag and the delta's lag
%%   * throughput   - recomputed from this delta alone; the previous value
%%                    is not carried over
%%   * retries      - running sum, clamped at 0
%%   * dead letters - running sum, clamped at 0
apply({snapshot, PrevLag, _PrevThroughput, PrevRetries, PrevDeadLetters},
      {delta, _Processed, DeltaLag, DeltaRetries, DeltaDeadLetters, _Elapsed} = Delta) ->
    Lag = max(PrevLag, DeltaLag),
    Throughput = throughput_from_delta(Delta),
    Retries = max(0, PrevRetries + DeltaRetries),
    DeadLetters = max(0, PrevDeadLetters + DeltaDeadLetters),
    {snapshot, Lag, Throughput, Retries, DeadLetters}.

-file("src/lightspeed/pipeline/telemetry.gleam", 68).
?DOC(" Snapshot invariants.\n").
-spec valid(snapshot()) -> boolean().
%% A snapshot is valid when none of its four counters is negative.
valid({snapshot, Lag, Throughput, Retries, DeadLetters})
  when Lag >= 0, Throughput >= 0, Retries >= 0, DeadLetters >= 0 ->
    true;
valid({snapshot, _Lag, _Throughput, _Retries, _DeadLetters}) ->
    false.

-file("src/lightspeed/pipeline/telemetry.gleam", 76).
?DOC(" Stable snapshot label.\n").
-spec label(snapshot()) -> binary().
%% Render a snapshot as one `key=value` binary with `|` between the pairs,
%% in the fixed order lag_ms, throughput_rps, retries, dead_letters.
label({snapshot, Lag, Throughput, Retries, DeadLetters}) ->
    LagBin = erlang:integer_to_binary(Lag),
    ThroughputBin = erlang:integer_to_binary(Throughput),
    RetriesBin = erlang:integer_to_binary(Retries),
    DeadLettersBin = erlang:integer_to_binary(DeadLetters),
    <<"lag_ms="/utf8, LagBin/binary,
      "|throughput_rps="/utf8, ThroughputBin/binary,
      "|retries="/utf8, RetriesBin/binary,
      "|dead_letters="/utf8, DeadLettersBin/binary>>.

-file("src/lightspeed/pipeline/telemetry.gleam", 125).
-spec metric_tags(binary(), binary()) -> list(lightspeed@ops@telemetry:tag()).
%% The two tags attached to every pipeline metric: the pipeline name first,
%% then the run id.
metric_tags(Pipeline_name, Run_id) ->
    PipelineTag = {tag, <<"pipeline"/utf8>>, Pipeline_name},
    RunTag = {tag, <<"run_id"/utf8>>, Run_id},
    [PipelineTag, RunTag].

-file("src/lightspeed/pipeline/telemetry.gleam", 88).
?DOC(" Convert one snapshot to metric points.\n").
-spec metrics(snapshot(), binary(), binary()) -> list(lightspeed@ops@telemetry:metric()).
%% Turn a snapshot into four metric points, in this order: lag and throughput
%% as gauges, then retries and dead letters as counters. Every point carries
%% the same pipeline/run_id tag list.
metrics({snapshot, Lag, Throughput, Retries, DeadLetters}, Pipeline_name, Run_id) ->
    %% The tag list depends only on the two names, so build it once.
    Tags = metric_tags(Pipeline_name, Run_id),
    [{gauge, <<"lightspeed.pipeline.lag_ms"/utf8>>, Lag, Tags},
     {gauge, <<"lightspeed.pipeline.throughput_records_per_sec"/utf8>>, Throughput, Tags},
     {counter, <<"lightspeed.pipeline.retry_total"/utf8>>, Retries, Tags},
     {counter, <<"lightspeed.pipeline.dead_letter_total"/utf8>>, DeadLetters, Tags}].