src/lightspeed/ops/etl_reliability_harness.gleam

//// Deterministic ETL reliability and SLO harness for M33.

import gleam/int
import gleam/list
import gleam/string
import lightspeed/async/backpressure
import lightspeed/pipeline
import lightspeed/pipeline/connector
import lightspeed/pipeline/operator
import lightspeed/pipeline/orchestrator
import lightspeed/pipeline/quality
import lightspeed/pipeline/slo
import lightspeed/pipeline/telemetry as pipeline_telemetry

pub const snapshot_version = 1

pub const budget_snapshot_version = 1

/// M33 reliability scenarios.
pub type Scenario {
  ConnectorOutageRecoveryPath
  PoisonMessageReplayPath
  SchemaEvolutionCompatibilityPath
  ReplayIdempotencyPath
  SloBudgetCompliancePath
}

/// One scenario outcome.
pub type ScenarioOutcome {
  ScenarioOutcome(
    scenario: Scenario,
    passed: Bool,
    deterministic: Bool,
    signature: String,
  )
}

/// Full M33 reliability report.
pub type Report {
  Report(
    outcomes: List(ScenarioOutcome),
    failed_scenarios: Int,
    nondeterministic_failures: Int,
  )
}

/// Full M33 budget report.
pub type BudgetReport {
  BudgetReport(
    observations: List(slo.Observation),
    results: List(slo.BudgetResult),
    failed_budgets: Int,
  )
}

/// Run all M33 reliability scenarios.
pub fn run_matrix() -> Report {
  let outcomes =
    [
      ConnectorOutageRecoveryPath,
      PoisonMessageReplayPath,
      SchemaEvolutionCompatibilityPath,
      ReplayIdempotencyPath,
      SloBudgetCompliancePath,
    ]
    |> list.map(run_scenario)

  Report(
    outcomes: outcomes,
    failed_scenarios: count_failed(outcomes),
    nondeterministic_failures: count_nondeterministic(outcomes),
  )
}

/// Run one scenario twice and require deterministic parity.
pub fn run_scenario(scenario: Scenario) -> ScenarioOutcome {
  let #(first_passed, first_signature) = evaluate(scenario)
  let #(second_passed, second_signature) = evaluate(scenario)
  let deterministic =
    first_passed == second_passed && first_signature == second_signature
  let passed = first_passed && second_passed && deterministic

  ScenarioOutcome(
    scenario: scenario,
    passed: passed,
    deterministic: deterministic,
    signature: first_signature,
  )
}

/// Scenario label.
pub fn scenario_label(scenario: Scenario) -> String {
  case scenario {
    ConnectorOutageRecoveryPath -> "connector_outage_recovery_path"
    PoisonMessageReplayPath -> "poison_message_replay_path"
    SchemaEvolutionCompatibilityPath -> "schema_evolution_compatibility_path"
    ReplayIdempotencyPath -> "replay_idempotency_path"
    SloBudgetCompliancePath -> "slo_budget_compliance_path"
  }
}

/// Stable pass/fail label.
pub fn pass_fail_label(outcome: ScenarioOutcome) -> String {
  case outcome.passed {
    True -> "pass"
    False -> "fail"
  }
}

/// Scenario signature accessor.
pub fn signature(outcome: ScenarioOutcome) -> String {
  outcome.signature
}

/// Scenario accessor.
pub fn scenario(outcome: ScenarioOutcome) -> Scenario {
  outcome.scenario
}

/// Scenario determinism accessor.
pub fn deterministic(outcome: ScenarioOutcome) -> Bool {
  outcome.deterministic
}

/// Report outcomes.
pub fn outcomes(report: Report) -> List(ScenarioOutcome) {
  report.outcomes
}

/// Failed scenario count.
pub fn failed_scenarios(report: Report) -> Int {
  report.failed_scenarios
}

/// Nondeterministic failure count.
pub fn nondeterministic_failures(report: Report) -> Int {
  report.nondeterministic_failures
}

/// Stable report signature.
pub fn report_signature(report: Report) -> String {
  let entries =
    list.map(report.outcomes, fn(outcome) {
      scenario_label(outcome.scenario)
      <> "="
      <> pass_fail_label(outcome)
      <> ":deterministic="
      <> bool_label(outcome.deterministic)
      <> ":"
      <> outcome.signature
    })

  join_with(";", entries)
}

/// Deterministic snapshot signature for fixture drift gates.
pub fn snapshot_signature() -> String {
  "m33.snapshot.v"
  <> int.to_string(snapshot_version)
  <> "|"
  <> report_signature(run_matrix())
}

/// Deterministic markdown report for fixture scripts.
pub fn snapshot_report_markdown() -> String {
  let report = run_matrix()
  let failed = failed_scenarios(report)
  let nondeterministic = nondeterministic_failures(report)
  let status = case failed == 0 && nondeterministic == 0 {
    True -> "OK"
    False -> "FAIL"
  }

  "# ETL Reliability Fixture Report\n\n"
  <> "snapshot_version: "
  <> int.to_string(snapshot_version)
  <> "\n"
  <> "status: "
  <> status
  <> "\n"
  <> "failed_scenarios: "
  <> int.to_string(failed)
  <> "\n"
  <> "nondeterministic_failures: "
  <> int.to_string(nondeterministic)
  <> "\n\n"
  <> "snapshot_signature: "
  <> snapshot_signature()
  <> "\n\n"
  <> "report_signature: "
  <> report_signature(report)
  <> "\n"
}

/// Run default M33 budget evaluation report.
pub fn run_budget_report() -> BudgetReport {
  let observations = default_slo_observations()
  let results = slo.evaluate(observations, slo.default_budget())

  BudgetReport(
    observations: observations,
    results: results,
    failed_budgets: slo.budget_failures(results),
  )
}

/// Budget-report observation accessor.
pub fn budget_observations(report: BudgetReport) -> List(slo.Observation) {
  report.observations
}

/// Budget-report results accessor.
pub fn budget_results(report: BudgetReport) -> List(slo.BudgetResult) {
  report.results
}

/// Budget-report failed count accessor.
pub fn budget_failed_budgets(report: BudgetReport) -> Int {
  report.failed_budgets
}

/// Stable budget report signature.
pub fn budget_report_signature(report: BudgetReport) -> String {
  let observation_entries = list.map(report.observations, slo.observation_label)
  let result_entries = list.map(report.results, slo.budget_result_label)

  "observations="
  <> join_with(";", observation_entries)
  <> "|results="
  <> join_with(";", result_entries)
  <> "|failures="
  <> int.to_string(report.failed_budgets)
}

/// Deterministic budget snapshot signature for drift gates.
pub fn budget_snapshot_signature() -> String {
  "m33.slo.snapshot.v"
  <> int.to_string(budget_snapshot_version)
  <> "|"
  <> budget_report_signature(run_budget_report())
}

/// Deterministic markdown budget report for scripts.
pub fn budget_report_markdown() -> String {
  let report = run_budget_report()
  let status = case report.failed_budgets == 0 {
    True -> "OK"
    False -> "FAIL"
  }

  "# ETL SLO Budget Report\n\n"
  <> "budget_snapshot_version: "
  <> int.to_string(budget_snapshot_version)
  <> "\n"
  <> "budget_version: "
  <> slo.budget_version_label()
  <> "\n"
  <> "status: "
  <> status
  <> "\n"
  <> "failed_budgets: "
  <> int.to_string(report.failed_budgets)
  <> "\n\n"
  <> "budget_snapshot_signature: "
  <> budget_snapshot_signature()
  <> "\n\n"
  <> "budget_report_signature: "
  <> budget_report_signature(report)
  <> "\n"
}

fn evaluate(scenario: Scenario) -> #(Bool, String) {
  case scenario {
    ConnectorOutageRecoveryPath -> evaluate_connector_outage_recovery_path()
    PoisonMessageReplayPath -> evaluate_poison_message_replay_path()
    SchemaEvolutionCompatibilityPath ->
      evaluate_schema_evolution_compatibility_path()
    ReplayIdempotencyPath -> evaluate_replay_idempotency_path()
    SloBudgetCompliancePath -> evaluate_slo_budget_compliance_path()
  }
}

fn evaluate_connector_outage_recovery_path() -> #(Bool, String) {
  let runtime = fresh_orchestrator_runtime(backpressure.default_boundary())
  let runtime = orchestrator.start_run(runtime, 100)
  let #(runtime, enqueue_result) =
    orchestrator.enqueue_batch(
      runtime,
      "batch-outage-1",
      "extract_orders",
      10,
      90,
      "source-outage-1",
      1,
    )
  let #(runtime, started_before_fail) =
    orchestrator.start_available(runtime, 101)
  let #(runtime, retry_outcome) =
    orchestrator.fail_batch(
      runtime,
      "batch-outage-1",
      1,
      "connector_outage",
      102,
    )
  let #(runtime, started_after_retry) =
    orchestrator.start_available(runtime, 103)
  let #(runtime, success_outcome) =
    orchestrator.ack_batch(runtime, "batch-outage-1", 104)
  let runtime = orchestrator.complete_run(runtime, 110)
  let telemetry_label =
    pipeline_telemetry.label(
      pipeline.runtime_telemetry(orchestrator.runtime_pipeline(runtime)),
    )
  let retry_label = process_outcome_result_label(retry_outcome)
  let success_label = process_outcome_result_label(success_outcome)
  let lifecycle_label =
    pipeline.lifecycle_label(
      pipeline.lifecycle(orchestrator.runtime_pipeline(runtime)),
    )

  let passed =
    enqueue_result == Ok(Nil)
    && started_has_key(started_before_fail, "batch-outage-1")
    && started_has_key(started_after_retry, "batch-outage-1")
    && string.starts_with(retry_label, "retry_scheduled:batch-outage-1:2:")
    && string.contains(success_label, "processed:applied:")
    && string.starts_with(lifecycle_label, "completed:run-1:")
    && list.length(orchestrator.retries(runtime)) == 1
    && orchestrator.dead_letters(runtime) == []
    && string.contains(telemetry_label, "retries=1")

  #(
    passed,
    "retry="
      <> retry_label
      <> "|success="
      <> success_label
      <> "|lifecycle="
      <> lifecycle_label
      <> "|telemetry="
      <> telemetry_label,
  )
}

fn evaluate_poison_message_replay_path() -> #(Bool, String) {
  let runtime = fresh_orchestrator_runtime(backpressure.default_boundary())
  let runtime = orchestrator.start_run(runtime, 200)
  let #(runtime, _) =
    orchestrator.enqueue_batch(
      runtime,
      "batch-source-2",
      "extract_orders",
      6,
      60,
      "source-2",
      1,
    )
  let #(runtime, _) = orchestrator.start_available(runtime, 201)
  let #(runtime, _) = orchestrator.ack_batch(runtime, "batch-source-2", 202)
  let #(runtime, _) =
    orchestrator.enqueue_batch(
      runtime,
      "batch-transform-2",
      "normalize_orders",
      6,
      45,
      "transform-2",
      2,
    )
  let #(runtime, _) = orchestrator.start_available(runtime, 203)
  let #(runtime, _) = orchestrator.ack_batch(runtime, "batch-transform-2", 204)
  let #(runtime, _) =
    orchestrator.enqueue_batch(
      runtime,
      "batch-poison-2",
      "write_orders",
      6,
      40,
      "poison-key-2",
      3,
    )
  let #(runtime, _) = orchestrator.start_available(runtime, 205)
  let #(runtime, dead_outcome) =
    orchestrator.fail_batch(runtime, "batch-poison-2", 3, "poison_message", 206)
  let #(runtime, replay_result) =
    orchestrator.apply_action(
      runtime,
      operator.Replay(
        from_stage: "normalize_orders",
        reason: "quarantine_replay",
      ),
      207,
    )
  let #(runtime, replay_resume_result) =
    orchestrator.apply_action(
      runtime,
      operator.Resume(reason: "resume_after_replay"),
      208,
    )
  let #(runtime, replay_enqueue_result) =
    orchestrator.enqueue_batch(
      runtime,
      "batch-replay-2",
      "write_orders",
      6,
      35,
      "recovered-key-2",
      4,
    )
  let #(runtime, _) = orchestrator.start_available(runtime, 209)
  let #(runtime, replay_sink_outcome) =
    orchestrator.ack_batch(runtime, "batch-replay-2", 210)
  let runtime = orchestrator.complete_run(runtime, 215)
  let dead_label = process_outcome_result_label(dead_outcome)
  let replay_sink_label = process_outcome_result_label(replay_sink_outcome)
  let lifecycle_label =
    pipeline.lifecycle_label(
      pipeline.lifecycle(orchestrator.runtime_pipeline(runtime)),
    )
  let operator_signature =
    operator.signature(orchestrator.runtime_operator(runtime))

  let passed =
    string.contains(
      dead_label,
      "dead_lettered:batch-poison-2:orders.dead_letter:poison_message",
    )
    && replay_result == Ok(Nil)
    && replay_resume_result == Ok(Nil)
    && replay_enqueue_result == Ok(Nil)
    && string.contains(replay_sink_label, "processed:applied:")
    && string.starts_with(lifecycle_label, "completed:run-2-replay:")
    && list.length(orchestrator.dead_letters(runtime)) == 1
    && string.contains(
      operator_signature,
      "action=replay:normalize_orders:quarantine_replay",
    )

  #(
    passed,
    "dead_letter="
      <> dead_label
      <> "|replay_sink="
      <> replay_sink_label
      <> "|lifecycle="
      <> lifecycle_label
      <> "|operator="
      <> operator_signature,
  )
}

fn evaluate_schema_evolution_compatibility_path() -> #(Bool, String) {
  let v1 =
    quality.schema("orders_boundary", 1, [
      quality.field("order_id", quality.StringType, True),
      quality.field("total_cents", quality.IntType, True),
      quality.field("is_priority", quality.BoolType, False),
    ])
  let v2 =
    quality.schema("orders_boundary", 2, [
      quality.field("order_id", quality.StringType, True),
      quality.field("total_cents", quality.IntType, True),
      quality.field("is_priority", quality.BoolType, False),
      quality.field("currency_code", quality.StringType, False),
    ])
  let breaking =
    quality.schema("orders_boundary", 3, [
      quality.field("order_id", quality.StringType, True),
      quality.field("total_cents", quality.IntType, True),
      quality.field("is_priority", quality.BoolType, False),
      quality.field("currency_code", quality.StringType, False),
      quality.field("tenant_id", quality.StringType, True),
    ])
  let good_payload = [
    quality.string_value("order_id", "order-9"),
    quality.int_value("total_cents", 1900),
    quality.bool_value("is_priority", True),
  ]
  let bad_payload = [
    quality.string_value("order_id", "order-10"),
    quality.string_value("total_cents", "invalid"),
    quality.bool_value("is_priority", False),
    quality.int_value("unexpected_counter", 1),
  ]
  let compatibility = quality.check_compatibility(v1, v2)
  let incompatible = quality.check_compatibility(v2, breaking)
  let good_validation = quality.validate(v2, good_payload)
  let bad_validation = quality.validate(v2, bad_payload)
  let compatibility_label = result_label(compatibility)
  let incompatible_label = result_label(incompatible)
  let good_label = quality.validation_result_label(good_validation)
  let bad_label = quality.validation_result_label(bad_validation)

  let passed =
    compatibility == Ok(Nil)
    && incompatible == Error("new_required_field:tenant_id")
    && string.starts_with(good_label, "valid:")
    && string.contains(
      bad_label,
      "type_mismatch:total_cents:expected=int:actual=string",
    )
    && string.contains(bad_label, "unexpected_field:unexpected_counter")

  #(
    passed,
    "v1="
      <> quality.schema_signature(v1)
      <> "|v2="
      <> quality.schema_signature(v2)
      <> "|compatibility="
      <> compatibility_label
      <> "|breaking="
      <> incompatible_label
      <> "|good_validation="
      <> good_label
      <> "|bad_validation="
      <> bad_label,
  )
}

fn evaluate_replay_idempotency_path() -> #(Bool, String) {
  let runtime = fresh_pipeline_runtime() |> pipeline.start(300)
  let #(runtime, _) =
    pipeline.process(runtime, "extract_orders", 5, 55, "source-3", 301)
  let #(runtime, _) =
    pipeline.process(runtime, "normalize_orders", 5, 35, "transform-3", 302)
  let #(runtime, first_sink) =
    pipeline.process(runtime, "write_orders", 5, 20, "sink-idempotent-1", 303)
  let runtime = pipeline.crash(runtime, "worker_restart")
  let resumed = case pipeline.resume_from_latest_checkpoint(runtime, 310) {
    Ok(next) -> next
    Error(_) -> runtime
  }
  let #(resumed, duplicate_sink) =
    pipeline.process(resumed, "write_orders", 5, 18, "sink-idempotent-1", 311)
  let #(resumed, second_sink) =
    pipeline.process(resumed, "write_orders", 5, 16, "sink-idempotent-2", 312)
  let resumed = pipeline.complete(resumed, 320)
  let lifecycle_label = pipeline.lifecycle_label(pipeline.lifecycle(resumed))
  let duplicate_label = pipeline.process_result_label(duplicate_sink)
  let telemetry_label =
    pipeline_telemetry.label(pipeline.runtime_telemetry(resumed))
  let sink_keys = pipeline.sink_idempotency_keys(resumed)

  let passed =
    string.contains(
      pipeline.process_result_label(first_sink),
      "applied:run=run-1|stage=write_orders",
    )
    && duplicate_label == "duplicate_suppressed:write_orders:sink-idempotent-1"
    && string.contains(
      pipeline.process_result_label(second_sink),
      "applied:run=run-2-replay|stage=write_orders",
    )
    && sink_keys == ["sink-idempotent-1", "sink-idempotent-2"]
    && string.starts_with(lifecycle_label, "completed:run-2-replay:")
    && string.contains(telemetry_label, "dead_letters=0")

  #(
    passed,
    "lifecycle="
      <> lifecycle_label
      <> "|duplicate="
      <> duplicate_label
      <> "|sink_keys="
      <> join_with(",", sink_keys)
      <> "|telemetry="
      <> telemetry_label,
  )
}

fn evaluate_slo_budget_compliance_path() -> #(Bool, String) {
  let report = run_budget_report()
  let strict_results =
    slo.evaluate(default_slo_observations(), [
      slo.budget("orders_hourly", 90, 500, 2, 2000),
    ])
  let strict_failures = slo.budget_failures(strict_results)
  let strict_labels = list.map(strict_results, slo.budget_result_label)
  let report_signature = budget_report_signature(report)

  let passed =
    report.failed_budgets == 0
    && strict_failures == 1
    && string.contains(join_with(";", strict_labels), "lag_exceeded")

  #(
    passed,
    "budget_signature="
      <> report_signature
      <> "|strict="
      <> join_with(";", strict_labels),
  )
}

fn default_slo_observations() -> List(slo.Observation) {
  [
    slo.observation("orders_hourly", 140, 420, 6, 200, 2200),
    slo.observation("orders_replay", 280, 280, 9, 180, 3900),
  ]
}

fn fresh_pipeline_runtime() -> pipeline.Runtime {
  pipeline.pipeline("orders_pipeline", pipeline.Interval(interval_ms: 60_000), [
    pipeline.source_stage("extract_orders", "raw_order"),
    pipeline.transform_stage(
      "normalize_orders",
      "raw_order",
      "normalized_order",
    ),
    pipeline.sink_stage("write_orders", "normalized_order"),
  ])
  |> pipeline.new
}

fn fresh_orchestrator_runtime(
  boundary: backpressure.Boundary,
) -> orchestrator.Runtime {
  orchestrator.new(fresh_pipeline_runtime(), connector.default_plan(), boundary)
}

fn process_outcome_result_label(
  result: Result(orchestrator.ProcessOutcome, String),
) -> String {
  case result {
    Ok(outcome) -> orchestrator.process_outcome_label(outcome)
    Error(reason) -> "error:" <> reason
  }
}

fn result_label(result: Result(Nil, String)) -> String {
  case result {
    Ok(_) -> "ok"
    Error(reason) -> "error:" <> reason
  }
}

fn started_has_key(entries: List(String), key: String) -> Bool {
  case entries {
    [] -> False
    [entry, ..rest] ->
      case string.starts_with(entry, key <> ":") {
        True -> True
        False -> started_has_key(rest, key)
      }
  }
}

fn count_failed(outcomes: List(ScenarioOutcome)) -> Int {
  case outcomes {
    [] -> 0
    [outcome, ..rest] ->
      case outcome.passed {
        True -> count_failed(rest)
        False -> 1 + count_failed(rest)
      }
  }
}

fn count_nondeterministic(outcomes: List(ScenarioOutcome)) -> Int {
  case outcomes {
    [] -> 0
    [outcome, ..rest] ->
      case outcome.deterministic {
        True -> count_nondeterministic(rest)
        False -> 1 + count_nondeterministic(rest)
      }
  }
}

fn bool_label(value: Bool) -> String {
  case value {
    True -> "true"
    False -> "false"
  }
}

fn join_with(separator: String, values: List(String)) -> String {
  case values {
    [] -> ""
    [value] -> value
    [value, ..rest] -> value <> separator <> join_with(separator, rest)
  }
}