ktstr/scenario/
mod.rs

1//! Scenario definitions and test execution.
2//!
3//! Most tests use the declarative ops API from the [`ops`] submodule:
4//! - [`ops::CgroupDef`] -- declarative cgroup definition (name + cpuset + workload)
5//! - [`ops::Step`] -- a sequence of ops followed by a hold period
6//! - [`ops::Op`] -- an atomic scenario operation (cgroup/worker topology, payload run/wait/kill, freeze, snapshot, kernel read/write, scheduler attach/detach/restart/replace, BPF map pin)
7//! - [`ops::CpusetSpec`] -- how to compute a cpuset from topology
8//! - [`ops::HoldSpec`] -- how long to hold after a step
9//! - [`backdrop::Backdrop`] -- persistent scenario state shared across every Step
10//! - [`ops::execute_defs`] -- run cgroup definitions for the full duration
11//! - [`ops::execute_steps`] -- run a multi-step sequence
12//! - [`ops::execute_scenario`] -- run a Backdrop + Steps sequence
13//!
14//! Types defined in this module:
15//! - [`Ctx`] -- runtime context passed to scenario functions
16//! - [`CgroupGroup`] -- RAII guard that removes cgroups on drop
17//!
18//! The [`scenarios`] submodule provides curated canned scenarios.
19//!
20//! ## Builder method conventions
21//!
22//! Every builder type in the scenario API (Setup, Step, Backdrop,
23//! WorkloadConfig, …) names its methods by what they do, not by
24//! what they return. The three-prefix vocabulary is uniform across
25//! the scenario surface so a reader can predict semantics from
26//! the prefix alone:
27//!
28//! - **`with_X(arg) -> Self`** — alternate constructor that returns
29//!   a fresh value with `X` already set (e.g.
30//!   [`ops::Step::with_defs`], [`ops::Step::with_payload`],
31//!   [`ops::Setup::with_factory`]). Distinct from `Self::new(...)`
32//!   which is the base ctor; `with_X` constructors compose without
33//!   reaching for `Default::default()` then chaining setters.
34//! - **`set_X(self, value) -> Self`** — field REPLACE on an
35//!   existing builder. Consumes `self`, writes `X`, returns the
36//!   updated value (e.g. [`ops::Step::set_ops`],
37//!   [`ops::Step::set_hold`]). Previous contents of `X` are
38//!   discarded.
39//! - **`push_X(self, value) -> Self`** / **`extend_X<I>(self, iter)
40//!   -> Self`** — field APPEND. `push_X` adds one element,
41//!   `extend_X` adds many from any `IntoIterator` (e.g.
42//!   [`backdrop::Backdrop::push_cgroup`] /
43//!   [`backdrop::Backdrop::extend_cgroups`]).
44//!
45//! Naming an APPEND method `set_X` (or a REPLACE method `push_X`)
46//! mis-encodes the semantics and is a defect — flag at review.
47//!
48//! See the [Scenarios](https://ktstr.dev/guide/concepts/scenarios.html)
49//! and [Writing Tests](https://ktstr.dev/guide/writing-tests.html)
50//! chapters of the guide.
51
52pub mod affinity;
53pub mod backdrop;
54pub mod basic;
55pub mod bpf_pin;
56pub mod cpuset;
57pub mod dynamic;
58pub mod host_stall;
59pub mod interaction;
60pub mod nested;
61pub mod ops;
62pub mod payload_run;
63pub mod performance;
64pub mod sample;
65pub mod scenarios;
66pub mod snapshot;
67pub mod stress;
68
69pub use backdrop::Backdrop;
70
71use std::collections::BTreeSet;
72use std::sync::Arc;
73use std::sync::atomic::AtomicU16;
74use std::thread;
75use std::time::Duration;
76
77use anyhow::Result;
78
79use nix::sys::signal::kill;
80use nix::unistd::Pid;
81
82use crate::assert::AssertResult;
83use crate::topology::TestTopology;
84use crate::workload::*;
85
86/// Check if a process is alive via kill(pid, 0).
87///
88/// Returns `false` for pid 0: `kill(0, ...)` targets the caller's
89/// process group rather than a single process, so the syscall would
90/// always report success and falsely mark "no process" as alive.
91///
92/// Returns `false` for `pid <= 0`. Non-positive pid_t values are
93/// invalid targets — `kill(0, ...)` signals the caller's process
94/// group and `kill(-1, ...)` signals every process the caller is
95/// permitted to signal. Neither matches "is this specific process
96/// alive?", so we refuse rather than probe.
97///
98/// # EPERM: foreign-UID processes report as dead
99///
100/// `kill(pid, 0)` returns one of three things for `pid > 0`:
101///
102/// 1. `Ok(())` — pid exists and the caller is permitted to signal it
103///    (same UID, or the caller has `CAP_KILL`). This maps to `true`.
104/// 2. `Err(ESRCH)` — no process with that pid. Maps to `false`.
105/// 3. `Err(EPERM)` — the pid exists but belongs to a different UID
106///    (or is otherwise unsignalable by the caller). Per `kill(2)`,
107///    "EPERM implies the process exists" — a live process. This
108///    implementation treats EPERM as `false` (via `.is_ok()`) because
109///    ktstr's callers use `process_alive` to ask "is the scheduler /
110///    payload *I launched* still running?", not "does any process
111///    with this pid exist?". A foreign-UID process sharing the pid is
112///    not the one the caller is tracking and is correctly classified
113///    as "no, not *my* process."
114///
115/// If a future caller needs to distinguish "dead" from "alive but
116/// unsignalable," switch to `Errno::ESRCH` discrimination on the
117/// `kill` result instead of `.is_ok()` — do NOT change this function
118/// silently, because existing callers rely on the EPERM-as-false
119/// behavior when walking /proc on heavily-forking hosts where pid
120/// reuse can land a foreign-UID process on the old slot.
121fn process_alive(pid: libc::pid_t) -> bool {
122    if pid <= 0 {
123        return false;
124    }
125    kill(Pid::from_raw(pid), None).is_ok()
126}
127
128// Re-export AffinityIntent from workload so existing `use super::*` in
129// submodules (affinity.rs, etc.) can find it.
130pub use crate::workload::AffinityIntent;
131
132// ---------------------------------------------------------------------------
133// RAII cgroup group
134// ---------------------------------------------------------------------------
135
136/// RAII guard that removes cgroups on drop.
137///
138/// Prevents cgroup leaks when workload spawning or other operations fail
139/// between cgroup creation and cleanup.
140#[must_use = "dropping a CgroupGroup immediately destroys the cgroups it manages"]
141pub struct CgroupGroup<'a> {
142    cgroups: &'a dyn crate::cgroup::CgroupOps,
143    names: Vec<String>,
144}
145
146impl std::fmt::Debug for CgroupGroup<'_> {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        f.debug_struct("CgroupGroup")
149            .field("cgroups", &self.cgroups.parent_path())
150            .field("names", &self.names)
151            .finish()
152    }
153}
154
155impl<'a> CgroupGroup<'a> {
156    /// Create an empty group. Cgroups added via `add_cgroup` or
157    /// `add_cgroup_no_cpuset` are removed when the group is dropped.
158    pub fn new(cgroups: &'a dyn crate::cgroup::CgroupOps) -> Self {
159        Self {
160            cgroups,
161            names: Vec::new(),
162        }
163    }
164
165    /// Create a cgroup and set its cpuset. The cgroup is tracked for cleanup on drop.
166    ///
167    /// Auto-enables [`Controller::Cpuset`](crate::cgroup::Controller::Cpuset)
168    /// on the parent's `cgroup.subtree_control` before creating the
169    /// child so the child's `cpuset.cpus` file is exposed and the
170    /// subsequent [`set_cpuset`](crate::cgroup::CgroupOps::set_cpuset)
171    /// write lands. Direct CgroupGroup users (the `custom_*` scenarios
172    /// in [`crate::scenario::nested`] / [`crate::scenario::stress`])
173    /// don't go through `run_scenario`'s
174    /// controller-resolution hook, so the controller enable has to
175    /// happen here. The setup call is idempotent on real cgroupfs (a
176    /// `+cpuset` write into `cgroup.subtree_control` that already
177    /// contains `cpuset` is a no-op at the kernel level per
178    /// `cgroup_subtree_control_write` in kernel/cgroup/cgroup.c).
179    pub fn add_cgroup(&mut self, name: &str, cpuset: &BTreeSet<usize>) -> Result<()> {
180        let mut required = BTreeSet::new();
181        required.insert(crate::cgroup::Controller::Cpuset);
182        self.cgroups.setup(&required)?;
183        self.cgroups.create_cgroup(name)?;
184        self.cgroups.set_cpuset(name, cpuset)?;
185        self.names.push(name.to_string());
186        Ok(())
187    }
188
189    /// Create a cgroup without a cpuset. The cgroup is tracked for cleanup on drop.
190    ///
191    /// No controller enablement: callers explicitly opting out of a
192    /// cpuset signal that they don't need any cgroup v2 controller
193    /// surface beyond the cgroup-core knobs (`cgroup.procs`,
194    /// `cgroup.freeze`) which are ungated. If a future caller needs
195    /// e.g. memory limits on a no-cpuset cgroup, add a
196    /// `with_controllers` overload rather than auto-enabling — the
197    /// "no-cpuset" name is load-bearing for the absent-controller
198    /// behavior pinned by tests in
199    /// [`crate::scenario::nested::custom_nested_cgroup_no_ctrl`].
200    pub fn add_cgroup_no_cpuset(&mut self, name: &str) -> Result<()> {
201        self.cgroups.create_cgroup(name)?;
202        self.names.push(name.to_string());
203        Ok(())
204    }
205
206    /// Names of all tracked cgroups.
207    pub fn names(&self) -> &[String] {
208        &self.names
209    }
210
211    /// Forget a tracked cgroup name without touching cgroupfs. Used
212    /// by `Op::RemoveCgroup` immediately BEFORE invoking the kernel
213    /// rmdir, so a later `Op::AddCgroup` with the same name can
214    /// re-create the cgroup instead of colliding against the stale
215    /// tracking entry, and the teardown-on-drop path skips a
216    /// now-redundant rmdir of a dir that the in-progress (or
217    /// already-completed) kernel call is removing.
218    pub(crate) fn forget(&mut self, name: &str) {
219        self.names.retain(|n| n != name);
220    }
221}
222
223/// True when `err`'s root cause is an `io::Error` with kind
224/// `NotFound` (ENOENT). Used by `CgroupGroup::drop` and
225/// `Op::RemoveCgroup` to classify a TOCTOU ENOENT as benign
226/// (post-condition "no dir" already holds) so it is filtered
227/// from warn output. Extracting the predicate keeps the two
228/// sites in lock-step — a classification change only edits
229/// this function, not both call sites.
230pub(crate) fn is_io_not_found(err: &anyhow::Error) -> bool {
231    err.root_cause()
232        .downcast_ref::<std::io::Error>()
233        .is_some_and(|io| io.kind() == std::io::ErrorKind::NotFound)
234}
235
236/// Map a cgroup `remove_cgroup` error's root-cause errno to a
237/// short remediation hint appended to warn messages. Only
238/// EBUSY and EACCES — the two errnos callers can act on — get
239/// specific hints; every other errno yields `None` so the warn
240/// stays terse with just the underlying error chain. Extracted
241/// so both `CgroupGroup::drop` and `Op::RemoveCgroup` stay
242/// synchronized; a new hint (e.g. ENOTEMPTY for un-cleaned
243/// children) only needs to be wired here.
244pub(crate) fn remove_cgroup_errno_hint(err: &anyhow::Error) -> Option<&'static str> {
245    let raw = err
246        .root_cause()
247        .downcast_ref::<std::io::Error>()?
248        .raw_os_error()?;
249    match raw {
250        libc::EBUSY => {
251            Some("EBUSY: cgroup still has live tasks — workloads were not drained before teardown")
252        }
253        libc::EACCES => {
254            Some("EACCES: permission denied — check cgroup owner / `user.slice` delegation")
255        }
256        _ => None,
257    }
258}
259
260impl Drop for CgroupGroup<'_> {
261    fn drop(&mut self) {
262        // Reverse-iterate so nested cgroups (children created AFTER
263        // their parents) are removed before their parents. Removing a
264        // cgroup directory that still has child cgroup directories
265        // under it fails with ENOTEMPTY.
266        //
267        // ENOENT is expected: `CgroupManager::remove_cgroup` returns
268        // Ok when the dir is already gone, so the only way ENOENT
269        // reaches here is the narrow TOCTOU race where another process
270        // unlinks between `exists()` and `remove_dir` — the post-
271        // condition (no dir) still holds and no cleanup is owed. Every
272        // other error (EBUSY from a surviving task, EACCES, broken
273        // cgroupfs mount) surfaces via `tracing::warn!` so a teardown
274        // failure is visible instead of silently swallowed; mirrors
275        // the same handling in `Op::RemoveCgroup` so the two paths
276        // stay consistent.
277        for name in self.names.iter().rev() {
278            if let Err(err) = self.cgroups.remove_cgroup(name) {
279                if is_io_not_found(&err) {
280                    continue;
281                }
282                let hint = remove_cgroup_errno_hint(&err).unwrap_or("");
283                tracing::warn!(
284                    cgroup = %name,
285                    err = %format!("{err:#}"),
286                    hint,
287                    "CgroupGroup::drop: remove_cgroup returned non-ENOENT error",
288                );
289            }
290        }
291    }
292}
293
294// ---------------------------------------------------------------------------
295// Runtime context and interpreter
296// ---------------------------------------------------------------------------
297
/// Runtime context passed to scenario functions.
///
/// Provides access to cgroup management, topology information, and
/// test configuration. Custom scenarios are functions receiving this
/// `Ctx` as their sole parameter (e.g. the `custom_*` fns in
/// [`crate::scenario::nested`] / [`crate::scenario::stress`]).
///
/// # Method groups
///
/// ## Time helpers
///
/// - [`Self::settled_hold`] — `HoldSpec::fixed(settle + duration * f)`
///   sugar for the dominant Step hold-time pattern.
///
/// ## Cgroup construction
///
/// - [`Self::cgroup_def`] — `CgroupDef::named(name).workers(workers_per_cgroup)`
///   sugar that pins the default-worker-count shape across 40+ call
///   sites.
///
/// ## Topology accessors
///
/// - [`Self::cpuset_cpus`] — resolve a
///   [`CpusetSpec`](crate::scenario::ops::CpusetSpec) against this
///   context's topology and return the CPU count.
///
/// ## Scheduler state
///
/// - [`Self::current_scheduler`] — the live scheduler identity, or
///   `None` when no scheduler is currently attached.
///
/// ## Sidecar paths
///
/// - [`Self::failure_dump_path`] — per-test failure-dump path derived
///   from `entry_name` and `variant_hash`.
///
/// ## Constructors
///
/// - [`Self::builder`] — start a [`CtxBuilder`] with sane defaults for
///   unit-test scenarios.
/// - [`Self::payload`] — start a
///   [`PayloadRun`](crate::scenario::payload_run::PayloadRun) for a
///   given [`Payload`](crate::test_support::Payload).
///
/// # Field groups
///
/// Each pub field's doc is prefixed with its sub-concern label so the
/// rustdoc table groups visibly. The eight groups are:
///
/// - **VM environment** — `cgroups`, `topo`. The host-side
///   filesystem + topology handles the scenario interacts with.
/// - **Test timing** — `duration`, `settle`. The wall-clock
///   budgets that shape every Step's hold-time math.
/// - **Cgroup defaults** — `workers_per_cgroup`, `work_type_override`.
///   The merge-time defaults `CgroupDef::merged_works` applies when a
///   `WorkSpec` leaves them unset.
/// - **Scheduler state** — `sched_pid`. Liveness-probe target for
///   inter-step scheduler-death detection.
/// - **Assertion policy** — `assert`. The merged
///   default+scheduler+per-test verdict checks
///   `run_scenario` / `execute_steps` apply.
/// - **Runtime coordination** — `wait_for_map_write`. Framework-set
///   gate that custom scenarios typically do not flip.
/// - **Phase coordination** — `current_step`. Per-VM publisher of the
///   current scenario step index, read by the sample-stamping sites.
/// - **Drift-safe path derivation** — `entry_name`, `variant_hash`.
///   The dispatch-stamped test identity the path-derivation methods
///   read.
#[non_exhaustive]
pub struct Ctx<'a> {
    /// **VM environment.** Cgroup filesystem operations. `&dyn CgroupOps`
    /// (not `&CgroupManager`) so scenario code can be driven by an
    /// in-memory test double without touching `/sys/fs/cgroup`.
    /// Production callers pass `&CgroupManager` and the auto-coercion
    /// is transparent at the call site — `ctx.cgroups.set_cpuset(...)`
    /// works unchanged.
    pub cgroups: &'a dyn crate::cgroup::CgroupOps,
    /// **VM environment.** VM CPU topology.
    pub topo: &'a TestTopology,
    /// **Test timing.** How long to run the workload.
    pub duration: Duration,
    /// **Cgroup defaults.** Default number of workers per cgroup.
    pub workers_per_cgroup: usize,
    /// **Scheduler state.** PID of the running scheduler (for liveness
    /// checks), or `None` when no scheduler is attached. Stored as
    /// `Option<pid_t>` so the "no scheduler" state is a distinct
    /// variant rather than a 0-sentinel — `run_scenario` and
    /// step-level liveness probes destructure via `if let Some(pid)`
    /// instead of `!= 0` guards.
    pub sched_pid: Option<libc::pid_t>,
    /// **Test timing.** Time to wait after cgroup creation for
    /// scheduler stabilization.
    pub settle: Duration,
    /// **Cgroup defaults.** Override work type for scenarios that use
    /// `SpinWait` by default.
    pub work_type_override: Option<WorkType>,
    /// **Assertion policy.** Merged assertion config (default_checks +
    /// scheduler + per-test). Used by `run_scenario` for data-driven
    /// scenarios and by `execute_steps` as the default when no explicit
    /// checks are passed to `execute_steps_with`.
    pub assert: crate::assert::Assert,
    /// **Runtime coordination.** When true, `execute_steps` blocks after
    /// writing the scenario start marker until the host confirms its BPF
    /// map write is complete — waiting on the `bpf_map_write_done` latch
    /// that `hvc0_poll_loop` sets when the host pushes
    /// `SIGNAL_BPF_WRITE_DONE` over the virtio-console RX queue. Set
    /// automatically by the framework when a `KtstrTestEntry` declares
    /// `bpf_map_write`; custom scenarios typically do not flip this
    /// manually.
    pub wait_for_map_write: bool,
    /// **Phase coordination.** Per-VM atomic publishing the current
    /// scenario step index. Written by the scenario driver immediately
    /// before each `run_step` call and read by three stamping sites
    /// so each captured sample carries the step it belongs to:
    /// (1) the host-side freeze-coordinator periodic-capture path
    /// stamps at periodic-fire time;
    /// (2) the on-demand `Op::CaptureSnapshot` apply arm stamps at
    /// apply time (the apply happens in the same phase as the
    /// capture);
    /// (3) the host-side user-watchpoint trip handler stamps at
    /// TRIP time, not at registration — the user issues
    /// `Op::WatchSnapshot` from some Step k, but the actual write
    /// that fires the watchpoint and triggers the snapshot can
    /// happen at any later phase, so the trip-time stamp pins the
    /// sample to the bucket matching when the observation actually
    /// occurred.
    ///
    /// Encoded per the framework's 1-indexed phase convention: `0` is
    /// the BASELINE settle window (the initial value), `1..=N` align
    /// with scenario Step ordinals (`step_idx + 1`). This matches
    /// [`crate::assert::PhaseBucket::step_index`] so a phase-aware
    /// sample drops directly into the correct bucket without a
    /// reindex.
    ///
    /// Stored as `AtomicU16` because the wire `StimulusPayload`
    /// step-index field is also `u16`, so a single shared width
    /// keeps the host-side bridge map and the guest-published wire
    /// value type-compatible without narrowing.
    ///
    /// Wrapped in `Arc` so the same per-VM publisher can be cloned
    /// into every consumer thread (scenario driver, freeze-coord,
    /// on-demand-capture apply arms) without a process-global
    /// static — multiple in-process VMs (e.g. parallel gauntlet
    /// variants) each get an independent atomic instead of racing
    /// on shared global state.
    pub current_step: Arc<AtomicU16>,
    /// **Drift-safe path derivation.** The `&'static str` name of
    /// the [`KtstrTestEntry`](crate::test_support::KtstrTestEntry)
    /// the running test body was dispatched as, stamped by the
    /// guest-side `maybe_dispatch_vm_test_with_args` and the
    /// host-only dispatch path before the test body runs. Drives
    /// the body-side path-derivation methods
    /// [`failure_dump_path`](Self::failure_dump_path)
    /// (and `wprof_pb_path` / `repro_wprof_pb_path` when the
    /// `wprof` feature is enabled) — the
    /// drift-safe replacement for the legacy pattern of
    /// hardcoding the test fn name as a string literal at the
    /// callsite. When `Some(name)`, those methods derive the
    /// sidecar paths from the macro-stamped value at call time,
    /// so a future test rename surfaces the resulting
    /// `Result<PathBuf>` bail at compile-time-equivalent-failure
    /// (a deterministic Err) rather than as a runtime ENOENT
    /// against a stale literal.
    ///
    /// `None` is the manually-constructed-Ctx escape hatch — ad-hoc
    /// scenario tests that build `Ctx` via
    /// [`CtxBuilder::build`](CtxBuilder::build) without calling
    /// [`CtxBuilder::entry_name`](CtxBuilder::entry_name) get
    /// `None` and a path-derivation method invocation bails with an
    /// actionable diagnostic naming the missing-stamp scenario.
    /// Sibling to [`crate::vmm::VmResult::entry_name`] which carries
    /// the same `&'static str` on the post-VM result struct (the
    /// two ends of the test-name chain — pre-VM body context vs
    /// post-VM result — store the same shape so the body-side
    /// `ctx.failure_dump_path()` and the host-side
    /// `result.failure_dump_path()` resolve to identical paths).
    pub entry_name: Option<&'static str>,
    /// **Drift-safe path derivation.** The run's variant hash (see
    /// `variant_hash_from_parts`), stamped at the macro dispatch site
    /// alongside [`Self::entry_name`]. The body-side
    /// `failure_dump_path` / `wprof_pb_path` derivations embed it as
    /// the `-{16-hex}` filename suffix so a gauntlet test's
    /// per-preset dumps don't clobber and each matches its sidecar's
    /// variant hash. `0` on a manually-built fixture (which has
    /// `entry_name = None` and thus bails before reading this).
    pub variant_hash: u64,
}
469
470impl std::fmt::Debug for Ctx<'_> {
471    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
472        // `&dyn CgroupOps` is not Debug (dropped the supertrait to
473        // avoid bloating the test-double surface); render the parent
474        // path instead so debug prints are still informative.
475        f.debug_struct("Ctx")
476            .field("cgroups", &self.cgroups.parent_path())
477            .field("topo", &self.topo)
478            .field("duration", &self.duration)
479            .field("workers_per_cgroup", &self.workers_per_cgroup)
480            .field("sched_pid", &self.sched_pid)
481            .field("settle", &self.settle)
482            .field("work_type_override", &self.work_type_override)
483            .field("assert", &self.assert)
484            .field("wait_for_map_write", &self.wait_for_map_write)
485            .field(
486                "current_step",
487                &self.current_step.load(std::sync::atomic::Ordering::Relaxed),
488            )
489            .field("entry_name", &self.entry_name)
490            .field("variant_hash", &self.variant_hash)
491            .finish()
492    }
493}
494
495impl Ctx<'_> {
496    /// Read the live scheduler identity published by the
497    /// `Op::AttachScheduler` / `Op::ReplaceScheduler` /
498    /// `Op::DetachScheduler` dispatch arms. Returns `None` when no
499    /// scheduler is currently attached (the pre-attach state at
500    /// process start and the post-`Op::DetachScheduler` state).
501    ///
502    /// Distinct from `entry.scheduler` (the boot-time descriptor
503    /// from the `#[ktstr_test]` macro): `entry.scheduler` stays
504    /// the same across `Op::ReplaceScheduler` swaps, while
505    /// `ctx.current_scheduler()` reflects the LIVE identity after
506    /// any runtime swap. Consumer sites that care about the
507    /// currently-attached BPF binary (verifier_stats wiring,
508    /// monitor thresholds, auto-repro probe gates) want this
509    /// method; sites that care about the test's declared
510    /// scheduler (test-runner skip/include filtering, sidecar
511    /// `scheduler_name` metadata) want `entry.scheduler`.
512    ///
513    /// v0 limitation: the boot path does not publish the boot
514    /// scheduler into the side channel, so the first observable
515    /// `Some` arrives after the first `Op::AttachScheduler` /
516    /// `Op::ReplaceScheduler` runs. Consumer sites that want a
517    /// fallback should call `.unwrap_or(&entry.scheduler.binary)`
518    /// to combine the live view with the boot descriptor.
519    pub fn current_scheduler(&self) -> Option<&'static crate::test_support::SchedulerSpec> {
520        crate::vmm::rust_init::current_scheduler()
521    }
522
523    /// Scheduler pid, filtered to the `> 0` range that
524    /// `process_alive` treats as signalable.
525    ///
526    /// `Ctx::sched_pid` documents `None` as the "no scheduler
527    /// configured" state, and the liveness sites destructure with
528    /// `if let Some(pid)`. Nothing in the builder, however, prevents
529    /// a caller from passing `Some(0)` or a negative pid — an easy
530    /// mistake for callers used to the workload module's internal
531    /// 0-sentinel pid slot (see the note on `sched_pid` above — the
532    /// sentinel lives on a module-private `AtomicI32` in
533    /// `src/workload/`, not on this `Option<pid_t>`). A bare
534    /// `Some(0)` would reach
535    /// `process_alive`, which returns `false` for any pid `<= 0`,
536    /// and the liveness sites would then bail with `scheduler died`
537    /// even though no scheduler was ever running — a false
538    /// positive that turns a misconfiguration into a misleading
539    /// scheduler-death diagnostic.
540    ///
541    /// Centralising the filter here keeps the sole production
542    /// caller — `setup_cgroups`'s post-settle bail — on a single
543    /// predicate: only a positive pid is "configured". The
544    /// `run_scenario` post-settle bail and workload-phase polling
545    /// take a live `crate::vmm::rust_init::sched_pid()` read
546    /// instead of this snapshot accessor.
547    ///
548    /// A `Some(n)` where `n <= 0` is a caller bug — the builder
549    /// documents `None` as the unconfigured shape, and every
550    /// positive value flows through unchanged. When the accessor
551    /// squashes such a value to `None`, it emits a `tracing::warn!`
552    /// naming the offending pid so the misuse surfaces in
553    /// structured logs instead of manifesting downstream as a
554    /// silent "scheduler died" verdict or, worse, a `kill(0, …)`
555    /// reaching the caller's own process group. The warn is
556    /// bounded: the sole production caller is `setup_cgroups`'s
557    /// post-settle bail, so the volume is O(1) per scenario run
558    /// even for a sustained misconfiguration — tight enough to
559    /// leave in place without a rate limiter.
560    pub(crate) fn active_sched_pid(&self) -> Option<libc::pid_t> {
561        match self.sched_pid {
562            Some(p) if p > 0 => Some(p),
563            Some(p) => {
564                tracing::warn!(
565                    pid = p,
566                    "Ctx::active_sched_pid: sched_pid=Some({p}) squashed to None; \
567                     only positive pids are configured-scheduler values — use \
568                     None for the unconfigured shape instead of a 0-sentinel or \
569                     negative pid"
570                );
571                None
572            }
573            None => None,
574        }
575    }
576
577    /// Resolve a `CpusetSpec` against this context's topology and
578    /// return the CPU count. Convenience accessor for tests that need
579    /// to size work counts proportional to a cpuset without computing
580    /// the topology denominator by hand. Mirrors the framework's own
581    /// resolution: the count is exactly the size of the BTreeSet
582    /// `spec.resolve(self)` returns, so any
583    /// `CpusetSpec`-aware code path (cgroup cpuset assignment,
584    /// affinity intent resolution, [`WorkSpec::workers_pct`]) sees the
585    /// same denominator. Uses the TOPOLOGY-level cpuset, not the
586    /// currently-effective cgroup cpuset — narrowing via mid-scenario
587    /// `Op::SetCpuset` does not change the value this returns.
588    pub fn cpuset_cpus(&self, spec: &crate::scenario::ops::CpusetSpec) -> usize {
589        spec.resolve(self).len()
590    }
591
592    /// `HoldSpec::fixed(settle + duration * fraction_of_duration)` —
593    /// the dominant Step hold-time pattern across scenarios. A Step
594    /// typically holds for the settle window (so the scheduler can
595    /// reach steady state) plus some fraction of the workload
596    /// duration (often `1.0` for whole-test Steps, or `0.5`/`1.0/3.0`
597    /// for multi-Step scenarios that subdivide the duration budget).
598    ///
599    /// The multiplication routes through [`Duration::mul_f64`], so a
600    /// fraction like `1.0 / 3.0` may yield a Duration that differs
601    /// from an integer-division formulation by ≤1 nanosecond — below
602    /// Linux thread sleep granularity and so unobservable at the
603    /// hold-evaluation boundary, but worth noting if a test ever
604    /// byte-pins a Duration value.
605    ///
606    /// # Panics
607    /// When `fraction_of_duration` is NaN, infinite, or negative
608    /// (per the `Duration::mul_f64` contract).
609    ///
610    /// # Examples
611    ///
612    /// ```ignore
613    /// Step::new(vec![], ctx.settled_hold(0.5));  // settle + half duration
614    /// Step::new(vec![], ctx.settled_hold(1.0));  // settle + full duration
615    /// ```
616    pub fn settled_hold(&self, fraction_of_duration: f64) -> crate::scenario::ops::HoldSpec {
617        crate::scenario::ops::HoldSpec::fixed(
618            self.settle + self.duration.mul_f64(fraction_of_duration),
619        )
620    }
621
622    /// Construct a [`CgroupDef`](crate::scenario::ops::CgroupDef) with `self.workers_per_cgroup`
623    /// workers — the most common scenario shape, dedupe of 40+
624    /// `CgroupDef::named(name).workers(ctx.workers_per_cgroup)` call
625    /// sites across `src/scenario/` and `tests/`.
626    ///
627    /// Equivalent to:
628    ///
629    /// ```ignore
630    /// CgroupDef::named(name).workers(ctx.workers_per_cgroup)
631    /// ```
632    ///
633    /// Returns a fresh [`CgroupDef`](crate::scenario::ops::CgroupDef) so the test author can chain
634    /// further builders (`.cpuset`, `.work`, etc.) on the
635    /// result. For non-default worker counts call
636    /// `CgroupDef::named(name).workers(N)` directly — the helper
637    /// pins ONLY the `ctx.workers_per_cgroup` default path.
638    ///
639    /// # Examples
640    ///
641    /// ```ignore
642    /// // Before (42+ sites):
643    /// vec![CgroupDef::named("cg_0").workers(ctx.workers_per_cgroup)].into()
644    /// // After:
645    /// vec![ctx.cgroup_def("cg_0")].into()
646    ///
647    /// // With additional builders:
648    /// ctx.cgroup_def("cg_0").cpuset(...)
649    /// ```
650    pub fn cgroup_def(
651        &self,
652        name: impl Into<std::borrow::Cow<'static, str>>,
653    ) -> crate::scenario::ops::CgroupDef {
654        crate::scenario::ops::CgroupDef::named(name).workers(self.workers_per_cgroup)
655    }
656
657    /// Per-test failure-dump sidecar path. Derives
658    /// `{sidecar_dir()}/{entry_name}-{variant_hash:016x}.failure-dump.json`
659    /// from the macro-stamped [`Self::entry_name`] — the drift-safe
660    /// replacement for the legacy pattern of hardcoding the test
661    /// fn name as a string literal at the callsite.
662    ///
663    /// # Sibling to [`crate::vmm::VmResult::failure_dump_path`]
664    ///
665    /// The post-VM result struct carries its own copy of the
666    /// macro-stamped entry name + computes the same path string.
667    /// A test body invocation `ctx.failure_dump_path()` and a
668    /// post-VM `result.failure_dump_path()` resolve to identical
669    /// paths because both stamp from the same
670    /// `entry.name: &'static str` source — proc-macro emission at
671    /// the `#[ktstr_test]` site.
672    ///
673    /// # Errors
674    ///
675    /// Bails when `self.entry_name` is `None`. The `None` shape is
676    /// the manually-constructed-Ctx escape hatch — ad-hoc scenario
677    /// unit tests build [`Ctx`] via [`CtxBuilder::build`] without
678    /// calling [`CtxBuilder::entry_name`]; such a context cannot
679    /// compute a drift-safe path. The bail diagnostic names the
680    /// missing-stamp scenario explicitly so test authors who hit it
681    /// know exactly which builder method to call.
682    pub fn failure_dump_path(&self) -> anyhow::Result<std::path::PathBuf> {
683        let name = self.entry_name.ok_or_else(|| {
684            anyhow::anyhow!(
685                "Ctx::failure_dump_path requires entry_name set by the \
686                 macro-stamped dispatch path \
687                 (`maybe_dispatch_vm_test_with_args`); reached with \
688                 entry_name = None, which means the Ctx was \
689                 constructed via CtxBuilder::build without calling \
690                 .entry_name(...). Call ctx_builder.entry_name(name) \
691                 explicitly, OR if this is a scenario unit-test fixture \
692                 that has no test-entry context, derive the path inline \
693                 (sidecar_dir().join(format!(\"{{name}}-{{variant_hash:016x}}.failure-dump.json\"))) \
694                 — the method form is for tests dispatched via \
695                 #[ktstr_test], not for builder-driven fixtures."
696            )
697        })?;
698        Ok(crate::test_support::sidecar_dir().join(format!(
699            "{name}-{:016x}.failure-dump.json",
700            self.variant_hash
701        )))
702    }
703
704    /// Per-test wprof Perfetto-trace sidecar path. Mirror of
705    /// [`Self::failure_dump_path`] for the wprof artifact —
706    /// derives `{sidecar_dir()}/{entry_name}-{variant_hash:016x}.wprof.pb`
707    /// from the macro-stamped [`Self::entry_name`].
708    ///
709    /// Sibling to [`crate::vmm::VmResult::wprof_pb_path`] —
710    /// the post-VM and pre-VM derivations produce identical paths.
711    /// See [`Self::failure_dump_path`] for the broader contract +
712    /// the manually-constructed-Ctx None-bail diagnostic.
713    ///
714    /// # Errors
715    ///
716    /// Bails when `self.entry_name` is `None` per the same shape
717    /// as [`Self::failure_dump_path`].
718    #[cfg(feature = "wprof")]
719    pub fn wprof_pb_path(&self) -> anyhow::Result<std::path::PathBuf> {
720        let name = self.entry_name.ok_or_else(|| {
721            anyhow::anyhow!(
722                "Ctx::wprof_pb_path requires entry_name set by the \
723                 macro-stamped dispatch path; reached with \
724                 entry_name = None — see Ctx::failure_dump_path \
725                 for the manually-constructed-Ctx workaround."
726            )
727        })?;
728        Ok(crate::test_support::sidecar_dir()
729            .join(format!("{name}-{:016x}.wprof.pb", self.variant_hash)))
730    }
731
732    #[cfg(feature = "wprof")]
733    pub fn repro_wprof_pb_path(&self) -> anyhow::Result<std::path::PathBuf> {
734        let name = self.entry_name.ok_or_else(|| {
735            anyhow::anyhow!(
736                "Ctx::repro_wprof_pb_path requires entry_name set by \
737                 the macro-stamped dispatch path; reached with \
738                 entry_name = None — see Ctx::failure_dump_path for \
739                 the manually-constructed-Ctx workaround."
740            )
741        })?;
742        Ok(crate::test_support::sidecar_dir()
743            .join(format!("{name}-{:016x}.repro.wprof.pb", self.variant_hash)))
744    }
745}
746
/// Fluent builder for [`Ctx`].
///
/// Scenario unit tests reach for a [`Ctx`] with sane defaults so they
/// can exercise scenario logic without booting a VM. The direct
/// struct-literal construction at ~14 call sites forces every test to
/// repeat the full 12-field init and to keep otherwise-diverging
/// defaults in sync by hand; this builder centralises those defaults
/// and keeps required fields (borrowed `cgroups`/`topo`) in their types.
///
/// Defaults:
/// - `duration`: 1 s — matches the `scenario::basic` test helper
///   (`scenario::stress` uses 2 s and sets it explicitly)
/// - `workers_per_cgroup`: 1
/// - `sched_pid`: `None` — `run_scenario` short-circuits the
///   liveness checks when `sched_pid.is_none()`.
/// - `settle`: 0 ms — tests do not need to wait for scheduler stabilisation
/// - `work_type_override`: `None`
/// - `assert`: [`crate::assert::Assert::default_checks()`] —
///   the same policy production paths merge through
/// - `wait_for_map_write`: `false`
/// - `current_step`: a fresh `Arc<AtomicU16>` initialised to `0`
/// - `entry_name`: `None` — the path-derivation methods on [`Ctx`]
///   return `Err` until [`CtxBuilder::entry_name`] is called
/// - `variant_hash`: `0`
///
/// Override any default via the corresponding method, then materialise
/// the context with [`CtxBuilder::build`].
///
/// # Example
/// ```ignore
/// let cgroups = CgroupManager::new("/nonexistent");
/// let topo = TestTopology::synthetic(4, 1);
/// let ctx = Ctx::builder(&cgroups, &topo)
///     .workers_per_cgroup(3)
///     .duration(Duration::from_secs(2))
///     .build();
/// ```
pub struct CtxBuilder<'a> {
    /// Borrowed cgroup backend; required, supplied to [`Ctx::builder`].
    cgroups: &'a dyn crate::cgroup::CgroupOps,
    /// Borrowed test topology; required, supplied to [`Ctx::builder`].
    topo: &'a TestTopology,
    /// Wall-clock budget for the workload phase of the scenario.
    duration: Duration,
    /// Number of workers started per cgroup by the default workload.
    workers_per_cgroup: usize,
    /// PID of the scheduler process; `None` disables the liveness checks
    /// in `run_scenario`.
    sched_pid: Option<libc::pid_t>,
    /// Time to wait after cgroup creation for scheduler stabilisation.
    settle: Duration,
    /// Optional override of the default work type.
    work_type_override: Option<WorkType>,
    /// Merged assertion config handed to the built [`Ctx`].
    assert: crate::assert::Assert,
    /// Whether `execute_steps` blocks on the `bpf_map_write_done` latch
    /// after writing the scenario start marker.
    wait_for_map_write: bool,
    /// Per-VM step-index publisher; shareable with other threads via the `Arc`.
    current_step: Arc<AtomicU16>,
    /// Macro-stamped test entry name; `None` for ad-hoc fixtures.
    entry_name: Option<&'static str>,
    /// Variant hash embedded as the `-{16-hex}` suffix of sidecar file names.
    variant_hash: u64,
}
794
795impl<'a> CtxBuilder<'a> {
796    /// Wall-clock budget for the workload phase of the scenario.
797    #[must_use = "builder methods consume self; bind the result"]
798    pub fn duration(mut self, d: Duration) -> Self {
799        self.duration = d;
800        self
801    }
802
803    /// Number of worker threads started per cgroup by the default workload.
804    #[must_use = "builder methods consume self; bind the result"]
805    pub fn workers_per_cgroup(mut self, n: usize) -> Self {
806        self.workers_per_cgroup = n;
807        self
808    }
809
810    /// PID of the scheduler process; `None` disables the liveness
811    /// checks in `run_scenario`.
812    #[must_use = "builder methods consume self; bind the result"]
813    pub fn sched_pid(mut self, pid: Option<libc::pid_t>) -> Self {
814        self.sched_pid = pid;
815        self
816    }
817
818    /// Time to wait after cgroup creation for scheduler stabilisation.
819    #[must_use = "builder methods consume self; bind the result"]
820    pub fn settle(mut self, s: Duration) -> Self {
821        self.settle = s;
822        self
823    }
824
825    /// Override the default work type for scenarios that would
826    /// otherwise use `SpinWait`.
827    #[must_use = "builder methods consume self; bind the result"]
828    pub fn work_type_override(mut self, wt: Option<WorkType>) -> Self {
829        self.work_type_override = wt;
830        self
831    }
832
833    /// Merged assertion config. Callers that want the production
834    /// layering should pass `Assert::default_checks().merge(&...)`;
835    /// tests that pin a specific policy can pass
836    /// [`crate::assert::Assert::NO_OVERRIDES`] directly.
837    #[must_use = "builder methods consume self; bind the result"]
838    pub fn assert(mut self, a: crate::assert::Assert) -> Self {
839        self.assert = a;
840        self
841    }
842
843    /// When true, `execute_steps` blocks on the `bpf_map_write_done`
844    /// latch (set on the host's `SIGNAL_BPF_WRITE_DONE` over
845    /// virtio-console RX) after writing the scenario start marker. See
846    /// the field doc on [`Ctx::wait_for_map_write`].
847    #[must_use = "builder methods consume self; bind the result"]
848    pub fn wait_for_map_write(mut self, v: bool) -> Self {
849        self.wait_for_map_write = v;
850        self
851    }
852
853    /// Inject a caller-owned per-VM step-index publisher. The
854    /// default `Ctx::builder` already constructs a fresh
855    /// `Arc<AtomicU16>` initialised to `0`, so most callers do
856    /// not need this setter; it exists so the host-side VM runner
857    /// can hand the same Arc to both the scenario driver `Ctx` and
858    /// the freeze-coordinator thread, giving both halves a single
859    /// per-VM source of truth for the current phase.
860    #[must_use = "builder methods consume self; bind the result"]
861    pub fn current_step(mut self, cs: Arc<AtomicU16>) -> Self {
862        self.current_step = cs;
863        self
864    }
865
866    /// **Drift-safe path derivation.** Stamp the
867    /// `&'static str` name of the
868    /// [`KtstrTestEntry`](crate::test_support::KtstrTestEntry)
869    /// the dispatched test body was registered as. Drives the
870    /// body-side path-derivation methods on [`Ctx`]
871    /// (`failure_dump_path`, `wprof_pb_path`,
872    /// `repro_wprof_pb_path`) so test
873    /// authors get the drift-safe per-test sidecar path without
874    /// re-hardcoding the test fn name in the body — a future test
875    /// rename surfaces a deterministic `Result<PathBuf>` bail
876    /// rather than a runtime ENOENT against a stale literal.
877    ///
878    /// The framework's macro-stamped dispatch path
879    /// (`maybe_dispatch_vm_test_with_args` + the host-only
880    /// dispatcher) calls this with the entry name at Ctx
881    /// construction time, before the test body runs. Ad-hoc
882    /// scenario unit tests that build [`Ctx`] without the dispatch
883    /// path skip this setter, and the path-derivation methods bail
884    /// with an actionable diagnostic — see
885    /// [`Ctx::failure_dump_path`] for the None-case bail shape.
886    #[must_use = "builder methods consume self; bind the result"]
887    pub fn entry_name(mut self, name: &'static str) -> Self {
888        self.entry_name = Some(name);
889        self
890    }
891
892    /// Stamp the run's variant hash (see `variant_hash_from_parts`) so the
893    /// body-side `failure_dump_path` / `wprof_pb_path` derivations embed
894    /// it as the `-{16-hex}` filename suffix. Set at the macro dispatch
895    /// site alongside [`Self::entry_name`]; ad-hoc fixtures leave it `0`.
896    #[must_use = "builder methods consume self; bind the result"]
897    pub fn variant_hash(mut self, hash: u64) -> Self {
898        self.variant_hash = hash;
899        self
900    }
901
902    /// Materialise the configured [`Ctx`].
903    #[must_use = "dropping a Ctx without running the scenario discards the test setup"]
904    pub fn build(self) -> Ctx<'a> {
905        Ctx {
906            cgroups: self.cgroups,
907            topo: self.topo,
908            duration: self.duration,
909            workers_per_cgroup: self.workers_per_cgroup,
910            sched_pid: self.sched_pid,
911            settle: self.settle,
912            work_type_override: self.work_type_override,
913            assert: self.assert,
914            wait_for_map_write: self.wait_for_map_write,
915            current_step: self.current_step,
916            entry_name: self.entry_name,
917            variant_hash: self.variant_hash,
918        }
919    }
920}
921
922impl<'a> Ctx<'a> {
923    /// Start a new [`CtxBuilder`] with required `cgroups` and `topo`
924    /// borrows and sane defaults for every other field. See
925    /// [`CtxBuilder`] for the full default set.
926    #[must_use = "discarding a CtxBuilder drops the scenario context defaults; chain setters and call .build()"]
927    pub fn builder(
928        cgroups: &'a dyn crate::cgroup::CgroupOps,
929        topo: &'a TestTopology,
930    ) -> CtxBuilder<'a> {
931        CtxBuilder {
932            cgroups,
933            topo,
934            duration: Duration::from_secs(1),
935            workers_per_cgroup: 1,
936            sched_pid: None,
937            settle: Duration::from_millis(0),
938            work_type_override: None,
939            assert: crate::assert::Assert::default_checks(),
940            wait_for_map_write: false,
941            current_step: Arc::new(AtomicU16::new(0)),
942            entry_name: None,
943            variant_hash: 0,
944        }
945    }
946
947    /// Start a [`PayloadRun`](crate::scenario::payload_run::PayloadRun)
948    /// builder for the given [`Payload`](crate::test_support::Payload).
949    ///
950    /// The builder inherits `payload.default_args` and
951    /// `payload.default_checks`; chained `.arg(...)` / `.check(...)`
952    /// calls extend them; `.clear_args()` / `.clear_checks()` wipe
953    /// both defaults and prior appends. Terminal `.run()` blocks and
954    /// returns `Result<(AssertResult, PayloadMetrics)>`.
955    ///
956    /// Only `PayloadKind::Binary` payloads are runnable here;
957    /// `.run()` on a `PayloadKind::Scheduler` payload returns `Err`.
958    #[must_use = "dropping a PayloadRun discards the payload configuration; chain setters and call .run()"]
959    pub fn payload(
960        &'a self,
961        p: &'static crate::test_support::Payload,
962    ) -> crate::scenario::payload_run::PayloadRun<'a> {
963        crate::scenario::payload_run::PayloadRun::new(self, p)
964    }
965}
966
967/// Spawn workers per cgroup, move each handle's worker pids into
968/// its cgroup, then start all handles in a second pass.
969///
970/// Shared scaffolding for `run_scenario` and `setup_cgroups` —
971/// both defer `.start()` until every handle has been spawned and
972/// every worker pid moved, so workers see a stable cgroup
973/// membership at first run. [`spawn_diverse`] does NOT use this
974/// helper because it starts each handle inline (eager-start
975/// semantics required for its IoSyncWrite/SpinWait mix — workload
976/// ordering matters when the mix includes I/O-bound and CPU-bound
977/// cgroups).
978///
979/// `cfg_fn` builds the per-cgroup [`WorkloadConfig`] from its
980/// index + name; callers own the per-cgroup customization logic.
981///
982/// `move_tasks` is ESRCH-tolerant — a worker that exits between
983/// fork and cgroup placement is warned and skipped, unlike the
984/// original per-pid `move_task` which propagated ESRCH.
985fn spawn_and_move<F>(ctx: &Ctx, names: &[String], mut cfg_fn: F) -> Result<Vec<WorkloadHandle>>
986where
987    F: FnMut(usize, &str) -> Result<WorkloadConfig>,
988{
989    let mut handles = Vec::with_capacity(names.len());
990    for (i, name) in names.iter().enumerate() {
991        let wl = cfg_fn(i, name.as_str())?;
992        let h = WorkloadHandle::spawn(&wl)?;
993        tracing::debug!(
994            cgroup = %name,
995            workers = wl.num_workers,
996            pids = h.worker_pids().len(),
997            "spawned workers",
998        );
999        ctx.cgroups
1000            .move_tasks(name.as_str(), &h.worker_pids_for_cgroup_procs()?)?;
1001        handles.push(h);
1002    }
1003    for h in &mut handles {
1004        h.start();
1005    }
1006    Ok(handles)
1007}
1008
1009/// Resolve a [`WorkSpec`]'s `num_workers`, falling back to `default_n` when unset,
1010/// and reject `num_workers=0`.
1011///
1012/// A cgroup with no workers emits no [`crate::workload::WorkerReport`]s, so every downstream
1013/// assertion vacuously passes. Callers that want "no load" on a cgroup
1014/// should either drop the [`crate::workload::WorkSpec`] entry entirely (letting the default apply)
1015/// or use a single sentinel worker so assertions have something to check.
1016pub(crate) fn resolve_num_workers(work: &WorkSpec, default_n: usize, label: &str) -> Result<usize> {
1017    let n = work.num_workers.unwrap_or(default_n);
1018    if n == 0 {
1019        anyhow::bail!(
1020            "cgroup '{}': num_workers=0 is not allowed — assertions would \
1021             vacuously pass with no WorkerReports; use at least 1 worker or \
1022             drop this WorkSpec entry",
1023            label,
1024        );
1025    }
1026    Ok(n)
1027}
1028
1029/// Resolve an [`AffinityIntent`] to a concrete [`ResolvedAffinity`]
1030/// for workers in a cgroup with the given effective cpuset.
1031///
1032/// # Errors
1033///
1034/// Returns `Err` when the test author's affinity intent cannot be
1035/// satisfied against the cgroup's effective cpuset. Per the
1036/// project-wide no-silent-drops invariant, an unsatisfiable
1037/// intent must surface as a returnable error rather than silently
1038/// degrading to "no affinity applied" — silent degradation lets
1039/// the workload run with the wrong placement while the test
1040/// reports success (vacuously-passing assertions).
1041///
1042/// The unsatisfiable cases by variant:
1043/// - [`AffinityIntent::RandomSubset`]: `from` pool empty after
1044///   cpuset intersection, or `count == 0`.
1045/// - [`AffinityIntent::LlcAligned`]: every LLC's CPUs disjoint
1046///   from the cpuset (no LLC has any CPU inside the cpuset).
1047/// - [`AffinityIntent::SingleCpu`]: cpuset is empty.
1048/// - [`AffinityIntent::Exact`]: requested CPU set is empty
1049///   (`Exact(BTreeSet::new())` is intent-only unsatisfiable),
1050///   or requested CPU set disjoint from the cpuset
1051///   (intersection empty).
1052/// - [`AffinityIntent::SmtSiblingPair`]: no physical core with
1053///   ≥2 SMT siblings inside the cpuset.
1054/// - [`AffinityIntent::CrossCgroup`]: topology exposes zero CPUs.
1055///   The public [`crate::topology::TestTopology`] constructors all
1056///   reject this at construction; reaching this case requires a
1057///   private-field construction or a future API addition.
1058///
1059/// Every error diagnostic names the offending intent and a
1060/// remediation hint. Diagnostics for cpuset-narrowed pools
1061/// (`RandomSubset` empty intersection, `LlcAligned`, `SingleCpu`,
1062/// `Exact` disjoint-intersection, `SmtSiblingPair`) also render the
1063/// cpuset that narrowed the pool. The intent-only errors —
1064/// `RandomSubset { count: 0 }` and `Exact(BTreeSet::new())` — omit
1065/// the cpuset because the cpuset is irrelevant to the failure (the
1066/// intent itself names zero CPUs). Remediation hints include
1067/// switching to [`AffinityIntent::Inherit`] to deliberately inherit
1068/// the cpuset, widening the cgroup's cpuset, or picking CPUs inside
1069/// the cpuset.
1070pub fn resolve_affinity_for_cgroup(
1071    kind: &AffinityIntent,
1072    cpuset: Option<&BTreeSet<usize>>,
1073    topo: &TestTopology,
1074) -> Result<ResolvedAffinity> {
1075    match kind {
1076        AffinityIntent::Inherit => Ok(ResolvedAffinity::None),
1077        AffinityIntent::RandomSubset { from, count } => {
1078            // Validate the intent itself (count > 0) before doing any
1079            // resource work — an intent-only bug (count==0) doesn't
1080            // need an allocation to diagnose.
1081            if *count == 0 {
1082                anyhow::bail!(
1083                    "AffinityIntent::RandomSubset count=0 cannot satisfy any sample. \
1084                     Switch to `AffinityIntent::Inherit` to deliberately inherit the \
1085                     cgroup cpuset, or pass `count >= 1`.",
1086                );
1087            }
1088            // The pool is already resolved by the caller (typed
1089            // `from`). Intersect with the cgroup's cpuset if one is
1090            // active so the resolved pool stays within the
1091            // scenario's CPU budget — same intersection semantic
1092            // applied to `Exact` below.
1093            let pool = if let Some(cs) = cpuset {
1094                from.intersection(cs).copied().collect::<BTreeSet<usize>>()
1095            } else {
1096                from.clone()
1097            };
1098            if pool.is_empty() {
1099                if cpuset.is_some() {
1100                    let cpuset_repr = format_cpuset_for_diag(cpuset);
1101                    anyhow::bail!(
1102                        "AffinityIntent::RandomSubset has no CPUs after intersecting \
1103                         `from={from:?}` with the cgroup cpuset ({cpuset_repr}). \
1104                         Switch to `AffinityIntent::Inherit` to deliberately inherit \
1105                         the cgroup cpuset, widen the cgroup's cpuset, or pick a \
1106                         `from` set that overlaps the cpuset.",
1107                    );
1108                } else {
1109                    anyhow::bail!(
1110                        "AffinityIntent::RandomSubset has an empty `from` pool with \
1111                         no cgroup cpuset to narrow it — there is no CPU to sample. \
1112                         Switch to `AffinityIntent::Inherit` to deliberately inherit \
1113                         the scenario's CPU budget, or pass a non-empty `from` set.",
1114                    );
1115                }
1116            }
1117            Ok(ResolvedAffinity::Random {
1118                from: pool,
1119                count: *count,
1120            })
1121        }
1122        AffinityIntent::LlcAligned => {
1123            let pool = cpuset.cloned().unwrap_or_else(|| topo.all_cpuset());
1124            // Find the LLC that has the most overlap with the cpuset.
1125            let mut best_llc = topo.llc_aligned_cpuset(0);
1126            let mut best_overlap = best_llc.intersection(&pool).count();
1127            for idx in 1..topo.num_llcs() {
1128                let llc = topo.llc_aligned_cpuset(idx);
1129                let overlap = llc.intersection(&pool).count();
1130                if overlap > best_overlap {
1131                    best_llc = llc;
1132                    best_overlap = overlap;
1133                }
1134            }
1135            // Intersect with cpuset so effective affinity matches kernel behavior.
1136            let effective: BTreeSet<usize> = best_llc.intersection(&pool).copied().collect();
1137            if effective.is_empty() {
1138                let cpuset_repr = format_cpuset_for_diag(cpuset);
1139                anyhow::bail!(
1140                    "AffinityIntent::LlcAligned has no CPUs after intersecting every \
1141                     LLC with the cgroup cpuset ({cpuset_repr}). No LLC has any CPU \
1142                     inside the cpuset. Switch to `AffinityIntent::Inherit` to \
1143                     deliberately inherit the cpuset, widen the cgroup's cpuset to \
1144                     include CPUs from at least one LLC, or pick a different \
1145                     affinity intent that doesn't require LLC alignment.",
1146                );
1147            }
1148            Ok(ResolvedAffinity::Fixed(effective))
1149        }
1150        AffinityIntent::CrossCgroup => {
1151            // When a cpuset is active, crossing cgroup boundaries is the intent,
1152            // but the kernel will intersect. Use all CPUs -- the kernel enforces
1153            // the cpuset constraint.
1154            let all = topo.all_cpuset();
1155            if all.is_empty() {
1156                // Defense-in-depth against zero-CPU topologies. The two
1157                // public TestTopology constructors (`synthetic` +
1158                // `from_vm_topology`) both reject `num_cpus == 0` at
1159                // construction, so reaching this branch requires a
1160                // private-field construction or a future API addition
1161                // that produces a zero-CPU topology. Without this bail
1162                // an empty `Fixed` would either trip the
1163                // `flatten_for_spawn` unreachable!() OR (if reached via
1164                // a path that bypassed flatten) silently produce an
1165                // empty `sched_setaffinity` mask the kernel rejects
1166                // with EINVAL after the cgroup intersection.
1167                anyhow::bail!(
1168                    "AffinityIntent::CrossCgroup cannot satisfy any worker — \
1169                     the topology exposes zero CPUs. The public \
1170                     TestTopology constructors (`synthetic` + \
1171                     `from_vm_topology`) reject this at construction; \
1172                     reaching this bail means a direct private-field \
1173                     construction or a future API addition produced a \
1174                     zero-CPU topology. Build the test against a \
1175                     topology with at least one CPU, or switch to \
1176                     `AffinityIntent::Inherit` to defer to the cgroup \
1177                     cpuset.",
1178                );
1179            }
1180            Ok(ResolvedAffinity::Fixed(all))
1181        }
1182        AffinityIntent::SingleCpu => {
1183            let pool = cpuset.cloned().unwrap_or_else(|| topo.all_cpuset());
1184            if let Some(&cpu) = pool.iter().next() {
1185                Ok(ResolvedAffinity::SingleCpu(cpu))
1186            } else {
1187                // Pool is empty only when cpuset is Some(empty) — `all_cpuset()`
1188                // returns at least the boot CPU for any non-degenerate topology.
1189                anyhow::bail!(
1190                    "AffinityIntent::SingleCpu cannot pick a CPU from an empty \
1191                     cgroup cpuset. Switch to `AffinityIntent::Inherit` to \
1192                     deliberately inherit (the empty cpuset is itself the \
1193                     problem), or assign a non-empty cpuset to the cgroup.",
1194                );
1195            }
1196        }
1197        AffinityIntent::Exact(cpus) => {
1198            if cpus.is_empty() {
1199                // Empty Exact is the most-explicit way a user can say
1200                // "I made a mistake" — silently degrading it to
1201                // Inherit is the same no-silent-drop violation as the
1202                // disjoint-intersection case below.
1203                anyhow::bail!(
1204                    "AffinityIntent::Exact(BTreeSet::new()) is unsatisfiable — an \
1205                     empty CPU set pins workers to nothing. Switch to \
1206                     `AffinityIntent::Inherit` to deliberately inherit the cgroup \
1207                     cpuset (or the full topology when no cpuset is active), or \
1208                     pass at least one CPU ID.",
1209                );
1210            }
1211            if let Some(cs) = cpuset {
1212                let effective: BTreeSet<usize> = cpus.intersection(cs).copied().collect();
1213                if effective.is_empty() {
1214                    let cpuset_repr = format_cpuset_for_diag(cpuset);
1215                    anyhow::bail!(
1216                        "AffinityIntent::Exact({cpus:?}) is disjoint from the cgroup \
1217                         cpuset ({cpuset_repr}); intersection is empty. Switch to \
1218                         `AffinityIntent::Inherit` to deliberately inherit the cpuset, \
1219                         widen the cgroup's cpuset to include the requested CPUs, or \
1220                         narrow the `Exact` set to CPUs inside the cpuset.",
1221                    );
1222                }
1223                Ok(ResolvedAffinity::Fixed(effective))
1224            } else {
1225                Ok(ResolvedAffinity::Fixed(cpus.clone()))
1226            }
1227        }
1228        AffinityIntent::SmtSiblingPair => resolve_smt_sibling_pair(cpuset, topo),
1229    }
1230}
1231
/// Describe a cgroup cpuset for the bail diagnostics emitted by
/// [`resolve_affinity_for_cgroup`]'s unsatisfiable arms.
///
/// Three shapes are rendered distinctly so the operator can tell
/// "cpuset is empty" apart from "no cpuset is active" — both can
/// produce an empty intersection on different intents:
/// - `None` → `<no cpuset>`
/// - `Some` of an empty set → `empty cpuset {}`
/// - `Some` of a non-empty set → `cpuset ` followed by the set's
///   `Debug` rendering
fn format_cpuset_for_diag(cpuset: Option<&BTreeSet<usize>>) -> String {
    cpuset.map_or_else(
        || String::from("<no cpuset>"),
        |cpus| {
            if cpus.is_empty() {
                String::from("empty cpuset {}")
            } else {
                format!("cpuset {cpus:?}")
            }
        },
    )
}
1244
1245/// Resolve [`AffinityIntent::SmtSiblingPair`] against the cgroup's
1246/// effective cpuset.
1247///
1248/// Walks every LLC's per-core sibling map looking for a physical
1249/// core whose SMT siblings are all present in the pool (cgroup's
1250/// cpuset, or the full topology when no cpuset is active). Returns
1251/// the first matching pair as [`ResolvedAffinity::Fixed`] containing
1252/// the two sibling CPU IDs.
1253///
1254/// Returns `Err` when no core has 2+ siblings in the pool —
1255/// `threads_per_core == 1` (SMT disabled or non-SMT host), the
1256/// cpuset isolates each sibling onto a different cgroup, or the
1257/// topology was constructed without per-core sibling data
1258/// (`LlcInfo::cores` empty — see `crate::topology::TestTopology::synthetic`). The
1259/// error path is explicit, not a silent fallback, because
1260/// [`WorkType::SmtSiblingSpin`] and other paired-on-siblings
1261/// workloads produce meaningless results without true SMT
1262/// contention.
1263///
1264/// All workers in the group resolve to the same 2-CPU set; for
1265/// `num_workers == 2` the kernel runs one worker on each sibling,
1266/// which is the contention pattern this intent targets. For
1267/// `num_workers > 2` (multiple pairs in one group) every worker
1268/// shares the same pair — the kernel time-slices them, which
1269/// approximates pair contention but does not place each pair on
1270/// distinct cores. Strict per-pair distribution across cores
1271/// requires per-worker affinity that the current
1272/// [`ResolvedAffinity`] model does not express; track via a
1273/// follow-up if a test author needs it.
1274///
1275/// [`WorkType::SmtSiblingSpin`]: crate::workload::WorkType::SmtSiblingSpin
1276/// [`AffinityIntent::SmtSiblingPair`]: crate::workload::AffinityIntent::SmtSiblingPair
1277fn resolve_smt_sibling_pair(
1278    cpuset: Option<&BTreeSet<usize>>,
1279    topo: &TestTopology,
1280) -> Result<ResolvedAffinity> {
1281    let pool = cpuset.cloned().unwrap_or_else(|| topo.all_cpuset());
1282    for llc in topo.llcs() {
1283        for siblings in llc.cores().values() {
1284            // Take the first two sibling CPUs that are both in the
1285            // pool. `cores()` is sorted; pairing the lowest two
1286            // present siblings gives a deterministic choice for a
1287            // given (topology, cpuset) input.
1288            let mut iter = siblings.iter().copied().filter(|cpu| pool.contains(cpu));
1289            if let (Some(a), Some(b)) = (iter.next(), iter.next()) {
1290                let pair: BTreeSet<usize> = [a, b].into_iter().collect();
1291                return Ok(ResolvedAffinity::Fixed(pair));
1292            }
1293        }
1294    }
1295    // Render the search scope: when a cpuset narrowed the pool, name
1296    // it (operator can widen / pick siblings inside it); when no
1297    // cpuset is active, the scope IS the full topology (operator must
1298    // adjust topology or switch intents — naming "<no cpuset>" would
1299    // mislead by implying cpuset config is relevant).
1300    let scope = if cpuset.is_some() {
1301        format!("the effective cpuset ({})", format_cpuset_for_diag(cpuset))
1302    } else {
1303        "the full topology (no cgroup cpuset is active)".to_string()
1304    };
1305    anyhow::bail!(
1306        "AffinityIntent::SmtSiblingPair requires a physical core with at \
1307         least two SMT siblings present in {scope}. The current topology \
1308         and cpuset expose no such pair — threads_per_core may be 1 (SMT \
1309         disabled or non-SMT host), the cpuset may have isolated each \
1310         sibling onto a different cgroup, or the topology was built \
1311         without per-core sibling data. Switch to a different \
1312         AffinityIntent for non-SMT scheduling tests, or run on a host \
1313         whose VM topology has threads_per_core >= 2.",
1314    );
1315}
1316
1317/// Resolve an [`AffinityIntent`] for direct storage in
1318/// [`crate::workload::WorkloadConfig::affinity`].
1319///
1320/// [`crate::workload::WorkloadConfig::affinity`] is an
1321/// [`AffinityIntent`] (type-unified with [`crate::workload::WorkSpec::affinity`])
1322/// and its spawn-time gate (see
1323/// [`crate::workload::WorkloadHandle::spawn`]) accepts
1324/// [`AffinityIntent::Inherit`], [`AffinityIntent::Exact`], and
1325/// [`AffinityIntent::RandomSubset`]. The scenario engine holds the
1326/// topology and cpuset that the spawn-time gate lacks, so it
1327/// pre-resolves topology-aware variants here:
1328///
1329/// - [`ResolvedAffinity::None`] → [`AffinityIntent::Inherit`]
1330/// - [`ResolvedAffinity::Fixed(set)`](ResolvedAffinity::Fixed) →
1331///   [`AffinityIntent::Exact(set)`](AffinityIntent::Exact)
1332/// - [`ResolvedAffinity::SingleCpu(cpu)`](ResolvedAffinity::SingleCpu) →
1333///   [`AffinityIntent::Exact`] containing `cpu`
1334/// - [`ResolvedAffinity::Random { from, count }`](ResolvedAffinity::Random) →
1335///   [`AffinityIntent::RandomSubset { from, count }`](AffinityIntent::RandomSubset)
1336///   — the resolved pool is forwarded verbatim and per-worker
1337///   sampling stays deferred to spawn time (each worker gets an
1338///   independent draw from `from`).
1339///
1340/// # Errors
1341///
1342/// Forwards every `Err` from the inner [`resolve_affinity_for_cgroup`]
1343/// — see that function's `# Errors` section for the full list of
1344/// unsatisfiable cases (RandomSubset empty pool / count=0,
1345/// LlcAligned no-overlap, SingleCpu empty cpuset, Exact empty or
1346/// disjoint, SmtSiblingPair no-pair-in-cpuset, CrossCgroup on
1347/// zero-CPU topology). The empty-pool "silent degrade to Inherit"
1348/// policy that previously lived here was removed — empty pools are
1349/// operator bugs, not "soft" fallbacks.
1350pub(crate) fn intent_for_spawn(
1351    kind: &AffinityIntent,
1352    cpuset: Option<&BTreeSet<usize>>,
1353    topo: &TestTopology,
1354) -> Result<AffinityIntent> {
1355    Ok(flatten_for_spawn(resolve_affinity_for_cgroup(
1356        kind, cpuset, topo,
1357    )?))
1358}
1359
1360fn flatten_for_spawn(resolved: ResolvedAffinity) -> AffinityIntent {
1361    match resolved {
1362        ResolvedAffinity::None => AffinityIntent::Inherit,
1363        ResolvedAffinity::Fixed(set) => {
1364            if set.is_empty() {
1365                // Invariant: resolve_affinity_for_cgroup bails before
1366                // constructing an empty Fixed (LlcAligned
1367                // empty-effective bail, Exact empty-input bail, Exact
1368                // disjoint-intersection bail, CrossCgroup zero-CPU
1369                // topology bail). Reaching here means a future
1370                // constructor of ResolvedAffinity::Fixed bypassed
1371                // those checks — panic loudly so the regression
1372                // surfaces at the construction site, not as a silent
1373                // inheritance downstream.
1374                unreachable!(
1375                    "ResolvedAffinity::Fixed(empty) reached flatten_for_spawn — \
1376                     resolve_affinity_for_cgroup is supposed to bail on every \
1377                     path that produces an empty Fixed (no-silent-drops \
1378                     invariant). Audit the new caller that constructed it.",
1379                )
1380            } else {
1381                AffinityIntent::Exact(set)
1382            }
1383        }
1384        ResolvedAffinity::SingleCpu(cpu) => AffinityIntent::Exact([cpu].into_iter().collect()),
1385        ResolvedAffinity::Random { from, count } => {
1386            // Round-trip the resolved pool through
1387            // [`AffinityIntent::RandomSubset`] so per-worker
1388            // sampling stays deferred to spawn time
1389            // (`workload::resolve_affinity` samples each worker
1390            // independently).
1391            if count == 0 || from.is_empty() {
1392                // Invariant: resolve_affinity_for_cgroup bails on
1393                // RandomSubset { count: 0 } and on empty intersected
1394                // pools. Same regression-surface contract as the
1395                // Fixed arm above.
1396                unreachable!(
1397                    "ResolvedAffinity::Random {{ count={count}, from={from:?} }} \
1398                     reached flatten_for_spawn with count==0 or empty pool — \
1399                     resolve_affinity_for_cgroup is supposed to bail on those \
1400                     cases (no-silent-drops invariant). Audit the new caller \
1401                     that constructed it.",
1402                )
1403            } else {
1404                AffinityIntent::RandomSubset { from, count }
1405            }
1406        }
1407    }
1408}
1409
1410// ---------------------------------------------------------------------------
1411// Custom scenario helpers
1412// ---------------------------------------------------------------------------
1413
1414/// Create N cgroups, spawn workers in each, and start them.
1415///
1416/// Returns the worker handles and an RAII [`CgroupGroup`] that removes
1417/// the cgroups on drop. Workers are moved into their target cgroups
1418/// before being signaled to start.
1419pub fn setup_cgroups<'a>(
1420    ctx: &'a Ctx,
1421    n: usize,
1422    wl: &WorkloadConfig,
1423) -> Result<(Vec<WorkloadHandle>, CgroupGroup<'a>)> {
1424    let mut guard = CgroupGroup::new(ctx.cgroups);
1425    for i in 0..n {
1426        guard.add_cgroup_no_cpuset(&format!("cg_{i}"))?;
1427    }
1428    thread::sleep(ctx.settle);
1429    // `active_sched_pid()` returns `None` when no scheduler was
1430    // configured (kernel-default path) OR when the caller planted a
1431    // `<= 0` sentinel; both cases skip the liveness-based bail.
1432    if let Some(pid) = ctx.active_sched_pid()
1433        && !process_alive(pid)
1434    {
1435        anyhow::bail!(
1436            "{} after cgroup creation (pid={})",
1437            crate::assert::SCHED_DIED_PREFIX,
1438            pid,
1439        );
1440    }
1441    let names: Vec<String> = (0..n).map(|i| format!("cg_{i}")).collect();
1442    let handles = spawn_and_move(ctx, &names, |_, _| Ok(wl.clone()))?;
1443    Ok((handles, guard))
1444}
1445
1446/// Stop workers, collect reports, and merge per-cgroup telemetry +
1447/// assertion results.
1448///
1449/// Each item is a `(WorkloadHandle, Option<&BTreeSet<usize>>)` pair
1450/// where the optional cpuset is passed through to
1451/// [`Assert::assert_cgroup`](crate::assert::Assert::assert_cgroup)
1452/// for isolation checks. Per-cgroup telemetry ([`crate::assert::CgroupStats`])
1453/// is produced for EVERY handle — one entry per declared cgroup,
1454/// including a `num_workers == 0` entry for a handle that collected no
1455/// reports — independent of whether any worker-level check is configured.
1456/// Worker-check assertion outcomes are recorded only for the checks the
1457/// caller set. (Telemetry was previously gated behind
1458/// `checks.has_worker_checks()`, which silently left
1459/// `ScenarioStats.cgroups` empty for tests that read the telemetry
1460/// without configuring a check.)
1461pub(crate) fn collect_handles<'a>(
1462    handles: impl IntoIterator<Item = (String, WorkloadHandle, Option<&'a BTreeSet<usize>>)>,
1463    checks: &crate::assert::Assert,
1464    topo: Option<&crate::topology::TestTopology>,
1465    step_index: Option<u16>,
1466) -> AssertResult {
1467    let mut r = AssertResult::pass();
1468    for (name, h, cpuset) in handles {
1469        // Bind the cgroup name before it is moved into cg.cgroup_name below,
1470        // so the per-phase per_cgroup carrier can key on it.
1471        let key = name.clone();
1472        let reports = h.stop_and_collect();
1473        let numa_nodes = cpuset.and_then(|cs| topo.map(|t| t.numa_nodes_for_cpuset(cs)));
1474        let mut one = checks.assert_cgroup_with_numa(&reports, cpuset, numa_nodes.as_ref());
1475        // `assert_cgroup_with_numa` produces exactly one CgroupStats entry
1476        // (scenario_stats_for_cgroup); no sub-check populates stats.cgroups,
1477        // so last_mut() is that entry. Label it with the cgroup name here —
1478        // the name is in scope only at the collection layer; cgroup_stats
1479        // sees only the reports. merge() extends cgroups, so the label
1480        // survives the roll-up and surfaces per-cgroup on a passing run.
1481        // The debug_assert trips immediately (in any debug-build test path)
1482        // if a future sub-assert ever adds a second cgroups entry, which
1483        // would make last_mut() mislabel the wrong one.
1484        debug_assert_eq!(
1485            one.stats.cgroups.len(),
1486            1,
1487            "assert_cgroup_with_numa must yield exactly one cgroup entry for \
1488             collect_handles to label correctly; got {}",
1489            one.stats.cgroups.len(),
1490        );
1491        if let Some(cg) = one.stats.cgroups.last_mut() {
1492            cg.cgroup_name = name;
1493        }
1494        // For a step-local cgroup (step_index Some), attach the per-phase
1495        // RAW per-cgroup components as a single-bucket phases entry keyed by the
1496        // step's 1-indexed step_index. AssertResult::merge unions per_cgroup by
1497        // name, so multiple cgroups in one step accumulate into the one bucket;
1498        // the host eval fold then unions these into the host-rebuilt buckets.
1499        // None: a backdrop handle expands each worker's PhaseSlices into
1500        // per-epoch buckets (expand_backdrop_phase_buckets); collect_all and
1501        // the non-step staging collect carry no PhaseSlices, so the
1502        // expansion yields an empty Vec (effectively nothing).
1503        match step_index {
1504            Some(idx) => {
1505                one.stats.phases = vec![crate::assert::step_per_cgroup_bucket(
1506                    &key,
1507                    &reports,
1508                    numa_nodes.as_ref(),
1509                    idx,
1510                )];
1511            }
1512            None => {
1513                // Backdrop (collected with no step_index): expand each
1514                // worker's per-phase PhaseSlices into one PhaseBucket per
1515                // epoch (BASELINE / inter-step-gap epochs skipped). The
1516                // host's fold_guest_per_cgroup_into_host_buckets then
1517                // unions these into the host-rebuilt buckets (matched
1518                // epochs) or surfaces them as orphan not-measured windows.
1519                one.stats.phases = crate::assert::expand_backdrop_phase_buckets(
1520                    &key,
1521                    &reports,
1522                    numa_nodes.as_ref(),
1523                );
1524            }
1525        }
1526        // Handle iteration order IS the per_cgroup fold order: AssertResult::merge
1527        // folds same-name carriers (a multi-WorkSpec cgroup's per-handle carriers)
1528        // in this order, and PhaseCgroupStats::merge's coupled-gap last-wins
1529        // tie-break depends on it matching the order cgroup_stats pools the reports
1530        // (also handle order) for gap-CPU parity. A reorder here would desync them.
1531        r.merge(one);
1532    }
1533    r
1534}
1535
1536/// Stop all workers, collect reports, and run assertion checks.
1537///
1538/// Uses `checks` for worker evaluation. Returns a merged
1539/// [`AssertResult`] across all workers.
1540pub fn collect_all(handles: Vec<WorkloadHandle>, checks: &crate::assert::Assert) -> AssertResult {
1541    collect_handles(
1542        handles.into_iter().map(|h| (String::new(), h, None)),
1543        checks,
1544        None,
1545        // No step concept for the bare collect_all path -> no phase attribution.
1546        None,
1547    )
1548}
1549
1550/// Default [`WorkloadConfig`] with `ctx.workers_per_cgroup` workers.
1551pub fn dfl_wl(ctx: &Ctx) -> WorkloadConfig {
1552    WorkloadConfig {
1553        num_workers: ctx.workers_per_cgroup,
1554        ..Default::default()
1555    }
1556}
1557
/// Split the topology's usable CPUs into two sets at the midpoint of
/// the list returned by `usable_cpus()`.
///
/// The first set receives the leading `len / 2` entries and the second
/// receives the rest, so with an odd count the second set is one larger.
#[cfg(test)]
pub fn split_half(ctx: &Ctx) -> (BTreeSet<usize>, BTreeSet<usize>) {
    let usable = ctx.topo.usable_cpus();
    let (front, back) = usable.split_at(usable.len() / 2);
    let first: BTreeSet<usize> = front.iter().copied().collect();
    let second: BTreeSet<usize> = back.iter().copied().collect();
    (first, second)
}
1567
1568/// Spawn diverse workloads across N cgroups: SpinWait, Bursty,
1569/// IoSyncWrite, Mixed, YieldHeavy. Each cgroup uses
1570/// `ctx.workers_per_cgroup` workers except IoSyncWrite cgroups,
1571/// which always use 2 workers to avoid drowning the scenario in
1572/// blocking IO.
1573pub fn spawn_diverse(ctx: &Ctx, cgroup_names: &[&str]) -> Result<Vec<WorkloadHandle>> {
1574    let types = [
1575        WorkType::SpinWait,
1576        WorkType::bursty(Duration::from_millis(50), Duration::from_millis(100)),
1577        WorkType::IoSyncWrite,
1578        WorkType::Mixed,
1579        WorkType::YieldHeavy,
1580    ];
1581    let mut handles = Vec::new();
1582    for (i, name) in cgroup_names.iter().enumerate() {
1583        let wt = types[i % types.len()].clone();
1584        let n = if matches!(wt, WorkType::IoSyncWrite) {
1585            2
1586        } else {
1587            ctx.workers_per_cgroup
1588        };
1589        let mut h = WorkloadHandle::spawn(&WorkloadConfig {
1590            num_workers: n,
1591            work_type: wt,
1592            ..Default::default()
1593        })?;
1594        ctx.cgroups
1595            .move_tasks(name, &h.worker_pids_for_cgroup_procs()?)?;
1596        h.start();
1597        handles.push(h);
1598    }
1599    Ok(handles)
1600}
1601
1602#[cfg(test)]
1603mod tests;