-module(lightspeed@ops@pipeline_harness).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/ops/pipeline_harness.gleam").
-export([run_scenario/1, run_matrix/0, scenario_label/1, pass_fail_label/1, signature/1, deterministic/1, scenario/1, outcomes/1, failed_scenarios/1, nondeterministic_failures/1, report_signature/1, snapshot_signature/0, snapshot_report_markdown/0]).
-export_type([scenario/0, scenario_outcome/0, report/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Deterministic integrated data-pipeline harness for M31.\n").
-type scenario() :: success_path |
partial_failure_path |
replay_path |
crash_resume_checkpoint_path |
idempotency_sink_path.
-type scenario_outcome() :: {scenario_outcome,
scenario(),
boolean(),
boolean(),
binary()}.
-type report() :: {report, list(scenario_outcome()), integer(), integer()}.
-file("src/lightspeed/ops/pipeline_harness.gleam", 453).
%% Counts the outcomes whose determinism flag (tuple element 4) is `false`.
%% An empty list yields 0.
-spec count_nondeterministic(list(scenario_outcome())) -> integer().
count_nondeterministic(Outcomes) ->
    lists:foldl(
        fun(Outcome, Total) ->
            %% Element 4 is the `deterministic` boolean of the outcome tuple.
            case erlang:element(4, Outcome) of
                true -> Total;
                false -> Total + 1
            end
        end,
        0,
        Outcomes
    ).
-file("src/lightspeed/ops/pipeline_harness.gleam", 442).
%% Counts the outcomes whose pass flag (tuple element 3) is `false`.
%% An empty list yields 0.
-spec count_failed(list(scenario_outcome())) -> integer().
count_failed(Outcomes) ->
    lists:foldl(
        fun(Outcome, Total) ->
            %% Element 3 is the `passed` boolean of the outcome tuple.
            case erlang:element(3, Outcome) of
                true -> Total;
                false -> Total + 1
            end
        end,
        0,
        Outcomes
    ).
-file("src/lightspeed/ops/pipeline_harness.gleam", 464).
%% Joins `Values` into a single binary, placing `Separator` between
%% neighbouring elements.
%%
%% Returns the empty binary for `[]` and the element itself for a
%% one-element list.
%%
%% The separator is interleaved with `lists:join/2` and the result is
%% flattened once with `erlang:iolist_to_binary/1`. This avoids rebuilding a
%% fresh binary around the already-joined tail at every recursion level,
%% which made the copying cost quadratic in the output size.
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).
-file("src/lightspeed/ops/pipeline_harness.gleam", 429).
%% Builds a new runtime for the `orders_pipeline` definition used by every
%% scenario in this module.
%%
%% The definition has three stages, in order:
%%   * `extract_orders`   - source stage producing `raw_order`
%%   * `normalize_orders` - transform stage, `raw_order` -> `normalized_order`
%%   * `write_orders`     - sink stage consuming `normalized_order`
%% and is created with `{interval, 60000}` as its second argument.
-spec fresh_runtime() -> lightspeed@pipeline:runtime().
fresh_runtime() ->
    Extract = lightspeed@pipeline:source_stage(
        <<"extract_orders"/utf8>>,
        <<"raw_order"/utf8>>
    ),
    Normalize = lightspeed@pipeline:transform_stage(
        <<"normalize_orders"/utf8>>,
        <<"raw_order"/utf8>>,
        <<"normalized_order"/utf8>>
    ),
    Write = lightspeed@pipeline:sink_stage(
        <<"write_orders"/utf8>>,
        <<"normalized_order"/utf8>>
    ),
    Definition = lightspeed@pipeline:pipeline(
        <<"orders_pipeline"/utf8>>,
        {interval, 60000},
        [Extract, Normalize, Write]
    ),
    lightspeed@pipeline:new(Definition).
-file("src/lightspeed/ops/pipeline_harness.gleam", 387).
%% Idempotency scenario for the `write_orders` sink stage.
%%
%% Starts a fresh runtime, processes one batch on `extract_orders` and one on
%% `normalize_orders`, then calls the sink three times: twice with the key
%% `shared-key` and once with `shared-key-2`, and finally completes the run.
%%
%% The scenario passes when all of the following hold:
%%   * the first and third sink results are labelled `applied:...`
%%   * the second sink result is labelled exactly
%%     `duplicate_suppressed:write_orders:shared-key`
%%   * the completed runtime reports four checkpoints
%%   * the recorded sink keys are `[shared-key, shared-key-2]`
%%
%% Returns `{Passed, Signature}` where `Signature` has the form
%% `first=...|duplicate=...|third=...|sink_keys=...`.
-spec evaluate_idempotency_sink_path() -> {boolean(), binary()}.
evaluate_idempotency_sink_path() ->
    Started = lightspeed@pipeline:start(fresh_runtime(), 400),
    {Extracted, _} = lightspeed@pipeline:process(
        Started, <<"extract_orders"/utf8>>, 2, 20, <<"source-batch-5"/utf8>>, 401
    ),
    {Normalized, _} = lightspeed@pipeline:process(
        Extracted, <<"normalize_orders"/utf8>>, 2, 18, <<"transform-batch-5"/utf8>>, 402
    ),
    {SunkOnce, FirstSink} = lightspeed@pipeline:process(
        Normalized, <<"write_orders"/utf8>>, 2, 15, <<"shared-key"/utf8>>, 403
    ),
    %% Same key as the previous call: expected to be reported as a duplicate.
    {SunkTwice, SecondSink} = lightspeed@pipeline:process(
        SunkOnce, <<"write_orders"/utf8>>, 2, 15, <<"shared-key"/utf8>>, 404
    ),
    {SunkThrice, ThirdSink} = lightspeed@pipeline:process(
        SunkTwice, <<"write_orders"/utf8>>, 2, 12, <<"shared-key-2"/utf8>>, 405
    ),
    Completed = lightspeed@pipeline:complete(SunkThrice, 410),
    %% These labels feed both the pass check and the signature.
    FirstLabel = lightspeed@pipeline:process_result_label(FirstSink),
    SecondLabel = lightspeed@pipeline:process_result_label(SecondSink),
    ThirdLabel = lightspeed@pipeline:process_result_label(ThirdSink),
    SinkKeys = lightspeed@pipeline:sink_idempotency_keys(Completed),
    Passed =
        gleam_stdlib:string_starts_with(FirstLabel, <<"applied:"/utf8>>) andalso
        (SecondLabel =:= <<"duplicate_suppressed:write_orders:shared-key"/utf8>>) andalso
        gleam_stdlib:string_starts_with(ThirdLabel, <<"applied:"/utf8>>) andalso
        (erlang:length(lightspeed@pipeline:checkpoints(Completed)) =:= 4) andalso
        (SinkKeys =:= [<<"shared-key"/utf8>>, <<"shared-key-2"/utf8>>]),
    Signature = <<
        "first="/utf8, FirstLabel/binary,
        "|duplicate="/utf8, SecondLabel/binary,
        "|third="/utf8, ThirdLabel/binary,
        "|sink_keys="/utf8, (join_with(<<","/utf8>>, SinkKeys))/binary
    >>,
    {Passed, Signature}.
-file("src/lightspeed/ops/pipeline_harness.gleam", 334).
%% Crash/resume scenario driven by checkpoints.
%%
%% Starts a fresh runtime, processes one batch on `extract_orders` and one on
%% `normalize_orders`, crashes the runtime with reason `store_fault`, then
%% inspects the checkpoint list and resumes from the latest checkpoint.
%% If resuming returns `{error, _}` the crashed runtime is used as-is.
%%
%% The scenario passes when all of the following hold:
%%   * a latest checkpoint exists
%%   * the resumed lifecycle label contains
%%     `replaying:run-2-replay:normalize_orders:`
%%   * both the latest-checkpoint label and the resume-point label contain
%%     `stage=normalize_orders`
%%   * `lightspeed@pipeline:valid/1` accepts the resumed runtime
%%
%% Returns `{Passed, Signature}` where `Signature` has the form
%% `lifecycle=...|latest=...|resume=...`.
-spec evaluate_crash_resume_checkpoint_path() -> {boolean(), binary()}.
evaluate_crash_resume_checkpoint_path() ->
    Started = lightspeed@pipeline:start(fresh_runtime(), 300),
    {Extracted, _} = lightspeed@pipeline:process(
        Started, <<"extract_orders"/utf8>>, 6, 40, <<"source-batch-4"/utf8>>, 301
    ),
    {Normalized, _} = lightspeed@pipeline:process(
        Extracted, <<"normalize_orders"/utf8>>, 6, 25, <<"transform-batch-4"/utf8>>, 302
    ),
    Crashed = lightspeed@pipeline:crash(Normalized, <<"store_fault"/utf8>>),
    %% Checkpoints are read from the crashed runtime, before any resume.
    Checkpoints = lightspeed@pipeline:checkpoints(Crashed),
    Latest = lightspeed@pipeline@checkpoint:latest(Checkpoints),
    ResumeLookup = lightspeed@pipeline@checkpoint:resume_point_for_stage(
        Checkpoints,
        <<"normalize_orders"/utf8>>
    ),
    Resumed =
        case lightspeed@pipeline:resume_from_latest_checkpoint(Crashed, 310) of
            {ok, Recovered} -> Recovered;
            %% Fall back to the crashed runtime; the checks below then decide.
            {error, _} -> Crashed
        end,
    LifecycleLabel = lightspeed@pipeline:lifecycle_label(
        lightspeed@pipeline:lifecycle(Resumed)
    ),
    LatestLabel =
        case Latest of
            {some, Entry} -> lightspeed@pipeline@checkpoint:checkpoint_label(Entry);
            none -> <<"none"/utf8>>
        end,
    ResumeLabel =
        case ResumeLookup of
            {ok, Point} -> lightspeed@pipeline@checkpoint:resume_point_label(Point);
            {error, Reason} -> <<"error:"/utf8, Reason/binary>>
        end,
    Passed =
        (Latest =/= none) andalso
        gleam_stdlib:contains_string(
            LifecycleLabel,
            <<"replaying:run-2-replay:normalize_orders:"/utf8>>
        ) andalso
        gleam_stdlib:contains_string(LatestLabel, <<"stage=normalize_orders"/utf8>>) andalso
        gleam_stdlib:contains_string(ResumeLabel, <<"stage=normalize_orders"/utf8>>) andalso
        lightspeed@pipeline:valid(Resumed),
    Signature = <<
        "lifecycle="/utf8, LifecycleLabel/binary,
        "|latest="/utf8, LatestLabel/binary,
        "|resume="/utf8, ResumeLabel/binary
    >>,
    {Passed, Signature}.
-file("src/lightspeed/ops/pipeline_harness.gleam", 280).
%% Replay scenario: crash after the first sink write, resume, then replay.
%%
%% Starts a fresh runtime, processes one batch on each of `extract_orders`,
%% `normalize_orders` and `write_orders` (key `order-1`), crashes with reason
%% `node_restart` and resumes from the latest checkpoint (falling back to the
%% crashed runtime if resuming returns `{error, _}`). The sink is then called
%% again with `order-1` and with `order-2`, and the run is completed.
%%
%% The scenario passes when all of the following hold:
%%   * the pre-crash sink result and the `order-2` result are labelled
%%     `applied:...`
%%   * the replayed `order-1` result is labelled exactly
%%     `duplicate_suppressed:write_orders:order-1`
%%   * the completed runtime reports four checkpoints
%%   * the recorded sink keys are `[order-1, order-2]`
%%   * the lifecycle label starts with `completed:run-2-replay:`
%%
%% Returns `{Passed, Signature}` where `Signature` has the form
%% `lifecycle=...|duplicate=...|checkpoints=...|sink_keys=...`.
-spec evaluate_replay_path() -> {boolean(), binary()}.
evaluate_replay_path() ->
    Started = lightspeed@pipeline:start(fresh_runtime(), 200),
    {Extracted, _} = lightspeed@pipeline:process(
        Started, <<"extract_orders"/utf8>>, 5, 45, <<"source-batch-3"/utf8>>, 201
    ),
    {Normalized, _} = lightspeed@pipeline:process(
        Extracted, <<"normalize_orders"/utf8>>, 5, 30, <<"transform-batch-3"/utf8>>, 202
    ),
    {Sunk, FirstSink} = lightspeed@pipeline:process(
        Normalized, <<"write_orders"/utf8>>, 5, 20, <<"order-1"/utf8>>, 203
    ),
    Crashed = lightspeed@pipeline:crash(Sunk, <<"node_restart"/utf8>>),
    Resumed =
        case lightspeed@pipeline:resume_from_latest_checkpoint(Crashed, 210) of
            {ok, Recovered} -> Recovered;
            %% Fall back to the crashed runtime; the checks below then decide.
            {error, _} -> Crashed
        end,
    %% Same key as the pre-crash sink write: expected to be a duplicate.
    {Replayed, DuplicateSink} = lightspeed@pipeline:process(
        Resumed, <<"write_orders"/utf8>>, 5, 20, <<"order-1"/utf8>>, 211
    ),
    {Advanced, SecondSink} = lightspeed@pipeline:process(
        Replayed, <<"write_orders"/utf8>>, 5, 15, <<"order-2"/utf8>>, 212
    ),
    Completed = lightspeed@pipeline:complete(Advanced, 220),
    %% Values that feed both the pass check and the signature.
    DuplicateLabel = lightspeed@pipeline:process_result_label(DuplicateSink),
    Checkpoints = lightspeed@pipeline:checkpoints(Completed),
    SinkKeys = lightspeed@pipeline:sink_idempotency_keys(Completed),
    LifecycleLabel = lightspeed@pipeline:lifecycle_label(
        lightspeed@pipeline:lifecycle(Completed)
    ),
    Passed =
        gleam_stdlib:string_starts_with(
            lightspeed@pipeline:process_result_label(FirstSink),
            <<"applied:"/utf8>>
        ) andalso
        (DuplicateLabel =:= <<"duplicate_suppressed:write_orders:order-1"/utf8>>) andalso
        gleam_stdlib:string_starts_with(
            lightspeed@pipeline:process_result_label(SecondSink),
            <<"applied:"/utf8>>
        ) andalso
        (erlang:length(Checkpoints) =:= 4) andalso
        (SinkKeys =:= [<<"order-1"/utf8>>, <<"order-2"/utf8>>]) andalso
        gleam_stdlib:string_starts_with(
            LifecycleLabel,
            <<"completed:run-2-replay:"/utf8>>
        ),
    Signature = <<
        "lifecycle="/utf8, LifecycleLabel/binary,
        "|duplicate="/utf8, DuplicateLabel/binary,
        "|checkpoints="/utf8,
        (lightspeed@pipeline@checkpoint:chain_signature(Checkpoints))/binary,
        "|sink_keys="/utf8, (join_with(<<","/utf8>>, SinkKeys))/binary
    >>,
    {Passed, Signature}.
-file("src/lightspeed/ops/pipeline_harness.gleam", 245).
%% Partial-failure scenario: one applied batch, one rejected call, then a
%% failed run.
%%
%% Starts a fresh runtime, processes one batch on `extract_orders`, calls
%% `process` with the undeclared stage `missing_stage`, records a retry value
%% of 2 and a dead-letter value of 1, and fails the run with reason
%% `transform_failed`.
%%
%% The scenario passes when all of the following hold:
%%   * the source result is labelled `applied:...`
%%   * the rejected result is labelled exactly
%%     `rejected:unknown_stage:missing_stage`
%%   * the lifecycle label starts with `failed:run-1:transform_failed`
%%   * the telemetry label contains `retries=2` and `dead_letters=1`
%%   * the failed runtime reports exactly one checkpoint
%%
%% Returns `{Passed, Signature}` where `Signature` has the form
%% `rejected=...|lifecycle=...|telemetry=...`.
-spec evaluate_partial_failure_path() -> {boolean(), binary()}.
evaluate_partial_failure_path() ->
    Started = lightspeed@pipeline:start(fresh_runtime(), 50),
    {Extracted, SourceResult} = lightspeed@pipeline:process(
        Started, <<"extract_orders"/utf8>>, 4, 70, <<"source-batch-2"/utf8>>, 51
    ),
    %% `missing_stage` is not one of the stages declared by fresh_runtime/0.
    {AfterReject, Rejected} = lightspeed@pipeline:process(
        Extracted, <<"missing_stage"/utf8>>, 4, 70, <<"missing-1"/utf8>>, 52
    ),
    Retried = lightspeed@pipeline:record_retry(AfterReject, 2),
    DeadLettered = lightspeed@pipeline:record_dead_letter(Retried, 1),
    Failed = lightspeed@pipeline:fail(DeadLettered, <<"transform_failed"/utf8>>),
    TelemetryLabel = lightspeed@pipeline@telemetry:label(
        lightspeed@pipeline:runtime_telemetry(Failed)
    ),
    %% Values that feed both the pass check and the signature.
    RejectedLabel = lightspeed@pipeline:process_result_label(Rejected),
    LifecycleLabel = lightspeed@pipeline:lifecycle_label(
        lightspeed@pipeline:lifecycle(Failed)
    ),
    Passed =
        gleam_stdlib:string_starts_with(
            lightspeed@pipeline:process_result_label(SourceResult),
            <<"applied:"/utf8>>
        ) andalso
        (RejectedLabel =:= <<"rejected:unknown_stage:missing_stage"/utf8>>) andalso
        gleam_stdlib:string_starts_with(
            LifecycleLabel,
            <<"failed:run-1:transform_failed"/utf8>>
        ) andalso
        gleam_stdlib:contains_string(TelemetryLabel, <<"retries=2"/utf8>>) andalso
        gleam_stdlib:contains_string(TelemetryLabel, <<"dead_letters=1"/utf8>>) andalso
        (erlang:length(lightspeed@pipeline:checkpoints(Failed)) =:= 1),
    Signature = <<
        "rejected="/utf8, RejectedLabel/binary,
        "|lifecycle="/utf8, LifecycleLabel/binary,
        "|telemetry="/utf8, TelemetryLabel/binary
    >>,
    {Passed, Signature}.
-file("src/lightspeed/ops/pipeline_harness.gleam", 190).
%% Success scenario: schedule, trigger, start, run all three stages, complete.
%%
%% A fresh runtime is scheduled, triggered with `interval_tick`, started, and
%% then processes one batch on each of `extract_orders`, `normalize_orders`
%% and `write_orders` (key `order-1`) before the run is completed.
%%
%% The scenario passes when all of the following hold:
%%   * all three process results are labelled `applied:...`
%%   * the lifecycle label starts with `completed:run-1:`
%%   * the completed runtime reports three checkpoints
%%   * the recorded sink keys are `[order-1]`
%%   * the telemetry label contains `dead_letters=0` and `retries=0`
%%
%% Returns `{Passed, Signature}` where `Signature` has the form
%% `lifecycle=...|telemetry=...|checkpoints=...|sink_keys=...`.
-spec evaluate_success_path() -> {boolean(), binary()}.
evaluate_success_path() ->
    Scheduled = lightspeed@pipeline:schedule(fresh_runtime(), 100),
    Triggered = lightspeed@pipeline:trigger(Scheduled, <<"interval_tick"/utf8>>, 100),
    Started = lightspeed@pipeline:start(Triggered, 110),
    {Extracted, SourceResult} = lightspeed@pipeline:process(
        Started, <<"extract_orders"/utf8>>, 10, 45, <<"source-batch-1"/utf8>>, 112
    ),
    {Normalized, TransformResult} = lightspeed@pipeline:process(
        Extracted, <<"normalize_orders"/utf8>>, 10, 30, <<"transform-batch-1"/utf8>>, 113
    ),
    {Sunk, SinkResult} = lightspeed@pipeline:process(
        Normalized, <<"write_orders"/utf8>>, 10, 20, <<"order-1"/utf8>>, 114
    ),
    Completed = lightspeed@pipeline:complete(Sunk, 120),
    TelemetryLabel = lightspeed@pipeline@telemetry:label(
        lightspeed@pipeline:runtime_telemetry(Completed)
    ),
    Checkpoints = lightspeed@pipeline:checkpoints(Completed),
    %% Values that feed both the pass check and the signature.
    LifecycleLabel = lightspeed@pipeline:lifecycle_label(
        lightspeed@pipeline:lifecycle(Completed)
    ),
    SinkKeys = lightspeed@pipeline:sink_idempotency_keys(Completed),
    Passed =
        gleam_stdlib:string_starts_with(
            lightspeed@pipeline:process_result_label(SourceResult),
            <<"applied:"/utf8>>
        ) andalso
        gleam_stdlib:string_starts_with(
            lightspeed@pipeline:process_result_label(TransformResult),
            <<"applied:"/utf8>>
        ) andalso
        gleam_stdlib:string_starts_with(
            lightspeed@pipeline:process_result_label(SinkResult),
            <<"applied:"/utf8>>
        ) andalso
        gleam_stdlib:string_starts_with(LifecycleLabel, <<"completed:run-1:"/utf8>>) andalso
        (erlang:length(Checkpoints) =:= 3) andalso
        (SinkKeys =:= [<<"order-1"/utf8>>]) andalso
        gleam_stdlib:contains_string(TelemetryLabel, <<"dead_letters=0"/utf8>>) andalso
        gleam_stdlib:contains_string(TelemetryLabel, <<"retries=0"/utf8>>),
    Signature = <<
        "lifecycle="/utf8, LifecycleLabel/binary,
        "|telemetry="/utf8, TelemetryLabel/binary,
        "|checkpoints="/utf8,
        (lightspeed@pipeline@checkpoint:chain_signature(Checkpoints))/binary,
        "|sink_keys="/utf8, (join_with(<<","/utf8>>, SinkKeys))/binary
    >>,
    {Passed, Signature}.
-file("src/lightspeed/ops/pipeline_harness.gleam", 180).
%% Dispatches a scenario atom to its evaluator and returns that evaluator's
%% `{Passed, Signature}` pair.
-spec evaluate(scenario()) -> {boolean(), binary()}.
evaluate(success_path) ->
    evaluate_success_path();
evaluate(partial_failure_path) ->
    evaluate_partial_failure_path();
evaluate(replay_path) ->
    evaluate_replay_path();
evaluate(crash_resume_checkpoint_path) ->
    evaluate_crash_resume_checkpoint_path();
evaluate(idempotency_sink_path) ->
    evaluate_idempotency_sink_path().
-file("src/lightspeed/ops/pipeline_harness.gleam", 61).
?DOC(" Run one scenario twice and require deterministic parity.\n").
%% Evaluates `Scenario` two times and compares the two `{Passed, Signature}`
%% results.
%%
%% The returned outcome tuple carries, in order: the scenario, the overall
%% pass flag (both runs passed and agreed), the determinism flag (both runs
%% produced the same pass flag and signature), and the signature of the
%% first run.
-spec run_scenario(scenario()) -> scenario_outcome().
run_scenario(Scenario) ->
    {PassedA, SignatureA} = evaluate(Scenario),
    {PassedB, SignatureB} = evaluate(Scenario),
    Deterministic = {PassedA, SignatureA} =:= {PassedB, SignatureB},
    Passed = PassedA andalso PassedB andalso Deterministic,
    {scenario_outcome, Scenario, Passed, Deterministic, SignatureA}.
-file("src/lightspeed/ops/pipeline_harness.gleam", 42).
?DOC(" Run all M31 scenarios.\n").
%% Runs every scenario, in the fixed order listed below, through
%% run_scenario/1 and wraps the outcomes in a report tuple together with the
%% number of failed outcomes and the number of nondeterministic outcomes.
-spec run_matrix() -> report().
run_matrix() ->
    Scenarios = [
        success_path,
        partial_failure_path,
        replay_path,
        crash_resume_checkpoint_path,
        idempotency_sink_path
    ],
    Outcomes = [run_scenario(Scenario) || Scenario <- Scenarios],
    FailedCount = count_failed(Outcomes),
    NondeterministicCount = count_nondeterministic(Outcomes),
    {report, Outcomes, FailedCount, NondeterministicCount}.
-file("src/lightspeed/ops/pipeline_harness.gleam", 77).
?DOC(" Scenario label.\n").
%% Maps each scenario atom to the binary spelling of its own name.
-spec scenario_label(scenario()) -> binary().
scenario_label(success_path) ->
    <<"success_path"/utf8>>;
scenario_label(partial_failure_path) ->
    <<"partial_failure_path"/utf8>>;
scenario_label(replay_path) ->
    <<"replay_path"/utf8>>;
scenario_label(crash_resume_checkpoint_path) ->
    <<"crash_resume_checkpoint_path"/utf8>>;
scenario_label(idempotency_sink_path) ->
    <<"idempotency_sink_path"/utf8>>.
-file("src/lightspeed/ops/pipeline_harness.gleam", 88).
?DOC(" Stable pass/fail label.\n").
%% Returns `pass` when the outcome's pass flag (third tuple element) is
%% `true`, and `fail` when it is `false`.
-spec pass_fail_label(scenario_outcome()) -> binary().
pass_fail_label({scenario_outcome, _Scenario, true, _Deterministic, _Signature}) ->
    <<"pass"/utf8>>;
pass_fail_label({scenario_outcome, _Scenario, false, _Deterministic, _Signature}) ->
    <<"fail"/utf8>>.
-file("src/lightspeed/ops/pipeline_harness.gleam", 96).
?DOC(" Scenario signature.\n").
%% Returns the signature binary stored as the last field of the outcome.
-spec signature(scenario_outcome()) -> binary().
signature({scenario_outcome, _Scenario, _Passed, _Deterministic, Signature}) ->
    Signature.
-file("src/lightspeed/ops/pipeline_harness.gleam", 101).
?DOC(" Scenario determinism accessor.\n").
%% Returns the determinism flag stored as the fourth field of the outcome.
-spec deterministic(scenario_outcome()) -> boolean().
deterministic({scenario_outcome, _Scenario, _Passed, Deterministic, _Signature}) ->
    Deterministic.
-file("src/lightspeed/ops/pipeline_harness.gleam", 106).
?DOC(" Scenario accessor.\n").
%% Returns the scenario atom stored as the second field of the outcome.
-spec scenario(scenario_outcome()) -> scenario().
scenario({scenario_outcome, Scenario, _Passed, _Deterministic, _Signature}) ->
    Scenario.
-file("src/lightspeed/ops/pipeline_harness.gleam", 111).
?DOC(" Report outcomes.\n").
%% Returns the outcome list stored as the second field of the report.
-spec outcomes(report()) -> list(scenario_outcome()).
outcomes({report, Outcomes, _Failed, _Nondeterministic}) ->
    Outcomes.
-file("src/lightspeed/ops/pipeline_harness.gleam", 116).
?DOC(" Failed scenario count.\n").
%% Returns the failed-scenario count stored as the third field of the report.
-spec failed_scenarios(report()) -> integer().
failed_scenarios({report, _Outcomes, Failed, _Nondeterministic}) ->
    Failed.
-file("src/lightspeed/ops/pipeline_harness.gleam", 121).
?DOC(" Nondeterministic scenario count.\n").
%% Returns the nondeterministic-scenario count stored as the fourth field of
%% the report.
-spec nondeterministic_failures(report()) -> integer().
nondeterministic_failures({report, _Outcomes, _Failed, Nondeterministic}) ->
    Nondeterministic.
-file("src/lightspeed/ops/pipeline_harness.gleam", 472).
%% Renders a boolean as the binary `true` or `false`.
-spec bool_label(boolean()) -> binary().
bool_label(true) ->
    <<"true"/utf8>>;
bool_label(false) ->
    <<"false"/utf8>>.
-file("src/lightspeed/ops/pipeline_harness.gleam", 126).
?DOC(" Stable report signature.\n").
%% Renders every outcome of the report as
%% `<scenario>=<pass|fail>:deterministic=<true|false>:<signature>` and joins
%% the entries with `;`, keeping the report's outcome order.
-spec report_signature(report()) -> binary().
report_signature(Report) ->
    Entries = [
        <<
            (scenario_label(erlang:element(2, Outcome)))/binary,
            "="/utf8,
            (pass_fail_label(Outcome))/binary,
            ":deterministic="/utf8,
            (bool_label(erlang:element(4, Outcome)))/binary,
            ":"/utf8,
            (erlang:element(5, Outcome))/binary
        >>
     || Outcome <- erlang:element(2, Report)
    ],
    join_with(<<";"/utf8>>, Entries).
-file("src/lightspeed/ops/pipeline_harness.gleam", 142).
?DOC(" Deterministic snapshot signature for fixture drift gates.\n").
%% Runs the full scenario matrix and returns its report signature prefixed
%% with the snapshot version tag, i.e. `m31.snapshot.v1|<report signature>`.
-spec snapshot_signature() -> binary().
snapshot_signature() ->
    versioned_snapshot_signature(report_signature(run_matrix())).
-file("src/lightspeed/ops/pipeline_harness.gleam", 150).
?DOC(" Deterministic markdown report for fixture scripts.\n").
%% Runs the scenario matrix once and renders a markdown document containing
%% the snapshot version, an overall status, both failure counters, the
%% versioned snapshot signature and the raw report signature.
%%
%% `status` is `OK` only when there are zero failed scenarios and zero
%% nondeterministic scenarios; otherwise it is `FAIL`.
%%
%% Every field is derived from the same report. Previously the snapshot
%% signature was obtained through snapshot_signature/0, which ran the whole
%% matrix a second time: the work was doubled, and the embedded signature
%% could describe a different run than the status and report signature
%% printed next to it.
-spec snapshot_report_markdown() -> binary().
snapshot_report_markdown() ->
    Report = run_matrix(),
    Failed = failed_scenarios(Report),
    Nondeterministic = nondeterministic_failures(Report),
    Status =
        case (Failed =:= 0) andalso (Nondeterministic =:= 0) of
            true -> <<"OK"/utf8>>;
            false -> <<"FAIL"/utf8>>
        end,
    ReportSignature = report_signature(Report),
    SnapshotSignature = versioned_snapshot_signature(ReportSignature),
    <<
        "# Pipeline Fixture Report\n\n"/utf8,
        "snapshot_version: "/utf8, (erlang:integer_to_binary(1))/binary, "\n"/utf8,
        "status: "/utf8, Status/binary, "\n"/utf8,
        "failed_scenarios: "/utf8, (erlang:integer_to_binary(Failed))/binary, "\n"/utf8,
        "nondeterministic_failures: "/utf8,
        (erlang:integer_to_binary(Nondeterministic))/binary, "\n\n"/utf8,
        "snapshot_signature: "/utf8, SnapshotSignature/binary, "\n\n"/utf8,
        "report_signature: "/utf8, ReportSignature/binary, "\n"/utf8
    >>.

%% Prefixes an already-rendered report signature with the snapshot version
%% tag. Shared by snapshot_signature/0 and snapshot_report_markdown/0 so both
%% always produce the same format.
-spec versioned_snapshot_signature(binary()) -> binary().
versioned_snapshot_signature(ReportSignature) ->
    <<
        "m31.snapshot.v"/utf8,
        (erlang:integer_to_binary(1))/binary,
        "|"/utf8,
        ReportSignature/binary
    >>.