ktstr/scenario/ops/
setup.rs

1//! Setup-phase orchestration for [`crate::scenario::ops::Op`]-driven scenarios.
2//!
3//! [`apply_setup`] runs the [`CgroupDef`] passes that materialise each
4//! declared cgroup, resolve its cpuset / cpu / memory / io / pids
5//! controllers, partition `WorkSpec`s by `pcomm`, spawn worker handles,
6//! and start any [`CgroupDef::workload`] payload. After the per-def
7//! loop finishes it triggers [`maybe_start_stall_monitor`] for the
8//! host-mode worker-stall poller.
9//!
10//! Sibling to [`super::apply_ops`]; both mutate the same
11//! [`super::ScenarioState`] view over step-local + backdrop state.
12
13use std::collections::BTreeSet;
14
15use anyhow::Result;
16
17use crate::scenario::Ctx;
18use crate::vmm::guest_comms;
19use crate::workload::{WorkloadConfig, WorkloadHandle};
20
21use super::{CgroupDef, PayloadEntry, PayloadSource, ScenarioState, validate_mempolicy_cpuset};
22
/// Path of the scenario placement-history log. Captures placement
/// events from both `apply_setup` (worker / payload spawn) and
/// `apply_ops` (`Op::MoveAllTasks` pre/post cgroup.procs snapshots).
/// Tests that assert on per-cgroup worker placement can read this
/// file on failure to see exactly which PIDs landed in which cgroup
/// at spawn time AND how `Op::MoveAllTasks` migrated them. The file
/// lives in tmpfs and survives the scenario teardown (only the
/// cgroup directories are rmdir'd; this file stays put). The log is
/// append-only and is never cleared between runs (the only writer,
/// `append_placement_log`, opens with `.append(true)` and never
/// truncates), so lines from prior runs accumulate — a reader that
/// needs only the current run's placement must filter accordingly.
///
/// # Why a file rather than stderr / serial
///
/// Per the notes recorded at the `apply_setup` spawn sites, every
/// guest-side stderr / serial route was empirically found to drop
/// scenario-era diagnostics in test mode (`tracing` / `eprintln!`
/// output and COM1/COM2 writes did not surface in the captured test
/// output). A file under `/tmp` that the test body reads back on
/// assertion failure is the transport that was found to work.
pub const PLACEMENT_LOG_PATH: &str = "/tmp/ktstr-placement.log";
36
37/// Append `msg` (with a trailing newline) to the placement-history
38/// log. See [`PLACEMENT_LOG_PATH`] for the transport rationale
39/// (every guest-side stderr/serial route was empirically confirmed
40/// to drop scenario-era diagnostic bytes in test mode). Errors are
41/// swallowed because this is a best-effort diagnostic — if the
42/// file write fails, the placement record just goes missing and the
43/// operator falls back to the existing post-mortem cgroup.procs
44/// snapshot.
45pub(super) fn append_placement_log(msg: &str) {
46    use std::io::Write;
47    if let Ok(mut f) = std::fs::OpenOptions::new()
48        .create(true)
49        .append(true)
50        .open(PLACEMENT_LOG_PATH)
51    {
52        let _ = writeln!(f, "{msg}");
53    }
54}
55
56/// Walk every [`CgroupDef`] in `defs` and register it against the
57/// step or backdrop cgroup slot selected by `state.target_backdrop`.
58/// A duplicate name (already tracked by either state) bails — a
59/// [`CgroupDef`] must not silently shadow a cgroup that another state
60/// slot has already created.
61///
62/// Each `CgroupDef`'s `works` vec is iterated, spawning one
63/// [`WorkloadHandle`] per [`crate::workload::WorkSpec`] entry. Multiple
64/// `WorkSpec`s for the same cgroup produce multiple handle entries with
65/// the same name key; ops that filter by cgroup name (`StopCgroup`,
66/// `SetAffinity`, etc.) naturally apply to all of them. When `works`
67/// is empty, a single default `WorkSpec` is used (`SpinWait`, `Normal`,
68/// `ctx.workers_per_cgroup` workers). Cgroups created here route into
69/// step-local or backdrop state per the `target_backdrop` flag.
70pub(super) fn apply_setup(
71    ctx: &Ctx,
72    state: &mut ScenarioState<'_, '_>,
73    defs: &[CgroupDef],
74) -> Result<()> {
75    for def in defs {
76        if state.cgroup_name_is_tracked(&def.name) {
77            anyhow::bail!(
78                "CgroupDef '{}' collides with a cgroup already tracked \
79                 (by a prior Backdrop or step-local CgroupDef) — declare it \
80                 in exactly one place; use a fresh name for the step-local cgroup",
81                def.name,
82            );
83        }
84        state.target_cgroups().add_cgroup_no_cpuset(&def.name)?;
85        // Per-controller application, in the order the kernel docs and
86        // the operator-meaningful constraints want: cpuset (sets the
87        // CPUs every later task inherits) before cpuset.mems (NUMA
88        // node binding) before cpu (weight + cpu.max) before memory
89        // (max/high/low/swap) before io (weight) before pids
90        // (pids.max). Each helper is a no-op early-return when its
91        // `def.<controller>` field is `None`.
92        apply_cgroup_cpuset(ctx, state, def)?;
93        apply_cgroup_cpuset_mems(ctx, def)?;
94        apply_cgroup_cpu(ctx, def)?;
95        apply_cgroup_memory(ctx, def)?;
96        apply_cgroup_io(ctx, def)?;
97        apply_cgroup_pids(ctx, def)?;
98        // Materialize the per-WorkSpec values with cgroup-level
99        // defaults merged in, resolve workers_pct / num_workers /
100        // work_type / affinity, then spawn the synthetic worker
101        // handles and the optional userspace payload.
102        let resolved_works = resolve_def_works(ctx, state, def)?;
103        spawn_def_workers(ctx, state, def, resolved_works)?;
104        spawn_def_payload(ctx, state, def)?;
105    }
106    // Start the host-mode stall monitor once we've spawned workers
107    // for the first apply_setup in this step. Skip when:
108    // - running inside the guest (the VM-side freeze coordinator
109    //   owns stall detection there; the host-mode poller would
110    //   read its own guest /proc/sched, which has no relevance);
111    // - running under cargo_test_mode (in-process VMM tests share
112    //   the host's /proc with the test harness itself and a poller
113    //   would observe the harness threads, not the scenario);
114    // - the apply_setup pass is routing to the Backdrop slot
115    //   (Backdrop workers are scenario-persistent; the per-step
116    //   monitor's lifetime would be wrong for them, and a future
117    //   task can add a parallel Backdrop monitor if needed);
118    // - the monitor is already started for this StepState (an
119    //   `Op::AddCgroupDef` that re-enters apply_setup mid-step
120    //   should not spawn a second poller — the existing one
121    //   already runs);
122    // - no workers exist (degenerate scenarios with all worker
123    //   spawns deferred to apply_ops or with zero spawns at all).
124    maybe_start_stall_monitor(state);
125    Ok(())
126}
127
128/// Apply [`CgroupDef::cpuset`] for `def`: resolve the spec against
129/// the topology, surface the workers_pct / empty-cpuset diagnostics,
130/// validate, write the cpuset, and record it on `state`. No-op when
131/// `def.cpuset` is `None` (the cgroup inherits the parent cpuset).
132fn apply_cgroup_cpuset(
133    ctx: &Ctx,
134    state: &mut ScenarioState<'_, '_>,
135    def: &CgroupDef,
136) -> Result<()> {
137    if let Some(ref cpuset_spec) = def.cpuset {
138        let resolved = cpuset_spec.resolve_quiet(ctx);
139        // workers_pct + empty cpuset combinations produce more
140        // actionable diagnostics than the generic CpusetSpec
141        // empty-mask rejection — surface them here before
142        // validate's broader empty-Exact reject preempts the
143        // per-pct context.
144        //
145        // Two distinct empty-cpuset misconfigurations:
146        //
147        //   (1) any WorkSpec sets BOTH workers(N) and
148        //       workers_pct(P): the dual-set is the more
149        //       fundamental error and must be resolved before
150        //       the cpuset semantics matter. Surface "BOTH
151        //       workers ... workers_pct" here rather than letting
152        //       validate's empty-mask rejection mask it.
153        //
154        //   (2) one or more WorkSpecs set workers_pct only:
155        //       enumerate every configured pct value so a
156        //       multi-WorkSpec cgroup doesn't silently drop all
157        //       but the first.
158        if resolved.is_empty() {
159            let works = def.merged_works();
160            if let Some(dual_work) = works
161                .iter()
162                .find(|w| w.workers_pct.is_some() && w.num_workers.is_some())
163            {
164                let n = dual_work
165                    .num_workers
166                    .expect("dual_work selected via num_workers.is_some()");
167                let pct = dual_work
168                    .workers_pct
169                    .expect("dual_work selected via workers_pct.is_some()");
170                anyhow::bail!(
171                    "cgroup '{}': WorkSpec sets BOTH workers({n}) \
172                     and workers_pct({pct}); pick one — \
173                     workers_pct resolves the cpuset fraction at \
174                     apply-setup time and is incompatible with an \
175                     explicit count. The empty cpuset would \
176                     otherwise mask this conflict; resolve the \
177                     workers/workers_pct conflict first",
178                    def.name,
179                );
180            }
181            let pcts: Vec<(usize, f64)> = works
182                .iter()
183                .enumerate()
184                .filter_map(|(i, w)| w.workers_pct.map(|p| (i, p)))
185                .collect();
186            if !pcts.is_empty() {
187                let pct_display = if pcts.len() == 1 {
188                    format!("workers_pct({})", pcts[0].1)
189                } else {
190                    // Include positional indices so the operator
191                    // can disambiguate when the same fraction is
192                    // configured on multiple WorkSpecs (e.g.
193                    // `[works[0]=0.5, works[2]=0.5]` shows which
194                    // entries to adjust without grepping the test).
195                    let list = pcts
196                        .iter()
197                        .map(|(i, p)| format!("works[{i}]={p}"))
198                        .collect::<Vec<_>>()
199                        .join(", ");
200                    format!("workers_pct values [{list}]")
201                };
202                anyhow::bail!(
203                    "cgroup '{}': {pct_display} on a cpuset of 0 \
204                     CPU(s) would resolve to 0 workers; the cgroup \
205                     would have no workers and downstream \
206                     assertions would vacuously pass — narrow the \
207                     cpuset, raise the fraction, or use \
208                     `workers(N)` instead",
209                    def.name,
210                );
211            }
212            // Fall-through for cpusets that resolve to empty
213            // without workers_pct — i.e. cases where the slice
214            // math (or topology shape) yields an empty BTreeSet
215            // even though `validate` accepts the spec. Examples:
216            // `Range { 0.0, 0.1 }` on a small usable set (the
217            // truncated `(len * 0.1) as usize` rounds to 0, see
218            // the `op_set_cpuset_narrow_to_empty_bails` test),
219            // or `Llc(N)` on a pathological topology where LLC
220            // N has no associated CPUs (memory-only NUMA node
221            // attached to a separate LLC). Cases like
222            // `Range { 0.0, 0.0 }` or `Disjoint { of: 0 }` do
223            // NOT reach this branch in `Op::SetCpuset` — they
224            // get rejected by validate first — but they DO
225            // reach this branch here because apply_setup runs
226            // resolve before validate (intentional: the Bundle
227            // H workers_pct diagnostic at the dual_work / pcts
228            // probes above needs to fire on empty-Exact +
229            // workers_pct combinations before validate's
230            // generic empty-Exact rejection preempts it). An
231            // empty cpuset reaching `set_cpuset` silently
232            // writes an empty mask to the cgroup; subsequent
233            // worker spawns get no CPUs and every CPU-pinned
234            // assertion vacuously passes. Bail here with the
235            // cpuset_spec context so the operator sees which
236            // spec resolved to empty and can adjust.
237            anyhow::bail!(
238                "cgroup '{}': cpuset_spec {:?} resolved to 0 \
239                 CPU(s); the cgroup would have no CPUs assigned \
240                 and downstream worker spawns would fail or \
241                 produce vacuous assertions — adjust the spec \
242                 so it resolves to a non-empty cpuset on this \
243                 topology",
244                def.name,
245                cpuset_spec,
246            );
247        }
248        if let Err(reason) = cpuset_spec.validate(ctx) {
249            anyhow::bail!(
250                "cgroup '{}': CpusetSpec validation failed: {}",
251                def.name,
252                reason
253            );
254        }
255        ctx.cgroups.set_cpuset(&def.name, &resolved)?;
256        state.record_cpuset(&def.name, resolved);
257    }
258    Ok(())
259}
260
261/// Apply [`CgroupDef::cpuset_mems`] for `def`: write the NUMA node
262/// binding to `cpuset.mems`. No-op when `def.cpuset_mems` is `None`
263/// (the cgroup inherits the parent `cpuset.mems`).
264fn apply_cgroup_cpuset_mems(ctx: &Ctx, def: &CgroupDef) -> Result<()> {
265    if let Some(ref nodes) = def.cpuset_mems {
266        // The cpuset.mems write must succeed before any task
267        // moves into the cgroup; cpuset_update_task_spread will
268        // SIGKILL or fail allocations otherwise. Surfacing the
269        // error here (instead of at move_tasks time) lets the
270        // operator see the bad NUMA spec at setup, before the
271        // worker spawn pays its cost.
272        ctx.cgroups.set_cpuset_mems(&def.name, nodes)?;
273    }
274    Ok(())
275}
276
277/// Apply [`CgroupDef::cpu`] for `def`: validate `cpu.weight` against
278/// the kernel range, reject a zero `cpu.max` period / quota, and
279/// write `cpu.weight` + `cpu.max`. No-op when `def.cpu` is `None`
280/// (both kernel defaults stay in place).
281fn apply_cgroup_cpu(ctx: &Ctx, def: &CgroupDef) -> Result<()> {
282    if let Some(ref cpu) = def.cpu {
283        // cpu.weight: kernel range is 1..=10000 per
284        // Documentation/admin-guide/cgroup-v2.rst. Reject at
285        // setup so a 0 / 12000 from a typo fails fast instead
286        // of returning EINVAL from the kernel write.
287        if let Some(w) = cpu.weight {
288            if !(1..=10_000).contains(&w) {
289                anyhow::bail!(
290                    "cgroup '{}': cpu.weight {w} out of range 1..=10000",
291                    def.name,
292                );
293            }
294            ctx.cgroups.set_cpu_weight(&def.name, w)?;
295        }
296        // cpu.max: writing requires `+cpu` in subtree_control;
297        // CgroupManager::setup with enable_cpu_controller=true
298        // turns it on. quota=0 with period>0 would reject every
299        // schedule slice in the kernel; reject here with a
300        // clearer message.
301        if cpu.max_period_us == 0 {
302            anyhow::bail!("cgroup '{}': cpu.max period must be > 0 (got 0)", def.name,);
303        }
304        if let Some(q) = cpu.max_quota_us
305            && q == 0
306        {
307            anyhow::bail!(
308                "cgroup '{}': cpu.max quota must be > 0 when set; \
309                 use cpu_unlimited() to remove the cap",
310                def.name,
311            );
312        }
313        // Always emit the cpu.max write so the period field is
314        // recorded even when quota is None. Aligns with the
315        // kernel's `"max <period>"` write format.
316        ctx.cgroups
317            .set_cpu_max(&def.name, cpu.max_quota_us, cpu.max_period_us)?;
318    }
319    Ok(())
320}
321
322/// Apply [`CgroupDef::memory`] for `def`: write `memory.max`,
323/// `memory.high`, `memory.low`, then (only when the user opted in)
324/// `memory.swap.max`. No-op when `def.memory` is `None` (all four
325/// stay at the kernel defaults).
326fn apply_cgroup_memory(ctx: &Ctx, def: &CgroupDef) -> Result<()> {
327    if let Some(ref mem) = def.memory {
328        // Order: max first, then high (high <= max is the
329        // operator-meaningful constraint per cgroup-v2 docs;
330        // kernel allows any ordering but writing max first
331        // means a high write fails clearly when high>max).
332        // swap_max is independent of the max/high/low triple
333        // and lands last in the memory block.
334        ctx.cgroups.set_memory_max(&def.name, mem.max)?;
335        ctx.cgroups.set_memory_high(&def.name, mem.high)?;
336        ctx.cgroups.set_memory_low(&def.name, mem.low)?;
337        // memory.swap.max only exists when the kernel was built
338        // with CONFIG_SWAP. On a swap-disabled kernel the file is
339        // absent and write returns ENOENT. Match the per-knob
340        // explicit-set semantics of the pids block: emit the
341        // write only when the user opted in via memory_swap_max
342        // / memory_swap_unlimited. swap_max=None means "the
343        // user never asked for a swap cap" — in that case the
344        // kernel default (unlimited on swap-enabled, no file on
345        // swap-disabled) is exactly what we want, and skipping
346        // the write keeps swap-disabled kernels viable for
347        // tests that just set memory_max.
348        if mem.swap_max.is_some() {
349            ctx.cgroups.set_memory_swap_max(&def.name, mem.swap_max)?;
350        }
351    }
352    Ok(())
353}
354
355/// Apply [`CgroupDef::io`] for `def`: validate `io.weight` against
356/// the kernel range and write it. No-op when `def.io` (or its
357/// `weight`) is `None` (the kernel default stays in place).
358fn apply_cgroup_io(ctx: &Ctx, def: &CgroupDef) -> Result<()> {
359    if let Some(ref io) = def.io
360        && let Some(w) = io.weight
361    {
362        if !(1..=10_000).contains(&w) {
363            anyhow::bail!(
364                "cgroup '{}': io.weight {w} out of range 1..=10000",
365                def.name,
366            );
367        }
368        ctx.cgroups.set_io_weight(&def.name, w)?;
369    }
370    Ok(())
371}
372
373/// Apply [`CgroupDef::pids`] for `def`: reject a zero `pids.max` and
374/// write `pids.max`. No-op when `def.pids` is `None` (no ceiling).
375fn apply_cgroup_pids(ctx: &Ctx, def: &CgroupDef) -> Result<()> {
376    if let Some(ref pids) = def.pids {
377        // pids.max: zero is a foot-cannon (no fork ever), so
378        // reject before the syscall — the kernel would accept
379        // 0 but the workload would silently halt every fork
380        // including the futex-helper threads spawned by some
381        // WorkType variants. There's no kernel sentinel for
382        // "no fork ever"; the explicit None path writes "max".
383        if let Some(0) = pids.max {
384            anyhow::bail!(
385                "cgroup '{}': pids.max must be > 0; use \
386                 pids_unlimited() to remove the cap",
387                def.name,
388            );
389        }
390        ctx.cgroups.set_pids_max(&def.name, pids.max)?;
391    }
392    Ok(())
393}
394
395/// Resolve the effective worker specs for `def`: merge cgroup-level
396/// defaults into each `WorkSpec`, validate each work's mempolicy
397/// (and the mempolicy-vs-cpuset compatibility), then resolve
398/// `workers_pct` / `num_workers` / `work_type` / `affinity` into a
399/// concrete `WorkSpec` per entry. Returns the resolved specs in
400/// declaration order for [`spawn_def_workers`] to spawn.
401fn resolve_def_works(
402    ctx: &Ctx,
403    state: &ScenarioState<'_, '_>,
404    def: &CgroupDef,
405) -> Result<Vec<crate::workload::WorkSpec>> {
406    // Materialize the per-WorkSpec values with cgroup-level
407    // defaults merged in. `merged_works` substitutes a single
408    // `WorkSpec::default()` when `def.works` is empty (matching
409    // the historical default-substitution rule pinned by
410    // `apply_setup_substitutes_default_workspec_when_works_empty`)
411    // and merges `default_nice` / `default_comm` / `default_uid`
412    // / `default_gid` / `default_numa_node` into each WorkSpec
413    // whose own field is unset, regardless of the order builder
414    // methods were called in. This is what makes
415    // `def.nice(5).work(spec)` and `def.work(spec).nice(5)`
416    // equivalent. `pcomm` lives ONLY on `WorkSpec`; the
417    // `CgroupDef::pcomm` builder writes it into every WorkSpec
418    // directly, so the per-WorkSpec value below is the
419    // authoritative source for the pcomm dispatch.
420    let effective_works = def.merged_works();
421    for work in &effective_works {
422        if let Err(reason) = work.mem_policy.validate() {
423            anyhow::bail!("cgroup '{}': {}", def.name, reason);
424        }
425    }
426    // Clone the cpuset out so we don't keep a borrow into
427    // `state` across the mutable spawn calls below.
428    let cgroup_cpuset: Option<BTreeSet<usize>> = state.lookup_cpuset(&def.name).cloned();
429    if let Some(ref resolved) = cgroup_cpuset {
430        for work in &effective_works {
431            validate_mempolicy_cpuset(&work.mem_policy, work.mpol_flags, resolved, ctx, &def.name)?;
432        }
433    }
434    // Per-WorkSpec pcomm dispatch. A WorkSpec with `pcomm =
435    // Some(value)` joins a thread-group leader keyed on
436    // `value`: every WorkSpec sharing the same `pcomm` value
437    // coalesces into ONE forked leader per group, and every
438    // thread inside the leader observes
439    // `task->group_leader->comm == pcomm`. WorkSpecs with
440    // `pcomm = None` (or an empty pcomm string, which is
441    // treated as `None`) spawn via the conventional fork path
442    // — one process per worker.
443    //
444    // Coalescing key: the pcomm string itself. Different pcomm
445    // values inside the same CgroupDef produce different
446    // leaders (more flexible than rejecting heterogeneity, and
447    // matches the "model real workloads like `chrome` next to
448    // `java` in one cgroup" use case).
449    //
450    // Resolve every WorkSpec's `num_workers` / `work_type` /
451    // `affinity` once up front; the same triple is used by
452    // both dispatch paths and the resolution context (ctx,
453    // cgroup_cpuset) is identical for every WorkSpec inside
454    // this CgroupDef.
455    let mut resolved_works: Vec<crate::workload::WorkSpec> =
456        Vec::with_capacity(effective_works.len());
457    for work in &effective_works {
458        // Resolve `workers_pct` against the cgroup's cpuset (or
459        // the topology-usable cpuset when the cgroup inherits)
460        // and synthesize a `num_workers` value before the rest of
461        // the dispatch. Shares the resolution helper with
462        // Op::Spawn(SpawnPlacement::Cgroup) so the two paths
463        // produce identical worker counts for the same
464        // `(pct, cpuset_size)` pair.
465        let cpuset_size = cgroup_cpuset
466            .as_ref()
467            .map_or_else(|| ctx.topo.usable_cpuset().len(), |s| s.len());
468        let work = work.clone().resolve_workers_pct(cpuset_size, &def.name)?;
469        let n = crate::scenario::resolve_num_workers(&work, ctx.workers_per_cgroup, &def.name)?;
470        let effective_work_type = crate::workload::resolve_work_type(
471            &work.work_type,
472            ctx.work_type_override.as_ref(),
473            def.swappable,
474            n,
475        );
476        let affinity =
477            crate::scenario::intent_for_spawn(&work.affinity, cgroup_cpuset.as_ref(), ctx.topo)?;
478        resolved_works.push(crate::workload::WorkSpec {
479            work_type: effective_work_type,
480            sched_policy: work.sched_policy,
481            num_workers: Some(n),
482            affinity,
483            mem_policy: work.mem_policy.clone(),
484            mpol_flags: work.mpol_flags,
485            nice: work.nice,
486            comm: work.comm.clone(),
487            pcomm: work.pcomm.clone(),
488            uid: work.uid,
489            gid: work.gid,
490            numa_node: work.numa_node,
491            workers_pct: None,
492        });
493    }
494    Ok(resolved_works)
495}
496
497/// Spawn the synthetic worker handles for `def` from the
498/// already-resolved `resolved_works`: partition by `pcomm`, spawn
499/// the non-pcomm WorkSpecs via the conventional fork path, then
500/// spawn one thread-group leader per unique `pcomm` value. Each
501/// spawned handle is moved into `def`'s cgroup, started, and pushed
502/// onto `state`'s target handle vec.
503fn spawn_def_workers(
504    ctx: &Ctx,
505    state: &mut ScenarioState<'_, '_>,
506    def: &CgroupDef,
507    resolved_works: Vec<crate::workload::WorkSpec>,
508) -> Result<()> {
509    // Partition by pcomm value. `pcomm_groups` is keyed on the
510    // pcomm string; insertion order tracked via a parallel
511    // `pcomm_order` vec so the spawn order is stable
512    // (BTreeMap iteration would reorder by string sort).
513    let mut pcomm_groups: std::collections::HashMap<String, Vec<crate::workload::WorkSpec>> =
514        std::collections::HashMap::new();
515    let mut pcomm_order: Vec<String> = Vec::new();
516    let mut non_pcomm_works: Vec<crate::workload::WorkSpec> = Vec::new();
517    for work in resolved_works {
518        match &work.pcomm {
519            Some(value) if !value.is_empty() => {
520                let key = value.to_string();
521                if !pcomm_groups.contains_key(&key) {
522                    pcomm_order.push(key.clone());
523                }
524                pcomm_groups.entry(key).or_default().push(work);
525            }
526            _ => non_pcomm_works.push(work),
527        }
528    }
529
530    // Spawn non-pcomm WorkSpecs via the conventional fork path
531    // (one WorkloadHandle per WorkSpec, one move_tasks call
532    // per spawn).
533    for work in non_pcomm_works {
534        let n = work.num_workers.expect("num_workers resolved above");
535        // `for_scenario_engine` pins `clone_mode = Fork` in the
536        // constructor body so the previously-needed
537        // `assert_eq!(wl.clone_mode, Fork)` collapses into the
538        // type system. Move_tasks below relies on Fork semantics:
539        // each worker is its own tgid leader, so writing a
540        // worker's TID to `<cgroup>/cgroup.procs` migrates only
541        // that worker. Thread mode would drag the scenario
542        // runner's tgid into def.name's cgroup per
543        // kernel/cgroup/cgroup.c::cgroup_procs_write_start.
544        let wl = WorkloadConfig::for_scenario_engine(
545            &work,
546            n,
547            work.affinity.clone(),
548            work.work_type.clone(),
549        )?;
550        tracing::debug!(
551            cgroup = %def.name,
552            expected_workers = n,
553            comm = ?work.comm,
554            work_type = ?work.work_type,
555            "apply_setup: about to spawn non-pcomm workers"
556        );
557        let mut h = WorkloadHandle::spawn(&wl)?;
558        let pids = h.worker_pids();
559        // Append the placement record to a guest-side log file.
560        // Empirical reruns ruled out every stderr/serial route
561        // for surfacing apply_setup diagnostics in libtest's
562        // captured stderr (tracing::*/eprintln drop in the pre-
563        // multiport-handshake window; COM1/COM2 PIO writes do
564        // not surface either — the host's test-mode serial
565        // capture only scans for panic-prefixed lines). A
566        // tmpfs file is the established pattern in this test
567        // module (the shell probe at cgroup_ops_placement_e2e
568        // already writes /tmp/ktstr-cgroup-ops-procs and the
569        // test body reads it on assertion failure).
570        append_placement_log(&format!(
571            "apply_setup: spawned non-pcomm workers cgroup={} count={} pids={:?}",
572            def.name,
573            pids.len(),
574            pids,
575        ));
576        ctx.cgroups.move_tasks(&def.name, &pids)?;
577        h.start();
578        state.target_handles().push((def.name.to_string(), h));
579    }
580
581    // Spawn one thread-group leader per unique pcomm value.
582    // Each leader hosts every WorkSpec that shares its pcomm.
583    for pcomm in pcomm_order {
584        let works_for_pcomm = pcomm_groups
585            .remove(&pcomm)
586            .expect("pcomm key inserted during partition pass");
587        // glibc setresuid/setresgid broadcasts via NPTL signalling
588        // to every thread in the tgid; coalesced WorkSpecs that
589        // disagree on uid/gid would race the leader's credentials
590        // out from under the other group's threads. Reject mixed
591        // values upfront so the misconfiguration surfaces here
592        // rather than as a runtime credential flap.
593        if works_for_pcomm.len() > 1 {
594            let first_uid = works_for_pcomm[0].uid;
595            let first_gid = works_for_pcomm[0].gid;
596            for (i, w) in works_for_pcomm.iter().enumerate().skip(1) {
597                if w.uid != first_uid {
598                    anyhow::bail!(
599                        "cgroup '{}' pcomm '{}': WorkSpec[0].uid={:?} differs from \
600                         WorkSpec[{}].uid={:?}; pcomm-coalesced WorkSpecs must \
601                         agree on uid (NPTL setresuid is broadcast to every thread \
602                         in the tgid)",
603                        def.name,
604                        pcomm,
605                        first_uid,
606                        i,
607                        w.uid,
608                    );
609                }
610                if w.gid != first_gid {
611                    anyhow::bail!(
612                        "cgroup '{}' pcomm '{}': WorkSpec[0].gid={:?} differs from \
613                         WorkSpec[{}].gid={:?}; pcomm-coalesced WorkSpecs must \
614                         agree on gid (NPTL setresgid is broadcast to every thread \
615                         in the tgid)",
616                        def.name,
617                        pcomm,
618                        first_gid,
619                        i,
620                        w.gid,
621                    );
622                }
623            }
624        }
625        // Container-leader credentials. Fall back to the first
626        // WorkSpec's uid/gid when no CgroupDef-level default is
627        // set: glibc's `setresuid` is broadcast to every thread
628        // in the tgid via NPTL signalling, so a worker thread's
629        // setresuid would eventually drop the leader's
630        // credentials anyway. Pre-applying it on the leader
631        // closes the root-uid window between fork and the first
632        // worker's setresuid call. When the WorkSpec also has
633        // `uid = None` the container stays at the parent's
634        // credentials (root in the test harness, the harness's
635        // euid otherwise) — the WorkSpec's lack-of-uid means
636        // "inherit the parent" anyway.
637        let container_uid = def
638            .default_uid
639            .or_else(|| works_for_pcomm.first().and_then(|w| w.uid));
640        let container_gid = def
641            .default_gid
642            .or_else(|| works_for_pcomm.first().and_then(|w| w.gid));
643        tracing::debug!(
644            cgroup = %def.name,
645            pcomm = %pcomm,
646            workspec_count = works_for_pcomm.len(),
647            "apply_setup: about to spawn pcomm-coalesced workers"
648        );
649        let mut h = WorkloadHandle::spawn_pcomm_cgroup(
650            &pcomm,
651            container_uid,
652            container_gid,
653            &works_for_pcomm,
654        )?;
655        let pids = h.worker_pids();
656        // Append the placement record to a guest-side log file —
657        // see the non-pcomm sibling above for the routing
658        // rationale.
659        append_placement_log(&format!(
660            "apply_setup: spawned pcomm workers cgroup={} pcomm={} count={} pids={:?}",
661            def.name,
662            pcomm,
663            pids.len(),
664            pids,
665        ));
666        ctx.cgroups.move_tasks(&def.name, &pids)?;
667        tracing::debug!(
668            cgroup = %def.name,
669            pcomm = %pcomm,
670            "apply_setup: move_tasks succeeded; about to h.start()"
671        );
672        h.start();
673        tracing::debug!(
674            cgroup = %def.name,
675            pcomm = %pcomm,
676            "apply_setup: h.start() returned; handle pushed"
677        );
678        state.target_handles().push((def.name.to_string(), h));
679    }
680    Ok(())
681}
682
683/// Spawn the optional userspace payload for `def` inside its cgroup,
684/// after the synthetic worker handles are already in place. No-op
685/// when `def.payload` is `None`. Rejects a duplicate payload already
686/// live in the same cgroup; on success pushes a [`PayloadEntry`]
687/// onto `state`'s target payload-handle vec.
688fn spawn_def_payload(ctx: &Ctx, state: &mut ScenarioState<'_, '_>, def: &CgroupDef) -> Result<()> {
689    // After synthetic workers are in place, spawn the optional
690    // userspace payload inside the same cgroup. The payload runs
691    // concurrently with the WorkSpec groups; its metrics are recorded
692    // to the sidecar via the guest-to-host SHM ring when the
693    // handle is killed at step-teardown. Spawning after the WorkSpec
694    // handles lets the cgroup cpuset + mempolicy settle first so
695    // the binary inherits a stable placement.
696    if let Some(payload) = def.payload {
697        // Composite-key dedup: the same payload CAN live in a
698        // different cgroup, but two copies in THIS cgroup would
699        // collide on teardown (one handle masks the other in
700        // the sidecar). Reject upfront with the same error
701        // shape as the Op::RunPayload path.
702        if let Some(existing) = state.find_live_payload_with_cgroup(payload.name, def.name.as_ref())
703        {
704            anyhow::bail!(
705                "CgroupDef::workload: payload '{}' already running in cgroup '{}' (spawned by {}) — \
706                 declare it in exactly one place per cgroup",
707                payload.name,
708                def.name,
709                existing.source.describe(),
710            );
711        }
712        let handle = crate::scenario::payload_run::PayloadRun::new(ctx, payload)
713            .in_cgroup(def.name.clone())
714            .spawn()
715            .map_err(|e| {
716                anyhow::anyhow!(
717                    "cgroup '{}': spawn payload '{}': {:#}",
718                    def.name,
719                    payload.name,
720                    e,
721                )
722            })?;
723        state.target_payload_handles().push(PayloadEntry {
724            cgroup: def.name.to_string(),
725            source: PayloadSource::CgroupDefWorkload,
726            handle,
727        });
728    }
729    Ok(())
730}
731
732/// Helper for [`apply_setup`]: spawn the host-mode stall monitor
733/// against every step-local worker pid the just-completed setup
734/// pass left in [`super::StepState::handles`]. Idempotent —
735/// re-invocation when the monitor is already started is a no-op.
736///
737/// The set of pids is snapshot once at start time; workers added
738/// by subsequent ops (e.g. an `Op::Spawn` later in the same step)
739/// are NOT picked up by the running thread. This matches the
740/// dominant CgroupDef workload shape (every worker for the step
741/// is spawned in one apply_setup pass) and avoids the
742/// add-while-polling synchronization complexity of a dynamic
743/// pid set.
744pub(super) fn maybe_start_stall_monitor(state: &mut ScenarioState<'_, '_>) {
745    // Backdrop-setup pass: monitor is scoped to the step, not the
746    // backdrop. Skip and let the next per-step apply_setup install
747    // it if/when step-local workers spawn.
748    if state.target_backdrop {
749        return;
750    }
751    // Already started for this step.
752    if state.step.stall_monitor.is_some() {
753        return;
754    }
755    // VM-side scenarios use the freeze coordinator's stall plumbing,
756    // not the host-mode poller. cargo_test_mode runs the harness
757    // itself in the same /proc namespace, so a poller would see
758    // unrelated harness threads.
759    if guest_comms::is_guest() || crate::cargo_test_mode::cargo_test_mode_active() {
760        return;
761    }
762    // Collect every worker pid in scope — step-local handles
763    // spawned by the just-completed apply_setup pass AND
764    // backdrop-owned handles that survive across steps. The
765    // monitor's lifetime is per-step (re-installed at each step
766    // boundary), but the pid set must include backdrop workers
767    // because a stalled backdrop worker (e.g. a long-running
768    // payload pinned to a contended cpuset) would otherwise be
769    // silent across the entire scenario.
770    let pids: Vec<libc::pid_t> = state
771        .step
772        .handles
773        .iter()
774        .chain(state.backdrop.handles.iter())
775        .flat_map(|(_, h)| h.worker_pids())
776        .collect();
777    if pids.is_empty() {
778        return;
779    }
780    match crate::scenario::host_stall::spawn_monitor(&pids) {
781        Ok(handle) => {
782            state.step.stall_monitor = Some(handle);
783        }
784        Err(e) => {
785            // Spawn failure is non-fatal — the scenario itself can
786            // still run and report worker results. Surface the
787            // defect via tracing so an operator can spot the
788            // missing stall coverage.
789            tracing::warn!(err = %format!("{e:#}"), "host_stall::spawn_monitor failed; stall monitor disabled for this step");
790        }
791    }
792}