ktstr/assert/
phase_build.rs

1use super::*;
2
3/// Phase buckets attributed against the guest stimulus timeline, then
4/// enriched with stimulus-event-derived per-phase `iteration_rate`.
5///
6/// Unlike the plain [`build_phase_buckets`] (which groups by the
7/// bridge-stamped step_index), this re-groups each periodic capture by
8/// the guest step whose stimulus window contains the capture's
9/// workload-relative boundary offset (`Sample::boundary_offset_ms`).
10/// That offset is derived from the boundary schedule rather than the
11/// fire time, so it is immune to the deferred-fire burst that makes
12/// every capture stamp the same late CURRENT_STEP (the
13/// `phases.len() == 1` collapse). Captures with no offset (on-demand /
14/// fixture) fall back to their stamped step_index. Because the bucket
15/// windows are then workload-relative, the run-relative monitor samples
16/// are shifted by the stimulus/monitor clock skew before windowing.
17///
18/// Additionally synthesizes a capture-free `PhaseBucket`
19/// (`sample_count == 0`) for any stimulus `StepStart`-step that
20/// captured no periodic samples — the uniform whole-workload boundary
21/// placement (`compute_periodic_boundaries_ns`) is step-agnostic, so a
22/// short interior step can land zero captures and otherwise leave no
23/// bucket, silently dropping its capture-independent `iteration_rate`.
24/// The synthesized bucket carries the step's full stimulus window so its
25/// `iteration_rate` (from `StepStart`/`StepEnd` deltas) and
26/// `avg_imbalance_ratio` (from in-window monitor samples) are still
27/// recovered. The returned vec therefore holds one bucket per
28/// (captured phase ∪ `StepStart`-step), sorted by `step_index` — NOT
29/// one-per-captured-phase, so `len()` is no longer "number of captured
30/// phases".
31///
32/// The `iteration_rate` enrichment lets
33/// `crate::timeline::Timeline::from_phase_buckets` render the per-phase
34/// throughput annotation without going through the legacy
35/// `crate::timeline::Timeline::build` path.
36///
37/// For each `StepStart[k]` -> `StepEnd[k]` pair with
38/// `total_iterations: Some(_)`, the per-phase rate is
39/// `(later - earlier) / duration_s` where `duration_s` is the
40/// elapsed-ms delta BETWEEN THE TWO STIMULUS EVENTS (guest clock),
41/// not the PhaseBucket sample window. The rate is attributed to the
42/// step the EARLIER event starts (`prev.step_index`); the attribution
43/// loop skips any `is_step_end` (or `is_terminal`) `prev`, so only a
44/// StepStart is ever the earlier member. Phases that don't overlap a
45/// stimulus pair keep their PhaseBucket.metrics map unchanged (no
46/// iteration_rate key).
47///
48/// SEMANTICS: `total_iterations` is the sum of the worker handles
49/// alive at each event (see
50/// [`crate::timeline::StimulusEvent::total_iterations`]). Each step's
51/// rate is its STEP-LOCAL `StepStart[k]` -> `StepEnd[k]` delta — the
52/// step's own workers measured over its own hold — so a bucket is
53/// sourced ONLY by its own pair (the `is_step_end` guard drops the
54/// inter-step `StepEnd[k]` -> `StepStart[k+1]` pair entirely). This
55/// measures BOTH fresh-per-step workers (which read ~0 at each
56/// StepStart, so the old cross-step delta produced no rate) and
57/// persistent (Backdrop) workers (excluding the inter-step teardown
58/// wall-time a cross-step window would span). On a clean run the
59/// `(StepEnd[N], terminal)` pair is guard-skipped and the trailing
60/// `is_terminal` event is not consumed; it supplies a step's right
61/// boundary ONLY for legacy/synthetic data carrying a `ScenarioEnd`
62/// frame but no `StepEnd` frames. A sched-died step has neither frame
63/// (its early return skips both emissions), so the dead step's
64/// `StepStart` is never a `prev` with a successor and it reports no
65/// rate.
66///
67/// iteration_rate is registered as `MetricKind::Rate` with the Counter
68/// components `total_phase_iterations` / `total_phase_duration_sec` and
69/// `HigherBetter` polarity (more throughput is better). The per-step
70/// producer below emits those two components (the iteration delta and the
71/// window seconds — the ms→s `/1000` applied at the component, since
72/// `derive_rate_metrics` does a bare num/den) rather than a ready ratio,
73/// and the `derive_rate_metrics` post-pass re-derives `iteration_rate` =
74/// Σiterations / Σseconds at every in-map aggregation level. Its per-run
75/// run-scalar fold (one run's per-phase values → that run's `ext_metrics`)
76/// runs through `populate_run_ext_metrics_from_phases`, which SUMS the
77/// Counter components across phases (a synthesized zero-capture phase's
78/// components are summed in, not zero-weighted out — the run-aggregate
79/// completion of the per-step rate handling) and re-derives the rate. The
80/// cross-sidecar-run rollup `group_and_average_by` likewise re-pools via
81/// its `derive_rate_metrics` post-pass. `iteration_rate` has no cross-cgroup
82/// axis to re-pool: it is derived from run-level phase buckets and
83/// host-injected into the run `ext_metrics` by
84/// `populate_run_ext_metrics_from_phases` (the eval layer) AFTER the
85/// cross-cgroup `merge`, so `AssertResult::merge`'s worst-case
86/// (min/max-by-polarity) `ext_metrics` fold never sees its components. The rate whose components ARE per-cgroup is the separate
87/// pooled `iterations_per_cpu_sec`, re-pooled across a run's cgroups by
88/// `populate_run_pooled_iterations_per_cpu_sec` (reading `stats.cgroups`
89/// post-merge).
90///
91/// Live caller: `evaluate_vm_result` at `src/test_support/eval/mod.rs`
92/// — has both the SampleSeries and the stimulus_events vec in scope.
93pub fn build_phase_buckets_with_stimulus(
94    samples: &crate::scenario::sample::SampleSeries,
95    stimulus_events: &[crate::timeline::StimulusEvent],
96) -> Vec<PhaseBucket> {
97    let monitor_samples: &[crate::monitor::MonitorSample] =
98        samples.monitor().map(|m| m.samples()).unwrap_or(&[]);
99    // vCPU-preemption exemption window for the per-phase stall predicate,
100    // threaded into fold_monitor_into_bucket -> compute_metrics so the
101    // per-phase stuck_count uses the SAME predicate as the run-level
102    // MonitorSummary::stuck_count. 0 (no monitor) derives from CONFIG_HZ
103    // inside compute_metrics, matching from_samples_with_threshold.
104    let preemption_threshold_ns: u64 = samples
105        .monitor()
106        .map(|m| m.preemption_threshold_ns())
107        .unwrap_or(0);
108    // Re-group periodic captures by the guest step whose stimulus
109    // window contains each capture's workload-relative boundary offset,
110    // NOT the step_index stamped at (deferred) fire time — see
111    // [`crate::scenario::sample::SampleSeries::by_stimulus_phase`] for
112    // why the scheduled offset is the timing-independent truth (it
113    // survives the deferred-fire burst that collapses the stamped
114    // step_index to one phase).
115    let by_phase = samples.by_stimulus_phase(stimulus_events);
116    // Bucket windows are workload-relative (boundary-offset) here, so the
117    // monitor samples (run-relative) are shifted by the stimulus/monitor
118    // clock skew before windowing.
119    let monitor_to_window_offset_ms = monitor_clock_offset(stimulus_events, monitor_samples);
120    let mut buckets = buckets_from_grouped(
121        by_phase,
122        monitor_samples,
123        monitor_to_window_offset_ms,
124        preemption_threshold_ns,
125    );
126    let step_starts = crate::scenario::sample::step_starts_from_stimulus(stimulus_events);
127    synthesize_missing_step_buckets(
128        &mut buckets,
129        &step_starts,
130        stimulus_events,
131        monitor_samples,
132        monitor_to_window_offset_ms,
133        preemption_threshold_ns,
134    );
135    // Keep buckets in step_index order: buckets_from_grouped emits them
136    // sorted (BTreeMap key order), but the synthesize loop appends out of
137    // order. Downstream lookups resolve by step_index, but a sorted vec
138    // matches the captured-bucket invariant and keeps rendered output
139    // stable.
140    buckets.sort_by_key(|b| b.step_index);
141    fill_phase_iteration_rates(&mut buckets, stimulus_events);
142    buckets
143}
144
145/// Synthesize a `PhaseBucket` for any scenario step that has a stimulus
146/// StepStart but produced no capture bucket. Periodic boundaries are
147/// placed uniformly over the whole workload (step-agnostic — see
148/// `compute_periodic_boundaries_ns`), so a short interior step can
149/// capture zero samples and leave no bucket. The iteration_rate
150/// attribution in [`fill_phase_iteration_rates`] only mutates EXISTING
151/// buckets, so without this seam that step's capture-independent rate
152/// (derived purely from the StepStart/StepEnd total_iterations deltas,
153/// needing no capture) would be silently dropped. The synthesized bucket
154/// carries the step's true stimulus window so `fold_monitor_into_bucket`
155/// still recovers its monitor-derived imbalance; `sample_count == 0` marks
156/// it capture-free for downstream consumers. `Timeline::from_phase_buckets`
157/// COMPARES this bucket's stimulus-derived throughput across the gap
158/// (the iteration_rate is real, not a sampling artifact) but GATES the
159/// monitor-derived metrics (imbalance/dsq/fallback/keep_last) behind
160/// both sides having samples — see `detect_boundary_changes`. BASELINE
161/// (step 0) is synthesized only if a StepStart carries
162/// `step_index == 0` — it is not special-cased.
163fn synthesize_missing_step_buckets(
164    buckets: &mut Vec<PhaseBucket>,
165    step_starts: &[(u64, u16)],
166    stimulus_events: &[crate::timeline::StimulusEvent],
167    monitor_samples: &[crate::monitor::MonitorSample],
168    monitor_to_window_offset_ms: i64,
169    preemption_threshold_ns: u64,
170) {
171    for &(start_ms, k) in step_starts {
172        if buckets.iter().any(|b| b.step_index == k) {
173            continue;
174        }
175        // Right edge: the step's own StepEnd, CLAMPED to the next
176        // step-start so a non-monotonic (corrupt-wire) StepEnd[k] >=
177        // StepStart[k+1] can never extend this synthesized window past the
178        // next step's start (which would fold the same MonitorSample into
179        // two adjacent synthesized buckets). Falls back to the next
180        // step-start alone, else open-ended (the last step on data with no
181        // StepEnd — e.g. sched-died — which the rate loop leaves rateless).
182        // Earliest StepEnd elapsed for k (min, symmetric with start_ms's
183        // earliest step-start), so a duplicate/corrupt StepEnd later in the
184        // vec can't pick a wrong right edge.
185        let step_end = stimulus_events
186            .iter()
187            .filter(|e| e.is_step_end && e.step_index == Some(k))
188            .map(|e| e.elapsed_ms)
189            .min();
190        // `ms > start_ms` (strict): two DISTINCT step_index sharing the
191        // same elapsed is corrupt-wire only (build_stimulus emits StepStarts
192        // at distinct monotone guest-clock times), so the equal-elapsed
193        // window-overlap edge is out of scope — its worst case is a
194        // redundant double-fold of one monitor sample, never a drop or panic.
195        let next_start = step_starts
196            .iter()
197            .filter(|&&(ms, _)| ms > start_ms)
198            .map(|&(ms, _)| ms)
199            .min();
200        // Last-resort right edge for a last step with no StepEnd and no
201        // successor: the scenario-end terminal event, bounding the window
202        // at run-end instead of folding every trailing teardown monitor
203        // sample into the bucket. NOTE the production sched-died path emits
204        // NEITHER a StepEnd NOR a ScenarioEnd (its early return skips both),
205        // so it carries no terminal here and falls to u64::MAX — matching
206        // Timeline::build, which extends the last phase to end-of-monitor.
207        // This terminal clamp therefore only binds for legacy/synthetic
208        // data carrying a ScenarioEnd frame without StepEnd frames.
209        let terminal = stimulus_events
210            .iter()
211            .find(|e| e.is_terminal)
212            .map(|e| e.elapsed_ms);
213        let end_ms = match (step_end, next_start) {
214            (Some(se), Some(ns)) => se.min(ns),
215            (Some(se), None) => se,
216            (None, Some(ns)) => ns,
217            (None, None) => terminal.unwrap_or(u64::MAX),
218        };
219        // BASELINE (step 0) is synthesized only if a StepStart carries
220        // step_index == 0; build_stimulus 1-indexes scenario Steps (Step 0
221        // -> step_index 1), so it never emits step_index 0 and BASELINE is
222        // intentionally never synthesized (its pre-first-Step settle window
223        // carries no meaningful per-step rate). The branch is kept for the
224        // convention, not for a production path.
225        let label = if k == 0 {
226            "BASELINE".to_string()
227        } else {
228            format!("Step[{}]", k.saturating_sub(1))
229        };
230        let mut bucket = PhaseBucket {
231            step_index: k,
232            label,
233            start_ms,
234            end_ms,
235            sample_count: 0,
236            metrics: std::collections::BTreeMap::new(),
237            per_cgroup: std::collections::BTreeMap::new(),
238        };
239        fold_monitor_into_bucket(
240            &mut bucket,
241            monitor_samples,
242            monitor_to_window_offset_ms,
243            preemption_threshold_ns,
244        );
245        buckets.push(bucket);
246    }
247}
248
249/// Fill each phase bucket's `iteration_rate` Rate components from the
250/// stimulus event `total_iterations` deltas. Walk events pairwise; for
251/// each pair compute the rate. Sort events by elapsed_ms first so an
252/// out-of-order arrival from the bulk-port drain doesn't silently lose the
253/// delta to saturating_sub (the legacy Timeline::build path at
254/// src/timeline.rs sorts the same way; without the sort, an inversion
255/// produces duration_ms == 0 → skipped, a silent drop).
256///
257/// Must run AFTER the synthesize seam and the step_index sort so it sees
258/// the full, ordered bucket set exactly as the returned vec will hold it.
259fn fill_phase_iteration_rates(
260    buckets: &mut [PhaseBucket],
261    stimulus_events: &[crate::timeline::StimulusEvent],
262) {
263    let mut sorted_events: Vec<&crate::timeline::StimulusEvent> = stimulus_events.iter().collect();
264    // Total-order on an elapsed_ms tie: StepEnd before StepStart
265    // (`!is_step_end` is false=0 for StepEnd) so a zero-length inter-step
266    // gap at the guest's coarse-ms clock attributes the step-local
267    // StepStart[k]->StepEnd[k] rate to bucket k, never the cross-step
268    // StepStart[k]->StepStart[k+1] delta (the is_step_end guard below only
269    // stops StepEnd from being `prev`; it does not order a StepEnd before
270    // the next StepStart on a tie). Mirrors Timeline::build's sort.
271    sorted_events.sort_by_key(|e| (e.elapsed_ms, !e.is_step_end));
272    for w in sorted_events.windows(2) {
273        let prev = w[0];
274        let curr = w[1];
275        // The terminal scenario-end event and per-step StepEnd events are
276        // right boundaries only — never the EARLIER member of a pair (the
277        // step a rate is attributed to). Both carry an end-of-hold count,
278        // not a step-start, so pairing them as `prev` would attribute the
279        // WRONG window to a bucket:
280        // - terminal sorts last so it is naturally only ever `curr`, but
281        //   guard explicitly so a future caller producing a non-last or
282        //   duplicate terminal can't fall into the `None` step_index
283        //   timestamp-window branch below and attach a bogus rate.
284        // - StepEnd[k] carries step_index k (same as StepStart[k]); if its
285        //   step's own StepStart[k] -> StepEnd[k] pair returned None (a
286        //   BACKWARD count, e < s — a counter reset; a stalled step e == s
287        //   instead yields a measured-ZERO rate, see
288        //   StimulusEvent::rate_components), the bucket k key is still empty,
289        //   so without this guard the next pair StepEnd[k] -> StepStart[k+1]
290        //   would or_insert an inter-step teardown-gap rate into bucket k,
291        //   mislabeling gap throughput as step k's. The guard keeps bucket k
292        //   sourced ONLY by its own StepStart[k] -> StepEnd[k] pair (whether
293        //   a measured zero or a real rate), never a leaked gap rate.
294        if prev.is_terminal || prev.is_step_end {
295            continue;
296        }
297        // Emit the iteration_rate Rate's two components (the per-step
298        // iteration delta + window seconds) rather than the ready ratio, so
299        // derive_rate_metrics re-pools Σiters/Σseconds at every aggregation
300        // level. rate_components is None only when the count went BACKWARD
301        // (e < s, a counter reset), an event lacks total_iterations, or the
302        // window is zero-length; a non-advancing count (e == s over a
303        // positive window) yields Some((0.0, secs)) — a measured zero, not
304        // None (the same shared formula Timeline::build's rate_to divides
305        // for display).
306        let Some((iters, secs)) = prev.rate_components(curr) else {
307            continue;
308        };
309        // Attribute the rate to the step PREV starts: the guards above
310        // leave `prev` as a StepStart (or a legacy/synthetic step event),
311        // so the pair (prev, curr) measures iterations accumulated from
312        // prev's step-start to curr's — the throughput DURING prev's step.
313        // Match by prev.step_index, NOT a timestamp window: the bucket
314        // windows here are workload-relative capture offsets (the step
315        // INTERIOR, 10-90% of the workload per
316        // compute_periodic_boundaries_ns), so prev.elapsed_ms — the
317        // step-START — lands in the inter-step gap, inside no bucket's
318        // [start_ms, end_ms) window; a window match would drop every
319        // rate. This mirrors the legacy Timeline::build alignment:
320        // phase[i] gets events[i]→events[i+1] where events[i] is
321        // phase[i]'s left boundary = prev.
322        //
323        // Stimulus events without a step stamp (step_index == None:
324        // legacy / synthetic callers) fall back to the timestamp
325        // window — half-open [start_ms, end_ms) with a single-sample
326        // (start == end) equality carve-out so a boundary event lands
327        // in exactly one bucket.
328        for bucket in buckets.iter_mut() {
329            let in_bucket = match prev.step_index {
330                Some(k) => bucket.step_index == k,
331                None => {
332                    if bucket.start_ms == bucket.end_ms {
333                        prev.elapsed_ms == bucket.start_ms
334                    } else {
335                        prev.elapsed_ms >= bucket.start_ms && prev.elapsed_ms < bucket.end_ms
336                    }
337                }
338            };
339            if in_bucket {
340                // Both components come from the SAME (prev, curr) pair and are
341                // inserted together; or_insert keeps the first pair that
342                // matched this bucket (mirrors the prior first-wins
343                // iteration_rate attribution). The derive_rate_metrics
344                // post-pass below produces iteration_rate from them.
345                bucket
346                    .metrics
347                    .entry("total_phase_iterations".to_string())
348                    .or_insert(iters);
349                bucket
350                    .metrics
351                    .entry("total_phase_duration_sec".to_string())
352                    .or_insert(secs);
353                break;
354            }
355        }
356    }
357    // Stimulus/monitor components are injected ABOVE, AFTER
358    // buckets_from_grouped's own derive_rate_metrics already ran; re-derive
359    // here so a Rate over a post-injected component is produced for these
360    // buckets too. Idempotent: re-deriving an unchanged map is a no-op.
361    for bucket in buckets.iter_mut() {
362        crate::stats::derive_rate_metrics(&mut bucket.metrics);
363    }
364}
365
366/// Build per-phase metric buckets from a sample series.
367///
368/// Walks [`crate::scenario::sample::SampleSeries::by_stamped_phase`]
369/// to group every stamped sample under its bridge-stamped
370/// `step_index` (NOT re-derived from elapsed-ms windows; the
371/// bridge stamp is authoritative because the capture path knows
372/// the phase it fired from while the time window cannot recover
373/// the phase when stimulus events arrive late or out of order).
374///
375/// For each phase observed (BASELINE under `step_index = 0`,
376/// scenario Steps under `step_index = 1..=N` per the 1-indexed
377/// phase convention) emits one [`PhaseBucket`] with `step_index`
378/// as the key, `label` derived per the BASELINE/Step\[k\]
379/// convention, `start_ms` / `end_ms` from the first / last
380/// sample's `elapsed_ms`, `sample_count` from the bucketed
381/// samples, and `metrics` from the per-kind reduction described
382/// on [`PhaseBucket`]. Metrics whose per-sample reading returns
383/// `None` for every sample in the bucket are omitted entirely
384/// (absent → "no data") rather than collapsed to `Some(0.0)`
385/// (real zero), preserving the sentinel-free contract.
386///
387/// Returns an empty `Vec` when the input series is empty (no
388/// samples captured), distinct from returning a single empty
389/// BASELINE bucket — the former means the periodic-capture path
390/// never fired, the latter means it fired but no metric reading
391/// came back.
392///
393/// No live production caller: `evaluate_vm_result` in
394/// `src/test_support/eval/mod.rs` builds its phase buckets through the
395/// stimulus-aware sibling `build_phase_buckets_with_stimulus` (via
396/// `precompute_early_series`), and `AssertResult.stats.phases` is
397/// populated from those stimulus buckets. This plain variant is
398/// reachable only by full path, for the rare no-stimulus-timeline
399/// case. Exposed `pub` (not `pub(crate)`)
400/// so out-of-tree consumers — payload authors writing custom
401/// eval paths against the publicly-drainable
402/// `result.snapshot_bridge` — can produce the same per-phase
403/// aggregate shape without re-implementing the bucketing logic.
404pub fn build_phase_buckets(samples: &crate::scenario::sample::SampleSeries) -> Vec<PhaseBucket> {
405    // Borrowed per-tick monitor samples (None when no MonitorReport
406    // was attached, e.g. host-only fixture tests).
407    let monitor_samples: &[crate::monitor::MonitorSample] =
408        samples.monitor().map(|m| m.samples()).unwrap_or(&[]);
409    // vCPU-preemption exemption window for the per-phase stall predicate
410    // (see build_phase_buckets_with_stimulus). 0 (no monitor) derives from
411    // CONFIG_HZ inside compute_metrics.
412    let preemption_threshold_ns: u64 = samples
413        .monitor()
414        .map(|m| m.preemption_threshold_ns())
415        .unwrap_or(0);
416    // Group by the bridge-stamped step_index. Without a stimulus
417    // timeline to remap against, the stamped index is the only phase
418    // signal available — see `build_phase_buckets_with_stimulus` (and
419    // `SampleSeries::by_stimulus_phase`) for the offset-remap that
420    // corrects a deferred-fire burst (every capture stamping the same
421    // late CURRENT_STEP). The bucket window and the monitor folding
422    // share the run-relative frame here, so the clock offset is `0`;
423    // synthetic / legacy fixture samples carry `boundary_offset_ms =
424    // None` and the window falls back to `elapsed_ms`.
425    buckets_from_grouped(
426        samples.by_stamped_phase(),
427        monitor_samples,
428        0,
429        preemption_threshold_ns,
430    )
431}
432
433/// Per-phase CPU-time delta (ns) for one field family
434/// (`stime`/`signal_stime` or `utime`/`signal_utime`), folded host-side
435/// from the frozen `task_struct` enrichments captured at the phase's
436/// freeze boundaries. Backs the injected `system_time_ns` /
437/// `user_time_ns` per-phase metrics.
438///
439/// The unit is the THREAD GROUP, not the individual task: the kernel's
440/// `thread_group_cputime` is `signal_struct.{u,s}time` (the accumulator
441/// a dying thread's time is folded into at exit) plus the live threads'
442/// `task_struct.{u,s}time`. `signal_struct` is shared across a thread
443/// group, so its value is counted once per `tgid`; the live counters are
444/// summed across the group's threads. For each `tgid` the group total is
445/// taken at the FIRST and LAST sample in which the group had a READABLE
446/// total (ordered by capture time) and `last - first` is summed across
447/// groups.
448///
449/// Per-group first-seen/last-seen, NOT a per-sample cross-task SUM then a
450/// Counter `last - first`: the captured set changes between freezes
451/// (system tasks churn in and out). A task carrying a large cumulative
452/// counter that appears only in a LATER sample would dump its entire
453/// pre-phase history into a summed delta, inflating the phase value
454/// many-fold. Subtracting each group's OWN first-seen total cancels its
455/// pre-phase history, so a late-joining group contributes only what it
456/// accrued while observed — bounding the result by wall-clock × cores. A
457/// group's thread exiting mid-phase does not dip the total: its time
458/// moves from a `task_struct` counter into `signal_struct`, both of which
459/// the group total includes.
460///
461/// A `None` `signal_field` is a `signal_struct` translate miss, NOT a
462/// real zero (which reads `Some(0)`): such a sample is omitted for that
463/// group so every endpoint is a full live+signal total — mixing a
464/// live-only endpoint with a live+signal one would otherwise leak the
465/// cumulative accumulator as a phantom positive. A numeric `tgid` reused
466/// within the phase (process exit + PID realloc) can read lower at last
467/// than first; `saturating_sub` clamps that to 0 rather than wrapping.
468///
469/// Returns `None` when no group was observed with a readable total across
470/// at least two samples — no delta is measurable — keeping an absent
471/// per-phase bucket key distinct from a real `0` (a qualifying group
472/// whose counters did not advance yields `Some(0.0)`). Accumulates in
473/// `u128` to stay exact before the final `f64` (a phase can total many
474/// task-seconds of ns).
475fn phase_group_cpu_delta(
476    samples: &[crate::scenario::sample::Sample<'_>],
477    task_field: impl Fn(&crate::monitor::task_enrichment::TaskEnrichment) -> u64,
478    signal_field: impl Fn(&crate::monitor::task_enrichment::TaskEnrichment) -> Option<u64>,
479) -> Option<f64> {
480    // Per-tgid `thread_group_cputime` total at one sample: the shared
481    // signal_struct accumulator (once per group) plus the sum of the
482    // group's live threads' task_struct counter. A group is INCLUDED for
483    // a sample only when its signal accumulator was readable there (some
484    // thread of the tgid carried `Some`): a `None` is a signal_struct
485    // translate miss (see `task_enrichment.rs`), NOT a real zero — a real
486    // zero reads `Some(0)`. Omitting unreadable-signal groups keeps every
487    // endpoint a FULL group total (live + signal), so the first/last delta
488    // can never mix a live-only endpoint with a live+signal endpoint and
489    // leak the cumulative accumulator as a phantom positive.
490    let group_totals = |s: &crate::scenario::sample::Sample<'_>| {
491        let mut live: std::collections::HashMap<i32, u128> = std::collections::HashMap::new();
492        let mut signal: std::collections::HashMap<i32, Option<u128>> =
493            std::collections::HashMap::new();
494        for t in s.snapshot.task_enrichments() {
495            *live.entry(t.tgid).or_insert(0) += u128::from(task_field(t));
496            match signal_field(t) {
497                // Shared across the group — any readable thread fixes it.
498                Some(v) => {
499                    signal.insert(t.tgid, Some(u128::from(v)));
500                }
501                None => {
502                    signal.entry(t.tgid).or_insert(None);
503                }
504            }
505        }
506        live.into_iter()
507            .filter_map(|(tgid, l)| {
508                signal
509                    .get(&tgid)
510                    .copied()
511                    .flatten()
512                    .map(|sig| (tgid, l + sig))
513            })
514            .collect::<std::collections::HashMap<i32, u128>>()
515    };
516
517    // Order by capture time so first/last are the earliest/latest
518    // boundary — the grouped vec is not guaranteed sorted (the
519    // offset-remap in the stimulus path can reorder samples).
520    let mut ordered: Vec<&crate::scenario::sample::Sample<'_>> = samples.iter().collect();
521    // `elapsed_ms` is now Option: a sample with neither a
522    // boundary offset nor a measured elapsed has no time anchor — sort
523    // it LAST (u64::MAX), never first, so an untimestamped sample can't
524    // become the spurious earliest first_seen endpoint.
525    ordered.sort_by_key(|s| s.boundary_offset_ms.or(s.elapsed_ms).unwrap_or(u64::MAX));
526
527    // Per tgid: its full group total at the first and last sample in which
528    // it had a readable total, and how many such samples it had.
529    let mut first_seen: std::collections::HashMap<i32, u128> = std::collections::HashMap::new();
530    let mut last_seen: std::collections::HashMap<i32, u128> = std::collections::HashMap::new();
531    let mut readable_count: std::collections::HashMap<i32, u32> = std::collections::HashMap::new();
532    for s in ordered {
533        for (tgid, total) in group_totals(s) {
534            first_seen.entry(tgid).or_insert(total);
535            last_seen.insert(tgid, total);
536            *readable_count.entry(tgid).or_insert(0) += 1;
537        }
538    }
539
540    let mut sum: u128 = 0;
541    let mut measured = false;
542    for (tgid, last) in &last_seen {
543        // A delta needs the group observed with a readable total across
544        // TWO boundaries; one readable sample gives first == last and no
545        // measurable in-phase growth (and a tgid that appears in only one
546        // sample, or whose signal_struct never translated, never reaches
547        // 2). saturating_sub clamps a last < first read — a numeric tgid
548        // reused within the phase (process exit + PID realloc to a fresh
549        // group starting near zero) reads lower at last; clamp to 0 rather
550        // than wrap.
551        if readable_count.get(tgid).copied().unwrap_or(0) < 2 {
552            continue;
553        }
554        measured = true;
555        // first_seen always holds tgid (populated in the same pass).
556        let first = first_seen.get(tgid).copied().unwrap_or(*last);
557        sum += last.saturating_sub(first);
558    }
559    // None when no group was observed with a readable total across two
560    // samples — unmeasurable, so the bucket key stays ABSENT (distinct
561    // from a real 0: a qualifying group whose counters did not advance
562    // yields Some(0.0)).
563    measured.then_some(sum as f64)
564}
565
566/// Per-phase per-CPU spatial-max fold for a monotonic per-CPU counter: the
567/// BUSIEST CPU's delta over the phase (`max_key`) + the concentration ratio
568/// `max / mean` over the reporting CPUs (`concentration_key`). A custom
569/// per-CPU-delta fold — NOT a read_sample arm (read_sample yields the cross-CPU
570/// SUM as one f64, with no per-CPU vector). `field` selects the per-CPU counter
571/// (`|c| c.irqs_sum` for hardirqs, `|c| c.softirqs[NET_RX]` for NET_RX softirqs).
572///
573/// Endpoints are the earliest + latest freeze BY `win` (the SAME key the bucket
574/// window uses; `samples_in_phase` is NOT positionally ordered — the stimulus
575/// offset-remap can reorder it). The per-CPU delta is correlated by the
576/// per_cpu_time `cpu` FIELD (not vec position) over the CPUs present in BOTH
577/// endpoints; a CPU absent from either is skipped (hotplug defense — a no-op on
578/// ktstr's boot-fixed vCPU set, correct-by-construction if a hot-add Op lands).
579/// `saturating_sub` clamps a per-CPU counter regress (the kernel counters are
580/// monotonic; a regress would signal a reset). The per-CPU delta is taken
581/// FIRST, the spatial max SECOND — so a busiest CPU that SHIFTS across the phase
582/// is captured as max(per-CPU deltas), not a delta of spatial maxes.
583///
584/// Loud-absent: `max_key` needs at least 2 freezes spanning a positive `win`
585/// window and at least 1 defined-delta CPU; `concentration_key` ADDITIONALLY
586/// needs at least 2 defined-delta CPUs (a 1-CPU intersection makes max/mean == 1
587/// a structural artifact) and a positive mean (a NaN would fail the sidecar's
588/// is_finite insert-guard).
589fn fold_per_cpu_spatial_max(
590    metrics: &mut std::collections::BTreeMap<String, f64>,
591    samples_in_phase: &[crate::scenario::sample::Sample<'_>],
592    win: impl Fn(&crate::scenario::sample::Sample<'_>) -> Option<u64>,
593    field: impl Fn(&crate::monitor::dump::PerCpuTimeStats) -> u64,
594    max_key: &str,
595    concentration_key: &str,
596) {
597    let placed: Vec<(&crate::scenario::sample::Sample<'_>, u64)> = samples_in_phase
598        .iter()
599        .filter(|s| !s.snapshot.per_cpu_time().is_empty())
600        .filter_map(|s| win(s).map(|w| (s, w)))
601        .collect();
602    if let (Some(&(first_s, fw)), Some(&(last_s, lw))) = (
603        placed.iter().min_by_key(|(_, w)| *w),
604        placed.iter().max_by_key(|(_, w)| *w),
605    ) && fw < lw
606    {
607        // Per-CPU delta over the cpu-field intersection of the two endpoint
608        // freezes; clamp a regress to 0.
609        let deltas: Vec<f64> = first_s
610            .snapshot
611            .per_cpu_time()
612            .iter()
613            .filter_map(|c0| {
614                last_s
615                    .snapshot
616                    .per_cpu_time_at(c0.cpu)
617                    .map(|c1| field(c1).saturating_sub(field(c0)) as f64)
618            })
619            .collect();
620        if let Some(max) = deltas.iter().copied().reduce(f64::max) {
621            metrics.entry(max_key.to_string()).or_insert(max);
622            if deltas.len() >= 2 {
623                let mean = deltas.iter().sum::<f64>() / deltas.len() as f64;
624                if mean > 0.0 {
625                    metrics
626                        .entry(concentration_key.to_string())
627                        .or_insert(max / mean);
628                }
629            }
630        }
631    }
632}
633
634/// Per-CPU scx_layered util-compensation SCALE over a freeze span: the factor by
635/// which a CPU's useful-work (non-overhead) capacity is scaled up to compensate
636/// for IRQ / softirq / stolen time. Over the `first`->`last` cpustat delta,
637/// `scale = delta_total / available` where `delta_total` is the sum of ALL 8
638/// `kernel_cpustat[]` ns slots (user+nice+system+idle+iowait+irq+softirq+steal)
639/// and `available = delta_total - (irq + softirq + steal)`. Clamped to
640/// `[1.0, 20.0]`; `available == 0` (overhead consumed the whole window, or no
641/// forward progress) returns the floor `1.0` (no compensation) — scx_layered's
642/// `available > 0` guard, expressed over `u64` saturating deltas where `> 0`
643/// is `!= 0`.
644///
645/// Byte-faithful to scx_layered's `util_compensation` per-CPU compute. The
646/// ns-vs-µs unit is immaterial: the ratio cancels it (scx_layered reads /proc
647/// microseconds; here the host reads `kernel_cpustat` ns, the same slots
648/// `/proc/stat` formats from). Hostile-guest hardening: every per-slot delta is
649/// `saturating_sub` (a cpustat counter that regresses across the span — a
650/// CPU-hotplug reset, or a corrupt read — clamps to 0, never wraps to
651/// ~`u64::MAX`), exactly as scx_layered does. The one divergence from
652/// scx_layered: the 8-slot `delta_total` and 3-slot `overhead` sums use
653/// `saturating_add` (a hostile per-slot `u64::MAX` clamps the total, mirroring
654/// the per-CPU IRQ spatial sums in [`crate::stats::MetricDef::read_sample`]),
655/// where scx_layered combines its per-slot saturating-subs with plain `+` — its
656/// /proc inputs are trusted, ours are guest-controlled. With `overhead >= 0` the
657/// ratio is always `>= 1.0`, so the clamp's floor is belt-and-suspenders and the
658/// cap at 20.0 is the only active bound.
659pub(crate) fn cpu_util_comp_scale(
660    first: &crate::monitor::dump::PerCpuTimeStats,
661    last: &crate::monitor::dump::PerCpuTimeStats,
662) -> f64 {
663    let user = last.cpustat_user_ns.saturating_sub(first.cpustat_user_ns);
664    let nice = last.cpustat_nice_ns.saturating_sub(first.cpustat_nice_ns);
665    let system = last
666        .cpustat_system_ns
667        .saturating_sub(first.cpustat_system_ns);
668    let idle = last.cpustat_idle_ns.saturating_sub(first.cpustat_idle_ns);
669    let iowait = last
670        .cpustat_iowait_ns
671        .saturating_sub(first.cpustat_iowait_ns);
672    let irq = last.cpustat_irq_ns.saturating_sub(first.cpustat_irq_ns);
673    let softirq = last
674        .cpustat_softirq_ns
675        .saturating_sub(first.cpustat_softirq_ns);
676    let steal = last.cpustat_steal_ns.saturating_sub(first.cpustat_steal_ns);
677    let delta_total = [user, nice, system, idle, iowait, irq, softirq, steal]
678        .into_iter()
679        .fold(0u64, u64::saturating_add);
680    let overhead = irq.saturating_add(softirq).saturating_add(steal);
681    let available = delta_total.saturating_sub(overhead);
682    if available == 0 {
683        return 1.0;
684    }
685    (delta_total as f64 / available as f64).clamp(1.0, 20.0)
686}
687
688/// Per-phase mean ACROSS CPUs of the [`cpu_util_comp_scale`] over the
689/// `per_cpu_time` freeze span, inserted as `avg_cpu_util_comp_scale`. Endpoint
690/// selection (first/last by `win`), the cpu-field intersection (per-CPU paired
691/// by the `cpu` field; a CPU absent from either endpoint is skipped — hotplug
692/// churn), the saturating deltas, and the loud-absent discipline (no key when
693/// no CPU pairs or the window is degenerate, `fw == lw`) mirror
694/// [`fold_per_cpu_spatial_max`]. `Gauge(Avg)`, so it auto-folds to run-level
695/// (weighted mean across phases) and cross-run with no extra wiring.
696///
697/// Cross-boot caveat: `samples_in_phase` are the periodic freezes, whose
698/// window starts at `max(scenario-start, prereqs-ready)` — on a cold boot
699/// (accessor/attach lag) it starts later, so this key reflects a later
700/// workload sub-window. Across a cold/warm boot mix its cross-run magnitude
701/// shifts; use `--noise-adjust` (spread analysis absorbs the boot-timing
702/// jitter) for cross-run compares — a plain single-pair (cached) compare
703/// of this key is advisory.
704fn fold_util_comp_scale(
705    metrics: &mut std::collections::BTreeMap<String, f64>,
706    samples_in_phase: &[crate::scenario::sample::Sample<'_>],
707    win: impl Fn(&crate::scenario::sample::Sample<'_>) -> Option<u64>,
708) {
709    let placed: Vec<(&crate::scenario::sample::Sample<'_>, u64)> = samples_in_phase
710        .iter()
711        .filter(|s| !s.snapshot.per_cpu_time().is_empty())
712        .filter_map(|s| win(s).map(|w| (s, w)))
713        .collect();
714    if let (Some(&(first_s, fw)), Some(&(last_s, lw))) = (
715        placed.iter().min_by_key(|(_, w)| *w),
716        placed.iter().max_by_key(|(_, w)| *w),
717    ) && fw < lw
718    {
719        // Per-CPU clamped scale over the cpu-field intersection of the two
720        // endpoint freezes, then the arithmetic mean across the reporting CPUs.
721        let scales: Vec<f64> = first_s
722            .snapshot
723            .per_cpu_time()
724            .iter()
725            .filter_map(|c0| {
726                last_s
727                    .snapshot
728                    .per_cpu_time_at(c0.cpu)
729                    .map(|c1| cpu_util_comp_scale(c0, c1))
730            })
731            .collect();
732        if !scales.is_empty() {
733            let mean = scales.iter().sum::<f64>() / scales.len() as f64;
734            metrics
735                .entry("avg_cpu_util_comp_scale".to_string())
736                .or_insert(mean);
737        }
738    }
739}
740
741/// Per-phase per-task latency-criticality aggregate from the scheduler's
742/// `sdt_alloc` arena (scx_lavd's `task_ctx.normalized_lat_cri`, `[0, 1024]`).
743/// Over every freeze's `sdt_allocations` walk, pulls each live task_ctx's
744/// `normalized_lat_cri` — a host-rendered arena field (BPF_MAP_TYPE_ARENA), NOT
745/// a kernel counter and NOT a BPF `.bss` field — and folds across (freeze,
746/// task): mean -> `avg_task_lat_cri`, max -> `max_task_lat_cri`.
747///
748/// A GAUGE: `normalized_lat_cri` is an instantaneous per-task value lavd
749/// recomputes each schedule (scx_lavd `lat_cri.bpf.c`), so the fold is over ALL
750/// samples (mean / max over every (freeze, task) observation), NOT a first/last
751/// delta like the cpustat folds. `normalized` (not raw `lat_cri`) for cross-run
752/// comparability — raw `lat_cri` is squared + waker/wakee-propagated +
753/// load-dependent, so its magnitude is not comparable across runs.
754///
755/// Loud-absent + scheduler-agnostic by construction: a non-lavd payload has no
756/// `normalized_lat_cri` member, so [`crate::monitor::btf_render::RenderedValue::get`]
757/// returns `None` and the entry is skipped; if no entry yields the field, no key
758/// is inserted (the "no key when absent" discipline the sibling folds use). The
759/// member name is the only gate (lavd-unique) — no scheduler-name hardcoding.
760///
761/// Cross-boot caveat: like `fold_util_comp_scale`, these are the periodic
762/// freezes, whose window starts later on a cold boot (the prereq-ready
763/// anchor), so `avg_task_lat_cri`'s cross-run magnitude shifts across a
764/// cold/warm boot mix; use `--noise-adjust` for cross-run compares —
765/// a single-pair (cached) compare is advisory.
766fn fold_lat_cri(
767    metrics: &mut std::collections::BTreeMap<String, f64>,
768    samples_in_phase: &[crate::scenario::sample::Sample<'_>],
769) {
770    let mut sum = 0.0f64;
771    let mut count = 0usize;
772    let mut max = f64::MIN;
773    for s in samples_in_phase {
774        for alloc in &s.snapshot.report().sdt_allocations {
775            for entry in &alloc.entries {
776                if let Some(v) = entry
777                    .payload
778                    .get("normalized_lat_cri")
779                    .and_then(|r| r.as_u64())
780                {
781                    let v = v as f64;
782                    sum += v;
783                    count += 1;
784                    if v > max {
785                        max = v;
786                    }
787                }
788            }
789        }
790    }
791    if count > 0 {
792        metrics
793            .entry("avg_task_lat_cri".to_string())
794            .or_insert(sum / count as f64);
795        metrics.entry("max_task_lat_cri".to_string()).or_insert(max);
796    }
797}
798
799/// Per-phase per-CGROUP spatial axis over the workload-leaf PSI-irq captured at
800/// each freeze (`Snapshot::cgroup_psi`). The per-cgroup analog of
801/// [`fold_per_cpu_spatial_max`], producing three Peak metrics:
802///
803/// - `max_cgroup_psi_irq_avg10`: the worst leaf's IRQ-full pressure GAUGE
804///   (decoded avg10 percent) — per-freeze max across leaves, then max across the
805///   phase's freezes. A gauge, so no delta/baseline (a spatial-max of an
806///   instantaneous reading, the `max_avg_irq_util` shape on the cgroup axis).
807/// - `max_cgroup_irq_pressure`: the busiest leaf's IRQ-full stall DELTA over the
808///   phase (decoded µs) — per-leaf (last - first) `total_ns`, correlated by
809///   `cgroup_kva`, then the spatial max. A monotonic Counter, so delta-FIRST
810///   then spatial-max (the `max_cpu_hardirqs` shape). Informational
811///   (workload-confounded magnitude).
812/// - `max_cgroup_irq_pressure_concentration`: `max_cgroup_irq_pressure` / the
813///   mean per-leaf delta over the SAME leaf set — the busiest cell's share (the
814///   isolation/steering signal). max/MEAN, LowerBetter (the
815///   `max_cpu_hardirq_concentration` shape). Absent when < 2 reporting leaves or
816///   mean == 0.
817///
818/// Endpoints + the leaf intersection + saturating + loud-absent rules mirror
819/// [`fold_per_cpu_spatial_max`] (the `(cgroup_kva, serial_nr)` pair is the
820/// per-CPU `cpu`-field analog — the serial_nr disambiguates a freed slab KVA
821/// reused by a NEW cgroup, which `cgroup_kva` alone cannot; a leaf absent from
822/// either endpoint is skipped — leaf churn / a cgroup created mid-phase). All
823/// three are Peak → they auto-fold max-across-phases to run-level and cross-run
824/// MAX, no extra wiring.
825fn fold_per_cgroup_psi(
826    metrics: &mut std::collections::BTreeMap<String, f64>,
827    samples_in_phase: &[crate::scenario::sample::Sample<'_>],
828    win: impl Fn(&crate::scenario::sample::Sample<'_>) -> Option<u64>,
829) {
830    use crate::monitor::btf_offsets::{decode_avg10_percent, decode_total_us};
831    // avg10 gauge: per-freeze max across leaves, then max across the freezes.
832    let avg10_peak = samples_in_phase
833        .iter()
834        .filter_map(|s| {
835            s.snapshot
836                .cgroup_psi()
837                .iter()
838                .map(|c| decode_avg10_percent(c.avg10_raw))
839                .reduce(f64::max)
840        })
841        .reduce(f64::max);
842    if let Some(v) = avg10_peak {
843        metrics
844            .entry("max_cgroup_psi_irq_avg10".to_string())
845            .or_insert(v);
846    }
847
848    // total-delta spatial-max + concentration. Earliest + latest freeze by `win`
849    // (the bucket-window key; samples_in_phase is not positionally ordered). The
850    // per-leaf delta is correlated by `(cgroup_kva, serial_nr)` over the leaves
851    // present in BOTH endpoints; a leaf absent from either is skipped. The
852    // serial_nr guards KVA REUSE: a leaf rmdir'd mid-phase whose freed slab KVA a
853    // NEW cgroup then reused would alias by KVA alone and yield a bogus
854    // cross-cgroup delta — a different serial_nr at the same KVA is treated as
855    // absent (dropped), not differenced. `saturating_sub` then clamps only a
856    // (rare) genuine same-cgroup counter regress. delta FIRST, spatial max
857    // SECOND — a busiest cell that shifts across the phase is captured as
858    // max(per-leaf deltas), not a delta of spatial maxes.
859    let placed: Vec<(&crate::scenario::sample::Sample<'_>, u64)> = samples_in_phase
860        .iter()
861        .filter(|s| !s.snapshot.cgroup_psi().is_empty())
862        .filter_map(|s| win(s).map(|w| (s, w)))
863        .collect();
864    if let (Some(&(first_s, fw)), Some(&(last_s, lw))) = (
865        placed.iter().min_by_key(|(_, w)| *w),
866        placed.iter().max_by_key(|(_, w)| *w),
867    ) && fw < lw
868    {
869        let deltas: Vec<f64> = first_s
870            .snapshot
871            .cgroup_psi()
872            .iter()
873            .filter_map(|c0| {
874                last_s
875                    .snapshot
876                    .cgroup_psi()
877                    .iter()
878                    .find(|c1| c1.cgroup_kva == c0.cgroup_kva && c1.serial_nr == c0.serial_nr)
879                    .map(|c1| decode_total_us(c1.total_ns.saturating_sub(c0.total_ns)))
880            })
881            .collect();
882        if let Some(max) = deltas.iter().copied().reduce(f64::max) {
883            metrics
884                .entry("max_cgroup_irq_pressure".to_string())
885                .or_insert(max);
886            if deltas.len() >= 2 {
887                let mean = deltas.iter().sum::<f64>() / deltas.len() as f64;
888                if mean > 0.0 {
889                    metrics
890                        .entry("max_cgroup_irq_pressure_concentration".to_string())
891                        .or_insert(max / mean);
892                }
893            }
894        }
895    }
896}
897
898/// Assemble [`PhaseBucket`]s from a pre-grouped phase map. Shared by
899/// [`build_phase_buckets`] (grouping by the bridge-stamped step_index)
900/// and [`build_phase_buckets_with_stimulus`] (grouping by the
901/// offset-remapped step).
902///
903/// `monitor_to_window_offset_ms` is subtracted from each
904/// [`crate::monitor::MonitorSample`] `elapsed_ms` before the window
905/// test, bringing the monitor sample's run-relative timestamp into the
906/// bucket-window frame: `0` when both share the run-relative frame, the
907/// stimulus/monitor clock skew (see [`monitor_clock_offset`]) when the
908/// window is workload-relative (boundary-offset) but the monitor samples
909/// remain run-relative.
910///
911/// Each bucket folds the monitor samples whose (shifted) elapsed_ms
912/// lands in the bucket window — supplying metrics like
913/// `avg_imbalance_ratio` that need per-CPU full-class `rq.nr_running`,
914/// which the bridge-captured Snapshot does not expose (Snapshot carries
915/// scx_rq.nr_running only).
916///
917/// `preemption_threshold_ns` is forwarded to `fold_monitor_into_bucket`
918/// (and thence `compute_metrics`) for the per-phase stall predicate; see
919/// `fold_monitor_into_bucket`.
920fn buckets_from_grouped(
921    by_phase: std::collections::BTreeMap<u16, Vec<crate::scenario::sample::Sample<'_>>>,
922    monitor_samples: &[crate::monitor::MonitorSample],
923    monitor_to_window_offset_ms: i64,
924    preemption_threshold_ns: u64,
925) -> Vec<PhaseBucket> {
926    let mut out: Vec<PhaseBucket> = Vec::with_capacity(by_phase.len());
927    for (step_index, samples_in_phase) in by_phase {
928        let label = if step_index == 0 {
929            "BASELINE".to_string()
930        } else {
931            // Scenario-Step ordinal lives at `step_index - 1`
932            // because phase 0 is BASELINE under the 1-indexed
933            // encoding; saturate at 0 if the underflow guard
934            // ever fires (unreachable for the current encoding
935            // — step_index here came from the bucket key so the
936            // `> 0` branch is satisfied — but keep the guard so
937            // a future caller that hands in a synthetic
938            // `step_index = 0` does not panic).
939            format!("Step[{}]", step_index.saturating_sub(1))
940        };
941        let sample_count = samples_in_phase.len();
942        // Bucket window: prefer each capture's workload-relative
943        // scheduled boundary offset (stable across a deferred-fire
944        // burst) over the run-relative fire time; fall back to
945        // `elapsed_ms` per-sample when no offset was stamped (synthetic
946        // / on-demand). min/max rather than first/last because the
947        // offset-remap in the stimulus path can reorder samples
948        // relative to drain order.
949        let win = |s: &crate::scenario::sample::Sample<'_>| s.boundary_offset_ms.or(s.elapsed_ms);
950        let (start_ms, end_ms) = if samples_in_phase.is_empty() {
951            (0, u64::MAX)
952        } else {
953            let mut lo = u64::MAX;
954            let mut hi = 0u64;
955            for s in &samples_in_phase {
956                // Skip a sample with no time anchor (no boundary offset
957                // AND no measured elapsed): coercing it to 0
958                // would pull start_ms to 0 and over-fold monitor samples.
959                // If EVERY sample in the phase is unanchored, lo/hi stay
960                // (u64::MAX, 0) — an inverted window that folds nothing,
961                // the correct "no placeable samples" outcome.
962                if let Some(w) = win(s) {
963                    lo = lo.min(w);
964                    hi = hi.max(w);
965                }
966            }
967            (lo, hi)
968        };
969        let mut metrics: std::collections::BTreeMap<String, f64> =
970            std::collections::BTreeMap::new();
971        for metric_def in crate::stats::METRICS {
972            let per_sample_readings: Vec<f64> = samples_in_phase
973                .iter()
974                .filter_map(|s| metric_def.read_sample(s))
975                .collect();
976            if per_sample_readings.is_empty() {
977                // No per-sample reading for any sample in this
978                // bucket -- the metric is host-side-only
979                // (cross-cgroup fold) or its dispatch arm has
980                // not landed yet. Omit the key rather than
981                // collapsing to `Some(0.0)` so the renderer
982                // paints "absent" vs "real zero" distinctly.
983                continue;
984            }
985            if let Some(reduced) =
986                crate::stats::aggregate_samples_for_phase(metric_def, &per_sample_readings)
987            {
988                metrics.insert(metric_def.name.to_string(), reduced);
989            }
990        }
991        // Per-phase system / user CPU time (ns), injected post-hoc as a
992        // per-thread-GROUP delta over the phase's freeze samples (NOT a
993        // read_sample metric — a per-sample cross-task sum then a Counter
994        // delta inflates when the captured task set churns; see
995        // `phase_group_cpu_delta`). Observer-free: reads the frozen
996        // task_struct.{s,u}time + thread-group signal_struct accumulator
997        // already captured in each sample's enrichments.
998        if let Some(v) = phase_group_cpu_delta(&samples_in_phase, |t| t.stime, |t| t.signal_stime) {
999            metrics.entry("system_time_ns".to_string()).or_insert(v);
1000        }
1001        if let Some(v) = phase_group_cpu_delta(&samples_in_phase, |t| t.utime, |t| t.signal_utime) {
1002            metrics.entry("user_time_ns".to_string()).or_insert(v);
1003        }
1004        // Per-CPU spatial axes (busiest-CPU counter delta + concentration) over
1005        // the per_cpu_time freezes: hardirqs (kstat.irqs_sum) and NET_RX softirqs
1006        // (kstat.softirqs[NET_RX]). Both are monotonic per-CPU counters; the fold
1007        // takes the per-CPU delta FIRST then the spatial max — see
1008        // `fold_per_cpu_spatial_max` for the shared endpoint / cpu-field
1009        // intersection / saturating / loud-absent rules.
1010        fold_per_cpu_spatial_max(
1011            &mut metrics,
1012            &samples_in_phase,
1013            win,
1014            |c| c.irqs_sum,
1015            "max_cpu_hardirqs",
1016            "max_cpu_hardirq_concentration",
1017        );
1018        fold_per_cpu_spatial_max(
1019            &mut metrics,
1020            &samples_in_phase,
1021            win,
1022            |c| c.softirqs[crate::monitor::btf_offsets::SOFTIRQ_NET_RX],
1023            "max_cpu_softirq_net_rx",
1024            "max_cpu_softirq_net_rx_concentration",
1025        );
1026        // scx_layered util-compensation scale: per-CPU first->last cpustat delta
1027        // -> clamped scale -> mean across CPUs (avg_cpu_util_comp_scale). Over
1028        // the same per_cpu_time freezes as the spatial axes above, but a Gauge
1029        // (per-CPU clamp-then-mean), not a spatial-max counter delta.
1030        fold_util_comp_scale(&mut metrics, &samples_in_phase, win);
1031        // scx_lavd per-task latency-criticality: mean/max of each live task_ctx's
1032        // normalized_lat_cri ([0,1024]) over the sdt_alloc arena walk at every
1033        // freeze (avg_task_lat_cri / max_task_lat_cri). A gauge folded over all
1034        // (freeze, task) observations; loud-absent for non-lavd schedulers.
1035        fold_lat_cri(&mut metrics, &samples_in_phase);
1036        // Per-cgroup PSI-irq spatial axis: the busiest workload-leaf's IRQ-full
1037        // stall delta + avg10 gauge + the cross-leaf concentration, over the
1038        // cgroup_psi freezes (the host cgroup-walk capture). All Peak; auto-fold
1039        // run-level.
1040        fold_per_cgroup_psi(&mut metrics, &samples_in_phase, win);
1041        let mut bucket = PhaseBucket {
1042            step_index,
1043            label,
1044            start_ms,
1045            end_ms,
1046            sample_count,
1047            metrics,
1048            per_cgroup: std::collections::BTreeMap::new(),
1049        };
1050        // Per-phase MonitorSample windowing for monitor-derived metrics
1051        // (avg_imbalance_ratio). Factored into `fold_monitor_into_bucket`
1052        // so a synthesized zero-capture bucket (see
1053        // `build_phase_buckets_with_stimulus`) recovers its in-window
1054        // imbalance too — same formula and frame, but over the
1055        // synthesized bucket's FULL stimulus window vs this captured
1056        // bucket's narrower interior capture-offset span.
1057        fold_monitor_into_bucket(
1058            &mut bucket,
1059            monitor_samples,
1060            monitor_to_window_offset_ms,
1061            preemption_threshold_ns,
1062        );
1063        // Phase-wall denominators for the IRQ rates, co-inserted
1064        // both-or-neither with the read_sample-folded IRQ numerators above so
1065        // derive_rate_metrics pairs num/den from THIS bucket (never cross-paired
1066        // with a stimulus event's total_phase_duration_sec — a distinct
1067        // metric). The window is the CAPTURE span (start_ms/end_ms = first->last
1068        // freeze offset), the same span the Counter numerators accrue over:
1069        // self-consistent for the ns/ns fraction, and a capture-window rate for
1070        // the per-second rates (the d/span factor cancels in an A/B compare with
1071        // matched cadence). Gate on total_hardirqs (present iff per_cpu_time was
1072        // captured; irqtime-independent, unlike total_irq_time_ns) + a positive
1073        // window. ms->ns and ms->s scaling lives HERE (derive_rate_metrics does
1074        // bare num/den). One insertion covers both derive sites:
1075        // build_phase_buckets_with_stimulus folds over buckets_from_grouped's
1076        // output, so its later re-derive sees these denominators too.
1077        if bucket.metrics.contains_key("total_hardirqs") && bucket.end_ms > bucket.start_ms {
1078            let wall_ms = (bucket.end_ms - bucket.start_ms) as f64;
1079            bucket
1080                .metrics
1081                .entry("total_phase_wall_ns".to_string())
1082                .or_insert(wall_ms * 1_000_000.0);
1083            bucket
1084                .metrics
1085                .entry("total_phase_wall_sec".to_string())
1086                .or_insert(wall_ms / 1000.0);
1087        }
1088        // Derive Rate metrics AFTER every component source is folded in:
1089        // the METRICS reductions + system/user_time_ns above AND the
1090        // monitor-injected components (e.g. avg_imbalance_ratio, whose ONLY
1091        // source is fold_monitor_into_bucket). Deriving before the monitor
1092        // fold would silently drop a Rate over a monitor-only component. A
1093        // Rate has no samples of its own (see MetricKind::Rate); this is its
1094        // per-phase producer. Inert until a Rate registers. The stimulus
1095        // sibling build_phase_buckets_with_stimulus re-derives again after
1096        // its own later injections (idempotent).
1097        crate::stats::derive_rate_metrics(&mut bucket.metrics);
1098        out.push(bucket);
1099    }
1100    out
1101}
1102
1103/// Fold the per-CPU full-class imbalance from the monitor samples whose
1104/// run-relative timestamp falls in `bucket`'s `[start_ms, end_ms)`
1105/// window into `bucket.metrics["avg_imbalance_ratio"]`.
1106///
1107/// The monitor sample's run-relative `elapsed_ms` is shifted into the
1108/// bucket-window frame (subtract `monitor_to_window_offset_ms`) before
1109/// the half-open `[start_ms, end_ms)` test so a MonitorSample whose
1110/// timestamp equals the boundary lands in exactly one bucket (not both
1111/// adjacent buckets — the closed-on-right form double-counted boundary
1112/// samples). Single-sample phases (`start_ms == end_ms`) use explicit
1113/// equality so the window is not empty.
1114///
1115/// Filters via [`crate::monitor::sample_looks_valid`] (implausible-DSQ
1116/// samples) before the fold; `compute_metrics` additionally drops
1117/// empty-cpus samples (which would default `imbalance_ratio` to 1.0 and
1118/// pull the mean toward "perfect balance", masking a real regression) —
1119/// matching the legacy `Timeline::build` path's filter discipline.
1120///
1121/// Folds the FULL monitor-derived metric set the legacy `Timeline::build`
1122/// reducer (`crate::timeline::compute_metrics`) produces —
1123/// `avg_imbalance_ratio`, `max_imbalance_ratio`, `avg_dsq_depth`,
1124/// `max_dsq_depth`, `stuck_count`, and the `total_fallback` /
1125/// `total_keep_last` counter deltas — over the bucket's window.
1126/// `avg_imbalance_ratio`, `max_imbalance_ratio`, and `stuck_count` are
1127/// folded for EVERY bucket with in-window monitor samples: none of the three
1128/// has a `read_sample` dispatch arm (`crate::stats` `read_sample` has arms
1129/// only for the dsq / fallback keys; these three fall to `_ => None`), so the
1130/// per-sample capture path never produces them and monitor is their only
1131/// per-bucket source on captured AND synthesized buckets alike — a captured
1132/// (common-case) phase must report its per-phase imbalance peak and stall
1133/// count, not drop them. (`avg_imbalance_ratio` is genuinely ext-metrics-only;
1134/// `max_imbalance_ratio` and `stuck_count` ALSO carry a typed `GauntletRow`
1135/// accessor sourced from the whole-run MonitorSummary, so their per-phase
1136/// fold here feeds per-phase RENDERING only — the run-level value stays the
1137/// typed accessor, and both `populate_run_ext_metrics*` skip them via
1138/// `TYPED_FIELD_NAMES` to avoid a double-source.) The dsq / fallback set
1139/// (`avg_dsq_depth`,
1140/// `max_dsq_depth`, `total_fallback`, `total_keep_last`) is folded ONLY for
1141/// a synthesized (`sample_count == 0`) bucket: a captured bucket sources
1142/// those from its read_sample captures and keeps its pre-synthesize
1143/// behavior, while a synthesized bucket has no captures, so monitor is its
1144/// only source — restoring the rendered timeline to PARITY with the old
1145/// `Timeline::build` fallback (the path a zero-capture-with-monitor run
1146/// took before the synthesize seam flipped it onto from_phase_buckets;
1147/// `format_phases` renders these folded metrics for a `sample_count == 0`
1148/// bucket via its `has_monitor_metrics` gate). Each key is `or_insert` so
1149/// it never overwrites a value already present. Parity with
1150/// `Timeline::build` is exact for the production case; for legacy
1151/// ScenarioEnd-but-no-StepEnd data the synthesized last-step window clamps
1152/// to the terminal rather than extending to end-of-monitor, and a
1153/// synthesized bucket's dsq metrics come from the monitor
1154/// `CpuSnapshot.local_dsq_depth` axis (vs a captured bucket's DSQ-walker
1155/// axis) — same metric, different sampling axis.
1156///
1157/// `bucket`'s `[start_ms, end_ms)` IS the window basis and differs by
1158/// bucket kind: a captured bucket's is the min/max of its samples'
1159/// interior capture offsets; a synthesized bucket's is its full
1160/// `[StepStart, StepEnd)` stimulus window. The monitor sample's
1161/// run-relative `elapsed_ms` is shifted into that frame (subtract
1162/// `monitor_to_window_offset_ms`) before the half-open test so a sample
1163/// on the boundary lands in exactly one bucket. `compute_metrics` returns
1164/// fallback / keep_last as RATES; this re-derives the bucket-native
1165/// counter DELTAS (so `phase_from_bucket` re-rates them over the bucket
1166/// window like the read_sample path) using the same `counter_delta` clamp.
1167fn fold_monitor_into_bucket(
1168    bucket: &mut PhaseBucket,
1169    monitor_samples: &[crate::monitor::MonitorSample],
1170    monitor_to_window_offset_ms: i64,
1171    preemption_threshold_ns: u64,
1172) {
1173    let start_ms = bucket.start_ms;
1174    let end_ms = bucket.end_ms;
1175    let in_window = |monitor_ms: u64| -> bool {
1176        let shifted = monitor_ms as i64 - monitor_to_window_offset_ms;
1177        if shifted < 0 {
1178            return false;
1179        }
1180        let m = shifted as u64;
1181        if start_ms == end_ms {
1182            m == start_ms
1183        } else {
1184            m >= start_ms && m < end_ms
1185        }
1186    };
1187    // Filter via sample_looks_valid (matches Timeline::build) so an invalid
1188    // sample (empty cpus -> imbalance_ratio 1.0 default) doesn't pull the
1189    // mean toward "perfect balance" and mask a real regression.
1190    let phase_monitor_samples: Vec<&crate::monitor::MonitorSample> = monitor_samples
1191        .iter()
1192        .filter(|s| in_window(s.elapsed_ms))
1193        .filter(|s| crate::monitor::sample_looks_valid(s))
1194        .collect();
1195    if phase_monitor_samples.is_empty() {
1196        return;
1197    }
1198    let pm = crate::timeline::compute_metrics(&phase_monitor_samples, preemption_threshold_ns);
1199    // sample_count == 0 IFF the bucket is synthesized: buckets_from_grouped
1200    // only emits >=1-sample buckets, the synthesize loop only emits 0-sample
1201    // ones. A CAPTURED bucket folds the source-less monitor signals (avg/max
1202    // imbalance + stuck_count — none have a read_sample arm) but NOT the
1203    // dsq / fallback set, which it sources from its read_sample captures.
1204    // Restricting only that dsq / fallback set to synthesized buckets
1205    // preserves captured-bucket behavior; a synthesized bucket has no
1206    // captures, so monitor is its only source and it takes the full set
1207    // (Timeline::build render parity).
1208    let synthesized = bucket.sample_count == 0;
1209    let mut put = |key: &str, v: f64| {
1210        if v.is_finite() {
1211            bucket.metrics.entry(key.to_string()).or_insert(v);
1212        }
1213    };
1214    if let Some(v) = pm.avg_imbalance {
1215        put("avg_imbalance_ratio", v);
1216    }
1217    // avg_nr_running (Gauge(Avg)) is a monitor-axis signal (full-class
1218    // rq.nr_running, no read_sample dispatch arm), so like avg_imbalance_ratio
1219    // it is folded for EVERY monitor-bearing bucket (not gated on synthesized)
1220    // — captured buckets have no other source for it. It feeds per-phase
1221    // rendering + boundary change-detection only; the run-level value stays
1222    // MonitorSummary::avg_nr_running (fold_run_level_ext), so the run-level ext
1223    // re-pool (populate_run_ext_metrics_from_phases) SKIPS this key to avoid a
1224    // double-source.
1225    if let Some(v) = pm.avg_nr_running {
1226        put("avg_nr_running", v);
1227    }
1228    // max_imbalance_ratio (Peak) and stuck_count (Counter) have NO read_sample
1229    // dispatch arm (both fall to `_ => None` in crate::stats read_sample), so
1230    // the per-sample capture path never produces them and a CAPTURED bucket
1231    // would otherwise never carry them — they would surface only on synthesized
1232    // (zero-capture) buckets. Fold them for EVERY monitor-bearing bucket — like
1233    // avg_imbalance_ratio above — so a captured (common-case) phase reports its
1234    // per-phase imbalance peak and stall count instead of dropping them. (Both
1235    // are monitor-axis signals: imbalance from full-class rq.nr_running, stalls
1236    // from non-advancing rq.clock across consecutive samples — neither is in
1237    // the guest Snapshot read_sample observes. Both ALSO carry a typed
1238    // run-level GauntletRow accessor, so this per-phase fold feeds per-phase
1239    // RENDERING only; TYPED_FIELD_NAMES keeps them out of the run-level
1240    // ext_metrics so the typed accessor stays the single run-level source.)
1241    // `or_insert` still guards against overwriting a value already present.
1242    if let Some(v) = pm.max_imbalance {
1243        put("max_imbalance_ratio", v);
1244    }
1245    if pm.stall_count > 0 {
1246        put("stuck_count", pm.stall_count as f64);
1247    }
1248    if synthesized {
1249        if let Some(v) = pm.avg_dsq_depth {
1250            put("avg_dsq_depth", v);
1251        }
1252        put("max_dsq_depth", pm.max_dsq_depth as f64);
1253        // Bucket-native counter totals for fallback / keep_last: the first
1254        // and last in-window samples carrying event counters, clamped with
1255        // the same counter_delta MonitorSummary uses (a mid-phase scheduler
1256        // restart can reset the counter, producing a negative raw delta).
1257        // phase_from_bucket re-rates these over the bucket window, matching
1258        // the read_sample representation captured buckets carry.
1259        let has_events =
1260            |s: &&crate::monitor::MonitorSample| s.cpus.iter().any(|c| c.event_counters.is_some());
1261        let first_ev = phase_monitor_samples.iter().copied().find(has_events);
1262        let last_ev = phase_monitor_samples.iter().copied().rev().find(has_events);
1263        if let (Some(first), Some(last)) = (first_ev, last_ev) {
1264            let fb = crate::monitor::counter_delta(
1265                last.sum_event_field(|e| e.select_cpu_fallback).unwrap_or(0),
1266                first
1267                    .sum_event_field(|e| e.select_cpu_fallback)
1268                    .unwrap_or(0),
1269            );
1270            let kl = crate::monitor::counter_delta(
1271                last.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
1272                first.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
1273            );
1274            put("total_fallback", fb as f64);
1275            put("total_keep_last", kl as f64);
1276        }
1277    }
1278}
1279
1280/// Clock skew (ms) between the host monitor's run-relative timeline and
1281/// the guest's scenario-relative stimulus timeline, computed the same
1282/// way as [`crate::timeline::Timeline::build`]: the first significant
1283/// monitor sample (elapsed > 500 ms, non-empty cpus) and the earliest
1284/// stimulus event roughly coincide at scenario start. Returns
1285/// `first_monitor_ms - first_stimulus_ms`; subtract from a monitor
1286/// sample's elapsed_ms to reach the scenario-relative (boundary-offset)
1287/// window frame. `0` when either timeline is empty (nothing to align,
1288/// so the run-relative frames are used as-is).
1289fn monitor_clock_offset(
1290    stimulus_events: &[crate::timeline::StimulusEvent],
1291    monitor_samples: &[crate::monitor::MonitorSample],
1292) -> i64 {
1293    if stimulus_events.is_empty() || monitor_samples.is_empty() {
1294        return 0;
1295    }
1296    let first_stimulus_ms = stimulus_events
1297        .iter()
1298        .map(|e| e.elapsed_ms)
1299        .min()
1300        .unwrap_or(0);
1301    let first_monitor_ms = monitor_samples
1302        .iter()
1303        .find(|s| s.elapsed_ms > 500 && !s.cpus.is_empty())
1304        .map(|s| s.elapsed_ms)
1305        .unwrap_or_else(|| monitor_samples.first().map(|s| s.elapsed_ms).unwrap_or(0));
1306    first_monitor_ms as i64 - first_stimulus_ms as i64
1307}