ktstr/scenario/sample/stats.rs
1//! Stats-JSON-axis projection for [`SampleSeries`].
2//!
//! Each [`Sample`](super::Sample) carries the outcome of a `scx_stats`
//! request issued just BEFORE the freeze rendezvous: the JSON value on
//! success, or the reason the capture is missing. This module exposes
//! the closure-based
6//! [`SampleSeries::stats`] projection (manual path access via
7//! [`StatsValue`]) and the auto-discovering
8//! [`SampleSeries::stats_path`] → [`StatsPathProjector`] pair that
9//! walks a stats sub-tree, enumerates object keys, and projects each
10//! as `SeriesField<u64>` / `SeriesField<i64>` / `SeriesField<f64>`.
11//!
12//! Orthogonal to [`super::bpf`]: the stats axis sources its values
13//! from the userspace scheduler's `scx_stats` JSON; the BPF axis
14//! sources from kernel-side BPF state. Tests typically use both.
15//!
16//! ## Counter semantics are scheduler-defined (cumulative vs per-read delta)
17//!
18//! ktstr issues ONE fresh `scx_stats` request per periodic snapshot
19//! and stores the response verbatim — it never accumulates or diffs a
20//! field across snapshots. Whether a field is CUMULATIVE (monotonic
21//! since scheduler start) or a DELTA since the previous reader request
22//! is decided by the scheduler's stats implementation, not by ktstr.
23//! Some schedulers delta their metrics per reader request; because
24//! ktstr issues one request per snapshot, each sample of such a field
25//! is the change since the PREVIOUS snapshot, not a running total.
26//!
27//! This dictates the per-phase reduction. For a CUMULATIVE field the
28//! phase total is last − first
29//! ([`counter_delta_per_phase`](crate::assert::temporal::SeriesField::counter_delta_per_phase)).
30//! For a DELTA-per-request field that reduction is wrong — it diffs two
31//! deltas; the phase total is the SUM of the per-snapshot deltas in the
32//! phase. There is no built-in per-phase sum today, so group with
33//! [`by_stimulus_phase`](crate::scenario::sample::SampleSeries::by_stimulus_phase)
34//! and sum each phase's values by hand. Mind the boundary: a
35//! per-snapshot delta covers the interval since the previous snapshot,
36//! so the FIRST delta inside a phase spans the phase boundary and
37//! carries the tail of the prior phase. Know your scheduler's
38//! convention before choosing the reduction.
39
40use crate::assert::temporal::SeriesField;
41use crate::scenario::snapshot::{JsonField, SnapshotResult, stats_path};
42
43use super::{SampleSeries, build_series_field};
44
45impl SampleSeries {
46 /// Project the series along the stats axis. The closure
47 /// receives each sample's stats JSON (when present) and
48 /// returns a [`SnapshotResult<T>`]. Samples whose `stats` is
49 /// `Err(reason)` get a `Err(MissingStats { tag, reason })` slot —
50 /// temporal assertions surface that as a per-sample
51 /// missing-stats failure rather than vacuously skipping it,
52 /// so a coverage gap is never silent and the operator sees
53 /// the *why* (no scheduler binary configured, relay timed
54 /// out, scheduler returned errno, etc.).
55 ///
56 /// `label` is owned (`impl Into<String>`) and matches the
57 /// shape of [`Self::bpf`] — pass a literal or a runtime-built
58 /// `String` for auto-discovered keys.
59 pub fn stats<T, F>(&self, label: impl Into<String>, project: F) -> SeriesField<T>
60 where
61 F: Fn(StatsValue<'_>) -> SnapshotResult<T>,
62 {
63 build_series_field(&self.rows, label, |row| match row.stats.as_ref() {
64 Ok(v) => project(StatsValue { value: v }),
65 Err(reason) => Err(crate::scenario::snapshot::SnapshotError::MissingStats {
66 tag: row.tag.clone(),
67 reason: reason.clone(),
68 }),
69 })
70 }
71
72 /// Project the live scheduler's stats JSON field at `path` as
73 /// `u64`. Per-row equivalent of `series.stats(label, |s|
74 /// s.get(path).as_u64())` with the boilerplate elided. Mirrors
75 /// [`Self::bpf_live_u64`] for naming parity across axes.
76 ///
77 /// **Why "live" applies — per-request freshness, not a buffer.**
78 /// Each periodic snapshot issues a FRESH `scx_stats` request
79 /// just before the freeze rendezvous fires; the response in
80 /// `row.stats` came from whichever scheduler was alive at
81 /// request-issue time. There is no relay buffer of "the last
82 /// stats we saw" — a stale-pre-swap response cannot land in
83 /// a post-swap sample. After `Op::ReplaceScheduler` the host
84 /// reconnects to the new scheduler's `scx_stats` endpoint
85 /// before the next periodic boundary issues its request, so
86 /// post-swap samples carry the new scheduler's data. The
87 /// `_live` suffix matches the BPF axis naming for cross-axis
88 /// vocabulary consistency AND describes the actual freshness
89 /// guarantee — same semantic across both axes.
90 pub fn stats_live_u64(&self, path: &str) -> SeriesField<u64> {
91 let path_owned = path.to_string();
92 self.stats(path_owned.clone(), move |s| s.get(&path_owned).as_u64())
93 }
94
95 /// Sibling of [`Self::stats_live_u64`] projecting as `i64`.
96 pub fn stats_live_i64(&self, path: &str) -> SeriesField<i64> {
97 let path_owned = path.to_string();
98 self.stats(path_owned.clone(), move |s| s.get(&path_owned).as_i64())
99 }
100
101 /// Sibling of [`Self::stats_live_u64`] projecting as `f64`.
102 pub fn stats_live_f64(&self, path: &str) -> SeriesField<f64> {
103 let path_owned = path.to_string();
104 self.stats(path_owned.clone(), move |s| s.get(&path_owned).as_f64())
105 }
106
107 /// Auto-project a stats-JSON sub-tree. The returned
108 /// [`StatsPathProjector`] resolves the tree at sample 0 and
109 /// exposes object keys via `.key(name)` (for nested layer /
110 /// cgroup objects) or `.field(name)` (for scalar leaves).
111 /// `path` may be empty — `series.stats_path("")` projects from
112 /// the root and is the canonical entry for system-level stats
113 /// fields like `busy`, `antistall`, `system_cpu_util_ewma`,
114 /// etc.
115 pub fn stats_path<'a>(&'a self, path: &str) -> StatsPathProjector<'a> {
116 StatsPathProjector {
117 series: self,
118 path: path.to_string(),
119 }
120 }
121}
122
123/// Newtype carrier handed to the [`SampleSeries::stats`] closure.
124/// Wraps a borrowed [`serde_json::Value`] and exposes [`Self::get`]
125/// as a thin facade over [`stats_path`] so the closure body reads
126/// `s.get("layers.batch.util").as_f64()` without an explicit
127/// import. The `.get(path)` name mirrors
128/// [`crate::scenario::snapshot::SnapshotField::get`] and
129/// [`crate::scenario::snapshot::JsonField::get`] so test authors
130/// see one navigator vocabulary across every accessor surface.
131#[derive(Debug, Clone, Copy)]
132pub struct StatsValue<'a> {
133 value: &'a serde_json::Value,
134}
135
136impl<'a> StatsValue<'a> {
137 /// Underlying JSON value.
138 pub fn raw(&self) -> &'a serde_json::Value {
139 self.value
140 }
141
142 /// Walk along a dotted path. Empty path returns the root.
143 pub fn get(&self, path: &str) -> JsonField<'a> {
144 stats_path(self.value, path)
145 }
146}
147
148/// Auto-projector handle returned by [`SampleSeries::stats_path`].
149/// Walks a stats sub-tree per sample and exposes scalar / nested
150/// projections for the keys at that level.
151pub struct StatsPathProjector<'a> {
152 series: &'a SampleSeries,
153 path: String,
154}
155
156impl<'a> StatsPathProjector<'a> {
157 /// Project a JSON key under the resolved path as `u64`.
158 pub fn field_u64(&self, key: &str) -> SeriesField<u64> {
159 let full_path = join_paths(&self.path, key);
160 self.series
161 .stats(key, move |sv| sv.get(&full_path).as_u64())
162 }
163
164 /// Project a JSON key under the resolved path as `i64`.
165 pub fn field_i64(&self, key: &str) -> SeriesField<i64> {
166 let full_path = join_paths(&self.path, key);
167 self.series
168 .stats(key, move |sv| sv.get(&full_path).as_i64())
169 }
170
171 /// Project a JSON key under the resolved path as `f64`.
172 pub fn field_f64(&self, key: &str) -> SeriesField<f64> {
173 let full_path = join_paths(&self.path, key);
174 self.series
175 .stats(key, move |sv| sv.get(&full_path).as_f64())
176 }
177
178 /// Return a sub-projector rooted under `key`. Composable —
179 /// `series.stats_path("layers").key("batch").field_f64("util")`
180 /// drills into the per-layer scheduler stats one segment at a
181 /// time without each call site re-typing the full dotted
182 /// path.
183 pub fn key(&self, key: &str) -> StatsPathProjector<'a> {
184 StatsPathProjector {
185 series: self.series,
186 path: join_paths(&self.path, key),
187 }
188 }
189
190 /// Discover the JSON object keys of the resolved path, unioned across
191 /// ALL samples (sorted, deduplicated). Empty ONLY when no sample
192 /// resolves the path to an object.
193 ///
194 /// Discovery spans every row rather than sample 0 alone: a
195 /// scheduler-defined `scx_stats` object can be absent or `Err` in
196 /// sample 0 (the first capture often predates the scheduler's first
197 /// stats emit) while later samples carry it; reading only sample 0
198 /// would silently return no keys and blind a "assert over every
199 /// scx_stats counter" blanket projection.
200 pub fn key_names(&self) -> Vec<String> {
201 let mut names: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
202 for row in &self.series.rows {
203 let Ok(stats) = row.stats.as_ref() else {
204 continue;
205 };
206 let resolved = stats_path(stats, &self.path);
207 if let Some(serde_json::Value::Object(map)) = resolved.raw() {
208 names.extend(map.keys().cloned());
209 }
210 }
211 names.into_iter().collect()
212 }
213
214 /// Project every object key that resolves as `u64` for at
215 /// least one sample. Iterates [`Self::key_names`], calls
216 /// [`Self::field_u64`] for each, and keeps the entries whose
217 /// resulting [`SeriesField`] has at least one `Ok` value —
218 /// non-numeric leaves (strings, nested objects, floats) drop
219 /// out.
220 pub fn u64_fields(&self) -> Vec<(String, SeriesField<u64>)> {
221 self.key_names()
222 .into_iter()
223 .filter_map(|name| {
224 let field = self.field_u64(&name);
225 // Bind the predicate result and drop the
226 // values_iter borrow before moving `field`.
227 let any_ok = field.values_iter().any(|r| r.is_ok());
228 any_ok.then_some((name, field))
229 })
230 .collect()
231 }
232
233 /// Project every object key that resolves as `f64` for at
234 /// least one sample. Mirrors [`Self::u64_fields`] using
235 /// [`Self::field_f64`].
236 pub fn f64_fields(&self) -> Vec<(String, SeriesField<f64>)> {
237 self.key_names()
238 .into_iter()
239 .filter_map(|name| {
240 let field = self.field_f64(&name);
241 let any_ok = field.values_iter().any(|r| r.is_ok());
242 any_ok.then_some((name, field))
243 })
244 .collect()
245 }
246}
247
/// Join two dotted-path fragments with a single `.` separator.
///
/// An empty fragment on either side is skipped so no leading or
/// trailing dot is produced; two empty fragments give an empty
/// string.
fn join_paths(base: &str, leaf: &str) -> String {
    match (base.is_empty(), leaf.is_empty()) {
        // Nothing to prefix: the leaf (possibly empty) is the path.
        (true, _) => leaf.to_owned(),
        // Nothing to append: the base stands alone.
        (false, true) => base.to_owned(),
        (false, false) => [base, leaf].join("."),
    }
}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::btf_render::{RenderedMember, RenderedValue};
    use crate::monitor::dump::{FailureDumpMap, FailureDumpReport, SCHEMA_SINGLE};

    /// Minimal single-map dump report: one `.bss` struct holding
    /// `nr_dispatched = value` and `stall = 0`. The stats tests only
    /// need a report to attach to each sample.
    fn synthetic_report(value: u64) -> FailureDumpReport {
        let bss_value = RenderedValue::Struct {
            type_name: Some(".bss".into()),
            members: vec![
                RenderedMember {
                    name: "nr_dispatched".into(),
                    value: RenderedValue::Uint { bits: 64, value },
                },
                RenderedMember {
                    name: "stall".into(),
                    value: RenderedValue::Uint { bits: 8, value: 0 },
                },
            ],
        };
        let bss_map = FailureDumpMap {
            name: "scx_obj.bss".into(),
            map_kva: 0,
            map_type: 2,
            value_size: 16,
            max_entries: 1,
            value: Some(bss_value),
            entries: Vec::new(),
            array_entries: Vec::new(),
            percpu_entries: Vec::new(),
            percpu_hash_entries: Vec::new(),
            arena: None,
            ringbuf: None,
            stack_trace: None,
            fd_array: None,
            error: None,
        };
        FailureDumpReport {
            schema: SCHEMA_SINGLE.to_string(),
            active_map_kvas: Vec::new(),
            maps: vec![bss_map],
            ..Default::default()
        }
    }

    /// Stats payload with two root scalars (`busy`, `antistall`) and
    /// one nested scalar at `layers.batch.util` (half of `busy`).
    fn synthetic_stats(busy: f64) -> serde_json::Value {
        serde_json::json!({
            "busy": busy,
            "antistall": 0,
            "layers": {
                "batch": { "util": busy * 0.5 }
            }
        })
    }

    /// Build stats payloads with mixed shapes so the
    /// `StatsPathProjector` auto-projectors exercise the same
    /// "at least one Ok" filter on the JSON axis:
    /// - `busy`: Number — projects Ok as u64 and f64.
    /// - `count`: Number — projects Ok as u64 and f64.
    /// - `ratio`: Number(float) — projects Ok as f64;
    ///   u64 errors when the float has a non-zero
    ///   fraction (see `json_to_u64`).
    /// - `name`: String("nope") — never coerces to numeric.
    fn mixed_stats(busy: u64, count: u64) -> serde_json::Value {
        serde_json::json!({
            "busy": busy,
            "count": count,
            "ratio": 0.5,
            "name": "nope",
        })
    }

    /// `join_paths` never emits a leading or trailing dot: an empty
    /// fragment on either side is skipped.
    #[test]
    fn join_paths_skips_empty_segments() {
        assert_eq!(join_paths("", ""), "");
        assert_eq!(join_paths("", "busy"), "busy");
        assert_eq!(join_paths("layers", ""), "layers");
        assert_eq!(join_paths("layers", "batch"), "layers.batch");
        assert_eq!(join_paths("layers.batch", "util"), "layers.batch.util");
    }

    /// A sample whose stats slot is `Err` must surface as
    /// `SnapshotError::MissingStats` carrying that sample's tag and
    /// reason, while a sample with stats present projects normally.
    #[test]
    fn stats_projection_handles_missing_stats_as_error() {
        use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
        let drained = vec![
            DrainedSnapshotEntry {
                tag: "periodic_000".to_string(),
                report: synthetic_report(10),
                stats: Ok(synthetic_stats(50.0)),
                elapsed_ms: Some(100),
                boundary_offset_ms: None,
                step_index: None,
            },
            DrainedSnapshotEntry {
                tag: "periodic_001".to_string(),
                report: synthetic_report(20),
                stats: Err(MissingStatsReason::NoSchedulerBinary),
                elapsed_ms: Some(200),
                boundary_offset_ms: None,
                step_index: None,
            },
        ];
        let series = SampleSeries::from_drained_typed(drained, None);
        let field: SeriesField<f64> = series.stats("busy", |s| s.get("busy").as_f64());
        let outcomes: Vec<SnapshotResult<f64>> = field.values_iter().cloned().collect();
        assert_eq!(outcomes.len(), 2);
        assert_eq!(
            outcomes[0].as_ref().copied(),
            Ok(50.0),
            "sample with stats present must project the `busy` field verbatim"
        );
        match &outcomes[1] {
            Err(crate::scenario::snapshot::SnapshotError::MissingStats { tag, reason }) => {
                assert_eq!(
                    tag, "periodic_001",
                    "MissingStats tag must identify the sample whose stats slot was Err"
                );
                assert_eq!(
                    reason,
                    &MissingStatsReason::NoSchedulerBinary,
                    "MissingStats reason must propagate the carried MissingStatsReason verbatim"
                );
            }
            other => panic!(
                "sample with stats=Err must surface SnapshotError::MissingStats, got {other:?}"
            ),
        }
    }

    /// A root-level scalar is reachable through the empty-path
    /// projector, one value per sample.
    #[test]
    fn stats_path_projector_field_f64_extracts_root_scalar() {
        let drained = vec![
            (
                "periodic_000".to_string(),
                synthetic_report(0),
                Some(synthetic_stats(50.0)),
                Some(100),
            ),
            (
                "periodic_001".to_string(),
                synthetic_report(0),
                Some(synthetic_stats(60.0)),
                Some(200),
            ),
        ];
        let series = SampleSeries::from_drained(drained, None);
        let field = series.stats_path("").field_f64("busy");
        let values: Vec<f64> = field
            .values_iter()
            .filter_map(|v| v.as_ref().ok().copied())
            .collect();
        assert_eq!(values.len(), 2);
        assert!((values[0] - 50.0).abs() < f64::EPSILON);
        assert!((values[1] - 60.0).abs() < f64::EPSILON);
    }

    /// Root-level discovery lists both scalar and nested-object keys.
    #[test]
    fn stats_path_projector_key_names_at_root() {
        let drained = vec![(
            "periodic_000".to_string(),
            synthetic_report(0),
            Some(synthetic_stats(50.0)),
            Some(100),
        )];
        let series = SampleSeries::from_drained(drained, None);
        let names = series.stats_path("").key_names();
        assert!(names.contains(&"busy".to_string()));
        assert!(names.contains(&"layers".to_string()));
    }

    /// Key discovery must union across ALL samples, sorted and
    /// deduplicated. Sample 0 has no stats at all, and the two later
    /// samples carry different (overlapping) key sets — a
    /// sample-0-only implementation would return nothing here.
    #[test]
    fn stats_path_projector_key_names_unions_across_all_samples() {
        use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
        let drained = vec![
            DrainedSnapshotEntry {
                tag: "periodic_000".to_string(),
                report: synthetic_report(0),
                stats: Err(MissingStatsReason::NoSchedulerBinary),
                elapsed_ms: Some(100),
                boundary_offset_ms: None,
                step_index: None,
            },
            DrainedSnapshotEntry {
                tag: "periodic_001".to_string(),
                report: synthetic_report(0),
                stats: Ok(synthetic_stats(50.0)),
                elapsed_ms: Some(200),
                boundary_offset_ms: None,
                step_index: None,
            },
            DrainedSnapshotEntry {
                tag: "periodic_002".to_string(),
                report: synthetic_report(0),
                stats: Ok(mixed_stats(60, 9)),
                elapsed_ms: Some(300),
                boundary_offset_ms: None,
                step_index: None,
            },
        ];
        let series = SampleSeries::from_drained_typed(drained, None);
        assert_eq!(
            series.stats_path("").key_names(),
            ["antistall", "busy", "count", "layers", "name", "ratio"],
            "root keys must be the sorted, deduplicated union over every sample",
        );
        assert_eq!(
            series.stats_path("layers").key_names(),
            ["batch"],
            "a sub-tree present in only one sample must still be discovered",
        );
        assert!(
            series.stats_path("busy").key_names().is_empty(),
            "a path that resolves to a scalar in every sample has no object keys",
        );
    }

    /// Chained `key()` calls compose into a nested dotted path.
    #[test]
    fn stats_path_projector_nested_key_drills_in() {
        let drained = vec![(
            "periodic_000".to_string(),
            synthetic_report(0),
            Some(synthetic_stats(50.0)),
            Some(100),
        )];
        let series = SampleSeries::from_drained(drained, None);
        // Note: drilling deeper than 2 levels via key() chain works
        // because key() returns the same kind of projector.
        let field = series.stats_path("layers").key("batch").field_f64("util");
        let values: Vec<f64> = field
            .values_iter()
            .filter_map(|v| v.as_ref().ok().copied())
            .collect();
        assert_eq!(values.len(), 1);
        assert!((values[0] - 25.0).abs() < f64::EPSILON);
    }

    /// `StatsPathProjector::u64_fields` keeps JSON keys whose
    /// per-sample projection lands at least one Ok and drops keys
    /// whose every projection errors. `busy` / `count` are integer
    /// numbers (Ok u64); `ratio` is `0.5` and lands TypeMismatch
    /// on every sample (`json_to_u64` rejects non-integer floats);
    /// `name` is a string that does not parse — also Err.
    #[test]
    fn stats_path_projector_u64_fields_keeps_at_least_one_ok_excludes_all_err() {
        let drained = vec![
            (
                "periodic_000".to_string(),
                synthetic_report(0),
                Some(mixed_stats(50, 7)),
                Some(100),
            ),
            (
                "periodic_001".to_string(),
                synthetic_report(0),
                Some(mixed_stats(60, 9)),
                Some(200),
            ),
        ];
        let series = SampleSeries::from_drained(drained, None);
        let fields = series.stats_path("").u64_fields();
        let names: Vec<&str> = fields.iter().map(|(n, _)| n.as_str()).collect();
        assert!(
            names.contains(&"busy"),
            "Number(integer) key must be kept: {names:?}",
        );
        assert!(
            names.contains(&"count"),
            "Number(integer) key must be kept: {names:?}",
        );
        assert!(
            !names.contains(&"ratio"),
            "Number(non-integer float) errors on every u64 projection — must be excluded: {names:?}",
        );
        assert!(
            !names.contains(&"name"),
            "String key must be excluded — every u64 projection errors: {names:?}",
        );
        // Pin the projected VALUES, not just the kept names: the kept
        // name is the bare key label, but the value resolves through
        // join_paths(path, key) — a wrong-leaf or wrong-cast bug keeps
        // the name while corrupting the value, which a name-only check
        // misses (mirror of bpf.rs).
        let busy = fields.iter().find(|(n, _)| n == "busy").expect("busy kept");
        assert_eq!(
            busy.1
                .values_iter()
                .filter_map(|r| r.as_ref().ok().copied())
                .collect::<Vec<u64>>(),
            vec![50u64, 60],
        );
        let count = fields
            .iter()
            .find(|(n, _)| n == "count")
            .expect("count kept");
        assert_eq!(
            count
                .1
                .values_iter()
                .filter_map(|r| r.as_ref().ok().copied())
                .collect::<Vec<u64>>(),
            vec![7u64, 9],
        );
    }

    /// Mirror of the u64 test for `f64_fields`. `busy`, `count`,
    /// and `ratio` all coerce to f64; only `name` errors. Pins the
    /// "at least one Ok" filter for the f64 axis distinctly from
    /// the u64 axis.
    #[test]
    fn stats_path_projector_f64_fields_keeps_at_least_one_ok_excludes_all_err() {
        let drained = vec![
            (
                "periodic_000".to_string(),
                synthetic_report(0),
                Some(mixed_stats(50, 7)),
                Some(100),
            ),
            (
                "periodic_001".to_string(),
                synthetic_report(0),
                Some(mixed_stats(60, 9)),
                Some(200),
            ),
        ];
        let series = SampleSeries::from_drained(drained, None);
        let fields = series.stats_path("").f64_fields();
        let names: Vec<&str> = fields.iter().map(|(n, _)| n.as_str()).collect();
        assert!(
            names.contains(&"busy"),
            "Number(integer) coerces to f64 — must be kept: {names:?}",
        );
        assert!(
            names.contains(&"count"),
            "Number(integer) coerces to f64 — must be kept: {names:?}",
        );
        assert!(
            names.contains(&"ratio"),
            "Number(non-integer float) coerces to f64 — must be kept: {names:?}",
        );
        assert!(
            !names.contains(&"name"),
            "String key must be excluded — every f64 projection errors: {names:?}",
        );
        // Pin the projected f64 VALUES. `count` (a second integer key)
        // catches a wrong-leaf bug; `ratio` (the only non-integer
        // fraction) catches a fraction-mangling bug — neither is value-
        // checked elsewhere.
        let getf = |n: &str| -> Vec<f64> {
            fields
                .iter()
                .find(|(name, _)| name == n)
                .unwrap_or_else(|| panic!("{n} kept"))
                .1
                .values_iter()
                .filter_map(|r| r.as_ref().ok().copied())
                .collect()
        };
        let approx = |got: Vec<f64>, want: &[f64]| {
            assert_eq!(got.len(), want.len());
            for (g, w) in got.iter().zip(want) {
                assert!((g - w).abs() < f64::EPSILON, "got {got:?} want {want:?}");
            }
        };
        approx(getf("busy"), &[50.0, 60.0]);
        approx(getf("count"), &[7.0, 9.0]);
        approx(getf("ratio"), &[0.5, 0.5]);
    }

    /// Empty series — there are no rows to discover JSON keys from,
    /// so `key_names()` returns an empty vec and both auto-projectors
    /// yield empty results without panicking. Pins the zero-row case
    /// of the all-samples scan in `StatsPathProjector::key_names`.
    #[test]
    fn stats_path_projector_field_helpers_empty_series_yields_empty_vec() {
        let series = SampleSeries::empty();
        let names = series.stats_path("").key_names();
        assert!(
            names.is_empty(),
            "empty series must yield no key names, got {names:?}",
        );
        let u64s = series.stats_path("").u64_fields();
        assert!(
            u64s.is_empty(),
            "empty series must yield empty u64_fields, got {} entries",
            u64s.len(),
        );
        let f64s = series.stats_path("").f64_fields();
        assert!(
            f64s.is_empty(),
            "empty series must yield empty f64_fields, got {} entries",
            f64s.len(),
        );
    }
}