//// Deterministic ETL reliability and SLO harness for M33.
import gleam/int
import gleam/list
import gleam/string
import lightspeed/async/backpressure
import lightspeed/pipeline
import lightspeed/pipeline/connector
import lightspeed/pipeline/operator
import lightspeed/pipeline/orchestrator
import lightspeed/pipeline/quality
import lightspeed/pipeline/slo
import lightspeed/pipeline/telemetry as pipeline_telemetry
// Version stamps baked into the snapshot signatures below — presumably
// bumped whenever the fixture/signature format changes so drift gates can
// tell format changes from data drift; confirm with the fixture scripts.
pub const snapshot_version = 1

// Version stamp for the SLO budget snapshot signature.
pub const budget_snapshot_version = 1
/// M33 reliability scenarios exercised by `run_matrix`.
pub type Scenario {
  /// A batch fails once with "connector_outage"; it must be retried and ack.
  ConnectorOutageRecoveryPath
  /// A poison batch is dead-lettered, replayed from an earlier stage, and a
  /// replacement batch flows through.
  PoisonMessageReplayPath
  /// Adding an optional field is compatible; adding a required field is not.
  SchemaEvolutionCompatibilityPath
  /// After crash + checkpoint resume, duplicate sink keys are suppressed.
  ReplayIdempotencyPath
  /// Default SLO budgets pass while a stricter budget trips on lag.
  SloBudgetCompliancePath
}
/// One scenario outcome from the double-run determinism check.
pub type ScenarioOutcome {
  ScenarioOutcome(
    // The scenario that was evaluated.
    scenario: Scenario,
    // True only when both runs passed AND they agreed (see `run_scenario`).
    passed: Bool,
    // True when both runs produced identical pass flags and signatures.
    deterministic: Bool,
    // Signature from the first run; embedded in report signatures.
    signature: String,
  )
}
/// Full M33 reliability report aggregated over all scenarios.
pub type Report {
  Report(
    // Per-scenario results, in matrix order.
    outcomes: List(ScenarioOutcome),
    // Number of outcomes with `passed == False`.
    failed_scenarios: Int,
    // Number of outcomes with `deterministic == False`.
    nondeterministic_failures: Int,
  )
}
/// Full M33 SLO budget report.
pub type BudgetReport {
  BudgetReport(
    // Observations the budgets were evaluated against.
    observations: List(slo.Observation),
    // One result per evaluated budget.
    results: List(slo.BudgetResult),
    // Count of failing budgets, per `slo.budget_failures`.
    failed_budgets: Int,
  )
}
/// Run every M33 reliability scenario and aggregate the results into a
/// single report with failure and nondeterminism counts.
pub fn run_matrix() -> Report {
  let scenarios = [
    ConnectorOutageRecoveryPath,
    PoisonMessageReplayPath,
    SchemaEvolutionCompatibilityPath,
    ReplayIdempotencyPath,
    SloBudgetCompliancePath,
  ]
  let outcomes = list.map(scenarios, run_scenario)
  Report(
    outcomes: outcomes,
    failed_scenarios: count_failed(outcomes),
    nondeterministic_failures: count_nondeterministic(outcomes),
  )
}
/// Evaluate a scenario twice. The outcome passes only when both runs pass
/// AND produce byte-identical signatures (deterministic parity); the first
/// run's signature is the one recorded.
pub fn run_scenario(scenario: Scenario) -> ScenarioOutcome {
  let #(passed_a, signature_a) = evaluate(scenario)
  let #(passed_b, signature_b) = evaluate(scenario)
  let parity = passed_a == passed_b && signature_a == signature_b
  ScenarioOutcome(
    scenario: scenario,
    passed: passed_a && passed_b && parity,
    deterministic: parity,
    signature: signature_a,
  )
}
/// Stable snake_case label for a scenario. Labels are embedded in report
/// signatures, so renaming one is a fixture-breaking change.
pub fn scenario_label(scenario: Scenario) -> String {
  case scenario {
    ConnectorOutageRecoveryPath -> "connector_outage_recovery_path"
    PoisonMessageReplayPath -> "poison_message_replay_path"
    SchemaEvolutionCompatibilityPath -> "schema_evolution_compatibility_path"
    ReplayIdempotencyPath -> "replay_idempotency_path"
    SloBudgetCompliancePath -> "slo_budget_compliance_path"
  }
}
/// Stable pass/fail label used in report signatures.
pub fn pass_fail_label(outcome: ScenarioOutcome) -> String {
  case outcome.passed {
    True -> "pass"
    False -> "fail"
  }
}

/// Signature recorded for the outcome (from its first evaluation run).
pub fn signature(outcome: ScenarioOutcome) -> String {
  outcome.signature
}

/// Scenario the outcome belongs to.
pub fn scenario(outcome: ScenarioOutcome) -> Scenario {
  outcome.scenario
}

/// Whether the two evaluation runs agreed (deterministic parity).
pub fn deterministic(outcome: ScenarioOutcome) -> Bool {
  outcome.deterministic
}

/// Per-scenario outcomes, in matrix order.
pub fn outcomes(report: Report) -> List(ScenarioOutcome) {
  report.outcomes
}

/// Count of scenarios that did not pass.
pub fn failed_scenarios(report: Report) -> Int {
  report.failed_scenarios
}

/// Count of scenarios whose two runs disagreed.
pub fn nondeterministic_failures(report: Report) -> Int {
  report.nondeterministic_failures
}
/// Stable report signature: one
/// `label=pass|fail:deterministic=<bool>:<signature>` entry per outcome,
/// joined with ";".
pub fn report_signature(report: Report) -> String {
  let entry = fn(outcome: ScenarioOutcome) -> String {
    scenario_label(outcome.scenario)
    <> "="
    <> pass_fail_label(outcome)
    <> ":deterministic="
    <> bool_label(outcome.deterministic)
    <> ":"
    <> outcome.signature
  }
  join_with(";", list.map(report.outcomes, entry))
}
/// Deterministic snapshot signature for fixture drift gates:
/// "m33.snapshot.v<version>|<report signature>".
pub fn snapshot_signature() -> String {
  string.concat([
    "m33.snapshot.v",
    int.to_string(snapshot_version),
    "|",
    report_signature(run_matrix()),
  ])
}
/// Render the reliability matrix as deterministic markdown for fixture
/// scripts. Status is "OK" only when nothing failed and nothing was
/// nondeterministic.
pub fn snapshot_report_markdown() -> String {
  let report = run_matrix()
  let failed = failed_scenarios(report)
  let nondeterministic = nondeterministic_failures(report)
  let status = case failed == 0 && nondeterministic == 0 {
    True -> "OK"
    False -> "FAIL"
  }
  string.concat([
    "# ETL Reliability Fixture Report\n\n",
    "snapshot_version: ",
    int.to_string(snapshot_version),
    "\n",
    "status: ",
    status,
    "\n",
    "failed_scenarios: ",
    int.to_string(failed),
    "\n",
    "nondeterministic_failures: ",
    int.to_string(nondeterministic),
    "\n\n",
    "snapshot_signature: ",
    snapshot_signature(),
    "\n\n",
    "report_signature: ",
    report_signature(report),
    "\n",
  ])
}
/// Evaluate the default budgets against the default observations and
/// package the results with a failing-budget count.
pub fn run_budget_report() -> BudgetReport {
  let observed = default_slo_observations()
  let evaluated = slo.evaluate(observed, slo.default_budget())
  BudgetReport(
    observations: observed,
    results: evaluated,
    failed_budgets: slo.budget_failures(evaluated),
  )
}
/// Observations the budget report was evaluated against.
pub fn budget_observations(report: BudgetReport) -> List(slo.Observation) {
  report.observations
}

/// Per-budget evaluation results.
pub fn budget_results(report: BudgetReport) -> List(slo.BudgetResult) {
  report.results
}

/// Count of budgets that failed evaluation.
pub fn budget_failed_budgets(report: BudgetReport) -> Int {
  report.failed_budgets
}
/// Stable budget report signature:
/// "observations=<...>|results=<...>|failures=<n>", with ";"-joined
/// per-item labels from the slo module.
pub fn budget_report_signature(report: BudgetReport) -> String {
  let observed = list.map(report.observations, slo.observation_label)
  let evaluated = list.map(report.results, slo.budget_result_label)
  string.concat([
    "observations=",
    join_with(";", observed),
    "|results=",
    join_with(";", evaluated),
    "|failures=",
    int.to_string(report.failed_budgets),
  ])
}
/// Deterministic budget snapshot signature for drift gates:
/// "m33.slo.snapshot.v<version>|<budget report signature>".
pub fn budget_snapshot_signature() -> String {
  string.concat([
    "m33.slo.snapshot.v",
    int.to_string(budget_snapshot_version),
    "|",
    budget_report_signature(run_budget_report()),
  ])
}
/// Render the SLO budget report as deterministic markdown for scripts.
/// Status is "OK" only when no budget failed.
pub fn budget_report_markdown() -> String {
  let report = run_budget_report()
  let status = case report.failed_budgets == 0 {
    True -> "OK"
    False -> "FAIL"
  }
  string.concat([
    "# ETL SLO Budget Report\n\n",
    "budget_snapshot_version: ",
    int.to_string(budget_snapshot_version),
    "\n",
    "budget_version: ",
    slo.budget_version_label(),
    "\n",
    "status: ",
    status,
    "\n",
    "failed_budgets: ",
    int.to_string(report.failed_budgets),
    "\n\n",
    "budget_snapshot_signature: ",
    budget_snapshot_signature(),
    "\n\n",
    "budget_report_signature: ",
    budget_report_signature(report),
    "\n",
  ])
}
/// Dispatch a scenario to its evaluator. Each evaluator returns
/// #(passed, stable signature string) for determinism comparison.
fn evaluate(scenario: Scenario) -> #(Bool, String) {
  case scenario {
    ConnectorOutageRecoveryPath -> evaluate_connector_outage_recovery_path()
    PoisonMessageReplayPath -> evaluate_poison_message_replay_path()
    SchemaEvolutionCompatibilityPath ->
      evaluate_schema_evolution_compatibility_path()
    ReplayIdempotencyPath -> evaluate_replay_idempotency_path()
    SloBudgetCompliancePath -> evaluate_slo_budget_compliance_path()
  }
}
/// Scenario: a batch fails once with "connector_outage"; the orchestrator
/// must schedule a retry, the retried batch must ack, and the run must
/// finish with exactly one retry and no dead letters.
fn evaluate_connector_outage_recovery_path() -> #(Bool, String) {
  let runtime = fresh_orchestrator_runtime(backpressure.default_boundary())
  // Trailing integers on orchestrator calls increase monotonically —
  // presumably a logical clock/tick; confirm against the orchestrator API.
  let runtime = orchestrator.start_run(runtime, 100)
  let #(runtime, enqueue_result) =
    orchestrator.enqueue_batch(
      runtime,
      "batch-outage-1",
      "extract_orders",
      10,
      90,
      "source-outage-1",
      1,
    )
  let #(runtime, started_before_fail) =
    orchestrator.start_available(runtime, 101)
  // Fail the in-flight batch (attempt argument 1, reason
  // "connector_outage"); the assertion below expects attempt 2 to be
  // retry-scheduled rather than dead-lettered.
  let #(runtime, retry_outcome) =
    orchestrator.fail_batch(
      runtime,
      "batch-outage-1",
      1,
      "connector_outage",
      102,
    )
  // The retried batch becomes startable again and then acks successfully.
  let #(runtime, started_after_retry) =
    orchestrator.start_available(runtime, 103)
  let #(runtime, success_outcome) =
    orchestrator.ack_batch(runtime, "batch-outage-1", 104)
  let runtime = orchestrator.complete_run(runtime, 110)
  let telemetry_label =
    pipeline_telemetry.label(
      pipeline.runtime_telemetry(orchestrator.runtime_pipeline(runtime)),
    )
  let retry_label = process_outcome_result_label(retry_outcome)
  let success_label = process_outcome_result_label(success_outcome)
  let lifecycle_label =
    pipeline.lifecycle_label(
      pipeline.lifecycle(orchestrator.runtime_pipeline(runtime)),
    )
  // All checks compare stable label strings so that two runs of the same
  // scenario can be compared byte-for-byte by `run_scenario`.
  let passed =
    enqueue_result == Ok(Nil)
    && started_has_key(started_before_fail, "batch-outage-1")
    && started_has_key(started_after_retry, "batch-outage-1")
    && string.starts_with(retry_label, "retry_scheduled:batch-outage-1:2:")
    && string.contains(success_label, "processed:applied:")
    && string.starts_with(lifecycle_label, "completed:run-1:")
    && list.length(orchestrator.retries(runtime)) == 1
    && orchestrator.dead_letters(runtime) == []
    && string.contains(telemetry_label, "retries=1")
  #(
    passed,
    "retry="
      <> retry_label
      <> "|success="
      <> success_label
      <> "|lifecycle="
      <> lifecycle_label
      <> "|telemetry="
      <> telemetry_label,
  )
}
/// Scenario: a poison batch at the sink stage is dead-lettered, the
/// operator replays from the transform stage and resumes, and a
/// replacement batch then flows through cleanly.
fn evaluate_poison_message_replay_path() -> #(Bool, String) {
  let runtime = fresh_orchestrator_runtime(backpressure.default_boundary())
  let runtime = orchestrator.start_run(runtime, 200)
  // Happy-path extract batch: enqueue, start, ack.
  let #(runtime, _) =
    orchestrator.enqueue_batch(
      runtime,
      "batch-source-2",
      "extract_orders",
      6,
      60,
      "source-2",
      1,
    )
  let #(runtime, _) = orchestrator.start_available(runtime, 201)
  let #(runtime, _) = orchestrator.ack_batch(runtime, "batch-source-2", 202)
  // Happy-path transform batch.
  let #(runtime, _) =
    orchestrator.enqueue_batch(
      runtime,
      "batch-transform-2",
      "normalize_orders",
      6,
      45,
      "transform-2",
      2,
    )
  let #(runtime, _) = orchestrator.start_available(runtime, 203)
  let #(runtime, _) = orchestrator.ack_batch(runtime, "batch-transform-2", 204)
  // Poison batch at the sink stage; failing it with attempt argument 3 is
  // expected to dead-letter (presumably past the retry budget — confirm
  // against orchestrator.fail_batch) rather than schedule another retry.
  let #(runtime, _) =
    orchestrator.enqueue_batch(
      runtime,
      "batch-poison-2",
      "write_orders",
      6,
      40,
      "poison-key-2",
      3,
    )
  let #(runtime, _) = orchestrator.start_available(runtime, 205)
  let #(runtime, dead_outcome) =
    orchestrator.fail_batch(runtime, "batch-poison-2", 3, "poison_message", 206)
  // Operator intervention: replay from the transform stage, then resume.
  let #(runtime, replay_result) =
    orchestrator.apply_action(
      runtime,
      operator.Replay(
        from_stage: "normalize_orders",
        reason: "quarantine_replay",
      ),
      207,
    )
  let #(runtime, replay_resume_result) =
    orchestrator.apply_action(
      runtime,
      operator.Resume(reason: "resume_after_replay"),
      208,
    )
  // Replacement batch for the sink stage after the replay.
  let #(runtime, replay_enqueue_result) =
    orchestrator.enqueue_batch(
      runtime,
      "batch-replay-2",
      "write_orders",
      6,
      35,
      "recovered-key-2",
      4,
    )
  let #(runtime, _) = orchestrator.start_available(runtime, 209)
  let #(runtime, replay_sink_outcome) =
    orchestrator.ack_batch(runtime, "batch-replay-2", 210)
  let runtime = orchestrator.complete_run(runtime, 215)
  let dead_label = process_outcome_result_label(dead_outcome)
  let replay_sink_label = process_outcome_result_label(replay_sink_outcome)
  let lifecycle_label =
    pipeline.lifecycle_label(
      pipeline.lifecycle(orchestrator.runtime_pipeline(runtime)),
    )
  let operator_signature =
    operator.signature(orchestrator.runtime_operator(runtime))
  // Pass requires: dead-letter recorded, every operator/enqueue call Ok,
  // replacement batch applied, the replay run completed, exactly one dead
  // letter, and the replay action captured in the operator signature.
  let passed =
    string.contains(
      dead_label,
      "dead_lettered:batch-poison-2:orders.dead_letter:poison_message",
    )
    && replay_result == Ok(Nil)
    && replay_resume_result == Ok(Nil)
    && replay_enqueue_result == Ok(Nil)
    && string.contains(replay_sink_label, "processed:applied:")
    && string.starts_with(lifecycle_label, "completed:run-2-replay:")
    && list.length(orchestrator.dead_letters(runtime)) == 1
    && string.contains(
      operator_signature,
      "action=replay:normalize_orders:quarantine_replay",
    )
  #(
    passed,
    "dead_letter="
      <> dead_label
      <> "|replay_sink="
      <> replay_sink_label
      <> "|lifecycle="
      <> lifecycle_label
      <> "|operator="
      <> operator_signature,
  )
}
/// Scenario: v1 -> v2 (new optional field) must be compatible; v2 -> v3
/// (new required field) must be rejected; payload validation must flag a
/// type mismatch and an unexpected field.
fn evaluate_schema_evolution_compatibility_path() -> #(Bool, String) {
  // The Bool on quality.field marks the field as required, as shown by the
  // "new_required_field:tenant_id" expectation below.
  let v1 =
    quality.schema("orders_boundary", 1, [
      quality.field("order_id", quality.StringType, True),
      quality.field("total_cents", quality.IntType, True),
      quality.field("is_priority", quality.BoolType, False),
    ])
  // v2 adds an OPTIONAL field — a backward-compatible evolution.
  let v2 =
    quality.schema("orders_boundary", 2, [
      quality.field("order_id", quality.StringType, True),
      quality.field("total_cents", quality.IntType, True),
      quality.field("is_priority", quality.BoolType, False),
      quality.field("currency_code", quality.StringType, False),
    ])
  // v3 adds a REQUIRED field ("tenant_id") — a breaking change.
  let breaking =
    quality.schema("orders_boundary", 3, [
      quality.field("order_id", quality.StringType, True),
      quality.field("total_cents", quality.IntType, True),
      quality.field("is_priority", quality.BoolType, False),
      quality.field("currency_code", quality.StringType, False),
      quality.field("tenant_id", quality.StringType, True),
    ])
  let good_payload = [
    quality.string_value("order_id", "order-9"),
    quality.int_value("total_cents", 1900),
    quality.bool_value("is_priority", True),
  ]
  // Bad payload: wrong type for total_cents plus a field not in the schema.
  let bad_payload = [
    quality.string_value("order_id", "order-10"),
    quality.string_value("total_cents", "invalid"),
    quality.bool_value("is_priority", False),
    quality.int_value("unexpected_counter", 1),
  ]
  let compatibility = quality.check_compatibility(v1, v2)
  let incompatible = quality.check_compatibility(v2, breaking)
  let good_validation = quality.validate(v2, good_payload)
  let bad_validation = quality.validate(v2, bad_payload)
  let compatibility_label = result_label(compatibility)
  let incompatible_label = result_label(incompatible)
  let good_label = quality.validation_result_label(good_validation)
  let bad_label = quality.validation_result_label(bad_validation)
  let passed =
    compatibility == Ok(Nil)
    && incompatible == Error("new_required_field:tenant_id")
    && string.starts_with(good_label, "valid:")
    && string.contains(
      bad_label,
      "type_mismatch:total_cents:expected=int:actual=string",
    )
    && string.contains(bad_label, "unexpected_field:unexpected_counter")
  #(
    passed,
    "v1="
      <> quality.schema_signature(v1)
      <> "|v2="
      <> quality.schema_signature(v2)
      <> "|compatibility="
      <> compatibility_label
      <> "|breaking="
      <> incompatible_label
      <> "|good_validation="
      <> good_label
      <> "|bad_validation="
      <> bad_label,
  )
}
/// Scenario: after a crash and checkpoint resume, re-processing a sink
/// batch with an already-applied idempotency key must be suppressed,
/// while a fresh key is applied normally.
fn evaluate_replay_idempotency_path() -> #(Bool, String) {
  let runtime = fresh_pipeline_runtime() |> pipeline.start(300)
  let #(runtime, _) =
    pipeline.process(runtime, "extract_orders", 5, 55, "source-3", 301)
  let #(runtime, _) =
    pipeline.process(runtime, "normalize_orders", 5, 35, "transform-3", 302)
  let #(runtime, first_sink) =
    pipeline.process(runtime, "write_orders", 5, 20, "sink-idempotent-1", 303)
  // Simulated worker crash, then resume from the latest checkpoint. On a
  // resume error we fall back to the crashed runtime, which makes the
  // assertions below fail and surfaces the problem in the signature.
  let runtime = pipeline.crash(runtime, "worker_restart")
  let resumed = case pipeline.resume_from_latest_checkpoint(runtime, 310) {
    Ok(next) -> next
    Error(_) -> runtime
  }
  // Same idempotency key as first_sink: must be deduplicated on replay.
  let #(resumed, duplicate_sink) =
    pipeline.process(resumed, "write_orders", 5, 18, "sink-idempotent-1", 311)
  // New key: must be applied normally within the replay run.
  let #(resumed, second_sink) =
    pipeline.process(resumed, "write_orders", 5, 16, "sink-idempotent-2", 312)
  let resumed = pipeline.complete(resumed, 320)
  let lifecycle_label = pipeline.lifecycle_label(pipeline.lifecycle(resumed))
  let duplicate_label = pipeline.process_result_label(duplicate_sink)
  let telemetry_label =
    pipeline_telemetry.label(pipeline.runtime_telemetry(resumed))
  let sink_keys = pipeline.sink_idempotency_keys(resumed)
  let passed =
    string.contains(
      pipeline.process_result_label(first_sink),
      "applied:run=run-1|stage=write_orders",
    )
    && duplicate_label == "duplicate_suppressed:write_orders:sink-idempotent-1"
    && string.contains(
      pipeline.process_result_label(second_sink),
      "applied:run=run-2-replay|stage=write_orders",
    )
    && sink_keys == ["sink-idempotent-1", "sink-idempotent-2"]
    && string.starts_with(lifecycle_label, "completed:run-2-replay:")
    && string.contains(telemetry_label, "dead_letters=0")
  #(
    passed,
    "lifecycle="
      <> lifecycle_label
      <> "|duplicate="
      <> duplicate_label
      <> "|sink_keys="
      <> join_with(",", sink_keys)
      <> "|telemetry="
      <> telemetry_label,
  )
}
/// Scenario: the default budgets must pass over the default observations,
/// while a deliberately strict budget must fail exactly once with a
/// "lag_exceeded" label.
fn evaluate_slo_budget_compliance_path() -> #(Bool, String) {
  let report = run_budget_report()
  let strict_budget = [slo.budget("orders_hourly", 90, 500, 2, 2000)]
  let strict_results = slo.evaluate(default_slo_observations(), strict_budget)
  let strict_labels = list.map(strict_results, slo.budget_result_label)
  let joined_strict = join_with(";", strict_labels)
  // `budget_signature` (not `report_signature`) avoids shadowing the
  // module-level function of that name.
  let budget_signature = budget_report_signature(report)
  let passed =
    report.failed_budgets == 0
    && slo.budget_failures(strict_results) == 1
    && string.contains(joined_strict, "lag_exceeded")
  #(
    passed,
    "budget_signature=" <> budget_signature <> "|strict=" <> joined_strict,
  )
}
fn default_slo_observations() -> List(slo.Observation) {
[
slo.observation("orders_hourly", 140, 420, 6, 200, 2200),
slo.observation("orders_replay", 280, 280, 9, 180, 3900),
]
}
/// Build a fresh three-stage orders pipeline runtime
/// (extract -> normalize -> write) on a 60-second interval schedule.
fn fresh_pipeline_runtime() -> pipeline.Runtime {
  let stages = [
    pipeline.source_stage("extract_orders", "raw_order"),
    pipeline.transform_stage(
      "normalize_orders",
      "raw_order",
      "normalized_order",
    ),
    pipeline.sink_stage("write_orders", "normalized_order"),
  ]
  let definition =
    pipeline.pipeline(
      "orders_pipeline",
      pipeline.Interval(interval_ms: 60_000),
      stages,
    )
  pipeline.new(definition)
}
/// Wrap a fresh pipeline runtime in an orchestrator runtime with the
/// default connector plan and the supplied backpressure boundary.
fn fresh_orchestrator_runtime(
  boundary: backpressure.Boundary,
) -> orchestrator.Runtime {
  orchestrator.new(fresh_pipeline_runtime(), connector.default_plan(), boundary)
}
/// Label a process-outcome result: the orchestrator's own label on Ok,
/// "error:<reason>" on Error.
fn process_outcome_result_label(
  result: Result(orchestrator.ProcessOutcome, String),
) -> String {
  case result {
    Ok(outcome) -> orchestrator.process_outcome_label(outcome)
    Error(reason) -> "error:" <> reason
  }
}

/// Label a unit result: "ok" on Ok, "error:<reason>" on Error.
fn result_label(result: Result(Nil, String)) -> String {
  case result {
    Ok(_) -> "ok"
    Error(reason) -> "error:" <> reason
  }
}
/// True when any entry in `entries` starts with `key <> ":"`. Entries come
/// from `orchestrator.start_available`, which labels each started batch as
/// "<batch_id>:...". The trailing ":" stops a short key ("batch-1") from
/// matching a longer id ("batch-10").
fn started_has_key(entries: List(String), key: String) -> Bool {
  // Build the prefix once (the original rebuilt `key <> ":"` on every
  // recursion step) and use the stdlib scan instead of manual recursion.
  let prefix = key <> ":"
  list.any(entries, fn(entry) { string.starts_with(entry, prefix) })
}
/// Count outcomes whose `passed` flag is False.
fn count_failed(outcomes: List(ScenarioOutcome)) -> Int {
  // Fold instead of the original `1 + count_failed(rest)` recursion: the
  // fold is stack-safe regardless of list length and reads as a tally.
  list.fold(outcomes, 0, fn(acc, outcome) {
    case outcome.passed {
      True -> acc
      False -> acc + 1
    }
  })
}
/// Count outcomes whose two evaluation runs disagreed
/// (`deterministic == False`).
fn count_nondeterministic(outcomes: List(ScenarioOutcome)) -> Int {
  // Fold instead of the original non-tail recursion, mirroring
  // `count_failed` for consistency and stack safety.
  list.fold(outcomes, 0, fn(acc, outcome) {
    case outcome.deterministic {
      True -> acc
      False -> acc + 1
    }
  })
}
/// Lowercase boolean label ("true"/"false") for signature strings.
/// Deliberately not `bool.to_string`, which would produce "True"/"False"
/// and change every recorded signature.
fn bool_label(value: Bool) -> String {
  case value {
    True -> "true"
    False -> "false"
  }
}
/// Join `values` with `separator`; the empty list joins to "".
fn join_with(separator: String, values: List(String)) -> String {
  // Delegate to the stdlib join instead of the original hand-rolled,
  // non-tail-recursive concatenation; output is identical for the
  // empty, single-element, and many-element cases.
  string.join(values, separator)
}