ktstr/assert/
stats_types.rs

1use super::*;
2
3/// Verdict for a single test scenario.
4///
5/// # Reading the verdict
6///
7/// Inspect the terminal verdict via [`Self::outcome`] (returns the
8/// folded [`Outcome`] enum) or the convenience accessors
9/// [`Self::is_pass`] / [`Self::is_fail`] / [`Self::is_inconclusive`] /
10/// [`Self::is_skip`]. Iterate the per-variant payloads via
11/// [`Self::failure_details`] (all [`Outcome::Fail`] payloads),
12/// [`Self::inconclusive_details`] (all [`Outcome::Inconclusive`]
13/// payloads), and [`Self::skip_details`] (all [`Outcome::Skip`]
14/// payloads). All four bool accessors mirror
15/// [`Outcome::is_pass`] / [`Outcome::is_fail`] /
16/// [`Outcome::is_inconclusive`] / [`Outcome::is_skip`].
17///
18/// # Recording outcomes
19///
20/// Producers use the atomic mutators [`Self::record_fail`] /
21/// [`Self::record_skip`] / [`Self::record_inconclusive`] /
22/// [`Self::record_pass`] (each pushes a single [`Outcome`] variant
23/// onto [`Self::outcomes`]) and the escape hatch
24/// [`Self::record_outcome`] for pre-folded values. Constructors
25/// [`Self::pass`] / [`Self::skip`] / [`Self::fail`] seed the
26/// outcomes vec with the corresponding variant; [`Self::pass`] is
27/// zero-allocation (empty vec; the Pass identity element).
28///
29/// **Wire-format stability**: this struct is postcard-serialized as
30/// part of the in-VM `MSG_TYPE_TEST_RESULT` payload and as
31/// sidecar artifacts under `~/.cache/ktstr`. The wire format is
32/// **not stable across crate versions** — pre-1.0, fields can be
33/// added, removed, or reshaped at any time, and old sidecars must
34/// be regenerated after upgrades (re-running the affected tests
35/// produces a fresh sidecar). Per the project's pre-1.0 no-compat
36/// stance, no `#[serde(default)]` shims are added for old payloads.
37#[must_use = "test verdict is lost if not checked"]
38#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
39pub struct AssertResult {
40    /// Recorded terminal verdicts in emission order, one entry per
41    /// check that explicitly called [`Self::record_pass`],
42    /// [`Self::record_skip`], [`Self::record_inconclusive`], or
43    /// [`Self::record_fail`] (plus the single entry seeded by
44    /// [`Self::skip`] / [`Self::fail`] constructors).
45    ///
46    /// **Empty `outcomes` is the Pass identity** — [`Self::pass`]
47    /// constructs with `outcomes: vec![]`, and [`Self::outcome`]
48    /// resolves an empty `outcomes` to [`Outcome::Pass`] via the
49    /// ordered `Fail > Inconclusive > Pass > Skip` precedence (NOT a
50    /// fold via [`Outcome::merge`]; see [`Self::outcome`]), so a
51    /// never-touched accumulator naturally resolves to Pass without
52    /// any allocation. `record_pass()` is
53    /// for the rare case where a test explicitly records a passing
54    /// check (e.g. per-check helpers that document what passed);
55    /// `pass()` is the zero-state "nothing failed so far"
56    /// constructor.
57    ///
58    /// The folded terminal verdict is computed by [`Self::outcome`]
59    /// per the precedence `Fail > Inconclusive > Pass > Skip`. Use
60    /// [`Self::is_pass`] / [`Self::is_fail`] /
61    /// [`Self::is_inconclusive`] / [`Self::is_skip`] for bool
62    /// checks; use [`Self::failure_details`] /
63    /// [`Self::inconclusive_details`] / [`Self::skip_details`] to
64    /// iterate the per-variant [`AssertDetail`] payloads.
65    pub outcomes: Vec<Outcome>,
66    /// Structured records of every passing claim. Counterpart to
67    /// [`Self::outcomes`]: where `outcomes` carries terminal-verdict
68    /// records (Fail/Inconclusive/Skip/Pass per-check), `passes` carries the
69    /// positive confirmations every comparator's pass arm emits via
70    /// [`Verdict`]'s `record_pass_unary` / `record_pass_binary`
71    /// helpers.
72    /// Empty in tests that don't exercise the structured-pass path
73    /// (the no-claim base case), populated whenever a [`Verdict`]
74    /// records claims. The auto-repro renderer iterates both vecs
75    /// to compose the bracketed phase-grouped output that surfaces
76    /// passing context alongside failing assertions.
77    ///
78    /// **Bounded by [`MAX_RECORDED_PASSES`]** — past that count,
79    /// further pushes drop on the floor and a single sentinel
80    /// record named [`PASSES_TRUNCATION_SENTINEL_NAME`] appears at
81    /// the tail. Use the sentinel-name check (not `len()`
82    /// arithmetic) to detect truncation.
83    ///
84    /// **Test-author convention**: do NOT pin `result.passes` shape
85    /// or contents in test assertions unless the test exists
86    /// specifically to verify the structured-pass surface (e.g.
87    /// the auto-repro renderer's own coverage tests). The field
88    /// exists for the renderer's consumption; pinning it
89    /// elsewhere makes the test surface viral — every new
90    /// comparator that fires under the test starts churning the
91    /// pin. Pin `outcome()`, `failure_details()`, and `measurements` for
92    /// scenario verification.
93    pub passes: Vec<PassDetail>,
94    /// Aggregated stats from all workers in this scenario.
95    pub stats: ScenarioStats,
96    /// Structured measurements attached via [`Self::note_value`] /
97    /// [`Verdict::note_value`]. Distinct from [`Self::outcomes`] —
98    /// outcomes carry typed verdict variants with `AssertDetail`
99    /// payloads for operator triage, `measurements` carries typed
100    /// `(key, NoteValue)` pairs for programmatic consumption (sidecar
101    /// parsers, `perf-delta`, regression dashboards).
102    pub measurements: std::collections::BTreeMap<String, NoteValue>,
103    /// Informational annotations attached via [`Self::note`] /
104    /// [`Verdict::note`]. Structurally separated from [`Self::outcomes`]
105    /// so the failure stream stays purely failure-shaped: sidecar
106    /// consumers iterating the failure payloads in `outcomes` count real failures without
107    /// the "forgot to filter notes" silent-miscount class of bug
108    /// that the prior `DetailKind::Note` variant on [`AssertDetail`]
109    /// invited. The auto-repro renderer surfaces these alongside the
110    /// failure summary so the operator still sees them on a failing
111    /// run.
112    pub info_notes: Vec<InfoNote>,
113}
114
115/// Per-cgroup statistics from worker telemetry.
116///
117/// # Percentile convention
118///
119/// `p99_wake_latency_us` and `median_wake_latency_us` are computed
120/// by `percentile` using the NEAREST-RANK (Type 1) definition:
121/// the value at `ceil(n * p) - 1` in sorted order. No interpolation
122/// between samples. This matches the percentile convention used
123/// throughout schbench and the BPF latency histograms the project
124/// cross-references, so a `ktstr` p99 reading aligns with a
125/// schbench `lat99` without adjustment. For small `n` (wake
126/// reservoirs cap at `MAX_WAKE_SAMPLES = 100_000` per worker —
127/// see `workload.rs`) nearest-rank is also numerically stable —
128/// interpolation between the two nearest ranks would be
129/// implementation-defined at sample-set boundaries.
130///
131/// # CV pooling scope
132///
133/// `wake_latency_cv` is POOLED across every sample from every
134/// worker in the cgroup, not a per-worker CV averaged back. That
135/// collapses per-worker dispersion into the cgroup-wide signal:
136/// two workers with uniformly low jitter but different means
137/// produce a high pooled CV (mean-shift between workers inflates
138/// stddev), while per-worker CV would show neither worker as
139/// bad. This is intentional for the fairness threshold
140/// (`max_wake_latency_cv`): a scheduler that gives worker A
141/// 10µs wakes and worker B 1ms wakes is failing fairness even if
142/// each worker on its own is tight. Tests comparing single-worker
143/// behavior should scope their assertions to per-worker data
144/// rather than this aggregate.
145///
146/// # Derived ratios
147///
148/// The following metrics are DERIVED rather than measured and live as `&self`
149/// methods, NOT as serde-serialized fields: [`Self::wake_latency_tail_ratio`]
150/// (= p99/median), [`Self::iterations_per_worker`]
151/// (= total_iterations/num_workers), and [`Self::iterations_per_cpu_sec`]
152/// (denominator [`Self::total_cpu_time_ns`]). Pre-1.0 cleanup eliminated the
153/// prior stored-field shadow and `derive_ratios` stamper. Consumers always recompute on read,
154/// so a hand-constructed fixture or a deserialized sidecar from an
155/// older build cannot silently carry a stale ratio. The run-level
156/// worst-cgroup tail ratio (`crate::stats::MetricKind::WakeLatencyTailRatio`,
157/// an `ext_metrics` entry) and the iterations efficiencies
158/// (`worst_iterations_per_worker` / `worst_iterations_per_cpu_sec`) are all
159/// re-pooled POST-merge by [`populate_run_distribution_metrics`] — the tail
160/// ratio as the max over [`Self::wake_latency_tail_ratio`] across per-cgroup
161/// [`Self`] entries, the efficiencies lowest-wins from
162/// [`Self::iterations_per_worker`] / [`Self::iterations_per_cpu_sec`].
163#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
164pub struct CgroupStats {
165    /// Cgroup name (the workload-handle label this telemetry belongs to),
166    /// or empty for unlabeled call sites (`collect_all`, bare
167    /// `assert_cgroup`). Set post-hoc by `collect_handles` (in
168    /// `crate::scenario`) where the name is in scope; `cgroup_stats`
169    /// itself has only the reports and leaves it empty. Lets a PASSING-run
170    /// consumer say which cgroup's work landed on which CPUs.
171    pub cgroup_name: String,
172    /// Number of workers in this cgroup.
173    pub num_workers: usize,
174    /// Distinct CPUs the workers in this cgroup actually ran on (union of
175    /// each [`crate::workload::WorkerReport::cpus_used`]). `num_cpus` is
176    /// its length, kept for the existing rollups; this set surfaces WHICH
177    /// CPUs (not just how many) on every run, pass or fail.
178    pub cpus_used: BTreeSet<usize>,
179    /// Distinct CPUs used across all workers in this cgroup
180    /// (`cpus_used.len()`).
181    pub num_cpus: usize,
182    /// Mean off-CPU percentage across workers (off_cpu_ns /
183    /// wall_time_ns * 100). `None` when no worker reported a
184    /// positive `wall_time_ns` (off-CPU% is undefined without wall
185    /// time) — distinct from `Some(0.0)`, a measured "never off
186    /// CPU". The `Option` keeps a not-measured cgroup from reading
187    /// as a perfectly-on-CPU one in the telemetry consumers
188    /// (`ScenarioStats.cgroups`).
189    pub avg_off_cpu_pct: Option<f64>,
190    /// Minimum off-CPU percentage across workers. `None` under the
191    /// same no-measurable-wall-time condition as `avg_off_cpu_pct`.
192    pub min_off_cpu_pct: Option<f64>,
193    /// Maximum off-CPU percentage across workers. `None` under the
194    /// same no-measurable-wall-time condition as `avg_off_cpu_pct`.
195    pub max_off_cpu_pct: Option<f64>,
196    /// `max_off_cpu_pct - min_off_cpu_pct`. Measures scheduling
197    /// fairness within the cgroup. `None` when off-CPU% was not
198    /// measured (no worker with positive wall time) — a not-measured
199    /// cgroup is inconclusive for fairness, NOT "spread 0 = perfectly
200    /// fair". `Some(0.0)` means a real measured zero spread.
201    pub spread: Option<f64>,
202    /// Longest scheduling gap across all workers (ms).
203    pub max_gap_ms: u64,
204    /// CPU where the longest scheduling gap occurred.
205    pub max_gap_cpu: usize,
206    /// Sum of CPU migration counts across all workers.
207    pub total_migrations: u64,
208    /// Migrations per iteration (total_migrations / total_iterations).
209    pub migration_ratio: f64,
210    /// 99th percentile wake latency across all workers (microseconds).
211    pub p99_wake_latency_us: f64,
212    /// Median wake latency across all workers (microseconds).
213    pub median_wake_latency_us: f64,
214    /// Coefficient of variation (stddev / mean) of wake latencies.
215    ///
216    /// Computed over the POOLED latency samples from every worker in
217    /// the cgroup, not as a mean of per-worker CVs. Per-worker
218    /// dispersion is therefore masked in both directions: one tight worker
219    /// pooled with one wildly variable worker can report a moderate CV that
220    /// looks healthier than the variable worker alone, while a mean shift
221    /// between individually tight workers inflates the pooled CV. Use
222    /// [`WorkerReport::wake_latencies_ns`] directly if per-worker CV is needed.
223    pub wake_latency_cv: f64,
224    /// Whether any worker in this cgroup recorded a wake-latency sample.
225    /// `false` makes the wake reductions above (`p99_wake_latency_us`,
226    /// `median_wake_latency_us`, `wake_latency_cv`) a not-measured sentinel
227    /// `0.0` rather than a measured zero — a percentile over zero samples is
228    /// undefined, not "instant wakes". The run-level distributional re-pool
229    /// (`populate_run_distribution_metrics`) reads this to EXCLUDE a
230    /// no-wake-sample cgroup from the cross-run mean instead of folding its
231    /// `0.0` in (which, for the LowerBetter wake metrics, would falsely drag
232    /// the mean toward "perfect"). Same not-measured-vs-measured-zero
233    /// discipline the off-CPU% `Option` fields above carry.
234    pub wake_measured: bool,
235    /// Median timer-latency across all workers (microseconds) — the
236    /// [`crate::workload::WorkType::TimerLatency`] cyclictest probe's per-cgroup
237    /// pooled reduction over
238    /// [`crate::workload::WorkerReport::timer_latencies_ns`]. `0.0` when no
239    /// worker recorded timer samples.
240    pub median_timer_latency_us: f64,
241    /// 99th-percentile timer-latency across all workers (microseconds). See
242    /// [`Self::median_timer_latency_us`].
243    pub p99_timer_latency_us: f64,
244    /// 99.9th-percentile (deep-tail) timer-latency across all workers
245    /// (microseconds). See [`Self::median_timer_latency_us`].
246    pub p999_timer_latency_us: f64,
247    /// Worst (maximum) timer-latency across all workers (microseconds). See
248    /// [`Self::median_timer_latency_us`].
249    pub worst_timer_latency_us: f64,
250    /// Whether any worker in this cgroup recorded a timer-latency sample.
251    /// `false` makes the timer reductions above a not-measured sentinel `0.0`
252    /// (no [`crate::workload::WorkType::TimerLatency`] worker ran), distinct
253    /// from a measured zero. Read by the run-level re-pool to EXCLUDE a
254    /// no-timer-sample cgroup from the cross-run mean. Mirrors
255    /// [`Self::wake_measured`] for the timer carrier.
256    pub timer_measured: bool,
257    /// Sum of iteration counts across all workers.
258    pub total_iterations: u64,
259    /// Sum of per-worker on-CPU time (nanoseconds), from each worker's
260    /// schedstat run time ([`crate::workload::WorkerReport::schedstat_cpu_time_ns`]
261    /// — `task->se.sum_exec_runtime`, the FIRST `/proc/<pid>/schedstat` field
262    /// (`sched_info` supplies only the run_delay/pcount fields 2/3, not the
263    /// on-CPU time), the summable per-thread proxy for the cgroup's
264    /// `cpu.stat usage_usec`).
265    /// Denominator for [`Self::iterations_per_cpu_sec`], the
266    /// overcommit-invariant per-cell rate. `0` when no worker reported on-CPU
267    /// time (the accessor then returns `None`).
268    pub total_cpu_time_ns: u64,
269    /// Mean schedstat run delay across workers (microseconds).
270    pub mean_run_delay_us: f64,
271    /// Worst schedstat run delay across workers (microseconds).
272    pub worst_run_delay_us: f64,
273    /// Whether this cgroup had any worker to measure run-delay from
274    /// (`!run_delays.is_empty()`, i.e. `num_workers > 0`) — `false` only for a
275    /// worker-less cgroup, keeping a degenerate empty cohort from folding a
276    /// sentinel `0.0` into the cross-run run-delay mean. Unlike wake/timer
277    /// (per-sample streams a running worker may never emit), run-delay is one
278    /// `sched_info.run_delay` value per worker, always present once a worker
279    /// exists: a worker that never queued reads a real measured `0.0`, not a
280    /// no-measurement sentinel. `sched_info.run_delay` accumulates whenever
281    /// `CONFIG_SCHED_INFO` is built in (compile-time — forced on in ktstr,
282    /// `select`ed by both `CONFIG_SCHEDSTATS` and `CONFIG_TASK_DELAY_ACCT`),
283    /// with no gate on the runtime `kernel.sched_schedstats` key (that key
284    /// gates only the `schedstat_*` rq/se aggregates, never `run_delay`), so
285    /// run-delay is genuinely measured on every ktstr run and worker-presence
286    /// is the correct measured predicate. Mirrors [`Self::wake_measured`] for
287    /// the run-delay carrier.
288    pub run_delay_measured: bool,
289    /// Fraction of pages on the expected NUMA node(s) (0.0-1.0).
290    /// Derived from `/proc/self/numa_maps` and the worker's
291    /// [`MemPolicy`](crate::workload::MemPolicy).
292    pub page_locality: f64,
293    /// Cross-node page migration ratio from `/proc/vmstat`
294    /// `numa_pages_migrated` delta divided by total allocated pages.
295    pub cross_node_migration_ratio: f64,
296    /// Whole-run taobench engine COUNTER aggregate pooled across this cgroup's
297    /// [`crate::workload::WorkType::Taobench`] workers (Σ ops, MAX wall window —
298    /// the window is shared by concurrent workers, per
299    /// [`crate::workload::WorkerReport::taobench_whole`]). `None` for every
300    /// non-taobench cgroup. A RAW carrier, like [`Self::total_iterations`] /
301    /// [`Self::total_cpu_time_ns`] — not a reduced ratio: the run-level
302    /// cross-cgroup pool [`crate::assert::populate_run_pooled_taobench`] folds it
303    /// into the `total_taobench_*` Counter components and the derived
304    /// `taobench_*_per_sec` / `taobench_hit_fraction` / `taobench_command_hit_rate`
305    /// Rates in [`Self::ext_metrics`] (whole-run keys visible to `--noise-adjust`
306    /// spread, unlike the per-phase `taobench_*_qps` which are
307    /// `MetricKind::PerPhase`). Whole-run, NOT summable from the per-phase
308    /// `PhaseCgroupStats::taobench` carriers (per-phase `elapsed_ns` is
309    /// MAX-merged across concurrent threads, so summing phase windows is the
310    /// wrong qps denominator), so the engine's authoritative whole-run aggregate
311    /// is shipped from the worker. Holds COUNTERS only
312    /// ([`TaobenchStats`](crate::workload::taobench::run::TaobenchStats)) — the
313    /// serve-latency histogram is per-phase data on `PhaseCgroupStats::taobench`,
314    /// and the whole-run serve distribution (`taobench_serve_*_us_whole`) is the
315    /// union of those per-phase histograms. `pub` (every `CgroupStats` field is
316    /// `pub` and the struct is preluded, so a test author can read the counters).
317    /// `#[claim(skip)]`: a raw aggregate carrier, not a test-author claim
318    /// surface — assertions run against the host-derived run-level `taobench_*`
319    /// Rate / serve-latency metrics, mirroring
320    /// [`crate::workload::WorkerReport::taobench_whole`].
321    #[claim(skip)]
322    pub taobench_whole: Option<crate::workload::taobench::run::TaobenchStats>,
323    /// Extensible metrics for the generic comparison pipeline.
324    pub ext_metrics: BTreeMap<String, f64>,
325}
326
327/// Per-phase per-cgroup raw telemetry components — the per-phase analogue of
328/// [`CgroupStats`]. Holds RAW components (sample vectors + counters), NOT the
329/// reduced ratios/percentiles [`CgroupStats`] computes, so whole-run and
330/// cross-run aggregates RE-POOL from the components at every level (the
331/// per-phase telemetry thesis: an aggregate is recomputed over the pooled
332/// components, never averaged from ready-made per-phase reductions — a
333/// percentile or weighted ratio cannot be recovered from per-phase scalars).
334/// Covers every TYPED [`CgroupStats`] reduction: avg/min/max off-CPU% and
335/// spread from `off_cpu_pcts`; p99/median/CV wake latency from
336/// `wake_latencies_ns`; mean/worst run-delay from `run_delays_ns`;
337/// migration_ratio, iterations_per_cpu_sec, iterations_per_worker,
338/// page_locality, cross_node_migration_ratio from their counter components;
339/// the COUPLED worst gap (ms + the CPU that owned it) from `max_gap_ms` /
340/// `max_gap_cpu`; cpus_used / num_cpus from `cpus_used`. EXCLUDES
341/// [`CgroupStats::ext_metrics`] (the generic extensible map — a per-phase
342/// per-cgroup custom metric is a future extension, not part of the typed
343/// carrier). Lives in [`PhaseBucket::per_cgroup`], keyed by cgroup name. The
344/// structural carrier is empty until a capture path populates it per phase.
345#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
346pub struct PhaseCgroupStats {
347    /// Worker count in this cgroup for the phase — the denominator for the
348    /// re-pooled per-worker iteration rate (`iterations_per_worker` =
349    /// `total_iterations` / this). This is a set CARDINALITY (`reports.len()`),
350    /// not a kernel counter, but it SUMs in `merge` because a single cgroup name
351    /// can emit MULTIPLE carriers in one step — `collect_handles` builds one per
352    /// `WorkloadHandle`, and a `CgroupDef` with several `WorkSpec` entries
353    /// (`.work(..).work(..)`) spawns one handle per `WorkSpec` under the same
354    /// name (`apply_setup`). Those carriers cover DISJOINT worker subsets, so the
355    /// cardinality of their union is the SUM (4 + 2 → 6), matching [`cgroup_stats`]
356    /// over the pooled reports (`reports.len()`); a MAX would understate the count
357    /// and inflate `iterations_per_worker`. (The disjointness is the real
358    /// justification — were carriers ever to overlap, the SUM would over-count.)
359    pub num_workers: usize,
360    /// Distinct CPUs the cgroup's workers ran on in the phase (union of each
361    /// worker's `cpus_used`). Re-pools [`CgroupStats::cpus_used`] / `num_cpus`
362    /// (= the set / its length) via a set UNION.
363    pub cpus_used: std::collections::BTreeSet<usize>,
364    /// Pooled per-wakeup latency samples (ns) across the cgroup's workers in
365    /// the phase, un-reduced so p99 / median / CV re-pool over the combined set.
366    /// The POOL is reservoir-capped at `MAX_WAKE_SAMPLES` (the per-worker bound,
367    /// re-applied when same-name carriers merge so the carrier payload stays
368    /// bounded on the size-limited guest bulk port — without it the pool would be
369    /// `workers × MAX_WAKE_SAMPLES`); `wake_sample_total` carries the true
370    /// pre-cap population. The CARRIER-level reductions divide by
371    /// `wake_latencies_ns.len()` (this capped pool size), NOT by
372    /// `wake_sample_total`: [`Self::wake_summary`] takes p99 / median over `len`,
373    /// and [`cgroup_stats`] computes `cv = stddev/mean` with
374    /// `n = all_latencies.len()`. The RUN-level cross-phase re-pool
375    /// ([`populate_run_distribution_metrics`]) instead population-WEIGHTS (see
376    /// the PARITY CONTRACT below): its CV / mean divide by Σ per-sample weights
377    /// (the reconstructed true population), which equals `len` only below the cap.
378    ///
379    /// PARITY CONTRACT (the one component whose parity is size-dependent): for
380    /// pools ≤ `MAX_WAKE_SAMPLES` the reservoir IS the full concatenation, so the
381    /// p99 / median / CV re-pool reproduces [`cgroup_stats`] VALUE-FOR-VALUE.
382    /// Above the cap the carrier holds a distribution-preserving reservoir
383    /// SUBSAMPLE while [`cgroup_stats`] reduces over the full per-worker concat,
384    /// so the re-pool is DISTRIBUTION-EQUIVALENT, not byte-identical (the bounded
385    /// bulk-port frame forbids carrying the full pool; staged reservoirs cannot be
386    /// byte-identical to a single full-pool reduction). This is BY DESIGN:
387    /// `cgroup_stats` stays the uncapped run-level authority (capping it to match
388    /// the carrier would discard most of a multi-worker cgroup's samples to chase
389    /// a sub-display-precision artifact), and the carrier's >cap merge is WEIGHTED
390    /// by `wake_sample_total` (`Self::weighted_merge_reservoirs`) so the subsample
391    /// is an UNBIASED sample of the combined population — no smaller-population
392    /// skew. Both layers de-skew the cap: the carrier MERGE weights by
393    /// `wake_sample_total` (`Self::weighted_merge_reservoirs`), and the
394    /// cross-PHASE run-level pool in `populate_run_distribution_metrics` weights
395    /// each phase carrier's samples by `wake_sample_total / wake_latencies_ns.len()`
396    /// (so a phase that exceeded the cap contributes by true population, not
397    /// capped length) and reduces with the weighted percentile / moments — the
398    /// prior length-weighted concat is gone. Below the cap every weight is 1.0,
399    /// so the weighted P99 / median / mean / worst are BYTE-identical to the
400    /// unweighted concat; the weighted CV matches only within ~1e-9 (it sums in
401    /// f64 where the unweighted path sums the mean in u64 — a weighted variance
402    /// cannot keep the u64 sum).
403    pub wake_latencies_ns: Vec<u64>,
404    /// True wakeup count before reservoir clamping (`wake_latencies_ns` is
405    /// capped), so the re-pool can report the real population size. An
406    /// intentional ADDITION over [`CgroupStats`] (which has no such field), NOT
407    /// a mirrored reduction — do not strip it in a strict-parity audit; it is
408    /// the only source of the true wakeup population once `wake_latencies_ns` is
409    /// reservoir-clamped, and it is for REPORTING, not the CV denominator.
410    pub wake_sample_total: u64,
411    /// Pooled per-timer-cycle latency samples (ns) across the cgroup's
412    /// [`crate::workload::WorkType::TimerLatency`] workers in the phase,
413    /// un-reduced so median / p99 / p999 / worst re-pool over the combined set.
414    /// Reservoir-capped at `MAX_WAKE_SAMPLES` on same-name-carrier merge exactly
415    /// like `wake_latencies_ns` (population-weighted >cap); `timer_sample_total`
416    /// carries the true pre-cap population. Distinct carrier from
417    /// `wake_latencies_ns`.
418    pub timer_latencies_ns: Vec<u64>,
419    /// True timer-cycle count before reservoir clamping (`timer_latencies_ns` is
420    /// capped), so the re-pool reports the real population. Mirrors
421    /// `wake_sample_total` for the timer carrier.
422    pub timer_sample_total: u64,
423    /// Pooled per-worker schedstat run-delay samples (RAW ns) for the phase,
424    /// un-reduced so mean / worst run-delay re-pool over the combined set; the
425    /// re-pool converts ns → µs to match [`CgroupStats`]'s run-delay-µs fields.
426    /// Stored as raw kernel ns (like `wake_latencies_ns`), not pre-converted,
427    /// per the raw-component thesis. GRANULARITY: unlike `wake_latencies_ns`
428    /// (one per WAKEUP), each entry here is ONE per-worker value — that
429    /// worker's `sched_info.run_delay` delta over the carrier's window: the
430    /// whole-run `schedstat_run_delay_ns` (end−start) for the step-local
431    /// carrier, or the per-phase delta for the backdrop slice carrier. So the
432    /// pool size is the worker
433    /// count, the mean is the average per-worker total queued-to-run delay, and
434    /// `worst_run_delay_us` selects the single worker with the largest total
435    /// queued-to-run delay (NOT the worst single dispatch).
436    pub run_delays_ns: Vec<u64>,
437    /// Per-worker off-CPU% samples for the phase, un-reduced. Carried for the
438    /// per-phase per-cgroup off-CPU% RENDER — the avg / min / max /
439    /// spread of the combined set. NOT consumed by the run-level
440    /// distributional re-pool: off-CPU% has no run-level Distribution metric
441    /// (off-CPU%/spread is intrinsically per-cgroup, so the run-level
442    /// `worst_spread` stays the cross-cgroup max of per-cgroup
443    /// [`CgroupStats::spread`] via the typed [`AssertResult::merge`] fold, not a
444    /// pooled distribution). An EMPTY vec is the not-measured state (no worker
445    /// with positive wall time), preserving the not-measured vs measured-zero
446    /// distinction [`CgroupStats`] keeps. Stored as raw samples, not pre-reduced
447    /// extremes, because the mean is unrecoverable from min/max alone for >2
448    /// workers. Each sample is `off_cpu_ns /
449    /// wall_time_ns * 100`, where `off_cpu_ns = wall_time_ns - cpu_time_ns` and
450    /// `cpu_time_ns` is the `CLOCK_THREAD_CPUTIME_ID` thread on-CPU time
451    /// (workload/worker `off_cpu_ns` at report build). `total_cpu_time_ns` is a
452    /// DISTINCT on-CPU measurement (`schedstat_cpu_time_ns`, the `/proc`
453    /// schedstat `se.sum_exec_runtime`): both ultimately track on-CPU runtime but
454    /// are sampled at different points (the `CLOCK_THREAD_CPUTIME_ID` read folds
455    /// the in-flight delta; the schedstat field reads the stored value), so the
456    /// two need not be byte-identical and must not be cross-wired in a re-pool.
457    pub off_cpu_pcts: Vec<f64>,
458    /// Sum of per-worker CPU-migration counts in the phase (Counter).
459    pub total_migrations: u64,
460    /// Sum of per-worker iteration counts in the phase (Counter).
461    pub total_iterations: u64,
462    /// Sum of per-worker on-CPU time (ns) in the phase — the
463    /// overcommit-invariant rate denominator (Counter). Sourced from
464    /// `schedstat_cpu_time_ns` (the `/proc` schedstat `se.sum_exec_runtime`,
465    /// rq-charged on-CPU ns) — a DISTINCT on-CPU-time sample from the
466    /// `CLOCK_THREAD_CPUTIME_ID` time behind `off_cpu_pcts` (different sample
467    /// point; not byte-identical), so do not cross-wire the two in a re-pool.
468    pub total_cpu_time_ns: u64,
469    /// Pages on the expected NUMA node(s) — page-locality numerator. A per-task
470    /// `/proc/self/numa_maps` residency GAUGE (current snapshot of the task's mm,
471    /// recomputed each read — the kernel zeroes and re-walks the page tables),
472    /// SPATIALLY summed across the cgroup's workers within a phase: disjoint-mm
473    /// under the `CloneMode::Fork` default (the true cgroup total), but
474    /// `CloneMode::Thread` siblings share one mm and the SUM over-counts shared
475    /// pages once per thread (caveat inherited from `WorkerReport::numa_pages`).
476    /// The CROSS-PHASE fold takes the LATEST measured snapshot (see
477    /// `numa_agg_per_cgroup`), never a sum (summing residency across phases
478    /// over-counts by the phase count).
479    pub numa_pages_local: u64,
480    /// Total allocated pages — the SHARED denominator for BOTH page_locality
481    /// (`numa_pages_local` / this) AND cross_node_migration_ratio
482    /// (`cross_node_migrated` / this). A per-task `/proc/self/numa_maps` residency
483    /// GAUGE (same class/folds as `numa_pages_local`: within-phase SUM across
484    /// workers — disjoint-mm under the `CloneMode::Fork` default, Thread-mode
485    /// over-count caveat inherited from `WorkerReport::numa_pages` — cross-phase
486    /// LATEST snapshot); the kernel computes both ratios over the identical page
487    /// total, so one field serves both — a separate cross_node_total would invite
488    /// a silent desync.
489    pub numa_pages_total: u64,
490    /// Cross-node migrated pages — cross_node_migration_ratio numerator
491    /// (denominator is `numa_pages_total`). A SYSTEM-WIDE
492    /// `/proc/vmstat numa_pages_migrated` monotonic-COUNTER delta each worker
493    /// observes redundantly, so the within-phase fold is MAX across
494    /// workers/sources (summing would inflate it by the worker count — mirrors
495    /// [`CgroupStats`]'s deliberate max-fold); the CROSS-PHASE fold SUMs the
496    /// per-phase deltas over disjoint intervals to the run total.
497    pub cross_node_migrated: u64,
498    /// Longest scheduling gap (ms) across the cgroup's workers in the phase,
499    /// coupled with `max_gap_cpu`. A Peak folded as an ARGMAX of the (ms, cpu)
500    /// pair so the worst gap and its CPU survive together — mirrors
501    /// [`CgroupStats`]'s `max_gap_ms` / `max_gap_cpu` coupling (a bare
502    /// independent max would desync the gap from its CPU).
503    pub max_gap_ms: u64,
504    /// CPU that owned the worst scheduling gap — `max_gap_ms`'s argmax
505    /// companion. Folded together with `max_gap_ms`, never independently.
506    pub max_gap_cpu: usize,
507    /// True when this carrier's raw sample vectors (`wake_latencies_ns` /
508    /// `timer_latencies_ns` / `run_delays_ns` / `off_cpu_pcts`, plus the
509    /// `schbench` histograms) were dropped by
510    /// `AssertResult::strip_phase_cgroup_samples` to fit the size-limited guest
511    /// bulk frame — distinct from a carrier that genuinely measured no samples.
512    /// The reduced counters survive; only the per-phase distribution render
513    /// loses its source, so the render shows "samples stripped" rather
514    /// than the not-measured "n/a". Defaults to `false` (not stripped) and is set
515    /// only on a carrier that actually HAD samples to drop; ORs across `merge` so
516    /// a merged carrier is stripped if either input was.
517    pub stripped: bool,
518    /// Per-cgroup DERIVED scalar metrics for this (phase, cgroup), keyed by
519    /// `crate::stats::MetricDef` name — the per-cgroup analog of
520    /// [`PhaseBucket::metrics`] (which is the pooled-across-cgroups set). Populated
521    /// post-fold by `derive_phase_metrics` (both the schbench per-phase family AND
522    /// the non-schbench families — wake p99/median/cv, mean/max run-delay,
523    /// avg/min/max/spread off-CPU%, and the migration / iterations / locality
524    /// ratios) from the SAME reducers that fill
525    /// the pooled map, so a
526    /// test can query "metric M of cgroup C in phase P" as readily as the phase
527    /// aggregate (N cgroups -> N queryable sets + the pooled aggregate). DERIVED, not a
528    /// raw component: `PhaseCgroupStats::merge` leaves it empty and it is (re)derived
529    /// POST-merge, exactly as the pooled map skips is_derived keys in
530    /// `merge_matched_phase_buckets`. ALWAYS serialized (no `skip_serializing_if`):
531    /// PhaseCgroupStats rides the postcard bulk-TLV port, a NON-self-describing
532    /// POSITIONAL format — a conditionally-omitted field desyncs the byte stream and
533    /// corrupts the fields after it (here `schbench`), so the field must always be
534    /// present. No `serde(default)` either: pre-1.0, old sidecar/cache data is
535    /// disposable and regenerates (no compat shim). Read via [`Self::get`]; the
536    /// `crate::Claim` derive skips a `BTreeMap` field (matching
537    /// [`PhaseBucket::metrics`], which has no Claim accessor either).
538    pub metrics: std::collections::BTreeMap<String, f64>,
539    /// Per-phase schbench engine metrics for a `WorkType::Schbench` backdrop
540    /// cgroup (`None` for every non-schbench carrier). Pooled across the
541    /// cgroup's workers by [`PhaseCgroupStats::merge`] (histogram
542    /// bucket-add + integer-add of the run-delay raw pairs). The schbench
543    /// per-phase derivation reads it, pools across cgroups, and derives the
544    /// per-phase percentile / run-delay-mean / loop-count scalars into
545    /// [`PhaseBucket::metrics`]. `pub(crate)`: the element type is `pub(crate)`
546    /// and a per-phase carrier is internal — test authors read `PhaseBucket`,
547    /// not this. Non-`pub` so the `crate::Claim` derive skips it (a percentile
548    /// histogram has no meaningful scalar claim accessor).
549    pub(crate) schbench: Option<crate::workload::schbench::run::SchbenchPhaseStats>,
550    /// Per-phase taobench engine metrics for a `WorkType::Taobench` backdrop
551    /// cgroup (`None` for every non-taobench carrier). Pooled across the cgroup's
552    /// workers by [`PhaseCgroupStats::merge`] (counter-add, wall-window MAX, serve
553    /// histograms unioned). The taobench per-phase derivation reads it, pools
554    /// across cgroups, and derives the per-phase qps / hit-ratio / serve-latency
555    /// scalars into [`PhaseBucket::metrics`]. `pub(crate)`: an internal carrier.
556    pub(crate) taobench: Option<crate::workload::taobench::run::TaobenchPhaseStats>,
557}
558
559impl PhaseCgroupStats {
560    /// Component-wise union of two per-phase per-cgroup data for the SAME
561    /// cgroup name (same `step_index`). Fold rule by component class:
562    /// - sample vectors (`wake_latencies_ns`, `run_delays_ns`, `off_cpu_pcts`)
563    ///   CONCAT, so the re-pool sees the combined set, never a mean of
564    ///   per-source reductions;
565    /// - the CPU set (`cpus_used`) UNIONs;
566    /// - genuine Counters (`num_workers`, `wake_sample_total`,
567    ///   `total_migrations`, `total_iterations`, `total_cpu_time_ns`) SUM —
568    ///   `num_workers` included, because a multi-`WorkSpec` cgroup emits one
569    ///   carrier per handle covering DISJOINT worker subsets, so summing
570    ///   reproduces the pooled count (see the `num_workers` field doc);
571    /// - the residency GAUGEs (`numa_pages_local`, `numa_pages_total`) also SUM
572    ///   here — the WITHIN-PHASE spatial sum across the cgroup's workers
573    ///   (disjoint-mm under the `CloneMode::Fork` default; Thread-mode shares one
574    ///   mm and over-counts, the caveat inherited from `WorkerReport::numa_pages`),
575    ///   NOT a cross-phase sum (the cross-phase fold takes the LATEST snapshot in
576    ///   `numa_agg_per_cgroup`);
577    /// - `cross_node_migrated`, a system-wide vmstat monotonic-Counter delta each
578    ///   worker observes redundantly, takes the MAX across workers (summing would
579    ///   inflate it); its cross-phase fold SUMs the per-phase deltas;
580    /// - the COUPLED worst gap (`max_gap_ms`, `max_gap_cpu`) folds as an
581    ///   ARGMAX — the pair from whichever side has the larger ms (b's on tie,
582    ///   matching the builders' `max_by_key` last-wins) so the gap and its CPU
583    ///   stay bound together.
584    ///
585    /// The counter SUMs use plain `+`: debug builds panic on overflow rather
586    /// than wrapping. The realistic magnitudes (iteration / ns counts far
587    /// below `u64::MAX` even pooled across a long run) keep overflow
588    /// unreachable; a loud debug panic is preferred over a silently wrong
589    /// re-pool denominator.
590    pub(crate) fn merge(a: PhaseCgroupStats, b: PhaseCgroupStats) -> PhaseCgroupStats {
591        // Merge the two capped wake-latency reservoirs. Same-name carriers (a
592        // multi-`WorkSpec` cgroup's per-handle carriers) merge ON THE GUEST before
593        // the AssertResult is serialized over the bulk port, so K carriers must
594        // not concat to K × MAX_WAKE_SAMPLES (it could overrun the 16 MiB frame,
595        // flipping a PASS to a truncated FAIL).
596        //
597        // ≤cap: the concatenation IS the true combined population, so it passes
598        // through unchanged — value-for-value parity with cgroup_stats for small
599        // pools (only >cap pools become a subsample; see the `wake_latencies_ns`
600        // field doc). >cap: a WEIGHTED reservoir merge weighted by each carrier's
601        // true pre-cap population (`wake_sample_total`), so the merged sample is an
602        // UNBIASED uniform sample of the combined population — NOT the
603        // smaller-population-skewed reservoir-of-reservoirs an unweighted
604        // concat-and-re-cap produced (which weighted by reservoir LENGTH ≈ 50/50,
605        // ignoring the true populations).
606        let cap = crate::workload::MAX_WAKE_SAMPLES;
607        let wake_latencies_ns = if a.wake_latencies_ns.len() + b.wake_latencies_ns.len() <= cap {
608            let mut v = a.wake_latencies_ns;
609            v.extend(b.wake_latencies_ns);
610            v
611        } else {
612            Self::weighted_merge_reservoirs(
613                &a.wake_latencies_ns,
614                a.wake_sample_total,
615                &b.wake_latencies_ns,
616                b.wake_sample_total,
617                cap,
618            )
619        };
620        // Same bounded merge for the distinct timer reservoir:
621        // <=cap concatenation is value-for-value; >cap a population-weighted
622        // reservoir merge keeps it an unbiased sample of the combined population.
623        let timer_latencies_ns = if a.timer_latencies_ns.len() + b.timer_latencies_ns.len() <= cap {
624            let mut v = a.timer_latencies_ns;
625            v.extend(b.timer_latencies_ns);
626            v
627        } else {
628            Self::weighted_merge_reservoirs(
629                &a.timer_latencies_ns,
630                a.timer_sample_total,
631                &b.timer_latencies_ns,
632                b.timer_sample_total,
633                cap,
634            )
635        };
636        let mut run_delays_ns = a.run_delays_ns;
637        run_delays_ns.extend(b.run_delays_ns);
638        let mut off_cpu_pcts = a.off_cpu_pcts;
639        off_cpu_pcts.extend(b.off_cpu_pcts);
640        let mut cpus_used = a.cpus_used;
641        cpus_used.extend(b.cpus_used);
642        // Coupled worst-gap ARGMAX: take the (ms, cpu) pair together from the
643        // side with the larger gap (b's on tie, matching the builders'
644        // max_by_key last-wins) so the CPU stays bound to the gap it owned — a
645        // bare independent max would desync them. The last-wins tie-break is
646        // parity-coupled to fold order: AssertResult::merge folds same-name
647        // carriers in the order reports are pooled (handle iteration order), so
648        // on an equal-gap tie this yields the same CPU as a single cgroup_stats
649        // over the concatenated reports. A reordered fold would break that parity.
650        let (max_gap_ms, max_gap_cpu) = if b.max_gap_ms >= a.max_gap_ms {
651            (b.max_gap_ms, b.max_gap_cpu)
652        } else {
653            (a.max_gap_ms, a.max_gap_cpu)
654        };
655        // Per-phase schbench: OR-with-combine. Both Some → merge the pooled
656        // histograms + integer-add the run-delay raw pairs / loop_count
657        // (SchbenchPhaseStats::merge, the SAME associative+commutative op the
658        // guest engine uses to pool across message threads); one Some → carry
659        // it; both None (non-schbench cgroup) → None.
660        let schbench = match (a.schbench, b.schbench) {
661            (Some(mut x), Some(y)) => {
662                x.merge(&y);
663                Some(x)
664            }
665            (Some(x), None) | (None, Some(x)) => Some(x),
666            (None, None) => None,
667        };
668        // Per-phase taobench: OR-with-merge — both Some → counter-add + wall-window
669        // MAX + serve-latency histogram union (TaobenchPhaseStats::merge, the SAME
670        // op the guest engine pools with); one Some → carry; both None
671        // (non-taobench cgroup) → None.
672        let taobench = match (a.taobench, b.taobench) {
673            (Some(mut x), Some(y)) => {
674                x.merge(&y);
675                Some(x)
676            }
677            (Some(x), None) | (None, Some(x)) => Some(x),
678            (None, None) => None,
679        };
680        // saturating_add: pool the guest-runtime monotonic counters/times (the
681        // sample totals, migrations, iterations, cpu-time-ns, numa pages) across
682        // cgroups. Overflow is physically unreachable for honest data (real
683        // counts << u64::MAX), but a corrupt/hostile guest value would otherwise
684        // debug-panic or release-wrap to a silently-wrong derived metric;
685        // saturating is exact for every in-range value. num_workers is a
686        // ktstr-configured TOPOLOGY count (bounded by the guest CPU count, not a
687        // runtime accumulator), so it cannot overflow and keeps plain `+`.
688        // cross_node_migrated pools as MAX (a system-wide vmstat snapshot shared
689        // by the concurrent cgroups), not a sum.
690        PhaseCgroupStats {
691            num_workers: a.num_workers + b.num_workers,
692            cpus_used,
693            wake_latencies_ns,
694            wake_sample_total: a.wake_sample_total.saturating_add(b.wake_sample_total),
695            timer_latencies_ns,
696            timer_sample_total: a.timer_sample_total.saturating_add(b.timer_sample_total),
697            run_delays_ns,
698            off_cpu_pcts,
699            total_migrations: a.total_migrations.saturating_add(b.total_migrations),
700            total_iterations: a.total_iterations.saturating_add(b.total_iterations),
701            total_cpu_time_ns: a.total_cpu_time_ns.saturating_add(b.total_cpu_time_ns),
702            numa_pages_local: a.numa_pages_local.saturating_add(b.numa_pages_local),
703            numa_pages_total: a.numa_pages_total.saturating_add(b.numa_pages_total),
704            cross_node_migrated: a.cross_node_migrated.max(b.cross_node_migrated),
705            max_gap_ms,
706            max_gap_cpu,
707            stripped: a.stripped || b.stripped,
708            // DERIVED, not a raw component: left EMPTY here and (re)derived post-merge
709            // by derive_phase_metrics (the fold runs derive AFTER merging, the
710            // same reason the pooled map skips is_derived keys in
711            // merge_matched_phase_buckets). Folding stale per-operand metrics would
712            // double-count; the post-merge re-derive is the sole producer.
713            metrics: std::collections::BTreeMap::new(),
714            schbench,
715            taobench,
716        }
717    }
718
719    /// Look up this cgroup's per-phase DERIVED value for `metric_name` — the
720    /// per-cgroup analog of [`PhaseBucket::get`] (see [`Self::metrics`]). `None`
721    /// when this cgroup carried no finite samples for the metric (the ABSENT
722    /// discipline), distinct from `Some(0.0)` (a real reducer zero).
723    pub fn get(&self, metric_name: &str) -> Option<f64> {
724        self.metrics.get(metric_name).copied()
725    }
726
727    /// Like [`Self::get`] but panics citing the metric keys actually present when
728    /// the metric is absent — use when the caller knows this cgroup MUST carry the
729    /// metric in the phase.
730    pub fn expect_metric(&self, metric_name: &str) -> f64 {
731        self.get(metric_name).unwrap_or_else(|| {
732            panic!(
733                "PhaseCgroupStats::expect_metric: metric '{}' absent from this \
734                 per-cgroup carrier (num_workers={}, stripped={}); keys present: {:?}. \
735                 Causes: (a) the cgroup produced no finite samples for it; (b) metric \
736                 name typo; (c) a non-schbench carrier has no schbench keys.",
737                metric_name,
738                self.num_workers,
739                self.stripped,
740                self.metrics.keys().collect::<Vec<_>>(),
741            )
742        })
743    }
744
745    /// The per-cgroup analog of [`PhaseBucket::cgroup_counter_total`] for the
746    /// three per-cgroup Counters that live ONLY on the carrier (no derived
747    /// `metrics` entry): `total_migrations`, `total_iterations`, and
748    /// `total_cpu_time_ns`. Lets
749    /// [`crate::vmm::VmResult::phase_cgroup_metric`] expose them by metric name
750    /// symmetrically with the pooled [`crate::vmm::VmResult::phase_metric`]
751    /// `cgroup_counter_total` fallback (the pooled `_total` suffix marks the
752    /// cross-cgroup SUM; this per-cgroup form returns one cgroup's value). `None`
753    /// for any other name (derived metrics are read via [`Self::get`]).
754    pub fn cgroup_counter(&self, name: &str) -> Option<f64> {
755        match name {
756            "total_migrations" => Some(self.total_migrations as f64),
757            "total_iterations" => Some(self.total_iterations as f64),
758            "total_cpu_time_ns" => Some(self.total_cpu_time_ns as f64),
759            _ => None,
760        }
761    }
762
763    /// Merge two CAPPED uniform reservoirs into one of size ≤ `cap` that is a
764    /// uniform sample of the COMBINED population. `a` is a uniform reservoir of
765    /// `w_a` true samples, `b` of `w_b` (their `wake_sample_total` weights). Each
766    /// output slot is drawn from `a` with probability `w_a / (w_a + w_b)` and from
767    /// `b` otherwise; within a source the index is uniform. Composing the
768    /// source-level uniform reservoir with the within-source uniform draw makes
769    /// each output a uniform draw from the combined population, so the merged
770    /// A-fraction is the TRUE `w_a / (w_a + w_b)`. This removes the equal-slot
771    /// ("reservoir-of-reservoirs") skew an unweighted concat-and-re-cap imposes:
772    /// two already-capped inputs concat ≈ 50/50 by LENGTH regardless of their true
773    /// populations, over-counting the smaller-population carrier. Sampling WITH
774    /// replacement is the correct estimator once the inputs are capped (each
775    /// reservoir element stands for `w/len` population units; the pre-cap samples
776    /// are gone).
777    ///
778    /// DETERMINISTIC: the xorshift64 stream is seeded from the inputs (populations +
779    /// lengths) so the merge is a PURE function of its arguments — unlike
780    /// `crate::workload::reservoir_push`, whose stream is gettid-seeded
781    /// thread-local (a merge run twice would otherwise differ). The triple-shift
782    /// mirrors the codebase's inline xorshift64 (`reservoir_push` /
783    /// `io::xorshift64`).
784    ///
785    /// Assumes `w_a + w_b < 2^64` — a realistic wake population is far below it
786    /// (2^64 wakeups is physically unreachable), so the single-u64 `s % total`
787    /// draw spans `[0, total)`. Callers gate on `a.len() + b.len() > cap`, which
788    /// (each input ≤ cap) guarantees both sources non-empty; the per-slot guards
789    /// below stay safe for a degenerate hand-built input regardless.
790    pub(crate) fn weighted_merge_reservoirs(
791        a: &[u64],
792        w_a: u64,
793        b: &[u64],
794        w_b: u64,
795        cap: usize,
796    ) -> Vec<u64> {
797        if a.is_empty() && b.is_empty() {
798            return Vec::new();
799        }
800        // Weights are the true populations; fall back to reservoir lengths if a
801        // (hand-built) carrier reports zero population alongside non-empty samples,
802        // keeping the split well-defined instead of dividing by a zero total. The
803        // mixed case (one weight 0, the other > 0) is left as-is: a zero weight
804        // sends every draw to the other source, the only defensible split for a
805        // source claiming zero population. Production maintains wake_sample_total
806        // >= len (reservoir_push counts every push), so neither edge is reachable
807        // on the capture path.
808        let (wa, wb) = if w_a == 0 && w_b == 0 {
809            (a.len() as u128, b.len() as u128)
810        } else {
811            (w_a as u128, w_b as u128)
812        };
813        let total = wa + wb;
814        // Loud-panic on the documented `w_a + w_b < 2^64` assumption (a realistic
815        // wake population is far below it): if total exceeded u64::MAX the
816        // `s as u128 % total` draw — s spans [0, 2^64) — could not reach
817        // [2^64, total) and would silently bias the source split. Matches the merge
818        // SUM's debug-panic-on-overflow discipline (loud over silently wrong).
819        debug_assert!(
820            total <= u64::MAX as u128,
821            "weighted_merge_reservoirs: w_a + w_b overflows u64 ({total}); source draw would bias",
822        );
823        // Golden-ratio Weyl multiplier (the codebase's standard PRNG seed mixer);
824        // a non-zero, input-derived seed makes the merge deterministic. xorshift64
825        // has 0 as a fixed point, hence the fallback.
826        const GOLDEN: u64 = 0x9E37_79B9_7F4A_7C15;
827        let mut s =
828            (w_a ^ w_b.rotate_left(32) ^ (a.len() as u64).rotate_left(16) ^ (b.len() as u64))
829                .wrapping_mul(GOLDEN);
830        if s == 0 {
831            s = GOLDEN;
832        }
833        let step = |x: u64| {
834            let mut v = x;
835            v ^= v << 13;
836            v ^= v >> 7;
837            v ^= v << 17;
838            v
839        };
840        let mut out = Vec::with_capacity(cap);
841        for _ in 0..cap {
842            s = step(s);
843            // Defensive empty-source guards: caller gates ensure both non-empty,
844            // but a stripped / zero-population fixture must never index an empty
845            // slice.
846            let from_a = if a.is_empty() {
847                false
848            } else if b.is_empty() {
849                true
850            } else {
851                (s as u128 % total) < wa
852            };
853            s = step(s);
854            if from_a {
855                out.push(a[(s % a.len() as u64) as usize]);
856            } else {
857                out.push(b[(s % b.len() as u64) as usize]);
858            }
859        }
860        out
861    }
862
863    /// Off-CPU% reduction for the per-phase per-cgroup render:
864    /// `(avg, min, max, spread)` over [`Self::off_cpu_pcts`], or `None` when
865    /// the vec is empty — the NOT-measured state (no worker had positive wall
866    /// time). Reduces the SAME per-worker pcts [`cgroup_stats`] reduces
867    /// (off_cpu_ns / wall_time_ns × 100), so for a phase spanning the whole run
868    /// it reproduces that whole-run reduction; `spread = max − min`.
869    /// `Some((0.0, ..))` is a MEASURED zero (distinct from the `None`
870    /// not-measured state), preserving the discipline the empty-vec contract on
871    /// `off_cpu_pcts` keeps. Display-only: never written back into a re-pool.
872    pub fn off_cpu_summary(&self) -> Option<(f64, f64, f64, f64)> {
873        let pcts = &self.off_cpu_pcts;
874        if pcts.is_empty() {
875            return None;
876        }
877        let min = pcts.iter().cloned().reduce(f64::min).expect("non-empty");
878        let max = pcts.iter().cloned().reduce(f64::max).expect("non-empty");
879        let avg = pcts.iter().sum::<f64>() / pcts.len() as f64;
880        Some((avg, min, max, max - min))
881    }
882
883    /// Wake-latency reduction for the per-phase render:
884    /// `(p99_us, median_us)` over the pooled [`Self::wake_latencies_ns`], or
885    /// `None` when the pool is empty. Nearest-rank percentile via `percentile`
886    /// (ns→µs once), reproducing [`cgroup_stats`]'s p99/median value-for-value
887    /// for the ≤cap pool (and the run-level re-pool's `reduce_sorted_distribution`).
888    /// Above `MAX_WAKE_SAMPLES` the pool is a distribution-preserving reservoir
889    /// subsample (see [`Self::wake_latencies_ns`]), so p99/median is then
890    /// distribution-equivalent, NOT byte-identical, to the full-pool reduction —
891    /// the rendered tail stays accurate, only exact parity is size-bounded.
892    /// `None`-on-empty omits the wake segment from the render rather than
893    /// painting a misleading 0µs (the display analogue of `cgroup_stats`'s
894    /// 0.0-sentinel, which has no Option to carry not-measured).
895    pub fn wake_summary(&self) -> Option<(f64, f64)> {
896        if self.wake_latencies_ns.is_empty() {
897            return None;
898        }
899        let mut sorted = self.wake_latencies_ns.clone();
900        sorted.sort_unstable();
901        let p99 = percentile(&sorted, 0.99) as f64 / 1000.0;
902        let median = percentile(&sorted, 0.5) as f64 / 1000.0;
903        Some((p99, median))
904    }
905
906    /// Timer-latency reduction for the per-phase metric emission:
907    /// `(median_us, p99_us, p999_us)` over the pooled
908    /// [`Self::timer_latencies_ns`] (nearest-rank, ns→µs), or `None` when the
909    /// pool is empty (the not-measured state — `write_carrier_scalars` omits all
910    /// three timer keys together). Reproduces the per-cgroup
911    /// [`crate::assert::cgroup_stats`] timer reductions value-for-value below the
912    /// reservoir cap (distribution-equivalent above it), exactly like
913    /// [`Self::wake_summary`]. The run-level WORST (max) is NOT here — it comes
914    /// from the cross-phase re-pool (`populate_run_distribution_metrics`).
915    pub fn timer_summary(&self) -> Option<(f64, f64, f64)> {
916        if self.timer_latencies_ns.is_empty() {
917            return None;
918        }
919        let mut sorted = self.timer_latencies_ns.clone();
920        sorted.sort_unstable();
921        let median = percentile(&sorted, 0.5) as f64 / 1000.0;
922        let p99 = percentile(&sorted, 0.99) as f64 / 1000.0;
923        let p999 = percentile(&sorted, 0.999) as f64 / 1000.0;
924        Some((median, p99, p999))
925    }
926
927    /// Wake-latency coefficient of variation (stddev / mean) over the pooled
928    /// [`Self::wake_latencies_ns`] (`n = len`), or `None` when the pool is
929    /// empty (the not-measured state — so the `run_metrics` carrier writer
930    /// omits all three wake keys together). `Some(0.0)` when
931    /// the mean is zero is a MEASURED zero. Reproduces [`cgroup_stats`]'s
932    /// `wake_latency_cv` value-for-value for the ≤cap pool (`Σv` in `u64`,
933    /// same accumulation); above `MAX_WAKE_SAMPLES` the pool is a
934    /// distribution-preserving reservoir subsample, so the CV is then
935    /// distribution-equivalent, not byte-identical.
936    pub fn wake_cv(&self) -> Option<f64> {
937        if self.wake_latencies_ns.is_empty() {
938            return None;
939        }
940        let n = self.wake_latencies_ns.len() as f64;
941        let mean_ns = self.wake_latencies_ns.iter().sum::<u64>() as f64 / n;
942        if mean_ns > 0.0 {
943            let variance = self
944                .wake_latencies_ns
945                .iter()
946                .map(|&v| (v as f64 - mean_ns).powi(2))
947                .sum::<f64>()
948                / n;
949            Some(variance.sqrt() / mean_ns)
950        } else {
951            Some(0.0)
952        }
953    }
954
955    /// Run-delay reduction for the per-phase render:
956    /// `(mean_us, worst_us)` over the per-worker [`Self::run_delays_ns`] (raw
957    /// ns), or `None` when empty. Divides ns→µs ONCE on the summed / maxed ns.
958    /// `worst` reproduces [`cgroup_stats`]'s value-for-value (`max(ns)/1000 ==
959    /// max(ns/1000)`, division is monotone). `mean` reproduces it to f64 ULP,
960    /// not bit-exactly: this f64-sums then divides once (`Σns/n/1000`), while
961    /// `cgroup_stats` divides each worker's ns by 1000 first then sums
962    /// (`Σ(ns/1000)/n`) — the same value reassociated, differing only
963    /// sub-display-precision (a divergent-input parity test bounds it at 1e-9).
964    /// Each sample is
965    /// one worker's whole-phase cumulative `sched_info.run_delay` delta, so
966    /// `mean` is the average per-worker total queued-to-run delay and `worst`
967    /// the largest. `None`-on-empty omits the segment.
968    pub fn run_delay_summary(&self) -> Option<(f64, f64)> {
969        if self.run_delays_ns.is_empty() {
970            return None;
971        }
972        let n = self.run_delays_ns.len() as f64;
973        // Sum in f64, NOT u64-then-cast: matches cgroup_stats's f64 accumulation
974        // and cannot integer-overflow (an f64 sum saturates toward +inf; a u64
975        // sum would panic in debug / silently wrap in release on a pathological
976        // pool). Values are identical within the documented 1e-9 ULP bound.
977        let mean = self.run_delays_ns.iter().map(|&v| v as f64).sum::<f64>() / n / 1000.0;
978        let worst = *self.run_delays_ns.iter().max().expect("non-empty") as f64 / 1000.0;
979        Some((mean, worst))
980    }
981}
982
983impl CgroupStats {
984    /// Wake-latency tail amplification:
985    /// `p99_wake_latency_us / median_wake_latency_us`. Returns `0.0`
986    /// when `median_wake_latency_us <= 0.0` so the result never
987    /// propagates `NaN` / `Infinity` into downstream
988    /// `finite_or_zero` filters. Method-only access (no stored
989    /// shadow) — recomputed every call from the raw fields.
990    ///
991    /// Unitless; ≥1.0 by definition of order statistics (p99 cannot
992    /// undershoot the median on the same sample set). Values far
993    /// above 1.0 signal a long tail — the scheduler wakes most
994    /// workers promptly but occasionally stalls some, a regression
995    /// axis that neither `median_*` nor `p99_*` exposes in
996    /// isolation.
997    pub fn wake_latency_tail_ratio(&self) -> f64 {
998        if self.median_wake_latency_us > 0.0 {
999            self.p99_wake_latency_us / self.median_wake_latency_us
1000        } else {
1001            0.0
1002        }
1003    }
1004
1005    /// Whether this cgroup measured the given distribution `source`. Gates the
1006    /// run-level carrier-less re-pool in [`populate_run_distribution_metrics`]
1007    /// so a cgroup that recorded no samples for `source` contributes ABSENCE
1008    /// (leaving the fold `None` when no cgroup measured it), not a sentinel
1009    /// `0.0`. See [`Self::wake_measured`] / [`Self::timer_measured`] /
1010    /// [`Self::run_delay_measured`].
1011    pub fn measured_for(&self, source: crate::stats::SampleSource) -> bool {
1012        match source {
1013            crate::stats::SampleSource::WakeLatencyNs => self.wake_measured,
1014            crate::stats::SampleSource::TimerLatencyNs => self.timer_measured,
1015            crate::stats::SampleSource::RunDelayNs => self.run_delay_measured,
1016        }
1017    }
1018
1019    /// Throughput per parallel degree:
1020    /// `total_iterations / num_workers`. `None` when
1021    /// `num_workers == 0` (no worker reported, so per-worker
1022    /// throughput is undefined — distinct from a measured zero);
1023    /// `Some(0.0)` when workers ran but completed zero iterations
1024    /// (a real throughput collapse). The `None` / `Some(0.0)` split
1025    /// is load-bearing: the run-level worst-cgroup re-pool in
1026    /// [`populate_run_distribution_metrics`] (the
1027    /// `MetricKind::WorstLowest` arm) must treat a measured zero as
1028    /// the worst reading (it wins the "lowest" bucket) while skipping
1029    /// a no-data cgroup — collapsing both to `0.0` would hide a
1030    /// starved cgroup behind the no-data sentinel. Method-only
1031    /// access (no stored shadow) — recomputed every call from the
1032    /// raw fields.
1033    ///
1034    /// Only meaningful across runs of the SAME variant (equal
1035    /// scenario duration): cross-variant comparison is misleading
1036    /// because this metric is NOT rate-normalized — a longer-
1037    /// running scenario racks up more iterations per worker even if
1038    /// the scheduler is identical. `perf-delta`-style
1039    /// comparisons hold scenario, topology, and work_type constant
1040    /// before reading this method.
1041    pub fn iterations_per_worker(&self) -> Option<f64> {
1042        crate::assert::reductions::iterations_per_worker_of(self.num_workers, self.total_iterations)
1043    }
1044
1045    /// Worker iterations per CPU-second of on-CPU time consumed by this
1046    /// cgroup's workers — `total_iterations / (total_cpu_time_ns / 1e9)`.
1047    ///
1048    /// Unlike [`Self::iterations_per_worker`] (raw work, which scales with
1049    /// the host-CPU budget delivered to the guest) and a wall-time rate
1050    /// (which also drops under host oversubscription), this is
1051    /// OVERCOMMIT-INVARIANT: under `cpu_budget < vcpus` a cell completes
1052    /// proportionally fewer iterations AND consumes proportionally less
1053    /// on-CPU time, so the ratio cancels the lost host-CPU-time factor. Use
1054    /// it to compare per-cgroup throughput across `cpu_budget` settings.
1055    ///
1056    /// `None` when `num_workers == 0` (no worker — undefined, distinct from a
1057    /// measured zero) or `total_cpu_time_ns == 0` (no on-CPU time captured;
1058    /// returns inconclusive rather than `Inf`). For a pure busy-spin
1059    /// workload this rate is ~constant by construction, so it measures
1060    /// CPU-time EFFICIENCY; for the cross-cell ALLOCATION balance use
1061    /// [`ScenarioStats::cgroup_balance_ratio`] over `iterations_per_worker`.
1062    pub fn iterations_per_cpu_sec(&self) -> Option<f64> {
1063        crate::assert::reductions::iterations_per_cpu_sec_of(
1064            self.num_workers,
1065            self.total_cpu_time_ns,
1066            self.total_iterations,
1067        )
1068    }
1069}
1070
/// Identifier for a scenario phase. Newtype over `u16` carrying
/// the same 1-indexed encoding documented on every other
/// phase-touching site: `Phase::BASELINE` is the pre-first-Step
/// settle window (`u16` 0); `Phase::step(k)` is scenario Step `k`
/// at 1-indexed `u16` `k + 1`. The newtype catches the bug class
/// where a raw `u16` flows between sites that disagree about
/// 0-indexed vs 1-indexed Step encoding, and gives operators
/// readable construction at consumer sites (`Phase::BASELINE` /
/// `Phase::step(2)` instead of magic `0u16` / `3u16`).
///
/// Wire-format identical to a `u16` via `#[serde(transparent)]` —
/// the on-disk sidecar shape is unchanged from the bare-`u16`
/// pipeline, and existing JSON / typeshare consumers see the same
/// scalar field. [`Phase::as_u16`] (or `u16::from(phase)`) exposes
/// the inner `u16` for paths that hand the value to a serializer or
/// formatter that does not understand the newtype.
///
/// The derived `Default` is `Phase(0)`, i.e. [`Phase::BASELINE`]. The
/// derived `Ord` / `PartialOrd` compare the encoded `u16`, so BASELINE
/// sorts before every Step and Steps sort in scenario order.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Hash,
    PartialOrd,
    Ord,
    Default,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Phase(u16);
1102
1103impl Phase {
1104    /// Pre-first-Step settle window. The framework writes
1105    /// `Phase::BASELINE` to `Ctx::current_step` at scenario start
1106    /// (before any Step's `current_step.store` advance), so any
1107    /// capture taken before the first Step transition stamps with
1108    /// this value.
1109    pub const BASELINE: Self = Self(0);
1110
1111    /// Construct a `Phase` for the `zero_indexed`-th scenario Step.
1112    /// The 1-indexed encoding (Step 0 → `u16` 1, Step 1 → `u16` 2,
1113    /// ...) keeps `BASELINE` unambiguous at `u16` 0. Saturates at
1114    /// `u16::MAX` rather than overflowing — a scenario with > 65k
1115    /// Steps is pathological and the saturating value still
1116    /// distinguishes "well past any real Step" from BASELINE.
1117    pub const fn step(zero_indexed: u16) -> Self {
1118        Self(zero_indexed.saturating_add(1))
1119    }
1120
1121    /// True iff this is `Phase::BASELINE` (the pre-first-Step
1122    /// settle window).
1123    pub const fn is_baseline(&self) -> bool {
1124        self.0 == 0
1125    }
1126
1127    /// Inner `u16`. Use this when handing the value to a
1128    /// serializer / formatter / external consumer that does not
1129    /// understand the newtype. Production callers that build a
1130    /// `Phase` for downstream comparison should prefer
1131    /// `Phase::BASELINE` / `Phase::step(k)` over wrapping a raw
1132    /// `u16` themselves.
1133    pub const fn as_u16(self) -> u16 {
1134        self.0
1135    }
1136}
1137
1138impl std::fmt::Display for Phase {
1139    /// `"BASELINE"` for [`Phase::BASELINE`], `"Step[k]"` for
1140    /// [`Phase::step`] (decoded back via the 1-indexed
1141    /// encoding). Matches the labels [`PhaseBucket`] embeds in
1142    /// `label` so operators see consistent phase identifiers
1143    /// across structured-sidecar reads and ad-hoc `format!`
1144    /// output.
1145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1146        if self.is_baseline() {
1147            write!(f, "BASELINE")
1148        } else {
1149            write!(f, "Step[{}]", self.0 - 1)
1150        }
1151    }
1152}
1153
1154impl From<u16> for Phase {
1155    /// Wrap a raw 1-indexed encoded value as a [`Phase`]. Production
1156    /// paths that already have the encoded value (e.g. drained from
1157    /// the host-side mirror of `current_step`, or read out of a
1158    /// deserialized sidecar) construct the typed wrapper via this
1159    /// conversion without re-deriving the encoding.
1160    fn from(value: u16) -> Self {
1161        Self(value)
1162    }
1163}
1164
1165impl From<Phase> for u16 {
1166    fn from(value: Phase) -> Self {
1167        value.0
1168    }
1169}
1170
1171/// Per-phase metric bucket — one entry per scenario phase in
1172/// [`ScenarioStats::phases`].
1173///
1174/// A scenario with N Steps yields `N + 1` phases: phase 0 is the
1175/// BASELINE (pre-first-Step settle window), and phases 1..=N
1176/// correspond to Step 0..Step N-1 in scenario order. The
1177/// 1-indexed Step encoding (instead of 0-indexed) lets BASELINE
1178/// own `step_index = 0` unambiguously — a `step_index = 0` sample
1179/// is always settle, not first-Step.
1180///
1181/// Each bucket carries the metric values reduced over the phase's
1182/// sample window. For `crate::stats::MetricKind::Counter`
1183/// metrics the reduction is `last - first` across the phase's
1184/// periodic samples (cumulative-counter delta); for `Gauge` /
1185/// `Peak` / `Timestamp` it dispatches per the kind via
1186/// `crate::stats::aggregate_samples`. Missing metric keys mean
1187/// the phase had no finite samples for that metric.
1188///
1189/// Metric keys match `crate::stats::MetricDef::name` — see
1190/// `crate::stats::METRICS` for the canonical list of registered
1191/// metric names a `get` / `phase_metric` lookup expects.
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct PhaseBucket {
    /// Phase index. `0` = BASELINE (pre-first-Step settle window).
    /// `1..=N` align with Step ordinals (1-indexed): Step 0 of the
    /// scenario lives at `step_index = 1`, Step 1 at
    /// `step_index = 2`, etc. The encoding avoids the collision
    /// where a 0-indexed Step would share `step_index = 0` with
    /// the BASELINE settle window. This is the same raw encoding
    /// [`Phase::as_u16`] returns (`Phase::BASELINE` → 0,
    /// `Phase::step(k)` → `k + 1`).
    pub step_index: u16,
    /// Human-readable label. `"BASELINE"` for `step_index = 0`,
    /// `"Step[0]"` / `"Step[1]"` / ... for `step_index = 1..=N`.
    /// Mirrors the formatting used by
    /// `crate::timeline::Timeline`'s phase rendering so operator
    /// inspection of the formatted diagnostic and the structured
    /// sidecar yield the same phase identifiers. [`Phase`]'s `Display`
    /// impl produces the same two forms.
    pub label: String,
    /// Phase window start: the MINIMUM per-sample time anchor in the
    /// phase — each sample's `boundary_offset_ms`, falling back to its
    /// `elapsed_ms`. Samples with neither anchor (both `None` — a
    /// not-measured timestamp) are excluded from the min. A phase whose
    /// every sample is unanchored leaves this at `u64::MAX` (see
    /// `end_ms`); an orphan bucket carries `0` (see `per_cgroup`).
    pub start_ms: u64,
    /// Phase window end: the MAXIMUM per-sample time anchor in the
    /// phase (the same `boundary_offset_ms`-or-`elapsed_ms` key as
    /// `start_ms`). A phase whose every sample is unanchored yields the
    /// inverted window `(start_ms = u64::MAX, end_ms = 0)`, which folds
    /// no monitor samples. Downstream renderers should not assume the
    /// value is closed against a stimulus event.
    pub end_ms: u64,
    /// Number of periodic samples bucketed into this phase. Zero
    /// when the phase fired no captures: BASELINE when the settle window
    /// was shorter than the periodic interval, OR a synthesized
    /// capture-free interior step (the
    /// `build_phase_buckets_with_stimulus` seam — a `StepStart`-step
    /// whose window held no periodic boundary still gets a bucket so its
    /// capture-independent `iteration_rate` is not dropped).
    ///
    /// When two buckets are combined by `merge_matched_phase_buckets`
    /// the counts are added (saturating), and each side's count —
    /// floored at 1 — is the weight of its `Gauge(Avg)` values.
    pub sample_count: usize,
    /// Per-metric phase-aggregated values. See the [`PhaseBucket`]
    /// struct doc for the registry key source and per-kind reduction
    /// dispatch; missing keys mean the phase carried no finite
    /// samples for that metric (sentinel-free: `None` from the
    /// reducer surfaces as "key absent" rather than "value 0.0").
    pub metrics: std::collections::BTreeMap<String, f64>,
    /// Per-cgroup raw telemetry components for this phase, keyed by cgroup
    /// name (see [`PhaseCgroupStats`]). Empty until a capture path populates
    /// it; the structural carrier for the per-phase per-cgroup distributional
    /// re-pool. Whole-run = aggregate of these per-phase per-cgroup components.
    ///
    /// An ORPHAN bucket — a guest carrier whose `step_index` has NO paired host
    /// bucket (a dropped/absent StepStart frame, or a stimulus-less host/fixture
    /// path; NOT merely a short step, since `build_phase_buckets_with_stimulus`
    /// synthesizes a bucket for every StepStart so a captured-but-short step
    /// takes the matched arm) — is carried by
    /// `fold_guest_per_cgroup_into_host_buckets` with the shape
    /// `(start_ms, end_ms) == (0, 0)` AND empty `metrics` AND non-empty
    /// `per_cgroup` (it carries only these components). On every non-zero-duration
    /// window that shape is the orphan arm's, so the timeline render keys on it
    /// to surface "window not measured" rather than a misleading `0ms` (see
    /// `crate::timeline::phase_from_bucket`): a captured bucket has metrics. A
    /// zero-duration step at scenario start (`StepStart==StepEnd==0`) can also
    /// produce it via the matched arm, but harmlessly — a zero-duration step has
    /// no window, so "not measured" reads the same as "0ms".
    pub per_cgroup: std::collections::BTreeMap<String, PhaseCgroupStats>,
}
1255
1256impl PhaseBucket {
1257    /// Look up the phase-aggregated value for `metric_name` (see
1258    /// [`PhaseBucket::metrics`] for the registry source). Returns
1259    /// `None` when the phase carried no finite samples for that
1260    /// metric — distinct from `Some(0.0)` which means the reducer
1261    /// produced a real zero from finite samples.
1262    pub fn get(&self, metric_name: &str) -> Option<f64> {
1263        self.metrics.get(metric_name).copied()
1264    }
1265
1266    /// Like [`Self::get`], but panics with a diagnostic message citing
1267    /// the bucket's `step_index` + `label` + `sample_count` + the set
1268    /// of metric keys actually present when the metric is absent. Use
1269    /// when the caller knows the metric MUST be in the bucket (the
1270    /// phase fired samples and the metric is registered — see
1271    /// [`PhaseBucket::metrics`]) — the panic message tells the operator whether the cause is
1272    /// "phase produced no samples" (sample_count of 0) or "metric key
1273    /// typo" (positive sample_count but the key isn't in `metrics`).
1274    ///
1275    /// ```ignore
1276    /// let bucket = r.stats.phase(Phase::step(0)).expect("Step[0] phase");
1277    /// let throughput = bucket.expect_metric("throughput");
1278    /// ```
1279    pub fn expect_metric(&self, metric_name: &str) -> f64 {
1280        self.get(metric_name).unwrap_or_else(|| {
1281            panic!(
1282                "PhaseBucket::expect_metric: metric '{}' absent from phase \
1283                 step_index={} ('{}') with sample_count={}. \
1284                 metric keys present in this bucket: {:?}. \
1285                 Possible causes: (a) phase carried 0 samples for this \
1286                 metric (sample_count==0 means no captures landed in the \
1287                 phase at all; sample_count>0 means captures landed but \
1288                 the metric extracted no finite values from them); \
1289                 (b) metric name typo — built-ins are typed via \
1290                 crate::stats::BuiltinMetric (a typo is a compile \
1291                 error); a dynamic key resolves via crate::stats::MetricId::def.",
1292                metric_name,
1293                self.step_index,
1294                self.label,
1295                self.sample_count,
1296                self.metrics.keys().collect::<Vec<_>>(),
1297            )
1298        })
1299    }
1300
1301    /// Cross-cgroup phase total for a per-cgroup Counter metric that lives
1302    /// in [`Self::per_cgroup`] but never in [`Self::metrics`] — currently
1303    /// `"total_migrations"`, `"total_iterations"`, and `"total_cpu_time_ns"`.
1304    ///
1305    /// These are registered `MetricKind::Counter`s
1306    /// (`crate::stats::METRICS`) whose per-sample source is absent
1307    /// (`crate::stats::MetricDef::read_sample` returns `None` — they are
1308    /// per-task guest counters not captured per tick), so
1309    /// [`crate::assert::build_phase_buckets`] never folds them into
1310    /// `metrics`; only the per-cgroup carrier ([`PhaseCgroupStats`], built
1311    /// by `phase_cgroup_stats`) holds them. This sums them across the
1312    /// phase's `per_cgroup` carriers — the SAME cross-cgroup Counter sum
1313    /// [`ScenarioStats::total_migrations`] takes run-level and
1314    /// `merge_matched_phase_buckets` takes per key — so the value is the
1315    /// phase total, not a per-cgroup fragment.
1316    ///
1317    /// `None` when the phase has no `per_cgroup` carriers (NOT measured —
1318    /// distinct from a measured `Some(0.0)` when carriers exist but counted
1319    /// zero) or `name` is not one of the per-cgroup-sourced counters.
1320    /// Surfacing them here never double-sources the run-level `ext_metrics`,
1321    /// but by two different mechanisms: `total_migrations` / `total_iterations`
1322    /// are in `TYPED_FIELD_NAMES` (`populate_run_ext_metrics_from_phases` skips
1323    /// them; the typed `GauntletRow` accessor stays the single run-level
1324    /// authority), while `total_cpu_time_ns` is NOT in `TYPED_FIELD_NAMES` —
1325    /// it is safe because it is never written into any pooled `metrics` map
1326    /// (its `MetricDef` accessor and `read_sample` both return `None`, so no
1327    /// run-level fold ever picks it up; it lives only on the per-cgroup
1328    /// carrier).
1329    ///
1330    /// Counterpart to [`Self::get`] (which reads `metrics` only).
1331    /// [`crate::vmm::VmResult::phase_metric`] falls back to this so a
1332    /// `post_vm` callback reading `phase_metric(phase, "total_migrations")`
1333    /// gets the value instead of a silent `None`.
1334    pub fn cgroup_counter_total(&self, name: &str) -> Option<f64> {
1335        let field: fn(&PhaseCgroupStats) -> u64 = match name {
1336            "total_migrations" => |c| c.total_migrations,
1337            "total_iterations" => |c| c.total_iterations,
1338            "total_cpu_time_ns" => |c| c.total_cpu_time_ns,
1339            _ => return None,
1340        };
1341        if self.per_cgroup.is_empty() {
1342            return None;
1343        }
1344        // saturating: overflow-safe cross-cgroup pool of the guest counter for
1345        // the post_vm by-name read (a wrapped sum would read silently-small).
1346        Some(
1347            self.per_cgroup
1348                .values()
1349                .map(field)
1350                .fold(0u64, u64::saturating_add) as f64,
1351        )
1352    }
1353}
1354
1355/// Merge two [`PhaseBucket`]s sharing the same `step_index` per
1356/// the per-MetricKind dispatch in [`crate::stats::MergeKind`].
1357/// Called by [`AssertResult::merge`] for matched buckets;
1358/// unmatched buckets are appended verbatim by the caller.
1359///
1360/// Window-invariant merge:
1361/// - `step_index`: equal by precondition (caller pairs buckets by
1362///   `step_index`), kept from `a`.
1363/// - `label`: kept from `a`. By construction the label is derived
1364///   purely from `step_index` (`"BASELINE"` / `"Step[k]"`) so both
1365///   sides agree.
1366/// - `start_ms`: `min(a.start_ms, b.start_ms)` so the merged
1367///   window covers the earliest start of either side.
1368/// - `end_ms`: `max(a.end_ms, b.end_ms)` so the merged window
1369///   covers the latest end. Drives the [`crate::stats::MergeKind::NonCommutative`]
1370///   tiebreak on Gauge(Last) / Timestamp metrics — the value
1371///   from the bucket whose `end_ms` is later wins.
1372/// - `sample_count`: `a + b`. Used as the weighting denominator
1373///   for the `MetricKind::Gauge(GaugeAgg::Avg)` weighted mean.
1374///
1375/// Per-metric merge dispatches on the metric's `crate::stats::MetricKind`
1376/// from the registry via [`crate::stats::metric_def`]:
/// - `MetricKind::Counter` → `a + b` (the two reduced values are
///   per-phase deltas; the merge across cgroups sums per-cgroup
///   contributions to the phase delta, mirroring how
///   `ScenarioStats::total_migrations` adds across cgroups).
///   `MetricKind::DeltaSum` and `MetricKind::PerPhaseDeltaSum` take the
///   same `a + b` arm in `merge_metric_values`.
1381/// - `MetricKind::Peak` and `MetricKind::Gauge(GaugeAgg::Max)` →
1382///   `max(a, b)` (the worst-case "peak that fired" survives).
1383/// - `MetricKind::Gauge(GaugeAgg::Avg)` → weighted mean
1384///   `(a * a_w + b * b_w) / (a_w + b_w)` where `a_w = a_count.max(1)`
1385///   and `b_w = b_count.max(1)` — the unbiased combination of both
1386///   sides' per-phase means weighted by sample population, each weight
1387///   floored at 1. The `.max(1)` floor (mirroring
1388///   `populate_run_ext_metrics_from_phases`) keeps a synthesized
1389///   zero-capture bucket's capture-independent Gauge(Avg) value
1390///   contributing one phase-observation of weight rather than being
1391///   zero-weighted out of
1392///   the merge — the silent-drop the synthesize seam exists to prevent.
1393///   With both counts > 0 the floor is a no-op (the plain
1394///   sample-population weighting); both counts zero degenerates to
1395///   `(a + b) / 2.0`.
1396/// - `MetricKind::Gauge(GaugeAgg::Last)` and `MetricKind::Timestamp`
1397///   → value from the bucket with the larger `end_ms`; ties keep
1398///   `a`'s value. Captures the "latest-sample-wins" semantic per
1399///   the [`crate::stats::MergeKind::NonCommutative`] contract.
1400/// - `MetricKind::Rate { .. }` → SKIPPED in the per-key fold and
1401///   re-derived from the pooled components by
1402///   [`crate::stats::derive_rate_metrics`] as a post-pass, so the
1403///   merged rate is `Σnumerator / Σdenominator` (each component
1404///   folds by its own kind first) rather than a fold of two
1405///   ready-made per-phase ratios.
1406///
1407/// Unregistered metric names (not in `crate::stats::METRICS`)
1408/// fall back to a commutative arithmetic mean
1409/// `(a + b) / 2.0`. The mean is the safest default for an unknown
1410/// kind: sum would over-count Gauge / Timestamp values, max would
1411/// lose Counter / Avg signal, and "last" requires a tiebreak the
1412/// caller can't compute without the kind. Producers attaching
1413/// unregistered metrics to a `PhaseBucket` should add them to
1414/// `METRICS` to get the typed merge instead of the fallback.
1415pub(crate) fn merge_matched_phase_buckets(a: PhaseBucket, b: PhaseBucket) -> PhaseBucket {
1416    assert_eq!(
1417        a.step_index, b.step_index,
1418        "merge_matched_phase_buckets: caller must pair by step_index",
1419    );
1420    let mut metrics = std::collections::BTreeMap::new();
1421    // Collect every key present on either side; iterate once,
1422    // dispatching per the kind of the key (or the unregistered
1423    // mean fallback) so the merge is single-pass.
1424    let mut keys: std::collections::BTreeSet<&String> = a.metrics.keys().collect();
1425    keys.extend(b.metrics.keys());
1426    for key in keys {
1427        // Derived metrics (every `is_derived()`: Rate / Distribution / WorstLowest /
1428        // WakeLatencyTailRatio / WorstCrossNodeRatio / PerPhase) are NOT merged
1429        // here: a Rate re-derives from the merged components in the post-pass
1430        // below; the distributional kinds (Distribution / WorstLowest /
1431        // WakeLatencyTailRatio / WorstCrossNodeRatio) re-pool run-level by
1432        // `populate_run_distribution_metrics` (they never appear in
1433        // phase.metrics — `aggregate_samples_for_phase` returns None — so this
1434        // skip is also a structural guard); PerPhase is re-derived by
1435        // `derive_phase_metrics`. Folding a ready-made derived value
1436        // would lose the re-pool.
1437        if crate::stats::metric_def(key).is_some_and(|m| m.kind.is_derived()) {
1438            continue;
1439        }
1440        let av = a.metrics.get(key).copied();
1441        let bv = b.metrics.get(key).copied();
1442        let merged = match (av, bv) {
1443            (Some(av), Some(bv)) => {
1444                let kind = crate::stats::metric_def(key).map(|m| m.kind);
1445                merge_metric_values(
1446                    kind,
1447                    av,
1448                    bv,
1449                    a.sample_count,
1450                    b.sample_count,
1451                    a.end_ms,
1452                    b.end_ms,
1453                )
1454            }
1455            (Some(v), None) | (None, Some(v)) => v,
1456            (None, None) => continue,
1457        };
1458        metrics.insert(key.clone(), merged);
1459    }
1460    // Re-derive Rate metrics from the now-pooled components: each
1461    // component merged by its own kind above (a Counter numerator
1462    // summed), so the rate becomes Σnumerator / Σdenominator — the
1463    // correct re-pool, not a mean of the two phases' ready-made ratios.
1464    crate::stats::derive_rate_metrics(&mut metrics);
1465    // Union per_cgroup by cgroup name: a cgroup present on both sides folds
1466    // its raw components per PhaseCgroupStats::merge (concat samples, sum
1467    // counters, combine extremes); a cgroup on only one side is carried
1468    // verbatim. Empty ∪ empty = empty, so this is a no-op until a capture
1469    // path populates per_cgroup (the structural-carrier invariant).
1470    let mut per_cgroup = a.per_cgroup;
1471    for (name, b_cg) in b.per_cgroup {
1472        match per_cgroup.remove(&name) {
1473            Some(a_cg) => {
1474                per_cgroup.insert(name, PhaseCgroupStats::merge(a_cg, b_cg));
1475            }
1476            None => {
1477                per_cgroup.insert(name, b_cg);
1478            }
1479        }
1480    }
1481    PhaseBucket {
1482        step_index: a.step_index,
1483        label: a.label,
1484        start_ms: a.start_ms.min(b.start_ms),
1485        end_ms: a.end_ms.max(b.end_ms),
1486        sample_count: a.sample_count.saturating_add(b.sample_count),
1487        metrics,
1488        per_cgroup,
1489    }
1490}
1491
1492/// Fold the guest-collected per-phase `per_cgroup` carriers into the
1493/// host-rebuilt phase buckets, keyed by `step_index`.
1494///
1495/// The host rebuilds phase buckets from the periodic-capture series
1496/// (window + metric folds), but those buckets carry an empty `per_cgroup`
1497/// by construction. The guest collects per-cgroup RAW components per step
1498/// ([`crate::scenario::collect_handles`] under `collect_step`) into carrier
1499/// buckets whose only payload is `per_cgroup` — a merge-neutral
1500/// `(u64::MAX, 0)` window and empty `metrics`. Guest and host `step_index`
1501/// are the SAME 1-indexed value: the step loop stamps
1502/// `phase_step_index = step_idx + 1` onto BOTH the `StepStart` frames the
1503/// host rebuilds buckets from AND the `collect_step` carrier, so pairing by
1504/// `step_index` is exact and cannot drift.
1505///
1506/// Each guest carrier whose `step_index` matches a host bucket folds its
1507/// `per_cgroup` in via [`merge_matched_phase_buckets`] — a no-op on the
1508/// host's window (`min`/`max` against `MAX`/`0`), metrics (the carrier has
1509/// none, so each host key is carried verbatim), and `sample_count` (`+ 0`),
1510/// contributing ONLY the unioned `per_cgroup`. A guest `step_index` with no
1511/// host bucket — a DEFENSIVE case: the carrier's `step_index` has no `StepStart`
1512/// frame in the host stimulus timeline (a dropped/absent stimulus frame, or a
1513/// stimulus-less host/fixture path), since `build_phase_buckets_with_stimulus`
1514/// SYNTHESIZES a capture-free bucket for every StepStart-step, so a
1515/// captured-but-short step takes the matched arm above, not this one — is carried
1516/// verbatim with its window normalized to `(0, 0)` so duration consumers
1517/// (`end_ms - start_ms`) never underflow the merge-neutral sentinel — no
1518/// `per_cgroup` datum is silently dropped. With no guest carriers (a run
1519/// with no step-local cgroups) the host buckets pass through unchanged. The
1520/// returned vec is sorted by `step_index`.
1521pub(crate) fn fold_guest_per_cgroup_into_host_buckets(
1522    host_buckets: Vec<PhaseBucket>,
1523    guest_buckets: Vec<PhaseBucket>,
1524) -> Vec<PhaseBucket> {
1525    let host_len = host_buckets.len();
1526    // No-silent-drops: host buckets have unique step_index
1527    // (build_phase_buckets_with_stimulus emits one bucket per step_index), but
1528    // fold same-step_index duplicates via merge rather than a last-wins collect so
1529    // a future producer that violated the invariant DEGRADES to a merge, never a
1530    // silent release-mode drop. The debug_assert still trips loudly in test/debug.
1531    let mut by_idx: std::collections::BTreeMap<u16, PhaseBucket> =
1532        std::collections::BTreeMap::new();
1533    for b in host_buckets {
1534        match by_idx.remove(&b.step_index) {
1535            Some(existing) => {
1536                by_idx.insert(b.step_index, merge_matched_phase_buckets(existing, b));
1537            }
1538            None => {
1539                by_idx.insert(b.step_index, b);
1540            }
1541        }
1542    }
1543    debug_assert_eq!(
1544        by_idx.len(),
1545        host_len,
1546        "host buckets must have unique step_index; a collision merged (not dropped)",
1547    );
1548    for gb in guest_buckets {
1549        // Every guest carrier MUST carry the merge-neutral (u64::MAX, 0) sentinel
1550        // window (the step_per_cgroup_bucket invariant). Validate it BEFORE the
1551        // match so BOTH arms are guarded: the matched arm relies on the window
1552        // being merge-neutral (min/max no-op against the host window), and the
1553        // orphan arm normalizes it to (0,0). A future caller handing a
1554        // real-window carrier (incl. a duplicate orphan via the matched arm)
1555        // trips loudly instead of silently corrupting the merged window.
1556        debug_assert!(
1557            gb.start_ms == u64::MAX && gb.end_ms == 0,
1558            "guest carrier must carry the merge-neutral (u64::MAX, 0) window; got ({}, {})",
1559            gb.start_ms,
1560            gb.end_ms,
1561        );
1562        match by_idx.remove(&gb.step_index) {
1563            Some(hb) => {
1564                by_idx.insert(gb.step_index, merge_matched_phase_buckets(hb, gb));
1565            }
1566            None => {
1567                // Orphan arm: a guest carrier whose step_index has no host bucket.
1568                //
1569                // Invariant: build_phase_buckets_with_stimulus synthesizes a host
1570                // bucket for every StepStart-step, so a carrier whose step has a
1571                // StepStart frame always takes the matched arm above. This arm is
1572                // reached only by a carrier whose step has NO StepStart frame —
1573                // defensive, not produced by normal capture.
1574                //
1575                // Normalize the merge-neutral sentinel window to (0,0) so duration
1576                // consumers don't underflow it. The resulting (0,0)-window +
1577                // empty-metrics + non-empty-per_cgroup shape is the orphan
1578                // signature the timeline render keys on to show "window not
1579                // measured" instead of a misleading 0ms — the (0,0) means "no host
1580                // window known", NOT a measured zero-duration step.
1581                //
1582                // A zero-duration step at scenario start (StepStart==StepEnd==0)
1583                // produces the same shape via the matched arm, but harmlessly: a
1584                // zero-duration step has no window, so the render's "not measured"
1585                // reads the same as "0ms". See `crate::timeline::phase_from_bucket`.
1586                let mut orphan = gb;
1587                orphan.start_ms = 0;
1588                orphan.end_ms = 0;
1589                by_idx.insert(orphan.step_index, orphan);
1590            }
1591        }
1592    }
1593    by_idx.into_values().collect()
1594}
1595
1596/// Per-metric merge inner helper used by
1597/// [`merge_matched_phase_buckets`]. Dispatches on the metric's
1598/// `crate::stats::MetricKind` (or the unregistered fallback)
1599/// to combine two reduced values into one.
1600///
1601/// `a_count` / `b_count` are the source buckets' `sample_count`
1602/// fields, used as weights for `Gauge(Avg)`. `a_end_ms` /
1603/// `b_end_ms` are the source buckets' window-end timestamps,
1604/// used to pick the later sample for `Gauge(Last)` / `Timestamp`.
1605fn merge_metric_values(
1606    kind: Option<crate::stats::MetricKind>,
1607    a: f64,
1608    b: f64,
1609    a_count: usize,
1610    b_count: usize,
1611    a_end_ms: u64,
1612    b_end_ms: u64,
1613) -> f64 {
1614    use crate::stats::{GaugeAgg, MetricKind};
1615    match kind {
1616        // Counter (cumulative) and DeltaSum (sum of per-read deltas) merge
1617        // across AssertResults by summing the reduced values (commutative — see
1618        // MetricKind::merge_kind). PerPhaseDeltaSum shares the `a + b` arm, but
1619        // it is a run-wide POOLED per-phase scalar (injected once into the host
1620        // bucket, never per-cgroup), so this same-step-index merge is the
1621        // defensive path only — two real PerPhaseDeltaSum values are never
1622        // summed here.
1623        Some(MetricKind::Counter)
1624        | Some(MetricKind::DeltaSum)
1625        | Some(MetricKind::PerPhaseDeltaSum) => a + b,
1626        Some(MetricKind::Peak) | Some(MetricKind::Gauge(GaugeAgg::Max)) => a.max(b),
1627        Some(MetricKind::Gauge(GaugeAgg::Avg)) => {
1628            // Weight by sample_count, floored at 1: a sample_count==0
1629            // bucket carrying a capture-independent Gauge(Avg) value must
1630            // contribute one phase-observation of weight, not be
1631            // zero-weighted out of the merge. Mirrors the .max(1) floor in
1632            // populate_run_ext_metrics_from_phases. With both counts > 0
1633            // the floor is a no-op (the prior sample_count weighting);
1634            // with both 0 each still floors to weight 1, giving the
1635            // (a+b)/2 equal-weight mean (the aggregate_samples_weighted
1636            // zero-total-weight fallback is unreachable from here).
1637            // (iteration_rate — the original synthesized zero-capture case —
1638            // is now a MetricKind::Rate: merge_matched_phase_buckets skips it
1639            // via the Rate `continue` in its key loop (above this fn) and
1640            // re-pools it from its summed Counter components, so a Rate value
1641            // never reaches this Gauge(Avg) fold.)
1642            let a_w = a_count.max(1) as f64;
1643            let b_w = b_count.max(1) as f64;
1644            (a * a_w + b * b_w) / (a_w + b_w)
1645        }
1646        Some(MetricKind::Gauge(GaugeAgg::Last)) | Some(MetricKind::Timestamp) => {
1647            if b_end_ms > a_end_ms { b } else { a }
1648        }
1649        // Derived kinds (every `is_derived()`: Rate / Distribution / WorstLowest /
1650        // WakeLatencyTailRatio / WorstCrossNodeRatio / PerPhase /
1651        // PerRunDistribution) are skipped in the merge loop (see
1652        // `merge_matched_phase_buckets`'s `is_derived` continue) and produced
1653        // post-merge (`derive_rate_metrics` / `populate_run_distribution_metrics`
1654        // / `populate_run_pooled_schbench_distribution`), so a derived value never
1655        // reaches this per-value merge — folding a ready-made derived value
1656        // would lose the re-pool. (PerRunDistribution is additionally run-level
1657        // only: it never appears in a PhaseBucket, so it cannot reach this
1658        // per-phase-bucket merge at all.)
1659        Some(MetricKind::Rate { .. })
1660        | Some(MetricKind::Distribution { .. })
1661        | Some(MetricKind::WorstLowest { .. })
1662        | Some(MetricKind::WakeLatencyTailRatio)
1663        | Some(MetricKind::WorstCrossNodeRatio)
1664        | Some(MetricKind::PerPhase)
1665        | Some(MetricKind::PerRunDistribution) => unreachable!(
1666            "derived metrics (Rate/Distribution/WorstLowest/WakeLatencyTailRatio/WorstCrossNodeRatio/PerPhase/PerRunDistribution) are produced post-merge, not merged as values"
1667        ),
1668        // Unregistered metric: commutative mean fallback. Sum
1669        // would over-count Gauge values; max would lose Counter
1670        // signal; "last" needs a tiebreak the caller can't
1671        // compute without the kind. Mean is the safest commutative
1672        // default.
1673        None => (a + b) / 2.0,
1674    }
1675}