%% src/lightspeed@ops@etl_reliability_harness.erl

-module(lightspeed@ops@etl_reliability_harness).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/ops/etl_reliability_harness.gleam").
-export([budget_report_signature/1, run_budget_report/0, run_scenario/1, run_matrix/0, scenario_label/1, pass_fail_label/1, signature/1, scenario/1, deterministic/1, outcomes/1, failed_scenarios/1, nondeterministic_failures/1, report_signature/1, snapshot_signature/0, snapshot_report_markdown/0, budget_observations/1, budget_results/1, budget_failed_budgets/1, budget_snapshot_signature/0, budget_report_markdown/0]).
-export_type([scenario/0, scenario_outcome/0, report/0, budget_report/0]).

%% Documentation shims: on OTP 27 or newer the macros expand to the
%% `-moduledoc` / `-doc` attributes; on older releases they expand to an empty
%% `-compile([])` attribute so that `?MODULEDOC(...)` and `?DOC(...)` stay
%% syntactically valid without contributing anything.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Deterministic ETL reliability and SLO harness for M33.\n").

%% The reliability scenarios this harness knows how to evaluate. `evaluate/1`
%% dispatches on these atoms and `run_matrix/0` runs them in this order.
-type scenario() :: connector_outage_recovery_path |
    poison_message_replay_path |
    schema_evolution_compatibility_path |
    replay_idempotency_path |
    slo_budget_compliance_path.

%% Result of `run_scenario/1`, laid out as
%% `{scenario_outcome, Scenario, Passed, Deterministic, Signature}`:
%%   * Scenario      - the scenario that was run;
%%   * Passed        - both evaluations passed and agreed with each other;
%%   * Deterministic - both evaluations produced the same pass flag and signature;
%%   * Signature     - the signature binary of the first evaluation.
-type scenario_outcome() :: {scenario_outcome,
        scenario(),
        boolean(),
        boolean(),
        binary()}.

%% Matrix report built by `run_matrix/0`:
%% `{report, Outcomes, FailedCount, NondeterministicCount}`.
-type report() :: {report, list(scenario_outcome()), integer(), integer()}.

%% Budget report built by `run_budget_report/0`:
%% `{budget_report, Observations, Results, FailureCount}`, where the failure
%% count is whatever `lightspeed@pipeline@slo:budget_failures/1` returns for
%% the results.
-type budget_report() :: {budget_report,
        list(lightspeed@pipeline@slo:observation()),
        list(lightspeed@pipeline@slo:budget_result()),
        integer()}.

%% Counts the outcomes whose determinism flag (tuple element 4) is `false`.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 661).
-spec count_nondeterministic(list(scenario_outcome())) -> integer().
count_nondeterministic([]) ->
    0;
count_nondeterministic([Head | Tail]) ->
    %% A deterministic outcome contributes nothing; any other boolean adds one.
    Increment =
        case erlang:element(4, Head) of
            true -> 0;
            false -> 1
        end,
    Increment + count_nondeterministic(Tail).

%% Counts the outcomes whose pass flag (tuple element 3) is `false`.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 650).
-spec count_failed(list(scenario_outcome())) -> integer().
count_failed([]) ->
    0;
count_failed([Head | Tail]) ->
    %% A passing outcome contributes nothing; a failing one adds one.
    Increment =
        case erlang:element(3, Head) of
            true -> 0;
            false -> 1
        end,
    Increment + count_failed(Tail).

%% Joins `Values` into one binary, placing `Separator` between consecutive
%% elements. An empty list yields `<<>>`; a single element is returned with no
%% separator added.
%%
%% The separator is interleaved with `lists:join/2` and the result is flattened
%% once by `erlang:iolist_to_binary/1`, so each byte is copied a single time
%% instead of the tail being re-copied at every recursion level.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 679).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 223).
?DOC(" Stable budget report signature.\n").
-spec budget_report_signature(budget_report()) -> binary().
budget_report_signature(Report) ->
    %% Label every observation (element 2) and every budget result (element 3).
    Observation_labels = [
        lightspeed@pipeline@slo:observation_label(Observation)
     || Observation <- erlang:element(2, Report)
    ],
    Result_labels = [
        lightspeed@pipeline@slo:budget_result_label(Result)
     || Result <- erlang:element(3, Report)
    ],
    Observations_part = join_with(<<";"/utf8>>, Observation_labels),
    Results_part = join_with(<<";"/utf8>>, Result_labels),
    %% Element 4 is the failure count stored in the report.
    Failures_part = erlang:integer_to_binary(erlang:element(4, Report)),
    <<"observations="/utf8, Observations_part/binary,
        "|results="/utf8, Results_part/binary,
        "|failures="/utf8, Failures_part/binary>>.

%% The two fixed observations ("orders_hourly" and "orders_replay") that the
%% budget report and the SLO compliance scenario are evaluated against.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 597).
-spec default_slo_observations() -> list(lightspeed@pipeline@slo:observation()).
default_slo_observations() ->
    Hourly = lightspeed@pipeline@slo:observation(
        <<"orders_hourly"/utf8>>, 140, 420, 6, 200, 2200
    ),
    Replay = lightspeed@pipeline@slo:observation(
        <<"orders_replay"/utf8>>, 280, 280, 9, 180, 3900
    ),
    [Hourly, Replay].

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 196).
?DOC(" Run default M33 budget evaluation report.\n").
-spec run_budget_report() -> budget_report().
run_budget_report() ->
    %% Evaluate the default observations against the default budget and keep
    %% the inputs, the per-budget results and the failure count together.
    Observed = default_slo_observations(),
    Budgets = lightspeed@pipeline@slo:default_budget(),
    Evaluated = lightspeed@pipeline@slo:evaluate(Observed, Budgets),
    Failure_count = lightspeed@pipeline@slo:budget_failures(Evaluated),
    {budget_report, Observed, Evaluated, Failure_count}.

%% SLO budget scenario. It passes when the default budget report has zero
%% failures AND a stricter "orders_hourly" budget yields exactly one failure
%% whose joined result labels mention "lag_exceeded".
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 573).
-spec evaluate_slo_budget_compliance_path() -> {boolean(), binary()}.
evaluate_slo_budget_compliance_path() ->
    Default_report = run_budget_report(),
    Observed = default_slo_observations(),
    Strict_budget = lightspeed@pipeline@slo:budget(
        <<"orders_hourly"/utf8>>, 90, 500, 2, 2000
    ),
    Strict_results = lightspeed@pipeline@slo:evaluate(Observed, [Strict_budget]),
    Strict_failure_count = lightspeed@pipeline@slo:budget_failures(Strict_results),
    Strict_labels = [
        lightspeed@pipeline@slo:budget_result_label(Result)
     || Result <- Strict_results
    ],
    Default_signature = budget_report_signature(Default_report),
    %% The joined labels feed both the pass check and the signature.
    Strict_joined = join_with(<<";"/utf8>>, Strict_labels),
    Passed =
        (erlang:element(4, Default_report) =:= 0) andalso
            (Strict_failure_count =:= 1) andalso
            gleam_stdlib:contains_string(Strict_joined, <<"lag_exceeded"/utf8>>),
    Signature =
        <<"budget_signature="/utf8, Default_signature/binary,
            "|strict="/utf8, Strict_joined/binary>>,
    {Passed, Signature}.

%% Builds a new runtime for the three-stage "orders_pipeline"
%% (extract_orders -> normalize_orders -> write_orders) on a 60000 interval.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 604).
-spec fresh_pipeline_runtime() -> lightspeed@pipeline:runtime().
fresh_pipeline_runtime() ->
    Extract_stage = lightspeed@pipeline:source_stage(
        <<"extract_orders"/utf8>>, <<"raw_order"/utf8>>
    ),
    Normalize_stage = lightspeed@pipeline:transform_stage(
        <<"normalize_orders"/utf8>>, <<"raw_order"/utf8>>, <<"normalized_order"/utf8>>
    ),
    Write_stage = lightspeed@pipeline:sink_stage(
        <<"write_orders"/utf8>>, <<"normalized_order"/utf8>>
    ),
    Definition = lightspeed@pipeline:pipeline(
        <<"orders_pipeline"/utf8>>,
        {interval, 60000},
        [Extract_stage, Normalize_stage, Write_stage]
    ),
    lightspeed@pipeline:new(Definition).

%% Replay/idempotency scenario. Drives one batch through all three stages,
%% crashes the runtime, resumes from the latest checkpoint, then re-sends the
%% same sink key followed by a new one. Returns the pass flag together with a
%% signature built from the lifecycle, duplicate, sink-key and telemetry labels.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 522).
-spec evaluate_replay_idempotency_path() -> {boolean(), binary()}.
evaluate_replay_idempotency_path() ->
    Started = lightspeed@pipeline:start(fresh_pipeline_runtime(), 300),
    {After_extract, _} = lightspeed@pipeline:process(
        Started, <<"extract_orders"/utf8>>, 5, 55, <<"source-3"/utf8>>, 301
    ),
    {After_transform, _} = lightspeed@pipeline:process(
        After_extract, <<"normalize_orders"/utf8>>, 5, 35, <<"transform-3"/utf8>>, 302
    ),
    {After_sink, First_write} = lightspeed@pipeline:process(
        After_transform, <<"write_orders"/utf8>>, 5, 20, <<"sink-idempotent-1"/utf8>>, 303
    ),
    Crashed = lightspeed@pipeline:crash(After_sink, <<"worker_restart"/utf8>>),
    %% If resuming fails, carry on with the crashed runtime.
    Recovered =
        case lightspeed@pipeline:resume_from_latest_checkpoint(Crashed, 310) of
            {ok, Resumed_runtime} -> Resumed_runtime;
            {error, _} -> Crashed
        end,
    %% Same idempotency key as the first sink write.
    {After_duplicate, Duplicate_write} = lightspeed@pipeline:process(
        Recovered, <<"write_orders"/utf8>>, 5, 18, <<"sink-idempotent-1"/utf8>>, 311
    ),
    {After_second, Second_write} = lightspeed@pipeline:process(
        After_duplicate, <<"write_orders"/utf8>>, 5, 16, <<"sink-idempotent-2"/utf8>>, 312
    ),
    Finished = lightspeed@pipeline:complete(After_second, 320),
    Lifecycle_label = lightspeed@pipeline:lifecycle_label(
        lightspeed@pipeline:lifecycle(Finished)
    ),
    Duplicate_label = lightspeed@pipeline:process_result_label(Duplicate_write),
    Telemetry_label = lightspeed@pipeline@telemetry:label(
        lightspeed@pipeline:runtime_telemetry(Finished)
    ),
    Sink_keys = lightspeed@pipeline:sink_idempotency_keys(Finished),
    Passed =
        gleam_stdlib:contains_string(
            lightspeed@pipeline:process_result_label(First_write),
            <<"applied:run=run-1|stage=write_orders"/utf8>>
        ) andalso
            (Duplicate_label =:=
                <<"duplicate_suppressed:write_orders:sink-idempotent-1"/utf8>>) andalso
            gleam_stdlib:contains_string(
                lightspeed@pipeline:process_result_label(Second_write),
                <<"applied:run=run-2-replay|stage=write_orders"/utf8>>
            ) andalso
            (Sink_keys =:=
                [<<"sink-idempotent-1"/utf8>>, <<"sink-idempotent-2"/utf8>>]) andalso
            gleam_stdlib:string_starts_with(
                Lifecycle_label,
                <<"completed:run-2-replay:"/utf8>>
            ) andalso
            gleam_stdlib:contains_string(Telemetry_label, <<"dead_letters=0"/utf8>>),
    Joined_keys = join_with(<<","/utf8>>, Sink_keys),
    Signature =
        <<"lifecycle="/utf8, Lifecycle_label/binary,
            "|duplicate="/utf8, Duplicate_label/binary,
            "|sink_keys="/utf8, Joined_keys/binary,
            "|telemetry="/utf8, Telemetry_label/binary>>,
    {Passed, Signature}.

%% Renders a result as `<<"ok">>` or as `<<"error:">>` followed by the reason.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 632).
-spec result_label({ok, nil} | {error, binary()}) -> binary().
result_label({ok, _}) ->
    <<"ok"/utf8>>;
result_label({error, Reason}) ->
    <<"error:"/utf8, Reason/binary>>.

%% Schema evolution scenario. Builds three versions of the "orders_boundary"
%% schema (v2 adds an optional field, v3 additionally adds a required one),
%% checks compatibility v1->v2 and v2->v3, and validates one well-formed and
%% one malformed payload against v2. Returns the pass flag together with a
%% signature built from the schema signatures and the individual labels.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 453).
-spec evaluate_schema_evolution_compatibility_path() -> {boolean(), binary()}.
evaluate_schema_evolution_compatibility_path() ->
    Schema_v1 = lightspeed@pipeline@quality:schema(
        <<"orders_boundary"/utf8>>,
        1,
        [
            lightspeed@pipeline@quality:field(<<"order_id"/utf8>>, string_type, true),
            lightspeed@pipeline@quality:field(<<"total_cents"/utf8>>, int_type, true),
            lightspeed@pipeline@quality:field(<<"is_priority"/utf8>>, bool_type, false)
        ]
    ),
    %% v2 = v1 plus the optional `currency_code` field.
    Schema_v2 = lightspeed@pipeline@quality:schema(
        <<"orders_boundary"/utf8>>,
        2,
        [
            lightspeed@pipeline@quality:field(<<"order_id"/utf8>>, string_type, true),
            lightspeed@pipeline@quality:field(<<"total_cents"/utf8>>, int_type, true),
            lightspeed@pipeline@quality:field(<<"is_priority"/utf8>>, bool_type, false),
            lightspeed@pipeline@quality:field(<<"currency_code"/utf8>>, string_type, false)
        ]
    ),
    %% v3 = v2 plus the required `tenant_id` field.
    Schema_v3 = lightspeed@pipeline@quality:schema(
        <<"orders_boundary"/utf8>>,
        3,
        [
            lightspeed@pipeline@quality:field(<<"order_id"/utf8>>, string_type, true),
            lightspeed@pipeline@quality:field(<<"total_cents"/utf8>>, int_type, true),
            lightspeed@pipeline@quality:field(<<"is_priority"/utf8>>, bool_type, false),
            lightspeed@pipeline@quality:field(<<"currency_code"/utf8>>, string_type, false),
            lightspeed@pipeline@quality:field(<<"tenant_id"/utf8>>, string_type, true)
        ]
    ),
    Valid_payload = [
        lightspeed@pipeline@quality:string_value(<<"order_id"/utf8>>, <<"order-9"/utf8>>),
        lightspeed@pipeline@quality:int_value(<<"total_cents"/utf8>>, 1900),
        lightspeed@pipeline@quality:bool_value(<<"is_priority"/utf8>>, true)
    ],
    %% `total_cents` carries a string value and `unexpected_counter` is not in the schema.
    Invalid_payload = [
        lightspeed@pipeline@quality:string_value(<<"order_id"/utf8>>, <<"order-10"/utf8>>),
        lightspeed@pipeline@quality:string_value(<<"total_cents"/utf8>>, <<"invalid"/utf8>>),
        lightspeed@pipeline@quality:bool_value(<<"is_priority"/utf8>>, false),
        lightspeed@pipeline@quality:int_value(<<"unexpected_counter"/utf8>>, 1)
    ],
    Forward_check = lightspeed@pipeline@quality:check_compatibility(Schema_v1, Schema_v2),
    Breaking_check = lightspeed@pipeline@quality:check_compatibility(Schema_v2, Schema_v3),
    Valid_result = lightspeed@pipeline@quality:validate(Schema_v2, Valid_payload),
    Invalid_result = lightspeed@pipeline@quality:validate(Schema_v2, Invalid_payload),
    Forward_label = result_label(Forward_check),
    Breaking_label = result_label(Breaking_check),
    Valid_label = lightspeed@pipeline@quality:validation_result_label(Valid_result),
    Invalid_label = lightspeed@pipeline@quality:validation_result_label(Invalid_result),
    Passed =
        (Forward_check =:= {ok, nil}) andalso
            (Breaking_check =:= {error, <<"new_required_field:tenant_id"/utf8>>}) andalso
            gleam_stdlib:string_starts_with(Valid_label, <<"valid:"/utf8>>) andalso
            gleam_stdlib:contains_string(
                Invalid_label,
                <<"type_mismatch:total_cents:expected=int:actual=string"/utf8>>
            ) andalso
            gleam_stdlib:contains_string(
                Invalid_label,
                <<"unexpected_field:unexpected_counter"/utf8>>
            ),
    V1_signature = lightspeed@pipeline@quality:schema_signature(Schema_v1),
    V2_signature = lightspeed@pipeline@quality:schema_signature(Schema_v2),
    Signature =
        <<"v1="/utf8, V1_signature/binary,
            "|v2="/utf8, V2_signature/binary,
            "|compatibility="/utf8, Forward_label/binary,
            "|breaking="/utf8, Breaking_label/binary,
            "|good_validation="/utf8, Valid_label/binary,
            "|bad_validation="/utf8, Invalid_label/binary>>,
    {Passed, Signature}.

%% Labels an orchestrator result: successes delegate to
%% `lightspeed@pipeline@orchestrator:process_outcome_label/1`, failures become
%% `<<"error:">>` followed by the reason.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 623).
-spec process_outcome_result_label(
    {ok, lightspeed@pipeline@orchestrator:process_outcome()} | {error, binary()}
) -> binary().
process_outcome_result_label({ok, Outcome}) ->
    lightspeed@pipeline@orchestrator:process_outcome_label(Outcome);
process_outcome_result_label({error, Reason}) ->
    <<"error:"/utf8, Reason/binary>>.

%% Wraps a fresh pipeline runtime and the default connector plan in a new
%% orchestrator runtime bound to the given backpressure boundary.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 617).
-spec fresh_orchestrator_runtime(lightspeed@async@backpressure:boundary()) -> lightspeed@pipeline@orchestrator:runtime().
fresh_orchestrator_runtime(Boundary) ->
    Pipeline_runtime = fresh_pipeline_runtime(),
    Connector_plan = lightspeed@pipeline@connector:default_plan(),
    lightspeed@pipeline@orchestrator:new(Pipeline_runtime, Connector_plan, Boundary).

%% Poison-message scenario. Acknowledges a source and a transform batch, fails
%% a sink batch with "poison_message", applies replay and resume operator
%% actions, then pushes a replacement sink batch through to completion.
%% Returns the pass flag together with a signature built from the dead-letter,
%% replay-sink, lifecycle and operator labels.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 346).
-spec evaluate_poison_message_replay_path() -> {boolean(), binary()}.
evaluate_poison_message_replay_path() ->
    Initial = fresh_orchestrator_runtime(
        lightspeed@async@backpressure:default_boundary()
    ),
    Running = lightspeed@pipeline@orchestrator:start_run(Initial, 200),

    %% Source batch: enqueue, start, acknowledge.
    {Source_queued, _} = lightspeed@pipeline@orchestrator:enqueue_batch(
        Running,
        <<"batch-source-2"/utf8>>,
        <<"extract_orders"/utf8>>,
        6,
        60,
        <<"source-2"/utf8>>,
        1
    ),
    {Source_started, _} = lightspeed@pipeline@orchestrator:start_available(Source_queued, 201),
    {Source_acked, _} = lightspeed@pipeline@orchestrator:ack_batch(
        Source_started, <<"batch-source-2"/utf8>>, 202
    ),

    %% Transform batch: enqueue, start, acknowledge.
    {Transform_queued, _} = lightspeed@pipeline@orchestrator:enqueue_batch(
        Source_acked,
        <<"batch-transform-2"/utf8>>,
        <<"normalize_orders"/utf8>>,
        6,
        45,
        <<"transform-2"/utf8>>,
        2
    ),
    {Transform_started, _} = lightspeed@pipeline@orchestrator:start_available(
        Transform_queued, 203
    ),
    {Transform_acked, _} = lightspeed@pipeline@orchestrator:ack_batch(
        Transform_started, <<"batch-transform-2"/utf8>>, 204
    ),

    %% Sink batch that is failed with the "poison_message" reason.
    {Poison_queued, _} = lightspeed@pipeline@orchestrator:enqueue_batch(
        Transform_acked,
        <<"batch-poison-2"/utf8>>,
        <<"write_orders"/utf8>>,
        6,
        40,
        <<"poison-key-2"/utf8>>,
        3
    ),
    {Poison_started, _} = lightspeed@pipeline@orchestrator:start_available(Poison_queued, 205),
    {Poison_failed, Dead_outcome} = lightspeed@pipeline@orchestrator:fail_batch(
        Poison_started,
        <<"batch-poison-2"/utf8>>,
        3,
        <<"poison_message"/utf8>>,
        206
    ),

    %% Operator actions: replay from the transform stage, then resume.
    {Replay_applied, Replay_result} = lightspeed@pipeline@orchestrator:apply_action(
        Poison_failed,
        {replay, <<"normalize_orders"/utf8>>, <<"quarantine_replay"/utf8>>},
        207
    ),
    {Resume_applied, Replay_resume_result} = lightspeed@pipeline@orchestrator:apply_action(
        Replay_applied,
        {resume, <<"resume_after_replay"/utf8>>},
        208
    ),

    %% Replacement sink batch, acknowledged this time.
    {Replay_queued, Replay_enqueue_result} = lightspeed@pipeline@orchestrator:enqueue_batch(
        Resume_applied,
        <<"batch-replay-2"/utf8>>,
        <<"write_orders"/utf8>>,
        6,
        35,
        <<"recovered-key-2"/utf8>>,
        4
    ),
    {Replay_started, _} = lightspeed@pipeline@orchestrator:start_available(Replay_queued, 209),
    {Replay_acked, Replay_sink_outcome} = lightspeed@pipeline@orchestrator:ack_batch(
        Replay_started, <<"batch-replay-2"/utf8>>, 210
    ),
    Final = lightspeed@pipeline@orchestrator:complete_run(Replay_acked, 215),

    Dead_label = process_outcome_result_label(Dead_outcome),
    Replay_sink_label = process_outcome_result_label(Replay_sink_outcome),
    Lifecycle_label = lightspeed@pipeline:lifecycle_label(
        lightspeed@pipeline:lifecycle(
            lightspeed@pipeline@orchestrator:runtime_pipeline(Final)
        )
    ),
    Operator_signature = lightspeed@pipeline@operator:signature(
        lightspeed@pipeline@orchestrator:runtime_operator(Final)
    ),
    Passed =
        gleam_stdlib:contains_string(
            Dead_label,
            <<"dead_lettered:batch-poison-2:orders.dead_letter:poison_message"/utf8>>
        ) andalso
            (Replay_result =:= {ok, nil}) andalso
            (Replay_resume_result =:= {ok, nil}) andalso
            (Replay_enqueue_result =:= {ok, nil}) andalso
            gleam_stdlib:contains_string(
                Replay_sink_label,
                <<"processed:applied:"/utf8>>
            ) andalso
            gleam_stdlib:string_starts_with(
                Lifecycle_label,
                <<"completed:run-2-replay:"/utf8>>
            ) andalso
            (erlang:length(lightspeed@pipeline@orchestrator:dead_letters(Final)) =:= 1) andalso
            gleam_stdlib:contains_string(
                Operator_signature,
                <<"action=replay:normalize_orders:quarantine_replay"/utf8>>
            ),
    Signature =
        <<"dead_letter="/utf8, Dead_label/binary,
            "|replay_sink="/utf8, Replay_sink_label/binary,
            "|lifecycle="/utf8, Lifecycle_label/binary,
            "|operator="/utf8, Operator_signature/binary>>,
    {Passed, Signature}.

%% True when at least one entry starts with `Key` immediately followed by ":".
%% Scanning stops at the first matching entry.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 639).
-spec started_has_key(list(binary()), binary()) -> boolean().
started_has_key([], _Key) ->
    false;
started_has_key([Candidate | Remaining], Key) ->
    gleam_stdlib:string_starts_with(Candidate, <<Key/binary, ":"/utf8>>) orelse
        started_has_key(Remaining, Key).

%% Connector outage scenario. Enqueues one source batch, fails it once with
%% "connector_outage", starts it again and acknowledges it, then completes the
%% run. Returns the pass flag together with a signature built from the retry,
%% success, lifecycle and telemetry labels.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 283).
-spec evaluate_connector_outage_recovery_path() -> {boolean(), binary()}.
evaluate_connector_outage_recovery_path() ->
    Initial = fresh_orchestrator_runtime(
        lightspeed@async@backpressure:default_boundary()
    ),
    Running = lightspeed@pipeline@orchestrator:start_run(Initial, 100),
    {Queued, Enqueue_result} = lightspeed@pipeline@orchestrator:enqueue_batch(
        Running,
        <<"batch-outage-1"/utf8>>,
        <<"extract_orders"/utf8>>,
        10,
        90,
        <<"source-outage-1"/utf8>>,
        1
    ),
    {First_start, Started_before_fail} = lightspeed@pipeline@orchestrator:start_available(
        Queued, 101
    ),
    %% Simulated outage on the first attempt.
    {Failed_once, Retry_outcome} = lightspeed@pipeline@orchestrator:fail_batch(
        First_start,
        <<"batch-outage-1"/utf8>>,
        1,
        <<"connector_outage"/utf8>>,
        102
    ),
    {Second_start, Started_after_retry} = lightspeed@pipeline@orchestrator:start_available(
        Failed_once, 103
    ),
    {Acked, Success_outcome} = lightspeed@pipeline@orchestrator:ack_batch(
        Second_start, <<"batch-outage-1"/utf8>>, 104
    ),
    Final = lightspeed@pipeline@orchestrator:complete_run(Acked, 110),

    Telemetry_label = lightspeed@pipeline@telemetry:label(
        lightspeed@pipeline:runtime_telemetry(
            lightspeed@pipeline@orchestrator:runtime_pipeline(Final)
        )
    ),
    Retry_label = process_outcome_result_label(Retry_outcome),
    Success_label = process_outcome_result_label(Success_outcome),
    Lifecycle_label = lightspeed@pipeline:lifecycle_label(
        lightspeed@pipeline:lifecycle(
            lightspeed@pipeline@orchestrator:runtime_pipeline(Final)
        )
    ),
    Passed =
        (Enqueue_result =:= {ok, nil}) andalso
            started_has_key(Started_before_fail, <<"batch-outage-1"/utf8>>) andalso
            started_has_key(Started_after_retry, <<"batch-outage-1"/utf8>>) andalso
            gleam_stdlib:string_starts_with(
                Retry_label,
                <<"retry_scheduled:batch-outage-1:2:"/utf8>>
            ) andalso
            gleam_stdlib:contains_string(
                Success_label,
                <<"processed:applied:"/utf8>>
            ) andalso
            gleam_stdlib:string_starts_with(
                Lifecycle_label,
                <<"completed:run-1:"/utf8>>
            ) andalso
            (erlang:length(lightspeed@pipeline@orchestrator:retries(Final)) =:= 1) andalso
            (lightspeed@pipeline@orchestrator:dead_letters(Final) =:= []) andalso
            gleam_stdlib:contains_string(Telemetry_label, <<"retries=1"/utf8>>),
    Signature =
        <<"retry="/utf8, Retry_label/binary,
            "|success="/utf8, Success_label/binary,
            "|lifecycle="/utf8, Lifecycle_label/binary,
            "|telemetry="/utf8, Telemetry_label/binary>>,
    {Passed, Signature}.

%% Dispatches a scenario atom to its evaluator; every evaluator returns
%% `{Passed, Signature}`.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 272).
-spec evaluate(scenario()) -> {boolean(), binary()}.
evaluate(connector_outage_recovery_path) ->
    evaluate_connector_outage_recovery_path();
evaluate(poison_message_replay_path) ->
    evaluate_poison_message_replay_path();
evaluate(schema_evolution_compatibility_path) ->
    evaluate_schema_evolution_compatibility_path();
evaluate(replay_idempotency_path) ->
    evaluate_replay_idempotency_path();
evaluate(slo_budget_compliance_path) ->
    evaluate_slo_budget_compliance_path().

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 76).
?DOC(" Run one scenario twice and require deterministic parity.\n").
-spec run_scenario(scenario()) -> scenario_outcome().
run_scenario(Scenario) ->
    {Passed_first, Signature_first} = evaluate(Scenario),
    {Passed_second, Signature_second} = evaluate(Scenario),
    %% Both the pass flag and the signature must agree between the two runs.
    Deterministic =
        {Passed_first, Signature_first} =:= {Passed_second, Signature_second},
    Passed = Passed_first andalso Passed_second andalso Deterministic,
    %% Only the first run's signature is recorded in the outcome.
    {scenario_outcome, Scenario, Passed, Deterministic, Signature_first}.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 57).
?DOC(" Run all M33 reliability scenarios.\n").
-spec run_matrix() -> report().
run_matrix() ->
    Scenarios = [
        connector_outage_recovery_path,
        poison_message_replay_path,
        schema_evolution_compatibility_path,
        replay_idempotency_path,
        slo_budget_compliance_path
    ],
    Outcomes = [run_scenario(Scenario) || Scenario <- Scenarios],
    Failed = count_failed(Outcomes),
    Nondeterministic = count_nondeterministic(Outcomes),
    {report, Outcomes, Failed, Nondeterministic}.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 92).
?DOC(" Scenario label.\n").
-spec scenario_label(scenario()) -> binary().
%% One clause per scenario atom; each label is the atom's own name as a binary.
scenario_label(connector_outage_recovery_path) ->
    <<"connector_outage_recovery_path"/utf8>>;
scenario_label(poison_message_replay_path) ->
    <<"poison_message_replay_path"/utf8>>;
scenario_label(schema_evolution_compatibility_path) ->
    <<"schema_evolution_compatibility_path"/utf8>>;
scenario_label(replay_idempotency_path) ->
    <<"replay_idempotency_path"/utf8>>;
scenario_label(slo_budget_compliance_path) ->
    <<"slo_budget_compliance_path"/utf8>>.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 103).
?DOC(" Stable pass/fail label.\n").
-spec pass_fail_label(scenario_outcome()) -> binary().
%% The pass flag is the third element of the outcome tuple.
pass_fail_label({scenario_outcome, _Scenario, true, _Deterministic, _Signature}) ->
    <<"pass"/utf8>>;
pass_fail_label({scenario_outcome, _Scenario, false, _Deterministic, _Signature}) ->
    <<"fail"/utf8>>.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 111).
?DOC(" Scenario signature accessor.\n").
-spec signature(scenario_outcome()) -> binary().
%% The signature is the last (fifth) element of the outcome tuple.
signature({scenario_outcome, _Scenario, _Passed, _Deterministic, Signature}) ->
    Signature.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 116).
?DOC(" Scenario accessor.\n").
-spec scenario(scenario_outcome()) -> scenario().
%% The scenario is the second element of the outcome tuple.
scenario({scenario_outcome, Scenario, _Passed, _Deterministic, _Signature}) ->
    Scenario.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 121).
?DOC(" Scenario determinism accessor.\n").
-spec deterministic(scenario_outcome()) -> boolean().
%% The determinism flag is the fourth element of the outcome tuple.
deterministic({scenario_outcome, _Scenario, _Passed, Deterministic, _Signature}) ->
    Deterministic.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 126).
?DOC(" Report outcomes.\n").
-spec outcomes(report()) -> list(scenario_outcome()).
%% The outcome list is the second element of the report tuple.
outcomes({report, Outcomes, _Failed, _Nondeterministic}) ->
    Outcomes.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 131).
?DOC(" Failed scenario count.\n").
-spec failed_scenarios(report()) -> integer().
%% The failed count is the third element of the report tuple.
failed_scenarios({report, _Outcomes, Failed, _Nondeterministic}) ->
    Failed.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 136).
?DOC(" Nondeterministic failure count.\n").
-spec nondeterministic_failures(report()) -> integer().
%% The nondeterministic count is the fourth element of the report tuple.
nondeterministic_failures({report, _Outcomes, _Failed, Nondeterministic}) ->
    Nondeterministic.

%% Renders a boolean as the binary `<<"true">>` or `<<"false">>`.
-file("src/lightspeed/ops/etl_reliability_harness.gleam", 672).
-spec bool_label(boolean()) -> binary().
bool_label(true) ->
    <<"true"/utf8>>;
bool_label(false) ->
    <<"false"/utf8>>.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 141).
?DOC(" Stable report signature.\n").
-spec report_signature(report()) -> binary().
report_signature(Report) ->
    %% One entry per outcome, shaped as
    %% `<label>=<pass|fail>:deterministic=<true|false>:<signature>`.
    Entries = [
        <<(scenario_label(erlang:element(2, Outcome)))/binary,
            "="/utf8,
            (pass_fail_label(Outcome))/binary,
            ":deterministic="/utf8,
            (bool_label(erlang:element(4, Outcome)))/binary,
            ":"/utf8,
            (erlang:element(5, Outcome))/binary>>
     || Outcome <- erlang:element(2, Report)
    ],
    join_with(<<";"/utf8>>, Entries).

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 157).
?DOC(" Deterministic snapshot signature for fixture drift gates.\n").
-spec snapshot_signature() -> binary().
snapshot_signature() ->
    %% Versioned prefix followed by the signature of a freshly run matrix.
    Version = erlang:integer_to_binary(1),
    Matrix_signature = report_signature(run_matrix()),
    <<"m33.snapshot.v"/utf8, Version/binary, "|"/utf8, Matrix_signature/binary>>.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 165).
?DOC(" Deterministic markdown report for fixture scripts.\n").
-spec snapshot_report_markdown() -> binary().
snapshot_report_markdown() ->
    Report = run_matrix(),
    Failed = failed_scenarios(Report),
    Nondeterministic = nondeterministic_failures(Report),
    %% "OK" only when nothing failed and nothing was nondeterministic.
    Status =
        case {Failed, Nondeterministic} of
            {0, 0} -> <<"OK"/utf8>>;
            _ -> <<"FAIL"/utf8>>
        end,
    Version_text = erlang:integer_to_binary(1),
    Failed_text = erlang:integer_to_binary(Failed),
    Nondeterministic_text = erlang:integer_to_binary(Nondeterministic),
    %% NOTE(review): snapshot_signature/0 runs the matrix again rather than
    %% reusing Report.
    Snapshot_text = snapshot_signature(),
    Report_text = report_signature(Report),
    <<"# ETL Reliability Fixture Report\n\n"/utf8,
        "snapshot_version: "/utf8, Version_text/binary, "\n"/utf8,
        "status: "/utf8, Status/binary, "\n"/utf8,
        "failed_scenarios: "/utf8, Failed_text/binary, "\n"/utf8,
        "nondeterministic_failures: "/utf8, Nondeterministic_text/binary, "\n\n"/utf8,
        "snapshot_signature: "/utf8, Snapshot_text/binary, "\n\n"/utf8,
        "report_signature: "/utf8, Report_text/binary, "\n"/utf8>>.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 208).
?DOC(" Budget-report observation accessor.\n").
-spec budget_observations(budget_report()) -> list(lightspeed@pipeline@slo:observation()).
%% Observations are the second element of the budget report tuple.
budget_observations({budget_report, Observations, _Results, _Failures}) ->
    Observations.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 213).
?DOC(" Budget-report results accessor.\n").
-spec budget_results(budget_report()) -> list(lightspeed@pipeline@slo:budget_result()).
%% Results are the third element of the budget report tuple.
budget_results({budget_report, _Observations, Results, _Failures}) ->
    Results.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 218).
?DOC(" Budget-report failed count accessor.\n").
-spec budget_failed_budgets(budget_report()) -> integer().
%% The failure count is the fourth element of the budget report tuple.
budget_failed_budgets({budget_report, _Observations, _Results, Failures}) ->
    Failures.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 236).
?DOC(" Deterministic budget snapshot signature for drift gates.\n").
-spec budget_snapshot_signature() -> binary().
budget_snapshot_signature() ->
    %% Versioned prefix followed by the signature of a freshly built budget report.
    Version = erlang:integer_to_binary(1),
    Budget_signature = budget_report_signature(run_budget_report()),
    <<"m33.slo.snapshot.v"/utf8, Version/binary, "|"/utf8, Budget_signature/binary>>.

-file("src/lightspeed/ops/etl_reliability_harness.gleam", 244).
?DOC(" Deterministic markdown budget report for scripts.\n").
-spec budget_report_markdown() -> binary().
budget_report_markdown() ->
    Report = run_budget_report(),
    %% "OK" only when the report's failure count (element 4) is exactly zero.
    Status =
        case erlang:element(4, Report) of
            0 -> <<"OK"/utf8>>;
            _ -> <<"FAIL"/utf8>>
        end,
    Snapshot_version_text = erlang:integer_to_binary(1),
    Budget_version_text = lightspeed@pipeline@slo:budget_version_label(),
    Failed_text = erlang:integer_to_binary(erlang:element(4, Report)),
    %% NOTE(review): budget_snapshot_signature/0 builds a second budget report
    %% rather than reusing Report.
    Snapshot_text = budget_snapshot_signature(),
    Report_text = budget_report_signature(Report),
    <<"# ETL SLO Budget Report\n\n"/utf8,
        "budget_snapshot_version: "/utf8, Snapshot_version_text/binary, "\n"/utf8,
        "budget_version: "/utf8, Budget_version_text/binary, "\n"/utf8,
        "status: "/utf8, Status/binary, "\n"/utf8,
        "failed_budgets: "/utf8, Failed_text/binary, "\n\n"/utf8,
        "budget_snapshot_signature: "/utf8, Snapshot_text/binary, "\n\n"/utf8,
        "budget_report_signature: "/utf8, Report_text/binary, "\n"/utf8>>.