// crates/aube-util/src/diag.rs

/*!
 * Cold-install deep diagnostics for `aube install`.
 *
 * Activation paths:
 *
 * - CLI flag `aube install --diag <summary|trace|live|full>`,
 *   which builds a [`DiagConfig`] and calls [`init_with_config`].
 * - Env vars for programmatic / CI use:
 *   - `AUBE_DIAG_FILE=<path>` writes JSONL events to a file.
 *   - `AUBE_DIAG_PRINT=1` prints every recorded span to stderr.
 *   - `AUBE_DIAG_THRESHOLD_MS=<n>` filters live prints to spans
 *     whose duration is at least `n` milliseconds.
 *   - `AUBE_DIAG_SUMMARY=1` enables the end-of-run aggregate table only.
 *   - `AUBE_DIAG_CRITPATH=1` retains per-event records for the
 *     critical-path / lifecycle / what-if / starvation analyzers.
 *
 * Event wire format (JSONL, one event per line):
 *
 * `{"t":<ms>,"cat":"<category>","name":"<name>","dur":<ms>,"meta":<obj>?}`
 *
 * Field semantics:
 *
 * - `t` elapsed milliseconds since the recorder was initialized.
 * - `cat` category bucket (e.g. `resolver`, `registry`, `fetch`,
 *   `store`, `linker`, `materialize`, `install`, `install_phase`,
 *   `lockfile`, `manifest`, `starvation`, `channel`, `sample`).
 * - `name` event identifier within the category.
 * - `dur` duration in milliseconds; zero for instant markers.
 * - `meta` optional inline JSON object with structured context.
 *   Embedded strings are escaped via [`jstr`].
 *
 * Recording primitives:
 *
 * - [`Span::new`] for scope-bracketed timings (RAII; emits on drop).
 *   [`Span::with_meta`] takes a closure, so the metadata `format!`
 *   work is skipped when diagnostics are disabled.
 * - [`event_lazy`] / [`instant_lazy`] for one-shot events with
 *   closure-deferred metadata; same disabled-path no-op behaviour.
 * - [`event`] / [`instant`] for events whose metadata is either
 *   absent or already available without allocation.
 *
 * Concurrency tracking:
 *
 * - [`inflight`] returns an [`InflightGuard`] which increments on
 *   creation and decrements on drop. The sampler emits a
 *   `cat=sample,name=concurrency` event every 50 ms recording each
 *   [`Slot`]'s in-flight count.
 *
 * Permit-holder attribution:
 *
 * - [`register_holder`] records the package currently holding a
 *   [`Slot`]. [`attribute_wait`] queries the active holders when a
 *   waiter blocks for at least 50 ms and emits a `starvation` event
 *   naming the blockers.
 */

use std::cmp::Ordering as CmpOrdering;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};

/// Process-wide diagnostics state, built once by [`init_with_config`]
/// and stored in [`RECORDER`].
struct Recorder {
    // ---- configuration snapshot -------------------------------------
    /// Origin for every event's `t` timestamp (elapsed since init).
    start: Instant,
    /// Mirror recorded events to stderr.
    print_stderr: bool,
    /// Minimum duration (ms) an event needs for the stderr mirror.
    threshold_ms: u64,
    /// Maintain `aggregates` for the end-of-run table.
    summary: bool,
    /// Maintain the per-event `events` log for the analyzers.
    track_events: bool,

    // ---- sinks --------------------------------------------------------
    /// JSONL trace sink; `None` when no file was requested, the path was
    /// rejected, or the file could not be created.
    file: Option<Mutex<BufWriter<File>>>,

    // ---- live counters (one per `Slot`, see `slot_counter`) ----------
    in_flight_packuments: AtomicU64,
    in_flight_tarballs: AtomicU64,
    in_flight_imports: AtomicU64,
    in_flight_links: AtomicU64,
    in_flight_decode: AtomicU64,
    /// Total number of events passed to [`event`].
    event_count: AtomicU64,

    // ---- accumulated data ---------------------------------------------
    /// Per-`(category, name)` tallies for the summary table.
    aggregates: Mutex<AggMap>,
    /// In-memory event log used by the critical-path / lifecycle / what-if
    /// / starvation analyzers. Allocated up front to its [`EVENTS_CAP`]
    /// capacity when `track_events` is on so the emit hot path never
    /// pays a reallocation under the mutex lock.
    events: Mutex<Vec<EventRec>>,
}

/**
 * Closed set of event categories.
 *
 * Every emit site names its bucket through this enum rather than a free
 * `&str`, so a typo becomes a compile error instead of an orphan row in
 * the summary. [`Category::wire`] gives each variant its stable wire
 * identifier.
 *
 * Declaration order matters: the derived `Ord` follows it, and aggregate
 * keys are `(Category, &'static str)` pairs in a `BTreeMap`, so this
 * order decides how summary rows are grouped. Append new variants rather
 * than inserting them mid-list.
 *
 * Adding a category is a one-line enum extension. Analyzer filters such
 * as [`is_envelope`] and the `--diag` summary printer stay in sync
 * because they match on the enum instead of re-typing literals.
 */
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum Category {
    Resolver,
    Registry,
    Fetch,
    Store,
    Linker,
    Materialize,
    Install,
    InstallPhase,
    Lockfile,
    Manifest,
    Starvation,
    Channel,
    Sample,
    Frozen,
    Script,
    Kernel,
}

impl Category {
    /**
     * The `cat` value written to JSONL traces, which also serves as the
     * row key in the end-of-run summary.
     *
     * These strings are a public output contract: once a release ships
     * an identifier it must never change.
     */
    pub const fn wire(self) -> &'static str {
        match self {
            Self::Resolver => "resolver",
            Self::Registry => "registry",
            Self::Fetch => "fetch",
            Self::Store => "store",
            Self::Linker => "linker",
            Self::Materialize => "materialize",
            Self::Install => "install",
            Self::InstallPhase => "install_phase",
            Self::Lockfile => "lockfile",
            Self::Manifest => "manifest",
            Self::Starvation => "starvation",
            Self::Channel => "channel",
            Self::Sample => "sample",
            Self::Frozen => "frozen",
            Self::Script => "script",
            Self::Kernel => "kernel",
        }
    }
}

/**
 * Per-bucket key for aggregate stats: the typed [`Category`] paired with
 * the event's `&'static str` name.
 *
 * Building a key copies two small values and allocates nothing. Lookups
 * are ordinary lexicographic tuple comparisons, not pointer comparisons.
 * The category's declaration-order discriminant is compared first, then
 * the name's bytes. As a result, two distinct `&'static str`s with the
 * same text share one bucket.
 */
type AggKey = (Category, &'static str);

/**
 * Per-bucket aggregate tallies.
 *
 * Field reference:
 *
 * - `count` number of events recorded for this `(category, name)`.
 * - `sum_ns` cumulative duration in nanoseconds, rescaled to ms at
 *   output time. The `u128` accumulator keeps full ns precision and
 *   cannot realistically overflow, even across millions of events.
 * - `max_ns` largest single-event duration observed for this bucket.
 */
#[derive(Default, Clone, Copy, Debug)]
struct AggVal {
    count: u64,
    sum_ns: u128,
    max_ns: u128,
}

/**
 * Per-`(category, name)` aggregate map maintained while diag is active.
 * Keyed by [`AggKey`] with [`AggVal`] tallies. Being a `BTreeMap`, its
 * iteration order is the sorted key order. Used by [`print_summary`].
 */
type AggMap = std::collections::BTreeMap<AggKey, AggVal>;

/**
 * In-memory event record retained when `track_events` is on. The
 * critical-path, lifecycle, what-if, and starvation analyzers iterate
 * these records after the install completes.
 *
 * `cat` is a `Copy` enum and `name` a `&'static str`, so neither costs a
 * heap allocation per record, and both copy for free when a record is
 * cloned. `pkg_id` and `meta` are still owned `String`s, so cloning a
 * record that carries them does allocate.
 */
#[derive(Clone)]
struct EventRec {
    /// Category of the originating event.
    cat: Category,
    /// Event name within the category.
    name: &'static str,
    /// Emit time minus duration, in ms since recorder init. Can be
    /// negative for a span that started before the recorder existed.
    start_ms: f64,
    /// Emit time, in ms since recorder init.
    end_ms: f64,
    /// Package id extracted from the event metadata, when present.
    pkg_id: Option<String>,
    /// Raw metadata; retained only for `starvation` events.
    meta: Option<String>,
}

/**
 * Hard cap on the per-event in-memory log when `track_events` is on.
 *
 * Each entry retains the event's category, name, optional package id,
 * and optional metadata string. At ~256 bytes amortized per entry, the
 * cap holds the worst-case footprint to a few hundred MiB on hostile
 * inputs. That is still far more than any realistic install needs to
 * drive the analyzers; the largest fixture observed produces ~95k
 * events.
 */
const EVENTS_CAP: usize = 1_000_000;

/**
 * Process-wide recorder slot. It is unset until [`init_with_config`]
 * runs. After that it holds `Some(recorder)` when diagnostics were
 * configured, or `None` when they were not, and it never changes again.
 */
static RECORDER: OnceLock<Option<Recorder>> = OnceLock::new();

/**
 * Fast-path activation flag read by [`enabled`].
 *
 * Loading this atomic directly avoids going through the `OnceLock` and
 * the `Option` match in [`rec`]. That matters at the millions-of-calls
 * scale a busy install reaches. The flag is raised by
 * [`init_with_config`] only when a recorder is configured.
 *
 * The flag is not synchronized with the publication of [`RECORDER`].
 * `Relaxed` is therefore enough: every path that actually records
 * re-checks through [`rec`], which returns `None` until the recorder is
 * visible. The worst case is that a few events emitted concurrently
 * with first init are dropped.
 */
static ENABLED: AtomicBool = AtomicBool::new(false);

/**
 * Configuration knobs for the diagnostics recorder.
 *
 * Two construction paths are supported:
 *   - The `--diag <mode>` CLI flag in the binary builds a [`DiagConfig`]
 *     directly and passes it to [`init_with_config`].
 *   - The env-var driven path uses [`DiagConfig::from_env`] which reads
 *     `AUBE_DIAG_*` vars. Returns `None` when no diag is requested.
 *
 * Field reference:
 *   `file`           Optional sink for the JSONL event stream.
 *   `print_stderr`   When `true`, every recorded span is also printed
 *                    to stderr (filtered by `threshold_ms`).
 *   `summary`        Enable the end-of-run aggregate table.
 *   `track_events`   Retain per-event records in memory for the
 *                    [`print_critical_path`] / [`print_pkg_lifecycle`]
 *                    / [`print_what_if`] / [`print_starvation`]
 *                    analyzers. Costs memory proportional to event count.
 *   `threshold_ms`   Minimum span duration (in ms) for the live stderr
 *                    printer to emit. Ignored when `print_stderr` is `false`.
 */
#[derive(Default, Clone)]
pub struct DiagConfig {
    pub file: Option<PathBuf>,
    pub print_stderr: bool,
    pub summary: bool,
    pub track_events: bool,
    pub threshold_ms: u64,
}

impl DiagConfig {
    /**
     * Build a [`DiagConfig`] from the `AUBE_DIAG_*` environment variables.
     *
     * Returns `None` when none of `AUBE_DIAG_FILE`, `AUBE_DIAG_PRINT`,
     * `AUBE_DIAG_SUMMARY`, or `AUBE_DIAG_CRITPATH` is set. In that case
     * the caller should leave diagnostics off entirely.
     *
     * The boolean vars (`AUBE_DIAG_PRINT`, `AUBE_DIAG_SUMMARY`,
     * `AUBE_DIAG_CRITPATH`) count as set unless they are unset, empty,
     * `0`, or `false` (any ASCII case). For example, `AUBE_DIAG_PRINT=0`
     * turns printing off instead of on. An empty `AUBE_DIAG_FILE` is
     * treated as unset rather than as a request to open `""`.
     *
     * `summary` is implied whenever the recorder is alive at all.
     * `track_events` is gated on `AUBE_DIAG_CRITPATH` because the
     * per-event log is the costly bit. An unparsable
     * `AUBE_DIAG_THRESHOLD_MS` falls back to `0` (print everything).
     */
    pub fn from_env() -> Option<Self> {
        let file = std::env::var_os("AUBE_DIAG_FILE")
            .filter(|p| !p.is_empty())
            .map(PathBuf::from);
        let print = Self::env_flag("AUBE_DIAG_PRINT");
        let summary_env = Self::env_flag("AUBE_DIAG_SUMMARY");
        let critpath_env = Self::env_flag("AUBE_DIAG_CRITPATH");
        if file.is_none() && !print && !summary_env && !critpath_env {
            return None;
        }
        let threshold_ms = std::env::var("AUBE_DIAG_THRESHOLD_MS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);
        Some(Self {
            file,
            print_stderr: print,
            // Summary table emits whenever the recorder is alive.
            summary: true,
            // Critical-path / lifecycle / what-if / starvation analysis requires
            // retaining per-event records, which is more memory-intensive.
            track_events: critpath_env,
            threshold_ms,
        })
    }

    /// Read a boolean `AUBE_DIAG_*` variable via [`Self::flag_value_is_on`].
    fn env_flag(name: &str) -> bool {
        Self::flag_value_is_on(std::env::var_os(name).as_deref())
    }

    /// Interpret a raw env value as a flag. Unset, empty (after trimming),
    /// `0`, and `false` (any ASCII case) mean off; anything else means on.
    /// Non-UTF-8 values count as on, since the variable is clearly set.
    fn flag_value_is_on(value: Option<&std::ffi::OsStr>) -> bool {
        let Some(raw) = value else {
            return false;
        };
        let text = raw.to_string_lossy();
        let text = text.trim();
        !(text.is_empty() || text == "0" || text.eq_ignore_ascii_case("false"))
    }
}

/**
 * Env-driven entry point: configure diagnostics from `AUBE_DIAG_*`.
 *
 * Meant for callers that do not parse the `--diag` CLI flag, such as CI
 * scripts and external tooling. It reads the environment through
 * [`DiagConfig::from_env`] and hands the result to [`init_with_config`].
 * When no relevant variable is set, diagnostics stay off.
 *
 * Only the first initialization takes effect; later calls are no-ops.
 */
pub fn init() {
    let from_env = DiagConfig::from_env();
    init_with_config(from_env);
}

/**
 * Validate that a `--diag-file` / `AUBE_DIAG_FILE` path is safe to
 * truncate-and-write.
 *
 * Rejects:
 *   - UNC paths (`\\server\share\...`) and Windows device namespaces
 *     (`\\.\...`, `\\?\...`), so an attacker-supplied env var cannot
 *     target named pipes, raw devices, or remote shares. On Windows any
 *     two-separator prefix (`//`, `\/`, `/\`) is rejected too, because
 *     Win32 treats `/` and `\` interchangeably.
 *   - NTFS alternate data streams (any `:` after the volume on
 *     Windows). On non-Windows targets `:` is allowed for normal use.
 *   - Reserved Windows device names in the final component, including
 *     when followed by one or more extensions (`NUL.tar.gz`) or trailing
 *     spaces. See [`is_reserved_windows_name`].
 *
 * Path traversal beyond the working directory is permitted because
 * legitimate uses include CI scratch dirs and the system temp folder.
 * Operators can additionally constrain via the file-permission model.
 *
 * Returns `Err(reason)` with a human-readable message on rejection.
 */
fn validate_diag_path(path: &std::path::Path) -> Result<(), &'static str> {
    let s = path.to_string_lossy();
    // `\\…` covers Windows UNC, device (`\\.\…`), and verbatim (`\\?\…`)
    // namespaces and is rejected on every platform. On POSIX a leading
    // `//` is implementation-defined but legitimate, so mixed/forward
    // double-separator prefixes are only rejected on Windows, where they
    // are another spelling of UNC (e.g. under MSYS / Git Bash).
    if s.starts_with(r"\\") {
        return Err("UNC and device paths are not permitted");
    }
    #[cfg(windows)]
    {
        if matches!(s.as_bytes(), [b'/' | b'\\', b'/' | b'\\', ..]) {
            return Err("UNC and device paths are not permitted");
        }
        // Strip volume root (e.g. "C:") then reject any further colon use,
        // which would address an alternate data stream.
        let after_volume = match s.as_bytes() {
            [_, b':', b'/' | b'\\', rest @ ..] | [_, b':', rest @ ..] => {
                std::str::from_utf8(rest).unwrap_or(s.as_ref())
            }
            _ => s.as_ref(),
        };
        if after_volume.contains(':') {
            return Err("alternate data stream paths are not permitted");
        }
        if path
            .file_name()
            .and_then(|n| n.to_str())
            .is_some_and(is_reserved_windows_name)
        {
            return Err("reserved Windows device name");
        }
    }
    Ok(())
}

/**
 * Report whether `file_name` (a single path component) resolves to a
 * reserved Windows device.
 *
 * Windows matches the device name against the part before the *first*
 * dot, ignoring trailing spaces and ASCII case. That is why `nul`,
 * `NUL.txt`, and `NUL.tar.gz` all address the NUL device. The list
 * follows Microsoft's "Naming Files, Paths, and Namespaces" guidance,
 * plus the console aliases `CONIN$` / `CONOUT$`.
 *
 * Compiled on every platform so it can be unit-tested off Windows; only
 * [`validate_diag_path`]'s Windows branch calls it.
 */
#[cfg_attr(not(windows), allow(dead_code))]
fn is_reserved_windows_name(file_name: &str) -> bool {
    const RESERVED: &[&str] = &[
        "CON", "PRN", "AUX", "NUL", "CONIN$", "CONOUT$", "COM0", "COM1", "COM2", "COM3", "COM4",
        "COM5", "COM6", "COM7", "COM8", "COM9", "COM¹", "COM²", "COM³", "LPT0", "LPT1", "LPT2",
        "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9", "LPT¹", "LPT²", "LPT³",
    ];
    let base = file_name
        .split_once('.')
        .map_or(file_name, |(base, _)| base)
        .trim_end_matches(' ');
    RESERVED.iter().any(|r| r.eq_ignore_ascii_case(base))
}

/**
 * Initialize the recorder from an explicit [`DiagConfig`].
 *
 * Passing `None` keeps diagnostics off. Passing `Some(cfg)` opens the
 * configured JSONL file (when `cfg.file` is set) and records the install
 * start instant against which all event timestamps are measured.
 *
 * If `cfg.file` is set but rejected by [`validate_diag_path`] or fails
 * to open, the recorder still activates without a file sink and a
 * stderr warning is emitted. The diag run continues so that summary,
 * critpath, and other in-memory analyzers still produce output.
 *
 * Idempotent: only the first call wins. Subsequent calls (including from
 * spawned tasks) are no-ops.
 *
 * The CLI binary calls this from `main` after argument parsing; library
 * callers using env-driven configuration should prefer [`init`].
 */
pub fn init_with_config(cfg: Option<DiagConfig>) {
    let active = RECORDER
        .get_or_init(|| {
            let cfg = cfg?;
            let file = cfg.file.and_then(|p| match validate_diag_path(&p) {
                Ok(()) => match File::create(&p) {
                    Ok(f) => Some(Mutex::new(BufWriter::with_capacity(64 * 1024, f))),
                    Err(err) => {
                        eprintln!("[diag] could not open trace file {}: {err}", p.display());
                        None
                    }
                },
                Err(reason) => {
                    eprintln!(
                        "[diag] refusing to write trace file {}: {reason}",
                        p.display()
                    );
                    None
                }
            });
            if cfg.print_stderr {
                eprintln!(
                    "[diag] active threshold={}ms summary={}",
                    cfg.threshold_ms, cfg.summary
                );
            }
            // Track-events reserves the worst-case event log eagerly. If
            // the log grew by doubling under the mutex lock instead, each
            // resize would copy the entire inline record array while every
            // emitter blocks. The owned `pkg_id`/`meta` strings would not
            // be copied, but the inline records alone are sizable at
            // `EVENTS_CAP` scale. One upfront reservation keeps emit
            // latency predictable.
            let events_cap = if cfg.track_events { EVENTS_CAP } else { 0 };
            Some(Recorder {
                start: Instant::now(),
                file,
                print_stderr: cfg.print_stderr,
                threshold_ms: cfg.threshold_ms,
                summary: cfg.summary,
                track_events: cfg.track_events,
                in_flight_packuments: AtomicU64::new(0),
                in_flight_tarballs: AtomicU64::new(0),
                in_flight_imports: AtomicU64::new(0),
                in_flight_links: AtomicU64::new(0),
                in_flight_decode: AtomicU64::new(0),
                event_count: AtomicU64::new(0),
                aggregates: Mutex::new(std::collections::BTreeMap::new()),
                events: Mutex::new(Vec::with_capacity(events_cap)),
            })
        })
        .is_some();
    // Raise the fast-path flag only after `get_or_init` has returned, i.e.
    // once the recorder is published in `RECORDER`. Storing it inside the
    // closure would let `enabled()` report `true` while `rec()` still saw
    // an uninitialized cell. Re-storing `true` on later calls is harmless.
    if active {
        ENABLED.store(true, Ordering::Relaxed);
    }
}

/**
 * Reports whether diagnostics are active.
 *
 * This is a single relaxed atomic load on [`ENABLED`]. Hot-path call
 * sites gate `format!` work behind this check via [`Span::with_meta`],
 * [`event_lazy`], and [`instant_lazy`].
 *
 * The flag is raised by [`init_with_config`] but is not synchronized
 * with the publication of the recorder. For a brief window around first
 * initialization, this may disagree with whether [`rec`] returns a
 * recorder. Events emitted in that window can be silently dropped, which
 * is acceptable for a diagnostics layer.
 */
#[inline]
pub fn enabled() -> bool {
    ENABLED.load(Ordering::Relaxed)
}

/**
 * Borrow the live recorder, if any.
 *
 * Yields `None` both before initialization and when initialization ran
 * with diagnostics switched off; every public emit path bails out on
 * `None`.
 */
fn rec() -> Option<&'static Recorder> {
    RECORDER.get()?.as_ref()
}

/**
 * Emit a recorded event with an explicit duration and an already-built
 * metadata string (or `None` for no metadata).
 *
 * Most call sites should prefer [`event_lazy`] so the metadata `format!`
 * is skipped when diag is disabled. This eager variant is appropriate
 * when the metadata is already available as `&str` without allocation
 * (e.g. a cached buffer) or when the call site is itself behind an
 * [`enabled`] check.
 *
 * `meta`, when present, is written verbatim as the JSONL `meta` value,
 * so it must already be a valid JSON object (use [`jstr`] for embedded
 * strings).
 *
 * The recorder fans the event out to up to four sinks depending on
 * configuration:
 *   - the in-memory aggregate table (when `summary` is on),
 *   - the in-memory event log (when `track_events` is on, the span
 *     has non-zero duration, and the log is below [`EVENTS_CAP`]),
 *   - the JSONL file (when `file` was opened),
 *   - the stderr live printer (when `print_stderr` is on and the span
 *     duration, truncated to whole ms, is at least `threshold_ms`).
 */
pub fn event(category: Category, name: &'static str, duration: Duration, meta: Option<&str>) {
    let Some(r) = rec() else { return };
    let t_ms = r.start.elapsed().as_secs_f64() * 1000.0;
    let dur_ms = duration.as_secs_f64() * 1000.0;
    r.event_count.fetch_add(1, Ordering::Relaxed);

    if r.summary {
        let dur_ns = duration.as_nanos();
        // Poisoning is recovered rather than propagated: a diag sink must
        // never take the install down.
        let mut agg = r.aggregates.lock().unwrap_or_else(|e| e.into_inner());
        let entry = agg.entry((category, name)).or_default();
        entry.count += 1;
        entry.sum_ns += dur_ns;
        if dur_ns > entry.max_ns {
            entry.max_ns = dur_ns;
        }
    }

    if r.track_events && dur_ms > 0.0 {
        // Extract / copy outside the lock to keep the critical section short.
        let pkg_id = meta.and_then(extract_pkg_id);
        // Starvation events carry blame metadata that the analyzer
        // re-parses; every other category drops meta after writing
        // since the in-memory log is consumed only by analyzers that
        // do not look at meta.
        let stored_meta = if matches!(category, Category::Starvation) {
            meta.map(|s| s.to_string())
        } else {
            None
        };
        let mut evs = r.events.lock().unwrap_or_else(|e| e.into_inner());
        if evs.len() < EVENTS_CAP {
            evs.push(EventRec {
                cat: category,
                name,
                start_ms: t_ms - dur_ms,
                end_ms: t_ms,
                pkg_id,
                meta: stored_meta,
            });
            // Warn exactly once: the push that fills the log to the cap.
            if evs.len() == EVENTS_CAP {
                eprintln!(
                    "[diag] event log reached {} entries; further per-event records will be dropped",
                    EVENTS_CAP
                );
            }
        }
    }

    if let Some(file) = &r.file {
        let mut f = file.lock().unwrap_or_else(|e| e.into_inner());
        let cat_wire = category.wire();
        // Write errors are deliberately ignored: tracing is best-effort.
        let _ = match meta {
            Some(m) => writeln!(
                f,
                r#"{{"t":{:.3},"cat":"{}","name":"{}","dur":{:.3},"meta":{}}}"#,
                t_ms, cat_wire, name, dur_ms, m
            ),
            None => writeln!(
                f,
                r#"{{"t":{:.3},"cat":"{}","name":"{}","dur":{:.3}}}"#,
                t_ms, cat_wire, name, dur_ms
            ),
        };
    }

    if r.print_stderr && (dur_ms as u64) >= r.threshold_ms {
        let cat_wire = category.wire();
        match meta {
            Some(m) => eprintln!(
                "[diag {:>8.2}ms] {:>10}.{:<28} {:>9.2}ms  {}",
                t_ms, cat_wire, name, dur_ms, m
            ),
            None => eprintln!(
                "[diag {:>8.2}ms] {:>10}.{:<28} {:>9.2}ms",
                t_ms, cat_wire, name, dur_ms
            ),
        }
    }
}

/**
 * Record a zero-length marker event; shorthand for [`event`] with a
 * zero `Duration`.
 *
 * Intended for phase boundaries and one-way state transitions
 * (e.g. `install.resolve_end`, `materialize.drain_rx_begin`).
 */
pub fn instant(category: Category, name: &'static str, meta: Option<&str>) {
    let zero = Duration::default();
    event(category, name, zero, meta);
}

/**
 * Closure-deferred [`instant`]. The metadata `format!` runs only when
 * [`enabled`] returns `true`, so disabled call sites pay only an atomic
 * load.
 */
pub fn instant_lazy<F: FnOnce() -> String>(category: Category, name: &'static str, meta_fn: F) {
    if !enabled() {
        return;
    }
    let meta = meta_fn();
    event(category, name, Duration::ZERO, Some(&meta));
}

/**
 * Like [`event`], but the metadata is produced on demand.
 *
 * `meta_fn` is invoked only while [`enabled`] reports `true`, so with
 * diagnostics off a call costs a single atomic load and never builds
 * the metadata string.
 */
pub fn event_lazy<F: FnOnce() -> String>(
    category: Category,
    name: &'static str,
    duration: Duration,
    meta_fn: F,
) {
    if enabled() {
        event(category, name, duration, Some(meta_fn().as_str()));
    }
}

/**
 * RAII timer for a scoped duration measurement.
 *
 * The span captures `Instant::now()` on construction and emits a
 * recorded event with the elapsed duration when dropped (or when
 * [`Span::finish`] is called explicitly). The category is a typed
 * [`Category`] and the name a `&'static str`, so neither is allocated
 * per span.
 *
 * Idiomatic usage at a call site:
 *
 * ```ignore
 * use aube_util::diag::{Category, Span};
 * let _diag = Span::new(Category::Registry, "fetch_packument")
 *     .with_meta(|| format!(r#"{{"name":{}}}"#, jstr(name)));
 * // ... work ...
 * // Drop on scope exit emits the event.
 * ```
 *
 * Bind the span to a named variable such as `_diag`. Writing
 * `let _ = Span::new(..)` drops the span immediately and records a
 * near-zero duration. [`Span::with_meta`] takes a closure, so its
 * `format!` allocation is skipped when diag is disabled.
 */
pub struct Span {
    category: Category,
    name: &'static str,
    start: Instant,
    meta: Option<String>,
    /// Set by [`Span::finish`] so [`Drop`] does not emit a second time.
    finished: bool,
}

impl Span {
    /**
     * Construct a span with the given category and name.
     *
     * Captures the current monotonic [`Instant`] as the start time.
     * The span emits its recorded event on [`Drop`] unless previously
     * finalized via [`Span::finish`]. Both `category` and `name` are
     * compile-time fixed: the call site cannot construct a span with a
     * dynamic name. This keeps the per-event allocation budget at zero
     * and lets aggregate keys be `(Category, &'static str)` pairs.
     */
    pub fn new(category: Category, name: &'static str) -> Self {
        Self {
            category,
            name,
            start: Instant::now(),
            meta: None,
            finished: false,
        }
    }

    /**
     * Attach metadata. The closure runs only when [`enabled`] returns
     * `true`, so the call site pays only an atomic load when
     * diagnostics are off. Sites that already own a built `String` can
     * pass `|| s`; a closure that just returns an owned string costs
     * nothing extra.
     */
    pub fn with_meta<F: FnOnce() -> String>(mut self, f: F) -> Self {
        if enabled() {
            self.meta = Some(f());
        }
        self
    }

    /**
     * Alias for [`Span::with_meta`] retained for migration. New code
     * should use [`Span::with_meta`] directly; the closure-deferred
     * shape is now the only one offered.
     */
    #[doc(hidden)]
    pub fn with_meta_fn<F: FnOnce() -> String>(self, f: F) -> Self {
        self.with_meta(f)
    }

    /**
     * Finalize the span eagerly rather than waiting for [`Drop`].
     *
     * Useful when emitting the event is correlated with a subsequent
     * span (e.g. boundary markers around a `match` arm).
     */
    pub fn finish(mut self) {
        self.flush();
        // `self` is dropped at the end of this fn; mark it so Drop skips.
        self.finished = true;
    }

    /**
     * Internal sink used by both [`Span::finish`] and [`Drop`]. Emits
     * the event via [`event`] when diag is active.
     */
    fn flush(&mut self) {
        if !enabled() {
            return;
        }
        event(
            self.category,
            self.name,
            self.start.elapsed(),
            self.meta.as_deref(),
        );
    }

    /**
     * Snapshot the elapsed duration so far, in milliseconds, without
     * finalizing the span.
     *
     * The span continues to record and will emit its full duration on
     * drop. Useful for periodic "still alive" probes inside long-running
     * spans.
     */
    pub fn elapsed_ms(&self) -> f64 {
        self.start.elapsed().as_secs_f64() * 1000.0
    }
}

impl Drop for Span {
    /**
     * Emits the recorded event unless [`Span::finish`] was called.
     */
    fn drop(&mut self) {
        if !self.finished {
            self.flush();
        }
    }
}

/**
 * Discriminator for the in-flight counters and permit-holder registry.
 *
 * Variants:
 *
 * - `Pack` packument fetches (resolver phase).
 * - `Tar` tarball downloads (fetch phase).
 * - `Imp` CAS imports (materialize phase).
 * - `Link` linker materialize / symlink work.
 * - `Decode` gzip decompression / tar extraction (CPU bound).
 *
 * The discriminant (`slot as usize`) indexes [`SLOT_COUNT`]-sized arrays,
 * such as the per-slot permit-holder registry. The in-flight atomic
 * counters are separate recorder fields, selected by an exhaustive
 * `match`. Append new variants at the end and bump [`SLOT_COUNT`].
 */
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum Slot {
    Pack,
    Tar,
    Imp,
    Link,
    Decode,
}

/// Number of [`Slot`] variants. Used to size the per-slot holder arrays.
pub const SLOT_COUNT: usize = 5;

// Compile-time guard that the last variant's index fits the
// `SLOT_COUNT`-sized arrays. This check fails the build if variants are
// reordered or `SLOT_COUNT` drifts. When appending a variant, point this
// assertion at the new last variant.
const _: () = assert!(
    Slot::Decode as usize + 1 == SLOT_COUNT,
    "SLOT_COUNT must equal the number of Slot variants"
);

impl Slot {
    /**
     * Wire-format identifier emitted as the `name` field of starvation
     * events for this slot. Used by [`attribute_wait`] when surfacing
     * permit-wait blame.
     */
    pub const fn wire_name(self) -> &'static str {
        match self {
            Slot::Pack => "packument_sem",
            Slot::Tar => "tarball_sem",
            Slot::Imp => "import_sem",
            Slot::Link => "link_sem",
            Slot::Decode => "decode_sem",
        }
    }

    /**
     * Short JSON key emitted in the periodic `sample.concurrency`
     * event's metadata. Mirrors the variant order in [`Slot`].
     */
    pub const fn sample_key(self) -> &'static str {
        match self {
            Slot::Pack => "pack",
            Slot::Tar => "tar",
            Slot::Imp => "imp",
            Slot::Link => "link",
            Slot::Decode => "decode",
        }
    }
}

/**
 * Scope guard produced by [`inflight`]. When it goes out of scope, it
 * releases one unit of its [`Slot`]'s in-flight counter.
 *
 * Those counters are what the 50 ms concurrency sampler reports as
 * `cat=sample,name=concurrency` events.
 */
pub struct InflightGuard {
    slot: Slot,
    /// Records whether [`inflight`] really bumped the counter, which it
    /// only does when a recorder is live. Drop consults this flag so a
    /// guard created while diag was off, or before initialization, can
    /// never decrement below zero and wrap to near `u64::MAX`.
    incremented: bool,
}

impl Drop for InflightGuard {
    fn drop(&mut self) {
        if let (true, Some(recorder)) = (self.incremented, rec()) {
            slot_counter(recorder, self.slot).fetch_sub(1, Ordering::Relaxed);
        }
    }
}

/// Resolve a slot's atomic counter inside the recorder. Centralized so the
/// `Slot` discriminator is mapped in one place; previous code duplicated
/// the match across `inflight()`, `Drop`, and `sample_concurrency`.
fn slot_counter(r: &Recorder, slot: Slot) -> &AtomicU64 {
    match slot {
        Slot::Pack => &r.in_flight_packuments,
        Slot::Tar => &r.in_flight_tarballs,
        Slot::Imp => &r.in_flight_imports,
        Slot::Link => &r.in_flight_links,
        Slot::Decode => &r.in_flight_decode,
    }
}

/**
 * Count one more in-flight operation on `slot` and hand back a guard
 * that undoes the count when dropped.
 *
 * A guard is produced whether or not diagnostics are on, so call sites
 * look the same in both modes. Without a live recorder nothing is
 * counted, and the guard remembers that so its drop does not decrement.
 */
pub fn inflight(slot: Slot) -> InflightGuard {
    let incremented = match rec() {
        Some(recorder) => {
            slot_counter(recorder, slot).fetch_add(1, Ordering::Relaxed);
            true
        }
        None => false,
    };
    InflightGuard { slot, incremented }
}

/**
 * Registry of the packages currently holding a permit: one
 * mutex-guarded list per [`Slot`], indexed by `slot as usize`.
 *
 * A permit holder calls [`register_holder`] to add its package id, and
 * the returned [`HolderGuard`] removes that entry on drop. A waiter on
 * the same slot calls [`attribute_wait`], which snapshots the current
 * holders and names them in a `starvation` event.
 */
static HOLDERS: OnceLock<[Mutex<Vec<Arc<str>>>; SLOT_COUNT]> = OnceLock::new();

/**
 * Return the holder list for `slot`, creating all [`SLOT_COUNT`] lists
 * on first use. Entries are [`Arc<str>`], so copying one out is a
 * refcount bump rather than a string allocation.
 */
fn holders_for(slot: Slot) -> &'static Mutex<Vec<Arc<str>>> {
    let index = slot as usize;
    let registry = HOLDERS.get_or_init(|| std::array::from_fn(|_| Mutex::new(Vec::new())));
    &registry[index]
}

/**
 * RAII guard that removes a registered holder on drop.
 *
 * Constructed by [`register_holder`]. Removes its entry from the
 * appropriate per-[`Slot`] vector when scope exits. The guard owns
 * its [`Arc<str>`] so `Drop` matches by cheap pointer equality on the
 * common path.
 */
pub struct HolderGuard {
    slot: Slot,
    pkg_id: Arc<str>,
    /// Whether the matching `register_holder` push ran. Mirrors
    /// [`InflightGuard::incremented`]: the drop only attempts removal
    /// when the registration actually happened, so a guard built in the
    /// disabled path or with a poisoned mutex during init does not
    /// silently swap-remove an unrelated holder.
    registered: bool,
}

impl Drop for HolderGuard {
    /// Take this guard's entry back out of its slot's holder list.
    ///
    /// Guards that never registered do nothing, and a poisoned mutex is
    /// treated as "nothing to clean up". The entry is located first by
    /// `Arc` identity (the handle we pushed). If that fails, it falls back
    /// to string equality; because the list is a multiset of ids,
    /// removing any equal entry has the same net effect. Order inside
    /// the list does not matter, so the O(1) `swap_remove` is used.
    fn drop(&mut self) {
        if !self.registered {
            return;
        }
        if let Ok(mut holders) = holders_for(self.slot).lock() {
            let same_alloc = holders.iter().position(|h| Arc::ptr_eq(h, &self.pkg_id));
            let found =
                same_alloc.or_else(|| holders.iter().position(|h| h[..] == self.pkg_id[..]));
            if let Some(i) = found {
                holders.swap_remove(i);
            }
        }
    }
}

/**
 * Register `pkg_id` as a current holder of a permit on `slot`.
 *
 * The returned [`HolderGuard`] removes the entry on drop. Bind it to a
 * named variable (`let _holder = register_holder(...)`); `let _ = ...`
 * drops it immediately. A guard is always returned, even when
 * diagnostics are disabled or the registry mutex is poisoned, so call
 * sites keep one uniform shape. In those cases nothing is registered and
 * the guard's drop is a no-op.
 *
 * `pkg_id` is taken as `impl AsRef<str>` (`&str`, `String`, `Arc<str>`,
 * ...). Its text is always copied into a fresh [`Arc<str>`], including
 * on the disabled path. Passing an existing `Arc<str>` therefore does
 * not share that allocation.
 */
pub fn register_holder(slot: Slot, pkg_id: impl AsRef<str>) -> HolderGuard {
    let pkg_id: Arc<str> = Arc::from(pkg_id.as_ref());
    let mut registered = false;
    // Only touch the shared registry when a recorder is installed; a
    // poisoned mutex leaves `registered == false` so Drop skips removal.
    if rec().is_some()
        && let Ok(mut g) = holders_for(slot).lock()
    {
        g.push(Arc::clone(&pkg_id));
        registered = true;
    }
    HolderGuard {
        slot,
        pkg_id,
        registered,
    }
}

/**
 * Emit a `starvation` event attributing `waiter`'s wait on `slot` to
 * whoever currently holds that slot.
 *
 * Does nothing when no recorder is installed or when `wait` is shorter
 * than 50 ms (treated as noise). Otherwise it:
 *
 * - snapshots the slot's registered holders (see [`register_holder`]);
 * - joins their ids with `,`, or writes `<none>` for an empty list;
 * - caps that text at 200 bytes on a UTF-8 boundary, appending `…`
 *   when cut;
 * - records `{"waiter":…,"holders":…}` under the slot's wire name,
 *   with `wait` as the duration.
 */
pub fn attribute_wait(slot: Slot, waiter: &str, wait: Duration) {
    if rec().is_none() || wait.as_millis() < 50 {
        return;
    }

    // Only `Arc` handles are cloned while the lock is held; the guard is
    // a statement temporary, so it is released before any string work.
    let snapshot: Vec<Arc<str>> = holders_for(slot)
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .clone();

    let holders = if snapshot.is_empty() {
        String::from("<none>")
    } else {
        const HOLDERS_CAP: usize = 200;
        let mut joined = snapshot.join(",");
        if joined.len() > HOLDERS_CAP {
            // Back off to a char boundary so a multi-byte id straddling
            // the cap (CJK, emoji) cannot make `truncate` panic.
            let cut = (0..=HOLDERS_CAP)
                .rev()
                .find(|&i| joined.is_char_boundary(i))
                .unwrap_or(0);
            joined.truncate(cut);
            joined.push('…');
        }
        joined
    };

    let meta = format!(
        r#"{{"waiter":{},"holders":{}}}"#,
        jstr(waiter),
        jstr(&holders)
    );
    event(Category::Starvation, slot.wire_name(), wait, Some(&meta));
}

/// Registry of bounded mpsc channels whose fill level is sampled for diagnostics.
///
/// Channels are added with [`register_channel`]. [`sample_channels`], which
/// [`spawn_concurrency_sampler`] calls on every 4th 50 ms tick (about every
/// 200 ms), reads each sender's remaining capacity and emits a `channel` event.
static CHANNELS: OnceLock<Mutex<Vec<ChannelTracker>>> = OnceLock::new();

/// One registered channel.
struct ChannelTracker {
    /// Event name used for this channel's `channel` samples.
    name: &'static str,
    /// Buffer bound supplied by the caller; used as the fill denominator.
    capacity: usize,
    /// Remaining sender capacity, or `None` once every strong sender has
    /// been dropped and the channel can no longer be filled.
    sender_capacity_fn: Box<dyn Fn() -> Option<usize> + Send + Sync>,
}

/// Register a bounded tokio channel for periodic fill-level sampling.
///
/// No-op when no recorder is installed. Only a weak sender is retained, so
/// the registry never keeps the channel open. `capacity` should be the
/// bound the channel was created with; `used = capacity - remaining`.
pub fn register_channel<T: Send + Sync + 'static>(
    name: &'static str,
    sender: &tokio::sync::mpsc::Sender<T>,
    capacity: usize,
) {
    if rec().is_none() {
        return;
    }
    let weak = sender.downgrade();
    let tracker = ChannelTracker {
        name,
        capacity,
        // `None` when all senders are gone; previously this collapsed to
        // `0` remaining, which reported a finished channel as 100% full.
        sender_capacity_fn: Box::new(move || weak.upgrade().map(|s| s.capacity())),
    };
    let lock = CHANNELS.get_or_init(|| Mutex::new(Vec::new()));
    lock.lock().unwrap_or_else(|e| e.into_inner()).push(tracker);
}

/// Emit one `channel` instant per live registered channel and forget the
/// channels whose senders have all been dropped.
///
/// A channel with no remaining senders cannot fill further; keeping it
/// would emit a bogus sample on every tick for the rest of the run.
pub fn sample_channels() {
    let Some(lock) = CHANNELS.get() else { return };
    let mut trackers = lock.lock().unwrap_or_else(|e| e.into_inner());
    trackers.retain(|t| {
        let Some(remaining) = (t.sender_capacity_fn)() else {
            // Producer side finished: drop the tracker.
            return false;
        };
        let used = t.capacity.saturating_sub(remaining);
        let fill = (used as f64 / t.capacity.max(1) as f64) * 100.0;
        instant(
            Category::Channel,
            t.name,
            Some(&format!(
                r#"{{"used":{},"cap":{},"fill_pct":{:.1}}}"#,
                used, t.capacity, fill
            )),
        );
        true
    });
}

/// Every [`Slot`], in the order the samplers report them.
///
/// Looping over this array lets per-slot work (such as the concurrency
/// sample) cover every counter without naming each variant at the call
/// site. The length is pinned to `SLOT_COUNT` by the type, so changing
/// `SLOT_COUNT` without updating this list is a compile error.
const ALL_SLOTS: [Slot; SLOT_COUNT] = [
    Slot::Pack,
    Slot::Tar,
    Slot::Imp,
    Slot::Link,
    Slot::Decode,
];

/// Emit one `sample`/`concurrency` instant whose metadata maps each slot's
/// `sample_key()` to its current in-flight count.
///
/// No-op when no recorder is installed. Counters are read with `Relaxed`
/// loads, so the snapshot is not atomic across slots.
pub fn sample_concurrency() {
    use std::fmt::Write;
    let Some(r) = rec() else { return };
    let mut meta = String::with_capacity(80);
    meta.push('{');
    let mut sep = "";
    for slot in ALL_SLOTS {
        let in_flight = slot_counter(r, slot).load(Ordering::Relaxed);
        let _ = write!(meta, r#"{sep}"{}":{}"#, slot.sample_key(), in_flight);
        sep = ",";
    }
    meta.push('}');
    instant(Category::Sample, "concurrency", Some(&meta));
}

/// Spawn a detached tokio task that calls [`sample_concurrency`] every
/// 50 ms and [`sample_channels`] on every 4th tick (about every 200 ms).
///
/// Does nothing when diagnostics are disabled. The task loops forever;
/// missed ticks are skipped rather than replayed in a burst. Because it
/// uses `tokio::spawn`, it must be called from inside a Tokio runtime.
pub fn spawn_concurrency_sampler() {
    if enabled() {
        tokio::spawn(async {
            let mut ticker = tokio::time::interval(Duration::from_millis(50));
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            let mut ticks: u32 = 0;
            loop {
                ticker.tick().await;
                sample_concurrency();
                ticks = ticks.wrapping_add(1);
                if ticks.is_multiple_of(4) {
                    sample_channels();
                }
            }
        });
    }
}

/// Flush the JSONL output (if any) and print every enabled end-of-run
/// report to stderr.
///
/// Each report runs only when its recorder switch is set:
///
/// - `summary` prints the aggregate table;
/// - `track_events` runs the critical-path, starvation, what-if and
///   per-package analyzers on a snapshot of the recorded events;
/// - `print_stderr` prints a one-line event-rate footer.
///
/// No-op when no recorder is installed. File flush errors are ignored.
pub fn flush() {
    let Some(r) = rec() else { return };

    if let Some(file) = &r.file {
        let mut writer = file.lock().unwrap_or_else(|e| e.into_inner());
        let _ = writer.flush();
    }
    let count = r.event_count.load(Ordering::Relaxed);
    let elapsed_ms = r.start.elapsed().as_secs_f64() * 1000.0;

    if r.summary {
        print_summary(r, elapsed_ms);
    }

    if r.track_events {
        // Copy the events out so the lock is not held while printing.
        let snapshot = r.events.lock().unwrap_or_else(|e| e.into_inner()).clone();
        print_critical_path(&snapshot, elapsed_ms);
        print_starvation(&snapshot, elapsed_ms);
        print_what_if(&snapshot, elapsed_ms);
        print_pkg_lifecycle(&snapshot, elapsed_ms);
    }

    if r.print_stderr {
        let per_sec = (count as f64 / elapsed_ms.max(1.0)) * 1000.0;
        eprintln!(
            "[diag] flushed {} events over {:.1}ms ({:.0}/s)",
            count, elapsed_ms, per_sec
        );
    }
}

/**
 * Key an event to a package for the per-package lifecycle analyzer.
 *
 * Uses only the `name` field of the event's inline JSON metadata and
 * deliberately ignores `version`. Some events (e.g. the resolver's
 * `task_wait_packument`) carry only `name`, while others (e.g.
 * `tarball`) carry both. Keying on `name@version` would split one
 * package's timeline across two keys. Keying on `name` keeps it whole,
 * at the price of merging several resolved versions of one name into a
 * single row.
 *
 * Returns `None` when the metadata has no string `name` field (for
 * example starvation events, which carry `waiter`/`holders` instead).
 */
fn extract_pkg_id(meta: &str) -> Option<String> {
    const PKG_KEY: &str = "name";
    extract_field(meta, PKG_KEY)
}

/**
 * Pull the string value of `field` out of an inline JSON object without a
 * full parser, and unescape it.
 *
 * Finds the first `"<field>":"` and scans forward for the closing quote.
 * A backslash escapes exactly the next byte, so `\"` does not terminate
 * the value, while `\\"` (escaped backslash, then a real quote) does.
 * This matches the escaping [`jstr`] produces. Returns `None` when the
 * key is missing, its value is not a string, or the value is
 * unterminated.
 */
fn extract_field(meta: &str, field: &str) -> Option<String> {
    let key = format!("\"{field}\":\"");
    let value_start = meta.find(&key)? + key.len();
    let value = &meta[value_start..];
    let mut escaped = false;
    for (pos, byte) in value.bytes().enumerate() {
        match byte {
            _ if escaped => escaped = false,
            b'\\' => escaped = true,
            // `"` is ASCII, so `pos` is always a char boundary.
            b'"' => return Some(unescape_json_str(&value[..pos])),
            _ => {}
        }
    }
    None
}

/**
 * Decode the JSON string escapes in `s` (the text between the quotes).
 *
 * Handles every escape in RFC 8259: `\"`, `\\`, `\/`, `\b`, `\f`, `\n`,
 * `\r`, `\t`, and `\uXXXX`. A `\u` escape is decoded only when it is
 * followed by exactly four ASCII hex digits that form a valid `char`.
 * Otherwise the raw `\u` plus whatever characters followed it (up to
 * four) is copied through unchanged. This covers short escapes at end
 * of input, sign-prefixed digits such as `\u+041`, and lone surrogates.
 * Unknown escapes and a trailing lone `\` are also passed through
 * verbatim; the function never panics.
 */
fn unescape_json_str(s: &str) -> String {
    /// Parse exactly four ASCII hex digits; anything else is `None`.
    fn parse_hex4(raw: &str) -> Option<u32> {
        if raw.len() != 4 {
            return None;
        }
        raw.chars()
            .try_fold(0u32, |acc, c| c.to_digit(16).map(|d| acc * 16 + d))
    }

    let mut out = String::with_capacity(s.len());
    let mut chars = s.chars();
    while let Some(c) = chars.next() {
        if c != '\\' {
            out.push(c);
            continue;
        }
        match chars.next() {
            Some('"') => out.push('"'),
            Some('\\') => out.push('\\'),
            Some('/') => out.push('/'),
            Some('b') => out.push('\u{8}'),
            Some('f') => out.push('\u{c}'),
            Some('n') => out.push('\n'),
            Some('r') => out.push('\r'),
            Some('t') => out.push('\t'),
            Some('u') => {
                // Consume at most four characters; keep them verbatim so a
                // malformed escape round-trips unchanged.
                let raw: String = chars.by_ref().take(4).collect();
                match parse_hex4(&raw).and_then(char::from_u32) {
                    Some(decoded) => out.push(decoded),
                    None => {
                        out.push_str("\\u");
                        out.push_str(&raw);
                    }
                }
            }
            Some(other) => {
                out.push('\\');
                out.push(other);
            }
            None => out.push('\\'),
        }
    }
    out
}

/**
 * Report whether `e` is an envelope span, i.e. one that brackets large
 * parts of the install rather than a unit of work.
 *
 * Every `install` and `install_phase` category event qualifies. So do
 * the named pipeline boundaries `phase_resolve`, `phase_fetch_await` and
 * `phase_materialize_await`, in any category. Left in, these would
 * dominate any longest-chain analysis. The critical-path and what-if
 * analyzers both filter with this one predicate so they stay in sync.
 */
fn is_envelope(e: &EventRec) -> bool {
    if matches!(e.cat, Category::Install | Category::InstallPhase) {
        return true;
    }
    matches!(
        e.name,
        "phase_resolve" | "phase_fetch_await" | "phase_materialize_await"
    )
}

/**
 * Find the heaviest set of mutually non-overlapping events in `sorted`,
 * weighting each event by its duration.
 *
 * `sorted` must already be ordered by ascending `end_ms`. This is
 * weighted-interval scheduling in O(n log n):
 *
 * - for each event, binary-search the last earlier event that ends at or
 *   before it starts;
 * - pick the better of "include this event plus the best chain up to
 *   that predecessor" and "the best chain without this event";
 * - ties favour including the event.
 *
 * Returns the chosen indices in ascending order together with their
 * summed duration; an empty input yields `(vec![], 0.0)`.
 */
fn longest_chain(sorted: &[&EventRec]) -> (Vec<usize>, f64) {
    let n = sorted.len();
    if n == 0 {
        return (Vec::new(), 0.0);
    }
    let ends: Vec<f64> = sorted.iter().map(|e| e.end_ms).collect();

    // pred[i] = last j < i with ends[j] <= start of i. `ends` is sorted,
    // so the predicate is true on a prefix of ends[..i].
    let pred: Vec<Option<usize>> = sorted
        .iter()
        .enumerate()
        .map(|(i, e)| {
            ends[..i]
                .partition_point(|&end| end <= e.start_ms)
                .checked_sub(1)
        })
        .collect();

    let mut best = vec![0.0f64; n];
    let mut take_it = vec![false; n];
    for (i, e) in sorted.iter().enumerate() {
        let with_i = (e.end_ms - e.start_ms) + pred[i].map_or(0.0, |j| best[j]);
        let without_i = i.checked_sub(1).map_or(0.0, |k| best[k]);
        take_it[i] = with_i >= without_i;
        best[i] = if take_it[i] { with_i } else { without_i };
    }

    // Walk back from the last event, jumping to the predecessor whenever
    // an event was taken.
    let mut chain = Vec::new();
    let mut cursor = Some(n - 1);
    while let Some(i) = cursor {
        if take_it[i] {
            chain.push(i);
            cursor = pred[i];
        } else {
            cursor = i.checked_sub(1);
        }
    }
    chain.reverse();
    (chain, best[n - 1])
}

/**
 * Print the critical path (the longest chain of non-overlapping leaf
 * spans, see [`longest_chain`]) and the ten longest spans that are not on
 * it. All output goes to stderr.
 *
 * Envelope spans ([`is_envelope`]) are excluded first. Chain rows are
 * numbered and capped at 40 non-trivial entries. Each run of consecutive
 * sub-1 ms spans is folded into one "(collapsed …)" line, so the first
 * heavy span appears near the top.
 */
fn print_critical_path(events: &[EventRec], total_ms: f64) {
    if events.is_empty() {
        return;
    }
    let mut sorted: Vec<&EventRec> = events.iter().filter(|e| !is_envelope(e)).collect();
    if sorted.is_empty() {
        return;
    }
    // `longest_chain` requires ascending `end_ms`.
    sorted.sort_by(|a, b| {
        a.end_ms
            .partial_cmp(&b.end_ms)
            .unwrap_or(CmpOrdering::Equal)
    });
    let (chain, critical_total) = longest_chain(&sorted);

    eprintln!();
    eprintln!(
        "critical path {:.1}ms ({:.0}% of {:.1}ms wall, {} spans)",
        critical_total,
        (critical_total / total_ms.max(1.0)) * 100.0,
        total_ms,
        chain.len()
    );
    // `start` and `dur` are 10 wide to line up with the `{:>8.0}ms` /
    // `{:>8.1}ms` cells below (8 digits + "ms").
    eprintln!(
        "{:>4} {:>10} {:>10} {:<14} {:<28} pkg",
        "#", "start", "dur", "cat", "name"
    );
    // Collapse trivial spans (< 1 ms) into a single summary line so the
    // first useful entry of the chain (typically a multi-second packument
    // or tarball wait) appears at or near the top of the rendered list.
    let trivial_threshold = 1.0;
    let mut printed = 0usize;
    let mut trivial_run = 0usize;
    let mut trivial_run_dur = 0.0f64;
    let mut chain_iter = chain.iter().peekable();
    while let Some(&idx) = chain_iter.next() {
        let e = &sorted[idx];
        let dur = e.end_ms - e.start_ms;
        if dur < trivial_threshold {
            trivial_run += 1;
            trivial_run_dur += dur;
            // Flush the run when we hit a non-trivial span or end of chain.
            let next_trivial = chain_iter
                .peek()
                .map(|&&i| (sorted[i].end_ms - sorted[i].start_ms) < trivial_threshold)
                .unwrap_or(false);
            if !next_trivial {
                eprintln!(
                    "  (collapsed {} sub-1ms spans, {:.1}ms total)",
                    trivial_run, trivial_run_dur
                );
                trivial_run = 0;
                trivial_run_dur = 0.0;
            }
            continue;
        }
        if printed >= 40 {
            break;
        }
        printed += 1;
        let pkg = e.pkg_id.as_deref().unwrap_or("");
        eprintln!(
            "{:>4} {:>8.0}ms {:>8.1}ms {:<14} {:<28} {}",
            printed,
            e.start_ms,
            dur,
            truncate(e.cat.wire(), 14),
            truncate(e.name, 28),
            truncate(pkg, 50)
        );
    }

    // Slack analysis: top 10 fattest spans NOT on critical path.
    let on_path: std::collections::HashSet<usize> = chain.iter().copied().collect();
    let mut off_path: Vec<(usize, f64)> = (0..sorted.len())
        .filter(|i| !on_path.contains(i))
        .map(|i| (i, sorted[i].end_ms - sorted[i].start_ms))
        .collect();
    off_path.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(CmpOrdering::Equal));

    eprintln!();
    eprintln!("top off-critical (already-overlapped, recoverable=0):");
    for (idx, dur) in off_path.iter().take(10) {
        let e = &sorted[*idx];
        let pkg = e.pkg_id.as_deref().unwrap_or("");
        eprintln!(
            "  {:>8.1}ms {:<14} {:<28} {}",
            dur,
            truncate(e.cat.wire(), 14),
            truncate(e.name, 28),
            truncate(pkg, 50)
        );
    }
}

/**
 * Print a per-semaphore starvation table to stderr.
 *
 * Groups `starvation` events by name (the slot's wire name), sorted
 * alphabetically. For each group it shows the event count, the summed
 * and maximum wait, and the three package ids most often listed in the
 * events' `holders` metadata. Blamers are ordered by count, with ties
 * broken by name so the output is deterministic.
 */
fn print_starvation(events: &[EventRec], _total_ms: f64) {
    use std::collections::HashMap;
    let starv: Vec<&EventRec> = events
        .iter()
        .filter(|e| e.cat == Category::Starvation)
        .collect();
    if starv.is_empty() {
        return;
    }
    // Group by sem name. For each, top blamers (holder pkg names appearing in meta).
    let mut by_sem: HashMap<&str, Vec<&EventRec>> = HashMap::new();
    for e in &starv {
        by_sem.entry(e.name).or_default().push(e);
    }
    eprintln!();
    eprintln!("starvation events ({} total):", starv.len());
    eprintln!(
        "{:<16} {:>6} {:>9} {:>9} top_blamers",
        "sem", "n", "sum_ms", "max_ms"
    );
    let mut keys: Vec<&&str> = by_sem.keys().collect();
    keys.sort();
    for k in keys {
        let evs = &by_sem[k];
        let sum: f64 = evs.iter().map(|e| e.end_ms - e.start_ms).sum();
        let max: f64 = evs
            .iter()
            .map(|e| e.end_ms - e.start_ms)
            .fold(0.0_f64, f64::max);
        // Tally holder names from each starvation event's meta.holders (comma-separated).
        let mut blame_count: HashMap<String, u32> = HashMap::new();
        for e in evs {
            let Some(m) = &e.meta else { continue };
            let Some(holders_field) = extract_field(m, "holders") else {
                continue;
            };
            for h in holders_field.split(',') {
                let h = h.trim();
                // `<none>` is the empty-snapshot placeholder. A piece ending
                // in `…` is the (possibly partial) tail left when
                // `attribute_wait` cut the list at its byte cap, so it is
                // not a trustworthy package id.
                if h.is_empty() || h == "<none>" || h.ends_with('…') {
                    continue;
                }
                *blame_count.entry(h.to_string()).or_insert(0) += 1;
            }
        }
        let mut blamers: Vec<(String, u32)> = blame_count.into_iter().collect();
        // Highest count first; HashMap order is random, so break ties by
        // name to keep the report stable across runs.
        blamers.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        let top: String = blamers
            .iter()
            .take(3)
            .map(|(n, c)| format!("{}({})", truncate(n, 30), c))
            .collect::<Vec<_>>()
            .join(" ");
        eprintln!(
            "{:<16} {:>6} {:>7.0}ms {:>7.0}ms {}",
            k,
            evs.len(),
            sum,
            max,
            top
        );
    }
}

/// What-if speedup table, printed to stderr.
///
/// For the (category, name) buckets that contribute most to the critical path,
/// show how much wall time would be saved if every span in that bucket were
/// 25%, 50% or 100% shorter. Only the bucket's time *on* the critical path (as
/// computed by [`longest_chain`] over non-envelope spans) is credited. Time off
/// the path is shown as `off_path` but treated as already overlapped
/// (recoverable = 0). This is a first-order estimate: the critical path is not
/// recomputed after shrinking a bucket, so a different chain could become
/// critical before the full saving is realised. Rows are ordered by on-path
/// time, ties by category then name; the top 15 are printed.
fn print_what_if(events: &[EventRec], total_ms: f64) {
    use std::collections::HashMap;
    let leaf_events: Vec<&EventRec> = events.iter().filter(|e| !is_envelope(e)).collect();
    if leaf_events.is_empty() {
        return;
    }

    // `longest_chain` requires ascending `end_ms`.
    let mut sorted = leaf_events.clone();
    sorted.sort_by(|a, b| {
        a.end_ms
            .partial_cmp(&b.end_ms)
            .unwrap_or(CmpOrdering::Equal)
    });
    let (on_path, critical_total) = longest_chain(&sorted);

    // On-critical-path duration per (category, name) bucket. `AggKey` is
    // the `(Category, name)` tuple; the `&str` half hashes and compares by
    // content, like any string key.
    let mut bucket_critical: HashMap<AggKey, f64> = HashMap::new();
    for &idx in &on_path {
        let e = sorted[idx];
        let dur = e.end_ms - e.start_ms;
        *bucket_critical.entry((e.cat, e.name)).or_insert(0.0) += dur;
    }

    // Total duration per bucket across *all* leaf events (on + off path);
    // the off-path share is derived below as `total - on`.
    let mut bucket_total: HashMap<AggKey, f64> = HashMap::new();
    for e in &leaf_events {
        *bucket_total.entry((e.cat, e.name)).or_insert(0.0) += e.end_ms - e.start_ms;
    }

    let mut rows: Vec<(AggKey, f64, f64)> = bucket_critical
        .into_iter()
        .map(|(k, on)| {
            let total_b = bucket_total.get(&k).copied().unwrap_or(0.0);
            (k, on, total_b)
        })
        .collect();
    // Largest on-path contribution first. HashMap order is random, so
    // break ties by category and name to keep the table deterministic.
    rows.sort_by(|((cat_a, name_a), on_a, _), ((cat_b, name_b), on_b, _)| {
        on_b.partial_cmp(on_a)
            .unwrap_or(CmpOrdering::Equal)
            .then_with(|| cat_a.wire().cmp(cat_b.wire()))
            .then_with(|| name_a.cmp(name_b))
    });

    eprintln!();
    eprintln!(
        "what-if speedup simulator (critical path total {:.0}ms / wall {:.0}ms)",
        critical_total, total_ms
    );
    eprintln!(
        "{:<14} {:<32} {:>10} {:>10} {:>10} {:>10} {:>10} {:>9}",
        "cat", "name", "on_path", "off_path", "−25%", "−50%", "−100%", "%recoverable"
    );
    for ((cat, name), on, total_b) in rows.iter().take(15) {
        let off = total_b - on;
        let s25 = on * 0.25;
        let s50 = on * 0.50;
        let s100 = on * 1.00;
        let pct_rec = (on / total_ms.max(1.0)) * 100.0;
        eprintln!(
            "{:<14} {:<32} {:>8.0}ms {:>8.0}ms {:>+8.0}ms {:>+8.0}ms {:>+8.0}ms {:>8.1}%",
            truncate(cat.wire(), 14),
            truncate(name, 32),
            on,
            off,
            -s25,
            -s50,
            -s100,
            pct_rec
        );
    }
}

/**
 * Print a per-package timeline table to stderr.
 *
 * Events are grouped by `pkg_id`; events without one are ignored. Each
 * row shows:
 *
 * - the package's first start and last end;
 * - the wall span between them;
 * - its event count;
 * - that span as a share of total wall time.
 *
 * The 20 packages with the widest span are listed, widest first; equal
 * spans keep alphabetical order.
 */
fn print_pkg_lifecycle(events: &[EventRec], total_ms: f64) {
    use std::collections::BTreeMap;
    // Group by package id, borrowing the id from each event instead of
    // cloning a `String` per event.
    let mut by_pkg: BTreeMap<&str, Vec<&EventRec>> = BTreeMap::new();
    for e in events {
        if let Some(pkg) = e.pkg_id.as_deref() {
            by_pkg.entry(pkg).or_default().push(e);
        }
    }
    if by_pkg.is_empty() {
        return;
    }
    // (pkg, first start, last end, event count)
    let mut scored: Vec<(&str, f64, f64, usize)> = by_pkg
        .iter()
        .map(|(&pkg, evs)| {
            let first = evs.iter().map(|e| e.start_ms).fold(f64::INFINITY, f64::min);
            let last = evs.iter().map(|e| e.end_ms).fold(0.0_f64, f64::max);
            (pkg, first, last, evs.len())
        })
        .collect();
    // Widest span first; `sort_by` is stable, so ties stay alphabetical
    // (BTreeMap iteration order).
    scored.sort_by(|a, b| {
        (b.2 - b.1)
            .partial_cmp(&(a.2 - a.1))
            .unwrap_or(CmpOrdering::Equal)
    });

    eprintln!();
    eprintln!(
        "per-package lifecycle (top 20 by wall span, {} pkgs total)",
        scored.len()
    );
    // Widths mirror the row format below, including the `%wall` column.
    eprintln!(
        "{:<48} {:>9} {:>9} {:>8} {:>5} {:>6}",
        "pkg", "first", "last", "span", "evts", "%wall"
    );
    for (pkg, first, last, n) in scored.iter().take(20) {
        let span = last - first;
        let pct = (span / total_ms.max(1.0)) * 100.0;
        eprintln!(
            "{:<48} {:>7.0}ms {:>7.0}ms {:>6.0}ms {:>5} {:>5.1}%",
            truncate(pkg, 48),
            first,
            last,
            span,
            n,
            pct
        );
    }
}

/**
 * Shorten `s` to fit a fixed-width table column of `n` characters.
 *
 * Returns `s` unchanged when it has at most `n` characters. Otherwise it
 * keeps the first `n - 1` characters and appends `…`, so the result is
 * exactly `n` characters wide. That matches the `{:<n}` / `{:>n}`
 * padding used by the tables in this module, which also counts
 * characters.
 *
 * Cuts always land on `char` boundaries, so multi-byte UTF-8 input
 * cannot panic. When truncation is needed but `n <= 1`, there is no
 * room for text plus the ellipsis and the empty string is returned.
 *
 * Width is counted in Unicode scalar values, not terminal cells. Wide
 * (e.g. CJK) characters may still render wider than `n` columns.
 */
pub fn truncate(s: &str, n: usize) -> String {
    // Fast path: the byte length bounds the char count.
    if s.len() <= n {
        return s.to_string();
    }
    // `nth(n)` exists only when `s` has more than `n` characters.
    if s.chars().nth(n).is_none() {
        return s.to_string();
    }
    let keep = n.saturating_sub(1);
    if keep == 0 {
        return String::new();
    }
    // Byte offset of the first dropped char; present because `s` has
    // more than `n > keep` characters.
    let end = s.char_indices().nth(keep).map_or(s.len(), |(i, _)| i);
    format!("{}…", &s[..end])
}

fn print_summary(r: &Recorder, total_ms: f64) {
    let agg = r.aggregates.lock().unwrap_or_else(|e| e.into_inner());
    let mut rows: Vec<(AggKey, AggVal)> = agg.iter().map(|(k, v)| (*k, *v)).collect();
    drop(agg);
    rows.sort_by_key(|b| std::cmp::Reverse(b.1.sum_ns));

    eprintln!("diag total {:.1}ms", total_ms);
    eprintln!(
        "{:<10} {:<32} {:>6} {:>9} {:>9} {:>9} {:>7}",
        "cat", "name", "n", "sum_ms", "mean_ms", "max_ms", "%wall"
    );
    for ((cat, name), stats) in rows.iter().take(40) {
        let sum_ms = (stats.sum_ns as f64) / 1_000_000.0;
        let mean_ms = sum_ms / (stats.count as f64);
        let max_ms = (stats.max_ns as f64) / 1_000_000.0;
        let pct = (sum_ms / total_ms.max(1.0)) * 100.0;
        eprintln!(
            "{:<10} {:<32} {:>6} {:>9.1} {:>9.2} {:>9.1} {:>6.1}%",
            cat.wire(),
            name,
            stats.count,
            sum_ms,
            mean_ms,
            max_ms,
            pct
        );
    }
}

/// Run `f`, record how long it took as a `category`/`name` event, and return
/// `f`'s result.
///
/// Only the value is returned, not a `(value, duration)` pair; the duration
/// goes to the recorder via [`event`] with no metadata. When [`enabled`] is
/// false, `f` runs without being timed.
#[inline]
pub fn time_sync<T>(category: Category, name: &'static str, f: impl FnOnce() -> T) -> T {
    if !enabled() {
        return f();
    }
    let start = Instant::now();
    let v = f();
    event(category, name, start.elapsed(), None);
    v
}

/**
 * Render `s` as a quoted JSON string literal.
 *
 * - `"` and `\` are backslash-escaped.
 * - Newline, carriage return and tab use the short forms `\n`, `\r`, `\t`.
 * - Every other code point below U+0020 is written as a lowercase
 *   `\u00XX` sequence.
 * - All remaining characters, including non-ASCII, are copied through
 *   unchanged.
 *
 * The returned value includes the surrounding double quotes.
 */
pub fn jstr(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len() + 2);
    escaped.push('"');
    for ch in s.chars() {
        let short = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        if let Some(seq) = short {
            escaped.push_str(seq);
        } else if u32::from(ch) < 0x20 {
            escaped.push_str(&format!("\\u{:04x}", u32::from(ch)));
        } else {
            escaped.push(ch);
        }
    }
    escaped.push('"');
    escaped
}

/**
 * Construct a [`Span`] with optional `key = value` metadata pairs.
 *
 * With metadata, the JSON object is built inside a closure handed to
 * `Span::with_meta_fn`. This is the deferred variant the module docs
 * recommend over `Span::with_meta`, so the `format!` work is skipped when
 * [`enabled`] returns `false`. Keys are emitted verbatim (they are
 * identifiers). Each value is stringified with `to_string()` and escaped
 * via [`jstr`].
 *
 * # Examples
 *
 * ```ignore
 * let _diag = diag_span!("registry", "fetch_packument", name = pkg_name);
 * let _diag = diag_span!("fetch", "tarball", name = name, version = ver);
 * ```
 */
#[macro_export]
macro_rules! diag_span {
    ($cat:expr, $name:expr) => {
        $crate::diag::Span::new($cat, $name)
    };
    ($cat:expr, $name:expr, $($k:ident = $v:expr),+ $(,)?) => {{
        // Use the closure-taking variant so metadata formatting stays lazy.
        $crate::diag::Span::new($cat, $name).with_meta_fn(|| {
            format!(
                "{{{}}}",
                [$(format!("\"{}\":{}", stringify!($k), $crate::diag::jstr(&$v.to_string()))),+].join(",")
            )
        })
    }};
}

/**
 * Record a zero-duration marker, optionally tagged with `key = value`
 * metadata.
 *
 * Without metadata this forwards straight to `instant` with no meta
 * object. With metadata, the JSON object is assembled inside a closure
 * passed to `instant_lazy`, so the formatting only happens when
 * [`enabled`] returns `true`. Keys are emitted verbatim (they are
 * identifiers). Each value is stringified with `to_string()` and escaped
 * via [`jstr`].
 *
 * # Examples
 *
 * ```ignore
 * diag_instant!(Category::Resolver, "marker");
 * diag_instant!(Category::Resolver, "marker", count = n, pkg = name);
 * ```
 */
#[macro_export]
macro_rules! diag_instant {
    ($cat:expr, $name:expr) => {
        $crate::diag::instant($cat, $name, None)
    };
    ($cat:expr, $name:expr, $($k:ident = $v:expr),+ $(,)?) => {{
        $crate::diag::instant_lazy($cat, $name, || {
            let fields: Vec<String> = vec![
                $(format!(
                    "\"{}\":{}",
                    stringify!($k),
                    $crate::diag::jstr(&$v.to_string())
                )),+
            ];
            format!("{{{}}}", fields.join(","))
        });
    }};
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `jstr` must round-trip valid JSON for every category of input
    /// the meta builder hands it: ASCII, control chars, embedded
    /// quotes, multi-byte UTF-8.
    #[test]
    fn jstr_escapes_all_categories() {
        assert_eq!(jstr(""), "\"\"");
        assert_eq!(jstr("hi"), "\"hi\"");
        assert_eq!(jstr("a\"b"), "\"a\\\"b\"");
        assert_eq!(jstr("a\\b"), "\"a\\\\b\"");
        assert_eq!(jstr("a\nb\tc\rd"), "\"a\\nb\\tc\\rd\"");
        assert_eq!(jstr("\x01"), "\"\\u0001\"");
        // Both ends of the C0 range need `\u` escapes. DEL (0x7f) is legal
        // unescaped in JSON and passes through.
        assert_eq!(jstr("\x00\x1f"), "\"\\u0000\\u001f\"");
        assert_eq!(jstr("\x7f"), "\"\x7f\"");
        assert_eq!(jstr("café"), "\"café\"");
        assert_eq!(jstr("日本"), "\"日本\"");
    }

    /// `truncate` must never panic on multi-byte boundaries: CJK package
    /// names land mid-codepoint at common widths. Every truncated result
    /// must also be a char-boundary prefix of the input that fits in
    /// `n - 1` bytes, followed by `…`.
    #[test]
    fn truncate_is_utf8_boundary_safe() {
        let s = "日本語パッケージ";
        for n in 1..=s.len() + 2 {
            let out = truncate(s, n);
            if n >= s.len() {
                assert_eq!(out, s, "n={n}");
            } else if n == 1 {
                assert_eq!(out, "", "n={n}");
            } else {
                let body = out
                    .strip_suffix('…')
                    .unwrap_or_else(|| panic!("n={n}: missing ellipsis in {out:?}"));
                assert!(s.starts_with(body), "n={n}: {body:?} is not a prefix");
                assert!(body.len() < n, "n={n}: {body:?} exceeds n - 1 bytes");
            }
        }
        assert_eq!(truncate("hello", 10), "hello");
        assert_eq!(truncate("hello", 5), "hello");
        assert_eq!(truncate("hello", 4), "hel…");
        assert_eq!(truncate("hello", 1), "");
        assert_eq!(truncate("hello", 0), "");
    }

    /// `extract_pkg_id` returns the canonical name (without `@version`)
    /// so the lifecycle analyzer correlates events of one package
    /// across categories that disagree on whether to include version.
    #[test]
    fn extract_pkg_id_returns_name_only() {
        let m = r#"{"name":"lodash","version":"4.17.21"}"#;
        assert_eq!(extract_pkg_id(m).as_deref(), Some("lodash"));
        let m2 = r#"{"name":"lodash"}"#;
        assert_eq!(extract_pkg_id(m2).as_deref(), Some("lodash"));
        let m3 = r#"{"version":"1.0.0"}"#;
        assert!(extract_pkg_id(m3).is_none());
    }

    /// Exercise the longest-chain DP against a brute-force oracle on
    /// small inputs. Catches off-by-one in the `p[i]` binary search,
    /// the take/skip tie-break, and reconstruction edge cases.
    #[test]
    fn longest_chain_matches_brute_force() {
        fn brute(events: &[(f64, f64)]) -> f64 {
            let n = events.len();
            let mut best = 0.0_f64;
            for mask in 0u32..(1u32 << n) {
                let mut picked: Vec<(f64, f64)> = (0..n)
                    .filter(|i| mask & (1 << i) != 0)
                    .map(|i| events[i])
                    .collect();
                picked.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
                let ok = picked.windows(2).all(|w| w[0].1 <= w[1].0);
                if !ok {
                    continue;
                }
                let total: f64 = picked.iter().map(|(s, e)| e - s).sum();
                if total > best {
                    best = total;
                }
            }
            best
        }
        let cases: &[&[(f64, f64)]] = &[
            &[],
            &[(0.0, 10.0)],
            &[(0.0, 10.0), (5.0, 15.0)],
            &[(0.0, 10.0), (10.0, 20.0)],
            &[(0.0, 5.0), (3.0, 8.0), (7.0, 12.0)],
            &[(0.0, 100.0), (10.0, 20.0), (30.0, 40.0)],
        ];
        for case in cases {
            let evs: Vec<EventRec> = case
                .iter()
                .map(|(s, e)| EventRec {
                    cat: Category::Resolver,
                    name: "y",
                    start_ms: *s,
                    end_ms: *e,
                    pkg_id: None,
                    meta: None,
                })
                .collect();
            let mut sorted: Vec<&EventRec> = evs.iter().collect();
            sorted.sort_by(|a, b| {
                a.end_ms
                    .partial_cmp(&b.end_ms)
                    .unwrap_or(CmpOrdering::Equal)
            });
            let (_, total) = longest_chain(&sorted);
            let bf = brute(case);
            assert!(
                (total - bf).abs() < 1e-6,
                "case {case:?}: dp={total} bf={bf}"
            );
        }
    }

    /// `Slot::wire_name` and `sample_key` must agree with the variant
    /// order used internally by `slot_counter` / array indexing.
    #[test]
    fn slot_wire_names_are_distinct_and_stable() {
        let names: Vec<&'static str> = ALL_SLOTS.iter().map(|s| s.wire_name()).collect();
        assert_eq!(
            names,
            vec![
                "packument_sem",
                "tarball_sem",
                "import_sem",
                "link_sem",
                "decode_sem"
            ]
        );
        let keys: Vec<&'static str> = ALL_SLOTS.iter().map(|s| s.sample_key()).collect();
        assert_eq!(keys, vec!["pack", "tar", "imp", "link", "decode"]);
    }

    /// `extract_field` must skip past `\"` produced by [`jstr`]; the
    /// previous naive parser stopped at the first `"` and silently
    /// truncated the value.
    #[test]
    fn extract_field_handles_escaped_quotes() {
        let meta = r#"{"name":"foo\"bar","version":"1.0"}"#;
        assert_eq!(extract_field(meta, "name").as_deref(), Some(r#"foo"bar"#));
        assert_eq!(extract_field(meta, "version").as_deref(), Some("1.0"));
        // Round-trip through jstr/extract preserves an arbitrary value.
        let original = "weird \"name\" with \\ slash and \n newline";
        let wire = format!("{{\"name\":{}}}", jstr(original));
        assert_eq!(extract_field(&wire, "name").as_deref(), Some(original));
    }

    /// `validate_diag_path` rejects Windows-style UNC on every platform
    /// but accepts POSIX double-slash prefixes on non-Windows targets.
    #[test]
    fn validate_diag_path_rejects_unc_only() {
        use std::path::Path;
        assert!(validate_diag_path(Path::new(r"\\srv\share\f.jsonl")).is_err());
        assert!(validate_diag_path(Path::new("./local.jsonl")).is_ok());
        // POSIX `//foo` should be accepted on Unix; on Windows it is
        // rejected as the MSYS/Git-Bash UNC wire form.
        let r = validate_diag_path(Path::new("//tmp/foo.jsonl"));
        if cfg!(windows) {
            assert!(r.is_err());
        } else {
            assert!(r.is_ok());
        }
    }
}