src/lightspeed/pipeline/telemetry.gleam

//// Pipeline telemetry contracts for lag/throughput/retry/dead-letter tracking.

import gleam/int
import lightspeed/ops/telemetry

/// Aggregated telemetry snapshot for one pipeline runtime.
/// Aggregated telemetry snapshot for one pipeline runtime.
///
/// All fields are plain integers; `valid` checks that none of them is negative.
pub type Snapshot {
  Snapshot(
    // Lag in milliseconds; `apply` keeps the largest value seen so far.
    lag_ms: Int,
    // Records per second derived from the most recently applied delta only.
    throughput_records_per_sec: Int,
    // Running total of retries accumulated across applied deltas.
    retry_count: Int,
    // Running total of dead-letter events accumulated across applied deltas.
    dead_letter_count: Int,
  )
}

/// Delta update applied to a telemetry snapshot.
/// Delta update applied to a telemetry snapshot (see `apply`).
pub type Delta {
  Delta(
    // Records processed during this interval; combined with `elapsed_ms`
    // to derive throughput.
    processed_records: Int,
    // Lag observed for this interval, in milliseconds.
    lag_ms: Int,
    // Retries to add to the snapshot's running total.
    retry_count: Int,
    // Dead-letter events to add to the snapshot's running total.
    dead_letter_count: Int,
    // Interval length in milliseconds. When <= 0, throughput falls back to
    // the raw `processed_records` value instead of a per-second rate.
    elapsed_ms: Int,
  )
}

/// Build a zeroed telemetry snapshot.
/// A snapshot with lag, throughput, retries and dead letters all set to zero.
pub fn zero() -> Snapshot {
  // Positional order: lag_ms, throughput_records_per_sec, retry_count,
  // dead_letter_count.
  Snapshot(0, 0, 0, 0)
}

/// Build one telemetry delta.
/// Construct a `Delta` from its five components.
///
/// The arguments are stored as given; no clamping or validation happens here.
/// Negative or zero values are handled later by `apply`.
pub fn delta(
  processed_records: Int,
  lag_ms: Int,
  retry_count: Int,
  dead_letter_count: Int,
  elapsed_ms: Int,
) -> Delta {
  // Arguments appear in the same order as the Delta fields.
  Delta(processed_records, lag_ms, retry_count, dead_letter_count, elapsed_ms)
}

/// Apply one delta and return a new snapshot.
/// Fold one delta into a snapshot and return the updated snapshot.
///
/// - `lag_ms` keeps the larger of the current and incoming lag, i.e. a peak
///   value rather than the latest sample.
/// - `throughput_records_per_sec` is replaced by the rate derived from this
///   delta alone (see `throughput_from_delta`).
/// - `retry_count` and `dead_letter_count` accumulate. Negative increments
///   are ignored so these totals never decrease: `metrics` exports them as
///   counters, and a counter that goes backwards reads as a reset to metric
///   backends. The result is still floored at zero so `valid` holds.
pub fn apply(snapshot: Snapshot, delta: Delta) -> Snapshot {
  Snapshot(
    lag_ms: max(snapshot.lag_ms, delta.lag_ms),
    throughput_records_per_sec: throughput_from_delta(delta),
    retry_count: max(0, snapshot.retry_count + max(0, delta.retry_count)),
    dead_letter_count: max(
      0,
      snapshot.dead_letter_count + max(0, delta.dead_letter_count),
    ),
  )
}

/// Snapshot invariants.
/// Check the snapshot invariants: every field must be zero or positive.
pub fn valid(snapshot: Snapshot) -> Bool {
  let Snapshot(
    lag_ms: lag,
    throughput_records_per_sec: rate,
    retry_count: retries,
    dead_letter_count: dead_letters,
  ) = snapshot
  lag >= 0 && rate >= 0 && retries >= 0 && dead_letters >= 0
}

/// Stable snapshot label.
/// Render a snapshot as a stable, pipe-separated `key=value` string.
///
/// For `zero()` this yields `lag_ms=0|throughput_rps=0|retries=0|dead_letters=0`.
pub fn label(snapshot: Snapshot) -> String {
  let Snapshot(
    lag_ms: lag,
    throughput_records_per_sec: rate,
    retry_count: retries,
    dead_letter_count: dead_letters,
  ) = snapshot
  let lag_part = "lag_ms=" <> int.to_string(lag)
  let rate_part = "|throughput_rps=" <> int.to_string(rate)
  let retry_part = "|retries=" <> int.to_string(retries)
  let dead_part = "|dead_letters=" <> int.to_string(dead_letters)
  lag_part <> rate_part <> retry_part <> dead_part
}

/// Convert one snapshot to metric points.
/// Convert a snapshot into metric points for the given pipeline run.
///
/// Lag and throughput are emitted as gauges. Retries and dead letters are
/// emitted as counters. Every point carries the same `pipeline` and `run_id`
/// tags, and the list order is lag, throughput, retries, dead letters.
pub fn metrics(
  snapshot: Snapshot,
  pipeline_name: String,
  run_id: String,
) -> List(telemetry.Metric) {
  // The tag list is identical for every point, so build it once.
  let tags = metric_tags(pipeline_name, run_id)

  let lag_point =
    telemetry.Gauge(
      name: "lightspeed.pipeline.lag_ms",
      value: snapshot.lag_ms,
      tags: tags,
    )
  let throughput_point =
    telemetry.Gauge(
      name: "lightspeed.pipeline.throughput_records_per_sec",
      value: snapshot.throughput_records_per_sec,
      tags: tags,
    )
  let retry_point =
    telemetry.Counter(
      name: "lightspeed.pipeline.retry_total",
      value: snapshot.retry_count,
      tags: tags,
    )
  let dead_letter_point =
    telemetry.Counter(
      name: "lightspeed.pipeline.dead_letter_total",
      value: snapshot.dead_letter_count,
      tags: tags,
    )

  [lag_point, throughput_point, retry_point, dead_letter_point]
}

/// Derive a records-per-second rate from one delta, never below zero.
///
/// When `elapsed_ms` is zero or negative no rate can be computed, so the raw
/// `processed_records` count is used instead. Integer division truncates.
fn throughput_from_delta(delta: Delta) -> Int {
  let rate = case delta.elapsed_ms > 0 {
    // elapsed_ms is strictly positive here, so the division is safe.
    True -> delta.processed_records * 1000 / delta.elapsed_ms
    False -> delta.processed_records
  }
  max(0, rate)
}

/// Build the tags attached to every pipeline metric: pipeline name, then run id.
fn metric_tags(pipeline_name: String, run_id: String) -> List(telemetry.Tag) {
  let pipeline_tag = telemetry.Tag(key: "pipeline", value: pipeline_name)
  let run_tag = telemetry.Tag(key: "run_id", value: run_id)
  [pipeline_tag, run_tag]
}

/// Return the larger of two integers.
fn max(left: Int, right: Int) -> Int {
  int.max(left, right)
}