ktstr/assert/
temporal.rs

1//! Temporal-assertion patterns over a periodic
2//! [`SampleSeries`](crate::scenario::sample::SampleSeries).
3//!
4//! `SeriesField<T>` is a per-sample column extracted from the
5//! series via `SampleSeries::bpf` or `SampleSeries::stats` (or
6//! the typed `bpf_map` / `stats_path` projectors). It carries a
7//! parallel `(tag, elapsed_ms, SnapshotResult<T>)` triple per
8//! sample so any failure-path message can name the offending tag
9//! and timestamp without re-walking the source data.
10//!
11//! The seven built-in patterns are:
12//!   1. `nondecreasing` — counter monotonicity (`v[i] <= v[i+1]`).
13//!   2. `strictly_increasing` — strict counter monotonicity
14//!      (`v[i] < v[i+1]`).
15//!   3. `rate_within(lo, hi)` — bounded delta-per-millisecond
16//!      between consecutive samples.
17//!   4. `steady_within(warmup, tolerance)` — post-warmup samples
18//!      stay inside `[mean·(1-tol), mean·(1+tol)]`.
19//!   5. `converges_to(target, tol, deadline_ms)` — three
20//!      consecutive samples land inside `[target-tol, target+tol]`
21//!      at or before `deadline_ms`.
22//!   6. `always_true` — boolean invariant at every sample
23//!      (`SeriesField<bool>` only).
24//!   7. `ratio_within(other, lo, hi)` — cross-field correlation
25//!      between two same-length series.
26//!
27//! Per-sample scalar checks bypass the temporal patterns via
28//! [`SeriesField::each`], which yields an [`EachClaim`] supporting
29//! `at_least` / `at_most` / `between`. All patterns route through
30//! [`Verdict`] and tag failures with [`DetailKind::Temporal`].
31
32use crate::scenario::snapshot::{SnapshotError, SnapshotResult};
33
34use super::{AssertDetail, DetailKind, Outcome, Verdict};
35
36/// Per-sample triple `(tag, elapsed_ms, &value)` yielded by
37/// [`SeriesField::iter_full`] and stored in the per-phase buckets
38/// returned by [`SeriesField::by_phase`].
39pub type SampleTriple<'a, T> = (&'a str, Option<u64>, &'a SnapshotResult<T>);
40
41/// Return shape of [`SeriesField::by_phase`]: a `BTreeMap` keyed by
42/// `Phase` carrying the samples whose source row had a stamped
43/// step_index, plus a separate `none_bucket` for unstamped /
44/// fixture samples.
45pub type ByPhasePartition<'a, T> = (
46    std::collections::BTreeMap<crate::assert::Phase, Vec<SampleTriple<'a, T>>>,
47    Vec<SampleTriple<'a, T>>,
48);
49
50/// Render the numeric part of an optional sample timestamp for the
51/// `(+{n}ms)` failure-/skip-message convention: `Some(ms)` -> `"123"`,
52/// `None` (the bridge recorded no timestamp for this sample) -> `"?"`,
53/// so a not-measured sample reads as `+?ms` — visibly distinct from a
54/// measured `+0ms`. Keeping it numeric lets the existing
55/// `"(+{elapsed_ms}ms)"` format strings stay unchanged.
56fn fmt_elapsed_num(elapsed_ms: Option<u64>) -> String {
57    match elapsed_ms {
58        Some(ms) => ms.to_string(),
59        None => "?".to_string(),
60    }
61}
62
63/// Per-sample column extracted from a
64/// [`SampleSeries`](crate::scenario::sample::SampleSeries). Each
65/// slot is a [`SnapshotResult<T>`] so a missing or
66/// type-mismatched field does NOT abort the whole projection — it
67/// surfaces at the temporal-assertion site as a per-sample error
68/// the caller decides how to handle.
69///
70/// The label, tags, and per-sample timestamps are carried so
71/// failure-path messages name the offending sample without the
72/// caller re-threading the series. Tags and elapsed-ms vectors
73/// are always the same length as `values`.
74#[derive(Debug, Clone)]
75#[must_use = "SeriesField records nothing until a temporal pattern is invoked"]
76pub struct SeriesField<T> {
77    label: String,
78    tags: Vec<String>,
79    elapsed_ms: Vec<Option<u64>>,
80    values: Vec<SnapshotResult<T>>,
81    /// Per-sample scenario phase, mirrored from the
82    /// [`crate::scenario::sample::Sample::step_index`] each value
83    /// was projected from. `None` for unstamped fixture samples
84    /// (no bridge phase context); `Some(phase)` for production
85    /// captures whose source row carried a step_index. Same length
86    /// as `values` (or empty by the from_parts contract — the
87    /// 4-arg constructor fills with all-`None` for backward-compat
88    /// callers that didn't have the phase column yet).
89    phases: Vec<Option<crate::assert::Phase>>,
90}
91
92impl<T> SeriesField<T> {
93    /// Build a new field. Internal — projection helpers in
94    /// [`crate::scenario::sample`] call this on the series side.
95    /// 4-arg backward-compat constructor: defaults `phases` to all
96    /// `None`. New consumers that have phase context per sample
97    /// should call [`Self::from_parts_with_phases`] instead.
98    pub fn from_parts(
99        label: impl Into<String>,
100        tags: Vec<String>,
101        elapsed_ms: Vec<u64>,
102        values: Vec<SnapshotResult<T>>,
103    ) -> Self {
104        let phases = vec![None; values.len()];
105        Self::from_parts_with_phases(label, tags, elapsed_ms, values, phases)
106    }
107
108    /// Build a new field with explicit per-sample phase tags.
109    /// `phases.len()` MUST equal `values.len()` — the four parallel
110    /// vecs (tags / elapsed_ms / values / phases) are addressed by
111    /// the same index throughout. Like [`Self::from_parts`], this
112    /// is intended for projection helpers in
113    /// [`crate::scenario::sample`] that already know each sample's
114    /// step_index from the drained bridge tuple.
115    /// 5-arg convenience taking MEASURED `Vec<u64>` timestamps; wraps
116    /// each in `Some` and delegates to [`Self::from_parts_with_phases_opt`].
117    /// Test fixtures (which always model measured samples) and any
118    /// all-measured caller use this. The production projection funnel
119    /// — which threads `Option<u64>` from the bridge so "not measured"
120    /// stays distinct from "measured 0" — calls
121    /// [`Self::from_parts_with_phases_opt`] directly.
122    pub fn from_parts_with_phases(
123        label: impl Into<String>,
124        tags: Vec<String>,
125        elapsed_ms: Vec<u64>,
126        values: Vec<SnapshotResult<T>>,
127        phases: Vec<Option<crate::assert::Phase>>,
128    ) -> Self {
129        Self::from_parts_with_phases_opt(
130            label,
131            tags,
132            elapsed_ms.into_iter().map(Some).collect(),
133            values,
134            phases,
135        )
136    }
137
138    /// None-aware constructor: `elapsed_ms[i] == None` means the bridge
139    /// recorded no timestamp for that sample (not measured), kept
140    /// distinct from a measured `Some(0)`. Temporal patterns that do
141    /// timestamp math ([`EachClaim`]-free `rate_within` dt,
142    /// `steady_within` warmup gate, `converges_to` deadline gate) SKIP
143    /// a `None`-anchored sample rather than fabricating a `0` (the
144    /// silent-wrong-answer this distinction prevents).
145    pub fn from_parts_with_phases_opt(
146        label: impl Into<String>,
147        tags: Vec<String>,
148        elapsed_ms: Vec<Option<u64>>,
149        values: Vec<SnapshotResult<T>>,
150        phases: Vec<Option<crate::assert::Phase>>,
151    ) -> Self {
152        // Hard runtime check (not debug_assert_eq!) so the equal-
153        // length guarantee documented on iter_full() holds in
154        // release builds. A length mismatch would otherwise surface
155        // as either a silent truncation in iter_full() (zip stops
156        // at the shortest input) or an out-of-bounds panic from
157        // the direct `tags[i]` / `elapsed_ms[i]` field access in
158        // EachClaim failure-message rendering — both harder to
159        // diagnose than a panic at the construction site.
160        assert_eq!(tags.len(), values.len());
161        assert_eq!(elapsed_ms.len(), values.len());
162        assert_eq!(phases.len(), values.len());
163        Self {
164            label: label.into(),
165            tags,
166            elapsed_ms,
167            values,
168            phases,
169        }
170    }
171
172    /// Per-sample phase tag, parallel to `values`. `None` for
173    /// fixture / unstamped samples; `Some(phase)` for production
174    /// captures whose source row carried a `step_index`.
175    pub fn phases_iter(&self) -> impl Iterator<Item = Option<crate::assert::Phase>> + '_ {
176        self.phases.iter().copied()
177    }
178
179    /// Per-phase folded reduction: extract the `f64` value from
180    /// each Ok-sample (skipping per-sample errors) and route the
181    /// per-phase slice through `crate::stats::aggregate_samples_for_phase`
182    /// so Counter kinds get the cumulative-delta semantic while
183    /// other kinds inherit the flat-run aggregator. Returns one
184    /// entry per phase that has at least one Ok-sample with a
185    /// finite value; phases with all-err / all-NaN samples are
186    /// absent from the map (consistent with
187    /// `aggregate_samples_for_phase` returning `None` on that
188    /// input). Skips `None`-phase samples — fixture / unstamped
189    /// data does not have a phase key to bucket against.
190    ///
191    /// The API entry point a test author uses to ask "what was the
192    /// per-phase reduction of metric X?" without having to thread
193    /// `by_phase` + the kind-aware reducer manually.
194    pub fn aggregate_by_phase(
195        &self,
196        metric: &crate::stats::MetricDef,
197    ) -> std::collections::BTreeMap<crate::assert::Phase, f64>
198    where
199        T: Copy + Into<f64>,
200    {
201        let (by_phase, _none_bucket) = self.by_phase();
202        let mut out: std::collections::BTreeMap<crate::assert::Phase, f64> =
203            std::collections::BTreeMap::new();
204        for (phase, samples) in by_phase {
205            let finite_values: Vec<f64> = samples
206                .iter()
207                .filter_map(|(_tag, _elapsed, value)| match value {
208                    Ok(v) => Some((*v).into()),
209                    Err(_) => None,
210                })
211                .collect();
212            if let Some(reduced) = crate::stats::aggregate_samples_for_phase(metric, &finite_values)
213            {
214                out.insert(phase, reduced);
215            }
216        }
217        out
218    }
219
220    /// Per-phase SUM of the Ok-sample values — the per-phase total for a
221    /// metric whose samples are already per-read DELTAS, e.g. a
222    /// scheduler-defined scx_stats metric the scheduler deltas
223    /// server-side per reader request (one ktstr snapshot = one request
224    /// = one delta), read via `series.stats(name)`. Returns one entry
225    /// per phase with at least one finite Ok-sample; all-err / all-NaN
226    /// phases and `None`-phase (fixture / unstamped) samples are
227    /// excluded, matching [`Self::aggregate_by_phase`].
228    ///
229    /// This is the `crate::stats::MetricKind::DeltaSum` per-phase
230    /// reduction WITHOUT requiring a `crate::stats::MetricDef` — the
231    /// ergonomic accessor for the common case "I read a delta-reported
232    /// scx_stats metric; give me each phase's total." For a registered
233    /// metric, `aggregate_by_phase(&def)` with `def.kind == DeltaSum`
234    /// gives the identical result.
235    ///
236    /// Boundary: the first in-phase delta straddles the phase boundary
237    /// (it spans from the last pre-phase read to the first in-phase
238    /// read, so it carries a little pre-phase activity); it is
239    /// attributed to the phase its read lands in — a slight left-edge
240    /// over-attribution, the deliberate semantic since a per-read delta
241    /// cannot be split.
242    pub fn sum_by_phase(&self) -> std::collections::BTreeMap<crate::assert::Phase, f64>
243    where
244        T: Copy + Into<f64>,
245    {
246        let (by_phase, _none_bucket) = self.by_phase();
247        let mut out: std::collections::BTreeMap<crate::assert::Phase, f64> =
248            std::collections::BTreeMap::new();
249        for (phase, samples) in by_phase {
250            let values: Vec<f64> = samples
251                .iter()
252                .filter_map(|(_tag, _elapsed, value)| match value {
253                    Ok(v) => Some((*v).into()),
254                    Err(_) => None,
255                })
256                .collect();
257            if let Some(reduced) =
258                crate::stats::aggregate_samples(&values, crate::stats::MetricKind::DeltaSum)
259            {
260                out.insert(phase, reduced);
261            }
262        }
263        out
264    }
265
266    /// Apply `f` once per phase, with the per-phase slice of
267    /// [`SampleTriple<T>`]s. `None`-phase samples (fixture /
268    /// unstamped) are skipped — callers wanting them call
269    /// [`Self::by_phase`] directly and handle the second-tuple
270    /// `none_bucket` themselves. Phases iterate in `Phase` order
271    /// (BASELINE first, then Step\[0\], Step\[1\], ...) per the
272    /// `BTreeMap` key ordering, which lets temporal-pattern
273    /// consumers apply a per-phase reduction without restating
274    /// the `by_phase` unpacking at every call site.
275    ///
276    /// ```ignore
277    /// field.for_each_phase(|phase, samples| {
278    ///     // apply pattern X to samples, scoped to `phase`
279    /// });
280    /// ```
281    pub fn for_each_phase(&self, mut f: impl FnMut(crate::assert::Phase, &[SampleTriple<'_, T>])) {
282        let (by_phase, _none_bucket) = self.by_phase();
283        for (phase, samples) in by_phase {
284            f(phase, &samples);
285        }
286    }
287
288    /// Partition samples by phase. `None`-phase samples bucket
289    /// into the returned `none_bucket` outside the BTreeMap; phase
290    /// values bucket by their `Phase` key. Each bucket retains the
291    /// per-sample [`SampleTriple<T>`] the standard [`Self::iter_full`]
292    /// yields.
293    pub fn by_phase(&self) -> ByPhasePartition<'_, T> {
294        let mut buckets: std::collections::BTreeMap<
295            crate::assert::Phase,
296            Vec<SampleTriple<'_, T>>,
297        > = std::collections::BTreeMap::new();
298        let mut none_bucket: Vec<SampleTriple<'_, T>> = Vec::new();
299        for (((tag, elapsed_ms), value), phase) in self
300            .tags
301            .iter()
302            .zip(self.elapsed_ms.iter())
303            .zip(self.values.iter())
304            .zip(self.phases.iter())
305        {
306            let triple = (tag.as_str(), *elapsed_ms, value);
307            match phase {
308                Some(p) => buckets.entry(*p).or_default().push(triple),
309                None => none_bucket.push(triple),
310            }
311        }
312        (buckets, none_bucket)
313    }
314
315    /// Label for failure-message rendering.
316    pub fn label(&self) -> &str {
317        &self.label
318    }
319
320    /// Number of samples in the field.
321    pub fn len(&self) -> usize {
322        self.values.len()
323    }
324
325    /// True when no samples are present.
326    pub fn is_empty(&self) -> bool {
327        self.values.is_empty()
328    }
329
330    /// Iterate over per-sample values (each a [`SnapshotResult<T>`]).
331    pub fn values_iter(&self) -> impl Iterator<Item = &SnapshotResult<T>> {
332        self.values.iter()
333    }
334
335    /// Iterate over the full per-sample triple — `(tag,
336    /// elapsed_ms, &SnapshotResult<T>)`. Lets a caller consume the
337    /// projected column alongside its sample identity without
338    /// re-threading the source [`SampleSeries`](crate::scenario::sample::SampleSeries).
339    /// Yields entries in the same order as the underlying
340    /// `Vec<SnapshotResult<T>>` storage; tags and elapsed-ms
341    /// vectors are guaranteed equal-length to `values` by
342    /// [`Self::from_parts_with_phases_opt`]'s `assert_eq!` checks
343    /// (which run in both debug and release builds).
344    pub fn iter_full(&self) -> impl Iterator<Item = (&str, Option<u64>, &SnapshotResult<T>)> {
345        self.tags
346            .iter()
347            .zip(self.elapsed_ms.iter())
348            .zip(self.values.iter())
349            .map(|((tag, elapsed_ms), value)| (tag.as_str(), *elapsed_ms, value))
350    }
351
352    /// Open a per-sample claim builder for scalar comparators
353    /// (`at_least`, `at_most`, `between`). Each successful sample
354    /// runs the comparator independently; the first failure
355    /// records a detail; subsequent failures pile on so the
356    /// timeline shows every offending sample, not just the first.
357    /// Borrows the verdict mutably for the duration of the
358    /// comparator chain.
359    pub fn each<'v>(&self, verdict: &'v mut Verdict) -> EachClaim<'_, 'v, T> {
360        EachClaim {
361            field: self,
362            verdict,
363        }
364    }
365
366    /// Iterate the [`SampleTriple`]s for one specific phase. Sugar
367    /// for [`Self::by_phase`]`().0.get(&phase).map(...)` that drops
368    /// the tuple-destructure noise the user otherwise repeats at
369    /// every per-phase site. Returns an empty iterator when the
370    /// phase had no samples; callers that need to distinguish
371    /// "empty bucket" from "phase never observed" can use
372    /// [`Self::by_phase`] directly.
373    pub fn phase(&self, phase: crate::assert::Phase) -> Vec<SampleTriple<'_, T>> {
374        self.iter_full()
375            .zip(self.phases.iter())
376            .filter_map(|(triple, p)| {
377                if *p == Some(phase) {
378                    Some(triple)
379                } else {
380                    None
381                }
382            })
383            .collect()
384    }
385
386    /// Single-value reduction for a cumulative counter or any
387    /// metric where "the last sample of the phase" is the
388    /// load-bearing value: returns the last Ok-sample's value
389    /// for `phase`, or `None` when the phase had zero Ok-samples.
390    /// Per-sample errors are skipped so a one-off projection
391    /// hiccup doesn't drop the whole phase. The "value at end of
392    /// phase" semantic is what cross-phase counter comparisons
393    /// (e.g. "dispatch count of Step\[1\] ≤ 0.85 × dispatch count
394    /// of Step\[0\]") almost always want, so this primitive
395    /// removes the closure-over-by_phase-triples boilerplate the
396    /// user otherwise writes at every site.
397    pub fn value_at_phase(&self, phase: crate::assert::Phase) -> Option<T>
398    where
399        T: Copy,
400    {
401        self.iter_full()
402            .zip(self.phases.iter())
403            .filter_map(|((_, _, value), p)| {
404                if *p == Some(phase) {
405                    value.as_ref().ok().copied()
406                } else {
407                    None
408                }
409            })
410            .last()
411    }
412
413    /// Reduce to a per-phase last-Ok-value map. The companion to
414    /// [`Self::value_at_phase`] for callers that want every phase
415    /// at once: each present phase maps to its last successful
416    /// sample's value. Phases with all-Err samples are absent from
417    /// the map. Matches the "cumulative-counter-at-end-of-phase"
418    /// semantic [`Self::aggregate_by_phase`] applies for Counter
419    /// metrics, but skips the kind-aware fold so callers reaching
420    /// for the raw last value don't pay the projection cost.
421    pub fn last_per_phase(&self) -> std::collections::BTreeMap<crate::assert::Phase, T>
422    where
423        T: Copy,
424    {
425        let mut out: std::collections::BTreeMap<crate::assert::Phase, T> =
426            std::collections::BTreeMap::new();
427        for ((_, _, value), p) in self.iter_full().zip(self.phases.iter()) {
428            if let (Some(phase), Ok(v)) = (*p, value) {
429                out.insert(phase, *v);
430            }
431        }
432        out
433    }
434
435    /// Reduce to a per-phase first-Ok-value map. The symmetric
436    /// companion to [`Self::last_per_phase`]: each present phase
437    /// maps to its first successful sample's value. Per-sample
438    /// errors are skipped so a one-off projection hiccup at the
439    /// start of a phase doesn't drop the whole phase. Phases with
440    /// all-Err samples are absent from the map.
441    ///
442    /// The load-bearing pairing is
443    /// `last_per_phase - first_per_phase` — the work done WITHIN
444    /// a phase, with no contribution from prior phases — surfaced
445    /// directly via [`Self::counter_delta_per_phase`].
446    pub fn first_per_phase(&self) -> std::collections::BTreeMap<crate::assert::Phase, T>
447    where
448        T: Copy,
449    {
450        let mut out: std::collections::BTreeMap<crate::assert::Phase, T> =
451            std::collections::BTreeMap::new();
452        for ((_, _, value), p) in self.iter_full().zip(self.phases.iter()) {
453            if let (Some(phase), Ok(v)) = (*p, value) {
454                out.entry(phase).or_insert(*v);
455            }
456        }
457        out
458    }
459
460    /// Per-phase cumulative-counter delta: `last_per_phase(p) -
461    /// first_per_phase(p)` for every phase with at least one Ok
462    /// sample. The reducer A/B-compare tests reach for when a
463    /// metric is a cumulative counter still accruing from prior
464    /// phases — `value_at_phase`'s last-sample reading carries
465    /// the whole-run accumulation, which lets prior-phase activity
466    /// muddy the cross-phase ratio. The delta isolates the work
467    /// performed WITHIN each phase so
468    /// `ratio(phase_delta(later) / phase_delta(earlier))` answers
469    /// the question the operator actually asked: "did `later`'s
470    /// activity drop relative to `earlier`'s?"
471    ///
472    /// Single-Ok-sample phases yield a delta of zero (first ==
473    /// last). Phases with all-Err samples are absent. Phases that
474    /// appear in `first_per_phase` but not `last_per_phase` (the
475    /// out-of-order edge case) are also absent — the delta is
476    /// well-defined only when both endpoints are present.
477    ///
478    /// The reducer is intentionally generic: the test author owns
479    /// `(same_delta, cross_delta)`-style compositions (e.g. fold
480    /// two counter-delta maps into a per-phase fraction) without
481    /// the framework needing a registered `MetricDef`. For
482    /// Counter-typed registered metrics, the equivalent
483    /// MetricDef-aware path is [`Self::aggregate_by_phase`].
484    ///
485    /// **Counter regression**: when a phase's last sample reads
486    /// LOWER than its first, the reducer emits a `tracing::warn!`
487    /// naming the field label + phase + (first, last) and stores
488    /// `T::default()` for that phase (e.g. `0` for `u64`). The
489    /// underlying counter is assumed non-decreasing within a phase
490    /// (the common case for BPF per-event counters). A regression
491    /// can mean either an upstream-signal bug (the counter source
492    /// itself rolled back) OR a framework picker drift mid-phase
493    /// (e.g. [`crate::scenario::sample::SampleSeries::bpf_live_u64`]
494    /// resolved to different bss copies across same-phase
495    /// snapshots in a post-`Op::ReplaceScheduler` swap window).
496    /// Either way the regression is reported as zero progress
497    /// rather than panicking, so a single bad sample does not
498    /// crash the assertion engine.
499    pub fn counter_delta_per_phase(&self) -> std::collections::BTreeMap<crate::assert::Phase, T>
500    where
501        T: Copy + PartialOrd + std::ops::Sub<Output = T> + Default + std::fmt::Debug,
502    {
503        let firsts = self.first_per_phase();
504        let lasts = self.last_per_phase();
505        let label = self.label();
506        firsts
507            .into_iter()
508            .filter_map(|(phase, first)| {
509                lasts.get(&phase).map(|last| {
510                    if *last >= first {
511                        (phase, *last - first)
512                    } else {
513                        tracing::warn!(
514                            label = %label,
515                            ?phase,
516                            ?first,
517                            last = ?*last,
518                            "counter_delta_per_phase: phase counter regressed \
519                             (last < first); reporting zero progress for this phase",
520                        );
521                        (phase, T::default())
522                    }
523                })
524            })
525            .collect()
526    }
527
528    /// Cross-phase comparator: pin `value_at_phase(later) /
529    /// value_at_phase(earlier)` against a ceiling AND record the
530    /// computed ratio + both phase values in the verdict so
531    /// `--nocapture` runs surface the actual numbers without a
532    /// per-test `println!` boilerplate. Records a failure when
533    /// either phase had no Ok-samples, when the earlier value is
534    /// zero (no baseline), or when the ratio exceeds `ceiling`.
535    /// On success records an informational note carrying the
536    /// observed ratio + both values so the operator can see the
537    /// margin against the threshold.
538    ///
539    /// Returns `&mut Verdict` for chaining.
540    pub fn ratio_across_phases<'v>(
541        &self,
542        verdict: &'v mut Verdict,
543        earlier: crate::assert::Phase,
544        later: crate::assert::Phase,
545    ) -> CrossPhaseRatio<'v, T>
546    where
547        T: Copy + Into<f64> + std::fmt::Display,
548    {
549        let e = self.value_at_phase(earlier);
550        let l = self.value_at_phase(later);
551        CrossPhaseRatio {
552            label: self.label().to_string(),
553            verdict,
554            earlier,
555            later,
556            earlier_value: e,
557            later_value: l,
558        }
559    }
560}
561
562/// Cross-phase ratio builder returned by
563/// [`SeriesField::ratio_across_phases`] and
564/// [`PhaseMapExt::ratio_across_phases`]. Carries the resolved
565/// `(earlier, later)` values and a caller-supplied label so the
566/// terminal comparator chain (`at_most`) can format both values
567/// and the ratio into a single failure-or-note message. Mirrors
568/// the [`EachClaim`] shape (mutable verdict borrow held through
569/// the chain).
570///
571/// The `label` is origin-neutral: SeriesField's entry point fills
572/// it from the field's `.label()`, the PhaseMap entry point takes
573/// it from the caller. An empty label suppresses the leading
574/// `label:` / `[label]` prefix in the rendered message so the
575/// rest of the diagnostic stays readable.
576#[must_use = "CrossPhaseRatio records nothing until at_most is invoked"]
577pub struct CrossPhaseRatio<'v, T> {
578    label: String,
579    verdict: &'v mut Verdict,
580    earlier: crate::assert::Phase,
581    later: crate::assert::Phase,
582    earlier_value: Option<T>,
583    later_value: Option<T>,
584}
585
586impl<'v, T> CrossPhaseRatio<'v, T>
587where
588    T: Copy + Into<f64> + std::fmt::Display,
589{
590    /// Pass when `later_value / earlier_value <= ceiling`. On
591    /// failure records a [`DetailKind::Temporal`] detail naming
592    /// the field label, both phase values, the computed ratio,
593    /// and the ceiling so the failure message is self-contained.
594    /// On success records an info note with the same trio so a
595    /// `--nocapture` run surfaces the headroom without a separate
596    /// per-metric `println!`.
597    pub fn at_most(self, ceiling: f64) -> &'v mut Verdict {
598        let label_prefix = if self.label.is_empty() {
599            String::new()
600        } else {
601            format!("{}: ", self.label)
602        };
603        let note_prefix = if self.label.is_empty() {
604            String::new()
605        } else {
606            format!("[{}] ", self.label)
607        };
608        let earlier_str = match self.earlier_value {
609            Some(v) => format!("{v}"),
610            None => "<no-samples>".to_string(),
611        };
612        let later_str = match self.later_value {
613            Some(v) => format!("{v}"),
614            None => "<no-samples>".to_string(),
615        };
616        let (Some(earlier), Some(later)) = (self.earlier_value, self.later_value) else {
617            // INSTRUMENT-derived: one or both phases produced no
618            // samples, so there is no value to ratio against. The
619            // ratio cannot be computed; record Inconclusive rather
620            // than Fail to distinguish "no signal to evaluate" from
621            // "evaluated and exceeded ceiling."
622            push_inconclusive(
623                self.verdict,
624                format!(
625                    "{label_prefix}ratio_across_phases({:?}→{:?}) inconclusive: \
626                     needs both phases — earlier={earlier_str}, later={later_str}",
627                    self.earlier, self.later,
628                ),
629            );
630            return self.verdict;
631        };
632        let earlier_f: f64 = earlier.into();
633        let later_f: f64 = later.into();
634        if earlier_f == 0.0 {
635            // INSTRUMENT-derived: earlier baseline measured 0, so
636            // later/earlier is undefined. Record Inconclusive — the
637            // ceiling check has no signal to evaluate, neither pass
638            // (would silently green a phase pair with no baseline)
639            // nor fail (no actual ratio violation observed) is
640            // truthful. POLICY-derived zero baselines (a policy
641            // decision to compare against an intentional 0) are
642            // out of scope for this gate.
643            push_inconclusive(
644                self.verdict,
645                format!(
646                    "{label_prefix}ratio_across_phases({:?}→{:?}) inconclusive: \
647                     earlier value is 0 (no baseline to ratio against)",
648                    self.earlier, self.later,
649                ),
650            );
651            return self.verdict;
652        }
653        let ratio = later_f / earlier_f;
654        if !ratio.is_finite() {
655            // A non-finite ratio: the `earlier_f == 0.0` guard above
656            // misses a NaN baseline (NaN != 0.0), and a NaN later_f
657            // or an inf-producing quotient also lands here. Raw
658            // `ratio > ceiling` is always false for NaN, so without
659            // this guard the phase pair would silently PASS. Treat a
660            // corrupt endpoint as a Fail, mirroring rate_within /
661            // ratio_within's non-finite handling (distinct from the
662            // zero-baseline Inconclusive above, which is "no signal").
663            push_detail(
664                self.verdict,
665                format!(
666                    "{label_prefix}ratio_across_phases({:?}→{:?}) = \
667                     {later_str}/{earlier_str} = {ratio} is non-finite \
668                     (corrupt endpoint) — cannot evaluate ceiling {ceiling:.4}",
669                    self.earlier, self.later,
670                ),
671            );
672        } else if ratio > ceiling {
673            push_detail(
674                self.verdict,
675                format!(
676                    "{label_prefix}ratio_across_phases({:?}→{:?}) = \
677                     {later_str}/{earlier_str} = {ratio:.4} exceeds ceiling \
678                     {ceiling:.4}",
679                    self.earlier, self.later,
680                ),
681            );
682        } else {
683            // Pass — emit a note that surfaces in the sidecar
684            // info_notes (visible under --nocapture and on the
685            // failure render of any sibling claim) so the operator
686            // sees the headroom against the ceiling without a
687            // separate per-metric println.
688            self.verdict.note(format!(
689                "{note_prefix}ratio_across_phases({:?}→{:?}) = \
690                 {later_str}/{earlier_str} = {ratio:.4} (ceiling {ceiling:.4})",
691                self.earlier, self.later,
692            ));
693        }
694        self.verdict
695    }
696}
697
698/// The polarity-resolved outcome of a [`BetterThanPhase`] comparison, factored
699/// out as a PURE decision so it is exhaustively unit-testable without a
700/// `VmResult` or `Verdict`. The builder maps each variant to a verdict record +
701/// message.
702#[derive(Debug, Clone, Copy, PartialEq, Eq)]
703enum BetterOutcome {
704    /// Candidate is better than baseline (strictly, or by the required margin).
705    Pass,
706    /// Candidate is NOT better — worse, or short of the margin.
707    Fail,
708    /// A non-finite endpoint — a corrupt value the comparison can't trust
709    /// (Fail, not Inconclusive: a `<` on NaN is silently false, so an unguarded
710    /// corrupt value would falsely pass).
711    Corrupt,
712    /// One or both phases carried no value for the metric (no signal).
713    Missing,
714    /// The metric has no LowerBetter/HigherBetter polarity (TargetValue /
715    /// Unknown / unregistered), so "better" has no direction.
716    Undirected,
717    /// A fractional margin was requested against a zero baseline — nothing to
718    /// scale the margin against.
719    ZeroBaseline,
720}
721
722/// Pure better-than decision: is `candidate` better than `baseline` for a metric
723/// of the given `polarity`, by the optional `margin` (a FRACTION of the
724/// baseline; `None` = strictly better, any improvement)? The whole point is the
725/// SAME call works for a LowerBetter metric (latency) and a HigherBetter one
726/// (throughput) with no caller-specified direction — the direction comes from
727/// the registry-declared polarity.
728fn better_outcome(
729    baseline: Option<f64>,
730    candidate: Option<f64>,
731    polarity: Option<crate::test_support::Polarity>,
732    margin: Option<f64>,
733) -> BetterOutcome {
734    use crate::test_support::Polarity;
735    let (Some(b), Some(c)) = (baseline, candidate) else {
736        return BetterOutcome::Missing;
737    };
738    if !b.is_finite() || !c.is_finite() {
739        return BetterOutcome::Corrupt;
740    }
741    let lower_better = match polarity {
742        Some(Polarity::LowerBetter) => true,
743        Some(Polarity::HigherBetter) => false,
744        // TargetValue / Unknown / Informational / unregistered: no
745        // "better" direction.
746        _ => return BetterOutcome::Undirected,
747    };
748    let pass = match margin {
749        // Strictly better (any improvement).
750        None => {
751            if lower_better {
752                c < b
753            } else {
754                c > b
755            }
756        }
757        // Better by at least `m` (a fraction of the baseline). A zero baseline
758        // has nothing to scale the fractional margin against.
759        Some(m) => {
760            if b == 0.0 {
761                return BetterOutcome::ZeroBaseline;
762            }
763            // Relative improvement as a fraction of the baseline, the DIVISION
764            // form `(improvement)/baseline >= m` rather than the
765            // algebraically-equivalent multiplicative `c <= b*(1-m)`: the
766            // `b*(1-m)` intermediate (e.g. `100.0*(1.0-0.1)` = 89.999…) rejects
767            // an EXACTLY-`m`-better candidate by an f64 epsilon, whereas
768            // `(b-c)/b` is exact at round-number boundaries (`10.0/100.0` is the
769            // same f64 bit pattern as the `0.1` literal). The
770            // percentile-f64-threshold-rounding footgun.
771            if lower_better {
772                (b - c) / b >= m
773            } else {
774                (c - b) / b >= m
775            }
776        }
777    };
778    if pass {
779        BetterOutcome::Pass
780    } else {
781        BetterOutcome::Fail
782    }
783}
784
785/// Cross-phase "is the candidate phase better than the baseline phase on this
786/// metric?" comparator, returned by
787/// [`crate::vmm::VmResult::better_across_phases`] (pooled aggregate) and
788/// [`crate::vmm::VmResult::better_across_phases_cgroup`] (one named cgroup). The
789/// polarity-aware sibling of [`CrossPhaseRatio`]: it reads two PER-PHASE scalars
790/// (via `phase_metric` / `phase_cgroup_metric`, not a sampled series) and orients
791/// "better" from the metric's registry-declared
792/// polarity, so the same call expresses "scheduler beats EEVDF" for a
793/// LowerBetter latency AND a HigherBetter throughput without the test author
794/// naming a direction. A missing/undirected/zero-baseline comparison is
795/// Inconclusive (never a silent pass); a non-finite value is a Fail.
796#[must_use = "BetterThanPhase records nothing until better_than / by_at_least is invoked"]
797pub struct BetterThanPhase<'v> {
798    metric: String,
799    verdict: &'v mut Verdict,
800    baseline: crate::assert::Phase,
801    candidate: crate::assert::Phase,
802    baseline_value: Option<f64>,
803    candidate_value: Option<f64>,
804    polarity: Option<crate::test_support::Polarity>,
805    /// Optional scope label (the cgroup name) for the per-cgroup producer
806    /// (`better_across_phases_cgroup`); `None` for the pooled producer. Surfaced in
807    /// the Inconclusive/Fail diagnostics so a per-cgroup outcome names its cgroup.
808    scope: Option<String>,
809}
810
811impl<'v> BetterThanPhase<'v> {
812    /// Build from already-resolved per-phase values + polarity. Constructed by
813    /// [`crate::vmm::VmResult::better_across_phases`] (values via `phase_metric`)
814    /// and [`crate::vmm::VmResult::better_across_phases_cgroup`] (values via
815    /// `phase_cgroup_metric`), each resolving polarity via `crate::stats::metric_def`.
816    /// `scope` is the per-cgroup cgroup name (`None` for the pooled producer),
817    /// surfaced in the diagnostics.
818    // 8 args: a field-setting constructor (each arg is one BetterThanPhase field),
819    // with only 2 well-structured call sites (the two VmResult producers).
820    #[allow(clippy::too_many_arguments)]
821    pub(crate) fn new(
822        metric: String,
823        verdict: &'v mut Verdict,
824        baseline: crate::assert::Phase,
825        candidate: crate::assert::Phase,
826        baseline_value: Option<f64>,
827        candidate_value: Option<f64>,
828        polarity: Option<crate::test_support::Polarity>,
829        scope: Option<String>,
830    ) -> Self {
831        Self {
832            metric,
833            verdict,
834            baseline,
835            candidate,
836            baseline_value,
837            candidate_value,
838            polarity,
839            scope,
840        }
841    }
842
843    /// Pass iff the candidate is STRICTLY better than the baseline per the
844    /// metric's polarity (LowerBetter: candidate < baseline; HigherBetter:
845    /// candidate > baseline) — any improvement, no margin.
846    pub fn better_than(self) -> &'v mut Verdict {
847        self.evaluate(None)
848    }
849
850    /// Pass iff the candidate improves on the baseline by at least `margin`, a
851    /// FRACTION of the baseline (`0.10` = 10% better): LowerBetter requires
852    /// `candidate <= baseline * (1 - margin)`, HigherBetter requires
853    /// `candidate >= baseline * (1 + margin)`. `by_at_least(0.0)` means "no
854    /// regression" (at least as good). A zero baseline is Inconclusive (nothing
855    /// to scale the fractional margin against).
856    pub fn by_at_least(self, margin: f64) -> &'v mut Verdict {
857        self.evaluate(Some(margin))
858    }
859
860    fn evaluate(self, margin: Option<f64>) -> &'v mut Verdict {
861        let outcome = better_outcome(
862            self.baseline_value,
863            self.candidate_value,
864            self.polarity,
865            margin,
866        );
867        let dir = match self.polarity {
868            Some(crate::test_support::Polarity::LowerBetter) => "lower-is-better",
869            Some(crate::test_support::Polarity::HigherBetter) => "higher-is-better",
870            _ => "no-better-direction",
871        };
872        let b_str = self
873            .baseline_value
874            .map(|v| format!("{v}"))
875            .unwrap_or_else(|| "<no-value>".to_string());
876        let c_str = self
877            .candidate_value
878            .map(|v| format!("{v}"))
879            .unwrap_or_else(|| "<no-value>".to_string());
880        let req = match margin {
881            None => "strictly better".to_string(),
882            Some(m) => format!("better by >= {m:.4} fraction"),
883        };
884        let metric = &self.metric;
885        let base = self.baseline;
886        let cand = self.candidate;
887        // Scope suffix (the cgroup name) for the per-cgroup producer; empty for the
888        // pooled producer — so a per-cgroup outcome names its cgroup.
889        let scope_str = self
890            .scope
891            .as_deref()
892            .map(|s| format!(" [cgroup {s}]"))
893            .unwrap_or_default();
894        match outcome {
895            BetterOutcome::Pass => {
896                self.verdict.note(format!(
897                    "[{metric}]{scope_str} candidate {cand}={c_str} {req} than baseline {base}={b_str} ({dir})"
898                ));
899            }
900            BetterOutcome::Fail => {
901                push_detail(
902                    self.verdict,
903                    format!(
904                        "{metric}{scope_str}: candidate {cand}={c_str} is NOT {req} than baseline \
905                         {base}={b_str} ({dir})"
906                    ),
907                );
908            }
909            BetterOutcome::Corrupt => {
910                push_detail(
911                    self.verdict,
912                    format!(
913                        "{metric}{scope_str}: non-finite value (baseline {base}={b_str}, candidate \
914                         {cand}={c_str}) — cannot compare"
915                    ),
916                );
917            }
918            BetterOutcome::Missing => {
919                push_inconclusive(
920                    self.verdict,
921                    format!(
922                        "{metric}: cross-phase better-than({base}->{cand}){scope_str} inconclusive: \
923                         needs both phases — baseline={b_str}, candidate={c_str}"
924                    ),
925                );
926            }
927            BetterOutcome::Undirected => {
928                push_inconclusive(
929                    self.verdict,
930                    format!(
931                        "{metric}: cross-phase better-than{scope_str} inconclusive: metric has no \
932                         lower/higher-is-better polarity — cannot orient 'better'"
933                    ),
934                );
935            }
936            BetterOutcome::ZeroBaseline => {
937                push_inconclusive(
938                    self.verdict,
939                    format!(
940                        "{metric}: cross-phase better-than{scope_str} inconclusive: baseline {base}=0, no \
941                         baseline to scale the fractional margin ({req})"
942                    ),
943                );
944            }
945        }
946        self.verdict
947    }
948}
949
950/// Direct-claim "is this candidate better than this baseline on a
951/// registered metric?" comparator, returned by [`Verdict::claim_better`].
952/// The polarity-aware sibling of
953/// [`crate::vmm::VmResult::better_across_phases`] for the case where the
954/// test already HOLDS two scalar values (not two phases): it orients
955/// "better" from the metric's registry-declared polarity, so the SAME
956/// call expresses "candidate beats baseline" for a LowerBetter latency
957/// AND a HigherBetter throughput without the test author naming a
958/// direction. A metric with no lower/higher-better polarity
959/// (`TargetValue` / `Unknown` / `Informational` / an unregistered or
960/// typo'd name) is INCONCLUSIVE (never a silent pass); a non-finite
961/// endpoint is a `Fail`.
962#[must_use = "ClaimBetter records nothing until .than(..) / .than_by(..) is invoked"]
963pub struct ClaimBetter<'a> {
964    verdict: &'a mut Verdict,
965    label: String,
966    candidate: f64,
967    polarity: Option<crate::test_support::Polarity>,
968}
969
970impl<'a> ClaimBetter<'a> {
971    /// Pass iff the candidate is STRICTLY better than `baseline` per the
972    /// metric's polarity (LowerBetter: candidate < baseline;
973    /// HigherBetter: candidate > baseline) — any improvement, no margin.
974    pub fn than(self, baseline: f64) -> &'a mut Verdict {
975        self.evaluate(baseline, None)
976    }
977
978    /// Pass iff the candidate improves on `baseline` by at least
979    /// `margin`, a FRACTION of the baseline (`0.10` = 10% better).
980    /// `than_by(b, 0.0)` means "no regression" (at least as good). A
981    /// zero baseline is Inconclusive (nothing to scale the margin
982    /// against).
983    pub fn than_by(self, baseline: f64, margin: f64) -> &'a mut Verdict {
984        self.evaluate(baseline, Some(margin))
985    }
986
987    fn evaluate(self, baseline: f64, margin: Option<f64>) -> &'a mut Verdict {
988        let ClaimBetter {
989            verdict,
990            label,
991            candidate,
992            polarity,
993        } = self;
994        let outcome = better_outcome(Some(baseline), Some(candidate), polarity, margin);
995        let dir = match polarity {
996            Some(crate::test_support::Polarity::LowerBetter) => "lower-is-better",
997            Some(crate::test_support::Polarity::HigherBetter) => "higher-is-better",
998            _ => "no-better-direction",
999        };
1000        let req = match margin {
1001            None => "strictly better".to_string(),
1002            Some(m) => format!("better by >= {m:.4} fraction"),
1003        };
1004        match outcome {
1005            BetterOutcome::Pass => {
1006                verdict.note(format!(
1007                    "[{label}] candidate {candidate} {req} than baseline {baseline} ({dir})"
1008                ));
1009            }
1010            BetterOutcome::Fail => {
1011                push_detail(
1012                    verdict,
1013                    format!(
1014                        "{label}: candidate {candidate} is NOT {req} than baseline {baseline} ({dir})"
1015                    ),
1016                );
1017            }
1018            BetterOutcome::Corrupt => {
1019                push_detail(
1020                    verdict,
1021                    format!(
1022                        "{label}: non-finite value (baseline {baseline}, candidate {candidate}) \
1023                         — cannot compare"
1024                    ),
1025                );
1026            }
1027            BetterOutcome::Missing => {
1028                push_inconclusive(
1029                    verdict,
1030                    format!("{label}: better-than inconclusive: no value to compare"),
1031                );
1032            }
1033            BetterOutcome::Undirected => {
1034                push_inconclusive(
1035                    verdict,
1036                    format!(
1037                        "{label}: better-than inconclusive: metric has no lower/higher-is-better \
1038                         polarity — cannot orient 'better' (an unregistered or typo'd metric name \
1039                         resolves here)"
1040                    ),
1041                );
1042            }
1043            BetterOutcome::ZeroBaseline => {
1044                push_inconclusive(
1045                    verdict,
1046                    format!(
1047                        "{label}: better-than inconclusive: baseline=0, no baseline to scale the \
1048                         fractional margin ({req})"
1049                    ),
1050                );
1051            }
1052        }
1053        verdict
1054    }
1055}
1056
1057impl Verdict {
1058    /// Claim that `candidate` is better than a baseline on a registered
1059    /// metric, oriented by the metric's registry-declared polarity — the
1060    /// direct-value sibling of
1061    /// [`crate::vmm::VmResult::better_across_phases`]. Pass the metric as
1062    /// a typed [`BuiltinMetric`](crate::stats::BuiltinMetric) (typo-proof)
1063    /// or a string (a registered wire-name resolves; an unregistered or
1064    /// typo'd name has no polarity → the comparison is Inconclusive, the
1065    /// no-guessed-direction guardrail). Chain [`ClaimBetter::than`]
1066    /// (strictly better) or [`ClaimBetter::than_by`] (a fractional
1067    /// margin).
1068    ///
1069    /// ```
1070    /// # use ktstr::{assert::Verdict, prelude::BuiltinMetric};
1071    /// let mut v = Verdict::new();
1072    /// // wakeup p99 latency is lower-is-better — candidate 40 < baseline 50 passes:
1073    /// v.claim_better(BuiltinMetric::WakeupP99LatencyUs, 40.0).than(50.0);
1074    /// let r = v.into_result();
1075    /// assert!(r.is_pass());
1076    /// ```
1077    pub fn claim_better(
1078        &mut self,
1079        metric: impl Into<crate::stats::MetricId>,
1080        candidate: f64,
1081    ) -> ClaimBetter<'_> {
1082        let id = metric.into();
1083        let polarity = id.def().map(|m| m.polarity);
1084        ClaimBetter {
1085            label: id.as_str().to_string(),
1086            verdict: self,
1087            candidate,
1088            polarity,
1089        }
1090    }
1091}
1092
1093/// Extension trait that lets a pre-reduced per-phase map
1094/// (typically the output of [`SeriesField::counter_delta_per_phase`],
1095/// [`SeriesField::last_per_phase`], or
1096/// [`SeriesField::first_per_phase`]) compose with the
1097/// cross-phase comparator chain [`SeriesField::ratio_across_phases`]
1098/// exposes — without re-projecting the per-phase values back
1099/// through a synthetic [`SeriesField`].
1100///
1101/// Also surfaces [`Self::zip_per_phase`] so two per-phase maps fold
1102/// element-wise into a derived per-phase map (e.g. a cross-LLC
1103/// dispatch fraction from two counter-delta maps).
1104pub trait PhaseMapExt<T> {
1105    /// Fold two per-phase maps element-wise on phase intersection.
1106    /// For every phase present in BOTH `self` AND `other`, invoke
1107    /// `f(self_value, other_value)` and collect the result keyed
1108    /// by phase. Phases present in only one input are absent from
1109    /// the result.
1110    ///
1111    /// **Intersection-only — NOT [`Iterator::zip`] semantics.** This
1112    /// pairs values by phase key, not by position; a missing phase
1113    /// on either side surfaces as an absence in the result, never
1114    /// as a synthesized zero or default. Callers that want to act
1115    /// on coverage gaps compare the result map's length against
1116    /// either input's length.
1117    ///
1118    /// Both values are passed BY VALUE — the trait constrains
1119    /// `T: Copy` and `U: Copy` to keep the closure body free of
1120    /// `*s` / `*c` deref noise that would otherwise dominate every
1121    /// composed-metric expression. Non-Copy element types are out
1122    /// of scope; per-phase reducers in this crate already return
1123    /// scalar `Copy` values (`u64`, `f64`, `i64`).
1124    fn zip_per_phase<U, R>(
1125        &self,
1126        other: &std::collections::BTreeMap<crate::assert::Phase, U>,
1127        f: impl FnMut(T, U) -> R,
1128    ) -> std::collections::BTreeMap<crate::assert::Phase, R>
1129    where
1130        T: Copy,
1131        U: Copy;
1132
1133    /// Cross-phase ratio comparator on a pre-reduced per-phase
1134    /// map. Mirrors [`SeriesField::ratio_across_phases`]'s
1135    /// chain shape — `.at_most(ceiling)` records a failure detail
1136    /// or pass info note via the supplied verdict — but operates
1137    /// on the map directly so caller-derived per-phase values
1138    /// (e.g. a fraction of two counter deltas) skip a synthetic-
1139    /// SeriesField intermediate.
1140    ///
1141    /// Three load-bearing differences from the SeriesField entry:
1142    ///
1143    /// 1. **No implicit label.** SeriesField pulls its `.label()`
1144    ///    for the failure message; the map has no label, so the
1145    ///    caller names the metric being compared at the call site.
1146    /// 2. **Pre-reduced values.** SeriesField reduces by
1147    ///    last-Ok-sample at each comparator call; this trait
1148    ///    operates on values already reduced by any compatible
1149    ///    upstream reducer ([`SeriesField::counter_delta_per_phase`],
1150    ///    [`SeriesField::last_per_phase`], or a caller-defined fold).
1151    /// 3. **`T: Copy`** — the map's per-phase value is copied out
1152    ///    into the [`CrossPhaseRatio`] carrier's `Option<T>`
1153    ///    fields. Matches [`SeriesField::value_at_phase`]'s bound
1154    ///    for the same reason.
1155    fn ratio_across_phases<'v>(
1156        &self,
1157        verdict: &'v mut Verdict,
1158        label: impl Into<String>,
1159        earlier: crate::assert::Phase,
1160        later: crate::assert::Phase,
1161    ) -> CrossPhaseRatio<'v, T>
1162    where
1163        T: Copy + Into<f64> + std::fmt::Display;
1164}
1165
1166/// Per-phase "share of total" reducer. Specialized for the dominant
1167/// counter shape (`BTreeMap<Phase, u64>`) because `u64: Into<f64>`
1168/// is intentionally absent (cast is lossy) — a generic
1169/// [`PhaseMapExt`] method with an `Into<f64>` bound would reject
1170/// every counter-delta map test authors actually reach for.
1171///
1172/// For every phase present in BOTH `self` AND `other`, computes
1173/// `self_value as f64 / (self_value + other_value) as f64`. When
1174/// both values are zero (sum is zero), the phase is **dropped from
1175/// the result** — there is no signal to share, and synthesizing
1176/// `0.0` would let downstream `at_most` / `ratio_within` gates
1177/// silently pass on a zero-event phase pair. Returning no entry
1178/// surfaces the absence so the consumer can treat it as
1179/// Inconclusive (the same shape as a phase only present in one
1180/// input). Phases present in only one input also drop from the
1181/// result, mirroring [`PhaseMapExt::zip_per_phase`]'s
1182/// intersection-only semantics; both drop conditions surface
1183/// identically as "no entry for this phase."
1184///
1185/// Targets the "cross-LLC dispatch fraction" idiom (`nr_cross /
1186/// (nr_cross + nr_same)`) and similar share-of-total patterns.
1187/// The general fold via [`PhaseMapExt::zip_per_phase`] requires
1188/// the caller to spell the safe-divide branch inline at every
1189/// call site; this trait owns the branch so test code expresses
1190/// the metric in one chain.
1191pub trait FracPair {
1192    /// See trait-level doc for the zero-total drop and
1193    /// intersection-only semantics.
1194    fn frac_pair(&self, other: &Self) -> std::collections::BTreeMap<crate::assert::Phase, f64>;
1195}
1196
1197impl FracPair for std::collections::BTreeMap<crate::assert::Phase, u64> {
1198    fn frac_pair(&self, other: &Self) -> std::collections::BTreeMap<crate::assert::Phase, f64> {
1199        self.iter()
1200            .filter_map(|(p, n)| {
1201                other.get(p).and_then(|m| {
1202                    // `saturating_add` guards against u64 overflow on
1203                    // long-running counter pairs (the realistic
1204                    // failure is two near-u64::MAX counter deltas;
1205                    // wrap would produce a misleading fraction).
1206                    // Saturation to u64::MAX still yields a sensible
1207                    // fraction `n / u64::MAX` ≈ 0.0 for non-MAX `n`
1208                    // and 1.0 for the saturating case, with no NaN.
1209                    let total = n.saturating_add(*m);
1210                    if total == 0 {
1211                        // Zero-total phase: no events observed in
1212                        // either input. Drop the entry rather than
1213                        // synthesize 0.0 so a downstream `at_most`
1214                        // sees absence (= Inconclusive shape) instead
1215                        // of a silent pass against any positive
1216                        // threshold.
1217                        None
1218                    } else {
1219                        Some((*p, *n as f64 / total as f64))
1220                    }
1221                })
1222            })
1223            .collect()
1224    }
1225}
1226
1227impl<T> PhaseMapExt<T> for std::collections::BTreeMap<crate::assert::Phase, T> {
1228    fn zip_per_phase<U, R>(
1229        &self,
1230        other: &std::collections::BTreeMap<crate::assert::Phase, U>,
1231        mut f: impl FnMut(T, U) -> R,
1232    ) -> std::collections::BTreeMap<crate::assert::Phase, R>
1233    where
1234        T: Copy,
1235        U: Copy,
1236    {
1237        self.iter()
1238            .filter_map(|(p, t)| other.get(p).map(|u| (*p, f(*t, *u))))
1239            .collect()
1240    }
1241
1242    fn ratio_across_phases<'v>(
1243        &self,
1244        verdict: &'v mut Verdict,
1245        label: impl Into<String>,
1246        earlier: crate::assert::Phase,
1247        later: crate::assert::Phase,
1248    ) -> CrossPhaseRatio<'v, T>
1249    where
1250        T: Copy + Into<f64> + std::fmt::Display,
1251    {
1252        CrossPhaseRatio {
1253            label: label.into(),
1254            verdict,
1255            earlier,
1256            later,
1257            earlier_value: self.get(&earlier).copied(),
1258            later_value: self.get(&later).copied(),
1259        }
1260    }
1261}
1262
1263/// Per-sample scalar claim builder returned by
1264/// [`SeriesField::each`]. Provides `at_least` / `at_most` /
1265/// `between` — comparators that apply to every (successfully
1266/// projected) sample independently. Per-sample errors from the
1267/// projection (missing field, type mismatch) are routed through
1268/// the verdict as failures so coverage gaps are never silent.
1269#[must_use = "EachClaim records nothing until a comparator is invoked"]
1270pub struct EachClaim<'f, 'v, T> {
1271    field: &'f SeriesField<T>,
1272    verdict: &'v mut Verdict,
1273}
1274
1275impl<'f, 'v, T> EachClaim<'f, 'v, T>
1276where
1277    T: PartialOrd + std::fmt::Display + Copy,
1278{
1279    /// Pass when every sample's value satisfies `value >= floor`.
1280    /// Per-sample errors and per-sample violations both record a
1281    /// [`DetailKind::Temporal`] detail and flip the verdict to
1282    /// failed; the chain returns the verdict so further claims
1283    /// can stack.
1284    ///
1285    /// On `T = f64`, an incomparable value (NaN) is a failure: a
1286    /// NaN sample silently passing `value < floor`/`value > ceiling`
1287    /// (which IEEE-754 semantics give you on raw `<`/`>`) would
1288    /// hide a coverage gap, so the pattern uses `partial_cmp` and
1289    /// reports the offending sample distinctly.
1290    pub fn at_least(self, floor: T) -> &'v mut Verdict {
1291        let pre_outcomes = temporal_outcome_count(self.verdict);
1292        let label = self.field.label.as_str();
1293        let n = self.field.values.len();
1294        for (i, slot) in self.field.values.iter().enumerate() {
1295            match slot {
1296                Ok(v) => match v.partial_cmp(&floor) {
1297                    Some(std::cmp::Ordering::Less) => push_detail(
1298                        self.verdict,
1299                        format!(
1300                            "{label} (each.at_least {floor}): sample {tag} (+{elapsed_ms}ms): \
1301                             value {v}",
1302                            tag = self.field.tags[i],
1303                            elapsed_ms = fmt_elapsed_num(self.field.elapsed_ms[i]),
1304                        ),
1305                    ),
1306                    None => push_detail(
1307                        self.verdict,
1308                        format!(
1309                            "{label} (each.at_least {floor}): sample {tag} (+{elapsed_ms}ms): \
1310                             value {v} is incomparable (NaN)",
1311                            tag = self.field.tags[i],
1312                            elapsed_ms = fmt_elapsed_num(self.field.elapsed_ms[i]),
1313                        ),
1314                    ),
1315                    Some(std::cmp::Ordering::Equal | std::cmp::Ordering::Greater) => {}
1316                },
1317                Err(e) => {
1318                    push_detail(
1319                        self.verdict,
1320                        format!(
1321                            "{label} (each.at_least {floor}): sample {tag} (+{elapsed_ms}ms): \
1322                             projection error: {e}",
1323                            tag = self.field.tags[i],
1324                            elapsed_ms = fmt_elapsed_num(self.field.elapsed_ms[i]),
1325                        ),
1326                    );
1327                }
1328            }
1329        }
1330        maybe_log_pass_temporal(self.verdict, pre_outcomes, || {
1331            format!("{label} (each.at_least {floor}): all {n} samples passed")
1332        });
1333        self.verdict
1334    }
1335
1336    /// Pass when every sample's value satisfies `value <= ceiling`.
1337    /// NaN samples (on `T = f64`) report an incomparable failure
1338    /// for the same reason documented on [`Self::at_least`].
1339    pub fn at_most(self, ceiling: T) -> &'v mut Verdict {
1340        let pre_outcomes = temporal_outcome_count(self.verdict);
1341        let label = self.field.label.as_str();
1342        let n = self.field.values.len();
1343        for (i, slot) in self.field.values.iter().enumerate() {
1344            match slot {
1345                Ok(v) => match v.partial_cmp(&ceiling) {
1346                    Some(std::cmp::Ordering::Greater) => push_detail(
1347                        self.verdict,
1348                        format!(
1349                            "{label} (each.at_most {ceiling}): sample {tag} (+{elapsed_ms}ms): \
1350                             value {v}",
1351                            tag = self.field.tags[i],
1352                            elapsed_ms = fmt_elapsed_num(self.field.elapsed_ms[i]),
1353                        ),
1354                    ),
1355                    None => push_detail(
1356                        self.verdict,
1357                        format!(
1358                            "{label} (each.at_most {ceiling}): sample {tag} (+{elapsed_ms}ms): \
1359                             value {v} is incomparable (NaN)",
1360                            tag = self.field.tags[i],
1361                            elapsed_ms = fmt_elapsed_num(self.field.elapsed_ms[i]),
1362                        ),
1363                    ),
1364                    Some(std::cmp::Ordering::Equal | std::cmp::Ordering::Less) => {}
1365                },
1366                Err(e) => {
1367                    push_detail(
1368                        self.verdict,
1369                        format!(
1370                            "{label} (each.at_most {ceiling}): sample {tag} (+{elapsed_ms}ms): \
1371                             projection error: {e}",
1372                            tag = self.field.tags[i],
1373                            elapsed_ms = fmt_elapsed_num(self.field.elapsed_ms[i]),
1374                        ),
1375                    );
1376                }
1377            }
1378        }
1379        maybe_log_pass_temporal(self.verdict, pre_outcomes, || {
1380            format!("{label} (each.at_most {ceiling}): all {n} samples passed")
1381        });
1382        self.verdict
1383    }
1384
1385    /// Pass when every sample's value satisfies `lo <= value <= hi`.
1386    /// Caller error (`lo > hi`) lands as a single
1387    /// [`DetailKind::Temporal`] detail rather than evaluating each
1388    /// sample against an inverted range. NaN samples report an
1389    /// incomparable failure (see [`Self::at_least`]).
1390    pub fn between(self, lo: T, hi: T) -> &'v mut Verdict {
1391        let label = self.field.label.as_str();
1392        if lo > hi {
1393            push_detail(
1394                self.verdict,
1395                format!("{label} (each.between): caller error: lo={lo} > hi={hi}"),
1396            );
1397            return self.verdict;
1398        }
1399        let pre_outcomes = temporal_outcome_count(self.verdict);
1400        let n = self.field.values.len();
1401        for (i, slot) in self.field.values.iter().enumerate() {
1402            match slot {
1403                Ok(v) => {
1404                    let lo_cmp = v.partial_cmp(&lo);
1405                    let hi_cmp = v.partial_cmp(&hi);
1406                    if lo_cmp.is_none() || hi_cmp.is_none() {
1407                        push_detail(
1408                            self.verdict,
1409                            format!(
1410                                "{label} (each.between [{lo}, {hi}]): sample {tag} \
1411                                 (+{elapsed_ms}ms): value {v} is incomparable (NaN)",
1412                                tag = self.field.tags[i],
1413                                elapsed_ms = fmt_elapsed_num(self.field.elapsed_ms[i]),
1414                            ),
1415                        );
1416                    } else if matches!(lo_cmp, Some(std::cmp::Ordering::Less))
1417                        || matches!(hi_cmp, Some(std::cmp::Ordering::Greater))
1418                    {
1419                        push_detail(
1420                            self.verdict,
1421                            format!(
1422                                "{label} (each.between [{lo}, {hi}]): sample {tag} \
1423                                 (+{elapsed_ms}ms): value {v}",
1424                                tag = self.field.tags[i],
1425                                elapsed_ms = fmt_elapsed_num(self.field.elapsed_ms[i]),
1426                            ),
1427                        );
1428                    }
1429                }
1430                Err(e) => {
1431                    push_detail(
1432                        self.verdict,
1433                        format!(
1434                            "{label} (each.between [{lo}, {hi}]): sample {tag} \
1435                             (+{elapsed_ms}ms): projection error: {e}",
1436                            tag = self.field.tags[i],
1437                            elapsed_ms = fmt_elapsed_num(self.field.elapsed_ms[i]),
1438                        ),
1439                    );
1440                }
1441            }
1442        }
1443        maybe_log_pass_temporal(self.verdict, pre_outcomes, || {
1444            format!("{label} (each.between [{lo}, {hi}]): all {n} samples passed")
1445        });
1446        self.verdict
1447    }
1448}
1449
1450// ----- Seven temporal patterns -----
1451
1452impl<T> SeriesField<T>
1453where
1454    T: PartialOrd + std::fmt::Display + Copy,
1455{
1456    /// Pass when every consecutive pair satisfies
1457    /// `values[i] <= values[i+1]`. A common shape for kernel
1458    /// counters whose only legal direction is up. Per-sample
1459    /// projection errors are SKIPPED — the affected pair-comparison
1460    /// is dropped, the skip count is logged as a verdict Note so
1461    /// coverage gaps stay visible, and the verdict is NOT flipped
1462    /// on a missing-sample condition (which is structurally
1463    /// missing data, not a regression). Adjacent samples on
1464    /// either side of a gap are still checked against each other.
1465    pub fn nondecreasing<'v>(&self, verdict: &'v mut Verdict) -> &'v mut Verdict {
1466        self.monotonicity_check(verdict, false)
1467    }
1468
1469    /// Pass when every consecutive pair satisfies
1470    /// `values[i] < values[i+1]`. Useful for counters that MUST
1471    /// advance every period (e.g. a heartbeat tick). Same skip-on-
1472    /// projection-error semantics as [`Self::nondecreasing`].
1473    pub fn strictly_increasing<'v>(&self, verdict: &'v mut Verdict) -> &'v mut Verdict {
1474        self.monotonicity_check(verdict, true)
1475    }
1476
1477    fn monotonicity_check<'v>(&self, verdict: &'v mut Verdict, strict: bool) -> &'v mut Verdict {
1478        let pat = if strict {
1479            "strictly_increasing"
1480        } else {
1481            "nondecreasing"
1482        };
1483        if self.values.len() < 2 {
1484            // Vacuous: 0 or 1 samples cannot violate monotonicity.
1485            // Surface an informational note via the verdict's
1486            // notes path so the timeline summary records that the
1487            // pattern was opened against an under-populated
1488            // series — without this, a bug that drops every
1489            // periodic capture would silently pass every
1490            // monotonicity claim.
1491            verdict.note(format!(
1492                "{label} ({pat}): only {n} samples — pattern vacuously holds; \
1493                 ensure num_snapshots >= 2 for meaningful coverage",
1494                label = self.label,
1495                n = self.values.len(),
1496            ));
1497            return verdict;
1498        }
1499        let pre_outcomes = temporal_outcome_count(verdict);
1500        // Per-sample projection errors are NOT temporal failures —
1501        // they indicate the underlying field was missing on that
1502        // sample (e.g. placeholder report from a freeze-rendezvous
1503        // timeout). Skip the affected pair-comparisons and surface
1504        // the skip count as a Note on the verdict so a coverage
1505        // gap is visible without flipping a temporal pattern that
1506        // is structurally about value monotonicity. The compare
1507        // proceeds across the rest of the series without bridging
1508        // the gap (a gap means we cannot conclude anything about
1509        // monotonicity ACROSS the missing sample, only on either
1510        // side of it).
1511        let mut skipped: Vec<String> = Vec::new();
1512        for i in 0..self.values.len() - 1 {
1513            let left = match &self.values[i] {
1514                Ok(v) => v,
1515                Err(e) => {
1516                    // Surface the underlying SnapshotError variant
1517                    // (PlaceholderSample, MissingStats, FieldNotFound,
1518                    // VarNotFound, TypeMismatch, …) in the Note so
1519                    // the operator can distinguish "freeze rendezvous
1520                    // timed out" from "field name typo" from
1521                    // "stats relay had no scheduler" without
1522                    // re-running the test under a debugger. The
1523                    // Display impl on SnapshotError gives the
1524                    // human-readable variant text plus context
1525                    // (available keys, requested path).
1526                    skipped.push(format!(
1527                        "{tag}(+{elapsed_ms}ms): {e}",
1528                        tag = self.tags[i],
1529                        elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
1530                    ));
1531                    continue;
1532                }
1533            };
1534            let right = match &self.values[i + 1] {
1535                Ok(v) => v,
1536                Err(_) => {
1537                    // Skip recorded when the (i+1) slot becomes
1538                    // the `i` slot of the next iteration; avoid
1539                    // double-counting by only logging on the
1540                    // forward-edge here.
1541                    continue;
1542                }
1543            };
1544            let ok = if strict { right > left } else { right >= left };
1545            if !ok {
1546                push_detail(
1547                    verdict,
1548                    format!(
1549                        "{label} ({pat}): regression at sample {tag} (+{elapsed_ms}ms): \
1550                         value {right} after prior value {left} at sample {prev_tag} \
1551                         (+{prev_elapsed}ms)",
1552                        label = self.label,
1553                        tag = self.tags[i + 1],
1554                        elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i + 1]),
1555                        prev_tag = self.tags[i],
1556                        prev_elapsed = fmt_elapsed_num(self.elapsed_ms[i]),
1557                    ),
1558                );
1559            }
1560        }
1561        // The final sample's err state was not visited by the
1562        // loop's left-arm; check it explicitly so the skip count
1563        // is exhaustive. Carry the same `: {e}` rendering used
1564        // above so the trailing skip entry exposes the error
1565        // variant just like every other entry.
1566        if let Some(last) = self.values.last()
1567            && let Err(e) = last
1568        {
1569            let i = self.values.len() - 1;
1570            skipped.push(format!(
1571                "{tag}(+{elapsed_ms}ms): {e}",
1572                tag = self.tags[i],
1573                elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
1574            ));
1575        }
1576        if !skipped.is_empty() {
1577            verdict.note(format!(
1578                "{label} ({pat}): skipped {n} sample(s) with projection errors: \
1579                 {samples}",
1580                label = self.label,
1581                n = skipped.len(),
1582                samples = skipped.join(", "),
1583            ));
1584        }
1585        maybe_log_pass_temporal(verdict, pre_outcomes, || {
1586            format!(
1587                "{label} ({pat}): all {n} samples passed",
1588                label = self.label,
1589                n = self.values.len(),
1590            )
1591        });
1592        verdict
1593    }
1594}
1595
1596impl SeriesField<f64> {
1597    /// Pass when every consecutive (delta_value / delta_ms) lies
1598    /// in `[lo, hi]`. The rate is computed with millisecond
1599    /// resolution from the per-sample elapsed-ms timestamps, so
1600    /// a counter that should advance at ~1 unit/ms reads cleanly
1601    /// as `rate_within(0.5, 2.0)`. A zero-time delta between
1602    /// adjacent samples lands as a caller-side or framework
1603    /// failure (samples too close to compute a rate); the detail
1604    /// names the offending pair.
1605    pub fn rate_within<'v>(&self, verdict: &'v mut Verdict, lo: f64, hi: f64) -> &'v mut Verdict {
1606        if lo > hi {
1607            push_detail(
1608                verdict,
1609                format!(
1610                    "{label} (rate_within): caller error: lo={lo} > hi={hi}",
1611                    label = self.label,
1612                ),
1613            );
1614            return verdict;
1615        }
1616        if self.values.len() < 2 {
1617            verdict.note(format!(
1618                "{label} (rate_within): only {n} samples — pattern vacuously holds",
1619                label = self.label,
1620                n = self.values.len(),
1621            ));
1622            return verdict;
1623        }
1624        let pre_outcomes = temporal_outcome_count(verdict);
1625        // Per-sample projection errors are treated as GAPS — no
1626        // rate is computed across the gap. Log every gap with the
1627        // underlying error variant via a Note so a coverage
1628        // problem is visible (with WHICH error) without flipping
1629        // the verdict on what is structurally a missing-data
1630        // condition, not a rate violation. When BOTH endpoints of
1631        // a pair errored, both errors are surfaced so the operator
1632        // can tell whether the projection has a per-sample
1633        // coverage hole on one side or a systemic problem on
1634        // both.
1635        let mut gaps: Vec<String> = Vec::new();
1636        for i in 0..self.values.len() - 1 {
1637            let (left, right) = match (&self.values[i], &self.values[i + 1]) {
1638                (Ok(l), Ok(r)) => (*l, *r),
1639                (lhs_slot, rhs_slot) => {
1640                    let mut endpoints: Vec<String> = Vec::with_capacity(2);
1641                    if let Err(e) = lhs_slot {
1642                        endpoints.push(format!(
1643                            "{tag}(+{elapsed_ms}ms): {e}",
1644                            tag = self.tags[i],
1645                            elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
1646                        ));
1647                    }
1648                    if let Err(e) = rhs_slot {
1649                        endpoints.push(format!(
1650                            "{tag}(+{elapsed_ms}ms): {e}",
1651                            tag = self.tags[i + 1],
1652                            elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i + 1]),
1653                        ));
1654                    }
1655                    gaps.push(endpoints.join(" | "));
1656                    continue;
1657                }
1658            };
1659            // A None elapsed endpoint = the bridge recorded no timestamp
1660            // for that sample: the interval's duration is
1661            // UNDEFINED, so the rate over it cannot be computed. Skip the
1662            // pair (record it in `gaps`) rather than fabricating a dt from
1663            // a `0` — this runs BEFORE the dt<=0 guard, which applies only
1664            // once both endpoints are measured.
1665            let (Some(prev_ms), Some(next_ms)) = (self.elapsed_ms[i], self.elapsed_ms[i + 1])
1666            else {
1667                gaps.push(format!(
1668                    "{prev_tag}(+{prev_elapsed}ms)..{tag}(+{elapsed_ms}ms): elapsed not measured",
1669                    prev_tag = self.tags[i],
1670                    prev_elapsed = fmt_elapsed_num(self.elapsed_ms[i]),
1671                    tag = self.tags[i + 1],
1672                    elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i + 1]),
1673                ));
1674                continue;
1675            };
1676            let dt_ms = next_ms.saturating_sub(prev_ms) as f64;
1677            if dt_ms <= 0.0 {
1678                push_inconclusive(
1679                    verdict,
1680                    format!(
1681                        "{label} (rate_within): zero-time delta between sample {prev_tag} \
1682                         (+{prev_elapsed}ms) and {tag} (+{elapsed_ms}ms) — denominator is \
1683                         INSTRUMENT-derived; rate is neither pass nor fail",
1684                        label = self.label,
1685                        prev_tag = self.tags[i],
1686                        prev_elapsed = fmt_elapsed_num(self.elapsed_ms[i]),
1687                        tag = self.tags[i + 1],
1688                        elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i + 1]),
1689                    ),
1690                );
1691                continue;
1692            }
1693            let rate = (right - left) / dt_ms;
1694            // NaN can arise from inf-inf or NaN endpoints; raw `<`/`>`
1695            // against NaN is always false, so a NaN rate would
1696            // silently slip past the band check. Infinite rates
1697            // (inf endpoint, or finite endpoints whose difference
1698            // overflows f64) are also an upstream data corruption
1699            // signal — caller has no use for the band comparison
1700            // when the value is non-finite. Both cases get a
1701            // structured detail naming the sample pair so the
1702            // operator sees the offending span.
1703            if !rate.is_finite() {
1704                push_detail(
1705                    verdict,
1706                    format!(
1707                        "{label} (rate_within [{lo}, {hi}]): non-finite rate between \
1708                         samples {prev_tag} (+{prev_elapsed}ms, value {left}) and \
1709                         {tag} (+{elapsed_ms}ms, value {right}) — endpoint is NaN \
1710                         or produced inf in the delta",
1711                        label = self.label,
1712                        prev_tag = self.tags[i],
1713                        prev_elapsed = fmt_elapsed_num(self.elapsed_ms[i]),
1714                        tag = self.tags[i + 1],
1715                        elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i + 1]),
1716                    ),
1717                );
1718            } else if rate < lo || rate > hi {
1719                push_detail(
1720                    verdict,
1721                    format!(
1722                        "{label} (rate_within [{lo}, {hi}]): rate {rate:.4}/ms between \
1723                         samples {prev_tag} (+{prev_elapsed}ms, value {left}) and \
1724                         {tag} (+{elapsed_ms}ms, value {right})",
1725                        label = self.label,
1726                        prev_tag = self.tags[i],
1727                        prev_elapsed = fmt_elapsed_num(self.elapsed_ms[i]),
1728                        tag = self.tags[i + 1],
1729                        elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i + 1]),
1730                    ),
1731                );
1732            }
1733        }
1734        if !gaps.is_empty() {
1735            verdict.note(format!(
1736                "{label} (rate_within): {n} consecutive-pair gap(s) skipped \
1737                 due to a projection error or an unmeasured elapsed timestamp \
1738                 on at least one endpoint: {samples}",
1739                label = self.label,
1740                n = gaps.len(),
1741                samples = gaps.join(", "),
1742            ));
1743        }
1744        maybe_log_pass_temporal(verdict, pre_outcomes, || {
1745            format!(
1746                "{label} (rate_within [{lo}, {hi}]): all {n} consecutive-pair rates within band",
1747                label = self.label,
1748                n = self.values.len().saturating_sub(1),
1749            )
1750        });
1751        verdict
1752    }
1753
1754    /// Pass when every post-warmup sample (`elapsed_ms >=
1755    /// warmup_ms`) lies inside `mean·(1-tolerance), mean·(1+tolerance)`.
1756    /// `tolerance` is a fraction (0.10 = ±10%). The mean is
1757    /// computed over the post-warmup samples only — the warmup
1758    /// region is excluded so ramp-up does not bias the steady-
1759    /// state baseline. Per-sample projection errors are SKIPPED
1760    /// (with a verdict Note logging the count and tags); they are
1761    /// treated as gaps in coverage, not band violations, so a
1762    /// missing post-warmup sample does not flip the verdict.
1763    pub fn steady_within<'v>(
1764        &self,
1765        verdict: &'v mut Verdict,
1766        warmup_ms: u64,
1767        tolerance: f64,
1768    ) -> &'v mut Verdict {
1769        if tolerance < 0.0 {
1770            push_detail(
1771                verdict,
1772                format!(
1773                    "{label} (steady_within): caller error: tolerance {tolerance} negative",
1774                    label = self.label,
1775                ),
1776            );
1777            return verdict;
1778        }
1779        let pre_outcomes = temporal_outcome_count(verdict);
1780        let mut active: Vec<(usize, f64)> = Vec::new();
1781        let mut skipped: Vec<String> = Vec::new();
1782        // Track whether any sample's elapsed_ms reached or exceeded
1783        // warmup_ms — distinguishes "warmup window absorbed every
1784        // sample" (genuine vacuous-pass) from "post-warmup samples
1785        // existed but all errored" (skip-Note already covers it).
1786        let mut any_post_warmup = false;
1787        for (i, slot) in self.values.iter().enumerate() {
1788            // A None timestamp cannot be placed relative to
1789            // the warmup window: skip with a Note rather than treating it
1790            // as 0 (< warmup, silently dropped) or admitting an
1791            // untimestamped value into the post-warmup band.
1792            let Some(ms) = self.elapsed_ms[i] else {
1793                skipped.push(format!(
1794                    "{tag}(+?ms): elapsed not measured (cannot place vs warmup)",
1795                    tag = self.tags[i],
1796                ));
1797                continue;
1798            };
1799            if ms < warmup_ms {
1800                continue;
1801            }
1802            any_post_warmup = true;
1803            match slot {
1804                // A non-finite value (NaN/inf) cannot be band-checked
1805                // — `v < lo` is always false for NaN — and a single
1806                // NaN poisons the mean (line 1864), making `lo`/`hi` NaN so
1807                // EVERY sample slips past the band and the assertion
1808                // silently PASSES. Treat a non-finite value as a gap,
1809                // like a projection error: drop it from the band
1810                // population (so it can neither poison the mean nor
1811                // slip the band) and surface it in the skip Note.
1812                Ok(v) if v.is_finite() => active.push((i, *v)),
1813                Ok(v) => skipped.push(format!(
1814                    "{tag}(+{elapsed_ms}ms): non-finite value {v}",
1815                    tag = self.tags[i],
1816                    elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
1817                )),
1818                // Per-sample projection errors are treated as
1819                // gaps: a missing post-warmup sample cannot
1820                // violate the steady-state band (we have no value
1821                // to compare). Surface the skip count and the
1822                // underlying SnapshotError variant for each
1823                // skipped sample via a Note so the operator can
1824                // tell PlaceholderSample / MissingStats /
1825                // FieldNotFound / TypeMismatch apart instead of
1826                // collapsing every gap into "projection error" —
1827                // a coverage hole is visible WITH the failure
1828                // reason without flipping the verdict on what is
1829                // structurally missing data, not a band violation.
1830                Err(e) => skipped.push(format!(
1831                    "{tag}(+{elapsed_ms}ms): {e}",
1832                    tag = self.tags[i],
1833                    elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
1834                )),
1835            }
1836        }
1837        if !skipped.is_empty() {
1838            verdict.note(format!(
1839                "{label} (steady_within): skipped {n} sample(s) with a projection \
1840                 error or an unmeasured elapsed timestamp: {samples}",
1841                label = self.label,
1842                n = skipped.len(),
1843                samples = skipped.join(", "),
1844            ));
1845        }
1846        if active.is_empty() {
1847            // Only emit the vacuous-warmup Note when the warmup
1848            // window genuinely absorbed every sample (no
1849            // post-warmup samples existed). When post-warmup
1850            // samples existed but all errored, the
1851            // skipped-with-projection-errors Note above already
1852            // explained the empty active set; emitting a second
1853            // Note here would falsely claim "no samples beyond
1854            // warmup".
1855            if !any_post_warmup {
1856                verdict.note(format!(
1857                    "{label} (steady_within): no samples beyond warmup_ms={warmup_ms} — \
1858                     pattern vacuously holds",
1859                    label = self.label,
1860                ));
1861            }
1862            return verdict;
1863        }
1864        let mean: f64 = active.iter().map(|(_, v)| *v).sum::<f64>() / (active.len() as f64);
1865        let lo = mean * (1.0 - tolerance);
1866        let hi = mean * (1.0 + tolerance);
1867        // For negative means (pathological), the multiplication
1868        // flips the band; protect by sorting.
1869        let (lo, hi) = if lo <= hi { (lo, hi) } else { (hi, lo) };
1870        let active_count = active.len();
1871        for (i, v) in active {
1872            if v < lo || v > hi {
1873                push_detail(
1874                    verdict,
1875                    format!(
1876                        "{label} (steady_within mean {mean:.4} ±{pct:.1}%): \
1877                         sample {tag} (+{elapsed_ms}ms): value {v} outside [{lo:.4}, {hi:.4}]",
1878                        label = self.label,
1879                        pct = tolerance * 100.0,
1880                        tag = self.tags[i],
1881                        elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
1882                    ),
1883                );
1884            }
1885        }
1886        maybe_log_pass_temporal(verdict, pre_outcomes, || {
1887            format!(
1888                "{label} (steady_within mean {mean:.4} ±{pct:.1}%): all {n} post-warmup samples in band",
1889                label = self.label,
1890                pct = tolerance * 100.0,
1891                n = active_count,
1892            )
1893        });
1894        verdict
1895    }
1896
1897    /// Pass when three consecutive samples land inside
1898    /// `[target-tolerance, target+tolerance]` AT OR BEFORE
1899    /// `deadline_ms`. The intent is "the system stabilizes near
1900    /// `target` by the deadline" — three consecutive in-band
1901    /// samples are the convergence-witness shape. Failures fire
1902    /// when the deadline passes without a witness.
1903    pub fn converges_to<'v>(
1904        &self,
1905        verdict: &'v mut Verdict,
1906        target: f64,
1907        tolerance: f64,
1908        deadline_ms: u64,
1909    ) -> &'v mut Verdict {
1910        if tolerance < 0.0 {
1911            push_detail(
1912                verdict,
1913                format!(
1914                    "{label} (converges_to): caller error: tolerance {tolerance} negative",
1915                    label = self.label,
1916                ),
1917            );
1918            return verdict;
1919        }
1920        let pre_outcomes = temporal_outcome_count(verdict);
1921        // Pre-check: counting all successfully-projected samples
1922        // (within the deadline window) do we have enough evidence
1923        // to even attempt a 3-consecutive witness? When fewer
1924        // than 3 successfully-projected samples exist before the
1925        // deadline, record an explicit Note (NOT a verdict
1926        // failure) and return — absence of data is a coverage gap
1927        // surfaced for the operator, not a negative finding the
1928        // verdict should fail on. Distinguishes "did not collect
1929        // enough samples" (Note here) from "collected enough
1930        // samples but never converged" (the no-witness Temporal
1931        // detail emitted below by the witness loop). The Note
1932        // names every errored in-window sample with its underlying
1933        // SnapshotError variant so the operator can tell
1934        // PlaceholderSample / MissingStats / FieldNotFound apart
1935        // when diagnosing a coverage hole — a count alone hides
1936        // which kind of failure produced the gap.
1937        let mut projected_count: usize = 0;
1938        let mut error_samples: Vec<String> = Vec::new();
1939        for (i, slot) in self.values.iter().enumerate() {
1940            // A None timestamp cannot be placed before/after
1941            // the deadline: skip it from the projected-sample count
1942            // rather than counting it as 0 <= deadline, which would
1943            // falsely admit an untimestamped sample into the window.
1944            if self.elapsed_ms[i].is_none_or(|ms| ms > deadline_ms) {
1945                continue;
1946            }
1947            match slot {
1948                Ok(_) => projected_count += 1,
1949                Err(e) => error_samples.push(format!(
1950                    "{tag}(+{elapsed_ms}ms): {e}",
1951                    tag = self.tags[i],
1952                    elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
1953                )),
1954            }
1955        }
1956        if projected_count < 3 {
1957            let suffix = if error_samples.is_empty() {
1958                String::new()
1959            } else {
1960                format!("; errored sample(s): {}", error_samples.join(", "))
1961            };
1962            verdict.note(format!(
1963                "{label} (converges_to {target} ±{tolerance}, deadline_ms={deadline_ms}): \
1964                 insufficient samples for converges_to (need ≥3, have {projected_count}){suffix}",
1965                label = self.label,
1966            ));
1967            return verdict;
1968        }
1969        let lo = target - tolerance;
1970        let hi = target + tolerance;
1971        let mut consecutive: usize = 0;
1972        let mut witness_idx: Option<usize> = None;
1973        // Errored in-window samples that interrupted the
1974        // 3-consecutive witness search. Recorded here even when
1975        // the projected_count >= 3 pre-check passed so a verdict
1976        // failure ("no witness") still names the error variants
1977        // that broke each attempted run — the operator can see
1978        // whether the missing-witness was caused by genuine
1979        // out-of-band values or by a coverage hole resetting the
1980        // consecutive counter mid-run.
1981        let mut interrupting_errors: Vec<String> = Vec::new();
1982        for (i, slot) in self.values.iter().enumerate() {
1983            // A None timestamp cannot be placed before/after
1984            // the deadline: treat it as out-of-window (skip / reset the
1985            // witness run) rather than as 0 <= deadline, which would
1986            // falsely admit an untimestamped sample into the window.
1987            if self.elapsed_ms[i].is_none_or(|ms| ms > deadline_ms) {
1988                consecutive = 0;
1989                continue;
1990            }
1991            match slot {
1992                Ok(v) => {
1993                    if *v >= lo && *v <= hi {
1994                        consecutive += 1;
1995                        if consecutive >= 3 {
1996                            witness_idx = Some(i);
1997                            break;
1998                        }
1999                    } else {
2000                        consecutive = 0;
2001                    }
2002                }
2003                Err(e) => {
2004                    if consecutive > 0 {
2005                        // Only record the error when it actually
2006                        // interrupted an in-progress run — a
2007                        // string of out-of-band errors before any
2008                        // in-band samples is irrelevant to
2009                        // witness coverage.
2010                        interrupting_errors.push(format!(
2011                            "{tag}(+{elapsed_ms}ms): {e}",
2012                            tag = self.tags[i],
2013                            elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
2014                        ));
2015                    }
2016                    consecutive = 0;
2017                }
2018            }
2019        }
2020        if witness_idx.is_none() {
2021            let suffix = if interrupting_errors.is_empty() {
2022                String::new()
2023            } else {
2024                format!(
2025                    "; in-progress runs interrupted by errored sample(s): {}",
2026                    interrupting_errors.join(", ")
2027                )
2028            };
2029            push_detail(
2030                verdict,
2031                format!(
2032                    "{label} (converges_to {target} ±{tolerance}, deadline_ms={deadline_ms}): \
2033                     no 3-consecutive-in-band witness before deadline ({n} samples evaluated){suffix}",
2034                    label = self.label,
2035                    n = self.values.len(),
2036                ),
2037            );
2038        }
2039        maybe_log_pass_temporal(verdict, pre_outcomes, || {
2040            let where_at = witness_idx
2041                .map(|i| {
2042                    format!(
2043                        "{tag} (+{elapsed_ms}ms)",
2044                        tag = self.tags[i],
2045                        elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
2046                    )
2047                })
2048                .unwrap_or_else(|| "<unreached>".to_string());
2049            format!(
2050                "{label} (converges_to {target} ±{tolerance}, deadline_ms={deadline_ms}): \
2051                 3-consecutive-in-band witness reached at {where_at}",
2052                label = self.label,
2053            )
2054        });
2055        verdict
2056    }
2057
2058    /// Pass when every consecutive `(self_value / other_value)`
2059    /// lies in `[lo, hi]`. Cross-field correlation: e.g. ensure a
2060    /// per-cgroup utilization always tracks a per-cgroup runtime
2061    /// within a fixed band. The two series MUST have matching
2062    /// length and tags; mismatches fire a single caller-error
2063    /// detail. Per-sample projection errors on EITHER lhs or rhs
2064    /// are SKIPPED — the affected pair is dropped, the skip count
2065    /// is logged as a verdict Note, and the verdict is NOT flipped
2066    /// on missing-data conditions.
2067    pub fn ratio_within<'v>(
2068        &self,
2069        verdict: &'v mut Verdict,
2070        other: &SeriesField<f64>,
2071        lo: f64,
2072        hi: f64,
2073    ) -> &'v mut Verdict {
2074        if lo > hi {
2075            push_detail(
2076                verdict,
2077                format!(
2078                    "{label} (ratio_within): caller error: lo={lo} > hi={hi}",
2079                    label = self.label,
2080                ),
2081            );
2082            return verdict;
2083        }
2084        if self.values.len() != other.values.len() {
2085            push_detail(
2086                verdict,
2087                format!(
2088                    "{label} (ratio_within {other}): caller error: length mismatch \
2089                     (this {n}, other {m})",
2090                    label = self.label,
2091                    other = other.label,
2092                    n = self.values.len(),
2093                    m = other.values.len(),
2094                ),
2095            );
2096            return verdict;
2097        }
2098        let pre_outcomes = temporal_outcome_count(verdict);
2099        // Per-sample projection errors on either lhs or rhs are
2100        // treated as gaps — no ratio is computed across the pair.
2101        // Surface every gap with the underlying error variant
2102        // (and which side errored: lhs / rhs / both) via a Note
2103        // so a coverage hole is visible WITH the failure reason
2104        // without flipping the verdict on what is structurally
2105        // missing data. The Display impl on SnapshotError gives
2106        // the variant text plus context (FieldNotFound's
2107        // available keys, TypeMismatch's expected/actual,
2108        // PlaceholderSample's reason) so the operator can tell
2109        // failure modes apart instead of collapsing every gap
2110        // into "projection error on one side".
2111        let mut gaps: Vec<String> = Vec::new();
2112        for (i, (lhs_slot, rhs_slot)) in self.values.iter().zip(other.values.iter()).enumerate() {
2113            let (lhs, rhs) = match (lhs_slot, rhs_slot) {
2114                (Ok(l), Ok(r)) => (*l, *r),
2115                _ => {
2116                    // Each side carries its own tag + elapsed_ms —
2117                    // the two SeriesFields can be projected from
2118                    // different rows of the same SampleSeries with
2119                    // distinct tags at index `i`, so a single outer
2120                    // tag would mislabel the RHS endpoint. Fold the
2121                    // per-side identity into each entry instead.
2122                    let mut endpoints: Vec<String> = Vec::with_capacity(2);
2123                    if let Err(e) = lhs_slot {
2124                        endpoints.push(format!(
2125                            "lhs {tag}(+{elapsed_ms}ms): {e}",
2126                            tag = self.tags[i],
2127                            elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
2128                        ));
2129                    }
2130                    if let Err(e) = rhs_slot {
2131                        endpoints.push(format!(
2132                            "rhs {tag}(+{elapsed_ms}ms): {e}",
2133                            tag = other.tags[i],
2134                            elapsed_ms = fmt_elapsed_num(other.elapsed_ms[i]),
2135                        ));
2136                    }
2137                    gaps.push(endpoints.join(" | "));
2138                    continue;
2139                }
2140            };
2141            if rhs == 0.0 {
2142                push_inconclusive(
2143                    verdict,
2144                    format!(
2145                        "{label} (ratio_within): rhs == 0 at sample {tag} (+{elapsed_ms}ms) — \
2146                         denominator is INSTRUMENT-derived; ratio is neither pass nor fail",
2147                        label = self.label,
2148                        tag = self.tags[i],
2149                        elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
2150                    ),
2151                );
2152                continue;
2153            }
2154            let ratio = lhs / rhs;
2155            // A NaN lhs/rhs (or finite endpoints whose quotient
2156            // overflows to inf) yields a non-finite ratio; the
2157            // `rhs == 0.0` guard above misses it (NaN != 0.0). Raw
2158            // `<`/`>` against NaN is always false, so a non-finite
2159            // ratio would silently slip past the band check and PASS.
2160            // Surface it as a detail naming the pair, mirroring
2161            // rate_within's non-finite-rate guard.
2162            if !ratio.is_finite() {
2163                push_detail(
2164                    verdict,
2165                    format!(
2166                        "{label} (ratio_within {other_label} [{lo}, {hi}]): non-finite \
2167                         ratio at sample {tag} (+{elapsed_ms}ms) — lhs={lhs} rhs={rhs}",
2168                        label = self.label,
2169                        other_label = other.label,
2170                        tag = self.tags[i],
2171                        elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
2172                    ),
2173                );
2174            } else if ratio < lo || ratio > hi {
2175                push_detail(
2176                    verdict,
2177                    format!(
2178                        "{label} (ratio_within {other_label} [{lo}, {hi}]): \
2179                         ratio {ratio:.4} at sample {tag} (+{elapsed_ms}ms) — \
2180                         lhs={lhs} rhs={rhs}",
2181                        label = self.label,
2182                        other_label = other.label,
2183                        tag = self.tags[i],
2184                        elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
2185                    ),
2186                );
2187            }
2188        }
2189        if !gaps.is_empty() {
2190            verdict.note(format!(
2191                "{label} (ratio_within): {n} pair(s) skipped due to projection \
2192                 errors on lhs or rhs: {samples}",
2193                label = self.label,
2194                n = gaps.len(),
2195                samples = gaps.join(", "),
2196            ));
2197        }
2198        maybe_log_pass_temporal(verdict, pre_outcomes, || {
2199            format!(
2200                "{label} (ratio_within {other} [{lo}, {hi}]): all {n} pair ratios in band",
2201                label = self.label,
2202                other = other.label,
2203                n = self.values.len(),
2204            )
2205        });
2206        verdict
2207    }
2208}
2209
2210impl SeriesField<bool> {
2211    /// Pass when every sample's value is `true`. Per-sample
2212    /// projection errors fail the assertion. Use for boolean
2213    /// invariants — e.g. "scheduler is alive at every periodic
2214    /// boundary" projected as `snap.var("scheduler_alive").as_bool()`.
2215    pub fn always_true<'v>(&self, verdict: &'v mut Verdict) -> &'v mut Verdict {
2216        let pre_outcomes = temporal_outcome_count(verdict);
2217        for (i, slot) in self.values.iter().enumerate() {
2218            match slot {
2219                Ok(v) => {
2220                    if !*v {
2221                        push_detail(
2222                            verdict,
2223                            format!(
2224                                "{label} (always_true): sample {tag} (+{elapsed_ms}ms): \
2225                                 value false",
2226                                label = self.label,
2227                                tag = self.tags[i],
2228                                elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
2229                            ),
2230                        );
2231                    }
2232                }
2233                Err(e) => {
2234                    push_detail(
2235                        verdict,
2236                        format!(
2237                            "{label} (always_true): sample {tag} (+{elapsed_ms}ms): \
2238                             projection error: {e}",
2239                            label = self.label,
2240                            tag = self.tags[i],
2241                            elapsed_ms = fmt_elapsed_num(self.elapsed_ms[i]),
2242                        ),
2243                    );
2244                }
2245            }
2246        }
2247        maybe_log_pass_temporal(verdict, pre_outcomes, || {
2248            format!(
2249                "{label} (always_true): all {n} samples true",
2250                label = self.label,
2251                n = self.values.len(),
2252            )
2253        });
2254        verdict
2255    }
2256}
2257
2258fn push_detail(verdict: &mut Verdict, message: String) {
2259    verdict
2260        .result_mut()
2261        .record_fail(AssertDetail::new(DetailKind::Temporal, message));
2262}
2263
2264/// Inconclusive-arm sibling of [`push_detail`]. Records one
2265/// `Outcome::Inconclusive` with a [`DetailKind::Temporal`] detail.
2266/// Use for INSTRUMENT-derived zero-denominator paths — a
2267/// zero-time-delta between two consecutive samples in
2268/// [`SeriesField::rate_within`] or a zero rhs in
2269/// [`SeriesField::ratio_within`] cannot be computed, so the
2270/// verdict is neither pass nor fail (see [`Outcome`] doc's
2271/// INSTRUMENT vs POLICY carve-out).
2272fn push_inconclusive(verdict: &mut Verdict, message: String) {
2273    verdict
2274        .result_mut()
2275        .record_inconclusive(AssertDetail::new(DetailKind::Temporal, message));
2276}
2277
2278/// Count `DetailKind::Temporal` Fail + Inconclusive outcomes in
2279/// `verdict`'s underlying result. Used by
2280/// [`maybe_log_pass_temporal`] to gate the positive-confirmation
2281/// log on "this pattern added zero Temporal Fail or Inconclusive
2282/// outcomes." Inconclusives count because a pattern that emitted
2283/// only Inconclusives is not in a state where logging "passed"
2284/// would be truthful. Vacuous-pattern and projection-error skip
2285/// notes live on `AssertResult::info_notes` (a structurally-separate
2286/// field from outcomes) and are therefore naturally excluded from
2287/// this count, so a pattern that emits notes but no Fail or
2288/// Inconclusive outcomes still trips the positive log.
2289fn temporal_outcome_count(verdict: &Verdict) -> usize {
2290    verdict
2291        .result()
2292        .outcomes
2293        .iter()
2294        .filter(|o| {
2295            matches!(
2296                o,
2297                Outcome::Fail(d) | Outcome::Inconclusive(d) if matches!(d.kind, DetailKind::Temporal)
2298            )
2299        })
2300        .count()
2301}
2302
2303/// Positive-confirmation mirror of [`push_detail`] /
2304/// [`push_inconclusive`]. Emits a `tracing::info!` event naming
2305/// the temporal pattern and its sample count IFF
2306/// [`Verdict::log_passes`] is on AND the calling pattern added no
2307/// `DetailKind::Temporal` Fail or Inconclusive outcomes over its
2308/// run (compared via `pre_outcomes` captured at pattern entry via
2309/// [`temporal_outcome_count`]).
2310///
2311/// The pre/post gate is what makes this a positive confirmation —
2312/// a pattern that emitted a [`push_detail`] or [`push_inconclusive`]
2313/// mid-run stays silent here so a partial failure or inconclusive
2314/// does not log a misleading "passed" event. The closure
2315/// constructs the message only when both gates pass, so the
2316/// `format!` cost is paid only on the explicit opt-in + a clean
2317/// pattern run.
2318fn maybe_log_pass_temporal<F: FnOnce() -> String>(
2319    verdict: &Verdict,
2320    pre_outcomes: usize,
2321    message: F,
2322) {
2323    if verdict.log_passes() && temporal_outcome_count(verdict) == pre_outcomes {
2324        let m = message();
2325        tracing::info!(target: "ktstr::assert::temporal", "{m}");
2326    }
2327}
2328
2329#[allow(dead_code)]
2330fn _silence_snapshot_error_import(_: SnapshotError) {}
2331
2332#[cfg(test)]
2333mod tests {
2334    use super::*;
2335    use crate::scenario::sample::SampleSeries;
2336    use crate::scenario::snapshot::{SnapshotError, SnapshotResult};
2337    use crate::test_support::Polarity;
2338
2339    // -- BetterThanPhase polarity decision (the pure better_outcome core) --
2340
2341    #[test]
2342    fn better_outcome_lower_is_better_strict() {
2343        let p = Some(Polarity::LowerBetter);
2344        assert_eq!(
2345            better_outcome(Some(100.0), Some(50.0), p, None),
2346            BetterOutcome::Pass
2347        );
2348        assert_eq!(
2349            better_outcome(Some(50.0), Some(100.0), p, None),
2350            BetterOutcome::Fail
2351        );
2352        assert_eq!(
2353            better_outcome(Some(50.0), Some(50.0), p, None),
2354            BetterOutcome::Fail,
2355            "equal is not STRICTLY better"
2356        );
2357    }
2358
2359    #[test]
2360    fn better_outcome_higher_is_better_strict() {
2361        let p = Some(Polarity::HigherBetter);
2362        assert_eq!(
2363            better_outcome(Some(50.0), Some(100.0), p, None),
2364            BetterOutcome::Pass
2365        );
2366        assert_eq!(
2367            better_outcome(Some(100.0), Some(50.0), p, None),
2368            BetterOutcome::Fail
2369        );
2370        assert_eq!(
2371            better_outcome(Some(50.0), Some(50.0), p, None),
2372            BetterOutcome::Fail
2373        );
2374    }
2375
2376    #[test]
2377    fn better_outcome_margin_is_a_baseline_fraction() {
2378        let lb = Some(Polarity::LowerBetter);
2379        let hb = Some(Polarity::HigherBetter);
2380        // LOWER: 10% better = candidate <= 90.
2381        assert_eq!(
2382            better_outcome(Some(100.0), Some(90.0), lb, Some(0.1)),
2383            BetterOutcome::Pass
2384        );
2385        assert_eq!(
2386            better_outcome(Some(100.0), Some(91.0), lb, Some(0.1)),
2387            BetterOutcome::Fail,
2388            "9% short of the required 10%"
2389        );
2390        // HIGHER: 10% better = candidate >= 110.
2391        assert_eq!(
2392            better_outcome(Some(100.0), Some(110.0), hb, Some(0.1)),
2393            BetterOutcome::Pass
2394        );
2395        assert_eq!(
2396            better_outcome(Some(100.0), Some(109.0), hb, Some(0.1)),
2397            BetterOutcome::Fail
2398        );
2399        // margin 0.0 = "no regression": equal passes, worse fails.
2400        assert_eq!(
2401            better_outcome(Some(100.0), Some(100.0), lb, Some(0.0)),
2402            BetterOutcome::Pass
2403        );
2404        assert_eq!(
2405            better_outcome(Some(100.0), Some(101.0), lb, Some(0.0)),
2406            BetterOutcome::Fail
2407        );
2408    }
2409
2410    #[test]
2411    fn better_outcome_inconclusive_variants() {
2412        let lb = Some(Polarity::LowerBetter);
2413        // Missing: either value None (no signal in a phase).
2414        assert_eq!(
2415            better_outcome(None, Some(1.0), lb, None),
2416            BetterOutcome::Missing
2417        );
2418        assert_eq!(
2419            better_outcome(Some(1.0), None, lb, None),
2420            BetterOutcome::Missing
2421        );
2422        // Undirected: TargetValue / Unknown / unregistered (None) polarity.
2423        assert_eq!(
2424            better_outcome(Some(1.0), Some(2.0), Some(Polarity::Unknown), None),
2425            BetterOutcome::Undirected
2426        );
2427        assert_eq!(
2428            better_outcome(Some(1.0), Some(2.0), Some(Polarity::TargetValue(5.0)), None),
2429            BetterOutcome::Undirected
2430        );
2431        assert_eq!(
2432            better_outcome(Some(1.0), Some(2.0), None, None),
2433            BetterOutcome::Undirected
2434        );
2435        // ZeroBaseline: a fractional margin against a 0 baseline.
2436        assert_eq!(
2437            better_outcome(Some(0.0), Some(1.0), lb, Some(0.1)),
2438            BetterOutcome::ZeroBaseline
2439        );
2440        // ...but a STRICT (None-margin) compare against a 0 baseline is a plain
2441        // compare, not ZeroBaseline (1 < 0 is just false → Fail).
2442        assert_eq!(
2443            better_outcome(Some(0.0), Some(1.0), lb, None),
2444            BetterOutcome::Fail
2445        );
2446    }
2447
2448    #[test]
2449    fn better_outcome_corrupt_nonfinite_is_fail_not_silent_pass() {
2450        let lb = Some(Polarity::LowerBetter);
2451        assert_eq!(
2452            better_outcome(Some(f64::NAN), Some(1.0), lb, None),
2453            BetterOutcome::Corrupt
2454        );
2455        assert_eq!(
2456            better_outcome(Some(1.0), Some(f64::INFINITY), lb, None),
2457            BetterOutcome::Corrupt
2458        );
2459        // Precedence: non-finite is checked before polarity, so a NaN with an
2460        // undirected polarity is Corrupt (not Undirected) — a `<` on NaN is
2461        // silently false, so this must NOT collapse to a silent pass.
2462        assert_eq!(
2463            better_outcome(Some(f64::NAN), Some(1.0), None, None),
2464            BetterOutcome::Corrupt
2465        );
2466        // Missing (None) still wins over Corrupt — no value at all short-circuits.
2467        assert_eq!(
2468            better_outcome(None, Some(f64::NAN), lb, None),
2469            BetterOutcome::Missing
2470        );
2471    }
2472
2473    #[test]
2474    fn better_than_phase_scope_label_renders_per_cgroup_in_messages() {
2475        // Pins the BetterThanPhase `scope` rendering: a per-cgroup comparator
2476        // (Some(scope)) names its cgroup in EVERY outcome message, and the
2477        // Inconclusive wording is producer-neutral ('cross-phase better-than', not the
2478        // literal 'better_across_phases'). A None scope (the pooled producer) renders
2479        // no cgroup suffix. Without this, dropping `scope_str` or reverting the wording
2480        // would pass the suite green (the e2e only checks Ok/Err).
2481        use crate::assert::Phase;
2482        let (b, c) = (Phase::step(0), Phase::step(1));
2483        // Pass (HigherBetter, candidate 200 > baseline 100): the note names the cgroup.
2484        let mut v = Verdict::new();
2485        BetterThanPhase::new(
2486            "schbench_loop_count".to_string(),
2487            &mut v,
2488            b,
2489            c,
2490            Some(100.0),
2491            Some(200.0),
2492            Some(Polarity::HigherBetter),
2493            Some("cg_x".to_string()),
2494        )
2495        .better_than();
2496        assert!(
2497            v.into_result()
2498                .info_notes
2499                .iter()
2500                .any(|n| n.message.contains("[cgroup cg_x]")),
2501            "Pass note names the cgroup"
2502        );
2503        // Fail (candidate 100 < baseline 200, HigherBetter): the detail names it.
2504        let mut v = Verdict::new();
2505        BetterThanPhase::new(
2506            "schbench_loop_count".to_string(),
2507            &mut v,
2508            b,
2509            c,
2510            Some(200.0),
2511            Some(100.0),
2512            Some(Polarity::HigherBetter),
2513            Some("cg_x".to_string()),
2514        )
2515        .better_than();
2516        assert!(
2517            v.into_result()
2518                .failure_details()
2519                .any(|d| d.message.contains("[cgroup cg_x]")),
2520            "Fail detail names the cgroup"
2521        );
2522        // Inconclusive (Missing: no candidate value): producer-neutral wording + cgroup.
2523        let mut v = Verdict::new();
2524        BetterThanPhase::new(
2525            "schbench_loop_count".to_string(),
2526            &mut v,
2527            b,
2528            c,
2529            Some(100.0),
2530            None,
2531            Some(Polarity::HigherBetter),
2532            Some("cg_x".to_string()),
2533        )
2534        .better_than();
2535        assert!(
2536            v.into_result().inconclusive_details().any(|d| {
2537                d.message.contains("[cgroup cg_x]")
2538                    && d.message.contains("cross-phase better-than")
2539                    && !d.message.contains("better_across_phases")
2540            }),
2541            "Inconclusive names the cgroup + uses the producer-neutral wording"
2542        );
2543        // None scope (pooled producer): no cgroup suffix anywhere.
2544        let mut v = Verdict::new();
2545        BetterThanPhase::new(
2546            "schbench_loop_count".to_string(),
2547            &mut v,
2548            b,
2549            c,
2550            Some(100.0),
2551            None,
2552            Some(Polarity::HigherBetter),
2553            None,
2554        )
2555        .better_than();
2556        assert!(
2557            !v.into_result()
2558                .inconclusive_details()
2559                .any(|d| d.message.contains("[cgroup")),
2560            "None scope -> no cgroup suffix"
2561        );
2562    }
2563
2564    fn synthetic_field<T: Copy>(label: &'static str, values: Vec<(u64, T)>) -> SeriesField<T> {
2565        let tags: Vec<String> = (0..values.len())
2566            .map(|i| format!("periodic_{i:03}"))
2567            .collect();
2568        let elapsed: Vec<u64> = values.iter().map(|(t, _)| *t).collect();
2569        let vals: Vec<SnapshotResult<T>> = values.into_iter().map(|(_, v)| Ok(v)).collect();
2570        SeriesField::from_parts(label, tags, elapsed, vals)
2571    }
2572
2573    #[test]
2574    fn nondecreasing_passes_on_monotonic_series() {
2575        let f = synthetic_field("counter", vec![(100, 1u64), (200, 2u64), (300, 3u64)]);
2576        let mut v = Verdict::new();
2577        f.nondecreasing(&mut v);
2578        assert!(v.is_pass());
2579    }
2580
2581    #[test]
2582    fn nondecreasing_fails_on_regression() {
2583        let f = synthetic_field("counter", vec![(100, 5u64), (200, 3u64)]);
2584        let mut v = Verdict::new();
2585        f.nondecreasing(&mut v);
2586        let r = v.into_result();
2587        assert!(r.is_fail());
2588        assert!(r.failure_details().any(|d| d.kind == DetailKind::Temporal));
2589        assert!(r.failure_details().any(|d| d.message.contains("counter")));
2590    }
2591
2592    #[test]
2593    fn strictly_increasing_fails_on_plateau() {
2594        let f = synthetic_field("counter", vec![(100, 5u64), (200, 5u64)]);
2595        let mut v = Verdict::new();
2596        f.strictly_increasing(&mut v);
2597        let r = v.into_result();
2598        assert!(r.is_fail());
2599    }
2600
2601    #[test]
2602    fn rate_within_in_band_passes() {
2603        // Counter advances 1 unit per 100ms = 0.01/ms.
2604        let f = synthetic_field("ticks", vec![(100, 1.0f64), (200, 2.0f64), (300, 3.0f64)]);
2605        let mut v = Verdict::new();
2606        f.rate_within(&mut v, 0.005, 0.02);
2607        assert!(v.is_pass());
2608    }
2609
2610    #[test]
2611    fn rate_within_out_of_band_fails() {
2612        let f = synthetic_field("ticks", vec![(100, 1.0f64), (200, 100.0f64)]);
2613        let mut v = Verdict::new();
2614        f.rate_within(&mut v, 0.0, 0.5);
2615        assert!(!v.is_pass());
2616    }
2617
2618    /// A zero-time delta between two consecutive samples is
2619    /// INSTRUMENT-derived (the periodic monitor happened to emit
2620    /// two samples with the same elapsed_ms — typically a
2621    /// missed-tick coalescence). rate_within must record this as
2622    /// Inconclusive, NOT Fail, so a non-measurable interval cannot
2623    /// silently flip a real-pass workload into a false-fail. Pins
2624    /// the zero-denominator → Inconclusive contract on the rate
2625    /// pattern.
2626    #[test]
2627    fn rate_within_zero_dt_records_inconclusive() {
2628        let f = synthetic_field("ticks", vec![(100, 1.0f64), (100, 5.0f64)]);
2629        let mut v = Verdict::new();
2630        f.rate_within(&mut v, 0.0, 100.0);
2631        let r = v.into_result();
2632        assert!(
2633            r.is_inconclusive(),
2634            "zero-dt rate must record Inconclusive: {:?}",
2635            r.outcomes,
2636        );
2637        assert!(
2638            !r.is_fail(),
2639            "zero-dt is INSTRUMENT-derived; must NOT record Fail: {:?}",
2640            r.outcomes,
2641        );
2642        assert!(
2643            r.inconclusive_details()
2644                .any(|d| d.kind == DetailKind::Temporal
2645                    && d.message.contains("INSTRUMENT-derived")),
2646            "inconclusive detail must surface with Temporal kind and \
2647             INSTRUMENT-derived wording: {:?}",
2648            r.outcomes,
2649        );
2650    }
2651
2652    #[test]
2653    fn steady_within_skips_warmup_and_passes() {
2654        // Warmup at +0..200ms; steady at 10.0 from +300..500.
2655        let f = synthetic_field(
2656            "util",
2657            vec![
2658                (100, 100.0f64),
2659                (200, 50.0f64),
2660                (300, 10.0f64),
2661                (400, 10.0f64),
2662                (500, 10.0f64),
2663            ],
2664        );
2665        let mut v = Verdict::new();
2666        f.steady_within(&mut v, 250, 0.01);
2667        assert!(v.is_pass(), "{:?}", v.into_result().outcomes);
2668    }
2669
2670    #[test]
2671    fn steady_within_post_warmup_outlier_fails() {
2672        let f = synthetic_field("util", vec![(300, 10.0f64), (400, 10.0f64), (500, 50.0f64)]);
2673        let mut v = Verdict::new();
2674        f.steady_within(&mut v, 0, 0.10);
2675        assert!(!v.is_pass());
2676    }
2677
2678    #[test]
2679    fn converges_to_finds_witness() {
2680        let f = synthetic_field(
2681            "load",
2682            vec![
2683                (100, 10.0f64),
2684                (200, 5.0f64),
2685                (300, 1.0f64),
2686                (400, 1.0f64),
2687                (500, 1.0f64),
2688            ],
2689        );
2690        let mut v = Verdict::new();
2691        f.converges_to(&mut v, 1.0, 0.5, 1000);
2692        assert!(v.is_pass());
2693    }
2694
2695    #[test]
2696    fn converges_to_no_witness_fails() {
2697        let f = synthetic_field("load", vec![(100, 10.0f64), (200, 10.0f64), (300, 10.0f64)]);
2698        let mut v = Verdict::new();
2699        f.converges_to(&mut v, 1.0, 0.5, 500);
2700        assert!(!v.is_pass());
2701    }
2702
2703    /// REGRESSION: steady_within SKIPS a sample whose elapsed
2704    /// timestamp is None — it cannot be placed relative to warmup_ms, so
2705    /// it must be Note-skipped, never treated as 0 (< warmup, silently
2706    /// dropped) nor admitted into the steady-state band. The None sample
2707    /// here carries a wild value that would blow the band if admitted.
2708    #[test]
2709    fn steady_within_skips_none_elapsed_with_note() {
2710        let f: SeriesField<f64> = SeriesField::from_parts_with_phases_opt(
2711            "util",
2712            vec!["a".to_string(), "b".to_string(), "c".to_string()],
2713            vec![Some(300), None, Some(400)],
2714            vec![Ok(10.0), Ok(9999.0), Ok(10.0)],
2715            vec![None; 3],
2716        );
2717        let mut v = Verdict::new();
2718        f.steady_within(&mut v, 0, 0.05);
2719        let r = v.into_result();
2720        assert!(
2721            r.is_pass(),
2722            "None-elapsed sample must be skipped, not admitted into the band: {:?}",
2723            r.outcomes,
2724        );
2725        assert!(
2726            r.info_notes
2727                .iter()
2728                .any(|n| n.message.contains("not measured")),
2729            "the skipped None-elapsed sample must surface a Note: {:?}",
2730            r.info_notes,
2731        );
2732    }
2733
2734    /// REGRESSION: converges_to treats a None-elapsed sample
2735    /// as out-of-window — it RESETS the 3-consecutive witness run rather
2736    /// than bridging it. Here a,b,d are all in-band (1.0) but the None at
2737    /// index 2 breaks the run, so the longest streak is 2 → no witness →
2738    /// fail. If None were coerced to 0 (in-window, in-band), c would
2739    /// complete a 3-run and the assertion would falsely PASS.
2740    #[test]
2741    fn converges_to_none_elapsed_breaks_witness() {
2742        let f: SeriesField<f64> = SeriesField::from_parts_with_phases_opt(
2743            "load",
2744            vec![
2745                "a".to_string(),
2746                "b".to_string(),
2747                "c".to_string(),
2748                "d".to_string(),
2749            ],
2750            vec![Some(100), Some(200), None, Some(300)],
2751            vec![Ok(1.0), Ok(1.0), Ok(1.0), Ok(1.0)],
2752            vec![None; 4],
2753        );
2754        let mut v = Verdict::new();
2755        f.converges_to(&mut v, 1.0, 0.1, 1000);
2756        assert!(
2757            !v.is_pass(),
2758            "a None-elapsed sample must break the witness run, not bridge it",
2759        );
2760    }
2761
2762    #[test]
2763    fn always_true_passes_on_all_true() {
2764        let f = synthetic_field("alive", vec![(100, true), (200, true)]);
2765        let mut v = Verdict::new();
2766        f.always_true(&mut v);
2767        assert!(v.is_pass());
2768    }
2769
2770    #[test]
2771    fn always_true_fails_on_false() {
2772        let f = synthetic_field("alive", vec![(100, true), (200, false)]);
2773        let mut v = Verdict::new();
2774        f.always_true(&mut v);
2775        assert!(!v.is_pass());
2776    }
2777
2778    #[test]
2779    fn ratio_within_in_band_passes() {
2780        let lhs = synthetic_field("lhs", vec![(100, 10.0f64), (200, 20.0f64), (300, 30.0f64)]);
2781        let rhs = synthetic_field("rhs", vec![(100, 5.0f64), (200, 10.0f64), (300, 15.0f64)]);
2782        let mut v = Verdict::new();
2783        lhs.ratio_within(&mut v, &rhs, 1.5, 2.5);
2784        assert!(v.is_pass());
2785    }
2786
2787    #[test]
2788    fn ratio_within_length_mismatch_fails_caller_error() {
2789        let lhs = synthetic_field("lhs", vec![(100, 10.0f64)]);
2790        let rhs = synthetic_field("rhs", vec![(100, 5.0f64), (200, 10.0f64)]);
2791        let mut v = Verdict::new();
2792        lhs.ratio_within(&mut v, &rhs, 1.5, 2.5);
2793        assert!(!v.is_pass());
2794    }
2795
2796    /// A zero rhs at any sample is INSTRUMENT-derived (the
2797    /// projected value happened to be zero — a guest counter that
2798    /// reset, an aggregator that produced a zero bucket). The
2799    /// ratio_within pattern must record this as Inconclusive, NOT
2800    /// Fail, so a non-evaluable ratio cannot silently flip a
2801    /// real-pass workload into a false-fail. Pins the
2802    /// zero-denominator → Inconclusive contract on the ratio
2803    /// pattern.
2804    #[test]
2805    fn ratio_within_zero_rhs_records_inconclusive() {
2806        let lhs = synthetic_field("lhs", vec![(100, 10.0f64)]);
2807        let rhs = synthetic_field("rhs", vec![(100, 0.0f64)]);
2808        let mut v = Verdict::new();
2809        lhs.ratio_within(&mut v, &rhs, 1.5, 2.5);
2810        let r = v.into_result();
2811        assert!(
2812            r.is_inconclusive(),
2813            "zero-rhs ratio must record Inconclusive: {:?}",
2814            r.outcomes,
2815        );
2816        assert!(
2817            !r.is_fail(),
2818            "zero rhs is INSTRUMENT-derived; must NOT record Fail: {:?}",
2819            r.outcomes,
2820        );
2821        assert!(
2822            r.inconclusive_details()
2823                .any(|d| d.kind == DetailKind::Temporal
2824                    && d.message.contains("INSTRUMENT-derived")),
2825            "inconclusive detail must surface with Temporal kind and \
2826             INSTRUMENT-derived wording: {:?}",
2827            r.outcomes,
2828        );
2829    }
2830
2831    #[test]
2832    fn each_at_least_passes() {
2833        let f = synthetic_field("counter", vec![(100, 5u64), (200, 7u64)]);
2834        let mut v = Verdict::new();
2835        f.each(&mut v).at_least(3u64);
2836        assert!(v.is_pass());
2837    }
2838
2839    #[test]
2840    fn each_at_most_fails_on_outlier() {
2841        let f = synthetic_field("counter", vec![(100, 5u64), (200, 99u64)]);
2842        let mut v = Verdict::new();
2843        f.each(&mut v).at_most(10u64);
2844        assert!(!v.is_pass());
2845    }
2846
2847    #[test]
2848    fn each_propagates_per_sample_projection_error() {
2849        let tags = vec!["periodic_000".to_string(), "periodic_001".to_string()];
2850        let elapsed = vec![100u64, 200u64];
2851        let values: Vec<SnapshotResult<u64>> = vec![
2852            Ok(5u64),
2853            Err(SnapshotError::VarNotFound {
2854                requested: "missing".to_string(),
2855                available: vec!["a".to_string()],
2856            }),
2857        ];
2858        let f = SeriesField::from_parts("x", tags, elapsed, values);
2859        let mut v = Verdict::new();
2860        f.each(&mut v).at_least(1u64);
2861        let r = v.into_result();
2862        assert!(r.is_fail());
2863        assert!(
2864            r.failure_details()
2865                .any(|d| d.message.contains("projection error"))
2866        );
2867    }
2868
2869    // ---- iter_full() ----
2870
2871    /// iter_full on an empty SeriesField yields no items. Guards
2872    /// the trivial case so a caller threading the iterator into a
2873    /// for-loop never triggers a phantom first iteration on a
2874    /// freshly-constructed empty field.
2875    #[test]
2876    fn iter_full_empty_yields_no_items() {
2877        let f: SeriesField<u64> =
2878            SeriesField::from_parts("empty", Vec::new(), Vec::new(), Vec::new());
2879        let collected: Vec<(&str, Option<u64>, &SnapshotResult<u64>)> = f.iter_full().collect();
2880        assert!(collected.is_empty());
2881        assert_eq!(f.iter_full().count(), 0);
2882    }
2883
2884    /// iter_full on a populated SeriesField yields each
2885    /// (tag, elapsed_ms, &SnapshotResult<T>) triple in the same
2886    /// order as the underlying storage — both Ok and Err slots
2887    /// flow through unchanged. Mixes a successfully-projected
2888    /// sample with a SnapshotError variant so the test guards both
2889    /// branches of the per-sample SnapshotResult.
2890    #[test]
2891    fn iter_full_yields_triples_in_storage_order() {
2892        let tags = vec![
2893            "periodic_000".to_string(),
2894            "periodic_001".to_string(),
2895            "periodic_002".to_string(),
2896        ];
2897        let elapsed = vec![100u64, 200u64, 300u64];
2898        let values: Vec<SnapshotResult<u64>> = vec![
2899            Ok(7u64),
2900            Err(SnapshotError::VarNotFound {
2901                requested: "missing".to_string(),
2902                available: vec!["a".to_string()],
2903            }),
2904            Ok(42u64),
2905        ];
2906        let f = SeriesField::from_parts("counter", tags, elapsed, values);
2907        let collected: Vec<(&str, Option<u64>, &SnapshotResult<u64>)> = f.iter_full().collect();
2908        assert_eq!(collected.len(), 3);
2909        assert_eq!(collected[0].0, "periodic_000");
2910        assert_eq!(collected[0].1, Some(100u64));
2911        assert_eq!(collected[0].2.as_ref().ok().copied(), Some(7u64));
2912        assert_eq!(collected[1].0, "periodic_001");
2913        assert_eq!(collected[1].1, Some(200u64));
2914        assert!(collected[1].2.is_err());
2915        assert_eq!(collected[2].0, "periodic_002");
2916        assert_eq!(collected[2].1, Some(300u64));
2917        assert_eq!(collected[2].2.as_ref().ok().copied(), Some(42u64));
2918    }
2919
2920    /// iter_full's item count matches len(). Guards the
2921    /// equal-length invariant enforced at construction time
2922    /// (from_parts' assert_eq! checks): if any of the three
2923    /// vectors drifts, zip's shortest-input behavior would silently
2924    /// truncate the iterator, so a count mismatch would manifest
2925    /// here even when no slot is dereferenced.
2926    #[test]
2927    fn iter_full_count_matches_len() {
2928        let f = synthetic_field(
2929            "counter",
2930            vec![(100, 1u64), (200, 2u64), (300, 3u64), (400, 4u64)],
2931        );
2932        assert_eq!(f.iter_full().count(), f.len());
2933    }
2934
2935    /// Vacuous holding when num_snapshots < 2 records a Note, not a
2936    /// failure.
2937    #[test]
2938    fn nondecreasing_with_one_sample_records_note() {
2939        let f = synthetic_field("counter", vec![(100, 1u64)]);
2940        let mut v = Verdict::new();
2941        f.nondecreasing(&mut v);
2942        let r = v.into_result();
2943        assert!(r.is_pass());
2944        assert!(!r.info_notes.is_empty());
2945    }
2946
2947    /// End-to-end sample: sanity-check that a series projection
2948    /// flowing through a temporal pattern produces a coherent
2949    /// verdict. The `SampleSeries` shape exercise lives in
2950    /// `src/scenario/sample.rs`; this test only confirms the
2951    /// integration handshake works.
2952    #[test]
2953    fn series_projection_into_temporal_pattern_smoke_check() {
2954        // Empty series — every pattern should be vacuously ok.
2955        let series = SampleSeries::empty();
2956        let field = series.bpf("x", |snap| snap.var("missing").as_u64());
2957        let mut v = Verdict::new();
2958        field.nondecreasing(&mut v);
2959        let r = v.into_result();
2960        assert!(r.is_pass());
2961    }
2962
2963    // ---- Skip-on-projection-error semantics ----
2964
2965    /// nondecreasing skips errored samples, logs skip count, does
2966    /// NOT flip the verdict on missing data.
2967    #[test]
2968    fn nondecreasing_skips_projection_errors_with_note() {
2969        let tags = vec![
2970            "periodic_000".to_string(),
2971            "periodic_001".to_string(),
2972            "periodic_002".to_string(),
2973        ];
2974        let elapsed = vec![100u64, 200u64, 300u64];
2975        let values: Vec<SnapshotResult<u64>> = vec![
2976            Ok(1u64),
2977            Err(SnapshotError::VarNotFound {
2978                requested: "x".to_string(),
2979                available: vec![],
2980            }),
2981            Ok(2u64),
2982        ];
2983        let f = SeriesField::from_parts("counter", tags, elapsed, values);
2984        let mut v = Verdict::new();
2985        f.nondecreasing(&mut v);
2986        let r = v.into_result();
2987        assert!(
2988            r.is_pass(),
2989            "nondecreasing must NOT flip on projection error: {:?}",
2990            r.outcomes
2991        );
2992        assert!(
2993            r.info_notes
2994                .iter()
2995                .any(|n| n.message.contains("skipped 1 sample")
2996                    && n.message.contains("periodic_001")),
2997            "expected skip note: {:?}",
2998            r.info_notes
2999        );
3000    }
3001
3002    /// rate_within treats errored samples as gaps (no rate
3003    /// computed across the gap), records skip count via a Note.
3004    #[test]
3005    fn rate_within_skips_gaps_with_note() {
3006        let tags = vec![
3007            "periodic_000".to_string(),
3008            "periodic_001".to_string(),
3009            "periodic_002".to_string(),
3010        ];
3011        let elapsed = vec![100u64, 200u64, 300u64];
3012        let values: Vec<SnapshotResult<f64>> = vec![
3013            Ok(1.0f64),
3014            Err(SnapshotError::VarNotFound {
3015                requested: "x".to_string(),
3016                available: vec![],
3017            }),
3018            Ok(2.0f64),
3019        ];
3020        let f = SeriesField::from_parts("ticks", tags, elapsed, values);
3021        let mut v = Verdict::new();
3022        f.rate_within(&mut v, 0.0, 1.0);
3023        let r = v.into_result();
3024        assert!(
3025            r.is_pass(),
3026            "rate_within must NOT flip on gap: {:?}",
3027            r.outcomes
3028        );
3029        assert!(
3030            r.info_notes.iter().any(|n| n.message.contains("gap")),
3031            "expected gap note: {:?}",
3032            r.info_notes
3033        );
3034    }
3035
3036    /// REGRESSION: rate_within SKIPS an interval whose
3037    /// endpoint has no measured elapsed timestamp (`None`) — the dt is
3038    /// undefined, so the rate cannot be computed and must be skipped
3039    /// with a Note, NEVER fabricated from a `0`-coerced dt. The band
3040    /// `[0, 0.001]` is chosen so the bug-shape (coerce `None`→`0`, giving
3041    /// the b→c interval dt = 400 and rate (9-2)/400 = 0.0175) would
3042    /// FAIL; the correct skip leaves no computable interval and passes.
3043    #[test]
3044    fn rate_within_skips_none_elapsed_endpoint() {
3045        let tags = vec!["a".to_string(), "b".to_string(), "c".to_string()];
3046        // Sample `b` carries no recorded timestamp.
3047        let elapsed = vec![Some(100u64), None, Some(400u64)];
3048        let values: Vec<SnapshotResult<f64>> = vec![Ok(1.0), Ok(2.0), Ok(9.0)];
3049        let f =
3050            SeriesField::from_parts_with_phases_opt("ticks", tags, elapsed, values, vec![None; 3]);
3051        let mut v = Verdict::new();
3052        f.rate_within(&mut v, 0.0, 0.001);
3053        let r = v.into_result();
3054        assert!(
3055            r.is_pass(),
3056            "None-endpoint intervals must be skipped, not coerced into a band failure: {:?}",
3057            r.outcomes,
3058        );
3059        assert!(
3060            r.info_notes
3061                .iter()
3062                .any(|n| n.message.contains("unmeasured elapsed")),
3063            "the skipped None-endpoint interval(s) must surface a Note: {:?}",
3064            r.info_notes,
3065        );
3066    }
3067
3068    /// steady_within skips errored post-warmup samples, records a
3069    /// Note, does NOT flip the verdict on missing data.
3070    #[test]
3071    fn steady_within_skips_projection_errors_with_note() {
3072        let tags = vec![
3073            "periodic_000".to_string(),
3074            "periodic_001".to_string(),
3075            "periodic_002".to_string(),
3076        ];
3077        let elapsed = vec![300u64, 400u64, 500u64];
3078        let values: Vec<SnapshotResult<f64>> = vec![
3079            Ok(10.0f64),
3080            Err(SnapshotError::VarNotFound {
3081                requested: "x".to_string(),
3082                available: vec![],
3083            }),
3084            Ok(10.0f64),
3085        ];
3086        let f = SeriesField::from_parts("util", tags, elapsed, values);
3087        let mut v = Verdict::new();
3088        f.steady_within(&mut v, 0, 0.10);
3089        let r = v.into_result();
3090        assert!(r.is_pass(), "{:?}", r.outcomes);
3091        assert!(
3092            r.info_notes
3093                .iter()
3094                .any(|n| n.message.contains("skipped") && n.message.contains("periodic_001")),
3095            "expected skip note: {:?}",
3096            r.info_notes
3097        );
3098    }
3099
3100    /// ratio_within skips pairs where either side errored, records
3101    /// gap count, does NOT flip on missing data.
3102    #[test]
3103    fn ratio_within_skips_gaps_with_note() {
3104        let lhs_values: Vec<SnapshotResult<f64>> = vec![
3105            Ok(10.0f64),
3106            Err(SnapshotError::VarNotFound {
3107                requested: "x".to_string(),
3108                available: vec![],
3109            }),
3110            Ok(20.0f64),
3111        ];
3112        let rhs_values: Vec<SnapshotResult<f64>> = vec![Ok(5.0f64), Ok(7.0f64), Ok(10.0f64)];
3113        let tags = vec![
3114            "periodic_000".to_string(),
3115            "periodic_001".to_string(),
3116            "periodic_002".to_string(),
3117        ];
3118        let elapsed = vec![100u64, 200u64, 300u64];
3119        let lhs = SeriesField::from_parts("lhs", tags.clone(), elapsed.clone(), lhs_values);
3120        let rhs = SeriesField::from_parts("rhs", tags, elapsed, rhs_values);
3121        let mut v = Verdict::new();
3122        lhs.ratio_within(&mut v, &rhs, 1.5, 2.5);
3123        let r = v.into_result();
3124        assert!(r.is_pass(), "{:?}", r.outcomes);
3125        assert!(
3126            r.info_notes.iter().any(|n| n.message.contains("1 pair")),
3127            "expected gap note: {:?}",
3128            r.info_notes
3129        );
3130    }
3131
3132    /// A non-finite (NaN) post-warmup sample must not poison the mean
3133    /// into a silent PASS. With the NaN dropped from the band
3134    /// population (and noted), the mean is computed over the finite
3135    /// samples [10, 100] = 55, so both fall outside ±10% and the
3136    /// verdict FAILS — whereas the pre-fix code let the NaN make the
3137    /// mean NaN, the band NaN, and every band check vacuously pass.
3138    #[test]
3139    fn steady_within_nan_sample_does_not_silently_pass() {
3140        let tags = vec![
3141            "periodic_000".to_string(),
3142            "periodic_001".to_string(),
3143            "periodic_002".to_string(),
3144        ];
3145        let elapsed = vec![300u64, 400u64, 500u64];
3146        let values: Vec<SnapshotResult<f64>> = vec![Ok(10.0f64), Ok(f64::NAN), Ok(100.0f64)];
3147        let f = SeriesField::from_parts("util", tags, elapsed, values);
3148        let mut v = Verdict::new();
3149        f.steady_within(&mut v, 0, 0.10);
3150        let r = v.into_result();
3151        assert!(
3152            !r.is_pass(),
3153            "a NaN sample must not poison the band into a silent pass: {:?}",
3154            r.outcomes
3155        );
3156        assert!(
3157            r.info_notes
3158                .iter()
3159                .any(|n| n.message.contains("non-finite") && n.message.contains("periodic_001")),
3160            "expected non-finite skip note naming the NaN sample: {:?}",
3161            r.info_notes
3162        );
3163    }
3164
3165    /// A NaN lhs or rhs yields a non-finite ratio that the `rhs == 0`
3166    /// guard misses (NaN != 0.0); it must flip the verdict (a detail),
3167    /// not slip past the band check (NaN comparisons are always false)
3168    /// into a silent pass.
3169    #[test]
3170    fn ratio_within_non_finite_ratio_does_not_silently_pass() {
3171        let tags = vec!["periodic_000".to_string()];
3172        let elapsed = vec![100u64];
3173        // NaN numerator.
3174        {
3175            let lhs =
3176                SeriesField::from_parts("lhs", tags.clone(), elapsed.clone(), vec![Ok(f64::NAN)]);
3177            let rhs =
3178                SeriesField::from_parts("rhs", tags.clone(), elapsed.clone(), vec![Ok(5.0f64)]);
3179            let mut v = Verdict::new();
3180            lhs.ratio_within(&mut v, &rhs, 0.0, 1.0);
3181            let r = v.into_result();
3182            assert!(
3183                !r.is_pass(),
3184                "NaN lhs must not silently pass: {:?}",
3185                r.outcomes
3186            );
3187        }
3188        // NaN denominator (passes the rhs==0 guard, since NaN != 0.0).
3189        {
3190            let lhs =
3191                SeriesField::from_parts("lhs", tags.clone(), elapsed.clone(), vec![Ok(5.0f64)]);
3192            let rhs =
3193                SeriesField::from_parts("rhs", tags.clone(), elapsed.clone(), vec![Ok(f64::NAN)]);
3194            let mut v = Verdict::new();
3195            lhs.ratio_within(&mut v, &rhs, 0.0, 1.0);
3196            let r = v.into_result();
3197            assert!(
3198                !r.is_pass(),
3199                "NaN rhs must not silently pass: {:?}",
3200                r.outcomes
3201            );
3202        }
3203    }
3204
3205    /// converges_to with fewer than 3 successfully-projected
3206    /// samples in window records an explicit Note (not a verdict
3207    /// failure) — absence of data is a coverage gap, not a
3208    /// negative finding. The Note message names the count and the
3209    /// requirement so an operator can distinguish "did not collect
3210    /// enough samples" from "collected enough samples but never
3211    /// converged".
3212    #[test]
3213    fn converges_to_insufficient_samples_records_note() {
3214        let f = synthetic_field("load", vec![(100, 1.0f64), (200, 1.0f64)]);
3215        let mut v = Verdict::new();
3216        f.converges_to(&mut v, 1.0, 0.5, 1000);
3217        let r = v.into_result();
3218        assert!(
3219            r.is_pass(),
3220            "insufficient-samples must NOT flip the verdict: {:?}",
3221            r.outcomes
3222        );
3223        assert!(
3224            r.info_notes
3225                .iter()
3226                .any(|n| n.message.contains("insufficient samples")
3227                    && n.message.contains("need ≥3, have 2")),
3228            "expected insufficient-samples note with count: {:?}",
3229            r.info_notes
3230        );
3231    }
3232
3233    /// converges_to with 3+ samples in window but none in band
3234    /// produces the "no witness" structured failure (the
3235    /// pre-existing code path), distinct from the
3236    /// insufficient-samples message.
3237    #[test]
3238    fn converges_to_no_witness_distinct_from_insufficient() {
3239        let f = synthetic_field(
3240            "load",
3241            vec![
3242                (100, 10.0f64),
3243                (200, 10.0f64),
3244                (300, 10.0f64),
3245                (400, 10.0f64),
3246            ],
3247        );
3248        let mut v = Verdict::new();
3249        f.converges_to(&mut v, 1.0, 0.5, 1000);
3250        let r = v.into_result();
3251        assert!(r.is_fail());
3252        assert!(
3253            r.failure_details()
3254                .any(|d| d.message.contains("no 3-consecutive-in-band witness")),
3255            "expected no-witness message: {:?}",
3256            r.outcomes
3257        );
3258        assert!(
3259            !r.failure_details()
3260                .any(|d| d.message.contains("insufficient samples")),
3261            "must NOT report insufficient-samples when there ARE enough samples: {:?}",
3262            r.outcomes
3263        );
3264    }
3265
3266    // ---- NaN handling ----
3267
3268    /// each.at_least on NaN sample reports an incomparable
3269    /// failure rather than silently passing the comparison.
3270    /// Without the partial_cmp fix, IEEE-754 `<` against NaN
3271    /// is always false, so a NaN sample would silently pass
3272    /// `at_least(0.0)`.
3273    #[test]
3274    fn each_at_least_flags_nan_sample() {
3275        let f = synthetic_field("util", vec![(100, 50.0f64), (200, f64::NAN)]);
3276        let mut v = Verdict::new();
3277        f.each(&mut v).at_least(0.0f64);
3278        let r = v.into_result();
3279        assert!(r.is_fail());
3280        assert!(
3281            r.failure_details()
3282                .any(|d| d.message.contains("NaN") && d.message.contains("periodic_001")),
3283            "expected NaN failure naming the sample: {:?}",
3284            r.outcomes
3285        );
3286    }
3287
3288    /// each.at_most on NaN sample reports an incomparable failure.
3289    #[test]
3290    fn each_at_most_flags_nan_sample() {
3291        let f = synthetic_field("util", vec![(100, 50.0f64), (200, f64::NAN)]);
3292        let mut v = Verdict::new();
3293        f.each(&mut v).at_most(100.0f64);
3294        let r = v.into_result();
3295        assert!(r.is_fail());
3296        assert!(
3297            r.failure_details()
3298                .any(|d| d.message.contains("NaN") && d.message.contains("periodic_001")),
3299            "expected NaN failure naming the sample: {:?}",
3300            r.outcomes
3301        );
3302    }
3303
3304    /// each.between on NaN sample reports an incomparable failure.
3305    #[test]
3306    fn each_between_flags_nan_sample() {
3307        let f = synthetic_field("util", vec![(100, 50.0f64), (200, f64::NAN)]);
3308        let mut v = Verdict::new();
3309        f.each(&mut v).between(0.0f64, 100.0f64);
3310        let r = v.into_result();
3311        assert!(r.is_fail());
3312        assert!(
3313            r.failure_details()
3314                .any(|d| d.message.contains("NaN") && d.message.contains("periodic_001")),
3315            "expected NaN failure naming the sample: {:?}",
3316            r.outcomes
3317        );
3318    }
3319
3320    /// rate_within reports a non-finite-rate failure when the
3321    /// computed rate is NaN or Infinity (e.g. inf-inf endpoints,
3322    /// NaN in either endpoint, or a finite endpoint difference
3323    /// that overflows f64). Without the `rate.is_finite()` check,
3324    /// IEEE-754 `<` against NaN is always false and `<` against
3325    /// Inf trivially passes any finite ceiling, so non-finite
3326    /// rates would silently slip past the band check.
3327    #[test]
3328    fn rate_within_flags_non_finite_rate() {
3329        let f = synthetic_field("ticks", vec![(100, f64::INFINITY), (200, f64::INFINITY)]);
3330        let mut v = Verdict::new();
3331        f.rate_within(&mut v, 0.0, 1.0);
3332        let r = v.into_result();
3333        assert!(r.is_fail());
3334        assert!(
3335            r.failure_details()
3336                .any(|d| d.kind == DetailKind::Temporal && d.message.contains("non-finite rate")),
3337            "expected non-finite-rate failure: {:?}",
3338            r.outcomes
3339        );
3340    }
3341
3342    /// nondecreasing skips placeholder samples (is_placeholder=true)
3343    /// with a Note rather than treating them as monotonicity
3344    /// regressions or generic projection errors. Placeholder
3345    /// reports must NOT silently register as zero progress on a
3346    /// counter.
3347    #[test]
3348    fn nondecreasing_skips_placeholder_samples() {
3349        use crate::monitor::dump::FailureDumpReport;
3350        let report_a = FailureDumpReport::default(); // not a placeholder; will yield VarNotFound
3351        let placeholder = FailureDumpReport::placeholder("rendezvous timeout");
3352        let report_b = FailureDumpReport::default();
3353        let drained = vec![
3354            ("periodic_000".to_string(), report_a, None, Some(100u64)),
3355            ("periodic_001".to_string(), placeholder, None, Some(200u64)),
3356            ("periodic_002".to_string(), report_b, None, Some(300u64)),
3357        ];
3358        let series = SampleSeries::from_drained(drained, None);
3359        // Project a missing var so non-placeholder samples also
3360        // produce errors — but the placeholder sample's Err must
3361        // be the dedicated PlaceholderSample variant. The skip-
3362        // with-Note path collects all skipped samples; we verify
3363        // the placeholder tag appears in the skip list.
3364        let field: SeriesField<u64> = series.bpf("counter", |snap| snap.var("missing").as_u64());
3365        let mut v = Verdict::new();
3366        field.nondecreasing(&mut v);
3367        let r = v.into_result();
3368        // Verdict passes (nondecreasing skips errored samples).
3369        assert!(r.is_pass(), "{:?}", r.outcomes);
3370        // The note message names the placeholder sample.
3371        assert!(
3372            r.info_notes
3373                .iter()
3374                .any(|n| n.message.contains("periodic_001")),
3375            "expected skip note naming placeholder sample: {:?}",
3376            r.info_notes
3377        );
3378    }
3379
3380    /// nondecreasing skips MissingStats samples (stats=None at the
3381    /// row, surfaced through `series.stats(...)` as the dedicated
3382    /// `SnapshotError::MissingStats` variant) with a Note rather
3383    /// than treating them as monotonicity regressions. Mirrors
3384    /// `nondecreasing_skips_placeholder_samples` for the stats-
3385    /// coverage gap dimension: a per-sample missing-stats slot must
3386    /// NOT silently register as zero progress on a counter, and the
3387    /// skip-with-Note path must name the offending sample so the
3388    /// operator sees WHICH sample lacked stats.
3389    #[test]
3390    fn nondecreasing_skips_missing_stats_samples() {
3391        use crate::monitor::dump::FailureDumpReport;
3392        // Build three rows where sample[1]'s stats Option is None.
3393        // `series.stats(...)` projection will produce a per-sample
3394        // `Err(SnapshotError::MissingStats { tag: "periodic_001" })`
3395        // for that row (see SampleSeries::stats at
3396        // src/scenario/sample/stats.rs lines 65-67) — the analogue of
3397        // the placeholder path producing PlaceholderSample. The
3398        // outer rows carry concrete JSON so their projection slot
3399        // is Ok; only the middle row exercises the MissingStats
3400        // skip path.
3401        let stats_a: serde_json::Value = serde_json::json!({"counter": 1u64});
3402        let stats_b: serde_json::Value = serde_json::json!({"counter": 2u64});
3403        let drained = vec![
3404            (
3405                "periodic_000".to_string(),
3406                FailureDumpReport::default(),
3407                Some(stats_a),
3408                Some(100u64),
3409            ),
3410            (
3411                "periodic_001".to_string(),
3412                FailureDumpReport::default(),
3413                None,
3414                Some(200u64),
3415            ),
3416            (
3417                "periodic_002".to_string(),
3418                FailureDumpReport::default(),
3419                Some(stats_b),
3420                Some(300u64),
3421            ),
3422        ];
3423        let series = SampleSeries::from_drained(drained, None);
3424        let field: SeriesField<u64> = series.stats("counter", |sv| sv.get("counter").as_u64());
3425        // Sanity-check the constructed field's middle slot is
3426        // exactly the MissingStats variant the spec calls out, so a
3427        // future refactor that drops or renames the variant fails
3428        // here at the construction site rather than as an opaque
3429        // verdict mismatch.
3430        let middle = field.values_iter().nth(1).expect("3 samples");
3431        assert!(
3432            matches!(
3433                middle,
3434                Err(SnapshotError::MissingStats { tag, .. }) if tag == "periodic_001"
3435            ),
3436            "middle slot must be MissingStats('periodic_001'), got {middle:?}"
3437        );
3438        let mut v = Verdict::new();
3439        field.nondecreasing(&mut v);
3440        let r = v.into_result();
3441        // Verdict passes — MissingStats is structurally missing
3442        // data, not a monotonicity regression.
3443        assert!(
3444            r.is_pass(),
3445            "nondecreasing must NOT flip on MissingStats: {:?}",
3446            r.outcomes
3447        );
3448        // The note message names the MissingStats sample so the
3449        // operator sees the stats-coverage gap without re-walking
3450        // the source.
3451        assert!(
3452            r.info_notes
3453                .iter()
3454                .any(|n| n.message.contains("periodic_001")),
3455            "expected skip note naming MissingStats sample: {:?}",
3456            r.info_notes
3457        );
3458    }
3459
3460    /// `always_true` emits a `tracing::info!` event under the
3461    /// `ktstr::assert::temporal` target when log_passes is on AND
3462    /// the pattern adds no failure details. Mirrors the scalar
3463    /// claim's positive-confirmation contract at the temporal-
3464    /// pattern level: a passing run names the label and the
3465    /// sample count so a `--nocapture` operator sees the
3466    /// confirmation rather than silent acceptance.
3467    #[tracing_test::traced_test]
3468    #[test]
3469    fn always_true_emits_pass_log_when_log_passes_on() {
3470        let f = synthetic_field("alive", vec![(100, true), (200, true), (300, true)]);
3471        let mut v = Verdict::new().with_log_passes(true);
3472        f.always_true(&mut v);
3473        assert!(v.is_pass());
3474        assert!(
3475            logs_contain("alive (always_true): all 3 samples true"),
3476            "positive-confirmation log must name the label, pattern, and sample count",
3477        );
3478    }
3479
3480    /// A failed temporal pattern stays silent on the positive
3481    /// log even when log_passes is on — the pre/post
3482    /// `temporal_outcome_count` gate ensures a partial-failure
3483    /// run does not log a misleading "all passed" event.
3484    #[tracing_test::traced_test]
3485    #[test]
3486    fn always_true_silent_on_fail_arm_even_with_log_passes() {
3487        let f = synthetic_field("alive", vec![(100, true), (200, false)]);
3488        let mut v = Verdict::new().with_log_passes(true);
3489        f.always_true(&mut v);
3490        assert!(!v.is_pass());
3491        assert!(
3492            !logs_contain("samples true"),
3493            "fail arm must NOT emit the positive-confirmation log",
3494        );
3495    }
3496
3497    // ---------- for_each_phase + aggregate_by_phase ----------
3498
3499    #[test]
3500    fn series_field_for_each_phase_invokes_closure_per_phase_in_phase_order() {
3501        let f = SeriesField::<f64>::from_parts_with_phases(
3502            "x",
3503            vec!["t0".into(), "t1".into(), "t2".into(), "t3".into()],
3504            vec![100, 200, 300, 400],
3505            vec![Ok(10.0), Ok(20.0), Ok(30.0), Ok(40.0)],
3506            vec![
3507                Some(crate::assert::Phase::step(1)),
3508                Some(crate::assert::Phase::BASELINE),
3509                Some(crate::assert::Phase::step(0)),
3510                Some(crate::assert::Phase::step(0)),
3511            ],
3512        );
3513        let mut visited: Vec<(crate::assert::Phase, usize)> = Vec::new();
3514        f.for_each_phase(|phase, samples| {
3515            visited.push((phase, samples.len()));
3516        });
3517        // BTreeMap key order: BASELINE (0) < Step[0] (1) < Step[1] (2)
3518        assert_eq!(
3519            visited,
3520            vec![
3521                (crate::assert::Phase::BASELINE, 1),
3522                (crate::assert::Phase::step(0), 2),
3523                (crate::assert::Phase::step(1), 1),
3524            ],
3525            "for_each_phase must iterate phases in BTreeMap (Phase) order",
3526        );
3527    }
3528
3529    #[test]
3530    fn series_field_for_each_phase_skips_none_phase_samples() {
3531        let f = SeriesField::<f64>::from_parts_with_phases(
3532            "x",
3533            vec!["t0".into(), "t1".into()],
3534            vec![100, 200],
3535            vec![Ok(1.0), Ok(2.0)],
3536            vec![None, Some(crate::assert::Phase::step(0))],
3537        );
3538        let mut visited: Vec<crate::assert::Phase> = Vec::new();
3539        f.for_each_phase(|phase, _| visited.push(phase));
3540        assert_eq!(visited, vec![crate::assert::Phase::step(0)]);
3541    }
3542
3543    /// Helper: find a registered MetricDef by name from `crate::stats::METRICS`.
3544    fn metric_by_name(name: &str) -> &'static crate::stats::MetricDef {
3545        crate::stats::METRICS
3546            .iter()
3547            .find(|m| m.name == name)
3548            .unwrap_or_else(|| panic!("no MetricDef named '{}' in METRICS", name))
3549    }
3550
3551    #[test]
3552    fn series_field_aggregate_by_phase_routes_counter_through_last_minus_first() {
3553        // Use a registered Counter metric. `total_migrations` is
3554        // MetricKind::Counter per stats.rs METRICS.
3555        let metric = metric_by_name("total_migrations");
3556        assert!(matches!(metric.kind, crate::stats::MetricKind::Counter));
3557        let f = SeriesField::<f64>::from_parts_with_phases(
3558            "x",
3559            vec!["t0".into(), "t1".into(), "t2".into()],
3560            vec![100, 200, 300],
3561            vec![Ok(100.0), Ok(150.0), Ok(175.0)],
3562            vec![
3563                Some(crate::assert::Phase::step(0)),
3564                Some(crate::assert::Phase::step(0)),
3565                Some(crate::assert::Phase::step(0)),
3566            ],
3567        );
3568        let agg = f.aggregate_by_phase(metric);
3569        assert_eq!(agg.len(), 1, "1 distinct phase");
3570        // Counter last-minus-first: 175 - 100 = 75 (NOT the flat-run sum 425)
3571        assert_eq!(
3572            agg[&crate::assert::Phase::step(0)],
3573            75.0,
3574            "Counter routes through phase_counter_delta (last-first), not the flat-run sum aggregate_samples",
3575        );
3576    }
3577
3578    /// `sum_by_phase` totals per-read DELTAS per phase — the
3579    /// delta-reported scx_stats reduction. Each sample is its own
3580    /// window's count, so the per-phase total is their SUM, NOT a
3581    /// Counter last-minus-first (which would difference two deltas).
3582    #[test]
3583    fn series_field_sum_by_phase_sums_per_read_deltas_per_phase() {
3584        let f = SeriesField::<f64>::from_parts_with_phases(
3585            "x",
3586            vec![
3587                "t0".into(),
3588                "t1".into(),
3589                "t2".into(),
3590                "t3".into(),
3591                "t4".into(),
3592            ],
3593            vec![100, 200, 300, 400, 500],
3594            vec![Ok(10.0), Ok(20.0), Ok(5.0), Ok(100.0), Ok(50.0)],
3595            vec![
3596                Some(crate::assert::Phase::step(0)),
3597                Some(crate::assert::Phase::step(0)),
3598                Some(crate::assert::Phase::step(0)),
3599                Some(crate::assert::Phase::step(1)),
3600                Some(crate::assert::Phase::step(1)),
3601            ],
3602        );
3603        let sums = f.sum_by_phase();
3604        assert_eq!(sums.len(), 2, "two distinct phases");
3605        assert_eq!(
3606            sums[&crate::assert::Phase::step(0)],
3607            35.0,
3608            "Step[0] is the SUM of its per-read deltas (10+20+5=35), NOT a \
3609             last-minus-first (5-10)",
3610        );
3611        assert_eq!(
3612            sums[&crate::assert::Phase::step(1)],
3613            150.0,
3614            "Step[1] = 100 + 50",
3615        );
3616    }
3617
3618    #[test]
3619    fn series_field_aggregate_by_phase_routes_gauge_through_flat_run_aggregator() {
3620        // `worst_spread` is MetricKind::Gauge(GaugeAgg::Last) per stats.rs METRICS.
3621        let metric = metric_by_name("worst_spread");
3622        assert!(matches!(metric.kind, crate::stats::MetricKind::Gauge(_)));
3623        let f = SeriesField::<f64>::from_parts_with_phases(
3624            "x",
3625            vec!["t0".into(), "t1".into(), "t2".into()],
3626            vec![100, 200, 300],
3627            vec![Ok(2.0), Ok(4.0), Ok(6.0)],
3628            vec![
3629                Some(crate::assert::Phase::step(0)),
3630                Some(crate::assert::Phase::step(0)),
3631                Some(crate::assert::Phase::step(0)),
3632            ],
3633        );
3634        let agg = f.aggregate_by_phase(metric);
3635        // Gauge(Last) returns the last finite sample.
3636        assert_eq!(agg[&crate::assert::Phase::step(0)], 6.0);
3637    }
3638
3639    #[test]
3640    fn series_field_aggregate_by_phase_skips_phases_with_no_finite_samples() {
3641        let metric = metric_by_name("worst_spread");
3642        let f = SeriesField::<f64>::from_parts_with_phases(
3643            "x",
3644            vec!["t0".into(), "t1".into()],
3645            vec![100, 200],
3646            vec![
3647                Err(crate::scenario::snapshot::SnapshotError::MissingStats {
3648                    tag: "t0".into(),
3649                    reason: crate::scenario::snapshot::MissingStatsReason::NoSchedulerBinary,
3650                }),
3651                Ok(5.0),
3652            ],
3653            vec![
3654                Some(crate::assert::Phase::step(0)),
3655                Some(crate::assert::Phase::step(1)),
3656            ],
3657        );
3658        let agg = f.aggregate_by_phase(metric);
3659        assert!(
3660            !agg.contains_key(&crate::assert::Phase::step(0)),
3661            "phase with all-Err samples absent",
3662        );
3663        assert_eq!(agg[&crate::assert::Phase::step(1)], 5.0);
3664    }
3665
3666    // ---------- SeriesField phase column ----------
3667
3668    #[test]
3669    fn series_field_from_parts_defaults_phases_to_all_none() {
3670        let f = SeriesField::<f64>::from_parts(
3671            "x",
3672            vec!["t0".into(), "t1".into()],
3673            vec![100, 200],
3674            vec![Ok(1.0), Ok(2.0)],
3675        );
3676        let phases: Vec<_> = f.phases_iter().collect();
3677        assert_eq!(phases, vec![None, None]);
3678    }
3679
3680    #[test]
3681    fn series_field_from_parts_with_phases_preserves_per_sample_phase() {
3682        let f = SeriesField::<f64>::from_parts_with_phases(
3683            "x",
3684            vec!["t0".into(), "t1".into(), "t2".into()],
3685            vec![100, 200, 300],
3686            vec![Ok(1.0), Ok(2.0), Ok(3.0)],
3687            vec![
3688                Some(crate::assert::Phase::BASELINE),
3689                Some(crate::assert::Phase::step(0)),
3690                Some(crate::assert::Phase::step(1)),
3691            ],
3692        );
3693        let phases: Vec<_> = f.phases_iter().collect();
3694        assert_eq!(
3695            phases,
3696            vec![
3697                Some(crate::assert::Phase::BASELINE),
3698                Some(crate::assert::Phase::step(0)),
3699                Some(crate::assert::Phase::step(1)),
3700            ],
3701        );
3702    }
3703
3704    #[test]
3705    #[should_panic(expected = "assertion `left == right` failed")]
3706    fn series_field_from_parts_with_phases_rejects_length_mismatch() {
3707        let _ = SeriesField::<f64>::from_parts_with_phases(
3708            "x",
3709            vec!["t0".into(), "t1".into()],
3710            vec![100, 200],
3711            vec![Ok(1.0), Ok(2.0)],
3712            vec![Some(crate::assert::Phase::BASELINE)], // 1 != 2 values
3713        );
3714    }
3715
3716    #[test]
3717    fn series_field_by_phase_partitions_into_per_phase_buckets() {
3718        let f = SeriesField::<f64>::from_parts_with_phases(
3719            "x",
3720            vec!["t0".into(), "t1".into(), "t2".into(), "t3".into()],
3721            vec![100, 200, 300, 400],
3722            vec![Ok(10.0), Ok(20.0), Ok(30.0), Ok(40.0)],
3723            vec![
3724                Some(crate::assert::Phase::BASELINE),
3725                Some(crate::assert::Phase::step(0)),
3726                Some(crate::assert::Phase::step(0)),
3727                Some(crate::assert::Phase::step(1)),
3728            ],
3729        );
3730        let (by_phase, none_bucket) = f.by_phase();
3731        assert!(
3732            none_bucket.is_empty(),
3733            "no None-phase samples in this fixture"
3734        );
3735        assert_eq!(
3736            by_phase.len(),
3737            3,
3738            "3 distinct phases: BASELINE, Step[0], Step[1]"
3739        );
3740        assert_eq!(by_phase[&crate::assert::Phase::BASELINE].len(), 1);
3741        assert_eq!(by_phase[&crate::assert::Phase::step(0)].len(), 2);
3742        assert_eq!(by_phase[&crate::assert::Phase::step(1)].len(), 1);
3743    }
3744
3745    #[test]
3746    fn series_field_by_phase_collects_none_samples_in_separate_bucket() {
3747        let f = SeriesField::<f64>::from_parts_with_phases(
3748            "x",
3749            vec!["t0".into(), "t1".into()],
3750            vec![100, 200],
3751            vec![Ok(1.0), Ok(2.0)],
3752            vec![None, Some(crate::assert::Phase::step(0))],
3753        );
3754        let (by_phase, none_bucket) = f.by_phase();
3755        assert_eq!(none_bucket.len(), 1, "1 None-phase sample");
3756        assert_eq!(by_phase.len(), 1, "1 phase bucket");
3757        assert_eq!(by_phase[&crate::assert::Phase::step(0)].len(), 1);
3758    }
3759
3760    // ---------- phase() / value_at_phase() / last_per_phase() / ratio_across_phases() ----------
3761
3762    #[test]
3763    fn phase_returns_only_samples_in_named_phase() {
3764        let f = SeriesField::<u64>::from_parts_with_phases(
3765            "ticks",
3766            vec!["t0".into(), "t1".into(), "t2".into(), "t3".into()],
3767            vec![100, 200, 300, 400],
3768            vec![Ok(1), Ok(2), Ok(3), Ok(4)],
3769            vec![
3770                Some(crate::assert::Phase::BASELINE),
3771                Some(crate::assert::Phase::step(0)),
3772                Some(crate::assert::Phase::step(0)),
3773                Some(crate::assert::Phase::step(1)),
3774            ],
3775        );
3776        let step0: Vec<_> = f
3777            .phase(crate::assert::Phase::step(0))
3778            .into_iter()
3779            .map(|(_, _, v)| v.as_ref().copied().ok())
3780            .collect();
3781        assert_eq!(step0, vec![Some(2), Some(3)]);
3782        let step2 = f.phase(crate::assert::Phase::step(2));
3783        assert!(
3784            step2.is_empty(),
3785            "phase with no samples must return empty Vec, got {step2:?}",
3786        );
3787    }
3788
3789    #[test]
3790    fn value_at_phase_returns_last_ok_for_phase() {
3791        let f = SeriesField::<u64>::from_parts_with_phases(
3792            "ticks",
3793            vec!["t0".into(), "t1".into(), "t2".into()],
3794            vec![100, 200, 300],
3795            vec![Ok(10), Ok(20), Ok(30)],
3796            vec![
3797                Some(crate::assert::Phase::step(0)),
3798                Some(crate::assert::Phase::step(0)),
3799                Some(crate::assert::Phase::step(1)),
3800            ],
3801        );
3802        assert_eq!(
3803            f.value_at_phase(crate::assert::Phase::step(0)),
3804            Some(20),
3805            "value_at_phase returns the LAST Ok-sample for the phase",
3806        );
3807        assert_eq!(f.value_at_phase(crate::assert::Phase::step(1)), Some(30),);
3808        assert_eq!(
3809            f.value_at_phase(crate::assert::Phase::step(2)),
3810            None,
3811            "phase with no samples returns None",
3812        );
3813    }
3814
3815    #[test]
3816    fn value_at_phase_skips_err_samples_within_phase() {
3817        let f = SeriesField::<u64>::from_parts_with_phases(
3818            "ticks",
3819            vec!["t0".into(), "t1".into()],
3820            vec![100, 200],
3821            vec![
3822                Ok(7),
3823                Err(SnapshotError::VarNotFound {
3824                    requested: "x".into(),
3825                    available: vec![],
3826                }),
3827            ],
3828            vec![
3829                Some(crate::assert::Phase::step(0)),
3830                Some(crate::assert::Phase::step(0)),
3831            ],
3832        );
3833        assert_eq!(
3834            f.value_at_phase(crate::assert::Phase::step(0)),
3835            Some(7),
3836            "Err in same phase must be skipped; last Ok wins",
3837        );
3838    }
3839
3840    #[test]
3841    fn last_per_phase_returns_last_ok_per_present_phase() {
3842        let f = SeriesField::<u64>::from_parts_with_phases(
3843            "ticks",
3844            vec!["t0".into(), "t1".into(), "t2".into(), "t3".into()],
3845            vec![100, 200, 300, 400],
3846            vec![Ok(1), Ok(2), Ok(3), Ok(4)],
3847            vec![
3848                Some(crate::assert::Phase::BASELINE),
3849                Some(crate::assert::Phase::step(0)),
3850                Some(crate::assert::Phase::step(0)),
3851                Some(crate::assert::Phase::step(1)),
3852            ],
3853        );
3854        let m = f.last_per_phase();
3855        assert_eq!(m.len(), 3, "BASELINE + Step[0] + Step[1]");
3856        assert_eq!(m[&crate::assert::Phase::BASELINE], 1);
3857        assert_eq!(m[&crate::assert::Phase::step(0)], 3);
3858        assert_eq!(m[&crate::assert::Phase::step(1)], 4);
3859    }
3860
3861    #[test]
3862    fn last_per_phase_omits_phases_with_only_err_samples() {
3863        let f = SeriesField::<u64>::from_parts_with_phases(
3864            "ticks",
3865            vec!["t0".into(), "t1".into()],
3866            vec![100, 200],
3867            vec![
3868                Err(SnapshotError::VarNotFound {
3869                    requested: "x".into(),
3870                    available: vec![],
3871                }),
3872                Ok(9),
3873            ],
3874            vec![
3875                Some(crate::assert::Phase::step(0)),
3876                Some(crate::assert::Phase::step(1)),
3877            ],
3878        );
3879        let m = f.last_per_phase();
3880        assert!(
3881            !m.contains_key(&crate::assert::Phase::step(0)),
3882            "all-Err phase omitted from last_per_phase, got keys {:?}",
3883            m.keys().collect::<Vec<_>>(),
3884        );
3885        assert_eq!(m[&crate::assert::Phase::step(1)], 9);
3886    }
3887
3888    #[test]
3889    fn first_per_phase_returns_first_ok_per_present_phase() {
3890        let f = SeriesField::<u64>::from_parts_with_phases(
3891            "ticks",
3892            vec!["t0".into(), "t1".into(), "t2".into(), "t3".into()],
3893            vec![100, 200, 300, 400],
3894            vec![Ok(10), Ok(20), Ok(30), Ok(40)],
3895            vec![
3896                Some(crate::assert::Phase::BASELINE),
3897                Some(crate::assert::Phase::step(0)),
3898                Some(crate::assert::Phase::step(0)),
3899                Some(crate::assert::Phase::step(1)),
3900            ],
3901        );
3902        let m = f.first_per_phase();
3903        assert_eq!(m.len(), 3);
3904        assert_eq!(m[&crate::assert::Phase::BASELINE], 10);
3905        assert_eq!(
3906            m[&crate::assert::Phase::step(0)],
3907            20,
3908            "first Ok in Step[0] is t1=20, NOT t2=30",
3909        );
3910        assert_eq!(m[&crate::assert::Phase::step(1)], 40);
3911    }
3912
3913    #[test]
3914    fn first_per_phase_skips_leading_err_samples_within_phase() {
3915        let f = SeriesField::<u64>::from_parts_with_phases(
3916            "ticks",
3917            vec!["t0".into(), "t1".into()],
3918            vec![100, 200],
3919            vec![
3920                Err(SnapshotError::VarNotFound {
3921                    requested: "x".into(),
3922                    available: vec![],
3923                }),
3924                Ok(7),
3925            ],
3926            vec![
3927                Some(crate::assert::Phase::step(0)),
3928                Some(crate::assert::Phase::step(0)),
3929            ],
3930        );
3931        assert_eq!(
3932            f.first_per_phase()[&crate::assert::Phase::step(0)],
3933            7,
3934            "leading Err in phase must be skipped; first Ok wins",
3935        );
3936    }
3937
3938    #[test]
3939    fn counter_delta_per_phase_regressed_phase_reports_zero_instead_of_panicking() {
3940        // Picker drift (or upstream-signal rollback) within a phase
3941        // produces `last < first`. The reducer must not panic; it
3942        // reports zero progress for that phase and emits a tracing
3943        // warn (verified separately via test_log subscribers — here we
3944        // pin the no-panic + zero contract).
3945        let f = SeriesField::<u64>::from_parts_with_phases(
3946            "ticks",
3947            vec!["t0".into(), "t1".into(), "t2".into(), "t3".into()],
3948            vec![100, 200, 300, 400],
3949            // Phase 0: monotonic 100 -> 150 (delta 50).
3950            // Phase 1: regressed 1000 -> 200 (would panic in debug
3951            // pre-fix, wrap in release).
3952            vec![Ok(100), Ok(150), Ok(1000), Ok(200)],
3953            vec![
3954                Some(crate::assert::Phase::step(0)),
3955                Some(crate::assert::Phase::step(0)),
3956                Some(crate::assert::Phase::step(1)),
3957                Some(crate::assert::Phase::step(1)),
3958            ],
3959        );
3960        let m = f.counter_delta_per_phase();
3961        assert_eq!(m.len(), 2);
3962        assert_eq!(m[&crate::assert::Phase::step(0)], 50);
3963        assert_eq!(
3964            m[&crate::assert::Phase::step(1)],
3965            0,
3966            "regressed phase must yield 0 (no progress measurable) \
3967             rather than panicking on the underflowed subtraction",
3968        );
3969    }
3970
3971    #[test]
3972    fn counter_delta_per_phase_subtracts_first_from_last() {
3973        let f = SeriesField::<u64>::from_parts_with_phases(
3974            "ticks",
3975            vec!["t0".into(), "t1".into(), "t2".into(), "t3".into()],
3976            vec![100, 200, 300, 400],
3977            vec![Ok(100), Ok(150), Ok(180), Ok(200)],
3978            vec![
3979                Some(crate::assert::Phase::step(0)),
3980                Some(crate::assert::Phase::step(0)),
3981                Some(crate::assert::Phase::step(1)),
3982                Some(crate::assert::Phase::step(1)),
3983            ],
3984        );
3985        let m = f.counter_delta_per_phase();
3986        assert_eq!(m.len(), 2);
3987        assert_eq!(
3988            m[&crate::assert::Phase::step(0)],
3989            50,
3990            "Step[0]: last(150) - first(100) = 50",
3991        );
3992        assert_eq!(
3993            m[&crate::assert::Phase::step(1)],
3994            20,
3995            "Step[1]: last(200) - first(180) = 20 (NOT 200 - 100 = 100; \
3996             prior-phase accumulation excluded)",
3997        );
3998    }
3999
4000    #[test]
4001    fn counter_delta_per_phase_single_sample_phase_yields_zero() {
4002        let f = SeriesField::<u64>::from_parts_with_phases(
4003            "ticks",
4004            vec!["t0".into()],
4005            vec![100],
4006            vec![Ok(42)],
4007            vec![Some(crate::assert::Phase::step(0))],
4008        );
4009        let m = f.counter_delta_per_phase();
4010        assert_eq!(
4011            m[&crate::assert::Phase::step(0)],
4012            0,
4013            "single Ok sample: first == last → delta of zero",
4014        );
4015    }
4016
4017    #[test]
4018    fn counter_delta_per_phase_omits_phases_with_only_err_samples() {
4019        let f = SeriesField::<u64>::from_parts_with_phases(
4020            "ticks",
4021            vec!["t0".into(), "t1".into()],
4022            vec![100, 200],
4023            vec![
4024                Err(SnapshotError::VarNotFound {
4025                    requested: "x".into(),
4026                    available: vec![],
4027                }),
4028                Ok(5),
4029            ],
4030            vec![
4031                Some(crate::assert::Phase::step(0)),
4032                Some(crate::assert::Phase::step(1)),
4033            ],
4034        );
4035        let m = f.counter_delta_per_phase();
4036        assert!(
4037            !m.contains_key(&crate::assert::Phase::step(0)),
4038            "all-Err phase omitted from counter_delta_per_phase",
4039        );
4040        assert_eq!(
4041            m[&crate::assert::Phase::step(1)],
4042            0,
4043            "single-Ok phase: first(5) == last(5) → delta of zero",
4044        );
4045    }
4046
4047    #[test]
4048    fn counter_delta_per_phase_composes_with_a_b_ratio() {
4049        // Real-world composition exercise: two cumulative counters
4050        // (same / cross) projected separately, folded per-phase
4051        // into a fraction, then compared across phases. Pins the
4052        // load-bearing usage pattern that motivated the reducer.
4053        let tags: Vec<String> = (0..4).map(|i| format!("p{i}")).collect();
4054        let elapsed = vec![100, 200, 300, 400];
4055        let phases = vec![
4056            Some(crate::assert::Phase::step(0)),
4057            Some(crate::assert::Phase::step(0)),
4058            Some(crate::assert::Phase::step(1)),
4059            Some(crate::assert::Phase::step(1)),
4060        ];
4061        // same: 1000 → 1200 in step(0), 1200 → 1800 in step(1)
4062        let same = SeriesField::<u64>::from_parts_with_phases(
4063            "same",
4064            tags.clone(),
4065            elapsed.clone(),
4066            vec![Ok(1000), Ok(1200), Ok(1200), Ok(1800)],
4067            phases.clone(),
4068        );
4069        // cross: 100 → 200 in step(0), 200 → 300 in step(1)
4070        let cross = SeriesField::<u64>::from_parts_with_phases(
4071            "cross",
4072            tags,
4073            elapsed,
4074            vec![Ok(100), Ok(200), Ok(200), Ok(300)],
4075            phases,
4076        );
4077        let same_d = same.counter_delta_per_phase();
4078        let cross_d = cross.counter_delta_per_phase();
4079        let cross_frac = |p: crate::assert::Phase| -> f64 {
4080            let s = same_d[&p] as f64;
4081            let c = cross_d[&p] as f64;
4082            c / (s + c)
4083        };
4084        // Step[0]: 100 / (200 + 100) = 0.333...
4085        // Step[1]: 100 / (600 + 100) = 0.143
4086        let f0 = cross_frac(crate::assert::Phase::step(0));
4087        let f1 = cross_frac(crate::assert::Phase::step(1));
4088        assert!((f0 - 0.333333).abs() < 1e-4, "Step[0] cross_frac = {f0}");
4089        assert!((f1 - 0.142857).abs() < 1e-4, "Step[1] cross_frac = {f1}");
4090        let ratio = f1 / f0;
4091        assert!(
4092            ratio < 0.5,
4093            "phase-delta cross_frac ratio {ratio} should be well below 0.5 — \
4094             prior-phase accumulation would have inflated phase 1's reading",
4095        );
4096    }
4097
4098    #[test]
4099    fn ratio_across_phases_pass_records_info_note() {
4100        // Step[0] = 100, Step[1] = 50 → ratio 0.5, ceiling 0.85 ⇒ pass.
4101        let f = SeriesField::<f64>::from_parts_with_phases(
4102            "dispatches",
4103            vec!["t0".into(), "t1".into()],
4104            vec![100, 200],
4105            vec![Ok(100.0), Ok(50.0)],
4106            vec![
4107                Some(crate::assert::Phase::step(0)),
4108                Some(crate::assert::Phase::step(1)),
4109            ],
4110        );
4111        let mut v = Verdict::new();
4112        f.ratio_across_phases(
4113            &mut v,
4114            crate::assert::Phase::step(0),
4115            crate::assert::Phase::step(1),
4116        )
4117        .at_most(0.85);
4118        let r = v.into_result();
4119        assert!(
4120            r.is_pass(),
4121            "expected pass, got outcomes={:?} details={:?}",
4122            r.outcomes,
4123            r.failure_details().collect::<Vec<_>>(),
4124        );
4125        assert!(
4126            r.info_notes.iter().any(|n| n.message.contains("dispatches")
4127                && n.message.contains("50/100")
4128                && n.message.contains("0.5000")
4129                && n.message.contains("ceiling 0.8500")),
4130            "expected pass info note carrying ratio + ceiling, got {:?}",
4131            r.info_notes,
4132        );
4133    }
4134
4135    /// A non-finite phase value must not let `ratio_across_phases().at_most()`
4136    /// silently pass: a NaN `later` makes later/earlier = NaN, the
4137    /// `earlier == 0.0` guard misses it (NaN != 0.0), and raw
4138    /// `ratio > ceiling` is false for NaN — so without the non-finite
4139    /// guard the pair would PASS. Same class as the ratio_within /
4140    /// steady_within NaN fixes.
4141    #[test]
4142    fn ratio_across_phases_non_finite_does_not_silently_pass() {
4143        let f = SeriesField::<f64>::from_parts_with_phases(
4144            "dispatches",
4145            vec!["t0".into(), "t1".into()],
4146            vec![100, 200],
4147            vec![Ok(100.0f64), Ok(f64::NAN)],
4148            vec![
4149                Some(crate::assert::Phase::step(0)),
4150                Some(crate::assert::Phase::step(1)),
4151            ],
4152        );
4153        let mut v = Verdict::new();
4154        f.ratio_across_phases(
4155            &mut v,
4156            crate::assert::Phase::step(0),
4157            crate::assert::Phase::step(1),
4158        )
4159        .at_most(0.85);
4160        let r = v.into_result();
4161        assert!(
4162            !r.is_pass(),
4163            "a non-finite phase ratio must not silently pass: outcomes={:?}",
4164            r.outcomes,
4165        );
4166        assert!(
4167            r.failure_details()
4168                .any(|d| d.message.contains("non-finite")),
4169            "expected a non-finite failure detail: {:?}",
4170            r.failure_details().collect::<Vec<_>>(),
4171        );
4172    }
4173
4174    #[test]
4175    fn ratio_across_phases_failure_records_detail_with_ratio() {
4176        // Step[0] = 10, Step[1] = 20 → ratio 2.0, ceiling 0.85 ⇒ fail.
4177        let f = SeriesField::<f64>::from_parts_with_phases(
4178            "dispatches",
4179            vec!["t0".into(), "t1".into()],
4180            vec![100, 200],
4181            vec![Ok(10.0), Ok(20.0)],
4182            vec![
4183                Some(crate::assert::Phase::step(0)),
4184                Some(crate::assert::Phase::step(1)),
4185            ],
4186        );
4187        let mut v = Verdict::new();
4188        f.ratio_across_phases(
4189            &mut v,
4190            crate::assert::Phase::step(0),
4191            crate::assert::Phase::step(1),
4192        )
4193        .at_most(0.85);
4194        let r = v.into_result();
4195        assert!(r.is_fail(), "expected fail, got outcomes={:?}", r.outcomes);
4196        assert!(
4197            r.failure_details().any(|d| d.kind == DetailKind::Temporal
4198                && d.message.contains("dispatches")
4199                && d.message.contains("20/10")
4200                && d.message.contains("2.0000")
4201                && d.message.contains("ceiling 0.8500")),
4202            "expected fail detail carrying ratio + ceiling, got {:?}",
4203            r.failure_details().collect::<Vec<_>>(),
4204        );
4205    }
4206
4207    #[test]
4208    fn ratio_across_phases_missing_phase_is_inconclusive_with_clear_detail() {
4209        // Step[1] has no samples → Inconclusive with "needs both
4210        // phases" (the ratio cannot be computed; neither pass nor
4211        // fail is truthful per the INSTRUMENT-derived zero-signal
4212        // contract).
4213        let f = SeriesField::<f64>::from_parts_with_phases(
4214            "dispatches",
4215            vec!["t0".into()],
4216            vec![100],
4217            vec![Ok(10.0)],
4218            vec![Some(crate::assert::Phase::step(0))],
4219        );
4220        let mut v = Verdict::new();
4221        f.ratio_across_phases(
4222            &mut v,
4223            crate::assert::Phase::step(0),
4224            crate::assert::Phase::step(1),
4225        )
4226        .at_most(0.85);
4227        let r = v.into_result();
4228        assert!(
4229            r.is_inconclusive(),
4230            "expected Inconclusive, got {:?}",
4231            r.outcomes
4232        );
4233        assert!(
4234            r.inconclusive_details()
4235                .any(|d| d.message.contains("needs both phases")
4236                    && d.message.contains("later=<no-samples>")),
4237            "expected `needs both phases` Inconclusive reason naming the missing side, got {:?}",
4238            r.inconclusive_details().collect::<Vec<_>>(),
4239        );
4240    }
4241
4242    #[test]
4243    fn ratio_across_phases_zero_baseline_is_inconclusive_with_clear_detail() {
4244        // earlier=0 → Inconclusive with "earlier value is 0".
4245        // earlier_f == 0 means the baseline measured zero; the
4246        // ratio later/earlier is undefined (INSTRUMENT-derived
4247        // zero denominator). This was previously Fail; now
4248        // the gate cannot evaluate so the verdict is neither
4249        // pass nor fail.
4250        let f = SeriesField::<f64>::from_parts_with_phases(
4251            "dispatches",
4252            vec!["t0".into(), "t1".into()],
4253            vec![100, 200],
4254            vec![Ok(0.0), Ok(5.0)],
4255            vec![
4256                Some(crate::assert::Phase::step(0)),
4257                Some(crate::assert::Phase::step(1)),
4258            ],
4259        );
4260        let mut v = Verdict::new();
4261        f.ratio_across_phases(
4262            &mut v,
4263            crate::assert::Phase::step(0),
4264            crate::assert::Phase::step(1),
4265        )
4266        .at_most(0.85);
4267        let r = v.into_result();
4268        assert!(
4269            r.is_inconclusive(),
4270            "expected Inconclusive, got {:?}",
4271            r.outcomes
4272        );
4273        assert!(
4274            r.inconclusive_details()
4275                .any(|d| d.message.contains("earlier value is 0")
4276                    && d.message.contains("no baseline")),
4277            "expected `earlier value is 0` Inconclusive reason, got {:?}",
4278            r.inconclusive_details().collect::<Vec<_>>(),
4279        );
4280    }
4281
4282    // ---------- PhaseMapExt::ratio_across_phases (BTreeMap<Phase, T> entry) ----------
4283
4284    #[test]
4285    fn phasemap_ratio_across_phases_pass_records_info_note() {
4286        let mut m: std::collections::BTreeMap<crate::assert::Phase, f64> =
4287            std::collections::BTreeMap::new();
4288        m.insert(crate::assert::Phase::step(0), 10.0);
4289        m.insert(crate::assert::Phase::step(1), 5.0);
4290        let mut v = Verdict::new();
4291        m.ratio_across_phases(
4292            &mut v,
4293            "cross_frac",
4294            crate::assert::Phase::step(0),
4295            crate::assert::Phase::step(1),
4296        )
4297        .at_most(0.85);
4298        let r = v.into_result();
4299        assert!(r.is_pass(), "expected pass, got {:?}", r.outcomes);
4300        assert!(
4301            r.info_notes.iter().any(|n| n.message.contains("cross_frac")
4302                && n.message.contains("5/10")
4303                && n.message.contains("0.5000")
4304                && n.message.contains("ceiling 0.8500")),
4305            "expected pass info note with caller-supplied label, got {:?}",
4306            r.info_notes,
4307        );
4308    }
4309
4310    #[test]
4311    fn phasemap_ratio_across_phases_failure_records_detail_with_ratio() {
4312        let mut m: std::collections::BTreeMap<crate::assert::Phase, f64> =
4313            std::collections::BTreeMap::new();
4314        m.insert(crate::assert::Phase::step(0), 10.0);
4315        m.insert(crate::assert::Phase::step(1), 20.0);
4316        let mut v = Verdict::new();
4317        m.ratio_across_phases(
4318            &mut v,
4319            "cross_frac",
4320            crate::assert::Phase::step(0),
4321            crate::assert::Phase::step(1),
4322        )
4323        .at_most(0.85);
4324        let r = v.into_result();
4325        assert!(r.is_fail());
4326        assert!(
4327            r.failure_details().any(|d| d.kind == DetailKind::Temporal
4328                && d.message.contains("cross_frac")
4329                && d.message.contains("20/10")
4330                && d.message.contains("2.0000")
4331                && d.message.contains("ceiling 0.8500")),
4332            "expected fail detail with caller-supplied label + ratio, got {:?}",
4333            r.failure_details().collect::<Vec<_>>(),
4334        );
4335    }
4336
4337    #[test]
4338    fn phasemap_ratio_across_phases_missing_phase_is_inconclusive_with_clear_detail() {
4339        let mut m: std::collections::BTreeMap<crate::assert::Phase, f64> =
4340            std::collections::BTreeMap::new();
4341        m.insert(crate::assert::Phase::step(0), 10.0);
4342        // Phase 1 absent
4343        let mut v = Verdict::new();
4344        m.ratio_across_phases(
4345            &mut v,
4346            "cross_frac",
4347            crate::assert::Phase::step(0),
4348            crate::assert::Phase::step(1),
4349        )
4350        .at_most(0.85);
4351        let r = v.into_result();
4352        assert!(
4353            r.is_inconclusive(),
4354            "expected Inconclusive, got {:?}",
4355            r.outcomes
4356        );
4357        assert!(
4358            r.inconclusive_details()
4359                .any(|d| d.message.contains("needs both phases")
4360                    && d.message.contains("later=<no-samples>")),
4361            "expected needs-both-phases Inconclusive reason, got {:?}",
4362            r.inconclusive_details().collect::<Vec<_>>(),
4363        );
4364    }
4365
4366    #[test]
4367    fn phasemap_ratio_across_phases_zero_baseline_is_inconclusive_with_clear_detail() {
4368        let mut m: std::collections::BTreeMap<crate::assert::Phase, f64> =
4369            std::collections::BTreeMap::new();
4370        m.insert(crate::assert::Phase::step(0), 0.0);
4371        m.insert(crate::assert::Phase::step(1), 5.0);
4372        let mut v = Verdict::new();
4373        m.ratio_across_phases(
4374            &mut v,
4375            "cross_frac",
4376            crate::assert::Phase::step(0),
4377            crate::assert::Phase::step(1),
4378        )
4379        .at_most(0.85);
4380        let r = v.into_result();
4381        assert!(
4382            r.is_inconclusive(),
4383            "expected Inconclusive, got {:?}",
4384            r.outcomes
4385        );
4386        assert!(
4387            r.inconclusive_details()
4388                .any(|d| d.message.contains("earlier value is 0")
4389                    && d.message.contains("no baseline")),
4390            "expected zero-baseline Inconclusive reason, got {:?}",
4391            r.inconclusive_details().collect::<Vec<_>>(),
4392        );
4393    }
4394
4395    #[test]
4396    fn phasemap_ratio_across_phases_disjoint_phase_keys_is_inconclusive_cleanly() {
4397        // BTreeMap is non-empty but neither queried phase exists.
4398        // Both sides yield <no-samples> → Inconclusive (neither
4399        // pass nor fail can be evaluated when both inputs are
4400        // missing).
4401        let mut m: std::collections::BTreeMap<crate::assert::Phase, f64> =
4402            std::collections::BTreeMap::new();
4403        m.insert(crate::assert::Phase::BASELINE, 7.0);
4404        m.insert(crate::assert::Phase::step(5), 8.0);
4405        let mut v = Verdict::new();
4406        m.ratio_across_phases(
4407            &mut v,
4408            "cross_frac",
4409            crate::assert::Phase::step(0),
4410            crate::assert::Phase::step(1),
4411        )
4412        .at_most(0.85);
4413        let r = v.into_result();
4414        assert!(
4415            r.is_inconclusive(),
4416            "expected Inconclusive, got {:?}",
4417            r.outcomes
4418        );
4419        assert!(
4420            r.inconclusive_details()
4421                .any(|d| d.message.contains("needs both phases")
4422                    && d.message.contains("earlier=<no-samples>")
4423                    && d.message.contains("later=<no-samples>")),
4424            "both phases absent must surface in Inconclusive reason, got {:?}",
4425            r.inconclusive_details().collect::<Vec<_>>(),
4426        );
4427    }
4428
4429    #[test]
4430    fn cross_phase_ratio_empty_label_omits_label_prefix() {
4431        // Regression pin: with label="", the failure detail must NOT
4432        // have a leading ": " from a stale "{label}: " concatenation.
4433        let mut m: std::collections::BTreeMap<crate::assert::Phase, f64> =
4434            std::collections::BTreeMap::new();
4435        m.insert(crate::assert::Phase::step(0), 10.0);
4436        m.insert(crate::assert::Phase::step(1), 20.0);
4437        let mut v = Verdict::new();
4438        m.ratio_across_phases(
4439            &mut v,
4440            "",
4441            crate::assert::Phase::step(0),
4442            crate::assert::Phase::step(1),
4443        )
4444        .at_most(0.85);
4445        let r = v.into_result();
4446        let first = r
4447            .failure_details()
4448            .next()
4449            .expect("empty label still produces a detail when comparator fails");
4450        assert!(
4451            first.message.starts_with("ratio_across_phases("),
4452            "empty label must omit leading prefix; got {:?}",
4453            first.message,
4454        );
4455    }
4456
4457    // ---------- PhaseMapExt::zip_per_phase ----------
4458
4459    #[test]
4460    fn zip_per_phase_intersects_phase_keys() {
4461        let mut a: std::collections::BTreeMap<crate::assert::Phase, u64> =
4462            std::collections::BTreeMap::new();
4463        a.insert(crate::assert::Phase::step(0), 10);
4464        a.insert(crate::assert::Phase::step(1), 20);
4465        a.insert(crate::assert::Phase::step(2), 30);
4466        let mut b: std::collections::BTreeMap<crate::assert::Phase, u64> =
4467            std::collections::BTreeMap::new();
4468        b.insert(crate::assert::Phase::step(1), 100);
4469        b.insert(crate::assert::Phase::step(2), 200);
4470        b.insert(crate::assert::Phase::step(3), 300);
4471        let z = a.zip_per_phase(&b, |s, t| s + t);
4472        assert_eq!(z.len(), 2);
4473        assert_eq!(z[&crate::assert::Phase::step(1)], 120);
4474        assert_eq!(z[&crate::assert::Phase::step(2)], 230);
4475        assert!(!z.contains_key(&crate::assert::Phase::step(0)));
4476        assert!(!z.contains_key(&crate::assert::Phase::step(3)));
4477    }
4478
4479    #[test]
4480    fn zip_per_phase_empty_intersection_yields_empty() {
4481        let mut a: std::collections::BTreeMap<crate::assert::Phase, u64> =
4482            std::collections::BTreeMap::new();
4483        a.insert(crate::assert::Phase::step(0), 1);
4484        let mut b: std::collections::BTreeMap<crate::assert::Phase, u64> =
4485            std::collections::BTreeMap::new();
4486        b.insert(crate::assert::Phase::step(1), 2);
4487        let z = a.zip_per_phase(&b, |s, t| s + t);
4488        assert!(z.is_empty());
4489    }
4490
4491    #[test]
4492    fn zip_per_phase_both_empty_yields_empty() {
4493        let a: std::collections::BTreeMap<crate::assert::Phase, u64> =
4494            std::collections::BTreeMap::new();
4495        let b: std::collections::BTreeMap<crate::assert::Phase, u64> =
4496            std::collections::BTreeMap::new();
4497        let z = a.zip_per_phase(&b, |s, t| s + t);
4498        assert!(z.is_empty());
4499    }
4500
4501    #[test]
4502    fn zip_per_phase_heterogeneous_t_u_types() {
4503        // Pins T and U can differ — trait isn't accidentally T=U-bound.
4504        let mut a: std::collections::BTreeMap<crate::assert::Phase, u64> =
4505            std::collections::BTreeMap::new();
4506        a.insert(crate::assert::Phase::step(0), 100);
4507        a.insert(crate::assert::Phase::step(1), 200);
4508        let mut b: std::collections::BTreeMap<crate::assert::Phase, f64> =
4509            std::collections::BTreeMap::new();
4510        b.insert(crate::assert::Phase::step(0), 0.5);
4511        b.insert(crate::assert::Phase::step(1), 2.0);
4512        let z = a.zip_per_phase(&b, |s, t| s as f64 * t);
4513        assert_eq!(z[&crate::assert::Phase::step(0)], 50.0);
4514        assert_eq!(z[&crate::assert::Phase::step(1)], 400.0);
4515    }
4516
4517    #[test]
4518    fn zip_per_phase_takes_values_by_value_no_deref_noise() {
4519        // The composition body operates on owned T/U directly.
4520        // No `*s` / `*c` syntax — pins the bound is `T: Copy + U: Copy`
4521        // by-value, not by-reference.
4522        let mut a: std::collections::BTreeMap<crate::assert::Phase, u64> =
4523            std::collections::BTreeMap::new();
4524        a.insert(crate::assert::Phase::step(0), 1000);
4525        a.insert(crate::assert::Phase::step(1), 1200);
4526        let mut b: std::collections::BTreeMap<crate::assert::Phase, u64> =
4527            std::collections::BTreeMap::new();
4528        b.insert(crate::assert::Phase::step(0), 100);
4529        b.insert(crate::assert::Phase::step(1), 200);
4530        let frac = a.zip_per_phase(&b, |s, c| {
4531            let total = (s + c) as f64;
4532            if total == 0.0 { 0.0 } else { c as f64 / total }
4533        });
4534        assert!((frac[&crate::assert::Phase::step(0)] - (100.0 / 1100.0)).abs() < 1e-9);
4535        assert!((frac[&crate::assert::Phase::step(1)] - (200.0 / 1400.0)).abs() < 1e-9);
4536    }
4537
4538    #[test]
4539    fn zip_then_ratio_across_phases_composes_a_b_test() {
4540        // End-to-end composition the scx_mitosis test reaches for:
4541        // two counter-delta maps → zip into cross_frac → ratio across
4542        // phases → verdict mutation.
4543        let mut same_d: std::collections::BTreeMap<crate::assert::Phase, u64> =
4544            std::collections::BTreeMap::new();
4545        same_d.insert(crate::assert::Phase::step(0), 200);
4546        same_d.insert(crate::assert::Phase::step(1), 600);
4547        let mut cross_d: std::collections::BTreeMap<crate::assert::Phase, u64> =
4548            std::collections::BTreeMap::new();
4549        cross_d.insert(crate::assert::Phase::step(0), 100);
4550        cross_d.insert(crate::assert::Phase::step(1), 100);
4551        let frac = same_d.zip_per_phase(&cross_d, |s, c| {
4552            let total = (s + c) as f64;
4553            if total == 0.0 { 0.0 } else { c as f64 / total }
4554        });
4555        let mut v = Verdict::new();
4556        frac.ratio_across_phases(
4557            &mut v,
4558            "cross_frac",
4559            crate::assert::Phase::step(0),
4560            crate::assert::Phase::step(1),
4561        )
4562        .at_most(0.85);
4563        let r = v.into_result();
4564        assert!(
4565            r.is_pass(),
4566            "Step[0] cross_frac = 100/300 ≈ 0.333, Step[1] = 100/700 ≈ 0.143; \
4567             ratio 0.143/0.333 ≈ 0.43 well below 0.85 ceiling. \
4568             Got outcomes={:?}, details={:?}",
4569            r.outcomes,
4570            r.failure_details().collect::<Vec<_>>(),
4571        );
4572        assert!(
4573            r.info_notes
4574                .iter()
4575                .any(|n| n.message.contains("cross_frac")
4576                    && n.message.contains("ratio_across_phases")),
4577            "expected pass info note carrying the composed-metric label, \
4578             got {:?}",
4579            r.info_notes,
4580        );
4581    }
4582
4583    /// `frac_pair` collapses the `n / (n + m)` safe-divide closure
4584    /// that `zip_per_phase` callers spell inline. Two phase maps
4585    /// with overlapping phase keys produce per-phase fractions of
4586    /// `self / (self + other)`.
4587    #[test]
4588    fn frac_pair_computes_share_of_total_per_phase() {
4589        let mut cross: std::collections::BTreeMap<crate::assert::Phase, u64> =
4590            std::collections::BTreeMap::new();
4591        cross.insert(crate::assert::Phase::step(0), 100);
4592        cross.insert(crate::assert::Phase::step(1), 100);
4593        let mut same: std::collections::BTreeMap<crate::assert::Phase, u64> =
4594            std::collections::BTreeMap::new();
4595        same.insert(crate::assert::Phase::step(0), 200);
4596        same.insert(crate::assert::Phase::step(1), 600);
4597        let frac = cross.frac_pair(&same);
4598        assert!(
4599            (frac[&crate::assert::Phase::step(0)] - (100.0 / 300.0)).abs() < 1e-9,
4600            "phase 0: 100/(100+200) = 1/3; got {}",
4601            frac[&crate::assert::Phase::step(0)],
4602        );
4603        assert!(
4604            (frac[&crate::assert::Phase::step(1)] - (100.0 / 700.0)).abs() < 1e-9,
4605            "phase 1: 100/(100+600) = 1/7; got {}",
4606            frac[&crate::assert::Phase::step(1)],
4607        );
4608    }
4609
4610    /// When both inputs sum to zero for a phase, `frac_pair` MUST
4611    /// drop the entry rather than synthesize `0.0`. A synthesized
4612    /// `0.0` would slip past any downstream `at_most(thr > 0)` gate
4613    /// without ever observing the phase pair carries no signal —
4614    /// the silent-pass class of bug `Outcome::Inconclusive` was
4615    /// introduced to prevent. Dropping the entry surfaces the
4616    /// absence so the consumer (typically a `ratio_within` /
4617    /// `at_most` chain over the resulting map) treats the missing
4618    /// phase the same as one only present on a single side —
4619    /// Inconclusive at the comparator boundary, never a silent pass.
4620    #[test]
4621    fn frac_pair_zero_total_drops_entry_no_silent_pass() {
4622        let mut a: std::collections::BTreeMap<crate::assert::Phase, u64> =
4623            std::collections::BTreeMap::new();
4624        a.insert(crate::assert::Phase::step(0), 0);
4625        let mut b: std::collections::BTreeMap<crate::assert::Phase, u64> =
4626            std::collections::BTreeMap::new();
4627        b.insert(crate::assert::Phase::step(0), 0);
4628        let frac = a.frac_pair(&b);
4629        assert!(
4630            !frac.contains_key(&crate::assert::Phase::step(0)),
4631            "zero/(zero+zero) must drop entry, not synthesize 0.0; got {frac:?}",
4632        );
4633        assert!(
4634            frac.is_empty(),
4635            "no phases survived → empty map; got {frac:?}"
4636        );
4637    }
4638
4639    /// One zero side plus a positive other side is NOT a zero-total
4640    /// pair — the total is positive and the fraction is `0/(0+m) =
4641    /// 0.0` (a real measurement of "self has zero share"). The
4642    /// entry MUST be retained because the signal is real even
4643    /// though the value is zero. Pins the boundary between
4644    /// "real-zero" (kept) and "no-signal" (dropped).
4645    #[test]
4646    fn frac_pair_zero_self_positive_other_keeps_real_zero() {
4647        let mut a: std::collections::BTreeMap<crate::assert::Phase, u64> =
4648            std::collections::BTreeMap::new();
4649        a.insert(crate::assert::Phase::step(0), 0);
4650        let mut b: std::collections::BTreeMap<crate::assert::Phase, u64> =
4651            std::collections::BTreeMap::new();
4652        b.insert(crate::assert::Phase::step(0), 100);
4653        let frac = a.frac_pair(&b);
4654        let v = frac[&crate::assert::Phase::step(0)];
4655        assert_eq!(
4656            v, 0.0,
4657            "0/(0+100) is a real-zero measurement, not no-signal; got {v}",
4658        );
4659        assert!(!v.is_nan(), "frac_pair must never produce NaN");
4660    }
4661
4662    /// `frac_pair` MUST saturate on u64 overflow rather than wrap.
4663    /// Two near-`u64::MAX` counter deltas wrapping silently would
4664    /// produce a wrong fraction. With `saturating_add`, the total
4665    /// caps at `u64::MAX` so the fraction is bounded and finite.
4666    #[test]
4667    fn frac_pair_saturates_on_u64_overflow_no_wrap() {
4668        let mut a: std::collections::BTreeMap<crate::assert::Phase, u64> =
4669            std::collections::BTreeMap::new();
4670        a.insert(crate::assert::Phase::step(0), u64::MAX);
4671        let mut b: std::collections::BTreeMap<crate::assert::Phase, u64> =
4672            std::collections::BTreeMap::new();
4673        b.insert(crate::assert::Phase::step(0), 1);
4674        let frac = a.frac_pair(&b);
4675        let v = frac[&crate::assert::Phase::step(0)];
4676        // u64::MAX.saturating_add(1) == u64::MAX. The fraction is
4677        // `u64::MAX as f64 / u64::MAX as f64` — both sides land on
4678        // the same f64 value (lossy cast collapses the bottom
4679        // ~11 bits) so the ratio is exactly 1.0.
4680        assert_eq!(
4681            v, 1.0,
4682            "saturating_add caps total at u64::MAX; both sides cast to same f64 → 1.0; got {v}",
4683        );
4684        assert!(!v.is_nan(), "must never produce NaN even at saturation");
4685        assert!(
4686            v.is_finite(),
4687            "must produce a finite value even at saturation"
4688        );
4689    }
4690
4691    /// Intersection-only semantics: phases present in only one
4692    /// input drop from the result. Mirrors `zip_per_phase`.
4693    #[test]
4694    fn frac_pair_intersects_phase_keys_only() {
4695        let mut a: std::collections::BTreeMap<crate::assert::Phase, u64> =
4696            std::collections::BTreeMap::new();
4697        a.insert(crate::assert::Phase::step(0), 100);
4698        a.insert(crate::assert::Phase::step(1), 200);
4699        let mut b: std::collections::BTreeMap<crate::assert::Phase, u64> =
4700            std::collections::BTreeMap::new();
4701        b.insert(crate::assert::Phase::step(1), 100);
4702        b.insert(crate::assert::Phase::step(2), 50);
4703        let frac = a.frac_pair(&b);
4704        assert!(
4705            !frac.contains_key(&crate::assert::Phase::step(0)),
4706            "phase 0 absent from b — must drop from result",
4707        );
4708        assert!(
4709            !frac.contains_key(&crate::assert::Phase::step(2)),
4710            "phase 2 absent from a — must drop from result",
4711        );
4712        assert_eq!(frac.len(), 1, "only phase 1 is in the intersection");
4713        assert!(
4714            (frac[&crate::assert::Phase::step(1)] - (200.0 / 300.0)).abs() < 1e-9,
4715            "phase 1: 200/(200+100); got {}",
4716            frac[&crate::assert::Phase::step(1)],
4717        );
4718    }
4719}