ktstr/timeline.rs
1//! Stimulus/phase correlation for scenario execution.
2//!
3//! Correlates [`StimulusEvent`]s (cgroup operations, cpuset changes)
4//! with `MonitorSample` windows to
5//! measure per-phase scheduler behavior degradation. Produces
6//! [`Timeline`] entries consumed by the stats and reporting pipeline.
7
8use std::fmt;
9
10use crate::monitor::{MonitorSample, sample_looks_valid};
11
12// ---------------------------------------------------------------------------
13// TimelineContext — system context rendered as a header
14// ---------------------------------------------------------------------------
15
/// Descriptive context about the system a timeline was recorded on.
///
/// Rendered as a header block. Every field is optional; the derived
/// `Default` leaves all of them `None`.
#[derive(Clone, Debug, Default)]
pub struct TimelineContext {
    /// Kernel version string, e.g. `"6.14.0-rc3+"`.
    pub kernel: Option<String>,
    /// Topology description, e.g. `"2n4l4c2t (16 cpus)"`.
    pub topology: Option<String>,
    /// Name of the scheduler, e.g. `"scx_mitosis"`.
    pub scheduler: Option<String>,
    /// Name of the scenario.
    pub scenario: Option<String>,
    /// Total duration of the run, in seconds.
    pub duration_s: Option<f64>,
}
30
31// ---------------------------------------------------------------------------
32// StimulusEvent — what happened and when
33// ---------------------------------------------------------------------------
34
35/// A discrete event during scenario execution that may cause observable
36/// changes in scheduler behavior. Generated by step executors on the guest
37/// side and carried in the VM output alongside monitor samples.
38#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
39pub struct StimulusEvent {
40 /// Milliseconds since scenario start (guest monotonic clock).
41 pub elapsed_ms: u64,
42 /// Human-readable label. Produced as `"StepStart[k]"` by
43 /// [`Self::from_wire`] (the 0-indexed scenario Step ordinal),
44 /// `"ScenarioEnd"` by [`Self::terminal`], and the
45 /// `"BASELINE"`/`"Step[k]"` bucket label by the
46 /// `phase_from_bucket` placeholder. Test fixtures may carry any
47 /// label.
48 pub label: String,
49 /// What kind of operation triggered this event.
50 pub op_kind: Option<String>,
51 /// Additional context (e.g. "4 cpus", "cgroup=cg_0").
52 pub detail: Option<String>,
53 /// Cumulative worker iterations at this event. `Some(_)` for every
54 /// event built from the wire (the wire counter is always present —
55 /// see [`Self::from_wire`]); a cumulative counter for which
56 /// `Some(0)` is a legitimate "no iterations accumulated yet"
57 /// baseline, NOT a missing sample. `None` only for synthetic /
58 /// placeholder events that carry no counter (the
59 /// `phase_from_bucket` fallback and test fixtures). Used to
60 /// compute per-phase throughput (iterations/s) as the delta
61 /// between consecutive events.
62 ///
63 /// SEMANTICS: this is the sum of the iteration counters of the
64 /// worker handles ALIVE at the event instant (step-local +
65 /// Backdrop). Each step emits BOTH a StepStart event (counter at the
66 /// step's start) and a StepEnd event ([`Self::is_step_end`], counter
67 /// at the step's end-of-hold), so the per-phase iteration_rate is the
68 /// STEP-LOCAL delta `StepEnd[k] - StepStart[k]` — each step's OWN
69 /// workers measured start-to-end. That works for workers respawned
70 /// per step (the cross-step `StepStart[k+1] - StepStart[k]` delta
71 /// reads fresh~0 - fresh~0 and is dropped) AND is more accurate for
72 /// persistent (Backdrop) workers (it excludes the inter-step
73 /// teardown/respawn wall-time the cross-step window spanned). Bucket
74 /// `k` is sourced ONLY by its `StepStart[k] -> StepEnd[k]` pair: the
75 /// `iteration_rate` attribution loop in
76 /// [`crate::assert::build_phase_buckets_with_stimulus`] skips any
77 /// `is_step_end` `prev`, so a stalled step whose step-local delta is
78 /// zero (`StepEnd[k] == StepStart[k]`) reports its MEASURED-ZERO rate
79 /// `Some(0.0)` (see `Self::rate_to`) rather than leaking the
80 /// inter-step gap rate from the `StepEnd[k] -> StepStart[k+1]` pair.
81 /// The monitor-only
82 /// [`Timeline::build`] fallback (no snapshot captures) computes the
83 /// SAME step-local `StepStart[k] -> StepEnd[k]` rate — the StepEnd
84 /// events reach it too (they are emitted independent of captures) — and
85 /// falls back to cross-step (or the terminal for the last step) only
86 /// when a step has no StepEnd (sched-died / legacy data); StepEnd is
87 /// filtered only from that path's phase LAYOUT, not its rate.
88 pub total_iterations: Option<u64>,
89 /// 1-indexed scenario step this event belongs to (the same
90 /// encoding the bridge stamps: `1..=N` for Step ordinals), or
91 /// `None` for non-step events (including the terminal scenario-end
92 /// boundary; see `is_terminal`). Carried explicitly from the wire
93 /// `StimulusPayload.step_index` so the periodic-capture phase
94 /// attribution can map a capture's workload-relative boundary
95 /// offset onto the guest's own step timeline without parsing the
96 /// human-readable `label`.
97 pub step_index: Option<u16>,
98 /// True only for the synthetic scenario-end boundary the eval
99 /// walker appends from the `ScenarioEnd` wire frame's final
100 /// `total_iterations`. On a CLEAN run the last step emits its own
101 /// `StepEnd[N]`, which supplies that step's `iteration_rate` right
102 /// boundary in BOTH rate consumers — the snapshot path
103 /// ([`crate::assert::build_phase_buckets_with_stimulus`], the
104 /// `StepStart[N]` -> `StepEnd[N]` pair) and the monitor-only
105 /// [`Timeline::build`] fallback (which looks up each step's `StepEnd`
106 /// by `step_index`) — and the terminal is then NOT consumed for a
107 /// rate: the snapshot path's attribution loop skips the
108 /// `(StepEnd[N], terminal)` pair via its `is_step_end` guard (before
109 /// `rate_components` is reached), and `Timeline::build` reaches for the
110 /// terminal only when a step's `StepEnd` lookup misses. The terminal
111 /// is consumed as a step's rate boundary ONLY for legacy/synthetic
112 /// data that carries a `ScenarioEnd` frame but no `StepEnd` frames
113 /// (fresh guest output always pairs them). A sched-died step is NOT
114 /// such a case: its early return skips BOTH the `StepEnd` emission AND
115 /// `send_scenario_end`, so neither frame exists and the dead step
116 /// reports no rate via the no-successor path. It is NOT a step start:
117 /// `step_index` is `None` so it seeds no [`crate::assert::PhaseBucket`]
118 /// (excluded from the step-start timeline), and [`Timeline::build`]
119 /// skips it when laying out phase boundaries so it never renders a
120 /// phantom trailing phase.
121 pub is_terminal: bool,
122 /// True for a per-step END event (decoded from a
123 /// `crate::vmm::wire::MsgType::StepEnd` frame via
124 /// [`Self::from_step_end`]). It carries the SAME 1-indexed
125 /// `step_index` as its StepStart and its step's end-of-hold
126 /// `total_iterations`, so [`crate::assert::build_phase_buckets_with_stimulus`]'s
127 /// elapsed-sorted `windows(2)` pairs `StepStart[k]` -> `StepEnd[k]`
128 /// first and `or_insert` keeps that step-local rate. NOT a step
129 /// start, so [`Timeline::build`] (the monitor-only fallback's
130 /// index-based cross-step pairing) filters it out of its step-start
131 /// list to avoid a phantom phase.
132 pub is_step_end: bool,
133}
134
135impl StimulusEvent {
136 /// Build a timeline event from a deserialized wire stimulus event.
137 /// Centralizes the wire→timeline mapping so the production eval path
138 /// (`evaluate_vm_result`) and out-of-tree consumers — post_vm
139 /// callbacks folding `VmResult::stimulus_timeline()` (which calls
140 /// this internally) through
141 /// [`crate::assert::build_phase_buckets_with_stimulus`] — produce
142 /// identical events. The wire `step_index` is the bridge 1-indexed
143 /// convention (`Step[k]` -> `k + 1`, BASELINE owns 0); the human
144 /// `label` renders the 0-indexed Scenario-Step ordinal
145 /// (`step_index - 1`) to match the `PhaseBucket` `Step[k]` labels,
146 /// while the `step_index` field keeps the 1-indexed wire value for
147 /// phase-bucket remap. `total_iterations` is carried verbatim as
148 /// `Some(_)`: the wire field is a cumulative counter that is always
149 /// populated (the guest sums live worker iterations at every step
150 /// boundary), so `0` is a legitimate baseline reading — the FIRST
151 /// step's frame fires right after its workers spawn and genuinely
152 /// reads ~0. Collapsing that `0` to `None` (the old behavior) made
153 /// the (first, second) delta pair fail the `Some`/`Some` guard in
154 /// both rate consumers, silently dropping the first step's
155 /// `iteration_rate`; carrying `Some(0)` lets the delta compute the
156 /// first step's throughput for the PERSISTENT (Backdrop) population
157 /// (see the `total_iterations` field doc for the persistent-vs-
158 /// step-local semantics this delta measures).
159 pub fn from_wire(ev: &crate::vmm::wire::StimulusEvent) -> Self {
160 Self {
161 elapsed_ms: ev.elapsed_ms as u64,
162 label: format!("StepStart[{}]", ev.step_index.saturating_sub(1)),
163 op_kind: Some(format!("ops={}", ev.op_count)),
164 detail: Some(format!(
165 "{} cgroups, {} workers",
166 ev.cgroup_count, ev.worker_count,
167 )),
168 total_iterations: Some(ev.total_iterations),
169 step_index: Some(ev.step_index),
170 is_terminal: false,
171 is_step_end: false,
172 }
173 }
174
175 /// Build a per-step END event from a `crate::vmm::wire::MsgType::StepEnd`
176 /// frame (reuses the `crate::vmm::wire::StimulusEvent` wire body).
177 /// Carries the SAME 1-indexed `step_index` as the step's StepStart
178 /// and the step's end-of-hold `total_iterations`, with `is_step_end`
179 /// set. Elapsed-sorted, a step's events order `StepStart[k]` (start) <
180 /// `StepEnd[k]` (end-of-hold) < `StepStart[k+1]`, so
181 /// [`crate::assert::build_phase_buckets_with_stimulus`]'s `windows(2)`
182 /// pairs `StepStart[k]` -> `StepEnd[k]` first and `or_insert` keeps that
183 /// step-local rate. `is_terminal` is false (it is a real per-step
184 /// boundary, not the scenario-end terminal).
185 pub fn from_step_end(ev: &crate::vmm::wire::StimulusEvent) -> Self {
186 Self {
187 elapsed_ms: ev.elapsed_ms as u64,
188 label: format!("StepEnd[{}]", ev.step_index.saturating_sub(1)),
189 op_kind: Some(format!("ops={}", ev.op_count)),
190 detail: Some(format!(
191 "{} cgroups, {} workers",
192 ev.cgroup_count, ev.worker_count,
193 )),
194 total_iterations: Some(ev.total_iterations),
195 step_index: Some(ev.step_index),
196 is_terminal: false,
197 is_step_end: true,
198 }
199 }
200
201 /// Build the synthetic terminal boundary event from the
202 /// `ScenarioEnd` wire frame's final cumulative `total_iterations`
203 /// and scenario-relative `elapsed_ms`. Appended once, after every
204 /// per-step [`Self::from_wire`] event. On a clean run `StepEnd[N]`
205 /// supplies the last step's `iteration_rate` right boundary in both
206 /// rate consumers and the terminal is not consumed for a rate; it is
207 /// consumed as a step's boundary ONLY for legacy/synthetic data with a
208 /// `ScenarioEnd` frame but no `StepEnd` frames (a sched-died step has
209 /// neither, since the early return skips both emissions) — see the
210 /// [`Self::is_terminal`] field doc.
211 /// `step_index` is `None` (it is not a step start — it seeds no
212 /// [`crate::assert::PhaseBucket`]) and `is_terminal` is set so
213 /// [`Timeline::build`] treats it as a right boundary only, never a
214 /// phase. `elapsed_ms` is in the same guest-monotonic frame as the
215 /// step events (both come from `scenario_start.elapsed()`), so the
216 /// last-step duration is well-formed.
217 pub fn terminal(elapsed_ms: u64, total_iterations: u64) -> Self {
218 Self {
219 elapsed_ms,
220 label: "ScenarioEnd".to_string(),
221 op_kind: None,
222 detail: None,
223 total_iterations: Some(total_iterations),
224 step_index: None,
225 is_terminal: true,
226 is_step_end: false,
227 }
228 }
229
230 /// Iterations-per-second from this event to `next`:
231 /// `(next.total_iterations - self.total_iterations)` over the
232 /// guest-clock elapsed-ms delta between them. Returns `None` ONLY when
233 /// the measurement is genuinely undefined: either event lacks a
234 /// `total_iterations` sample, the window is zero-length, or the count
235 /// went BACKWARD (`next < self` — a counter reset; the delta is
236 /// unmeasurable, not zero). The backward case is reachable only for the
237 /// guard-skipped cross-step pairing or legacy/synthetic data, NOT for
238 /// the live step-local `StepStart[k]` -> `StepEnd[k]` pair: teardown
239 /// runs after `StepEnd` is emitted, so the handle set is stable within
240 /// a step and the per-worker counters are monotone across the pair.
241 ///
242 /// MEASURED ZERO is distinct from not-measured: a step whose workers
243 /// made exactly zero forward progress over a positive hold
244 /// (`next == self`) returns `Some(0.0)`, not `None`. Zero throughput
245 /// is a real, measured value — the strongest degradation signal — so
246 /// it must surface, not vanish. With `Some(0.0)` a phase that
247 /// collapsed to zero IS visible to the throughput-degradation detector
248 /// ([`Timeline::build`] / [`Timeline::from_phase_buckets`]): when the
249 /// prior phase had a positive rate (`before > 0.0`), the relative
250 /// delta is `-1.0` and the drop is flagged. (A phase that was already
251 /// zero before is still not relatively comparable — the detector's
252 /// `before > 0.0` gate avoids a div-by-zero — but an *unchanged* zero
253 /// is not a degradation.)
254 ///
255 /// This is the SINGLE iteration-rate formula, shared via its
256 /// decomposition [`Self::rate_components`] by
257 /// [`crate::assert::build_phase_buckets_with_stimulus`] (per-step
258 /// windows attributed by `step_index`) and [`Timeline::build`]
259 /// (per-phase windows attributed by index) — the two callers pair
260 /// events differently but must compute the rate identically. The
261 /// per-step metric producer inserts the `rate_components` pair (the
262 /// `iteration_rate` Rate's `total_phase_iterations` /
263 /// `total_phase_duration_sec` components); `rate_to` (the quotient) is
264 /// the display/comparison form used by `Timeline::build` and the
265 /// result-helper ratios.
266 pub fn rate_to(&self, next: &StimulusEvent) -> Option<f64> {
267 self.rate_components(next).map(|(iters, secs)| iters / secs)
268 }
269
270 /// The `(iteration_delta, window_seconds)` components of [`Self::rate_to`]
271 /// — same `None` conditions (missing `total_iterations`, backward count,
272 /// or zero-length window). The per-phase metric pipeline inserts these as
273 /// the `total_phase_iterations` / `total_phase_duration_sec` Counter
274 /// components rather than the ready ratio, so the `iteration_rate` Rate
275 /// re-pools across phases/runs as `Σdelta / Σseconds`, not a mean of
276 /// per-phase ratios. The ms→s `/1000` lives HERE (the seconds component)
277 /// because `derive_rate_metrics` does a bare num/den with no scaling.
278 pub fn rate_components(&self, next: &StimulusEvent) -> Option<(f64, f64)> {
279 let s = self.total_iterations?;
280 let e = next.total_iterations?;
281 if e < s {
282 return None;
283 }
284 let duration_ms = next.elapsed_ms.saturating_sub(self.elapsed_ms);
285 if duration_ms == 0 {
286 return None;
287 }
288 Some(((e - s) as f64, duration_ms as f64 / 1000.0))
289 }
290
291 /// The scenario [`Phase`](crate::assert::Phase) this event belongs to,
292 /// or `None` for the terminal scenario-end boundary (which seeds no
293 /// phase). Use THIS — not the raw [`Self::step_index`] field — to key
294 /// per-phase lookups. `step_index` carries the bridge 1-indexed wire
295 /// convention (`Step k` -> `Some(k + 1)`) while `label` renders the
296 /// 0-indexed `k`, so reading the field directly invites the 0-vs-1
297 /// off-by-one this method removes: it maps the wire value onto the same
298 /// [`Phase`](crate::assert::Phase) newtype the
299 /// [`ScenarioStats`](crate::assert::ScenarioStats) /
300 /// [`PhaseBucket`](crate::assert::PhaseBucket) accessors are keyed by
301 /// (`Phase::step(k)`). Step events carry `step_index >= 1`, so the
302 /// `saturating_sub(1)` is exact.
303 pub fn phase(&self) -> Option<crate::assert::Phase> {
304 self.step_index
305 .map(|si| crate::assert::Phase::step(si.saturating_sub(1)))
306 }
307}
308
309// ---------------------------------------------------------------------------
310// Phase — a time window between consecutive stimulus events
311// ---------------------------------------------------------------------------
312
/// Metrics aggregated from the monitor samples that fall within one phase.
///
/// The derived `Default` is the "no data" value: zero counts and `None` for
/// every optional metric.
#[derive(Clone, Debug, Default)]
pub struct PhaseMetrics {
    /// Count of samples behind these metrics. The boundary change detector
    /// compares the monitor-derived metrics only when this is non-zero on
    /// both sides of the boundary.
    pub sample_count: usize,
    /// Mean CPU-imbalance ratio over the phase's valid samples. `None` when
    /// the phase had no valid samples (monitor-only `Timeline::build`) or
    /// when its source bucket carried no `avg_imbalance_ratio` metric
    /// (snapshot `from_phase_buckets`) — distinct from a real `Some(0.0)`
    /// (perfectly balanced). The change detector compares it only when both
    /// sides are `Some`, so an absent phase never reads as a false
    /// zero-imbalance.
    pub avg_imbalance: Option<f64>,
    /// Peak CPU-imbalance ratio over the phase's valid samples. `None` on the
    /// same no-data conditions as [`Self::avg_imbalance`].
    pub max_imbalance: Option<f64>,
    /// Mean local-DSQ depth over the phase's valid samples. `None` on the
    /// same no-data conditions as [`Self::avg_imbalance`].
    pub avg_dsq_depth: Option<f64>,
    /// Mean runqueue occupancy (full-class `rq.nr_running`) over the phase's
    /// valid samples: the per-sample mean of the per-CPU `nr_running`,
    /// averaged over samples (the `avg_dsq_depth` shape). `None` on the same
    /// no-data conditions as [`Self::avg_imbalance`]. The run-level value
    /// stays `MonitorSummary::avg_nr_running` (folded by
    /// `fold_run_level_ext`); this per-phase field feeds rendering and
    /// boundary change-detection only and is kept out of the run-level ext
    /// re-pool (see `populate_run_ext_metrics_from_phases`).
    pub avg_nr_running: Option<f64>,
    /// Presumably the peak DSQ depth seen in the phase (confirm against
    /// `compute_metrics`). A plain `u32`, so "no data" reads as `0`.
    pub max_dsq_depth: u32,
    /// Presumably the number of stalls detected in the phase (confirm against
    /// `compute_metrics`).
    pub stall_count: usize,
    /// `select_cpu_fallback` events per second. `None` when event counters
    /// are unavailable.
    pub fallback_rate: Option<f64>,
    /// `dispatch_keep_last` events per second. `None` when event counters are
    /// unavailable.
    pub keep_last_rate: Option<f64>,
    /// Worker iterations per second during this phase, computed from the
    /// cumulative iteration counts carried by stimulus events.
    pub iteration_rate: Option<f64>,
}
349
/// Which way a metric moved across a phase boundary.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChangeDirection {
    /// The metric moved in the favorable direction.
    Improved,
    /// The metric moved in the unfavorable direction.
    Degraded,
}

impl fmt::Display for ChangeDirection {
    /// Writes the upper-case tag for the direction: `IMPROVEMENT` or
    /// `DEGRADATION`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let tag = match self {
            Self::Improved => "IMPROVEMENT",
            Self::Degraded => "DEGRADATION",
        };
        f.write_str(tag)
    }
}
365
366/// Detected change at a stimulus boundary.
367#[derive(Debug, Clone)]
368pub struct PhaseChange {
369 pub direction: ChangeDirection,
370 pub metric: String,
371 pub before: f64,
372 pub after: f64,
373}
374
375/// A time window between two consecutive stimulus events.
376#[derive(Debug, Clone)]
377pub struct Phase {
378 pub index: usize,
379 pub start_ms: u64,
380 pub end_ms: u64,
381 /// The stimulus event that starts this phase (None for the initial phase).
382 pub stimulus: Option<StimulusEvent>,
383 pub metrics: PhaseMetrics,
384 /// Changes detected at this phase's stimulus boundary.
385 pub changes: Vec<PhaseChange>,
386 /// Per-cgroup raw telemetry for this phase, keyed by cgroup name. Carried
387 /// from [`crate::assert::PhaseBucket::per_cgroup`] on the
388 /// [`Self`]-via-`from_phase_buckets` path; empty on the monitor-only
389 /// [`Timeline::build`] path (which has no carriers). Rendered as a
390 /// per-cgroup sub-block by display-time reduction
391 /// ([`crate::assert::PhaseCgroupStats::off_cpu_summary`] etc.); never a
392 /// change-detection input (those are [`PhaseMetrics`] scalars only).
393 pub per_cgroup: std::collections::BTreeMap<String, crate::assert::PhaseCgroupStats>,
394 /// True when `(start_ms, end_ms)` is the normalized `(0, 0)` of an ORPHAN
395 /// carrier (no measured host window) rather than a real window. Set in
396 /// `phase_from_bucket` by the orphan shape signature `(0,0)` + empty
397 /// `metrics` + non-empty `per_cgroup` (unique to
398 /// `crate::assert::fold_guest_per_cgroup_into_host_buckets`'s orphan arm).
399 /// The render shows "window not measured" instead of a misleading `0ms`.
400 /// Always `false` on the [`Timeline::build`] path.
401 pub not_measured_window: bool,
402}
403
404// ---------------------------------------------------------------------------
405// Timeline
406// ---------------------------------------------------------------------------
407
408/// Correlated timeline of stimulus events and monitor observations.
409#[derive(Debug, Clone)]
410pub struct Timeline {
411 pub phases: Vec<Phase>,
412}
413
/// Smallest change in the imbalance ratio that is flagged; anything at or
/// below it is treated as noise.
const IMBALANCE_THRESHOLD: f64 = 0.5;
/// Smallest change in mean DSQ depth that is flagged.
const DSQ_THRESHOLD: f64 = 3.0;
/// Smallest change in mean runqueue depth (`avg_nr_running`) that is flagged:
/// one additional runnable task per CPU, on average, between phases.
const NR_RUNNING_THRESHOLD: f64 = 1.0;
/// Smallest change in the fallback rate (events/s) that is flagged.
const FALLBACK_RATE_THRESHOLD: f64 = 10.0;
/// Smallest change in the keep_last rate (events/s) that is flagged.
const KEEP_LAST_RATE_THRESHOLD: f64 = 10.0;
/// Smallest RELATIVE change in iteration rate that is flagged as a throughput
/// change. 0.3 = a 30% drop or increase.
const ITERATION_RATE_REL_THRESHOLD: f64 = 0.3;
428
429/// Create a PhaseChange if the delta between `before` and `after` exceeds
430/// `threshold`. `higher_is_worse` determines degradation direction: when
431/// true, a positive delta means Degraded; when false, a negative delta
432/// means Degraded.
433fn detect_change(
434 before: f64,
435 after: f64,
436 threshold: f64,
437 metric: &str,
438 higher_is_worse: bool,
439) -> Option<PhaseChange> {
440 let delta = after - before;
441 if delta.abs() <= threshold {
442 return None;
443 }
444 let degraded = if higher_is_worse {
445 delta > 0.0
446 } else {
447 delta < 0.0
448 };
449 Some(PhaseChange {
450 direction: if degraded {
451 ChangeDirection::Degraded
452 } else {
453 ChangeDirection::Improved
454 },
455 metric: metric.to_string(),
456 before,
457 after,
458 })
459}
460
461/// Per-boundary change set between two adjacent phases, shared by both
462/// [`Timeline::build`] and [`Timeline::from_phase_buckets`].
463///
464/// Throughput (`iteration_rate`) is compared whenever BOTH phases carry
465/// a rate and the earlier one is positive — INCLUDING a synthesized
466/// zero-capture step, whose rate is stimulus-derived (total_iterations
467/// deltas) rather than sampled. Throughput is the one metric that
468/// survives a capture gap, so a throughput collapse entering or leaving
469/// a synthesized step must still be flagged; gating it on `sample_count`
470/// (as both call sites did before) silently dropped exactly the
471/// degradation this timeline exists to surface.
472///
473/// Asymmetry from the `bi > 0.0` div-by-zero guard: a COLLAPSE into a
474/// zero-rate (incl. a synthesized measured-zero) step is flagged
475/// (before > 0), but a RECOVERY out of one (before == 0 -> positive) is
476/// not — there is no relative baseline to divide by. This is a
477/// deliberate tradeoff, not an oversight: an unchanged zero is genuinely
478/// not a degradation, and the collapse direction (the one this task
479/// targets) is caught.
480///
481/// The monitor-derived metrics (imbalance, dsq depth, fallback rate,
482/// keep_last rate) ARE gated on both phases having real samples. The
483/// values themselves can be real even on a synthesized phase (monitor
484/// windows fold into the bucket), but they come from a DIFFERENT
485/// sampling basis (folded monitor window vs captured periodic samples)
486/// than a captured neighbor's, so cross-comparing them is
487/// apples-to-oranges — suppressed. Throughput is exempt because it is
488/// the same stimulus-derived quantity on both sides.
489fn detect_boundary_changes(before: &PhaseMetrics, after: &PhaseMetrics) -> Vec<PhaseChange> {
490 let mut changes = Vec::new();
491
492 // Monitor-derived metrics (absolute-unit gauges/rates with
493 // fixed-magnitude thresholds) — gated on both phases having real
494 // samples. Pushed FIRST to preserve the historical render order
495 // (these before throughput in a multi-change boundary; format_phases
496 // renders changes in vec order).
497 if before.sample_count > 0 && after.sample_count > 0 {
498 if let (Some(bi), Some(ai)) = (before.avg_imbalance, after.avg_imbalance) {
499 changes.extend(detect_change(
500 bi,
501 ai,
502 IMBALANCE_THRESHOLD,
503 "imbalance",
504 true,
505 ));
506 }
507 if let (Some(bd), Some(ad)) = (before.avg_dsq_depth, after.avg_dsq_depth) {
508 changes.extend(detect_change(bd, ad, DSQ_THRESHOLD, "dsq_depth", true));
509 }
510 if let (Some(bn), Some(an)) = (before.avg_nr_running, after.avg_nr_running) {
511 changes.extend(detect_change(
512 bn,
513 an,
514 NR_RUNNING_THRESHOLD,
515 "nr_running",
516 true,
517 ));
518 }
519 if let (Some(bf), Some(af)) = (before.fallback_rate, after.fallback_rate) {
520 changes.extend(detect_change(
521 bf,
522 af,
523 FALLBACK_RATE_THRESHOLD,
524 "fallback",
525 true,
526 ));
527 }
528 if let (Some(bk), Some(ak)) = (before.keep_last_rate, after.keep_last_rate) {
529 changes.extend(detect_change(
530 bk,
531 ak,
532 KEEP_LAST_RATE_THRESHOLD,
533 "keep_last",
534 true,
535 ));
536 }
537 }
538
539 // Throughput is the SOLE rate-class field (counter-delta / elapsed), so
540 // it uses a RELATIVE threshold (rel = (ai-bi)/bi vs
541 // ITERATION_RATE_REL_THRESHOLD) and cannot route through detect_change's
542 // absolute-delta gate above — fixed-unit metrics get absolute
543 // thresholds, this gets a relative one (a semantic-class decision, not
544 // arbitrary). Strict `>`: an exactly-at-threshold relative change is not
545 // flagged. Pushed last so it renders after the monitor metrics.
546 if let (Some(bi), Some(ai)) = (before.iteration_rate, after.iteration_rate)
547 && bi > 0.0
548 {
549 let rel = (ai - bi) / bi;
550 if rel.abs() > ITERATION_RATE_REL_THRESHOLD {
551 changes.push(PhaseChange {
552 direction: if rel < 0.0 {
553 ChangeDirection::Degraded
554 } else {
555 ChangeDirection::Improved
556 },
557 metric: "throughput".to_string(),
558 before: bi,
559 after: ai,
560 });
561 }
562 }
563
564 changes
565}
566
567impl Timeline {
568 /// Build a timeline from stimulus events and monitor samples.
569 ///
570 /// Clock alignment: stimulus events use guest monotonic time (ms since
571 /// scenario start). Monitor samples use host monotonic time (ms since
572 /// VM boot). The first stimulus event's timestamp and the first
573 /// non-trivial monitor sample (after 500ms warmup) approximately
574 /// coincide. We compute an offset to align them.
575 ///
576 /// Returns an empty timeline if either input is empty.
577 /// Build a Timeline from stimulus events + raw monitor
578 /// samples via the per-window `compute_metrics` reduction.
579 /// The production success path uses [`Self::from_phase_buckets`]
580 /// (which folds pre-bucketed PhaseBuckets); `build` is the fallback
581 /// evaluate_vm_result takes only for a run with an EMPTY PhaseBuckets
582 /// vec but monitor samples present — i.e. no periodic captures AND no
583 /// stimulus Steps. A monitor-only run that DID run Steps now
584 /// synthesizes a capture-free bucket per StepStart (see
585 /// [`crate::assert::build_phase_buckets_with_stimulus`]), so its vec is
586 /// non-empty and it takes the from_phase_buckets path (whose
587 /// fold_monitor_into_bucket recovers the same monitor-derived metric
588 /// set this path computes). Both entry points produce the same
589 /// Timeline field shape; from_phase_buckets is preferred when buckets
590 /// are available because it avoids the per-MonitorSample reduction.
591 ///
592 /// `preemption_threshold_ns` threads the vCPU-preemption exemption
593 /// window into the per-phase stall predicate (see `compute_metrics`);
594 /// the production caller passes the run's
595 /// `MonitorReport::preemption_threshold_ns`. `0` derives it from the
596 /// guest kernel `CONFIG_HZ`.
597 pub fn build(
598 stimulus_events: &[StimulusEvent],
599 monitor_samples: &[MonitorSample],
600 preemption_threshold_ns: u64,
601 ) -> Self {
602 if stimulus_events.is_empty() || monitor_samples.is_empty() {
603 return Self { phases: Vec::new() };
604 }
605
606 let mut events = stimulus_events.to_vec();
607 // Total-order on an elapsed_ms tie: StepEnd before StepStart
608 // (`!is_step_end` is false=0 for StepEnd) so a zero-length
609 // inter-step gap at the guest's coarse-ms clock attributes the
610 // step-local StepStart[k]->StepEnd[k] rate to bucket k, never the
611 // cross-step StepStart[k]->StepStart[k+1] delta. Mirrors the same
612 // sort in build_phase_buckets_with_stimulus so the two rate
613 // consumers stay identical.
614 events.sort_by_key(|e| (e.elapsed_ms, !e.is_step_end));
615
616 // Clock alignment: find the offset between guest stimulus time
617 // and host monitor time. The first stimulus event (ScenarioStart)
618 // and the first monitor sample with plausible data roughly coincide.
619 let first_stimulus_ms = events[0].elapsed_ms;
620 let first_monitor_ms = monitor_samples
621 .iter()
622 .find(|s| s.elapsed_ms > 500 && !s.cpus.is_empty())
623 .map(|s| s.elapsed_ms)
624 .unwrap_or_else(|| monitor_samples.first().map(|s| s.elapsed_ms).unwrap_or(0));
625
626 // offset: add this to a stimulus timestamp to get monitor time
627 let offset = first_monitor_ms as i64 - first_stimulus_ms as i64;
628
629 // Define phase boundaries from consecutive stimulus events.
630 // Each pair (events[i], events[i+1]) bounds a phase.
631 // The last event to end-of-data is also a phase.
632 let last_monitor_ms = monitor_samples.last().map(|s| s.elapsed_ms).unwrap_or(0);
633
634 // The terminal scenario-end event is a rate right
635 // boundary ONLY — it seeds no phase. Extract it explicitly
636 // rather than relying on it sorting last for positional
637 // alignment: a corrupt / out-of-order step `elapsed_ms` (a u32
638 // read off the wire) could otherwise shift it into the middle
639 // of `events` and misalign the dense phase index against the
640 // step events. `step_events` is the phase-bearing set.
641 let terminal: Option<&StimulusEvent> = events.iter().find(|e| e.is_terminal);
642 // StepStart events only — the PHASE-LAYOUT set. Per-step StepEnd
643 // events are excluded here because a StepEnd seeds no new phase
644 // (it is an end-of-hold marker, not a step boundary); including
645 // them would produce a phantom extra phase and misalign the dense
646 // phase index. StepEnd events are NOT discarded, though: the
647 // step-local iteration_rate loop below pairs each StepStart[k]
648 // with its own StepEnd[k] (looked up by step_index in the full
649 // `events` vec), matching build_phase_buckets_with_stimulus. The
650 // dense-index cross-step pairing is kept only as a fallback for
651 // steps that have no StepEnd (a sched-died step, or legacy data
652 // predating the StepEnd frame).
653 let step_events: Vec<&StimulusEvent> = events
654 .iter()
655 .filter(|e| !e.is_terminal && !e.is_step_end)
656 .collect();
657
658 let mut boundaries: Vec<(u64, u64, Option<StimulusEvent>)> = Vec::new();
659 for i in 0..step_events.len() {
660 let start = (step_events[i].elapsed_ms as i64 + offset).max(0) as u64;
661 // The LAST step phase extends to end-of-monitor-data, NOT to
662 // the terminal event: the terminal is a rate boundary only,
663 // and clamping the last phase's metric window to it would
664 // drop trailing monitor samples (the host keeps sampling
665 // through teardown). Preserves the pre-terminal window.
666 let end = if i + 1 < step_events.len() {
667 (step_events[i + 1].elapsed_ms as i64 + offset).max(0) as u64
668 } else {
669 last_monitor_ms.saturating_add(1)
670 };
671 let stimulus = if i == 0 {
672 None
673 } else {
674 Some(step_events[i].clone())
675 };
676 boundaries.push((start, end, stimulus));
677 }
678
679 // Assign monitor samples to phases and compute metrics.
680 let mut phases: Vec<Phase> = Vec::with_capacity(boundaries.len());
681 for (idx, (start, end, stimulus)) in boundaries.into_iter().enumerate() {
682 let phase_samples: Vec<&MonitorSample> = monitor_samples
683 .iter()
684 .filter(|s| s.elapsed_ms >= start && s.elapsed_ms < end && sample_looks_valid(s))
685 .collect();
686
687 let metrics = compute_metrics(&phase_samples, preemption_threshold_ns);
688
689 phases.push(Phase {
690 // Enumerate position over the phase-bearing step_events,
691 // NOT the bucket step_index `phase_from_bucket` uses. This
692 // path's input is a monitor-only / legacy / test stream with
693 // no step_index-bearing Stimulus frames (the settle is
694 // step_events[0]; later events follow), so enumerate IS the
695 // step identity and `index == 0 => BASELINE` holds for this
696 // model. It diverges from the step_index model ONLY for a
697 // step_index-bearing stream with no leading settle (the
698 // production stimulus shape), which never reaches `build`:
699 // build is the monitor-only fallback, taken only when there
700 // are no PhaseBuckets, i.e. no StepStarts
701 // (build_phase_buckets_with_stimulus synthesizes a bucket
702 // per StepStart) — so a step_index-bearing production stream
703 // always takes from_phase_buckets, never this path.
704 index: idx,
705 start_ms: start,
706 end_ms: end,
707 stimulus,
708 metrics,
709 changes: Vec::new(),
710 // Monitor-only path: no per-cgroup carriers, real window.
711 per_cgroup: std::collections::BTreeMap::new(),
712 not_measured_window: false,
713 });
714 }
715
716 // Per-phase iteration rate, STEP-LOCAL: each step's rate is its
717 // own `StepStart[k] -> StepEnd[k]` delta — the step's OWN workers
718 // measured start-to-end-of-hold, matching the snapshot path
719 // (`build_phase_buckets_with_stimulus`). StepEnd events are
720 // present in `events` (emitted independent of snapshot captures)
721 // even on this monitor-only path, so the same step-local model
722 // applies; without it, workers respawned fresh each step read
723 // ~0 -> ~0 cross-step and every fresh-per-step phase but the last
724 // silently reported no throughput. A step with NO StepEnd falls
725 // back to the cross-step successor, or the terminal scenario-end
726 // event for the last step — but that fallback yields a rate only
727 // for legacy/synthetic data (a ScenarioEnd frame present without
728 // per-step StepEnd frames). A sched-died step has neither a
729 // StepEnd nor a terminal (the early return skips both emissions),
730 // so its lookup and fallback both miss and it correctly reports no
731 // rate. Duration is the guest-clock elapsed-ms delta between the
732 // paired events — independent of the metric-sample window above
733 // (whose last phase reaches end-of-monitor-data).
734 #[allow(clippy::needless_range_loop)]
735 for i in 0..phases.len() {
736 let this = step_events[i];
737 // Step-local boundary: this step's own StepEnd (same
738 // step_index). Cross-step successor / terminal only when the
739 // step has no StepEnd.
740 let step_end: Option<&StimulusEvent> = this.step_index.and_then(|k| {
741 events
742 .iter()
743 .find(|e| e.is_step_end && e.step_index == Some(k))
744 });
745 let next: Option<&StimulusEvent> = step_end.or_else(|| {
746 if i + 1 < step_events.len() {
747 Some(step_events[i + 1])
748 } else {
749 terminal
750 }
751 });
752 // Timeline::build's display fallback: compute this phase's rate
753 // directly via rate_to. The metric-pipeline producer
754 // build_phase_buckets_with_stimulus shares the same rate
755 // semantics via rate_components (it emits the two Counter
756 // components that derive_rate_metrics re-pools into
757 // iteration_rate); this display field reads the quotient.
758 if let Some(next_ev) = next
759 && let Some(rate) = this.rate_to(next_ev)
760 {
761 phases[i].metrics.iteration_rate = Some(rate);
762 }
763 }
764
765 // Detect changes at each phase boundary. Throughput is compared
766 // even across synthesized zero-capture steps; monitor-derived
767 // metrics stay gated on both phases having real samples (see
768 // [`detect_boundary_changes`]).
769 for i in 1..phases.len() {
770 let changes = detect_boundary_changes(&phases[i - 1].metrics, &phases[i].metrics);
771 phases[i].changes = changes;
772 }
773
774 Self { phases }
775 }
776
777 /// Format the timeline with a system context header.
778 ///
779 /// Tests without a real context pass `&TimelineContext::default()`;
780 /// the header lines (`kernel:`, `topology:`, etc.) are omitted but
781 /// the `--- timeline ---` prefix is preserved.
782 // No parameterless format() sibling: output with default context
783 // is byte-identical, but the only non-test caller
784 // (crate::test_support::eval) always has real context, so format()
785 // would be dead code.
786 pub fn format_with_context(&self, ctx: &TimelineContext) -> String {
787 if self.phases.is_empty() {
788 return String::new();
789 }
790
791 let mut out = String::from("--- timeline ---\n");
792
793 // Render context header.
794 let mut header_parts = Vec::new();
795 if let Some(ref k) = ctx.kernel {
796 header_parts.push(format!("kernel: {k}"));
797 }
798 if let Some(ref t) = ctx.topology {
799 header_parts.push(format!("topology: {t}"));
800 }
801 if let Some(ref s) = ctx.scheduler {
802 header_parts.push(format!("scheduler: {s}"));
803 }
804 if let Some(ref s) = ctx.scenario {
805 header_parts.push(format!("scenario: {s}"));
806 }
807 if let Some(d) = ctx.duration_s {
808 header_parts.push(format!("duration: {d:.1}s"));
809 }
810 if !header_parts.is_empty() {
811 for part in &header_parts {
812 out.push_str(part);
813 out.push_str(" ");
814 }
815 // Trim trailing " " appended by the last iteration.
816 // Explicit length guard so a future edit that stops
817 // appending the separator here can't underflow.
818 if out.len() >= 2 {
819 out.truncate(out.len() - 2);
820 }
821 out.push('\n');
822 }
823
824 self.format_phases(&mut out);
825 out
826 }
827
828 /// Render phase details into the output buffer.
829 fn format_phases(&self, out: &mut String) {
830 for phase in &self.phases {
831 let duration_ms = phase.end_ms.saturating_sub(phase.start_ms);
832 // An orphan carrier carries a normalized (0,0) window that is NOT a
833 // measured zero-duration step; surface it as not-measured rather
834 // than a misleading 0ms (the None-vs-Some discipline the per-metric
835 // renders use, applied to the window).
836 let window = if phase.not_measured_window {
837 "window not measured".to_string()
838 } else {
839 format!("{duration_ms}ms")
840 };
841
842 if phase.index == 0 {
843 // Phase 0 is the settle window before any stimulus.
844 out.push_str(&format!(
845 "\nBASELINE (settle, {}, {} samples):\n",
846 window, phase.metrics.sample_count,
847 ));
848 } else {
849 let label_start = phase
850 .stimulus
851 .as_ref()
852 .map(|s| {
853 let mut l = s.label.clone();
854 if let Some(op) = &s.op_kind {
855 l.push(' ');
856 l.push_str(op);
857 }
858 l
859 })
860 .unwrap_or_else(|| "?".to_string());
861
862 out.push_str(&format!(
863 "\nPhase {}: {} ({}, {} samples):\n",
864 phase.index, label_start, window, phase.metrics.sample_count,
865 ));
866 }
867
868 let m = &phase.metrics;
869 // Render the metric block whenever the phase carries
870 // monitor-derived metrics, not only when it captured periodic
871 // samples: a SYNTHESIZED zero-capture bucket
872 // (build_phase_buckets_with_stimulus) has sample_count 0 but
873 // fold_monitor_into_bucket fills its imbalance / dsq / fallback
874 // / stall from in-window monitor samples — render them for
875 // parity with the legacy Timeline::build path (which a
876 // zero-capture-with-monitor run took before the synthesize
877 // seam flipped it onto from_phase_buckets).
878 let has_monitor_metrics = m.avg_imbalance.is_some()
879 || m.max_imbalance.is_some()
880 || m.avg_dsq_depth.is_some()
881 || m.avg_nr_running.is_some()
882 || m.max_dsq_depth > 0
883 || m.fallback_rate.is_some()
884 || m.keep_last_rate.is_some()
885 || m.stall_count > 0;
886 if m.sample_count > 0 || has_monitor_metrics {
887 out.push_str(&format!(
888 " imbalance: avg={} max={} | dsq: avg={} max={} | nr_run: avg={}",
889 m.avg_imbalance
890 .map_or_else(|| "n/a".to_string(), |v| format!("{v:.1}")),
891 m.max_imbalance
892 .map_or_else(|| "n/a".to_string(), |v| format!("{v:.1}")),
893 m.avg_dsq_depth
894 .map_or_else(|| "n/a".to_string(), |v| format!("{v:.0}")),
895 m.max_dsq_depth,
896 m.avg_nr_running
897 .map_or_else(|| "n/a".to_string(), |v| format!("{v:.1}")),
898 ));
899 if let Some(fb) = m.fallback_rate {
900 out.push_str(&format!(" | fallback: {:.0}/s", fb));
901 }
902 if let Some(kl) = m.keep_last_rate {
903 out.push_str(&format!(" | keep_last: {:.0}/s", kl));
904 }
905 if let Some(ir) = m.iteration_rate {
906 // A synthesized (sample_count==0) step's rate is
907 // stimulus-derived; label it consistently with the
908 // no-monitor-metrics branch below.
909 let suffix = if m.sample_count == 0 {
910 " (stimulus-derived)"
911 } else {
912 ""
913 };
914 out.push_str(&format!(" | throughput: {ir:.0} iter/s{suffix}"));
915 }
916 out.push('\n');
917 if m.stall_count > 0 {
918 out.push_str(&format!(" stalls: {}\n", m.stall_count));
919 }
920 } else if let Some(ir) = m.iteration_rate {
921 // Synthesized zero-capture step (the
922 // build_phase_buckets_with_stimulus seam): no periodic
923 // captures landed, but the stimulus StepStart/StepEnd
924 // deltas still yield a throughput. Surface it so a short
925 // interior step's recovered rate is visible in the
926 // rendered timeline, not only via the structured
927 // phase_metric API.
928 out.push_str(&format!(
929 " [no samples] | throughput: {ir:.0} iter/s (stimulus-derived)\n"
930 ));
931 } else {
932 out.push_str(" [no samples]\n");
933 }
934
935 format_phase_cgroups(out, &phase.per_cgroup);
936
937 if let Some(ref stim) = phase.stimulus {
938 let detail = stim.detail.as_deref().unwrap_or("");
939 let op = stim.op_kind.as_deref().unwrap_or("?");
940 out.push_str(&format!(" >>> {}: {op}", stim.label));
941 if !detail.is_empty() {
942 out.push_str(&format!(" ({detail})"));
943 }
944 out.push('\n');
945 }
946
947 for change in &phase.changes {
948 let delta = change.after - change.before;
949 let sign = if delta > 0.0 { "+" } else { "" };
950 out.push_str(&format!(
951 " >>> {}: {} {sign}{:.1}\n",
952 change.direction, change.metric, delta,
953 ));
954 }
955 }
956 }
957
958 /// Build a [`Timeline`] from pre-bucketed
959 /// [`crate::assert::PhaseBucket`]s emitted by the metric pipeline.
960 /// Preferred over [`Self::build`] when the caller already has
961 /// `PhaseBucket`s in hand — avoids re-deriving phase boundaries
962 /// from stimulus events + monitor samples by walking the buckets
963 /// directly.
964 ///
965 /// One [`Phase`] is emitted per bucket, in `step_index` order.
966 /// `PhaseMetrics` fields are populated from the bucket's
967 /// `metrics` map via a name-keyed mapping:
968 ///
969 /// | PhaseBucket metric key | PhaseMetrics field |
970 /// |-------------------------|-------------------------|
971 /// | `max_imbalance_ratio` | `max_imbalance` |
972 /// | `avg_imbalance_ratio` | `avg_imbalance` |
973 /// | `max_dsq_depth` | `max_dsq_depth` |
974 /// | `avg_dsq_depth` | `avg_dsq_depth` |
975 /// | `avg_nr_running` | `avg_nr_running` |
976 /// | `stuck_count` | `stall_count` |
977 /// | `total_fallback` | `fallback_rate` (rate) |
978 /// | `total_keep_last` | `keep_last_rate` (rate) |
979 /// | `iteration_rate` | `iteration_rate` |
980 ///
981 /// Rate fields (`fallback_rate`, `keep_last_rate`) are computed
982 /// by dividing the bucket's reduced counter delta by the
983 /// bucket's window duration in seconds
984 /// (`(end_ms - start_ms) / 1000.0`). When the window has zero
985 /// duration (degenerate bucket) the rate stays `None`.
986 ///
987 /// Every PhaseMetrics field has a PhaseBucket source — but
988 /// `iteration_rate` only when build_phase_buckets_with_stimulus
989 /// (not the plain build_phase_buckets) produced the bucket.
990 /// `iteration_rate` requires stimulus events that the per-test
991 /// scenario produces; the plain bucket-builder used by some
992 /// tests doesn't have access to them. Defaults to `None` when
993 /// PhaseBucket.metrics has no `iteration_rate` key.
994 ///
995 /// `changes` (boundary degradation detection) IS computed
996 /// here by diffing adjacent `PhaseMetrics` fields — same
997 /// detection logic [`Self::build`] uses, applied after the
998 /// per-bucket conversion. avg_imbalance + avg_dsq_depth are
999 /// supplied by PhaseBucket so the detection runs on the same
1000 /// fields as the legacy path.
1001 pub fn from_phase_buckets(
1002 phase_buckets: &[crate::assert::PhaseBucket],
1003 stimulus_events: &[StimulusEvent],
1004 _ctx: &TimelineContext,
1005 ) -> Self {
1006 let mut sorted: Vec<&crate::assert::PhaseBucket> = phase_buckets.iter().collect();
1007 sorted.sort_by_key(|b| b.step_index);
1008 // Sort stimulus events by elapsed_ms so correlation finds
1009 // the closest event for each bucket window deterministically.
1010 // The terminal scenario-end event is excluded: it carries no
1011 // step ops/detail to render and its elapsed_ms lands past
1012 // every bucket window, so it would never correlate — filtering
1013 // it keeps the correlation set to real step starts only.
1014 // Per-step StepEnd events are likewise excluded so each bucket's
1015 // rendered op/detail label correlates to the step's defining
1016 // StepStart, not its end-of-hold marker (the bucket's iteration_rate
1017 // is already the step-local value computed upstream).
1018 let mut sorted_events: Vec<&StimulusEvent> = stimulus_events
1019 .iter()
1020 .filter(|e| !e.is_terminal && !e.is_step_end)
1021 .collect();
1022 sorted_events.sort_by_key(|e| e.elapsed_ms);
1023 let mut phases: Vec<Phase> = sorted
1024 .into_iter()
1025 .map(|b| phase_from_bucket(b, &sorted_events))
1026 .collect();
1027 // Boundary-change detection — shares [`detect_boundary_changes`]
1028 // with [`Self::build`]. Walks each adjacent (prev, curr) pair and
1029 // records significant deltas on the LATER phase's `changes` vec so
1030 // the operator sees "what changed when entering this phase".
1031 // Throughput is compared even when a side is a SYNTHESIZED
1032 // zero-capture bucket (build_phase_buckets_with_stimulus): its
1033 // `iteration_rate` is stimulus-derived and real, so a throughput
1034 // collapse entering or leaving it must surface. The monitor-derived
1035 // metrics (imbalance / dsq depth / fallback / keep_last) stay gated
1036 // inside the helper on both phases having real samples, so a
1037 // partial-metric phase never paints a phantom non-throughput change.
1038 // An orphan phase (a not-measured (0,0) window with all-None
1039 // PhaseMetrics) sits between its neighbors here, so detect_boundary_changes
1040 // compares each real neighbor against the orphan's None metrics — which
1041 // the helper gates away (both sides must be Some) — rather than across
1042 // the unmeasured window. INTENTIONAL: there is no data for the orphan's
1043 // step, so flagging a phase-k-1 -> phase-k+1 change as if they were
1044 // adjacent would assert a transition over an unmeasured interval. This is
1045 // render-only (Phase.changes has no verdict/sidecar/A-B consumer).
1046 for i in 1..phases.len() {
1047 let changes = detect_boundary_changes(&phases[i - 1].metrics, &phases[i].metrics);
1048 phases[i].changes = changes;
1049 }
1050 Self { phases }
1051 }
1052
1053 /// Test helper — collect all degradation changes across phases.
1054 /// Retained after the gauntlet analyzer was removed; the scenarios
1055 /// pipeline consumes `Timeline` via `format_with_context` and does
1056 /// not read degradations directly.
1057 #[cfg(test)]
1058 pub fn degradations(&self) -> Vec<(&Phase, &PhaseChange)> {
1059 let mut out = Vec::new();
1060 for phase in &self.phases {
1061 for change in &phase.changes {
1062 if change.direction == ChangeDirection::Degraded {
1063 out.push((phase, change));
1064 }
1065 }
1066 }
1067 out
1068 }
1069}
1070
1071// ---------------------------------------------------------------------------
1072// PhaseBucket → Phase conversion
1073// ---------------------------------------------------------------------------
1074
1075/// Build a [`Phase`] from a [`crate::assert::PhaseBucket`]. The phase
1076/// index is the bucket's `step_index` (BASELINE = 0, scenario Step k =
1077/// k + 1), NOT the enumerate position in the vec — so `format_phases`
1078/// keys its BASELINE-vs-Step label on the true phase identity, and a run
1079/// whose first bucket is a Step (no BASELINE bucket, e.g. under
1080/// `--cell-parent-cgroup` where BASELINE captured nothing) renders that
1081/// Step correctly rather than mislabeling it as BASELINE. The metric map
1082/// is projected onto the named `PhaseMetrics` fields per the table in
1083/// [`Timeline::from_phase_buckets`]. BASELINE (`step_index` 0) emits
1084/// `stimulus = None`; later phases synthesize a [`StimulusEvent`] whose
1085/// label / op_kind come from the bucket label so the failure-message
1086/// renderer prints a recognizable phase header.
1087fn phase_from_bucket(b: &crate::assert::PhaseBucket, sorted_events: &[&StimulusEvent]) -> Phase {
1088 let duration_s = if b.end_ms > b.start_ms {
1089 (b.end_ms - b.start_ms) as f64 / 1000.0
1090 } else {
1091 0.0
1092 };
1093 // Rate computation: counter-delta / duration_s. duration_s == 0
1094 // disables the rate (None) — degenerate buckets shouldn't
1095 // produce spurious infinities.
1096 let rate = |key: &str| -> Option<f64> {
1097 if duration_s <= 0.0 {
1098 return None;
1099 }
1100 b.metrics.get(key).map(|v| v / duration_s)
1101 };
1102 let metrics = PhaseMetrics {
1103 sample_count: b.sample_count,
1104 avg_imbalance: b.metrics.get("avg_imbalance_ratio").copied(),
1105 max_imbalance: b.metrics.get("max_imbalance_ratio").copied(),
1106 avg_dsq_depth: b.metrics.get("avg_dsq_depth").copied(),
1107 avg_nr_running: b.metrics.get("avg_nr_running").copied(),
1108 max_dsq_depth: b
1109 .metrics
1110 .get("max_dsq_depth")
1111 .map(|v| v.round() as u32)
1112 .unwrap_or(0),
1113 stall_count: b
1114 .metrics
1115 .get("stuck_count")
1116 .map(|v| v.round() as usize)
1117 .unwrap_or(0),
1118 fallback_rate: rate("total_fallback"),
1119 keep_last_rate: rate("total_keep_last"),
1120 // iteration_rate is a derived Rate: derive_rate_metrics already
1121 // placed the Σiterations/Σseconds quotient into the bucket map, so
1122 // read it verbatim — do NOT divide by duration (unlike
1123 // fallback_rate / keep_last_rate above, which divide their Counter
1124 // by the window).
1125 iteration_rate: b.metrics.get("iteration_rate").copied(),
1126 };
1127 let stimulus = if b.step_index == 0 {
1128 None
1129 } else {
1130 // Correlate with the closest StimulusEvent whose
1131 // elapsed_ms falls in [start_ms, end_ms]. Carrying the
1132 // real event preserves op_kind + detail in the failure-
1133 // message timeline render — `phase_from_bucket`'s prior
1134 // synthesis of a placeholder StimulusEvent with op_kind
1135 // = None / detail = None produced "Step[N]: ?" headers
1136 // that lost the operator-facing per-phase context the
1137 // legacy Timeline::build path carried.
1138 let correlated = sorted_events.iter().find(|e| {
1139 if b.start_ms == b.end_ms {
1140 e.elapsed_ms == b.start_ms
1141 } else {
1142 e.elapsed_ms >= b.start_ms && e.elapsed_ms < b.end_ms
1143 }
1144 });
1145 match correlated {
1146 Some(ev) => Some((*ev).clone()),
1147 None => Some(StimulusEvent {
1148 elapsed_ms: b.start_ms,
1149 label: b.label.clone(),
1150 op_kind: None,
1151 detail: None,
1152 total_iterations: None,
1153 // Synthetic placeholder for a bucket with no
1154 // correlated stimulus event; no authoritative step
1155 // ordinal to carry.
1156 step_index: None,
1157 is_terminal: false,
1158 is_step_end: false,
1159 }),
1160 }
1161 };
1162 // Orphan signature: fold_guest_per_cgroup_into_host_buckets normalizes a
1163 // guest carrier with no paired host bucket to a (0,0) window carrying ONLY
1164 // per_cgroup (empty metrics). A captured bucket has metrics, so the
1165 // (0,0)+empty-metrics+non-empty-per_cgroup shape is the orphan arm's on
1166 // every NON-zero-duration window. The one other producer is a ZERO-duration
1167 // step at scenario start (StepStart[k] == StepEnd[k] == 0 -> a synthesized
1168 // host (0,0) window with empty metrics, since duration 0 yields no rate and
1169 // no in-window monitor sample, then MATCHED with a same-step guest carrier),
1170 // but that collision is HARMLESS: a zero-duration step has no window to
1171 // measure, so "window not measured" reads the same as the "0ms" it would
1172 // otherwise show. Display-only, with no verdict/sidecar consumer,
1173 // so the marker needs no serialized PhaseBucket flag.
1174 let not_measured_window =
1175 b.start_ms == 0 && b.end_ms == 0 && b.metrics.is_empty() && !b.per_cgroup.is_empty();
1176 Phase {
1177 index: b.step_index as usize,
1178 start_ms: b.start_ms,
1179 end_ms: b.end_ms,
1180 stimulus,
1181 metrics,
1182 changes: Vec::new(),
1183 per_cgroup: b.per_cgroup.clone(),
1184 not_measured_window,
1185 }
1186}
1187
1188/// Cap on per-cgroup lines rendered per phase, bounding the failure-message
1189/// size for a many-cgroup scenario (the sched_log render caps similarly).
1190/// Truncation is by BTreeMap NAME order (deterministic, no ranking math); a
1191/// "+J more" note records the drop so the cap never reads as "all cgroups".
1192const MAX_RENDERED_CGROUPS: usize = 16;
1193
1194/// Render the per-cgroup sub-block for one phase: one line per cgroup in
1195/// BTreeMap (name) order, reduced at display time via the
1196/// [`crate::assert::PhaseCgroupStats`] summaries. Empty `per_cgroup` (the
1197/// monitor-only path, or a phase that carried no per-cgroup components)
1198/// renders nothing. The None-vs-Some(0.0)
1199/// discipline carries through: an absent off-CPU reduction renders `n/a` (NOT
1200/// `0.0%`), and an empty wake / run-delay pool OMITS that segment rather than
1201/// painting a misleading `0µs`.
1202fn format_phase_cgroups(
1203 out: &mut String,
1204 per_cgroup: &std::collections::BTreeMap<String, crate::assert::PhaseCgroupStats>,
1205) {
1206 if per_cgroup.is_empty() {
1207 return;
1208 }
1209 out.push_str(" per-cgroup:\n");
1210 for (name, pcg) in per_cgroup.iter().take(MAX_RENDERED_CGROUPS) {
1211 out.push_str(&format!(" {name}: "));
1212 // A stripped carrier had its raw sample vectors dropped to fit the bulk
1213 // frame, so off-cpu / wake / run-delay summaries are all absent — but
1214 // that is NOT "not measured". Surface the size-limit drop explicitly so
1215 // the operator does not read it as a quiet cgroup.
1216 if pcg.stripped {
1217 out.push_str("samples stripped (size limit)");
1218 } else {
1219 match pcg.off_cpu_summary() {
1220 Some((avg, min, max, spread)) => out.push_str(&format!(
1221 "off-cpu avg={avg:.1}% min={min:.1}% max={max:.1}% spread={spread:.1}%"
1222 )),
1223 None => out.push_str("off-cpu n/a"),
1224 }
1225 if let Some((p99, median)) = pcg.wake_summary() {
1226 out.push_str(&format!(
1227 " | wake p99={p99:.0}\u{00b5}s median={median:.0}\u{00b5}s"
1228 ));
1229 }
1230 if let Some((mean, worst)) = pcg.run_delay_summary() {
1231 out.push_str(&format!(
1232 " | run-delay mean={mean:.0}\u{00b5}s worst={worst:.0}\u{00b5}s"
1233 ));
1234 }
1235 }
1236 out.push_str(&format!(
1237 " | iters={} migrations={}",
1238 pcg.total_iterations, pcg.total_migrations
1239 ));
1240 // Gap is a Peak with no Option: 0 means "no notable gap", so omit it
1241 // rather than print a noisy gap=0ms on every quiet cgroup.
1242 if pcg.max_gap_ms > 0 {
1243 out.push_str(&format!(
1244 " | gap={}ms@cpu{}",
1245 pcg.max_gap_ms, pcg.max_gap_cpu
1246 ));
1247 }
1248 out.push('\n');
1249 }
1250 let total = per_cgroup.len();
1251 if total > MAX_RENDERED_CGROUPS {
1252 let dropped = total - MAX_RENDERED_CGROUPS;
1253 let noun = if dropped == 1 { "cgroup" } else { "cgroups" };
1254 out.push_str(&format!(" (+{dropped} more {noun})\n"));
1255 }
1256}
1257
1258// ---------------------------------------------------------------------------
1259// Metric computation
1260// ---------------------------------------------------------------------------
1261
1262/// Reduce a phase's monitor samples to [`PhaseMetrics`].
1263///
1264/// `preemption_threshold_ns` is the vCPU-preemption exemption window for
1265/// stall detection: a non-advancing `rq_clock` on a CPU whose
1266/// `vcpu_cpu_time_ns` advanced by less than this is a host-preemption
1267/// artifact, not a scheduler stall, and is exempt. Pass `0` to derive it
1268/// from the guest kernel's `CONFIG_HZ` via
1269/// `crate::monitor::vcpu_preemption_threshold_ns` — the same resolution
1270/// [`MonitorSummary::from_samples_with_threshold`](crate::monitor::MonitorSummary::from_samples_with_threshold)
1271/// applies, so the per-phase `stall_count` applies the SAME per-(CPU,
1272/// window) `is_cpu_stuck` predicate as the run-level
1273/// `MonitorSummary::stuck_count` (run-level `>=` Σ per-phase: it also
1274/// windows across phase boundaries).
1275pub(crate) fn compute_metrics(
1276 samples: &[&MonitorSample],
1277 preemption_threshold_ns: u64,
1278) -> PhaseMetrics {
1279 if samples.is_empty() {
1280 return PhaseMetrics::default();
1281 }
1282
1283 // Filter out samples with implausible data (e.g. garbage DSQ depths
1284 // from uninitialized guest memory) before computing metrics.
1285 let valid: Vec<&MonitorSample> = samples
1286 .iter()
1287 .copied()
1288 .filter(|s| !s.cpus.is_empty() && sample_looks_valid(s))
1289 .collect();
1290
1291 if valid.is_empty() {
1292 return PhaseMetrics {
1293 sample_count: 0,
1294 ..PhaseMetrics::default()
1295 };
1296 }
1297
1298 let mut total_imbalance = 0.0f64;
1299 let mut max_imbalance = 0.0f64;
1300 let mut total_dsq = 0.0f64;
1301 let mut total_nr_running = 0.0f64;
1302 let mut max_dsq = 0u32;
1303 let mut stall_count = 0usize;
1304
1305 for sample in &valid {
1306 for cpu in &sample.cpus {
1307 max_dsq = max_dsq.max(cpu.local_dsq_depth);
1308 }
1309 let ratio = sample.imbalance_ratio();
1310 total_imbalance += ratio;
1311 if ratio > max_imbalance {
1312 max_imbalance = ratio;
1313 }
1314
1315 let avg_dsq_this: f64 = sample
1316 .cpus
1317 .iter()
1318 .map(|c| c.local_dsq_depth as f64)
1319 .sum::<f64>()
1320 / sample.cpus.len() as f64;
1321 total_dsq += avg_dsq_this;
1322
1323 // Per-sample mean of per-CPU nr_running (full-class runqueue depth),
1324 // averaged over samples below — the avg_dsq_depth shape. `valid`
1325 // guarantees `!cpus.is_empty()`, so the divisor is nonzero.
1326 let avg_nr_this: f64 =
1327 sample.cpus.iter().map(|c| c.nr_running as f64).sum::<f64>() / sample.cpus.len() as f64;
1328 total_nr_running += avg_nr_this;
1329 }
1330
1331 // Stall detection between consecutive valid samples in this phase.
1332 // Route through `is_cpu_stuck` (the shared predicate the run-level
1333 // `MonitorSummary` path also uses) so the per-phase stall count and the
1334 // run-level stuck count apply the identical NOHZ-idle and
1335 // vCPU-preemption exemptions — the per-phase count uses the SAME
1336 // predicate as the run-level one (run-level `>=` Σ per-phase: it also
1337 // counts the boundary-straddling window pair and out-of-phase samples).
1338 let threshold = if preemption_threshold_ns > 0 {
1339 preemption_threshold_ns
1340 } else {
1341 crate::monitor::vcpu_preemption_threshold_ns(None)
1342 };
1343 for w in valid.windows(2) {
1344 let prev = w[0];
1345 let curr = w[1];
1346 let cpu_count = prev.cpus.len().min(curr.cpus.len());
1347 for cpu in 0..cpu_count {
1348 if crate::monitor::reader::is_cpu_stuck(&prev.cpus[cpu], &curr.cpus[cpu], threshold) {
1349 stall_count += 1;
1350 }
1351 }
1352 }
1353
1354 // Event counter rates: sum counters across CPUs for first/last valid
1355 // samples that have event_counters, compute delta / duration.
1356 let has_events = |s: &&MonitorSample| s.cpus.iter().any(|c| c.event_counters.is_some());
1357 let first_ev = valid.iter().copied().find(|s| has_events(s));
1358 let last_ev = valid.iter().copied().rev().find(|s| has_events(s));
1359
1360 let (fallback_rate, keep_last_rate) = match (first_ev, last_ev) {
1361 (Some(first), Some(last)) if first.elapsed_ms < last.elapsed_ms => {
1362 // `<` guard above is expected to rule out underflow, but
1363 // `saturating_sub` is defense-in-depth: if a future change
1364 // loosens the guard, the worst outcome becomes
1365 // `duration_s == 0.0` (which disables the rate below) rather
1366 // than a panic.
1367 let duration_s = last.elapsed_ms.saturating_sub(first.elapsed_ms) as f64 / 1000.0;
1368 // Event counters can reset mid-run (scheduler restart) and
1369 // produce a negative raw delta. Shared helper clamps to
1370 // >= 0 so the computed rate never goes negative; same
1371 // semantics as MonitorSummary::compute_event_deltas.
1372 let fb_delta = crate::monitor::counter_delta(
1373 last.sum_event_field(|e| e.select_cpu_fallback).unwrap_or(0),
1374 first
1375 .sum_event_field(|e| e.select_cpu_fallback)
1376 .unwrap_or(0),
1377 );
1378 let kl_delta = crate::monitor::counter_delta(
1379 last.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
1380 first.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
1381 );
1382 (
1383 Some(fb_delta as f64 / duration_s),
1384 Some(kl_delta as f64 / duration_s),
1385 )
1386 }
1387 _ => (None, None),
1388 };
1389
1390 let valid_count = valid.len();
1391 let n = valid_count as f64;
1392 // None when no valid samples — avoids a 0.0/0.0 NaN and keeps "no
1393 // data" distinct from a real zero (the detector skips None sides).
1394 PhaseMetrics {
1395 sample_count: valid_count,
1396 avg_imbalance: (valid_count > 0).then(|| total_imbalance / n),
1397 max_imbalance: (valid_count > 0).then_some(max_imbalance),
1398 avg_dsq_depth: (valid_count > 0).then(|| total_dsq / n),
1399 avg_nr_running: (valid_count > 0).then(|| total_nr_running / n),
1400 max_dsq_depth: max_dsq,
1401 stall_count,
1402 fallback_rate,
1403 keep_last_rate,
1404 iteration_rate: None,
1405 }
1406}
1407
1408// ---------------------------------------------------------------------------
1409// Tests
1410// ---------------------------------------------------------------------------
1411
1412#[cfg(test)]
1413mod tests {
1414 use super::*;
1415 use crate::monitor::{CpuSnapshot, MonitorSample};
1416
1417 /// `StimulusEvent::phase()` maps the 1-indexed wire `step_index` onto the
1418 /// canonical [`crate::assert::Phase`] (StepStart and StepEnd of step `k`
1419 /// both -> `Phase::step(k)`); the scenario-end terminal seeds no phase.
1420 #[test]
1421 fn stimulus_event_phase_maps_wire_step_index_to_phase() {
1422 use crate::assert::Phase;
1423 // Wire step_index 1 (Step 0) -> Phase::step(0); 2 (Step 1) -> step(1).
1424 assert_eq!(
1425 StimulusEvent::from_wire(&wire_event(0, 1, 0)).phase(),
1426 Some(Phase::step(0)),
1427 );
1428 assert_eq!(
1429 StimulusEvent::from_wire(&wire_event(100, 2, 50)).phase(),
1430 Some(Phase::step(1)),
1431 );
1432 // StepEnd carries the same step_index -> same Phase as its StepStart.
1433 assert_eq!(
1434 StimulusEvent::from_step_end(&wire_event(200, 2, 90)).phase(),
1435 Some(Phase::step(1)),
1436 );
1437 // The terminal boundary is not a step -> no Phase.
1438 assert_eq!(StimulusEvent::terminal(300, 100).phase(), None);
1439 }
1440
1441 fn sample(elapsed_ms: u64, cpus: Vec<(u32, u32, u64)>) -> MonitorSample {
1442 MonitorSample {
1443 bpf_map_fields: Vec::new(),
1444 prog_stats: None,
1445 psi_irq: None,
1446 elapsed_ms,
1447 cpus: cpus
1448 .into_iter()
1449 .map(|(nr_running, dsq, rq_clock)| CpuSnapshot {
1450 nr_running,
1451 scx_nr_running: 0,
1452 local_dsq_depth: dsq,
1453 rq_clock,
1454 scx_flags: 0,
1455 event_counters: None,
1456 schedstat: None,
1457 vcpu_cpu_time_ns: None,
1458 vcpu_perf: None,
1459 avg_irq_util: None,
1460 sched_domains: None,
1461 })
1462 .collect(),
1463 }
1464 }
1465
1466 fn stimulus(elapsed_ms: u64, label: &str) -> StimulusEvent {
1467 StimulusEvent {
1468 elapsed_ms,
1469 label: label.to_string(),
1470 op_kind: None,
1471 detail: None,
1472 total_iterations: None,
1473 step_index: None,
1474 is_terminal: false,
1475 is_step_end: false,
1476 }
1477 }
1478
1479 #[test]
1480 fn empty_inputs_empty_timeline() {
1481 let t = Timeline::build(&[], &[], 0);
1482 assert!(t.phases.is_empty());
1483 }
1484
1485 #[test]
1486 fn no_stimulus_empty_timeline() {
1487 let samples = vec![sample(1000, vec![(2, 1, 100)])];
1488 let t = Timeline::build(&[], &samples, 0);
1489 assert!(t.phases.is_empty());
1490 }
1491
1492 #[test]
1493 fn no_monitor_empty_timeline() {
1494 let events = vec![stimulus(0, "ScenarioStart")];
1495 let t = Timeline::build(&events, &[], 0);
1496 assert!(t.phases.is_empty());
1497 }
1498
1499 #[test]
1500 fn single_event_single_phase() {
1501 let events = vec![stimulus(0, "ScenarioStart")];
1502 let samples = vec![
1503 sample(600, vec![(2, 1, 100), (2, 1, 200)]),
1504 sample(700, vec![(2, 1, 300), (2, 1, 400)]),
1505 ];
1506 let t = Timeline::build(&events, &samples, 0);
1507 assert_eq!(t.phases.len(), 1);
1508 // Both samples — including the one AT last_monitor_ms (700) —
1509 // must fall inside the single phase's [start, last_monitor_ms+1)
1510 // window. A > 0 check passes even if the last-sample-inclusion
1511 // off-by-one (end = last_monitor_ms+1) regressed to +0, dropping
1512 // the 700 sample. Pin the exact count.
1513 assert_eq!(t.phases[0].metrics.sample_count, 2);
1514 }
1515
1516 #[test]
1517 fn two_events_two_phases() {
1518 let events = vec![stimulus(0, "ScenarioStart"), stimulus(3000, "StepStart[0]")];
1519 let samples: Vec<MonitorSample> = (5..65)
1520 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
1521 .collect();
1522 let t = Timeline::build(&events, &samples, 0);
1523 assert_eq!(t.phases.len(), 2);
1524 // Pin WHERE the boundary fell, not just non-emptiness: 60 samples
1525 // at i*100 (i in 5..65 → 500..6400); the >500 warmup drops the
1526 // 500 sample (i=5), leaving 59. The StepStart[0]@3000 boundary
1527 // (offset-adjusted) splits them 30/29. A > 0 check passes even if
1528 // the offset/boundary math shifted the split point while leaving
1529 // samples on both sides.
1530 assert_eq!(t.phases[0].metrics.sample_count, 30);
1531 assert_eq!(t.phases[1].metrics.sample_count, 29);
1532 assert_eq!(
1533 t.phases[0].metrics.sample_count + t.phases[1].metrics.sample_count,
1534 59,
1535 "59 = 60 samples minus the 500ms sample dropped by the >500 warmup",
1536 );
1537 }
1538
1539 #[test]
1540 fn improvement_detected() {
1541 // Phase 0: imbalanced
1542 // Phase 1: balanced
1543 let events = vec![stimulus(0, "ScenarioStart"), stimulus(1000, "StepStart[0]")];
1544 let mut samples = Vec::new();
1545 for i in 5..15 {
1546 samples.push(sample(
1547 i * 100,
1548 vec![(1, 1, i * 1000), (5, 1, i * 1000 + 100)],
1549 ));
1550 }
1551 for i in 15..25 {
1552 samples.push(sample(
1553 i * 100,
1554 vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
1555 ));
1556 }
1557 let t = Timeline::build(&events, &samples, 0);
1558 let improvements: Vec<_> = t
1559 .phases
1560 .iter()
1561 .flat_map(|p| p.changes.iter())
1562 .filter(|c| c.direction == ChangeDirection::Improved)
1563 .collect();
1564 assert!(!improvements.is_empty());
1565 }
1566
1567 #[test]
1568 fn format_non_empty() {
1569 let events = vec![stimulus(0, "ScenarioStart"), stimulus(1000, "StepStart[0]")];
1570 let samples: Vec<MonitorSample> = (5..25)
1571 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
1572 .collect();
1573 let t = Timeline::build(&events, &samples, 0);
1574 let formatted = t.format_with_context(&TimelineContext::default());
1575 assert!(formatted.contains("BASELINE"));
1576 assert!(formatted.contains("Phase 1"));
1577 assert!(formatted.contains("imbalance"));
1578 }
1579
1580 /// A synthesized zero-capture step (sample_count==0) still renders its
1581 /// stimulus-derived throughput in the formatted timeline, not only
1582 /// "[no samples]". Pins the synthesized-step visibility in format_phases. The
1583 /// BASELINE bucket holds step_index 0 (its phase index, the settle
1584 /// render) so the synthesized step lands at a Phase index that takes
1585 /// the metric path.
1586 #[test]
1587 fn format_renders_synthesized_step_throughput() {
1588 let buckets = vec![
1589 crate::assert::PhaseBucket {
1590 per_cgroup: Default::default(),
1591 step_index: 0,
1592 label: "BASELINE".to_string(),
1593 start_ms: 0,
1594 end_ms: 1000,
1595 sample_count: 2,
1596 metrics: std::collections::BTreeMap::new(),
1597 },
1598 crate::assert::PhaseBucket {
1599 per_cgroup: Default::default(),
1600 step_index: 1,
1601 label: "Step[0]".to_string(),
1602 start_ms: 1000,
1603 end_ms: 2000,
1604 sample_count: 0, // synthesized zero-capture step
1605 metrics: std::collections::BTreeMap::from([("iteration_rate".to_string(), 1500.0)]),
1606 },
1607 ];
1608 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1609 let formatted = t.format_with_context(&TimelineContext::default());
1610 assert!(
1611 formatted.contains("throughput: 1500 iter/s (stimulus-derived)"),
1612 "a synthesized step must render its stimulus-derived throughput, \
1613 not only '[no samples]'; got:\n{formatted}",
1614 );
1615 }
1616
1617 // -- orphan not-measured marker + per-cgroup sub-block render --
1618
1619 /// An orphan bucket — the unique (0,0)-window + empty-metrics +
1620 /// non-empty-per_cgroup shape from the fold's orphan arm — renders "window
1621 /// not measured", NOT a misleading "0ms".
1622 #[test]
1623 fn format_renders_orphan_window_as_not_measured() {
1624 let mut per_cgroup = std::collections::BTreeMap::new();
1625 per_cgroup.insert(
1626 "cg".to_string(),
1627 crate::assert::PhaseCgroupStats {
1628 off_cpu_pcts: vec![80.0],
1629 total_iterations: 900_000,
1630 ..Default::default()
1631 },
1632 );
1633 let buckets = vec![crate::assert::PhaseBucket {
1634 per_cgroup,
1635 step_index: 1,
1636 label: "Step[0]".to_string(),
1637 start_ms: 0,
1638 end_ms: 0,
1639 sample_count: 0,
1640 metrics: std::collections::BTreeMap::new(),
1641 }];
1642 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1643 let formatted = t.format_with_context(&TimelineContext::default());
1644 assert!(
1645 formatted.contains("window not measured"),
1646 "orphan window must render not-measured; got:\n{formatted}",
1647 );
1648 assert!(
1649 !formatted.contains("(0ms,"),
1650 "orphan must NOT render as 0ms; got:\n{formatted}",
1651 );
1652 // The whole point of routing orphan carriers through the render (vs the
1653 // pre-fold path that dropped them) is that their per-cgroup telemetry —
1654 // an orphan's ONLY payload — SURFACES alongside the not-measured marker.
1655 assert!(
1656 formatted.contains("per-cgroup:"),
1657 "orphan's per-cgroup sub-block must render; got:\n{formatted}",
1658 );
1659 assert!(
1660 formatted.contains("cg: off-cpu avg=80.0%"),
1661 "orphan carrier's off-cpu reduction must render; got:\n{formatted}",
1662 );
1663 assert!(
1664 formatted.contains("iters=900000"),
1665 "orphan carrier's counters must render; got:\n{formatted}",
1666 );
1667 // The cg carrier has off-cpu but NO wake/run-delay pools — pin the
1668 // per-line omit when off-cpu is the sole reduction present (the most
1669 // likely real orphan-carrier shape).
1670 let cg_line = formatted
1671 .lines()
1672 .find(|l| l.contains("cg: off-cpu"))
1673 .expect("cg line");
1674 assert!(!cg_line.contains("wake p99="), "got:\n{formatted}");
1675 assert!(!cg_line.contains("run-delay mean="), "got:\n{formatted}");
1676 }
1677
1678 /// Orphan-adjacency suppression: an orphan phase (all-None
1679 /// PhaseMetrics) BETWEEN two real phases makes detect_boundary_changes
1680 /// compare each real neighbor against the orphan's gated-away None metrics,
1681 /// NOT across the unmeasured window — so a step1->step3 throughput collapse
1682 /// is NOT flagged on step3 (no data for the intervening orphan step).
1683 /// INTENTIONAL + render-only; pins the documented from_phase_buckets behavior.
1684 #[test]
1685 fn format_orphan_phase_suppresses_cross_orphan_change_detection() {
1686 let real = |step: u16, rate: f64| crate::assert::PhaseBucket {
1687 per_cgroup: Default::default(),
1688 step_index: step,
1689 label: format!("Step[{}]", step - 1),
1690 start_ms: step as u64 * 1000,
1691 end_ms: step as u64 * 1000 + 500,
1692 sample_count: 5,
1693 metrics: std::collections::BTreeMap::from([("iteration_rate".to_string(), rate)]),
1694 };
1695 let mut orphan_pc = std::collections::BTreeMap::new();
1696 orphan_pc.insert(
1697 "cg".to_string(),
1698 crate::assert::PhaseCgroupStats {
1699 off_cpu_pcts: vec![50.0],
1700 ..Default::default()
1701 },
1702 );
1703 let orphan = crate::assert::PhaseBucket {
1704 per_cgroup: orphan_pc,
1705 step_index: 2,
1706 label: "Step[1]".to_string(),
1707 start_ms: 0,
1708 end_ms: 0,
1709 sample_count: 0,
1710 metrics: std::collections::BTreeMap::new(),
1711 };
1712 // CONTROL: step1 (rate 10000) adjacent to step3 (rate 1000) — a 90%
1713 // collapse (> the 30% rel threshold) IS flagged as a change on step3.
1714 let ctrl = Timeline::from_phase_buckets(
1715 &[real(1, 10000.0), real(3, 1000.0)],
1716 &[],
1717 &TimelineContext::default(),
1718 );
1719 let ctrl_step3 = ctrl.phases.iter().find(|p| p.index == 3).expect("step3");
1720 assert!(
1721 !ctrl_step3.changes.is_empty(),
1722 "control: adjacent step1->step3 throughput collapse IS flagged; got: {:?}",
1723 ctrl_step3.changes,
1724 );
1725 // With the orphan between, step3 is compared against the orphan's None
1726 // metrics -> the change is suppressed (no data for the gap).
1727 let t = Timeline::from_phase_buckets(
1728 &[real(1, 10000.0), orphan, real(3, 1000.0)],
1729 &[],
1730 &TimelineContext::default(),
1731 );
1732 let step3 = t.phases.iter().find(|p| p.index == 3).expect("step3");
1733 assert!(
1734 step3.changes.is_empty(),
1735 "orphan between suppresses the cross-orphan change; got: {:?}",
1736 step3.changes,
1737 );
1738 assert!(
1739 t.format_with_context(&TimelineContext::default())
1740 .contains("window not measured"),
1741 "the orphan still renders not-measured",
1742 );
1743 }
1744
1745 /// The orphan signature is GUARDED on empty metrics: a (0,0)-window bucket
1746 /// that carries metrics is a captured bucket (or a measured zero-duration
1747 /// step), NOT an orphan — it renders 0ms, never "window not measured".
1748 #[test]
1749 fn format_does_not_mark_not_measured_when_metrics_present() {
1750 let mut per_cgroup = std::collections::BTreeMap::new();
1751 per_cgroup.insert(
1752 "cg".to_string(),
1753 crate::assert::PhaseCgroupStats {
1754 off_cpu_pcts: vec![50.0],
1755 ..Default::default()
1756 },
1757 );
1758 let buckets = vec![crate::assert::PhaseBucket {
1759 per_cgroup,
1760 step_index: 1,
1761 label: "Step[0]".to_string(),
1762 start_ms: 0,
1763 end_ms: 0,
1764 sample_count: 1,
1765 metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.0)]),
1766 }];
1767 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1768 let formatted = t.format_with_context(&TimelineContext::default());
1769 assert!(
1770 !formatted.contains("window not measured"),
1771 "a (0,0) window WITH metrics is captured, not an orphan; got:\n{formatted}",
1772 );
1773 assert!(
1774 formatted.contains("(0ms,"),
1775 "a measured zero-duration window renders 0ms; got:\n{formatted}",
1776 );
1777 }
1778
1779 /// The per-cgroup sub-block renders one line per cgroup (name order), with
1780 /// the None-vs-Some discipline: a not-measured off-CPU reduction renders
1781 /// `n/a` (not `0.0%`), and an empty wake/run-delay pool omits that segment.
1782 #[test]
1783 fn format_renders_per_cgroup_subblock_none_aware() {
1784 let mut per_cgroup = std::collections::BTreeMap::new();
1785 per_cgroup.insert(
1786 "cg_a".to_string(),
1787 crate::assert::PhaseCgroupStats {
1788 off_cpu_pcts: vec![80.0, 84.0],
1789 wake_latencies_ns: vec![100_000, 120_000],
1790 run_delays_ns: vec![45_000],
1791 total_iterations: 900_000,
1792 total_migrations: 12,
1793 max_gap_ms: 8,
1794 max_gap_cpu: 3,
1795 ..Default::default()
1796 },
1797 );
1798 per_cgroup.insert(
1799 "cg_b".to_string(),
1800 crate::assert::PhaseCgroupStats {
1801 off_cpu_pcts: vec![],
1802 total_iterations: 80_000,
1803 ..Default::default()
1804 },
1805 );
1806 let buckets = vec![crate::assert::PhaseBucket {
1807 per_cgroup,
1808 step_index: 1,
1809 label: "Step[0]".to_string(),
1810 start_ms: 1000,
1811 end_ms: 6000,
1812 sample_count: 10,
1813 metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.5)]),
1814 }];
1815 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1816 let formatted = t.format_with_context(&TimelineContext::default());
1817 assert!(
1818 formatted.contains("per-cgroup:"),
1819 "sub-block header; got:\n{formatted}"
1820 );
1821 assert!(
1822 formatted.contains("cg_a: off-cpu avg=82.0% min=80.0% max=84.0% spread=4.0%"),
1823 "got:\n{formatted}",
1824 );
1825 assert!(
1826 formatted.contains("wake p99="),
1827 "wake segment present; got:\n{formatted}"
1828 );
1829 assert!(
1830 formatted.contains("run-delay mean="),
1831 "run-delay segment present; got:\n{formatted}",
1832 );
1833 assert!(
1834 formatted.contains("iters=900000 migrations=12"),
1835 "counters present; got:\n{formatted}",
1836 );
1837 assert!(
1838 formatted.contains("cg_b: off-cpu n/a"),
1839 "not-measured off-cpu is n/a, NOT 0.0%; got:\n{formatted}",
1840 );
1841 // cg_a has a coupled gap (8ms @ cpu 3) -> rendered with its cpu.
1842 assert!(
1843 formatted.contains("gap=8ms@cpu3"),
1844 "coupled gap renders ms@cpu; got:\n{formatted}",
1845 );
1846 // cg_b has no wake/run-delay pools AND max_gap_ms==0 -> those segments
1847 // omitted on its line (gap is omitted at 0, not printed as gap=0ms).
1848 let cg_b_line = formatted
1849 .lines()
1850 .find(|l| l.contains("cg_b:"))
1851 .expect("cg_b line");
1852 assert!(!cg_b_line.contains("wake p99="), "got:\n{formatted}");
1853 assert!(
1854 !cg_b_line.contains("gap="),
1855 "gap omitted at 0; got:\n{formatted}"
1856 );
1857 // BTreeMap name order: cg_a before cg_b.
1858 assert!(
1859 formatted.find("cg_a:").unwrap() < formatted.find("cg_b:").unwrap(),
1860 "cgroups render in name order; got:\n{formatted}",
1861 );
1862 }
1863
1864 /// A stripped carrier (raw sample vectors dropped to fit the size-limited
1865 /// bulk frame) renders "samples stripped (size limit)" — distinct from a
1866 /// not-measured carrier's "off-cpu n/a" — so an operator does not read a
1867 /// size-limit drop as a quiet cgroup. The surviving counters still render.
1868 #[test]
1869 fn format_renders_stripped_carrier_distinctly() {
1870 let mut per_cgroup = std::collections::BTreeMap::new();
1871 per_cgroup.insert(
1872 "cg_stripped".to_string(),
1873 crate::assert::PhaseCgroupStats {
1874 stripped: true,
1875 total_iterations: 500_000,
1876 total_migrations: 9,
1877 ..Default::default()
1878 },
1879 );
1880 per_cgroup.insert(
1881 "cg_quiet".to_string(),
1882 // Genuinely measured nothing (NOT stripped).
1883 crate::assert::PhaseCgroupStats {
1884 total_iterations: 1_000,
1885 ..Default::default()
1886 },
1887 );
1888 let buckets = vec![crate::assert::PhaseBucket {
1889 per_cgroup,
1890 step_index: 1,
1891 label: "Step[0]".to_string(),
1892 start_ms: 1000,
1893 end_ms: 6000,
1894 sample_count: 10,
1895 metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.5)]),
1896 }];
1897 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1898 let formatted = t.format_with_context(&TimelineContext::default());
1899 assert!(
1900 formatted.contains("cg_stripped: samples stripped (size limit)"),
1901 "stripped carrier shows the size-limit marker; got:\n{formatted}",
1902 );
1903 assert!(
1904 formatted.contains("iters=500000 migrations=9"),
1905 "stripped carrier still renders its surviving counters; got:\n{formatted}",
1906 );
1907 assert!(
1908 formatted.contains("cg_quiet: off-cpu n/a"),
1909 "a not-stripped, not-measured carrier stays n/a; got:\n{formatted}",
1910 );
1911 assert!(
1912 !formatted.contains("cg_quiet: samples stripped"),
1913 "a not-stripped carrier must NOT show the stripped marker; got:\n{formatted}",
1914 );
1915 }
1916
1917 /// Measured-zero off-CPU renders `0.0%` (a real zero), distinct from the
1918 /// `n/a` not-measured state — the kind-specific None-vs-Some(0.0) boundary.
1919 #[test]
1920 fn format_renders_measured_zero_off_cpu_as_zero_not_na() {
1921 let mut per_cgroup = std::collections::BTreeMap::new();
1922 per_cgroup.insert(
1923 "cg".to_string(),
1924 crate::assert::PhaseCgroupStats {
1925 off_cpu_pcts: vec![0.0, 0.0],
1926 ..Default::default()
1927 },
1928 );
1929 let buckets = vec![crate::assert::PhaseBucket {
1930 per_cgroup,
1931 step_index: 1,
1932 label: "Step[0]".to_string(),
1933 start_ms: 1000,
1934 end_ms: 2000,
1935 sample_count: 1,
1936 metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.0)]),
1937 }];
1938 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1939 let formatted = t.format_with_context(&TimelineContext::default());
1940 assert!(
1941 formatted.contains("off-cpu avg=0.0%"),
1942 "measured zero off-cpu is 0.0%, not n/a; got:\n{formatted}",
1943 );
1944 assert!(!formatted.contains("off-cpu n/a"), "got:\n{formatted}");
1945 }
1946
1947 /// Symmetric with the off-cpu measured-zero test for wake + run-delay: a
1948 /// NON-empty pool of zeros is a measured zero (Some) and renders `0µs`, NOT
1949 /// omitted (which only an EMPTY pool -> None does). Guards against a refactor
1950 /// that special-cased a zero reduction to None (collapsing measured-zero
1951 /// into not-measured), silently omitting a real zero-latency reading.
1952 #[test]
1953 fn format_renders_measured_zero_wake_and_run_delay_not_omitted() {
1954 let mut per_cgroup = std::collections::BTreeMap::new();
1955 per_cgroup.insert(
1956 "cg".to_string(),
1957 crate::assert::PhaseCgroupStats {
1958 off_cpu_pcts: vec![5.0],
1959 wake_latencies_ns: vec![0],
1960 run_delays_ns: vec![0],
1961 ..Default::default()
1962 },
1963 );
1964 let buckets = vec![crate::assert::PhaseBucket {
1965 per_cgroup,
1966 step_index: 1,
1967 label: "Step[0]".to_string(),
1968 start_ms: 1000,
1969 end_ms: 2000,
1970 sample_count: 1,
1971 metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.0)]),
1972 }];
1973 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1974 let formatted = t.format_with_context(&TimelineContext::default());
1975 assert!(
1976 formatted.contains("wake p99=0\u{00b5}s"),
1977 "measured-zero wake renders 0µs, not omitted; got:\n{formatted}",
1978 );
1979 assert!(
1980 formatted.contains("run-delay mean=0\u{00b5}s"),
1981 "measured-zero run-delay renders 0µs, not omitted; got:\n{formatted}",
1982 );
1983 }
1984
1985 /// The per-cgroup sub-block caps at MAX_RENDERED_CGROUPS (16) by name
1986 /// order, with a "+J more" note — bounding failure-message size.
1987 #[test]
1988 fn format_caps_per_cgroup_subblock() {
1989 let mut per_cgroup = std::collections::BTreeMap::new();
1990 for i in 0..20 {
1991 per_cgroup.insert(
1992 format!("cg{i:02}"),
1993 crate::assert::PhaseCgroupStats {
1994 off_cpu_pcts: vec![10.0],
1995 ..Default::default()
1996 },
1997 );
1998 }
1999 let buckets = vec![crate::assert::PhaseBucket {
2000 per_cgroup,
2001 step_index: 1,
2002 label: "Step[0]".to_string(),
2003 start_ms: 1000,
2004 end_ms: 2000,
2005 sample_count: 1,
2006 metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.0)]),
2007 }];
2008 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
2009 let formatted = t.format_with_context(&TimelineContext::default());
2010 assert!(
2011 formatted.contains("(+4 more cgroups)"),
2012 "20 cgroups capped at 16 -> +4 more; got:\n{formatted}",
2013 );
2014 assert!(
2015 formatted.contains("cg15:"),
2016 "first 16 rendered; got:\n{formatted}"
2017 );
2018 assert!(
2019 !formatted.contains("cg16:"),
2020 "cg16 is beyond the cap; got:\n{formatted}",
2021 );
2022 }
2023
2024 /// No per-cgroup sub-block when the phase carries no carriers (the
2025 /// monitor-only path, or a phase with empty per_cgroup).
2026 #[test]
2027 fn format_omits_per_cgroup_subblock_when_empty() {
2028 let buckets = vec![crate::assert::PhaseBucket {
2029 per_cgroup: Default::default(),
2030 step_index: 1,
2031 label: "Step[0]".to_string(),
2032 start_ms: 1000,
2033 end_ms: 2000,
2034 sample_count: 1,
2035 metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.0)]),
2036 }];
2037 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
2038 let formatted = t.format_with_context(&TimelineContext::default());
2039 assert!(
2040 !formatted.contains("per-cgroup:"),
2041 "no sub-block when carriers absent; got:\n{formatted}",
2042 );
2043 }
2044
2045 /// Cap off-by-one boundary: EXACTLY 16 cgroups render all 16 with NO
2046 /// "more cgroups" note (total > MAX is false); 17 render 16 + "(+1 more
2047 /// cgroups)". Pins the `>` vs `>=` / `take(N)` edge against a silent drop
2048 /// (one cgroup gone, no note) or a "(+0 more)" lie.
2049 #[test]
2050 fn format_per_cgroup_cap_boundary_16_and_17() {
2051 let mk = |n: usize| {
2052 let mut per_cgroup = std::collections::BTreeMap::new();
2053 for i in 0..n {
2054 per_cgroup.insert(
2055 format!("cg{i:02}"),
2056 crate::assert::PhaseCgroupStats {
2057 off_cpu_pcts: vec![10.0],
2058 ..Default::default()
2059 },
2060 );
2061 }
2062 let buckets = vec![crate::assert::PhaseBucket {
2063 per_cgroup,
2064 step_index: 1,
2065 label: "Step[0]".to_string(),
2066 start_ms: 1000,
2067 end_ms: 2000,
2068 sample_count: 1,
2069 metrics: std::collections::BTreeMap::from([(
2070 "avg_imbalance_ratio".to_string(),
2071 1.0,
2072 )]),
2073 }];
2074 Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default())
2075 .format_with_context(&TimelineContext::default())
2076 };
2077 let at16 = mk(16);
2078 assert!(at16.contains("cg15:"), "16th cgroup renders; got:\n{at16}");
2079 assert!(
2080 !at16.contains("more cgroups"),
2081 "exactly 16 has NO truncation note; got:\n{at16}",
2082 );
2083 let at17 = mk(17);
2084 assert!(at17.contains("cg15:"), "got:\n{at17}");
2085 assert!(
2086 !at17.contains("cg16:"),
2087 "17th is past the cap; got:\n{at17}"
2088 );
2089 assert!(
2090 at17.contains("(+1 more cgroup)") && !at17.contains("(+1 more cgroups)"),
2091 "17 cgroups -> exactly +1 more (singular 'cgroup'); got:\n{at17}",
2092 );
2093 }
2094
2095 /// A synthesized (sample_count==0) bucket carrying monitor-derived
2096 /// metrics renders the imbalance/dsq block, not just "[no samples]" —
2097 /// the render-layer half of the monitor-parity handling. format_phases
2098 /// gates that block on `has_monitor_metrics`, not `sample_count > 0`.
2099 #[test]
2100 fn format_renders_synthesized_step_monitor_metrics() {
2101 let buckets = vec![
2102 crate::assert::PhaseBucket {
2103 per_cgroup: Default::default(),
2104 step_index: 0,
2105 label: "BASELINE".to_string(),
2106 start_ms: 0,
2107 end_ms: 1000,
2108 sample_count: 2,
2109 metrics: std::collections::BTreeMap::new(),
2110 },
2111 crate::assert::PhaseBucket {
2112 per_cgroup: Default::default(),
2113 step_index: 1,
2114 label: "Step[0]".to_string(),
2115 start_ms: 1000,
2116 end_ms: 2000,
2117 sample_count: 0, // synthesized zero-capture step
2118 metrics: std::collections::BTreeMap::from([
2119 ("avg_imbalance_ratio".to_string(), 2.0),
2120 ("max_imbalance_ratio".to_string(), 3.0),
2121 ("avg_dsq_depth".to_string(), 5.0),
2122 ("max_dsq_depth".to_string(), 7.0),
2123 ]),
2124 },
2125 ];
2126 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
2127 let formatted = t.format_with_context(&TimelineContext::default());
2128 assert!(
2129 formatted.contains("imbalance: avg=2.0 max=3.0"),
2130 "synthesized bucket's folded imbalance must render, not \
2131 '[no samples]'; got:\n{formatted}",
2132 );
2133 assert!(
2134 formatted.contains("dsq: avg=5 max=7"),
2135 "synthesized bucket's folded dsq must render; got:\n{formatted}",
2136 );
2137 }
2138
2139 /// A from_phase_buckets render whose first bucket is a Step (no
2140 /// BASELINE bucket — e.g. under --cell-parent-cgroup where BASELINE
2141 /// captured nothing) must NOT mislabel that Step as "BASELINE".
2142 /// phase.index is the bucket's step_index, so format_phases' index==0
2143 /// BASELINE check fires only for a real BASELINE bucket.
2144 #[test]
2145 fn format_no_baseline_bucket_does_not_mislabel_first_step() {
2146 let buckets = vec![
2147 crate::assert::PhaseBucket {
2148 per_cgroup: Default::default(),
2149 step_index: 1, // scenario Step 0; NO BASELINE bucket present
2150 label: "Step[0]".to_string(),
2151 start_ms: 1000,
2152 end_ms: 2000,
2153 sample_count: 3,
2154 metrics: std::collections::BTreeMap::from([(
2155 "avg_imbalance_ratio".to_string(),
2156 1.0,
2157 )]),
2158 },
2159 crate::assert::PhaseBucket {
2160 per_cgroup: Default::default(),
2161 step_index: 2,
2162 label: "Step[1]".to_string(),
2163 start_ms: 2000,
2164 end_ms: 3000,
2165 sample_count: 3,
2166 metrics: std::collections::BTreeMap::from([(
2167 "avg_imbalance_ratio".to_string(),
2168 1.0,
2169 )]),
2170 },
2171 ];
2172 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
2173 let formatted = t.format_with_context(&TimelineContext::default());
2174 assert!(
2175 !formatted.contains("BASELINE"),
2176 "no BASELINE bucket -> no BASELINE label; the first Step must not \
2177 be mislabeled as BASELINE; got:\n{formatted}",
2178 );
2179 assert!(
2180 formatted.contains("Phase 1"),
2181 "the first Step renders as 'Phase 1' (its step_index), not \
2182 'Phase 0'/BASELINE; got:\n{formatted}",
2183 );
2184 }
2185
2186 /// Sparse / non-contiguous step_index renders the TRUE step number:
2187 /// BASELINE(0) + a Step at step_index 3 renders "Phase 3", not the
2188 /// enumerate-position "Phase 1". Pins index == step_index for a
2189 /// non-zero, non-contiguous label — the case that distinguishes
2190 /// step_index from the old enumerate index (a revert to enumerate
2191 /// would pass the contiguous tests but fail this one).
2192 #[test]
2193 fn format_sparse_step_index_renders_true_step_number() {
2194 let buckets = vec![
2195 crate::assert::PhaseBucket {
2196 per_cgroup: Default::default(),
2197 step_index: 0,
2198 label: "BASELINE".to_string(),
2199 start_ms: 0,
2200 end_ms: 100,
2201 sample_count: 2,
2202 metrics: std::collections::BTreeMap::new(),
2203 },
2204 crate::assert::PhaseBucket {
2205 per_cgroup: Default::default(),
2206 step_index: 3, // sparse: Steps 0/1 absent, only step_index 3
2207 label: "Step[2]".to_string(),
2208 start_ms: 200,
2209 end_ms: 300,
2210 sample_count: 2,
2211 metrics: std::collections::BTreeMap::from([(
2212 "avg_imbalance_ratio".to_string(),
2213 1.0,
2214 )]),
2215 },
2216 ];
2217 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
2218 let formatted = t.format_with_context(&TimelineContext::default());
2219 assert!(
2220 formatted.contains("Phase 3"),
2221 "a sparse Step at step_index 3 must render 'Phase 3' (its \
2222 step_index), not the enumerate-position 'Phase 1'; got:\n{formatted}",
2223 );
2224 assert!(
2225 !formatted.contains("Phase 1"),
2226 "the enumerate-position 'Phase 1' must NOT appear for a \
2227 step_index-3 bucket; got:\n{formatted}",
2228 );
2229 }
2230
2231 #[test]
2232 fn unsorted_events_sorted() {
2233 let events = vec![stimulus(3000, "StepStart[0]"), stimulus(0, "ScenarioStart")];
2234 let samples: Vec<MonitorSample> = (5..35)
2235 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
2236 .collect();
2237 let t = Timeline::build(&events, &samples, 0);
2238 assert_eq!(t.phases.len(), 2);
2239 // First phase should be from ScenarioStart (earliest).
2240 assert!(t.phases[0].stimulus.is_none());
2241 }
2242
2243 #[test]
2244 fn stall_detected_in_phase() {
2245 let events = vec![stimulus(0, "ScenarioStart")];
2246 let samples = vec![
2247 sample(600, vec![(1, 0, 5000), (1, 0, 6000)]),
2248 sample(700, vec![(1, 0, 5000), (1, 0, 7000)]), // cpu0 stalled
2249 ];
2250 let t = Timeline::build(&events, &samples, 0);
2251 assert_eq!(t.phases[0].metrics.stall_count, 1);
2252 }
2253
2254 #[test]
2255 fn compute_metrics_stall_count_accumulates_across_windows() {
2256 // cpu0 frozen across TWO consecutive windows -> stall_count == 2.
2257 // Pins that the per-phase path counts every stuck (CPU, window),
2258 // mirroring the run-level accumulation (both breaks removed).
2259 let s1 = sample(100, vec![(1, 0, 5000), (1, 0, 6000)]);
2260 let s2 = sample(200, vec![(1, 0, 5000), (1, 0, 6100)]);
2261 let s3 = sample(300, vec![(1, 0, 5000), (1, 0, 6200)]);
2262 let refs: Vec<&MonitorSample> = vec![&s1, &s2, &s3];
2263 let m = compute_metrics(&refs, 0);
2264 assert_eq!(m.stall_count, 2);
2265 }
2266
2267 #[test]
2268 fn run_level_stuck_count_ge_sum_of_per_phase() {
2269 // Same is_cpu_stuck predicate, different windowing domains: the
2270 // run-level path windows(2) over the FULL stream and counts the
2271 // pair straddling a phase split; partitioning the stream into
2272 // per-phase subsets drops that boundary pair. So run-level >= Σ
2273 // per-phase (strict here) — pins the documented inequality.
2274 let s1 = sample(100, vec![(1, 0, 5000), (1, 0, 6000)]);
2275 let s2 = sample(200, vec![(1, 0, 5000), (1, 0, 6100)]);
2276 let s3 = sample(300, vec![(1, 0, 5000), (1, 0, 6200)]);
2277 let samples = vec![s1, s2, s3];
2278 // Run-level: windows(2) over all 3 -> cpu0 frozen in both windows.
2279 let run_level = crate::monitor::MonitorSummary::from_samples(&samples).stuck_count;
2280 assert_eq!(run_level, 2);
2281 // Partition after s2 (phase A = [s1,s2], phase B = [s3]): the
2282 // (s2,s3) boundary pair is in neither phase's windows(2).
2283 let phase_a: Vec<&MonitorSample> = vec![&samples[0], &samples[1]];
2284 let phase_b: Vec<&MonitorSample> = vec![&samples[2]];
2285 let sum_per_phase =
2286 compute_metrics(&phase_a, 0).stall_count + compute_metrics(&phase_b, 0).stall_count;
2287 assert_eq!(
2288 sum_per_phase, 1,
2289 "the (s2,s3) boundary pair is in neither phase"
2290 );
2291 assert!(
2292 run_level > sum_per_phase,
2293 "run-level counts the boundary-straddling pair the per-phase partition drops"
2294 );
2295 }
2296
2297 #[test]
2298 fn compute_metrics_empty() {
2299 let m = compute_metrics(&[], 0);
2300 assert_eq!(m.sample_count, 0);
2301 // No samples -> no measurement, not a false 0.0 (the sentinel fix).
2302 assert_eq!(m.avg_imbalance, None);
2303 assert_eq!(m.max_imbalance, None);
2304 assert_eq!(m.avg_dsq_depth, None);
2305 assert_eq!(m.avg_nr_running, None);
2306 assert_eq!(m.max_dsq_depth, 0);
2307 }
2308
2309 #[test]
2310 fn stimulus_event_with_detail() {
2311 let e = StimulusEvent {
2312 elapsed_ms: 100,
2313 label: "StepStart[0]".to_string(),
2314 op_kind: Some("SetCpuset".to_string()),
2315 detail: Some("4 cpus".to_string()),
2316 total_iterations: None,
2317 step_index: None,
2318 is_terminal: false,
2319 is_step_end: false,
2320 };
2321 let events = vec![stimulus(0, "ScenarioStart"), e];
2322 let samples: Vec<MonitorSample> = (5..25)
2323 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
2324 .collect();
2325 let t = Timeline::build(&events, &samples, 0);
2326 let formatted = t.format_with_context(&TimelineContext::default());
2327 assert!(formatted.contains("SetCpuset"));
2328 assert!(formatted.contains("4 cpus"));
2329 }
2330
2331 #[test]
2332 fn many_phases() {
2333 let events: Vec<StimulusEvent> = (0..10)
2334 .map(|i| stimulus(i * 500, &format!("Step[{i}]")))
2335 .collect();
2336 let samples: Vec<MonitorSample> = (5..55)
2337 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
2338 .collect();
2339 let t = Timeline::build(&events, &samples, 0);
2340 assert_eq!(t.phases.len(), 10);
2341 }
2342
2343 #[test]
2344 fn phase_metrics_accuracy() {
2345 let s1 = sample(600, vec![(1, 3, 100), (4, 5, 200)]); // ratio=4, avg_dsq=4
2346 let s2 = sample(700, vec![(2, 1, 300), (2, 7, 400)]); // ratio=1, avg_dsq=4
2347 let refs: Vec<&MonitorSample> = vec![&s1, &s2];
2348 let m = compute_metrics(&refs, 0);
2349 assert_eq!(m.sample_count, 2);
2350 assert!((m.avg_imbalance.unwrap() - 2.5).abs() < 0.01); // (4+1)/2
2351 // nr_running per CPU: s1=(1,4)->per-sample mean 2.5, s2=(2,2)->mean 2.0;
2352 // averaged over the 2 samples = (2.5+2.0)/2 = 2.25.
2353 assert!((m.avg_nr_running.unwrap() - 2.25).abs() < 0.01);
2354 assert!((m.max_imbalance.unwrap() - 4.0).abs() < 0.01);
2355 assert_eq!(m.max_dsq_depth, 7);
2356 }
2357
2358 // -- ChangeDirection Display tests --
2359
2360 #[test]
2361 fn change_direction_display() {
2362 assert_eq!(format!("{}", ChangeDirection::Improved), "IMPROVEMENT");
2363 assert_eq!(format!("{}", ChangeDirection::Degraded), "DEGRADATION");
2364 }
2365
2366 // -- compute_metrics with event counters --
2367
2368 #[test]
2369 fn compute_metrics_with_event_counters() {
2370 use crate::monitor::ScxEventCounters;
2371
2372 let s1 = MonitorSample {
2373 bpf_map_fields: Vec::new(),
2374 prog_stats: None,
2375 psi_irq: None,
2376 elapsed_ms: 600,
2377 cpus: vec![CpuSnapshot {
2378 nr_running: 2,
2379 local_dsq_depth: 1,
2380 rq_clock: 100,
2381 scx_nr_running: 0,
2382 scx_flags: 0,
2383 event_counters: Some(ScxEventCounters {
2384 select_cpu_fallback: 10,
2385 dispatch_keep_last: 5,
2386 ..Default::default()
2387 }),
2388 schedstat: None,
2389 vcpu_cpu_time_ns: None,
2390 vcpu_perf: None,
2391 avg_irq_util: None,
2392 sched_domains: None,
2393 }],
2394 };
2395 let s2 = MonitorSample {
2396 bpf_map_fields: Vec::new(),
2397 prog_stats: None,
2398 psi_irq: None,
2399 elapsed_ms: 1600,
2400 cpus: vec![CpuSnapshot {
2401 nr_running: 2,
2402 local_dsq_depth: 1,
2403 rq_clock: 200,
2404 scx_nr_running: 0,
2405 scx_flags: 0,
2406 event_counters: Some(ScxEventCounters {
2407 select_cpu_fallback: 110,
2408 dispatch_keep_last: 55,
2409 ..Default::default()
2410 }),
2411 schedstat: None,
2412 vcpu_cpu_time_ns: None,
2413 vcpu_perf: None,
2414 avg_irq_util: None,
2415 sched_domains: None,
2416 }],
2417 };
2418 let refs: Vec<&MonitorSample> = vec![&s1, &s2];
2419 let m = compute_metrics(&refs, 0);
2420 // fallback delta: 110 - 10 = 100 over 1.0s = 100.0/s
2421 assert!((m.fallback_rate.unwrap() - 100.0).abs() < 0.01);
2422 // keep_last delta: 55 - 5 = 50 over 1.0s = 50.0/s
2423 assert!((m.keep_last_rate.unwrap() - 50.0).abs() < 0.01);
2424 }
2425
2426 #[test]
2427 fn compute_metrics_no_event_counters() {
2428 let s1 = sample(600, vec![(2, 1, 100)]);
2429 let s2 = sample(700, vec![(2, 1, 200)]);
2430 let refs: Vec<&MonitorSample> = vec![&s1, &s2];
2431 let m = compute_metrics(&refs, 0);
2432 assert!(m.fallback_rate.is_none());
2433 assert!(m.keep_last_rate.is_none());
2434 }
2435
2436 #[test]
2437 fn compute_metrics_counter_reset_clamps_rates_to_non_negative() {
2438 // A scheduler restart between samples resets event counters
2439 // to smaller (or zero) values. Raw `last - first` then
2440 // produces a negative delta, which would flow into
2441 // `fallback_rate = delta / duration` and report a negative
2442 // rate. The shared counter_delta helper clamps to 0.
2443 use crate::monitor::ScxEventCounters;
2444
2445 let s1 = MonitorSample {
2446 bpf_map_fields: Vec::new(),
2447 prog_stats: None,
2448 psi_irq: None,
2449 elapsed_ms: 0,
2450 cpus: vec![CpuSnapshot {
2451 nr_running: 2,
2452 local_dsq_depth: 1,
2453 rq_clock: 100,
2454 scx_nr_running: 0,
2455 scx_flags: 0,
2456 event_counters: Some(ScxEventCounters {
2457 select_cpu_fallback: 1000,
2458 dispatch_keep_last: 500,
2459 ..Default::default()
2460 }),
2461 schedstat: None,
2462 vcpu_cpu_time_ns: None,
2463 vcpu_perf: None,
2464 avg_irq_util: None,
2465 sched_domains: None,
2466 }],
2467 };
2468 let s2 = MonitorSample {
2469 bpf_map_fields: Vec::new(),
2470 prog_stats: None,
2471 psi_irq: None,
2472 elapsed_ms: 1000,
2473 cpus: vec![CpuSnapshot {
2474 nr_running: 2,
2475 local_dsq_depth: 1,
2476 rq_clock: 200,
2477 scx_nr_running: 0,
2478 scx_flags: 0,
2479 event_counters: Some(ScxEventCounters {
2480 select_cpu_fallback: 5,
2481 dispatch_keep_last: 2,
2482 ..Default::default()
2483 }),
2484 schedstat: None,
2485 vcpu_cpu_time_ns: None,
2486 vcpu_perf: None,
2487 avg_irq_util: None,
2488 sched_domains: None,
2489 }],
2490 };
2491 let refs: Vec<&MonitorSample> = vec![&s1, &s2];
2492 let m = compute_metrics(&refs, 0);
2493 let fb = m.fallback_rate.expect("reset still produces Some rate");
2494 let kl = m.keep_last_rate.expect("reset still produces Some rate");
2495 assert!(
2496 fb >= 0.0,
2497 "reset must not produce negative fallback_rate, got {fb}"
2498 );
2499 assert!(
2500 kl >= 0.0,
2501 "reset must not produce negative keep_last_rate, got {kl}"
2502 );
2503 }
2504
2505 // -- format with stalls --
2506
2507 #[test]
2508 fn format_with_stalls_shown() {
2509 let events = vec![stimulus(0, "ScenarioStart")];
2510 let samples = vec![
2511 sample(600, vec![(1, 0, 5000), (1, 0, 6000)]),
2512 sample(700, vec![(1, 0, 5000), (1, 0, 7000)]), // cpu0 stalled
2513 ];
2514 let t = Timeline::build(&events, &samples, 0);
2515 let formatted = t.format_with_context(&TimelineContext::default());
2516 assert!(formatted.contains("stalls: 1"));
2517 }
2518
2519 // -- format with no samples in a phase --
2520
2521 #[test]
2522 fn format_phase_no_samples() {
2523 // Create a phase with no samples by making a phase boundary far
2524 // beyond the last monitor sample's time.
2525 let events = vec![
2526 stimulus(0, "ScenarioStart"),
2527 stimulus(100, "StepStart[0]"),
2528 stimulus(50000, "StepStart[1]"),
2529 ];
2530 // All samples are in the middle phase window.
2531 let samples: Vec<MonitorSample> = (5..15)
2532 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
2533 .collect();
2534 let t = Timeline::build(&events, &samples, 0);
2535 let formatted = t.format_with_context(&TimelineContext::default());
2536 // The last phase (50000+offset to end) should have no samples.
2537 assert!(formatted.contains("[no samples]"));
2538 }
2539
2540 // -- timeline with fallback rate change detection --
2541
2542 #[test]
2543 fn fallback_rate_degradation_detected() {
2544 use crate::monitor::ScxEventCounters;
2545
2546 let events = vec![stimulus(0, "ScenarioStart"), stimulus(1000, "StepStart[0]")];
2547 let mut samples = Vec::new();
2548 // Phase 0: zero fallback rate (counter stays constant).
2549 for i in 5..15 {
2550 samples.push(MonitorSample {
2551 bpf_map_fields: Vec::new(),
2552 prog_stats: None,
2553 psi_irq: None,
2554 elapsed_ms: i * 100,
2555 cpus: vec![CpuSnapshot {
2556 nr_running: 2,
2557 local_dsq_depth: 1,
2558 rq_clock: i * 1000,
2559 scx_nr_running: 0,
2560 scx_flags: 0,
2561 event_counters: Some(ScxEventCounters {
2562 select_cpu_fallback: 0,
2563 dispatch_keep_last: 0,
2564 ..Default::default()
2565 }),
2566 schedstat: None,
2567 vcpu_cpu_time_ns: None,
2568 vcpu_perf: None,
2569 avg_irq_util: None,
2570 sched_domains: None,
2571 }],
2572 });
2573 }
2574 // Phase 1: very high fallback rate.
2575 // 10 samples over 1s. Counter goes from 0 to 500.
2576 // Rate = 500/1.0 = 500/s, well above threshold 10.0.
2577 for i in 15..25 {
2578 samples.push(MonitorSample {
2579 bpf_map_fields: Vec::new(),
2580 prog_stats: None,
2581 psi_irq: None,
2582 elapsed_ms: i * 100,
2583 cpus: vec![CpuSnapshot {
2584 nr_running: 2,
2585 local_dsq_depth: 1,
2586 rq_clock: i * 1000,
2587 scx_nr_running: 0,
2588 scx_flags: 0,
2589 event_counters: Some(ScxEventCounters {
2590 select_cpu_fallback: (i as i64 - 15) * 50,
2591 dispatch_keep_last: 0,
2592 ..Default::default()
2593 }),
2594 schedstat: None,
2595 vcpu_cpu_time_ns: None,
2596 vcpu_perf: None,
2597 avg_irq_util: None,
2598 sched_domains: None,
2599 }],
2600 });
2601 }
2602 let t = Timeline::build(&events, &samples, 0);
2603 let degs: Vec<_> = t
2604 .degradations()
2605 .into_iter()
2606 .filter(|(_, c)| c.metric == "fallback")
2607 .collect();
2608 assert!(!degs.is_empty());
2609 }
2610
2611 // -- format_with_context tests --
2612
2613 #[test]
2614 fn format_with_context_includes_header() {
2615 let events = vec![stimulus(0, "ScenarioStart")];
2616 let samples = vec![
2617 sample(600, vec![(2, 1, 100), (2, 1, 200)]),
2618 sample(700, vec![(2, 1, 300), (2, 1, 400)]),
2619 ];
2620 let t = Timeline::build(&events, &samples, 0);
2621 let ctx = TimelineContext {
2622 kernel: Some("6.14.0-rc3+".to_string()),
2623 topology: Some("2n4l4c2t (16 cpus)".to_string()),
2624 scheduler: Some("scx_mitosis".to_string()),
2625 scenario: Some("proportional".to_string()),
2626 duration_s: Some(20.5),
2627 };
2628 let formatted = t.format_with_context(&ctx);
2629 assert!(formatted.contains("--- timeline ---"));
2630 assert!(formatted.contains("kernel: 6.14.0-rc3+"));
2631 assert!(formatted.contains("topology: 2n4l4c2t (16 cpus)"));
2632 assert!(formatted.contains("scheduler: scx_mitosis"));
2633 assert!(formatted.contains("scenario: proportional"));
2634 assert!(formatted.contains("duration: 20.5s"));
2635 assert!(formatted.contains("BASELINE"));
2636 }
2637
2638 #[test]
2639 fn format_with_context_partial_fields() {
2640 let events = vec![stimulus(0, "ScenarioStart")];
2641 let samples = vec![sample(600, vec![(2, 1, 100)])];
2642 let t = Timeline::build(&events, &samples, 0);
2643 let ctx = TimelineContext {
2644 kernel: None,
2645 topology: Some("1n1l1c1t (1 cpus)".to_string()),
2646 scheduler: None,
2647 scenario: Some("basic".to_string()),
2648 duration_s: None,
2649 };
2650 let formatted = t.format_with_context(&ctx);
2651 assert!(formatted.contains("topology: 1n1l1c1t"));
2652 assert!(formatted.contains("scenario: basic"));
2653 assert!(!formatted.contains("kernel:"));
2654 assert!(!formatted.contains("scheduler:"));
2655 assert!(!formatted.contains("duration:"));
2656 }
2657
2658 #[test]
2659 fn format_with_context_empty_timeline() {
2660 let t = Timeline { phases: vec![] };
2661 let ctx = TimelineContext {
2662 kernel: Some("6.14.0".to_string()),
2663 ..Default::default()
2664 };
2665 assert!(t.format_with_context(&ctx).is_empty());
2666 }
2667
2668 #[test]
2669 fn format_with_context_empty_context() {
2670 let events = vec![stimulus(0, "ScenarioStart")];
2671 let samples = vec![sample(600, vec![(2, 1, 100)])];
2672 let t = Timeline::build(&events, &samples, 0);
2673 let ctx = TimelineContext::default();
2674 let formatted = t.format_with_context(&ctx);
2675 // Should have the timeline header and phases but no context line.
2676 assert!(formatted.contains("--- timeline ---"));
2677 assert!(formatted.contains("BASELINE"));
2678 // The line after "--- timeline ---\n" should be "\nBASELINE" (no context line).
2679 let after_header = &formatted["--- timeline ---\n".len()..];
2680 assert!(after_header.starts_with('\n'));
2681 }
2682
2683 #[test]
2684 fn garbage_dsq_samples_filtered_from_metrics() {
2685 // Samples with DSQ depth above DSQ_PLAUSIBILITY_CEILING should be
2686 // excluded from phase metrics (the bug: garbage values like 1.5B
2687 // were flowing into timeline output).
2688 let events = vec![stimulus(0, "ScenarioStart")];
2689 let garbage_dsq = 1_550_435_906u32;
2690 let samples = vec![
2691 // Garbage sample (DSQ above ceiling).
2692 MonitorSample {
2693 bpf_map_fields: Vec::new(),
2694 prog_stats: None,
2695 psi_irq: None,
2696 elapsed_ms: 600,
2697 cpus: vec![CpuSnapshot {
2698 nr_running: 1,
2699 local_dsq_depth: garbage_dsq,
2700 rq_clock: 1000,
2701 ..Default::default()
2702 }],
2703 },
2704 // Valid sample.
2705 sample(700, vec![(2, 3, 2000)]),
2706 ];
2707 let t = Timeline::build(&events, &samples, 0);
2708 assert_eq!(t.phases.len(), 1);
2709 // Only the valid sample should be counted.
2710 assert_eq!(t.phases[0].metrics.sample_count, 1);
2711 assert_eq!(t.phases[0].metrics.max_dsq_depth, 3);
2712 }
2713
2714 #[test]
2715 fn all_garbage_samples_yield_no_metrics() {
2716 let events = vec![stimulus(0, "ScenarioStart")];
2717 let samples = vec![MonitorSample {
2718 bpf_map_fields: Vec::new(),
2719 prog_stats: None,
2720 psi_irq: None,
2721 elapsed_ms: 600,
2722 cpus: vec![CpuSnapshot {
2723 nr_running: 1,
2724 local_dsq_depth: 50_000,
2725 rq_clock: 1000,
2726 ..Default::default()
2727 }],
2728 }];
2729 let t = Timeline::build(&events, &samples, 0);
2730 assert_eq!(t.phases[0].metrics.sample_count, 0);
2731 }
2732
2733 // ---------------------------------------------------------------
2734 // Negative test: timeline detects degradation at phase transition
2735 // ---------------------------------------------------------------
2736
2737 #[test]
2738 fn neg_timeline_detects_imbalance_degradation() {
2739 let events = vec![stimulus(0, "ScenarioStart"), stimulus(2000, "StepStart[0]")];
2740 let mut samples = Vec::new();
2741 for i in 6..25 {
2742 samples.push(sample(
2743 i * 100,
2744 vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
2745 ));
2746 }
2747 for i in 26..45 {
2748 samples.push(sample(
2749 i * 100,
2750 vec![(1, 1, i * 1000), (10, 1, i * 1000 + 100)],
2751 ));
2752 }
2753 let t = Timeline::build(&events, &samples, 0);
2754 assert_eq!(t.phases.len(), 2, "must have 2 phases");
2755 assert!(!t.degradations().is_empty());
2756
2757 // Phase 0 (baseline) must have samples and reasonable metrics.
2758 assert!(
2759 t.phases[0].metrics.sample_count > 0,
2760 "baseline must have samples"
2761 );
2762 assert!(
2763 (t.phases[0].metrics.avg_imbalance.unwrap() - 1.0).abs() < 0.5,
2764 "baseline imbalance should be ~1.0, got {:?}",
2765 t.phases[0].metrics.avg_imbalance,
2766 );
2767
2768 // Phase 1 must have the stimulus label and degradation.
2769 assert!(
2770 t.phases[1].metrics.sample_count > 0,
2771 "phase 1 must have samples"
2772 );
2773 assert!(
2774 t.phases[1]
2775 .stimulus
2776 .as_ref()
2777 .is_some_and(|s| s.label == "StepStart[0]"),
2778 "phase 1 stimulus must be StepStart[0]",
2779 );
2780
2781 let degs = t.degradations();
2782 assert!(!degs.is_empty());
2783 let (phase, change) = °s[0];
2784 assert_eq!(phase.index, 1);
2785 assert_eq!(change.metric, "imbalance");
2786 assert_eq!(change.direction, ChangeDirection::Degraded);
2787 let delta = change.after - change.before;
2788 assert!(delta > 0.0, "delta must be positive for degradation");
2789 assert!(
2790 delta > IMBALANCE_THRESHOLD,
2791 "delta {:.1} must exceed threshold {:.1}",
2792 delta,
2793 IMBALANCE_THRESHOLD
2794 );
2795 assert!(
2796 change.before < 2.0,
2797 "before should be low: {:.1}",
2798 change.before
2799 );
2800 assert!(
2801 change.after > 5.0,
2802 "after should be high: {:.1}",
2803 change.after
2804 );
2805
2806 // Format output must be parseable.
2807 let formatted = t.format_with_context(&TimelineContext::default());
2808 assert!(
2809 formatted.contains("BASELINE"),
2810 "format must include BASELINE phase"
2811 );
2812 assert!(formatted.contains("Phase 1"), "format must include Phase 1");
2813 assert!(
2814 formatted.contains("DEGRADATION"),
2815 "format must include DEGRADATION label"
2816 );
2817 assert!(
2818 formatted.contains("imbalance"),
2819 "format must name the metric"
2820 );
2821 }
2822
2823 #[test]
2824 fn neg_timeline_detects_dsq_depth_degradation() {
2825 let events = vec![stimulus(0, "ScenarioStart"), stimulus(2000, "StepStart[0]")];
2826 let mut samples = Vec::new();
2827 for i in 6..25 {
2828 samples.push(sample(
2829 i * 100,
2830 vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
2831 ));
2832 }
2833 for i in 26..45 {
2834 samples.push(sample(
2835 i * 100,
2836 vec![(2, 20, i * 1000), (2, 20, i * 1000 + 100)],
2837 ));
2838 }
2839 let t = Timeline::build(&events, &samples, 0);
2840 assert!(
2841 !t.degradations().is_empty(),
2842 "DSQ depth jump must be detected"
2843 );
2844 let degs = t.degradations();
2845 let dsq_deg = degs.iter().find(|(_, c)| c.metric == "dsq_depth");
2846 assert!(dsq_deg.is_some(), "must detect dsq_depth degradation");
2847 let (phase, change) = dsq_deg.unwrap();
2848 assert_eq!(phase.index, 1);
2849 assert_eq!(change.direction, ChangeDirection::Degraded);
2850 let delta = change.after - change.before;
2851 assert!(
2852 delta > DSQ_THRESHOLD,
2853 "dsq delta {:.1} must exceed threshold {:.1}",
2854 delta,
2855 DSQ_THRESHOLD
2856 );
2857 assert!(
2858 change.before < 5.0,
2859 "before dsq should be low: {:.1}",
2860 change.before
2861 );
2862 assert!(
2863 change.after > 15.0,
2864 "after dsq should be high: {:.1}",
2865 change.after
2866 );
2867
2868 let formatted = t.format_with_context(&TimelineContext::default());
2869 assert!(
2870 formatted.contains("dsq_depth"),
2871 "format must name dsq_depth"
2872 );
2873 assert!(
2874 formatted.contains("DEGRADATION"),
2875 "format must label degradation"
2876 );
2877 }
2878
2879 #[test]
2880 fn neg_timeline_no_degradation_when_stable() {
2881 let events = vec![stimulus(0, "ScenarioStart"), stimulus(2000, "StepStart[0]")];
2882 let mut samples = Vec::new();
2883 for i in 6..45 {
2884 samples.push(sample(
2885 i * 100,
2886 vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
2887 ));
2888 }
2889 let t = Timeline::build(&events, &samples, 0);
2890 assert_eq!(t.phases.len(), 2, "must have 2 phases");
2891 assert!(t.phases[0].metrics.sample_count > 0);
2892 assert!(t.phases[1].metrics.sample_count > 0);
2893 assert!(
2894 t.degradations().is_empty(),
2895 "stable phases must not show degradation"
2896 );
2897 assert!(t.degradations().is_empty());
2898 // All phase changes should be empty.
2899 for phase in &t.phases {
2900 assert!(
2901 phase.changes.is_empty(),
2902 "phase {} should have no changes",
2903 phase.index
2904 );
2905 }
2906 }
2907
2908 // -- detect_change direct tests --
2909
2910 #[test]
2911 fn detect_change_higher_is_worse_positive_delta_degraded() {
2912 let c = detect_change(1.0, 5.0, 0.5, "imbalance", true).unwrap();
2913 assert_eq!(c.direction, ChangeDirection::Degraded);
2914 assert_eq!(c.metric, "imbalance");
2915 assert!((c.before - 1.0).abs() < f64::EPSILON);
2916 assert!((c.after - 5.0).abs() < f64::EPSILON);
2917 }
2918
2919 #[test]
2920 fn detect_change_higher_is_worse_negative_delta_improved() {
2921 let c = detect_change(5.0, 1.0, 0.5, "imbalance", true).unwrap();
2922 assert_eq!(c.direction, ChangeDirection::Improved);
2923 }
2924
2925 #[test]
2926 fn detect_change_lower_is_worse_negative_delta_degraded() {
2927 let c = detect_change(100.0, 50.0, 10.0, "throughput", false).unwrap();
2928 assert_eq!(c.direction, ChangeDirection::Degraded);
2929 }
2930
2931 #[test]
2932 fn detect_change_lower_is_worse_positive_delta_improved() {
2933 let c = detect_change(50.0, 100.0, 10.0, "throughput", false).unwrap();
2934 assert_eq!(c.direction, ChangeDirection::Improved);
2935 }
2936
2937 #[test]
2938 fn detect_change_below_threshold_returns_none() {
2939 assert!(detect_change(1.0, 1.3, 0.5, "imbalance", true).is_none());
2940 }
2941
2942 #[test]
2943 fn detect_change_exactly_at_threshold_returns_none() {
2944 assert!(detect_change(1.0, 1.5, 0.5, "imbalance", true).is_none());
2945 }
2946
2947 // -- detect_boundary_changes: throughput across synthesized steps --
2948
2949 /// Throughput is flagged across a SYNTHESIZED zero-capture phase
2950 /// (sample_count 0) — its iteration_rate is stimulus-derived and
2951 /// real — while monitor-derived metrics stay gated on samples, so
2952 /// the synthesized side never paints a phantom imbalance change.
2953 /// This is the collapse-suppression invariant: a throughput collapse entering or
2954 /// leaving a capture-free step must not be silently dropped.
2955 #[test]
2956 fn detect_boundary_changes_flags_throughput_across_synthesized_phase() {
2957 let before = PhaseMetrics {
2958 sample_count: 30,
2959 iteration_rate: Some(1000.0),
2960 avg_imbalance: Some(1.0),
2961 ..Default::default()
2962 };
2963 let after = PhaseMetrics {
2964 sample_count: 0, // synthesized zero-capture step
2965 iteration_rate: Some(300.0), // 70% collapse
2966 avg_imbalance: Some(99.0), // partial/default — must be ignored
2967 ..Default::default()
2968 };
2969 let changes = detect_boundary_changes(&before, &after);
2970 let throughput: Vec<_> = changes
2971 .iter()
2972 .filter(|c| c.metric == "throughput")
2973 .collect();
2974 assert_eq!(
2975 throughput.len(),
2976 1,
2977 "throughput collapse across a synthesized step must be flagged: {changes:?}",
2978 );
2979 assert_eq!(throughput[0].direction, ChangeDirection::Degraded);
2980 assert!(
2981 !changes.iter().any(|c| c.metric == "imbalance"),
2982 "monitor metrics must stay gated when a side has 0 samples: {changes:?}",
2983 );
2984 }
2985
2986 /// The monitor-metric gate only suppresses a ZERO-sample side: when
2987 /// both phases captured samples, monitor-derived changes still
2988 /// surface. Guards the collapse suppression from over-suppressing the normal
2989 /// captured-to-captured boundary.
2990 #[test]
2991 fn detect_boundary_changes_reports_monitor_metrics_when_both_sampled() {
2992 let before = PhaseMetrics {
2993 sample_count: 30,
2994 iteration_rate: Some(1000.0),
2995 avg_imbalance: Some(1.0),
2996 ..Default::default()
2997 };
2998 let after = PhaseMetrics {
2999 sample_count: 30,
3000 iteration_rate: Some(1000.0), // unchanged throughput
3001 avg_imbalance: Some(5.0), // imbalance jump > IMBALANCE_THRESHOLD
3002 ..Default::default()
3003 };
3004 let changes = detect_boundary_changes(&before, &after);
3005 assert!(
3006 changes.iter().any(|c| c.metric == "imbalance"),
3007 "imbalance change must surface when both sides sampled: {changes:?}",
3008 );
3009 assert!(
3010 !changes.iter().any(|c| c.metric == "throughput"),
3011 "unchanged throughput must not be flagged: {changes:?}",
3012 );
3013 }
3014
3015 /// A per-phase avg_nr_running (mean runqueue depth) rise above
3016 /// NR_RUNNING_THRESHOLD between two sampled phases is flagged as a
3017 /// degradation (higher runqueue depth = more contention). Gated on both
3018 /// phases having real samples, like the other monitor-derived gauges.
3019 #[test]
3020 fn detect_boundary_changes_flags_nr_running_jump() {
3021 let before = PhaseMetrics {
3022 sample_count: 30,
3023 avg_nr_running: Some(1.0),
3024 ..Default::default()
3025 };
3026 let after = PhaseMetrics {
3027 sample_count: 30,
3028 avg_nr_running: Some(3.0), // +2.0 > NR_RUNNING_THRESHOLD (1.0)
3029 ..Default::default()
3030 };
3031 let changes = detect_boundary_changes(&before, &after);
3032 let nr: Vec<_> = changes
3033 .iter()
3034 .filter(|c| c.metric == "nr_running")
3035 .collect();
3036 assert_eq!(
3037 nr.len(),
3038 1,
3039 "avg_nr_running jump must be flagged: {changes:?}"
3040 );
3041 assert_eq!(
3042 nr[0].direction,
3043 ChangeDirection::Degraded,
3044 "rising runqueue depth is a degradation",
3045 );
3046 // A zero-sample side suppresses it (same gate as the sibling gauges).
3047 let after_synth = PhaseMetrics {
3048 sample_count: 0,
3049 avg_nr_running: Some(3.0),
3050 ..Default::default()
3051 };
3052 assert!(
3053 !detect_boundary_changes(&before, &after_synth)
3054 .iter()
3055 .any(|c| c.metric == "nr_running"),
3056 "nr_running must stay gated when a side has 0 samples",
3057 );
3058 }
3059
3060 /// The per-phase timeline narrative renders avg_nr_running (`nr_run: avg=`)
3061 /// alongside imbalance + dsq for a sampled phase.
3062 #[test]
3063 fn format_phases_renders_avg_nr_running() {
3064 // Two CPUs with nr_running 2 and 4 -> per-sample mean 3.0.
3065 let samples: Vec<MonitorSample> = (5..25)
3066 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (4, 1, i * 1000 + 100)]))
3067 .collect();
3068 let events = vec![stimulus(0, "ScenarioStart")];
3069 let t = Timeline::build(&events, &samples, 0);
3070 let formatted = t.format_with_context(&TimelineContext::default());
3071 assert!(
3072 formatted.contains("nr_run: avg="),
3073 "per-phase narrative must render avg_nr_running: {formatted}",
3074 );
3075 }
3076
3077 /// Throughput uses a STRICT relative threshold: a change of exactly
3078 /// ITERATION_RATE_REL_THRESHOLD (30%) is NOT flagged. Pins the `>`
3079 /// boundary so a future `>` -> `>=` flip is caught (the absolute
3080 /// detect_change gate is a different code path, tested separately).
3081 #[test]
3082 fn detect_boundary_changes_throughput_exactly_at_threshold_not_flagged() {
3083 let before = PhaseMetrics {
3084 sample_count: 30,
3085 iteration_rate: Some(1000.0),
3086 ..Default::default()
3087 };
3088 let after = PhaseMetrics {
3089 sample_count: 30,
3090 iteration_rate: Some(700.0), // rel = -0.3 exactly
3091 ..Default::default()
3092 };
3093 assert_eq!((700.0 - 1000.0) / 1000.0, -ITERATION_RATE_REL_THRESHOLD);
3094 assert!(
3095 !detect_boundary_changes(&before, &after)
3096 .iter()
3097 .any(|c| c.metric == "throughput"),
3098 "an exactly-30% relative change must not flag (strict >)",
3099 );
3100 }
3101
3102 /// synthesized -> synthesized boundary (BOTH sides sample_count 0,
3103 /// both carrying a real stimulus-derived rate): throughput is still
3104 /// compared (the gate is symmetric and sample_count-independent for
3105 /// throughput) and monitor metrics stay suppressed on both sides.
3106 /// Pins the zero->zero cell of the transition matrix against a future
3107 /// edit that re-couples throughput to a per-side sample_count check.
3108 #[test]
3109 fn detect_boundary_changes_synthesized_to_synthesized_flags_throughput() {
3110 let before = PhaseMetrics {
3111 sample_count: 0,
3112 iteration_rate: Some(1000.0),
3113 avg_imbalance: Some(1.0),
3114 ..Default::default()
3115 };
3116 let after = PhaseMetrics {
3117 sample_count: 0,
3118 iteration_rate: Some(300.0), // 70% collapse
3119 avg_imbalance: Some(99.0), // wild — must stay gated
3120 ..Default::default()
3121 };
3122 let changes = detect_boundary_changes(&before, &after);
3123 let throughput: Vec<_> = changes
3124 .iter()
3125 .filter(|c| c.metric == "throughput")
3126 .collect();
3127 assert_eq!(
3128 throughput.len(),
3129 1,
3130 "zero->zero throughput collapse must flag: {changes:?}"
3131 );
3132 assert_eq!(throughput[0].direction, ChangeDirection::Degraded);
3133 assert!(
3134 !changes.iter().any(|c| c.metric == "imbalance"),
3135 "monitor metrics gated on both zero-sample sides: {changes:?}",
3136 );
3137 }
3138
3139 // -- iteration_rate computation tests --
3140
3141 fn stimulus_with_iters(elapsed_ms: u64, label: &str, total_iterations: u64) -> StimulusEvent {
3142 StimulusEvent {
3143 elapsed_ms,
3144 label: label.to_string(),
3145 op_kind: None,
3146 detail: None,
3147 total_iterations: Some(total_iterations),
3148 step_index: None,
3149 is_terminal: false,
3150 is_step_end: false,
3151 }
3152 }
3153
3154 #[test]
3155 fn iteration_rate_computed_from_consecutive_events() {
3156 // Two events with total_iterations: phase 0 spans 0..3000ms
3157 // (aligned). iterations: 0 -> 3000 over ~3s = 1000 iter/s.
3158 let events = vec![
3159 stimulus_with_iters(0, "ScenarioStart", 0),
3160 stimulus_with_iters(3000, "StepStart[0]", 3000),
3161 ];
3162 let samples: Vec<MonitorSample> = (5..35)
3163 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3164 .collect();
3165 let t = Timeline::build(&events, &samples, 0);
3166 assert_eq!(t.phases.len(), 2);
3167 let rate = t.phases[0].metrics.iteration_rate;
3168 assert!(rate.is_some(), "phase 0 should have iteration_rate");
3169 let r = rate.unwrap();
3170 // Duration is phase boundary difference, not exactly 3s due to
3171 // clock alignment offset. Check that the rate is reasonable.
3172 assert!(r > 500.0 && r < 2000.0, "rate {r} outside expected range");
3173 }
3174
3175 #[test]
3176 fn iteration_rate_none_without_total_iterations() {
3177 // Events without total_iterations: iteration_rate should be None.
3178 let events = vec![stimulus(0, "ScenarioStart"), stimulus(3000, "StepStart[0]")];
3179 let samples: Vec<MonitorSample> = (5..35)
3180 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3181 .collect();
3182 let t = Timeline::build(&events, &samples, 0);
3183 assert!(t.phases[0].metrics.iteration_rate.is_none());
3184 assert!(t.phases[1].metrics.iteration_rate.is_none());
3185 }
3186
3187 /// Build a wire `StimulusEvent` so tests can drive the FULL
3188 /// `from_wire` path (the production conversion) rather than
3189 /// constructing the timeline event directly — the latter bypassed
3190 /// the `total_iterations == 0` sentinel.
3191 fn wire_event(
3192 elapsed_ms: u32,
3193 step_index: u16,
3194 total_iterations: u64,
3195 ) -> crate::vmm::wire::StimulusEvent {
3196 crate::vmm::wire::StimulusEvent {
3197 elapsed_ms,
3198 step_index,
3199 op_count: 0,
3200 op_kinds: 0,
3201 cgroup_count: 0,
3202 worker_count: 1,
3203 total_iterations,
3204 }
3205 }
3206
3207 #[test]
3208 fn from_wire_zero_iterations_is_some_baseline() {
3209 // total_iterations is a cumulative counter, so a
3210 // start-of-window 0 is a legitimate baseline, NOT a missing
3211 // sample. from_wire must carry Some(0), never collapse it to
3212 // None.
3213 let te = StimulusEvent::from_wire(&wire_event(0, 1, 0));
3214 assert_eq!(te.total_iterations, Some(0));
3215 assert_eq!(te.step_index, Some(1));
3216 assert!(!te.is_terminal);
3217 assert!(
3218 !te.is_step_end,
3219 "a StepStart-derived event is not a StepEnd"
3220 );
3221 }
3222
3223 #[test]
3224 fn from_step_end_carries_step_index_and_marks_step_end() {
3225 // A StepEnd frame reuses the StimulusEvent wire body.
3226 // from_step_end must carry the same 1-indexed step_index and the
3227 // step's end-of-hold total_iterations, flag is_step_end, and leave
3228 // is_terminal off (it is a real per-step boundary, not the
3229 // scenario terminal).
3230 let te = StimulusEvent::from_step_end(&wire_event(1_900, 1, 9_000));
3231 assert_eq!(te.step_index, Some(1));
3232 assert_eq!(te.total_iterations, Some(9_000));
3233 assert!(te.is_step_end, "StepEnd-derived event must set is_step_end");
3234 assert!(
3235 !te.is_terminal,
3236 "StepEnd is a per-step boundary, not the scenario terminal",
3237 );
3238 }
3239
3240 #[test]
3241 fn from_wire_first_step_zero_baseline_yields_rate() {
3242 // First-step zero-baseline regression, driven through the FULL from_wire path
3243 // (unit tests previously injected Some(0) directly, masking the
3244 // wire 0->None collapse). First step frame reads 0 cumulative
3245 // iterations, the second reads 3000; the first phase must get a
3246 // rate rather than a silent None.
3247 let events: Vec<StimulusEvent> = [wire_event(0, 1, 0), wire_event(3000, 2, 3000)]
3248 .iter()
3249 .map(StimulusEvent::from_wire)
3250 .collect();
3251 let samples: Vec<MonitorSample> = (5..35)
3252 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3253 .collect();
3254 let t = Timeline::build(&events, &samples, 0);
3255 assert!(
3256 t.phases[0].metrics.iteration_rate.is_some(),
3257 "first phase must get a rate from the 0 baseline",
3258 );
3259 }
3260
3261 #[test]
3262 fn terminal_event_gives_last_step_rate_without_phantom_phase() {
3263 // The last step has no successor step event, so its
3264 // iteration_rate needs the terminal scenario-end boundary. The
3265 // terminal must supply that boundary WITHOUT adding a phantom
3266 // trailing phase.
3267 let mut events: Vec<StimulusEvent> = [wire_event(0, 1, 0), wire_event(2000, 2, 4000)]
3268 .iter()
3269 .map(StimulusEvent::from_wire)
3270 .collect();
3271 events.push(StimulusEvent::terminal(4000, 10000));
3272 let samples: Vec<MonitorSample> = (5..45)
3273 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3274 .collect();
3275 let t = Timeline::build(&events, &samples, 0);
3276 assert_eq!(
3277 t.phases.len(),
3278 2,
3279 "two step events -> two phases; terminal seeds none",
3280 );
3281 assert!(
3282 t.phases[1].metrics.iteration_rate.is_some(),
3283 "last step must get a rate from the terminal boundary",
3284 );
3285 }
3286
3287 #[test]
3288 fn build_filters_step_end_events_no_phantom_phase() {
3289 // A StepEnd must be filtered from the PHASE-LAYOUT set
3290 // (it is an end-of-hold marker, not a step boundary) so it neither
3291 // adds a phantom phase nor misaligns the dense phase index. Two
3292 // StepStart events with an interleaved StepEnd still yield exactly
3293 // two phases. (StepEnd is still consumed for the step-local RATE —
3294 // see build_pairs_step_local_when_step_end_events_present.)
3295 let events: Vec<StimulusEvent> = vec![
3296 StimulusEvent::from_wire(&wire_event(0, 1, 0)),
3297 StimulusEvent::from_step_end(&wire_event(1_900, 1, 9_000)),
3298 StimulusEvent::from_wire(&wire_event(2_000, 2, 9_000)),
3299 ];
3300 let samples: Vec<MonitorSample> = (5..35)
3301 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3302 .collect();
3303 let t = Timeline::build(&events, &samples, 0);
3304 assert_eq!(
3305 t.phases.len(),
3306 2,
3307 "two StepStart events -> two phases; the interleaved StepEnd seeds none",
3308 );
3309 }
3310
3311 #[test]
3312 fn build_pairs_step_local_when_step_end_events_present() {
3313 // The monitor-only Timeline::build fallback must ALSO
3314 // use step-local StepStart[k] -> StepEnd[k] pairing when StepEnd
3315 // events are present (they are emitted independent of snapshot
3316 // captures), NOT the cross-step StepStart[k] -> StepStart[k+1]
3317 // pairing that reads 0 -> 0 for respawned-per-step workers. Two
3318 // fresh-per-step steps (each StepStart reads ~0); without
3319 // step-local pairing phase 0 would be None (0 -> 0 cross-step).
3320 // With it, both phases get a positive rate.
3321 let events: Vec<StimulusEvent> = vec![
3322 StimulusEvent::from_wire(&wire_event(0, 1, 0)), // StepStart[0], iters 0
3323 StimulusEvent::from_step_end(&wire_event(1_000, 1, 5_000)), // StepEnd[0], iters 5000
3324 StimulusEvent::from_wire(&wire_event(1_100, 2, 0)), // StepStart[1] respawned, iters 0
3325 StimulusEvent::from_step_end(&wire_event(2_100, 2, 3_000)), // StepEnd[1], iters 3000
3326 ];
3327 let samples: Vec<MonitorSample> = (1..30)
3328 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3329 .collect();
3330 let t = Timeline::build(&events, &samples, 0);
3331 assert_eq!(
3332 t.phases.len(),
3333 2,
3334 "two StepStart events -> two phases (each StepEnd seeds none)",
3335 );
3336 assert!(
3337 t.phases[0].metrics.iteration_rate.is_some(),
3338 "phase 0 must get a step-local rate from StepStart[0] -> StepEnd[0], \
3339 not the cross-step 0 -> 0 None (the old cross-step fallback bug)",
3340 );
3341 assert!(
3342 t.phases[1].metrics.iteration_rate.is_some(),
3343 "phase 1 (respawned workers) must get its own step-local rate",
3344 );
3345 }
3346
3347 #[test]
3348 fn build_stalled_step_with_step_end_reports_measured_zero_not_cross_step() {
3349 // Monitor-only path: a step that HAS a StepEnd but
3350 // stalled (StepEnd[k] == StepStart[k]) reports its MEASURED-ZERO
3351 // step-local rate (Some(0.0)) — its StepEnd lookup hits, so the
3352 // cross-step fallback must NOT run. Mirrors the snapshot path's
3353 // build_phase_buckets_with_stimulus_stalled_step_reports_measured_zero.
3354 // Step 0 stalls (0 -> 0); a persistent population reads 500 at
3355 // StepStart[1], so a cross-step StepStart[0] -> StepStart[1] leak
3356 // would be ~454/s. Step 1 advances 500 -> 5500 (5000/s).
3357 let events: Vec<StimulusEvent> = vec![
3358 StimulusEvent::from_wire(&wire_event(0, 1, 0)), // StepStart[0], iters 0
3359 StimulusEvent::from_step_end(&wire_event(1_000, 1, 0)), // StepEnd[0], STALLED 0
3360 StimulusEvent::from_wire(&wire_event(1_100, 2, 500)), // StepStart[1], persistent 500
3361 StimulusEvent::from_step_end(&wire_event(2_100, 2, 5_500)), // StepEnd[1], iters 5500
3362 ];
3363 let samples: Vec<MonitorSample> = (1..30)
3364 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3365 .collect();
3366 let t = Timeline::build(&events, &samples, 0);
3367 assert_eq!(t.phases.len(), 2);
3368 assert_eq!(
3369 t.phases[0].metrics.iteration_rate,
3370 Some(0.0),
3371 "a stalled step reports measured-zero throughput, not the \
3372 cross-step StepStart[0] -> StepStart[1] persistent-leak rate",
3373 );
3374 assert!(
3375 t.phases[1].metrics.iteration_rate.is_some(),
3376 "step 1 still reports its own step-local rate",
3377 );
3378 }
3379
3380 #[test]
3381 fn terminal_event_single_step_rate() {
3382 // Boundary case: a one-step scenario (first == last). With the
3383 // 0 baseline and the terminal boundary supplying the right edge,
3384 // the single step still gets a rate, and the terminal adds no
3385 // phase.
3386 let mut events: Vec<StimulusEvent> = [wire_event(0, 1, 0)]
3387 .iter()
3388 .map(StimulusEvent::from_wire)
3389 .collect();
3390 events.push(StimulusEvent::terminal(3000, 9000));
3391 let samples: Vec<MonitorSample> = (5..35)
3392 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3393 .collect();
3394 let t = Timeline::build(&events, &samples, 0);
3395 assert_eq!(
3396 t.phases.len(),
3397 1,
3398 "single step -> one phase; terminal adds none"
3399 );
3400 assert!(
3401 t.phases[0].metrics.iteration_rate.is_some(),
3402 "single step gets a rate (first == last)",
3403 );
3404 }
3405
3406 #[test]
3407 fn terminal_event_stalled_last_step_reports_measured_zero() {
3408 // Boundary case: the last step's counter did not advance
3409 // (terminal count == last step-start count): e == s. That is
3410 // MEASURED ZERO throughput — a real value (the strongest
3411 // degradation signal), not "unmeasured" — so rate_to returns
3412 // Some(0.0), and the zero surfaces to the degradation detector.
3413 // Only a counter DECREASE (e < s) is unmeasurable -> None.
3414 let mut events: Vec<StimulusEvent> = [wire_event(0, 1, 0), wire_event(2000, 2, 4000)]
3415 .iter()
3416 .map(StimulusEvent::from_wire)
3417 .collect();
3418 events.push(StimulusEvent::terminal(4000, 4000)); // no advance
3419 let samples: Vec<MonitorSample> = (5..45)
3420 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3421 .collect();
3422 let t = Timeline::build(&events, &samples, 0);
3423 assert_eq!(t.phases.len(), 2);
3424 assert_eq!(
3425 t.phases[1].metrics.iteration_rate,
3426 Some(0.0),
3427 "stalled last step (e == s) reports measured-zero, not None",
3428 );
3429 }
3430
3431 #[test]
3432 fn iteration_rate_counter_decrease_yields_no_rate() {
3433 // A counter DECREASE between consecutive step frames (e.g. a
3434 // step-local worker population reset) is unmeasurable and must NOT
3435 // produce a negative or conflated rate — the `e < s` guard drops
3436 // the pair, returning None (distinct from `e == s`, which is a
3437 // measured-zero Some(0.0)). Pin it so a future change that loosens
3438 // the guard to allow a negative delta fails here.
3439 let events: Vec<StimulusEvent> = [
3440 wire_event(0, 1, 0),
3441 wire_event(2000, 2, 5000),
3442 wire_event(3000, 3, 1000), // counter dropped 5000 -> 1000
3443 ]
3444 .iter()
3445 .map(StimulusEvent::from_wire)
3446 .collect();
3447 let samples: Vec<MonitorSample> = (5..35)
3448 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3449 .collect();
3450 let t = Timeline::build(&events, &samples, 0);
3451 // phase 1 is step 2 (frame iters 5000 -> next 1000): decrease.
3452 assert!(
3453 t.phases[1].metrics.iteration_rate.is_none(),
3454 "a counter decrease must not manufacture a (negative) rate",
3455 );
3456 }
3457
3458 #[test]
3459 fn iteration_rate_zero_duration_yields_no_rate() {
3460 // Two consecutive frames with identical elapsed_ms -> the rate
3461 // denominator is 0; the duration==0 guard must drop the pair
3462 // rather than divide and produce inf/NaN.
3463 let events: Vec<StimulusEvent> = [wire_event(1000, 1, 0), wire_event(1000, 2, 2000)]
3464 .iter()
3465 .map(StimulusEvent::from_wire)
3466 .collect();
3467 let samples: Vec<MonitorSample> = (5..35)
3468 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3469 .collect();
3470 let t = Timeline::build(&events, &samples, 0);
3471 assert!(
3472 t.phases[0].metrics.iteration_rate.is_none(),
3473 "zero-duration pair must not divide; rate stays None",
3474 );
3475 }
3476
3477 #[test]
3478 fn terminal_not_last_does_not_misalign_or_misattribute() {
3479 // Robustness: even if a corrupt/out-of-order elapsed_ms made the
3480 // terminal sort BEFORE a real step, the explicit is_terminal
3481 // extraction (not positional) must keep the step phases aligned
3482 // and attribute the early step's rate correctly. A corrupt
3483 // terminal contributes no spurious rate (its position can't
3484 // shift the dense phase index).
3485 let mut events: Vec<StimulusEvent> = [wire_event(0, 1, 0), wire_event(2000, 2, 4000)]
3486 .iter()
3487 .map(StimulusEvent::from_wire)
3488 .collect();
3489 // Terminal with elapsed_ms BEFORE step 2 (simulated corruption).
3490 events.push(StimulusEvent::terminal(500, 9000));
3491 let samples: Vec<MonitorSample> = (5..45)
3492 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3493 .collect();
3494 let t = Timeline::build(&events, &samples, 0);
3495 // Two step events -> two phases regardless of terminal position.
3496 assert_eq!(
3497 t.phases.len(),
3498 2,
3499 "terminal position must not change phase count"
3500 );
3501 // Phase 0 (step 1) still gets its correct rate (0 -> 4000 over
3502 // 2s = 2000/s): the misordered terminal did not misalign it.
3503 assert_eq!(
3504 t.phases[0].metrics.iteration_rate,
3505 Some(2000.0),
3506 "early step rate must be correct despite a misordered terminal",
3507 );
3508 }
3509
3510 #[test]
3511 fn throughput_degradation_detected() {
3512 // Phase 0: high throughput (0 -> 10000 iters over ~2s = ~5000/s)
3513 // Phase 1: low throughput (10000 -> 11000 iters over ~2s = ~500/s)
3514 // 90% drop exceeds ITERATION_RATE_REL_THRESHOLD (0.3).
3515 let events = vec![
3516 stimulus_with_iters(0, "ScenarioStart", 0),
3517 stimulus_with_iters(2000, "StepStart[0]", 10000),
3518 stimulus_with_iters(4000, "StepEnd[0]", 11000),
3519 ];
3520 let samples: Vec<MonitorSample> = (5..45)
3521 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3522 .collect();
3523 let t = Timeline::build(&events, &samples, 0);
3524 assert_eq!(t.phases.len(), 3);
3525 // Phase 0 should have high iteration_rate.
3526 assert!(t.phases[0].metrics.iteration_rate.is_some());
3527 // Phase 1 should have low iteration_rate.
3528 assert!(t.phases[1].metrics.iteration_rate.is_some());
3529 let r0 = t.phases[0].metrics.iteration_rate.unwrap();
3530 let r1 = t.phases[1].metrics.iteration_rate.unwrap();
3531 assert!(
3532 r0 > r1,
3533 "phase 0 rate ({r0}) should exceed phase 1 rate ({r1})"
3534 );
3535
3536 // Throughput degradation should be detected at phase 1 boundary.
3537 let degs: Vec<_> = t
3538 .degradations()
3539 .into_iter()
3540 .filter(|(_, c)| c.metric == "throughput")
3541 .collect();
3542 assert!(!degs.is_empty(), "throughput degradation must be detected");
3543 let (phase, change) = °s[0];
3544 assert_eq!(phase.index, 1);
3545 assert_eq!(change.direction, ChangeDirection::Degraded);
3546 assert!(change.before > change.after);
3547 }
3548
3549 #[test]
3550 fn throughput_collapse_to_zero_is_flagged() {
3551 // A phase that collapses to ZERO throughput (e == s, measured
3552 // zero) must be flagged as a degradation — it is the strongest
3553 // degradation signal. Previously the zero phase's rate_to returned
3554 // None, so the detector's Some/Some gate dropped it and the worst
3555 // degradation went silently unreported.
3556 let events = vec![
3557 stimulus_with_iters(0, "ScenarioStart", 0),
3558 stimulus_with_iters(2000, "StepStart[0]", 10000), // phase 0: ~5000/s
3559 stimulus_with_iters(4000, "StepStart[1]", 10000), // phase 1: 0/s (stalled)
3560 ];
3561 let samples: Vec<MonitorSample> = (5..45)
3562 .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3563 .collect();
3564 let t = Timeline::build(&events, &samples, 0);
3565 assert_eq!(
3566 t.phases[1].metrics.iteration_rate,
3567 Some(0.0),
3568 "the collapsed phase must report measured-zero throughput",
3569 );
3570 let degs: Vec<_> = t
3571 .degradations()
3572 .into_iter()
3573 .filter(|(p, c)| p.index == 1 && c.metric == "throughput")
3574 .collect();
3575 assert!(
3576 !degs.is_empty(),
3577 "a collapse to zero throughput must be flagged as a degradation",
3578 );
3579 assert_eq!(degs[0].1.direction, ChangeDirection::Degraded);
3580 assert_eq!(degs[0].1.after, 0.0);
3581 }
3582
3583 #[test]
3584 fn throughput_improvement_detected() {
3585 // Phase 0: low throughput (0 -> 500 iters over ~2s = ~250/s)
3586 // Phase 1: high throughput (500 -> 10500 iters over ~2s = ~5000/s)
3587 // >30% increase should be flagged as improvement.
3588 let events = vec![
3589 stimulus_with_iters(0, "ScenarioStart", 0),
3590 stimulus_with_iters(2000, "StepStart[0]", 500),
3591 stimulus_with_iters(4000, "StepEnd[0]", 10500),
3592 ];
3593 let samples: Vec<MonitorSample> = (5..45)
3594 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3595 .collect();
3596 let t = Timeline::build(&events, &samples, 0);
3597 let improvements: Vec<_> = t
3598 .phases
3599 .iter()
3600 .flat_map(|p| p.changes.iter())
3601 .filter(|c| c.metric == "throughput" && c.direction == ChangeDirection::Improved)
3602 .collect();
3603 assert!(
3604 !improvements.is_empty(),
3605 "throughput improvement must be detected"
3606 );
3607 }
3608
3609 #[test]
3610 fn throughput_stable_below_threshold() {
3611 // Phase 0: 1000 iter/s
3612 // Phase 1: ~900 iter/s (10% drop, below 30% threshold)
3613 // No throughput change should be detected.
3614 let events = vec![
3615 stimulus_with_iters(0, "ScenarioStart", 0),
3616 stimulus_with_iters(2000, "StepStart[0]", 2000),
3617 stimulus_with_iters(4000, "StepEnd[0]", 3800),
3618 ];
3619 let samples: Vec<MonitorSample> = (5..45)
3620 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3621 .collect();
3622 let t = Timeline::build(&events, &samples, 0);
3623 let throughput_changes: Vec<_> = t
3624 .phases
3625 .iter()
3626 .flat_map(|p| p.changes.iter())
3627 .filter(|c| c.metric == "throughput")
3628 .collect();
3629 assert!(
3630 throughput_changes.is_empty(),
3631 "10% change should not trigger throughput change detection"
3632 );
3633 }
3634
3635 #[test]
3636 fn from_phase_buckets_maps_known_metrics_and_renders_phase_block() {
3637 use crate::assert::PhaseBucket;
3638 use std::collections::BTreeMap;
3639 let mut s0_metrics = BTreeMap::new();
3640 s0_metrics.insert("max_dsq_depth".to_string(), 7.0);
3641 s0_metrics.insert("avg_dsq_depth".to_string(), 2.5);
3642 s0_metrics.insert("max_imbalance_ratio".to_string(), 3.5);
3643 s0_metrics.insert("avg_imbalance_ratio".to_string(), 1.8);
3644 s0_metrics.insert("avg_nr_running".to_string(), 3.0);
3645 s0_metrics.insert("total_fallback".to_string(), 200.0);
3646 let buckets = vec![
3647 PhaseBucket {
3648 per_cgroup: Default::default(),
3649 step_index: 0,
3650 label: "BASELINE".to_string(),
3651 start_ms: 0,
3652 end_ms: 1000,
3653 sample_count: 5,
3654 metrics: BTreeMap::new(),
3655 },
3656 PhaseBucket {
3657 per_cgroup: Default::default(),
3658 step_index: 1,
3659 label: "Step[0]".to_string(),
3660 start_ms: 1000,
3661 end_ms: 6000,
3662 sample_count: 20,
3663 metrics: s0_metrics,
3664 },
3665 ];
3666 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
3667 assert_eq!(t.phases.len(), 2);
3668 // Phase 0 (BASELINE) — no stimulus, no metrics.
3669 assert!(t.phases[0].stimulus.is_none());
3670 assert_eq!(t.phases[0].metrics.sample_count, 5);
3671 assert_eq!(t.phases[0].metrics.max_dsq_depth, 0);
3672 // Phase 1 (Step[0]) — stimulus set, metrics projected from
3673 // the bucket map.
3674 assert!(t.phases[1].stimulus.is_some());
3675 assert_eq!(t.phases[1].stimulus.as_ref().unwrap().label, "Step[0]");
3676 assert_eq!(t.phases[1].metrics.sample_count, 20);
3677 assert_eq!(t.phases[1].metrics.max_dsq_depth, 7);
3678 assert!((t.phases[1].metrics.avg_dsq_depth.unwrap() - 2.5).abs() < f64::EPSILON);
3679 assert!((t.phases[1].metrics.max_imbalance.unwrap() - 3.5).abs() < f64::EPSILON);
3680 assert!((t.phases[1].metrics.avg_imbalance.unwrap() - 1.8).abs() < f64::EPSILON);
3681 // Snapshot-path read (phase_from_bucket): bucket.metrics["avg_nr_running"]
3682 // projects to PhaseMetrics.avg_nr_running; phase 0's empty map -> None.
3683 assert!((t.phases[1].metrics.avg_nr_running.unwrap() - 3.0).abs() < f64::EPSILON);
3684 assert_eq!(t.phases[0].metrics.avg_nr_running, None);
3685 // fallback_rate = 200 / (5000 / 1000) = 40.0 events/s
3686 assert_eq!(t.phases[1].metrics.fallback_rate, Some(40.0));
3687 // keep_last_rate absent → None (no total_keep_last in metrics map)
3688 assert_eq!(t.phases[1].metrics.keep_last_rate, None);
3689 // avg_dsq_depth + avg_imbalance + avg_nr_running are now
3690 // all wired (per the doc table). iteration_rate is the only field
3691 // PhaseBucket cannot supply directly (depends on stimulus
3692 // event totals, not a per-Sample reading).
3693 assert_eq!(t.phases[1].metrics.iteration_rate, None);
3694 // Render produces a non-empty timeline block.
3695 let formatted = t.format_with_context(&TimelineContext::default());
3696 assert!(formatted.contains("--- timeline ---"));
3697 assert!(formatted.contains("BASELINE"));
3698 assert!(formatted.contains("Step[0]"));
3699 }
3700
3701 /// Collapse-suppression production path: a throughput collapse INTO a synthesized
3702 /// zero-capture step surfaces through from_phase_buckets (the path
3703 /// `evaluate_vm_result` prefers), not only via the helper unit test.
3704 /// BASELINE captures samples + an iteration_rate; Step[0] is
3705 /// synthesized (sample_count 0) with a collapsed iteration_rate AND a
3706 /// divergent imbalance that must stay gated. Re-adding the old
3707 /// `if sample_count==0 { continue }` gate at this call site makes
3708 /// phases[1].changes empty, so this test fails — it is the
3709 /// regression pin for the removed gate. The producer half — that
3710 /// build_phase_buckets_with_stimulus actually populates a synthesized
3711 /// bucket's iteration_rate from stimulus deltas — is pinned by
3712 /// assert::tests_phase_bucket::build_phase_buckets_with_stimulus_synthesizes_zero_capture_step_bucket;
3713 /// together they pin the full producer->consumer chain.
3714 #[test]
3715 fn from_phase_buckets_flags_throughput_into_synthesized_step() {
3716 use crate::assert::PhaseBucket;
3717 use std::collections::BTreeMap;
3718 let mut baseline_metrics = BTreeMap::new();
3719 baseline_metrics.insert("iteration_rate".to_string(), 1000.0);
3720 baseline_metrics.insert("avg_imbalance_ratio".to_string(), 1.0);
3721 let mut step_metrics = BTreeMap::new();
3722 step_metrics.insert("iteration_rate".to_string(), 300.0); // 70% collapse
3723 step_metrics.insert("avg_imbalance_ratio".to_string(), 99.0); // must stay gated
3724 let buckets = vec![
3725 PhaseBucket {
3726 per_cgroup: Default::default(),
3727 step_index: 0,
3728 label: "BASELINE".to_string(),
3729 start_ms: 0,
3730 end_ms: 1000,
3731 sample_count: 5,
3732 metrics: baseline_metrics,
3733 },
3734 PhaseBucket {
3735 per_cgroup: Default::default(),
3736 step_index: 1,
3737 label: "Step[0]".to_string(),
3738 start_ms: 1000,
3739 end_ms: 6000,
3740 sample_count: 0, // synthesized zero-capture step
3741 metrics: step_metrics,
3742 },
3743 ];
3744 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
3745 assert_eq!(t.phases[1].metrics.sample_count, 0);
3746 assert_eq!(t.phases[1].metrics.iteration_rate, Some(300.0));
3747 let throughput: Vec<_> = t.phases[1]
3748 .changes
3749 .iter()
3750 .filter(|c| c.metric == "throughput")
3751 .collect();
3752 assert_eq!(
3753 throughput.len(),
3754 1,
3755 "throughput collapse into a synthesized step must surface via \
3756 from_phase_buckets: {:?}",
3757 t.phases[1].changes,
3758 );
3759 assert_eq!(throughput[0].direction, ChangeDirection::Degraded);
3760 assert!(
3761 !t.phases[1].changes.iter().any(|c| c.metric == "imbalance"),
3762 "monitor metrics must stay gated for a zero-sample side: {:?}",
3763 t.phases[1].changes,
3764 );
3765 }
3766
3767 /// Boundary change-detection on the from_phase_buckets path — the
3768 /// PRODUCTION success path (`evaluate_vm_result` prefers
3769 /// from_phase_buckets over `build`). Two adjacent metric-bearing
3770 /// buckets whose avg_imbalance / avg_dsq_depth cross the thresholds
3771 /// in the worsening direction must record Degraded changes on the
3772 /// ENTERED phase (phases[1]), and the BASELINE phase records none.
3773 /// Without this, the from_phase_buckets boundary-change loop (the
3774 /// `for i in 1..phases.len()` call to detect_boundary_changes) ships
3775 /// unverified (a wrong threshold, inverted direction, wrong-phase
3776 /// recording, or wrong metric field would all slip past the other
3777 /// from_phase_buckets tests, which never trigger the loop).
3778 #[test]
3779 fn from_phase_buckets_detects_boundary_degradation() {
3780 use crate::assert::PhaseBucket;
3781 use std::collections::BTreeMap;
3782 let mut base = BTreeMap::new();
3783 base.insert("avg_imbalance_ratio".to_string(), 1.0);
3784 base.insert("avg_dsq_depth".to_string(), 1.0);
3785 let mut step = BTreeMap::new();
3786 step.insert("avg_imbalance_ratio".to_string(), 2.0); // +1.0 > 0.5 threshold
3787 step.insert("avg_dsq_depth".to_string(), 6.0); // +5.0 > 3.0 threshold
3788 let buckets = vec![
3789 PhaseBucket {
3790 per_cgroup: Default::default(),
3791 step_index: 0,
3792 label: "BASELINE".to_string(),
3793 start_ms: 0,
3794 end_ms: 1000,
3795 sample_count: 5,
3796 metrics: base,
3797 },
3798 PhaseBucket {
3799 per_cgroup: Default::default(),
3800 step_index: 1,
3801 label: "Step[0]".to_string(),
3802 start_ms: 1000,
3803 end_ms: 6000,
3804 sample_count: 20,
3805 metrics: step,
3806 },
3807 ];
3808 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
3809 // Change recorded on the ENTERED phase, never the prior one.
3810 assert!(
3811 t.phases[0].changes.is_empty(),
3812 "BASELINE has no prior phase to diff; changes belong to the entered phase",
3813 );
3814 let changes = &t.phases[1].changes;
3815 let imb = changes
3816 .iter()
3817 .find(|c| c.metric == "imbalance")
3818 .expect("imbalance change must fire (1.0 -> 2.0 crosses 0.5)");
3819 assert_eq!(imb.direction, ChangeDirection::Degraded);
3820 assert!((imb.before - 1.0).abs() < f64::EPSILON);
3821 assert!((imb.after - 2.0).abs() < f64::EPSILON);
3822 let dsq = changes
3823 .iter()
3824 .find(|c| c.metric == "dsq_depth")
3825 .expect("dsq_depth change must fire (1.0 -> 6.0 crosses 3.0)");
3826 assert_eq!(dsq.direction, ChangeDirection::Degraded);
3827 assert!((dsq.before - 1.0).abs() < f64::EPSILON);
3828 assert!((dsq.after - 6.0).abs() < f64::EPSILON);
3829 }
3830
3831 /// Sub-threshold deltas record NO change — guards a dropped/zeroed
3832 /// threshold that would fabricate spurious boundary changes.
3833 #[test]
3834 fn from_phase_buckets_subthreshold_records_no_change() {
3835 use crate::assert::PhaseBucket;
3836 use std::collections::BTreeMap;
3837 let mut base = BTreeMap::new();
3838 base.insert("avg_imbalance_ratio".to_string(), 1.0);
3839 base.insert("avg_dsq_depth".to_string(), 1.0);
3840 let mut step = BTreeMap::new();
3841 step.insert("avg_imbalance_ratio".to_string(), 1.2); // +0.2 < 0.5
3842 step.insert("avg_dsq_depth".to_string(), 2.0); // +1.0 < 3.0
3843 let buckets = vec![
3844 PhaseBucket {
3845 per_cgroup: Default::default(),
3846 step_index: 0,
3847 label: "BASELINE".to_string(),
3848 start_ms: 0,
3849 end_ms: 1000,
3850 sample_count: 5,
3851 metrics: base,
3852 },
3853 PhaseBucket {
3854 per_cgroup: Default::default(),
3855 step_index: 1,
3856 label: "Step[0]".to_string(),
3857 start_ms: 1000,
3858 end_ms: 6000,
3859 sample_count: 20,
3860 metrics: step,
3861 },
3862 ];
3863 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
3864 assert!(
3865 t.phases[1].changes.is_empty(),
3866 "sub-threshold deltas must not record a boundary change",
3867 );
3868 }
3869
3870 /// Decreasing imbalance across the boundary records an IMPROVEMENT —
3871 /// locks the higher_is_worse direction so an inverted flag cannot
3872 /// report a regression as an improvement (or vice versa).
3873 #[test]
3874 fn from_phase_buckets_detects_boundary_improvement() {
3875 use crate::assert::PhaseBucket;
3876 use std::collections::BTreeMap;
3877 let mut base = BTreeMap::new();
3878 base.insert("avg_imbalance_ratio".to_string(), 2.0);
3879 let mut step = BTreeMap::new();
3880 step.insert("avg_imbalance_ratio".to_string(), 1.0); // -1.0, |delta|>0.5, after<before
3881 let buckets = vec![
3882 PhaseBucket {
3883 per_cgroup: Default::default(),
3884 step_index: 0,
3885 label: "BASELINE".to_string(),
3886 start_ms: 0,
3887 end_ms: 1000,
3888 sample_count: 5,
3889 metrics: base,
3890 },
3891 PhaseBucket {
3892 per_cgroup: Default::default(),
3893 step_index: 1,
3894 label: "Step[0]".to_string(),
3895 start_ms: 1000,
3896 end_ms: 6000,
3897 sample_count: 20,
3898 metrics: step,
3899 },
3900 ];
3901 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
3902 let imb = t.phases[1]
3903 .changes
3904 .iter()
3905 .find(|c| c.metric == "imbalance")
3906 .expect("imbalance change must fire (2.0 -> 1.0 crosses 0.5)");
3907 assert_eq!(
3908 imb.direction,
3909 ChangeDirection::Improved,
3910 "a decreasing imbalance is an improvement, not a degradation",
3911 );
3912 }
3913
3914 /// from_phase_buckets must CORRELATE a real stimulus event into the
3915 /// phase header, carrying its op_kind + detail. Every other
3916 /// from_phase_buckets test passes `&[]`, so only the synthetic
3917 /// None-placeholder arm ran and the `Some(ev) => (*ev).clone()`
3918 /// correlation arm (added to stop headers degrading to "Step[N]: ?")
3919 /// was untested. A wrong interval bound or cloning the wrong event
3920 /// would drop the operator-facing op/detail with no failure.
3921 #[test]
3922 fn from_phase_buckets_correlates_real_stimulus_op_and_detail() {
3923 use crate::assert::PhaseBucket;
3924 use std::collections::BTreeMap;
3925 let event = StimulusEvent {
3926 elapsed_ms: 1000,
3927 label: "Step[0]".to_string(),
3928 op_kind: Some("SetCpuset".to_string()),
3929 detail: Some("4 cpus".to_string()),
3930 total_iterations: None,
3931 step_index: Some(1),
3932 is_terminal: false,
3933 is_step_end: false,
3934 };
3935 let buckets = vec![
3936 PhaseBucket {
3937 per_cgroup: Default::default(),
3938 step_index: 0,
3939 label: "BASELINE".to_string(),
3940 start_ms: 0,
3941 end_ms: 1000,
3942 sample_count: 5,
3943 metrics: BTreeMap::new(),
3944 },
3945 PhaseBucket {
3946 per_cgroup: Default::default(),
3947 step_index: 1,
3948 label: "Step[0]".to_string(),
3949 start_ms: 1000,
3950 end_ms: 6000,
3951 sample_count: 20,
3952 metrics: BTreeMap::new(),
3953 },
3954 ];
3955 let t = Timeline::from_phase_buckets(&buckets, &[event], &TimelineContext::default());
3956 let stim = t.phases[1]
3957 .stimulus
3958 .as_ref()
3959 .expect("Step[0] phase carries a stimulus");
3960 assert_eq!(
3961 stim.op_kind.as_deref(),
3962 Some("SetCpuset"),
3963 "the correlated event's op_kind must be carried, not the None placeholder",
3964 );
3965 assert_eq!(stim.detail.as_deref(), Some("4 cpus"));
3966 }
3967
3968 #[test]
3969 fn from_phase_buckets_zero_duration_window_emits_no_rate() {
3970 use crate::assert::PhaseBucket;
3971 use std::collections::BTreeMap;
3972 let mut metrics = BTreeMap::new();
3973 metrics.insert("total_fallback".to_string(), 100.0);
3974 let bucket = PhaseBucket {
3975 per_cgroup: Default::default(),
3976 step_index: 1,
3977 label: "Step[0]".to_string(),
3978 start_ms: 500,
3979 end_ms: 500,
3980 sample_count: 1,
3981 metrics,
3982 };
3983 let t = Timeline::from_phase_buckets(&[bucket], &[], &TimelineContext::default());
3984 // Degenerate window (start == end) yields duration_s == 0,
3985 // so rate divisions stay None rather than producing
3986 // spurious infinities.
3987 assert_eq!(t.phases[0].metrics.fallback_rate, None);
3988 }
3989
3990 #[test]
3991 fn from_phase_buckets_absent_imbalance_metric_is_none_not_zero() {
3992 // A bucket carrying no avg_imbalance_ratio / avg_dsq_depth
3993 // metric must yield None (no data), NOT Some(0.0) — so the change
3994 // detector skips it instead of comparing a false zero-imbalance.
3995 use crate::assert::PhaseBucket;
3996 use std::collections::BTreeMap;
3997 let bucket = PhaseBucket {
3998 per_cgroup: Default::default(),
3999 step_index: 1,
4000 label: "Step[0]".to_string(),
4001 start_ms: 100,
4002 end_ms: 600,
4003 sample_count: 3,
4004 metrics: BTreeMap::new(),
4005 };
4006 let t = Timeline::from_phase_buckets(&[bucket], &[], &TimelineContext::default());
4007 assert_eq!(t.phases[0].metrics.avg_imbalance, None);
4008 assert_eq!(t.phases[0].metrics.max_imbalance, None);
4009 assert_eq!(t.phases[0].metrics.avg_dsq_depth, None);
4010 }
4011
4012 #[test]
4013 fn from_phase_buckets_sorts_by_step_index() {
4014 use crate::assert::PhaseBucket;
4015 use std::collections::BTreeMap;
4016 // Out-of-order input; from_phase_buckets must sort by
4017 // step_index so the rendered phase block walks BASELINE
4018 // → Step[0] → Step[1] in time order regardless of
4019 // how the caller arranged the input vec.
4020 let buckets = vec![
4021 PhaseBucket {
4022 per_cgroup: Default::default(),
4023 step_index: 2,
4024 label: "Step[1]".to_string(),
4025 start_ms: 2000,
4026 end_ms: 3000,
4027 sample_count: 5,
4028 metrics: BTreeMap::new(),
4029 },
4030 PhaseBucket {
4031 per_cgroup: Default::default(),
4032 step_index: 0,
4033 label: "BASELINE".to_string(),
4034 start_ms: 0,
4035 end_ms: 500,
4036 sample_count: 2,
4037 metrics: BTreeMap::new(),
4038 },
4039 PhaseBucket {
4040 per_cgroup: Default::default(),
4041 step_index: 1,
4042 label: "Step[0]".to_string(),
4043 start_ms: 500,
4044 end_ms: 2000,
4045 sample_count: 5,
4046 metrics: BTreeMap::new(),
4047 },
4048 ];
4049 let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
4050 assert_eq!(t.phases.len(), 3);
4051 assert_eq!(t.phases[0].start_ms, 0);
4052 assert_eq!(t.phases[1].start_ms, 500);
4053 assert_eq!(t.phases[2].start_ms, 2000);
4054 }
4055
4056 #[test]
4057 fn iteration_rate_in_formatted_output() {
4058 let events = vec![
4059 stimulus_with_iters(0, "ScenarioStart", 0),
4060 stimulus_with_iters(2000, "StepStart[0]", 5000),
4061 ];
4062 let samples: Vec<MonitorSample> = (5..25)
4063 .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
4064 .collect();
4065 let t = Timeline::build(&events, &samples, 0);
4066 let formatted = t.format_with_context(&TimelineContext::default());
4067 assert!(
4068 formatted.contains("throughput:"),
4069 "format output must contain throughput when iteration_rate is set"
4070 );
4071 assert!(formatted.contains("iter/s"));
4072 }
4073}