ktstr/assert/run_metrics.rs
1use super::*;
2
/// Aggregated statistics across all cgroups in a scenario.
///
/// The scalar roll-ups below (`worst_spread`, `worst_migration_ratio`,
/// `worst_gap_ms`, `total_migrations`, `total_iterations`) are plain values:
/// a carrier that never measured the quantity is coerced to 0, which is
/// indistinguishable from a measured zero. [`ScenarioStats::run_metric`]
/// re-derives each of them None-aware from the carriers. Prefer it when the
/// measured-vs-unmeasured distinction matters.
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct ScenarioStats {
    /// Per-cgroup stats, one entry per cgroup.
    pub cgroups: Vec<CgroupStats>,
    /// Sum of workers across all cgroups.
    pub total_workers: usize,
    /// Sum of per-cgroup distinct CPU counts. Not deduplicated across
    /// cgroups, so a CPU shared by two cgroups counts twice.
    pub total_cpus: usize,
    /// Sum of migration counts across all cgroups (0 when there are no
    /// cgroups; see the struct docs on sentinels).
    pub total_migrations: u64,
    /// Worst (highest) spread across any cgroup. 0.0 doubles as the
    /// "not measured" sentinel.
    pub worst_spread: f64,
    /// Worst (highest) gap across any cgroup, in ms. Paired with
    /// `worst_gap_cpu`: both come from the same cgroup.
    pub worst_gap_ms: u64,
    /// CPU where the worst gap occurred across all cgroups. Paired
    /// with `worst_gap_ms`: both come from the same cgroup.
    pub worst_gap_cpu: usize,
    /// Worst (highest) migration ratio across any cgroup. 0.0 doubles as
    /// the "not measured" sentinel.
    pub worst_migration_ratio: f64,
    /// Sum of iteration counts across all cgroups.
    pub total_iterations: u64,
    // Removed typed fields. These metrics now live in `ext_metrics` and/or
    // are re-derived by `ScenarioStats::run_metric`.
    //
    // worst_page_locality is NO LONGER a typed field. It is
    // `crate::stats::MetricKind::WorstLowest` (NumaLocal/NumaTotal), re-pooled
    // None-aware post-merge by `populate_run_distribution_metrics` from the
    // per-phase NUMA carriers. The value is the lowest per-cgroup
    // page_locality, and a measured 0.0 wins. The `worst_page_locality`
    // method is shared with `run_metric`. The deleted typed field folded via
    // `fold_lowest_nonzero`, which SKIPPED a measured 0.0 and so reported a
    // better-than-worst cross-run value. The ext re-pool fixes both the field
    // and the read surfaces at once.
    //
    // worst_cross_node_migration_ratio is NO LONGER a typed field. It is
    // `crate::stats::MetricKind::WorstCrossNodeRatio`, re-pooled post-merge by
    // `populate_run_distribution_metrics` from the per-phase NUMA carriers.
    // The value is the MAX per-cgroup cross-node churn ratio over the latest
    // residency total, computed by the `worst_cross_node_migration_ratio`
    // method shared with `run_metric`.
    // The deleted typed `Gauge(Last)` field / GauntletRow column had two
    // problems:
    //  - Within a run it was merge-max-folded, but across runs it averaged
    //    each run's value over `passes_observed`. That folded a NUMA-less
    //    passing run's 0.0 sentinel into the mean.
    //  - It diverged from `run_metric`, which already re-derived from the
    //    per-phase carriers. The sidecar and the in-test read therefore gave
    //    different values on multi-phase runs.
    // The ext re-pool writes no key for a never-measured run, so the MEAN
    // divides only by runs that measured NUMA.
    // This is a CHURN ratio (cumulative migration EVENTS / final-snapshot
    // resident pages). It can exceed 1.0 and is NOT a bounded `[0,1]`
    // fraction.
    //
    // worst_wake_latency_tail_ratio is NO LONGER a typed field. It is
    // `crate::stats::MetricKind::WakeLatencyTailRatio`, re-selected into
    // `ext_metrics` post-merge by `populate_run_distribution_metrics`. The
    // value is the max over the per-cgroup `CgroupStats::wake_latency_tail_ratio`
    // values, floor-gated below WAKE_LATENCY_TAIL_RATIO_MIN_ITERATIONS.
    // `MetricDef::read` surfaces it via the ext fallback.
    /// Extensible metrics for the generic comparison pipeline, keyed by
    /// registry metric name. Populated from per-cgroup ext_metrics (worst
    /// value across cgroups). In the `#[ktstr_test]` eval flow it is also
    /// filled post-merge (e.g. by `populate_run_distribution_metrics`) with
    /// the run-level roll-ups that have no typed field. Prefer
    /// [`ScenarioStats::run_metric`] over reading this map by raw key.
    pub ext_metrics: BTreeMap<String, f64>,
    /// Per-phase metric buckets in step-index order.
    ///
    /// A scenario with N Steps that reaches every Step populates `N + 1`
    /// entries:
    /// - phase 0 is the BASELINE settle window before Step 0 fires;
    /// - phases 1..=N align with Step 0..Step N-1 in scenario order.
    ///
    /// Steps are 1-indexed here so the BASELINE encoding does not collide
    /// with the first Step's index. A scenario that stops early has fewer
    /// entries.
    ///
    /// Empty when the scenario produced no periodic captures
    /// (Default::default() yields `vec![]`). The flat-bucket scalars on this
    /// struct are independent of the per-phase view. They remain the
    /// "all phases merged" reading, and introducing `phases` did not change
    /// their semantics.
    ///
    /// **Auto-populated by the framework**: scenarios that fire periodic
    /// captures (via
    /// [`crate::test_support::KtstrTestEntry::num_snapshots`] or
    /// [`crate::scenario::ops::Op::CaptureSnapshot`]) have this field
    /// populated automatically inside `crate::test_support::eval`'s
    /// `evaluate_vm_result`. Test code never needs to call
    /// [`crate::assert::build_phase_buckets`] manually.
    ///
    /// The auto-populate path drains the snapshot bridge from the
    /// [`crate::vmm::VmResult`] returned by the framework. It then folds the
    /// per-sample readings through `crate::stats::aggregate_samples_for_phase`
    /// per metric.
    ///
    /// Single-phase scenarios that fire no captures leave this `vec![]`; the
    /// flat-bucket scalars on this struct cover that case.
    ///
    /// See [`PhaseBucket`] for the per-phase shape.
    #[serde(default)]
    pub phases: Vec<PhaseBucket>,
}
91
92impl ScenarioStats {
93 /// Look up the phase bucket for a [`Phase`] — `Phase::BASELINE` for the
94 /// pre-first-Step settle window, `Phase::step(k)` for the test author's
95 /// 0-indexed Step k. The typed `Phase` keeps the 1-indexed encoding (Step 0
96 /// lives at the underlying `step_index = 1`) invisible at the call site.
97 ///
98 /// Returns `None` when no bucket with that phase exists (single-phase
99 /// scenario, the scenario didn't reach that Step, or a phase past the last).
100 pub fn phase(&self, phase: Phase) -> Option<&PhaseBucket> {
101 self.phases.iter().find(|p| p.step_index == phase.as_u16())
102 }
103
104 /// Shortcut: look up a single metric value in a specific
105 /// phase by phase-index. Returns `None` when:
106 /// (a) the phase is absent (no bucket with `step_index` in
107 /// [`Self::phases`]),
108 /// (b) the phase exists but had no finite samples for that
109 /// metric, OR
110 /// (c) `metric` is a dynamic [`crate::stats::MetricId`] key (a
111 /// scheduler-runtime / payload string) not present in this phase's
112 /// stores. A built-in [`crate::stats::BuiltinMetric`] cannot be a
113 /// typo — that is a compile error — so this case needs a dynamic key.
114 ///
115 /// Two stores are checked: [`PhaseBucket::metrics`] (via
116 /// [`PhaseBucket::get`]) and, failing that, the cross-cgroup phase
117 /// SUM of a per-cgroup Counter ([`PhaseBucket::cgroup_counter_total`])
118 /// for `"total_migrations"` / `"total_iterations"` / `"total_cpu_time_ns"` —
119 /// registered `Counter`s with no per-sample source, so they live ONLY in the
120 /// per-cgroup carriers; without this fallback the pooled lookup
121 /// returned a silent `None` while the per-cgroup
122 /// [`Self::phase_cgroup_metric`] surfaced the value. Symmetric with
123 /// [`crate::vmm::VmResult::phase_metric`].
124 ///
125 /// Sentinel-free: `Some(0.0)` means the reducer produced a
126 /// real zero from finite samples, NOT "missing data". See
127 /// [`PhaseBucket::metrics`] for the registry source. When
128 /// debugging an unexpected `None` on a dynamic key, check
129 /// [`crate::stats::MetricId::def`]`().is_some()` to tell an unregistered
130 /// key from absent data (built-in ids always resolve).
131 ///
132 /// Pass `Phase::BASELINE` for the settle window or `Phase::step(k)` for the
133 /// test author's 0-indexed Step k — the typed `Phase` hides the 1-indexed
134 /// encoding (see [`Self::phase`]).
135 pub fn phase_metric(
136 &self,
137 phase: Phase,
138 metric: impl Into<crate::stats::MetricId>,
139 ) -> Option<f64> {
140 let metric = metric.into();
141 self.phase(phase).and_then(|p| {
142 p.get(metric.as_str())
143 .or_else(|| p.cgroup_counter_total(metric.as_str()))
144 })
145 }
146
147 /// Cross-cgroup balance: the ratio of the busiest cell's per-worker
148 /// throughput to the quietest's — `max / min` over each cgroup's
149 /// [`CgroupStats::iterations_per_worker`]. The bread-and-butter
150 /// scheduler-fairness assertion (every balance test hand-rolls this
151 /// `max/min` over `self.cgroups` today).
152 ///
153 /// No-worker cgroups (`iterations_per_worker() == None`) are SKIPPED: a
154 /// 0-worker cell is a config condition, not a balance signal. Returns
155 /// `None` when fewer than two cgroups have workers (a ratio needs two);
156 /// check the cgroup count separately if every declared cell must have
157 /// workers. A cell that ran workers but completed zero iterations
158 /// (measured `Some(0.0)`) drives the ratio to `f64::INFINITY` so
159 /// starvation SURFACES rather than vanishing — matching the
160 /// `None`-vs-`Some(0.0)` discipline of
161 /// [`CgroupStats::iterations_per_worker`]. For an explicit starvation
162 /// gate, check `min > 0` over the same cgroups separately.
163 ///
164 /// Whole-run aggregate: this reads `self.cgroups`, which sums over all
165 /// phases. For a single phase's per-cgroup balance in a multi-phase
166 /// scenario, read each cgroup's per-phase throughput (`total_iterations`
167 /// / `num_workers`) from [`crate::vmm::VmResult::phase_cgroup`] — the
168 /// per-phase per-cgroup carriers now landed in
169 /// [`PhaseBucket::per_cgroup`].
170 pub fn cgroup_balance_ratio(&self) -> Option<f64> {
171 let mut min = f64::INFINITY;
172 let mut max = 0.0_f64;
173 let mut n = 0usize;
174 for cg in &self.cgroups {
175 if let Some(rate) = cg.iterations_per_worker() {
176 min = min.min(rate);
177 max = max.max(rate);
178 n += 1;
179 }
180 }
181 if n < 2 {
182 return None;
183 }
184 if min == 0.0 {
185 // A with-worker cell did zero work: starvation. Surface it as an
186 // infinite ratio rather than a NaN (0/0) or a hidden None.
187 return Some(f64::INFINITY);
188 }
189 Some(max / min)
190 }
191
192 /// Per-cgroup analog of [`Self::phase_metric`]: look up `metric` for a named
193 /// `cgroup` in a [`Phase`] (`Phase::BASELINE` / `Phase::step(k)`), via that
194 /// cgroup's per-phase carrier ([`PhaseCgroupStats::get`]), falling back to
195 /// [`PhaseCgroupStats::cgroup_counter`] for the per-cgroup Counters
196 /// `total_migrations`/`total_iterations`/`total_cpu_time_ns`. `None` when the
197 /// phase has no bucket, no carrier for `cgroup`, the carrier carried no finite
198 /// value for the metric, OR the metric is an unregistered dynamic key (a
199 /// built-in id is typo-proof; an unregistered [`crate::stats::MetricId`] has no
200 /// [`crate::stats::MetricId::def`], same as [`Self::phase_metric`]). The
201 /// N-cgroups-to-N-queryable-sets surface on the AssertResult-holding path (the
202 /// in-VM `post_vm` path uses [`crate::vmm::VmResult::phase_cgroup_metric`]).
203 pub fn phase_cgroup_metric(
204 &self,
205 phase: Phase,
206 cgroup: &str,
207 metric: impl Into<crate::stats::MetricId>,
208 ) -> Option<f64> {
209 let metric = metric.into();
210 self.phase(phase)
211 .and_then(|p| p.per_cgroup.get(cgroup))
212 .and_then(|pc| {
213 pc.get(metric.as_str())
214 .or_else(|| pc.cgroup_counter(metric.as_str()))
215 })
216 }
217
218 /// True iff the scenario produced at least one Step-phase
219 /// bucket (any phase with `step_index >= 1`). False when
220 /// `phases` is empty OR contains only `BASELINE` (the
221 /// pre-first-Step settle window).
222 ///
223 /// Use this to fail a phase-aware assertion BEFORE calling
224 /// [`Self::phase`] with a `Phase::step(k)` on a scenario that
225 /// silently never advanced past BASELINE: a test that declared
226 /// no `Step`s, OR a scenario that bailed in setup before any
227 /// `Step` ran, would otherwise see [`Self::phase`] return
228 /// `None` for every Step and the test would either panic on
229 /// `.expect(...)` or pass vacuously.
230 ///
231 /// ```ignore
232 /// anyhow::ensure!(
233 /// r.stats.has_steps(),
234 /// "scenario produced no Step-phase buckets — \
235 /// declare a Step or read Phase::BASELINE",
236 /// );
237 /// let throughput = r.stats.phase_metric(Phase::step(0), "throughput");
238 /// ```
239 pub fn has_steps(&self) -> bool {
240 self.phases.iter().any(|p| p.step_index >= 1)
241 }
242
243 /// Run-level value for a metric by registry name, for the
244 /// ext-sourced metric family that carries no typed
245 /// `ScenarioStats` field.
246 ///
247 /// Resolves [`Self::ext_metrics`] — the run-level map the
248 /// framework fills post-merge with every metric whose value has no
249 /// typed struct field: the pooled wake-latency / run-delay
250 /// distributions and worst-cgroup iteration efficiencies
251 /// (the `MetricKind::Distribution` / `MetricKind::WorstLowest`
252 /// registry kinds — `worst_p99_wake_latency_us`, `worst_run_delay_us`,
253 /// `worst_iterations_per_cpu_sec`, …), the derived rates
254 /// (`iteration_rate`, and the pooled `iterations_per_cpu_sec` —
255 /// distinct from the `worst_iterations_per_cpu_sec` selector above),
256 /// the per-thread-group `system_time_ns` / `user_time_ns`, and
257 /// `avg_imbalance_ratio` / `avg_dsq_depth`. This is the
258 /// run-level analogue of [`Self::phase_metric`] for that family:
259 /// code holding the run's [`AssertResult`] reads
260 /// `r.stats.run_metric("worst_run_delay_us")` instead of reaching
261 /// into the raw `ext_metrics` map by string key (`ScenarioStats` is
262 /// the [`AssertResult::stats`] field — the value a test body, or a
263 /// callback that builds an `AssertResult` via `collect_all` /
264 /// `execute_scenario`, holds). A `post_vm` callback instead receives
265 /// a `VmResult`, which has NO `stats` field and no run-level
266 /// Distribution surface — compare those cross-run via `cargo ktstr
267 /// perf-delta`.
268 ///
269 /// The ext family is populated only by the `#[ktstr_test]` eval
270 /// flow's post-merge producer
271 /// ([`populate_run_distribution_metrics`]). An `AssertResult` built
272 /// by a DIRECT host assertion (`assert_not_starved` /
273 /// `AssertPlan::assert_cgroup`, which never run that producer)
274 /// carries the per-cgroup values on [`Self::cgroups`] but none of
275 /// these run-level roll-ups, so `run_metric` returns `None` for them
276 /// on that path — read the per-cgroup `CgroupStats` field directly
277 /// (e.g. `r.stats.cgroups[i].p99_wake_latency_us`) there.
278 ///
279 /// Sentinel-free, matching [`Self::phase_metric`]: `None` means
280 /// the metric is absent from this run (no contributing cgroup or
281 /// carrier, or a name not present in the map); `Some(0.0)` is a
282 /// real measured zero. Check [`crate::stats::MetricId::def`] on a dynamic
283 /// key to tell an unregistered key from genuinely-absent data (built-in
284 /// ids always resolve). (The map also carries any
285 /// user-defined extensible-metric keys, plus the framework-internal
286 /// Rate-component Counters — `total_phase_iterations` /
287 /// `total_phase_duration_sec` / `total_iterations_pooled` /
288 /// `total_cpu_time_sec`, the numerator/denominator plumbing behind
289 /// `iteration_rate` / `iterations_per_cpu_sec` — all of which resolve
290 /// here too; prefer the derived rate over its raw components.)
291 ///
292 /// RESOLVED here None-aware, ahead of the ext lookup, via a typed dispatch
293 /// (`typed_sentinel_metric`): the cross-cgroup metrics `worst_spread`,
294 /// `worst_migration_ratio`, `worst_gap_ms`, `total_migrations`,
295 /// `total_iterations`, `worst_page_locality`,
296 /// `worst_cross_node_migration_ratio`. The dispatch RE-DERIVES each from the
297 /// per-cgroup (`self.cgroups`) / per-phase (`self.phases[].per_cgroup`, the
298 /// NUMA metrics) carriers — `None` when no carrier measured it, `Some(0.0)`
299 /// for a measured zero — preserving the sentinel-free contract. The 5
300 /// non-NUMA metrics are 0.0-sentinel typed struct fields (a not-measured
301 /// carrier coerces to 0.0, indistinguishable from a measured zero, so the
302 /// re-derivation recovers the distinction); the two NUMA roll-ups
303 /// (`worst_page_locality`, `worst_cross_node_migration_ratio`) have no struct
304 /// field and re-pool purely from the per-phase NUMA carriers.
305 /// (`worst_wake_latency_tail_ratio` resolves via the ext lookup as the
306 /// `WakeLatencyTailRatio` key.)
307 ///
308 /// NOT resolved here (not in `ext_metrics`, no typed dispatch):
309 /// - the monitor-sourced run-level metrics (`max_imbalance_ratio`,
310 /// `max_dsq_depth`, `stuck_count`, `total_fallback`,
311 /// `total_keep_last`), which `ScenarioStats` does not hold
312 /// run-level — read those per-phase via [`Self::phase_metric`].
313 ///
314 /// So this resolves the ext-sourced family AND the typed cross-cgroup fields
315 /// (via the dispatch); only the monitor-sourced run-level metrics remain
316 /// unresolved here (`ScenarioStats` holds them only per-phase).
317 pub fn run_metric(&self, metric: impl Into<crate::stats::MetricId>) -> Option<f64> {
318 let id = metric.into();
319 // The sentinel-prone cross-cgroup run-level metrics re-derive None-aware
320 // from the per-cgroup / per-phase carriers, recovering the never-measured
321 // vs measured-zero distinction the 0.0-coerced struct fields (or, for
322 // worst_page_locality, the absent field) cannot carry. A non-sentinel id
323 // returns `None` from the dispatch and falls through to the ext-metrics map.
324 if let crate::stats::MetricId::Builtin(b) = &id
325 && let Some(resolved) = self.typed_sentinel_metric(*b)
326 {
327 return resolved;
328 }
329 self.ext_metrics.get(id.as_str()).copied()
330 }
331
332 /// Re-derive a typed run-level metric None-aware from the per-cgroup /
333 /// per-phase carriers — recovering the never-measured (`None`) vs
334 /// measured-zero (`Some(0.0)`) distinction a bare 0.0-sentinel `f64` cannot
335 /// carry. The 0.0-sentinel typed struct fields (`worst_spread`,
336 /// `worst_migration_ratio`, ...) coerce a not-measured carrier to 0.0 in
337 /// `scenario_stats_for_cgroup` (indistinguishable from a measured zero);
338 /// `worst_page_locality` is no longer a struct field at all — it re-pools
339 /// purely from the per-phase NUMA carriers (see below).
340 ///
341 /// Returns `None` when `m` is NOT one of the typed sentinel ids — the caller
342 /// then falls through to the ext-metrics lookup. `Some(None)` when `m` IS a
343 /// sentinel id but no contributing carrier exists (loud-absent). `Some(Some(v))`
344 /// for the re-derived value (a measured zero stays `Some(0.0)`).
345 ///
346 /// The 5 non-NUMA metrics source from `self.cgroups`, whose `Option` / counter
347 /// carriers preserve the measured-vs-unmeasured state. The 2 NUMA roll-ups
348 /// (`worst_page_locality`, `worst_cross_node_migration_ratio` — neither a
349 /// struct field) source from `self.phases[].per_cgroup`, for different
350 /// reasons: `page_locality` is structurally 0.0 on the `cgroup_stats` path
351 /// (it needs an expected-node set the reports-only builder lacks), so
352 /// `self.cgroups` cannot source it; `cross_node_migration_ratio` IS populated
353 /// on `cgroup_stats`, but only as a single pre-folded scalar that cannot carry
354 /// the cross-phase fold (SUM the per-phase migration-counter deltas over the
355 /// LATEST residency total), so it too re-pools from the per-phase
356 /// `PhaseCgroupStats` NUMA counters — which also single-sources it with the
357 /// ext/sidecar value. On a phase-less direct-assertion `AssertResult` (no NUMA
358 /// capture) both NUMA roll-ups read `None` (correct loud-absent).
359 fn typed_sentinel_metric(&self, m: crate::stats::BuiltinMetric) -> Option<Option<f64>> {
360 use crate::stats::BuiltinMetric as B;
361 Some(match m {
362 // worst_spread: highest spread over cgroups that measured it
363 // (CgroupStats::spread is already Option — None when no worker had
364 // measurable wall time). None iff no cgroup measured spread.
365 B::WorstSpread => self
366 .cgroups
367 .iter()
368 .filter_map(|c| c.spread)
369 .reduce(f64::max),
370 // worst_migration_ratio: highest migration ratio over cgroups that
371 // ran iterations (a 0.0 with total_iterations>0 is measured; with
372 // ==0 it is the rate-over-zero sentinel — never-measured).
373 B::WorstMigrationRatio => self
374 .cgroups
375 .iter()
376 .filter(|c| c.total_iterations > 0)
377 .map(|c| c.migration_ratio)
378 .reduce(f64::max),
379 // total_migrations / total_iterations: cross-cgroup SUMs. 0 is a real
380 // measured sum (cgroups ran, zero events); never-measured = no cgroups.
381 B::TotalMigrations => (!self.cgroups.is_empty()).then(|| {
382 self.cgroups
383 .iter()
384 .map(|c| c.total_migrations)
385 .fold(0u64, u64::saturating_add) as f64
386 }),
387 B::TotalIterations => (!self.cgroups.is_empty()).then(|| {
388 self.cgroups
389 .iter()
390 .map(|c| c.total_iterations)
391 .fold(0u64, u64::saturating_add) as f64
392 }),
393 // worst_gap_ms: longest gap over cgroups with workers (gap 0 with
394 // workers = measured "no gap observed"; never-measured = no workers).
395 B::WorstGapMs => self
396 .cgroups
397 .iter()
398 .filter(|c| c.num_workers > 0)
399 .map(|c| c.max_gap_ms as f64)
400 .reduce(f64::max),
401 // worst_page_locality: LOWEST per-cgroup locality (the worst cell) over
402 // cgroups that measured NUMA residency — a measured 0.0 (all pages
403 // off-node) is the worst and WINS the lowest fold. Shared with the
404 // ext/sidecar re-pool via worst_page_locality() so the typed read and
405 // the cross-run comparison value agree (the same fix replaces the
406 // fold_lowest_nonzero sentinel-skip on BOTH surfaces).
407 B::WorstPageLocality => self.worst_page_locality(),
408 // worst_cross_node_migration_ratio: highest cross-node churn ratio over
409 // cgroups that measured NUMA. Shared with the ext/sidecar re-pool via
410 // worst_cross_node_migration_ratio() so the typed read and the cross-run
411 // comparison value agree (the divergence the typed field had).
412 B::WorstCrossNodeMigrationRatio => self.worst_cross_node_migration_ratio(),
413 _ => return None,
414 })
415 }
416
417 /// Aggregate each cgroup's NUMA carriers across `self.phases` into
418 /// `(latest_local, latest_total, summed_migrated)`. The cross-phase fold is
419 /// ASYMMETRIC by counter class: `numa_pages_local` / `numa_pages_total` are
420 /// current-residency GAUGE snapshots (the live `/proc/.../numa_maps`
421 /// residency, recomputed each read), so the LATEST MEASURED phase's value is the run-end placement
422 /// — SUMMING across phases would multiply residency by the phase count (a
423 /// silent over-count); `cross_node_migrated` is a per-phase migration-counter
424 /// DELTA over disjoint intervals, so it SUMS to the run total. The shared
425 /// `latest_total` denominator serves both ratios (page_locality and
426 /// cross_node share one page total), matching `cgroup_stats`, which reads the
427 /// final-snapshot residency. One entry per cgroup that appeared in any phase.
428 ///
429 /// LATEST is None-aware: a later phase carrier that measured NO NUMA pages
430 /// (`numa_pages_total == 0` — an empty backdrop slice, a non-NUMA `WorkType`
431 /// phase, or a zero-worker carrier) does NOT overwrite an earlier MEASURED
432 /// residency. A bare overwrite would zero out the earlier snapshot, and the
433 /// `total > 0` filter in `worst_page_locality()` would then drop the cgroup
434 /// from the worst-pool — silently reporting a better-than-worst run-level
435 /// locality (the same measured-vs-unmeasured discipline the cross-cgroup fold
436 /// applies, extended to the cross-phase axis).
437 fn numa_agg_per_cgroup(&self) -> Vec<(u64, u64, u64)> {
438 let mut by_cg: std::collections::BTreeMap<&str, (u64, u64, u64)> =
439 std::collections::BTreeMap::new();
440 // self.phases is in step (chronological) order, so a later MEASURED
441 // phase's residency snapshot overwrites the earlier (LATEST wins); a
442 // not-measured (total == 0) phase leaves the prior snapshot intact. The
443 // migration deltas accumulate unconditionally (a 0 delta adds nothing).
444 for phase in &self.phases {
445 for (name, pc) in &phase.per_cgroup {
446 let e = by_cg.entry(name.as_str()).or_insert((0, 0, 0));
447 if pc.numa_pages_total > 0 {
448 e.0 = pc.numa_pages_local;
449 e.1 = pc.numa_pages_total;
450 }
451 // saturating: guest-runtime migration counter pooled across
452 // phases; never wrap the derived cross-node-migration ratio.
453 e.2 = e.2.saturating_add(pc.cross_node_migrated);
454 }
455 }
456 by_cg.into_values().collect()
457 }
458
459 /// The run-level WORST (lowest) per-cgroup page-locality fraction, re-pooled
460 /// None-aware from the per-phase NUMA carriers: the lowest
461 /// `page_locality_of(latest_local, latest_total)` over cgroups that measured
462 /// NUMA residency (`numa_pages_total > 0`) — a measured 0.0 (all pages
463 /// off-node) WINS the lowest; an all-unmeasured cohort yields `None`. Shared
464 /// by `run_metric`'s `WorstPageLocality` dispatch AND
465 /// `populate_run_distribution_metrics`'s ext/sidecar re-pool so the typed
466 /// read and the cross-run comparison value are byte-identical.
467 fn worst_page_locality(&self) -> Option<f64> {
468 self.numa_agg_per_cgroup()
469 .into_iter()
470 .filter(|&(_, total, _)| total > 0)
471 .map(|(local, total, _)| super::reductions::page_locality_of(local, total))
472 .reduce(f64::min)
473 }
474
475 /// The run-level WORST (highest) per-cgroup cross-node migration-churn ratio,
476 /// re-pooled from the per-phase NUMA carriers: the MAX
477 /// `cross_node_migration_ratio_of(summed_migrated, latest_total)` over cgroups
478 /// that measured NUMA residency (`numa_pages_total > 0`) — an all-unmeasured
479 /// cohort yields `None`. The polarity twin of `worst_page_locality` (LowerBetter
480 /// → highest-wins). Shared by `run_metric`'s `WorstCrossNodeMigrationRatio`
481 /// dispatch AND `populate_run_distribution_metrics`'s ext/sidecar re-pool so the
482 /// typed read and the cross-run comparison value are byte-identical (the typed
483 /// `Gauge(Last)` field diverged from this on multi-phase runs).
484 fn worst_cross_node_migration_ratio(&self) -> Option<f64> {
485 self.numa_agg_per_cgroup()
486 .into_iter()
487 .filter(|&(_, total, _)| total > 0)
488 .map(|(_, total, migrated)| {
489 super::reductions::cross_node_migration_ratio_of(migrated, total)
490 })
491 .reduce(f64::max)
492 }
493}
494
/// Registry metric names that already have a typed `GauntletRow` field.
///
/// The typed accessor fills these at `sidecar_to_row` time, and
/// `MetricDef::read` prefers the accessor over `ext_metrics`. Writing the same
/// key into `ext_metrics` would therefore be unread sidecar bloat AND would
/// double-source the run-level value.
///
/// Both run-level ext-metrics populators consult this list: the SampleSeries
/// path ([`populate_run_ext_metrics`]) and the phase-fold path
/// ([`populate_run_ext_metrics_from_phases`]). As a result, only
/// ext-metrics-only registry entries are written, and a typed-backed metric's
/// run-level value always comes from its accessor.
///
/// Two entries deserve a note:
///
/// * `stuck_count`: the typed whole-run count (`MonitorSummary::stuck_count`,
///   windowed over the full sample stream) is authoritative. The per-phase
///   fold sum uses the same `is_cpu_stuck` predicate, but it is `<=` the
///   typed count and depends on how the run is partitioned. It drops
///   cross-boundary and out-of-phase windows, so it falls strictly below the
///   typed count once any of those windows is stuck. Injecting the ext copy
///   would shadow the authoritative value with a redundant number, and a
///   divergent one once a dropped window is stuck.
/// * `max_imbalance_ratio`: its accessor reads the typed
///   `GauntletRow.imbalance_ratio` (whole-run `MonitorSummary`). Its
///   per-phase monitor fold feeds rendering only.
const TYPED_FIELD_NAMES: &[&str] = &[
    // Monitor-sourced run-level metrics.
    "max_dsq_depth",
    "max_imbalance_ratio",
    "total_fallback",
    "total_keep_last",
    "stuck_count",
    // Cross-cgroup counter sums (also typed `ScenarioStats` fields).
    "total_iterations",
    "total_migrations",
];
524
525/// Sibling of [`populate_run_ext_metrics`] that mines per-phase
526/// metrics back into the run-level `ext_metrics` map. Closes the
527/// gap for registered metrics whose values live in
528/// `PhaseBucket.metrics` but never reach `ext_metrics` via the
529/// SampleSeries path (their `read_sample` returns `None`):
530/// `avg_imbalance_ratio` (sourced from MonitorSample windowing
531/// inside [`build_phase_buckets`]), `iteration_rate` (sourced from
532/// stimulus event totals inside [`build_phase_buckets_with_stimulus`]),
533/// and `system_time_ns` / `user_time_ns` (per-thread-group CPU-time
534/// deltas injected by `phase_group_cpu_delta` inside
535/// `buckets_from_grouped`). The fold is generic over every key
536/// present on any phase, so it carries any such phase-only metric (the
537/// ext-metrics-only set whose `read_sample` returns `None`). Keys with a
538/// typed `GauntletRow` field (`TYPED_FIELD_NAMES`) are SKIPPED: their
539/// run-level value comes from the typed accessor (which wins on read), so
540/// re-injecting them here would double-source the run aggregate — the
541/// hazard the const's doc describes. Their per-phase `PhaseBucket` value
542/// still feeds per-phase rendering.
543///
544/// Per-phase reduction dispatch is described on [`PhaseBucket`];
545/// the cross-phase fold here uses `sample_count` as the weight so
546/// Gauge(Avg) keys get the weighted mean (the correct cross-phase
547/// semantic for typical-load metrics) while other kinds fold per
548/// their natural reduction. Existing keys in `target` are not
549/// overwritten — `read_sample` path values win when both produced
550/// an entry.
551///
552/// Without this fill, `cargo ktstr perf-delta` silently misses
553/// these phase-only metrics (avg_imbalance_ratio, iteration_rate,
554/// system_time_ns, user_time_ns) in flat-row output because
555/// `MetricDef::read` falls back to ext_metrics and finds nothing.
556pub fn populate_run_ext_metrics_from_phases(
557 phases: &[PhaseBucket],
558 target: &mut std::collections::BTreeMap<String, f64>,
559) {
560 // No early-return on empty `phases`: the derive_rate_metrics post-pass
561 // below must still run over whatever components populate_run_ext_metrics
562 // already inserted into `target` (the empty-phases case), so a run-level
563 // Rate is re-derived rather than silently dropped. The loops below are
564 // no-ops when `phases` is empty.
565 // Collect every metric key that appears on any phase.
566 let mut keys: std::collections::BTreeSet<&String> = std::collections::BTreeSet::new();
567 for phase in phases {
568 for key in phase.metrics.keys() {
569 keys.insert(key);
570 }
571 }
572 for key in keys {
573 if target.contains_key(key) {
574 continue;
575 }
576 let Some(def) = crate::stats::metric_def(key) else {
577 continue;
578 };
579 // Derived metrics (every `is_derived()`: Rate / Distribution / WorstLowest /
580 // WakeLatencyTailRatio / WorstCrossNodeRatio / PerPhase / PerRunDistribution)
581 // are produced
582 // from their pooled components, not folded as per-phase values: skip
583 // here. A Rate re-derives after the loop (Σnum/Σdenom over the folded
584 // components); the distributional kinds (Distribution / WorstLowest /
585 // WakeLatencyTailRatio / WorstCrossNodeRatio) are re-pooled run-level by
586 // `populate_run_distribution_metrics` (and never appear in
587 // phase.metrics anyway); PerPhase is re-derived by `derive_phase_metrics`.
588 // Folding a ready-made derived value would lose
589 // the re-pool, and routing one into aggregate_samples_weighted within
590 // a run is not its producer path.
591 if def.kind.is_derived() {
592 continue;
593 }
594 // Typed-backed keys (those in TYPED_FIELD_NAMES — a typed GauntletRow
595 // accessor that wins on read) must NOT be re-injected into ext_metrics
596 // from the phase fold: the ext copy would be unread bloat and, for
597 // stuck_count (whose per-phase fold sum is `<=` the typed whole-run
598 // count, strictly below once a cross-boundary/out-of-phase window is
599 // stuck — they share the is_cpu_stuck predicate but the run-level
600 // count windows the full stream), a redundant-or-divergent value,
601 // not a guaranteed duplicate. Their per-phase
602 // PhaseBucket value still feeds rendering; the run-level value stays
603 // the typed path. Mirrors the sibling populate_run_ext_metrics.
604 // (Without this, folding max_imbalance_ratio + stuck_count onto
605 // captured buckets would leak both into ext_metrics on the common
606 // path.)
607 if TYPED_FIELD_NAMES.contains(&key.as_str()) {
608 continue;
609 }
610 // avg_nr_running is NOT typed-backed (no GauntletRow accessor), but its
611 // authoritative run-level value is MonitorSummary::avg_nr_running, folded
612 // by fold_run_level_ext. It is the one monitor-summary-fold key that ALSO
613 // appears in per-phase bucket.metrics (fold_monitor_into_bucket writes it
614 // for rendering). Skip it here so the per-phase re-pool never claims the
615 // run-level key: VmResult::run_metric runs this re-pool BEFORE
616 // fold_run_level_ext, and the fold's `or_insert` would then no-op,
617 // silently replacing the whole-run value with the per-phase weighted
618 // mean. Its per-phase PhaseBucket value still feeds rendering +
619 // change-detection. (TYPED_FIELD_NAMES is the typed-accessor analogue of
620 // this same skip rationale.)
621 if key == "avg_nr_running" {
622 continue;
623 }
624 // Per-phase (value, sample_count) for the kind-aware fold.
625 // A phase that doesn't carry the key contributes nothing.
626 // Lock-step shape enforced by the (f64, usize) pair type.
627 // `sample_count.max(1)` is load-bearing for Gauge(Avg) keys: a
628 // synthesized zero-capture phase (the
629 // build_phase_buckets_with_stimulus seam) carrying a
630 // capture-independent Gauge(Avg) value at sample_count==0 gets
631 // weight 1 (one phase observation) rather than being zero-weighted
632 // out of the run-level mean. The floor is a no-op for
633 // Counter/DeltaSum keys, which sum with weights ignored (see
634 // aggregate_finite): iteration_rate's components
635 // total_phase_iterations / total_phase_duration_sec are such
636 // Counters, so a synthesized step's iterations are INCLUDED in the
637 // re-pooled iteration_rate via the sum — the run-aggregate
638 // completion of the per-step rate handling (iteration_rate itself is a
639 // Rate, skipped above and re-derived below). A regression dropping
640 // the floor would silently re-drop a zero-capture step's Gauge(Avg)
641 // value from the sidecar aggregate.
642 let pairs: Vec<(f64, usize)> = phases
643 .iter()
644 .filter_map(|phase| {
645 phase
646 .metrics
647 .get(key)
648 .copied()
649 .map(|v| (v, phase.sample_count.max(1)))
650 })
651 .collect();
652 if pairs.is_empty() {
653 continue;
654 }
655 // PerPhaseDeltaSum folds cross-PHASE by SUM: the run-level value is the
656 // sum of the disjoint per-phase OBSERVED CPU-time deltas (a lower bound
657 // on total run CPU time — excludes head / tail / inter-phase-gap
658 // windows; see the MetricKind::PerPhaseDeltaSum doc). It is NOT routed
659 // through aggregate_samples_weighted, whose PerPhaseDeltaSum arm
660 // implements the CROSS-RUN unweighted mean — mean-folding the phases
661 // here would double-count duration (the bug this kind fixes). Weights
662 // (phase sample_count) are irrelevant to a sum.
663 if def.kind == crate::stats::MetricKind::PerPhaseDeltaSum {
664 let sum: f64 = pairs.iter().map(|(v, _)| v).sum();
665 target.insert(key.clone(), sum);
666 continue;
667 }
668 if let Some(reduced) = crate::stats::aggregate_samples_weighted(&pairs, def.kind) {
669 target.insert(key.clone(), reduced);
670 }
671 }
672 // Re-derive Rate metrics from the now-folded components so the run
673 // rate is Σnumerator / Σdenominator (the components folded by their
674 // own kinds above — a Counter numerator summed across phases).
675 crate::stats::derive_rate_metrics(target);
676}
677
678/// Run the FULL run-level `ext_metrics` population sequence into `stats` — the
679/// single source of truth shared by the eval layer (`evaluate_vm_result`) and
680/// [`crate::vmm::VmResult::run_metric`], so the two produce byte-identical
681/// run-level ext maps for the same run. Reads `samples` + `stats.phases` +
682/// `stats.cgroups`, writes `stats.ext_metrics`, in the canonical order:
683/// 1. [`populate_run_ext_metrics`] — the `read_sample`-wired registry family
684/// (over every freeze in `samples`), then the whole-run wall + IRQ rates.
685/// 2. [`populate_run_ext_metrics_from_phases`] — the phase-only ext metrics
686/// whose `read_sample` is `None` (`avg_imbalance_ratio`, `iteration_rate`,
687/// `system_time_ns`, `user_time_ns`, the per-CPU IRQ spatial maxes, and the
688/// per-cgroup PSI-irq spatial maxes).
689/// 3. [`populate_run_pooled_iterations_per_cpu_sec`] — the pooled cross-cgroup
690/// `iterations_per_cpu_sec` Rate (from `stats.cgroups`).
691/// 4. [`populate_run_pooled_taobench`] — the whole-run taobench qps + hit Rates
692/// pooled cross-cgroup (from `stats.cgroups[].taobench_whole`).
693/// 5. [`populate_run_pooled_taobench_distribution`] — the taobench whole-run
694/// open-loop serve-latency `*_whole` percentiles (union of the per-phase
695/// per-cgroup serve `PlatStats` histograms, percentile re-derived over the union).
696/// 6. [`populate_run_pooled_schbench`] — the schbench whole-run loop Counter +
697/// role-separate run-delay gate-Rates (from
698/// `stats.phases[].per_cgroup[].schbench` raw pairs, summed over phases+cgroups).
699/// 7. [`populate_run_pooled_schbench_distribution`] — the schbench whole-run
700/// latency/rps `*_whole` percentiles (union of the per-phase per-cgroup
701/// `PlatStats` histograms, percentile re-derived over the union).
702/// 8. [`populate_run_distribution_metrics`] — the `Distribution` / `WorstLowest`
703/// / `WakeLatencyTailRatio` / `WorstCrossNodeRatio` re-pools (from
704/// `stats.phases[].per_cgroup` raw samples + `stats.cgroups`).
705///
706/// ORDER IS LOAD-BEARING: step 1 must precede step 2 so the whole-run wall +
707/// whole-run IRQ-counter deltas land before step 2's `contains_key` skip (the
708/// multi-phase-rate fix); steps 3-8 fold the per-cgroup roll-up after the
709/// per-phase families.
710///
711/// `stats.phases` SHOULD be the PRE-`derive_phase_metrics` fold (the host buckets
712/// with the guest per-cgroup carriers folded in, before the per-phase scalar
713/// derivation) — the exact phase shape the eval layer feeds to step 2, since the
714/// eval path runs `derive_phase_metrics` AFTER this call. Feeding the pre-derive
715/// fold reproduces the eval sequence by construction (eval-faithful). Post-derive
716/// phases (e.g. [`crate::vmm::VmResult::phase_buckets`]) yield the SAME map today —
717/// step 2 skips `is_derived` keys, and every pooled scalar `derive_phase_metrics`
718/// writes to `bucket.metrics` (the schbench / taobench scalars) is
719/// `MetricKind::PerPhase` (which `is_derived`), so step 2 drops them either way —
720/// but the pre-derive fold avoids DEPENDING on that skip: a pooled key ever
721/// registered as non-derived would be folded run-level under post-derive,
722/// diverging from the eval map, but not under pre-derive. Pass the pre-derive
723/// fold ([`crate::vmm::VmResult::run_metric`] uses `phase_buckets_pre_derive`).
724pub fn populate_run_ext_all(
725 stats: &mut ScenarioStats,
726 samples: &crate::scenario::sample::SampleSeries,
727) {
728 populate_run_ext_metrics(samples, &mut stats.ext_metrics);
729 populate_run_ext_metrics_from_phases(&stats.phases, &mut stats.ext_metrics);
730 populate_run_pooled_iterations_per_cpu_sec(stats);
731 populate_run_pooled_taobench(stats);
732 populate_run_pooled_taobench_distribution(stats);
733 populate_run_pooled_schbench(stats);
734 populate_run_pooled_schbench_distribution(stats);
735 populate_run_distribution_metrics(stats);
736}
737
738/// Inject the run-level POOLED `iterations_per_cpu_sec` Rate's two Counter
739/// components into `stats.ext_metrics`, summed across the cgroups that have
740/// measured on-CPU time — the cross-cgroup re-pool axis. Rather than routing
741/// the per-cgroup efficiency through `AssertResult::merge`'s worst-by-polarity
742/// `ext_metrics` fold (which picks the WORST cgroup's value, not Σ, and has
743/// no derive post-pass), this reads the already-merged `stats.cgroups` vec
744/// directly: `iterations_per_cpu_sec` = Σ`total_iterations` /
745/// Σ(`total_cpu_time_ns`/1e9) over cgroups with `total_cpu_time_ns > 0` — the
746/// per-cgroup [`CgroupStats::iterations_per_cpu_sec`] re-pooled, NOT a mean of
747/// per-cgroup ratios, NOT the worst single cgroup.
748///
749/// MUST run at the eval layer AFTER the cgroup-bearing merges (every merge that
750/// contributes a [`CgroupStats`], so `stats.cgroups` holds every per-cgroup
751/// entry) and BEFORE the sidecar write. The trailing monitor-verdict merge at
752/// the eval layer merges an `inconclusive()` carrying empty `stats` (no cgroups,
753/// no ext keys), so it is safe to run after this. If component injection ever
754/// moved BEFORE a cgroup-bearing merge, that worst-by-polarity fold would
755/// min/max these Counter keys into single-cgroup scalars, silently corrupting
756/// the pooled sum.
757///
758/// A cgroup with `total_cpu_time_ns == 0` (schedstat unavailable, or
759/// `num_workers == 0`) is EXCLUDED from BOTH sums — mirroring the per-cgroup
760/// [`CgroupStats::iterations_per_cpu_sec`] None-on-zero (`total_cpu_time_ns >
761/// 0` implies `num_workers > 0`, so the one predicate covers both). Crediting
762/// an unmeasured cgroup's iterations against the measured cgroups' CPU-seconds
763/// would overstate cohort efficiency — the silent-wrong-answer this gate
764/// prevents. Both components are inserted both-or-neither (the
765/// `derive_rate_metrics` co-location invariant), only when the summed MEASURED
766/// on-CPU time is > 0 (every cgroup unmeasured ⇒ no rate). The ns→s `/1e9` is
767/// applied ONCE here on the summed ns (not per-cgroup, to avoid repeated float
768/// rounding), since `derive_rate_metrics` is a bare num/den.
769/// `total_iterations_pooled` is a DISTINCT ext-only key, not the typed
770/// `total_iterations` (skipped from ext_metrics; it folds cross-RUN as a MEAN
771/// — a display average — while a Rate numerator must SUM-fold so Σnum/Σdenom
772/// re-pools, so one shared key cannot carry both folds). Because it sums only
773/// MEASURED cgroups, it is ≤ the merge-summed typed `total_iterations` (which
774/// includes any zero-cpu-time cgroups), and equals it unless an excluded
775/// zero-cpu-time cgroup carried iterations>0.
776pub fn populate_run_pooled_iterations_per_cpu_sec(stats: &mut ScenarioStats) {
777 // Exclude cgroups with no measured on-CPU time from BOTH sums (mirrors the
778 // per-cgroup None-on-zero): crediting an unmeasured cgroup's iterations
779 // against the measured cgroups' CPU-seconds would overstate efficiency.
780 // saturating fold: pool the guest-runtime cpu-time-ns / iteration counters
781 // across cgroups; a plain `.sum()` would debug-panic / release-wrap on a
782 // corrupt/hostile component, silently corrupting iterations_per_cpu_sec.
783 let summed_ns: u64 = stats
784 .cgroups
785 .iter()
786 .filter(|c| c.total_cpu_time_ns > 0)
787 .map(|c| c.total_cpu_time_ns)
788 .fold(0u64, u64::saturating_add);
789 if summed_ns == 0 {
790 return;
791 }
792 let summed_iters: u64 = stats
793 .cgroups
794 .iter()
795 .filter(|c| c.total_cpu_time_ns > 0)
796 .map(|c| c.total_iterations)
797 .fold(0u64, u64::saturating_add);
798 stats
799 .ext_metrics
800 .insert("total_iterations_pooled".to_string(), summed_iters as f64);
801 stats
802 .ext_metrics
803 .insert("total_cpu_time_sec".to_string(), summed_ns as f64 / 1e9);
804 crate::stats::derive_rate_metrics(&mut stats.ext_metrics);
805}
806
807/// Inject the whole-run taobench engine's qps + hit Rate components into
808/// `stats.ext_metrics`, pooled across the run's `WorkType::Taobench` cgroups.
809/// Each cgroup carries its workers' merged whole-run aggregate
810/// ([`crate::assert::CgroupStats::taobench_whole`]); this folds those across
811/// cgroups (Σ ops, MAX wall window — the window is shared by the concurrent
812/// cohorts, per `TaobenchStats::merge`) and writes the six `total_taobench_*`
813/// Counter components (`ops`, `fast_ops`, `slow_ops`, `wall_sec`, plus the
814/// command-time `get_cmds` / `get_hits`), from which
815/// `crate::stats::derive_rate_metrics` derives `taobench_total_ops_per_sec`,
816/// `taobench_fast_ops_per_sec`, `taobench_slow_ops_per_sec`, the response-time
817/// `taobench_hit_fraction` (Σfast/Σcompleted), and the command-time
818/// `taobench_command_hit_rate` (Σhits/Σcmds). The whole-run Rate keys are
819/// registered METRICS, so — unlike the per-phase `taobench_*_qps`
820/// (`MetricKind::PerPhase`, invisible to the whole-run cross-run fold) — they
821/// reach the perf-delta `--noise-adjust` spread analysis. The open-loop
822/// serve-latency distribution is a SEPARATE pool
823/// ([`populate_run_pooled_taobench_distribution`], the `*_us_whole` keys).
824///
825/// MUST run post-`merge` (after every cgroup-bearing merge has populated
826/// `stats.cgroups`), exactly like [`populate_run_pooled_iterations_per_cpu_sec`]:
827/// an earlier run would pool over an incomplete cgroup set. A run with no
828/// Taobench cgroup writes nothing (the pool is `None`) — the keys stay absent,
829/// keeping a non-taobench run distinct from a measured zero.
830///
831/// Both-or-neither (the `derive_rate_metrics` co-location invariant): all six
832/// components are inserted together, gated on a measured wall window
833/// (`elapsed_ns > 0`) — the qps denominator. The three per-second Rates then
834/// derive unconditionally (wall_sec > 0); `taobench_hit_fraction` =
835/// `total_taobench_fast_ops` / `total_taobench_ops` derives iff ops completed
836/// (`total_taobench_ops > 0`) and `taobench_command_hit_rate` =
837/// `total_taobench_get_hits` / `total_taobench_get_cmds` iff lookups issued
838/// (`get_cmds > 0`); `derive_rate_metrics` skips a zero-denominator rate, so a
839/// window-but-no-ops run gets qps=0 keys but no false hit fraction / hit rate.
840/// The ns→s `/1e9` is applied ONCE here (not in `derive_rate_metrics`, a bare
841/// num/den), mirroring `total_cpu_time_sec`. Cross-RUN the components SUM-fold
842/// (Counter), so each Rate re-pools as Σnumerator / Σdenominator over the cohort
843/// — the aggregate throughput / hit rate, not a mean of per-run values.
844pub fn populate_run_pooled_taobench(stats: &mut ScenarioStats) {
845 use crate::stats::{
846 TOTAL_TAOBENCH_FAST_OPS, TOTAL_TAOBENCH_GET_CMDS, TOTAL_TAOBENCH_GET_HITS,
847 TOTAL_TAOBENCH_OPS, TOTAL_TAOBENCH_SLOW_OPS, TOTAL_TAOBENCH_WALL_SEC,
848 };
849 // Pool the per-cgroup whole-run aggregates across the run's Taobench cgroups:
850 // Σ ops, MAX wall window (shared by the concurrent cohorts). `None` when no
851 // cgroup ran a Taobench worker.
852 let pooled = stats
853 .cgroups
854 .iter()
855 .filter_map(|c| c.taobench_whole.as_ref())
856 .fold(
857 None,
858 |acc: Option<crate::workload::taobench::run::TaobenchStats>, t| {
859 Some(match acc {
860 Some(mut a) => {
861 a.merge(t);
862 a
863 }
864 None => *t,
865 })
866 },
867 );
868 let Some(w) = pooled else {
869 return;
870 };
871 let c = &w;
872 // qps is undefined without a measured wall window; write no components (so
873 // hit_fraction stays absent too) rather than a 0/0 rate.
874 if c.elapsed_ns == 0 {
875 return;
876 }
877 stats
878 .ext_metrics
879 .insert(TOTAL_TAOBENCH_OPS.to_string(), c.total_ops() as f64);
880 stats
881 .ext_metrics
882 .insert(TOTAL_TAOBENCH_FAST_OPS.to_string(), c.fast_ops as f64);
883 stats
884 .ext_metrics
885 .insert(TOTAL_TAOBENCH_SLOW_OPS.to_string(), c.slow_ops as f64);
886 stats.ext_metrics.insert(
887 TOTAL_TAOBENCH_WALL_SEC.to_string(),
888 c.elapsed_ns as f64 / 1e9,
889 );
890 // Command-time hit components: hits = cmds − misses (request-time). The Rate
891 // taobench_command_hit_rate = Σhits / Σcmds re-derives via derive_rate_metrics
892 // (skipped, hence absent, when no lookups issued). Diverges from the
893 // response-time taobench_hit_fraction under open-loop arrival.
894 stats
895 .ext_metrics
896 .insert(TOTAL_TAOBENCH_GET_CMDS.to_string(), c.get_cmds as f64);
897 stats.ext_metrics.insert(
898 TOTAL_TAOBENCH_GET_HITS.to_string(),
899 c.get_cmds.saturating_sub(c.get_misses) as f64,
900 );
901 crate::stats::derive_rate_metrics(&mut stats.ext_metrics);
902}
903
904/// Inject the taobench WHOLE-RUN open-loop serve-latency percentiles into
905/// `stats.ext_metrics` as the `taobench_serve_*_us_whole` keys, re-pooled
906/// run-level by UNIONING the per-phase per-cgroup serve `PlatStats` histograms
907/// (`stats.phases[].per_cgroup[].taobench.serve_lat`) across every step-attributed
908/// phase and every cgroup, then re-deriving each percentile / min / max over the
909/// merged histogram (`PlatStats::combine` is an associative bucket-count add, so
910/// the merged histogram is the faithful pooled sample set and the re-derived
911/// percentile is the percentile OF the union — NOT a mean of per-source
912/// percentiles). The taobench analog of
913/// [`populate_run_pooled_schbench_distribution`], with the same source: the
914/// BASELINE (epoch 0) and inter-step-gap (`u32::MAX`) epochs are excluded (they
915/// are dropped from `stats.phases` by `expand_backdrop_phase_buckets`), so this is
916/// the steady-state serve distribution over the measured steps — a faithful
917/// PerRunDistribution, distinct from the standalone driver's full-run histogram.
918///
919/// Runs in [`populate_run_ext_all`] (post-merge); reads the per-phase carriers
920/// (disjoint from the taobench counter pool) and writes distinct `*_whole` keys,
921/// so it is order-independent. Keys are written only when the merged histogram
922/// has samples (`sample_count() > 0`) — a closed-loop run (no serve samples)
923/// reads ABSENT, never a false 0. The keys are `MetricKind::PerRunDistribution`:
924/// noise-compared per-run by `crate::stats::noise_findings`, NEVER cross-run
925/// folded (a percentile of a union is not a mean of per-run percentiles, and the
926/// per-phase histograms are dropped at the cross-run boundary).
927pub fn populate_run_pooled_taobench_distribution(stats: &mut ScenarioStats) {
928 use crate::stats::{
929 TAOBENCH_SERVE_MAX_US_WHOLE, TAOBENCH_SERVE_MIN_US_WHOLE, TAOBENCH_SERVE_P50_US_WHOLE,
930 TAOBENCH_SERVE_P90_US_WHOLE, TAOBENCH_SERVE_P99_US_WHOLE, TAOBENCH_SERVE_P999_US_WHOLE,
931 };
932 use crate::workload::schbench::plat::{Pct, PlatStats};
933
934 // Union the per-phase per-cgroup serve histograms across the whole run.
935 let mut serve = PlatStats::default();
936 for phase in &stats.phases {
937 for pc in phase.per_cgroup.values() {
938 if let Some(t) = pc.taobench.as_ref() {
939 serve.combine(&t.serve_lat);
940 }
941 }
942 }
943 if serve.sample_count() == 0 {
944 return;
945 }
946 let q = serve.percentiles();
947 stats.ext_metrics.insert(
948 TAOBENCH_SERVE_P50_US_WHOLE.to_string(),
949 q.value_at(Pct::P50) as f64,
950 );
951 stats.ext_metrics.insert(
952 TAOBENCH_SERVE_P90_US_WHOLE.to_string(),
953 q.value_at(Pct::P90) as f64,
954 );
955 stats.ext_metrics.insert(
956 TAOBENCH_SERVE_P99_US_WHOLE.to_string(),
957 q.value_at(Pct::P99) as f64,
958 );
959 stats.ext_metrics.insert(
960 TAOBENCH_SERVE_P999_US_WHOLE.to_string(),
961 q.value_at(Pct::P999) as f64,
962 );
963 stats
964 .ext_metrics
965 .insert(TAOBENCH_SERVE_MIN_US_WHOLE.to_string(), q.min as f64);
966 stats
967 .ext_metrics
968 .insert(TAOBENCH_SERVE_MAX_US_WHOLE.to_string(), q.max as f64);
969}
970
971/// Inject the schbench whole-run Class-3 metrics — the loop Counter and the
972/// role-separate run-delay gate-Rate components — into `stats.ext_metrics`,
973/// summed across EVERY phase and EVERY cgroup from the per-phase
974/// `SchbenchPhaseStats` raw pairs (`stats.phases[].per_cgroup[].schbench`). The
975/// raw `(run_delay_ns, pcount)` pairs and `loop_count` are integer and
976/// associative, so summing across phases+cgroups gives the run-level totals; the
977/// two `*_run_delay_ns_per_sched` Rates then re-derive Σrun_delay/Σpcount (the
978/// sample-weighted per-schedule mean — NOT a mean of per-run means). The MESSAGE
979/// and WORKER thread roles pool SEPARATELY (different per-schedule wait
980/// populations — never cross-pool).
981///
982/// Runs in [`populate_run_ext_all`] (post-merge, after
983/// [`populate_run_pooled_taobench`]); reads the per-phase carriers (a disjoint
984/// source from the iterations/taobench pools) and writes distinct
985/// `total_schbench_*` / `schbench_*_run_delay_ns_per_sched` keys, so it is
986/// order-independent. A run with no schbench carrier writes nothing (keys stay
987/// absent — a non-schbench run is distinct from a measured zero).
988///
989/// Both-or-neither PER ROLE: each role's two Counter components are inserted only
990/// when that role was scheduled (`pcount > 0`), so `derive_rate_metrics` yields
991/// the role's gate-Rate iff it ran (never a 0/0); the two roles are independent
992/// (a worker-only run emits only the worker Rate). `total_schbench_loops` is
993/// always written when any schbench carrier ran (0 is a measured zero). The
994/// per-phase `sched_delay_msg/worker_us` is the SAME Σrun_delay_ns/Σpcount
995/// per-schedule mean at phase scope (NOT schbench's native mean-of-per-thread-
996/// means, a separate whole-run SchbenchResult stat) and stays PerPhase
997/// display-only — only these Rates gate; no double-count. Cross-RUN the
998/// components SUM-fold (Counter), so each Rate re-pools Σrun_delay/Σpcount.
999pub fn populate_run_pooled_schbench(stats: &mut ScenarioStats) {
1000 use crate::stats::{
1001 TOTAL_SCHBENCH_LOOPS, TOTAL_SCHBENCH_MSG_PCOUNT, TOTAL_SCHBENCH_MSG_RUN_DELAY_NS,
1002 TOTAL_SCHBENCH_WORKER_PCOUNT, TOTAL_SCHBENCH_WORKER_RUN_DELAY_NS,
1003 };
1004 let mut msg_run_delay_ns: u64 = 0;
1005 let mut msg_pcount: u64 = 0;
1006 let mut worker_run_delay_ns: u64 = 0;
1007 let mut worker_pcount: u64 = 0;
1008 let mut loops: u64 = 0;
1009 let mut any = false;
1010 for phase in &stats.phases {
1011 for pc in phase.per_cgroup.values() {
1012 if let Some(s) = pc.schbench.as_ref() {
1013 any = true;
1014 // saturating: guest-runtime run-delay-ns / pcount / loop
1015 // counters pooled across phases+cgroups (matches the already-
1016 // saturating SchbenchPhaseStats::merge); never wrap a gate-Rate.
1017 msg_run_delay_ns = msg_run_delay_ns.saturating_add(s.msg_run_delay_ns);
1018 msg_pcount = msg_pcount.saturating_add(s.msg_pcount);
1019 worker_run_delay_ns = worker_run_delay_ns.saturating_add(s.worker_run_delay_ns);
1020 worker_pcount = worker_pcount.saturating_add(s.worker_pcount);
1021 loops = loops.saturating_add(s.loop_count);
1022 }
1023 }
1024 }
1025 if !any {
1026 return;
1027 }
1028 stats
1029 .ext_metrics
1030 .insert(TOTAL_SCHBENCH_LOOPS.to_string(), loops as f64);
1031 if msg_pcount > 0 {
1032 stats.ext_metrics.insert(
1033 TOTAL_SCHBENCH_MSG_RUN_DELAY_NS.to_string(),
1034 msg_run_delay_ns as f64,
1035 );
1036 stats
1037 .ext_metrics
1038 .insert(TOTAL_SCHBENCH_MSG_PCOUNT.to_string(), msg_pcount as f64);
1039 }
1040 if worker_pcount > 0 {
1041 stats.ext_metrics.insert(
1042 TOTAL_SCHBENCH_WORKER_RUN_DELAY_NS.to_string(),
1043 worker_run_delay_ns as f64,
1044 );
1045 stats.ext_metrics.insert(
1046 TOTAL_SCHBENCH_WORKER_PCOUNT.to_string(),
1047 worker_pcount as f64,
1048 );
1049 }
1050 crate::stats::derive_rate_metrics(&mut stats.ext_metrics);
1051}
1052
1053/// Inject the schbench whole-run DISTRIBUTIONAL metrics (the wakeup /
1054/// request latency percentiles + min/max and the achieved-rps percentiles) into
1055/// `stats.ext_metrics` as the `*_whole` keys, re-pooled run-level by UNIONING the
1056/// per-phase per-cgroup `PlatStats` histograms
1057/// (`stats.phases[].per_cgroup[].schbench.{wakeup,request,rps}`) across EVERY
1058/// phase and EVERY cgroup, then re-deriving each percentile / min / max over the
1059/// merged histogram. `PlatStats::combine` is an associative bucket-count add, so
1060/// the merged histogram is the FAITHFUL union and the re-derived percentile is
1061/// the percentile OF the pooled sample set — NOT a mean of per-phase / per-cgroup
1062/// percentiles (the percentile operator is non-linear). This is the schbench
1063/// histogram analog of [`populate_run_distribution_metrics`]'s raw-sample union.
1064///
1065/// Runs in [`populate_run_ext_all`] (post-merge, after
1066/// [`populate_run_pooled_schbench`]); reads the per-phase carriers (disjoint from
1067/// the iterations / taobench / schbench loop/run-delay pools) and writes distinct
1068/// `*_whole` keys, so it is order-independent. Each stream's keys are written
1069/// only when its merged histogram has samples (`sample_count() > 0`) — a stream
1070/// with no samples (e.g. a sub-1s run with no rps samples) reads ABSENT, never a
1071/// false 0 (mirrors the per-phase `write_schbench_scalars` gating and the
1072/// carrier-less graceful degradation of [`populate_run_distribution_metrics`]).
1073///
1074/// The `*_whole` keys are `crate::stats::MetricKind::PerRunDistribution`:
1075/// noise-compared per-run by `crate::stats::noise_findings` (each run's own p99),
1076/// NEVER cross-RUN folded (a percentile of a union is not a mean of per-run
1077/// percentiles, and the per-phase histograms are dropped at the cross-run
1078/// boundary), so they are gated out of the cross-RUN ext fold and the within-run
1079/// reducers (`is_derived`). Distinct names from the per-phase percentile keys
1080/// (one registry name = one kind), produced solely here.
1081pub fn populate_run_pooled_schbench_distribution(stats: &mut ScenarioStats) {
1082 use crate::stats::{
1083 SCHBENCH_REQUEST_MAX_US_WHOLE, SCHBENCH_REQUEST_MIN_US_WHOLE,
1084 SCHBENCH_REQUEST_P50_US_WHOLE, SCHBENCH_REQUEST_P90_US_WHOLE,
1085 SCHBENCH_REQUEST_P99_US_WHOLE, SCHBENCH_REQUEST_P999_US_WHOLE, SCHBENCH_RPS_MAX_WHOLE,
1086 SCHBENCH_RPS_MIN_WHOLE, SCHBENCH_RPS_P20_WHOLE, SCHBENCH_RPS_P50_WHOLE,
1087 SCHBENCH_RPS_P90_WHOLE, SCHBENCH_WAKEUP_MAX_US_WHOLE, SCHBENCH_WAKEUP_MIN_US_WHOLE,
1088 SCHBENCH_WAKEUP_P50_US_WHOLE, SCHBENCH_WAKEUP_P90_US_WHOLE, SCHBENCH_WAKEUP_P99_US_WHOLE,
1089 SCHBENCH_WAKEUP_P999_US_WHOLE,
1090 };
1091 use crate::workload::schbench::plat::{Pct, PlatStats};
1092
1093 // Union the per-stream histograms across ALL phases+cgroups (combine =
1094 // associative bucket-count add → the faithful pooled histogram).
1095 let mut wakeup = PlatStats::default();
1096 let mut request = PlatStats::default();
1097 let mut rps = PlatStats::default();
1098 for phase in &stats.phases {
1099 for pc in phase.per_cgroup.values() {
1100 if let Some(s) = pc.schbench.as_ref() {
1101 wakeup.combine(&s.wakeup);
1102 request.combine(&s.request);
1103 rps.combine(&s.rps);
1104 }
1105 }
1106 }
1107 // Latency streams: 4 percentiles + min/max, re-derived over the union, µs.
1108 if wakeup.sample_count() > 0 {
1109 let q = wakeup.percentiles();
1110 stats.ext_metrics.insert(
1111 SCHBENCH_WAKEUP_P50_US_WHOLE.to_string(),
1112 q.value_at(Pct::P50) as f64,
1113 );
1114 stats.ext_metrics.insert(
1115 SCHBENCH_WAKEUP_P90_US_WHOLE.to_string(),
1116 q.value_at(Pct::P90) as f64,
1117 );
1118 stats.ext_metrics.insert(
1119 SCHBENCH_WAKEUP_P99_US_WHOLE.to_string(),
1120 q.value_at(Pct::P99) as f64,
1121 );
1122 stats.ext_metrics.insert(
1123 SCHBENCH_WAKEUP_P999_US_WHOLE.to_string(),
1124 q.value_at(Pct::P999) as f64,
1125 );
1126 stats
1127 .ext_metrics
1128 .insert(SCHBENCH_WAKEUP_MIN_US_WHOLE.to_string(), q.min as f64);
1129 stats
1130 .ext_metrics
1131 .insert(SCHBENCH_WAKEUP_MAX_US_WHOLE.to_string(), q.max as f64);
1132 }
1133 if request.sample_count() > 0 {
1134 let q = request.percentiles();
1135 stats.ext_metrics.insert(
1136 SCHBENCH_REQUEST_P50_US_WHOLE.to_string(),
1137 q.value_at(Pct::P50) as f64,
1138 );
1139 stats.ext_metrics.insert(
1140 SCHBENCH_REQUEST_P90_US_WHOLE.to_string(),
1141 q.value_at(Pct::P90) as f64,
1142 );
1143 stats.ext_metrics.insert(
1144 SCHBENCH_REQUEST_P99_US_WHOLE.to_string(),
1145 q.value_at(Pct::P99) as f64,
1146 );
1147 stats.ext_metrics.insert(
1148 SCHBENCH_REQUEST_P999_US_WHOLE.to_string(),
1149 q.value_at(Pct::P999) as f64,
1150 );
1151 stats
1152 .ext_metrics
1153 .insert(SCHBENCH_REQUEST_MIN_US_WHOLE.to_string(), q.min as f64);
1154 stats
1155 .ext_metrics
1156 .insert(SCHBENCH_REQUEST_MAX_US_WHOLE.to_string(), q.max as f64);
1157 }
1158 // RPS stream: PLIST_FOR_RPS = 20/50/90 + min/max (the schbench rps table).
1159 if rps.sample_count() > 0 {
1160 let r = rps.percentiles();
1161 stats.ext_metrics.insert(
1162 SCHBENCH_RPS_P20_WHOLE.to_string(),
1163 r.value_at(Pct::P20) as f64,
1164 );
1165 stats.ext_metrics.insert(
1166 SCHBENCH_RPS_P50_WHOLE.to_string(),
1167 r.value_at(Pct::P50) as f64,
1168 );
1169 stats.ext_metrics.insert(
1170 SCHBENCH_RPS_P90_WHOLE.to_string(),
1171 r.value_at(Pct::P90) as f64,
1172 );
1173 stats
1174 .ext_metrics
1175 .insert(SCHBENCH_RPS_MIN_WHOLE.to_string(), r.min as f64);
1176 stats
1177 .ext_metrics
1178 .insert(SCHBENCH_RPS_MAX_WHOLE.to_string(), r.max as f64);
1179 }
1180}
1181
1182/// Populate run-level DERIVED distributional metrics into
1183/// `stats.ext_metrics`: every registered `MetricKind::Distribution`,
1184/// `MetricKind::WorstLowest`, `MetricKind::WakeLatencyTailRatio`, and
1185/// `MetricKind::WorstCrossNodeRatio`. This is the SOLE
1186/// within-run producer of those metrics' values — they carry no per-phase
1187/// sample slice and no cross-cgroup merge fold, and their registry accessors
1188/// are `|_| None`, so `MetricDef::read` reads the value
1189/// written here from `ext_metrics`.
1190///
1191/// DISTRIBUTION (the 5 wake / run-delay aggregates): pools the RAW sample
1192/// vectors held in `stats.phases[].per_cgroup` across EVERY phase and EVERY
1193/// cgroup into one combined set, then recomputes the percentile / CV / mean
1194/// / extreme over it — the statistic of the union, NOT a max or mean of
1195/// per-cgroup reductions (the percentile of a union is not the max of
1196/// per-source percentiles). The ns→µs scale is applied ONCE here (the
1197/// carriers store raw ns, per [`PhaseCgroupStats::run_delays_ns`]). The wake
1198/// pool is population-WEIGHTED: each phase carrier's samples carry weight
1199/// `wake_sample_total / wake_latencies_ns.len()`, so a phase whose reservoir
1200/// hit the cap contributes by true population, not capped length (the
1201/// cross-PHASE de-skew) — reduced via the weighted percentile / moments.
1202/// The run-delay pool is unweighted (per-worker, never reservoir-capped, so
1203/// length IS population). Below the wake cap every weight is 1.0, so the
1204/// weighted P99 / median / mean / worst are byte-identical to the unweighted
1205/// concat; the weighted CV matches only within ~1e-9 (it sums the mean in f64
1206/// where the unweighted path sums in u64 — a weighted variance cannot keep the
1207/// u64 sum).
1208///
1209/// CARRIER-LESS FOLD (graceful degradation): a cgroup whose raw samples are
1210/// NOT in the pool — a backdrop epoch that fell on BASELINE or the
1211/// inter-step gap (no paired host bucket, so no carrier) or a cgroup whose
1212/// carrier was stripped/empty (`strip_phase_cgroup_samples`) — is NOT
1213/// dropped. Its
1214/// surviving per-cgroup [`CgroupStats`] reduction folds worst-wins (max — every
1215/// Distribution metric is `LowerBetter`, registry-gated) into the pooled value.
1216/// The CgroupStats reductions are never stripped — `stats.cgroups[]` is the
1217/// already-reduced `cgroup_stats(reports)` output, a SEPARATE reduction path
1218/// from the per-phase carriers — so a carrier-less cgroup always has a source.
1219/// When EVERY carrier is empty (a fully-stripped run) the pool is empty and the
1220/// result degenerates to the max over every cgroup's reduction — the pre-Item-7
1221/// cross-cgroup max. NOTE the value CLASS of a folded cgroup differs from a
1222/// pooled one for the P99 / Median / Mean / CV reductions: a pooled cgroup
1223/// contributes to the percentile of the union; a carrier-less cgroup
1224/// contributes its per-cgroup reduction worst-wins (a worst-cgroup proxy, not
1225/// pooled). For the `SampleReduction::Worst` reduction the two COINCIDE
1226/// (max-of-union == max-of-per-cgroup-maxes), so the carrier-less fold is exact
1227/// there, not a proxy. A second asymmetry specific to CV (from the population
1228/// weighting): the POOLED CV divides variance/mean by Σ per-sample weights (the
1229/// reconstructed population), while a carrier-less cgroup's folded CV is
1230/// [`cgroup_stats`]'s UNWEIGHTED CV (`n = all_latencies.len()`). The two
1231/// coincide below the cap (all weights 1.0) and diverge above it; the mix is
1232/// sound — a carrier-less cgroup has no per-phase weight data to
1233/// population-weight (its carrier is absent by definition), and both feed the
1234/// same LowerBetter worst-wins max. Backdrop step-phase carriers now join
1235/// the pool directly (per-epoch expansion in `collect_handles`); only the
1236/// carrier-less cases above fold worst-wins.
1237///
1238/// WORSTLOWEST (the 2 iteration efficiencies): the lowest (worst) cgroup's
1239/// efficiency, computed per-cgroup from the `stats.cgroups[]` COUNTERS via
1240/// [`CgroupStats::iterations_per_worker`] / [`CgroupStats::iterations_per_cpu_sec`]
1241/// and the None-aware lowest-wins fold (a measured `Some(0.0)` — starvation
1242/// — wins; a no-data `None` is skipped; an all-`None` cohort writes no key,
1243/// preserving absence as a missing ext entry rather than a `0.0`). The
1244/// counters survive stripping, so WorstLowest needs no fallback branch.
1245///
1246/// Runs post-merge at the eval layer beside
1247/// [`populate_run_pooled_iterations_per_cpu_sec`], AFTER the per-cgroup
1248/// carriers are folded into `stats.phases` and BEFORE the sidecar write, so
1249/// `stats.phases[].per_cgroup` is fully merged and `stats.cgroups` is the
1250/// final per-cgroup roll-up.
1251pub fn populate_run_distribution_metrics(stats: &mut ScenarioStats) {
1252 // Pool the per-phase per-cgroup raw sample vectors across every phase and
1253 // cgroup ONCE for the Distribution PRIMARY path, then sort so the
1254 // percentile reductions can index directly. `wake_latencies_ns` is
1255 // per-WAKEUP (reservoir-capped at MAX_WAKE_SAMPLES on the carrier because
1256 // it can reach 100k); `run_delays_ns` is per-WORKER (one sample/worker, not
1257 // capped), so the run-delay pool is total-workers × phases — genuinely
1258 // small. The wake pool is NOT intrinsically small: it is the union of the
1259 // per-carrier wake vectors, num_carriers × MAX_WAKE_SAMPLES worst case, so
1260 // its size is bounded by the upstream 16 MiB bulk-frame cap on the arriving
1261 // carriers (strip_phase_cgroup_samples is the overflow lever) rather than by
1262 // being tiny — no OOM risk, no cap needed here. Both are transient: reduced
1263 // to scalars here, never re-serialized.
1264 // Wake samples carry a per-sample population WEIGHT (`wake_sample_total /
1265 // reservoir len`) so a >cap phase contributes in proportion to its true
1266 // population, not its guest-capped length (the cross-PHASE de-skew). Run-delay
1267 // samples are per-worker and never reservoir-capped (no `*_sample_total`), so
1268 // their length IS their population — pooled unweighted.
1269 let mut wake_pool: Vec<(u64, f64)> = Vec::new();
1270 // Distinct timer-latency pool, population-WEIGHTED like
1271 // wake_pool (reservoir-capped, so a >cap phase carries weight
1272 // timer_sample_total/len for the cross-phase de-skew).
1273 let mut timer_pool: Vec<(u64, f64)> = Vec::new();
1274 let mut run_delay_pool: Vec<u64> = Vec::new();
1275 // Names of cgroups that contributed NON-EMPTY samples to each pool. A
1276 // cgroup absent here — a backdrop epoch that fell on BASELINE / the
1277 // inter-step gap (no paired host bucket, so no carrier) or a
1278 // stripped/empty carrier — is NOT dropped from the run-level
1279 // Distribution: the re-pool folds its surviving per-cgroup CgroupStats
1280 // reduction worst-wins (see `populate_run_distribution_metrics_from`).
1281 // Backdrop step-phase carriers now join the pool directly (per-epoch
1282 // expansion in collect_handles), so a step-matched backdrop epoch pools
1283 // rather than worst-wins-folds.
1284 //
1285 // The fallback dedup keys on cgroup NAME (a `stats.cgroups` entry whose
1286 // name is in `*_carriers` is pooled, not reduction-folded), which assumes
1287 // carrier-bearing and carrier-less cgroup names are DISJOINT. That holds
1288 // WITHIN one step's collect (cgroupfs path uniqueness — two live cgroups
1289 // cannot share a name, mkdir would EEXIST — and a single collect_handles
1290 // call attaches carriers to all its handles or none). It does NOT hold
1291 // across STEPS: `AssertResult::merge` extends `stats.cgroups` per
1292 // (handle, step), so a name that carried samples at step k recurs at step
1293 // k+1, and the step-(k+1) entry is skipped by this dedup (its name is in
1294 // `*_carriers`). That only OMITS a contribution, never vanishes the metric
1295 // (the step-k pool still produces it). A skipped step-(k+1) entry whose
1296 // carrier is merely EMPTY (collected no samples) is harmless: its per-cgroup
1297 // reduction is the trivial zero a worst-wins f64::max ignores. The only
1298 // LOSSY case is a step-(k+1) entry STRIPPED of live samples while step k
1299 // survives, and that cannot arise today: `strip_phase_cgroup_samples` strips
1300 // RUN-WIDE (every phase at once), so a run is never partially stripped per
1301 // step. A backdrop name now enters `*_carriers` (pooled once via its
1302 // per-epoch expansion) so it is skipped from the reduction-fold — and a
1303 // backdrop and a step-local cgroup cannot share a live name (cgroupfs
1304 // mkdir EEXIST; a backdrop is live the whole scenario), so each
1305 // stats.cgroups entry still contributes via exactly one of {pool,
1306 // reduction-fold} — no double count.
1307 let mut wake_carriers: std::collections::BTreeSet<&str> = std::collections::BTreeSet::new();
1308 let mut timer_carriers: std::collections::BTreeSet<&str> = std::collections::BTreeSet::new();
1309 let mut run_delay_carriers: std::collections::BTreeSet<&str> =
1310 std::collections::BTreeSet::new();
1311 for phase in &stats.phases {
1312 for (cgname, pcg) in &phase.per_cgroup {
1313 if !pcg.wake_latencies_ns.is_empty() {
1314 // Per-sample weight = true population / surviving reservoir size.
1315 // A ≤cap carrier has len == wake_sample_total → weight 1.0, so the
1316 // pool is value-for-value with the unweighted concat; a >cap
1317 // carrier's capped samples each stand for `total/len > 1` true
1318 // wakes, restoring the cross-phase population proportion.
1319 //
1320 // INVARIANT: `reservoir_push` bumps wake_sample_total on EVERY
1321 // wakeup but pushes into the reservoir only up to MAX_WAKE_SAMPLES,
1322 // and both the carrier merge and `phase_cgroup_stats` SUM the two,
1323 // so wake_sample_total >= len always (== len below the cap). A
1324 // carrier violating that — samples present but a zeroed/under-count
1325 // total — would yield weight < 1 and silently UNDER-weight (at
1326 // weight 0, DROP) its samples. Clamp the numerator to len so a
1327 // malformed carrier degrades to unit weight (reservoir treated as
1328 // its own population) instead of dropping data; debug_assert the
1329 // invariant so a real counting bug surfaces in dev.
1330 let len = pcg.wake_latencies_ns.len() as u64;
1331 debug_assert!(
1332 pcg.wake_sample_total >= len,
1333 "wake_sample_total ({}) < reservoir len ({}): malformed carrier",
1334 pcg.wake_sample_total,
1335 len,
1336 );
1337 let w = pcg.wake_sample_total.max(len) as f64 / len as f64;
1338 wake_pool.extend(pcg.wake_latencies_ns.iter().map(|&v| (v, w)));
1339 wake_carriers.insert(cgname.as_str());
1340 }
1341 if !pcg.timer_latencies_ns.is_empty() {
1342 // Population-weighted exactly like the wake pool: a >cap phase's
1343 // capped samples each stand for timer_sample_total/len true
1344 // wakes, restoring the cross-phase population proportion.
1345 let len = pcg.timer_latencies_ns.len() as u64;
1346 debug_assert!(
1347 pcg.timer_sample_total >= len,
1348 "timer_sample_total ({}) < reservoir len ({}): malformed carrier",
1349 pcg.timer_sample_total,
1350 len,
1351 );
1352 let w = pcg.timer_sample_total.max(len) as f64 / len as f64;
1353 timer_pool.extend(pcg.timer_latencies_ns.iter().map(|&v| (v, w)));
1354 timer_carriers.insert(cgname.as_str());
1355 }
1356 if !pcg.run_delays_ns.is_empty() {
1357 run_delay_pool.extend_from_slice(&pcg.run_delays_ns);
1358 run_delay_carriers.insert(cgname.as_str());
1359 }
1360 }
1361 }
1362 wake_pool.sort_unstable_by_key(|&(v, _)| v);
1363 timer_pool.sort_unstable_by_key(|&(v, _)| v);
1364 run_delay_pool.sort_unstable();
1365 populate_run_distribution_metrics_from(
1366 &mut stats.ext_metrics,
1367 crate::stats::METRICS.iter().filter_map(|m| {
1368 matches!(
1369 m.kind,
1370 crate::stats::MetricKind::Distribution { .. }
1371 | crate::stats::MetricKind::WorstLowest { .. }
1372 | crate::stats::MetricKind::WakeLatencyTailRatio
1373 )
1374 .then_some((m.name, m.kind))
1375 }),
1376 &wake_pool,
1377 &wake_carriers,
1378 &timer_pool,
1379 &timer_carriers,
1380 &run_delay_pool,
1381 &run_delay_carriers,
1382 &stats.cgroups,
1383 stats.total_iterations,
1384 );
1385 // worst_page_locality (WorstLowest{NumaLocal,NumaTotal}) re-pools from the
1386 // per-phase NUMA carriers, which the _from helper above cannot reach (it
1387 // takes only stats.cgroups, and the reports-only CgroupStats hardcodes
1388 // page_locality 0.0). Single-sourced with run_metric's WorstPageLocality via
1389 // worst_page_locality(): the lowest per-cgroup page_locality over cgroups
1390 // that measured NUMA residency (a measured 0.0 — all off-node — winning); an
1391 // all-unmeasured cohort writes no key (absence preserved as a missing ext
1392 // entry, never a 0.0 sentinel). numa_agg_per_cgroup()'s borrow ends before
1393 // the insert.
1394 if let Some(v) = stats.worst_page_locality() {
1395 stats
1396 .ext_metrics
1397 .insert("worst_page_locality".to_string(), v);
1398 }
1399 // worst_cross_node_migration_ratio (WorstCrossNodeRatio) re-pools from the same
1400 // per-phase NUMA carriers — the MAX per-cgroup churn ratio over the latest
1401 // residency total — single-sourced with run_metric's WorstCrossNodeMigrationRatio
1402 // via worst_cross_node_migration_ratio(); an all-unmeasured cohort writes no key
1403 // (absence preserved). Not handled by the _from helper above (it has only
1404 // stats.cgroups, and the metric is excluded from that filter).
1405 if let Some(v) = stats.worst_cross_node_migration_ratio() {
1406 stats
1407 .ext_metrics
1408 .insert("worst_cross_node_migration_ratio".to_string(), v);
1409 }
1410}
1411
1412/// Inner of [`populate_run_distribution_metrics`] taking the metric specs
1413/// `(name, kind)` and the pre-pooled+SORTED sample sets explicitly, so the
1414/// re-pool math is unit-testable without registered metrics (the
1415/// `derive_rate_metrics_from` precedent). `wake_pool` / `run_delay_pool` are
1416/// the cross-phase+cross-cgroup raw-ns unions (ascending); `*_carriers` name
1417/// the cgroups that contributed samples to each pool; `cgroups` supplies the
1418/// WorstLowest counters and the per-cgroup reductions that carrier-less
1419/// cgroups (backdrop / stripped) fold into the Distribution result.
1420#[allow(clippy::too_many_arguments)]
1421pub(crate) fn populate_run_distribution_metrics_from<'a>(
1422 target: &mut std::collections::BTreeMap<String, f64>,
1423 metrics: impl Iterator<Item = (&'a str, crate::stats::MetricKind)>,
1424 wake_pool: &[(u64, f64)],
1425 wake_carriers: &std::collections::BTreeSet<&str>,
1426 timer_pool: &[(u64, f64)],
1427 timer_carriers: &std::collections::BTreeSet<&str>,
1428 run_delay_pool: &[u64],
1429 run_delay_carriers: &std::collections::BTreeSet<&str>,
1430 cgroups: &[CgroupStats],
1431 run_total_iterations: u64,
1432) {
1433 use crate::stats::{MetricKind, SampleSource, WorstLowestDenominator, WorstLowestNumerator};
1434 for (name, kind) in metrics {
1435 let value: Option<f64> = match kind {
1436 MetricKind::Distribution { source, reduction } => {
1437 // Pool the carried samples (the thesis: percentile of the
1438 // UNION), then fold worst-wins (max — Distribution is
1439 // LowerBetter, registry-gated) the surviving per-cgroup
1440 // reduction of every cgroup WITHOUT a carrier-with-samples for
1441 // this source (a backdrop, or a stripped/empty carrier), so no
1442 // cgroup is dropped from the run-level distribution. When EVERY
1443 // carrier is empty (fully stripped) the pool is empty and this
1444 // degenerates to the max over every cgroup — the pre-Item-7
1445 // cross-cgroup max.
1446 //
1447 // Pool reduction is per-source: WakeLatencyNs is population-WEIGHTED
1448 // (each phase's guest-capped samples carry weight
1449 // wake_sample_total/len, so a >cap phase contributes by true
1450 // population not capped length — the cross-PHASE de-skew, via
1451 // reduce_weighted_sorted_distribution); RunDelayNs is unweighted
1452 // (per-worker, never reservoir-capped, so length IS population, via
1453 // reduce_sorted_distribution).
1454 //
1455 // CONTRACT (now uniform with WorstLowest and WakeLatencyTailRatio
1456 // below): a cohort with NO measurement for this source anywhere
1457 // (empty carrier pool AND every non-carrier cgroup not-measured)
1458 // yields ABSENCE (None), not Some(0.0). A percentile / mean over
1459 // zero samples is undefined, not a measured zero — folding its
1460 // 0.0 sentinel into the cross-run mean would, for the LowerBetter
1461 // wake/timer/run-delay metrics, falsely drag the mean toward
1462 // "perfect". The non-carrier fold below is gated on
1463 // `cg.measured_for(source)` so an unmeasured cgroup contributes
1464 // nothing; a non-carrier cgroup that DID measure (a genuine
1465 // measured zero, e.g. workers that queued for no time) still
1466 // contributes its real value. This matches WorstLowest (None when
1467 // every iterations_per_*() is None) and WakeLatencyTailRatio (None
1468 // when no cgroup has a tail) — every distributional kind now emits
1469 // None for a no-measurement run.
1470 let (mut v, carriers): (Option<f64>, &std::collections::BTreeSet<&str>) =
1471 match source {
1472 SampleSource::WakeLatencyNs => (
1473 (!wake_pool.is_empty())
1474 .then(|| reduce_weighted_sorted_distribution(wake_pool, reduction)),
1475 wake_carriers,
1476 ),
1477 SampleSource::TimerLatencyNs => (
1478 (!timer_pool.is_empty()).then(|| {
1479 reduce_weighted_sorted_distribution(timer_pool, reduction)
1480 }),
1481 timer_carriers,
1482 ),
1483 SampleSource::RunDelayNs => (
1484 (!run_delay_pool.is_empty())
1485 .then(|| reduce_sorted_distribution(run_delay_pool, reduction)),
1486 run_delay_carriers,
1487 ),
1488 };
1489 for cg in cgroups {
1490 if !carriers.contains(cg.cgroup_name.as_str()) && cg.measured_for(source) {
1491 let r = distribution_cgroup_reduction(cg, source, reduction);
1492 v = Some(v.map_or(r, |acc| acc.max(r)));
1493 }
1494 }
1495 v
1496 }
1497 // The iterations-efficiency WorstLowest selectors (numerator
1498 // Iterations) re-pool from the `stats.cgroups` counters here; the
1499 // denominator picks the per-cgroup efficiency method. The
1500 // page-locality selector (numerator NumaLocal) re-pools instead from
1501 // the per-phase NUMA carriers in populate_run_distribution_metrics
1502 // (this fn has only stats.cgroups, not stats.phases), so it is
1503 // skipped here (the NumaLocal arm below returns None).
1504 //
1505 // In a MULTI-STEP scenario `AssertResult::merge` extends
1506 // `stats.cgroups` per (handle, step), so the same cgroup name
1507 // appears once per step; this selects the lowest single
1508 // (handle, step) entry, NOT a per-name whole-run efficiency. That
1509 // preserves the deleted `fold_lowest_some` granularity exactly and
1510 // mirrors `populate_run_pooled_iterations_per_cpu_sec`, which sums
1511 // over the same per-(handle, step) entries.
1512 MetricKind::WorstLowest {
1513 numerator: WorstLowestNumerator::Iterations,
1514 denominator,
1515 } => {
1516 let mut worst: Option<f64> = None;
1517 for cg in cgroups {
1518 let per_cg = match denominator {
1519 WorstLowestDenominator::NumWorkers => cg.iterations_per_worker(),
1520 WorstLowestDenominator::CpuTimeNs => cg.iterations_per_cpu_sec(),
1521 // NumaTotal pairs only with the NumaLocal numerator
1522 // (handled in the NumaLocal arm below) — never with
1523 // Iterations.
1524 WorstLowestDenominator::NumaTotal => None,
1525 };
1526 // Lowest-wins, None-aware (the semantic the deleted
1527 // `fold_lowest_some` carried in `AssertResult::merge`): a
1528 // measured `Some(0.0)` (starvation) wins the worst bucket;
1529 // a `None` is skipped.
1530 if let Some(v) = per_cg
1531 && worst.is_none_or(|w| v < w)
1532 {
1533 worst = Some(v);
1534 }
1535 }
1536 worst
1537 }
1538 // page-locality (numerator NumaLocal): re-pooled from the per-phase
1539 // NUMA carriers in populate_run_distribution_metrics, not here.
1540 MetricKind::WorstLowest {
1541 numerator: WorstLowestNumerator::NumaLocal,
1542 ..
1543 } => None,
1544 // Worst-cgroup wake-latency tail amplification: the MAX over each
1545 // cgroup's own p99/median ratio (`CgroupStats::wake_latency_tail_ratio`).
1546 // Emit NO key below the min-iterations noise floor (low-N ratios are
1547 // single-outlier noise, not a distributional signal — gated HERE at
1548 // the producer, NOT via a meaned-iteration accessor on the
1549 // aggregated row), and none when no cgroup carried a measurable tail
1550 // (every per-cgroup ratio 0.0, i.e. no median wake latency anywhere).
1551 // Absence then stays distinct from a measured value and no
1552 // sub-threshold run enters the cross-RUN mean. `wake_latency_tail_ratio`
1553 // returns 0.0 for a cgroup with no wake samples (median <= 0), which
1554 // a max-wins fold over the r > 0.0 reals correctly skips.
1555 MetricKind::WakeLatencyTailRatio => {
1556 if run_total_iterations < crate::stats::WAKE_LATENCY_TAIL_RATIO_MIN_ITERATIONS {
1557 None
1558 } else {
1559 let mut worst: Option<f64> = None;
1560 for cg in cgroups {
1561 let r = cg.wake_latency_tail_ratio();
1562 if r > 0.0 {
1563 worst = Some(worst.map_or(r, |w| w.max(r)));
1564 }
1565 }
1566 worst
1567 }
1568 }
1569 _ => None,
1570 };
1571 // Insert only a real, FINITE value: an absent key (all-None
1572 // WorstLowest cohort, or no cgroups at all) stays distinct from a
1573 // measured 0.0, matching the None-vs-Some(0.0) contract the typed
1574 // Option carried. The is_finite guard is a no-op for every
1575 // registry-valid metric (reduce_sorted_distribution reduces non-empty
1576 // pools with CV guarded to 0.0 on zero mean; WorstLowest reuses
1577 // iterations_per_worker()/iterations_per_cpu_sec() which return None on
1578 // a zero denominator), but it MATTERS for the registry-impossible
1579 // cross-source arm of distribution_cgroup_reduction: that arm returns
1580 // NaN, and when a Distribution has no pool (every carrier stripped) the
1581 // carrier-less fold can carry that NaN to `v`. An inserted NaN would
1582 // fail the ENTIRE serde_json sidecar write (serde_json rejects
1583 // non-finite), losing ALL run telemetry — so the guard degrades a
1584 // misauthored metric to ABSENCE here rather than risking that write
1585 // failure downstream.
1586 if let Some(v) = value.filter(|v| v.is_finite()) {
1587 target.insert(name.to_string(), v);
1588 }
1589 }
1590}
1591
1592/// Reduce a NON-EMPTY ascending-sorted raw-ns sample pool to one
1593/// [`crate::stats::SampleReduction`] value, ns→µs once. Mirrors the
1594/// per-cgroup reductions [`cgroup_stats`] computes (p99 / median via
1595/// [`percentile`], CV with `n = pool.len()`, mean, max) so the run-level
1596/// re-pool reproduces them over the COMBINED cross-cgroup set — to within
1597/// FP tolerance for CV / mean, not bit-exactly: this sums over the
1598/// ASCENDING-sorted pool while `cgroup_stats` sums over the unsorted
1599/// arrival order, so the float results differ by ~1e-15 (the parity test
1600/// `repool_distribution_value_for_value_with_cgroup_stats` uses a 1e-9
1601/// bound). Same "distribution-equivalent, not byte-identical" framing as
1602/// the `wake_latencies_ns` carrier doc.
1603pub(crate) fn reduce_sorted_distribution(
1604 sorted: &[u64],
1605 reduction: crate::stats::SampleReduction,
1606) -> f64 {
1607 use crate::stats::SampleReduction;
1608 match reduction {
1609 SampleReduction::P99 => percentile(sorted, 0.99) as f64 / 1000.0,
1610 SampleReduction::P999 => percentile(sorted, 0.999) as f64 / 1000.0,
1611 SampleReduction::Median => percentile(sorted, 0.5) as f64 / 1000.0,
1612 SampleReduction::Cv => {
1613 let n = sorted.len() as f64;
1614 let mean_ns = sorted.iter().sum::<u64>() as f64 / n;
1615 if mean_ns > 0.0 {
1616 let variance = sorted
1617 .iter()
1618 .map(|&v| (v as f64 - mean_ns).powi(2))
1619 .sum::<f64>()
1620 / n;
1621 variance.sqrt() / mean_ns
1622 } else {
1623 0.0
1624 }
1625 }
1626 // Divide ONCE on the summed/maxed ns (the carriers store raw ns):
1627 // mean(ns)/1000 == mean(ns/1000) and max(ns)/1000 == max(ns/1000).
1628 // Sum in f64 (not u64-then-cast) to match cgroup_stats's f64 run-delay
1629 // accumulation and PhaseCgroupStats::run_delay_summary — overflow-safe
1630 // (an f64 sum saturates toward +inf; a u64 sum would panic in debug /
1631 // silently wrap in release on a pathological pool), value identical
1632 // within the 1e-9 parity bound. (The Cv arm's mean_ns above keeps the u64 sum
1633 // because cgroup_stats's CV also u64-sums all_latencies — matching it is
1634 // exact-parity-preserving there.)
1635 SampleReduction::Mean => {
1636 sorted.iter().map(|&v| v as f64).sum::<f64>() / sorted.len() as f64 / 1000.0
1637 }
1638 // Sorted ascending, so the last element is the max.
1639 SampleReduction::Worst => *sorted.last().expect("non-empty by caller") as f64 / 1000.0,
1640 }
1641}
1642
/// Weighted nearest-rank percentile of a value-sorted `(value, weight)` pool:
/// the weighted sibling of [`percentile`].
///
/// Returns the first value whose cumulative weight reaches the 1-indexed rank
/// `ceil(W * p)`, where `W` is the total weight. The rank is floored at 1, so
/// `p == 0.0` selects the first element. If the rank exceeds the final
/// cumulative weight (possible for a non-integer `W`), the largest value is
/// returned. An empty pool yields 0.
///
/// With UNIT weights this is byte-identical to `percentile`:
/// - the cumulative weight after `k` elements is exactly `k`;
/// - `ceil(W * p) == ceil(n * p)`, so the result is `sorted[ceil(n*p) - 1]`;
/// - at `p == 0` it is `sorted[0]`, where the 1.0 floor mirrors
///   `percentile`'s `saturating_sub(1)`.
///
/// Used by the run-level wake / timer re-pool to weight each phase's samples
/// by true population.
pub(crate) fn weighted_percentile(sorted: &[(u64, f64)], p: f64) -> u64 {
    let Some(&(largest, _)) = sorted.last() else {
        return 0;
    };
    debug_assert!(
        sorted.windows(2).all(|w| w[0].0 <= w[1].0),
        "weighted_percentile() requires value-sorted input",
    );
    let total_weight: f64 = sorted.iter().map(|&(_, w)| w).sum();
    let rank = (total_weight * p).ceil().max(1.0);
    sorted
        .iter()
        .scan(0.0_f64, |running, &(value, weight)| {
            *running += weight;
            Some((value, *running))
        })
        .find(|&(_, running)| running >= rank)
        .map_or(largest, |(value, _)| value)
}
1673
1674/// Weighted sibling of [`reduce_sorted_distribution`] for the wake-latency
1675/// re-pool: each `(value, weight)` carries a per-sample weight of
1676/// `wake_sample_total / reservoir_len`, so a >cap phase (reservoir-capped on the
1677/// guest) contributes in proportion to its TRUE population, not its capped
1678/// length — removing the cross-PHASE length-skew. With UNIT weights (every phase
1679/// ≤cap, so `len == wake_sample_total`) it reduces byte-identically to
1680/// [`reduce_sorted_distribution`] for P99 / Median / Mean / Worst; the Cv arm
1681/// differs only by the f64-vs-u64 mean sum. For the small fixed pool the parity
1682/// test uses, that gap is ~1e-15 (within its 1e-9 bound), but it grows ~n·ε with
1683/// pool size — a cross-phase pool can reach millions of samples (~1e-9–1e-8 on a
1684/// high-CV pool), so a LARGE-pool parity test must not assume a universal 1e-15.
1685/// A weighted variance cannot keep the u64 sum. Exhaustive over SampleReduction,
1686/// mirroring [`reduce_sorted_distribution`], so a new variant fails the build.
1687///
1688/// The Cv / Mean `total_w <= 0.0` guards and [`weighted_percentile`]'s
1689/// all-weight-zero fall-through are degenerate-input belts: the capture-path
1690/// caller [`populate_run_distribution_metrics`] clamps every per-sample weight to
1691/// a floor of 1.0, so `total_w >= len >= 1` there and those branches are
1692/// unreachable on the production path.
1693pub(crate) fn reduce_weighted_sorted_distribution(
1694 sorted: &[(u64, f64)],
1695 reduction: crate::stats::SampleReduction,
1696) -> f64 {
1697 use crate::stats::SampleReduction;
1698 match reduction {
1699 SampleReduction::P99 => weighted_percentile(sorted, 0.99) as f64 / 1000.0,
1700 SampleReduction::P999 => weighted_percentile(sorted, 0.999) as f64 / 1000.0,
1701 SampleReduction::Median => weighted_percentile(sorted, 0.5) as f64 / 1000.0,
1702 SampleReduction::Cv => {
1703 let total_w: f64 = sorted.iter().map(|&(_, w)| w).sum();
1704 if total_w <= 0.0 {
1705 return 0.0;
1706 }
1707 let mean_ns = sorted.iter().map(|&(v, w)| v as f64 * w).sum::<f64>() / total_w;
1708 if mean_ns > 0.0 {
1709 let variance = sorted
1710 .iter()
1711 .map(|&(v, w)| w * (v as f64 - mean_ns).powi(2))
1712 .sum::<f64>()
1713 / total_w;
1714 variance.sqrt() / mean_ns
1715 } else {
1716 0.0
1717 }
1718 }
1719 SampleReduction::Mean => {
1720 let total_w: f64 = sorted.iter().map(|&(_, w)| w).sum();
1721 if total_w <= 0.0 {
1722 return 0.0;
1723 }
1724 sorted.iter().map(|&(v, w)| v as f64 * w).sum::<f64>() / total_w / 1000.0
1725 }
1726 // Max value present, weight-invariant — last element of the value-sorted pool.
1727 SampleReduction::Worst => sorted.last().map(|&(v, _)| v).unwrap_or(0) as f64 / 1000.0,
1728 }
1729}
1730
1731/// One cgroup's surviving [`CgroupStats`] reduction for a
1732/// [`crate::stats::MetricKind::Distribution`] (source, reduction) pair — the
1733/// value folded worst-wins into the run-level distribution for a cgroup whose
1734/// raw samples are NOT in the pool (a backdrop, or a stripped/empty carrier).
1735/// Worst-wins is `f64::max` (every Distribution metric is `LowerBetter`,
1736/// enforced by `every_metric_has_kind_consistent_with_naming`).
1737///
1738/// Per-source match, EXHAUSTIVE over SampleReduction (no `_` catch-all,
1739/// mirroring reduce_sorted_distribution) so a new SampleSource or
1740/// SampleReduction variant fails the build until a reduction field is wired.
1741/// The cross-source reductions (a wake source asking for a run-delay reduction,
1742/// or vice versa) are registry-impossible (no CgroupStats field exists), so
1743/// they debug_assert in tests and, in release, return `f64::NAN` rather than
1744/// 0.0 — NaN is IGNORED by the caller's `f64::max` worst-wins fold, and if it
1745/// still reaches `populate_run_distribution_metrics`'s insert (a pool-less
1746/// Distribution whose every carrier-less cgroup hits this arm) the is_finite
1747/// insert guard drops it to absence. Either way a registry-authoring mistake
1748/// drops the bogus contribution instead of folding a 0.0 that a LowerBetter
1749/// metric would read as "perfect".
1750fn distribution_cgroup_reduction(
1751 cg: &CgroupStats,
1752 source: crate::stats::SampleSource,
1753 reduction: crate::stats::SampleReduction,
1754) -> f64 {
1755 use crate::stats::{SampleReduction, SampleSource};
1756 match source {
1757 SampleSource::WakeLatencyNs => match reduction {
1758 SampleReduction::P99 => cg.p99_wake_latency_us,
1759 SampleReduction::Median => cg.median_wake_latency_us,
1760 SampleReduction::Cv => cg.wake_latency_cv,
1761 SampleReduction::P999 | SampleReduction::Mean | SampleReduction::Worst => {
1762 debug_assert!(false, "no CgroupStats wake reduction for {reduction:?}");
1763 f64::NAN
1764 }
1765 },
1766 SampleSource::RunDelayNs => match reduction {
1767 SampleReduction::Mean => cg.mean_run_delay_us,
1768 SampleReduction::Worst => cg.worst_run_delay_us,
1769 SampleReduction::P99
1770 | SampleReduction::P999
1771 | SampleReduction::Median
1772 | SampleReduction::Cv => {
1773 debug_assert!(
1774 false,
1775 "no CgroupStats run-delay reduction for {reduction:?}"
1776 );
1777 f64::NAN
1778 }
1779 },
1780 SampleSource::TimerLatencyNs => match reduction {
1781 SampleReduction::Median => cg.median_timer_latency_us,
1782 SampleReduction::P99 => cg.p99_timer_latency_us,
1783 SampleReduction::P999 => cg.p999_timer_latency_us,
1784 SampleReduction::Worst => cg.worst_timer_latency_us,
1785 SampleReduction::Cv | SampleReduction::Mean => {
1786 debug_assert!(false, "no CgroupStats timer reduction for {reduction:?}");
1787 f64::NAN
1788 }
1789 },
1790 }
1791}
1792
1793/// Populate cross-RUN aggregate entries for every registered
1794/// `crate::stats::MetricDef` whose `read_sample` returns finite
1795/// values across the entire sample series. Writes into
1796/// `target` (typically `ScenarioStats::ext_metrics`) under the
1797/// metric's registry name — the same key the per-phase
1798/// [`PhaseBucket::metrics`] uses, so cross-RUN and per-phase
1799/// consumers reference the same name.
1800///
1801/// Existing keys are NOT overwritten — a typed GauntletRow field's
1802/// value (populated via the MetricDef accessor at sidecar-write
1803/// time) wins on the read path, and this fn fills the gap for
1804/// registered metrics that have a `read_sample` wire but no typed
1805/// GauntletRow field. Without this fill, `cargo ktstr perf-delta`
1806/// silently skips the metric (read returns None on both sides, so the
1807/// `(None, None)` arm drops the pair).
1808///
1809/// Per-phase reduction dispatch is described on [`PhaseBucket`];
1810/// the cross-RUN fold here uses `crate::stats::aggregate_samples_for_phase`
1811/// over the full sample series, with TYPED_FIELD_NAMES gating to
1812/// avoid duplicating typed-accessor sources.
1813pub fn populate_run_ext_metrics(
1814 samples: &crate::scenario::sample::SampleSeries,
1815 target: &mut std::collections::BTreeMap<String, f64>,
1816) {
1817 // Typed-backed keys are skipped via the module-level TYPED_FIELD_NAMES
1818 // (shared with populate_run_ext_metrics_from_phases) so only
1819 // ext-metrics-only registry entries are populated here.
1820 for metric_def in crate::stats::METRICS {
1821 if target.contains_key(metric_def.name) {
1822 continue;
1823 }
1824 if TYPED_FIELD_NAMES.contains(&metric_def.name) {
1825 continue;
1826 }
1827 let readings: Vec<f64> = samples
1828 .iter_samples()
1829 .filter_map(|s| metric_def.read_sample(&s))
1830 .collect();
1831 if readings.is_empty() {
1832 continue;
1833 }
1834 if let Some(reduced) = crate::stats::aggregate_samples_for_phase(metric_def, &readings) {
1835 target.insert(metric_def.name.to_string(), reduced);
1836 }
1837 }
1838 // Run-level capture-window wall for the IRQ rates: the elapsed span of the
1839 // per_cpu_time-bearing freezes — the SAME freezes the IRQ-counter numerators
1840 // above were read_sampled from (whole-run last-minus-first via
1841 // phase_counter_delta). Inserting it HERE (the direct/whole-run path) makes
1842 // the run-level rate's numerator AND denominator share the whole-run span.
1843 // Without it, derive_rate_metrics below finds the numerator but no
1844 // denominator (total_phase_wall_sec has no read_sample arm), and the rate
1845 // would instead be derived by populate_run_ext_metrics_from_phases over a
1846 // Σ-per-phase-capture denominator — a NARROWER time base than this whole-run
1847 // numerator (it excludes the inter-phase / cross-capture gaps the numerator
1848 // counts), inflating the rate on MULTI-phase runs. Run-level analog of the
1849 // per-phase assert::phase_build. Gated on total_hardirqs
1850 // (irqtime-independent), so the count-rates derive even when
1851 // CONFIG_IRQ_TIME_ACCOUNTING is off and total_irq_time_ns is absent. Min/max
1852 // over the elapsed-bearing freezes: chronological periodic samples put the
1853 // numerator's first/last reading at min/max elapsed, so num and den span the
1854 // identical interval.
1855 if target.contains_key("total_hardirqs") {
1856 let irq_elapsed_ms: Vec<u64> = samples
1857 .iter_samples()
1858 .filter(|s| !s.snapshot.per_cpu_time().is_empty())
1859 .filter_map(|s| s.elapsed_ms)
1860 .collect();
1861 if let (Some(first), Some(last)) = (
1862 irq_elapsed_ms.iter().min().copied(),
1863 irq_elapsed_ms.iter().max().copied(),
1864 ) && last > first
1865 {
1866 let wall_ms = (last - first) as f64;
1867 target
1868 .entry("total_phase_wall_ns".to_string())
1869 .or_insert(wall_ms * 1_000_000.0);
1870 target
1871 .entry("total_phase_wall_sec".to_string())
1872 .or_insert(wall_ms / 1000.0);
1873 }
1874 }
1875 // Re-derive Rate metrics from the read_sample components just folded
1876 // in. populate_run_ext_metrics is pub and called standalone (tests,
1877 // and not only ahead of populate_run_ext_metrics_from_phases), so it
1878 // derives its own rates to stay self-contained.
1879 crate::stats::derive_rate_metrics(target);
1880}
1881
1882/// Derive the per-phase scalar metrics ([`crate::stats::MetricKind::PerPhase`])
1883/// for every cgroup carrier in each phase. Two families:
1884///
1885/// 1. NON-schbench carrier scalars (via [`write_carrier_scalars`], for EVERY
1886/// cgroup regardless of work type): the wake/run-delay/off-cpu distributions
1887/// + the migration/iterations/locality ratios + the carrier counters,
1888/// written ONLY into each carrier's `PhaseCgroupStats::metrics` (per-cgroup,
1889/// read via `phase_cgroup_metric`) — NO pooled [`PhaseBucket::metrics`]
1890/// entry; their run-level aggregate is the `worst_*` ext-metrics key.
1891/// 2. schbench scalars (via [`write_schbench_scalars`], only for cgroups
1892/// carrying a `SchbenchPhaseStats`): written per-cgroup into
1893/// `PhaseCgroupStats::metrics` AND, pooled across the phase's schbench
1894/// carriers, into [`PhaseBucket::metrics`] (read via `phase_metric`).
1895/// Percentiles MUST come from the POOLED histogram (combine the latency
1896/// histograms + integer-add the run-delay raw pairs), never an average of
1897/// per-cgroup percentiles; the run-delay means are ABSENT when `pcount == 0`
1898/// so a never-scheduled class is not a false `0`.
1899///
1900/// MUST run POST-merge: the buckets are final and keyed by `step_index`, so a
1901/// per-phase A/B claim reads the value via `phase_metric` /
1902/// `phase_cgroup_metric`. It runs after the per-cgroup carriers are folded
1903/// ([`fold_guest_per_cgroup_into_host_buckets`]) and after any
1904/// [`merge_matched_phase_buckets`] — both SKIP is_derived keys (the `continue`
1905/// in the merge loop), so deriving earlier would DROP the keys. Idempotent
1906/// (overwrites the keys), so callers that rebuild buckets per call (e.g.
1907/// `VmResult::phase_buckets`) may re-run it freely.
1908// doc_lazy_continuation: pre-existing numbered-list wording surfaced by the clippy
1909// 1.94 bump; renders fine. Suppress rather than reflow the prose.
1910#[allow(clippy::doc_lazy_continuation)]
1911pub(crate) fn derive_phase_metrics(phases: &mut [PhaseBucket]) {
1912 use crate::workload::schbench::run::SchbenchPhaseStats;
1913
1914 for bucket in phases.iter_mut() {
1915 // Derive the schbench scalars BOTH per cgroup (into pc.metrics — the
1916 // per-cgroup set: N cgroups -> N queryable sets) AND pooled across the
1917 // phase's cgroups (into bucket.metrics — the aggregate). Both go through the
1918 // SAME reducer (write_schbench_scalars) from the SAME carriers, so the
1919 // pooled set is the cross-cgroup re-pool of the per-cgroup carriers (a
1920 // percentile re-derives from the pooled histogram via PlatStats::combine =
1921 // bucket-count add, never averaged), and pooled == cross-cgroup re-pool.
1922 let mut pooled: Option<SchbenchPhaseStats> = None;
1923 let mut pooled_taobench: Option<crate::workload::taobench::run::TaobenchPhaseStats> = None;
1924 for pc in bucket.per_cgroup.values_mut() {
1925 // Non-schbench carrier-derived metrics (wake/run-delay/off-cpu
1926 // distributions + the migration/iterations/locality ratios + the
1927 // carrier counters), emitted for EVERY cgroup regardless of work
1928 // type. Runs BEFORE the schbench block: `write_carrier_scalars`
1929 // takes `&mut pc` (its summaries read many `pc` fields, then it
1930 // writes `pc.metrics`), so it cannot share the iteration with the
1931 // disjoint-field borrow the schbench block relies on.
1932 write_carrier_scalars(pc);
1933 // Disjoint field borrows: `s` reads pc.schbench, the reducer writes
1934 // pc.metrics — different fields, so the immutable + mutable borrows of
1935 // `*pc` coexist.
1936 if let Some(s) = pc.schbench.as_ref() {
1937 write_schbench_scalars(s, &mut pc.metrics);
1938 // `take()` completes the borrow before the reassignment in both arms.
1939 match pooled.take() {
1940 Some(mut acc) => {
1941 acc.merge(s);
1942 pooled = Some(acc);
1943 }
1944 None => pooled = Some(s.clone()),
1945 }
1946 }
1947 // taobench per-phase: same disjoint-field-borrow pattern (`t` reads
1948 // pc.taobench, the reducer writes pc.metrics) + pool across cgroups.
1949 if let Some(t) = pc.taobench.as_ref() {
1950 write_taobench_scalars(t, &mut pc.metrics);
1951 match pooled_taobench.take() {
1952 Some(mut acc) => {
1953 acc.merge(t);
1954 pooled_taobench = Some(acc);
1955 }
1956 None => pooled_taobench = Some(t.clone()),
1957 }
1958 }
1959 }
1960 if let Some(p) = pooled {
1961 write_schbench_scalars(&p, &mut bucket.metrics);
1962 }
1963 if let Some(t) = pooled_taobench {
1964 write_taobench_scalars(&t, &mut bucket.metrics);
1965 }
1966 }
1967}
1968
1969/// Write the per-phase per-cgroup NON-schbench DERIVED scalars from ONE
1970/// carrier into `pc.metrics`, keyed by registry [`crate::stats::MetricDef`]
1971/// name — the per-cgroup analog of the run-level reductions, single-sourced
1972/// through the shared ratio helpers in [`crate::assert::reductions`] (so the
1973/// per-cgroup value and the run-level `worst_*` fold agree by construction)
1974/// and the carrier summary methods. Each gate mirrors the carrier's ABSENT
1975/// discipline: an empty wake pool, a not-measured off-CPU state, a
1976/// zero-worker cgroup, or a no-NUMA-pages cgroup emits no key (reads as
1977/// missing, never a false 0); `migration_ratio` is ALWAYS present (a measured
1978/// 0.0 when no iterations ran).
1979///
1980/// Unlike the schbench families, these families have NO pooled
1981/// `PhaseBucket::metrics` entry at all: as PerPhase / is_derived metrics whose
1982/// `read_sample` is `None`, the per-phase fold never writes them to
1983/// `bucket.metrics`. Their run-level aggregate is the `worst_*` key
1984/// `populate_run_distribution_metrics` writes into the run-level
1985/// `ScenarioStats::ext_metrics` map; per-phase they are queryable ONLY per-cgroup
1986/// via `pc.metrics`. So this writes ONLY `pc.metrics`, never `bucket.metrics` —
1987/// the per-cgroup carrier value is authoritative for the per-cgroup question.
1988///
1989/// Takes `&mut PhaseCgroupStats` (not `(&pc, &mut pc.metrics)`): it reads many
1990/// `pc` fields AND writes `pc.metrics`, so the summaries/ratios are bound to
1991/// locals from `&*pc` first, then the `&mut pc.metrics` borrow is taken once.
1992fn write_carrier_scalars(pc: &mut PhaseCgroupStats) {
1993 use crate::assert::reductions::{
1994 cross_node_migration_ratio_of, iterations_per_cpu_sec_of, iterations_per_worker_of,
1995 migration_ratio_of, page_locality_of,
1996 };
1997 // Read everything from &*pc into locals BEFORE the &mut pc.metrics borrow.
1998 let wake = pc.wake_summary().zip(pc.wake_cv());
1999 let timer = pc.timer_summary();
2000 let run_delay = pc.run_delay_summary();
2001 let off_cpu = pc.off_cpu_summary();
2002 let migration_ratio = migration_ratio_of(pc.total_migrations, pc.total_iterations);
2003 let ipw = iterations_per_worker_of(pc.num_workers, pc.total_iterations);
2004 let ipcs = iterations_per_cpu_sec_of(pc.num_workers, pc.total_cpu_time_ns, pc.total_iterations);
2005 let numa = if pc.numa_pages_total > 0 {
2006 Some((
2007 page_locality_of(pc.numa_pages_local, pc.numa_pages_total),
2008 cross_node_migration_ratio_of(pc.cross_node_migrated, pc.numa_pages_total),
2009 ))
2010 } else {
2011 None
2012 };
2013
2014 let m = &mut pc.metrics;
2015 // WAKE — all three or none (wake_summary + wake_cv share the empty-pool gate).
2016 if let Some(((p99, median), cv)) = wake {
2017 m.insert("p99_wake_latency_us".to_string(), p99);
2018 m.insert("median_wake_latency_us".to_string(), median);
2019 m.insert("wake_latency_cv".to_string(), cv);
2020 }
2021 // TIMER — all three or none (timer_summary shares the empty-pool gate).
2022 if let Some((median, p99, p999)) = timer {
2023 m.insert("median_timer_latency_us".to_string(), median);
2024 m.insert("p99_timer_latency_us".to_string(), p99);
2025 m.insert("p999_timer_latency_us".to_string(), p999);
2026 }
2027 // RUN-DELAY — both or none.
2028 if let Some((mean, worst)) = run_delay {
2029 m.insert("mean_run_delay_us".to_string(), mean);
2030 m.insert("max_run_delay_us".to_string(), worst);
2031 }
2032 // OFF-CPU — four or none (None == not-measured).
2033 if let Some((avg, min, max, spread)) = off_cpu {
2034 m.insert("avg_off_cpu_pct".to_string(), avg);
2035 m.insert("min_off_cpu_pct".to_string(), min);
2036 m.insert("max_off_cpu_pct".to_string(), max);
2037 m.insert("off_cpu_spread_pct".to_string(), spread);
2038 }
2039 // RATIOS — migration_ratio ALWAYS (measured 0.0 when no iterations ran).
2040 m.insert("migration_ratio".to_string(), migration_ratio);
2041 if let Some(v) = ipw {
2042 m.insert("iterations_per_worker".to_string(), v);
2043 }
2044 if let Some(v) = ipcs {
2045 m.insert("iterations_per_cpu_sec".to_string(), v);
2046 }
2047 // NUMA ratios only when pages were observed (mirrors the carrier's
2048 // numa_pages_total>0 gate; absent is more honest than a 0.0 default).
2049 if let Some((locality, cross_node)) = numa {
2050 m.insert("page_locality".to_string(), locality);
2051 m.insert("cross_node_migration_ratio".to_string(), cross_node);
2052 }
2053}
2054
2055/// Write the per-phase schbench scalar metrics derived from ONE
2056/// `SchbenchPhaseStats` (a single cgroup's carrier, or the cross-cgroup pool)
2057/// into `out`, keyed by registry [`crate::stats::MetricDef`] name. The sole
2058/// producer of these keys for both the per-cgroup ([`PhaseCgroupStats::metrics`])
2059/// and pooled ([`PhaseBucket::metrics`]) maps — one derivation, no duplicated
2060/// percentile / min/max / rps / sched-delay math. Each gate mirrors the carrier's
2061/// ABSENT discipline: an empty histogram or a zero pcount emits no key (reads as
2062/// missing, never a false 0); loop_count is always present (0 = a real measured
2063/// no-cycles outcome).
2064fn write_schbench_scalars(
2065 p: &crate::workload::schbench::run::SchbenchPhaseStats,
2066 out: &mut std::collections::BTreeMap<String, f64>,
2067) {
2068 use crate::stats::{
2069 SCHBENCH_LOOP_COUNT, SCHBENCH_REQUEST_MAX_US, SCHBENCH_REQUEST_MIN_US,
2070 SCHBENCH_REQUEST_P50_US, SCHBENCH_REQUEST_P90_US, SCHBENCH_REQUEST_P99_US,
2071 SCHBENCH_REQUEST_P999_US, SCHBENCH_RPS_MAX, SCHBENCH_RPS_MIN, SCHBENCH_RPS_P20,
2072 SCHBENCH_RPS_P50, SCHBENCH_RPS_P90, SCHBENCH_SCHED_DELAY_MSG_US,
2073 SCHBENCH_SCHED_DELAY_WORKER_US, SCHBENCH_WAKEUP_MAX_US, SCHBENCH_WAKEUP_MIN_US,
2074 SCHBENCH_WAKEUP_P50_US, SCHBENCH_WAKEUP_P90_US, SCHBENCH_WAKEUP_P99_US,
2075 SCHBENCH_WAKEUP_P999_US,
2076 };
2077 use crate::workload::schbench::plat::Pct;
2078
2079 if p.wakeup.sample_count() > 0 {
2080 let q = p.wakeup.percentiles();
2081 out.insert(
2082 SCHBENCH_WAKEUP_P50_US.to_string(),
2083 q.value_at(Pct::P50) as f64,
2084 );
2085 out.insert(
2086 SCHBENCH_WAKEUP_P90_US.to_string(),
2087 q.value_at(Pct::P90) as f64,
2088 );
2089 out.insert(
2090 SCHBENCH_WAKEUP_P99_US.to_string(),
2091 q.value_at(Pct::P99) as f64,
2092 );
2093 out.insert(
2094 SCHBENCH_WAKEUP_P999_US.to_string(),
2095 q.value_at(Pct::P999) as f64,
2096 );
2097 out.insert(SCHBENCH_WAKEUP_MIN_US.to_string(), q.min as f64);
2098 out.insert(SCHBENCH_WAKEUP_MAX_US.to_string(), q.max as f64);
2099 }
2100 if p.request.sample_count() > 0 {
2101 let q = p.request.percentiles();
2102 out.insert(
2103 SCHBENCH_REQUEST_P50_US.to_string(),
2104 q.value_at(Pct::P50) as f64,
2105 );
2106 out.insert(
2107 SCHBENCH_REQUEST_P90_US.to_string(),
2108 q.value_at(Pct::P90) as f64,
2109 );
2110 out.insert(
2111 SCHBENCH_REQUEST_P99_US.to_string(),
2112 q.value_at(Pct::P99) as f64,
2113 );
2114 out.insert(
2115 SCHBENCH_REQUEST_P999_US.to_string(),
2116 q.value_at(Pct::P999) as f64,
2117 );
2118 out.insert(SCHBENCH_REQUEST_MIN_US.to_string(), q.min as f64);
2119 out.insert(SCHBENCH_REQUEST_MAX_US.to_string(), q.max as f64);
2120 }
2121 // Per-phase achieved-RPS distribution (the control thread's per-second samples
2122 // attributed to this epoch). Gated on sample_count()>0 so a phase shorter than
2123 // the ~1s control cadence reads ABSENT, never rps=0. schbench's RPS table is
2124 // PLIST_FOR_RPS = 20/50/90 (schbench.c:130) + min/max.
2125 if p.rps.sample_count() > 0 {
2126 let r = p.rps.percentiles();
2127 out.insert(SCHBENCH_RPS_P20.to_string(), r.value_at(Pct::P20) as f64);
2128 out.insert(SCHBENCH_RPS_P50.to_string(), r.value_at(Pct::P50) as f64);
2129 out.insert(SCHBENCH_RPS_P90.to_string(), r.value_at(Pct::P90) as f64);
2130 out.insert(SCHBENCH_RPS_MIN.to_string(), r.min as f64);
2131 out.insert(SCHBENCH_RPS_MAX.to_string(), r.max as f64);
2132 }
2133 // Sample-weighted run-delay mean (Σrun_delay / Σpcount), ns→µs; ABSENT when
2134 // pcount==0 (a never-scheduled class) so it reads as missing, not 0.
2135 if p.msg_pcount > 0 {
2136 let mean_us = p.msg_run_delay_ns as f64 / p.msg_pcount as f64 / 1000.0;
2137 out.insert(SCHBENCH_SCHED_DELAY_MSG_US.to_string(), mean_us);
2138 }
2139 if p.worker_pcount > 0 {
2140 let mean_us = p.worker_run_delay_ns as f64 / p.worker_pcount as f64 / 1000.0;
2141 out.insert(SCHBENCH_SCHED_DELAY_WORKER_US.to_string(), mean_us);
2142 }
2143 // loop_count is always present for a schbench carrier: 0 = no cycles ran (a real
2144 // measured value; HigherBetter → worst), distinct from a non-schbench carrier
2145 // which has no schbench data at all (the caller skips it).
2146 out.insert(SCHBENCH_LOOP_COUNT.to_string(), p.loop_count as f64);
2147}
2148
2149/// Write the per-phase taobench scalar metrics derived from ONE
2150/// `TaobenchStats` (a single cgroup's carrier, or the cross-cgroup pool)
2151/// into `out`, keyed by registry [`crate::stats::MetricDef`] name. The sole
2152/// producer of these keys for both the per-cgroup ([`PhaseCgroupStats::metrics`])
2153/// and pooled ([`PhaseBucket::metrics`]) maps. ABSENT discipline: the qps keys
2154/// only when the wall window was measured (`elapsed_ns > 0`); hit_ratio only when
2155/// ops completed (`total > 0`); hit_rate only when lookups were issued
2156/// (`get_cmds > 0`) — a not-measured value reads as missing, never a false 0.
2157fn write_taobench_scalars(
2158 p: &crate::workload::taobench::run::TaobenchPhaseStats,
2159 out: &mut std::collections::BTreeMap<String, f64>,
2160) {
2161 use crate::stats::{
2162 TAOBENCH_FAST_QPS, TAOBENCH_HIT_RATE, TAOBENCH_HIT_RATIO, TAOBENCH_SLOW_QPS,
2163 TAOBENCH_TOTAL_QPS,
2164 };
2165 let c = &p.counters;
2166 let total = c.total_ops();
2167 if c.elapsed_ns > 0 {
2168 let secs = c.elapsed_ns as f64 / 1e9;
2169 out.insert(TAOBENCH_TOTAL_QPS.to_string(), total as f64 / secs);
2170 out.insert(TAOBENCH_FAST_QPS.to_string(), c.fast_ops as f64 / secs);
2171 out.insert(TAOBENCH_SLOW_QPS.to_string(), c.slow_ops as f64 / secs);
2172 }
2173 if total > 0 {
2174 out.insert(
2175 TAOBENCH_HIT_RATIO.to_string(),
2176 c.fast_ops as f64 / total as f64,
2177 );
2178 }
2179 if c.get_cmds > 0 {
2180 out.insert(
2181 TAOBENCH_HIT_RATE.to_string(),
2182 1.0 - (c.get_misses as f64 / c.get_cmds as f64),
2183 );
2184 }
2185 // Per-phase serve-latency percentiles (open-loop only): the µs distribution
2186 // pooled across this phase's cgroups, re-derived over the union histogram.
2187 // Absent when no serve samples (closed loop, or a stream with no completions).
2188 write_taobench_serve_scalars(&p.serve_lat, out);
2189}
2190
2191/// Write the per-phase taobench serve-latency percentile scalars from a pooled
2192/// `PlatStats` into `out` (registry keys), gated on `sample_count() > 0` so a
2193/// closed-loop / no-sample carrier emits nothing (absent, never a false 0).
2194fn write_taobench_serve_scalars(
2195 serve: &crate::workload::schbench::plat::PlatStats,
2196 out: &mut std::collections::BTreeMap<String, f64>,
2197) {
2198 use crate::stats::{
2199 TAOBENCH_SERVE_MAX_US, TAOBENCH_SERVE_MIN_US, TAOBENCH_SERVE_P50_US, TAOBENCH_SERVE_P90_US,
2200 TAOBENCH_SERVE_P99_US, TAOBENCH_SERVE_P999_US,
2201 };
2202 use crate::workload::schbench::plat::Pct;
2203 if serve.sample_count() == 0 {
2204 return;
2205 }
2206 let q = serve.percentiles();
2207 out.insert(
2208 TAOBENCH_SERVE_P50_US.to_string(),
2209 q.value_at(Pct::P50) as f64,
2210 );
2211 out.insert(
2212 TAOBENCH_SERVE_P90_US.to_string(),
2213 q.value_at(Pct::P90) as f64,
2214 );
2215 out.insert(
2216 TAOBENCH_SERVE_P99_US.to_string(),
2217 q.value_at(Pct::P99) as f64,
2218 );
2219 out.insert(
2220 TAOBENCH_SERVE_P999_US.to_string(),
2221 q.value_at(Pct::P999) as f64,
2222 );
2223 out.insert(TAOBENCH_SERVE_MIN_US.to_string(), q.min as f64);
2224 out.insert(TAOBENCH_SERVE_MAX_US.to_string(), q.max as f64);
2225}