ktstr/workload/spawn/
mod.rs

1//! Spawn pipeline: `WorkloadHandle`, `SpawnGuard`, `GroupParams`,
2//! `ThreadWorker`, the report shapes (`WorkerReport`, `WorkerExitInfo`,
3//! `Migration`), and the helpers that thread workers through fork or
//! `std::thread::spawn`. Split out of `workload/mod.rs` so that the
//! production code path is kept apart from its tests. The tests live
//! next to this file in topic-grouped sibling files
//! (`tests_lifecycle`, `tests_grandchild`, `tests_composed`, ...)
//! that import shared fixtures from `testing.rs` via
//! `use super::testing::*;`.
10
11use anyhow::{Context, Result};
12use std::collections::{BTreeMap, BTreeSet};
13use std::io::{Read, Write};
14use std::os::unix::io::FromRawFd;
15use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
16use std::time::{Duration, Instant};
17
18use super::affinity::{AffinityIntent, ResolvedAffinity, resolve_affinity, set_thread_affinity};
19use super::config::{CloneMode, MemPolicy, MpolFlags, SchedPolicy, WorkSpec, WorkloadConfig};
20use super::types::*;
21use super::worker::worker_main;
22
23mod cleanup;
24use cleanup::{close_fds_silently, kill_and_killpg};
25
/// Stop flag shared by the workload primitives, initially `false`.
///
/// Blocking workers re-check this flag every time their `futex_wait`
/// returns (wake or timeout — see [`WORKER_STOP_POLL_NS`]) and exit
/// their work loop once it reads `true`.
// NOTE(review): the writer of this flag is not visible in this part of
// the file — presumably the stop/collect path or a signal handler;
// confirm before relying on a specific memory ordering.
pub(super) static STOP: AtomicBool = AtomicBool::new(false);
27
/// A single CPU migration event observed by a worker.
///
/// Deliberately has no `Default` impl. A default-constructed value
/// would be `{at_ns: 0, from_cpu: 0, to_cpu: 0}` — a "migration from
/// CPU 0 to CPU 0 at time 0", which is a contradiction because an
/// event whose source equals its destination is not a migration.
/// Downstream analysis (NUMA migration counts, scheduler-balance
/// ratios) that assumes `from_cpu != to_cpu` would misread such
/// defaults as real events. Construct every `Migration` explicitly
/// via [`Migration::new`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct Migration {
    /// Timestamp of the event, in nanoseconds since worker start.
    pub at_ns: u64,
    /// CPU the worker was on before the migration.
    pub from_cpu: usize,
    /// CPU the worker was on after the migration.
    pub to_cpu: usize,
}
46
47impl Migration {
48    /// Const constructor for a Migration event. Pair-symmetric with
49    /// the struct fields; lets `Migration::new(ns, from, to)` replace
50    /// `Migration { at_ns: ns, from_cpu: from, to_cpu: to }` in
51    /// hand-built test fixtures.
52    pub const fn new(at_ns: u64, from_cpu: usize, to_cpu: usize) -> Self {
53        Self {
54            at_ns,
55            from_cpu,
56            to_cpu,
57        }
58    }
59}
60
/// Build the `(nodemask, maxnode)` argument pair expected by
/// `set_mempolicy(2)` and `mbind(2)`.
///
/// The nodemask is a vector of `c_ulong` words in which bit N is set
/// for every NUMA node N in `nodes`. `maxnode` is the highest node id
/// plus 2: the kernel's `get_nodes()` decrements `maxnode` before it
/// reads the bitmask, so passing `max_node + 1` would drop the top
/// node. The vector is sized to hold `maxnode` bits.
///
/// An empty `nodes` set yields `(vec![], 0)`.
pub fn build_nodemask(nodes: &BTreeSet<usize>) -> (Vec<libc::c_ulong>, libc::c_ulong) {
    const WORD_BITS: usize = libc::c_ulong::BITS as usize;

    // A `BTreeSet` is ordered, so its last element is the highest node id.
    let Some(&highest) = nodes.last() else {
        return (Vec::new(), 0);
    };

    let maxnode = highest + 2;
    let mut words = vec![0 as libc::c_ulong; maxnode.div_ceil(WORD_BITS)];
    nodes.iter().for_each(|&node| {
        let bit: libc::c_ulong = 1 << (node % WORD_BITS);
        words[node / WORD_BITS] |= bit;
    });
    (words, maxnode as libc::c_ulong)
}
82
// Memory-policy mode numbers passed as the `mode` argument of
// `set_mempolicy(2)` by `apply_mempolicy_with_flags`. The other modes
// used there come from the `libc` crate.
// NOTE(review): presumably defined locally because the `libc` version in
// use does not export them — confirm, and check the values against the
// kernel's `include/uapi/linux/mempolicy.h` when bumping `libc`.

/// Mode used for `MemPolicy::PreferredMany`.
const MPOL_PREFERRED_MANY: i32 = 5;
/// Mode used for `MemPolicy::WeightedInterleave`.
const MPOL_WEIGHTED_INTERLEAVE: i32 = 6;
85
/// Worker-side `futex_wait` timeout for STOP-signal polling across
/// every blocking workload primitive (FutexPingPong, FutexFanOut,
/// FanOutCompute, MutexContention). Workers block inside the
/// per-variant futex with this timespec; on wake (or timeout) they
/// re-check [`STOP`] and either continue working or exit cleanly.
/// At 100ms the worst-case shutdown latency a `stop_and_collect`
/// caller must budget for is ~100ms above the flush/IO cost; see
/// [`WorkloadHandle::stop_and_collect`]'s "Shutdown latency"
/// paragraph for the caller-facing contract.
///
/// The value is in nanoseconds (100_000_000 ns = 100 ms) and typed as
/// `libc::c_long` so it can be used directly as the `tv_nsec` field of
/// [`FUTEX_WAIT_TIMEOUT`].
pub(super) const WORKER_STOP_POLL_NS: libc::c_long = 100_000_000;
96
/// Packaged [`libc::timespec`] for every worker-side `futex_wait`
/// across the blocking workload primitives. Duplicating the struct
/// literal per call site drifted the `tv_nsec` field between variants
/// during earlier edits; a single const keeps the shutdown-latency
/// budget documented on [`WORKER_STOP_POLL_NS`] authoritative.
///
/// The value is zero whole seconds plus [`WORKER_STOP_POLL_NS`]
/// nanoseconds, i.e. 100 ms in total.
pub(super) const FUTEX_WAIT_TIMEOUT: libc::timespec = libc::timespec {
    tv_sec: 0,
    // Sub-second part only; change the budget via the named const.
    tv_nsec: WORKER_STOP_POLL_NS,
};
106
/// Post-wake spin count used by the fan-out messenger variants
/// ([`WorkType::FutexFanOut`] and [`WorkType::FanOutCompute`]) AFTER
/// each broadcast wake. Gives receivers a short uncontended window
/// to run to their reservoir-push before the next wake cycle
/// arrives. Threaded through `spin_burst` rather than a raw
/// `std::hint::spin_loop` so the messenger also contributes to
/// `work_units` — matching FanOutCompute's existing pattern so
/// both variants' messengers report comparable throughput to
/// downstream assertions.
///
/// This is a count, not a duration.
// NOTE(review): presumably `spin_burst` treats this as an iteration
// count, so the wall-clock length of the window varies with CPU speed —
// confirm against `spin_burst`.
pub(super) const FAN_OUT_POST_WAKE_SPIN_ITERS: u64 = 256;
117
/// Outcome of [`apply_mempolicy_with_flags`].
///
/// Makes each branch of the apply path observable to tests and any
/// future caller that wants to react to the result (the production
/// worker-setup caller in `worker::worker_main` discards it — applying
/// the policy is best-effort there, matching the `setpriority` /
/// `set_sched_policy` soft-fail idiom). The variants partition the
/// function's exits exactly:
///
/// - [`SkippedDefault`](MempolicyOutcome::SkippedDefault): `policy ==
///   MemPolicy::Default` — no syscall (inherit the parent's policy).
/// - [`SkippedEmpty`](MempolicyOutcome::SkippedEmpty): a
///   node-set-bearing variant (`Bind` / `Interleave` /
///   `PreferredMany` / `WeightedInterleave`) carried an empty set — no
///   `set_mempolicy(2)` is issued. Issuing one with an empty mask
///   would be a different (and wrong) request to the kernel, so the
///   skip is the contract, not merely "did not crash".
/// - [`Applied`](MempolicyOutcome::Applied): `set_mempolicy(2)`
///   returned 0.
/// - [`Failed`](MempolicyOutcome::Failed): `set_mempolicy(2)` returned
///   non-zero; the errno is logged to stderr and carried here.
///
/// `MemPolicy::Local` and `MemPolicy::Preferred` carry no caller-supplied
/// node set, so they can only end in `Applied` or `Failed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum MempolicyOutcome {
    /// `MemPolicy::Default` — no syscall.
    SkippedDefault,
    /// A node-set variant with an empty set — no syscall.
    SkippedEmpty,
    /// `set_mempolicy(2)` succeeded.
    Applied,
    /// `set_mempolicy(2)` failed; carries the raw OS errno, or `0` if
    /// the captured error had no raw OS code.
    Failed(i32),
}
150
151/// Call `set_mempolicy(2)` for the current process with mode flags.
152///
153/// No-op for `MemPolicy::Default` (returns
154/// [`MempolicyOutcome::SkippedDefault`]) and for a node-set variant
155/// with an empty set (returns [`MempolicyOutcome::SkippedEmpty`]).
156/// Logs a warning on syscall failure and returns
157/// [`MempolicyOutcome::Failed`]; returns [`MempolicyOutcome::Applied`]
158/// on success.
159pub(super) fn apply_mempolicy_with_flags(policy: &MemPolicy, flags: MpolFlags) -> MempolicyOutcome {
160    let (mode, node_set): (i32, BTreeSet<usize>) = match policy {
161        MemPolicy::Default => return MempolicyOutcome::SkippedDefault,
162        MemPolicy::Bind(nodes) => (libc::MPOL_BIND, nodes.clone()),
163        MemPolicy::Preferred(node) => (libc::MPOL_PREFERRED, [*node].into_iter().collect()),
164        MemPolicy::Interleave(nodes) => (libc::MPOL_INTERLEAVE, nodes.clone()),
165        MemPolicy::PreferredMany(nodes) => (MPOL_PREFERRED_MANY, nodes.clone()),
166        MemPolicy::WeightedInterleave(nodes) => (MPOL_WEIGHTED_INTERLEAVE, nodes.clone()),
167        MemPolicy::Local => {
168            let rc = unsafe {
169                libc::syscall(
170                    libc::SYS_set_mempolicy,
171                    libc::MPOL_LOCAL | flags.bits() as i32,
172                    std::ptr::null::<libc::c_ulong>(),
173                    0 as libc::c_ulong,
174                )
175            };
176            if rc != 0 {
177                let err = std::io::Error::last_os_error();
178                eprintln!("ktstr: set_mempolicy(MPOL_LOCAL) failed: {err}");
179                return MempolicyOutcome::Failed(err.raw_os_error().unwrap_or(0));
180            }
181            return MempolicyOutcome::Applied;
182        }
183    };
184    if node_set.is_empty() {
185        eprintln!("ktstr: set_mempolicy: empty node set, skipping");
186        return MempolicyOutcome::SkippedEmpty;
187    }
188    let (mask, maxnode) = build_nodemask(&node_set);
189    let effective_mode = mode | flags.bits() as i32;
190    let rc = unsafe {
191        libc::syscall(
192            libc::SYS_set_mempolicy,
193            effective_mode,
194            mask.as_ptr(),
195            maxnode,
196        )
197    };
198    if rc != 0 {
199        let err = std::io::Error::last_os_error();
200        eprintln!("ktstr: set_mempolicy(mode={mode}, nodes={node_set:?}) failed: {err}");
201        return MempolicyOutcome::Failed(err.raw_os_error().unwrap_or(0));
202    }
203    MempolicyOutcome::Applied
204}
205
/// Terminate the calling forked-child worker with success status (code 0).
///
/// Uses the `_exit(2)` syscall via `libc::_exit` directly rather than [`std::process::exit`] or
/// returning from `main` so that:
/// 1. No cleanup runs in the child against state it inherited from
///    the parent: no Rust drop glue, no thread-local destructors, no
///    C `atexit` handlers. A forked child starts with a copy of the
///    parent's address space and a duplicate of its FD table whose
///    entries refer to the same open file descriptions, so cleanup
///    executed on the child side can still act on resources the
///    parent owns.
/// 2. `exit_group(2)` semantics (which `_exit` invokes) tear down only
///    the calling tgid. Forked children have their own tgid distinct
///    from the test-runner's, so the parent test runner survives.
///
/// Code 0 signals to the parent's collect_results path that the worker
/// completed the work-loop normally — the WorkerReport (if any was
/// written) is valid and should be merged into the test verdict.
///
/// Never returns.
#[inline]
fn exit_child_success() -> ! {
    // SAFETY: `libc::_exit` is `unsafe` because it bypasses Rust drop
    // execution + C stdlib atexit. That bypass IS the contract here
    // for forked children: running cleanup in the child against state
    // inherited from the parent can disturb parent-owned resources.
    // exit_group(2) tear-down of the child's tgid is sound — the
    // parent is in a different tgid.
    unsafe { libc::_exit(0) }
}
232
/// Terminate the calling forked-child worker with error status (code 1).
///
/// Same `_exit`-vs-Rust-exit rationale as [`exit_child_success`]. Code 1
/// signals to the parent's collect_results path that the worker failed
/// before completing the work-loop normally — the parent treats the
/// WorkerReport as missing or invalid and surfaces the failure to the
/// test verdict.
///
/// Use this on any error path inside the forked-child closure: poll
/// timeout, syscall failure, panic via `catch_unwind`, write-report
/// failure, orphan detection.
///
/// Never returns.
#[inline]
fn exit_child_error() -> ! {
    // SAFETY: see [`exit_child_success`] — identical contract; only the
    // exit status differs.
    unsafe { libc::_exit(1) }
}
249
250/// Apply `nice` to the calling worker via `setpriority(2)`.
251///
252/// Always invokes `setpriority(PRIO_PROCESS, 0, nice)` — including
253/// for `nice == 0`, which writes 0 explicitly rather than
254/// inheriting. The "skip the syscall, inherit the parent's nice"
255/// state lives one layer up: callers gate the call on
256/// [`WorkSpec::nice`](crate::workload::WorkSpec::nice) /
257/// [`WorkloadConfig::nice`](crate::workload::WorkloadConfig::nice)
258/// being `Some(_)` and pass through the inner value (so `Some(0)`
259/// becomes a real `setpriority(0)` and `None` skips the call
260/// entirely). The kernel clamps `niceval` to
261/// `[MIN_NICE, MAX_NICE]` (-20..19) inside `setpriority`, so any
262/// out-of-range input is normalised by the syscall itself rather
263/// than rejected.
264///
265/// Failures are logged once via stderr and do not abort the
266/// worker — matches the [`apply_mempolicy_with_flags`] /
267/// [`set_thread_affinity`] / `set_sched_policy` error idiom in
268/// `worker_main`. The expected failure mode is `EACCES` from
269/// `set_one_prio` → `can_nice` when an unprivileged worker tries
270/// to lower nice (negative niceval) without `CAP_SYS_NICE`.
271pub(super) fn apply_nice(nice: i32) {
272    let rc = unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, nice) };
273    if rc != 0 {
274        warn_setpriority_failed_once();
275    }
276}
277
278/// Print a single `setpriority` failure warning for the lifetime
279/// of the process. Same rationale as
280/// `warn_schedstat_unavailable_once`: dozens of workers will fail
281/// once each on an unprivileged host that requested negative nice,
282/// and a per-worker line floods the test log.
283pub(super) fn warn_setpriority_failed_once() {
284    static WARNED: std::sync::Once = std::sync::Once::new();
285    WARNED.call_once(|| {
286        let errno = std::io::Error::last_os_error();
287        eprintln!(
288            "workload: setpriority(PRIO_PROCESS) failed: {errno}; nice value not applied (CAP_SYS_NICE may be required for negative nice)"
289        );
290    });
291}
292
293/// Telemetry collected from a worker process after it stops.
294///
295/// Normal reports: each field is populated by the worker itself
296/// (inside the VM) and serialized via a pipe to the parent process.
/// Sentinel reports: synthesized by
/// [`WorkloadHandle::stop_and_collect`] on worker exit, these carry a
/// parent-populated `exit_info` with the remaining fields at their
/// [`Default`] values (the worker never emitted on the pipe, so
/// the parent is the sole source of truth for the surfaced
/// outcome).
303///
304/// # Default trade-off
305///
306/// [`Default`] produces a zero/empty report. The trade-off:
307///
308/// - **Pro:** sentinel/test code can spread `..WorkerReport::default()`
309///   so adding a field does not require touching every sentinel site.
310/// - **Con:** zero-valued fields are valid report outputs (e.g. a
311///   worker that never blocked has `wake_latencies_ns: vec![]`), so
312///   a missing field cannot be distinguished from a real-zero field at
313///   the reader. Consumers that need "was this field actually set"
314///   must track presence out-of-band (e.g. whether the work type
315///   populates the field per `wake_latencies_ns`'s doc).
316///
317/// Decision: keep the `Default` impl. Sentinel ergonomics outweigh
318/// the distinguishability cost — every real consumer already knows
319/// which fields a given `WorkType` populates, and the alternative
320/// (removing `Default` and hand-listing every field at sentinel
321/// sites) introduces a worse drift problem that silently skips new
322/// telemetry instead of reporting it as zero.
323///
324/// Callers building a sentinel report should spread
325/// `..WorkerReport::default()` rather than listing every field by hand
326/// -- the sentinel drifts silently when a field is added.
327#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, crate::Claim)]
328pub struct WorkerReport {
329    /// Kernel TID from `gettid(2)`. For [`CloneMode::Fork`] each
330    /// worker is its own thread-group leader so `gettid() == getpid()
331    /// == tgid`; the report's tid is interchangeable with the
332    /// worker's pid in libc / cgroup APIs. For [`CloneMode::Thread`]
333    /// every worker shares the parent's tgid and `gettid()` is the
334    /// only identifier that discriminates per-task identity, so the
335    /// report's tid is what feeds `sched_setaffinity(tid, ...)` and
336    /// `cgroup.threads` writes (NOT `cgroup.procs` — see the warning
337    /// on [`WorkloadHandle::worker_pids`]). Stored as `pid_t` (i32)
338    /// to match the kernel's native type and avoid the silent
339    /// u32→i32 sign-cast wraparound at libc boundaries
340    /// (kill/waitpid/Pid::from_raw).
341    pub tid: i32,
342    /// Cumulative work iterations (incremented by `spin_burst` or I/O loops).
343    /// Read by the fairness/starvation gate (`assert_not_starved` /
344    /// `min_work_units`) and `assert_throughput_parity`; NOT summed into
345    /// `CgroupStats::total_iterations`, which reads [`iterations`](Self::iterations).
346    /// A `Custom` worker that wants throughput assertions must also populate
347    /// [`iterations`](Self::iterations).
348    pub work_units: u64,
349    /// Thread CPU time from `CLOCK_THREAD_CPUTIME_ID` (ns).
350    pub cpu_time_ns: u64,
351    /// Wall-clock time from worker-start to stop flag (ns).
352    /// Measured from the worker's first `Instant::now()` in
353    /// `worker_main` (immediately after the start handshake) to the
354    /// outer-loop exit (when the per-worker `stop` flag is observed
355    /// `true`); covers both Fork-mode workers (signal-driven flag)
356    /// and Thread-mode workers (parent-driven flag).
357    pub wall_time_ns: u64,
358    /// `wall_time_ns - cpu_time_ns`: total off-CPU time (ns).
359    ///
360    /// Includes all time the worker was not executing on a CPU: runnable
361    /// queue wait, voluntary sleep, I/O wait, futex wait, etc.
362    pub off_cpu_ns: u64,
363    /// Number of observed CPU migrations (checked every 1024 work units).
364    pub migration_count: u64,
365    /// Set of all CPUs this worker ran on.
366    pub cpus_used: BTreeSet<usize>,
367    /// Ordered list of CPU migration events with timestamps.
368    pub migrations: Vec<Migration>,
369    /// Longest wall-clock gap observed at 1024-work-unit checkpoints
370    /// (ms). High values indicate the task was preempted or descheduled
371    /// near a checkpoint boundary.
372    pub max_gap_ms: u64,
373    /// CPU where the longest gap happened.
374    pub max_gap_cpu: usize,
375    /// When the longest gap happened (ms from start).
376    pub max_gap_at_ms: u64,
377    /// Per-wakeup latency samples (ns). Measures off-CPU time
378    /// between the call that blocks (any blocking primitive — pipe
379    /// `read`, futex wait, `poll`, `sched_yield`, `nanosleep`, etc.)
380    /// and the wakeup that resumes execution; not a yield-specific
381    /// measure.
382    /// Populated for blocking work types: Bursty, PipeIo, FutexPingPong,
383    /// FutexFanOut, FanOutCompute, CacheYield, CachePipe, IoSyncWrite,
384    /// IoRandRead, IoConvoy, NiceSweep,
385    /// AffinityChurn, PolicyChurn, MutexContention, ForkExit (parent's
386    /// waitpid wait), Sequence with Sleep/Yield/Io phases.
387    ///
388    /// Distinct from [`iteration_costs_ns`](Self::iteration_costs_ns):
389    /// this field measures the OFF-CPU gap between blocks (scheduler
390    /// wake latency); `iteration_costs_ns` measures the wall-clock
391    /// duration of a single compute iteration. The three pure-compute
392    /// variants that populate `iteration_costs_ns` —
393    /// [`WorkType::AluHot`], [`WorkType::SmtSiblingSpin`], and
394    /// [`WorkType::IpcVariance`] — never block and report
395    /// `wake_latencies_ns: vec![]`. Other compute variants
396    /// (e.g. SpinWait, YieldHeavy, Mixed) populate neither
397    /// reservoir.
398    pub wake_latencies_ns: Vec<u64>,
399    /// Total number of wake-latency observations the worker
400    /// recorded, INCLUDING any that were dropped by the reservoir
401    /// sampler. `wake_latencies_ns` is reservoir-clamped to at
402    /// most `MAX_WAKE_SAMPLES` (100_000) entries; on a long run
403    /// that accumulates more than that many wake events, the
404    /// vector stays at its cap while this counter keeps climbing.
405    /// Host-side consumers that want to report "total wakeups
406    /// observed" (vs. "entries in the sample") read this field;
407    /// percentile / CV computations read `wake_latencies_ns`.
408    pub wake_sample_total: u64,
409    /// Per-iteration wall-clock duration of one compute iteration (ns),
410    /// including any scheduler preemption. Measured via
411    /// `Instant::now()` (CLOCK_MONOTONIC), so a sample includes any
412    /// off-CPU time the kernel inserted mid-iteration. The variance
413    /// across iterations is the load-bearing scheduler signal —
414    /// preemption inflates samples and that inflation is the
415    /// observable.
416    ///
417    /// Reservoir-sampled at the same cap (`MAX_WAKE_SAMPLES` =
418    /// 100_000) as [`wake_latencies_ns`](Self::wake_latencies_ns),
419    /// using the same Algorithm-R sampler.
420    ///
421    /// Populated for pure compute work types where the worker
422    /// never blocks: [`WorkType::AluHot`], [`WorkType::SmtSiblingSpin`],
423    /// and [`WorkType::IpcVariance`]. Each sample is the elapsed
424    /// time from the start to the end of one outer-loop iteration's
425    /// compute burst.
426    ///
427    /// Distinct from [`wake_latencies_ns`](Self::wake_latencies_ns):
428    /// the wake-latency reservoir captures off-CPU time (futex /
429    /// pipe / nanosleep wakeups); this reservoir captures the
430    /// wall-clock duration of one compute iteration (which
431    /// includes any scheduler preemption inside the iteration).
432    /// The two are NOT comparable across variants — a
433    /// scheduler-A/B test that wants iteration cost for a compute
434    /// variant reads this field; a test that wants wake latency
435    /// for a blocking variant reads `wake_latencies_ns`.
436    pub iteration_costs_ns: Vec<u64>,
437    /// Total number of iteration-cost observations the worker
438    /// recorded, INCLUDING any that were dropped by the reservoir
439    /// sampler. Mirrors [`wake_sample_total`](Self::wake_sample_total)
440    /// but for [`iteration_costs_ns`](Self::iteration_costs_ns):
441    /// host-side consumers that want "total compute iterations
442    /// observed" read this field; distribution computations read
443    /// `iteration_costs_ns` directly.
444    pub iteration_cost_sample_total: u64,
445    /// Per-timer-cycle latency samples (ns) for
446    /// [`crate::workload::WorkType::TimerLatency`]: the observed
447    /// `clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME)` wake time minus the
448    /// absolute deadline, floored at 0. Reservoir-clamped to at most
449    /// `MAX_WAKE_SAMPLES`, distinct from `wake_latencies_ns` so cyclictest-style
450    /// timer latency does not blur with the blocking variants' wake latency.
451    /// `vec![]` for every non-`TimerLatency` variant.
452    pub timer_latencies_ns: Vec<u64>,
453    /// Total timer-cycle observations, INCLUDING any the reservoir dropped —
454    /// the true population for unbiased cross-phase weighting. Mirrors
455    /// [`wake_sample_total`](Self::wake_sample_total) for `timer_latencies_ns`.
456    pub timer_sample_total: u64,
457    /// Outer-loop iteration count. What `CgroupStats::total_iterations` sums
458    /// and what the derived throughput rates (`iterations_per_worker` /
459    /// `iterations_per_cpu_sec`) and `migration_ratio` divide by; NOT read by
460    /// the starvation gate, which reads [`work_units`](Self::work_units). A
461    /// `Custom` worker that wants the starvation / `min_work_units` gate
462    /// honored must also populate [`work_units`](Self::work_units).
463    pub iterations: u64,
464    /// Delta of /proc/self/schedstat field 2 (run_delay) over the work loop.
465    pub schedstat_run_delay_ns: u64,
466    /// Delta of /proc/self/schedstat field 3 (pcount — number of
467    /// times the task was scheduled in over the work loop). This is
468    /// NOT a context-switch count; `/proc/<pid>/status`'s
469    /// `voluntary_ctxt_switches` / `nonvoluntary_ctxt_switches` are
470    /// the true context-switch counters and are not read here.
471    pub schedstat_run_count: u64,
472    /// Delta of /proc/self/schedstat field 1 (cpu_time) over the work loop.
473    pub schedstat_cpu_time_ns: u64,
474    /// `true` when the worker reached its natural end — either the
475    /// outer work loop observed STOP and exited cleanly, or a
476    /// custom-closure payload returned from its `run` function. A
477    /// sentinel report synthesised by
478    /// [`WorkloadHandle::stop_and_collect`]'s decode-failure
479    /// fallback (see `exit_info` below) carries `false`. Lets downstream
480    /// consumers distinguish "worker ran to completion and
481    /// observed zero iterations" (`completed: true, iterations: 0`
482    /// — legitimate for pathologically short test windows) from
483    /// "worker died / timed out before recording anything"
484    /// (`completed: false, iterations: 0` — the sentinel shape).
485    pub completed: bool,
486    /// Per-NUMA-node page counts from `/proc/self/numa_maps` after workload.
487    /// Keyed by node ID. Empty when numa_maps is unavailable. numa_maps reports
488    /// the per-node RESIDENT pages of the calling task's mm. For
489    /// [`CloneMode::Fork`] workers (the scenario-engine default) each worker has
490    /// a disjoint mm, so SUMming across workers is the true cgroup page total;
491    /// for [`CloneMode::Thread`] siblings share one mm and each reports the SAME
492    /// residency, so a SUM counts shared pages once per thread. Consumers
493    /// (cgroup_stats / phase_cgroup_stats) SUM
494    /// this — correct for the Fork default; the Thread-mode caveat is inherited
495    /// identically by both reducers (no per-phase divergence).
496    pub numa_pages: BTreeMap<usize, u64>,
497    /// Delta of `/proc/vmstat` `numa_pages_migrated` over the work loop. This is
498    /// the SYSTEM-WIDE vmstat `NUMA_PAGE_MIGRATE` vm_event (summed across all
499    /// CPUs), NOT the per-task `task_struct` field of the same name surfaced in
500    /// `/proc/PID/sched`. Because every worker reads the same system-wide
501    /// counter, consumers fold it as MAX across workers (a SUM would inflate it
502    /// by the worker count) — see `PhaseCgroupStats::cross_node_migrated`. The vm_event is bumped only by
503    /// NUMA balancing (the source of NUMA page migrations), so the delta is 0 on
504    /// kernels/configs without it (a measurement-availability caveat, not a wrong
505    /// value).
506    pub vmstat_numa_pages_migrated: u64,
507    /// Diagnostic attached only to sentinel reports — populated when
508    /// `stop_and_collect` synthesized the entry because no (or
509    /// unparseable) postcard payload came back on the report pipe.
510    /// `None` on every real worker-produced report. Lets operators
511    /// distinguish the four failure shapes that all collapse to
512    /// "empty pipe + no report":
513    ///
514    /// - [`WorkerExitInfo::Exited`] with a non-zero code: worker
515    ///   reached `_exit(code)` without writing the report —
516    ///   typically the `catch_unwind` Err arm in the worker-child
517    ///   closure (panic under `panic = "unwind"`) or the 30s
518    ///   poll-start timeout's early `_exit(1)`.
519    /// - [`WorkerExitInfo::Signaled`]: worker was killed — SIGABRT
520    ///   under `panic = "abort"`, SIGKILL from the still-alive
521    ///   escalation in `stop_and_collect`, or an external signal
522    ///   (OOM killer, operator SIGKILL).
523    /// - [`WorkerExitInfo::TimedOut`]: worker never exited within the
524    ///   5s collection deadline and the WNOHANG reap observed
525    ///   `StillAlive` — escalated via SIGKILL + `waitpid(None)`.
526    /// - [`WorkerExitInfo::WaitFailed`]: `waitpid` itself returned an
527    ///   error (ECHILD / EINTR). Typically a plumbing bug — the child
528    ///   was reaped by an external signal handler, a double-reap
529    ///   regression, or the pid was recycled.
530    ///
531    /// No `skip_serializing_if`: postcard is a positional, schemaless
532    /// format — every Serialize call must emit every field in the
533    /// same order or the decoder reads the next field's bytes off
534    /// the wire (silent data corruption). The Option<…> tag itself
535    /// (one byte) is the only overhead on the live-worker path.
536    pub exit_info: Option<WorkerExitInfo>,
537    /// `true` when this worker served as the messenger for a
538    /// wake-fanout work type ([`WorkType::FutexFanOut`] or
539    /// [`WorkType::FanOutCompute`]) — the single writer that
540    /// advances the shared generation and issues `futex_wake` for
541    /// its group. `false` for receivers and for every non-fanout
542    /// work type.
543    ///
544    /// Populated from the `is_messenger` flag on the
545    /// `futex: Option<(*mut u32, bool)>` parameter threaded into
546    /// `worker_main`. A sentinel report synthesized by the
547    /// decode-failure fallback in
548    /// [`WorkloadHandle::stop_and_collect`] carries `false` via
549    /// [`Default`], matching its `completed: false` shape.
550    ///
551    /// Enables per-worker latency-participation assertions in
552    /// tests — a receiver worker produces `wake_latencies_ns`
553    /// entries while its messenger pair records wake-side work but
554    /// no wake latency. Without this field, tests had to
555    /// cross-reference per-group indexing or guess from the empty
556    /// vector — ambiguous on groups where the messenger legitimately
557    /// exits before producing a report.
558    pub is_messenger: bool,
559    /// Index of the worker group this report belongs to.
560    ///
561    /// `0` denotes the primary group described by
562    /// [`WorkloadConfig`]'s top-level `work_type` / `num_workers` /
563    /// `affinity` / `sched_policy` fields. `1..=N` denotes
564    /// composed groups in the order they appear in
565    /// [`WorkloadConfig::composed`]. Reports collected by
566    /// [`WorkloadHandle::stop_and_collect`] are tagged with the
567    /// `group_idx` of the spawning [`WorkSpec`] (or `0` for the
568    /// primary), so per-group filtering in test assertions can
569    /// cleanly partition the vector.
570    ///
571    /// Sentinel reports (synthesized on missing or undecodable
572    /// payload / panic / timeout) carry the `group_idx` of the
573    /// worker whose pid the sentinel replaces, so a "this composed
574    /// group failed" assertion still works on an outright crash.
575    pub group_idx: usize,
576    /// Rendered error from the worker's `set_thread_affinity`
577    /// call, or `None` when affinity setup succeeded (or the
578    /// worker had no affinity to apply). Populated by
579    /// `worker_main` when the pre-loop
580    /// `set_thread_affinity(tid, cpus)` returns `Err` — the
581    /// worker continues with the inherited (or kernel-default)
582    /// cpumask so the test still produces an observable outcome,
583    /// but the failure is now surfaced in the report instead of
584    /// being silently dropped via `let _ = …`. The expected
585    /// failure shape is EINVAL from a requested cpu that is
586    /// outside the cpuset cgroup's `cpus.allowed` mask or the
587    /// kernel's online mask; EPERM is reachable when a more
588    /// privileged tracer set the worker's cpus_allowed and a
589    /// container policy denies further widening. Sentinel
590    /// reports synthesised by
591    /// [`WorkloadHandle::stop_and_collect`] leave this field at
592    /// its default `None` — a worker that died before
593    /// `worker_main` ran has no affinity-error observation.
594    ///
595    /// No `skip_serializing_if`: postcard is positional and
596    /// schemaless, so every Serialize call must emit every field
597    /// in the same order — skipping a field shifts the decoder
598    /// onto the next field's bytes (silent corruption). The
599    /// Option<…> tag (one byte) is the only overhead on the
600    /// success path.
601    pub affinity_error: Option<String>,
602    /// `set_sched_policy` failure text (`{e:#}`), or `None` when the
603    /// per-worker scheduling-policy set succeeded (or the policy was the
604    /// `Normal` no-op). Load-bearing for the verifier dispatch probe: a
605    /// probe worker configured `SchedPolicy::Ext` whose
606    /// `sched_setattr(SCHED_EXT)` was rejected (e.g. the scheduler set
607    /// `scx.disallow` on it) stays SCHED_OTHER, so its `iterations`
608    /// progress does NOT prove the BPF scheduler dispatched it —
609    /// `run_and_confirm_dispatch` excludes any worker with a
610    /// `sched_policy_error` from the dispatch proof. Sentinel reports
611    /// synthesised by [`WorkloadHandle::stop_and_collect`] leave this
612    /// `None` (no policy set was attempted). Same positional-serde
613    /// reasoning as [`Self::affinity_error`]: no `skip_serializing_if`.
614    pub sched_policy_error: Option<String>,
615    /// Per-phase telemetry slices for a backdrop (persistent) worker
616    /// that spanned multiple scenario steps. EMPTY for step-local
617    /// workers and for any backdrop worker that observed no phase
618    /// boundary: the worker pushes a [`PhaseSlice`] only when the
619    /// parent-driven `phase_epoch` actually changes, so a worker whose
620    /// epoch never moved (step-local pools are never bumped) ships none
621    /// — keeping the wire empty on the common path. Each slice carries
622    /// the per-phase subset of the whole-run telemetry above, scoped to
623    /// one phase's hold window; the host expands these into per-epoch
624    /// `PhaseBucket` entries — the per-phase attribution a backdrop
625    /// worker otherwise lacks, since it is collected once with
626    /// `step_index = None`.
627    ///
628    /// Appended LAST so the positional postcard decode order of every
629    /// prior field is unchanged. `#[claim(skip)]`: there is no
630    /// test-author claim surface for the raw wire slices — assertions
631    /// run against the host-expanded `PhaseCgroupStats` carriers, which
632    /// carry their own `#[derive(Claim)]`.
633    #[claim(skip)]
634    pub phase_slices: Vec<PhaseSlice>,
635    /// Whole-run taobench COUNTER aggregate — `Some` only for a Taobench worker,
636    /// `None` otherwise. Shipped so the host can derive run-level qps/hit Rate keys
637    /// (`taobench_*_ops_per_sec` / `taobench_hit_fraction` /
638    /// `taobench_command_hit_rate`) for `--noise-adjust` spread analysis. The
639    /// per-phase `PhaseSlice::taobench` carriers feed the per-phase metrics + the
640    /// serve-latency distribution; this whole-run carrier holds COUNTERS only
641    /// ([`TaobenchStats`](crate::workload::taobench::run::TaobenchStats)) — the
642    /// serve histogram is per-phase data, and the per-phase `elapsed_ns` is
643    /// MAX-merged across concurrent threads so summing phase windows is the wrong
644    /// qps denominator, hence the engine's authoritative whole-run counter
645    /// aggregate is shipped directly. Appended LAST (after `phase_slices`) to keep
646    /// the positional postcard decode order of every prior field unchanged. `pub`
647    /// (every `WorkerReport` field is `pub`, so external custom workers can
648    /// struct-literal construct it via [`crate::workload::WorkType::custom`]; a
649    /// non-taobench worker sets `None`). `#[claim(skip)]`: framework wire plumbing,
650    /// not a test-author claim surface; assertions run against the host-derived
651    /// run-level `taobench_*` Rate metrics.
652    #[claim(skip)]
653    pub taobench_whole: Option<crate::workload::taobench::run::TaobenchStats>,
654}
655
/// Per-phase telemetry for one backdrop worker over one scenario
/// step's HOLD window `[StepStart(k), StepEnd(k))`.
///
/// A backdrop worker spans every phase, so it accumulates a fresh
/// slice between each parent-driven `phase_epoch` transition and
/// finalizes it when the epoch moves (drain-on-change). Carries the
/// per-phase subset of [`WorkerReport`]'s whole-run telemetry that has
/// a host-side per-cgroup carrier (`PhaseCgroupStats`), so the host can
/// pool slices across workers into per-epoch `PhaseBucket`s.
///
/// Counter fields are per-phase deltas (`end_snap − start_snap`,
/// re-baselined at each boundary); `cpus_used` and `numa_pages` are
/// per-phase observations (gauges). Excludes `iteration_costs_ns` —
/// that reservoir has no per-cgroup carrier at any level (it feeds
/// only the run-level benchmark gate).
///
/// Shipped inside [`WorkerReport::phase_slices`]. That struct's field
/// docs describe the postcard encoding as positional, so treat the
/// field order below as wire-relevant as well: append new fields
/// rather than reordering or removing existing ones.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PhaseSlice {
    /// Phase epoch this slice covers. Scenario Step `k` is stored as
    /// `k + 1`; `0` = BASELINE and `u32::MAX` = inter-step gap. The
    /// host maps this to a `PhaseBucket.step_index`; gap- and
    /// baseline-tagged slices have no host bucket and are discarded.
    pub phase_epoch: u32,
    /// CPUs this worker ran on DURING this phase.
    pub cpus_used: BTreeSet<usize>,
    /// Per-wakeup latency samples (ns) recorded during this phase,
    /// reservoir-clamped to at most `MAX_PHASE_WAKE_SAMPLES` — a
    /// smaller cap than the whole-run `MAX_WAKE_SAMPLES` so a worker
    /// spanning many phases stays within the report pipe. The host
    /// re-caps the merged per-cgroup pool at the larger
    /// `MAX_WAKE_SAMPLES`.
    pub wake_latencies_ns: Vec<u64>,
    /// Total wake observations during this phase, INCLUDING any the
    /// reservoir dropped — the true population for unbiased
    /// cross-phase weighting.
    pub wake_sample_total: u64,
    /// Per-timer-cycle latency samples (ns) recorded during this phase by a
    /// [`crate::workload::WorkType::TimerLatency`] worker, reservoir-clamped to
    /// `MAX_PHASE_WAKE_SAMPLES` (like `wake_latencies_ns`). Distinct carrier so
    /// timer latency does not blur with wake latency.
    pub timer_latencies_ns: Vec<u64>,
    /// Total timer-cycle observations during this phase, INCLUDING any the
    /// reservoir dropped — the true population for unbiased cross-phase
    /// weighting. Mirrors `wake_sample_total` for `timer_latencies_ns`.
    pub timer_sample_total: u64,
    /// `/proc/self/schedstat` run_delay (field 2) delta over this phase (ns).
    pub run_delay_ns: u64,
    /// Off-CPU time during this phase (ns): `wall_ns − cpu_time` over
    /// the phase. Paired with `wall_ns` so the host derives the
    /// off-CPU fraction and skips the not-measured `wall_ns == 0` case
    /// rather than dividing by zero.
    pub off_cpu_ns: u64,
    /// Wall-clock duration of this phase as the worker observed it
    /// (ns) — the denominator for the off-CPU fraction. `0` for a
    /// phase the worker never ran in (not-measured).
    pub wall_ns: u64,
    /// CPU migrations observed during this phase.
    pub migration_count: u64,
    /// Outer-loop iterations during this phase.
    pub iterations: u64,
    /// `/proc/self/schedstat` cpu_time (field 1, sum_exec_runtime)
    /// delta over this phase (ns).
    pub schedstat_cpu_time_ns: u64,
    /// Per-NUMA-node resident page counts observed at this phase's end
    /// (`/proc/self/numa_maps`). A gauge, not a delta.
    pub numa_pages: BTreeMap<usize, u64>,
    /// `/proc/vmstat numa_pages_migrated` delta over this phase
    /// (system-wide counter; host folds as MAX across workers).
    pub vmstat_numa_pages_migrated: u64,
    /// Longest checkpoint-to-checkpoint wall gap during this phase (ms).
    pub max_gap_ms: u64,
    /// CPU where this phase's longest gap happened — coupled with
    /// `max_gap_ms` as an argmax pair (a bare max would desync the gap
    /// from its CPU).
    pub max_gap_cpu: usize,
    /// Per-phase schbench engine metrics, present ONLY for a
    /// `WorkType::Schbench` backdrop worker (`None` for every other work
    /// type — the generic drain leaves it `None`). Carries the phase's merged
    /// wakeup/request histograms + run-delay raw pairs; the host re-pools these
    /// across workers ([`crate::workload::schbench::run::SchbenchPhaseStats`])
    /// and derives per-phase percentiles into `PhaseBucket.metrics`.
    /// `pub(crate)`: an internal carrier (test authors read `PhaseBucket`, not
    /// `PhaseSlice`) whose element type is `pub(crate)`. Integer-only, so it
    /// preserves `PhaseSlice`'s `Eq`.
    pub(crate) schbench: Option<crate::workload::schbench::run::SchbenchPhaseStats>,
    /// Per-phase taobench engine metrics, present ONLY for a
    /// `WorkType::Taobench` backdrop worker (`None` for every other work type —
    /// the generic drain leaves it `None`). Carries the phase's request- and
    /// response-time op counters plus the open-loop serve-latency histogram
    /// ([`crate::workload::taobench::run::TaobenchPhaseStats`]); the host derives
    /// per-phase qps / hit-ratio / serve-latency percentiles into
    /// `PhaseBucket.metrics`. `pub(crate)`: an internal carrier (test authors read
    /// `PhaseBucket`, not `PhaseSlice`). `PhaseSlice`'s `Eq` is preserved because
    /// `PlatStats` is `Eq` (the schbench carrier above already holds histograms),
    /// not because the field is integer-only.
    pub(crate) taobench: Option<crate::workload::taobench::run::TaobenchPhaseStats>,
}
750
/// Reason a sentinel [`WorkerReport`] was synthesized — attached to
/// the report's `exit_info` field so operators can triage a missing
/// or undecodable postcard payload without cross-referencing
/// parent-side logs.
///
/// Invariant: every fork-mode variant (`Exited`, `Signaled`,
/// `TimedOut`, `WaitFailed`) reflects the `waitpid`-derived status for
/// the worker PID as of the end of `stop_and_collect`; these are the
/// variants [`classify_wait_outcome`] produces. `Panicked` is the
/// thread-mode counterpart, derived from `JoinHandle::join()` instead
/// of `waitpid`. The fork-mode variants are ordered from
/// most-informative (exit code) to least (plumbing failure).
///
/// No `Default` impl — every variant carries observed-outcome state.
/// A default-constructed value would have to pick one variant (TimedOut
/// was the obvious candidate) but that risks silent-pass: a test
/// fixture using `..Default::default()` on a struct containing
/// `WorkerExitInfo` gets "worker never exited within the deadline,"
/// which an operator triaging the failure would chase for minutes
/// before realizing the value came from a missing field. Construct
/// every WorkerExitInfo explicitly via the variant the test scenario
/// expects (e.g. `WorkerExitInfo::Exited(0)` for a clean success).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum WorkerExitInfo {
    /// `WIFEXITED=true` with the given exit code. Non-zero under
    /// `panic = "unwind"` means catch_unwind caught a panic in the
    /// worker-child closure and `_exit(1)` fired, or the 30s
    /// parent-ready poll timed out. Zero means the worker ran to
    /// completion but failed to write / serialize the report — a
    /// postcard encode or pipe-write failure that didn't panic.
    Exited(i32),
    /// `WIFSIGNALED=true` with the given signal number. Under
    /// `panic = "abort"` a worker panic raises SIGABRT (signal 6);
    /// other values indicate external kill, OOM killer, or the
    /// still-alive-escalation SIGKILL (signal 9) sent during
    /// collection.
    Signaled(i32),
    /// Worker was still running after the 5s shared collection
    /// deadline; escalated via SIGKILL + blocking `waitpid`. The
    /// child's final status is not retained — the reap happened past
    /// the point where operator diagnostics would differ between a
    /// clean timeout and a signal storm.
    TimedOut,
    /// `waitpid` itself returned `Err` — typically ECHILD (child
    /// already reaped by an external signal handler or a double-reap
    /// regression) or EINTR. Message is the rendered `errno` string.
    WaitFailed(String),
    /// Thread-mode worker panicked. `JoinHandle::join()` returned
    /// `Err`; the inner payload is downcast to a `&str` / `String`
    /// (the canonical `panic!` payload shapes) and recorded here so
    /// the operator can triage without scraping the test log. This
    /// variant is exclusive to [`CloneMode::Thread`] — fork workers
    /// surface panics via `Exited(1)` or `Signaled(SIGABRT)`
    /// depending on the panic strategy.
    Panicked(String),
}
801
802/// Pure mapping from a `waitpid` outcome to the diagnostic
803/// [`WorkerExitInfo`] attached to a sentinel [`WorkerReport`].
804///
805/// Split out of [`WorkloadHandle::stop_and_collect`] so the four
806/// shapes each resolve to a `WorkerExitInfo` variant without pulling
807/// in the full collection loop's state (pipe drain, SIGKILL
808/// escalation, pid lifetime). Pure input → output means the variant
809/// matrix is directly testable without spawning children.
810///
811/// Shape → variant:
812/// - `Ok(Exited(_, code))` → [`WorkerExitInfo::Exited`]
813/// - `Ok(Signaled(_, sig, _))` → [`WorkerExitInfo::Signaled`]
814/// - `Ok(StillAlive)` → [`WorkerExitInfo::TimedOut`]
815/// - `Ok(_ exotic)` → [`WorkerExitInfo::TimedOut`] (Stopped /
816///   PtraceEvent / PtraceSyscall / Continued; not reachable for a
817///   plain forked worker with no ptrace parent, but collapsed rather
818///   than silently dropped so coverage stays exhaustive)
819/// - `Err(errno)` → [`WorkerExitInfo::WaitFailed`]
820pub(super) fn classify_wait_outcome(
821    source: Result<nix::sys::wait::WaitStatus, nix::errno::Errno>,
822) -> WorkerExitInfo {
823    match source {
824        Ok(nix::sys::wait::WaitStatus::Exited(_, code)) => WorkerExitInfo::Exited(code),
825        Ok(nix::sys::wait::WaitStatus::Signaled(_, sig, _)) => WorkerExitInfo::Signaled(sig as i32),
826        Ok(nix::sys::wait::WaitStatus::StillAlive) => WorkerExitInfo::TimedOut,
827        Ok(_) => WorkerExitInfo::TimedOut,
828        Err(e) => WorkerExitInfo::WaitFailed(e.to_string()),
829    }
830}
831
832/// Extract a human-readable panic payload from a
833/// [`std::thread::Result`] `Err` value. The two canonical shapes
834/// are `&'static str` (`panic!("literal")`) and `String`
835/// (`panic!("{x}")` post-formatting); anything else falls back to
836/// a fixed sentinel.
837///
838/// Pure mapping (no IO, no allocation past `String::clone`) so the
839/// stop_and_collect path can call it on every joined-and-panicked
840/// thread without performance cliffs.
841pub(super) fn extract_panic_payload(payload: Box<dyn std::any::Any + Send>) -> String {
842    if let Some(s) = payload.downcast_ref::<&'static str>() {
843        (*s).to_string()
844    } else if let Some(s) = payload.downcast_ref::<String>() {
845        s.clone()
846    } else {
847        "<non-string panic payload>".to_string()
848    }
849}
850
/// Wall-clock time budget (5 seconds) for joining a thread-mode
/// worker after its per-task `stop` has been flipped.
///
/// Mirrors the fork-mode `stop_and_collect` 5s shared deadline so
/// neither dispatch path can serially exhaust the test runtime by
/// hanging on a single stuck worker. The 100ms `FUTEX_WAIT_TIMEOUT`
/// inside `worker_main`'s blocking primitives means a well-behaved
/// worker observes `stop=true` within 100ms of the parent's flip; the
/// 5s budget covers IO drain, scheduling delays under contention, and
/// post-loop cleanup (NUMA stat reads, schedstat snapshots).
pub(super) const THREAD_JOIN_TIMEOUT: Duration = Duration::from_secs(5);
861
862/// Block until `join` reports finished or `timeout` elapses.
863/// Returns `Some(thread_result)` on successful join, `None` on
864/// timeout.
865///
866/// Implementation: wait on `exit_evt` (the worker's "I'm about to
867/// return" eventfd, bumped from a Drop guard inside the thread
868/// closure) via `epoll_wait` with a `timerfd` for the safety
869/// deadline. A spurious wake (e.g. EINTR or a stale eventfd-counter
870/// drain) loops back into the wait without orphaning the worker —
871/// the timerfd carries the absolute deadline.
872///
873/// Std lacks a native timed-join API; an alternative side-thread
874/// "joiner + channel" pattern would orphan the joiner on timeout
875/// (joining is non-cancellable in std), which keeps the thread
876/// alive past `WorkloadHandle::drop` and prevents process exit.
877/// The eventfd path replaces the previous 10ms sleep-poll loop
878/// without that orphan cost.
879pub(super) fn join_thread_with_timeout(
880    join: std::thread::JoinHandle<WorkerReport>,
881    exit_evt: &vmm_sys_util::eventfd::EventFd,
882    timeout: Duration,
883) -> Option<std::thread::Result<WorkerReport>> {
884    use std::os::unix::io::AsRawFd;
885    use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
886    use vmm_sys_util::timerfd::TimerFd;
887
888    if join.is_finished() {
889        return Some(join.join());
890    }
891
892    let epoll = match Epoll::new() {
893        Ok(e) => e,
894        Err(e) => {
895            tracing::warn!(%e, "join_thread_with_timeout: epoll_create1 failed");
896            return None;
897        }
898    };
899    if let Err(e) = epoll.ctl(
900        ControlOperation::Add,
901        exit_evt.as_raw_fd(),
902        EpollEvent::new(EventSet::IN, 0),
903    ) {
904        tracing::warn!(%e, "join_thread_with_timeout: add exit_evt to epoll");
905        return None;
906    }
907    let mut timer = match TimerFd::new() {
908        Ok(t) => t,
909        Err(e) => {
910            tracing::warn!(%e, "join_thread_with_timeout: timerfd_create failed");
911            return None;
912        }
913    };
914    if let Err(e) = timer.reset(timeout, None) {
915        tracing::warn!(%e, "join_thread_with_timeout: timerfd_settime failed");
916        return None;
917    }
918    if let Err(e) = epoll.ctl(
919        ControlOperation::Add,
920        timer.as_raw_fd(),
921        EpollEvent::new(EventSet::IN, 1),
922    ) {
923        tracing::warn!(%e, "join_thread_with_timeout: add timerfd to epoll");
924        return None;
925    }
926
927    let deadline = Instant::now() + timeout;
928    let mut events = [EpollEvent::default(); 2];
929    loop {
930        if join.is_finished() {
931            return Some(join.join());
932        }
933        let remaining = deadline.saturating_duration_since(Instant::now());
934        if remaining.is_zero() {
935            return None;
936        }
937        match epoll.wait(-1, &mut events) {
938            Ok(_) => {}
939            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
940            Err(e) => {
941                tracing::warn!(%e, "join_thread_with_timeout: epoll_wait failed");
942                return None;
943            }
944        }
945    }
946}
947
948/// `worker_main`'s loop-check predicate: returns `true` when the
949/// worker should stop iterating. Reads BOTH the per-worker `stop`
950/// flag and the global [`STOP`] flag; either set request causes
951/// exit.
952///
953/// Why both:
954/// - Per-worker `stop` is what `WorkloadHandle::stop_and_collect`
955///   flips for graceful shutdown. For Fork mode the per-worker
956///   `stop` IS the global [`STOP`] (the SIGUSR1 handler flips it).
957///   For Thread mode each worker has its own `Arc<AtomicBool>`
958///   passed via `&AtomicBool`.
959/// - The global [`STOP`] is what the SIGUSR1 handler sets. For
960///   Fork mode the worker's per-process [`STOP`] is the same
961///   AtomicBool the handler writes. For Thread mode every thread
962///   shares the parent process's address space, so a SIGUSR1
963///   delivered to the parent (e.g. Ctrl-C / a test harness signal)
964///   flips the shared global [`STOP`] but NOT the per-worker
965///   `stop` Arcs. Without this disjunction, Thread workers would
966///   silently keep running through a parent-level shutdown
967///   request.
968///
969/// `#[inline]` because the call site is two atomic loads + an OR.
970/// Relaxed ordering on both reads matches every existing site —
971/// no cross-field happens-before edge to establish.
972#[inline]
973pub(super) fn stop_requested(stop: &AtomicBool) -> bool {
974    stop.load(Ordering::Relaxed) || STOP.load(Ordering::Relaxed)
975}
976
/// Per-thread worker state for [`CloneMode::Thread`] dispatch.
///
/// Thread workers cannot be reaped via `waitpid` (they share a tgid
/// with the parent), so the lifecycle uses Rust's [`std::thread`]
/// primitives instead of pid-based syscalls. Each field below covers
/// one piece of that lifecycle: task identity (`tid`), shutdown
/// (`stop`), start rendezvous (`start_tx`), result retrieval (`join`),
/// and exit notification (`exit_evt`).
pub(super) struct ThreadWorker {
    /// Kernel task id, published by the worker thread post-spawn via
    /// `gettid()` so the parent can address the kernel task for
    /// `sched_setaffinity(tid, ...)` and report it from
    /// [`WorkloadHandle::worker_pids`]. `Arc<AtomicI32>` because the
    /// thread closure owns the publisher and the parent reads it
    /// without joining.
    tid: std::sync::Arc<std::sync::atomic::AtomicI32>,
    /// Replaces the global [`STOP`] signal-flag for thread mode: the
    /// parent flips it from [`WorkloadHandle::stop_and_collect`], the
    /// worker observes it inside `worker_main`'s `stop.load(Relaxed)`
    /// checks. SIGUSR1 is process-wide and useless for per-thread stop
    /// control.
    stop: std::sync::Arc<AtomicBool>,
    /// Rendezvous channel: the parent calls `send(())` from
    /// [`WorkloadHandle::start`]; the thread blocks in `recv()` until
    /// then. `Option` so `start` can take it and drop it (idempotent
    /// re-call is a no-op when `None`).
    pub(super) start_tx: Option<std::sync::mpsc::SyncSender<()>>,
    /// The [`std::thread::JoinHandle`] returned by `thread::spawn`;
    /// `stop_and_collect` joins each handle to retrieve the
    /// [`WorkerReport`]. `Option` so `stop_and_collect` can take
    /// ownership and `Drop` does not double-join.
    join: Option<std::thread::JoinHandle<WorkerReport>>,
    /// Eventfd bumped by the worker thread's `WorkerExitSignal` Drop
    /// guard before the thread returns from its closure. Lets
    /// [`join_thread_with_timeout`] block in `epoll_wait` instead of
    /// sleep-polling [`std::thread::JoinHandle::is_finished`]. Counter
    /// mode (not semaphore) — the value never matters; only the edge
    /// from 0 to non-zero does. The Arc is cloned into the closure
    /// for the Drop guard; the parent retains the original here.
    exit_evt: std::sync::Arc<vmm_sys_util::eventfd::EventFd>,
}
1016
1017/// Defense-in-depth Drop for [`ThreadWorker`]. Rust's
1018/// [`std::thread::JoinHandle`] does NOT join its thread on drop —
1019/// it detaches, and the thread continues running until completion.
1020/// `WorkloadHandle::drop`, `WorkloadHandle::stop_and_collect`, and
1021/// `SpawnGuard::drop` already explicitly `take()` the JoinHandle and
1022/// route it through [`join_thread_with_timeout`]; this impl exists
1023/// for the case where some future refactor lets a `ThreadWorker`
1024/// fall out of scope without going through one of those paths.
1025///
1026/// Behavior: if `join` is still `Some` when this Drop fires, flip
1027/// `stop` (so the worker exits cleanly), drop `start_tx` (in case
1028/// the worker is still parked on `recv()`), and join with the
1029/// shared 5s budget. Errors / timeouts are swallowed because Drop
1030/// has nothing to assert against; the upstream paths produce the
1031/// auditable diagnostics.
1032impl Drop for ThreadWorker {
1033    fn drop(&mut self) {
1034        if let Some(j) = self.join.take() {
1035            self.stop.store(true, Ordering::Relaxed);
1036            self.start_tx.take();
1037            let _ = join_thread_with_timeout(j, &self.exit_evt, THREAD_JOIN_TIMEOUT);
1038        }
1039    }
1040}
1041
/// Per-fork-child report-decoding shape and bookkeeping recorded
/// alongside each entry in [`SpawnGuard::children`] and
/// [`WorkloadHandle::children`].
///
/// Two variants:
///
/// - [`ForkedChildKind::Worker`]: the conventional fork-mode worker
///   (one process per worker, single postcard `WorkerReport`).
///   Carries the worker's `group_idx` so a sentinel report on a
///   missing payload can be tagged correctly.
/// - [`ForkedChildKind::PcommContainer`]: a single thread-group
///   leader that owns `num_workers` worker threads across one or
///   more logical groups. The leader sets its own `comm` to `pcomm`
///   via `prctl(PR_SET_NAME)` before spawning the threads, so every
///   worker thread's `task->group_leader->comm` is `pcomm` exactly
///   (the [`WorkSpec::pcomm`] builder rejects > 15 bytes —
///   `TASK_COMM_LEN - 1` — so the framework never feeds the kernel
///   a name `__set_task_comm` would truncate). Reports are a single
///   `serde_json` `Vec<WorkerReport>` (one entry per worker thread;
///   per-thread `group_idx` lives inside each `WorkerReport`).
///
/// Wire formats differ per variant: `Worker` uses postcard and
/// `PcommContainer` uses serde_json. The parent picks the decoder by
/// matching on `ForkedChildKind` at decode time.
#[derive(Clone, Debug)]
pub(super) enum ForkedChildKind {
    /// Conventional fork-mode worker (one process per worker).
    /// Report wire format: bare `b'r'` ready byte followed by a
    /// `postcard::to_stdvec(&WorkerReport)` document.
    Worker { group_idx: usize },
    /// pcomm thread-group leader hosting worker threads across one
    /// or more logical groups. `groups` records the per-group
    /// `(group_idx, num_workers)` layout in the order threads were
    /// spawned: `groups[0]` contributes the first
    /// `groups[0].1` worker reports, `groups[1]` the next
    /// `groups[1].1`, and so on. Total expected reports =
    /// `groups.iter().map(|(_, n)| n).sum()`. The layout drives
    /// sentinel distribution: when the JSON payload is missing or
    /// short, the parent emits sentinels tagged with the right
    /// per-group `group_idx` so per-group filters partition
    /// correctly.
    ///
    /// Report wire format: bare `b'r'` ready byte followed by a
    /// `serde_json::to_vec(&Vec<WorkerReport>)` document, one entry
    /// per worker thread (in `(group_idx, within-group index)`
    /// traversal order matching the `groups` layout).
    PcommContainer { groups: Vec<(usize, usize)> },
}
1090
/// Bookkeeping for a single forked-child process owned by the spawn
/// pipeline. Replaces the prior `(pid, report_fd, start_fd)` tuple so
/// per-child decoding metadata ([`ForkedChildKind`]) lives alongside
/// the pipe fds and pid.
///
/// NOTE(review): `Clone` copies the raw fd numbers, not the underlying
/// descriptors — a clone and its source refer to the same open fds.
/// Confirm that no caller closes the fds through more than one copy.
#[derive(Clone, Debug)]
pub(super) struct ForkedChild {
    /// Forked child's pid (== tgid for both `Worker` and
    /// `PcommContainer`: the worker is its own thread-group leader,
    /// the container is the thread-group leader of its inner threads).
    pub pid: libc::pid_t,
    /// Parent-side read end of the report pipe. The child writes one
    /// `b'r'` ready byte followed by either a postcard-encoded single
    /// `WorkerReport` (Worker) or a `serde_json` `Vec<WorkerReport>`
    /// (PcommContainer) per the [`ForkedChildKind`] tag.
    pub report_fd: std::os::unix::io::RawFd,
    /// Parent-side write end of the start pipe. Closed by
    /// [`crate::workload::WorkloadHandle::start`] after writing the
    /// start byte; set to `-1` thereafter.
    pub start_fd: std::os::unix::io::RawFd,
    /// Per-child decoding shape. See [`ForkedChildKind`].
    pub kind: ForkedChildKind,
}
1113
/// Handle to spawned worker tasks. Workers block until
/// [`start()`](Self::start) is called.
///
/// The [`CloneMode`] in the [`WorkloadConfig`] selects how each
/// worker is created. Within one [`WorkloadHandle`] every worker
/// uses the same mode, so exactly one of `children` or `threads`
/// is populated; the other is empty. This avoids per-worker mode
/// dispatch on the hot path and keeps each vec's per-mode
/// invariants (pid-based vs JoinHandle-based reaping) cohesive.
///
/// - [`CloneMode::Fork`] populates `children` — separate process
///   per worker, reaped via `waitpid`, signaled via SIGUSR1.
/// - [`CloneMode::Thread`] populates `threads` — separate kernel
///   task in the parent's thread group via [`std::thread::spawn`],
///   joined via `JoinHandle`. Workers share the parent's tgid;
///   per-worker cgroup placement requires `cgroup.threads`
///   (cgroup v2 thread mode), which ktstr scenarios do not
///   currently configure — Thread-mode workers inherit the
///   parent's cgroup.
///
/// NOTE(review): the `children` field doc speaks of per-group modes
/// and `pcomm` containers ("every group used `CloneMode::Thread`
/// without `pcomm`"), which reads as if both vecs could be non-empty
/// in one handle. Confirm whether the "exactly one is populated"
/// statement above still holds.
#[must_use = "dropping a WorkloadHandle immediately tears down all worker tasks"]
pub struct WorkloadHandle {
    /// Forked-child processes owned by this handle. Each entry is a
    /// [`ForkedChild`] carrying pid, parent-side pipe fds, and a
    /// [`ForkedChildKind`] tag that drives report decoding (single
    /// postcard `WorkerReport` for `Worker`; `serde_json`
    /// `Vec<WorkerReport>` for `PcommContainer`). Empty when every
    /// group used [`CloneMode::Thread`] without `pcomm`.
    children: Vec<ForkedChild>,
    /// Thread-mode workers. Empty when `clone_mode` is not
    /// [`CloneMode::Thread`].
    threads: Vec<ThreadWorker>,
    /// Start-state flag. NOTE(review): presumably set by
    /// [`start()`](Self::start) once the workers have been released —
    /// confirm against the impl.
    started: bool,
    /// Shared mmap regions for futex-based work types (one per worker group). Unmapped on drop.
    futex_ptrs: Vec<*mut u32>,
    /// Per-region byte length, parallel to `futex_ptrs`. Each
    /// region was sized at spawn time to its source group's
    /// natural width (4 for FutexPingPong / FutexFanOut /
    /// MutexContention / etc., 16 for FanOutCompute, 32 + Q*8 for
    /// ProducerConsumerImbalance — see [`futex_region_size_for`]).
    /// `futex_ptrs[i]` and `futex_region_sizes[i]` describe the
    /// same region; both are consumed pairwise on `Drop` so each
    /// `munmap` call receives the matching length.
    futex_region_sizes: Vec<usize>,
    /// MAP_SHARED region of per-worker iteration counters. Workers
    /// atomically store their iteration count; parent reads via
    /// `snapshot_iterations()`. Pointer to the first element; length
    /// is the active worker collection's len. Typed as
    /// `*mut AtomicU64` rather than `*mut u64` so the 8-byte
    /// alignment guarantee (inherited from the page-aligned
    /// iter_counters mmap site in `WorkloadHandle::spawn`) and the
    /// atomic-only-access invariant are encoded in the type system
    /// instead of prose. `AtomicU64` is layout-compatible with `u64`:
    /// `std::mem::size_of::<AtomicU64>() == std::mem::align_of::<AtomicU64>() == 8`
    /// on every supported target, so casting the `*mut c_void`
    /// returned by `mmap` to `*mut AtomicU64` is sound.
    iter_counters: *mut AtomicU64,
    /// Number of AtomicU64 slots in iter_counters (== num_workers at spawn time).
    iter_counter_len: usize,
    /// MAP_SHARED single-word phase-epoch region. The scenario engine
    /// stores the current 1-indexed phase here (0 = BASELINE, Step k →
    /// k + 1, `u32::MAX` = inter-step gap); every backdrop worker of
    /// this handle reads the SAME pointer (not a per-worker slot — the
    /// epoch is a broadcast value, all the handle's workers transition
    /// phases together) to attribute its per-phase `PhaseSlice`s. A
    /// single `AtomicU32`, not the per-worker `AtomicU64` layout of
    /// `iter_counters`, because one shared word suffices. Null until
    /// the spawn-site mmap installs it.
    phase_epoch: *mut AtomicU32,
    /// Byte length of the `phase_epoch` mmap, for `munmap` on drop
    /// (`size_of::<AtomicU32>()`; the kernel rounds the mapping up to a
    /// page).
    phase_epoch_bytes: usize,
    /// Inter-worker paired pipes `(ab, ba)` for PipeIo / CachePipe.
    /// Transferred from [`SpawnGuard`] on success; closed by
    /// [`WorkloadHandle::drop`] AFTER worker shutdown so Thread-mode
    /// workers (which share the parent's fd table) can finish their
    /// pipe ops before the close. Under Fork mode each child holds
    /// its own fd-table copy via `fork()`, so the parent's late
    /// close is a no-op for the children. Empty when `work_type`
    /// is neither PipeIo nor CachePipe.
    pipe_pairs: Vec<([i32; 2], [i32; 2])>,
    /// Per-chain pipe rings for `WakeChain { wake: WakeMechanism::Pipe }`.
    /// Outer Vec is one entry per chain (= `num_workers / depth`);
    /// inner Vec is `depth` pipes per chain. Same ownership rule as
    /// `pipe_pairs`: transferred from [`SpawnGuard`] on success,
    /// closed by [`WorkloadHandle::drop`] AFTER worker shutdown so
    /// Thread-mode chain workers don't observe `EBADF` mid-run.
    /// Empty when `work_type` is not `WakeChain { wake: Pipe }`.
    chain_pipes: Vec<Vec<[i32; 2]>>,
}
1204
1205/// Per-variant byte length for the MAP_SHARED futex region.
1206///
1207/// Each WorkType that needs a shared region has a fixed natural
1208/// size:
1209///
1210/// - [`WorkType::FanOutCompute`] needs 16 bytes — futex `u32` at
1211///   offset 0, wake-timestamp `u64` at offset 8.
1212/// - [`WorkType::ProducerConsumerImbalance`] needs a ring buffer:
1213///   reserve-head `u64` @ 0, tail `u64` @ 8, producer-wake `u32`
1214///   @ 16, consumer-wake `u32` @ 20, publish-head `u64` @ 24,
1215///   then `Q` × `u64` ring slots starting at offset 32. Total
1216///   bytes = `32 + Q*8`. The two head counters split the MPMC
1217///   protocol: producers CAS-advance the reserve head to claim a
1218///   unique slot index, write the slot, then in slot-index FIFO
1219///   order release-store the publish head; consumers
1220///   acquire-load the publish head so a slot is visible to a
1221///   reader only after its producer's data write
1222///   synchronizes-with the consumer through the publish-head
1223///   release. `queue_depth_target` is `u64` to match the variant;
1224///   an `as usize` truncation on a 32-bit host could silently
1225///   produce a sub-page region with a malformed queue, so the
1226///   conversion is clamped at `usize::MAX/8 - 4` to keep the
1227///   layout well-defined (one fewer slot than before to make
1228///   room for `pub_head`). Realistic configs use Q in the
1229///   hundreds-to-thousands; the clamp only triggers on a
1230///   degenerate input that itself fails admission control
1231///   elsewhere (the queue is far larger than RAM).
1232/// - [`WorkType::SignalStorm`] needs 8 bytes — two `u32` tid
1233///   slots (worker 0's tid @ offset 0, worker 1's @ offset 4).
1234/// - Everything else: `u32` (4 bytes).
1235///
1236/// Returning the same byte count for every WorkType variant lets
1237/// the caller mmap exactly what's needed for THIS group rather
1238/// than the MAX across all groups, so a small-variant group
1239/// composed alongside a large `ProducerConsumerImbalance` no
1240/// longer pays the large group's per-region overhead.
1241pub(super) fn futex_region_size_for(work_type: &WorkType) -> usize {
1242    match work_type {
1243        WorkType::FanOutCompute { .. } => 16,
1244        WorkType::ProducerConsumerImbalance {
1245            queue_depth_target, ..
1246        } => {
1247            let q = std::cmp::min(*queue_depth_target as usize, usize::MAX / 8 - 4);
1248            32 + q * 8
1249        }
1250        // SignalStorm exchanges two u32 tid slots through the region
1251        // (worker 0's tid @ offset 0, worker 1's @ offset 4), so it
1252        // needs 8 bytes, not the default single u32.
1253        WorkType::SignalStorm { .. } => 2 * std::mem::size_of::<u32>(),
1254        _ => std::mem::size_of::<u32>(),
1255    }
1256}
1257
1258/// Scope guard that owns every resource acquired during
1259/// [`WorkloadHandle::spawn`]'s partial setup. If `spawn` returns
1260/// early (via `?` or `bail!`), the guard's `Drop` kills and reaps any
1261/// already-forked children, closes every open pipe fd, and munmaps
1262/// every shared region — so a mid-setup failure never leaks fds,
1263/// zombie processes, or anonymous-shared pages.
1264///
1265/// On success, [`SpawnGuard::into_handle`] moves every live
1266/// resource — children/threads, futex regions, iter-counter
1267/// region, AND `pipe_pairs` / `chain_pipes` — into the returned
1268/// [`WorkloadHandle`]. The guard's subsequent `Drop` runs against
1269/// empty Vecs/null pointers and is a no-op on the success path.
1270/// On the early-bail path (an `?` inside `WorkloadHandle::spawn`)
1271/// the guard still owns whatever it allocated and `Drop` cleans
1272/// it all up — fds, processes, threads, mmaps. Pipe fds are
1273/// closed by the handle (not the guard) because Thread-mode
1274/// workers share the parent's fd table; closing the fds before
1275/// worker shutdown would surface as `EBADF` on every pipe op a
1276/// thread runs after spawn returns.
1277pub(super) struct SpawnGuard {
1278    /// Inter-worker paired pipes `(ab, ba)` for PipeIo/CachePipe.
1279    /// Transferred to [`WorkloadHandle`] on success; closed by the
1280    /// guard only on the early-bail path. Under Fork mode each
1281    /// child holds its own fd-table copy via `fork()`; under
1282    /// Thread mode every worker thread shares these fds with the
1283    /// parent.
1284    pipe_pairs: Vec<([i32; 2], [i32; 2])>,
1285    /// Per-chain pipe rings for `WakeChain { wake: WakeMechanism::Pipe }`. Outer
1286    /// Vec is one entry per chain (= `num_workers / depth`); inner
1287    /// Vec is `depth` pipes per chain. Pipe `i` connects stage `i`
1288    /// (writer) to stage `(i + 1) % depth` (reader). Same ownership
1289    /// shape as `pipe_pairs`: transferred to the handle on success,
1290    /// closed by the guard only on the early-bail path.
1291    chain_pipes: Vec<Vec<[i32; 2]>>,
1292    /// Shared-memory futex regions (transferred to handle on success).
1293    futex_ptrs: Vec<*mut u32>,
1294    /// Per-region byte length, parallel to `futex_ptrs`. Each
1295    /// region is sized to its source group's natural width
1296    /// (4 / 16 / 32+Q*8 — see [`futex_region_size_for`]) and
1297    /// recorded here at `spawn_group` time so munmap on Drop
1298    /// can call `libc::munmap(ptr, len)` with the matching length
1299    /// even when groups with different natural sizes co-exist.
1300    /// `futex_ptrs[i]` and `futex_region_sizes[i]` describe the
1301    /// same region.
1302    futex_region_sizes: Vec<usize>,
1303    /// Per-worker iteration counter region (transferred on success).
1304    /// Typed matches the handle field; see `WorkloadHandle::iter_counters`.
1305    iter_counters: *mut AtomicU64,
1306    iter_counter_bytes: usize,
1307    /// Per-handle phase-epoch region (transferred to handle on
1308    /// success). Single `AtomicU32` word shared by all the handle's
1309    /// workers; see `WorkloadHandle::phase_epoch`.
1310    phase_epoch: *mut AtomicU32,
1311    phase_epoch_bytes: usize,
1312    /// Already-forked children (either conventional workers or
1313    /// pcomm containers) with their parent-side pipe fds and
1314    /// per-child decoding shape (transferred to handle on success).
1315    children: Vec<ForkedChild>,
1316    /// Already-spawned thread workers (transferred on success).
1317    /// Cleanup on early-exit flips each `stop` and joins each
1318    /// thread, since threads share the parent's address space and
1319    /// must be drained cooperatively (no `kill` equivalent).
1320    threads: Vec<ThreadWorker>,
1321}
1322
1323impl SpawnGuard {
1324    fn new() -> Self {
1325        Self {
1326            pipe_pairs: Vec::new(),
1327            chain_pipes: Vec::new(),
1328            futex_ptrs: Vec::new(),
1329            futex_region_sizes: Vec::new(),
1330            iter_counters: std::ptr::null_mut(),
1331            iter_counter_bytes: 0,
1332            phase_epoch: std::ptr::null_mut(),
1333            phase_epoch_bytes: 0,
1334            children: Vec::new(),
1335            threads: Vec::new(),
1336        }
1337    }
1338
1339    /// Transfer live resources into a [`WorkloadHandle`]. Leaves the
1340    /// guard's `children`, `threads`, `futex_ptrs`,
1341    /// `futex_region_sizes`, `iter_counters`, `pipe_pairs`, and
1342    /// `chain_pipes` empty, so the guard's subsequent `Drop` is a
1343    /// no-op on the success path. The handle is now the sole owner
1344    /// of every resource — its own `Drop` closes the pipe fds
1345    /// AFTER worker shutdown completes, which is the ordering
1346    /// Thread mode requires (workers share the parent's fd table;
1347    /// closing pre-shutdown would surface as `EBADF` on every
1348    /// worker's pipe op). Fork mode is unaffected either way: each
1349    /// child holds its own fd-table copy via `fork()`, so the
1350    /// parent's close timing is invisible to the child.
1351    fn into_handle(mut self) -> WorkloadHandle {
1352        let children = std::mem::take(&mut self.children);
1353        let threads = std::mem::take(&mut self.threads);
1354        let futex_ptrs = std::mem::take(&mut self.futex_ptrs);
1355        let futex_region_sizes = std::mem::take(&mut self.futex_region_sizes);
1356        let iter_counters = std::mem::replace(&mut self.iter_counters, std::ptr::null_mut());
1357        let iter_counter_bytes = std::mem::replace(&mut self.iter_counter_bytes, 0);
1358        let iter_counter_len = iter_counter_bytes / std::mem::size_of::<AtomicU64>();
1359        let phase_epoch = std::mem::replace(&mut self.phase_epoch, std::ptr::null_mut());
1360        let phase_epoch_bytes = std::mem::replace(&mut self.phase_epoch_bytes, 0);
1361        let pipe_pairs = std::mem::take(&mut self.pipe_pairs);
1362        let chain_pipes = std::mem::take(&mut self.chain_pipes);
1363        WorkloadHandle {
1364            children,
1365            threads,
1366            started: false,
1367            futex_ptrs,
1368            futex_region_sizes,
1369            iter_counters,
1370            iter_counter_len,
1371            phase_epoch,
1372            phase_epoch_bytes,
1373            pipe_pairs,
1374            chain_pipes,
1375        }
1376    }
1377}
1378
1379impl Drop for SpawnGuard {
1380    fn drop(&mut self) {
1381        // Kill and reap any already-forked children first, so their
1382        // pipe ends are not left blocked when we close the parent
1383        // side. `nix` wrappers replace the raw libc calls — kill
1384        // returns `Result<()>` (we swallow ECHILD/ESRCH in the
1385        // already-exited case), waitpid returns `Result<WaitStatus>`
1386        // (we discard the status in the cleanup path), close returns
1387        // `Result<()>` (we swallow EBADF for fds an earlier arm may
1388        // have already closed).
1389        for child in &self.children {
1390            let npid = nix::unistd::Pid::from_raw(child.pid);
1391            let _ = nix::sys::signal::kill(npid, nix::sys::signal::Signal::SIGKILL);
1392            let _ = nix::sys::wait::waitpid(npid, None);
1393        }
1394        // Close each child's parent-side report/start fds.
1395        for child in &self.children {
1396            close_fds_silently(&[child.report_fd, child.start_fd]);
1397        }
1398        // Stop and join any partially-spawned threads. Threads
1399        // share our address space, so `kill` does not reach them
1400        // and the only safe teardown is "flip stop, drop the start
1401        // channel (in case worker is still parked on `recv`), then
1402        // join". Dropping `start_tx` causes `recv` on the worker
1403        // side to return `Err(Disconnected)`, unblocking a thread
1404        // that has not yet been signaled. After both signals
1405        // (stop=true and start_tx dropped), `worker_main`'s outer
1406        // loop exits at the next `stop.load(Relaxed)` check (max
1407        // ~100ms latency from the `FUTEX_WAIT_TIMEOUT` poll
1408        // cadence) and the thread completes. `join` returns the
1409        // partial `WorkerReport` (or `Err` on panic, which we
1410        // swallow because mid-spawn cleanup has nothing to assert).
1411        for tw in &mut self.threads {
1412            tw.stop.store(true, Ordering::Relaxed);
1413            // Drop start_tx FIRST so a worker still parked on
1414            // recv() unblocks via Disconnected.
1415            tw.start_tx.take();
1416            if let Some(j) = tw.join.take() {
1417                // SpawnGuard cleanup uses the same `THREAD_JOIN_TIMEOUT`
1418                // budget as `stop_and_collect` and `WorkloadHandle::drop`
1419                // so a stuck worker can't pin mid-spawn error recovery.
1420                // Errors (panic / timeout) are silently dropped — the
1421                // mid-spawn path has nothing to assert against beyond
1422                // not leaking, and the spawn-side bail message has
1423                // already named the failure mode that triggered cleanup.
1424                let _ = join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT);
1425            }
1426        }
1427        // Early-bail pipe close. On the success path, into_handle
1428        // moved both `pipe_pairs` and `chain_pipes` into the handle,
1429        // so these Vecs are empty here and these loops iterate
1430        // nothing. On the early-bail path the guard still owns the
1431        // partially-allocated pipes and must close them now — the
1432        // child arm of each fork already closed any inherited
1433        // copies it held, and Thread-mode early-bail joined any
1434        // partially-spawned threads above before this loop runs.
1435        for (ab, ba) in &self.pipe_pairs {
1436            close_fds_silently(&[ab[0], ab[1], ba[0], ba[1]]);
1437        }
1438        for chain in &self.chain_pipes {
1439            for pipe in chain {
1440                close_fds_silently(&[pipe[0], pipe[1]]);
1441            }
1442        }
1443        // Munmap shared regions. `futex_ptrs[i]` and
1444        // `futex_region_sizes[i]` describe the same region, so each
1445        // munmap receives the exact length used for the matching
1446        // mmap. The two vectors are appended in lockstep inside
1447        // `spawn_group`, so they have identical lengths in every
1448        // observable state.
1449        for (&ptr, &size) in self.futex_ptrs.iter().zip(self.futex_region_sizes.iter()) {
1450            unsafe {
1451                libc::munmap(ptr as *mut libc::c_void, size);
1452            }
1453        }
1454        if !self.iter_counters.is_null() && self.iter_counter_bytes > 0 {
1455            unsafe {
1456                libc::munmap(
1457                    self.iter_counters as *mut libc::c_void,
1458                    self.iter_counter_bytes,
1459                );
1460            }
1461        }
1462        if !self.phase_epoch.is_null() && self.phase_epoch_bytes > 0 {
1463            unsafe {
1464                libc::munmap(
1465                    self.phase_epoch as *mut libc::c_void,
1466                    self.phase_epoch_bytes,
1467                );
1468            }
1469        }
1470    }
1471}
1472
1473// SAFETY: futex_ptrs, iter_counters, and phase_epoch are MAP_SHARED anonymous pages
1474// created before fork, so every forked child inherits a pointer copy
1475// of the same underlying kernel object. Children read/write their own
1476// futex word — via `std::ptr::read_volatile`/`write_volatile` for
1477// most WorkType variants, or via `AtomicU32`/`AtomicU64` references
1478// re-derived from the raw pointer for FanOutCompute, which needs
1479// release-acquire ordering to publish `wake_ns` alongside the
1480// generation advance — and atomically store into their dedicated
1481// iter_counters slot (via a shared `&AtomicU64` reference derived
1482// from the `*mut AtomicU64` region pointer); the parent reads
1483// all slots via `snapshot_iterations` and is the sole process that
1484// munmaps the region, on WorkloadHandle::drop after every child has
1485// been reaped.
1486//
1487// Per-mode aliasing rationale:
1488//
1489// - Fork mode: each forked child constructs its own process-local
1490//   `&AtomicU32`/`&AtomicU64` shared reference into the MAP_SHARED
1491//   page from the inherited raw pointer. No reference value ever
1492//   crosses a process boundary — each process synthesises its own
1493//   reference from the same underlying kernel object. Interior
1494//   mutation through a shared atomic reference is permitted by
1495//   Rust's aliasing model because `AtomicU32`/`AtomicU64` wrap an
1496//   `UnsafeCell`; the post-fork alias relation is therefore not an
1497//   aliasing-rule violation.
1498//
1499// - Thread mode: under [`CloneMode::Thread`] every worker thread
1500//   shares the parent process's single address space — the same
1501//   raw `*mut AtomicU32`/`*mut AtomicU64` pointer is dereferenced
1502//   from multiple threads concurrently, and the resulting
1503//   `&AtomicU32`/`&AtomicU64` shared references coexist for
1504//   overlapping lifetimes. This is sound for the same reason
1505//   `Arc<AtomicU64>` is sound: atomic types' `UnsafeCell`-wrapped
1506//   storage permits concurrent shared-reference access by design,
1507//   and the underlying load/store instructions are by construction
1508//   non-tearing on every supported target. No `&mut` reference is
1509//   ever materialised; every access is via the atomic API. The
1510//   MAP_SHARED region is allocated once before any worker spawns
1511//   and `munmap`ped after every worker has been joined, so the
1512//   underlying kernel object outlives every alias.
1513//
1514// `phase_epoch` follows the same MAP_SHARED aliasing rules with the
1515// roles reversed: the parent (scenario engine) is the sole WRITER of
1516// the single `AtomicU32` word (Release stores at each step boundary)
1517// and the backdrop workers are READERS (Relaxed loads). It is
1518// telemetry — no happens-before edge is required, matching the
1519// `iter_counters` Relaxed-store / Relaxed-read precedent.
1520unsafe impl Send for WorkloadHandle {}
1521unsafe impl Sync for WorkloadHandle {}
1522
1523/// Pointer-sized addresses passed across a thread-spawn boundary.
1524///
1525/// Rust's auto-`Send` inference on closures conservatively treats
1526/// `*mut T` as `!Send` even inside a wrapper struct destructured in
1527/// the closure body — the destructured field type leaks into the
1528/// closure's auto-trait check. The simplest workaround is to round-
1529/// trip the pointers through `usize` (Send + Copy) and re-cast on
1530/// the receiver side. Soundness is identical: thread-mode workers
1531/// share the parent's address space, so the addresses retain
1532/// meaning across the thread boundary, and the underlying
1533/// MAP_SHARED regions are owned by the guard / handle for the full
1534/// duration of every worker.
1535///
1536/// `SendFutexPtr` carries a (futex_address, pos) tuple wrapped in
1537/// `Option`; `None` is the "no futex required" case for work types
1538/// that don't need shared memory. `SendIterSlotPtr` carries a single
1539/// address (zero ⇒ no iter_slot publish).
1540#[derive(Clone, Copy)]
1541pub(super) struct SendFutexPtr(Option<(usize, usize)>);
1542
1543#[derive(Clone, Copy)]
1544pub(super) struct SendIterSlotPtr(usize);
1545
1546impl SendFutexPtr {
1547    fn new(p: Option<(*mut u32, usize)>) -> Self {
1548        SendFutexPtr(p.map(|(ptr, pos)| (ptr as usize, pos)))
1549    }
1550
1551    /// Re-cast back into the `*mut u32` + `pos` tuple `worker_main`
1552    /// expects.
1553    fn into_raw(self) -> Option<(*mut u32, usize)> {
1554        self.0.map(|(addr, pos)| (addr as *mut u32, pos))
1555    }
1556}
1557
1558impl SendIterSlotPtr {
1559    fn new(p: *mut AtomicU64) -> Self {
1560        SendIterSlotPtr(p as usize)
1561    }
1562
1563    fn into_raw(self) -> *mut AtomicU64 {
1564        self.0 as *mut AtomicU64
1565    }
1566}
1567
1568/// Send-safe carrier for the single shared `phase_epoch` word (the SAME
1569/// pointer for every worker in a group), mirroring [`SendIterSlotPtr`].
1570/// Round-trips the address through `usize` so a worker closure's capture
1571/// set stays `Send` (no raw-pointer field in the closure type).
1572#[derive(Clone, Copy)]
1573pub(super) struct SendPhaseEpochPtr(usize);
1574
1575impl SendPhaseEpochPtr {
1576    fn new(p: *mut AtomicU32) -> Self {
1577        SendPhaseEpochPtr(p as usize)
1578    }
1579
1580    fn into_raw(self) -> *mut AtomicU32 {
1581        self.0 as *mut AtomicU32
1582    }
1583}
1584
1585/// Per-group resolved view of [`WorkloadConfig`] used by the
1586/// spawn pipeline.
1587///
1588/// [`WorkloadHandle::spawn`] iterates one `GroupParams` per group
1589/// it spawns: the primary group (`group_idx == 0`) is built from
1590/// the top-level [`WorkloadConfig`] fields via
1591/// [`Self::primary`], and each composed [`WorkSpec`] entry is
1592/// resolved into its own `GroupParams` (with `group_idx ==
1593/// 1..=N`) via [`Self::from_composed`]. Both paths funnel through
1594/// [`Self::from_work_spec`] for the actual field copy.
1595///
1596/// `GroupParams` is the post-resolution shape — `num_workers` is a
1597/// concrete `usize` (not the `Option<usize>` that [`WorkSpec`]
1598/// carries), `affinity` is a concrete [`ResolvedAffinity`] (not
1599/// the [`AffinityIntent`] that [`WorkSpec`] carries). The spawn
1600/// pipeline operates on `GroupParams` exclusively so it never has
1601/// to deal with the unresolved intent/optional shapes that the
1602/// user-facing types expose.
1603///
1604/// `clone_mode` is shared across every group — the top-level
1605/// [`WorkloadConfig::clone_mode`] selects fork vs thread dispatch
1606/// for the entire workload, and [`WorkSpec`] carries no
1607/// `clone_mode` field of its own (composed entries inherit the
1608/// parent's mode; the [`SpawnGuard`]'s lifecycle assumes a single
1609/// dispatch path).
1610#[derive(Clone)]
1611pub(super) struct GroupParams {
1612    work_type: WorkType,
1613    sched_policy: SchedPolicy,
1614    mem_policy: MemPolicy,
1615    mpol_flags: MpolFlags,
1616    nice: Option<i32>,
1617    comm: Option<String>,
1618    uid: Option<u32>,
1619    gid: Option<u32>,
1620    numa_node: Option<u32>,
1621    affinity: ResolvedAffinity,
1622    num_workers: usize,
1623    group_idx: usize,
1624}
1625
1626impl GroupParams {
1627    /// Extract a [`GroupParams`] from a [`WorkSpec`] given the
1628    /// resolved sibling values. This is the single field-extraction
1629    /// site — both [`Self::primary`] and [`Self::from_composed`]
1630    /// funnel through here, so the field-by-field copy lives in one
1631    /// place.
1632    ///
1633    /// The caller is responsible for resolving the
1634    /// [`WorkSpec::num_workers`] `Option<usize>` to a concrete
1635    /// `usize` and the [`WorkSpec::affinity`] [`AffinityIntent`] to
1636    /// a concrete [`ResolvedAffinity`]. The remaining fields
1637    /// (`work_type`, `sched_policy`, `mem_policy`, `mpol_flags`,
1638    /// `nice`) are copied verbatim — they need no resolution
1639    /// because both [`WorkSpec`] and [`GroupParams`] carry them in
1640    /// their final runtime form (`nice` round-trips as
1641    /// `Option<i32>` so `None` continues to mean "skip the
1642    /// `setpriority(2)` call" at the spawn-time gate).
1643    fn from_work_spec(
1644        spec: &WorkSpec,
1645        group_idx: usize,
1646        resolved_affinity: ResolvedAffinity,
1647        resolved_num_workers: usize,
1648    ) -> Self {
1649        Self {
1650            work_type: spec.work_type.clone(),
1651            sched_policy: spec.sched_policy,
1652            mem_policy: spec.mem_policy.clone(),
1653            mpol_flags: spec.mpol_flags,
1654            nice: spec.nice,
1655            comm: spec.comm.as_ref().map(|c| c.to_string()),
1656            uid: spec.uid,
1657            gid: spec.gid,
1658            numa_node: spec.numa_node,
1659            affinity: resolved_affinity,
1660            num_workers: resolved_num_workers,
1661            group_idx,
1662        }
1663    }
1664
1665    /// Resolve an [`AffinityIntent`] to a [`ResolvedAffinity`] under
1666    /// the spawn-time gate: only `Inherit`, `Exact`, and
1667    /// `RandomSubset` carry enough information to resolve without
1668    /// scenario context (the caller supplies the `from` pool for
1669    /// `RandomSubset`, so per-worker sampling stays self-contained).
1670    /// Topology-aware variants (`SingleCpu`, `LlcAligned`,
1671    /// `CrossCgroup`, `SmtSiblingPair`) require a
1672    /// [`crate::topology::TestTopology`] / cpuset state that
1673    /// [`WorkloadHandle::spawn`] does not have, so they bail with an
1674    /// actionable diagnostic.
1675    ///
1676    /// `site` names the location of the affinity field for the bail
1677    /// message — `"WorkloadConfig::affinity"` for the primary group,
1678    /// `"composed[N].affinity"` for entries inside `composed`. Pinned
1679    /// across both call sites so the gate matches exactly and a
1680    /// future variant addition is rejected uniformly.
1681    pub(super) fn resolve_spawn_affinity(
1682        intent: &AffinityIntent,
1683        site: &str,
1684    ) -> Result<ResolvedAffinity> {
1685        match intent {
1686            AffinityIntent::Inherit => Ok(ResolvedAffinity::None),
1687            AffinityIntent::Exact(cpus) => {
1688                if cpus.is_empty() {
1689                    anyhow::bail!(
1690                        "{site} = AffinityIntent::Exact with empty CPU set \
1691                         would produce EINVAL from sched_setaffinity; \
1692                         use AffinityIntent::Inherit for no affinity \
1693                         constraint",
1694                    );
1695                }
1696                Ok(ResolvedAffinity::Fixed(cpus.clone()))
1697            }
1698            AffinityIntent::RandomSubset { from, count } => {
1699                if from.is_empty() {
1700                    anyhow::bail!(
1701                        "{site} = AffinityIntent::RandomSubset with empty \
1702                         pool; use AffinityIntent::Inherit for no affinity \
1703                         constraint",
1704                    );
1705                }
1706                if *count == 0 {
1707                    anyhow::bail!(
1708                        "{site} = AffinityIntent::RandomSubset with \
1709                         count=0; use AffinityIntent::Inherit for no \
1710                         affinity constraint",
1711                    );
1712                }
1713                Ok(ResolvedAffinity::Random {
1714                    from: from.clone(),
1715                    count: *count,
1716                })
1717            }
1718            AffinityIntent::SingleCpu
1719            | AffinityIntent::LlcAligned
1720            | AffinityIntent::CrossCgroup
1721            | AffinityIntent::SmtSiblingPair => {
1722                anyhow::bail!(
1723                    "{site} = {:?} requires scenario context; use \
1724                     AffinityIntent::Exact(set), \
1725                     AffinityIntent::RandomSubset {{ from, count }}, \
1726                     or AffinityIntent::Inherit when spawning directly \
1727                     via WorkloadHandle::spawn. Topology-aware variants \
1728                     resolve automatically inside #[ktstr_test] \
1729                     scenarios.",
1730                    intent,
1731                );
1732            }
1733        }
1734    }
1735
1736    /// Build the primary group's parameters from the top-level
1737    /// [`WorkloadConfig`] fields. `group_idx` is fixed to `0`.
1738    ///
1739    /// Synthesises a [`WorkSpec`] view of the top-level config
1740    /// fields and funnels through [`Self::from_work_spec`] so the
1741    /// field-by-field copy lives in exactly one place. The
1742    /// synthesised spec mirrors the resolved sibling values
1743    /// (`num_workers: Some(n)`, `affinity: Inherit`) — the spawn
1744    /// pipeline never reads it.
1745    ///
1746    /// `WorkloadConfig::affinity` is an [`AffinityIntent`]
1747    /// (type-unified with [`WorkSpec::affinity`]); resolution to
1748    /// [`ResolvedAffinity`] runs through
1749    /// [`Self::resolve_spawn_affinity`] under the same gate as
1750    /// [`Self::from_composed`]. Topology-aware variants
1751    /// (`SingleCpu`, `LlcAligned`, `CrossCgroup`, `SmtSiblingPair`)
1752    /// require scenario context; the scenario engine pre-resolves
1753    /// them via
1754    /// `crate::scenario::intent_for_spawn` (which round-trips
1755    /// `RandomSubset` verbatim and flattens topology-aware variants
1756    /// to `Exact`) before building [`WorkloadConfig`], so the gate
1757    /// only ever sees `Inherit`, `Exact`, or `RandomSubset` from
1758    /// this path.
1759    fn primary(config: &WorkloadConfig) -> Result<Self> {
1760        let resolved_affinity =
1761            Self::resolve_spawn_affinity(&config.affinity, "WorkloadConfig::affinity")?;
1762        let spec = WorkSpec {
1763            work_type: config.work_type.clone(),
1764            sched_policy: config.sched_policy,
1765            num_workers: Some(config.num_workers),
1766            affinity: AffinityIntent::Inherit,
1767            mem_policy: config.mem_policy.clone(),
1768            mpol_flags: config.mpol_flags,
1769            nice: config.nice,
1770            comm: config.comm.clone(),
1771            uid: config.uid,
1772            gid: config.gid,
1773            numa_node: config.numa_node,
1774            pcomm: None,
1775            workers_pct: None,
1776        };
1777        Ok(Self::from_work_spec(
1778            &spec,
1779            0,
1780            resolved_affinity,
1781            config.num_workers,
1782        ))
1783    }
1784
1785    /// Resolve a composed [`WorkSpec`] into per-group parameters,
1786    /// applying the spawn-time rules documented on
1787    /// [`WorkloadConfig::composed`]:
1788    ///
1789    /// - `num_workers` must be `Some(n)`; the `None` default
1790    ///   resolved by the scenario engine via
1791    ///   `Ctx::workers_per_cgroup` is unreachable here. A `None`
1792    ///   value is rejected with an actionable diagnostic.
1793    /// - `affinity` resolution runs through
1794    ///   [`Self::resolve_spawn_affinity`] —
1795    ///   [`AffinityIntent::Inherit`] (mapped to
1796    ///   [`ResolvedAffinity::None`]),
1797    ///   [`AffinityIntent::Exact`] (mapped to
1798    ///   [`ResolvedAffinity::Fixed`]), and
1799    ///   [`AffinityIntent::RandomSubset`] (mapped to
1800    ///   [`ResolvedAffinity::Random`]) are accepted; topology-aware
1801    ///   variants are rejected.
1802    ///
1803    /// Composed entries inherit the parent
1804    /// [`WorkloadConfig::clone_mode`]; [`WorkSpec`] has no
1805    /// `clone_mode` field of its own.
1806    fn from_composed(spec: &WorkSpec, group_idx: usize) -> Result<Self> {
1807        if spec.pcomm.is_some() {
1808            anyhow::bail!(
1809                "composed[{}].pcomm: pcomm via WorkloadHandle::spawn is not supported; \
1810                 use WorkloadHandle::spawn_pcomm_cgroup or CgroupDef (apply_setup) — \
1811                 spawn always forks one process per worker and never coalesces into \
1812                 a thread-group leader",
1813                group_idx - 1,
1814            );
1815        }
1816        let num_workers = spec.num_workers.ok_or_else(|| {
1817            anyhow::anyhow!(
1818                "composed[{}].num_workers must be set explicitly at spawn time \
1819                 (the Some/None resolution via Ctx::workers_per_cgroup is only \
1820                 available through the scenario engine; \
1821                 WorkloadHandle::spawn requires a concrete count)",
1822                group_idx - 1,
1823            )
1824        })?;
1825        let site = format!("composed[{}].affinity", group_idx - 1);
1826        let affinity = Self::resolve_spawn_affinity(&spec.affinity, &site)?;
1827        Ok(Self::from_work_spec(spec, group_idx, affinity, num_workers))
1828    }
1829}
1830
1831/// Shared per-group admission rules common to every spawn entry
1832/// point. Validates only the rules that are dispatch-agnostic —
1833/// `worker_group_size` divisibility, `chain_pipe_depth >= 2`,
1834/// `IdleChurn` zero-duration rejections, `IpcVariance` zero-knob
1835/// rejections. Dispatch-specific compatibility checks (CloneMode
1836/// vs WorkType, pcomm vs WorkType) stay at each entry point's
1837/// admission block since their reasoning differs per dispatch.
1838///
1839/// Called from [`WorkloadHandle::spawn`] (per-group inside
1840/// `groups`) and [`WorkloadHandle::spawn_pcomm_cgroup`] (per-group
1841/// inside its own `groups`). Centralises the rules so a future
1842/// addition (e.g. a new `WorkType` zero-rejection) lives in one
1843/// place.
1844pub(super) fn validate_workload_admission(group: &GroupParams) -> Result<()> {
1845    if let Some(group_size) = group.work_type.worker_group_size()
1846        && (group.num_workers == 0 || !group.num_workers.is_multiple_of(group_size))
1847    {
1848        return Err(WorkTypeValidationError::NonDivisibleWorkerCount {
1849            name: group.work_type.name().to_string(),
1850            group_idx: group.group_idx,
1851            group_size,
1852            num_workers: group.num_workers,
1853        }
1854        .into());
1855    }
1856    if let Some(depth) = group.work_type.chain_pipe_depth()
1857        && depth < 2
1858    {
1859        return Err(WorkTypeValidationError::InsufficientWakeChainDepth {
1860            depth,
1861            group_idx: group.group_idx,
1862        }
1863        .into());
1864    }
1865    if let WorkType::IdleChurn {
1866        burst_duration,
1867        sleep_duration,
1868        ..
1869    } = group.work_type
1870    {
1871        if burst_duration.is_zero() {
1872            return Err(WorkTypeValidationError::ZeroBurstDuration {
1873                group_idx: group.group_idx,
1874            }
1875            .into());
1876        }
1877        if sleep_duration.is_zero() {
1878            return Err(WorkTypeValidationError::ZeroSleepDuration {
1879                group_idx: group.group_idx,
1880            }
1881            .into());
1882        }
1883    }
1884    if let WorkType::IpcVariance {
1885        hot_iters,
1886        cold_iters,
1887        period_iters,
1888    } = group.work_type
1889    {
1890        if hot_iters == 0 {
1891            return Err(WorkTypeValidationError::ZeroIpcVarianceParam {
1892                field: "hot_iters",
1893                group_idx: group.group_idx,
1894            }
1895            .into());
1896        }
1897        if cold_iters == 0 {
1898            return Err(WorkTypeValidationError::ZeroIpcVarianceParam {
1899                field: "cold_iters",
1900                group_idx: group.group_idx,
1901            }
1902            .into());
1903        }
1904        if period_iters == 0 {
1905            return Err(WorkTypeValidationError::ZeroIpcVarianceParam {
1906                field: "period_iters",
1907                group_idx: group.group_idx,
1908            }
1909            .into());
1910        }
1911    }
1912    if let WorkType::TimerLatency { interval_us } = group.work_type
1913        && interval_us == 0
1914    {
1915        return Err(WorkTypeValidationError::ZeroTimerInterval {
1916            group_idx: group.group_idx,
1917        }
1918        .into());
1919    }
1920    if let WorkType::NetTraffic { frame_bytes, .. } = group.work_type
1921        && !(60..=1514).contains(&frame_bytes)
1922    {
1923        return Err(WorkTypeValidationError::NetTrafficFrameBytes {
1924            frame_bytes,
1925            group_idx: group.group_idx,
1926        }
1927        .into());
1928    }
1929    if let WorkType::IrqWake { frame_bytes, .. } = group.work_type
1930        && !(60..=1514).contains(&frame_bytes)
1931    {
1932        return Err(WorkTypeValidationError::IrqWakeFrameBytes {
1933            frame_bytes,
1934            group_idx: group.group_idx,
1935        }
1936        .into());
1937    }
1938    Ok(())
1939}
1940
1941/// Spawn a single thread-mode worker via [`std::thread::Builder`].
1942///
1943/// The thread closure runs `worker_main` directly with the same
1944/// per-worker arguments the fork dispatch passes, except `stop` is
1945/// a per-worker `Arc<AtomicBool>` instead of the global [`STOP`].
1946/// Start rendezvous uses an `mpsc::sync_channel(0)` because every
1947/// worker needs to block until the parent calls
1948/// [`WorkloadHandle::start`]; the parent then sends `()` to each
1949/// worker's `start_tx` to unblock them in order.
1950///
1951/// Also uses a capacity-1 `mpsc::sync_channel(1)` tid-publish
1952/// channel so this function blocks (up to 2 s) until the worker
1953/// thread reaches the `gettid()` publish point. That makes
1954/// [`WorkloadHandle::worker_pids`] safe to call immediately after
1955/// [`WorkloadHandle::spawn`] returns without first calling
1956/// [`WorkloadHandle::start`] — the publish happens BEFORE the
1957/// start handshake, so post-spawn tid reads always observe the
1958/// gettid() value (or spawn would have bailed with a "did not
1959/// publish gettid() within 2 s" error).
1960///
1961/// SIGUSR1 is process-wide and useless for per-thread stop control,
1962/// so this path does not install a signal handler. The parent flips
1963/// `stop` directly from [`WorkloadHandle::stop_and_collect`].
1964#[allow(clippy::too_many_arguments)]
1965pub(super) fn spawn_thread_worker(
1966    guard: &mut SpawnGuard,
1967    group: &GroupParams,
1968    affinity: Option<BTreeSet<usize>>,
1969    worker_pipe_fds: Option<(i32, i32)>,
1970    worker_futex: Option<(*mut u32, usize)>,
1971    iter_slot: *mut AtomicU64,
1972    phase_epoch: *mut AtomicU32,
1973) -> Result<()> {
1974    use std::sync::Arc;
1975    use std::sync::atomic::AtomicI32;
1976    use std::sync::mpsc;
1977
1978    // SyncSender(0) — bounded rendezvous channel. The thread blocks
1979    // in `recv()` until the parent sends `()`; if the parent drops
1980    // the sender first (mid-spawn cleanup or early bail), `recv()`
1981    // returns `Err(Disconnected)` and the closure exits cleanly.
1982    let (start_tx, start_rx) = mpsc::sync_channel::<()>(0);
1983    // tid-publish handshake. Capacity 1 so the worker's send is
1984    // non-blocking; the parent's `recv_timeout` below blocks until
1985    // the worker reaches the publish point. Without this, the
1986    // post-spawn `WorkloadHandle::worker_pids()` race-reads the
1987    // initial 0 sentinel because the worker thread hasn't been
1988    // scheduled to the `tid_thread.store(my_tid, Release)` site yet
1989    // — surfaced as a spurious `Thread SpinWait worker reported
1990    // non-positive tid=0` test failure in
1991    // tests/worker_thread_integration.rs.
1992    let (tid_pub_tx, tid_pub_rx) = mpsc::sync_channel::<()>(1);
1993    let stop = Arc::new(AtomicBool::new(false));
1994    let tid = Arc::new(AtomicI32::new(0));
1995    // Per-worker exit eventfd: bumped by a Drop guard inside the
1996    // closure right before the thread returns its `WorkerReport`. The
1997    // parent's `join_thread_with_timeout` blocks in `epoll_wait` on
1998    // this fd instead of sleep-polling `is_finished`. Created with
1999    // `EFD_NONBLOCK` so the Drop-time `write` cannot block; counter
2000    // mode so a missed read just accumulates without losing the edge.
2001    let exit_evt = Arc::new(
2002        vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK)
2003            .context("create thread-worker exit eventfd")?,
2004    );
2005
2006    // Clone Arcs for the closure. The thread takes ownership of the
2007    // closure-side handles; the parent retains the originals via
2008    // ThreadWorker for stop signaling and tid reading.
2009    let stop_thread = Arc::clone(&stop);
2010    let tid_thread = Arc::clone(&tid);
2011    let exit_evt_thread = Arc::clone(&exit_evt);
2012    let work_type = group.work_type.clone();
2013    let sched_policy = group.sched_policy;
2014    let mem_policy = group.mem_policy.clone();
2015    let mpol_flags = group.mpol_flags;
2016    let nice = group.nice;
2017    let comm = group.comm.clone();
2018    let uid = group.uid;
2019    let gid = group.gid;
2020    let numa_node = group.numa_node;
2021    let group_idx = group.group_idx;
2022    let num_workers = group.num_workers;
2023
2024    // The closure must be `Send` to cross the thread boundary.
2025    // `worker_pipe_fds` is `Option<(i32, i32)>` (Copy + Send), but
2026    // `worker_futex` and `iter_slot` are raw pointers and not
2027    // `Send` by default. The module-level `SendFutexPtr` and
2028    // `SendIterSlotPtr` newtypes round-trip the addresses through
2029    // `usize` so the closure's capture set is genuinely Send (no
2030    // raw-pointer field appears in the closure type).
2031    let futex_send = SendFutexPtr::new(worker_futex);
2032    let iter_slot_send = SendIterSlotPtr::new(iter_slot);
2033    let phase_epoch_send = SendPhaseEpochPtr::new(phase_epoch);
2034
2035    let join = std::thread::Builder::new()
2036        .name(format!("ktstr-worker-g{group_idx}-{}", guard.threads.len()))
2037        .spawn(move || {
2038            // Drop guard: signal the exit eventfd as the closure
2039            // unwinds, regardless of whether `worker_main` returned
2040            // normally or panicked. The parent's
2041            // `join_thread_with_timeout` blocks in `epoll_wait` on
2042            // this fd; a panic that bypassed the explicit signal
2043            // would otherwise leave the parent waiting until the
2044            // safety timerfd fires. Drop runs even under unwinding,
2045            // so this guard captures both the normal and panic
2046            // paths.
2047            struct WorkerExitSignal(std::sync::Arc<vmm_sys_util::eventfd::EventFd>);
2048            impl Drop for WorkerExitSignal {
2049                fn drop(&mut self) {
2050                    let _ = self.0.write(1);
2051                }
2052            }
2053            let _exit_signal = WorkerExitSignal(exit_evt_thread);
2054
2055            // Publish gettid() so the parent can address this task
2056            // for sched_setaffinity and report it from worker_pids.
2057            // gettid() is the kernel TID; getpid() would return the
2058            // shared tgid, which collides across threads.
2059            let my_tid: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };
2060            // Release pairs with Acquire on the parent's
2061            // `tid.load()` sites so any reader observing a non-zero
2062            // tid also sees the worker's post-start state. Cheap on
2063            // every supported target (release-store on the Arc's
2064            // underlying AtomicI32 is a single instruction).
2065            tid_thread.store(my_tid, Ordering::Release);
2066            // Notify the parent that tid is published. Capacity-1
2067            // channel means this send is non-blocking; the parent's
2068            // `recv_timeout` below blocks until this fires so
2069            // post-spawn callers see populated tids. Drop the
2070            // sender after send so any future replays are no-ops.
2071            let _ = tid_pub_tx.send(());
2072            drop(tid_pub_tx);
2073
2074            // Block on start rendezvous. `Err(_)` means the parent
2075            // dropped start_tx before sending — return a sentinel
2076            // WorkerReport without doing any work.
2077            if start_rx.recv().is_err() {
2078                return WorkerReport {
2079                    tid: my_tid,
2080                    completed: false,
2081                    group_idx,
2082                    ..WorkerReport::default()
2083                };
2084            }
2085
2086            // Re-cast usize addresses back into raw pointers for
2087            // worker_main. SAFETY: the ownership and lifetime
2088            // arguments documented on `SendFutexPtr` /
2089            // `SendIterSlotPtr` ensure these pointers are still
2090            // live when worker_main dereferences them.
2091            let futex = futex_send.into_raw();
2092            let slot = iter_slot_send.into_raw();
2093            let epoch = phase_epoch_send.into_raw();
2094
2095            worker_main(
2096                affinity,
2097                work_type,
2098                sched_policy,
2099                mem_policy,
2100                mpol_flags,
2101                nice,
2102                comm.as_deref(),
2103                uid,
2104                gid,
2105                numa_node,
2106                worker_pipe_fds,
2107                futex,
2108                slot,
2109                epoch,
2110                &stop_thread,
2111                group_idx,
2112            )
2113        })
2114        .with_context(|| {
2115            format!(
2116                "thread::spawn for worker {}/{} (group {}) failed",
2117                guard.threads.len() + 1,
2118                num_workers,
2119                group_idx,
2120            )
2121        })?;
2122
2123    // Block until the worker reaches the gettid() publish point.
2124    // 2 s deadline is generous: a fresh std::thread on a healthy
2125    // host reaches the closure body within microseconds; under
2126    // VM/CI pressure ~ms. Anything past 2 s indicates the worker
2127    // thread failed to schedule at all — surface as a typed error
2128    // rather than letting a downstream `worker_pids()` reader race-
2129    // read the initial 0 sentinel and tip over a tid-check assertion.
2130    match tid_pub_rx.recv_timeout(std::time::Duration::from_secs(2)) {
2131        Ok(()) => {}
2132        Err(mpsc::RecvTimeoutError::Timeout) => {
2133            anyhow::bail!(
2134                "spawn_thread_worker: worker {} (group {}) did not publish gettid() \
2135                 within 2 s of thread::spawn — the worker thread failed to schedule. \
2136                 Likely cause: host fd / thread-stack exhaustion, or the std runtime \
2137                 thread pool is wedged. tid stays at 0; subsequent worker_pids() \
2138                 reads would surface the sentinel.",
2139                guard.threads.len() + 1,
2140                group_idx,
2141            );
2142        }
2143        Err(mpsc::RecvTimeoutError::Disconnected) => {
2144            anyhow::bail!(
2145                "spawn_thread_worker: worker {} (group {}) closed tid-publish channel \
2146                 without sending — the closure panicked before reaching the gettid() \
2147                 publish point. Check the thread name's panic output for the cause.",
2148                guard.threads.len() + 1,
2149                group_idx,
2150            );
2151        }
2152    }
2153
2154    guard.threads.push(ThreadWorker {
2155        tid,
2156        stop,
2157        start_tx: Some(start_tx),
2158        join: Some(join),
2159        exit_evt,
2160    });
2161    Ok(())
2162}
2163
/// Per-group resource indices into the [`SpawnGuard`]'s flat
/// vectors, used by [`spawn_pcomm_container`] to compute per-thread
/// pipe / chain / futex / iter-slot addresses inside the forked
/// container.
///
/// One value describes one group. The values are built by
/// [`WorkloadHandle::spawn_pcomm_cgroup`] in lockstep with its
/// per-group resource allocation pass, so the container's threads
/// can address their own group's resources from the inherited
/// shared regions. Each `*_base` field is only meaningful when the
/// flag or option named in its own doc says the group uses that
/// resource kind.
#[derive(Clone, Copy, Debug)]
pub(super) struct PcommGroupResources {
    /// Offset of this group's first worker in the iter_counters
    /// MAP_SHARED region. Worker `i` of the group (for
    /// `i in 0..num_workers`) occupies slot `iter_offset + i`.
    pub iter_offset: usize,
    /// Index of this group's first pipe pair in `guard.pipe_pairs`
    /// (only meaningful when `needs_pipes` is true).
    pub pipe_pair_base: usize,
    /// Index of this group's first chain pipe in `guard.chain_pipes`
    /// (only meaningful when `chain_depth` is `Some(_)`).
    pub chain_pipes_base: usize,
    /// Index of this group's first futex region in `guard.futex_ptrs`
    /// (only meaningful when `needs_futex` is true).
    pub futex_ptrs_base: usize,
    /// True when the group's `WorkType` requires inter-worker pipe
    /// pairs (PipeIo / CachePipe).
    pub needs_pipes: bool,
    /// `Some(depth)` when the group's `WorkType` requires a chain
    /// pipe ring (WakeChain { wake: Pipe }); `None` otherwise.
    pub chain_depth: Option<usize>,
    /// True when the group's `WorkType` requires a MAP_SHARED futex
    /// region (FutexPingPong / FutexFanOut / FanOutCompute /
    /// MutexContention / etc.).
    pub needs_futex: bool,
    /// Worker-group size used to compute each worker's `pos` inside
    /// the futex region (e.g. 2 for FutexPingPong, the messenger /
    /// receiver split for FutexFanOut). Defaulted to 2 when the
    /// `WorkType` does not declare a group_size.
    pub futex_group_size: usize,
}
2202
2203/// Fork one thread-group leader hosting every worker thread for the
2204/// supplied groups: fork the leader, set its `comm` to `pcomm` via
2205/// `prctl(PR_SET_NAME)`, then spawn `groups[k].num_workers` worker
2206/// threads per group inside it. The [`WorkSpec::pcomm`] builder
2207/// rejects > 15 bytes (TASK_COMM_LEN-1), so the framework never
2208/// feeds the kernel a name `__set_task_comm` would truncate. Every
2209/// worker thread shares the leader's tgid, so
2210/// `task->group_leader->comm == pcomm` byte-for-byte. Models real
2211/// workloads like `chrome` (pcomm) hosting `ThreadPoolForeg` and
2212/// `GPU Process` (per-thread comm via [`WorkSpec::comm`]) or `java`
2213/// (pcomm) hosting `GC Thread` and `C2 CompilerThre`.
2214///
2215/// `groups` carries per-group [`GroupParams`] and `resources` carries
2216/// the parallel [`PcommGroupResources`] pre-built by
2217/// [`WorkloadHandle::spawn_pcomm_cgroup`] from the surrounding
2218/// per-group resource-allocation pass; the two slices have identical
2219/// length and `resources[k]` describes the indices owned by
2220/// `groups[k]`.
2221///
2222/// `container_uid` / `container_gid` are applied via `setresuid` /
2223/// `setresgid` once on the leader before spawning threads — the
2224/// leader takes the configured process credentials, and each worker
2225/// thread additionally re-applies its own merged credentials inside
2226/// `worker_main` at thread creation time (idempotent when the merge
2227/// produced the same value, which is the common case for a
2228/// CgroupDef-level default flowing through `merged_works`).
2229///
2230/// # Wire format
2231///
2232/// The parent collects via the same report pipe used by conventional
2233/// fork workers:
2234///
2235/// 1. After the start handshake, the leader writes one `b'r'` ready
2236///    byte to the report pipe after all threads are spawned, a
2237///    stronger guarantee than the conventional fork worker's
2238///    pre-loop byte.
2239/// 2. After all worker threads have joined, the leader serializes
2240///    `Vec<WorkerReport>` (one entry per worker thread, in
2241///    `(group_idx, within-group order)` traversal order) via
2242///    `serde_json::to_vec` and writes the bytes to the report pipe
2243///    before `_exit(0)`. The parent decodes via
2244///    `serde_json::from_slice::<Vec<WorkerReport>>`. The report pipe
2245///    is sized at 8 MiB; parent `read_to_end` drains concurrently
2246///    with the leader's `write_all` so payloads exceeding the pipe
2247///    size still drain correctly.
2248///
2249/// On a decode-failure or short payload, the parent emits one
2250/// sentinel report per expected worker so per-group filtering and
2251/// `assert_not_starved` see the correct cardinality.
2252///
2253/// # Lifecycle
2254///
2255/// - `PR_SET_PDEATHSIG(SIGKILL)`: when the parent dies, the kernel
2256///   sends SIGKILL to the LEADER thread (the calling task at fork
2257///   time). SIGKILL on any thread is fatal-to-tgid: the kernel
2258///   routes it through `complete_signal` → `do_group_exit` →
2259///   `zap_other_threads`, which sets `SIGNAL_GROUP_EXIT` on every
2260///   sibling thread and walks the thread list signalling each. So
2261///   the cascade to worker threads happens via
2262///   `zap_other_threads`, NOT via per-task PDEATHSIG inheritance
2263///   (PDEATHSIG itself is per-task and is cleared on clone). Net:
2264///   parent death → leader's SIGKILL → all worker threads die,
2265///   so the container cannot outlive the harness.
2266/// - `setpgid(0, 0)`: the leader becomes its own process group
2267///   leader so the parent's stop/kill `killpg` reaches any
2268///   descendants the workers spawn.
2269/// - SIGUSR1 handler installed pre-thread-spawn: any
2270///   `kill(leader_pid, SIGUSR1)` from the parent flips the leader's
2271///   STOP, which every worker thread observes via
2272///   `stop_requested(&STOP)` on its next loop check.
2273/// - `prctl(PR_SET_NAME, pcomm)`: sets the leader's `task->comm`,
2274///   which becomes `task->group_leader->comm` for every spawned
2275///   thread byte-for-byte (the [`WorkSpec::pcomm`] builder rejects
2276///   names longer than 15 bytes — `TASK_COMM_LEN - 1` — so the
2277///   framework never feeds the kernel a name `__set_task_comm`
2278///   would truncate).
2279/// - `setresuid(container_uid, ...)` / `setresgid(container_gid,
2280///   ...)` (if specified) apply once before the start-byte poll.
2281/// - Threads spawn AFTER the start byte arrives, so `start()`'s
2282///   serial start-byte writes still gate work entry (the parent
2283///   moves the leader to its cgroup before sending start).
2284#[allow(clippy::too_many_arguments)]
2285pub(super) fn spawn_pcomm_container(
2286    guard: &mut SpawnGuard,
2287    pcomm: &str,
2288    container_uid: Option<u32>,
2289    container_gid: Option<u32>,
2290    groups: &[GroupParams],
2291    resources: &[PcommGroupResources],
2292) -> Result<()> {
2293    debug_assert_eq!(
2294        groups.len(),
2295        resources.len(),
2296        "spawn_pcomm_container: groups / resources must have the same length",
2297    );
2298
2299    // Allocate the container's report and start pipes. Parent holds
2300    // report_fds[0] (read end) and start_fds[1] (write end); the
2301    // container holds the inverse ends post-fork. O_CLOEXEC matches
2302    // the defense-in-depth posture used by every other pipe in the
2303    // spawn pipeline — defends against future exec paths that could
2304    // otherwise leak the fd into a helper.
2305    let mut report_fds = [0i32; 2];
2306    if unsafe { libc::pipe2(report_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
2307        anyhow::bail!(
2308            "pcomm container (pcomm={pcomm:?}): report pipe2 failed: {}",
2309            std::io::Error::last_os_error(),
2310        );
2311    }
2312    // Grow the report pipe to 8 MiB so the container's `write_all` of
2313    // the JSON-encoded `Vec<WorkerReport>` issues without blocking
2314    // for moderate worker counts. For larger payloads the parent's
2315    // `read_to_end` drains concurrently with the container's
2316    // `write_all`, so a pipe overflow degrades to extra wakeups
2317    // rather than truncation. Best-effort failure (older kernel /
2318    // EPERM) leaves the default size in place — large payloads
2319    // still drain via the concurrent flow but with more wake cycles.
2320    const REPORT_PIPE_SIZE: libc::c_int = 8 * 1024 * 1024;
2321    let prev_size = unsafe { libc::fcntl(report_fds[1], libc::F_SETPIPE_SZ, REPORT_PIPE_SIZE) };
2322    if prev_size < 0 {
2323        let err = std::io::Error::last_os_error();
2324        tracing::warn!(
2325            pcomm,
2326            requested_size = REPORT_PIPE_SIZE,
2327            %err,
2328            "F_SETPIPE_SZ on pcomm container report pipe failed; falling back \
2329             to default pipe capacity",
2330        );
2331    }
2332    let mut start_fds = [0i32; 2];
2333    if unsafe { libc::pipe2(start_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
2334        unsafe {
2335            libc::close(report_fds[0]);
2336            libc::close(report_fds[1]);
2337        }
2338        anyhow::bail!(
2339            "pcomm container (pcomm={pcomm:?}): start pipe2 failed: {}",
2340            std::io::Error::last_os_error(),
2341        );
2342    }
2343
2344    // Block SIGUSR1 across fork so the container inherits a blocked
2345    // mask; the post-fork install + restore pattern matches the
2346    // conventional fork worker (see `spawn_group`'s fork dispatch
2347    // for the full rationale).
2348    let mut old_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
2349    let mut block_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
2350    let psm_block_rc = unsafe {
2351        libc::sigemptyset(&mut block_mask);
2352        libc::sigaddset(&mut block_mask, libc::SIGUSR1);
2353        libc::pthread_sigmask(libc::SIG_BLOCK, &block_mask, &mut old_mask)
2354    };
2355    if psm_block_rc != 0 {
2356        tracing::warn!(
2357            rc = psm_block_rc,
2358            pcomm,
2359            "pthread_sigmask(SIG_BLOCK, SIGUSR1) failed pre-fork for pcomm container; \
2360             container inherits unblocked SIGUSR1 and may terminate on default \
2361             action before installing handler",
2362        );
2363    }
2364
2365    // Sum of all groups' workers — matches `WorkerReport` cardinality
2366    // the parent expects.
2367    let total_workers: usize = groups.iter().map(|g| g.num_workers).sum();
2368
2369    let pid = unsafe { libc::fork() };
2370    match pid {
2371        -1 => {
2372            let psm_restore_rc = unsafe {
2373                libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
2374            };
2375            if psm_restore_rc != 0 {
2376                tracing::warn!(
2377                    rc = psm_restore_rc,
2378                    "pthread_sigmask(SIG_SETMASK) failed restoring mask after pcomm \
2379                     container fork failure; SIGUSR1 may remain blocked in this thread",
2380                );
2381            }
2382            unsafe {
2383                libc::close(report_fds[0]);
2384                libc::close(report_fds[1]);
2385                libc::close(start_fds[0]);
2386                libc::close(start_fds[1]);
2387            }
2388            anyhow::bail!(
2389                "pcomm container (pcomm={pcomm:?}): fork failed: {}",
2390                std::io::Error::last_os_error(),
2391            );
2392        }
2393        0 => {
2394            // Container child. Mirror the conventional fork worker's
2395            // post-fork init: PDEATHSIG, getppid orphan check,
2396            // setpgid, SIGUSR1 handler install + mask restore. This
2397            // is the exact same defensive sequence; the only
2398            // structural difference is what happens AFTER the
2399            // start-byte handshake (thread spawn instead of inline
2400            // worker_main).
2401            unsafe {
2402                libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL);
2403            }
2404            if std::env::var_os(crate::KTSTR_GUEST_INIT_ENV).is_none()
2405                && unsafe { libc::getppid() } == 1
2406            {
2407                exit_child_success();
2408            }
2409            unsafe {
2410                libc::setpgid(0, 0);
2411            }
2412            STOP.store(false, Ordering::Relaxed);
2413            let sig_prev = unsafe {
2414                libc::signal(
2415                    libc::SIGUSR1,
2416                    sigusr1_handler as *const () as libc::sighandler_t,
2417                )
2418            };
2419            if sig_prev == libc::SIG_ERR {
2420                let errno = std::io::Error::last_os_error();
2421                eprintln!(
2422                    "ktstr: signal(SIGUSR1) install failed in pcomm container: {errno}; \
2423                     graceful stop unavailable, killpg escalation will reap"
2424                );
2425            }
2426            let psm_unblock_rc = unsafe {
2427                libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
2428            };
2429            if psm_unblock_rc != 0 {
2430                eprintln!(
2431                    "ktstr: pthread_sigmask(SIG_SETMASK) unblock failed in pcomm \
2432                     container: rc={psm_unblock_rc}; SIGUSR1 stays blocked, killpg \
2433                     escalation will reap"
2434                );
2435            }
2436            // Close the parent's ends. Keep report_fds[1] (the
2437            // container's write end) and start_fds[0] (the
2438            // container's read end).
2439            unsafe {
2440                libc::close(report_fds[0]);
2441                libc::close(start_fds[1]);
2442            }
2443
2444            // Inherited-fd sweep. The container forked from the
2445            // harness inherits every fd in the parent's table —
2446            // including pipe fds from OTHER live `WorkloadHandle`s
2447            // and any harness-only fds (tracing subscribers etc.).
2448            // Close everything not in the keep-set:
2449            //   - 0 / 1 / 2 (stdio).
2450            //   - `report_fds[1]` (this container's write end).
2451            //   - `start_fds[0]` (this container's read end).
2452            //   - This spawn's `pipe_pairs` (per-pair inter-worker
2453            //     fds) and `chain_pipes` (per-chain ring fds) —
2454            //     worker threads will use them once they start.
2455            //
            // MAP_SHARED regions (futex / iter_counters) are mmap
            // memory that stays shared across fork(), not fd entries; nothing
2458            // to keep on that axis. The sweep walks
2459            // `/proc/self/fd` once and uses raw `libc::close` on
2460            // each non-kept fd; failure to close (EBADF) is benign
2461            // — the entry is gone by the time we get to it. Heap
2462            // allocation here is acceptable post-fork (the
2463            // conventional fork worker also heap-allocates in
2464            // `worker_main`).
2465            let mut keep_fds: std::collections::BTreeSet<i32> = std::collections::BTreeSet::new();
2466            keep_fds.insert(0);
2467            keep_fds.insert(1);
2468            keep_fds.insert(2);
2469            keep_fds.insert(report_fds[1]);
2470            keep_fds.insert(start_fds[0]);
2471            for (ab, ba) in &guard.pipe_pairs {
2472                keep_fds.insert(ab[0]);
2473                keep_fds.insert(ab[1]);
2474                keep_fds.insert(ba[0]);
2475                keep_fds.insert(ba[1]);
2476            }
2477            for chain in &guard.chain_pipes {
2478                for pipe in chain {
2479                    keep_fds.insert(pipe[0]);
2480                    keep_fds.insert(pipe[1]);
2481                }
2482            }
2483            if let Ok(entries) = std::fs::read_dir("/proc/self/fd") {
2484                let mut to_close: Vec<i32> = Vec::new();
2485                for entry in entries.flatten() {
2486                    if let Some(name) = entry.file_name().to_str()
2487                        && let Ok(fd) = name.parse::<i32>()
2488                        && !keep_fds.contains(&fd)
2489                    {
2490                        to_close.push(fd);
2491                    }
2492                }
2493                for fd in to_close {
2494                    unsafe {
2495                        libc::close(fd);
2496                    }
2497                }
2498            } else {
2499                eprintln!(
2500                    "ktstr: pcomm container (pcomm={pcomm:?}): /proc/self/fd \
2501                     read_dir failed; inherited-fd sweep skipped",
2502                );
2503            }
2504
2505            // prctl(PR_SET_NAME, pcomm). Setting it on the container's
2506            // tgid leader makes `task->group_leader->comm == pcomm`
2507            // for every spawned thread (kernel/sys.c::prctl_set_name).
2508            // The WorkSpec::pcomm builder rejects empty strings,
2509            // interior NULs, and names > 15 bytes (TASK_COMM_LEN - 1)
2510            // at declaration time, so neither CString construction
2511            // nor `__set_task_comm`'s `min(strlen, TASK_COMM_LEN-1)`
2512            // truncation can fire for any value that arrives here.
2513            // `unwrap_or_default` is a defensive fall-through to an
2514            // empty CString — the kernel accepts it; the leader's
2515            // comm is set to the empty string and the failure is
2516            // surfaced to stderr.
2517            let c_pcomm = std::ffi::CString::new(pcomm).unwrap_or_default();
2518            let prctl_rc = unsafe { libc::prctl(libc::PR_SET_NAME, c_pcomm.as_ptr()) };
2519            if prctl_rc != 0 {
2520                let errno = std::io::Error::last_os_error();
2521                eprintln!(
2522                    "ktstr: prctl(PR_SET_NAME, {pcomm:?}) failed on pcomm container: {errno}",
2523                );
2524            }
2525
2526            // Apply container-level credentials. Order matters:
2527            // setresgid first (while still uid=0 we can change the
2528            // primary group), setresuid second (after the uid drop
2529            // the process loses CAP_SETGID and a later gid change
2530            // would EPERM). Failures are surfaced to stderr but do
2531            // not halt the container — the threads inside still
2532            // run with the inherited credentials, and worker_main's
2533            // own setresuid / setresgid calls (which run after this)
2534            // act as a second-line defense for any per-thread merge
2535            // that produced a different value than the container
2536            // default.
2537            if let Some(gid) = container_gid {
2538                let rc = unsafe { libc::setresgid(gid, gid, gid) };
2539                if rc != 0 {
2540                    let errno = std::io::Error::last_os_error();
2541                    eprintln!("ktstr: setresgid({gid}) failed on pcomm container: {errno}");
2542                }
2543            }
2544            if let Some(uid) = container_uid {
2545                let rc = unsafe { libc::setresuid(uid, uid, uid) };
2546                if rc != 0 {
2547                    let errno = std::io::Error::last_os_error();
2548                    eprintln!("ktstr: setresuid({uid}) failed on pcomm container: {errno}");
2549                }
2550            }
2551
2552            // Wait for the parent's start byte (poll with 30s
2553            // timeout — same budget as the conventional fork
2554            // worker). Match the conventional worker's behaviour:
2555            // on `poll <= 0` call exit_child_error so the parent's
2556            // collect path observes a missing report and emits
2557            // sentinels.
2558            let mut pfd = libc::pollfd {
2559                fd: start_fds[0],
2560                events: libc::POLLIN,
2561                revents: 0,
2562            };
2563            let ret = unsafe { libc::poll(&mut pfd, 1, 30_000) };
2564            if ret <= 0 {
2565                exit_child_error();
2566            }
2567            let mut buf = [0u8; 1];
2568            {
2569                let mut f = unsafe { std::fs::File::from_raw_fd(start_fds[0]) };
2570                let _ = f.read_exact(&mut buf);
2571                drop(f);
2572            }
2573
2574            // Unlike fork workers, the pcomm container spawns threads after
2575            // the start byte. A latched STOP from a SIGUSR1 race during mask
2576            // restore would produce N zero-work threads. Reset so threads
2577            // get a fair start.
2578            STOP.store(false, Ordering::Relaxed);
2579
2580            // Spawn worker threads for every group. Each thread
2581            // computes its own affinity / pipe-fd / futex /
2582            // iter_slot from its (group_idx, within-group index)
2583            // pair, matching the conventional fork loop's per-worker
2584            // computation but inside the container's address space
2585            // (so threads share the parent-allocated MAP_SHARED
2586            // regions and inter-worker pipes inherited via fork).
2587            // Each spawned thread gets its own `EventFd` cloned into
2588            // the closure. A `WorkerExitSignal` Drop guard inside the
2589            // closure writes 1 to that fd as the closure unwinds —
2590            // covering both the normal-return and panic paths. The
2591            // container's join phase below `epoll_wait`s on every
2592            // eventfd, so a thread's exit edge is delivered without
2593            // any polling sleep. This mirrors `join_thread_with_timeout`'s
2594            // pattern but without a timer fd: the parent's
2595            // `stop_and_collect` SIGKILL is the only timeout authority,
2596            // per the user's "event-driven only, no sleeps or
2597            // timeouts" rule.
2598            let mut joins: Vec<(
2599                std::thread::JoinHandle<WorkerReport>,
2600                std::sync::Arc<vmm_sys_util::eventfd::EventFd>,
2601            )> = Vec::with_capacity(total_workers);
2602            // Snapshot the guard's resource bookkeeping we need
2603            // inside each thread closure. The guard is borrowed
2604            // mutably by the surrounding `spawn_pcomm_cgroup` and
2605            // dropped on the parent side after the function returns;
2606            // after fork the container has its own copy of every
2607            // field, and these snapshots are clones owned by the
2608            // container's process so the closures can capture them
2609            // without aliasing the guard.
2610            let pipe_pairs_snapshot: Vec<([i32; 2], [i32; 2])> = guard.pipe_pairs.clone();
2611            let chain_pipes_snapshot: Vec<Vec<[i32; 2]>> = guard.chain_pipes.clone();
2612            let futex_ptrs_snapshot: Vec<*mut u32> = guard.futex_ptrs.clone();
2613            let iter_counters_base = guard.iter_counters;
2614            let phase_epoch_base = guard.phase_epoch;
2615
2616            for (group, res) in groups.iter().zip(resources.iter()) {
2617                let num_workers = group.num_workers;
2618                for i in 0..num_workers {
2619                    // Per-thread affinity resolution. Random-subset
2620                    // affinity samples a fresh subset per thread, so
2621                    // each call produces an independent BTreeSet.
2622                    // Fixed affinity returns the same BTreeSet for
2623                    // every thread.
2624                    //
2625                    // resolve_affinity bails on caller bugs (count==0,
2626                    // empty pool). In the pcomm container's forked-
2627                    // child context we cannot propagate Err up; the
2628                    // convention is eprintln + [`exit_child_error`]
2629                    // (the named helper used by every forked-child
2630                    // failure site in this file — start-byte
2631                    // poll-timeout bail, EventFd::new bail,
2632                    // thread::Builder::spawn bail) so the parent
2633                    // collects a missing-report rather than silently
2634                    // spawning workers without the affinity the test
2635                    // author requested. The prior degrade-to-None
2636                    // behaviour was a silent-drop bug we close here.
2637                    let affinity = match resolve_affinity(&group.affinity) {
2638                        Ok(a) => a,
2639                        Err(e) => {
2640                            eprintln!(
2641                                "ktstr: pcomm container (group {}): resolve_affinity \
2642                                 for thread {i}/{num_workers} failed: {e:#}. \
2643                                 Aborting container — refusing to spawn workers \
2644                                 without the requested affinity (no silent drops).",
2645                                group.group_idx,
2646                            );
2647                            // exit_group(2) tears down every sibling
2648                            // worker thread spawned in earlier loop
2649                            // iterations along with this one — the
2650                            // intended fail-stop behaviour (parent
2651                            // observes a missing report and surfaces
2652                            // a sentinel).
2653                            exit_child_error();
2654                        }
2655                    };
2656
2657                    let worker_pipe_fds: Option<(i32, i32)> = if res.needs_pipes {
2658                        let pair_idx = res.pipe_pair_base + i / 2;
2659                        let (ab, ba) = &pipe_pairs_snapshot[pair_idx];
2660                        if i % 2 == 0 {
2661                            Some((ba[0], ab[1]))
2662                        } else {
2663                            Some((ab[0], ba[1]))
2664                        }
2665                    } else if let Some(depth) = res.chain_depth
2666                        && depth > 0
2667                    {
2668                        let chain_idx = res.chain_pipes_base + i / depth;
2669                        let stage = i % depth;
2670                        let prev_stage = (stage + depth - 1) % depth;
2671                        let chain = &chain_pipes_snapshot[chain_idx];
2672                        Some((chain[prev_stage][0], chain[stage][1]))
2673                    } else {
2674                        None
2675                    };
2676
2677                    let worker_futex: Option<(*mut u32, usize)> = if res.needs_futex {
2678                        let futex_group_idx = res.futex_ptrs_base + i / res.futex_group_size;
2679                        let pos = i % res.futex_group_size;
2680                        Some((futex_ptrs_snapshot[futex_group_idx], pos))
2681                    } else {
2682                        None
2683                    };
2684
2685                    let iter_slot: *mut AtomicU64 = if !iter_counters_base.is_null() {
2686                        unsafe { iter_counters_base.add(res.iter_offset + i) }
2687                    } else {
2688                        std::ptr::null_mut()
2689                    };
2690
2691                    // Round raw pointers through Send-newtypes so
2692                    // the closure's auto-Send check passes (same
2693                    // wrapper pattern `spawn_thread_worker` uses).
2694                    let futex_send = SendFutexPtr::new(worker_futex);
2695                    let iter_slot_send = SendIterSlotPtr::new(iter_slot);
2696                    let phase_epoch_send = SendPhaseEpochPtr::new(phase_epoch_base);
2697
2698                    let work_type = group.work_type.clone();
2699                    let sched_policy = group.sched_policy;
2700                    let mem_policy = group.mem_policy.clone();
2701                    let mpol_flags = group.mpol_flags;
2702                    let nice = group.nice;
2703                    let comm = group.comm.clone();
2704                    let uid = group.uid;
2705                    let gid = group.gid;
2706                    let numa_node = group.numa_node;
2707                    let group_idx = group.group_idx;
2708
2709                    // Per-thread exit eventfd. Cloned into the
2710                    // closure as `Arc<EventFd>`; the closure-local
2711                    // `WorkerExitSignal` Drop guard writes 1 to the
2712                    // fd on every exit path (normal return AND
2713                    // panic). The container's join phase below
2714                    // `epoll_wait`s on every eventfd to deliver the
2715                    // exit edge without polling. EFD_NONBLOCK keeps
2716                    // the Drop-time `write` from blocking — counter
2717                    // mode means the writer just bumps the counter
2718                    // without losing the edge.
2719                    let exit_evt = match vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK) {
2720                        Ok(efd) => std::sync::Arc::new(efd),
2721                        Err(e) => {
2722                            // Hard fail: continuing with fewer
2723                            // threads silently breaks the
2724                            // workload's worker count and the
2725                            // parent's report count. exit_child_error
2726                            // so the parent's sentinel path observes
2727                            // a missing payload and emits one
2728                            // sentinel per expected report. Reuses
2729                            // the same fail-stop contract as the
2730                            // start-byte poll timeout (`ret <=
2731                            // 0`).
2732                            eprintln!(
2733                                "ktstr: pcomm container (group {}): EventFd::new for \
2734                                 worker {}/{num_workers} failed: {e}; aborting container",
2735                                group.group_idx,
2736                                i + 1,
2737                            );
2738                            exit_child_error();
2739                        }
2740                    };
2741                    let exit_evt_thread = std::sync::Arc::clone(&exit_evt);
2742
2743                    let join = std::thread::Builder::new()
2744                        .name(format!("ktstr-pcomm-g{group_idx}-{i}"))
2745                        .spawn(move || {
2746                            // Drop guard: bumps the eventfd as the
2747                            // closure unwinds. The write is
2748                            // non-blocking (EFD_NONBLOCK) and the
2749                            // counter accumulates, so a missed
2750                            // read just queues the next read's
2751                            // value — no edge loss. Drop runs
2752                            // under both normal return and
2753                            // unwinding (panic), so the container's
2754                            // join phase observes EVERY thread's
2755                            // exit, including panicked ones.
2756                            struct WorkerExitSignal(std::sync::Arc<vmm_sys_util::eventfd::EventFd>);
2757                            impl Drop for WorkerExitSignal {
2758                                fn drop(&mut self) {
2759                                    let _ = self.0.write(1);
2760                                }
2761                            }
2762                            let _exit_signal = WorkerExitSignal(exit_evt_thread);
2763
2764                            let futex = futex_send.into_raw();
2765                            let slot = iter_slot_send.into_raw();
2766                            let epoch = phase_epoch_send.into_raw();
2767                            worker_main(
2768                                affinity,
2769                                work_type,
2770                                sched_policy,
2771                                mem_policy,
2772                                mpol_flags,
2773                                nice,
2774                                comm.as_deref(),
2775                                uid,
2776                                gid,
2777                                numa_node,
2778                                worker_pipe_fds,
2779                                futex,
2780                                slot,
2781                                epoch,
2782                                &STOP,
2783                                group_idx,
2784                            )
2785                        });
2786                    match join {
2787                        Ok(j) => joins.push((j, exit_evt)),
2788                        Err(e) => {
2789                            // Hard fail: see EventFd path above.
2790                            // A partially-spawned container leaves
2791                            // the parent guessing about which
2792                            // reports are real vs missing;
2793                            // exit_child_error collapses to the
2794                            // parent's sentinel path with
2795                            // deterministic cardinality.
2796                            eprintln!(
2797                                "ktstr: pcomm container (group {}): thread::spawn for \
2798                                 worker {}/{num_workers} failed: {e}; aborting container",
2799                                group.group_idx,
2800                                i + 1,
2801                            );
2802                            exit_child_error();
2803                        }
2804                    }
2805                }
2806            }
2807
2808            // Publish the ready byte AFTER every worker thread is
2809            // alive. The parent's barrier polls every report fd for
2810            // POLLIN with a bounded deadline; the byte's correct
2811            // semantic is "the container has spawned every worker
2812            // thread and they are now running" — earlier (pre-spawn)
2813            // is observable to the parent before threads exist, which
2814            // races with `set_affinity(idx)` / `worker_pids()` calls
2815            // the harness may issue between the parent's barrier wake
2816            // and the work loop. Each thread starts work immediately
2817            // upon spawn (no per-thread handshake gate) since the
2818            // CONTAINER's start-byte poll above is the workload's
2819            // single start gate; once the container observes the
2820            // start byte, every thread it spawns is by construction
2821            // intended to run. Done as a raw `libc::write` to avoid
2822            // taking ownership of `report_fds[1]` (we still need it
2823            // for the JSON write below).
2824            let ready_byte: u8 = b'r';
2825            unsafe {
2826                libc::write(
2827                    report_fds[1],
2828                    &ready_byte as *const u8 as *const libc::c_void,
2829                    1,
2830                );
2831            }
2832
2833            // Join every thread via `epoll_wait` on per-thread exit
2834            // eventfds. Each thread's `WorkerExitSignal` Drop guard
2835            // writes 1 to its fd as the closure unwinds (normal
2836            // return AND panic), so the container's `epoll_wait`
2837            // observes EVERY thread's exit edge without polling.
2838            // No timeout: per the user's "event-driven only, no
2839            // sleeps or timeouts" rule, the container blocks in
2840            // `epoll_wait(-1)` until each fd fires. The parent's
2841            // `stop_and_collect` 5s collect deadline + SIGKILL is
2842            // the only timeout authority; if a thread hangs (no
2843            // Drop guard fires — no realistic path to that under
2844            // the closures we spawn), the parent SIGKILLs the
2845            // container, the kernel tears down every thread, and
2846            // the container exits via the signal.
2847            //
2848            // Setup-failure fallback: if `Epoll::new` or any
2849            // `epoll.ctl(Add)` fails, fall back to blocking
2850            // `JoinHandle::join` per thread — that itself is
2851            // event-driven from the kernel's perspective (the
2852            // syscall blocks on the thread's exit signal, no
2853            // userspace polling). The fallback path is rare
2854            // (epoll_create1 failure means out of fds or out of
2855            // memory, both of which are pre-existing failure modes
2856            // the conventional fork worker also has).
2857            use std::os::unix::io::AsRawFd;
2858            use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
2859            let mut indexed_joins: Vec<(
2860                usize,
2861                std::thread::JoinHandle<WorkerReport>,
2862                std::sync::Arc<vmm_sys_util::eventfd::EventFd>,
2863            )> = joins
2864                .into_iter()
2865                .enumerate()
2866                .map(|(i, (j, evt))| (i, j, evt))
2867                .collect();
2868            let mut reports: Vec<WorkerReport> = Vec::with_capacity(indexed_joins.len());
2869
2870            let epoll_setup: Option<Epoll> = match Epoll::new() {
2871                Ok(ep) => {
2872                    let mut ok = true;
2873                    for (idx, _, evt) in &indexed_joins {
2874                        if let Err(e) = ep.ctl(
2875                            ControlOperation::Add,
2876                            evt.as_raw_fd(),
2877                            EpollEvent::new(EventSet::IN, *idx as u64),
2878                        ) {
2879                            eprintln!(
2880                                "ktstr: pcomm container (pcomm={pcomm:?}): epoll.ctl(Add) \
2881                                 for thread {idx} failed: {e}; falling back to blocking \
2882                                 join per thread",
2883                            );
2884                            ok = false;
2885                            break;
2886                        }
2887                    }
2888                    if ok { Some(ep) } else { None }
2889                }
2890                Err(e) => {
2891                    eprintln!(
2892                        "ktstr: pcomm container (pcomm={pcomm:?}): Epoll::new failed: {e}; \
2893                         falling back to blocking join per thread",
2894                    );
2895                    None
2896                }
2897            };
2898
2899            if let Some(epoll) = epoll_setup {
2900                // Event-driven join loop. `epoll_wait(-1, …)` blocks
2901                // until at least one eventfd fires, which only
2902                // happens when a thread's `WorkerExitSignal` Drop
2903                // guard runs. The wake delivers `events[k].data()`
2904                // as the index of the finished thread; we
2905                // deregister, locate the entry, and `join()` it
2906                // (the `join()` call may briefly wait for the OS
2907                // thread to fully exit after the Drop guard, but
2908                // that wait is itself event-driven inside the
2909                // kernel — no userspace polling).
2910                let mut events_buf: Vec<EpollEvent> =
2911                    vec![EpollEvent::default(); indexed_joins.len().max(1)];
2912                while !indexed_joins.is_empty() {
2913                    let n = match epoll.wait(-1, &mut events_buf) {
2914                        Ok(n) => n,
2915                        Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
2916                        Err(e) => {
2917                            eprintln!(
2918                                "ktstr: pcomm container (pcomm={pcomm:?}): epoll.wait \
2919                                 failed: {e}; falling back to blocking join for the \
2920                                 remaining {} thread(s)",
2921                                indexed_joins.len(),
2922                            );
2923                            break;
2924                        }
2925                    };
2926                    for ev in &events_buf[..n] {
2927                        let target_idx = ev.data() as usize;
2928                        let pos = match indexed_joins
2929                            .iter()
2930                            .position(|(idx, _, _)| *idx == target_idx)
2931                        {
2932                            Some(p) => p,
2933                            None => continue,
2934                        };
2935                        let (idx, j, evt) = indexed_joins.swap_remove(pos);
2936                        if let Err(e) = epoll.ctl(
2937                            ControlOperation::Delete,
2938                            evt.as_raw_fd(),
2939                            EpollEvent::default(),
2940                        ) {
2941                            eprintln!(
2942                                "ktstr: pcomm container (pcomm={pcomm:?}): epoll.ctl(Del) \
2943                                 for thread {idx} failed: {e}",
2944                            );
2945                        }
2946                        let _ = evt.read();
2947                        match j.join() {
2948                            Ok(r) => reports.push(r),
2949                            Err(payload) => {
2950                                let msg = extract_panic_payload(payload);
2951                                eprintln!(
2952                                    "ktstr: pcomm container (pcomm={pcomm:?}): thread {idx} \
2953                                     panicked: {msg}",
2954                                );
2955                                reports.push(WorkerReport {
2956                                    completed: false,
2957                                    exit_info: Some(WorkerExitInfo::Panicked(msg)),
2958                                    ..WorkerReport::default()
2959                                });
2960                            }
2961                        }
2962                    }
2963                }
2964            }
2965            // Either the epoll path completed (drained
2966            // `indexed_joins`) or it bailed mid-flight; either way,
2967            // any remaining handles are joined by the kernel-blocking
2968            // `JoinHandle::join` below. `join` is event-driven inside
2969            // the kernel (futex on the thread's TID); no userspace
2970            // polling.
2971            for (idx, j, _evt) in indexed_joins {
2972                match j.join() {
2973                    Ok(r) => reports.push(r),
2974                    Err(payload) => {
2975                        let msg = extract_panic_payload(payload);
2976                        eprintln!(
2977                            "ktstr: pcomm container (pcomm={pcomm:?}): thread {idx} \
2978                             panicked: {msg}",
2979                        );
2980                        reports.push(WorkerReport {
2981                            completed: false,
2982                            exit_info: Some(WorkerExitInfo::Panicked(msg)),
2983                            ..WorkerReport::default()
2984                        });
2985                    }
2986                }
2987            }
2988
2989            // Encode and write the report stream as a single
2990            // `Vec<WorkerReport>` JSON document via `serde_json`.
2991            // serde_json for the pcomm `Vec<WorkerReport>`; the
2992            // fork-mode workers use postcard for the single
2993            // `WorkerReport`. The pcomm container is a
2994            // fork-mode child and its payload sits on the same
2995            // per-child pipe used by every other forked worker.
2996            // The parent decodes via
2997            // `serde_json::from_slice::<Vec<WorkerReport>>`. Encode
2998            // failures fall through to exit_child_success — the
2999            // parent's sentinel path handles missing / truncated
3000            // payloads.
3001            // tracing is unsafe in a forked child (the parent's
3002            // subscriber may hold a lock); use eprintln + raw fd.
3003            let bytes = match serde_json::to_vec(&reports) {
3004                Ok(v) => v,
3005                Err(e) => {
3006                    eprintln!(
3007                        "ktstr: pcomm container (pcomm={pcomm:?}): serde_json encode \
3008                         of {} reports failed: {e}",
3009                        reports.len(),
3010                    );
3011                    Vec::new()
3012                }
3013            };
3014            {
3015                let mut f = unsafe { std::fs::File::from_raw_fd(report_fds[1]) };
3016                if let Err(e) = f.write_all(&bytes) {
3017                    eprintln!(
3018                        "ktstr: pcomm container (pcomm={pcomm:?}): report write_all \
3019                         of {} bytes failed: {e}",
3020                        bytes.len(),
3021                    );
3022                }
3023                drop(f);
3024            }
3025            exit_child_success();
3026        }
3027        child_pid => {
3028            // Parent. Restore signal mask, close the wrong ends,
3029            // record the container in `guard.children` so its
3030            // lifecycle (kill on bail / collect on stop) is shared
3031            // with conventional workers.
3032            let psm_parent_restore_rc = unsafe {
3033                libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
3034            };
3035            if psm_parent_restore_rc != 0 {
3036                tracing::warn!(
3037                    rc = psm_parent_restore_rc,
3038                    "pthread_sigmask(SIG_SETMASK) failed restoring mask in parent \
3039                     post-pcomm-fork; SIGUSR1 stays blocked in this thread for \
3040                     the lifetime of the workload",
3041                );
3042            }
3043            unsafe {
3044                libc::close(report_fds[1]);
3045                libc::close(start_fds[0]);
3046            }
3047            // Per-group layout for sentinel distribution. Each
3048            // entry is `(group_idx, num_workers)`; total report
3049            // count is the sum of `num_workers` across the layout
3050            // (== `total_workers` invariant). When the parent
3051            // emits sentinels for a missing JSON payload, the
3052            // layout drives per-group_idx tagging so per-group
3053            // filters partition correctly even on a failure.
3054            let group_layout: Vec<(usize, usize)> = groups
3055                .iter()
3056                .map(|g| (g.group_idx, g.num_workers))
3057                .collect();
3058            debug_assert_eq!(
3059                group_layout.iter().map(|(_, n)| n).sum::<usize>(),
3060                total_workers,
3061                "spawn_pcomm_container: group_layout total must match total_workers",
3062            );
3063            guard.children.push(ForkedChild {
3064                pid: child_pid,
3065                report_fd: report_fds[0],
3066                start_fd: start_fds[1],
3067                kind: ForkedChildKind::PcommContainer {
3068                    groups: group_layout,
3069                },
3070            });
3071            Ok(())
3072        }
3073    }
3074}
3075
/// Internal dispatch shape resolved from
/// [`WorkloadConfig::clone_mode`] inside [`WorkloadHandle::spawn`].
///
/// `spawn` maps [`CloneMode::Fork`] to [`Dispatch::Fork`] and
/// [`CloneMode::Thread`] to [`Dispatch::Thread`], then gates its
/// per-group `WorkType` admission checks on the resolved value.
pub(super) enum Dispatch {
    /// Resolved from [`CloneMode::Fork`]. `spawn` rejects
    /// `WorkType::EpollStorm` groups under this dispatch.
    Fork,
    /// Resolved from [`CloneMode::Thread`]. `spawn` rejects
    /// `WorkType::ForkExit`, `WorkType::CgroupChurn`, and
    /// `WorkType::CgroupAttachStorm` groups under this dispatch.
    Thread,
}
3082
3083impl WorkloadHandle {
3084    /// Fork one thread-group leader hosting every worker in `works`
3085    /// as worker threads inside a single forked process. Used by
3086    /// `apply_setup` when a `CgroupDef` declares `pcomm` — every
3087    /// `WorkSpec` in the same `CgroupDef` is coalesced into one
3088    /// thread-group leader whose `task->comm` carries `pcomm`
3089    /// exactly (the [`WorkSpec::pcomm`] builder rejects > 15 bytes
3090    /// — `TASK_COMM_LEN - 1` — so the framework never feeds the
3091    /// kernel a name `__set_task_comm` would truncate). Every
3092    /// spawned thread reads its `task->group_leader->comm` as
3093    /// `pcomm` for the leader's lifetime.
3094    ///
3095    /// `works` must already be fully resolved: each entry's
3096    /// `num_workers` must be `Some(_)` and `affinity` must be
3097    /// non-topology-aware (Inherit / Exact / RandomSubset).
3098    /// `apply_setup` runs the standard scenario-engine resolution
3099    /// (`resolve_num_workers` + `intent_for_spawn`) before calling
3100    /// in.
3101    ///
3102    /// `container_uid` / `container_gid` apply to the leader's
3103    /// process credentials via `setresuid` / `setresgid` once,
3104    /// inside the forked leader, before threads spawn. Each worker
3105    /// thread additionally re-applies its merged uid/gid inside
3106    /// `worker_main` at thread creation time; for the common case
3107    /// of a single CgroupDef-level default flowing through
3108    /// `merged_works`, the per-thread call is idempotent.
3109    ///
3110    /// On `works.is_empty()` the function returns a handle with no
3111    /// children — no fork, no resource allocation. Same for the
3112    /// empty-thread case across ALL groups (every `num_workers ==
3113    /// 0`): the leader is skipped because there is no work to host.
3114    /// This matches the `pcomm_zero_workers_no_container_spawn`
3115    /// contract.
3116    ///
3117    /// Group-level admission rejects WorkType variants that conflict
3118    /// with the threaded shape (every worker shares the leader's
3119    /// tgid):
3120    ///
3121    /// - [`WorkType::ForkExit`]: a fork from a thread of a
3122    ///   multi-threaded process inherits all locks held by other
3123    ///   threads at fork time, which the child cannot release;
3124    ///   safe-but-degraded use cases would still need
3125    ///   `CloneMode::Fork` for clean lock-free child state.
3126    /// - [`WorkType::CgroupChurn`]: writing the worker tid to
3127    ///   `cgroup.procs` migrates the entire leader tgid (every
3128    ///   sibling thread) — the test loses control of its own
3129    ///   cgroup placement.
3130    pub fn spawn_pcomm_cgroup(
3131        pcomm: &str,
3132        container_uid: Option<u32>,
3133        container_gid: Option<u32>,
3134        works: &[WorkSpec],
3135    ) -> Result<Self> {
3136        // Empty pcomm input is treated as "no pcomm" by the
3137        // apply_setup caller; reject here as a safety belt — every
3138        // production caller has already filtered empty strings.
3139        if pcomm.is_empty() {
3140            anyhow::bail!(
3141                "spawn_pcomm_cgroup: pcomm must be a non-empty string; \
3142                 the caller (apply_setup) treats `Some(\"\")` as \
3143                 `None` and falls through to the conventional fork \
3144                 spawn — empty here is a programmer error.",
3145            );
3146        }
3147
3148        // Structural mem_policy validation per-spec — same gate as
3149        // [`WorkloadHandle::spawn`]'s [`WorkloadConfig::validate`],
3150        // applied at the second public spawn entry point. Closes the
3151        // bypass that would otherwise allow invalid mem_policy via
3152        // this entry to reach `apply_mempolicy_with_flags`'s silent-
3153        // skip arm. The `exit_group(2)` thread-mode unsoundness
3154        // documented on [`WorkloadConfig::validate`] does not apply
3155        // here (pcomm-container workers run inside a forked container
3156        // with a new tgid distinct from the test runner's — the
3157        // container forks once via `libc::fork`, then workers are
3158        // spawned as threads sharing the container's tgid; a
3159        // hypothetical inner `_exit(1)` would only tear down the
3160        // container, not the test runner), but the validation
3161        // symmetry between the two pub entries prevents future
3162        // refactors from inadvertently re-opening the bypass.
3163        for (i, spec) in works.iter().enumerate() {
3164            spec.mem_policy.validate().map_err(|e| {
3165                anyhow::anyhow!("WorkloadHandle::spawn_pcomm_cgroup: works[{i}].mem_policy: {e}",)
3166            })?;
3167        }
3168
3169        // Build a `GroupParams` per `WorkSpec` using the same
3170        // resolver `WorkloadHandle::spawn` uses for composed
3171        // entries. group_idx is the entry's position in `works` (0-
3172        // based); the parent has no notion of "primary" inside a
3173        // pcomm container — every entry is a peer.
3174        let mut groups: Vec<GroupParams> = Vec::with_capacity(works.len());
3175        for (i, spec) in works.iter().enumerate() {
3176            let num_workers = spec.num_workers.ok_or_else(|| {
3177                anyhow::anyhow!(
3178                    "spawn_pcomm_cgroup: works[{i}].num_workers must be set explicitly \
3179                     (apply_setup runs resolve_num_workers before calling in)",
3180                )
3181            })?;
3182            let site = format!("works[{i}].affinity");
3183            let affinity = GroupParams::resolve_spawn_affinity(&spec.affinity, &site)?;
3184            groups.push(GroupParams::from_work_spec(spec, i, affinity, num_workers));
3185        }
3186
3187        // Per-group admission. Mirrors `WorkloadHandle::spawn` for
3188        // shared rules (worker_group_size divisibility,
3189        // chain-depth-< 2, IdleChurn / IpcVariance zero rejection)
3190        // and adds pcomm-specific rejections (ForkExit,
3191        // CgroupChurn) since the container's threaded shape —
3192        // every worker is a thread of one forked tgid sharing one
3193        // fd table — introduces hazards those variants cannot
3194        // tolerate.
3195        for group in &groups {
3196            if matches!(group.work_type, WorkType::ForkExit) {
3197                anyhow::bail!(
3198                    "WorkSpec::pcomm is incompatible with WorkType::ForkExit \
3199                     (works[{}]): a fork from a thread of a multi-threaded \
3200                     container inherits all locks held by sibling threads at \
3201                     fork time, producing undefined behaviour for any libc \
3202                     primitive in the child. Drop pcomm or pick a different \
3203                     work type.",
3204                    group.group_idx,
3205                );
3206            }
3207            if matches!(group.work_type, WorkType::CgroupChurn { .. }) {
3208                anyhow::bail!(
3209                    "WorkSpec::pcomm is incompatible with WorkType::CgroupChurn \
3210                     (works[{}]): CgroupChurn writes the worker tid to \
3211                     `cgroup.procs`, which the kernel resolves to the whole \
3212                     tgid and migrates the entire pcomm container (every \
3213                     sibling thread). Drop pcomm or pick a different work type.",
3214                    group.group_idx,
3215                );
3216            }
3217            validate_workload_admission(group)?;
3218        }
3219
3220        // No-work shortcut: every group has zero workers (or the
3221        // works list itself is empty). Fork would produce a
3222        // container that blocks on the start byte forever and
3223        // outlives the test handle; skip it. The handle still
3224        // has a sensible Drop (empty children/threads/regions).
3225        let total_workers: usize = groups.iter().map(|g| g.num_workers).sum();
3226        if total_workers == 0 {
3227            return Ok(SpawnGuard::new().into_handle());
3228        }
3229
3230        // All failable acquisitions route through `guard`. If any
3231        // `?` returns early, the guard's Drop SIGKILLs+reaps any
3232        // already-forked container, closes open pipe fds, and
3233        // munmaps the shared regions — so no leak on a mid-spawn
3234        // error path.
3235        let mut guard = SpawnGuard::new();
3236
3237        // Per-worker iteration counter region (MAP_SHARED). Sized
3238        // for ALL groups' workers laid out contiguously; matches
3239        // the `WorkloadHandle::spawn` allocation but with a
3240        // pcomm-specific diagnostic prefix.
3241        let size = total_workers * std::mem::size_of::<AtomicU64>();
3242        let ptr = unsafe {
3243            libc::mmap(
3244                std::ptr::null_mut(),
3245                size,
3246                libc::PROT_READ | libc::PROT_WRITE,
3247                libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3248                -1,
3249                0,
3250            )
3251        };
3252        if ptr == libc::MAP_FAILED {
3253            let errno = std::io::Error::last_os_error();
3254            let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3255            anyhow::bail!(
3256                "mmap(MAP_SHARED|MAP_ANONYMOUS, {size} bytes) for the \
3257                 per-worker iter_counters region failed: {errno}{hint}; \
3258                 this region holds one AtomicU64 per worker thread \
3259                 ({total_workers} thread(s) inside the pcomm={pcomm:?} \
3260                 container) so the parent can snapshot iteration \
3261                 counts via `snapshot_iterations()`.",
3262            );
3263        }
3264        guard.iter_counters = ptr as *mut AtomicU64;
3265        guard.iter_counter_bytes = size;
3266
3267        // Single shared phase-epoch word (MAP_SHARED, zero-init to
3268        // 0 = BASELINE) — same role as in `WorkloadHandle::spawn`: the
3269        // scenario engine bumps it per step boundary and backdrop
3270        // workers read it for per-phase attribution.
3271        let epoch_size = std::mem::size_of::<AtomicU32>();
3272        let epoch_ptr = unsafe {
3273            libc::mmap(
3274                std::ptr::null_mut(),
3275                epoch_size,
3276                libc::PROT_READ | libc::PROT_WRITE,
3277                libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3278                -1,
3279                0,
3280            )
3281        };
3282        if epoch_ptr == libc::MAP_FAILED {
3283            let errno = std::io::Error::last_os_error();
3284            let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3285            anyhow::bail!(
3286                "mmap(MAP_SHARED|MAP_ANONYMOUS, {epoch_size} bytes) for the \
3287                 phase-epoch word (pcomm={pcomm:?}) failed: {errno}{hint}; \
3288                 this single AtomicU32 broadcasts the current phase to \
3289                 backdrop workers for per-phase attribution.",
3290            );
3291        }
3292        guard.phase_epoch = epoch_ptr as *mut AtomicU32;
3293        guard.phase_epoch_bytes = epoch_size;
3294
3295        // Per-group resource allocation (pipe pairs, chain pipes,
3296        // futex regions). Same shape as `WorkloadHandle::spawn_group`'s
3297        // prologue but inlined here so the per-group bookkeeping
3298        // (`PcommGroupResources`) is built directly for
3299        // `spawn_pcomm_container`. `iter_offset` runs across groups
3300        // and tracks each group's first iter_slot.
3301        let mut resources: Vec<PcommGroupResources> = Vec::with_capacity(groups.len());
3302        let mut iter_offset: usize = 0;
3303        for group in &groups {
3304            let needs_pipes = matches!(
3305                group.work_type,
3306                WorkType::PipeIo { .. } | WorkType::CachePipe { .. }
3307            );
3308            let chain_depth = group.work_type.chain_pipe_depth();
3309            let needs_futex = group.work_type.needs_shared_mem();
3310            let pipe_pair_base = guard.pipe_pairs.len();
3311            let chain_pipes_base = guard.chain_pipes.len();
3312            let futex_ptrs_base = guard.futex_ptrs.len();
3313            let futex_region_size = futex_region_size_for(&group.work_type);
3314            let futex_group_size = group.work_type.worker_group_size().unwrap_or(2);
3315
3316            if needs_pipes {
3317                for _ in 0..group.num_workers / 2 {
3318                    let mut ab = [0i32; 2];
3319                    if unsafe { libc::pipe2(ab.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3320                        anyhow::bail!(
3321                            "pipe2 (pcomm={pcomm:?}, group {}) failed: {}",
3322                            group.group_idx,
3323                            std::io::Error::last_os_error(),
3324                        );
3325                    }
3326                    let mut ba = [0i32; 2];
3327                    if unsafe { libc::pipe2(ba.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3328                        unsafe {
3329                            libc::close(ab[0]);
3330                            libc::close(ab[1]);
3331                        }
3332                        anyhow::bail!(
3333                            "pipe2 (pcomm={pcomm:?}, group {}) failed: {}",
3334                            group.group_idx,
3335                            std::io::Error::last_os_error(),
3336                        );
3337                    }
3338                    guard.pipe_pairs.push((ab, ba));
3339                }
3340            }
3341
3342            if let Some(depth) = chain_depth
3343                && depth > 0
3344                && group.num_workers >= depth
3345            {
3346                let chains = group.num_workers / depth;
3347                for _ in 0..chains {
3348                    let mut chain: Vec<[i32; 2]> = Vec::with_capacity(depth);
3349                    let mut alloc_ok = true;
3350                    for _ in 0..depth {
3351                        let mut p = [0i32; 2];
3352                        if unsafe { libc::pipe2(p.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3353                            alloc_ok = false;
3354                            break;
3355                        }
3356                        chain.push(p);
3357                    }
3358                    if !alloc_ok {
3359                        for p in &chain {
3360                            unsafe {
3361                                libc::close(p[0]);
3362                                libc::close(p[1]);
3363                            }
3364                        }
3365                        anyhow::bail!(
3366                            "WakeChain pipe2 (pcomm={pcomm:?}, group {}) failed: {}",
3367                            group.group_idx,
3368                            std::io::Error::last_os_error(),
3369                        );
3370                    }
3371                    guard.chain_pipes.push(chain);
3372                }
3373            }
3374
3375            if needs_futex {
3376                for _ in 0..group.num_workers / futex_group_size {
3377                    let region = unsafe {
3378                        libc::mmap(
3379                            std::ptr::null_mut(),
3380                            futex_region_size,
3381                            libc::PROT_READ | libc::PROT_WRITE,
3382                            libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3383                            -1,
3384                            0,
3385                        )
3386                    };
3387                    if region == libc::MAP_FAILED {
3388                        let errno = std::io::Error::last_os_error();
3389                        let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3390                        anyhow::bail!(
3391                            "mmap(MAP_SHARED|MAP_ANONYMOUS, {futex_region_size} bytes) \
3392                             for a futex shared-memory region (pcomm={pcomm:?}, group {}) \
3393                             failed: {errno}{hint}",
3394                            group.group_idx,
3395                        );
3396                    }
3397                    unsafe {
3398                        std::ptr::write_bytes(region as *mut u8, 0, futex_region_size);
3399                    }
3400                    guard.futex_ptrs.push(region as *mut u32);
3401                    guard.futex_region_sizes.push(futex_region_size);
3402                }
3403            }
3404
3405            resources.push(PcommGroupResources {
3406                iter_offset,
3407                pipe_pair_base,
3408                chain_pipes_base,
3409                futex_ptrs_base,
3410                needs_pipes,
3411                chain_depth,
3412                needs_futex,
3413                futex_group_size,
3414            });
3415            iter_offset += group.num_workers;
3416        }
3417
3418        // Fork the container and spawn its threads. On success the
3419        // container is registered in `guard.children`; on failure the
3420        // guard's Drop reaps it.
3421        spawn_pcomm_container(
3422            &mut guard,
3423            pcomm,
3424            container_uid,
3425            container_gid,
3426            &groups,
3427            &resources,
3428        )?;
3429
3430        Ok(guard.into_handle())
3431    }
3432
3433    /// Spawn worker tasks. Workers block until
3434    /// [`start()`](Self::start) is called, allowing the caller to
3435    /// move fork-mode workers into cgroups first. The worker creation
3436    /// primitive (`fork` or `std::thread::spawn`) is selected by
3437    /// [`WorkloadConfig::clone_mode`].
3438    pub fn spawn(config: &WorkloadConfig) -> Result<Self> {
3439        // Reject invalid mem_policy before any worker context exists;
3440        // see [`WorkloadConfig::validate`] for the silent-skip
3441        // rationale and why a worker-side `_exit(1)` fix is unsound.
3442        config.validate()?;
3443
3444        let dispatch = match &config.clone_mode {
3445            CloneMode::Fork => Dispatch::Fork,
3446            CloneMode::Thread => Dispatch::Thread,
3447        };
3448
3449        // Build the per-group params list: primary first
3450        // (group_idx == 0), then composed[k] resolved into
3451        // group_idx == k+1. The resolver enforces the "spawn-time
3452        // resolution rules" documented on
3453        // [`WorkloadConfig::composed`] (Q1: num_workers must be
3454        // explicit; Q2: only Inherit/Exact affinity reachable from
3455        // spawn() — topology-aware variants need the scenario
3456        // engine).
3457        //
3458        // Every group inherits the parent
3459        // [`WorkloadConfig::clone_mode`]: SpawnGuard's lifecycle
3460        // assumes a single dispatch path (every guard.children
3461        // entry is a fork-mode child reaped via waitpid; every
3462        // guard.threads entry is a thread-mode worker joined via
3463        // JoinHandle). Mixing modes inside one guard would route
3464        // teardown through the wrong code path, so [`WorkSpec`]
3465        // carries no `clone_mode` field — it is a workload-wide
3466        // property fixed by [`WorkloadConfig::clone_mode`].
3467        let mut groups: Vec<GroupParams> = Vec::with_capacity(1 + config.composed.len());
3468        groups.push(GroupParams::primary(config)?);
3469        for (i, spec) in config.composed.iter().enumerate() {
3470            groups.push(GroupParams::from_composed(spec, i + 1)?);
3471        }
3472
3473        // Per-group admission. Each group's work_type is checked
3474        // independently — a malformed composed entry bails the
3475        // whole workload before any resources are acquired.
3476        for group in &groups {
3477            // Thread mode + ForkExit is incompatible. ForkExit's worker
3478            // body calls `libc::fork()` from inside `worker_main` to
3479            // exercise the fork -> exit -> wait lifecycle. Under
3480            // [`CloneMode::Thread`] the worker is one thread of the
3481            // harness's multi-threaded tgid, and a `fork()` from a thread
3482            // of a multi-threaded process duplicates ONLY the calling
3483            // thread: any lock another harness thread holds at fork time
3484            // (glibc malloc arena, internal mutexes) stays locked forever
3485            // in the child, which has no thread left to release it. The
3486            // child here only `_exit`s (async-signal-safe), so it survives
3487            // this in practice — but the hazard is real and the
3488            // fork -> exit primitive is faithfully exercised only when each
3489            // worker is its own process. (The child does NOT share the
3490            // harness tgid: `fork()` omits CLONE_THREAD, so copy_process
3491            // gives it group_leader=self / tgid=pid, a fresh singleton
3492            // group. Its `libc::_exit(0)` issues exit_group(2) ->
3493            // `do_group_exit` -> `zap_other_threads`, but a singleton tgid
3494            // has no other threads for `zap_other_threads` to SIGKILL, so
3495            // the teardown ends only the child; the harness tgid is
3496            // untouched.) Reject at spawn with an actionable diagnostic;
3497            // CloneMode::Fork gives each worker its own tgid and a
3498            // lock-clean address space.
3499            if matches!(dispatch, Dispatch::Thread) && matches!(group.work_type, WorkType::ForkExit)
3500            {
3501                anyhow::bail!(
3502                    "CloneMode::Thread is incompatible with WorkType::ForkExit \
3503                     (group {}) — ForkExit forks inside the worker, and a fork \
3504                     from a thread of the multi-threaded harness inherits \
3505                     sibling-held locks the child cannot release; only a \
3506                     separate process faithfully exercises the fork/exit \
3507                     lifecycle. Use CloneMode::Fork for ForkExit workloads.",
3508                    group.group_idx,
3509                );
3510            }
3511            // Fork mode + EpollStorm is incompatible. EpollStorm
3512            // creates an eventfd + epoll fd inside worker pos 0 and
3513            // publishes their integer fd numbers through the per-group
3514            // shared mmap region (`efd_slot` / `epfd_slot`); siblings
3515            // load those numbers and operate on them as if they
3516            // referred to the same kernel objects. Under
3517            // [`CloneMode::Fork`] each forked child holds its own copy
3518            // of the parent's fd table at fork time, but the eventfd
3519            // and epoll fd are created AFTER the fork on worker pos 0
3520            // — so sibling children's fd tables never contain those
3521            // descriptors. The integer numbers they read from the
3522            // shared region either resolve to unrelated fds the child
3523            // happened to have at the same slot or fail with EBADF.
3524            // The fd table is genuinely shared only under
3525            // [`CloneMode::Thread`]. Reject at spawn time with an
3526            // actionable diagnostic; CloneMode::Thread is the correct
3527            // choice for EpollStorm.
3528            if matches!(dispatch, Dispatch::Fork)
3529                && matches!(group.work_type, WorkType::EpollStorm { .. })
3530            {
3531                anyhow::bail!(
3532                    "CloneMode::Fork is incompatible with WorkType::EpollStorm \
3533                     (group {}) — EpollStorm publishes eventfd/epoll fd numbers \
3534                     through a shared mmap region for siblings to consume, but \
3535                     forked children hold independent fd tables that never \
3536                     contain those post-fork descriptors. Use CloneMode::Thread \
3537                     for EpollStorm workloads.",
3538                    group.group_idx,
3539                );
3540            }
3541            // Thread mode + CgroupChurn is incompatible. CgroupChurn's
3542            // worker body writes its own tid (`SYS_gettid`) to a
3543            // sibling cgroup's `cgroup.procs` file. The kernel's
3544            // `__cgroup_procs_write` resolves the tid to its
3545            // task_struct, then migrates the *entire* thread group
3546            // leader's task and every member of its tgid to the
3547            // target cgroup (see `cgroup_attach_task` /
3548            // `cgroup_migrate` in kernel/cgroup/cgroup.c — procs-file
3549            // semantics are tgid-wide, contrast `cgroup.threads`
3550            // which is per-thread under the threaded controller).
3551            // Under [`CloneMode::Thread`] every worker is a member of
3552            // the test harness's tgid, so the first CgroupChurn write
3553            // migrates the harness itself and every sibling worker
3554            // thread out from under the host. Reject at spawn time
3555            // with an actionable diagnostic; CloneMode::Fork gives
3556            // each worker its own tgid so a procs-file write moves
3557            // only that worker.
3558            if matches!(dispatch, Dispatch::Thread)
3559                && matches!(group.work_type, WorkType::CgroupChurn { .. })
3560            {
3561                anyhow::bail!(
3562                    "CloneMode::Thread is incompatible with WorkType::CgroupChurn \
3563                     (group {}) — CgroupChurn writes the worker tid to \
3564                     `cgroup.procs`, which the kernel resolves to the whole tgid \
3565                     and migrates every sibling thread (including the harness) \
3566                     to the target cgroup. Use CloneMode::Fork for CgroupChurn \
3567                     workloads so each worker is a separate tgid.",
3568                    group.group_idx,
3569                );
3570            }
3571            // Thread mode + CgroupAttachStorm is incompatible because the
3572            // worker installs `SIGCHLD = SIG_IGN` at entry (under
3573            // ReapMode::SigIgn) to auto-reap its forked children. Under
3574            // [`CloneMode::Thread`] the worker is a pthread of the harness
3575            // thread group and shares the harness `sighand` (CLONE_THREAD
3576            // implies CLONE_SIGHAND), so that install changes SIGCHLD
3577            // disposition for the WHOLE harness process — corrupting the
3578            // harness's own child reaping — not just this worker. (fork()
3579            // from a thread of the multithreaded harness is also fragile;
3580            // the forked child is safe here only because it does nothing
3581            // but `_exit`.) Under [`CloneMode::Fork`] each worker is its
3582            // own process with a private sighand, so the SIG_IGN install
3583            // and the forked-child lifecycle stay confined to that worker.
3584            // Note: the storm writes the forked CHILD's pid (its own tgid)
3585            // to cgroup.procs, so — unlike CgroupChurn's own-tid write —
3586            // the migration never moves the harness; the hazard is the
3587            // shared-sighand SIG_IGN install, not tgid migration. Reject at
3588            // spawn time with an actionable diagnostic.
3589            if matches!(dispatch, Dispatch::Thread)
3590                && matches!(group.work_type, WorkType::CgroupAttachStorm { .. })
3591            {
3592                anyhow::bail!(
3593                    "CloneMode::Thread is incompatible with WorkType::CgroupAttachStorm \
3594                     (group {}) — the worker installs SIGCHLD=SIG_IGN to auto-reap its \
3595                     forked children, but a thread-group worker shares the harness \
3596                     sighand, so that install corrupts the harness's own child reaping. \
3597                     Use CloneMode::Fork for CgroupAttachStorm workloads so each worker \
3598                     has its own process and private sighand.",
3599                    group.group_idx,
3600                );
3601            }
3602            // Dispatch-agnostic admission rules
3603            // (worker_group_size divisibility, chain depth >= 2,
3604            // IdleChurn / IpcVariance zero-rejections) live in
3605            // [`validate_workload_admission`] and are shared with
3606            // [`Self::spawn_pcomm_cgroup`]. The
3607            // CloneMode-vs-WorkType compat checks above stay
3608            // dispatch-specific because each dispatch path gates
3609            // independently: Thread+ForkExit (here) and pcomm+ForkExit
3610            // ([`Self::spawn_pcomm_cgroup`]) reject for the SAME reason
3611            // (a fork from a thread of a multi-threaded process inherits
3612            // sibling-held locks the child cannot release) via separate
3613            // per-path gates; Fork+EpollStorm has no pcomm equivalent.
3614            validate_workload_admission(group)?;
3615        }
3616
3617        // futex region sizing is per-group, not MAX'd across all
3618        // groups. Each group's futex region has its own natural
3619        // size determined by [`futex_region_size_for`] (FanOutCompute
3620        // = 16, ProducerConsumerImbalance = 32 + Q*8, everything
3621        // else = 4). Storing the size alongside each pointer in
3622        // `SpawnGuard::futex_region_sizes` lets Drop munmap each
3623        // region with its own length, so a small-variant group
3624        // composed alongside a large ProducerConsumerImbalance group
3625        // is no longer inflated to the large size.
3626        //
3627        // The kernel rounds munmap length up to PAGE_SIZE, so the
3628        // per-region waste for sub-page allocations is bounded at
3629        // one page; the previous MAX-across-groups approach could
3630        // waste many pages per small-variant group when paired with
3631        // a large queue_depth_target.
3632
3633        // All failable acquisitions in this function route through
3634        // `guard`. If any `?`/`bail!` returns early, the guard's Drop
3635        // SIGKILLs+reaps forked children, closes open pipe fds, and
3636        // munmaps the shared regions — so no leak on a mid-spawn
3637        // error path.
3638        let mut guard = SpawnGuard::new();
3639
3640        // Per-worker iteration counter region (MAP_SHARED). Sized
3641        // for ALL groups' workers laid out contiguously: primary
3642        // group occupies slots `[0, primary.num_workers)`, composed
3643        // group `k` occupies slots starting at the running offset
3644        // tracked by `iter_offset` in the per-group spawn loop
3645        // below. Each worker atomically stores its iteration count
3646        // to its assigned slot; the parent reads all slots via
3647        // `snapshot_iterations()`. The mmap base is page-aligned
3648        // (kernel guarantee), so casting to `*mut AtomicU64` is
3649        // sound: page alignment (≥ 4096) ≥ AtomicU64 alignment (8),
3650        // and the region size is an exact multiple of
3651        // `size_of::<AtomicU64>()` (== 8). Each `.add(i)` moves by
3652        // `i * 8` bytes, preserving the 8-byte alignment invariant.
3653        // No non-atomic access to the region exists anywhere in the
3654        // crate, so the atomic-only aliasing rule (workers + parent
3655        // share `&AtomicU64` references derived from the raw
3656        // pointer) holds.
3657        let total_workers: usize = groups.iter().map(|g| g.num_workers).sum();
3658        if total_workers > 0 {
3659            let size = total_workers * std::mem::size_of::<AtomicU64>();
3660            let ptr = unsafe {
3661                libc::mmap(
3662                    std::ptr::null_mut(),
3663                    size,
3664                    libc::PROT_READ | libc::PROT_WRITE,
3665                    libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3666                    -1,
3667                    0,
3668                )
3669            };
3670            if ptr == libc::MAP_FAILED {
3671                let errno = std::io::Error::last_os_error();
3672                let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3673                anyhow::bail!(
3674                    "mmap(MAP_SHARED|MAP_ANONYMOUS, {size} bytes) for the \
3675                     per-worker iter_counters region failed: {errno}{hint}; \
3676                     this region holds one AtomicU64 per worker \
3677                     ({total_workers} slots across {} group(s)) so the parent \
3678                     can snapshot iteration counts via \
3679                     `snapshot_iterations()`. Remediation: reduce num_workers \
3680                     (each worker consumes 8 bytes of this region, rounded up \
3681                     to a page) or raise `vm.max_map_count` / the memory \
3682                     cgroup limit.",
3683                    groups.len(),
3684                );
3685            }
3686            guard.iter_counters = ptr as *mut AtomicU64;
3687            guard.iter_counter_bytes = size;
3688            // Single shared phase-epoch word (MAP_SHARED, MAP_ANONYMOUS
3689            // zero-inits it to 0 = BASELINE). The scenario engine bumps
3690            // it at each step boundary; backdrop workers read it to
3691            // attribute per-phase telemetry. Allocated whenever the
3692            // handle has workers — a step-local pool whose epoch is never
3693            // bumped simply observes no change and emits no PhaseSlices.
3694            let epoch_size = std::mem::size_of::<AtomicU32>();
3695            let epoch_ptr = unsafe {
3696                libc::mmap(
3697                    std::ptr::null_mut(),
3698                    epoch_size,
3699                    libc::PROT_READ | libc::PROT_WRITE,
3700                    libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3701                    -1,
3702                    0,
3703                )
3704            };
3705            if epoch_ptr == libc::MAP_FAILED {
3706                let errno = std::io::Error::last_os_error();
3707                let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3708                anyhow::bail!(
3709                    "mmap(MAP_SHARED|MAP_ANONYMOUS, {epoch_size} bytes) for the \
3710                     phase-epoch word failed: {errno}{hint}; this single \
3711                     AtomicU32 lets the scenario engine broadcast the current \
3712                     phase to backdrop workers for per-phase attribution.",
3713                );
3714            }
3715            guard.phase_epoch = epoch_ptr as *mut AtomicU32;
3716            guard.phase_epoch_bytes = epoch_size;
3717        }
3718
3719        // Spawn each group in declaration order. `iter_offset`
3720        // tracks the running offset into the iter_counters mmap
3721        // (slot allocation per the layout commented above). Each
3722        // group's pipes / chain_pipes / futex_ptrs are appended to
3723        // the guard's flat vectors; we record per-group base
3724        // offsets so the per-worker fork loop can compute
3725        // global-vector indices from per-group worker indices and
3726        // the close-other-fds child path can iterate the full
3727        // guard while still identifying its own group's resources.
3728        let mut iter_offset: usize = 0;
3729        for group in &groups {
3730            Self::spawn_group(&mut guard, group, &dispatch, iter_offset)?;
3731            iter_offset += group.num_workers;
3732        }
3733
3734        // Success: transfer every live resource (children, threads,
3735        // futex_ptrs, iter_counters, pipe_pairs, chain_pipes) into
3736        // the handle. The guard's subsequent Drop sees empty Vecs
3737        // and a null iter_counters pointer — it is a no-op on this
3738        // path. Pipe fds are closed by `WorkloadHandle::drop` AFTER
3739        // worker shutdown so Thread-mode workers (which share the
3740        // parent's fd table) finish their pipe ops before the close.
3741        Ok(guard.into_handle())
3742    }
3743
3744    /// Spawn a single worker group's resources and per-worker
3745    /// tasks, appending each into the shared [`SpawnGuard`].
3746    ///
3747    /// Each group records its own base offsets into the guard's
3748    /// flat vectors at entry time, then uses those offsets when
3749    /// computing per-worker `pair_idx` / `chain_idx` /
3750    /// `futex_group_idx`. The fork-child close-other-fds block
3751    /// iterates the FULL guard so it sweeps fds belonging to
3752    /// other groups too — without that sweep, a composed-group
3753    /// worker would inherit (and never close) every primary-group
3754    /// pipe fd.
3755    ///
3756    /// Resource ownership is uniform across groups: every
3757    /// allocated pipe / mmap region lives in the guard's flat
3758    /// vectors and is freed by `SpawnGuard::Drop` on early-bail or
3759    /// transferred to [`WorkloadHandle`] on success via
3760    /// [`SpawnGuard::into_handle`].
3761    #[allow(clippy::too_many_arguments)]
3762    fn spawn_group(
3763        guard: &mut SpawnGuard,
3764        group: &GroupParams,
3765        dispatch: &Dispatch,
3766        iter_offset: usize,
3767    ) -> Result<()> {
3768        let needs_pipes = matches!(
3769            group.work_type,
3770            WorkType::PipeIo { .. } | WorkType::CachePipe { .. }
3771        );
3772        let chain_depth = group.work_type.chain_pipe_depth();
3773        let needs_futex = group.work_type.needs_shared_mem();
3774
3775        // Record the bases into the guard's flat vectors BEFORE
3776        // appending this group's allocations. The base values
3777        // identify "where this group's resources start" — the
3778        // per-worker fork loop combines `pipe_pair_base + i / 2`
3779        // (and analogous for chain_idx / futex_group_idx) to
3780        // address its own resources without colliding with another
3781        // group's range.
3782        let pipe_pair_base = guard.pipe_pairs.len();
3783        let chain_pipes_base = guard.chain_pipes.len();
3784        let futex_ptrs_base = guard.futex_ptrs.len();
3785        // Per-group natural size for the futex MAP_SHARED region —
3786        // each region in this group's range gets exactly this many
3787        // bytes (rather than the previous global MAX across every
3788        // group). See `futex_region_size_for` for the per-variant
3789        // sizing rules.
3790        let futex_region_size = futex_region_size_for(&group.work_type);
3791
3792        // For paired work types, create two pipes per worker pair before
3793        // forking: pipe_pairs[pair_idx] = (ab, ba), where `ab` carries the
3794        // A->B direction and `ba` the B->A direction, each stored as
3795        // `[read_fd, write_fd]`. Use `pipe2(O_CLOEXEC)` instead of bare
3796        // `pipe(2)`: O_CLOEXEC is the correct default for any kernel fd in
3797        // long-running processes — fds without it leak into any exec path.
3798        if needs_pipes {
3799            for _ in 0..group.num_workers / 2 {
3800                let mut ab = [0i32; 2]; // A writes, B reads
3801                if unsafe { libc::pipe2(ab.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3802                    anyhow::bail!("pipe2 failed: {}", std::io::Error::last_os_error());
3803                }
3804                let mut ba = [0i32; 2]; // B writes, A reads
3805                if unsafe { libc::pipe2(ba.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3806                    // Close the ab half we just created: it is not
3807                    // yet owned by the guard, so its Drop won't
3808                    // otherwise reach it.
3809                    unsafe {
3810                        libc::close(ab[0]);
3811                        libc::close(ab[1]);
3812                    }
3813                    anyhow::bail!("pipe2 failed: {}", std::io::Error::last_os_error());
3814                }
3815                guard.pipe_pairs.push((ab, ba));
3816            }
3817        }
3818
3819        // For WakeChain { wake: WakeMechanism::Pipe }, allocate `depth` pipes per
3820        // chain (one pipe per stage). Pipe `i` connects stage `i`
3821        // (writer) to stage `(i + 1) % depth` (reader). On any
3822        // `pipe2()` failure mid-allocation, close the half-built
3823        // chain's pipes before bailing — the chain is not yet
3824        // pushed onto `guard.chain_pipes`, so its Drop won't
3825        // otherwise reach those fds. `O_CLOEXEC` matches the
3826        // defense-in-depth posture documented above on the
3827        // pipe-pair allocation.
3828        if let Some(depth) = chain_depth
3829            && depth > 0
3830            && group.num_workers >= depth
3831        {
3832            let chains = group.num_workers / depth;
3833            for _ in 0..chains {
3834                let mut chain: Vec<[i32; 2]> = Vec::with_capacity(depth);
3835                let mut alloc_ok = true;
3836                for _ in 0..depth {
3837                    let mut p = [0i32; 2];
3838                    if unsafe { libc::pipe2(p.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3839                        alloc_ok = false;
3840                        break;
3841                    }
3842                    chain.push(p);
3843                }
3844                if !alloc_ok {
3845                    for p in &chain {
3846                        unsafe {
3847                            libc::close(p[0]);
3848                            libc::close(p[1]);
3849                        }
3850                    }
3851                    anyhow::bail!(
3852                        "WakeChain pipe2 allocation failed: {}",
3853                        std::io::Error::last_os_error()
3854                    );
3855                }
3856                guard.chain_pipes.push(chain);
3857            }
3858        }
3859
3860        // For FutexPingPong/FutexFanOut/FanOutCompute/MutexContention, allocate
3861        // one shared region per worker group via MAP_SHARED|MAP_ANONYMOUS
3862        // so all members of the fork see the same physical page.
3863        // Each region is sized exactly to the variant's natural
3864        // need (see [`futex_region_size_for`]) — the per-region
3865        // size is recorded in `guard.futex_region_sizes` parallel
3866        // to `guard.futex_ptrs`, so munmap on Drop receives the
3867        // correct length even when groups with different natural
3868        // sizes co-exist in the same workload.
3869        let futex_group_size = group.work_type.worker_group_size().unwrap_or(2);
3870        if needs_futex {
3871            for _ in 0..group.num_workers / futex_group_size {
3872                let ptr = unsafe {
3873                    libc::mmap(
3874                        std::ptr::null_mut(),
3875                        futex_region_size,
3876                        libc::PROT_READ | libc::PROT_WRITE,
3877                        libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3878                        -1,
3879                        0,
3880                    )
3881                };
3882                if ptr == libc::MAP_FAILED {
3883                    let errno = std::io::Error::last_os_error();
3884                    let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3885                    anyhow::bail!(
3886                        "mmap(MAP_SHARED|MAP_ANONYMOUS, {futex_region_size} bytes) \
3887                         for a futex shared-memory region failed: {errno}{hint}; \
3888                         this region backs the {:?} worker-group's (group {}) \
3889                         inter-process futex word and is allocated \
3890                         before fork so every child inherits the same \
3891                         mapping. Remediation: reduce num_workers (each \
3892                         futex group consumes one shared page) or raise \
3893                         `vm.max_map_count` / the memory cgroup limit.",
3894                        group.work_type.name(),
3895                        group.group_idx,
3896                    );
3897                }
3898                unsafe { std::ptr::write_bytes(ptr as *mut u8, 0, futex_region_size) };
3899                guard.futex_ptrs.push(ptr as *mut u32);
3900                guard.futex_region_sizes.push(futex_region_size);
3901            }
3902        }
3903
3904        for i in 0..group.num_workers {
3905            let affinity = resolve_affinity(&group.affinity)?;
3906
3907            // Determine pipe fds for this worker.
3908            //
3909            // Three shapes use the same `Option<(read_fd, write_fd)>`
3910            // parameter:
3911            // - PipeIo / CachePipe (paired): worker A reads `ba[0]`,
3912            //   writes `ab[1]`; worker B reads `ab[0]`, writes
3913            //   `ba[1]`.
3914            // - WakeChain { wake: WakeMechanism::Pipe } (chain ring): stage `s`
3915            //   reads from pipe `(s + depth - 1) % depth` (its
3916            //   predecessor's write end's matching read end) and
3917            //   writes to pipe `s` (its own pipe's write end, which
3918            //   stage `s + 1` reads from).
3919            // - Everything else: `None`.
3920            //
3921            // Indices are computed in the GLOBAL `guard.pipe_pairs`
3922            // / `guard.chain_pipes` space by adding the per-group
3923            // base recorded at the top of `spawn_group`. A composed
3924            // group's pipe-pair-base, for example, equals the sum
3925            // of every prior group's pipe-pair count, so its first
3926            // worker pair is allocated immediately after the
3927            // primary's last entry — no collision, no aliasing.
3928            let worker_pipe_fds: Option<(i32, i32)> = if needs_pipes {
3929                let pair_idx = pipe_pair_base + i / 2;
3930                let (ref ab, ref ba) = guard.pipe_pairs[pair_idx];
3931                if i % 2 == 0 {
3932                    // Worker A: writes to ab[1], reads from ba[0]
3933                    Some((ba[0], ab[1]))
3934                } else {
3935                    // Worker B: writes to ba[1], reads from ab[0]
3936                    Some((ab[0], ba[1]))
3937                }
3938            } else if let Some(depth) = chain_depth
3939                && depth > 0
3940            {
3941                let chain_idx = chain_pipes_base + i / depth;
3942                let stage = i % depth;
3943                let prev_stage = (stage + depth - 1) % depth;
3944                let chain = &guard.chain_pipes[chain_idx];
3945                // Read end of predecessor's pipe; write end of own
3946                // pipe. The kernel pipe pair is `[read_end,
3947                // write_end]` per `libc::pipe`'s manpage.
3948                Some((chain[prev_stage][0], chain[stage][1]))
3949            } else {
3950                None
3951            };
3952
3953            // Futex pointer for this worker. The `pos` is the
3954            // worker's index inside its futex group: `pos == 0`
3955            // is the group's "first" worker (the role that varies
3956            // per-variant — pair-A for FutexPingPong, messenger for
3957            // FutexFanOut/FanOutCompute, waker for ThunderingHerd/
3958            // AsymmetricWaker, chain-head for WakeChain). Variants
3959            // that need finer-grained per-worker positioning
3960            // (PriorityInversion's 3 tiers, ProducerConsumerImbalance's
3961            // producer/consumer split, RtStarvation's RT/CFS split,
3962            // WakeChain's stage index) consume `pos` directly.
3963            let worker_futex: Option<(*mut u32, usize)> = if needs_futex {
3964                let futex_group_idx = futex_ptrs_base + i / futex_group_size;
3965                let pos = i % futex_group_size;
3966                Some((guard.futex_ptrs[futex_group_idx], pos))
3967            } else {
3968                None
3969            };
3970
3971            // Shared iteration counter slot for this worker. The
3972            // group-local index `i` is added to the spawn-time
3973            // `iter_offset` so each group's slot range is disjoint
3974            // from every other group's.
3975            let iter_slot: *mut AtomicU64 = if !guard.iter_counters.is_null() {
3976                unsafe { guard.iter_counters.add(iter_offset + i) }
3977            } else {
3978                std::ptr::null_mut()
3979            };
3980            // Single shared phase-epoch word — the SAME pointer for every
3981            // worker (no per-worker offset). Non-null for every handle; the
3982            // scenario bumps it only for backdrop handles, so step-local
3983            // workers observe no change and emit no slices.
3984            let phase_epoch_ptr: *mut AtomicU32 = guard.phase_epoch;
3985
3986            // Per-mode dispatch. Thread-mode workers do not need
3987            // pipes — the rendezvous and report channels are
3988            // in-process Rust primitives (`mpsc::sync_channel(0)` +
3989            // `JoinHandle`). Fork mode uses the pipe-based
3990            // scaffolding below.
3991            match dispatch {
3992                Dispatch::Thread => {
3993                    spawn_thread_worker(
3994                        guard,
3995                        group,
3996                        affinity,
3997                        worker_pipe_fds,
3998                        worker_futex,
3999                        iter_slot,
4000                        phase_epoch_ptr,
4001                    )?;
4002                    continue;
4003                }
4004                Dispatch::Fork => {
4005                    // fall through to the pipe-based dispatch below
4006                }
4007            }
4008
4009            // Create pipe for report and a second pipe for "start" signal.
4010            // Local cleanup on second-pipe failure: the guard has no
4011            // per-worker tracking of half-allocated pipes, so the first
4012            // half closes here before the bail. `O_CLOEXEC` matches
4013            // the defense-in-depth posture above on the inter-worker
4014            // pipe pairs and chain pipes.
4015            let mut report_fds = [0i32; 2];
4016            if unsafe { libc::pipe2(report_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
4017                anyhow::bail!(
4018                    "worker {}/{} (group {}): report pipe2 failed: {}",
4019                    i + 1,
4020                    group.num_workers,
4021                    group.group_idx,
4022                    std::io::Error::last_os_error(),
4023                );
4024            }
4025            // Grow the report pipe so a worker that produces a worst-
4026            // case report (two `Vec<u64>` reservoirs capped at
4027            // `MAX_WAKE_SAMPLES` (100_000) entries each, plus
4028            // smaller fields) finishes `write_all` without blocking
4029            // for the parent to drain. The default pipe capacity on
4030            // Linux is 16 pages (64 KiB on 4 KiB pages, 256 KiB on
4031            // 16 KiB pages); a postcard-encoded `WorkerReport` with
4032            // both reservoirs full is at most ~1.9 MiB (u64 varint
4033            // is up to 10 bytes per entry, 200_000 entries total).
4034            // Without this grow, the worker blocks inside
4035            // `f.write_all(&bytes)` waiting for the parent's
4036            // `read_to_end`. The parent reads workers serially with
4037            // a shared 5 s deadline (`stop_and_collect`); if the
4038            // first worker's drain consumes the budget, every
4039            // subsequent worker's `poll` budget is 0, the parent
4040            // closes the read fd without reading, and the child's
4041            // blocked write returns EPIPE — the report is lost and
4042            // the failure surfaces as a sentinel `WorkerReport`.
4043            // Sizing the pipe to 8 MiB lets the child write the
4044            // full payload in one non-blocking burst, exit, and
4045            // close its write end; when the parent later polls the
4046            // read fd, all data is already buffered in the kernel
4047            // and `read_to_end` returns immediately with EOF.
4048            //
4049            // F_SETPIPE_SZ may round the requested size up (currently to
4050            // a power-of-two number of pages); unprivileged callers get
4051            // EPERM above `/proc/sys/fs/pipe-max-size` (default 1 MiB).
4052            // ktstr always runs as root inside the guest, so the
4053            // grow succeeds. A best-effort failure (older kernel
4054            // without F_SETPIPE_SZ, or an unexpected EPERM) leaves
4055            // the default size in place — workers with small
4056            // reports still complete; workers with large reports
4057            // fall back to the historical blocking-write behaviour.
4058            // Logging the failure surfaces the regression without
4059            // failing the whole spawn.
4060            const REPORT_PIPE_SIZE: libc::c_int = 8 * 1024 * 1024;
4061            // SAFETY: `report_fds[1]` is the freshly opened write
4062            // end returned by `pipe2` above; F_SETPIPE_SZ accepts
4063            // any pipe fd (read or write end — both refer to the
4064            // same kernel `struct pipe_inode_info`).
4065            let prev_size =
4066                unsafe { libc::fcntl(report_fds[1], libc::F_SETPIPE_SZ, REPORT_PIPE_SIZE) };
4067            if prev_size < 0 {
4068                let err = std::io::Error::last_os_error();
4069                tracing::warn!(
4070                    worker = i + 1,
4071                    num_workers = group.num_workers,
4072                    group_idx = group.group_idx,
4073                    requested_size = REPORT_PIPE_SIZE,
4074                    %err,
4075                    "F_SETPIPE_SZ on report pipe failed; falling back to default \
4076                     pipe capacity. Workers producing >64 KiB reports may block \
4077                     in write_all and have their reports truncated by the \
4078                     parent's deadline-driven fd close."
4079                );
4080            }
4081            let mut start_fds = [0i32; 2];
4082            if unsafe { libc::pipe2(start_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
4083                unsafe {
4084                    libc::close(report_fds[0]);
4085                    libc::close(report_fds[1]);
4086                }
4087                anyhow::bail!(
4088                    "worker {}/{} (group {}): start pipe2 failed: {}",
4089                    i + 1,
4090                    group.num_workers,
4091                    group.group_idx,
4092                    std::io::Error::last_os_error(),
4093                );
4094            }
4095
4096            // Block SIGUSR1 across fork so the child inherits a
4097            // SIGUSR1-blocked mask. Without this, there is a
4098            // window between `fork()` returning in the child and
4099            // the child installing `sigusr1_handler` below where
4100            // SIGUSR1's default disposition (terminate) is in
4101            // effect. A `stop_and_collect` call that races against
4102            // a worker still in mid-init — e.g. if a sibling
4103            // worker spawned just before this one already pushed
4104            // its (pid, ...) into `guard.children` and a parent
4105            // thread initiated stop signaling — could deliver
4106            // SIGUSR1 to this child before its handler is
4107            // installed and terminate it via the default action.
4108            // The block + post-install unblock pattern queues any
4109            // SIGUSR1 sent in the window so the handler observes
4110            // it on the unblock; the worker then sets `STOP` and
4111            // exits gracefully.
4112            //
4113            // pthread_sigmask is the multi-threaded equivalent of
4114            // sigprocmask: ktstr's parent process is genuinely
4115            // multi-threaded (test runner threads, tracing
4116            // threads), and `sigprocmask` semantics are unspecified
4117            // in MT programs per sigprocmask(2) /
4118            // pthread_sigmask(3) — pthread_sigmask is the correct
4119            // primitive to use here. The fork()-inherited mask is
4120            // the calling thread's mask at fork time, exactly what
4121            // we want.
4122            //
4123            // Old mask is captured so the parent path can restore
4124            // its prior signal mask after fork — the block must
4125            // be transient on the parent side; the parent's
4126            // SIGUSR1 stays under whatever disposition the host
4127            // application configured (typically the default;
4128            // ktstr does not handle SIGUSR1 in the parent
4129            // process).
4130            let mut old_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
4131            let mut block_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
4132            // pthread_sigmask returns 0 on success, a positive errno
4133            // on failure (per POSIX — does NOT set errno). A non-zero
4134            // return here means the SIGUSR1 mask did not actually get
4135            // installed: the child would inherit an unblocked SIGUSR1
4136            // and the post-fork race window between fork() and the
4137            // child's signal() install would terminate the worker on
4138            // its default action. Log so the failure surfaces.
4139            let psm_block_rc = unsafe {
4140                libc::sigemptyset(&mut block_mask);
4141                libc::sigaddset(&mut block_mask, libc::SIGUSR1);
4142                libc::pthread_sigmask(libc::SIG_BLOCK, &block_mask, &mut old_mask)
4143            };
4144            if psm_block_rc != 0 {
4145                tracing::warn!(
4146                    rc = psm_block_rc,
4147                    "pthread_sigmask(SIG_BLOCK, SIGUSR1) failed pre-fork; child inherits unblocked SIGUSR1 and may terminate on default action before installing handler"
4148                );
4149            }
4150
4151            let pid = unsafe { libc::fork() };
4152            match pid {
4153                -1 => {
4154                    // Fork failed: restore the parent's previous
4155                    // signal mask before bailing so this failure
4156                    // doesn't leave SIGUSR1 blocked in the calling
4157                    // thread for the rest of the process lifetime.
4158                    let psm_restore_rc = unsafe {
4159                        libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
4160                    };
4161                    if psm_restore_rc != 0 {
4162                        tracing::warn!(
4163                            rc = psm_restore_rc,
4164                            "pthread_sigmask(SIG_SETMASK) failed restoring mask after fork failure; SIGUSR1 may remain blocked in this thread"
4165                        );
4166                    }
4167                    // Close both fresh pipes so they don't leak
4168                    // before the guard reaps the already-forked
4169                    // siblings.
4170                    unsafe {
4171                        libc::close(report_fds[0]);
4172                        libc::close(report_fds[1]);
4173                        libc::close(start_fds[0]);
4174                        libc::close(start_fds[1]);
4175                    }
4176                    anyhow::bail!(
4177                        "worker {}/{} (group {}): fork failed: {}",
4178                        i + 1,
4179                        group.num_workers,
4180                        group.group_idx,
4181                        std::io::Error::last_os_error(),
4182                    );
4183                }
4184                0 => {
4185                    // Child: set parent-death signal BEFORE any other
4186                    // post-fork setup so the kernel SIGKILLs this worker
4187                    // immediately if the parent dies during the remaining
4188                    // init (close fd loops, signal handler install, start-
4189                    // pipe wait, worker_main). Without PR_SET_PDEATHSIG,
4190                    // a parent crash between fork and start leaves workers
4191                    // reparented to init and spinning indefinitely —
4192                    // they'd outlive the test run, consume the cgroup's
4193                    // CPU, and block the next scenario's cgroup teardown
4194                    // with EBUSY. SIGKILL is the only safe choice: it
4195                    // cannot be masked and runs before any of this child's
4196                    // destructors execute (good — those destructors still
4197                    // reference the parent's guard). prctl is NOT listed
4198                    // as async-signal-safe by signal-safety(7); safe to
4199                    // call here because this is a single-threaded
4200                    // post-fork child before any signal handlers are
4201                    // installed, so no interleaving can observe partial
4202                    // state.
4203                    unsafe {
4204                        libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL);
4205                    }
4206                    // Fork-race close: if the parent died between fork()
4207                    // return and the prctl above, this child was already
4208                    // reparented (typically to pid 1) before PDEATHSIG
4209                    // was armed — the death signal is keyed on the CURRENT
4210                    // parent, not the parent-at-fork-time, so the signal
4211                    // will never fire. getppid() == 1 means we are already
4212                    // orphaned; exit now instead of running the full
4213                    // worker loop only to leak into init. Using `_exit`
4214                    // (async-signal-safe) rather than `exit` so Rust
4215                    // destructors that reference the parent's now-dead
4216                    // guard don't run on the fork stack.
4217                    //
4218                    // PR_SET_CHILD_SUBREAPER exception: when an ancestor
4219                    // of the ktstr process has called
4220                    // `prctl(PR_SET_CHILD_SUBREAPER, 1)` (systemd user
4221                    // scopes, some container runtimes, certain CI
4222                    // harnesses), orphaned descendants reparent to the
4223                    // nearest live subreaper rather than pid 1. In that
4224                    // case `getppid() == 1` is false after an orphan-
4225                    // race even though the original parent is dead —
4226                    // the check is a best-effort fast-path for the
4227                    // common "pid 1 catches orphans" case and does NOT
4228                    // fire under subreaper ancestry. PDEATHSIG still
4229                    // fires correctly in that scenario (the signal is
4230                    // triggered when the CURRENT parent dies, and the
4231                    // subreaper then inherits us), so the guard is a
4232                    // narrowing of the leak window, not an elimination.
4233                    //
4234                    // Guest-init exception: inside a ktstr guest VM the
4235                    // test driver IS pid 1 (it runs as /init), so every
4236                    // worker forked by a scenario legitimately has
4237                    // `getppid() == 1` even though the parent is alive
4238                    // and well. Firing the orphan guard there would kill
4239                    // every worker on startup and produce sentinel
4240                    // "0 cpus, 0 iterations" reports. `ktstr_guest_init`
4241                    // sets `KTSTR_GUEST_INIT=1` before dispatch; that
4242                    // variable is inherited by every descendant process,
4243                    // so its presence is a reliable signal that pid 1 is
4244                    // the legitimate parent. Host-side workloads leave
4245                    // the variable unset and retain the orphan detection.
4246                    if std::env::var_os(crate::KTSTR_GUEST_INIT_ENV).is_none()
4247                        && unsafe { libc::getppid() } == 1
4248                    {
4249                        exit_child_success();
4250                    }
4251                    // Make this worker its own process-group leader so
4252                    // any descendants it spawns inherit `pgid == worker_pid`.
4253                    // `stop_and_collect` / Drop issue `killpg(worker_pid,
4254                    // SIGKILL)` alongside the direct `kill` — without a
4255                    // private pgid, descendants forked by a
4256                    // [`WorkType::Custom`] body (or any future workload
4257                    // that spawns helpers) stay in the parent Rust
4258                    // process's pgid, survive the worker's SIGKILL, and
4259                    // orphan onto init. PR_SET_PDEATHSIG handles the
4260                    // "parent crashes" case but is per-task and cleared
4261                    // on fork, so grandchildren don't inherit it — the
4262                    // pgid route is the only safe reach for them when
4263                    // teardown is explicit. Failure is silently ignored:
4264                    // the only reachable failure mode for setpgid(0, 0)
4265                    // in a just-forked child is EPERM from an earlier
4266                    // setsid (we never call it), so a return of -1 here
4267                    // means the kernel invariant changed and the reach
4268                    // degrades to "leader only" — same as the pre-batch
4269                    // behavior. Async-signal-safe per signal-safety(7).
4270                    unsafe {
4271                        libc::setpgid(0, 0);
4272                    }
4273                    // Install signal handler BEFORE unblocking
4274                    // SIGUSR1: the parent blocked SIGUSR1 across
4275                    // fork so this child inherits a blocked mask;
4276                    // any SIGUSR1 sent by `stop_and_collect` while
4277                    // the mask is in effect queues until we
4278                    // unblock. Once the handler is registered we
4279                    // restore the original (pre-block) mask via
4280                    // `pthread_sigmask(SIG_SETMASK, …)`, which
4281                    // delivers any pending SIGUSR1 directly into
4282                    // `sigusr1_handler` and sets `STOP` cleanly —
4283                    // the worker proceeds into the start-wait
4284                    // poll() with handler armed and any pending
4285                    // signal already consumed. Without this two-
4286                    // step, SIGUSR1 would fire under its default
4287                    // disposition (terminate) during the window
4288                    // between `fork()` and `signal()`.
4289                    STOP.store(false, Ordering::Relaxed);
4290                    // libc::signal returns SIG_ERR (cast as
4291                    // sighandler_t) on failure with errno set.
4292                    // pthread_sigmask returns 0 on success, errno
4293                    // value on failure. Both failures here are
4294                    // visible defects: signal() fail means SIGUSR1
4295                    // keeps default disposition (terminate) and
4296                    // stop_and_collect cannot stop the worker
4297                    // gracefully — it falls through to the
4298                    // killpg/SIGKILL escalation; pthread_sigmask()
4299                    // fail means the SIGUSR1 we blocked across
4300                    // fork stays blocked, so the parent's stop
4301                    // SIGUSR1 queues forever in the child and the
4302                    // child runs the full work loop until the
4303                    // killpg escalation.
4304                    //
4305                    // tracing is unsafe in a post-fork child
4306                    // before _exit (the parent's tracing
4307                    // subscriber may be locked elsewhere); use
4308                    // eprintln!, which bypasses tracing and ends in
4309                    // write(2) (async-signal-safe per signal-safety(7)).
4310                    let sig_prev = unsafe {
4311                        libc::signal(
4312                            libc::SIGUSR1,
4313                            sigusr1_handler as *const () as libc::sighandler_t,
4314                        )
4315                    };
4316                    if sig_prev == libc::SIG_ERR {
4317                        let errno = std::io::Error::last_os_error();
4318                        eprintln!(
4319                            "ktstr: signal(SIGUSR1) install failed in worker child: {errno}; graceful stop unavailable, killpg escalation will reap"
4320                        );
4321                    }
4322                    let psm_unblock_rc = unsafe {
4323                        libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
4324                    };
4325                    if psm_unblock_rc != 0 {
4326                        eprintln!(
4327                            "ktstr: pthread_sigmask(SIG_SETMASK) unblock failed in worker child: rc={psm_unblock_rc}; SIGUSR1 stays blocked, killpg escalation will reap"
4328                        );
4329                    }
4330                    // Close unused pipe ends
4331                    unsafe {
4332                        libc::close(report_fds[0]);
4333                        libc::close(start_fds[1]);
4334                    }
4335                    // Close pipe ends belonging to other workers
4336                    // in this pair, AND every pipe fd that belongs
4337                    // to any other pair anywhere in the workload —
4338                    // including pairs owned by other groups, since
4339                    // every pre-fork allocation lives in
4340                    // `guard.pipe_pairs` regardless of which group
4341                    // declared it. The fork inherits the parent's
4342                    // entire fd table; without this sweep, a
4343                    // composed-group worker would hold open every
4344                    // primary-group pipe fd for its lifetime,
4345                    // producing fd leaks and (for chain-shaped
4346                    // workloads) keeping reader-side blocks live
4347                    // when the writer-side closes.
4348                    if needs_pipes {
4349                        let pair_idx = pipe_pair_base + i / 2;
4350                        let (ref ab, ref ba) = guard.pipe_pairs[pair_idx];
4351                        if i % 2 == 0 {
4352                            // Worker A keeps ba[0] (read) and ab[1] (write).
4353                            // Close ab[0] and ba[1].
4354                            unsafe {
4355                                libc::close(ab[0]);
4356                                libc::close(ba[1]);
4357                            }
4358                        } else {
4359                            // Worker B keeps ab[0] (read) and ba[1] (write).
4360                            // Close ab[1] and ba[0].
4361                            unsafe {
4362                                libc::close(ab[1]);
4363                                libc::close(ba[0]);
4364                            }
4365                        }
4366                        // Close all pipe fds from other pairs (any group).
4367                        for (j, (ab2, ba2)) in guard.pipe_pairs.iter().enumerate() {
4368                            if j != pair_idx {
4369                                unsafe {
4370                                    libc::close(ab2[0]);
4371                                    libc::close(ab2[1]);
4372                                    libc::close(ba2[0]);
4373                                    libc::close(ba2[1]);
4374                                }
4375                            }
4376                        }
4377                    } else {
4378                        // Worker doesn't own any pipe pair, but
4379                        // other groups' pipe pairs are still in the
4380                        // child's fd table — close them all.
4381                        for (ab2, ba2) in guard.pipe_pairs.iter() {
4382                            unsafe {
4383                                libc::close(ab2[0]);
4384                                libc::close(ab2[1]);
4385                                libc::close(ba2[0]);
4386                                libc::close(ba2[1]);
4387                            }
4388                        }
4389                    }
4390                    if let Some(depth) = chain_depth
4391                        && depth > 0
4392                    {
4393                        let chain_idx = chain_pipes_base + i / depth;
4394                        let stage = i % depth;
4395                        let prev_stage = (stage + depth - 1) % depth;
4396                        // Close every fd in the chain that this
4397                        // stage does not own. Owned fds (kept open):
4398                        //   - chain[prev_stage][0]: read end of the
4399                        //     pipe predecessor writes to.
4400                        //   - chain[stage][1]: write end of the
4401                        //     pipe successor reads from.
4402                        // Everything else is the inverse end of an
4403                        // owned pipe or fully unrelated.
4404                        for (s, pipe) in guard.chain_pipes[chain_idx].iter().enumerate() {
4405                            // Always close the write end of the
4406                            // predecessor's pipe (we only need its
4407                            // read end).
4408                            if s == prev_stage {
4409                                unsafe {
4410                                    libc::close(pipe[1]);
4411                                }
4412                            // Always close the read end of our own
4413                            // pipe (we only need its write end).
4414                            } else if s == stage {
4415                                unsafe {
4416                                    libc::close(pipe[0]);
4417                                }
4418                            // Pipes belonging to neither this stage
4419                            // nor its predecessor: close both ends.
4420                            } else {
4421                                unsafe {
4422                                    libc::close(pipe[0]);
4423                                    libc::close(pipe[1]);
4424                                }
4425                            }
4426                        }
4427                        // Close every fd from other chains (any group).
4428                        for (cj, chain) in guard.chain_pipes.iter().enumerate() {
4429                            if cj != chain_idx {
4430                                for pipe in chain {
4431                                    unsafe {
4432                                        libc::close(pipe[0]);
4433                                        libc::close(pipe[1]);
4434                                    }
4435                                }
4436                            }
4437                        }
4438                    } else {
4439                        // This group has no chain pipes, but other
4440                        // groups may. Close every chain-pipe fd
4441                        // inherited via fork — leaving a primary
4442                        // group's chain pipe open in a composed
4443                        // worker would prevent the chain from ever
4444                        // observing EOF on its read ends.
4445                        for chain in guard.chain_pipes.iter() {
4446                            for pipe in chain {
4447                                unsafe {
4448                                    libc::close(pipe[0]);
4449                                    libc::close(pipe[1]);
4450                                }
4451                            }
4452                        }
4453                    }
4454                    // Layered defense against child-side unwinding
4455                    // reaching the forked-from-parent drops:
4456                    //
4457                    // 1. No-op panic hook — the default hook prints a
4458                    //    multi-line backtrace to stderr, which is a
4459                    //    shared fd with the parent post-fork. A panic
4460                    //    in the child would interleave garbled output
4461                    //    with the parent's tracing log and confuse
4462                    //    downstream parsers. Install a silent hook
4463                    //    before catch_unwind ONLY under
4464                    //    `panic = "unwind"` — under
4465                    //    `panic = "abort"` (release / nextest
4466                    //    profile) the silent hook would suppress the
4467                    //    panic message before SIGABRT fires, leaving
4468                    //    operators with zero diagnostic from a
4469                    //    crashed worker. Letting the default hook
4470                    //    write a backtrace to stderr is the lesser
4471                    //    evil under abort: the message may interleave
4472                    //    with parent tracing output but at least
4473                    //    surfaces the panic site. catch_unwind itself
4474                    //    is a no-op under abort (the closure is
4475                    //    invoked normally and the abort terminates
4476                    //    the child before any Err return), so gating
4477                    //    only the hook installation — not the
4478                    //    catch_unwind call — keeps the unwind-build
4479                    //    fast-path unchanged while restoring
4480                    //    release-build observability.
4481                    //
4482                    // 2. `_exit(0|1)` after the closure (success or
4483                    //    catch_unwind Err) — the child never returns
4484                    //    to a frame whose `SpawnGuard` Drop could
4485                    //    run. The `_exit` syscall bypasses Rust's stack-unwind
4486                    //    drops and the static-destructor table both,
4487                    //    so the parent-owned `SpawnGuard` whose
4488                    //    storage was duplicated by `fork()` cannot
4489                    //    SIGKILL its siblings (fratricide) from the
4490                    //    child path.
4491                    //
4492                    // 3. `panic::catch_unwind` — catches any panic
4493                    //    before it escapes this arm. Belt-and-braces
4494                    //    against (a) additional Drops on this
4495                    //    frame's stack (e.g. future refactors that
4496                    //    add more RAII) and (b) alloc/OOM panics
4497                    //    during worker_main / postcard encode.
4498                    //
4499                    //    Caveat: catch_unwind is a no-op under
4500                    //    `panic = "abort"`, which ktstr's Cargo.toml
4501                    //    DOES set in `[profile.release]`. In release
4502                    //    builds a panic inside this closure aborts
4503                    //    the child immediately (SIGABRT); the
4504                    //    `catch_unwind` call compiles but never
4505                    //    returns `Err`, and neither the
4506                    //    `f.write_all(&bytes)` nor the `_exit(1)`
4507                    //    below runs on the panic path. The parent's
4508                    //    `stop_and_collect` therefore observes a
4509                    //    missing WorkerReport and fills in a
4510                    //    sentinel — that sentinel fallback IS the
4511                    //    release-build correctness mechanism. Under
4512                    //    abort the silent hook is NOT installed (see
4513                    //    item 1) so the default panic handler writes
4514                    //    the panic location and message to stderr
4515                    //    just before SIGABRT, giving operators the
4516                    //    diagnostic they would otherwise have lost.
4517                    //    Defense (2) still applies unchanged: abort
4518                    //    skips Drops (matching the `_exit` path's
4519                    //    no-Drop guarantee). Dev/test builds (cargo
4520                    //    test, cargo nextest run — dev profile
4521                    //    inherits default unwind semantics) still
4522                    //    get a real `catch_unwind` Err → `_exit(1)`
4523                    //    fast-path with a silent hook so the
4524                    //    backtrace doesn't pollute test output.
4525                    //
4526                    //    Global-state safety under unwind, scoped
4527                    //    to `worker_main`'s reachable code path —
4528                    //    the `fork()` child's observable set. Two
4529                    //    items: `STOP: AtomicBool` and
4530                    //    `STATIC_HOST_INFO: OnceLock<_>`. Neither
4531                    //    of them carries a Drop whose body touches
4532                    //    the inherited MAP_SHARED regions or the
4533                    //    parent-owned pipe fds. Under a
4534                    //    hypothetical unwind that escaped
4535                    //    `catch_unwind` (a double-panic that
4536                    //    bypasses the landing pad), the only
4537                    //    fork-child Drops that actually matter are
4538                    //    the parent-owned `SpawnGuard` (covered by
4539                    //    the `_exit` no-Drop guarantee above) and
4540                    //    the child-local `wake_latencies_ns` /
4541                    //    `migrations` `Vec<T>` (per-process heap,
4542                    //    no cross-process impact). `STATIC_HOST_INFO`'s
4543                    //    inner Drop frees a handful of
4544                    //    `Option<String>`s and is safe on either
4545                    //    side of fork. Crate-wide statics outside
4546                    //    this set (fetch, probe, vmm, …) are out
4547                    //    of scope — this audit pins only what the
4548                    //    fork-child can reach from `worker_main`.
4549                    //
4550                    // 4. `_exit(1)` on catch_unwind Err, `_exit(0)`
4551                    //    on Ok — bypasses Rust's global static
4552                    //    destructors that a plain `return` would
4553                    //    run.
4554                    //
4555                    // `AssertUnwindSafe` is justified: the child
4556                    // unconditionally _exits after this block, so no
4557                    // post-unwind invariant can be observed.
4558                    #[cfg(panic = "unwind")]
4559                    {
4560                        let _ = std::panic::take_hook();
4561                        std::panic::set_hook(Box::new(|_| {}));
4562                    }
4563                    let child_result =
4564                        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4565                            // Wait for parent to move us to cgroup before starting work.
4566                            // Use poll() with a 30s timeout — signal-safe after fork,
4567                            // prevents hanging forever if the parent stalls.
4568                            let mut pfd = libc::pollfd {
4569                                fd: start_fds[0],
4570                                events: libc::POLLIN,
4571                                revents: 0,
4572                            };
4573                            let ret = unsafe { libc::poll(&mut pfd, 1, 30_000) };
4574                            if ret <= 0 {
4575                                exit_child_error();
4576                            }
4577                            let mut buf = [0u8; 1];
4578                            let mut f = unsafe { std::fs::File::from_raw_fd(start_fds[0]) };
4579                            let _ = f.read_exact(&mut buf);
4580                            drop(f);
4581                            // Do NOT reset STOP here. STOP is initialized to
4582                            // false at the pre-handshake site above (just before
4583                            // the signal-handler install) and a SIGUSR1 delivered
4584                            // between the mask unblock and this point reflects
4585                            // a legitimate parent-issued stop request — the
4586                            // explicit-start path in `stop_and_collect` skips
4587                            // the readiness barrier and races
4588                            // `kill(pid, SIGUSR1)` against the worker's start-
4589                            // byte read here, so any STOP=true observed at
4590                            // this point must propagate into the work loop.
4591                            // Clobbering it would lose the stop and produce a
4592                            // worker that loops until the killpg/SIGKILL
4593                            // escalation.
4594                            // Publish a single "ready" byte on the report pipe
4595                            // BEFORE entering `worker_main`. The parent's
4596                            // auto-start barrier in `stop_and_collect` polls
4597                            // every worker's report fd for POLLIN with a
4598                            // bounded deadline and consumes this byte as the
4599                            // explicit signal that the worker has finished
4600                            // its post-fork init (cgroup placement, SIGUSR1
4601                            // unblock) and is about to enter the
4602                            // work loop. Replaces the prior 500 ms blind
4603                            // sleep — under host CPU contention, real worker
4604                            // start latency can exceed that sleep, dropping
4605                            // a worker into its loop AFTER `stop_and_collect`
4606                            // has already signalled stop, which surfaces as
4607                            // a starvation false-positive.
4608                            //
4609                            // Done as a raw `libc::write` (not `File::write`)
4610                            // to keep the report-fd's ownership inside its
4611                            // existing `std::fs::File::from_raw_fd` block
4612                            // below — wrapping it here would close the fd
4613                            // when the local goes out of scope, before the
4614                            // post-`worker_main` write_all path. A 1-byte
4615                            // write to a freshly-grown 8 MiB pipe with no
4616                            // reader contention completes synchronously
4617                            // without blocking; signal-safe after fork.
4618                            //
4619                            // The byte is `b'r'`. The parent's collect path
4620                            // strips a leading `b'r'` before
4621                            // `postcard::from_bytes` so the
4622                            // explicit-start call site (which skips the
4623                            // barrier) still parses correctly. A zero return or short write
4624                            // is treated as best-effort: if the kernel
4625                            // refuses the write (EFAULT cannot occur with a
4626                            // stack pointer; EBADF cannot occur on a fresh
4627                            // fd; EPIPE only fires if the reader closed,
4628                            // which the parent does not do mid-spawn), the
4629                            // parent's barrier-deadline path falls back to
4630                            // the same "skip this worker" branch as a
4631                            // genuinely-dead worker.
4632                            let ready_byte: u8 = b'r';
4633                            unsafe {
4634                                libc::write(
4635                                    report_fds[1],
4636                                    &ready_byte as *const u8 as *const libc::c_void,
4637                                    1,
4638                                );
4639                            }
4640                            // Now run. Fork-mode workers thread the global
4641                            // STOP through `worker_main` — the SIGUSR1 handler
4642                            // is process-wide, so flipping `STOP` from
4643                            // `sigusr1_handler` is what reaches the loop's
4644                            // `stop.load(Relaxed)` checks.
4645                            let report = worker_main(
4646                                affinity,
4647                                group.work_type.clone(),
4648                                group.sched_policy,
4649                                group.mem_policy.clone(),
4650                                group.mpol_flags,
4651                                group.nice,
4652                                group.comm.as_deref(),
4653                                group.uid,
4654                                group.gid,
4655                                group.numa_node,
4656                                worker_pipe_fds,
4657                                worker_futex,
4658                                iter_slot,
4659                                phase_epoch_ptr,
4660                                &STOP,
4661                                group.group_idx,
4662                            );
4663                            let bytes = postcard::to_stdvec(&report).unwrap_or_default();
4664                            let mut f = unsafe { std::fs::File::from_raw_fd(report_fds[1]) };
4665                            if let Err(e) = f.write_all(&bytes) {
4666                                // tracing is unsafe in a post-fork child
4667                                // (the global subscriber may be holding a
4668                                // lock taken in another thread of the
4669                                // parent process before fork); eprintln
4670                                // goes straight to stderr without locking
4671                                // a tracing subscriber. The parent
4672                                // observes a partial / empty pipe payload
4673                                // and emits a sentinel WorkerReport with
4674                                // exit_info populated, but without this
4675                                // log line the underlying I/O error
4676                                // (EPIPE if the parent closed the read
4677                                // end early, EFBIG / ENOSPC on a backing-
4678                                // file pipe, EIO on a transient kernel
4679                                // failure) is invisible.
4680                                eprintln!("ktstr: worker report write_all failed: {e}");
4681                            }
4682                            drop(f);
4683                        }));
4684                    if child_result.is_ok() {
4685                        exit_child_success();
4686                    } else {
4687                        exit_child_error();
4688                    }
4689                }
4690                child_pid => {
4691                    // Parent: restore the pre-fork signal mask
4692                    // (the block was transient — only the child
4693                    // inherits the blocked SIGUSR1, and that
4694                    // child unblocks immediately after installing
4695                    // its handler). Without this restore, SIGUSR1
4696                    // would stay blocked in the calling thread for
4697                    // the lifetime of the workload, swallowing any
4698                    // signal directed at the parent process.
4699                    let psm_parent_restore_rc = unsafe {
4700                        libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
4701                    };
4702                    if psm_parent_restore_rc != 0 {
4703                        tracing::warn!(
4704                            rc = psm_parent_restore_rc,
4705                            "pthread_sigmask(SIG_SETMASK) failed restoring mask in parent post-fork; SIGUSR1 stays blocked in this thread for the lifetime of the workload"
4706                        );
4707                    }
4708                    // Close unused pipe ends.
4709                    unsafe {
4710                        libc::close(report_fds[1]);
4711                        libc::close(start_fds[0]);
4712                    }
4713                    // child_pid is positive by the -1 arm above, so
4714                    // fits in pid_t directly — store as pid_t so
4715                    // every downstream libc call avoids the u32→i32
4716                    // sign-cast wraparound bug.
4717                    guard.children.push(ForkedChild {
4718                        pid: child_pid,
4719                        report_fd: report_fds[0],
4720                        start_fd: start_fds[1],
4721                        kind: ForkedChildKind::Worker {
4722                            group_idx: group.group_idx,
4723                        },
4724                    });
4725                }
4726            }
4727        }
4728
4729        Ok(())
4730    }
4731
4732    /// Kernel TIDs of all worker tasks, in spawn order.
4733    ///
4734    /// Returned as `libc::pid_t` — the kernel's native type — so
4735    /// callers feed them directly into `kill`, `waitpid`,
4736    /// `Pid::from_raw`, and `sched_setaffinity` writes without any
4737    /// sign-cast at the libc boundary.
4738    ///
4739    /// # WARNING — `cgroup.procs` for `CloneMode::Thread`
4740    ///
4741    /// **For `CloneMode::Thread`, passing these TIDs to a
4742    /// `cgroup.procs` write migrates the ENTIRE test-runner process
4743    /// into that cgroup**: cgroup.procs writes are tgid-scoped, and
4744    /// every Thread worker shares the test runner's tgid. The first
4745    /// such write moves the test harness, every parent thread, and
4746    /// every sibling worker into the destination cgroup; subsequent
4747    /// writes are no-ops because they all point at the same tgid.
4748    /// Use cgroup v2 threaded-mode cgroups with `cgroup.threads`
4749    /// for per-thread placement. `CloneMode::Fork` is the right
4750    /// choice when each worker needs its own cgroup.
4751    ///
4752    /// # Per-mode interpretation
4753    ///
4754    /// - [`CloneMode::Fork`]: each entry is the worker's pid
4755    ///   (== tgid == kernel tid because the worker is its own
4756    ///   thread-group leader). Safe to feed into `cgroup.procs`.
4757    /// - [`CloneMode::Thread`]: each entry is the worker's
4758    ///   `gettid()` value — distinct kernel tasks inside the
4759    ///   parent's tgid. Safe for `sched_setaffinity(tid, ...)`;
4760    ///   safe for `cgroup.threads` writes under a threaded-mode
4761    ///   cgroup; **not** safe for `cgroup.procs` (see warning above).
4762    ///
4763    /// # Thread tid publish ordering
4764    ///
4765    /// Thread workers publish their `gettid()` via an
4766    /// `Arc<AtomicI32>` BEFORE the start handshake (the publish is
4767    /// the first thing the worker closure does, before blocking on
4768    /// `start_rx`). `spawn_thread_worker` blocks on a paired
4769    /// rendezvous channel until the worker reaches the publish
4770    /// point, so by the time [`WorkloadHandle::spawn`] returns,
4771    /// every thread worker's `tid` is non-zero (or spawn would have
4772    /// bailed with a "failed to publish gettid() within 2 s"
4773    /// error). Post-spawn `worker_pids()` is safe to call without
4774    /// first calling [`Self::start`]. The publish uses `Release`;
4775    /// this reader uses `Acquire`, pairing release-acquire so that
4776    /// any reader observing a non-zero tid is also guaranteed to
4777    /// observe the worker's post-publish state.
4778    ///
4779    /// # `pcomm` containers
4780    ///
4781    /// Groups configured with [`WorkSpec::pcomm`] yield a single
4782    /// container process whose tgid leader pid is what the parent
4783    /// holds; the per-thread `gettid()` values of the workers running
4784    /// inside the container are not exported across the process
4785    /// boundary. Each pcomm group contributes ONE entry to the
4786    /// returned vector — the container pid — regardless of how many
4787    /// thread workers it hosts. This pid is correct for `cgroup.procs`
4788    /// migration (the container's whole tgid moves) but is NOT
4789    /// suitable as a target for per-thread `sched_setaffinity` —
4790    /// see [`Self::set_affinity`] for the matching error path.
4791    ///
4792    /// Order: forked children (conventional workers + pcomm
4793    /// containers, in spawn order) followed by Thread-mode workers
4794    /// (in spawn order). A workload that mixes pcomm groups with
4795    /// non-pcomm Thread-mode groups produces both populated
4796    /// collections; a workload using only one dispatch path produces
4797    /// only one populated collection.
4798    pub fn worker_pids(&self) -> Vec<libc::pid_t> {
4799        let mut out = Vec::with_capacity(self.children.len() + self.threads.len());
4800        out.extend(self.children.iter().map(|c| c.pid));
4801        out.extend(self.threads.iter().map(|tw| tw.tid.load(Ordering::Acquire)));
4802        out
4803    }
4804
4805    /// Worker pids suitable for `cgroup.procs` migration.
4806    ///
4807    /// `cgroup.procs` is **tgid-scoped** in the kernel: writing a
4808    /// tid migrates the entire thread group containing that tid
4809    /// (`kernel/cgroup/cgroup.c::__cgroup_procs_write` resolves the
4810    /// passed pid to its leader via `find_lock_task_mm` /
4811    /// `cgroup_procs_write_start`). Under [`CloneMode::Thread`]
4812    /// every worker shares the test harness's tgid, so feeding
4813    /// [`Self::worker_pids`] to `cgroup.procs` would migrate the
4814    /// harness itself — catastrophic.
4815    ///
4816    /// Returns the per-worker pids when the spawn used
4817    /// [`CloneMode::Fork`] (each worker has its own tgid). Bails
4818    /// for [`CloneMode::Thread`] with an actionable diagnostic
4819    /// pointing at `cgroup.threads` (the thread-scoped sibling) as
4820    /// the right migration sink for thread workers.
4821    ///
4822    /// Callers that integrate with `cgroup.procs` writes — e.g.
4823    /// [`crate::cgroup::CgroupManager::move_tasks`] — should call
4824    /// this in place of [`Self::worker_pids`] so a misconfigured
4825    /// Thread-mode test fails at the migration step rather than
4826    /// silently moving the harness into the per-test cgroup.
4827    pub fn worker_pids_for_cgroup_procs(&self) -> Result<Vec<libc::pid_t>> {
4828        if !self.threads.is_empty() {
4829            anyhow::bail!(
4830                "WorkloadHandle::worker_pids_for_cgroup_procs: workers were \
4831                 spawned with CloneMode::Thread; their pids share the test \
4832                 harness's tgid and a `cgroup.procs` write would migrate the \
4833                 harness. Use `cgroup.threads` (thread-scoped) for Thread-mode \
4834                 workers, or switch to CloneMode::Fork."
4835            );
4836        }
4837        Ok(self.worker_pids())
4838    }
4839
4840    /// Signal all workers to start working (after they've been
4841    /// placed in cgroups, if applicable).
4842    ///
4843    /// Idempotent — subsequent calls after the first are no-ops.
4844    pub fn start(&mut self) {
4845        if self.started {
4846            return;
4847        }
4848        self.started = true;
4849        // Fork-mode: write a byte to each child's start pipe (covers
4850        // both conventional workers and pcomm containers — both wait
4851        // on the same start-byte handshake before entering their work
4852        // path).
4853        for child in &mut self.children {
4854            unsafe {
4855                libc::write(child.start_fd, b"s".as_ptr() as *const _, 1);
4856                libc::close(child.start_fd);
4857            }
4858            child.start_fd = -1;
4859        }
4860        // Thread-mode: send `()` on each worker's start_tx. The
4861        // SyncSender(0) rendezvous means each send blocks until the
4862        // worker calls recv(); if the worker has been joined or has
4863        // panicked before reaching recv, send returns Err which we
4864        // swallow (the join in stop_and_collect surfaces the real
4865        // exit). Take ownership so a future start() call (illegal
4866        // by the idempotence guard above) can't re-send.
4867        for tw in &mut self.threads {
4868            if let Some(tx) = tw.start_tx.take() {
4869                let _ = tx.send(());
4870            }
4871        }
4872    }
4873
4874    /// Set CPU affinity for worker at `idx`.
4875    ///
4876    /// For [`CloneMode::Fork`] the per-worker pid addresses a
4877    /// distinct kernel task. For [`CloneMode::Thread`] the worker's
4878    /// `gettid()` is what `sched_setaffinity(tid, ...)` accepts;
4879    /// this method reads the tid from the worker's
4880    /// `Arc<AtomicI32>` (with `Acquire` ordering, paired with the
4881    /// `Release` publish on the worker thread). Returns an error
4882    /// if the thread has not yet published its tid — call
4883    /// [`start()`](Self::start) first so the worker reaches its
4884    /// `gettid()` publish before reading.
4885    ///
4886    /// Bails for `ForkedChildKind::PcommContainer` entries: the
4887    /// container's tgid leader pid is the only kernel handle the
4888    /// parent holds for that group, but `sched_setaffinity` against
4889    /// that pid pins only the leader thread (a placeholder thread
4890    /// that never enters the work loop), not the worker threads
4891    /// running the workload. The container's worker tids are not
4892    /// published across the process boundary. Bake the affinity into
4893    /// [`WorkSpec::affinity`] at spawn time instead — `worker_main`
4894    /// applies it per-thread inside the container.
4895    ///
4896    /// Index space: `[0, children.len())` addresses forked children
4897    /// (conventional workers and pcomm containers), and
4898    /// `[children.len(), children.len() + threads.len())` addresses
4899    /// Thread-mode workers, matching the ordering of
4900    /// [`Self::worker_pids`].
4901    pub fn set_affinity(&self, idx: usize, cpus: &BTreeSet<usize>) -> Result<()> {
4902        let pid = if idx < self.children.len() {
4903            let child = &self.children[idx];
4904            if let ForkedChildKind::PcommContainer { groups } = &child.kind {
4905                let total: usize = groups.iter().map(|(_, n)| n).sum();
4906                anyhow::bail!(
4907                    "set_affinity: child {idx} is a pcomm container \
4908                     hosting {total} thread workers; per-thread \
4909                     tids are not exported across the process boundary. \
4910                     Set affinity via WorkSpec::affinity at spawn time \
4911                     so worker_main applies it inside the container."
4912                );
4913            }
4914            child.pid
4915        } else {
4916            let thread_idx = idx - self.children.len();
4917            let tid = self.threads[thread_idx].tid.load(Ordering::Acquire);
4918            if tid == 0 {
4919                anyhow::bail!(
4920                    "set_affinity: thread worker {thread_idx} has not yet \
4921                     published gettid() (call start() first)"
4922                );
4923            }
4924            tid
4925        };
4926        set_thread_affinity(pid, cpus)
4927    }
4928
4929    /// Read all workers' current iteration counts from shared memory.
4930    ///
4931    /// Each element is the monotonically increasing iteration count for
4932    /// that worker, read with Relaxed ordering. Returns an empty vec
4933    /// if no workers were spawned.
4934    ///
4935    /// # Ordering rationale — why Relaxed is sound
4936    ///
4937    /// Every producer (the worker-side store at the
4938    /// `worker_main` publish sites) writes its slot with Relaxed
4939    /// ordering, and this reader loads with Relaxed too. No
4940    /// happens-before edge is needed because no host-side consumer
4941    /// pairs the iteration count with OTHER shared state: the
4942    /// parent samples these counters to answer "is this worker
4943    /// still making progress?" and feeds deltas into gap
4944    /// detection, not into any data-dependent follow-up read from
4945    /// a different shared memory location. A stale value on one
4946    /// sample is self-correcting — the next snapshot picks up the
4947    /// newer count without any cross-field invariant to break.
4948    ///
4949    /// The per-slot single-producer / multi-sampler shape is
4950    /// inherently non-tearing on every supported target
4951    /// (AtomicU64 is architecture-primitive on x86_64 and aarch64
4952    /// LSE with 8-byte alignment enforced by the type). The only
4953    /// question is ordering, and the audit above concludes Relaxed
4954    /// is load-bearingly correct — promoting either side to
4955    /// Acquire/Release would add a barrier with no corresponding
4956    /// paired operation to synchronise with.
4957    pub fn snapshot_iterations(&self) -> Vec<u64> {
4958        if self.iter_counters.is_null() || self.iter_counter_len == 0 {
4959            return Vec::new();
4960        }
4961        (0..self.iter_counter_len)
4962            .map(|i| {
4963                // SAFETY: alignment + atomic-only-access invariant
4964                // established at the iter_counters mmap site in
4965                // `WorkloadHandle::spawn` and carried by the
4966                // `*mut AtomicU64` type. Relaxed ordering: see the
4967                // rationale in the outer doc comment.
4968                unsafe { &*self.iter_counters.add(i) }.load(Ordering::Relaxed)
4969            })
4970            .collect()
4971    }
4972
4973    /// Broadcast the current scenario phase to this handle's workers.
4974    /// The scenario engine calls this at each step boundary — the
4975    /// 1-indexed phase (Step k → k + 1) at StepStart, `u32::MAX` (the
4976    /// inter-step gap sentinel) at StepEnd — so a backdrop worker
4977    /// drains a per-phase `PhaseSlice` on each transition. No-op when
4978    /// the handle has no `phase_epoch` region. Release store pairs with
4979    /// the worker's Relaxed load; it is telemetry, so no happens-before
4980    /// edge is required (see the `unsafe impl Send for WorkloadHandle`
4981    /// SAFETY note), but Release keeps the store promptly visible.
4982    pub(crate) fn set_phase_epoch(&self, epoch: u32) {
4983        if self.phase_epoch.is_null() {
4984            return;
4985        }
4986        // SAFETY: alignment + atomic-only-access invariant established
4987        // at the phase_epoch mmap site in `WorkloadHandle::spawn` /
4988        // `spawn_pcomm_cgroup` and carried by the `*mut AtomicU32`
4989        // type. The parent is the sole writer; workers are readers.
4990        unsafe { &*self.phase_epoch }.store(epoch, Ordering::Release);
4991    }
4992
4993    /// Stop all workers, collect their reports, and wait for exit.
4994    ///
4995    /// Auto-starts workers if [`start()`](Self::start) was not called,
4996    /// then waits on an event-driven barrier — each fork worker
4997    /// writes a single `b'r'` byte to its report pipe immediately
4998    /// after the start handshake completes, and the parent polls
4999    /// every report fd for `POLLIN` with a 5 s deadline. The
5000    /// barrier wakes the moment the slowest worker finishes its
5001    /// post-fork init, replacing the prior unconditional 500 ms
5002    /// sleep that under-waited under host CPU contention and
5003    /// over-waited on idle hosts. Thread-mode workers are pre-
5004    /// synchronised by `start()`'s `mpsc::sync_channel(0)` rendezvous,
5005    /// so the barrier is a no-op when no fork children were spawned.
5006    /// Consumes `self` -- workers cannot be restarted.
5007    ///
5008    /// Workers that fail to produce a report (died, timed out, or wrote
5009    /// corrupt data) get a zeroed-out sentinel report with `work_units: 0`.
5010    /// This ensures `assert_not_starved` catches dead workers as starvation
5011    /// failures.
5012    ///
5013    /// # Shutdown latency
5014    ///
5015    /// Workers spend their steady-state time blocked inside a
5016    /// `futex_wait` with timeout `WORKER_STOP_POLL_NS` (~100 ms).
5017    /// The "stop signal" is a per-mode flag the worker checks on
5018    /// every futex-wait wake; the wake interval bounds shutdown
5019    /// latency.
5020    ///
5021    /// _Fork mode_ — `stop_and_collect` sends SIGUSR1 to each
5022    /// worker pid; the per-process `sigusr1_handler` flips the
5023    /// global `STOP` in that worker's CoW address space, and the
5024    /// worker observes it on the NEXT futex wake (partner-writes
5025    /// or the 100 ms timeout, whichever comes first). The signal
5026    /// handler is process-wide and reaches one worker per kill().
5027    ///
5028    /// _Thread mode_ — `stop_and_collect` calls
5029    /// `worker.stop.store(true, Relaxed)` directly on each
5030    /// worker's `Arc<AtomicBool>`. SIGUSR1 is process-wide and
5031    /// useless for per-thread stop control, so no signal is sent;
5032    /// the worker observes the flag flip on its next futex-wait
5033    /// wake at the same 100 ms cadence.
5034    ///
5035    /// Callers that budget a graceful-shutdown window should
5036    /// allow at least one `WORKER_STOP_POLL_NS` tick (~100 ms)
5037    /// between flag flip and final collect, over and above any
5038    /// report-flush / IO latency. Tighter windows can race the
5039    /// worker's pre-stop iteration and surface as a missing
5040    /// report, which is then mapped to the sentinel path above.
5041    ///
5042    /// # Exit-shape invariance
5043    ///
5044    /// Collection discriminates purely on the presence and validity of
5045    /// the worker's pipe-delivered postcard payload — **not** on
5046    /// `waitpid` exit status. Under `panic = "unwind"` (dev/test
5047    /// profile) the worker's
5048    /// `catch_unwind` arm calls `_exit(1)` so the parent sees
5049    /// `WIFEXITED=true`, `WEXITSTATUS=1`; under `panic = "abort"`
5050    /// (release profile) the worker aborts with `SIGABRT` so the parent
5051    /// sees `WIFEXITED=false`, `WTERMSIG=6`. Either way, a panicking
5052    /// worker never finishes `f.write_all(&bytes)` on the report pipe,
5053    /// so `poll` + `read_to_end` hands back an empty (or truncated)
5054    /// buffer, `postcard::from_bytes` fails, and the
5055    /// sentinel path fires. Partial writes from a panic between successful
5056    /// `write_all` and `_exit(0)` are not reachable — the write is the
5057    /// last non-trivial statement inside the catch_unwind closure.
5058    /// The `waitpid` call later in this function exists solely for
5059    /// reaping zombies; its return value feeds only the "still alive
5060    /// → SIGKILL escalate" branch and is never mapped to report
5061    /// state (the sentinel path DOES now read it to populate
5062    /// [`WorkerExitInfo`] on the attached diagnostic, but the
5063    /// correctness discrimination — sentinel vs real report — still
5064    /// happens purely on pipe payload presence).
5065    pub fn stop_and_collect(mut self) -> Vec<WorkerReport> {
5066        // Auto-start if not explicitly started (workers in parent cgroup)
5067        let was_started = self.started;
5068        self.start();
5069
5070        // Event-driven worker-started barrier. Each fork worker writes
5071        // a single `b'r'` byte to its report pipe immediately after
5072        // the start-pipe handshake completes (see the matching write
5073        // inside `worker_main`'s catch_unwind closure). Polling every
5074        // worker's report fd for `POLLIN` with a bounded deadline
5075        // wakes the moment the slowest worker has finished its
5076        // post-fork init — replacing the prior 500 ms blind sleep
5077        // that could under-wait on a CPU-contended host (false
5078        // starvation if the worker entered its loop after stop was
5079        // signalled) and over-wait on an idle host (~500 ms wasted
5080        // per `stop_and_collect`).
5081        //
5082        // Thread-mode workers do not need a barrier here: `start()`
5083        // above sent on a `mpsc::sync_channel(0)` SyncSender(0)
5084        // rendezvous, which blocks the parent until the worker's
5085        // matching `recv()` returns — by the time `start()` returns,
5086        // every thread worker has crossed its start handshake. Only
5087        // the fork-mode pipe-based start signal is fire-and-forget,
5088        // so the barrier is gated on `!self.children.is_empty()`.
5089        //
5090        // Deadline budget: 5 s mirrors the existing collect deadline
5091        // below. Each iteration polls every still-pending fd with
5092        // the remaining budget; a worker that returns `POLLHUP` /
5093        // `POLLERR` (died before writing the ready byte — fork-race
5094        // close, panic during early init, or the kernel killed it)
5095        // is dropped from the pending set and the surrounding
5096        // collect path's sentinel-report logic surfaces it. The
5097        // worst case (every worker hits the deadline without a
5098        // ready byte) bounds the wait at the same 5 s the legacy
5099        // sleep + collect path budgeted for the entire stop_and_collect.
5100        if !was_started && !self.children.is_empty() {
5101            let barrier_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
5102            // Pending = (index_into_children, read_fd). We track the
5103            // index so a `POLLHUP` worker can be removed from
5104            // pending without disturbing the collect loop's
5105            // ordering below.
5106            let mut pending: Vec<(usize, i32)> = self
5107                .children
5108                .iter()
5109                .enumerate()
5110                .map(|(i, c)| (i, c.report_fd))
5111                .collect();
5112            while !pending.is_empty() {
5113                let remaining =
5114                    barrier_deadline.saturating_duration_since(std::time::Instant::now());
5115                if remaining.is_zero() {
5116                    // Barrier deadline expired with workers still
5117                    // pending. Stop waiting and proceed to signal
5118                    // stop. The collect path below treats any
5119                    // worker whose pipe never produced data as a
5120                    // sentinel report — same outcome as a worker
5121                    // that started in time but produced an
5122                    // unparseable report.
5123                    break;
5124                }
5125                let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
5126                let mut pfds: Vec<libc::pollfd> = pending
5127                    .iter()
5128                    .map(|&(_, fd)| libc::pollfd {
5129                        fd,
5130                        events: libc::POLLIN,
5131                        revents: 0,
5132                    })
5133                    .collect();
5134                // SAFETY: `pfds` is a non-empty owned Vec; nfds
5135                // matches its length; `ms >= 0` since the
5136                // remaining-zero branch above bails first. Return
5137                // codes are interpreted below.
5138                let ret = unsafe { libc::poll(pfds.as_mut_ptr(), pfds.len() as libc::nfds_t, ms) };
5139                if ret < 0 {
5140                    let err = std::io::Error::last_os_error();
5141                    if err.kind() == std::io::ErrorKind::Interrupted {
5142                        // EINTR — re-poll with the remaining budget
5143                        // on the next iteration. Common during
5144                        // teardown when a sibling thread sends a
5145                        // signal; the loop's deadline guard bounds
5146                        // total wait time regardless of EINTR
5147                        // frequency.
5148                        continue;
5149                    }
5150                    // Hard poll failure (EFAULT impossible with an
5151                    // owned Vec; ENOMEM extreme). Bail out of the
5152                    // barrier and let the collect path's per-fd
5153                    // poll handle each worker individually.
5154                    tracing::warn!(
5155                        %err,
5156                        pending = pending.len(),
5157                        "WorkloadHandle::stop_and_collect: barrier poll failed; falling \
5158                         through to per-worker collect"
5159                    );
5160                    break;
5161                }
5162                // ret == 0 means the per-iteration timeout fired
5163                // without any fd ready — but we measured this
5164                // against `remaining`, so the next iteration's
5165                // saturating_duration_since will be zero and the
5166                // top-of-loop guard exits. Don't break here: a
5167                // future cycle could still be useful if the system
5168                // clock jumped backward, and the cost is one extra
5169                // iteration that immediately bails.
5170                if ret > 0 {
5171                    pending.retain(|&(_, fd)| {
5172                        // Find this fd's pollfd entry. Linear scan
5173                        // is fine: typical worker counts are <100
5174                        // and the alternative (HashMap) costs more
5175                        // in setup than the linear scan saves.
5176                        let pfd = pfds.iter().find(|p| p.fd == fd);
5177                        let revents = pfd.map(|p| p.revents).unwrap_or(0);
5178                        if revents & libc::POLLIN != 0 {
5179                            // Ready byte arrived. Consume exactly
5180                            // 1 byte and remove from pending. The
5181                            // raw `libc::read` does not take
5182                            // ownership of the fd — the collect
5183                            // path below still owns it via the
5184                            // `children` Vec and will read the
5185                            // postcard tail on its own deadline.
5186                            let mut byte: u8 = 0;
                            // SAFETY: `&mut byte` is a valid
                            // 1-byte buffer; `fd` is the report
                            // read end the parent owns until
                            // collect drains it. A return of 1
                            // or 0 (EOF) drops the fd from
                            // pending; a -1 return keeps it
                            // pending only on EINTR.
5193                            let n = unsafe {
5194                                libc::read(fd, &mut byte as *mut u8 as *mut libc::c_void, 1)
5195                            };
5196                            // n == 1 → ready byte consumed; drop
5197                            // from pending.
5198                            // n == 0 → POLLIN with zero-byte read
5199                            // means EOF (writer closed) without
5200                            // sending the byte — worker died
5201                            // pre-write. Drop from pending; the
5202                            // collect path emits a sentinel report.
5203                            // n < 0 → transient error (EAGAIN
5204                            // shouldn't happen since POLLIN
5205                            // signalled readability, but on EINTR
5206                            // the next iteration re-polls).
5207                            if n >= 0 {
5208                                return false;
5209                            }
5210                            // Negative return: re-check kind.
5211                            let err = std::io::Error::last_os_error();
5212                            if err.kind() == std::io::ErrorKind::Interrupted {
5213                                return true;
5214                            }
5215                            tracing::warn!(
5216                                %err,
5217                                fd,
5218                                "WorkloadHandle::stop_and_collect: barrier byte read \
5219                                 failed; treating worker as ready"
5220                            );
5221                            return false;
5222                        }
5223                        if revents & (libc::POLLHUP | libc::POLLERR | libc::POLLNVAL) != 0 {
5224                            // Worker closed the write end without
5225                            // sending the ready byte (panic during
5226                            // post-fork init, or kernel-killed
5227                            // before reaching the write). Drop
5228                            // from pending; the collect path's
5229                            // `read_to_end` returns 0 bytes and the
5230                            // sentinel-report branch fires.
5231                            return false;
5232                        }
5233                        // No POLLIN, no hangup — keep waiting.
5234                        true
5235                    });
5236                }
5237            }
5238        }
5239
5240        let mut reports = Vec::new();
5241        let children = std::mem::take(&mut self.children);
5242        let threads = std::mem::take(&mut self.threads);
5243
5244        // Drain the 'r' ready byte from each child's report pipe
5245        // so the collect poll below waits for the report payload, not
5246        // the already-present ready byte. The barrier section above
5247        // consumes it for auto-started workers; explicitly-started
5248        // workers (was_started=true) skip the barrier.
5249        if was_started {
5250            for child in &children {
5251                let mut byte: u8 = 0;
5252                unsafe {
5253                    libc::read(
5254                        child.report_fd,
5255                        &mut byte as *mut u8 as *mut libc::c_void,
5256                        1,
5257                    );
5258                }
5259            }
5260        }
5261
5262        // Signal all fork-mode children to stop via SIGUSR1; the
5263        // signal handler flips that child's process-local STOP, which
5264        // worker_main's `stop_requested` checks read. For pcomm
5265        // containers the SIGUSR1 flips the container's STOP, which
5266        // every thread inside the container observes via `&STOP`
5267        // passed to `worker_main` — one signal stops the whole
5268        // group cleanly. `pid` is `libc::pid_t`, so it flows to
5269        // `Pid::from_raw` without the u32→i32 sign-cast wraparound
5270        // that produced `kill(-1, ...)` session-wide reaps when the
5271        // old u32 pid exceeded i32::MAX.
5272        for child in &children {
5273            let _ = nix::sys::signal::kill(
5274                nix::unistd::Pid::from_raw(child.pid),
5275                nix::sys::signal::Signal::SIGUSR1,
5276            );
5277        }
5278        // Signal all thread-mode workers by flipping each worker's
5279        // per-task `stop`. SIGUSR1 is process-wide and useless for
5280        // per-thread stop; the Arc<AtomicBool> threaded through
5281        // worker_main is the only path that reaches an individual
5282        // thread without affecting siblings.
5283        for tw in &threads {
5284            tw.stop.store(true, Ordering::Relaxed);
5285        }
5286
5287        // Collect reports with a shared 5s deadline across all workers.
5288        // Each worker gets the remaining budget, so starved workers
5289        // (e.g. under degrade mode) don't serially exhaust the VM
5290        // timeout.
5291        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
5292
5293        // Bound on the post-SIGKILL worker reap (below). A worker normally
5294        // exits <<1s after SIGKILL (post-crash bypass keeps it
5295        // CFS-schedulable). The rare wedge: a sched_setaffinity flipper
5296        // stuck uninterruptible in `affine_move_task`'s `wait_for_completion`
5297        // — a migration that can't finish until the target becomes
5298        // migratable, which during the crash/bypass window can take seconds
5299        // — cannot take its pending SIGKILL until that syscall returns, so a
5300        // blocking `waitpid` would stall collect for that whole window.
5301        // The deadline is SHARED across workers: every such wedge waits out
5302        // the SAME crash/bypass window, so once one times out the rest get
5303        // the remaining (~0) budget and give up immediately rather than
5304        // summing to N×timeout. Mirrors the scheduler-side
5305        // `reap_child_bounded` (rust_init/scheduler.rs); on timeout the zombie is left
5306        // for VM reboot to reap (SIGKILL cannot make it exit faster — see
5307        // that fn's doc).
5308        const WORKER_REAP_TIMEOUT: Duration = Duration::from_secs(3);
5309        let mut worker_reap_deadline: Option<Instant> = None;
5310        // Evented, bounded reap of a SIGKILLed worker `pid`: wait up to
5311        // `timeout` for it to become a zombie (`pidfd_open` + `poll`),
5312        // then reap non-blocking. Returns false on timeout (worker still
5313        // uninterruptibly wedged — the rare case `WORKER_REAP_TIMEOUT`
5314        // documents; left for VM reboot).
5315        fn reap_pid_bounded(pid: libc::pid_t, timeout: Duration) -> bool {
5316            // Non-blocking reap of `pid` (observed StillAlive just above,
5317            // so this succeeds only once it has become a zombie).
5318            let reap = || {
5319                matches!(
5320                    nix::sys::wait::waitpid(
5321                        nix::unistd::Pid::from_raw(pid),
5322                        Some(nix::sys::wait::WaitPidFlag::WNOHANG),
5323                    ),
5324                    Ok(nix::sys::wait::WaitStatus::Exited(..))
5325                        | Ok(nix::sys::wait::WaitStatus::Signaled(..))
5326                )
5327            };
5328            match crate::sync::pidfd_poll_exited(pid, timeout) {
5329                // Readable => zombie => the WNOHANG reap is non-blocking.
5330                crate::sync::PidfdWait::Exited => {
5331                    let _ = reap();
5332                    true
5333                }
5334                // Still uninterruptibly wedged (see `WORKER_REAP_TIMEOUT`):
5335                // leave the zombie-to-be for VM reboot (SIGKILL already
5336                // pending).
5337                crate::sync::PidfdWait::TimedOut => false,
5338                // pidfd_open failed (ESRCH/gone): one non-blocking reap.
5339                crate::sync::PidfdWait::NoPidfd => reap(),
5340            }
5341        }
5342
5343        for child in children {
5344            let mut buf = Vec::new();
5345            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
5346            let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
5347            let mut poll_ready = false;
5348            if ms > 0 {
5349                let mut pfd = libc::pollfd {
5350                    fd: child.report_fd,
5351                    events: libc::POLLIN,
5352                    revents: 0,
5353                };
5354                let ready = unsafe { libc::poll(&mut pfd, 1, ms) };
5355                poll_ready = ready > 0;
5356            }
5357
5358            let npid = nix::unistd::Pid::from_raw(child.pid);
5359            if !poll_ready {
5360                // Deadline expired — child didn't write a report.
5361                // Kill first so read_to_end doesn't block on the
5362                // pipe's write end (the child may have written
5363                // only the 'r' ready byte but no payload).
5364                kill_and_killpg(npid, nix::sys::signal::Signal::SIGKILL);
5365                let _ = nix::unistd::close(child.report_fd);
5366            } else {
5367                // Child responded — read the report, then kill.
5368                let mut f = unsafe { std::fs::File::from_raw_fd(child.report_fd) };
5369                let _ = f.read_to_end(&mut buf);
5370                drop(f);
5371                kill_and_killpg(npid, nix::sys::signal::Signal::SIGKILL);
5372            }
5373            let waited = nix::sys::wait::waitpid(npid, Some(nix::sys::wait::WaitPidFlag::WNOHANG));
5374            let still_running = matches!(waited, Ok(nix::sys::wait::WaitStatus::StillAlive));
5375            let exit_info_source: Result<nix::sys::wait::WaitStatus, nix::errno::Errno> =
5376                if still_running {
5377                    // Bounded reap (see `reap_pid_bounded` + the shared
5378                    // `worker_reap_deadline` above): replaces a blocking
5379                    // `waitpid(npid, None)` that stalled on a worker
5380                    // uninterruptibly wedged in an affine_move_task
5381                    // migration (seconds; see `WORKER_REAP_TIMEOUT`).
5382                    // exit_info stays `StillAlive` either way — the prior
5383                    // blocking reap also discarded the status here.
5384                    let rd = *worker_reap_deadline
5385                        .get_or_insert_with(|| Instant::now() + WORKER_REAP_TIMEOUT);
5386                    let remaining = rd.saturating_duration_since(Instant::now());
5387                    reap_pid_bounded(child.pid, remaining);
5388                    Ok(nix::sys::wait::WaitStatus::StillAlive)
5389                } else {
5390                    waited
5391                };
5392
5393            // Strip the leading `b'r'` ready byte if the auto-start
5394            // barrier above did not consume it. Children write this
5395            // byte unconditionally on every success path right after
5396            // the start handshake; the barrier polls + reads it when
5397            // `stop_and_collect` auto-started children, but
5398            // explicit-start callers (`start()` invoked before
5399            // `stop_and_collect`) bypass the barrier and the byte
5400            // sits in the pipe ahead of the payload. Strip exactly 1
5401            // byte if present so the per-kind decoder sees a clean
5402            // payload either way.
5403            let report_slice: &[u8] = if buf.first() == Some(&b'r') {
5404                &buf[1..]
5405            } else {
5406                &buf[..]
5407            };
5408
5409            // Per-kind decoding. `Worker` carries a single postcard
5410            // `WorkerReport`. `PcommContainer` carries
5411            // a `serde_json` `Vec<WorkerReport>` — serde_json for
5412            // the pcomm container; fork-mode workers use postcard
5413            // for the single `WorkerReport`. Both
5414            // payloads ride the EOF-terminated pipe with no length
5415            // prefix; the parent's `read_to_end` provides framing.
5416            // On a decode failure, emit one sentinel per expected
5417            // report so downstream consumers (per-group filtering,
5418            // assert_not_starved) see the correct cardinality.
5419            match child.kind {
5420                ForkedChildKind::Worker { group_idx } => {
5421                    let decoded: Result<WorkerReport, _> = postcard::from_bytes(report_slice);
5422                    if let Ok(report) = decoded {
5423                        reports.push(report);
5424                    } else {
5425                        let exit_info = classify_wait_outcome(exit_info_source);
5426                        eprintln!(
5427                            "ktstr: worker pid={} returned no report ({} bytes read, exit={exit_info:?})",
5428                            child.pid,
5429                            buf.len(),
5430                        );
5431                        reports.push(WorkerReport {
5432                            tid: child.pid,
5433                            group_idx,
5434                            exit_info: Some(exit_info),
5435                            ..WorkerReport::default()
5436                        });
5437                    }
5438                }
5439                ForkedChildKind::PcommContainer { groups } => {
5440                    let total_workers: usize = groups.iter().map(|(_, n)| n).sum();
5441                    let decoded: Result<Vec<WorkerReport>, _> =
5442                        serde_json::from_slice(report_slice);
5443                    match decoded {
5444                        Ok(mut decoded_reports) if decoded_reports.len() == total_workers => {
5445                            reports.append(&mut decoded_reports);
5446                        }
5447                        Ok(mut decoded_reports) => {
5448                            // Cardinality mismatch — surface the
5449                            // partial set we did get and pad with
5450                            // sentinels so the total report count
5451                            // still equals `total_workers`. A short
5452                            // payload typically signals the
5453                            // thread-group leader died mid-encode
5454                            // (panic in `serde_json::to_vec`, OOM
5455                            // during Vec growth, or a write_all
5456                            // truncated by SIGKILL escalation).
5457                            // Sentinels are tagged with the right
5458                            // per-group `group_idx` using the
5459                            // `groups` layout: groups[0] consumed
5460                            // the first groups[0].1 slots,
5461                            // groups[1] the next groups[1].1, etc.
5462                            // We emit sentinels for the trailing
5463                            // missing slots, computing each
5464                            // sentinel's group_idx by walking the
5465                            // layout from `got` forward.
5466                            let exit_info = classify_wait_outcome(exit_info_source);
5467                            eprintln!(
5468                                "ktstr: pcomm thread-group leader pid={} returned {} of {} reports ({} bytes read, exit={exit_info:?})",
5469                                child.pid,
5470                                decoded_reports.len(),
5471                                total_workers,
5472                                buf.len(),
5473                            );
5474                            // Surplus reports (decoded > total_workers)
5475                            // must not leak into the parent's report
5476                            // stream — downstream cardinality assertions
5477                            // (per-group filtering, assert_not_starved)
5478                            // assume exactly `total_workers` entries.
5479                            // Truncate to the layout's total before the
5480                            // `got..total_workers` loop runs (which is
5481                            // a no-op for the surplus case after this).
5482                            decoded_reports.truncate(total_workers);
5483                            let got = decoded_reports.len();
5484                            for r in decoded_reports {
5485                                reports.push(r);
5486                            }
5487                            // Compute group_idx for each missing
5488                            // slot by walking the layout. `slot` is
5489                            // a 0-based global index from `got` to
5490                            // `total_workers - 1`; we accumulate
5491                            // per-group counts to find which group
5492                            // owns each slot.
5493                            for slot in got..total_workers {
5494                                let mut acc = 0usize;
5495                                let mut g_idx = 0usize;
5496                                for &(gi, n) in &groups {
5497                                    if slot < acc + n {
5498                                        g_idx = gi;
5499                                        break;
5500                                    }
5501                                    acc += n;
5502                                }
5503                                reports.push(WorkerReport {
5504                                    tid: child.pid,
5505                                    group_idx: g_idx,
5506                                    exit_info: Some(exit_info.clone()),
5507                                    ..WorkerReport::default()
5508                                });
5509                            }
5510                        }
5511                        Err(_) => {
5512                            let exit_info = classify_wait_outcome(exit_info_source);
5513                            eprintln!(
5514                                "ktstr: pcomm thread-group leader pid={} returned no decodable report ({} bytes read, exit={exit_info:?})",
5515                                child.pid,
5516                                buf.len(),
5517                            );
5518                            // Total decode failure — emit one
5519                            // sentinel per slot, tagging each with
5520                            // the correct group_idx.
5521                            for &(gi, n) in &groups {
5522                                for _ in 0..n {
5523                                    reports.push(WorkerReport {
5524                                        tid: child.pid,
5525                                        group_idx: gi,
5526                                        exit_info: Some(exit_info.clone()),
5527                                        ..WorkerReport::default()
5528                                    });
5529                                }
5530                            }
5531                        }
5532                    }
5533                }
5534            }
5535        }
5536
5537        // Thread-mode collection: join each worker's JoinHandle
5538        // (with the [`THREAD_JOIN_TIMEOUT`] budget) and adopt the
5539        // returned [`WorkerReport`]. Per-worker `stop` was flipped
5540        // above; the worker observes it in worker_main's
5541        // `stop.load(Relaxed)` checks (max ~100ms latency from the
5542        // FUTEX_WAIT_TIMEOUT poll cadence). Three outcomes:
5543        //
5544        //   1. Ok(report): join returned the worker's WorkerReport.
5545        //      Push as-is.
5546        //   2. Err(payload): the thread panicked. Build a sentinel
5547        //      report and attach
5548        //      `exit_info: Some(WorkerExitInfo::Panicked(msg))`
5549        //      where `msg` comes from `extract_panic_payload`.
5550        //   3. Timeout (5s elapsed without is_finished): emit a
5551        //      tracing::warn and push a sentinel with
5552        //      `exit_info: Some(WorkerExitInfo::TimedOut)` —
5553        //      `worker_main` should have observed the per-worker
5554        //      `stop` within 100ms, so a 5s no-show signals a
5555        //      genuinely stuck worker (deadlock, infinite spin,
5556        //      blocking syscall the runtime can't interrupt).
5557        //      stop_and_collect does NOT process::exit on timeout —
5558        //      the orphan thread keeps running until the test
5559        //      harness exits, but any subsequent worker uses a
5560        //      fresh per-worker `stop` so the orphan can't pollute
5561        //      later runs.
5562        for mut tw in threads {
5563            // Drop start_tx (idempotent — `start()` may have already
5564            // taken it). If start() ran first, `start_tx` is
5565            // already `None` and the take is a no-op; if the caller
5566            // skipped start() entirely, dropping start_tx here
5567            // signals the worker via `Disconnected` so it exits
5568            // cleanly without the rendezvous send.
5569            tw.start_tx.take();
5570            let tid = tw.tid.load(Ordering::Acquire);
5571            if let Some(j) = tw.join.take() {
5572                match join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT) {
5573                    Some(Ok(report)) => reports.push(report),
5574                    Some(Err(payload)) => {
5575                        let msg = extract_panic_payload(payload);
5576                        eprintln!("ktstr: thread worker tid={tid} panicked: {msg}");
5577                        reports.push(WorkerReport {
5578                            tid,
5579                            completed: false,
5580                            exit_info: Some(WorkerExitInfo::Panicked(msg)),
5581                            ..WorkerReport::default()
5582                        });
5583                    }
5584                    None => {
5585                        tracing::warn!(
5586                            tid,
5587                            timeout_secs = THREAD_JOIN_TIMEOUT.as_secs(),
5588                            "thread worker did not join within timeout — leaking the \
5589                             thread; sentinel report attached with TimedOut exit_info"
5590                        );
5591                        reports.push(WorkerReport {
5592                            tid,
5593                            completed: false,
5594                            exit_info: Some(WorkerExitInfo::TimedOut),
5595                            ..WorkerReport::default()
5596                        });
5597                    }
5598                }
5599            }
5600        }
5601
5602        reports
5603    }
5604
5605    /// Deliver the terminal stop to every live worker without
5606    /// reaping. Signal-only: NO `waitpid`, NO fd close, NO consume of
5607    /// `self`, no blocking of any kind. Reaping, the fd/region
5608    /// cleanup, and (for `stop_and_collect`) report decoding all
5609    /// belong to the subsequent [`Self::stop_and_collect`] or
5610    /// [`Drop`] — doing any of them here would double-reap. This only
5611    /// pre-delivers the terminal signal so that reap finds workers
5612    /// already exiting.
5613    ///
5614    /// # Why (fork mode — the case this exists for)
5615    ///
5616    /// When the sched_ext scheduler under test errors out, the kernel
5617    /// disables sched_ext and its tasks fall back to the builtin
5618    /// scheduler, where CPU-bound workers keep spinning under
5619    /// contention. The normal reap in `stop_and_collect` SIGKILLs one
5620    /// worker then *blocks* in `waitpid` for it while the rest have
5621    /// had only the cooperative SIGUSR1 and keep spinning, so each
5622    /// dying worker must win a CPU slice against every still-runnable
5623    /// sibling before it can run its exit path — the serial reap
5624    /// becomes scheduling-gated and teardown time scales with worker
5625    /// count. `kill_and_killpg(SIGKILL)` here is delivered, not
5626    /// cooperative: once every worker carries a pending SIGKILL none
5627    /// re-enters userspace, so no worker competes for the CPU while
5628    /// the reap drains them, and each killed worker's report-pipe
5629    /// write end closes so the reap's `poll` returns `POLLHUP`
5630    /// immediately instead of waiting out its deadline. One
5631    /// `kill_and_killpg` per `ForkedChild` covers both a plain worker
5632    /// and a pcomm container: SIGKILL on any thread is fatal to the
5633    /// whole thread group, and `killpg` additionally reaches
5634    /// descendants — every forked child is its own process-group
5635    /// leader via `setpgid(0, 0)`, so pgid == child pid (mirrors the
5636    /// `killpg` in [`Drop`]).
5637    ///
5638    /// # Thread mode
5639    ///
5640    /// Thread-mode workers share the harness tgid and cannot be
5641    /// signalled individually, so there is NO SIGKILL fast path for
5642    /// them. This flips each worker's cooperative `stop` flag — the
5643    /// same store `stop_and_collect` and [`Drop`] already do, just
5644    /// earlier — and the worker still only observes it on its next
5645    /// futex-wait wake (`WORKER_STOP_POLL_NS`, ~100 ms). It is
5646    /// therefore NOT a teardown speedup for thread mode; it is kept
5647    /// for symmetry and is cheap and harmless (an idempotent relaxed
5648    /// store). The win is fork-mode only, which is where the
5649    /// SIGKILL-vs-cooperative distinction exists.
5650    ///
5651    /// # Frozen cgroups
5652    ///
5653    /// A cgroup-frozen worker is not a special case here. The freezer
5654    /// parks a userspace task in `TASK_INTERRUPTIBLE` (the jobctl
5655    /// freezer trap), so a fatal signal wakes it and it exits on its
5656    /// next `get_signal` — SIGKILL kills a frozen worker with no
5657    /// unfreeze required. (`collect_step` / `collect_backdrop` still
5658    /// unfreeze each cgroup as their first teardown step for reasons of
5659    /// their own; this method does not depend on that.)
5660    pub(crate) fn sigkill_workers(&self) {
5661        for child in &self.children {
5662            kill_and_killpg(
5663                nix::unistd::Pid::from_raw(child.pid),
5664                nix::sys::signal::Signal::SIGKILL,
5665            );
5666        }
5667        for tw in &self.threads {
5668            tw.stop.store(true, Ordering::Relaxed);
5669        }
5670    }
5671}
5672
5673impl Drop for WorkloadHandle {
5674    fn drop(&mut self) {
5675        use nix::sys::signal::{Signal, kill};
5676        use nix::sys::wait::waitpid;
5677        use nix::unistd::{Pid, close};
5678
5679        // Forked children (conventional workers AND pcomm
5680        // containers). `pid` is `libc::pid_t` — stored as i32 so
5681        // `Pid::from_raw` receives the kernel's native representation
5682        // directly, not the sign-cast of a u32 that could alias
5683        // negative values (including -1, i.e. every process in the
5684        // session).
5685        for child in &self.children {
5686            let pid = child.pid;
5687            let nix_pid = Pid::from_raw(pid);
5688            // killpg first: reach descendants the child may have
5689            // forked (Custom workloads, ForkExit caught mid-fork).
5690            // pgid == child pid because every forked child (worker
5691            // or pcomm container) calls `setpgid(0, 0)` at fork time.
5692            // ESRCH (group gone / no members) is expected and not a
5693            // warning-worthy failure; swallow it to keep the log
5694            // clean when the common no-descendants case drops.
5695            if let Err(e) = nix::sys::signal::killpg(nix_pid, Signal::SIGKILL)
5696                && e != nix::errno::Errno::ESRCH
5697            {
5698                tracing::warn!(pid, %e, "killpg failed in WorkloadHandle::drop");
5699            }
5700            if let Err(e) = kill(nix_pid, Signal::SIGKILL) {
5701                tracing::warn!(pid, %e, "kill failed in WorkloadHandle::drop");
5702            }
5703            if let Err(e) = waitpid(nix_pid, None) {
5704                tracing::warn!(pid, %e, "waitpid failed in WorkloadHandle::drop");
5705            }
5706            for fd in [child.report_fd, child.start_fd] {
5707                if fd >= 0
5708                    && let Err(e) = close(fd)
5709                {
5710                    tracing::warn!(fd, %e, "close failed in WorkloadHandle::drop");
5711                }
5712            }
5713        }
5714        // Thread-mode workers: flip stop, drop start_tx (in case
5715        // worker hasn't yet recv'd), join with the same 5s budget
5716        // `stop_and_collect` uses. Threads share the parent's
5717        // address space — there is no `kill` equivalent and no
5718        // MAP_SHARED ownership to give back. Drop still applies
5719        // the timeout so a stuck worker doesn't pin
5720        // `WorkloadHandle::drop` indefinitely; on timeout we log
5721        // the leak via `tracing::warn!` and proceed.
5722        let threads = std::mem::take(&mut self.threads);
5723        for mut tw in threads {
5724            tw.stop.store(true, Ordering::Relaxed);
5725            tw.start_tx.take();
5726            if let Some(j) = tw.join.take() {
5727                let tid = tw.tid.load(Ordering::Acquire);
5728                match join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT) {
5729                    Some(Ok(_)) => {}
5730                    Some(Err(e)) => {
5731                        let payload = extract_panic_payload(e);
5732                        tracing::warn!(
5733                            tid,
5734                            payload,
5735                            "thread worker panicked in WorkloadHandle::drop"
5736                        );
5737                    }
5738                    None => {
5739                        tracing::warn!(
5740                            tid,
5741                            timeout_secs = THREAD_JOIN_TIMEOUT.as_secs(),
5742                            "thread worker failed to join within timeout in \
5743                             WorkloadHandle::drop — leaking the thread"
5744                        );
5745                    }
5746                }
5747            }
5748        }
5749        // Close inter-worker pipe pairs and chain pipes AFTER worker
5750        // shutdown. Ordering matters for Thread mode: every worker
5751        // thread shares the parent's fd table, so closing a pipe fd
5752        // before its using thread joins would surface to that thread
5753        // as `EBADF` on the next read/write/poll syscall. The
5754        // children-reap loop above and the threads-join loop above
5755        // both block until their worker is reaped or joined; only
5756        // then do these closes run, which is when the workers are
5757        // guaranteed to no longer touch their fds. Fork mode is
5758        // unaffected either way: each child held its own fd-table
5759        // copy via `fork()`, so this close is a no-op for the
5760        // child's view (its own copy was closed by the post-fork
5761        // close-other-fds block in spawn_group).
5762        //
5763        // Errors from `close` are logged via `tracing::warn!` rather
5764        // than swallowed — `EBADF` here would indicate a double-close
5765        // (an aliased ownership bug) and is more diagnostic than the
5766        // SpawnGuard early-bail path's silent close. SpawnGuard's
5767        // Drop swallows EBADF deliberately because mid-spawn the
5768        // guard may share fd ownership with already-closed
5769        // half-allocated state; the handle on the other hand has
5770        // sole ownership at this point.
5771        for (ab, ba) in &self.pipe_pairs {
5772            for fd in [ab[0], ab[1], ba[0], ba[1]] {
5773                if let Err(e) = close(fd) {
5774                    tracing::warn!(fd, %e, "close failed for pipe_pair fd in WorkloadHandle::drop");
5775                }
5776            }
5777        }
5778        for chain in &self.chain_pipes {
5779            for pipe in chain {
5780                for fd in [pipe[0], pipe[1]] {
5781                    if let Err(e) = close(fd) {
5782                        tracing::warn!(fd, %e, "close failed for chain_pipe fd in WorkloadHandle::drop");
5783                    }
5784                }
5785            }
5786        }
5787        for (&ptr, &size) in self.futex_ptrs.iter().zip(self.futex_region_sizes.iter()) {
5788            unsafe {
5789                libc::munmap(ptr as *mut libc::c_void, size);
5790            }
5791        }
5792        if !self.iter_counters.is_null() && self.iter_counter_len > 0 {
5793            unsafe {
5794                libc::munmap(
5795                    self.iter_counters as *mut libc::c_void,
5796                    self.iter_counter_len * std::mem::size_of::<u64>(),
5797                );
5798            }
5799        }
5800        if !self.phase_epoch.is_null() && self.phase_epoch_bytes > 0 {
5801            unsafe {
5802                libc::munmap(
5803                    self.phase_epoch as *mut libc::c_void,
5804                    self.phase_epoch_bytes,
5805                );
5806            }
5807        }
5808    }
5809}
5810
5811/// SIGUSR1 handler installed in the fork-mode child post-fork. Flips
5812/// the per-process global [`STOP`] so `worker_main`'s outer loop
5813/// exits at the next `stop_requested` check.
5814pub(super) extern "C" fn sigusr1_handler(_: libc::c_int) {
5815    STOP.store(true, Ordering::Relaxed);
5816}
5817
5818/// Render an actionable hint for a failed
5819/// `mmap(MAP_SHARED | MAP_ANONYMOUS)` call based on the observed
5820/// `errno`. Shared between the futex-region mmap and the
5821/// iter_counters mmap in [`WorkloadHandle::spawn`] so the two
5822/// sites emit identical hint text per errno — a drift would mean
5823/// two related failures produce inconsistent remediation advice.
5824///
5825/// Takes `Option<i32>` (the output of `std::io::Error::raw_os_error`)
5826/// so an unrecognised errno folds cleanly through the `_ => ""`
5827/// arm without forcing callers to `unwrap`.
5828///
5829/// The leading space on every non-empty arm lets callers format
5830/// as `"...failed: {errno}{hint};"` without having to add a
5831/// conditional separator — an empty hint disappears cleanly.
5832pub(super) fn mmap_shared_anon_errno_hint(errno: Option<i32>) -> &'static str {
5833    match errno {
5834        Some(libc::ENOMEM) => {
5835            " (ENOMEM: host is out of memory \
5836             or /proc/sys/vm/max_map_count is too low — \
5837             check `sysctl vm.max_map_count` and `free -h`)"
5838        }
5839        Some(libc::EPERM) => {
5840            " (EPERM: MAP_SHARED|MAP_ANONYMOUS \
5841             rejected by the kernel — check memory cgroup \
5842             limits and container seccomp policy)"
5843        }
5844        Some(libc::EINVAL) => {
5845            " (EINVAL: invalid length or \
5846             flag combination — verify num_workers > 0 so the \
5847             region size is non-zero, and that the total size \
5848             does not overflow usize)"
5849        }
5850        _ => "",
5851    }
5852}
5853
5854#[cfg(test)]
5855mod testing;
5856#[cfg(test)]
5857mod tests_composed;
5858#[cfg(test)]
5859mod tests_fan_out;
5860#[cfg(test)]
5861mod tests_futex;
5862#[cfg(test)]
5863mod tests_grandchild;
5864#[cfg(test)]
5865mod tests_idle_churn;
5866#[cfg(test)]
5867mod tests_integration;
5868#[cfg(test)]
5869mod tests_lifecycle;
5870#[cfg(test)]
5871mod tests_mempolicy;
5872#[cfg(test)]
5873mod tests_misc;
5874#[cfg(test)]
5875mod tests_pcomm;
5876#[cfg(test)]
5877mod tests_sched_policy;
5878#[cfg(test)]
5879mod tests_spawn_guard;
5880#[cfg(test)]
5881mod tests_thread_mode;
5882#[cfg(test)]
5883mod tests_wake_chain;