ktstr/assert/
reductions.rs

1use super::*;
2
3/// Check that workers only ran on CPUs in `expected`.
4///
5/// Any worker that used a CPU outside the expected set produces a
6/// failure with the unexpected CPU IDs listed.
7///
8/// ```
9/// # use ktstr::assert::assert_isolation;
10/// # use ktstr::workload::WorkerReport;
11/// # use std::collections::BTreeSet;
12/// # let report = WorkerReport {
13/// #     tid: 1, cpus_used: [0, 1].into_iter().collect(),
14/// #     work_units: 100, cpu_time_ns: 1_000_000, wall_time_ns: 2_000_000,
15/// #     off_cpu_ns: 1_000_000, migration_count: 0, migrations: vec![],
16/// #     max_gap_ms: 0, max_gap_cpu: 0, max_gap_at_ms: 0,
17/// #     wake_latencies_ns: vec![], wake_sample_total: 0,
18/// #     iteration_costs_ns: vec![], iteration_cost_sample_total: 0,
19/// #     iterations: 0,
20/// #     schedstat_run_delay_ns: 0, schedstat_run_count: 0,
21/// #     schedstat_cpu_time_ns: 0,
22/// #     completed: true,
23/// #     numa_pages: std::collections::BTreeMap::new(),
24/// #     vmstat_numa_pages_migrated: 0,
25/// #     exit_info: None,
26/// #     is_messenger: false,
27/// #     ..Default::default()
28/// # };
29/// let expected: BTreeSet<usize> = [0, 1, 2].into_iter().collect();
30/// assert!(assert_isolation(&[report], &expected).is_pass());
31/// ```
32pub fn assert_isolation(reports: &[WorkerReport], expected: &BTreeSet<usize>) -> AssertResult {
33    let mut r = AssertResult::pass();
34    for w in reports {
35        let bad: BTreeSet<usize> = w.cpus_used.difference(expected).copied().collect();
36        if !bad.is_empty() {
37            r.record_fail(AssertDetail::new(
38                DetailKind::Isolation,
39                format!("tid {} ran on unexpected CPUs {:?}", w.tid, bad),
40            ));
41        }
42    }
43    r
44}
45
/// Nearest-rank percentile of an ascending-sorted slice, with `p` in `[0.0, 1.0]`.
///
/// Selects `sorted[ceil(n * p) - 1]`, with the index clamped into
/// `[0, n - 1]`. For `n = 100` and `p = 0.99` that is `sorted[98]` (the
/// 99th element counting from 1), not `sorted[99]` (the maximum). Dropping
/// the `- 1` would be off by one and return the maximum for `n = 100`.
/// `p = 0.0` selects the first element; any `p >= 1.0` selects the last.
/// A negative or NaN `p` also selects the first element, because a
/// float-to-`usize` `as` cast saturates negatives and NaN to 0.
///
/// # Preconditions
///
/// `sorted` must be non-decreasing. Order is not checked at runtime in
/// release builds; an unsorted slice silently yields whatever value sits at
/// the computed index, which is meaningless. A `debug_assert!` enforces the
/// ordering in debug builds. The production callers (`cgroup_stats` and
/// `assert_benchmarks`) call `sort_unstable()` immediately before this, so
/// no release-mode guard is needed on those paths.
///
/// An empty slice yields `0`. Callers should short-circuit before calling.
pub(crate) fn percentile(sorted: &[u64], p: f64) -> u64 {
    let Some(last_idx) = sorted.len().checked_sub(1) else {
        return 0;
    };
    debug_assert!(
        sorted.windows(2).all(|pair| pair[0] <= pair[1]),
        "percentile() requires sorted input; got slice with out-of-order pair",
    );
    // 1-based nearest rank, converted to a 0-based index and clamped.
    let rank = (sorted.len() as f64 * p).ceil() as usize;
    sorted[rank.saturating_sub(1).min(last_idx)]
}
81
82/// Build per-cgroup telemetry (pure measurement, no assertions) from
83/// worker reports. This is the SINGLE telemetry builder on the assertion
84/// path: `AssertPlan::assert_cgroup` calls it unconditionally and
85/// [`assert_not_starved`] wraps it with the default fairness checks, so
86/// per-cgroup [`CgroupStats`] is never gated behind whether a worker-check
87/// assertion was configured. Empty `reports` yield a `num_workers == 0`
88/// `CgroupStats` (the reduces below collapse to 0.0/0), so a declared
89/// cgroup that collected no reports surfaces as a zero-worker entry rather
90/// than silently vanishing from [`ScenarioStats::cgroups`].
91pub fn cgroup_stats(reports: &[WorkerReport]) -> CgroupStats {
92    let cpus: BTreeSet<usize> = reports
93        .iter()
94        .flat_map(|w| w.cpus_used.iter().copied())
95        .collect();
96    let pcts: Vec<f64> = reports
97        .iter()
98        .filter(|w| w.wall_time_ns > 0)
99        .map(|w| w.off_cpu_ns as f64 / w.wall_time_ns as f64 * 100.0)
100        .collect();
101
102    // None when no worker had measurable wall time (pcts empty):
103    // off-CPU% is undefined, and a not-measured cgroup must not read
104    // as a measured 0% / spread-0 (perfectly fair) one. Some(_)
105    // otherwise, including a real measured zero.
106    let min = pcts.iter().cloned().reduce(f64::min);
107    let max = pcts.iter().cloned().reduce(f64::max);
108    let avg = if pcts.is_empty() {
109        None
110    } else {
111        Some(pcts.iter().sum::<f64>() / pcts.len() as f64)
112    };
113    let spread = match (min, max) {
114        (Some(lo), Some(hi)) => Some(hi - lo),
115        _ => None,
116    };
117
118    let worst_gap = reports.iter().max_by_key(|w| w.max_gap_ms);
119    let (gap_ms, gap_cpu) = worst_gap
120        .map(|w| (w.max_gap_ms, w.max_gap_cpu))
121        .unwrap_or((0, 0));
122
123    // Compute benchmarking stats from worker reports.
124    let all_latencies: Vec<u64> = reports
125        .iter()
126        .flat_map(|w| w.wake_latencies_ns.iter().copied())
127        .collect();
128    let (p99_us, median_us, lat_cv) = if all_latencies.is_empty() {
129        (0.0, 0.0, 0.0)
130    } else {
131        let mut sorted = all_latencies.clone();
132        sorted.sort_unstable();
133        let p99 = percentile(&sorted, 0.99) as f64 / 1000.0;
134        // Median routes through `percentile(sorted, 0.5)` so the
135        // nearest-rank algorithm matches every other percentile in
136        // the project (p99, schbench's `lat99`, the BPF latency
137        // histograms). A bare `sorted[n/2]` would pick the upper of
138        // the two middle samples for even `n`, while `percentile`
139        // returns the value at `ceil(n * 0.5) - 1` — the lower of
140        // the two middles — and that lower-bound convention is what
141        // the docs on [`CgroupStats::median_wake_latency_us`] and
142        // the schbench cross-reference promise.
143        let median = percentile(&sorted, 0.5) as f64 / 1000.0;
144        let n = all_latencies.len() as f64;
145        let mean_ns = all_latencies.iter().sum::<u64>() as f64 / n;
146        let cv = if mean_ns > 0.0 {
147            let variance = all_latencies
148                .iter()
149                .map(|&v| (v as f64 - mean_ns).powi(2))
150                .sum::<f64>()
151                / n;
152            variance.sqrt() / mean_ns
153        } else {
154            0.0
155        };
156        (p99, median, cv)
157    };
158
159    // Timer-latency reductions (WorkType::TimerLatency), pooled across the
160    // cgroup's workers exactly like the wake-latency block above but over the
161    // distinct `timer_latencies_ns` reservoir. p999 is the deep RT tail; worst
162    // is the single maximum. All `0.0` when no worker recorded timer samples.
163    let all_timer: Vec<u64> = reports
164        .iter()
165        .flat_map(|w| w.timer_latencies_ns.iter().copied())
166        .collect();
167    let (median_timer, p99_timer, p999_timer, worst_timer) = if all_timer.is_empty() {
168        (0.0, 0.0, 0.0, 0.0)
169    } else {
170        let mut sorted = all_timer.clone();
171        sorted.sort_unstable();
172        (
173            percentile(&sorted, 0.5) as f64 / 1000.0,
174            percentile(&sorted, 0.99) as f64 / 1000.0,
175            percentile(&sorted, 0.999) as f64 / 1000.0,
176            *sorted.last().expect("non-empty: checked above") as f64 / 1000.0,
177        )
178    };
179
180    // saturating folds throughout this builder: pool the per-worker guest-runtime
181    // counters (iterations, migrations, cpu-time-ns, sample totals, numa pages)
182    // overflow-safely. This worker->cgroup fold is the FIRST cross-source sum and
183    // the one most exposed to a corrupt/hostile WorkerReport; a u64::MAX component
184    // clamps to u64::MAX instead of debug-panicking / release-wrapping a per-cgroup
185    // total (and the derived migration-ratio / page-locality) to a silently-wrong
186    // value. saturating_add is exact for every in-range value.
187    let total_iters: u64 = reports
188        .iter()
189        .map(|w| w.iterations)
190        .fold(0u64, u64::saturating_add);
191    let run_delays: Vec<f64> = reports
192        .iter()
193        .map(|w| w.schedstat_run_delay_ns as f64 / 1000.0)
194        .collect();
195    let mean_run_delay = if run_delays.is_empty() {
196        0.0
197    } else {
198        run_delays.iter().sum::<f64>() / run_delays.len() as f64
199    };
200    let worst_run_delay = run_delays.iter().cloned().reduce(f64::max).unwrap_or(0.0);
201
202    let total_mig: u64 = reports
203        .iter()
204        .map(|w| w.migration_count)
205        .fold(0u64, u64::saturating_add);
206    let mig_ratio = migration_ratio_of(total_mig, total_iters);
207
208    // Cross-node page-migration ratio: pages migrated cross-node over the
209    // cgroup's total allocated pages. `vmstat_numa_pages_migrated` is a
210    // system-wide delta each worker captured over its own loop; concurrent
211    // workers observe overlapping deltas, so take the MAX across the cgroup
212    // (summing would inflate by the worker count) over the cgroup-wide
213    // total of allocated pages. Pure measurement — populated whenever NUMA
214    // pages were seen, 0.0 otherwise. (The `max_cross_node_migration_ratio`
215    // CHECK in `AssertPlan::assert_cgroup` recomputes the same raw counts
216    // for its diagnostic; this is the always-on telemetry.)
217    let total_numa_pages: u64 = reports
218        .iter()
219        .map(|w| {
220            w.numa_pages
221                .values()
222                .copied()
223                .fold(0u64, u64::saturating_add)
224        })
225        .fold(0u64, u64::saturating_add);
226    let migrated_pages: u64 = reports
227        .iter()
228        .map(|w| w.vmstat_numa_pages_migrated)
229        .max()
230        .unwrap_or(0);
231    let cross_node_ratio = cross_node_migration_ratio_of(migrated_pages, total_numa_pages);
232
233    // Whole-run taobench engine COUNTER aggregate, pooled across this cgroup's
234    // `WorkType::Taobench` workers: Σ ops, MAX wall window (shared across the
235    // concurrent workers), per `TaobenchStats::merge`. `None` for a cgroup with no
236    // Taobench worker. The run-level cross-cgroup pool + the qps/hit Rate
237    // derivation happen in `populate_run_pooled_taobench`; this is the per-cgroup
238    // raw carrier, the taobench analogue of `total_iterations`. Counters only —
239    // serve latency is per-phase data (the `taobench_serve_*_us_whole` keys union
240    // the `PhaseCgroupStats::taobench` histograms).
241    let taobench_whole = reports
242        .iter()
243        .filter_map(|w| w.taobench_whole.as_ref())
244        .fold(
245            None,
246            |acc: Option<crate::workload::taobench::run::TaobenchStats>, t| {
247                Some(match acc {
248                    Some(mut a) => {
249                        a.merge(t);
250                        a
251                    }
252                    None => *t,
253                })
254            },
255        );
256
257    CgroupStats {
258        // Empty here; collect_handles labels the entry post-hoc (it has
259        // the cgroup name in scope, this reports-only builder does not).
260        cgroup_name: String::new(),
261        num_workers: reports.len(),
262        num_cpus: cpus.len(),
263        cpus_used: cpus,
264        avg_off_cpu_pct: avg,
265        min_off_cpu_pct: min,
266        max_off_cpu_pct: max,
267        spread,
268        max_gap_ms: gap_ms,
269        max_gap_cpu: gap_cpu,
270        total_migrations: total_mig,
271        migration_ratio: mig_ratio,
272        p99_wake_latency_us: p99_us,
273        median_wake_latency_us: median_us,
274        wake_latency_cv: lat_cv,
275        // `0.0` above is a not-measured sentinel iff no worker recorded a
276        // wake/timer sample; the flag distinguishes that from a measured zero
277        // so the run-level re-pool excludes (not folds in) a no-sample cgroup.
278        wake_measured: !all_latencies.is_empty(),
279        median_timer_latency_us: median_timer,
280        p99_timer_latency_us: p99_timer,
281        p999_timer_latency_us: p999_timer,
282        worst_timer_latency_us: worst_timer,
283        timer_measured: !all_timer.is_empty(),
284        total_iterations: total_iters,
285        total_cpu_time_ns: reports
286            .iter()
287            .map(|w| w.schedstat_cpu_time_ns)
288            .fold(0u64, u64::saturating_add),
289        mean_run_delay_us: mean_run_delay,
290        worst_run_delay_us: worst_run_delay,
291        // Run-delay is measured whenever a worker exists (a worker with `0.0`
292        // delay is a real measured zero); only a worker-less cohort is
293        // not-measured.
294        run_delay_measured: !run_delays.is_empty(),
295        // page_locality requires the expected NUMA node set (the cpuset's
296        // nodes), which this reports-only builder does not have. It is
297        // populated by `AssertPlan::assert_cgroup` when `numa_nodes` is
298        // supplied; left 0.0 here (no NUMA context).
299        page_locality: 0.0,
300        cross_node_migration_ratio: cross_node_ratio,
301        taobench_whole,
302        ext_metrics: BTreeMap::new(),
303    }
304}
305
/// Migrations per iteration for a cgroup or phase.
///
/// Yields `0.0` when `total_iterations == 0`, reported as a measured zero
/// rather than a rate over zero. The per-phase carrier
/// (`write_carrier_scalars`) and [`cgroup_stats`] (whose `migration_ratio`
/// feeds the run-level `worst_migration_ratio` fold) both call this, so
/// they compute the ratio identically.
pub(crate) fn migration_ratio_of(total_migrations: u64, total_iterations: u64) -> f64 {
    match total_iterations {
        0 => 0.0,
        iters => total_migrations as f64 / iters as f64,
    }
}
317
/// Cross-node migrated pages divided by the cgroup-wide total of allocated
/// pages. Yields `0.0` when no NUMA pages were seen.
///
/// [`cgroup_stats`] also calls this, so both paths compute the ratio the
/// same way.
///
/// This is a CHURN ratio, NOT a bounded `[0,1]` fraction. The numerator
/// counts cumulative migration EVENTS (`/proc/vmstat numa_pages_migrated`
/// counts each migration, so one page may be counted several times). The
/// denominator is a residency SNAPSHOT. The result can therefore
/// legitimately exceed 1.0 under heavy re-migration.
pub(crate) fn cross_node_migration_ratio_of(migrated_pages: u64, total_numa_pages: u64) -> f64 {
    if total_numa_pages == 0 {
        return 0.0;
    }
    migrated_pages as f64 / total_numa_pages as f64
}
331
/// Share of allocated pages that live on the expected NUMA nodes.
///
/// Yields `0.0` when no NUMA pages were seen (`numa_pages_total == 0`).
/// The per-phase carrier also calls this, so both compute it identically.
pub(crate) fn page_locality_of(numa_pages_local: u64, numa_pages_total: u64) -> f64 {
    (numa_pages_total != 0)
        .then(|| numa_pages_local as f64 / numa_pages_total as f64)
        .unwrap_or(0.0)
}
341
/// Mean iterations per worker.
///
/// Returns `None` when there are no workers. That case is undefined and is
/// kept distinct from a measured zero.
/// [`CgroupStats::iterations_per_worker`](crate::assert::CgroupStats::iterations_per_worker)
/// also calls this, so the run-level WorstLowest fold stays bit-identical.
pub(crate) fn iterations_per_worker_of(num_workers: usize, total_iterations: u64) -> Option<f64> {
    (num_workers != 0).then(|| total_iterations as f64 / num_workers as f64)
}
353
/// Worker iterations per CPU-second:
/// `total_iterations / (total_cpu_time_ns / 1e9)`.
///
/// Returns `None` when there are no workers or when no on-CPU time was
/// captured.
/// [`CgroupStats::iterations_per_cpu_sec`](crate::assert::CgroupStats::iterations_per_cpu_sec)
/// also calls this, so both compute it identically.
pub(crate) fn iterations_per_cpu_sec_of(
    num_workers: usize,
    total_cpu_time_ns: u64,
    total_iterations: u64,
) -> Option<f64> {
    let measured = num_workers > 0 && total_cpu_time_ns > 0;
    measured.then(|| {
        let cpu_secs = total_cpu_time_ns as f64 / 1e9;
        total_iterations as f64 / cpu_secs
    })
}
368
369/// Per-phase per-cgroup RAW-component builder — the sibling of [`cgroup_stats`]
370/// that emits [`PhaseCgroupStats`]'s un-reduced components instead of the
371/// reduced ratios/percentiles, so the distributional re-pool recomputes each
372/// aggregate from the pooled components at every level. Every [`CgroupStats`]
373/// reduction re-pools from these fields: avg/min/max/spread off-CPU% from
374/// `off_cpu_pcts`; p99/median/CV from `wake_latencies_ns`; mean/worst run-delay
375/// from `run_delays_ns` (RAW ns, the re-pool divides by 1000); migration_ratio
376/// / iterations_per_cpu_sec / iterations_per_worker from the counters;
377/// page_locality / cross_node_migration_ratio from the numa counters; the
378/// coupled worst gap from the argmax pair; cpus_used / num_cpus from `cpus_used`.
379///
380/// `expected_nodes` is this cgroup's cpuset NUMA-node set (from
381/// [`crate::topology::TestTopology::numa_nodes_for_cpuset`]); `numa_pages_local`
382/// is the page count on those nodes (0 when `None`, mirroring [`cgroup_stats`]
383/// leaving `page_locality` 0.0 without NUMA context — the partition lives with
384/// the caller that has the node set, as [`AssertPlan::assert_cgroup`] does).
385/// The whole-run [`cgroup_stats`] reductions stay the run-level authority; this
386/// feeds the per-phase [`PhaseBucket::per_cgroup`] carrier.
387///
388/// RE-POOL GUARD CONTRACT: the div-by-zero / not-measured guards live in
389/// [`cgroup_stats`], NOT in these raw components — a future re-pool over them
390/// MUST mirror them exactly or ship a NaN/Inf or a not-measured-vs-zero
391/// collapse: `migration_ratio` only when `total_iterations > 0`;
392/// `cross_node_migration_ratio` / `page_locality` only when `numa_pages_total >
393/// 0`; mean/worst run-delay only when `run_delays_ns` is non-empty; and
394/// avg/min/max/spread off-CPU% return None (not 0.0) when `off_cpu_pcts` is
395/// empty (the not-measured state).
396pub(crate) fn phase_cgroup_stats(
397    reports: &[WorkerReport],
398    expected_nodes: Option<&BTreeSet<usize>>,
399) -> PhaseCgroupStats {
400    let cpus_used: BTreeSet<usize> = reports
401        .iter()
402        .flat_map(|w| w.cpus_used.iter().copied())
403        .collect();
404    // Per-worker off-CPU% (only workers with measurable wall time), un-reduced.
405    // EMPTY = not measured: the re-pool then yields None for avg/min/max/spread,
406    // preserving the not-measured-vs-measured-zero distinction cgroup_stats keeps.
407    let off_cpu_pcts: Vec<f64> = reports
408        .iter()
409        .filter(|w| w.wall_time_ns > 0)
410        .map(|w| w.off_cpu_ns as f64 / w.wall_time_ns as f64 * 100.0)
411        .collect();
412    // Pool every worker's already per-worker-capped wake-latency vec, RE-CAPPING
413    // the concatenation at MAX_WAKE_SAMPLES via the same Algorithm-R reservoir the
414    // per-worker path uses. This carrier is the FIRST to serialize raw samples
415    // over the size-limited guest bulk port (the AssertResult); without the
416    // re-cap the pool would be workers × MAX_WAKE_SAMPLES and could overrun the
417    // 16 MiB frame on a many-core host, flipping a PASS to a truncated FAIL. The
418    // reservoir is distribution-preserving, so p99 / median / CV re-pool over it
419    // as cgroup_stats does over the per-worker pool; `wake_sample_total` keeps the
420    // TRUE pre-cap population for the re-pool. PARITY: for pools ≤
421    // MAX_WAKE_SAMPLES the reservoir is the full concatenation, so the re-pool is
422    // VALUE-FOR-VALUE with cgroup_stats; above the cap it is a distribution-
423    // preserving SUBSAMPLE (cgroup_stats keeps the full concat), so the re-pool is
424    // distribution-equivalent, not byte-identical — see the wake_latencies_ns
425    // field doc for the full contract. PhaseCgroupStats::merge keeps same-name
426    // carriers bounded too: ≤cap it concatenates (value-for-value), >cap it uses a
427    // population-WEIGHTED reservoir merge (weighted_merge_reservoirs) so the merged
428    // subsample is unbiased rather than length-skewed toward the smaller carrier.
429    let mut wake_latencies_ns: Vec<u64> = Vec::new();
430    let mut pooled_wake_count: u64 = 0;
431    for w in reports {
432        for &sample in &w.wake_latencies_ns {
433            crate::workload::reservoir_push(
434                &mut wake_latencies_ns,
435                &mut pooled_wake_count,
436                sample,
437                crate::workload::MAX_WAKE_SAMPLES,
438            );
439        }
440    }
441    // saturating folds (per cgroup_stats's rationale): overflow-safe pool of the
442    // per-worker guest-runtime counters into this per-phase carrier.
443    let wake_sample_total: u64 = reports
444        .iter()
445        .map(|w| w.wake_sample_total)
446        .fold(0u64, u64::saturating_add);
447    // Distinct timer reservoir, pooled across workers exactly like
448    // wake_latencies_ns; timer_sample_total keeps the true pre-cap population.
449    let mut timer_latencies_ns: Vec<u64> = Vec::new();
450    let mut pooled_timer_count: u64 = 0;
451    for w in reports {
452        for &sample in &w.timer_latencies_ns {
453            crate::workload::reservoir_push(
454                &mut timer_latencies_ns,
455                &mut pooled_timer_count,
456                sample,
457                crate::workload::MAX_WAKE_SAMPLES,
458            );
459        }
460    }
461    let timer_sample_total: u64 = reports
462        .iter()
463        .map(|w| w.timer_sample_total)
464        .fold(0u64, u64::saturating_add);
465    // RAW ns, one per worker — NOT divided by 1000. cgroup_stats divides at
466    // reduction time; the re-pool over the concatenated samples divides once,
467    // so pre-dividing here would double-divide (a 1000x error).
468    let run_delays_ns: Vec<u64> = reports.iter().map(|w| w.schedstat_run_delay_ns).collect();
469    // Coupled worst gap: take (ms, cpu) TOGETHER from the worst worker (argmax),
470    // never two independent maxes — keeps the gap bound to its CPU.
471    let (max_gap_ms, max_gap_cpu) = reports
472        .iter()
473        .max_by_key(|w| w.max_gap_ms)
474        .map(|w| (w.max_gap_ms, w.max_gap_cpu))
475        .unwrap_or((0, 0));
476    let total_migrations: u64 = reports
477        .iter()
478        .map(|w| w.migration_count)
479        .fold(0u64, u64::saturating_add);
480    let total_iterations: u64 = reports
481        .iter()
482        .map(|w| w.iterations)
483        .fold(0u64, u64::saturating_add);
484    // schedstat_cpu_time_ns (task->se.sum_exec_runtime), NOT cpu_time_ns
485    // (CLOCK_THREAD_CPUTIME_ID) — matches cgroup_stats's total_cpu_time_ns.
486    let total_cpu_time_ns: u64 = reports
487        .iter()
488        .map(|w| w.schedstat_cpu_time_ns)
489        .fold(0u64, u64::saturating_add);
490    let numa_pages_total: u64 = reports
491        .iter()
492        .map(|w| {
493            w.numa_pages
494                .values()
495                .copied()
496                .fold(0u64, u64::saturating_add)
497        })
498        .fold(0u64, u64::saturating_add);
499    // System-wide /proc/vmstat numa_pages_migrated delta each worker observes
500    // redundantly -> MAX, not SUM (summing inflates by the worker count).
501    let cross_node_migrated: u64 = reports
502        .iter()
503        .map(|w| w.vmstat_numa_pages_migrated)
504        .max()
505        .unwrap_or(0);
506    // Pages on the cgroup's expected NUMA nodes (page_locality numerator),
507    // partitioned exactly as AssertPlan::assert_cgroup does; 0 without a node
508    // set (mirrors cgroup_stats leaving page_locality 0.0 absent NUMA context).
509    let numa_pages_local: u64 = expected_nodes
510        .map(|nodes| {
511            let mut local = 0u64;
512            for w in reports {
513                for (&node, &count) in &w.numa_pages {
514                    if nodes.contains(&node) {
515                        local = local.saturating_add(count);
516                    }
517                }
518            }
519            local
520        })
521        .unwrap_or(0);
522    PhaseCgroupStats {
523        num_workers: reports.len(),
524        cpus_used,
525        wake_latencies_ns,
526        wake_sample_total,
527        timer_latencies_ns,
528        timer_sample_total,
529        run_delays_ns,
530        off_cpu_pcts,
531        total_migrations,
532        total_iterations,
533        total_cpu_time_ns,
534        numa_pages_local,
535        numa_pages_total,
536        cross_node_migrated,
537        max_gap_ms,
538        max_gap_cpu,
539        // Fresh carrier built from worker reports — never stripped.
540        stripped: false,
541        // Derived scalars are filled post-build by derive_phase_metrics.
542        metrics: std::collections::BTreeMap::new(),
543        // schbench rides PhaseSlice (the backdrop per-phase carrier), not the
544        // whole-run WorkerReports this fn pools, so it is always None here.
545        schbench: None,
546        taobench: None,
547    }
548}
549
550/// Build a per-cgroup carrier from ONE already-per-phase backdrop
551/// [`crate::workload::PhaseSlice`] (no whole-run differencing — the
552/// slice's counter fields are already per-phase deltas). The single-worker
553/// analog of [`phase_cgroup_stats`]: `num_workers` is 1 and each list field
554/// carries this one worker's value, so [`PhaseCgroupStats::merge`] pools
555/// slices across backdrop workers into a per-epoch carrier identically to
556/// how [`phase_cgroup_stats`] pools whole-run reports.
557pub(crate) fn phase_slice_to_cgroup_stats(
558    slice: &crate::workload::PhaseSlice,
559    expected_nodes: Option<&BTreeSet<usize>>,
560) -> PhaseCgroupStats {
561    // Per-phase off-CPU%, one value, only when wall time was measured
562    // (wall_ns == 0 => the worker never ran this phase => EMPTY = not
563    // measured, matching phase_cgroup_stats's not-measured contract).
564    let off_cpu_pcts: Vec<f64> = if slice.wall_ns > 0 {
565        vec![slice.off_cpu_ns as f64 / slice.wall_ns as f64 * 100.0]
566    } else {
567        Vec::new()
568    };
569    // saturating folds: overflow-safe pool of this slice's per-node page counts.
570    let numa_pages_total: u64 = slice
571        .numa_pages
572        .values()
573        .copied()
574        .fold(0u64, u64::saturating_add);
575    let numa_pages_local: u64 = expected_nodes
576        .map(|nodes| {
577            slice
578                .numa_pages
579                .iter()
580                .filter(|(node, _)| nodes.contains(node))
581                .map(|(_, &count)| count)
582                .fold(0u64, u64::saturating_add)
583        })
584        .unwrap_or(0);
585    PhaseCgroupStats {
586        num_workers: 1,
587        cpus_used: slice.cpus_used.clone(),
588        wake_latencies_ns: slice.wake_latencies_ns.clone(),
589        wake_sample_total: slice.wake_sample_total,
590        timer_latencies_ns: slice.timer_latencies_ns.clone(),
591        timer_sample_total: slice.timer_sample_total,
592        // RAW ns, one per worker (NOT divided) — same contract as
593        // phase_cgroup_stats::run_delays_ns.
594        run_delays_ns: vec![slice.run_delay_ns],
595        off_cpu_pcts,
596        total_migrations: slice.migration_count,
597        total_iterations: slice.iterations,
598        total_cpu_time_ns: slice.schedstat_cpu_time_ns,
599        numa_pages_local,
600        numa_pages_total,
601        cross_node_migrated: slice.vmstat_numa_pages_migrated,
602        max_gap_ms: slice.max_gap_ms,
603        max_gap_cpu: slice.max_gap_cpu,
604        stripped: false,
605        // Derived scalars are filled post-build by derive_phase_metrics.
606        metrics: std::collections::BTreeMap::new(),
607        // Carry the per-phase schbench engine metrics through (None for every
608        // non-schbench backdrop slice); PhaseCgroupStats::merge pools them.
609        schbench: slice.schbench.clone(),
610        // Carry the per-phase taobench engine metrics through (None for every
611        // non-taobench backdrop slice); PhaseCgroupStats::merge pools them.
612        taobench: slice.taobench.clone(),
613    }
614}
615
616/// Pool a set of backdrop [`crate::workload::PhaseSlice`]s (all for
617/// the SAME epoch, one per worker) into a single per-cgroup carrier via
618/// [`PhaseCgroupStats::merge`] — the per-phase analog of
619/// [`phase_cgroup_stats`] pooling whole-run reports. An empty input yields
620/// a zero-worker carrier (all fields empty/0, `stripped: false`) so a phase
621/// no backdrop worker observed renders as not-measured rather than
622/// panicking.
623pub(crate) fn pool_phase_slice_stats(
624    slices: &[&crate::workload::PhaseSlice],
625    expected_nodes: Option<&BTreeSet<usize>>,
626) -> PhaseCgroupStats {
627    let mut iter = slices
628        .iter()
629        .map(|s| phase_slice_to_cgroup_stats(s, expected_nodes));
630    match iter.next() {
631        Some(first) => iter.fold(first, PhaseCgroupStats::merge),
632        None => PhaseCgroupStats {
633            num_workers: 0,
634            cpus_used: BTreeSet::new(),
635            wake_latencies_ns: Vec::new(),
636            wake_sample_total: 0,
637            timer_latencies_ns: Vec::new(),
638            timer_sample_total: 0,
639            run_delays_ns: Vec::new(),
640            off_cpu_pcts: Vec::new(),
641            total_migrations: 0,
642            total_iterations: 0,
643            total_cpu_time_ns: 0,
644            numa_pages_local: 0,
645            numa_pages_total: 0,
646            cross_node_migrated: 0,
647            max_gap_ms: 0,
648            max_gap_cpu: 0,
649            stripped: false,
650            metrics: std::collections::BTreeMap::new(),
651            schbench: None,
652            taobench: None,
653        },
654    }
655}
656
657/// Expand a backdrop worker set's per-phase
658/// [`crate::workload::PhaseSlice`]s into one [`PhaseBucket`] per epoch,
659/// keyed by the epoch as `step_index`, each pooling that epoch's slices
660/// across workers via [`pool_phase_slice_stats`]. BASELINE (epoch 0) and
661/// inter-step-gap (`u32::MAX`) epochs are skipped — they have no paired
662/// host bucket and the host fold discards them. Called by the backdrop
663/// (None-`step_index`) arm of [`crate::scenario::collect_handles`]; the
664/// host's [`fold_guest_per_cgroup_into_host_buckets`] then unions these
665/// into the host-rebuilt buckets (matched epochs) or surfaces them as
666/// orphan not-measured windows. Extracted (rather than inlined in
667/// collect_handles, which calls `stop_and_collect`) so the grouping +
668/// per-epoch pooling is unit-testable directly.
669pub(crate) fn expand_backdrop_phase_buckets(
670    name: &str,
671    reports: &[WorkerReport],
672    expected_nodes: Option<&BTreeSet<usize>>,
673) -> Vec<PhaseBucket> {
674    let mut by_epoch: std::collections::BTreeMap<u32, Vec<&crate::workload::PhaseSlice>> =
675        std::collections::BTreeMap::new();
676    for report in reports {
677        for slice in &report.phase_slices {
678            if slice.phase_epoch == 0 || slice.phase_epoch == u32::MAX {
679                continue;
680            }
681            by_epoch.entry(slice.phase_epoch).or_default().push(slice);
682        }
683    }
684    by_epoch
685        .into_iter()
686        .map(|(epoch, slices)| {
687            let mut per_cgroup = std::collections::BTreeMap::new();
688            per_cgroup.insert(
689                name.to_string(),
690                pool_phase_slice_stats(&slices, expected_nodes),
691            );
692            // Lossless: a real epoch == u32::from(phase_step_index: u16),
693            // and 0 / u32::MAX are filtered above.
694            let step_index = epoch as u16;
695            PhaseBucket {
696                step_index,
697                label: Phase::from(step_index).to_string(),
698                start_ms: u64::MAX,
699                end_ms: 0,
700                sample_count: 0,
701                metrics: std::collections::BTreeMap::new(),
702                per_cgroup,
703            }
704        })
705        .collect()
706}
707
708/// Build the single-bucket guest-side per-phase carrier for one step-local
709/// cgroup: a [`PhaseBucket`] at `step_index` whose only payload is the
710/// `per_cgroup` entry `name -> phase_cgroup_stats(reports, expected_nodes)`.
711///
712/// The guest emits one of these per step-local cgroup at `collect_step`
713/// teardown ([`crate::scenario::collect_handles`]). The window is the
714/// merge-neutral `(u64::MAX, 0)` sentinel and `metrics` is empty: the carrier
715/// contributes ONLY `per_cgroup`. When folded into the host-rebuilt bucket of
716/// the same `step_index` ([`fold_guest_per_cgroup_into_host_buckets`] via
717/// [`merge_matched_phase_buckets`]) the `MAX`/`0` window is a no-op against the
718/// host's real window (`min`/`max`), so the host's window and metrics win and
719/// only `per_cgroup` is carried. The `label` uses [`Phase`]'s `Display` so an
720/// orphan carrier (no host bucket) still reads `BASELINE`/`Step[k]`.
721pub(crate) fn step_per_cgroup_bucket(
722    name: &str,
723    reports: &[WorkerReport],
724    expected_nodes: Option<&BTreeSet<usize>>,
725    step_index: u16,
726) -> PhaseBucket {
727    let mut per_cgroup = std::collections::BTreeMap::new();
728    per_cgroup.insert(
729        name.to_string(),
730        phase_cgroup_stats(reports, expected_nodes),
731    );
732    PhaseBucket {
733        step_index,
734        label: Phase::from(step_index).to_string(),
735        start_ms: u64::MAX,
736        end_ms: 0,
737        sample_count: 0,
738        metrics: std::collections::BTreeMap::new(),
739        per_cgroup,
740    }
741}
742
743/// Roll a single cgroup's [`CgroupStats`] up into a one-cgroup
744/// [`ScenarioStats`]. The KEPT typed `worst_*` fields carry this cgroup's
745/// values and fold across cgroups in [`AssertResult::merge`] by max (all are
746/// higher-is-worse): `worst_spread`, `worst_migration_ratio`, and the coupled
747/// `worst_gap_ms` / `worst_gap_cpu`. The two NUMA roll-ups
748/// (`worst_page_locality`, `worst_cross_node_migration_ratio`), the wake-latency
749/// / run-delay distributions, the iteration efficiencies, and the wake-latency
750/// tail ratio are NOT carried here — they have no typed field and re-pool
751/// run-level POST-merge in [`populate_run_distribution_metrics`]:
752/// `worst_page_locality` (WorstLowest) re-pools None-aware from
753/// `stats.phases[].per_cgroup` NUMA carriers (lowest per-cgroup locality, a
754/// measured 0.0 winning the lowest rather than being skipped as a sentinel),
755/// `worst_cross_node_migration_ratio` (WorstCrossNodeRatio) re-pools the MAX
756/// per-cgroup churn ratio from the same carriers, the tail ratio is the max over
757/// the per-cgroup `CgroupStats::wake_latency_tail_ratio`, and the distributions /
758/// efficiencies pool from `stats.phases[].per_cgroup` / `stats.cgroups`.
759/// `cgroups` carries exactly this one entry so merge appends one per
760/// handle without double-counting.
761pub(crate) fn scenario_stats_for_cgroup(cg: &CgroupStats) -> ScenarioStats {
762    ScenarioStats {
763        total_workers: cg.num_workers,
764        total_cpus: cg.num_cpus,
765        total_migrations: cg.total_migrations,
766        // worst_spread is higher-is-worse (merge takes max). A
767        // not-measured cgroup (`spread == None`) maps to 0.0 — the
768        // neutral element for max, and the gauntlet layer's
769        // documented no-data convention. A measured zero stays 0.0.
770        worst_spread: cg.spread.unwrap_or(0.0),
771        worst_gap_ms: cg.max_gap_ms,
772        worst_gap_cpu: cg.max_gap_cpu,
773        worst_migration_ratio: cg.migration_ratio,
774        total_iterations: cg.total_iterations,
775        // worst_page_locality and worst_cross_node_migration_ratio are no longer
776        // typed fields — both re-pool from the per-phase NUMA carriers in
777        // populate_run_distribution_metrics. cg.page_locality is structurally 0.0
778        // here (it needs an expected-node set this reports-only builder lacks);
779        // cg.cross_node_migration_ratio IS populated but is a single pre-folded
780        // scalar that cannot carry the cross-phase SUM-migrated / LATEST-total
781        // fold, so the per-phase carriers own both.
782        ext_metrics: cg.ext_metrics.clone(),
783        cgroups: vec![cg.clone()],
784        phases: Vec::new(),
785    }
786}
787
788/// Record the DEFAULT fairness outcomes (Starved / Unfair / Stuck) for one
789/// cgroup against the framework default thresholds
790/// ([`spread_threshold_pct`] / [`gap_threshold_ms`]). Telemetry is built
791/// separately by [`cgroup_stats`]; this only appends fail outcomes, so it
792/// is shared by [`assert_not_starved`] and the `not_starved` arm of
793/// [`AssertPlan::assert_cgroup`] without rebuilding stats.
794pub(crate) fn record_default_fairness(
795    r: &mut AssertResult,
796    cg: &CgroupStats,
797    reports: &[WorkerReport],
798) {
799    for w in reports {
800        if w.work_units == 0 {
801            r.record_fail(AssertDetail::new(
802                DetailKind::Starved,
803                format!("tid {} starved (0 work units)", w.tid),
804            ));
805        }
806    }
807    // Off-cpu spread above the default threshold, gated on >=2 workers
808    // with measurable wall time (the historical `pcts.len() >= 2`).
809    // `cg.spread` is None when off-CPU% was not measured — inconclusive,
810    // never flagged unfair.
811    let measurable = reports.iter().filter(|w| w.wall_time_ns > 0).count();
812    let spread_limit = spread_threshold_pct();
813    if let Some(spread) = cg.spread
814        && spread > spread_limit
815        && measurable >= 2
816    {
817        r.record_fail(AssertDetail::new(
818            DetailKind::Unfair,
819            format!(
820                "unfair cgroup: spread={:.0}% ({:.0}-{:.0}%) {} workers on {} cpus (threshold {:.0}%)",
821                spread,
822                cg.min_off_cpu_pct.unwrap_or(0.0),
823                cg.max_off_cpu_pct.unwrap_or(0.0),
824                cg.num_workers,
825                cg.num_cpus,
826                spread_limit,
827            ),
828        ));
829    }
830    let gap_limit = gap_threshold_ms();
831    for w in reports {
832        if w.max_gap_ms > gap_limit {
833            r.record_fail(AssertDetail::new(
834                DetailKind::Stuck,
835                format!(
836                    "tid {} stuck {}ms on cpu{} at +{}ms (threshold {}ms)",
837                    w.tid, w.max_gap_ms, w.max_gap_cpu, w.max_gap_at_ms, gap_limit,
838                ),
839            ));
840        }
841    }
842}
843
844/// Default fairness check for one cgroup's worker reports: builds the
845/// per-cgroup telemetry ([`cgroup_stats`]) and records Starved / Unfair /
846/// Stuck against the framework default thresholds. Telemetry is ALWAYS
847/// populated — including a `num_workers == 0` entry for empty reports — so
848/// `r.stats.cgroups` is never empty for a declared cgroup, independent of
849/// whether any fail outcome fired.
850pub fn assert_not_starved(reports: &[WorkerReport]) -> AssertResult {
851    let cg = cgroup_stats(reports);
852    let mut r = AssertResult::pass();
853    record_default_fairness(&mut r, &cg, reports);
854    r.stats = scenario_stats_for_cgroup(&cg);
855    r
856}
857
858/// Check throughput parity across workers: coefficient of variation and
859/// minimum work rate.
860///
861/// `max_cv`: maximum allowed coefficient of variation (stddev/mean) for
862/// work_units / cpu_time_ns across workers. `None` skips the CV check.
863///
864/// `min_rate`: minimum work_units per CPU-second. `None` skips the floor check.
865///
866/// When every worker recorded `cpu_time_ns == 0`, both gates record
867/// their OWN Inconclusive outcome (the CV gate emits a "CV cannot be
868/// computed" detail; the min_rate gate emits a "rates cannot be
869/// computed" detail). Each gate carries its own diagnostic so a
870/// caller that supplies only one of the two threshold parameters
871/// sees the matching Inconclusive message and an operator reading
872/// [`AssertResult::inconclusive_details`] can identify which gate(s)
873/// misfired without re-deriving the inputs.
874///
875/// ```
876/// # use ktstr::assert::assert_throughput_parity;
877/// # use ktstr::workload::WorkerReport;
878/// # let mk = |units, cpu_ns| WorkerReport {
879/// #     tid: 1, cpus_used: [0].into_iter().collect(),
880/// #     work_units: units, cpu_time_ns: cpu_ns, wall_time_ns: cpu_ns,
881/// #     off_cpu_ns: cpu_ns, migration_count: 0, migrations: vec![],
882/// #     max_gap_ms: 0, max_gap_cpu: 0, max_gap_at_ms: 0,
883/// #     wake_latencies_ns: vec![], wake_sample_total: 0,
884/// #     iteration_costs_ns: vec![], iteration_cost_sample_total: 0,
885/// #     iterations: 0,
886/// #     schedstat_run_delay_ns: 0, schedstat_run_count: 0,
887/// #     schedstat_cpu_time_ns: 0,
888/// #     completed: true,
889/// #     numa_pages: std::collections::BTreeMap::new(),
890/// #     vmstat_numa_pages_migrated: 0,
891/// #     exit_info: None,
892/// #     is_messenger: false,
893/// #     ..Default::default()
894/// # };
895/// // Equal throughput -> low CV -> passes.
896/// let reports = [mk(1000, 1_000_000_000), mk(1000, 1_000_000_000)];
897/// assert!(assert_throughput_parity(&reports, Some(0.5), None).is_pass());
898/// ```
899pub fn assert_throughput_parity(
900    reports: &[WorkerReport],
901    max_cv: Option<f64>,
902    min_rate: Option<f64>,
903) -> AssertResult {
904    let mut r = AssertResult::pass();
905    if reports.is_empty() {
906        return r;
907    }
908
909    // Compute per-worker throughput: work_units / cpu_seconds
910    let rates: Vec<f64> = reports
911        .iter()
912        .map(|w| {
913            if w.cpu_time_ns == 0 {
914                0.0
915            } else {
916                w.work_units as f64 / (w.cpu_time_ns as f64 / 1e9)
917            }
918        })
919        .collect();
920
921    // CV is computed over MEASURED workers only (cpu_time_ns > 0). A zero-cpu
922    // worker's rate is unknowable — it is forced to 0.0 above so the min_rate
923    // gate below can index `reports[i]`, but that 0.0 is NOT a measured rate.
924    // Folding it into the mean/variance inflates the spread and FAILs a uniform
925    // workload that merely had one worker record no CPU time — the same exclusion
926    // the min_rate gate applies (it skips zero-cpu workers as "rate unknowable,
927    // not failing").
928    let measured: Vec<f64> = reports
929        .iter()
930        .zip(&rates)
931        .filter(|(w, _)| w.cpu_time_ns > 0)
932        .map(|(_, &rate)| rate)
933        .collect();
934    let mean = if measured.is_empty() {
935        0.0
936    } else {
937        measured.iter().sum::<f64>() / measured.len() as f64
938    };
939
940    // Detect the all-zero-cpu condition once so a call with both
941    // `max_cv` and `min_rate` set surfaces a single Inconclusive
942    // listing every threshold that couldn't evaluate, rather than
943    // emitting one record per gate (which produced duplicate
944    // "denominator is zero" diagnostics for the same root cause).
945    let all_zero_cpu = reports.iter().all(|w| w.cpu_time_ns == 0);
946
947    if all_zero_cpu && (max_cv.is_some() || min_rate.is_some()) {
948        let mut limits: Vec<String> = Vec::with_capacity(2);
949        if let Some(cv_limit) = max_cv {
950            limits.push(format!("max_cv {cv_limit:.3}"));
951        }
952        if let Some(floor) = min_rate {
953            limits.push(format!("min_rate {floor:.0}"));
954        }
955        r.record_inconclusive(AssertDetail::new(
956            DetailKind::Benchmark,
957            format!(
958                "throughput parity inconclusive: all {} workers recorded zero cpu_time_ns — \
959                 denominator is zero, rates cannot be computed; {} neither pass nor fail \
960                 (was the workload able to run?)",
961                reports.len(),
962                limits.join(" + "),
963            ),
964        ));
965        return r;
966    }
967
968    if let Some(cv_limit) = max_cv
969        && mean > 0.0
970        && measured.len() >= 2
971    {
972        let n_measured = measured.len() as f64;
973        let variance = measured.iter().map(|r| (r - mean).powi(2)).sum::<f64>() / n_measured;
974        let stddev = variance.sqrt();
975        let cv = stddev / mean;
976        if cv > cv_limit {
977            r.record_fail(AssertDetail::new(
978                DetailKind::Benchmark,
979                format!(
980                    "throughput CV {cv:.3} exceeds limit {cv_limit:.3} (mean={mean:.0} work/cpu_s)"
981                ),
982            ));
983        }
984    }
985
986    if let Some(floor) = min_rate {
987        // Skip per-worker zero-cpu cases: their rate is forced to
988        // 0.0 above, and comparing that to `floor` would synthesize
989        // a guaranteed Fail with a misleading "below floor" message
990        // when the real story is "this worker recorded no CPU time
991        // — the rate is unknowable, not failing". The all-zero-cpu
992        // case is already handled at the top of the function as a
993        // single combined Inconclusive.
994        for (i, &rate) in rates.iter().enumerate() {
995            if reports[i].cpu_time_ns == 0 {
996                continue;
997            }
998            if rate < floor {
999                r.record_fail(AssertDetail::new(
1000                    DetailKind::Benchmark,
1001                    format!(
1002                        "worker {} throughput {rate:.0} work/cpu_s below floor {floor:.0}",
1003                        reports[i].tid
1004                    ),
1005                ));
1006            }
1007        }
1008    }
1009
1010    r
1011}
1012
1013/// Check benchmarking metrics: p99 wake latency, wake latency CV,
1014/// and minimum iteration rate.
1015///
1016/// ```
1017/// # use ktstr::assert::assert_benchmarks;
1018/// # use ktstr::workload::WorkerReport;
1019/// # let report = WorkerReport {
1020/// #     tid: 1, cpus_used: [0].into_iter().collect(),
1021/// #     work_units: 1000, cpu_time_ns: 2_500_000_000,
1022/// #     wall_time_ns: 5_000_000_000, off_cpu_ns: 2_500_000_000,
1023/// #     migration_count: 0, migrations: vec![],
1024/// #     max_gap_ms: 50, max_gap_cpu: 0, max_gap_at_ms: 1000,
1025/// #     wake_latencies_ns: vec![100, 200, 300, 400, 500],
1026/// #     wake_sample_total: 5,
1027/// #     iteration_costs_ns: vec![], iteration_cost_sample_total: 0,
1028/// #     iterations: 1000,
1029/// #     schedstat_run_delay_ns: 0, schedstat_run_count: 0,
1030/// #     schedstat_cpu_time_ns: 0,
1031/// #     completed: true,
1032/// #     numa_pages: std::collections::BTreeMap::new(),
1033/// #     vmstat_numa_pages_migrated: 0,
1034/// #     exit_info: None,
1035/// #     is_messenger: false,
1036/// #     ..Default::default()
1037/// # };
1038/// // p99 = 500ns, well under 10000ns limit.
1039/// assert!(assert_benchmarks(&[report], Some(10000), None, None).is_pass());
1040/// ```
1041pub fn assert_benchmarks(
1042    reports: &[WorkerReport],
1043    max_p99_ns: Option<u64>,
1044    max_cv: Option<f64>,
1045    min_iter_rate: Option<f64>,
1046) -> AssertResult {
1047    let mut r = AssertResult::pass();
1048    if reports.is_empty() {
1049        // No worker reports means nothing to measure — any benchmark
1050        // threshold the caller supplied cannot be evaluated. A silent
1051        // pass would let thresholds look "green" on a broken run that
1052        // never produced signal; surface it as skip so the operator
1053        // knows the benchmark was not actually exercised.
1054        return AssertResult::skip("no worker reports — benchmark skipped");
1055    }
1056
1057    // Collect all wake latencies across workers.
1058    let all_latencies: Vec<u64> = reports
1059        .iter()
1060        .flat_map(|w| w.wake_latencies_ns.iter().copied())
1061        .collect();
1062
1063    if let Some(p99_limit) = max_p99_ns
1064        && !all_latencies.is_empty()
1065    {
1066        let mut sorted = all_latencies.clone();
1067        sorted.sort_unstable();
1068        let p99 = percentile(&sorted, 0.99);
1069        if p99 > p99_limit {
1070            r.record_fail(AssertDetail::new(
1071                DetailKind::Benchmark,
1072                format!(
1073                    "p99 wake latency {p99}ns exceeds limit {p99_limit}ns ({} samples)",
1074                    sorted.len()
1075                ),
1076            ));
1077        }
1078    }
1079
1080    if let Some(cv_limit) = max_cv
1081        && all_latencies.len() >= 2
1082    {
1083        let n = all_latencies.len() as f64;
1084        let mean = all_latencies.iter().sum::<u64>() as f64 / n;
1085        if mean > 0.0 {
1086            let variance = all_latencies
1087                .iter()
1088                .map(|&v| (v as f64 - mean).powi(2))
1089                .sum::<f64>()
1090                / n;
1091            let cv = variance.sqrt() / mean;
1092            if cv > cv_limit {
1093                r.record_fail(AssertDetail::new(
1094                    DetailKind::Benchmark,
1095                    format!(
1096                        "wake latency CV {cv:.3} exceeds limit {cv_limit:.3} (mean={mean:.0}ns)"
1097                    ),
1098                ));
1099            }
1100        } else {
1101            // CV is dispersion / mean. With mean == 0 every captured
1102            // wake-latency sample was zero, so the denominator is
1103            // zero and CV is undefined — neither pass nor fail is
1104            // truthful. The same workload that fails to record
1105            // measurable wake latency at all (typically: nothing
1106            // actually woke, or every wake landed at <1ns and
1107            // truncated to zero in the ns counter) previously slid
1108            // past the gate as a silent pass; surface it as
1109            // Inconclusive so a broken benchmarking run does not
1110            // masquerade as a CV-compliant one.
1111            r.record_inconclusive(AssertDetail::new(
1112                DetailKind::Benchmark,
1113                format!(
1114                    "wake latency CV inconclusive: all {} sample(s) had zero mean wake \
1115                     latency — denominator is zero, CV cannot be computed; limit \
1116                     {cv_limit:.3} neither pass nor fail (did any wake event capture a \
1117                     non-zero latency?)",
1118                    all_latencies.len(),
1119                ),
1120            ));
1121        }
1122    }
1123
1124    if let Some(rate_floor) = min_iter_rate {
1125        // Skip per-worker zero-wall cases (rate is unknowable when
1126        // wall_time_ns == 0) but count them: if every worker had
1127        // zero wall_time, the gate silently passed before — record
1128        // Inconclusive instead so a broken run that produced no
1129        // signal at all doesn't masquerade as a passing benchmark.
1130        let mut zero_wall_count = 0usize;
1131        for w in reports {
1132            if w.wall_time_ns == 0 {
1133                zero_wall_count += 1;
1134                continue;
1135            }
1136            let rate = w.iterations as f64 / (w.wall_time_ns as f64 / 1e9);
1137            if rate < rate_floor {
1138                r.record_fail(AssertDetail::new(
1139                    DetailKind::Benchmark,
1140                    format!(
1141                        "worker {} iteration rate {rate:.1}/s below floor {rate_floor:.1}/s",
1142                        w.tid
1143                    ),
1144                ));
1145            }
1146        }
1147        if zero_wall_count == reports.len() {
1148            r.record_inconclusive(AssertDetail::new(
1149                DetailKind::Benchmark,
1150                format!(
1151                    "min iteration rate inconclusive: all {} workers recorded zero wall_time_ns — \
1152                     denominator is zero, rate cannot be computed; floor {rate_floor:.1}/s \
1153                     neither pass nor fail (was the workload able to run?)",
1154                    reports.len()
1155                ),
1156            ));
1157        }
1158    }
1159
1160    r
1161}
1162
1163/// Assert that every SCX event counter in `events` is at or below
1164/// `max_count`. `events` is a slice of `(name, count)` pairs sourced
1165/// from the kernel's per-task `scx_event_stats` (see `kernel/sched/ext.c`,
1166/// `SCX_EV_*` macros) — typically aggregated and surfaced via
1167/// `monitor::ScxEventDeltas` or sidecar `GauntletRow.fallback_count` /
1168/// `keep_last_count` fields. Pass `None` for `max_count` to require zero
1169/// (the strict default — error-class events should not fire under a
1170/// healthy scheduler).
1171///
1172/// The assertion is decoupled from the `monitor` module on purpose:
1173/// callers harvest the counters they care about (via the live monitor
1174/// path or by reading sidecar JSON post-hoc) and feed name/count
1175/// pairs in. This keeps the assert API surface decoupled from the
1176/// kernel-side counter inventory, which evolves across kernel
1177/// versions — adding a new `SCX_EV_*` does not force an API change
1178/// here.
1179///
1180/// Returns a passing result if every counter is within bound; failures
1181/// concatenate one [`AssertDetail`] per offending counter under
1182/// [`DetailKind::SchedulerEvent`] so an operator can identify which
1183/// events fired without scanning the full counter set.
1184///
1185/// ```
1186/// # use ktstr::assert::assert_scx_events_clean;
1187/// // Strict default — every counter must be zero.
1188/// let r = assert_scx_events_clean(&[("enq_skip_exiting", 0), ("dispatch_local_dsq_offline", 0)], None);
1189/// assert!(r.is_pass());
1190///
1191/// // A non-zero error-class counter fails.
1192/// let r = assert_scx_events_clean(&[("enq_skip_exiting", 7)], None);
1193/// assert!(r.is_fail());
1194///
1195/// // Caller-supplied bound tolerates small counts.
1196/// let r = assert_scx_events_clean(&[("dispatch_keep_last", 3)], Some(10));
1197/// assert!(r.is_pass());
1198/// ```
1199pub fn assert_scx_events_clean(events: &[(&str, i64)], max_count: Option<i64>) -> AssertResult {
1200    let mut r = AssertResult::pass();
1201    for (name, count) in events {
1202        // Kernel `scx_event_stats` counters are monotonic u64 — a
1203        // negative i64 here means the source data is corrupted
1204        // (counter reset, wraparound on a signed conversion, or
1205        // sidecar JSON bit-loss). Treat negatives as failures rather
1206        // than letting them silently pass `*count > bound` for any
1207        // non-negative bound.
1208        let failed = match max_count {
1209            // Strict default: every counter must be exactly zero.
1210            // `*count > 0` would let -5 slip through.
1211            None => *count != 0,
1212            // Bounded: reject negatives explicitly, then enforce
1213            // the upper bound.
1214            Some(bound) => *count < 0 || *count > bound,
1215        };
1216        if failed {
1217            let bound_desc = match max_count {
1218                None => "0".to_string(),
1219                Some(b) => b.to_string(),
1220            };
1221            r.record_fail(AssertDetail::new(
1222                DetailKind::SchedulerEvent,
1223                format!("scx event `{name}` count {count} exceeds bound {bound_desc}",),
1224            ));
1225        }
1226    }
1227    r
1228}
1229
/// Threshold-preset bundle consumed by [`assert_thresholds`].
///
/// It captures what a scheduler-under-test should deliver on a healthy run:
/// - wake latency stays within bound;
/// - per-iteration compute cost stays within bound;
/// - CPU migrations stay within bound;
/// - every worker makes some forward progress.
///
/// Every field is an independent `Option`, and `None` disables that check. A
/// bundle with every field `None` adds no checks of its own. It passes on any
/// non-empty report slice, while an empty slice is still reported as a skip
/// by [`assert_thresholds`]. That makes it a natural starting point for
/// composition.
///
/// Ways to construct one:
/// - Start from `AbsoluteThresholds::default()` and chain the `max_*` /
///   `min_*` setters (e.g. `AbsoluteThresholds::default().max_migrations(5)`).
/// - Use struct-update syntax
///   (`AbsoluteThresholds { max_migrations: Some(5), ..Default::default() }`).
/// - Reach for [`Self::strict`] to get every check enabled with sane defaults.
///
/// This is distinct from [`Assert`]:
/// - `Assert` is the merge-tree threshold config consumed by the worker-side
///   `AssertPlan`.
/// - `AbsoluteThresholds` is a flat preset for direct use in test bodies,
///   where the author wants one multi-field check without the merge chain.
///
/// The two compose. A test can run `assert_thresholds` on a worker-report
/// slice and fold the `Assert`-derived result into the same accumulator with
/// [`AssertResult::merge`].
#[must_use = "AbsoluteThresholds only takes effect when passed to assert_thresholds"]
#[derive(Debug, Clone, Copy, Default)]
pub struct AbsoluteThresholds {
    /// Upper bound on the pooled p99 wake latency, in nanoseconds. It is
    /// compared against the p99 of every worker's
    /// [`WorkerReport::wake_latencies_ns`] combined. `None` disables the
    /// check. Units and meaning match [`Assert::max_p99_wake_latency_ns`].
    pub max_p99_wake_latency_ns: Option<u64>,
    /// Upper bound on the pooled p99 per-iteration compute cost, in
    /// nanoseconds. It is compared against the p99 of every worker's
    /// [`WorkerReport::iteration_costs_ns`] combined. `None` disables the
    /// check.
    ///
    /// Only compute work types that fill that reservoir (`AluHot`,
    /// `SmtSiblingSpin`, `IpcVariance`) give it anything to evaluate.
    /// Blocking variants leave `iteration_costs_ns` empty, which turns the
    /// check into a no-op.
    pub max_iteration_cost_p99_ns: Option<u64>,
    /// Upper bound on the total CPU migrations, i.e. the sum of
    /// [`WorkerReport::migration_count`] over all workers. `None` disables the
    /// check.
    ///
    /// Unlike [`Assert::max_migration_ratio`] (migrations per iteration), this
    /// is an absolute count. It suits tests that pin a known workload size,
    /// where migrations must stay under a fixed ceiling however many
    /// iterations ran.
    pub max_migrations: Option<u64>,
    /// Lower bound on each worker's work_units. A single worker below the
    /// floor fails the check, and `None` disables it.
    ///
    /// [`assert_not_starved`] only rejects a literal zero. This floor can be
    /// non-zero, so a test can also reject "barely made progress" runs that
    /// would slip past that strict starvation gate.
    pub min_work_units: Option<u64>,
}

impl AbsoluteThresholds {
    /// Preset with every check on, using deliberately loose bounds:
    /// - p99 wake latency < 10ms;
    /// - p99 iteration cost < 1ms;
    /// - total migrations < 1000;
    /// - at least one work unit per worker.
    ///
    /// The bounds are meant to catch egregious regressions without flagging
    /// routine scheduler perturbation. Tests needing tighter limits should
    /// set the fields explicitly with the builder methods rather than tuning
    /// these constants.
    pub const fn strict() -> Self {
        Self {
            max_p99_wake_latency_ns: None,
            max_iteration_cost_p99_ns: None,
            max_migrations: None,
            min_work_units: None,
        }
        .max_p99_wake_latency_ns(10_000_000)
        .max_iteration_cost_p99_ns(1_000_000)
        .max_migrations(1000)
        .min_work_units(1)
    }

    /// Return a copy with [`Self::max_p99_wake_latency_ns`] set to `v`.
    pub const fn max_p99_wake_latency_ns(self, v: u64) -> Self {
        Self {
            max_p99_wake_latency_ns: Some(v),
            ..self
        }
    }

    /// Return a copy with [`Self::max_iteration_cost_p99_ns`] set to `v`.
    pub const fn max_iteration_cost_p99_ns(self, v: u64) -> Self {
        Self {
            max_iteration_cost_p99_ns: Some(v),
            ..self
        }
    }

    /// Return a copy with [`Self::max_migrations`] set to `v`.
    pub const fn max_migrations(self, v: u64) -> Self {
        Self {
            max_migrations: Some(v),
            ..self
        }
    }

    /// Return a copy with [`Self::min_work_units`] set to `v`.
    pub const fn min_work_units(self, v: u64) -> Self {
        Self {
            min_work_units: Some(v),
            ..self
        }
    }
}
1329
1330/// Run every check in `thresholds` against `reports`, merging results
1331/// into a single [`AssertResult`]. A `None` field on the thresholds
1332/// skips that check.
1333///
1334/// An empty `reports` slice short-circuits to a skip (`"no worker
1335/// reports to evaluate"`) regardless of thresholds content — silently
1336/// passing thresholds against zero samples would let them look
1337/// "green" on a run that produced no measurement.
1338///
1339/// Field-to-check mapping:
1340/// - `max_p99_wake_latency_ns` -> pooled p99 across every worker's
1341///   `wake_latencies_ns`; tagged [`DetailKind::Benchmark`].
1342/// - `max_iteration_cost_p99_ns` -> pooled p99 across every worker's
1343///   `iteration_costs_ns`; tagged [`DetailKind::Benchmark`].
1344/// - `max_migrations` -> sum of `migration_count` across workers;
1345///   tagged [`DetailKind::Migration`].
1346/// - `min_work_units` -> per-worker `work_units >= floor`; tagged
1347///   [`DetailKind::Starved`] when a worker is below the floor.
1348///
1349/// The wake-latency check delegates to [`assert_benchmarks`] for the
1350/// percentile path so the same nearest-rank algorithm applies; the
1351/// iteration-cost check uses an inline percentile call against the
1352/// pooled `iteration_costs_ns` reservoir.
1353///
1354/// ```
1355/// # use ktstr::assert::{AbsoluteThresholds, assert_thresholds};
1356/// # use ktstr::workload::WorkerReport;
1357/// # let report = WorkerReport {
1358/// #     tid: 1, cpus_used: [0].into_iter().collect(),
1359/// #     work_units: 1000, cpu_time_ns: 2_500_000_000,
1360/// #     wall_time_ns: 5_000_000_000, off_cpu_ns: 2_500_000_000,
1361/// #     migration_count: 5, migrations: vec![],
1362/// #     max_gap_ms: 50, max_gap_cpu: 0, max_gap_at_ms: 1000,
1363/// #     wake_latencies_ns: vec![100, 200, 300, 400, 500],
1364/// #     wake_sample_total: 5,
1365/// #     iteration_costs_ns: vec![1000, 2000, 3000, 4000, 5000],
1366/// #     iteration_cost_sample_total: 5,
1367/// #     iterations: 1000,
1368/// #     schedstat_run_delay_ns: 0, schedstat_run_count: 0,
1369/// #     schedstat_cpu_time_ns: 0,
1370/// #     completed: true,
1371/// #     numa_pages: std::collections::BTreeMap::new(),
1372/// #     vmstat_numa_pages_migrated: 0,
1373/// #     exit_info: None,
1374/// #     affinity_error: None,
1375/// #     is_messenger: false,
1376/// #     group_idx: 0,
1377/// #     phase_slices: vec![],
1378/// #     ..Default::default()
1379/// # };
1380/// // Strict preset on a healthy run — passes.
1381/// let r = assert_thresholds(&[report], &AbsoluteThresholds::strict());
1382/// assert!(r.is_pass());
1383/// ```
1384pub fn assert_thresholds(
1385    reports: &[WorkerReport],
1386    thresholds: &AbsoluteThresholds,
1387) -> AssertResult {
1388    // Empty `reports` means nothing was measured. Returning a fresh
1389    // `pass()` here would silently green-light a broken run that
1390    // produced no signal. `assert_benchmarks` also skips on empty
1391    // reports, and merging that skip into a `pass()` would preserve it
1392    // (`AssertResult::merge` extends `outcomes`, and an all-Skip
1393    // non-empty `outcomes` folds to Skip) — but its message is generic.
1394    // Surface the skip directly so the operator sees the
1395    // thresholds-specific reason: no worker reports to evaluate.
1396    if reports.is_empty() {
1397        return AssertResult::skip("no worker reports to evaluate");
1398    }
1399
1400    let mut r = AssertResult::pass();
1401
1402    // Wake-latency p99: reuse the existing `assert_benchmarks` path
1403    // so the percentile algorithm stays unified. With `reports`
1404    // non-empty here, `assert_benchmarks` cannot return a skip —
1405    // the merge sees only pass/fail, preserving thresholds semantics.
1406    if thresholds.max_p99_wake_latency_ns.is_some() {
1407        r.merge(assert_benchmarks(
1408            reports,
1409            thresholds.max_p99_wake_latency_ns,
1410            None,
1411            None,
1412        ));
1413    }
1414
1415    // Iteration-cost p99: pooled across every worker's reservoir.
1416    // Skipped when no samples are present — compute work types that
1417    // populate `iteration_costs_ns` are sparse, so an empty pooled
1418    // set is the common case for blocking variants and not a failure.
1419    if let Some(cost_limit) = thresholds.max_iteration_cost_p99_ns {
1420        let all_costs: Vec<u64> = reports
1421            .iter()
1422            .flat_map(|w| w.iteration_costs_ns.iter().copied())
1423            .collect();
1424        if !all_costs.is_empty() {
1425            let mut sorted = all_costs.clone();
1426            sorted.sort_unstable();
1427            let p99 = percentile(&sorted, 0.99);
1428            if p99 > cost_limit {
1429                r.record_fail(AssertDetail::new(
1430                    DetailKind::Benchmark,
1431                    format!(
1432                        "p99 iteration cost {p99}ns exceeds limit {cost_limit}ns ({} samples)",
1433                        sorted.len(),
1434                    ),
1435                ));
1436            }
1437        }
1438    }
1439
1440    // Total migrations across all workers: absolute-count gate
1441    // (distinct from migration_ratio which is a per-iteration rate).
1442    if let Some(max_mig) = thresholds.max_migrations {
1443        // saturating: a corrupt/hostile per-worker migration_count must clamp,
1444        // not wrap, this absolute-count gate — a wrapped small total would
1445        // silently PASS a max_migrations limit it should fail.
1446        let total_mig: u64 = reports
1447            .iter()
1448            .map(|w| w.migration_count)
1449            .fold(0u64, u64::saturating_add);
1450        if total_mig > max_mig {
1451            r.record_fail(AssertDetail::new(
1452                DetailKind::Migration,
1453                format!(
1454                    "total migrations {total_mig} exceeds limit {max_mig} ({} workers)",
1455                    reports.len(),
1456                ),
1457            ));
1458        }
1459    }
1460
1461    // Per-worker work_units floor: every worker must have completed
1462    // at least `min` work units. One starved worker fails the check.
1463    if let Some(min_units) = thresholds.min_work_units {
1464        for w in reports {
1465            if w.work_units < min_units {
1466                r.record_fail(AssertDetail::new(
1467                    DetailKind::Starved,
1468                    format!(
1469                        "tid {} work_units {} below floor {min_units}",
1470                        w.tid, w.work_units,
1471                    ),
1472                ));
1473            }
1474        }
1475    }
1476
1477    r
1478}
1479
1480// (The legacy `Expect` / `Checks` / `CheckBuilder` types previously
1481// living here were replaced by the [`Verdict`]-based claim API
1482// (defined further up in this file). The new flow is
1483// `Assert::default_checks().verdict().claim_<field>(stats).at_most(N)` for
1484// stats-struct-derived accessors, or `claim!(verdict, expr)` for
1485// expression-labeled claims. Both produce
1486// [`ClaimBuilder`]/[`SetClaim`]/[`SeqClaim`] under the hood and
1487// record outcomes onto the same [`AssertResult`] envelope that
1488// `assert_not_starved` / `assert_isolation` produce, so the two
1489// paths compose via [`Verdict::merge`].)