ktstr/scenario/ops/setup.rs
1//! Setup-phase orchestration for [`crate::scenario::ops::Op`]-driven scenarios.
2//!
//! [`apply_setup`] runs the [`CgroupDef`] passes that materialise each
//! declared cgroup, apply its cpuset / cpuset.mems / cpu / memory / io /
//! pids controller settings, resolve each `WorkSpec`'s worker count and
//! placement, partition the `WorkSpec`s by `pcomm`, spawn worker handles,
//! and start any [`CgroupDef::workload`] payload. After the per-def loop
//! finishes it calls [`maybe_start_stall_monitor`], which may start the
//! host-mode worker-stall poller.
9//!
10//! Sibling to [`super::apply_ops`]; both mutate the same
11//! [`super::ScenarioState`] view over step-local + backdrop state.
12
13use std::collections::BTreeSet;
14
15use anyhow::Result;
16
17use crate::scenario::Ctx;
18use crate::vmm::guest_comms;
19use crate::workload::{WorkloadConfig, WorkloadHandle};
20
21use super::{CgroupDef, PayloadEntry, PayloadSource, ScenarioState, validate_mempolicy_cpuset};
22
/// Location of the scenario placement-history log.
///
/// Placement events from two sources land here: `apply_setup` records the
/// PIDs of every worker handle it spawns (per cgroup, and per `pcomm`
/// group), and `apply_ops` records the `cgroup.procs` snapshots taken
/// before and after `Op::MoveAllTasks`. A test that asserts on per-cgroup
/// worker placement can dump this file on failure to see which PIDs
/// landed in which cgroup at spawn time, and how `Op::MoveAllTasks` then
/// migrated them.
///
/// The file sits under `/tmp` and outlives scenario teardown, which only
/// rmdirs the cgroup directories. It is append-only and never truncated:
/// its sole writer, `append_placement_log`, opens it with `.append(true)`.
/// Records from earlier runs therefore accumulate, and a reader interested
/// only in the current run must filter them out itself.
pub const PLACEMENT_LOG_PATH: &str = "/tmp/ktstr-placement.log";
36
37/// Append `msg` (with a trailing newline) to the placement-history
38/// log. See [`PLACEMENT_LOG_PATH`] for the transport rationale
39/// (every guest-side stderr/serial route was empirically confirmed
40/// to drop scenario-era diagnostic bytes in test mode). Errors are
41/// swallowed because this is a best-effort diagnostic — if the
42/// file write fails, the placement record just goes missing and the
43/// operator falls back to the existing post-mortem cgroup.procs
44/// snapshot.
45pub(super) fn append_placement_log(msg: &str) {
46 use std::io::Write;
47 if let Ok(mut f) = std::fs::OpenOptions::new()
48 .create(true)
49 .append(true)
50 .open(PLACEMENT_LOG_PATH)
51 {
52 let _ = writeln!(f, "{msg}");
53 }
54}
55
56/// Walk every [`CgroupDef`] in `defs` and register it against the
57/// step or backdrop cgroup slot selected by `state.target_backdrop`.
58/// A duplicate name (already tracked by either state) bails — a
59/// [`CgroupDef`] must not silently shadow a cgroup that another state
60/// slot has already created.
61///
62/// Each `CgroupDef`'s `works` vec is iterated, spawning one
63/// [`WorkloadHandle`] per [`crate::workload::WorkSpec`] entry. Multiple
64/// `WorkSpec`s for the same cgroup produce multiple handle entries with
65/// the same name key; ops that filter by cgroup name (`StopCgroup`,
66/// `SetAffinity`, etc.) naturally apply to all of them. When `works`
67/// is empty, a single default `WorkSpec` is used (`SpinWait`, `Normal`,
68/// `ctx.workers_per_cgroup` workers). Cgroups created here route into
69/// step-local or backdrop state per the `target_backdrop` flag.
70pub(super) fn apply_setup(
71 ctx: &Ctx,
72 state: &mut ScenarioState<'_, '_>,
73 defs: &[CgroupDef],
74) -> Result<()> {
75 for def in defs {
76 if state.cgroup_name_is_tracked(&def.name) {
77 anyhow::bail!(
78 "CgroupDef '{}' collides with a cgroup already tracked \
79 (by a prior Backdrop or step-local CgroupDef) — declare it \
80 in exactly one place; use a fresh name for the step-local cgroup",
81 def.name,
82 );
83 }
84 state.target_cgroups().add_cgroup_no_cpuset(&def.name)?;
85 // Per-controller application, in the order the kernel docs and
86 // the operator-meaningful constraints want: cpuset (sets the
87 // CPUs every later task inherits) before cpuset.mems (NUMA
88 // node binding) before cpu (weight + cpu.max) before memory
89 // (max/high/low/swap) before io (weight) before pids
90 // (pids.max). Each helper is a no-op early-return when its
91 // `def.<controller>` field is `None`.
92 apply_cgroup_cpuset(ctx, state, def)?;
93 apply_cgroup_cpuset_mems(ctx, def)?;
94 apply_cgroup_cpu(ctx, def)?;
95 apply_cgroup_memory(ctx, def)?;
96 apply_cgroup_io(ctx, def)?;
97 apply_cgroup_pids(ctx, def)?;
98 // Materialize the per-WorkSpec values with cgroup-level
99 // defaults merged in, resolve workers_pct / num_workers /
100 // work_type / affinity, then spawn the synthetic worker
101 // handles and the optional userspace payload.
102 let resolved_works = resolve_def_works(ctx, state, def)?;
103 spawn_def_workers(ctx, state, def, resolved_works)?;
104 spawn_def_payload(ctx, state, def)?;
105 }
106 // Start the host-mode stall monitor once we've spawned workers
107 // for the first apply_setup in this step. Skip when:
108 // - running inside the guest (the VM-side freeze coordinator
109 // owns stall detection there; the host-mode poller would
110 // read its own guest /proc/sched, which has no relevance);
111 // - running under cargo_test_mode (in-process VMM tests share
112 // the host's /proc with the test harness itself and a poller
113 // would observe the harness threads, not the scenario);
114 // - the apply_setup pass is routing to the Backdrop slot
115 // (Backdrop workers are scenario-persistent; the per-step
116 // monitor's lifetime would be wrong for them, and a future
117 // task can add a parallel Backdrop monitor if needed);
118 // - the monitor is already started for this StepState (an
119 // `Op::AddCgroupDef` that re-enters apply_setup mid-step
120 // should not spawn a second poller — the existing one
121 // already runs);
122 // - no workers exist (degenerate scenarios with all worker
123 // spawns deferred to apply_ops or with zero spawns at all).
124 maybe_start_stall_monitor(state);
125 Ok(())
126}
127
128/// Apply [`CgroupDef::cpuset`] for `def`: resolve the spec against
129/// the topology, surface the workers_pct / empty-cpuset diagnostics,
130/// validate, write the cpuset, and record it on `state`. No-op when
131/// `def.cpuset` is `None` (the cgroup inherits the parent cpuset).
132fn apply_cgroup_cpuset(
133 ctx: &Ctx,
134 state: &mut ScenarioState<'_, '_>,
135 def: &CgroupDef,
136) -> Result<()> {
137 if let Some(ref cpuset_spec) = def.cpuset {
138 let resolved = cpuset_spec.resolve_quiet(ctx);
139 // workers_pct + empty cpuset combinations produce more
140 // actionable diagnostics than the generic CpusetSpec
141 // empty-mask rejection — surface them here before
142 // validate's broader empty-Exact reject preempts the
143 // per-pct context.
144 //
145 // Two distinct empty-cpuset misconfigurations:
146 //
147 // (1) any WorkSpec sets BOTH workers(N) and
148 // workers_pct(P): the dual-set is the more
149 // fundamental error and must be resolved before
150 // the cpuset semantics matter. Surface "BOTH
151 // workers ... workers_pct" here rather than letting
152 // validate's empty-mask rejection mask it.
153 //
154 // (2) one or more WorkSpecs set workers_pct only:
155 // enumerate every configured pct value so a
156 // multi-WorkSpec cgroup doesn't silently drop all
157 // but the first.
158 if resolved.is_empty() {
159 let works = def.merged_works();
160 if let Some(dual_work) = works
161 .iter()
162 .find(|w| w.workers_pct.is_some() && w.num_workers.is_some())
163 {
164 let n = dual_work
165 .num_workers
166 .expect("dual_work selected via num_workers.is_some()");
167 let pct = dual_work
168 .workers_pct
169 .expect("dual_work selected via workers_pct.is_some()");
170 anyhow::bail!(
171 "cgroup '{}': WorkSpec sets BOTH workers({n}) \
172 and workers_pct({pct}); pick one — \
173 workers_pct resolves the cpuset fraction at \
174 apply-setup time and is incompatible with an \
175 explicit count. The empty cpuset would \
176 otherwise mask this conflict; resolve the \
177 workers/workers_pct conflict first",
178 def.name,
179 );
180 }
181 let pcts: Vec<(usize, f64)> = works
182 .iter()
183 .enumerate()
184 .filter_map(|(i, w)| w.workers_pct.map(|p| (i, p)))
185 .collect();
186 if !pcts.is_empty() {
187 let pct_display = if pcts.len() == 1 {
188 format!("workers_pct({})", pcts[0].1)
189 } else {
190 // Include positional indices so the operator
191 // can disambiguate when the same fraction is
192 // configured on multiple WorkSpecs (e.g.
193 // `[works[0]=0.5, works[2]=0.5]` shows which
194 // entries to adjust without grepping the test).
195 let list = pcts
196 .iter()
197 .map(|(i, p)| format!("works[{i}]={p}"))
198 .collect::<Vec<_>>()
199 .join(", ");
200 format!("workers_pct values [{list}]")
201 };
202 anyhow::bail!(
203 "cgroup '{}': {pct_display} on a cpuset of 0 \
204 CPU(s) would resolve to 0 workers; the cgroup \
205 would have no workers and downstream \
206 assertions would vacuously pass — narrow the \
207 cpuset, raise the fraction, or use \
208 `workers(N)` instead",
209 def.name,
210 );
211 }
212 // Fall-through for cpusets that resolve to empty
213 // without workers_pct — i.e. cases where the slice
214 // math (or topology shape) yields an empty BTreeSet
215 // even though `validate` accepts the spec. Examples:
216 // `Range { 0.0, 0.1 }` on a small usable set (the
217 // truncated `(len * 0.1) as usize` rounds to 0, see
218 // the `op_set_cpuset_narrow_to_empty_bails` test),
219 // or `Llc(N)` on a pathological topology where LLC
220 // N has no associated CPUs (memory-only NUMA node
221 // attached to a separate LLC). Cases like
222 // `Range { 0.0, 0.0 }` or `Disjoint { of: 0 }` do
223 // NOT reach this branch in `Op::SetCpuset` — they
224 // get rejected by validate first — but they DO
225 // reach this branch here because apply_setup runs
226 // resolve before validate (intentional: the Bundle
227 // H workers_pct diagnostic at the dual_work / pcts
228 // probes above needs to fire on empty-Exact +
229 // workers_pct combinations before validate's
230 // generic empty-Exact rejection preempts it). An
231 // empty cpuset reaching `set_cpuset` silently
232 // writes an empty mask to the cgroup; subsequent
233 // worker spawns get no CPUs and every CPU-pinned
234 // assertion vacuously passes. Bail here with the
235 // cpuset_spec context so the operator sees which
236 // spec resolved to empty and can adjust.
237 anyhow::bail!(
238 "cgroup '{}': cpuset_spec {:?} resolved to 0 \
239 CPU(s); the cgroup would have no CPUs assigned \
240 and downstream worker spawns would fail or \
241 produce vacuous assertions — adjust the spec \
242 so it resolves to a non-empty cpuset on this \
243 topology",
244 def.name,
245 cpuset_spec,
246 );
247 }
248 if let Err(reason) = cpuset_spec.validate(ctx) {
249 anyhow::bail!(
250 "cgroup '{}': CpusetSpec validation failed: {}",
251 def.name,
252 reason
253 );
254 }
255 ctx.cgroups.set_cpuset(&def.name, &resolved)?;
256 state.record_cpuset(&def.name, resolved);
257 }
258 Ok(())
259}
260
261/// Apply [`CgroupDef::cpuset_mems`] for `def`: write the NUMA node
262/// binding to `cpuset.mems`. No-op when `def.cpuset_mems` is `None`
263/// (the cgroup inherits the parent `cpuset.mems`).
264fn apply_cgroup_cpuset_mems(ctx: &Ctx, def: &CgroupDef) -> Result<()> {
265 if let Some(ref nodes) = def.cpuset_mems {
266 // The cpuset.mems write must succeed before any task
267 // moves into the cgroup; cpuset_update_task_spread will
268 // SIGKILL or fail allocations otherwise. Surfacing the
269 // error here (instead of at move_tasks time) lets the
270 // operator see the bad NUMA spec at setup, before the
271 // worker spawn pays its cost.
272 ctx.cgroups.set_cpuset_mems(&def.name, nodes)?;
273 }
274 Ok(())
275}
276
277/// Apply [`CgroupDef::cpu`] for `def`: validate `cpu.weight` against
278/// the kernel range, reject a zero `cpu.max` period / quota, and
279/// write `cpu.weight` + `cpu.max`. No-op when `def.cpu` is `None`
280/// (both kernel defaults stay in place).
281fn apply_cgroup_cpu(ctx: &Ctx, def: &CgroupDef) -> Result<()> {
282 if let Some(ref cpu) = def.cpu {
283 // cpu.weight: kernel range is 1..=10000 per
284 // Documentation/admin-guide/cgroup-v2.rst. Reject at
285 // setup so a 0 / 12000 from a typo fails fast instead
286 // of returning EINVAL from the kernel write.
287 if let Some(w) = cpu.weight {
288 if !(1..=10_000).contains(&w) {
289 anyhow::bail!(
290 "cgroup '{}': cpu.weight {w} out of range 1..=10000",
291 def.name,
292 );
293 }
294 ctx.cgroups.set_cpu_weight(&def.name, w)?;
295 }
296 // cpu.max: writing requires `+cpu` in subtree_control;
297 // CgroupManager::setup with enable_cpu_controller=true
298 // turns it on. quota=0 with period>0 would reject every
299 // schedule slice in the kernel; reject here with a
300 // clearer message.
301 if cpu.max_period_us == 0 {
302 anyhow::bail!("cgroup '{}': cpu.max period must be > 0 (got 0)", def.name,);
303 }
304 if let Some(q) = cpu.max_quota_us
305 && q == 0
306 {
307 anyhow::bail!(
308 "cgroup '{}': cpu.max quota must be > 0 when set; \
309 use cpu_unlimited() to remove the cap",
310 def.name,
311 );
312 }
313 // Always emit the cpu.max write so the period field is
314 // recorded even when quota is None. Aligns with the
315 // kernel's `"max <period>"` write format.
316 ctx.cgroups
317 .set_cpu_max(&def.name, cpu.max_quota_us, cpu.max_period_us)?;
318 }
319 Ok(())
320}
321
322/// Apply [`CgroupDef::memory`] for `def`: write `memory.max`,
323/// `memory.high`, `memory.low`, then (only when the user opted in)
324/// `memory.swap.max`. No-op when `def.memory` is `None` (all four
325/// stay at the kernel defaults).
326fn apply_cgroup_memory(ctx: &Ctx, def: &CgroupDef) -> Result<()> {
327 if let Some(ref mem) = def.memory {
328 // Order: max first, then high (high <= max is the
329 // operator-meaningful constraint per cgroup-v2 docs;
330 // kernel allows any ordering but writing max first
331 // means a high write fails clearly when high>max).
332 // swap_max is independent of the max/high/low triple
333 // and lands last in the memory block.
334 ctx.cgroups.set_memory_max(&def.name, mem.max)?;
335 ctx.cgroups.set_memory_high(&def.name, mem.high)?;
336 ctx.cgroups.set_memory_low(&def.name, mem.low)?;
337 // memory.swap.max only exists when the kernel was built
338 // with CONFIG_SWAP. On a swap-disabled kernel the file is
339 // absent and write returns ENOENT. Match the per-knob
340 // explicit-set semantics of the pids block: emit the
341 // write only when the user opted in via memory_swap_max
342 // / memory_swap_unlimited. swap_max=None means "the
343 // user never asked for a swap cap" — in that case the
344 // kernel default (unlimited on swap-enabled, no file on
345 // swap-disabled) is exactly what we want, and skipping
346 // the write keeps swap-disabled kernels viable for
347 // tests that just set memory_max.
348 if mem.swap_max.is_some() {
349 ctx.cgroups.set_memory_swap_max(&def.name, mem.swap_max)?;
350 }
351 }
352 Ok(())
353}
354
355/// Apply [`CgroupDef::io`] for `def`: validate `io.weight` against
356/// the kernel range and write it. No-op when `def.io` (or its
357/// `weight`) is `None` (the kernel default stays in place).
358fn apply_cgroup_io(ctx: &Ctx, def: &CgroupDef) -> Result<()> {
359 if let Some(ref io) = def.io
360 && let Some(w) = io.weight
361 {
362 if !(1..=10_000).contains(&w) {
363 anyhow::bail!(
364 "cgroup '{}': io.weight {w} out of range 1..=10000",
365 def.name,
366 );
367 }
368 ctx.cgroups.set_io_weight(&def.name, w)?;
369 }
370 Ok(())
371}
372
373/// Apply [`CgroupDef::pids`] for `def`: reject a zero `pids.max` and
374/// write `pids.max`. No-op when `def.pids` is `None` (no ceiling).
375fn apply_cgroup_pids(ctx: &Ctx, def: &CgroupDef) -> Result<()> {
376 if let Some(ref pids) = def.pids {
377 // pids.max: zero is a foot-cannon (no fork ever), so
378 // reject before the syscall — the kernel would accept
379 // 0 but the workload would silently halt every fork
380 // including the futex-helper threads spawned by some
381 // WorkType variants. There's no kernel sentinel for
382 // "no fork ever"; the explicit None path writes "max".
383 if let Some(0) = pids.max {
384 anyhow::bail!(
385 "cgroup '{}': pids.max must be > 0; use \
386 pids_unlimited() to remove the cap",
387 def.name,
388 );
389 }
390 ctx.cgroups.set_pids_max(&def.name, pids.max)?;
391 }
392 Ok(())
393}
394
395/// Resolve the effective worker specs for `def`: merge cgroup-level
396/// defaults into each `WorkSpec`, validate each work's mempolicy
397/// (and the mempolicy-vs-cpuset compatibility), then resolve
398/// `workers_pct` / `num_workers` / `work_type` / `affinity` into a
399/// concrete `WorkSpec` per entry. Returns the resolved specs in
400/// declaration order for [`spawn_def_workers`] to spawn.
401fn resolve_def_works(
402 ctx: &Ctx,
403 state: &ScenarioState<'_, '_>,
404 def: &CgroupDef,
405) -> Result<Vec<crate::workload::WorkSpec>> {
406 // Materialize the per-WorkSpec values with cgroup-level
407 // defaults merged in. `merged_works` substitutes a single
408 // `WorkSpec::default()` when `def.works` is empty (matching
409 // the historical default-substitution rule pinned by
410 // `apply_setup_substitutes_default_workspec_when_works_empty`)
411 // and merges `default_nice` / `default_comm` / `default_uid`
412 // / `default_gid` / `default_numa_node` into each WorkSpec
413 // whose own field is unset, regardless of the order builder
414 // methods were called in. This is what makes
415 // `def.nice(5).work(spec)` and `def.work(spec).nice(5)`
416 // equivalent. `pcomm` lives ONLY on `WorkSpec`; the
417 // `CgroupDef::pcomm` builder writes it into every WorkSpec
418 // directly, so the per-WorkSpec value below is the
419 // authoritative source for the pcomm dispatch.
420 let effective_works = def.merged_works();
421 for work in &effective_works {
422 if let Err(reason) = work.mem_policy.validate() {
423 anyhow::bail!("cgroup '{}': {}", def.name, reason);
424 }
425 }
426 // Clone the cpuset out so we don't keep a borrow into
427 // `state` across the mutable spawn calls below.
428 let cgroup_cpuset: Option<BTreeSet<usize>> = state.lookup_cpuset(&def.name).cloned();
429 if let Some(ref resolved) = cgroup_cpuset {
430 for work in &effective_works {
431 validate_mempolicy_cpuset(&work.mem_policy, work.mpol_flags, resolved, ctx, &def.name)?;
432 }
433 }
434 // Per-WorkSpec pcomm dispatch. A WorkSpec with `pcomm =
435 // Some(value)` joins a thread-group leader keyed on
436 // `value`: every WorkSpec sharing the same `pcomm` value
437 // coalesces into ONE forked leader per group, and every
438 // thread inside the leader observes
439 // `task->group_leader->comm == pcomm`. WorkSpecs with
440 // `pcomm = None` (or an empty pcomm string, which is
441 // treated as `None`) spawn via the conventional fork path
442 // — one process per worker.
443 //
444 // Coalescing key: the pcomm string itself. Different pcomm
445 // values inside the same CgroupDef produce different
446 // leaders (more flexible than rejecting heterogeneity, and
447 // matches the "model real workloads like `chrome` next to
448 // `java` in one cgroup" use case).
449 //
450 // Resolve every WorkSpec's `num_workers` / `work_type` /
451 // `affinity` once up front; the same triple is used by
452 // both dispatch paths and the resolution context (ctx,
453 // cgroup_cpuset) is identical for every WorkSpec inside
454 // this CgroupDef.
455 let mut resolved_works: Vec<crate::workload::WorkSpec> =
456 Vec::with_capacity(effective_works.len());
457 for work in &effective_works {
458 // Resolve `workers_pct` against the cgroup's cpuset (or
459 // the topology-usable cpuset when the cgroup inherits)
460 // and synthesize a `num_workers` value before the rest of
461 // the dispatch. Shares the resolution helper with
462 // Op::Spawn(SpawnPlacement::Cgroup) so the two paths
463 // produce identical worker counts for the same
464 // `(pct, cpuset_size)` pair.
465 let cpuset_size = cgroup_cpuset
466 .as_ref()
467 .map_or_else(|| ctx.topo.usable_cpuset().len(), |s| s.len());
468 let work = work.clone().resolve_workers_pct(cpuset_size, &def.name)?;
469 let n = crate::scenario::resolve_num_workers(&work, ctx.workers_per_cgroup, &def.name)?;
470 let effective_work_type = crate::workload::resolve_work_type(
471 &work.work_type,
472 ctx.work_type_override.as_ref(),
473 def.swappable,
474 n,
475 );
476 let affinity =
477 crate::scenario::intent_for_spawn(&work.affinity, cgroup_cpuset.as_ref(), ctx.topo)?;
478 resolved_works.push(crate::workload::WorkSpec {
479 work_type: effective_work_type,
480 sched_policy: work.sched_policy,
481 num_workers: Some(n),
482 affinity,
483 mem_policy: work.mem_policy.clone(),
484 mpol_flags: work.mpol_flags,
485 nice: work.nice,
486 comm: work.comm.clone(),
487 pcomm: work.pcomm.clone(),
488 uid: work.uid,
489 gid: work.gid,
490 numa_node: work.numa_node,
491 workers_pct: None,
492 });
493 }
494 Ok(resolved_works)
495}
496
497/// Spawn the synthetic worker handles for `def` from the
498/// already-resolved `resolved_works`: partition by `pcomm`, spawn
499/// the non-pcomm WorkSpecs via the conventional fork path, then
500/// spawn one thread-group leader per unique `pcomm` value. Each
501/// spawned handle is moved into `def`'s cgroup, started, and pushed
502/// onto `state`'s target handle vec.
503fn spawn_def_workers(
504 ctx: &Ctx,
505 state: &mut ScenarioState<'_, '_>,
506 def: &CgroupDef,
507 resolved_works: Vec<crate::workload::WorkSpec>,
508) -> Result<()> {
509 // Partition by pcomm value. `pcomm_groups` is keyed on the
510 // pcomm string; insertion order tracked via a parallel
511 // `pcomm_order` vec so the spawn order is stable
512 // (BTreeMap iteration would reorder by string sort).
513 let mut pcomm_groups: std::collections::HashMap<String, Vec<crate::workload::WorkSpec>> =
514 std::collections::HashMap::new();
515 let mut pcomm_order: Vec<String> = Vec::new();
516 let mut non_pcomm_works: Vec<crate::workload::WorkSpec> = Vec::new();
517 for work in resolved_works {
518 match &work.pcomm {
519 Some(value) if !value.is_empty() => {
520 let key = value.to_string();
521 if !pcomm_groups.contains_key(&key) {
522 pcomm_order.push(key.clone());
523 }
524 pcomm_groups.entry(key).or_default().push(work);
525 }
526 _ => non_pcomm_works.push(work),
527 }
528 }
529
530 // Spawn non-pcomm WorkSpecs via the conventional fork path
531 // (one WorkloadHandle per WorkSpec, one move_tasks call
532 // per spawn).
533 for work in non_pcomm_works {
534 let n = work.num_workers.expect("num_workers resolved above");
535 // `for_scenario_engine` pins `clone_mode = Fork` in the
536 // constructor body so the previously-needed
537 // `assert_eq!(wl.clone_mode, Fork)` collapses into the
538 // type system. Move_tasks below relies on Fork semantics:
539 // each worker is its own tgid leader, so writing a
540 // worker's TID to `<cgroup>/cgroup.procs` migrates only
541 // that worker. Thread mode would drag the scenario
542 // runner's tgid into def.name's cgroup per
543 // kernel/cgroup/cgroup.c::cgroup_procs_write_start.
544 let wl = WorkloadConfig::for_scenario_engine(
545 &work,
546 n,
547 work.affinity.clone(),
548 work.work_type.clone(),
549 )?;
550 tracing::debug!(
551 cgroup = %def.name,
552 expected_workers = n,
553 comm = ?work.comm,
554 work_type = ?work.work_type,
555 "apply_setup: about to spawn non-pcomm workers"
556 );
557 let mut h = WorkloadHandle::spawn(&wl)?;
558 let pids = h.worker_pids();
559 // Append the placement record to a guest-side log file.
560 // Empirical reruns ruled out every stderr/serial route
561 // for surfacing apply_setup diagnostics in libtest's
562 // captured stderr (tracing::*/eprintln drop in the pre-
563 // multiport-handshake window; COM1/COM2 PIO writes do
564 // not surface either — the host's test-mode serial
565 // capture only scans for panic-prefixed lines). A
566 // tmpfs file is the established pattern in this test
567 // module (the shell probe at cgroup_ops_placement_e2e
568 // already writes /tmp/ktstr-cgroup-ops-procs and the
569 // test body reads it on assertion failure).
570 append_placement_log(&format!(
571 "apply_setup: spawned non-pcomm workers cgroup={} count={} pids={:?}",
572 def.name,
573 pids.len(),
574 pids,
575 ));
576 ctx.cgroups.move_tasks(&def.name, &pids)?;
577 h.start();
578 state.target_handles().push((def.name.to_string(), h));
579 }
580
581 // Spawn one thread-group leader per unique pcomm value.
582 // Each leader hosts every WorkSpec that shares its pcomm.
583 for pcomm in pcomm_order {
584 let works_for_pcomm = pcomm_groups
585 .remove(&pcomm)
586 .expect("pcomm key inserted during partition pass");
587 // glibc setresuid/setresgid broadcasts via NPTL signalling
588 // to every thread in the tgid; coalesced WorkSpecs that
589 // disagree on uid/gid would race the leader's credentials
590 // out from under the other group's threads. Reject mixed
591 // values upfront so the misconfiguration surfaces here
592 // rather than as a runtime credential flap.
593 if works_for_pcomm.len() > 1 {
594 let first_uid = works_for_pcomm[0].uid;
595 let first_gid = works_for_pcomm[0].gid;
596 for (i, w) in works_for_pcomm.iter().enumerate().skip(1) {
597 if w.uid != first_uid {
598 anyhow::bail!(
599 "cgroup '{}' pcomm '{}': WorkSpec[0].uid={:?} differs from \
600 WorkSpec[{}].uid={:?}; pcomm-coalesced WorkSpecs must \
601 agree on uid (NPTL setresuid is broadcast to every thread \
602 in the tgid)",
603 def.name,
604 pcomm,
605 first_uid,
606 i,
607 w.uid,
608 );
609 }
610 if w.gid != first_gid {
611 anyhow::bail!(
612 "cgroup '{}' pcomm '{}': WorkSpec[0].gid={:?} differs from \
613 WorkSpec[{}].gid={:?}; pcomm-coalesced WorkSpecs must \
614 agree on gid (NPTL setresgid is broadcast to every thread \
615 in the tgid)",
616 def.name,
617 pcomm,
618 first_gid,
619 i,
620 w.gid,
621 );
622 }
623 }
624 }
625 // Container-leader credentials. Fall back to the first
626 // WorkSpec's uid/gid when no CgroupDef-level default is
627 // set: glibc's `setresuid` is broadcast to every thread
628 // in the tgid via NPTL signalling, so a worker thread's
629 // setresuid would eventually drop the leader's
630 // credentials anyway. Pre-applying it on the leader
631 // closes the root-uid window between fork and the first
632 // worker's setresuid call. When the WorkSpec also has
633 // `uid = None` the container stays at the parent's
634 // credentials (root in the test harness, the harness's
635 // euid otherwise) — the WorkSpec's lack-of-uid means
636 // "inherit the parent" anyway.
637 let container_uid = def
638 .default_uid
639 .or_else(|| works_for_pcomm.first().and_then(|w| w.uid));
640 let container_gid = def
641 .default_gid
642 .or_else(|| works_for_pcomm.first().and_then(|w| w.gid));
643 tracing::debug!(
644 cgroup = %def.name,
645 pcomm = %pcomm,
646 workspec_count = works_for_pcomm.len(),
647 "apply_setup: about to spawn pcomm-coalesced workers"
648 );
649 let mut h = WorkloadHandle::spawn_pcomm_cgroup(
650 &pcomm,
651 container_uid,
652 container_gid,
653 &works_for_pcomm,
654 )?;
655 let pids = h.worker_pids();
656 // Append the placement record to a guest-side log file —
657 // see the non-pcomm sibling above for the routing
658 // rationale.
659 append_placement_log(&format!(
660 "apply_setup: spawned pcomm workers cgroup={} pcomm={} count={} pids={:?}",
661 def.name,
662 pcomm,
663 pids.len(),
664 pids,
665 ));
666 ctx.cgroups.move_tasks(&def.name, &pids)?;
667 tracing::debug!(
668 cgroup = %def.name,
669 pcomm = %pcomm,
670 "apply_setup: move_tasks succeeded; about to h.start()"
671 );
672 h.start();
673 tracing::debug!(
674 cgroup = %def.name,
675 pcomm = %pcomm,
676 "apply_setup: h.start() returned; handle pushed"
677 );
678 state.target_handles().push((def.name.to_string(), h));
679 }
680 Ok(())
681}
682
683/// Spawn the optional userspace payload for `def` inside its cgroup,
684/// after the synthetic worker handles are already in place. No-op
685/// when `def.payload` is `None`. Rejects a duplicate payload already
686/// live in the same cgroup; on success pushes a [`PayloadEntry`]
687/// onto `state`'s target payload-handle vec.
688fn spawn_def_payload(ctx: &Ctx, state: &mut ScenarioState<'_, '_>, def: &CgroupDef) -> Result<()> {
689 // After synthetic workers are in place, spawn the optional
690 // userspace payload inside the same cgroup. The payload runs
691 // concurrently with the WorkSpec groups; its metrics are recorded
692 // to the sidecar via the guest-to-host SHM ring when the
693 // handle is killed at step-teardown. Spawning after the WorkSpec
694 // handles lets the cgroup cpuset + mempolicy settle first so
695 // the binary inherits a stable placement.
696 if let Some(payload) = def.payload {
697 // Composite-key dedup: the same payload CAN live in a
698 // different cgroup, but two copies in THIS cgroup would
699 // collide on teardown (one handle masks the other in
700 // the sidecar). Reject upfront with the same error
701 // shape as the Op::RunPayload path.
702 if let Some(existing) = state.find_live_payload_with_cgroup(payload.name, def.name.as_ref())
703 {
704 anyhow::bail!(
705 "CgroupDef::workload: payload '{}' already running in cgroup '{}' (spawned by {}) — \
706 declare it in exactly one place per cgroup",
707 payload.name,
708 def.name,
709 existing.source.describe(),
710 );
711 }
712 let handle = crate::scenario::payload_run::PayloadRun::new(ctx, payload)
713 .in_cgroup(def.name.clone())
714 .spawn()
715 .map_err(|e| {
716 anyhow::anyhow!(
717 "cgroup '{}': spawn payload '{}': {:#}",
718 def.name,
719 payload.name,
720 e,
721 )
722 })?;
723 state.target_payload_handles().push(PayloadEntry {
724 cgroup: def.name.to_string(),
725 source: PayloadSource::CgroupDefWorkload,
726 handle,
727 });
728 }
729 Ok(())
730}
731
/// Helper for [`apply_setup`]: spawn the host-mode stall monitor
/// against every worker pid currently tracked in step-local
/// [`super::StepState::handles`] AND in the backdrop's handles, so a
/// stalled backdrop worker is covered too. Idempotent — re-invocation
/// when the monitor is already started is a no-op.
736///
737/// The set of pids is snapshot once at start time; workers added
738/// by subsequent ops (e.g. an `Op::Spawn` later in the same step)
739/// are NOT picked up by the running thread. This matches the
740/// dominant CgroupDef workload shape (every worker for the step
741/// is spawned in one apply_setup pass) and avoids the
742/// add-while-polling synchronization complexity of a dynamic
743/// pid set.
744pub(super) fn maybe_start_stall_monitor(state: &mut ScenarioState<'_, '_>) {
745 // Backdrop-setup pass: monitor is scoped to the step, not the
746 // backdrop. Skip and let the next per-step apply_setup install
747 // it if/when step-local workers spawn.
748 if state.target_backdrop {
749 return;
750 }
751 // Already started for this step.
752 if state.step.stall_monitor.is_some() {
753 return;
754 }
755 // VM-side scenarios use the freeze coordinator's stall plumbing,
756 // not the host-mode poller. cargo_test_mode runs the harness
757 // itself in the same /proc namespace, so a poller would see
758 // unrelated harness threads.
759 if guest_comms::is_guest() || crate::cargo_test_mode::cargo_test_mode_active() {
760 return;
761 }
762 // Collect every worker pid in scope — step-local handles
763 // spawned by the just-completed apply_setup pass AND
764 // backdrop-owned handles that survive across steps. The
765 // monitor's lifetime is per-step (re-installed at each step
766 // boundary), but the pid set must include backdrop workers
767 // because a stalled backdrop worker (e.g. a long-running
768 // payload pinned to a contended cpuset) would otherwise be
769 // silent across the entire scenario.
770 let pids: Vec<libc::pid_t> = state
771 .step
772 .handles
773 .iter()
774 .chain(state.backdrop.handles.iter())
775 .flat_map(|(_, h)| h.worker_pids())
776 .collect();
777 if pids.is_empty() {
778 return;
779 }
780 match crate::scenario::host_stall::spawn_monitor(&pids) {
781 Ok(handle) => {
782 state.step.stall_monitor = Some(handle);
783 }
784 Err(e) => {
785 // Spawn failure is non-fatal — the scenario itself can
786 // still run and report worker results. Surface the
787 // defect via tracing so an operator can spot the
788 // missing stall coverage.
789 tracing::warn!(err = %format!("{e:#}"), "host_stall::spawn_monitor failed; stall monitor disabled for this step");
790 }
791 }
792}