//// Deterministic operational autopilot expansion harness (M54).
import gleam/int
import gleam/list
import gleam/string
import lightspeed/ops/operational_autopilot
pub const snapshot_version = 1
pub const policy_version = 1
/// M54 conformance scenarios.
pub type Scenario {
CrossSurfaceSloControlLoops
PolicyBoundedOverrideSemantics
DiagnosticsCorrelationAndRecoveryTiming
DeterministicAutopilotDrillCertification
RegressionGateDeterminism
}
/// One M54 scenario outcome.
pub type ScenarioOutcome {
ScenarioOutcome(
scenario: Scenario,
passed: Bool,
deterministic: Bool,
signature: String,
)
}
/// Full M54 report.
pub type Report {
Report(
outcomes: List(ScenarioOutcome),
failed_scenarios: Int,
nondeterministic_failures: Int,
)
}
/// Run all M54 scenarios.
pub fn run_matrix() -> Report {
let outcomes =
[
CrossSurfaceSloControlLoops,
PolicyBoundedOverrideSemantics,
DiagnosticsCorrelationAndRecoveryTiming,
DeterministicAutopilotDrillCertification,
RegressionGateDeterminism,
]
|> list.map(run_scenario)
Report(
outcomes: outcomes,
failed_scenarios: count_failed(outcomes),
nondeterministic_failures: count_nondeterministic(outcomes),
)
}
/// Run one M54 scenario twice and require deterministic parity.
pub fn run_scenario(scenario: Scenario) -> ScenarioOutcome {
let #(first_passed, first_signature) = evaluate(scenario)
let #(second_passed, second_signature) = evaluate(scenario)
let deterministic =
first_passed == second_passed && first_signature == second_signature
let passed = first_passed && second_passed && deterministic
ScenarioOutcome(
scenario: scenario,
passed: passed,
deterministic: deterministic,
signature: first_signature,
)
}
/// M54 policy version label.
pub fn policy_version_label() -> String {
"m54.policy.v" <> int.to_string(policy_version)
}
/// Scenario label.
pub fn scenario_label(scenario: Scenario) -> String {
case scenario {
CrossSurfaceSloControlLoops -> "cross_surface_slo_control_loops"
PolicyBoundedOverrideSemantics -> "policy_bounded_override_semantics"
DiagnosticsCorrelationAndRecoveryTiming ->
"diagnostics_correlation_and_recovery_timing"
DeterministicAutopilotDrillCertification ->
"deterministic_autopilot_drill_certification"
RegressionGateDeterminism -> "regression_gate_determinism"
}
}
/// Stable pass/fail label.
pub fn pass_fail_label(outcome: ScenarioOutcome) -> String {
case outcome.passed {
True -> "pass"
False -> "fail"
}
}
/// Scenario signature accessor.
pub fn signature(outcome: ScenarioOutcome) -> String {
outcome.signature
}
/// Scenario accessor.
pub fn scenario(outcome: ScenarioOutcome) -> Scenario {
outcome.scenario
}
/// Determinism accessor.
pub fn deterministic(outcome: ScenarioOutcome) -> Bool {
outcome.deterministic
}
/// Report outcomes accessor.
pub fn outcomes(report: Report) -> List(ScenarioOutcome) {
report.outcomes
}
/// Failed scenario count.
pub fn failed_scenarios(report: Report) -> Int {
report.failed_scenarios
}
/// Nondeterministic scenario count.
pub fn nondeterministic_failures(report: Report) -> Int {
report.nondeterministic_failures
}
/// Stable report signature.
pub fn report_signature(report: Report) -> String {
let entries =
list.map(report.outcomes, fn(outcome) {
scenario_label(outcome.scenario)
<> "="
<> pass_fail_label(outcome)
<> ":deterministic="
<> bool_label(outcome.deterministic)
<> ":"
<> outcome.signature
})
join_with(";", entries)
}
/// Deterministic snapshot signature for M54 fixture drift gates.
pub fn snapshot_signature() -> String {
"m54.snapshot.v"
<> int.to_string(snapshot_version)
<> "|"
<> report_signature(run_matrix())
}
/// Deterministic markdown report for M54 fixture scripts.
pub fn snapshot_report_markdown() -> String {
let report = run_matrix()
let failed = failed_scenarios(report)
let nondeterministic = nondeterministic_failures(report)
let status = case failed == 0 && nondeterministic == 0 {
True -> "OK"
False -> "FAIL"
}
"# Operational Autopilot Fixture Report\n\n"
<> "snapshot_version: "
<> int.to_string(snapshot_version)
<> "\n"
<> "policy_version: "
<> policy_version_label()
<> "\n"
<> "status: "
<> status
<> "\n"
<> "failed_scenarios: "
<> int.to_string(failed)
<> "\n"
<> "nondeterministic_failures: "
<> int.to_string(nondeterministic)
<> "\n\n"
<> "snapshot_signature: "
<> snapshot_signature()
<> "\n\n"
<> "report_signature: "
<> report_signature(report)
<> "\n"
}
fn evaluate(scenario: Scenario) -> #(Bool, String) {
case scenario {
CrossSurfaceSloControlLoops -> evaluate_cross_surface_slo_control_loops()
PolicyBoundedOverrideSemantics ->
evaluate_policy_bounded_override_semantics()
DiagnosticsCorrelationAndRecoveryTiming ->
evaluate_diagnostics_correlation_and_recovery_timing()
DeterministicAutopilotDrillCertification ->
evaluate_deterministic_autopilot_drill_certification()
RegressionGateDeterminism -> evaluate_regression_gate_determinism()
}
}
fn evaluate_cross_surface_slo_control_loops() -> #(Bool, String) {
let policy = operational_autopilot.default_policy()
let drills = operational_autopilot.run_default_drills()
let signature = join_drill_signatures(drills)
let passed =
list.length(drills) == 4
&& operational_autopilot.all_drills_valid(drills, policy)
&& string.contains(signature, "surface=runtime")
&& string.contains(signature, "surface=transport")
&& string.contains(signature, "surface=tenant")
&& string.contains(signature, "surface=etl")
#(passed, signature)
}
fn evaluate_policy_bounded_override_semantics() -> #(Bool, String) {
let strict = operational_autopilot.strict_policy()
let blocked = operational_autopilot.blocked_policy()
let tenant =
operational_autopilot.run_drill(
operational_autopilot.TenantHotspotIsolation,
strict,
)
let etl =
operational_autopilot.run_drill(operational_autopilot.EtlReplayLag, strict)
let blocked_tenant =
operational_autopilot.run_drill(
operational_autopilot.TenantHotspotIsolation,
blocked,
)
let signature =
operational_autopilot.policy_label(strict)
<> "|tenant="
<> operational_autopilot.drill_result_signature(tenant)
<> "|etl="
<> operational_autopilot.drill_result_signature(etl)
<> "|blocked="
<> operational_autopilot.drill_result_signature(blocked_tenant)
let tenant_action_requires_override =
tenant
|> operational_autopilot.actions
|> list.any(operational_autopilot.requires_override)
let etl_action_requires_override =
etl
|> operational_autopilot.actions
|> list.any(operational_autopilot.requires_override)
let blocked_action_has_policy_bound =
blocked_tenant
|> operational_autopilot.actions
|> list.any(operational_autopilot.blocked_by_policy)
let passed =
operational_autopilot.drill_result_valid(tenant, strict)
&& operational_autopilot.drill_result_valid(etl, strict)
&& tenant_action_requires_override
&& etl_action_requires_override
&& blocked_action_has_policy_bound
&& operational_autopilot.actions_bounded(blocked_tenant, blocked)
#(passed, signature)
}
fn evaluate_diagnostics_correlation_and_recovery_timing() -> #(Bool, String) {
let runtime =
operational_autopilot.run_drill(
operational_autopilot.RuntimeLatencySpike,
operational_autopilot.default_policy(),
)
let tenant =
operational_autopilot.run_drill(
operational_autopilot.TenantHotspotIsolation,
operational_autopilot.default_policy(),
)
let runtime_correlation =
runtime
|> operational_autopilot.correlation
|> operational_autopilot.correlation_label
let tenant_correlation =
tenant
|> operational_autopilot.correlation
|> operational_autopilot.correlation_label
let signature =
"runtime="
<> runtime_correlation
<> ":recovery_ms="
<> int.to_string(operational_autopilot.recovery_latency_ms(runtime))
<> "|tenant="
<> tenant_correlation
<> ":recovery_ms="
<> int.to_string(operational_autopilot.recovery_latency_ms(tenant))
let passed =
string.contains(runtime_correlation, "incident=m54-runtime-latency")
&& string.contains(tenant_correlation, "incident=m54-tenant-hotspot")
&& operational_autopilot.latency_reduced(runtime)
&& operational_autopilot.latency_reduced(tenant)
&& operational_autopilot.recovery_latency_ms(runtime) > 0
&& operational_autopilot.recovery_latency_ms(tenant) > 0
#(passed, signature)
}
fn evaluate_deterministic_autopilot_drill_certification() -> #(Bool, String) {
let policy = operational_autopilot.default_policy()
let first = operational_autopilot.run_default_drills()
let second = operational_autopilot.run_default_drills()
let first_signature = join_drill_signatures(first)
let second_signature = join_drill_signatures(second)
let passed =
first_signature == second_signature
&& operational_autopilot.all_drills_valid(first, policy)
&& operational_autopilot.all_drills_valid(second, policy)
#(passed, first_signature)
}
fn evaluate_regression_gate_determinism() -> #(Bool, String) {
let cross = run_scenario(CrossSurfaceSloControlLoops)
let policy = run_scenario(PolicyBoundedOverrideSemantics)
let diagnostics = run_scenario(DiagnosticsCorrelationAndRecoveryTiming)
let drills = run_scenario(DeterministicAutopilotDrillCertification)
let signature =
"cross="
<> pass_fail_label(cross)
<> ":"
<> cross.signature
<> "|policy="
<> pass_fail_label(policy)
<> ":"
<> policy.signature
<> "|diagnostics="
<> pass_fail_label(diagnostics)
<> ":"
<> diagnostics.signature
<> "|drills="
<> pass_fail_label(drills)
<> ":"
<> drills.signature
let passed =
cross.passed
&& policy.passed
&& diagnostics.passed
&& drills.passed
&& cross.deterministic
&& policy.deterministic
&& diagnostics.deterministic
&& drills.deterministic
#(passed, signature)
}
fn join_drill_signatures(
drills: List(operational_autopilot.DrillResult),
) -> String {
join_with("|", list.map(drills, operational_autopilot.drill_result_signature))
}
fn count_failed(outcomes: List(ScenarioOutcome)) -> Int {
outcomes
|> list.filter(fn(outcome) { !outcome.passed })
|> list.length
}
fn count_nondeterministic(outcomes: List(ScenarioOutcome)) -> Int {
outcomes
|> list.filter(fn(outcome) { !outcome.deterministic })
|> list.length
}
fn bool_label(value: Bool) -> String {
case value {
True -> "true"
False -> "false"
}
}
fn join_with(separator: String, values: List(String)) -> String {
case values {
[] -> ""
[first, ..rest] ->
list.fold(rest, first, fn(accumulator, value) {
accumulator <> separator <> value
})
}
}