ktstr/assert/reductions.rs
1use super::*;
2
3/// Check that workers only ran on CPUs in `expected`.
4///
5/// Any worker that used a CPU outside the expected set produces a
6/// failure with the unexpected CPU IDs listed.
7///
8/// ```
9/// # use ktstr::assert::assert_isolation;
10/// # use ktstr::workload::WorkerReport;
11/// # use std::collections::BTreeSet;
12/// # let report = WorkerReport {
13/// # tid: 1, cpus_used: [0, 1].into_iter().collect(),
14/// # work_units: 100, cpu_time_ns: 1_000_000, wall_time_ns: 2_000_000,
15/// # off_cpu_ns: 1_000_000, migration_count: 0, migrations: vec![],
16/// # max_gap_ms: 0, max_gap_cpu: 0, max_gap_at_ms: 0,
17/// # wake_latencies_ns: vec![], wake_sample_total: 0,
18/// # iteration_costs_ns: vec![], iteration_cost_sample_total: 0,
19/// # iterations: 0,
20/// # schedstat_run_delay_ns: 0, schedstat_run_count: 0,
21/// # schedstat_cpu_time_ns: 0,
22/// # completed: true,
23/// # numa_pages: std::collections::BTreeMap::new(),
24/// # vmstat_numa_pages_migrated: 0,
25/// # exit_info: None,
26/// # is_messenger: false,
27/// # ..Default::default()
28/// # };
29/// let expected: BTreeSet<usize> = [0, 1, 2].into_iter().collect();
30/// assert!(assert_isolation(&[report], &expected).is_pass());
31/// ```
32pub fn assert_isolation(reports: &[WorkerReport], expected: &BTreeSet<usize>) -> AssertResult {
33 let mut r = AssertResult::pass();
34 for w in reports {
35 let bad: BTreeSet<usize> = w.cpus_used.difference(expected).copied().collect();
36 if !bad.is_empty() {
37 r.record_fail(AssertDetail::new(
38 DetailKind::Isolation,
39 format!("tid {} ran on unexpected CPUs {:?}", w.tid, bad),
40 ));
41 }
42 }
43 r
44}
45
/// Nearest-rank percentile over an ascending slice, with `p` in `[0.0, 1.0]`.
///
/// The selected element is `sorted[ceil(n * p) - 1]`, with the index clamped
/// into `[0, n-1]`. With `n = 100` and `p = 0.99` that is `sorted[98]` — the
/// 99th element counting from one — and not `sorted[99]` (the maximum). An
/// earlier formulation used `ceil(n * 0.99)` without the `-1`, which was
/// off-by-one and returned the maximum for `n = 100`.
///
/// # Preconditions
///
/// `sorted` must be non-decreasing. The lookup is purely rank-based, so an
/// unsorted slice silently produces whatever value sits at the computed
/// index — a meaningless number. Debug builds verify the ordering with a
/// `debug_assert!`; release builds skip the check because the production
/// callers (`cgroup_stats` and `assert_benchmarks`) both call
/// `sort_unstable()` immediately before invoking this function.
///
/// An empty slice yields `0`; callers should short-circuit before calling.
pub(crate) fn percentile(sorted: &[u64], p: f64) -> u64 {
    match sorted.len() {
        0 => 0,
        n => {
            debug_assert!(
                sorted.windows(2).all(|pair| pair[0] <= pair[1]),
                "percentile() requires sorted input; got slice with out-of-order pair",
            );
            // One-based nearest rank; the float-to-usize cast saturates, so
            // the rank is then shifted to zero-based and clamped in range.
            let rank = (n as f64 * p).ceil() as usize;
            let idx = rank.saturating_sub(1).min(n - 1);
            sorted[idx]
        }
    }
}
81
82/// Build per-cgroup telemetry (pure measurement, no assertions) from
83/// worker reports. This is the SINGLE telemetry builder on the assertion
84/// path: `AssertPlan::assert_cgroup` calls it unconditionally and
85/// [`assert_not_starved`] wraps it with the default fairness checks, so
86/// per-cgroup [`CgroupStats`] is never gated behind whether a worker-check
87/// assertion was configured. Empty `reports` yield a `num_workers == 0`
88/// `CgroupStats` (the reduces below collapse to 0.0/0), so a declared
89/// cgroup that collected no reports surfaces as a zero-worker entry rather
90/// than silently vanishing from [`ScenarioStats::cgroups`].
91pub fn cgroup_stats(reports: &[WorkerReport]) -> CgroupStats {
92 let cpus: BTreeSet<usize> = reports
93 .iter()
94 .flat_map(|w| w.cpus_used.iter().copied())
95 .collect();
96 let pcts: Vec<f64> = reports
97 .iter()
98 .filter(|w| w.wall_time_ns > 0)
99 .map(|w| w.off_cpu_ns as f64 / w.wall_time_ns as f64 * 100.0)
100 .collect();
101
102 // None when no worker had measurable wall time (pcts empty):
103 // off-CPU% is undefined, and a not-measured cgroup must not read
104 // as a measured 0% / spread-0 (perfectly fair) one. Some(_)
105 // otherwise, including a real measured zero.
106 let min = pcts.iter().cloned().reduce(f64::min);
107 let max = pcts.iter().cloned().reduce(f64::max);
108 let avg = if pcts.is_empty() {
109 None
110 } else {
111 Some(pcts.iter().sum::<f64>() / pcts.len() as f64)
112 };
113 let spread = match (min, max) {
114 (Some(lo), Some(hi)) => Some(hi - lo),
115 _ => None,
116 };
117
118 let worst_gap = reports.iter().max_by_key(|w| w.max_gap_ms);
119 let (gap_ms, gap_cpu) = worst_gap
120 .map(|w| (w.max_gap_ms, w.max_gap_cpu))
121 .unwrap_or((0, 0));
122
123 // Compute benchmarking stats from worker reports.
124 let all_latencies: Vec<u64> = reports
125 .iter()
126 .flat_map(|w| w.wake_latencies_ns.iter().copied())
127 .collect();
128 let (p99_us, median_us, lat_cv) = if all_latencies.is_empty() {
129 (0.0, 0.0, 0.0)
130 } else {
131 let mut sorted = all_latencies.clone();
132 sorted.sort_unstable();
133 let p99 = percentile(&sorted, 0.99) as f64 / 1000.0;
134 // Median routes through `percentile(sorted, 0.5)` so the
135 // nearest-rank algorithm matches every other percentile in
136 // the project (p99, schbench's `lat99`, the BPF latency
137 // histograms). A bare `sorted[n/2]` would pick the upper of
138 // the two middle samples for even `n`, while `percentile`
139 // returns the value at `ceil(n * 0.5) - 1` — the lower of
140 // the two middles — and that lower-bound convention is what
141 // the docs on [`CgroupStats::median_wake_latency_us`] and
142 // the schbench cross-reference promise.
143 let median = percentile(&sorted, 0.5) as f64 / 1000.0;
144 let n = all_latencies.len() as f64;
145 let mean_ns = all_latencies.iter().sum::<u64>() as f64 / n;
146 let cv = if mean_ns > 0.0 {
147 let variance = all_latencies
148 .iter()
149 .map(|&v| (v as f64 - mean_ns).powi(2))
150 .sum::<f64>()
151 / n;
152 variance.sqrt() / mean_ns
153 } else {
154 0.0
155 };
156 (p99, median, cv)
157 };
158
159 // Timer-latency reductions (WorkType::TimerLatency), pooled across the
160 // cgroup's workers exactly like the wake-latency block above but over the
161 // distinct `timer_latencies_ns` reservoir. p999 is the deep RT tail; worst
162 // is the single maximum. All `0.0` when no worker recorded timer samples.
163 let all_timer: Vec<u64> = reports
164 .iter()
165 .flat_map(|w| w.timer_latencies_ns.iter().copied())
166 .collect();
167 let (median_timer, p99_timer, p999_timer, worst_timer) = if all_timer.is_empty() {
168 (0.0, 0.0, 0.0, 0.0)
169 } else {
170 let mut sorted = all_timer.clone();
171 sorted.sort_unstable();
172 (
173 percentile(&sorted, 0.5) as f64 / 1000.0,
174 percentile(&sorted, 0.99) as f64 / 1000.0,
175 percentile(&sorted, 0.999) as f64 / 1000.0,
176 *sorted.last().expect("non-empty: checked above") as f64 / 1000.0,
177 )
178 };
179
180 // saturating folds throughout this builder: pool the per-worker guest-runtime
181 // counters (iterations, migrations, cpu-time-ns, sample totals, numa pages)
182 // overflow-safely. This worker->cgroup fold is the FIRST cross-source sum and
183 // the one most exposed to a corrupt/hostile WorkerReport; a u64::MAX component
184 // clamps to u64::MAX instead of debug-panicking / release-wrapping a per-cgroup
185 // total (and the derived migration-ratio / page-locality) to a silently-wrong
186 // value. saturating_add is exact for every in-range value.
187 let total_iters: u64 = reports
188 .iter()
189 .map(|w| w.iterations)
190 .fold(0u64, u64::saturating_add);
191 let run_delays: Vec<f64> = reports
192 .iter()
193 .map(|w| w.schedstat_run_delay_ns as f64 / 1000.0)
194 .collect();
195 let mean_run_delay = if run_delays.is_empty() {
196 0.0
197 } else {
198 run_delays.iter().sum::<f64>() / run_delays.len() as f64
199 };
200 let worst_run_delay = run_delays.iter().cloned().reduce(f64::max).unwrap_or(0.0);
201
202 let total_mig: u64 = reports
203 .iter()
204 .map(|w| w.migration_count)
205 .fold(0u64, u64::saturating_add);
206 let mig_ratio = migration_ratio_of(total_mig, total_iters);
207
208 // Cross-node page-migration ratio: pages migrated cross-node over the
209 // cgroup's total allocated pages. `vmstat_numa_pages_migrated` is a
210 // system-wide delta each worker captured over its own loop; concurrent
211 // workers observe overlapping deltas, so take the MAX across the cgroup
212 // (summing would inflate by the worker count) over the cgroup-wide
213 // total of allocated pages. Pure measurement — populated whenever NUMA
214 // pages were seen, 0.0 otherwise. (The `max_cross_node_migration_ratio`
215 // CHECK in `AssertPlan::assert_cgroup` recomputes the same raw counts
216 // for its diagnostic; this is the always-on telemetry.)
217 let total_numa_pages: u64 = reports
218 .iter()
219 .map(|w| {
220 w.numa_pages
221 .values()
222 .copied()
223 .fold(0u64, u64::saturating_add)
224 })
225 .fold(0u64, u64::saturating_add);
226 let migrated_pages: u64 = reports
227 .iter()
228 .map(|w| w.vmstat_numa_pages_migrated)
229 .max()
230 .unwrap_or(0);
231 let cross_node_ratio = cross_node_migration_ratio_of(migrated_pages, total_numa_pages);
232
233 // Whole-run taobench engine COUNTER aggregate, pooled across this cgroup's
234 // `WorkType::Taobench` workers: Σ ops, MAX wall window (shared across the
235 // concurrent workers), per `TaobenchStats::merge`. `None` for a cgroup with no
236 // Taobench worker. The run-level cross-cgroup pool + the qps/hit Rate
237 // derivation happen in `populate_run_pooled_taobench`; this is the per-cgroup
238 // raw carrier, the taobench analogue of `total_iterations`. Counters only —
239 // serve latency is per-phase data (the `taobench_serve_*_us_whole` keys union
240 // the `PhaseCgroupStats::taobench` histograms).
241 let taobench_whole = reports
242 .iter()
243 .filter_map(|w| w.taobench_whole.as_ref())
244 .fold(
245 None,
246 |acc: Option<crate::workload::taobench::run::TaobenchStats>, t| {
247 Some(match acc {
248 Some(mut a) => {
249 a.merge(t);
250 a
251 }
252 None => *t,
253 })
254 },
255 );
256
257 CgroupStats {
258 // Empty here; collect_handles labels the entry post-hoc (it has
259 // the cgroup name in scope, this reports-only builder does not).
260 cgroup_name: String::new(),
261 num_workers: reports.len(),
262 num_cpus: cpus.len(),
263 cpus_used: cpus,
264 avg_off_cpu_pct: avg,
265 min_off_cpu_pct: min,
266 max_off_cpu_pct: max,
267 spread,
268 max_gap_ms: gap_ms,
269 max_gap_cpu: gap_cpu,
270 total_migrations: total_mig,
271 migration_ratio: mig_ratio,
272 p99_wake_latency_us: p99_us,
273 median_wake_latency_us: median_us,
274 wake_latency_cv: lat_cv,
275 // `0.0` above is a not-measured sentinel iff no worker recorded a
276 // wake/timer sample; the flag distinguishes that from a measured zero
277 // so the run-level re-pool excludes (not folds in) a no-sample cgroup.
278 wake_measured: !all_latencies.is_empty(),
279 median_timer_latency_us: median_timer,
280 p99_timer_latency_us: p99_timer,
281 p999_timer_latency_us: p999_timer,
282 worst_timer_latency_us: worst_timer,
283 timer_measured: !all_timer.is_empty(),
284 total_iterations: total_iters,
285 total_cpu_time_ns: reports
286 .iter()
287 .map(|w| w.schedstat_cpu_time_ns)
288 .fold(0u64, u64::saturating_add),
289 mean_run_delay_us: mean_run_delay,
290 worst_run_delay_us: worst_run_delay,
291 // Run-delay is measured whenever a worker exists (a worker with `0.0`
292 // delay is a real measured zero); only a worker-less cohort is
293 // not-measured.
294 run_delay_measured: !run_delays.is_empty(),
295 // page_locality requires the expected NUMA node set (the cpuset's
296 // nodes), which this reports-only builder does not have. It is
297 // populated by `AssertPlan::assert_cgroup` when `numa_nodes` is
298 // supplied; left 0.0 here (no NUMA context).
299 page_locality: 0.0,
300 cross_node_migration_ratio: cross_node_ratio,
301 taobench_whole,
302 ext_metrics: BTreeMap::new(),
303 }
304}
305
/// Migrations divided by iterations. When no iterations ran the result is
/// `0.0` — a measured zero, not a rate over zero. Kept as a single shared
/// helper so the per-phase carrier (`write_carrier_scalars`) and
/// [`cgroup_stats`] (whose `migration_ratio` feeds the run-level
/// `worst_migration_ratio` fold) compute the ratio identically.
pub(crate) fn migration_ratio_of(total_migrations: u64, total_iterations: u64) -> f64 {
    match total_iterations {
        0 => 0.0,
        iterations => total_migrations as f64 / iterations as f64,
    }
}
317
/// Cross-node migrated pages divided by the cgroup-wide total of allocated
/// pages; `0.0` when no NUMA pages were seen. Single-sourced with
/// [`cgroup_stats`].
///
/// This is a CHURN ratio, NOT a fraction bounded to `[0,1]`: the numerator
/// counts cumulative migration EVENTS (`/proc/vmstat numa_pages_migrated`,
/// which counts each migration, so one page may be counted several times)
/// while the denominator is a residency SNAPSHOT. Under heavy re-migration
/// the result can therefore legitimately exceed 1.0.
pub(crate) fn cross_node_migration_ratio_of(migrated_pages: u64, total_numa_pages: u64) -> f64 {
    // Guard first: with no NUMA pages there is nothing to divide by.
    if total_numa_pages == 0 {
        return 0.0;
    }
    let migrated = migrated_pages as f64;
    let total = total_numa_pages as f64;
    migrated / total
}
331
/// Share of allocated pages that sit on the expected NUMA nodes. Returns
/// `0.0` when no NUMA pages were seen. Single-sourced with the per-phase
/// carrier.
pub(crate) fn page_locality_of(numa_pages_local: u64, numa_pages_total: u64) -> f64 {
    if numa_pages_total == 0 {
        0.0
    } else {
        numa_pages_local as f64 / numa_pages_total as f64
    }
}
341
/// Mean iterations per worker. With zero workers the value is undefined and
/// `None` is returned, which stays distinct from a measured zero.
/// Single-sourced with
/// [`CgroupStats::iterations_per_worker`](crate::assert::CgroupStats::iterations_per_worker)
/// so the run-level WorstLowest fold stays bit-identical.
pub(crate) fn iterations_per_worker_of(num_workers: usize, total_iterations: u64) -> Option<f64> {
    let has_workers = num_workers > 0;
    has_workers.then(|| total_iterations as f64 / num_workers as f64)
}
353
/// Worker iterations per CPU-second, i.e. `total_iterations /
/// (total_cpu_time_ns / 1e9)`. Returns `None` when there are no workers or
/// when no on-CPU time was captured. Single-sourced with
/// [`CgroupStats::iterations_per_cpu_sec`](crate::assert::CgroupStats::iterations_per_cpu_sec).
pub(crate) fn iterations_per_cpu_sec_of(
    num_workers: usize,
    total_cpu_time_ns: u64,
    total_iterations: u64,
) -> Option<f64> {
    let measurable = num_workers != 0 && total_cpu_time_ns != 0;
    measurable.then(|| {
        // Nanoseconds -> seconds first, then iterations over seconds.
        let cpu_secs = total_cpu_time_ns as f64 / 1e9;
        total_iterations as f64 / cpu_secs
    })
}
368
369/// Per-phase per-cgroup RAW-component builder — the sibling of [`cgroup_stats`]
370/// that emits [`PhaseCgroupStats`]'s un-reduced components instead of the
371/// reduced ratios/percentiles, so the distributional re-pool recomputes each
372/// aggregate from the pooled components at every level. Every [`CgroupStats`]
373/// reduction re-pools from these fields: avg/min/max/spread off-CPU% from
374/// `off_cpu_pcts`; p99/median/CV from `wake_latencies_ns`; mean/worst run-delay
375/// from `run_delays_ns` (RAW ns, the re-pool divides by 1000); migration_ratio
376/// / iterations_per_cpu_sec / iterations_per_worker from the counters;
377/// page_locality / cross_node_migration_ratio from the numa counters; the
378/// coupled worst gap from the argmax pair; cpus_used / num_cpus from `cpus_used`.
379///
380/// `expected_nodes` is this cgroup's cpuset NUMA-node set (from
381/// [`crate::topology::TestTopology::numa_nodes_for_cpuset`]); `numa_pages_local`
382/// is the page count on those nodes (0 when `None`, mirroring [`cgroup_stats`]
383/// leaving `page_locality` 0.0 without NUMA context — the partition lives with
384/// the caller that has the node set, as [`AssertPlan::assert_cgroup`] does).
385/// The whole-run [`cgroup_stats`] reductions stay the run-level authority; this
386/// feeds the per-phase [`PhaseBucket::per_cgroup`] carrier.
387///
388/// RE-POOL GUARD CONTRACT: the div-by-zero / not-measured guards live in
389/// [`cgroup_stats`], NOT in these raw components — a future re-pool over them
390/// MUST mirror them exactly or ship a NaN/Inf or a not-measured-vs-zero
391/// collapse: `migration_ratio` only when `total_iterations > 0`;
392/// `cross_node_migration_ratio` / `page_locality` only when `numa_pages_total >
393/// 0`; mean/worst run-delay only when `run_delays_ns` is non-empty; and
394/// avg/min/max/spread off-CPU% return None (not 0.0) when `off_cpu_pcts` is
395/// empty (the not-measured state).
396pub(crate) fn phase_cgroup_stats(
397 reports: &[WorkerReport],
398 expected_nodes: Option<&BTreeSet<usize>>,
399) -> PhaseCgroupStats {
400 let cpus_used: BTreeSet<usize> = reports
401 .iter()
402 .flat_map(|w| w.cpus_used.iter().copied())
403 .collect();
404 // Per-worker off-CPU% (only workers with measurable wall time), un-reduced.
405 // EMPTY = not measured: the re-pool then yields None for avg/min/max/spread,
406 // preserving the not-measured-vs-measured-zero distinction cgroup_stats keeps.
407 let off_cpu_pcts: Vec<f64> = reports
408 .iter()
409 .filter(|w| w.wall_time_ns > 0)
410 .map(|w| w.off_cpu_ns as f64 / w.wall_time_ns as f64 * 100.0)
411 .collect();
412 // Pool every worker's already per-worker-capped wake-latency vec, RE-CAPPING
413 // the concatenation at MAX_WAKE_SAMPLES via the same Algorithm-R reservoir the
414 // per-worker path uses. This carrier is the FIRST to serialize raw samples
415 // over the size-limited guest bulk port (the AssertResult); without the
416 // re-cap the pool would be workers × MAX_WAKE_SAMPLES and could overrun the
417 // 16 MiB frame on a many-core host, flipping a PASS to a truncated FAIL. The
418 // reservoir is distribution-preserving, so p99 / median / CV re-pool over it
419 // as cgroup_stats does over the per-worker pool; `wake_sample_total` keeps the
420 // TRUE pre-cap population for the re-pool. PARITY: for pools ≤
421 // MAX_WAKE_SAMPLES the reservoir is the full concatenation, so the re-pool is
422 // VALUE-FOR-VALUE with cgroup_stats; above the cap it is a distribution-
423 // preserving SUBSAMPLE (cgroup_stats keeps the full concat), so the re-pool is
424 // distribution-equivalent, not byte-identical — see the wake_latencies_ns
425 // field doc for the full contract. PhaseCgroupStats::merge keeps same-name
426 // carriers bounded too: ≤cap it concatenates (value-for-value), >cap it uses a
427 // population-WEIGHTED reservoir merge (weighted_merge_reservoirs) so the merged
428 // subsample is unbiased rather than length-skewed toward the smaller carrier.
429 let mut wake_latencies_ns: Vec<u64> = Vec::new();
430 let mut pooled_wake_count: u64 = 0;
431 for w in reports {
432 for &sample in &w.wake_latencies_ns {
433 crate::workload::reservoir_push(
434 &mut wake_latencies_ns,
435 &mut pooled_wake_count,
436 sample,
437 crate::workload::MAX_WAKE_SAMPLES,
438 );
439 }
440 }
441 // saturating folds (per cgroup_stats's rationale): overflow-safe pool of the
442 // per-worker guest-runtime counters into this per-phase carrier.
443 let wake_sample_total: u64 = reports
444 .iter()
445 .map(|w| w.wake_sample_total)
446 .fold(0u64, u64::saturating_add);
447 // Distinct timer reservoir, pooled across workers exactly like
448 // wake_latencies_ns; timer_sample_total keeps the true pre-cap population.
449 let mut timer_latencies_ns: Vec<u64> = Vec::new();
450 let mut pooled_timer_count: u64 = 0;
451 for w in reports {
452 for &sample in &w.timer_latencies_ns {
453 crate::workload::reservoir_push(
454 &mut timer_latencies_ns,
455 &mut pooled_timer_count,
456 sample,
457 crate::workload::MAX_WAKE_SAMPLES,
458 );
459 }
460 }
461 let timer_sample_total: u64 = reports
462 .iter()
463 .map(|w| w.timer_sample_total)
464 .fold(0u64, u64::saturating_add);
465 // RAW ns, one per worker — NOT divided by 1000. cgroup_stats divides at
466 // reduction time; the re-pool over the concatenated samples divides once,
467 // so pre-dividing here would double-divide (a 1000x error).
468 let run_delays_ns: Vec<u64> = reports.iter().map(|w| w.schedstat_run_delay_ns).collect();
469 // Coupled worst gap: take (ms, cpu) TOGETHER from the worst worker (argmax),
470 // never two independent maxes — keeps the gap bound to its CPU.
471 let (max_gap_ms, max_gap_cpu) = reports
472 .iter()
473 .max_by_key(|w| w.max_gap_ms)
474 .map(|w| (w.max_gap_ms, w.max_gap_cpu))
475 .unwrap_or((0, 0));
476 let total_migrations: u64 = reports
477 .iter()
478 .map(|w| w.migration_count)
479 .fold(0u64, u64::saturating_add);
480 let total_iterations: u64 = reports
481 .iter()
482 .map(|w| w.iterations)
483 .fold(0u64, u64::saturating_add);
484 // schedstat_cpu_time_ns (task->se.sum_exec_runtime), NOT cpu_time_ns
485 // (CLOCK_THREAD_CPUTIME_ID) — matches cgroup_stats's total_cpu_time_ns.
486 let total_cpu_time_ns: u64 = reports
487 .iter()
488 .map(|w| w.schedstat_cpu_time_ns)
489 .fold(0u64, u64::saturating_add);
490 let numa_pages_total: u64 = reports
491 .iter()
492 .map(|w| {
493 w.numa_pages
494 .values()
495 .copied()
496 .fold(0u64, u64::saturating_add)
497 })
498 .fold(0u64, u64::saturating_add);
499 // System-wide /proc/vmstat numa_pages_migrated delta each worker observes
500 // redundantly -> MAX, not SUM (summing inflates by the worker count).
501 let cross_node_migrated: u64 = reports
502 .iter()
503 .map(|w| w.vmstat_numa_pages_migrated)
504 .max()
505 .unwrap_or(0);
506 // Pages on the cgroup's expected NUMA nodes (page_locality numerator),
507 // partitioned exactly as AssertPlan::assert_cgroup does; 0 without a node
508 // set (mirrors cgroup_stats leaving page_locality 0.0 absent NUMA context).
509 let numa_pages_local: u64 = expected_nodes
510 .map(|nodes| {
511 let mut local = 0u64;
512 for w in reports {
513 for (&node, &count) in &w.numa_pages {
514 if nodes.contains(&node) {
515 local = local.saturating_add(count);
516 }
517 }
518 }
519 local
520 })
521 .unwrap_or(0);
522 PhaseCgroupStats {
523 num_workers: reports.len(),
524 cpus_used,
525 wake_latencies_ns,
526 wake_sample_total,
527 timer_latencies_ns,
528 timer_sample_total,
529 run_delays_ns,
530 off_cpu_pcts,
531 total_migrations,
532 total_iterations,
533 total_cpu_time_ns,
534 numa_pages_local,
535 numa_pages_total,
536 cross_node_migrated,
537 max_gap_ms,
538 max_gap_cpu,
539 // Fresh carrier built from worker reports — never stripped.
540 stripped: false,
541 // Derived scalars are filled post-build by derive_phase_metrics.
542 metrics: std::collections::BTreeMap::new(),
543 // schbench rides PhaseSlice (the backdrop per-phase carrier), not the
544 // whole-run WorkerReports this fn pools, so it is always None here.
545 schbench: None,
546 taobench: None,
547 }
548}
549
550/// Build a per-cgroup carrier from ONE already-per-phase backdrop
551/// [`crate::workload::PhaseSlice`] (no whole-run differencing — the
552/// slice's counter fields are already per-phase deltas). The single-worker
553/// analog of [`phase_cgroup_stats`]: `num_workers` is 1 and each list field
554/// carries this one worker's value, so [`PhaseCgroupStats::merge`] pools
555/// slices across backdrop workers into a per-epoch carrier identically to
556/// how [`phase_cgroup_stats`] pools whole-run reports.
557pub(crate) fn phase_slice_to_cgroup_stats(
558 slice: &crate::workload::PhaseSlice,
559 expected_nodes: Option<&BTreeSet<usize>>,
560) -> PhaseCgroupStats {
561 // Per-phase off-CPU%, one value, only when wall time was measured
562 // (wall_ns == 0 => the worker never ran this phase => EMPTY = not
563 // measured, matching phase_cgroup_stats's not-measured contract).
564 let off_cpu_pcts: Vec<f64> = if slice.wall_ns > 0 {
565 vec![slice.off_cpu_ns as f64 / slice.wall_ns as f64 * 100.0]
566 } else {
567 Vec::new()
568 };
569 // saturating folds: overflow-safe pool of this slice's per-node page counts.
570 let numa_pages_total: u64 = slice
571 .numa_pages
572 .values()
573 .copied()
574 .fold(0u64, u64::saturating_add);
575 let numa_pages_local: u64 = expected_nodes
576 .map(|nodes| {
577 slice
578 .numa_pages
579 .iter()
580 .filter(|(node, _)| nodes.contains(node))
581 .map(|(_, &count)| count)
582 .fold(0u64, u64::saturating_add)
583 })
584 .unwrap_or(0);
585 PhaseCgroupStats {
586 num_workers: 1,
587 cpus_used: slice.cpus_used.clone(),
588 wake_latencies_ns: slice.wake_latencies_ns.clone(),
589 wake_sample_total: slice.wake_sample_total,
590 timer_latencies_ns: slice.timer_latencies_ns.clone(),
591 timer_sample_total: slice.timer_sample_total,
592 // RAW ns, one per worker (NOT divided) — same contract as
593 // phase_cgroup_stats::run_delays_ns.
594 run_delays_ns: vec![slice.run_delay_ns],
595 off_cpu_pcts,
596 total_migrations: slice.migration_count,
597 total_iterations: slice.iterations,
598 total_cpu_time_ns: slice.schedstat_cpu_time_ns,
599 numa_pages_local,
600 numa_pages_total,
601 cross_node_migrated: slice.vmstat_numa_pages_migrated,
602 max_gap_ms: slice.max_gap_ms,
603 max_gap_cpu: slice.max_gap_cpu,
604 stripped: false,
605 // Derived scalars are filled post-build by derive_phase_metrics.
606 metrics: std::collections::BTreeMap::new(),
607 // Carry the per-phase schbench engine metrics through (None for every
608 // non-schbench backdrop slice); PhaseCgroupStats::merge pools them.
609 schbench: slice.schbench.clone(),
610 // Carry the per-phase taobench engine metrics through (None for every
611 // non-taobench backdrop slice); PhaseCgroupStats::merge pools them.
612 taobench: slice.taobench.clone(),
613 }
614}
615
616/// Pool a set of backdrop [`crate::workload::PhaseSlice`]s (all for
617/// the SAME epoch, one per worker) into a single per-cgroup carrier via
618/// [`PhaseCgroupStats::merge`] — the per-phase analog of
619/// [`phase_cgroup_stats`] pooling whole-run reports. An empty input yields
620/// a zero-worker carrier (all fields empty/0, `stripped: false`) so a phase
621/// no backdrop worker observed renders as not-measured rather than
622/// panicking.
623pub(crate) fn pool_phase_slice_stats(
624 slices: &[&crate::workload::PhaseSlice],
625 expected_nodes: Option<&BTreeSet<usize>>,
626) -> PhaseCgroupStats {
627 let mut iter = slices
628 .iter()
629 .map(|s| phase_slice_to_cgroup_stats(s, expected_nodes));
630 match iter.next() {
631 Some(first) => iter.fold(first, PhaseCgroupStats::merge),
632 None => PhaseCgroupStats {
633 num_workers: 0,
634 cpus_used: BTreeSet::new(),
635 wake_latencies_ns: Vec::new(),
636 wake_sample_total: 0,
637 timer_latencies_ns: Vec::new(),
638 timer_sample_total: 0,
639 run_delays_ns: Vec::new(),
640 off_cpu_pcts: Vec::new(),
641 total_migrations: 0,
642 total_iterations: 0,
643 total_cpu_time_ns: 0,
644 numa_pages_local: 0,
645 numa_pages_total: 0,
646 cross_node_migrated: 0,
647 max_gap_ms: 0,
648 max_gap_cpu: 0,
649 stripped: false,
650 metrics: std::collections::BTreeMap::new(),
651 schbench: None,
652 taobench: None,
653 },
654 }
655}
656
657/// Expand a backdrop worker set's per-phase
658/// [`crate::workload::PhaseSlice`]s into one [`PhaseBucket`] per epoch,
659/// keyed by the epoch as `step_index`, each pooling that epoch's slices
660/// across workers via [`pool_phase_slice_stats`]. BASELINE (epoch 0) and
661/// inter-step-gap (`u32::MAX`) epochs are skipped — they have no paired
662/// host bucket and the host fold discards them. Called by the backdrop
663/// (None-`step_index`) arm of [`crate::scenario::collect_handles`]; the
664/// host's [`fold_guest_per_cgroup_into_host_buckets`] then unions these
665/// into the host-rebuilt buckets (matched epochs) or surfaces them as
666/// orphan not-measured windows. Extracted (rather than inlined in
667/// collect_handles, which calls `stop_and_collect`) so the grouping +
668/// per-epoch pooling is unit-testable directly.
669pub(crate) fn expand_backdrop_phase_buckets(
670 name: &str,
671 reports: &[WorkerReport],
672 expected_nodes: Option<&BTreeSet<usize>>,
673) -> Vec<PhaseBucket> {
674 let mut by_epoch: std::collections::BTreeMap<u32, Vec<&crate::workload::PhaseSlice>> =
675 std::collections::BTreeMap::new();
676 for report in reports {
677 for slice in &report.phase_slices {
678 if slice.phase_epoch == 0 || slice.phase_epoch == u32::MAX {
679 continue;
680 }
681 by_epoch.entry(slice.phase_epoch).or_default().push(slice);
682 }
683 }
684 by_epoch
685 .into_iter()
686 .map(|(epoch, slices)| {
687 let mut per_cgroup = std::collections::BTreeMap::new();
688 per_cgroup.insert(
689 name.to_string(),
690 pool_phase_slice_stats(&slices, expected_nodes),
691 );
692 // Lossless: a real epoch == u32::from(phase_step_index: u16),
693 // and 0 / u32::MAX are filtered above.
694 let step_index = epoch as u16;
695 PhaseBucket {
696 step_index,
697 label: Phase::from(step_index).to_string(),
698 start_ms: u64::MAX,
699 end_ms: 0,
700 sample_count: 0,
701 metrics: std::collections::BTreeMap::new(),
702 per_cgroup,
703 }
704 })
705 .collect()
706}
707
708/// Build the single-bucket guest-side per-phase carrier for one step-local
709/// cgroup: a [`PhaseBucket`] at `step_index` whose only payload is the
710/// `per_cgroup` entry `name -> phase_cgroup_stats(reports, expected_nodes)`.
711///
712/// The guest emits one of these per step-local cgroup at `collect_step`
713/// teardown ([`crate::scenario::collect_handles`]). The window is the
714/// merge-neutral `(u64::MAX, 0)` sentinel and `metrics` is empty: the carrier
715/// contributes ONLY `per_cgroup`. When folded into the host-rebuilt bucket of
716/// the same `step_index` ([`fold_guest_per_cgroup_into_host_buckets`] via
717/// [`merge_matched_phase_buckets`]) the `MAX`/`0` window is a no-op against the
718/// host's real window (`min`/`max`), so the host's window and metrics win and
719/// only `per_cgroup` is carried. The `label` uses [`Phase`]'s `Display` so an
720/// orphan carrier (no host bucket) still reads `BASELINE`/`Step[k]`.
721pub(crate) fn step_per_cgroup_bucket(
722 name: &str,
723 reports: &[WorkerReport],
724 expected_nodes: Option<&BTreeSet<usize>>,
725 step_index: u16,
726) -> PhaseBucket {
727 let mut per_cgroup = std::collections::BTreeMap::new();
728 per_cgroup.insert(
729 name.to_string(),
730 phase_cgroup_stats(reports, expected_nodes),
731 );
732 PhaseBucket {
733 step_index,
734 label: Phase::from(step_index).to_string(),
735 start_ms: u64::MAX,
736 end_ms: 0,
737 sample_count: 0,
738 metrics: std::collections::BTreeMap::new(),
739 per_cgroup,
740 }
741}
742
743/// Roll a single cgroup's [`CgroupStats`] up into a one-cgroup
744/// [`ScenarioStats`]. The KEPT typed `worst_*` fields carry this cgroup's
745/// values and fold across cgroups in [`AssertResult::merge`] by max (all are
746/// higher-is-worse): `worst_spread`, `worst_migration_ratio`, and the coupled
747/// `worst_gap_ms` / `worst_gap_cpu`. The two NUMA roll-ups
748/// (`worst_page_locality`, `worst_cross_node_migration_ratio`), the wake-latency
749/// / run-delay distributions, the iteration efficiencies, and the wake-latency
750/// tail ratio are NOT carried here — they have no typed field and re-pool
751/// run-level POST-merge in [`populate_run_distribution_metrics`]:
752/// `worst_page_locality` (WorstLowest) re-pools None-aware from
753/// `stats.phases[].per_cgroup` NUMA carriers (lowest per-cgroup locality, a
754/// measured 0.0 winning the lowest rather than being skipped as a sentinel),
755/// `worst_cross_node_migration_ratio` (WorstCrossNodeRatio) re-pools the MAX
756/// per-cgroup churn ratio from the same carriers, the tail ratio is the max over
757/// the per-cgroup `CgroupStats::wake_latency_tail_ratio`, and the distributions /
758/// efficiencies pool from `stats.phases[].per_cgroup` / `stats.cgroups`.
759/// `cgroups` carries exactly this one entry so merge appends one per
760/// handle without double-counting.
761pub(crate) fn scenario_stats_for_cgroup(cg: &CgroupStats) -> ScenarioStats {
762 ScenarioStats {
763 total_workers: cg.num_workers,
764 total_cpus: cg.num_cpus,
765 total_migrations: cg.total_migrations,
766 // worst_spread is higher-is-worse (merge takes max). A
767 // not-measured cgroup (`spread == None`) maps to 0.0 — the
768 // neutral element for max, and the gauntlet layer's
769 // documented no-data convention. A measured zero stays 0.0.
770 worst_spread: cg.spread.unwrap_or(0.0),
771 worst_gap_ms: cg.max_gap_ms,
772 worst_gap_cpu: cg.max_gap_cpu,
773 worst_migration_ratio: cg.migration_ratio,
774 total_iterations: cg.total_iterations,
775 // worst_page_locality and worst_cross_node_migration_ratio are no longer
776 // typed fields — both re-pool from the per-phase NUMA carriers in
777 // populate_run_distribution_metrics. cg.page_locality is structurally 0.0
778 // here (it needs an expected-node set this reports-only builder lacks);
779 // cg.cross_node_migration_ratio IS populated but is a single pre-folded
780 // scalar that cannot carry the cross-phase SUM-migrated / LATEST-total
781 // fold, so the per-phase carriers own both.
782 ext_metrics: cg.ext_metrics.clone(),
783 cgroups: vec![cg.clone()],
784 phases: Vec::new(),
785 }
786}
787
788/// Record the DEFAULT fairness outcomes (Starved / Unfair / Stuck) for one
789/// cgroup against the framework default thresholds
790/// ([`spread_threshold_pct`] / [`gap_threshold_ms`]). Telemetry is built
791/// separately by [`cgroup_stats`]; this only appends fail outcomes, so it
792/// is shared by [`assert_not_starved`] and the `not_starved` arm of
793/// [`AssertPlan::assert_cgroup`] without rebuilding stats.
794pub(crate) fn record_default_fairness(
795 r: &mut AssertResult,
796 cg: &CgroupStats,
797 reports: &[WorkerReport],
798) {
799 for w in reports {
800 if w.work_units == 0 {
801 r.record_fail(AssertDetail::new(
802 DetailKind::Starved,
803 format!("tid {} starved (0 work units)", w.tid),
804 ));
805 }
806 }
807 // Off-cpu spread above the default threshold, gated on >=2 workers
808 // with measurable wall time (the historical `pcts.len() >= 2`).
809 // `cg.spread` is None when off-CPU% was not measured — inconclusive,
810 // never flagged unfair.
811 let measurable = reports.iter().filter(|w| w.wall_time_ns > 0).count();
812 let spread_limit = spread_threshold_pct();
813 if let Some(spread) = cg.spread
814 && spread > spread_limit
815 && measurable >= 2
816 {
817 r.record_fail(AssertDetail::new(
818 DetailKind::Unfair,
819 format!(
820 "unfair cgroup: spread={:.0}% ({:.0}-{:.0}%) {} workers on {} cpus (threshold {:.0}%)",
821 spread,
822 cg.min_off_cpu_pct.unwrap_or(0.0),
823 cg.max_off_cpu_pct.unwrap_or(0.0),
824 cg.num_workers,
825 cg.num_cpus,
826 spread_limit,
827 ),
828 ));
829 }
830 let gap_limit = gap_threshold_ms();
831 for w in reports {
832 if w.max_gap_ms > gap_limit {
833 r.record_fail(AssertDetail::new(
834 DetailKind::Stuck,
835 format!(
836 "tid {} stuck {}ms on cpu{} at +{}ms (threshold {}ms)",
837 w.tid, w.max_gap_ms, w.max_gap_cpu, w.max_gap_at_ms, gap_limit,
838 ),
839 ));
840 }
841 }
842}
843
844/// Default fairness check for one cgroup's worker reports: builds the
845/// per-cgroup telemetry ([`cgroup_stats`]) and records Starved / Unfair /
846/// Stuck against the framework default thresholds. Telemetry is ALWAYS
847/// populated — including a `num_workers == 0` entry for empty reports — so
848/// `r.stats.cgroups` is never empty for a declared cgroup, independent of
849/// whether any fail outcome fired.
850pub fn assert_not_starved(reports: &[WorkerReport]) -> AssertResult {
851 let cg = cgroup_stats(reports);
852 let mut r = AssertResult::pass();
853 record_default_fairness(&mut r, &cg, reports);
854 r.stats = scenario_stats_for_cgroup(&cg);
855 r
856}
857
858/// Check throughput parity across workers: coefficient of variation and
859/// minimum work rate.
860///
861/// `max_cv`: maximum allowed coefficient of variation (stddev/mean) for
862/// work_units / cpu_time_ns across workers. `None` skips the CV check.
863///
864/// `min_rate`: minimum work_units per CPU-second. `None` skips the floor check.
865///
866/// When every worker recorded `cpu_time_ns == 0`, both gates record
867/// their OWN Inconclusive outcome (the CV gate emits a "CV cannot be
868/// computed" detail; the min_rate gate emits a "rates cannot be
869/// computed" detail). Each gate carries its own diagnostic so a
870/// caller that supplies only one of the two threshold parameters
871/// sees the matching Inconclusive message and an operator reading
872/// [`AssertResult::inconclusive_details`] can identify which gate(s)
873/// misfired without re-deriving the inputs.
874///
875/// ```
876/// # use ktstr::assert::assert_throughput_parity;
877/// # use ktstr::workload::WorkerReport;
878/// # let mk = |units, cpu_ns| WorkerReport {
879/// # tid: 1, cpus_used: [0].into_iter().collect(),
880/// # work_units: units, cpu_time_ns: cpu_ns, wall_time_ns: cpu_ns,
881/// # off_cpu_ns: cpu_ns, migration_count: 0, migrations: vec![],
882/// # max_gap_ms: 0, max_gap_cpu: 0, max_gap_at_ms: 0,
883/// # wake_latencies_ns: vec![], wake_sample_total: 0,
884/// # iteration_costs_ns: vec![], iteration_cost_sample_total: 0,
885/// # iterations: 0,
886/// # schedstat_run_delay_ns: 0, schedstat_run_count: 0,
887/// # schedstat_cpu_time_ns: 0,
888/// # completed: true,
889/// # numa_pages: std::collections::BTreeMap::new(),
890/// # vmstat_numa_pages_migrated: 0,
891/// # exit_info: None,
892/// # is_messenger: false,
893/// # ..Default::default()
894/// # };
895/// // Equal throughput -> low CV -> passes.
896/// let reports = [mk(1000, 1_000_000_000), mk(1000, 1_000_000_000)];
897/// assert!(assert_throughput_parity(&reports, Some(0.5), None).is_pass());
898/// ```
899pub fn assert_throughput_parity(
900 reports: &[WorkerReport],
901 max_cv: Option<f64>,
902 min_rate: Option<f64>,
903) -> AssertResult {
904 let mut r = AssertResult::pass();
905 if reports.is_empty() {
906 return r;
907 }
908
909 // Compute per-worker throughput: work_units / cpu_seconds
910 let rates: Vec<f64> = reports
911 .iter()
912 .map(|w| {
913 if w.cpu_time_ns == 0 {
914 0.0
915 } else {
916 w.work_units as f64 / (w.cpu_time_ns as f64 / 1e9)
917 }
918 })
919 .collect();
920
921 // CV is computed over MEASURED workers only (cpu_time_ns > 0). A zero-cpu
922 // worker's rate is unknowable — it is forced to 0.0 above so the min_rate
923 // gate below can index `reports[i]`, but that 0.0 is NOT a measured rate.
924 // Folding it into the mean/variance inflates the spread and FAILs a uniform
925 // workload that merely had one worker record no CPU time — the same exclusion
926 // the min_rate gate applies (it skips zero-cpu workers as "rate unknowable,
927 // not failing").
928 let measured: Vec<f64> = reports
929 .iter()
930 .zip(&rates)
931 .filter(|(w, _)| w.cpu_time_ns > 0)
932 .map(|(_, &rate)| rate)
933 .collect();
934 let mean = if measured.is_empty() {
935 0.0
936 } else {
937 measured.iter().sum::<f64>() / measured.len() as f64
938 };
939
940 // Detect the all-zero-cpu condition once so a call with both
941 // `max_cv` and `min_rate` set surfaces a single Inconclusive
942 // listing every threshold that couldn't evaluate, rather than
943 // emitting one record per gate (which produced duplicate
944 // "denominator is zero" diagnostics for the same root cause).
945 let all_zero_cpu = reports.iter().all(|w| w.cpu_time_ns == 0);
946
947 if all_zero_cpu && (max_cv.is_some() || min_rate.is_some()) {
948 let mut limits: Vec<String> = Vec::with_capacity(2);
949 if let Some(cv_limit) = max_cv {
950 limits.push(format!("max_cv {cv_limit:.3}"));
951 }
952 if let Some(floor) = min_rate {
953 limits.push(format!("min_rate {floor:.0}"));
954 }
955 r.record_inconclusive(AssertDetail::new(
956 DetailKind::Benchmark,
957 format!(
958 "throughput parity inconclusive: all {} workers recorded zero cpu_time_ns — \
959 denominator is zero, rates cannot be computed; {} neither pass nor fail \
960 (was the workload able to run?)",
961 reports.len(),
962 limits.join(" + "),
963 ),
964 ));
965 return r;
966 }
967
968 if let Some(cv_limit) = max_cv
969 && mean > 0.0
970 && measured.len() >= 2
971 {
972 let n_measured = measured.len() as f64;
973 let variance = measured.iter().map(|r| (r - mean).powi(2)).sum::<f64>() / n_measured;
974 let stddev = variance.sqrt();
975 let cv = stddev / mean;
976 if cv > cv_limit {
977 r.record_fail(AssertDetail::new(
978 DetailKind::Benchmark,
979 format!(
980 "throughput CV {cv:.3} exceeds limit {cv_limit:.3} (mean={mean:.0} work/cpu_s)"
981 ),
982 ));
983 }
984 }
985
986 if let Some(floor) = min_rate {
987 // Skip per-worker zero-cpu cases: their rate is forced to
988 // 0.0 above, and comparing that to `floor` would synthesize
989 // a guaranteed Fail with a misleading "below floor" message
990 // when the real story is "this worker recorded no CPU time
991 // — the rate is unknowable, not failing". The all-zero-cpu
992 // case is already handled at the top of the function as a
993 // single combined Inconclusive.
994 for (i, &rate) in rates.iter().enumerate() {
995 if reports[i].cpu_time_ns == 0 {
996 continue;
997 }
998 if rate < floor {
999 r.record_fail(AssertDetail::new(
1000 DetailKind::Benchmark,
1001 format!(
1002 "worker {} throughput {rate:.0} work/cpu_s below floor {floor:.0}",
1003 reports[i].tid
1004 ),
1005 ));
1006 }
1007 }
1008 }
1009
1010 r
1011}
1012
1013/// Check benchmarking metrics: p99 wake latency, wake latency CV,
1014/// and minimum iteration rate.
1015///
1016/// ```
1017/// # use ktstr::assert::assert_benchmarks;
1018/// # use ktstr::workload::WorkerReport;
1019/// # let report = WorkerReport {
1020/// # tid: 1, cpus_used: [0].into_iter().collect(),
1021/// # work_units: 1000, cpu_time_ns: 2_500_000_000,
1022/// # wall_time_ns: 5_000_000_000, off_cpu_ns: 2_500_000_000,
1023/// # migration_count: 0, migrations: vec![],
1024/// # max_gap_ms: 50, max_gap_cpu: 0, max_gap_at_ms: 1000,
1025/// # wake_latencies_ns: vec![100, 200, 300, 400, 500],
1026/// # wake_sample_total: 5,
1027/// # iteration_costs_ns: vec![], iteration_cost_sample_total: 0,
1028/// # iterations: 1000,
1029/// # schedstat_run_delay_ns: 0, schedstat_run_count: 0,
1030/// # schedstat_cpu_time_ns: 0,
1031/// # completed: true,
1032/// # numa_pages: std::collections::BTreeMap::new(),
1033/// # vmstat_numa_pages_migrated: 0,
1034/// # exit_info: None,
1035/// # is_messenger: false,
1036/// # ..Default::default()
1037/// # };
1038/// // p99 = 500ns, well under 10000ns limit.
1039/// assert!(assert_benchmarks(&[report], Some(10000), None, None).is_pass());
1040/// ```
1041pub fn assert_benchmarks(
1042 reports: &[WorkerReport],
1043 max_p99_ns: Option<u64>,
1044 max_cv: Option<f64>,
1045 min_iter_rate: Option<f64>,
1046) -> AssertResult {
1047 let mut r = AssertResult::pass();
1048 if reports.is_empty() {
1049 // No worker reports means nothing to measure — any benchmark
1050 // threshold the caller supplied cannot be evaluated. A silent
1051 // pass would let thresholds look "green" on a broken run that
1052 // never produced signal; surface it as skip so the operator
1053 // knows the benchmark was not actually exercised.
1054 return AssertResult::skip("no worker reports — benchmark skipped");
1055 }
1056
1057 // Collect all wake latencies across workers.
1058 let all_latencies: Vec<u64> = reports
1059 .iter()
1060 .flat_map(|w| w.wake_latencies_ns.iter().copied())
1061 .collect();
1062
1063 if let Some(p99_limit) = max_p99_ns
1064 && !all_latencies.is_empty()
1065 {
1066 let mut sorted = all_latencies.clone();
1067 sorted.sort_unstable();
1068 let p99 = percentile(&sorted, 0.99);
1069 if p99 > p99_limit {
1070 r.record_fail(AssertDetail::new(
1071 DetailKind::Benchmark,
1072 format!(
1073 "p99 wake latency {p99}ns exceeds limit {p99_limit}ns ({} samples)",
1074 sorted.len()
1075 ),
1076 ));
1077 }
1078 }
1079
1080 if let Some(cv_limit) = max_cv
1081 && all_latencies.len() >= 2
1082 {
1083 let n = all_latencies.len() as f64;
1084 let mean = all_latencies.iter().sum::<u64>() as f64 / n;
1085 if mean > 0.0 {
1086 let variance = all_latencies
1087 .iter()
1088 .map(|&v| (v as f64 - mean).powi(2))
1089 .sum::<f64>()
1090 / n;
1091 let cv = variance.sqrt() / mean;
1092 if cv > cv_limit {
1093 r.record_fail(AssertDetail::new(
1094 DetailKind::Benchmark,
1095 format!(
1096 "wake latency CV {cv:.3} exceeds limit {cv_limit:.3} (mean={mean:.0}ns)"
1097 ),
1098 ));
1099 }
1100 } else {
1101 // CV is dispersion / mean. With mean == 0 every captured
1102 // wake-latency sample was zero, so the denominator is
1103 // zero and CV is undefined — neither pass nor fail is
1104 // truthful. The same workload that fails to record
1105 // measurable wake latency at all (typically: nothing
1106 // actually woke, or every wake landed at <1ns and
1107 // truncated to zero in the ns counter) previously slid
1108 // past the gate as a silent pass; surface it as
1109 // Inconclusive so a broken benchmarking run does not
1110 // masquerade as a CV-compliant one.
1111 r.record_inconclusive(AssertDetail::new(
1112 DetailKind::Benchmark,
1113 format!(
1114 "wake latency CV inconclusive: all {} sample(s) had zero mean wake \
1115 latency — denominator is zero, CV cannot be computed; limit \
1116 {cv_limit:.3} neither pass nor fail (did any wake event capture a \
1117 non-zero latency?)",
1118 all_latencies.len(),
1119 ),
1120 ));
1121 }
1122 }
1123
1124 if let Some(rate_floor) = min_iter_rate {
1125 // Skip per-worker zero-wall cases (rate is unknowable when
1126 // wall_time_ns == 0) but count them: if every worker had
1127 // zero wall_time, the gate silently passed before — record
1128 // Inconclusive instead so a broken run that produced no
1129 // signal at all doesn't masquerade as a passing benchmark.
1130 let mut zero_wall_count = 0usize;
1131 for w in reports {
1132 if w.wall_time_ns == 0 {
1133 zero_wall_count += 1;
1134 continue;
1135 }
1136 let rate = w.iterations as f64 / (w.wall_time_ns as f64 / 1e9);
1137 if rate < rate_floor {
1138 r.record_fail(AssertDetail::new(
1139 DetailKind::Benchmark,
1140 format!(
1141 "worker {} iteration rate {rate:.1}/s below floor {rate_floor:.1}/s",
1142 w.tid
1143 ),
1144 ));
1145 }
1146 }
1147 if zero_wall_count == reports.len() {
1148 r.record_inconclusive(AssertDetail::new(
1149 DetailKind::Benchmark,
1150 format!(
1151 "min iteration rate inconclusive: all {} workers recorded zero wall_time_ns — \
1152 denominator is zero, rate cannot be computed; floor {rate_floor:.1}/s \
1153 neither pass nor fail (was the workload able to run?)",
1154 reports.len()
1155 ),
1156 ));
1157 }
1158 }
1159
1160 r
1161}
1162
1163/// Assert that every SCX event counter in `events` is at or below
1164/// `max_count`. `events` is a slice of `(name, count)` pairs sourced
1165/// from the kernel's per-task `scx_event_stats` (see `kernel/sched/ext.c`,
1166/// `SCX_EV_*` macros) — typically aggregated and surfaced via
1167/// `monitor::ScxEventDeltas` or sidecar `GauntletRow.fallback_count` /
1168/// `keep_last_count` fields. Pass `None` for `max_count` to require zero
1169/// (the strict default — error-class events should not fire under a
1170/// healthy scheduler).
1171///
1172/// The assertion is decoupled from the `monitor` module on purpose:
1173/// callers harvest the counters they care about (via the live monitor
1174/// path or by reading sidecar JSON post-hoc) and feed name/count
1175/// pairs in. This keeps the assert API surface decoupled from the
1176/// kernel-side counter inventory, which evolves across kernel
1177/// versions — adding a new `SCX_EV_*` does not force an API change
1178/// here.
1179///
1180/// Returns a passing result if every counter is within bound; failures
1181/// concatenate one [`AssertDetail`] per offending counter under
1182/// [`DetailKind::SchedulerEvent`] so an operator can identify which
1183/// events fired without scanning the full counter set.
1184///
1185/// ```
1186/// # use ktstr::assert::assert_scx_events_clean;
1187/// // Strict default — every counter must be zero.
1188/// let r = assert_scx_events_clean(&[("enq_skip_exiting", 0), ("dispatch_local_dsq_offline", 0)], None);
1189/// assert!(r.is_pass());
1190///
1191/// // A non-zero error-class counter fails.
1192/// let r = assert_scx_events_clean(&[("enq_skip_exiting", 7)], None);
1193/// assert!(r.is_fail());
1194///
1195/// // Caller-supplied bound tolerates small counts.
1196/// let r = assert_scx_events_clean(&[("dispatch_keep_last", 3)], Some(10));
1197/// assert!(r.is_pass());
1198/// ```
1199pub fn assert_scx_events_clean(events: &[(&str, i64)], max_count: Option<i64>) -> AssertResult {
1200 let mut r = AssertResult::pass();
1201 for (name, count) in events {
1202 // Kernel `scx_event_stats` counters are monotonic u64 — a
1203 // negative i64 here means the source data is corrupted
1204 // (counter reset, wraparound on a signed conversion, or
1205 // sidecar JSON bit-loss). Treat negatives as failures rather
1206 // than letting them silently pass `*count > bound` for any
1207 // non-negative bound.
1208 let failed = match max_count {
1209 // Strict default: every counter must be exactly zero.
1210 // `*count > 0` would let -5 slip through.
1211 None => *count != 0,
1212 // Bounded: reject negatives explicitly, then enforce
1213 // the upper bound.
1214 Some(bound) => *count < 0 || *count > bound,
1215 };
1216 if failed {
1217 let bound_desc = match max_count {
1218 None => "0".to_string(),
1219 Some(b) => b.to_string(),
1220 };
1221 r.record_fail(AssertDetail::new(
1222 DetailKind::SchedulerEvent,
1223 format!("scx event `{name}` count {count} exceeds bound {bound_desc}",),
1224 ));
1225 }
1226 }
1227 r
1228}
1229
/// Flat bundle of absolute limits consumed by [`assert_thresholds`].
///
/// It captures what a scheduler-under-test should deliver on a healthy
/// run: bounded wake latency, bounded per-iteration compute cost, a
/// bounded number of CPU migrations, and forward progress on every
/// worker.
///
/// Every field is an independent `Option`; `None` disables that check.
/// With all fields `None` the bundle is a no-op (the resulting
/// [`AssertResult`] always passes), which makes
/// `AbsoluteThresholds::default()` a convenient starting point:
///
/// - chain the setters:
///   `AbsoluteThresholds::default().max_migrations(5)`;
/// - or use struct-update syntax:
///   `AbsoluteThresholds { max_migrations: Some(5), ..Default::default() }`.
///
/// [`Self::strict`] provides the "every check enabled with sane defaults"
/// preset.
///
/// This type is distinct from [`Assert`]. `Assert` is the merge-tree
/// threshold config consumed by the worker-side `AssertPlan`, whereas
/// `AbsoluteThresholds` is a flat preset meant to be invoked directly
/// from a test body for a one-call multi-field check, without engaging
/// the merge chain. The two compose: a test can run `assert_thresholds`
/// over a worker-report slice AND merge an `Assert`-derived result into
/// the same accumulator via [`AssertResult::merge`].
#[must_use = "AbsoluteThresholds only takes effect when passed to assert_thresholds"]
#[derive(Clone, Copy, Debug, Default)]
pub struct AbsoluteThresholds {
    /// Ceiling, in nanoseconds, for the p99 wake latency pooled across
    /// every worker's [`WorkerReport::wake_latencies_ns`]. `None` skips
    /// the check. Units and semantics match
    /// [`Assert::max_p99_wake_latency_ns`].
    pub max_p99_wake_latency_ns: Option<u64>,
    /// Ceiling, in nanoseconds, for the p99 per-iteration compute cost
    /// pooled across every worker's [`WorkerReport::iteration_costs_ns`].
    /// `None` skips the check. Only compute work types that populate the
    /// reservoir (`AluHot`, `SmtSiblingSpin`, `IpcVariance`) are
    /// affected; blocking variants report an empty `iteration_costs_ns`,
    /// for which the check is a no-op.
    pub max_iteration_cost_p99_ns: Option<u64>,
    /// Ceiling for the total number of CPU migrations, i.e. the sum of
    /// [`WorkerReport::migration_count`] over all workers. `None` skips
    /// the check. Unlike [`Assert::max_migration_ratio`] (migrations per
    /// iteration) this is an absolute count: useful when a test pins a
    /// known workload size and migrations must stay under a fixed
    /// ceiling regardless of how many iterations completed.
    pub max_migrations: Option<u64>,
    /// Floor for each worker's `work_units`. Every worker must have
    /// completed at least this many; a single worker below the floor
    /// fails the check. `None` skips. Unlike [`assert_not_starved`],
    /// which only rejects a literal zero, this accepts a non-zero floor
    /// so a test can reject "barely made progress" runs that would pass
    /// the starvation gate.
    pub min_work_units: Option<u64>,
}

impl AbsoluteThresholds {
    /// Preset with every check enabled at a sane default:
    ///
    /// - p99 wake latency at most 10 ms;
    /// - p99 iteration cost at most 1 ms;
    /// - at most 1000 migrations in total;
    /// - every worker completes at least 1 work unit.
    ///
    /// The limits are inclusive: a value exactly equal to its limit
    /// passes. They are deliberately loose — tight enough to catch
    /// egregious regressions without flagging every routine scheduler
    /// perturbation. Tests that need tighter bounds should override
    /// individual fields with the setter methods instead of tuning these
    /// constants.
    pub const fn strict() -> Self {
        const WAKE_P99_CEILING_NS: u64 = 10_000_000;
        const ITERATION_COST_P99_CEILING_NS: u64 = 1_000_000;
        const MIGRATION_CEILING: u64 = 1000;
        const WORK_UNIT_FLOOR: u64 = 1;

        Self {
            max_p99_wake_latency_ns: Some(WAKE_P99_CEILING_NS),
            max_iteration_cost_p99_ns: Some(ITERATION_COST_P99_CEILING_NS),
            max_migrations: Some(MIGRATION_CEILING),
            min_work_units: Some(WORK_UNIT_FLOOR),
        }
    }

    /// Builder setter for [`Self::max_p99_wake_latency_ns`]; all other
    /// fields are carried over unchanged.
    pub const fn max_p99_wake_latency_ns(self, v: u64) -> Self {
        Self {
            max_p99_wake_latency_ns: Some(v),
            ..self
        }
    }

    /// Builder setter for [`Self::max_iteration_cost_p99_ns`]; all other
    /// fields are carried over unchanged.
    pub const fn max_iteration_cost_p99_ns(self, v: u64) -> Self {
        Self {
            max_iteration_cost_p99_ns: Some(v),
            ..self
        }
    }

    /// Builder setter for [`Self::max_migrations`]; all other fields are
    /// carried over unchanged.
    pub const fn max_migrations(self, v: u64) -> Self {
        Self {
            max_migrations: Some(v),
            ..self
        }
    }

    /// Builder setter for [`Self::min_work_units`]; all other fields are
    /// carried over unchanged.
    pub const fn min_work_units(self, v: u64) -> Self {
        Self {
            min_work_units: Some(v),
            ..self
        }
    }
}
1329
1330/// Run every check in `thresholds` against `reports`, merging results
1331/// into a single [`AssertResult`]. A `None` field on the thresholds
1332/// skips that check.
1333///
1334/// An empty `reports` slice short-circuits to a skip (`"no worker
1335/// reports to evaluate"`) regardless of thresholds content — silently
1336/// passing thresholds against zero samples would let them look
1337/// "green" on a run that produced no measurement.
1338///
1339/// Field-to-check mapping:
1340/// - `max_p99_wake_latency_ns` -> pooled p99 across every worker's
1341/// `wake_latencies_ns`; tagged [`DetailKind::Benchmark`].
1342/// - `max_iteration_cost_p99_ns` -> pooled p99 across every worker's
1343/// `iteration_costs_ns`; tagged [`DetailKind::Benchmark`].
1344/// - `max_migrations` -> sum of `migration_count` across workers;
1345/// tagged [`DetailKind::Migration`].
1346/// - `min_work_units` -> per-worker `work_units >= floor`; tagged
1347/// [`DetailKind::Starved`] when a worker is below the floor.
1348///
1349/// The wake-latency check delegates to [`assert_benchmarks`] for the
1350/// percentile path so the same nearest-rank algorithm applies; the
1351/// iteration-cost check uses an inline percentile call against the
1352/// pooled `iteration_costs_ns` reservoir.
1353///
1354/// ```
1355/// # use ktstr::assert::{AbsoluteThresholds, assert_thresholds};
1356/// # use ktstr::workload::WorkerReport;
1357/// # let report = WorkerReport {
1358/// # tid: 1, cpus_used: [0].into_iter().collect(),
1359/// # work_units: 1000, cpu_time_ns: 2_500_000_000,
1360/// # wall_time_ns: 5_000_000_000, off_cpu_ns: 2_500_000_000,
1361/// # migration_count: 5, migrations: vec![],
1362/// # max_gap_ms: 50, max_gap_cpu: 0, max_gap_at_ms: 1000,
1363/// # wake_latencies_ns: vec![100, 200, 300, 400, 500],
1364/// # wake_sample_total: 5,
1365/// # iteration_costs_ns: vec![1000, 2000, 3000, 4000, 5000],
1366/// # iteration_cost_sample_total: 5,
1367/// # iterations: 1000,
1368/// # schedstat_run_delay_ns: 0, schedstat_run_count: 0,
1369/// # schedstat_cpu_time_ns: 0,
1370/// # completed: true,
1371/// # numa_pages: std::collections::BTreeMap::new(),
1372/// # vmstat_numa_pages_migrated: 0,
1373/// # exit_info: None,
1374/// # affinity_error: None,
1375/// # is_messenger: false,
1376/// # group_idx: 0,
1377/// # phase_slices: vec![],
1378/// # ..Default::default()
1379/// # };
1380/// // Strict preset on a healthy run — passes.
1381/// let r = assert_thresholds(&[report], &AbsoluteThresholds::strict());
1382/// assert!(r.is_pass());
1383/// ```
1384pub fn assert_thresholds(
1385 reports: &[WorkerReport],
1386 thresholds: &AbsoluteThresholds,
1387) -> AssertResult {
1388 // Empty `reports` means nothing was measured. Returning a fresh
1389 // `pass()` here would silently green-light a broken run that
1390 // produced no signal. `assert_benchmarks` also skips on empty
1391 // reports, and merging that skip into a `pass()` would preserve it
1392 // (`AssertResult::merge` extends `outcomes`, and an all-Skip
1393 // non-empty `outcomes` folds to Skip) — but its message is generic.
1394 // Surface the skip directly so the operator sees the
1395 // thresholds-specific reason: no worker reports to evaluate.
1396 if reports.is_empty() {
1397 return AssertResult::skip("no worker reports to evaluate");
1398 }
1399
1400 let mut r = AssertResult::pass();
1401
1402 // Wake-latency p99: reuse the existing `assert_benchmarks` path
1403 // so the percentile algorithm stays unified. With `reports`
1404 // non-empty here, `assert_benchmarks` cannot return a skip —
1405 // the merge sees only pass/fail, preserving thresholds semantics.
1406 if thresholds.max_p99_wake_latency_ns.is_some() {
1407 r.merge(assert_benchmarks(
1408 reports,
1409 thresholds.max_p99_wake_latency_ns,
1410 None,
1411 None,
1412 ));
1413 }
1414
1415 // Iteration-cost p99: pooled across every worker's reservoir.
1416 // Skipped when no samples are present — compute work types that
1417 // populate `iteration_costs_ns` are sparse, so an empty pooled
1418 // set is the common case for blocking variants and not a failure.
1419 if let Some(cost_limit) = thresholds.max_iteration_cost_p99_ns {
1420 let all_costs: Vec<u64> = reports
1421 .iter()
1422 .flat_map(|w| w.iteration_costs_ns.iter().copied())
1423 .collect();
1424 if !all_costs.is_empty() {
1425 let mut sorted = all_costs.clone();
1426 sorted.sort_unstable();
1427 let p99 = percentile(&sorted, 0.99);
1428 if p99 > cost_limit {
1429 r.record_fail(AssertDetail::new(
1430 DetailKind::Benchmark,
1431 format!(
1432 "p99 iteration cost {p99}ns exceeds limit {cost_limit}ns ({} samples)",
1433 sorted.len(),
1434 ),
1435 ));
1436 }
1437 }
1438 }
1439
1440 // Total migrations across all workers: absolute-count gate
1441 // (distinct from migration_ratio which is a per-iteration rate).
1442 if let Some(max_mig) = thresholds.max_migrations {
1443 // saturating: a corrupt/hostile per-worker migration_count must clamp,
1444 // not wrap, this absolute-count gate — a wrapped small total would
1445 // silently PASS a max_migrations limit it should fail.
1446 let total_mig: u64 = reports
1447 .iter()
1448 .map(|w| w.migration_count)
1449 .fold(0u64, u64::saturating_add);
1450 if total_mig > max_mig {
1451 r.record_fail(AssertDetail::new(
1452 DetailKind::Migration,
1453 format!(
1454 "total migrations {total_mig} exceeds limit {max_mig} ({} workers)",
1455 reports.len(),
1456 ),
1457 ));
1458 }
1459 }
1460
1461 // Per-worker work_units floor: every worker must have completed
1462 // at least `min` work units. One starved worker fails the check.
1463 if let Some(min_units) = thresholds.min_work_units {
1464 for w in reports {
1465 if w.work_units < min_units {
1466 r.record_fail(AssertDetail::new(
1467 DetailKind::Starved,
1468 format!(
1469 "tid {} work_units {} below floor {min_units}",
1470 w.tid, w.work_units,
1471 ),
1472 ));
1473 }
1474 }
1475 }
1476
1477 r
1478}
1479
1480// (The legacy `Expect` / `Checks` / `CheckBuilder` types previously
1481// living here were replaced by the [`Verdict`]-based claim API
1482// (defined further up in this file). The new flow is
1483// `Assert::default_checks().verdict().claim_<field>(stats).at_most(N)` for
1484// stats-struct-derived accessors, or `claim!(verdict, expr)` for
1485// expression-labeled claims. Both produce
1486// [`ClaimBuilder`]/[`SetClaim`]/[`SeqClaim`] under the hood and
1487// record outcomes onto the same [`AssertResult`] envelope that
1488// `assert_not_starved` / `assert_isolation` produce, so the two
1489// paths compose via [`Verdict::merge`].)