// src/lightspeed/ops/large_data_harness.gleam

//// Deterministic large-data and async-runtime expansion harness for M28.

import gleam/int
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/string
import lightspeed/async/backpressure
import lightspeed/data_plane
import lightspeed/diff

/// Snapshot format version; embedded in snapshot_signature and the markdown
/// fixture report so drift gates can detect format changes.
pub const snapshot_version = 1

/// Budget profile version; surfaced through budget_version_label.
pub const budget_version = 1

/// M28 conformance scenarios.
///
/// The first three exercise windowed incremental updates over large
/// list/grid/chart datasets; the remaining two cover async backpressure
/// recovery and heavy-workload budget enforcement.
pub type Scenario {
  LargeListWindowedIncremental
  LargeGridWindowedIncremental
  LargeChartWindowedIncremental
  AsyncBackpressureRecovery
  HeavyBudgetEnforcement
}

/// One scenario outcome.
pub type ScenarioOutcome {
  ScenarioOutcome(
    scenario: Scenario,
    // True only when both runs passed AND their signatures matched.
    passed: Bool,
    // True when two back-to-back runs produced identical results.
    deterministic: Bool,
    // Scenario-specific signature captured from the first run.
    signature: String,
  )
}

/// Full M28 report.
pub type Report {
  Report(
    outcomes: List(ScenarioOutcome),
    // Count of outcomes with passed == False.
    failed_scenarios: Int,
    // Count of outcomes with deterministic == False.
    nondeterministic_failures: Int,
  )
}

/// Heavy-workload benchmark metrics.
///
/// All values are synthetic, deterministic functions of the simulated patch
/// stream (see simulate_benchmark), not wall-clock measurements.
pub type Benchmark {
  Benchmark(
    workload: String,
    p50_latency_ms: Int,
    p95_latency_ms: Int,
    avg_payload_bytes: Int,
    avg_patch_ops: Int,
    peak_memory_units: Int,
    throughput_events_per_second: Int,
    // 1 when the simulated patch stream contained full root churn, else 0.
    root_churn_events: Int,
  )
}

/// Versioned M28 budget profile.
///
/// The max_* fields are inclusive upper bounds and the throughput field is
/// an inclusive lower bound (comparisons live in evaluate_one_budget).
pub type Budget {
  Budget(
    workload: String,
    max_p95_latency_ms: Int,
    max_avg_payload_bytes: Int,
    max_avg_patch_ops: Int,
    max_peak_memory_units: Int,
    min_throughput_events_per_second: Int,
    max_root_churn_events: Int,
  )
}

/// Budget-check result.
pub type BudgetResult {
  // reason is "within_budget", "missing", or a per-check true/false breakdown.
  BudgetResult(workload: String, passed: Bool, reason: String)
}

/// Run every M28 scenario and aggregate failure/determinism counters.
pub fn run_matrix() -> Report {
  let scenarios = [
    LargeListWindowedIncremental,
    LargeGridWindowedIncremental,
    LargeChartWindowedIncremental,
    AsyncBackpressureRecovery,
    HeavyBudgetEnforcement,
  ]
  let outcomes = list.map(scenarios, run_scenario)

  Report(
    outcomes: outcomes,
    failed_scenarios: count_failed(outcomes),
    nondeterministic_failures: count_nondeterministic(outcomes),
  )
}

/// Execute a scenario twice; it only passes when both runs pass and
/// produce byte-identical signatures (the determinism gate).
pub fn run_scenario(scenario: Scenario) -> ScenarioOutcome {
  let first = evaluate(scenario)
  let second = evaluate(scenario)
  let #(first_passed, first_signature) = first
  let #(second_passed, second_signature) = second
  let deterministic =
    first_passed == second_passed && first_signature == second_signature

  ScenarioOutcome(
    scenario: scenario,
    passed: first_passed && second_passed && deterministic,
    deterministic: deterministic,
    signature: first_signature,
  )
}

/// Stable snake_case label for a scenario, used inside report signatures.
pub fn scenario_label(scenario: Scenario) -> String {
  case scenario {
    AsyncBackpressureRecovery -> "async_backpressure_recovery"
    HeavyBudgetEnforcement -> "heavy_budget_enforcement"
    LargeChartWindowedIncremental -> "large_chart_windowed_incremental"
    LargeGridWindowedIncremental -> "large_grid_windowed_incremental"
    LargeListWindowedIncremental -> "large_list_windowed_incremental"
  }
}

/// "pass"/"fail" label derived from the outcome's passed flag.
pub fn pass_fail_label(outcome: ScenarioOutcome) -> String {
  case outcome.passed {
    False -> "fail"
    True -> "pass"
  }
}

/// Scenario signature.
///
/// Thin accessor over the record field, kept for fixture-script call sites.
pub fn signature(outcome: ScenarioOutcome) -> String {
  outcome.signature
}

/// Determinism accessor.
///
/// Thin accessor over the record field, kept for fixture-script call sites.
pub fn deterministic(outcome: ScenarioOutcome) -> Bool {
  outcome.deterministic
}

/// Scenario accessor.
///
/// Thin accessor over the record field, kept for fixture-script call sites.
pub fn scenario(outcome: ScenarioOutcome) -> Scenario {
  outcome.scenario
}

/// Report outcomes.
///
/// Thin accessor over the record field, kept for fixture-script call sites.
pub fn outcomes(report: Report) -> List(ScenarioOutcome) {
  report.outcomes
}

/// Failed scenario count.
///
/// Thin accessor over the record field, kept for fixture-script call sites.
pub fn failed_scenarios(report: Report) -> Int {
  report.failed_scenarios
}

/// Nondeterministic scenario count.
///
/// Thin accessor over the record field, kept for fixture-script call sites.
pub fn nondeterministic_failures(report: Report) -> Int {
  report.nondeterministic_failures
}

/// Deterministic signature over all outcomes, one ";"-joined entry each.
pub fn report_signature(report: Report) -> String {
  let entry = fn(outcome: ScenarioOutcome) {
    scenario_label(outcome.scenario)
    <> "="
    <> pass_fail_label(outcome)
    <> ":deterministic="
    <> bool_label(outcome.deterministic)
    <> ":"
    <> outcome.signature
  }

  report.outcomes
  |> list.map(entry)
  |> join_with(";", _)
}

/// M28 benchmark budget profile version.
///
/// Stable label of the form "m28.budget.v<N>".
pub fn budget_version_label() -> String {
  "m28.budget.v" <> int.to_string(budget_version)
}

/// Default M28 heavy-workload budgets.
///
/// Matches the two workloads produced by run_heavy_benchmarks
/// ("large_grid" and "large_chart"); any root churn fails the budget.
pub fn default_budget() -> List(Budget) {
  [
    Budget(
      workload: "large_grid",
      max_p95_latency_ms: 95,
      max_avg_payload_bytes: 240,
      max_avg_patch_ops: 6,
      max_peak_memory_units: 520,
      min_throughput_events_per_second: 65,
      max_root_churn_events: 0,
    ),
    Budget(
      workload: "large_chart",
      max_p95_latency_ms: 120,
      max_avg_payload_bytes: 260,
      max_avg_patch_ops: 7,
      max_peak_memory_units: 540,
      min_throughput_events_per_second: 55,
      max_root_churn_events: 0,
    ),
  ]
}

/// Run the deterministic M28 benchmark suite: one grid and one chart
/// workload, each simulated over a fixed window.
pub fn run_heavy_benchmarks() -> List(Benchmark) {
  let grid =
    simulate_benchmark(
      "large_grid",
      data_plane.GridDataset(columns: 8),
      120,
      40,
    )
  let chart =
    simulate_benchmark(
      "large_chart",
      data_plane.ChartDataset(series_key: "events"),
      160,
      50,
    )
  [grid, chart]
}

/// Evaluate M28 benchmark suite against one budget profile.
///
/// Returns one BudgetResult per budget entry, in budget order.
pub fn evaluate_budget(
  benchmarks: List(Benchmark),
  budgets: List(Budget),
) -> List(BudgetResult) {
  // Delegate to the accumulator-style loop with an empty seed.
  evaluate_budget_loop(benchmarks, budgets, [])
}

/// Number of failing budget checks.
pub fn budget_failures(results: List(BudgetResult)) -> Int {
  // Stdlib list.count replaces the hand-rolled (non-tail) recursion.
  list.count(results, fn(result) { !result.passed })
}

/// Stable, order-fixed signature for one benchmark's metrics.
pub fn benchmark_signature(benchmark: Benchmark) -> String {
  string.concat([
    benchmark.workload,
    ":p50=",
    int.to_string(benchmark.p50_latency_ms),
    ":p95=",
    int.to_string(benchmark.p95_latency_ms),
    ":avg_payload=",
    int.to_string(benchmark.avg_payload_bytes),
    ":avg_ops=",
    int.to_string(benchmark.avg_patch_ops),
    ":peak_mem=",
    int.to_string(benchmark.peak_memory_units),
    ":throughput=",
    int.to_string(benchmark.throughput_events_per_second),
    ":root_churn=",
    int.to_string(benchmark.root_churn_events),
  ])
}

/// Deterministic snapshot signature for fixture drift gates:
/// a versioned header followed by the full matrix report signature.
pub fn snapshot_signature() -> String {
  let header = "m28.snapshot.v" <> int.to_string(snapshot_version)
  header <> "|" <> report_signature(run_matrix())
}

/// Deterministic markdown report for fixture scripts.
///
/// Status is "OK" only when no scenario failed and none was
/// nondeterministic. Note snapshot_signature() reruns the matrix, exactly
/// as the original implementation did.
pub fn snapshot_report_markdown() -> String {
  let report = run_matrix()
  let failed = failed_scenarios(report)
  let nondeterministic = nondeterministic_failures(report)
  let status = case failed == 0 && nondeterministic == 0 {
    True -> "OK"
    False -> "FAIL"
  }

  string.concat([
    "# Large Data Fixture Report\n\n",
    "snapshot_version: ",
    int.to_string(snapshot_version),
    "\n",
    "budget_version: ",
    budget_version_label(),
    "\n",
    "status: ",
    status,
    "\n",
    "failed_scenarios: ",
    int.to_string(failed),
    "\n",
    "nondeterministic_failures: ",
    int.to_string(nondeterministic),
    "\n\n",
    "snapshot_signature: ",
    snapshot_signature(),
    "\n\n",
    "report_signature: ",
    report_signature(report),
    "\n",
  ])
}

/// Dispatch a scenario to its evaluator; each returns #(passed, signature).
fn evaluate(scenario: Scenario) -> #(Bool, String) {
  case scenario {
    LargeListWindowedIncremental -> evaluate_large_list_window()
    LargeGridWindowedIncremental -> evaluate_large_grid_window()
    LargeChartWindowedIncremental -> evaluate_large_chart_window()
    AsyncBackpressureRecovery -> evaluate_async_backpressure()
    HeavyBudgetEnforcement -> evaluate_heavy_budget()
  }
}

/// List scenario: 300 seed rows, window of 40 rows at offset 120, then
/// mixed in-window and out-of-window updates. Passes when both patch
/// streams stay incremental with no full root churn and the row total
/// stays at 300 (one delete balanced by one insert).
fn evaluate_large_list_window() -> #(Bool, String) {
  let plane =
    data_plane.new(
      "#large-list",
      data_plane.ListDataset,
      large_rows("list", 300, []),
    )
  let request = data_plane.request(120, 40)
  // list-130 already exists (replacement), list-121 is removed, and
  // list-500 is a brand-new row, so the dataset total remains 300.
  let updates = [
    data_plane.Upsert(data_plane.row("list-130", "<li>list-130:patched</li>")),
    data_plane.Delete(id: "list-121"),
    data_plane.Upsert(data_plane.row("list-500", "<li>list-500:new</li>")),
  ]
  let #(next_plane, next_window, patches) =
    data_plane.apply_updates(plane, request, updates)
  // Second pass touches a row far below the window offset; it must still
  // produce only steady-state incremental patches.
  let outside_updates = [
    data_plane.Upsert(data_plane.row("list-2", "<li>list-2:x</li>")),
  ]
  let #(_steady, _steady_window, steady_patches) =
    data_plane.apply_updates(next_plane, request, outside_updates)

  let passed = case next_window {
    Ok(window) ->
      data_plane.valid(plane)
      && data_plane.steady_state_incremental(patches)
      && data_plane.steady_state_incremental(steady_patches)
      && !data_plane.contains_full_root_churn(patches)
      && !data_plane.contains_full_root_churn(steady_patches)
      && window.total == 300
    Error(_) -> False
  }

  #(
    passed,
    "patch_ops="
      <> int.to_string(list.length(patches))
      <> "|steady_ops="
      <> int.to_string(list.length(steady_patches))
      <> "|next="
      <> window_result_signature(next_window),
  )
}

/// Grid scenario: 420 seed rows in an 8-column grid, window of 60 rows at
/// offset 180. Two in-window replacements, one delete, and one new row
/// keep the total at 420 while the window must stay fully populated.
fn evaluate_large_grid_window() -> #(Bool, String) {
  let plane =
    data_plane.new(
      "#large-grid",
      data_plane.GridDataset(columns: 8),
      large_rows("grid", 420, []),
    )
  let request = data_plane.request(180, 60)
  // grid-200 and grid-219 exist (replacements), grid-181 is removed, and
  // grid-451 is new — net total change is zero.
  let updates = [
    data_plane.Upsert(data_plane.row("grid-200", row_payload("grid-200", "r1"))),
    data_plane.Upsert(data_plane.row("grid-219", row_payload("grid-219", "r2"))),
    data_plane.Delete(id: "grid-181"),
    data_plane.Upsert(data_plane.row("grid-451", row_payload("grid-451", "new"))),
  ]
  let #(_next_plane, next_window, patches) =
    data_plane.apply_updates(plane, request, updates)

  let passed = case next_window {
    Ok(window) ->
      data_plane.valid(plane)
      && data_plane.steady_state_incremental(patches)
      && !data_plane.contains_full_root_churn(patches)
      && window.total == 420
      && list.length(window.rows) == 60
    Error(_) -> False
  }

  #(
    passed,
    "patch_ops="
      <> int.to_string(list.length(patches))
      <> "|next="
      <> window_result_signature(next_window),
  )
}

/// Chart scenario: 360 seed points, window of 50 at offset 250. Two
/// in-window replacements plus one delete shrink the total to 359 while
/// the window stays full at 50 rows.
fn evaluate_large_chart_window() -> #(Bool, String) {
  let plane =
    data_plane.new(
      "#large-chart",
      data_plane.ChartDataset(series_key: "events"),
      large_rows("point", 360, []),
    )
  let request = data_plane.request(250, 50)
  // point-255 and point-256 exist already (replacements); only the delete
  // of point-251 changes the total: 360 - 1 = 359.
  let updates = [
    data_plane.Upsert(data_plane.row(
      "point-255",
      row_payload("point-255", "v1"),
    )),
    data_plane.Upsert(data_plane.row(
      "point-256",
      row_payload("point-256", "v2"),
    )),
    data_plane.Delete(id: "point-251"),
  ]
  let #(_next_plane, next_window, patches) =
    data_plane.apply_updates(plane, request, updates)

  let passed = case next_window {
    Ok(window) ->
      data_plane.valid(plane)
      && data_plane.steady_state_incremental(patches)
      && !data_plane.contains_full_root_churn(patches)
      && window.total == 359
      && list.length(window.rows) == 50
    Error(_) -> False
  }

  #(
    passed,
    "patch_ops="
      <> int.to_string(list.length(patches))
      <> "|next="
      <> window_result_signature(next_window),
  )
}

/// Async scenario: drive a PushPull backpressure runtime through queue
/// saturation, failure, cancellation, retry, and final success, then
/// require the runtime to drain cleanly and deterministically.
///
/// NOTE(review): boundary(PushPull, 2, 2, 200) — the 2/2/200 arguments are
/// presumably max-in-flight / max-queued / a timeout; confirm against the
/// backpressure module. The QueueSaturated(max_queued: 2) expectation below
/// does ground the queue capacity of 2.
fn evaluate_async_backpressure() -> #(Bool, String) {
  let runtime =
    backpressure.new(backpressure.boundary(backpressure.PushPull, 2, 2, 200))
  // Queue in reverse semantic order so the first started batch is a,b.
  let #(runtime, b) = backpressure.enqueue(runtime, "b")
  let #(runtime, a) = backpressure.enqueue(runtime, "a")
  // Third enqueue must be rejected: the queue capacity is 2.
  let #(runtime, overflow) = backpressure.enqueue(runtime, "overflow")
  let #(runtime, started) = backpressure.start_available(runtime, 10)
  // Knock both tasks out (one failure, one cancellation), then retry both.
  let #(runtime, failed) = backpressure.fail(runtime, "a", "timeout")
  let #(runtime, cancelled) = backpressure.cancel(runtime, "b", "backpressure")
  let #(runtime, retry_failed) = backpressure.retry(runtime, "a")
  let #(runtime, retry_cancelled) = backpressure.retry(runtime, "b")
  let #(runtime, started_again) = backpressure.start_available(runtime, 30)
  let #(runtime, succeeded_a) = backpressure.succeed(runtime, "a")
  let #(runtime, succeeded_b) = backpressure.succeed(runtime, "b")

  // Every step must succeed except the deliberate overflow, and the runtime
  // must end fully drained with both keys in a Succeeded state.
  let passed =
    backpressure.valid(runtime)
    && a == Ok(Nil)
    && b == Ok(Nil)
    && overflow
    == Error(backpressure.QueueSaturated(key: "overflow", max_queued: 2))
    && failed == Ok(Nil)
    && cancelled == Ok(Nil)
    && retry_failed == Ok(Nil)
    && retry_cancelled == Ok(Nil)
    && succeeded_a == Ok(Nil)
    && succeeded_b == Ok(Nil)
    && list.length(started) == 2
    && list.length(started_again) == 2
    && backpressure.in_flight_count(runtime) == 0
    && backpressure.queued_count(runtime) == 0
    && backpressure.state(runtime, "a") == Some(backpressure.Succeeded(seq: 2))
    && backpressure.state(runtime, "b") == Some(backpressure.Succeeded(seq: 1))

  #(
    passed,
    "overflow="
      <> result_signature(overflow)
      <> "|started="
      <> int.to_string(list.length(started))
      <> "|started_again="
      <> int.to_string(list.length(started_again))
      <> "|runtime="
      <> backpressure.signature(runtime),
  )
}

/// Budget scenario: run the heavy benchmark suite against the default
/// budget profile; passes only when zero budget checks fail.
fn evaluate_heavy_budget() -> #(Bool, String) {
  let benchmarks = run_heavy_benchmarks()
  let results = evaluate_budget(benchmarks, default_budget())
  let failures = budget_failures(results)

  let signature =
    "budget="
    <> budget_version_label()
    <> "|benchmarks="
    <> join_with(";", list.map(benchmarks, benchmark_signature))
    <> "|results="
    <> join_with(";", list.map(results, budget_result_signature))
    <> "|failures="
    <> int.to_string(failures)

  #(failures == 0, signature)
}

/// Simulate one heavy workload deterministically: seed 700 rows, apply 12
/// upserts just past the window offset, then derive metrics from the size
/// of the resulting patch stream.
///
/// All latency/memory/throughput numbers below are a synthetic linear
/// model of payload size and patch count — not measurements — so repeated
/// runs are byte-identical.
fn simulate_benchmark(
  workload: String,
  dataset: data_plane.Dataset,
  offset: Int,
  limit: Int,
) -> Benchmark {
  let target = "#" <> workload
  let plane = data_plane.new(target, dataset, large_rows(workload, 700, []))
  let request = data_plane.request(offset, limit)
  let updates = benchmark_updates(workload, offset, 12, [])
  let #(_next_plane, _next_window, patches) =
    data_plane.apply_updates(plane, request, updates)

  let total_payload_bytes = total_payload_bytes(patches, 0)
  let patch_ops = list.length(patches)
  // max_int(1, _) guards against division by zero on an empty patch list.
  let avg_payload_bytes =
    divide_safe(total_payload_bytes, max_int(1, patch_ops))
  // NOTE(review): the divisor 3 is unexplained — presumably three logical
  // event batches per run; confirm before tightening budgets around it.
  let avg_patch_ops = divide_safe(patch_ops, 3)
  let root_churn_events = case data_plane.contains_full_root_churn(patches) {
    True -> 1
    False -> 0
  }
  let p50_latency = 12 + divide_safe(total_payload_bytes, 200)
  let p95_latency = p50_latency + 25 + patch_ops
  let peak_memory = 340 + patch_ops * 6
  // Throughput is floored at 30 events/s regardless of latency.
  let throughput = max_int(30, 140 - divide_safe(p95_latency, 2))

  Benchmark(
    workload: workload,
    p50_latency_ms: p50_latency,
    p95_latency_ms: p95_latency,
    avg_payload_bytes: avg_payload_bytes,
    avg_patch_ops: avg_patch_ops,
    peak_memory_units: peak_memory,
    throughput_events_per_second: throughput,
    root_churn_events: root_churn_events,
  )
}

/// Build `count` deterministic upsert updates with ids
/// "<prefix>-<offset+count>" down to "<prefix>-<offset+1>", returned in
/// ascending-count order after the reversed accumulator.
fn benchmark_updates(
  prefix: String,
  offset: Int,
  count: Int,
  updates_rev: List(data_plane.Update),
) -> List(data_plane.Update) {
  case count > 0 {
    False -> list.reverse(updates_rev)
    True -> {
      let row_id = prefix <> "-" <> int.to_string(offset + count)
      let update =
        data_plane.Upsert(data_plane.row(row_id, row_payload(row_id, "bench")))
      benchmark_updates(prefix, offset, count - 1, [update, ..updates_rev])
    }
  }
}

/// Sum the encoded length of every patch on top of `total`.
///
/// NOTE(review): string.length counts graphemes, not bytes; it serves here
/// as a deterministic payload-size proxy.
fn total_payload_bytes(patches: List(diff.Patch), total: Int) -> Int {
  // Fold replaces the hand-rolled recursion; accumulator semantics unchanged.
  list.fold(patches, total, fn(acc, patch) {
    acc + string.length(diff.encode(patch))
  })
}

/// Evaluate each budget against the benchmark list.
///
/// Output order matches the original accumulator loop: previously
/// accumulated results (reversed) first, then one result per budget in
/// budget order.
fn evaluate_budget_loop(
  benchmarks: List(Benchmark),
  budgets: List(Budget),
  results_rev: List(BudgetResult),
) -> List(BudgetResult) {
  // The loop was a plain map over budgets — express it as one.
  let evaluated =
    list.map(budgets, fn(budget) { evaluate_one_budget(benchmarks, budget) })
  list.append(list.reverse(results_rev), evaluated)
}

/// Check one benchmark against one budget.
///
/// A missing benchmark fails with reason "missing"; a failing benchmark
/// gets a per-check true/false breakdown so fixture diffs show which
/// threshold tripped.
fn evaluate_one_budget(
  benchmarks: List(Benchmark),
  budget: Budget,
) -> BudgetResult {
  case find_benchmark(benchmarks, budget.workload) {
    None ->
      BudgetResult(workload: budget.workload, passed: False, reason: "missing")
    Some(benchmark) -> {
      // Upper bounds are inclusive; throughput is an inclusive lower bound.
      let p95_ok = benchmark.p95_latency_ms <= budget.max_p95_latency_ms
      let payload_ok =
        benchmark.avg_payload_bytes <= budget.max_avg_payload_bytes
      let patch_ok = benchmark.avg_patch_ops <= budget.max_avg_patch_ops
      let memory_ok =
        benchmark.peak_memory_units <= budget.max_peak_memory_units
      let throughput_ok =
        benchmark.throughput_events_per_second
        >= budget.min_throughput_events_per_second
      let churn_ok = benchmark.root_churn_events <= budget.max_root_churn_events
      let passed =
        p95_ok
        && payload_ok
        && patch_ok
        && memory_ok
        && throughput_ok
        && churn_ok

      BudgetResult(
        workload: budget.workload,
        passed: passed,
        reason: case passed {
          True -> "within_budget"
          False ->
            "p95="
            <> bool_label(p95_ok)
            <> ",payload="
            <> bool_label(payload_ok)
            <> ",patch="
            <> bool_label(patch_ok)
            <> ",memory="
            <> bool_label(memory_ok)
            <> ",throughput="
            <> bool_label(throughput_ok)
            <> ",churn="
            <> bool_label(churn_ok)
        },
      )
    }
  }
}

/// Find the first benchmark with the given workload name.
fn find_benchmark(
  benchmarks: List(Benchmark),
  workload: String,
) -> Option(Benchmark) {
  // list.find yields Result(Benchmark, Nil); adapt it to the Option API.
  benchmarks
  |> list.find(fn(benchmark) { benchmark.workload == workload })
  |> option.from_result
}

/// "<workload>:<pass|fail>:<reason>" signature for one budget result.
fn budget_result_signature(result: BudgetResult) -> String {
  string.concat([
    result.workload,
    ":",
    pass_fail_label_from_bool(result.passed),
    ":",
    result.reason,
  ])
}

/// Map a bare Bool to the stable "pass"/"fail" labels.
fn pass_fail_label_from_bool(value: Bool) -> String {
  case value {
    False -> "fail"
    True -> "pass"
  }
}

/// Signature for a window query result: the window's own signature on
/// success, or "error:<label>" on failure.
fn window_result_signature(
  result: Result(data_plane.Window, data_plane.QueryError),
) -> String {
  case result {
    Ok(window) -> data_plane.window_signature(window)
    Error(error) -> "error:" <> data_plane.query_error_label(error)
  }
}

/// "ok" on success, otherwise the backpressure runtime-error label.
fn result_signature(result: Result(Nil, backpressure.RuntimeError)) -> String {
  case result {
    Ok(_) -> "ok"
    Error(error) -> backpressure.error_label(error)
  }
}

/// Build `total` seed rows with ids "<prefix>-<n>". With an empty seed
/// accumulator the result lists n = total down to 1 (descending ids);
/// a non-empty accumulator is reversed and emitted first.
fn large_rows(
  prefix: String,
  total: Int,
  rows_rev: List(data_plane.Row),
) -> List(data_plane.Row) {
  case total > 0 {
    False -> list.reverse(rows_rev)
    True -> {
      let row_id = prefix <> "-" <> int.to_string(total)
      let seeded = [data_plane.row(row_id, row_payload(row_id, "seed")), ..rows_rev]
      large_rows(prefix, total - 1, seeded)
    }
  }
}

/// Deterministic HTML-ish payload carrying the row id and a tag marker.
fn row_payload(id: String, tag: String) -> String {
  string.concat([
    "<div data-id=\"", id, "\" data-tag=\"", tag, "\">", id, "</div>",
  ])
}

/// Count outcomes whose `passed` flag is False.
fn count_failed(outcomes: List(ScenarioOutcome)) -> Int {
  // Stdlib list.count replaces the hand-rolled (non-tail) recursion.
  list.count(outcomes, fn(outcome) { !outcome.passed })
}

/// Count outcomes whose `deterministic` flag is False.
fn count_nondeterministic(outcomes: List(ScenarioOutcome)) -> Int {
  // Stdlib list.count replaces the hand-rolled (non-tail) recursion.
  list.count(outcomes, fn(outcome) { !outcome.deterministic })
}

/// Integer division that yields 0 for zero or negative denominators.
fn divide_safe(numerator: Int, denominator: Int) -> Int {
  case denominator > 0 {
    True -> numerator / denominator
    False -> 0
  }
}

/// Larger of two ints.
fn max_int(left: Int, right: Int) -> Int {
  // Delegate to the standard library instead of a hand-rolled comparison.
  int.max(left, right)
}

/// Lowercase "true"/"false" label used inside signature strings.
fn bool_label(value: Bool) -> String {
  case value {
    False -> "false"
    True -> "true"
  }
}

/// Join values with the separator.
///
/// Delegates to stdlib string.join, replacing a hand-rolled recursion that
/// was not tail-recursive. Empty list yields "" and a single value is
/// returned unseparated, matching the previous behavior.
fn join_with(separator: String, values: List(String)) -> String {
  string.join(values, separator)
}