-module(lightspeed@ops@connector_harness).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/ops/connector_harness.gleam").
-export([run_scenario/1, run_matrix/0, scenario_label/1, pass_fail_label/1, signature/1, scenario/1, deterministic/1, outcomes/1, failed_scenarios/1, nondeterministic_failures/1, report_signature/1, snapshot_signature/0, snapshot_report_markdown/0]).
-export_type([scenario/0, scenario_outcome/0, report/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Deterministic connector/orchestration conformance harness for M32.\n").
-type scenario() :: connector_success_path |
connector_failure_path |
backpressure_bounded_queue_path |
operator_workflow_path.
-type scenario_outcome() :: {scenario_outcome,
scenario(),
boolean(),
boolean(),
binary()}.
-type report() :: {report, list(scenario_outcome()), integer(), integer()}.
-file("src/lightspeed/ops/connector_harness.gleam", 531).
-spec count_nondeterministic(list(scenario_outcome())) -> integer().
count_nondeterministic(Outcomes) ->
case Outcomes of
[] ->
0;
[Outcome | Rest] ->
case erlang:element(4, Outcome) of
true ->
count_nondeterministic(Rest);
false ->
1 + count_nondeterministic(Rest)
end
end.
-file("src/lightspeed/ops/connector_harness.gleam", 520).
-spec count_failed(list(scenario_outcome())) -> integer().
count_failed(Outcomes) ->
case Outcomes of
[] ->
0;
[Outcome | Rest] ->
case erlang:element(3, Outcome) of
true ->
count_failed(Rest);
false ->
1 + count_failed(Rest)
end
end.
-file("src/lightspeed/ops/connector_harness.gleam", 513).
-spec result_label({ok, nil} | {error, binary()}) -> binary().
result_label(Result) ->
case Result of
{ok, _} ->
<<"ok"/utf8>>;
{error, Reason} ->
<<"error:"/utf8, Reason/binary>>
end.
-file("src/lightspeed/ops/connector_harness.gleam", 484).
-spec fresh_runtime(lightspeed@async@backpressure:boundary()) -> lightspeed@pipeline@orchestrator:runtime().
fresh_runtime(Boundary) ->
Pipeline_runtime = begin
_pipe = lightspeed@pipeline:pipeline(
<<"orders_pipeline"/utf8>>,
{interval, 60000},
[lightspeed@pipeline:source_stage(
<<"extract_orders"/utf8>>,
<<"raw_order"/utf8>>
),
lightspeed@pipeline:transform_stage(
<<"normalize_orders"/utf8>>,
<<"raw_order"/utf8>>,
<<"normalized_order"/utf8>>
),
lightspeed@pipeline:sink_stage(
<<"write_orders"/utf8>>,
<<"normalized_order"/utf8>>
)]
),
lightspeed@pipeline:new(_pipe)
end,
lightspeed@pipeline@orchestrator:new(
Pipeline_runtime,
lightspeed@pipeline@connector:default_plan(),
Boundary
).
-file("src/lightspeed/ops/connector_harness.gleam", 363).
-spec evaluate_operator_workflow_path() -> {boolean(), binary()}.
evaluate_operator_workflow_path() ->
Runtime = fresh_runtime(lightspeed@async@backpressure:default_boundary()),
Runtime@1 = lightspeed@pipeline@orchestrator:start_run(Runtime, 400),
{Runtime@2, _} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@1,
<<"batch-op-1"/utf8>>,
<<"normalize_orders"/utf8>>,
5,
25,
<<"op-key-1"/utf8>>,
1
),
{Runtime@3, _} = lightspeed@pipeline@orchestrator:start_available(
Runtime@2,
401
),
{Runtime@4, _} = lightspeed@pipeline@orchestrator:ack_batch(
Runtime@3,
<<"batch-op-1"/utf8>>,
402
),
{Runtime@5, Pause_result} = lightspeed@pipeline@orchestrator:apply_action(
Runtime@4,
{pause, <<"maintenance"/utf8>>},
403
),
{Runtime@6, Paused_enqueue} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@5,
<<"batch-op-paused"/utf8>>,
<<"extract_orders"/utf8>>,
2,
20,
<<"op-key-paused"/utf8>>,
2
),
{Runtime@7, Resume_result} = lightspeed@pipeline@orchestrator:apply_action(
Runtime@6,
{resume, <<"maintenance_done"/utf8>>},
404
),
{Runtime@8, Drain_result} = lightspeed@pipeline@orchestrator:apply_action(
Runtime@7,
{drain, <<"drain_window"/utf8>>},
405
),
{Runtime@9, Draining_enqueue} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@8,
<<"batch-op-draining"/utf8>>,
<<"extract_orders"/utf8>>,
2,
20,
<<"op-key-draining"/utf8>>,
3
),
{Runtime@10, Resume_after_drain_result} = lightspeed@pipeline@orchestrator:apply_action(
Runtime@9,
{resume, <<"drain_complete"/utf8>>},
406
),
{Runtime@11, Replay_result} = lightspeed@pipeline@orchestrator:apply_action(
Runtime@10,
{replay, <<"normalize_orders"/utf8>>, <<"manual_replay"/utf8>>},
407
),
{Runtime@12, Resume_after_replay_result} = lightspeed@pipeline@orchestrator:apply_action(
Runtime@11,
{resume, <<"replay_complete"/utf8>>},
408
),
Paused_enqueue_label = result_label(Paused_enqueue),
Draining_enqueue_label = result_label(Draining_enqueue),
Lifecycle_label = lightspeed@pipeline:lifecycle_label(
lightspeed@pipeline:lifecycle(
lightspeed@pipeline@orchestrator:runtime_pipeline(Runtime@12)
)
),
Operator_signature = lightspeed@pipeline@operator:signature(
lightspeed@pipeline@orchestrator:runtime_operator(Runtime@12)
),
Passed = (((((((((((Pause_result =:= {ok, nil}) andalso (Resume_result =:= {ok,
nil}))
andalso (Drain_result =:= {ok, nil}))
andalso (Resume_after_drain_result =:= {ok, nil}))
andalso (Replay_result =:= {ok, nil}))
andalso (Resume_after_replay_result =:= {ok, nil}))
andalso gleam_stdlib:string_starts_with(
Paused_enqueue_label,
<<"error:operator_blocked:paused:"/utf8>>
))
andalso gleam_stdlib:string_starts_with(
Draining_enqueue_label,
<<"error:operator_blocked:draining:"/utf8>>
))
andalso gleam_stdlib:contains_string(
Lifecycle_label,
<<"replaying:run-2-replay:normalize_orders:"/utf8>>
))
andalso gleam_stdlib:contains_string(
Operator_signature,
<<"action=replay:normalize_orders:manual_replay"/utf8>>
))
andalso lightspeed@pipeline@operator:can_enqueue(
lightspeed@pipeline@orchestrator:runtime_operator(Runtime@12)
))
andalso lightspeed@pipeline@orchestrator:valid(Runtime@12),
{Passed,
<<<<<<<<<<<<<<"paused_enqueue="/utf8, Paused_enqueue_label/binary>>/binary,
"|draining_enqueue="/utf8>>/binary,
Draining_enqueue_label/binary>>/binary,
"|lifecycle="/utf8>>/binary,
Lifecycle_label/binary>>/binary,
"|operator="/utf8>>/binary,
Operator_signature/binary>>}.
-file("src/lightspeed/ops/connector_harness.gleam", 542).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
case Values of
[] ->
<<""/utf8>>;
[Value] ->
Value;
[Value@1 | Rest] ->
<<<<Value@1/binary, Separator/binary>>/binary,
(join_with(Separator, Rest))/binary>>
end.
-file("src/lightspeed/ops/connector_harness.gleam", 304).
-spec evaluate_backpressure_bounded_queue_path() -> {boolean(), binary()}.
evaluate_backpressure_bounded_queue_path() ->
Bounded = lightspeed@async@backpressure:boundary(push_pull, 1, 2, 250),
Runtime = begin
_pipe = fresh_runtime(Bounded),
lightspeed@pipeline@orchestrator:start_run(_pipe, 300)
end,
{Runtime@1, Enqueue_one} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime,
<<"batch-bp-1"/utf8>>,
<<"extract_orders"/utf8>>,
4,
30,
<<"bp-key-1"/utf8>>,
1
),
{Runtime@2, Enqueue_two} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@1,
<<"batch-bp-2"/utf8>>,
<<"extract_orders"/utf8>>,
4,
30,
<<"bp-key-2"/utf8>>,
2
),
{Runtime@3, Enqueue_three} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@2,
<<"batch-bp-3"/utf8>>,
<<"extract_orders"/utf8>>,
4,
30,
<<"bp-key-3"/utf8>>,
3
),
{Runtime@4, Started} = lightspeed@pipeline@orchestrator:start_available(
Runtime@3,
301
),
Third_label = result_label(Enqueue_three),
Passed = ((((((Enqueue_one =:= {ok, nil}) andalso (Enqueue_two =:= {ok, nil}))
andalso gleam_stdlib:string_starts_with(
Third_label,
<<"error:backpressure:queue_saturated:"/utf8>>
))
andalso (lightspeed@pipeline@orchestrator:queued_count(Runtime@4) =< 2))
andalso (lightspeed@pipeline@orchestrator:in_flight_count(Runtime@4) =< 1))
andalso (erlang:length(Started) =< 1))
andalso lightspeed@pipeline@orchestrator:valid(Runtime@4),
{Passed,
<<<<<<<<<<<<<<"third_enqueue="/utf8, Third_label/binary>>/binary,
"|started="/utf8>>/binary,
(join_with(<<","/utf8>>, Started))/binary>>/binary,
"|queue="/utf8>>/binary,
(erlang:integer_to_binary(
lightspeed@pipeline@orchestrator:queued_count(Runtime@4)
))/binary>>/binary,
"|in_flight="/utf8>>/binary,
(erlang:integer_to_binary(
lightspeed@pipeline@orchestrator:in_flight_count(Runtime@4)
))/binary>>}.
-file("src/lightspeed/ops/connector_harness.gleam", 504).
-spec process_outcome_result_label(
{ok, lightspeed@pipeline@orchestrator:process_outcome()} | {error, binary()}
) -> binary().
process_outcome_result_label(Result) ->
case Result of
{ok, Outcome} ->
lightspeed@pipeline@orchestrator:process_outcome_label(Outcome);
{error, Reason} ->
<<"error:"/utf8, Reason/binary>>
end.
-file("src/lightspeed/ops/connector_harness.gleam", 248).
-spec evaluate_connector_failure_path() -> {boolean(), binary()}.
evaluate_connector_failure_path() ->
Runtime = fresh_runtime(lightspeed@async@backpressure:default_boundary()),
Runtime@1 = lightspeed@pipeline@orchestrator:start_run(Runtime, 200),
{Runtime@2, _} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@1,
<<"batch-failure-1"/utf8>>,
<<"normalize_orders"/utf8>>,
6,
55,
<<"failure-key-1"/utf8>>,
1
),
{Runtime@3, _} = lightspeed@pipeline@orchestrator:start_available(
Runtime@2,
201
),
{Runtime@4, Retry_outcome} = lightspeed@pipeline@orchestrator:fail_batch(
Runtime@3,
<<"batch-failure-1"/utf8>>,
1,
<<"timeout"/utf8>>,
202
),
{Runtime@5, _} = lightspeed@pipeline@orchestrator:start_available(
Runtime@4,
203
),
{Runtime@6, Dead_letter_outcome} = lightspeed@pipeline@orchestrator:fail_batch(
Runtime@5,
<<"batch-failure-1"/utf8>>,
3,
<<"poison_payload"/utf8>>,
204
),
Retry_label = process_outcome_result_label(Retry_outcome),
Dead_letter_label = process_outcome_result_label(Dead_letter_outcome),
Telemetry_label = lightspeed@pipeline@telemetry:label(
lightspeed@pipeline:runtime_telemetry(
lightspeed@pipeline@orchestrator:runtime_pipeline(Runtime@6)
)
),
Passed = ((((((gleam_stdlib:string_starts_with(
Retry_label,
<<"retry_scheduled:"/utf8>>
)
andalso gleam_stdlib:string_starts_with(
Dead_letter_label,
<<"dead_lettered:"/utf8>>
))
andalso gleam_stdlib:contains_string(Telemetry_label, <<"retries=1"/utf8>>))
andalso gleam_stdlib:contains_string(
Telemetry_label,
<<"dead_letters=1"/utf8>>
))
andalso (erlang:length(lightspeed@pipeline@orchestrator:retries(Runtime@6))
=:= 1))
andalso (erlang:length(
lightspeed@pipeline@orchestrator:dead_letters(Runtime@6)
)
=:= 1))
andalso (lightspeed@pipeline@orchestrator:batches(Runtime@6) =:= []))
andalso lightspeed@pipeline@orchestrator:valid(Runtime@6),
{Passed,
<<<<<<<<<<<<<<"retry="/utf8, Retry_label/binary>>/binary,
"|dead_letter="/utf8>>/binary,
Dead_letter_label/binary>>/binary,
"|telemetry="/utf8>>/binary,
Telemetry_label/binary>>/binary,
"|runtime="/utf8>>/binary,
(lightspeed@pipeline@orchestrator:signature(Runtime@6))/binary>>}.
-file("src/lightspeed/ops/connector_harness.gleam", 557).
-spec started_has_key(list(binary()), binary()) -> boolean().
started_has_key(Entries, Key) ->
case Entries of
[] ->
false;
[Entry | Rest] ->
case gleam_stdlib:string_starts_with(
Entry,
<<Key/binary, ":"/utf8>>
) of
true ->
true;
false ->
started_has_key(Rest, Key)
end
end.
-file("src/lightspeed/ops/connector_harness.gleam", 188).
-spec evaluate_connector_success_path() -> {boolean(), binary()}.
evaluate_connector_success_path() ->
Runtime = fresh_runtime(lightspeed@async@backpressure:default_boundary()),
Runtime@1 = lightspeed@pipeline@orchestrator:start_run(Runtime, 100),
{Runtime@2, Enqueue_source} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@1,
<<"batch-source-1"/utf8>>,
<<"extract_orders"/utf8>>,
8,
40,
<<"source-key-1"/utf8>>,
1
),
{Runtime@3, Started_source} = lightspeed@pipeline@orchestrator:start_available(
Runtime@2,
101
),
{Runtime@4, Source_outcome} = lightspeed@pipeline@orchestrator:ack_batch(
Runtime@3,
<<"batch-source-1"/utf8>>,
102
),
{Runtime@5, Enqueue_sink} = lightspeed@pipeline@orchestrator:enqueue_batch(
Runtime@4,
<<"batch-sink-1"/utf8>>,
<<"write_orders"/utf8>>,
8,
18,
<<"sink-key-1"/utf8>>,
2
),
{Runtime@6, Started_sink} = lightspeed@pipeline@orchestrator:start_available(
Runtime@5,
103
),
{Runtime@7, Sink_outcome} = lightspeed@pipeline@orchestrator:ack_batch(
Runtime@6,
<<"batch-sink-1"/utf8>>,
104
),
Runtime@8 = lightspeed@pipeline@orchestrator:complete_run(Runtime@7, 110),
Source_label = process_outcome_result_label(Source_outcome),
Sink_label = process_outcome_result_label(Sink_outcome),
Passed = ((((((((((Enqueue_source =:= {ok, nil}) andalso (Enqueue_sink =:= {ok,
nil}))
andalso started_has_key(Started_source, <<"batch-source-1"/utf8>>))
andalso started_has_key(Started_sink, <<"batch-sink-1"/utf8>>))
andalso gleam_stdlib:contains_string(
Source_label,
<<"processed:applied:"/utf8>>
))
andalso gleam_stdlib:contains_string(
Sink_label,
<<"processed:applied:"/utf8>>
))
andalso (lightspeed@pipeline@orchestrator:queued_count(Runtime@8) =:= 0))
andalso (lightspeed@pipeline@orchestrator:in_flight_count(Runtime@8) =:= 0))
andalso (lightspeed@pipeline@orchestrator:retries(Runtime@8) =:= []))
andalso (lightspeed@pipeline@orchestrator:dead_letters(Runtime@8) =:= []))
andalso lightspeed@pipeline@orchestrator:valid(Runtime@8),
{Passed,
<<<<<<<<<<<<<<"source="/utf8, Source_label/binary>>/binary,
"|sink="/utf8>>/binary,
Sink_label/binary>>/binary,
"|connector="/utf8>>/binary,
(lightspeed@pipeline@connector:signature(
lightspeed@pipeline@orchestrator:runtime_connector_plan(
Runtime@8
)
))/binary>>/binary,
"|runtime="/utf8>>/binary,
(lightspeed@pipeline@orchestrator:signature(Runtime@8))/binary>>}.
-file("src/lightspeed/ops/connector_harness.gleam", 179).
-spec evaluate(scenario()) -> {boolean(), binary()}.
evaluate(Scenario) ->
case Scenario of
connector_success_path ->
evaluate_connector_success_path();
connector_failure_path ->
evaluate_connector_failure_path();
backpressure_bounded_queue_path ->
evaluate_backpressure_bounded_queue_path();
operator_workflow_path ->
evaluate_operator_workflow_path()
end.
-file("src/lightspeed/ops/connector_harness.gleam", 61).
?DOC(" Run one scenario twice and require deterministic parity.\n").
-spec run_scenario(scenario()) -> scenario_outcome().
run_scenario(Scenario) ->
{First_passed, First_signature} = evaluate(Scenario),
{Second_passed, Second_signature} = evaluate(Scenario),
Deterministic = (First_passed =:= Second_passed) andalso (First_signature
=:= Second_signature),
Passed = (First_passed andalso Second_passed) andalso Deterministic,
{scenario_outcome, Scenario, Passed, Deterministic, First_signature}.
-file("src/lightspeed/ops/connector_harness.gleam", 43).
?DOC(" Run all M32 scenarios.\n").
-spec run_matrix() -> report().
run_matrix() ->
Outcomes = begin
_pipe = [connector_success_path,
connector_failure_path,
backpressure_bounded_queue_path,
operator_workflow_path],
gleam@list:map(_pipe, fun run_scenario/1)
end,
{report, Outcomes, count_failed(Outcomes), count_nondeterministic(Outcomes)}.
-file("src/lightspeed/ops/connector_harness.gleam", 77).
?DOC(" Scenario label.\n").
-spec scenario_label(scenario()) -> binary().
scenario_label(Scenario) ->
case Scenario of
connector_success_path ->
<<"connector_success_path"/utf8>>;
connector_failure_path ->
<<"connector_failure_path"/utf8>>;
backpressure_bounded_queue_path ->
<<"backpressure_bounded_queue_path"/utf8>>;
operator_workflow_path ->
<<"operator_workflow_path"/utf8>>
end.
-file("src/lightspeed/ops/connector_harness.gleam", 87).
?DOC(" Stable pass/fail label.\n").
-spec pass_fail_label(scenario_outcome()) -> binary().
pass_fail_label(Outcome) ->
case erlang:element(3, Outcome) of
true ->
<<"pass"/utf8>>;
false ->
<<"fail"/utf8>>
end.
-file("src/lightspeed/ops/connector_harness.gleam", 95).
?DOC(" Scenario signature.\n").
-spec signature(scenario_outcome()) -> binary().
signature(Outcome) ->
erlang:element(5, Outcome).
-file("src/lightspeed/ops/connector_harness.gleam", 100).
?DOC(" Scenario accessor.\n").
-spec scenario(scenario_outcome()) -> scenario().
scenario(Outcome) ->
erlang:element(2, Outcome).
-file("src/lightspeed/ops/connector_harness.gleam", 105).
?DOC(" Determinism accessor.\n").
-spec deterministic(scenario_outcome()) -> boolean().
deterministic(Outcome) ->
erlang:element(4, Outcome).
-file("src/lightspeed/ops/connector_harness.gleam", 110).
?DOC(" Report outcomes.\n").
-spec outcomes(report()) -> list(scenario_outcome()).
outcomes(Report) ->
erlang:element(2, Report).
-file("src/lightspeed/ops/connector_harness.gleam", 115).
?DOC(" Failed scenario count.\n").
-spec failed_scenarios(report()) -> integer().
failed_scenarios(Report) ->
erlang:element(3, Report).
-file("src/lightspeed/ops/connector_harness.gleam", 120).
?DOC(" Nondeterministic failure count.\n").
-spec nondeterministic_failures(report()) -> integer().
nondeterministic_failures(Report) ->
erlang:element(4, Report).
-file("src/lightspeed/ops/connector_harness.gleam", 550).
-spec bool_label(boolean()) -> binary().
bool_label(Value) ->
case Value of
true ->
<<"true"/utf8>>;
false ->
<<"false"/utf8>>
end.
-file("src/lightspeed/ops/connector_harness.gleam", 125).
?DOC(" Stable report signature.\n").
-spec report_signature(report()) -> binary().
report_signature(Report) ->
Entries = gleam@list:map(
erlang:element(2, Report),
fun(Outcome) ->
<<<<<<<<<<<<(scenario_label(erlang:element(2, Outcome)))/binary,
"="/utf8>>/binary,
(pass_fail_label(Outcome))/binary>>/binary,
":deterministic="/utf8>>/binary,
(bool_label(erlang:element(4, Outcome)))/binary>>/binary,
":"/utf8>>/binary,
(erlang:element(5, Outcome))/binary>>
end
),
join_with(<<";"/utf8>>, Entries).
-file("src/lightspeed/ops/connector_harness.gleam", 141).
?DOC(" Deterministic snapshot signature for fixture drift gates.\n").
-spec snapshot_signature() -> binary().
snapshot_signature() ->
<<<<<<"m32.snapshot.v"/utf8, (erlang:integer_to_binary(1))/binary>>/binary,
"|"/utf8>>/binary,
(report_signature(run_matrix()))/binary>>.
-file("src/lightspeed/ops/connector_harness.gleam", 149).
?DOC(" Deterministic markdown report for fixture scripts.\n").
-spec snapshot_report_markdown() -> binary().
snapshot_report_markdown() ->
Report = run_matrix(),
Failed = failed_scenarios(Report),
Nondeterministic = nondeterministic_failures(Report),
Status = case (Failed =:= 0) andalso (Nondeterministic =:= 0) of
true ->
<<"OK"/utf8>>;
false ->
<<"FAIL"/utf8>>
end,
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"# Connector Fixture Report\n\n"/utf8,
"snapshot_version: "/utf8>>/binary,
(erlang:integer_to_binary(
1
))/binary>>/binary,
"\n"/utf8>>/binary,
"status: "/utf8>>/binary,
Status/binary>>/binary,
"\n"/utf8>>/binary,
"failed_scenarios: "/utf8>>/binary,
(erlang:integer_to_binary(
Failed
))/binary>>/binary,
"\n"/utf8>>/binary,
"nondeterministic_failures: "/utf8>>/binary,
(erlang:integer_to_binary(Nondeterministic))/binary>>/binary,
"\n\n"/utf8>>/binary,
"snapshot_signature: "/utf8>>/binary,
(snapshot_signature())/binary>>/binary,
"\n\n"/utf8>>/binary,
"report_signature: "/utf8>>/binary,
(report_signature(Report))/binary>>/binary,
"\n"/utf8>>.