//// Pipeline telemetry contracts for lag/throughput/retry/dead-letter tracking.
import gleam/int
import lightspeed/ops/telemetry
/// Aggregated telemetry snapshot for one pipeline runtime.
///
/// A snapshot starts from `zero()` and is advanced by `apply`, which folds a
/// `Delta` into it and returns a new value.
pub type Snapshot {
Snapshot(
// `apply` keeps the larger of the previous and the incoming lag value.
lag_ms: Int,
// Recomputed by `apply` from the applied delta alone (not accumulated).
throughput_records_per_sec: Int,
// Running sum of delta retry counts; `apply` floors it at zero.
retry_count: Int,
// Running sum of delta dead-letter counts; `apply` floors it at zero.
dead_letter_count: Int,
)
}
/// Delta update applied to a telemetry snapshot.
///
/// Built with `delta(...)` and consumed by `apply`.
pub type Delta {
Delta(
// Record count used as the numerator of the throughput calculation.
processed_records: Int,
// Lag value compared against the snapshot's current lag in `apply`.
lag_ms: Int,
// Amount added to the snapshot's retry count.
retry_count: Int,
// Amount added to the snapshot's dead-letter count.
dead_letter_count: Int,
// Divisor of the throughput calculation; when it is zero or negative the
// throughput falls back to `processed_records` (floored at zero).
elapsed_ms: Int,
)
}
/// Starting snapshot with every field set to zero.
pub fn zero() -> Snapshot {
  // Positional order: lag_ms, throughput_records_per_sec, retry_count,
  // dead_letter_count.
  Snapshot(0, 0, 0, 0)
}
/// Construct a `Delta` from its five component values.
///
/// The values are stored as given; no clamping or validation happens here.
pub fn delta(
  processed_records: Int,
  lag_ms: Int,
  retry_count: Int,
  dead_letter_count: Int,
  elapsed_ms: Int,
) -> Delta {
  // Arguments are passed positionally, in the same order as the fields are
  // declared on the `Delta` constructor.
  Delta(processed_records, lag_ms, retry_count, dead_letter_count, elapsed_ms)
}
/// Fold one delta into a snapshot and return the updated snapshot.
///
/// - `lag_ms` becomes the larger of the previous and the incoming lag.
/// - `throughput_records_per_sec` is recomputed from this delta alone.
/// - `retry_count` and `dead_letter_count` are running sums floored at zero.
pub fn apply(snapshot: Snapshot, delta: Delta) -> Snapshot {
  // NOTE(review): lag can only grow under this rule; presumably a deliberate
  // high-water mark - confirm with the consumers of the lag gauge.
  let lag = max(snapshot.lag_ms, delta.lag_ms)
  let throughput = throughput_from_delta(delta)
  // A negative delta can lower a counter, but never below zero.
  let retries = max(0, snapshot.retry_count + delta.retry_count)
  let dead_letters =
    max(0, snapshot.dead_letter_count + delta.dead_letter_count)

  Snapshot(
    lag_ms: lag,
    throughput_records_per_sec: throughput,
    retry_count: retries,
    dead_letter_count: dead_letters,
  )
}
/// Check the snapshot invariant: every field is zero or greater.
pub fn valid(snapshot: Snapshot) -> Bool {
  // Positional destructuring follows the field order of `Snapshot`.
  let Snapshot(lag, throughput, retries, dead_letters) = snapshot
  lag >= 0 && throughput >= 0 && retries >= 0 && dead_letters >= 0
}
/// Render a snapshot as a single `key=value` string with `|` separators.
///
/// Field order is fixed: lag_ms, throughput_rps, retries, dead_letters.
pub fn label(snapshot: Snapshot) -> String {
  let lag = int.to_string(snapshot.lag_ms)
  let throughput = int.to_string(snapshot.throughput_records_per_sec)
  let retries = int.to_string(snapshot.retry_count)
  let dead_letters = int.to_string(snapshot.dead_letter_count)

  "lag_ms="
  <> lag
  <> "|throughput_rps="
  <> throughput
  <> "|retries="
  <> retries
  <> "|dead_letters="
  <> dead_letters
}
/// Turn a snapshot into four metric points: two gauges (lag, throughput)
/// followed by two counters (retries, dead letters), in that order.
///
/// Every point carries the same tag list, built from `pipeline_name` and
/// `run_id`.
pub fn metrics(
  snapshot: Snapshot,
  pipeline_name: String,
  run_id: String,
) -> List(telemetry.Metric) {
  // The tag list is identical for all four points, so build it once.
  let tags = metric_tags(pipeline_name, run_id)

  let lag =
    telemetry.Gauge(
      name: "lightspeed.pipeline.lag_ms",
      value: snapshot.lag_ms,
      tags: tags,
    )
  let throughput =
    telemetry.Gauge(
      name: "lightspeed.pipeline.throughput_records_per_sec",
      value: snapshot.throughput_records_per_sec,
      tags: tags,
    )
  let retries =
    telemetry.Counter(
      name: "lightspeed.pipeline.retry_total",
      value: snapshot.retry_count,
      tags: tags,
    )
  let dead_letters =
    telemetry.Counter(
      name: "lightspeed.pipeline.dead_letter_total",
      value: snapshot.dead_letter_count,
      tags: tags,
    )

  [lag, throughput, retries, dead_letters]
}
/// Derive a non-negative records-per-second figure from a single delta.
///
/// With a positive `elapsed_ms` the result is `processed_records * 1000 /
/// elapsed_ms` (integer division). Otherwise `processed_records` is used
/// as-is. Either way the result is floored at zero.
fn throughput_from_delta(delta: Delta) -> Int {
  let rate = case delta.elapsed_ms {
    // The guard guarantees a divisor of at least 1.
    elapsed if elapsed > 0 -> { delta.processed_records * 1000 } / elapsed
    // No usable time window: fall back to the raw record count.
    _ -> delta.processed_records
  }
  max(0, rate)
}
/// Build the tag list attached to each metric point: a `pipeline` tag
/// followed by a `run_id` tag.
fn metric_tags(pipeline_name: String, run_id: String) -> List(telemetry.Tag) {
  let pipeline_tag = telemetry.Tag(key: "pipeline", value: pipeline_name)
  let run_tag = telemetry.Tag(key: "run_id", value: run_id)
  [pipeline_tag, run_tag]
}
/// Return the larger of two integers.
///
/// Thin local wrapper so call sites in this module can write `max(a, b)`.
fn max(left: Int, right: Int) -> Int {
  int.max(left, right)
}