ktstr/test_support/probe_metrics.rs
1//! Flat-metric lookup helpers for the jemalloc-probe integration
2//! tests.
3//!
4//! Lives in the crate (not inline in `tests/jemalloc_probe_tests.rs`)
5//! so the logic is reachable from `#[cfg(test)]` unit tests. The
6//! probe test binary registers `#[ktstr_test]` entries, which
7//! activates the early-dispatch ctor's `--list` intercept — any
8//! plain `#[test]` fn declared in that binary is hidden from
9//! nextest's discovery (see the long comment at the head of
10//! `tests/jemalloc_alloc_worker_exit_codes.rs`). Moving the
11//! helpers here lets the ExceedsCap branch and friends be
12//! pinned by lib-crate unit tests that run under `cargo nextest
13//! run --lib` without the ctor path.
14//!
15//! All items are `pub` so the integration test file at
16//! `tests/jemalloc_probe_tests.rs` can import them through the
17//! `test_support` re-export surface.
18
19use super::payload::{Metric, PayloadMetrics};
20
/// Outcome of scanning the flat metric list for a tid-keyed thread
/// entry. Distinguishes "tid not present" from "tid present but
/// `allocated_bytes` missing" AND from "probe emitted more than
/// [`MAX_SCAN_INDEX`] contiguous threads without the caller's
/// tid appearing in the prefix" — so a caller can issue a precise
/// diagnostic instead of a blanket "not found".
///
/// Derives `Debug` so a failing assertion can print the variant it
/// actually observed, and `PartialEq`/`Eq` so outcomes can be compared
/// with `assert_eq!`. Every payload is plain integer data, so the type
/// is also `Clone + Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadLookup {
    /// `snapshots.{snap_idx}.threads.N.Ok.tid == worker_tid` and
    /// `snapshots.{snap_idx}.threads.N.Ok.allocated_bytes` are both
    /// present. Carries the observed counter plus the companion
    /// `deallocated_bytes` (`None` when the probe did not emit it).
    Found {
        allocated_bytes: u64,
        deallocated_bytes: Option<u64>,
    },
    /// Probe emitted a `snapshots.{snap_idx}.threads.N.{Ok,Err}.tid`
    /// matching `worker_tid`, but no
    /// `snapshots.{snap_idx}.threads.N.Ok.allocated_bytes` sibling.
    /// Either the tid was on the `Err` arm (`.Err.tid`), where the
    /// probe hit an error on that thread and `error`/`error_kind`
    /// replace the counter fields, or it was on the `Ok` arm but the
    /// `allocated_bytes` sibling was absent.
    MissingAllocatedBytes,
    /// No `snapshots.{snap_idx}.threads.N.{Ok,Err}.tid == worker_tid`
    /// entry in the flat metric list. Probe did not visit the worker
    /// at all.
    TidAbsent,
    /// The flat metric list contained at least [`MAX_SCAN_INDEX`]
    /// contiguous `snapshots.{snap_idx}.threads.N.{Ok,Err}.tid`
    /// entries, none of which matched `worker_tid`, so the scan hit
    /// the cap before reaching the array terminator. The worker's tid
    /// may exist at a later index and be invisible to the scan.
    /// Distinct from `TidAbsent` — this outcome means the lookup is
    /// inconclusive, not that the probe definitively skipped the
    /// worker.
    ExceedsCap,
}
59
/// Upper bound (1024) on how many consecutive indices the indexed
/// scans in this module will probe: the per-thread
/// `snapshots.*.threads.N.{Ok,Err}.tid` walks in [`lookup_thread`],
/// [`snapshot_worker_allocated`] and [`thread_count`], and the
/// `snapshots.N.timestamp_unix_sec` walk in [`snapshot_count`].
/// Realistic probe runs see at most a few dozen threads in a
/// single-allocator worker process; hitting this cap indicates either
/// an unexpectedly wide target or a flat-metric schema change that
/// broke the terminator convention.
pub const MAX_SCAN_INDEX: usize = 1 << 10;
67
68/// Find a metric by exact name. Returns `None` if absent.
69pub fn find_metric<'a>(metrics: &'a PayloadMetrics, key: &str) -> Option<&'a Metric> {
70 metrics.metrics.iter().find(|m| m.name == key)
71}
72
73/// Does the flat metric list contain a metric with this exact name?
74/// Thin wrapper around [`find_metric`] for the common existence
75/// check — avoids forcing every call site to spell `.is_some()`.
76pub fn has_metric(metrics: &PayloadMetrics, key: &str) -> bool {
77 find_metric(metrics, key).is_some()
78}
79
80/// Fetch a metric by exact name and return its numeric value as a
81/// `u64`. Returns `None` if the metric is absent. Thin wrapper
82/// around [`find_metric`] + `value as u64` for the common
83/// numeric-lookup shape.
84///
85/// # `f64` → `u64` precision
86///
87/// JSON numbers parse into the probe's flat-metric list as `f64`
88/// (serde_json's number type). Integer values round-trip through
89/// `f64` without precision loss only up to `2^53`
90/// (`9_007_199_254_740_992`); above that bound, adjacent `u64`
91/// values collapse onto the same `f64` and `value as u64` loses
92/// the low-order bits. The probe's emitted counters
93/// (`allocated_bytes`, `deallocated_bytes`, tid numbers, snapshot
94/// timestamps in seconds) are in practice far below this
95/// threshold on realistic workloads: a 64-bit byte counter would
96/// require >8 PiB of total-allocated memory, and Linux pids are
97/// capped at `2^22`. The bound is therefore a soft invariant —
98/// consumers should NOT feed arbitrary externally-controlled
99/// values through this helper without a prior range check.
100///
101/// A `debug_assert!` on the same bound catches the invariant
102/// locally so a future metric that genuinely exceeds `2^53` lights
103/// up in a debug build before the truncation silently corrupts a
104/// downstream comparison; release builds trust the soft invariant
105/// and perform the `as u64` cast unconditionally.
106pub fn find_metric_u64(metrics: &PayloadMetrics, key: &str) -> Option<u64> {
107 find_metric(metrics, key).map(|m| {
108 debug_assert!(
109 m.value.is_finite() && m.value >= 0.0 && m.value <= (1u64 << 53) as f64,
110 "metric {:?} value {} outside the f64→u64 lossless range \
111 [0, 2^53]; the `as u64` cast will truncate silently. \
112 Either range-check externally-sourced input before \
113 landing it in the flat metrics list, or consume the \
114 metric via `.value` (f64) instead of this u64 helper.",
115 m.name,
116 m.value,
117 );
118 m.value as u64
119 })
120}
121
122/// Walk `0..cap` applying `key_fn(i)` to form a metric name and
123/// count how many consecutive indices yield a present metric.
124/// Stops at the first miss — the probe's `walk_json_leaves`
125/// flattening yields indices 0..N contiguously, so the first gap is
126/// the array terminator. Returns the count, which may be `cap` if
127/// every index below the bound is present (inconclusive — the
128/// caller should treat `cap` as "saturated scan, real count may be
129/// larger").
130pub fn count_indexed_metrics<F>(metrics: &PayloadMetrics, cap: usize, key_fn: F) -> usize
131where
132 F: Fn(usize) -> String,
133{
134 let mut n = 0;
135 for i in 0..cap {
136 if find_metric(metrics, &key_fn(i)).is_some() {
137 n += 1;
138 } else {
139 break;
140 }
141 }
142 n
143}
144
145/// Extract the `allocated_bytes` / `deallocated_bytes` values for
146/// `worker_tid` from snapshot 0 in the flat metric list produced by
147/// `walk_json_leaves` over the probe's JSON output.
148///
149/// `ThreadResult` is externally-tagged so the probe emits
150/// `{"pid":P,"snapshots":[{"timestamp_unix_sec":T,"threads":[{"Ok":{"tid":T,"allocated_bytes":A,"deallocated_bytes":D,...}}, {"Err":{"tid":T,"error":...,"error_kind":...}}, ...]}, ...]}`
151/// which `walk_json_leaves` flattens per array index into contiguous
152/// keys `snapshots.0.threads.0.Ok.tid`, `snapshots.0.threads.1.Err.tid`,
153/// … with no gaps. Each index carries exactly one variant wrapper.
154/// The scan stops at the first index where neither `.Ok.tid` nor
155/// `.Err.tid` exists (the natural array terminator) and returns
156/// [`ThreadLookup::TidAbsent`]. If the cap is reached without hitting
157/// the terminator AND without matching `worker_tid`, returns
158/// [`ThreadLookup::ExceedsCap`]. If the matching tid is on the `Err`
159/// arm (no `allocated_bytes` sibling), returns
160/// [`ThreadLookup::MissingAllocatedBytes`].
161pub fn lookup_thread(metrics: &PayloadMetrics, worker_tid: i32) -> ThreadLookup {
162 let worker_tid_f64 = worker_tid as f64;
163 for i in 0..MAX_SCAN_INDEX {
164 let ok_tid_key = format!("snapshots.0.threads.{i}.Ok.tid");
165 let err_tid_key = format!("snapshots.0.threads.{i}.Err.tid");
166 let (tid_m, is_ok) = match find_metric(metrics, &ok_tid_key) {
167 Some(m) => (m, true),
168 None => match find_metric(metrics, &err_tid_key) {
169 Some(m) => (m, false),
170 None => return ThreadLookup::TidAbsent,
171 },
172 };
173 if tid_m.value == worker_tid_f64 {
174 if !is_ok {
175 return ThreadLookup::MissingAllocatedBytes;
176 }
177 let alloc_key = format!("snapshots.0.threads.{i}.Ok.allocated_bytes");
178 let dealloc_key = format!("snapshots.0.threads.{i}.Ok.deallocated_bytes");
179 let allocated_bytes = match find_metric(metrics, &alloc_key).map(|m| m.value as u64) {
180 Some(v) => v,
181 None => return ThreadLookup::MissingAllocatedBytes,
182 };
183 let deallocated_bytes = find_metric(metrics, &dealloc_key).map(|m| m.value as u64);
184 return ThreadLookup::Found {
185 allocated_bytes,
186 deallocated_bytes,
187 };
188 }
189 }
190 // Loop ran to completion — every index 0..MAX_SCAN_INDEX had a
191 // tid entry (Ok or Err), and none matched. A contiguous-array
192 // terminator would have early-returned `TidAbsent`, so the cap
193 // was hit with data remaining. Surface the inconclusive outcome
194 // distinctly from genuine absence.
195 ThreadLookup::ExceedsCap
196}
197
198/// Extract `snapshots.{snap_idx}.threads[*].allocated_bytes` for the
199/// thread whose tid matches `worker_tid`. Returns [`ThreadLookup`]
200/// so callers distinguish "tid absent" from "cap hit before tid
201/// seen" from "allocated_bytes sibling missing" — parallel to
202/// [`lookup_thread`], which covers the single-snapshot path.
203pub fn snapshot_worker_allocated(
204 metrics: &PayloadMetrics,
205 snap_idx: usize,
206 worker_tid: i32,
207) -> ThreadLookup {
208 let worker_tid_f64 = worker_tid as f64;
209 for j in 0..MAX_SCAN_INDEX {
210 let ok_tid_key = format!("snapshots.{snap_idx}.threads.{j}.Ok.tid");
211 let err_tid_key = format!("snapshots.{snap_idx}.threads.{j}.Err.tid");
212 let (tid_m, is_ok) = match find_metric(metrics, &ok_tid_key) {
213 Some(m) => (m, true),
214 None => match find_metric(metrics, &err_tid_key) {
215 Some(m) => (m, false),
216 None => return ThreadLookup::TidAbsent,
217 },
218 };
219 if tid_m.value == worker_tid_f64 {
220 if !is_ok {
221 return ThreadLookup::MissingAllocatedBytes;
222 }
223 let alloc_key = format!("snapshots.{snap_idx}.threads.{j}.Ok.allocated_bytes");
224 let dealloc_key = format!("snapshots.{snap_idx}.threads.{j}.Ok.deallocated_bytes");
225 let allocated_bytes = match find_metric(metrics, &alloc_key).map(|m| m.value as u64) {
226 Some(v) => v,
227 None => return ThreadLookup::MissingAllocatedBytes,
228 };
229 let deallocated_bytes = find_metric(metrics, &dealloc_key).map(|m| m.value as u64);
230 return ThreadLookup::Found {
231 allocated_bytes,
232 deallocated_bytes,
233 };
234 }
235 }
236 ThreadLookup::ExceedsCap
237}
238
239/// Count the number of `snapshots.0.threads.N.{Ok,Err}.tid` entries
240/// in the flat metric list, capped at [`MAX_SCAN_INDEX`]. Each index
241/// carries exactly one variant wrapper (`Ok` or `Err`); the count
242/// terminates at the first index where neither exists.
243pub fn thread_count(metrics: &PayloadMetrics) -> usize {
244 let mut n = 0;
245 for i in 0..MAX_SCAN_INDEX {
246 let ok_key = format!("snapshots.0.threads.{i}.Ok.tid");
247 let err_key = format!("snapshots.0.threads.{i}.Err.tid");
248 if find_metric(metrics, &ok_key).is_some() || find_metric(metrics, &err_key).is_some() {
249 n += 1;
250 } else {
251 break;
252 }
253 }
254 n
255}
256
257/// Count the number of `snapshots.N.timestamp_unix_sec` entries in
258/// the flat metric list, capped at [`MAX_SCAN_INDEX`].
259pub fn snapshot_count(metrics: &PayloadMetrics) -> usize {
260 count_indexed_metrics(metrics, MAX_SCAN_INDEX, |i| {
261 format!("snapshots.{i}.timestamp_unix_sec")
262 })
263}
264
265/// Flatten the full `(name, value)` pair list for diagnostic
266/// rendering inside error messages. Returned as an owned
267/// `Vec<(&str, f64)>` so call sites spell the diagnostic as a single
268/// `{:?}` formatter argument instead of re-typing the
269/// `.iter().map(...).collect()` chain at every site.
270///
271/// Intended for "probe returned nothing we expected" error paths —
272/// when a lookup helper ([`lookup_thread`], [`snapshot_worker_allocated`],
273/// [`find_metric_u64`]) returns a miss, dumping the observed flat metric
274/// list into the failure message is usually the fastest triage step.
275pub fn flat_metrics_dump(metrics: &PayloadMetrics) -> Vec<(&str, f64)> {
276 metrics
277 .metrics
278 .iter()
279 .map(|m| (m.name.as_str(), m.value))
280 .collect()
281}
282
#[cfg(test)]
mod tests {
    use super::super::payload::{MetricStream, Polarity};
    use super::*;

    /// Build a metric with the given name and value; the remaining
    /// fields are fixed filler the lookup helpers never read.
    fn metric(name: &str, value: f64) -> Metric {
        Metric {
            name: name.to_owned(),
            value,
            polarity: Polarity::Unknown,
            unit: String::new(),
            stream: MetricStream::Stdout,
        }
    }

    /// A payload with no metrics at all.
    fn empty_payload() -> PayloadMetrics {
        PayloadMetrics {
            payload_index: 0,
            metrics: Vec::new(),
            exit_code: 0,
        }
    }

    /// Append `snapshots.0.threads.{idx}.{leaf}` with `value`.
    fn push_thread_leaf(metrics: &mut PayloadMetrics, idx: usize, leaf: &str, value: f64) {
        let name = format!("snapshots.0.threads.{idx}.{leaf}");
        metrics.metrics.push(metric(&name, value));
    }

    fn push_ok_tid(metrics: &mut PayloadMetrics, idx: usize, tid: f64) {
        push_thread_leaf(metrics, idx, "Ok.tid", tid);
    }

    fn push_err_tid(metrics: &mut PayloadMetrics, idx: usize, tid: f64) {
        push_thread_leaf(metrics, idx, "Err.tid", tid);
    }

    fn push_alloc(metrics: &mut PayloadMetrics, idx: usize, alloc: f64) {
        push_thread_leaf(metrics, idx, "Ok.allocated_bytes", alloc);
    }

    /// Snapshot 0 filled with `MAX_SCAN_INDEX` `Ok` tids, none of
    /// which equals the tid (42) the saturated-scan tests look for.
    fn saturated_payload() -> PayloadMetrics {
        let mut filled = empty_payload();
        for slot in 0..MAX_SCAN_INDEX {
            push_ok_tid(&mut filled, slot, (1_000_000 + slot) as f64);
        }
        filled
    }

    /// With no metrics, index 0 is already the terminator, so the
    /// lookup reports `TidAbsent`.
    #[test]
    fn lookup_thread_empty_metrics_returns_tid_absent() {
        let outcome = lookup_thread(&empty_payload(), 42);
        assert!(matches!(outcome, ThreadLookup::TidAbsent));
    }

    /// A matching `Ok` tid with an `allocated_bytes` sibling yields
    /// `Found` carrying that counter; `deallocated_bytes` stays
    /// `None` because no such sibling was pushed.
    #[test]
    fn lookup_thread_matching_tid_returns_found() {
        let mut payload = empty_payload();
        push_ok_tid(&mut payload, 0, 42.0);
        push_alloc(&mut payload, 0, 1_048_576.0);
        if let ThreadLookup::Found {
            allocated_bytes,
            deallocated_bytes,
        } = lookup_thread(&payload, 42)
        {
            assert_eq!(allocated_bytes, 1_048_576);
            assert_eq!(deallocated_bytes, None);
        } else {
            panic!("expected ThreadLookup::Found");
        }
    }

    /// A matching tid on the `Ok` arm whose `allocated_bytes` sibling
    /// is absent yields `MissingAllocatedBytes`.
    #[test]
    fn lookup_thread_missing_allocated_bytes_returns_missing_variant() {
        let mut payload = empty_payload();
        // Only the tid is emitted; the counter sibling is left out.
        push_ok_tid(&mut payload, 0, 42.0);
        let outcome = lookup_thread(&payload, 42);
        assert!(matches!(outcome, ThreadLookup::MissingAllocatedBytes));
    }

    /// A matching tid on the `Err` arm, which carries no
    /// `allocated_bytes` sibling by design, is reported as
    /// `MissingAllocatedBytes` — the same variant as an `Ok` entry
    /// lacking its counter.
    #[test]
    fn lookup_thread_err_arm_returns_missing_allocated_bytes() {
        let mut payload = empty_payload();
        push_err_tid(&mut payload, 0, 42.0);
        let outcome = lookup_thread(&payload, 42);
        assert!(matches!(outcome, ThreadLookup::MissingAllocatedBytes));
    }

    /// A short contiguous run of non-matching tids ends at the
    /// natural terminator well before the cap, so the result is
    /// `TidAbsent` rather than `ExceedsCap`.
    #[test]
    fn lookup_thread_contiguous_prefix_without_match_returns_tid_absent() {
        let mut payload = empty_payload();
        (0..10).for_each(|slot| push_ok_tid(&mut payload, slot, (1000 + slot) as f64));
        let outcome = lookup_thread(&payload, 42);
        assert!(matches!(outcome, ThreadLookup::TidAbsent));
    }

    /// Full-cap case: every index in `0..MAX_SCAN_INDEX` holds a
    /// non-matching tid, so the scan never meets a terminator and
    /// never matches. The result must be `ExceedsCap`, not
    /// `TidAbsent`.
    #[test]
    fn lookup_thread_saturated_scan_without_match_returns_exceeds_cap() {
        let payload = saturated_payload();
        let target_tid: i32 = 42;
        let saturated = matches!(lookup_thread(&payload, target_tid), ThreadLookup::ExceedsCap);
        assert!(
            saturated,
            "saturated scan without match must return ExceedsCap; got other variant"
        );
    }

    /// The same full-cap invariant for `snapshot_worker_allocated`,
    /// exercised against snapshot index 0.
    #[test]
    fn snapshot_worker_allocated_saturated_scan_returns_exceeds_cap() {
        let payload = saturated_payload();
        let saturated = matches!(
            snapshot_worker_allocated(&payload, 0, 42),
            ThreadLookup::ExceedsCap
        );
        assert!(
            saturated,
            "saturated multi-snapshot scan without match must return ExceedsCap"
        );
    }

    /// `snapshot_worker_allocated` on an empty metric list reports
    /// `TidAbsent`, matching the single-snapshot path.
    #[test]
    fn snapshot_worker_allocated_empty_returns_tid_absent() {
        let outcome = snapshot_worker_allocated(&empty_payload(), 0, 42);
        assert!(matches!(outcome, ThreadLookup::TidAbsent));
    }
}