ktstr/scenario/ops/mod.rs
1//! Composable ops/steps system for dynamic cgroup topology changes.
2//!
3//! [`Op`] is an atomic cgroup operation. [`Step`] sequences ops with a
4//! hold period. [`CgroupDef`] bundles create + cpuset + spawn into a
5//! single declaration. [`execute_steps()`] runs a step sequence with
6//! scheduler liveness checks and stimulus event recording.
7//!
8//! See the [Ops and Steps](https://ktstr.dev/guide/concepts/ops.html)
9//! chapter for a guide.
10//!
11//! # Cgroup tooling at a glance
12//!
13//! ktstr exposes the cgroup v2 surface across two layers — declarative
14//! steady-state via [`CgroupDef`] (set at scenario-setup time, holds
15//! for the cgroup's lifetime) and imperative state-transitions via
16//! [`Op`] (applied mid-step, describe transitions over time):
17//!
18//! | Knob | Layer | API entry | Underlying file | When to use |
19//! |------|-------|-----------|-----------------|-------------|
20//! | CPU affinity | setup | [`CgroupDef::cpuset`] | `cpuset.cpus` | Bind workers to a CPU subset for the whole run. |
21//! | NUMA-mem affinity | setup | [`CgroupDef::cpuset_mems`] | `cpuset.mems` | Constrain allocations to specific NUMA nodes. |
22//! | CPU bandwidth | setup | [`CgroupDef::cpu_quota_pct`] / [`CgroupDef::cpu_quota`] / [`CgroupDef::cpu_unlimited`] | `cpu.max` | Cap CPU time per period (1 CPU at 50% / 2 CPU at 100% / etc). |
23//! | CPU share weight | setup | [`CgroupDef::cpu_weight`] | `cpu.weight` | Bias relative CPU share when siblings contend. |
24//! | Memory ceiling | setup | [`CgroupDef::memory_max`] / [`CgroupDef::memory_unlimited`] | `memory.max` | Hard ceiling — exceeding triggers cgroup OOM. |
25//! | Memory throttle | setup | [`CgroupDef::memory_high`] | `memory.high` | Soft throttle: triggers reclaim, not OOM. |
26//! | Memory protection | setup | [`CgroupDef::memory_low`] | `memory.low` | Soft protection: kernel reclaims from siblings first. |
27//! | Swap cap | setup | [`CgroupDef::memory_swap_max`] / [`CgroupDef::memory_swap_unlimited`] | `memory.swap.max` | Cap how much memory can spill to swap (CONFIG_SWAP=y). |
28//! | IO share | setup | [`CgroupDef::io_weight`] | `io.weight` | Bias relative IO share when siblings contend. |
29//! | Task ceiling | setup | [`CgroupDef::pids_max`] / [`CgroupDef::pids_unlimited`] | `pids.max` | Cap process+thread count — fork/clone returns EAGAIN at limit. |
30//! | Mid-run cpuset rebind | mid-step | [`Op::set_cpuset`] / [`Op::clear_cpuset`] / [`Op::swap_cpusets`] | `cpuset.cpus` | Move cpuset on a live cgroup mid-scenario. |
31//! | Mid-run task migration | mid-step | [`Op::move_all_tasks`] | `cgroup.procs` | Move workers from one cgroup to another. |
32//! | Pause/resume | mid-step | [`Op::freeze_cgroup`] / [`Op::unfreeze_cgroup`] | `cgroup.freeze` | Suspend every task in the cgroup; resume later. |
33//! | Add/remove cgroup | mid-step | [`Op::add_cgroup`] / [`Op::remove_cgroup`] / [`Op::stop_cgroup`] | (cgroupfs mkdir/rmdir) | Spawn / tear down a cgroup mid-scenario. |
34//!
35//! # Worked examples
36//!
37//! * **Static topology** (one cgroup, fixed cpuset, weight-biased
38//! compute): [`CgroupDef`] type-level docs.
39//! * **Suspend/resume** (3-Step idiom — run, freeze, run again):
40//! [`Op::FreezeCgroup`] doc.
41//! * **Memory-cap teardown** (rewind a base CgroupDef's swap cap):
42//! [`CgroupDef::memory_swap_unlimited`] doc.
43//!
44//! # Implementation entry points
45//!
//! Every knob is ultimately written through [`crate::cgroup::CgroupOps`]
//! (production: [`crate::cgroup::CgroupManager`]; tests: a recording
//! `MockCgroupOps` double). `apply_setup` applies the [`CgroupDef`]
//! declarations; `apply_ops` dispatches the [`Op`] variants. Both register
//! cgroups against the same `ctx.cgroups`, so a test that uses both layers
//! shares a single RAII teardown path (`crate::scenario::CgroupGroup::Drop`).
52//!
53//! # File layout
54//!
55//! `types` holds the data model: [`Op`], [`CgroupDef`], [`Step`],
56//! [`HoldSpec`], [`Setup`], [`CpusetSpec`], the per-controller limits
57//! structs, and every builder constructor. Re-exported from this module
58//! so external paths remain `crate::scenario::ops::Op` etc. The executor
59//! drives that model against [`crate::cgroup::CgroupOps`] via `apply_setup`
60//! (sibling `setup` module) and `apply_ops` (sibling `dispatch` module),
61//! and exposes the [`execute_steps`] / [`execute_scenario`] family of
62//! public entry points (this file).
63
64mod types;
65pub use types::*;
66
67mod setup;
68pub use setup::PLACEMENT_LOG_PATH;
69use setup::apply_setup;
70
71mod dispatch;
72#[cfg(test)]
73use dispatch::{
74 REPLACE_NOT_TRYING_DEADLINE_S, build_kernel_op_request, dispatch_kernel_op_request,
75 merge_adjacent_cold_writes, staged_scheduler_log_path, wait_for_accessor_publish_or_bail,
76 wait_for_worker_state_not_trying_or_bail, write_entries_from_writes,
77};
78use dispatch::{apply_ops, render_cgroup_key};
79// Re-export the scx-state reader + enum. The guest-init scheduler
80// attach-readiness predicate (rust_init::scheduler::scx_attach_ready) reads
81// scx_state() to confirm SCX_ENABLED before the workload dispatches, and the
82// survives_storm probe's host unit test (`test_support::probe_tests`) drives
83// scheduler liveness on the host (no VM) via the set_test_scx_state seam.
84// `dispatch` is otherwise private to `ops`.
85#[cfg(test)]
86pub(crate) use dispatch::set_test_scx_state;
87pub(crate) use dispatch::{ScxState, scx_down, scx_state};
88
89use std::collections::BTreeSet;
90use std::thread;
91use std::time::Duration;
92
93use anyhow::{Context, Result};
94
95use crate::assert::AssertResult;
96use crate::scenario::backdrop;
97use crate::scenario::{CgroupGroup, Ctx, process_alive};
98use crate::vmm::guest_comms;
99use crate::vmm::wire::StimulusPayload;
100use crate::workload::{MemPolicy, WorkloadHandle};
101
102// ---------------------------------------------------------------------------
103// Step executor
104// ---------------------------------------------------------------------------
105
106/// Persistent scenario-wide state owned by
107/// [`execute_scenario_with`]. Lives for the entire step sequence;
108/// cgroups, workload handles, and payload handles declared by the
109/// [`Backdrop`](backdrop::Backdrop) go here and only tear
110/// down at scenario end (success or Err). See [`StepState`] for
111/// the step-local counterpart.
112struct BackdropState<'a> {
113 /// RAII cgroup guard for persistent cgroups — removes them on drop.
114 cgroups: CgroupGroup<'a>,
115 /// Active workload handles in persistent cgroups, keyed by cgroup name.
116 handles: Vec<(String, WorkloadHandle)>,
117 /// Resolved cpusets per persistent cgroup name.
118 cpusets: std::collections::HashMap<String, BTreeSet<usize>>,
119 /// Active payload-binary handles owned by the backdrop. Drained
120 /// via `.kill()` at scenario teardown so the metric-emission
121 /// pipeline still fires.
122 payload_handles: Vec<PayloadEntry>,
123 /// BPF map fds opened via [`crate::scenario::ops::types::Op::PinBpfMap`].
124 /// Keyed by the map name the caller requested; the
125 /// [`std::os::fd::OwnedFd`] holds an extra refcount on the
126 /// kernel-side `struct bpf_map` so the map survives any
127 /// scheduler-process teardown (including
128 /// [`crate::scenario::ops::types::Op::ReplaceScheduler`]) until
129 /// scenario end. Drops close the fds and release the refcount.
130 pinned_bpf_maps: std::collections::HashMap<String, std::os::fd::OwnedFd>,
131}
132
133impl<'a> BackdropState<'a> {
134 /// Empty backdrop state (no persistent entities), scoped to `ctx.cgroups`.
135 fn empty(ctx: &'a Ctx) -> Self {
136 Self {
137 cgroups: CgroupGroup::new(ctx.cgroups),
138 handles: Vec::new(),
139 cpusets: std::collections::HashMap::new(),
140 payload_handles: Vec::new(),
141 pinned_bpf_maps: std::collections::HashMap::new(),
142 }
143 }
144}
145
146/// Step-local execution state. Fresh per step, torn down at step
147/// boundary: cgroups removed (via RAII drop), workload handles
148/// collected, payload handles killed with metric emission. Any ops
149/// in the step that reference a cgroup name look here first before
150/// falling through to [`BackdropState`].
151struct StepState<'a> {
152 /// RAII cgroup guard — removes step-local cgroups on drop.
153 cgroups: CgroupGroup<'a>,
154 /// Active workload handles keyed by step-local cgroup name.
155 handles: Vec<(String, WorkloadHandle)>,
156 /// Resolved cpusets per step-local cgroup name, for isolation checks.
157 cpusets: std::collections::HashMap<String, BTreeSet<usize>>,
158 /// Active payload-binary handles keyed by cgroup name. Each entry
159 /// came from either a [`CgroupDef::workload`] spawn in
160 /// `apply_setup` or an explicit [`Op::RunPayload`] invocation;
161 /// `source` tags which path spawned it so the duplicate-name
162 /// dedup in `Op::RunPayload` can point at the original site. All
163 /// are killed during step-teardown / cgroup removal so cgroupfs
164 /// cleanup never trips EBUSY on a live process.
165 payload_handles: Vec<PayloadEntry>,
166 /// Host-mode worker stall monitor, started lazily at the end of
167 /// the first successful [`apply_setup`] when running outside a
168 /// VM (no `is_guest`, no `cargo_test_mode`) and at least one
169 /// worker exists. The handle's [`Drop`] joins the polling
170 /// thread when [`StepState`] drops; [`collect_step`] drains
171 /// any accumulated reports before that drop so they reach the
172 /// scenario's [`AssertResult`]. `None` in every guest-side
173 /// scenario and in `cargo_test_mode` runs — the host-side
174 /// monitor is the only stall signal available in host-mode,
175 /// where the freeze coordinator / KVM-side stall plumbing is
176 /// not running. See [`crate::scenario::host_stall`] for the
177 /// signal definition and detection latency contract.
178 stall_monitor: Option<crate::scenario::host_stall::StallMonitorHandle>,
179}
180
181impl<'a> StepState<'a> {
182 /// Empty step state scoped to `ctx.cgroups`.
183 fn empty(ctx: &'a Ctx) -> Self {
184 Self {
185 cgroups: CgroupGroup::new(ctx.cgroups),
186 handles: Vec::new(),
187 cpusets: std::collections::HashMap::new(),
188 payload_handles: Vec::new(),
189 stall_monitor: None,
190 }
191 }
192}
193
194/// Combined mutable view over step-local and backdrop state.
195///
196/// Every function that touches execution state (apply_setup,
197/// apply_ops, the drain helpers) receives a
198/// `ScenarioState`; lookups prefer step-local, falling through to
199/// backdrop. New state created via ops/setup inside a step writes
200/// to step-local by default — that is the primary mechanism
201/// enforcing per-step bounded lifetime. Setup for the Backdrop
202/// itself (run once before the step loop) writes straight to the
203/// backdrop side via [`ScenarioState::with_target_backdrop`].
204struct ScenarioState<'a, 'b> {
205 step: &'b mut StepState<'a>,
206 backdrop: &'b mut BackdropState<'a>,
207 /// When true, all mutations route to [`Self::backdrop`] instead
208 /// of [`Self::step`]. Set by [`Self::with_target_backdrop`] when
209 /// running the Backdrop's initial `apply_setup` / `apply_ops`
210 /// before the first step.
211 target_backdrop: bool,
212}
213
214impl<'a, 'b> ScenarioState<'a, 'b> {
215 /// Build a combined scenario view. Starts with the step-local
216 /// slot as the mutation target — call [`Self::with_target_backdrop`]
217 /// to flip into backdrop-setup mode for Backdrop's own
218 /// apply_setup / apply_ops pass.
219 fn new(step: &'b mut StepState<'a>, backdrop: &'b mut BackdropState<'a>) -> Self {
220 Self {
221 step,
222 backdrop,
223 target_backdrop: false,
224 }
225 }
226
227 /// Run `f` with writes routed to the backdrop side.
228 fn with_target_backdrop<R>(&mut self, f: impl FnOnce(&mut Self) -> R) -> R {
229 let prev = self.target_backdrop;
230 self.target_backdrop = true;
231 let r = f(self);
232 self.target_backdrop = prev;
233 r
234 }
235
236 /// `cgroups` group that receives newly-created cgroups. Step-local
237 /// by default; backdrop when [`Self::with_target_backdrop`] is active.
238 fn target_cgroups(&mut self) -> &mut CgroupGroup<'a> {
239 if self.target_backdrop {
240 &mut self.backdrop.cgroups
241 } else {
242 &mut self.step.cgroups
243 }
244 }
245
246 /// `handles` vec that receives newly-spawned workload handles.
247 fn target_handles(&mut self) -> &mut Vec<(String, WorkloadHandle)> {
248 if self.target_backdrop {
249 &mut self.backdrop.handles
250 } else {
251 &mut self.step.handles
252 }
253 }
254
255 /// `cpusets` map that receives resolved cpusets for new cgroups.
256 fn target_cpusets(&mut self) -> &mut std::collections::HashMap<String, BTreeSet<usize>> {
257 if self.target_backdrop {
258 &mut self.backdrop.cpusets
259 } else {
260 &mut self.step.cpusets
261 }
262 }
263
264 /// `payload_handles` vec that receives newly-spawned payload handles.
265 fn target_payload_handles(&mut self) -> &mut Vec<PayloadEntry> {
266 if self.target_backdrop {
267 &mut self.backdrop.payload_handles
268 } else {
269 &mut self.step.payload_handles
270 }
271 }
272
273 /// Resolved cpuset for a cgroup name, looked up step-first then backdrop.
274 fn lookup_cpuset(&self, name: &str) -> Option<&BTreeSet<usize>> {
275 self.step
276 .cpusets
277 .get(name)
278 .or_else(|| self.backdrop.cpusets.get(name))
279 }
280
281 /// Returns the live payload handle matching the composite key
282 /// (`payload_name`, `cgroup_key`) from either step-local or
283 /// backdrop state, or `None` when no entry matches. Used for
284 /// the `Op::RunPayload` duplicate guard, which now treats
285 /// "same payload in a different cgroup" as legitimate rather
286 /// than a name collision.
287 fn find_live_payload_with_cgroup(
288 &self,
289 payload_name: &str,
290 cgroup_key: &str,
291 ) -> Option<&PayloadEntry> {
292 let matches =
293 |e: &&PayloadEntry| e.handle.payload_name() == payload_name && e.cgroup == cgroup_key;
294 self.step
295 .payload_handles
296 .iter()
297 .find(matches)
298 .or_else(|| self.backdrop.payload_handles.iter().find(matches))
299 }
300
301 /// Drop a payload handle by composite key (`name`, optional
302 /// `cgroup`). Checks step-local first, then backdrop.
303 ///
304 /// - `cgroup = Some(c)`: exact match on both name and cgroup.
305 /// - `cgroup = None`: if exactly one entry matches `name` across
306 /// both slots, consume it (backward-compat for
307 /// `Op::wait_payload(name)` / `Op::kill_payload(name)` when
308 /// only one copy is live). If two or more match, returns
309 /// `Err(ambiguous_cgroups)` where `ambiguous_cgroups` is the
310 /// list of cgroup keys for the candidates so the caller can
311 /// produce an actionable error.
312 ///
313 /// Returns `Ok(None)` when no entry matches.
314 fn take_payload_by_name(
315 &mut self,
316 name: &str,
317 cgroup: Option<&str>,
318 ) -> std::result::Result<Option<PayloadEntry>, Vec<String>> {
319 if let Some(c) = cgroup {
320 // Composite-key path: exact match on both.
321 if let Some(idx) = self
322 .step
323 .payload_handles
324 .iter()
325 .position(|e| e.handle.payload_name() == name && e.cgroup == c)
326 {
327 return Ok(Some(self.step.payload_handles.swap_remove(idx)));
328 }
329 if let Some(idx) = self
330 .backdrop
331 .payload_handles
332 .iter()
333 .position(|e| e.handle.payload_name() == name && e.cgroup == c)
334 {
335 return Ok(Some(self.backdrop.payload_handles.swap_remove(idx)));
336 }
337 return Ok(None);
338 }
339 // Name-only path: disambiguate across both slots before
340 // consuming, so a mid-test wait on an ambiguous name
341 // surfaces the caller's bug rather than silently waiting
342 // on the first match.
343 let mut step_idx: Option<usize> = None;
344 let mut backdrop_idx: Option<usize> = None;
345 let mut cgroups: Vec<String> = Vec::new();
346 for (i, e) in self.step.payload_handles.iter().enumerate() {
347 if e.handle.payload_name() == name {
348 if step_idx.is_none() {
349 step_idx = Some(i);
350 }
351 cgroups.push(e.cgroup.clone());
352 }
353 }
354 for (i, e) in self.backdrop.payload_handles.iter().enumerate() {
355 if e.handle.payload_name() == name {
356 if backdrop_idx.is_none() && step_idx.is_none() {
357 backdrop_idx = Some(i);
358 }
359 cgroups.push(e.cgroup.clone());
360 }
361 }
362 if cgroups.len() > 1 {
363 return Err(cgroups);
364 }
365 if let Some(i) = step_idx {
366 return Ok(Some(self.step.payload_handles.swap_remove(i)));
367 }
368 if let Some(i) = backdrop_idx {
369 return Ok(Some(self.backdrop.payload_handles.swap_remove(i)));
370 }
371 Ok(None)
372 }
373
374 /// Drain every live payload handle in step + backdrop state by
375 /// calling `.kill()` so the metric-emission pipeline fires. Used
376 /// on error paths in the step loop so mid-scenario failure still
377 /// leaves a usable sidecar.
378 fn drain_all_payloads(&mut self) {
379 drain_all_payload_handles(&mut self.step.payload_handles);
380 drain_all_payload_handles(&mut self.backdrop.payload_handles);
381 }
382
383 /// Kill every payload handle (step-first, then backdrop) whose
384 /// cgroup matches `cgroup`. Called before a cgroup removal so
385 /// cgroupfs cleanup does not trip EBUSY on a live process.
386 fn drain_payloads_for_cgroup(&mut self, cgroup: &str) {
387 drain_payload_handles_for_cgroup(&mut self.step.payload_handles, cgroup);
388 drain_payload_handles_for_cgroup(&mut self.backdrop.payload_handles, cgroup);
389 }
390
391 /// Remove every workload handle whose key matches `cgroup`. The
392 /// handles themselves drop (which SIGKILLs the workers) — this is
393 /// appropriate for `Op::StopCgroup` and `Op::RemoveCgroup`.
394 fn drop_handles_for_cgroup(&mut self, cgroup: &str) {
395 self.step.handles.retain(|(n, _)| n.as_str() != cgroup);
396 self.backdrop.handles.retain(|(n, _)| n.as_str() != cgroup);
397 }
398
399 /// Forget a tracked cpuset (step-first, then backdrop) for a cgroup.
400 fn forget_cpuset(&mut self, cgroup: &str) {
401 self.step.cpusets.remove(cgroup);
402 self.backdrop.cpusets.remove(cgroup);
403 }
404
405 /// Record / overwrite the resolved cpuset for a cgroup. If the
406 /// cgroup is known to step-local state, the step-local entry
407 /// updates; if it's known to backdrop, the backdrop entry
408 /// updates; otherwise the entry goes into the currently-active
409 /// target (step-local, or backdrop inside `with_target_backdrop`).
410 fn record_cpuset(&mut self, cgroup: &str, cpuset: BTreeSet<usize>) {
411 if self.step.cpusets.contains_key(cgroup) {
412 self.step.cpusets.insert(cgroup.to_string(), cpuset);
413 } else if self.backdrop.cpusets.contains_key(cgroup) {
414 self.backdrop.cpusets.insert(cgroup.to_string(), cpuset);
415 } else {
416 self.target_cpusets().insert(cgroup.to_string(), cpuset);
417 }
418 }
419
420 /// Re-key every workload handle from `from` to `to`. When `to`
421 /// names a Backdrop-owned cgroup, step-local handles are also
422 /// transferred into [`Self::backdrop`] so their lifetime extends
423 /// to scenario end instead of dying at step teardown. Backdrop
424 /// handles stay in the backdrop slot regardless of `to`.
425 ///
426 /// Called by `Op::MoveAllTasks` after the kernel-side
427 /// `cgroup.procs` writes succeed so subsequent ops that address
428 /// the moved workers by cgroup name find them under the new key
429 /// and in the correct state slot.
430 fn rename_handles(&mut self, from: &str, to: &str) {
431 let to_is_backdrop = self.cgroup_name_is_backdrop(to);
432 if to_is_backdrop {
433 // Move step-local handles keyed under `from` into the
434 // backdrop slot, re-keyed to `to`. Iterate in reverse so
435 // swap_remove indices stay stable.
436 let mut i = self.step.handles.len();
437 while i > 0 {
438 i -= 1;
439 if self.step.handles[i].0.as_str() == from {
440 let (_, handle) = self.step.handles.swap_remove(i);
441 self.backdrop.handles.push((to.to_string(), handle));
442 }
443 }
444 } else {
445 // Step-local destination: keep ownership, just rename.
446 for (name, _) in &mut self.step.handles {
447 if name.as_str() == from {
448 *name = to.to_string();
449 }
450 }
451 }
452 // Backdrop handles are never demoted to step-local ownership
453 // regardless of destination — a backdrop worker is declared
454 // persistent and stays persistent for the scenario. Rename
455 // in place so subsequent ops still find it under the new key.
456 for (name, _) in &mut self.backdrop.handles {
457 if name.as_str() == from {
458 *name = to.to_string();
459 }
460 }
461 }
462
463 /// Iterate every live workload handle across step + backdrop.
464 /// Used by `Op::MoveAllTasks` / `Op::SetAffinity` which act on
465 /// whichever cgroup owns the handle without caring about which
466 /// state slot it's in.
467 fn all_handles(&self) -> impl Iterator<Item = &(String, WorkloadHandle)> {
468 self.step.handles.iter().chain(self.backdrop.handles.iter())
469 }
470
471 /// True iff a cgroup with the given name is already tracked by
472 /// either step-local or backdrop state. Used to reject duplicate
473 /// names at `apply_setup` time so a user can't accidentally
474 /// shadow a Backdrop cgroup with a step-local [`CgroupDef`].
475 fn cgroup_name_is_tracked(&self, name: &str) -> bool {
476 self.step.cgroups.names().iter().any(|n| n == name)
477 || self.backdrop.cgroups.names().iter().any(|n| n == name)
478 }
479
480 /// True iff a cgroup with the given name is tracked by backdrop
481 /// (persistent) state. Used by `Op::MoveAllTasks` to decide
482 /// handle-ownership transfer direction (step→backdrop transfers
483 /// the handle into the persistent slot; backdrop→step-local is
484 /// rejected because it would orphan workers at step teardown).
485 fn cgroup_name_is_backdrop(&self, name: &str) -> bool {
486 self.backdrop.cgroups.names().iter().any(|n| n == name)
487 }
488}
489
490/// Whether a live payload handle was spawned by an explicit
491/// [`Op::RunPayload`] inside the step or by a
492/// [`CgroupDef::workload`] attachment at `apply_setup`. Held by
493/// every [`PayloadEntry`] so the dedup path in `Op::RunPayload`
494/// can name the original source when rejecting a second spawn of
495/// the same name.
496#[derive(Debug, Clone, Copy, PartialEq, Eq)]
497enum PayloadSource {
498 /// Spawned by `CgroupDef::workload(&payload)` during `apply_setup`.
499 CgroupDefWorkload,
500 /// Spawned by `Op::RunPayload { payload, .. }` inside the step's ops.
501 OpRunPayload,
502}
503
504impl PayloadSource {
505 /// Human-readable tag for error output. Describes the API surface
506 /// that originated the spawn, not the internal dispatch site.
507 fn describe(self) -> &'static str {
508 match self {
509 PayloadSource::CgroupDefWorkload => "CgroupDef::workload",
510 PayloadSource::OpRunPayload => "Op::RunPayload",
511 }
512 }
513}
514
515/// One live payload handle plus the cgroup it runs inside and the
516/// API surface that spawned it. `cgroup` is empty iff
517/// `source == PayloadSource::OpRunPayload` was invoked without a
518/// `cgroup = Some(...)` argument — in which case the payload runs
519/// in whatever cgroup its parent process inherited (no explicit
520/// placement).
521struct PayloadEntry {
522 cgroup: String,
523 source: PayloadSource,
524 handle: crate::scenario::payload_run::PayloadHandle,
525}
526
527/// Map the BPF probe's current scheduler-exit classification onto
528/// the [`crate::assert::DetailKind`] variant the three liveness
529/// emission sites push. Reads [`crate::probe::process::sched_exit_kind`]
530/// which mirrors the probe's `ktstr_err_exit_detected` BSS latch
531/// across threads.
532///
533/// Returns:
534/// - `SchedulerCrashed` when the probe observed a non-clean kernel
535/// exit (any path that latched `ktstr_err_exit_detected`).
536/// - `SchedulerExitedCleanly` when the probe ran but never observed
537/// the latch (clean `SCX_EXIT_NONE` teardown, or the scheduler
538/// exited for a benign reason).
539/// - `SchedulerDiedUnknownReason` when the probe has not classified
540/// yet — typically the probe pipeline never wired for this run
541/// (host-only test, no scheduler attached) or the poll thread has
542/// not completed a first iteration since the prior reset.
543fn sched_died_detail_kind() -> crate::assert::DetailKind {
544 use crate::assert::DetailKind;
545 use crate::probe::process::{SchedExitKind, sched_exit_kind};
546 match sched_exit_kind() {
547 SchedExitKind::Crashed => DetailKind::SchedulerCrashed,
548 SchedExitKind::Clean => DetailKind::SchedulerExitedCleanly,
549 SchedExitKind::Unknown => DetailKind::SchedulerDiedUnknownReason,
550 }
551}
552
553/// Classify scheduler liveness for the guest-side `survives_storm` probe:
554/// `Some(kind)` when a scheduler is still expected to run (`sched_pid` set) but
555/// has died or gone down — the leader pid ESRCH'd OR the scx state reads
556/// `disabling`/`disabled` ([`dispatch::scx_down`]) — and `None` otherwise.
557///
558/// Mirrors the inter-step / post-loop liveness gate `run_scenario` runs for
559/// `execute_*` scenarios (`!process_alive(pid) || dispatch::scx_down()`, gated
560/// on `sched_pid()` being set), so a clean `Op::DetachScheduler` — which clears
561/// the pid before this would observe `disabled` — does not trip it. The kind
562/// comes from [`sched_died_detail_kind`]; in the primary test VM the BPF
563/// err-exit latch is unread, so the kind is
564/// [`crate::assert::DetailKind::SchedulerDiedUnknownReason`] (the scx state is
565/// kind-less — a crash and a clean unregister both read `disabling`/`disabled`).
566pub(crate) fn sched_liveness_failure_kind() -> Option<crate::assert::DetailKind> {
567 let pid = crate::vmm::rust_init::sched_pid()?;
568 if !process_alive(pid) || dispatch::scx_down() {
569 Some(sched_died_detail_kind())
570 } else {
571 None
572 }
573}
574
575/// Execute a single step with CgroupDefs that hold for the full duration.
576///
577/// Convenience wrapper around [`execute_steps`] for the common pattern
578/// of creating cgroups and running them for [`HoldSpec::FULL`].
579pub fn execute_defs(ctx: &Ctx, defs: Vec<CgroupDef>) -> Result<AssertResult> {
580 execute_steps(ctx, vec![Step::with_defs(defs, HoldSpec::FULL)])
581}
582
583/// Block until the host freeze coordinator has ADOPTED its
584/// kernel-symbol accessor — signalled via the
585/// `SIGNAL_ACCESSOR_READY` wake byte →
586/// `accessor_ready_latch` (set by `hvc0_poll_loop`). A failure dump
587/// captured by a stall AFTER this returns renders real BPF map values
588/// instead of placeholders, because the coordinator's `owned_accessor`
589/// is adopted before the stall fires. Call this in a dump-asserting
590/// scenario before triggering its stall (e.g. before [`execute_steps`]
591/// with a `--stall-after` scheduler).
592///
593/// Guest-only: a no-op on the host (unit tests), where the latch is
594/// never armed. Warn-and-proceed on a 60s timeout — a never-adopted
595/// accessor is a worker failure, and surfacing it as a placeholder dump
596/// is more useful than blocking the test forever (mirrors the
597/// `wait_for_map_write` gate's soft-timeout policy). On that timeout the
598/// "renders real values" guarantee above does NOT hold: the wait returns
599/// without adoption and a subsequent stall may dump placeholders, which
600/// the test's post-VM dump assertions then surface as a failure.
601///
602/// The latch is sticky (level-triggered) and sets once, on the FIRST
603/// adoption. A re-init publish after a scheduler swap
604/// (`Op::ReplaceScheduler`) reuses the same latch, so a later
605/// `await_accessor_ready` returns immediately and does NOT re-synchronise
606/// on the post-swap adoption — today's only caller gates the first stall,
607/// before any swap.
608pub fn await_accessor_ready() {
609 if guest_comms::is_guest() {
610 let latch = crate::vmm::rust_init::accessor_ready_latch();
611 if !latch.wait_timeout(Duration::from_secs(60)) {
612 tracing::warn!(
613 "await_accessor_ready timed out after 60s — host freeze \
614 coordinator did not signal accessor adoption; a dump from a \
615 stall after this point may render placeholder map values"
616 );
617 }
618 }
619}
620
621/// Execute a sequence of steps against the given context.
622///
623/// Convenience wrapper around [`execute_steps_with`] that passes
624/// `None` for checks, falling back to `ctx.assert`. Use
625/// [`execute_steps_with`] when you need to override `ctx.assert`.
626pub fn execute_steps(ctx: &Ctx, steps: Vec<Step>) -> Result<AssertResult> {
627 execute_steps_with(ctx, steps, None)
628}
629
630/// Execute a [`Backdrop`](backdrop::Backdrop) + Steps sequence
631/// against the given context.
632///
633/// The Backdrop declares persistent scenario-wide state
634/// (long-running payloads, cgroups referenced by many Steps) while
635/// Steps express bounded per-phase behavior. The runtime sets up
636/// the Backdrop before the first Step, runs the Step sequence
637/// with per-Step teardown (cgroups removed, workload handles
638/// collected, payload handles killed at step boundary), and tears
639/// the Backdrop down at the end.
640pub fn execute_scenario(
641 ctx: &Ctx,
642 backdrop: backdrop::Backdrop,
643 steps: Vec<Step>,
644) -> Result<AssertResult> {
645 execute_scenario_with(ctx, backdrop, steps, None)
646}
647
648/// [`execute_scenario`] with an explicit
649/// [`Assert`](crate::assert::Assert) override — the Backdrop
650/// equivalent of [`execute_steps_with`].
651pub fn execute_scenario_with(
652 ctx: &Ctx,
653 backdrop: backdrop::Backdrop,
654 steps: Vec<Step>,
655 checks: Option<&crate::assert::Assert>,
656) -> Result<AssertResult> {
657 run_scenario(ctx, backdrop, steps, checks)
658}
659
660/// Execute steps with an explicit [`Assert`](crate::assert::Assert) for
661/// worker checks. When `checks` is `Some`, it overrides `ctx.assert`.
662/// When `None`, uses `ctx.assert` (the merged three-layer config).
663///
664/// Thin wrapper around [`execute_scenario_with`] with an empty
665/// [`Backdrop`](backdrop::Backdrop) — every Step's effects
666/// (cgroups, workloads, payloads) tear down at the step boundary.
667pub fn execute_steps_with(
668 ctx: &Ctx,
669 steps: Vec<Step>,
670 checks: Option<&crate::assert::Assert>,
671) -> Result<AssertResult> {
672 execute_scenario_with(ctx, backdrop::Backdrop::new(), steps, checks)
673}
674
675/// Compute the union of cgroup v2 controllers required by a
676/// Backdrop and Step sequence. Walks every [`CgroupDef`] declaration
677/// and every [`Op`] variant, returning the smallest set of
678/// controllers that must be enabled in `cgroup.subtree_control` for
679/// the scenario's per-knob writes to land.
680///
681/// Mapping:
682/// - [`CgroupDef::cpuset`] / [`CgroupDef::cpuset_mems`] → `Controller::Cpuset`
683/// - [`CgroupDef::cpu`] → `Controller::Cpu`
684/// - [`CgroupDef::memory`] → `Controller::Memory`
685/// - [`CgroupDef::pids`] → `Controller::Pids`
686/// - [`CgroupDef::io`] → `Controller::Io`
687/// - [`Op::SetCpuset`] / [`Op::ClearCpuset`] / [`Op::SwapCpusets`] /
688/// [`Op::SetAffinity`] → `Controller::Cpuset`
689/// - Every other [`Op`] variant ([`Op::FreezeCgroup`],
690/// [`Op::AddCgroup`], [`Op::Spawn`], [`Op::MoveAllTasks`], etc.)
691/// touches cgroup-core knobs (`cgroup.freeze`, `cgroup.procs`,
692/// `mkdir`/`rmdir`) which are ungated by any controller and
693/// contribute nothing to this set.
694///
695/// Returning the SMALLEST set lets a test that intentionally
696/// requires the absence of a controller (e.g. testing behavior on
697/// a kernel without `+cpu`) get an empty subtree_control write.
698fn required_controllers(
699 ctx: &Ctx,
700 backdrop: &backdrop::Backdrop,
701 steps: &[Step],
702) -> BTreeSet<crate::cgroup::Controller> {
703 use crate::cgroup::Controller;
704 fn absorb_def(set: &mut BTreeSet<Controller>, def: &CgroupDef) {
705 if def.cpuset.is_some() || def.cpuset_mems.is_some() {
706 set.insert(Controller::Cpuset);
707 }
708 if def.cpu.is_some() {
709 set.insert(Controller::Cpu);
710 }
711 if def.memory.is_some() {
712 set.insert(Controller::Memory);
713 }
714 if def.io.is_some() {
715 set.insert(Controller::Io);
716 }
717 if def.pids.is_some() {
718 set.insert(Controller::Pids);
719 }
720 }
721 fn absorb_op(set: &mut BTreeSet<Controller>, op: &Op) {
722 if matches!(
723 op,
724 Op::SetCpuset { .. }
725 | Op::ClearCpuset { .. }
726 | Op::SwapCpusets { .. }
727 | Op::SetAffinity { .. }
728 ) {
729 set.insert(Controller::Cpuset);
730 }
731 // AddCgroupDef carries a full CgroupDef whose knobs may
732 // require any of the same controllers absorb_def covers. The
733 // op-applied def goes through apply_setup at op-execute time,
734 // which writes to those controller files; the parent's
735 // subtree_control must already have the controllers enabled
736 // by then, so absorb the def's needs into the pre-scenario
737 // controller setup the same way step-local CgroupDefs do.
738 if let Op::AddCgroupDef { def } = op {
739 absorb_def(set, def);
740 }
741 }
742 let mut set = BTreeSet::new();
743 for def in &backdrop.cgroups {
744 absorb_def(&mut set, def);
745 }
746 for op in &backdrop.ops {
747 absorb_op(&mut set, op);
748 }
749 for step in steps {
750 for def in step.setup.resolve(ctx) {
751 absorb_def(&mut set, &def);
752 }
753 for op in &step.ops {
754 absorb_op(&mut set, op);
755 }
756 }
757 set
758}
759
760/// Validate every step hold spec and reject scheduler-kind Backdrop
761/// payloads up front so failures surface at the declaration, before
762/// any runtime state is created.
763fn validate_scenario_inputs(backdrop: &backdrop::Backdrop, steps: &[Step]) -> Result<()> {
764 // Validate every step's hold spec up front so a typo doesn't
765 // reach `Duration::from_secs_f64(NaN)` / `thread::sleep(ZERO)` /
766 // a no-yield Loop busy-wait after ops have already been applied.
767 for (i, step) in steps.iter().enumerate() {
768 if let Err(reason) = step.hold.validate() {
769 anyhow::bail!("step {i} hold validation: {reason}");
770 }
771 }
772 // Validate Backdrop payloads before creating any runtime state.
773 // Only binary payloads can be spawned by Op::RunPayload, which
774 // is what the Backdrop setup uses under the hood. Reject
775 // scheduler-kind payloads here so the failure surface is the
776 // Backdrop declaration, not a mid-scenario spawn error after
777 // cgroups have already been created.
778 for p in &backdrop.payloads {
779 if p.is_scheduler() {
780 anyhow::bail!(
781 "Backdrop::push_payload received scheduler-kind Payload '{}' — \
782 only PayloadKind::Binary payloads run in the Backdrop; \
783 place scheduler-kind payloads on the #[ktstr_test(scheduler = ...)] \
784 attribute instead",
785 p.name,
786 );
787 }
788 }
789 // Scheduler-kind payloads smuggled via Backdrop::push_op(Op::RunPayload { ... })
790 // would otherwise bypass the check above and only bail deep inside
791 // apply_ops. Reject them here with a Backdrop-specific error so
792 // the failure surface matches the declaration surface.
793 for op in &backdrop.ops {
794 if let Op::RunPayload { payload, .. } = op
795 && payload.is_scheduler()
796 {
797 anyhow::bail!(
798 "Backdrop::push_op(Op::RunPayload) received scheduler-kind Payload '{}' — \
799 only PayloadKind::Binary payloads run in the Backdrop; \
800 place scheduler-kind payloads on the #[ktstr_test(scheduler = ...)] \
801 attribute instead",
802 payload.name,
803 );
804 }
805 }
806 Ok(())
807}
808
809/// Guest-side gate that blocks until the host's queued BPF map writes
810/// land, so the workload phase observes fresh map values.
811fn wait_for_host_map_write(ctx: &Ctx) {
812 // When a host-side BPF map write is configured the test framework
813 // sets `wait_for_map_write=true`; in that case block until the
814 // guest's `hvc0_poll_loop` observes
815 // [`crate::vmm::virtio_console::SIGNAL_BPF_WRITE_DONE`] (pushed by
816 // the host's `bpf-map-write` thread after every queued
817 // `bpf_map_write` lands) and fires the `bpf_map_write_done` latch.
818 // Without this gate the workload phase races against the host's
819 // map writes and may observe a stale BPF map value.
820 //
821 // Guest-only path. On the host (unit tests) the latch is never
822 // armed, so we skip the wait entirely. The 60 s timeout matches
823 // the bpf-map-write thread's combined phase 1 + phase 2 budget
824 // (30 s accessor init + 30 s map discovery in
825 // `freeze_coord::start_bpf_map_write`); a real timeout means the
826 // host failed to resolve a map. The scenario continues anyway
827 // (rather than `bail!`) because the legacy rendezvous also let
828 // the guest proceed under its own timeout, and a bail here would
829 // mask the underlying host-side resolution failure with a
830 // test-side `Err`.
831 if ctx.wait_for_map_write && guest_comms::is_guest() {
832 let latch = crate::vmm::rust_init::bpf_map_write_done_latch();
833 if !latch.wait_timeout(Duration::from_secs(60)) {
834 tracing::warn!(
835 "wait_for_map_write timed out after 60s — host bpf-map-write \
836 thread may have failed to resolve a queued map; proceeding \
837 with the workload regardless"
838 );
839 }
840 }
841}
842
843/// Run persistent Backdrop setup. `Ok(None)` on success; `Ok(Some(r))`
844/// carries the failure AssertResult when setup errors so the caller
845/// returns it directly.
846fn run_backdrop_setup<'a>(
847 ctx: &'a Ctx,
848 backdrop: &backdrop::Backdrop,
849 backdrop_state: &mut BackdropState<'a>,
850 effective_checks: &crate::assert::Assert,
851 result: &AssertResult,
852) -> Result<Option<AssertResult>> {
853 // --- Backdrop setup (persistent) ---
854 // Run before the first Step. Cgroups + payloads declared on
855 // `backdrop` land in `backdrop_state` so they survive every
856 // Step's teardown. On error, drain Backdrop payload handles
857 // (metric emission) and propagate.
858 if backdrop.is_empty() {
859 return Ok(None);
860 }
861 let mut step_staging = StepState::empty(ctx);
862 let mut scratch = ScenarioState::new(&mut step_staging, backdrop_state);
863 let setup_res = scratch.with_target_backdrop(|s| {
864 // Order: cgroups → ops → payloads. CgroupDefs go first so
865 // a later `Op::add_cgroup` / `Op::run_payload_in_cgroup`
866 // can target cgroups that `apply_setup` just created.
867 // Payloads spawn last so `run_payload` resolving a cgroup
868 // placement lands inside a cgroup that either apply pass
869 // already built.
870 if !backdrop.cgroups.is_empty() {
871 apply_setup(ctx, s, &backdrop.cgroups)?;
872 }
873 // Raw ops: typically `Op::AddCgroup` for empty move-target
874 // cgroups (can't be expressed via CgroupDef because
875 // apply_setup forces a worker spawn), or placement-aware
876 // `Op::RunPayload` targeting a just-created backdrop
877 // cgroup.
878 if !backdrop.ops.is_empty() {
879 apply_ops(ctx, s, &backdrop.ops, false)?;
880 }
881 // Shorthand payloads: one Op::RunPayload per entry,
882 // inherited cgroup placement.
883 if !backdrop.payloads.is_empty() {
884 let ops: Vec<Op> = backdrop
885 .payloads
886 .iter()
887 .map(|p| Op::run_payload(p, Vec::<String>::new()))
888 .collect();
889 apply_ops(ctx, s, &ops, false)?;
890 }
891 Ok::<(), anyhow::Error>(())
892 });
893 if let Err(err) = setup_res {
894 // Scheduler crashed during backdrop setup (e.g. a worker
895 // tripped the BPF error before the first Step): SIGKILL the
896 // spawned workers up front so the collect reaps below aren't
897 // scheduling-gated behind fallback workers (see
898 // `sigkill_handles`). Gated on `scx_down` (crash-only) so a
899 // non-crash setup error keeps the graceful collect path.
900 if sched_crashed_unexpectedly() {
901 sigkill_handles(&backdrop_state.handles);
902 sigkill_handles(&step_staging.handles);
903 }
904 // Collect any workers that DID spawn before the failure
905 // so their stats reach the final result instead of being
906 // discarded by `WorkloadHandle::drop` (which SIGKILLs
907 // without gathering scheduler-side data). `collect_*`
908 // drain `payload_handles` internally, so the backdrop-
909 // and step-side payloads still get `.kill()` (SHM metric
910 // emission) on the error path.
911 //
912 // `with_target_backdrop` routes every target writer to
913 // the backdrop slot, so `step_staging` normally holds
914 // nothing — but collect defensively so a partial-failure
915 // path that leaks a non-backdrop write surfaces here
916 // rather than disappearing into `StepState::drop`.
917 let mut r = collect_backdrop(backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
918 let staging_result = collect_step(
919 &mut step_staging,
920 effective_checks,
921 ctx.topo,
922 ctx.cgroups,
923 // Defensive backdrop-staging collect: no step attribution.
924 None,
925 );
926 r.merge(staging_result);
927 r.merge(result.clone());
928 // step_staging's CgroupGroup RAII still drops here,
929 // removing any cgroups the failed Backdrop setup routed
930 // into step-local state.
931 r.record_fail(crate::assert::AssertDetail::new(
932 crate::assert::DetailKind::Other,
933 format!("Backdrop setup failed: {err:#}"),
934 ));
935 return Ok(Some(r));
936 }
937 // `step_staging` should not have accumulated anything
938 // because `with_target_backdrop` routed every target writer
939 // to the backdrop side. Collect any stray handles defensively
940 // before dropping so a future refactor that leaks a non-
941 // backdrop write here surfaces as a missed teardown rather
942 // than silently discarded state.
943 drain_all_payload_handles(&mut step_staging.payload_handles);
944 Ok(None)
945}
946
947/// 1-indexed phase number for scenario Step `step_idx` (BASELINE=0,
948/// Step k -> k+1), saturating at `u16::MAX` past the encoding range.
949fn phase_step_index(step_idx: usize) -> u16 {
950 u16::try_from(step_idx)
951 .ok()
952 .and_then(|i| i.checked_add(1))
953 .unwrap_or(u16::MAX)
954}
955
956/// Build the inter-step scheduler-death failure result: collect
957/// backdrop workers, merge the accumulated result, record the
958/// `format_sched_died_after_step` detail.
959fn build_sched_died_after_step(
960 backdrop_state: &mut BackdropState<'_>,
961 result: AssertResult,
962 ctx: &Ctx,
963 effective_checks: &crate::assert::Assert,
964 step_idx: usize,
965 steps_len: usize,
966 scenario_start: std::time::Instant,
967) -> AssertResult {
968 // Collect backdrop-owned workload handles into the
969 // result before reporting the crash so whatever the
970 // persistent workers produced is still assertable.
971 let mut r = collect_backdrop(backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
972 r.merge(result);
973 r.record_fail(crate::assert::AssertDetail::new(
974 sched_died_detail_kind(),
975 crate::assert::format_sched_died_after_step(
976 step_idx,
977 steps_len,
978 scenario_start.elapsed().as_secs_f64(),
979 ),
980 ));
981 r
982}
983
984/// Build the step-error failure result: collect backdrop workers,
985/// merge the accumulated result, record the `step N failed` detail.
986fn build_step_failed(
987 backdrop_state: &mut BackdropState<'_>,
988 result: AssertResult,
989 ctx: &Ctx,
990 effective_checks: &crate::assert::Assert,
991 step_idx: usize,
992 err: &anyhow::Error,
993) -> AssertResult {
994 // Collect Backdrop-owned workload handles into a fresh
995 // result first, then merge the accumulated step result
996 // on top. `collect_backdrop` drains
997 // `backdrop_state.payload_handles` internally, so the
998 // backdrop-side payloads still get `.kill()` (metric
999 // emission) on the error path. Ordering mirrors the
1000 // scheduler-crash path above so detail order is
1001 // consistent across both Ok(failed) returns.
1002 let mut r = collect_backdrop(backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
1003 r.merge(result);
1004 r.record_fail(crate::assert::AssertDetail::new(
1005 crate::assert::DetailKind::Other,
1006 format!("step {step_idx} failed: {err:#}"),
1007 ));
1008 r
1009}
1010
1011/// Build the sched-died-during-hold failure result: collect backdrop
1012/// workers, merge the accumulated result, record the
1013/// `format_sched_died_during_workload` detail.
1014fn build_sched_died_during_hold(
1015 backdrop_state: &mut BackdropState<'_>,
1016 result: AssertResult,
1017 ctx: &Ctx,
1018 effective_checks: &crate::assert::Assert,
1019 scenario_start: std::time::Instant,
1020) -> AssertResult {
1021 let mut r = collect_backdrop(backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
1022 r.merge(result);
1023 r.record_fail(crate::assert::AssertDetail::new(
1024 sched_died_detail_kind(),
1025 crate::assert::format_sched_died_during_workload(scenario_start.elapsed().as_secs_f64()),
1026 ));
1027 r
1028}
1029
1030/// Emit the ScenarioEnd marker, run the final liveness check, tear the
1031/// Backdrop down, and fold in a sched-died detail when the scheduler
1032/// died after the last hold.
1033fn finish_scenario(
1034 ctx: &Ctx,
1035 backdrop_state: &mut BackdropState<'_>,
1036 effective_checks: &crate::assert::Assert,
1037 scenario_start: std::time::Instant,
1038 steps_len: usize,
1039 final_total_iterations: Option<(u64, u64)>,
1040 mut result: AssertResult,
1041) -> AssertResult {
1042 // ScenarioEnd marker. Routes through `send_scenario_end`
1043 // (virtio-console port-1 bulk channel).
1044 // Carries the last cleanly-completed step's COINCIDENT (elapsed_ms,
1045 // iterations) pair so the host can give the last step an
1046 // `iteration_rate` over a well-formed window. The elapsed
1047 // is the end-of-hold timestamp captured WITH the count inside
1048 // run_step — NOT recomputed here, which would include the
1049 // pause+collect teardown wall-time and under-report the rate. When
1050 // no step completed a hold (e.g. every step's scheduler died) the
1051 // pair is `None`: fall back to (now, 0), which the host skips
1052 // (0 baseline / e<=s).
1053 if guest_comms::is_guest() {
1054 let (end_elapsed_ms, end_iterations) = final_total_iterations
1055 .unwrap_or_else(|| (scenario_start.elapsed().as_millis() as u64, 0));
1056 crate::vmm::guest_comms::send_scenario_end(end_elapsed_ms, end_iterations);
1057 }
1058
1059 // Final liveness check. Live `crate::vmm::rust_init::sched_pid()`
1060 // read instead of `ctx.sched_pid` snapshot so a mid-scenario
1061 // Op::ReplaceScheduler swap reflects the new pid here too.
1062 // sched_pid() == None ⇒ no scheduler configured (kernel-default
1063 // path) OR Op::DetachScheduler cleared it; no liveness to
1064 // report on either case.
1065 let sched_dead = crate::vmm::rust_init::sched_pid()
1066 .is_some_and(|pid| !process_alive(pid) || dispatch::scx_down());
1067
1068 // Scheduler died after the last hold: SIGKILL the backdrop
1069 // workers up front so the teardown reap below isn't
1070 // scheduling-gated behind still-spinning workers that fell back
1071 // to the builtin scheduler (see `sigkill_handles`).
1072 if sched_dead {
1073 sigkill_handles(&backdrop_state.handles);
1074 }
1075
1076 // --- Backdrop teardown ---
1077 let backdrop_result = collect_backdrop(backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
1078 result.merge(backdrop_result);
1079
1080 if sched_dead {
1081 result.record_fail(crate::assert::AssertDetail::new(
1082 sched_died_detail_kind(),
1083 crate::assert::format_sched_died_after_all_steps(
1084 steps_len,
1085 scenario_start.elapsed().as_secs_f64(),
1086 ),
1087 ));
1088 }
1089
1090 result
1091}
1092
1093/// Internal driver: runs Backdrop setup, the Step loop with
1094/// per-Step teardown, and final Backdrop teardown.
1095fn run_scenario(
1096 ctx: &Ctx,
1097 backdrop: backdrop::Backdrop,
1098 steps: Vec<Step>,
1099 checks: Option<&crate::assert::Assert>,
1100) -> Result<AssertResult> {
1101 validate_scenario_inputs(&backdrop, &steps)?;
1102 let effective_checks = checks.unwrap_or(&ctx.assert);
1103
1104 // Enable the controllers this scenario actually needs in
1105 // `cgroup.subtree_control` BEFORE any cgroupfs writes land. The
1106 // union is computed from every CgroupDef and Op declared in the
1107 // backdrop+steps; tests that declare no controller-gated knobs
1108 // get an empty set (parent dir created, no subtree_control walk).
1109 let required = required_controllers(ctx, &backdrop, &steps);
1110 ctx.cgroups
1111 .setup(&required)
1112 .context("enable cgroup controllers in subtree_control")?;
1113
1114 let mut backdrop_state = BackdropState::empty(ctx);
1115 let mut result = AssertResult::pass();
1116
1117 // (scenario-relative elapsed_ms, cumulative worker iteration count)
1118 // at the END of the most-recently completed step, captured
1119 // coincidently inside `run_step` while that step's workers are still
1120 // alive. Carries the LAST step's pair out of the loop so the
1121 // `ScenarioEnd` frame can supply the final step's `iteration_rate`
1122 // right boundary. `None` until the first step completes a
1123 // hold cleanly.
1124 let mut final_total_iterations: Option<(u64, u64)> = None;
1125
1126 let scenario_start = std::time::Instant::now();
1127
1128 // ScenarioStart marker. `is_guest` short-circuits in host
1129 // contexts (unit tests) where the bulk port and SHM ring are
1130 // both absent and `send_scenario_start` would log a no-op warning.
1131 if guest_comms::is_guest() {
1132 crate::vmm::guest_comms::send_scenario_start();
1133 }
1134
1135 wait_for_host_map_write(ctx);
1136
1137 if let Some(r) = run_backdrop_setup(
1138 ctx,
1139 &backdrop,
1140 &mut backdrop_state,
1141 effective_checks,
1142 &result,
1143 )? {
1144 return Ok(r);
1145 }
1146
1147 // --- Step loop with per-Step teardown ---
1148 for (step_idx, step) in steps.iter().enumerate() {
1149 // Check scheduler liveness between steps (skip before first).
1150 // Live `crate::vmm::rust_init::sched_pid()` read instead of
1151 // `ctx.sched_pid` snapshot so a mid-scenario
1152 // `Op::ReplaceScheduler` swap is reflected — the swap
1153 // dispatcher publishes the new child's pid to `SCHED_PID`
1154 // via the `SCHED_PID.store` in `try_spawn_scheduler`, and
1155 // this check then observes the new pid's liveness (not the
1156 // dead boot pid). `None` means
1157 // either no scheduler was configured at boot or
1158 // `Op::DetachScheduler` cleared the pid; the liveness probe
1159 // cannot meaningfully report on a pid that doesn't exist.
1160 if step_idx > 0
1161 && let Some(pid) = crate::vmm::rust_init::sched_pid()
1162 && (!process_alive(pid) || dispatch::scx_down())
1163 {
1164 // Scheduler died between steps: the kernel has disabled
1165 // sched_ext and the backdrop workers have fallen back to
1166 // the builtin scheduler. SIGKILL them up front so the
1167 // collect reap below isn't scheduling-gated behind
1168 // still-spinning workers (see `sigkill_handles`).
1169 sigkill_handles(&backdrop_state.handles);
1170 return Ok(build_sched_died_after_step(
1171 &mut backdrop_state,
1172 result,
1173 ctx,
1174 effective_checks,
1175 step_idx,
1176 steps.len(),
1177 scenario_start,
1178 ));
1179 }
1180
1181 let mut step_state = StepState::empty(ctx);
1182 let mut sched_died_during_hold = false;
1183 // Publish the 1-indexed phase number for this Step so the
1184 // freeze-coordinator periodic-capture path and the on-demand
1185 // Op::CaptureSnapshot / Op::WatchSnapshot apply arms all
1186 // stamp the captures they take with the correct scenario
1187 // phase. The 1-indexed encoding (scenario Step k -> phase
1188 // k + 1) reserves phase 0 for the pre-first-Step BASELINE
1189 // settle window. `Release` pairs with the consumers'
1190 // `Acquire` load so a sample stamped with this value
1191 // happens-after any state the Step has set up before
1192 // calling run_step.
1193 let phase_step_index = phase_step_index(step_idx);
1194 ctx.current_step
1195 .store(phase_step_index, std::sync::atomic::Ordering::Release);
1196 // Broadcast the phase to this handle's backdrop (persistent)
1197 // workers so each drains its per-phase PhaseSlice at this
1198 // boundary. Step-local pools are NOT bumped here — they
1199 // re-spawn per step and get per-phase attribution via
1200 // collect_step's step_index instead.
1201 for (_, h) in &backdrop_state.handles {
1202 h.set_phase_epoch(u32::from(phase_step_index));
1203 }
1204 // Install the assert-side phase guard for the scenario
1205 // driver's thread for the duration of this Step. Every
1206 // AssertDetail / PassDetail / InfoNote constructed under
1207 // the run_step call below auto-stamps its `phase` field
1208 // with "Step[<step_idx>]" via the thread-local snapshot
1209 // in `crate::assert::current_phase_label`. On Drop the
1210 // prior label is restored (BASELINE outside any Step), so
1211 // assertions evaluated post-loop (e.g. at scenario
1212 // teardown) stamp with the right outer scope.
1213 let _phase_guard = crate::assert::PhaseGuard::install_step(step_idx as u16);
1214 let step_res = run_step(
1215 ctx,
1216 step,
1217 step_idx,
1218 &mut step_state,
1219 &mut backdrop_state,
1220 scenario_start,
1221 effective_checks,
1222 &mut sched_died_during_hold,
1223 &mut final_total_iterations,
1224 );
1225
1226 // Close this step's hold window for backdrop workers: write the
1227 // inter-step gap sentinel so work done between this StepEnd and
1228 // the next StepStart is NOT folded into step `step_idx`'s slice
1229 // (matching the host's hold-only window clamp, and step-local
1230 // workers, which do not exist during the inter-step gap).
1231 for (_, h) in &backdrop_state.handles {
1232 h.set_phase_epoch(u32::MAX);
1233 }
1234
1235 // Scheduler died during this step's hold: the kernel has
1236 // disabled sched_ext and the workers (step and backdrop) have
1237 // fallen back to the builtin scheduler. If CPU-bound they
1238 // would CFS-starve the per-worker reap in the collect calls
1239 // below, so SIGKILL them all up front — the reaps then find
1240 // already-exiting workers (see `sigkill_handles`). Gated on
1241 // the death flag so the normal teardown keeps its graceful
1242 // cooperative-stop + report-collection path.
1243 if sched_died_during_hold || sched_crashed_unexpectedly() {
1244 sigkill_handles(&step_state.handles);
1245 sigkill_handles(&backdrop_state.handles);
1246 }
1247
1248 if guest_comms::is_guest() {
1249 crate::vmm::guest_comms::send_scenario_pause();
1250 }
1251
1252 let step_result = collect_step(
1253 &mut step_state,
1254 effective_checks,
1255 ctx.topo,
1256 ctx.cgroups,
1257 // The SAME 1-indexed value stamped onto this step's StepStart
1258 // frames (build_stimulus), so the guest per_cgroup keys on the
1259 // identical step_index the host rebuilds buckets under.
1260 Some(phase_step_index),
1261 );
1262 result.merge(step_result);
1263
1264 // A step-level error is converted into a failure on the
1265 // accumulated result after teardown has run so every step
1266 // boundary leaves clean state behind even on failure. The
1267 // caller keeps the prior-steps' merged AssertResult plus
1268 // the error context as a detail, instead of an opaque Err
1269 // that discards everything.
1270 if let Err(err) = step_res {
1271 // Scheduler may have crashed between the pre-collect gate
1272 // above and here (e.g. a non-crash step error, then scx went
1273 // down during `collect_step`): SIGKILL the backdrop workers
1274 // up front so this collect isn't scheduling-gated behind the
1275 // fallback pool (see `sigkill_handles`). Crashed-only gate so
1276 // a plain step error with a live scheduler keeps the graceful
1277 // cooperative-stop path.
1278 if sched_crashed_unexpectedly() {
1279 sigkill_handles(&backdrop_state.handles);
1280 }
1281 return Ok(build_step_failed(
1282 &mut backdrop_state,
1283 result,
1284 ctx,
1285 effective_checks,
1286 step_idx,
1287 &err,
1288 ));
1289 }
1290
1291 // Scheduler exited during the step's hold-period sleep —
1292 // [`run_step`] cut the hold short and stamped
1293 // `sched_died_during_hold`. Emit the in-step
1294 // sched-died message before continuing to the next step
1295 // boundary; otherwise the post-loop probe would fire after
1296 // the full scenario duration and stamp a misleading elapsed
1297 // time. Same Backdrop-then-step merge order as the
1298 // inter-step path above so detail ordering stays consistent.
1299 if sched_died_during_hold {
1300 return Ok(build_sched_died_during_hold(
1301 &mut backdrop_state,
1302 result,
1303 ctx,
1304 effective_checks,
1305 scenario_start,
1306 ));
1307 }
1308 }
1309
1310 Ok(finish_scenario(
1311 ctx,
1312 &mut backdrop_state,
1313 effective_checks,
1314 scenario_start,
1315 steps.len(),
1316 final_total_iterations,
1317 result,
1318 ))
1319}
1320
1321/// Sleep up to `dur`, returning early if `sched_pid` exits.
1322///
1323/// Returns `true` the first time the scheduler is observed dead,
1324/// `false` if the full duration elapsed with no death observed.
1325/// When `sched_pid` is `None` (kernel-default scheduling, no
1326/// scheduler process to monitor), behaves exactly like
1327/// [`thread::sleep`] and always returns `false`.
1328///
1329/// Implementation uses `pidfd_open(2)` + `epoll_wait` so the waiter
1330/// is kernel-blocked on the pidfd until either the scheduler exits
1331/// (pidfd becomes readable) or the per-step hold elapses. This
1332/// drops crash-detection latency from one poll-tick (the previous
1333/// 100 ms cadence) to ~0: the kernel wakes the epoll waiter as
1334/// soon as the task transitions to EXIT_ZOMBIE. Mirrors
1335/// [`crate::scenario::payload_run`]'s `wait_with_deadline` shape.
1336/// Minimum kernel: Linux 5.3.
1337///
1338/// Deadline honoring: the `epoll_wait` timeout is re-derived from
1339/// `saturating_duration_since` each iteration so `EINTR` restarts
1340/// narrow the remaining window rather than extending it.
1341///
1342/// Failure handling: if `pidfd_open` returns `ESRCH`, the scheduler
1343/// is already gone — return `true` immediately without sleeping. Any
1344/// other failure mode (pidfd_open non-ESRCH, epoll_create1,
1345/// epoll_ctl ADD, EpollTimeout::try_from, epoll_wait) panics with an
1346/// operator-actionable message. Polling fallbacks were removed per
1347/// the project-wide "no polling fallbacks for evented paths" rule:
1348/// pidfd_open has shipped since Linux 5.3 and epoll has been
1349/// universally available for longer, so a failure here indicates a
1350/// catastrophic environment defect (memory pressure exhausting fds,
1351/// kernel feature compiled out) rather than a recoverable transient.
1352/// A loud panic surfaces the defect immediately; the prior silent
1353/// sleep+probe fallback masked it as test flakiness.
1354///
1355/// Scheduling jitter under load can leave the actual elapsed time
1356/// modestly above `dur`.
1357/// Loud panic on env- or code-defect failures inside [`hold_or_sched_died`].
1358/// Centralised so every site emits the same module-qualified prefix
1359/// `ktstr::scenario::hold_or_sched_died` — operator can grep that exact
1360/// string to land at the panic source. `op` names the failed primitive,
1361/// `pid` carries the in-scope pid for cross-reference with /proc, `err`
1362/// renders the underlying errno or nix error, `advice` is the one-line
1363/// remediation classified by failure class (env vs framework code defect).
1364#[cold]
1365#[track_caller]
1366fn panic_evented_hold_defect(
1367 op: &str,
1368 pid: libc::pid_t,
1369 err: impl std::fmt::Display,
1370 advice: &str,
1371) -> ! {
1372 panic!("ktstr::scenario::hold_or_sched_died: {op} failed (pid={pid}): {err} — {advice}");
1373}
1374
1375/// True when a scheduler is still expected running (`sched_pid` set) AND
1376/// sched_ext has gone down (`disabling`/`disabled`) — an unexpected
1377/// crash/unregister of the scheduler under test, detected via the
1378/// probe-independent [`dispatch::scx_down`] sysfs read. Unlike the BPF
1379/// err-exit latch (whose host mirror is only populated in the
1380/// auto-repro/dump VM, never the primary VM that runs the test), this is
1381/// live in the PRIMARY VM. The `sched_pid` guard keeps a clean
1382/// [`Op::DetachScheduler`] — which clears the pid before the state
1383/// settles to `disabled` — off the SIGKILL fast-path. Used by the
1384/// teardown gates that don't already hold the scheduler pid in scope.
1385fn sched_crashed_unexpectedly() -> bool {
1386 crate::vmm::rust_init::sched_pid().is_some() && dispatch::scx_down()
1387}
1388
1389fn hold_or_sched_died(dur: Duration, sched_pid: Option<libc::pid_t>) -> bool {
1390 use crate::probe::process::{SchedExitKind, sched_exit_kind};
1391 use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
1392 use std::os::fd::{AsFd, FromRawFd, OwnedFd};
1393
1394 // The BPF err-exit latch (read via `sched_exit_kind`) flips at the
1395 // error-class sched_ext exit — i.e. at the crash, before the crashed
1396 // scheduler PROCESS exits (a scheduler like scx_lavd observes the
1397 // disable through its ~1s userspace poll loop, then flushes its dump
1398 // and exits, so its process exit trails the crash by ~that long). Poll
1399 // it ALONGSIDE the pidfd so a crash aborts the hold at ~crash time, not
1400 // at the process exit. The pidfd remains the backstop for
1401 // clean/non-error process exits. 100ms is negligible vs the crash + the
1402 // scheduler's exit latency.
1403 const ERR_EXIT_POLL: Duration = Duration::from_millis(100);
1404 let crashed = || matches!(sched_exit_kind(), SchedExitKind::Crashed);
1405
1406 if dur.is_zero() {
1407 return crashed()
1408 || (sched_pid.is_some() && dispatch::scx_down())
1409 || sched_pid.is_some_and(|pid| !process_alive(pid));
1410 }
1411 let deadline = std::time::Instant::now() + dur;
1412 let Some(pid) = sched_pid else {
1413 // No scheduler pid (host-only run, or the pid was not recorded):
1414 // no pidfd to wait on, but the err-exit latch still fires on a
1415 // crash — poll it in short intervals instead of sleeping blind
1416 // for the whole window.
1417 loop {
1418 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
1419 if remaining.is_zero() {
1420 return crashed();
1421 }
1422 if crashed() {
1423 return true;
1424 }
1425 thread::sleep(remaining.min(ERR_EXIT_POLL));
1426 }
1427 };
1428
1429 // `pidfd_open(pid, 0)`: returns an fd that becomes readable when
1430 // the pid exits. Only meaningful on a thread-group leader, which
1431 // every `sched_pid` already is (it is the scheduler binary's
1432 // top-level pid as recorded in `Ctx::sched_pid`). No
1433 // `PIDFD_NONBLOCK` flag — epoll is the gate.
1434 let pidfd_raw = unsafe { libc::syscall(libc::SYS_pidfd_open, pid, 0i32) };
1435 if pidfd_raw < 0 {
1436 let err = std::io::Error::last_os_error();
1437 if err.raw_os_error() == Some(libc::ESRCH) {
1438 // pidfd_open observed the pid as gone before we could
1439 // even attach a waiter — sched is already dead.
1440 return true;
1441 }
1442 // pidfd_open shipped unconditionally in Linux 5.3 and ktstr's
1443 // kernel floor is well above that. A non-ESRCH failure (ENOMEM,
1444 // ENFILE, EPERM) means the test environment is broken in a way
1445 // polling cannot recover from. Panic loudly so the operator
1446 // sees the env defect instead of silently losing sched-died
1447 // detection for the rest of the hold.
1448 panic_evented_hold_defect(
1449 "pidfd_open",
1450 pid,
1451 format_args!("{err} (errno {:?})", err.raw_os_error()),
1452 "pidfd_open is unconditional from Linux 5.3; failure on a \
1453 5.3+ kernel = env defect — check ulimit -n / memory pressure / \
1454 cgroup pids.max",
1455 );
1456 }
1457 // SAFETY: the syscall succeeded and returned a fresh fd; it is
1458 // not registered with any other owner.
1459 let pidfd: OwnedFd = unsafe { OwnedFd::from_raw_fd(pidfd_raw as i32) };
1460
1461 // epoll setup. EPOLL_CLOEXEC matches `wait_with_deadline` to
1462 // avoid leaking the epoll fd into any post-fork descendant.
1463 let epoll = match Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC) {
1464 Ok(e) => e,
1465 Err(e) => {
1466 let fd = std::os::fd::AsRawFd::as_raw_fd(&pidfd);
1467 panic_evented_hold_defect(
1468 "epoll_create1(EPOLL_CLOEXEC)",
1469 pid,
1470 format_args!("{e} (pidfd={fd})"),
1471 "epoll has been universally available since 2.6; failure = \
1472 env defect — check ulimit -n and CONFIG_EPOLL",
1473 );
1474 }
1475 };
1476 // `data` field is unused — we only ever watch one fd. The add()
1477 // syscall still needs an `EpollEvent` with populated events.
1478 let event = EpollEvent::new(EpollFlags::EPOLLIN, 0);
1479 if let Err(e) = epoll.add(pidfd.as_fd(), event) {
1480 panic_evented_hold_defect(
1481 "epoll_ctl(ADD)",
1482 pid,
1483 e,
1484 "epoll_ctl(ADD) on a freshly-opened pidfd should never fail \
1485 in a healthy kernel; documented errors (EBADF/EEXIST/ENOMEM) \
1486 are env defects — likely fd exhaustion or memory pressure",
1487 );
1488 }
1489
1490 let mut events = [EpollEvent::empty()];
1491 loop {
1492 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
1493 if remaining.is_zero() {
1494 // Hold elapsed without a wakeup. Re-probe ALL signals to
1495 // catch a race where the pid exited, the err-exit latch
1496 // flipped, or sched_ext went down (sysfs) between the last
1497 // `epoll_wait` return and the deadline check (e.g. during
1498 // EINTR re-entry). `pid` is `Some` here, so the
1499 // scheduler-expected guard for `scx_down` is satisfied.
1500 return crashed() || dispatch::scx_down() || !process_alive(pid);
1501 }
1502
1503 // Abort at the crash, not at the lingering process exit. Two
1504 // probe-independent signals fire well before a crashed
1505 // scheduler's process exits: the BPF err-exit latch (live only
1506 // in the auto-repro/dump VM) and `/sys/kernel/sched_ext/state`
1507 // going `disabling`/`disabled` (live in the PRIMARY VM, where the
1508 // latch mirror is never populated — this is what makes a primary
1509 // VM crash abort the hold at crash time rather than ~1s later at
1510 // process exit). `pid` is `Some` here, so the
1511 // scheduler-expected guard for `scx_down` holds.
1512 if crashed() || dispatch::scx_down() {
1513 return true;
1514 }
1515
1516 // Cap the epoll timeout at the latch poll interval so the latch
1517 // is re-checked every `ERR_EXIT_POLL`, not only when the pidfd
1518 // becomes readable. `PollTimeout` (aliased as `EpollTimeout`)
1519 // stores the value as `i32`; single-pass clamp via
1520 // `u128 → i32::MAX` so a large remainder saturates at the max
1521 // accepted value instead of overflowing through the u32.
1522 let poll = remaining.min(ERR_EXIT_POLL);
1523 let ms_i32 = poll.as_millis().min(i32::MAX as u128) as i32;
1524 let timeout_param = match EpollTimeout::try_from(ms_i32) {
1525 Ok(t) => t,
1526 Err(e) => {
1527 // ms_i32 was clamped to the i32 range above so
1528 // EpollTimeout::try_from (which accepts i32) cannot
1529 // overflow. Reaching this arm means the EpollTimeout API
1530 // changed shape — code defect, not env transient.
1531 panic_evented_hold_defect(
1532 "EpollTimeout::try_from",
1533 pid,
1534 format_args!("{e} (input={ms_i32})"),
1535 "input was pre-clamped to fit i32; failure indicates an \
1536 upstream nix EpollTimeout API change requiring code update",
1537 );
1538 }
1539 };
1540
1541 match epoll.wait(&mut events, timeout_param) {
1542 Ok(0) => {
1543 // Poll-interval timeout, no pidfd event. Loop back to
1544 // re-check the err-exit latch and the deadline at the
1545 // top of the loop.
1546 }
1547 Ok(_) => {
1548 // pidfd became readable — task transitioned to
1549 // EXIT_ZOMBIE. Scheduler is dead.
1550 return true;
1551 }
1552 Err(nix::errno::Errno::EINTR) => {
1553 // Signal interrupted the wait; loop and re-compute
1554 // the remaining window.
1555 }
1556 Err(e) => {
1557 panic_evented_hold_defect(
1558 "epoll_wait",
1559 pid,
1560 e,
1561 "epoll_wait on a freshly-created epoll with a single \
1562 valid pidfd cannot legitimately fail outside EINTR; \
1563 documented errors (EBADF/EFAULT/EINVAL) are framework- \
1564 internal memory-safety defects — investigate concurrent \
1565 fd mutation or stack-frame corruption",
1566 );
1567 }
1568 }
1569 }
1570}
1571
1572/// Run a single step's setup + ops + hold against step-local state.
1573///
1574/// On error, the caller is expected to invoke `collect_step` for
1575/// per-step teardown (which runs regardless) and then propagate.
1576///
1577/// `sched_died_during_hold` is set to `true` when the hold-period
1578/// liveness poll observes the scheduler process exiting; the caller
1579/// uses this to emit [`crate::assert::format_sched_died_during_workload`]
1580/// instead of waiting for the post-loop probe to fire (which would
1581/// stamp the message with the full scenario duration even though
1582/// the death happened mid-step).
1583#[allow(clippy::too_many_arguments)]
1584fn run_step<'a>(
1585 ctx: &Ctx,
1586 step: &Step,
1587 step_idx: usize,
1588 step_state: &mut StepState<'a>,
1589 backdrop_state: &mut BackdropState<'a>,
1590 scenario_start: std::time::Instant,
1591 _effective_checks: &crate::assert::Assert,
1592 sched_died_during_hold: &mut bool,
1593 // Set to (scenario-relative elapsed_ms, cumulative worker iteration
1594 // count) sampled coincidently at this step's clean completion
1595 // Left unchanged on the sched-died early-return paths so
1596 // a failed step does not overwrite a prior step's good value with a
1597 // misleading post-death count.
1598 final_total_iterations: &mut Option<(u64, u64)>,
1599) -> Result<()> {
1600 let mut scenario = ScenarioState::new(step_state, backdrop_state);
1601
1602 // Any `?` out of apply_ops / apply_setup would bypass the
1603 // per-step teardown ordering; `drain_on_err!` kills payload
1604 // handles across step + backdrop (metric-emitting) before
1605 // propagating so a mid-scenario spawn failure still leaves a
1606 // usable sidecar.
1607 macro_rules! drain_on_err {
1608 ($scenario:expr, $e:expr) => {
1609 match $e {
1610 Ok(v) => v,
1611 Err(err) => {
1612 $scenario.drain_all_payloads();
1613 return Err(err);
1614 }
1615 }
1616 };
1617 }
1618
1619 match step.hold {
1620 HoldSpec::Loop { interval } => {
1621 // Setup runs once before the loop.
1622 if !step.setup.is_empty() {
1623 let defs = step.setup.resolve(ctx);
1624 drain_on_err!(scenario, apply_setup(ctx, &mut scenario, &defs));
1625 }
1626 // Emit the Loop step's start stimulus frame + resume, like
1627 // the non-Loop arm below. A Loop step is a phase like any
1628 // other: without its own start frame the synthesized
1629 // scenario-end terminal would pair against the
1630 // PRIOR step's frame and misattribute the entire Loop
1631 // window's throughput to the wrong step. The resume
1632 // re-enables periodic captures for the loop body and matches
1633 // the prior step's `run_steps` pause (same pause/resume
1634 // pairing the non-Loop arm uses; the first-step unmatched
1635 // resume is already the existing pattern).
1636 if guest_comms::is_guest() {
1637 let payload = build_stimulus(&scenario_start, step_idx, &step.ops, &scenario);
1638 crate::vmm::guest_comms::send_stimulus(zerocopy::IntoBytes::as_bytes(&payload));
1639 crate::vmm::guest_comms::send_scenario_resume();
1640 }
1641 // Loop mode: apply ops repeatedly at interval until
1642 // the remaining scenario time is exhausted, or the
1643 // scheduler process exits — whichever fires first.
1644 let deadline = scenario_start + ctx.duration;
1645 while std::time::Instant::now() < deadline {
1646 drain_on_err!(scenario, apply_ops(ctx, &mut scenario, &step.ops, true));
1647 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
1648 // Live `sched_pid()` read so a mid-loop
1649 // Op::ReplaceScheduler swap is watched at the NEW
1650 // pid, not the stale boot snapshot in ctx.
1651 if hold_or_sched_died(remaining.min(interval), crate::vmm::rust_init::sched_pid()) {
1652 *sched_died_during_hold = true;
1653 return Ok(());
1654 }
1655 }
1656 }
1657 _ => {
1658 // Ops first (e.g. parent cgroup creation), then
1659 // CgroupDef setup (children with workers).
1660 //
1661 // Footgun: a workload-producing `CgroupDef` in
1662 // `step.setup` is invisible to `step.ops` operating on
1663 // it, because step.ops runs BEFORE apply_setup creates
1664 // the cgroup AND registers the WorkloadHandle in
1665 // `state.all_handles()`. The failure mode is per-Op:
1666 // `Op::MoveAllTasks` against the absent source filters
1667 // `state.all_handles()` by name, finds zero matches,
1668 // iterates an empty `pid_batches`, and exits with no
1669 // work done — silent (no error surfaced to the
1670 // operator); `Op::CaptureCgroupProcs` reading the
1671 // missing `cgroup.procs` via `read_procs` returns the
1672 // ENOENT `with_context` and propagates `?`, bailing the
1673 // ops phase — loud (the test crashes with the cgroup
1674 // name in the error). Tests that need a freshly-spawned
1675 // worker pool to feed step ops must declare the
1676 // producing `CgroupDef` in the Backdrop (which runs
1677 // before any Step) when `step.setup` is `Setup::Defs(_)`.
1678 // When `step.setup` is `Setup::Factory(_)` (runtime-
1679 // generated defs that can't be hoisted at edit time),
1680 // the only viable remediation is restructuring into
1681 // multiple Steps — producer factory in Step N's setup,
1682 // consumer ops in Step N+1.
1683 drain_on_err!(scenario, apply_ops(ctx, &mut scenario, &step.ops, false));
1684 if !step.setup.is_empty() {
1685 let defs = step.setup.resolve(ctx);
1686 drain_on_err!(scenario, apply_setup(ctx, &mut scenario, &defs));
1687 }
1688
1689 // Write stimulus event after applying ops. Routes through
1690 // `crate::vmm::guest_comms::send_stimulus` (virtio-console
1691 // port-1 bulk channel). `is_guest` keeps the
1692 // `build_stimulus` walk off the host where the write would
1693 // no-op.
1694 if guest_comms::is_guest() {
1695 let payload = build_stimulus(&scenario_start, step_idx, &step.ops, &scenario);
1696 crate::vmm::guest_comms::send_stimulus(zerocopy::IntoBytes::as_bytes(&payload));
1697 }
1698
1699 if guest_comms::is_guest() {
1700 crate::vmm::guest_comms::send_scenario_resume();
1701 }
1702 let hold_dur = match step.hold {
1703 HoldSpec::Frac(f) => Duration::from_secs_f64(ctx.duration.as_secs_f64() * f),
1704 HoldSpec::Fixed(d) => d,
1705 HoldSpec::Loop { .. } => unreachable!(),
1706 };
1707 let remaining = (scenario_start + ctx.duration)
1708 .saturating_duration_since(std::time::Instant::now());
1709 let hold_dur = hold_dur.min(remaining);
1710 // Live `sched_pid()` read — matches the loop arm above
1711 // so the hold watches the post-Op::ReplaceScheduler
1712 // pid, not the stale boot snapshot.
1713 if hold_or_sched_died(hold_dur, crate::vmm::rust_init::sched_pid()) {
1714 *sched_died_during_hold = true;
1715 return Ok(());
1716 }
1717 }
1718 }
1719
1720 // Capture the cumulative iteration count AND its scenario-relative
1721 // timestamp at this step's end, AT THE SAME INSTANT, while the
1722 // step's workers are still alive — `collect_step` tears them down
1723 // right after `run_step` returns, so this is the last moment a
1724 // step-inclusive sum is observable. `run_steps` forwards the LAST
1725 // step's (elapsed_ms, iterations) pair in the widened `ScenarioEnd`
1726 // frame so the final step's `iteration_rate` has a right boundary to
1727 // diff against. The elapsed MUST be sampled HERE,
1728 // coincident with the count: recomputing it at `ScenarioEnd` send
1729 // time would measure past `send_scenario_pause` + the last step's
1730 // `collect_step` teardown (100ms–seconds under contention),
1731 // inflating the rate's denominator with wall-time during which no
1732 // iterations accrued and systematically under-reporting the last
1733 // step's throughput (a rate consumer treats counter/duration with
1734 // coincident endpoints). Only reached on a clean hold (the
1735 // sched-died early returns above skip it — a failed run needs no
1736 // terminal rate); gated on `is_guest` so the host-side scenario walk
1737 // doesn't sum a handle set that never ran a workload.
1738 if guest_comms::is_guest() {
1739 let end_elapsed_ms = scenario_start.elapsed().as_millis() as u64;
1740 let end_iterations: u64 = scenario
1741 .all_handles()
1742 .flat_map(|(_, h)| h.snapshot_iterations())
1743 .sum();
1744 *final_total_iterations = Some((end_elapsed_ms, end_iterations));
1745
1746 // Emit a per-step StepEnd frame carrying this step's coincident
1747 // end-of-hold (elapsed_ms, total_iterations) and the SAME
1748 // 1-indexed step_index as its StepStart, so the host pairs
1749 // StepStart[k] -> StepEnd[k] for step-LOCAL throughput: each
1750 // step's OWN workers measured start-to-end, which —
1751 // unlike the cross-step StepStart[k] -> StepStart[k+1] delta —
1752 // does not read ~0 for workers respawned per step. build_stimulus
1753 // supplies the op/cgroup/worker fields + the 1-indexed
1754 // step_index; override its recomputed elapsed/iterations with the
1755 // values captured above so the StepEnd pair stays coincident with
1756 // `final_total_iterations` (no pause/teardown wall-time creep).
1757 let mut step_end = build_stimulus(&scenario_start, step_idx, &step.ops, &scenario);
1758 step_end.elapsed_ms = u32::try_from(end_elapsed_ms).unwrap_or(u32::MAX);
1759 step_end.total_iterations = end_iterations;
1760 guest_comms::send_step_end(zerocopy::IntoBytes::as_bytes(&step_end));
1761 }
1762
1763 Ok(())
1764}
1765
1766/// Build a StimulusPayload from the current scenario state (step + backdrop).
1767///
1768/// # step_idx u16 saturation
1769///
1770/// `step_idx` is a `usize` on the caller side but the wire
1771/// `StimulusPayload.step_index` is a `u16` — the slot is sized for
1772/// realistic scenarios (≤ 65 536 distinct indices, `0..=u16::MAX`).
1773/// Any `step_idx` > `u16::MAX as usize` is clamped to `u16::MAX` by
1774/// `to_u16` below, with a `tracing::warn!` that names the overflow.
1775/// Downstream consumers of the StepStart wire frame therefore see
1776/// every step past index `u16::MAX` collapsed onto the same
1777/// `step_index` value (`u16::MAX`) — the ordering is preserved for
1778/// the first 65 536 steps (indices `0..=u16::MAX`), but labels
1779/// saturate and become ambiguous once the scenario crosses the
1780/// boundary. Scenarios that need to distinguish individual steps
1781/// past `u16::MAX` must widen the wire schema field; the
1782/// saturating-clip preserves visible wake ordering at the cost of
1783/// individuality in the deep tail.
1784fn build_stimulus(
1785 scenario_start: &std::time::Instant,
1786 step_idx: usize,
1787 ops: &[Op],
1788 state: &ScenarioState<'_, '_>,
1789) -> StimulusPayload {
1790 let mut op_kinds: u32 = 0;
1791 for op in ops {
1792 op_kinds |= 1 << op.discriminant();
1793 }
1794
1795 let total_iterations: u64 = state
1796 .all_handles()
1797 .flat_map(|(_, h)| h.snapshot_iterations())
1798 .sum();
1799
1800 let cgroup_count = state.step.cgroups.names().len() + state.backdrop.cgroups.names().len();
1801 let worker_count = state.step.handles.len() + state.backdrop.handles.len();
1802
1803 // Saturate narrowing conversions for the wire schema: the
1804 // StimulusPayload fields are sized for realistic scenarios
1805 // (u32 ms, u16 counts) but `as u32` / `as u16` silently
1806 // wrap on overflow, poisoning downstream consumers. Log the
1807 // overflow so the operator sees which field exceeded its
1808 // bound and substitute MAX — clipped-high is a safer wire
1809 // value than silently wrapping to a small number.
1810 let to_u32 = |field: &str, v: u128| -> u32 {
1811 u32::try_from(v).unwrap_or_else(|_| {
1812 tracing::warn!(
1813 field,
1814 value = %v,
1815 "StimulusPayload field overflowed u32; saturating to u32::MAX",
1816 );
1817 u32::MAX
1818 })
1819 };
1820 let to_u16 = |field: &str, v: usize| -> u16 {
1821 u16::try_from(v).unwrap_or_else(|_| {
1822 tracing::warn!(
1823 field,
1824 value = v,
1825 "StimulusPayload field overflowed u16; saturating to u16::MAX",
1826 );
1827 u16::MAX
1828 })
1829 };
1830
1831 // Encode the 1-indexed phase number per the framework's
1832 // phase convention -- the BASELINE (pre-first-Step) window owns
1833 // 0, scenario Step k publishes k + 1. Saturate at u16::MAX
1834 // (rather than wrap) so a pathological 65k-step scenario still
1835 // produces a clipped-high value the host parser can recognise
1836 // instead of silently rolling over.
1837 let phase_step_index: u16 = u16::try_from(step_idx)
1838 .ok()
1839 .and_then(|i| i.checked_add(1))
1840 .unwrap_or_else(|| {
1841 tracing::warn!(
1842 field = "step_index",
1843 value = step_idx,
1844 "StimulusPayload step_index overflowed u16 after 1-indexed encoding; saturating to u16::MAX",
1845 );
1846 u16::MAX
1847 });
1848 StimulusPayload {
1849 elapsed_ms: to_u32("elapsed_ms", scenario_start.elapsed().as_millis()),
1850 step_index: phase_step_index,
1851 op_count: to_u16("op_count", ops.len()),
1852 op_kinds,
1853 cgroup_count: to_u16("cgroup_count", cgroup_count),
1854 worker_count: to_u16("worker_count", worker_count),
1855 total_iterations,
1856 }
1857}
1858
1859/// Validate that a MemPolicy's node set is consistent with the
1860/// cgroup's scenario intent — the cpuset the cgroup runs in and
1861/// the host topology.
1862///
1863/// # Empty-nodemask early return
1864///
1865/// Policies with no nodemask — [`MemPolicy::Default`] and
1866/// [`MemPolicy::Local`] — carry no node IDs to validate against,
1867/// so this function returns `Ok(())` unconditionally for them
1868/// (after the unknown-bit and mutual-exclusion flag guards run).
1869/// Every other variant — any variant carrying a nodemask,
1870/// currently [`MemPolicy::Bind`], [`MemPolicy::Preferred`],
1871/// [`MemPolicy::PreferredMany`], [`MemPolicy::Interleave`], and
1872/// [`MemPolicy::WeightedInterleave`] — reaches the cpuset /
1873/// host-topology coverage logic below.
1874///
1875/// # Why this is a scenario-intent check, not a kernel guard
1876///
1877/// ktstr writes `cpuset.cpus` on each cgroup but never writes
1878/// `cpuset.mems`, so `cpuset.mems` keeps its inherited default —
1879/// the permissive "all nodes" set in every ktstr deployment
1880/// shape (PID 1 inside the guest VM, cgroup root on the host).
1881/// The kernel's `set_mempolicy(2)` path always runs the policy's
1882/// nodemask through `mpol_set_nodemask` in `mm/mempolicy.c`, which
1883/// intersects it with the caller's `mems_allowed` before it is
1884/// stored on the task; because ktstr never narrows `mems_allowed`,
1885/// that intersection is an identity operation under ktstr's
1886/// deployment — the stored nodemask equals the one the caller
1887/// supplied, and the kernel never rejects or silently trims the
1888/// policy the way it would if `mems_allowed` were disjoint from
1889/// the requested set. Rejection of a mismatched policy is
1890/// therefore validator-only: if this function does not bail, the
1891/// policy lands on the syscall unchanged and `run_steps` commits
1892/// to running the worker with a misconfigured allocation target.
1893///
1894/// What the validator catches is a **scenario-design mismatch**:
1895/// you pinned CPUs on NUMA node X (via `CpusetSpec::Numa(X)`) but
1896/// asked the mempolicy to bind/prefer/interleave a disjoint node Y,
1897/// meaning the worker's compute is local to node X while its
1898/// allocations live on node Y — producing cross-socket traffic
1899/// that the test author almost certainly did not intend. Surface
1900/// the mismatch here before `run_steps` commits to the policy.
1901///
1902/// `MpolFlags::STATIC_NODES` is the rebind-behavior flag. Two
1903/// kernel sites encode the semantics: `mpol_set_nodemask` in
1904/// `mm/mempolicy.c` consumes the flag during policy creation (it
1905/// determines whether the supplied nodemask is stored absolute or
1906/// remapped against the caller's cpuset at install time), and
1907/// `mpol_rebind_policy` (same file) branches on the flag when the
1908/// cpuset's `mems_allowed` changes after the policy was installed
1909/// — with `STATIC_NODES` set, the stored nodemask is unchanged;
1910/// without it, the kernel remaps the nodemask against the new
1911/// `mems_allowed`. Since ktstr never rebinds `cpuset.mems` mid-run,
1912/// only the install-time semantics applies, and the flag is
1913/// effectively a cross-node-intent declaration for the validator's
1914/// purposes — a sign the author knows the intent is "allocations on
1915/// a node outside the CPU-affinity cpuset" and has opted in to
1916/// that shape.
1917///
1918/// # Flag-specific handling (in order of evaluation)
1919///
1920/// - `STATIC_NODES | RELATIVE_NODES` both set → bail: the kernel
1921/// rejects this combination with `EINVAL`; surfacing it here
1922/// names the offender before the syscall.
1923/// - `STATIC_NODES` only → the caller has declared intentional
1924/// cross-node placement. Skip the cpuset-intent check, but each
1925/// referenced node must exist on the host topology or the
1926/// kernel will reject the policy. Verify existence; bail with
1927/// the missing nodes if any.
1928/// - `RELATIVE_NODES` only → the nodemask is an ordinal into the
1929/// cpuset's allowed-nodes set. Cpuset coverage does not apply in
1930/// absolute-id terms, so bypass.
1931/// - No relevant flag set → enforce cpuset-intent coverage:
1932/// every policy node must appear in the cpuset's covered NUMA
1933/// nodes. Bail naming the uncovered nodes AND both escape
1934/// hatches (STATIC_NODES opt-in; widening the cpuset).
1935///
1936/// Reject `--flag` args whose bare name is not in the payload's
1937/// `known_flags` allowlist. Returns `Ok(())` when the payload
1938/// declared no allowlist (`known_flags: None`) — the opt-in
1939/// contract defaults to "permissive" so payloads wrapping
1940/// open-ended binaries (stress-ng, fio, schbench) aren't forced
1941/// to enumerate every flag their upstream tool accepts.
1942///
1943/// Recognises two flag shapes: `--foo` (flag-only) and
1944/// `--foo=value` (flag-with-attached-value). Non-flag args
1945/// (positional, `-short`, everything else) are passed through
1946/// without inspection — the allowlist scopes to long flags only.
1947///
1948/// Extracted out of `apply_ops`'s `Op::RunPayload` arm so the
1949/// validation is unit-testable without standing up a full Ctx
1950/// / scenario state. See the caller for how the allowlist is
1951/// threaded through Op::RunPayload execution.
1952fn validate_known_flags(payload: &crate::test_support::Payload, args: &[String]) -> Result<()> {
1953 let Some(allowlist) = payload.known_flags else {
1954 return Ok(());
1955 };
1956 for arg in args {
1957 let Some(flag_body) = arg.strip_prefix("--") else {
1958 continue;
1959 };
1960 // `split('=').next()` is infallible: `str::split` always
1961 // yields at least one element (the full string when no
1962 // separator is present). The prior `unwrap_or("")` fallback
1963 // was dead code — the empty-name branch below never fired
1964 // via this path since `flag_body` had already passed the
1965 // `strip_prefix("--")` filter above (leaving at least one
1966 // character). Kept the `name.is_empty()` guard in place
1967 // only to handle the degenerate `"--"` bare-dashes case,
1968 // which produces `flag_body = ""` → `name = ""`.
1969 let name = flag_body
1970 .split('=')
1971 .next()
1972 .expect("str::split always yields at least one element");
1973 if name.is_empty() {
1974 continue;
1975 }
1976 if !allowlist.contains(&name) {
1977 anyhow::bail!(
1978 "Op::RunPayload: payload '{}' received unknown flag \
1979 '--{name}' — not in its known_flags allowlist \
1980 {allowlist:?}. Check the spelling against the \
1981 payload's declared flags; if '--{name}' is a new \
1982 legitimate flag, add it to `Payload::known_flags`.",
1983 payload.name,
1984 );
1985 }
1986 }
1987 Ok(())
1988}
1989
1990fn validate_mempolicy_cpuset(
1991 policy: &MemPolicy,
1992 flags: crate::workload::MpolFlags,
1993 cpuset: &BTreeSet<usize>,
1994 ctx: &Ctx,
1995 cgroup_name: &str,
1996) -> Result<()> {
1997 use crate::workload::MpolFlags;
1998
1999 // Reject unknown bits before any other check. The `MpolFlags`
2000 // type is a `u32` bitfield covering three documented bits
2001 // (STATIC_NODES, RELATIVE_NODES, NUMA_BALANCING); any other bit
2002 // set in `flags` is either a user typo (raw-constructing the
2003 // struct with an arbitrary integer) or forward-compat from a
2004 // future kernel flag that this validator hasn't learned yet.
2005 // Either way, surfacing unknown bits here prevents a silent
2006 // semantic mismatch — the kernel would either reject with
2007 // EINVAL or (worse) treat the bit as a flag we don't model.
2008 let known_bits = MpolFlags::STATIC_NODES.bits()
2009 | MpolFlags::RELATIVE_NODES.bits()
2010 | MpolFlags::NUMA_BALANCING.bits();
2011 let unknown_bits = flags.bits() & !known_bits;
2012 if unknown_bits != 0 {
2013 anyhow::bail!(
2014 "cgroup '{}': MpolFlags contains unknown bit(s) {:#x} (known bits: \
2015 STATIC_NODES={:#x}, RELATIVE_NODES={:#x}, NUMA_BALANCING={:#x}); \
2016 refusing to forward to the kernel — update MpolFlags to model the \
2017 new bit before using it, or clear the bit at the call site",
2018 cgroup_name,
2019 unknown_bits,
2020 MpolFlags::STATIC_NODES.bits(),
2021 MpolFlags::RELATIVE_NODES.bits(),
2022 MpolFlags::NUMA_BALANCING.bits(),
2023 );
2024 }
2025
2026 // `STATIC_NODES | RELATIVE_NODES` is a kernel-rejected combination —
2027 // `MPOL_F_STATIC_NODES` and `MPOL_F_RELATIVE_NODES` are mutually
2028 // exclusive (see `include/uapi/linux/mempolicy.h` + the
2029 // `sanitize_mpol_flags` helper in `mm/mempolicy.c`, which bails
2030 // with `EINVAL` if both are set). Fail early here instead of
2031 // letting the syscall return a generic error — the scenario
2032 // caller almost certainly meant one or the other, not both.
2033 if flags.contains(MpolFlags::STATIC_NODES) && flags.contains(MpolFlags::RELATIVE_NODES) {
2034 anyhow::bail!(
2035 "cgroup '{}': MpolFlags::STATIC_NODES and MpolFlags::RELATIVE_NODES are \
2036 mutually exclusive (the kernel will reject the set_mempolicy syscall with \
2037 EINVAL); pick whichever matches the intended semantics — STATIC_NODES \
2038 for absolute node ids that survive cpuset changes, RELATIVE_NODES for \
2039 cpuset-relative indices",
2040 cgroup_name,
2041 );
2042 }
2043
2044 let policy_nodes = policy.node_set();
2045 if policy_nodes.is_empty() {
2046 return Ok(());
2047 }
2048
2049 // `STATIC_NODES`: nodemask is treated as absolute node ids and NOT
2050 // intersected with the cpuset. The cpuset-coverage check below
2051 // does not apply, but we DO need to verify the referenced nodes
2052 // actually exist on the host — a policy pinning node 7 on a
2053 // 2-node host would fail at syscall time; surfacing it here
2054 // names the offender.
2055 if flags.contains(MpolFlags::STATIC_NODES) {
2056 let host_nodes = ctx.topo.numa_node_ids();
2057 let missing: Vec<usize> = policy_nodes
2058 .iter()
2059 .copied()
2060 .filter(|n| !host_nodes.contains(n))
2061 .collect();
2062 if !missing.is_empty() {
2063 anyhow::bail!(
2064 "cgroup '{}': MemPolicy with MpolFlags::STATIC_NODES references \
2065 NUMA node(s) {:?} that do not exist on this host (host nodes: {:?}); \
2066 the kernel will reject or silently drop the policy (Preferred can \
2067 silently fall back to local allocation; Bind/Interleave reject with \
2068 EINVAL) — fix the MemPolicy or pick a host with the required nodes",
2069 cgroup_name,
2070 missing,
2071 host_nodes,
2072 );
2073 }
2074 return Ok(());
2075 }
2076
2077 // `RELATIVE_NODES`: nodemask is an ordinal into the cpuset's
2078 // allowed nodes, not an absolute node id set. The cpuset-coverage
2079 // check compares absolute ids, so it does not apply here — the
2080 // kernel does the relative-to-absolute remap internally. Trust
2081 // the caller and bypass the coverage bail, same shape as the
2082 // STATIC_NODES early return.
2083 if flags.contains(MpolFlags::RELATIVE_NODES) {
2084 return Ok(());
2085 }
2086
2087 let cpuset_numa = ctx.topo.numa_nodes_for_cpuset(cpuset);
2088 let uncovered: Vec<usize> = policy_nodes
2089 .iter()
2090 .copied()
2091 .filter(|n| !cpuset_numa.contains(n))
2092 .collect();
2093 if !uncovered.is_empty() {
2094 anyhow::bail!(
2095 "cgroup '{}': MemPolicy references NUMA node(s) {:?} \
2096 outside the cpuset's coverage (cpuset covers node(s) \
2097 {:?}) — some or all of the worker's allocations would \
2098 live on NUMA nodes its CPUs cannot reach locally, \
2099 producing cross-socket allocation traffic that is \
2100 almost certainly unintended. Two fixes: \
2101 (a) add .mpol_flags(MpolFlags::STATIC_NODES) to \
2102 declare the cross-node placement intentional (the \
2103 flag survives cpuset rebinds; see MpolFlags doc), or \
2104 (b) widen the cpuset to cover the policy's nodes \
2105 (e.g. CpusetSpec::Numa(N) for each referenced N, or \
2106 a CpusetSpec::Exact set that spans both).",
2107 cgroup_name,
2108 uncovered,
2109 cpuset_numa,
2110 );
2111 }
2112 Ok(())
2113}
2114
2115/// SIGKILL every worker in `handles` without reaping. Used only on
2116/// the scheduler-death paths in [`run_scenario`]: when the scheduler
2117/// under test dies, the kernel disables sched_ext and its workers
2118/// fall back to the builtin scheduler, so a CPU-bound worker pool
2119/// would otherwise make the per-worker `waitpid` reap in the
2120/// subsequent [`collect_step`] / [`collect_backdrop`]
2121/// scheduling-gated (teardown time scaling with worker count).
2122/// Delivering SIGKILL to every worker first lets that reap find
2123/// already-exiting workers. See
2124/// [`crate::workload::WorkloadHandle::sigkill_workers`].
2125fn sigkill_handles(handles: &[(String, WorkloadHandle)]) {
2126 for (_, h) in handles {
2127 h.sigkill_workers();
2128 }
2129}
2130
2131/// Collect step-local worker results and produce an AssertResult.
2132///
2133/// Drains step-local handles + payload handles; backdrop state is
2134/// untouched. Called at every step boundary (success AND error
2135/// paths) as the "Step is fully bounded" teardown. The
2136/// `step_state` goes out of scope at the end of this step's
2137/// iteration, so its `CgroupGroup` drop removes every step-local
2138/// cgroup immediately after `run_scenario` propagates the result
2139/// of this call.
2140///
2141/// Before draining handles, every step-local cgroup is unfrozen
2142/// (`cgroup.freeze` ← 0). An [`Op::FreezeCgroup`] without a paired
2143/// [`Op::UnfreezeCgroup`] would leave step-local tasks frozen at
2144/// step boundary, where the graceful cooperative stop cannot make
2145/// progress: the worker stop is a non-fatal SIGUSR1, and a frozen
2146/// task re-enters the freezer trap on a non-fatal signal
2147/// (`kernel/signal.c` `do_freezer_trap`) rather than running its
2148/// handler — so it never flips its stop flag or reports until
2149/// thawed, and `stop_and_collect` burns its full collection
2150/// deadline before falling back to a sentinel report. (A fatal
2151/// SIGKILL DOES wake and kill a frozen task — it diverges before
2152/// the freezer trap — so worker death and the subsequent rmdir are
2153/// unaffected by the freeze; the unfreeze only restores the
2154/// graceful path's ability to collect real reports.) Failures are
2155/// logged at warn level only — a missing freezer file or a cgroup
2156/// that was already torn down is benign at teardown time, and
2157/// propagating would mask the real workload result.
2158fn collect_step(
2159 step_state: &mut StepState<'_>,
2160 checks: &crate::assert::Assert,
2161 topo: &crate::topology::TestTopology,
2162 cgroups: &dyn crate::cgroup::CgroupOps,
2163 // The 1-indexed phase step_index (BASELINE=0, Step k -> k+1) this
2164 // teardown belongs to, threaded so collect_handles can attach the per-phase
2165 // per_cgroup carrier. `None` for the defensive backdrop-staging collect,
2166 // which has no step attribution.
2167 step_index: Option<u16>,
2168) -> AssertResult {
2169 // Unfreeze every step-local cgroup before the graceful collect.
2170 // A frozen worker re-enters the freezer trap on the cooperative
2171 // (non-fatal) SIGUSR1 stop instead of running its handler, so it
2172 // never reports until thawed. (SIGKILL would still kill it; only
2173 // the graceful report path needs the thaw.)
2174 for name in step_state.cgroups.names() {
2175 if let Err(e) = cgroups.set_freeze(name, false) {
2176 tracing::warn!(
2177 cgroup = %name,
2178 err = %format!("{e:#}"),
2179 "collect_step: pre-teardown unfreeze failed; any frozen workers will yield sentinel reports (still SIGKILL-reaped, no leak)"
2180 );
2181 }
2182 }
2183 // Kill any CgroupDef::workload / Op::RunPayload payload binaries
2184 // still live at step teardown so cgroupfs cleanup does not trip
2185 // EBUSY. Metrics are emitted to the SHM ring by PayloadHandle::kill
2186 // via the `evaluate()` pipeline.
2187 drain_all_payload_handles(&mut step_state.payload_handles);
2188 // Drain any host-mode stall reports that accumulated during the
2189 // step BEFORE dropping the monitor handle (Drop joins the
2190 // polling thread). Reports get folded into the merged
2191 // [`AssertResult`] below as
2192 // [`crate::assert::DetailKind::WorkerStalled`] failures. `take`
2193 // ensures the handle drops here (joining the thread) so the
2194 // polling thread exits before per-step teardown returns.
2195 let stall_reports = if let Some(handle) = step_state.stall_monitor.take() {
2196 let reports = handle.drain();
2197 drop(handle);
2198 reports
2199 } else {
2200 Vec::new()
2201 };
2202 let handles = std::mem::take(&mut step_state.handles);
2203 let mut result = crate::scenario::collect_handles(
2204 handles.into_iter().map(|(name, h)| {
2205 let cpuset = step_state.cpusets.get(&name);
2206 (name, h, cpuset)
2207 }),
2208 checks,
2209 Some(topo),
2210 step_index,
2211 );
2212 for report in stall_reports {
2213 result.record_fail(crate::assert::AssertDetail::new(
2214 crate::assert::DetailKind::WorkerStalled,
2215 format_stall_report(&report),
2216 ));
2217 }
2218 result
2219}
2220
2221/// Render a [`crate::scenario::host_stall::StallReport`] as a
2222/// human-readable single-string assertion detail. Emits the stalled
2223/// pid + comm, the sample-window summary (first and last values
2224/// for both counters PLUS the computed `last - first` delta — both
2225/// expected to be zero for a true stall but rendered from the
2226/// samples themselves so a predicate-tolerance refactor stays
2227/// observable), and the diagnostic subset (state, wchan, syscall,
2228/// cgroup, host loadavg, optional kernel stack). The diagnostic's
2229/// `status_full` field is intentionally OMITTED — the parsed
2230/// `state` letter carries the actionable signal and the full
2231/// status file is verbose; sidecar consumers keying off
2232/// [`crate::assert::DetailKind::WorkerStalled`] can match on the
2233/// kind discriminator and read the full StallReport (carries the
2234/// status_full field) without parsing this message.
2235///
2236/// Format is multi-line so the operator can read the trip at a
2237/// glance without parsing structured output.
2238fn format_stall_report(report: &crate::scenario::host_stall::StallReport) -> String {
2239 use std::fmt::Write as _;
2240 let mut s = String::new();
2241 let _ = writeln!(
2242 s,
2243 "worker stall detected: pid={} comm={:?} (host-mode /proc/<pid>/sched polling)",
2244 report.pid, report.comm,
2245 );
2246 if let (Some(first), Some(last)) = (report.samples.first(), report.samples.last()) {
2247 // Compute deltas from the actual sample values so a
2248 // predicate-tolerance refactor (e.g. "fire on Δ <= N
2249 // switches rather than == 0") stays observable in the
2250 // rendered output. Saturating subtraction handles the
2251 // pathological counter-rollover case without panic.
2252 let nr_delta = last.nr_switches.saturating_sub(first.nr_switches);
2253 let rt_delta = last
2254 .sum_exec_runtime_ns
2255 .saturating_sub(first.sum_exec_runtime_ns);
2256 let _ = writeln!(
2257 s,
2258 " sample window: nr_switches {} -> {} (delta {nr_delta}), sum_exec_runtime_ns {} -> {} (delta {rt_delta}), {} samples",
2259 first.nr_switches,
2260 last.nr_switches,
2261 first.sum_exec_runtime_ns,
2262 last.sum_exec_runtime_ns,
2263 report.samples.len(),
2264 );
2265 }
2266 let d = &report.diagnostic;
2267 let _ = writeln!(s, " state: {}", d.state);
2268 let _ = writeln!(s, " wchan: {}", d.wchan);
2269 let _ = writeln!(s, " syscall: {}", d.syscall);
2270 let _ = writeln!(s, " cgroup: {}", d.cgroup);
2271 let _ = writeln!(s, " host loadavg: {}", d.host_loadavg);
2272 if let Some(stack) = &d.stack {
2273 let _ = writeln!(s, " kernel stack:\n{stack}");
2274 }
2275 s
2276}
2277
2278/// Collect backdrop (persistent) worker results. Called once at
2279/// scenario end after every Step has torn down. The
2280/// `backdrop_state.cgroups` RAII guard drops persistent cgroups
2281/// when `backdrop_state` itself drops.
2282///
2283/// Mirrors [`collect_step`]'s pre-teardown unfreeze pass over every
2284/// tracked cgroup, for the same reason: a backdrop cgroup left
2285/// frozen at scenario end cannot run the graceful cooperative stop.
2286/// A frozen task re-enters the freezer trap on the non-fatal SIGUSR1
2287/// (`kernel/signal.c` `do_freezer_trap`) instead of running its
2288/// handler, so its workers never report until thawed. The asymmetry
2289/// between step-local and backdrop teardown — only the former
2290/// unfreezing — would surface as backdrop workers yielding sentinel
2291/// reports (after burning the collection deadline) on every scenario
2292/// whose Backdrop froze a cgroup and never unfroze it. Symmetric
2293/// unfreeze brings the backdrop path back in line with the per-step
2294/// path. (As in `collect_step`, a fatal SIGKILL still kills a frozen
2295/// worker, so death and the `BackdropState::cgroups` RAII rmdir are
2296/// unaffected by the freeze.)
2297fn collect_backdrop(
2298 backdrop_state: &mut BackdropState<'_>,
2299 checks: &crate::assert::Assert,
2300 topo: &crate::topology::TestTopology,
2301 cgroups: &dyn crate::cgroup::CgroupOps,
2302) -> AssertResult {
2303 // Unfreeze every backdrop cgroup before the graceful collect.
2304 // Same rationale as `collect_step`: a frozen worker re-enters
2305 // the freezer trap on the cooperative (non-fatal) SIGUSR1 stop
2306 // instead of running its handler, so it never reports until
2307 // thawed. (SIGKILL would still kill it; only the graceful report
2308 // path needs the thaw.)
2309 for name in backdrop_state.cgroups.names() {
2310 if let Err(e) = cgroups.set_freeze(name, false) {
2311 tracing::warn!(
2312 cgroup = %name,
2313 err = %format!("{e:#}"),
2314 "collect_backdrop: pre-teardown unfreeze failed; any frozen workers will yield sentinel reports (still SIGKILL-reaped, no leak)"
2315 );
2316 }
2317 }
2318 drain_all_payload_handles(&mut backdrop_state.payload_handles);
2319 let handles = std::mem::take(&mut backdrop_state.handles);
2320 crate::scenario::collect_handles(
2321 handles.into_iter().map(|(name, h)| {
2322 let cpuset = backdrop_state.cpusets.get(&name);
2323 (name, h, cpuset)
2324 }),
2325 checks,
2326 Some(topo),
2327 // Backdrop spans every phase: the None arm expands each worker's
2328 // per-phase PhaseSlices into one per-epoch PhaseBucket
2329 // (expand_backdrop_phase_buckets) — per-epoch, not single-step,
2330 // attribution.
2331 None,
2332 )
2333}
2334
2335/// Kill every payload handle whose cgroup matches `cgroup` and drop
2336/// the matched entries from `handles`. Runs before the cgroup is
2337/// removed or stopped; failures are logged to stderr but do not
2338/// propagate — the cgroup removal is best-effort already, and the
2339/// payload-kill failure is never the primary error.
2340///
2341/// **Metric emission depends on the explicit `.kill()` call** —
2342/// if a future refactor replaces the `.kill()` below with plain
2343/// `drop(handle)`, the `PayloadHandle::drop` SIGKILLs the child
2344/// but skips the evaluate-and-emit pipeline that records metrics
2345/// to the SHM ring. Test helpers that drain payload handles
2346/// likewise route through `drain_all_payload_handles` for the
2347/// same reason. Preserve `.kill()` on every path that claims to
2348/// drain handles for metric capture.
2349///
2350/// Drop order across matched entries is LIFO (last pushed, first
2351/// dropped) — the loop walks indices from the tail toward index 0
2352/// using `Vec::remove` so newer matched entries' embedded
2353/// `SigchldScope`s restore the SIGCHLD disposition before older
2354/// matches do, matching the save-and-restore chain documented on
2355/// `PayloadHandle` in `payload_run.rs`. `Vec::swap_remove` would
2356/// rotate the tail into the freed slot and break LIFO across
2357/// matches; `Vec::remove` preserves the relative order of the
2358/// remaining (unmatched) survivors. Note: SIGCHLD scope LIFO across
2359/// the FULL vec is structurally unsalvageable in any partial-drain
2360/// helper — unmatched entries that stay alive in `handles` outlive
2361/// their younger matched siblings whose scopes already restored.
2362/// The full-vec LIFO contract holds only when every handle is
2363/// dropped together via [`drain_all_payload_handles`].
2364fn drain_payload_handles_for_cgroup(handles: &mut Vec<PayloadEntry>, cgroup: &str) {
2365 let mut i = handles.len();
2366 while i > 0 {
2367 i -= 1;
2368 if handles[i].cgroup.as_str() == cgroup {
2369 let entry = handles.remove(i);
2370 if let Err(e) = entry.handle.kill() {
2371 eprintln!("ktstr: kill payload in cgroup '{cgroup}': {e:#}");
2372 }
2373 }
2374 }
2375}
2376
2377/// Kill every payload handle regardless of cgroup and clear the
2378/// vector. Called at step-sequence teardown so every handle gets a
2379/// terminal `.kill()` (and therefore a sidecar metric emission) even
2380/// when no explicit `RemoveCgroup`/`StopCgroup` op targeted it.
2381///
2382/// Drop order is LIFO (last pushed, first dropped) — `Vec::pop`
2383/// returns the tail first, so `PayloadHandle::drop` runs in reverse
2384/// creation order. Each handle's embedded `SigchldScope` captured the
2385/// `SIGCHLD` disposition that was live at construction time (the
2386/// previous scope's installed `SIG_DFL`). Restoring in LIFO unwinds
2387/// the save-and-restore chain back to the original disposition; FIFO
2388/// drop (e.g. `Vec::drain(..)`) restores intermediate `SIG_DFL` values
2389/// out of order and leaks `SIG_DFL` past the outermost scope. See the
2390/// DROP-ORDER-CRITICAL note on `PayloadHandle` in `payload_run.rs`.
2391fn drain_all_payload_handles(handles: &mut Vec<PayloadEntry>) {
2392 while let Some(entry) = handles.pop() {
2393 if let Err(e) = entry.handle.kill() {
2394 eprintln!(
2395 "ktstr: teardown kill payload in cgroup {}: {e:#}",
2396 render_cgroup_key(&entry.cgroup),
2397 );
2398 }
2399 }
2400}
2401
2402#[cfg(test)]
2403mod tests;
2404
2405#[cfg(test)]
2406mod workers_pct_construction_tests;
2407
2408#[cfg(test)]
2409mod kernel_op_dispatch_tests;