ktstr/scenario/ops/
mod.rs

1//! Composable ops/steps system for dynamic cgroup topology changes.
2//!
3//! [`Op`] is an atomic cgroup operation. [`Step`] sequences ops with a
4//! hold period. [`CgroupDef`] bundles create + cpuset + spawn into a
5//! single declaration. [`execute_steps()`] runs a step sequence with
6//! scheduler liveness checks and stimulus event recording.
7//!
8//! See the [Ops and Steps](https://ktstr.dev/guide/concepts/ops.html)
9//! chapter for a guide.
10//!
11//! # Cgroup tooling at a glance
12//!
13//! ktstr exposes the cgroup v2 surface across two layers — declarative
14//! steady-state via [`CgroupDef`] (set at scenario-setup time, holds
15//! for the cgroup's lifetime) and imperative state-transitions via
16//! [`Op`] (applied mid-step, describe transitions over time):
17//!
18//! | Knob | Layer | API entry | Underlying file | When to use |
19//! |------|-------|-----------|-----------------|-------------|
20//! | CPU affinity | setup | [`CgroupDef::cpuset`] | `cpuset.cpus` | Bind workers to a CPU subset for the whole run. |
21//! | NUMA-mem affinity | setup | [`CgroupDef::cpuset_mems`] | `cpuset.mems` | Constrain allocations to specific NUMA nodes. |
22//! | CPU bandwidth | setup | [`CgroupDef::cpu_quota_pct`] / [`CgroupDef::cpu_quota`] / [`CgroupDef::cpu_unlimited`] | `cpu.max` | Cap CPU time per period (1 CPU at 50% / 2 CPU at 100% / etc). |
23//! | CPU share weight | setup | [`CgroupDef::cpu_weight`] | `cpu.weight` | Bias relative CPU share when siblings contend. |
24//! | Memory ceiling | setup | [`CgroupDef::memory_max`] / [`CgroupDef::memory_unlimited`] | `memory.max` | Hard ceiling — exceeding triggers cgroup OOM. |
25//! | Memory throttle | setup | [`CgroupDef::memory_high`] | `memory.high` | Soft throttle: triggers reclaim, not OOM. |
26//! | Memory protection | setup | [`CgroupDef::memory_low`] | `memory.low` | Soft protection: kernel reclaims from siblings first. |
27//! | Swap cap | setup | [`CgroupDef::memory_swap_max`] / [`CgroupDef::memory_swap_unlimited`] | `memory.swap.max` | Cap how much memory can spill to swap (CONFIG_SWAP=y). |
28//! | IO share | setup | [`CgroupDef::io_weight`] | `io.weight` | Bias relative IO share when siblings contend. |
29//! | Task ceiling | setup | [`CgroupDef::pids_max`] / [`CgroupDef::pids_unlimited`] | `pids.max` | Cap process+thread count — fork/clone returns EAGAIN at limit. |
30//! | Mid-run cpuset rebind | mid-step | [`Op::set_cpuset`] / [`Op::clear_cpuset`] / [`Op::swap_cpusets`] | `cpuset.cpus` | Move cpuset on a live cgroup mid-scenario. |
31//! | Mid-run task migration | mid-step | [`Op::move_all_tasks`] | `cgroup.procs` | Move workers from one cgroup to another. |
32//! | Pause/resume | mid-step | [`Op::freeze_cgroup`] / [`Op::unfreeze_cgroup`] | `cgroup.freeze` | Suspend every task in the cgroup; resume later. |
33//! | Add/remove cgroup | mid-step | [`Op::add_cgroup`] / [`Op::remove_cgroup`] / [`Op::stop_cgroup`] | (cgroupfs mkdir/rmdir) | Spawn / tear down a cgroup mid-scenario. |
34//!
35//! # Worked examples
36//!
37//! * **Static topology** (one cgroup, fixed cpuset, weight-biased
38//!   compute): [`CgroupDef`] type-level docs.
39//! * **Suspend/resume** (3-Step idiom — run, freeze, run again):
40//!   [`Op::FreezeCgroup`] doc.
41//! * **Memory-cap teardown** (rewind a base CgroupDef's swap cap):
42//!   [`CgroupDef::memory_swap_unlimited`] doc.
43//!
44//! # Implementation entry points
45//!
46//! Every knob ends in [`crate::cgroup::CgroupOps`] (production:
47//! [`crate::cgroup::CgroupManager`]; tests: a recording `MockCgroupOps`
48//! double). `apply_setup` runs the [`CgroupDef`] passes; `apply_ops`
49//! dispatches the [`Op`] variants. Both share `ctx.cgroups` so a test
50//! that uses both layers writes through the same RAII teardown
51//! (`crate::scenario::CgroupGroup::Drop`).
52//!
53//! # File layout
54//!
55//! `types` holds the data model: [`Op`], [`CgroupDef`], [`Step`],
56//! [`HoldSpec`], [`Setup`], [`CpusetSpec`], the per-controller limits
57//! structs, and every builder constructor. Re-exported from this module
58//! so external paths remain `crate::scenario::ops::Op` etc. The executor
59//! drives that model against [`crate::cgroup::CgroupOps`] via `apply_setup`
60//! (sibling `setup` module) and `apply_ops` (sibling `dispatch` module),
61//! and exposes the [`execute_steps`] / [`execute_scenario`] family of
62//! public entry points (this file).
63
64mod types;
65pub use types::*;
66
67mod setup;
68pub use setup::PLACEMENT_LOG_PATH;
69use setup::apply_setup;
70
71mod dispatch;
72#[cfg(test)]
73use dispatch::{
74    REPLACE_NOT_TRYING_DEADLINE_S, build_kernel_op_request, dispatch_kernel_op_request,
75    merge_adjacent_cold_writes, staged_scheduler_log_path, wait_for_accessor_publish_or_bail,
76    wait_for_worker_state_not_trying_or_bail, write_entries_from_writes,
77};
78use dispatch::{apply_ops, render_cgroup_key};
79// Re-export the scx-state reader + enum. The guest-init scheduler
80// attach-readiness predicate (rust_init::scheduler::scx_attach_ready) reads
81// scx_state() to confirm SCX_ENABLED before the workload dispatches, and the
82// survives_storm probe's host unit test (`test_support::probe_tests`) drives
83// scheduler liveness on the host (no VM) via the set_test_scx_state seam.
84// `dispatch` is otherwise private to `ops`.
85#[cfg(test)]
86pub(crate) use dispatch::set_test_scx_state;
87pub(crate) use dispatch::{ScxState, scx_down, scx_state};
88
89use std::collections::BTreeSet;
90use std::thread;
91use std::time::Duration;
92
93use anyhow::{Context, Result};
94
95use crate::assert::AssertResult;
96use crate::scenario::backdrop;
97use crate::scenario::{CgroupGroup, Ctx, process_alive};
98use crate::vmm::guest_comms;
99use crate::vmm::wire::StimulusPayload;
100use crate::workload::{MemPolicy, WorkloadHandle};
101
102// ---------------------------------------------------------------------------
103// Step executor
104// ---------------------------------------------------------------------------
105
/// Persistent scenario-wide state owned by
/// [`execute_scenario_with`]. Lives for the entire step sequence;
/// cgroups, workload handles, and payload handles declared by the
/// [`Backdrop`](backdrop::Backdrop) go here and only tear
/// down at scenario end (success or Err). See [`StepState`] for
/// the step-local counterpart.
///
/// NOTE(review): `execute_scenario_with` itself only forwards to
/// `run_scenario`, so the value presumably lives in that function's
/// frame — confirm against `run_scenario`.
///
/// Rust drops struct fields in declaration order, so `cgroups` (the
/// RAII guard) is dropped before `handles` / `payload_handles`.
/// NOTE(review): presumably the teardown path drains the handles
/// explicitly before this struct drops — confirm before reordering
/// fields.
struct BackdropState<'a> {
    /// RAII cgroup guard for persistent cgroups — removes them on drop.
    cgroups: CgroupGroup<'a>,
    /// Active workload handles in persistent cgroups, keyed by cgroup name.
    /// Stored as `(name, handle)` pairs rather than a map; consumers
    /// scan the list by name.
    handles: Vec<(String, WorkloadHandle)>,
    /// Resolved cpusets per persistent cgroup name.
    cpusets: std::collections::HashMap<String, BTreeSet<usize>>,
    /// Active payload-binary handles owned by the backdrop. Drained
    /// via `.kill()` at scenario teardown so the metric-emission
    /// pipeline still fires.
    payload_handles: Vec<PayloadEntry>,
    /// BPF map fds opened via [`crate::scenario::ops::types::Op::PinBpfMap`].
    /// Keyed by the map name the caller requested; the
    /// [`std::os::fd::OwnedFd`] holds an extra refcount on the
    /// kernel-side `struct bpf_map` so the map survives any
    /// scheduler-process teardown (including
    /// [`crate::scenario::ops::types::Op::ReplaceScheduler`]) until
    /// scenario end. Drops close the fds and release the refcount.
    pinned_bpf_maps: std::collections::HashMap<String, std::os::fd::OwnedFd>,
}
132
133impl<'a> BackdropState<'a> {
134    /// Empty backdrop state (no persistent entities), scoped to `ctx.cgroups`.
135    fn empty(ctx: &'a Ctx) -> Self {
136        Self {
137            cgroups: CgroupGroup::new(ctx.cgroups),
138            handles: Vec::new(),
139            cpusets: std::collections::HashMap::new(),
140            payload_handles: Vec::new(),
141            pinned_bpf_maps: std::collections::HashMap::new(),
142        }
143    }
144}
145
/// Step-local execution state. Fresh per step, torn down at step
/// boundary: cgroups removed (via RAII drop), workload handles
/// collected, payload handles killed with metric emission. Any ops
/// in the step that reference a cgroup name look here first before
/// falling through to [`BackdropState`].
///
/// Rust drops struct fields in declaration order, so `cgroups` drops
/// first and `stall_monitor` last. NOTE(review): presumably step
/// teardown empties `handles` / `payload_handles` before the struct
/// drops — confirm before reordering fields.
struct StepState<'a> {
    /// RAII cgroup guard — removes step-local cgroups on drop.
    cgroups: CgroupGroup<'a>,
    /// Active workload handles keyed by step-local cgroup name.
    /// Stored as `(name, handle)` pairs rather than a map; consumers
    /// scan the list by name.
    handles: Vec<(String, WorkloadHandle)>,
    /// Resolved cpusets per step-local cgroup name, for isolation checks.
    cpusets: std::collections::HashMap<String, BTreeSet<usize>>,
    /// Active payload-binary handles keyed by cgroup name. Each entry
    /// came from either a [`CgroupDef::workload`] spawn in
    /// `apply_setup` or an explicit [`Op::RunPayload`] invocation;
    /// `source` tags which path spawned it so the duplicate-name
    /// dedup in `Op::RunPayload` can point at the original site. All
    /// are killed during step-teardown / cgroup removal so cgroupfs
    /// cleanup never trips EBUSY on a live process.
    payload_handles: Vec<PayloadEntry>,
    /// Host-mode worker stall monitor, started lazily at the end of
    /// the first successful [`apply_setup`] when running outside a
    /// VM (no `is_guest`, no `cargo_test_mode`) and at least one
    /// worker exists. The handle's [`Drop`] joins the polling
    /// thread when [`StepState`] drops; [`collect_step`] drains
    /// any accumulated reports before that drop so they reach the
    /// scenario's [`AssertResult`]. `None` in every guest-side
    /// scenario and in `cargo_test_mode` runs — the host-side
    /// monitor is the only stall signal available in host-mode,
    /// where the freeze coordinator / KVM-side stall plumbing is
    /// not running. See [`crate::scenario::host_stall`] for the
    /// signal definition and detection latency contract.
    stall_monitor: Option<crate::scenario::host_stall::StallMonitorHandle>,
}
180
181impl<'a> StepState<'a> {
182    /// Empty step state scoped to `ctx.cgroups`.
183    fn empty(ctx: &'a Ctx) -> Self {
184        Self {
185            cgroups: CgroupGroup::new(ctx.cgroups),
186            handles: Vec::new(),
187            cpusets: std::collections::HashMap::new(),
188            payload_handles: Vec::new(),
189            stall_monitor: None,
190        }
191    }
192}
193
/// Combined mutable view over step-local and backdrop state.
///
/// Every function that touches execution state (apply_setup,
/// apply_ops, the drain helpers) receives a
/// `ScenarioState`; lookups prefer step-local, falling through to
/// backdrop. New state created via ops/setup inside a step writes
/// to step-local by default — that is the primary mechanism
/// enforcing per-step bounded lifetime. Setup for the Backdrop
/// itself (run once before the step loop) writes straight to the
/// backdrop side via [`ScenarioState::with_target_backdrop`].
///
/// The view borrows both slots mutably for `'b`; it owns neither, so
/// dropping a `ScenarioState` tears nothing down.
struct ScenarioState<'a, 'b> {
    /// Step-local slot. Consulted first by lookups and the default
    /// destination for newly created state.
    step: &'b mut StepState<'a>,
    /// Scenario-wide (persistent) slot. Lookup fallback, and the
    /// destination for new state while `target_backdrop` is set.
    backdrop: &'b mut BackdropState<'a>,
    /// When true, all mutations route to [`Self::backdrop`] instead
    /// of [`Self::step`]. Set by [`Self::with_target_backdrop`] when
    /// running the Backdrop's initial `apply_setup` / `apply_ops`
    /// before the first step.
    target_backdrop: bool,
}
213
214impl<'a, 'b> ScenarioState<'a, 'b> {
215    /// Build a combined scenario view. Starts with the step-local
216    /// slot as the mutation target — call [`Self::with_target_backdrop`]
217    /// to flip into backdrop-setup mode for Backdrop's own
218    /// apply_setup / apply_ops pass.
219    fn new(step: &'b mut StepState<'a>, backdrop: &'b mut BackdropState<'a>) -> Self {
220        Self {
221            step,
222            backdrop,
223            target_backdrop: false,
224        }
225    }
226
227    /// Run `f` with writes routed to the backdrop side.
228    fn with_target_backdrop<R>(&mut self, f: impl FnOnce(&mut Self) -> R) -> R {
229        let prev = self.target_backdrop;
230        self.target_backdrop = true;
231        let r = f(self);
232        self.target_backdrop = prev;
233        r
234    }
235
236    /// `cgroups` group that receives newly-created cgroups. Step-local
237    /// by default; backdrop when [`Self::with_target_backdrop`] is active.
238    fn target_cgroups(&mut self) -> &mut CgroupGroup<'a> {
239        if self.target_backdrop {
240            &mut self.backdrop.cgroups
241        } else {
242            &mut self.step.cgroups
243        }
244    }
245
246    /// `handles` vec that receives newly-spawned workload handles.
247    fn target_handles(&mut self) -> &mut Vec<(String, WorkloadHandle)> {
248        if self.target_backdrop {
249            &mut self.backdrop.handles
250        } else {
251            &mut self.step.handles
252        }
253    }
254
255    /// `cpusets` map that receives resolved cpusets for new cgroups.
256    fn target_cpusets(&mut self) -> &mut std::collections::HashMap<String, BTreeSet<usize>> {
257        if self.target_backdrop {
258            &mut self.backdrop.cpusets
259        } else {
260            &mut self.step.cpusets
261        }
262    }
263
264    /// `payload_handles` vec that receives newly-spawned payload handles.
265    fn target_payload_handles(&mut self) -> &mut Vec<PayloadEntry> {
266        if self.target_backdrop {
267            &mut self.backdrop.payload_handles
268        } else {
269            &mut self.step.payload_handles
270        }
271    }
272
273    /// Resolved cpuset for a cgroup name, looked up step-first then backdrop.
274    fn lookup_cpuset(&self, name: &str) -> Option<&BTreeSet<usize>> {
275        self.step
276            .cpusets
277            .get(name)
278            .or_else(|| self.backdrop.cpusets.get(name))
279    }
280
281    /// Returns the live payload handle matching the composite key
282    /// (`payload_name`, `cgroup_key`) from either step-local or
283    /// backdrop state, or `None` when no entry matches. Used for
284    /// the `Op::RunPayload` duplicate guard, which now treats
285    /// "same payload in a different cgroup" as legitimate rather
286    /// than a name collision.
287    fn find_live_payload_with_cgroup(
288        &self,
289        payload_name: &str,
290        cgroup_key: &str,
291    ) -> Option<&PayloadEntry> {
292        let matches =
293            |e: &&PayloadEntry| e.handle.payload_name() == payload_name && e.cgroup == cgroup_key;
294        self.step
295            .payload_handles
296            .iter()
297            .find(matches)
298            .or_else(|| self.backdrop.payload_handles.iter().find(matches))
299    }
300
301    /// Drop a payload handle by composite key (`name`, optional
302    /// `cgroup`). Checks step-local first, then backdrop.
303    ///
304    /// - `cgroup = Some(c)`: exact match on both name and cgroup.
305    /// - `cgroup = None`: if exactly one entry matches `name` across
306    ///   both slots, consume it (backward-compat for
307    ///   `Op::wait_payload(name)` / `Op::kill_payload(name)` when
308    ///   only one copy is live). If two or more match, returns
309    ///   `Err(ambiguous_cgroups)` where `ambiguous_cgroups` is the
310    ///   list of cgroup keys for the candidates so the caller can
311    ///   produce an actionable error.
312    ///
313    /// Returns `Ok(None)` when no entry matches.
314    fn take_payload_by_name(
315        &mut self,
316        name: &str,
317        cgroup: Option<&str>,
318    ) -> std::result::Result<Option<PayloadEntry>, Vec<String>> {
319        if let Some(c) = cgroup {
320            // Composite-key path: exact match on both.
321            if let Some(idx) = self
322                .step
323                .payload_handles
324                .iter()
325                .position(|e| e.handle.payload_name() == name && e.cgroup == c)
326            {
327                return Ok(Some(self.step.payload_handles.swap_remove(idx)));
328            }
329            if let Some(idx) = self
330                .backdrop
331                .payload_handles
332                .iter()
333                .position(|e| e.handle.payload_name() == name && e.cgroup == c)
334            {
335                return Ok(Some(self.backdrop.payload_handles.swap_remove(idx)));
336            }
337            return Ok(None);
338        }
339        // Name-only path: disambiguate across both slots before
340        // consuming, so a mid-test wait on an ambiguous name
341        // surfaces the caller's bug rather than silently waiting
342        // on the first match.
343        let mut step_idx: Option<usize> = None;
344        let mut backdrop_idx: Option<usize> = None;
345        let mut cgroups: Vec<String> = Vec::new();
346        for (i, e) in self.step.payload_handles.iter().enumerate() {
347            if e.handle.payload_name() == name {
348                if step_idx.is_none() {
349                    step_idx = Some(i);
350                }
351                cgroups.push(e.cgroup.clone());
352            }
353        }
354        for (i, e) in self.backdrop.payload_handles.iter().enumerate() {
355            if e.handle.payload_name() == name {
356                if backdrop_idx.is_none() && step_idx.is_none() {
357                    backdrop_idx = Some(i);
358                }
359                cgroups.push(e.cgroup.clone());
360            }
361        }
362        if cgroups.len() > 1 {
363            return Err(cgroups);
364        }
365        if let Some(i) = step_idx {
366            return Ok(Some(self.step.payload_handles.swap_remove(i)));
367        }
368        if let Some(i) = backdrop_idx {
369            return Ok(Some(self.backdrop.payload_handles.swap_remove(i)));
370        }
371        Ok(None)
372    }
373
374    /// Drain every live payload handle in step + backdrop state by
375    /// calling `.kill()` so the metric-emission pipeline fires. Used
376    /// on error paths in the step loop so mid-scenario failure still
377    /// leaves a usable sidecar.
378    fn drain_all_payloads(&mut self) {
379        drain_all_payload_handles(&mut self.step.payload_handles);
380        drain_all_payload_handles(&mut self.backdrop.payload_handles);
381    }
382
383    /// Kill every payload handle (step-first, then backdrop) whose
384    /// cgroup matches `cgroup`. Called before a cgroup removal so
385    /// cgroupfs cleanup does not trip EBUSY on a live process.
386    fn drain_payloads_for_cgroup(&mut self, cgroup: &str) {
387        drain_payload_handles_for_cgroup(&mut self.step.payload_handles, cgroup);
388        drain_payload_handles_for_cgroup(&mut self.backdrop.payload_handles, cgroup);
389    }
390
391    /// Remove every workload handle whose key matches `cgroup`. The
392    /// handles themselves drop (which SIGKILLs the workers) — this is
393    /// appropriate for `Op::StopCgroup` and `Op::RemoveCgroup`.
394    fn drop_handles_for_cgroup(&mut self, cgroup: &str) {
395        self.step.handles.retain(|(n, _)| n.as_str() != cgroup);
396        self.backdrop.handles.retain(|(n, _)| n.as_str() != cgroup);
397    }
398
399    /// Forget a tracked cpuset (step-first, then backdrop) for a cgroup.
400    fn forget_cpuset(&mut self, cgroup: &str) {
401        self.step.cpusets.remove(cgroup);
402        self.backdrop.cpusets.remove(cgroup);
403    }
404
405    /// Record / overwrite the resolved cpuset for a cgroup. If the
406    /// cgroup is known to step-local state, the step-local entry
407    /// updates; if it's known to backdrop, the backdrop entry
408    /// updates; otherwise the entry goes into the currently-active
409    /// target (step-local, or backdrop inside `with_target_backdrop`).
410    fn record_cpuset(&mut self, cgroup: &str, cpuset: BTreeSet<usize>) {
411        if self.step.cpusets.contains_key(cgroup) {
412            self.step.cpusets.insert(cgroup.to_string(), cpuset);
413        } else if self.backdrop.cpusets.contains_key(cgroup) {
414            self.backdrop.cpusets.insert(cgroup.to_string(), cpuset);
415        } else {
416            self.target_cpusets().insert(cgroup.to_string(), cpuset);
417        }
418    }
419
420    /// Re-key every workload handle from `from` to `to`. When `to`
421    /// names a Backdrop-owned cgroup, step-local handles are also
422    /// transferred into [`Self::backdrop`] so their lifetime extends
423    /// to scenario end instead of dying at step teardown. Backdrop
424    /// handles stay in the backdrop slot regardless of `to`.
425    ///
426    /// Called by `Op::MoveAllTasks` after the kernel-side
427    /// `cgroup.procs` writes succeed so subsequent ops that address
428    /// the moved workers by cgroup name find them under the new key
429    /// and in the correct state slot.
430    fn rename_handles(&mut self, from: &str, to: &str) {
431        let to_is_backdrop = self.cgroup_name_is_backdrop(to);
432        if to_is_backdrop {
433            // Move step-local handles keyed under `from` into the
434            // backdrop slot, re-keyed to `to`. Iterate in reverse so
435            // swap_remove indices stay stable.
436            let mut i = self.step.handles.len();
437            while i > 0 {
438                i -= 1;
439                if self.step.handles[i].0.as_str() == from {
440                    let (_, handle) = self.step.handles.swap_remove(i);
441                    self.backdrop.handles.push((to.to_string(), handle));
442                }
443            }
444        } else {
445            // Step-local destination: keep ownership, just rename.
446            for (name, _) in &mut self.step.handles {
447                if name.as_str() == from {
448                    *name = to.to_string();
449                }
450            }
451        }
452        // Backdrop handles are never demoted to step-local ownership
453        // regardless of destination — a backdrop worker is declared
454        // persistent and stays persistent for the scenario. Rename
455        // in place so subsequent ops still find it under the new key.
456        for (name, _) in &mut self.backdrop.handles {
457            if name.as_str() == from {
458                *name = to.to_string();
459            }
460        }
461    }
462
463    /// Iterate every live workload handle across step + backdrop.
464    /// Used by `Op::MoveAllTasks` / `Op::SetAffinity` which act on
465    /// whichever cgroup owns the handle without caring about which
466    /// state slot it's in.
467    fn all_handles(&self) -> impl Iterator<Item = &(String, WorkloadHandle)> {
468        self.step.handles.iter().chain(self.backdrop.handles.iter())
469    }
470
471    /// True iff a cgroup with the given name is already tracked by
472    /// either step-local or backdrop state. Used to reject duplicate
473    /// names at `apply_setup` time so a user can't accidentally
474    /// shadow a Backdrop cgroup with a step-local [`CgroupDef`].
475    fn cgroup_name_is_tracked(&self, name: &str) -> bool {
476        self.step.cgroups.names().iter().any(|n| n == name)
477            || self.backdrop.cgroups.names().iter().any(|n| n == name)
478    }
479
480    /// True iff a cgroup with the given name is tracked by backdrop
481    /// (persistent) state. Used by `Op::MoveAllTasks` to decide
482    /// handle-ownership transfer direction (step→backdrop transfers
483    /// the handle into the persistent slot; backdrop→step-local is
484    /// rejected because it would orphan workers at step teardown).
485    fn cgroup_name_is_backdrop(&self, name: &str) -> bool {
486        self.backdrop.cgroups.names().iter().any(|n| n == name)
487    }
488}
489
/// Records which API surface spawned a live payload handle: an
/// explicit [`Op::RunPayload`] inside the step, or a
/// [`CgroupDef::workload`] attachment during `apply_setup`. Every
/// [`PayloadEntry`] carries one so the dedup path in
/// `Op::RunPayload` can name the original source when it rejects a
/// second spawn of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PayloadSource {
    /// The payload came from `CgroupDef::workload(&payload)` during `apply_setup`.
    CgroupDefWorkload,
    /// The payload came from `Op::RunPayload { payload, .. }` in the step's ops.
    OpRunPayload,
}

impl PayloadSource {
    /// Label used in error output. It names the public API surface
    /// that requested the spawn rather than the internal dispatch
    /// site.
    fn describe(self) -> &'static str {
        const VIA_CGROUP_DEF: &str = "CgroupDef::workload";
        const VIA_OP: &str = "Op::RunPayload";
        match self {
            Self::CgroupDefWorkload => VIA_CGROUP_DEF,
            Self::OpRunPayload => VIA_OP,
        }
    }
}
514
/// One live payload handle plus the cgroup it runs inside and the
/// API surface that spawned it. `cgroup` is the empty string iff the
/// entry came from an `Op::RunPayload`
/// (`source == PayloadSource::OpRunPayload`) that was invoked without
/// a `cgroup = Some(...)` argument — in which case the payload runs
/// in whatever cgroup its parent process inherited (no explicit
/// placement).
struct PayloadEntry {
    /// Cgroup key the payload is tracked under; compared verbatim
    /// against the requested cgroup key by the lookup helpers.
    cgroup: String,
    /// Which API surface spawned the payload.
    source: PayloadSource,
    /// The live payload handle; its `payload_name()` is the name half
    /// of the (name, cgroup) composite key.
    handle: crate::scenario::payload_run::PayloadHandle,
}
526
527/// Map the BPF probe's current scheduler-exit classification onto
528/// the [`crate::assert::DetailKind`] variant the three liveness
529/// emission sites push. Reads [`crate::probe::process::sched_exit_kind`]
530/// which mirrors the probe's `ktstr_err_exit_detected` BSS latch
531/// across threads.
532///
533/// Returns:
534/// - `SchedulerCrashed` when the probe observed a non-clean kernel
535///   exit (any path that latched `ktstr_err_exit_detected`).
536/// - `SchedulerExitedCleanly` when the probe ran but never observed
537///   the latch (clean `SCX_EXIT_NONE` teardown, or the scheduler
538///   exited for a benign reason).
539/// - `SchedulerDiedUnknownReason` when the probe has not classified
540///   yet — typically the probe pipeline never wired for this run
541///   (host-only test, no scheduler attached) or the poll thread has
542///   not completed a first iteration since the prior reset.
543fn sched_died_detail_kind() -> crate::assert::DetailKind {
544    use crate::assert::DetailKind;
545    use crate::probe::process::{SchedExitKind, sched_exit_kind};
546    match sched_exit_kind() {
547        SchedExitKind::Crashed => DetailKind::SchedulerCrashed,
548        SchedExitKind::Clean => DetailKind::SchedulerExitedCleanly,
549        SchedExitKind::Unknown => DetailKind::SchedulerDiedUnknownReason,
550    }
551}
552
553/// Classify scheduler liveness for the guest-side `survives_storm` probe:
554/// `Some(kind)` when a scheduler is still expected to run (`sched_pid` set) but
555/// has died or gone down — the leader pid ESRCH'd OR the scx state reads
556/// `disabling`/`disabled` ([`dispatch::scx_down`]) — and `None` otherwise.
557///
558/// Mirrors the inter-step / post-loop liveness gate `run_scenario` runs for
559/// `execute_*` scenarios (`!process_alive(pid) || dispatch::scx_down()`, gated
560/// on `sched_pid()` being set), so a clean `Op::DetachScheduler` — which clears
561/// the pid before this would observe `disabled` — does not trip it. The kind
562/// comes from [`sched_died_detail_kind`]; in the primary test VM the BPF
563/// err-exit latch is unread, so the kind is
564/// [`crate::assert::DetailKind::SchedulerDiedUnknownReason`] (the scx state is
565/// kind-less — a crash and a clean unregister both read `disabling`/`disabled`).
566pub(crate) fn sched_liveness_failure_kind() -> Option<crate::assert::DetailKind> {
567    let pid = crate::vmm::rust_init::sched_pid()?;
568    if !process_alive(pid) || dispatch::scx_down() {
569        Some(sched_died_detail_kind())
570    } else {
571        None
572    }
573}
574
575/// Execute a single step with CgroupDefs that hold for the full duration.
576///
577/// Convenience wrapper around [`execute_steps`] for the common pattern
578/// of creating cgroups and running them for [`HoldSpec::FULL`].
579pub fn execute_defs(ctx: &Ctx, defs: Vec<CgroupDef>) -> Result<AssertResult> {
580    execute_steps(ctx, vec![Step::with_defs(defs, HoldSpec::FULL)])
581}
582
583/// Block until the host freeze coordinator has ADOPTED its
584/// kernel-symbol accessor — signalled via the
585/// `SIGNAL_ACCESSOR_READY` wake byte →
586/// `accessor_ready_latch` (set by `hvc0_poll_loop`). A failure dump
587/// captured by a stall AFTER this returns renders real BPF map values
588/// instead of placeholders, because the coordinator's `owned_accessor`
589/// is adopted before the stall fires. Call this in a dump-asserting
590/// scenario before triggering its stall (e.g. before [`execute_steps`]
591/// with a `--stall-after` scheduler).
592///
593/// Guest-only: a no-op on the host (unit tests), where the latch is
594/// never armed. Warn-and-proceed on a 60s timeout — a never-adopted
595/// accessor is a worker failure, and surfacing it as a placeholder dump
596/// is more useful than blocking the test forever (mirrors the
597/// `wait_for_map_write` gate's soft-timeout policy). On that timeout the
598/// "renders real values" guarantee above does NOT hold: the wait returns
599/// without adoption and a subsequent stall may dump placeholders, which
600/// the test's post-VM dump assertions then surface as a failure.
601///
602/// The latch is sticky (level-triggered) and sets once, on the FIRST
603/// adoption. A re-init publish after a scheduler swap
604/// (`Op::ReplaceScheduler`) reuses the same latch, so a later
605/// `await_accessor_ready` returns immediately and does NOT re-synchronise
606/// on the post-swap adoption — today's only caller gates the first stall,
607/// before any swap.
608pub fn await_accessor_ready() {
609    if guest_comms::is_guest() {
610        let latch = crate::vmm::rust_init::accessor_ready_latch();
611        if !latch.wait_timeout(Duration::from_secs(60)) {
612            tracing::warn!(
613                "await_accessor_ready timed out after 60s — host freeze \
614                 coordinator did not signal accessor adoption; a dump from a \
615                 stall after this point may render placeholder map values"
616            );
617        }
618    }
619}
620
621/// Execute a sequence of steps against the given context.
622///
623/// Convenience wrapper around [`execute_steps_with`] that passes
624/// `None` for checks, falling back to `ctx.assert`. Use
625/// [`execute_steps_with`] when you need to override `ctx.assert`.
626pub fn execute_steps(ctx: &Ctx, steps: Vec<Step>) -> Result<AssertResult> {
627    execute_steps_with(ctx, steps, None)
628}
629
630/// Execute a [`Backdrop`](backdrop::Backdrop) + Steps sequence
631/// against the given context.
632///
633/// The Backdrop declares persistent scenario-wide state
634/// (long-running payloads, cgroups referenced by many Steps) while
635/// Steps express bounded per-phase behavior. The runtime sets up
636/// the Backdrop before the first Step, runs the Step sequence
637/// with per-Step teardown (cgroups removed, workload handles
638/// collected, payload handles killed at step boundary), and tears
639/// the Backdrop down at the end.
640pub fn execute_scenario(
641    ctx: &Ctx,
642    backdrop: backdrop::Backdrop,
643    steps: Vec<Step>,
644) -> Result<AssertResult> {
645    execute_scenario_with(ctx, backdrop, steps, None)
646}
647
/// [`execute_scenario`] with an explicit
/// [`Assert`](crate::assert::Assert) override — the Backdrop
/// equivalent of [`execute_steps_with`].
///
/// When `checks` is `Some`, it is used for worker checks in place of
/// `ctx.assert`; when `None`, `ctx.assert` is used. All work is
/// delegated to the internal `run_scenario` driver.
pub fn execute_scenario_with(
    ctx: &Ctx,
    backdrop: backdrop::Backdrop,
    steps: Vec<Step>,
    checks: Option<&crate::assert::Assert>,
) -> Result<AssertResult> {
    run_scenario(ctx, backdrop, steps, checks)
}
659
660/// Execute steps with an explicit [`Assert`](crate::assert::Assert) for
661/// worker checks. When `checks` is `Some`, it overrides `ctx.assert`.
662/// When `None`, uses `ctx.assert` (the merged three-layer config).
663///
664/// Thin wrapper around [`execute_scenario_with`] with an empty
665/// [`Backdrop`](backdrop::Backdrop) — every Step's effects
666/// (cgroups, workloads, payloads) tear down at the step boundary.
667pub fn execute_steps_with(
668    ctx: &Ctx,
669    steps: Vec<Step>,
670    checks: Option<&crate::assert::Assert>,
671) -> Result<AssertResult> {
672    execute_scenario_with(ctx, backdrop::Backdrop::new(), steps, checks)
673}
674
675/// Compute the union of cgroup v2 controllers required by a
676/// Backdrop and Step sequence. Walks every [`CgroupDef`] declaration
677/// and every [`Op`] variant, returning the smallest set of
678/// controllers that must be enabled in `cgroup.subtree_control` for
679/// the scenario's per-knob writes to land.
680///
681/// Mapping:
682/// - [`CgroupDef::cpuset`] / [`CgroupDef::cpuset_mems`] → `Controller::Cpuset`
683/// - [`CgroupDef::cpu`] → `Controller::Cpu`
684/// - [`CgroupDef::memory`] → `Controller::Memory`
685/// - [`CgroupDef::pids`] → `Controller::Pids`
686/// - [`CgroupDef::io`] → `Controller::Io`
687/// - [`Op::SetCpuset`] / [`Op::ClearCpuset`] / [`Op::SwapCpusets`] /
688///   [`Op::SetAffinity`] → `Controller::Cpuset`
689/// - Every other [`Op`] variant ([`Op::FreezeCgroup`],
690///   [`Op::AddCgroup`], [`Op::Spawn`], [`Op::MoveAllTasks`], etc.)
691///   touches cgroup-core knobs (`cgroup.freeze`, `cgroup.procs`,
692///   `mkdir`/`rmdir`) which are ungated by any controller and
693///   contribute nothing to this set.
694///
695/// Returning the SMALLEST set lets a test that intentionally
696/// requires the absence of a controller (e.g. testing behavior on
697/// a kernel without `+cpu`) get an empty subtree_control write.
698fn required_controllers(
699    ctx: &Ctx,
700    backdrop: &backdrop::Backdrop,
701    steps: &[Step],
702) -> BTreeSet<crate::cgroup::Controller> {
703    use crate::cgroup::Controller;
704    fn absorb_def(set: &mut BTreeSet<Controller>, def: &CgroupDef) {
705        if def.cpuset.is_some() || def.cpuset_mems.is_some() {
706            set.insert(Controller::Cpuset);
707        }
708        if def.cpu.is_some() {
709            set.insert(Controller::Cpu);
710        }
711        if def.memory.is_some() {
712            set.insert(Controller::Memory);
713        }
714        if def.io.is_some() {
715            set.insert(Controller::Io);
716        }
717        if def.pids.is_some() {
718            set.insert(Controller::Pids);
719        }
720    }
721    fn absorb_op(set: &mut BTreeSet<Controller>, op: &Op) {
722        if matches!(
723            op,
724            Op::SetCpuset { .. }
725                | Op::ClearCpuset { .. }
726                | Op::SwapCpusets { .. }
727                | Op::SetAffinity { .. }
728        ) {
729            set.insert(Controller::Cpuset);
730        }
731        // AddCgroupDef carries a full CgroupDef whose knobs may
732        // require any of the same controllers absorb_def covers. The
733        // op-applied def goes through apply_setup at op-execute time,
734        // which writes to those controller files; the parent's
735        // subtree_control must already have the controllers enabled
736        // by then, so absorb the def's needs into the pre-scenario
737        // controller setup the same way step-local CgroupDefs do.
738        if let Op::AddCgroupDef { def } = op {
739            absorb_def(set, def);
740        }
741    }
742    let mut set = BTreeSet::new();
743    for def in &backdrop.cgroups {
744        absorb_def(&mut set, def);
745    }
746    for op in &backdrop.ops {
747        absorb_op(&mut set, op);
748    }
749    for step in steps {
750        for def in step.setup.resolve(ctx) {
751            absorb_def(&mut set, &def);
752        }
753        for op in &step.ops {
754            absorb_op(&mut set, op);
755        }
756    }
757    set
758}
759
760/// Validate every step hold spec and reject scheduler-kind Backdrop
761/// payloads up front so failures surface at the declaration, before
762/// any runtime state is created.
763fn validate_scenario_inputs(backdrop: &backdrop::Backdrop, steps: &[Step]) -> Result<()> {
764    // Validate every step's hold spec up front so a typo doesn't
765    // reach `Duration::from_secs_f64(NaN)` / `thread::sleep(ZERO)` /
766    // a no-yield Loop busy-wait after ops have already been applied.
767    for (i, step) in steps.iter().enumerate() {
768        if let Err(reason) = step.hold.validate() {
769            anyhow::bail!("step {i} hold validation: {reason}");
770        }
771    }
772    // Validate Backdrop payloads before creating any runtime state.
773    // Only binary payloads can be spawned by Op::RunPayload, which
774    // is what the Backdrop setup uses under the hood. Reject
775    // scheduler-kind payloads here so the failure surface is the
776    // Backdrop declaration, not a mid-scenario spawn error after
777    // cgroups have already been created.
778    for p in &backdrop.payloads {
779        if p.is_scheduler() {
780            anyhow::bail!(
781                "Backdrop::push_payload received scheduler-kind Payload '{}' — \
782                 only PayloadKind::Binary payloads run in the Backdrop; \
783                 place scheduler-kind payloads on the #[ktstr_test(scheduler = ...)] \
784                 attribute instead",
785                p.name,
786            );
787        }
788    }
789    // Scheduler-kind payloads smuggled via Backdrop::push_op(Op::RunPayload { ... })
790    // would otherwise bypass the check above and only bail deep inside
791    // apply_ops. Reject them here with a Backdrop-specific error so
792    // the failure surface matches the declaration surface.
793    for op in &backdrop.ops {
794        if let Op::RunPayload { payload, .. } = op
795            && payload.is_scheduler()
796        {
797            anyhow::bail!(
798                "Backdrop::push_op(Op::RunPayload) received scheduler-kind Payload '{}' — \
799                 only PayloadKind::Binary payloads run in the Backdrop; \
800                 place scheduler-kind payloads on the #[ktstr_test(scheduler = ...)] \
801                 attribute instead",
802                payload.name,
803            );
804        }
805    }
806    Ok(())
807}
808
809/// Guest-side gate that blocks until the host's queued BPF map writes
810/// land, so the workload phase observes fresh map values.
811fn wait_for_host_map_write(ctx: &Ctx) {
812    // When a host-side BPF map write is configured the test framework
813    // sets `wait_for_map_write=true`; in that case block until the
814    // guest's `hvc0_poll_loop` observes
815    // [`crate::vmm::virtio_console::SIGNAL_BPF_WRITE_DONE`] (pushed by
816    // the host's `bpf-map-write` thread after every queued
817    // `bpf_map_write` lands) and fires the `bpf_map_write_done` latch.
818    // Without this gate the workload phase races against the host's
819    // map writes and may observe a stale BPF map value.
820    //
821    // Guest-only path. On the host (unit tests) the latch is never
822    // armed, so we skip the wait entirely. The 60 s timeout matches
823    // the bpf-map-write thread's combined phase 1 + phase 2 budget
824    // (30 s accessor init + 30 s map discovery in
825    // `freeze_coord::start_bpf_map_write`); a real timeout means the
826    // host failed to resolve a map. The scenario continues anyway
827    // (rather than `bail!`) because the legacy rendezvous also let
828    // the guest proceed under its own timeout, and a bail here would
829    // mask the underlying host-side resolution failure with a
830    // test-side `Err`.
831    if ctx.wait_for_map_write && guest_comms::is_guest() {
832        let latch = crate::vmm::rust_init::bpf_map_write_done_latch();
833        if !latch.wait_timeout(Duration::from_secs(60)) {
834            tracing::warn!(
835                "wait_for_map_write timed out after 60s — host bpf-map-write \
836                 thread may have failed to resolve a queued map; proceeding \
837                 with the workload regardless"
838            );
839        }
840    }
841}
842
843/// Run persistent Backdrop setup. `Ok(None)` on success; `Ok(Some(r))`
844/// carries the failure AssertResult when setup errors so the caller
845/// returns it directly.
846fn run_backdrop_setup<'a>(
847    ctx: &'a Ctx,
848    backdrop: &backdrop::Backdrop,
849    backdrop_state: &mut BackdropState<'a>,
850    effective_checks: &crate::assert::Assert,
851    result: &AssertResult,
852) -> Result<Option<AssertResult>> {
853    // --- Backdrop setup (persistent) ---
854    // Run before the first Step. Cgroups + payloads declared on
855    // `backdrop` land in `backdrop_state` so they survive every
856    // Step's teardown. On error, drain Backdrop payload handles
857    // (metric emission) and propagate.
858    if backdrop.is_empty() {
859        return Ok(None);
860    }
861    let mut step_staging = StepState::empty(ctx);
862    let mut scratch = ScenarioState::new(&mut step_staging, backdrop_state);
863    let setup_res = scratch.with_target_backdrop(|s| {
864        // Order: cgroups → ops → payloads. CgroupDefs go first so
865        // a later `Op::add_cgroup` / `Op::run_payload_in_cgroup`
866        // can target cgroups that `apply_setup` just created.
867        // Payloads spawn last so `run_payload` resolving a cgroup
868        // placement lands inside a cgroup that either apply pass
869        // already built.
870        if !backdrop.cgroups.is_empty() {
871            apply_setup(ctx, s, &backdrop.cgroups)?;
872        }
873        // Raw ops: typically `Op::AddCgroup` for empty move-target
874        // cgroups (can't be expressed via CgroupDef because
875        // apply_setup forces a worker spawn), or placement-aware
876        // `Op::RunPayload` targeting a just-created backdrop
877        // cgroup.
878        if !backdrop.ops.is_empty() {
879            apply_ops(ctx, s, &backdrop.ops, false)?;
880        }
881        // Shorthand payloads: one Op::RunPayload per entry,
882        // inherited cgroup placement.
883        if !backdrop.payloads.is_empty() {
884            let ops: Vec<Op> = backdrop
885                .payloads
886                .iter()
887                .map(|p| Op::run_payload(p, Vec::<String>::new()))
888                .collect();
889            apply_ops(ctx, s, &ops, false)?;
890        }
891        Ok::<(), anyhow::Error>(())
892    });
893    if let Err(err) = setup_res {
894        // Scheduler crashed during backdrop setup (e.g. a worker
895        // tripped the BPF error before the first Step): SIGKILL the
896        // spawned workers up front so the collect reaps below aren't
897        // scheduling-gated behind fallback workers (see
898        // `sigkill_handles`). Gated on `scx_down` (crash-only) so a
899        // non-crash setup error keeps the graceful collect path.
900        if sched_crashed_unexpectedly() {
901            sigkill_handles(&backdrop_state.handles);
902            sigkill_handles(&step_staging.handles);
903        }
904        // Collect any workers that DID spawn before the failure
905        // so their stats reach the final result instead of being
906        // discarded by `WorkloadHandle::drop` (which SIGKILLs
907        // without gathering scheduler-side data). `collect_*`
908        // drain `payload_handles` internally, so the backdrop-
909        // and step-side payloads still get `.kill()` (SHM metric
910        // emission) on the error path.
911        //
912        // `with_target_backdrop` routes every target writer to
913        // the backdrop slot, so `step_staging` normally holds
914        // nothing — but collect defensively so a partial-failure
915        // path that leaks a non-backdrop write surfaces here
916        // rather than disappearing into `StepState::drop`.
917        let mut r = collect_backdrop(backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
918        let staging_result = collect_step(
919            &mut step_staging,
920            effective_checks,
921            ctx.topo,
922            ctx.cgroups,
923            // Defensive backdrop-staging collect: no step attribution.
924            None,
925        );
926        r.merge(staging_result);
927        r.merge(result.clone());
928        // step_staging's CgroupGroup RAII still drops here,
929        // removing any cgroups the failed Backdrop setup routed
930        // into step-local state.
931        r.record_fail(crate::assert::AssertDetail::new(
932            crate::assert::DetailKind::Other,
933            format!("Backdrop setup failed: {err:#}"),
934        ));
935        return Ok(Some(r));
936    }
937    // `step_staging` should not have accumulated anything
938    // because `with_target_backdrop` routed every target writer
939    // to the backdrop side. Collect any stray handles defensively
940    // before dropping so a future refactor that leaks a non-
941    // backdrop write here surfaces as a missed teardown rather
942    // than silently discarded state.
943    drain_all_payload_handles(&mut step_staging.payload_handles);
944    Ok(None)
945}
946
/// Phase number for the scenario Step at 0-based position `step_idx`.
///
/// Phase 0 is BASELINE, so Step `k` maps to phase `k + 1`. Indices
/// whose phase would not fit in a `u16` saturate at `u16::MAX`.
fn phase_step_index(step_idx: usize) -> u16 {
    match u16::try_from(step_idx) {
        // Fits in u16: shift to 1-based, clamping 65535 + 1 to MAX.
        Ok(zero_based) => zero_based.saturating_add(1),
        // Already past the encoding range.
        Err(_) => u16::MAX,
    }
}
955
956/// Build the inter-step scheduler-death failure result: collect
957/// backdrop workers, merge the accumulated result, record the
958/// `format_sched_died_after_step` detail.
959fn build_sched_died_after_step(
960    backdrop_state: &mut BackdropState<'_>,
961    result: AssertResult,
962    ctx: &Ctx,
963    effective_checks: &crate::assert::Assert,
964    step_idx: usize,
965    steps_len: usize,
966    scenario_start: std::time::Instant,
967) -> AssertResult {
968    // Collect backdrop-owned workload handles into the
969    // result before reporting the crash so whatever the
970    // persistent workers produced is still assertable.
971    let mut r = collect_backdrop(backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
972    r.merge(result);
973    r.record_fail(crate::assert::AssertDetail::new(
974        sched_died_detail_kind(),
975        crate::assert::format_sched_died_after_step(
976            step_idx,
977            steps_len,
978            scenario_start.elapsed().as_secs_f64(),
979        ),
980    ));
981    r
982}
983
984/// Build the step-error failure result: collect backdrop workers,
985/// merge the accumulated result, record the `step N failed` detail.
986fn build_step_failed(
987    backdrop_state: &mut BackdropState<'_>,
988    result: AssertResult,
989    ctx: &Ctx,
990    effective_checks: &crate::assert::Assert,
991    step_idx: usize,
992    err: &anyhow::Error,
993) -> AssertResult {
994    // Collect Backdrop-owned workload handles into a fresh
995    // result first, then merge the accumulated step result
996    // on top. `collect_backdrop` drains
997    // `backdrop_state.payload_handles` internally, so the
998    // backdrop-side payloads still get `.kill()` (metric
999    // emission) on the error path. Ordering mirrors the
1000    // scheduler-crash path above so detail order is
1001    // consistent across both Ok(failed) returns.
1002    let mut r = collect_backdrop(backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
1003    r.merge(result);
1004    r.record_fail(crate::assert::AssertDetail::new(
1005        crate::assert::DetailKind::Other,
1006        format!("step {step_idx} failed: {err:#}"),
1007    ));
1008    r
1009}
1010
1011/// Build the sched-died-during-hold failure result: collect backdrop
1012/// workers, merge the accumulated result, record the
1013/// `format_sched_died_during_workload` detail.
1014fn build_sched_died_during_hold(
1015    backdrop_state: &mut BackdropState<'_>,
1016    result: AssertResult,
1017    ctx: &Ctx,
1018    effective_checks: &crate::assert::Assert,
1019    scenario_start: std::time::Instant,
1020) -> AssertResult {
1021    let mut r = collect_backdrop(backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
1022    r.merge(result);
1023    r.record_fail(crate::assert::AssertDetail::new(
1024        sched_died_detail_kind(),
1025        crate::assert::format_sched_died_during_workload(scenario_start.elapsed().as_secs_f64()),
1026    ));
1027    r
1028}
1029
1030/// Emit the ScenarioEnd marker, run the final liveness check, tear the
1031/// Backdrop down, and fold in a sched-died detail when the scheduler
1032/// died after the last hold.
1033fn finish_scenario(
1034    ctx: &Ctx,
1035    backdrop_state: &mut BackdropState<'_>,
1036    effective_checks: &crate::assert::Assert,
1037    scenario_start: std::time::Instant,
1038    steps_len: usize,
1039    final_total_iterations: Option<(u64, u64)>,
1040    mut result: AssertResult,
1041) -> AssertResult {
1042    // ScenarioEnd marker. Routes through `send_scenario_end`
1043    // (virtio-console port-1 bulk channel).
1044    // Carries the last cleanly-completed step's COINCIDENT (elapsed_ms,
1045    // iterations) pair so the host can give the last step an
1046    // `iteration_rate` over a well-formed window. The elapsed
1047    // is the end-of-hold timestamp captured WITH the count inside
1048    // run_step — NOT recomputed here, which would include the
1049    // pause+collect teardown wall-time and under-report the rate. When
1050    // no step completed a hold (e.g. every step's scheduler died) the
1051    // pair is `None`: fall back to (now, 0), which the host skips
1052    // (0 baseline / e<=s).
1053    if guest_comms::is_guest() {
1054        let (end_elapsed_ms, end_iterations) = final_total_iterations
1055            .unwrap_or_else(|| (scenario_start.elapsed().as_millis() as u64, 0));
1056        crate::vmm::guest_comms::send_scenario_end(end_elapsed_ms, end_iterations);
1057    }
1058
1059    // Final liveness check. Live `crate::vmm::rust_init::sched_pid()`
1060    // read instead of `ctx.sched_pid` snapshot so a mid-scenario
1061    // Op::ReplaceScheduler swap reflects the new pid here too.
1062    // sched_pid() == None ⇒ no scheduler configured (kernel-default
1063    // path) OR Op::DetachScheduler cleared it; no liveness to
1064    // report on either case.
1065    let sched_dead = crate::vmm::rust_init::sched_pid()
1066        .is_some_and(|pid| !process_alive(pid) || dispatch::scx_down());
1067
1068    // Scheduler died after the last hold: SIGKILL the backdrop
1069    // workers up front so the teardown reap below isn't
1070    // scheduling-gated behind still-spinning workers that fell back
1071    // to the builtin scheduler (see `sigkill_handles`).
1072    if sched_dead {
1073        sigkill_handles(&backdrop_state.handles);
1074    }
1075
1076    // --- Backdrop teardown ---
1077    let backdrop_result = collect_backdrop(backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
1078    result.merge(backdrop_result);
1079
1080    if sched_dead {
1081        result.record_fail(crate::assert::AssertDetail::new(
1082            sched_died_detail_kind(),
1083            crate::assert::format_sched_died_after_all_steps(
1084                steps_len,
1085                scenario_start.elapsed().as_secs_f64(),
1086            ),
1087        ));
1088    }
1089
1090    result
1091}
1092
1093/// Internal driver: runs Backdrop setup, the Step loop with
1094/// per-Step teardown, and final Backdrop teardown.
1095fn run_scenario(
1096    ctx: &Ctx,
1097    backdrop: backdrop::Backdrop,
1098    steps: Vec<Step>,
1099    checks: Option<&crate::assert::Assert>,
1100) -> Result<AssertResult> {
1101    validate_scenario_inputs(&backdrop, &steps)?;
1102    let effective_checks = checks.unwrap_or(&ctx.assert);
1103
1104    // Enable the controllers this scenario actually needs in
1105    // `cgroup.subtree_control` BEFORE any cgroupfs writes land. The
1106    // union is computed from every CgroupDef and Op declared in the
1107    // backdrop+steps; tests that declare no controller-gated knobs
1108    // get an empty set (parent dir created, no subtree_control walk).
1109    let required = required_controllers(ctx, &backdrop, &steps);
1110    ctx.cgroups
1111        .setup(&required)
1112        .context("enable cgroup controllers in subtree_control")?;
1113
1114    let mut backdrop_state = BackdropState::empty(ctx);
1115    let mut result = AssertResult::pass();
1116
1117    // (scenario-relative elapsed_ms, cumulative worker iteration count)
1118    // at the END of the most-recently completed step, captured
1119    // coincidently inside `run_step` while that step's workers are still
1120    // alive. Carries the LAST step's pair out of the loop so the
1121    // `ScenarioEnd` frame can supply the final step's `iteration_rate`
1122    // right boundary. `None` until the first step completes a
1123    // hold cleanly.
1124    let mut final_total_iterations: Option<(u64, u64)> = None;
1125
1126    let scenario_start = std::time::Instant::now();
1127
1128    // ScenarioStart marker. `is_guest` short-circuits in host
1129    // contexts (unit tests) where the bulk port and SHM ring are
1130    // both absent and `send_scenario_start` would log a no-op warning.
1131    if guest_comms::is_guest() {
1132        crate::vmm::guest_comms::send_scenario_start();
1133    }
1134
1135    wait_for_host_map_write(ctx);
1136
1137    if let Some(r) = run_backdrop_setup(
1138        ctx,
1139        &backdrop,
1140        &mut backdrop_state,
1141        effective_checks,
1142        &result,
1143    )? {
1144        return Ok(r);
1145    }
1146
1147    // --- Step loop with per-Step teardown ---
1148    for (step_idx, step) in steps.iter().enumerate() {
1149        // Check scheduler liveness between steps (skip before first).
1150        // Live `crate::vmm::rust_init::sched_pid()` read instead of
1151        // `ctx.sched_pid` snapshot so a mid-scenario
1152        // `Op::ReplaceScheduler` swap is reflected — the swap
1153        // dispatcher publishes the new child's pid to `SCHED_PID`
1154        // via the `SCHED_PID.store` in `try_spawn_scheduler`, and
1155        // this check then observes the new pid's liveness (not the
1156        // dead boot pid). `None` means
1157        // either no scheduler was configured at boot or
1158        // `Op::DetachScheduler` cleared the pid; the liveness probe
1159        // cannot meaningfully report on a pid that doesn't exist.
1160        if step_idx > 0
1161            && let Some(pid) = crate::vmm::rust_init::sched_pid()
1162            && (!process_alive(pid) || dispatch::scx_down())
1163        {
1164            // Scheduler died between steps: the kernel has disabled
1165            // sched_ext and the backdrop workers have fallen back to
1166            // the builtin scheduler. SIGKILL them up front so the
1167            // collect reap below isn't scheduling-gated behind
1168            // still-spinning workers (see `sigkill_handles`).
1169            sigkill_handles(&backdrop_state.handles);
1170            return Ok(build_sched_died_after_step(
1171                &mut backdrop_state,
1172                result,
1173                ctx,
1174                effective_checks,
1175                step_idx,
1176                steps.len(),
1177                scenario_start,
1178            ));
1179        }
1180
1181        let mut step_state = StepState::empty(ctx);
1182        let mut sched_died_during_hold = false;
1183        // Publish the 1-indexed phase number for this Step so the
1184        // freeze-coordinator periodic-capture path and the on-demand
1185        // Op::CaptureSnapshot / Op::WatchSnapshot apply arms all
1186        // stamp the captures they take with the correct scenario
1187        // phase. The 1-indexed encoding (scenario Step k -> phase
1188        // k + 1) reserves phase 0 for the pre-first-Step BASELINE
1189        // settle window. `Release` pairs with the consumers'
1190        // `Acquire` load so a sample stamped with this value
1191        // happens-after any state the Step has set up before
1192        // calling run_step.
1193        let phase_step_index = phase_step_index(step_idx);
1194        ctx.current_step
1195            .store(phase_step_index, std::sync::atomic::Ordering::Release);
1196        // Broadcast the phase to this handle's backdrop (persistent)
1197        // workers so each drains its per-phase PhaseSlice at this
1198        // boundary. Step-local pools are NOT bumped here — they
1199        // re-spawn per step and get per-phase attribution via
1200        // collect_step's step_index instead.
1201        for (_, h) in &backdrop_state.handles {
1202            h.set_phase_epoch(u32::from(phase_step_index));
1203        }
1204        // Install the assert-side phase guard for the scenario
1205        // driver's thread for the duration of this Step. Every
1206        // AssertDetail / PassDetail / InfoNote constructed under
1207        // the run_step call below auto-stamps its `phase` field
1208        // with "Step[<step_idx>]" via the thread-local snapshot
1209        // in `crate::assert::current_phase_label`. On Drop the
1210        // prior label is restored (BASELINE outside any Step), so
1211        // assertions evaluated post-loop (e.g. at scenario
1212        // teardown) stamp with the right outer scope.
1213        let _phase_guard = crate::assert::PhaseGuard::install_step(step_idx as u16);
1214        let step_res = run_step(
1215            ctx,
1216            step,
1217            step_idx,
1218            &mut step_state,
1219            &mut backdrop_state,
1220            scenario_start,
1221            effective_checks,
1222            &mut sched_died_during_hold,
1223            &mut final_total_iterations,
1224        );
1225
1226        // Close this step's hold window for backdrop workers: write the
1227        // inter-step gap sentinel so work done between this StepEnd and
1228        // the next StepStart is NOT folded into step `step_idx`'s slice
1229        // (matching the host's hold-only window clamp, and step-local
1230        // workers, which do not exist during the inter-step gap).
1231        for (_, h) in &backdrop_state.handles {
1232            h.set_phase_epoch(u32::MAX);
1233        }
1234
1235        // Scheduler died during this step's hold: the kernel has
1236        // disabled sched_ext and the workers (step and backdrop) have
1237        // fallen back to the builtin scheduler. If CPU-bound they
1238        // would CFS-starve the per-worker reap in the collect calls
1239        // below, so SIGKILL them all up front — the reaps then find
1240        // already-exiting workers (see `sigkill_handles`). Gated on
1241        // the death flag so the normal teardown keeps its graceful
1242        // cooperative-stop + report-collection path.
1243        if sched_died_during_hold || sched_crashed_unexpectedly() {
1244            sigkill_handles(&step_state.handles);
1245            sigkill_handles(&backdrop_state.handles);
1246        }
1247
1248        if guest_comms::is_guest() {
1249            crate::vmm::guest_comms::send_scenario_pause();
1250        }
1251
1252        let step_result = collect_step(
1253            &mut step_state,
1254            effective_checks,
1255            ctx.topo,
1256            ctx.cgroups,
1257            // The SAME 1-indexed value stamped onto this step's StepStart
1258            // frames (build_stimulus), so the guest per_cgroup keys on the
1259            // identical step_index the host rebuilds buckets under.
1260            Some(phase_step_index),
1261        );
1262        result.merge(step_result);
1263
1264        // A step-level error is converted into a failure on the
1265        // accumulated result after teardown has run so every step
1266        // boundary leaves clean state behind even on failure. The
1267        // caller keeps the prior-steps' merged AssertResult plus
1268        // the error context as a detail, instead of an opaque Err
1269        // that discards everything.
1270        if let Err(err) = step_res {
1271            // Scheduler may have crashed between the pre-collect gate
1272            // above and here (e.g. a non-crash step error, then scx went
1273            // down during `collect_step`): SIGKILL the backdrop workers
1274            // up front so this collect isn't scheduling-gated behind the
1275            // fallback pool (see `sigkill_handles`). Crashed-only gate so
1276            // a plain step error with a live scheduler keeps the graceful
1277            // cooperative-stop path.
1278            if sched_crashed_unexpectedly() {
1279                sigkill_handles(&backdrop_state.handles);
1280            }
1281            return Ok(build_step_failed(
1282                &mut backdrop_state,
1283                result,
1284                ctx,
1285                effective_checks,
1286                step_idx,
1287                &err,
1288            ));
1289        }
1290
1291        // Scheduler exited during the step's hold-period sleep —
1292        // [`run_step`] cut the hold short and stamped
1293        // `sched_died_during_hold`. Emit the in-step
1294        // sched-died message before continuing to the next step
1295        // boundary; otherwise the post-loop probe would fire after
1296        // the full scenario duration and stamp a misleading elapsed
1297        // time. Same Backdrop-then-step merge order as the
1298        // inter-step path above so detail ordering stays consistent.
1299        if sched_died_during_hold {
1300            return Ok(build_sched_died_during_hold(
1301                &mut backdrop_state,
1302                result,
1303                ctx,
1304                effective_checks,
1305                scenario_start,
1306            ));
1307        }
1308    }
1309
1310    Ok(finish_scenario(
1311        ctx,
1312        &mut backdrop_state,
1313        effective_checks,
1314        scenario_start,
1315        steps.len(),
1316        final_total_iterations,
1317        result,
1318    ))
1319}
1320
1321/// Sleep up to `dur`, returning early if `sched_pid` exits.
1322///
1323/// Returns `true` the first time the scheduler is observed dead,
1324/// `false` if the full duration elapsed with no death observed.
1325/// When `sched_pid` is `None` (kernel-default scheduling, no
1326/// scheduler process to monitor), behaves exactly like
1327/// [`thread::sleep`] and always returns `false`.
1328///
1329/// Implementation uses `pidfd_open(2)` + `epoll_wait` so the waiter
1330/// is kernel-blocked on the pidfd until either the scheduler exits
1331/// (pidfd becomes readable) or the per-step hold elapses. This
1332/// drops crash-detection latency from one poll-tick (the previous
1333/// 100 ms cadence) to ~0: the kernel wakes the epoll waiter as
1334/// soon as the task transitions to EXIT_ZOMBIE. Mirrors
1335/// [`crate::scenario::payload_run`]'s `wait_with_deadline` shape.
1336/// Minimum kernel: Linux 5.3.
1337///
1338/// Deadline honoring: the `epoll_wait` timeout is re-derived from
1339/// `saturating_duration_since` each iteration so `EINTR` restarts
1340/// narrow the remaining window rather than extending it.
1341///
1342/// Failure handling: if `pidfd_open` returns `ESRCH`, the scheduler
1343/// is already gone — return `true` immediately without sleeping. Any
1344/// other failure mode (pidfd_open non-ESRCH, epoll_create1,
1345/// epoll_ctl ADD, EpollTimeout::try_from, epoll_wait) panics with an
1346/// operator-actionable message. Polling fallbacks were removed per
1347/// the project-wide "no polling fallbacks for evented paths" rule:
1348/// pidfd_open has shipped since Linux 5.3 and epoll has been
1349/// universally available for longer, so a failure here indicates a
1350/// catastrophic environment defect (memory pressure exhausting fds,
1351/// kernel feature compiled out) rather than a recoverable transient.
1352/// A loud panic surfaces the defect immediately; the prior silent
1353/// sleep+probe fallback masked it as test flakiness.
1354///
1355/// Scheduling jitter under load can leave the actual elapsed time
1356/// modestly above `dur`.
1357/// Loud panic on env- or code-defect failures inside [`hold_or_sched_died`].
1358/// Centralised so every site emits the same module-qualified prefix
1359/// `ktstr::scenario::hold_or_sched_died` — operator can grep that exact
1360/// string to land at the panic source. `op` names the failed primitive,
1361/// `pid` carries the in-scope pid for cross-reference with /proc, `err`
1362/// renders the underlying errno or nix error, `advice` is the one-line
1363/// remediation classified by failure class (env vs framework code defect).
1364#[cold]
1365#[track_caller]
1366fn panic_evented_hold_defect(
1367    op: &str,
1368    pid: libc::pid_t,
1369    err: impl std::fmt::Display,
1370    advice: &str,
1371) -> ! {
1372    panic!("ktstr::scenario::hold_or_sched_died: {op} failed (pid={pid}): {err} — {advice}");
1373}
1374
1375/// True when a scheduler is still expected running (`sched_pid` set) AND
1376/// sched_ext has gone down (`disabling`/`disabled`) — an unexpected
1377/// crash/unregister of the scheduler under test, detected via the
1378/// probe-independent [`dispatch::scx_down`] sysfs read. Unlike the BPF
1379/// err-exit latch (whose host mirror is only populated in the
1380/// auto-repro/dump VM, never the primary VM that runs the test), this is
1381/// live in the PRIMARY VM. The `sched_pid` guard keeps a clean
1382/// [`Op::DetachScheduler`] — which clears the pid before the state
1383/// settles to `disabled` — off the SIGKILL fast-path. Used by the
1384/// teardown gates that don't already hold the scheduler pid in scope.
1385fn sched_crashed_unexpectedly() -> bool {
1386    crate::vmm::rust_init::sched_pid().is_some() && dispatch::scx_down()
1387}
1388
1389fn hold_or_sched_died(dur: Duration, sched_pid: Option<libc::pid_t>) -> bool {
1390    use crate::probe::process::{SchedExitKind, sched_exit_kind};
1391    use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
1392    use std::os::fd::{AsFd, FromRawFd, OwnedFd};
1393
1394    // The BPF err-exit latch (read via `sched_exit_kind`) flips at the
1395    // error-class sched_ext exit — i.e. at the crash, before the crashed
1396    // scheduler PROCESS exits (a scheduler like scx_lavd observes the
1397    // disable through its ~1s userspace poll loop, then flushes its dump
1398    // and exits, so its process exit trails the crash by ~that long). Poll
1399    // it ALONGSIDE the pidfd so a crash aborts the hold at ~crash time, not
1400    // at the process exit. The pidfd remains the backstop for
1401    // clean/non-error process exits. 100ms is negligible vs the crash + the
1402    // scheduler's exit latency.
1403    const ERR_EXIT_POLL: Duration = Duration::from_millis(100);
1404    let crashed = || matches!(sched_exit_kind(), SchedExitKind::Crashed);
1405
1406    if dur.is_zero() {
1407        return crashed()
1408            || (sched_pid.is_some() && dispatch::scx_down())
1409            || sched_pid.is_some_and(|pid| !process_alive(pid));
1410    }
1411    let deadline = std::time::Instant::now() + dur;
1412    let Some(pid) = sched_pid else {
1413        // No scheduler pid (host-only run, or the pid was not recorded):
1414        // no pidfd to wait on, but the err-exit latch still fires on a
1415        // crash — poll it in short intervals instead of sleeping blind
1416        // for the whole window.
1417        loop {
1418            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
1419            if remaining.is_zero() {
1420                return crashed();
1421            }
1422            if crashed() {
1423                return true;
1424            }
1425            thread::sleep(remaining.min(ERR_EXIT_POLL));
1426        }
1427    };
1428
1429    // `pidfd_open(pid, 0)`: returns an fd that becomes readable when
1430    // the pid exits. Only meaningful on a thread-group leader, which
1431    // every `sched_pid` already is (it is the scheduler binary's
1432    // top-level pid as recorded in `Ctx::sched_pid`). No
1433    // `PIDFD_NONBLOCK` flag — epoll is the gate.
1434    let pidfd_raw = unsafe { libc::syscall(libc::SYS_pidfd_open, pid, 0i32) };
1435    if pidfd_raw < 0 {
1436        let err = std::io::Error::last_os_error();
1437        if err.raw_os_error() == Some(libc::ESRCH) {
1438            // pidfd_open observed the pid as gone before we could
1439            // even attach a waiter — sched is already dead.
1440            return true;
1441        }
1442        // pidfd_open shipped unconditionally in Linux 5.3 and ktstr's
1443        // kernel floor is well above that. A non-ESRCH failure (ENOMEM,
1444        // ENFILE, EPERM) means the test environment is broken in a way
1445        // polling cannot recover from. Panic loudly so the operator
1446        // sees the env defect instead of silently losing sched-died
1447        // detection for the rest of the hold.
1448        panic_evented_hold_defect(
1449            "pidfd_open",
1450            pid,
1451            format_args!("{err} (errno {:?})", err.raw_os_error()),
1452            "pidfd_open is unconditional from Linux 5.3; failure on a \
1453             5.3+ kernel = env defect — check ulimit -n / memory pressure / \
1454             cgroup pids.max",
1455        );
1456    }
1457    // SAFETY: the syscall succeeded and returned a fresh fd; it is
1458    // not registered with any other owner.
1459    let pidfd: OwnedFd = unsafe { OwnedFd::from_raw_fd(pidfd_raw as i32) };
1460
1461    // epoll setup. EPOLL_CLOEXEC matches `wait_with_deadline` to
1462    // avoid leaking the epoll fd into any post-fork descendant.
1463    let epoll = match Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC) {
1464        Ok(e) => e,
1465        Err(e) => {
1466            let fd = std::os::fd::AsRawFd::as_raw_fd(&pidfd);
1467            panic_evented_hold_defect(
1468                "epoll_create1(EPOLL_CLOEXEC)",
1469                pid,
1470                format_args!("{e} (pidfd={fd})"),
1471                "epoll has been universally available since 2.6; failure = \
1472                 env defect — check ulimit -n and CONFIG_EPOLL",
1473            );
1474        }
1475    };
1476    // `data` field is unused — we only ever watch one fd. The add()
1477    // syscall still needs an `EpollEvent` with populated events.
1478    let event = EpollEvent::new(EpollFlags::EPOLLIN, 0);
1479    if let Err(e) = epoll.add(pidfd.as_fd(), event) {
1480        panic_evented_hold_defect(
1481            "epoll_ctl(ADD)",
1482            pid,
1483            e,
1484            "epoll_ctl(ADD) on a freshly-opened pidfd should never fail \
1485             in a healthy kernel; documented errors (EBADF/EEXIST/ENOMEM) \
1486             are env defects — likely fd exhaustion or memory pressure",
1487        );
1488    }
1489
1490    let mut events = [EpollEvent::empty()];
1491    loop {
1492        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
1493        if remaining.is_zero() {
1494            // Hold elapsed without a wakeup. Re-probe ALL signals to
1495            // catch a race where the pid exited, the err-exit latch
1496            // flipped, or sched_ext went down (sysfs) between the last
1497            // `epoll_wait` return and the deadline check (e.g. during
1498            // EINTR re-entry). `pid` is `Some` here, so the
1499            // scheduler-expected guard for `scx_down` is satisfied.
1500            return crashed() || dispatch::scx_down() || !process_alive(pid);
1501        }
1502
1503        // Abort at the crash, not at the lingering process exit. Two
1504        // probe-independent signals fire well before a crashed
1505        // scheduler's process exits: the BPF err-exit latch (live only
1506        // in the auto-repro/dump VM) and `/sys/kernel/sched_ext/state`
1507        // going `disabling`/`disabled` (live in the PRIMARY VM, where the
1508        // latch mirror is never populated — this is what makes a primary
1509        // VM crash abort the hold at crash time rather than ~1s later at
1510        // process exit). `pid` is `Some` here, so the
1511        // scheduler-expected guard for `scx_down` holds.
1512        if crashed() || dispatch::scx_down() {
1513            return true;
1514        }
1515
1516        // Cap the epoll timeout at the latch poll interval so the latch
1517        // is re-checked every `ERR_EXIT_POLL`, not only when the pidfd
1518        // becomes readable. `PollTimeout` (aliased as `EpollTimeout`)
1519        // stores the value as `i32`; single-pass clamp via
1520        // `u128 → i32::MAX` so a large remainder saturates at the max
1521        // accepted value instead of overflowing through the u32.
1522        let poll = remaining.min(ERR_EXIT_POLL);
1523        let ms_i32 = poll.as_millis().min(i32::MAX as u128) as i32;
1524        let timeout_param = match EpollTimeout::try_from(ms_i32) {
1525            Ok(t) => t,
1526            Err(e) => {
1527                // ms_i32 was clamped to the i32 range above so
1528                // EpollTimeout::try_from (which accepts i32) cannot
1529                // overflow. Reaching this arm means the EpollTimeout API
1530                // changed shape — code defect, not env transient.
1531                panic_evented_hold_defect(
1532                    "EpollTimeout::try_from",
1533                    pid,
1534                    format_args!("{e} (input={ms_i32})"),
1535                    "input was pre-clamped to fit i32; failure indicates an \
1536                     upstream nix EpollTimeout API change requiring code update",
1537                );
1538            }
1539        };
1540
1541        match epoll.wait(&mut events, timeout_param) {
1542            Ok(0) => {
1543                // Poll-interval timeout, no pidfd event. Loop back to
1544                // re-check the err-exit latch and the deadline at the
1545                // top of the loop.
1546            }
1547            Ok(_) => {
1548                // pidfd became readable — task transitioned to
1549                // EXIT_ZOMBIE. Scheduler is dead.
1550                return true;
1551            }
1552            Err(nix::errno::Errno::EINTR) => {
1553                // Signal interrupted the wait; loop and re-compute
1554                // the remaining window.
1555            }
1556            Err(e) => {
1557                panic_evented_hold_defect(
1558                    "epoll_wait",
1559                    pid,
1560                    e,
1561                    "epoll_wait on a freshly-created epoll with a single \
1562                     valid pidfd cannot legitimately fail outside EINTR; \
1563                     documented errors (EBADF/EFAULT/EINVAL) are framework- \
1564                     internal memory-safety defects — investigate concurrent \
1565                     fd mutation or stack-frame corruption",
1566                );
1567            }
1568        }
1569    }
1570}
1571
1572/// Run a single step's setup + ops + hold against step-local state.
1573///
1574/// On error, the caller is expected to invoke `collect_step` for
1575/// per-step teardown (which runs regardless) and then propagate.
1576///
1577/// `sched_died_during_hold` is set to `true` when the hold-period
1578/// liveness poll observes the scheduler process exiting; the caller
1579/// uses this to emit [`crate::assert::format_sched_died_during_workload`]
1580/// instead of waiting for the post-loop probe to fire (which would
1581/// stamp the message with the full scenario duration even though
1582/// the death happened mid-step).
1583#[allow(clippy::too_many_arguments)]
1584fn run_step<'a>(
1585    ctx: &Ctx,
1586    step: &Step,
1587    step_idx: usize,
1588    step_state: &mut StepState<'a>,
1589    backdrop_state: &mut BackdropState<'a>,
1590    scenario_start: std::time::Instant,
1591    _effective_checks: &crate::assert::Assert,
1592    sched_died_during_hold: &mut bool,
1593    // Set to (scenario-relative elapsed_ms, cumulative worker iteration
1594    // count) sampled coincidently at this step's clean completion
1595    // Left unchanged on the sched-died early-return paths so
1596    // a failed step does not overwrite a prior step's good value with a
1597    // misleading post-death count.
1598    final_total_iterations: &mut Option<(u64, u64)>,
1599) -> Result<()> {
1600    let mut scenario = ScenarioState::new(step_state, backdrop_state);
1601
1602    // Any `?` out of apply_ops / apply_setup would bypass the
1603    // per-step teardown ordering; `drain_on_err!` kills payload
1604    // handles across step + backdrop (metric-emitting) before
1605    // propagating so a mid-scenario spawn failure still leaves a
1606    // usable sidecar.
1607    macro_rules! drain_on_err {
1608        ($scenario:expr, $e:expr) => {
1609            match $e {
1610                Ok(v) => v,
1611                Err(err) => {
1612                    $scenario.drain_all_payloads();
1613                    return Err(err);
1614                }
1615            }
1616        };
1617    }
1618
1619    match step.hold {
1620        HoldSpec::Loop { interval } => {
1621            // Setup runs once before the loop.
1622            if !step.setup.is_empty() {
1623                let defs = step.setup.resolve(ctx);
1624                drain_on_err!(scenario, apply_setup(ctx, &mut scenario, &defs));
1625            }
1626            // Emit the Loop step's start stimulus frame + resume, like
1627            // the non-Loop arm below. A Loop step is a phase like any
1628            // other: without its own start frame the synthesized
1629            // scenario-end terminal would pair against the
1630            // PRIOR step's frame and misattribute the entire Loop
1631            // window's throughput to the wrong step. The resume
1632            // re-enables periodic captures for the loop body and matches
1633            // the prior step's `run_steps` pause (same pause/resume
1634            // pairing the non-Loop arm uses; the first-step unmatched
1635            // resume is already the existing pattern).
1636            if guest_comms::is_guest() {
1637                let payload = build_stimulus(&scenario_start, step_idx, &step.ops, &scenario);
1638                crate::vmm::guest_comms::send_stimulus(zerocopy::IntoBytes::as_bytes(&payload));
1639                crate::vmm::guest_comms::send_scenario_resume();
1640            }
1641            // Loop mode: apply ops repeatedly at interval until
1642            // the remaining scenario time is exhausted, or the
1643            // scheduler process exits — whichever fires first.
1644            let deadline = scenario_start + ctx.duration;
1645            while std::time::Instant::now() < deadline {
1646                drain_on_err!(scenario, apply_ops(ctx, &mut scenario, &step.ops, true));
1647                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
1648                // Live `sched_pid()` read so a mid-loop
1649                // Op::ReplaceScheduler swap is watched at the NEW
1650                // pid, not the stale boot snapshot in ctx.
1651                if hold_or_sched_died(remaining.min(interval), crate::vmm::rust_init::sched_pid()) {
1652                    *sched_died_during_hold = true;
1653                    return Ok(());
1654                }
1655            }
1656        }
1657        _ => {
1658            // Ops first (e.g. parent cgroup creation), then
1659            // CgroupDef setup (children with workers).
1660            //
1661            // Footgun: a workload-producing `CgroupDef` in
1662            // `step.setup` is invisible to `step.ops` operating on
1663            // it, because step.ops runs BEFORE apply_setup creates
1664            // the cgroup AND registers the WorkloadHandle in
1665            // `state.all_handles()`. The failure mode is per-Op:
1666            // `Op::MoveAllTasks` against the absent source filters
1667            // `state.all_handles()` by name, finds zero matches,
1668            // iterates an empty `pid_batches`, and exits with no
1669            // work done — silent (no error surfaced to the
1670            // operator); `Op::CaptureCgroupProcs` reading the
1671            // missing `cgroup.procs` via `read_procs` returns the
1672            // ENOENT `with_context` and propagates `?`, bailing the
1673            // ops phase — loud (the test crashes with the cgroup
1674            // name in the error). Tests that need a freshly-spawned
1675            // worker pool to feed step ops must declare the
1676            // producing `CgroupDef` in the Backdrop (which runs
1677            // before any Step) when `step.setup` is `Setup::Defs(_)`.
1678            // When `step.setup` is `Setup::Factory(_)` (runtime-
1679            // generated defs that can't be hoisted at edit time),
1680            // the only viable remediation is restructuring into
1681            // multiple Steps — producer factory in Step N's setup,
1682            // consumer ops in Step N+1.
1683            drain_on_err!(scenario, apply_ops(ctx, &mut scenario, &step.ops, false));
1684            if !step.setup.is_empty() {
1685                let defs = step.setup.resolve(ctx);
1686                drain_on_err!(scenario, apply_setup(ctx, &mut scenario, &defs));
1687            }
1688
1689            // Write stimulus event after applying ops. Routes through
1690            // `crate::vmm::guest_comms::send_stimulus` (virtio-console
1691            // port-1 bulk channel). `is_guest` keeps the
1692            // `build_stimulus` walk off the host where the write would
1693            // no-op.
1694            if guest_comms::is_guest() {
1695                let payload = build_stimulus(&scenario_start, step_idx, &step.ops, &scenario);
1696                crate::vmm::guest_comms::send_stimulus(zerocopy::IntoBytes::as_bytes(&payload));
1697            }
1698
1699            if guest_comms::is_guest() {
1700                crate::vmm::guest_comms::send_scenario_resume();
1701            }
1702            let hold_dur = match step.hold {
1703                HoldSpec::Frac(f) => Duration::from_secs_f64(ctx.duration.as_secs_f64() * f),
1704                HoldSpec::Fixed(d) => d,
1705                HoldSpec::Loop { .. } => unreachable!(),
1706            };
1707            let remaining = (scenario_start + ctx.duration)
1708                .saturating_duration_since(std::time::Instant::now());
1709            let hold_dur = hold_dur.min(remaining);
1710            // Live `sched_pid()` read — matches the loop arm above
1711            // so the hold watches the post-Op::ReplaceScheduler
1712            // pid, not the stale boot snapshot.
1713            if hold_or_sched_died(hold_dur, crate::vmm::rust_init::sched_pid()) {
1714                *sched_died_during_hold = true;
1715                return Ok(());
1716            }
1717        }
1718    }
1719
1720    // Capture the cumulative iteration count AND its scenario-relative
1721    // timestamp at this step's end, AT THE SAME INSTANT, while the
1722    // step's workers are still alive — `collect_step` tears them down
1723    // right after `run_step` returns, so this is the last moment a
1724    // step-inclusive sum is observable. `run_steps` forwards the LAST
1725    // step's (elapsed_ms, iterations) pair in the widened `ScenarioEnd`
1726    // frame so the final step's `iteration_rate` has a right boundary to
1727    // diff against. The elapsed MUST be sampled HERE,
1728    // coincident with the count: recomputing it at `ScenarioEnd` send
1729    // time would measure past `send_scenario_pause` + the last step's
1730    // `collect_step` teardown (100ms–seconds under contention),
1731    // inflating the rate's denominator with wall-time during which no
1732    // iterations accrued and systematically under-reporting the last
1733    // step's throughput (a rate consumer treats counter/duration with
1734    // coincident endpoints). Only reached on a clean hold (the
1735    // sched-died early returns above skip it — a failed run needs no
1736    // terminal rate); gated on `is_guest` so the host-side scenario walk
1737    // doesn't sum a handle set that never ran a workload.
1738    if guest_comms::is_guest() {
1739        let end_elapsed_ms = scenario_start.elapsed().as_millis() as u64;
1740        let end_iterations: u64 = scenario
1741            .all_handles()
1742            .flat_map(|(_, h)| h.snapshot_iterations())
1743            .sum();
1744        *final_total_iterations = Some((end_elapsed_ms, end_iterations));
1745
1746        // Emit a per-step StepEnd frame carrying this step's coincident
1747        // end-of-hold (elapsed_ms, total_iterations) and the SAME
1748        // 1-indexed step_index as its StepStart, so the host pairs
1749        // StepStart[k] -> StepEnd[k] for step-LOCAL throughput: each
1750        // step's OWN workers measured start-to-end, which —
1751        // unlike the cross-step StepStart[k] -> StepStart[k+1] delta —
1752        // does not read ~0 for workers respawned per step. build_stimulus
1753        // supplies the op/cgroup/worker fields + the 1-indexed
1754        // step_index; override its recomputed elapsed/iterations with the
1755        // values captured above so the StepEnd pair stays coincident with
1756        // `final_total_iterations` (no pause/teardown wall-time creep).
1757        let mut step_end = build_stimulus(&scenario_start, step_idx, &step.ops, &scenario);
1758        step_end.elapsed_ms = u32::try_from(end_elapsed_ms).unwrap_or(u32::MAX);
1759        step_end.total_iterations = end_iterations;
1760        guest_comms::send_step_end(zerocopy::IntoBytes::as_bytes(&step_end));
1761    }
1762
1763    Ok(())
1764}
1765
1766/// Build a StimulusPayload from the current scenario state (step + backdrop).
1767///
1768/// # step_idx u16 saturation
1769///
1770/// `step_idx` is a `usize` on the caller side but the wire
1771/// `StimulusPayload.step_index` is a `u16` — the slot is sized for
1772/// realistic scenarios (≤ 65 536 distinct indices, `0..=u16::MAX`).
1773/// Any `step_idx` > `u16::MAX as usize` is clamped to `u16::MAX` by
1774/// `to_u16` below, with a `tracing::warn!` that names the overflow.
1775/// Downstream consumers of the StepStart wire frame therefore see
1776/// every step past index `u16::MAX` collapsed onto the same
1777/// `step_index` value (`u16::MAX`) — the ordering is preserved for
1778/// the first 65 536 steps (indices `0..=u16::MAX`), but labels
1779/// saturate and become ambiguous once the scenario crosses the
1780/// boundary. Scenarios that need to distinguish individual steps
1781/// past `u16::MAX` must widen the wire schema field; the
1782/// saturating-clip preserves visible wake ordering at the cost of
1783/// individuality in the deep tail.
1784fn build_stimulus(
1785    scenario_start: &std::time::Instant,
1786    step_idx: usize,
1787    ops: &[Op],
1788    state: &ScenarioState<'_, '_>,
1789) -> StimulusPayload {
1790    let mut op_kinds: u32 = 0;
1791    for op in ops {
1792        op_kinds |= 1 << op.discriminant();
1793    }
1794
1795    let total_iterations: u64 = state
1796        .all_handles()
1797        .flat_map(|(_, h)| h.snapshot_iterations())
1798        .sum();
1799
1800    let cgroup_count = state.step.cgroups.names().len() + state.backdrop.cgroups.names().len();
1801    let worker_count = state.step.handles.len() + state.backdrop.handles.len();
1802
1803    // Saturate narrowing conversions for the wire schema: the
1804    // StimulusPayload fields are sized for realistic scenarios
1805    // (u32 ms, u16 counts) but `as u32` / `as u16` silently
1806    // wrap on overflow, poisoning downstream consumers. Log the
1807    // overflow so the operator sees which field exceeded its
1808    // bound and substitute MAX — clipped-high is a safer wire
1809    // value than silently wrapping to a small number.
1810    let to_u32 = |field: &str, v: u128| -> u32 {
1811        u32::try_from(v).unwrap_or_else(|_| {
1812            tracing::warn!(
1813                field,
1814                value = %v,
1815                "StimulusPayload field overflowed u32; saturating to u32::MAX",
1816            );
1817            u32::MAX
1818        })
1819    };
1820    let to_u16 = |field: &str, v: usize| -> u16 {
1821        u16::try_from(v).unwrap_or_else(|_| {
1822            tracing::warn!(
1823                field,
1824                value = v,
1825                "StimulusPayload field overflowed u16; saturating to u16::MAX",
1826            );
1827            u16::MAX
1828        })
1829    };
1830
1831    // Encode the 1-indexed phase number per the framework's
1832    // phase convention -- the BASELINE (pre-first-Step) window owns
1833    // 0, scenario Step k publishes k + 1. Saturate at u16::MAX
1834    // (rather than wrap) so a pathological 65k-step scenario still
1835    // produces a clipped-high value the host parser can recognise
1836    // instead of silently rolling over.
1837    let phase_step_index: u16 = u16::try_from(step_idx)
1838        .ok()
1839        .and_then(|i| i.checked_add(1))
1840        .unwrap_or_else(|| {
1841            tracing::warn!(
1842                field = "step_index",
1843                value = step_idx,
1844                "StimulusPayload step_index overflowed u16 after 1-indexed encoding; saturating to u16::MAX",
1845            );
1846            u16::MAX
1847        });
1848    StimulusPayload {
1849        elapsed_ms: to_u32("elapsed_ms", scenario_start.elapsed().as_millis()),
1850        step_index: phase_step_index,
1851        op_count: to_u16("op_count", ops.len()),
1852        op_kinds,
1853        cgroup_count: to_u16("cgroup_count", cgroup_count),
1854        worker_count: to_u16("worker_count", worker_count),
1855        total_iterations,
1856    }
1857}
1858
1859/// Validate that a MemPolicy's node set is consistent with the
1860/// cgroup's scenario intent — the cpuset the cgroup runs in and
1861/// the host topology.
1862///
1863/// # Empty-nodemask early return
1864///
1865/// Policies with no nodemask — [`MemPolicy::Default`] and
1866/// [`MemPolicy::Local`] — carry no node IDs to validate against,
1867/// so this function returns `Ok(())` unconditionally for them
1868/// (after the unknown-bit and mutual-exclusion flag guards run).
1869/// Every other variant — any variant carrying a nodemask,
1870/// currently [`MemPolicy::Bind`], [`MemPolicy::Preferred`],
1871/// [`MemPolicy::PreferredMany`], [`MemPolicy::Interleave`], and
1872/// [`MemPolicy::WeightedInterleave`] — reaches the cpuset /
1873/// host-topology coverage logic below.
1874///
1875/// # Why this is a scenario-intent check, not a kernel guard
1876///
1877/// ktstr writes `cpuset.cpus` on each cgroup but never writes
1878/// `cpuset.mems`, so `cpuset.mems` keeps its inherited default —
1879/// the permissive "all nodes" set in every ktstr deployment
1880/// shape (PID 1 inside the guest VM, cgroup root on the host).
1881/// The kernel's `set_mempolicy(2)` path always runs the policy's
1882/// nodemask through `mpol_set_nodemask` in `mm/mempolicy.c`, which
1883/// intersects it with the caller's `mems_allowed` before it is
1884/// stored on the task; because ktstr never narrows `mems_allowed`,
1885/// that intersection is an identity operation under ktstr's
1886/// deployment — the stored nodemask equals the one the caller
1887/// supplied, and the kernel never rejects or silently trims the
1888/// policy the way it would if `mems_allowed` were disjoint from
1889/// the requested set. Rejection of a mismatched policy is
1890/// therefore validator-only: if this function does not bail, the
1891/// policy lands on the syscall unchanged and `run_steps` commits
1892/// to running the worker with a misconfigured allocation target.
1893///
1894/// What the validator catches is a **scenario-design mismatch**:
1895/// you pinned CPUs on NUMA node X (via `CpusetSpec::Numa(X)`) but
1896/// asked the mempolicy to bind/prefer/interleave a disjoint node Y,
1897/// meaning the worker's compute is local to node X while its
1898/// allocations live on node Y — producing cross-socket traffic
1899/// that the test author almost certainly did not intend. Surface
1900/// the mismatch here before `run_steps` commits to the policy.
1901///
1902/// `MpolFlags::STATIC_NODES` is the rebind-behavior flag. Two
1903/// kernel sites encode the semantics: `mpol_set_nodemask` in
1904/// `mm/mempolicy.c` consumes the flag during policy creation (it
1905/// determines whether the supplied nodemask is stored absolute or
1906/// remapped against the caller's cpuset at install time), and
1907/// `mpol_rebind_policy` (same file) branches on the flag when the
1908/// cpuset's `mems_allowed` changes after the policy was installed
1909/// — with `STATIC_NODES` set, the stored nodemask is unchanged;
1910/// without it, the kernel remaps the nodemask against the new
1911/// `mems_allowed`. Since ktstr never rebinds `cpuset.mems` mid-run,
1912/// only the install-time semantics applies, and the flag is
1913/// effectively a cross-node-intent declaration for the validator's
1914/// purposes — a sign the author knows the intent is "allocations on
1915/// a node outside the CPU-affinity cpuset" and has opted in to
1916/// that shape.
1917///
1918/// # Flag-specific handling (in order of evaluation)
1919///
1920/// - `STATIC_NODES | RELATIVE_NODES` both set → bail: the kernel
1921///   rejects this combination with `EINVAL`; surfacing it here
1922///   names the offender before the syscall.
1923/// - `STATIC_NODES` only → the caller has declared intentional
1924///   cross-node placement. Skip the cpuset-intent check, but each
1925///   referenced node must exist on the host topology or the
1926///   kernel will reject the policy. Verify existence; bail with
1927///   the missing nodes if any.
1928/// - `RELATIVE_NODES` only → the nodemask is an ordinal into the
1929///   cpuset's allowed-nodes set. Cpuset coverage does not apply in
1930///   absolute-id terms, so bypass.
1931/// - No relevant flag set → enforce cpuset-intent coverage:
1932///   every policy node must appear in the cpuset's covered NUMA
1933///   nodes. Bail naming the uncovered nodes AND both escape
1934///   hatches (STATIC_NODES opt-in; widening the cpuset).
1935///
1936/// Reject `--flag` args whose bare name is not in the payload's
1937/// `known_flags` allowlist. Returns `Ok(())` when the payload
1938/// declared no allowlist (`known_flags: None`) — the opt-in
1939/// contract defaults to "permissive" so payloads wrapping
1940/// open-ended binaries (stress-ng, fio, schbench) aren't forced
1941/// to enumerate every flag their upstream tool accepts.
1942///
1943/// Recognises two flag shapes: `--foo` (flag-only) and
1944/// `--foo=value` (flag-with-attached-value). Non-flag args
1945/// (positional, `-short`, everything else) are passed through
1946/// without inspection — the allowlist scopes to long flags only.
1947///
1948/// Extracted out of `apply_ops`'s `Op::RunPayload` arm so the
1949/// validation is unit-testable without standing up a full Ctx
1950/// / scenario state. See the caller for how the allowlist is
1951/// threaded through Op::RunPayload execution.
1952fn validate_known_flags(payload: &crate::test_support::Payload, args: &[String]) -> Result<()> {
1953    let Some(allowlist) = payload.known_flags else {
1954        return Ok(());
1955    };
1956    for arg in args {
1957        let Some(flag_body) = arg.strip_prefix("--") else {
1958            continue;
1959        };
1960        // `split('=').next()` is infallible: `str::split` always
1961        // yields at least one element (the full string when no
1962        // separator is present). The prior `unwrap_or("")` fallback
1963        // was dead code — the empty-name branch below never fired
1964        // via this path since `flag_body` had already passed the
1965        // `strip_prefix("--")` filter above (leaving at least one
1966        // character). Kept the `name.is_empty()` guard in place
1967        // only to handle the degenerate `"--"` bare-dashes case,
1968        // which produces `flag_body = ""` → `name = ""`.
1969        let name = flag_body
1970            .split('=')
1971            .next()
1972            .expect("str::split always yields at least one element");
1973        if name.is_empty() {
1974            continue;
1975        }
1976        if !allowlist.contains(&name) {
1977            anyhow::bail!(
1978                "Op::RunPayload: payload '{}' received unknown flag \
1979                 '--{name}' — not in its known_flags allowlist \
1980                 {allowlist:?}. Check the spelling against the \
1981                 payload's declared flags; if '--{name}' is a new \
1982                 legitimate flag, add it to `Payload::known_flags`.",
1983                payload.name,
1984            );
1985        }
1986    }
1987    Ok(())
1988}
1989
1990fn validate_mempolicy_cpuset(
1991    policy: &MemPolicy,
1992    flags: crate::workload::MpolFlags,
1993    cpuset: &BTreeSet<usize>,
1994    ctx: &Ctx,
1995    cgroup_name: &str,
1996) -> Result<()> {
1997    use crate::workload::MpolFlags;
1998
1999    // Reject unknown bits before any other check. The `MpolFlags`
2000    // type is a `u32` bitfield covering three documented bits
2001    // (STATIC_NODES, RELATIVE_NODES, NUMA_BALANCING); any other bit
2002    // set in `flags` is either a user typo (raw-constructing the
2003    // struct with an arbitrary integer) or forward-compat from a
2004    // future kernel flag that this validator hasn't learned yet.
2005    // Either way, surfacing unknown bits here prevents a silent
2006    // semantic mismatch — the kernel would either reject with
2007    // EINVAL or (worse) treat the bit as a flag we don't model.
2008    let known_bits = MpolFlags::STATIC_NODES.bits()
2009        | MpolFlags::RELATIVE_NODES.bits()
2010        | MpolFlags::NUMA_BALANCING.bits();
2011    let unknown_bits = flags.bits() & !known_bits;
2012    if unknown_bits != 0 {
2013        anyhow::bail!(
2014            "cgroup '{}': MpolFlags contains unknown bit(s) {:#x} (known bits: \
2015             STATIC_NODES={:#x}, RELATIVE_NODES={:#x}, NUMA_BALANCING={:#x}); \
2016             refusing to forward to the kernel — update MpolFlags to model the \
2017             new bit before using it, or clear the bit at the call site",
2018            cgroup_name,
2019            unknown_bits,
2020            MpolFlags::STATIC_NODES.bits(),
2021            MpolFlags::RELATIVE_NODES.bits(),
2022            MpolFlags::NUMA_BALANCING.bits(),
2023        );
2024    }
2025
2026    // `STATIC_NODES | RELATIVE_NODES` is a kernel-rejected combination —
2027    // `MPOL_F_STATIC_NODES` and `MPOL_F_RELATIVE_NODES` are mutually
2028    // exclusive (see `include/uapi/linux/mempolicy.h` + the
2029    // `sanitize_mpol_flags` helper in `mm/mempolicy.c`, which bails
2030    // with `EINVAL` if both are set). Fail early here instead of
2031    // letting the syscall return a generic error — the scenario
2032    // caller almost certainly meant one or the other, not both.
2033    if flags.contains(MpolFlags::STATIC_NODES) && flags.contains(MpolFlags::RELATIVE_NODES) {
2034        anyhow::bail!(
2035            "cgroup '{}': MpolFlags::STATIC_NODES and MpolFlags::RELATIVE_NODES are \
2036             mutually exclusive (the kernel will reject the set_mempolicy syscall with \
2037             EINVAL); pick whichever matches the intended semantics — STATIC_NODES \
2038             for absolute node ids that survive cpuset changes, RELATIVE_NODES for \
2039             cpuset-relative indices",
2040            cgroup_name,
2041        );
2042    }
2043
2044    let policy_nodes = policy.node_set();
2045    if policy_nodes.is_empty() {
2046        return Ok(());
2047    }
2048
2049    // `STATIC_NODES`: nodemask is treated as absolute node ids and NOT
2050    // intersected with the cpuset. The cpuset-coverage check below
2051    // does not apply, but we DO need to verify the referenced nodes
2052    // actually exist on the host — a policy pinning node 7 on a
2053    // 2-node host would fail at syscall time; surfacing it here
2054    // names the offender.
2055    if flags.contains(MpolFlags::STATIC_NODES) {
2056        let host_nodes = ctx.topo.numa_node_ids();
2057        let missing: Vec<usize> = policy_nodes
2058            .iter()
2059            .copied()
2060            .filter(|n| !host_nodes.contains(n))
2061            .collect();
2062        if !missing.is_empty() {
2063            anyhow::bail!(
2064                "cgroup '{}': MemPolicy with MpolFlags::STATIC_NODES references \
2065                 NUMA node(s) {:?} that do not exist on this host (host nodes: {:?}); \
2066                 the kernel will reject or silently drop the policy (Preferred can \
2067                 silently fall back to local allocation; Bind/Interleave reject with \
2068                 EINVAL) — fix the MemPolicy or pick a host with the required nodes",
2069                cgroup_name,
2070                missing,
2071                host_nodes,
2072            );
2073        }
2074        return Ok(());
2075    }
2076
2077    // `RELATIVE_NODES`: nodemask is an ordinal into the cpuset's
2078    // allowed nodes, not an absolute node id set. The cpuset-coverage
2079    // check compares absolute ids, so it does not apply here — the
2080    // kernel does the relative-to-absolute remap internally. Trust
2081    // the caller and bypass the coverage bail, same shape as the
2082    // STATIC_NODES early return.
2083    if flags.contains(MpolFlags::RELATIVE_NODES) {
2084        return Ok(());
2085    }
2086
2087    let cpuset_numa = ctx.topo.numa_nodes_for_cpuset(cpuset);
2088    let uncovered: Vec<usize> = policy_nodes
2089        .iter()
2090        .copied()
2091        .filter(|n| !cpuset_numa.contains(n))
2092        .collect();
2093    if !uncovered.is_empty() {
2094        anyhow::bail!(
2095            "cgroup '{}': MemPolicy references NUMA node(s) {:?} \
2096             outside the cpuset's coverage (cpuset covers node(s) \
2097             {:?}) — some or all of the worker's allocations would \
2098             live on NUMA nodes its CPUs cannot reach locally, \
2099             producing cross-socket allocation traffic that is \
2100             almost certainly unintended. Two fixes: \
2101             (a) add .mpol_flags(MpolFlags::STATIC_NODES) to \
2102             declare the cross-node placement intentional (the \
2103             flag survives cpuset rebinds; see MpolFlags doc), or \
2104             (b) widen the cpuset to cover the policy's nodes \
2105             (e.g. CpusetSpec::Numa(N) for each referenced N, or \
2106             a CpusetSpec::Exact set that spans both).",
2107            cgroup_name,
2108            uncovered,
2109            cpuset_numa,
2110        );
2111    }
2112    Ok(())
2113}
2114
2115/// SIGKILL every worker in `handles` without reaping. Used only on
2116/// the scheduler-death paths in [`run_scenario`]: when the scheduler
2117/// under test dies, the kernel disables sched_ext and its workers
2118/// fall back to the builtin scheduler, so a CPU-bound worker pool
2119/// would otherwise make the per-worker `waitpid` reap in the
2120/// subsequent [`collect_step`] / [`collect_backdrop`]
2121/// scheduling-gated (teardown time scaling with worker count).
2122/// Delivering SIGKILL to every worker first lets that reap find
2123/// already-exiting workers. See
2124/// [`crate::workload::WorkloadHandle::sigkill_workers`].
2125fn sigkill_handles(handles: &[(String, WorkloadHandle)]) {
2126    for (_, h) in handles {
2127        h.sigkill_workers();
2128    }
2129}
2130
2131/// Collect step-local worker results and produce an AssertResult.
2132///
2133/// Drains step-local handles + payload handles; backdrop state is
2134/// untouched. Called at every step boundary (success AND error
2135/// paths) as the "Step is fully bounded" teardown. The
2136/// `step_state` goes out of scope at the end of this step's
2137/// iteration, so its `CgroupGroup` drop removes every step-local
2138/// cgroup immediately after `run_scenario` propagates the result
2139/// of this call.
2140///
2141/// Before draining handles, every step-local cgroup is unfrozen
2142/// (`cgroup.freeze` ← 0). An [`Op::FreezeCgroup`] without a paired
2143/// [`Op::UnfreezeCgroup`] would leave step-local tasks frozen at
2144/// step boundary, where the graceful cooperative stop cannot make
2145/// progress: the worker stop is a non-fatal SIGUSR1, and a frozen
2146/// task re-enters the freezer trap on a non-fatal signal
2147/// (`kernel/signal.c` `do_freezer_trap`) rather than running its
2148/// handler — so it never flips its stop flag or reports until
2149/// thawed, and `stop_and_collect` burns its full collection
2150/// deadline before falling back to a sentinel report. (A fatal
2151/// SIGKILL DOES wake and kill a frozen task — it diverges before
2152/// the freezer trap — so worker death and the subsequent rmdir are
2153/// unaffected by the freeze; the unfreeze only restores the
2154/// graceful path's ability to collect real reports.) Failures are
2155/// logged at warn level only — a missing freezer file or a cgroup
2156/// that was already torn down is benign at teardown time, and
2157/// propagating would mask the real workload result.
2158fn collect_step(
2159    step_state: &mut StepState<'_>,
2160    checks: &crate::assert::Assert,
2161    topo: &crate::topology::TestTopology,
2162    cgroups: &dyn crate::cgroup::CgroupOps,
2163    // The 1-indexed phase step_index (BASELINE=0, Step k -> k+1) this
2164    // teardown belongs to, threaded so collect_handles can attach the per-phase
2165    // per_cgroup carrier. `None` for the defensive backdrop-staging collect,
2166    // which has no step attribution.
2167    step_index: Option<u16>,
2168) -> AssertResult {
2169    // Unfreeze every step-local cgroup before the graceful collect.
2170    // A frozen worker re-enters the freezer trap on the cooperative
2171    // (non-fatal) SIGUSR1 stop instead of running its handler, so it
2172    // never reports until thawed. (SIGKILL would still kill it; only
2173    // the graceful report path needs the thaw.)
2174    for name in step_state.cgroups.names() {
2175        if let Err(e) = cgroups.set_freeze(name, false) {
2176            tracing::warn!(
2177                cgroup = %name,
2178                err = %format!("{e:#}"),
2179                "collect_step: pre-teardown unfreeze failed; any frozen workers will yield sentinel reports (still SIGKILL-reaped, no leak)"
2180            );
2181        }
2182    }
2183    // Kill any CgroupDef::workload / Op::RunPayload payload binaries
2184    // still live at step teardown so cgroupfs cleanup does not trip
2185    // EBUSY. Metrics are emitted to the SHM ring by PayloadHandle::kill
2186    // via the `evaluate()` pipeline.
2187    drain_all_payload_handles(&mut step_state.payload_handles);
2188    // Drain any host-mode stall reports that accumulated during the
2189    // step BEFORE dropping the monitor handle (Drop joins the
2190    // polling thread). Reports get folded into the merged
2191    // [`AssertResult`] below as
2192    // [`crate::assert::DetailKind::WorkerStalled`] failures. `take`
2193    // ensures the handle drops here (joining the thread) so the
2194    // polling thread exits before per-step teardown returns.
2195    let stall_reports = if let Some(handle) = step_state.stall_monitor.take() {
2196        let reports = handle.drain();
2197        drop(handle);
2198        reports
2199    } else {
2200        Vec::new()
2201    };
2202    let handles = std::mem::take(&mut step_state.handles);
2203    let mut result = crate::scenario::collect_handles(
2204        handles.into_iter().map(|(name, h)| {
2205            let cpuset = step_state.cpusets.get(&name);
2206            (name, h, cpuset)
2207        }),
2208        checks,
2209        Some(topo),
2210        step_index,
2211    );
2212    for report in stall_reports {
2213        result.record_fail(crate::assert::AssertDetail::new(
2214            crate::assert::DetailKind::WorkerStalled,
2215            format_stall_report(&report),
2216        ));
2217    }
2218    result
2219}
2220
2221/// Render a [`crate::scenario::host_stall::StallReport`] as a
2222/// human-readable single-string assertion detail. Emits the stalled
2223/// pid + comm, the sample-window summary (first and last values
2224/// for both counters PLUS the computed `last - first` delta — both
2225/// expected to be zero for a true stall but rendered from the
2226/// samples themselves so a predicate-tolerance refactor stays
2227/// observable), and the diagnostic subset (state, wchan, syscall,
2228/// cgroup, host loadavg, optional kernel stack). The diagnostic's
2229/// `status_full` field is intentionally OMITTED — the parsed
2230/// `state` letter carries the actionable signal and the full
2231/// status file is verbose; sidecar consumers keying off
2232/// [`crate::assert::DetailKind::WorkerStalled`] can match on the
2233/// kind discriminator and read the full StallReport (carries the
2234/// status_full field) without parsing this message.
2235///
2236/// Format is multi-line so the operator can read the trip at a
2237/// glance without parsing structured output.
2238fn format_stall_report(report: &crate::scenario::host_stall::StallReport) -> String {
2239    use std::fmt::Write as _;
2240    let mut s = String::new();
2241    let _ = writeln!(
2242        s,
2243        "worker stall detected: pid={} comm={:?} (host-mode /proc/<pid>/sched polling)",
2244        report.pid, report.comm,
2245    );
2246    if let (Some(first), Some(last)) = (report.samples.first(), report.samples.last()) {
2247        // Compute deltas from the actual sample values so a
2248        // predicate-tolerance refactor (e.g. "fire on Δ <= N
2249        // switches rather than == 0") stays observable in the
2250        // rendered output. Saturating subtraction handles the
2251        // pathological counter-rollover case without panic.
2252        let nr_delta = last.nr_switches.saturating_sub(first.nr_switches);
2253        let rt_delta = last
2254            .sum_exec_runtime_ns
2255            .saturating_sub(first.sum_exec_runtime_ns);
2256        let _ = writeln!(
2257            s,
2258            "  sample window: nr_switches {} -> {} (delta {nr_delta}), sum_exec_runtime_ns {} -> {} (delta {rt_delta}), {} samples",
2259            first.nr_switches,
2260            last.nr_switches,
2261            first.sum_exec_runtime_ns,
2262            last.sum_exec_runtime_ns,
2263            report.samples.len(),
2264        );
2265    }
2266    let d = &report.diagnostic;
2267    let _ = writeln!(s, "  state: {}", d.state);
2268    let _ = writeln!(s, "  wchan: {}", d.wchan);
2269    let _ = writeln!(s, "  syscall: {}", d.syscall);
2270    let _ = writeln!(s, "  cgroup: {}", d.cgroup);
2271    let _ = writeln!(s, "  host loadavg: {}", d.host_loadavg);
2272    if let Some(stack) = &d.stack {
2273        let _ = writeln!(s, "  kernel stack:\n{stack}");
2274    }
2275    s
2276}
2277
2278/// Collect backdrop (persistent) worker results. Called once at
2279/// scenario end after every Step has torn down. The
2280/// `backdrop_state.cgroups` RAII guard drops persistent cgroups
2281/// when `backdrop_state` itself drops.
2282///
2283/// Mirrors [`collect_step`]'s pre-teardown unfreeze pass over every
2284/// tracked cgroup, for the same reason: a backdrop cgroup left
2285/// frozen at scenario end cannot run the graceful cooperative stop.
2286/// A frozen task re-enters the freezer trap on the non-fatal SIGUSR1
2287/// (`kernel/signal.c` `do_freezer_trap`) instead of running its
2288/// handler, so its workers never report until thawed. The asymmetry
2289/// between step-local and backdrop teardown — only the former
2290/// unfreezing — would surface as backdrop workers yielding sentinel
2291/// reports (after burning the collection deadline) on every scenario
2292/// whose Backdrop froze a cgroup and never unfroze it. Symmetric
2293/// unfreeze brings the backdrop path back in line with the per-step
2294/// path. (As in `collect_step`, a fatal SIGKILL still kills a frozen
2295/// worker, so death and the `BackdropState::cgroups` RAII rmdir are
2296/// unaffected by the freeze.)
2297fn collect_backdrop(
2298    backdrop_state: &mut BackdropState<'_>,
2299    checks: &crate::assert::Assert,
2300    topo: &crate::topology::TestTopology,
2301    cgroups: &dyn crate::cgroup::CgroupOps,
2302) -> AssertResult {
2303    // Unfreeze every backdrop cgroup before the graceful collect.
2304    // Same rationale as `collect_step`: a frozen worker re-enters
2305    // the freezer trap on the cooperative (non-fatal) SIGUSR1 stop
2306    // instead of running its handler, so it never reports until
2307    // thawed. (SIGKILL would still kill it; only the graceful report
2308    // path needs the thaw.)
2309    for name in backdrop_state.cgroups.names() {
2310        if let Err(e) = cgroups.set_freeze(name, false) {
2311            tracing::warn!(
2312                cgroup = %name,
2313                err = %format!("{e:#}"),
2314                "collect_backdrop: pre-teardown unfreeze failed; any frozen workers will yield sentinel reports (still SIGKILL-reaped, no leak)"
2315            );
2316        }
2317    }
2318    drain_all_payload_handles(&mut backdrop_state.payload_handles);
2319    let handles = std::mem::take(&mut backdrop_state.handles);
2320    crate::scenario::collect_handles(
2321        handles.into_iter().map(|(name, h)| {
2322            let cpuset = backdrop_state.cpusets.get(&name);
2323            (name, h, cpuset)
2324        }),
2325        checks,
2326        Some(topo),
2327        // Backdrop spans every phase: the None arm expands each worker's
2328        // per-phase PhaseSlices into one per-epoch PhaseBucket
2329        // (expand_backdrop_phase_buckets) — per-epoch, not single-step,
2330        // attribution.
2331        None,
2332    )
2333}
2334
2335/// Kill every payload handle whose cgroup matches `cgroup` and drop
2336/// the matched entries from `handles`. Runs before the cgroup is
2337/// removed or stopped; failures are logged to stderr but do not
2338/// propagate — the cgroup removal is best-effort already, and the
2339/// payload-kill failure is never the primary error.
2340///
2341/// **Metric emission depends on the explicit `.kill()` call** —
2342/// if a future refactor replaces the `.kill()` below with plain
2343/// `drop(handle)`, the `PayloadHandle::drop` SIGKILLs the child
2344/// but skips the evaluate-and-emit pipeline that records metrics
2345/// to the SHM ring. Test helpers that drain payload handles
2346/// likewise route through `drain_all_payload_handles` for the
2347/// same reason. Preserve `.kill()` on every path that claims to
2348/// drain handles for metric capture.
2349///
2350/// Drop order across matched entries is LIFO (last pushed, first
2351/// dropped) — the loop walks indices from the tail toward index 0
2352/// using `Vec::remove` so newer matched entries' embedded
2353/// `SigchldScope`s restore the SIGCHLD disposition before older
2354/// matches do, matching the save-and-restore chain documented on
2355/// `PayloadHandle` in `payload_run.rs`. `Vec::swap_remove` would
2356/// rotate the tail into the freed slot and break LIFO across
2357/// matches; `Vec::remove` preserves the relative order of the
2358/// remaining (unmatched) survivors. Note: SIGCHLD scope LIFO across
2359/// the FULL vec is structurally unsalvageable in any partial-drain
2360/// helper — unmatched entries that stay alive in `handles` outlive
2361/// their younger matched siblings whose scopes already restored.
2362/// The full-vec LIFO contract holds only when every handle is
2363/// dropped together via [`drain_all_payload_handles`].
2364fn drain_payload_handles_for_cgroup(handles: &mut Vec<PayloadEntry>, cgroup: &str) {
2365    let mut i = handles.len();
2366    while i > 0 {
2367        i -= 1;
2368        if handles[i].cgroup.as_str() == cgroup {
2369            let entry = handles.remove(i);
2370            if let Err(e) = entry.handle.kill() {
2371                eprintln!("ktstr: kill payload in cgroup '{cgroup}': {e:#}");
2372            }
2373        }
2374    }
2375}
2376
2377/// Kill every payload handle regardless of cgroup and clear the
2378/// vector. Called at step-sequence teardown so every handle gets a
2379/// terminal `.kill()` (and therefore a sidecar metric emission) even
2380/// when no explicit `RemoveCgroup`/`StopCgroup` op targeted it.
2381///
2382/// Drop order is LIFO (last pushed, first dropped) — `Vec::pop`
2383/// returns the tail first, so `PayloadHandle::drop` runs in reverse
2384/// creation order. Each handle's embedded `SigchldScope` captured the
2385/// `SIGCHLD` disposition that was live at construction time (the
2386/// previous scope's installed `SIG_DFL`). Restoring in LIFO unwinds
2387/// the save-and-restore chain back to the original disposition; FIFO
2388/// drop (e.g. `Vec::drain(..)`) restores intermediate `SIG_DFL` values
2389/// out of order and leaks `SIG_DFL` past the outermost scope. See the
2390/// DROP-ORDER-CRITICAL note on `PayloadHandle` in `payload_run.rs`.
2391fn drain_all_payload_handles(handles: &mut Vec<PayloadEntry>) {
2392    while let Some(entry) = handles.pop() {
2393        if let Err(e) = entry.handle.kill() {
2394            eprintln!(
2395                "ktstr: teardown kill payload in cgroup {}: {e:#}",
2396                render_cgroup_key(&entry.cgroup),
2397            );
2398        }
2399    }
2400}
2401
2402#[cfg(test)]
2403mod tests;
2404
2405#[cfg(test)]
2406mod workers_pct_construction_tests;
2407
2408#[cfg(test)]
2409mod kernel_op_dispatch_tests;