ktstr/assert/stats_types.rs
1use super::*;
2
3/// Verdict for a single test scenario.
4///
5/// # Reading the verdict
6///
7/// Inspect the terminal verdict via [`Self::outcome`] (returns the
8/// folded [`Outcome`] enum) or the convenience accessors
9/// [`Self::is_pass`] / [`Self::is_fail`] / [`Self::is_inconclusive`] /
10/// [`Self::is_skip`]. Iterate the per-variant payloads via
11/// [`Self::failure_details`] (all [`Outcome::Fail`] payloads),
12/// [`Self::inconclusive_details`] (all [`Outcome::Inconclusive`]
13/// payloads), and [`Self::skip_details`] (all [`Outcome::Skip`]
14/// payloads). All four bool accessors mirror
15/// [`Outcome::is_pass`] / [`Outcome::is_fail`] /
16/// [`Outcome::is_inconclusive`] / [`Outcome::is_skip`].
17///
18/// # Recording outcomes
19///
20/// Producers use the atomic mutators [`Self::record_fail`] /
21/// [`Self::record_skip`] / [`Self::record_inconclusive`] /
22/// [`Self::record_pass`] (each pushes a single [`Outcome`] variant
23/// onto [`Self::outcomes`]) and the escape hatch
24/// [`Self::record_outcome`] for pre-folded values. Constructors
25/// [`Self::skip`] / [`Self::fail`] seed the outcomes vec with the
26/// corresponding variant; the [`Self::pass`] constructor seeds nothing —
27/// it is zero-allocation (empty vec; the Pass identity element).
28///
29/// **Wire-format stability**: this struct is postcard-serialized as
30/// part of the in-VM `MSG_TYPE_TEST_RESULT` payload and as
31/// sidecar artifacts under `~/.cache/ktstr`. The wire format is
32/// **not stable across crate versions** — pre-1.0, fields can be
33/// added, removed, or reshaped at any time, and old sidecars must
34/// be regenerated after upgrades (re-running the affected tests
35/// produces a fresh sidecar). Per the project's pre-1.0 no-compat
36/// stance, no `#[serde(default)]` shims are added for old payloads. Postcard is positional (not self-describing), so field ORDER is part of the wire format: reordering fields breaks old payloads just like renaming them.
37#[must_use = "test verdict is lost if not checked"]
38#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
39pub struct AssertResult {
 40 /// Recorded terminal verdicts in emission order, one entry per
 41 /// check that explicitly called [`Self::record_pass`],
 42 /// [`Self::record_skip`], [`Self::record_inconclusive`], or
 43 /// [`Self::record_fail`] (plus the single entry seeded by
 44 /// [`Self::skip`] / [`Self::fail`] constructors).
 45 ///
 46 /// **Empty `outcomes` is the Pass identity** — [`Self::pass`]
 47 /// constructs with `outcomes: vec![]`, and [`Self::outcome`]
 48 /// resolves an empty `outcomes` to [`Outcome::Pass`] via the
 49 /// ordered `Fail > Inconclusive > Pass > Skip` precedence (NOT a
 50 /// fold via [`Outcome::merge`]; see [`Self::outcome`]), so a
 51 /// never-touched accumulator naturally resolves to Pass without
 52 /// any allocation. `record_pass()` is
 53 /// for the rare case where a test explicitly records a passing
 54 /// check (e.g. per-check helpers that document what passed);
 55 /// `pass()` is the zero-state "nothing failed so far"
 56 /// constructor.
 57 ///
 58 /// The folded terminal verdict is computed by [`Self::outcome`]
 59 /// per the precedence `Fail > Inconclusive > Pass > Skip`. Use
 60 /// [`Self::is_pass`] / [`Self::is_fail`] /
 61 /// [`Self::is_inconclusive`] / [`Self::is_skip`] for bool
 62 /// checks; use [`Self::failure_details`] /
 63 /// [`Self::inconclusive_details`] / [`Self::skip_details`] to
 64 /// iterate the per-variant [`AssertDetail`] payloads.
 65 pub outcomes: Vec<Outcome>,
 66 /// Structured records of every passing claim. Counterpart to
 67 /// [`Self::outcomes`]: where `outcomes` carries terminal-verdict
 68 /// records (Fail/Inconclusive/Pass/Skip per-check), `passes` carries the
 69 /// positive confirmations every comparator's pass arm emits via
 70 /// [`Verdict`]'s `record_pass_unary` / `record_pass_binary`
 71 /// helpers.
 72 /// Empty in tests that don't exercise the structured-pass path
 73 /// (the no-claim base case), populated whenever a [`Verdict`]
 74 /// records claims. The auto-repro renderer iterates both vecs
 75 /// to compose the bracketed phase-grouped output that surfaces
 76 /// passing context alongside failing assertions.
 77 ///
 78 /// **Bounded by [`MAX_RECORDED_PASSES`]** — past that count,
 79 /// further pushes drop on the floor and a single sentinel
 80 /// record named [`PASSES_TRUNCATION_SENTINEL_NAME`] appears at
 81 /// the tail. Use the sentinel-name check (not `len()`
 82 /// arithmetic) to detect truncation.
 83 ///
 84 /// **Test-author convention**: do NOT pin `result.passes` shape
 85 /// or contents in test assertions unless the test exists
 86 /// specifically to verify the structured-pass surface (e.g.
 87 /// the auto-repro renderer's own coverage tests). The field
 88 /// exists for the renderer's consumption; pinning it
 89 /// elsewhere makes the test surface viral — every new
 90 /// comparator that fires under the test starts churning the
 91 /// pin. Pin `outcome()`, `failure_details()`, and `measurements` for
 92 /// scenario verification.
 93 pub passes: Vec<PassDetail>,
 94 /// Aggregated stats from all workers in this scenario.
 95 pub stats: ScenarioStats,
 96 /// Structured measurements attached via [`Self::note_value`] /
 97 /// [`Verdict::note_value`]. Distinct from [`Self::outcomes`] —
 98 /// outcomes carry typed verdict variants with `AssertDetail`
 99 /// payloads for operator triage, `measurements` carries typed
 100 /// `(key, NoteValue)` pairs for programmatic consumption (sidecar
 101 /// parsers, `perf-delta`, regression dashboards). Stored in a `BTreeMap`, so keys are unique and iterate (and serialize) in sorted key order.
 102 pub measurements: std::collections::BTreeMap<String, NoteValue>,
 103 /// Informational annotations attached via [`Self::note`] /
 104 /// [`Verdict::note`]. Structurally separated from [`Self::outcomes`]
 105 /// so the verdict stream carries no informational entries: sidecar
 106 /// consumers iterating `outcomes` / [`Self::failure_details`] count real failures without
 107 /// the "forgot to filter notes" silent-miscount class of bug
 108 /// that the prior `DetailKind::Note` variant on [`AssertDetail`]
 109 /// invited. The auto-repro renderer surfaces these alongside the
 110 /// failure summary so the operator still sees them on a failing
 111 /// run.
 112 pub info_notes: Vec<InfoNote>,
113}
114
115/// Per-cgroup statistics from worker telemetry.
116///
117/// # Percentile convention
118///
119/// `p99_wake_latency_us` and `median_wake_latency_us` are computed
120/// by `percentile` using the NEAREST-RANK (Type 1) definition:
121/// the value at 0-based index `ceil(n * p) - 1` in sorted order. No interpolation
122/// between samples. This matches the percentile convention used
123/// throughout schbench and the BPF latency histograms the project
124/// cross-references, so a `ktstr` p99 reading aligns with a
125/// schbench `lat99` without adjustment. For small `n` (wake
126/// reservoirs cap at `MAX_WAKE_SAMPLES = 100_000` per worker —
127/// see `workload.rs`) nearest-rank is also numerically stable —
128/// interpolation between the two nearest ranks would be
129/// implementation-defined at sample-set boundaries.
130///
131/// # CV pooling scope
132///
133/// `wake_latency_cv` is POOLED across every sample from every
134/// worker in the cgroup, not a per-worker CV averaged back. That
135/// collapses per-worker dispersion into the cgroup-wide signal:
136/// two workers with uniformly low jitter but different means
137/// produce a high pooled CV (mean-shift between workers inflates
138/// stddev), while per-worker CV would show neither worker as
139/// bad. This is intentional for the fairness threshold
140/// (`max_wake_latency_cv`): a scheduler that gives worker A
141/// 10µs wakes and worker B 1ms wakes is failing fairness even if
142/// each worker on its own is tight. Tests comparing single-worker
143/// behavior should scope their assertions to per-worker data
144/// rather than this aggregate.
145///
146/// # Derived ratios
147///
148/// Three metrics are DERIVED rather than measured and live as
149/// `&self` methods, NOT as serde-serialized fields:
150/// [`Self::wake_latency_tail_ratio`] (= p99/median),
151/// [`Self::iterations_per_worker`] (= total_iterations/num_workers), and [`Self::iterations_per_cpu_sec`] (= total_iterations per second of `total_cpu_time_ns`; `None` when that is `0`).
152/// Pre-1.0 cleanup eliminated the prior stored-field shadow and
153/// `derive_ratios` stamper. Consumers always recompute on read,
154/// so a hand-constructed fixture or a deserialized sidecar from an
155/// older build cannot silently carry a stale ratio. The run-level
156/// worst-cgroup tail ratio (`crate::stats::MetricKind::WakeLatencyTailRatio`,
157/// an `ext_metrics` entry) and the iterations efficiencies
158/// (`worst_iterations_per_worker` / `worst_iterations_per_cpu_sec`) are all
159/// re-pooled POST-merge by [`populate_run_distribution_metrics`] — the tail
160/// ratio as the max over [`Self::wake_latency_tail_ratio`] across per-cgroup
161/// [`Self`] entries, the efficiencies lowest-wins from
162/// [`Self::iterations_per_worker`] / [`Self::iterations_per_cpu_sec`].
163#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
164pub struct CgroupStats {
 165 /// Cgroup name (the workload-handle label this telemetry belongs to),
 166 /// or empty for unlabeled call sites (`collect_all`, bare
 167 /// `assert_cgroup`). Set post-hoc by `collect_handles` (in
 168 /// `crate::scenario`) where the name is in scope; `cgroup_stats`
 169 /// itself has only the reports and leaves it empty. Lets a PASSING-run
 170 /// consumer say which cgroup's work landed on which CPUs.
 171 pub cgroup_name: String,
 172 /// Number of workers in this cgroup.
 173 pub num_workers: usize,
 174 /// Distinct CPUs the workers in this cgroup actually ran on (union of
 175 /// each [`crate::workload::WorkerReport::cpus_used`]). `num_cpus` is
 176 /// its length, kept for the existing rollups; this set surfaces WHICH
 177 /// CPUs (not just how many) on every run, pass or fail.
 178 pub cpus_used: BTreeSet<usize>,
 179 /// Distinct CPUs used across all workers in this cgroup
 180 /// (`cpus_used.len()`).
 181 pub num_cpus: usize,
 182 /// Mean off-CPU percentage across workers (off_cpu_ns /
 183 /// wall_time_ns * 100). `None` when no worker reported a
 184 /// positive `wall_time_ns` (off-CPU% is undefined without wall
 185 /// time) — distinct from `Some(0.0)`, a measured "never off
 186 /// CPU". The `Option` keeps a not-measured cgroup from reading
 187 /// as a perfectly-on-CPU one in the telemetry consumers
 188 /// (`ScenarioStats.cgroups`).
 189 pub avg_off_cpu_pct: Option<f64>,
 190 /// Minimum off-CPU percentage across workers. `None` under the
 191 /// same no-measurable-wall-time condition as `avg_off_cpu_pct`.
 192 pub min_off_cpu_pct: Option<f64>,
 193 /// Maximum off-CPU percentage across workers. `None` under the
 194 /// same no-measurable-wall-time condition as `avg_off_cpu_pct`.
 195 pub max_off_cpu_pct: Option<f64>,
 196 /// `max_off_cpu_pct - min_off_cpu_pct`. Measures scheduling
 197 /// fairness within the cgroup. `None` when off-CPU% was not
 198 /// measured (no worker with positive wall time) — a not-measured
 199 /// cgroup is inconclusive for fairness, NOT "spread 0 = perfectly
 200 /// fair". `Some(0.0)` means a real measured zero spread.
 201 pub spread: Option<f64>,
 202 /// Longest scheduling gap across all workers (ms).
 203 pub max_gap_ms: u64,
 204 /// CPU where the longest scheduling gap occurred.
 205 pub max_gap_cpu: usize,
 206 /// Sum of CPU migration counts across all workers.
 207 pub total_migrations: u64,
 208 /// Migrations per iteration (total_migrations / total_iterations).
 209 pub migration_ratio: f64,
 210 /// 99th percentile wake latency across all workers (microseconds). Nearest-rank; a not-measured sentinel `0.0` when [`Self::wake_measured`] is `false`.
 211 pub p99_wake_latency_us: f64,
 212 /// Median wake latency across all workers (microseconds). Same convention and sentinel as [`Self::p99_wake_latency_us`].
 213 pub median_wake_latency_us: f64,
 214 /// Coefficient of variation (stddev / mean) of wake latencies.
 215 ///
 216 /// Computed over the POOLED latency samples from every worker in
 217 /// the cgroup, not as a mean of per-worker CVs. Per-worker
 218 /// dispersion is therefore masked: a cgroup with one tight
 219 /// worker and one wildly variable worker can report a moderate
 220 /// pooled CV that looks healthier than the variable worker alone. Use
 221 /// [`crate::workload::WorkerReport::wake_latencies_ns`] directly if per-worker
 222 /// CV is needed.
 223 pub wake_latency_cv: f64,
 224 /// Whether any worker in this cgroup recorded a wake-latency sample.
 225 /// `false` makes the wake reductions above (`p99_wake_latency_us`,
 226 /// `median_wake_latency_us`, `wake_latency_cv`) a not-measured sentinel
 227 /// `0.0` rather than a measured zero — a percentile over zero samples is
 228 /// undefined, not "instant wakes". The run-level distributional re-pool
 229 /// (`populate_run_distribution_metrics`) reads this to EXCLUDE a
 230 /// no-wake-sample cgroup from the cross-run mean instead of folding its
 231 /// `0.0` in (which, for the LowerBetter wake metrics, would falsely drag
 232 /// the mean toward "perfect"). Same not-measured-vs-measured-zero
 233 /// discipline the off-CPU% `Option` fields above carry.
 234 pub wake_measured: bool,
 235 /// Median timer-latency across all workers (microseconds) — the
 236 /// [`crate::workload::WorkType::TimerLatency`] cyclictest probe's per-cgroup
 237 /// pooled reduction over
 238 /// [`crate::workload::WorkerReport::timer_latencies_ns`]. `0.0` when no
 239 /// worker recorded timer samples ([`Self::timer_measured`] is then `false`).
 240 pub median_timer_latency_us: f64,
 241 /// 99th-percentile timer-latency across all workers (microseconds). See
 242 /// [`Self::median_timer_latency_us`].
 243 pub p99_timer_latency_us: f64,
 244 /// 99.9th-percentile (deep-tail) timer-latency across all workers
 245 /// (microseconds). See [`Self::median_timer_latency_us`].
 246 pub p999_timer_latency_us: f64,
 247 /// Worst (maximum) timer-latency across all workers (microseconds). See
 248 /// [`Self::median_timer_latency_us`].
 249 pub worst_timer_latency_us: f64,
 250 /// Whether any worker in this cgroup recorded a timer-latency sample.
 251 /// `false` makes the timer reductions above a not-measured sentinel `0.0`
 252 /// (no [`crate::workload::WorkType::TimerLatency`] worker ran), distinct
 253 /// from a measured zero. Read by the run-level re-pool to EXCLUDE a
 254 /// no-timer-sample cgroup from the cross-run mean. Mirrors
 255 /// [`Self::wake_measured`] for the timer carrier.
 256 pub timer_measured: bool,
 257 /// Sum of iteration counts across all workers.
 258 pub total_iterations: u64,
 259 /// Sum of per-worker on-CPU time (nanoseconds), from each worker's
 260 /// schedstat run time ([`crate::workload::WorkerReport::schedstat_cpu_time_ns`]
 261 /// — `task->se.sum_exec_runtime`, the FIRST `/proc/<pid>/schedstat` field
 262 /// (`sched_info` supplies only the run_delay/pcount fields 2/3, not the
 263 /// on-CPU time), the summable per-thread proxy for the cgroup's
 264 /// `cpu.stat usage_usec`).
 265 /// Denominator for [`Self::iterations_per_cpu_sec`], the
 266 /// overcommit-invariant per-cell rate. `0` when no worker reported on-CPU
 267 /// time (the accessor then returns `None`).
 268 pub total_cpu_time_ns: u64,
 269 /// Mean schedstat run delay across workers (microseconds).
 270 pub mean_run_delay_us: f64,
 271 /// Worst schedstat run delay across workers (microseconds).
 272 pub worst_run_delay_us: f64,
 273 /// Whether this cgroup had any worker to measure run-delay from
 274 /// (`!run_delays.is_empty()`, i.e. `num_workers > 0`) — `false` only for a
 275 /// worker-less cgroup, keeping a degenerate empty cohort from folding a
 276 /// sentinel `0.0` into the cross-run run-delay mean. Unlike wake/timer
 277 /// (per-sample streams a running worker may never emit), run-delay is one
 278 /// `sched_info.run_delay` value per worker, always present once a worker
 279 /// exists: a worker that never queued reads a real measured `0.0`, not a
 280 /// no-measurement sentinel. `sched_info.run_delay` accumulates whenever
 281 /// `CONFIG_SCHED_INFO` is built in (compile-time — forced on in ktstr,
 282 /// `select`ed by both `CONFIG_SCHEDSTATS` and `CONFIG_TASK_DELAY_ACCT`),
 283 /// with no gate on the runtime `kernel.sched_schedstats` key (that key
 284 /// gates only the `schedstat_*` rq/se aggregates, never `run_delay`), so
 285 /// run-delay is genuinely measured on every ktstr run and worker-presence
 286 /// is the correct measured predicate. Mirrors [`Self::wake_measured`] for
 287 /// the run-delay carrier.
 288 pub run_delay_measured: bool,
 289 /// Fraction of pages on the expected NUMA node(s) (0.0-1.0).
 290 /// Derived from `/proc/self/numa_maps` and the worker's
 291 /// [`MemPolicy`](crate::workload::MemPolicy).
 292 pub page_locality: f64,
 293 /// Cross-node page migration ratio from `/proc/vmstat`
 294 /// `numa_pages_migrated` delta divided by total allocated pages.
 295 pub cross_node_migration_ratio: f64,
 296 /// Whole-run taobench engine COUNTER aggregate pooled across this cgroup's
 297 /// [`crate::workload::WorkType::Taobench`] workers (Σ ops, MAX wall window —
 298 /// the window is shared by concurrent workers, per
 299 /// [`crate::workload::WorkerReport::taobench_whole`]). `None` for every
 300 /// non-taobench cgroup. A RAW carrier, like [`Self::total_iterations`] /
 301 /// [`Self::total_cpu_time_ns`] — not a reduced ratio: the run-level
 302 /// cross-cgroup pool [`crate::assert::populate_run_pooled_taobench`] folds it
 303 /// into the `total_taobench_*` Counter components and the derived
 304 /// `taobench_*_per_sec` / `taobench_hit_fraction` / `taobench_command_hit_rate`
 305 /// Rates in [`Self::ext_metrics`] (whole-run keys visible to `--noise-adjust`
 306 /// spread, unlike the per-phase `taobench_*_qps` which are
 307 /// `MetricKind::PerPhase`). Whole-run, NOT summable from the per-phase
 308 /// `PhaseCgroupStats::taobench` carriers (per-phase `elapsed_ns` is
 309 /// MAX-merged across concurrent threads, so summing phase windows is the
 310 /// wrong qps denominator), so the engine's authoritative whole-run aggregate
 311 /// is shipped from the worker. Holds COUNTERS only
 312 /// ([`TaobenchStats`](crate::workload::taobench::run::TaobenchStats)) — the
 313 /// serve-latency histogram is per-phase data on `PhaseCgroupStats::taobench`,
 314 /// and the whole-run serve distribution (`taobench_serve_*_us_whole`) is the
 315 /// union of those per-phase histograms. `pub` (every `CgroupStats` field is
 316 /// `pub` and the struct is preluded, so a test author can read the counters).
 317 /// `#[claim(skip)]`: a raw aggregate carrier, not a test-author claim
 318 /// surface — assertions run against the host-derived run-level `taobench_*`
 319 /// Rate / serve-latency metrics, mirroring
 320 /// [`crate::workload::WorkerReport::taobench_whole`].
 321 #[claim(skip)]
 322 pub taobench_whole: Option<crate::workload::taobench::run::TaobenchStats>,
 323 /// Extensible metrics for the generic comparison pipeline.
 324 pub ext_metrics: BTreeMap<String, f64>,
325}
326
327/// Per-phase per-cgroup raw telemetry components — the per-phase analogue of
328/// [`CgroupStats`]. Holds RAW components (sample vectors + counters), NOT the
329/// reduced ratios/percentiles [`CgroupStats`] computes, so whole-run and
330/// cross-run aggregates RE-POOL from the components at every level (the
331/// per-phase telemetry thesis: an aggregate is recomputed over the pooled
332/// components, never averaged from ready-made per-phase reductions — a
333/// percentile or weighted ratio cannot be recovered from per-phase scalars).
334/// Covers every TYPED [`CgroupStats`] reduction: avg/min/max off-CPU% and
335/// spread from `off_cpu_pcts`; p99/median/CV wake latency from
336/// `wake_latencies_ns`; mean/worst run-delay from `run_delays_ns`;
337/// migration_ratio, iterations_per_cpu_sec, iterations_per_worker,
338/// page_locality, cross_node_migration_ratio from their counter components;
339/// the COUPLED worst gap (ms + the CPU that owned it) from `max_gap_ms` /
340/// `max_gap_cpu`; cpus_used / num_cpus from `cpus_used`. EXCLUDES
341/// [`CgroupStats::ext_metrics`] (the generic extensible map — a per-phase
342/// per-cgroup custom metric is a future extension, not part of the typed
343/// carrier). Lives in [`PhaseBucket::per_cgroup`], keyed by cgroup name. The
344/// structural carrier is empty until a capture path populates it per phase.
345#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
346pub struct PhaseCgroupStats {
347 /// Worker count in this cgroup for the phase — the denominator for the
348 /// re-pooled per-worker iteration rate (`iterations_per_worker` =
349 /// `total_iterations` / this). This is a set CARDINALITY (`reports.len()`),
350 /// not a kernel counter, but it SUMs in `merge` because a single cgroup name
351 /// can emit MULTIPLE carriers in one step — `collect_handles` builds one per
352 /// `WorkloadHandle`, and a `CgroupDef` with several `WorkSpec` entries
353 /// (`.work(..).work(..)`) spawns one handle per `WorkSpec` under the same
354 /// name (`apply_setup`). Those carriers cover DISJOINT worker subsets, so the
355 /// cardinality of their union is the SUM (4 + 2 → 6), matching [`cgroup_stats`]
356 /// over the pooled reports (`reports.len()`); a MAX would understate the count
357 /// and inflate `iterations_per_worker`. (The disjointness is the real
358 /// justification — were carriers ever to overlap, the SUM would over-count.)
359 pub num_workers: usize,
360 /// Distinct CPUs the cgroup's workers ran on in the phase (union of each
361 /// worker's `cpus_used`). Re-pools [`CgroupStats::cpus_used`] / `num_cpus`
362 /// (= the set / its length) via a set UNION.
363 pub cpus_used: std::collections::BTreeSet<usize>,
364 /// Pooled per-wakeup latency samples (ns) across the cgroup's workers in
365 /// the phase, un-reduced so p99 / median / CV re-pool over the combined set.
366 /// The POOL is reservoir-capped at `MAX_WAKE_SAMPLES` (the per-worker bound,
367 /// re-applied when same-name carriers merge so the carrier payload stays
368 /// bounded on the size-limited guest bulk port — without it the pool would be
369 /// `workers × MAX_WAKE_SAMPLES`); `wake_sample_total` carries the true
370 /// pre-cap population. The CARRIER-level reductions divide by
371 /// `wake_latencies_ns.len()` (this capped pool size), NOT by
372 /// `wake_sample_total`: [`Self::wake_summary`] takes p99 / median over `len`,
373 /// and [`cgroup_stats`] computes `cv = stddev/mean` with
374 /// `n = all_latencies.len()`. The RUN-level cross-phase re-pool
375 /// ([`populate_run_distribution_metrics`]) instead population-WEIGHTS (see
376 /// the PARITY CONTRACT below): its CV / mean divide by Σ per-sample weights
377 /// (the reconstructed true population), which equals `len` only below the cap.
378 ///
379 /// PARITY CONTRACT (the one component whose parity is size-dependent): for
380 /// pools ≤ `MAX_WAKE_SAMPLES` the reservoir IS the full concatenation, so the
381 /// p99 / median / CV re-pool reproduces [`cgroup_stats`] VALUE-FOR-VALUE.
382 /// Above the cap the carrier holds a distribution-preserving reservoir
383 /// SUBSAMPLE while [`cgroup_stats`] reduces over the full per-worker concat,
384 /// so the re-pool is DISTRIBUTION-EQUIVALENT, not byte-identical (the bounded
385 /// bulk-port frame forbids carrying the full pool; staged reservoirs cannot be
386 /// byte-identical to a single full-pool reduction). This is BY DESIGN:
387 /// `cgroup_stats` stays the uncapped run-level authority (capping it to match
388 /// the carrier would discard most of a multi-worker cgroup's samples to chase
389 /// a sub-display-precision artifact), and the carrier's >cap merge is WEIGHTED
390 /// by `wake_sample_total` (`Self::weighted_merge_reservoirs`) so the subsample
391 /// is an UNBIASED sample of the combined population — no smaller-population
392 /// skew. Both layers de-skew the cap: the carrier MERGE weights by
393 /// `wake_sample_total` (`Self::weighted_merge_reservoirs`), and the
394 /// cross-PHASE run-level pool in `populate_run_distribution_metrics` weights
395 /// each phase carrier's samples by `wake_sample_total / wake_latencies_ns.len()`
396 /// (so a phase that exceeded the cap contributes by true population, not
397 /// capped length) and reduces with the weighted percentile / moments — the
398 /// prior length-weighted concat is gone. Below the cap every weight is 1.0,
399 /// so the weighted P99 / median / mean / worst are BYTE-identical to the
400 /// unweighted concat; the weighted CV matches only within ~1e-9 (it sums in
401 /// f64 where the unweighted path sums the mean in u64 — a weighted variance
402 /// cannot keep the u64 sum).
403 pub wake_latencies_ns: Vec<u64>,
404 /// True wakeup count before reservoir clamping (`wake_latencies_ns` is
405 /// capped), so the re-pool can report the real population size. An
406 /// intentional ADDITION over [`CgroupStats`] (which has no such field), NOT
407 /// a mirrored reduction — do not strip it in a strict-parity audit; it is
408 /// the only source of the true wakeup population once `wake_latencies_ns` is
409 /// reservoir-clamped, and it is for REPORTING, not the CV denominator.
410 pub wake_sample_total: u64,
411 /// Pooled per-timer-cycle latency samples (ns) across the cgroup's
412 /// [`crate::workload::WorkType::TimerLatency`] workers in the phase,
413 /// un-reduced so median / p99 / p999 / worst re-pool over the combined set.
414 /// Reservoir-capped at `MAX_WAKE_SAMPLES` on same-name-carrier merge exactly
415 /// like `wake_latencies_ns` (population-weighted >cap); `timer_sample_total`
416 /// carries the true pre-cap population. Distinct carrier from
417 /// `wake_latencies_ns`.
418 pub timer_latencies_ns: Vec<u64>,
419 /// True timer-cycle count before reservoir clamping (`timer_latencies_ns` is
420 /// capped), so the re-pool reports the real population. Mirrors
421 /// `wake_sample_total` for the timer carrier.
422 pub timer_sample_total: u64,
423 /// Pooled per-worker schedstat run-delay samples (RAW ns) for the phase,
424 /// un-reduced so mean / worst run-delay re-pool over the combined set; the
425 /// re-pool converts ns → µs to match [`CgroupStats`]'s run-delay-µs fields.
426 /// Stored as raw kernel ns (like `wake_latencies_ns`), not pre-converted,
427 /// per the raw-component thesis. GRANULARITY: unlike `wake_latencies_ns`
428 /// (one per WAKEUP), each entry here is ONE per-worker value — that
429 /// worker's `sched_info.run_delay` delta over the carrier's window: the
430 /// whole-run `schedstat_run_delay_ns` (end−start) for the step-local
431 /// carrier, or the per-phase delta for the backdrop slice carrier. So the
432 /// pool size is the worker
433 /// count, the mean is the average per-worker total queued-to-run delay, and
434 /// `worst_run_delay_us` selects the single worker with the largest total
435 /// queued-to-run delay (NOT the worst single dispatch).
436 pub run_delays_ns: Vec<u64>,
437 /// Per-worker off-CPU% samples for the phase, un-reduced. Carried for the
438 /// per-phase per-cgroup off-CPU% RENDER — the avg / min / max /
439 /// spread of the combined set. NOT consumed by the run-level
440 /// distributional re-pool: off-CPU% has no run-level Distribution metric
441 /// (off-CPU%/spread is intrinsically per-cgroup, so the run-level
442 /// `worst_spread` stays the cross-cgroup max of per-cgroup
443 /// [`CgroupStats::spread`] via the typed [`AssertResult::merge`] fold, not a
444 /// pooled distribution). An EMPTY vec is the not-measured state (no worker
445 /// with positive wall time), preserving the not-measured vs measured-zero
446 /// distinction [`CgroupStats`] keeps. Stored as raw samples, not pre-reduced
447 /// extremes, because the mean is unrecoverable from min/max alone for >2
448 /// workers. Each sample is `off_cpu_ns /
449 /// wall_time_ns * 100`, where `off_cpu_ns = wall_time_ns - cpu_time_ns` and
450 /// `cpu_time_ns` is the `CLOCK_THREAD_CPUTIME_ID` thread on-CPU time
451 /// (workload/worker `off_cpu_ns` at report build). `total_cpu_time_ns` is a
452 /// DISTINCT on-CPU measurement (`schedstat_cpu_time_ns`, the `/proc`
453 /// schedstat `se.sum_exec_runtime`): both ultimately track on-CPU runtime but
454 /// are sampled at different points (the `CLOCK_THREAD_CPUTIME_ID` read folds
455 /// the in-flight delta; the schedstat field reads the stored value), so the
456 /// two need not be byte-identical and must not be cross-wired in a re-pool.
457 pub off_cpu_pcts: Vec<f64>,
458 /// Sum of per-worker CPU-migration counts in the phase (Counter).
459 pub total_migrations: u64,
460 /// Sum of per-worker iteration counts in the phase (Counter).
461 pub total_iterations: u64,
462 /// Sum of per-worker on-CPU time (ns) in the phase — the
463 /// overcommit-invariant rate denominator (Counter). Sourced from
464 /// `schedstat_cpu_time_ns` (the `/proc` schedstat `se.sum_exec_runtime`,
465 /// rq-charged on-CPU ns) — a DISTINCT on-CPU-time sample from the
466 /// `CLOCK_THREAD_CPUTIME_ID` time behind `off_cpu_pcts` (different sample
467 /// point; not byte-identical), so do not cross-wire the two in a re-pool.
468 pub total_cpu_time_ns: u64,
469 /// Pages on the expected NUMA node(s) — page-locality numerator. A per-task
470 /// `/proc/self/numa_maps` residency GAUGE (current snapshot of the task's mm,
471 /// recomputed each read — the kernel zeroes and re-walks the page tables),
472 /// SPATIALLY summed across the cgroup's workers within a phase: disjoint-mm
473 /// under the `CloneMode::Fork` default (the true cgroup total), but
474 /// `CloneMode::Thread` siblings share one mm and the SUM over-counts shared
475 /// pages once per thread (caveat inherited from `WorkerReport::numa_pages`).
476 /// The CROSS-PHASE fold takes the LATEST measured snapshot (see
477 /// `numa_agg_per_cgroup`), never a sum (summing residency across phases
478 /// over-counts by the phase count).
479 pub numa_pages_local: u64,
480 /// Total allocated pages — the SHARED denominator for BOTH page_locality
481 /// (`numa_pages_local` / this) AND cross_node_migration_ratio
482 /// (`cross_node_migrated` / this). A per-task `/proc/self/numa_maps` residency
483 /// GAUGE (same class/folds as `numa_pages_local`: within-phase SUM across
484 /// workers — disjoint-mm under the `CloneMode::Fork` default, Thread-mode
485 /// over-count caveat inherited from `WorkerReport::numa_pages` — cross-phase
486 /// LATEST snapshot); the kernel computes both ratios over the identical page
487 /// total, so one field serves both — a separate cross_node_total would invite
488 /// a silent desync.
489 pub numa_pages_total: u64,
490 /// Cross-node migrated pages — cross_node_migration_ratio numerator
491 /// (denominator is `numa_pages_total`). A SYSTEM-WIDE
492 /// `/proc/vmstat numa_pages_migrated` monotonic-COUNTER delta each worker
493 /// observes redundantly, so the within-phase fold is MAX across
494 /// workers/sources (summing would inflate it by the worker count — mirrors
495 /// [`CgroupStats`]'s deliberate max-fold); the CROSS-PHASE fold SUMs the
496 /// per-phase deltas over disjoint intervals to the run total.
497 pub cross_node_migrated: u64,
498 /// Longest scheduling gap (ms) across the cgroup's workers in the phase,
499 /// coupled with `max_gap_cpu`. A Peak folded as an ARGMAX of the (ms, cpu)
500 /// pair so the worst gap and its CPU survive together — mirrors
501 /// [`CgroupStats`]'s `max_gap_ms` / `max_gap_cpu` coupling (a bare
502 /// independent max would desync the gap from its CPU).
503 pub max_gap_ms: u64,
504 /// CPU that owned the worst scheduling gap — `max_gap_ms`'s argmax
505 /// companion. Folded together with `max_gap_ms`, never independently.
506 pub max_gap_cpu: usize,
507 /// True when this carrier's raw sample vectors (`wake_latencies_ns` /
508 /// `timer_latencies_ns` / `run_delays_ns` / `off_cpu_pcts`, plus the
509 /// `schbench` histograms) were dropped by
510 /// `AssertResult::strip_phase_cgroup_samples` to fit the size-limited guest
511 /// bulk frame — distinct from a carrier that genuinely measured no samples.
512 /// The reduced counters survive; only the per-phase distribution render
513 /// loses its source, so the render shows "samples stripped" rather
514 /// than the not-measured "n/a". Defaults to `false` (not stripped) and is set
515 /// only on a carrier that actually HAD samples to drop; ORs across `merge` so
516 /// a merged carrier is stripped if either input was.
517 pub stripped: bool,
518 /// Per-cgroup DERIVED scalar metrics for this (phase, cgroup), keyed by
519 /// `crate::stats::MetricDef` name — the per-cgroup analog of
520 /// [`PhaseBucket::metrics`] (which is the pooled-across-cgroups set). Populated
521 /// post-fold by `derive_phase_metrics` (both the schbench per-phase family AND
522 /// the non-schbench families — wake p99/median/cv, mean/max run-delay,
523 /// avg/min/max/spread off-CPU%, and the migration / iterations / locality
524 /// ratios) from the SAME reducers that fill
525 /// the pooled map, so a
526 /// test can query "metric M of cgroup C in phase P" as readily as the phase
527 /// aggregate (N cgroups -> N queryable sets + the pooled aggregate). DERIVED, not a
528 /// raw component: `PhaseCgroupStats::merge` leaves it empty and it is (re)derived
529 /// POST-merge, exactly as the pooled map skips is_derived keys in
530 /// `merge_matched_phase_buckets`. ALWAYS serialized (no `skip_serializing_if`):
531 /// PhaseCgroupStats rides the postcard bulk-TLV port, a NON-self-describing
532 /// POSITIONAL format — a conditionally-omitted field desyncs the byte stream and
533 /// corrupts the fields after it (here `schbench`), so the field must always be
534 /// present. No `serde(default)` either: pre-1.0, old sidecar/cache data is
535 /// disposable and regenerates (no compat shim). Read via [`Self::get`]; the
536 /// `crate::Claim` derive skips a `BTreeMap` field (matching
537 /// [`PhaseBucket::metrics`], which has no Claim accessor either).
538 pub metrics: std::collections::BTreeMap<String, f64>,
539 /// Per-phase schbench engine metrics for a `WorkType::Schbench` backdrop
540 /// cgroup (`None` for every non-schbench carrier). Pooled across the
541 /// cgroup's workers by [`PhaseCgroupStats::merge`] (histogram
542 /// bucket-add + integer-add of the run-delay raw pairs). The schbench
543 /// per-phase derivation reads it, pools across cgroups, and derives the
544 /// per-phase percentile / run-delay-mean / loop-count scalars into
545 /// [`PhaseBucket::metrics`]. `pub(crate)`: the element type is `pub(crate)`
546 /// and a per-phase carrier is internal — test authors read `PhaseBucket`,
547 /// not this. Non-`pub` so the `crate::Claim` derive skips it (a percentile
548 /// histogram has no meaningful scalar claim accessor).
549 pub(crate) schbench: Option<crate::workload::schbench::run::SchbenchPhaseStats>,
550 /// Per-phase taobench engine metrics for a `WorkType::Taobench` backdrop
551 /// cgroup (`None` for every non-taobench carrier). Pooled across the cgroup's
552 /// workers by [`PhaseCgroupStats::merge`] (counter-add, wall-window MAX, serve
553 /// histograms unioned). The taobench per-phase derivation reads it, pools
554 /// across cgroups, and derives the per-phase qps / hit-ratio / serve-latency
555 /// scalars into [`PhaseBucket::metrics`]. `pub(crate)`: an internal carrier.
556 pub(crate) taobench: Option<crate::workload::taobench::run::TaobenchPhaseStats>,
557}
558
559impl PhaseCgroupStats {
560 /// Component-wise union of two per-phase per-cgroup data for the SAME
561 /// cgroup name (same `step_index`). Fold rule by component class:
562 /// - sample vectors (`wake_latencies_ns`, `run_delays_ns`, `off_cpu_pcts`)
563 /// CONCAT, so the re-pool sees the combined set, never a mean of
564 /// per-source reductions;
565 /// - the CPU set (`cpus_used`) UNIONs;
566 /// - genuine Counters (`num_workers`, `wake_sample_total`,
567 /// `total_migrations`, `total_iterations`, `total_cpu_time_ns`) SUM —
568 /// `num_workers` included, because a multi-`WorkSpec` cgroup emits one
569 /// carrier per handle covering DISJOINT worker subsets, so summing
570 /// reproduces the pooled count (see the `num_workers` field doc);
571 /// - the residency GAUGEs (`numa_pages_local`, `numa_pages_total`) also SUM
572 /// here — the WITHIN-PHASE spatial sum across the cgroup's workers
573 /// (disjoint-mm under the `CloneMode::Fork` default; Thread-mode shares one
574 /// mm and over-counts, the caveat inherited from `WorkerReport::numa_pages`),
575 /// NOT a cross-phase sum (the cross-phase fold takes the LATEST snapshot in
576 /// `numa_agg_per_cgroup`);
577 /// - `cross_node_migrated`, a system-wide vmstat monotonic-Counter delta each
578 /// worker observes redundantly, takes the MAX across workers (summing would
579 /// inflate it); its cross-phase fold SUMs the per-phase deltas;
580 /// - the COUPLED worst gap (`max_gap_ms`, `max_gap_cpu`) folds as an
581 /// ARGMAX — the pair from whichever side has the larger ms (b's on tie,
582 /// matching the builders' `max_by_key` last-wins) so the gap and its CPU
583 /// stay bound together.
584 ///
585 /// The counter SUMs use plain `+`: debug builds panic on overflow rather
586 /// than wrapping. The realistic magnitudes (iteration / ns counts far
587 /// below `u64::MAX` even pooled across a long run) keep overflow
588 /// unreachable; a loud debug panic is preferred over a silently wrong
589 /// re-pool denominator.
590 pub(crate) fn merge(a: PhaseCgroupStats, b: PhaseCgroupStats) -> PhaseCgroupStats {
591 // Merge the two capped wake-latency reservoirs. Same-name carriers (a
592 // multi-`WorkSpec` cgroup's per-handle carriers) merge ON THE GUEST before
593 // the AssertResult is serialized over the bulk port, so K carriers must
594 // not concat to K × MAX_WAKE_SAMPLES (it could overrun the 16 MiB frame,
595 // flipping a PASS to a truncated FAIL).
596 //
597 // ≤cap: the concatenation IS the true combined population, so it passes
598 // through unchanged — value-for-value parity with cgroup_stats for small
599 // pools (only >cap pools become a subsample; see the `wake_latencies_ns`
600 // field doc). >cap: a WEIGHTED reservoir merge weighted by each carrier's
601 // true pre-cap population (`wake_sample_total`), so the merged sample is an
602 // UNBIASED uniform sample of the combined population — NOT the
603 // smaller-population-skewed reservoir-of-reservoirs an unweighted
604 // concat-and-re-cap produced (which weighted by reservoir LENGTH ≈ 50/50,
605 // ignoring the true populations).
606 let cap = crate::workload::MAX_WAKE_SAMPLES;
607 let wake_latencies_ns = if a.wake_latencies_ns.len() + b.wake_latencies_ns.len() <= cap {
608 let mut v = a.wake_latencies_ns;
609 v.extend(b.wake_latencies_ns);
610 v
611 } else {
612 Self::weighted_merge_reservoirs(
613 &a.wake_latencies_ns,
614 a.wake_sample_total,
615 &b.wake_latencies_ns,
616 b.wake_sample_total,
617 cap,
618 )
619 };
620 // Same bounded merge for the distinct timer reservoir:
621 // <=cap concatenation is value-for-value; >cap a population-weighted
622 // reservoir merge keeps it an unbiased sample of the combined population.
623 let timer_latencies_ns = if a.timer_latencies_ns.len() + b.timer_latencies_ns.len() <= cap {
624 let mut v = a.timer_latencies_ns;
625 v.extend(b.timer_latencies_ns);
626 v
627 } else {
628 Self::weighted_merge_reservoirs(
629 &a.timer_latencies_ns,
630 a.timer_sample_total,
631 &b.timer_latencies_ns,
632 b.timer_sample_total,
633 cap,
634 )
635 };
636 let mut run_delays_ns = a.run_delays_ns;
637 run_delays_ns.extend(b.run_delays_ns);
638 let mut off_cpu_pcts = a.off_cpu_pcts;
639 off_cpu_pcts.extend(b.off_cpu_pcts);
640 let mut cpus_used = a.cpus_used;
641 cpus_used.extend(b.cpus_used);
642 // Coupled worst-gap ARGMAX: take the (ms, cpu) pair together from the
643 // side with the larger gap (b's on tie, matching the builders'
644 // max_by_key last-wins) so the CPU stays bound to the gap it owned — a
645 // bare independent max would desync them. The last-wins tie-break is
646 // parity-coupled to fold order: AssertResult::merge folds same-name
647 // carriers in the order reports are pooled (handle iteration order), so
648 // on an equal-gap tie this yields the same CPU as a single cgroup_stats
649 // over the concatenated reports. A reordered fold would break that parity.
650 let (max_gap_ms, max_gap_cpu) = if b.max_gap_ms >= a.max_gap_ms {
651 (b.max_gap_ms, b.max_gap_cpu)
652 } else {
653 (a.max_gap_ms, a.max_gap_cpu)
654 };
655 // Per-phase schbench: OR-with-combine. Both Some → merge the pooled
656 // histograms + integer-add the run-delay raw pairs / loop_count
657 // (SchbenchPhaseStats::merge, the SAME associative+commutative op the
658 // guest engine uses to pool across message threads); one Some → carry
659 // it; both None (non-schbench cgroup) → None.
660 let schbench = match (a.schbench, b.schbench) {
661 (Some(mut x), Some(y)) => {
662 x.merge(&y);
663 Some(x)
664 }
665 (Some(x), None) | (None, Some(x)) => Some(x),
666 (None, None) => None,
667 };
668 // Per-phase taobench: OR-with-merge — both Some → counter-add + wall-window
669 // MAX + serve-latency histogram union (TaobenchPhaseStats::merge, the SAME
670 // op the guest engine pools with); one Some → carry; both None
671 // (non-taobench cgroup) → None.
672 let taobench = match (a.taobench, b.taobench) {
673 (Some(mut x), Some(y)) => {
674 x.merge(&y);
675 Some(x)
676 }
677 (Some(x), None) | (None, Some(x)) => Some(x),
678 (None, None) => None,
679 };
680 // saturating_add: pool the guest-runtime monotonic counters/times (the
681 // sample totals, migrations, iterations, cpu-time-ns, numa pages) across
682 // cgroups. Overflow is physically unreachable for honest data (real
683 // counts << u64::MAX), but a corrupt/hostile guest value would otherwise
684 // debug-panic or release-wrap to a silently-wrong derived metric;
685 // saturating is exact for every in-range value. num_workers is a
686 // ktstr-configured TOPOLOGY count (bounded by the guest CPU count, not a
687 // runtime accumulator), so it cannot overflow and keeps plain `+`.
688 // cross_node_migrated pools as MAX (a system-wide vmstat snapshot shared
689 // by the concurrent cgroups), not a sum.
690 PhaseCgroupStats {
691 num_workers: a.num_workers + b.num_workers,
692 cpus_used,
693 wake_latencies_ns,
694 wake_sample_total: a.wake_sample_total.saturating_add(b.wake_sample_total),
695 timer_latencies_ns,
696 timer_sample_total: a.timer_sample_total.saturating_add(b.timer_sample_total),
697 run_delays_ns,
698 off_cpu_pcts,
699 total_migrations: a.total_migrations.saturating_add(b.total_migrations),
700 total_iterations: a.total_iterations.saturating_add(b.total_iterations),
701 total_cpu_time_ns: a.total_cpu_time_ns.saturating_add(b.total_cpu_time_ns),
702 numa_pages_local: a.numa_pages_local.saturating_add(b.numa_pages_local),
703 numa_pages_total: a.numa_pages_total.saturating_add(b.numa_pages_total),
704 cross_node_migrated: a.cross_node_migrated.max(b.cross_node_migrated),
705 max_gap_ms,
706 max_gap_cpu,
707 stripped: a.stripped || b.stripped,
708 // DERIVED, not a raw component: left EMPTY here and (re)derived post-merge
709 // by derive_phase_metrics (the fold runs derive AFTER merging, the
710 // same reason the pooled map skips is_derived keys in
711 // merge_matched_phase_buckets). Folding stale per-operand metrics would
712 // double-count; the post-merge re-derive is the sole producer.
713 metrics: std::collections::BTreeMap::new(),
714 schbench,
715 taobench,
716 }
717 }
718
719 /// Look up this cgroup's per-phase DERIVED value for `metric_name` — the
720 /// per-cgroup analog of [`PhaseBucket::get`] (see [`Self::metrics`]). `None`
721 /// when this cgroup carried no finite samples for the metric (the ABSENT
722 /// discipline), distinct from `Some(0.0)` (a real reducer zero).
723 pub fn get(&self, metric_name: &str) -> Option<f64> {
724 self.metrics.get(metric_name).copied()
725 }
726
727 /// Like [`Self::get`] but panics citing the metric keys actually present when
728 /// the metric is absent — use when the caller knows this cgroup MUST carry the
729 /// metric in the phase.
730 pub fn expect_metric(&self, metric_name: &str) -> f64 {
731 self.get(metric_name).unwrap_or_else(|| {
732 panic!(
733 "PhaseCgroupStats::expect_metric: metric '{}' absent from this \
734 per-cgroup carrier (num_workers={}, stripped={}); keys present: {:?}. \
735 Causes: (a) the cgroup produced no finite samples for it; (b) metric \
736 name typo; (c) a non-schbench carrier has no schbench keys.",
737 metric_name,
738 self.num_workers,
739 self.stripped,
740 self.metrics.keys().collect::<Vec<_>>(),
741 )
742 })
743 }
744
745 /// The per-cgroup analog of [`PhaseBucket::cgroup_counter_total`] for the
746 /// three per-cgroup Counters that live ONLY on the carrier (no derived
747 /// `metrics` entry): `total_migrations`, `total_iterations`, and
748 /// `total_cpu_time_ns`. Lets
749 /// [`crate::vmm::VmResult::phase_cgroup_metric`] expose them by metric name
750 /// symmetrically with the pooled [`crate::vmm::VmResult::phase_metric`]
751 /// `cgroup_counter_total` fallback (the pooled `_total` suffix marks the
752 /// cross-cgroup SUM; this per-cgroup form returns one cgroup's value). `None`
753 /// for any other name (derived metrics are read via [`Self::get`]).
754 pub fn cgroup_counter(&self, name: &str) -> Option<f64> {
755 match name {
756 "total_migrations" => Some(self.total_migrations as f64),
757 "total_iterations" => Some(self.total_iterations as f64),
758 "total_cpu_time_ns" => Some(self.total_cpu_time_ns as f64),
759 _ => None,
760 }
761 }
762
763 /// Merge two CAPPED uniform reservoirs into one of size ≤ `cap` that is a
764 /// uniform sample of the COMBINED population. `a` is a uniform reservoir of
765 /// `w_a` true samples, `b` of `w_b` (their `wake_sample_total` weights). Each
766 /// output slot is drawn from `a` with probability `w_a / (w_a + w_b)` and from
767 /// `b` otherwise; within a source the index is uniform. Composing the
768 /// source-level uniform reservoir with the within-source uniform draw makes
769 /// each output a uniform draw from the combined population, so the merged
770 /// A-fraction is the TRUE `w_a / (w_a + w_b)`. This removes the equal-slot
771 /// ("reservoir-of-reservoirs") skew an unweighted concat-and-re-cap imposes:
772 /// two already-capped inputs concat ≈ 50/50 by LENGTH regardless of their true
773 /// populations, over-counting the smaller-population carrier. Sampling WITH
774 /// replacement is the correct estimator once the inputs are capped (each
775 /// reservoir element stands for `w/len` population units; the pre-cap samples
776 /// are gone).
777 ///
778 /// DETERMINISTIC: the xorshift64 stream is seeded from the inputs (populations +
779 /// lengths) so the merge is a PURE function of its arguments — unlike
780 /// `crate::workload::reservoir_push`, whose stream is gettid-seeded
781 /// thread-local (a merge run twice would otherwise differ). The triple-shift
782 /// mirrors the codebase's inline xorshift64 (`reservoir_push` /
783 /// `io::xorshift64`).
784 ///
785 /// Assumes `w_a + w_b < 2^64` — a realistic wake population is far below it
786 /// (2^64 wakeups is physically unreachable), so the single-u64 `s % total`
787 /// draw spans `[0, total)`. Callers gate on `a.len() + b.len() > cap`, which
788 /// (each input ≤ cap) guarantees both sources non-empty; the per-slot guards
789 /// below stay safe for a degenerate hand-built input regardless.
790 pub(crate) fn weighted_merge_reservoirs(
791 a: &[u64],
792 w_a: u64,
793 b: &[u64],
794 w_b: u64,
795 cap: usize,
796 ) -> Vec<u64> {
797 if a.is_empty() && b.is_empty() {
798 return Vec::new();
799 }
800 // Weights are the true populations; fall back to reservoir lengths if a
801 // (hand-built) carrier reports zero population alongside non-empty samples,
802 // keeping the split well-defined instead of dividing by a zero total. The
803 // mixed case (one weight 0, the other > 0) is left as-is: a zero weight
804 // sends every draw to the other source, the only defensible split for a
805 // source claiming zero population. Production maintains wake_sample_total
806 // >= len (reservoir_push counts every push), so neither edge is reachable
807 // on the capture path.
808 let (wa, wb) = if w_a == 0 && w_b == 0 {
809 (a.len() as u128, b.len() as u128)
810 } else {
811 (w_a as u128, w_b as u128)
812 };
813 let total = wa + wb;
814 // Loud-panic on the documented `w_a + w_b < 2^64` assumption (a realistic
815 // wake population is far below it): if total exceeded u64::MAX the
816 // `s as u128 % total` draw — s spans [0, 2^64) — could not reach
817 // [2^64, total) and would silently bias the source split. Matches the merge
818 // SUM's debug-panic-on-overflow discipline (loud over silently wrong).
819 debug_assert!(
820 total <= u64::MAX as u128,
821 "weighted_merge_reservoirs: w_a + w_b overflows u64 ({total}); source draw would bias",
822 );
823 // Golden-ratio Weyl multiplier (the codebase's standard PRNG seed mixer);
824 // a non-zero, input-derived seed makes the merge deterministic. xorshift64
825 // has 0 as a fixed point, hence the fallback.
826 const GOLDEN: u64 = 0x9E37_79B9_7F4A_7C15;
827 let mut s =
828 (w_a ^ w_b.rotate_left(32) ^ (a.len() as u64).rotate_left(16) ^ (b.len() as u64))
829 .wrapping_mul(GOLDEN);
830 if s == 0 {
831 s = GOLDEN;
832 }
833 let step = |x: u64| {
834 let mut v = x;
835 v ^= v << 13;
836 v ^= v >> 7;
837 v ^= v << 17;
838 v
839 };
840 let mut out = Vec::with_capacity(cap);
841 for _ in 0..cap {
842 s = step(s);
843 // Defensive empty-source guards: caller gates ensure both non-empty,
844 // but a stripped / zero-population fixture must never index an empty
845 // slice.
846 let from_a = if a.is_empty() {
847 false
848 } else if b.is_empty() {
849 true
850 } else {
851 (s as u128 % total) < wa
852 };
853 s = step(s);
854 if from_a {
855 out.push(a[(s % a.len() as u64) as usize]);
856 } else {
857 out.push(b[(s % b.len() as u64) as usize]);
858 }
859 }
860 out
861 }
862
863 /// Off-CPU% reduction for the per-phase per-cgroup render:
864 /// `(avg, min, max, spread)` over [`Self::off_cpu_pcts`], or `None` when
865 /// the vec is empty — the NOT-measured state (no worker had positive wall
866 /// time). Reduces the SAME per-worker pcts [`cgroup_stats`] reduces
867 /// (off_cpu_ns / wall_time_ns × 100), so for a phase spanning the whole run
868 /// it reproduces that whole-run reduction; `spread = max − min`.
869 /// `Some((0.0, ..))` is a MEASURED zero (distinct from the `None`
870 /// not-measured state), preserving the discipline the empty-vec contract on
871 /// `off_cpu_pcts` keeps. Display-only: never written back into a re-pool.
872 pub fn off_cpu_summary(&self) -> Option<(f64, f64, f64, f64)> {
873 let pcts = &self.off_cpu_pcts;
874 if pcts.is_empty() {
875 return None;
876 }
877 let min = pcts.iter().cloned().reduce(f64::min).expect("non-empty");
878 let max = pcts.iter().cloned().reduce(f64::max).expect("non-empty");
879 let avg = pcts.iter().sum::<f64>() / pcts.len() as f64;
880 Some((avg, min, max, max - min))
881 }
882
883 /// Wake-latency reduction for the per-phase render:
884 /// `(p99_us, median_us)` over the pooled [`Self::wake_latencies_ns`], or
885 /// `None` when the pool is empty. Nearest-rank percentile via `percentile`
886 /// (ns→µs once), reproducing [`cgroup_stats`]'s p99/median value-for-value
887 /// for the ≤cap pool (and the run-level re-pool's `reduce_sorted_distribution`).
888 /// Above `MAX_WAKE_SAMPLES` the pool is a distribution-preserving reservoir
889 /// subsample (see [`Self::wake_latencies_ns`]), so p99/median is then
890 /// distribution-equivalent, NOT byte-identical, to the full-pool reduction —
891 /// the rendered tail stays accurate, only exact parity is size-bounded.
892 /// `None`-on-empty omits the wake segment from the render rather than
893 /// painting a misleading 0µs (the display analogue of `cgroup_stats`'s
894 /// 0.0-sentinel, which has no Option to carry not-measured).
895 pub fn wake_summary(&self) -> Option<(f64, f64)> {
896 if self.wake_latencies_ns.is_empty() {
897 return None;
898 }
899 let mut sorted = self.wake_latencies_ns.clone();
900 sorted.sort_unstable();
901 let p99 = percentile(&sorted, 0.99) as f64 / 1000.0;
902 let median = percentile(&sorted, 0.5) as f64 / 1000.0;
903 Some((p99, median))
904 }
905
906 /// Timer-latency reduction for the per-phase metric emission:
907 /// `(median_us, p99_us, p999_us)` over the pooled
908 /// [`Self::timer_latencies_ns`] (nearest-rank, ns→µs), or `None` when the
909 /// pool is empty (the not-measured state — `write_carrier_scalars` omits all
910 /// three timer keys together). Reproduces the per-cgroup
911 /// [`crate::assert::cgroup_stats`] timer reductions value-for-value below the
912 /// reservoir cap (distribution-equivalent above it), exactly like
913 /// [`Self::wake_summary`]. The run-level WORST (max) is NOT here — it comes
914 /// from the cross-phase re-pool (`populate_run_distribution_metrics`).
915 pub fn timer_summary(&self) -> Option<(f64, f64, f64)> {
916 if self.timer_latencies_ns.is_empty() {
917 return None;
918 }
919 let mut sorted = self.timer_latencies_ns.clone();
920 sorted.sort_unstable();
921 let median = percentile(&sorted, 0.5) as f64 / 1000.0;
922 let p99 = percentile(&sorted, 0.99) as f64 / 1000.0;
923 let p999 = percentile(&sorted, 0.999) as f64 / 1000.0;
924 Some((median, p99, p999))
925 }
926
927 /// Wake-latency coefficient of variation (stddev / mean) over the pooled
928 /// [`Self::wake_latencies_ns`] (`n = len`), or `None` when the pool is
929 /// empty (the not-measured state — so the `run_metrics` carrier writer
930 /// omits all three wake keys together). `Some(0.0)` when
931 /// the mean is zero is a MEASURED zero. Reproduces [`cgroup_stats`]'s
932 /// `wake_latency_cv` value-for-value for the ≤cap pool (`Σv` in `u64`,
933 /// same accumulation); above `MAX_WAKE_SAMPLES` the pool is a
934 /// distribution-preserving reservoir subsample, so the CV is then
935 /// distribution-equivalent, not byte-identical.
936 pub fn wake_cv(&self) -> Option<f64> {
937 if self.wake_latencies_ns.is_empty() {
938 return None;
939 }
940 let n = self.wake_latencies_ns.len() as f64;
941 let mean_ns = self.wake_latencies_ns.iter().sum::<u64>() as f64 / n;
942 if mean_ns > 0.0 {
943 let variance = self
944 .wake_latencies_ns
945 .iter()
946 .map(|&v| (v as f64 - mean_ns).powi(2))
947 .sum::<f64>()
948 / n;
949 Some(variance.sqrt() / mean_ns)
950 } else {
951 Some(0.0)
952 }
953 }
954
955 /// Run-delay reduction for the per-phase render:
956 /// `(mean_us, worst_us)` over the per-worker [`Self::run_delays_ns`] (raw
957 /// ns), or `None` when empty. Divides ns→µs ONCE on the summed / maxed ns.
958 /// `worst` reproduces [`cgroup_stats`]'s value-for-value (`max(ns)/1000 ==
959 /// max(ns/1000)`, division is monotone). `mean` reproduces it to f64 ULP,
960 /// not bit-exactly: this f64-sums then divides once (`Σns/n/1000`), while
961 /// `cgroup_stats` divides each worker's ns by 1000 first then sums
962 /// (`Σ(ns/1000)/n`) — the same value reassociated, differing only
963 /// sub-display-precision (a divergent-input parity test bounds it at 1e-9).
964 /// Each sample is
965 /// one worker's whole-phase cumulative `sched_info.run_delay` delta, so
966 /// `mean` is the average per-worker total queued-to-run delay and `worst`
967 /// the largest. `None`-on-empty omits the segment.
968 pub fn run_delay_summary(&self) -> Option<(f64, f64)> {
969 if self.run_delays_ns.is_empty() {
970 return None;
971 }
972 let n = self.run_delays_ns.len() as f64;
973 // Sum in f64, NOT u64-then-cast: matches cgroup_stats's f64 accumulation
974 // and cannot integer-overflow (an f64 sum saturates toward +inf; a u64
975 // sum would panic in debug / silently wrap in release on a pathological
976 // pool). Values are identical within the documented 1e-9 ULP bound.
977 let mean = self.run_delays_ns.iter().map(|&v| v as f64).sum::<f64>() / n / 1000.0;
978 let worst = *self.run_delays_ns.iter().max().expect("non-empty") as f64 / 1000.0;
979 Some((mean, worst))
980 }
981}
982
983impl CgroupStats {
984 /// Wake-latency tail amplification:
985 /// `p99_wake_latency_us / median_wake_latency_us`. Returns `0.0`
986 /// when `median_wake_latency_us <= 0.0` so the result never
987 /// propagates `NaN` / `Infinity` into downstream
988 /// `finite_or_zero` filters. Method-only access (no stored
989 /// shadow) — recomputed every call from the raw fields.
990 ///
991 /// Unitless; ≥1.0 by definition of order statistics (p99 cannot
992 /// undershoot the median on the same sample set). Values far
993 /// above 1.0 signal a long tail — the scheduler wakes most
994 /// workers promptly but occasionally stalls some, a regression
995 /// axis that neither `median_*` nor `p99_*` exposes in
996 /// isolation.
997 pub fn wake_latency_tail_ratio(&self) -> f64 {
998 if self.median_wake_latency_us > 0.0 {
999 self.p99_wake_latency_us / self.median_wake_latency_us
1000 } else {
1001 0.0
1002 }
1003 }
1004
1005 /// Whether this cgroup measured the given distribution `source`. Gates the
1006 /// run-level carrier-less re-pool in [`populate_run_distribution_metrics`]
1007 /// so a cgroup that recorded no samples for `source` contributes ABSENCE
1008 /// (leaving the fold `None` when no cgroup measured it), not a sentinel
1009 /// `0.0`. See [`Self::wake_measured`] / [`Self::timer_measured`] /
1010 /// [`Self::run_delay_measured`].
1011 pub fn measured_for(&self, source: crate::stats::SampleSource) -> bool {
1012 match source {
1013 crate::stats::SampleSource::WakeLatencyNs => self.wake_measured,
1014 crate::stats::SampleSource::TimerLatencyNs => self.timer_measured,
1015 crate::stats::SampleSource::RunDelayNs => self.run_delay_measured,
1016 }
1017 }
1018
1019 /// Throughput per parallel degree:
1020 /// `total_iterations / num_workers`. `None` when
1021 /// `num_workers == 0` (no worker reported, so per-worker
1022 /// throughput is undefined — distinct from a measured zero);
1023 /// `Some(0.0)` when workers ran but completed zero iterations
1024 /// (a real throughput collapse). The `None` / `Some(0.0)` split
1025 /// is load-bearing: the run-level worst-cgroup re-pool in
1026 /// [`populate_run_distribution_metrics`] (the
1027 /// `MetricKind::WorstLowest` arm) must treat a measured zero as
1028 /// the worst reading (it wins the "lowest" bucket) while skipping
1029 /// a no-data cgroup — collapsing both to `0.0` would hide a
1030 /// starved cgroup behind the no-data sentinel. Method-only
1031 /// access (no stored shadow) — recomputed every call from the
1032 /// raw fields.
1033 ///
1034 /// Only meaningful across runs of the SAME variant (equal
1035 /// scenario duration): cross-variant comparison is misleading
1036 /// because this metric is NOT rate-normalized — a longer-
1037 /// running scenario racks up more iterations per worker even if
1038 /// the scheduler is identical. `perf-delta`-style
1039 /// comparisons hold scenario, topology, and work_type constant
1040 /// before reading this method.
1041 pub fn iterations_per_worker(&self) -> Option<f64> {
1042 crate::assert::reductions::iterations_per_worker_of(self.num_workers, self.total_iterations)
1043 }
1044
1045 /// Worker iterations per CPU-second of on-CPU time consumed by this
1046 /// cgroup's workers — `total_iterations / (total_cpu_time_ns / 1e9)`.
1047 ///
1048 /// Unlike [`Self::iterations_per_worker`] (raw work, which scales with
1049 /// the host-CPU budget delivered to the guest) and a wall-time rate
1050 /// (which also drops under host oversubscription), this is
1051 /// OVERCOMMIT-INVARIANT: under `cpu_budget < vcpus` a cell completes
1052 /// proportionally fewer iterations AND consumes proportionally less
1053 /// on-CPU time, so the ratio cancels the lost host-CPU-time factor. Use
1054 /// it to compare per-cgroup throughput across `cpu_budget` settings.
1055 ///
1056 /// `None` when `num_workers == 0` (no worker — undefined, distinct from a
1057 /// measured zero) or `total_cpu_time_ns == 0` (no on-CPU time captured;
1058 /// returns inconclusive rather than `Inf`). For a pure busy-spin
1059 /// workload this rate is ~constant by construction, so it measures
1060 /// CPU-time EFFICIENCY; for the cross-cell ALLOCATION balance use
1061 /// [`ScenarioStats::cgroup_balance_ratio`] over `iterations_per_worker`.
1062 pub fn iterations_per_cpu_sec(&self) -> Option<f64> {
1063 crate::assert::reductions::iterations_per_cpu_sec_of(
1064 self.num_workers,
1065 self.total_cpu_time_ns,
1066 self.total_iterations,
1067 )
1068 }
1069}
1070
1071/// Identifier for a scenario phase. Newtype over `u16` carrying
1072/// the same 1-indexed encoding documented on every other
1073/// phase-touching site: `Phase::BASELINE` is the pre-first-Step
1074/// settle window (`u16` 0); `Phase::step(k)` is scenario Step `k`
1075/// at 1-indexed `u16` `k + 1`. The newtype catches the bug class
1076/// where a raw `u16` flows between sites that disagree about
1077/// 0-indexed vs 1-indexed Step encoding, and gives operators
1078/// readable construction at consumer sites (`Phase::BASELINE` /
1079/// `Phase::step(2)` instead of magic `0u16` / `3u16`).
1080///
1081/// Wire-format identical to a `u16` via `#[serde(transparent)]` —
1082/// the on-disk sidecar shape is unchanged from the bare-`u16`
1083/// pipeline, and existing JSON / typeshare consumers see the same
1084/// scalar field. `.phase_raw()` exposes the inner `u16` for paths
1085/// that hand the value to a serializer or formatter that does not
1086/// understand the newtype.
1087#[derive(
1088 Debug,
1089 Clone,
1090 Copy,
1091 PartialEq,
1092 Eq,
1093 Hash,
1094 PartialOrd,
1095 Ord,
1096 Default,
1097 serde::Serialize,
1098 serde::Deserialize,
1099)]
1100#[serde(transparent)]
1101pub struct Phase(u16);
1102
1103impl Phase {
1104 /// Pre-first-Step settle window. The framework writes
1105 /// `Phase::BASELINE` to `Ctx::current_step` at scenario start
1106 /// (before any Step's `current_step.store` advance), so any
1107 /// capture taken before the first Step transition stamps with
1108 /// this value.
1109 pub const BASELINE: Self = Self(0);
1110
1111 /// Construct a `Phase` for the `zero_indexed`-th scenario Step.
1112 /// The 1-indexed encoding (Step 0 → `u16` 1, Step 1 → `u16` 2,
1113 /// ...) keeps `BASELINE` unambiguous at `u16` 0. Saturates at
1114 /// `u16::MAX` rather than overflowing — a scenario with > 65k
1115 /// Steps is pathological and the saturating value still
1116 /// distinguishes "well past any real Step" from BASELINE.
1117 pub const fn step(zero_indexed: u16) -> Self {
1118 Self(zero_indexed.saturating_add(1))
1119 }
1120
1121 /// True iff this is `Phase::BASELINE` (the pre-first-Step
1122 /// settle window).
1123 pub const fn is_baseline(&self) -> bool {
1124 self.0 == 0
1125 }
1126
1127 /// Inner `u16`. Use this when handing the value to a
1128 /// serializer / formatter / external consumer that does not
1129 /// understand the newtype. Production callers that build a
1130 /// `Phase` for downstream comparison should prefer
1131 /// `Phase::BASELINE` / `Phase::step(k)` over wrapping a raw
1132 /// `u16` themselves.
1133 pub const fn as_u16(self) -> u16 {
1134 self.0
1135 }
1136}
1137
1138impl std::fmt::Display for Phase {
1139 /// `"BASELINE"` for [`Phase::BASELINE`], `"Step[k]"` for
1140 /// [`Phase::step`] (decoded back via the 1-indexed
1141 /// encoding). Matches the labels [`PhaseBucket`] embeds in
1142 /// `label` so operators see consistent phase identifiers
1143 /// across structured-sidecar reads and ad-hoc `format!`
1144 /// output.
1145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1146 if self.is_baseline() {
1147 write!(f, "BASELINE")
1148 } else {
1149 write!(f, "Step[{}]", self.0 - 1)
1150 }
1151 }
1152}
1153
1154impl From<u16> for Phase {
1155 /// Wrap a raw 1-indexed encoded value as a [`Phase`]. Production
1156 /// paths that already have the encoded value (e.g. drained from
1157 /// the host-side mirror of `current_step`, or read out of a
1158 /// deserialized sidecar) construct the typed wrapper via this
1159 /// conversion without re-deriving the encoding.
1160 fn from(value: u16) -> Self {
1161 Self(value)
1162 }
1163}
1164
1165impl From<Phase> for u16 {
1166 fn from(value: Phase) -> Self {
1167 value.0
1168 }
1169}
1170
1171/// Per-phase metric bucket — one entry per scenario phase in
1172/// [`ScenarioStats::phases`].
1173///
1174/// A scenario with N Steps yields `N + 1` phases: phase 0 is the
1175/// BASELINE (pre-first-Step settle window), and phases 1..=N
1176/// correspond to Step 0..Step N-1 in scenario order. The
1177/// 1-indexed Step encoding (instead of 0-indexed) lets BASELINE
1178/// own `step_index = 0` unambiguously — a `step_index = 0` sample
1179/// is always settle, not first-Step.
1180///
1181/// Each bucket carries the metric values reduced over the phase's
1182/// sample window. For `crate::stats::MetricKind::Counter`
1183/// metrics the reduction is `last - first` across the phase's
1184/// periodic samples (cumulative-counter delta); for `Gauge` /
1185/// `Peak` / `Timestamp` it dispatches per the kind via
1186/// `crate::stats::aggregate_samples`. Missing metric keys mean
1187/// the phase had no finite samples for that metric.
1188///
1189/// Metric keys match `crate::stats::MetricDef::name` — see
1190/// `crate::stats::METRICS` for the canonical list of registered
1191/// metric names a `get` / `phase_metric` lookup expects.
1192#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
1193pub struct PhaseBucket {
1194 /// Phase index. `0` = BASELINE (pre-first-Step settle window).
1195 /// `1..=N` align with Step ordinals (1-indexed): Step 0 of the
1196 /// scenario lives at `step_index = 1`, Step 1 at
1197 /// `step_index = 2`, etc. The encoding avoids the collision
1198 /// where a 0-indexed Step would share `step_index = 0` with
1199 /// the BASELINE settle window.
1200 pub step_index: u16,
1201 /// Human-readable label. `"BASELINE"` for `step_index = 0`,
1202 /// `"Step[0]"` / `"Step[1]"` / ... for `step_index = 1..=N`.
1203 /// Mirrors the formatting used by
1204 /// `crate::timeline::Timeline`'s phase rendering so operator
1205 /// inspection of the formatted diagnostic and the structured
1206 /// sidecar yield the same phase identifiers.
1207 pub label: String,
1208 /// Phase window start: the MINIMUM per-sample time anchor in the
1209 /// phase — each sample's `boundary_offset_ms`, falling back to its
1210 /// `elapsed_ms`. Samples with neither anchor (both `None` — a
1211 /// not-measured timestamp) are excluded from the min.
1212 pub start_ms: u64,
1213 /// Phase window end: the MAXIMUM per-sample time anchor in the
1214 /// phase (the same `boundary_offset_ms`-or-`elapsed_ms` key as
1215 /// `start_ms`). A phase whose every sample is unanchored yields the
1216 /// inverted window `(start_ms = u64::MAX, end_ms = 0)`, which folds
1217 /// no monitor samples. Downstream renderers should not assume the
1218 /// value is closed against a stimulus event.
1219 pub end_ms: u64,
1220 /// Number of periodic samples bucketed into this phase. Zero
1221 /// when the phase fired no captures: BASELINE when the settle window
1222 /// was shorter than the periodic interval, OR a synthesized
1223 /// capture-free interior step (the
1224 /// `build_phase_buckets_with_stimulus` seam — a `StepStart`-step
1225 /// whose window held no periodic boundary still gets a bucket so its
1226 /// capture-independent `iteration_rate` is not dropped).
1227 pub sample_count: usize,
1228 /// Per-metric phase-aggregated values. See the [`PhaseBucket`]
1229 /// struct doc for the registry key source and per-kind reduction
1230 /// dispatch; missing keys mean the phase carried no finite
1231 /// samples for that metric (sentinel-free: `None` from the
1232 /// reducer surfaces as "key absent" rather than "value 0.0").
1233 pub metrics: std::collections::BTreeMap<String, f64>,
1234 /// Per-cgroup raw telemetry components for this phase, keyed by cgroup
1235 /// name (see [`PhaseCgroupStats`]). Empty until a capture path populates
1236 /// it; the structural carrier for the per-phase per-cgroup distributional
1237 /// re-pool. Whole-run = aggregate of these per-phase per-cgroup components.
1238 ///
1239 /// An ORPHAN bucket — a guest carrier whose `step_index` has NO paired host
1240 /// bucket (a dropped/absent StepStart frame, or a stimulus-less host/fixture
1241 /// path; NOT merely a short step, since `build_phase_buckets_with_stimulus`
1242 /// synthesizes a bucket for every StepStart so a captured-but-short step
1243 /// takes the matched arm) — is carried by
1244 /// `fold_guest_per_cgroup_into_host_buckets` with the shape
1245 /// `(start_ms, end_ms) == (0, 0)` AND empty `metrics` AND non-empty
1246 /// `per_cgroup` (it carries only these components). On every non-zero-duration
1247 /// window that shape is the orphan arm's, so the timeline render keys on it
1248 /// to surface "window not measured" rather than a misleading `0ms` (see
1249 /// `crate::timeline::phase_from_bucket`): a captured bucket has metrics. A
1250 /// zero-duration step at scenario start (`StepStart==StepEnd==0`) can also
1251 /// produce it via the matched arm, but harmlessly — a zero-duration step has
1252 /// no window, so "not measured" reads the same as "0ms".
1253 pub per_cgroup: std::collections::BTreeMap<String, PhaseCgroupStats>,
1254}
1255
1256impl PhaseBucket {
1257 /// Look up the phase-aggregated value for `metric_name` (see
1258 /// [`PhaseBucket::metrics`] for the registry source). Returns
1259 /// `None` when the phase carried no finite samples for that
1260 /// metric — distinct from `Some(0.0)` which means the reducer
1261 /// produced a real zero from finite samples.
1262 pub fn get(&self, metric_name: &str) -> Option<f64> {
1263 self.metrics.get(metric_name).copied()
1264 }
1265
1266 /// Like [`Self::get`], but panics with a diagnostic message citing
1267 /// the bucket's `step_index` + `label` + `sample_count` + the set
1268 /// of metric keys actually present when the metric is absent. Use
1269 /// when the caller knows the metric MUST be in the bucket (the
1270 /// phase fired samples and the metric is registered — see
1271 /// [`PhaseBucket::metrics`]) — the panic message tells the operator whether the cause is
1272 /// "phase produced no samples" (sample_count of 0) or "metric key
1273 /// typo" (positive sample_count but the key isn't in `metrics`).
1274 ///
1275 /// ```ignore
1276 /// let bucket = r.stats.phase(Phase::step(0)).expect("Step[0] phase");
1277 /// let throughput = bucket.expect_metric("throughput");
1278 /// ```
1279 pub fn expect_metric(&self, metric_name: &str) -> f64 {
1280 self.get(metric_name).unwrap_or_else(|| {
1281 panic!(
1282 "PhaseBucket::expect_metric: metric '{}' absent from phase \
1283 step_index={} ('{}') with sample_count={}. \
1284 metric keys present in this bucket: {:?}. \
1285 Possible causes: (a) phase carried 0 samples for this \
1286 metric (sample_count==0 means no captures landed in the \
1287 phase at all; sample_count>0 means captures landed but \
1288 the metric extracted no finite values from them); \
1289 (b) metric name typo — built-ins are typed via \
1290 crate::stats::BuiltinMetric (a typo is a compile \
1291 error); a dynamic key resolves via crate::stats::MetricId::def.",
1292 metric_name,
1293 self.step_index,
1294 self.label,
1295 self.sample_count,
1296 self.metrics.keys().collect::<Vec<_>>(),
1297 )
1298 })
1299 }
1300
1301 /// Cross-cgroup phase total for a per-cgroup Counter metric that lives
1302 /// in [`Self::per_cgroup`] but never in [`Self::metrics`] — currently
1303 /// `"total_migrations"`, `"total_iterations"`, and `"total_cpu_time_ns"`.
1304 ///
1305 /// These are registered `MetricKind::Counter`s
1306 /// (`crate::stats::METRICS`) whose per-sample source is absent
1307 /// (`crate::stats::MetricDef::read_sample` returns `None` — they are
1308 /// per-task guest counters not captured per tick), so
1309 /// [`crate::assert::build_phase_buckets`] never folds them into
1310 /// `metrics`; only the per-cgroup carrier ([`PhaseCgroupStats`], built
1311 /// by `phase_cgroup_stats`) holds them. This sums them across the
1312 /// phase's `per_cgroup` carriers — the SAME cross-cgroup Counter sum
1313 /// [`ScenarioStats::total_migrations`] takes run-level and
1314 /// `merge_matched_phase_buckets` takes per key — so the value is the
1315 /// phase total, not a per-cgroup fragment.
1316 ///
1317 /// `None` when the phase has no `per_cgroup` carriers (NOT measured —
1318 /// distinct from a measured `Some(0.0)` when carriers exist but counted
1319 /// zero) or `name` is not one of the per-cgroup-sourced counters.
1320 /// Surfacing them here never double-sources the run-level `ext_metrics`,
1321 /// but by two different mechanisms: `total_migrations` / `total_iterations`
1322 /// are in `TYPED_FIELD_NAMES` (`populate_run_ext_metrics_from_phases` skips
1323 /// them; the typed `GauntletRow` accessor stays the single run-level
1324 /// authority), while `total_cpu_time_ns` is NOT in `TYPED_FIELD_NAMES` —
1325 /// it is safe because it is never written into any pooled `metrics` map
1326 /// (its `MetricDef` accessor and `read_sample` both return `None`, so no
1327 /// run-level fold ever picks it up; it lives only on the per-cgroup
1328 /// carrier).
1329 ///
1330 /// Counterpart to [`Self::get`] (which reads `metrics` only).
1331 /// [`crate::vmm::VmResult::phase_metric`] falls back to this so a
1332 /// `post_vm` callback reading `phase_metric(phase, "total_migrations")`
1333 /// gets the value instead of a silent `None`.
1334 pub fn cgroup_counter_total(&self, name: &str) -> Option<f64> {
1335 let field: fn(&PhaseCgroupStats) -> u64 = match name {
1336 "total_migrations" => |c| c.total_migrations,
1337 "total_iterations" => |c| c.total_iterations,
1338 "total_cpu_time_ns" => |c| c.total_cpu_time_ns,
1339 _ => return None,
1340 };
1341 if self.per_cgroup.is_empty() {
1342 return None;
1343 }
1344 // saturating: overflow-safe cross-cgroup pool of the guest counter for
1345 // the post_vm by-name read (a wrapped sum would read silently-small).
1346 Some(
1347 self.per_cgroup
1348 .values()
1349 .map(field)
1350 .fold(0u64, u64::saturating_add) as f64,
1351 )
1352 }
1353}
1354
1355/// Merge two [`PhaseBucket`]s sharing the same `step_index` per
1356/// the per-MetricKind dispatch in [`crate::stats::MergeKind`].
1357/// Called by [`AssertResult::merge`] for matched buckets;
1358/// unmatched buckets are appended verbatim by the caller.
1359///
1360/// Window-invariant merge:
1361/// - `step_index`: equal by precondition (caller pairs buckets by
1362/// `step_index`), kept from `a`.
1363/// - `label`: kept from `a`. By construction the label is derived
1364/// purely from `step_index` (`"BASELINE"` / `"Step[k]"`) so both
1365/// sides agree.
1366/// - `start_ms`: `min(a.start_ms, b.start_ms)` so the merged
1367/// window covers the earliest start of either side.
1368/// - `end_ms`: `max(a.end_ms, b.end_ms)` so the merged window
1369/// covers the latest end. Drives the [`crate::stats::MergeKind::NonCommutative`]
1370/// tiebreak on Gauge(Last) / Timestamp metrics — the value
1371/// from the bucket whose `end_ms` is later wins.
1372/// - `sample_count`: `a + b`. Used as the weighting denominator
1373/// for the `MetricKind::Gauge(GaugeAgg::Avg)` weighted mean.
1374///
1375/// Per-metric merge dispatches on the metric's `crate::stats::MetricKind`
1376/// from the registry via [`crate::stats::metric_def`]:
1377/// - `MetricKind::Counter` → `a + b` (the two reduced values are
1378/// per-phase deltas; the merge across cgroups sums per-cgroup
1379/// contributions to the phase delta, mirroring how
1380/// `ScenarioStats::total_migrations` adds across cgroups).
1381/// - `MetricKind::Peak` and `MetricKind::Gauge(GaugeAgg::Max)` →
1382/// `max(a, b)` (the worst-case "peak that fired" survives).
1383/// - `MetricKind::Gauge(GaugeAgg::Avg)` → weighted mean
1384/// `(a * a_w + b * b_w) / (a_w + b_w)` where `a_w = a_count.max(1)`
1385/// and `b_w = b_count.max(1)` — the unbiased combination of both
1386/// sides' per-phase means weighted by sample population, each weight
1387/// floored at 1. The `.max(1)` floor (mirroring
1388/// `populate_run_ext_metrics_from_phases`) keeps a synthesized
1389/// zero-capture bucket's capture-independent Gauge(Avg) value
1390/// contributing one phase-observation of weight rather than being
1391/// zero-weighted out of
1392/// the merge — the silent-drop the synthesize seam exists to prevent.
1393/// With both counts > 0 the floor is a no-op (the plain
1394/// sample-population weighting); both counts zero degenerates to
1395/// `(a + b) / 2.0`.
1396/// - `MetricKind::Gauge(GaugeAgg::Last)` and `MetricKind::Timestamp`
1397/// → value from the bucket with the larger `end_ms`; ties keep
1398/// `a`'s value. Captures the "latest-sample-wins" semantic per
1399/// the [`crate::stats::MergeKind::NonCommutative`] contract.
1400/// - `MetricKind::Rate { .. }` → SKIPPED in the per-key fold and
1401/// re-derived from the pooled components by
1402/// [`crate::stats::derive_rate_metrics`] as a post-pass, so the
1403/// merged rate is `Σnumerator / Σdenominator` (each component
1404/// folds by its own kind first) rather than a fold of two
1405/// ready-made per-phase ratios.
1406///
1407/// Unregistered metric names (not in `crate::stats::METRICS`)
1408/// fall back to a commutative arithmetic mean
1409/// `(a + b) / 2.0`. The mean is the safest default for an unknown
1410/// kind: sum would over-count Gauge / Timestamp values, max would
1411/// lose Counter / Avg signal, and "last" requires a tiebreak the
1412/// caller can't compute without the kind. Producers attaching
1413/// unregistered metrics to a `PhaseBucket` should add them to
1414/// `METRICS` to get the typed merge instead of the fallback.
1415pub(crate) fn merge_matched_phase_buckets(a: PhaseBucket, b: PhaseBucket) -> PhaseBucket {
1416 assert_eq!(
1417 a.step_index, b.step_index,
1418 "merge_matched_phase_buckets: caller must pair by step_index",
1419 );
1420 let mut metrics = std::collections::BTreeMap::new();
1421 // Collect every key present on either side; iterate once,
1422 // dispatching per the kind of the key (or the unregistered
1423 // mean fallback) so the merge is single-pass.
1424 let mut keys: std::collections::BTreeSet<&String> = a.metrics.keys().collect();
1425 keys.extend(b.metrics.keys());
1426 for key in keys {
1427 // Derived metrics (every `is_derived()`: Rate / Distribution / WorstLowest /
1428 // WakeLatencyTailRatio / WorstCrossNodeRatio / PerPhase) are NOT merged
1429 // here: a Rate re-derives from the merged components in the post-pass
1430 // below; the distributional kinds (Distribution / WorstLowest /
1431 // WakeLatencyTailRatio / WorstCrossNodeRatio) re-pool run-level by
1432 // `populate_run_distribution_metrics` (they never appear in
1433 // phase.metrics — `aggregate_samples_for_phase` returns None — so this
1434 // skip is also a structural guard); PerPhase is re-derived by
1435 // `derive_phase_metrics`. Folding a ready-made derived value
1436 // would lose the re-pool.
1437 if crate::stats::metric_def(key).is_some_and(|m| m.kind.is_derived()) {
1438 continue;
1439 }
1440 let av = a.metrics.get(key).copied();
1441 let bv = b.metrics.get(key).copied();
1442 let merged = match (av, bv) {
1443 (Some(av), Some(bv)) => {
1444 let kind = crate::stats::metric_def(key).map(|m| m.kind);
1445 merge_metric_values(
1446 kind,
1447 av,
1448 bv,
1449 a.sample_count,
1450 b.sample_count,
1451 a.end_ms,
1452 b.end_ms,
1453 )
1454 }
1455 (Some(v), None) | (None, Some(v)) => v,
1456 (None, None) => continue,
1457 };
1458 metrics.insert(key.clone(), merged);
1459 }
1460 // Re-derive Rate metrics from the now-pooled components: each
1461 // component merged by its own kind above (a Counter numerator
1462 // summed), so the rate becomes Σnumerator / Σdenominator — the
1463 // correct re-pool, not a mean of the two phases' ready-made ratios.
1464 crate::stats::derive_rate_metrics(&mut metrics);
1465 // Union per_cgroup by cgroup name: a cgroup present on both sides folds
1466 // its raw components per PhaseCgroupStats::merge (concat samples, sum
1467 // counters, combine extremes); a cgroup on only one side is carried
1468 // verbatim. Empty ∪ empty = empty, so this is a no-op until a capture
1469 // path populates per_cgroup (the structural-carrier invariant).
1470 let mut per_cgroup = a.per_cgroup;
1471 for (name, b_cg) in b.per_cgroup {
1472 match per_cgroup.remove(&name) {
1473 Some(a_cg) => {
1474 per_cgroup.insert(name, PhaseCgroupStats::merge(a_cg, b_cg));
1475 }
1476 None => {
1477 per_cgroup.insert(name, b_cg);
1478 }
1479 }
1480 }
1481 PhaseBucket {
1482 step_index: a.step_index,
1483 label: a.label,
1484 start_ms: a.start_ms.min(b.start_ms),
1485 end_ms: a.end_ms.max(b.end_ms),
1486 sample_count: a.sample_count.saturating_add(b.sample_count),
1487 metrics,
1488 per_cgroup,
1489 }
1490}
1491
1492/// Fold the guest-collected per-phase `per_cgroup` carriers into the
1493/// host-rebuilt phase buckets, keyed by `step_index`.
1494///
1495/// The host rebuilds phase buckets from the periodic-capture series
1496/// (window + metric folds), but those buckets carry an empty `per_cgroup`
1497/// by construction. The guest collects per-cgroup RAW components per step
1498/// ([`crate::scenario::collect_handles`] under `collect_step`) into carrier
1499/// buckets whose only payload is `per_cgroup` — a merge-neutral
1500/// `(u64::MAX, 0)` window and empty `metrics`. Guest and host `step_index`
1501/// are the SAME 1-indexed value: the step loop stamps
1502/// `phase_step_index = step_idx + 1` onto BOTH the `StepStart` frames the
1503/// host rebuilds buckets from AND the `collect_step` carrier, so pairing by
1504/// `step_index` is exact and cannot drift.
1505///
1506/// Each guest carrier whose `step_index` matches a host bucket folds its
1507/// `per_cgroup` in via [`merge_matched_phase_buckets`] — a no-op on the
1508/// host's window (`min`/`max` against `MAX`/`0`), metrics (the carrier has
1509/// none, so each host key is carried verbatim), and `sample_count` (`+ 0`),
1510/// contributing ONLY the unioned `per_cgroup`. A guest `step_index` with no
1511/// host bucket — a DEFENSIVE case: the carrier's `step_index` has no `StepStart`
1512/// frame in the host stimulus timeline (a dropped/absent stimulus frame, or a
1513/// stimulus-less host/fixture path), since `build_phase_buckets_with_stimulus`
1514/// SYNTHESIZES a capture-free bucket for every StepStart-step, so a
1515/// captured-but-short step takes the matched arm above, not this one — is carried
1516/// verbatim with its window normalized to `(0, 0)` so duration consumers
1517/// (`end_ms - start_ms`) never underflow the merge-neutral sentinel — no
1518/// `per_cgroup` datum is silently dropped. With no guest carriers (a run
1519/// with no step-local cgroups) the host buckets pass through unchanged. The
1520/// returned vec is sorted by `step_index`.
1521pub(crate) fn fold_guest_per_cgroup_into_host_buckets(
1522 host_buckets: Vec<PhaseBucket>,
1523 guest_buckets: Vec<PhaseBucket>,
1524) -> Vec<PhaseBucket> {
1525 let host_len = host_buckets.len();
1526 // No-silent-drops: host buckets have unique step_index
1527 // (build_phase_buckets_with_stimulus emits one bucket per step_index), but
1528 // fold same-step_index duplicates via merge rather than a last-wins collect so
1529 // a future producer that violated the invariant DEGRADES to a merge, never a
1530 // silent release-mode drop. The debug_assert still trips loudly in test/debug.
1531 let mut by_idx: std::collections::BTreeMap<u16, PhaseBucket> =
1532 std::collections::BTreeMap::new();
1533 for b in host_buckets {
1534 match by_idx.remove(&b.step_index) {
1535 Some(existing) => {
1536 by_idx.insert(b.step_index, merge_matched_phase_buckets(existing, b));
1537 }
1538 None => {
1539 by_idx.insert(b.step_index, b);
1540 }
1541 }
1542 }
1543 debug_assert_eq!(
1544 by_idx.len(),
1545 host_len,
1546 "host buckets must have unique step_index; a collision merged (not dropped)",
1547 );
1548 for gb in guest_buckets {
1549 // Every guest carrier MUST carry the merge-neutral (u64::MAX, 0) sentinel
1550 // window (the step_per_cgroup_bucket invariant). Validate it BEFORE the
1551 // match so BOTH arms are guarded: the matched arm relies on the window
1552 // being merge-neutral (min/max no-op against the host window), and the
1553 // orphan arm normalizes it to (0,0). A future caller handing a
1554 // real-window carrier (incl. a duplicate orphan via the matched arm)
1555 // trips loudly instead of silently corrupting the merged window.
1556 debug_assert!(
1557 gb.start_ms == u64::MAX && gb.end_ms == 0,
1558 "guest carrier must carry the merge-neutral (u64::MAX, 0) window; got ({}, {})",
1559 gb.start_ms,
1560 gb.end_ms,
1561 );
1562 match by_idx.remove(&gb.step_index) {
1563 Some(hb) => {
1564 by_idx.insert(gb.step_index, merge_matched_phase_buckets(hb, gb));
1565 }
1566 None => {
1567 // Orphan arm: a guest carrier whose step_index has no host bucket.
1568 //
1569 // Invariant: build_phase_buckets_with_stimulus synthesizes a host
1570 // bucket for every StepStart-step, so a carrier whose step has a
1571 // StepStart frame always takes the matched arm above. This arm is
1572 // reached only by a carrier whose step has NO StepStart frame —
1573 // defensive, not produced by normal capture.
1574 //
1575 // Normalize the merge-neutral sentinel window to (0,0) so duration
1576 // consumers don't underflow it. The resulting (0,0)-window +
1577 // empty-metrics + non-empty-per_cgroup shape is the orphan
1578 // signature the timeline render keys on to show "window not
1579 // measured" instead of a misleading 0ms — the (0,0) means "no host
1580 // window known", NOT a measured zero-duration step.
1581 //
1582 // A zero-duration step at scenario start (StepStart==StepEnd==0)
1583 // produces the same shape via the matched arm, but harmlessly: a
1584 // zero-duration step has no window, so the render's "not measured"
1585 // reads the same as "0ms". See `crate::timeline::phase_from_bucket`.
1586 let mut orphan = gb;
1587 orphan.start_ms = 0;
1588 orphan.end_ms = 0;
1589 by_idx.insert(orphan.step_index, orphan);
1590 }
1591 }
1592 }
1593 by_idx.into_values().collect()
1594}
1595
1596/// Per-metric merge inner helper used by
1597/// [`merge_matched_phase_buckets`]. Dispatches on the metric's
1598/// `crate::stats::MetricKind` (or the unregistered fallback)
1599/// to combine two reduced values into one.
1600///
1601/// `a_count` / `b_count` are the source buckets' `sample_count`
1602/// fields, used as weights for `Gauge(Avg)`. `a_end_ms` /
1603/// `b_end_ms` are the source buckets' window-end timestamps,
1604/// used to pick the later sample for `Gauge(Last)` / `Timestamp`.
1605fn merge_metric_values(
1606 kind: Option<crate::stats::MetricKind>,
1607 a: f64,
1608 b: f64,
1609 a_count: usize,
1610 b_count: usize,
1611 a_end_ms: u64,
1612 b_end_ms: u64,
1613) -> f64 {
1614 use crate::stats::{GaugeAgg, MetricKind};
1615 match kind {
1616 // Counter (cumulative) and DeltaSum (sum of per-read deltas) merge
1617 // across AssertResults by summing the reduced values (commutative — see
1618 // MetricKind::merge_kind). PerPhaseDeltaSum shares the `a + b` arm, but
1619 // it is a run-wide POOLED per-phase scalar (injected once into the host
1620 // bucket, never per-cgroup), so this same-step-index merge is the
1621 // defensive path only — two real PerPhaseDeltaSum values are never
1622 // summed here.
1623 Some(MetricKind::Counter)
1624 | Some(MetricKind::DeltaSum)
1625 | Some(MetricKind::PerPhaseDeltaSum) => a + b,
1626 Some(MetricKind::Peak) | Some(MetricKind::Gauge(GaugeAgg::Max)) => a.max(b),
1627 Some(MetricKind::Gauge(GaugeAgg::Avg)) => {
1628 // Weight by sample_count, floored at 1: a sample_count==0
1629 // bucket carrying a capture-independent Gauge(Avg) value must
1630 // contribute one phase-observation of weight, not be
1631 // zero-weighted out of the merge. Mirrors the .max(1) floor in
1632 // populate_run_ext_metrics_from_phases. With both counts > 0
1633 // the floor is a no-op (the prior sample_count weighting);
1634 // with both 0 each still floors to weight 1, giving the
1635 // (a+b)/2 equal-weight mean (the aggregate_samples_weighted
1636 // zero-total-weight fallback is unreachable from here).
1637 // (iteration_rate — the original synthesized zero-capture case —
1638 // is now a MetricKind::Rate: merge_matched_phase_buckets skips it
1639 // via the Rate `continue` in its key loop (above this fn) and
1640 // re-pools it from its summed Counter components, so a Rate value
1641 // never reaches this Gauge(Avg) fold.)
1642 let a_w = a_count.max(1) as f64;
1643 let b_w = b_count.max(1) as f64;
1644 (a * a_w + b * b_w) / (a_w + b_w)
1645 }
1646 Some(MetricKind::Gauge(GaugeAgg::Last)) | Some(MetricKind::Timestamp) => {
1647 if b_end_ms > a_end_ms { b } else { a }
1648 }
1649 // Derived kinds (every `is_derived()`: Rate / Distribution / WorstLowest /
1650 // WakeLatencyTailRatio / WorstCrossNodeRatio / PerPhase /
1651 // PerRunDistribution) are skipped in the merge loop (see
1652 // `merge_matched_phase_buckets`'s `is_derived` continue) and produced
1653 // post-merge (`derive_rate_metrics` / `populate_run_distribution_metrics`
1654 // / `populate_run_pooled_schbench_distribution`), so a derived value never
1655 // reaches this per-value merge — folding a ready-made derived value
1656 // would lose the re-pool. (PerRunDistribution is additionally run-level
1657 // only: it never appears in a PhaseBucket, so it cannot reach this
1658 // per-phase-bucket merge at all.)
1659 Some(MetricKind::Rate { .. })
1660 | Some(MetricKind::Distribution { .. })
1661 | Some(MetricKind::WorstLowest { .. })
1662 | Some(MetricKind::WakeLatencyTailRatio)
1663 | Some(MetricKind::WorstCrossNodeRatio)
1664 | Some(MetricKind::PerPhase)
1665 | Some(MetricKind::PerRunDistribution) => unreachable!(
1666 "derived metrics (Rate/Distribution/WorstLowest/WakeLatencyTailRatio/WorstCrossNodeRatio/PerPhase/PerRunDistribution) are produced post-merge, not merged as values"
1667 ),
1668 // Unregistered metric: commutative mean fallback. Sum
1669 // would over-count Gauge values; max would lose Counter
1670 // signal; "last" needs a tiebreak the caller can't
1671 // compute without the kind. Mean is the safest commutative
1672 // default.
1673 None => (a + b) / 2.0,
1674 }
1675}