-module(lightspeed@pipeline@slo).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/slo.gleam").
-export([budget/5, observation/6, budget_version_label/0, default_budget/0, valid_budget/1, valid_observation/1, valid_budgets/1, retry_rate_percent/1, evaluate/2, budget_failures/1, budget_result_label/1, observation_label/1]).
-export_type([budget/0, observation/0, budget_result/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" ETL SLO budget contracts for lag/throughput/retry/replay recovery.\n").
-type budget() :: {budget, binary(), integer(), integer(), integer(), integer()}.
-type observation() :: {observation,
binary(),
integer(),
integer(),
integer(),
integer(),
integer()}.
-type budget_result() :: {budget_result, binary(), boolean(), binary()}.
-file("src/lightspeed/pipeline/slo.gleam", 38).
?DOC(" Build one SLO budget profile.\n").
-spec budget(binary(), integer(), integer(), integer(), integer()) -> budget().
budget(
Profile,
Max_lag_ms,
Min_throughput_records_per_sec,
Max_retry_rate_percent,
Max_replay_recovery_ms
) ->
{budget,
Profile,
Max_lag_ms,
Min_throughput_records_per_sec,
Max_retry_rate_percent,
Max_replay_recovery_ms}.
-file("src/lightspeed/pipeline/slo.gleam", 55).
?DOC(" Build one ETL observation.\n").
-spec observation(
binary(),
integer(),
integer(),
integer(),
integer(),
integer()
) -> observation().
observation(
Profile,
Lag_ms,
Throughput_records_per_sec,
Retry_count,
Processed_records,
Replay_recovery_ms
) ->
{observation,
Profile,
Lag_ms,
Throughput_records_per_sec,
Retry_count,
Processed_records,
Replay_recovery_ms}.
-file("src/lightspeed/pipeline/slo.gleam", 74).
?DOC(" Versioned M33 budget profile label.\n").
-spec budget_version_label() -> binary().
budget_version_label() ->
<<"m33.budget.v"/utf8, (erlang:integer_to_binary(1))/binary>>.
-file("src/lightspeed/pipeline/slo.gleam", 79).
?DOC(" Default M33 ETL SLO budget profile list.\n").
-spec default_budget() -> list(budget()).
default_budget() ->
[{budget, <<"orders_hourly"/utf8>>, 200, 350, 5, 3000},
{budget, <<"orders_replay"/utf8>>, 320, 220, 8, 4500}].
-file("src/lightspeed/pipeline/slo.gleam", 99).
?DOC(" Validate one budget profile.\n").
-spec valid_budget(budget()) -> boolean().
valid_budget(Budget) ->
((((erlang:element(2, Budget) /= <<""/utf8>>) andalso (erlang:element(
3,
Budget
)
>= 0))
andalso (erlang:element(4, Budget) > 0))
andalso (erlang:element(5, Budget) >= 0))
andalso (erlang:element(6, Budget) >= 0).
-file("src/lightspeed/pipeline/slo.gleam", 108).
?DOC(" Validate one observation.\n").
-spec valid_observation(observation()) -> boolean().
valid_observation(Observation) ->
(((((erlang:element(2, Observation) /= <<""/utf8>>) andalso (erlang:element(
3,
Observation
)
>= 0))
andalso (erlang:element(4, Observation) >= 0))
andalso (erlang:element(5, Observation) >= 0))
andalso (erlang:element(6, Observation) >= 0))
andalso (erlang:element(7, Observation) >= 0).
-file("src/lightspeed/pipeline/slo.gleam", 285).
-spec contains_profile(list(binary()), binary()) -> boolean().
contains_profile(Profiles, Profile) ->
case Profiles of
[] ->
false;
[Entry | Rest] ->
case Entry =:= Profile of
true ->
true;
false ->
contains_profile(Rest, Profile)
end
end.
-file("src/lightspeed/pipeline/slo.gleam", 274).
-spec budget_profiles_unique(list(budget()), list(binary())) -> boolean().
budget_profiles_unique(Budgets, Seen) ->
case Budgets of
[] ->
true;
[Entry | Rest] ->
case contains_profile(Seen, erlang:element(2, Entry)) of
true ->
false;
false ->
budget_profiles_unique(
Rest,
[erlang:element(2, Entry) | Seen]
)
end
end.
-file("src/lightspeed/pipeline/slo.gleam", 267).
-spec budgets_valid(list(budget())) -> boolean().
budgets_valid(Budgets) ->
case Budgets of
[] ->
true;
[Entry | Rest] ->
valid_budget(Entry) andalso budgets_valid(Rest)
end.
-file("src/lightspeed/pipeline/slo.gleam", 118).
?DOC(" Validate a budget list and enforce unique profile names.\n").
-spec valid_budgets(list(budget())) -> boolean().
valid_budgets(Budgets) ->
budgets_valid(Budgets) andalso budget_profiles_unique(Budgets, []).
-file("src/lightspeed/pipeline/slo.gleam", 317).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
case Values of
[] ->
<<""/utf8>>;
[Value] ->
Value;
[Value@1 | Rest] ->
<<<<Value@1/binary, Separator/binary>>/binary,
(join_with(Separator, Rest))/binary>>
end.
-file("src/lightspeed/pipeline/slo.gleam", 256).
-spec append_if_false(list(binary()), boolean(), binary()) -> list(binary()).
append_if_false(Items, Condition, Value) ->
case Condition of
true ->
Items;
false ->
lists:append(Items, [Value])
end.
-file("src/lightspeed/pipeline/slo.gleam", 243).
-spec failing_checks(boolean(), boolean(), boolean(), boolean()) -> list(binary()).
failing_checks(Lag_ok, Throughput_ok, Retry_rate_ok, Replay_recovery_ok) ->
_pipe = [],
_pipe@1 = append_if_false(_pipe, Lag_ok, <<"lag_exceeded"/utf8>>),
_pipe@2 = append_if_false(
_pipe@1,
Throughput_ok,
<<"throughput_below"/utf8>>
),
_pipe@3 = append_if_false(
_pipe@2,
Retry_rate_ok,
<<"retry_rate_exceeded"/utf8>>
),
append_if_false(
_pipe@3,
Replay_recovery_ok,
<<"replay_recovery_exceeded"/utf8>>
).
-file("src/lightspeed/pipeline/slo.gleam", 325).
-spec max(integer(), integer()) -> integer().
max(Left, Right) ->
case Left >= Right of
true ->
Left;
false ->
Right
end.
-file("src/lightspeed/pipeline/slo.gleam", 161).
?DOC(" Retry-rate percentage derived from retry count and processed records.\n").
-spec retry_rate_percent(observation()) -> integer().
retry_rate_percent(Observation) ->
case erlang:element(6, Observation) =< 0 of
true ->
case erlang:element(5, Observation) =:= 0 of
true ->
0;
false ->
100
end;
false ->
max(0, case max(1, erlang:element(6, Observation)) of
0 -> 0;
Gleam@denominator -> (erlang:element(5, Observation) * 100)
div Gleam@denominator
end)
end.
-file("src/lightspeed/pipeline/slo.gleam", 296).
-spec find_observation(list(observation()), binary()) -> gleam@option:option(observation()).
find_observation(Observations, Profile) ->
case Observations of
[] ->
none;
[Entry | Rest] ->
case erlang:element(2, Entry) =:= Profile of
true ->
{some, Entry};
false ->
find_observation(Rest, Profile)
end
end.
-file("src/lightspeed/pipeline/slo.gleam", 192).
-spec evaluate_one(list(observation()), budget()) -> budget_result().
evaluate_one(Observations, Budget) ->
case find_observation(Observations, erlang:element(2, Budget)) of
none ->
{budget_result,
erlang:element(2, Budget),
false,
<<"missing_observation"/utf8>>};
{some, Observation} ->
case valid_budget(Budget) andalso valid_observation(Observation) of
false ->
{budget_result,
erlang:element(2, Budget),
false,
<<"invalid_input"/utf8>>};
true ->
Lag_ok = erlang:element(3, Observation) =< erlang:element(
3,
Budget
),
Throughput_ok = erlang:element(4, Observation) >= erlang:element(
4,
Budget
),
Retry_rate_ok = retry_rate_percent(Observation) =< erlang:element(
5,
Budget
),
Replay_recovery_ok = erlang:element(7, Observation) =< erlang:element(
6,
Budget
),
Passed = ((Lag_ok andalso Throughput_ok) andalso Retry_rate_ok)
andalso Replay_recovery_ok,
Failures = failing_checks(
Lag_ok,
Throughput_ok,
Retry_rate_ok,
Replay_recovery_ok
),
{budget_result,
erlang:element(2, Budget),
Passed,
case Passed of
true ->
<<"within_budget"/utf8>>;
false ->
join_with(<<"|"/utf8>>, Failures)
end}
end
end.
-file("src/lightspeed/pipeline/slo.gleam", 177).
-spec evaluate_loop(list(observation()), list(budget()), list(budget_result())) -> list(budget_result()).
evaluate_loop(Observations, Budgets, Results_rev) ->
case Budgets of
[] ->
lists:reverse(Results_rev);
[Entry | Rest] ->
evaluate_loop(
Observations,
Rest,
[evaluate_one(Observations, Entry) | Results_rev]
)
end.
-file("src/lightspeed/pipeline/slo.gleam", 123).
?DOC(" Evaluate one observation set against one budget list.\n").
-spec evaluate(list(observation()), list(budget())) -> list(budget_result()).
evaluate(Observations, Budgets) ->
evaluate_loop(Observations, Budgets, []).
-file("src/lightspeed/pipeline/slo.gleam", 131).
?DOC(" Count budget failures.\n").
-spec budget_failures(list(budget_result())) -> integer().
budget_failures(Results) ->
case Results of
[] ->
0;
[Result | Rest] ->
case erlang:element(3, Result) of
true ->
budget_failures(Rest);
false ->
1 + budget_failures(Rest)
end
end.
-file("src/lightspeed/pipeline/slo.gleam", 310).
-spec bool_label(boolean()) -> binary().
bool_label(Value) ->
case Value of
true ->
<<"pass"/utf8>>;
false ->
<<"fail"/utf8>>
end.
-file("src/lightspeed/pipeline/slo.gleam", 143).
?DOC(" Stable budget-result label.\n").
-spec budget_result_label(budget_result()) -> binary().
budget_result_label(Result) ->
<<<<<<<<(erlang:element(2, Result))/binary, ":"/utf8>>/binary,
(bool_label(erlang:element(3, Result)))/binary>>/binary,
":"/utf8>>/binary,
(erlang:element(4, Result))/binary>>.
-file("src/lightspeed/pipeline/slo.gleam", 148).
?DOC(" Stable observation label.\n").
-spec observation_label(observation()) -> binary().
observation_label(Observation) ->
<<<<<<<<<<<<<<<<(erlang:element(2, Observation))/binary, ":lag_ms="/utf8>>/binary,
(erlang:integer_to_binary(
erlang:element(3, Observation)
))/binary>>/binary,
":throughput_rps="/utf8>>/binary,
(erlang:integer_to_binary(
erlang:element(4, Observation)
))/binary>>/binary,
":retry_rate_percent="/utf8>>/binary,
(erlang:integer_to_binary(retry_rate_percent(Observation)))/binary>>/binary,
":replay_recovery_ms="/utf8>>/binary,
(erlang:integer_to_binary(erlang:element(7, Observation)))/binary>>.