ktstr/assert/phase_build.rs
1use super::*;
2
3/// Phase buckets attributed against the guest stimulus timeline, then
4/// enriched with stimulus-event-derived per-phase `iteration_rate`.
5///
6/// Unlike the plain [`build_phase_buckets`] (which groups by the
7/// bridge-stamped step_index), this re-groups each periodic capture by
8/// the guest step whose stimulus window contains the capture's
9/// workload-relative boundary offset (`Sample::boundary_offset_ms`).
10/// That offset is derived from the boundary schedule rather than the
11/// fire time, so it is immune to the deferred-fire burst that makes
12/// every capture stamp the same late CURRENT_STEP (the
13/// `phases.len() == 1` collapse). Captures with no offset (on-demand /
14/// fixture) fall back to their stamped step_index. Because the bucket
15/// windows are then workload-relative, the run-relative monitor samples
16/// are shifted by the stimulus/monitor clock skew before windowing.
17///
18/// Additionally synthesizes a capture-free `PhaseBucket`
19/// (`sample_count == 0`) for any stimulus `StepStart`-step that
20/// captured no periodic samples — the uniform whole-workload boundary
21/// placement (`compute_periodic_boundaries_ns`) is step-agnostic, so a
22/// short interior step can land zero captures and otherwise leave no
23/// bucket, silently dropping its capture-independent `iteration_rate`.
24/// The synthesized bucket carries the step's full stimulus window so its
25/// `iteration_rate` (from `StepStart`/`StepEnd` deltas) and
26/// `avg_imbalance_ratio` (from in-window monitor samples) are still
27/// recovered. The returned vec therefore holds one bucket per
28/// (captured phase ∪ `StepStart`-step), sorted by `step_index` — NOT
29/// one-per-captured-phase, so `len()` is no longer "number of captured
30/// phases".
31///
32/// The `iteration_rate` enrichment lets
33/// `crate::timeline::Timeline::from_phase_buckets` render the per-phase
34/// throughput annotation without going through the legacy
35/// `crate::timeline::Timeline::build` path.
36///
37/// For each `StepStart[k]` -> `StepEnd[k]` pair with
38/// `total_iterations: Some(_)`, the per-phase rate is
39/// `(later - earlier) / duration_s` where `duration_s` is the
40/// elapsed-ms delta BETWEEN THE TWO STIMULUS EVENTS (guest clock),
41/// not the PhaseBucket sample window. The rate is attributed to the
42/// step the EARLIER event starts (`prev.step_index`); the attribution
43/// loop skips any `is_step_end` (or `is_terminal`) `prev`, so only a
44/// StepStart is ever the earlier member. Phases that don't overlap a
45/// stimulus pair keep their PhaseBucket.metrics map unchanged (no
46/// iteration_rate key).
47///
48/// SEMANTICS: `total_iterations` is the sum of the worker handles
49/// alive at each event (see
50/// [`crate::timeline::StimulusEvent::total_iterations`]). Each step's
51/// rate is its STEP-LOCAL `StepStart[k]` -> `StepEnd[k]` delta — the
52/// step's own workers measured over its own hold — so a bucket is
53/// sourced ONLY by its own pair (the `is_step_end` guard drops the
54/// inter-step `StepEnd[k]` -> `StepStart[k+1]` pair entirely). This
55/// measures BOTH fresh-per-step workers (which read ~0 at each
56/// StepStart, so the old cross-step delta produced no rate) and
57/// persistent (Backdrop) workers (excluding the inter-step teardown
58/// wall-time a cross-step window would span). On a clean run the
59/// `(StepEnd[N], terminal)` pair is guard-skipped and the trailing
60/// `is_terminal` event is not consumed; it supplies a step's right
61/// boundary ONLY for legacy/synthetic data carrying a `ScenarioEnd`
62/// frame but no `StepEnd` frames. A sched-died step has neither frame
63/// (its early return skips both emissions), so the dead step's
64/// `StepStart` is never a `prev` with a successor and it reports no
65/// rate.
66///
67/// iteration_rate is registered as `MetricKind::Rate` with the Counter
68/// components `total_phase_iterations` / `total_phase_duration_sec` and
69/// `HigherBetter` polarity (more throughput is better). The per-step
70/// producer below emits those two components (the iteration delta and the
71/// window seconds — the ms→s `/1000` applied at the component, since
72/// `derive_rate_metrics` does a bare num/den) rather than a ready ratio,
73/// and the `derive_rate_metrics` post-pass re-derives `iteration_rate` =
74/// Σiterations / Σseconds at every in-map aggregation level. Its per-run
75/// run-scalar fold (one run's per-phase values → that run's `ext_metrics`)
76/// runs through `populate_run_ext_metrics_from_phases`, which SUMS the
77/// Counter components across phases (a synthesized zero-capture phase's
78/// components are summed in, not zero-weighted out — the run-aggregate
79/// completion of the per-step rate handling) and re-derives the rate. The
80/// cross-sidecar-run rollup `group_and_average_by` likewise re-pools via
81/// its `derive_rate_metrics` post-pass. `iteration_rate` has no cross-cgroup
82/// axis to re-pool: it is derived from run-level phase buckets and
83/// host-injected into the run `ext_metrics` by
84/// `populate_run_ext_metrics_from_phases` (the eval layer) AFTER the
85/// cross-cgroup `merge`, so `AssertResult::merge`'s worst-case
86/// (min/max-by-polarity) `ext_metrics` fold never sees its components. The rate whose components ARE per-cgroup is the separate
87/// pooled `iterations_per_cpu_sec`, re-pooled across a run's cgroups by
88/// `populate_run_pooled_iterations_per_cpu_sec` (reading `stats.cgroups`
89/// post-merge).
90///
91/// Live caller: `evaluate_vm_result` at `src/test_support/eval/mod.rs`
92/// — has both the SampleSeries and the stimulus_events vec in scope.
93pub fn build_phase_buckets_with_stimulus(
94 samples: &crate::scenario::sample::SampleSeries,
95 stimulus_events: &[crate::timeline::StimulusEvent],
96) -> Vec<PhaseBucket> {
97 let monitor_samples: &[crate::monitor::MonitorSample] =
98 samples.monitor().map(|m| m.samples()).unwrap_or(&[]);
99 // vCPU-preemption exemption window for the per-phase stall predicate,
100 // threaded into fold_monitor_into_bucket -> compute_metrics so the
101 // per-phase stuck_count uses the SAME predicate as the run-level
102 // MonitorSummary::stuck_count. 0 (no monitor) derives from CONFIG_HZ
103 // inside compute_metrics, matching from_samples_with_threshold.
104 let preemption_threshold_ns: u64 = samples
105 .monitor()
106 .map(|m| m.preemption_threshold_ns())
107 .unwrap_or(0);
108 // Re-group periodic captures by the guest step whose stimulus
109 // window contains each capture's workload-relative boundary offset,
110 // NOT the step_index stamped at (deferred) fire time — see
111 // [`crate::scenario::sample::SampleSeries::by_stimulus_phase`] for
112 // why the scheduled offset is the timing-independent truth (it
113 // survives the deferred-fire burst that collapses the stamped
114 // step_index to one phase).
115 let by_phase = samples.by_stimulus_phase(stimulus_events);
116 // Bucket windows are workload-relative (boundary-offset) here, so the
117 // monitor samples (run-relative) are shifted by the stimulus/monitor
118 // clock skew before windowing.
119 let monitor_to_window_offset_ms = monitor_clock_offset(stimulus_events, monitor_samples);
120 let mut buckets = buckets_from_grouped(
121 by_phase,
122 monitor_samples,
123 monitor_to_window_offset_ms,
124 preemption_threshold_ns,
125 );
126 let step_starts = crate::scenario::sample::step_starts_from_stimulus(stimulus_events);
127 synthesize_missing_step_buckets(
128 &mut buckets,
129 &step_starts,
130 stimulus_events,
131 monitor_samples,
132 monitor_to_window_offset_ms,
133 preemption_threshold_ns,
134 );
135 // Keep buckets in step_index order: buckets_from_grouped emits them
136 // sorted (BTreeMap key order), but the synthesize loop appends out of
137 // order. Downstream lookups resolve by step_index, but a sorted vec
138 // matches the captured-bucket invariant and keeps rendered output
139 // stable.
140 buckets.sort_by_key(|b| b.step_index);
141 fill_phase_iteration_rates(&mut buckets, stimulus_events);
142 buckets
143}
144
145/// Synthesize a `PhaseBucket` for any scenario step that has a stimulus
146/// StepStart but produced no capture bucket. Periodic boundaries are
147/// placed uniformly over the whole workload (step-agnostic — see
148/// `compute_periodic_boundaries_ns`), so a short interior step can
149/// capture zero samples and leave no bucket. The iteration_rate
150/// attribution in [`fill_phase_iteration_rates`] only mutates EXISTING
151/// buckets, so without this seam that step's capture-independent rate
152/// (derived purely from the StepStart/StepEnd total_iterations deltas,
153/// needing no capture) would be silently dropped. The synthesized bucket
154/// carries the step's true stimulus window so `fold_monitor_into_bucket`
155/// still recovers its monitor-derived imbalance; `sample_count == 0` marks
156/// it capture-free for downstream consumers. `Timeline::from_phase_buckets`
157/// COMPARES this bucket's stimulus-derived throughput across the gap
158/// (the iteration_rate is real, not a sampling artifact) but GATES the
159/// monitor-derived metrics (imbalance/dsq/fallback/keep_last) behind
160/// both sides having samples — see `detect_boundary_changes`. BASELINE
161/// (step 0) is synthesized only if a StepStart carries
162/// `step_index == 0` — it is not special-cased.
163fn synthesize_missing_step_buckets(
164 buckets: &mut Vec<PhaseBucket>,
165 step_starts: &[(u64, u16)],
166 stimulus_events: &[crate::timeline::StimulusEvent],
167 monitor_samples: &[crate::monitor::MonitorSample],
168 monitor_to_window_offset_ms: i64,
169 preemption_threshold_ns: u64,
170) {
171 for &(start_ms, k) in step_starts {
172 if buckets.iter().any(|b| b.step_index == k) {
173 continue;
174 }
175 // Right edge: the step's own StepEnd, CLAMPED to the next
176 // step-start so a non-monotonic (corrupt-wire) StepEnd[k] >=
177 // StepStart[k+1] can never extend this synthesized window past the
178 // next step's start (which would fold the same MonitorSample into
179 // two adjacent synthesized buckets). Falls back to the next
180 // step-start alone, else open-ended (the last step on data with no
181 // StepEnd — e.g. sched-died — which the rate loop leaves rateless).
182 // Earliest StepEnd elapsed for k (min, symmetric with start_ms's
183 // earliest step-start), so a duplicate/corrupt StepEnd later in the
184 // vec can't pick a wrong right edge.
185 let step_end = stimulus_events
186 .iter()
187 .filter(|e| e.is_step_end && e.step_index == Some(k))
188 .map(|e| e.elapsed_ms)
189 .min();
190 // `ms > start_ms` (strict): two DISTINCT step_index sharing the
191 // same elapsed is corrupt-wire only (build_stimulus emits StepStarts
192 // at distinct monotone guest-clock times), so the equal-elapsed
193 // window-overlap edge is out of scope — its worst case is a
194 // redundant double-fold of one monitor sample, never a drop or panic.
195 let next_start = step_starts
196 .iter()
197 .filter(|&&(ms, _)| ms > start_ms)
198 .map(|&(ms, _)| ms)
199 .min();
200 // Last-resort right edge for a last step with no StepEnd and no
201 // successor: the scenario-end terminal event, bounding the window
202 // at run-end instead of folding every trailing teardown monitor
203 // sample into the bucket. NOTE the production sched-died path emits
204 // NEITHER a StepEnd NOR a ScenarioEnd (its early return skips both),
205 // so it carries no terminal here and falls to u64::MAX — matching
206 // Timeline::build, which extends the last phase to end-of-monitor.
207 // This terminal clamp therefore only binds for legacy/synthetic
208 // data carrying a ScenarioEnd frame without StepEnd frames.
209 let terminal = stimulus_events
210 .iter()
211 .find(|e| e.is_terminal)
212 .map(|e| e.elapsed_ms);
213 let end_ms = match (step_end, next_start) {
214 (Some(se), Some(ns)) => se.min(ns),
215 (Some(se), None) => se,
216 (None, Some(ns)) => ns,
217 (None, None) => terminal.unwrap_or(u64::MAX),
218 };
219 // BASELINE (step 0) is synthesized only if a StepStart carries
220 // step_index == 0; build_stimulus 1-indexes scenario Steps (Step 0
221 // -> step_index 1), so it never emits step_index 0 and BASELINE is
222 // intentionally never synthesized (its pre-first-Step settle window
223 // carries no meaningful per-step rate). The branch is kept for the
224 // convention, not for a production path.
225 let label = if k == 0 {
226 "BASELINE".to_string()
227 } else {
228 format!("Step[{}]", k.saturating_sub(1))
229 };
230 let mut bucket = PhaseBucket {
231 step_index: k,
232 label,
233 start_ms,
234 end_ms,
235 sample_count: 0,
236 metrics: std::collections::BTreeMap::new(),
237 per_cgroup: std::collections::BTreeMap::new(),
238 };
239 fold_monitor_into_bucket(
240 &mut bucket,
241 monitor_samples,
242 monitor_to_window_offset_ms,
243 preemption_threshold_ns,
244 );
245 buckets.push(bucket);
246 }
247}
248
249/// Fill each phase bucket's `iteration_rate` Rate components from the
250/// stimulus event `total_iterations` deltas. Walk events pairwise; for
251/// each pair compute the rate. Sort events by elapsed_ms first so an
252/// out-of-order arrival from the bulk-port drain doesn't silently lose the
253/// delta to saturating_sub (the legacy Timeline::build path at
254/// src/timeline.rs sorts the same way; without the sort, an inversion
255/// produces duration_ms == 0 → skipped, a silent drop).
256///
257/// Must run AFTER the synthesize seam and the step_index sort so it sees
258/// the full, ordered bucket set exactly as the returned vec will hold it.
259fn fill_phase_iteration_rates(
260 buckets: &mut [PhaseBucket],
261 stimulus_events: &[crate::timeline::StimulusEvent],
262) {
263 let mut sorted_events: Vec<&crate::timeline::StimulusEvent> = stimulus_events.iter().collect();
264 // Total-order on an elapsed_ms tie: StepEnd before StepStart
265 // (`!is_step_end` is false=0 for StepEnd) so a zero-length inter-step
266 // gap at the guest's coarse-ms clock attributes the step-local
267 // StepStart[k]->StepEnd[k] rate to bucket k, never the cross-step
268 // StepStart[k]->StepStart[k+1] delta (the is_step_end guard below only
269 // stops StepEnd from being `prev`; it does not order a StepEnd before
270 // the next StepStart on a tie). Mirrors Timeline::build's sort.
271 sorted_events.sort_by_key(|e| (e.elapsed_ms, !e.is_step_end));
272 for w in sorted_events.windows(2) {
273 let prev = w[0];
274 let curr = w[1];
275 // The terminal scenario-end event and per-step StepEnd events are
276 // right boundaries only — never the EARLIER member of a pair (the
277 // step a rate is attributed to). Both carry an end-of-hold count,
278 // not a step-start, so pairing them as `prev` would attribute the
279 // WRONG window to a bucket:
280 // - terminal sorts last so it is naturally only ever `curr`, but
281 // guard explicitly so a future caller producing a non-last or
282 // duplicate terminal can't fall into the `None` step_index
283 // timestamp-window branch below and attach a bogus rate.
284 // - StepEnd[k] carries step_index k (same as StepStart[k]); if its
285 // step's own StepStart[k] -> StepEnd[k] pair returned None (a
286 // BACKWARD count, e < s — a counter reset; a stalled step e == s
287 // instead yields a measured-ZERO rate, see
288 // StimulusEvent::rate_components), the bucket k key is still empty,
289 // so without this guard the next pair StepEnd[k] -> StepStart[k+1]
290 // would or_insert an inter-step teardown-gap rate into bucket k,
291 // mislabeling gap throughput as step k's. The guard keeps bucket k
292 // sourced ONLY by its own StepStart[k] -> StepEnd[k] pair (whether
293 // a measured zero or a real rate), never a leaked gap rate.
294 if prev.is_terminal || prev.is_step_end {
295 continue;
296 }
297 // Emit the iteration_rate Rate's two components (the per-step
298 // iteration delta + window seconds) rather than the ready ratio, so
299 // derive_rate_metrics re-pools Σiters/Σseconds at every aggregation
300 // level. rate_components is None only when the count went BACKWARD
301 // (e < s, a counter reset), an event lacks total_iterations, or the
302 // window is zero-length; a non-advancing count (e == s over a
303 // positive window) yields Some((0.0, secs)) — a measured zero, not
304 // None (the same shared formula Timeline::build's rate_to divides
305 // for display).
306 let Some((iters, secs)) = prev.rate_components(curr) else {
307 continue;
308 };
309 // Attribute the rate to the step PREV starts: the guards above
310 // leave `prev` as a StepStart (or a legacy/synthetic step event),
311 // so the pair (prev, curr) measures iterations accumulated from
312 // prev's step-start to curr's — the throughput DURING prev's step.
313 // Match by prev.step_index, NOT a timestamp window: the bucket
314 // windows here are workload-relative capture offsets (the step
315 // INTERIOR, 10-90% of the workload per
316 // compute_periodic_boundaries_ns), so prev.elapsed_ms — the
317 // step-START — lands in the inter-step gap, inside no bucket's
318 // [start_ms, end_ms) window; a window match would drop every
319 // rate. This mirrors the legacy Timeline::build alignment:
320 // phase[i] gets events[i]→events[i+1] where events[i] is
321 // phase[i]'s left boundary = prev.
322 //
323 // Stimulus events without a step stamp (step_index == None:
324 // legacy / synthetic callers) fall back to the timestamp
325 // window — half-open [start_ms, end_ms) with a single-sample
326 // (start == end) equality carve-out so a boundary event lands
327 // in exactly one bucket.
328 for bucket in buckets.iter_mut() {
329 let in_bucket = match prev.step_index {
330 Some(k) => bucket.step_index == k,
331 None => {
332 if bucket.start_ms == bucket.end_ms {
333 prev.elapsed_ms == bucket.start_ms
334 } else {
335 prev.elapsed_ms >= bucket.start_ms && prev.elapsed_ms < bucket.end_ms
336 }
337 }
338 };
339 if in_bucket {
340 // Both components come from the SAME (prev, curr) pair and are
341 // inserted together; or_insert keeps the first pair that
342 // matched this bucket (mirrors the prior first-wins
343 // iteration_rate attribution). The derive_rate_metrics
344 // post-pass below produces iteration_rate from them.
345 bucket
346 .metrics
347 .entry("total_phase_iterations".to_string())
348 .or_insert(iters);
349 bucket
350 .metrics
351 .entry("total_phase_duration_sec".to_string())
352 .or_insert(secs);
353 break;
354 }
355 }
356 }
357 // Stimulus/monitor components are injected ABOVE, AFTER
358 // buckets_from_grouped's own derive_rate_metrics already ran; re-derive
359 // here so a Rate over a post-injected component is produced for these
360 // buckets too. Idempotent: re-deriving an unchanged map is a no-op.
361 for bucket in buckets.iter_mut() {
362 crate::stats::derive_rate_metrics(&mut bucket.metrics);
363 }
364}
365
366/// Build per-phase metric buckets from a sample series.
367///
368/// Walks [`crate::scenario::sample::SampleSeries::by_stamped_phase`]
369/// to group every stamped sample under its bridge-stamped
370/// `step_index` (NOT re-derived from elapsed-ms windows; the
371/// bridge stamp is authoritative because the capture path knows
372/// the phase it fired from while the time window cannot recover
373/// the phase when stimulus events arrive late or out of order).
374///
375/// For each phase observed (BASELINE under `step_index = 0`,
376/// scenario Steps under `step_index = 1..=N` per the 1-indexed
377/// phase convention) emits one [`PhaseBucket`] with `step_index`
378/// as the key, `label` derived per the BASELINE/Step\[k\]
379/// convention, `start_ms` / `end_ms` from the first / last
380/// sample's `elapsed_ms`, `sample_count` from the bucketed
381/// samples, and `metrics` from the per-kind reduction described
382/// on [`PhaseBucket`]. Metrics whose per-sample reading returns
383/// `None` for every sample in the bucket are omitted entirely
384/// (absent → "no data") rather than collapsed to `Some(0.0)`
385/// (real zero), preserving the sentinel-free contract.
386///
387/// Returns an empty `Vec` when the input series is empty (no
388/// samples captured), distinct from returning a single empty
389/// BASELINE bucket — the former means the periodic-capture path
390/// never fired, the latter means it fired but no metric reading
391/// came back.
392///
393/// No live production caller: `evaluate_vm_result` in
394/// `src/test_support/eval/mod.rs` builds its phase buckets through the
395/// stimulus-aware sibling `build_phase_buckets_with_stimulus` (via
396/// `precompute_early_series`), and `AssertResult.stats.phases` is
397/// populated from those stimulus buckets. This plain variant is
398/// reachable only by full path, for the rare no-stimulus-timeline
399/// case. Exposed `pub` (not `pub(crate)`)
400/// so out-of-tree consumers — payload authors writing custom
401/// eval paths against the publicly-drainable
402/// `result.snapshot_bridge` — can produce the same per-phase
403/// aggregate shape without re-implementing the bucketing logic.
404pub fn build_phase_buckets(samples: &crate::scenario::sample::SampleSeries) -> Vec<PhaseBucket> {
405 // Borrowed per-tick monitor samples (None when no MonitorReport
406 // was attached, e.g. host-only fixture tests).
407 let monitor_samples: &[crate::monitor::MonitorSample] =
408 samples.monitor().map(|m| m.samples()).unwrap_or(&[]);
409 // vCPU-preemption exemption window for the per-phase stall predicate
410 // (see build_phase_buckets_with_stimulus). 0 (no monitor) derives from
411 // CONFIG_HZ inside compute_metrics.
412 let preemption_threshold_ns: u64 = samples
413 .monitor()
414 .map(|m| m.preemption_threshold_ns())
415 .unwrap_or(0);
416 // Group by the bridge-stamped step_index. Without a stimulus
417 // timeline to remap against, the stamped index is the only phase
418 // signal available — see `build_phase_buckets_with_stimulus` (and
419 // `SampleSeries::by_stimulus_phase`) for the offset-remap that
420 // corrects a deferred-fire burst (every capture stamping the same
421 // late CURRENT_STEP). The bucket window and the monitor folding
422 // share the run-relative frame here, so the clock offset is `0`;
423 // synthetic / legacy fixture samples carry `boundary_offset_ms =
424 // None` and the window falls back to `elapsed_ms`.
425 buckets_from_grouped(
426 samples.by_stamped_phase(),
427 monitor_samples,
428 0,
429 preemption_threshold_ns,
430 )
431}
432
433/// Per-phase CPU-time delta (ns) for one field family
434/// (`stime`/`signal_stime` or `utime`/`signal_utime`), folded host-side
435/// from the frozen `task_struct` enrichments captured at the phase's
436/// freeze boundaries. Backs the injected `system_time_ns` /
437/// `user_time_ns` per-phase metrics.
438///
439/// The unit is the THREAD GROUP, not the individual task: the kernel's
440/// `thread_group_cputime` is `signal_struct.{u,s}time` (the accumulator
441/// a dying thread's time is folded into at exit) plus the live threads'
442/// `task_struct.{u,s}time`. `signal_struct` is shared across a thread
443/// group, so its value is counted once per `tgid`; the live counters are
444/// summed across the group's threads. For each `tgid` the group total is
445/// taken at the FIRST and LAST sample in which the group had a READABLE
446/// total (ordered by capture time) and `last - first` is summed across
447/// groups.
448///
449/// Per-group first-seen/last-seen, NOT a per-sample cross-task SUM then a
450/// Counter `last - first`: the captured set changes between freezes
451/// (system tasks churn in and out). A task carrying a large cumulative
452/// counter that appears only in a LATER sample would dump its entire
453/// pre-phase history into a summed delta, inflating the phase value
454/// many-fold. Subtracting each group's OWN first-seen total cancels its
455/// pre-phase history, so a late-joining group contributes only what it
456/// accrued while observed — bounding the result by wall-clock × cores. A
457/// group's thread exiting mid-phase does not dip the total: its time
458/// moves from a `task_struct` counter into `signal_struct`, both of which
459/// the group total includes.
460///
461/// A `None` `signal_field` is a `signal_struct` translate miss, NOT a
462/// real zero (which reads `Some(0)`): such a sample is omitted for that
463/// group so every endpoint is a full live+signal total — mixing a
464/// live-only endpoint with a live+signal one would otherwise leak the
465/// cumulative accumulator as a phantom positive. A numeric `tgid` reused
466/// within the phase (process exit + PID realloc) can read lower at last
467/// than first; `saturating_sub` clamps that to 0 rather than wrapping.
468///
469/// Returns `None` when no group was observed with a readable total across
470/// at least two samples — no delta is measurable — keeping an absent
471/// per-phase bucket key distinct from a real `0` (a qualifying group
472/// whose counters did not advance yields `Some(0.0)`). Accumulates in
473/// `u128` to stay exact before the final `f64` (a phase can total many
474/// task-seconds of ns).
475fn phase_group_cpu_delta(
476 samples: &[crate::scenario::sample::Sample<'_>],
477 task_field: impl Fn(&crate::monitor::task_enrichment::TaskEnrichment) -> u64,
478 signal_field: impl Fn(&crate::monitor::task_enrichment::TaskEnrichment) -> Option<u64>,
479) -> Option<f64> {
480 // Per-tgid `thread_group_cputime` total at one sample: the shared
481 // signal_struct accumulator (once per group) plus the sum of the
482 // group's live threads' task_struct counter. A group is INCLUDED for
483 // a sample only when its signal accumulator was readable there (some
484 // thread of the tgid carried `Some`): a `None` is a signal_struct
485 // translate miss (see `task_enrichment.rs`), NOT a real zero — a real
486 // zero reads `Some(0)`. Omitting unreadable-signal groups keeps every
487 // endpoint a FULL group total (live + signal), so the first/last delta
488 // can never mix a live-only endpoint with a live+signal endpoint and
489 // leak the cumulative accumulator as a phantom positive.
490 let group_totals = |s: &crate::scenario::sample::Sample<'_>| {
491 let mut live: std::collections::HashMap<i32, u128> = std::collections::HashMap::new();
492 let mut signal: std::collections::HashMap<i32, Option<u128>> =
493 std::collections::HashMap::new();
494 for t in s.snapshot.task_enrichments() {
495 *live.entry(t.tgid).or_insert(0) += u128::from(task_field(t));
496 match signal_field(t) {
497 // Shared across the group — any readable thread fixes it.
498 Some(v) => {
499 signal.insert(t.tgid, Some(u128::from(v)));
500 }
501 None => {
502 signal.entry(t.tgid).or_insert(None);
503 }
504 }
505 }
506 live.into_iter()
507 .filter_map(|(tgid, l)| {
508 signal
509 .get(&tgid)
510 .copied()
511 .flatten()
512 .map(|sig| (tgid, l + sig))
513 })
514 .collect::<std::collections::HashMap<i32, u128>>()
515 };
516
517 // Order by capture time so first/last are the earliest/latest
518 // boundary — the grouped vec is not guaranteed sorted (the
519 // offset-remap in the stimulus path can reorder samples).
520 let mut ordered: Vec<&crate::scenario::sample::Sample<'_>> = samples.iter().collect();
521 // `elapsed_ms` is now Option: a sample with neither a
522 // boundary offset nor a measured elapsed has no time anchor — sort
523 // it LAST (u64::MAX), never first, so an untimestamped sample can't
524 // become the spurious earliest first_seen endpoint.
525 ordered.sort_by_key(|s| s.boundary_offset_ms.or(s.elapsed_ms).unwrap_or(u64::MAX));
526
527 // Per tgid: its full group total at the first and last sample in which
528 // it had a readable total, and how many such samples it had.
529 let mut first_seen: std::collections::HashMap<i32, u128> = std::collections::HashMap::new();
530 let mut last_seen: std::collections::HashMap<i32, u128> = std::collections::HashMap::new();
531 let mut readable_count: std::collections::HashMap<i32, u32> = std::collections::HashMap::new();
532 for s in ordered {
533 for (tgid, total) in group_totals(s) {
534 first_seen.entry(tgid).or_insert(total);
535 last_seen.insert(tgid, total);
536 *readable_count.entry(tgid).or_insert(0) += 1;
537 }
538 }
539
540 let mut sum: u128 = 0;
541 let mut measured = false;
542 for (tgid, last) in &last_seen {
543 // A delta needs the group observed with a readable total across
544 // TWO boundaries; one readable sample gives first == last and no
545 // measurable in-phase growth (and a tgid that appears in only one
546 // sample, or whose signal_struct never translated, never reaches
547 // 2). saturating_sub clamps a last < first read — a numeric tgid
548 // reused within the phase (process exit + PID realloc to a fresh
549 // group starting near zero) reads lower at last; clamp to 0 rather
550 // than wrap.
551 if readable_count.get(tgid).copied().unwrap_or(0) < 2 {
552 continue;
553 }
554 measured = true;
555 // first_seen always holds tgid (populated in the same pass).
556 let first = first_seen.get(tgid).copied().unwrap_or(*last);
557 sum += last.saturating_sub(first);
558 }
559 // None when no group was observed with a readable total across two
560 // samples — unmeasurable, so the bucket key stays ABSENT (distinct
561 // from a real 0: a qualifying group whose counters did not advance
562 // yields Some(0.0)).
563 measured.then_some(sum as f64)
564}
565
566/// Per-phase per-CPU spatial-max fold for a monotonic per-CPU counter: the
567/// BUSIEST CPU's delta over the phase (`max_key`) + the concentration ratio
568/// `max / mean` over the reporting CPUs (`concentration_key`). A custom
569/// per-CPU-delta fold — NOT a read_sample arm (read_sample yields the cross-CPU
570/// SUM as one f64, with no per-CPU vector). `field` selects the per-CPU counter
571/// (`|c| c.irqs_sum` for hardirqs, `|c| c.softirqs[NET_RX]` for NET_RX softirqs).
572///
573/// Endpoints are the earliest + latest freeze BY `win` (the SAME key the bucket
574/// window uses; `samples_in_phase` is NOT positionally ordered — the stimulus
575/// offset-remap can reorder it). The per-CPU delta is correlated by the
576/// per_cpu_time `cpu` FIELD (not vec position) over the CPUs present in BOTH
577/// endpoints; a CPU absent from either is skipped (hotplug defense — a no-op on
578/// ktstr's boot-fixed vCPU set, correct-by-construction if a hot-add Op lands).
579/// `saturating_sub` clamps a per-CPU counter regress (the kernel counters are
580/// monotonic; a regress would signal a reset). The per-CPU delta is taken
581/// FIRST, the spatial max SECOND — so a busiest CPU that SHIFTS across the phase
582/// is captured as max(per-CPU deltas), not a delta of spatial maxes.
583///
584/// Loud-absent: `max_key` needs at least 2 freezes spanning a positive `win`
585/// window and at least 1 defined-delta CPU; `concentration_key` ADDITIONALLY
586/// needs at least 2 defined-delta CPUs (a 1-CPU intersection makes max/mean == 1
587/// a structural artifact) and a positive mean (a NaN would fail the sidecar's
588/// is_finite insert-guard).
589fn fold_per_cpu_spatial_max(
590 metrics: &mut std::collections::BTreeMap<String, f64>,
591 samples_in_phase: &[crate::scenario::sample::Sample<'_>],
592 win: impl Fn(&crate::scenario::sample::Sample<'_>) -> Option<u64>,
593 field: impl Fn(&crate::monitor::dump::PerCpuTimeStats) -> u64,
594 max_key: &str,
595 concentration_key: &str,
596) {
597 let placed: Vec<(&crate::scenario::sample::Sample<'_>, u64)> = samples_in_phase
598 .iter()
599 .filter(|s| !s.snapshot.per_cpu_time().is_empty())
600 .filter_map(|s| win(s).map(|w| (s, w)))
601 .collect();
602 if let (Some(&(first_s, fw)), Some(&(last_s, lw))) = (
603 placed.iter().min_by_key(|(_, w)| *w),
604 placed.iter().max_by_key(|(_, w)| *w),
605 ) && fw < lw
606 {
607 // Per-CPU delta over the cpu-field intersection of the two endpoint
608 // freezes; clamp a regress to 0.
609 let deltas: Vec<f64> = first_s
610 .snapshot
611 .per_cpu_time()
612 .iter()
613 .filter_map(|c0| {
614 last_s
615 .snapshot
616 .per_cpu_time_at(c0.cpu)
617 .map(|c1| field(c1).saturating_sub(field(c0)) as f64)
618 })
619 .collect();
620 if let Some(max) = deltas.iter().copied().reduce(f64::max) {
621 metrics.entry(max_key.to_string()).or_insert(max);
622 if deltas.len() >= 2 {
623 let mean = deltas.iter().sum::<f64>() / deltas.len() as f64;
624 if mean > 0.0 {
625 metrics
626 .entry(concentration_key.to_string())
627 .or_insert(max / mean);
628 }
629 }
630 }
631 }
632}
633
634/// Per-CPU scx_layered util-compensation SCALE over a freeze span: the factor by
635/// which a CPU's useful-work (non-overhead) capacity is scaled up to compensate
636/// for IRQ / softirq / stolen time. Over the `first`->`last` cpustat delta,
637/// `scale = delta_total / available` where `delta_total` is the sum of ALL 8
638/// `kernel_cpustat[]` ns slots (user+nice+system+idle+iowait+irq+softirq+steal)
639/// and `available = delta_total - (irq + softirq + steal)`. Clamped to
640/// `[1.0, 20.0]`; `available == 0` (overhead consumed the whole window, or no
641/// forward progress) returns the floor `1.0` (no compensation) — scx_layered's
642/// `available > 0` guard, expressed over `u64` saturating deltas where `> 0`
643/// is `!= 0`.
644///
645/// Byte-faithful to scx_layered's `util_compensation` per-CPU compute. The
646/// ns-vs-µs unit is immaterial: the ratio cancels it (scx_layered reads /proc
647/// microseconds; here the host reads `kernel_cpustat` ns, the same slots
648/// `/proc/stat` formats from). Hostile-guest hardening: every per-slot delta is
649/// `saturating_sub` (a cpustat counter that regresses across the span — a
650/// CPU-hotplug reset, or a corrupt read — clamps to 0, never wraps to
651/// ~`u64::MAX`), exactly as scx_layered does. The one divergence from
652/// scx_layered: the 8-slot `delta_total` and 3-slot `overhead` sums use
653/// `saturating_add` (a hostile per-slot `u64::MAX` clamps the total, mirroring
654/// the per-CPU IRQ spatial sums in [`crate::stats::MetricDef::read_sample`]),
655/// where scx_layered combines its per-slot saturating-subs with plain `+` — its
656/// /proc inputs are trusted, ours are guest-controlled. With `overhead >= 0` the
657/// ratio is always `>= 1.0`, so the clamp's floor is belt-and-suspenders and the
658/// cap at 20.0 is the only active bound.
659pub(crate) fn cpu_util_comp_scale(
660 first: &crate::monitor::dump::PerCpuTimeStats,
661 last: &crate::monitor::dump::PerCpuTimeStats,
662) -> f64 {
663 let user = last.cpustat_user_ns.saturating_sub(first.cpustat_user_ns);
664 let nice = last.cpustat_nice_ns.saturating_sub(first.cpustat_nice_ns);
665 let system = last
666 .cpustat_system_ns
667 .saturating_sub(first.cpustat_system_ns);
668 let idle = last.cpustat_idle_ns.saturating_sub(first.cpustat_idle_ns);
669 let iowait = last
670 .cpustat_iowait_ns
671 .saturating_sub(first.cpustat_iowait_ns);
672 let irq = last.cpustat_irq_ns.saturating_sub(first.cpustat_irq_ns);
673 let softirq = last
674 .cpustat_softirq_ns
675 .saturating_sub(first.cpustat_softirq_ns);
676 let steal = last.cpustat_steal_ns.saturating_sub(first.cpustat_steal_ns);
677 let delta_total = [user, nice, system, idle, iowait, irq, softirq, steal]
678 .into_iter()
679 .fold(0u64, u64::saturating_add);
680 let overhead = irq.saturating_add(softirq).saturating_add(steal);
681 let available = delta_total.saturating_sub(overhead);
682 if available == 0 {
683 return 1.0;
684 }
685 (delta_total as f64 / available as f64).clamp(1.0, 20.0)
686}
687
688/// Per-phase mean ACROSS CPUs of the [`cpu_util_comp_scale`] over the
689/// `per_cpu_time` freeze span, inserted as `avg_cpu_util_comp_scale`. Endpoint
690/// selection (first/last by `win`), the cpu-field intersection (per-CPU paired
691/// by the `cpu` field; a CPU absent from either endpoint is skipped — hotplug
692/// churn), the saturating deltas, and the loud-absent discipline (no key when
693/// no CPU pairs or the window is degenerate, `fw == lw`) mirror
694/// [`fold_per_cpu_spatial_max`]. `Gauge(Avg)`, so it auto-folds to run-level
695/// (weighted mean across phases) and cross-run with no extra wiring.
696///
697/// Cross-boot caveat: `samples_in_phase` are the periodic freezes, whose
698/// window starts at `max(scenario-start, prereqs-ready)` — on a cold boot
699/// (accessor/attach lag) it starts later, so this key reflects a later
700/// workload sub-window. Across a cold/warm boot mix its cross-run magnitude
701/// shifts; use `--noise-adjust` (spread analysis absorbs the boot-timing
702/// jitter) for cross-run compares — a plain single-pair (cached) compare
703/// of this key is advisory.
704fn fold_util_comp_scale(
705 metrics: &mut std::collections::BTreeMap<String, f64>,
706 samples_in_phase: &[crate::scenario::sample::Sample<'_>],
707 win: impl Fn(&crate::scenario::sample::Sample<'_>) -> Option<u64>,
708) {
709 let placed: Vec<(&crate::scenario::sample::Sample<'_>, u64)> = samples_in_phase
710 .iter()
711 .filter(|s| !s.snapshot.per_cpu_time().is_empty())
712 .filter_map(|s| win(s).map(|w| (s, w)))
713 .collect();
714 if let (Some(&(first_s, fw)), Some(&(last_s, lw))) = (
715 placed.iter().min_by_key(|(_, w)| *w),
716 placed.iter().max_by_key(|(_, w)| *w),
717 ) && fw < lw
718 {
719 // Per-CPU clamped scale over the cpu-field intersection of the two
720 // endpoint freezes, then the arithmetic mean across the reporting CPUs.
721 let scales: Vec<f64> = first_s
722 .snapshot
723 .per_cpu_time()
724 .iter()
725 .filter_map(|c0| {
726 last_s
727 .snapshot
728 .per_cpu_time_at(c0.cpu)
729 .map(|c1| cpu_util_comp_scale(c0, c1))
730 })
731 .collect();
732 if !scales.is_empty() {
733 let mean = scales.iter().sum::<f64>() / scales.len() as f64;
734 metrics
735 .entry("avg_cpu_util_comp_scale".to_string())
736 .or_insert(mean);
737 }
738 }
739}
740
741/// Per-phase per-task latency-criticality aggregate from the scheduler's
742/// `sdt_alloc` arena (scx_lavd's `task_ctx.normalized_lat_cri`, `[0, 1024]`).
743/// Over every freeze's `sdt_allocations` walk, pulls each live task_ctx's
744/// `normalized_lat_cri` — a host-rendered arena field (BPF_MAP_TYPE_ARENA), NOT
745/// a kernel counter and NOT a BPF `.bss` field — and folds across (freeze,
746/// task): mean -> `avg_task_lat_cri`, max -> `max_task_lat_cri`.
747///
748/// A GAUGE: `normalized_lat_cri` is an instantaneous per-task value lavd
749/// recomputes each schedule (scx_lavd `lat_cri.bpf.c`), so the fold is over ALL
750/// samples (mean / max over every (freeze, task) observation), NOT a first/last
751/// delta like the cpustat folds. `normalized` (not raw `lat_cri`) for cross-run
752/// comparability — raw `lat_cri` is squared + waker/wakee-propagated +
753/// load-dependent, so its magnitude is not comparable across runs.
754///
755/// Loud-absent + scheduler-agnostic by construction: a non-lavd payload has no
756/// `normalized_lat_cri` member, so [`crate::monitor::btf_render::RenderedValue::get`]
757/// returns `None` and the entry is skipped; if no entry yields the field, no key
758/// is inserted (the "no key when absent" discipline the sibling folds use). The
759/// member name is the only gate (lavd-unique) — no scheduler-name hardcoding.
760///
761/// Cross-boot caveat: like `fold_util_comp_scale`, these are the periodic
762/// freezes, whose window starts later on a cold boot (the prereq-ready
763/// anchor), so `avg_task_lat_cri`'s cross-run magnitude shifts across a
764/// cold/warm boot mix; use `--noise-adjust` for cross-run compares —
765/// a single-pair (cached) compare is advisory.
766fn fold_lat_cri(
767 metrics: &mut std::collections::BTreeMap<String, f64>,
768 samples_in_phase: &[crate::scenario::sample::Sample<'_>],
769) {
770 let mut sum = 0.0f64;
771 let mut count = 0usize;
772 let mut max = f64::MIN;
773 for s in samples_in_phase {
774 for alloc in &s.snapshot.report().sdt_allocations {
775 for entry in &alloc.entries {
776 if let Some(v) = entry
777 .payload
778 .get("normalized_lat_cri")
779 .and_then(|r| r.as_u64())
780 {
781 let v = v as f64;
782 sum += v;
783 count += 1;
784 if v > max {
785 max = v;
786 }
787 }
788 }
789 }
790 }
791 if count > 0 {
792 metrics
793 .entry("avg_task_lat_cri".to_string())
794 .or_insert(sum / count as f64);
795 metrics.entry("max_task_lat_cri".to_string()).or_insert(max);
796 }
797}
798
799/// Per-phase per-CGROUP spatial axis over the workload-leaf PSI-irq captured at
800/// each freeze (`Snapshot::cgroup_psi`). The per-cgroup analog of
801/// [`fold_per_cpu_spatial_max`], producing three Peak metrics:
802///
803/// - `max_cgroup_psi_irq_avg10`: the worst leaf's IRQ-full pressure GAUGE
804/// (decoded avg10 percent) — per-freeze max across leaves, then max across the
805/// phase's freezes. A gauge, so no delta/baseline (a spatial-max of an
806/// instantaneous reading, the `max_avg_irq_util` shape on the cgroup axis).
807/// - `max_cgroup_irq_pressure`: the busiest leaf's IRQ-full stall DELTA over the
808/// phase (decoded µs) — per-leaf (last - first) `total_ns`, correlated by
809/// `cgroup_kva`, then the spatial max. A monotonic Counter, so delta-FIRST
810/// then spatial-max (the `max_cpu_hardirqs` shape). Informational
811/// (workload-confounded magnitude).
812/// - `max_cgroup_irq_pressure_concentration`: `max_cgroup_irq_pressure` / the
813/// mean per-leaf delta over the SAME leaf set — the busiest cell's share (the
814/// isolation/steering signal). max/MEAN, LowerBetter (the
815/// `max_cpu_hardirq_concentration` shape). Absent when < 2 reporting leaves or
816/// mean == 0.
817///
818/// Endpoints + the leaf intersection + saturating + loud-absent rules mirror
819/// [`fold_per_cpu_spatial_max`] (the `(cgroup_kva, serial_nr)` pair is the
820/// per-CPU `cpu`-field analog — the serial_nr disambiguates a freed slab KVA
821/// reused by a NEW cgroup, which `cgroup_kva` alone cannot; a leaf absent from
822/// either endpoint is skipped — leaf churn / a cgroup created mid-phase). All
823/// three are Peak → they auto-fold max-across-phases to run-level and cross-run
824/// MAX, no extra wiring.
825fn fold_per_cgroup_psi(
826 metrics: &mut std::collections::BTreeMap<String, f64>,
827 samples_in_phase: &[crate::scenario::sample::Sample<'_>],
828 win: impl Fn(&crate::scenario::sample::Sample<'_>) -> Option<u64>,
829) {
830 use crate::monitor::btf_offsets::{decode_avg10_percent, decode_total_us};
831 // avg10 gauge: per-freeze max across leaves, then max across the freezes.
832 let avg10_peak = samples_in_phase
833 .iter()
834 .filter_map(|s| {
835 s.snapshot
836 .cgroup_psi()
837 .iter()
838 .map(|c| decode_avg10_percent(c.avg10_raw))
839 .reduce(f64::max)
840 })
841 .reduce(f64::max);
842 if let Some(v) = avg10_peak {
843 metrics
844 .entry("max_cgroup_psi_irq_avg10".to_string())
845 .or_insert(v);
846 }
847
848 // total-delta spatial-max + concentration. Earliest + latest freeze by `win`
849 // (the bucket-window key; samples_in_phase is not positionally ordered). The
850 // per-leaf delta is correlated by `(cgroup_kva, serial_nr)` over the leaves
851 // present in BOTH endpoints; a leaf absent from either is skipped. The
852 // serial_nr guards KVA REUSE: a leaf rmdir'd mid-phase whose freed slab KVA a
853 // NEW cgroup then reused would alias by KVA alone and yield a bogus
854 // cross-cgroup delta — a different serial_nr at the same KVA is treated as
855 // absent (dropped), not differenced. `saturating_sub` then clamps only a
856 // (rare) genuine same-cgroup counter regress. delta FIRST, spatial max
857 // SECOND — a busiest cell that shifts across the phase is captured as
858 // max(per-leaf deltas), not a delta of spatial maxes.
859 let placed: Vec<(&crate::scenario::sample::Sample<'_>, u64)> = samples_in_phase
860 .iter()
861 .filter(|s| !s.snapshot.cgroup_psi().is_empty())
862 .filter_map(|s| win(s).map(|w| (s, w)))
863 .collect();
864 if let (Some(&(first_s, fw)), Some(&(last_s, lw))) = (
865 placed.iter().min_by_key(|(_, w)| *w),
866 placed.iter().max_by_key(|(_, w)| *w),
867 ) && fw < lw
868 {
869 let deltas: Vec<f64> = first_s
870 .snapshot
871 .cgroup_psi()
872 .iter()
873 .filter_map(|c0| {
874 last_s
875 .snapshot
876 .cgroup_psi()
877 .iter()
878 .find(|c1| c1.cgroup_kva == c0.cgroup_kva && c1.serial_nr == c0.serial_nr)
879 .map(|c1| decode_total_us(c1.total_ns.saturating_sub(c0.total_ns)))
880 })
881 .collect();
882 if let Some(max) = deltas.iter().copied().reduce(f64::max) {
883 metrics
884 .entry("max_cgroup_irq_pressure".to_string())
885 .or_insert(max);
886 if deltas.len() >= 2 {
887 let mean = deltas.iter().sum::<f64>() / deltas.len() as f64;
888 if mean > 0.0 {
889 metrics
890 .entry("max_cgroup_irq_pressure_concentration".to_string())
891 .or_insert(max / mean);
892 }
893 }
894 }
895 }
896}
897
898/// Assemble [`PhaseBucket`]s from a pre-grouped phase map. Shared by
899/// [`build_phase_buckets`] (grouping by the bridge-stamped step_index)
900/// and [`build_phase_buckets_with_stimulus`] (grouping by the
901/// offset-remapped step).
902///
903/// `monitor_to_window_offset_ms` is subtracted from each
904/// [`crate::monitor::MonitorSample`] `elapsed_ms` before the window
905/// test, bringing the monitor sample's run-relative timestamp into the
906/// bucket-window frame: `0` when both share the run-relative frame, the
907/// stimulus/monitor clock skew (see [`monitor_clock_offset`]) when the
908/// window is workload-relative (boundary-offset) but the monitor samples
909/// remain run-relative.
910///
911/// Each bucket folds the monitor samples whose (shifted) elapsed_ms
912/// lands in the bucket window — supplying metrics like
913/// `avg_imbalance_ratio` that need per-CPU full-class `rq.nr_running`,
914/// which the bridge-captured Snapshot does not expose (Snapshot carries
915/// scx_rq.nr_running only).
916///
917/// `preemption_threshold_ns` is forwarded to `fold_monitor_into_bucket`
918/// (and thence `compute_metrics`) for the per-phase stall predicate; see
919/// `fold_monitor_into_bucket`.
920fn buckets_from_grouped(
921 by_phase: std::collections::BTreeMap<u16, Vec<crate::scenario::sample::Sample<'_>>>,
922 monitor_samples: &[crate::monitor::MonitorSample],
923 monitor_to_window_offset_ms: i64,
924 preemption_threshold_ns: u64,
925) -> Vec<PhaseBucket> {
926 let mut out: Vec<PhaseBucket> = Vec::with_capacity(by_phase.len());
927 for (step_index, samples_in_phase) in by_phase {
928 let label = if step_index == 0 {
929 "BASELINE".to_string()
930 } else {
931 // Scenario-Step ordinal lives at `step_index - 1`
932 // because phase 0 is BASELINE under the 1-indexed
933 // encoding; saturate at 0 if the underflow guard
934 // ever fires (unreachable for the current encoding
935 // — step_index here came from the bucket key so the
936 // `> 0` branch is satisfied — but keep the guard so
937 // a future caller that hands in a synthetic
938 // `step_index = 0` does not panic).
939 format!("Step[{}]", step_index.saturating_sub(1))
940 };
941 let sample_count = samples_in_phase.len();
942 // Bucket window: prefer each capture's workload-relative
943 // scheduled boundary offset (stable across a deferred-fire
944 // burst) over the run-relative fire time; fall back to
945 // `elapsed_ms` per-sample when no offset was stamped (synthetic
946 // / on-demand). min/max rather than first/last because the
947 // offset-remap in the stimulus path can reorder samples
948 // relative to drain order.
949 let win = |s: &crate::scenario::sample::Sample<'_>| s.boundary_offset_ms.or(s.elapsed_ms);
950 let (start_ms, end_ms) = if samples_in_phase.is_empty() {
951 (0, u64::MAX)
952 } else {
953 let mut lo = u64::MAX;
954 let mut hi = 0u64;
955 for s in &samples_in_phase {
956 // Skip a sample with no time anchor (no boundary offset
957 // AND no measured elapsed): coercing it to 0
958 // would pull start_ms to 0 and over-fold monitor samples.
959 // If EVERY sample in the phase is unanchored, lo/hi stay
960 // (u64::MAX, 0) — an inverted window that folds nothing,
961 // the correct "no placeable samples" outcome.
962 if let Some(w) = win(s) {
963 lo = lo.min(w);
964 hi = hi.max(w);
965 }
966 }
967 (lo, hi)
968 };
969 let mut metrics: std::collections::BTreeMap<String, f64> =
970 std::collections::BTreeMap::new();
971 for metric_def in crate::stats::METRICS {
972 let per_sample_readings: Vec<f64> = samples_in_phase
973 .iter()
974 .filter_map(|s| metric_def.read_sample(s))
975 .collect();
976 if per_sample_readings.is_empty() {
977 // No per-sample reading for any sample in this
978 // bucket -- the metric is host-side-only
979 // (cross-cgroup fold) or its dispatch arm has
980 // not landed yet. Omit the key rather than
981 // collapsing to `Some(0.0)` so the renderer
982 // paints "absent" vs "real zero" distinctly.
983 continue;
984 }
985 if let Some(reduced) =
986 crate::stats::aggregate_samples_for_phase(metric_def, &per_sample_readings)
987 {
988 metrics.insert(metric_def.name.to_string(), reduced);
989 }
990 }
991 // Per-phase system / user CPU time (ns), injected post-hoc as a
992 // per-thread-GROUP delta over the phase's freeze samples (NOT a
993 // read_sample metric — a per-sample cross-task sum then a Counter
994 // delta inflates when the captured task set churns; see
995 // `phase_group_cpu_delta`). Observer-free: reads the frozen
996 // task_struct.{s,u}time + thread-group signal_struct accumulator
997 // already captured in each sample's enrichments.
998 if let Some(v) = phase_group_cpu_delta(&samples_in_phase, |t| t.stime, |t| t.signal_stime) {
999 metrics.entry("system_time_ns".to_string()).or_insert(v);
1000 }
1001 if let Some(v) = phase_group_cpu_delta(&samples_in_phase, |t| t.utime, |t| t.signal_utime) {
1002 metrics.entry("user_time_ns".to_string()).or_insert(v);
1003 }
1004 // Per-CPU spatial axes (busiest-CPU counter delta + concentration) over
1005 // the per_cpu_time freezes: hardirqs (kstat.irqs_sum) and NET_RX softirqs
1006 // (kstat.softirqs[NET_RX]). Both are monotonic per-CPU counters; the fold
1007 // takes the per-CPU delta FIRST then the spatial max — see
1008 // `fold_per_cpu_spatial_max` for the shared endpoint / cpu-field
1009 // intersection / saturating / loud-absent rules.
1010 fold_per_cpu_spatial_max(
1011 &mut metrics,
1012 &samples_in_phase,
1013 win,
1014 |c| c.irqs_sum,
1015 "max_cpu_hardirqs",
1016 "max_cpu_hardirq_concentration",
1017 );
1018 fold_per_cpu_spatial_max(
1019 &mut metrics,
1020 &samples_in_phase,
1021 win,
1022 |c| c.softirqs[crate::monitor::btf_offsets::SOFTIRQ_NET_RX],
1023 "max_cpu_softirq_net_rx",
1024 "max_cpu_softirq_net_rx_concentration",
1025 );
1026 // scx_layered util-compensation scale: per-CPU first->last cpustat delta
1027 // -> clamped scale -> mean across CPUs (avg_cpu_util_comp_scale). Over
1028 // the same per_cpu_time freezes as the spatial axes above, but a Gauge
1029 // (per-CPU clamp-then-mean), not a spatial-max counter delta.
1030 fold_util_comp_scale(&mut metrics, &samples_in_phase, win);
1031 // scx_lavd per-task latency-criticality: mean/max of each live task_ctx's
1032 // normalized_lat_cri ([0,1024]) over the sdt_alloc arena walk at every
1033 // freeze (avg_task_lat_cri / max_task_lat_cri). A gauge folded over all
1034 // (freeze, task) observations; loud-absent for non-lavd schedulers.
1035 fold_lat_cri(&mut metrics, &samples_in_phase);
1036 // Per-cgroup PSI-irq spatial axis: the busiest workload-leaf's IRQ-full
1037 // stall delta + avg10 gauge + the cross-leaf concentration, over the
1038 // cgroup_psi freezes (the host cgroup-walk capture). All Peak; auto-fold
1039 // run-level.
1040 fold_per_cgroup_psi(&mut metrics, &samples_in_phase, win);
1041 let mut bucket = PhaseBucket {
1042 step_index,
1043 label,
1044 start_ms,
1045 end_ms,
1046 sample_count,
1047 metrics,
1048 per_cgroup: std::collections::BTreeMap::new(),
1049 };
1050 // Per-phase MonitorSample windowing for monitor-derived metrics
1051 // (avg_imbalance_ratio). Factored into `fold_monitor_into_bucket`
1052 // so a synthesized zero-capture bucket (see
1053 // `build_phase_buckets_with_stimulus`) recovers its in-window
1054 // imbalance too — same formula and frame, but over the
1055 // synthesized bucket's FULL stimulus window vs this captured
1056 // bucket's narrower interior capture-offset span.
1057 fold_monitor_into_bucket(
1058 &mut bucket,
1059 monitor_samples,
1060 monitor_to_window_offset_ms,
1061 preemption_threshold_ns,
1062 );
1063 // Phase-wall denominators for the IRQ rates, co-inserted
1064 // both-or-neither with the read_sample-folded IRQ numerators above so
1065 // derive_rate_metrics pairs num/den from THIS bucket (never cross-paired
1066 // with a stimulus event's total_phase_duration_sec — a distinct
1067 // metric). The window is the CAPTURE span (start_ms/end_ms = first->last
1068 // freeze offset), the same span the Counter numerators accrue over:
1069 // self-consistent for the ns/ns fraction, and a capture-window rate for
1070 // the per-second rates (the d/span factor cancels in an A/B compare with
1071 // matched cadence). Gate on total_hardirqs (present iff per_cpu_time was
1072 // captured; irqtime-independent, unlike total_irq_time_ns) + a positive
1073 // window. ms->ns and ms->s scaling lives HERE (derive_rate_metrics does
1074 // bare num/den). One insertion covers both derive sites:
1075 // build_phase_buckets_with_stimulus folds over buckets_from_grouped's
1076 // output, so its later re-derive sees these denominators too.
1077 if bucket.metrics.contains_key("total_hardirqs") && bucket.end_ms > bucket.start_ms {
1078 let wall_ms = (bucket.end_ms - bucket.start_ms) as f64;
1079 bucket
1080 .metrics
1081 .entry("total_phase_wall_ns".to_string())
1082 .or_insert(wall_ms * 1_000_000.0);
1083 bucket
1084 .metrics
1085 .entry("total_phase_wall_sec".to_string())
1086 .or_insert(wall_ms / 1000.0);
1087 }
1088 // Derive Rate metrics AFTER every component source is folded in:
1089 // the METRICS reductions + system/user_time_ns above AND the
1090 // monitor-injected components (e.g. avg_imbalance_ratio, whose ONLY
1091 // source is fold_monitor_into_bucket). Deriving before the monitor
1092 // fold would silently drop a Rate over a monitor-only component. A
1093 // Rate has no samples of its own (see MetricKind::Rate); this is its
1094 // per-phase producer. Inert until a Rate registers. The stimulus
1095 // sibling build_phase_buckets_with_stimulus re-derives again after
1096 // its own later injections (idempotent).
1097 crate::stats::derive_rate_metrics(&mut bucket.metrics);
1098 out.push(bucket);
1099 }
1100 out
1101}
1102
1103/// Fold the per-CPU full-class imbalance from the monitor samples whose
1104/// run-relative timestamp falls in `bucket`'s `[start_ms, end_ms)`
1105/// window into `bucket.metrics["avg_imbalance_ratio"]`.
1106///
1107/// The monitor sample's run-relative `elapsed_ms` is shifted into the
1108/// bucket-window frame (subtract `monitor_to_window_offset_ms`) before
1109/// the half-open `[start_ms, end_ms)` test so a MonitorSample whose
1110/// timestamp equals the boundary lands in exactly one bucket (not both
1111/// adjacent buckets — the closed-on-right form double-counted boundary
1112/// samples). Single-sample phases (`start_ms == end_ms`) use explicit
1113/// equality so the window is not empty.
1114///
1115/// Filters via [`crate::monitor::sample_looks_valid`] (implausible-DSQ
1116/// samples) before the fold; `compute_metrics` additionally drops
1117/// empty-cpus samples (which would default `imbalance_ratio` to 1.0 and
1118/// pull the mean toward "perfect balance", masking a real regression) —
1119/// matching the legacy `Timeline::build` path's filter discipline.
1120///
1121/// Folds the FULL monitor-derived metric set the legacy `Timeline::build`
1122/// reducer (`crate::timeline::compute_metrics`) produces —
1123/// `avg_imbalance_ratio`, `max_imbalance_ratio`, `avg_dsq_depth`,
1124/// `max_dsq_depth`, `stuck_count`, and the `total_fallback` /
1125/// `total_keep_last` counter deltas — over the bucket's window.
1126/// `avg_imbalance_ratio`, `max_imbalance_ratio`, and `stuck_count` are
1127/// folded for EVERY bucket with in-window monitor samples: none of the three
1128/// has a `read_sample` dispatch arm (`crate::stats` `read_sample` has arms
1129/// only for the dsq / fallback keys; these three fall to `_ => None`), so the
1130/// per-sample capture path never produces them and monitor is their only
1131/// per-bucket source on captured AND synthesized buckets alike — a captured
1132/// (common-case) phase must report its per-phase imbalance peak and stall
1133/// count, not drop them. (`avg_imbalance_ratio` is genuinely ext-metrics-only;
1134/// `max_imbalance_ratio` and `stuck_count` ALSO carry a typed `GauntletRow`
1135/// accessor sourced from the whole-run MonitorSummary, so their per-phase
1136/// fold here feeds per-phase RENDERING only — the run-level value stays the
1137/// typed accessor, and both `populate_run_ext_metrics*` skip them via
1138/// `TYPED_FIELD_NAMES` to avoid a double-source.) The dsq / fallback set
1139/// (`avg_dsq_depth`,
1140/// `max_dsq_depth`, `total_fallback`, `total_keep_last`) is folded ONLY for
1141/// a synthesized (`sample_count == 0`) bucket: a captured bucket sources
1142/// those from its read_sample captures and keeps its pre-synthesize
1143/// behavior, while a synthesized bucket has no captures, so monitor is its
1144/// only source — restoring the rendered timeline to PARITY with the old
1145/// `Timeline::build` fallback (the path a zero-capture-with-monitor run
1146/// took before the synthesize seam flipped it onto from_phase_buckets;
1147/// `format_phases` renders these folded metrics for a `sample_count == 0`
1148/// bucket via its `has_monitor_metrics` gate). Each key is `or_insert` so
1149/// it never overwrites a value already present. Parity with
1150/// `Timeline::build` is exact for the production case; for legacy
1151/// ScenarioEnd-but-no-StepEnd data the synthesized last-step window clamps
1152/// to the terminal rather than extending to end-of-monitor, and a
1153/// synthesized bucket's dsq metrics come from the monitor
1154/// `CpuSnapshot.local_dsq_depth` axis (vs a captured bucket's DSQ-walker
1155/// axis) — same metric, different sampling axis.
1156///
1157/// `bucket`'s `[start_ms, end_ms)` IS the window basis and differs by
1158/// bucket kind: a captured bucket's is the min/max of its samples'
1159/// interior capture offsets; a synthesized bucket's is its full
1160/// `[StepStart, StepEnd)` stimulus window. The monitor sample's
1161/// run-relative `elapsed_ms` is shifted into that frame (subtract
1162/// `monitor_to_window_offset_ms`) before the half-open test so a sample
1163/// on the boundary lands in exactly one bucket. `compute_metrics` returns
1164/// fallback / keep_last as RATES; this re-derives the bucket-native
1165/// counter DELTAS (so `phase_from_bucket` re-rates them over the bucket
1166/// window like the read_sample path) using the same `counter_delta` clamp.
1167fn fold_monitor_into_bucket(
1168 bucket: &mut PhaseBucket,
1169 monitor_samples: &[crate::monitor::MonitorSample],
1170 monitor_to_window_offset_ms: i64,
1171 preemption_threshold_ns: u64,
1172) {
1173 let start_ms = bucket.start_ms;
1174 let end_ms = bucket.end_ms;
1175 let in_window = |monitor_ms: u64| -> bool {
1176 let shifted = monitor_ms as i64 - monitor_to_window_offset_ms;
1177 if shifted < 0 {
1178 return false;
1179 }
1180 let m = shifted as u64;
1181 if start_ms == end_ms {
1182 m == start_ms
1183 } else {
1184 m >= start_ms && m < end_ms
1185 }
1186 };
1187 // Filter via sample_looks_valid (matches Timeline::build) so an invalid
1188 // sample (empty cpus -> imbalance_ratio 1.0 default) doesn't pull the
1189 // mean toward "perfect balance" and mask a real regression.
1190 let phase_monitor_samples: Vec<&crate::monitor::MonitorSample> = monitor_samples
1191 .iter()
1192 .filter(|s| in_window(s.elapsed_ms))
1193 .filter(|s| crate::monitor::sample_looks_valid(s))
1194 .collect();
1195 if phase_monitor_samples.is_empty() {
1196 return;
1197 }
1198 let pm = crate::timeline::compute_metrics(&phase_monitor_samples, preemption_threshold_ns);
1199 // sample_count == 0 IFF the bucket is synthesized: buckets_from_grouped
1200 // only emits >=1-sample buckets, the synthesize loop only emits 0-sample
1201 // ones. A CAPTURED bucket folds the source-less monitor signals (avg/max
1202 // imbalance + stuck_count — none have a read_sample arm) but NOT the
1203 // dsq / fallback set, which it sources from its read_sample captures.
1204 // Restricting only that dsq / fallback set to synthesized buckets
1205 // preserves captured-bucket behavior; a synthesized bucket has no
1206 // captures, so monitor is its only source and it takes the full set
1207 // (Timeline::build render parity).
1208 let synthesized = bucket.sample_count == 0;
1209 let mut put = |key: &str, v: f64| {
1210 if v.is_finite() {
1211 bucket.metrics.entry(key.to_string()).or_insert(v);
1212 }
1213 };
1214 if let Some(v) = pm.avg_imbalance {
1215 put("avg_imbalance_ratio", v);
1216 }
1217 // avg_nr_running (Gauge(Avg)) is a monitor-axis signal (full-class
1218 // rq.nr_running, no read_sample dispatch arm), so like avg_imbalance_ratio
1219 // it is folded for EVERY monitor-bearing bucket (not gated on synthesized)
1220 // — captured buckets have no other source for it. It feeds per-phase
1221 // rendering + boundary change-detection only; the run-level value stays
1222 // MonitorSummary::avg_nr_running (fold_run_level_ext), so the run-level ext
1223 // re-pool (populate_run_ext_metrics_from_phases) SKIPS this key to avoid a
1224 // double-source.
1225 if let Some(v) = pm.avg_nr_running {
1226 put("avg_nr_running", v);
1227 }
1228 // max_imbalance_ratio (Peak) and stuck_count (Counter) have NO read_sample
1229 // dispatch arm (both fall to `_ => None` in crate::stats read_sample), so
1230 // the per-sample capture path never produces them and a CAPTURED bucket
1231 // would otherwise never carry them — they would surface only on synthesized
1232 // (zero-capture) buckets. Fold them for EVERY monitor-bearing bucket — like
1233 // avg_imbalance_ratio above — so a captured (common-case) phase reports its
1234 // per-phase imbalance peak and stall count instead of dropping them. (Both
1235 // are monitor-axis signals: imbalance from full-class rq.nr_running, stalls
1236 // from non-advancing rq.clock across consecutive samples — neither is in
1237 // the guest Snapshot read_sample observes. Both ALSO carry a typed
1238 // run-level GauntletRow accessor, so this per-phase fold feeds per-phase
1239 // RENDERING only; TYPED_FIELD_NAMES keeps them out of the run-level
1240 // ext_metrics so the typed accessor stays the single run-level source.)
1241 // `or_insert` still guards against overwriting a value already present.
1242 if let Some(v) = pm.max_imbalance {
1243 put("max_imbalance_ratio", v);
1244 }
1245 if pm.stall_count > 0 {
1246 put("stuck_count", pm.stall_count as f64);
1247 }
1248 if synthesized {
1249 if let Some(v) = pm.avg_dsq_depth {
1250 put("avg_dsq_depth", v);
1251 }
1252 put("max_dsq_depth", pm.max_dsq_depth as f64);
1253 // Bucket-native counter totals for fallback / keep_last: the first
1254 // and last in-window samples carrying event counters, clamped with
1255 // the same counter_delta MonitorSummary uses (a mid-phase scheduler
1256 // restart can reset the counter, producing a negative raw delta).
1257 // phase_from_bucket re-rates these over the bucket window, matching
1258 // the read_sample representation captured buckets carry.
1259 let has_events =
1260 |s: &&crate::monitor::MonitorSample| s.cpus.iter().any(|c| c.event_counters.is_some());
1261 let first_ev = phase_monitor_samples.iter().copied().find(has_events);
1262 let last_ev = phase_monitor_samples.iter().copied().rev().find(has_events);
1263 if let (Some(first), Some(last)) = (first_ev, last_ev) {
1264 let fb = crate::monitor::counter_delta(
1265 last.sum_event_field(|e| e.select_cpu_fallback).unwrap_or(0),
1266 first
1267 .sum_event_field(|e| e.select_cpu_fallback)
1268 .unwrap_or(0),
1269 );
1270 let kl = crate::monitor::counter_delta(
1271 last.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
1272 first.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
1273 );
1274 put("total_fallback", fb as f64);
1275 put("total_keep_last", kl as f64);
1276 }
1277 }
1278}
1279
1280/// Clock skew (ms) between the host monitor's run-relative timeline and
1281/// the guest's scenario-relative stimulus timeline, computed the same
1282/// way as [`crate::timeline::Timeline::build`]: the first significant
1283/// monitor sample (elapsed > 500 ms, non-empty cpus) and the earliest
1284/// stimulus event roughly coincide at scenario start. Returns
1285/// `first_monitor_ms - first_stimulus_ms`; subtract from a monitor
1286/// sample's elapsed_ms to reach the scenario-relative (boundary-offset)
1287/// window frame. `0` when either timeline is empty (nothing to align,
1288/// so the run-relative frames are used as-is).
1289fn monitor_clock_offset(
1290 stimulus_events: &[crate::timeline::StimulusEvent],
1291 monitor_samples: &[crate::monitor::MonitorSample],
1292) -> i64 {
1293 if stimulus_events.is_empty() || monitor_samples.is_empty() {
1294 return 0;
1295 }
1296 let first_stimulus_ms = stimulus_events
1297 .iter()
1298 .map(|e| e.elapsed_ms)
1299 .min()
1300 .unwrap_or(0);
1301 let first_monitor_ms = monitor_samples
1302 .iter()
1303 .find(|s| s.elapsed_ms > 500 && !s.cpus.is_empty())
1304 .map(|s| s.elapsed_ms)
1305 .unwrap_or_else(|| monitor_samples.first().map(|s| s.elapsed_ms).unwrap_or(0));
1306 first_monitor_ms as i64 - first_stimulus_ms as i64
1307}