ktstr/test_support/
probe_metrics.rs

1//! Flat-metric lookup helpers for the jemalloc-probe integration
2//! tests.
3//!
4//! Lives in the crate (not inline in `tests/jemalloc_probe_tests.rs`)
5//! so the logic is reachable from `#[cfg(test)]` unit tests. The
6//! probe test binary registers `#[ktstr_test]` entries, which
7//! activates the early-dispatch ctor's `--list` intercept — any
8//! plain `#[test]` fn declared in that binary is hidden from
9//! nextest's discovery (see the long comment at the head of
10//! `tests/jemalloc_alloc_worker_exit_codes.rs`). Moving the
11//! helpers here lets the ExceedsCap branch and friends be
12//! pinned by lib-crate unit tests that run under `cargo nextest
13//! run --lib` without the ctor path.
14//!
15//! All items are `pub` so the integration test file at
16//! `tests/jemalloc_probe_tests.rs` can import them through the
17//! `test_support` re-export surface.
18
19use super::payload::{Metric, PayloadMetrics};
20
/// Outcome of scanning the flat metric list for a tid-keyed thread
/// entry. Keeps four cases apart — "tid found with its counter",
/// "tid found but `allocated_bytes` missing", "tid not present", and
/// "scan consumed [`MAX_SCAN_INDEX`] contiguous entries without
/// seeing the tid" — so a caller can issue a precise diagnostic
/// instead of a blanket "not found".
///
/// Derives `Debug` and `PartialEq` so call sites can compare
/// outcomes with `assert_eq!` and print the observed variant in a
/// failure message. The only payload fields are `u64` /
/// `Option<u64>`, so `Clone`, `Copy` and `Eq` are derived as well.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadLookup {
    /// `snapshots.{snap_idx}.threads.N.Ok.tid == worker_tid` and
    /// `snapshots.{snap_idx}.threads.N.Ok.allocated_bytes` are both
    /// present. Carries the observed counter plus the companion
    /// `deallocated_bytes` (`None` when that sibling was not emitted).
    Found {
        /// Value of the `…Ok.allocated_bytes` sibling.
        allocated_bytes: u64,
        /// Value of the `…Ok.deallocated_bytes` sibling, if present.
        deallocated_bytes: Option<u64>,
    },
    /// Probe emitted a `snapshots.{snap_idx}.threads.N.{Ok,Err}.tid`
    /// matching `worker_tid`, but no
    /// `snapshots.{snap_idx}.threads.N.Ok.allocated_bytes` sibling.
    /// Either the tid was on the `Err` arm (`.Err.tid`), where the
    /// probe hit an error on that thread and `error`/`error_kind`
    /// replace the counter fields, or it was on the `Ok` arm but the
    /// `allocated_bytes` sibling was absent.
    MissingAllocatedBytes,
    /// No `snapshots.{snap_idx}.threads.N.{Ok,Err}.tid == worker_tid`
    /// entry in the flat metric list: the scan reached the array
    /// terminator (first index with neither arm) without a match.
    TidAbsent,
    /// The flat metric list contained at least [`MAX_SCAN_INDEX`]
    /// contiguous `snapshots.{snap_idx}.threads.N.{Ok,Err}.tid`
    /// entries, none of which matched `worker_tid`, so the scan hit
    /// the cap before reaching the array terminator. The worker's tid
    /// may exist at a later index and be invisible to the scan.
    /// Distinct from `TidAbsent` — this outcome means the lookup is
    /// inconclusive, not that the probe definitively skipped the
    /// worker.
    ExceedsCap,
}
59
/// Upper bound (1024) on how many array indices the scanning helpers
/// will probe before giving up: the
/// `snapshots.*.threads.N.{Ok,Err}.tid` walks in [`lookup_thread`],
/// [`snapshot_worker_allocated`] and [`thread_count`], and the
/// `snapshots.N.timestamp_unix_sec` walk in [`snapshot_count`].
/// The bound is sized for probe runs that see a few dozen threads in
/// a single-allocator worker process; reaching it points at either
/// an unexpectedly wide target or a flat-metric schema change that
/// broke the contiguous-index terminator convention.
pub const MAX_SCAN_INDEX: usize = 1 << 10;
67
68/// Find a metric by exact name. Returns `None` if absent.
69pub fn find_metric<'a>(metrics: &'a PayloadMetrics, key: &str) -> Option<&'a Metric> {
70    metrics.metrics.iter().find(|m| m.name == key)
71}
72
73/// Does the flat metric list contain a metric with this exact name?
74/// Thin wrapper around [`find_metric`] for the common existence
75/// check — avoids forcing every call site to spell `.is_some()`.
76pub fn has_metric(metrics: &PayloadMetrics, key: &str) -> bool {
77    find_metric(metrics, key).is_some()
78}
79
80/// Fetch a metric by exact name and return its numeric value as a
81/// `u64`. Returns `None` if the metric is absent. Thin wrapper
82/// around [`find_metric`] + `value as u64` for the common
83/// numeric-lookup shape.
84///
85/// # `f64` → `u64` precision
86///
87/// JSON numbers parse into the probe's flat-metric list as `f64`
88/// (serde_json's number type). Integer values round-trip through
89/// `f64` without precision loss only up to `2^53`
90/// (`9_007_199_254_740_992`); above that bound, adjacent `u64`
91/// values collapse onto the same `f64` and `value as u64` loses
92/// the low-order bits. The probe's emitted counters
93/// (`allocated_bytes`, `deallocated_bytes`, tid numbers, snapshot
94/// timestamps in seconds) are in practice far below this
95/// threshold on realistic workloads: a 64-bit byte counter would
96/// require >8 PiB of total-allocated memory, and Linux pids are
97/// capped at `2^22`. The bound is therefore a soft invariant —
98/// consumers should NOT feed arbitrary externally-controlled
99/// values through this helper without a prior range check.
100///
101/// A `debug_assert!` on the same bound catches the invariant
102/// locally so a future metric that genuinely exceeds `2^53` lights
103/// up in a debug build before the truncation silently corrupts a
104/// downstream comparison; release builds trust the soft invariant
105/// and perform the `as u64` cast unconditionally.
106pub fn find_metric_u64(metrics: &PayloadMetrics, key: &str) -> Option<u64> {
107    find_metric(metrics, key).map(|m| {
108        debug_assert!(
109            m.value.is_finite() && m.value >= 0.0 && m.value <= (1u64 << 53) as f64,
110            "metric {:?} value {} outside the f64→u64 lossless range \
111             [0, 2^53]; the `as u64` cast will truncate silently. \
112             Either range-check externally-sourced input before \
113             landing it in the flat metrics list, or consume the \
114             metric via `.value` (f64) instead of this u64 helper.",
115            m.name,
116            m.value,
117        );
118        m.value as u64
119    })
120}
121
122/// Walk `0..cap` applying `key_fn(i)` to form a metric name and
123/// count how many consecutive indices yield a present metric.
124/// Stops at the first miss — the probe's `walk_json_leaves`
125/// flattening yields indices 0..N contiguously, so the first gap is
126/// the array terminator. Returns the count, which may be `cap` if
127/// every index below the bound is present (inconclusive — the
128/// caller should treat `cap` as "saturated scan, real count may be
129/// larger").
130pub fn count_indexed_metrics<F>(metrics: &PayloadMetrics, cap: usize, key_fn: F) -> usize
131where
132    F: Fn(usize) -> String,
133{
134    let mut n = 0;
135    for i in 0..cap {
136        if find_metric(metrics, &key_fn(i)).is_some() {
137            n += 1;
138        } else {
139            break;
140        }
141    }
142    n
143}
144
145/// Extract the `allocated_bytes` / `deallocated_bytes` values for
146/// `worker_tid` from snapshot 0 in the flat metric list produced by
147/// `walk_json_leaves` over the probe's JSON output.
148///
149/// `ThreadResult` is externally-tagged so the probe emits
150/// `{"pid":P,"snapshots":[{"timestamp_unix_sec":T,"threads":[{"Ok":{"tid":T,"allocated_bytes":A,"deallocated_bytes":D,...}}, {"Err":{"tid":T,"error":...,"error_kind":...}}, ...]}, ...]}`
151/// which `walk_json_leaves` flattens per array index into contiguous
152/// keys `snapshots.0.threads.0.Ok.tid`, `snapshots.0.threads.1.Err.tid`,
153/// … with no gaps. Each index carries exactly one variant wrapper.
154/// The scan stops at the first index where neither `.Ok.tid` nor
155/// `.Err.tid` exists (the natural array terminator) and returns
156/// [`ThreadLookup::TidAbsent`]. If the cap is reached without hitting
157/// the terminator AND without matching `worker_tid`, returns
158/// [`ThreadLookup::ExceedsCap`]. If the matching tid is on the `Err`
159/// arm (no `allocated_bytes` sibling), returns
160/// [`ThreadLookup::MissingAllocatedBytes`].
161pub fn lookup_thread(metrics: &PayloadMetrics, worker_tid: i32) -> ThreadLookup {
162    let worker_tid_f64 = worker_tid as f64;
163    for i in 0..MAX_SCAN_INDEX {
164        let ok_tid_key = format!("snapshots.0.threads.{i}.Ok.tid");
165        let err_tid_key = format!("snapshots.0.threads.{i}.Err.tid");
166        let (tid_m, is_ok) = match find_metric(metrics, &ok_tid_key) {
167            Some(m) => (m, true),
168            None => match find_metric(metrics, &err_tid_key) {
169                Some(m) => (m, false),
170                None => return ThreadLookup::TidAbsent,
171            },
172        };
173        if tid_m.value == worker_tid_f64 {
174            if !is_ok {
175                return ThreadLookup::MissingAllocatedBytes;
176            }
177            let alloc_key = format!("snapshots.0.threads.{i}.Ok.allocated_bytes");
178            let dealloc_key = format!("snapshots.0.threads.{i}.Ok.deallocated_bytes");
179            let allocated_bytes = match find_metric(metrics, &alloc_key).map(|m| m.value as u64) {
180                Some(v) => v,
181                None => return ThreadLookup::MissingAllocatedBytes,
182            };
183            let deallocated_bytes = find_metric(metrics, &dealloc_key).map(|m| m.value as u64);
184            return ThreadLookup::Found {
185                allocated_bytes,
186                deallocated_bytes,
187            };
188        }
189    }
190    // Loop ran to completion — every index 0..MAX_SCAN_INDEX had a
191    // tid entry (Ok or Err), and none matched. A contiguous-array
192    // terminator would have early-returned `TidAbsent`, so the cap
193    // was hit with data remaining. Surface the inconclusive outcome
194    // distinctly from genuine absence.
195    ThreadLookup::ExceedsCap
196}
197
198/// Extract `snapshots.{snap_idx}.threads[*].allocated_bytes` for the
199/// thread whose tid matches `worker_tid`. Returns [`ThreadLookup`]
200/// so callers distinguish "tid absent" from "cap hit before tid
201/// seen" from "allocated_bytes sibling missing" — parallel to
202/// [`lookup_thread`], which covers the single-snapshot path.
203pub fn snapshot_worker_allocated(
204    metrics: &PayloadMetrics,
205    snap_idx: usize,
206    worker_tid: i32,
207) -> ThreadLookup {
208    let worker_tid_f64 = worker_tid as f64;
209    for j in 0..MAX_SCAN_INDEX {
210        let ok_tid_key = format!("snapshots.{snap_idx}.threads.{j}.Ok.tid");
211        let err_tid_key = format!("snapshots.{snap_idx}.threads.{j}.Err.tid");
212        let (tid_m, is_ok) = match find_metric(metrics, &ok_tid_key) {
213            Some(m) => (m, true),
214            None => match find_metric(metrics, &err_tid_key) {
215                Some(m) => (m, false),
216                None => return ThreadLookup::TidAbsent,
217            },
218        };
219        if tid_m.value == worker_tid_f64 {
220            if !is_ok {
221                return ThreadLookup::MissingAllocatedBytes;
222            }
223            let alloc_key = format!("snapshots.{snap_idx}.threads.{j}.Ok.allocated_bytes");
224            let dealloc_key = format!("snapshots.{snap_idx}.threads.{j}.Ok.deallocated_bytes");
225            let allocated_bytes = match find_metric(metrics, &alloc_key).map(|m| m.value as u64) {
226                Some(v) => v,
227                None => return ThreadLookup::MissingAllocatedBytes,
228            };
229            let deallocated_bytes = find_metric(metrics, &dealloc_key).map(|m| m.value as u64);
230            return ThreadLookup::Found {
231                allocated_bytes,
232                deallocated_bytes,
233            };
234        }
235    }
236    ThreadLookup::ExceedsCap
237}
238
239/// Count the number of `snapshots.0.threads.N.{Ok,Err}.tid` entries
240/// in the flat metric list, capped at [`MAX_SCAN_INDEX`]. Each index
241/// carries exactly one variant wrapper (`Ok` or `Err`); the count
242/// terminates at the first index where neither exists.
243pub fn thread_count(metrics: &PayloadMetrics) -> usize {
244    let mut n = 0;
245    for i in 0..MAX_SCAN_INDEX {
246        let ok_key = format!("snapshots.0.threads.{i}.Ok.tid");
247        let err_key = format!("snapshots.0.threads.{i}.Err.tid");
248        if find_metric(metrics, &ok_key).is_some() || find_metric(metrics, &err_key).is_some() {
249            n += 1;
250        } else {
251            break;
252        }
253    }
254    n
255}
256
257/// Count the number of `snapshots.N.timestamp_unix_sec` entries in
258/// the flat metric list, capped at [`MAX_SCAN_INDEX`].
259pub fn snapshot_count(metrics: &PayloadMetrics) -> usize {
260    count_indexed_metrics(metrics, MAX_SCAN_INDEX, |i| {
261        format!("snapshots.{i}.timestamp_unix_sec")
262    })
263}
264
265/// Flatten the full `(name, value)` pair list for diagnostic
266/// rendering inside error messages. Returned as an owned
267/// `Vec<(&str, f64)>` so call sites spell the diagnostic as a single
268/// `{:?}` formatter argument instead of re-typing the
269/// `.iter().map(...).collect()` chain at every site.
270///
271/// Intended for "probe returned nothing we expected" error paths —
272/// when a lookup helper ([`lookup_thread`], [`snapshot_worker_allocated`],
273/// [`find_metric_u64`]) returns a miss, dumping the observed flat metric
274/// list into the failure message is usually the fastest triage step.
275pub fn flat_metrics_dump(metrics: &PayloadMetrics) -> Vec<(&str, f64)> {
276    metrics
277        .metrics
278        .iter()
279        .map(|m| (m.name.as_str(), m.value))
280        .collect()
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a [`Metric`] with the given name and value; the remaining
    /// fields get neutral placeholder values because the lookup
    /// helpers under test only read `name` and `value`.
    fn metric(name: &str, value: f64) -> Metric {
        use super::super::payload::{MetricStream, Polarity};
        Metric {
            name: name.to_owned(),
            value,
            polarity: Polarity::Unknown,
            unit: String::new(),
            stream: MetricStream::Stdout,
        }
    }

    /// A payload with no metrics at all.
    fn empty_payload() -> PayloadMetrics {
        PayloadMetrics {
            payload_index: 0,
            metrics: Vec::new(),
            exit_code: 0,
        }
    }

    /// Append an arbitrary `(name, value)` metric.
    fn push_metric(metrics: &mut PayloadMetrics, name: &str, value: f64) {
        metrics.metrics.push(metric(name, value));
    }

    /// Append `snapshots.0.threads.{idx}.Ok.tid`.
    fn push_ok_tid(metrics: &mut PayloadMetrics, idx: usize, tid: f64) {
        metrics
            .metrics
            .push(metric(&format!("snapshots.0.threads.{idx}.Ok.tid"), tid));
    }

    /// Append `snapshots.0.threads.{idx}.Err.tid`.
    fn push_err_tid(metrics: &mut PayloadMetrics, idx: usize, tid: f64) {
        metrics
            .metrics
            .push(metric(&format!("snapshots.0.threads.{idx}.Err.tid"), tid));
    }

    /// Append `snapshots.0.threads.{idx}.Ok.allocated_bytes`.
    fn push_alloc(metrics: &mut PayloadMetrics, idx: usize, alloc: f64) {
        metrics.metrics.push(metric(
            &format!("snapshots.0.threads.{idx}.Ok.allocated_bytes"),
            alloc,
        ));
    }

    /// Unwrap a `Found` outcome into its two counters, panicking on
    /// any other variant.
    fn expect_found(outcome: ThreadLookup) -> (u64, Option<u64>) {
        match outcome {
            ThreadLookup::Found {
                allocated_bytes,
                deallocated_bytes,
            } => (allocated_bytes, deallocated_bytes),
            _ => panic!("expected ThreadLookup::Found"),
        }
    }

    /// Empty flat-metric list → no tid entries at all → terminator
    /// at index 0 → `TidAbsent`.
    #[test]
    fn lookup_thread_empty_metrics_returns_tid_absent() {
        let m = empty_payload();
        assert!(matches!(lookup_thread(&m, 42), ThreadLookup::TidAbsent));
    }

    /// Matching tid with an `allocated_bytes` sibling → `Found`
    /// carrying the observed counter.
    #[test]
    fn lookup_thread_matching_tid_returns_found() {
        let mut m = empty_payload();
        push_ok_tid(&mut m, 0, 42.0);
        push_alloc(&mut m, 0, 1_048_576.0);
        match lookup_thread(&m, 42) {
            ThreadLookup::Found {
                allocated_bytes,
                deallocated_bytes,
            } => {
                assert_eq!(allocated_bytes, 1_048_576);
                assert_eq!(deallocated_bytes, None);
            }
            _ => panic!("expected ThreadLookup::Found"),
        }
    }

    /// When the `deallocated_bytes` sibling is emitted alongside
    /// `allocated_bytes`, `Found` carries it as `Some`.
    #[test]
    fn lookup_thread_found_carries_deallocated_bytes() {
        let mut m = empty_payload();
        push_ok_tid(&mut m, 0, 42.0);
        push_alloc(&mut m, 0, 4096.0);
        push_metric(&mut m, "snapshots.0.threads.0.Ok.deallocated_bytes", 1024.0);
        assert_eq!(expect_found(lookup_thread(&m, 42)), (4096, Some(1024)));
    }

    /// The scan walks past non-matching entries on either arm and
    /// reads the counters of the index where the tid matches.
    #[test]
    fn lookup_thread_matches_tid_at_later_index() {
        let mut m = empty_payload();
        push_ok_tid(&mut m, 0, 10.0);
        push_alloc(&mut m, 0, 111.0);
        push_err_tid(&mut m, 1, 11.0);
        push_ok_tid(&mut m, 2, 42.0);
        push_alloc(&mut m, 2, 333.0);
        assert_eq!(expect_found(lookup_thread(&m, 42)), (333, None));
    }

    /// Matching tid on the Ok arm but with no `allocated_bytes`
    /// sibling → `MissingAllocatedBytes`.
    #[test]
    fn lookup_thread_missing_allocated_bytes_returns_missing_variant() {
        let mut m = empty_payload();
        push_ok_tid(&mut m, 0, 42.0);
        // no matching `.allocated_bytes`
        assert!(matches!(
            lookup_thread(&m, 42),
            ThreadLookup::MissingAllocatedBytes
        ));
    }

    /// Matching tid on the Err arm — `ThreadResult::Err` carries the
    /// tid but no `allocated_bytes` sibling by design. Direct
    /// detection via the `.Err.tid` path discriminator returns
    /// `MissingAllocatedBytes` so callers route the Err arm through
    /// the same diagnostic path as a malformed Ok entry.
    #[test]
    fn lookup_thread_err_arm_returns_missing_allocated_bytes() {
        let mut m = empty_payload();
        push_err_tid(&mut m, 0, 42.0);
        assert!(matches!(
            lookup_thread(&m, 42),
            ThreadLookup::MissingAllocatedBytes
        ));
    }

    /// A contiguous run of tids that does NOT include the caller's
    /// tid, but terminates BEFORE the cap → natural-terminator path
    /// → `TidAbsent` (not `ExceedsCap`).
    #[test]
    fn lookup_thread_contiguous_prefix_without_match_returns_tid_absent() {
        let mut m = empty_payload();
        for i in 0..10 {
            push_ok_tid(&mut m, i, (1000 + i) as f64);
        }
        assert!(matches!(lookup_thread(&m, 42), ThreadLookup::TidAbsent));
    }

    /// The full-cap case: fill indices `0..MAX_SCAN_INDEX`
    /// with non-matching tids, then call lookup_thread with a tid
    /// that isn't in the list. The scan runs all `MAX_SCAN_INDEX`
    /// iterations, never hits a terminator, never matches, and
    /// therefore must return `ExceedsCap` — distinct from `TidAbsent`.
    #[test]
    fn lookup_thread_saturated_scan_without_match_returns_exceeds_cap() {
        let mut m = empty_payload();
        for i in 0..MAX_SCAN_INDEX {
            // tids chosen so none is equal to the probe tid below.
            push_ok_tid(&mut m, i, (1_000_000 + i) as f64);
        }
        let target_tid: i32 = 42;
        let outcome = lookup_thread(&m, target_tid);
        assert!(
            matches!(outcome, ThreadLookup::ExceedsCap),
            "saturated scan without match must return ExceedsCap; got other variant"
        );
    }

    /// Same invariant for `snapshot_worker_allocated` (the
    /// multi-snapshot path): fill `MAX_SCAN_INDEX` tid entries for
    /// snapshot index 0, call with a non-matching tid, assert
    /// `ExceedsCap`.
    #[test]
    fn snapshot_worker_allocated_saturated_scan_returns_exceeds_cap() {
        let mut m = empty_payload();
        for i in 0..MAX_SCAN_INDEX {
            push_ok_tid(&mut m, i, (1_000_000 + i) as f64);
        }
        let outcome = snapshot_worker_allocated(&m, 0, 42);
        assert!(
            matches!(outcome, ThreadLookup::ExceedsCap),
            "saturated multi-snapshot scan without match must return ExceedsCap"
        );
    }

    /// `snapshot_worker_allocated` with an empty metric list must
    /// return `TidAbsent` — parallel to the single-snapshot path.
    #[test]
    fn snapshot_worker_allocated_empty_returns_tid_absent() {
        let m = empty_payload();
        assert!(matches!(
            snapshot_worker_allocated(&m, 0, 42),
            ThreadLookup::TidAbsent
        ));
    }

    /// The `snap_idx` argument selects which snapshot's thread array
    /// is scanned: the same tid present in snapshots 0 and 1 yields
    /// each snapshot's own counters, and a snapshot with no thread
    /// entries yields `TidAbsent`.
    #[test]
    fn snapshot_worker_allocated_reads_requested_snapshot() {
        let mut m = empty_payload();
        push_ok_tid(&mut m, 0, 42.0);
        push_alloc(&mut m, 0, 100.0);
        push_metric(&mut m, "snapshots.1.threads.0.Ok.tid", 42.0);
        push_metric(&mut m, "snapshots.1.threads.0.Ok.allocated_bytes", 200.0);
        push_metric(&mut m, "snapshots.1.threads.0.Ok.deallocated_bytes", 50.0);

        assert_eq!(
            expect_found(snapshot_worker_allocated(&m, 0, 42)),
            (100, None)
        );
        assert_eq!(
            expect_found(snapshot_worker_allocated(&m, 1, 42)),
            (200, Some(50))
        );
        assert!(matches!(
            snapshot_worker_allocated(&m, 2, 42),
            ThreadLookup::TidAbsent
        ));
    }

    /// `thread_count` counts Ok-arm and Err-arm entries alike and
    /// stops at the first index carrying neither, ignoring anything
    /// after the gap.
    #[test]
    fn thread_count_counts_both_arms_until_first_gap() {
        let mut m = empty_payload();
        assert_eq!(thread_count(&m), 0);
        push_ok_tid(&mut m, 0, 1.0);
        push_err_tid(&mut m, 1, 2.0);
        push_ok_tid(&mut m, 3, 4.0); // index 2 missing → unreachable
        assert_eq!(thread_count(&m), 2);
    }

    /// `snapshot_count` counts contiguous
    /// `snapshots.N.timestamp_unix_sec` keys starting at 0.
    #[test]
    fn snapshot_count_counts_contiguous_timestamps() {
        let mut m = empty_payload();
        assert_eq!(snapshot_count(&m), 0);
        push_metric(&mut m, "snapshots.0.timestamp_unix_sec", 1_700_000_000.0);
        push_metric(&mut m, "snapshots.1.timestamp_unix_sec", 1_700_000_001.0);
        assert_eq!(snapshot_count(&m), 2);
    }

    /// `count_indexed_metrics` returns the run length when the
    /// terminator is below the cap and saturates at `cap` otherwise.
    #[test]
    fn count_indexed_metrics_stops_at_gap_and_saturates_at_cap() {
        let mut m = empty_payload();
        for i in 0..3 {
            push_metric(&mut m, &format!("k.{i}"), i as f64);
        }
        let key = |i: usize| format!("k.{i}");
        assert_eq!(count_indexed_metrics(&m, 10, key), 3);
        assert_eq!(count_indexed_metrics(&m, 2, key), 2);
        assert_eq!(count_indexed_metrics(&m, 0, key), 0);
    }

    /// `find_metric` / `has_metric` match on the exact name only, and
    /// `find_metric_u64` converts an in-range value to `u64`.
    #[test]
    fn exact_name_lookup_and_u64_conversion() {
        let mut m = empty_payload();
        push_metric(&mut m, "pid", 1234.0);
        push_metric(&mut m, "pid_extra", 9.0);

        assert!(has_metric(&m, "pid"));
        assert!(!has_metric(&m, "pi"));
        assert_eq!(find_metric(&m, "pid").map(|found| found.value), Some(1234.0));
        assert!(find_metric(&m, "missing").is_none());
        assert_eq!(find_metric_u64(&m, "pid"), Some(1234));
        assert_eq!(find_metric_u64(&m, "missing"), None);
    }

    /// `flat_metrics_dump` lists every metric as `(name, value)` in
    /// insertion order.
    #[test]
    fn flat_metrics_dump_lists_pairs_in_order() {
        let mut m = empty_payload();
        assert!(flat_metrics_dump(&m).is_empty());
        push_metric(&mut m, "a", 1.0);
        push_metric(&mut m, "b", 2.0);
        assert_eq!(flat_metrics_dump(&m), vec![("a", 1.0), ("b", 2.0)]);
    }
}
435            ThreadLookup::TidAbsent
436        ));
437    }
438}