ktstr/workload/spawn/mod.rs
//! Spawn pipeline: `WorkloadHandle`, `SpawnGuard`, `GroupParams`,
//! `ThreadWorker`, the report shapes (`WorkerReport`, `WorkerExitInfo`,
//! `Migration`), and the helpers that launch workers via `fork(2)` or
//! `std::thread::spawn`. Split out of `workload/mod.rs` to keep the
//! production code path separate from its tests. The tests live next
//! to this module in topic-grouped sibling files (`tests_lifecycle`,
//! `tests_grandchild`, `tests_composed`, ...) that import shared
//! fixtures from `testing.rs` via `use super::testing::*;`.
10
11use anyhow::{Context, Result};
12use std::collections::{BTreeMap, BTreeSet};
13use std::io::{Read, Write};
14use std::os::unix::io::FromRawFd;
15use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
16use std::time::{Duration, Instant};
17
18use super::affinity::{AffinityIntent, ResolvedAffinity, resolve_affinity, set_thread_affinity};
19use super::config::{CloneMode, MemPolicy, MpolFlags, SchedPolicy, WorkSpec, WorkloadConfig};
20use super::types::*;
21use super::worker::worker_main;
22
23mod cleanup;
24use cleanup::{close_fds_silently, kill_and_killpg};
25
/// Process-wide stop flag for the worker loops (one copy per process, so
/// every forked worker has its own). Workers blocked in a bounded futex
/// wait wake at least every [`WORKER_STOP_POLL_NS`], re-check this flag,
/// and either continue working or exit cleanly.
///
/// NOTE(review): the writer is not visible in this chunk. Presumably it is
/// the Fork-mode stop-signal path, since `WorkerReport::wall_time_ns`
/// describes Fork-mode workers' stop flag as "signal-driven". Confirm.
pub(super) static STOP: AtomicBool = AtomicBool::new(false);
27
/// A single CPU migration event observed by a worker.
///
/// Deliberately has no `Default` impl. A default-constructed value would
/// be `{at_ns: 0, from_cpu: 0, to_cpu: 0}`: a "migration" from CPU 0 to
/// CPU 0 at time 0. A move whose source equals its destination is not a
/// real migration. Downstream analysis (NUMA migration counts,
/// scheduler-balance ratios) that assumes `from_cpu != to_cpu` would
/// misread such values as real events. Build every `Migration` explicitly,
/// e.g. via [`Migration::new`].
///
/// Field order is part of the wire format. The type derives serde
/// `Serialize`/`Deserialize` and travels inside `WorkerReport::migrations`,
/// whose postcard encoding is positional, so do not reorder fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct Migration {
    /// Nanoseconds since worker start.
    pub at_ns: u64,
    /// CPU the worker was on before the migration.
    pub from_cpu: usize,
    /// CPU the worker was on after the migration.
    pub to_cpu: usize,
}
46
47impl Migration {
48 /// Const constructor for a Migration event. Pair-symmetric with
49 /// the struct fields; lets `Migration::new(ns, from, to)` replace
50 /// `Migration { at_ns: ns, from_cpu: from, to_cpu: to }` in
51 /// hand-built test fixtures.
52 pub const fn new(at_ns: u64, from_cpu: usize, to_cpu: usize) -> Self {
53 Self {
54 at_ns,
55 from_cpu,
56 to_cpu,
57 }
58 }
59}
60
61/// Build a nodemask bitmask and maxnode value for `set_mempolicy(2)`
62/// and `mbind(2)`.
63///
64/// Returns `(nodemask_vec, maxnode)`. The nodemask is a bitmask of
65/// `c_ulong` words where bit N corresponds to NUMA node N. `maxnode`
66/// must be `max_node + 2` because the kernel's `get_nodes()` does
67/// `--maxnode` before reading the bitmask.
68pub fn build_nodemask(nodes: &BTreeSet<usize>) -> (Vec<libc::c_ulong>, libc::c_ulong) {
69 if nodes.is_empty() {
70 return (vec![], 0);
71 }
72 let max_node = nodes.iter().copied().max().unwrap_or(0);
73 let mask_bits = max_node + 2;
74 let bits_per_word = std::mem::size_of::<libc::c_ulong>() * 8;
75 let mask_words = mask_bits.div_ceil(bits_per_word);
76 let mut nodemask = vec![0 as libc::c_ulong; mask_words];
77 for &node in nodes {
78 nodemask[node / bits_per_word] |= 1 << (node % bits_per_word);
79 }
80 (nodemask, mask_bits as libc::c_ulong)
81}
82
/// `set_mempolicy(2)` mode value for `MPOL_PREFERRED_MANY`, defined
/// locally instead of taken from `libc`.
///
/// NOTE(review): presumably the pinned `libc` crate does not export it.
/// The value is meant to mirror the kernel UAPI `enum mempolicy_mode`;
/// re-check it if that header changes.
const MPOL_PREFERRED_MANY: i32 = 5;
/// `set_mempolicy(2)` mode value for `MPOL_WEIGHTED_INTERLEAVE`, defined
/// locally for the same reason as [`MPOL_PREFERRED_MANY`]. It mirrors the
/// next entry of the kernel UAPI `enum mempolicy_mode`.
const MPOL_WEIGHTED_INTERLEAVE: i32 = 6;
85
/// Worker-side `futex_wait` timeout (ns) for STOP-signal polling, used by
/// every blocking workload primitive (FutexPingPong, FutexFanOut,
/// FanOutCompute, MutexContention).
///
/// Workers block inside the per-variant futex for at most this long. On
/// wake (or timeout) they re-check [`STOP`] and either continue working
/// or exit cleanly.
///
/// At 100ms, a `stop_and_collect` caller must budget roughly 100ms of
/// worst-case shutdown latency on top of the flush/IO cost. See
/// [`WorkloadHandle::stop_and_collect`]'s "Shutdown latency" paragraph
/// for the caller-facing contract.
///
/// Must stay below 1_000_000_000. [`FUTEX_WAIT_TIMEOUT`] places this
/// value in `tv_nsec` with `tv_sec = 0`, and futex rejects a timeout
/// whose `tv_nsec` is not below one second with EINVAL.
pub(super) const WORKER_STOP_POLL_NS: libc::c_long = 100_000_000;

/// The [`libc::timespec`] passed to every worker-side `futex_wait` across
/// the blocking workload primitives.
///
/// It is a single shared const because per-call-site struct literals let
/// the `tv_nsec` field drift between variants during earlier edits. One
/// const keeps the shutdown-latency budget documented on
/// [`WORKER_STOP_POLL_NS`] authoritative.
pub(super) const FUTEX_WAIT_TIMEOUT: libc::timespec = libc::timespec {
    tv_sec: 0,
    tv_nsec: WORKER_STOP_POLL_NS,
};
106
/// Post-wake spin count used by the fan-out messenger variants
/// ([`WorkType::FutexFanOut`] and [`WorkType::FanOutCompute`]) AFTER each
/// broadcast wake.
///
/// The spin gives receivers a short, uncontended window to reach their
/// reservoir push before the next wake cycle arrives. It runs through
/// `spin_burst` rather than a raw `std::hint::spin_loop`, so the
/// messenger's spin also counts toward `work_units`. Because both
/// variants use the same path, their messengers report comparable
/// throughput to downstream assertions.
pub(super) const FAN_OUT_POST_WAKE_SPIN_ITERS: u64 = 256;
117
/// Outcome of [`apply_mempolicy_with_flags`].
///
/// Makes each branch of the apply path observable to tests and to any
/// future caller that wants to react to the result. The production
/// worker-setup caller in `worker::worker_main` discards it: applying the
/// policy is best-effort there, matching the `setpriority` /
/// `set_sched_policy` soft-fail idiom. The variants partition the
/// function's exits exactly:
///
/// - [`SkippedDefault`](MempolicyOutcome::SkippedDefault): `policy ==
///   MemPolicy::Default`. No syscall is issued; the inherited policy is
///   kept.
/// - [`SkippedEmpty`](MempolicyOutcome::SkippedEmpty): a
///   node-set-bearing variant (`Bind` / `Interleave` /
///   `PreferredMany` / `WeightedInterleave`) carried an empty set. No
///   `set_mempolicy(2)` is issued, and a one-line notice is written to
///   stderr. Issuing the call with an empty mask would be a different
///   (and wrong) request to the kernel, so the skip is the contract, not
///   merely "did not crash".
/// - [`Applied`](MempolicyOutcome::Applied): `set_mempolicy(2)`
///   returned 0. This includes `MemPolicy::Local`, which is issued
///   without a nodemask.
/// - [`Failed`](MempolicyOutcome::Failed): `set_mempolicy(2)` returned
///   non-zero. The error is logged to stderr and its raw errno is carried
///   here (0 if no OS error code was available).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum MempolicyOutcome {
    /// `MemPolicy::Default` — no syscall.
    SkippedDefault,
    /// A node-set variant with an empty set — no syscall.
    SkippedEmpty,
    /// `set_mempolicy(2)` succeeded.
    Applied,
    /// `set_mempolicy(2)` failed; carries the raw errno.
    Failed(i32),
}
150
151/// Call `set_mempolicy(2)` for the current process with mode flags.
152///
153/// No-op for `MemPolicy::Default` (returns
154/// [`MempolicyOutcome::SkippedDefault`]) and for a node-set variant
155/// with an empty set (returns [`MempolicyOutcome::SkippedEmpty`]).
156/// Logs a warning on syscall failure and returns
157/// [`MempolicyOutcome::Failed`]; returns [`MempolicyOutcome::Applied`]
158/// on success.
159pub(super) fn apply_mempolicy_with_flags(policy: &MemPolicy, flags: MpolFlags) -> MempolicyOutcome {
160 let (mode, node_set): (i32, BTreeSet<usize>) = match policy {
161 MemPolicy::Default => return MempolicyOutcome::SkippedDefault,
162 MemPolicy::Bind(nodes) => (libc::MPOL_BIND, nodes.clone()),
163 MemPolicy::Preferred(node) => (libc::MPOL_PREFERRED, [*node].into_iter().collect()),
164 MemPolicy::Interleave(nodes) => (libc::MPOL_INTERLEAVE, nodes.clone()),
165 MemPolicy::PreferredMany(nodes) => (MPOL_PREFERRED_MANY, nodes.clone()),
166 MemPolicy::WeightedInterleave(nodes) => (MPOL_WEIGHTED_INTERLEAVE, nodes.clone()),
167 MemPolicy::Local => {
168 let rc = unsafe {
169 libc::syscall(
170 libc::SYS_set_mempolicy,
171 libc::MPOL_LOCAL | flags.bits() as i32,
172 std::ptr::null::<libc::c_ulong>(),
173 0 as libc::c_ulong,
174 )
175 };
176 if rc != 0 {
177 let err = std::io::Error::last_os_error();
178 eprintln!("ktstr: set_mempolicy(MPOL_LOCAL) failed: {err}");
179 return MempolicyOutcome::Failed(err.raw_os_error().unwrap_or(0));
180 }
181 return MempolicyOutcome::Applied;
182 }
183 };
184 if node_set.is_empty() {
185 eprintln!("ktstr: set_mempolicy: empty node set, skipping");
186 return MempolicyOutcome::SkippedEmpty;
187 }
188 let (mask, maxnode) = build_nodemask(&node_set);
189 let effective_mode = mode | flags.bits() as i32;
190 let rc = unsafe {
191 libc::syscall(
192 libc::SYS_set_mempolicy,
193 effective_mode,
194 mask.as_ptr(),
195 maxnode,
196 )
197 };
198 if rc != 0 {
199 let err = std::io::Error::last_os_error();
200 eprintln!("ktstr: set_mempolicy(mode={mode}, nodes={node_set:?}) failed: {err}");
201 return MempolicyOutcome::Failed(err.raw_os_error().unwrap_or(0));
202 }
203 MempolicyOutcome::Applied
204}
205
206/// Terminate the calling forked-child worker with success status (code 0).
207///
208/// Uses the `_exit(2)` syscall via `libc::_exit` directly rather than [`std::process::exit`] or
209/// returning from `main` so that:
210/// 1. Rust drop glue does NOT run on inherited parent state (open file
211/// descriptors, mmaps, atexit handlers, thread-local destructors).
212/// Forked children share address space with the parent at fork
213/// time; running destructors on the child side would also tear down
214/// parent-owned resources via the shared FD table / OS handles.
215/// 2. `exit_group(2)` semantics (which `_exit` invokes) tear down only
216/// the calling tgid. Forked children have their own tgid distinct
217/// from the test-runner's, so the parent test runner survives.
218///
219/// Code 0 signals to the parent's collect_results path that the worker
220/// completed the work-loop normally — the WorkerReport (if any was
221/// written) is valid and should be merged into the test verdict.
222#[inline]
223fn exit_child_success() -> ! {
224 // SAFETY: `libc::_exit` is `unsafe` because it bypasses Rust drop
225 // execution + C stdlib atexit. That bypass IS the contract here
226 // for forked children: running destructors on shared inherited
227 // state corrupts parent-owned resources. exit_group(2) tear-down
228 // of the child's tgid is sound — the parent is in a different
229 // tgid.
230 unsafe { libc::_exit(0) }
231}
232
233/// Terminate the calling forked-child worker with error status (code 1).
234///
235/// Same `_exit`-vs-Rust-exit rationale as [`exit_child_success`]. Code 1
236/// signals to the parent's collect_results path that the worker failed
237/// before completing the work-loop normally — the parent treats the
238/// WorkerReport as missing or invalid and surfaces the failure to the
239/// test verdict.
240///
241/// Use this on any error path inside the forked-child closure: poll
242/// timeout, syscall failure, panic via `catch_unwind`, write-report
243/// failure, orphan detection.
244#[inline]
245fn exit_child_error() -> ! {
246 // SAFETY: see [`exit_child_success`] — identical contract.
247 unsafe { libc::_exit(1) }
248}
249
250/// Apply `nice` to the calling worker via `setpriority(2)`.
251///
252/// Always invokes `setpriority(PRIO_PROCESS, 0, nice)` — including
253/// for `nice == 0`, which writes 0 explicitly rather than
254/// inheriting. The "skip the syscall, inherit the parent's nice"
255/// state lives one layer up: callers gate the call on
256/// [`WorkSpec::nice`](crate::workload::WorkSpec::nice) /
257/// [`WorkloadConfig::nice`](crate::workload::WorkloadConfig::nice)
258/// being `Some(_)` and pass through the inner value (so `Some(0)`
259/// becomes a real `setpriority(0)` and `None` skips the call
260/// entirely). The kernel clamps `niceval` to
261/// `[MIN_NICE, MAX_NICE]` (-20..19) inside `setpriority`, so any
262/// out-of-range input is normalised by the syscall itself rather
263/// than rejected.
264///
265/// Failures are logged once via stderr and do not abort the
266/// worker — matches the [`apply_mempolicy_with_flags`] /
267/// [`set_thread_affinity`] / `set_sched_policy` error idiom in
268/// `worker_main`. The expected failure mode is `EACCES` from
269/// `set_one_prio` → `can_nice` when an unprivileged worker tries
270/// to lower nice (negative niceval) without `CAP_SYS_NICE`.
271pub(super) fn apply_nice(nice: i32) {
272 let rc = unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, nice) };
273 if rc != 0 {
274 warn_setpriority_failed_once();
275 }
276}
277
278/// Print a single `setpriority` failure warning for the lifetime
279/// of the process. Same rationale as
280/// `warn_schedstat_unavailable_once`: dozens of workers will fail
281/// once each on an unprivileged host that requested negative nice,
282/// and a per-worker line floods the test log.
283pub(super) fn warn_setpriority_failed_once() {
284 static WARNED: std::sync::Once = std::sync::Once::new();
285 WARNED.call_once(|| {
286 let errno = std::io::Error::last_os_error();
287 eprintln!(
288 "workload: setpriority(PRIO_PROCESS) failed: {errno}; nice value not applied (CAP_SYS_NICE may be required for negative nice)"
289 );
290 });
291}
292
293/// Telemetry collected from a worker process after it stops.
294///
295/// Normal reports: each field is populated by the worker itself
296/// (inside the VM) and serialized via a pipe to the parent process.
/// Sentinel reports: synthesized by
/// [`WorkloadHandle::stop_and_collect`] when a worker exits without
/// delivering a decodable report. They carry a parent-populated
/// `exit_info`, with every other field at its [`Default`] value. The
/// worker never wrote to the pipe, so the parent is the only source of
/// truth for the surfaced outcome.
303///
304/// # Default trade-off
305///
306/// [`Default`] produces a zero/empty report. The trade-off:
307///
308/// - **Pro:** sentinel/test code can spread `..WorkerReport::default()`
309/// so adding a field does not require touching every sentinel site.
310/// - **Con:** zero-valued fields are valid report outputs (e.g. a
311/// worker that never blocked has `wake_latencies_ns: vec![]`), so
312/// a missing field cannot be distinguished from a real-zero field at
313/// the reader. Consumers that need "was this field actually set"
314/// must track presence out-of-band (e.g. whether the work type
315/// populates the field per `wake_latencies_ns`'s doc).
316///
317/// Decision: keep the `Default` impl. Sentinel ergonomics outweigh
318/// the distinguishability cost — every real consumer already knows
319/// which fields a given `WorkType` populates, and the alternative
320/// (removing `Default` and hand-listing every field at sentinel
321/// sites) introduces a worse drift problem that silently skips new
322/// telemetry instead of reporting it as zero.
323///
324/// Callers building a sentinel report should spread
325/// `..WorkerReport::default()` rather than listing every field by hand
326/// -- the sentinel drifts silently when a field is added.
327#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, crate::Claim)]
328pub struct WorkerReport {
329 /// Kernel TID from `gettid(2)`. For [`CloneMode::Fork`] each
330 /// worker is its own thread-group leader so `gettid() == getpid()
331 /// == tgid`; the report's tid is interchangeable with the
332 /// worker's pid in libc / cgroup APIs. For [`CloneMode::Thread`]
333 /// every worker shares the parent's tgid and `gettid()` is the
334 /// only identifier that discriminates per-task identity, so the
335 /// report's tid is what feeds `sched_setaffinity(tid, ...)` and
336 /// `cgroup.threads` writes (NOT `cgroup.procs` — see the warning
337 /// on [`WorkloadHandle::worker_pids`]). Stored as `pid_t` (i32)
338 /// to match the kernel's native type and avoid the silent
339 /// u32→i32 sign-cast wraparound at libc boundaries
340 /// (kill/waitpid/Pid::from_raw).
341 pub tid: i32,
342 /// Cumulative work iterations (incremented by `spin_burst` or I/O loops).
343 /// Read by the fairness/starvation gate (`assert_not_starved` /
344 /// `min_work_units`) and `assert_throughput_parity`; NOT summed into
345 /// `CgroupStats::total_iterations`, which reads [`iterations`](Self::iterations).
346 /// A `Custom` worker that wants throughput assertions must also populate
347 /// [`iterations`](Self::iterations).
348 pub work_units: u64,
349 /// Thread CPU time from `CLOCK_THREAD_CPUTIME_ID` (ns).
350 pub cpu_time_ns: u64,
351 /// Wall-clock time from worker-start to stop flag (ns).
352 /// Measured from the worker's first `Instant::now()` in
353 /// `worker_main` (immediately after the start handshake) to the
354 /// outer-loop exit (when the per-worker `stop` flag is observed
355 /// `true`); covers both Fork-mode workers (signal-driven flag)
356 /// and Thread-mode workers (parent-driven flag).
357 pub wall_time_ns: u64,
358 /// `wall_time_ns - cpu_time_ns`: total off-CPU time (ns).
359 ///
360 /// Includes all time the worker was not executing on a CPU: runnable
361 /// queue wait, voluntary sleep, I/O wait, futex wait, etc.
362 pub off_cpu_ns: u64,
363 /// Number of observed CPU migrations (checked every 1024 work units).
364 pub migration_count: u64,
365 /// Set of all CPUs this worker ran on.
366 pub cpus_used: BTreeSet<usize>,
367 /// Ordered list of CPU migration events with timestamps.
368 pub migrations: Vec<Migration>,
369 /// Longest wall-clock gap observed at 1024-work-unit checkpoints
370 /// (ms). High values indicate the task was preempted or descheduled
371 /// near a checkpoint boundary.
372 pub max_gap_ms: u64,
373 /// CPU where the longest gap happened.
374 pub max_gap_cpu: usize,
375 /// When the longest gap happened (ms from start).
376 pub max_gap_at_ms: u64,
377 /// Per-wakeup latency samples (ns). Measures off-CPU time
378 /// between the call that blocks (any blocking primitive — pipe
379 /// `read`, futex wait, `poll`, `sched_yield`, `nanosleep`, etc.)
380 /// and the wakeup that resumes execution; not a yield-specific
381 /// measure.
382 /// Populated for blocking work types: Bursty, PipeIo, FutexPingPong,
383 /// FutexFanOut, FanOutCompute, CacheYield, CachePipe, IoSyncWrite,
384 /// IoRandRead, IoConvoy, NiceSweep,
385 /// AffinityChurn, PolicyChurn, MutexContention, ForkExit (parent's
386 /// waitpid wait), Sequence with Sleep/Yield/Io phases.
387 ///
388 /// Distinct from [`iteration_costs_ns`](Self::iteration_costs_ns):
389 /// this field measures the OFF-CPU gap between blocks (scheduler
390 /// wake latency); `iteration_costs_ns` measures the wall-clock
391 /// duration of a single compute iteration. The three pure-compute
392 /// variants that populate `iteration_costs_ns` —
393 /// [`WorkType::AluHot`], [`WorkType::SmtSiblingSpin`], and
394 /// [`WorkType::IpcVariance`] — never block and report
395 /// `wake_latencies_ns: vec![]`. Other compute variants
396 /// (e.g. SpinWait, YieldHeavy, Mixed) populate neither
397 /// reservoir.
398 pub wake_latencies_ns: Vec<u64>,
399 /// Total number of wake-latency observations the worker
400 /// recorded, INCLUDING any that were dropped by the reservoir
401 /// sampler. `wake_latencies_ns` is reservoir-clamped to at
402 /// most `MAX_WAKE_SAMPLES` (100_000) entries; on a long run
403 /// that accumulates more than that many wake events, the
404 /// vector stays at its cap while this counter keeps climbing.
405 /// Host-side consumers that want to report "total wakeups
406 /// observed" (vs. "entries in the sample") read this field;
407 /// percentile / CV computations read `wake_latencies_ns`.
408 pub wake_sample_total: u64,
409 /// Per-iteration wall-clock duration of one compute iteration (ns),
410 /// including any scheduler preemption. Measured via
411 /// `Instant::now()` (CLOCK_MONOTONIC), so a sample includes any
412 /// off-CPU time the kernel inserted mid-iteration. The variance
413 /// across iterations is the load-bearing scheduler signal —
414 /// preemption inflates samples and that inflation is the
415 /// observable.
416 ///
417 /// Reservoir-sampled at the same cap (`MAX_WAKE_SAMPLES` =
418 /// 100_000) as [`wake_latencies_ns`](Self::wake_latencies_ns),
419 /// using the same Algorithm-R sampler.
420 ///
421 /// Populated for pure compute work types where the worker
422 /// never blocks: [`WorkType::AluHot`], [`WorkType::SmtSiblingSpin`],
423 /// and [`WorkType::IpcVariance`]. Each sample is the elapsed
424 /// time from the start to the end of one outer-loop iteration's
425 /// compute burst.
426 ///
427 /// Distinct from [`wake_latencies_ns`](Self::wake_latencies_ns):
428 /// the wake-latency reservoir captures off-CPU time (futex /
429 /// pipe / nanosleep wakeups); this reservoir captures the
430 /// wall-clock duration of one compute iteration (which
431 /// includes any scheduler preemption inside the iteration).
432 /// The two are NOT comparable across variants — a
433 /// scheduler-A/B test that wants iteration cost for a compute
434 /// variant reads this field; a test that wants wake latency
435 /// for a blocking variant reads `wake_latencies_ns`.
436 pub iteration_costs_ns: Vec<u64>,
437 /// Total number of iteration-cost observations the worker
438 /// recorded, INCLUDING any that were dropped by the reservoir
439 /// sampler. Mirrors [`wake_sample_total`](Self::wake_sample_total)
440 /// but for [`iteration_costs_ns`](Self::iteration_costs_ns):
441 /// host-side consumers that want "total compute iterations
442 /// observed" read this field; distribution computations read
443 /// `iteration_costs_ns` directly.
444 pub iteration_cost_sample_total: u64,
445 /// Per-timer-cycle latency samples (ns) for
446 /// [`crate::workload::WorkType::TimerLatency`]: the observed
447 /// `clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME)` wake time minus the
448 /// absolute deadline, floored at 0. Reservoir-clamped to at most
449 /// `MAX_WAKE_SAMPLES`, distinct from `wake_latencies_ns` so cyclictest-style
450 /// timer latency does not blur with the blocking variants' wake latency.
451 /// `vec![]` for every non-`TimerLatency` variant.
452 pub timer_latencies_ns: Vec<u64>,
453 /// Total timer-cycle observations, INCLUDING any the reservoir dropped —
454 /// the true population for unbiased cross-phase weighting. Mirrors
455 /// [`wake_sample_total`](Self::wake_sample_total) for `timer_latencies_ns`.
456 pub timer_sample_total: u64,
457 /// Outer-loop iteration count. What `CgroupStats::total_iterations` sums
458 /// and what the derived throughput rates (`iterations_per_worker` /
459 /// `iterations_per_cpu_sec`) and `migration_ratio` divide by; NOT read by
460 /// the starvation gate, which reads [`work_units`](Self::work_units). A
461 /// `Custom` worker that wants the starvation / `min_work_units` gate
462 /// honored must also populate [`work_units`](Self::work_units).
463 pub iterations: u64,
464 /// Delta of /proc/self/schedstat field 2 (run_delay) over the work loop.
465 pub schedstat_run_delay_ns: u64,
466 /// Delta of /proc/self/schedstat field 3 (pcount — number of
467 /// times the task was scheduled in over the work loop). This is
468 /// NOT a context-switch count; `/proc/<pid>/status`'s
469 /// `voluntary_ctxt_switches` / `nonvoluntary_ctxt_switches` are
470 /// the true context-switch counters and are not read here.
471 pub schedstat_run_count: u64,
472 /// Delta of /proc/self/schedstat field 1 (cpu_time) over the work loop.
473 pub schedstat_cpu_time_ns: u64,
474 /// `true` when the worker reached its natural end — either the
475 /// outer work loop observed STOP and exited cleanly, or a
476 /// custom-closure payload returned from its `run` function. A
477 /// sentinel report synthesised by
478 /// [`WorkloadHandle::stop_and_collect`]'s decode-failure
479 /// fallback (see `exit_info` below) carries `false`. Lets downstream
480 /// consumers distinguish "worker ran to completion and
481 /// observed zero iterations" (`completed: true, iterations: 0`
482 /// — legitimate for pathologically short test windows) from
483 /// "worker died / timed out before recording anything"
484 /// (`completed: false, iterations: 0` — the sentinel shape).
485 pub completed: bool,
486 /// Per-NUMA-node page counts from `/proc/self/numa_maps` after workload.
487 /// Keyed by node ID. Empty when numa_maps is unavailable. numa_maps reports
488 /// the per-node RESIDENT pages of the calling task's mm. For
489 /// [`CloneMode::Fork`] workers (the scenario-engine default) each worker has
490 /// a disjoint mm, so SUMming across workers is the true cgroup page total;
491 /// for [`CloneMode::Thread`] siblings share one mm and each reports the SAME
492 /// residency, so a SUM counts shared pages once per thread. Consumers
493 /// (cgroup_stats / phase_cgroup_stats) SUM
494 /// this — correct for the Fork default; the Thread-mode caveat is inherited
495 /// identically by both reducers (no per-phase divergence).
496 pub numa_pages: BTreeMap<usize, u64>,
497 /// Delta of `/proc/vmstat` `numa_pages_migrated` over the work loop. This is
498 /// the SYSTEM-WIDE vmstat `NUMA_PAGE_MIGRATE` vm_event (summed across all
499 /// CPUs), NOT the per-task `task_struct` field of the same name surfaced in
500 /// `/proc/PID/sched`. Because every worker reads the same system-wide
501 /// counter, consumers fold it as MAX across workers (a SUM would inflate it
502 /// by the worker count) — see `PhaseCgroupStats::cross_node_migrated`. The vm_event is bumped only by
503 /// NUMA balancing (the source of NUMA page migrations), so the delta is 0 on
504 /// kernels/configs without it (a measurement-availability caveat, not a wrong
505 /// value).
506 pub vmstat_numa_pages_migrated: u64,
507 /// Diagnostic attached only to sentinel reports — populated when
508 /// `stop_and_collect` synthesized the entry because no (or
509 /// unparseable) postcard payload came back on the report pipe.
510 /// `None` on every real worker-produced report. Lets operators
511 /// distinguish the four failure shapes that all collapse to
512 /// "empty pipe + no report":
513 ///
514 /// - [`WorkerExitInfo::Exited`] with a non-zero code: worker
515 /// reached `_exit(code)` without writing the report —
516 /// typically the `catch_unwind` Err arm in the worker-child
517 /// closure (panic under `panic = "unwind"`) or the 30s
518 /// poll-start timeout's early `_exit(1)`.
519 /// - [`WorkerExitInfo::Signaled`]: worker was killed — SIGABRT
520 /// under `panic = "abort"`, SIGKILL from the still-alive
521 /// escalation in `stop_and_collect`, or an external signal
522 /// (OOM killer, operator SIGKILL).
523 /// - [`WorkerExitInfo::TimedOut`]: worker never exited within the
524 /// 5s collection deadline and the WNOHANG reap observed
525 /// `StillAlive` — escalated via SIGKILL + `waitpid(None)`.
526 /// - [`WorkerExitInfo::WaitFailed`]: `waitpid` itself returned an
527 /// error (ECHILD / EINTR). Typically a plumbing bug — the child
528 /// was reaped by an external signal handler, a double-reap
529 /// regression, or the pid was recycled.
530 ///
531 /// No `skip_serializing_if`: postcard is a positional, schemaless
532 /// format — every Serialize call must emit every field in the
533 /// same order or the decoder reads the next field's bytes off
534 /// the wire (silent data corruption). The Option<…> tag itself
535 /// (one byte) is the only overhead on the live-worker path.
536 pub exit_info: Option<WorkerExitInfo>,
537 /// `true` when this worker served as the messenger for a
538 /// wake-fanout work type ([`WorkType::FutexFanOut`] or
539 /// [`WorkType::FanOutCompute`]) — the single writer that
540 /// advances the shared generation and issues `futex_wake` for
541 /// its group. `false` for receivers and for every non-fanout
542 /// work type.
543 ///
544 /// Populated from the `is_messenger` flag on the
545 /// `futex: Option<(*mut u32, bool)>` parameter threaded into
546 /// `worker_main`. A sentinel report synthesized by the
547 /// decode-failure fallback in
548 /// [`WorkloadHandle::stop_and_collect`] carries `false` via
549 /// [`Default`], matching its `completed: false` shape.
550 ///
551 /// Enables per-worker latency-participation assertions in
552 /// tests — a receiver worker produces `wake_latencies_ns`
553 /// entries while its messenger pair records wake-side work but
554 /// no wake latency. Without this field, tests had to
555 /// cross-reference per-group indexing or guess from the empty
556 /// vector — ambiguous on groups where the messenger legitimately
557 /// exits before producing a report.
558 pub is_messenger: bool,
559 /// Index of the worker group this report belongs to.
560 ///
561 /// `0` denotes the primary group described by
562 /// [`WorkloadConfig`]'s top-level `work_type` / `num_workers` /
563 /// `affinity` / `sched_policy` fields. `1..=N` denotes
564 /// composed groups in the order they appear in
565 /// [`WorkloadConfig::composed`]. Reports collected by
566 /// [`WorkloadHandle::stop_and_collect`] are tagged with the
567 /// `group_idx` of the spawning [`WorkSpec`] (or `0` for the
568 /// primary), so per-group filtering in test assertions can
569 /// cleanly partition the vector.
570 ///
571 /// Sentinel reports (synthesized on missing or undecodable
572 /// payload / panic / timeout) carry the `group_idx` of the
573 /// worker whose pid the sentinel replaces, so a "this composed
574 /// group failed" assertion still works on an outright crash.
575 pub group_idx: usize,
576 /// Rendered error from the worker's `set_thread_affinity`
577 /// call, or `None` when affinity setup succeeded (or the
578 /// worker had no affinity to apply). Populated by
579 /// `worker_main` when the pre-loop
580 /// `set_thread_affinity(tid, cpus)` returns `Err` — the
581 /// worker continues with the inherited (or kernel-default)
582 /// cpumask so the test still produces an observable outcome,
583 /// but the failure is now surfaced in the report instead of
584 /// being silently dropped via `let _ = …`. The expected
585 /// failure shape is EINVAL from a requested cpu that is
586 /// outside the cpuset cgroup's `cpus.allowed` mask or the
587 /// kernel's online mask; EPERM is reachable when a more
588 /// privileged tracer set the worker's cpus_allowed and a
589 /// container policy denies further widening. Sentinel
590 /// reports synthesised by
591 /// [`WorkloadHandle::stop_and_collect`] leave this field at
592 /// its default `None` — a worker that died before
593 /// `worker_main` ran has no affinity-error observation.
594 ///
595 /// No `skip_serializing_if`: postcard is positional and
596 /// schemaless, so every Serialize call must emit every field
597 /// in the same order — skipping a field shifts the decoder
598 /// onto the next field's bytes (silent corruption). The
599 /// Option<…> tag (one byte) is the only overhead on the
600 /// success path.
601 pub affinity_error: Option<String>,
602 /// `set_sched_policy` failure text (`{e:#}`), or `None` when the
603 /// per-worker scheduling-policy set succeeded (or the policy was the
604 /// `Normal` no-op). Load-bearing for the verifier dispatch probe: a
605 /// probe worker configured `SchedPolicy::Ext` whose
606 /// `sched_setattr(SCHED_EXT)` was rejected (e.g. the scheduler set
607 /// `scx.disallow` on it) stays SCHED_OTHER, so its `iterations`
608 /// progress does NOT prove the BPF scheduler dispatched it —
609 /// `run_and_confirm_dispatch` excludes any worker with a
610 /// `sched_policy_error` from the dispatch proof. Sentinel reports
611 /// synthesised by [`WorkloadHandle::stop_and_collect`] leave this
612 /// `None` (no policy set was attempted). Same positional-serde
613 /// reasoning as [`Self::affinity_error`]: no `skip_serializing_if`.
614 pub sched_policy_error: Option<String>,
615 /// Per-phase telemetry slices for a backdrop (persistent) worker
616 /// that spanned multiple scenario steps. EMPTY for step-local
617 /// workers and for any backdrop worker that observed no phase
618 /// boundary: the worker pushes a [`PhaseSlice`] only when the
619 /// parent-driven `phase_epoch` actually changes, so a worker whose
620 /// epoch never moved (step-local pools are never bumped) ships none
621 /// — keeping the wire empty on the common path. Each slice carries
622 /// the per-phase subset of the whole-run telemetry above, scoped to
623 /// one phase's hold window; the host expands these into per-epoch
624 /// `PhaseBucket` entries — the per-phase attribution a backdrop
625 /// worker otherwise lacks, since it is collected once with
626 /// `step_index = None`.
627 ///
628 /// Appended LAST so the positional postcard decode order of every
629 /// prior field is unchanged. `#[claim(skip)]`: there is no
630 /// test-author claim surface for the raw wire slices — assertions
631 /// run against the host-expanded `PhaseCgroupStats` carriers, which
632 /// carry their own `#[derive(Claim)]`.
633 #[claim(skip)]
634 pub phase_slices: Vec<PhaseSlice>,
635 /// Whole-run taobench COUNTER aggregate — `Some` only for a Taobench worker,
636 /// `None` otherwise. Shipped so the host can derive run-level qps/hit Rate keys
637 /// (`taobench_*_ops_per_sec` / `taobench_hit_fraction` /
638 /// `taobench_command_hit_rate`) for `--noise-adjust` spread analysis. The
639 /// per-phase `PhaseSlice::taobench` carriers feed the per-phase metrics + the
640 /// serve-latency distribution; this whole-run carrier holds COUNTERS only
641 /// ([`TaobenchStats`](crate::workload::taobench::run::TaobenchStats)) — the
642 /// serve histogram is per-phase data, and the per-phase `elapsed_ns` is
643 /// MAX-merged across concurrent threads so summing phase windows is the wrong
644 /// qps denominator, hence the engine's authoritative whole-run counter
645 /// aggregate is shipped directly. Appended LAST (after `phase_slices`) to keep
646 /// the positional postcard decode order of every prior field unchanged. `pub`
647 /// (every `WorkerReport` field is `pub`, so external custom workers can
648 /// struct-literal construct it via [`crate::workload::WorkType::custom`]; a
649 /// non-taobench worker sets `None`). `#[claim(skip)]`: framework wire plumbing,
650 /// not a test-author claim surface; assertions run against the host-derived
651 /// run-level `taobench_*` Rate metrics.
652 #[claim(skip)]
653 pub taobench_whole: Option<crate::workload::taobench::run::TaobenchStats>,
654}
655
656/// Per-phase telemetry for one backdrop worker over one scenario
657/// step's HOLD window `[StepStart(k), StepEnd(k))`. A backdrop worker
658/// spans every phase, so it accumulates a fresh slice between each
659/// parent-driven `phase_epoch` transition and finalizes it when the
660/// epoch moves (drain-on-change). Carries the per-phase subset of
661/// [`WorkerReport`]'s whole-run telemetry that has a host-side
662/// per-cgroup carrier (`PhaseCgroupStats`), so the host can pool slices
663/// across workers into per-epoch `PhaseBucket`s. Counter fields are
664/// per-phase deltas (`end_snap − start_snap`, re-baselined at each
665/// boundary); `cpus_used` and `numa_pages` are per-phase observations
666/// (gauges). Excludes `iteration_costs_ns` — that reservoir has no
667/// per-cgroup carrier at any level (it feeds only the run-level
668/// benchmark gate).
669#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
670pub struct PhaseSlice {
671 /// The 1-indexed phase this slice covers (scenario Step k → k+1;
672 /// `0` = BASELINE, `u32::MAX` = inter-step gap). The host maps this
673 /// to a `PhaseBucket.step_index`; gap- and baseline-tagged slices
674 /// have no host bucket and are discarded.
675 pub phase_epoch: u32,
676 /// CPUs this worker ran on DURING this phase.
677 pub cpus_used: BTreeSet<usize>,
678 /// Per-wakeup latency samples (ns) recorded during this phase,
679 /// reservoir-clamped to at most `MAX_PHASE_WAKE_SAMPLES` — a
680 /// smaller cap than the whole-run `MAX_WAKE_SAMPLES` so a worker
681 /// spanning many phases stays within the report pipe. The host
682 /// re-caps the merged per-cgroup pool at the larger
683 /// `MAX_WAKE_SAMPLES`.
684 pub wake_latencies_ns: Vec<u64>,
685 /// Total wake observations during this phase, INCLUDING any the
686 /// reservoir dropped — the true population for unbiased
687 /// cross-phase weighting.
688 pub wake_sample_total: u64,
689 /// Per-timer-cycle latency samples (ns) recorded during this phase by a
690 /// [`crate::workload::WorkType::TimerLatency`] worker, reservoir-clamped to
691 /// `MAX_PHASE_WAKE_SAMPLES` (like `wake_latencies_ns`). Distinct carrier so
692 /// timer latency does not blur with wake latency.
693 pub timer_latencies_ns: Vec<u64>,
694 /// Total timer-cycle observations during this phase, INCLUDING any the
695 /// reservoir dropped — the true population for unbiased cross-phase
696 /// weighting. Mirrors `wake_sample_total` for `timer_latencies_ns`.
697 pub timer_sample_total: u64,
698 /// `/proc/self/schedstat` run_delay (field 2) delta over this phase (ns).
699 pub run_delay_ns: u64,
700 /// Off-CPU time during this phase (ns): `wall_ns − cpu_time` over
701 /// the phase. Paired with `wall_ns` so the host derives the
702 /// off-CPU fraction and skips the not-measured `wall_ns == 0` case
703 /// rather than dividing by zero.
704 pub off_cpu_ns: u64,
705 /// Wall-clock duration of this phase as the worker observed it
706 /// (ns) — the denominator for the off-CPU fraction. `0` for a
707 /// phase the worker never ran in (not-measured).
708 pub wall_ns: u64,
709 /// CPU migrations observed during this phase.
710 pub migration_count: u64,
711 /// Outer-loop iterations during this phase.
712 pub iterations: u64,
713 /// `/proc/self/schedstat` cpu_time (field 1, sum_exec_runtime)
714 /// delta over this phase (ns).
715 pub schedstat_cpu_time_ns: u64,
716 /// Per-NUMA-node resident page counts observed at this phase's end
717 /// (`/proc/self/numa_maps`). A gauge, not a delta.
718 pub numa_pages: BTreeMap<usize, u64>,
719 /// `/proc/vmstat numa_pages_migrated` delta over this phase
720 /// (system-wide counter; host folds as MAX across workers).
721 pub vmstat_numa_pages_migrated: u64,
722 /// Longest checkpoint-to-checkpoint wall gap during this phase (ms).
723 pub max_gap_ms: u64,
724 /// CPU where this phase's longest gap happened — coupled with
725 /// `max_gap_ms` as an argmax pair (a bare max would desync the gap
726 /// from its CPU).
727 pub max_gap_cpu: usize,
728 /// Per-phase schbench engine metrics, present ONLY for a
729 /// `WorkType::Schbench` backdrop worker (`None` for every other work
730 /// type — the generic drain leaves it `None`). Carries the phase's merged
731 /// wakeup/request histograms + run-delay raw pairs; the host re-pools these
732 /// across workers ([`crate::workload::schbench::run::SchbenchPhaseStats`])
733 /// and derives per-phase percentiles into `PhaseBucket.metrics`.
734 /// `pub(crate)`: an internal carrier (test authors read `PhaseBucket`, not
735 /// `PhaseSlice`) whose element type is `pub(crate)`. Integer-only, so it
736 /// preserves `PhaseSlice`'s `Eq`.
737 pub(crate) schbench: Option<crate::workload::schbench::run::SchbenchPhaseStats>,
738 /// Per-phase taobench engine metrics, present ONLY for a
739 /// `WorkType::Taobench` backdrop worker (`None` for every other work type —
740 /// the generic drain leaves it `None`). Carries the phase's request- and
741 /// response-time op counters plus the open-loop serve-latency histogram
742 /// ([`crate::workload::taobench::run::TaobenchPhaseStats`]); the host derives
743 /// per-phase qps / hit-ratio / serve-latency percentiles into
744 /// `PhaseBucket.metrics`. `pub(crate)`: an internal carrier (test authors read
745 /// `PhaseBucket`, not `PhaseSlice`). `PhaseSlice`'s `Eq` is preserved because
746 /// `PlatStats` is `Eq` (the schbench carrier above already holds histograms),
747 /// not because the field is integer-only.
748 pub(crate) taobench: Option<crate::workload::taobench::run::TaobenchPhaseStats>,
749}
750
751/// Reason a sentinel [`WorkerReport`] was synthesized — attached to
752/// the report's `exit_info` field so operators can triage a missing
753/// or undecodable postcard payload without cross-referencing
754/// parent-side logs.
755///
756/// Invariant: every variant carries the `waitpid`-derived status for
757/// the worker PID as of the end of `stop_and_collect`. Ordered from
758/// most-informative (exit code) to least (plumbing failure).
759/// No `Default` impl — every variant carries observed-outcome state.
760/// A default-constructed value would have to pick one variant (TimedOut
761/// was the obvious candidate) but that risks silent-pass: a test
762/// fixture using `..Default::default()` on a struct containing
763/// `WorkerExitInfo` gets "worker never exited within the deadline,"
764/// which an operator triaging the failure would chase for minutes
765/// before realizing the value came from a missing field. Construct
766/// every WorkerExitInfo explicitly via the variant the test scenario
767/// expects (e.g. `WorkerExitInfo::Exited(0)` for a clean success).
768#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
769pub enum WorkerExitInfo {
770 /// `WIFEXITED=true` with the given exit code. Non-zero under
771 /// `panic = "unwind"` means catch_unwind caught a panic in the
772 /// worker-child closure and `_exit(1)` fired, or the 30s
773 /// parent-ready poll timed out. Zero means the worker ran to
774 /// completion but failed to write / serialize the report — a
775 /// postcard encode or pipe-write failure that didn't panic.
776 Exited(i32),
777 /// `WIFSIGNALED=true` with the given signal number. Under
778 /// `panic = "abort"` a worker panic raises SIGABRT (signal 6);
779 /// other values indicate external kill, OOM killer, or the
780 /// still-alive-escalation SIGKILL (signal 9) from this function.
781 Signaled(i32),
782 /// Worker was still running after the 5s shared collection
783 /// deadline; escalated via SIGKILL + blocking `waitpid`. The
784 /// child's final status is not retained — the reap happened past
785 /// the point where operator diagnostics would differ between a
786 /// clean timeout and a signal storm.
787 TimedOut,
788 /// `waitpid` itself returned `Err` — typically ECHILD (child
789 /// already reaped by an external signal handler or a double-reap
790 /// regression) or EINTR. Message is the rendered `errno` string.
791 WaitFailed(String),
792 /// Thread-mode worker panicked. `JoinHandle::join()` returned
793 /// `Err`; the inner payload is downcast to a `&str` / `String`
794 /// (the canonical `panic!` payload shapes) and recorded here so
795 /// the operator can triage without scraping the test log. This
796 /// variant is exclusive to [`CloneMode::Thread`] — fork workers
797 /// surface panics via `Exited(1)` or `Signaled(SIGABRT)`
798 /// depending on the panic strategy.
799 Panicked(String),
800}
801
802/// Pure mapping from a `waitpid` outcome to the diagnostic
803/// [`WorkerExitInfo`] attached to a sentinel [`WorkerReport`].
804///
805/// Split out of [`WorkloadHandle::stop_and_collect`] so the four
806/// shapes each resolve to a `WorkerExitInfo` variant without pulling
807/// in the full collection loop's state (pipe drain, SIGKILL
808/// escalation, pid lifetime). Pure input → output means the variant
809/// matrix is directly testable without spawning children.
810///
811/// Shape → variant:
812/// - `Ok(Exited(_, code))` → [`WorkerExitInfo::Exited`]
813/// - `Ok(Signaled(_, sig, _))` → [`WorkerExitInfo::Signaled`]
814/// - `Ok(StillAlive)` → [`WorkerExitInfo::TimedOut`]
815/// - `Ok(_ exotic)` → [`WorkerExitInfo::TimedOut`] (Stopped /
816/// PtraceEvent / PtraceSyscall / Continued; not reachable for a
817/// plain forked worker with no ptrace parent, but collapsed rather
818/// than silently dropped so coverage stays exhaustive)
819/// - `Err(errno)` → [`WorkerExitInfo::WaitFailed`]
820pub(super) fn classify_wait_outcome(
821 source: Result<nix::sys::wait::WaitStatus, nix::errno::Errno>,
822) -> WorkerExitInfo {
823 match source {
824 Ok(nix::sys::wait::WaitStatus::Exited(_, code)) => WorkerExitInfo::Exited(code),
825 Ok(nix::sys::wait::WaitStatus::Signaled(_, sig, _)) => WorkerExitInfo::Signaled(sig as i32),
826 Ok(nix::sys::wait::WaitStatus::StillAlive) => WorkerExitInfo::TimedOut,
827 Ok(_) => WorkerExitInfo::TimedOut,
828 Err(e) => WorkerExitInfo::WaitFailed(e.to_string()),
829 }
830}
831
832/// Extract a human-readable panic payload from a
833/// [`std::thread::Result`] `Err` value. The two canonical shapes
834/// are `&'static str` (`panic!("literal")`) and `String`
835/// (`panic!("{x}")` post-formatting); anything else falls back to
836/// a fixed sentinel.
837///
838/// Pure mapping (no IO, no allocation past `String::clone`) so the
839/// stop_and_collect path can call it on every joined-and-panicked
840/// thread without performance cliffs.
841pub(super) fn extract_panic_payload(payload: Box<dyn std::any::Any + Send>) -> String {
842 if let Some(s) = payload.downcast_ref::<&'static str>() {
843 (*s).to_string()
844 } else if let Some(s) = payload.downcast_ref::<String>() {
845 s.clone()
846 } else {
847 "<non-string panic payload>".to_string()
848 }
849}
850
851/// Wall-clock time budget for joining a thread-mode worker after
852/// its per-task `stop` has been flipped. Mirrors the fork-mode
853/// `stop_and_collect` 5s shared deadline so neither dispatch path
854/// can serially exhaust the test runtime by hanging on a single
855/// stuck worker. The 100ms `FUTEX_WAIT_TIMEOUT` inside
856/// `worker_main`'s blocking primitives means a well-behaved worker
857/// observes `stop=true` within 100ms of the parent's flip; the 5s
858/// budget covers IO drain, scheduling delays under contention, and
859/// post-loop cleanup (NUMA stat reads, schedstat snapshots).
860pub(super) const THREAD_JOIN_TIMEOUT: Duration = Duration::from_secs(5);
861
862/// Block until `join` reports finished or `timeout` elapses.
863/// Returns `Some(thread_result)` on successful join, `None` on
864/// timeout.
865///
866/// Implementation: wait on `exit_evt` (the worker's "I'm about to
867/// return" eventfd, bumped from a Drop guard inside the thread
868/// closure) via `epoll_wait` with a `timerfd` for the safety
869/// deadline. A spurious wake (e.g. EINTR or a stale eventfd-counter
870/// drain) loops back into the wait without orphaning the worker —
871/// the timerfd carries the absolute deadline.
872///
873/// Std lacks a native timed-join API; an alternative side-thread
874/// "joiner + channel" pattern would orphan the joiner on timeout
875/// (joining is non-cancellable in std), which keeps the thread
876/// alive past `WorkloadHandle::drop` and prevents process exit.
877/// The eventfd path replaces the previous 10ms sleep-poll loop
878/// without that orphan cost.
879pub(super) fn join_thread_with_timeout(
880 join: std::thread::JoinHandle<WorkerReport>,
881 exit_evt: &vmm_sys_util::eventfd::EventFd,
882 timeout: Duration,
883) -> Option<std::thread::Result<WorkerReport>> {
884 use std::os::unix::io::AsRawFd;
885 use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
886 use vmm_sys_util::timerfd::TimerFd;
887
888 if join.is_finished() {
889 return Some(join.join());
890 }
891
892 let epoll = match Epoll::new() {
893 Ok(e) => e,
894 Err(e) => {
895 tracing::warn!(%e, "join_thread_with_timeout: epoll_create1 failed");
896 return None;
897 }
898 };
899 if let Err(e) = epoll.ctl(
900 ControlOperation::Add,
901 exit_evt.as_raw_fd(),
902 EpollEvent::new(EventSet::IN, 0),
903 ) {
904 tracing::warn!(%e, "join_thread_with_timeout: add exit_evt to epoll");
905 return None;
906 }
907 let mut timer = match TimerFd::new() {
908 Ok(t) => t,
909 Err(e) => {
910 tracing::warn!(%e, "join_thread_with_timeout: timerfd_create failed");
911 return None;
912 }
913 };
914 if let Err(e) = timer.reset(timeout, None) {
915 tracing::warn!(%e, "join_thread_with_timeout: timerfd_settime failed");
916 return None;
917 }
918 if let Err(e) = epoll.ctl(
919 ControlOperation::Add,
920 timer.as_raw_fd(),
921 EpollEvent::new(EventSet::IN, 1),
922 ) {
923 tracing::warn!(%e, "join_thread_with_timeout: add timerfd to epoll");
924 return None;
925 }
926
927 let deadline = Instant::now() + timeout;
928 let mut events = [EpollEvent::default(); 2];
929 loop {
930 if join.is_finished() {
931 return Some(join.join());
932 }
933 let remaining = deadline.saturating_duration_since(Instant::now());
934 if remaining.is_zero() {
935 return None;
936 }
937 match epoll.wait(-1, &mut events) {
938 Ok(_) => {}
939 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
940 Err(e) => {
941 tracing::warn!(%e, "join_thread_with_timeout: epoll_wait failed");
942 return None;
943 }
944 }
945 }
946}
947
948/// `worker_main`'s loop-check predicate: returns `true` when the
949/// worker should stop iterating. Reads BOTH the per-worker `stop`
950/// flag and the global [`STOP`] flag; either set request causes
951/// exit.
952///
953/// Why both:
954/// - Per-worker `stop` is what `WorkloadHandle::stop_and_collect`
955/// flips for graceful shutdown. For Fork mode the per-worker
956/// `stop` IS the global [`STOP`] (the SIGUSR1 handler flips it).
957/// For Thread mode each worker has its own `Arc<AtomicBool>`
958/// passed via `&AtomicBool`.
959/// - The global [`STOP`] is what the SIGUSR1 handler sets. For
960/// Fork mode the worker's per-process [`STOP`] is the same
961/// AtomicBool the handler writes. For Thread mode every thread
962/// shares the parent process's address space, so a SIGUSR1
963/// delivered to the parent (e.g. Ctrl-C / a test harness signal)
964/// flips the shared global [`STOP`] but NOT the per-worker
965/// `stop` Arcs. Without this disjunction, Thread workers would
966/// silently keep running through a parent-level shutdown
967/// request.
968///
969/// `#[inline]` because the call site is two atomic loads + an OR.
970/// Relaxed ordering on both reads matches every existing site —
971/// no cross-field happens-before edge to establish.
972#[inline]
973pub(super) fn stop_requested(stop: &AtomicBool) -> bool {
974 stop.load(Ordering::Relaxed) || STOP.load(Ordering::Relaxed)
975}
976
977/// Per-thread worker state for [`CloneMode::Thread`] dispatch.
978///
979/// Thread workers cannot be reaped via `waitpid` (they share a tgid
980/// with the parent), so the lifecycle uses Rust's [`std::thread`]
981/// primitives instead of pid-based syscalls:
982///
983/// - `tid` is published by the worker thread post-spawn via
984/// `gettid()` so the parent can address the kernel task for
985/// `sched_setaffinity(tid, ...)` and report it from
986/// [`WorkloadHandle::worker_pids`]. `Arc<AtomicI32>` because the
987/// thread closure owns the publisher and the parent reads it
988/// without joining.
989/// - `stop` replaces the global [`STOP`] signal-flag for thread
990/// mode: the parent flips it from
991/// [`WorkloadHandle::stop_and_collect`], the worker observes it
992/// inside `worker_main`'s `stop.load(Relaxed)` checks. SIGUSR1 is
993/// process-wide and useless for per-thread stop control.
994/// - `start_tx` is the rendezvous channel: the parent calls
995/// `send(())` from [`WorkloadHandle::start`]; the thread blocks
996/// in `recv()` until then. `Option` so `start` can take it and
997/// drop it (idempotent re-call is a no-op when `None`).
998/// - `join` holds the [`std::thread::JoinHandle`] returned by
999/// `thread::spawn`; `stop_and_collect` joins each handle to
1000/// retrieve the [`WorkerReport`]. `Option` so `stop_and_collect`
1001/// can take ownership and `Drop` does not double-join.
1002pub(super) struct ThreadWorker {
1003 tid: std::sync::Arc<std::sync::atomic::AtomicI32>,
1004 stop: std::sync::Arc<AtomicBool>,
1005 pub(super) start_tx: Option<std::sync::mpsc::SyncSender<()>>,
1006 join: Option<std::thread::JoinHandle<WorkerReport>>,
1007 /// Eventfd bumped by the worker thread's `WorkerExitSignal` Drop
1008 /// guard before the thread returns from its closure. Lets
1009 /// [`join_thread_with_timeout`] block in `epoll_wait` instead of
1010 /// sleep-polling [`std::thread::JoinHandle::is_finished`]. Counter
1011 /// mode (not semaphore) — the value never matters; only the edge
1012 /// from 0 to non-zero does. The Arc is cloned into the closure
1013 /// for the Drop guard; the parent retains the original here.
1014 exit_evt: std::sync::Arc<vmm_sys_util::eventfd::EventFd>,
1015}
1016
1017/// Defense-in-depth Drop for [`ThreadWorker`]. Rust's
1018/// [`std::thread::JoinHandle`] does NOT join its thread on drop —
1019/// it detaches, and the thread continues running until completion.
1020/// `WorkloadHandle::drop`, `WorkloadHandle::stop_and_collect`, and
1021/// `SpawnGuard::drop` already explicitly `take()` the JoinHandle and
1022/// route it through [`join_thread_with_timeout`]; this impl exists
1023/// for the case where some future refactor lets a `ThreadWorker`
1024/// fall out of scope without going through one of those paths.
1025///
1026/// Behavior: if `join` is still `Some` when this Drop fires, flip
1027/// `stop` (so the worker exits cleanly), drop `start_tx` (in case
1028/// the worker is still parked on `recv()`), and join with the
1029/// shared 5s budget. Errors / timeouts are swallowed because Drop
1030/// has nothing to assert against; the upstream paths produce the
1031/// auditable diagnostics.
1032impl Drop for ThreadWorker {
1033 fn drop(&mut self) {
1034 if let Some(j) = self.join.take() {
1035 self.stop.store(true, Ordering::Relaxed);
1036 self.start_tx.take();
1037 let _ = join_thread_with_timeout(j, &self.exit_evt, THREAD_JOIN_TIMEOUT);
1038 }
1039 }
1040}
1041
1042/// Per-fork-child report-decoding shape and bookkeeping recorded
1043/// alongside each entry in [`SpawnGuard::children`] and
1044/// [`WorkloadHandle::children`].
1045///
1046/// Two variants:
1047///
1048/// - [`ForkedChildKind::Worker`]: the conventional fork-mode worker
1049/// (one process per worker, single postcard `WorkerReport`).
1050/// Carries the worker's `group_idx` so a sentinel report on a
1051/// missing payload can be tagged correctly.
1052/// - [`ForkedChildKind::PcommContainer`]: a single thread-group
1053/// leader that owns `num_workers` worker threads across one or
1054/// more logical groups. The leader sets its own `comm` to `pcomm`
1055/// via `prctl(PR_SET_NAME)` before spawning the threads, so every
1056/// worker thread's `task->group_leader->comm` is `pcomm` exactly
1057/// (the [`WorkSpec::pcomm`] builder rejects > 15 bytes —
1058/// `TASK_COMM_LEN - 1` — so the framework never feeds the kernel
1059/// a name `__set_task_comm` would truncate). Reports are a single
1060/// `serde_json` `Vec<WorkerReport>` (one entry per worker thread;
1061/// per-thread `group_idx` lives inside each `WorkerReport`).
1062///
1063/// Note the per-variant wire-format split: `Worker` uses postcard and
1064/// `PcommContainer` uses serde_json. The encodings are dispatched by
1065/// `ForkedChildKind` at decode time on the parent.
1066#[derive(Clone, Debug)]
1067pub(super) enum ForkedChildKind {
1068 /// Conventional fork-mode worker (one process per worker).
1069 /// Report wire format: bare `b'r'` ready byte followed by a
1070 /// `postcard::to_stdvec(&WorkerReport)` document.
1071 Worker { group_idx: usize },
1072 /// pcomm thread-group leader hosting worker threads across one
1073 /// or more logical groups. `groups` records the per-group
1074 /// `(group_idx, num_workers)` layout in the order threads were
1075 /// spawned: `groups[0]` contributes the first
1076 /// `groups[0].1` worker reports, `groups[1]` the next
1077 /// `groups[1].1`, and so on. Total expected reports =
1078 /// `groups.iter().map(|(_, n)| n).sum()`. The layout drives
1079 /// sentinel distribution: when the JSON payload is missing or
1080 /// short, the parent emits sentinels tagged with the right
1081 /// per-group `group_idx` so per-group filters partition
1082 /// correctly.
1083 ///
1084 /// Report wire format: bare `b'r'` ready byte followed by a
1085 /// `serde_json::to_vec(&Vec<WorkerReport>)` document, one entry
1086 /// per worker thread (in `(group_idx, within-group index)`
1087 /// traversal order matching the `groups` layout).
1088 PcommContainer { groups: Vec<(usize, usize)> },
1089}
1090
1091/// Bookkeeping for a single forked-child process owned by the spawn
1092/// pipeline. Replaces the prior `(pid, report_fd, start_fd)` tuple so
1093/// per-child decoding metadata ([`ForkedChildKind`]) lives alongside
1094/// the pipe fds and pid.
1095#[derive(Clone, Debug)]
1096pub(super) struct ForkedChild {
1097 /// Forked child's pid (== tgid for both `Worker` and
1098 /// `PcommContainer`: the worker is its own thread-group leader,
1099 /// the container is the thread-group leader of its inner threads).
1100 pub pid: libc::pid_t,
1101 /// Parent-side read end of the report pipe. The child writes one
1102 /// `b'r'` ready byte followed by either a postcard-encoded single
1103 /// `WorkerReport` (Worker) or a `serde_json` `Vec<WorkerReport>`
1104 /// (PcommContainer) per the [`ForkedChildKind`] tag.
1105 pub report_fd: std::os::unix::io::RawFd,
1106 /// Parent-side write end of the start pipe. Closed by
1107 /// [`crate::workload::WorkloadHandle::start`] after writing the
1108 /// start byte; set to `-1` thereafter.
1109 pub start_fd: std::os::unix::io::RawFd,
1110 /// Per-child decoding shape. See [`ForkedChildKind`].
1111 pub kind: ForkedChildKind,
1112}
1113
1114/// Handle to spawned worker tasks. Workers block until
1115/// [`start()`](Self::start) is called.
1116///
1117/// The [`CloneMode`] in the [`WorkloadConfig`] selects how each
1118/// worker is created. Within one [`WorkloadHandle`] every worker
1119/// uses the same mode, so exactly one of `children` or `threads`
1120/// is populated; the other is empty. This avoids per-worker mode
1121/// dispatch on the hot path and keeps each vec's per-mode
1122/// invariants (pid-based vs JoinHandle-based reaping) cohesive.
1123///
1124/// - [`CloneMode::Fork`] populates `children` — separate process
1125/// per worker, reaped via `waitpid`, signaled via SIGUSR1.
1126/// - [`CloneMode::Thread`] populates `threads` — separate kernel
1127/// task in the parent's thread group via [`std::thread::spawn`],
1128/// joined via `JoinHandle`. Workers share the parent's tgid;
1129/// per-worker cgroup placement requires `cgroup.threads`
1130/// (cgroup v2 thread mode), which ktstr scenarios do not
1131/// currently configure — Thread-mode workers inherit the
1132/// parent's cgroup.
1133#[must_use = "dropping a WorkloadHandle immediately tears down all worker tasks"]
1134pub struct WorkloadHandle {
1135 /// Forked-child processes owned by this handle. Each entry is a
1136 /// [`ForkedChild`] carrying pid, parent-side pipe fds, and a
1137 /// [`ForkedChildKind`] tag that drives report decoding (single
1138 /// postcard `WorkerReport` for `Worker`; `serde_json`
1139 /// `Vec<WorkerReport>` for `PcommContainer`). Empty when every
1140 /// group used [`CloneMode::Thread`] without `pcomm`.
1141 children: Vec<ForkedChild>,
1142 /// Thread-mode workers. Empty when `clone_mode` is not
1143 /// [`CloneMode::Thread`].
1144 threads: Vec<ThreadWorker>,
1145 started: bool,
1146 /// Shared mmap regions for futex-based work types (one per worker group). Unmapped on drop.
1147 futex_ptrs: Vec<*mut u32>,
1148 /// Per-region byte length, parallel to `futex_ptrs`. Each
1149 /// region was sized at spawn time to its source group's
1150 /// natural width (4 for FutexPingPong / FutexFanOut /
1151 /// MutexContention / etc., 16 for FanOutCompute, 32 + Q*8 for
1152 /// ProducerConsumerImbalance — see [`futex_region_size_for`]).
1153 /// `futex_ptrs[i]` and `futex_region_sizes[i]` describe the
1154 /// same region; both are consumed pairwise on `Drop` so each
1155 /// `munmap` call receives the matching length.
1156 futex_region_sizes: Vec<usize>,
1157 /// MAP_SHARED region of per-worker iteration counters. Workers
1158 /// atomically store their iteration count; parent reads via
1159 /// `snapshot_iterations()`. Pointer to the first element; length
1160 /// is the active worker collection's len. Typed as
1161 /// `*mut AtomicU64` rather than `*mut u64` so the 8-byte
1162 /// alignment guarantee (inherited from the page-aligned
1163 /// iter_counters mmap site in `WorkloadHandle::spawn`) and the
1164 /// atomic-only-access invariant are encoded in the type system
1165 /// instead of prose. `AtomicU64` is layout-compatible with `u64`:
1166 /// `std::mem::size_of::<AtomicU64>() == std::mem::align_of::<AtomicU64>() == 8`
1167 /// on every supported target, so casting the `*mut c_void`
1168 /// returned by `mmap` to `*mut AtomicU64` is sound.
1169 iter_counters: *mut AtomicU64,
1170 /// Number of AtomicU64 slots in iter_counters (== num_workers at spawn time).
1171 iter_counter_len: usize,
1172 /// MAP_SHARED single-word phase-epoch region. The scenario engine
1173 /// stores the current 1-indexed phase here (0 = BASELINE, Step k →
1174 /// k + 1, `u32::MAX` = inter-step gap); every backdrop worker of
1175 /// this handle reads the SAME pointer (not a per-worker slot — the
1176 /// epoch is a broadcast value, all the handle's workers transition
1177 /// phases together) to attribute its per-phase `PhaseSlice`s. A
1178 /// single `AtomicU32`, not the per-worker `AtomicU64` layout of
1179 /// `iter_counters`, because one shared word suffices. Null until
1180 /// the spawn-site mmap installs it.
1181 phase_epoch: *mut AtomicU32,
1182 /// Byte length of the `phase_epoch` mmap, for `munmap` on drop
1183 /// (`size_of::<AtomicU32>()`; the kernel rounds the mapping up to a
1184 /// page).
1185 phase_epoch_bytes: usize,
1186 /// Inter-worker paired pipes `(ab, ba)` for PipeIo / CachePipe.
1187 /// Transferred from [`SpawnGuard`] on success; closed by
1188 /// [`WorkloadHandle::drop`] AFTER worker shutdown so Thread-mode
1189 /// workers (which share the parent's fd table) can finish their
1190 /// pipe ops before the close. Under Fork mode each child holds
1191 /// its own fd-table copy via `fork()`, so the parent's late
1192 /// close is a no-op for the children. Empty when `work_type`
1193 /// is neither PipeIo nor CachePipe.
1194 pipe_pairs: Vec<([i32; 2], [i32; 2])>,
1195 /// Per-chain pipe rings for `WakeChain { wake: WakeMechanism::Pipe }`.
1196 /// Outer Vec is one entry per chain (= `num_workers / depth`);
1197 /// inner Vec is `depth` pipes per chain. Same ownership rule as
1198 /// `pipe_pairs`: transferred from [`SpawnGuard`] on success,
1199 /// closed by [`WorkloadHandle::drop`] AFTER worker shutdown so
1200 /// Thread-mode chain workers don't observe `EBADF` mid-run.
1201 /// Empty when `work_type` is not `WakeChain { wake: Pipe }`.
1202 chain_pipes: Vec<Vec<[i32; 2]>>,
1203}
1204
1205/// Per-variant byte length for the MAP_SHARED futex region.
1206///
1207/// Each WorkType that needs a shared region has a fixed natural
1208/// size:
1209///
1210/// - [`WorkType::FanOutCompute`] needs 16 bytes — futex `u32` at
1211/// offset 0, wake-timestamp `u64` at offset 8.
1212/// - [`WorkType::ProducerConsumerImbalance`] needs a ring buffer:
1213/// reserve-head `u64` @ 0, tail `u64` @ 8, producer-wake `u32`
1214/// @ 16, consumer-wake `u32` @ 20, publish-head `u64` @ 24,
1215/// then `Q` × `u64` ring slots starting at offset 32. Total
1216/// bytes = `32 + Q*8`. The two head counters split the MPMC
1217/// protocol: producers CAS-advance the reserve head to claim a
1218/// unique slot index, write the slot, then in slot-index FIFO
1219/// order release-store the publish head; consumers
1220/// acquire-load the publish head so a slot is visible to a
1221/// reader only after its producer's data write
1222/// synchronizes-with the consumer through the publish-head
1223/// release. `queue_depth_target` is `u64` to match the variant;
1224/// an `as usize` truncation on a 32-bit host could silently
1225/// produce a sub-page region with a malformed queue, so the
1226/// conversion is clamped at `usize::MAX/8 - 4` to keep the
1227/// layout well-defined (one fewer slot than before to make
1228/// room for `pub_head`). Realistic configs use Q in the
1229/// hundreds-to-thousands; the clamp only triggers on a
1230/// degenerate input that itself fails admission control
1231/// elsewhere (the queue is far larger than RAM).
1232/// - [`WorkType::SignalStorm`] needs 8 bytes — two `u32` tid
1233/// slots (worker 0's tid @ offset 0, worker 1's @ offset 4).
1234/// - Everything else: `u32` (4 bytes).
1235///
1236/// Returning the same byte count for every WorkType variant lets
1237/// the caller mmap exactly what's needed for THIS group rather
1238/// than the MAX across all groups, so a small-variant group
1239/// composed alongside a large `ProducerConsumerImbalance` no
1240/// longer pays the large group's per-region overhead.
1241pub(super) fn futex_region_size_for(work_type: &WorkType) -> usize {
1242 match work_type {
1243 WorkType::FanOutCompute { .. } => 16,
1244 WorkType::ProducerConsumerImbalance {
1245 queue_depth_target, ..
1246 } => {
1247 let q = std::cmp::min(*queue_depth_target as usize, usize::MAX / 8 - 4);
1248 32 + q * 8
1249 }
1250 // SignalStorm exchanges two u32 tid slots through the region
1251 // (worker 0's tid @ offset 0, worker 1's @ offset 4), so it
1252 // needs 8 bytes, not the default single u32.
1253 WorkType::SignalStorm { .. } => 2 * std::mem::size_of::<u32>(),
1254 _ => std::mem::size_of::<u32>(),
1255 }
1256}
1257
/// Scope guard that owns every resource acquired during
/// [`WorkloadHandle::spawn`]'s partial setup.
///
/// If `spawn` returns early (via `?` or `bail!`), the guard's `Drop`
/// does the following, so a mid-setup failure never leaks fds, zombie
/// processes, or anonymous-shared pages:
/// - kills and reaps any already-forked children;
/// - stops and joins any already-spawned threads;
/// - closes every open pipe fd;
/// - munmaps every shared region.
///
/// On success, [`SpawnGuard::into_handle`] moves every live
/// resource into the returned [`WorkloadHandle`]: children/threads,
/// futex regions, the iter-counter region, the phase-epoch region,
/// AND `pipe_pairs` / `chain_pipes`. The guard's subsequent `Drop`
/// then runs against empty Vecs/null pointers and is a no-op on the
/// success path.
///
/// On the early-bail path (an `?` inside `WorkloadHandle::spawn`)
/// the guard still owns whatever it allocated, and `Drop` cleans it
/// all up: fds, processes, threads, mmaps. Pipe fds are closed by the
/// handle (not the guard) on success because Thread-mode workers
/// share the parent's fd table. Closing the fds before worker shutdown
/// would surface as `EBADF` on every pipe op a thread runs after
/// spawn returns.
///
/// NOTE(review): field order is also drop order for the Vec fields
/// (`children`, `threads`) after `Drop::drop` runs. Keep it stable
/// unless that ordering has been checked.
pub(super) struct SpawnGuard {
    /// Inter-worker paired pipes `(ab, ba)` for PipeIo/CachePipe.
    /// Transferred to [`WorkloadHandle`] on success; closed by the
    /// guard only on the early-bail path. Under Fork mode each
    /// child holds its own fd-table copy via `fork()`. Under
    /// Thread mode every worker thread shares these fds with the
    /// parent.
    pipe_pairs: Vec<([i32; 2], [i32; 2])>,
    /// Per-chain pipe rings for `WakeChain { wake: WakeMechanism::Pipe }`.
    /// Outer Vec is one entry per chain (= `num_workers / depth`);
    /// inner Vec is `depth` pipes per chain. Pipe `i` connects stage
    /// `i` (writer) to stage `(i + 1) % depth` (reader). Same
    /// ownership shape as `pipe_pairs`: transferred to the handle on
    /// success, closed by the guard only on the early-bail path.
    chain_pipes: Vec<Vec<[i32; 2]>>,
    /// Shared-memory futex regions (transferred to handle on success).
    futex_ptrs: Vec<*mut u32>,
    /// Per-region byte length, parallel to `futex_ptrs`. Each region
    /// is sized to its source group's natural width (4 / 16 / 32+Q*8;
    /// see [`futex_region_size_for`]). The length is recorded here at
    /// `spawn_group` time so munmap on Drop can call
    /// `libc::munmap(ptr, len)` with the matching length even when
    /// groups with different natural sizes co-exist.
    /// `futex_ptrs[i]` and `futex_region_sizes[i]` describe the same
    /// region.
    futex_region_sizes: Vec<usize>,
    /// Per-worker iteration counter region (transferred on success).
    /// Typed to match the handle field; see
    /// `WorkloadHandle::iter_counters`. Null when nothing is mapped.
    iter_counters: *mut AtomicU64,
    /// Byte length of the `iter_counters` mapping, used as the munmap
    /// length in `Drop`. 0 when nothing is mapped. `into_handle`
    /// converts it to a slot count by dividing by
    /// `size_of::<AtomicU64>()`.
    iter_counter_bytes: usize,
    /// Per-handle phase-epoch region (transferred to handle on
    /// success). A single `AtomicU32` word shared by all the handle's
    /// workers; see `WorkloadHandle::phase_epoch`. Null when nothing
    /// is mapped.
    phase_epoch: *mut AtomicU32,
    /// Byte length of the `phase_epoch` mapping, used as the munmap
    /// length in `Drop`. 0 when nothing is mapped.
    phase_epoch_bytes: usize,
    /// Already-forked children (either conventional workers or
    /// pcomm containers) with their parent-side pipe fds and
    /// per-child decoding shape (transferred to handle on success).
    children: Vec<ForkedChild>,
    /// Already-spawned thread workers (transferred on success).
    /// Cleanup on early-exit flips each `stop` and joins each
    /// thread. Threads share the parent's address space, so they
    /// must be drained cooperatively (there is no `kill` equivalent).
    threads: Vec<ThreadWorker>,
}
1322
1323impl SpawnGuard {
1324 fn new() -> Self {
1325 Self {
1326 pipe_pairs: Vec::new(),
1327 chain_pipes: Vec::new(),
1328 futex_ptrs: Vec::new(),
1329 futex_region_sizes: Vec::new(),
1330 iter_counters: std::ptr::null_mut(),
1331 iter_counter_bytes: 0,
1332 phase_epoch: std::ptr::null_mut(),
1333 phase_epoch_bytes: 0,
1334 children: Vec::new(),
1335 threads: Vec::new(),
1336 }
1337 }
1338
1339 /// Transfer live resources into a [`WorkloadHandle`]. Leaves the
1340 /// guard's `children`, `threads`, `futex_ptrs`,
1341 /// `futex_region_sizes`, `iter_counters`, `pipe_pairs`, and
1342 /// `chain_pipes` empty, so the guard's subsequent `Drop` is a
1343 /// no-op on the success path. The handle is now the sole owner
1344 /// of every resource — its own `Drop` closes the pipe fds
1345 /// AFTER worker shutdown completes, which is the ordering
1346 /// Thread mode requires (workers share the parent's fd table;
1347 /// closing pre-shutdown would surface as `EBADF` on every
1348 /// worker's pipe op). Fork mode is unaffected either way: each
1349 /// child holds its own fd-table copy via `fork()`, so the
1350 /// parent's close timing is invisible to the child.
1351 fn into_handle(mut self) -> WorkloadHandle {
1352 let children = std::mem::take(&mut self.children);
1353 let threads = std::mem::take(&mut self.threads);
1354 let futex_ptrs = std::mem::take(&mut self.futex_ptrs);
1355 let futex_region_sizes = std::mem::take(&mut self.futex_region_sizes);
1356 let iter_counters = std::mem::replace(&mut self.iter_counters, std::ptr::null_mut());
1357 let iter_counter_bytes = std::mem::replace(&mut self.iter_counter_bytes, 0);
1358 let iter_counter_len = iter_counter_bytes / std::mem::size_of::<AtomicU64>();
1359 let phase_epoch = std::mem::replace(&mut self.phase_epoch, std::ptr::null_mut());
1360 let phase_epoch_bytes = std::mem::replace(&mut self.phase_epoch_bytes, 0);
1361 let pipe_pairs = std::mem::take(&mut self.pipe_pairs);
1362 let chain_pipes = std::mem::take(&mut self.chain_pipes);
1363 WorkloadHandle {
1364 children,
1365 threads,
1366 started: false,
1367 futex_ptrs,
1368 futex_region_sizes,
1369 iter_counters,
1370 iter_counter_len,
1371 phase_epoch,
1372 phase_epoch_bytes,
1373 pipe_pairs,
1374 chain_pipes,
1375 }
1376 }
1377}
1378
1379impl Drop for SpawnGuard {
1380 fn drop(&mut self) {
1381 // Kill and reap any already-forked children first, so their
1382 // pipe ends are not left blocked when we close the parent
1383 // side. `nix` wrappers replace the raw libc calls — kill
1384 // returns `Result<()>` (we swallow ECHILD/ESRCH in the
1385 // already-exited case), waitpid returns `Result<WaitStatus>`
1386 // (we discard the status in the cleanup path), close returns
1387 // `Result<()>` (we swallow EBADF for fds an earlier arm may
1388 // have already closed).
1389 for child in &self.children {
1390 let npid = nix::unistd::Pid::from_raw(child.pid);
1391 let _ = nix::sys::signal::kill(npid, nix::sys::signal::Signal::SIGKILL);
1392 let _ = nix::sys::wait::waitpid(npid, None);
1393 }
1394 // Close each child's parent-side report/start fds.
1395 for child in &self.children {
1396 close_fds_silently(&[child.report_fd, child.start_fd]);
1397 }
1398 // Stop and join any partially-spawned threads. Threads
1399 // share our address space, so `kill` does not reach them
1400 // and the only safe teardown is "flip stop, drop the start
1401 // channel (in case worker is still parked on `recv`), then
1402 // join". Dropping `start_tx` causes `recv` on the worker
1403 // side to return `Err(Disconnected)`, unblocking a thread
1404 // that has not yet been signaled. After both signals
1405 // (stop=true and start_tx dropped), `worker_main`'s outer
1406 // loop exits at the next `stop.load(Relaxed)` check (max
1407 // ~100ms latency from the `FUTEX_WAIT_TIMEOUT` poll
1408 // cadence) and the thread completes. `join` returns the
1409 // partial `WorkerReport` (or `Err` on panic, which we
1410 // swallow because mid-spawn cleanup has nothing to assert).
1411 for tw in &mut self.threads {
1412 tw.stop.store(true, Ordering::Relaxed);
1413 // Drop start_tx FIRST so a worker still parked on
1414 // recv() unblocks via Disconnected.
1415 tw.start_tx.take();
1416 if let Some(j) = tw.join.take() {
1417 // SpawnGuard cleanup uses the same `THREAD_JOIN_TIMEOUT`
1418 // budget as `stop_and_collect` and `WorkloadHandle::drop`
1419 // so a stuck worker can't pin mid-spawn error recovery.
1420 // Errors (panic / timeout) are silently dropped — the
1421 // mid-spawn path has nothing to assert against beyond
1422 // not leaking, and the spawn-side bail message has
1423 // already named the failure mode that triggered cleanup.
1424 let _ = join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT);
1425 }
1426 }
1427 // Early-bail pipe close. On the success path, into_handle
1428 // moved both `pipe_pairs` and `chain_pipes` into the handle,
1429 // so these Vecs are empty here and these loops iterate
1430 // nothing. On the early-bail path the guard still owns the
1431 // partially-allocated pipes and must close them now — the
1432 // child arm of each fork already closed any inherited
1433 // copies it held, and Thread-mode early-bail joined any
1434 // partially-spawned threads above before this loop runs.
1435 for (ab, ba) in &self.pipe_pairs {
1436 close_fds_silently(&[ab[0], ab[1], ba[0], ba[1]]);
1437 }
1438 for chain in &self.chain_pipes {
1439 for pipe in chain {
1440 close_fds_silently(&[pipe[0], pipe[1]]);
1441 }
1442 }
1443 // Munmap shared regions. `futex_ptrs[i]` and
1444 // `futex_region_sizes[i]` describe the same region, so each
1445 // munmap receives the exact length used for the matching
1446 // mmap. The two vectors are appended in lockstep inside
1447 // `spawn_group`, so they have identical lengths in every
1448 // observable state.
1449 for (&ptr, &size) in self.futex_ptrs.iter().zip(self.futex_region_sizes.iter()) {
1450 unsafe {
1451 libc::munmap(ptr as *mut libc::c_void, size);
1452 }
1453 }
1454 if !self.iter_counters.is_null() && self.iter_counter_bytes > 0 {
1455 unsafe {
1456 libc::munmap(
1457 self.iter_counters as *mut libc::c_void,
1458 self.iter_counter_bytes,
1459 );
1460 }
1461 }
1462 if !self.phase_epoch.is_null() && self.phase_epoch_bytes > 0 {
1463 unsafe {
1464 libc::munmap(
1465 self.phase_epoch as *mut libc::c_void,
1466 self.phase_epoch_bytes,
1467 );
1468 }
1469 }
1470 }
1471}
1472
// SAFETY: `WorkloadHandle` holds raw pointers (futex_ptrs,
// iter_counters, phase_epoch) into MAP_SHARED anonymous pages created
// before fork, so every forked child inherits a pointer copy of the
// same underlying kernel object.
//
// How children access the regions:
// - Each child reads/writes its own futex word. Most WorkType variants
//   use `std::ptr::read_volatile`/`write_volatile`. FanOutCompute
//   instead uses `AtomicU32`/`AtomicU64` references re-derived from
//   the raw pointer, because it needs release-acquire ordering to
//   publish `wake_ns` alongside the generation advance.
// - Each child atomically stores into its dedicated iter_counters slot
//   through a shared `&AtomicU64` reference derived from the
//   `*mut AtomicU64` region pointer.
//
// The parent reads all slots via `snapshot_iterations`. The parent is
// the only process that munmaps the regions:
// - normally in `WorkloadHandle::drop`, after every child has been
//   reaped;
// - in `SpawnGuard`'s `Drop`, when spawn bails mid-setup before a
//   handle exists.
//
// Per-mode aliasing rationale:
//
// - Fork mode: each forked child constructs its own process-local
//   `&AtomicU32`/`&AtomicU64` shared reference into the MAP_SHARED
//   page from the inherited raw pointer. No reference value ever
//   crosses a process boundary; each process synthesises its own
//   reference from the same underlying kernel object. Interior
//   mutation through a shared atomic reference is permitted by
//   Rust's aliasing model because `AtomicU32`/`AtomicU64` wrap an
//   `UnsafeCell`. The post-fork alias relation is therefore not an
//   aliasing-rule violation.
//
// - Thread mode: under [`CloneMode::Thread`] every worker thread
//   shares the parent process's single address space. The same raw
//   `*mut AtomicU32`/`*mut AtomicU64` pointer is dereferenced from
//   multiple threads concurrently, and the resulting
//   `&AtomicU32`/`&AtomicU64` shared references coexist for
//   overlapping lifetimes. This is sound for the same reason
//   `Arc<AtomicU64>` is sound:
//   - atomic types' `UnsafeCell`-wrapped storage permits concurrent
//     shared-reference access by design;
//   - the underlying load/store instructions are by construction
//     non-tearing on every supported target;
//   - no `&mut` reference is ever materialised; every access is via
//     the atomic API.
//   The MAP_SHARED region is allocated once before any worker spawns
//   and `munmap`ped after every worker has been joined, so the
//   underlying kernel object outlives every alias.
//
//   NOTE(review): "joined" must mean the thread has actually exited.
//   A join that gives up on a timeout leaves the thread running. Any
//   path that munmaps must not do so while such a thread may still be
//   live; confirm against `WorkloadHandle::drop`.
//
// `phase_epoch` follows the same MAP_SHARED aliasing rules with the
// roles reversed. The parent (scenario engine) is the sole WRITER of
// the single `AtomicU32` word (Release stores at each step boundary),
// and the backdrop workers are READERS (Relaxed loads). It is
// telemetry: no happens-before edge is required, matching the
// `iter_counters` Relaxed-store / Relaxed-read precedent.
unsafe impl Send for WorkloadHandle {}
unsafe impl Sync for WorkloadHandle {}
1522
/// Pointer-sized addresses passed across a thread-spawn boundary.
///
/// Rust's auto-`Send` inference on closures treats `*mut T` as
/// `!Send`. This persists even inside a wrapper struct that is
/// destructured in the closure body: the destructured field's type
/// leaks into the closure's auto-trait check. The simplest workaround
/// is to round-trip the pointers through `usize` (`Send + Copy`) and
/// re-cast them on the receiver side.
///
/// Soundness is unchanged. Thread-mode workers share the parent's
/// address space, so the addresses keep their meaning across the
/// thread boundary. The underlying MAP_SHARED regions are owned by the
/// guard / handle for the full duration of every worker.
///
/// `SendFutexPtr` carries a `(futex_address, pos)` tuple wrapped in
/// `Option`. `None` is the "no futex required" case for work types
/// that don't need shared memory.
#[derive(Clone, Copy)]
pub(super) struct SendFutexPtr(Option<(usize, usize)>);

/// Send-safe carrier for one worker's `iter_counters` slot, stored as
/// a single address. Zero means "no iter_slot publish".
#[derive(Clone, Copy)]
pub(super) struct SendIterSlotPtr(usize);
1545
1546impl SendFutexPtr {
1547 fn new(p: Option<(*mut u32, usize)>) -> Self {
1548 SendFutexPtr(p.map(|(ptr, pos)| (ptr as usize, pos)))
1549 }
1550
1551 /// Re-cast back into the `*mut u32` + `pos` tuple `worker_main`
1552 /// expects.
1553 fn into_raw(self) -> Option<(*mut u32, usize)> {
1554 self.0.map(|(addr, pos)| (addr as *mut u32, pos))
1555 }
1556}
1557
1558impl SendIterSlotPtr {
1559 fn new(p: *mut AtomicU64) -> Self {
1560 SendIterSlotPtr(p as usize)
1561 }
1562
1563 fn into_raw(self) -> *mut AtomicU64 {
1564 self.0 as *mut AtomicU64
1565 }
1566}
1567
/// Send-safe carrier for the single shared `phase_epoch` word.
///
/// Every worker in a group receives the SAME address; this mirrors
/// [`SendIterSlotPtr`]. Storing the address as a `usize` keeps a worker
/// closure's capture set `Send`, because there is no raw-pointer field
/// in the closure type.
#[derive(Clone, Copy)]
pub(super) struct SendPhaseEpochPtr(usize);
1574
1575impl SendPhaseEpochPtr {
1576 fn new(p: *mut AtomicU32) -> Self {
1577 SendPhaseEpochPtr(p as usize)
1578 }
1579
1580 fn into_raw(self) -> *mut AtomicU32 {
1581 self.0 as *mut AtomicU32
1582 }
1583}
1584
/// Per-group resolved view of [`WorkloadConfig`] used by the
/// spawn pipeline.
///
/// [`WorkloadHandle::spawn`] iterates one `GroupParams` per group
/// it spawns:
/// - The primary group (`group_idx == 0`) is built from the top-level
///   [`WorkloadConfig`] fields via [`Self::primary`].
/// - Each composed [`WorkSpec`] entry is resolved into its own
///   `GroupParams` (with `group_idx == 1..=N`) via
///   [`Self::from_composed`].
///
/// Both paths funnel through [`Self::from_work_spec`] for the actual
/// field copy.
///
/// `GroupParams` is the post-resolution shape:
/// - `num_workers` is a concrete `usize`, not the `Option<usize>` that
///   [`WorkSpec`] carries;
/// - `affinity` is a concrete [`ResolvedAffinity`], not the
///   [`AffinityIntent`] that [`WorkSpec`] carries.
///
/// The spawn pipeline operates on `GroupParams` exclusively, so it
/// never has to deal with the unresolved intent/optional shapes that
/// the user-facing types expose.
///
/// `clone_mode` is shared across every group. The top-level
/// [`WorkloadConfig::clone_mode`] selects fork vs thread dispatch for
/// the entire workload, and [`WorkSpec`] carries no `clone_mode` field
/// of its own. Composed entries inherit the parent's mode, and the
/// [`SpawnGuard`]'s lifecycle assumes a single dispatch path.
#[derive(Clone)]
pub(super) struct GroupParams {
    // `work_type` through `numa_node` are copied from the source
    // `WorkSpec` by `from_work_spec`. `comm` is converted to an owned
    // `String` on the way.
    work_type: WorkType,
    sched_policy: SchedPolicy,
    mem_policy: MemPolicy,
    mpol_flags: MpolFlags,
    /// `None` means "skip the `setpriority(2)` call" at the
    /// spawn-time gate.
    nice: Option<i32>,
    comm: Option<String>,
    uid: Option<u32>,
    gid: Option<u32>,
    numa_node: Option<u32>,
    /// Concrete affinity, already resolved from the spec's
    /// [`AffinityIntent`] by `resolve_spawn_affinity`.
    affinity: ResolvedAffinity,
    /// Concrete worker count (the spec's `Option<usize>` resolved).
    num_workers: usize,
    /// 0 for the primary group. `1..=N` for composed entries, where
    /// `composed[i]` has `group_idx == i + 1`; diagnostics print
    /// `group_idx - 1`.
    group_idx: usize,
}
1625
1626impl GroupParams {
1627 /// Extract a [`GroupParams`] from a [`WorkSpec`] given the
1628 /// resolved sibling values. This is the single field-extraction
1629 /// site — both [`Self::primary`] and [`Self::from_composed`]
1630 /// funnel through here, so the field-by-field copy lives in one
1631 /// place.
1632 ///
1633 /// The caller is responsible for resolving the
1634 /// [`WorkSpec::num_workers`] `Option<usize>` to a concrete
1635 /// `usize` and the [`WorkSpec::affinity`] [`AffinityIntent`] to
1636 /// a concrete [`ResolvedAffinity`]. The remaining fields
1637 /// (`work_type`, `sched_policy`, `mem_policy`, `mpol_flags`,
1638 /// `nice`) are copied verbatim — they need no resolution
1639 /// because both [`WorkSpec`] and [`GroupParams`] carry them in
1640 /// their final runtime form (`nice` round-trips as
1641 /// `Option<i32>` so `None` continues to mean "skip the
1642 /// `setpriority(2)` call" at the spawn-time gate).
1643 fn from_work_spec(
1644 spec: &WorkSpec,
1645 group_idx: usize,
1646 resolved_affinity: ResolvedAffinity,
1647 resolved_num_workers: usize,
1648 ) -> Self {
1649 Self {
1650 work_type: spec.work_type.clone(),
1651 sched_policy: spec.sched_policy,
1652 mem_policy: spec.mem_policy.clone(),
1653 mpol_flags: spec.mpol_flags,
1654 nice: spec.nice,
1655 comm: spec.comm.as_ref().map(|c| c.to_string()),
1656 uid: spec.uid,
1657 gid: spec.gid,
1658 numa_node: spec.numa_node,
1659 affinity: resolved_affinity,
1660 num_workers: resolved_num_workers,
1661 group_idx,
1662 }
1663 }
1664
1665 /// Resolve an [`AffinityIntent`] to a [`ResolvedAffinity`] under
1666 /// the spawn-time gate: only `Inherit`, `Exact`, and
1667 /// `RandomSubset` carry enough information to resolve without
1668 /// scenario context (the caller supplies the `from` pool for
1669 /// `RandomSubset`, so per-worker sampling stays self-contained).
1670 /// Topology-aware variants (`SingleCpu`, `LlcAligned`,
1671 /// `CrossCgroup`, `SmtSiblingPair`) require a
1672 /// [`crate::topology::TestTopology`] / cpuset state that
1673 /// [`WorkloadHandle::spawn`] does not have, so they bail with an
1674 /// actionable diagnostic.
1675 ///
1676 /// `site` names the location of the affinity field for the bail
1677 /// message — `"WorkloadConfig::affinity"` for the primary group,
1678 /// `"composed[N].affinity"` for entries inside `composed`. Pinned
1679 /// across both call sites so the gate matches exactly and a
1680 /// future variant addition is rejected uniformly.
1681 pub(super) fn resolve_spawn_affinity(
1682 intent: &AffinityIntent,
1683 site: &str,
1684 ) -> Result<ResolvedAffinity> {
1685 match intent {
1686 AffinityIntent::Inherit => Ok(ResolvedAffinity::None),
1687 AffinityIntent::Exact(cpus) => {
1688 if cpus.is_empty() {
1689 anyhow::bail!(
1690 "{site} = AffinityIntent::Exact with empty CPU set \
1691 would produce EINVAL from sched_setaffinity; \
1692 use AffinityIntent::Inherit for no affinity \
1693 constraint",
1694 );
1695 }
1696 Ok(ResolvedAffinity::Fixed(cpus.clone()))
1697 }
1698 AffinityIntent::RandomSubset { from, count } => {
1699 if from.is_empty() {
1700 anyhow::bail!(
1701 "{site} = AffinityIntent::RandomSubset with empty \
1702 pool; use AffinityIntent::Inherit for no affinity \
1703 constraint",
1704 );
1705 }
1706 if *count == 0 {
1707 anyhow::bail!(
1708 "{site} = AffinityIntent::RandomSubset with \
1709 count=0; use AffinityIntent::Inherit for no \
1710 affinity constraint",
1711 );
1712 }
1713 Ok(ResolvedAffinity::Random {
1714 from: from.clone(),
1715 count: *count,
1716 })
1717 }
1718 AffinityIntent::SingleCpu
1719 | AffinityIntent::LlcAligned
1720 | AffinityIntent::CrossCgroup
1721 | AffinityIntent::SmtSiblingPair => {
1722 anyhow::bail!(
1723 "{site} = {:?} requires scenario context; use \
1724 AffinityIntent::Exact(set), \
1725 AffinityIntent::RandomSubset {{ from, count }}, \
1726 or AffinityIntent::Inherit when spawning directly \
1727 via WorkloadHandle::spawn. Topology-aware variants \
1728 resolve automatically inside #[ktstr_test] \
1729 scenarios.",
1730 intent,
1731 );
1732 }
1733 }
1734 }
1735
1736 /// Build the primary group's parameters from the top-level
1737 /// [`WorkloadConfig`] fields. `group_idx` is fixed to `0`.
1738 ///
1739 /// Synthesises a [`WorkSpec`] view of the top-level config
1740 /// fields and funnels through [`Self::from_work_spec`] so the
1741 /// field-by-field copy lives in exactly one place. The
1742 /// synthesised spec mirrors the resolved sibling values
1743 /// (`num_workers: Some(n)`, `affinity: Inherit`) — the spawn
1744 /// pipeline never reads it.
1745 ///
1746 /// `WorkloadConfig::affinity` is an [`AffinityIntent`]
1747 /// (type-unified with [`WorkSpec::affinity`]); resolution to
1748 /// [`ResolvedAffinity`] runs through
1749 /// [`Self::resolve_spawn_affinity`] under the same gate as
1750 /// [`Self::from_composed`]. Topology-aware variants
1751 /// (`SingleCpu`, `LlcAligned`, `CrossCgroup`, `SmtSiblingPair`)
1752 /// require scenario context; the scenario engine pre-resolves
1753 /// them via
1754 /// `crate::scenario::intent_for_spawn` (which round-trips
1755 /// `RandomSubset` verbatim and flattens topology-aware variants
1756 /// to `Exact`) before building [`WorkloadConfig`], so the gate
1757 /// only ever sees `Inherit`, `Exact`, or `RandomSubset` from
1758 /// this path.
1759 fn primary(config: &WorkloadConfig) -> Result<Self> {
1760 let resolved_affinity =
1761 Self::resolve_spawn_affinity(&config.affinity, "WorkloadConfig::affinity")?;
1762 let spec = WorkSpec {
1763 work_type: config.work_type.clone(),
1764 sched_policy: config.sched_policy,
1765 num_workers: Some(config.num_workers),
1766 affinity: AffinityIntent::Inherit,
1767 mem_policy: config.mem_policy.clone(),
1768 mpol_flags: config.mpol_flags,
1769 nice: config.nice,
1770 comm: config.comm.clone(),
1771 uid: config.uid,
1772 gid: config.gid,
1773 numa_node: config.numa_node,
1774 pcomm: None,
1775 workers_pct: None,
1776 };
1777 Ok(Self::from_work_spec(
1778 &spec,
1779 0,
1780 resolved_affinity,
1781 config.num_workers,
1782 ))
1783 }
1784
1785 /// Resolve a composed [`WorkSpec`] into per-group parameters,
1786 /// applying the spawn-time rules documented on
1787 /// [`WorkloadConfig::composed`]:
1788 ///
1789 /// - `num_workers` must be `Some(n)`; the `None` default
1790 /// resolved by the scenario engine via
1791 /// `Ctx::workers_per_cgroup` is unreachable here. A `None`
1792 /// value is rejected with an actionable diagnostic.
1793 /// - `affinity` resolution runs through
1794 /// [`Self::resolve_spawn_affinity`] —
1795 /// [`AffinityIntent::Inherit`] (mapped to
1796 /// [`ResolvedAffinity::None`]),
1797 /// [`AffinityIntent::Exact`] (mapped to
1798 /// [`ResolvedAffinity::Fixed`]), and
1799 /// [`AffinityIntent::RandomSubset`] (mapped to
1800 /// [`ResolvedAffinity::Random`]) are accepted; topology-aware
1801 /// variants are rejected.
1802 ///
1803 /// Composed entries inherit the parent
1804 /// [`WorkloadConfig::clone_mode`]; [`WorkSpec`] has no
1805 /// `clone_mode` field of its own.
1806 fn from_composed(spec: &WorkSpec, group_idx: usize) -> Result<Self> {
1807 if spec.pcomm.is_some() {
1808 anyhow::bail!(
1809 "composed[{}].pcomm: pcomm via WorkloadHandle::spawn is not supported; \
1810 use WorkloadHandle::spawn_pcomm_cgroup or CgroupDef (apply_setup) — \
1811 spawn always forks one process per worker and never coalesces into \
1812 a thread-group leader",
1813 group_idx - 1,
1814 );
1815 }
1816 let num_workers = spec.num_workers.ok_or_else(|| {
1817 anyhow::anyhow!(
1818 "composed[{}].num_workers must be set explicitly at spawn time \
1819 (the Some/None resolution via Ctx::workers_per_cgroup is only \
1820 available through the scenario engine; \
1821 WorkloadHandle::spawn requires a concrete count)",
1822 group_idx - 1,
1823 )
1824 })?;
1825 let site = format!("composed[{}].affinity", group_idx - 1);
1826 let affinity = Self::resolve_spawn_affinity(&spec.affinity, &site)?;
1827 Ok(Self::from_work_spec(spec, group_idx, affinity, num_workers))
1828 }
1829}
1830
1831/// Shared per-group admission rules common to every spawn entry
1832/// point. Validates only the rules that are dispatch-agnostic —
1833/// `worker_group_size` divisibility, `chain_pipe_depth >= 2`,
1834/// `IdleChurn` zero-duration rejections, `IpcVariance` zero-knob
1835/// rejections. Dispatch-specific compatibility checks (CloneMode
1836/// vs WorkType, pcomm vs WorkType) stay at each entry point's
1837/// admission block since their reasoning differs per dispatch.
1838///
1839/// Called from [`WorkloadHandle::spawn`] (per-group inside
1840/// `groups`) and [`WorkloadHandle::spawn_pcomm_cgroup`] (per-group
1841/// inside its own `groups`). Centralises the rules so a future
1842/// addition (e.g. a new `WorkType` zero-rejection) lives in one
1843/// place.
1844pub(super) fn validate_workload_admission(group: &GroupParams) -> Result<()> {
1845 if let Some(group_size) = group.work_type.worker_group_size()
1846 && (group.num_workers == 0 || !group.num_workers.is_multiple_of(group_size))
1847 {
1848 return Err(WorkTypeValidationError::NonDivisibleWorkerCount {
1849 name: group.work_type.name().to_string(),
1850 group_idx: group.group_idx,
1851 group_size,
1852 num_workers: group.num_workers,
1853 }
1854 .into());
1855 }
1856 if let Some(depth) = group.work_type.chain_pipe_depth()
1857 && depth < 2
1858 {
1859 return Err(WorkTypeValidationError::InsufficientWakeChainDepth {
1860 depth,
1861 group_idx: group.group_idx,
1862 }
1863 .into());
1864 }
1865 if let WorkType::IdleChurn {
1866 burst_duration,
1867 sleep_duration,
1868 ..
1869 } = group.work_type
1870 {
1871 if burst_duration.is_zero() {
1872 return Err(WorkTypeValidationError::ZeroBurstDuration {
1873 group_idx: group.group_idx,
1874 }
1875 .into());
1876 }
1877 if sleep_duration.is_zero() {
1878 return Err(WorkTypeValidationError::ZeroSleepDuration {
1879 group_idx: group.group_idx,
1880 }
1881 .into());
1882 }
1883 }
1884 if let WorkType::IpcVariance {
1885 hot_iters,
1886 cold_iters,
1887 period_iters,
1888 } = group.work_type
1889 {
1890 if hot_iters == 0 {
1891 return Err(WorkTypeValidationError::ZeroIpcVarianceParam {
1892 field: "hot_iters",
1893 group_idx: group.group_idx,
1894 }
1895 .into());
1896 }
1897 if cold_iters == 0 {
1898 return Err(WorkTypeValidationError::ZeroIpcVarianceParam {
1899 field: "cold_iters",
1900 group_idx: group.group_idx,
1901 }
1902 .into());
1903 }
1904 if period_iters == 0 {
1905 return Err(WorkTypeValidationError::ZeroIpcVarianceParam {
1906 field: "period_iters",
1907 group_idx: group.group_idx,
1908 }
1909 .into());
1910 }
1911 }
1912 if let WorkType::TimerLatency { interval_us } = group.work_type
1913 && interval_us == 0
1914 {
1915 return Err(WorkTypeValidationError::ZeroTimerInterval {
1916 group_idx: group.group_idx,
1917 }
1918 .into());
1919 }
1920 if let WorkType::NetTraffic { frame_bytes, .. } = group.work_type
1921 && !(60..=1514).contains(&frame_bytes)
1922 {
1923 return Err(WorkTypeValidationError::NetTrafficFrameBytes {
1924 frame_bytes,
1925 group_idx: group.group_idx,
1926 }
1927 .into());
1928 }
1929 if let WorkType::IrqWake { frame_bytes, .. } = group.work_type
1930 && !(60..=1514).contains(&frame_bytes)
1931 {
1932 return Err(WorkTypeValidationError::IrqWakeFrameBytes {
1933 frame_bytes,
1934 group_idx: group.group_idx,
1935 }
1936 .into());
1937 }
1938 Ok(())
1939}
1940
1941/// Spawn a single thread-mode worker via [`std::thread::Builder`].
1942///
1943/// The thread closure runs `worker_main` directly with the same
1944/// per-worker arguments the fork dispatch passes, except `stop` is
1945/// a per-worker `Arc<AtomicBool>` instead of the global [`STOP`].
1946/// Start rendezvous uses an `mpsc::sync_channel(0)` because every
1947/// worker needs to block until the parent calls
1948/// [`WorkloadHandle::start`]; the parent then sends `()` to each
1949/// worker's `start_tx` to unblock them in order.
1950///
1951/// Also uses a capacity-1 `mpsc::sync_channel(1)` tid-publish
1952/// channel so this function blocks (up to 2 s) until the worker
1953/// thread reaches the `gettid()` publish point. That makes
1954/// [`WorkloadHandle::worker_pids`] safe to call immediately after
1955/// [`WorkloadHandle::spawn`] returns without first calling
1956/// [`WorkloadHandle::start`] — the publish happens BEFORE the
1957/// start handshake, so post-spawn tid reads always observe the
1958/// gettid() value (or spawn would have bailed with a "did not
1959/// publish gettid() within 2 s" error).
1960///
1961/// SIGUSR1 is process-wide and useless for per-thread stop control,
1962/// so this path does not install a signal handler. The parent flips
1963/// `stop` directly from [`WorkloadHandle::stop_and_collect`].
1964#[allow(clippy::too_many_arguments)]
1965pub(super) fn spawn_thread_worker(
1966 guard: &mut SpawnGuard,
1967 group: &GroupParams,
1968 affinity: Option<BTreeSet<usize>>,
1969 worker_pipe_fds: Option<(i32, i32)>,
1970 worker_futex: Option<(*mut u32, usize)>,
1971 iter_slot: *mut AtomicU64,
1972 phase_epoch: *mut AtomicU32,
1973) -> Result<()> {
1974 use std::sync::Arc;
1975 use std::sync::atomic::AtomicI32;
1976 use std::sync::mpsc;
1977
1978 // SyncSender(0) — bounded rendezvous channel. The thread blocks
1979 // in `recv()` until the parent sends `()`; if the parent drops
1980 // the sender first (mid-spawn cleanup or early bail), `recv()`
1981 // returns `Err(Disconnected)` and the closure exits cleanly.
1982 let (start_tx, start_rx) = mpsc::sync_channel::<()>(0);
1983 // tid-publish handshake. Capacity 1 so the worker's send is
1984 // non-blocking; the parent's `recv_timeout` below blocks until
1985 // the worker reaches the publish point. Without this, the
1986 // post-spawn `WorkloadHandle::worker_pids()` race-reads the
1987 // initial 0 sentinel because the worker thread hasn't been
1988 // scheduled to the `tid_thread.store(my_tid, Release)` site yet
1989 // — surfaced as a spurious `Thread SpinWait worker reported
1990 // non-positive tid=0` test failure in
1991 // tests/worker_thread_integration.rs.
1992 let (tid_pub_tx, tid_pub_rx) = mpsc::sync_channel::<()>(1);
1993 let stop = Arc::new(AtomicBool::new(false));
1994 let tid = Arc::new(AtomicI32::new(0));
1995 // Per-worker exit eventfd: bumped by a Drop guard inside the
1996 // closure right before the thread returns its `WorkerReport`. The
1997 // parent's `join_thread_with_timeout` blocks in `epoll_wait` on
1998 // this fd instead of sleep-polling `is_finished`. Created with
1999 // `EFD_NONBLOCK` so the Drop-time `write` cannot block; counter
2000 // mode so a missed read just accumulates without losing the edge.
2001 let exit_evt = Arc::new(
2002 vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK)
2003 .context("create thread-worker exit eventfd")?,
2004 );
2005
2006 // Clone Arcs for the closure. The thread takes ownership of the
2007 // closure-side handles; the parent retains the originals via
2008 // ThreadWorker for stop signaling and tid reading.
2009 let stop_thread = Arc::clone(&stop);
2010 let tid_thread = Arc::clone(&tid);
2011 let exit_evt_thread = Arc::clone(&exit_evt);
2012 let work_type = group.work_type.clone();
2013 let sched_policy = group.sched_policy;
2014 let mem_policy = group.mem_policy.clone();
2015 let mpol_flags = group.mpol_flags;
2016 let nice = group.nice;
2017 let comm = group.comm.clone();
2018 let uid = group.uid;
2019 let gid = group.gid;
2020 let numa_node = group.numa_node;
2021 let group_idx = group.group_idx;
2022 let num_workers = group.num_workers;
2023
2024 // The closure must be `Send` to cross the thread boundary.
2025 // `worker_pipe_fds` is `Option<(i32, i32)>` (Copy + Send), but
2026 // `worker_futex` and `iter_slot` are raw pointers and not
2027 // `Send` by default. The module-level `SendFutexPtr` and
2028 // `SendIterSlotPtr` newtypes round-trip the addresses through
2029 // `usize` so the closure's capture set is genuinely Send (no
2030 // raw-pointer field appears in the closure type).
2031 let futex_send = SendFutexPtr::new(worker_futex);
2032 let iter_slot_send = SendIterSlotPtr::new(iter_slot);
2033 let phase_epoch_send = SendPhaseEpochPtr::new(phase_epoch);
2034
2035 let join = std::thread::Builder::new()
2036 .name(format!("ktstr-worker-g{group_idx}-{}", guard.threads.len()))
2037 .spawn(move || {
2038 // Drop guard: signal the exit eventfd as the closure
2039 // unwinds, regardless of whether `worker_main` returned
2040 // normally or panicked. The parent's
2041 // `join_thread_with_timeout` blocks in `epoll_wait` on
2042 // this fd; a panic that bypassed the explicit signal
2043 // would otherwise leave the parent waiting until the
2044 // safety timerfd fires. Drop runs even under unwinding,
2045 // so this guard captures both the normal and panic
2046 // paths.
2047 struct WorkerExitSignal(std::sync::Arc<vmm_sys_util::eventfd::EventFd>);
2048 impl Drop for WorkerExitSignal {
2049 fn drop(&mut self) {
2050 let _ = self.0.write(1);
2051 }
2052 }
2053 let _exit_signal = WorkerExitSignal(exit_evt_thread);
2054
2055 // Publish gettid() so the parent can address this task
2056 // for sched_setaffinity and report it from worker_pids.
2057 // gettid() is the kernel TID; getpid() would return the
2058 // shared tgid, which collides across threads.
2059 let my_tid: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };
2060 // Release pairs with Acquire on the parent's
2061 // `tid.load()` sites so any reader observing a non-zero
2062 // tid also sees the worker's post-start state. Cheap on
2063 // every supported target (release-store on the Arc's
2064 // underlying AtomicI32 is a single instruction).
2065 tid_thread.store(my_tid, Ordering::Release);
2066 // Notify the parent that tid is published. Capacity-1
2067 // channel means this send is non-blocking; the parent's
2068 // `recv_timeout` below blocks until this fires so
2069 // post-spawn callers see populated tids. Drop the
2070 // sender after send so any future replays are no-ops.
2071 let _ = tid_pub_tx.send(());
2072 drop(tid_pub_tx);
2073
2074 // Block on start rendezvous. `Err(_)` means the parent
2075 // dropped start_tx before sending — return a sentinel
2076 // WorkerReport without doing any work.
2077 if start_rx.recv().is_err() {
2078 return WorkerReport {
2079 tid: my_tid,
2080 completed: false,
2081 group_idx,
2082 ..WorkerReport::default()
2083 };
2084 }
2085
2086 // Re-cast usize addresses back into raw pointers for
2087 // worker_main. SAFETY: the ownership and lifetime
2088 // arguments documented on `SendFutexPtr` /
2089 // `SendIterSlotPtr` ensure these pointers are still
2090 // live when worker_main dereferences them.
2091 let futex = futex_send.into_raw();
2092 let slot = iter_slot_send.into_raw();
2093 let epoch = phase_epoch_send.into_raw();
2094
2095 worker_main(
2096 affinity,
2097 work_type,
2098 sched_policy,
2099 mem_policy,
2100 mpol_flags,
2101 nice,
2102 comm.as_deref(),
2103 uid,
2104 gid,
2105 numa_node,
2106 worker_pipe_fds,
2107 futex,
2108 slot,
2109 epoch,
2110 &stop_thread,
2111 group_idx,
2112 )
2113 })
2114 .with_context(|| {
2115 format!(
2116 "thread::spawn for worker {}/{} (group {}) failed",
2117 guard.threads.len() + 1,
2118 num_workers,
2119 group_idx,
2120 )
2121 })?;
2122
2123 // Block until the worker reaches the gettid() publish point.
2124 // 2 s deadline is generous: a fresh std::thread on a healthy
2125 // host reaches the closure body within microseconds; under
2126 // VM/CI pressure ~ms. Anything past 2 s indicates the worker
2127 // thread failed to schedule at all — surface as a typed error
2128 // rather than letting a downstream `worker_pids()` reader race-
2129 // read the initial 0 sentinel and tip over a tid-check assertion.
2130 match tid_pub_rx.recv_timeout(std::time::Duration::from_secs(2)) {
2131 Ok(()) => {}
2132 Err(mpsc::RecvTimeoutError::Timeout) => {
2133 anyhow::bail!(
2134 "spawn_thread_worker: worker {} (group {}) did not publish gettid() \
2135 within 2 s of thread::spawn — the worker thread failed to schedule. \
2136 Likely cause: host fd / thread-stack exhaustion, or the std runtime \
2137 thread pool is wedged. tid stays at 0; subsequent worker_pids() \
2138 reads would surface the sentinel.",
2139 guard.threads.len() + 1,
2140 group_idx,
2141 );
2142 }
2143 Err(mpsc::RecvTimeoutError::Disconnected) => {
2144 anyhow::bail!(
2145 "spawn_thread_worker: worker {} (group {}) closed tid-publish channel \
2146 without sending — the closure panicked before reaching the gettid() \
2147 publish point. Check the thread name's panic output for the cause.",
2148 guard.threads.len() + 1,
2149 group_idx,
2150 );
2151 }
2152 }
2153
2154 guard.threads.push(ThreadWorker {
2155 tid,
2156 stop,
2157 start_tx: Some(start_tx),
2158 join: Some(join),
2159 exit_evt,
2160 });
2161 Ok(())
2162}
2163
/// Per-group resource indices into the [`SpawnGuard`]'s flat
/// vectors used by [`spawn_pcomm_container`] to compute per-thread
/// pipe / chain / futex / iter-slot addresses inside the forked
/// container. Built by [`WorkloadHandle::spawn_pcomm_cgroup`] in
/// lockstep with the per-group resource allocation pass so the
/// container's threads can address their own group's resources from
/// the inherited shared regions.
///
/// Every `*_base` / `*_offset` field is an index into the matching
/// guard vector. It is combined with the worker's within-group index
/// `i` (`0..num_workers`) by the container's per-thread spawn loop.
/// The base fields are only consulted when the corresponding `needs_*`
/// / `chain_depth` selector enables that resource.
#[derive(Clone, Copy, Debug)]
pub(super) struct PcommGroupResources {
    /// Offset into the iter_counters MAP_SHARED region for this
    /// group's first worker. Subsequent workers occupy
    /// `iter_offset + i` for `i in 0..num_workers`.
    pub iter_offset: usize,
    /// Index of this group's first pipe pair in `guard.pipe_pairs`
    /// (only meaningful when `needs_pipes` is true). Workers `2k` and
    /// `2k + 1` share pair `pipe_pair_base + k`, at opposite ends.
    pub pipe_pair_base: usize,
    /// Index of this group's first chain pipe in `guard.chain_pipes`
    /// (only meaningful when `chain_depth` is `Some(_)`). Worker `i`
    /// belongs to chain `chain_pipes_base + i / depth`, at stage
    /// `i % depth`.
    pub chain_pipes_base: usize,
    /// Index of this group's first futex region in `guard.futex_ptrs`
    /// (only meaningful when `needs_futex` is true). Worker `i` uses
    /// region `futex_ptrs_base + i / futex_group_size`.
    pub futex_ptrs_base: usize,
    /// True when the group's `WorkType` requires inter-worker pipe
    /// pairs (PipeIo / CachePipe). Takes precedence over `chain_depth`
    /// when choosing a worker's pipe fds.
    pub needs_pipes: bool,
    /// `Some(depth)` when the group's `WorkType` requires a chain
    /// pipe ring (WakeChain { wake: Pipe }); `None` otherwise.
    /// `Some(0)` is treated the same as `None` (no chain fds).
    pub chain_depth: Option<usize>,
    /// True when the group's `WorkType` requires a MAP_SHARED futex
    /// region (FutexPingPong / FutexFanOut / FanOutCompute /
    /// MutexContention / etc.).
    pub needs_futex: bool,
    /// Worker-group-size for per-worker `pos` computation inside
    /// the futex region (e.g. 2 for FutexPingPong, the messenger /
    /// receiver split for FutexFanOut). Defaulted to 2 when the
    /// `WorkType` does not declare a group_size. It is used as a
    /// divisor (`i / futex_group_size`, `i % futex_group_size`)
    /// whenever `needs_futex` is true, so it must be non-zero in that
    /// case.
    pub futex_group_size: usize,
}
2202
2203/// Fork one thread-group leader hosting every worker thread for the
2204/// supplied groups: fork the leader, set its `comm` to `pcomm` via
2205/// `prctl(PR_SET_NAME)`, then spawn `groups[k].num_workers` worker
2206/// threads per group inside it. The [`WorkSpec::pcomm`] builder
2207/// rejects > 15 bytes (TASK_COMM_LEN-1), so the framework never
2208/// feeds the kernel a name `__set_task_comm` would truncate. Every
2209/// worker thread shares the leader's tgid, so
2210/// `task->group_leader->comm == pcomm` byte-for-byte. Models real
2211/// workloads like `chrome` (pcomm) hosting `ThreadPoolForeg` and
2212/// `GPU Process` (per-thread comm via [`WorkSpec::comm`]) or `java`
2213/// (pcomm) hosting `GC Thread` and `C2 CompilerThre`.
2214///
2215/// `groups` carries per-group [`GroupParams`] and `resources` carries
2216/// the parallel [`PcommGroupResources`] pre-built by
2217/// [`WorkloadHandle::spawn_pcomm_cgroup`] from the surrounding
2218/// per-group resource-allocation pass; the two slices have identical
2219/// length and `resources[k]` describes the indices owned by
2220/// `groups[k]`.
2221///
2222/// `container_uid` / `container_gid` are applied via `setresuid` /
2223/// `setresgid` once on the leader before spawning threads — the
2224/// leader takes the configured process credentials, and each worker
2225/// thread additionally re-applies its own merged credentials inside
2226/// `worker_main` at thread creation time (idempotent when the merge
2227/// produced the same value, which is the common case for a
2228/// CgroupDef-level default flowing through `merged_works`).
2229///
2230/// # Wire format
2231///
2232/// The parent collects via the same report pipe used by conventional
2233/// fork workers:
2234///
/// 1. After the start handshake, and only once every worker thread
/// has been spawned, the leader writes one `b'r'` ready byte to the
/// report pipe. This is a stronger guarantee than the conventional
/// fork worker's pre-loop byte.
2239/// 2. After all worker threads have joined, the leader serializes
2240/// `Vec<WorkerReport>` (one entry per worker thread, in
2241/// `(group_idx, within-group order)` traversal order) via
2242/// `serde_json::to_vec` and writes the bytes to the report pipe
2243/// before `_exit(0)`. The parent decodes via
2244/// `serde_json::from_slice::<Vec<WorkerReport>>`. The report pipe
2245/// is sized at 8 MiB; parent `read_to_end` drains concurrently
2246/// with the leader's `write_all` so payloads exceeding the pipe
2247/// size still drain correctly.
2248///
2249/// On a decode-failure or short payload, the parent emits one
2250/// sentinel report per expected worker so per-group filtering and
2251/// `assert_not_starved` see the correct cardinality.
2252///
2253/// # Lifecycle
2254///
2255/// - `PR_SET_PDEATHSIG(SIGKILL)`: when the parent dies, the kernel
2256/// sends SIGKILL to the LEADER thread (the calling task at fork
2257/// time). SIGKILL on any thread is fatal-to-tgid: the kernel
2258/// routes it through `complete_signal` → `do_group_exit` →
2259/// `zap_other_threads`, which sets `SIGNAL_GROUP_EXIT` on every
2260/// sibling thread and walks the thread list signalling each. So
2261/// the cascade to worker threads happens via
2262/// `zap_other_threads`, NOT via per-task PDEATHSIG inheritance
2263/// (PDEATHSIG itself is per-task and is cleared on clone). Net:
2264/// parent death → leader's SIGKILL → all worker threads die,
2265/// so the container cannot outlive the harness.
2266/// - `setpgid(0, 0)`: the leader becomes its own process group
2267/// leader so the parent's stop/kill `killpg` reaches any
2268/// descendants the workers spawn.
2269/// - SIGUSR1 handler installed pre-thread-spawn: any
2270/// `kill(leader_pid, SIGUSR1)` from the parent flips the leader's
2271/// STOP, which every worker thread observes via
2272/// `stop_requested(&STOP)` on its next loop check.
2273/// - `prctl(PR_SET_NAME, pcomm)`: sets the leader's `task->comm`,
2274/// which becomes `task->group_leader->comm` for every spawned
2275/// thread byte-for-byte (the [`WorkSpec::pcomm`] builder rejects
2276/// names longer than 15 bytes — `TASK_COMM_LEN - 1` — so the
2277/// framework never feeds the kernel a name `__set_task_comm`
2278/// would truncate).
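/// - `setresgid(container_gid, ...)` then `setresuid(container_uid,
/// ...)` (each only if specified) apply once before the start-byte
/// poll. The gid is changed first because dropping the uid first
/// would forfeit CAP_SETGID and make the gid change fail with EPERM.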
2281/// - Threads spawn AFTER the start byte arrives, so `start()`'s
2282/// serial start-byte writes still gate work entry (the parent
2283/// moves the leader to its cgroup before sending start).
2284#[allow(clippy::too_many_arguments)]
2285pub(super) fn spawn_pcomm_container(
2286 guard: &mut SpawnGuard,
2287 pcomm: &str,
2288 container_uid: Option<u32>,
2289 container_gid: Option<u32>,
2290 groups: &[GroupParams],
2291 resources: &[PcommGroupResources],
2292) -> Result<()> {
2293 debug_assert_eq!(
2294 groups.len(),
2295 resources.len(),
2296 "spawn_pcomm_container: groups / resources must have the same length",
2297 );
2298
2299 // Allocate the container's report and start pipes. Parent holds
2300 // report_fds[0] (read end) and start_fds[1] (write end); the
2301 // container holds the inverse ends post-fork. O_CLOEXEC matches
2302 // the defense-in-depth posture used by every other pipe in the
2303 // spawn pipeline — defends against future exec paths that could
2304 // otherwise leak the fd into a helper.
2305 let mut report_fds = [0i32; 2];
2306 if unsafe { libc::pipe2(report_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
2307 anyhow::bail!(
2308 "pcomm container (pcomm={pcomm:?}): report pipe2 failed: {}",
2309 std::io::Error::last_os_error(),
2310 );
2311 }
2312 // Grow the report pipe to 8 MiB so the container's `write_all` of
2313 // the JSON-encoded `Vec<WorkerReport>` issues without blocking
2314 // for moderate worker counts. For larger payloads the parent's
2315 // `read_to_end` drains concurrently with the container's
2316 // `write_all`, so a pipe overflow degrades to extra wakeups
2317 // rather than truncation. Best-effort failure (older kernel /
2318 // EPERM) leaves the default size in place — large payloads
2319 // still drain via the concurrent flow but with more wake cycles.
2320 const REPORT_PIPE_SIZE: libc::c_int = 8 * 1024 * 1024;
2321 let prev_size = unsafe { libc::fcntl(report_fds[1], libc::F_SETPIPE_SZ, REPORT_PIPE_SIZE) };
2322 if prev_size < 0 {
2323 let err = std::io::Error::last_os_error();
2324 tracing::warn!(
2325 pcomm,
2326 requested_size = REPORT_PIPE_SIZE,
2327 %err,
2328 "F_SETPIPE_SZ on pcomm container report pipe failed; falling back \
2329 to default pipe capacity",
2330 );
2331 }
2332 let mut start_fds = [0i32; 2];
2333 if unsafe { libc::pipe2(start_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
2334 unsafe {
2335 libc::close(report_fds[0]);
2336 libc::close(report_fds[1]);
2337 }
2338 anyhow::bail!(
2339 "pcomm container (pcomm={pcomm:?}): start pipe2 failed: {}",
2340 std::io::Error::last_os_error(),
2341 );
2342 }
2343
2344 // Block SIGUSR1 across fork so the container inherits a blocked
2345 // mask; the post-fork install + restore pattern matches the
2346 // conventional fork worker (see `spawn_group`'s fork dispatch
2347 // for the full rationale).
2348 let mut old_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
2349 let mut block_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
2350 let psm_block_rc = unsafe {
2351 libc::sigemptyset(&mut block_mask);
2352 libc::sigaddset(&mut block_mask, libc::SIGUSR1);
2353 libc::pthread_sigmask(libc::SIG_BLOCK, &block_mask, &mut old_mask)
2354 };
2355 if psm_block_rc != 0 {
2356 tracing::warn!(
2357 rc = psm_block_rc,
2358 pcomm,
2359 "pthread_sigmask(SIG_BLOCK, SIGUSR1) failed pre-fork for pcomm container; \
2360 container inherits unblocked SIGUSR1 and may terminate on default \
2361 action before installing handler",
2362 );
2363 }
2364
2365 // Sum of all groups' workers — matches `WorkerReport` cardinality
2366 // the parent expects.
2367 let total_workers: usize = groups.iter().map(|g| g.num_workers).sum();
2368
2369 let pid = unsafe { libc::fork() };
2370 match pid {
2371 -1 => {
2372 let psm_restore_rc = unsafe {
2373 libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
2374 };
2375 if psm_restore_rc != 0 {
2376 tracing::warn!(
2377 rc = psm_restore_rc,
2378 "pthread_sigmask(SIG_SETMASK) failed restoring mask after pcomm \
2379 container fork failure; SIGUSR1 may remain blocked in this thread",
2380 );
2381 }
2382 unsafe {
2383 libc::close(report_fds[0]);
2384 libc::close(report_fds[1]);
2385 libc::close(start_fds[0]);
2386 libc::close(start_fds[1]);
2387 }
2388 anyhow::bail!(
2389 "pcomm container (pcomm={pcomm:?}): fork failed: {}",
2390 std::io::Error::last_os_error(),
2391 );
2392 }
2393 0 => {
2394 // Container child. Mirror the conventional fork worker's
2395 // post-fork init: PDEATHSIG, getppid orphan check,
2396 // setpgid, SIGUSR1 handler install + mask restore. This
2397 // is the exact same defensive sequence; the only
2398 // structural difference is what happens AFTER the
2399 // start-byte handshake (thread spawn instead of inline
2400 // worker_main).
2401 unsafe {
2402 libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL);
2403 }
2404 if std::env::var_os(crate::KTSTR_GUEST_INIT_ENV).is_none()
2405 && unsafe { libc::getppid() } == 1
2406 {
2407 exit_child_success();
2408 }
2409 unsafe {
2410 libc::setpgid(0, 0);
2411 }
2412 STOP.store(false, Ordering::Relaxed);
2413 let sig_prev = unsafe {
2414 libc::signal(
2415 libc::SIGUSR1,
2416 sigusr1_handler as *const () as libc::sighandler_t,
2417 )
2418 };
2419 if sig_prev == libc::SIG_ERR {
2420 let errno = std::io::Error::last_os_error();
2421 eprintln!(
2422 "ktstr: signal(SIGUSR1) install failed in pcomm container: {errno}; \
2423 graceful stop unavailable, killpg escalation will reap"
2424 );
2425 }
2426 let psm_unblock_rc = unsafe {
2427 libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
2428 };
2429 if psm_unblock_rc != 0 {
2430 eprintln!(
2431 "ktstr: pthread_sigmask(SIG_SETMASK) unblock failed in pcomm \
2432 container: rc={psm_unblock_rc}; SIGUSR1 stays blocked, killpg \
2433 escalation will reap"
2434 );
2435 }
2436 // Close the parent's ends. Keep report_fds[1] (the
2437 // container's write end) and start_fds[0] (the
2438 // container's read end).
2439 unsafe {
2440 libc::close(report_fds[0]);
2441 libc::close(start_fds[1]);
2442 }
2443
2444 // Inherited-fd sweep. The container forked from the
2445 // harness inherits every fd in the parent's table —
2446 // including pipe fds from OTHER live `WorkloadHandle`s
2447 // and any harness-only fds (tracing subscribers etc.).
2448 // Close everything not in the keep-set:
2449 // - 0 / 1 / 2 (stdio).
2450 // - `report_fds[1]` (this container's write end).
2451 // - `start_fds[0]` (this container's read end).
2452 // - This spawn's `pipe_pairs` (per-pair inter-worker
2453 // fds) and `chain_pipes` (per-chain ring fds) —
2454 // worker threads will use them once they start.
2455 //
2456 // MAP_SHARED regions (futex / iter_counters) are mmap
2457 // memory inherited via CLONE_VM, not fd entries; nothing
2458 // to keep on that axis. The sweep walks
2459 // `/proc/self/fd` once and uses raw `libc::close` on
2460 // each non-kept fd; failure to close (EBADF) is benign
2461 // — the entry is gone by the time we get to it. Heap
2462 // allocation here is acceptable post-fork (the
2463 // conventional fork worker also heap-allocates in
2464 // `worker_main`).
2465 let mut keep_fds: std::collections::BTreeSet<i32> = std::collections::BTreeSet::new();
2466 keep_fds.insert(0);
2467 keep_fds.insert(1);
2468 keep_fds.insert(2);
2469 keep_fds.insert(report_fds[1]);
2470 keep_fds.insert(start_fds[0]);
2471 for (ab, ba) in &guard.pipe_pairs {
2472 keep_fds.insert(ab[0]);
2473 keep_fds.insert(ab[1]);
2474 keep_fds.insert(ba[0]);
2475 keep_fds.insert(ba[1]);
2476 }
2477 for chain in &guard.chain_pipes {
2478 for pipe in chain {
2479 keep_fds.insert(pipe[0]);
2480 keep_fds.insert(pipe[1]);
2481 }
2482 }
2483 if let Ok(entries) = std::fs::read_dir("/proc/self/fd") {
2484 let mut to_close: Vec<i32> = Vec::new();
2485 for entry in entries.flatten() {
2486 if let Some(name) = entry.file_name().to_str()
2487 && let Ok(fd) = name.parse::<i32>()
2488 && !keep_fds.contains(&fd)
2489 {
2490 to_close.push(fd);
2491 }
2492 }
2493 for fd in to_close {
2494 unsafe {
2495 libc::close(fd);
2496 }
2497 }
2498 } else {
2499 eprintln!(
2500 "ktstr: pcomm container (pcomm={pcomm:?}): /proc/self/fd \
2501 read_dir failed; inherited-fd sweep skipped",
2502 );
2503 }
2504
2505 // prctl(PR_SET_NAME, pcomm). Setting it on the container's
2506 // tgid leader makes `task->group_leader->comm == pcomm`
2507 // for every spawned thread (kernel/sys.c::prctl_set_name).
2508 // The WorkSpec::pcomm builder rejects empty strings,
2509 // interior NULs, and names > 15 bytes (TASK_COMM_LEN - 1)
2510 // at declaration time, so neither CString construction
2511 // nor `__set_task_comm`'s `min(strlen, TASK_COMM_LEN-1)`
2512 // truncation can fire for any value that arrives here.
2513 // `unwrap_or_default` is a defensive fall-through to an
2514 // empty CString — the kernel accepts it; the leader's
2515 // comm is set to the empty string and the failure is
2516 // surfaced to stderr.
2517 let c_pcomm = std::ffi::CString::new(pcomm).unwrap_or_default();
2518 let prctl_rc = unsafe { libc::prctl(libc::PR_SET_NAME, c_pcomm.as_ptr()) };
2519 if prctl_rc != 0 {
2520 let errno = std::io::Error::last_os_error();
2521 eprintln!(
2522 "ktstr: prctl(PR_SET_NAME, {pcomm:?}) failed on pcomm container: {errno}",
2523 );
2524 }
2525
2526 // Apply container-level credentials. Order matters:
2527 // setresgid first (while still uid=0 we can change the
2528 // primary group), setresuid second (after the uid drop
2529 // the process loses CAP_SETGID and a later gid change
2530 // would EPERM). Failures are surfaced to stderr but do
2531 // not halt the container — the threads inside still
2532 // run with the inherited credentials, and worker_main's
2533 // own setresuid / setresgid calls (which run after this)
2534 // act as a second-line defense for any per-thread merge
2535 // that produced a different value than the container
2536 // default.
2537 if let Some(gid) = container_gid {
2538 let rc = unsafe { libc::setresgid(gid, gid, gid) };
2539 if rc != 0 {
2540 let errno = std::io::Error::last_os_error();
2541 eprintln!("ktstr: setresgid({gid}) failed on pcomm container: {errno}");
2542 }
2543 }
2544 if let Some(uid) = container_uid {
2545 let rc = unsafe { libc::setresuid(uid, uid, uid) };
2546 if rc != 0 {
2547 let errno = std::io::Error::last_os_error();
2548 eprintln!("ktstr: setresuid({uid}) failed on pcomm container: {errno}");
2549 }
2550 }
2551
2552 // Wait for the parent's start byte (poll with 30s
2553 // timeout — same budget as the conventional fork
2554 // worker). Match the conventional worker's behaviour:
2555 // on `poll <= 0` call exit_child_error so the parent's
2556 // collect path observes a missing report and emits
2557 // sentinels.
2558 let mut pfd = libc::pollfd {
2559 fd: start_fds[0],
2560 events: libc::POLLIN,
2561 revents: 0,
2562 };
2563 let ret = unsafe { libc::poll(&mut pfd, 1, 30_000) };
2564 if ret <= 0 {
2565 exit_child_error();
2566 }
2567 let mut buf = [0u8; 1];
2568 {
2569 let mut f = unsafe { std::fs::File::from_raw_fd(start_fds[0]) };
2570 let _ = f.read_exact(&mut buf);
2571 drop(f);
2572 }
2573
2574 // Unlike fork workers, the pcomm container spawns threads after
2575 // the start byte. A latched STOP from a SIGUSR1 race during mask
2576 // restore would produce N zero-work threads. Reset so threads
2577 // get a fair start.
2578 STOP.store(false, Ordering::Relaxed);
2579
2580 // Spawn worker threads for every group. Each thread
2581 // computes its own affinity / pipe-fd / futex /
2582 // iter_slot from its (group_idx, within-group index)
2583 // pair, matching the conventional fork loop's per-worker
2584 // computation but inside the container's address space
2585 // (so threads share the parent-allocated MAP_SHARED
2586 // regions and inter-worker pipes inherited via fork).
2587 // Each spawned thread gets its own `EventFd` cloned into
2588 // the closure. A `WorkerExitSignal` Drop guard inside the
2589 // closure writes 1 to that fd as the closure unwinds —
2590 // covering both the normal-return and panic paths. The
2591 // container's join phase below `epoll_wait`s on every
2592 // eventfd, so a thread's exit edge is delivered without
2593 // any polling sleep. This mirrors `join_thread_with_timeout`'s
2594 // pattern but without a timer fd: the parent's
2595 // `stop_and_collect` SIGKILL is the only timeout authority,
2596 // per the user's "event-driven only, no sleeps or
2597 // timeouts" rule.
2598 let mut joins: Vec<(
2599 std::thread::JoinHandle<WorkerReport>,
2600 std::sync::Arc<vmm_sys_util::eventfd::EventFd>,
2601 )> = Vec::with_capacity(total_workers);
2602 // Snapshot the guard's resource bookkeeping we need
2603 // inside each thread closure. The guard is borrowed
2604 // mutably by the surrounding `spawn_pcomm_cgroup` and
2605 // dropped on the parent side after the function returns;
2606 // after fork the container has its own copy of every
2607 // field, and these snapshots are clones owned by the
2608 // container's process so the closures can capture them
2609 // without aliasing the guard.
2610 let pipe_pairs_snapshot: Vec<([i32; 2], [i32; 2])> = guard.pipe_pairs.clone();
2611 let chain_pipes_snapshot: Vec<Vec<[i32; 2]>> = guard.chain_pipes.clone();
2612 let futex_ptrs_snapshot: Vec<*mut u32> = guard.futex_ptrs.clone();
2613 let iter_counters_base = guard.iter_counters;
2614 let phase_epoch_base = guard.phase_epoch;
2615
2616 for (group, res) in groups.iter().zip(resources.iter()) {
2617 let num_workers = group.num_workers;
2618 for i in 0..num_workers {
2619 // Per-thread affinity resolution. Random-subset
2620 // affinity samples a fresh subset per thread, so
2621 // each call produces an independent BTreeSet.
2622 // Fixed affinity returns the same BTreeSet for
2623 // every thread.
2624 //
2625 // resolve_affinity bails on caller bugs (count==0,
2626 // empty pool). In the pcomm container's forked-
2627 // child context we cannot propagate Err up; the
2628 // convention is eprintln + [`exit_child_error`]
2629 // (the named helper used by every forked-child
2630 // failure site in this file — start-byte
2631 // poll-timeout bail, EventFd::new bail,
2632 // thread::Builder::spawn bail) so the parent
2633 // collects a missing-report rather than silently
2634 // spawning workers without the affinity the test
2635 // author requested. The prior degrade-to-None
2636 // behaviour was a silent-drop bug we close here.
2637 let affinity = match resolve_affinity(&group.affinity) {
2638 Ok(a) => a,
2639 Err(e) => {
2640 eprintln!(
2641 "ktstr: pcomm container (group {}): resolve_affinity \
2642 for thread {i}/{num_workers} failed: {e:#}. \
2643 Aborting container — refusing to spawn workers \
2644 without the requested affinity (no silent drops).",
2645 group.group_idx,
2646 );
2647 // exit_group(2) tears down every sibling
2648 // worker thread spawned in earlier loop
2649 // iterations along with this one — the
2650 // intended fail-stop behaviour (parent
2651 // observes a missing report and surfaces
2652 // a sentinel).
2653 exit_child_error();
2654 }
2655 };
2656
2657 let worker_pipe_fds: Option<(i32, i32)> = if res.needs_pipes {
2658 let pair_idx = res.pipe_pair_base + i / 2;
2659 let (ab, ba) = &pipe_pairs_snapshot[pair_idx];
2660 if i % 2 == 0 {
2661 Some((ba[0], ab[1]))
2662 } else {
2663 Some((ab[0], ba[1]))
2664 }
2665 } else if let Some(depth) = res.chain_depth
2666 && depth > 0
2667 {
2668 let chain_idx = res.chain_pipes_base + i / depth;
2669 let stage = i % depth;
2670 let prev_stage = (stage + depth - 1) % depth;
2671 let chain = &chain_pipes_snapshot[chain_idx];
2672 Some((chain[prev_stage][0], chain[stage][1]))
2673 } else {
2674 None
2675 };
2676
2677 let worker_futex: Option<(*mut u32, usize)> = if res.needs_futex {
2678 let futex_group_idx = res.futex_ptrs_base + i / res.futex_group_size;
2679 let pos = i % res.futex_group_size;
2680 Some((futex_ptrs_snapshot[futex_group_idx], pos))
2681 } else {
2682 None
2683 };
2684
2685 let iter_slot: *mut AtomicU64 = if !iter_counters_base.is_null() {
2686 unsafe { iter_counters_base.add(res.iter_offset + i) }
2687 } else {
2688 std::ptr::null_mut()
2689 };
2690
2691 // Round raw pointers through Send-newtypes so
2692 // the closure's auto-Send check passes (same
2693 // wrapper pattern `spawn_thread_worker` uses).
2694 let futex_send = SendFutexPtr::new(worker_futex);
2695 let iter_slot_send = SendIterSlotPtr::new(iter_slot);
2696 let phase_epoch_send = SendPhaseEpochPtr::new(phase_epoch_base);
2697
2698 let work_type = group.work_type.clone();
2699 let sched_policy = group.sched_policy;
2700 let mem_policy = group.mem_policy.clone();
2701 let mpol_flags = group.mpol_flags;
2702 let nice = group.nice;
2703 let comm = group.comm.clone();
2704 let uid = group.uid;
2705 let gid = group.gid;
2706 let numa_node = group.numa_node;
2707 let group_idx = group.group_idx;
2708
2709 // Per-thread exit eventfd. Cloned into the
2710 // closure as `Arc<EventFd>`; the closure-local
2711 // `WorkerExitSignal` Drop guard writes 1 to the
2712 // fd on every exit path (normal return AND
2713 // panic). The container's join phase below
2714 // `epoll_wait`s on every eventfd to deliver the
2715 // exit edge without polling. EFD_NONBLOCK keeps
2716 // the Drop-time `write` from blocking — counter
2717 // mode means the writer just bumps the counter
2718 // without losing the edge.
2719 let exit_evt = match vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK) {
2720 Ok(efd) => std::sync::Arc::new(efd),
2721 Err(e) => {
2722 // Hard fail: continuing with fewer
2723 // threads silently breaks the
2724 // workload's worker count and the
2725 // parent's report count. exit_child_error
2726 // so the parent's sentinel path observes
2727 // a missing payload and emits one
2728 // sentinel per expected report. Reuses
2729 // the same fail-stop contract as the
2730 // start-byte poll timeout (`ret <=
2731 // 0`).
2732 eprintln!(
2733 "ktstr: pcomm container (group {}): EventFd::new for \
2734 worker {}/{num_workers} failed: {e}; aborting container",
2735 group.group_idx,
2736 i + 1,
2737 );
2738 exit_child_error();
2739 }
2740 };
2741 let exit_evt_thread = std::sync::Arc::clone(&exit_evt);
2742
2743 let join = std::thread::Builder::new()
2744 .name(format!("ktstr-pcomm-g{group_idx}-{i}"))
2745 .spawn(move || {
2746 // Drop guard: bumps the eventfd as the
2747 // closure unwinds. The write is
2748 // non-blocking (EFD_NONBLOCK) and the
2749 // counter accumulates, so a missed
2750 // read just queues the next read's
2751 // value — no edge loss. Drop runs
2752 // under both normal return and
2753 // unwinding (panic), so the container's
2754 // join phase observes EVERY thread's
2755 // exit, including panicked ones.
2756 struct WorkerExitSignal(std::sync::Arc<vmm_sys_util::eventfd::EventFd>);
2757 impl Drop for WorkerExitSignal {
2758 fn drop(&mut self) {
2759 let _ = self.0.write(1);
2760 }
2761 }
2762 let _exit_signal = WorkerExitSignal(exit_evt_thread);
2763
2764 let futex = futex_send.into_raw();
2765 let slot = iter_slot_send.into_raw();
2766 let epoch = phase_epoch_send.into_raw();
2767 worker_main(
2768 affinity,
2769 work_type,
2770 sched_policy,
2771 mem_policy,
2772 mpol_flags,
2773 nice,
2774 comm.as_deref(),
2775 uid,
2776 gid,
2777 numa_node,
2778 worker_pipe_fds,
2779 futex,
2780 slot,
2781 epoch,
2782 &STOP,
2783 group_idx,
2784 )
2785 });
2786 match join {
2787 Ok(j) => joins.push((j, exit_evt)),
2788 Err(e) => {
2789 // Hard fail: see EventFd path above.
2790 // A partially-spawned container leaves
2791 // the parent guessing about which
2792 // reports are real vs missing;
2793 // exit_child_error collapses to the
2794 // parent's sentinel path with
2795 // deterministic cardinality.
2796 eprintln!(
2797 "ktstr: pcomm container (group {}): thread::spawn for \
2798 worker {}/{num_workers} failed: {e}; aborting container",
2799 group.group_idx,
2800 i + 1,
2801 );
2802 exit_child_error();
2803 }
2804 }
2805 }
2806 }
2807
2808 // Publish the ready byte AFTER every worker thread is
2809 // alive. The parent's barrier polls every report fd for
2810 // POLLIN with a bounded deadline; the byte's correct
2811 // semantic is "the container has spawned every worker
2812 // thread and they are now running" — earlier (pre-spawn)
2813 // is observable to the parent before threads exist, which
2814 // races with `set_affinity(idx)` / `worker_pids()` calls
2815 // the harness may issue between the parent's barrier wake
2816 // and the work loop. Each thread starts work immediately
2817 // upon spawn (no per-thread handshake gate) since the
2818 // CONTAINER's start-byte poll above is the workload's
2819 // single start gate; once the container observes the
2820 // start byte, every thread it spawns is by construction
2821 // intended to run. Done as a raw `libc::write` to avoid
2822 // taking ownership of `report_fds[1]` (we still need it
2823 // for the JSON write below).
2824 let ready_byte: u8 = b'r';
2825 unsafe {
2826 libc::write(
2827 report_fds[1],
2828 &ready_byte as *const u8 as *const libc::c_void,
2829 1,
2830 );
2831 }
2832
2833 // Join every thread via `epoll_wait` on per-thread exit
2834 // eventfds. Each thread's `WorkerExitSignal` Drop guard
2835 // writes 1 to its fd as the closure unwinds (normal
2836 // return AND panic), so the container's `epoll_wait`
2837 // observes EVERY thread's exit edge without polling.
2838 // No timeout: per the user's "event-driven only, no
2839 // sleeps or timeouts" rule, the container blocks in
2840 // `epoll_wait(-1)` until each fd fires. The parent's
2841 // `stop_and_collect` 5s collect deadline + SIGKILL is
2842 // the only timeout authority; if a thread hangs (no
2843 // Drop guard fires — no realistic path to that under
2844 // the closures we spawn), the parent SIGKILLs the
2845 // container, the kernel tears down every thread, and
2846 // the container exits via the signal.
2847 //
2848 // Setup-failure fallback: if `Epoll::new` or any
2849 // `epoll.ctl(Add)` fails, fall back to blocking
2850 // `JoinHandle::join` per thread — that itself is
2851 // event-driven from the kernel's perspective (the
2852 // syscall blocks on the thread's exit signal, no
2853 // userspace polling). The fallback path is rare
2854 // (epoll_create1 failure means out of fds or out of
2855 // memory, both of which are pre-existing failure modes
2856 // the conventional fork worker also has).
2857 use std::os::unix::io::AsRawFd;
2858 use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
2859 let mut indexed_joins: Vec<(
2860 usize,
2861 std::thread::JoinHandle<WorkerReport>,
2862 std::sync::Arc<vmm_sys_util::eventfd::EventFd>,
2863 )> = joins
2864 .into_iter()
2865 .enumerate()
2866 .map(|(i, (j, evt))| (i, j, evt))
2867 .collect();
2868 let mut reports: Vec<WorkerReport> = Vec::with_capacity(indexed_joins.len());
2869
2870 let epoll_setup: Option<Epoll> = match Epoll::new() {
2871 Ok(ep) => {
2872 let mut ok = true;
2873 for (idx, _, evt) in &indexed_joins {
2874 if let Err(e) = ep.ctl(
2875 ControlOperation::Add,
2876 evt.as_raw_fd(),
2877 EpollEvent::new(EventSet::IN, *idx as u64),
2878 ) {
2879 eprintln!(
2880 "ktstr: pcomm container (pcomm={pcomm:?}): epoll.ctl(Add) \
2881 for thread {idx} failed: {e}; falling back to blocking \
2882 join per thread",
2883 );
2884 ok = false;
2885 break;
2886 }
2887 }
2888 if ok { Some(ep) } else { None }
2889 }
2890 Err(e) => {
2891 eprintln!(
2892 "ktstr: pcomm container (pcomm={pcomm:?}): Epoll::new failed: {e}; \
2893 falling back to blocking join per thread",
2894 );
2895 None
2896 }
2897 };
2898
2899 if let Some(epoll) = epoll_setup {
2900 // Event-driven join loop. `epoll_wait(-1, …)` blocks
2901 // until at least one eventfd fires, which only
2902 // happens when a thread's `WorkerExitSignal` Drop
2903 // guard runs. The wake delivers `events[k].data()`
2904 // as the index of the finished thread; we
2905 // deregister, locate the entry, and `join()` it
2906 // (the `join()` call may briefly wait for the OS
2907 // thread to fully exit after the Drop guard, but
2908 // that wait is itself event-driven inside the
2909 // kernel — no userspace polling).
2910 let mut events_buf: Vec<EpollEvent> =
2911 vec![EpollEvent::default(); indexed_joins.len().max(1)];
2912 while !indexed_joins.is_empty() {
2913 let n = match epoll.wait(-1, &mut events_buf) {
2914 Ok(n) => n,
2915 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
2916 Err(e) => {
2917 eprintln!(
2918 "ktstr: pcomm container (pcomm={pcomm:?}): epoll.wait \
2919 failed: {e}; falling back to blocking join for the \
2920 remaining {} thread(s)",
2921 indexed_joins.len(),
2922 );
2923 break;
2924 }
2925 };
2926 for ev in &events_buf[..n] {
2927 let target_idx = ev.data() as usize;
2928 let pos = match indexed_joins
2929 .iter()
2930 .position(|(idx, _, _)| *idx == target_idx)
2931 {
2932 Some(p) => p,
2933 None => continue,
2934 };
2935 let (idx, j, evt) = indexed_joins.swap_remove(pos);
2936 if let Err(e) = epoll.ctl(
2937 ControlOperation::Delete,
2938 evt.as_raw_fd(),
2939 EpollEvent::default(),
2940 ) {
2941 eprintln!(
2942 "ktstr: pcomm container (pcomm={pcomm:?}): epoll.ctl(Del) \
2943 for thread {idx} failed: {e}",
2944 );
2945 }
2946 let _ = evt.read();
2947 match j.join() {
2948 Ok(r) => reports.push(r),
2949 Err(payload) => {
2950 let msg = extract_panic_payload(payload);
2951 eprintln!(
2952 "ktstr: pcomm container (pcomm={pcomm:?}): thread {idx} \
2953 panicked: {msg}",
2954 );
2955 reports.push(WorkerReport {
2956 completed: false,
2957 exit_info: Some(WorkerExitInfo::Panicked(msg)),
2958 ..WorkerReport::default()
2959 });
2960 }
2961 }
2962 }
2963 }
2964 }
2965 // Either the epoll path completed (drained
2966 // `indexed_joins`) or it bailed mid-flight; either way,
2967 // any remaining handles are joined by the kernel-blocking
2968 // `JoinHandle::join` below. `join` is event-driven inside
2969 // the kernel (futex on the thread's TID); no userspace
2970 // polling.
2971 for (idx, j, _evt) in indexed_joins {
2972 match j.join() {
2973 Ok(r) => reports.push(r),
2974 Err(payload) => {
2975 let msg = extract_panic_payload(payload);
2976 eprintln!(
2977 "ktstr: pcomm container (pcomm={pcomm:?}): thread {idx} \
2978 panicked: {msg}",
2979 );
2980 reports.push(WorkerReport {
2981 completed: false,
2982 exit_info: Some(WorkerExitInfo::Panicked(msg)),
2983 ..WorkerReport::default()
2984 });
2985 }
2986 }
2987 }
2988
2989 // Encode and write the report stream as a single
2990 // `Vec<WorkerReport>` JSON document via `serde_json`.
2991 // serde_json for the pcomm `Vec<WorkerReport>`; the
2992 // fork-mode workers use postcard for the single
2993 // `WorkerReport`. The pcomm container is a
2994 // fork-mode child and its payload sits on the same
2995 // per-child pipe used by every other forked worker.
2996 // The parent decodes via
2997 // `serde_json::from_slice::<Vec<WorkerReport>>`. Encode
2998 // failures fall through to exit_child_success — the
2999 // parent's sentinel path handles missing / truncated
3000 // payloads.
3001 // tracing is unsafe in a forked child (the parent's
3002 // subscriber may hold a lock); use eprintln + raw fd.
3003 let bytes = match serde_json::to_vec(&reports) {
3004 Ok(v) => v,
3005 Err(e) => {
3006 eprintln!(
3007 "ktstr: pcomm container (pcomm={pcomm:?}): serde_json encode \
3008 of {} reports failed: {e}",
3009 reports.len(),
3010 );
3011 Vec::new()
3012 }
3013 };
3014 {
3015 let mut f = unsafe { std::fs::File::from_raw_fd(report_fds[1]) };
3016 if let Err(e) = f.write_all(&bytes) {
3017 eprintln!(
3018 "ktstr: pcomm container (pcomm={pcomm:?}): report write_all \
3019 of {} bytes failed: {e}",
3020 bytes.len(),
3021 );
3022 }
3023 drop(f);
3024 }
3025 exit_child_success();
3026 }
3027 child_pid => {
3028 // Parent. Restore signal mask, close the wrong ends,
3029 // record the container in `guard.children` so its
3030 // lifecycle (kill on bail / collect on stop) is shared
3031 // with conventional workers.
3032 let psm_parent_restore_rc = unsafe {
3033 libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
3034 };
3035 if psm_parent_restore_rc != 0 {
3036 tracing::warn!(
3037 rc = psm_parent_restore_rc,
3038 "pthread_sigmask(SIG_SETMASK) failed restoring mask in parent \
3039 post-pcomm-fork; SIGUSR1 stays blocked in this thread for \
3040 the lifetime of the workload",
3041 );
3042 }
3043 unsafe {
3044 libc::close(report_fds[1]);
3045 libc::close(start_fds[0]);
3046 }
3047 // Per-group layout for sentinel distribution. Each
3048 // entry is `(group_idx, num_workers)`; total report
3049 // count is the sum of `num_workers` across the layout
3050 // (== `total_workers` invariant). When the parent
3051 // emits sentinels for a missing JSON payload, the
3052 // layout drives per-group_idx tagging so per-group
3053 // filters partition correctly even on a failure.
3054 let group_layout: Vec<(usize, usize)> = groups
3055 .iter()
3056 .map(|g| (g.group_idx, g.num_workers))
3057 .collect();
3058 debug_assert_eq!(
3059 group_layout.iter().map(|(_, n)| n).sum::<usize>(),
3060 total_workers,
3061 "spawn_pcomm_container: group_layout total must match total_workers",
3062 );
3063 guard.children.push(ForkedChild {
3064 pid: child_pid,
3065 report_fd: report_fds[0],
3066 start_fd: start_fds[1],
3067 kind: ForkedChildKind::PcommContainer {
3068 groups: group_layout,
3069 },
3070 });
3071 Ok(())
3072 }
3073 }
3074}
3075
3076/// Internal dispatch shape resolved from
3077/// [`WorkloadConfig::clone_mode`] inside [`WorkloadHandle::spawn`].
3078pub(super) enum Dispatch {
3079 Fork,
3080 Thread,
3081}
3082
3083impl WorkloadHandle {
3084 /// Fork one thread-group leader hosting every worker in `works`
3085 /// as worker threads inside a single forked process. Used by
3086 /// `apply_setup` when a `CgroupDef` declares `pcomm` — every
3087 /// `WorkSpec` in the same `CgroupDef` is coalesced into one
3088 /// thread-group leader whose `task->comm` carries `pcomm`
3089 /// exactly (the [`WorkSpec::pcomm`] builder rejects > 15 bytes
3090 /// — `TASK_COMM_LEN - 1` — so the framework never feeds the
3091 /// kernel a name `__set_task_comm` would truncate). Every
3092 /// spawned thread reads its `task->group_leader->comm` as
3093 /// `pcomm` for the leader's lifetime.
3094 ///
3095 /// `works` must already be fully resolved: each entry's
3096 /// `num_workers` must be `Some(_)` and `affinity` must be
3097 /// non-topology-aware (Inherit / Exact / RandomSubset).
3098 /// `apply_setup` runs the standard scenario-engine resolution
3099 /// (`resolve_num_workers` + `intent_for_spawn`) before calling
3100 /// in.
3101 ///
3102 /// `container_uid` / `container_gid` apply to the leader's
3103 /// process credentials via `setresuid` / `setresgid` once,
3104 /// inside the forked leader, before threads spawn. Each worker
3105 /// thread additionally re-applies its merged uid/gid inside
3106 /// `worker_main` at thread creation time; for the common case
3107 /// of a single CgroupDef-level default flowing through
3108 /// `merged_works`, the per-thread call is idempotent.
3109 ///
3110 /// On `works.is_empty()` the function returns a handle with no
3111 /// children — no fork, no resource allocation. Same for the
3112 /// empty-thread case across ALL groups (every `num_workers ==
3113 /// 0`): the leader is skipped because there is no work to host.
3114 /// This matches the `pcomm_zero_workers_no_container_spawn`
3115 /// contract.
3116 ///
3117 /// Group-level admission rejects WorkType variants that conflict
3118 /// with the threaded shape (every worker shares the leader's
3119 /// tgid):
3120 ///
3121 /// - [`WorkType::ForkExit`]: a fork from a thread of a
3122 /// multi-threaded process inherits all locks held by other
3123 /// threads at fork time, which the child cannot release;
3124 /// safe-but-degraded use cases would still need
3125 /// `CloneMode::Fork` for clean lock-free child state.
3126 /// - [`WorkType::CgroupChurn`]: writing the worker tid to
3127 /// `cgroup.procs` migrates the entire leader tgid (every
3128 /// sibling thread) — the test loses control of its own
3129 /// cgroup placement.
3130 pub fn spawn_pcomm_cgroup(
3131 pcomm: &str,
3132 container_uid: Option<u32>,
3133 container_gid: Option<u32>,
3134 works: &[WorkSpec],
3135 ) -> Result<Self> {
3136 // Empty pcomm input is treated as "no pcomm" by the
3137 // apply_setup caller; reject here as a safety belt — every
3138 // production caller has already filtered empty strings.
3139 if pcomm.is_empty() {
3140 anyhow::bail!(
3141 "spawn_pcomm_cgroup: pcomm must be a non-empty string; \
3142 the caller (apply_setup) treats `Some(\"\")` as \
3143 `None` and falls through to the conventional fork \
3144 spawn — empty here is a programmer error.",
3145 );
3146 }
3147
3148 // Structural mem_policy validation per-spec — same gate as
3149 // [`WorkloadHandle::spawn`]'s [`WorkloadConfig::validate`],
3150 // applied at the second public spawn entry point. Closes the
3151 // bypass that would otherwise allow invalid mem_policy via
3152 // this entry to reach `apply_mempolicy_with_flags`'s silent-
3153 // skip arm. The `exit_group(2)` thread-mode unsoundness
3154 // documented on [`WorkloadConfig::validate`] does not apply
3155 // here (pcomm-container workers run inside a forked container
3156 // with a new tgid distinct from the test runner's — the
3157 // container forks once via `libc::fork`, then workers are
3158 // spawned as threads sharing the container's tgid; a
3159 // hypothetical inner `_exit(1)` would only tear down the
3160 // container, not the test runner), but the validation
3161 // symmetry between the two pub entries prevents future
3162 // refactors from inadvertently re-opening the bypass.
3163 for (i, spec) in works.iter().enumerate() {
3164 spec.mem_policy.validate().map_err(|e| {
3165 anyhow::anyhow!("WorkloadHandle::spawn_pcomm_cgroup: works[{i}].mem_policy: {e}",)
3166 })?;
3167 }
3168
3169 // Build a `GroupParams` per `WorkSpec` using the same
3170 // resolver `WorkloadHandle::spawn` uses for composed
3171 // entries. group_idx is the entry's position in `works` (0-
3172 // based); the parent has no notion of "primary" inside a
3173 // pcomm container — every entry is a peer.
3174 let mut groups: Vec<GroupParams> = Vec::with_capacity(works.len());
3175 for (i, spec) in works.iter().enumerate() {
3176 let num_workers = spec.num_workers.ok_or_else(|| {
3177 anyhow::anyhow!(
3178 "spawn_pcomm_cgroup: works[{i}].num_workers must be set explicitly \
3179 (apply_setup runs resolve_num_workers before calling in)",
3180 )
3181 })?;
3182 let site = format!("works[{i}].affinity");
3183 let affinity = GroupParams::resolve_spawn_affinity(&spec.affinity, &site)?;
3184 groups.push(GroupParams::from_work_spec(spec, i, affinity, num_workers));
3185 }
3186
3187 // Per-group admission. Mirrors `WorkloadHandle::spawn` for
3188 // shared rules (worker_group_size divisibility,
3189 // chain-depth-< 2, IdleChurn / IpcVariance zero rejection)
3190 // and adds pcomm-specific rejections (ForkExit,
3191 // CgroupChurn) since the container's threaded shape —
3192 // every worker is a thread of one forked tgid sharing one
3193 // fd table — introduces hazards those variants cannot
3194 // tolerate.
3195 for group in &groups {
3196 if matches!(group.work_type, WorkType::ForkExit) {
3197 anyhow::bail!(
3198 "WorkSpec::pcomm is incompatible with WorkType::ForkExit \
3199 (works[{}]): a fork from a thread of a multi-threaded \
3200 container inherits all locks held by sibling threads at \
3201 fork time, producing undefined behaviour for any libc \
3202 primitive in the child. Drop pcomm or pick a different \
3203 work type.",
3204 group.group_idx,
3205 );
3206 }
3207 if matches!(group.work_type, WorkType::CgroupChurn { .. }) {
3208 anyhow::bail!(
3209 "WorkSpec::pcomm is incompatible with WorkType::CgroupChurn \
3210 (works[{}]): CgroupChurn writes the worker tid to \
3211 `cgroup.procs`, which the kernel resolves to the whole \
3212 tgid and migrates the entire pcomm container (every \
3213 sibling thread). Drop pcomm or pick a different work type.",
3214 group.group_idx,
3215 );
3216 }
3217 validate_workload_admission(group)?;
3218 }
3219
3220 // No-work shortcut: every group has zero workers (or the
3221 // works list itself is empty). Fork would produce a
3222 // container that blocks on the start byte forever and
3223 // outlives the test handle; skip it. The handle still
3224 // has a sensible Drop (empty children/threads/regions).
3225 let total_workers: usize = groups.iter().map(|g| g.num_workers).sum();
3226 if total_workers == 0 {
3227 return Ok(SpawnGuard::new().into_handle());
3228 }
3229
3230 // All failable acquisitions route through `guard`. If any
3231 // `?` returns early, the guard's Drop SIGKILLs+reaps any
3232 // already-forked container, closes open pipe fds, and
3233 // munmaps the shared regions — so no leak on a mid-spawn
3234 // error path.
3235 let mut guard = SpawnGuard::new();
3236
3237 // Per-worker iteration counter region (MAP_SHARED). Sized
3238 // for ALL groups' workers laid out contiguously; matches
3239 // the `WorkloadHandle::spawn` allocation but with a
3240 // pcomm-specific diagnostic prefix.
3241 let size = total_workers * std::mem::size_of::<AtomicU64>();
3242 let ptr = unsafe {
3243 libc::mmap(
3244 std::ptr::null_mut(),
3245 size,
3246 libc::PROT_READ | libc::PROT_WRITE,
3247 libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3248 -1,
3249 0,
3250 )
3251 };
3252 if ptr == libc::MAP_FAILED {
3253 let errno = std::io::Error::last_os_error();
3254 let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3255 anyhow::bail!(
3256 "mmap(MAP_SHARED|MAP_ANONYMOUS, {size} bytes) for the \
3257 per-worker iter_counters region failed: {errno}{hint}; \
3258 this region holds one AtomicU64 per worker thread \
3259 ({total_workers} thread(s) inside the pcomm={pcomm:?} \
3260 container) so the parent can snapshot iteration \
3261 counts via `snapshot_iterations()`.",
3262 );
3263 }
3264 guard.iter_counters = ptr as *mut AtomicU64;
3265 guard.iter_counter_bytes = size;
3266
3267 // Single shared phase-epoch word (MAP_SHARED, zero-init to
3268 // 0 = BASELINE) — same role as in `WorkloadHandle::spawn`: the
3269 // scenario engine bumps it per step boundary and backdrop
3270 // workers read it for per-phase attribution.
3271 let epoch_size = std::mem::size_of::<AtomicU32>();
3272 let epoch_ptr = unsafe {
3273 libc::mmap(
3274 std::ptr::null_mut(),
3275 epoch_size,
3276 libc::PROT_READ | libc::PROT_WRITE,
3277 libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3278 -1,
3279 0,
3280 )
3281 };
3282 if epoch_ptr == libc::MAP_FAILED {
3283 let errno = std::io::Error::last_os_error();
3284 let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3285 anyhow::bail!(
3286 "mmap(MAP_SHARED|MAP_ANONYMOUS, {epoch_size} bytes) for the \
3287 phase-epoch word (pcomm={pcomm:?}) failed: {errno}{hint}; \
3288 this single AtomicU32 broadcasts the current phase to \
3289 backdrop workers for per-phase attribution.",
3290 );
3291 }
3292 guard.phase_epoch = epoch_ptr as *mut AtomicU32;
3293 guard.phase_epoch_bytes = epoch_size;
3294
3295 // Per-group resource allocation (pipe pairs, chain pipes,
3296 // futex regions). Same shape as `WorkloadHandle::spawn_group`'s
3297 // prologue but inlined here so the per-group bookkeeping
3298 // (`PcommGroupResources`) is built directly for
3299 // `spawn_pcomm_container`. `iter_offset` runs across groups
3300 // and tracks each group's first iter_slot.
3301 let mut resources: Vec<PcommGroupResources> = Vec::with_capacity(groups.len());
3302 let mut iter_offset: usize = 0;
3303 for group in &groups {
3304 let needs_pipes = matches!(
3305 group.work_type,
3306 WorkType::PipeIo { .. } | WorkType::CachePipe { .. }
3307 );
3308 let chain_depth = group.work_type.chain_pipe_depth();
3309 let needs_futex = group.work_type.needs_shared_mem();
3310 let pipe_pair_base = guard.pipe_pairs.len();
3311 let chain_pipes_base = guard.chain_pipes.len();
3312 let futex_ptrs_base = guard.futex_ptrs.len();
3313 let futex_region_size = futex_region_size_for(&group.work_type);
3314 let futex_group_size = group.work_type.worker_group_size().unwrap_or(2);
3315
3316 if needs_pipes {
3317 for _ in 0..group.num_workers / 2 {
3318 let mut ab = [0i32; 2];
3319 if unsafe { libc::pipe2(ab.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3320 anyhow::bail!(
3321 "pipe2 (pcomm={pcomm:?}, group {}) failed: {}",
3322 group.group_idx,
3323 std::io::Error::last_os_error(),
3324 );
3325 }
3326 let mut ba = [0i32; 2];
3327 if unsafe { libc::pipe2(ba.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3328 unsafe {
3329 libc::close(ab[0]);
3330 libc::close(ab[1]);
3331 }
3332 anyhow::bail!(
3333 "pipe2 (pcomm={pcomm:?}, group {}) failed: {}",
3334 group.group_idx,
3335 std::io::Error::last_os_error(),
3336 );
3337 }
3338 guard.pipe_pairs.push((ab, ba));
3339 }
3340 }
3341
3342 if let Some(depth) = chain_depth
3343 && depth > 0
3344 && group.num_workers >= depth
3345 {
3346 let chains = group.num_workers / depth;
3347 for _ in 0..chains {
3348 let mut chain: Vec<[i32; 2]> = Vec::with_capacity(depth);
3349 let mut alloc_ok = true;
3350 for _ in 0..depth {
3351 let mut p = [0i32; 2];
3352 if unsafe { libc::pipe2(p.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3353 alloc_ok = false;
3354 break;
3355 }
3356 chain.push(p);
3357 }
3358 if !alloc_ok {
3359 for p in &chain {
3360 unsafe {
3361 libc::close(p[0]);
3362 libc::close(p[1]);
3363 }
3364 }
3365 anyhow::bail!(
3366 "WakeChain pipe2 (pcomm={pcomm:?}, group {}) failed: {}",
3367 group.group_idx,
3368 std::io::Error::last_os_error(),
3369 );
3370 }
3371 guard.chain_pipes.push(chain);
3372 }
3373 }
3374
3375 if needs_futex {
3376 for _ in 0..group.num_workers / futex_group_size {
3377 let region = unsafe {
3378 libc::mmap(
3379 std::ptr::null_mut(),
3380 futex_region_size,
3381 libc::PROT_READ | libc::PROT_WRITE,
3382 libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3383 -1,
3384 0,
3385 )
3386 };
3387 if region == libc::MAP_FAILED {
3388 let errno = std::io::Error::last_os_error();
3389 let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3390 anyhow::bail!(
3391 "mmap(MAP_SHARED|MAP_ANONYMOUS, {futex_region_size} bytes) \
3392 for a futex shared-memory region (pcomm={pcomm:?}, group {}) \
3393 failed: {errno}{hint}",
3394 group.group_idx,
3395 );
3396 }
3397 unsafe {
3398 std::ptr::write_bytes(region as *mut u8, 0, futex_region_size);
3399 }
3400 guard.futex_ptrs.push(region as *mut u32);
3401 guard.futex_region_sizes.push(futex_region_size);
3402 }
3403 }
3404
3405 resources.push(PcommGroupResources {
3406 iter_offset,
3407 pipe_pair_base,
3408 chain_pipes_base,
3409 futex_ptrs_base,
3410 needs_pipes,
3411 chain_depth,
3412 needs_futex,
3413 futex_group_size,
3414 });
3415 iter_offset += group.num_workers;
3416 }
3417
3418 // Fork the container and spawn its threads. On success the
3419 // container is registered in `guard.children`; on failure the
3420 // guard's Drop reaps it.
3421 spawn_pcomm_container(
3422 &mut guard,
3423 pcomm,
3424 container_uid,
3425 container_gid,
3426 &groups,
3427 &resources,
3428 )?;
3429
3430 Ok(guard.into_handle())
3431 }
3432
3433 /// Spawn worker tasks. Workers block until
3434 /// [`start()`](Self::start) is called, allowing the caller to
3435 /// move fork-mode workers into cgroups first. The worker creation
3436 /// primitive (`fork` or `std::thread::spawn`) is selected by
3437 /// [`WorkloadConfig::clone_mode`].
3438 pub fn spawn(config: &WorkloadConfig) -> Result<Self> {
3439 // Reject invalid mem_policy before any worker context exists;
3440 // see [`WorkloadConfig::validate`] for the silent-skip
3441 // rationale and why a worker-side `_exit(1)` fix is unsound.
3442 config.validate()?;
3443
3444 let dispatch = match &config.clone_mode {
3445 CloneMode::Fork => Dispatch::Fork,
3446 CloneMode::Thread => Dispatch::Thread,
3447 };
3448
3449 // Build the per-group params list: primary first
3450 // (group_idx == 0), then composed[k] resolved into
3451 // group_idx == k+1. The resolver enforces the "spawn-time
3452 // resolution rules" documented on
3453 // [`WorkloadConfig::composed`] (Q1: num_workers must be
3454 // explicit; Q2: only Inherit/Exact affinity reachable from
3455 // spawn() — topology-aware variants need the scenario
3456 // engine).
3457 //
3458 // Every group inherits the parent
3459 // [`WorkloadConfig::clone_mode`]: SpawnGuard's lifecycle
3460 // assumes a single dispatch path (every guard.children
3461 // entry is a fork-mode child reaped via waitpid; every
3462 // guard.threads entry is a thread-mode worker joined via
3463 // JoinHandle). Mixing modes inside one guard would route
3464 // teardown through the wrong code path, so [`WorkSpec`]
3465 // carries no `clone_mode` field — it is a workload-wide
3466 // property fixed by [`WorkloadConfig::clone_mode`].
3467 let mut groups: Vec<GroupParams> = Vec::with_capacity(1 + config.composed.len());
3468 groups.push(GroupParams::primary(config)?);
3469 for (i, spec) in config.composed.iter().enumerate() {
3470 groups.push(GroupParams::from_composed(spec, i + 1)?);
3471 }
3472
3473 // Per-group admission. Each group's work_type is checked
3474 // independently — a malformed composed entry bails the
3475 // whole workload before any resources are acquired.
3476 for group in &groups {
3477 // Thread mode + ForkExit is incompatible. ForkExit's worker
3478 // body calls `libc::fork()` from inside `worker_main` to
3479 // exercise the fork -> exit -> wait lifecycle. Under
3480 // [`CloneMode::Thread`] the worker is one thread of the
3481 // harness's multi-threaded tgid, and a `fork()` from a thread
3482 // of a multi-threaded process duplicates ONLY the calling
3483 // thread: any lock another harness thread holds at fork time
3484 // (glibc malloc arena, internal mutexes) stays locked forever
3485 // in the child, which has no thread left to release it. The
3486 // child here only `_exit`s (async-signal-safe), so it survives
3487 // this in practice — but the hazard is real and the
3488 // fork -> exit primitive is faithfully exercised only when each
3489 // worker is its own process. (The child does NOT share the
3490 // harness tgid: `fork()` omits CLONE_THREAD, so copy_process
3491 // gives it group_leader=self / tgid=pid, a fresh singleton
3492 // group. Its `libc::_exit(0)` issues exit_group(2) ->
3493 // `do_group_exit` -> `zap_other_threads`, but a singleton tgid
3494 // has no other threads for `zap_other_threads` to SIGKILL, so
3495 // the teardown ends only the child; the harness tgid is
3496 // untouched.) Reject at spawn with an actionable diagnostic;
3497 // CloneMode::Fork gives each worker its own tgid and a
3498 // lock-clean address space.
3499 if matches!(dispatch, Dispatch::Thread) && matches!(group.work_type, WorkType::ForkExit)
3500 {
3501 anyhow::bail!(
3502 "CloneMode::Thread is incompatible with WorkType::ForkExit \
3503 (group {}) — ForkExit forks inside the worker, and a fork \
3504 from a thread of the multi-threaded harness inherits \
3505 sibling-held locks the child cannot release; only a \
3506 separate process faithfully exercises the fork/exit \
3507 lifecycle. Use CloneMode::Fork for ForkExit workloads.",
3508 group.group_idx,
3509 );
3510 }
3511 // Fork mode + EpollStorm is incompatible. EpollStorm
3512 // creates an eventfd + epoll fd inside worker pos 0 and
3513 // publishes their integer fd numbers through the per-group
3514 // shared mmap region (`efd_slot` / `epfd_slot`); siblings
3515 // load those numbers and operate on them as if they
3516 // referred to the same kernel objects. Under
3517 // [`CloneMode::Fork`] each forked child holds its own copy
3518 // of the parent's fd table at fork time, but the eventfd
3519 // and epoll fd are created AFTER the fork on worker pos 0
3520 // — so sibling children's fd tables never contain those
3521 // descriptors. The integer numbers they read from the
3522 // shared region either resolve to unrelated fds the child
3523 // happened to have at the same slot or fail with EBADF.
3524 // The fd table is genuinely shared only under
3525 // [`CloneMode::Thread`]. Reject at spawn time with an
3526 // actionable diagnostic; CloneMode::Thread is the correct
3527 // choice for EpollStorm.
3528 if matches!(dispatch, Dispatch::Fork)
3529 && matches!(group.work_type, WorkType::EpollStorm { .. })
3530 {
3531 anyhow::bail!(
3532 "CloneMode::Fork is incompatible with WorkType::EpollStorm \
3533 (group {}) — EpollStorm publishes eventfd/epoll fd numbers \
3534 through a shared mmap region for siblings to consume, but \
3535 forked children hold independent fd tables that never \
3536 contain those post-fork descriptors. Use CloneMode::Thread \
3537 for EpollStorm workloads.",
3538 group.group_idx,
3539 );
3540 }
3541 // Thread mode + CgroupChurn is incompatible. CgroupChurn's
3542 // worker body writes its own tid (`SYS_gettid`) to a
3543 // sibling cgroup's `cgroup.procs` file. The kernel's
3544 // `__cgroup_procs_write` resolves the tid to its
3545 // task_struct, then migrates the *entire* thread group
3546 // leader's task and every member of its tgid to the
3547 // target cgroup (see `cgroup_attach_task` /
3548 // `cgroup_migrate` in kernel/cgroup/cgroup.c — procs-file
3549 // semantics are tgid-wide, contrast `cgroup.threads`
3550 // which is per-thread under the threaded controller).
3551 // Under [`CloneMode::Thread`] every worker is a member of
3552 // the test harness's tgid, so the first CgroupChurn write
3553 // migrates the harness itself and every sibling worker
3554 // thread out from under the host. Reject at spawn time
3555 // with an actionable diagnostic; CloneMode::Fork gives
3556 // each worker its own tgid so a procs-file write moves
3557 // only that worker.
3558 if matches!(dispatch, Dispatch::Thread)
3559 && matches!(group.work_type, WorkType::CgroupChurn { .. })
3560 {
3561 anyhow::bail!(
3562 "CloneMode::Thread is incompatible with WorkType::CgroupChurn \
3563 (group {}) — CgroupChurn writes the worker tid to \
3564 `cgroup.procs`, which the kernel resolves to the whole tgid \
3565 and migrates every sibling thread (including the harness) \
3566 to the target cgroup. Use CloneMode::Fork for CgroupChurn \
3567 workloads so each worker is a separate tgid.",
3568 group.group_idx,
3569 );
3570 }
3571 // Thread mode + CgroupAttachStorm is incompatible because the
3572 // worker installs `SIGCHLD = SIG_IGN` at entry (under
3573 // ReapMode::SigIgn) to auto-reap its forked children. Under
3574 // [`CloneMode::Thread`] the worker is a pthread of the harness
3575 // thread group and shares the harness `sighand` (CLONE_THREAD
3576 // implies CLONE_SIGHAND), so that install changes SIGCHLD
3577 // disposition for the WHOLE harness process — corrupting the
3578 // harness's own child reaping — not just this worker. (fork()
3579 // from a thread of the multithreaded harness is also fragile;
3580 // the forked child is safe here only because it does nothing
3581 // but `_exit`.) Under [`CloneMode::Fork`] each worker is its
3582 // own process with a private sighand, so the SIG_IGN install
3583 // and the forked-child lifecycle stay confined to that worker.
3584 // Note: the storm writes the forked CHILD's pid (its own tgid)
3585 // to cgroup.procs, so — unlike CgroupChurn's own-tid write —
3586 // the migration never moves the harness; the hazard is the
3587 // shared-sighand SIG_IGN install, not tgid migration. Reject at
3588 // spawn time with an actionable diagnostic.
3589 if matches!(dispatch, Dispatch::Thread)
3590 && matches!(group.work_type, WorkType::CgroupAttachStorm { .. })
3591 {
3592 anyhow::bail!(
3593 "CloneMode::Thread is incompatible with WorkType::CgroupAttachStorm \
3594 (group {}) — the worker installs SIGCHLD=SIG_IGN to auto-reap its \
3595 forked children, but a thread-group worker shares the harness \
3596 sighand, so that install corrupts the harness's own child reaping. \
3597 Use CloneMode::Fork for CgroupAttachStorm workloads so each worker \
3598 has its own process and private sighand.",
3599 group.group_idx,
3600 );
3601 }
3602 // Dispatch-agnostic admission rules
3603 // (worker_group_size divisibility, chain depth >= 2,
3604 // IdleChurn / IpcVariance zero-rejections) live in
3605 // [`validate_workload_admission`] and are shared with
3606 // [`Self::spawn_pcomm_cgroup`]. The
3607 // CloneMode-vs-WorkType compat checks above stay
3608 // dispatch-specific because each dispatch path gates
3609 // independently: Thread+ForkExit (here) and pcomm+ForkExit
3610 // ([`Self::spawn_pcomm_cgroup`]) reject for the SAME reason
3611 // (a fork from a thread of a multi-threaded process inherits
3612 // sibling-held locks the child cannot release) via separate
3613 // per-path gates; Fork+EpollStorm has no pcomm equivalent.
3614 validate_workload_admission(group)?;
3615 }
3616
3617 // futex region sizing is per-group, not MAX'd across all
3618 // groups. Each group's futex region has its own natural
3619 // size determined by [`futex_region_size_for`] (FanOutCompute
3620 // = 16, ProducerConsumerImbalance = 32 + Q*8, everything
3621 // else = 4). Storing the size alongside each pointer in
3622 // `SpawnGuard::futex_region_sizes` lets Drop munmap each
3623 // region with its own length, so a small-variant group
3624 // composed alongside a large ProducerConsumerImbalance group
3625 // is no longer inflated to the large size.
3626 //
3627 // The kernel rounds munmap length up to PAGE_SIZE, so the
3628 // per-region waste for sub-page allocations is bounded at
3629 // one page; the previous MAX-across-groups approach could
3630 // waste many pages per small-variant group when paired with
3631 // a large queue_depth_target.
3632
3633 // All failable acquisitions in this function route through
3634 // `guard`. If any `?`/`bail!` returns early, the guard's Drop
3635 // SIGKILLs+reaps forked children, closes open pipe fds, and
3636 // munmaps the shared regions — so no leak on a mid-spawn
3637 // error path.
3638 let mut guard = SpawnGuard::new();
3639
3640 // Per-worker iteration counter region (MAP_SHARED). Sized
3641 // for ALL groups' workers laid out contiguously: primary
3642 // group occupies slots `[0, primary.num_workers)`, composed
3643 // group `k` occupies slots starting at the running offset
3644 // tracked by `iter_offset` in the per-group spawn loop
3645 // below. Each worker atomically stores its iteration count
3646 // to its assigned slot; the parent reads all slots via
3647 // `snapshot_iterations()`. The mmap base is page-aligned
3648 // (kernel guarantee), so casting to `*mut AtomicU64` is
3649 // sound: page alignment (≥ 4096) ≥ AtomicU64 alignment (8),
3650 // and the region size is an exact multiple of
3651 // `size_of::<AtomicU64>()` (== 8). Each `.add(i)` moves by
3652 // `i * 8` bytes, preserving the 8-byte alignment invariant.
3653 // No non-atomic access to the region exists anywhere in the
3654 // crate, so the atomic-only aliasing rule (workers + parent
3655 // share `&AtomicU64` references derived from the raw
3656 // pointer) holds.
3657 let total_workers: usize = groups.iter().map(|g| g.num_workers).sum();
3658 if total_workers > 0 {
3659 let size = total_workers * std::mem::size_of::<AtomicU64>();
3660 let ptr = unsafe {
3661 libc::mmap(
3662 std::ptr::null_mut(),
3663 size,
3664 libc::PROT_READ | libc::PROT_WRITE,
3665 libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3666 -1,
3667 0,
3668 )
3669 };
3670 if ptr == libc::MAP_FAILED {
3671 let errno = std::io::Error::last_os_error();
3672 let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3673 anyhow::bail!(
3674 "mmap(MAP_SHARED|MAP_ANONYMOUS, {size} bytes) for the \
3675 per-worker iter_counters region failed: {errno}{hint}; \
3676 this region holds one AtomicU64 per worker \
3677 ({total_workers} slots across {} group(s)) so the parent \
3678 can snapshot iteration counts via \
3679 `snapshot_iterations()`. Remediation: reduce num_workers \
3680 (each worker consumes 8 bytes of this region, rounded up \
3681 to a page) or raise `vm.max_map_count` / the memory \
3682 cgroup limit.",
3683 groups.len(),
3684 );
3685 }
3686 guard.iter_counters = ptr as *mut AtomicU64;
3687 guard.iter_counter_bytes = size;
3688 // Single shared phase-epoch word (MAP_SHARED, MAP_ANONYMOUS
3689 // zero-inits it to 0 = BASELINE). The scenario engine bumps
3690 // it at each step boundary; backdrop workers read it to
3691 // attribute per-phase telemetry. Allocated for every handle
3692 // — a step-local pool whose epoch is never bumped simply
3693 // observes no change and emits no PhaseSlices.
3694 let epoch_size = std::mem::size_of::<AtomicU32>();
3695 let epoch_ptr = unsafe {
3696 libc::mmap(
3697 std::ptr::null_mut(),
3698 epoch_size,
3699 libc::PROT_READ | libc::PROT_WRITE,
3700 libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3701 -1,
3702 0,
3703 )
3704 };
3705 if epoch_ptr == libc::MAP_FAILED {
3706 let errno = std::io::Error::last_os_error();
3707 let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3708 anyhow::bail!(
3709 "mmap(MAP_SHARED|MAP_ANONYMOUS, {epoch_size} bytes) for the \
3710 phase-epoch word failed: {errno}{hint}; this single \
3711 AtomicU32 lets the scenario engine broadcast the current \
3712 phase to backdrop workers for per-phase attribution.",
3713 );
3714 }
3715 guard.phase_epoch = epoch_ptr as *mut AtomicU32;
3716 guard.phase_epoch_bytes = epoch_size;
3717 }
3718
3719 // Spawn each group in declaration order. `iter_offset`
3720 // tracks the running offset into the iter_counters mmap
3721 // (slot allocation per the layout commented above). Each
3722 // group's pipes / chain_pipes / futex_ptrs are appended to
3723 // the guard's flat vectors; we record per-group base
3724 // offsets so the per-worker fork loop can compute
3725 // global-vector indices from per-group worker indices and
3726 // the close-other-fds child path can iterate the full
3727 // guard while still identifying its own group's resources.
3728 let mut iter_offset: usize = 0;
3729 for group in &groups {
3730 Self::spawn_group(&mut guard, group, &dispatch, iter_offset)?;
3731 iter_offset += group.num_workers;
3732 }
3733
3734 // Success: transfer every live resource (children, threads,
3735 // futex_ptrs, iter_counters, pipe_pairs, chain_pipes) into
3736 // the handle. The guard's subsequent Drop sees empty Vecs
3737 // and a null iter_counters pointer — it is a no-op on this
3738 // path. Pipe fds are closed by `WorkloadHandle::drop` AFTER
3739 // worker shutdown so Thread-mode workers (which share the
3740 // parent's fd table) finish their pipe ops before the close.
3741 Ok(guard.into_handle())
3742 }
3743
3744 /// Spawn a single worker group's resources and per-worker
3745 /// tasks, appending each into the shared [`SpawnGuard`].
3746 ///
3747 /// Each group records its own base offsets into the guard's
3748 /// flat vectors at entry time, then uses those offsets when
3749 /// computing per-worker `pair_idx` / `chain_idx` /
3750 /// `futex_group_idx`. The fork-child close-other-fds block
3751 /// iterates the FULL guard so it sweeps fds belonging to
3752 /// other groups too — without that sweep, a composed-group
3753 /// worker would inherit (and never close) every primary-group
3754 /// pipe fd.
3755 ///
3756 /// Resource ownership is uniform across groups: every
3757 /// allocated pipe / mmap region lives in the guard's flat
3758 /// vectors and is freed by `SpawnGuard::Drop` on early-bail or
3759 /// transferred to [`WorkloadHandle`] on success via
3760 /// [`SpawnGuard::into_handle`].
3761 #[allow(clippy::too_many_arguments)]
3762 fn spawn_group(
3763 guard: &mut SpawnGuard,
3764 group: &GroupParams,
3765 dispatch: &Dispatch,
3766 iter_offset: usize,
3767 ) -> Result<()> {
3768 let needs_pipes = matches!(
3769 group.work_type,
3770 WorkType::PipeIo { .. } | WorkType::CachePipe { .. }
3771 );
3772 let chain_depth = group.work_type.chain_pipe_depth();
3773 let needs_futex = group.work_type.needs_shared_mem();
3774
3775 // Record the bases into the guard's flat vectors BEFORE
3776 // appending this group's allocations. The base values
3777 // identify "where this group's resources start" — the
3778 // per-worker fork loop combines `pipe_pair_base + i / 2`
3779 // (and analogous for chain_idx / futex_group_idx) to
3780 // address its own resources without colliding with another
3781 // group's range.
3782 let pipe_pair_base = guard.pipe_pairs.len();
3783 let chain_pipes_base = guard.chain_pipes.len();
3784 let futex_ptrs_base = guard.futex_ptrs.len();
3785 // Per-group natural size for the futex MAP_SHARED region —
3786 // each region in this group's range gets exactly this many
3787 // bytes (rather than the previous global MAX across every
3788 // group). See `futex_region_size_for` for the per-variant
3789 // sizing rules.
3790 let futex_region_size = futex_region_size_for(&group.work_type);
3791
3792 // For paired work types, create one pipe per worker pair before forking.
3793 // pipe_pairs[pair_idx] = (read_fd, write_fd) for the A->B direction,
3794 // and a second pipe for B->A. Use `pipe2(O_CLOEXEC)` instead
3795 // of bare `pipe(2)`: O_CLOEXEC is the correct default for
3796 // any kernel fd in long-running processes — fds without
3797 // O_CLOEXEC silently leak into any exec path.
3798 if needs_pipes {
3799 for _ in 0..group.num_workers / 2 {
3800 let mut ab = [0i32; 2]; // A writes, B reads
3801 if unsafe { libc::pipe2(ab.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3802 anyhow::bail!("pipe2 failed: {}", std::io::Error::last_os_error());
3803 }
3804 let mut ba = [0i32; 2]; // B writes, A reads
3805 if unsafe { libc::pipe2(ba.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3806 // Close the ab half we just created: it is not
3807 // yet owned by the guard, so its Drop won't
3808 // otherwise reach it.
3809 unsafe {
3810 libc::close(ab[0]);
3811 libc::close(ab[1]);
3812 }
3813 anyhow::bail!("pipe2 failed: {}", std::io::Error::last_os_error());
3814 }
3815 guard.pipe_pairs.push((ab, ba));
3816 }
3817 }
3818
3819 // For WakeChain { wake: WakeMechanism::Pipe }, allocate `depth` pipes per
3820 // chain (one pipe per stage). Pipe `i` connects stage `i`
3821 // (writer) to stage `(i + 1) % depth` (reader). On any
3822 // `pipe2()` failure mid-allocation, close the half-built
3823 // chain's pipes before bailing — the chain is not yet
3824 // pushed onto `guard.chain_pipes`, so its Drop won't
3825 // otherwise reach those fds. `O_CLOEXEC` matches the
3826 // defense-in-depth posture documented above on the
3827 // pipe-pair allocation.
3828 if let Some(depth) = chain_depth
3829 && depth > 0
3830 && group.num_workers >= depth
3831 {
3832 let chains = group.num_workers / depth;
3833 for _ in 0..chains {
3834 let mut chain: Vec<[i32; 2]> = Vec::with_capacity(depth);
3835 let mut alloc_ok = true;
3836 for _ in 0..depth {
3837 let mut p = [0i32; 2];
3838 if unsafe { libc::pipe2(p.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
3839 alloc_ok = false;
3840 break;
3841 }
3842 chain.push(p);
3843 }
3844 if !alloc_ok {
3845 for p in &chain {
3846 unsafe {
3847 libc::close(p[0]);
3848 libc::close(p[1]);
3849 }
3850 }
3851 anyhow::bail!(
3852 "WakeChain pipe2 allocation failed: {}",
3853 std::io::Error::last_os_error()
3854 );
3855 }
3856 guard.chain_pipes.push(chain);
3857 }
3858 }
3859
3860 // For FutexPingPong/FutexFanOut/FanOutCompute/MutexContention, allocate
3861 // one shared region per worker group via MAP_SHARED|MAP_ANONYMOUS
3862 // so all members of the fork see the same physical page.
3863 // Each region is sized exactly to the variant's natural
3864 // need (see [`futex_region_size_for`]) — the per-region
3865 // size is recorded in `guard.futex_region_sizes` parallel
3866 // to `guard.futex_ptrs`, so munmap on Drop receives the
3867 // correct length even when groups with different natural
3868 // sizes co-exist in the same workload.
3869 let futex_group_size = group.work_type.worker_group_size().unwrap_or(2);
3870 if needs_futex {
3871 for _ in 0..group.num_workers / futex_group_size {
3872 let ptr = unsafe {
3873 libc::mmap(
3874 std::ptr::null_mut(),
3875 futex_region_size,
3876 libc::PROT_READ | libc::PROT_WRITE,
3877 libc::MAP_SHARED | libc::MAP_ANONYMOUS,
3878 -1,
3879 0,
3880 )
3881 };
3882 if ptr == libc::MAP_FAILED {
3883 let errno = std::io::Error::last_os_error();
3884 let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
3885 anyhow::bail!(
3886 "mmap(MAP_SHARED|MAP_ANONYMOUS, {futex_region_size} bytes) \
3887 for a futex shared-memory region failed: {errno}{hint}; \
3888 this region backs the {:?} worker-group's (group {}) \
3889 inter-process futex word and is allocated \
3890 before fork so every child inherits the same \
3891 mapping. Remediation: reduce num_workers (each \
3892 futex group consumes one shared page) or raise \
3893 `vm.max_map_count` / the memory cgroup limit.",
3894 group.work_type.name(),
3895 group.group_idx,
3896 );
3897 }
3898 unsafe { std::ptr::write_bytes(ptr as *mut u8, 0, futex_region_size) };
3899 guard.futex_ptrs.push(ptr as *mut u32);
3900 guard.futex_region_sizes.push(futex_region_size);
3901 }
3902 }
3903
3904 for i in 0..group.num_workers {
3905 let affinity = resolve_affinity(&group.affinity)?;
3906
3907 // Determine pipe fds for this worker.
3908 //
3909 // Three shapes use the same `Option<(read_fd, write_fd)>`
3910 // parameter:
3911 // - PipeIo / CachePipe (paired): worker A reads `ba[0]`,
3912 // writes `ab[1]`; worker B reads `ab[0]`, writes
3913 // `ba[1]`.
3914 // - WakeChain { wake: WakeMechanism::Pipe } (chain ring): stage `s`
3915 // reads from pipe `(s + depth - 1) % depth` (its
3916 // predecessor's write end's matching read end) and
3917 // writes to pipe `s` (its own pipe's write end, which
3918 // stage `s + 1` reads from).
3919 // - Everything else: `None`.
3920 //
3921 // Indices are computed in the GLOBAL `guard.pipe_pairs`
3922 // / `guard.chain_pipes` space by adding the per-group
3923 // base recorded at the top of `spawn_group`. A composed
3924 // group's pipe-pair-base, for example, equals the sum
3925 // of every prior group's pipe-pair count, so its first
3926 // worker pair is allocated immediately after the
3927 // primary's last entry — no collision, no aliasing.
3928 let worker_pipe_fds: Option<(i32, i32)> = if needs_pipes {
3929 let pair_idx = pipe_pair_base + i / 2;
3930 let (ref ab, ref ba) = guard.pipe_pairs[pair_idx];
3931 if i % 2 == 0 {
3932 // Worker A: writes to ab[1], reads from ba[0]
3933 Some((ba[0], ab[1]))
3934 } else {
3935 // Worker B: writes to ba[1], reads from ab[0]
3936 Some((ab[0], ba[1]))
3937 }
3938 } else if let Some(depth) = chain_depth
3939 && depth > 0
3940 {
3941 let chain_idx = chain_pipes_base + i / depth;
3942 let stage = i % depth;
3943 let prev_stage = (stage + depth - 1) % depth;
3944 let chain = &guard.chain_pipes[chain_idx];
3945 // Read end of predecessor's pipe; write end of own
3946 // pipe. The kernel pipe pair is `[read_end,
3947 // write_end]` per `libc::pipe`'s manpage.
3948 Some((chain[prev_stage][0], chain[stage][1]))
3949 } else {
3950 None
3951 };
3952
3953 // Futex pointer for this worker. The `pos` is the
3954 // worker's index inside its futex group: `pos == 0`
3955 // is the group's "first" worker (the role that varies
3956 // per-variant — pair-A for FutexPingPong, messenger for
3957 // FutexFanOut/FanOutCompute, waker for ThunderingHerd/
3958 // AsymmetricWaker, chain-head for WakeChain). Variants
3959 // that need finer-grained per-worker positioning
3960 // (PriorityInversion's 3 tiers, ProducerConsumerImbalance's
3961 // producer/consumer split, RtStarvation's RT/CFS split,
3962 // WakeChain's stage index) consume `pos` directly.
3963 let worker_futex: Option<(*mut u32, usize)> = if needs_futex {
3964 let futex_group_idx = futex_ptrs_base + i / futex_group_size;
3965 let pos = i % futex_group_size;
3966 Some((guard.futex_ptrs[futex_group_idx], pos))
3967 } else {
3968 None
3969 };
3970
3971 // Shared iteration counter slot for this worker. The
3972 // group-local index `i` is added to the spawn-time
3973 // `iter_offset` so each group's slot range is disjoint
3974 // from every other group's.
3975 let iter_slot: *mut AtomicU64 = if !guard.iter_counters.is_null() {
3976 unsafe { guard.iter_counters.add(iter_offset + i) }
3977 } else {
3978 std::ptr::null_mut()
3979 };
3980 // Single shared phase-epoch word — the SAME pointer for every
3981 // worker (no per-worker offset). Non-null for every handle; the
3982 // scenario bumps it only for backdrop handles, so step-local
3983 // workers observe no change and emit no slices.
3984 let phase_epoch_ptr: *mut AtomicU32 = guard.phase_epoch;
3985
3986 // Per-mode dispatch. Thread-mode workers do not need
3987 // pipes — the rendezvous and report channels are
3988 // in-process Rust primitives (`mpsc::sync_channel(0)` +
3989 // `JoinHandle`). Fork mode uses the pipe-based
3990 // scaffolding below.
3991 match dispatch {
3992 Dispatch::Thread => {
3993 spawn_thread_worker(
3994 guard,
3995 group,
3996 affinity,
3997 worker_pipe_fds,
3998 worker_futex,
3999 iter_slot,
4000 phase_epoch_ptr,
4001 )?;
4002 continue;
4003 }
4004 Dispatch::Fork => {
4005 // fall through to the pipe-based dispatch below
4006 }
4007 }
4008
4009 // Create pipe for report and a second pipe for "start" signal.
4010 // Local cleanup on second-pipe failure: the guard has no
4011 // per-worker tracking of half-allocated pipes, so the first
4012 // half closes here before the bail. `O_CLOEXEC` matches
4013 // the defense-in-depth posture above on the inter-worker
4014 // pipe pairs and chain pipes.
4015 let mut report_fds = [0i32; 2];
4016 if unsafe { libc::pipe2(report_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
4017 anyhow::bail!(
4018 "worker {}/{} (group {}): report pipe2 failed: {}",
4019 i + 1,
4020 group.num_workers,
4021 group.group_idx,
4022 std::io::Error::last_os_error(),
4023 );
4024 }
4025 // Grow the report pipe so a worker that produces a worst-
4026 // case report (two `Vec<u64>` reservoirs capped at
4027 // `MAX_WAKE_SAMPLES` (100_000) entries each, plus
4028 // smaller fields) finishes `write_all` without blocking
4029 // for the parent to drain. The default pipe capacity on
4030 // Linux is 16 pages (64 KiB on 4 KiB pages, 256 KiB on
4031 // 16 KiB pages); a postcard-encoded `WorkerReport` with
4032 // both reservoirs full is at most ~1.9 MiB (a u64 varint
4033 // is up to 10 bytes per entry, 200_000 entries total).
4034 // Without this grow, the worker blocks inside
4035 // `f.write_all(&bytes)` waiting for the parent's
4036 // `read_to_end`. The parent reads workers serially with
4037 // a shared 5 s deadline (`stop_and_collect`); if the
4038 // first worker's drain consumes the budget, every
4039 // subsequent worker's `poll` budget is 0, the parent
4040 // closes the read fd without reading, and the child's
4041 // blocked write returns EPIPE — the report is lost and
4042 // the failure surfaces as a sentinel `WorkerReport`.
4043 // Sizing the pipe to 8 MiB lets the child write the
4044 // full payload in one non-blocking burst, exit, and
4045 // close its write end; when the parent later polls the
4046 // read fd, all data is already buffered in the kernel
4047 // and `read_to_end` returns immediately with EOF.
4048 //
4049 // F_SETPIPE_SZ rounds the requested size up to the next
4050 // power of two; unprivileged callers are clamped to
4051 // `/proc/sys/fs/pipe-max-size` (typically 1 MiB).
4052 // ktstr always runs as root inside the guest, so the
4053 // grow succeeds. A best-effort failure (older kernel
4054 // without F_SETPIPE_SZ, or an unexpected EPERM) leaves
4055 // the default size in place — workers with small
4056 // reports still complete; workers with large reports
4057 // fall back to the historical blocking-write behaviour.
4058 // Logging the failure surfaces the regression without
4059 // failing the whole spawn.
4060 const REPORT_PIPE_SIZE: libc::c_int = 8 * 1024 * 1024;
4061 // SAFETY: `report_fds[1]` is the freshly opened write
4062 // end returned by `pipe2` above; F_SETPIPE_SZ accepts
4063 // any pipe fd (read or write end — both refer to the
4064 // same kernel `struct pipe_inode_info`).
4065 let prev_size =
4066 unsafe { libc::fcntl(report_fds[1], libc::F_SETPIPE_SZ, REPORT_PIPE_SIZE) };
4067 if prev_size < 0 {
4068 let err = std::io::Error::last_os_error();
4069 tracing::warn!(
4070 worker = i + 1,
4071 num_workers = group.num_workers,
4072 group_idx = group.group_idx,
4073 requested_size = REPORT_PIPE_SIZE,
4074 %err,
4075 "F_SETPIPE_SZ on report pipe failed; falling back to default \
4076 pipe capacity. Workers producing >64 KiB reports may block \
4077 in write_all and have their reports truncated by the \
4078 parent's deadline-driven fd close."
4079 );
4080 }
4081 let mut start_fds = [0i32; 2];
4082 if unsafe { libc::pipe2(start_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
4083 unsafe {
4084 libc::close(report_fds[0]);
4085 libc::close(report_fds[1]);
4086 }
4087 anyhow::bail!(
4088 "worker {}/{} (group {}): start pipe2 failed: {}",
4089 i + 1,
4090 group.num_workers,
4091 group.group_idx,
4092 std::io::Error::last_os_error(),
4093 );
4094 }
4095
4096 // Block SIGUSR1 across fork so the child inherits a
4097 // SIGUSR1-blocked mask. Without this, there is a
4098 // window between `fork()` returning in the child and
4099 // the child installing `sigusr1_handler` below where
4100 // SIGUSR1's default disposition (terminate) is in
4101 // effect. A `stop_and_collect` call that races against
4102 // a worker still in mid-init — e.g. if a sibling
4103 // worker spawned just before this one already pushed
4104 // its (pid, ...) into `guard.children` and a parent
4105 // thread initiated stop signaling — could deliver
4106 // SIGUSR1 to this child before its handler is
4107 // installed and terminate it via the default action.
4108 // The block + post-install unblock pattern queues any
4109 // SIGUSR1 sent in the window so the handler observes
4110 // it on the unblock; the worker then sets `STOP` and
4111 // exits gracefully.
4112 //
4113 // pthread_sigmask is the multi-threaded equivalent of
4114 // sigprocmask: ktstr's parent process is genuinely
4115 // multi-threaded (test runner threads, tracing
4116 // threads), and `sigprocmask` semantics are unspecified
4117 // in MT programs per signal-safety(7) /
4118 // pthread_sigmask(3) — pthread_sigmask is the correct
4119 // primitive to use here. The fork()-inherited mask is
4120 // the calling thread's mask at fork time, exactly what
4121 // we want.
4122 //
4123 // Old mask is captured so the parent path can restore
4124 // its prior signal mask after fork — the block must
4125 // be transient on the parent side; the parent's
4126 // SIGUSR1 stays under whatever disposition the host
4127 // application configured (typically the default;
4128 // ktstr does not handle SIGUSR1 in the parent
4129 // process).
4130 let mut old_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
4131 let mut block_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
4132 // pthread_sigmask returns 0 on success, a positive errno
4133 // on failure (per POSIX — does NOT set errno). A non-zero
4134 // return here means the SIGUSR1 mask did not actually get
4135 // installed: the child would inherit an unblocked SIGUSR1
4136 // and the post-fork race window between fork() and the
4137 // child's signal() install would terminate the worker on
4138 // its default action. Log so the failure surfaces.
4139 let psm_block_rc = unsafe {
4140 libc::sigemptyset(&mut block_mask);
4141 libc::sigaddset(&mut block_mask, libc::SIGUSR1);
4142 libc::pthread_sigmask(libc::SIG_BLOCK, &block_mask, &mut old_mask)
4143 };
4144 if psm_block_rc != 0 {
4145 tracing::warn!(
4146 rc = psm_block_rc,
4147 "pthread_sigmask(SIG_BLOCK, SIGUSR1) failed pre-fork; child inherits unblocked SIGUSR1 and may terminate on default action before installing handler"
4148 );
4149 }
4150
4151 let pid = unsafe { libc::fork() };
4152 match pid {
4153 -1 => {
4154 // Fork failed: restore the parent's previous
4155 // signal mask before bailing so this failure
4156 // doesn't leave SIGUSR1 blocked in the calling
4157 // thread for the rest of the process lifetime.
4158 let psm_restore_rc = unsafe {
4159 libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
4160 };
4161 if psm_restore_rc != 0 {
4162 tracing::warn!(
4163 rc = psm_restore_rc,
4164 "pthread_sigmask(SIG_SETMASK) failed restoring mask after fork failure; SIGUSR1 may remain blocked in this thread"
4165 );
4166 }
4167 // Close both fresh pipes so they don't leak
4168 // before the guard reaps the already-forked
4169 // siblings.
4170 unsafe {
4171 libc::close(report_fds[0]);
4172 libc::close(report_fds[1]);
4173 libc::close(start_fds[0]);
4174 libc::close(start_fds[1]);
4175 }
4176 anyhow::bail!(
4177 "worker {}/{} (group {}): fork failed: {}",
4178 i + 1,
4179 group.num_workers,
4180 group.group_idx,
4181 std::io::Error::last_os_error(),
4182 );
4183 }
4184 0 => {
4185 // Child: set parent-death signal BEFORE any other
4186 // post-fork setup so the kernel SIGKILLs this worker
4187 // immediately if the parent dies during the remaining
4188 // init (close fd loops, signal handler install, start-
4189 // pipe wait, worker_main). Without PR_SET_PDEATHSIG,
4190 // a parent crash between fork and start leaves workers
4191 // reparented to init and spinning indefinitely —
4192 // they'd outlive the test run, consume the cgroup's
4193 // CPU, and block the next scenario's cgroup teardown
4194 // with EBUSY. SIGKILL is the only safe choice: it
4195 // cannot be masked and runs before any of this child's
4196 // destructors execute (good — those destructors still
4197 // reference the parent's guard). prctl is NOT listed
4198 // as async-signal-safe by signal-safety(7); safe to
4199 // call here because this is a single-threaded
4200 // post-fork child before any signal handlers are
4201 // installed, so no interleaving can observe partial
4202 // state.
4203 unsafe {
4204 libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL);
4205 }
4206 // Fork-race close: if the parent died between fork()
4207 // return and the prctl above, this child was already
4208 // reparented (typically to pid 1) before PDEATHSIG
4209 // was armed — the death signal is keyed on the CURRENT
4210 // parent, not the parent-at-fork-time, so the signal
4211 // will never fire. getppid() == 1 means we are already
4212 // orphaned; exit now instead of running the full
4213 // worker loop only to leak into init. Using `_exit`
4214 // (async-signal-safe) rather than `exit` so Rust
4215 // destructors that reference the parent's now-dead
4216 // guard don't run on the fork stack.
4217 //
4218 // PR_SET_CHILD_SUBREAPER exception: when an ancestor
4219 // of the ktstr process has called
4220 // `prctl(PR_SET_CHILD_SUBREAPER, 1)` (systemd user
4221 // scopes, some container runtimes, certain CI
4222 // harnesses), orphaned descendants reparent to the
4223 // nearest live subreaper rather than pid 1. In that
4224 // case `getppid() == 1` is false after an orphan-
4225 // race even though the original parent is dead —
4226 // the check is a best-effort fast-path for the
4227 // common "pid 1 catches orphans" case and does NOT
4228 // fire under subreaper ancestry. PDEATHSIG still
4229 // fires correctly in that scenario (the signal is
4230 // triggered when the CURRENT parent dies, and the
4231 // subreaper then inherits us), so the guard is a
4232 // narrowing of the leak window, not an elimination.
4233 //
4234 // Guest-init exception: inside a ktstr guest VM the
4235 // test driver IS pid 1 (it runs as /init), so every
4236 // worker forked by a scenario legitimately has
4237 // `getppid() == 1` even though the parent is alive
4238 // and well. Firing the orphan guard there would kill
4239 // every worker on startup and produce sentinel
4240 // "0 cpus, 0 iterations" reports. `ktstr_guest_init`
4241 // sets `KTSTR_GUEST_INIT=1` before dispatch; that
4242 // variable is inherited by every descendant process,
4243 // so its presence is a reliable signal that pid 1 is
4244 // the legitimate parent. Host-side workloads leave
4245 // the variable unset and retain the orphan detection.
4246 if std::env::var_os(crate::KTSTR_GUEST_INIT_ENV).is_none()
4247 && unsafe { libc::getppid() } == 1
4248 {
4249 exit_child_success();
4250 }
4251 // Make this worker its own process-group leader so
4252 // any descendants it spawns inherit `pgid == worker_pid`.
4253 // `stop_and_collect` / Drop issue `killpg(worker_pid,
4254 // SIGKILL)` alongside the direct `kill` — without a
4255 // private pgid, descendants forked by a
4256 // [`WorkType::Custom`] body (or any future workload
4257 // that spawns helpers) stay in the parent Rust
4258 // process's pgid, survive the worker's SIGKILL, and
4259 // orphan onto init. PR_SET_PDEATHSIG handles the
4260 // "parent crashes" case but is per-task and cleared
4261 // on fork, so grandchildren don't inherit it — the
4262 // pgid route is the only safe reach for them when
4263 // teardown is explicit. Failure is silently ignored:
4264 // the only reachable failure mode for setpgid(0, 0)
4265 // in a just-forked child is EPERM from an earlier
4266 // setsid (we never call it), so a return of -1 here
4267 // means the kernel invariant changed and the reach
4268 // degrades to "leader only" — same as the pre-batch
4269 // behavior. Async-signal-safe per signal-safety(7).
4270 unsafe {
4271 libc::setpgid(0, 0);
4272 }
4273 // Install signal handler BEFORE unblocking
4274 // SIGUSR1: the parent blocked SIGUSR1 across
4275 // fork so this child inherits a blocked mask;
4276 // any SIGUSR1 sent by `stop_and_collect` while
4277 // the mask is in effect queues until we
4278 // unblock. Once the handler is registered we
4279 // restore the original (pre-block) mask via
4280 // `pthread_sigmask(SIG_SETMASK, …)`, which
4281 // delivers any pending SIGUSR1 directly into
4282 // `sigusr1_handler` and sets `STOP` cleanly —
4283 // the worker proceeds into the start-wait
4284 // poll() with handler armed and any pending
4285 // signal already consumed. Without this two-
4286 // step, SIGUSR1 would fire under its default
4287 // disposition (terminate) during the window
4288 // between `fork()` and `signal()`.
4289 STOP.store(false, Ordering::Relaxed);
4290 // libc::signal returns SIG_ERR (cast as
4291 // sighandler_t) on failure with errno set.
4292 // pthread_sigmask returns 0 on success, errno
4293 // value on failure. Both failures here are
4294 // visible defects: signal() fail means SIGUSR1
4295 // keeps default disposition (terminate) and
4296 // stop_and_collect cannot stop the worker
4297 // gracefully — it falls through to the
4298 // killpg/SIGKILL escalation; pthread_sigmask()
4299 // fail means the SIGUSR1 we blocked across
4300 // fork stays blocked, so the parent's stop
4301 // SIGUSR1 queues forever in the child and the
4302 // child runs the full work loop until the
4303 // killpg escalation.
4304 //
4305 // tracing is unsafe in a post-fork child
4306 // before _exit (the parent's tracing
4307 // subscriber may be locked elsewhere); use
4308 // eprintln! which is fork-safe per
4309 // signal-safety(7) (write(2)).
4310 let sig_prev = unsafe {
4311 libc::signal(
4312 libc::SIGUSR1,
4313 sigusr1_handler as *const () as libc::sighandler_t,
4314 )
4315 };
4316 if sig_prev == libc::SIG_ERR {
4317 let errno = std::io::Error::last_os_error();
4318 eprintln!(
4319 "ktstr: signal(SIGUSR1) install failed in worker child: {errno}; graceful stop unavailable, killpg escalation will reap"
4320 );
4321 }
4322 let psm_unblock_rc = unsafe {
4323 libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
4324 };
4325 if psm_unblock_rc != 0 {
4326 eprintln!(
4327 "ktstr: pthread_sigmask(SIG_SETMASK) unblock failed in worker child: rc={psm_unblock_rc}; SIGUSR1 stays blocked, killpg escalation will reap"
4328 );
4329 }
4330 // Close unused pipe ends
4331 unsafe {
4332 libc::close(report_fds[0]);
4333 libc::close(start_fds[1]);
4334 }
4335 // Close pipe ends belonging to other workers
4336 // in this pair, AND every pipe fd that belongs
4337 // to any other pair anywhere in the workload —
4338 // including pairs owned by other groups, since
4339 // every pre-fork allocation lives in
4340 // `guard.pipe_pairs` regardless of which group
4341 // declared it. The fork inherits the parent's
4342 // entire fd table; without this sweep, a
4343 // composed-group worker would hold open every
4344 // primary-group pipe fd for its lifetime,
4345 // producing fd leaks and (for chain-shaped
4346 // workloads) keeping reader-side blocks live
4347 // when the writer-side closes.
4348 if needs_pipes {
4349 let pair_idx = pipe_pair_base + i / 2;
4350 let (ref ab, ref ba) = guard.pipe_pairs[pair_idx];
4351 if i % 2 == 0 {
4352 // Worker A keeps ba[0] (read) and ab[1] (write).
4353 // Close ab[0] and ba[1].
4354 unsafe {
4355 libc::close(ab[0]);
4356 libc::close(ba[1]);
4357 }
4358 } else {
4359 // Worker B keeps ab[0] (read) and ba[1] (write).
4360 // Close ab[1] and ba[0].
4361 unsafe {
4362 libc::close(ab[1]);
4363 libc::close(ba[0]);
4364 }
4365 }
4366 // Close all pipe fds from other pairs (any group).
4367 for (j, (ab2, ba2)) in guard.pipe_pairs.iter().enumerate() {
4368 if j != pair_idx {
4369 unsafe {
4370 libc::close(ab2[0]);
4371 libc::close(ab2[1]);
4372 libc::close(ba2[0]);
4373 libc::close(ba2[1]);
4374 }
4375 }
4376 }
4377 } else {
4378 // Worker doesn't own any pipe pair, but
4379 // other groups' pipe pairs are still in the
4380 // child's fd table — close them all.
4381 for (ab2, ba2) in guard.pipe_pairs.iter() {
4382 unsafe {
4383 libc::close(ab2[0]);
4384 libc::close(ab2[1]);
4385 libc::close(ba2[0]);
4386 libc::close(ba2[1]);
4387 }
4388 }
4389 }
4390 if let Some(depth) = chain_depth
4391 && depth > 0
4392 {
4393 let chain_idx = chain_pipes_base + i / depth;
4394 let stage = i % depth;
4395 let prev_stage = (stage + depth - 1) % depth;
4396 // Close every fd in the chain that this
4397 // stage does not own. Owned fds (kept open):
4398 // - chain[prev_stage][0]: read end of the
4399 // pipe predecessor writes to.
4400 // - chain[stage][1]: write end of the
4401 // pipe successor reads from.
4402 // Everything else is the inverse end of an
4403 // owned pipe or fully unrelated.
4404 for (s, pipe) in guard.chain_pipes[chain_idx].iter().enumerate() {
4405 // Always close the write end of the
4406 // predecessor's pipe (we only need its
4407 // read end).
4408 if s == prev_stage {
4409 unsafe {
4410 libc::close(pipe[1]);
4411 }
4412 // Always close the read end of our own
4413 // pipe (we only need its write end).
4414 } else if s == stage {
4415 unsafe {
4416 libc::close(pipe[0]);
4417 }
4418 // Pipes belonging to neither this stage
4419 // nor its predecessor: close both ends.
4420 } else {
4421 unsafe {
4422 libc::close(pipe[0]);
4423 libc::close(pipe[1]);
4424 }
4425 }
4426 }
4427 // Close every fd from other chains (any group).
4428 for (cj, chain) in guard.chain_pipes.iter().enumerate() {
4429 if cj != chain_idx {
4430 for pipe in chain {
4431 unsafe {
4432 libc::close(pipe[0]);
4433 libc::close(pipe[1]);
4434 }
4435 }
4436 }
4437 }
4438 } else {
4439 // This group has no chain pipes, but other
4440 // groups may. Close every chain-pipe fd
4441 // inherited via fork — leaving a primary
4442 // group's chain pipe open in a composed
4443 // worker would prevent the chain from ever
4444 // observing EOF on its read ends.
4445 for chain in guard.chain_pipes.iter() {
4446 for pipe in chain {
4447 unsafe {
4448 libc::close(pipe[0]);
4449 libc::close(pipe[1]);
4450 }
4451 }
4452 }
4453 }
4454 // Layered defense against child-side unwinding
4455 // reaching the forked-from-parent drops:
4456 //
4457 // 1. No-op panic hook — the default hook prints a
4458 // multi-line backtrace to stderr, which is a
4459 // shared fd with the parent post-fork. A panic
4460 // in the child would interleave garbled output
4461 // with the parent's tracing log and confuse
4462 // downstream parsers. Install a silent hook
4463 // before catch_unwind ONLY under
4464 // `panic = "unwind"` — under
4465 // `panic = "abort"` (release / nextest
4466 // profile) the silent hook would suppress the
4467 // panic message before SIGABRT fires, leaving
4468 // operators with zero diagnostic from a
4469 // crashed worker. Letting the default hook
4470 // write a backtrace to stderr is the lesser
4471 // evil under abort: the message may interleave
4472 // with parent tracing output but at least
4473 // surfaces the panic site. catch_unwind itself
4474 // is a no-op under abort (the closure is
4475 // invoked normally and the abort terminates
4476 // the child before any Err return), so gating
4477 // only the hook installation — not the
4478 // catch_unwind call — keeps the unwind-build
4479 // fast-path unchanged while restoring
4480 // release-build observability.
4481 //
4482 // 2. `_exit(0|1)` after the closure (success or
4483 // catch_unwind Err) — the child never returns
4484 // to a frame whose `SpawnGuard` Drop could
4485 // run. `_exit(2)` bypasses Rust's stack-unwind
4486 // drops and the static-destructor table both,
4487 // so the parent-owned `SpawnGuard` whose
4488 // storage was duplicated by `fork()` cannot
4489 // SIGKILL its siblings (fratricide) from the
4490 // child path.
4491 //
4492 // 3. `panic::catch_unwind` — catches any panic
4493 // before it escapes this arm. Belt-and-braces
4494 // against (a) additional Drops on this
4495 // frame's stack (e.g. future refactors that
4496 // add more RAII) and (b) alloc/OOM panics
4497 // during worker_main / postcard encode.
4498 //
4499 // Caveat: catch_unwind is a no-op under
4500 // `panic = "abort"`, which ktstr's Cargo.toml
4501 // DOES set in `[profile.release]`. In release
4502 // builds a panic inside this closure aborts
4503 // the child immediately (SIGABRT); the
4504 // `catch_unwind` call compiles but never
4505 // returns `Err`, and neither the
4506 // `f.write_all(&bytes)` nor the `_exit(1)`
4507 // below runs on the panic path. The parent's
4508 // `stop_and_collect` therefore observes a
4509 // missing WorkerReport and fills in a
4510 // sentinel — that sentinel fallback IS the
4511 // release-build correctness mechanism. Under
4512 // abort the silent hook is NOT installed (see
4513 // item 1) so the default panic handler writes
4514 // the panic location and message to stderr
4515 // just before SIGABRT, giving operators the
4516 // diagnostic they would otherwise have lost.
4517 // Defense (2) still applies unchanged: abort
4518 // skips Drops (matching the `_exit` path's
4519 // no-Drop guarantee). Dev/test builds (cargo
4520 // test, cargo nextest run — dev profile
4521 // inherits default unwind semantics) still
4522 // get a real `catch_unwind` Err → `_exit(1)`
4523 // fast-path with a silent hook so the
4524 // backtrace doesn't pollute test output.
4525 //
4526 // Global-state safety under unwind, scoped
4527 // to `worker_main`'s reachable code path —
4528 // the `fork()` child's observable set. Two
4529 // items: `STOP: AtomicBool` and
4530 // `STATIC_HOST_INFO: OnceLock<_>`. Neither
4531 // of them carries a Drop whose body touches
4532 // the inherited MAP_SHARED regions or the
4533 // parent-owned pipe fds. Under a
4534 // hypothetical unwind that escaped
4535 // `catch_unwind` (a double-panic that
4536 // bypasses the landing pad), the only
4537 // fork-child Drops that actually matter are
4538 // the parent-owned `SpawnGuard` (covered by
4539 // the `_exit` no-Drop guarantee above) and
4540 // the child-local `wake_latencies_ns` /
4541 // `migrations` `Vec<T>` (per-process heap,
4542 // no cross-process impact). `STATIC_HOST_INFO`'s
4543 // inner Drop frees a handful of
4544 // `Option<String>`s and is safe on either
4545 // side of fork. Crate-wide statics outside
4546 // this set (fetch, probe, vmm, …) are out
4547 // of scope — this audit pins only what the
4548 // fork-child can reach from `worker_main`.
4549 //
4550 // 4. `_exit(1)` on catch_unwind Err, `_exit(0)`
4551 // on Ok — bypasses Rust's global static
4552 // destructors that a plain `return` would
4553 // run.
4554 //
4555 // `AssertUnwindSafe` is justified: the child
4556 // unconditionally _exits after this block, so no
4557 // post-unwind invariant can be observed.
4558 #[cfg(panic = "unwind")]
4559 {
4560 let _ = std::panic::take_hook();
4561 std::panic::set_hook(Box::new(|_| {}));
4562 }
4563 let child_result =
4564 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4565 // Wait for parent to move us to cgroup before starting work.
4566 // Use poll() with a 30s timeout — signal-safe after fork,
4567 // prevents hanging forever if the parent stalls.
4568 let mut pfd = libc::pollfd {
4569 fd: start_fds[0],
4570 events: libc::POLLIN,
4571 revents: 0,
4572 };
4573 let ret = unsafe { libc::poll(&mut pfd, 1, 30_000) };
4574 if ret <= 0 {
4575 exit_child_error();
4576 }
4577 let mut buf = [0u8; 1];
4578 let mut f = unsafe { std::fs::File::from_raw_fd(start_fds[0]) };
4579 let _ = f.read_exact(&mut buf);
4580 drop(f);
4581 // Do NOT reset STOP here. STOP is initialized to
4582 // false at the pre-handshake site above (after the
4583 // signal-handler install) and a SIGUSR1 delivered
4584 // between the mask unblock and this point reflects
4585 // a legitimate parent-issued stop request — the
4586 // explicit-start path in `stop_and_collect` skips
4587 // the readiness barrier and races
4588 // `kill(pid, SIGUSR1)` against the worker's start-
4589 // byte read here, so any STOP=true observed at
4590 // this point must propagate into the work loop.
4591 // Clobbering it would lose the stop and produce a
4592 // worker that loops until the killpg/SIGKILL
4593 // escalation.
4594 // Publish a single "ready" byte on the report pipe
4595 // BEFORE entering `worker_main`. The parent's
4596 // auto-start barrier in `stop_and_collect` polls
4597 // every worker's report fd for POLLIN with a
4598 // bounded deadline and consumes this byte as the
4599 // explicit signal that the worker has finished
4600 // its post-fork init (cgroup placement, SIGUSR1
4601 // unblock) and is about to enter the
4602 // work loop. Replaces the prior 500 ms blind
4603 // sleep — under host CPU contention, real worker
4604 // start latency can exceed that sleep, dropping
4605 // a worker into its loop AFTER `stop_and_collect`
4606 // has already signalled stop, which surfaces as
4607 // a starvation false-positive.
4608 //
4609 // Done as a raw `libc::write` (not `File::write`)
4610 // to keep the report-fd's ownership inside its
4611 // existing `std::fs::File::from_raw_fd` block
4612 // below — wrapping it here would close the fd
4613 // when the local goes out of scope, before the
4614 // post-`worker_main` write_all path. A 1-byte
4615 // write to a freshly-grown 8 MiB pipe with no
4616 // reader contention completes synchronously
4617 // without blocking; signal-safe after fork.
4618 //
4619 // The byte is `b'r'`. The parent's collect path
4620 // strips a leading `b'r'` before
4621 // `postcard::from_bytes` so the
4622 // explicit-start call site (which skips the
4623 // barrier) still parses correctly. A zero return or short write
4624 // is treated as best-effort: if the kernel
4625 // refuses the write (EFAULT cannot occur with a
4626 // stack pointer; EBADF cannot occur on a fresh
4627 // fd; EPIPE only fires if the reader closed,
4628 // which the parent does not do mid-spawn), the
4629 // parent's barrier-deadline path falls back to
4630 // the same "skip this worker" branch as a
4631 // genuinely-dead worker.
4632 let ready_byte: u8 = b'r';
4633 unsafe {
4634 libc::write(
4635 report_fds[1],
4636 &ready_byte as *const u8 as *const libc::c_void,
4637 1,
4638 );
4639 }
4640 // Now run. Fork-mode workers thread the global
4641 // STOP through `worker_main` — the SIGUSR1 handler
4642 // is process-wide, so flipping `STOP` from
4643 // `sigusr1_handler` is what reaches the loop's
4644 // `stop.load(Relaxed)` checks.
4645 let report = worker_main(
4646 affinity,
4647 group.work_type.clone(),
4648 group.sched_policy,
4649 group.mem_policy.clone(),
4650 group.mpol_flags,
4651 group.nice,
4652 group.comm.as_deref(),
4653 group.uid,
4654 group.gid,
4655 group.numa_node,
4656 worker_pipe_fds,
4657 worker_futex,
4658 iter_slot,
4659 phase_epoch_ptr,
4660 &STOP,
4661 group.group_idx,
4662 );
4663 let bytes = postcard::to_stdvec(&report).unwrap_or_default();
4664 let mut f = unsafe { std::fs::File::from_raw_fd(report_fds[1]) };
4665 if let Err(e) = f.write_all(&bytes) {
4666 // tracing is unsafe in a post-fork child
4667 // (the global subscriber may be holding a
4668 // lock taken in another thread of the
4669 // parent process before fork); eprintln
4670 // goes straight to stderr without locking
4671 // a tracing subscriber. The parent
4672 // observes a partial / empty pipe payload
4673 // and emits a sentinel WorkerReport with
4674 // exit_info populated, but without this
4675 // log line the underlying I/O error
4676 // (EPIPE if the parent closed the read
4677 // end early, EFBIG / ENOSPC on a backing-
4678 // file pipe, EIO on a transient kernel
4679 // failure) is invisible.
4680 eprintln!("ktstr: worker report write_all failed: {e}");
4681 }
4682 drop(f);
4683 }));
4684 if child_result.is_ok() {
4685 exit_child_success();
4686 } else {
4687 exit_child_error();
4688 }
4689 }
4690 child_pid => {
4691 // Parent: restore the pre-fork signal mask
4692 // (the block was transient — only the child
4693 // inherits the blocked SIGUSR1, and that
4694 // child unblocks immediately after installing
4695 // its handler). Without this restore, SIGUSR1
4696 // would stay blocked in the calling thread for
4697 // the lifetime of the workload, swallowing any
4698 // signal directed at the parent process.
4699 let psm_parent_restore_rc = unsafe {
4700 libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
4701 };
4702 if psm_parent_restore_rc != 0 {
4703 tracing::warn!(
4704 rc = psm_parent_restore_rc,
4705 "pthread_sigmask(SIG_SETMASK) failed restoring mask in parent post-fork; SIGUSR1 stays blocked in this thread for the lifetime of the workload"
4706 );
4707 }
4708 // Close unused pipe ends.
4709 unsafe {
4710 libc::close(report_fds[1]);
4711 libc::close(start_fds[0]);
4712 }
4713 // child_pid is positive by the -1 arm above, so
4714 // fits in pid_t directly — store as pid_t so
4715 // every downstream libc call avoids the u32→i32
4716 // sign-cast wraparound bug.
4717 guard.children.push(ForkedChild {
4718 pid: child_pid,
4719 report_fd: report_fds[0],
4720 start_fd: start_fds[1],
4721 kind: ForkedChildKind::Worker {
4722 group_idx: group.group_idx,
4723 },
4724 });
4725 }
4726 }
4727 }
4728
4729 Ok(())
4730 }
4731
4732 /// Kernel TIDs of all worker tasks, in spawn order.
4733 ///
4734 /// Returned as `libc::pid_t` — the kernel's native type — so
4735 /// callers feed them directly into `kill`, `waitpid`,
4736 /// `Pid::from_raw`, and `sched_setaffinity` writes without any
4737 /// sign-cast at the libc boundary.
4738 ///
4739 /// # WARNING — `cgroup.procs` for `CloneMode::Thread`
4740 ///
4741 /// **For `CloneMode::Thread`, passing these TIDs to a
4742 /// `cgroup.procs` write migrates the ENTIRE test-runner process
4743 /// into that cgroup**: cgroup.procs writes are tgid-scoped, and
4744 /// every Thread worker shares the test runner's tgid. The first
4745 /// such write moves the test harness, every parent thread, and
4746 /// every sibling worker into the destination cgroup; subsequent
4747 /// writes are no-ops because they all point at the same tgid.
4748 /// Use cgroup v2 threaded-mode cgroups with `cgroup.threads`
4749 /// for per-thread placement. `CloneMode::Fork` is the right
4750 /// choice when each worker needs its own cgroup.
4751 ///
4752 /// # Per-mode interpretation
4753 ///
4754 /// - [`CloneMode::Fork`]: each entry is the worker's pid
4755 /// (== tgid == kernel tid because the worker is its own
4756 /// thread-group leader). Safe to feed into `cgroup.procs`.
4757 /// - [`CloneMode::Thread`]: each entry is the worker's
4758 /// `gettid()` value — distinct kernel tasks inside the
4759 /// parent's tgid. Safe for `sched_setaffinity(tid, ...)`;
4760 /// safe for `cgroup.threads` writes under a threaded-mode
4761 /// cgroup; **not** safe for `cgroup.procs` (see warning above).
4762 ///
4763 /// # Thread tid publish ordering
4764 ///
4765 /// Thread workers publish their `gettid()` via an
4766 /// `Arc<AtomicI32>` BEFORE the start handshake (the publish is
4767 /// the first thing the worker closure does, before blocking on
4768 /// `start_rx`). `spawn_thread_worker` blocks on a paired
4769 /// rendezvous channel until the worker reaches the publish
4770 /// point, so by the time [`WorkloadHandle::spawn`] returns,
4771 /// every thread worker's `tid` is non-zero (or spawn would have
4772 /// bailed with a "failed to publish gettid() within 2 s"
4773 /// error). Post-spawn `worker_pids()` is safe to call without
4774 /// first calling [`Self::start`]. The publish uses `Release`;
4775 /// this reader uses `Acquire`, pairing release-acquire so that
4776 /// any reader observing a non-zero tid is also guaranteed to
4777 /// observe the worker's post-publish state.
4778 ///
4779 /// # `pcomm` containers
4780 ///
4781 /// Groups configured with [`WorkSpec::pcomm`] yield a single
4782 /// container process whose tgid leader pid is what the parent
4783 /// holds; the per-thread `gettid()` values of the workers running
4784 /// inside the container are not exported across the process
4785 /// boundary. Each pcomm group contributes ONE entry to the
4786 /// returned vector — the container pid — regardless of how many
4787 /// thread workers it hosts. This pid is correct for `cgroup.procs`
4788 /// migration (the container's whole tgid moves) but is NOT
4789 /// suitable as a target for per-thread `sched_setaffinity` —
4790 /// see [`Self::set_affinity`] for the matching error path.
4791 ///
4792 /// Order: forked children (conventional workers + pcomm
4793 /// containers, in spawn order) followed by Thread-mode workers
4794 /// (in spawn order). A workload that mixes pcomm groups with
4795 /// non-pcomm Thread-mode groups produces both populated
4796 /// collections; a workload using only one dispatch path produces
4797 /// only one populated collection.
4798 pub fn worker_pids(&self) -> Vec<libc::pid_t> {
4799 let mut out = Vec::with_capacity(self.children.len() + self.threads.len());
4800 out.extend(self.children.iter().map(|c| c.pid));
4801 out.extend(self.threads.iter().map(|tw| tw.tid.load(Ordering::Acquire)));
4802 out
4803 }
4804
4805 /// Worker pids suitable for `cgroup.procs` migration.
4806 ///
4807 /// `cgroup.procs` is **tgid-scoped** in the kernel: writing a
4808 /// tid migrates the entire thread group containing that tid
4809 /// (`kernel/cgroup/cgroup.c::__cgroup_procs_write` resolves the
4810 /// passed pid to its leader via `find_lock_task_mm` /
4811 /// `cgroup_procs_write_start`). Under [`CloneMode::Thread`]
4812 /// every worker shares the test harness's tgid, so feeding
4813 /// [`Self::worker_pids`] to `cgroup.procs` would migrate the
4814 /// harness itself — catastrophic.
4815 ///
4816 /// Returns the per-worker pids when the spawn used
4817 /// [`CloneMode::Fork`] (each worker has its own tgid). Bails
4818 /// for [`CloneMode::Thread`] with an actionable diagnostic
4819 /// pointing at `cgroup.threads` (the thread-scoped sibling) as
4820 /// the right migration sink for thread workers.
4821 ///
4822 /// Callers that integrate with `cgroup.procs` writes — e.g.
4823 /// [`crate::cgroup::CgroupManager::move_tasks`] — should call
4824 /// this in place of [`Self::worker_pids`] so a misconfigured
4825 /// Thread-mode test fails at the migration step rather than
4826 /// silently moving the harness into the per-test cgroup.
4827 pub fn worker_pids_for_cgroup_procs(&self) -> Result<Vec<libc::pid_t>> {
4828 if !self.threads.is_empty() {
4829 anyhow::bail!(
4830 "WorkloadHandle::worker_pids_for_cgroup_procs: workers were \
4831 spawned with CloneMode::Thread; their pids share the test \
4832 harness's tgid and a `cgroup.procs` write would migrate the \
4833 harness. Use `cgroup.threads` (thread-scoped) for Thread-mode \
4834 workers, or switch to CloneMode::Fork."
4835 );
4836 }
4837 Ok(self.worker_pids())
4838 }
4839
4840 /// Signal all workers to start working (after they've been
4841 /// placed in cgroups, if applicable).
4842 ///
4843 /// Idempotent — subsequent calls after the first are no-ops.
4844 pub fn start(&mut self) {
4845 if self.started {
4846 return;
4847 }
4848 self.started = true;
4849 // Fork-mode: write a byte to each child's start pipe (covers
4850 // both conventional workers and pcomm containers — both wait
4851 // on the same start-byte handshake before entering their work
4852 // path).
4853 for child in &mut self.children {
4854 unsafe {
4855 libc::write(child.start_fd, b"s".as_ptr() as *const _, 1);
4856 libc::close(child.start_fd);
4857 }
4858 child.start_fd = -1;
4859 }
4860 // Thread-mode: send `()` on each worker's start_tx. The
4861 // SyncSender(0) rendezvous means each send blocks until the
4862 // worker calls recv(); if the worker has been joined or has
4863 // panicked before reaching recv, send returns Err which we
4864 // swallow (the join in stop_and_collect surfaces the real
4865 // exit). Take ownership so a future start() call (illegal
4866 // by the idempotence guard above) can't re-send.
4867 for tw in &mut self.threads {
4868 if let Some(tx) = tw.start_tx.take() {
4869 let _ = tx.send(());
4870 }
4871 }
4872 }
4873
4874 /// Set CPU affinity for worker at `idx`.
4875 ///
4876 /// For [`CloneMode::Fork`] the per-worker pid addresses a
4877 /// distinct kernel task. For [`CloneMode::Thread`] the worker's
4878 /// `gettid()` is what `sched_setaffinity(tid, ...)` accepts;
4879 /// this method reads the tid from the worker's
4880 /// `Arc<AtomicI32>` (with `Acquire` ordering, paired with the
4881 /// `Release` publish on the worker thread). Returns an error
4882 /// if the thread has not yet published its tid — call
4883 /// [`start()`](Self::start) first so the worker reaches its
4884 /// `gettid()` publish before reading.
4885 ///
4886 /// Bails for `ForkedChildKind::PcommContainer` entries: the
4887 /// container's tgid leader pid is the only kernel handle the
4888 /// parent holds for that group, but `sched_setaffinity` against
4889 /// that pid pins only the leader thread (a placeholder thread
4890 /// that never enters the work loop), not the worker threads
4891 /// running the workload. The container's worker tids are not
4892 /// published across the process boundary. Bake the affinity into
4893 /// [`WorkSpec::affinity`] at spawn time instead — `worker_main`
4894 /// applies it per-thread inside the container.
4895 ///
4896 /// Index space: `[0, children.len())` addresses forked children
4897 /// (conventional workers and pcomm containers), and
4898 /// `[children.len(), children.len() + threads.len())` addresses
4899 /// Thread-mode workers, matching the ordering of
4900 /// [`Self::worker_pids`].
4901 pub fn set_affinity(&self, idx: usize, cpus: &BTreeSet<usize>) -> Result<()> {
4902 let pid = if idx < self.children.len() {
4903 let child = &self.children[idx];
4904 if let ForkedChildKind::PcommContainer { groups } = &child.kind {
4905 let total: usize = groups.iter().map(|(_, n)| n).sum();
4906 anyhow::bail!(
4907 "set_affinity: child {idx} is a pcomm container \
4908 hosting {total} thread workers; per-thread \
4909 tids are not exported across the process boundary. \
4910 Set affinity via WorkSpec::affinity at spawn time \
4911 so worker_main applies it inside the container."
4912 );
4913 }
4914 child.pid
4915 } else {
4916 let thread_idx = idx - self.children.len();
4917 let tid = self.threads[thread_idx].tid.load(Ordering::Acquire);
4918 if tid == 0 {
4919 anyhow::bail!(
4920 "set_affinity: thread worker {thread_idx} has not yet \
4921 published gettid() (call start() first)"
4922 );
4923 }
4924 tid
4925 };
4926 set_thread_affinity(pid, cpus)
4927 }
4928
4929 /// Read all workers' current iteration counts from shared memory.
4930 ///
4931 /// Each element is the monotonically increasing iteration count for
4932 /// that worker, read with Relaxed ordering. Returns an empty vec
4933 /// if no workers were spawned.
4934 ///
4935 /// # Ordering rationale — why Relaxed is sound
4936 ///
4937 /// Every producer (the worker-side store at the
4938 /// `worker_main` publish sites) writes its slot with Relaxed
4939 /// ordering, and this reader loads with Relaxed too. No
4940 /// happens-before edge is needed because no host-side consumer
4941 /// pairs the iteration count with OTHER shared state: the
4942 /// parent samples these counters to answer "is this worker
4943 /// still making progress?" and feeds deltas into gap
4944 /// detection, not into any data-dependent follow-up read from
4945 /// a different shared memory location. A stale value on one
4946 /// sample is self-correcting — the next snapshot picks up the
4947 /// newer count without any cross-field invariant to break.
4948 ///
4949 /// The per-slot single-producer / multi-sampler shape is
4950 /// inherently non-tearing on every supported target
4951 /// (AtomicU64 is architecture-primitive on x86_64 and aarch64
4952 /// LSE with 8-byte alignment enforced by the type). The only
4953 /// question is ordering, and the audit above concludes Relaxed
4954 /// is load-bearingly correct — promoting either side to
4955 /// Acquire/Release would add a barrier with no corresponding
4956 /// paired operation to synchronise with.
4957 pub fn snapshot_iterations(&self) -> Vec<u64> {
4958 if self.iter_counters.is_null() || self.iter_counter_len == 0 {
4959 return Vec::new();
4960 }
4961 (0..self.iter_counter_len)
4962 .map(|i| {
4963 // SAFETY: alignment + atomic-only-access invariant
4964 // established at the iter_counters mmap site in
4965 // `WorkloadHandle::spawn` and carried by the
4966 // `*mut AtomicU64` type. Relaxed ordering: see the
4967 // rationale in the outer doc comment.
4968 unsafe { &*self.iter_counters.add(i) }.load(Ordering::Relaxed)
4969 })
4970 .collect()
4971 }
4972
4973 /// Broadcast the current scenario phase to this handle's workers.
4974 /// The scenario engine calls this at each step boundary — the
4975 /// 1-indexed phase (Step k → k + 1) at StepStart, `u32::MAX` (the
4976 /// inter-step gap sentinel) at StepEnd — so a backdrop worker
4977 /// drains a per-phase `PhaseSlice` on each transition. No-op when
4978 /// the handle has no `phase_epoch` region. Release store pairs with
4979 /// the worker's Relaxed load; it is telemetry, so no happens-before
4980 /// edge is required (see the `unsafe impl Send for WorkloadHandle`
4981 /// SAFETY note), but Release keeps the store promptly visible.
4982 pub(crate) fn set_phase_epoch(&self, epoch: u32) {
4983 if self.phase_epoch.is_null() {
4984 return;
4985 }
4986 // SAFETY: alignment + atomic-only-access invariant established
4987 // at the phase_epoch mmap site in `WorkloadHandle::spawn` /
4988 // `spawn_pcomm_cgroup` and carried by the `*mut AtomicU32`
4989 // type. The parent is the sole writer; workers are readers.
4990 unsafe { &*self.phase_epoch }.store(epoch, Ordering::Release);
4991 }
4992
4993 /// Stop all workers, collect their reports, and wait for exit.
4994 ///
4995 /// Auto-starts workers if [`start()`](Self::start) was not called,
4996 /// then waits on an event-driven barrier — each fork worker
4997 /// writes a single `b'r'` byte to its report pipe immediately
4998 /// after the start handshake completes, and the parent polls
4999 /// every report fd for `POLLIN` with a 5 s deadline. The
5000 /// barrier wakes the moment the slowest worker finishes its
5001 /// post-fork init, replacing the prior unconditional 500 ms
5002 /// sleep that under-waited under host CPU contention and
5003 /// over-waited on idle hosts. Thread-mode workers are pre-
5004 /// synchronised by `start()`'s `mpsc::sync_channel(0)` rendezvous,
5005 /// so the barrier is a no-op when no fork children were spawned.
5006 /// Consumes `self` -- workers cannot be restarted.
5007 ///
5008 /// Workers that fail to produce a report (died, timed out, or wrote
5009 /// corrupt data) get a zeroed-out sentinel report with `work_units: 0`.
5010 /// This ensures `assert_not_starved` catches dead workers as starvation
5011 /// failures.
5012 ///
5013 /// # Shutdown latency
5014 ///
5015 /// Workers spend their steady-state time blocked inside a
5016 /// `futex_wait` with timeout `WORKER_STOP_POLL_NS` (~100 ms).
5017 /// The "stop signal" is a per-mode flag the worker checks on
5018 /// every futex-wait wake; the wake interval bounds shutdown
5019 /// latency.
5020 ///
5021 /// _Fork mode_ — `stop_and_collect` sends SIGUSR1 to each
5022 /// worker pid; the per-process `sigusr1_handler` flips the
5023 /// global `STOP` in that worker's CoW address space, and the
5024 /// worker observes it on the NEXT futex wake (partner-writes
5025 /// or the 100 ms timeout, whichever comes first). The signal
5026 /// handler is process-wide and reaches one worker per kill().
5027 ///
5028 /// _Thread mode_ — `stop_and_collect` calls
5029 /// `worker.stop.store(true, Relaxed)` directly on each
5030 /// worker's `Arc<AtomicBool>`. SIGUSR1 is process-wide and
5031 /// useless for per-thread stop control, so no signal is sent;
5032 /// the worker observes the flag flip on its next futex-wait
5033 /// wake at the same 100 ms cadence.
5034 ///
5035 /// Callers that budget a graceful-shutdown window should
5036 /// allow at least one `WORKER_STOP_POLL_NS` tick (~100 ms)
5037 /// between flag flip and final collect, over and above any
5038 /// report-flush / IO latency. Tighter windows can race the
5039 /// worker's pre-stop iteration and surface as a missing
5040 /// report, which is then mapped to the sentinel path above.
5041 ///
5042 /// # Exit-shape invariance
5043 ///
5044 /// Collection discriminates purely on the presence and validity of
5045 /// the worker's pipe-delivered postcard payload — **not** on
5046 /// `waitpid` exit status. Under `panic = "unwind"` (dev/test
5047 /// profile) the worker's
5048 /// `catch_unwind` arm calls `_exit(1)` so the parent sees
5049 /// `WIFEXITED=true`, `WEXITSTATUS=1`; under `panic = "abort"`
5050 /// (release profile) the worker aborts with `SIGABRT` so the parent
5051 /// sees `WIFEXITED=false`, `WTERMSIG=6`. Either way, a panicking
5052 /// worker never finishes `f.write_all(&bytes)` on the report pipe,
5053 /// so `poll` + `read_to_end` hands back an empty (or truncated)
5054 /// buffer, `postcard::from_bytes` fails, and the
5055 /// sentinel path fires. Partial writes from a panic between successful
5056 /// `write_all` and `_exit(0)` are not reachable — the write is the
5057 /// last non-trivial statement inside the catch_unwind closure.
/// The `waitpid` call later in this function exists solely for
/// reaping zombies: every forked child is SIGKILLed unconditionally
/// once its pipe has been drained (or its deadline has expired), and
/// a `WNOHANG` status of "still alive" only selects the bounded
/// pidfd-based reap path. The status is never mapped to report
/// state — the sentinel path does read it to populate
/// [`WorkerExitInfo`] on the attached diagnostic, but the
/// correctness discrimination (sentinel vs real report) still
/// happens purely on pipe payload presence.
5065 pub fn stop_and_collect(mut self) -> Vec<WorkerReport> {
5066 // Auto-start if not explicitly started (workers in parent cgroup)
5067 let was_started = self.started;
5068 self.start();
5069
5070 // Event-driven worker-started barrier. Each fork worker writes
5071 // a single `b'r'` byte to its report pipe immediately after
5072 // the start-pipe handshake completes (see the matching write
5073 // inside `worker_main`'s catch_unwind closure). Polling every
5074 // worker's report fd for `POLLIN` with a bounded deadline
5075 // wakes the moment the slowest worker has finished its
5076 // post-fork init — replacing the prior 500 ms blind sleep
5077 // that could under-wait on a CPU-contended host (false
5078 // starvation if the worker entered its loop after stop was
5079 // signalled) and over-wait on an idle host (~500 ms wasted
5080 // per `stop_and_collect`).
5081 //
5082 // Thread-mode workers do not need a barrier here: `start()`
5083 // above sent on a `mpsc::sync_channel(0)` SyncSender(0)
5084 // rendezvous, which blocks the parent until the worker's
5085 // matching `recv()` returns — by the time `start()` returns,
5086 // every thread worker has crossed its start handshake. Only
5087 // the fork-mode pipe-based start signal is fire-and-forget,
5088 // so the barrier is gated on `!self.children.is_empty()`.
5089 //
5090 // Deadline budget: 5 s mirrors the existing collect deadline
5091 // below. Each iteration polls every still-pending fd with
5092 // the remaining budget; a worker that returns `POLLHUP` /
5093 // `POLLERR` (died before writing the ready byte — fork-race
5094 // close, panic during early init, or the kernel killed it)
5095 // is dropped from the pending set and the surrounding
5096 // collect path's sentinel-report logic surfaces it. The
5097 // worst case (every worker hits the deadline without a
5098 // ready byte) bounds the wait at the same 5 s the legacy
5099 // sleep + collect path budgeted for the entire stop_and_collect.
5100 if !was_started && !self.children.is_empty() {
5101 let barrier_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
5102 // Pending = (index_into_children, read_fd). We track the
5103 // index so a `POLLHUP` worker can be removed from
5104 // pending without disturbing the collect loop's
5105 // ordering below.
5106 let mut pending: Vec<(usize, i32)> = self
5107 .children
5108 .iter()
5109 .enumerate()
5110 .map(|(i, c)| (i, c.report_fd))
5111 .collect();
5112 while !pending.is_empty() {
5113 let remaining =
5114 barrier_deadline.saturating_duration_since(std::time::Instant::now());
5115 if remaining.is_zero() {
5116 // Barrier deadline expired with workers still
5117 // pending. Stop waiting and proceed to signal
5118 // stop. The collect path below treats any
5119 // worker whose pipe never produced data as a
5120 // sentinel report — same outcome as a worker
5121 // that started in time but produced an
5122 // unparseable report.
5123 break;
5124 }
5125 let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
5126 let mut pfds: Vec<libc::pollfd> = pending
5127 .iter()
5128 .map(|&(_, fd)| libc::pollfd {
5129 fd,
5130 events: libc::POLLIN,
5131 revents: 0,
5132 })
5133 .collect();
5134 // SAFETY: `pfds` is a non-empty owned Vec; nfds
5135 // matches its length; `ms >= 0` since the
5136 // remaining-zero branch above bails first. Return
5137 // codes are interpreted below.
5138 let ret = unsafe { libc::poll(pfds.as_mut_ptr(), pfds.len() as libc::nfds_t, ms) };
5139 if ret < 0 {
5140 let err = std::io::Error::last_os_error();
5141 if err.kind() == std::io::ErrorKind::Interrupted {
5142 // EINTR — re-poll with the remaining budget
5143 // on the next iteration. Common during
5144 // teardown when a sibling thread sends a
5145 // signal; the loop's deadline guard bounds
5146 // total wait time regardless of EINTR
5147 // frequency.
5148 continue;
5149 }
5150 // Hard poll failure (EFAULT impossible with an
5151 // owned Vec; ENOMEM extreme). Bail out of the
5152 // barrier and let the collect path's per-fd
5153 // poll handle each worker individually.
5154 tracing::warn!(
5155 %err,
5156 pending = pending.len(),
5157 "WorkloadHandle::stop_and_collect: barrier poll failed; falling \
5158 through to per-worker collect"
5159 );
5160 break;
5161 }
5162 // ret == 0 means the per-iteration timeout fired
5163 // without any fd ready — but we measured this
5164 // against `remaining`, so the next iteration's
5165 // saturating_duration_since will be zero and the
5166 // top-of-loop guard exits. Don't break here: a
5167 // future cycle could still be useful if the system
5168 // clock jumped backward, and the cost is one extra
5169 // iteration that immediately bails.
5170 if ret > 0 {
5171 pending.retain(|&(_, fd)| {
5172 // Find this fd's pollfd entry. Linear scan
5173 // is fine: typical worker counts are <100
5174 // and the alternative (HashMap) costs more
5175 // in setup than the linear scan saves.
5176 let pfd = pfds.iter().find(|p| p.fd == fd);
5177 let revents = pfd.map(|p| p.revents).unwrap_or(0);
5178 if revents & libc::POLLIN != 0 {
5179 // Ready byte arrived. Consume exactly
5180 // 1 byte and remove from pending. The
5181 // raw `libc::read` does not take
5182 // ownership of the fd — the collect
5183 // path below still owns it via the
5184 // `children` Vec and will read the
5185 // postcard tail on its own deadline.
5186 let mut byte: u8 = 0;
5187 // SAFETY: `&mut byte` is a valid
5188 // 1-byte buffer; `fd` is the report
5189 // read end the parent owns until
5190 // collect drains it. A 0 / -1 return
5191 // is treated as not-yet-ready; the
5192 // next iteration retries.
5193 let n = unsafe {
5194 libc::read(fd, &mut byte as *mut u8 as *mut libc::c_void, 1)
5195 };
5196 // n == 1 → ready byte consumed; drop
5197 // from pending.
5198 // n == 0 → POLLIN with zero-byte read
5199 // means EOF (writer closed) without
5200 // sending the byte — worker died
5201 // pre-write. Drop from pending; the
5202 // collect path emits a sentinel report.
5203 // n < 0 → transient error (EAGAIN
5204 // shouldn't happen since POLLIN
5205 // signalled readability, but on EINTR
5206 // the next iteration re-polls).
5207 if n >= 0 {
5208 return false;
5209 }
5210 // Negative return: re-check kind.
5211 let err = std::io::Error::last_os_error();
5212 if err.kind() == std::io::ErrorKind::Interrupted {
5213 return true;
5214 }
5215 tracing::warn!(
5216 %err,
5217 fd,
5218 "WorkloadHandle::stop_and_collect: barrier byte read \
5219 failed; treating worker as ready"
5220 );
5221 return false;
5222 }
5223 if revents & (libc::POLLHUP | libc::POLLERR | libc::POLLNVAL) != 0 {
5224 // Worker closed the write end without
5225 // sending the ready byte (panic during
5226 // post-fork init, or kernel-killed
5227 // before reaching the write). Drop
5228 // from pending; the collect path's
5229 // `read_to_end` returns 0 bytes and the
5230 // sentinel-report branch fires.
5231 return false;
5232 }
5233 // No POLLIN, no hangup — keep waiting.
5234 true
5235 });
5236 }
5237 }
5238 }
5239
5240 let mut reports = Vec::new();
5241 let children = std::mem::take(&mut self.children);
5242 let threads = std::mem::take(&mut self.threads);
5243
5244 // Drain the 'r' ready byte from each child's report pipe
5245 // so the collect poll below waits for the report payload, not
5246 // the already-present ready byte. The barrier section above
5247 // consumes it for auto-started workers; explicitly-started
5248 // workers (was_started=true) skip the barrier.
5249 if was_started {
5250 for child in &children {
5251 let mut byte: u8 = 0;
5252 unsafe {
5253 libc::read(
5254 child.report_fd,
5255 &mut byte as *mut u8 as *mut libc::c_void,
5256 1,
5257 );
5258 }
5259 }
5260 }
5261
5262 // Signal all fork-mode children to stop via SIGUSR1; the
5263 // signal handler flips that child's process-local STOP, which
5264 // worker_main's `stop_requested` checks read. For pcomm
5265 // containers the SIGUSR1 flips the container's STOP, which
5266 // every thread inside the container observes via `&STOP`
5267 // passed to `worker_main` — one signal stops the whole
5268 // group cleanly. `pid` is `libc::pid_t`, so it flows to
5269 // `Pid::from_raw` without the u32→i32 sign-cast wraparound
5270 // that produced `kill(-1, ...)` session-wide reaps when the
5271 // old u32 pid exceeded i32::MAX.
5272 for child in &children {
5273 let _ = nix::sys::signal::kill(
5274 nix::unistd::Pid::from_raw(child.pid),
5275 nix::sys::signal::Signal::SIGUSR1,
5276 );
5277 }
5278 // Signal all thread-mode workers by flipping each worker's
5279 // per-task `stop`. SIGUSR1 is process-wide and useless for
5280 // per-thread stop; the Arc<AtomicBool> threaded through
5281 // worker_main is the only path that reaches an individual
5282 // thread without affecting siblings.
5283 for tw in &threads {
5284 tw.stop.store(true, Ordering::Relaxed);
5285 }
5286
5287 // Collect reports with a shared 5s deadline across all workers.
5288 // Each worker gets the remaining budget, so starved workers
5289 // (e.g. under degrade mode) don't serially exhaust the VM
5290 // timeout.
5291 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
5292
5293 // Bound on the post-SIGKILL worker reap (below). A worker normally
5294 // exits <<1s after SIGKILL (post-crash bypass keeps it
5295 // CFS-schedulable). The rare wedge: a sched_setaffinity flipper
5296 // stuck uninterruptible in `affine_move_task`'s `wait_for_completion`
5297 // — a migration that can't finish until the target becomes
5298 // migratable, which during the crash/bypass window can take seconds
5299 // — cannot take its pending SIGKILL until that syscall returns, so a
5300 // blocking `waitpid` would stall collect for that whole window.
5301 // The deadline is SHARED across workers: every such wedge waits out
5302 // the SAME crash/bypass window, so once one times out the rest get
5303 // the remaining (~0) budget and give up immediately rather than
5304 // summing to N×timeout. Mirrors the scheduler-side
5305 // `reap_child_bounded` (rust_init/scheduler.rs); on timeout the zombie is left
5306 // for VM reboot to reap (SIGKILL cannot make it exit faster — see
5307 // that fn's doc).
5308 const WORKER_REAP_TIMEOUT: Duration = Duration::from_secs(3);
5309 let mut worker_reap_deadline: Option<Instant> = None;
5310 // Evented, bounded reap of a SIGKILLed worker `pid`: wait up to
5311 // `timeout` for it to become a zombie (`pidfd_open` + `poll`),
5312 // then reap non-blocking. Returns false on timeout (worker still
5313 // uninterruptibly wedged — the rare case `WORKER_REAP_TIMEOUT`
5314 // documents; left for VM reboot).
5315 fn reap_pid_bounded(pid: libc::pid_t, timeout: Duration) -> bool {
5316 // Non-blocking reap of `pid` (observed StillAlive just above,
5317 // so this succeeds only once it has become a zombie).
5318 let reap = || {
5319 matches!(
5320 nix::sys::wait::waitpid(
5321 nix::unistd::Pid::from_raw(pid),
5322 Some(nix::sys::wait::WaitPidFlag::WNOHANG),
5323 ),
5324 Ok(nix::sys::wait::WaitStatus::Exited(..))
5325 | Ok(nix::sys::wait::WaitStatus::Signaled(..))
5326 )
5327 };
5328 match crate::sync::pidfd_poll_exited(pid, timeout) {
5329 // Readable => zombie => the WNOHANG reap is non-blocking.
5330 crate::sync::PidfdWait::Exited => {
5331 let _ = reap();
5332 true
5333 }
5334 // Still uninterruptibly wedged (see `WORKER_REAP_TIMEOUT`):
5335 // leave the zombie-to-be for VM reboot (SIGKILL already
5336 // pending).
5337 crate::sync::PidfdWait::TimedOut => false,
5338 // pidfd_open failed (ESRCH/gone): one non-blocking reap.
5339 crate::sync::PidfdWait::NoPidfd => reap(),
5340 }
5341 }
5342
5343 for child in children {
5344 let mut buf = Vec::new();
5345 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
5346 let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
5347 let mut poll_ready = false;
5348 if ms > 0 {
5349 let mut pfd = libc::pollfd {
5350 fd: child.report_fd,
5351 events: libc::POLLIN,
5352 revents: 0,
5353 };
5354 let ready = unsafe { libc::poll(&mut pfd, 1, ms) };
5355 poll_ready = ready > 0;
5356 }
5357
5358 let npid = nix::unistd::Pid::from_raw(child.pid);
5359 if !poll_ready {
5360 // Deadline expired — child didn't write a report.
5361 // Kill first so read_to_end doesn't block on the
5362 // pipe's write end (the child may have written
5363 // only the 'r' ready byte but no payload).
5364 kill_and_killpg(npid, nix::sys::signal::Signal::SIGKILL);
5365 let _ = nix::unistd::close(child.report_fd);
5366 } else {
5367 // Child responded — read the report, then kill.
5368 let mut f = unsafe { std::fs::File::from_raw_fd(child.report_fd) };
5369 let _ = f.read_to_end(&mut buf);
5370 drop(f);
5371 kill_and_killpg(npid, nix::sys::signal::Signal::SIGKILL);
5372 }
5373 let waited = nix::sys::wait::waitpid(npid, Some(nix::sys::wait::WaitPidFlag::WNOHANG));
5374 let still_running = matches!(waited, Ok(nix::sys::wait::WaitStatus::StillAlive));
5375 let exit_info_source: Result<nix::sys::wait::WaitStatus, nix::errno::Errno> =
5376 if still_running {
5377 // Bounded reap (see `reap_pid_bounded` + the shared
5378 // `worker_reap_deadline` above): replaces a blocking
5379 // `waitpid(npid, None)` that stalled on a worker
5380 // uninterruptibly wedged in an affine_move_task
5381 // migration (seconds; see `WORKER_REAP_TIMEOUT`).
5382 // exit_info stays `StillAlive` either way — the prior
5383 // blocking reap also discarded the status here.
5384 let rd = *worker_reap_deadline
5385 .get_or_insert_with(|| Instant::now() + WORKER_REAP_TIMEOUT);
5386 let remaining = rd.saturating_duration_since(Instant::now());
5387 reap_pid_bounded(child.pid, remaining);
5388 Ok(nix::sys::wait::WaitStatus::StillAlive)
5389 } else {
5390 waited
5391 };
5392
5393 // Strip the leading `b'r'` ready byte if the auto-start
5394 // barrier above did not consume it. Children write this
5395 // byte unconditionally on every success path right after
5396 // the start handshake; the barrier polls + reads it when
5397 // `stop_and_collect` auto-started children, but
5398 // explicit-start callers (`start()` invoked before
5399 // `stop_and_collect`) bypass the barrier and the byte
5400 // sits in the pipe ahead of the payload. Strip exactly 1
5401 // byte if present so the per-kind decoder sees a clean
5402 // payload either way.
5403 let report_slice: &[u8] = if buf.first() == Some(&b'r') {
5404 &buf[1..]
5405 } else {
5406 &buf[..]
5407 };
5408
5409 // Per-kind decoding. `Worker` carries a single postcard
5410 // `WorkerReport`. `PcommContainer` carries
5411 // a `serde_json` `Vec<WorkerReport>` — serde_json for
5412 // the pcomm container; fork-mode workers use postcard
5413 // for the single `WorkerReport`. Both
5414 // payloads ride the EOF-terminated pipe with no length
5415 // prefix; the parent's `read_to_end` provides framing.
5416 // On a decode failure, emit one sentinel per expected
5417 // report so downstream consumers (per-group filtering,
5418 // assert_not_starved) see the correct cardinality.
5419 match child.kind {
5420 ForkedChildKind::Worker { group_idx } => {
5421 let decoded: Result<WorkerReport, _> = postcard::from_bytes(report_slice);
5422 if let Ok(report) = decoded {
5423 reports.push(report);
5424 } else {
5425 let exit_info = classify_wait_outcome(exit_info_source);
5426 eprintln!(
5427 "ktstr: worker pid={} returned no report ({} bytes read, exit={exit_info:?})",
5428 child.pid,
5429 buf.len(),
5430 );
5431 reports.push(WorkerReport {
5432 tid: child.pid,
5433 group_idx,
5434 exit_info: Some(exit_info),
5435 ..WorkerReport::default()
5436 });
5437 }
5438 }
5439 ForkedChildKind::PcommContainer { groups } => {
5440 let total_workers: usize = groups.iter().map(|(_, n)| n).sum();
5441 let decoded: Result<Vec<WorkerReport>, _> =
5442 serde_json::from_slice(report_slice);
5443 match decoded {
5444 Ok(mut decoded_reports) if decoded_reports.len() == total_workers => {
5445 reports.append(&mut decoded_reports);
5446 }
5447 Ok(mut decoded_reports) => {
5448 // Cardinality mismatch — surface the
5449 // partial set we did get and pad with
5450 // sentinels so the total report count
5451 // still equals `total_workers`. A short
5452 // payload typically signals the
5453 // thread-group leader died mid-encode
5454 // (panic in `serde_json::to_vec`, OOM
5455 // during Vec growth, or a write_all
5456 // truncated by SIGKILL escalation).
5457 // Sentinels are tagged with the right
5458 // per-group `group_idx` using the
5459 // `groups` layout: groups[0] consumed
5460 // the first groups[0].1 slots,
5461 // groups[1] the next groups[1].1, etc.
5462 // We emit sentinels for the trailing
5463 // missing slots, computing each
5464 // sentinel's group_idx by walking the
5465 // layout from `got` forward.
5466 let exit_info = classify_wait_outcome(exit_info_source);
5467 eprintln!(
5468 "ktstr: pcomm thread-group leader pid={} returned {} of {} reports ({} bytes read, exit={exit_info:?})",
5469 child.pid,
5470 decoded_reports.len(),
5471 total_workers,
5472 buf.len(),
5473 );
5474 // Surplus reports (decoded > total_workers)
5475 // must not leak into the parent's report
5476 // stream — downstream cardinality assertions
5477 // (per-group filtering, assert_not_starved)
5478 // assume exactly `total_workers` entries.
5479 // Truncate to the layout's total before the
5480 // `got..total_workers` loop runs (which is
5481 // a no-op for the surplus case after this).
5482 decoded_reports.truncate(total_workers);
5483 let got = decoded_reports.len();
5484 for r in decoded_reports {
5485 reports.push(r);
5486 }
5487 // Compute group_idx for each missing
5488 // slot by walking the layout. `slot` is
5489 // a 0-based global index from `got` to
5490 // `total_workers - 1`; we accumulate
5491 // per-group counts to find which group
5492 // owns each slot.
5493 for slot in got..total_workers {
5494 let mut acc = 0usize;
5495 let mut g_idx = 0usize;
5496 for &(gi, n) in &groups {
5497 if slot < acc + n {
5498 g_idx = gi;
5499 break;
5500 }
5501 acc += n;
5502 }
5503 reports.push(WorkerReport {
5504 tid: child.pid,
5505 group_idx: g_idx,
5506 exit_info: Some(exit_info.clone()),
5507 ..WorkerReport::default()
5508 });
5509 }
5510 }
5511 Err(_) => {
5512 let exit_info = classify_wait_outcome(exit_info_source);
5513 eprintln!(
5514 "ktstr: pcomm thread-group leader pid={} returned no decodable report ({} bytes read, exit={exit_info:?})",
5515 child.pid,
5516 buf.len(),
5517 );
5518 // Total decode failure — emit one
5519 // sentinel per slot, tagging each with
5520 // the correct group_idx.
5521 for &(gi, n) in &groups {
5522 for _ in 0..n {
5523 reports.push(WorkerReport {
5524 tid: child.pid,
5525 group_idx: gi,
5526 exit_info: Some(exit_info.clone()),
5527 ..WorkerReport::default()
5528 });
5529 }
5530 }
5531 }
5532 }
5533 }
5534 }
5535 }
5536
5537 // Thread-mode collection: join each worker's JoinHandle
5538 // (with the [`THREAD_JOIN_TIMEOUT`] budget) and adopt the
5539 // returned [`WorkerReport`]. Per-worker `stop` was flipped
5540 // above; the worker observes it in worker_main's
5541 // `stop.load(Relaxed)` checks (max ~100ms latency from the
5542 // FUTEX_WAIT_TIMEOUT poll cadence). Three outcomes:
5543 //
5544 // 1. Ok(report): join returned the worker's WorkerReport.
5545 // Push as-is.
5546 // 2. Err(payload): the thread panicked. Build a sentinel
5547 // report and attach
5548 // `exit_info: Some(WorkerExitInfo::Panicked(msg))`
5549 // where `msg` comes from `extract_panic_payload`.
5550 // 3. Timeout (5s elapsed without is_finished): emit a
5551 // tracing::warn and push a sentinel with
5552 // `exit_info: Some(WorkerExitInfo::TimedOut)` —
5553 // `worker_main` should have observed the per-worker
5554 // `stop` within 100ms, so a 5s no-show signals a
5555 // genuinely stuck worker (deadlock, infinite spin,
5556 // blocking syscall the runtime can't interrupt).
5557 // stop_and_collect does NOT process::exit on timeout —
5558 // the orphan thread keeps running until the test
5559 // harness exits, but any subsequent worker uses a
5560 // fresh per-worker `stop` so the orphan can't pollute
5561 // later runs.
5562 for mut tw in threads {
5563 // Drop start_tx (idempotent — `start()` may have already
5564 // taken it). If start() ran first, `start_tx` is
5565 // already `None` and the take is a no-op; if the caller
5566 // skipped start() entirely, dropping start_tx here
5567 // signals the worker via `Disconnected` so it exits
5568 // cleanly without the rendezvous send.
5569 tw.start_tx.take();
5570 let tid = tw.tid.load(Ordering::Acquire);
5571 if let Some(j) = tw.join.take() {
5572 match join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT) {
5573 Some(Ok(report)) => reports.push(report),
5574 Some(Err(payload)) => {
5575 let msg = extract_panic_payload(payload);
5576 eprintln!("ktstr: thread worker tid={tid} panicked: {msg}");
5577 reports.push(WorkerReport {
5578 tid,
5579 completed: false,
5580 exit_info: Some(WorkerExitInfo::Panicked(msg)),
5581 ..WorkerReport::default()
5582 });
5583 }
5584 None => {
5585 tracing::warn!(
5586 tid,
5587 timeout_secs = THREAD_JOIN_TIMEOUT.as_secs(),
5588 "thread worker did not join within timeout — leaking the \
5589 thread; sentinel report attached with TimedOut exit_info"
5590 );
5591 reports.push(WorkerReport {
5592 tid,
5593 completed: false,
5594 exit_info: Some(WorkerExitInfo::TimedOut),
5595 ..WorkerReport::default()
5596 });
5597 }
5598 }
5599 }
5600 }
5601
5602 reports
5603 }
5604
5605 /// Deliver the terminal stop to every live worker without
5606 /// reaping. Signal-only: NO `waitpid`, NO fd close, NO consume of
5607 /// `self`, no blocking of any kind. Reaping, the fd/region
5608 /// cleanup, and (for `stop_and_collect`) report decoding all
5609 /// belong to the subsequent [`Self::stop_and_collect`] or
5610 /// [`Drop`] — doing any of them here would double-reap. This only
5611 /// pre-delivers the terminal signal so that reap finds workers
5612 /// already exiting.
5613 ///
5614 /// # Why (fork mode — the case this exists for)
5615 ///
5616 /// When the sched_ext scheduler under test errors out, the kernel
5617 /// disables sched_ext and its tasks fall back to the builtin
5618 /// scheduler, where CPU-bound workers keep spinning under
5619 /// contention. The normal reap in `stop_and_collect` SIGKILLs one
5620 /// worker then *blocks* in `waitpid` for it while the rest have
5621 /// had only the cooperative SIGUSR1 and keep spinning, so each
5622 /// dying worker must win a CPU slice against every still-runnable
5623 /// sibling before it can run its exit path — the serial reap
5624 /// becomes scheduling-gated and teardown time scales with worker
5625 /// count. `kill_and_killpg(SIGKILL)` here is delivered, not
5626 /// cooperative: once every worker carries a pending SIGKILL none
5627 /// re-enters userspace, so no worker competes for the CPU while
5628 /// the reap drains them, and each killed worker's report-pipe
5629 /// write end closes so the reap's `poll` returns `POLLHUP`
5630 /// immediately instead of waiting out its deadline. One
5631 /// `kill_and_killpg` per `ForkedChild` covers both a plain worker
5632 /// and a pcomm container: SIGKILL on any thread is fatal to the
5633 /// whole thread group, and `killpg` additionally reaches
5634 /// descendants — every forked child is its own process-group
5635 /// leader via `setpgid(0, 0)`, so pgid == child pid (mirrors the
5636 /// `killpg` in [`Drop`]).
5637 ///
5638 /// # Thread mode
5639 ///
5640 /// Thread-mode workers share the harness tgid and cannot be
5641 /// signalled individually, so there is NO SIGKILL fast path for
5642 /// them. This flips each worker's cooperative `stop` flag — the
5643 /// same store `stop_and_collect` and [`Drop`] already do, just
5644 /// earlier — and the worker still only observes it on its next
5645 /// futex-wait wake (`WORKER_STOP_POLL_NS`, ~100 ms). It is
5646 /// therefore NOT a teardown speedup for thread mode; it is kept
5647 /// for symmetry and is cheap and harmless (an idempotent relaxed
5648 /// store). The win is fork-mode only, which is where the
5649 /// SIGKILL-vs-cooperative distinction exists.
5650 ///
5651 /// # Frozen cgroups
5652 ///
5653 /// A cgroup-frozen worker is not a special case here. The freezer
5654 /// parks a userspace task in `TASK_INTERRUPTIBLE` (the jobctl
5655 /// freezer trap), so a fatal signal wakes it and it exits on its
5656 /// next `get_signal` — SIGKILL kills a frozen worker with no
5657 /// unfreeze required. (`collect_step` / `collect_backdrop` still
5658 /// unfreeze each cgroup as their first teardown step for reasons of
5659 /// their own; this method does not depend on that.)
5660 pub(crate) fn sigkill_workers(&self) {
5661 for child in &self.children {
5662 kill_and_killpg(
5663 nix::unistd::Pid::from_raw(child.pid),
5664 nix::sys::signal::Signal::SIGKILL,
5665 );
5666 }
5667 for tw in &self.threads {
5668 tw.stop.store(true, Ordering::Relaxed);
5669 }
5670 }
5671}
5672
5673impl Drop for WorkloadHandle {
5674 fn drop(&mut self) {
5675 use nix::sys::signal::{Signal, kill};
5676 use nix::sys::wait::waitpid;
5677 use nix::unistd::{Pid, close};
5678
5679 // Forked children (conventional workers AND pcomm
5680 // containers). `pid` is `libc::pid_t` — stored as i32 so
5681 // `Pid::from_raw` receives the kernel's native representation
5682 // directly, not the sign-cast of a u32 that could alias
5683 // negative values (including -1, i.e. every process in the
5684 // session).
5685 for child in &self.children {
5686 let pid = child.pid;
5687 let nix_pid = Pid::from_raw(pid);
5688 // killpg first: reach descendants the child may have
5689 // forked (Custom workloads, ForkExit caught mid-fork).
5690 // pgid == child pid because every forked child (worker
5691 // or pcomm container) calls `setpgid(0, 0)` at fork time.
5692 // ESRCH (group gone / no members) is expected and not a
5693 // warning-worthy failure; swallow it to keep the log
5694 // clean when the common no-descendants case drops.
5695 if let Err(e) = nix::sys::signal::killpg(nix_pid, Signal::SIGKILL)
5696 && e != nix::errno::Errno::ESRCH
5697 {
5698 tracing::warn!(pid, %e, "killpg failed in WorkloadHandle::drop");
5699 }
5700 if let Err(e) = kill(nix_pid, Signal::SIGKILL) {
5701 tracing::warn!(pid, %e, "kill failed in WorkloadHandle::drop");
5702 }
5703 if let Err(e) = waitpid(nix_pid, None) {
5704 tracing::warn!(pid, %e, "waitpid failed in WorkloadHandle::drop");
5705 }
5706 for fd in [child.report_fd, child.start_fd] {
5707 if fd >= 0
5708 && let Err(e) = close(fd)
5709 {
5710 tracing::warn!(fd, %e, "close failed in WorkloadHandle::drop");
5711 }
5712 }
5713 }
5714 // Thread-mode workers: flip stop, drop start_tx (in case
5715 // worker hasn't yet recv'd), join with the same 5s budget
5716 // `stop_and_collect` uses. Threads share the parent's
5717 // address space — there is no `kill` equivalent and no
5718 // MAP_SHARED ownership to give back. Drop still applies
5719 // the timeout so a stuck worker doesn't pin
5720 // `WorkloadHandle::drop` indefinitely; on timeout we log
5721 // the leak via `tracing::warn!` and proceed.
5722 let threads = std::mem::take(&mut self.threads);
5723 for mut tw in threads {
5724 tw.stop.store(true, Ordering::Relaxed);
5725 tw.start_tx.take();
5726 if let Some(j) = tw.join.take() {
5727 let tid = tw.tid.load(Ordering::Acquire);
5728 match join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT) {
5729 Some(Ok(_)) => {}
5730 Some(Err(e)) => {
5731 let payload = extract_panic_payload(e);
5732 tracing::warn!(
5733 tid,
5734 payload,
5735 "thread worker panicked in WorkloadHandle::drop"
5736 );
5737 }
5738 None => {
5739 tracing::warn!(
5740 tid,
5741 timeout_secs = THREAD_JOIN_TIMEOUT.as_secs(),
5742 "thread worker failed to join within timeout in \
5743 WorkloadHandle::drop — leaking the thread"
5744 );
5745 }
5746 }
5747 }
5748 }
5749 // Close inter-worker pipe pairs and chain pipes AFTER worker
5750 // shutdown. Ordering matters for Thread mode: every worker
5751 // thread shares the parent's fd table, so closing a pipe fd
5752 // before its using thread joins would surface to that thread
5753 // as `EBADF` on the next read/write/poll syscall. The
5754 // children-reap loop above and the threads-join loop above
5755 // both block until their worker is reaped or joined; only
5756 // then do these closes run, which is when the workers are
5757 // guaranteed to no longer touch their fds. Fork mode is
5758 // unaffected either way: each child held its own fd-table
5759 // copy via `fork()`, so this close is a no-op for the
5760 // child's view (its own copy was closed by the post-fork
5761 // close-other-fds block in spawn_group).
5762 //
5763 // Errors from `close` are logged via `tracing::warn!` rather
5764 // than swallowed — `EBADF` here would indicate a double-close
5765 // (an aliased ownership bug) and is more diagnostic than the
5766 // SpawnGuard early-bail path's silent close. SpawnGuard's
5767 // Drop swallows EBADF deliberately because mid-spawn the
5768 // guard may share fd ownership with already-closed
5769 // half-allocated state; the handle on the other hand has
5770 // sole ownership at this point.
5771 for (ab, ba) in &self.pipe_pairs {
5772 for fd in [ab[0], ab[1], ba[0], ba[1]] {
5773 if let Err(e) = close(fd) {
5774 tracing::warn!(fd, %e, "close failed for pipe_pair fd in WorkloadHandle::drop");
5775 }
5776 }
5777 }
5778 for chain in &self.chain_pipes {
5779 for pipe in chain {
5780 for fd in [pipe[0], pipe[1]] {
5781 if let Err(e) = close(fd) {
5782 tracing::warn!(fd, %e, "close failed for chain_pipe fd in WorkloadHandle::drop");
5783 }
5784 }
5785 }
5786 }
5787 for (&ptr, &size) in self.futex_ptrs.iter().zip(self.futex_region_sizes.iter()) {
5788 unsafe {
5789 libc::munmap(ptr as *mut libc::c_void, size);
5790 }
5791 }
5792 if !self.iter_counters.is_null() && self.iter_counter_len > 0 {
5793 unsafe {
5794 libc::munmap(
5795 self.iter_counters as *mut libc::c_void,
5796 self.iter_counter_len * std::mem::size_of::<u64>(),
5797 );
5798 }
5799 }
5800 if !self.phase_epoch.is_null() && self.phase_epoch_bytes > 0 {
5801 unsafe {
5802 libc::munmap(
5803 self.phase_epoch as *mut libc::c_void,
5804 self.phase_epoch_bytes,
5805 );
5806 }
5807 }
5808 }
5809}
5810
5811/// SIGUSR1 handler installed in the fork-mode child post-fork. Flips
5812/// the per-process global [`STOP`] so `worker_main`'s outer loop
5813/// exits at the next `stop_requested` check.
5814pub(super) extern "C" fn sigusr1_handler(_: libc::c_int) {
5815 STOP.store(true, Ordering::Relaxed);
5816}
5817
5818/// Render an actionable hint for a failed
5819/// `mmap(MAP_SHARED | MAP_ANONYMOUS)` call based on the observed
5820/// `errno`. Shared between the futex-region mmap and the
5821/// iter_counters mmap in [`WorkloadHandle::spawn`] so the two
5822/// sites emit identical hint text per errno — a drift would mean
5823/// two related failures produce inconsistent remediation advice.
5824///
5825/// Takes `Option<i32>` (the output of `std::io::Error::raw_os_error`)
5826/// so an unrecognised errno folds cleanly through the `_ => ""`
5827/// arm without forcing callers to `unwrap`.
5828///
5829/// The leading space on every non-empty arm lets callers format
5830/// as `"...failed: {errno}{hint};"` without having to add a
5831/// conditional separator — an empty hint disappears cleanly.
5832pub(super) fn mmap_shared_anon_errno_hint(errno: Option<i32>) -> &'static str {
5833 match errno {
5834 Some(libc::ENOMEM) => {
5835 " (ENOMEM: host is out of memory \
5836 or /proc/sys/vm/max_map_count is too low — \
5837 check `sysctl vm.max_map_count` and `free -h`)"
5838 }
5839 Some(libc::EPERM) => {
5840 " (EPERM: MAP_SHARED|MAP_ANONYMOUS \
5841 rejected by the kernel — check memory cgroup \
5842 limits and container seccomp policy)"
5843 }
5844 Some(libc::EINVAL) => {
5845 " (EINVAL: invalid length or \
5846 flag combination — verify num_workers > 0 so the \
5847 region size is non-zero, and that the total size \
5848 does not overflow usize)"
5849 }
5850 _ => "",
5851 }
5852}
5853
5854#[cfg(test)]
5855mod testing;
5856#[cfg(test)]
5857mod tests_composed;
5858#[cfg(test)]
5859mod tests_fan_out;
5860#[cfg(test)]
5861mod tests_futex;
5862#[cfg(test)]
5863mod tests_grandchild;
5864#[cfg(test)]
5865mod tests_idle_churn;
5866#[cfg(test)]
5867mod tests_integration;
5868#[cfg(test)]
5869mod tests_lifecycle;
5870#[cfg(test)]
5871mod tests_mempolicy;
5872#[cfg(test)]
5873mod tests_misc;
5874#[cfg(test)]
5875mod tests_pcomm;
5876#[cfg(test)]
5877mod tests_sched_policy;
5878#[cfg(test)]
5879mod tests_spawn_guard;
5880#[cfg(test)]
5881mod tests_thread_mode;
5882#[cfg(test)]
5883mod tests_wake_chain;