ktstr/
timeline.rs

1//! Stimulus/phase correlation for scenario execution.
2//!
3//! Correlates [`StimulusEvent`]s (cgroup operations, cpuset changes)
4//! with `MonitorSample` windows to
5//! measure per-phase scheduler behavior degradation. Produces
6//! [`Timeline`] entries consumed by the stats and reporting pipeline.
7
8use std::fmt;
9
10use crate::monitor::{MonitorSample, sample_looks_valid};
11
12// ---------------------------------------------------------------------------
13// TimelineContext — system context rendered as a header
14// ---------------------------------------------------------------------------
15
/// System context for a timeline, rendered as a header block.
///
/// Every field is optional: the derived `Default` yields a context with
/// all fields `None`, and callers fill in whichever facts they know.
#[derive(Debug, Clone, Default)]
pub struct TimelineContext {
    /// Kernel version string (e.g. "6.14.0-rc3+").
    pub kernel: Option<String>,
    /// Topology description (e.g. "2n4l4c2t (16 cpus)").
    pub topology: Option<String>,
    /// Scheduler name (e.g. "scx_mitosis").
    pub scheduler: Option<String>,
    /// Scenario name.
    pub scenario: Option<String>,
    /// Total run duration in seconds (fractional seconds allowed).
    pub duration_s: Option<f64>,
}
30
31// ---------------------------------------------------------------------------
32// StimulusEvent — what happened and when
33// ---------------------------------------------------------------------------
34
/// A discrete event during scenario execution that may cause observable
/// changes in scheduler behavior. Generated by step executors on the guest
/// side and carried in the VM output alongside monitor samples.
///
/// Three flavors exist, distinguished by the two flag fields: a step START
/// (`is_step_end == false`, `is_terminal == false`), a step END
/// (`is_step_end == true`), and the single scenario-end terminal
/// (`is_terminal == true`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StimulusEvent {
    /// Milliseconds since scenario start (guest monotonic clock).
    pub elapsed_ms: u64,
    /// Human-readable label. Produced as `"StepStart[k]"` by
    /// [`Self::from_wire`] and `"StepEnd[k]"` by [`Self::from_step_end`]
    /// (`k` is the 0-indexed scenario Step ordinal in both),
    /// `"ScenarioEnd"` by [`Self::terminal`], and the
    /// `"BASELINE"`/`"Step[k]"` bucket label by the
    /// `phase_from_bucket` placeholder. Test fixtures may carry any
    /// label.
    pub label: String,
    /// What kind of operation triggered this event. The wire-built
    /// constructors ([`Self::from_wire`], [`Self::from_step_end`]) set it
    /// to `"ops=<op_count>"`; [`Self::terminal`] leaves it `None`.
    pub op_kind: Option<String>,
    /// Additional context (e.g. "4 cpus", "cgroup=cg_0"). The wire-built
    /// constructors set it to `"<n> cgroups, <m> workers"`;
    /// [`Self::terminal`] leaves it `None`.
    pub detail: Option<String>,
    /// Cumulative worker iterations at this event. `Some(_)` for every
    /// event built from the wire (the wire counter is always present —
    /// see [`Self::from_wire`]); a cumulative counter for which
    /// `Some(0)` is a legitimate "no iterations accumulated yet"
    /// baseline, NOT a missing sample. `None` only for synthetic /
    /// placeholder events that carry no counter (the
    /// `phase_from_bucket` fallback and test fixtures). Used to
    /// compute per-phase throughput (iterations/s) as the delta
    /// between consecutive events.
    ///
    /// SEMANTICS: this is the sum of the iteration counters of the
    /// worker handles ALIVE at the event instant (step-local +
    /// Backdrop). Each step emits BOTH a StepStart event (counter at the
    /// step's start) and a StepEnd event ([`Self::is_step_end`], counter
    /// at the step's end-of-hold), so the per-phase iteration_rate is the
    /// STEP-LOCAL delta `StepEnd[k] - StepStart[k]` — each step's OWN
    /// workers measured start-to-end. That works for workers respawned
    /// per step (the cross-step `StepStart[k+1] - StepStart[k]` delta
    /// reads fresh~0 - fresh~0 and is dropped) AND is more accurate for
    /// persistent (Backdrop) workers (it excludes the inter-step
    /// teardown/respawn wall-time the cross-step window spanned). Bucket
    /// `k` is sourced ONLY by its `StepStart[k] -> StepEnd[k]` pair: the
    /// `iteration_rate` attribution loop in
    /// [`crate::assert::build_phase_buckets_with_stimulus`] skips any
    /// `is_step_end` `prev`, so a stalled step whose step-local delta is
    /// zero (`StepEnd[k] == StepStart[k]`) reports its MEASURED-ZERO rate
    /// `Some(0.0)` (see `Self::rate_to`) rather than leaking the
    /// inter-step gap rate from the `StepEnd[k] -> StepStart[k+1]` pair.
    /// The monitor-only
    /// [`Timeline::build`] fallback (no snapshot captures) computes the
    /// SAME step-local `StepStart[k] -> StepEnd[k]` rate — the StepEnd
    /// events reach it too (they are emitted independent of captures) — and
    /// falls back to cross-step (or the terminal for the last step) only
    /// when a step has no StepEnd (sched-died / legacy data); StepEnd is
    /// filtered only from that path's phase LAYOUT, not its rate.
    pub total_iterations: Option<u64>,
    /// 1-indexed scenario step this event belongs to (the same
    /// encoding the bridge stamps: `1..=N` for Step ordinals), or
    /// `None` for non-step events (including the terminal scenario-end
    /// boundary; see `is_terminal`). Carried explicitly from the wire
    /// `StimulusPayload.step_index` so the periodic-capture phase
    /// attribution can map a capture's workload-relative boundary
    /// offset onto the guest's own step timeline without parsing the
    /// human-readable `label`. Prefer [`Self::phase`] over reading this
    /// field directly: `label` shows the 0-indexed ordinal while this
    /// field is 1-indexed.
    pub step_index: Option<u16>,
    /// True only for the synthetic scenario-end boundary the eval
    /// walker appends from the `ScenarioEnd` wire frame's final
    /// `total_iterations`. On a CLEAN run the last step emits its own
    /// `StepEnd[N]`, which supplies that step's `iteration_rate` right
    /// boundary in BOTH rate consumers — the snapshot path
    /// ([`crate::assert::build_phase_buckets_with_stimulus`], the
    /// `StepStart[N]` -> `StepEnd[N]` pair) and the monitor-only
    /// [`Timeline::build`] fallback (which looks up each step's `StepEnd`
    /// by `step_index`) — and the terminal is then NOT consumed for a
    /// rate: the snapshot path's attribution loop skips the
    /// `(StepEnd[N], terminal)` pair via its `is_step_end` guard (before
    /// `rate_components` is reached), and `Timeline::build` reaches for the
    /// terminal only when a step's `StepEnd` lookup misses. The terminal
    /// is consumed as a step's rate boundary ONLY for legacy/synthetic
    /// data that carries a `ScenarioEnd` frame but no `StepEnd` frames
    /// (fresh guest output always pairs them). A sched-died step is NOT
    /// such a case: its early return skips BOTH the `StepEnd` emission AND
    /// `send_scenario_end`, so neither frame exists and the dead step
    /// reports no rate via the no-successor path. It is NOT a step start:
    /// `step_index` is `None` so it seeds no [`crate::assert::PhaseBucket`]
    /// (excluded from the step-start timeline), and [`Timeline::build`]
    /// skips it when laying out phase boundaries so it never renders a
    /// phantom trailing phase.
    pub is_terminal: bool,
    /// True for a per-step END event (decoded from a
    /// `crate::vmm::wire::MsgType::StepEnd` frame via
    /// [`Self::from_step_end`]). It carries the SAME 1-indexed
    /// `step_index` as its StepStart and its step's end-of-hold
    /// `total_iterations`, so [`crate::assert::build_phase_buckets_with_stimulus`]'s
    /// elapsed-sorted `windows(2)` pairs `StepStart[k]` -> `StepEnd[k]`
    /// first and `or_insert` keeps that step-local rate. NOT a step
    /// start, so [`Timeline::build`] (the monitor-only fallback's
    /// index-based cross-step pairing) filters it out of its step-start
    /// list to avoid a phantom phase.
    pub is_step_end: bool,
}
134
135impl StimulusEvent {
136    /// Build a timeline event from a deserialized wire stimulus event.
137    /// Centralizes the wire→timeline mapping so the production eval path
138    /// (`evaluate_vm_result`) and out-of-tree consumers — post_vm
139    /// callbacks folding `VmResult::stimulus_timeline()` (which calls
140    /// this internally) through
141    /// [`crate::assert::build_phase_buckets_with_stimulus`] — produce
142    /// identical events. The wire `step_index` is the bridge 1-indexed
143    /// convention (`Step[k]` -> `k + 1`, BASELINE owns 0); the human
144    /// `label` renders the 0-indexed Scenario-Step ordinal
145    /// (`step_index - 1`) to match the `PhaseBucket` `Step[k]` labels,
146    /// while the `step_index` field keeps the 1-indexed wire value for
147    /// phase-bucket remap. `total_iterations` is carried verbatim as
148    /// `Some(_)`: the wire field is a cumulative counter that is always
149    /// populated (the guest sums live worker iterations at every step
150    /// boundary), so `0` is a legitimate baseline reading — the FIRST
151    /// step's frame fires right after its workers spawn and genuinely
152    /// reads ~0. Collapsing that `0` to `None` (the old behavior) made
153    /// the (first, second) delta pair fail the `Some`/`Some` guard in
154    /// both rate consumers, silently dropping the first step's
155    /// `iteration_rate`; carrying `Some(0)` lets the delta compute the
156    /// first step's throughput for the PERSISTENT (Backdrop) population
157    /// (see the `total_iterations` field doc for the persistent-vs-
158    /// step-local semantics this delta measures).
159    pub fn from_wire(ev: &crate::vmm::wire::StimulusEvent) -> Self {
160        Self {
161            elapsed_ms: ev.elapsed_ms as u64,
162            label: format!("StepStart[{}]", ev.step_index.saturating_sub(1)),
163            op_kind: Some(format!("ops={}", ev.op_count)),
164            detail: Some(format!(
165                "{} cgroups, {} workers",
166                ev.cgroup_count, ev.worker_count,
167            )),
168            total_iterations: Some(ev.total_iterations),
169            step_index: Some(ev.step_index),
170            is_terminal: false,
171            is_step_end: false,
172        }
173    }
174
175    /// Build a per-step END event from a `crate::vmm::wire::MsgType::StepEnd`
176    /// frame (reuses the `crate::vmm::wire::StimulusEvent` wire body).
177    /// Carries the SAME 1-indexed `step_index` as the step's StepStart
178    /// and the step's end-of-hold `total_iterations`, with `is_step_end`
179    /// set. Elapsed-sorted, a step's events order `StepStart[k]` (start) <
180    /// `StepEnd[k]` (end-of-hold) < `StepStart[k+1]`, so
181    /// [`crate::assert::build_phase_buckets_with_stimulus`]'s `windows(2)`
182    /// pairs `StepStart[k]` -> `StepEnd[k]` first and `or_insert` keeps that
183    /// step-local rate. `is_terminal` is false (it is a real per-step
184    /// boundary, not the scenario-end terminal).
185    pub fn from_step_end(ev: &crate::vmm::wire::StimulusEvent) -> Self {
186        Self {
187            elapsed_ms: ev.elapsed_ms as u64,
188            label: format!("StepEnd[{}]", ev.step_index.saturating_sub(1)),
189            op_kind: Some(format!("ops={}", ev.op_count)),
190            detail: Some(format!(
191                "{} cgroups, {} workers",
192                ev.cgroup_count, ev.worker_count,
193            )),
194            total_iterations: Some(ev.total_iterations),
195            step_index: Some(ev.step_index),
196            is_terminal: false,
197            is_step_end: true,
198        }
199    }
200
201    /// Build the synthetic terminal boundary event from the
202    /// `ScenarioEnd` wire frame's final cumulative `total_iterations`
203    /// and scenario-relative `elapsed_ms`. Appended once, after every
204    /// per-step [`Self::from_wire`] event. On a clean run `StepEnd[N]`
205    /// supplies the last step's `iteration_rate` right boundary in both
206    /// rate consumers and the terminal is not consumed for a rate; it is
207    /// consumed as a step's boundary ONLY for legacy/synthetic data with a
208    /// `ScenarioEnd` frame but no `StepEnd` frames (a sched-died step has
209    /// neither, since the early return skips both emissions) — see the
210    /// [`Self::is_terminal`] field doc.
211    /// `step_index` is `None` (it is not a step start — it seeds no
212    /// [`crate::assert::PhaseBucket`]) and `is_terminal` is set so
213    /// [`Timeline::build`] treats it as a right boundary only, never a
214    /// phase. `elapsed_ms` is in the same guest-monotonic frame as the
215    /// step events (both come from `scenario_start.elapsed()`), so the
216    /// last-step duration is well-formed.
217    pub fn terminal(elapsed_ms: u64, total_iterations: u64) -> Self {
218        Self {
219            elapsed_ms,
220            label: "ScenarioEnd".to_string(),
221            op_kind: None,
222            detail: None,
223            total_iterations: Some(total_iterations),
224            step_index: None,
225            is_terminal: true,
226            is_step_end: false,
227        }
228    }
229
230    /// Iterations-per-second from this event to `next`:
231    /// `(next.total_iterations - self.total_iterations)` over the
232    /// guest-clock elapsed-ms delta between them. Returns `None` ONLY when
233    /// the measurement is genuinely undefined: either event lacks a
234    /// `total_iterations` sample, the window is zero-length, or the count
235    /// went BACKWARD (`next < self` — a counter reset; the delta is
236    /// unmeasurable, not zero). The backward case is reachable only for the
237    /// guard-skipped cross-step pairing or legacy/synthetic data, NOT for
238    /// the live step-local `StepStart[k]` -> `StepEnd[k]` pair: teardown
239    /// runs after `StepEnd` is emitted, so the handle set is stable within
240    /// a step and the per-worker counters are monotone across the pair.
241    ///
242    /// MEASURED ZERO is distinct from not-measured: a step whose workers
243    /// made exactly zero forward progress over a positive hold
244    /// (`next == self`) returns `Some(0.0)`, not `None`. Zero throughput
245    /// is a real, measured value — the strongest degradation signal — so
246    /// it must surface, not vanish. With `Some(0.0)` a phase that
247    /// collapsed to zero IS visible to the throughput-degradation detector
248    /// ([`Timeline::build`] / [`Timeline::from_phase_buckets`]): when the
249    /// prior phase had a positive rate (`before > 0.0`), the relative
250    /// delta is `-1.0` and the drop is flagged. (A phase that was already
251    /// zero before is still not relatively comparable — the detector's
252    /// `before > 0.0` gate avoids a div-by-zero — but an *unchanged* zero
253    /// is not a degradation.)
254    ///
255    /// This is the SINGLE iteration-rate formula, shared via its
256    /// decomposition [`Self::rate_components`] by
257    /// [`crate::assert::build_phase_buckets_with_stimulus`] (per-step
258    /// windows attributed by `step_index`) and [`Timeline::build`]
259    /// (per-phase windows attributed by index) — the two callers pair
260    /// events differently but must compute the rate identically. The
261    /// per-step metric producer inserts the `rate_components` pair (the
262    /// `iteration_rate` Rate's `total_phase_iterations` /
263    /// `total_phase_duration_sec` components); `rate_to` (the quotient) is
264    /// the display/comparison form used by `Timeline::build` and the
265    /// result-helper ratios.
266    pub fn rate_to(&self, next: &StimulusEvent) -> Option<f64> {
267        self.rate_components(next).map(|(iters, secs)| iters / secs)
268    }
269
270    /// The `(iteration_delta, window_seconds)` components of [`Self::rate_to`]
271    /// — same `None` conditions (missing `total_iterations`, backward count,
272    /// or zero-length window). The per-phase metric pipeline inserts these as
273    /// the `total_phase_iterations` / `total_phase_duration_sec` Counter
274    /// components rather than the ready ratio, so the `iteration_rate` Rate
275    /// re-pools across phases/runs as `Σdelta / Σseconds`, not a mean of
276    /// per-phase ratios. The ms→s `/1000` lives HERE (the seconds component)
277    /// because `derive_rate_metrics` does a bare num/den with no scaling.
278    pub fn rate_components(&self, next: &StimulusEvent) -> Option<(f64, f64)> {
279        let s = self.total_iterations?;
280        let e = next.total_iterations?;
281        if e < s {
282            return None;
283        }
284        let duration_ms = next.elapsed_ms.saturating_sub(self.elapsed_ms);
285        if duration_ms == 0 {
286            return None;
287        }
288        Some(((e - s) as f64, duration_ms as f64 / 1000.0))
289    }
290
291    /// The scenario [`Phase`](crate::assert::Phase) this event belongs to,
292    /// or `None` for the terminal scenario-end boundary (which seeds no
293    /// phase). Use THIS — not the raw [`Self::step_index`] field — to key
294    /// per-phase lookups. `step_index` carries the bridge 1-indexed wire
295    /// convention (`Step k` -> `Some(k + 1)`) while `label` renders the
296    /// 0-indexed `k`, so reading the field directly invites the 0-vs-1
297    /// off-by-one this method removes: it maps the wire value onto the same
298    /// [`Phase`](crate::assert::Phase) newtype the
299    /// [`ScenarioStats`](crate::assert::ScenarioStats) /
300    /// [`PhaseBucket`](crate::assert::PhaseBucket) accessors are keyed by
301    /// (`Phase::step(k)`). Step events carry `step_index >= 1`, so the
302    /// `saturating_sub(1)` is exact.
303    pub fn phase(&self) -> Option<crate::assert::Phase> {
304        self.step_index
305            .map(|si| crate::assert::Phase::step(si.saturating_sub(1)))
306    }
307}
308
309// ---------------------------------------------------------------------------
310// Phase — a time window between consecutive stimulus events
311// ---------------------------------------------------------------------------
312
/// Metrics aggregated from monitor samples within a phase.
///
/// The derived `Default` is the "no data" value: zero counts and `None`
/// for every optional metric.
#[derive(Debug, Clone, Default)]
pub struct PhaseMetrics {
    /// Number of samples backing this phase's monitor-derived metrics.
    /// `detect_boundary_changes` compares the monitor-derived metrics
    /// across a boundary only when this is non-zero on BOTH sides;
    /// `iteration_rate` is exempt from that gate.
    pub sample_count: usize,
    /// Mean CPU-imbalance ratio over the phase's valid samples. `None`
    /// when the phase had no valid samples (monitor-only `Timeline::build`)
    /// or its source bucket carried no `avg_imbalance_ratio` metric
    /// (snapshot `from_phase_buckets`) — distinct from a real `Some(0.0)`
    /// (perfectly balanced). The change detector compares it only when
    /// both sides are `Some`, so an absent phase never reads as a false
    /// zero-imbalance.
    pub avg_imbalance: Option<f64>,
    /// Peak CPU-imbalance ratio over the phase's valid samples. `None` on
    /// the same no-data conditions as [`Self::avg_imbalance`].
    pub max_imbalance: Option<f64>,
    /// Mean local-DSQ depth over the phase's valid samples. `None` on the
    /// same no-data conditions as [`Self::avg_imbalance`].
    pub avg_dsq_depth: Option<f64>,
    /// Mean runqueue occupancy (full-class `rq.nr_running`) over the phase's
    /// valid samples — per-sample mean of the per-CPU `nr_running`, averaged
    /// over samples (the `avg_dsq_depth` shape). `None` on the same no-data
    /// conditions as [`Self::avg_imbalance`]. The run-level value stays
    /// `MonitorSummary::avg_nr_running` (folded by `fold_run_level_ext`); this
    /// per-phase field feeds rendering + boundary change-detection only and is
    /// kept out of the run-level ext re-pool (see `populate_run_ext_metrics_from_phases`).
    pub avg_nr_running: Option<f64>,
    /// Peak DSQ depth for the phase. NOTE(review): a plain `u32`, unlike
    /// the `Option` averages above, so `0` cannot be told apart from "no
    /// data" by type alone — confirm how producers fill it when a phase
    /// has no samples.
    pub max_dsq_depth: u32,
    /// Number of stalls counted in this phase. NOTE(review): the stall
    /// predicate is outside this view; the `Timeline::build` docs point at
    /// `compute_metrics` — confirm there.
    pub stall_count: usize,
    /// select_cpu_fallback events per second. None when event counters unavailable.
    pub fallback_rate: Option<f64>,
    /// dispatch_keep_last events per second. None when event counters unavailable.
    pub keep_last_rate: Option<f64>,
    /// Worker iterations per second during this phase. Computed from
    /// cumulative iteration counts in consecutive stimulus events.
    /// `Some(0.0)` is a measured zero (see `StimulusEvent::rate_to`), not
    /// a missing value.
    pub iteration_rate: Option<f64>,
}
349
/// Which way a metric moved across a phase boundary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeDirection {
    /// The metric moved in the favorable direction.
    Improved,
    /// The metric moved in the unfavorable direction.
    Degraded,
}

/// Renders the direction as the upper-case tag `IMPROVEMENT` or
/// `DEGRADATION`.
impl fmt::Display for ChangeDirection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let tag = match self {
            ChangeDirection::Improved => "IMPROVEMENT",
            ChangeDirection::Degraded => "DEGRADATION",
        };
        f.write_str(tag)
    }
}
365
/// Detected change at a stimulus boundary.
///
/// One value describes one metric that moved between two adjacent phases.
#[derive(Debug, Clone)]
pub struct PhaseChange {
    /// Whether the move counts as an improvement or a degradation.
    pub direction: ChangeDirection,
    /// Name of the metric that changed. `detect_boundary_changes` emits
    /// `"imbalance"`, `"dsq_depth"`, `"nr_running"`, `"fallback"`,
    /// `"keep_last"` and `"throughput"`.
    pub metric: String,
    /// Metric value in the earlier phase.
    pub before: f64,
    /// Metric value in the later phase.
    pub after: f64,
}
374
/// A time window between two consecutive stimulus events.
#[derive(Debug, Clone)]
pub struct Phase {
    /// Position of this phase in its timeline. NOTE(review): presumably
    /// the dense index into `Timeline::phases` — confirm against the
    /// constructors.
    pub index: usize,
    /// Start of the window in milliseconds. NOTE(review): the clock frame
    /// is chosen by the constructor — confirm before comparing against
    /// raw stimulus or monitor timestamps.
    pub start_ms: u64,
    /// End of the window in milliseconds, in the same frame as
    /// `start_ms`. Both are `0` for an orphan carrier; see
    /// [`Self::not_measured_window`].
    pub end_ms: u64,
    /// The stimulus event that starts this phase (None for the initial phase).
    pub stimulus: Option<StimulusEvent>,
    /// Scalar metrics for this phase; the only inputs to boundary change
    /// detection.
    pub metrics: PhaseMetrics,
    /// Changes detected at this phase's stimulus boundary.
    pub changes: Vec<PhaseChange>,
    /// Per-cgroup raw telemetry for this phase, keyed by cgroup name. Carried
    /// from [`crate::assert::PhaseBucket::per_cgroup`] on the
    /// [`Timeline::from_phase_buckets`] path; empty on the monitor-only
    /// [`Timeline::build`] path (which has no carriers). Rendered as a
    /// per-cgroup sub-block by display-time reduction
    /// ([`crate::assert::PhaseCgroupStats::off_cpu_summary`] etc.); never a
    /// change-detection input (those are [`PhaseMetrics`] scalars only).
    pub per_cgroup: std::collections::BTreeMap<String, crate::assert::PhaseCgroupStats>,
    /// True when `(start_ms, end_ms)` is the normalized `(0, 0)` of an ORPHAN
    /// carrier (no measured host window) rather than a real window. Set in
    /// `phase_from_bucket` by the orphan shape signature `(0,0)` + empty
    /// `metrics` + non-empty `per_cgroup` (unique to
    /// `crate::assert::fold_guest_per_cgroup_into_host_buckets`'s orphan arm).
    /// The render shows "window not measured" instead of a misleading `0ms`.
    /// Always `false` on the [`Timeline::build`] path.
    pub not_measured_window: bool,
}
403
404// ---------------------------------------------------------------------------
405// Timeline
406// ---------------------------------------------------------------------------
407
/// Correlated timeline of stimulus events and monitor observations.
#[derive(Debug, Clone)]
pub struct Timeline {
    /// The timeline's phases. Empty when [`Timeline::build`] is handed an
    /// empty stimulus-event slice or an empty monitor-sample slice.
    pub phases: Vec<Phase>,
}
413
// Boundary change-detection thresholds, consumed by
// `detect_boundary_changes`. The first five are ABSOLUTE deltas checked by
// `detect_change`, which rejects `|delta| <= threshold`, so a delta exactly
// at the threshold is not flagged. The last one is a RELATIVE threshold.

/// Minimum delta in imbalance ratio to flag a change (avoids noise).
/// Applied to `PhaseMetrics::avg_imbalance`.
const IMBALANCE_THRESHOLD: f64 = 0.5;
/// Minimum delta in DSQ depth to flag a change.
/// Applied to `PhaseMetrics::avg_dsq_depth`.
const DSQ_THRESHOLD: f64 = 3.0;
/// Minimum delta in mean runqueue depth (avg_nr_running) to flag a change:
/// one additional runnable task per CPU on average between phases.
const NR_RUNNING_THRESHOLD: f64 = 1.0;
/// Minimum delta in fallback rate (events/s) to flag a change.
/// Applied to `PhaseMetrics::fallback_rate`.
const FALLBACK_RATE_THRESHOLD: f64 = 10.0;
/// Minimum delta in keep_last rate (events/s) to flag a change.
/// Applied to `PhaseMetrics::keep_last_rate`.
const KEEP_LAST_RATE_THRESHOLD: f64 = 10.0;
/// Minimum relative change in iteration rate to flag a throughput change.
/// 0.3 = 30% drop or increase. Compared against
/// `|(after - before) / before|` with a strict `>`, and only when the
/// earlier rate is positive.
const ITERATION_RATE_REL_THRESHOLD: f64 = 0.3;
428
429/// Create a PhaseChange if the delta between `before` and `after` exceeds
430/// `threshold`. `higher_is_worse` determines degradation direction: when
431/// true, a positive delta means Degraded; when false, a negative delta
432/// means Degraded.
433fn detect_change(
434    before: f64,
435    after: f64,
436    threshold: f64,
437    metric: &str,
438    higher_is_worse: bool,
439) -> Option<PhaseChange> {
440    let delta = after - before;
441    if delta.abs() <= threshold {
442        return None;
443    }
444    let degraded = if higher_is_worse {
445        delta > 0.0
446    } else {
447        delta < 0.0
448    };
449    Some(PhaseChange {
450        direction: if degraded {
451            ChangeDirection::Degraded
452        } else {
453            ChangeDirection::Improved
454        },
455        metric: metric.to_string(),
456        before,
457        after,
458    })
459}
460
461/// Per-boundary change set between two adjacent phases, shared by both
462/// [`Timeline::build`] and [`Timeline::from_phase_buckets`].
463///
464/// Throughput (`iteration_rate`) is compared whenever BOTH phases carry
465/// a rate and the earlier one is positive — INCLUDING a synthesized
466/// zero-capture step, whose rate is stimulus-derived (total_iterations
467/// deltas) rather than sampled. Throughput is the one metric that
468/// survives a capture gap, so a throughput collapse entering or leaving
469/// a synthesized step must still be flagged; gating it on `sample_count`
470/// (as both call sites did before) silently dropped exactly the
471/// degradation this timeline exists to surface.
472///
473/// Asymmetry from the `bi > 0.0` div-by-zero guard: a COLLAPSE into a
474/// zero-rate (incl. a synthesized measured-zero) step is flagged
475/// (before > 0), but a RECOVERY out of one (before == 0 -> positive) is
476/// not — there is no relative baseline to divide by. This is a
477/// deliberate tradeoff, not an oversight: an unchanged zero is genuinely
478/// not a degradation, and the collapse direction (the one this task
479/// targets) is caught.
480///
481/// The monitor-derived metrics (imbalance, dsq depth, fallback rate,
482/// keep_last rate) ARE gated on both phases having real samples. The
483/// values themselves can be real even on a synthesized phase (monitor
484/// windows fold into the bucket), but they come from a DIFFERENT
485/// sampling basis (folded monitor window vs captured periodic samples)
486/// than a captured neighbor's, so cross-comparing them is
487/// apples-to-oranges — suppressed. Throughput is exempt because it is
488/// the same stimulus-derived quantity on both sides.
489fn detect_boundary_changes(before: &PhaseMetrics, after: &PhaseMetrics) -> Vec<PhaseChange> {
490    let mut changes = Vec::new();
491
492    // Monitor-derived metrics (absolute-unit gauges/rates with
493    // fixed-magnitude thresholds) — gated on both phases having real
494    // samples. Pushed FIRST to preserve the historical render order
495    // (these before throughput in a multi-change boundary; format_phases
496    // renders changes in vec order).
497    if before.sample_count > 0 && after.sample_count > 0 {
498        if let (Some(bi), Some(ai)) = (before.avg_imbalance, after.avg_imbalance) {
499            changes.extend(detect_change(
500                bi,
501                ai,
502                IMBALANCE_THRESHOLD,
503                "imbalance",
504                true,
505            ));
506        }
507        if let (Some(bd), Some(ad)) = (before.avg_dsq_depth, after.avg_dsq_depth) {
508            changes.extend(detect_change(bd, ad, DSQ_THRESHOLD, "dsq_depth", true));
509        }
510        if let (Some(bn), Some(an)) = (before.avg_nr_running, after.avg_nr_running) {
511            changes.extend(detect_change(
512                bn,
513                an,
514                NR_RUNNING_THRESHOLD,
515                "nr_running",
516                true,
517            ));
518        }
519        if let (Some(bf), Some(af)) = (before.fallback_rate, after.fallback_rate) {
520            changes.extend(detect_change(
521                bf,
522                af,
523                FALLBACK_RATE_THRESHOLD,
524                "fallback",
525                true,
526            ));
527        }
528        if let (Some(bk), Some(ak)) = (before.keep_last_rate, after.keep_last_rate) {
529            changes.extend(detect_change(
530                bk,
531                ak,
532                KEEP_LAST_RATE_THRESHOLD,
533                "keep_last",
534                true,
535            ));
536        }
537    }
538
539    // Throughput is the SOLE rate-class field (counter-delta / elapsed), so
540    // it uses a RELATIVE threshold (rel = (ai-bi)/bi vs
541    // ITERATION_RATE_REL_THRESHOLD) and cannot route through detect_change's
542    // absolute-delta gate above — fixed-unit metrics get absolute
543    // thresholds, this gets a relative one (a semantic-class decision, not
544    // arbitrary). Strict `>`: an exactly-at-threshold relative change is not
545    // flagged. Pushed last so it renders after the monitor metrics.
546    if let (Some(bi), Some(ai)) = (before.iteration_rate, after.iteration_rate)
547        && bi > 0.0
548    {
549        let rel = (ai - bi) / bi;
550        if rel.abs() > ITERATION_RATE_REL_THRESHOLD {
551            changes.push(PhaseChange {
552                direction: if rel < 0.0 {
553                    ChangeDirection::Degraded
554                } else {
555                    ChangeDirection::Improved
556                },
557                metric: "throughput".to_string(),
558                before: bi,
559                after: ai,
560            });
561        }
562    }
563
564    changes
565}
566
567impl Timeline {
568    /// Build a timeline from stimulus events and monitor samples.
569    ///
570    /// Clock alignment: stimulus events use guest monotonic time (ms since
571    /// scenario start). Monitor samples use host monotonic time (ms since
572    /// VM boot). The first stimulus event's timestamp and the first
573    /// non-trivial monitor sample (after 500ms warmup) approximately
574    /// coincide. We compute an offset to align them.
575    ///
576    /// Returns an empty timeline if either input is empty.
577    /// Build a Timeline from stimulus events + raw monitor
578    /// samples via the per-window `compute_metrics` reduction.
579    /// The production success path uses [`Self::from_phase_buckets`]
580    /// (which folds pre-bucketed PhaseBuckets); `build` is the fallback
581    /// evaluate_vm_result takes only for a run with an EMPTY PhaseBuckets
582    /// vec but monitor samples present — i.e. no periodic captures AND no
583    /// stimulus Steps. A monitor-only run that DID run Steps now
584    /// synthesizes a capture-free bucket per StepStart (see
585    /// [`crate::assert::build_phase_buckets_with_stimulus`]), so its vec is
586    /// non-empty and it takes the from_phase_buckets path (whose
587    /// fold_monitor_into_bucket recovers the same monitor-derived metric
588    /// set this path computes). Both entry points produce the same
589    /// Timeline field shape; from_phase_buckets is preferred when buckets
590    /// are available because it avoids the per-MonitorSample reduction.
591    ///
592    /// `preemption_threshold_ns` threads the vCPU-preemption exemption
593    /// window into the per-phase stall predicate (see `compute_metrics`);
594    /// the production caller passes the run's
595    /// `MonitorReport::preemption_threshold_ns`. `0` derives it from the
596    /// guest kernel `CONFIG_HZ`.
597    pub fn build(
598        stimulus_events: &[StimulusEvent],
599        monitor_samples: &[MonitorSample],
600        preemption_threshold_ns: u64,
601    ) -> Self {
602        if stimulus_events.is_empty() || monitor_samples.is_empty() {
603            return Self { phases: Vec::new() };
604        }
605
606        let mut events = stimulus_events.to_vec();
607        // Total-order on an elapsed_ms tie: StepEnd before StepStart
608        // (`!is_step_end` is false=0 for StepEnd) so a zero-length
609        // inter-step gap at the guest's coarse-ms clock attributes the
610        // step-local StepStart[k]->StepEnd[k] rate to bucket k, never the
611        // cross-step StepStart[k]->StepStart[k+1] delta. Mirrors the same
612        // sort in build_phase_buckets_with_stimulus so the two rate
613        // consumers stay identical.
614        events.sort_by_key(|e| (e.elapsed_ms, !e.is_step_end));
615
616        // Clock alignment: find the offset between guest stimulus time
617        // and host monitor time. The first stimulus event (ScenarioStart)
618        // and the first monitor sample with plausible data roughly coincide.
619        let first_stimulus_ms = events[0].elapsed_ms;
620        let first_monitor_ms = monitor_samples
621            .iter()
622            .find(|s| s.elapsed_ms > 500 && !s.cpus.is_empty())
623            .map(|s| s.elapsed_ms)
624            .unwrap_or_else(|| monitor_samples.first().map(|s| s.elapsed_ms).unwrap_or(0));
625
626        // offset: add this to a stimulus timestamp to get monitor time
627        let offset = first_monitor_ms as i64 - first_stimulus_ms as i64;
628
629        // Define phase boundaries from consecutive stimulus events.
630        // Each pair (events[i], events[i+1]) bounds a phase.
631        // The last event to end-of-data is also a phase.
632        let last_monitor_ms = monitor_samples.last().map(|s| s.elapsed_ms).unwrap_or(0);
633
634        // The terminal scenario-end event is a rate right
635        // boundary ONLY — it seeds no phase. Extract it explicitly
636        // rather than relying on it sorting last for positional
637        // alignment: a corrupt / out-of-order step `elapsed_ms` (a u32
638        // read off the wire) could otherwise shift it into the middle
639        // of `events` and misalign the dense phase index against the
640        // step events. `step_events` is the phase-bearing set.
641        let terminal: Option<&StimulusEvent> = events.iter().find(|e| e.is_terminal);
642        // StepStart events only — the PHASE-LAYOUT set. Per-step StepEnd
643        // events are excluded here because a StepEnd seeds no new phase
644        // (it is an end-of-hold marker, not a step boundary); including
645        // them would produce a phantom extra phase and misalign the dense
646        // phase index. StepEnd events are NOT discarded, though: the
647        // step-local iteration_rate loop below pairs each StepStart[k]
648        // with its own StepEnd[k] (looked up by step_index in the full
649        // `events` vec), matching build_phase_buckets_with_stimulus. The
650        // dense-index cross-step pairing is kept only as a fallback for
651        // steps that have no StepEnd (a sched-died step, or legacy data
652        // predating the StepEnd frame).
653        let step_events: Vec<&StimulusEvent> = events
654            .iter()
655            .filter(|e| !e.is_terminal && !e.is_step_end)
656            .collect();
657
658        let mut boundaries: Vec<(u64, u64, Option<StimulusEvent>)> = Vec::new();
659        for i in 0..step_events.len() {
660            let start = (step_events[i].elapsed_ms as i64 + offset).max(0) as u64;
661            // The LAST step phase extends to end-of-monitor-data, NOT to
662            // the terminal event: the terminal is a rate boundary only,
663            // and clamping the last phase's metric window to it would
664            // drop trailing monitor samples (the host keeps sampling
665            // through teardown). Preserves the pre-terminal window.
666            let end = if i + 1 < step_events.len() {
667                (step_events[i + 1].elapsed_ms as i64 + offset).max(0) as u64
668            } else {
669                last_monitor_ms.saturating_add(1)
670            };
671            let stimulus = if i == 0 {
672                None
673            } else {
674                Some(step_events[i].clone())
675            };
676            boundaries.push((start, end, stimulus));
677        }
678
679        // Assign monitor samples to phases and compute metrics.
680        let mut phases: Vec<Phase> = Vec::with_capacity(boundaries.len());
681        for (idx, (start, end, stimulus)) in boundaries.into_iter().enumerate() {
682            let phase_samples: Vec<&MonitorSample> = monitor_samples
683                .iter()
684                .filter(|s| s.elapsed_ms >= start && s.elapsed_ms < end && sample_looks_valid(s))
685                .collect();
686
687            let metrics = compute_metrics(&phase_samples, preemption_threshold_ns);
688
689            phases.push(Phase {
690                // Enumerate position over the phase-bearing step_events,
691                // NOT the bucket step_index `phase_from_bucket` uses. This
692                // path's input is a monitor-only / legacy / test stream with
693                // no step_index-bearing Stimulus frames (the settle is
694                // step_events[0]; later events follow), so enumerate IS the
695                // step identity and `index == 0 => BASELINE` holds for this
696                // model. It diverges from the step_index model ONLY for a
697                // step_index-bearing stream with no leading settle (the
698                // production stimulus shape), which never reaches `build`:
699                // build is the monitor-only fallback, taken only when there
700                // are no PhaseBuckets, i.e. no StepStarts
701                // (build_phase_buckets_with_stimulus synthesizes a bucket
702                // per StepStart) — so a step_index-bearing production stream
703                // always takes from_phase_buckets, never this path.
704                index: idx,
705                start_ms: start,
706                end_ms: end,
707                stimulus,
708                metrics,
709                changes: Vec::new(),
710                // Monitor-only path: no per-cgroup carriers, real window.
711                per_cgroup: std::collections::BTreeMap::new(),
712                not_measured_window: false,
713            });
714        }
715
716        // Per-phase iteration rate, STEP-LOCAL: each step's rate is its
717        // own `StepStart[k] -> StepEnd[k]` delta — the step's OWN workers
718        // measured start-to-end-of-hold, matching the snapshot path
719        // (`build_phase_buckets_with_stimulus`). StepEnd events are
720        // present in `events` (emitted independent of snapshot captures)
721        // even on this monitor-only path, so the same step-local model
722        // applies; without it, workers respawned fresh each step read
723        // ~0 -> ~0 cross-step and every fresh-per-step phase but the last
724        // silently reported no throughput. A step with NO StepEnd falls
725        // back to the cross-step successor, or the terminal scenario-end
726        // event for the last step — but that fallback yields a rate only
727        // for legacy/synthetic data (a ScenarioEnd frame present without
728        // per-step StepEnd frames). A sched-died step has neither a
729        // StepEnd nor a terminal (the early return skips both emissions),
730        // so its lookup and fallback both miss and it correctly reports no
731        // rate. Duration is the guest-clock elapsed-ms delta between the
732        // paired events — independent of the metric-sample window above
733        // (whose last phase reaches end-of-monitor-data).
734        #[allow(clippy::needless_range_loop)]
735        for i in 0..phases.len() {
736            let this = step_events[i];
737            // Step-local boundary: this step's own StepEnd (same
738            // step_index). Cross-step successor / terminal only when the
739            // step has no StepEnd.
740            let step_end: Option<&StimulusEvent> = this.step_index.and_then(|k| {
741                events
742                    .iter()
743                    .find(|e| e.is_step_end && e.step_index == Some(k))
744            });
745            let next: Option<&StimulusEvent> = step_end.or_else(|| {
746                if i + 1 < step_events.len() {
747                    Some(step_events[i + 1])
748                } else {
749                    terminal
750                }
751            });
752            // Timeline::build's display fallback: compute this phase's rate
753            // directly via rate_to. The metric-pipeline producer
754            // build_phase_buckets_with_stimulus shares the same rate
755            // semantics via rate_components (it emits the two Counter
756            // components that derive_rate_metrics re-pools into
757            // iteration_rate); this display field reads the quotient.
758            if let Some(next_ev) = next
759                && let Some(rate) = this.rate_to(next_ev)
760            {
761                phases[i].metrics.iteration_rate = Some(rate);
762            }
763        }
764
765        // Detect changes at each phase boundary. Throughput is compared
766        // even across synthesized zero-capture steps; monitor-derived
767        // metrics stay gated on both phases having real samples (see
768        // [`detect_boundary_changes`]).
769        for i in 1..phases.len() {
770            let changes = detect_boundary_changes(&phases[i - 1].metrics, &phases[i].metrics);
771            phases[i].changes = changes;
772        }
773
774        Self { phases }
775    }
776
777    /// Format the timeline with a system context header.
778    ///
779    /// Tests without a real context pass `&TimelineContext::default()`;
780    /// the header lines (`kernel:`, `topology:`, etc.) are omitted but
781    /// the `--- timeline ---` prefix is preserved.
782    // No parameterless format() sibling: output with default context
783    // is byte-identical, but the only non-test caller
784    // (crate::test_support::eval) always has real context, so format()
785    // would be dead code.
786    pub fn format_with_context(&self, ctx: &TimelineContext) -> String {
787        if self.phases.is_empty() {
788            return String::new();
789        }
790
791        let mut out = String::from("--- timeline ---\n");
792
793        // Render context header.
794        let mut header_parts = Vec::new();
795        if let Some(ref k) = ctx.kernel {
796            header_parts.push(format!("kernel: {k}"));
797        }
798        if let Some(ref t) = ctx.topology {
799            header_parts.push(format!("topology: {t}"));
800        }
801        if let Some(ref s) = ctx.scheduler {
802            header_parts.push(format!("scheduler: {s}"));
803        }
804        if let Some(ref s) = ctx.scenario {
805            header_parts.push(format!("scenario: {s}"));
806        }
807        if let Some(d) = ctx.duration_s {
808            header_parts.push(format!("duration: {d:.1}s"));
809        }
810        if !header_parts.is_empty() {
811            for part in &header_parts {
812                out.push_str(part);
813                out.push_str("  ");
814            }
815            // Trim trailing "  " appended by the last iteration.
816            // Explicit length guard so a future edit that stops
817            // appending the separator here can't underflow.
818            if out.len() >= 2 {
819                out.truncate(out.len() - 2);
820            }
821            out.push('\n');
822        }
823
824        self.format_phases(&mut out);
825        out
826    }
827
828    /// Render phase details into the output buffer.
829    fn format_phases(&self, out: &mut String) {
830        for phase in &self.phases {
831            let duration_ms = phase.end_ms.saturating_sub(phase.start_ms);
832            // An orphan carrier carries a normalized (0,0) window that is NOT a
833            // measured zero-duration step; surface it as not-measured rather
834            // than a misleading 0ms (the None-vs-Some discipline the per-metric
835            // renders use, applied to the window).
836            let window = if phase.not_measured_window {
837                "window not measured".to_string()
838            } else {
839                format!("{duration_ms}ms")
840            };
841
842            if phase.index == 0 {
843                // Phase 0 is the settle window before any stimulus.
844                out.push_str(&format!(
845                    "\nBASELINE (settle, {}, {} samples):\n",
846                    window, phase.metrics.sample_count,
847                ));
848            } else {
849                let label_start = phase
850                    .stimulus
851                    .as_ref()
852                    .map(|s| {
853                        let mut l = s.label.clone();
854                        if let Some(op) = &s.op_kind {
855                            l.push(' ');
856                            l.push_str(op);
857                        }
858                        l
859                    })
860                    .unwrap_or_else(|| "?".to_string());
861
862                out.push_str(&format!(
863                    "\nPhase {}: {} ({}, {} samples):\n",
864                    phase.index, label_start, window, phase.metrics.sample_count,
865                ));
866            }
867
868            let m = &phase.metrics;
869            // Render the metric block whenever the phase carries
870            // monitor-derived metrics, not only when it captured periodic
871            // samples: a SYNTHESIZED zero-capture bucket
872            // (build_phase_buckets_with_stimulus) has sample_count 0 but
873            // fold_monitor_into_bucket fills its imbalance / dsq / fallback
874            // / stall from in-window monitor samples — render them for
875            // parity with the legacy Timeline::build path (which a
876            // zero-capture-with-monitor run took before the synthesize
877            // seam flipped it onto from_phase_buckets).
878            let has_monitor_metrics = m.avg_imbalance.is_some()
879                || m.max_imbalance.is_some()
880                || m.avg_dsq_depth.is_some()
881                || m.avg_nr_running.is_some()
882                || m.max_dsq_depth > 0
883                || m.fallback_rate.is_some()
884                || m.keep_last_rate.is_some()
885                || m.stall_count > 0;
886            if m.sample_count > 0 || has_monitor_metrics {
887                out.push_str(&format!(
888                    "  imbalance: avg={} max={} | dsq: avg={} max={} | nr_run: avg={}",
889                    m.avg_imbalance
890                        .map_or_else(|| "n/a".to_string(), |v| format!("{v:.1}")),
891                    m.max_imbalance
892                        .map_or_else(|| "n/a".to_string(), |v| format!("{v:.1}")),
893                    m.avg_dsq_depth
894                        .map_or_else(|| "n/a".to_string(), |v| format!("{v:.0}")),
895                    m.max_dsq_depth,
896                    m.avg_nr_running
897                        .map_or_else(|| "n/a".to_string(), |v| format!("{v:.1}")),
898                ));
899                if let Some(fb) = m.fallback_rate {
900                    out.push_str(&format!(" | fallback: {:.0}/s", fb));
901                }
902                if let Some(kl) = m.keep_last_rate {
903                    out.push_str(&format!(" | keep_last: {:.0}/s", kl));
904                }
905                if let Some(ir) = m.iteration_rate {
906                    // A synthesized (sample_count==0) step's rate is
907                    // stimulus-derived; label it consistently with the
908                    // no-monitor-metrics branch below.
909                    let suffix = if m.sample_count == 0 {
910                        " (stimulus-derived)"
911                    } else {
912                        ""
913                    };
914                    out.push_str(&format!(" | throughput: {ir:.0} iter/s{suffix}"));
915                }
916                out.push('\n');
917                if m.stall_count > 0 {
918                    out.push_str(&format!("  stalls: {}\n", m.stall_count));
919                }
920            } else if let Some(ir) = m.iteration_rate {
921                // Synthesized zero-capture step (the
922                // build_phase_buckets_with_stimulus seam): no periodic
923                // captures landed, but the stimulus StepStart/StepEnd
924                // deltas still yield a throughput. Surface it so a short
925                // interior step's recovered rate is visible in the
926                // rendered timeline, not only via the structured
927                // phase_metric API.
928                out.push_str(&format!(
929                    "  [no samples] | throughput: {ir:.0} iter/s (stimulus-derived)\n"
930                ));
931            } else {
932                out.push_str("  [no samples]\n");
933            }
934
935            format_phase_cgroups(out, &phase.per_cgroup);
936
937            if let Some(ref stim) = phase.stimulus {
938                let detail = stim.detail.as_deref().unwrap_or("");
939                let op = stim.op_kind.as_deref().unwrap_or("?");
940                out.push_str(&format!("  >>> {}: {op}", stim.label));
941                if !detail.is_empty() {
942                    out.push_str(&format!(" ({detail})"));
943                }
944                out.push('\n');
945            }
946
947            for change in &phase.changes {
948                let delta = change.after - change.before;
949                let sign = if delta > 0.0 { "+" } else { "" };
950                out.push_str(&format!(
951                    "  >>> {}: {} {sign}{:.1}\n",
952                    change.direction, change.metric, delta,
953                ));
954            }
955        }
956    }
957
958    /// Build a [`Timeline`] from pre-bucketed
959    /// [`crate::assert::PhaseBucket`]s emitted by the metric pipeline.
960    /// Preferred over [`Self::build`] when the caller already has
961    /// `PhaseBucket`s in hand — avoids re-deriving phase boundaries
962    /// from stimulus events + monitor samples by walking the buckets
963    /// directly.
964    ///
965    /// One [`Phase`] is emitted per bucket, in `step_index` order.
966    /// `PhaseMetrics` fields are populated from the bucket's
967    /// `metrics` map via a name-keyed mapping:
968    ///
969    /// | PhaseBucket metric key  | PhaseMetrics field      |
970    /// |-------------------------|-------------------------|
971    /// | `max_imbalance_ratio`   | `max_imbalance`         |
972    /// | `avg_imbalance_ratio`   | `avg_imbalance`         |
973    /// | `max_dsq_depth`         | `max_dsq_depth`         |
974    /// | `avg_dsq_depth`         | `avg_dsq_depth`         |
975    /// | `avg_nr_running`        | `avg_nr_running`        |
976    /// | `stuck_count`           | `stall_count`           |
977    /// | `total_fallback`        | `fallback_rate` (rate)  |
978    /// | `total_keep_last`       | `keep_last_rate` (rate) |
979    /// | `iteration_rate`        | `iteration_rate`        |
980    ///
981    /// Rate fields (`fallback_rate`, `keep_last_rate`) are computed
982    /// by dividing the bucket's reduced counter delta by the
983    /// bucket's window duration in seconds
984    /// (`(end_ms - start_ms) / 1000.0`). When the window has zero
985    /// duration (degenerate bucket) the rate stays `None`.
986    ///
987    /// Every PhaseMetrics field has a PhaseBucket source — but
988    /// `iteration_rate` only when build_phase_buckets_with_stimulus
989    /// (not the plain build_phase_buckets) produced the bucket.
990    /// `iteration_rate` requires stimulus events that the per-test
991    /// scenario produces; the plain bucket-builder used by some
992    /// tests doesn't have access to them. Defaults to `None` when
993    /// PhaseBucket.metrics has no `iteration_rate` key.
994    ///
995    /// `changes` (boundary degradation detection) IS computed
996    /// here by diffing adjacent `PhaseMetrics` fields — same
997    /// detection logic [`Self::build`] uses, applied after the
998    /// per-bucket conversion. avg_imbalance + avg_dsq_depth are
999    /// supplied by PhaseBucket so the detection runs on the same
1000    /// fields as the legacy path.
1001    pub fn from_phase_buckets(
1002        phase_buckets: &[crate::assert::PhaseBucket],
1003        stimulus_events: &[StimulusEvent],
1004        _ctx: &TimelineContext,
1005    ) -> Self {
1006        let mut sorted: Vec<&crate::assert::PhaseBucket> = phase_buckets.iter().collect();
1007        sorted.sort_by_key(|b| b.step_index);
1008        // Sort stimulus events by elapsed_ms so correlation finds
1009        // the closest event for each bucket window deterministically.
1010        // The terminal scenario-end event is excluded: it carries no
1011        // step ops/detail to render and its elapsed_ms lands past
1012        // every bucket window, so it would never correlate — filtering
1013        // it keeps the correlation set to real step starts only.
1014        // Per-step StepEnd events are likewise excluded so each bucket's
1015        // rendered op/detail label correlates to the step's defining
1016        // StepStart, not its end-of-hold marker (the bucket's iteration_rate
1017        // is already the step-local value computed upstream).
1018        let mut sorted_events: Vec<&StimulusEvent> = stimulus_events
1019            .iter()
1020            .filter(|e| !e.is_terminal && !e.is_step_end)
1021            .collect();
1022        sorted_events.sort_by_key(|e| e.elapsed_ms);
1023        let mut phases: Vec<Phase> = sorted
1024            .into_iter()
1025            .map(|b| phase_from_bucket(b, &sorted_events))
1026            .collect();
1027        // Boundary-change detection — shares [`detect_boundary_changes`]
1028        // with [`Self::build`]. Walks each adjacent (prev, curr) pair and
1029        // records significant deltas on the LATER phase's `changes` vec so
1030        // the operator sees "what changed when entering this phase".
1031        // Throughput is compared even when a side is a SYNTHESIZED
1032        // zero-capture bucket (build_phase_buckets_with_stimulus): its
1033        // `iteration_rate` is stimulus-derived and real, so a throughput
1034        // collapse entering or leaving it must surface. The monitor-derived
1035        // metrics (imbalance / dsq depth / fallback / keep_last) stay gated
1036        // inside the helper on both phases having real samples, so a
1037        // partial-metric phase never paints a phantom non-throughput change.
1038        // An orphan phase (a not-measured (0,0) window with all-None
1039        // PhaseMetrics) sits between its neighbors here, so detect_boundary_changes
1040        // compares each real neighbor against the orphan's None metrics — which
1041        // the helper gates away (both sides must be Some) — rather than across
1042        // the unmeasured window. INTENTIONAL: there is no data for the orphan's
1043        // step, so flagging a phase-k-1 -> phase-k+1 change as if they were
1044        // adjacent would assert a transition over an unmeasured interval. This is
1045        // render-only (Phase.changes has no verdict/sidecar/A-B consumer).
1046        for i in 1..phases.len() {
1047            let changes = detect_boundary_changes(&phases[i - 1].metrics, &phases[i].metrics);
1048            phases[i].changes = changes;
1049        }
1050        Self { phases }
1051    }
1052
1053    /// Test helper — collect all degradation changes across phases.
1054    /// Retained after the gauntlet analyzer was removed; the scenarios
1055    /// pipeline consumes `Timeline` via `format_with_context` and does
1056    /// not read degradations directly.
1057    #[cfg(test)]
1058    pub fn degradations(&self) -> Vec<(&Phase, &PhaseChange)> {
1059        let mut out = Vec::new();
1060        for phase in &self.phases {
1061            for change in &phase.changes {
1062                if change.direction == ChangeDirection::Degraded {
1063                    out.push((phase, change));
1064                }
1065            }
1066        }
1067        out
1068    }
1069}
1070
1071// ---------------------------------------------------------------------------
1072// PhaseBucket → Phase conversion
1073// ---------------------------------------------------------------------------
1074
1075/// Build a [`Phase`] from a [`crate::assert::PhaseBucket`]. The phase
1076/// index is the bucket's `step_index` (BASELINE = 0, scenario Step k =
1077/// k + 1), NOT the enumerate position in the vec — so `format_phases`
1078/// keys its BASELINE-vs-Step label on the true phase identity, and a run
1079/// whose first bucket is a Step (no BASELINE bucket, e.g. under
1080/// `--cell-parent-cgroup` where BASELINE captured nothing) renders that
1081/// Step correctly rather than mislabeling it as BASELINE. The metric map
1082/// is projected onto the named `PhaseMetrics` fields per the table in
1083/// [`Timeline::from_phase_buckets`]. BASELINE (`step_index` 0) emits
1084/// `stimulus = None`; later phases synthesize a [`StimulusEvent`] whose
1085/// label / op_kind come from the bucket label so the failure-message
1086/// renderer prints a recognizable phase header.
1087fn phase_from_bucket(b: &crate::assert::PhaseBucket, sorted_events: &[&StimulusEvent]) -> Phase {
1088    let duration_s = if b.end_ms > b.start_ms {
1089        (b.end_ms - b.start_ms) as f64 / 1000.0
1090    } else {
1091        0.0
1092    };
1093    // Rate computation: counter-delta / duration_s. duration_s == 0
1094    // disables the rate (None) — degenerate buckets shouldn't
1095    // produce spurious infinities.
1096    let rate = |key: &str| -> Option<f64> {
1097        if duration_s <= 0.0 {
1098            return None;
1099        }
1100        b.metrics.get(key).map(|v| v / duration_s)
1101    };
1102    let metrics = PhaseMetrics {
1103        sample_count: b.sample_count,
1104        avg_imbalance: b.metrics.get("avg_imbalance_ratio").copied(),
1105        max_imbalance: b.metrics.get("max_imbalance_ratio").copied(),
1106        avg_dsq_depth: b.metrics.get("avg_dsq_depth").copied(),
1107        avg_nr_running: b.metrics.get("avg_nr_running").copied(),
1108        max_dsq_depth: b
1109            .metrics
1110            .get("max_dsq_depth")
1111            .map(|v| v.round() as u32)
1112            .unwrap_or(0),
1113        stall_count: b
1114            .metrics
1115            .get("stuck_count")
1116            .map(|v| v.round() as usize)
1117            .unwrap_or(0),
1118        fallback_rate: rate("total_fallback"),
1119        keep_last_rate: rate("total_keep_last"),
1120        // iteration_rate is a derived Rate: derive_rate_metrics already
1121        // placed the Σiterations/Σseconds quotient into the bucket map, so
1122        // read it verbatim — do NOT divide by duration (unlike
1123        // fallback_rate / keep_last_rate above, which divide their Counter
1124        // by the window).
1125        iteration_rate: b.metrics.get("iteration_rate").copied(),
1126    };
1127    let stimulus = if b.step_index == 0 {
1128        None
1129    } else {
1130        // Correlate with the closest StimulusEvent whose
1131        // elapsed_ms falls in [start_ms, end_ms]. Carrying the
1132        // real event preserves op_kind + detail in the failure-
1133        // message timeline render — `phase_from_bucket`'s prior
1134        // synthesis of a placeholder StimulusEvent with op_kind
1135        // = None / detail = None produced "Step[N]: ?" headers
1136        // that lost the operator-facing per-phase context the
1137        // legacy Timeline::build path carried.
1138        let correlated = sorted_events.iter().find(|e| {
1139            if b.start_ms == b.end_ms {
1140                e.elapsed_ms == b.start_ms
1141            } else {
1142                e.elapsed_ms >= b.start_ms && e.elapsed_ms < b.end_ms
1143            }
1144        });
1145        match correlated {
1146            Some(ev) => Some((*ev).clone()),
1147            None => Some(StimulusEvent {
1148                elapsed_ms: b.start_ms,
1149                label: b.label.clone(),
1150                op_kind: None,
1151                detail: None,
1152                total_iterations: None,
1153                // Synthetic placeholder for a bucket with no
1154                // correlated stimulus event; no authoritative step
1155                // ordinal to carry.
1156                step_index: None,
1157                is_terminal: false,
1158                is_step_end: false,
1159            }),
1160        }
1161    };
1162    // Orphan signature: fold_guest_per_cgroup_into_host_buckets normalizes a
1163    // guest carrier with no paired host bucket to a (0,0) window carrying ONLY
1164    // per_cgroup (empty metrics). A captured bucket has metrics, so the
1165    // (0,0)+empty-metrics+non-empty-per_cgroup shape is the orphan arm's on
1166    // every NON-zero-duration window. The one other producer is a ZERO-duration
1167    // step at scenario start (StepStart[k] == StepEnd[k] == 0 -> a synthesized
1168    // host (0,0) window with empty metrics, since duration 0 yields no rate and
1169    // no in-window monitor sample, then MATCHED with a same-step guest carrier),
1170    // but that collision is HARMLESS: a zero-duration step has no window to
1171    // measure, so "window not measured" reads the same as the "0ms" it would
1172    // otherwise show. Display-only, with no verdict/sidecar consumer,
1173    // so the marker needs no serialized PhaseBucket flag.
1174    let not_measured_window =
1175        b.start_ms == 0 && b.end_ms == 0 && b.metrics.is_empty() && !b.per_cgroup.is_empty();
1176    Phase {
1177        index: b.step_index as usize,
1178        start_ms: b.start_ms,
1179        end_ms: b.end_ms,
1180        stimulus,
1181        metrics,
1182        changes: Vec::new(),
1183        per_cgroup: b.per_cgroup.clone(),
1184        not_measured_window,
1185    }
1186}
1187
/// Maximum number of per-cgroup lines rendered for a single phase. The cap
/// bounds the failure-message size for a many-cgroup scenario (the
/// sched_log render caps similarly).
///
/// Truncation follows BTreeMap NAME order, so it is deterministic and
/// involves no ranking math: the first `MAX_RENDERED_CGROUPS` names are
/// rendered and the rest are dropped. A "+J more" note records how many
/// were dropped, so a capped listing never reads as "all cgroups".
const MAX_RENDERED_CGROUPS: usize = 16;
1193
1194/// Render the per-cgroup sub-block for one phase: one line per cgroup in
1195/// BTreeMap (name) order, reduced at display time via the
1196/// [`crate::assert::PhaseCgroupStats`] summaries. Empty `per_cgroup` (the
1197/// monitor-only path, or a phase that carried no per-cgroup components)
1198/// renders nothing. The None-vs-Some(0.0)
1199/// discipline carries through: an absent off-CPU reduction renders `n/a` (NOT
1200/// `0.0%`), and an empty wake / run-delay pool OMITS that segment rather than
1201/// painting a misleading `0µs`.
1202fn format_phase_cgroups(
1203    out: &mut String,
1204    per_cgroup: &std::collections::BTreeMap<String, crate::assert::PhaseCgroupStats>,
1205) {
1206    if per_cgroup.is_empty() {
1207        return;
1208    }
1209    out.push_str("  per-cgroup:\n");
1210    for (name, pcg) in per_cgroup.iter().take(MAX_RENDERED_CGROUPS) {
1211        out.push_str(&format!("    {name}: "));
1212        // A stripped carrier had its raw sample vectors dropped to fit the bulk
1213        // frame, so off-cpu / wake / run-delay summaries are all absent — but
1214        // that is NOT "not measured". Surface the size-limit drop explicitly so
1215        // the operator does not read it as a quiet cgroup.
1216        if pcg.stripped {
1217            out.push_str("samples stripped (size limit)");
1218        } else {
1219            match pcg.off_cpu_summary() {
1220                Some((avg, min, max, spread)) => out.push_str(&format!(
1221                    "off-cpu avg={avg:.1}% min={min:.1}% max={max:.1}% spread={spread:.1}%"
1222                )),
1223                None => out.push_str("off-cpu n/a"),
1224            }
1225            if let Some((p99, median)) = pcg.wake_summary() {
1226                out.push_str(&format!(
1227                    " | wake p99={p99:.0}\u{00b5}s median={median:.0}\u{00b5}s"
1228                ));
1229            }
1230            if let Some((mean, worst)) = pcg.run_delay_summary() {
1231                out.push_str(&format!(
1232                    " | run-delay mean={mean:.0}\u{00b5}s worst={worst:.0}\u{00b5}s"
1233                ));
1234            }
1235        }
1236        out.push_str(&format!(
1237            " | iters={} migrations={}",
1238            pcg.total_iterations, pcg.total_migrations
1239        ));
1240        // Gap is a Peak with no Option: 0 means "no notable gap", so omit it
1241        // rather than print a noisy gap=0ms on every quiet cgroup.
1242        if pcg.max_gap_ms > 0 {
1243            out.push_str(&format!(
1244                " | gap={}ms@cpu{}",
1245                pcg.max_gap_ms, pcg.max_gap_cpu
1246            ));
1247        }
1248        out.push('\n');
1249    }
1250    let total = per_cgroup.len();
1251    if total > MAX_RENDERED_CGROUPS {
1252        let dropped = total - MAX_RENDERED_CGROUPS;
1253        let noun = if dropped == 1 { "cgroup" } else { "cgroups" };
1254        out.push_str(&format!("    (+{dropped} more {noun})\n"));
1255    }
1256}
1257
1258// ---------------------------------------------------------------------------
1259// Metric computation
1260// ---------------------------------------------------------------------------
1261
1262/// Reduce a phase's monitor samples to [`PhaseMetrics`].
1263///
1264/// `preemption_threshold_ns` is the vCPU-preemption exemption window for
1265/// stall detection: a non-advancing `rq_clock` on a CPU whose
1266/// `vcpu_cpu_time_ns` advanced by less than this is a host-preemption
1267/// artifact, not a scheduler stall, and is exempt. Pass `0` to derive it
1268/// from the guest kernel's `CONFIG_HZ` via
1269/// `crate::monitor::vcpu_preemption_threshold_ns` — the same resolution
1270/// [`MonitorSummary::from_samples_with_threshold`](crate::monitor::MonitorSummary::from_samples_with_threshold)
1271/// applies, so the per-phase `stall_count` applies the SAME per-(CPU,
1272/// window) `is_cpu_stuck` predicate as the run-level
1273/// `MonitorSummary::stuck_count` (run-level `>=` Σ per-phase: it also
1274/// windows across phase boundaries).
1275pub(crate) fn compute_metrics(
1276    samples: &[&MonitorSample],
1277    preemption_threshold_ns: u64,
1278) -> PhaseMetrics {
1279    if samples.is_empty() {
1280        return PhaseMetrics::default();
1281    }
1282
1283    // Filter out samples with implausible data (e.g. garbage DSQ depths
1284    // from uninitialized guest memory) before computing metrics.
1285    let valid: Vec<&MonitorSample> = samples
1286        .iter()
1287        .copied()
1288        .filter(|s| !s.cpus.is_empty() && sample_looks_valid(s))
1289        .collect();
1290
1291    if valid.is_empty() {
1292        return PhaseMetrics {
1293            sample_count: 0,
1294            ..PhaseMetrics::default()
1295        };
1296    }
1297
1298    let mut total_imbalance = 0.0f64;
1299    let mut max_imbalance = 0.0f64;
1300    let mut total_dsq = 0.0f64;
1301    let mut total_nr_running = 0.0f64;
1302    let mut max_dsq = 0u32;
1303    let mut stall_count = 0usize;
1304
1305    for sample in &valid {
1306        for cpu in &sample.cpus {
1307            max_dsq = max_dsq.max(cpu.local_dsq_depth);
1308        }
1309        let ratio = sample.imbalance_ratio();
1310        total_imbalance += ratio;
1311        if ratio > max_imbalance {
1312            max_imbalance = ratio;
1313        }
1314
1315        let avg_dsq_this: f64 = sample
1316            .cpus
1317            .iter()
1318            .map(|c| c.local_dsq_depth as f64)
1319            .sum::<f64>()
1320            / sample.cpus.len() as f64;
1321        total_dsq += avg_dsq_this;
1322
1323        // Per-sample mean of per-CPU nr_running (full-class runqueue depth),
1324        // averaged over samples below — the avg_dsq_depth shape. `valid`
1325        // guarantees `!cpus.is_empty()`, so the divisor is nonzero.
1326        let avg_nr_this: f64 =
1327            sample.cpus.iter().map(|c| c.nr_running as f64).sum::<f64>() / sample.cpus.len() as f64;
1328        total_nr_running += avg_nr_this;
1329    }
1330
1331    // Stall detection between consecutive valid samples in this phase.
1332    // Route through `is_cpu_stuck` (the shared predicate the run-level
1333    // `MonitorSummary` path also uses) so the per-phase stall count and the
1334    // run-level stuck count apply the identical NOHZ-idle and
1335    // vCPU-preemption exemptions — the per-phase count uses the SAME
1336    // predicate as the run-level one (run-level `>=` Σ per-phase: it also
1337    // counts the boundary-straddling window pair and out-of-phase samples).
1338    let threshold = if preemption_threshold_ns > 0 {
1339        preemption_threshold_ns
1340    } else {
1341        crate::monitor::vcpu_preemption_threshold_ns(None)
1342    };
1343    for w in valid.windows(2) {
1344        let prev = w[0];
1345        let curr = w[1];
1346        let cpu_count = prev.cpus.len().min(curr.cpus.len());
1347        for cpu in 0..cpu_count {
1348            if crate::monitor::reader::is_cpu_stuck(&prev.cpus[cpu], &curr.cpus[cpu], threshold) {
1349                stall_count += 1;
1350            }
1351        }
1352    }
1353
1354    // Event counter rates: sum counters across CPUs for first/last valid
1355    // samples that have event_counters, compute delta / duration.
1356    let has_events = |s: &&MonitorSample| s.cpus.iter().any(|c| c.event_counters.is_some());
1357    let first_ev = valid.iter().copied().find(|s| has_events(s));
1358    let last_ev = valid.iter().copied().rev().find(|s| has_events(s));
1359
1360    let (fallback_rate, keep_last_rate) = match (first_ev, last_ev) {
1361        (Some(first), Some(last)) if first.elapsed_ms < last.elapsed_ms => {
1362            // `<` guard above is expected to rule out underflow, but
1363            // `saturating_sub` is defense-in-depth: if a future change
1364            // loosens the guard, the worst outcome becomes
1365            // `duration_s == 0.0` (which disables the rate below) rather
1366            // than a panic.
1367            let duration_s = last.elapsed_ms.saturating_sub(first.elapsed_ms) as f64 / 1000.0;
1368            // Event counters can reset mid-run (scheduler restart) and
1369            // produce a negative raw delta. Shared helper clamps to
1370            // >= 0 so the computed rate never goes negative; same
1371            // semantics as MonitorSummary::compute_event_deltas.
1372            let fb_delta = crate::monitor::counter_delta(
1373                last.sum_event_field(|e| e.select_cpu_fallback).unwrap_or(0),
1374                first
1375                    .sum_event_field(|e| e.select_cpu_fallback)
1376                    .unwrap_or(0),
1377            );
1378            let kl_delta = crate::monitor::counter_delta(
1379                last.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
1380                first.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
1381            );
1382            (
1383                Some(fb_delta as f64 / duration_s),
1384                Some(kl_delta as f64 / duration_s),
1385            )
1386        }
1387        _ => (None, None),
1388    };
1389
1390    let valid_count = valid.len();
1391    let n = valid_count as f64;
1392    // None when no valid samples — avoids a 0.0/0.0 NaN and keeps "no
1393    // data" distinct from a real zero (the detector skips None sides).
1394    PhaseMetrics {
1395        sample_count: valid_count,
1396        avg_imbalance: (valid_count > 0).then(|| total_imbalance / n),
1397        max_imbalance: (valid_count > 0).then_some(max_imbalance),
1398        avg_dsq_depth: (valid_count > 0).then(|| total_dsq / n),
1399        avg_nr_running: (valid_count > 0).then(|| total_nr_running / n),
1400        max_dsq_depth: max_dsq,
1401        stall_count,
1402        fallback_rate,
1403        keep_last_rate,
1404        iteration_rate: None,
1405    }
1406}
1407
1408// ---------------------------------------------------------------------------
1409// Tests
1410// ---------------------------------------------------------------------------
1411
1412#[cfg(test)]
1413mod tests {
1414    use super::*;
1415    use crate::monitor::{CpuSnapshot, MonitorSample};
1416
1417    /// `StimulusEvent::phase()` maps the 1-indexed wire `step_index` onto the
1418    /// canonical [`crate::assert::Phase`] (StepStart and StepEnd of step `k`
1419    /// both -> `Phase::step(k)`); the scenario-end terminal seeds no phase.
1420    #[test]
1421    fn stimulus_event_phase_maps_wire_step_index_to_phase() {
1422        use crate::assert::Phase;
1423        // Wire step_index 1 (Step 0) -> Phase::step(0); 2 (Step 1) -> step(1).
1424        assert_eq!(
1425            StimulusEvent::from_wire(&wire_event(0, 1, 0)).phase(),
1426            Some(Phase::step(0)),
1427        );
1428        assert_eq!(
1429            StimulusEvent::from_wire(&wire_event(100, 2, 50)).phase(),
1430            Some(Phase::step(1)),
1431        );
1432        // StepEnd carries the same step_index -> same Phase as its StepStart.
1433        assert_eq!(
1434            StimulusEvent::from_step_end(&wire_event(200, 2, 90)).phase(),
1435            Some(Phase::step(1)),
1436        );
1437        // The terminal boundary is not a step -> no Phase.
1438        assert_eq!(StimulusEvent::terminal(300, 100).phase(), None);
1439    }
1440
1441    fn sample(elapsed_ms: u64, cpus: Vec<(u32, u32, u64)>) -> MonitorSample {
1442        MonitorSample {
1443            bpf_map_fields: Vec::new(),
1444            prog_stats: None,
1445            psi_irq: None,
1446            elapsed_ms,
1447            cpus: cpus
1448                .into_iter()
1449                .map(|(nr_running, dsq, rq_clock)| CpuSnapshot {
1450                    nr_running,
1451                    scx_nr_running: 0,
1452                    local_dsq_depth: dsq,
1453                    rq_clock,
1454                    scx_flags: 0,
1455                    event_counters: None,
1456                    schedstat: None,
1457                    vcpu_cpu_time_ns: None,
1458                    vcpu_perf: None,
1459                    avg_irq_util: None,
1460                    sched_domains: None,
1461                })
1462                .collect(),
1463        }
1464    }
1465
1466    fn stimulus(elapsed_ms: u64, label: &str) -> StimulusEvent {
1467        StimulusEvent {
1468            elapsed_ms,
1469            label: label.to_string(),
1470            op_kind: None,
1471            detail: None,
1472            total_iterations: None,
1473            step_index: None,
1474            is_terminal: false,
1475            is_step_end: false,
1476        }
1477    }
1478
1479    #[test]
1480    fn empty_inputs_empty_timeline() {
1481        let t = Timeline::build(&[], &[], 0);
1482        assert!(t.phases.is_empty());
1483    }
1484
1485    #[test]
1486    fn no_stimulus_empty_timeline() {
1487        let samples = vec![sample(1000, vec![(2, 1, 100)])];
1488        let t = Timeline::build(&[], &samples, 0);
1489        assert!(t.phases.is_empty());
1490    }
1491
1492    #[test]
1493    fn no_monitor_empty_timeline() {
1494        let events = vec![stimulus(0, "ScenarioStart")];
1495        let t = Timeline::build(&events, &[], 0);
1496        assert!(t.phases.is_empty());
1497    }
1498
1499    #[test]
1500    fn single_event_single_phase() {
1501        let events = vec![stimulus(0, "ScenarioStart")];
1502        let samples = vec![
1503            sample(600, vec![(2, 1, 100), (2, 1, 200)]),
1504            sample(700, vec![(2, 1, 300), (2, 1, 400)]),
1505        ];
1506        let t = Timeline::build(&events, &samples, 0);
1507        assert_eq!(t.phases.len(), 1);
1508        // Both samples — including the one AT last_monitor_ms (700) —
1509        // must fall inside the single phase's [start, last_monitor_ms+1)
1510        // window. A > 0 check passes even if the last-sample-inclusion
1511        // off-by-one (end = last_monitor_ms+1) regressed to +0, dropping
1512        // the 700 sample. Pin the exact count.
1513        assert_eq!(t.phases[0].metrics.sample_count, 2);
1514    }
1515
1516    #[test]
1517    fn two_events_two_phases() {
1518        let events = vec![stimulus(0, "ScenarioStart"), stimulus(3000, "StepStart[0]")];
1519        let samples: Vec<MonitorSample> = (5..65)
1520            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
1521            .collect();
1522        let t = Timeline::build(&events, &samples, 0);
1523        assert_eq!(t.phases.len(), 2);
1524        // Pin WHERE the boundary fell, not just non-emptiness: 60 samples
1525        // at i*100 (i in 5..65 → 500..6400); the >500 warmup drops the
1526        // 500 sample (i=5), leaving 59. The StepStart[0]@3000 boundary
1527        // (offset-adjusted) splits them 30/29. A > 0 check passes even if
1528        // the offset/boundary math shifted the split point while leaving
1529        // samples on both sides.
1530        assert_eq!(t.phases[0].metrics.sample_count, 30);
1531        assert_eq!(t.phases[1].metrics.sample_count, 29);
1532        assert_eq!(
1533            t.phases[0].metrics.sample_count + t.phases[1].metrics.sample_count,
1534            59,
1535            "59 = 60 samples minus the 500ms sample dropped by the >500 warmup",
1536        );
1537    }
1538
1539    #[test]
1540    fn improvement_detected() {
1541        // Phase 0: imbalanced
1542        // Phase 1: balanced
1543        let events = vec![stimulus(0, "ScenarioStart"), stimulus(1000, "StepStart[0]")];
1544        let mut samples = Vec::new();
1545        for i in 5..15 {
1546            samples.push(sample(
1547                i * 100,
1548                vec![(1, 1, i * 1000), (5, 1, i * 1000 + 100)],
1549            ));
1550        }
1551        for i in 15..25 {
1552            samples.push(sample(
1553                i * 100,
1554                vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
1555            ));
1556        }
1557        let t = Timeline::build(&events, &samples, 0);
1558        let improvements: Vec<_> = t
1559            .phases
1560            .iter()
1561            .flat_map(|p| p.changes.iter())
1562            .filter(|c| c.direction == ChangeDirection::Improved)
1563            .collect();
1564        assert!(!improvements.is_empty());
1565    }
1566
1567    #[test]
1568    fn format_non_empty() {
1569        let events = vec![stimulus(0, "ScenarioStart"), stimulus(1000, "StepStart[0]")];
1570        let samples: Vec<MonitorSample> = (5..25)
1571            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
1572            .collect();
1573        let t = Timeline::build(&events, &samples, 0);
1574        let formatted = t.format_with_context(&TimelineContext::default());
1575        assert!(formatted.contains("BASELINE"));
1576        assert!(formatted.contains("Phase 1"));
1577        assert!(formatted.contains("imbalance"));
1578    }
1579
1580    /// A synthesized zero-capture step (sample_count==0) still renders its
1581    /// stimulus-derived throughput in the formatted timeline, not only
1582    /// "[no samples]". Pins the synthesized-step visibility in format_phases. The
1583    /// BASELINE bucket holds step_index 0 (its phase index, the settle
1584    /// render) so the synthesized step lands at a Phase index that takes
1585    /// the metric path.
1586    #[test]
1587    fn format_renders_synthesized_step_throughput() {
1588        let buckets = vec![
1589            crate::assert::PhaseBucket {
1590                per_cgroup: Default::default(),
1591                step_index: 0,
1592                label: "BASELINE".to_string(),
1593                start_ms: 0,
1594                end_ms: 1000,
1595                sample_count: 2,
1596                metrics: std::collections::BTreeMap::new(),
1597            },
1598            crate::assert::PhaseBucket {
1599                per_cgroup: Default::default(),
1600                step_index: 1,
1601                label: "Step[0]".to_string(),
1602                start_ms: 1000,
1603                end_ms: 2000,
1604                sample_count: 0, // synthesized zero-capture step
1605                metrics: std::collections::BTreeMap::from([("iteration_rate".to_string(), 1500.0)]),
1606            },
1607        ];
1608        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1609        let formatted = t.format_with_context(&TimelineContext::default());
1610        assert!(
1611            formatted.contains("throughput: 1500 iter/s (stimulus-derived)"),
1612            "a synthesized step must render its stimulus-derived throughput, \
1613             not only '[no samples]'; got:\n{formatted}",
1614        );
1615    }
1616
1617    // -- orphan not-measured marker + per-cgroup sub-block render --
1618
1619    /// An orphan bucket — the unique (0,0)-window + empty-metrics +
1620    /// non-empty-per_cgroup shape from the fold's orphan arm — renders "window
1621    /// not measured", NOT a misleading "0ms".
1622    #[test]
1623    fn format_renders_orphan_window_as_not_measured() {
1624        let mut per_cgroup = std::collections::BTreeMap::new();
1625        per_cgroup.insert(
1626            "cg".to_string(),
1627            crate::assert::PhaseCgroupStats {
1628                off_cpu_pcts: vec![80.0],
1629                total_iterations: 900_000,
1630                ..Default::default()
1631            },
1632        );
1633        let buckets = vec![crate::assert::PhaseBucket {
1634            per_cgroup,
1635            step_index: 1,
1636            label: "Step[0]".to_string(),
1637            start_ms: 0,
1638            end_ms: 0,
1639            sample_count: 0,
1640            metrics: std::collections::BTreeMap::new(),
1641        }];
1642        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1643        let formatted = t.format_with_context(&TimelineContext::default());
1644        assert!(
1645            formatted.contains("window not measured"),
1646            "orphan window must render not-measured; got:\n{formatted}",
1647        );
1648        assert!(
1649            !formatted.contains("(0ms,"),
1650            "orphan must NOT render as 0ms; got:\n{formatted}",
1651        );
1652        // The whole point of routing orphan carriers through the render (vs the
1653        // pre-fold path that dropped them) is that their per-cgroup telemetry —
1654        // an orphan's ONLY payload — SURFACES alongside the not-measured marker.
1655        assert!(
1656            formatted.contains("per-cgroup:"),
1657            "orphan's per-cgroup sub-block must render; got:\n{formatted}",
1658        );
1659        assert!(
1660            formatted.contains("cg: off-cpu avg=80.0%"),
1661            "orphan carrier's off-cpu reduction must render; got:\n{formatted}",
1662        );
1663        assert!(
1664            formatted.contains("iters=900000"),
1665            "orphan carrier's counters must render; got:\n{formatted}",
1666        );
1667        // The cg carrier has off-cpu but NO wake/run-delay pools — pin the
1668        // per-line omit when off-cpu is the sole reduction present (the most
1669        // likely real orphan-carrier shape).
1670        let cg_line = formatted
1671            .lines()
1672            .find(|l| l.contains("cg: off-cpu"))
1673            .expect("cg line");
1674        assert!(!cg_line.contains("wake p99="), "got:\n{formatted}");
1675        assert!(!cg_line.contains("run-delay mean="), "got:\n{formatted}");
1676    }
1677
1678    /// Orphan-adjacency suppression: an orphan phase (all-None
1679    /// PhaseMetrics) BETWEEN two real phases makes detect_boundary_changes
1680    /// compare each real neighbor against the orphan's gated-away None metrics,
1681    /// NOT across the unmeasured window — so a step1->step3 throughput collapse
1682    /// is NOT flagged on step3 (no data for the intervening orphan step).
1683    /// INTENTIONAL + render-only; pins the documented from_phase_buckets behavior.
1684    #[test]
1685    fn format_orphan_phase_suppresses_cross_orphan_change_detection() {
1686        let real = |step: u16, rate: f64| crate::assert::PhaseBucket {
1687            per_cgroup: Default::default(),
1688            step_index: step,
1689            label: format!("Step[{}]", step - 1),
1690            start_ms: step as u64 * 1000,
1691            end_ms: step as u64 * 1000 + 500,
1692            sample_count: 5,
1693            metrics: std::collections::BTreeMap::from([("iteration_rate".to_string(), rate)]),
1694        };
1695        let mut orphan_pc = std::collections::BTreeMap::new();
1696        orphan_pc.insert(
1697            "cg".to_string(),
1698            crate::assert::PhaseCgroupStats {
1699                off_cpu_pcts: vec![50.0],
1700                ..Default::default()
1701            },
1702        );
1703        let orphan = crate::assert::PhaseBucket {
1704            per_cgroup: orphan_pc,
1705            step_index: 2,
1706            label: "Step[1]".to_string(),
1707            start_ms: 0,
1708            end_ms: 0,
1709            sample_count: 0,
1710            metrics: std::collections::BTreeMap::new(),
1711        };
1712        // CONTROL: step1 (rate 10000) adjacent to step3 (rate 1000) — a 90%
1713        // collapse (> the 30% rel threshold) IS flagged as a change on step3.
1714        let ctrl = Timeline::from_phase_buckets(
1715            &[real(1, 10000.0), real(3, 1000.0)],
1716            &[],
1717            &TimelineContext::default(),
1718        );
1719        let ctrl_step3 = ctrl.phases.iter().find(|p| p.index == 3).expect("step3");
1720        assert!(
1721            !ctrl_step3.changes.is_empty(),
1722            "control: adjacent step1->step3 throughput collapse IS flagged; got: {:?}",
1723            ctrl_step3.changes,
1724        );
1725        // With the orphan between, step3 is compared against the orphan's None
1726        // metrics -> the change is suppressed (no data for the gap).
1727        let t = Timeline::from_phase_buckets(
1728            &[real(1, 10000.0), orphan, real(3, 1000.0)],
1729            &[],
1730            &TimelineContext::default(),
1731        );
1732        let step3 = t.phases.iter().find(|p| p.index == 3).expect("step3");
1733        assert!(
1734            step3.changes.is_empty(),
1735            "orphan between suppresses the cross-orphan change; got: {:?}",
1736            step3.changes,
1737        );
1738        assert!(
1739            t.format_with_context(&TimelineContext::default())
1740                .contains("window not measured"),
1741            "the orphan still renders not-measured",
1742        );
1743    }
1744
1745    /// The orphan signature is GUARDED on empty metrics: a (0,0)-window bucket
1746    /// that carries metrics is a captured bucket (or a measured zero-duration
1747    /// step), NOT an orphan — it renders 0ms, never "window not measured".
1748    #[test]
1749    fn format_does_not_mark_not_measured_when_metrics_present() {
1750        let mut per_cgroup = std::collections::BTreeMap::new();
1751        per_cgroup.insert(
1752            "cg".to_string(),
1753            crate::assert::PhaseCgroupStats {
1754                off_cpu_pcts: vec![50.0],
1755                ..Default::default()
1756            },
1757        );
1758        let buckets = vec![crate::assert::PhaseBucket {
1759            per_cgroup,
1760            step_index: 1,
1761            label: "Step[0]".to_string(),
1762            start_ms: 0,
1763            end_ms: 0,
1764            sample_count: 1,
1765            metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.0)]),
1766        }];
1767        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1768        let formatted = t.format_with_context(&TimelineContext::default());
1769        assert!(
1770            !formatted.contains("window not measured"),
1771            "a (0,0) window WITH metrics is captured, not an orphan; got:\n{formatted}",
1772        );
1773        assert!(
1774            formatted.contains("(0ms,"),
1775            "a measured zero-duration window renders 0ms; got:\n{formatted}",
1776        );
1777    }
1778
1779    /// The per-cgroup sub-block renders one line per cgroup (name order), with
1780    /// the None-vs-Some discipline: a not-measured off-CPU reduction renders
1781    /// `n/a` (not `0.0%`), and an empty wake/run-delay pool omits that segment.
1782    #[test]
1783    fn format_renders_per_cgroup_subblock_none_aware() {
1784        let mut per_cgroup = std::collections::BTreeMap::new();
1785        per_cgroup.insert(
1786            "cg_a".to_string(),
1787            crate::assert::PhaseCgroupStats {
1788                off_cpu_pcts: vec![80.0, 84.0],
1789                wake_latencies_ns: vec![100_000, 120_000],
1790                run_delays_ns: vec![45_000],
1791                total_iterations: 900_000,
1792                total_migrations: 12,
1793                max_gap_ms: 8,
1794                max_gap_cpu: 3,
1795                ..Default::default()
1796            },
1797        );
1798        per_cgroup.insert(
1799            "cg_b".to_string(),
1800            crate::assert::PhaseCgroupStats {
1801                off_cpu_pcts: vec![],
1802                total_iterations: 80_000,
1803                ..Default::default()
1804            },
1805        );
1806        let buckets = vec![crate::assert::PhaseBucket {
1807            per_cgroup,
1808            step_index: 1,
1809            label: "Step[0]".to_string(),
1810            start_ms: 1000,
1811            end_ms: 6000,
1812            sample_count: 10,
1813            metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.5)]),
1814        }];
1815        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1816        let formatted = t.format_with_context(&TimelineContext::default());
1817        assert!(
1818            formatted.contains("per-cgroup:"),
1819            "sub-block header; got:\n{formatted}"
1820        );
1821        assert!(
1822            formatted.contains("cg_a: off-cpu avg=82.0% min=80.0% max=84.0% spread=4.0%"),
1823            "got:\n{formatted}",
1824        );
1825        assert!(
1826            formatted.contains("wake p99="),
1827            "wake segment present; got:\n{formatted}"
1828        );
1829        assert!(
1830            formatted.contains("run-delay mean="),
1831            "run-delay segment present; got:\n{formatted}",
1832        );
1833        assert!(
1834            formatted.contains("iters=900000 migrations=12"),
1835            "counters present; got:\n{formatted}",
1836        );
1837        assert!(
1838            formatted.contains("cg_b: off-cpu n/a"),
1839            "not-measured off-cpu is n/a, NOT 0.0%; got:\n{formatted}",
1840        );
1841        // cg_a has a coupled gap (8ms @ cpu 3) -> rendered with its cpu.
1842        assert!(
1843            formatted.contains("gap=8ms@cpu3"),
1844            "coupled gap renders ms@cpu; got:\n{formatted}",
1845        );
1846        // cg_b has no wake/run-delay pools AND max_gap_ms==0 -> those segments
1847        // omitted on its line (gap is omitted at 0, not printed as gap=0ms).
1848        let cg_b_line = formatted
1849            .lines()
1850            .find(|l| l.contains("cg_b:"))
1851            .expect("cg_b line");
1852        assert!(!cg_b_line.contains("wake p99="), "got:\n{formatted}");
1853        assert!(
1854            !cg_b_line.contains("gap="),
1855            "gap omitted at 0; got:\n{formatted}"
1856        );
1857        // BTreeMap name order: cg_a before cg_b.
1858        assert!(
1859            formatted.find("cg_a:").unwrap() < formatted.find("cg_b:").unwrap(),
1860            "cgroups render in name order; got:\n{formatted}",
1861        );
1862    }
1863
1864    /// A stripped carrier (raw sample vectors dropped to fit the size-limited
1865    /// bulk frame) renders "samples stripped (size limit)" — distinct from a
1866    /// not-measured carrier's "off-cpu n/a" — so an operator does not read a
1867    /// size-limit drop as a quiet cgroup. The surviving counters still render.
1868    #[test]
1869    fn format_renders_stripped_carrier_distinctly() {
1870        let mut per_cgroup = std::collections::BTreeMap::new();
1871        per_cgroup.insert(
1872            "cg_stripped".to_string(),
1873            crate::assert::PhaseCgroupStats {
1874                stripped: true,
1875                total_iterations: 500_000,
1876                total_migrations: 9,
1877                ..Default::default()
1878            },
1879        );
1880        per_cgroup.insert(
1881            "cg_quiet".to_string(),
1882            // Genuinely measured nothing (NOT stripped).
1883            crate::assert::PhaseCgroupStats {
1884                total_iterations: 1_000,
1885                ..Default::default()
1886            },
1887        );
1888        let buckets = vec![crate::assert::PhaseBucket {
1889            per_cgroup,
1890            step_index: 1,
1891            label: "Step[0]".to_string(),
1892            start_ms: 1000,
1893            end_ms: 6000,
1894            sample_count: 10,
1895            metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.5)]),
1896        }];
1897        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1898        let formatted = t.format_with_context(&TimelineContext::default());
1899        assert!(
1900            formatted.contains("cg_stripped: samples stripped (size limit)"),
1901            "stripped carrier shows the size-limit marker; got:\n{formatted}",
1902        );
1903        assert!(
1904            formatted.contains("iters=500000 migrations=9"),
1905            "stripped carrier still renders its surviving counters; got:\n{formatted}",
1906        );
1907        assert!(
1908            formatted.contains("cg_quiet: off-cpu n/a"),
1909            "a not-stripped, not-measured carrier stays n/a; got:\n{formatted}",
1910        );
1911        assert!(
1912            !formatted.contains("cg_quiet: samples stripped"),
1913            "a not-stripped carrier must NOT show the stripped marker; got:\n{formatted}",
1914        );
1915    }
1916
1917    /// Measured-zero off-CPU renders `0.0%` (a real zero), distinct from the
1918    /// `n/a` not-measured state — the kind-specific None-vs-Some(0.0) boundary.
1919    #[test]
1920    fn format_renders_measured_zero_off_cpu_as_zero_not_na() {
1921        let mut per_cgroup = std::collections::BTreeMap::new();
1922        per_cgroup.insert(
1923            "cg".to_string(),
1924            crate::assert::PhaseCgroupStats {
1925                off_cpu_pcts: vec![0.0, 0.0],
1926                ..Default::default()
1927            },
1928        );
1929        let buckets = vec![crate::assert::PhaseBucket {
1930            per_cgroup,
1931            step_index: 1,
1932            label: "Step[0]".to_string(),
1933            start_ms: 1000,
1934            end_ms: 2000,
1935            sample_count: 1,
1936            metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.0)]),
1937        }];
1938        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1939        let formatted = t.format_with_context(&TimelineContext::default());
1940        assert!(
1941            formatted.contains("off-cpu avg=0.0%"),
1942            "measured zero off-cpu is 0.0%, not n/a; got:\n{formatted}",
1943        );
1944        assert!(!formatted.contains("off-cpu n/a"), "got:\n{formatted}");
1945    }
1946
1947    /// Symmetric with the off-cpu measured-zero test for wake + run-delay: a
1948    /// NON-empty pool of zeros is a measured zero (Some) and renders `0µs`, NOT
1949    /// omitted (which only an EMPTY pool -> None does). Guards against a refactor
1950    /// that special-cased a zero reduction to None (collapsing measured-zero
1951    /// into not-measured), silently omitting a real zero-latency reading.
1952    #[test]
1953    fn format_renders_measured_zero_wake_and_run_delay_not_omitted() {
1954        let mut per_cgroup = std::collections::BTreeMap::new();
1955        per_cgroup.insert(
1956            "cg".to_string(),
1957            crate::assert::PhaseCgroupStats {
1958                off_cpu_pcts: vec![5.0],
1959                wake_latencies_ns: vec![0],
1960                run_delays_ns: vec![0],
1961                ..Default::default()
1962            },
1963        );
1964        let buckets = vec![crate::assert::PhaseBucket {
1965            per_cgroup,
1966            step_index: 1,
1967            label: "Step[0]".to_string(),
1968            start_ms: 1000,
1969            end_ms: 2000,
1970            sample_count: 1,
1971            metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.0)]),
1972        }];
1973        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
1974        let formatted = t.format_with_context(&TimelineContext::default());
1975        assert!(
1976            formatted.contains("wake p99=0\u{00b5}s"),
1977            "measured-zero wake renders 0µs, not omitted; got:\n{formatted}",
1978        );
1979        assert!(
1980            formatted.contains("run-delay mean=0\u{00b5}s"),
1981            "measured-zero run-delay renders 0µs, not omitted; got:\n{formatted}",
1982        );
1983    }
1984
1985    /// The per-cgroup sub-block caps at MAX_RENDERED_CGROUPS (16) by name
1986    /// order, with a "+J more" note — bounding failure-message size.
1987    #[test]
1988    fn format_caps_per_cgroup_subblock() {
1989        let mut per_cgroup = std::collections::BTreeMap::new();
1990        for i in 0..20 {
1991            per_cgroup.insert(
1992                format!("cg{i:02}"),
1993                crate::assert::PhaseCgroupStats {
1994                    off_cpu_pcts: vec![10.0],
1995                    ..Default::default()
1996                },
1997            );
1998        }
1999        let buckets = vec![crate::assert::PhaseBucket {
2000            per_cgroup,
2001            step_index: 1,
2002            label: "Step[0]".to_string(),
2003            start_ms: 1000,
2004            end_ms: 2000,
2005            sample_count: 1,
2006            metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.0)]),
2007        }];
2008        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
2009        let formatted = t.format_with_context(&TimelineContext::default());
2010        assert!(
2011            formatted.contains("(+4 more cgroups)"),
2012            "20 cgroups capped at 16 -> +4 more; got:\n{formatted}",
2013        );
2014        assert!(
2015            formatted.contains("cg15:"),
2016            "first 16 rendered; got:\n{formatted}"
2017        );
2018        assert!(
2019            !formatted.contains("cg16:"),
2020            "cg16 is beyond the cap; got:\n{formatted}",
2021        );
2022    }
2023
2024    /// No per-cgroup sub-block when the phase carries no carriers (the
2025    /// monitor-only path, or a phase with empty per_cgroup).
2026    #[test]
2027    fn format_omits_per_cgroup_subblock_when_empty() {
2028        let buckets = vec![crate::assert::PhaseBucket {
2029            per_cgroup: Default::default(),
2030            step_index: 1,
2031            label: "Step[0]".to_string(),
2032            start_ms: 1000,
2033            end_ms: 2000,
2034            sample_count: 1,
2035            metrics: std::collections::BTreeMap::from([("avg_imbalance_ratio".to_string(), 1.0)]),
2036        }];
2037        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
2038        let formatted = t.format_with_context(&TimelineContext::default());
2039        assert!(
2040            !formatted.contains("per-cgroup:"),
2041            "no sub-block when carriers absent; got:\n{formatted}",
2042        );
2043    }
2044
2045    /// Cap off-by-one boundary: EXACTLY 16 cgroups render all 16 with NO
2046    /// "more cgroups" note (total > MAX is false); 17 render 16 + "(+1 more
2047    /// cgroups)". Pins the `>` vs `>=` / `take(N)` edge against a silent drop
2048    /// (one cgroup gone, no note) or a "(+0 more)" lie.
2049    #[test]
2050    fn format_per_cgroup_cap_boundary_16_and_17() {
2051        let mk = |n: usize| {
2052            let mut per_cgroup = std::collections::BTreeMap::new();
2053            for i in 0..n {
2054                per_cgroup.insert(
2055                    format!("cg{i:02}"),
2056                    crate::assert::PhaseCgroupStats {
2057                        off_cpu_pcts: vec![10.0],
2058                        ..Default::default()
2059                    },
2060                );
2061            }
2062            let buckets = vec![crate::assert::PhaseBucket {
2063                per_cgroup,
2064                step_index: 1,
2065                label: "Step[0]".to_string(),
2066                start_ms: 1000,
2067                end_ms: 2000,
2068                sample_count: 1,
2069                metrics: std::collections::BTreeMap::from([(
2070                    "avg_imbalance_ratio".to_string(),
2071                    1.0,
2072                )]),
2073            }];
2074            Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default())
2075                .format_with_context(&TimelineContext::default())
2076        };
2077        let at16 = mk(16);
2078        assert!(at16.contains("cg15:"), "16th cgroup renders; got:\n{at16}");
2079        assert!(
2080            !at16.contains("more cgroups"),
2081            "exactly 16 has NO truncation note; got:\n{at16}",
2082        );
2083        let at17 = mk(17);
2084        assert!(at17.contains("cg15:"), "got:\n{at17}");
2085        assert!(
2086            !at17.contains("cg16:"),
2087            "17th is past the cap; got:\n{at17}"
2088        );
2089        assert!(
2090            at17.contains("(+1 more cgroup)") && !at17.contains("(+1 more cgroups)"),
2091            "17 cgroups -> exactly +1 more (singular 'cgroup'); got:\n{at17}",
2092        );
2093    }
2094
2095    /// A synthesized (sample_count==0) bucket carrying monitor-derived
2096    /// metrics renders the imbalance/dsq block, not just "[no samples]" —
2097    /// the render-layer half of the monitor-parity handling. format_phases
2098    /// gates that block on `has_monitor_metrics`, not `sample_count > 0`.
2099    #[test]
2100    fn format_renders_synthesized_step_monitor_metrics() {
2101        let buckets = vec![
2102            crate::assert::PhaseBucket {
2103                per_cgroup: Default::default(),
2104                step_index: 0,
2105                label: "BASELINE".to_string(),
2106                start_ms: 0,
2107                end_ms: 1000,
2108                sample_count: 2,
2109                metrics: std::collections::BTreeMap::new(),
2110            },
2111            crate::assert::PhaseBucket {
2112                per_cgroup: Default::default(),
2113                step_index: 1,
2114                label: "Step[0]".to_string(),
2115                start_ms: 1000,
2116                end_ms: 2000,
2117                sample_count: 0, // synthesized zero-capture step
2118                metrics: std::collections::BTreeMap::from([
2119                    ("avg_imbalance_ratio".to_string(), 2.0),
2120                    ("max_imbalance_ratio".to_string(), 3.0),
2121                    ("avg_dsq_depth".to_string(), 5.0),
2122                    ("max_dsq_depth".to_string(), 7.0),
2123                ]),
2124            },
2125        ];
2126        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
2127        let formatted = t.format_with_context(&TimelineContext::default());
2128        assert!(
2129            formatted.contains("imbalance: avg=2.0 max=3.0"),
2130            "synthesized bucket's folded imbalance must render, not \
2131             '[no samples]'; got:\n{formatted}",
2132        );
2133        assert!(
2134            formatted.contains("dsq: avg=5 max=7"),
2135            "synthesized bucket's folded dsq must render; got:\n{formatted}",
2136        );
2137    }
2138
2139    /// A from_phase_buckets render whose first bucket is a Step (no
2140    /// BASELINE bucket — e.g. under --cell-parent-cgroup where BASELINE
2141    /// captured nothing) must NOT mislabel that Step as "BASELINE".
2142    /// phase.index is the bucket's step_index, so format_phases' index==0
2143    /// BASELINE check fires only for a real BASELINE bucket.
2144    #[test]
2145    fn format_no_baseline_bucket_does_not_mislabel_first_step() {
2146        let buckets = vec![
2147            crate::assert::PhaseBucket {
2148                per_cgroup: Default::default(),
2149                step_index: 1, // scenario Step 0; NO BASELINE bucket present
2150                label: "Step[0]".to_string(),
2151                start_ms: 1000,
2152                end_ms: 2000,
2153                sample_count: 3,
2154                metrics: std::collections::BTreeMap::from([(
2155                    "avg_imbalance_ratio".to_string(),
2156                    1.0,
2157                )]),
2158            },
2159            crate::assert::PhaseBucket {
2160                per_cgroup: Default::default(),
2161                step_index: 2,
2162                label: "Step[1]".to_string(),
2163                start_ms: 2000,
2164                end_ms: 3000,
2165                sample_count: 3,
2166                metrics: std::collections::BTreeMap::from([(
2167                    "avg_imbalance_ratio".to_string(),
2168                    1.0,
2169                )]),
2170            },
2171        ];
2172        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
2173        let formatted = t.format_with_context(&TimelineContext::default());
2174        assert!(
2175            !formatted.contains("BASELINE"),
2176            "no BASELINE bucket -> no BASELINE label; the first Step must not \
2177             be mislabeled as BASELINE; got:\n{formatted}",
2178        );
2179        assert!(
2180            formatted.contains("Phase 1"),
2181            "the first Step renders as 'Phase 1' (its step_index), not \
2182             'Phase 0'/BASELINE; got:\n{formatted}",
2183        );
2184    }
2185
2186    /// Sparse / non-contiguous step_index renders the TRUE step number:
2187    /// BASELINE(0) + a Step at step_index 3 renders "Phase 3", not the
2188    /// enumerate-position "Phase 1". Pins index == step_index for a
2189    /// non-zero, non-contiguous label — the case that distinguishes
2190    /// step_index from the old enumerate index (a revert to enumerate
2191    /// would pass the contiguous tests but fail this one).
2192    #[test]
2193    fn format_sparse_step_index_renders_true_step_number() {
2194        let buckets = vec![
2195            crate::assert::PhaseBucket {
2196                per_cgroup: Default::default(),
2197                step_index: 0,
2198                label: "BASELINE".to_string(),
2199                start_ms: 0,
2200                end_ms: 100,
2201                sample_count: 2,
2202                metrics: std::collections::BTreeMap::new(),
2203            },
2204            crate::assert::PhaseBucket {
2205                per_cgroup: Default::default(),
2206                step_index: 3, // sparse: Steps 0/1 absent, only step_index 3
2207                label: "Step[2]".to_string(),
2208                start_ms: 200,
2209                end_ms: 300,
2210                sample_count: 2,
2211                metrics: std::collections::BTreeMap::from([(
2212                    "avg_imbalance_ratio".to_string(),
2213                    1.0,
2214                )]),
2215            },
2216        ];
2217        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
2218        let formatted = t.format_with_context(&TimelineContext::default());
2219        assert!(
2220            formatted.contains("Phase 3"),
2221            "a sparse Step at step_index 3 must render 'Phase 3' (its \
2222             step_index), not the enumerate-position 'Phase 1'; got:\n{formatted}",
2223        );
2224        assert!(
2225            !formatted.contains("Phase 1"),
2226            "the enumerate-position 'Phase 1' must NOT appear for a \
2227             step_index-3 bucket; got:\n{formatted}",
2228        );
2229    }
2230
2231    #[test]
2232    fn unsorted_events_sorted() {
2233        let events = vec![stimulus(3000, "StepStart[0]"), stimulus(0, "ScenarioStart")];
2234        let samples: Vec<MonitorSample> = (5..35)
2235            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
2236            .collect();
2237        let t = Timeline::build(&events, &samples, 0);
2238        assert_eq!(t.phases.len(), 2);
2239        // First phase should be from ScenarioStart (earliest).
2240        assert!(t.phases[0].stimulus.is_none());
2241    }
2242
2243    #[test]
2244    fn stall_detected_in_phase() {
2245        let events = vec![stimulus(0, "ScenarioStart")];
2246        let samples = vec![
2247            sample(600, vec![(1, 0, 5000), (1, 0, 6000)]),
2248            sample(700, vec![(1, 0, 5000), (1, 0, 7000)]), // cpu0 stalled
2249        ];
2250        let t = Timeline::build(&events, &samples, 0);
2251        assert_eq!(t.phases[0].metrics.stall_count, 1);
2252    }
2253
2254    #[test]
2255    fn compute_metrics_stall_count_accumulates_across_windows() {
2256        // cpu0 frozen across TWO consecutive windows -> stall_count == 2.
2257        // Pins that the per-phase path counts every stuck (CPU, window),
2258        // mirroring the run-level accumulation (both breaks removed).
2259        let s1 = sample(100, vec![(1, 0, 5000), (1, 0, 6000)]);
2260        let s2 = sample(200, vec![(1, 0, 5000), (1, 0, 6100)]);
2261        let s3 = sample(300, vec![(1, 0, 5000), (1, 0, 6200)]);
2262        let refs: Vec<&MonitorSample> = vec![&s1, &s2, &s3];
2263        let m = compute_metrics(&refs, 0);
2264        assert_eq!(m.stall_count, 2);
2265    }
2266
2267    #[test]
2268    fn run_level_stuck_count_ge_sum_of_per_phase() {
2269        // Same is_cpu_stuck predicate, different windowing domains: the
2270        // run-level path windows(2) over the FULL stream and counts the
2271        // pair straddling a phase split; partitioning the stream into
2272        // per-phase subsets drops that boundary pair. So run-level >= Σ
2273        // per-phase (strict here) — pins the documented inequality.
2274        let s1 = sample(100, vec![(1, 0, 5000), (1, 0, 6000)]);
2275        let s2 = sample(200, vec![(1, 0, 5000), (1, 0, 6100)]);
2276        let s3 = sample(300, vec![(1, 0, 5000), (1, 0, 6200)]);
2277        let samples = vec![s1, s2, s3];
2278        // Run-level: windows(2) over all 3 -> cpu0 frozen in both windows.
2279        let run_level = crate::monitor::MonitorSummary::from_samples(&samples).stuck_count;
2280        assert_eq!(run_level, 2);
2281        // Partition after s2 (phase A = [s1,s2], phase B = [s3]): the
2282        // (s2,s3) boundary pair is in neither phase's windows(2).
2283        let phase_a: Vec<&MonitorSample> = vec![&samples[0], &samples[1]];
2284        let phase_b: Vec<&MonitorSample> = vec![&samples[2]];
2285        let sum_per_phase =
2286            compute_metrics(&phase_a, 0).stall_count + compute_metrics(&phase_b, 0).stall_count;
2287        assert_eq!(
2288            sum_per_phase, 1,
2289            "the (s2,s3) boundary pair is in neither phase"
2290        );
2291        assert!(
2292            run_level > sum_per_phase,
2293            "run-level counts the boundary-straddling pair the per-phase partition drops"
2294        );
2295    }
2296
2297    #[test]
2298    fn compute_metrics_empty() {
2299        let m = compute_metrics(&[], 0);
2300        assert_eq!(m.sample_count, 0);
2301        // No samples -> no measurement, not a false 0.0 (the sentinel fix).
2302        assert_eq!(m.avg_imbalance, None);
2303        assert_eq!(m.max_imbalance, None);
2304        assert_eq!(m.avg_dsq_depth, None);
2305        assert_eq!(m.avg_nr_running, None);
2306        assert_eq!(m.max_dsq_depth, 0);
2307    }
2308
2309    #[test]
2310    fn stimulus_event_with_detail() {
2311        let e = StimulusEvent {
2312            elapsed_ms: 100,
2313            label: "StepStart[0]".to_string(),
2314            op_kind: Some("SetCpuset".to_string()),
2315            detail: Some("4 cpus".to_string()),
2316            total_iterations: None,
2317            step_index: None,
2318            is_terminal: false,
2319            is_step_end: false,
2320        };
2321        let events = vec![stimulus(0, "ScenarioStart"), e];
2322        let samples: Vec<MonitorSample> = (5..25)
2323            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
2324            .collect();
2325        let t = Timeline::build(&events, &samples, 0);
2326        let formatted = t.format_with_context(&TimelineContext::default());
2327        assert!(formatted.contains("SetCpuset"));
2328        assert!(formatted.contains("4 cpus"));
2329    }
2330
2331    #[test]
2332    fn many_phases() {
2333        let events: Vec<StimulusEvent> = (0..10)
2334            .map(|i| stimulus(i * 500, &format!("Step[{i}]")))
2335            .collect();
2336        let samples: Vec<MonitorSample> = (5..55)
2337            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
2338            .collect();
2339        let t = Timeline::build(&events, &samples, 0);
2340        assert_eq!(t.phases.len(), 10);
2341    }
2342
2343    #[test]
2344    fn phase_metrics_accuracy() {
2345        let s1 = sample(600, vec![(1, 3, 100), (4, 5, 200)]); // ratio=4, avg_dsq=4
2346        let s2 = sample(700, vec![(2, 1, 300), (2, 7, 400)]); // ratio=1, avg_dsq=4
2347        let refs: Vec<&MonitorSample> = vec![&s1, &s2];
2348        let m = compute_metrics(&refs, 0);
2349        assert_eq!(m.sample_count, 2);
2350        assert!((m.avg_imbalance.unwrap() - 2.5).abs() < 0.01); // (4+1)/2
2351        // nr_running per CPU: s1=(1,4)->per-sample mean 2.5, s2=(2,2)->mean 2.0;
2352        // averaged over the 2 samples = (2.5+2.0)/2 = 2.25.
2353        assert!((m.avg_nr_running.unwrap() - 2.25).abs() < 0.01);
2354        assert!((m.max_imbalance.unwrap() - 4.0).abs() < 0.01);
2355        assert_eq!(m.max_dsq_depth, 7);
2356    }
2357
2358    // -- ChangeDirection Display tests --
2359
2360    #[test]
2361    fn change_direction_display() {
2362        assert_eq!(format!("{}", ChangeDirection::Improved), "IMPROVEMENT");
2363        assert_eq!(format!("{}", ChangeDirection::Degraded), "DEGRADATION");
2364    }
2365
2366    // -- compute_metrics with event counters --
2367
2368    #[test]
2369    fn compute_metrics_with_event_counters() {
2370        use crate::monitor::ScxEventCounters;
2371
2372        let s1 = MonitorSample {
2373            bpf_map_fields: Vec::new(),
2374            prog_stats: None,
2375            psi_irq: None,
2376            elapsed_ms: 600,
2377            cpus: vec![CpuSnapshot {
2378                nr_running: 2,
2379                local_dsq_depth: 1,
2380                rq_clock: 100,
2381                scx_nr_running: 0,
2382                scx_flags: 0,
2383                event_counters: Some(ScxEventCounters {
2384                    select_cpu_fallback: 10,
2385                    dispatch_keep_last: 5,
2386                    ..Default::default()
2387                }),
2388                schedstat: None,
2389                vcpu_cpu_time_ns: None,
2390                vcpu_perf: None,
2391                avg_irq_util: None,
2392                sched_domains: None,
2393            }],
2394        };
2395        let s2 = MonitorSample {
2396            bpf_map_fields: Vec::new(),
2397            prog_stats: None,
2398            psi_irq: None,
2399            elapsed_ms: 1600,
2400            cpus: vec![CpuSnapshot {
2401                nr_running: 2,
2402                local_dsq_depth: 1,
2403                rq_clock: 200,
2404                scx_nr_running: 0,
2405                scx_flags: 0,
2406                event_counters: Some(ScxEventCounters {
2407                    select_cpu_fallback: 110,
2408                    dispatch_keep_last: 55,
2409                    ..Default::default()
2410                }),
2411                schedstat: None,
2412                vcpu_cpu_time_ns: None,
2413                vcpu_perf: None,
2414                avg_irq_util: None,
2415                sched_domains: None,
2416            }],
2417        };
2418        let refs: Vec<&MonitorSample> = vec![&s1, &s2];
2419        let m = compute_metrics(&refs, 0);
2420        // fallback delta: 110 - 10 = 100 over 1.0s = 100.0/s
2421        assert!((m.fallback_rate.unwrap() - 100.0).abs() < 0.01);
2422        // keep_last delta: 55 - 5 = 50 over 1.0s = 50.0/s
2423        assert!((m.keep_last_rate.unwrap() - 50.0).abs() < 0.01);
2424    }
2425
2426    #[test]
2427    fn compute_metrics_no_event_counters() {
2428        let s1 = sample(600, vec![(2, 1, 100)]);
2429        let s2 = sample(700, vec![(2, 1, 200)]);
2430        let refs: Vec<&MonitorSample> = vec![&s1, &s2];
2431        let m = compute_metrics(&refs, 0);
2432        assert!(m.fallback_rate.is_none());
2433        assert!(m.keep_last_rate.is_none());
2434    }
2435
2436    #[test]
2437    fn compute_metrics_counter_reset_clamps_rates_to_non_negative() {
2438        // A scheduler restart between samples resets event counters
2439        // to smaller (or zero) values. Raw `last - first` then
2440        // produces a negative delta, which would flow into
2441        // `fallback_rate = delta / duration` and report a negative
2442        // rate. The shared counter_delta helper clamps to 0.
2443        use crate::monitor::ScxEventCounters;
2444
2445        let s1 = MonitorSample {
2446            bpf_map_fields: Vec::new(),
2447            prog_stats: None,
2448            psi_irq: None,
2449            elapsed_ms: 0,
2450            cpus: vec![CpuSnapshot {
2451                nr_running: 2,
2452                local_dsq_depth: 1,
2453                rq_clock: 100,
2454                scx_nr_running: 0,
2455                scx_flags: 0,
2456                event_counters: Some(ScxEventCounters {
2457                    select_cpu_fallback: 1000,
2458                    dispatch_keep_last: 500,
2459                    ..Default::default()
2460                }),
2461                schedstat: None,
2462                vcpu_cpu_time_ns: None,
2463                vcpu_perf: None,
2464                avg_irq_util: None,
2465                sched_domains: None,
2466            }],
2467        };
2468        let s2 = MonitorSample {
2469            bpf_map_fields: Vec::new(),
2470            prog_stats: None,
2471            psi_irq: None,
2472            elapsed_ms: 1000,
2473            cpus: vec![CpuSnapshot {
2474                nr_running: 2,
2475                local_dsq_depth: 1,
2476                rq_clock: 200,
2477                scx_nr_running: 0,
2478                scx_flags: 0,
2479                event_counters: Some(ScxEventCounters {
2480                    select_cpu_fallback: 5,
2481                    dispatch_keep_last: 2,
2482                    ..Default::default()
2483                }),
2484                schedstat: None,
2485                vcpu_cpu_time_ns: None,
2486                vcpu_perf: None,
2487                avg_irq_util: None,
2488                sched_domains: None,
2489            }],
2490        };
2491        let refs: Vec<&MonitorSample> = vec![&s1, &s2];
2492        let m = compute_metrics(&refs, 0);
2493        let fb = m.fallback_rate.expect("reset still produces Some rate");
2494        let kl = m.keep_last_rate.expect("reset still produces Some rate");
2495        assert!(
2496            fb >= 0.0,
2497            "reset must not produce negative fallback_rate, got {fb}"
2498        );
2499        assert!(
2500            kl >= 0.0,
2501            "reset must not produce negative keep_last_rate, got {kl}"
2502        );
2503    }
2504
2505    // -- format with stalls --
2506
2507    #[test]
2508    fn format_with_stalls_shown() {
2509        let events = vec![stimulus(0, "ScenarioStart")];
2510        let samples = vec![
2511            sample(600, vec![(1, 0, 5000), (1, 0, 6000)]),
2512            sample(700, vec![(1, 0, 5000), (1, 0, 7000)]), // cpu0 stalled
2513        ];
2514        let t = Timeline::build(&events, &samples, 0);
2515        let formatted = t.format_with_context(&TimelineContext::default());
2516        assert!(formatted.contains("stalls: 1"));
2517    }
2518
2519    // -- format with no samples in a phase --
2520
2521    #[test]
2522    fn format_phase_no_samples() {
2523        // Create a phase with no samples by making a phase boundary far
2524        // beyond the last monitor sample's time.
2525        let events = vec![
2526            stimulus(0, "ScenarioStart"),
2527            stimulus(100, "StepStart[0]"),
2528            stimulus(50000, "StepStart[1]"),
2529        ];
2530        // All samples are in the middle phase window.
2531        let samples: Vec<MonitorSample> = (5..15)
2532            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
2533            .collect();
2534        let t = Timeline::build(&events, &samples, 0);
2535        let formatted = t.format_with_context(&TimelineContext::default());
2536        // The last phase (50000+offset to end) should have no samples.
2537        assert!(formatted.contains("[no samples]"));
2538    }
2539
2540    // -- timeline with fallback rate change detection --
2541
2542    #[test]
2543    fn fallback_rate_degradation_detected() {
2544        use crate::monitor::ScxEventCounters;
2545
2546        let events = vec![stimulus(0, "ScenarioStart"), stimulus(1000, "StepStart[0]")];
2547        let mut samples = Vec::new();
2548        // Phase 0: zero fallback rate (counter stays constant).
2549        for i in 5..15 {
2550            samples.push(MonitorSample {
2551                bpf_map_fields: Vec::new(),
2552                prog_stats: None,
2553                psi_irq: None,
2554                elapsed_ms: i * 100,
2555                cpus: vec![CpuSnapshot {
2556                    nr_running: 2,
2557                    local_dsq_depth: 1,
2558                    rq_clock: i * 1000,
2559                    scx_nr_running: 0,
2560                    scx_flags: 0,
2561                    event_counters: Some(ScxEventCounters {
2562                        select_cpu_fallback: 0,
2563                        dispatch_keep_last: 0,
2564                        ..Default::default()
2565                    }),
2566                    schedstat: None,
2567                    vcpu_cpu_time_ns: None,
2568                    vcpu_perf: None,
2569                    avg_irq_util: None,
2570                    sched_domains: None,
2571                }],
2572            });
2573        }
2574        // Phase 1: very high fallback rate.
2575        // 10 samples over 1s. Counter goes from 0 to 500.
2576        // Rate = 500/1.0 = 500/s, well above threshold 10.0.
2577        for i in 15..25 {
2578            samples.push(MonitorSample {
2579                bpf_map_fields: Vec::new(),
2580                prog_stats: None,
2581                psi_irq: None,
2582                elapsed_ms: i * 100,
2583                cpus: vec![CpuSnapshot {
2584                    nr_running: 2,
2585                    local_dsq_depth: 1,
2586                    rq_clock: i * 1000,
2587                    scx_nr_running: 0,
2588                    scx_flags: 0,
2589                    event_counters: Some(ScxEventCounters {
2590                        select_cpu_fallback: (i as i64 - 15) * 50,
2591                        dispatch_keep_last: 0,
2592                        ..Default::default()
2593                    }),
2594                    schedstat: None,
2595                    vcpu_cpu_time_ns: None,
2596                    vcpu_perf: None,
2597                    avg_irq_util: None,
2598                    sched_domains: None,
2599                }],
2600            });
2601        }
2602        let t = Timeline::build(&events, &samples, 0);
2603        let degs: Vec<_> = t
2604            .degradations()
2605            .into_iter()
2606            .filter(|(_, c)| c.metric == "fallback")
2607            .collect();
2608        assert!(!degs.is_empty());
2609    }
2610
2611    // -- format_with_context tests --
2612
2613    #[test]
2614    fn format_with_context_includes_header() {
2615        let events = vec![stimulus(0, "ScenarioStart")];
2616        let samples = vec![
2617            sample(600, vec![(2, 1, 100), (2, 1, 200)]),
2618            sample(700, vec![(2, 1, 300), (2, 1, 400)]),
2619        ];
2620        let t = Timeline::build(&events, &samples, 0);
2621        let ctx = TimelineContext {
2622            kernel: Some("6.14.0-rc3+".to_string()),
2623            topology: Some("2n4l4c2t (16 cpus)".to_string()),
2624            scheduler: Some("scx_mitosis".to_string()),
2625            scenario: Some("proportional".to_string()),
2626            duration_s: Some(20.5),
2627        };
2628        let formatted = t.format_with_context(&ctx);
2629        assert!(formatted.contains("--- timeline ---"));
2630        assert!(formatted.contains("kernel: 6.14.0-rc3+"));
2631        assert!(formatted.contains("topology: 2n4l4c2t (16 cpus)"));
2632        assert!(formatted.contains("scheduler: scx_mitosis"));
2633        assert!(formatted.contains("scenario: proportional"));
2634        assert!(formatted.contains("duration: 20.5s"));
2635        assert!(formatted.contains("BASELINE"));
2636    }
2637
2638    #[test]
2639    fn format_with_context_partial_fields() {
2640        let events = vec![stimulus(0, "ScenarioStart")];
2641        let samples = vec![sample(600, vec![(2, 1, 100)])];
2642        let t = Timeline::build(&events, &samples, 0);
2643        let ctx = TimelineContext {
2644            kernel: None,
2645            topology: Some("1n1l1c1t (1 cpus)".to_string()),
2646            scheduler: None,
2647            scenario: Some("basic".to_string()),
2648            duration_s: None,
2649        };
2650        let formatted = t.format_with_context(&ctx);
2651        assert!(formatted.contains("topology: 1n1l1c1t"));
2652        assert!(formatted.contains("scenario: basic"));
2653        assert!(!formatted.contains("kernel:"));
2654        assert!(!formatted.contains("scheduler:"));
2655        assert!(!formatted.contains("duration:"));
2656    }
2657
2658    #[test]
2659    fn format_with_context_empty_timeline() {
2660        let t = Timeline { phases: vec![] };
2661        let ctx = TimelineContext {
2662            kernel: Some("6.14.0".to_string()),
2663            ..Default::default()
2664        };
2665        assert!(t.format_with_context(&ctx).is_empty());
2666    }
2667
2668    #[test]
2669    fn format_with_context_empty_context() {
2670        let events = vec![stimulus(0, "ScenarioStart")];
2671        let samples = vec![sample(600, vec![(2, 1, 100)])];
2672        let t = Timeline::build(&events, &samples, 0);
2673        let ctx = TimelineContext::default();
2674        let formatted = t.format_with_context(&ctx);
2675        // Should have the timeline header and phases but no context line.
2676        assert!(formatted.contains("--- timeline ---"));
2677        assert!(formatted.contains("BASELINE"));
2678        // The line after "--- timeline ---\n" should be "\nBASELINE" (no context line).
2679        let after_header = &formatted["--- timeline ---\n".len()..];
2680        assert!(after_header.starts_with('\n'));
2681    }
2682
2683    #[test]
2684    fn garbage_dsq_samples_filtered_from_metrics() {
2685        // Samples with DSQ depth above DSQ_PLAUSIBILITY_CEILING should be
2686        // excluded from phase metrics (the bug: garbage values like 1.5B
2687        // were flowing into timeline output).
2688        let events = vec![stimulus(0, "ScenarioStart")];
2689        let garbage_dsq = 1_550_435_906u32;
2690        let samples = vec![
2691            // Garbage sample (DSQ above ceiling).
2692            MonitorSample {
2693                bpf_map_fields: Vec::new(),
2694                prog_stats: None,
2695                psi_irq: None,
2696                elapsed_ms: 600,
2697                cpus: vec![CpuSnapshot {
2698                    nr_running: 1,
2699                    local_dsq_depth: garbage_dsq,
2700                    rq_clock: 1000,
2701                    ..Default::default()
2702                }],
2703            },
2704            // Valid sample.
2705            sample(700, vec![(2, 3, 2000)]),
2706        ];
2707        let t = Timeline::build(&events, &samples, 0);
2708        assert_eq!(t.phases.len(), 1);
2709        // Only the valid sample should be counted.
2710        assert_eq!(t.phases[0].metrics.sample_count, 1);
2711        assert_eq!(t.phases[0].metrics.max_dsq_depth, 3);
2712    }
2713
2714    #[test]
2715    fn all_garbage_samples_yield_no_metrics() {
2716        let events = vec![stimulus(0, "ScenarioStart")];
2717        let samples = vec![MonitorSample {
2718            bpf_map_fields: Vec::new(),
2719            prog_stats: None,
2720            psi_irq: None,
2721            elapsed_ms: 600,
2722            cpus: vec![CpuSnapshot {
2723                nr_running: 1,
2724                local_dsq_depth: 50_000,
2725                rq_clock: 1000,
2726                ..Default::default()
2727            }],
2728        }];
2729        let t = Timeline::build(&events, &samples, 0);
2730        assert_eq!(t.phases[0].metrics.sample_count, 0);
2731    }
2732
2733    // ---------------------------------------------------------------
2734    // Negative test: timeline detects degradation at phase transition
2735    // ---------------------------------------------------------------
2736
2737    #[test]
2738    fn neg_timeline_detects_imbalance_degradation() {
2739        let events = vec![stimulus(0, "ScenarioStart"), stimulus(2000, "StepStart[0]")];
2740        let mut samples = Vec::new();
2741        for i in 6..25 {
2742            samples.push(sample(
2743                i * 100,
2744                vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
2745            ));
2746        }
2747        for i in 26..45 {
2748            samples.push(sample(
2749                i * 100,
2750                vec![(1, 1, i * 1000), (10, 1, i * 1000 + 100)],
2751            ));
2752        }
2753        let t = Timeline::build(&events, &samples, 0);
2754        assert_eq!(t.phases.len(), 2, "must have 2 phases");
2755        assert!(!t.degradations().is_empty());
2756
2757        // Phase 0 (baseline) must have samples and reasonable metrics.
2758        assert!(
2759            t.phases[0].metrics.sample_count > 0,
2760            "baseline must have samples"
2761        );
2762        assert!(
2763            (t.phases[0].metrics.avg_imbalance.unwrap() - 1.0).abs() < 0.5,
2764            "baseline imbalance should be ~1.0, got {:?}",
2765            t.phases[0].metrics.avg_imbalance,
2766        );
2767
2768        // Phase 1 must have the stimulus label and degradation.
2769        assert!(
2770            t.phases[1].metrics.sample_count > 0,
2771            "phase 1 must have samples"
2772        );
2773        assert!(
2774            t.phases[1]
2775                .stimulus
2776                .as_ref()
2777                .is_some_and(|s| s.label == "StepStart[0]"),
2778            "phase 1 stimulus must be StepStart[0]",
2779        );
2780
2781        let degs = t.degradations();
2782        assert!(!degs.is_empty());
2783        let (phase, change) = &degs[0];
2784        assert_eq!(phase.index, 1);
2785        assert_eq!(change.metric, "imbalance");
2786        assert_eq!(change.direction, ChangeDirection::Degraded);
2787        let delta = change.after - change.before;
2788        assert!(delta > 0.0, "delta must be positive for degradation");
2789        assert!(
2790            delta > IMBALANCE_THRESHOLD,
2791            "delta {:.1} must exceed threshold {:.1}",
2792            delta,
2793            IMBALANCE_THRESHOLD
2794        );
2795        assert!(
2796            change.before < 2.0,
2797            "before should be low: {:.1}",
2798            change.before
2799        );
2800        assert!(
2801            change.after > 5.0,
2802            "after should be high: {:.1}",
2803            change.after
2804        );
2805
2806        // Format output must be parseable.
2807        let formatted = t.format_with_context(&TimelineContext::default());
2808        assert!(
2809            formatted.contains("BASELINE"),
2810            "format must include BASELINE phase"
2811        );
2812        assert!(formatted.contains("Phase 1"), "format must include Phase 1");
2813        assert!(
2814            formatted.contains("DEGRADATION"),
2815            "format must include DEGRADATION label"
2816        );
2817        assert!(
2818            formatted.contains("imbalance"),
2819            "format must name the metric"
2820        );
2821    }
2822
2823    #[test]
2824    fn neg_timeline_detects_dsq_depth_degradation() {
2825        let events = vec![stimulus(0, "ScenarioStart"), stimulus(2000, "StepStart[0]")];
2826        let mut samples = Vec::new();
2827        for i in 6..25 {
2828            samples.push(sample(
2829                i * 100,
2830                vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
2831            ));
2832        }
2833        for i in 26..45 {
2834            samples.push(sample(
2835                i * 100,
2836                vec![(2, 20, i * 1000), (2, 20, i * 1000 + 100)],
2837            ));
2838        }
2839        let t = Timeline::build(&events, &samples, 0);
2840        assert!(
2841            !t.degradations().is_empty(),
2842            "DSQ depth jump must be detected"
2843        );
2844        let degs = t.degradations();
2845        let dsq_deg = degs.iter().find(|(_, c)| c.metric == "dsq_depth");
2846        assert!(dsq_deg.is_some(), "must detect dsq_depth degradation");
2847        let (phase, change) = dsq_deg.unwrap();
2848        assert_eq!(phase.index, 1);
2849        assert_eq!(change.direction, ChangeDirection::Degraded);
2850        let delta = change.after - change.before;
2851        assert!(
2852            delta > DSQ_THRESHOLD,
2853            "dsq delta {:.1} must exceed threshold {:.1}",
2854            delta,
2855            DSQ_THRESHOLD
2856        );
2857        assert!(
2858            change.before < 5.0,
2859            "before dsq should be low: {:.1}",
2860            change.before
2861        );
2862        assert!(
2863            change.after > 15.0,
2864            "after dsq should be high: {:.1}",
2865            change.after
2866        );
2867
2868        let formatted = t.format_with_context(&TimelineContext::default());
2869        assert!(
2870            formatted.contains("dsq_depth"),
2871            "format must name dsq_depth"
2872        );
2873        assert!(
2874            formatted.contains("DEGRADATION"),
2875            "format must label degradation"
2876        );
2877    }
2878
2879    #[test]
2880    fn neg_timeline_no_degradation_when_stable() {
2881        let events = vec![stimulus(0, "ScenarioStart"), stimulus(2000, "StepStart[0]")];
2882        let mut samples = Vec::new();
2883        for i in 6..45 {
2884            samples.push(sample(
2885                i * 100,
2886                vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
2887            ));
2888        }
2889        let t = Timeline::build(&events, &samples, 0);
2890        assert_eq!(t.phases.len(), 2, "must have 2 phases");
2891        assert!(t.phases[0].metrics.sample_count > 0);
2892        assert!(t.phases[1].metrics.sample_count > 0);
2893        assert!(
2894            t.degradations().is_empty(),
2895            "stable phases must not show degradation"
2896        );
2897        assert!(t.degradations().is_empty());
2898        // All phase changes should be empty.
2899        for phase in &t.phases {
2900            assert!(
2901                phase.changes.is_empty(),
2902                "phase {} should have no changes",
2903                phase.index
2904            );
2905        }
2906    }
2907
2908    // -- detect_change direct tests --
2909
2910    #[test]
2911    fn detect_change_higher_is_worse_positive_delta_degraded() {
2912        let c = detect_change(1.0, 5.0, 0.5, "imbalance", true).unwrap();
2913        assert_eq!(c.direction, ChangeDirection::Degraded);
2914        assert_eq!(c.metric, "imbalance");
2915        assert!((c.before - 1.0).abs() < f64::EPSILON);
2916        assert!((c.after - 5.0).abs() < f64::EPSILON);
2917    }
2918
2919    #[test]
2920    fn detect_change_higher_is_worse_negative_delta_improved() {
2921        let c = detect_change(5.0, 1.0, 0.5, "imbalance", true).unwrap();
2922        assert_eq!(c.direction, ChangeDirection::Improved);
2923    }
2924
2925    #[test]
2926    fn detect_change_lower_is_worse_negative_delta_degraded() {
2927        let c = detect_change(100.0, 50.0, 10.0, "throughput", false).unwrap();
2928        assert_eq!(c.direction, ChangeDirection::Degraded);
2929    }
2930
2931    #[test]
2932    fn detect_change_lower_is_worse_positive_delta_improved() {
2933        let c = detect_change(50.0, 100.0, 10.0, "throughput", false).unwrap();
2934        assert_eq!(c.direction, ChangeDirection::Improved);
2935    }
2936
2937    #[test]
2938    fn detect_change_below_threshold_returns_none() {
2939        assert!(detect_change(1.0, 1.3, 0.5, "imbalance", true).is_none());
2940    }
2941
2942    #[test]
2943    fn detect_change_exactly_at_threshold_returns_none() {
2944        assert!(detect_change(1.0, 1.5, 0.5, "imbalance", true).is_none());
2945    }
2946
2947    // -- detect_boundary_changes: throughput across synthesized steps --
2948
2949    /// Throughput is flagged across a SYNTHESIZED zero-capture phase
2950    /// (sample_count 0) — its iteration_rate is stimulus-derived and
2951    /// real — while monitor-derived metrics stay gated on samples, so
2952    /// the synthesized side never paints a phantom imbalance change.
2953    /// This is the collapse-suppression invariant: a throughput collapse entering or
2954    /// leaving a capture-free step must not be silently dropped.
2955    #[test]
2956    fn detect_boundary_changes_flags_throughput_across_synthesized_phase() {
2957        let before = PhaseMetrics {
2958            sample_count: 30,
2959            iteration_rate: Some(1000.0),
2960            avg_imbalance: Some(1.0),
2961            ..Default::default()
2962        };
2963        let after = PhaseMetrics {
2964            sample_count: 0,             // synthesized zero-capture step
2965            iteration_rate: Some(300.0), // 70% collapse
2966            avg_imbalance: Some(99.0),   // partial/default — must be ignored
2967            ..Default::default()
2968        };
2969        let changes = detect_boundary_changes(&before, &after);
2970        let throughput: Vec<_> = changes
2971            .iter()
2972            .filter(|c| c.metric == "throughput")
2973            .collect();
2974        assert_eq!(
2975            throughput.len(),
2976            1,
2977            "throughput collapse across a synthesized step must be flagged: {changes:?}",
2978        );
2979        assert_eq!(throughput[0].direction, ChangeDirection::Degraded);
2980        assert!(
2981            !changes.iter().any(|c| c.metric == "imbalance"),
2982            "monitor metrics must stay gated when a side has 0 samples: {changes:?}",
2983        );
2984    }
2985
2986    /// The monitor-metric gate only suppresses a ZERO-sample side: when
2987    /// both phases captured samples, monitor-derived changes still
2988    /// surface. Guards the collapse suppression from over-suppressing the normal
2989    /// captured-to-captured boundary.
2990    #[test]
2991    fn detect_boundary_changes_reports_monitor_metrics_when_both_sampled() {
2992        let before = PhaseMetrics {
2993            sample_count: 30,
2994            iteration_rate: Some(1000.0),
2995            avg_imbalance: Some(1.0),
2996            ..Default::default()
2997        };
2998        let after = PhaseMetrics {
2999            sample_count: 30,
3000            iteration_rate: Some(1000.0), // unchanged throughput
3001            avg_imbalance: Some(5.0),     // imbalance jump > IMBALANCE_THRESHOLD
3002            ..Default::default()
3003        };
3004        let changes = detect_boundary_changes(&before, &after);
3005        assert!(
3006            changes.iter().any(|c| c.metric == "imbalance"),
3007            "imbalance change must surface when both sides sampled: {changes:?}",
3008        );
3009        assert!(
3010            !changes.iter().any(|c| c.metric == "throughput"),
3011            "unchanged throughput must not be flagged: {changes:?}",
3012        );
3013    }
3014
3015    /// A per-phase avg_nr_running (mean runqueue depth) rise above
3016    /// NR_RUNNING_THRESHOLD between two sampled phases is flagged as a
3017    /// degradation (higher runqueue depth = more contention). Gated on both
3018    /// phases having real samples, like the other monitor-derived gauges.
3019    #[test]
3020    fn detect_boundary_changes_flags_nr_running_jump() {
3021        let before = PhaseMetrics {
3022            sample_count: 30,
3023            avg_nr_running: Some(1.0),
3024            ..Default::default()
3025        };
3026        let after = PhaseMetrics {
3027            sample_count: 30,
3028            avg_nr_running: Some(3.0), // +2.0 > NR_RUNNING_THRESHOLD (1.0)
3029            ..Default::default()
3030        };
3031        let changes = detect_boundary_changes(&before, &after);
3032        let nr: Vec<_> = changes
3033            .iter()
3034            .filter(|c| c.metric == "nr_running")
3035            .collect();
3036        assert_eq!(
3037            nr.len(),
3038            1,
3039            "avg_nr_running jump must be flagged: {changes:?}"
3040        );
3041        assert_eq!(
3042            nr[0].direction,
3043            ChangeDirection::Degraded,
3044            "rising runqueue depth is a degradation",
3045        );
3046        // A zero-sample side suppresses it (same gate as the sibling gauges).
3047        let after_synth = PhaseMetrics {
3048            sample_count: 0,
3049            avg_nr_running: Some(3.0),
3050            ..Default::default()
3051        };
3052        assert!(
3053            !detect_boundary_changes(&before, &after_synth)
3054                .iter()
3055                .any(|c| c.metric == "nr_running"),
3056            "nr_running must stay gated when a side has 0 samples",
3057        );
3058    }
3059
3060    /// The per-phase timeline narrative renders avg_nr_running (`nr_run: avg=`)
3061    /// alongside imbalance + dsq for a sampled phase.
3062    #[test]
3063    fn format_phases_renders_avg_nr_running() {
3064        // Two CPUs with nr_running 2 and 4 -> per-sample mean 3.0.
3065        let samples: Vec<MonitorSample> = (5..25)
3066            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (4, 1, i * 1000 + 100)]))
3067            .collect();
3068        let events = vec![stimulus(0, "ScenarioStart")];
3069        let t = Timeline::build(&events, &samples, 0);
3070        let formatted = t.format_with_context(&TimelineContext::default());
3071        assert!(
3072            formatted.contains("nr_run: avg="),
3073            "per-phase narrative must render avg_nr_running: {formatted}",
3074        );
3075    }
3076
3077    /// Throughput uses a STRICT relative threshold: a change of exactly
3078    /// ITERATION_RATE_REL_THRESHOLD (30%) is NOT flagged. Pins the `>`
3079    /// boundary so a future `>` -> `>=` flip is caught (the absolute
3080    /// detect_change gate is a different code path, tested separately).
3081    #[test]
3082    fn detect_boundary_changes_throughput_exactly_at_threshold_not_flagged() {
3083        let before = PhaseMetrics {
3084            sample_count: 30,
3085            iteration_rate: Some(1000.0),
3086            ..Default::default()
3087        };
3088        let after = PhaseMetrics {
3089            sample_count: 30,
3090            iteration_rate: Some(700.0), // rel = -0.3 exactly
3091            ..Default::default()
3092        };
3093        assert_eq!((700.0 - 1000.0) / 1000.0, -ITERATION_RATE_REL_THRESHOLD);
3094        assert!(
3095            !detect_boundary_changes(&before, &after)
3096                .iter()
3097                .any(|c| c.metric == "throughput"),
3098            "an exactly-30% relative change must not flag (strict >)",
3099        );
3100    }
3101
3102    /// synthesized -> synthesized boundary (BOTH sides sample_count 0,
3103    /// both carrying a real stimulus-derived rate): throughput is still
3104    /// compared (the gate is symmetric and sample_count-independent for
3105    /// throughput) and monitor metrics stay suppressed on both sides.
3106    /// Pins the zero->zero cell of the transition matrix against a future
3107    /// edit that re-couples throughput to a per-side sample_count check.
3108    #[test]
3109    fn detect_boundary_changes_synthesized_to_synthesized_flags_throughput() {
3110        let before = PhaseMetrics {
3111            sample_count: 0,
3112            iteration_rate: Some(1000.0),
3113            avg_imbalance: Some(1.0),
3114            ..Default::default()
3115        };
3116        let after = PhaseMetrics {
3117            sample_count: 0,
3118            iteration_rate: Some(300.0), // 70% collapse
3119            avg_imbalance: Some(99.0),   // wild — must stay gated
3120            ..Default::default()
3121        };
3122        let changes = detect_boundary_changes(&before, &after);
3123        let throughput: Vec<_> = changes
3124            .iter()
3125            .filter(|c| c.metric == "throughput")
3126            .collect();
3127        assert_eq!(
3128            throughput.len(),
3129            1,
3130            "zero->zero throughput collapse must flag: {changes:?}"
3131        );
3132        assert_eq!(throughput[0].direction, ChangeDirection::Degraded);
3133        assert!(
3134            !changes.iter().any(|c| c.metric == "imbalance"),
3135            "monitor metrics gated on both zero-sample sides: {changes:?}",
3136        );
3137    }
3138
3139    // -- iteration_rate computation tests --
3140
3141    fn stimulus_with_iters(elapsed_ms: u64, label: &str, total_iterations: u64) -> StimulusEvent {
3142        StimulusEvent {
3143            elapsed_ms,
3144            label: label.to_string(),
3145            op_kind: None,
3146            detail: None,
3147            total_iterations: Some(total_iterations),
3148            step_index: None,
3149            is_terminal: false,
3150            is_step_end: false,
3151        }
3152    }
3153
3154    #[test]
3155    fn iteration_rate_computed_from_consecutive_events() {
3156        // Two events with total_iterations: phase 0 spans 0..3000ms
3157        // (aligned). iterations: 0 -> 3000 over ~3s = 1000 iter/s.
3158        let events = vec![
3159            stimulus_with_iters(0, "ScenarioStart", 0),
3160            stimulus_with_iters(3000, "StepStart[0]", 3000),
3161        ];
3162        let samples: Vec<MonitorSample> = (5..35)
3163            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3164            .collect();
3165        let t = Timeline::build(&events, &samples, 0);
3166        assert_eq!(t.phases.len(), 2);
3167        let rate = t.phases[0].metrics.iteration_rate;
3168        assert!(rate.is_some(), "phase 0 should have iteration_rate");
3169        let r = rate.unwrap();
3170        // Duration is phase boundary difference, not exactly 3s due to
3171        // clock alignment offset. Check that the rate is reasonable.
3172        assert!(r > 500.0 && r < 2000.0, "rate {r} outside expected range");
3173    }
3174
3175    #[test]
3176    fn iteration_rate_none_without_total_iterations() {
3177        // Events without total_iterations: iteration_rate should be None.
3178        let events = vec![stimulus(0, "ScenarioStart"), stimulus(3000, "StepStart[0]")];
3179        let samples: Vec<MonitorSample> = (5..35)
3180            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3181            .collect();
3182        let t = Timeline::build(&events, &samples, 0);
3183        assert!(t.phases[0].metrics.iteration_rate.is_none());
3184        assert!(t.phases[1].metrics.iteration_rate.is_none());
3185    }
3186
3187    /// Build a wire `StimulusEvent` so tests can drive the FULL
3188    /// `from_wire` path (the production conversion) rather than
3189    /// constructing the timeline event directly — the latter bypassed
3190    /// the `total_iterations == 0` sentinel.
3191    fn wire_event(
3192        elapsed_ms: u32,
3193        step_index: u16,
3194        total_iterations: u64,
3195    ) -> crate::vmm::wire::StimulusEvent {
3196        crate::vmm::wire::StimulusEvent {
3197            elapsed_ms,
3198            step_index,
3199            op_count: 0,
3200            op_kinds: 0,
3201            cgroup_count: 0,
3202            worker_count: 1,
3203            total_iterations,
3204        }
3205    }
3206
3207    #[test]
3208    fn from_wire_zero_iterations_is_some_baseline() {
3209        // total_iterations is a cumulative counter, so a
3210        // start-of-window 0 is a legitimate baseline, NOT a missing
3211        // sample. from_wire must carry Some(0), never collapse it to
3212        // None.
3213        let te = StimulusEvent::from_wire(&wire_event(0, 1, 0));
3214        assert_eq!(te.total_iterations, Some(0));
3215        assert_eq!(te.step_index, Some(1));
3216        assert!(!te.is_terminal);
3217        assert!(
3218            !te.is_step_end,
3219            "a StepStart-derived event is not a StepEnd"
3220        );
3221    }
3222
3223    #[test]
3224    fn from_step_end_carries_step_index_and_marks_step_end() {
3225        // A StepEnd frame reuses the StimulusEvent wire body.
3226        // from_step_end must carry the same 1-indexed step_index and the
3227        // step's end-of-hold total_iterations, flag is_step_end, and leave
3228        // is_terminal off (it is a real per-step boundary, not the
3229        // scenario terminal).
3230        let te = StimulusEvent::from_step_end(&wire_event(1_900, 1, 9_000));
3231        assert_eq!(te.step_index, Some(1));
3232        assert_eq!(te.total_iterations, Some(9_000));
3233        assert!(te.is_step_end, "StepEnd-derived event must set is_step_end");
3234        assert!(
3235            !te.is_terminal,
3236            "StepEnd is a per-step boundary, not the scenario terminal",
3237        );
3238    }
3239
3240    #[test]
3241    fn from_wire_first_step_zero_baseline_yields_rate() {
3242        // First-step zero-baseline regression, driven through the FULL from_wire path
3243        // (unit tests previously injected Some(0) directly, masking the
3244        // wire 0->None collapse). First step frame reads 0 cumulative
3245        // iterations, the second reads 3000; the first phase must get a
3246        // rate rather than a silent None.
3247        let events: Vec<StimulusEvent> = [wire_event(0, 1, 0), wire_event(3000, 2, 3000)]
3248            .iter()
3249            .map(StimulusEvent::from_wire)
3250            .collect();
3251        let samples: Vec<MonitorSample> = (5..35)
3252            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3253            .collect();
3254        let t = Timeline::build(&events, &samples, 0);
3255        assert!(
3256            t.phases[0].metrics.iteration_rate.is_some(),
3257            "first phase must get a rate from the 0 baseline",
3258        );
3259    }
3260
3261    #[test]
3262    fn terminal_event_gives_last_step_rate_without_phantom_phase() {
3263        // The last step has no successor step event, so its
3264        // iteration_rate needs the terminal scenario-end boundary. The
3265        // terminal must supply that boundary WITHOUT adding a phantom
3266        // trailing phase.
3267        let mut events: Vec<StimulusEvent> = [wire_event(0, 1, 0), wire_event(2000, 2, 4000)]
3268            .iter()
3269            .map(StimulusEvent::from_wire)
3270            .collect();
3271        events.push(StimulusEvent::terminal(4000, 10000));
3272        let samples: Vec<MonitorSample> = (5..45)
3273            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3274            .collect();
3275        let t = Timeline::build(&events, &samples, 0);
3276        assert_eq!(
3277            t.phases.len(),
3278            2,
3279            "two step events -> two phases; terminal seeds none",
3280        );
3281        assert!(
3282            t.phases[1].metrics.iteration_rate.is_some(),
3283            "last step must get a rate from the terminal boundary",
3284        );
3285    }
3286
3287    #[test]
3288    fn build_filters_step_end_events_no_phantom_phase() {
3289        // A StepEnd must be filtered from the PHASE-LAYOUT set
3290        // (it is an end-of-hold marker, not a step boundary) so it neither
3291        // adds a phantom phase nor misaligns the dense phase index. Two
3292        // StepStart events with an interleaved StepEnd still yield exactly
3293        // two phases. (StepEnd is still consumed for the step-local RATE —
3294        // see build_pairs_step_local_when_step_end_events_present.)
3295        let events: Vec<StimulusEvent> = vec![
3296            StimulusEvent::from_wire(&wire_event(0, 1, 0)),
3297            StimulusEvent::from_step_end(&wire_event(1_900, 1, 9_000)),
3298            StimulusEvent::from_wire(&wire_event(2_000, 2, 9_000)),
3299        ];
3300        let samples: Vec<MonitorSample> = (5..35)
3301            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3302            .collect();
3303        let t = Timeline::build(&events, &samples, 0);
3304        assert_eq!(
3305            t.phases.len(),
3306            2,
3307            "two StepStart events -> two phases; the interleaved StepEnd seeds none",
3308        );
3309    }
3310
3311    #[test]
3312    fn build_pairs_step_local_when_step_end_events_present() {
3313        // The monitor-only Timeline::build fallback must ALSO
3314        // use step-local StepStart[k] -> StepEnd[k] pairing when StepEnd
3315        // events are present (they are emitted independent of snapshot
3316        // captures), NOT the cross-step StepStart[k] -> StepStart[k+1]
3317        // pairing that reads 0 -> 0 for respawned-per-step workers. Two
3318        // fresh-per-step steps (each StepStart reads ~0); without
3319        // step-local pairing phase 0 would be None (0 -> 0 cross-step).
3320        // With it, both phases get a positive rate.
3321        let events: Vec<StimulusEvent> = vec![
3322            StimulusEvent::from_wire(&wire_event(0, 1, 0)), // StepStart[0], iters 0
3323            StimulusEvent::from_step_end(&wire_event(1_000, 1, 5_000)), // StepEnd[0], iters 5000
3324            StimulusEvent::from_wire(&wire_event(1_100, 2, 0)), // StepStart[1] respawned, iters 0
3325            StimulusEvent::from_step_end(&wire_event(2_100, 2, 3_000)), // StepEnd[1], iters 3000
3326        ];
3327        let samples: Vec<MonitorSample> = (1..30)
3328            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3329            .collect();
3330        let t = Timeline::build(&events, &samples, 0);
3331        assert_eq!(
3332            t.phases.len(),
3333            2,
3334            "two StepStart events -> two phases (each StepEnd seeds none)",
3335        );
3336        assert!(
3337            t.phases[0].metrics.iteration_rate.is_some(),
3338            "phase 0 must get a step-local rate from StepStart[0] -> StepEnd[0], \
3339             not the cross-step 0 -> 0 None (the old cross-step fallback bug)",
3340        );
3341        assert!(
3342            t.phases[1].metrics.iteration_rate.is_some(),
3343            "phase 1 (respawned workers) must get its own step-local rate",
3344        );
3345    }
3346
3347    #[test]
3348    fn build_stalled_step_with_step_end_reports_measured_zero_not_cross_step() {
3349        // Monitor-only path: a step that HAS a StepEnd but
3350        // stalled (StepEnd[k] == StepStart[k]) reports its MEASURED-ZERO
3351        // step-local rate (Some(0.0)) — its StepEnd lookup hits, so the
3352        // cross-step fallback must NOT run. Mirrors the snapshot path's
3353        // build_phase_buckets_with_stimulus_stalled_step_reports_measured_zero.
3354        // Step 0 stalls (0 -> 0); a persistent population reads 500 at
3355        // StepStart[1], so a cross-step StepStart[0] -> StepStart[1] leak
3356        // would be ~454/s. Step 1 advances 500 -> 5500 (5000/s).
3357        let events: Vec<StimulusEvent> = vec![
3358            StimulusEvent::from_wire(&wire_event(0, 1, 0)), // StepStart[0], iters 0
3359            StimulusEvent::from_step_end(&wire_event(1_000, 1, 0)), // StepEnd[0], STALLED 0
3360            StimulusEvent::from_wire(&wire_event(1_100, 2, 500)), // StepStart[1], persistent 500
3361            StimulusEvent::from_step_end(&wire_event(2_100, 2, 5_500)), // StepEnd[1], iters 5500
3362        ];
3363        let samples: Vec<MonitorSample> = (1..30)
3364            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3365            .collect();
3366        let t = Timeline::build(&events, &samples, 0);
3367        assert_eq!(t.phases.len(), 2);
3368        assert_eq!(
3369            t.phases[0].metrics.iteration_rate,
3370            Some(0.0),
3371            "a stalled step reports measured-zero throughput, not the \
3372             cross-step StepStart[0] -> StepStart[1] persistent-leak rate",
3373        );
3374        assert!(
3375            t.phases[1].metrics.iteration_rate.is_some(),
3376            "step 1 still reports its own step-local rate",
3377        );
3378    }
3379
3380    #[test]
3381    fn terminal_event_single_step_rate() {
3382        // Boundary case: a one-step scenario (first == last). With the
3383        // 0 baseline and the terminal boundary supplying the right edge,
3384        // the single step still gets a rate, and the terminal adds no
3385        // phase.
3386        let mut events: Vec<StimulusEvent> = [wire_event(0, 1, 0)]
3387            .iter()
3388            .map(StimulusEvent::from_wire)
3389            .collect();
3390        events.push(StimulusEvent::terminal(3000, 9000));
3391        let samples: Vec<MonitorSample> = (5..35)
3392            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3393            .collect();
3394        let t = Timeline::build(&events, &samples, 0);
3395        assert_eq!(
3396            t.phases.len(),
3397            1,
3398            "single step -> one phase; terminal adds none"
3399        );
3400        assert!(
3401            t.phases[0].metrics.iteration_rate.is_some(),
3402            "single step gets a rate (first == last)",
3403        );
3404    }
3405
3406    #[test]
3407    fn terminal_event_stalled_last_step_reports_measured_zero() {
3408        // Boundary case: the last step's counter did not advance
3409        // (terminal count == last step-start count): e == s. That is
3410        // MEASURED ZERO throughput — a real value (the strongest
3411        // degradation signal), not "unmeasured" — so rate_to returns
3412        // Some(0.0), and the zero surfaces to the degradation detector.
3413        // Only a counter DECREASE (e < s) is unmeasurable -> None.
3414        let mut events: Vec<StimulusEvent> = [wire_event(0, 1, 0), wire_event(2000, 2, 4000)]
3415            .iter()
3416            .map(StimulusEvent::from_wire)
3417            .collect();
3418        events.push(StimulusEvent::terminal(4000, 4000)); // no advance
3419        let samples: Vec<MonitorSample> = (5..45)
3420            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3421            .collect();
3422        let t = Timeline::build(&events, &samples, 0);
3423        assert_eq!(t.phases.len(), 2);
3424        assert_eq!(
3425            t.phases[1].metrics.iteration_rate,
3426            Some(0.0),
3427            "stalled last step (e == s) reports measured-zero, not None",
3428        );
3429    }
3430
3431    #[test]
3432    fn iteration_rate_counter_decrease_yields_no_rate() {
3433        // A counter DECREASE between consecutive step frames (e.g. a
3434        // step-local worker population reset) is unmeasurable and must NOT
3435        // produce a negative or conflated rate — the `e < s` guard drops
3436        // the pair, returning None (distinct from `e == s`, which is a
3437        // measured-zero Some(0.0)). Pin it so a future change that loosens
3438        // the guard to allow a negative delta fails here.
3439        let events: Vec<StimulusEvent> = [
3440            wire_event(0, 1, 0),
3441            wire_event(2000, 2, 5000),
3442            wire_event(3000, 3, 1000), // counter dropped 5000 -> 1000
3443        ]
3444        .iter()
3445        .map(StimulusEvent::from_wire)
3446        .collect();
3447        let samples: Vec<MonitorSample> = (5..35)
3448            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3449            .collect();
3450        let t = Timeline::build(&events, &samples, 0);
3451        // phase 1 is step 2 (frame iters 5000 -> next 1000): decrease.
3452        assert!(
3453            t.phases[1].metrics.iteration_rate.is_none(),
3454            "a counter decrease must not manufacture a (negative) rate",
3455        );
3456    }
3457
3458    #[test]
3459    fn iteration_rate_zero_duration_yields_no_rate() {
3460        // Two consecutive frames with identical elapsed_ms -> the rate
3461        // denominator is 0; the duration==0 guard must drop the pair
3462        // rather than divide and produce inf/NaN.
3463        let events: Vec<StimulusEvent> = [wire_event(1000, 1, 0), wire_event(1000, 2, 2000)]
3464            .iter()
3465            .map(StimulusEvent::from_wire)
3466            .collect();
3467        let samples: Vec<MonitorSample> = (5..35)
3468            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3469            .collect();
3470        let t = Timeline::build(&events, &samples, 0);
3471        assert!(
3472            t.phases[0].metrics.iteration_rate.is_none(),
3473            "zero-duration pair must not divide; rate stays None",
3474        );
3475    }
3476
3477    #[test]
3478    fn terminal_not_last_does_not_misalign_or_misattribute() {
3479        // Robustness: even if a corrupt/out-of-order elapsed_ms made the
3480        // terminal sort BEFORE a real step, the explicit is_terminal
3481        // extraction (not positional) must keep the step phases aligned
3482        // and attribute the early step's rate correctly. A corrupt
3483        // terminal contributes no spurious rate (its position can't
3484        // shift the dense phase index).
3485        let mut events: Vec<StimulusEvent> = [wire_event(0, 1, 0), wire_event(2000, 2, 4000)]
3486            .iter()
3487            .map(StimulusEvent::from_wire)
3488            .collect();
3489        // Terminal with elapsed_ms BEFORE step 2 (simulated corruption).
3490        events.push(StimulusEvent::terminal(500, 9000));
3491        let samples: Vec<MonitorSample> = (5..45)
3492            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3493            .collect();
3494        let t = Timeline::build(&events, &samples, 0);
3495        // Two step events -> two phases regardless of terminal position.
3496        assert_eq!(
3497            t.phases.len(),
3498            2,
3499            "terminal position must not change phase count"
3500        );
3501        // Phase 0 (step 1) still gets its correct rate (0 -> 4000 over
3502        // 2s = 2000/s): the misordered terminal did not misalign it.
3503        assert_eq!(
3504            t.phases[0].metrics.iteration_rate,
3505            Some(2000.0),
3506            "early step rate must be correct despite a misordered terminal",
3507        );
3508    }
3509
3510    #[test]
3511    fn throughput_degradation_detected() {
3512        // Phase 0: high throughput (0 -> 10000 iters over ~2s = ~5000/s)
3513        // Phase 1: low throughput (10000 -> 11000 iters over ~2s = ~500/s)
3514        // 90% drop exceeds ITERATION_RATE_REL_THRESHOLD (0.3).
3515        let events = vec![
3516            stimulus_with_iters(0, "ScenarioStart", 0),
3517            stimulus_with_iters(2000, "StepStart[0]", 10000),
3518            stimulus_with_iters(4000, "StepEnd[0]", 11000),
3519        ];
3520        let samples: Vec<MonitorSample> = (5..45)
3521            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3522            .collect();
3523        let t = Timeline::build(&events, &samples, 0);
3524        assert_eq!(t.phases.len(), 3);
3525        // Phase 0 should have high iteration_rate.
3526        assert!(t.phases[0].metrics.iteration_rate.is_some());
3527        // Phase 1 should have low iteration_rate.
3528        assert!(t.phases[1].metrics.iteration_rate.is_some());
3529        let r0 = t.phases[0].metrics.iteration_rate.unwrap();
3530        let r1 = t.phases[1].metrics.iteration_rate.unwrap();
3531        assert!(
3532            r0 > r1,
3533            "phase 0 rate ({r0}) should exceed phase 1 rate ({r1})"
3534        );
3535
3536        // Throughput degradation should be detected at phase 1 boundary.
3537        let degs: Vec<_> = t
3538            .degradations()
3539            .into_iter()
3540            .filter(|(_, c)| c.metric == "throughput")
3541            .collect();
3542        assert!(!degs.is_empty(), "throughput degradation must be detected");
3543        let (phase, change) = &degs[0];
3544        assert_eq!(phase.index, 1);
3545        assert_eq!(change.direction, ChangeDirection::Degraded);
3546        assert!(change.before > change.after);
3547    }
3548
3549    #[test]
3550    fn throughput_collapse_to_zero_is_flagged() {
3551        // A phase that collapses to ZERO throughput (e == s, measured
3552        // zero) must be flagged as a degradation — it is the strongest
3553        // degradation signal. Previously the zero phase's rate_to returned
3554        // None, so the detector's Some/Some gate dropped it and the worst
3555        // degradation went silently unreported.
3556        let events = vec![
3557            stimulus_with_iters(0, "ScenarioStart", 0),
3558            stimulus_with_iters(2000, "StepStart[0]", 10000), // phase 0: ~5000/s
3559            stimulus_with_iters(4000, "StepStart[1]", 10000), // phase 1: 0/s (stalled)
3560        ];
3561        let samples: Vec<MonitorSample> = (5..45)
3562            .map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
3563            .collect();
3564        let t = Timeline::build(&events, &samples, 0);
3565        assert_eq!(
3566            t.phases[1].metrics.iteration_rate,
3567            Some(0.0),
3568            "the collapsed phase must report measured-zero throughput",
3569        );
3570        let degs: Vec<_> = t
3571            .degradations()
3572            .into_iter()
3573            .filter(|(p, c)| p.index == 1 && c.metric == "throughput")
3574            .collect();
3575        assert!(
3576            !degs.is_empty(),
3577            "a collapse to zero throughput must be flagged as a degradation",
3578        );
3579        assert_eq!(degs[0].1.direction, ChangeDirection::Degraded);
3580        assert_eq!(degs[0].1.after, 0.0);
3581    }
3582
3583    #[test]
3584    fn throughput_improvement_detected() {
3585        // Phase 0: low throughput (0 -> 500 iters over ~2s = ~250/s)
3586        // Phase 1: high throughput (500 -> 10500 iters over ~2s = ~5000/s)
3587        // >30% increase should be flagged as improvement.
3588        let events = vec![
3589            stimulus_with_iters(0, "ScenarioStart", 0),
3590            stimulus_with_iters(2000, "StepStart[0]", 500),
3591            stimulus_with_iters(4000, "StepEnd[0]", 10500),
3592        ];
3593        let samples: Vec<MonitorSample> = (5..45)
3594            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3595            .collect();
3596        let t = Timeline::build(&events, &samples, 0);
3597        let improvements: Vec<_> = t
3598            .phases
3599            .iter()
3600            .flat_map(|p| p.changes.iter())
3601            .filter(|c| c.metric == "throughput" && c.direction == ChangeDirection::Improved)
3602            .collect();
3603        assert!(
3604            !improvements.is_empty(),
3605            "throughput improvement must be detected"
3606        );
3607    }
3608
3609    #[test]
3610    fn throughput_stable_below_threshold() {
3611        // Phase 0: 1000 iter/s
3612        // Phase 1: ~900 iter/s (10% drop, below 30% threshold)
3613        // No throughput change should be detected.
3614        let events = vec![
3615            stimulus_with_iters(0, "ScenarioStart", 0),
3616            stimulus_with_iters(2000, "StepStart[0]", 2000),
3617            stimulus_with_iters(4000, "StepEnd[0]", 3800),
3618        ];
3619        let samples: Vec<MonitorSample> = (5..45)
3620            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
3621            .collect();
3622        let t = Timeline::build(&events, &samples, 0);
3623        let throughput_changes: Vec<_> = t
3624            .phases
3625            .iter()
3626            .flat_map(|p| p.changes.iter())
3627            .filter(|c| c.metric == "throughput")
3628            .collect();
3629        assert!(
3630            throughput_changes.is_empty(),
3631            "10% change should not trigger throughput change detection"
3632        );
3633    }
3634
3635    #[test]
3636    fn from_phase_buckets_maps_known_metrics_and_renders_phase_block() {
3637        use crate::assert::PhaseBucket;
3638        use std::collections::BTreeMap;
3639        let mut s0_metrics = BTreeMap::new();
3640        s0_metrics.insert("max_dsq_depth".to_string(), 7.0);
3641        s0_metrics.insert("avg_dsq_depth".to_string(), 2.5);
3642        s0_metrics.insert("max_imbalance_ratio".to_string(), 3.5);
3643        s0_metrics.insert("avg_imbalance_ratio".to_string(), 1.8);
3644        s0_metrics.insert("avg_nr_running".to_string(), 3.0);
3645        s0_metrics.insert("total_fallback".to_string(), 200.0);
3646        let buckets = vec![
3647            PhaseBucket {
3648                per_cgroup: Default::default(),
3649                step_index: 0,
3650                label: "BASELINE".to_string(),
3651                start_ms: 0,
3652                end_ms: 1000,
3653                sample_count: 5,
3654                metrics: BTreeMap::new(),
3655            },
3656            PhaseBucket {
3657                per_cgroup: Default::default(),
3658                step_index: 1,
3659                label: "Step[0]".to_string(),
3660                start_ms: 1000,
3661                end_ms: 6000,
3662                sample_count: 20,
3663                metrics: s0_metrics,
3664            },
3665        ];
3666        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
3667        assert_eq!(t.phases.len(), 2);
3668        // Phase 0 (BASELINE) — no stimulus, no metrics.
3669        assert!(t.phases[0].stimulus.is_none());
3670        assert_eq!(t.phases[0].metrics.sample_count, 5);
3671        assert_eq!(t.phases[0].metrics.max_dsq_depth, 0);
3672        // Phase 1 (Step[0]) — stimulus set, metrics projected from
3673        // the bucket map.
3674        assert!(t.phases[1].stimulus.is_some());
3675        assert_eq!(t.phases[1].stimulus.as_ref().unwrap().label, "Step[0]");
3676        assert_eq!(t.phases[1].metrics.sample_count, 20);
3677        assert_eq!(t.phases[1].metrics.max_dsq_depth, 7);
3678        assert!((t.phases[1].metrics.avg_dsq_depth.unwrap() - 2.5).abs() < f64::EPSILON);
3679        assert!((t.phases[1].metrics.max_imbalance.unwrap() - 3.5).abs() < f64::EPSILON);
3680        assert!((t.phases[1].metrics.avg_imbalance.unwrap() - 1.8).abs() < f64::EPSILON);
3681        // Snapshot-path read (phase_from_bucket): bucket.metrics["avg_nr_running"]
3682        // projects to PhaseMetrics.avg_nr_running; phase 0's empty map -> None.
3683        assert!((t.phases[1].metrics.avg_nr_running.unwrap() - 3.0).abs() < f64::EPSILON);
3684        assert_eq!(t.phases[0].metrics.avg_nr_running, None);
3685        // fallback_rate = 200 / (5000 / 1000) = 40.0 events/s
3686        assert_eq!(t.phases[1].metrics.fallback_rate, Some(40.0));
3687        // keep_last_rate absent → None (no total_keep_last in metrics map)
3688        assert_eq!(t.phases[1].metrics.keep_last_rate, None);
3689        // avg_dsq_depth + avg_imbalance + avg_nr_running are now
3690        // all wired (per the doc table). iteration_rate is the only field
3691        // PhaseBucket cannot supply directly (depends on stimulus
3692        // event totals, not a per-Sample reading).
3693        assert_eq!(t.phases[1].metrics.iteration_rate, None);
3694        // Render produces a non-empty timeline block.
3695        let formatted = t.format_with_context(&TimelineContext::default());
3696        assert!(formatted.contains("--- timeline ---"));
3697        assert!(formatted.contains("BASELINE"));
3698        assert!(formatted.contains("Step[0]"));
3699    }
3700
3701    /// Collapse-suppression production path: a throughput collapse INTO a synthesized
3702    /// zero-capture step surfaces through from_phase_buckets (the path
3703    /// `evaluate_vm_result` prefers), not only via the helper unit test.
3704    /// BASELINE captures samples + an iteration_rate; Step[0] is
3705    /// synthesized (sample_count 0) with a collapsed iteration_rate AND a
3706    /// divergent imbalance that must stay gated. Re-adding the old
3707    /// `if sample_count==0 { continue }` gate at this call site makes
3708    /// phases[1].changes empty, so this test fails — it is the
3709    /// regression pin for the removed gate. The producer half — that
3710    /// build_phase_buckets_with_stimulus actually populates a synthesized
3711    /// bucket's iteration_rate from stimulus deltas — is pinned by
3712    /// assert::tests_phase_bucket::build_phase_buckets_with_stimulus_synthesizes_zero_capture_step_bucket;
3713    /// together they pin the full producer->consumer chain.
3714    #[test]
3715    fn from_phase_buckets_flags_throughput_into_synthesized_step() {
3716        use crate::assert::PhaseBucket;
3717        use std::collections::BTreeMap;
3718        let mut baseline_metrics = BTreeMap::new();
3719        baseline_metrics.insert("iteration_rate".to_string(), 1000.0);
3720        baseline_metrics.insert("avg_imbalance_ratio".to_string(), 1.0);
3721        let mut step_metrics = BTreeMap::new();
3722        step_metrics.insert("iteration_rate".to_string(), 300.0); // 70% collapse
3723        step_metrics.insert("avg_imbalance_ratio".to_string(), 99.0); // must stay gated
3724        let buckets = vec![
3725            PhaseBucket {
3726                per_cgroup: Default::default(),
3727                step_index: 0,
3728                label: "BASELINE".to_string(),
3729                start_ms: 0,
3730                end_ms: 1000,
3731                sample_count: 5,
3732                metrics: baseline_metrics,
3733            },
3734            PhaseBucket {
3735                per_cgroup: Default::default(),
3736                step_index: 1,
3737                label: "Step[0]".to_string(),
3738                start_ms: 1000,
3739                end_ms: 6000,
3740                sample_count: 0, // synthesized zero-capture step
3741                metrics: step_metrics,
3742            },
3743        ];
3744        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
3745        assert_eq!(t.phases[1].metrics.sample_count, 0);
3746        assert_eq!(t.phases[1].metrics.iteration_rate, Some(300.0));
3747        let throughput: Vec<_> = t.phases[1]
3748            .changes
3749            .iter()
3750            .filter(|c| c.metric == "throughput")
3751            .collect();
3752        assert_eq!(
3753            throughput.len(),
3754            1,
3755            "throughput collapse into a synthesized step must surface via \
3756             from_phase_buckets: {:?}",
3757            t.phases[1].changes,
3758        );
3759        assert_eq!(throughput[0].direction, ChangeDirection::Degraded);
3760        assert!(
3761            !t.phases[1].changes.iter().any(|c| c.metric == "imbalance"),
3762            "monitor metrics must stay gated for a zero-sample side: {:?}",
3763            t.phases[1].changes,
3764        );
3765    }
3766
3767    /// Boundary change-detection on the from_phase_buckets path — the
3768    /// PRODUCTION success path (`evaluate_vm_result` prefers
3769    /// from_phase_buckets over `build`). Two adjacent metric-bearing
3770    /// buckets whose avg_imbalance / avg_dsq_depth cross the thresholds
3771    /// in the worsening direction must record Degraded changes on the
3772    /// ENTERED phase (phases[1]), and the BASELINE phase records none.
3773    /// Without this, the from_phase_buckets boundary-change loop (the
3774    /// `for i in 1..phases.len()` call to detect_boundary_changes) ships
3775    /// unverified (a wrong threshold, inverted direction, wrong-phase
3776    /// recording, or wrong metric field would all slip past the other
3777    /// from_phase_buckets tests, which never trigger the loop).
3778    #[test]
3779    fn from_phase_buckets_detects_boundary_degradation() {
3780        use crate::assert::PhaseBucket;
3781        use std::collections::BTreeMap;
3782        let mut base = BTreeMap::new();
3783        base.insert("avg_imbalance_ratio".to_string(), 1.0);
3784        base.insert("avg_dsq_depth".to_string(), 1.0);
3785        let mut step = BTreeMap::new();
3786        step.insert("avg_imbalance_ratio".to_string(), 2.0); // +1.0 > 0.5 threshold
3787        step.insert("avg_dsq_depth".to_string(), 6.0); // +5.0 > 3.0 threshold
3788        let buckets = vec![
3789            PhaseBucket {
3790                per_cgroup: Default::default(),
3791                step_index: 0,
3792                label: "BASELINE".to_string(),
3793                start_ms: 0,
3794                end_ms: 1000,
3795                sample_count: 5,
3796                metrics: base,
3797            },
3798            PhaseBucket {
3799                per_cgroup: Default::default(),
3800                step_index: 1,
3801                label: "Step[0]".to_string(),
3802                start_ms: 1000,
3803                end_ms: 6000,
3804                sample_count: 20,
3805                metrics: step,
3806            },
3807        ];
3808        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
3809        // Change recorded on the ENTERED phase, never the prior one.
3810        assert!(
3811            t.phases[0].changes.is_empty(),
3812            "BASELINE has no prior phase to diff; changes belong to the entered phase",
3813        );
3814        let changes = &t.phases[1].changes;
3815        let imb = changes
3816            .iter()
3817            .find(|c| c.metric == "imbalance")
3818            .expect("imbalance change must fire (1.0 -> 2.0 crosses 0.5)");
3819        assert_eq!(imb.direction, ChangeDirection::Degraded);
3820        assert!((imb.before - 1.0).abs() < f64::EPSILON);
3821        assert!((imb.after - 2.0).abs() < f64::EPSILON);
3822        let dsq = changes
3823            .iter()
3824            .find(|c| c.metric == "dsq_depth")
3825            .expect("dsq_depth change must fire (1.0 -> 6.0 crosses 3.0)");
3826        assert_eq!(dsq.direction, ChangeDirection::Degraded);
3827        assert!((dsq.before - 1.0).abs() < f64::EPSILON);
3828        assert!((dsq.after - 6.0).abs() < f64::EPSILON);
3829    }
3830
3831    /// Sub-threshold deltas record NO change — guards a dropped/zeroed
3832    /// threshold that would fabricate spurious boundary changes.
3833    #[test]
3834    fn from_phase_buckets_subthreshold_records_no_change() {
3835        use crate::assert::PhaseBucket;
3836        use std::collections::BTreeMap;
3837        let mut base = BTreeMap::new();
3838        base.insert("avg_imbalance_ratio".to_string(), 1.0);
3839        base.insert("avg_dsq_depth".to_string(), 1.0);
3840        let mut step = BTreeMap::new();
3841        step.insert("avg_imbalance_ratio".to_string(), 1.2); // +0.2 < 0.5
3842        step.insert("avg_dsq_depth".to_string(), 2.0); // +1.0 < 3.0
3843        let buckets = vec![
3844            PhaseBucket {
3845                per_cgroup: Default::default(),
3846                step_index: 0,
3847                label: "BASELINE".to_string(),
3848                start_ms: 0,
3849                end_ms: 1000,
3850                sample_count: 5,
3851                metrics: base,
3852            },
3853            PhaseBucket {
3854                per_cgroup: Default::default(),
3855                step_index: 1,
3856                label: "Step[0]".to_string(),
3857                start_ms: 1000,
3858                end_ms: 6000,
3859                sample_count: 20,
3860                metrics: step,
3861            },
3862        ];
3863        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
3864        assert!(
3865            t.phases[1].changes.is_empty(),
3866            "sub-threshold deltas must not record a boundary change",
3867        );
3868    }
3869
3870    /// Decreasing imbalance across the boundary records an IMPROVEMENT —
3871    /// locks the higher_is_worse direction so an inverted flag cannot
3872    /// report a regression as an improvement (or vice versa).
3873    #[test]
3874    fn from_phase_buckets_detects_boundary_improvement() {
3875        use crate::assert::PhaseBucket;
3876        use std::collections::BTreeMap;
3877        let mut base = BTreeMap::new();
3878        base.insert("avg_imbalance_ratio".to_string(), 2.0);
3879        let mut step = BTreeMap::new();
3880        step.insert("avg_imbalance_ratio".to_string(), 1.0); // -1.0, |delta|>0.5, after<before
3881        let buckets = vec![
3882            PhaseBucket {
3883                per_cgroup: Default::default(),
3884                step_index: 0,
3885                label: "BASELINE".to_string(),
3886                start_ms: 0,
3887                end_ms: 1000,
3888                sample_count: 5,
3889                metrics: base,
3890            },
3891            PhaseBucket {
3892                per_cgroup: Default::default(),
3893                step_index: 1,
3894                label: "Step[0]".to_string(),
3895                start_ms: 1000,
3896                end_ms: 6000,
3897                sample_count: 20,
3898                metrics: step,
3899            },
3900        ];
3901        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
3902        let imb = t.phases[1]
3903            .changes
3904            .iter()
3905            .find(|c| c.metric == "imbalance")
3906            .expect("imbalance change must fire (2.0 -> 1.0 crosses 0.5)");
3907        assert_eq!(
3908            imb.direction,
3909            ChangeDirection::Improved,
3910            "a decreasing imbalance is an improvement, not a degradation",
3911        );
3912    }
3913
3914    /// from_phase_buckets must CORRELATE a real stimulus event into the
3915    /// phase header, carrying its op_kind + detail. Every other
3916    /// from_phase_buckets test passes `&[]`, so only the synthetic
3917    /// None-placeholder arm ran and the `Some(ev) => (*ev).clone()`
3918    /// correlation arm (added to stop headers degrading to "Step[N]: ?")
3919    /// was untested. A wrong interval bound or cloning the wrong event
3920    /// would drop the operator-facing op/detail with no failure.
3921    #[test]
3922    fn from_phase_buckets_correlates_real_stimulus_op_and_detail() {
3923        use crate::assert::PhaseBucket;
3924        use std::collections::BTreeMap;
3925        let event = StimulusEvent {
3926            elapsed_ms: 1000,
3927            label: "Step[0]".to_string(),
3928            op_kind: Some("SetCpuset".to_string()),
3929            detail: Some("4 cpus".to_string()),
3930            total_iterations: None,
3931            step_index: Some(1),
3932            is_terminal: false,
3933            is_step_end: false,
3934        };
3935        let buckets = vec![
3936            PhaseBucket {
3937                per_cgroup: Default::default(),
3938                step_index: 0,
3939                label: "BASELINE".to_string(),
3940                start_ms: 0,
3941                end_ms: 1000,
3942                sample_count: 5,
3943                metrics: BTreeMap::new(),
3944            },
3945            PhaseBucket {
3946                per_cgroup: Default::default(),
3947                step_index: 1,
3948                label: "Step[0]".to_string(),
3949                start_ms: 1000,
3950                end_ms: 6000,
3951                sample_count: 20,
3952                metrics: BTreeMap::new(),
3953            },
3954        ];
3955        let t = Timeline::from_phase_buckets(&buckets, &[event], &TimelineContext::default());
3956        let stim = t.phases[1]
3957            .stimulus
3958            .as_ref()
3959            .expect("Step[0] phase carries a stimulus");
3960        assert_eq!(
3961            stim.op_kind.as_deref(),
3962            Some("SetCpuset"),
3963            "the correlated event's op_kind must be carried, not the None placeholder",
3964        );
3965        assert_eq!(stim.detail.as_deref(), Some("4 cpus"));
3966    }
3967
3968    #[test]
3969    fn from_phase_buckets_zero_duration_window_emits_no_rate() {
3970        use crate::assert::PhaseBucket;
3971        use std::collections::BTreeMap;
3972        let mut metrics = BTreeMap::new();
3973        metrics.insert("total_fallback".to_string(), 100.0);
3974        let bucket = PhaseBucket {
3975            per_cgroup: Default::default(),
3976            step_index: 1,
3977            label: "Step[0]".to_string(),
3978            start_ms: 500,
3979            end_ms: 500,
3980            sample_count: 1,
3981            metrics,
3982        };
3983        let t = Timeline::from_phase_buckets(&[bucket], &[], &TimelineContext::default());
3984        // Degenerate window (start == end) yields duration_s == 0,
3985        // so rate divisions stay None rather than producing
3986        // spurious infinities.
3987        assert_eq!(t.phases[0].metrics.fallback_rate, None);
3988    }
3989
3990    #[test]
3991    fn from_phase_buckets_absent_imbalance_metric_is_none_not_zero() {
3992        // A bucket carrying no avg_imbalance_ratio / avg_dsq_depth
3993        // metric must yield None (no data), NOT Some(0.0) — so the change
3994        // detector skips it instead of comparing a false zero-imbalance.
3995        use crate::assert::PhaseBucket;
3996        use std::collections::BTreeMap;
3997        let bucket = PhaseBucket {
3998            per_cgroup: Default::default(),
3999            step_index: 1,
4000            label: "Step[0]".to_string(),
4001            start_ms: 100,
4002            end_ms: 600,
4003            sample_count: 3,
4004            metrics: BTreeMap::new(),
4005        };
4006        let t = Timeline::from_phase_buckets(&[bucket], &[], &TimelineContext::default());
4007        assert_eq!(t.phases[0].metrics.avg_imbalance, None);
4008        assert_eq!(t.phases[0].metrics.max_imbalance, None);
4009        assert_eq!(t.phases[0].metrics.avg_dsq_depth, None);
4010    }
4011
4012    #[test]
4013    fn from_phase_buckets_sorts_by_step_index() {
4014        use crate::assert::PhaseBucket;
4015        use std::collections::BTreeMap;
4016        // Out-of-order input; from_phase_buckets must sort by
4017        // step_index so the rendered phase block walks BASELINE
4018        // → Step[0] → Step[1] in time order regardless of
4019        // how the caller arranged the input vec.
4020        let buckets = vec![
4021            PhaseBucket {
4022                per_cgroup: Default::default(),
4023                step_index: 2,
4024                label: "Step[1]".to_string(),
4025                start_ms: 2000,
4026                end_ms: 3000,
4027                sample_count: 5,
4028                metrics: BTreeMap::new(),
4029            },
4030            PhaseBucket {
4031                per_cgroup: Default::default(),
4032                step_index: 0,
4033                label: "BASELINE".to_string(),
4034                start_ms: 0,
4035                end_ms: 500,
4036                sample_count: 2,
4037                metrics: BTreeMap::new(),
4038            },
4039            PhaseBucket {
4040                per_cgroup: Default::default(),
4041                step_index: 1,
4042                label: "Step[0]".to_string(),
4043                start_ms: 500,
4044                end_ms: 2000,
4045                sample_count: 5,
4046                metrics: BTreeMap::new(),
4047            },
4048        ];
4049        let t = Timeline::from_phase_buckets(&buckets, &[], &TimelineContext::default());
4050        assert_eq!(t.phases.len(), 3);
4051        assert_eq!(t.phases[0].start_ms, 0);
4052        assert_eq!(t.phases[1].start_ms, 500);
4053        assert_eq!(t.phases[2].start_ms, 2000);
4054    }
4055
4056    #[test]
4057    fn iteration_rate_in_formatted_output() {
4058        let events = vec![
4059            stimulus_with_iters(0, "ScenarioStart", 0),
4060            stimulus_with_iters(2000, "StepStart[0]", 5000),
4061        ];
4062        let samples: Vec<MonitorSample> = (5..25)
4063            .map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
4064            .collect();
4065        let t = Timeline::build(&events, &samples, 0);
4066        let formatted = t.format_with_context(&TimelineContext::default());
4067        assert!(
4068            formatted.contains("throughput:"),
4069            "format output must contain throughput when iteration_rate is set"
4070        );
4071        assert!(formatted.contains("iter/s"));
4072    }
4073}