Skip to main content

src/pharos.gleam

//// Public API.
////
//// `start_link/1` boots the entire pharos supervision tree from a
//// `Config`. The returned `Pharos` handle lets callers `subscribe` to
//// alert events, `unsubscribe` later, and `stop` everything when done.
////
//// ## Example
////
//// ```gleam
//// import pharos
//// import pharos/config
//// import pharos/statistic
////
//// let assert Ok(started) =
////   pharos.start_link(
////     config.new()
////     |> config.with_statistics([
////       statistic.poll(statistic.BeamMemory),
////       statistic.poll_every(statistic.BeamRunQueues, 500),
////     ])
////     |> config.with_thresholds([config.TotalMemory(above: 500.0)]),
////   )
////
//// let assert Ok(_handler) =
////   pharos.subscribe(started.data, fn(event) { handle(event) })
//// ```

import eparch/event_manager
import gleam/erlang/atom.{type Atom}
import gleam/erlang/process.{type Pid}
import gleam/list
import gleam/option.{None, Some}
import gleam/otp/actor
import gleam/string
import pharos/alert.{type AlertEvent, type WindowSpec, AlertData, WindowSpec}
import pharos/alert_manager
import pharos/config.{type Config}
import pharos/event_bus.{type EventBus, type HandlerId}
import pharos/internal/connection
import pharos/internal/hot_buffer
import pharos/internal/spillover
import pharos/internal/supervisor as pharos_supervisor
import pharos/internal/telemetry
import pharos/probe
import pharos/sink
import pharos/threshold_eval

/// Opaque handle returned by `start_link`. Pass it to `subscribe`,
/// `unsubscribe`, and `stop`.
///
/// Fields (only reachable from inside this module, since the type is opaque):
/// - `supervisor_pid`: pid of the root supervisor started by `start_link`;
///   `stop` shuts it down.
/// - `bus`: the alert event bus that `subscribe`/`unsubscribe` add handlers
///   to and remove handlers from.
/// - `handler_id`: id of the telemetry handler attached by `start_link`;
///   `stop` detaches it.
pub opaque type Pharos {
  Pharos(supervisor_pid: Pid, bus: EventBus, handler_id: Atom)
}

/// Reasons `start_link/1` can fail. Each variant carries a human-readable
/// description of the underlying failure.
pub type StartError {
  /// The OTP supervisor itself could not be started; carries the reason
  /// rendered from the supervisor's `actor.StartError`.
  SupervisorStartFailed(reason: String)
  /// The supervisor started but the telemetry handler could not be
  /// attached. The supervisor (and all its children) is shut down before
  /// returning this error so we don't leak processes.
  TelemetryAttachFailed(reason: String)
}

/// Passed as `base_ms` to `connection.ConnectionConfig` for the Brain
/// connection (presumably the initial reconnect back-off — confirm in
/// `pharos/internal/connection`).
const connection_base_ms = 1000

/// Passed as `max_ms` to `connection.ConnectionConfig` (presumably the cap on
/// the reconnect back-off — confirm in `pharos/internal/connection`).
const connection_max_ms = 30_000

/// Passed as `flush_interval_ms` to `connection.ConnectionConfig`.
const connection_flush_interval_ms = 1000

/// Boot pharos from `config`.
///
/// In order, this:
///
/// 1. Mints registered names with `process.new_name` for the event bus, for
///    one alert manager per BEAM threshold (`pharos_alert_<id>`), for one per
///    custom probe threshold (`pharos_probe_<id>`), and for the hot buffer.
/// 2. Builds the optional cold-tier spillover child (only when both
///    `config.brain` and `config.metric_spillover_path` are set) and the
///    optional Brain connection child (only when `config.brain` is set).
/// 3. Starts the supervision tree; a failure here is reported as
///    `SupervisorStartFailed`.
/// 4. Attaches a single telemetry handler that decodes events and routes
///    `breach`/`recover` casts to the matching alert managers (it is also
///    handed the hot buffer). If attaching fails, the supervisor is shut down
///    and `TelemetryAttachFailed` is returned.
/// 5. Attaches every configured alert sink to the event bus. The result of
///    this step is discarded, so a sink that fails to attach does not fail
///    `start_link`.
///
/// On success the returned `actor.Started` has the supervisor's pid and a
/// `Pharos` handle as its data.
pub fn start_link(config: Config) -> Result(actor.Started(Pharos), StartError) {
  let bus_name = process.new_name(prefix: "pharos_event_bus")

  // BEAM threshold lane: pair each threshold with the registered name of the
  // alert manager that will own it.
  let beam_lane =
    list.map(config.thresholds, fn(threshold) {
      let id = config.threshold_id(threshold)
      #(threshold, process.new_name(prefix: "pharos_alert_" <> id))
    })
  // Custom probe threshold lane: same shape, distinct name prefix.
  let probe_lane =
    list.map(config.custom_thresholds, fn(threshold) {
      let id = probe.threshold_id(threshold)
      #(threshold, process.new_name(prefix: "pharos_probe_" <> id))
    })

  let beam_specs =
    list.map(beam_lane, fn(pair) {
      let #(threshold, name) = pair
      beam_manager_spec(config, threshold, name)
    })
  // Probe thresholds carry their own alert level and are never windowed, so
  // they always use the configured soak period.
  let probe_specs =
    list.map(probe_lane, fn(pair) {
      let #(threshold, name) = pair
      pharos_supervisor.AlertManagerSpec(
        data: AlertData(
          id: probe.threshold_id(threshold),
          level: threshold.level,
          soak_period_ms: config.soak_period_ms,
          cool_period_ms: config.cool_period_ms,
          window: None,
        ),
        name: name,
      )
    })
  let manager_specs = list.append(beam_specs, probe_specs)

  // Hot buffer for the metric stream. Its ETS table name reuses the atom
  // minted by `new_name` (no extra atoms), and readers/writers address it by
  // that stable name across owner restarts.
  let buffer_name = process.new_name(prefix: "pharos_hot_buffer")
  let table = hot_buffer.table_name(buffer_name)
  let buffer = hot_buffer.from_table(table)
  let buffer_child = hot_buffer.child_spec(table, config.metric_buffer_capacity)

  // The spillover handle (if any) is shared with the connection manager.
  let #(spillover_handle, spillover_child) = spillover_setup(config)
  let connection_child = connection_child_spec(config, buffer, spillover_handle)

  case
    pharos_supervisor.start_link(
      config,
      bus_name,
      manager_specs,
      buffer_child,
      spillover_child,
      connection_child,
    )
  {
    Error(error) -> Error(SupervisorStartFailed(format_actor_error(error)))
    Ok(started) -> {
      let bus = event_bus.from_name(bus_name)
      // NOTE(review): this is a fixed atom, so every pharos instance uses
      // the same telemetry handler id; presumably a second concurrent
      // `start_link` gets `HandlerAlreadyAttached` — confirm in telemetry.
      let handler_id = atom.create("pharos_handler")

      let beam_handles = lane_handles(beam_lane)
      let probe_handles = lane_handles(probe_lane)

      case
        telemetry.attach(
          config,
          beam_handles,
          probe_handles,
          config.custom_statistics,
          buffer,
          handler_id,
        )
      {
        Ok(Nil) -> {
          // Wire each configured alert sink as a handler on the bus
          // (best-effort: the result is not inspected).
          let _ = sink.attach_all(bus, config.alert_sinks)
          Ok(actor.Started(
            pid: started.pid,
            data: Pharos(
              supervisor_pid: started.pid,
              bus: bus,
              handler_id: handler_id,
            ),
          ))
        }
        Error(attach_error) -> {
          // Supervisor started but the handler couldn't attach: tear it
          // down so we don't leak processes.
          shutdown_supervisor(started.pid)
          Error(TelemetryAttachFailed(format_attach_error(attach_error)))
        }
      }
    }
  }
}

/// Build the alert-manager spec for one BEAM threshold registered as `name`.
///
/// Windowed rules debounce via the window itself, so they skip the soak
/// (soak 0 → fire as soon as the windowed aggregate breaches). The cool period
/// still applies for resolution hysteresis. Every BEAM threshold uses the
/// config-wide default alert level.
fn beam_manager_spec(cfg: Config, threshold: config.Threshold, name) {
  let window = window_spec(threshold)
  let soak_period_ms = case window {
    Some(_) -> 0
    None -> cfg.soak_period_ms
  }
  pharos_supervisor.AlertManagerSpec(
    data: AlertData(
      id: config.threshold_id(threshold),
      level: cfg.default_alert_level,
      soak_period_ms: soak_period_ms,
      cool_period_ms: cfg.cool_period_ms,
      window: window,
    ),
    name: name,
  )
}

/// Cold tier: a Dets spillover file, set up only when a spillover path AND a
/// Brain stream are both configured (no connection means nothing to spill
/// for). Returns `#(handle, child_spec)`, both `Some` in that case and both
/// `None` otherwise.
fn spillover_setup(cfg: Config) {
  case cfg.brain, cfg.metric_spillover_path {
    Some(_), Some(path) -> {
      let spillover_name = process.new_name(prefix: "pharos_spillover")
      let spillover_table = spillover.table_name(spillover_name)
      let handle = spillover.from_table(spillover_table)
      #(Some(handle), Some(spillover.child_spec(spillover_table, path)))
    }
    _, _ -> #(None, None)
  }
}

/// Child spec for the connection manager, or `None` when no Brain stream
/// target is configured. `max_pending` is half the hot buffer's capacity
/// (integer division).
fn connection_child_spec(cfg: Config, buffer, spillover_handle) {
  case cfg.brain {
    None -> None
    Some(brain) ->
      Some(connection.child_spec(
        process.new_name(prefix: "pharos_connection"),
        connection.brain_transport(brain.node, brain.name),
        buffer,
        spillover_handle,
        connection.ConnectionConfig(
          base_ms: connection_base_ms,
          max_ms: connection_max_ms,
          max_pending: cfg.metric_buffer_capacity / 2,
          flush_interval_ms: connection_flush_interval_ms,
        ),
      ))
  }
}

/// Replace each registered name in a `#(threshold, name)` lane with the alert
/// manager handle built by `alert_manager.from_name`, keeping the threshold.
fn lane_handles(lane) {
  list.map(lane, fn(pair) {
    let #(threshold, name) = pair
    #(threshold, alert_manager.from_name(name))
  })
}

/// Register `on_event` as a handler on this instance's alert event bus.
///
/// On success the returned `HandlerId` identifies the registration and can
/// later be handed to `unsubscribe`. Errors from the underlying event manager
/// are returned as-is.
pub fn subscribe(
  pharos: Pharos,
  on_event: fn(AlertEvent) -> Nil,
) -> Result(HandlerId, event_manager.AddError(Nil, Nil)) {
  pharos.bus
  |> event_bus.add_handler(on_event)
}

/// Remove a handler previously returned by `subscribe` from the alert event
/// bus. Errors from the underlying event manager are returned as-is.
pub fn unsubscribe(
  pharos: Pharos,
  handler: HandlerId,
) -> Result(Nil, event_manager.RemoveError(Nil, Nil)) {
  let Pharos(bus:, ..) = pharos
  event_bus.remove_handler(bus, handler)
}

/// Stop pharos: detach the telemetry handler, then shut down the
/// supervision tree. Safe to call multiple times - extra calls become
/// no-ops once the supervisor is already gone.
///
/// The result of the detach is discarded, so a missing handler does not make
/// this fail.
///
/// NOTE(review): `start_link` attaches every instance under the same fixed
/// handler id, so calling `stop` on a stale handle after a newer instance has
/// started would presumably detach the newer instance's handler — confirm
/// this is acceptable.
pub fn stop(pharos: Pharos) -> Nil {
  let Pharos(supervisor_pid:, handler_id:, ..) = pharos
  let _ = telemetry.detach(handler_id)
  shutdown_supervisor(supervisor_pid)
}

// ---------------------------------------------------------------------------
// Internals
// ---------------------------------------------------------------------------

/// Runtime window description for `threshold`.
///
/// For a `config.Windowed` threshold this is `Some(WindowSpec)`: its window
/// length and mode come straight from the threshold, and its limit comes from
/// the wrapped threshold via `threshold_eval.threshold_limit`. Every other
/// (per-tick) threshold yields `None`.
fn window_spec(threshold: config.Threshold) -> option.Option(WindowSpec) {
  case threshold {
    config.Windowed(over: inner, window_ms: span_ms, mode: window_mode) -> {
      let inner_limit = threshold_eval.threshold_limit(inner)
      Some(WindowSpec(window_ms: span_ms, limit: inner_limit, mode: window_mode))
    }
    _ -> None
  }
}

/// Shut down the supervisor identified by `pid`. Implemented in the Erlang
/// module `pharos_ffi`. It is used by `stop` and by `start_link`'s cleanup when
/// the telemetry handler cannot be attached.
///
/// NOTE(review): `stop` promises repeated calls are no-ops once the
/// supervisor is gone, so this presumably tolerates an already-dead pid —
/// confirm in `pharos_ffi`.
@external(erlang, "pharos_ffi", "shutdown_supervisor")
fn shutdown_supervisor(pid: Pid) -> Nil

/// Render the supervisor's `actor.StartError` as the text carried by
/// `SupervisorStartFailed`.
///
/// The exit reason of `InitExited` is included (via `string.inspect`) rather
/// than discarded, so callers can see why the supervisor died during init.
fn format_actor_error(error: actor.StartError) -> String {
  case error {
    actor.InitTimeout -> "supervisor initialisation timed out"
    actor.InitFailed(reason) -> reason
    actor.InitExited(reason) ->
      "supervisor exited during initialisation: " <> string.inspect(reason)
  }
}

/// Render a telemetry attach failure as the text carried by
/// `TelemetryAttachFailed`.
fn format_attach_error(error: telemetry.AttachError) -> String {
  case error {
    telemetry.AttachFailed(reason:) -> reason
    telemetry.HandlerAlreadyAttached -> "telemetry handler already attached"
  }
}