src/lightspeed@pipeline@slo.erl

-module(lightspeed@pipeline@slo).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/slo.gleam").
-export([budget/5, observation/6, budget_version_label/0, default_budget/0, valid_budget/1, valid_observation/1, valid_budgets/1, retry_rate_percent/1, evaluate/2, budget_failures/1, budget_result_label/1, observation_label/1]).
-export_type([budget/0, observation/0, budget_result/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" ETL SLO budget contracts for lag/throughput/retry/replay recovery.\n").

-type budget() :: {budget, binary(), integer(), integer(), integer(), integer()}.

-type observation() :: {observation,
        binary(),
        integer(),
        integer(),
        integer(),
        integer(),
        integer()}.

-type budget_result() :: {budget_result, binary(), boolean(), binary()}.

-file("src/lightspeed/pipeline/slo.gleam", 38).
?DOC(" Build one SLO budget profile.\n").
-spec budget(binary(), integer(), integer(), integer(), integer()) -> budget().
%% Packs the arguments, in the order given, into a `budget`-tagged 6-tuple.
%% Nothing is validated here; valid_budget/1 performs the range checks.
budget(Profile, Max_lag_ms, Min_throughput_records_per_sec,
       Max_retry_rate_percent, Max_replay_recovery_ms) ->
    %% Tuple positions: 2 = profile, 3 = max lag, 4 = min throughput,
    %% 5 = max retry rate, 6 = max replay recovery.
    {budget, Profile, Max_lag_ms, Min_throughput_records_per_sec,
     Max_retry_rate_percent, Max_replay_recovery_ms}.

-file("src/lightspeed/pipeline/slo.gleam", 55).
?DOC(" Build one ETL observation.\n").
-spec observation(
    binary(),
    integer(),
    integer(),
    integer(),
    integer(),
    integer()
) -> observation().
%% Packs the arguments, in the order given, into an `observation`-tagged
%% 7-tuple. Nothing is validated here; valid_observation/1 performs the checks.
observation(Profile, Lag_ms, Throughput_records_per_sec,
            Retry_count, Processed_records, Replay_recovery_ms) ->
    %% Tuple positions: 2 = profile, 3 = lag, 4 = throughput,
    %% 5 = retry count, 6 = processed records, 7 = replay recovery.
    {observation, Profile, Lag_ms, Throughput_records_per_sec,
     Retry_count, Processed_records, Replay_recovery_ms}.

-file("src/lightspeed/pipeline/slo.gleam", 74).
?DOC(" Versioned M33 budget profile label.\n").
-spec budget_version_label() -> binary().
%% Returns the fixed prefix "m33.budget.v" followed by the version number 1.
budget_version_label() ->
    Version = erlang:integer_to_binary(1),
    <<"m33.budget.v"/utf8, Version/binary>>.

-file("src/lightspeed/pipeline/slo.gleam", 79).
?DOC(" Default M33 ETL SLO budget profile list.\n").
-spec default_budget() -> list(budget()).
%% Returns the two built-in profiles, hourly first and replay second.
%% Field order in each tuple: profile, max lag, min throughput,
%% max retry rate, max replay recovery.
default_budget() ->
    Hourly = {budget, <<"orders_hourly"/utf8>>, 200, 350, 5, 3000},
    Replay = {budget, <<"orders_replay"/utf8>>, 320, 220, 8, 4500},
    [Hourly, Replay].

-file("src/lightspeed/pipeline/slo.gleam", 99).
?DOC(" Validate one budget profile.\n").
-spec valid_budget(budget()) -> boolean().
%% A budget is valid when its profile name is non-empty, element 4 (minimum
%% throughput) is strictly positive, and elements 3, 5 and 6 are non-negative.
valid_budget(Budget) ->
    %% Fields are read lazily so later positions are only touched when every
    %% earlier check has already passed.
    Field = fun(Index) -> erlang:element(Index, Budget) end,
    Field(2) /= <<""/utf8>>
        andalso Field(3) >= 0
        andalso Field(4) > 0
        andalso Field(5) >= 0
        andalso Field(6) >= 0.

-file("src/lightspeed/pipeline/slo.gleam", 108).
?DOC(" Validate one observation.\n").
-spec valid_observation(observation()) -> boolean().
%% An observation is valid when its profile name is non-empty and every
%% numeric field (tuple positions 3 through 7) is non-negative.
valid_observation(Observation) ->
    Non_negative = fun(Index) -> erlang:element(Index, Observation) >= 0 end,
    %% lists:all/2 walks the positions left to right and stops at the first
    %% failing one, matching a short-circuiting `andalso` chain.
    erlang:element(2, Observation) /= <<""/utf8>>
        andalso lists:all(Non_negative, [3, 4, 5, 6, 7]).

-file("src/lightspeed/pipeline/slo.gleam", 285).
-spec contains_profile(list(binary()), binary()) -> boolean().
%% Returns true when `Profile` is exactly equal to some element of `Profiles`,
%% false otherwise (including for the empty list).
%%
%% Delegates to lists:member/2, which performs the same exact-match scan as
%% the previous hand-written recursion.
contains_profile(Profiles, Profile) ->
    lists:member(Profile, Profiles).

-file("src/lightspeed/pipeline/slo.gleam", 274).
-spec budget_profiles_unique(list(budget()), list(binary())) -> boolean().
%% Returns true when no profile name (tuple position 2) in `Budgets` repeats
%% and none of them already appears in `Seen`. `Seen` accumulates the names
%% visited so far; valid_budgets/1 starts it empty.
budget_profiles_unique([], _Seen) ->
    true;
budget_profiles_unique([Budget | Remaining], Seen) ->
    Profile = erlang:element(2, Budget),
    case contains_profile(Seen, Profile) of
        false ->
            budget_profiles_unique(Remaining, [Profile | Seen]);

        true ->
            %% Duplicate found: stop without inspecting the rest.
            false
    end.

-file("src/lightspeed/pipeline/slo.gleam", 267).
-spec budgets_valid(list(budget())) -> boolean().
%% True when every budget in the list passes valid_budget/1; vacuously true
%% for the empty list. Checking stops at the first invalid budget.
budgets_valid(Budgets) ->
    lists:all(fun valid_budget/1, Budgets).

-file("src/lightspeed/pipeline/slo.gleam", 118).
?DOC(" Validate a budget list and enforce unique profile names.\n").
-spec valid_budgets(list(budget())) -> boolean().
%% Every budget must be individually valid; only then is the uniqueness of
%% profile names checked.
valid_budgets(Budgets) ->
    case budgets_valid(Budgets) of
        true ->
            budget_profiles_unique(Budgets, []);

        false ->
            false
    end.

-file("src/lightspeed/pipeline/slo.gleam", 317).
-spec join_with(binary(), list(binary())) -> binary().
%% Joins `Values` into one binary with `Separator` between adjacent elements.
%% An empty list yields the empty binary and a single-element list yields
%% that element with no separator.
%%
%% The separator is interleaved as an iolist and flattened once, so the
%% output is copied a single time instead of once per list element.
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).

-file("src/lightspeed/pipeline/slo.gleam", 256).
-spec append_if_false(list(binary()), boolean(), binary()) -> list(binary()).
%% Returns `Items` unchanged when `Condition` is true; otherwise returns
%% `Items` with `Value` added at the end.
append_if_false(Items, true, _Value) ->
    Items;
append_if_false(Items, false, Value) ->
    Items ++ [Value].

-file("src/lightspeed/pipeline/slo.gleam", 243).
-spec failing_checks(boolean(), boolean(), boolean(), boolean()) -> list(binary()).
%% Builds the list of labels for every check whose flag is false, in the
%% fixed order lag, throughput, retry rate, replay recovery. Returns [] when
%% all four flags are true.
failing_checks(Lag_ok, Throughput_ok, Retry_rate_ok, Replay_recovery_ok) ->
    Checks = [
        {Lag_ok, <<"lag_exceeded"/utf8>>},
        {Throughput_ok, <<"throughput_below"/utf8>>},
        {Retry_rate_ok, <<"retry_rate_exceeded"/utf8>>},
        {Replay_recovery_ok, <<"replay_recovery_exceeded"/utf8>>}
    ],
    lists:foldl(
        fun({Check_ok, Label}, Failures) ->
            append_if_false(Failures, Check_ok, Label)
        end,
        [],
        Checks
    ).

-file("src/lightspeed/pipeline/slo.gleam", 325).
-spec max(integer(), integer()) -> integer().
%% Returns the larger of the two arguments; `Left` wins when they compare
%% equal.
max(Left, Right) when Left >= Right ->
    Left;
max(_Left, Right) ->
    Right.

-file("src/lightspeed/pipeline/slo.gleam", 161).
?DOC(" Retry-rate percentage derived from retry count and processed records.\n").
-spec retry_rate_percent(observation()) -> integer().
%% Computes `retry count * 100 div processed records` from tuple positions 5
%% and 6 of the observation, using truncating integer division.
%%
%% Special cases:
%%   - processed records =< 0 and retry count exactly 0 -> 0
%%   - processed records =< 0 and any other retry count -> 100
%%   - a negative quotient is clamped to 0
%%
%% The result is not capped at 100: more retries than processed records
%% yields a value above 100.
retry_rate_percent(Observation) ->
    Retry_count = erlang:element(5, Observation),
    Processed_records = erlang:element(6, Observation),
    case {Processed_records =< 0, Retry_count} of
        {true, 0} ->
            0;

        {true, _} ->
            100;

        {false, _} ->
            %% Processed_records is strictly positive in this clause, so the
            %% division needs no zero-denominator guard or lower clamp.
            max(0, (Retry_count * 100) div Processed_records)
    end.

-file("src/lightspeed/pipeline/slo.gleam", 296).
-spec find_observation(list(observation()), binary()) -> gleam@option:option(observation()).
%% Returns `{some, Observation}` for the first observation whose profile
%% (tuple position 2) is exactly equal to `Profile`, or `none` when the list
%% holds no such observation.
find_observation([], _Profile) ->
    none;
find_observation([Candidate | Others], Profile) ->
    %% `Profile` is already bound, so the first clause is an exact-equality test.
    case erlang:element(2, Candidate) of
        Profile ->
            {some, Candidate};

        _Other ->
            find_observation(Others, Profile)
    end.

-file("src/lightspeed/pipeline/slo.gleam", 192).
-spec evaluate_one(list(observation()), budget()) -> budget_result().
%% Evaluates one budget against the first observation sharing its profile.
%%
%% The result carries the budget's profile, a pass flag and a reason:
%%   - "missing_observation" when no observation has the budget's profile;
%%   - "invalid_input" when the budget or the matched observation fails
%%     validation;
%%   - "within_budget" when all four checks pass;
%%   - otherwise the labels of the failing checks joined with "|".
evaluate_one(Observations, Budget) ->
    Profile = erlang:element(2, Budget),
    case find_observation(Observations, Profile) of
        none ->
            {budget_result, Profile, false, <<"missing_observation"/utf8>>};

        {some, Observation} ->
            Inputs_ok = valid_budget(Budget) andalso valid_observation(Observation),
            case Inputs_ok of
                false ->
                    {budget_result, Profile, false, <<"invalid_input"/utf8>>};

                true ->
                    %% Lag, retry rate and replay recovery are upper bounds;
                    %% throughput is a lower bound.
                    Lag_ok =
                        erlang:element(3, Observation) =< erlang:element(3, Budget),
                    Throughput_ok =
                        erlang:element(4, Observation) >= erlang:element(4, Budget),
                    Retry_rate_ok =
                        retry_rate_percent(Observation) =< erlang:element(5, Budget),
                    Replay_recovery_ok =
                        erlang:element(7, Observation) =< erlang:element(6, Budget),
                    %% failing_checks/4 returns [] exactly when all four flags
                    %% are true, so an empty list means the budget passed.
                    case failing_checks(
                        Lag_ok,
                        Throughput_ok,
                        Retry_rate_ok,
                        Replay_recovery_ok
                    ) of
                        [] ->
                            {budget_result, Profile, true, <<"within_budget"/utf8>>};

                        Failures ->
                            {budget_result,
                                Profile,
                                false,
                                join_with(<<"|"/utf8>>, Failures)}
                    end
            end
    end.

-file("src/lightspeed/pipeline/slo.gleam", 177).
-spec evaluate_loop(list(observation()), list(budget()), list(budget_result())) -> list(budget_result()).
%% Evaluates each budget in turn against `Observations`. Results are
%% prepended to `Results_rev` and reversed once at the end, so the returned
%% list follows the order of `Budgets`.
evaluate_loop(_Observations, [], Results_rev) ->
    lists:reverse(Results_rev);
evaluate_loop(Observations, [Budget | Remaining], Results_rev) ->
    Result = evaluate_one(Observations, Budget),
    evaluate_loop(Observations, Remaining, [Result | Results_rev]).

-file("src/lightspeed/pipeline/slo.gleam", 123).
?DOC(" Evaluate one observation set against one budget list.\n").
-spec evaluate(list(observation()), list(budget())) -> list(budget_result()).
%% Produces one result per budget, in the same order as `Budgets`.
evaluate(Observations, Budgets) ->
    No_results_yet = [],
    evaluate_loop(Observations, Budgets, No_results_yet).

-file("src/lightspeed/pipeline/slo.gleam", 131).
?DOC(" Count budget failures.\n").
-spec budget_failures(list(budget_result())) -> integer().
%% Counts the results whose pass flag (tuple position 3) is false.
budget_failures(Results) ->
    Count_failure = fun(Result, Failed_so_far) ->
        case erlang:element(3, Result) of
            true ->
                Failed_so_far;

            false ->
                Failed_so_far + 1
        end
    end,
    lists:foldl(Count_failure, 0, Results).

-file("src/lightspeed/pipeline/slo.gleam", 310).
-spec bool_label(boolean()) -> binary().
%% Maps true to "pass" and false to "fail".
bool_label(true) ->
    <<"pass"/utf8>>;
bool_label(false) ->
    <<"fail"/utf8>>.

-file("src/lightspeed/pipeline/slo.gleam", 143).
?DOC(" Stable budget-result label.\n").
-spec budget_result_label(budget_result()) -> binary().
%% Formats a result as "<profile>:<pass|fail>:<reason>", taking the three
%% parts from tuple positions 2, 3 and 4.
budget_result_label(Result) ->
    Profile = erlang:element(2, Result),
    Status = bool_label(erlang:element(3, Result)),
    Reason = erlang:element(4, Result),
    <<Profile/binary, ":"/utf8, Status/binary, ":"/utf8, Reason/binary>>.

-file("src/lightspeed/pipeline/slo.gleam", 148).
?DOC(" Stable observation label.\n").
-spec observation_label(observation()) -> binary().
%% Formats an observation as
%% "<profile>:lag_ms=<n>:throughput_rps=<n>:retry_rate_percent=<n>:replay_recovery_ms=<n>".
%% The retry rate is the derived percentage from retry_rate_percent/1; the
%% raw retry and processed counts are not printed.
observation_label(Observation) ->
    Profile = erlang:element(2, Observation),
    Lag = erlang:integer_to_binary(erlang:element(3, Observation)),
    Throughput = erlang:integer_to_binary(erlang:element(4, Observation)),
    Retry_rate = erlang:integer_to_binary(retry_rate_percent(Observation)),
    Replay_recovery = erlang:integer_to_binary(erlang:element(7, Observation)),
    <<Profile/binary,
        ":lag_ms="/utf8, Lag/binary,
        ":throughput_rps="/utf8, Throughput/binary,
        ":retry_rate_percent="/utf8, Retry_rate/binary,
        ":replay_recovery_ms="/utf8, Replay_recovery/binary>>.