// src/pharos/internal/connection.gleam

//// The "Brain" connection manager
////
//// An `eparch/state_machine` that owns delivery of metrics/alerts to the
//// central "Brain" (or any remote sink) and degrades gracefully under network
//// partitions:
////
//// ```
//// Disconnected ──tick──▶ Connecting ──ok──▶ Streaming
////      ▲                     │                  │
////      └──────fail───────────┘                  │ over capacity
////      ▲                                         ▼
////      └──────────conn lost──────────────────Throttled
//// ```
////
//// - **Disconnected** buffers everything in the hot buffer and arms a retry
////   timer using exponential backoff **with jitter** (so a fleet doesn't
////   reconnect in lockstep).
//// - **Connecting** performs one connection attempt; success flushes the
////   buffer and moves to Streaming, failure bumps the backoff and returns to
////   Disconnected.
//// - **Streaming** delivers each metric as it arrives, flushing the buffer
////   first. A lost connection drops back to Disconnected (re-buffering).
//// - **Throttled** is entered when the buffer outruns `max_pending`; it paces
////   draining and returns to Streaming once it has caught up.
////
//// The transport is injected (`Transport`) so the manager is testable without
//// a real Brain and so alternative transports (e.g. OTLP) can reuse the FSM.

import eparch/state_machine.{
  type StartError, After, StateTimeout, StateTimeoutType,
}
import gleam/erlang/process.{type Name, type Pid}
import gleam/int
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/otp/actor
import gleam/otp/supervision.{type ChildSpecification}
import pharos/internal/hot_buffer.{type HotBuffer}
import pharos/internal/spillover.{type Spillover}
import pharos/metric.{type Metric}

/// The four lifecycle states of the connection manager. The same value is
/// used directly as the FSM's `state`.
pub type ConnectionState {
  /// No connection: deliveries are buffered; a `Tick` moves to `Connecting`.
  Disconnected
  /// A connection attempt is pending; deliveries are still buffered.
  Connecting
  /// Connected: each delivery is pushed to the buffer and flushed right away.
  Streaming
  /// Backlog exceeded `max_pending`: deliveries are buffered and drained on
  /// `DrainTick`.
  Throttled
}

/// Tuning knobs for reconnection backoff and backpressure. All durations are
/// in milliseconds.
pub type ConnectionConfig {
  ConnectionConfig(
    /// Base backoff delay in milliseconds (first retry, pre-jitter). Also used
    /// as the pacing interval between drain attempts while `Throttled`.
    base_ms: Int,
    /// Maximum backoff delay in milliseconds (cap on exponential growth).
    max_ms: Int,
    /// Buffer depth above which the manager enters `Throttled`. While
    /// buffering with a cold tier, reaching this depth also triggers a spill
    /// of the hot buffer to disk.
    max_pending: Int,
    /// How often (ms) to drain the buffer while `Streaming`, so metrics pushed
    /// straight to the buffer (e.g. by the telemetry handler) are shipped
    /// without an explicit `deliver` cast.
    flush_interval_ms: Int,
  )
}

/// Pluggable delivery transport, injected into the manager at start-up.
/// `connect` establishes/verifies reachability; `deliver` ships one metric.
/// Both return `Error(reason)` on failure; the manager treats any error as a
/// lost connection and falls back to `Disconnected`.
pub type Transport {
  Transport(
    /// Called once per connection attempt while `Connecting`.
    connect: fn() -> Result(Nil, String),
    /// Called once per metric during a flush.
    deliver: fn(Metric) -> Result(Nil, String),
  )
}

/// Restart-resilient handle to the connection manager. It wraps the
/// registered process name rather than a pid; build one with `from_name` or
/// obtain it from `start_link`.
pub opaque type Connection {
  Connection(name: Name(Message))
}

/// FSM messages. `Deliver` is the only external command; the rest are timer
/// payloads the manager arms for itself. The event handler accepts each of
/// them either as a cast or as a state timeout (the very first `Tick` after
/// start-up is sent as a cast).
pub type Message {
  /// Enqueue a metric for delivery to the Brain.
  Deliver(Metric)
  /// Disconnected retry timer fired: time to attempt a connection.
  Tick
  /// Connecting entry timer fired: perform the actual connection attempt.
  Attempt
  /// Throttled pacing timer fired: try to drain the backlog.
  DrainTick
  /// Streaming periodic timer fired: drain whatever the buffer has accrued.
  FlushTick
}

/// The FSM's private data, carried alongside the `ConnectionState`.
type Data {
  Data(
    /// Injected transport used for connection attempts and deliveries.
    transport: Transport,
    /// RAM hot tier. NOTE(review): it is pushed to and drained without being
    /// rebound here, so it is presumably a handle to mutable storage —
    /// confirm in `hot_buffer`.
    buffer: HotBuffer,
    /// Disk-backed cold tier. `None` keeps the manager RAM-only; `Some` spills
    /// the hot buffer to disk during partitions and replays it first on
    /// reconnect.
    spillover: Option(Spillover),
    /// Backoff and backpressure tuning.
    config: ConnectionConfig,
    /// Consecutive failed connection attempts; drives the backoff curve.
    /// Reset to 0 after a successful connect-and-flush.
    attempt: Int,
  )
}

/// Build a `Connection` handle from the manager's registered name (the same
/// pattern the alert_manager uses). Because the handle holds a name rather
/// than a pid, it keeps working after the supervisor restarts the process.
pub fn from_name(name: Name(Message)) -> Connection {
  Connection(name:)
}

/// Start and link the connection manager under the registered `name`.
///
/// Metrics are shipped through `transport` and held in `buffer` while they
/// cannot be sent. `spillover` is the optional disk cold tier; pass `None`
/// for a RAM-only manager. Returns the handle on success, or the state
/// machine's `StartError` unchanged.
pub fn start_link(
  name: Name(Message),
  transport: Transport,
  buffer: HotBuffer,
  spillover: Option(Spillover),
  config: ConnectionConfig,
) -> Result(Connection, StartError) {
  let outcome = do_start(name, transport, buffer, spillover, config)
  case outcome {
    Error(reason) -> Error(reason)
    // Callers only need the handle; the pid is for supervisors.
    Ok(#(_, handle)) -> Ok(handle)
  }
}

/// Child specification for running the connection manager as a supervised
/// worker. The arguments are the same as for `start_link`; a start failure is
/// reported to the supervisor as `actor.InitFailed` with a readable reason.
pub fn child_spec(
  name: Name(Message),
  transport: Transport,
  buffer: HotBuffer,
  spillover: Option(Spillover),
  config: ConnectionConfig,
) -> ChildSpecification(Connection) {
  let start = fn() {
    case do_start(name, transport, buffer, spillover, config) {
      Error(reason) -> Error(actor.InitFailed(start_error_to_string(reason)))
      Ok(#(pid, handle)) -> Ok(actor.Started(pid:, data: handle))
    }
  }
  supervision.worker(start)
}

/// Shared start routine behind `start_link` and `child_spec`.
///
/// Builds the state machine in `Disconnected` with a zeroed attempt counter,
/// registers it locally under `name`, and — once it is running — casts the
/// first `Tick` so the initial connection attempt happens straight away.
/// Returns the pid together with the name-based handle.
fn do_start(
  name: Name(Message),
  transport: Transport,
  buffer: HotBuffer,
  spillover: Option(Spillover),
  config: ConnectionConfig,
) -> Result(#(Pid, Connection), StartError) {
  let seed = Data(transport:, buffer:, spillover:, config:, attempt: 0)
  let builder =
    state_machine.new(initial_state: Disconnected, initial_data: seed)
  let builder = state_machine.named(builder, state_machine.Local(name))
  let builder = state_machine.on_event(builder, handle_event)

  case state_machine.start_link(builder) {
    Error(reason) -> Error(reason)
    Ok(started) -> {
      let handle = Connection(name:)
      // Kick the lifecycle: without this nothing would leave Disconnected.
      schedule_first_tick(handle)
      Ok(#(started.pid, handle))
    }
  }
}

/// Render a state-machine `StartError` as the human-readable reason handed to
/// the supervisor. `InitFailed` already carries a reason string and is passed
/// through as-is.
fn start_error_to_string(error: StartError) -> String {
  case error {
    state_machine.InitFailed(reason) -> reason
    state_machine.AlreadyStarted(_) ->
      "connection manager name already registered"
    state_machine.InitExited(_) -> "connection manager init exited"
    state_machine.InitTimeout -> "connection manager init timeout"
  }
}

/// Hand `metric` to the connection manager for delivery. This is a cast, so
/// it returns immediately without waiting for the metric to be sent.
pub fn deliver(connection: Connection, metric: Metric) -> Nil {
  connection
  |> server_ref
  |> state_machine.cast(Deliver(metric))
}

/// Cast the initial `Tick` to a freshly started manager so it begins its
/// first connection attempt.
fn schedule_first_tick(connection: Connection) -> Nil {
  connection
  |> server_ref
  |> state_machine.cast(Tick)
}

/// Resolve a handle to a state-machine server reference by way of the named
/// subject for its registered name.
fn server_ref(connection: Connection) -> state_machine.ServerRef(Message) {
  connection.name
  |> process.named_subject
  |> state_machine.ref_from_subject
}

// ---------------------------------------------------------------------------
// Event handling
// ---------------------------------------------------------------------------

/// Single event callback for the state machine: routes each event to the
/// handler for its message.
///
/// `Deliver` is only honoured as a cast. The four timer messages are handled
/// identically whether they arrive as a cast or as a state timeout. Anything
/// else (other timeout kinds or payloads, info messages, calls, state-enter
/// events) leaves state and data untouched.
fn handle_event(
  event: state_machine.Event(ConnectionState, Message, reply),
  state: ConnectionState,
  data: Data,
) -> state_machine.Step(ConnectionState, Data, Message, reply) {
  case event {
    state_machine.Cast(Deliver(metric)) -> on_deliver(state, data, metric)

    state_machine.Cast(Tick) | state_machine.Timeout(StateTimeoutType, Tick) ->
      on_tick(state, data)

    state_machine.Cast(Attempt)
    | state_machine.Timeout(StateTimeoutType, Attempt) ->
      on_attempt(state, data)

    state_machine.Cast(DrainTick)
    | state_machine.Timeout(StateTimeoutType, DrainTick) ->
      on_drain_tick(state, data)

    state_machine.Cast(FlushTick)
    | state_machine.Timeout(StateTimeoutType, FlushTick) ->
      on_flush_tick(state, data)

    // Inert: only state timeouts with the payloads above are ever armed, and
    // the manager exposes no call API.
    state_machine.Timeout(_, _)
    | state_machine.Info(_)
    | state_machine.Call(_, _)
    | state_machine.Enter(_) -> state_machine.keep_state(data, [])
  }
}

/// Handle a `Deliver(metric)` cast.
///
/// While `Streaming`, the metric is pushed onto the hot buffer and the buffer
/// is flushed at once; a flush error drops back to `Disconnected`, otherwise
/// the backlog check decides between `Streaming` and `Throttled`. In every
/// other state the metric is only buffered (spilling to disk first when
/// needed) and the state is kept.
fn on_deliver(
  state: ConnectionState,
  data: Data,
  metric: Metric,
) -> state_machine.Step(ConnectionState, Data, Message, reply) {
  case state {
    Streaming -> {
      let _evicted = hot_buffer.push(data.buffer, metric)
      case flush(data) {
        Error(_) -> go_disconnected(data)
        Ok(Nil) -> maybe_throttle(data)
      }
    }
    // Not sending right now: hold the metric; the connect/drain path will
    // replay disk first, then RAM.
    Disconnected | Connecting | Throttled -> {
      buffer_with_spill(data, metric)
      state_machine.keep_state(data, [])
    }
  }
}

/// Buffer `metric` in the hot tier while the manager is not sending.
///
/// When a cold tier is configured and the hot buffer has already reached the
/// `max_pending` high-watermark, the buffer's entire contents are drained to
/// disk first, and only then is the new metric pushed. Without a cold tier
/// the metric is simply pushed.
fn buffer_with_spill(data: Data, metric: Metric) -> Nil {
  case data.spillover {
    None -> Nil
    Some(cold) -> {
      let depth = hot_buffer.size(data.buffer)
      case depth < data.config.max_pending {
        True -> Nil
        // At or above the watermark: move everything to disk before pushing.
        False -> spillover.push_all(cold, hot_buffer.drain(data.buffer))
      }
    }
  }
  let _evicted = hot_buffer.push(data.buffer, metric)
  Nil
}

/// Handle the retry `Tick`. From `Disconnected` it enters `Connecting` and
/// arms a zero-delay `Attempt` timeout; in any other state the tick is stale
/// and is ignored.
fn on_tick(
  state: ConnectionState,
  data: Data,
) -> state_machine.Step(ConnectionState, Data, Message, reply) {
  case state {
    Connecting | Streaming | Throttled -> state_machine.keep_state(data, [])
    Disconnected -> {
      // Zero delay: the attempt runs as soon as Connecting is entered.
      let fire_now = StateTimeout(After(0), Attempt)
      state_machine.next_state(Connecting, data, [fire_now])
    }
  }
}

/// Handle the `Attempt` timer. In `Connecting` it calls the transport's
/// `connect` and, if that succeeds, flushes the backlog. Success on both
/// resets the attempt counter and runs the backlog check; a failure at either
/// step goes back to `Disconnected` with a bumped backoff. Outside
/// `Connecting` the event is stale and ignored.
fn on_attempt(
  state: ConnectionState,
  data: Data,
) -> state_machine.Step(ConnectionState, Data, Message, reply) {
  case state {
    Disconnected | Streaming | Throttled -> state_machine.keep_state(data, [])
    Connecting -> {
      let connected = data.transport.connect()
      case connected {
        Error(_) -> go_disconnected(data)
        Ok(Nil) ->
          case flush(data) {
            // Connected and caught up: the failure streak is over.
            Ok(Nil) -> maybe_throttle(Data(..data, attempt: 0))
            Error(_) -> go_disconnected(data)
          }
      }
    }
  }
}

/// Handle the `DrainTick` pacing timer. In `Throttled` it flushes the
/// backlog: on success the backlog check decides whether to stay `Throttled`
/// or return to `Streaming`; on failure it goes to `Disconnected`. In any
/// other state the tick is stale and ignored.
fn on_drain_tick(
  state: ConnectionState,
  data: Data,
) -> state_machine.Step(ConnectionState, Data, Message, reply) {
  case state {
    Disconnected | Connecting | Streaming -> state_machine.keep_state(data, [])
    Throttled -> {
      let drained = flush(data)
      case drained {
        Ok(Nil) -> maybe_throttle(data)
        Error(_) -> go_disconnected(data)
      }
    }
  }
}

/// Handle the periodic `FlushTick`. In `Streaming` it flushes the buffer and,
/// on success, re-arms the timer for another `flush_interval_ms`; a flush
/// error goes to `Disconnected`. In any other state the tick is stale and
/// ignored.
///
/// NOTE(review): unlike `on_deliver`, a successful flush here does not go
/// through `maybe_throttle` — confirm that skipping the backlog check on this
/// path is intentional.
fn on_flush_tick(
  state: ConnectionState,
  data: Data,
) -> state_machine.Step(ConnectionState, Data, Message, reply) {
  case state {
    Disconnected | Connecting | Throttled -> state_machine.keep_state(data, [])
    Streaming -> {
      let flushed = flush(data)
      case flushed {
        Ok(Nil) -> {
          // A state timeout fires only once, so it must be armed again.
          let again =
            StateTimeout(After(data.config.flush_interval_ms), FlushTick)
          state_machine.keep_state(data, [again])
        }
        Error(_) -> go_disconnected(data)
      }
    }
  }
}

/// Pick the next state after a successful flush, based on what is left in the
/// hot buffer.
///
/// More than `max_pending` entries → `Throttled`, with a `DrainTick` armed
/// after `base_ms`. Otherwise → `Streaming`, with the periodic `FlushTick`
/// armed after `flush_interval_ms` so buffer-only pushes still get shipped.
fn maybe_throttle(
  data: Data,
) -> state_machine.Step(ConnectionState, Data, Message, reply) {
  let backlog = hot_buffer.size(data.buffer)
  let #(next, timer) = case backlog > data.config.max_pending {
    True -> #(Throttled, StateTimeout(After(data.config.base_ms), DrainTick))
    False -> #(
      Streaming,
      StateTimeout(After(data.config.flush_interval_ms), FlushTick),
    )
  }
  state_machine.next_state(next, data, [timer])
}

/// Fall back to `Disconnected` after a failed connect or delivery.
///
/// The retry `Tick` is armed with a jittered delay computed from the attempt
/// count *before* this failure (so the first failure waits at most `base_ms`),
/// and the stored counter is then incremented for the next round.
fn go_disconnected(
  data: Data,
) -> state_machine.Step(ConnectionState, Data, Message, reply) {
  let wait_ms = jittered_delay(data.config, data.attempt)
  let retry = StateTimeout(After(wait_ms), Tick)
  state_machine.next_state(
    Disconnected,
    Data(..data, attempt: data.attempt + 1),
    [retry],
  )
}

// ---------------------------------------------------------------------------
// Flushing
// ---------------------------------------------------------------------------

/// Ship everything that is pending: the cold (disk) tier first, then the hot
/// (RAM) tier.
///
/// If the cold replay fails, the hot tier is left untouched and the error is
/// returned. Otherwise the hot buffer is drained and delivered. Both delivery
/// helpers put unsent metrics back where they came from on failure, so the
/// caller only has to drop to `Disconnected` when it sees an `Error`.
fn flush(data: Data) -> Result(Nil, String) {
  case replay_cold(data) {
    Ok(Nil) -> {
      let pending = hot_buffer.drain(data.buffer)
      deliver_all(data, pending)
    }
    Error(reason) -> Error(reason)
  }
}

/// Replay the disk tier, if there is one. With no cold tier configured this
/// is an immediate `Ok(Nil)`. Otherwise the spillover is drained and its
/// metrics delivered in order; on a delivery failure the unsent remainder is
/// written back to the spillover and the error is returned.
fn replay_cold(data: Data) -> Result(Nil, String) {
  case data.spillover {
    Some(cold) -> {
      let backlog = spillover.drain(cold)
      deliver_cold(data, cold, backlog)
    }
    None -> Ok(Nil)
  }
}

/// Deliver `metrics` (already drained from the cold tier) one at a time, in
/// list order. On the first transport error, the failed metric and everything
/// after it are pushed back to the spillover `sp`, and the error is returned.
fn deliver_cold(
  data: Data,
  sp: Spillover,
  metrics: List(Metric),
) -> Result(Nil, String) {
  case metrics {
    [] -> Ok(Nil)
    [head, ..tail] -> {
      let outcome = data.transport.deliver(head)
      case outcome {
        Ok(Nil) -> deliver_cold(data, sp, tail)
        Error(reason) -> {
          // `metrics` is exactly the failed metric plus the unsent rest.
          spillover.push_all(sp, metrics)
          Error(reason)
        }
      }
    }
  }
}

/// Deliver `metrics` (already drained from the hot buffer) one at a time, in
/// list order. On the first transport error, the failed metric and everything
/// after it are pushed back onto the hot buffer in their original list order,
/// and the error is returned.
fn deliver_all(data: Data, metrics: List(Metric)) -> Result(Nil, String) {
  case metrics {
    [] -> Ok(Nil)
    [head, ..tail] -> {
      let outcome = data.transport.deliver(head)
      case outcome {
        Ok(Nil) -> deliver_all(data, tail)
        Error(reason) -> {
          // `metrics` is exactly the failed metric plus the unsent rest.
          list.each(metrics, fn(unsent) {
            let _ = hot_buffer.push(data.buffer, unsent)
          })
          Error(reason)
        }
      }
    }
  }
}

// ---------------------------------------------------------------------------
// Backoff
// ---------------------------------------------------------------------------

/// Deterministic exponential backoff: `min(max_ms, base_ms * 2^attempt)`,
/// floored at 0.
///
/// `2^exponent` is an exact left shift (`1 << exponent`). The exponent is
/// clamped to `0..16`: the upper bound keeps the shift from running away, and
/// the lower bound means a negative `attempt` is treated as attempt 0 rather
/// than being passed to the shift as a negative count.
///
/// The result is never negative, even for a misconfigured negative `base_ms`
/// or `max_ms`, so it is always usable as a timer delay and as the exclusive
/// upper bound handed to `int.random` by `jittered_delay`.
pub fn backoff_delay(config: ConnectionConfig, attempt: Int) -> Int {
  let exponent = int.clamp(attempt, min: 0, max: 16)
  let raw = config.base_ms * int.bitwise_shift_left(1, exponent)
  // Cap at max_ms first, then floor at 0 so bad config cannot go negative.
  int.max(0, int.min(raw, config.max_ms))
}

/// "Full Jitter" backoff (AWS *Exponential Backoff And Jitter*): the delay is
/// drawn uniformly at random from `[0, backoff_delay(config, attempt))`.
/// Using the *whole* backoff window as the random range — instead of adding a
/// small fixed jitter band on top — keeps a fleet of clients from retrying in
/// lockstep.
pub fn jittered_delay(config: ConnectionConfig, attempt: Int) -> Int {
  config
  |> backoff_delay(attempt)
  |> int.random
}

// ---------------------------------------------------------------------------
// Brain transport (BEAM-to-BEAM ETF)
// ---------------------------------------------------------------------------

/// An opaque, resolved Brain destination (node atom + registered-name atom).
/// Declared without constructors: values only come from the Erlang FFI.
/// Built once by `resolve_brain_target`, which is where the node/name strings
/// are turned into atoms, so message volume never drives atom creation
/// (secure coding rule DSG-003). Reused by both the metric stream and the
/// Brain alert sink.
pub type BrainTarget

/// Resolve a Brain `node`/`name` config pair to a target, once. An empty
/// `node` targets the local node.
///
/// Implemented in Erlang as `pharos_ffi:brain_target/2`.
@external(erlang, "pharos_ffi", "brain_target")
pub fn resolve_brain_target(node: String, name: String) -> BrainTarget

/// Verify the target node is reachable. Returns `Error(reason)` when it is
/// not.
///
/// Implemented in Erlang as `pharos_ffi:brain_connect/1`.
@external(erlang, "pharos_ffi", "brain_connect")
pub fn brain_connect(target: BrainTarget) -> Result(Nil, String)

/// Send an already-ETF-encoded payload to the target's registered process.
/// Returns `Error(reason)` if the send fails.
///
/// Implemented in Erlang as `pharos_ffi:brain_deliver/2`.
@external(erlang, "pharos_ffi", "brain_deliver")
pub fn brain_deliver(
  target: BrainTarget,
  payload: BitArray,
) -> Result(Nil, String)

/// ETF-encode any term to a binary (`erlang:term_to_binary/1`).
///
/// Implemented in Erlang as `pharos_ffi:encode_etf/1`; accepts a value of any
/// type.
@external(erlang, "pharos_ffi", "encode_etf")
pub fn encode_etf(term: a) -> BitArray

/// Build the `Transport` that talks to the Brain.
///
/// `connect` checks that the remote node is reachable; `deliver` ETF-encodes
/// one metric and sends it to the resolved `{name, node}` target. The target
/// is resolved a single time here and captured by both closures, so no
/// resolution happens per delivery.
pub fn brain_transport(node: String, name: String) -> Transport {
  let target = resolve_brain_target(node, name)
  let connect = fn() { brain_connect(target) }
  let deliver = fn(metric: Metric) {
    let payload = encode_etf(metric)
    brain_deliver(target, payload)
  }
  Transport(connect:, deliver:)
}