ktstr/scenario/sample/
stats.rs

1//! Stats-JSON-axis projection for [`SampleSeries`].
2//!
3//! Each [`Sample`](super::Sample) carries an optional `scx_stats` JSON
4//! value captured from a scx_stats request issued just BEFORE the
5//! freeze rendezvous. This module exposes the closure-based
6//! [`SampleSeries::stats`] projection (manual path access via
7//! [`StatsValue`]) and the auto-discovering
8//! [`SampleSeries::stats_path`] → [`StatsPathProjector`] pair that
9//! walks a stats sub-tree, enumerates object keys, and projects each
10//! as `SeriesField<u64>` / `SeriesField<i64>` / `SeriesField<f64>`.
11//!
12//! Orthogonal to [`super::bpf`]: the stats axis sources its values
13//! from the userspace scheduler's `scx_stats` JSON; the BPF axis
14//! sources from kernel-side BPF state. Tests typically use both.
15//!
16//! ## Counter semantics are scheduler-defined (cumulative vs per-read delta)
17//!
18//! ktstr issues ONE fresh `scx_stats` request per periodic snapshot
19//! and stores the response verbatim — it never accumulates or diffs a
20//! field across snapshots. Whether a field is CUMULATIVE (monotonic
21//! since scheduler start) or a DELTA since the previous reader request
22//! is decided by the scheduler's stats implementation, not by ktstr.
23//! Some schedulers delta their metrics per reader request; because
24//! ktstr issues one request per snapshot, each sample of such a field
25//! is the change since the PREVIOUS snapshot, not a running total.
26//!
27//! This dictates the per-phase reduction. For a CUMULATIVE field the
28//! phase total is last − first
29//! ([`counter_delta_per_phase`](crate::assert::temporal::SeriesField::counter_delta_per_phase)).
30//! For a DELTA-per-request field that reduction is wrong — it diffs two
31//! deltas; the phase total is the SUM of the per-snapshot deltas in the
32//! phase. There is no built-in per-phase sum today, so group with
33//! [`by_stimulus_phase`](crate::scenario::sample::SampleSeries::by_stimulus_phase)
34//! and sum each phase's values by hand. Mind the boundary: a
35//! per-snapshot delta covers the interval since the previous snapshot,
36//! so the FIRST delta inside a phase spans the phase boundary and
37//! carries the tail of the prior phase. Know your scheduler's
38//! convention before choosing the reduction.
39
40use crate::assert::temporal::SeriesField;
41use crate::scenario::snapshot::{JsonField, SnapshotResult, stats_path};
42
43use super::{SampleSeries, build_series_field};
44
45impl SampleSeries {
46    /// Project the series along the stats axis. The closure
47    /// receives each sample's stats JSON (when present) and
48    /// returns a [`SnapshotResult<T>`]. Samples whose `stats` is
49    /// `Err(reason)` get a `Err(MissingStats { tag, reason })` slot —
50    /// temporal assertions surface that as a per-sample
51    /// missing-stats failure rather than vacuously skipping it,
52    /// so a coverage gap is never silent and the operator sees
53    /// the *why* (no scheduler binary configured, relay timed
54    /// out, scheduler returned errno, etc.).
55    ///
56    /// `label` is owned (`impl Into<String>`) and matches the
57    /// shape of [`Self::bpf`] — pass a literal or a runtime-built
58    /// `String` for auto-discovered keys.
59    pub fn stats<T, F>(&self, label: impl Into<String>, project: F) -> SeriesField<T>
60    where
61        F: Fn(StatsValue<'_>) -> SnapshotResult<T>,
62    {
63        build_series_field(&self.rows, label, |row| match row.stats.as_ref() {
64            Ok(v) => project(StatsValue { value: v }),
65            Err(reason) => Err(crate::scenario::snapshot::SnapshotError::MissingStats {
66                tag: row.tag.clone(),
67                reason: reason.clone(),
68            }),
69        })
70    }
71
72    /// Project the live scheduler's stats JSON field at `path` as
73    /// `u64`. Per-row equivalent of `series.stats(label, |s|
74    /// s.get(path).as_u64())` with the boilerplate elided. Mirrors
75    /// [`Self::bpf_live_u64`] for naming parity across axes.
76    ///
77    /// **Why "live" applies — per-request freshness, not a buffer.**
78    /// Each periodic snapshot issues a FRESH `scx_stats` request
79    /// just before the freeze rendezvous fires; the response in
80    /// `row.stats` came from whichever scheduler was alive at
81    /// request-issue time. There is no relay buffer of "the last
82    /// stats we saw" — a stale-pre-swap response cannot land in
83    /// a post-swap sample. After `Op::ReplaceScheduler` the host
84    /// reconnects to the new scheduler's `scx_stats` endpoint
85    /// before the next periodic boundary issues its request, so
86    /// post-swap samples carry the new scheduler's data. The
87    /// `_live` suffix matches the BPF axis naming for cross-axis
88    /// vocabulary consistency AND describes the actual freshness
89    /// guarantee — same semantic across both axes.
90    pub fn stats_live_u64(&self, path: &str) -> SeriesField<u64> {
91        let path_owned = path.to_string();
92        self.stats(path_owned.clone(), move |s| s.get(&path_owned).as_u64())
93    }
94
95    /// Sibling of [`Self::stats_live_u64`] projecting as `i64`.
96    pub fn stats_live_i64(&self, path: &str) -> SeriesField<i64> {
97        let path_owned = path.to_string();
98        self.stats(path_owned.clone(), move |s| s.get(&path_owned).as_i64())
99    }
100
101    /// Sibling of [`Self::stats_live_u64`] projecting as `f64`.
102    pub fn stats_live_f64(&self, path: &str) -> SeriesField<f64> {
103        let path_owned = path.to_string();
104        self.stats(path_owned.clone(), move |s| s.get(&path_owned).as_f64())
105    }
106
107    /// Auto-project a stats-JSON sub-tree. The returned
108    /// [`StatsPathProjector`] resolves the tree at sample 0 and
109    /// exposes object keys via `.key(name)` (for nested layer /
110    /// cgroup objects) or `.field(name)` (for scalar leaves).
111    /// `path` may be empty — `series.stats_path("")` projects from
112    /// the root and is the canonical entry for system-level stats
113    /// fields like `busy`, `antistall`, `system_cpu_util_ewma`,
114    /// etc.
115    pub fn stats_path<'a>(&'a self, path: &str) -> StatsPathProjector<'a> {
116        StatsPathProjector {
117            series: self,
118            path: path.to_string(),
119        }
120    }
121}
122
123/// Newtype carrier handed to the [`SampleSeries::stats`] closure.
124/// Wraps a borrowed [`serde_json::Value`] and exposes [`Self::get`]
125/// as a thin facade over [`stats_path`] so the closure body reads
126/// `s.get("layers.batch.util").as_f64()` without an explicit
127/// import. The `.get(path)` name mirrors
128/// [`crate::scenario::snapshot::SnapshotField::get`] and
129/// [`crate::scenario::snapshot::JsonField::get`] so test authors
130/// see one navigator vocabulary across every accessor surface.
131#[derive(Debug, Clone, Copy)]
132pub struct StatsValue<'a> {
133    value: &'a serde_json::Value,
134}
135
136impl<'a> StatsValue<'a> {
137    /// Underlying JSON value.
138    pub fn raw(&self) -> &'a serde_json::Value {
139        self.value
140    }
141
142    /// Walk along a dotted path. Empty path returns the root.
143    pub fn get(&self, path: &str) -> JsonField<'a> {
144        stats_path(self.value, path)
145    }
146}
147
148/// Auto-projector handle returned by [`SampleSeries::stats_path`].
149/// Walks a stats sub-tree per sample and exposes scalar / nested
150/// projections for the keys at that level.
151pub struct StatsPathProjector<'a> {
152    series: &'a SampleSeries,
153    path: String,
154}
155
156impl<'a> StatsPathProjector<'a> {
157    /// Project a JSON key under the resolved path as `u64`.
158    pub fn field_u64(&self, key: &str) -> SeriesField<u64> {
159        let full_path = join_paths(&self.path, key);
160        self.series
161            .stats(key, move |sv| sv.get(&full_path).as_u64())
162    }
163
164    /// Project a JSON key under the resolved path as `i64`.
165    pub fn field_i64(&self, key: &str) -> SeriesField<i64> {
166        let full_path = join_paths(&self.path, key);
167        self.series
168            .stats(key, move |sv| sv.get(&full_path).as_i64())
169    }
170
171    /// Project a JSON key under the resolved path as `f64`.
172    pub fn field_f64(&self, key: &str) -> SeriesField<f64> {
173        let full_path = join_paths(&self.path, key);
174        self.series
175            .stats(key, move |sv| sv.get(&full_path).as_f64())
176    }
177
178    /// Return a sub-projector rooted under `key`. Composable —
179    /// `series.stats_path("layers").key("batch").field_f64("util")`
180    /// drills into the per-layer scheduler stats one segment at a
181    /// time without each call site re-typing the full dotted
182    /// path.
183    pub fn key(&self, key: &str) -> StatsPathProjector<'a> {
184        StatsPathProjector {
185            series: self.series,
186            path: join_paths(&self.path, key),
187        }
188    }
189
190    /// Discover the JSON object keys of the resolved path, unioned across
191    /// ALL samples (sorted, deduplicated). Empty ONLY when no sample
192    /// resolves the path to an object.
193    ///
194    /// Discovery spans every row rather than sample 0 alone: a
195    /// scheduler-defined `scx_stats` object can be absent or `Err` in
196    /// sample 0 (the first capture often predates the scheduler's first
197    /// stats emit) while later samples carry it; reading only sample 0
198    /// would silently return no keys and blind a "assert over every
199    /// scx_stats counter" blanket projection.
200    pub fn key_names(&self) -> Vec<String> {
201        let mut names: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
202        for row in &self.series.rows {
203            let Ok(stats) = row.stats.as_ref() else {
204                continue;
205            };
206            let resolved = stats_path(stats, &self.path);
207            if let Some(serde_json::Value::Object(map)) = resolved.raw() {
208                names.extend(map.keys().cloned());
209            }
210        }
211        names.into_iter().collect()
212    }
213
214    /// Project every object key that resolves as `u64` for at
215    /// least one sample. Iterates [`Self::key_names`], calls
216    /// [`Self::field_u64`] for each, and keeps the entries whose
217    /// resulting [`SeriesField`] has at least one `Ok` value —
218    /// non-numeric leaves (strings, nested objects, floats) drop
219    /// out.
220    pub fn u64_fields(&self) -> Vec<(String, SeriesField<u64>)> {
221        self.key_names()
222            .into_iter()
223            .filter_map(|name| {
224                let field = self.field_u64(&name);
225                // Bind the predicate result and drop the
226                // values_iter borrow before moving `field`.
227                let any_ok = field.values_iter().any(|r| r.is_ok());
228                any_ok.then_some((name, field))
229            })
230            .collect()
231    }
232
233    /// Project every object key that resolves as `f64` for at
234    /// least one sample. Mirrors [`Self::u64_fields`] using
235    /// [`Self::field_f64`].
236    pub fn f64_fields(&self) -> Vec<(String, SeriesField<f64>)> {
237        self.key_names()
238            .into_iter()
239            .filter_map(|name| {
240                let field = self.field_f64(&name);
241                let any_ok = field.values_iter().any(|r| r.is_ok());
242                any_ok.then_some((name, field))
243            })
244            .collect()
245    }
246}
247
/// Join two dotted-path fragments with a single `.` separator.
///
/// An empty fragment on either side yields the other fragment
/// unchanged (two empty fragments yield an empty string), so no
/// leading or trailing dot is ever introduced by the join itself.
fn join_paths(base: &str, leaf: &str) -> String {
    match (base.is_empty(), leaf.is_empty()) {
        // No base: the leaf (possibly empty) is the whole path.
        (true, _) => leaf.to_string(),
        // Base only: nothing to append.
        (false, true) => base.to_string(),
        (false, false) => format!("{base}.{leaf}"),
    }
}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::btf_render::{RenderedMember, RenderedValue};
    use crate::monitor::dump::{FailureDumpMap, FailureDumpReport, SCHEMA_SINGLE};

    /// Single-map dump report whose `.bss` struct carries
    /// `nr_dispatched = value` and `stall = 0`. The stats tests only
    /// need a well-formed report to accompany each sample.
    fn synthetic_report(value: u64) -> FailureDumpReport {
        let members = vec![
            RenderedMember {
                name: "nr_dispatched".into(),
                value: RenderedValue::Uint { bits: 64, value },
            },
            RenderedMember {
                name: "stall".into(),
                value: RenderedValue::Uint { bits: 8, value: 0 },
            },
        ];
        let bss = FailureDumpMap {
            name: "scx_obj.bss".into(),
            map_kva: 0,
            map_type: 2,
            value_size: 16,
            max_entries: 1,
            value: Some(RenderedValue::Struct {
                type_name: Some(".bss".into()),
                members,
            }),
            entries: Vec::new(),
            array_entries: Vec::new(),
            percpu_entries: Vec::new(),
            percpu_hash_entries: Vec::new(),
            arena: None,
            ringbuf: None,
            stack_trace: None,
            fd_array: None,
            error: None,
        };
        FailureDumpReport {
            schema: SCHEMA_SINGLE.to_string(),
            active_map_kvas: Vec::new(),
            maps: vec![bss],
            ..Default::default()
        }
    }

    /// Stats payload with a root scalar (`busy`), an integer
    /// (`antistall`), and a nested `layers.batch.util` equal to half
    /// of `busy`.
    fn synthetic_stats(busy: f64) -> serde_json::Value {
        serde_json::json!({
            "busy": busy,
            "antistall": 0,
            "layers": {
                "batch": { "util": busy * 0.5 }
            }
        })
    }

    /// Stats payload mixing leaf shapes so the `StatsPathProjector`
    /// auto-projectors exercise their "at least one Ok" filter:
    ///   - `busy`: integer Number — Ok as u64 and as f64.
    ///   - `count`: integer Number — Ok as u64 and as f64.
    ///   - `ratio`: float Number `0.5` — Ok as f64; u64 errors
    ///     because the float has a non-zero fraction (see
    ///     `json_to_u64`).
    ///   - `name`: String("nope") — never coerces to numeric.
    fn mixed_stats(busy: u64, count: u64) -> serde_json::Value {
        serde_json::json!({
            "busy": busy,
            "count": count,
            "ratio": 0.5,
            "name": "nope",
        })
    }

    /// A sample whose stats slot is `Err` must project as
    /// `SnapshotError::MissingStats` carrying that sample's tag and
    /// reason, while a neighbouring sample with stats projects
    /// normally.
    #[test]
    fn stats_projection_handles_missing_stats_as_error() {
        use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
        let with_stats = DrainedSnapshotEntry {
            tag: "periodic_000".to_string(),
            report: synthetic_report(10),
            stats: Ok(synthetic_stats(50.0)),
            elapsed_ms: Some(100),
            boundary_offset_ms: None,
            step_index: None,
        };
        let without_stats = DrainedSnapshotEntry {
            tag: "periodic_001".to_string(),
            report: synthetic_report(20),
            stats: Err(MissingStatsReason::NoSchedulerBinary),
            elapsed_ms: Some(200),
            boundary_offset_ms: None,
            step_index: None,
        };
        let series = SampleSeries::from_drained_typed(vec![with_stats, without_stats], None);
        let busy: SeriesField<f64> = series.stats("busy", |s| s.get("busy").as_f64());
        let outcomes: Vec<SnapshotResult<f64>> = busy.values_iter().cloned().collect();
        assert_eq!(outcomes.len(), 2);
        assert_eq!(
            outcomes[0].as_ref().copied(),
            Ok(50.0),
            "sample with stats present must project the `busy` field verbatim"
        );
        match &outcomes[1] {
            Err(crate::scenario::snapshot::SnapshotError::MissingStats { tag, reason }) => {
                assert_eq!(
                    tag, "periodic_001",
                    "MissingStats tag must identify the sample whose stats slot was Err"
                );
                assert_eq!(
                    reason,
                    &MissingStatsReason::NoSchedulerBinary,
                    "MissingStats reason must propagate the carried MissingStatsReason verbatim"
                );
            }
            other => panic!(
                "sample with stats=Err must surface SnapshotError::MissingStats, got {other:?}"
            ),
        }
    }

    /// A root-level projector (`stats_path("")`) reads a top-level
    /// scalar from every sample.
    #[test]
    fn stats_path_projector_field_f64_extracts_root_scalar() {
        let series = SampleSeries::from_drained(
            vec![
                (
                    "periodic_000".to_string(),
                    synthetic_report(0),
                    Some(synthetic_stats(50.0)),
                    Some(100),
                ),
                (
                    "periodic_001".to_string(),
                    synthetic_report(0),
                    Some(synthetic_stats(60.0)),
                    Some(200),
                ),
            ],
            None,
        );
        let busy = series.stats_path("").field_f64("busy");
        let values: Vec<f64> = busy
            .values_iter()
            .filter_map(|slot| slot.as_ref().ok())
            .copied()
            .collect();
        assert_eq!(values.len(), 2);
        assert!((values[0] - 50.0).abs() < f64::EPSILON);
        assert!((values[1] - 60.0).abs() < f64::EPSILON);
    }

    /// Key discovery at the root reports both scalar and nested
    /// object keys.
    #[test]
    fn stats_path_projector_key_names_at_root() {
        let series = SampleSeries::from_drained(
            vec![(
                "periodic_000".to_string(),
                synthetic_report(0),
                Some(synthetic_stats(50.0)),
                Some(100),
            )],
            None,
        );
        let names = series.stats_path("").key_names();
        assert!(names.iter().any(|n| n == "busy"));
        assert!(names.iter().any(|n| n == "layers"));
    }

    /// Chaining `key()` descends one path segment per call.
    #[test]
    fn stats_path_projector_nested_key_drills_in() {
        let series = SampleSeries::from_drained(
            vec![(
                "periodic_000".to_string(),
                synthetic_report(0),
                Some(synthetic_stats(50.0)),
                Some(100),
            )],
            None,
        );
        // key() hands back the same projector type, so the chain can
        // go arbitrarily deep before the final field_* call.
        let util = series.stats_path("layers").key("batch").field_f64("util");
        let values: Vec<f64> = util
            .values_iter()
            .filter_map(|slot| slot.as_ref().ok())
            .copied()
            .collect();
        assert_eq!(values.len(), 1);
        assert!((values[0] - 25.0).abs() < f64::EPSILON);
    }

    /// `StatsPathProjector::u64_fields` keeps a JSON key when at
    /// least one sample projects Ok and drops it when every sample
    /// errors. `busy` / `count` are integer numbers (Ok as u64);
    /// `ratio` is `0.5`, a TypeMismatch on every sample because
    /// `json_to_u64` rejects non-integer floats; `name` is a string
    /// and errors as well.
    #[test]
    fn stats_path_projector_u64_fields_keeps_at_least_one_ok_excludes_all_err() {
        let series = SampleSeries::from_drained(
            vec![
                (
                    "periodic_000".to_string(),
                    synthetic_report(0),
                    Some(mixed_stats(50, 7)),
                    Some(100),
                ),
                (
                    "periodic_001".to_string(),
                    synthetic_report(0),
                    Some(mixed_stats(60, 9)),
                    Some(200),
                ),
            ],
            None,
        );
        let fields = series.stats_path("").u64_fields();
        let names: Vec<&str> = fields.iter().map(|(n, _)| n.as_str()).collect();
        assert!(
            names.contains(&"busy"),
            "Number(integer) key must be kept: {names:?}",
        );
        assert!(
            names.contains(&"count"),
            "Number(integer) key must be kept: {names:?}",
        );
        assert!(
            !names.contains(&"ratio"),
            "Number(non-integer float) errors on every u64 projection — must be excluded: {names:?}",
        );
        assert!(
            !names.contains(&"name"),
            "String key must be excluded — every u64 projection errors: {names:?}",
        );
        // Check the projected VALUES too. The kept name is only the
        // bare key, while the value is resolved through
        // join_paths(path, key); a wrong-leaf or wrong-cast bug would
        // leave the name intact and corrupt the value, which a
        // name-only check cannot see (mirror of bpf.rs).
        let (_, busy_field) = fields.iter().find(|(n, _)| n == "busy").expect("busy kept");
        let busy_values: Vec<u64> = busy_field
            .values_iter()
            .filter_map(|r| r.as_ref().ok())
            .copied()
            .collect();
        assert_eq!(busy_values, vec![50u64, 60]);
        let (_, count_field) = fields
            .iter()
            .find(|(n, _)| n == "count")
            .expect("count kept");
        let count_values: Vec<u64> = count_field
            .values_iter()
            .filter_map(|r| r.as_ref().ok())
            .copied()
            .collect();
        assert_eq!(count_values, vec![7u64, 9]);
    }

    /// `f64_fields` counterpart of the u64 test. `busy`, `count`,
    /// and `ratio` all coerce to f64; only `name` errors. Pins the
    /// "at least one Ok" filter for the f64 axis separately from the
    /// u64 axis.
    #[test]
    fn stats_path_projector_f64_fields_keeps_at_least_one_ok_excludes_all_err() {
        let series = SampleSeries::from_drained(
            vec![
                (
                    "periodic_000".to_string(),
                    synthetic_report(0),
                    Some(mixed_stats(50, 7)),
                    Some(100),
                ),
                (
                    "periodic_001".to_string(),
                    synthetic_report(0),
                    Some(mixed_stats(60, 9)),
                    Some(200),
                ),
            ],
            None,
        );
        let fields = series.stats_path("").f64_fields();
        let names: Vec<&str> = fields.iter().map(|(n, _)| n.as_str()).collect();
        assert!(
            names.contains(&"busy"),
            "Number(integer) coerces to f64 — must be kept: {names:?}",
        );
        assert!(
            names.contains(&"count"),
            "Number(integer) coerces to f64 — must be kept: {names:?}",
        );
        assert!(
            names.contains(&"ratio"),
            "Number(non-integer float) coerces to f64 — must be kept: {names:?}",
        );
        assert!(
            !names.contains(&"name"),
            "String key must be excluded — every f64 projection errors: {names:?}",
        );
        // Check the projected f64 VALUES. `count` (a second integer
        // key) guards against a wrong-leaf bug; `ratio` (the only
        // fractional value) guards against fraction mangling. Neither
        // is value-checked anywhere else.
        let getf = |n: &str| -> Vec<f64> {
            let (_, field) = fields
                .iter()
                .find(|(name, _)| name == n)
                .unwrap_or_else(|| panic!("{n} kept"));
            field
                .values_iter()
                .filter_map(|r| r.as_ref().ok())
                .copied()
                .collect()
        };
        let approx = |got: Vec<f64>, want: &[f64]| {
            assert_eq!(got.len(), want.len());
            for (g, w) in got.iter().zip(want) {
                assert!((g - w).abs() < f64::EPSILON, "got {got:?} want {want:?}");
            }
        };
        approx(getf("busy"), &[50.0, 60.0]);
        approx(getf("count"), &[7.0, 9.0]);
        approx(getf("ratio"), &[0.5, 0.5]);
    }

    /// Empty series — there are no rows to discover JSON keys from,
    /// so `key_names()` returns an empty vec and both auto-projectors
    /// yield empty results without panicking. Pins the zero-row path
    /// through `StatsPathProjector::key_names`, whose discovery loop
    /// simply never runs.
    #[test]
    fn stats_path_projector_field_helpers_empty_series_yields_empty_vec() {
        let series = SampleSeries::empty();
        let u64s = series.stats_path("").u64_fields();
        assert!(
            u64s.is_empty(),
            "empty series must yield empty u64_fields, got {} entries",
            u64s.len(),
        );
        let f64s = series.stats_path("").f64_fields();
        assert!(
            f64s.is_empty(),
            "empty series must yield empty f64_fields, got {} entries",
            f64s.len(),
        );
    }
}