ktstr/assert/
run_metrics.rs

1use super::*;
2
3/// Aggregated statistics across all cgroups in a scenario.
4#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
5pub struct ScenarioStats {
6    /// Per-cgroup stats, one entry per cgroup.
7    pub cgroups: Vec<CgroupStats>,
8    /// Sum of workers across all cgroups.
9    pub total_workers: usize,
10    /// Sum of per-cgroup distinct CPU counts (not deduplicated across cgroups).
11    pub total_cpus: usize,
12    /// Sum of migration counts across all cgroups.
13    pub total_migrations: u64,
14    /// Worst spread across any cgroup (highest).
15    pub worst_spread: f64,
16    /// Worst gap across any cgroup (highest, ms). Paired with
17    /// `worst_gap_cpu` — both come from the same cgroup.
18    pub worst_gap_ms: u64,
19    /// CPU where the worst gap occurred across all cgroups. Paired
20    /// with `worst_gap_ms` — both come from the same cgroup.
21    pub worst_gap_cpu: usize,
22    /// Worst migration ratio across any cgroup (highest).
23    pub worst_migration_ratio: f64,
24    /// Sum of iteration counts across all cgroups.
25    pub total_iterations: u64,
26    // worst_page_locality is NO LONGER a typed field: it is
27    // `crate::stats::MetricKind::WorstLowest` (NumaLocal/NumaTotal), re-pooled
28    // None-aware post-merge by `populate_run_distribution_metrics` from the
29    // per-phase NUMA carriers (the lowest per-cgroup page_locality, a measured
30    // 0.0 winning) — the `worst_page_locality` method shared with
31    // `run_metric`. The deleted typed field folded via `fold_lowest_nonzero`,
32    // which SKIPPED a measured 0.0 and reported a better-than-worst cross-run
33    // value; the ext re-pool fixes both the field and the read surfaces at once.
34    // worst_cross_node_migration_ratio is NO LONGER a typed field: it is
35    // `crate::stats::MetricKind::WorstCrossNodeRatio`, re-pooled post-merge by
36    // `populate_run_distribution_metrics` from the per-phase NUMA carriers (the
37    // MAX per-cgroup cross-node churn ratio over the latest residency total) via
38    // the `worst_cross_node_migration_ratio` method shared with `run_metric`. The
39    // deleted typed `Gauge(Last)` field/GauntletRow column was merge-max-folded
40    // within-run but cross-run averaged each run's value over `passes_observed`
41    // (folding a NUMA-less passing run's 0.0 sentinel into the mean), AND
42    // diverged from `run_metric` (which already re-derived from the per-phase
43    // carriers) — so the sidecar and the in-test read gave different values on
44    // multi-phase runs. The ext re-pool writes no key for a never-measured run,
45    // so the MEAN divides only by runs that measured NUMA. A CHURN ratio
46    // (cumulative migration EVENTS / final-snapshot resident pages), so it can
47    // exceed 1.0; NOT a bounded `[0,1]` fraction.
48    // worst_wake_latency_tail_ratio is NO LONGER a typed field: it is
49    // `crate::stats::MetricKind::WakeLatencyTailRatio`, re-selected into
50    // `ext_metrics` post-merge by `populate_run_distribution_metrics` (max
51    // over the per-cgroup `CgroupStats::wake_latency_tail_ratio` values,
52    // floor-gated below WAKE_LATENCY_TAIL_RATIO_MIN_ITERATIONS); `MetricDef::read`
53    // surfaces it via the ext fallback.
54    /// Extensible metrics for the generic comparison pipeline.
55    /// Populated from per-cgroup ext_metrics (worst value across cgroups).
56    pub ext_metrics: BTreeMap<String, f64>,
57    /// Per-phase metric buckets in step-index order. A scenario
58    /// with N Steps populates `N + 1` entries: phase 0 is the
59    /// BASELINE settle window before Step 0 fires, phases
60    /// 1..=N align with Step 0..Step N-1 in scenario order
61    /// (1-indexed Steps so the BASELINE encoding doesn't collide
62    /// with first-Step's index).
63    ///
64    /// Empty when the scenario produced no periodic captures
65    /// (Default::default() yields `vec![]`). The existing
66    /// flat-bucket scalars on this struct are independent of the
67    /// per-phase view — they remain the "all phases merged"
68    /// reading, unchanged in semantics by the introduction of
69    /// `phases`.
70    ///
71    /// **Auto-populated by the framework**: scenarios that fire
72    /// periodic captures (via
73    /// [`crate::test_support::KtstrTestEntry::num_snapshots`] or
74    /// [`crate::scenario::ops::Op::CaptureSnapshot`]) have this
75    /// field populated automatically inside
76    /// `crate::test_support::eval`'s `evaluate_vm_result` —
77    /// test code never needs to call
78    /// [`crate::assert::build_phase_buckets`] manually. The auto-
79    /// populate path drains the snapshot bridge from the
80    /// [`crate::vmm::VmResult`] returned by the framework and folds
81    /// the per-sample readings through
82    /// `crate::stats::aggregate_samples_for_phase` per metric.
83    /// Single-phase scenarios that fire no captures leave this
84    /// `vec![]`; the flat-bucket scalars on this struct cover the
85    /// single-phase case.
86    ///
87    /// See [`PhaseBucket`] for the per-phase shape.
88    #[serde(default)]
89    pub phases: Vec<PhaseBucket>,
90}
91
92impl ScenarioStats {
93    /// Look up the phase bucket for a [`Phase`] — `Phase::BASELINE` for the
94    /// pre-first-Step settle window, `Phase::step(k)` for the test author's
95    /// 0-indexed Step k. The typed `Phase` keeps the 1-indexed encoding (Step 0
96    /// lives at the underlying `step_index = 1`) invisible at the call site.
97    ///
98    /// Returns `None` when no bucket with that phase exists (single-phase
99    /// scenario, the scenario didn't reach that Step, or a phase past the last).
100    pub fn phase(&self, phase: Phase) -> Option<&PhaseBucket> {
101        self.phases.iter().find(|p| p.step_index == phase.as_u16())
102    }
103
104    /// Shortcut: look up a single metric value in a specific
105    /// phase by phase-index. Returns `None` when:
106    /// (a) the phase is absent (no bucket with `step_index` in
107    ///     [`Self::phases`]),
108    /// (b) the phase exists but had no finite samples for that
109    ///     metric, OR
110    /// (c) `metric` is a dynamic [`crate::stats::MetricId`] key (a
111    ///     scheduler-runtime / payload string) not present in this phase's
112    ///     stores. A built-in [`crate::stats::BuiltinMetric`] cannot be a
113    ///     typo — that is a compile error — so this case needs a dynamic key.
114    ///
115    /// Two stores are checked: [`PhaseBucket::metrics`] (via
116    /// [`PhaseBucket::get`]) and, failing that, the cross-cgroup phase
117    /// SUM of a per-cgroup Counter ([`PhaseBucket::cgroup_counter_total`])
118    /// for `"total_migrations"` / `"total_iterations"` / `"total_cpu_time_ns"` —
119    /// registered `Counter`s with no per-sample source, so they live ONLY in the
120    /// per-cgroup carriers; without this fallback the pooled lookup
121    /// returned a silent `None` while the per-cgroup
122    /// [`Self::phase_cgroup_metric`] surfaced the value. Symmetric with
123    /// [`crate::vmm::VmResult::phase_metric`].
124    ///
125    /// Sentinel-free: `Some(0.0)` means the reducer produced a
126    /// real zero from finite samples, NOT "missing data". See
127    /// [`PhaseBucket::metrics`] for the registry source. When
128    /// debugging an unexpected `None` on a dynamic key, check
129    /// [`crate::stats::MetricId::def`]`().is_some()` to tell an unregistered
130    /// key from absent data (built-in ids always resolve).
131    ///
132    /// Pass `Phase::BASELINE` for the settle window or `Phase::step(k)` for the
133    /// test author's 0-indexed Step k — the typed `Phase` hides the 1-indexed
134    /// encoding (see [`Self::phase`]).
135    pub fn phase_metric(
136        &self,
137        phase: Phase,
138        metric: impl Into<crate::stats::MetricId>,
139    ) -> Option<f64> {
140        let metric = metric.into();
141        self.phase(phase).and_then(|p| {
142            p.get(metric.as_str())
143                .or_else(|| p.cgroup_counter_total(metric.as_str()))
144        })
145    }
146
147    /// Cross-cgroup balance: the ratio of the busiest cell's per-worker
148    /// throughput to the quietest's — `max / min` over each cgroup's
149    /// [`CgroupStats::iterations_per_worker`]. The bread-and-butter
150    /// scheduler-fairness assertion (every balance test hand-rolls this
151    /// `max/min` over `self.cgroups` today).
152    ///
153    /// No-worker cgroups (`iterations_per_worker() == None`) are SKIPPED: a
154    /// 0-worker cell is a config condition, not a balance signal. Returns
155    /// `None` when fewer than two cgroups have workers (a ratio needs two);
156    /// check the cgroup count separately if every declared cell must have
157    /// workers. A cell that ran workers but completed zero iterations
158    /// (measured `Some(0.0)`) drives the ratio to `f64::INFINITY` so
159    /// starvation SURFACES rather than vanishing — matching the
160    /// `None`-vs-`Some(0.0)` discipline of
161    /// [`CgroupStats::iterations_per_worker`]. For an explicit starvation
162    /// gate, check `min > 0` over the same cgroups separately.
163    ///
164    /// Whole-run aggregate: this reads `self.cgroups`, which sums over all
165    /// phases. For a single phase's per-cgroup balance in a multi-phase
166    /// scenario, read each cgroup's per-phase throughput (`total_iterations`
167    /// / `num_workers`) from [`crate::vmm::VmResult::phase_cgroup`] — the
168    /// per-phase per-cgroup carriers now landed in
169    /// [`PhaseBucket::per_cgroup`].
170    pub fn cgroup_balance_ratio(&self) -> Option<f64> {
171        let mut min = f64::INFINITY;
172        let mut max = 0.0_f64;
173        let mut n = 0usize;
174        for cg in &self.cgroups {
175            if let Some(rate) = cg.iterations_per_worker() {
176                min = min.min(rate);
177                max = max.max(rate);
178                n += 1;
179            }
180        }
181        if n < 2 {
182            return None;
183        }
184        if min == 0.0 {
185            // A with-worker cell did zero work: starvation. Surface it as an
186            // infinite ratio rather than a NaN (0/0) or a hidden None.
187            return Some(f64::INFINITY);
188        }
189        Some(max / min)
190    }
191
192    /// Per-cgroup analog of [`Self::phase_metric`]: look up `metric` for a named
193    /// `cgroup` in a [`Phase`] (`Phase::BASELINE` / `Phase::step(k)`), via that
194    /// cgroup's per-phase carrier ([`PhaseCgroupStats::get`]), falling back to
195    /// [`PhaseCgroupStats::cgroup_counter`] for the per-cgroup Counters
196    /// `total_migrations`/`total_iterations`/`total_cpu_time_ns`. `None` when the
197    /// phase has no bucket, no carrier for `cgroup`, the carrier carried no finite
198    /// value for the metric, OR the metric is an unregistered dynamic key (a
199    /// built-in id is typo-proof; an unregistered [`crate::stats::MetricId`] has no
200    /// [`crate::stats::MetricId::def`], same as [`Self::phase_metric`]). The
201    /// N-cgroups-to-N-queryable-sets surface on the AssertResult-holding path (the
202    /// in-VM `post_vm` path uses [`crate::vmm::VmResult::phase_cgroup_metric`]).
203    pub fn phase_cgroup_metric(
204        &self,
205        phase: Phase,
206        cgroup: &str,
207        metric: impl Into<crate::stats::MetricId>,
208    ) -> Option<f64> {
209        let metric = metric.into();
210        self.phase(phase)
211            .and_then(|p| p.per_cgroup.get(cgroup))
212            .and_then(|pc| {
213                pc.get(metric.as_str())
214                    .or_else(|| pc.cgroup_counter(metric.as_str()))
215            })
216    }
217
218    /// True iff the scenario produced at least one Step-phase
219    /// bucket (any phase with `step_index >= 1`). False when
220    /// `phases` is empty OR contains only `BASELINE` (the
221    /// pre-first-Step settle window).
222    ///
223    /// Use this to fail a phase-aware assertion BEFORE calling
224    /// [`Self::phase`] with a `Phase::step(k)` on a scenario that
225    /// silently never advanced past BASELINE: a test that declared
226    /// no `Step`s, OR a scenario that bailed in setup before any
227    /// `Step` ran, would otherwise see [`Self::phase`] return
228    /// `None` for every Step and the test would either panic on
229    /// `.expect(...)` or pass vacuously.
230    ///
231    /// ```ignore
232    /// anyhow::ensure!(
233    ///     r.stats.has_steps(),
234    ///     "scenario produced no Step-phase buckets — \
235    ///      declare a Step or read Phase::BASELINE",
236    /// );
237    /// let throughput = r.stats.phase_metric(Phase::step(0), "throughput");
238    /// ```
239    pub fn has_steps(&self) -> bool {
240        self.phases.iter().any(|p| p.step_index >= 1)
241    }
242
243    /// Run-level value for a metric by registry name, for the
244    /// ext-sourced metric family that carries no typed
245    /// `ScenarioStats` field.
246    ///
247    /// Resolves [`Self::ext_metrics`] — the run-level map the
248    /// framework fills post-merge with every metric whose value has no
249    /// typed struct field: the pooled wake-latency / run-delay
250    /// distributions and worst-cgroup iteration efficiencies
251    /// (the `MetricKind::Distribution` / `MetricKind::WorstLowest`
252    /// registry kinds — `worst_p99_wake_latency_us`, `worst_run_delay_us`,
253    /// `worst_iterations_per_cpu_sec`, …), the derived rates
254    /// (`iteration_rate`, and the pooled `iterations_per_cpu_sec` —
255    /// distinct from the `worst_iterations_per_cpu_sec` selector above),
256    /// the per-thread-group `system_time_ns` / `user_time_ns`, and
257    /// `avg_imbalance_ratio` / `avg_dsq_depth`. This is the
258    /// run-level analogue of [`Self::phase_metric`] for that family:
259    /// code holding the run's [`AssertResult`] reads
260    /// `r.stats.run_metric("worst_run_delay_us")` instead of reaching
261    /// into the raw `ext_metrics` map by string key (`ScenarioStats` is
262    /// the [`AssertResult::stats`] field — the value a test body, or a
263    /// callback that builds an `AssertResult` via `collect_all` /
264    /// `execute_scenario`, holds). A `post_vm` callback instead receives
265    /// a `VmResult`, which has NO `stats` field and no run-level
266    /// Distribution surface — compare those cross-run via `cargo ktstr
267    /// perf-delta`.
268    ///
269    /// The ext family is populated only by the `#[ktstr_test]` eval
270    /// flow's post-merge producer
271    /// ([`populate_run_distribution_metrics`]). An `AssertResult` built
272    /// by a DIRECT host assertion (`assert_not_starved` /
273    /// `AssertPlan::assert_cgroup`, which never run that producer)
274    /// carries the per-cgroup values on [`Self::cgroups`] but none of
275    /// these run-level roll-ups, so `run_metric` returns `None` for them
276    /// on that path — read the per-cgroup `CgroupStats` field directly
277    /// (e.g. `r.stats.cgroups[i].p99_wake_latency_us`) there.
278    ///
279    /// Sentinel-free, matching [`Self::phase_metric`]: `None` means
280    /// the metric is absent from this run (no contributing cgroup or
281    /// carrier, or a name not present in the map); `Some(0.0)` is a
282    /// real measured zero. Check [`crate::stats::MetricId::def`] on a dynamic
283    /// key to tell an unregistered key from genuinely-absent data (built-in
284    /// ids always resolve). (The map also carries any
285    /// user-defined extensible-metric keys, plus the framework-internal
286    /// Rate-component Counters — `total_phase_iterations` /
287    /// `total_phase_duration_sec` / `total_iterations_pooled` /
288    /// `total_cpu_time_sec`, the numerator/denominator plumbing behind
289    /// `iteration_rate` / `iterations_per_cpu_sec` — all of which resolve
290    /// here too; prefer the derived rate over its raw components.)
291    ///
292    /// RESOLVED here None-aware, ahead of the ext lookup, via a typed dispatch
293    /// (`typed_sentinel_metric`): the cross-cgroup metrics `worst_spread`,
294    /// `worst_migration_ratio`, `worst_gap_ms`, `total_migrations`,
295    /// `total_iterations`, `worst_page_locality`,
296    /// `worst_cross_node_migration_ratio`. The dispatch RE-DERIVES each from the
297    /// per-cgroup (`self.cgroups`) / per-phase (`self.phases[].per_cgroup`, the
298    /// NUMA metrics) carriers — `None` when no carrier measured it, `Some(0.0)`
299    /// for a measured zero — preserving the sentinel-free contract. The 5
300    /// non-NUMA metrics are 0.0-sentinel typed struct fields (a not-measured
301    /// carrier coerces to 0.0, indistinguishable from a measured zero, so the
302    /// re-derivation recovers the distinction); the two NUMA roll-ups
303    /// (`worst_page_locality`, `worst_cross_node_migration_ratio`) have no struct
304    /// field and re-pool purely from the per-phase NUMA carriers.
305    /// (`worst_wake_latency_tail_ratio` resolves via the ext lookup as the
306    /// `WakeLatencyTailRatio` key.)
307    ///
308    /// NOT resolved here (not in `ext_metrics`, no typed dispatch):
309    /// - the monitor-sourced run-level metrics (`max_imbalance_ratio`,
310    ///   `max_dsq_depth`, `stuck_count`, `total_fallback`,
311    ///   `total_keep_last`), which `ScenarioStats` does not hold
312    ///   run-level — read those per-phase via [`Self::phase_metric`].
313    ///
314    /// So this resolves the ext-sourced family AND the typed cross-cgroup fields
315    /// (via the dispatch); only the monitor-sourced run-level metrics remain
316    /// unresolved here (`ScenarioStats` holds them only per-phase).
317    pub fn run_metric(&self, metric: impl Into<crate::stats::MetricId>) -> Option<f64> {
318        let id = metric.into();
319        // The sentinel-prone cross-cgroup run-level metrics re-derive None-aware
320        // from the per-cgroup / per-phase carriers, recovering the never-measured
321        // vs measured-zero distinction the 0.0-coerced struct fields (or, for
322        // worst_page_locality, the absent field) cannot carry. A non-sentinel id
323        // returns `None` from the dispatch and falls through to the ext-metrics map.
324        if let crate::stats::MetricId::Builtin(b) = &id
325            && let Some(resolved) = self.typed_sentinel_metric(*b)
326        {
327            return resolved;
328        }
329        self.ext_metrics.get(id.as_str()).copied()
330    }
331
332    /// Re-derive a typed run-level metric None-aware from the per-cgroup /
333    /// per-phase carriers — recovering the never-measured (`None`) vs
334    /// measured-zero (`Some(0.0)`) distinction a bare 0.0-sentinel `f64` cannot
335    /// carry. The 0.0-sentinel typed struct fields (`worst_spread`,
336    /// `worst_migration_ratio`, ...) coerce a not-measured carrier to 0.0 in
337    /// `scenario_stats_for_cgroup` (indistinguishable from a measured zero);
338    /// `worst_page_locality` is no longer a struct field at all — it re-pools
339    /// purely from the per-phase NUMA carriers (see below).
340    ///
341    /// Returns `None` when `m` is NOT one of the typed sentinel ids — the caller
342    /// then falls through to the ext-metrics lookup. `Some(None)` when `m` IS a
343    /// sentinel id but no contributing carrier exists (loud-absent). `Some(Some(v))`
344    /// for the re-derived value (a measured zero stays `Some(0.0)`).
345    ///
346    /// The 5 non-NUMA metrics source from `self.cgroups`, whose `Option` / counter
347    /// carriers preserve the measured-vs-unmeasured state. The 2 NUMA roll-ups
348    /// (`worst_page_locality`, `worst_cross_node_migration_ratio` — neither a
349    /// struct field) source from `self.phases[].per_cgroup`, for different
350    /// reasons: `page_locality` is structurally 0.0 on the `cgroup_stats` path
351    /// (it needs an expected-node set the reports-only builder lacks), so
352    /// `self.cgroups` cannot source it; `cross_node_migration_ratio` IS populated
353    /// on `cgroup_stats`, but only as a single pre-folded scalar that cannot carry
354    /// the cross-phase fold (SUM the per-phase migration-counter deltas over the
355    /// LATEST residency total), so it too re-pools from the per-phase
356    /// `PhaseCgroupStats` NUMA counters — which also single-sources it with the
357    /// ext/sidecar value. On a phase-less direct-assertion `AssertResult` (no NUMA
358    /// capture) both NUMA roll-ups read `None` (correct loud-absent).
359    fn typed_sentinel_metric(&self, m: crate::stats::BuiltinMetric) -> Option<Option<f64>> {
360        use crate::stats::BuiltinMetric as B;
361        Some(match m {
362            // worst_spread: highest spread over cgroups that measured it
363            // (CgroupStats::spread is already Option — None when no worker had
364            // measurable wall time). None iff no cgroup measured spread.
365            B::WorstSpread => self
366                .cgroups
367                .iter()
368                .filter_map(|c| c.spread)
369                .reduce(f64::max),
370            // worst_migration_ratio: highest migration ratio over cgroups that
371            // ran iterations (a 0.0 with total_iterations>0 is measured; with
372            // ==0 it is the rate-over-zero sentinel — never-measured).
373            B::WorstMigrationRatio => self
374                .cgroups
375                .iter()
376                .filter(|c| c.total_iterations > 0)
377                .map(|c| c.migration_ratio)
378                .reduce(f64::max),
379            // total_migrations / total_iterations: cross-cgroup SUMs. 0 is a real
380            // measured sum (cgroups ran, zero events); never-measured = no cgroups.
381            B::TotalMigrations => (!self.cgroups.is_empty()).then(|| {
382                self.cgroups
383                    .iter()
384                    .map(|c| c.total_migrations)
385                    .fold(0u64, u64::saturating_add) as f64
386            }),
387            B::TotalIterations => (!self.cgroups.is_empty()).then(|| {
388                self.cgroups
389                    .iter()
390                    .map(|c| c.total_iterations)
391                    .fold(0u64, u64::saturating_add) as f64
392            }),
393            // worst_gap_ms: longest gap over cgroups with workers (gap 0 with
394            // workers = measured "no gap observed"; never-measured = no workers).
395            B::WorstGapMs => self
396                .cgroups
397                .iter()
398                .filter(|c| c.num_workers > 0)
399                .map(|c| c.max_gap_ms as f64)
400                .reduce(f64::max),
401            // worst_page_locality: LOWEST per-cgroup locality (the worst cell) over
402            // cgroups that measured NUMA residency — a measured 0.0 (all pages
403            // off-node) is the worst and WINS the lowest fold. Shared with the
404            // ext/sidecar re-pool via worst_page_locality() so the typed read and
405            // the cross-run comparison value agree (the same fix replaces the
406            // fold_lowest_nonzero sentinel-skip on BOTH surfaces).
407            B::WorstPageLocality => self.worst_page_locality(),
408            // worst_cross_node_migration_ratio: highest cross-node churn ratio over
409            // cgroups that measured NUMA. Shared with the ext/sidecar re-pool via
410            // worst_cross_node_migration_ratio() so the typed read and the cross-run
411            // comparison value agree (the divergence the typed field had).
412            B::WorstCrossNodeMigrationRatio => self.worst_cross_node_migration_ratio(),
413            _ => return None,
414        })
415    }
416
417    /// Aggregate each cgroup's NUMA carriers across `self.phases` into
418    /// `(latest_local, latest_total, summed_migrated)`. The cross-phase fold is
419    /// ASYMMETRIC by counter class: `numa_pages_local` / `numa_pages_total` are
420    /// current-residency GAUGE snapshots (the live `/proc/.../numa_maps`
421    /// residency, recomputed each read), so the LATEST MEASURED phase's value is the run-end placement
422    /// — SUMMING across phases would multiply residency by the phase count (a
423    /// silent over-count); `cross_node_migrated` is a per-phase migration-counter
424    /// DELTA over disjoint intervals, so it SUMS to the run total. The shared
425    /// `latest_total` denominator serves both ratios (page_locality and
426    /// cross_node share one page total), matching `cgroup_stats`, which reads the
427    /// final-snapshot residency. One entry per cgroup that appeared in any phase.
428    ///
429    /// LATEST is None-aware: a later phase carrier that measured NO NUMA pages
430    /// (`numa_pages_total == 0` — an empty backdrop slice, a non-NUMA `WorkType`
431    /// phase, or a zero-worker carrier) does NOT overwrite an earlier MEASURED
432    /// residency. A bare overwrite would zero out the earlier snapshot, and the
433    /// `total > 0` filter in `worst_page_locality()` would then drop the cgroup
434    /// from the worst-pool — silently reporting a better-than-worst run-level
435    /// locality (the same measured-vs-unmeasured discipline the cross-cgroup fold
436    /// applies, extended to the cross-phase axis).
437    fn numa_agg_per_cgroup(&self) -> Vec<(u64, u64, u64)> {
438        let mut by_cg: std::collections::BTreeMap<&str, (u64, u64, u64)> =
439            std::collections::BTreeMap::new();
440        // self.phases is in step (chronological) order, so a later MEASURED
441        // phase's residency snapshot overwrites the earlier (LATEST wins); a
442        // not-measured (total == 0) phase leaves the prior snapshot intact. The
443        // migration deltas accumulate unconditionally (a 0 delta adds nothing).
444        for phase in &self.phases {
445            for (name, pc) in &phase.per_cgroup {
446                let e = by_cg.entry(name.as_str()).or_insert((0, 0, 0));
447                if pc.numa_pages_total > 0 {
448                    e.0 = pc.numa_pages_local;
449                    e.1 = pc.numa_pages_total;
450                }
451                // saturating: guest-runtime migration counter pooled across
452                // phases; never wrap the derived cross-node-migration ratio.
453                e.2 = e.2.saturating_add(pc.cross_node_migrated);
454            }
455        }
456        by_cg.into_values().collect()
457    }
458
459    /// The run-level WORST (lowest) per-cgroup page-locality fraction, re-pooled
460    /// None-aware from the per-phase NUMA carriers: the lowest
461    /// `page_locality_of(latest_local, latest_total)` over cgroups that measured
462    /// NUMA residency (`numa_pages_total > 0`) — a measured 0.0 (all pages
463    /// off-node) WINS the lowest; an all-unmeasured cohort yields `None`. Shared
464    /// by `run_metric`'s `WorstPageLocality` dispatch AND
465    /// `populate_run_distribution_metrics`'s ext/sidecar re-pool so the typed
466    /// read and the cross-run comparison value are byte-identical.
467    fn worst_page_locality(&self) -> Option<f64> {
468        self.numa_agg_per_cgroup()
469            .into_iter()
470            .filter(|&(_, total, _)| total > 0)
471            .map(|(local, total, _)| super::reductions::page_locality_of(local, total))
472            .reduce(f64::min)
473    }
474
475    /// The run-level WORST (highest) per-cgroup cross-node migration-churn ratio,
476    /// re-pooled from the per-phase NUMA carriers: the MAX
477    /// `cross_node_migration_ratio_of(summed_migrated, latest_total)` over cgroups
478    /// that measured NUMA residency (`numa_pages_total > 0`) — an all-unmeasured
479    /// cohort yields `None`. The polarity twin of `worst_page_locality` (LowerBetter
480    /// → highest-wins). Shared by `run_metric`'s `WorstCrossNodeMigrationRatio`
481    /// dispatch AND `populate_run_distribution_metrics`'s ext/sidecar re-pool so the
482    /// typed read and the cross-run comparison value are byte-identical (the typed
483    /// `Gauge(Last)` field diverged from this on multi-phase runs).
484    fn worst_cross_node_migration_ratio(&self) -> Option<f64> {
485        self.numa_agg_per_cgroup()
486            .into_iter()
487            .filter(|&(_, total, _)| total > 0)
488            .map(|(_, total, migrated)| {
489                super::reductions::cross_node_migration_ratio_of(migrated, total)
490            })
491            .reduce(f64::max)
492    }
493}
494
/// Registry metric names that are already backed by a typed `GauntletRow`
/// field.
///
/// The typed accessor fills these at `sidecar_to_row` time, and
/// `MetricDef::read` prefers the accessor over `ext_metrics`. Writing the same
/// key into `ext_metrics` would therefore add sidecar bloat nobody reads AND
/// double-source the run-level value.
///
/// Both run-level ext-metrics populators consult this list — the SampleSeries
/// path ([`populate_run_ext_metrics`]) and the phase-fold path
/// ([`populate_run_ext_metrics_from_phases`]). As a result only
/// ext-metrics-only registry entries are written, and a typed-backed metric's
/// run-level value always comes from its accessor.
///
/// Notes on individual entries:
/// - `stuck_count`: the typed whole-run count (`MonitorSummary::stuck_count`,
///   windowed over the full sample stream) is authoritative. The per-phase
///   fold sum uses the same `is_cpu_stuck` predicate but is a lower-or-equal
///   (`<=`), partition-dependent quantity: it drops cross-boundary and
///   out-of-phase windows, so it falls strictly below the typed count as soon
///   as one of those windows is stuck. Injecting the ext copy would shadow the
///   authoritative value with a redundant number — and a divergent one once a
///   dropped window is stuck.
/// - `max_imbalance_ratio`: listed because its accessor reads the typed
///   `GauntletRow.imbalance_ratio` (whole-run MonitorSummary); its per-phase
///   monitor fold feeds rendering only.
const TYPED_FIELD_NAMES: &[&str] = &[
    "max_dsq_depth",
    "max_imbalance_ratio",
    "total_fallback",
    "total_keep_last",
    "stuck_count",
    "total_iterations",
    "total_migrations",
];
524
525/// Sibling of [`populate_run_ext_metrics`] that mines per-phase
526/// metrics back into the run-level `ext_metrics` map. Closes the
527/// gap for registered metrics whose values live in
528/// `PhaseBucket.metrics` but never reach `ext_metrics` via the
529/// SampleSeries path (their `read_sample` returns `None`):
530/// `avg_imbalance_ratio` (sourced from MonitorSample windowing
531/// inside [`build_phase_buckets`]), `iteration_rate` (sourced from
532/// stimulus event totals inside [`build_phase_buckets_with_stimulus`]),
533/// and `system_time_ns` / `user_time_ns` (per-thread-group CPU-time
534/// deltas injected by `phase_group_cpu_delta` inside
535/// `buckets_from_grouped`). The fold is generic over every key
536/// present on any phase, so it carries any such phase-only metric (the
537/// ext-metrics-only set whose `read_sample` returns `None`). Keys with a
538/// typed `GauntletRow` field (`TYPED_FIELD_NAMES`) are SKIPPED: their
539/// run-level value comes from the typed accessor (which wins on read), so
540/// re-injecting them here would double-source the run aggregate — the
541/// hazard the const's doc describes. Their per-phase `PhaseBucket` value
542/// still feeds per-phase rendering.
543///
544/// Per-phase reduction dispatch is described on [`PhaseBucket`];
545/// the cross-phase fold here uses `sample_count` as the weight so
546/// Gauge(Avg) keys get the weighted mean (the correct cross-phase
547/// semantic for typical-load metrics) while other kinds fold per
548/// their natural reduction. Existing keys in `target` are not
549/// overwritten — `read_sample` path values win when both produced
550/// an entry.
551///
552/// Without this fill, `cargo ktstr perf-delta` silently misses
553/// these phase-only metrics (avg_imbalance_ratio, iteration_rate,
554/// system_time_ns, user_time_ns) in flat-row output because
555/// `MetricDef::read` falls back to ext_metrics and finds nothing.
556pub fn populate_run_ext_metrics_from_phases(
557    phases: &[PhaseBucket],
558    target: &mut std::collections::BTreeMap<String, f64>,
559) {
560    // No early-return on empty `phases`: the derive_rate_metrics post-pass
561    // below must still run over whatever components populate_run_ext_metrics
562    // already inserted into `target` (the empty-phases case), so a run-level
563    // Rate is re-derived rather than silently dropped. The loops below are
564    // no-ops when `phases` is empty.
565    // Collect every metric key that appears on any phase.
566    let mut keys: std::collections::BTreeSet<&String> = std::collections::BTreeSet::new();
567    for phase in phases {
568        for key in phase.metrics.keys() {
569            keys.insert(key);
570        }
571    }
572    for key in keys {
573        if target.contains_key(key) {
574            continue;
575        }
576        let Some(def) = crate::stats::metric_def(key) else {
577            continue;
578        };
579        // Derived metrics (every `is_derived()`: Rate / Distribution / WorstLowest /
580        // WakeLatencyTailRatio / WorstCrossNodeRatio / PerPhase / PerRunDistribution)
581        // are produced
582        // from their pooled components, not folded as per-phase values: skip
583        // here. A Rate re-derives after the loop (Σnum/Σdenom over the folded
584        // components); the distributional kinds (Distribution / WorstLowest /
585        // WakeLatencyTailRatio / WorstCrossNodeRatio) are re-pooled run-level by
586        // `populate_run_distribution_metrics` (and never appear in
587        // phase.metrics anyway); PerPhase is re-derived by `derive_phase_metrics`.
588        // Folding a ready-made derived value would lose
589        // the re-pool, and routing one into aggregate_samples_weighted within
590        // a run is not its producer path.
591        if def.kind.is_derived() {
592            continue;
593        }
594        // Typed-backed keys (those in TYPED_FIELD_NAMES — a typed GauntletRow
595        // accessor that wins on read) must NOT be re-injected into ext_metrics
596        // from the phase fold: the ext copy would be unread bloat and, for
597        // stuck_count (whose per-phase fold sum is `<=` the typed whole-run
598        // count, strictly below once a cross-boundary/out-of-phase window is
599        // stuck — they share the is_cpu_stuck predicate but the run-level
600        // count windows the full stream), a redundant-or-divergent value,
601        // not a guaranteed duplicate. Their per-phase
602        // PhaseBucket value still feeds rendering; the run-level value stays
603        // the typed path. Mirrors the sibling populate_run_ext_metrics.
604        // (Without this, folding max_imbalance_ratio + stuck_count onto
605        // captured buckets would leak both into ext_metrics on the common
606        // path.)
607        if TYPED_FIELD_NAMES.contains(&key.as_str()) {
608            continue;
609        }
610        // avg_nr_running is NOT typed-backed (no GauntletRow accessor), but its
611        // authoritative run-level value is MonitorSummary::avg_nr_running, folded
612        // by fold_run_level_ext. It is the one monitor-summary-fold key that ALSO
613        // appears in per-phase bucket.metrics (fold_monitor_into_bucket writes it
614        // for rendering). Skip it here so the per-phase re-pool never claims the
615        // run-level key: VmResult::run_metric runs this re-pool BEFORE
616        // fold_run_level_ext, and the fold's `or_insert` would then no-op,
617        // silently replacing the whole-run value with the per-phase weighted
618        // mean. Its per-phase PhaseBucket value still feeds rendering +
619        // change-detection. (TYPED_FIELD_NAMES is the typed-accessor analogue of
620        // this same skip rationale.)
621        if key == "avg_nr_running" {
622            continue;
623        }
624        // Per-phase (value, sample_count) for the kind-aware fold.
625        // A phase that doesn't carry the key contributes nothing.
626        // Lock-step shape enforced by the (f64, usize) pair type.
627        // `sample_count.max(1)` is load-bearing for Gauge(Avg) keys: a
628        // synthesized zero-capture phase (the
629        // build_phase_buckets_with_stimulus seam) carrying a
630        // capture-independent Gauge(Avg) value at sample_count==0 gets
631        // weight 1 (one phase observation) rather than being zero-weighted
632        // out of the run-level mean. The floor is a no-op for
633        // Counter/DeltaSum keys, which sum with weights ignored (see
634        // aggregate_finite): iteration_rate's components
635        // total_phase_iterations / total_phase_duration_sec are such
636        // Counters, so a synthesized step's iterations are INCLUDED in the
637        // re-pooled iteration_rate via the sum — the run-aggregate
638        // completion of the per-step rate handling (iteration_rate itself is a
639        // Rate, skipped above and re-derived below). A regression dropping
640        // the floor would silently re-drop a zero-capture step's Gauge(Avg)
641        // value from the sidecar aggregate.
642        let pairs: Vec<(f64, usize)> = phases
643            .iter()
644            .filter_map(|phase| {
645                phase
646                    .metrics
647                    .get(key)
648                    .copied()
649                    .map(|v| (v, phase.sample_count.max(1)))
650            })
651            .collect();
652        if pairs.is_empty() {
653            continue;
654        }
655        // PerPhaseDeltaSum folds cross-PHASE by SUM: the run-level value is the
656        // sum of the disjoint per-phase OBSERVED CPU-time deltas (a lower bound
657        // on total run CPU time — excludes head / tail / inter-phase-gap
658        // windows; see the MetricKind::PerPhaseDeltaSum doc). It is NOT routed
659        // through aggregate_samples_weighted, whose PerPhaseDeltaSum arm
660        // implements the CROSS-RUN unweighted mean — mean-folding the phases
661        // here would double-count duration (the bug this kind fixes). Weights
662        // (phase sample_count) are irrelevant to a sum.
663        if def.kind == crate::stats::MetricKind::PerPhaseDeltaSum {
664            let sum: f64 = pairs.iter().map(|(v, _)| v).sum();
665            target.insert(key.clone(), sum);
666            continue;
667        }
668        if let Some(reduced) = crate::stats::aggregate_samples_weighted(&pairs, def.kind) {
669            target.insert(key.clone(), reduced);
670        }
671    }
672    // Re-derive Rate metrics from the now-folded components so the run
673    // rate is Σnumerator / Σdenominator (the components folded by their
674    // own kinds above — a Counter numerator summed across phases).
675    crate::stats::derive_rate_metrics(target);
676}
677
678/// Run the FULL run-level `ext_metrics` population sequence into `stats` — the
679/// single source of truth shared by the eval layer (`evaluate_vm_result`) and
680/// [`crate::vmm::VmResult::run_metric`], so the two produce byte-identical
681/// run-level ext maps for the same run. Reads `samples` + `stats.phases` +
682/// `stats.cgroups`, writes `stats.ext_metrics`, in the canonical order:
683/// 1. [`populate_run_ext_metrics`] — the `read_sample`-wired registry family
684///    (over every freeze in `samples`), then the whole-run wall + IRQ rates.
685/// 2. [`populate_run_ext_metrics_from_phases`] — the phase-only ext metrics
686///    whose `read_sample` is `None` (`avg_imbalance_ratio`, `iteration_rate`,
687///    `system_time_ns`, `user_time_ns`, the per-CPU IRQ spatial maxes, and the
688///    per-cgroup PSI-irq spatial maxes).
689/// 3. [`populate_run_pooled_iterations_per_cpu_sec`] — the pooled cross-cgroup
690///    `iterations_per_cpu_sec` Rate (from `stats.cgroups`).
691/// 4. [`populate_run_pooled_taobench`] — the whole-run taobench qps + hit Rates
692///    pooled cross-cgroup (from `stats.cgroups[].taobench_whole`).
693/// 5. [`populate_run_pooled_taobench_distribution`] — the taobench whole-run
694///    open-loop serve-latency `*_whole` percentiles (union of the per-phase
695///    per-cgroup serve `PlatStats` histograms, percentile re-derived over the union).
696/// 6. [`populate_run_pooled_schbench`] — the schbench whole-run loop Counter +
697///    role-separate run-delay gate-Rates (from
698///    `stats.phases[].per_cgroup[].schbench` raw pairs, summed over phases+cgroups).
699/// 7. [`populate_run_pooled_schbench_distribution`] — the schbench whole-run
700///    latency/rps `*_whole` percentiles (union of the per-phase per-cgroup
701///    `PlatStats` histograms, percentile re-derived over the union).
702/// 8. [`populate_run_distribution_metrics`] — the `Distribution` / `WorstLowest`
703///    / `WakeLatencyTailRatio` / `WorstCrossNodeRatio` re-pools (from
704///    `stats.phases[].per_cgroup` raw samples + `stats.cgroups`).
705///
706/// ORDER IS LOAD-BEARING: step 1 must precede step 2 so the whole-run wall +
707/// whole-run IRQ-counter deltas land before step 2's `contains_key` skip (the
708/// multi-phase-rate fix); steps 3-8 fold the per-cgroup roll-up after the
709/// per-phase families.
710///
711/// `stats.phases` SHOULD be the PRE-`derive_phase_metrics` fold (the host buckets
712/// with the guest per-cgroup carriers folded in, before the per-phase scalar
713/// derivation) — the exact phase shape the eval layer feeds to step 2, since the
714/// eval path runs `derive_phase_metrics` AFTER this call. Feeding the pre-derive
715/// fold reproduces the eval sequence by construction (eval-faithful). Post-derive
716/// phases (e.g. [`crate::vmm::VmResult::phase_buckets`]) yield the SAME map today —
717/// step 2 skips `is_derived` keys, and every pooled scalar `derive_phase_metrics`
718/// writes to `bucket.metrics` (the schbench / taobench scalars) is
719/// `MetricKind::PerPhase` (which `is_derived`), so step 2 drops them either way —
720/// but the pre-derive fold avoids DEPENDING on that skip: a pooled key ever
721/// registered as non-derived would be folded run-level under post-derive,
722/// diverging from the eval map, but not under pre-derive. Pass the pre-derive
723/// fold ([`crate::vmm::VmResult::run_metric`] uses `phase_buckets_pre_derive`).
724pub fn populate_run_ext_all(
725    stats: &mut ScenarioStats,
726    samples: &crate::scenario::sample::SampleSeries,
727) {
728    populate_run_ext_metrics(samples, &mut stats.ext_metrics);
729    populate_run_ext_metrics_from_phases(&stats.phases, &mut stats.ext_metrics);
730    populate_run_pooled_iterations_per_cpu_sec(stats);
731    populate_run_pooled_taobench(stats);
732    populate_run_pooled_taobench_distribution(stats);
733    populate_run_pooled_schbench(stats);
734    populate_run_pooled_schbench_distribution(stats);
735    populate_run_distribution_metrics(stats);
736}
737
738/// Inject the run-level POOLED `iterations_per_cpu_sec` Rate's two Counter
739/// components into `stats.ext_metrics`, summed across the cgroups that have
740/// measured on-CPU time — the cross-cgroup re-pool axis. Rather than routing
741/// the per-cgroup efficiency through `AssertResult::merge`'s worst-by-polarity
742/// `ext_metrics` fold (which picks the WORST cgroup's value, not Σ, and has
743/// no derive post-pass), this reads the already-merged `stats.cgroups` vec
744/// directly: `iterations_per_cpu_sec` = Σ`total_iterations` /
745/// Σ(`total_cpu_time_ns`/1e9) over cgroups with `total_cpu_time_ns > 0` — the
746/// per-cgroup [`CgroupStats::iterations_per_cpu_sec`] re-pooled, NOT a mean of
747/// per-cgroup ratios, NOT the worst single cgroup.
748///
749/// MUST run at the eval layer AFTER the cgroup-bearing merges (every merge that
750/// contributes a [`CgroupStats`], so `stats.cgroups` holds every per-cgroup
751/// entry) and BEFORE the sidecar write. The trailing monitor-verdict merge at
752/// the eval layer merges an `inconclusive()` carrying empty `stats` (no cgroups,
753/// no ext keys), so it is safe to run after this. If component injection ever
754/// moved BEFORE a cgroup-bearing merge, that worst-by-polarity fold would
755/// min/max these Counter keys into single-cgroup scalars, silently corrupting
756/// the pooled sum.
757///
758/// A cgroup with `total_cpu_time_ns == 0` (schedstat unavailable, or
759/// `num_workers == 0`) is EXCLUDED from BOTH sums — mirroring the per-cgroup
760/// [`CgroupStats::iterations_per_cpu_sec`] None-on-zero (`total_cpu_time_ns >
761/// 0` implies `num_workers > 0`, so the one predicate covers both). Crediting
762/// an unmeasured cgroup's iterations against the measured cgroups' CPU-seconds
763/// would overstate cohort efficiency — the silent-wrong-answer this gate
764/// prevents. Both components are inserted both-or-neither (the
765/// `derive_rate_metrics` co-location invariant), only when the summed MEASURED
766/// on-CPU time is > 0 (every cgroup unmeasured ⇒ no rate). The ns→s `/1e9` is
767/// applied ONCE here on the summed ns (not per-cgroup, to avoid repeated float
768/// rounding), since `derive_rate_metrics` is a bare num/den.
769/// `total_iterations_pooled` is a DISTINCT ext-only key, not the typed
770/// `total_iterations` (skipped from ext_metrics; it folds cross-RUN as a MEAN
771/// — a display average — while a Rate numerator must SUM-fold so Σnum/Σdenom
772/// re-pools, so one shared key cannot carry both folds). Because it sums only
773/// MEASURED cgroups, it is ≤ the merge-summed typed `total_iterations` (which
774/// includes any zero-cpu-time cgroups), and equals it unless an excluded
775/// zero-cpu-time cgroup carried iterations>0.
776pub fn populate_run_pooled_iterations_per_cpu_sec(stats: &mut ScenarioStats) {
777    // Exclude cgroups with no measured on-CPU time from BOTH sums (mirrors the
778    // per-cgroup None-on-zero): crediting an unmeasured cgroup's iterations
779    // against the measured cgroups' CPU-seconds would overstate efficiency.
780    // saturating fold: pool the guest-runtime cpu-time-ns / iteration counters
781    // across cgroups; a plain `.sum()` would debug-panic / release-wrap on a
782    // corrupt/hostile component, silently corrupting iterations_per_cpu_sec.
783    let summed_ns: u64 = stats
784        .cgroups
785        .iter()
786        .filter(|c| c.total_cpu_time_ns > 0)
787        .map(|c| c.total_cpu_time_ns)
788        .fold(0u64, u64::saturating_add);
789    if summed_ns == 0 {
790        return;
791    }
792    let summed_iters: u64 = stats
793        .cgroups
794        .iter()
795        .filter(|c| c.total_cpu_time_ns > 0)
796        .map(|c| c.total_iterations)
797        .fold(0u64, u64::saturating_add);
798    stats
799        .ext_metrics
800        .insert("total_iterations_pooled".to_string(), summed_iters as f64);
801    stats
802        .ext_metrics
803        .insert("total_cpu_time_sec".to_string(), summed_ns as f64 / 1e9);
804    crate::stats::derive_rate_metrics(&mut stats.ext_metrics);
805}
806
807/// Inject the whole-run taobench engine's qps + hit Rate components into
808/// `stats.ext_metrics`, pooled across the run's `WorkType::Taobench` cgroups.
809/// Each cgroup carries its workers' merged whole-run aggregate
810/// ([`crate::assert::CgroupStats::taobench_whole`]); this folds those across
811/// cgroups (Σ ops, MAX wall window — the window is shared by the concurrent
812/// cohorts, per `TaobenchStats::merge`) and writes the six `total_taobench_*`
813/// Counter components (`ops`, `fast_ops`, `slow_ops`, `wall_sec`, plus the
814/// command-time `get_cmds` / `get_hits`), from which
815/// `crate::stats::derive_rate_metrics` derives `taobench_total_ops_per_sec`,
816/// `taobench_fast_ops_per_sec`, `taobench_slow_ops_per_sec`, the response-time
817/// `taobench_hit_fraction` (Σfast/Σcompleted), and the command-time
818/// `taobench_command_hit_rate` (Σhits/Σcmds). The whole-run Rate keys are
819/// registered METRICS, so — unlike the per-phase `taobench_*_qps`
820/// (`MetricKind::PerPhase`, invisible to the whole-run cross-run fold) — they
821/// reach the perf-delta `--noise-adjust` spread analysis. The open-loop
822/// serve-latency distribution is a SEPARATE pool
823/// ([`populate_run_pooled_taobench_distribution`], the `*_us_whole` keys).
824///
825/// MUST run post-`merge` (after every cgroup-bearing merge has populated
826/// `stats.cgroups`), exactly like [`populate_run_pooled_iterations_per_cpu_sec`]:
827/// an earlier run would pool over an incomplete cgroup set. A run with no
828/// Taobench cgroup writes nothing (the pool is `None`) — the keys stay absent,
829/// keeping a non-taobench run distinct from a measured zero.
830///
831/// Both-or-neither (the `derive_rate_metrics` co-location invariant): all six
832/// components are inserted together, gated on a measured wall window
833/// (`elapsed_ns > 0`) — the qps denominator. The three per-second Rates then
834/// derive unconditionally (wall_sec > 0); `taobench_hit_fraction` =
835/// `total_taobench_fast_ops` / `total_taobench_ops` derives iff ops completed
836/// (`total_taobench_ops > 0`) and `taobench_command_hit_rate` =
837/// `total_taobench_get_hits` / `total_taobench_get_cmds` iff lookups issued
838/// (`get_cmds > 0`); `derive_rate_metrics` skips a zero-denominator rate, so a
839/// window-but-no-ops run gets qps=0 keys but no false hit fraction / hit rate.
840/// The ns→s `/1e9` is applied ONCE here (not in `derive_rate_metrics`, a bare
841/// num/den), mirroring `total_cpu_time_sec`. Cross-RUN the components SUM-fold
842/// (Counter), so each Rate re-pools as Σnumerator / Σdenominator over the cohort
843/// — the aggregate throughput / hit rate, not a mean of per-run values.
844pub fn populate_run_pooled_taobench(stats: &mut ScenarioStats) {
845    use crate::stats::{
846        TOTAL_TAOBENCH_FAST_OPS, TOTAL_TAOBENCH_GET_CMDS, TOTAL_TAOBENCH_GET_HITS,
847        TOTAL_TAOBENCH_OPS, TOTAL_TAOBENCH_SLOW_OPS, TOTAL_TAOBENCH_WALL_SEC,
848    };
849    // Pool the per-cgroup whole-run aggregates across the run's Taobench cgroups:
850    // Σ ops, MAX wall window (shared by the concurrent cohorts). `None` when no
851    // cgroup ran a Taobench worker.
852    let pooled = stats
853        .cgroups
854        .iter()
855        .filter_map(|c| c.taobench_whole.as_ref())
856        .fold(
857            None,
858            |acc: Option<crate::workload::taobench::run::TaobenchStats>, t| {
859                Some(match acc {
860                    Some(mut a) => {
861                        a.merge(t);
862                        a
863                    }
864                    None => *t,
865                })
866            },
867        );
868    let Some(w) = pooled else {
869        return;
870    };
871    let c = &w;
872    // qps is undefined without a measured wall window; write no components (so
873    // hit_fraction stays absent too) rather than a 0/0 rate.
874    if c.elapsed_ns == 0 {
875        return;
876    }
877    stats
878        .ext_metrics
879        .insert(TOTAL_TAOBENCH_OPS.to_string(), c.total_ops() as f64);
880    stats
881        .ext_metrics
882        .insert(TOTAL_TAOBENCH_FAST_OPS.to_string(), c.fast_ops as f64);
883    stats
884        .ext_metrics
885        .insert(TOTAL_TAOBENCH_SLOW_OPS.to_string(), c.slow_ops as f64);
886    stats.ext_metrics.insert(
887        TOTAL_TAOBENCH_WALL_SEC.to_string(),
888        c.elapsed_ns as f64 / 1e9,
889    );
890    // Command-time hit components: hits = cmds − misses (request-time). The Rate
891    // taobench_command_hit_rate = Σhits / Σcmds re-derives via derive_rate_metrics
892    // (skipped, hence absent, when no lookups issued). Diverges from the
893    // response-time taobench_hit_fraction under open-loop arrival.
894    stats
895        .ext_metrics
896        .insert(TOTAL_TAOBENCH_GET_CMDS.to_string(), c.get_cmds as f64);
897    stats.ext_metrics.insert(
898        TOTAL_TAOBENCH_GET_HITS.to_string(),
899        c.get_cmds.saturating_sub(c.get_misses) as f64,
900    );
901    crate::stats::derive_rate_metrics(&mut stats.ext_metrics);
902}
903
904/// Inject the taobench WHOLE-RUN open-loop serve-latency percentiles into
905/// `stats.ext_metrics` as the `taobench_serve_*_us_whole` keys, re-pooled
906/// run-level by UNIONING the per-phase per-cgroup serve `PlatStats` histograms
907/// (`stats.phases[].per_cgroup[].taobench.serve_lat`) across every step-attributed
908/// phase and every cgroup, then re-deriving each percentile / min / max over the
909/// merged histogram (`PlatStats::combine` is an associative bucket-count add, so
910/// the merged histogram is the faithful pooled sample set and the re-derived
911/// percentile is the percentile OF the union — NOT a mean of per-source
912/// percentiles). The taobench analog of
913/// [`populate_run_pooled_schbench_distribution`], with the same source: the
914/// BASELINE (epoch 0) and inter-step-gap (`u32::MAX`) epochs are excluded (they
915/// are dropped from `stats.phases` by `expand_backdrop_phase_buckets`), so this is
916/// the steady-state serve distribution over the measured steps — a faithful
917/// PerRunDistribution, distinct from the standalone driver's full-run histogram.
918///
919/// Runs in [`populate_run_ext_all`] (post-merge); reads the per-phase carriers
920/// (disjoint from the taobench counter pool) and writes distinct `*_whole` keys,
921/// so it is order-independent. Keys are written only when the merged histogram
922/// has samples (`sample_count() > 0`) — a closed-loop run (no serve samples)
923/// reads ABSENT, never a false 0. The keys are `MetricKind::PerRunDistribution`:
924/// noise-compared per-run by `crate::stats::noise_findings`, NEVER cross-run
925/// folded (a percentile of a union is not a mean of per-run percentiles, and the
926/// per-phase histograms are dropped at the cross-run boundary).
927pub fn populate_run_pooled_taobench_distribution(stats: &mut ScenarioStats) {
928    use crate::stats::{
929        TAOBENCH_SERVE_MAX_US_WHOLE, TAOBENCH_SERVE_MIN_US_WHOLE, TAOBENCH_SERVE_P50_US_WHOLE,
930        TAOBENCH_SERVE_P90_US_WHOLE, TAOBENCH_SERVE_P99_US_WHOLE, TAOBENCH_SERVE_P999_US_WHOLE,
931    };
932    use crate::workload::schbench::plat::{Pct, PlatStats};
933
934    // Union the per-phase per-cgroup serve histograms across the whole run.
935    let mut serve = PlatStats::default();
936    for phase in &stats.phases {
937        for pc in phase.per_cgroup.values() {
938            if let Some(t) = pc.taobench.as_ref() {
939                serve.combine(&t.serve_lat);
940            }
941        }
942    }
943    if serve.sample_count() == 0 {
944        return;
945    }
946    let q = serve.percentiles();
947    stats.ext_metrics.insert(
948        TAOBENCH_SERVE_P50_US_WHOLE.to_string(),
949        q.value_at(Pct::P50) as f64,
950    );
951    stats.ext_metrics.insert(
952        TAOBENCH_SERVE_P90_US_WHOLE.to_string(),
953        q.value_at(Pct::P90) as f64,
954    );
955    stats.ext_metrics.insert(
956        TAOBENCH_SERVE_P99_US_WHOLE.to_string(),
957        q.value_at(Pct::P99) as f64,
958    );
959    stats.ext_metrics.insert(
960        TAOBENCH_SERVE_P999_US_WHOLE.to_string(),
961        q.value_at(Pct::P999) as f64,
962    );
963    stats
964        .ext_metrics
965        .insert(TAOBENCH_SERVE_MIN_US_WHOLE.to_string(), q.min as f64);
966    stats
967        .ext_metrics
968        .insert(TAOBENCH_SERVE_MAX_US_WHOLE.to_string(), q.max as f64);
969}
970
971/// Inject the schbench whole-run Class-3 metrics — the loop Counter and the
972/// role-separate run-delay gate-Rate components — into `stats.ext_metrics`,
973/// summed across EVERY phase and EVERY cgroup from the per-phase
974/// `SchbenchPhaseStats` raw pairs (`stats.phases[].per_cgroup[].schbench`). The
975/// raw `(run_delay_ns, pcount)` pairs and `loop_count` are integer and
976/// associative, so summing across phases+cgroups gives the run-level totals; the
977/// two `*_run_delay_ns_per_sched` Rates then re-derive Σrun_delay/Σpcount (the
978/// sample-weighted per-schedule mean — NOT a mean of per-run means). The MESSAGE
979/// and WORKER thread roles pool SEPARATELY (different per-schedule wait
980/// populations — never cross-pool).
981///
982/// Runs in [`populate_run_ext_all`] (post-merge, after
983/// [`populate_run_pooled_taobench`]); reads the per-phase carriers (a disjoint
984/// source from the iterations/taobench pools) and writes distinct
985/// `total_schbench_*` / `schbench_*_run_delay_ns_per_sched` keys, so it is
986/// order-independent. A run with no schbench carrier writes nothing (keys stay
987/// absent — a non-schbench run is distinct from a measured zero).
988///
989/// Both-or-neither PER ROLE: each role's two Counter components are inserted only
990/// when that role was scheduled (`pcount > 0`), so `derive_rate_metrics` yields
991/// the role's gate-Rate iff it ran (never a 0/0); the two roles are independent
992/// (a worker-only run emits only the worker Rate). `total_schbench_loops` is
993/// always written when any schbench carrier ran (0 is a measured zero). The
994/// per-phase `sched_delay_msg/worker_us` is the SAME Σrun_delay_ns/Σpcount
995/// per-schedule mean at phase scope (NOT schbench's native mean-of-per-thread-
996/// means, a separate whole-run SchbenchResult stat) and stays PerPhase
997/// display-only — only these Rates gate; no double-count. Cross-RUN the
998/// components SUM-fold (Counter), so each Rate re-pools Σrun_delay/Σpcount.
999pub fn populate_run_pooled_schbench(stats: &mut ScenarioStats) {
1000    use crate::stats::{
1001        TOTAL_SCHBENCH_LOOPS, TOTAL_SCHBENCH_MSG_PCOUNT, TOTAL_SCHBENCH_MSG_RUN_DELAY_NS,
1002        TOTAL_SCHBENCH_WORKER_PCOUNT, TOTAL_SCHBENCH_WORKER_RUN_DELAY_NS,
1003    };
1004    let mut msg_run_delay_ns: u64 = 0;
1005    let mut msg_pcount: u64 = 0;
1006    let mut worker_run_delay_ns: u64 = 0;
1007    let mut worker_pcount: u64 = 0;
1008    let mut loops: u64 = 0;
1009    let mut any = false;
1010    for phase in &stats.phases {
1011        for pc in phase.per_cgroup.values() {
1012            if let Some(s) = pc.schbench.as_ref() {
1013                any = true;
1014                // saturating: guest-runtime run-delay-ns / pcount / loop
1015                // counters pooled across phases+cgroups (matches the already-
1016                // saturating SchbenchPhaseStats::merge); never wrap a gate-Rate.
1017                msg_run_delay_ns = msg_run_delay_ns.saturating_add(s.msg_run_delay_ns);
1018                msg_pcount = msg_pcount.saturating_add(s.msg_pcount);
1019                worker_run_delay_ns = worker_run_delay_ns.saturating_add(s.worker_run_delay_ns);
1020                worker_pcount = worker_pcount.saturating_add(s.worker_pcount);
1021                loops = loops.saturating_add(s.loop_count);
1022            }
1023        }
1024    }
1025    if !any {
1026        return;
1027    }
1028    stats
1029        .ext_metrics
1030        .insert(TOTAL_SCHBENCH_LOOPS.to_string(), loops as f64);
1031    if msg_pcount > 0 {
1032        stats.ext_metrics.insert(
1033            TOTAL_SCHBENCH_MSG_RUN_DELAY_NS.to_string(),
1034            msg_run_delay_ns as f64,
1035        );
1036        stats
1037            .ext_metrics
1038            .insert(TOTAL_SCHBENCH_MSG_PCOUNT.to_string(), msg_pcount as f64);
1039    }
1040    if worker_pcount > 0 {
1041        stats.ext_metrics.insert(
1042            TOTAL_SCHBENCH_WORKER_RUN_DELAY_NS.to_string(),
1043            worker_run_delay_ns as f64,
1044        );
1045        stats.ext_metrics.insert(
1046            TOTAL_SCHBENCH_WORKER_PCOUNT.to_string(),
1047            worker_pcount as f64,
1048        );
1049    }
1050    crate::stats::derive_rate_metrics(&mut stats.ext_metrics);
1051}
1052
1053/// Inject the schbench whole-run DISTRIBUTIONAL metrics (the wakeup /
1054/// request latency percentiles + min/max and the achieved-rps percentiles) into
1055/// `stats.ext_metrics` as the `*_whole` keys, re-pooled run-level by UNIONING the
1056/// per-phase per-cgroup `PlatStats` histograms
1057/// (`stats.phases[].per_cgroup[].schbench.{wakeup,request,rps}`) across EVERY
1058/// phase and EVERY cgroup, then re-deriving each percentile / min / max over the
1059/// merged histogram. `PlatStats::combine` is an associative bucket-count add, so
1060/// the merged histogram is the FAITHFUL union and the re-derived percentile is
1061/// the percentile OF the pooled sample set — NOT a mean of per-phase / per-cgroup
1062/// percentiles (the percentile operator is non-linear). This is the schbench
1063/// histogram analog of [`populate_run_distribution_metrics`]'s raw-sample union.
1064///
1065/// Runs in [`populate_run_ext_all`] (post-merge, after
1066/// [`populate_run_pooled_schbench`]); reads the per-phase carriers (disjoint from
1067/// the iterations / taobench / schbench loop/run-delay pools) and writes distinct
1068/// `*_whole` keys, so it is order-independent. Each stream's keys are written
1069/// only when its merged histogram has samples (`sample_count() > 0`) — a stream
1070/// with no samples (e.g. a sub-1s run with no rps samples) reads ABSENT, never a
1071/// false 0 (mirrors the per-phase `write_schbench_scalars` gating and the
1072/// carrier-less graceful degradation of [`populate_run_distribution_metrics`]).
1073///
1074/// The `*_whole` keys are `crate::stats::MetricKind::PerRunDistribution`:
1075/// noise-compared per-run by `crate::stats::noise_findings` (each run's own p99),
1076/// NEVER cross-RUN folded (a percentile of a union is not a mean of per-run
1077/// percentiles, and the per-phase histograms are dropped at the cross-run
1078/// boundary), so they are gated out of the cross-RUN ext fold and the within-run
1079/// reducers (`is_derived`). Distinct names from the per-phase percentile keys
1080/// (one registry name = one kind), produced solely here.
1081pub fn populate_run_pooled_schbench_distribution(stats: &mut ScenarioStats) {
1082    use crate::stats::{
1083        SCHBENCH_REQUEST_MAX_US_WHOLE, SCHBENCH_REQUEST_MIN_US_WHOLE,
1084        SCHBENCH_REQUEST_P50_US_WHOLE, SCHBENCH_REQUEST_P90_US_WHOLE,
1085        SCHBENCH_REQUEST_P99_US_WHOLE, SCHBENCH_REQUEST_P999_US_WHOLE, SCHBENCH_RPS_MAX_WHOLE,
1086        SCHBENCH_RPS_MIN_WHOLE, SCHBENCH_RPS_P20_WHOLE, SCHBENCH_RPS_P50_WHOLE,
1087        SCHBENCH_RPS_P90_WHOLE, SCHBENCH_WAKEUP_MAX_US_WHOLE, SCHBENCH_WAKEUP_MIN_US_WHOLE,
1088        SCHBENCH_WAKEUP_P50_US_WHOLE, SCHBENCH_WAKEUP_P90_US_WHOLE, SCHBENCH_WAKEUP_P99_US_WHOLE,
1089        SCHBENCH_WAKEUP_P999_US_WHOLE,
1090    };
1091    use crate::workload::schbench::plat::{Pct, PlatStats};
1092
1093    // Union the per-stream histograms across ALL phases+cgroups (combine =
1094    // associative bucket-count add → the faithful pooled histogram).
1095    let mut wakeup = PlatStats::default();
1096    let mut request = PlatStats::default();
1097    let mut rps = PlatStats::default();
1098    for phase in &stats.phases {
1099        for pc in phase.per_cgroup.values() {
1100            if let Some(s) = pc.schbench.as_ref() {
1101                wakeup.combine(&s.wakeup);
1102                request.combine(&s.request);
1103                rps.combine(&s.rps);
1104            }
1105        }
1106    }
1107    // Latency streams: 4 percentiles + min/max, re-derived over the union, µs.
1108    if wakeup.sample_count() > 0 {
1109        let q = wakeup.percentiles();
1110        stats.ext_metrics.insert(
1111            SCHBENCH_WAKEUP_P50_US_WHOLE.to_string(),
1112            q.value_at(Pct::P50) as f64,
1113        );
1114        stats.ext_metrics.insert(
1115            SCHBENCH_WAKEUP_P90_US_WHOLE.to_string(),
1116            q.value_at(Pct::P90) as f64,
1117        );
1118        stats.ext_metrics.insert(
1119            SCHBENCH_WAKEUP_P99_US_WHOLE.to_string(),
1120            q.value_at(Pct::P99) as f64,
1121        );
1122        stats.ext_metrics.insert(
1123            SCHBENCH_WAKEUP_P999_US_WHOLE.to_string(),
1124            q.value_at(Pct::P999) as f64,
1125        );
1126        stats
1127            .ext_metrics
1128            .insert(SCHBENCH_WAKEUP_MIN_US_WHOLE.to_string(), q.min as f64);
1129        stats
1130            .ext_metrics
1131            .insert(SCHBENCH_WAKEUP_MAX_US_WHOLE.to_string(), q.max as f64);
1132    }
1133    if request.sample_count() > 0 {
1134        let q = request.percentiles();
1135        stats.ext_metrics.insert(
1136            SCHBENCH_REQUEST_P50_US_WHOLE.to_string(),
1137            q.value_at(Pct::P50) as f64,
1138        );
1139        stats.ext_metrics.insert(
1140            SCHBENCH_REQUEST_P90_US_WHOLE.to_string(),
1141            q.value_at(Pct::P90) as f64,
1142        );
1143        stats.ext_metrics.insert(
1144            SCHBENCH_REQUEST_P99_US_WHOLE.to_string(),
1145            q.value_at(Pct::P99) as f64,
1146        );
1147        stats.ext_metrics.insert(
1148            SCHBENCH_REQUEST_P999_US_WHOLE.to_string(),
1149            q.value_at(Pct::P999) as f64,
1150        );
1151        stats
1152            .ext_metrics
1153            .insert(SCHBENCH_REQUEST_MIN_US_WHOLE.to_string(), q.min as f64);
1154        stats
1155            .ext_metrics
1156            .insert(SCHBENCH_REQUEST_MAX_US_WHOLE.to_string(), q.max as f64);
1157    }
1158    // RPS stream: PLIST_FOR_RPS = 20/50/90 + min/max (the schbench rps table).
1159    if rps.sample_count() > 0 {
1160        let r = rps.percentiles();
1161        stats.ext_metrics.insert(
1162            SCHBENCH_RPS_P20_WHOLE.to_string(),
1163            r.value_at(Pct::P20) as f64,
1164        );
1165        stats.ext_metrics.insert(
1166            SCHBENCH_RPS_P50_WHOLE.to_string(),
1167            r.value_at(Pct::P50) as f64,
1168        );
1169        stats.ext_metrics.insert(
1170            SCHBENCH_RPS_P90_WHOLE.to_string(),
1171            r.value_at(Pct::P90) as f64,
1172        );
1173        stats
1174            .ext_metrics
1175            .insert(SCHBENCH_RPS_MIN_WHOLE.to_string(), r.min as f64);
1176        stats
1177            .ext_metrics
1178            .insert(SCHBENCH_RPS_MAX_WHOLE.to_string(), r.max as f64);
1179    }
1180}
1181
1182/// Populate run-level DERIVED distributional metrics into
1183/// `stats.ext_metrics`: every registered `MetricKind::Distribution`,
1184/// `MetricKind::WorstLowest`, `MetricKind::WakeLatencyTailRatio`, and
1185/// `MetricKind::WorstCrossNodeRatio`. This is the SOLE
1186/// within-run producer of those metrics' values — they carry no per-phase
1187/// sample slice and no cross-cgroup merge fold, and their registry accessors
1188/// are `|_| None`, so `MetricDef::read` reads the value
1189/// written here from `ext_metrics`.
1190///
1191/// DISTRIBUTION (the 5 wake / run-delay aggregates): pools the RAW sample
1192/// vectors held in `stats.phases[].per_cgroup` across EVERY phase and EVERY
1193/// cgroup into one combined set, then recomputes the percentile / CV / mean
1194/// / extreme over it — the statistic of the union, NOT a max or mean of
1195/// per-cgroup reductions (the percentile of a union is not the max of
1196/// per-source percentiles). The ns→µs scale is applied ONCE here (the
1197/// carriers store raw ns, per [`PhaseCgroupStats::run_delays_ns`]). The wake
1198/// pool is population-WEIGHTED: each phase carrier's samples carry weight
1199/// `wake_sample_total / wake_latencies_ns.len()`, so a phase whose reservoir
1200/// hit the cap contributes by true population, not capped length (the
1201/// cross-PHASE de-skew) — reduced via the weighted percentile / moments.
1202/// The run-delay pool is unweighted (per-worker, never reservoir-capped, so
1203/// length IS population). Below the wake cap every weight is 1.0, so the
1204/// weighted P99 / median / mean / worst are byte-identical to the unweighted
1205/// concat; the weighted CV matches only within ~1e-9 (it sums the mean in f64
1206/// where the unweighted path sums in u64 — a weighted variance cannot keep the
1207/// u64 sum).
1208///
1209/// CARRIER-LESS FOLD (graceful degradation): a cgroup whose raw samples are
1210/// NOT in the pool — a backdrop epoch that fell on BASELINE or the
1211/// inter-step gap (no paired host bucket, so no carrier) or a cgroup whose
1212/// carrier was stripped/empty (`strip_phase_cgroup_samples`) — is NOT
1213/// dropped. Its
1214/// surviving per-cgroup [`CgroupStats`] reduction folds worst-wins (max — every
1215/// Distribution metric is `LowerBetter`, registry-gated) into the pooled value.
1216/// The CgroupStats reductions are never stripped — `stats.cgroups[]` is the
1217/// already-reduced `cgroup_stats(reports)` output, a SEPARATE reduction path
1218/// from the per-phase carriers — so a carrier-less cgroup always has a source.
1219/// When EVERY carrier is empty (a fully-stripped run) the pool is empty and the
1220/// result degenerates to the max over every cgroup's reduction — the pre-Item-7
1221/// cross-cgroup max. NOTE the value CLASS of a folded cgroup differs from a
1222/// pooled one for the P99 / Median / Mean / CV reductions: a pooled cgroup
1223/// contributes to the percentile of the union; a carrier-less cgroup
1224/// contributes its per-cgroup reduction worst-wins (a worst-cgroup proxy, not
1225/// pooled). For the `SampleReduction::Worst` reduction the two COINCIDE
1226/// (max-of-union == max-of-per-cgroup-maxes), so the carrier-less fold is exact
1227/// there, not a proxy. A second asymmetry specific to CV (from the population
1228/// weighting): the POOLED CV divides variance/mean by Σ per-sample weights (the
1229/// reconstructed population), while a carrier-less cgroup's folded CV is
1230/// [`cgroup_stats`]'s UNWEIGHTED CV (`n = all_latencies.len()`). The two
1231/// coincide below the cap (all weights 1.0) and diverge above it; the mix is
1232/// sound — a carrier-less cgroup has no per-phase weight data to
1233/// population-weight (its carrier is absent by definition), and both feed the
1234/// same LowerBetter worst-wins max. Backdrop step-phase carriers now join
1235/// the pool directly (per-epoch expansion in `collect_handles`); only the
1236/// carrier-less cases above fold worst-wins.
1237///
1238/// WORSTLOWEST (the 2 iteration efficiencies): the lowest (worst) cgroup's
1239/// efficiency, computed per-cgroup from the `stats.cgroups[]` COUNTERS via
1240/// [`CgroupStats::iterations_per_worker`] / [`CgroupStats::iterations_per_cpu_sec`]
1241/// and the None-aware lowest-wins fold (a measured `Some(0.0)` — starvation
1242/// — wins; a no-data `None` is skipped; an all-`None` cohort writes no key,
1243/// preserving absence as a missing ext entry rather than a `0.0`). The
1244/// counters survive stripping, so WorstLowest needs no fallback branch.
1245///
1246/// Runs post-merge at the eval layer beside
1247/// [`populate_run_pooled_iterations_per_cpu_sec`], AFTER the per-cgroup
1248/// carriers are folded into `stats.phases` and BEFORE the sidecar write, so
1249/// `stats.phases[].per_cgroup` is fully merged and `stats.cgroups` is the
1250/// final per-cgroup roll-up.
1251pub fn populate_run_distribution_metrics(stats: &mut ScenarioStats) {
1252    // Pool the per-phase per-cgroup raw sample vectors across every phase and
1253    // cgroup ONCE for the Distribution PRIMARY path, then sort so the
1254    // percentile reductions can index directly. `wake_latencies_ns` is
1255    // per-WAKEUP (reservoir-capped at MAX_WAKE_SAMPLES on the carrier because
1256    // it can reach 100k); `run_delays_ns` is per-WORKER (one sample/worker, not
1257    // capped), so the run-delay pool is total-workers × phases — genuinely
1258    // small. The wake pool is NOT intrinsically small: it is the union of the
1259    // per-carrier wake vectors, num_carriers × MAX_WAKE_SAMPLES worst case, so
1260    // its size is bounded by the upstream 16 MiB bulk-frame cap on the arriving
1261    // carriers (strip_phase_cgroup_samples is the overflow lever) rather than by
1262    // being tiny — no OOM risk, no cap needed here. Both are transient: reduced
1263    // to scalars here, never re-serialized.
1264    // Wake samples carry a per-sample population WEIGHT (`wake_sample_total /
1265    // reservoir len`) so a >cap phase contributes in proportion to its true
1266    // population, not its guest-capped length (the cross-PHASE de-skew). Run-delay
1267    // samples are per-worker and never reservoir-capped (no `*_sample_total`), so
1268    // their length IS their population — pooled unweighted.
1269    let mut wake_pool: Vec<(u64, f64)> = Vec::new();
1270    // Distinct timer-latency pool, population-WEIGHTED like
1271    // wake_pool (reservoir-capped, so a >cap phase carries weight
1272    // timer_sample_total/len for the cross-phase de-skew).
1273    let mut timer_pool: Vec<(u64, f64)> = Vec::new();
1274    let mut run_delay_pool: Vec<u64> = Vec::new();
1275    // Names of cgroups that contributed NON-EMPTY samples to each pool. A
1276    // cgroup absent here — a backdrop epoch that fell on BASELINE / the
1277    // inter-step gap (no paired host bucket, so no carrier) or a
1278    // stripped/empty carrier — is NOT dropped from the run-level
1279    // Distribution: the re-pool folds its surviving per-cgroup CgroupStats
1280    // reduction worst-wins (see `populate_run_distribution_metrics_from`).
1281    // Backdrop step-phase carriers now join the pool directly (per-epoch
1282    // expansion in collect_handles), so a step-matched backdrop epoch pools
1283    // rather than worst-wins-folds.
1284    //
1285    // The fallback dedup keys on cgroup NAME (a `stats.cgroups` entry whose
1286    // name is in `*_carriers` is pooled, not reduction-folded), which assumes
1287    // carrier-bearing and carrier-less cgroup names are DISJOINT. That holds
1288    // WITHIN one step's collect (cgroupfs path uniqueness — two live cgroups
1289    // cannot share a name, mkdir would EEXIST — and a single collect_handles
1290    // call attaches carriers to all its handles or none). It does NOT hold
1291    // across STEPS: `AssertResult::merge` extends `stats.cgroups` per
1292    // (handle, step), so a name that carried samples at step k recurs at step
1293    // k+1, and the step-(k+1) entry is skipped by this dedup (its name is in
1294    // `*_carriers`). That only OMITS a contribution, never vanishes the metric
1295    // (the step-k pool still produces it). A skipped step-(k+1) entry whose
1296    // carrier is merely EMPTY (collected no samples) is harmless: its per-cgroup
1297    // reduction is the trivial zero a worst-wins f64::max ignores. The only
1298    // LOSSY case is a step-(k+1) entry STRIPPED of live samples while step k
1299    // survives, and that cannot arise today: `strip_phase_cgroup_samples` strips
1300    // RUN-WIDE (every phase at once), so a run is never partially stripped per
1301    // step. A backdrop name now enters `*_carriers` (pooled once via its
1302    // per-epoch expansion) so it is skipped from the reduction-fold — and a
1303    // backdrop and a step-local cgroup cannot share a live name (cgroupfs
1304    // mkdir EEXIST; a backdrop is live the whole scenario), so each
1305    // stats.cgroups entry still contributes via exactly one of {pool,
1306    // reduction-fold} — no double count.
1307    let mut wake_carriers: std::collections::BTreeSet<&str> = std::collections::BTreeSet::new();
1308    let mut timer_carriers: std::collections::BTreeSet<&str> = std::collections::BTreeSet::new();
1309    let mut run_delay_carriers: std::collections::BTreeSet<&str> =
1310        std::collections::BTreeSet::new();
1311    for phase in &stats.phases {
1312        for (cgname, pcg) in &phase.per_cgroup {
1313            if !pcg.wake_latencies_ns.is_empty() {
1314                // Per-sample weight = true population / surviving reservoir size.
1315                // A ≤cap carrier has len == wake_sample_total → weight 1.0, so the
1316                // pool is value-for-value with the unweighted concat; a >cap
1317                // carrier's capped samples each stand for `total/len > 1` true
1318                // wakes, restoring the cross-phase population proportion.
1319                //
1320                // INVARIANT: `reservoir_push` bumps wake_sample_total on EVERY
1321                // wakeup but pushes into the reservoir only up to MAX_WAKE_SAMPLES,
1322                // and both the carrier merge and `phase_cgroup_stats` SUM the two,
1323                // so wake_sample_total >= len always (== len below the cap). A
1324                // carrier violating that — samples present but a zeroed/under-count
1325                // total — would yield weight < 1 and silently UNDER-weight (at
1326                // weight 0, DROP) its samples. Clamp the numerator to len so a
1327                // malformed carrier degrades to unit weight (reservoir treated as
1328                // its own population) instead of dropping data; debug_assert the
1329                // invariant so a real counting bug surfaces in dev.
1330                let len = pcg.wake_latencies_ns.len() as u64;
1331                debug_assert!(
1332                    pcg.wake_sample_total >= len,
1333                    "wake_sample_total ({}) < reservoir len ({}): malformed carrier",
1334                    pcg.wake_sample_total,
1335                    len,
1336                );
1337                let w = pcg.wake_sample_total.max(len) as f64 / len as f64;
1338                wake_pool.extend(pcg.wake_latencies_ns.iter().map(|&v| (v, w)));
1339                wake_carriers.insert(cgname.as_str());
1340            }
1341            if !pcg.timer_latencies_ns.is_empty() {
1342                // Population-weighted exactly like the wake pool: a >cap phase's
1343                // capped samples each stand for timer_sample_total/len true
1344                // wakes, restoring the cross-phase population proportion.
1345                let len = pcg.timer_latencies_ns.len() as u64;
1346                debug_assert!(
1347                    pcg.timer_sample_total >= len,
1348                    "timer_sample_total ({}) < reservoir len ({}): malformed carrier",
1349                    pcg.timer_sample_total,
1350                    len,
1351                );
1352                let w = pcg.timer_sample_total.max(len) as f64 / len as f64;
1353                timer_pool.extend(pcg.timer_latencies_ns.iter().map(|&v| (v, w)));
1354                timer_carriers.insert(cgname.as_str());
1355            }
1356            if !pcg.run_delays_ns.is_empty() {
1357                run_delay_pool.extend_from_slice(&pcg.run_delays_ns);
1358                run_delay_carriers.insert(cgname.as_str());
1359            }
1360        }
1361    }
1362    wake_pool.sort_unstable_by_key(|&(v, _)| v);
1363    timer_pool.sort_unstable_by_key(|&(v, _)| v);
1364    run_delay_pool.sort_unstable();
1365    populate_run_distribution_metrics_from(
1366        &mut stats.ext_metrics,
1367        crate::stats::METRICS.iter().filter_map(|m| {
1368            matches!(
1369                m.kind,
1370                crate::stats::MetricKind::Distribution { .. }
1371                    | crate::stats::MetricKind::WorstLowest { .. }
1372                    | crate::stats::MetricKind::WakeLatencyTailRatio
1373            )
1374            .then_some((m.name, m.kind))
1375        }),
1376        &wake_pool,
1377        &wake_carriers,
1378        &timer_pool,
1379        &timer_carriers,
1380        &run_delay_pool,
1381        &run_delay_carriers,
1382        &stats.cgroups,
1383        stats.total_iterations,
1384    );
1385    // worst_page_locality (WorstLowest{NumaLocal,NumaTotal}) re-pools from the
1386    // per-phase NUMA carriers, which the _from helper above cannot reach (it
1387    // takes only stats.cgroups, and the reports-only CgroupStats hardcodes
1388    // page_locality 0.0). Single-sourced with run_metric's WorstPageLocality via
1389    // worst_page_locality(): the lowest per-cgroup page_locality over cgroups
1390    // that measured NUMA residency (a measured 0.0 — all off-node — winning); an
1391    // all-unmeasured cohort writes no key (absence preserved as a missing ext
1392    // entry, never a 0.0 sentinel). numa_agg_per_cgroup()'s borrow ends before
1393    // the insert.
1394    if let Some(v) = stats.worst_page_locality() {
1395        stats
1396            .ext_metrics
1397            .insert("worst_page_locality".to_string(), v);
1398    }
1399    // worst_cross_node_migration_ratio (WorstCrossNodeRatio) re-pools from the same
1400    // per-phase NUMA carriers — the MAX per-cgroup churn ratio over the latest
1401    // residency total — single-sourced with run_metric's WorstCrossNodeMigrationRatio
1402    // via worst_cross_node_migration_ratio(); an all-unmeasured cohort writes no key
1403    // (absence preserved). Not handled by the _from helper above (it has only
1404    // stats.cgroups, and the metric is excluded from that filter).
1405    if let Some(v) = stats.worst_cross_node_migration_ratio() {
1406        stats
1407            .ext_metrics
1408            .insert("worst_cross_node_migration_ratio".to_string(), v);
1409    }
1410}
1411
1412/// Inner of [`populate_run_distribution_metrics`] taking the metric specs
1413/// `(name, kind)` and the pre-pooled+SORTED sample sets explicitly, so the
1414/// re-pool math is unit-testable without registered metrics (the
1415/// `derive_rate_metrics_from` precedent). `wake_pool` / `run_delay_pool` are
1416/// the cross-phase+cross-cgroup raw-ns unions (ascending); `*_carriers` name
1417/// the cgroups that contributed samples to each pool; `cgroups` supplies the
1418/// WorstLowest counters and the per-cgroup reductions that carrier-less
1419/// cgroups (backdrop / stripped) fold into the Distribution result.
1420#[allow(clippy::too_many_arguments)]
1421pub(crate) fn populate_run_distribution_metrics_from<'a>(
1422    target: &mut std::collections::BTreeMap<String, f64>,
1423    metrics: impl Iterator<Item = (&'a str, crate::stats::MetricKind)>,
1424    wake_pool: &[(u64, f64)],
1425    wake_carriers: &std::collections::BTreeSet<&str>,
1426    timer_pool: &[(u64, f64)],
1427    timer_carriers: &std::collections::BTreeSet<&str>,
1428    run_delay_pool: &[u64],
1429    run_delay_carriers: &std::collections::BTreeSet<&str>,
1430    cgroups: &[CgroupStats],
1431    run_total_iterations: u64,
1432) {
1433    use crate::stats::{MetricKind, SampleSource, WorstLowestDenominator, WorstLowestNumerator};
1434    for (name, kind) in metrics {
1435        let value: Option<f64> = match kind {
1436            MetricKind::Distribution { source, reduction } => {
1437                // Pool the carried samples (the thesis: percentile of the
1438                // UNION), then fold worst-wins (max — Distribution is
1439                // LowerBetter, registry-gated) the surviving per-cgroup
1440                // reduction of every cgroup WITHOUT a carrier-with-samples for
1441                // this source (a backdrop, or a stripped/empty carrier), so no
1442                // cgroup is dropped from the run-level distribution. When EVERY
1443                // carrier is empty (fully stripped) the pool is empty and this
1444                // degenerates to the max over every cgroup — the pre-Item-7
1445                // cross-cgroup max.
1446                //
1447                // Pool reduction is per-source: WakeLatencyNs is population-WEIGHTED
1448                // (each phase's guest-capped samples carry weight
1449                // wake_sample_total/len, so a >cap phase contributes by true
1450                // population not capped length — the cross-PHASE de-skew, via
1451                // reduce_weighted_sorted_distribution); RunDelayNs is unweighted
1452                // (per-worker, never reservoir-capped, so length IS population, via
1453                // reduce_sorted_distribution).
1454                //
1455                // CONTRACT (now uniform with WorstLowest and WakeLatencyTailRatio
1456                // below): a cohort with NO measurement for this source anywhere
1457                // (empty carrier pool AND every non-carrier cgroup not-measured)
1458                // yields ABSENCE (None), not Some(0.0). A percentile / mean over
1459                // zero samples is undefined, not a measured zero — folding its
1460                // 0.0 sentinel into the cross-run mean would, for the LowerBetter
1461                // wake/timer/run-delay metrics, falsely drag the mean toward
1462                // "perfect". The non-carrier fold below is gated on
1463                // `cg.measured_for(source)` so an unmeasured cgroup contributes
1464                // nothing; a non-carrier cgroup that DID measure (a genuine
1465                // measured zero, e.g. workers that queued for no time) still
1466                // contributes its real value. This matches WorstLowest (None when
1467                // every iterations_per_*() is None) and WakeLatencyTailRatio (None
1468                // when no cgroup has a tail) — every distributional kind now emits
1469                // None for a no-measurement run.
1470                let (mut v, carriers): (Option<f64>, &std::collections::BTreeSet<&str>) =
1471                    match source {
1472                        SampleSource::WakeLatencyNs => (
1473                            (!wake_pool.is_empty())
1474                                .then(|| reduce_weighted_sorted_distribution(wake_pool, reduction)),
1475                            wake_carriers,
1476                        ),
1477                        SampleSource::TimerLatencyNs => (
1478                            (!timer_pool.is_empty()).then(|| {
1479                                reduce_weighted_sorted_distribution(timer_pool, reduction)
1480                            }),
1481                            timer_carriers,
1482                        ),
1483                        SampleSource::RunDelayNs => (
1484                            (!run_delay_pool.is_empty())
1485                                .then(|| reduce_sorted_distribution(run_delay_pool, reduction)),
1486                            run_delay_carriers,
1487                        ),
1488                    };
1489                for cg in cgroups {
1490                    if !carriers.contains(cg.cgroup_name.as_str()) && cg.measured_for(source) {
1491                        let r = distribution_cgroup_reduction(cg, source, reduction);
1492                        v = Some(v.map_or(r, |acc| acc.max(r)));
1493                    }
1494                }
1495                v
1496            }
1497            // The iterations-efficiency WorstLowest selectors (numerator
1498            // Iterations) re-pool from the `stats.cgroups` counters here; the
1499            // denominator picks the per-cgroup efficiency method. The
1500            // page-locality selector (numerator NumaLocal) re-pools instead from
1501            // the per-phase NUMA carriers in populate_run_distribution_metrics
1502            // (this fn has only stats.cgroups, not stats.phases), so it is
1503            // skipped here (the NumaLocal arm below returns None).
1504            //
1505            // In a MULTI-STEP scenario `AssertResult::merge` extends
1506            // `stats.cgroups` per (handle, step), so the same cgroup name
1507            // appears once per step; this selects the lowest single
1508            // (handle, step) entry, NOT a per-name whole-run efficiency. That
1509            // preserves the deleted `fold_lowest_some` granularity exactly and
1510            // mirrors `populate_run_pooled_iterations_per_cpu_sec`, which sums
1511            // over the same per-(handle, step) entries.
1512            MetricKind::WorstLowest {
1513                numerator: WorstLowestNumerator::Iterations,
1514                denominator,
1515            } => {
1516                let mut worst: Option<f64> = None;
1517                for cg in cgroups {
1518                    let per_cg = match denominator {
1519                        WorstLowestDenominator::NumWorkers => cg.iterations_per_worker(),
1520                        WorstLowestDenominator::CpuTimeNs => cg.iterations_per_cpu_sec(),
1521                        // NumaTotal pairs only with the NumaLocal numerator
1522                        // (handled in the NumaLocal arm below) — never with
1523                        // Iterations.
1524                        WorstLowestDenominator::NumaTotal => None,
1525                    };
1526                    // Lowest-wins, None-aware (the semantic the deleted
1527                    // `fold_lowest_some` carried in `AssertResult::merge`): a
1528                    // measured `Some(0.0)` (starvation) wins the worst bucket;
1529                    // a `None` is skipped.
1530                    if let Some(v) = per_cg
1531                        && worst.is_none_or(|w| v < w)
1532                    {
1533                        worst = Some(v);
1534                    }
1535                }
1536                worst
1537            }
1538            // page-locality (numerator NumaLocal): re-pooled from the per-phase
1539            // NUMA carriers in populate_run_distribution_metrics, not here.
1540            MetricKind::WorstLowest {
1541                numerator: WorstLowestNumerator::NumaLocal,
1542                ..
1543            } => None,
1544            // Worst-cgroup wake-latency tail amplification: the MAX over each
1545            // cgroup's own p99/median ratio (`CgroupStats::wake_latency_tail_ratio`).
1546            // Emit NO key below the min-iterations noise floor (low-N ratios are
1547            // single-outlier noise, not a distributional signal — gated HERE at
1548            // the producer, NOT via a meaned-iteration accessor on the
1549            // aggregated row), and none when no cgroup carried a measurable tail
1550            // (every per-cgroup ratio 0.0, i.e. no median wake latency anywhere).
1551            // Absence then stays distinct from a measured value and no
1552            // sub-threshold run enters the cross-RUN mean. `wake_latency_tail_ratio`
1553            // returns 0.0 for a cgroup with no wake samples (median <= 0), which
1554            // a max-wins fold over the r > 0.0 reals correctly skips.
1555            MetricKind::WakeLatencyTailRatio => {
1556                if run_total_iterations < crate::stats::WAKE_LATENCY_TAIL_RATIO_MIN_ITERATIONS {
1557                    None
1558                } else {
1559                    let mut worst: Option<f64> = None;
1560                    for cg in cgroups {
1561                        let r = cg.wake_latency_tail_ratio();
1562                        if r > 0.0 {
1563                            worst = Some(worst.map_or(r, |w| w.max(r)));
1564                        }
1565                    }
1566                    worst
1567                }
1568            }
1569            _ => None,
1570        };
1571        // Insert only a real, FINITE value: an absent key (all-None
1572        // WorstLowest cohort, or no cgroups at all) stays distinct from a
1573        // measured 0.0, matching the None-vs-Some(0.0) contract the typed
1574        // Option carried. The is_finite guard is a no-op for every
1575        // registry-valid metric (reduce_sorted_distribution reduces non-empty
1576        // pools with CV guarded to 0.0 on zero mean; WorstLowest reuses
1577        // iterations_per_worker()/iterations_per_cpu_sec() which return None on
1578        // a zero denominator), but it MATTERS for the registry-impossible
1579        // cross-source arm of distribution_cgroup_reduction: that arm returns
1580        // NaN, and when a Distribution has no pool (every carrier stripped) the
1581        // carrier-less fold can carry that NaN to `v`. An inserted NaN would
1582        // fail the ENTIRE serde_json sidecar write (serde_json rejects
1583        // non-finite), losing ALL run telemetry — so the guard degrades a
1584        // misauthored metric to ABSENCE here rather than risking that write
1585        // failure downstream.
1586        if let Some(v) = value.filter(|v| v.is_finite()) {
1587            target.insert(name.to_string(), v);
1588        }
1589    }
1590}
1591
1592/// Reduce a NON-EMPTY ascending-sorted raw-ns sample pool to one
1593/// [`crate::stats::SampleReduction`] value, ns→µs once. Mirrors the
1594/// per-cgroup reductions [`cgroup_stats`] computes (p99 / median via
1595/// [`percentile`], CV with `n = pool.len()`, mean, max) so the run-level
1596/// re-pool reproduces them over the COMBINED cross-cgroup set — to within
1597/// FP tolerance for CV / mean, not bit-exactly: this sums over the
1598/// ASCENDING-sorted pool while `cgroup_stats` sums over the unsorted
1599/// arrival order, so the float results differ by ~1e-15 (the parity test
1600/// `repool_distribution_value_for_value_with_cgroup_stats` uses a 1e-9
1601/// bound). Same "distribution-equivalent, not byte-identical" framing as
1602/// the `wake_latencies_ns` carrier doc.
1603pub(crate) fn reduce_sorted_distribution(
1604    sorted: &[u64],
1605    reduction: crate::stats::SampleReduction,
1606) -> f64 {
1607    use crate::stats::SampleReduction;
1608    match reduction {
1609        SampleReduction::P99 => percentile(sorted, 0.99) as f64 / 1000.0,
1610        SampleReduction::P999 => percentile(sorted, 0.999) as f64 / 1000.0,
1611        SampleReduction::Median => percentile(sorted, 0.5) as f64 / 1000.0,
1612        SampleReduction::Cv => {
1613            let n = sorted.len() as f64;
1614            let mean_ns = sorted.iter().sum::<u64>() as f64 / n;
1615            if mean_ns > 0.0 {
1616                let variance = sorted
1617                    .iter()
1618                    .map(|&v| (v as f64 - mean_ns).powi(2))
1619                    .sum::<f64>()
1620                    / n;
1621                variance.sqrt() / mean_ns
1622            } else {
1623                0.0
1624            }
1625        }
1626        // Divide ONCE on the summed/maxed ns (the carriers store raw ns):
1627        // mean(ns)/1000 == mean(ns/1000) and max(ns)/1000 == max(ns/1000).
1628        // Sum in f64 (not u64-then-cast) to match cgroup_stats's f64 run-delay
1629        // accumulation and PhaseCgroupStats::run_delay_summary — overflow-safe
1630        // (an f64 sum saturates toward +inf; a u64 sum would panic in debug /
1631        // silently wrap in release on a pathological pool), value identical
1632        // within the 1e-9 parity bound. (The Cv arm's mean_ns above keeps the u64 sum
1633        // because cgroup_stats's CV also u64-sums all_latencies — matching it is
1634        // exact-parity-preserving there.)
1635        SampleReduction::Mean => {
1636            sorted.iter().map(|&v| v as f64).sum::<f64>() / sorted.len() as f64 / 1000.0
1637        }
1638        // Sorted ascending, so the last element is the max.
1639        SampleReduction::Worst => *sorted.last().expect("non-empty by caller") as f64 / 1000.0,
1640    }
1641}
1642
/// Nearest-rank percentile of a value-sorted `(value, weight)` pool — the
/// weighted counterpart of [`percentile`].
///
/// The result is the first value whose cumulative weight reaches the
/// 1-indexed rank `ceil(W * p)`, where `W` is the total weight and the rank
/// is floored at 1. With UNIT weights (all `1.0`) this is byte-identical to
/// `percentile`: the cumulative weight after `k` elements is `k`,
/// `ceil(W * p) == ceil(n * p)`, and the floor at 1 plays the role of
/// `percentile`'s `saturating_sub(1)`, so the answer is
/// `sorted[ceil(n * p) - 1]` for `p > 0` and `sorted[0]` at `p == 0`.
///
/// An empty pool returns `0`. If the cumulative weight never reaches the rank
/// (for example a fractional total weight with `p == 1.0`), the last — that
/// is, largest — value is returned.
///
/// The run-level wake re-pool uses this to weight each phase's samples by
/// true population.
pub(crate) fn weighted_percentile(sorted: &[(u64, f64)], p: f64) -> u64 {
    // The largest value doubles as the fallback; no last element means the
    // pool is empty.
    let largest = match sorted.last() {
        Some(&(value, _)) => value,
        None => return 0,
    };
    debug_assert!(
        sorted.windows(2).all(|pair| pair[0].0 <= pair[1].0),
        "weighted_percentile() requires value-sorted input",
    );
    let total_weight: f64 = sorted.iter().map(|&(_, weight)| weight).sum();
    // Nearest-rank target, floored at 1 so that `p == 0.0` selects the first
    // element (mirrors percentile's saturating_sub(1) mapping rank 0 to
    // index 0).
    let rank = (total_weight * p).ceil().max(1.0);
    sorted
        .iter()
        // Pair every value with the cumulative weight up to and including it.
        .scan(0.0_f64, |running, &(value, weight)| {
            *running += weight;
            Some((value, *running))
        })
        .find(|&(_, cumulative)| cumulative >= rank)
        .map_or(largest, |(value, _)| value)
}
1673
1674/// Weighted sibling of [`reduce_sorted_distribution`] for the wake-latency
1675/// re-pool: each `(value, weight)` carries a per-sample weight of
1676/// `wake_sample_total / reservoir_len`, so a >cap phase (reservoir-capped on the
1677/// guest) contributes in proportion to its TRUE population, not its capped
1678/// length — removing the cross-PHASE length-skew. With UNIT weights (every phase
1679/// ≤cap, so `len == wake_sample_total`) it reduces byte-identically to
1680/// [`reduce_sorted_distribution`] for P99 / Median / Mean / Worst; the Cv arm
1681/// differs only by the f64-vs-u64 mean sum. For the small fixed pool the parity
1682/// test uses, that gap is ~1e-15 (within its 1e-9 bound), but it grows ~n·ε with
1683/// pool size — a cross-phase pool can reach millions of samples (~1e-9–1e-8 on a
1684/// high-CV pool), so a LARGE-pool parity test must not assume a universal 1e-15.
1685/// A weighted variance cannot keep the u64 sum.  Exhaustive over SampleReduction,
1686/// mirroring [`reduce_sorted_distribution`], so a new variant fails the build.
1687///
1688/// The Cv / Mean `total_w <= 0.0` guards and [`weighted_percentile`]'s
1689/// all-weight-zero fall-through are degenerate-input belts: the capture-path
1690/// caller [`populate_run_distribution_metrics`] clamps every per-sample weight to
1691/// a floor of 1.0, so `total_w >= len >= 1` there and those branches are
1692/// unreachable on the production path.
1693pub(crate) fn reduce_weighted_sorted_distribution(
1694    sorted: &[(u64, f64)],
1695    reduction: crate::stats::SampleReduction,
1696) -> f64 {
1697    use crate::stats::SampleReduction;
1698    match reduction {
1699        SampleReduction::P99 => weighted_percentile(sorted, 0.99) as f64 / 1000.0,
1700        SampleReduction::P999 => weighted_percentile(sorted, 0.999) as f64 / 1000.0,
1701        SampleReduction::Median => weighted_percentile(sorted, 0.5) as f64 / 1000.0,
1702        SampleReduction::Cv => {
1703            let total_w: f64 = sorted.iter().map(|&(_, w)| w).sum();
1704            if total_w <= 0.0 {
1705                return 0.0;
1706            }
1707            let mean_ns = sorted.iter().map(|&(v, w)| v as f64 * w).sum::<f64>() / total_w;
1708            if mean_ns > 0.0 {
1709                let variance = sorted
1710                    .iter()
1711                    .map(|&(v, w)| w * (v as f64 - mean_ns).powi(2))
1712                    .sum::<f64>()
1713                    / total_w;
1714                variance.sqrt() / mean_ns
1715            } else {
1716                0.0
1717            }
1718        }
1719        SampleReduction::Mean => {
1720            let total_w: f64 = sorted.iter().map(|&(_, w)| w).sum();
1721            if total_w <= 0.0 {
1722                return 0.0;
1723            }
1724            sorted.iter().map(|&(v, w)| v as f64 * w).sum::<f64>() / total_w / 1000.0
1725        }
1726        // Max value present, weight-invariant — last element of the value-sorted pool.
1727        SampleReduction::Worst => sorted.last().map(|&(v, _)| v).unwrap_or(0) as f64 / 1000.0,
1728    }
1729}
1730
1731/// One cgroup's surviving [`CgroupStats`] reduction for a
1732/// [`crate::stats::MetricKind::Distribution`] (source, reduction) pair — the
1733/// value folded worst-wins into the run-level distribution for a cgroup whose
1734/// raw samples are NOT in the pool (a backdrop, or a stripped/empty carrier).
1735/// Worst-wins is `f64::max` (every Distribution metric is `LowerBetter`,
1736/// enforced by `every_metric_has_kind_consistent_with_naming`).
1737///
1738/// Per-source match, EXHAUSTIVE over SampleReduction (no `_` catch-all,
1739/// mirroring reduce_sorted_distribution) so a new SampleSource or
1740/// SampleReduction variant fails the build until a reduction field is wired.
1741/// The cross-source reductions (a wake source asking for a run-delay reduction,
1742/// or vice versa) are registry-impossible (no CgroupStats field exists), so
1743/// they debug_assert in tests and, in release, return `f64::NAN` rather than
1744/// 0.0 — NaN is IGNORED by the caller's `f64::max` worst-wins fold, and if it
1745/// still reaches `populate_run_distribution_metrics`'s insert (a pool-less
1746/// Distribution whose every carrier-less cgroup hits this arm) the is_finite
1747/// insert guard drops it to absence. Either way a registry-authoring mistake
1748/// drops the bogus contribution instead of folding a 0.0 that a LowerBetter
1749/// metric would read as "perfect".
1750fn distribution_cgroup_reduction(
1751    cg: &CgroupStats,
1752    source: crate::stats::SampleSource,
1753    reduction: crate::stats::SampleReduction,
1754) -> f64 {
1755    use crate::stats::{SampleReduction, SampleSource};
1756    match source {
1757        SampleSource::WakeLatencyNs => match reduction {
1758            SampleReduction::P99 => cg.p99_wake_latency_us,
1759            SampleReduction::Median => cg.median_wake_latency_us,
1760            SampleReduction::Cv => cg.wake_latency_cv,
1761            SampleReduction::P999 | SampleReduction::Mean | SampleReduction::Worst => {
1762                debug_assert!(false, "no CgroupStats wake reduction for {reduction:?}");
1763                f64::NAN
1764            }
1765        },
1766        SampleSource::RunDelayNs => match reduction {
1767            SampleReduction::Mean => cg.mean_run_delay_us,
1768            SampleReduction::Worst => cg.worst_run_delay_us,
1769            SampleReduction::P99
1770            | SampleReduction::P999
1771            | SampleReduction::Median
1772            | SampleReduction::Cv => {
1773                debug_assert!(
1774                    false,
1775                    "no CgroupStats run-delay reduction for {reduction:?}"
1776                );
1777                f64::NAN
1778            }
1779        },
1780        SampleSource::TimerLatencyNs => match reduction {
1781            SampleReduction::Median => cg.median_timer_latency_us,
1782            SampleReduction::P99 => cg.p99_timer_latency_us,
1783            SampleReduction::P999 => cg.p999_timer_latency_us,
1784            SampleReduction::Worst => cg.worst_timer_latency_us,
1785            SampleReduction::Cv | SampleReduction::Mean => {
1786                debug_assert!(false, "no CgroupStats timer reduction for {reduction:?}");
1787                f64::NAN
1788            }
1789        },
1790    }
1791}
1792
1793/// Populate cross-RUN aggregate entries for every registered
1794/// `crate::stats::MetricDef` whose `read_sample` returns finite
1795/// values across the entire sample series. Writes into
1796/// `target` (typically `ScenarioStats::ext_metrics`) under the
1797/// metric's registry name — the same key the per-phase
1798/// [`PhaseBucket::metrics`] uses, so cross-RUN and per-phase
1799/// consumers reference the same name.
1800///
1801/// Existing keys are NOT overwritten — a typed GauntletRow field's
1802/// value (populated via the MetricDef accessor at sidecar-write
1803/// time) wins on the read path, and this fn fills the gap for
1804/// registered metrics that have a `read_sample` wire but no typed
1805/// GauntletRow field. Without this fill, `cargo ktstr perf-delta`
1806/// silently skips the metric (read returns None on both sides, so the
1807/// `(None, None)` arm drops the pair).
1808///
1809/// Per-phase reduction dispatch is described on [`PhaseBucket`];
1810/// the cross-RUN fold here uses `crate::stats::aggregate_samples_for_phase`
1811/// over the full sample series, with TYPED_FIELD_NAMES gating to
1812/// avoid duplicating typed-accessor sources.
1813pub fn populate_run_ext_metrics(
1814    samples: &crate::scenario::sample::SampleSeries,
1815    target: &mut std::collections::BTreeMap<String, f64>,
1816) {
1817    // Typed-backed keys are skipped via the module-level TYPED_FIELD_NAMES
1818    // (shared with populate_run_ext_metrics_from_phases) so only
1819    // ext-metrics-only registry entries are populated here.
1820    for metric_def in crate::stats::METRICS {
1821        if target.contains_key(metric_def.name) {
1822            continue;
1823        }
1824        if TYPED_FIELD_NAMES.contains(&metric_def.name) {
1825            continue;
1826        }
1827        let readings: Vec<f64> = samples
1828            .iter_samples()
1829            .filter_map(|s| metric_def.read_sample(&s))
1830            .collect();
1831        if readings.is_empty() {
1832            continue;
1833        }
1834        if let Some(reduced) = crate::stats::aggregate_samples_for_phase(metric_def, &readings) {
1835            target.insert(metric_def.name.to_string(), reduced);
1836        }
1837    }
1838    // Run-level capture-window wall for the IRQ rates: the elapsed span of the
1839    // per_cpu_time-bearing freezes — the SAME freezes the IRQ-counter numerators
1840    // above were read_sampled from (whole-run last-minus-first via
1841    // phase_counter_delta). Inserting it HERE (the direct/whole-run path) makes
1842    // the run-level rate's numerator AND denominator share the whole-run span.
1843    // Without it, derive_rate_metrics below finds the numerator but no
1844    // denominator (total_phase_wall_sec has no read_sample arm), and the rate
1845    // would instead be derived by populate_run_ext_metrics_from_phases over a
1846    // Σ-per-phase-capture denominator — a NARROWER time base than this whole-run
1847    // numerator (it excludes the inter-phase / cross-capture gaps the numerator
1848    // counts), inflating the rate on MULTI-phase runs. Run-level analog of the
1849    // per-phase assert::phase_build. Gated on total_hardirqs
1850    // (irqtime-independent), so the count-rates derive even when
1851    // CONFIG_IRQ_TIME_ACCOUNTING is off and total_irq_time_ns is absent. Min/max
1852    // over the elapsed-bearing freezes: chronological periodic samples put the
1853    // numerator's first/last reading at min/max elapsed, so num and den span the
1854    // identical interval.
1855    if target.contains_key("total_hardirqs") {
1856        let irq_elapsed_ms: Vec<u64> = samples
1857            .iter_samples()
1858            .filter(|s| !s.snapshot.per_cpu_time().is_empty())
1859            .filter_map(|s| s.elapsed_ms)
1860            .collect();
1861        if let (Some(first), Some(last)) = (
1862            irq_elapsed_ms.iter().min().copied(),
1863            irq_elapsed_ms.iter().max().copied(),
1864        ) && last > first
1865        {
1866            let wall_ms = (last - first) as f64;
1867            target
1868                .entry("total_phase_wall_ns".to_string())
1869                .or_insert(wall_ms * 1_000_000.0);
1870            target
1871                .entry("total_phase_wall_sec".to_string())
1872                .or_insert(wall_ms / 1000.0);
1873        }
1874    }
1875    // Re-derive Rate metrics from the read_sample components just folded
1876    // in. populate_run_ext_metrics is pub and called standalone (tests,
1877    // and not only ahead of populate_run_ext_metrics_from_phases), so it
1878    // derives its own rates to stay self-contained.
1879    crate::stats::derive_rate_metrics(target);
1880}
1881
1882/// Derive the per-phase scalar metrics ([`crate::stats::MetricKind::PerPhase`])
1883/// for every cgroup carrier in each phase. Two families:
1884///
1885/// 1. NON-schbench carrier scalars (via [`write_carrier_scalars`], for EVERY
1886///    cgroup regardless of work type): the wake/run-delay/off-cpu distributions
1887///    + the migration/iterations/locality ratios + the carrier counters,
1888///    written ONLY into each carrier's `PhaseCgroupStats::metrics` (per-cgroup,
1889///    read via `phase_cgroup_metric`) — NO pooled [`PhaseBucket::metrics`]
1890///    entry; their run-level aggregate is the `worst_*` ext-metrics key.
1891/// 2. schbench scalars (via [`write_schbench_scalars`], only for cgroups
1892///    carrying a `SchbenchPhaseStats`): written per-cgroup into
1893///    `PhaseCgroupStats::metrics` AND, pooled across the phase's schbench
1894///    carriers, into [`PhaseBucket::metrics`] (read via `phase_metric`).
1895///    Percentiles MUST come from the POOLED histogram (combine the latency
1896///    histograms + integer-add the run-delay raw pairs), never an average of
1897///    per-cgroup percentiles; the run-delay means are ABSENT when `pcount == 0`
1898///    so a never-scheduled class is not a false `0`.
1899///
1900/// MUST run POST-merge: the buckets are final and keyed by `step_index`, so a
1901/// per-phase A/B claim reads the value via `phase_metric` /
1902/// `phase_cgroup_metric`. It runs after the per-cgroup carriers are folded
1903/// ([`fold_guest_per_cgroup_into_host_buckets`]) and after any
1904/// [`merge_matched_phase_buckets`] — both SKIP is_derived keys (the `continue`
1905/// in the merge loop), so deriving earlier would DROP the keys. Idempotent
1906/// (overwrites the keys), so callers that rebuild buckets per call (e.g.
1907/// `VmResult::phase_buckets`) may re-run it freely.
1908// doc_lazy_continuation: pre-existing numbered-list wording surfaced by the clippy
1909// 1.94 bump; renders fine. Suppress rather than reflow the prose.
1910#[allow(clippy::doc_lazy_continuation)]
1911pub(crate) fn derive_phase_metrics(phases: &mut [PhaseBucket]) {
1912    use crate::workload::schbench::run::SchbenchPhaseStats;
1913
1914    for bucket in phases.iter_mut() {
1915        // Derive the schbench scalars BOTH per cgroup (into pc.metrics — the
1916        // per-cgroup set: N cgroups -> N queryable sets) AND pooled across the
1917        // phase's cgroups (into bucket.metrics — the aggregate). Both go through the
1918        // SAME reducer (write_schbench_scalars) from the SAME carriers, so the
1919        // pooled set is the cross-cgroup re-pool of the per-cgroup carriers (a
1920        // percentile re-derives from the pooled histogram via PlatStats::combine =
1921        // bucket-count add, never averaged), and pooled == cross-cgroup re-pool.
1922        let mut pooled: Option<SchbenchPhaseStats> = None;
1923        let mut pooled_taobench: Option<crate::workload::taobench::run::TaobenchPhaseStats> = None;
1924        for pc in bucket.per_cgroup.values_mut() {
1925            // Non-schbench carrier-derived metrics (wake/run-delay/off-cpu
1926            // distributions + the migration/iterations/locality ratios + the
1927            // carrier counters), emitted for EVERY cgroup regardless of work
1928            // type. Runs BEFORE the schbench block: `write_carrier_scalars`
1929            // takes `&mut pc` (its summaries read many `pc` fields, then it
1930            // writes `pc.metrics`), so it cannot share the iteration with the
1931            // disjoint-field borrow the schbench block relies on.
1932            write_carrier_scalars(pc);
1933            // Disjoint field borrows: `s` reads pc.schbench, the reducer writes
1934            // pc.metrics — different fields, so the immutable + mutable borrows of
1935            // `*pc` coexist.
1936            if let Some(s) = pc.schbench.as_ref() {
1937                write_schbench_scalars(s, &mut pc.metrics);
1938                // `take()` completes the borrow before the reassignment in both arms.
1939                match pooled.take() {
1940                    Some(mut acc) => {
1941                        acc.merge(s);
1942                        pooled = Some(acc);
1943                    }
1944                    None => pooled = Some(s.clone()),
1945                }
1946            }
1947            // taobench per-phase: same disjoint-field-borrow pattern (`t` reads
1948            // pc.taobench, the reducer writes pc.metrics) + pool across cgroups.
1949            if let Some(t) = pc.taobench.as_ref() {
1950                write_taobench_scalars(t, &mut pc.metrics);
1951                match pooled_taobench.take() {
1952                    Some(mut acc) => {
1953                        acc.merge(t);
1954                        pooled_taobench = Some(acc);
1955                    }
1956                    None => pooled_taobench = Some(t.clone()),
1957                }
1958            }
1959        }
1960        if let Some(p) = pooled {
1961            write_schbench_scalars(&p, &mut bucket.metrics);
1962        }
1963        if let Some(t) = pooled_taobench {
1964            write_taobench_scalars(&t, &mut bucket.metrics);
1965        }
1966    }
1967}
1968
1969/// Write the per-phase per-cgroup NON-schbench DERIVED scalars from ONE
1970/// carrier into `pc.metrics`, keyed by registry [`crate::stats::MetricDef`]
1971/// name — the per-cgroup analog of the run-level reductions, single-sourced
1972/// through the shared ratio helpers in [`crate::assert::reductions`] (so the
1973/// per-cgroup value and the run-level `worst_*` fold agree by construction)
1974/// and the carrier summary methods. Each gate mirrors the carrier's ABSENT
1975/// discipline: an empty wake pool, a not-measured off-CPU state, a
1976/// zero-worker cgroup, or a no-NUMA-pages cgroup emits no key (reads as
1977/// missing, never a false 0); `migration_ratio` is ALWAYS present (a measured
1978/// 0.0 when no iterations ran).
1979///
1980/// Unlike the schbench families, these families have NO pooled
1981/// `PhaseBucket::metrics` entry at all: as PerPhase / is_derived metrics whose
1982/// `read_sample` is `None`, the per-phase fold never writes them to
1983/// `bucket.metrics`. Their run-level aggregate is the `worst_*` key
1984/// `populate_run_distribution_metrics` writes into the run-level
1985/// `ScenarioStats::ext_metrics` map; per-phase they are queryable ONLY per-cgroup
1986/// via `pc.metrics`. So this writes ONLY `pc.metrics`, never `bucket.metrics` —
1987/// the per-cgroup carrier value is authoritative for the per-cgroup question.
1988///
1989/// Takes `&mut PhaseCgroupStats` (not `(&pc, &mut pc.metrics)`): it reads many
1990/// `pc` fields AND writes `pc.metrics`, so the summaries/ratios are bound to
1991/// locals from `&*pc` first, then the `&mut pc.metrics` borrow is taken once.
1992fn write_carrier_scalars(pc: &mut PhaseCgroupStats) {
1993    use crate::assert::reductions::{
1994        cross_node_migration_ratio_of, iterations_per_cpu_sec_of, iterations_per_worker_of,
1995        migration_ratio_of, page_locality_of,
1996    };
1997    // Read everything from &*pc into locals BEFORE the &mut pc.metrics borrow.
1998    let wake = pc.wake_summary().zip(pc.wake_cv());
1999    let timer = pc.timer_summary();
2000    let run_delay = pc.run_delay_summary();
2001    let off_cpu = pc.off_cpu_summary();
2002    let migration_ratio = migration_ratio_of(pc.total_migrations, pc.total_iterations);
2003    let ipw = iterations_per_worker_of(pc.num_workers, pc.total_iterations);
2004    let ipcs = iterations_per_cpu_sec_of(pc.num_workers, pc.total_cpu_time_ns, pc.total_iterations);
2005    let numa = if pc.numa_pages_total > 0 {
2006        Some((
2007            page_locality_of(pc.numa_pages_local, pc.numa_pages_total),
2008            cross_node_migration_ratio_of(pc.cross_node_migrated, pc.numa_pages_total),
2009        ))
2010    } else {
2011        None
2012    };
2013
2014    let m = &mut pc.metrics;
2015    // WAKE — all three or none (wake_summary + wake_cv share the empty-pool gate).
2016    if let Some(((p99, median), cv)) = wake {
2017        m.insert("p99_wake_latency_us".to_string(), p99);
2018        m.insert("median_wake_latency_us".to_string(), median);
2019        m.insert("wake_latency_cv".to_string(), cv);
2020    }
2021    // TIMER — all three or none (timer_summary shares the empty-pool gate).
2022    if let Some((median, p99, p999)) = timer {
2023        m.insert("median_timer_latency_us".to_string(), median);
2024        m.insert("p99_timer_latency_us".to_string(), p99);
2025        m.insert("p999_timer_latency_us".to_string(), p999);
2026    }
2027    // RUN-DELAY — both or none.
2028    if let Some((mean, worst)) = run_delay {
2029        m.insert("mean_run_delay_us".to_string(), mean);
2030        m.insert("max_run_delay_us".to_string(), worst);
2031    }
2032    // OFF-CPU — four or none (None == not-measured).
2033    if let Some((avg, min, max, spread)) = off_cpu {
2034        m.insert("avg_off_cpu_pct".to_string(), avg);
2035        m.insert("min_off_cpu_pct".to_string(), min);
2036        m.insert("max_off_cpu_pct".to_string(), max);
2037        m.insert("off_cpu_spread_pct".to_string(), spread);
2038    }
2039    // RATIOS — migration_ratio ALWAYS (measured 0.0 when no iterations ran).
2040    m.insert("migration_ratio".to_string(), migration_ratio);
2041    if let Some(v) = ipw {
2042        m.insert("iterations_per_worker".to_string(), v);
2043    }
2044    if let Some(v) = ipcs {
2045        m.insert("iterations_per_cpu_sec".to_string(), v);
2046    }
2047    // NUMA ratios only when pages were observed (mirrors the carrier's
2048    // numa_pages_total>0 gate; absent is more honest than a 0.0 default).
2049    if let Some((locality, cross_node)) = numa {
2050        m.insert("page_locality".to_string(), locality);
2051        m.insert("cross_node_migration_ratio".to_string(), cross_node);
2052    }
2053}
2054
2055/// Write the per-phase schbench scalar metrics derived from ONE
2056/// `SchbenchPhaseStats` (a single cgroup's carrier, or the cross-cgroup pool)
2057/// into `out`, keyed by registry [`crate::stats::MetricDef`] name. The sole
2058/// producer of these keys for both the per-cgroup ([`PhaseCgroupStats::metrics`])
2059/// and pooled ([`PhaseBucket::metrics`]) maps — one derivation, no duplicated
2060/// percentile / min/max / rps / sched-delay math. Each gate mirrors the carrier's
2061/// ABSENT discipline: an empty histogram or a zero pcount emits no key (reads as
2062/// missing, never a false 0); loop_count is always present (0 = a real measured
2063/// no-cycles outcome).
2064fn write_schbench_scalars(
2065    p: &crate::workload::schbench::run::SchbenchPhaseStats,
2066    out: &mut std::collections::BTreeMap<String, f64>,
2067) {
2068    use crate::stats::{
2069        SCHBENCH_LOOP_COUNT, SCHBENCH_REQUEST_MAX_US, SCHBENCH_REQUEST_MIN_US,
2070        SCHBENCH_REQUEST_P50_US, SCHBENCH_REQUEST_P90_US, SCHBENCH_REQUEST_P99_US,
2071        SCHBENCH_REQUEST_P999_US, SCHBENCH_RPS_MAX, SCHBENCH_RPS_MIN, SCHBENCH_RPS_P20,
2072        SCHBENCH_RPS_P50, SCHBENCH_RPS_P90, SCHBENCH_SCHED_DELAY_MSG_US,
2073        SCHBENCH_SCHED_DELAY_WORKER_US, SCHBENCH_WAKEUP_MAX_US, SCHBENCH_WAKEUP_MIN_US,
2074        SCHBENCH_WAKEUP_P50_US, SCHBENCH_WAKEUP_P90_US, SCHBENCH_WAKEUP_P99_US,
2075        SCHBENCH_WAKEUP_P999_US,
2076    };
2077    use crate::workload::schbench::plat::Pct;
2078
2079    if p.wakeup.sample_count() > 0 {
2080        let q = p.wakeup.percentiles();
2081        out.insert(
2082            SCHBENCH_WAKEUP_P50_US.to_string(),
2083            q.value_at(Pct::P50) as f64,
2084        );
2085        out.insert(
2086            SCHBENCH_WAKEUP_P90_US.to_string(),
2087            q.value_at(Pct::P90) as f64,
2088        );
2089        out.insert(
2090            SCHBENCH_WAKEUP_P99_US.to_string(),
2091            q.value_at(Pct::P99) as f64,
2092        );
2093        out.insert(
2094            SCHBENCH_WAKEUP_P999_US.to_string(),
2095            q.value_at(Pct::P999) as f64,
2096        );
2097        out.insert(SCHBENCH_WAKEUP_MIN_US.to_string(), q.min as f64);
2098        out.insert(SCHBENCH_WAKEUP_MAX_US.to_string(), q.max as f64);
2099    }
2100    if p.request.sample_count() > 0 {
2101        let q = p.request.percentiles();
2102        out.insert(
2103            SCHBENCH_REQUEST_P50_US.to_string(),
2104            q.value_at(Pct::P50) as f64,
2105        );
2106        out.insert(
2107            SCHBENCH_REQUEST_P90_US.to_string(),
2108            q.value_at(Pct::P90) as f64,
2109        );
2110        out.insert(
2111            SCHBENCH_REQUEST_P99_US.to_string(),
2112            q.value_at(Pct::P99) as f64,
2113        );
2114        out.insert(
2115            SCHBENCH_REQUEST_P999_US.to_string(),
2116            q.value_at(Pct::P999) as f64,
2117        );
2118        out.insert(SCHBENCH_REQUEST_MIN_US.to_string(), q.min as f64);
2119        out.insert(SCHBENCH_REQUEST_MAX_US.to_string(), q.max as f64);
2120    }
2121    // Per-phase achieved-RPS distribution (the control thread's per-second samples
2122    // attributed to this epoch). Gated on sample_count()>0 so a phase shorter than
2123    // the ~1s control cadence reads ABSENT, never rps=0. schbench's RPS table is
2124    // PLIST_FOR_RPS = 20/50/90 (schbench.c:130) + min/max.
2125    if p.rps.sample_count() > 0 {
2126        let r = p.rps.percentiles();
2127        out.insert(SCHBENCH_RPS_P20.to_string(), r.value_at(Pct::P20) as f64);
2128        out.insert(SCHBENCH_RPS_P50.to_string(), r.value_at(Pct::P50) as f64);
2129        out.insert(SCHBENCH_RPS_P90.to_string(), r.value_at(Pct::P90) as f64);
2130        out.insert(SCHBENCH_RPS_MIN.to_string(), r.min as f64);
2131        out.insert(SCHBENCH_RPS_MAX.to_string(), r.max as f64);
2132    }
2133    // Sample-weighted run-delay mean (Σrun_delay / Σpcount), ns→µs; ABSENT when
2134    // pcount==0 (a never-scheduled class) so it reads as missing, not 0.
2135    if p.msg_pcount > 0 {
2136        let mean_us = p.msg_run_delay_ns as f64 / p.msg_pcount as f64 / 1000.0;
2137        out.insert(SCHBENCH_SCHED_DELAY_MSG_US.to_string(), mean_us);
2138    }
2139    if p.worker_pcount > 0 {
2140        let mean_us = p.worker_run_delay_ns as f64 / p.worker_pcount as f64 / 1000.0;
2141        out.insert(SCHBENCH_SCHED_DELAY_WORKER_US.to_string(), mean_us);
2142    }
2143    // loop_count is always present for a schbench carrier: 0 = no cycles ran (a real
2144    // measured value; HigherBetter → worst), distinct from a non-schbench carrier
2145    // which has no schbench data at all (the caller skips it).
2146    out.insert(SCHBENCH_LOOP_COUNT.to_string(), p.loop_count as f64);
2147}
2148
2149/// Write the per-phase taobench scalar metrics derived from ONE
2150/// `TaobenchStats` (a single cgroup's carrier, or the cross-cgroup pool)
2151/// into `out`, keyed by registry [`crate::stats::MetricDef`] name. The sole
2152/// producer of these keys for both the per-cgroup ([`PhaseCgroupStats::metrics`])
2153/// and pooled ([`PhaseBucket::metrics`]) maps. ABSENT discipline: the qps keys
2154/// only when the wall window was measured (`elapsed_ns > 0`); hit_ratio only when
2155/// ops completed (`total > 0`); hit_rate only when lookups were issued
2156/// (`get_cmds > 0`) — a not-measured value reads as missing, never a false 0.
2157fn write_taobench_scalars(
2158    p: &crate::workload::taobench::run::TaobenchPhaseStats,
2159    out: &mut std::collections::BTreeMap<String, f64>,
2160) {
2161    use crate::stats::{
2162        TAOBENCH_FAST_QPS, TAOBENCH_HIT_RATE, TAOBENCH_HIT_RATIO, TAOBENCH_SLOW_QPS,
2163        TAOBENCH_TOTAL_QPS,
2164    };
2165    let c = &p.counters;
2166    let total = c.total_ops();
2167    if c.elapsed_ns > 0 {
2168        let secs = c.elapsed_ns as f64 / 1e9;
2169        out.insert(TAOBENCH_TOTAL_QPS.to_string(), total as f64 / secs);
2170        out.insert(TAOBENCH_FAST_QPS.to_string(), c.fast_ops as f64 / secs);
2171        out.insert(TAOBENCH_SLOW_QPS.to_string(), c.slow_ops as f64 / secs);
2172    }
2173    if total > 0 {
2174        out.insert(
2175            TAOBENCH_HIT_RATIO.to_string(),
2176            c.fast_ops as f64 / total as f64,
2177        );
2178    }
2179    if c.get_cmds > 0 {
2180        out.insert(
2181            TAOBENCH_HIT_RATE.to_string(),
2182            1.0 - (c.get_misses as f64 / c.get_cmds as f64),
2183        );
2184    }
2185    // Per-phase serve-latency percentiles (open-loop only): the µs distribution
2186    // pooled across this phase's cgroups, re-derived over the union histogram.
2187    // Absent when no serve samples (closed loop, or a stream with no completions).
2188    write_taobench_serve_scalars(&p.serve_lat, out);
2189}
2190
2191/// Write the per-phase taobench serve-latency percentile scalars from a pooled
2192/// `PlatStats` into `out` (registry keys), gated on `sample_count() > 0` so a
2193/// closed-loop / no-sample carrier emits nothing (absent, never a false 0).
2194fn write_taobench_serve_scalars(
2195    serve: &crate::workload::schbench::plat::PlatStats,
2196    out: &mut std::collections::BTreeMap<String, f64>,
2197) {
2198    use crate::stats::{
2199        TAOBENCH_SERVE_MAX_US, TAOBENCH_SERVE_MIN_US, TAOBENCH_SERVE_P50_US, TAOBENCH_SERVE_P90_US,
2200        TAOBENCH_SERVE_P99_US, TAOBENCH_SERVE_P999_US,
2201    };
2202    use crate::workload::schbench::plat::Pct;
2203    if serve.sample_count() == 0 {
2204        return;
2205    }
2206    let q = serve.percentiles();
2207    out.insert(
2208        TAOBENCH_SERVE_P50_US.to_string(),
2209        q.value_at(Pct::P50) as f64,
2210    );
2211    out.insert(
2212        TAOBENCH_SERVE_P90_US.to_string(),
2213        q.value_at(Pct::P90) as f64,
2214    );
2215    out.insert(
2216        TAOBENCH_SERVE_P99_US.to_string(),
2217        q.value_at(Pct::P99) as f64,
2218    );
2219    out.insert(
2220        TAOBENCH_SERVE_P999_US.to_string(),
2221        q.value_at(Pct::P999) as f64,
2222    );
2223    out.insert(TAOBENCH_SERVE_MIN_US.to_string(), q.min as f64);
2224    out.insert(TAOBENCH_SERVE_MAX_US.to_string(), q.max as f64);
2225}