-module(lightspeed@ops@etl_reliability_harness).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/ops/etl_reliability_harness.gleam").
-export([budget_report_signature/1, run_budget_report/0, run_scenario/1, run_matrix/0, scenario_label/1, pass_fail_label/1, signature/1, scenario/1, deterministic/1, outcomes/1, failed_scenarios/1, nondeterministic_failures/1, report_signature/1, snapshot_signature/0, snapshot_report_markdown/0, budget_observations/1, budget_results/1, budget_failed_budgets/1, budget_snapshot_signature/0, budget_report_markdown/0]).
-export_type([scenario/0, scenario_outcome/0, report/0, budget_report/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Deterministic ETL reliability and SLO harness for M33.\n").
-type scenario() :: connector_outage_recovery_path |
poison_message_replay_path |
schema_evolution_compatibility_path |
replay_idempotency_path |
slo_budget_compliance_path.
-type scenario_outcome() :: {scenario_outcome,
scenario(),
boolean(),
boolean(),
binary()}.
-type report() :: {report, list(scenario_outcome()), integer(), integer()}.
-type budget_report() :: {budget_report,
list(lightspeed@pipeline@slo:observation()),
list(lightspeed@pipeline@slo:budget_result()),
integer()}.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 661).
-spec count_nondeterministic(list(scenario_outcome())) -> integer().
count_nondeterministic(Outcomes) ->
case Outcomes of
[] ->
0;
[Outcome | Rest] ->
case erlang:element(4, Outcome) of
true ->
count_nondeterministic(Rest);
false ->
1 + count_nondeterministic(Rest)
end
end.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 650).
-spec count_failed(list(scenario_outcome())) -> integer().
count_failed(Outcomes) ->
case Outcomes of
[] ->
0;
[Outcome | Rest] ->
case erlang:element(3, Outcome) of
true ->
count_failed(Rest);
false ->
1 + count_failed(Rest)
end
end.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 679).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
case Values of
[] ->
<<""/utf8>>;
[Value] ->
Value;
[Value@1 | Rest] ->
<<<<Value@1/binary, Separator/binary>>/binary,
(join_with(Separator, Rest))/binary>>
end.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 223).
?DOC(" Stable budget report signature.\n").
-spec budget_report_signature(budget_report()) -> binary().
budget_report_signature(Report) ->
Observation_entries = gleam@list:map(
erlang:element(2, Report),
fun lightspeed@pipeline@slo:observation_label/1
),
Result_entries = gleam@list:map(
erlang:element(3, Report),
fun lightspeed@pipeline@slo:budget_result_label/1
),
<<<<<<<<<<"observations="/utf8,
(join_with(<<";"/utf8>>, Observation_entries))/binary>>/binary,
"|results="/utf8>>/binary,
(join_with(<<";"/utf8>>, Result_entries))/binary>>/binary,
"|failures="/utf8>>/binary,
(erlang:integer_to_binary(erlang:element(4, Report)))/binary>>.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 597).
-spec default_slo_observations() -> list(lightspeed@pipeline@slo:observation()).
default_slo_observations() ->
[lightspeed@pipeline@slo:observation(
<<"orders_hourly"/utf8>>,
140,
420,
6,
200,
2200
),
lightspeed@pipeline@slo:observation(
<<"orders_replay"/utf8>>,
280,
280,
9,
180,
3900
)].
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 196).
?DOC(" Run default M33 budget evaluation report.\n").
-spec run_budget_report() -> budget_report().
run_budget_report() ->
Observations = default_slo_observations(),
Results = lightspeed@pipeline@slo:evaluate(
Observations,
lightspeed@pipeline@slo:default_budget()
),
{budget_report,
Observations,
Results,
lightspeed@pipeline@slo:budget_failures(Results)}.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 573).
-spec evaluate_slo_budget_compliance_path() -> {boolean(), binary()}.
evaluate_slo_budget_compliance_path() ->
Report = run_budget_report(),
Strict_results = lightspeed@pipeline@slo:evaluate(
default_slo_observations(),
[lightspeed@pipeline@slo:budget(
<<"orders_hourly"/utf8>>,
90,
500,
2,
2000
)]
),
Strict_failures = lightspeed@pipeline@slo:budget_failures(Strict_results),
Strict_labels = gleam@list:map(
Strict_results,
fun lightspeed@pipeline@slo:budget_result_label/1
),
Report_signature = budget_report_signature(Report),
Passed = ((erlang:element(4, Report) =:= 0) andalso (Strict_failures =:= 1))
andalso gleam_stdlib:contains_string(
join_with(<<";"/utf8>>, Strict_labels),
<<"lag_exceeded"/utf8>>
),
{Passed,
<<<<<<"budget_signature="/utf8, Report_signature/binary>>/binary,
"|strict="/utf8>>/binary,
(join_with(<<";"/utf8>>, Strict_labels))/binary>>}.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 604).
-spec fresh_pipeline_runtime() -> lightspeed@pipeline:runtime().
fresh_pipeline_runtime() ->
_pipe = lightspeed@pipeline:pipeline(
<<"orders_pipeline"/utf8>>,
{interval, 60000},
[lightspeed@pipeline:source_stage(
<<"extract_orders"/utf8>>,
<<"raw_order"/utf8>>
),
lightspeed@pipeline:transform_stage(
<<"normalize_orders"/utf8>>,
<<"raw_order"/utf8>>,
<<"normalized_order"/utf8>>
),
lightspeed@pipeline:sink_stage(
<<"write_orders"/utf8>>,
<<"normalized_order"/utf8>>
)]
),
lightspeed@pipeline:new(_pipe).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 522).
-spec evaluate_replay_idempotency_path() -> {boolean(), binary()}.
evaluate_replay_idempotency_path() ->
Runtime = begin
_pipe = fresh_pipeline_runtime(),
lightspeed@pipeline:start(_pipe, 300)
end,
{Runtime@1, _} = lightspeed@pipeline:process(
Runtime,
<<"extract_orders"/utf8>>,
5,
55,
<<"source-3"/utf8>>,
301
),
{Runtime@2, _} = lightspeed@pipeline:process(
Runtime@1,
<<"normalize_orders"/utf8>>,
5,
35,
<<"transform-3"/utf8>>,
302
),
{Runtime@3, First_sink} = lightspeed@pipeline:process(
Runtime@2,
<<"write_orders"/utf8>>,
5,
20,
<<"sink-idempotent-1"/utf8>>,
303
),
Runtime@4 = lightspeed@pipeline:crash(Runtime@3, <<"worker_restart"/utf8>>),
Resumed = case lightspeed@pipeline:resume_from_latest_checkpoint(
Runtime@4,
310
) of
{ok, Next} ->
Next;
{error, _} ->
Runtime@4
end,
{Resumed@1, Duplicate_sink} = lightspeed@pipeline:process(
Resumed,
<<"write_orders"/utf8>>,
5,
18,
<<"sink-idempotent-1"/utf8>>,
311
),
{Resumed@2, Second_sink} = lightspeed@pipeline:process(
Resumed@1,
<<"write_orders"/utf8>>,
5,
16,
<<"sink-idempotent-2"/utf8>>,
312
),
Resumed@3 = lightspeed@pipeline:complete(Resumed@2, 320),
Lifecycle_label = lightspeed@pipeline:lifecycle_label(
lightspeed@pipeline:lifecycle(Resumed@3)
),
Duplicate_label = lightspeed@pipeline:process_result_label(Duplicate_sink),
Telemetry_label = lightspeed@pipeline@telemetry:label(
lightspeed@pipeline:runtime_telemetry(Resumed@3)
),
Sink_keys = lightspeed@pipeline:sink_idempotency_keys(Resumed@3),
Passed = ((((gleam_stdlib:contains_string(
lightspeed@pipeline:process_result_label(First_sink),
<<"applied:run=run-1|stage=write_orders"/utf8>>
)
andalso (Duplicate_label =:= <<"duplicate_suppressed:write_orders:sink-idempotent-1"/utf8>>))
andalso gleam_stdlib:contains_string(
lightspeed@pipeline:process_result_label(Second_sink),
<<"applied:run=run-2-replay|stage=write_orders"/utf8>>
))
andalso (Sink_keys =:= [<<"sink-idempotent-1"/utf8>>,
<<"sink-idempotent-2"/utf8>>]))
andalso gleam_stdlib:string_starts_with(
Lifecycle_label,
<<"completed:run-2-replay:"/utf8>>
))
andalso gleam_stdlib:contains_string(
Telemetry_label,
<<"dead_letters=0"/utf8>>
),
{Passed,
<<<<<<<<<<<<<<"lifecycle="/utf8, Lifecycle_label/binary>>/binary,
"|duplicate="/utf8>>/binary,
Duplicate_label/binary>>/binary,
"|sink_keys="/utf8>>/binary,
(join_with(<<","/utf8>>, Sink_keys))/binary>>/binary,
"|telemetry="/utf8>>/binary,
Telemetry_label/binary>>}.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 632).
-spec result_label({ok, nil} | {error, binary()}) -> binary().
result_label(Result) ->
case Result of
{ok, _} ->
<<"ok"/utf8>>;
{error, Reason} ->
<<"error:"/utf8, Reason/binary>>
end.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 453).
-spec evaluate_schema_evolution_compatibility_path() -> {boolean(), binary()}.
evaluate_schema_evolution_compatibility_path() ->
V1 = lightspeed@pipeline@quality:schema(
<<"orders_boundary"/utf8>>,
1,
[lightspeed@pipeline@quality:field(
<<"order_id"/utf8>>,
string_type,
true
),
lightspeed@pipeline@quality:field(
<<"total_cents"/utf8>>,
int_type,
true
),
lightspeed@pipeline@quality:field(
<<"is_priority"/utf8>>,
bool_type,
false
)]
),
V2 = lightspeed@pipeline@quality:schema(
<<"orders_boundary"/utf8>>,
2,
[lightspeed@pipeline@quality:field(
<<"order_id"/utf8>>,
string_type,
true
),
lightspeed@pipeline@quality:field(
<<"total_cents"/utf8>>,
int_type,
true
),
lightspeed@pipeline@quality:field(
<<"is_priority"/utf8>>,
bool_type,
false
),
lightspeed@pipeline@quality:field(
<<"currency_code"/utf8>>,
string_type,
false
)]
),
Breaking = lightspeed@pipeline@quality:schema(
<<"orders_boundary"/utf8>>,
3,
[lightspeed@pipeline@quality:field(
<<"order_id"/utf8>>,
string_type,
true
),
lightspeed@pipeline@quality:field(
<<"total_cents"/utf8>>,
int_type,
true
),
lightspeed@pipeline@quality:field(
<<"is_priority"/utf8>>,
bool_type,
false
),
lightspeed@pipeline@quality:field(
<<"currency_code"/utf8>>,
string_type,
false
),
lightspeed@pipeline@quality:field(
<<"tenant_id"/utf8>>,
string_type,
true
)]
),
Good_payload = [lightspeed@pipeline@quality:string_value(
<<"order_id"/utf8>>,
<<"order-9"/utf8>>
),
lightspeed@pipeline@quality:int_value(<<"total_cents"/utf8>>, 1900),
lightspeed@pipeline@quality:bool_value(<<"is_priority"/utf8>>, true)],
Bad_payload = [lightspeed@pipeline@quality:string_value(
<<"order_id"/utf8>>,
<<"order-10"/utf8>>
),
lightspeed@pipeline@quality:string_value(
<<"total_cents"/utf8>>,
<<"invalid"/utf8>>
),
lightspeed@pipeline@quality:bool_value(<<"is_priority"/utf8>>, false),
lightspeed@pipeline@quality:int_value(<<"unexpected_counter"/utf8>>, 1)],
Compatibility = lightspeed@pipeline@quality:check_compatibility(V1, V2),
Incompatible = lightspeed@pipeline@quality:check_compatibility(V2, Breaking),
Good_validation = lightspeed@pipeline@quality:validate(V2, Good_payload),
Bad_validation = lightspeed@pipeline@quality:validate(V2, Bad_payload),
Compatibility_label = result_label(Compatibility),
Incompatible_label = result_label(Incompatible),
Good_label = lightspeed@pipeline@quality:validation_result_label(
Good_validation
),
Bad_label = lightspeed@pipeline@quality:validation_result_label(
Bad_validation
),
Passed = ((((Compatibility =:= {ok, nil}) andalso (Incompatible =:= {error,
<<"new_required_field:tenant_id"/utf8>>}))
andalso gleam_stdlib:string_starts_with(Good_label, <<"valid:"/utf8>>))
andalso gleam_stdlib:contains_string(
Bad_label,
<<"type_mismatch:total_cents:expected=int:actual=string"/utf8>>
))
andalso gleam_stdlib:contains_string(
Bad_label,
<<"unexpected_field:unexpected_counter"/utf8>>
),
{Passed,
<<<<<<<<<<<<<<<<<<<<<<"v1="/utf8,
(lightspeed@pipeline@quality:schema_signature(
V1
))/binary>>/binary,
"|v2="/utf8>>/binary,
(lightspeed@pipeline@quality:schema_signature(
V2
))/binary>>/binary,
"|compatibility="/utf8>>/binary,
Compatibility_label/binary>>/binary,
"|breaking="/utf8>>/binary,
Incompatible_label/binary>>/binary,
"|good_validation="/utf8>>/binary,
Good_label/binary>>/binary,
"|bad_validation="/utf8>>/binary,
Bad_label/binary>>}.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 623).
-spec process_outcome_result_label(
{ok, lightspeed@pipeline@orchestrator:process_outcome()} | {error, binary()}
) -> binary().
process_outcome_result_label(Result) ->
case Result of
{ok, Outcome} ->
lightspeed@pipeline@orchestrator:process_outcome_label(Outcome);
{error, Reason} ->
<<"error:"/utf8, Reason/binary>>
end.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 617).
-spec fresh_orchestrator_runtime(lightspeed@async@backpressure:boundary()) -> lightspeed@pipeline@orchestrator:runtime().
fresh_orchestrator_runtime(Boundary) ->
lightspeed@pipeline@orchestrator:new(
fresh_pipeline_runtime(),
lightspeed@pipeline@connector:default_plan(),
Boundary
).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 346).
-spec evaluate_poison_message_replay_path() -> {boolean(), binary()}.
evaluate_poison_message_replay_path() ->
Runtime = fresh_orchestrator_runtime(
lightspeed@async@backpressure:default_boundary()
),
Runtime@1 = lightspeed@pipeline@orchestrator:start_run(Runtime, 200),
{Runtime@2, _} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@1,
<<"batch-source-2"/utf8>>,
<<"extract_orders"/utf8>>,
6,
60,
<<"source-2"/utf8>>,
1
),
{Runtime@3, _} = lightspeed@pipeline@orchestrator:start_available(
Runtime@2,
201
),
{Runtime@4, _} = lightspeed@pipeline@orchestrator:ack_batch(
Runtime@3,
<<"batch-source-2"/utf8>>,
202
),
{Runtime@5, _} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@4,
<<"batch-transform-2"/utf8>>,
<<"normalize_orders"/utf8>>,
6,
45,
<<"transform-2"/utf8>>,
2
),
{Runtime@6, _} = lightspeed@pipeline@orchestrator:start_available(
Runtime@5,
203
),
{Runtime@7, _} = lightspeed@pipeline@orchestrator:ack_batch(
Runtime@6,
<<"batch-transform-2"/utf8>>,
204
),
{Runtime@8, _} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@7,
<<"batch-poison-2"/utf8>>,
<<"write_orders"/utf8>>,
6,
40,
<<"poison-key-2"/utf8>>,
3
),
{Runtime@9, _} = lightspeed@pipeline@orchestrator:start_available(
Runtime@8,
205
),
{Runtime@10, Dead_outcome} = lightspeed@pipeline@orchestrator:fail_batch(
Runtime@9,
<<"batch-poison-2"/utf8>>,
3,
<<"poison_message"/utf8>>,
206
),
{Runtime@11, Replay_result} = lightspeed@pipeline@orchestrator:apply_action(
Runtime@10,
{replay, <<"normalize_orders"/utf8>>, <<"quarantine_replay"/utf8>>},
207
),
{Runtime@12, Replay_resume_result} = lightspeed@pipeline@orchestrator:apply_action(
Runtime@11,
{resume, <<"resume_after_replay"/utf8>>},
208
),
{Runtime@13, Replay_enqueue_result} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@12,
<<"batch-replay-2"/utf8>>,
<<"write_orders"/utf8>>,
6,
35,
<<"recovered-key-2"/utf8>>,
4
),
{Runtime@14, _} = lightspeed@pipeline@orchestrator:start_available(
Runtime@13,
209
),
{Runtime@15, Replay_sink_outcome} = lightspeed@pipeline@orchestrator:ack_batch(
Runtime@14,
<<"batch-replay-2"/utf8>>,
210
),
Runtime@16 = lightspeed@pipeline@orchestrator:complete_run(Runtime@15, 215),
Dead_label = process_outcome_result_label(Dead_outcome),
Replay_sink_label = process_outcome_result_label(Replay_sink_outcome),
Lifecycle_label = lightspeed@pipeline:lifecycle_label(
lightspeed@pipeline:lifecycle(
lightspeed@pipeline@orchestrator:runtime_pipeline(Runtime@16)
)
),
Operator_signature = lightspeed@pipeline@operator:signature(
lightspeed@pipeline@orchestrator:runtime_operator(Runtime@16)
),
Passed = ((((((gleam_stdlib:contains_string(
Dead_label,
<<"dead_lettered:batch-poison-2:orders.dead_letter:poison_message"/utf8>>
)
andalso (Replay_result =:= {ok, nil}))
andalso (Replay_resume_result =:= {ok, nil}))
andalso (Replay_enqueue_result =:= {ok, nil}))
andalso gleam_stdlib:contains_string(
Replay_sink_label,
<<"processed:applied:"/utf8>>
))
andalso gleam_stdlib:string_starts_with(
Lifecycle_label,
<<"completed:run-2-replay:"/utf8>>
))
andalso (erlang:length(
lightspeed@pipeline@orchestrator:dead_letters(Runtime@16)
)
=:= 1))
andalso gleam_stdlib:contains_string(
Operator_signature,
<<"action=replay:normalize_orders:quarantine_replay"/utf8>>
),
{Passed,
<<<<<<<<<<<<<<"dead_letter="/utf8, Dead_label/binary>>/binary,
"|replay_sink="/utf8>>/binary,
Replay_sink_label/binary>>/binary,
"|lifecycle="/utf8>>/binary,
Lifecycle_label/binary>>/binary,
"|operator="/utf8>>/binary,
Operator_signature/binary>>}.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 639).
-spec started_has_key(list(binary()), binary()) -> boolean().
started_has_key(Entries, Key) ->
case Entries of
[] ->
false;
[Entry | Rest] ->
case gleam_stdlib:string_starts_with(
Entry,
<<Key/binary, ":"/utf8>>
) of
true ->
true;
false ->
started_has_key(Rest, Key)
end
end.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 283).
-spec evaluate_connector_outage_recovery_path() -> {boolean(), binary()}.
evaluate_connector_outage_recovery_path() ->
Runtime = fresh_orchestrator_runtime(
lightspeed@async@backpressure:default_boundary()
),
Runtime@1 = lightspeed@pipeline@orchestrator:start_run(Runtime, 100),
{Runtime@2, Enqueue_result} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@1,
<<"batch-outage-1"/utf8>>,
<<"extract_orders"/utf8>>,
10,
90,
<<"source-outage-1"/utf8>>,
1
),
{Runtime@3, Started_before_fail} = lightspeed@pipeline@orchestrator:start_available(
Runtime@2,
101
),
{Runtime@4, Retry_outcome} = lightspeed@pipeline@orchestrator:fail_batch(
Runtime@3,
<<"batch-outage-1"/utf8>>,
1,
<<"connector_outage"/utf8>>,
102
),
{Runtime@5, Started_after_retry} = lightspeed@pipeline@orchestrator:start_available(
Runtime@4,
103
),
{Runtime@6, Success_outcome} = lightspeed@pipeline@orchestrator:ack_batch(
Runtime@5,
<<"batch-outage-1"/utf8>>,
104
),
Runtime@7 = lightspeed@pipeline@orchestrator:complete_run(Runtime@6, 110),
Telemetry_label = lightspeed@pipeline@telemetry:label(
lightspeed@pipeline:runtime_telemetry(
lightspeed@pipeline@orchestrator:runtime_pipeline(Runtime@7)
)
),
Retry_label = process_outcome_result_label(Retry_outcome),
Success_label = process_outcome_result_label(Success_outcome),
Lifecycle_label = lightspeed@pipeline:lifecycle_label(
lightspeed@pipeline:lifecycle(
lightspeed@pipeline@orchestrator:runtime_pipeline(Runtime@7)
)
),
Passed = ((((((((Enqueue_result =:= {ok, nil}) andalso started_has_key(
Started_before_fail,
<<"batch-outage-1"/utf8>>
))
andalso started_has_key(Started_after_retry, <<"batch-outage-1"/utf8>>))
andalso gleam_stdlib:string_starts_with(
Retry_label,
<<"retry_scheduled:batch-outage-1:2:"/utf8>>
))
andalso gleam_stdlib:contains_string(
Success_label,
<<"processed:applied:"/utf8>>
))
andalso gleam_stdlib:string_starts_with(
Lifecycle_label,
<<"completed:run-1:"/utf8>>
))
andalso (erlang:length(lightspeed@pipeline@orchestrator:retries(Runtime@7))
=:= 1))
andalso (lightspeed@pipeline@orchestrator:dead_letters(Runtime@7) =:= []))
andalso gleam_stdlib:contains_string(Telemetry_label, <<"retries=1"/utf8>>),
{Passed,
<<<<<<<<<<<<<<"retry="/utf8, Retry_label/binary>>/binary,
"|success="/utf8>>/binary,
Success_label/binary>>/binary,
"|lifecycle="/utf8>>/binary,
Lifecycle_label/binary>>/binary,
"|telemetry="/utf8>>/binary,
Telemetry_label/binary>>}.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 272).
-spec evaluate(scenario()) -> {boolean(), binary()}.
evaluate(Scenario) ->
case Scenario of
connector_outage_recovery_path ->
evaluate_connector_outage_recovery_path();
poison_message_replay_path ->
evaluate_poison_message_replay_path();
schema_evolution_compatibility_path ->
evaluate_schema_evolution_compatibility_path();
replay_idempotency_path ->
evaluate_replay_idempotency_path();
slo_budget_compliance_path ->
evaluate_slo_budget_compliance_path()
end.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 76).
?DOC(" Run one scenario twice and require deterministic parity.\n").
-spec run_scenario(scenario()) -> scenario_outcome().
run_scenario(Scenario) ->
{First_passed, First_signature} = evaluate(Scenario),
{Second_passed, Second_signature} = evaluate(Scenario),
Deterministic = (First_passed =:= Second_passed) andalso (First_signature
=:= Second_signature),
Passed = (First_passed andalso Second_passed) andalso Deterministic,
{scenario_outcome, Scenario, Passed, Deterministic, First_signature}.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 57).
?DOC(" Run all M33 reliability scenarios.\n").
-spec run_matrix() -> report().
run_matrix() ->
Outcomes = begin
_pipe = [connector_outage_recovery_path,
poison_message_replay_path,
schema_evolution_compatibility_path,
replay_idempotency_path,
slo_budget_compliance_path],
gleam@list:map(_pipe, fun run_scenario/1)
end,
{report, Outcomes, count_failed(Outcomes), count_nondeterministic(Outcomes)}.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 92).
?DOC(" Scenario label.\n").
-spec scenario_label(scenario()) -> binary().
scenario_label(Scenario) ->
case Scenario of
connector_outage_recovery_path ->
<<"connector_outage_recovery_path"/utf8>>;
poison_message_replay_path ->
<<"poison_message_replay_path"/utf8>>;
schema_evolution_compatibility_path ->
<<"schema_evolution_compatibility_path"/utf8>>;
replay_idempotency_path ->
<<"replay_idempotency_path"/utf8>>;
slo_budget_compliance_path ->
<<"slo_budget_compliance_path"/utf8>>
end.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 103).
?DOC(" Stable pass/fail label.\n").
-spec pass_fail_label(scenario_outcome()) -> binary().
pass_fail_label(Outcome) ->
case erlang:element(3, Outcome) of
true ->
<<"pass"/utf8>>;
false ->
<<"fail"/utf8>>
end.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 111).
?DOC(" Scenario signature accessor.\n").
-spec signature(scenario_outcome()) -> binary().
signature(Outcome) ->
erlang:element(5, Outcome).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 116).
?DOC(" Scenario accessor.\n").
-spec scenario(scenario_outcome()) -> scenario().
scenario(Outcome) ->
erlang:element(2, Outcome).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 121).
?DOC(" Scenario determinism accessor.\n").
-spec deterministic(scenario_outcome()) -> boolean().
deterministic(Outcome) ->
erlang:element(4, Outcome).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 126).
?DOC(" Report outcomes.\n").
-spec outcomes(report()) -> list(scenario_outcome()).
outcomes(Report) ->
erlang:element(2, Report).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 131).
?DOC(" Failed scenario count.\n").
-spec failed_scenarios(report()) -> integer().
failed_scenarios(Report) ->
erlang:element(3, Report).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 136).
?DOC(" Nondeterministic failure count.\n").
-spec nondeterministic_failures(report()) -> integer().
nondeterministic_failures(Report) ->
erlang:element(4, Report).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 672).
-spec bool_label(boolean()) -> binary().
bool_label(Value) ->
case Value of
true ->
<<"true"/utf8>>;
false ->
<<"false"/utf8>>
end.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 141).
?DOC(" Stable report signature.\n").
-spec report_signature(report()) -> binary().
report_signature(Report) ->
Entries = gleam@list:map(
erlang:element(2, Report),
fun(Outcome) ->
<<<<<<<<<<<<(scenario_label(erlang:element(2, Outcome)))/binary,
"="/utf8>>/binary,
(pass_fail_label(Outcome))/binary>>/binary,
":deterministic="/utf8>>/binary,
(bool_label(erlang:element(4, Outcome)))/binary>>/binary,
":"/utf8>>/binary,
(erlang:element(5, Outcome))/binary>>
end
),
join_with(<<";"/utf8>>, Entries).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 157).
?DOC(" Deterministic snapshot signature for fixture drift gates.\n").
-spec snapshot_signature() -> binary().
snapshot_signature() ->
<<<<<<"m33.snapshot.v"/utf8, (erlang:integer_to_binary(1))/binary>>/binary,
"|"/utf8>>/binary,
(report_signature(run_matrix()))/binary>>.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 165).
?DOC(" Deterministic markdown report for fixture scripts.\n").
-spec snapshot_report_markdown() -> binary().
snapshot_report_markdown() ->
Report = run_matrix(),
Failed = failed_scenarios(Report),
Nondeterministic = nondeterministic_failures(Report),
Status = case (Failed =:= 0) andalso (Nondeterministic =:= 0) of
true ->
<<"OK"/utf8>>;
false ->
<<"FAIL"/utf8>>
end,
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"# ETL Reliability Fixture Report\n\n"/utf8,
"snapshot_version: "/utf8>>/binary,
(erlang:integer_to_binary(
1
))/binary>>/binary,
"\n"/utf8>>/binary,
"status: "/utf8>>/binary,
Status/binary>>/binary,
"\n"/utf8>>/binary,
"failed_scenarios: "/utf8>>/binary,
(erlang:integer_to_binary(
Failed
))/binary>>/binary,
"\n"/utf8>>/binary,
"nondeterministic_failures: "/utf8>>/binary,
(erlang:integer_to_binary(Nondeterministic))/binary>>/binary,
"\n\n"/utf8>>/binary,
"snapshot_signature: "/utf8>>/binary,
(snapshot_signature())/binary>>/binary,
"\n\n"/utf8>>/binary,
"report_signature: "/utf8>>/binary,
(report_signature(Report))/binary>>/binary,
"\n"/utf8>>.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 208).
?DOC(" Budget-report observation accessor.\n").
-spec budget_observations(budget_report()) -> list(lightspeed@pipeline@slo:observation()).
budget_observations(Report) ->
erlang:element(2, Report).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 213).
?DOC(" Budget-report results accessor.\n").
-spec budget_results(budget_report()) -> list(lightspeed@pipeline@slo:budget_result()).
budget_results(Report) ->
erlang:element(3, Report).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 218).
?DOC(" Budget-report failed count accessor.\n").
-spec budget_failed_budgets(budget_report()) -> integer().
budget_failed_budgets(Report) ->
erlang:element(4, Report).
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 236).
?DOC(" Deterministic budget snapshot signature for drift gates.\n").
-spec budget_snapshot_signature() -> binary().
budget_snapshot_signature() ->
<<<<<<"m33.slo.snapshot.v"/utf8, (erlang:integer_to_binary(1))/binary>>/binary,
"|"/utf8>>/binary,
(budget_report_signature(run_budget_report()))/binary>>.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 244).
?DOC(" Deterministic markdown budget report for scripts.\n").
-spec budget_report_markdown() -> binary().
budget_report_markdown() ->
Report = run_budget_report(),
Status = case erlang:element(4, Report) =:= 0 of
true ->
<<"OK"/utf8>>;
false ->
<<"FAIL"/utf8>>
end,
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"# ETL SLO Budget Report\n\n"/utf8,
"budget_snapshot_version: "/utf8>>/binary,
(erlang:integer_to_binary(
1
))/binary>>/binary,
"\n"/utf8>>/binary,
"budget_version: "/utf8>>/binary,
(lightspeed@pipeline@slo:budget_version_label(
))/binary>>/binary,
"\n"/utf8>>/binary,
"status: "/utf8>>/binary,
Status/binary>>/binary,
"\n"/utf8>>/binary,
"failed_budgets: "/utf8>>/binary,
(erlang:integer_to_binary(
erlang:element(4, Report)
))/binary>>/binary,
"\n\n"/utf8>>/binary,
"budget_snapshot_signature: "/utf8>>/binary,
(budget_snapshot_signature())/binary>>/binary,
"\n\n"/utf8>>/binary,
"budget_report_signature: "/utf8>>/binary,
(budget_report_signature(Report))/binary>>/binary,
"\n"/utf8>>.