//// Deterministic large-data and async-runtime expansion harness for M28.
import gleam/int
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/string
import lightspeed/async/backpressure
import lightspeed/data_plane
import lightspeed/diff
/// Version number embedded in `snapshot_signature` and in the markdown report.
pub const snapshot_version = 1
/// Version number rendered by `budget_version_label` as `m28.budget.v<N>`.
pub const budget_version = 1
/// M28 conformance scenarios.
///
/// Each variant is dispatched to its own evaluator by `evaluate` and mapped to
/// a stable string by `scenario_label`.
pub type Scenario {
/// Evaluated by `evaluate_large_list_window` (300 seeded list rows).
LargeListWindowedIncremental
/// Evaluated by `evaluate_large_grid_window` (420 seeded grid rows).
LargeGridWindowedIncremental
/// Evaluated by `evaluate_large_chart_window` (360 seeded chart rows).
LargeChartWindowedIncremental
/// Evaluated by `evaluate_async_backpressure`.
AsyncBackpressureRecovery
/// Evaluated by `evaluate_heavy_budget` (simulated benchmarks vs. default budgets).
HeavyBudgetEnforcement
}
/// One scenario outcome, as produced by `run_scenario`.
pub type ScenarioOutcome {
ScenarioOutcome(
// The scenario that was evaluated.
scenario: Scenario,
// True only when both evaluations passed and they agreed with each other.
passed: Bool,
// True when both evaluations returned the same pass flag and signature.
deterministic: Bool,
// Signature string returned by the first evaluation.
signature: String,
)
}
/// Full M28 report, as assembled by `run_matrix`.
pub type Report {
Report(
// One outcome per scenario, in the order `run_matrix` evaluated them.
outcomes: List(ScenarioOutcome),
// Number of outcomes whose `passed` flag is False.
failed_scenarios: Int,
// Number of outcomes whose `deterministic` flag is False.
nondeterministic_failures: Int,
)
}
/// Heavy-workload benchmark metrics.
///
/// Within this module every value is computed by `simulate_benchmark` from the
/// patches of one simulated update batch using fixed formulas.
pub type Benchmark {
Benchmark(
// Workload name; `find_benchmark` matches it against `Budget.workload`.
workload: String,
// Simulated median latency.
p50_latency_ms: Int,
// Simulated 95th-percentile latency; checked against `max_p95_latency_ms`.
p95_latency_ms: Int,
// Total encoded patch size divided by the number of patches.
avg_payload_bytes: Int,
// Patch count divided by a fixed divisor in `simulate_benchmark`.
avg_patch_ops: Int,
// Simulated peak memory figure; checked against `max_peak_memory_units`.
peak_memory_units: Int,
// Simulated throughput; checked against `min_throughput_events_per_second`.
throughput_events_per_second: Int,
// 1 when the patch list contains full root churn, otherwise 0.
root_churn_events: Int,
)
}
/// Versioned M28 budget profile.
///
/// `evaluate_one_budget` compares each limit with the benchmark field of the
/// same workload: `max_*` limits are upper bounds (`<=`), `min_*` limits are
/// lower bounds (`>=`).
pub type Budget {
Budget(
// Workload name used to look up the matching `Benchmark`.
workload: String,
// Upper bound for `Benchmark.p95_latency_ms`.
max_p95_latency_ms: Int,
// Upper bound for `Benchmark.avg_payload_bytes`.
max_avg_payload_bytes: Int,
// Upper bound for `Benchmark.avg_patch_ops`.
max_avg_patch_ops: Int,
// Upper bound for `Benchmark.peak_memory_units`.
max_peak_memory_units: Int,
// Lower bound for `Benchmark.throughput_events_per_second`.
min_throughput_events_per_second: Int,
// Upper bound for `Benchmark.root_churn_events`.
max_root_churn_events: Int,
)
}
/// Budget-check result for one workload.
///
/// `reason` is `"within_budget"` on success, `"missing"` when no benchmark has
/// the budget's workload name, and otherwise a comma-separated list of
/// `<check>=<true|false>` entries (see `evaluate_one_budget`).
pub type BudgetResult {
BudgetResult(workload: String, passed: Bool, reason: String)
}
/// Run all M28 scenarios.
///
/// Evaluates every `Scenario` variant in declaration order via `run_scenario`
/// and returns the outcomes together with the failure tallies.
pub fn run_matrix() -> Report {
  let scenarios = [
    LargeListWindowedIncremental,
    LargeGridWindowedIncremental,
    LargeChartWindowedIncremental,
    AsyncBackpressureRecovery,
    HeavyBudgetEnforcement,
  ]
  let results = list.map(scenarios, run_scenario)
  let failed = count_failed(results)
  let nondeterministic = count_nondeterministic(results)
  Report(
    outcomes: results,
    failed_scenarios: failed,
    nondeterministic_failures: nondeterministic,
  )
}
/// Run one scenario twice and require deterministic signature parity.
///
/// The outcome passes only when both runs passed and produced the same
/// `#(passed, signature)` pair; the reported signature is the first run's.
pub fn run_scenario(scenario: Scenario) -> ScenarioOutcome {
  let first_run = evaluate(scenario)
  let second_run = evaluate(scenario)
  // Tuple equality compares the pass flag and the signature in one step.
  let stable = first_run == second_run
  ScenarioOutcome(
    scenario: scenario,
    passed: first_run.0 && second_run.0 && stable,
    deterministic: stable,
    signature: first_run.1,
  )
}
/// Stable snake_case label for a scenario, used as the key in
/// `report_signature` entries.
pub fn scenario_label(scenario: Scenario) -> String {
  case scenario {
    HeavyBudgetEnforcement -> "heavy_budget_enforcement"
    AsyncBackpressureRecovery -> "async_backpressure_recovery"
    LargeChartWindowedIncremental -> "large_chart_windowed_incremental"
    LargeGridWindowedIncremental -> "large_grid_windowed_incremental"
    LargeListWindowedIncremental -> "large_list_windowed_incremental"
  }
}
/// Stable pass/fail label: `"pass"` when the outcome passed, otherwise `"fail"`.
pub fn pass_fail_label(outcome: ScenarioOutcome) -> String {
  pass_fail_label_from_bool(outcome.passed)
}
/// Signature string recorded on the outcome.
pub fn signature(outcome: ScenarioOutcome) -> String {
  let ScenarioOutcome(signature: recorded, ..) = outcome
  recorded
}
/// Whether both evaluations of the scenario agreed.
pub fn deterministic(outcome: ScenarioOutcome) -> Bool {
  let ScenarioOutcome(deterministic: stable, ..) = outcome
  stable
}
/// The scenario this outcome belongs to.
pub fn scenario(outcome: ScenarioOutcome) -> Scenario {
  let ScenarioOutcome(scenario: evaluated, ..) = outcome
  evaluated
}
/// All scenario outcomes stored in the report.
pub fn outcomes(report: Report) -> List(ScenarioOutcome) {
  let Report(outcomes: all, ..) = report
  all
}
/// Number of failed scenarios stored in the report.
pub fn failed_scenarios(report: Report) -> Int {
  let Report(failed_scenarios: failed, ..) = report
  failed
}
/// Number of nondeterministic scenarios stored in the report.
pub fn nondeterministic_failures(report: Report) -> Int {
  let Report(nondeterministic_failures: unstable, ..) = report
  unstable
}
/// Stable report signature.
///
/// Renders each outcome as
/// `<label>=<pass|fail>:deterministic=<true|false>:<signature>` and joins the
/// entries with `;`.
pub fn report_signature(report: Report) -> String {
  report.outcomes
  |> list.map(fn(outcome) {
    string.concat([
      scenario_label(outcome.scenario),
      "=",
      pass_fail_label(outcome),
      ":deterministic=",
      bool_label(outcome.deterministic),
      ":",
      outcome.signature,
    ])
  })
  |> join_with(";", _)
}
/// Label of the M28 benchmark budget profile version: `m28.budget.v<budget_version>`.
pub fn budget_version_label() -> String {
  budget_version
  |> int.to_string
  |> string.append("m28.budget.v", _)
}
/// Default M28 heavy-workload budgets.
///
/// Returns one budget for `large_grid` followed by one for `large_chart`,
/// matching the workload names produced by `run_heavy_benchmarks`.
pub fn default_budget() -> List(Budget) {
  let grid_budget =
    Budget(
      workload: "large_grid",
      max_p95_latency_ms: 95,
      max_avg_payload_bytes: 240,
      max_avg_patch_ops: 6,
      max_peak_memory_units: 520,
      min_throughput_events_per_second: 65,
      max_root_churn_events: 0,
    )
  let chart_budget =
    Budget(
      workload: "large_chart",
      max_p95_latency_ms: 120,
      max_avg_payload_bytes: 260,
      max_avg_patch_ops: 7,
      max_peak_memory_units: 540,
      min_throughput_events_per_second: 55,
      max_root_churn_events: 0,
    )
  [grid_budget, chart_budget]
}
/// Run the deterministic benchmark suite for M28.
///
/// Simulates the `large_grid` workload (window offset 120, limit 40) and the
/// `large_chart` workload (window offset 160, limit 50), in that order.
pub fn run_heavy_benchmarks() -> List(Benchmark) {
  let grid =
    simulate_benchmark(
      "large_grid",
      data_plane.GridDataset(columns: 8),
      120,
      40,
    )
  let chart =
    simulate_benchmark(
      "large_chart",
      data_plane.ChartDataset(series_key: "events"),
      160,
      50,
    )
  [grid, chart]
}
/// Evaluate the M28 benchmark suite against one budget profile.
///
/// Produces one `BudgetResult` per entry of `budgets`, in the same order.
pub fn evaluate_budget(
  benchmarks: List(Benchmark),
  budgets: List(Budget),
) -> List(BudgetResult) {
  budgets
  |> evaluate_budget_loop(benchmarks, _, [])
}
/// Number of budget results whose `passed` flag is False.
pub fn budget_failures(results: List(BudgetResult)) -> Int {
  list.fold(results, 0, fn(failures, result) {
    case result.passed {
      True -> failures
      False -> failures + 1
    }
  })
}
/// Stable benchmark signature.
///
/// Format:
/// `<workload>:p50=<n>:p95=<n>:avg_payload=<n>:avg_ops=<n>:peak_mem=<n>:throughput=<n>:root_churn=<n>`.
pub fn benchmark_signature(benchmark: Benchmark) -> String {
  string.concat([
    benchmark.workload,
    ":p50=",
    int.to_string(benchmark.p50_latency_ms),
    ":p95=",
    int.to_string(benchmark.p95_latency_ms),
    ":avg_payload=",
    int.to_string(benchmark.avg_payload_bytes),
    ":avg_ops=",
    int.to_string(benchmark.avg_patch_ops),
    ":peak_mem=",
    int.to_string(benchmark.peak_memory_units),
    ":throughput=",
    int.to_string(benchmark.throughput_events_per_second),
    ":root_churn=",
    int.to_string(benchmark.root_churn_events),
  ])
}
/// Deterministic snapshot signature for fixture drift gates.
///
/// Runs the full scenario matrix and returns
/// `m28.snapshot.v<snapshot_version>|<report_signature>`.
pub fn snapshot_signature() -> String {
  snapshot_signature_from(report_signature(run_matrix()))
}

/// Deterministic markdown report for fixture scripts.
///
/// The report lists the snapshot and budget versions, an overall `OK`/`FAIL`
/// status (`OK` only when there are no failed and no nondeterministic
/// scenarios), both failure counts, the snapshot signature and the report
/// signature.
pub fn snapshot_report_markdown() -> String {
  // Run the matrix and render its signature exactly once; both signature
  // lines below are derived from this single run so they always agree and
  // the scenarios are not evaluated a second time.
  let report = run_matrix()
  let matrix_signature = report_signature(report)
  let failed = failed_scenarios(report)
  let nondeterministic = nondeterministic_failures(report)
  let status = case failed == 0 && nondeterministic == 0 {
    True -> "OK"
    False -> "FAIL"
  }
  "# Large Data Fixture Report\n\n"
  <> "snapshot_version: "
  <> int.to_string(snapshot_version)
  <> "\n"
  <> "budget_version: "
  <> budget_version_label()
  <> "\n"
  <> "status: "
  <> status
  <> "\n"
  <> "failed_scenarios: "
  <> int.to_string(failed)
  <> "\n"
  <> "nondeterministic_failures: "
  <> int.to_string(nondeterministic)
  <> "\n\n"
  <> "snapshot_signature: "
  <> snapshot_signature_from(matrix_signature)
  <> "\n\n"
  <> "report_signature: "
  <> matrix_signature
  <> "\n"
}

/// Prefix an already-rendered report signature with the snapshot version tag.
fn snapshot_signature_from(matrix_signature: String) -> String {
  "m28.snapshot.v"
  <> int.to_string(snapshot_version)
  <> "|"
  <> matrix_signature
}
/// Dispatch a scenario to its evaluator and return `#(passed, signature)`.
fn evaluate(scenario: Scenario) -> #(Bool, String) {
  case scenario {
    HeavyBudgetEnforcement -> evaluate_heavy_budget()
    AsyncBackpressureRecovery -> evaluate_async_backpressure()
    LargeChartWindowedIncremental -> evaluate_large_chart_window()
    LargeGridWindowedIncremental -> evaluate_large_grid_window()
    LargeListWindowedIncremental -> evaluate_large_list_window()
  }
}
/// List scenario: seed 300 rows, apply two update batches against the same
/// window request (offset 120, limit 40) and require both patch lists to be
/// incremental without full root churn.
///
/// Returns `#(passed, signature)`; the signature records both patch counts and
/// the window result of the first batch.
fn evaluate_large_list_window() -> #(Bool, String) {
  let seed_plane =
    data_plane.new(
      "#large-list",
      data_plane.ListDataset,
      large_rows("list", 300, []),
    )
  let window_request = data_plane.request(120, 40)

  // First batch: patch one seeded id, delete one seeded id, add one new id.
  let #(updated_plane, window_result, batch_patches) =
    data_plane.apply_updates(seed_plane, window_request, [
      data_plane.Upsert(data_plane.row("list-130", "<li>list-130:patched</li>")),
      data_plane.Delete(id: "list-121"),
      data_plane.Upsert(data_plane.row("list-500", "<li>list-500:new</li>")),
    ])

  // Second batch runs on the updated plane with a single upsert; only its
  // patches are inspected.
  let #(_, _, followup_patches) =
    data_plane.apply_updates(updated_plane, window_request, [
      data_plane.Upsert(data_plane.row("list-2", "<li>list-2:x</li>")),
    ])

  let passed = case window_result {
    Error(_) -> False
    Ok(window) ->
      data_plane.valid(seed_plane)
      && data_plane.steady_state_incremental(batch_patches)
      && data_plane.steady_state_incremental(followup_patches)
      && !data_plane.contains_full_root_churn(batch_patches)
      && !data_plane.contains_full_root_churn(followup_patches)
      && window.total == 300
  }

  let summary =
    string.concat([
      "patch_ops=",
      int.to_string(list.length(batch_patches)),
      "|steady_ops=",
      int.to_string(list.length(followup_patches)),
      "|next=",
      window_result_signature(window_result),
    ])
  #(passed, summary)
}
/// Grid scenario: seed 420 rows in an 8-column grid, apply one update batch
/// against window offset 180 / limit 60, and require incremental patches, no
/// full root churn, a total of 420 and exactly 60 rows in the window.
///
/// Returns `#(passed, signature)`; the signature records the patch count and
/// the window result.
fn evaluate_large_grid_window() -> #(Bool, String) {
  let seed_plane =
    data_plane.new(
      "#large-grid",
      data_plane.GridDataset(columns: 8),
      large_rows("grid", 420, []),
    )
  let window_request = data_plane.request(180, 60)

  // Two upserts of seeded ids, one delete, one upsert of a new id.
  let #(_, window_result, batch_patches) =
    data_plane.apply_updates(seed_plane, window_request, [
      data_plane.Upsert(data_plane.row(
        "grid-200",
        row_payload("grid-200", "r1"),
      )),
      data_plane.Upsert(data_plane.row(
        "grid-219",
        row_payload("grid-219", "r2"),
      )),
      data_plane.Delete(id: "grid-181"),
      data_plane.Upsert(data_plane.row(
        "grid-451",
        row_payload("grid-451", "new"),
      )),
    ])

  let passed = case window_result {
    Error(_) -> False
    Ok(window) ->
      data_plane.valid(seed_plane)
      && data_plane.steady_state_incremental(batch_patches)
      && !data_plane.contains_full_root_churn(batch_patches)
      && window.total == 420
      && list.length(window.rows) == 60
  }

  let summary =
    string.concat([
      "patch_ops=",
      int.to_string(list.length(batch_patches)),
      "|next=",
      window_result_signature(window_result),
    ])
  #(passed, summary)
}
/// Chart scenario: seed 360 points in the `events` series, apply one update
/// batch against window offset 250 / limit 50, and require incremental
/// patches, no full root churn, a total of 359 and exactly 50 rows in the
/// window.
///
/// Returns `#(passed, signature)`; the signature records the patch count and
/// the window result.
fn evaluate_large_chart_window() -> #(Bool, String) {
  let seed_plane =
    data_plane.new(
      "#large-chart",
      data_plane.ChartDataset(series_key: "events"),
      large_rows("point", 360, []),
    )
  let window_request = data_plane.request(250, 50)

  // Two upserts of seeded ids and one delete (hence the expected total of 359).
  let #(_, window_result, batch_patches) =
    data_plane.apply_updates(seed_plane, window_request, [
      data_plane.Upsert(data_plane.row(
        "point-255",
        row_payload("point-255", "v1"),
      )),
      data_plane.Upsert(data_plane.row(
        "point-256",
        row_payload("point-256", "v2"),
      )),
      data_plane.Delete(id: "point-251"),
    ])

  let passed = case window_result {
    Error(_) -> False
    Ok(window) ->
      data_plane.valid(seed_plane)
      && data_plane.steady_state_incremental(batch_patches)
      && !data_plane.contains_full_root_churn(batch_patches)
      && window.total == 359
      && list.length(window.rows) == 50
  }

  let summary =
    string.concat([
      "patch_ops=",
      int.to_string(list.length(batch_patches)),
      "|next=",
      window_result_signature(window_result),
    ])
  #(passed, summary)
}
/// Backpressure scenario: drive one runtime through enqueue, overflow, start,
/// fail, cancel, retry, restart and succeed steps, then check every
/// intermediate result and the final runtime state.
///
/// Returns `#(passed, signature)`; the signature records the overflow result,
/// the sizes of both started batches and `backpressure.signature(runtime)`.
fn evaluate_async_backpressure() -> #(Bool, String) {
// NOTE(review): the meaning of the numeric boundary arguments (2, 2, 200) is
// defined in `backpressure`; the `max_queued: 2` expected below presumably
// corresponds to one of them — confirm against that module.
let runtime =
backpressure.new(backpressure.boundary(backpressure.PushPull, 2, 2, 200))
// Queue in reverse semantic order so the first started batch is a,b.
let #(runtime, b) = backpressure.enqueue(runtime, "b")
let #(runtime, a) = backpressure.enqueue(runtime, "a")
// Third enqueue is expected to be rejected with `QueueSaturated` (checked below).
let #(runtime, overflow) = backpressure.enqueue(runtime, "overflow")
// NOTE(review): the second argument of `start_available` (10, later 30) is
// not explained in this file — presumably a logical timestamp; confirm.
let #(runtime, started) = backpressure.start_available(runtime, 10)
// Fail one key and cancel the other, then retry both.
let #(runtime, failed) = backpressure.fail(runtime, "a", "timeout")
let #(runtime, cancelled) = backpressure.cancel(runtime, "b", "backpressure")
let #(runtime, retry_failed) = backpressure.retry(runtime, "a")
let #(runtime, retry_cancelled) = backpressure.retry(runtime, "b")
// Start the retried work again and complete both keys.
let #(runtime, started_again) = backpressure.start_available(runtime, 30)
let #(runtime, succeeded_a) = backpressure.succeed(runtime, "a")
let #(runtime, succeeded_b) = backpressure.succeed(runtime, "b")
// Every step except the overflow enqueue must return Ok(Nil); both start
// calls must report two entries; the runtime must end with nothing in flight
// or queued. "b" was enqueued first and is expected to end with seq 1, "a"
// with seq 2.
let passed =
backpressure.valid(runtime)
&& a == Ok(Nil)
&& b == Ok(Nil)
&& overflow
== Error(backpressure.QueueSaturated(key: "overflow", max_queued: 2))
&& failed == Ok(Nil)
&& cancelled == Ok(Nil)
&& retry_failed == Ok(Nil)
&& retry_cancelled == Ok(Nil)
&& succeeded_a == Ok(Nil)
&& succeeded_b == Ok(Nil)
&& list.length(started) == 2
&& list.length(started_again) == 2
&& backpressure.in_flight_count(runtime) == 0
&& backpressure.queued_count(runtime) == 0
&& backpressure.state(runtime, "a") == Some(backpressure.Succeeded(seq: 2))
&& backpressure.state(runtime, "b") == Some(backpressure.Succeeded(seq: 1))
#(
passed,
"overflow="
<> result_signature(overflow)
<> "|started="
<> int.to_string(list.length(started))
<> "|started_again="
<> int.to_string(list.length(started_again))
<> "|runtime="
<> backpressure.signature(runtime),
)
}
/// Budget scenario: run the simulated heavy benchmarks, check them against the
/// default budgets and pass only when no budget check fails.
///
/// The signature records the budget version label, every benchmark signature,
/// every budget result and the failure count.
fn evaluate_heavy_budget() -> #(Bool, String) {
  let measured = run_heavy_benchmarks()
  let checks = evaluate_budget(measured, default_budget())
  let failure_count = budget_failures(checks)
  let summary =
    string.concat([
      "budget=",
      budget_version_label(),
      "|benchmarks=",
      join_with(";", list.map(measured, benchmark_signature)),
      "|results=",
      join_with(";", list.map(checks, budget_result_signature)),
      "|failures=",
      int.to_string(failure_count),
    ])
  #(failure_count == 0, summary)
}
/// Build a deterministic `Benchmark` for one workload.
///
/// Seeds a 700-row plane targeted at `#<workload>`, applies 12 generated
/// upserts against the window `offset`/`limit`, and derives every metric from
/// the resulting patches with fixed formulas (no timing is measured).
fn simulate_benchmark(
  workload: String,
  dataset: data_plane.Dataset,
  offset: Int,
  limit: Int,
) -> Benchmark {
  let plane =
    data_plane.new("#" <> workload, dataset, large_rows(workload, 700, []))
  let #(_, _, patches) =
    data_plane.apply_updates(
      plane,
      data_plane.request(offset, limit),
      benchmark_updates(workload, offset, 12, []),
    )

  let op_count = list.length(patches)
  let payload_total = total_payload_bytes(patches, 0)

  // Latency model: p50 grows with total payload, p95 adds a fixed margin plus
  // one unit per patch.
  let p50 = 12 + divide_safe(payload_total, 200)
  let p95 = p50 + 25 + op_count

  let churn = case data_plane.contains_full_root_churn(patches) {
    False -> 0
    True -> 1
  }

  Benchmark(
    workload: workload,
    p50_latency_ms: p50,
    p95_latency_ms: p95,
    // Guard the divisor so an empty patch list yields 0 rather than dividing by 0.
    avg_payload_bytes: divide_safe(payload_total, max_int(1, op_count)),
    avg_patch_ops: divide_safe(op_count, 3),
    peak_memory_units: 340 + op_count * 6,
    // Throughput falls as p95 rises but never drops below 30.
    throughput_events_per_second: max_int(30, 140 - divide_safe(p95, 2)),
    root_churn_events: churn,
  )
}
/// Generate `count` upserts with ids `<prefix>-<offset + 1>` through
/// `<prefix>-<offset + count>`, each carrying a `bench`-tagged payload.
///
/// Ids are produced from `offset + count` downwards and prepended to
/// `updates_rev`, so the accumulator ends up in ascending id order and the
/// final `list.reverse` returns the updates in descending id order.
fn benchmark_updates(
  prefix: String,
  offset: Int,
  count: Int,
  updates_rev: List(data_plane.Update),
) -> List(data_plane.Update) {
  case count > 0 {
    False -> list.reverse(updates_rev)
    True -> {
      let row_id = prefix <> "-" <> int.to_string(offset + count)
      let update =
        data_plane.Upsert(data_plane.row(row_id, row_payload(row_id, "bench")))
      benchmark_updates(prefix, offset, count - 1, [update, ..updates_rev])
    }
  }
}
/// Sum the encoded size, in bytes, of every patch and add it to `total`.
///
/// Uses `string.byte_size` rather than `string.length`: the latter counts
/// grapheme clusters, which under-reports the payload size as soon as an
/// encoded patch contains non-ASCII text. For ASCII-only output both give the
/// same number.
fn total_payload_bytes(patches: List(diff.Patch), total: Int) -> Int {
  case patches {
    [] -> total
    [patch, ..rest] ->
      total_payload_bytes(rest, total + string.byte_size(diff.encode(patch)))
  }
}
/// Evaluate every budget against `benchmarks`.
///
/// `results_rev` holds already-computed results in reverse order; the return
/// value is those results restored to forward order, followed by one result
/// per budget in the order given.
fn evaluate_budget_loop(
  benchmarks: List(Benchmark),
  budgets: List(Budget),
  results_rev: List(BudgetResult),
) -> List(BudgetResult) {
  let evaluated = list.map(budgets, evaluate_one_budget(benchmarks, _))
  list.append(list.reverse(results_rev), evaluated)
}
/// Check one budget against the benchmark with the same workload name.
///
/// Returns a failing result with reason `"missing"` when no such benchmark
/// exists. Otherwise every limit is checked; the reason is `"within_budget"`
/// when all hold, or a comma-separated `<check>=<true|false>` list covering
/// p95, payload, patch, memory, throughput and churn (in that order).
fn evaluate_one_budget(
  benchmarks: List(Benchmark),
  budget: Budget,
) -> BudgetResult {
  case find_benchmark(benchmarks, budget.workload) {
    None ->
      BudgetResult(workload: budget.workload, passed: False, reason: "missing")
    Some(benchmark) -> {
      // Label/result pairs, in the order they appear in the failure reason.
      let checks = [
        #("p95", benchmark.p95_latency_ms <= budget.max_p95_latency_ms),
        #(
          "payload",
          benchmark.avg_payload_bytes <= budget.max_avg_payload_bytes,
        ),
        #("patch", benchmark.avg_patch_ops <= budget.max_avg_patch_ops),
        #(
          "memory",
          benchmark.peak_memory_units <= budget.max_peak_memory_units,
        ),
        #(
          "throughput",
          benchmark.throughput_events_per_second
            >= budget.min_throughput_events_per_second,
        ),
        #("churn", benchmark.root_churn_events <= budget.max_root_churn_events),
      ]
      let passed = list.all(checks, fn(check) { check.1 })
      let reason = case passed {
        True -> "within_budget"
        False ->
          checks
          |> list.map(fn(check) { check.0 <> "=" <> bool_label(check.1) })
          |> string.join(",")
      }
      BudgetResult(workload: budget.workload, passed: passed, reason: reason)
    }
  }
}
/// First benchmark whose workload name equals `workload`, or `None` when the
/// list contains no such benchmark.
fn find_benchmark(
  benchmarks: List(Benchmark),
  workload: String,
) -> Option(Benchmark) {
  benchmarks
  |> list.find(fn(candidate) { candidate.workload == workload })
  |> option.from_result
}
/// Render a budget result as `<workload>:<pass|fail>:<reason>`.
fn budget_result_signature(result: BudgetResult) -> String {
  string.concat([
    result.workload,
    ":",
    pass_fail_label_from_bool(result.passed),
    ":",
    result.reason,
  ])
}
/// Map a pass flag to its label: `"pass"` for True, `"fail"` for False.
fn pass_fail_label_from_bool(value: Bool) -> String {
  case value {
    False -> "fail"
    True -> "pass"
  }
}
/// Signature of a window query result: the window's own signature on success,
/// or `error:<label>` built from the query error's label on failure.
fn window_result_signature(
  result: Result(data_plane.Window, data_plane.QueryError),
) -> String {
  case result {
    Error(reason) -> string.append("error:", data_plane.query_error_label(reason))
    Ok(window) -> data_plane.window_signature(window)
  }
}
/// Signature of a backpressure step result: `"ok"` on success, otherwise the
/// runtime error's label.
fn result_signature(result: Result(Nil, backpressure.RuntimeError)) -> String {
  case result {
    Error(reason) -> backpressure.error_label(reason)
    Ok(_) -> "ok"
  }
}
/// Generate `total` seed rows with ids `<prefix>-1` through `<prefix>-<total>`,
/// each carrying a `seed`-tagged payload from `row_payload`.
///
/// Ids are produced from `total` downwards and prepended to `rows_rev`, so the
/// accumulator ends up in ascending id order and the final `list.reverse`
/// returns the rows in descending id order (`<prefix>-<total>` first).
///
/// NOTE(review): the scenarios' update ids (for example `list-121` with window
/// offset 120) look chosen for ascending row order — confirm whether the
/// descending result is intended before changing it, since it feeds the
/// snapshot signatures.
fn large_rows(
  prefix: String,
  total: Int,
  rows_rev: List(data_plane.Row),
) -> List(data_plane.Row) {
  case total > 0 {
    False -> list.reverse(rows_rev)
    True -> {
      let row_id = prefix <> "-" <> int.to_string(total)
      let seeded = data_plane.row(row_id, row_payload(row_id, "seed"))
      large_rows(prefix, total - 1, [seeded, ..rows_rev])
    }
  }
}
/// HTML payload for one row: a `div` carrying the id and tag as data
/// attributes and the id as its text content.
fn row_payload(id: String, tag: String) -> String {
  string.concat([
    "<div data-id=\"",
    id,
    "\" data-tag=\"",
    tag,
    "\">",
    id,
    "</div>",
  ])
}
/// Number of outcomes whose `passed` flag is False.
fn count_failed(outcomes: List(ScenarioOutcome)) -> Int {
  list.fold(outcomes, 0, fn(failed, outcome) {
    case outcome.passed {
      True -> failed
      False -> failed + 1
    }
  })
}
/// Number of outcomes whose `deterministic` flag is False.
fn count_nondeterministic(outcomes: List(ScenarioOutcome)) -> Int {
  list.fold(outcomes, 0, fn(unstable, outcome) {
    case outcome.deterministic {
      True -> unstable
      False -> unstable + 1
    }
  })
}
/// Integer division that yields 0 whenever the denominator is zero or
/// negative, instead of dividing.
fn divide_safe(numerator: Int, denominator: Int) -> Int {
  case denominator > 0 {
    True -> numerator / denominator
    False -> 0
  }
}
/// Larger of the two integers.
fn max_int(left: Int, right: Int) -> Int {
  int.max(left, right)
}
/// Lowercase label for a boolean: `"true"` or `"false"`.
fn bool_label(value: Bool) -> String {
  case value {
    False -> "false"
    True -> "true"
  }
}
/// Join `values` with `separator` between consecutive elements.
///
/// Returns `""` for an empty list and the element itself for a single-element
/// list. Delegates to the standard library's `string.join` instead of a
/// hand-rolled, non-tail-recursive concatenation.
fn join_with(separator: String, values: List(String)) -> String {
  string.join(values, separator)
}