ktstr/scenario/
payload_run.rs

1//! Runtime builder for launching a [`Payload`] from a test body.
2//!
3//! `ctx.payload(&X)` returns a [`PayloadRun`] whose chainable
4//! methods configure args, checks, and cgroup placement before the
5//! terminal `.run()` (foreground) or `.spawn()` (background)
6//! executes the binary inside the guest VM.
7//!
8//! `.run()` blocks until the child exits and returns
9//! `Result<(AssertResult, PayloadMetrics)>`. The builder is a pure
10//! guest-side std::process::Child wrapper — no cross-VM proxy.
11//!
//! `PayloadKind::Scheduler` payloads are rejected at `.run()` and
//! `.spawn()`: schedulers are launched by the framework at test
//! start, not by test-body invocation. Only `PayloadKind::Binary`
//! payloads are runnable via this builder.
16//!
17//! Args composition:
18//! 1. `payload.default_args` unless `.clear_args()` was called.
19//! 2. Plus any runtime `.arg(...)` / `.args(...)` appends.
20//!
21//! Checks composition is identical in shape.
22//!
23//! # Stdout-primary, stderr-fallback metric extraction
24//!
25//! The extraction pipeline runs `extract_metrics`
26//! against **stdout first**. When that returns an empty metric set
27//! AND stderr is non-empty, the extractor retries against stderr.
28//! This preserves the stdout-primary contract for well-behaved
29//! binaries (noisy stderr never corrupts the metric stream) while
30//! still handling payloads that emit their structured output only on
31//! stderr — e.g. schbench's default percentile tables via
32//! `show_latencies` → `fprintf(stderr, ...)`. The two streams are
33//! never merged: concurrent drain threads for stdout/stderr provide
34//! no ordering guarantee, so interleaving would corrupt any document
35//! whose bytes span both streams.
36//!
37//! Stderr is still forwarded verbatim into the exit-code-mismatch
38//! detail produced by [`MetricCheck::ExitCodeEq`] (see the
39//! `format_exit_mismatch` path) so failing binaries surface their
40//! error output directly.
41
42use std::borrow::Cow;
43use std::path::PathBuf;
44use std::sync::OnceLock;
45use std::sync::atomic::{AtomicUsize, Ordering};
46use std::thread::ThreadId;
47
48use std::time::Duration;
49
50use anyhow::{Context, Result, anyhow};
51
52use crate::assert::{AssertDetail, AssertResult, DetailKind};
53use crate::scenario::Ctx;
54use crate::test_support::{
55    Metric, MetricCheck, Payload, PayloadKind, PayloadMetrics, extract_metrics,
56};
57
/// Process-wide, monotonically increasing source of
/// payload-invocation indices.
///
/// One value is claimed each time a terminal call — `.run()`,
/// `.wait()`, `.kill()`, or a `.try_wait()` that observes the exit —
/// builds a [`crate::test_support::PayloadMetrics`]. The value is
/// stored in [`PayloadMetrics::payload_index`] so the host can tell
/// invocations apart regardless of the order in which their metrics
/// arrive. Each guest VM is a fresh process, so numbering restarts
/// at 0 on every test boot.
///
/// `Ordering::Relaxed` is sufficient: the only invariant required is
/// "no two callers ever receive the same value", and an atomic
/// read-modify-write guarantees that under any ordering, including
/// across threads. A stronger ordering would only be needed if the
/// index had to be ordered against *other* memory operations.
static PAYLOAD_INVOCATION_COUNTER: AtomicUsize = AtomicUsize::new(0);

/// Claim the next per-invocation index.
///
/// Returns the value [`PAYLOAD_INVOCATION_COUNTER`] held *before*
/// the increment, so the first caller in a process receives 0. See
/// the counter's documentation for the lifecycle and ordering
/// rationale.
fn next_payload_index() -> usize {
    AtomicUsize::fetch_add(&PAYLOAD_INVOCATION_COUNTER, 1, Ordering::Relaxed)
}
82
/// Builder returned by [`Ctx::payload`](crate::scenario::Ctx).
///
/// Configure the run via chainable methods, then invoke `.run()`
/// (foreground, blocking) or `.spawn()` (background) to execute the
/// payload's binary inside the guest VM and receive the extracted
/// [`crate::test_support::PayloadMetrics`] plus an [`AssertResult`] for any declared
/// [`MetricCheck`]s.
///
/// Every chainable method takes `self` by value and returns it, so
/// the builder is consumed by the terminal `.run()` / `.spawn()`.
pub struct PayloadRun<'a> {
    /// Scenario context this run was created from. Supplies the
    /// cgroup manager (`ctx.cgroups`) handed to the spawn helpers
    /// and is the input to cgroup path resolution.
    ctx: &'a Ctx<'a>,
    /// Static payload definition being launched: its name, kind,
    /// default args/checks, output format and process-group flag
    /// are all read from here.
    payload: &'static Payload,
    /// Effective argv. Initialized to `payload.default_args` on
    /// construction; `.arg`/`.args` append, `.clear_args` empties it.
    args: Vec<String>,
    /// Effective check list. Initialized to `payload.default_checks`;
    /// `.check` appends, `.clear_checks` empties it.
    checks: Vec<MetricCheck>,
    /// User-supplied relative cgroup name (set by `in_cgroup`); `None`
    /// when no placement was requested. The absolute path is resolved
    /// + validated at `.run()`/`.spawn()`.
    /// [`Cow`] keeps static-name callers zero-alloc while still
    /// accepting owned Strings from dynamic call sites.
    cgroup: Option<Cow<'static, str>>,
    /// Optional runtime bound for the foreground `.run()` path. `None`
    /// means wait indefinitely; `Some(duration)` arms a deadline
    /// watchdog that SIGKILLs the payload's process group if it has
    /// not exited by the deadline. Set via [`timeout`](Self::timeout).
    /// Ignored by `.spawn()` — background handles manage their own
    /// lifetime via [`PayloadHandle::wait`] / `.kill()` / `.try_wait()`.
    timeout: Option<Duration>,
}
112
113impl std::fmt::Debug for PayloadRun<'_> {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        f.debug_struct("PayloadRun")
116            .field("payload", &self.payload.name)
117            .field("args_len", &self.args.len())
118            .field("checks_len", &self.checks.len())
119            .field("cgroup", &self.cgroup)
120            .field("timeout", &self.timeout)
121            .finish()
122    }
123}
124
125impl<'a> PayloadRun<'a> {
126    pub(crate) fn new(ctx: &'a Ctx<'a>, payload: &'static Payload) -> Self {
127        let args = payload.default_args.iter().map(|s| s.to_string()).collect();
128        let checks = payload.default_checks.to_vec();
129        Self {
130            ctx,
131            payload,
132            args,
133            checks,
134            cgroup: None,
135            timeout: None,
136        }
137    }
138
139    /// Append one CLI argument to the effective argv.
140    #[must_use = "builder methods consume self; bind the result"]
141    pub fn arg(mut self, arg: impl Into<String>) -> Self {
142        self.args.push(arg.into());
143        self
144    }
145
146    /// Append multiple CLI arguments to the effective argv.
147    #[must_use = "builder methods consume self; bind the result"]
148    pub fn args<I, S>(mut self, args: I) -> Self
149    where
150        I: IntoIterator<Item = S>,
151        S: Into<String>,
152    {
153        self.args.extend(args.into_iter().map(Into::into));
154        self
155    }
156
157    /// Wipe ALL args (both `payload.default_args` and any prior
158    /// `.arg()` calls). Subsequent `.arg()` calls start from empty.
159    #[must_use = "builder methods consume self; bind the result"]
160    pub fn clear_args(mut self) -> Self {
161        self.args.clear();
162        self
163    }
164
165    /// Append a [`MetricCheck`] to the effective check list.
166    #[must_use = "builder methods consume self; bind the result"]
167    pub fn check(mut self, c: MetricCheck) -> Self {
168        self.checks.push(c);
169        self
170    }
171
172    /// Wipe ALL checks (both `payload.default_checks` and any prior
173    /// `.check()` calls).
174    #[must_use = "builder methods consume self; bind the result"]
175    pub fn clear_checks(mut self) -> Self {
176        self.checks.clear();
177        self
178    }
179
180    /// Place the spawned child in the named cgroup (a plain name,
181    /// resolved relative to `ctx.cgroups.parent_path()`). When
182    /// omitted, the child inherits the spawning process's cgroup.
183    ///
184    /// Accepts `&'static str` (zero-alloc, the common case of a
185    /// const cgroup name) or any owned string type via [`Cow`]'s
186    /// `From` impls.
187    ///
188    /// The name is validated at `.run()`/`.spawn()` — leading `/`
189    /// is stripped, `..` and NUL bytes are rejected.
190    #[must_use = "builder methods consume self; bind the result"]
191    pub fn in_cgroup(mut self, name: impl Into<Cow<'static, str>>) -> Self {
192        self.cgroup = Some(name.into());
193        self
194    }
195
196    /// Bound `.run()`'s wait for the payload to exit. `None` (the
197    /// default when `.timeout` is not called) waits indefinitely —
198    /// suitable for payloads whose runtime is bounded internally
199    /// (schbench `-r 10`, fio `--runtime`, ...). `Some(duration)`
200    /// arms a deadline watchdog inside `.run()` that SIGKILLs the
201    /// payload's whole process group if it has not exited by the
202    /// deadline. Ignored by `.spawn()` — background handles manage
203    /// their own timing.
204    ///
205    /// The builder shape keeps `.run()` zero-arg so non-timeout
206    /// call sites read naturally, and leaves room for future
207    /// knobs (per-test environment, stdin, …) without another
208    /// signature break.
209    #[must_use = "builder methods consume self; bind the result"]
210    pub fn timeout(mut self, duration: Duration) -> Self {
211        self.timeout = Some(duration);
212        self
213    }
214
215    /// Blocking foreground run. Spawns the payload binary, waits
216    /// for it to exit, extracts metrics from its output per the
217    /// payload's [`OutputFormat`](crate::test_support::OutputFormat) (stdout-primary with stderr
218    /// fallback for `Json`; no extraction for
219    /// `ExitCode`), and evaluates declared [`MetricCheck`]s into an
220    /// [`AssertResult`]. See the module-level
221    /// `# Stdout-primary, stderr-fallback metric extraction`
222    /// section for the full contract.
223    ///
224    /// Runtime is bounded by the value set via
225    /// [`timeout`](Self::timeout). When the deadline expires,
226    /// `kill_payload_process_group` fires and the returned
227    /// `(AssertResult, PayloadMetrics)` reflects the captured
228    /// output plus the killed-child exit code; `status.code()`
229    /// returns `None` for a SIGKILL'd child, which
230    /// `spawn_and_wait` surfaces as `exit_code = -1` in
231    /// `SpawnOutput`. The timeout case is not an error — the
232    /// caller can still inspect metrics collected before the kill.
233    /// A post-kill drain failure is reported as `Err` (wraps the
234    /// original I/O error with "drain after timeout of N"); the
235    /// caller loses no output that was already captured because
236    /// the partial reader-thread buffers have been consumed in
237    /// the error path too.
238    ///
239    /// Metrics are also recorded to the per-test sidecar via the
240    /// SHM ring; the returned tuple is a convenience view of the
241    /// same values.
242    ///
243    /// Returns `Err` when the payload is not
244    /// [`PayloadKind::Binary`] (schedulers are framework-launched,
245    /// not test-body-launched), when the cgroup name fails
246    /// validation, when the spawn itself fails, or when post-kill
247    /// drain fails (see the timeout paragraph).
248    pub fn run(self) -> Result<(AssertResult, PayloadMetrics)> {
249        let binary = payload_binary(self.payload)?;
250        let cgroup_path = resolve_cgroup_path(self.ctx, self.cgroup.as_deref(), "PayloadRun::run")?;
251        let cgroup_arg: Option<(&str, &std::path::Path)> = cgroup_path.as_deref().map(|path| {
252            (
253                self.cgroup
254                    .as_deref()
255                    .expect("cgroup_path Some ⇒ cgroup name Some"),
256                path,
257            )
258        });
259        let output = spawn_and_wait(
260            binary,
261            &self.args,
262            cgroup_arg,
263            self.timeout,
264            self.payload.uses_parent_pgrp,
265            self.ctx.cgroups,
266        )
267        .with_context(|| format!("spawn payload '{}'", self.payload.name))?;
268        Ok(evaluate(self.payload, &self.checks, output))
269    }
270
271    /// Spawn the payload binary in the background and return a
272    /// [`PayloadHandle`] the caller can `.wait()`, `.kill()`, or
273    /// `.try_wait()` on.
274    ///
275    /// The child runs in the guest's process namespace (all ktstr
276    /// tests execute inside the VM); `PayloadHandle` is a thin
277    /// wrapper over [`std::process::Child`]. No cross-VM proxy.
278    ///
279    /// Dropping the handle without first calling one of the waiters
280    /// emits a stderr warning and SIGKILLs the child — leaked
281    /// handles would lose metrics and potentially outlive the test.
282    ///
283    /// Returns `Err` when the payload is not
284    /// [`PayloadKind::Binary`] or when the spawn itself fails.
285    pub fn spawn(self) -> Result<PayloadHandle> {
286        let binary = payload_binary(self.payload)?;
287        let cgroup_path =
288            resolve_cgroup_path(self.ctx, self.cgroup.as_deref(), "PayloadRun::spawn")?;
289        let cgroup_arg: Option<(&str, &std::path::Path)> = cgroup_path.as_deref().map(|path| {
290            (
291                self.cgroup
292                    .as_deref()
293                    .expect("cgroup_path Some ⇒ cgroup name Some"),
294                path,
295            )
296        });
297        let (child, sigchld) = spawn_child(
298            binary,
299            &self.args,
300            cgroup_arg,
301            self.payload.uses_parent_pgrp,
302            self.ctx.cgroups,
303        )
304        .with_context(|| format!("spawn payload '{}'", self.payload.name))?;
305        Ok(PayloadHandle {
306            child: Some(child),
307            payload: self.payload,
308            checks: self.checks,
309            _sigchld: sigchld,
310        })
311    }
312}
313
314/// Unwrap [`PayloadKind::Binary`] to its binary name, erroring when
315/// a scheduler-kind payload is passed.
316fn payload_binary(payload: &Payload) -> Result<&'static str> {
317    match payload.kind {
318        PayloadKind::Binary(name) => Ok(name),
319        PayloadKind::Scheduler(_) => anyhow::bail!(
320            "ctx.payload({}) called on a scheduler-kind payload; \
321             schedulers are launched by the test framework, not from \
322             the test body. Use ctx.payload(&BINARY_PAYLOAD) instead.",
323            payload.name,
324        ),
325    }
326}
327
328/// Common post-exit pipeline: extract metrics, resolve polarities,
329/// evaluate checks. Shared between foreground `.run()` and
330/// background handle `wait`/`kill` paths. The [`crate::test_support::PayloadMetrics`] is
331/// serialized to the guest-to-host SHM ring here — once per
332/// invocation — so the host can reconstruct per-call provenance in
333/// the sidecar without any Ctx-side accumulator.
334///
335/// # Per-format behavior
336///
337/// `OutputFormat::ExitCode` and `OutputFormat::Json` extract
338/// in-process: stdout-primary with a stderr fallback when stdout
339/// yields an empty metric set. The streams are never concatenated —
340/// the two drain threads in [`wait_and_capture`] run concurrently and
341/// provide no ordering guarantee, so a merged blob would corrupt any
342/// document whose bytes span both. Stderr is still passed separately
343/// to [`evaluate_checks`] so the exit-code-mismatch detail renders
344/// stderr without stdout prefix.
345fn evaluate(
346    payload: &Payload,
347    checks: &[MetricCheck],
348    output: SpawnOutput,
349) -> (AssertResult, PayloadMetrics) {
350    // `extract_metrics` is infallible for both `ExitCode` and `Json`
351    // (the two OutputFormat variants); the `Result` return type is
352    // preserved only for a uniform call signature.
353    let stdout_result = extract_metrics(
354        &output.stdout,
355        &payload.output,
356        crate::test_support::MetricStream::Stdout,
357    );
358    let mut metrics = stdout_result.unwrap_or_default();
359    if metrics.is_empty() && !output.stderr.is_empty() {
360        // Stderr fallback — runs only when stdout produced no
361        // metrics. Variant-agnostic by design:
362        //
363        // * `ExitCode`: `extract_metrics` returns `Ok(vec![])` on
364        //   both stdout and stderr for this variant (no parsing
365        //   path), so running the fallback is a no-op — no stored
366        //   state, no wasted work beyond one function call. A
367        //   per-variant gate would be complexity without behavioral
368        //   difference.
369        // * `Json`: BENEFITS from the fallback. The motivating case
370        //   is schbench-like payloads that write structured output
371        //   to stderr only (see `SchbenchJsonPayload` in
372        //   tests/common/fixtures.rs for the long-form rationale).
373        //
374        // The streams are never merged — fallback replaces, not
375        // concatenates — so an upstream that genuinely writes to
376        // both stdout and stderr gets only the stdout metrics,
377        // which matches the "well-behaved binaries keep stdout
378        // canonical" language on the `OutputFormat` doc.
379        let stderr_result = extract_metrics(
380            &output.stderr,
381            &payload.output,
382            crate::test_support::MetricStream::Stderr,
383        );
384        if let Ok(m) = stderr_result {
385            metrics = m;
386        }
387    }
388    resolve_polarities(&mut metrics, payload);
389
390    let payload_metrics = PayloadMetrics {
391        payload_index: next_payload_index(),
392        metrics,
393        exit_code: output.exit_code,
394    };
395
396    emit_payload_metrics(&payload_metrics);
397
398    let result = evaluate_checks(checks, &payload_metrics, &output.stderr);
399    (result, payload_metrics)
400}
401
402/// Emit a [`crate::test_support::PayloadMetrics`] on the guest-to-host bulk data channel
403/// (virtio-console port 1) under
404/// `MSG_TYPE_PAYLOAD_METRICS`(crate::vmm::wire::MSG_TYPE_PAYLOAD_METRICS).
405///
406/// The encoding (postcard v1) and the bulk-port fire-and-forget
407/// semantics live inside
408/// [`crate::vmm::guest_comms::send_payload_metrics`]; this thin
409/// wrapper exists only so the call site reads as the post-extraction
410/// emit step rather than reaching across modules. Backpressure is
411/// handled inside `write_msg`: a busy port-1 virtqueue blocks the
412/// writer until the host's `add_used` rate catches up.
413///
414/// Forwards the underlying `send_payload_metrics` boolean: `true`
415/// when the frame was written, `false` when the bulk port was not
416/// open, the write failed, or the call ran in host context (the
417/// `assert_guest_context` early-return). The emit call site in
418/// `evaluate` discards it at statement position; the boolean exists
419/// so the host-context no-op is observable to a test.
420fn emit_payload_metrics(pm: &PayloadMetrics) -> bool {
421    crate::vmm::guest_comms::send_payload_metrics(pm)
422}
423
424// ---------------------------------------------------------------------------
425// PayloadHandle — background spawn result
426// ---------------------------------------------------------------------------
427
/// Handle to a background payload spawned via
/// [`PayloadRun::spawn`]. Wraps a guest-local
/// [`std::process::Child`]; `wait` / `kill` both consume the handle
/// and return the collected metrics + assertion verdict, while
/// `try_wait` borrows it and only finalizes once the child has
/// exited.
///
/// Drop behavior: if the handle is dropped without `wait`/`kill`,
/// the child and every process it forked are SIGKILLed via the
/// process group headed by the child, then the child is reaped with
/// `child.wait()`, and a stderr warning is emitted so the test
/// author sees the implicit drop. The process-group kill reaches
/// every descendant of multi-process payloads (stress-ng, schbench
/// worker mode, fio `--numjobs`); without it the orphans keep
/// stdout/stderr open, block `wait_and_capture`, and lose metrics.
///
/// When multiple handles are active, sidecar entries appear in
/// finalization order (the order `.wait()`, `.kill()`, or
/// `.try_wait()` returning `Ok(Some(..))` are called), not spawn
/// order. `.try_wait()` only records on its terminal branch; an
/// `Ok(None)` return keeps the handle live and defers the sidecar
/// write to the next terminal call.
// DROP-ORDER-CRITICAL: keep `_sigchld` LAST in the field list.
//
// Rust runs the type's own `Drop::drop` first and then drops fields
// in declaration order, so the LAST field is the LAST to drop. That
// keeps the `SigchldScope` alive across the kill + `child.wait()`
// performed by the `Drop` impl and across the drop of every other
// field. `SigchldScope::drop` re-installs the previously captured
// disposition; multiple live `SigchldScope`s on the same thread form
// a LIFO stack whose unwinding is what restores the outermost
// original disposition. A field added between `child` and
// `_sigchld` still leaves `_sigchld` last, but a field added AFTER
// `_sigchld` would be dropped after the disposition has already
// been restored, breaking the invariant that the guard outlives
// everything else in the handle.
//
// Also: when the caller holds two `PayloadHandle`s and drops them
// in non-creation order, the LIFO invariant is preserved by the
// field-order rule WITHIN each handle but not ACROSS handles —
// non-LIFO drop across handles will leak `SIG_DFL` into the rest of
// the process. Test authors must drop handles in reverse creation
// order; the LIFO test in the unit-test module pins the
// within-handle field-order rule to catch a future refactor that
// re-orders the struct.
#[must_use = "dropping a PayloadHandle SIGKILLs the child's process group; call .wait() or .kill() explicitly"]
pub struct PayloadHandle {
    /// Live child process. Wrapped in `Option` so `wait` / `kill` /
    /// `try_wait` can `take()` ownership of it; the `Drop` impl then
    /// finds `None` and skips its kill-and-warn path.
    child: Option<std::process::Child>,
    /// Static payload definition the child was spawned from; supplies
    /// the name used in errors and the process-group flag passed to
    /// the kill helper.
    payload: &'static Payload,
    /// Checks captured from the builder at spawn time, evaluated
    /// against the metrics once the child is finalized.
    checks: Vec<MetricCheck>,
    /// `SIGCHLD` guard installed at spawn time. Kept alive until
    /// the handle is consumed (via `wait`/`kill`/Drop) so the
    /// child's eventual `waitpid` sees `SIG_DFL` instead of the
    /// guest init's `SIG_IGN`. See [`SigchldScope`] for the full
    /// rationale.
    ///
    /// DROP-ORDER-CRITICAL: keep this field LAST. See the comment
    /// directly above the struct definition.
    _sigchld: SigchldScope,
}
486
487impl std::fmt::Debug for PayloadHandle {
488    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
489        // Payload's manual Debug renders identity fields; the inner
490        // Child is omitted (not Debug-rendering-friendly and carries
491        // OS handles) — a one-line summary is enough for panics /
492        // test output.
493        f.debug_struct("PayloadHandle")
494            .field("payload", &self.payload.name)
495            .field("child_alive", &self.child.is_some())
496            .field("checks_len", &self.checks.len())
497            .finish()
498    }
499}
500
501impl PayloadHandle {
502    /// Name of the [`Payload`] this handle was spawned from — i.e.
503    /// the identity key used by step-level ops to address a running
504    /// payload. Step-local ops ([`Op::WaitPayload`](crate::scenario::ops::Op::WaitPayload),
505    /// [`Op::KillPayload`](crate::scenario::ops::Op::KillPayload))
506    /// match handles by this name.
507    pub fn payload_name(&self) -> &'static str {
508        self.payload.name
509    }
510
511    /// Live child's OS-level pid, or `None` once `wait`/`kill`/
512    /// `try_wait` has consumed the child.
513    ///
514    /// Integration tests that spawn a workload and then need to
515    /// target it with a second tool (for example the jemalloc-TLS
516    /// probe in `tests/jemalloc_probe_tests.rs`, which passes the
517    /// workload's pid to `ktstr-jemalloc-probe --pid`) read this
518    /// value between `spawn` and `wait`/`kill`/`try_wait`. The
519    /// internal fork-descendant reap test also uses it to probe
520    /// the process group via `killpg(_, 0)` after `kill()` without
521    /// reaching into the private `child` field.
522    pub fn pid(&self) -> Option<u32> {
523        self.child.as_ref().map(|c| c.id())
524    }
525
526    /// Block until the child exits naturally, then extract metrics
527    /// and evaluate checks, matching the foreground `.run()` return
528    /// shape.
529    ///
530    /// Metrics are also recorded to the per-test sidecar via the
531    /// SHM ring; the returned tuple is a convenience view of the
532    /// same values.
533    pub fn wait(mut self) -> Result<(AssertResult, PayloadMetrics)> {
534        let mut child = self
535            .child
536            .take()
537            .ok_or_else(|| already_consumed(self.payload))?;
538        // Block until the leader exits naturally first, BEFORE
539        // spawning reader threads inside `wait_and_capture`. Then
540        // killpg the group to reap any descendants (stress-ng workers,
541        // schbench worker mode, fio --numjobs threads) that survived
542        // the leader and still hold the inherited stdout/stderr pipes
543        // open. Without this killpg-before-drain step the reader
544        // threads would block forever waiting for descendants to
545        // release the pipes — the same blocking-pipe failure mode
546        // that try_wait's kill_payload_process_group step guards
547        // against. `Child::wait`
548        // caches the exit status so the second call inside
549        // `wait_and_capture` returns immediately without a syscall.
550        if let Err(e) = child.wait() {
551            kill_payload_process_group(&child, self.payload.name, self.payload.uses_parent_pgrp);
552            let _ = child.wait();
553            return Err(e).with_context(|| format!("wait payload '{}'", self.payload.name));
554        }
555        kill_payload_process_group(&child, self.payload.name, self.payload.uses_parent_pgrp);
556        match wait_and_capture(&mut child) {
557            Ok(output) => Ok(evaluate(self.payload, &self.checks, output)),
558            Err(e) => {
559                // killpg already ran above; one more `wait` clears
560                // the zombie so the pid slot is freed regardless of
561                // the capture error.
562                let _ = child.wait();
563                Err(e).with_context(|| format!("wait payload '{}'", self.payload.name))
564            }
565        }
566    }
567
568    /// SIGKILL the child **and every process it forked**, reap it,
569    /// and return whatever stdout+stderr was captured along with the
570    /// process exit code. Suitable for time-boxed background loads.
571    ///
572    /// The signal is delivered via `killpg(child_pid, SIGKILL)`
573    /// rather than `child.kill()` because `build_command` places the
574    /// payload at the head of its own process group. Multi-process
575    /// payloads (stress-ng, schbench worker mode, fio --numjobs) fork
576    /// descendants that keep stdout/stderr open; killing only the
577    /// head would orphan those writers and block
578    /// `wait_and_capture` forever, losing every metric.
579    ///
580    /// Metrics are also recorded to the per-test sidecar via the
581    /// SHM ring; the returned tuple is a convenience view of the
582    /// same values.
583    pub fn kill(mut self) -> Result<(AssertResult, PayloadMetrics)> {
584        let mut child = self
585            .child
586            .take()
587            .ok_or_else(|| already_consumed(self.payload))?;
588        kill_payload_process_group(&child, self.payload.name, self.payload.uses_parent_pgrp);
589        match wait_and_capture(&mut child) {
590            Ok(output) => Ok(evaluate(self.payload, &self.checks, output)),
591            Err(e) => {
592                // killpg + single-pid SIGKILL already ran at the
593                // start; the reap or pipe-drain failed afterwards.
594                // One more `wait` clears the zombie so the pid slot
595                // is freed regardless of the capture error.
596                let _ = child.wait();
597                Err(e).with_context(|| format!("reap killed payload '{}'", self.payload.name))
598            }
599        }
600    }
601
602    /// Non-blocking check for exit without consuming the handle.
603    /// Returns `Ok(Some((result, metrics)))` once the child has
604    /// exited and output is drained; `Ok(None)` while still
605    /// running. The handle remains live on `Ok(None)`.
606    ///
607    /// On the terminal `Ok(Some(..))` return, metrics are also
608    /// recorded to the per-test sidecar via the SHM ring; the
609    /// returned tuple is a convenience view of the same values.
610    pub fn try_wait(&mut self) -> Result<Option<(AssertResult, PayloadMetrics)>> {
611        let child = self
612            .child
613            .as_mut()
614            .ok_or_else(|| already_consumed(self.payload))?;
615        match child.try_wait()? {
616            Some(_status) => {
617                // `child` was Some above; the earlier branch didn't
618                // `take()` it, so this unwrap is guaranteed to hold.
619                let mut child = self
620                    .child
621                    .take()
622                    .expect("child still present on terminal branch");
623                // The leader has exited (try_wait returned Some), but
624                // descendants forked off the leader (stress-ng workers,
625                // schbench worker mode, fio --numjobs threads) may
626                // still be alive and holding the inherited
627                // stdout/stderr pipes open. Without first SIGKILLing
628                // the process group, `wait_and_capture` would block
629                // forever on the read syscall waiting for those
630                // descendants to release the pipes. This mirrors the
631                // kill() path's kill_payload_process_group call which
632                // always SIGKILLs the group before draining — the
633                // only difference here is
634                // the leader has already exited, so killpg is reaping
635                // descendants only.
636                kill_payload_process_group(
637                    &child,
638                    self.payload.name,
639                    self.payload.uses_parent_pgrp,
640                );
641                match wait_and_capture(&mut child) {
642                    Ok(output) => Ok(Some(evaluate(self.payload, &self.checks, output))),
643                    Err(e) => {
644                        // killpg + single-pid SIGKILL already ran at
645                        // the top of this branch; the reap or
646                        // pipe-drain failed afterwards. One more
647                        // `wait` clears the zombie so the pid slot is
648                        // freed regardless of the capture error.
649                        let _ = child.wait();
650                        Err(e).with_context(|| format!("reap payload '{}'", self.payload.name))
651                    }
652                }
653            }
654            None => Ok(None),
655        }
656    }
657}
658
659/// Error value produced when `wait`/`kill`/`try_wait` is called on a
660/// handle whose child has already been taken by a prior call. The
661/// payload name anchors the error to a specific handle so the
662/// test log points directly at the misuse site.
663fn already_consumed(payload: &'static Payload) -> anyhow::Error {
664    anyhow!(
665        "PayloadHandle for '{}' already consumed by a prior \
666         wait/kill/try_wait call; each handle can only produce \
667         one (AssertResult, PayloadMetrics) pair",
668        payload.name,
669    )
670}
671
672/// Drop-safety net for handles that fall out of scope without
673/// going through [`PayloadHandle::wait`], [`PayloadHandle::kill`],
674/// or [`PayloadHandle::try_wait`] (the three paths that
675/// `.take()` the child normally). Drop routes the process group
676/// through `kill_payload_process_group` — the SAME kill path the
677/// explicit `kill()` method uses — so there is no redundant
678/// `child.kill()` call: the killpg + single-pid SIGKILL inside
679/// `kill_payload_process_group` is belt-and-suspenders-by-design
680/// (see its doc for the pre-exec ESRCH race rationale), not
681/// two independent kills stacked. `child.wait()` reaps the
682/// zombie so the pid slot is freed even on the "dropped without
683/// consume" path, and the one-shot eprintln tells the operator
684/// metrics were lost.
685impl Drop for PayloadHandle {
686    fn drop(&mut self) {
687        if let Some(mut child) = self.child.take() {
688            kill_payload_process_group(&child, self.payload.name, self.payload.uses_parent_pgrp);
689            let _ = child.wait();
690            eprintln!(
691                "ktstr: PayloadHandle for '{}' dropped without wait/kill — \
692                 process group SIGKILLed, metrics not recorded.",
693                self.payload.name,
694            );
695        }
696    }
697}
698
699/// Send `SIGKILL` to the process group headed by `child` AND to the
700/// leader pid directly.
701///
702/// `build_command` requests `CommandExt::process_group(0)` by default
703/// so the child's pid becomes its own process-group leader, coordinated
704/// with exec setup by the standard library. `killpg(pgid, SIGKILL)`
705/// on the child's pid therefore reaches every fork descendant in
706/// one shot — a single `child.kill()` would otherwise miss
707/// grandchildren of multi-process payloads (stress-ng, schbench
708/// worker mode, fio --numjobs) and those orphans would keep the
709/// stdout/stderr pipes open, hanging `wait_and_capture` forever.
710///
711/// When `uses_parent_pgrp` is `true`, the child shares its parent's
712/// pgrp ([`Payload::uses_parent_pgrp`] opted out of the fresh
713/// process group for tty-dependent binaries). The `killpg` call is
714/// skipped entirely in that case — issuing it would either hit
715/// `ESRCH` (child is not a pgrp leader) in the common case or, worse,
716/// target an unrelated group if the pgrp id happened to match a stale
717/// value. Only the direct `kill(pid)` on the leader runs; opt-out
718/// payloads accept responsibility for cleaning up their own
719/// descendants.
720///
721/// The follow-up `kill(pid, SIGKILL)` on the leader pid is
722/// belt-and-suspenders coverage for the edge case where `killpg`
723/// alone is insufficient: the kernel-side pgid transition during
724/// exec may not have completed yet when `killpg` fires, so
725/// `killpg` returns `ESRCH` (no such group) and the leader
726/// survives. A direct `kill(pid, SIGKILL)` always reaches the
727/// leader, and the SIGKILL survives `execve(2)` to take effect
728/// once exec completes (signal disposition is preserved across
729/// exec; the pending signal is delivered once the new image
730/// starts). SIGKILL is idempotent against zombies and
731/// already-dead processes, so the extra signal is safe after a
732/// successful `killpg` — a killpg that reached the leader has
733/// already queued it for SIGKILL, and the follow-up `kill(pid)`
734/// is a no-op on the terminated process.
735///
/// `child.id()` returns `u32` for API ergonomics; on Linux the
/// kernel's `pid_max ≤ 2²²` guarantees the value fits in
/// [`libc::pid_t`]'s positive `i32` range, so `try_from` succeeds on
/// every live child. The theoretically-impossible cases — a
/// non-positive pid, or one outside `pid_t`'s range — are logged via
/// `tracing::error!` and the function returns without sending any
/// signal, so [`nix::sys::signal::killpg`] is never handed a value
/// it would interpret as a broadcast target. `ESRCH` is logged at
/// debug level and otherwise treated as a no-op for both calls —
/// it means either "group/process already reaped" or "group not
/// yet set up"; the follow-up direct `kill` plus the leader's
/// eventual `waitpid` consumer handle both.
746///
747/// # Process-group escape (not handled here)
748///
749/// `killpg` reaches every process whose `getpgrp()` equals the
750/// leader's pgid. A descendant that calls `setpgid(0, 0)` or
751/// `setsid(2)` between fork and exit leaves the leader's process
752/// group and becomes invisible to this helper — the escaping
753/// descendant keeps running after SIGKILL and may continue holding
754/// pipe fds that stall `wait_and_capture`. The bundled payloads
755/// (stress-ng, schbench, fio) have not been audited for these
756/// syscalls. `build_command` does not place an exec'd
757/// child into any other group; this limitation applies only to
758/// third-party payloads that deliberately re-parent themselves. The
759/// mitigation path is the caller's: wrap the misbehaving payload in
760/// a shell that traps SIGTERM → SIGKILL of its own descendants, or
761/// register the leader as a subreaper
762/// (`PR_SET_CHILD_SUBREAPER`) and reap orphans explicitly.
763///
764/// # Caller contract
765///
766/// Every caller MUST hold a live [`SigchldScope`] for the duration of
767/// the `wait` / `waitpid` that reaps the leader after this call
768/// returns. Without `SIG_DFL` for `SIGCHLD`, the guest init's
769/// `SIG_IGN` default causes `wait` to block until the child is
770/// re-reaped by init or to return `ECHILD` on an already-ignored
771/// SIGCHLD. Audited caller set — every invocation of this function:
772///
773/// - `PayloadHandle::wait` (one site: error arm after a
774///   `wait_and_capture` failure) — holds `self._sigchld`.
775/// - `PayloadHandle::kill` (one site: top of the method, before
776///   drain) — holds `self._sigchld`.
/// - `PayloadHandle::try_wait` (one site: top of the terminal-status
///   branch, before `wait_and_capture` drains the pipes) — holds
///   `self._sigchld`.
779/// - `impl Drop for PayloadHandle` (one site: handle dropped without
780///   an explicit `wait`/`kill`/`try_wait` consume) — holds
781///   `self._sigchld` for the full Drop body.
782/// - `spawn_and_wait` (one site: error arm when `wait_and_capture`
783///   fails on a timeout-less foreground spawn) — opens a local
784///   `let _sigchld = SigchldScope::new()` at the top of the
785///   function.
786/// - `wait_with_deadline` (two sites: deadline-miss kill on expiry,
787///   and error arm for drain failure on natural child exit) — runs
788///   inside `spawn_and_wait`'s `_sigchld` scope, which is held
789///   across the callee as a local binding.
790///
791/// Every `PayloadHandle` method is safe because `_sigchld` is
792/// declared after `child` in the struct body; Rust drops fields in
793/// declaration order so `_sigchld` lives longer than the child
794/// `Option`, keeping the scope live for the full method body.
795///
796/// A future caller that skips either pattern will see
797/// `waitpid` hang on some guest runtimes — add a `SigchldScope` at
798/// the call site, or extend an enclosing type with a
799/// `_sigchld: SigchldScope` field, before landing.
800fn kill_payload_process_group(
801    child: &std::process::Child,
802    payload_name: &str,
803    uses_parent_pgrp: bool,
804) {
805    let raw_pid = child.id();
806    let pgid = match libc::pid_t::try_from(raw_pid) {
807        Ok(p) if p > 0 => p,
808        Ok(p) => {
809            tracing::error!(
810                payload = payload_name,
811                pid = p,
812                "child pid is non-positive — cannot kill process group; \
813                 skipping kill (kernel's pid_max invariant violated, \
814                 no safe target)"
815            );
816            return;
817        }
818        Err(_) => {
819            tracing::error!(
820                payload = payload_name,
821                pid = raw_pid,
822                "child pid exceeds pid_t range — cannot kill process group; \
823                 skipping kill (Linux pid_max is 2^22 so this is only \
824                 reachable on a non-Linux target or a kernel with an \
825                 extended pid space)"
826            );
827            return;
828        }
829    };
830    let pid = nix::unistd::Pid::from_raw(pgid);
831    // `uses_parent_pgrp=true` means `build_command` did NOT request
832    // `process_group(0)`, so the child shares its parent's process
833    // group. A `killpg(pgid=child_pid, …)` call would target a group
834    // the child does not lead — `ESRCH` in the common case, or (worse)
835    // reach the ktstr process itself if a stale pid matches. Skip the
836    // group kill entirely and rely on the direct `kill(pid)` below to
837    // reap the leader. Multi-process tty-dependent payloads that
838    // opt out of the fresh pgrp accept responsibility for their own
839    // descendant cleanup (see `Payload::uses_parent_pgrp` doc).
840    if !uses_parent_pgrp {
841        match nix::sys::signal::killpg(pid, nix::sys::signal::Signal::SIGKILL) {
842            Ok(()) => {}
843            Err(nix::errno::Errno::ESRCH) => {
844                tracing::debug!(
845                    payload = payload_name,
846                    pgid,
847                    "ESRCH — payload process group already reaped",
848                );
849            }
850            Err(e) => {
851                tracing::warn!(payload = payload_name, pgid, %e, "killpg failed for payload process group");
852            }
853        }
854    }
855    match nix::sys::signal::kill(pid, nix::sys::signal::Signal::SIGKILL) {
856        Ok(()) => {}
857        Err(nix::errno::Errno::ESRCH) => {
858            tracing::debug!(
859                payload = payload_name,
860                pid = pgid,
861                "ESRCH — payload leader already reaped",
862            );
863        }
864        Err(e) => {
865            tracing::warn!(payload = payload_name, pid = pgid, %e, "direct kill failed for payload leader");
866        }
867    }
868}
869
870/// Resolve each extracted metric's polarity + unit against the
871/// payload's declared `metrics` hints.
872///
873/// Unhinted metrics keep [`crate::test_support::Polarity::Unknown`] and empty unit.
874///
875/// Complexity: O(N + M) — build a `HashMap<&str, &MetricHint>` from
876/// the hint slice once, then look up each metric by name in O(1).
877/// The prior linear-scan implementation was O(N × M) where N is
878/// extracted metrics and M is declared hints; fio JSON with
879/// thousands of leaves + a dozen hints was the hottest path this
880/// module sees per payload run.
881fn resolve_polarities(metrics: &mut [Metric], payload: &Payload) {
882    if payload.metrics.is_empty() || metrics.is_empty() {
883        return;
884    }
885    let hints: std::collections::HashMap<&str, &crate::test_support::MetricHint> =
886        payload.metrics.iter().map(|h| (h.name, h)).collect();
887    for metric in metrics {
888        if let Some(hint) = hints.get(metric.name.as_str()) {
889            metric.polarity = hint.polarity;
890            metric.unit = hint.unit.to_string();
891        }
892    }
893}
894
895/// Evaluate [`MetricCheck`]s against a [`crate::test_support::PayloadMetrics`] and fold the
896/// verdict into an [`AssertResult`].
897///
898/// Evaluation order:
899/// 1. [`MetricCheck::ExitCodeEq`] pre-pass — evaluated FIRST so a
900///    misconfigured binary fails with an actionable exit-code error
901///    rather than "metric X not found".
902/// 2. Metric-path checks ([`MetricCheck::Min`], [`MetricCheck::Max`],
903///    [`MetricCheck::Range`], [`MetricCheck::Exists`]).
904///
905/// `stderr` is folded into the exit-code-mismatch detail when
906/// present — when a binary fails with "expected 0 got 1", the
907/// captured stderr almost always explains why, and forcing the test
908/// author to go hunt it down defeats actionable diagnostics.
909///
910/// Missing metrics fail loudly — a `Min` / `Max` / `Range` / `Exists`
911/// check against an absent metric reports a "not found" detail
912/// instead of silently passing.
913fn evaluate_checks(checks: &[MetricCheck], pm: &PayloadMetrics, stderr: &str) -> AssertResult {
914    let mut result = AssertResult::pass();
915    // Pre-pass: exit-code checks first. Delegates to
916    // `exit_code_mismatch_detail` for the detail's kind + message.
917    // Short-circuit on mismatch — a bad exit probably means the
918    // metric extraction found nothing useful.
919    if let Some(detail) = exit_code_mismatch_detail(checks, pm.exit_code, stderr) {
920        result.merge(AssertResult::fail(detail));
921        return result;
922    }
923    // Metric-path pass.
924    //
925    // Each comparator below routes a NaN observed value through
926    // `nan_metric` BEFORE the bound comparison. IEEE 754 makes
927    // every comparison against NaN evaluate to false (`NaN < x`,
928    // `NaN > x`, and `NaN == x` are all false), which would let
929    // a NaN-valued metric silently pass `Min` / `Max` / `Range` —
930    // exactly the case operators most need to flag, since a NaN
931    // value indicates the payload's metric extraction itself is
932    // broken (a divide-by-zero, an unparsed token, or a typed-
933    // measurement error). Surface NaN as a hard failure with a
934    // dedicated message so the bound check never silently green-
935    // lights an unmeasurable value.
936    for check in checks {
937        let detail = match check {
938            MetricCheck::Min { metric, value } => pm.get(metric).map_or_else(
939                || Some(missing_metric(metric)),
940                |actual| {
941                    if actual.is_nan() {
942                        Some(nan_metric(metric))
943                    } else if actual < *value {
944                        Some(AssertDetail::new(
945                            DetailKind::Other,
946                            format!("metric '{metric}' = {actual} below minimum {value}"),
947                        ))
948                    } else {
949                        None
950                    }
951                },
952            ),
953            MetricCheck::Max { metric, value } => pm.get(metric).map_or_else(
954                || Some(missing_metric(metric)),
955                |actual| {
956                    if actual.is_nan() {
957                        Some(nan_metric(metric))
958                    } else if actual > *value {
959                        Some(AssertDetail::new(
960                            DetailKind::Other,
961                            format!("metric '{metric}' = {actual} exceeds maximum {value}"),
962                        ))
963                    } else {
964                        None
965                    }
966                },
967            ),
968            MetricCheck::Range { metric, lo, hi } => pm.get(metric).map_or_else(
969                || Some(missing_metric(metric)),
970                |actual| {
971                    if actual.is_nan() {
972                        Some(nan_metric(metric))
973                    } else if actual < *lo || actual > *hi {
974                        Some(AssertDetail::new(
975                            DetailKind::Other,
976                            format!("metric '{metric}' = {actual} outside [{lo}, {hi}]"),
977                        ))
978                    } else {
979                        None
980                    }
981                },
982            ),
983            MetricCheck::Exists(metric) => pm.get(metric).is_none().then(|| missing_metric(metric)),
984            MetricCheck::ExitCodeEq(_) => None, // already evaluated in pre-pass
985        };
986        if let Some(d) = detail {
987            result.merge(AssertResult::fail(d));
988        }
989    }
990    result
991}
992
993/// Build the NaN-value [`AssertDetail`] surfaced by every
994/// magnitude comparator ([`MetricCheck::Min`], [`MetricCheck::Max`],
995/// [`MetricCheck::Range`]) when a metric extracts as NaN. Pulled
996/// out so the three call sites share one message format — a
997/// renamer-resistant single source of truth that pairs naturally
998/// with [`missing_metric`] for the absent-metric counterpart.
999fn nan_metric(metric: &str) -> AssertDetail {
1000    AssertDetail::new(DetailKind::Other, format!("metric '{metric}' value is NaN"))
1001}
1002
1003fn missing_metric(metric: &str) -> AssertDetail {
1004    AssertDetail::new(
1005        DetailKind::Other,
1006        format!("metric '{metric}' not found in payload output"),
1007    )
1008}
1009
1010/// Scan `checks` for the first `MetricCheck::ExitCodeEq` whose expected
1011/// value differs from `actual_exit_code` and return a matching
1012/// diagnostic [`AssertDetail`]. Returns `None` when no
1013/// `ExitCodeEq` check is declared, or when every declared one
1014/// matches the observed exit code.
1015///
1016/// Factored out of [`evaluate_checks`]'s pre-pass so the exit-code
1017/// mismatch detail has one source of truth for kind, message format,
1018/// and the "which MetricCheck wins" order.
1019fn exit_code_mismatch_detail(
1020    checks: &[MetricCheck],
1021    actual_exit_code: i32,
1022    stderr: &str,
1023) -> Option<AssertDetail> {
1024    checks.iter().find_map(|c| match c {
1025        MetricCheck::ExitCodeEq(expected) if actual_exit_code != *expected => {
1026            Some(AssertDetail::new(
1027                DetailKind::Other,
1028                format_exit_mismatch(actual_exit_code, *expected, stderr),
1029            ))
1030        }
1031        _ => None,
1032    })
1033}
1034
1035/// Render an exit-code mismatch with a trailing stderr tail when
1036/// non-empty. Long stderr is tail-truncated (last 1 KiB) — the end
1037/// of a failing process usually carries the actionable error.
1038const STDERR_TAIL_BYTES: usize = 1024;
1039
1040fn format_exit_mismatch(actual: i32, expected: i32, stderr: &str) -> String {
1041    let trimmed = stderr.trim_end();
1042    if trimmed.is_empty() {
1043        return format!("payload exited with code {actual}, expected {expected}");
1044    }
1045    let tail = stderr_tail(trimmed, STDERR_TAIL_BYTES);
1046    format!("payload exited with code {actual}, expected {expected}; stderr:\n{tail}")
1047}
1048
/// Return at most the last `max_bytes` bytes of `s`.
///
/// When `s` already fits it is returned unchanged. Otherwise the
/// cut point is moved forward to the next char boundary — so the
/// slice never panics on multi-byte input, at the cost of possibly
/// returning slightly fewer than `max_bytes` bytes — and the result
/// is prefixed with `...` to mark the truncation.
fn stderr_tail(s: &str, max_bytes: usize) -> String {
    if max_bytes >= s.len() {
        return s.to_owned();
    }
    let earliest = s.len() - max_bytes;
    // `s.len()` is always a char boundary, so the search cannot
    // come up empty; the fallback only keeps the expression total.
    let cut = (earliest..=s.len())
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(s.len());
    format!("...{}", &s[cut..])
}
1062
/// Everything captured from one payload process invocation: both
/// output streams and the exit code.
struct SpawnOutput {
    /// Captured standard output of the process.
    stdout: String,
    /// Captured standard error. Retained so the evaluator can
    /// surface it on a non-zero exit — the extracted metrics alone
    /// don't explain why a binary failed.
    stderr: String,
    /// Exit code recorded for the process.
    exit_code: i32,
}
1071
1072/// Resolve the user-supplied cgroup name to an absolute path
1073/// under `ctx.cgroups.parent_path()`, validating BEFORE fork so a
1074/// bad name produces a clear error rather than a `pre_exec` failure
1075/// that surfaces as an `io::Error` after the child is already spawning.
1076///
1077/// `op` is a caller-supplied label (typically `"PayloadRun::run"`
1078/// or `"PayloadRun::spawn"`) that prefixes every validation-error
1079/// message so an operator reading the failure can identify which
1080/// Op-method bubbled the error without re-running the test.
1081///
1082/// Rules:
1083/// - `None` → child inherits caller's cgroup (returns `Ok(None)`).
1084/// - A leading `/` is tolerated and stripped so `"/workload"` and
1085///   `"workload"` behave identically.
1086/// - NUL bytes are rejected — a resolved path with an interior
1087///   NUL would truncate inside any `libc` layer that handles it.
1088///   The downstream cgroupfs write in
1089///   [`crate::cgroup::CgroupOps::place_task_during_handshake`]
1090///   would also reject the bad name, but catching it up-front
1091///   gives a clearer diagnostic than the underlying write error.
1092/// - Any `..` component is rejected to prevent the name from
1093///   escaping the parent cgroup.
1094/// - Empty names (or names that strip to empty) are rejected so a
1095///   typo doesn't silently target the parent cgroup itself.
1096fn resolve_cgroup_path(
1097    ctx: &Ctx<'_>,
1098    name: Option<&str>,
1099    op: &'static str,
1100) -> Result<Option<PathBuf>> {
1101    let Some(name) = name else {
1102        return Ok(None);
1103    };
1104    if name.as_bytes().contains(&0) {
1105        return Err(anyhow!("{op}: cgroup name '{name}' contains a NUL byte"));
1106    }
1107    let trimmed = name.trim_start_matches('/');
1108    if trimmed.is_empty() {
1109        return Err(anyhow!(
1110            "{op}: cgroup name '{name}' is empty or resolves to the parent cgroup"
1111        ));
1112    }
1113    let relative = std::path::Path::new(trimmed);
1114    if relative
1115        .components()
1116        .any(|c| matches!(c, std::path::Component::ParentDir))
1117    {
1118        return Err(anyhow!(
1119            "{op}: cgroup name '{name}' contains '..'; paths must stay within the \
1120             test's cgroup parent"
1121        ));
1122    }
1123    Ok(Some(ctx.cgroups.parent_path().join(relative)))
1124}
1125
1126/// Build a [`std::process::Command`] with args, piped stdout/stderr, a
1127/// `process_group(0)` request when the payload is not
1128/// `uses_parent_pgrp`, and (optionally) a cgroup-placement
1129/// pre_exec hook that BLOCKS the child on a read from a
1130/// caller-owned release pipe until the parent has placed the
1131/// child into the target cgroup via
1132/// [`crate::cgroup::CgroupOps::place_task_during_handshake`].
1133///
1134/// When `cgroup` is `Some((name, path))`, the returned tuple's
1135/// second element is `Some(CgroupSyncHandles)` — a parent-side
1136/// bundle of (a) the write end of the release pipe, (b) the read
1137/// end of the child-side pid-notify pipe, (c) the user-facing
1138/// `cgroup_name` the placement trait method will receive, and
1139/// (d) the absolute `cgroup.procs` path retained for diagnostic
1140/// surfaces. The caller passes it to
1141/// [`spawn_with_cgroup_sync`], which drives the placement
1142/// protocol by reading the child pid, calling
1143/// `cgroup_ops.place_task_during_handshake(cgroup_name, child_pid)`,
1144/// then releasing the child via a single-byte write on the
1145/// release pipe.
1146///
1147/// When `cgroup` is `None`, the returned handle is `None`
1148/// and callers may invoke `Command::spawn()` on the returned
1149/// `Command` directly — no placement protocol is required and
1150/// the child's cgroup is inherited from the parent (the ktstr
1151/// process).
1152///
1153/// Returns `Err` if the pipe(2) pair allocation fails.
1154fn build_command(
1155    binary: &str,
1156    args: &[String],
1157    cgroup: Option<(&str, &std::path::Path)>,
1158    uses_parent_pgrp: bool,
1159) -> Result<(std::process::Command, Option<CgroupSyncHandles>)> {
1160    use std::os::unix::process::CommandExt;
1161    use std::process::{Command, Stdio};
1162
1163    let mut cmd = Command::new(binary);
1164    cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
1165    if !uses_parent_pgrp {
1166        // `process_group(0)` requests a fresh process group with
1167        // the child as leader (pgid == child's pid). `killpg` on
1168        // the child's pid then reaches every fork descendant in
1169        // one signal — a single `child.kill()` would otherwise
1170        // miss grandchildren of multi-process payloads (stress-ng,
1171        // schbench worker mode, fio with multiple jobs), and
1172        // those orphans keep the stdout/stderr pipes open,
1173        // hanging `wait_and_capture` and discarding the metrics.
1174        //
1175        // Previously a hand-rolled `pre_exec(setpgid(0, 0))` hook
1176        // did the same job, but a `killpg` issued between
1177        // `fork(2)` and the child's `setpgid` completion could
1178        // return `ESRCH` (no such group) while the child and its
1179        // descendants survived. `CommandExt::process_group`
1180        // NARROWS that window: on `posix_spawn`-capable
1181        // platforms (and futures where `process_group` dispatches
1182        // to it) the pgid transition is kernel-sequenced with
1183        // exec and the race is eliminated. When the standard
1184        // library has to fall through to the fork+exec path —
1185        // which it does whenever a cgroup placement `pre_exec`
1186        // hook is also registered below, as `process_group(0)`
1187        // and any `pre_exec` together force the legacy path —
1188        // the remaining window is covered by the direct
1189        // `kill(pid, SIGKILL)` follow-up in
1190        // `kill_payload_process_group`.
1191        //
1192        // The `uses_parent_pgrp == true` branch SKIPS this call
1193        // so the child inherits the parent ktstr process's pgid.
1194        // Opt-in for tty-dependent payloads (shells, `less`,
1195        // anything that reads controlling-terminal foreground-
1196        // pgrp for job-control signalling) — a fresh pgrp reads
1197        // as "no job control" and breaks their signal
1198        // behaviour. The cost is that `killpg(child_pid, ...)`
1199        // no longer reaches descendants (the child isn't a
1200        // pgrp leader), so multi-process tty-dependent payloads
1201        // must react to SIGHUP / pipe close on their own or
1202        // risk orphaning — see the doc on `Payload::uses_parent_pgrp`.
1203        cmd.process_group(0);
1204    }
1205
1206    if let Some((cgroup_name, cgroup_path)) = cgroup {
1207        // Two-pipe cgroup-placement handshake. `notify_*` carries
1208        // the child's pid from its pre_exec hook up to the parent
1209        // so the parent can dispatch the placement call via
1210        // [`crate::cgroup::CgroupOps::place_task_during_handshake`]
1211        // (`Command::spawn()` blocks on the stdlib CLOEXEC status
1212        // pipe until the child execve's, so the pid from
1213        // `Child::id()` is NOT available to the parent in time).
1214        // `release_*` is the reverse channel — the parent writes a
1215        // single byte once the `cgroup.procs` update has been
1216        // committed, and the child's pre_exec blocks on that byte
1217        // so its execve cannot race the placement.
1218        //
1219        // Both pipes are created with O_CLOEXEC so the parent's
1220        // copies never leak to the child (only the fds we
1221        // explicitly hand into the pre_exec closure via raw fd
1222        // numbers are touched by the child, and those are closed
1223        // on execve once pre_exec returns). This matches the
1224        // pre_exec AS-safety contract — only `read(2)` /
1225        // `write(2)` / `close(2)` / `getpid(2)` run between fork
1226        // and execve, all of which are explicitly AS-safe per
1227        // POSIX.1-2017 §2.4.3.
1228        let notify = PipePair::new().context("allocate cgroup sync pid-notify pipe")?;
1229        let release = PipePair::new().context("allocate cgroup sync release pipe")?;
1230        let notify_read_fd = notify.r_fd();
1231        let notify_write_fd = notify.w_fd();
1232        let release_read_fd = release.r_fd();
1233        let release_write_fd = release.w_fd();
1234        // SAFETY: the pre_exec closure runs in the child between
1235        // fork and execve. The body uses only getpid / write /
1236        // read / close, all AS-safe. All four fds are raw numbers
1237        // inherited by the child via the fork; the pre_exec hook
1238        // ALSO closes the child's own inherited copies of the
1239        // ends the parent will hold (`notify_read_fd`,
1240        // `release_write_fd`) BEFORE blocking on read, so the
1241        // parent's drop of the release write end actually reaches
1242        // the child as EOF instead of being masked by the child's
1243        // own inherited writer copy (which would otherwise leave
1244        // `read(release_read_fd)` blocked indefinitely — the
1245        // cross-fork pipe-sync deadlock). On the parent side we
1246        // hold owned copies in `notify` / `release` which we
1247        // close after consuming them in `drive_cgroup_handshake`.
1248        unsafe {
1249            cmd.pre_exec(move || {
1250                cgroup_sync_pre_exec(
1251                    notify_read_fd,
1252                    notify_write_fd,
1253                    release_read_fd,
1254                    release_write_fd,
1255                )
1256            });
1257        }
1258        let handles = CgroupSyncHandles {
1259            notify,
1260            release,
1261            cgroup_name: cgroup_name.to_string(),
1262            cgroup_procs_path: cgroup_path.join("cgroup.procs"),
1263        };
1264        return Ok((cmd, Some(handles)));
1265    }
1266    Ok((cmd, None))
1267}
1268
/// Owned pipe(2) pair.
///
/// Each end is held as an [`std::os::fd::OwnedFd`], so the pair is
/// not `Copy` and its lifetime is explicit: a descriptor is closed
/// when the `OwnedFd` holding it is dropped. The parent keeps one
/// half of each direction; the other halves are inherited by the
/// child through fork and consumed by `cgroup_sync_pre_exec`.
///
/// `O_CLOEXEC` is set on both ends at creation via `pipe2(2)` so
/// the parent's references do not leak into any subsequent
/// `Command::spawn()` that might run from a reader thread while
/// the handshake is in flight. The child's copies are closed by
/// the kernel on execve.
struct PipePair {
    /// Read end of the pipe.
    read_fd: std::os::fd::OwnedFd,
    /// Write end of the pipe.
    write_fd: std::os::fd::OwnedFd,
}
1284
1285impl PipePair {
1286    fn new() -> std::io::Result<Self> {
1287        use std::os::fd::FromRawFd;
1288        let mut fds = [0i32; 2];
1289        // SAFETY: `pipe2` writes two fds into the provided slot on
1290        // success. O_CLOEXEC ensures the fds are not leaked across
1291        // later execve calls on the parent side.
1292        let rc = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC) };
1293        if rc != 0 {
1294            return Err(std::io::Error::last_os_error());
1295        }
1296        // SAFETY: pipe2 returned success and gave us two fresh fds.
1297        let read_fd = unsafe { std::os::fd::OwnedFd::from_raw_fd(fds[0]) };
1298        let write_fd = unsafe { std::os::fd::OwnedFd::from_raw_fd(fds[1]) };
1299        Ok(Self { read_fd, write_fd })
1300    }
1301
1302    fn r_fd(&self) -> i32 {
1303        use std::os::fd::AsRawFd;
1304        self.read_fd.as_raw_fd()
1305    }
1306
1307    fn w_fd(&self) -> i32 {
1308        use std::os::fd::AsRawFd;
1309        self.write_fd.as_raw_fd()
1310    }
1311}
1312
/// Parent-side bundle carrying every resource the cgroup-placement
/// handshake needs after fork. Owned by the caller of
/// [`build_command`] until
/// [`spawn_with_cgroup_sync`] consumes it.
///
/// The two pipes run in opposite directions: `notify` carries the
/// pid from child to parent, `release` carries the go-ahead from
/// parent to child.
struct CgroupSyncHandles {
    /// Child → parent pipe. The child writes its pid (4
    /// little-endian bytes) to the write end as its first pre_exec
    /// step; the parent reads those bytes from the read end.
    notify: PipePair,
    /// Parent → child pipe. The parent writes a single byte to the
    /// write end once the cgroup-placement update is committed; the
    /// child's pre_exec blocks on a read of the read end until then.
    release: PipePair,
    /// The user-facing cgroup name the test author passed in
    /// `Op::RunPayload { cgroup: Some(name), .. }` or
    /// `PayloadRun::in_cgroup(name)`. Threaded to
    /// [`crate::cgroup::CgroupOps::place_task_during_handshake`]
    /// at placement time so the trait implementation derives the
    /// cgroup.procs path from its own parent-path knowledge, keeping
    /// the host-side mock observable without touching cgroupfs.
    cgroup_name: String,
    /// The absolute `<cgroup>/cgroup.procs` path. Retained for
    /// diagnostics only: [`spawn_with_cgroup_sync`] interpolates it
    /// into the placement error context so a failing placement still
    /// names the exact filesystem path that didn't accept the write.
    cgroup_procs_path: PathBuf,
}
1344
1345/// Async-signal-safe body of the cgroup-placement `pre_exec`
1346/// hook. Runs between fork and execve in the child.
1347///
1348/// Protocol:
1349/// 0. Close the child's inherited copies of the parent-owned
1350///    ends — `notify_read_fd` (child never reads notify) and
1351///    `release_write_fd` (child never writes release). This is
1352///    MANDATORY: without it the kernel still sees two writers on
1353///    the release pipe (parent's + child's own inherited copy),
1354///    so the parent's Drop of the release write end does NOT
1355///    deliver EOF to the child's `read(release_read_fd)` — the
1356///    child blocks forever. This is the canonical pipe-sync
1357///    fork-inherited-fd deadlock; closing the inherited copies
1358///    is what makes the sync work.
1359/// 1. Write the child's pid (as an LE i32, 4 bytes) to
1360///    `notify_write_fd` so the parent can begin the `cgroup.procs`
1361///    write. Close `notify_write_fd` immediately after so the
1362///    parent's read sees a fast EOF if the child crashes before
1363///    reaching the release read.
1364/// 2. Read a single release byte from `release_read_fd` to block
1365///    until the parent has committed the cgroup-placement write.
1366/// 3. Close `release_read_fd` (the kernel will also close it via
1367///    O_CLOEXEC on execve, but a prompt close frees the fd before
1368///    any user-provided pre_exec extension could observe it).
1369///
1370/// # Safety
1371///
1372/// This function runs between `fork(2)` and `execve(2)` in the
1373/// child. Only async-signal-safe operations are permitted — no
1374/// `malloc`, no `std::fs`, no `libc::printf`, no locks (including
1375/// the jemalloc arena). Every operation here is `getpid` / `write`
1376/// / `read` / `close`, all of which POSIX.1-2017 §2.4.3 lists as
1377/// AS-safe. In particular there is NO stdlib I/O, NO integer
1378/// formatting, and NO allocation — the pid is sent as 4 raw
1379/// little-endian bytes rather than an ASCII render, so no
1380/// formatting helper is reachable from the child side.
1381///
1382/// Errors from `write(2)` or `read(2)` (short writes, EPIPE from
1383/// a parent that abandoned the handshake) are mapped to
1384/// `io::Error::from_raw_os_error` and returned. The stdlib's
1385/// spawn loop forwards the errno through its CLOEXEC status pipe
1386/// so the parent's `spawn()` returns an actionable error rather
1387/// than silently racing through the placement step. Step 0's
1388/// `close(2)` failures are intentionally IGNORED — EBADF is
1389/// expected if the kernel is unusual about inherited fd numbering,
1390/// and any other errno here cannot be recovered from (the parent's
1391/// handshake still needs to run). The subsequent `write` / `read`
1392/// surfaces any real breakage.
1393fn cgroup_sync_pre_exec(
1394    notify_read_fd: libc::c_int,
1395    notify_write_fd: libc::c_int,
1396    release_read_fd: libc::c_int,
1397    release_write_fd: libc::c_int,
1398) -> std::io::Result<()> {
1399    // Step 0: close the child's inherited copies of the
1400    // parent-owned ends. MANDATORY to avoid deadlocking on the
1401    // subsequent `read(release_read_fd)` — without closing
1402    // `release_write_fd`, the kernel keeps the release pipe's
1403    // writer-count non-zero even when the parent drops its own
1404    // copy, so the child's read never EOFs. Symmetrically,
1405    // closing `notify_read_fd` frees a descriptor slot and keeps
1406    // the parent's notify read end the sole reader (defense in
1407    // depth — the protocol doesn't strictly require it since we
1408    // never EOF the notify pipe, but a tidy close is cheap).
1409    //
1410    // `libc::close` is AS-safe. Return codes are ignored: EBADF
1411    // is theoretically possible if the kernel ever renumbered
1412    // the inherited fd, and any other errno is non-actionable
1413    // between fork and execve. The write/read below surfaces any
1414    // real breakage.
1415    //
1416    // SAFETY: all four fd numbers were valid on the parent side
1417    // at the time of fork and the kernel duplicates them into
1418    // the child's fd table. Closing a fd that the kernel already
1419    // renumbered returns EBADF without effect — no memory
1420    // safety concern.
1421    unsafe {
1422        libc::close(notify_read_fd);
1423        libc::close(release_write_fd);
1424    }
1425
1426    // Step 1: publish pid. getpid(2) is AS-safe; the pid is a
1427    // raw i32, so we send its 4-byte little-endian encoding and
1428    // spare the child any integer-formatting work. A stack
1429    // buffer is the only storage; no allocation.
1430    let pid = unsafe { libc::getpid() };
1431    let pid_bytes = pid.to_le_bytes();
1432    let mut written = 0usize;
1433    while written < pid_bytes.len() {
1434        // SAFETY: writing into a raw fd that the parent owns the
1435        // read end of. `pid_bytes` is a live stack buffer.
1436        let n = unsafe {
1437            libc::write(
1438                notify_write_fd,
1439                pid_bytes.as_ptr().add(written) as *const libc::c_void,
1440                pid_bytes.len() - written,
1441            )
1442        };
1443        if n < 0 {
1444            let err = std::io::Error::last_os_error();
1445            // EINTR: retry. Every other errno (EPIPE from a
1446            // collapsed parent read end, EBADF, ...) is terminal
1447            // — surface it to the parent via the stdlib spawn
1448            // error channel.
1449            if err.raw_os_error() == Some(libc::EINTR) {
1450                continue;
1451            }
1452            return Err(err);
1453        }
1454        if n == 0 {
1455            // Zero-byte write is not defined for pipes; treat as
1456            // EIO rather than loop forever.
1457            return Err(std::io::Error::from_raw_os_error(libc::EIO));
1458        }
1459        written += n as usize;
1460    }
1461    // Close the notify write end so the parent's read gets EOF if
1462    // the child subsequently crashes before the release read.
1463    // SAFETY: notify_write_fd is a valid fd the child inherited
1464    // from the parent; closing it here does not affect the parent's
1465    // read end.
1466    unsafe {
1467        libc::close(notify_write_fd);
1468    }
1469
1470    // Step 2: block on the release byte. One byte is enough — the
1471    // payload is a synchronization token, not data. Loop to handle
1472    // EINTR and short reads (partial-byte reads are impossible on
1473    // a 1-byte read, but the loop keeps the code uniform with the
1474    // write side).
1475    let mut buf = [0u8; 1];
1476    let mut read_total = 0usize;
1477    while read_total < buf.len() {
1478        // SAFETY: reading from a raw fd that the parent owns the
1479        // write end of. `buf` is a live stack buffer.
1480        let n = unsafe {
1481            libc::read(
1482                release_read_fd,
1483                buf.as_mut_ptr().add(read_total) as *mut libc::c_void,
1484                buf.len() - read_total,
1485            )
1486        };
1487        if n < 0 {
1488            let err = std::io::Error::last_os_error();
1489            if err.raw_os_error() == Some(libc::EINTR) {
1490                continue;
1491            }
1492            return Err(err);
1493        }
1494        if n == 0 {
1495            // EOF before the release byte arrived — the parent
1496            // abandoned the handshake (crashed / failed cgroup
1497            // write). Fail the pre_exec so the stdlib spawn path
1498            // surfaces the abort instead of letting the child
1499            // execve into an unplaced cgroup.
1500            return Err(std::io::Error::from_raw_os_error(libc::EPIPE));
1501        }
1502        read_total += n as usize;
1503    }
1504    // Step 3: close the release read end. The kernel would do
1505    // this on execve via O_CLOEXEC anyway, but an explicit close
1506    // frees the fd now.
1507    // SAFETY: release_read_fd is a valid fd the child inherited
1508    // from the parent.
1509    unsafe {
1510        libc::close(release_read_fd);
1511    }
1512    Ok(())
1513}
1514
1515/// Complete the cgroup-placement handshake on a child that was
1516/// spawned with a [`build_command`]-supplied pre_exec hook.
1517///
1518/// The caller MUST run `Command::spawn()` on a dedicated thread
1519/// because the stdlib's `spawn()` blocks on its CLOEXEC status
1520/// pipe until the child has successfully execve'd — and the
1521/// child's pre_exec blocks on the release read until this
1522/// function finishes. Without the thread split the two would
1523/// deadlock.
1524///
1525/// Protocol (parent side, main thread):
1526/// 1. Read the child's pid bytes from the notify read end.
1527/// 2. Dispatch placement via
1528///    [`crate::cgroup::CgroupOps::place_task_during_handshake`],
1529///    threading the user-facing `cgroup_name` and the just-read
1530///    child pid. The trait implementation
1531///    ([`crate::cgroup::CgroupManager::place_task_during_handshake`]
1532///    in production) derives the `cgroup.procs` path from its own
1533///    parent-path knowledge and performs the write under the same
1534///    `write_with_timeout` shape as the other cgroupfs writes;
1535///    `MockCgroupOps` records the call without touching the
1536///    filesystem so handler-level tests can observe placement
1537///    against `Op::RunPayload { cgroup: Some(..), .. }`.
1538/// 3. Write the single release byte to the release write end,
1539///    then close it so any subsequent short-read / EOF on the
1540///    child side is prompt.
1541/// 4. Close the notify read end.
1542///
1543/// The function returns the child pid so callers can cross-check
1544/// it against `Child::id()` once the spawn thread returns.
1545/// Wrapped in `Result<libc::pid_t>` because the notify read or
1546/// the placement trait call can fail; a failure drops the
1547/// handle, which also closes the release write end, giving the
1548/// child's pre_exec a fast EOF-driven bail (the trait contract
1549/// pins this responsibility on the caller — see
1550/// [`crate::cgroup::CgroupOps::place_task_during_handshake`]).
1551fn spawn_with_cgroup_sync(
1552    handles: CgroupSyncHandles,
1553    cgroup_ops: &dyn crate::cgroup::CgroupOps,
1554) -> Result<libc::pid_t> {
1555    use std::io::{Read, Write};
1556    let CgroupSyncHandles {
1557        notify,
1558        release,
1559        cgroup_name,
1560        cgroup_procs_path,
1561    } = handles;
1562    // Step 1: read child pid. Keep the parent-side notify_w
1563    // OPEN during the read — closing it before fork would let
1564    // stdlib's internal `pipe2` for the CLOEXEC status pipe
1565    // recycle our fd number; the child then inherits a state
1566    // where its `notify_write_fd` points at stdlib's status
1567    // pipe, not our notify pipe. `write(notify_write_fd, pid)`
1568    // in the child would corrupt stdlib's protocol and the
1569    // parent's `read_exact` on our notify pipe would see an
1570    // indefinite wait because no data ever arrives on the
1571    // intended pipe. The canonical rule: drop your parent
1572    // copy of the child's write end AFTER the child has
1573    // written (or died), not before. We achieve that here by
1574    // holding `notify_w` alive across the read and dropping
1575    // it only at the end.
1576    //
1577    // Child-died-without-writing detection: if the child
1578    // dies before step 1's write, its inherited `notify_w`
1579    // closes on `_exit`. The pipe then has ONLY the parent's
1580    // `notify_w` as a writer — still non-zero — and our
1581    // `read_exact` would block indefinitely. Guard against
1582    // that with a bounded `poll(2)`: wait up to 5s for data,
1583    // then bail with an actionable error naming the
1584    // probable cause (child pre_exec failed before writing).
1585    // The spawn thread's own error (`cmd.spawn() → Err`)
1586    // surfaces too, and `drive_cgroup_handshake` returns
1587    // whichever the caller sees first.
1588    let PipePair {
1589        read_fd: notify_r,
1590        write_fd: notify_w,
1591    } = notify;
1592    {
1593        let pfd_fd = std::os::fd::AsRawFd::as_raw_fd(&notify_r);
1594        let mut pfd = libc::pollfd {
1595            fd: pfd_fd,
1596            events: libc::POLLIN,
1597            revents: 0,
1598        };
1599        // 5s ceiling. Any legitimate fork + pre_exec sequence
1600        // completes in low milliseconds; 5s is loose for even
1601        // the most contended CI host and tight enough to
1602        // flag a silent child-death promptly.
1603        let poll_ms: libc::c_int = 5_000;
1604        let ready = unsafe { libc::poll(&mut pfd, 1, poll_ms) };
1605        if ready < 0 {
1606            let e = std::io::Error::last_os_error();
1607            if e.raw_os_error() != Some(libc::EINTR) {
1608                return Err(
1609                    anyhow::Error::new(e).context("poll(notify_r) for cgroup-sync pid-notify")
1610                );
1611            }
1612        } else if ready == 0 {
1613            anyhow::bail!(
1614                "cgroup-sync notify pipe: no pid written by child within 5s. \
1615                 The child's pre_exec likely failed before Step 1 (possibly \
1616                 EBADF on `notify_write_fd` because the fd number was \
1617                 recycled by stdlib's internal pipe2). Check the spawn \
1618                 thread's error for the underlying cause."
1619            );
1620        }
1621    }
1622    let mut notify_file = std::fs::File::from(notify_r);
1623    let mut pid_bytes = [0u8; 4];
1624    notify_file
1625        .read_exact(&mut pid_bytes)
1626        .context("read child pid from cgroup-sync notify pipe")?;
1627    drop(notify_file);
1628    // Now it is safe to close parent's notify write end:
1629    // the child has either written its pid (success path) or
1630    // the poll bailed (failure path, already returned above).
1631    drop(notify_w);
1632    let child_pid = libc::pid_t::from_le_bytes(pid_bytes);
1633    anyhow::ensure!(
1634        child_pid > 0,
1635        "cgroup-sync notify pipe returned non-positive pid {child_pid}; \
1636         the child's pre_exec hook sent a corrupted pid — fail the \
1637         handshake rather than write a bad value to cgroup.procs"
1638    );
1639
1640    // Step 2: dispatch placement through the CgroupOps trait so
1641    // the production write goes through `CgroupManager` and the
1642    // test path's `MockCgroupOps` records the placement for
1643    // handler-level observability. The trait implementation
1644    // derives the cgroup.procs path from its own parent-path
1645    // knowledge; `cgroup_procs_path` is held alongside
1646    // `cgroup_name` purely so the error chain still names the
1647    // path that didn't accept the write. We are on the parent's
1648    // main thread post-fork — no AS-safety constraint applies,
1649    // so the trait dispatch's heap allocations and anyhow error
1650    // chain are fair game (pre_exec runs on the CHILD side and
1651    // never reaches this branch).
1652    cgroup_ops
1653        .place_task_during_handshake(&cgroup_name, child_pid)
1654        .with_context(|| {
1655            format!(
1656                "place pid {child_pid} into cgroup '{cgroup_name}' (cgroup.procs at {}) for cgroup-sync placement",
1657                cgroup_procs_path.display(),
1658            )
1659        })?;
1660
1661    // Step 3: release the child. One byte is enough; the content
1662    // is ignored by the reader.
1663    let PipePair {
1664        read_fd: release_r,
1665        write_fd: release_w,
1666    } = release;
1667    drop(release_r);
1668    let mut release_file = std::fs::File::from(release_w);
1669    release_file
1670        .write_all(&[1u8])
1671        .context("write release byte to cgroup-sync release pipe")?;
1672    drop(release_file);
1673
1674    Ok(child_pid)
1675}
1676
1677/// Spawn a Command that carries a cgroup-sync pre_exec hook.
1678/// Runs `Command::spawn()` on a dedicated thread (it blocks on
1679/// the stdlib CLOEXEC status pipe until the child execve's,
1680/// which can't happen until the parent's main thread has
1681/// released the pre_exec handshake), drives the
1682/// [`spawn_with_cgroup_sync`] protocol on the main thread, then
1683/// joins the spawn thread to collect the resulting [`std::process::Child`].
1684///
1685/// If either the spawn or the handshake fails, the caller drops
1686/// the remaining pipe handles (via the [`CgroupSyncHandles`]
1687/// consumption in `spawn_with_cgroup_sync`), which causes the
1688/// child's pre_exec read to unblock with EOF and fail with
1689/// EPIPE. The child never reaches execve, the spawn thread
1690/// surfaces the pre_exec error through its stdlib error
1691/// channel, and we propagate the first error the caller sees.
1692fn drive_cgroup_handshake(
1693    cmd: std::process::Command,
1694    handles: CgroupSyncHandles,
1695    binary: &str,
1696    cgroup_ops: &dyn crate::cgroup::CgroupOps,
1697) -> Result<std::process::Child> {
1698    // Move the Command into a thread so its blocking `spawn()`
1699    // doesn't deadlock with the child's pre_exec handshake.
1700    let binary_owned = binary.to_string();
1701    let spawn_thread = std::thread::spawn(move || -> Result<std::process::Child> {
1702        let mut cmd = cmd;
1703        cmd.spawn()
1704            .map_err(|e| spawn_error_context(e, &binary_owned))
1705    });
1706
1707    // Drive the placement protocol on the main thread. If this
1708    // fails we drop the remaining handle bits so the child sees
1709    // EOF on its release read; the spawn thread will then
1710    // surface the pre_exec EPIPE through its stdlib error
1711    // channel.
1712    let sync_result = spawn_with_cgroup_sync(handles, cgroup_ops);
1713
1714    // Join the spawn thread regardless of sync outcome so a
1715    // failing handshake does not leak a background std thread.
1716    // A join error is either a panic in the spawn closure (very
1717    // rare under `panic = "unwind"`) or an explicit poisoning;
1718    // we map it to a generic anyhow error so the caller still
1719    // gets a meaningful chain.
1720    let spawn_result = spawn_thread
1721        .join()
1722        .map_err(|_| anyhow!("cgroup-sync spawn thread panicked"))?;
1723
1724    // Precedence: if the sync failed, that error is the root
1725    // cause (the spawn will have failed TOO because the child
1726    // bailed on EPIPE, but the sync error carries the actionable
1727    // diagnostic — "failed to open cgroup.procs" / "short read
1728    // from notify pipe"). Return the sync error first and
1729    // discard the spawn error.
1730    sync_result?;
1731    spawn_result
1732}
1733
1734/// Actionable error wrapper for Command::spawn/.output failures.
1735/// ENOENT — the binary isn't on PATH inside the guest — gets the
1736/// remediation paths spelled out: `-i`/`--include-files` for CLI
1737/// invocations, pre-install in the initramfs for `#[ktstr_test]`
1738/// entries (which cannot pass `-i`). Other errors keep the minimal
1739/// `"spawn '<binary>'"` context so the underlying io::Error chain
1740/// surfaces unchanged.
1741///
1742/// **Shebang interpreter case.** `execve(2)` ALSO returns ENOENT
1743/// when `binary` is itself present but is a script whose `#!`
1744/// shebang names an interpreter that is missing in the guest
1745/// (e.g. `#!/usr/bin/python3` when python3 is absent from
1746/// initramfs). The kernel surfaces ENOENT with the script's path
1747/// even though the missing file is the interpreter — there is no
1748/// userspace signal that distinguishes "binary missing" from
1749/// "interpreter missing". The wrapped message therefore names
1750/// both the binary and the interpreter as candidate missing
1751/// artifacts and tells the operator to package both with `-i`
1752/// (CLI) or pre-install both in the initramfs
1753/// (`#[ktstr_test]`); the production message body carries this
1754/// guidance verbatim, the test
1755/// `spawn_error_context_enoent_attaches_remediation` pins it.
1756fn spawn_error_context(err: std::io::Error, binary: &str) -> anyhow::Error {
1757    if err.kind() == std::io::ErrorKind::NotFound {
1758        anyhow::Error::new(err).context(format!(
1759            "spawn '{binary}': binary not found on guest PATH. \
1760             Remediation: for CLI invocations (ktstr / cargo-ktstr \
1761             shell, run, …), package the binary with `-i {binary}` \
1762             / `--include-files {binary}` so it lands on the guest \
1763             PATH under `/include-files/`. For `#[ktstr_test]` \
1764             entries, pre-install the binary in the base initramfs \
1765             — the macro surface does not expose `-i`. If `{binary}` \
1766             is a script, execve(2) ALSO returns ENOENT when the \
1767             `#!` shebang names an interpreter missing from the \
1768             guest (the error names the script but the missing \
1769             file is the interpreter); package the interpreter \
1770             the same way — `-i <interpreter>` for CLI, pre-install \
1771             for `#[ktstr_test]`."
1772        ))
1773    } else {
1774        anyhow::Error::new(err).context(format!("spawn '{binary}'"))
1775    }
1776}
1777
1778/// RAII guard that saves the process's `SIGCHLD` disposition, sets
1779/// it to `SIG_DFL` on construction, and restores the saved value on
1780/// `Drop`. Required for [`spawn_and_wait`] and the background
1781/// [`spawn_child`] path because the guest ktstr-init sets
1782/// `SIGCHLD = SIG_IGN` at startup in `src/vmm/rust_init/init.rs`
1783/// ("Ignore SIGCHLD so child processes don't become zombies").
1784/// Under `SIG_IGN` the kernel auto-reaps children, so
1785/// `waitpid(child_pid)` returns `ECHILD` and Rust std's
1786/// `Command::spawn()` / `.output()` / `Child::wait()` internals
1787/// panic with "wait() should either return Ok or panic".
1788///
1789/// The shell-exec mode in `src/vmm/rust_init/modes.rs` already documents
1790/// this exact gotcha and uses the same save/set-`SIG_DFL` /
1791/// restore-on-completion pattern. `PayloadRun::run` /
1792/// `PayloadRun::spawn` are the second dispatch site that needs it.
1793///
1794/// For background spawns, the guard lives on [`PayloadHandle`]
1795/// until `.wait()` / `.kill()` / `Drop` consumes the handle, so
1796/// the child is reap-able via `waitpid` for the entire window
1797/// between spawn and final disposition. Foreground spawns
1798/// (`spawn_and_wait`) scope the guard to the `.output()` call —
1799/// the child is reaped inline, no lingering state.
1800/// Pins the [`ThreadId`] of the first `SigchldScope` constructed in
1801/// this process. Every subsequent construction must come from the
1802/// same thread: `libc::signal` is not thread-safe, and concurrent
1803/// installs from distinct threads would race on the process-wide
1804/// `SIGCHLD` disposition. The unset state of the `OnceLock` means
1805/// "uninitialized" (no `SigchldScope` has been constructed yet in
1806/// this process).
1807///
1808/// `ThreadId` is the std-library opaque thread identifier with
1809/// guaranteed uniqueness for the lifetime of the process and a
1810/// `PartialEq` impl that compares the underlying `NonZero<u64>`
1811/// directly. Storing the actual `ThreadId` (instead of a hash of
1812/// it) eliminates the collision-window risk that any hash-based
1813/// encoding carries, no matter how astronomically unlikely.
1814///
1815/// Multiple concurrent `SigchldScope` instances ARE allowed on
1816/// the same thread — each `PayloadHandle` carries one, and a
1817/// single-threaded caller can hold many handles simultaneously
1818/// without racing the libc::signal install. Drop order must
1819/// remain LIFO for the handler-restore chain to leave the
1820/// original disposition intact; this is the caller's obligation
1821/// (handles dropped in reverse creation order, which is the
1822/// default when locals go out of scope).
1823static SIGCHLD_SCOPE_OWNER_THREAD: OnceLock<ThreadId> = OnceLock::new();
1824
1825struct SigchldScope {
1826    prev: libc::sighandler_t,
1827    /// Marker that makes `SigchldScope` `!Send` AND `!Sync` at the
1828    /// type-system level. `libc::sighandler_t` is `size_t` on Linux
1829    /// (a plain integer that auto-implements both `Send` and `Sync`),
1830    /// so without this marker the struct would be `Send + Sync` and
1831    /// the compiler would silently accept a move across thread
1832    /// boundaries. The `OnceLock`-based runtime pin in `new` and
1833    /// `drop` only catches an actual install/restore on the wrong
1834    /// thread; the `PhantomData<*const ()>` adds a compile-time
1835    /// barrier so a `thread::spawn(move || { let _ = scope; })`
1836    /// fails to type-check instead of relying on the runtime check
1837    /// to catch it.
1838    ///
1839    /// `*const T` carries explicit `!Send` and `!Sync` negative
1840    /// impls in `core::marker` (`impl<T: PointeeSized> !Send for
1841    /// *const T` at marker.rs:100, `impl<T: PointeeSized> !Sync for
1842    /// *const T` at marker.rs:680). `Send` and `Sync` are
1843    /// independent auto traits — neither implies the other — and
1844    /// `PhantomData<T>` (a generic struct with no manual Send/Sync
1845    /// impl) propagates each independently via auto-trait
1846    /// inference. So `PhantomData<*const ()>` is `!Send` because
1847    /// `*const ()` is `!Send`, AND `!Sync` because `*const ()` is
1848    /// `!Sync`. Both come from the marker, not from one implying
1849    /// the other.
1850    _not_send: std::marker::PhantomData<*const ()>,
1851}
1852
1853impl SigchldScope {
1854    /// Save current `SIGCHLD` handler and install `SIG_DFL`.
1855    /// On host builds the init never flips SIGCHLD to SIG_IGN, so
1856    /// `prev` equals `SIG_DFL` and Drop is a no-op mathematically
1857    /// — the extra syscall is cheap and keeps behavior uniform
1858    /// between host and guest.
1859    ///
1860    /// # Panics
1861    ///
1862    /// Panics if called from a thread different from the one that
1863    /// constructed the first `SigchldScope` in this process.
1864    /// `libc::signal` is not thread-safe and cross-thread installs
1865    /// would race on the process-wide SIGCHLD disposition.
1866    fn new() -> Self {
1867        let tid = std::thread::current().id();
1868        // Pin the first thread that ever constructs a SigchldScope
1869        // in this process via `OnceLock::get_or_init`, then enforce
1870        // the pin on every subsequent construction. `get_or_init`
1871        // is internally synchronized: concurrent first-callers
1872        // race only on which one's `init` closure runs, and the
1873        // others observe the winner's value via the same call. The
1874        // returned reference is the canonical pinned ThreadId.
1875        let pinned = SIGCHLD_SCOPE_OWNER_THREAD.get_or_init(|| tid);
1876        if *pinned != tid {
1877            panic!(
1878                "SigchldScope constructed on a different thread than the first \
1879                 owner (pinned thread id={pinned:?}, this thread's id={tid:?}). \
1880                 libc::signal is not thread-safe; cross-thread installs race on \
1881                 the process-wide SIGCHLD disposition."
1882            );
1883        }
1884        // SAFETY: SIGCHLD_SCOPE_OWNER_THREAD pins construction to
1885        // a single thread across the whole process, so no other
1886        // thread is concurrently installing a SIGCHLD handler.
1887        // The `_not_send: PhantomData<*const ()>` field below makes
1888        // the type `!Send` so a move across threads fails to compile
1889        // — Drop is guaranteed to run on the same thread that
1890        // constructed the scope.
1891        let prev = unsafe { libc::signal(libc::SIGCHLD, libc::SIG_DFL) };
1892        SigchldScope {
1893            prev,
1894            _not_send: std::marker::PhantomData,
1895        }
1896    }
1897}
1898
1899impl Drop for SigchldScope {
1900    fn drop(&mut self) {
1901        // Defense-in-depth: the `!Send` marker on the struct
1902        // prevents a compile-time cross-thread move, but a future
1903        // refactor that adds an explicit `unsafe impl Send`
1904        // workaround would silently bypass that guard. Re-check the
1905        // owner-thread pin at drop time so a wrong-thread restore
1906        // panics loudly instead of quietly racing the process-wide
1907        // SIGCHLD disposition.
1908        let pinned = SIGCHLD_SCOPE_OWNER_THREAD
1909            .get()
1910            .expect("SIGCHLD_SCOPE_OWNER_THREAD must be initialized — set by SigchldScope::new");
1911        assert_eq!(
1912            *pinned,
1913            std::thread::current().id(),
1914            "SigchldScope dropped on a different thread than the pinned owner \
1915             (pinned={pinned:?}, this thread={:?}). libc::signal is not \
1916             thread-safe and the construct-side `!Send` marker should have \
1917             made this impossible at compile time — investigate any \
1918             `unsafe impl Send for SigchldScope` that bypassed it.",
1919            std::thread::current().id(),
1920        );
1921        // SAFETY: same rationale as `new` — the owner-thread pin
1922        // guarantees no concurrent installer on another thread.
1923        // Restoring in LIFO order across nested scopes unwinds
1924        // back to the original disposition; drop-order is the
1925        // caller's obligation.
1926        unsafe {
1927            libc::signal(libc::SIGCHLD, self.prev);
1928        }
1929    }
1930}
1931
1932// Compile-time pin: `SigchldScope` must be neither `Send` nor `Sync`.
1933//
1934// `SigchldScope::new` and its `Drop` install/restore the process-wide
1935// `SIGCHLD` disposition via `libc::signal`, which is documented as
1936// async-signal-safe but not thread-safe with respect to concurrent
1937// installs. The `_not_send: PhantomData<*const ()>` field at the
1938// struct definition is what makes the type `!Send` AND `!Sync`.
1939// `Send` and `Sync` are independent auto traits — neither implies
1940// the other. `*const T` carries explicit `!Send` and `!Sync`
1941// negative impls in `core::marker` (`impl<T: PointeeSized> !Send
1942// for *const T` at marker.rs:100, `impl<T: PointeeSized> !Sync for
1943// *const T` at marker.rs:680), and `PhantomData<T>` propagates each
1944// independently via auto-trait inference. So `PhantomData<*const
1945// ()>` is `!Send` because `*const ()` is `!Send`, AND `!Sync`
1946// because `*const ()` is `!Sync` — both from the marker, not from
1947// one implying the other. This block re-asserts that invariant at
1948// compile time so a future refactor that drops the marker, replaces
1949// it with a `Send` type, or adds an explicit `unsafe impl
1950// Send`/`unsafe impl Sync` for `SigchldScope` fails to compile here
1951// instead of silently allowing a cross-thread move that would race
1952// the SIGCHLD install on another thread.
1953//
// Mechanism (mirrors `static_assertions::assert_not_impl_any!`): a
// blanket `AmbiguousIfImpl<()>` impl applies to every type, while a
// specialized `AmbiguousIfImpl<Invalid{Send,Sync}>` impl applies only
// to types that are `Send` / `Sync` respectively. If `SigchldScope`
// implemented either, two impls would match `AmbiguousIfImpl<_>` and
// type inference for `_` would be ambiguous, producing a compile
// error. With `SigchldScope: !Send + !Sync`, only the blanket impl
// matches and the assertion compiles.
1962//
1963// `static_assertions` is a transitive dep (via `compact_str`) but not
1964// a direct dependency; inlining the trick keeps the assertion local
1965// without growing the direct dep graph for one use site.
1966const _: fn() = || {
1967    trait AmbiguousIfImpl<A> {
1968        fn some_item() {}
1969    }
1970    impl<T: ?Sized> AmbiguousIfImpl<()> for T {}
1971
1972    #[allow(dead_code)]
1973    struct InvalidSend;
1974    impl<T: ?Sized + Send> AmbiguousIfImpl<InvalidSend> for T {}
1975
1976    #[allow(dead_code)]
1977    struct InvalidSync;
1978    impl<T: ?Sized + Sync> AmbiguousIfImpl<InvalidSync> for T {}
1979
1980    let _ = <SigchldScope as AmbiguousIfImpl<_>>::some_item;
1981};
1982
1983/// Foreground path: spawn + wait + capture. Used by `.run()`.
1984///
1985/// Wraps the child's lifetime in a [`SigchldScope`] so `waitpid`
1986/// sees `SIG_DFL` and returns the child's real exit status instead
1987/// of `ECHILD` under the guest init's `SIGCHLD = SIG_IGN`.
1988///
1989/// When `timeout` is `Some`, a poll loop bounds the payload's
1990/// runtime. Exceeding the deadline fires
1991/// [`kill_payload_process_group`] (killpg + single-pid SIGKILL)
1992/// so fork descendants die and release the pipes, then
1993/// [`wait_and_capture`] drains whatever output accumulated before
1994/// the kill. The `SpawnOutput` returned on timeout carries the
1995/// partial output and the post-kill exit code; the caller decides
1996/// whether that counts as a test failure.
1997fn spawn_and_wait(
1998    binary: &str,
1999    args: &[String],
2000    cgroup: Option<(&str, &std::path::Path)>,
2001    timeout: Option<Duration>,
2002    uses_parent_pgrp: bool,
2003    cgroup_ops: &dyn crate::cgroup::CgroupOps,
2004) -> Result<SpawnOutput> {
2005    let _sigchld = SigchldScope::new();
2006    let (cmd, sync_handles) = build_command(binary, args, cgroup, uses_parent_pgrp)?;
2007    let mut child = match sync_handles {
2008        Some(handles) => drive_cgroup_handshake(cmd, handles, binary, cgroup_ops)?,
2009        None => {
2010            let mut cmd = cmd;
2011            cmd.spawn().map_err(|e| spawn_error_context(e, binary))?
2012        }
2013    };
2014    match timeout {
2015        Some(deadline) => wait_with_deadline(&mut child, deadline, binary, uses_parent_pgrp),
2016        None => match wait_and_capture(&mut child) {
2017            Ok(out) => Ok(out),
2018            Err(e) => {
2019                kill_payload_process_group(&child, binary, uses_parent_pgrp);
2020                let _ = child.wait();
2021                Err(e)
2022            }
2023        },
2024    }
2025}
2026
2027/// Block in the kernel until the child exits or `timeout` elapses.
2028/// On expiry, kill the whole process group (killpg + single-pid
2029/// SIGKILL) and drain captured output.
2030///
2031/// Implementation uses `pidfd_open(2)` + `epoll_wait` so the waiter
2032/// is kernel-blocked instead of spinning on a 10ms `try_wait` loop.
2033/// The earlier poll burned one wake per 10ms for the entire payload
2034/// runtime (typically multi-second schbench / fio runs), producing a
2035/// small but measurable CPU spike on every timed payload; pidfd
2036/// parks the thread until the kernel signals child exit, so idle
2037/// waiters contribute zero CPU. Minimum kernel: Linux 5.3.
2038///
2039/// Deadline honoring: the `epoll_wait` timeout is re-derived from
2040/// `saturating_duration_since` each iteration so `EINTR` restarts
2041/// narrow the remaining window rather than extending it.
2042fn wait_with_deadline(
2043    child: &mut std::process::Child,
2044    timeout: Duration,
2045    payload_name: &str,
2046    uses_parent_pgrp: bool,
2047) -> Result<SpawnOutput> {
2048    use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
2049    use std::os::fd::{AsFd, FromRawFd, OwnedFd};
2050
2051    let deadline = std::time::Instant::now() + timeout;
2052
2053    let pid =
2054        libc::pid_t::try_from(child.id()).expect("child pid fits in pid_t (Linux pid_max <= 2^22)");
2055    // `pidfd_open(pid, 0)`: returns an fd that becomes readable when
2056    // the pid exits. No `PIDFD_NONBLOCK` flag — epoll is the gate.
2057    let pidfd_raw = unsafe { libc::syscall(libc::SYS_pidfd_open, pid, 0i32) };
2058    if pidfd_raw < 0 {
2059        return Err(std::io::Error::last_os_error()).with_context(|| format!("pidfd_open({pid})"));
2060    }
2061    // SAFETY: the syscall succeeded and returned a fresh fd.
2062    let pidfd: OwnedFd = unsafe { OwnedFd::from_raw_fd(pidfd_raw as i32) };
2063
2064    let epoll = Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC)
2065        .with_context(|| "epoll_create1 for pidfd wait")?;
2066    // `data` field is unused — we only ever watch one fd. The add()
2067    // syscall still needs an `EpollEvent` with populated events.
2068    let event = EpollEvent::new(EpollFlags::EPOLLIN, 0);
2069    epoll
2070        .add(pidfd.as_fd(), event)
2071        .with_context(|| "epoll_ctl ADD pidfd")?;
2072
2073    let mut events = [EpollEvent::empty()];
2074    loop {
2075        // Race-safe reap attempt first: if the child exited between
2076        // spawn and pidfd_open, or between iterations while we were
2077        // outside epoll_wait, `try_wait` catches it without a needless
2078        // syscall.
2079        if child
2080            .try_wait()
2081            .with_context(|| "try_wait child")?
2082            .is_some()
2083        {
2084            return match wait_and_capture(child) {
2085                Ok(out) => Ok(out),
2086                Err(e) => {
2087                    kill_payload_process_group(child, payload_name, uses_parent_pgrp);
2088                    let _ = child.wait();
2089                    Err(e)
2090                }
2091            };
2092        }
2093
2094        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
2095        if remaining.is_zero() {
2096            kill_payload_process_group(child, payload_name, uses_parent_pgrp);
2097            return match wait_and_capture(child) {
2098                Ok(out) => Ok(out),
2099                Err(e) => {
2100                    let _ = child.wait();
2101                    Err(e).with_context(|| format!("drain after timeout of {timeout:?}"))
2102                }
2103            };
2104        }
2105
2106        // `PollTimeout` (aliased as `EpollTimeout`) stores the value
2107        // as `i32`, so `TryFrom<u32>` rejects any input larger than
2108        // `i32::MAX` (~24.8 days of milliseconds). Clamp both casts —
2109        // `u128 → u32` and then `u32 → i32`-range — so a
2110        // `Duration::MAX`-shaped remainder saturates to the max
2111        // accepted value instead of bubbling up a conversion error.
2112        let ms_u32 = u32::try_from(remaining.as_millis()).unwrap_or(u32::MAX);
2113        let ms_u32 = std::cmp::min(ms_u32, i32::MAX as u32);
2114        let timeout_param =
2115            EpollTimeout::try_from(ms_u32).with_context(|| "epoll timeout conversion")?;
2116
2117        match epoll.wait(&mut events, timeout_param) {
2118            Ok(_) => {
2119                // Either the pidfd went readable (child exit) OR the
2120                // timeout fired (ready_count == 0). Loop back: the
2121                // `try_wait` at top handles the exit path, the
2122                // `remaining.is_zero()` branch handles the deadline.
2123            }
2124            Err(nix::errno::Errno::EINTR) => {
2125                // Signal interrupted the wait; loop and re-compute
2126                // the remaining window.
2127            }
2128            Err(e) => {
2129                return Err(anyhow::anyhow!("epoll_wait: {e}"));
2130            }
2131        }
2132    }
2133}
2134
2135/// Background path: spawn without waiting. Returns the live
2136/// [`std::process::Child`] plus a [`SigchldScope`] that must be held for the
2137/// child's lifetime — [`PayloadHandle`] keeps it alive until
2138/// `.wait()` / `.kill()` / `Drop` so `waitpid` during reap sees
2139/// `SIG_DFL` and observes the child's real exit.
2140fn spawn_child(
2141    binary: &str,
2142    args: &[String],
2143    cgroup: Option<(&str, &std::path::Path)>,
2144    uses_parent_pgrp: bool,
2145    cgroup_ops: &dyn crate::cgroup::CgroupOps,
2146) -> Result<(std::process::Child, SigchldScope)> {
2147    let sigchld = SigchldScope::new();
2148    let (cmd, sync_handles) = build_command(binary, args, cgroup, uses_parent_pgrp)?;
2149    let child = match sync_handles {
2150        Some(handles) => drive_cgroup_handshake(cmd, handles, binary, cgroup_ops)?,
2151        None => {
2152            let mut cmd = cmd;
2153            cmd.spawn().map_err(|e| spawn_error_context(e, binary))?
2154        }
2155    };
2156    Ok((child, sigchld))
2157}
2158
/// Upper bound, in bytes, on how much of each child output stream
/// (stdout and stderr separately) is kept: 16 MiB.
///
/// Typical schbench / stress-ng flows emit kilobytes to
/// low-hundreds-of-KB, so the cap leaves multiple orders of magnitude
/// of slack for every realistic benchmark while cutting off OOM
/// pressure from a pathological payload that prints unbounded GBs.
///
/// Hitting the cap truncates rather than errors, so downstream
/// consumers (metric extraction, sidecar) still see a prefix — the only
/// loss is the tail, which is rarely load-bearing. Each truncation
/// emits a paired `eprintln!` + `tracing::warn!` notice naming the
/// stream and the cap byte count.
pub(crate) const MAX_CAPTURED_STREAM_BYTES: u64 = 16 << 20;
2170
2171/// Reap a (possibly already-killed) [`std::process::Child`]: wait for it to
2172/// exit, drain stdout + stderr, return the captured output.
2173///
2174/// Takes `&mut Child` so callers retain ownership and can
2175/// `kill_payload_process_group` + `wait` to clean up descendants
2176/// when this function returns `Err` (e.g. a reader thread panicked
2177/// or the wait syscall itself failed). An owned-child signature
2178/// would lose the handle inside this function and leave descendants
2179/// running because `std::process::Child`'s `Drop` is a no-op.
2180///
2181/// Sequential stdout-then-stderr reads deadlock when the child
2182/// fills one pipe buffer (typically 64KiB) while the other is
2183/// unread — the child blocks on write, the parent blocks on read
2184/// of the empty pipe. Drain both pipes concurrently via helper
2185/// threads, mirroring what `std::process::Command::output` does
2186/// for the foreground path.
2187///
2188/// Each reader thread wraps its source in
2189/// `Read::take(MAX_CAPTURED_STREAM_BYTES)` — see the constant's
2190/// rationale — so a runaway child cannot OOM the host. The tail
2191/// past the cap is discarded; the metric-extraction pipeline
2192/// always receives a bounded buffer.
2193fn wait_and_capture(child: &mut std::process::Child) -> Result<SpawnOutput> {
2194    let stdout_handle = child.stdout.take().map(|out| {
2195        std::thread::spawn(move || -> std::io::Result<(String, bool)> {
2196            drain_capped(out, "stdout")
2197        })
2198    });
2199    let stderr_handle = child.stderr.take().map(|err| {
2200        std::thread::spawn(move || -> std::io::Result<(String, bool)> {
2201            drain_capped(err, "stderr")
2202        })
2203    });
2204    let status = child.wait().with_context(|| "wait child")?;
2205    // `.join().unwrap()` below is NOT a bug: the workspace builds
2206    // with `panic = "abort"` in release (see Cargo.toml
2207    // `[profile.release]`), so a panicked reader thread aborts the
2208    // whole process and `join()` never returns an
2209    // `Err(Box<dyn Any + Send>)` (`std::thread::Result::Err`). The
2210    // historic `.map_err(|_| anyhow!("...panicked"))` arm could not
2211    // fire and misled readers into expecting a recoverable error.
2212    //
2213    // Under cargo's default `panic = "unwind"` (which the dev and
2214    // test profiles both inherit — only `[profile.release]` flips
2215    // to abort in this crate), a reader-thread panic DOES unwind
2216    // into `thread::Result::Err`. The `.unwrap()` then re-panics on
2217    // the main thread, which is the key test-profile behavior: the
2218    // libtest / nextest harness installs a per-test panic hook that
2219    // catches the re-panic and reports it as a failed test with the
2220    // reader-thread's payload preserved. The alternative —
2221    // `.map_err(|_| anyhow!("..."))` — would erase the reader-
2222    // thread panic payload, surface a generic string through `?`,
2223    // and make the test pass look like "drain step returned Err"
2224    // when the true failure was a panic inside `drain_capped` (an
2225    // indexing-out-of-bounds on a malformed stream, say). The
2226    // panic=abort caller contract holds in release (whole-process
2227    // abort); debug/test callers get a loud re-panic with the
2228    // original panic payload visible. Either way no `Err` reaches
2229    // the `?` below.
2230    let (stdout, _stdout_truncated) = match stdout_handle {
2231        Some(h) => h.join().unwrap().with_context(|| "read child stdout")?,
2232        None => (String::new(), false),
2233    };
2234    let (stderr, _stderr_truncated) = match stderr_handle {
2235        Some(h) => h.join().unwrap().with_context(|| "read child stderr")?,
2236        None => (String::new(), false),
2237    };
2238    Ok(SpawnOutput {
2239        stdout,
2240        stderr,
2241        exit_code: status.code().unwrap_or(-1),
2242    })
2243}
2244
2245/// Read `src` into a `String` with `MAX_CAPTURED_STREAM_BYTES` cap.
2246/// Returns `(buf, truncated)`. Emits a paired `eprintln!` +
2247/// `tracing::warn!` notice with the stream label (e.g. "stdout" /
2248/// "stderr") and cap byte count when the cap is hit.
2249///
2250/// Truncation is performed at the byte level on a `Vec<u8>` so a
2251/// split multi-byte UTF-8 char at the cap boundary cannot panic.
2252/// The final `String::from_utf8_lossy` replaces any invalid UTF-8
2253/// bytes with U+FFFD — including the partial-char split that byte
2254/// truncation can introduce. Non-truncated output preserves the
2255/// original bytes verbatim when it is already valid UTF-8; the
2256/// only behavioral delta vs the pre-cap `read_to_string` path is
2257/// that invalid UTF-8 in the child's full output now produces
2258/// replacement chars instead of an `io::ErrorKind::InvalidData`
2259/// upstream error. That trade is deliberate: past the cap there is
2260/// no way to report "invalid UTF-8" meaningfully since the tail is
2261/// gone, and making the pre-cap path lossy keeps semantics uniform.
2262fn drain_capped(src: impl std::io::Read, label: &'static str) -> std::io::Result<(String, bool)> {
2263    use std::io::Read;
2264    // One extra byte probes whether the source had more to offer —
2265    // `Take` returns EOF at exactly the cap, indistinguishable from
2266    // a child that emitted exactly `cap` bytes. We cap our own buffer
2267    // at MAX + 1 and check the read count.
2268    let mut raw: Vec<u8> = Vec::new();
2269    let n = src
2270        .take(MAX_CAPTURED_STREAM_BYTES + 1)
2271        .read_to_end(&mut raw)?;
2272    let truncated = n as u64 > MAX_CAPTURED_STREAM_BYTES;
2273    if truncated {
2274        raw.truncate(MAX_CAPTURED_STREAM_BYTES as usize);
2275        // Dual-emit: stderr for nextest-direct test runs (no
2276        // tracing subscriber installed in the default test-support
2277        // dispatch path), tracing for cargo-ktstr-wrapped runs and
2278        // structured-log consumers. Same rationale as the prefetch
2279        // notices — a silent-truncation warn that only reaches the
2280        // no-op dispatcher fails the visibility goal of this check.
2281        eprintln!(
2282            "ktstr: payload {label} exceeded {MAX_CAPTURED_STREAM_BYTES} bytes; tail discarded"
2283        );
2284        tracing::warn!(
2285            stream = label,
2286            cap_bytes = MAX_CAPTURED_STREAM_BYTES,
2287            "payload {label} exceeded capture cap; tail discarded",
2288        );
2289    }
2290    Ok((String::from_utf8_lossy(&raw).into_owned(), truncated))
2291}
2292
2293#[cfg(test)]
2294mod tests {
2295    use super::*;
2296    use crate::cgroup::CgroupManager;
2297    use crate::test_support::{MetricStream, OutputFormat, Polarity, Scheduler};
2298    use crate::topology::TestTopology;
2299
2300    // Minimal Ctx builder fixture for tests — no VM boot.
2301    fn make_ctx<'a>(
2302        cgroups: &'a CgroupManager,
2303        topo: &'a TestTopology,
2304    ) -> crate::scenario::Ctx<'a> {
2305        crate::scenario::Ctx::builder(cgroups, topo).build()
2306    }
2307
2308    /// Closure-form helper that owns the CgroupManager + TestTopology
2309    /// pair under the closure's stack frame so a Ctx borrowed from
2310    /// them stays valid for the closure body. Collapses the 3-line
2311    /// `cgroups + topo + make_ctx` setup that every payload_run test
2312    /// repeats. Tests pick the `parent` cgroup path: tests that never
2313    /// resolve against a real FS pass `"/nonexistent"`; tests that
2314    /// assert `resolve_cgroup_path` output pass `"/sys/fs/cgroup/test-parent"`.
2315    fn with_ctx<R>(parent: &str, f: impl FnOnce(&crate::scenario::Ctx<'_>) -> R) -> R {
2316        let cgroups = CgroupManager::new(parent);
2317        let topo = TestTopology::synthetic(4, 1);
2318        let ctx = make_ctx(&cgroups, &topo);
2319        f(&ctx)
2320    }
2321
2322    const FIO_BINARY: Payload = Payload {
2323        name: "fio",
2324        kind: PayloadKind::Binary("fio"),
2325        output: OutputFormat::Json,
2326        default_args: &["--output-format=json"],
2327        default_checks: &[],
2328        metrics: &[],
2329        include_files: &[],
2330        uses_parent_pgrp: false,
2331        known_flags: None,
2332    };
2333
2334    const EEVDF_SCHED_PAYLOAD: Payload = Payload {
2335        name: "eevdf",
2336        kind: PayloadKind::Scheduler(&Scheduler::EEVDF),
2337        output: OutputFormat::ExitCode,
2338        default_args: &[],
2339        default_checks: &[],
2340        metrics: &[],
2341        include_files: &[],
2342        uses_parent_pgrp: false,
2343        known_flags: None,
2344    };
2345
2346    #[test]
2347    fn builder_inherits_default_args() {
2348        with_ctx("/nonexistent", |ctx| {
2349            let run = PayloadRun::new(ctx, &FIO_BINARY);
2350            assert_eq!(run.args, vec!["--output-format=json"]);
2351        });
2352    }
2353
2354    #[test]
2355    fn arg_appends() {
2356        with_ctx("/nonexistent", |ctx| {
2357            let run = PayloadRun::new(ctx, &FIO_BINARY)
2358                .arg("--runtime=30")
2359                .arg("job.fio");
2360            assert_eq!(
2361                run.args,
2362                vec!["--output-format=json", "--runtime=30", "job.fio"]
2363            );
2364        });
2365    }
2366
2367    #[test]
2368    fn clear_args_wipes_defaults() {
2369        with_ctx("/nonexistent", |ctx| {
2370            let run = PayloadRun::new(ctx, &FIO_BINARY)
2371                .clear_args()
2372                .arg("--custom");
2373            assert_eq!(run.args, vec!["--custom"]);
2374        });
2375    }
2376
2377    #[test]
2378    fn args_method_bulk_appends() {
2379        with_ctx("/nonexistent", |ctx| {
2380            let run = PayloadRun::new(ctx, &FIO_BINARY).args(["--a", "--b", "--c"]);
2381            assert_eq!(run.args, vec!["--output-format=json", "--a", "--b", "--c"]);
2382        });
2383    }
2384
2385    #[test]
2386    fn check_and_clear_checks() {
2387        with_ctx("/nonexistent", |ctx| {
2388            let run = PayloadRun::new(ctx, &FIO_BINARY)
2389                .check(MetricCheck::min("iops", 1000.0))
2390                .check(MetricCheck::max("latency", 500.0));
2391            assert_eq!(run.checks.len(), 2);
2392            let cleared = PayloadRun::new(ctx, &FIO_BINARY)
2393                .clear_checks()
2394                .check(MetricCheck::exit_code_eq(0));
2395            assert_eq!(cleared.checks.len(), 1);
2396        });
2397    }
2398
2399    #[test]
2400    fn in_cgroup_stores_name() {
2401        with_ctx("/nonexistent", |ctx| {
2402            let run = PayloadRun::new(ctx, &FIO_BINARY).in_cgroup("fio_cg");
2403            assert_eq!(run.cgroup.as_deref(), Some("fio_cg"));
2404        });
2405    }
2406
2407    #[test]
2408    fn resolve_cgroup_path_strips_leading_slash_and_joins() {
2409        with_ctx("/sys/fs/cgroup/test-parent", |ctx| {
2410            // Leading "/" tolerated, joined under parent.
2411            let resolved = resolve_cgroup_path(ctx, Some("/workload"), "test::strips")
2412                .expect("valid cgroup name")
2413                .expect("Some(path)");
2414            assert_eq!(
2415                resolved,
2416                std::path::PathBuf::from("/sys/fs/cgroup/test-parent/workload")
2417            );
2418            // Same name without leading slash produces the same path.
2419            let plain = resolve_cgroup_path(ctx, Some("workload"), "test::strips")
2420                .expect("valid")
2421                .expect("Some");
2422            assert_eq!(resolved, plain);
2423        });
2424    }
2425
2426    #[test]
2427    fn resolve_cgroup_path_rejects_parent_dir() {
2428        with_ctx("/sys/fs/cgroup/test-parent", |ctx| {
2429            let err = resolve_cgroup_path(ctx, Some("../escape"), "PayloadRun::run")
2430                .expect_err("'..' must be rejected");
2431            let rendered = format!("{err:#}");
2432            assert!(rendered.contains(".."), "err: {rendered}");
2433            assert!(
2434                rendered.contains("PayloadRun::run"),
2435                "op label must appear in error: {rendered}",
2436            );
2437        });
2438    }
2439
2440    #[test]
2441    fn resolve_cgroup_path_rejects_nul_byte() {
2442        with_ctx("/sys/fs/cgroup/test-parent", |ctx| {
2443            let err = resolve_cgroup_path(ctx, Some("bad\0name"), "PayloadRun::spawn")
2444                .expect_err("NUL must be rejected");
2445            let rendered = format!("{err:#}");
2446            assert!(rendered.contains("NUL"), "err: {rendered}");
2447            assert!(
2448                rendered.contains("PayloadRun::spawn"),
2449                "op label must appear in error: {rendered}",
2450            );
2451        });
2452    }
2453
2454    #[test]
2455    fn resolve_cgroup_path_rejects_empty_after_strip() {
2456        with_ctx("/sys/fs/cgroup/test-parent", |ctx| {
2457            // "/" strips to empty — reject so we don't silently target
2458            // the parent cgroup itself.
2459            let err = resolve_cgroup_path(ctx, Some("/"), "PayloadRun::run")
2460                .expect_err("slash-only must be rejected");
2461            let rendered = format!("{err:#}");
2462            assert!(rendered.contains("empty"), "err: {rendered}");
2463            assert!(
2464                rendered.contains("PayloadRun::run"),
2465                "op label must appear in error: {rendered}",
2466            );
2467            let err = resolve_cgroup_path(ctx, Some(""), "PayloadRun::run")
2468                .expect_err("empty must be rejected");
2469            let rendered = format!("{err:#}");
2470            assert!(rendered.contains("empty"), "err: {rendered}");
2471            assert!(
2472                rendered.contains("PayloadRun::run"),
2473                "op label must appear in error: {rendered}",
2474            );
2475        });
2476    }
2477
2478    #[test]
2479    fn resolve_cgroup_path_none_passes_through() {
2480        with_ctx("/sys/fs/cgroup/test-parent", |ctx| {
2481            assert!(
2482                resolve_cgroup_path(ctx, None, "test::none")
2483                    .unwrap()
2484                    .is_none()
2485            );
2486        });
2487    }
2488
2489    /// Differential pin: the `op` label must flow through to the
2490    /// rendered error verbatim, distinct per caller. Catches the
2491    /// regression where the parameter is accepted but ignored
2492    /// (e.g. a refactor that hardcodes one label in the
2493    /// `anyhow!` message). Without this, per-caller assertions
2494    /// could pass against an implementation that drops the arg.
2495    #[test]
2496    fn resolve_cgroup_path_op_label_differs_per_caller() {
2497        with_ctx("/sys/fs/cgroup/test-parent", |ctx| {
2498            let err_a = format!(
2499                "{:#}",
2500                resolve_cgroup_path(ctx, Some("../bad"), "caller-a")
2501                    .expect_err("'..' must be rejected"),
2502            );
2503            let err_b = format!(
2504                "{:#}",
2505                resolve_cgroup_path(ctx, Some("../bad"), "caller-b")
2506                    .expect_err("'..' must be rejected"),
2507            );
2508            assert!(err_a.contains("caller-a"), "err_a: {err_a}");
2509            assert!(
2510                !err_a.contains("caller-b"),
2511                "err_a leaked caller-b: {err_a}"
2512            );
2513            assert!(err_b.contains("caller-b"), "err_b: {err_b}");
2514            assert!(
2515                !err_b.contains("caller-a"),
2516                "err_b leaked caller-a: {err_b}"
2517            );
2518        });
2519    }
2520
2521    /// Production-caller pin: the public `.run()` entry point must
2522    /// actually pass `"PayloadRun::run"` (and not, e.g.,
2523    /// `"PayloadRun::spawn"` via a copy-paste typo) when calling
2524    /// resolve_cgroup_path. The helper-level differential test above
2525    /// only proves the parameter isn't dropped — this test exercises
2526    /// the production call site inside `PayloadRun::run()` end-to-end.
2527    #[test]
2528    fn payload_run_run_threads_canonical_op_label() {
2529        with_ctx("/sys/fs/cgroup/test-parent", |ctx| {
2530            let run = PayloadRun::new(ctx, &FIO_BINARY).in_cgroup("../escape");
2531            let err = run.run().unwrap_err();
2532            let rendered = format!("{err:#}");
2533            assert!(rendered.contains(".."), "err: {rendered}");
2534            assert!(
2535                rendered.contains("PayloadRun::run"),
2536                "production run() must pass PayloadRun::run as op label: {rendered}",
2537            );
2538        });
2539    }
2540
2541    /// Sibling of `payload_run_run_threads_canonical_op_label` for
2542    /// the `PayloadRun::spawn()` entry point. Uses a NUL-byte
2543    /// cgroup name (different validation branch from the run()
2544    /// test's `..`) to guard against a hypothetical refactor that
2545    /// hardcodes one op label across all anyhow! sites in
2546    /// resolve_cgroup_path.
2547    #[test]
2548    fn payload_run_spawn_threads_canonical_op_label() {
2549        with_ctx("/sys/fs/cgroup/test-parent", |ctx| {
2550            let run = PayloadRun::new(ctx, &FIO_BINARY).in_cgroup("bad\0name");
2551            let err = run.spawn().unwrap_err();
2552            let rendered = format!("{err:#}");
2553            assert!(rendered.contains("NUL"), "err: {rendered}");
2554            assert!(
2555                rendered.contains("PayloadRun::spawn"),
2556                "production spawn() must pass PayloadRun::spawn as op label: {rendered}",
2557            );
2558        });
2559    }
2560
2561    #[test]
2562    fn run_rejects_scheduler_kind() {
2563        with_ctx("/nonexistent", |ctx| {
2564            let run = PayloadRun::new(ctx, &EEVDF_SCHED_PAYLOAD);
2565            let err = run.run().unwrap_err();
2566            assert!(
2567                format!("{err:#}").contains("scheduler-kind"),
2568                "err: {err:#}"
2569            );
2570        });
2571    }
2572
2573    #[test]
2574    fn evaluate_checks_passes_when_no_checks() {
2575        let pm = PayloadMetrics {
2576            payload_index: 0,
2577            metrics: vec![],
2578            exit_code: 0,
2579        };
2580        let r = evaluate_checks(&[], &pm, "");
2581        assert!(r.is_pass());
2582    }
2583
2584    #[test]
2585    fn evaluate_checks_exit_code_mismatch_fails_fast() {
2586        let pm = PayloadMetrics {
2587            payload_index: 0,
2588            metrics: vec![],
2589            exit_code: 42,
2590        };
2591        let checks = [
2592            MetricCheck::exit_code_eq(0),
2593            MetricCheck::min("iops", 100.0),
2594        ];
2595        let r = evaluate_checks(&checks, &pm, "");
2596        assert!(r.is_fail());
2597        // exit-code failure short-circuits — only one detail, not
2598        // a "missing metric" detail from the min check.
2599        assert_eq!(r.outcomes.len(), 1);
2600        assert!(
2601            r.failure_details()
2602                .next()
2603                .unwrap()
2604                .message
2605                .contains("exited with code 42"),
2606            "details: {:?}",
2607            r.outcomes
2608        );
2609    }
2610
2611    #[test]
2612    fn evaluate_checks_exit_code_mismatch_surfaces_stderr() {
2613        let pm = PayloadMetrics {
2614            payload_index: 0,
2615            metrics: vec![],
2616            exit_code: 1,
2617        };
2618        let r = evaluate_checks(
2619            &[MetricCheck::exit_code_eq(0)],
2620            &pm,
2621            "fatal: config missing\n",
2622        );
2623        assert!(r.is_fail());
2624        assert!(
2625            r.failure_details()
2626                .next()
2627                .unwrap()
2628                .message
2629                .contains("fatal: config missing"),
2630            "stderr tail must appear in detail: {:?}",
2631            r.outcomes,
2632        );
2633        assert!(
2634            r.failure_details()
2635                .next()
2636                .unwrap()
2637                .message
2638                .contains("stderr:"),
2639            "detail must label the stderr block: {:?}",
2640            r.outcomes,
2641        );
2642    }
2643
2644    #[test]
2645    fn evaluate_checks_exit_code_mismatch_without_stderr_stays_terse() {
2646        let pm = PayloadMetrics {
2647            payload_index: 0,
2648            metrics: vec![],
2649            exit_code: 1,
2650        };
2651        let r = evaluate_checks(&[MetricCheck::exit_code_eq(0)], &pm, "");
2652        assert!(r.is_fail());
2653        // Empty stderr → no "stderr:" prefix in the detail.
2654        assert!(
2655            !r.failure_details()
2656                .next()
2657                .unwrap()
2658                .message
2659                .contains("stderr:"),
2660            "empty stderr must not produce a stderr: block: {:?}",
2661            r.outcomes,
2662        );
2663    }
2664
/// A payload killed by a signal is recorded with `exit_code = -1`:
/// `std::process::ExitStatus::code()` yields `None` on signal death
/// and the spawn layer maps that to `-1` (see `spawn_and_wait`).
/// `MetricCheck::exit_code_eq(-1)` is how a user expresses "expected
/// signal death", so the exit-code pre-pass has to accept it under
/// exact `i32` equality.
#[test]
fn evaluate_checks_exit_code_eq_negative_one_matches_signal_death() {
    let signal_death = PayloadMetrics {
        exit_code: -1,
        metrics: Vec::new(),
        payload_index: 0,
    };
    let checks = [MetricCheck::exit_code_eq(-1)];
    let r = evaluate_checks(&checks, &signal_death, "");
    assert!(
        r.is_pass(),
        "exit_code_eq(-1) must pass when exit_code == -1: {:?}",
        r.outcomes,
    );
}
2685
/// Mirror image of the signal-death case: expecting `-1` while the
/// payload exited CLEANLY (`exit_code == 0`) must fail, and the
/// mismatch detail has to print both integers so the user can see
/// what was requested next to what actually happened.
#[test]
fn evaluate_checks_exit_code_eq_negative_one_fails_on_clean_exit() {
    let clean_exit = PayloadMetrics {
        exit_code: 0,
        metrics: Vec::new(),
        payload_index: 0,
    };
    let checks = [MetricCheck::exit_code_eq(-1)];
    let r = evaluate_checks(&checks, &clean_exit, "");
    assert!(r.is_fail());
    let detail = r.failure_details().next().unwrap();
    let msg = &*detail.message;
    let cites_actual = msg.contains("exited with code 0");
    assert!(
        cites_actual,
        "mismatch detail must cite the actual exit code, got: {msg}"
    );
    let cites_expected = msg.contains("-1");
    assert!(
        cites_expected,
        "mismatch detail must cite the expected exit code, got: {msg}"
    );
}
2709
/// A reversed range (`lo > hi`) never reaches the evaluator because
/// `MetricCheck::range` panics while constructing it (see
/// `check_range_reversed_bounds_panics_at_construction` in
/// `test_support::payload`). The constructor-side panic is pinned
/// here as well: if a future relaxation lets reversed ranges flow
/// into the evaluator (where they would silently fail every metric),
/// this guard trips instead of the change slipping past CI.
#[test]
#[should_panic(expected = "lo must be <= hi")]
fn evaluate_checks_range_reversed_bounds_panics_at_construction() {
    // lo = 100 > hi = 50: construction itself must panic.
    let _reversed = MetricCheck::range("iops", 100.0, 50.0);
}
2722
/// Input longer than the budget is reduced to a `...` marker followed
/// by exactly `STDERR_TAIL_BYTES` trailing bytes.
#[test]
fn stderr_tail_truncates_long_input() {
    // Pure ASCII: every byte offset is a char boundary, so the
    // boundary logic has nothing to do and the output size is
    // deterministic.
    let oversized = "A".repeat(STDERR_TAIL_BYTES + 500);
    let tail = stderr_tail(&oversized, STDERR_TAIL_BYTES);
    assert!(tail.starts_with("..."));
    // Marker bytes plus the full STDERR_TAIL_BYTES suffix.
    assert_eq!(tail.len(), "...".len() + STDERR_TAIL_BYTES);
}
2733
/// Input that already fits the budget comes back unchanged, with no
/// truncation marker.
#[test]
fn stderr_tail_preserves_short_input() {
    assert_eq!(stderr_tail("short error", STDERR_TAIL_BYTES), "short error");
}
2739
/// If `s.len() - max_bytes` falls in the interior of a multi-byte
/// UTF-8 character, `stderr_tail` moves the start index forward to
/// the next char boundary so slicing never panics. The fixture puts a
/// 2-byte character ("é") right on that offset, where a naive
/// `&s[start..]` would cut mid-codepoint.
#[test]
fn stderr_tail_snaps_forward_across_multibyte_char_boundary() {
    // 22 bytes: 'A' at 0..10, "é" at 10..12, 'B' at 12..22.
    // max_bytes = 11 gives a naive start of 22 - 11 = 11, the second
    // byte of "é" (not a boundary). Snapping forward lands on 12, so
    // what follows the marker is the ten trailing 'B's.
    let s = format!("{}é{}", "A".repeat(10), "B".repeat(10));
    let tail = stderr_tail(&s, 11);
    assert!(tail.starts_with("..."));
    // The split character is skipped entirely, leaving ASCII 'B's
    // directly after the marker.
    let after_marker = &tail[3..];
    assert!(
        after_marker.starts_with('B'),
        "expected snap-forward past 'é', got: {tail:?}"
    );
}
2763
/// If the truncation offset lands exactly on the first byte of a
/// multi-byte character, that character is kept whole — no
/// off-by-one that would drop its leading byte.
#[test]
fn stderr_tail_preserves_multibyte_char_at_exact_boundary() {
    // 22 bytes: 'A' at 0..10, "é" at 10..12, 'B' at 12..22.
    // max_bytes = 12 gives start = 22 - 12 = 10, which is the first
    // byte of "é" and therefore already a boundary: nothing snaps and
    // "é" stays in the tail.
    let s = format!("{}é{}", "A".repeat(10), "B".repeat(10));
    let tail = stderr_tail(&s, 12);
    assert!(tail.starts_with("..."));
    let kept_multibyte = tail.contains('é');
    assert!(
        kept_multibyte,
        "boundary-aligned multibyte char must survive the tail, got: {tail:?}"
    );
}
2784
/// Sweeps every `max_bytes` value over a string with an interior
/// multi-byte char and checks that `stderr_tail` returns a faithful
/// suffix of the input. Valid UTF-8 alone proves nothing, since
/// `String` guarantees that for free. What follows any leading `...`
/// marker must be a suffix of `s`, and truncation must never lengthen
/// the content. Checking the slice CONTENT at each offset exposes
/// snap-direction and off-by-one regressions that the earlier
/// discard-the-result loop could not see.
#[test]
fn stderr_tail_output_is_suffix_of_input_at_every_offset() {
    // Ten ASCII bytes, then "好" (3 bytes: E5 A5 BD), then ten more
    // ASCII bytes: 23 bytes with the multi-byte char mid-string.
    let s = "xxxxxxxxxx好yyyyyyyyyy";
    for max in 1..=s.len() {
        let tail = stderr_tail(s, max);

        // Budget covers the whole input: verbatim, no marker.
        if max >= s.len() {
            assert_eq!(tail, s, "max={max}: short input must round-trip");
            assert!(
                !tail.starts_with("..."),
                "max={max}: no marker when untruncated"
            );
            continue;
        }

        // Truncated: a leading "..." marker, then a suffix of s.
        let body = match tail.strip_prefix("...") {
            Some(rest) => rest,
            None => panic!("max={max}: truncated output must carry the marker, got {tail:?}"),
        };
        assert!(
            s.ends_with(body),
            "max={max}: body {body:?} must be a suffix of the input {s:?}",
        );
        // Snapping forward only removes bytes from the front, so the
        // kept suffix fits the `max` budget and cannot outgrow the
        // input.
        assert!(
            body.len() <= max,
            "max={max}: retained suffix {} bytes exceeds the {max}-byte budget",
            body.len(),
        );
        // The suffix must begin on a char boundary of s (no
        // mid-codepoint split): cutting `body` off the end of s has to
        // leave a valid str, so head + body rebuilds s exactly.
        let head_len = s.len() - body.len();
        assert!(
            s.is_char_boundary(head_len),
            "max={max}: suffix start {head_len} must be a char boundary of {s:?}",
        );
    }
}
2835
/// Production-scale version of the small boundary tests. Those use
/// ~20-byte strings, far below the production [`STDERR_TAIL_BYTES`]
/// threshold of 1024. Here a >1 KiB string is laid out so the
/// truncation offset hits the interior byte of a multi-byte
/// character, which is the shape an overflowing stderr from a real
/// payload takes. The snap-forward has to step past that interior
/// byte so `stderr_tail` does not panic on a mid-codepoint slice.
#[test]
fn stderr_tail_snaps_forward_at_production_threshold() {
    // 1125 bytes: 'A' at 0..100, "é" at 100..102, 'B' at 102..1125.
    // start = 1125 - 1024 = 101, the interior byte of "é" (its
    // boundaries are 100 and 102). Snapping forward moves start to
    // 102, so the tail opens with the 'B' run and not a split "é".
    let s = format!("{}é{}", "A".repeat(100), "B".repeat(1023));
    assert!(
        s.len() > STDERR_TAIL_BYTES,
        "fixture must exceed STDERR_TAIL_BYTES to exercise the truncation path",
    );
    let tail = stderr_tail(&s, STDERR_TAIL_BYTES);
    assert!(tail.starts_with("..."));
    let after_marker = &tail[3..];
    assert!(
        after_marker.starts_with('B'),
        "expected snap-forward past 'é' interior byte at >1 KiB, got prefix: {:?}",
        &tail[..tail.len().min(20)],
    );
}
2866
/// Production-scale complement: a truncation offset that lands
/// exactly on the first byte of a multi-byte character (a boundary)
/// leaves that character intact, with no off-by-one dropping it.
/// This exercises the is_char_boundary-true branch of the
/// snap-forward loop at the real [`STDERR_TAIL_BYTES`] size.
#[test]
fn stderr_tail_preserves_multibyte_at_production_boundary() {
    // 1124 bytes: 'A' at 0..100, "é" at 100..102, 'B' at 102..1124.
    // start = 1124 - 1024 = 100, the first byte of "é" and already a
    // char boundary: no snap runs and "é" is kept whole.
    let s = format!("{}é{}", "A".repeat(100), "B".repeat(1022));
    assert!(
        s.len() > STDERR_TAIL_BYTES,
        "fixture must exceed STDERR_TAIL_BYTES to exercise the truncation path",
    );
    let tail = stderr_tail(&s, STDERR_TAIL_BYTES);
    assert!(tail.starts_with("..."));
    let kept_multibyte = tail.contains('é');
    assert!(
        kept_multibyte,
        "boundary-aligned 'é' at the >1 KiB truncation offset must survive, got prefix: {:?}",
        &tail[..tail.len().min(40)],
    );
}
2892
/// A `Min` check that names a metric absent from the payload output
/// must fail with a "not found" detail.
#[test]
fn evaluate_checks_missing_metric_fails_loudly() {
    let no_metrics = PayloadMetrics {
        exit_code: 0,
        metrics: Vec::new(),
        payload_index: 0,
    };
    let r = evaluate_checks(&[MetricCheck::min("iops", 100.0)], &no_metrics, "");
    assert!(r.is_fail());
    let first = r.failure_details().next().unwrap();
    let reports_missing = first.message.contains("not found");
    assert!(reports_missing, "details: {:?}", r.outcomes);
}
2913
/// A metric value under the `Min` threshold fails with a
/// "below minimum" detail.
#[test]
fn evaluate_checks_min_below_threshold_fails() {
    let iops = Metric {
        name: "iops".to_string(),
        value: 50.0,
        polarity: Polarity::HigherBetter,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![iops],
        exit_code: 0,
    };
    // 50 < 100: the lower bound is violated.
    let checks = [MetricCheck::min("iops", 100.0)];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(r.is_fail());
    let first = r.failure_details().next().unwrap();
    assert!(first.message.contains("below minimum"));
}
2937
/// A metric value over the `Max` threshold fails with an
/// "exceeds maximum" detail.
#[test]
fn evaluate_checks_max_above_threshold_fails() {
    let latency = Metric {
        name: "lat".to_string(),
        value: 1000.0,
        polarity: Polarity::LowerBetter,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![latency],
        exit_code: 0,
    };
    // 1000 > 500: the upper bound is violated.
    let checks = [MetricCheck::max("lat", 500.0)];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(r.is_fail());
    let first = r.failure_details().next().unwrap();
    assert!(first.message.contains("exceeds maximum"));
}
2961
/// A metric value outside the `Range` bounds fails with an "outside"
/// detail.
#[test]
fn evaluate_checks_range_out_of_bounds_fails() {
    let cpu = Metric {
        name: "cpu".to_string(),
        value: 150.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![cpu],
        exit_code: 0,
    };
    // 150 lies above the [0, 100] window.
    let checks = [MetricCheck::range("cpu", 0.0, 100.0)];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(r.is_fail());
    let first = r.failure_details().next().unwrap();
    assert!(first.message.contains("outside"));
}
2985
/// Under IEEE 754 every comparison against NaN is false, so a naive
/// `actual < min` would quietly pass a `Min` check on a NaN-valued
/// metric. That is the case operators most need flagged, because NaN
/// nearly always means the metric extraction itself is broken
/// (divide-by-zero, unparsed token, typed-measurement error). NaN is
/// routed through `nan_metric` BEFORE the bound comparison, so the
/// failure reads "metric '...' value is NaN" and an unmeasurable
/// value is never silently green-lit.
#[test]
fn evaluate_checks_min_nan_fails_with_nan_message() {
    let nan_iops = Metric {
        name: "iops".to_string(),
        value: f64::NAN,
        polarity: Polarity::HigherBetter,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![nan_iops],
        exit_code: 0,
    };
    let checks = [MetricCheck::min("iops", 100.0)];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(!r.is_pass(), "NaN value must fail Min check");
    let first = r.failure_details().next().unwrap();
    let names_nan = first.message.contains("value is NaN");
    assert!(
        names_nan,
        "NaN failure must surface the dedicated message: {:?}",
        r.outcomes
    );
}
3020
/// Upper-bound sibling of
/// [`evaluate_checks_min_nan_fails_with_nan_message`]: `actual > max`
/// is just as false for NaN.
#[test]
fn evaluate_checks_max_nan_fails_with_nan_message() {
    let nan_latency = Metric {
        name: "lat".to_string(),
        value: f64::NAN,
        polarity: Polarity::LowerBetter,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![nan_latency],
        exit_code: 0,
    };
    let checks = [MetricCheck::max("lat", 500.0)];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(!r.is_pass(), "NaN value must fail Max check");
    let first = r.failure_details().next().unwrap();
    let names_nan = first.message.contains("value is NaN");
    assert!(
        names_nan,
        "NaN failure must surface the dedicated message: {:?}",
        r.outcomes
    );
}
3049
/// The range gate compares NaN with both `actual < lo` and
/// `actual > hi`; each is false, which is how the legacy code came to
/// accept NaN as in-range. NaN is now routed through `nan_metric`
/// ahead of the range comparison.
#[test]
fn evaluate_checks_range_nan_fails_with_nan_message() {
    let nan_cpu = Metric {
        name: "cpu".to_string(),
        value: f64::NAN,
        polarity: Polarity::Unknown,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![nan_cpu],
        exit_code: 0,
    };
    let checks = [MetricCheck::range("cpu", 0.0, 100.0)];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(!r.is_pass(), "NaN value must fail Range check");
    let first = r.failure_details().next().unwrap();
    let names_nan = first.message.contains("value is NaN");
    assert!(
        names_nan,
        "NaN failure must surface the dedicated message: {:?}",
        r.outcomes
    );
}
3079
/// `Exists` on a metric that the payload never emitted must fail.
#[test]
fn evaluate_checks_exists_missing_fails() {
    let no_metrics = PayloadMetrics {
        exit_code: 0,
        metrics: Vec::new(),
        payload_index: 0,
    };
    let checks = [MetricCheck::exists("thing")];
    let r = evaluate_checks(&checks, &no_metrics, "");
    assert!(r.is_fail());
}
3090
/// When the exit-code, `Min` and `Exists` checks are all satisfied,
/// the overall result is a pass.
#[test]
fn evaluate_checks_all_pass_returns_pass() {
    let iops = Metric {
        name: "iops".to_string(),
        value: 5000.0,
        polarity: Polarity::HigherBetter,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![iops],
        exit_code: 0,
    };
    let checks = [
        MetricCheck::exit_code_eq(0),
        MetricCheck::min("iops", 1000.0),
        MetricCheck::exists("iops"),
    ];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(r.is_pass());
}
3115
/// Several checks on one metric are each evaluated on their own: the
/// evaluator neither dedups by metric name nor stops after the first
/// check that matches a metric. The value here (100) clears the
/// first `Min` threshold (50) but misses the second (200), so the
/// result must fail with exactly one outcome, and that outcome must
/// cite the 200 threshold.
#[test]
fn evaluate_checks_duplicate_min_on_same_metric_both_evaluated() {
    let iops = Metric {
        name: "iops".to_string(),
        value: 100.0,
        polarity: Polarity::HigherBetter,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![iops],
        exit_code: 0,
    };
    let checks = [
        MetricCheck::min("iops", 50.0),
        MetricCheck::min("iops", 200.0),
    ];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(!r.is_pass(), "second min must fail");
    let outcome_count = r.outcomes.len();
    assert_eq!(outcome_count, 1, "only the failing check emits a detail");
    // The satisfied check contributes no detail, so the first failure
    // detail belongs to the 200-threshold check and must say so.
    let first = r.failure_details().next().unwrap();
    let cites_threshold = first.message.contains("below minimum 200");
    assert!(
        cites_threshold,
        "failing check must cite its threshold: {:?}",
        r.outcomes,
    );
}
3160
/// Contradictory checks on one metric (Min 100 together with Max 50)
/// yield TWO failures in the details list and not a single merged
/// one. This pins the "each check evaluated independently" invariant
/// against a future optimization that merges or dedups checks.
#[test]
fn evaluate_checks_conflicting_checks_on_same_metric_both_report() {
    let iops = Metric {
        name: "iops".to_string(),
        value: 75.0,
        polarity: Polarity::HigherBetter,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![iops],
        exit_code: 0,
    };
    let checks = [
        // 75 < 100: lower bound violated.
        MetricCheck::min("iops", 100.0),
        // 75 > 50: upper bound violated.
        MetricCheck::max("iops", 50.0),
    ];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(r.is_fail());
    let outcome_count = r.outcomes.len();
    assert_eq!(
        outcome_count,
        2,
        "both conflicting checks must each emit a detail: {:?}",
        r.outcomes,
    );
}
3195
/// `MetricCheck::Exists` passes for a metric whose value is zero.
/// The check is about presence only: a 0.0 metric is still in the
/// PayloadMetrics map and `pm.get(name).is_some()` is true. A naive
/// `pm.get(name).filter(|v| *v != 0.0)` would wrongly fail here, so
/// this pins the "exists is sign-agnostic and zero-friendly"
/// invariant.
#[test]
fn evaluate_checks_exists_passes_for_zero_value_metric() {
    let zero_errors = Metric {
        name: "errors".to_string(),
        value: 0.0,
        polarity: Polarity::LowerBetter,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![zero_errors],
        exit_code: 0,
    };
    let checks = [MetricCheck::exists("errors")];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(
        r.is_pass(),
        "exists('errors') must pass when metric is 0.0: {:?}",
        r.outcomes,
    );
}
3222
/// Negative zero (`-0.0`) is also "present" as far as
/// `MetricCheck::Exists` is concerned. A deliberately paranoid pin:
/// f64 `-0.0` trips up some pattern-matching code, since
/// `0.0 == -0.0` holds while the two differ under `f64::to_bits`.
#[test]
fn evaluate_checks_exists_passes_for_negative_zero() {
    let negative_zero = Metric {
        name: "drift".to_string(),
        value: -0.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![negative_zero],
        exit_code: 0,
    };
    let checks = [MetricCheck::exists("drift")];
    let r = evaluate_checks(&checks, &pm, "");
    assert!(r.is_pass());
}
3243
/// The hand-written `Debug` impl of `PayloadRun` prints the stable
/// identity fields (payload name, args/checks lengths, cgroup
/// placement) and leaves the `Ctx` pointer out. The output shape is
/// pinned so a future rename cannot quietly drop a field that
/// debug-printing consumers depend on.
#[test]
fn payload_run_debug_renders_identity_fields() {
    with_ctx("/nonexistent", |ctx| {
        let run = PayloadRun::new(ctx, &TRUE_BIN)
            .arg("--foo")
            .arg("--bar")
            .check(MetricCheck::exit_code_eq(0))
            .in_cgroup("workers");
        let s = format!("{run:?}");

        // (substring that must appear, label used in the failure message)
        let expected = [
            ("PayloadRun", "prefix"),
            ("payload:", "payload field"),
            ("true_bin", "payload name"),
            ("args_len", "args_len field"),
            ("checks_len", "checks_len field"),
            ("cgroup:", "cgroup field"),
            // Values: two args appended to zero defaults, plus one check.
            ("args_len: 2", "computed args_len"),
            ("checks_len: 1", "computed checks_len"),
            // cgroup is Some("workers"); the Cow<str> debug form shows
            // "workers" inside Some(..).
            ("workers", "cgroup value"),
        ];
        for &(needle, label) in &expected {
            assert!(s.contains(needle), "{label}: {s}");
        }

        // The Ctx pointer must NOT leak (no raw-address tokens).
        let leaks_ctx = s.contains("Ctx {");
        assert!(
            !leaks_ctx,
            "Ctx should not appear in PayloadRun Debug: {s}"
        );
    });
}
3277
/// A freshly built `PayloadRun` (no args, no checks, no cgroup)
/// debug-prints sensible zero values.
#[test]
fn payload_run_debug_renders_defaults() {
    with_ctx("/nonexistent", |ctx| {
        let rendered = format!("{:?}", PayloadRun::new(ctx, &TRUE_BIN));
        let s = rendered.as_str();
        assert!(s.contains("args_len: 0"), "default args_len: {s}");
        assert!(s.contains("checks_len: 0"), "default checks_len: {s}");
        assert!(s.contains("cgroup: None"), "default cgroup: {s}");
    });
}
3290
/// A metric whose name matches one of the payload's `MetricHint`s
/// takes on that hint's polarity and unit.
#[test]
fn resolve_polarities_applies_hints() {
    // Payload declaring a single hint for the "iops" metric.
    const HINTED: Payload = Payload {
        name: "p",
        kind: PayloadKind::Binary("p"),
        output: OutputFormat::Json,
        default_args: &[],
        default_checks: &[],
        metrics: &[crate::test_support::MetricHint {
            name: "iops",
            polarity: Polarity::HigherBetter,
            unit: "iops",
        }],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
    };

    // Starts with unknown polarity and an empty unit.
    let unresolved = Metric {
        name: "iops".to_string(),
        value: 100.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let mut metrics = vec![unresolved];
    resolve_polarities(&mut metrics, &HINTED);

    let resolved = &metrics[0];
    assert_eq!(resolved.polarity, Polarity::HigherBetter);
    assert_eq!(resolved.unit, "iops");
}
3319
3320    // -- PayloadHandle + .spawn() tests --
3321
// Binary payload named "true_bin" built from "/bin/true"; the spawn
// tests in this module expect it to exit with code 0.
3322    const TRUE_BIN: Payload = Payload::binary("true_bin", "/bin/true");
// Binary payload named "false_bin" built from "/bin/false"; the spawn
// tests in this module expect it to exit with a non-zero code.
3323    const FALSE_BIN: Payload = Payload::binary("false_bin", "/bin/false");
3324
/// `.spawn()` on a scheduler payload returns an error whose rendered
/// chain mentions "scheduler-kind".
#[test]
fn spawn_rejects_scheduler_kind() {
    with_ctx("/nonexistent", |ctx| {
        let err = PayloadRun::new(ctx, &EEVDF_SCHED_PAYLOAD)
            .spawn()
            .unwrap_err();
        let rendered = format!("{err:#}");
        assert!(rendered.contains("scheduler-kind"), "err: {err:#}");
    });
}
3336
/// Spawning `/bin/true` and waiting on the handle yields a passing
/// result and an exit code of 0.
#[test]
fn spawn_then_wait_returns_result_and_metrics() {
    with_ctx("/nonexistent", |ctx| {
        let run = PayloadRun::new(ctx, &TRUE_BIN);
        let handle = run.spawn().expect("spawn /bin/true");
        let (result, metrics) = handle.wait().expect("wait");
        assert!(result.is_pass());
        assert_eq!(metrics.exit_code, 0);
    });
}
3348
/// Killing a long-running payload still returns its collected
/// result, with a non-zero exit code.
#[test]
fn spawn_then_kill_returns_collected_output() {
    with_ctx("/nonexistent", |ctx| {
        // `/bin/sleep 60` stays alive long enough for .kill() to be
        // what ends it.
        const SLEEPER: Payload = Payload {
            name: "sleeper",
            kind: PayloadKind::Binary("/bin/sleep"),
            output: crate::test_support::OutputFormat::ExitCode,
            default_args: &["60"],
            default_checks: &[],
            metrics: &[],
            include_files: &[],
            uses_parent_pgrp: false,
            known_flags: None,
        };
        let run = PayloadRun::new(ctx, &SLEEPER);
        let handle = run.spawn().expect("spawn sleep");
        let (_verdict, metrics) = handle.kill().expect("kill+collect");
        // A killed process has a non-zero exit: SIGKILL leaves no
        // status code, which wait_and_capture maps to -1.
        assert_ne!(metrics.exit_code, 0);
    });
}
3371
/// `try_wait` reports `None` while the child is still running.
#[test]
fn spawn_try_wait_returns_none_while_running() {
    with_ctx("/nonexistent", |ctx| {
        const SLEEPER: Payload = Payload {
            name: "sleeper3",
            kind: PayloadKind::Binary("/bin/sleep"),
            output: crate::test_support::OutputFormat::ExitCode,
            default_args: &["60"],
            default_checks: &[],
            metrics: &[],
            include_files: &[],
            uses_parent_pgrp: false,
            known_flags: None,
        };
        let run = PayloadRun::new(ctx, &SLEEPER);
        let mut handle = run.spawn().expect("spawn sleep");
        // The 60-second sleep cannot have finished yet.
        let polled = handle.try_wait().expect("try_wait");
        assert!(polled.is_none());
        // Cleanup: kill the child so the Drop warning stays quiet.
        let _ = handle.kill();
    });
}
3393
/// Once the child has exited, polling `try_wait` eventually yields
/// `Some` with a passing result and exit code 0.
#[test]
fn spawn_try_wait_returns_some_after_exit() {
    with_ctx("/nonexistent", |ctx| {
        let mut handle = PayloadRun::new(ctx, &TRUE_BIN)
            .spawn()
            .expect("spawn /bin/true");
        // /bin/true exits quickly: poll up to 100 times, pausing 10 ms
        // after each attempt that finds the child still running.
        let outcome = (0..100).find_map(|_| {
            let polled = handle.try_wait().expect("try_wait");
            if polled.is_none() {
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
            polled
        });
        let (r, metrics) = outcome.expect("try_wait eventually returns Some");
        assert!(r.is_pass());
        assert_eq!(metrics.exit_code, 0);
    });
}
3414
/// Spawning `/bin/false` and waiting on it reports a non-zero exit
/// code.
#[test]
fn spawn_false_binary_produces_failing_exit_code() {
    with_ctx("/nonexistent", |ctx| {
        let run = PayloadRun::new(ctx, &FALSE_BIN);
        let handle = run.spawn().expect("spawn /bin/false");
        let (_verdict, metrics) = handle.wait().expect("wait");
        assert_ne!(metrics.exit_code, 0);
    });
}
3425
/// A metric with no matching hint in the payload keeps its original
/// polarity and unit.
#[test]
fn resolve_polarities_leaves_unhinted_alone() {
    let unhinted = Metric {
        name: "no_hint".to_string(),
        value: 1.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        stream: MetricStream::Stdout,
    };
    let mut metrics = vec![unhinted];
    resolve_polarities(&mut metrics, &FIO_BINARY);

    let untouched = &metrics[0];
    assert_eq!(untouched.polarity, Polarity::Unknown);
    assert_eq!(untouched.unit, "");
}
3439
3440    // -- Builder-composition + evaluator-coverage regression tests --
3441
3442    #[test]
3443    fn evaluate_checks_three_failing_checks_produce_three_details() {
3444        // Exit-code check passes (0 == 0), so pre-pass does not
3445        // short-circuit; all three metric checks fail and each must
3446        // contribute its own AssertDetail — regression guard
3447        // against detail dedup/overwrite bugs.
3448        let pm = PayloadMetrics {
3449            payload_index: 0,
3450            metrics: vec![
3451                Metric {
3452                    name: "iops".to_string(),
3453                    value: 10.0,
3454                    polarity: Polarity::HigherBetter,
3455                    unit: String::new(),
3456                    stream: MetricStream::Stdout,
3457                },
3458                Metric {
3459                    name: "lat".to_string(),
3460                    value: 900.0,
3461                    polarity: Polarity::LowerBetter,
3462                    unit: String::new(),
3463                    stream: MetricStream::Stdout,
3464                },
3465                Metric {
3466                    name: "cpu".to_string(),
3467                    value: 200.0,
3468                    polarity: Polarity::Unknown,
3469                    unit: String::new(),
3470                    stream: MetricStream::Stdout,
3471                },
3472            ],
3473            exit_code: 0,
3474        };
3475        let checks = [
3476            MetricCheck::exit_code_eq(0),
3477            MetricCheck::min("iops", 1000.0),
3478            MetricCheck::max("lat", 100.0),
3479            MetricCheck::range("cpu", 0.0, 100.0),
3480        ];
3481        let r = evaluate_checks(&checks, &pm, "");
3482        assert!(r.is_fail());
3483        assert_eq!(
3484            r.outcomes.len(),
3485            3,
3486            "expected one detail per failed metric check, got: {:?}",
3487            r.outcomes,
3488        );
3489        // Each check's message must surface — not an aggregate or
3490        // a deduped first-only line.
3491        assert!(r.failure_details().any(|d| d.message.contains("iops")));
3492        assert!(r.failure_details().any(|d| d.message.contains("lat")));
3493        assert!(r.failure_details().any(|d| d.message.contains("cpu")));
3494    }
3495
3496    #[test]
3497    fn arg_then_clear_args_then_arg_yields_only_the_final_arg() {
3498        // clear_args() wipes EVERYTHING — the default_args AND any
3499        // previously-appended .arg(...) — and subsequent .arg(...)
3500        // calls start from empty. Regression guard for the
3501        // "clear_args truncates, arg appends" contract.
3502        with_ctx("/nonexistent", |ctx| {
3503            let run = PayloadRun::new(ctx, &FIO_BINARY)
3504                .arg("--x")
3505                .clear_args()
3506                .arg("--y");
3507            assert_eq!(run.args, vec!["--y"]);
3508        });
3509    }
3510
3511    #[test]
3512    fn default_checks_are_inherited_by_new_builder() {
3513        // Payload.default_checks are the starting check list: they
3514        // MUST be present on a fresh PayloadRun before any runtime
3515        // .check() calls. `.check` appends on top, `.clear_checks`
3516        // wipes them.
3517        const CHECKED: Payload = Payload {
3518            name: "checked",
3519            kind: PayloadKind::Binary("checked"),
3520            output: OutputFormat::ExitCode,
3521            default_args: &[],
3522            default_checks: &[
3523                MetricCheck::exit_code_eq(0),
3524                MetricCheck::min("iops", 500.0),
3525            ],
3526            metrics: &[],
3527            include_files: &[],
3528            uses_parent_pgrp: false,
3529            known_flags: None,
3530        };
3531        with_ctx("/nonexistent", |ctx| {
3532            // Fresh builder inherits both default checks in order.
3533            let fresh = PayloadRun::new(ctx, &CHECKED);
3534            assert_eq!(fresh.checks.len(), 2);
3535            assert!(matches!(fresh.checks[0], MetricCheck::ExitCodeEq(0)));
3536            assert!(matches!(
3537                fresh.checks[1],
3538                MetricCheck::Min { value, .. } if value == 500.0,
3539            ));
3540
3541            // Appending preserves defaults and adds on top.
3542            let appended = PayloadRun::new(ctx, &CHECKED).check(MetricCheck::exists("latency"));
3543            assert_eq!(appended.checks.len(), 3);
3544
3545            // Clearing wipes defaults too.
3546            let cleared = PayloadRun::new(ctx, &CHECKED).clear_checks();
3547            assert!(cleared.checks.is_empty());
3548        });
3549    }
3550
3551    #[test]
3552    fn in_cgroup_accepts_static_str_zero_alloc() {
3553        // Static &'static str goes in as Cow::Borrowed; no heap
3554        // allocation happens for the common case of a const cgroup
3555        // name. Regression guard for the Cow<'static, str> API shape.
3556        with_ctx("/nonexistent", |ctx| {
3557            let run = PayloadRun::new(ctx, &FIO_BINARY).in_cgroup("workload");
3558            match &run.cgroup {
3559                Some(Cow::Borrowed(s)) => assert_eq!(*s, "workload"),
3560                other => panic!("expected Cow::Borrowed for &'static str input, got {other:?}"),
3561            }
3562        });
3563    }
3564
3565    #[test]
3566    fn in_cgroup_accepts_owned_string() {
3567        // Owned String goes in as Cow::Owned; the builder must not
3568        // require the caller to convert themselves.
3569        with_ctx("/nonexistent", |ctx| {
3570            let name = String::from("dynamic");
3571            let run = PayloadRun::new(ctx, &FIO_BINARY).in_cgroup(name);
3572            match &run.cgroup {
3573                Some(Cow::Owned(s)) => assert_eq!(s, "dynamic"),
3574                other => panic!("expected Cow::Owned for String input, got {other:?}"),
3575            }
3576        });
3577    }
3578
3579    /// Host-side decode of a guest-emitted [`crate::test_support::PayloadMetrics`] JSON
3580    /// body must round-trip exactly — the SHM transport only carries
3581    /// bytes, and a schema drift between emit-side (serde_json on a
3582    /// `PayloadMetrics`) and drain-side (serde_json::from_slice) would
3583    /// silently drop metrics from the sidecar.
3584    #[test]
3585    fn payload_metrics_shm_payload_json_round_trip() {
3586        let emit = PayloadMetrics {
3587            payload_index: 0,
3588            metrics: vec![
3589                Metric {
3590                    name: "jobs.0.read.iops".to_string(),
3591                    value: 12345.0,
3592                    polarity: Polarity::HigherBetter,
3593                    unit: "iops".to_string(),
3594                    stream: MetricStream::Stdout,
3595                },
3596                Metric {
3597                    name: "lat_ns".to_string(),
3598                    value: 500.0,
3599                    polarity: Polarity::LowerBetter,
3600                    unit: "ns".to_string(),
3601                    stream: MetricStream::Stdout,
3602                },
3603            ],
3604            exit_code: 0,
3605        };
3606        let bytes = serde_json::to_vec(&emit).expect("serialize PayloadMetrics");
3607        let decoded: PayloadMetrics =
3608            serde_json::from_slice(&bytes).expect("decode PayloadMetrics from JSON bytes");
3609        assert_eq!(decoded.exit_code, emit.exit_code);
3610        assert_eq!(decoded.metrics.len(), emit.metrics.len());
3611        for (a, b) in decoded.metrics.iter().zip(emit.metrics.iter()) {
3612            assert_eq!(a.name, b.name);
3613            assert_eq!(a.value, b.value);
3614            assert_eq!(a.polarity, b.polarity);
3615            assert_eq!(a.unit, b.unit);
3616        }
3617    }
3618
3619    /// Hinted metrics pick up polarity + unit from the payload's
3620    /// declared MetricHints regardless of declaration order. Also
3621    /// pins that resolve_polarities leaves unhinted metrics at
3622    /// Polarity::Unknown / empty unit — the non-over-applying
3623    /// invariant the prior linear scan had.
3624    #[test]
3625    fn resolve_polarities_applies_hints_by_name_lookup() {
3626        use crate::test_support::{Metric, MetricHint, MetricStream, Polarity};
3627        static PAYLOAD: crate::test_support::Payload = crate::test_support::Payload {
3628            name: "hinted",
3629            kind: crate::test_support::PayloadKind::Binary("x"),
3630            output: crate::test_support::OutputFormat::Json,
3631            default_args: &[],
3632            default_checks: &[],
3633            // Out-of-order with the metric slice below so a naive
3634            // position-based lookup would miss.
3635            metrics: &[
3636                MetricHint {
3637                    name: "lat_ns",
3638                    polarity: Polarity::LowerBetter,
3639                    unit: "ns",
3640                },
3641                MetricHint {
3642                    name: "iops",
3643                    polarity: Polarity::HigherBetter,
3644                    unit: "iops",
3645                },
3646            ],
3647            include_files: &[],
3648            uses_parent_pgrp: false,
3649            known_flags: None,
3650        };
3651        let mut ms = vec![
3652            Metric {
3653                name: "iops".into(),
3654                value: 1.0,
3655                polarity: Polarity::Unknown,
3656                unit: String::new(),
3657                stream: MetricStream::Stdout,
3658            },
3659            Metric {
3660                name: "unhinted".into(),
3661                value: 2.0,
3662                polarity: Polarity::Unknown,
3663                unit: String::new(),
3664                stream: MetricStream::Stdout,
3665            },
3666            Metric {
3667                name: "lat_ns".into(),
3668                value: 3.0,
3669                polarity: Polarity::Unknown,
3670                unit: String::new(),
3671                stream: MetricStream::Stdout,
3672            },
3673        ];
3674        resolve_polarities(&mut ms, &PAYLOAD);
3675        // iops hinted → HigherBetter / "iops".
3676        assert_eq!(ms[0].polarity, Polarity::HigherBetter);
3677        assert_eq!(ms[0].unit, "iops");
3678        // unhinted stays Unknown + empty.
3679        assert_eq!(ms[1].polarity, Polarity::Unknown);
3680        assert_eq!(ms[1].unit, "");
3681        // lat_ns hinted → LowerBetter / "ns".
3682        assert_eq!(ms[2].polarity, Polarity::LowerBetter);
3683        assert_eq!(ms[2].unit, "ns");
3684    }
3685
3686    /// Empty hints or empty metrics are a fast-path — the HashMap
3687    /// build is skipped entirely. Pins the no-op invariant so a
3688    /// regression can't accidentally materialize an empty map for
3689    /// zero metrics on every hot-path call.
3690    /// When the payload declares two MetricHints with the same
3691    /// name, the HashMap build keeps the LAST insertion. The test
3692    /// pins that behavior so a future switch to a multimap or to
3693    /// first-wins semantics surfaces here. First-wins would be
3694    /// surprising: users who copy-paste a hint to tweak it expect
3695    /// the new value.
3696    #[test]
3697    fn resolve_polarities_duplicate_hint_names_last_wins() {
3698        use crate::test_support::{Metric, MetricHint, MetricStream, Polarity};
3699        static PAYLOAD: crate::test_support::Payload = crate::test_support::Payload {
3700            name: "dup_hints",
3701            kind: crate::test_support::PayloadKind::Binary("x"),
3702            output: crate::test_support::OutputFormat::Json,
3703            default_args: &[],
3704            default_checks: &[],
3705            metrics: &[
3706                MetricHint {
3707                    name: "iops",
3708                    polarity: Polarity::HigherBetter,
3709                    unit: "iops",
3710                },
3711                MetricHint {
3712                    name: "iops",
3713                    polarity: Polarity::LowerBetter,
3714                    unit: "overridden",
3715                },
3716            ],
3717            include_files: &[],
3718            uses_parent_pgrp: false,
3719            known_flags: None,
3720        };
3721        let mut ms = vec![Metric {
3722            name: "iops".into(),
3723            value: 1.0,
3724            polarity: Polarity::Unknown,
3725            unit: String::new(),
3726            stream: MetricStream::Stdout,
3727        }];
3728        resolve_polarities(&mut ms, &PAYLOAD);
3729        // Second declaration wins (HashMap last-insertion semantics).
3730        assert_eq!(ms[0].polarity, Polarity::LowerBetter);
3731        assert_eq!(ms[0].unit, "overridden");
3732    }
3733
3734    /// When the metric slice has duplicate names (e.g. a payload
3735    /// emitting the same dotted path twice in one run), the hint
3736    /// is applied to each occurrence. Each is a distinct Metric
3737    /// value in the sidecar; both must carry the hinted polarity +
3738    /// unit so downstream regression reports are consistent.
3739    #[test]
3740    fn resolve_polarities_duplicate_metric_names_each_gets_hint() {
3741        use crate::test_support::{Metric, MetricHint, MetricStream, Polarity};
3742        static PAYLOAD: crate::test_support::Payload = crate::test_support::Payload {
3743            name: "dup_metrics",
3744            kind: crate::test_support::PayloadKind::Binary("x"),
3745            output: crate::test_support::OutputFormat::Json,
3746            default_args: &[],
3747            default_checks: &[],
3748            metrics: &[MetricHint {
3749                name: "iops",
3750                polarity: Polarity::HigherBetter,
3751                unit: "iops",
3752            }],
3753            include_files: &[],
3754            uses_parent_pgrp: false,
3755            known_flags: None,
3756        };
3757        let mut ms = vec![
3758            Metric {
3759                name: "iops".into(),
3760                value: 1.0,
3761                polarity: Polarity::Unknown,
3762                unit: String::new(),
3763                stream: MetricStream::Stdout,
3764            },
3765            Metric {
3766                name: "iops".into(),
3767                value: 2.0,
3768                polarity: Polarity::Unknown,
3769                unit: String::new(),
3770                stream: MetricStream::Stdout,
3771            },
3772        ];
3773        resolve_polarities(&mut ms, &PAYLOAD);
3774        for m in &ms {
3775            assert_eq!(m.polarity, Polarity::HigherBetter);
3776            assert_eq!(m.unit, "iops");
3777        }
3778    }
3779
3780    #[test]
3781    fn resolve_polarities_empty_inputs_are_noop() {
3782        use crate::test_support::{Metric, MetricStream, Polarity};
3783        static NO_HINTS: crate::test_support::Payload = crate::test_support::Payload {
3784            name: "no_hints",
3785            kind: crate::test_support::PayloadKind::Binary("x"),
3786            output: crate::test_support::OutputFormat::Json,
3787            default_args: &[],
3788            default_checks: &[],
3789            metrics: &[],
3790            include_files: &[],
3791            uses_parent_pgrp: false,
3792            known_flags: None,
3793        };
3794        let mut ms = vec![Metric {
3795            name: "anything".into(),
3796            value: 1.0,
3797            polarity: Polarity::Unknown,
3798            unit: String::new(),
3799            stream: MetricStream::Stdout,
3800        }];
3801        resolve_polarities(&mut ms, &NO_HINTS);
3802        assert_eq!(ms[0].polarity, Polarity::Unknown);
3803        assert_eq!(ms[0].unit, "");
3804
3805        // Empty metrics list — also a fast-path no-op, just pin it
3806        // doesn't panic / over-allocate.
3807        let mut empty: Vec<Metric> = vec![];
3808        resolve_polarities(&mut empty, &NO_HINTS);
3809        assert!(empty.is_empty());
3810    }
3811
3812    /// In host (test) context the bulk port is not open and
3813    /// `is_guest()` is false, so the emit must NOT write — it returns
3814    /// `false` via the `write_msg` -> `assert_guest_context`
3815    /// early-return. Asserting the `false` return pins the
3816    /// observable no-op (no frame written) rather than merely that
3817    /// the call did not panic.
3818    #[test]
3819    fn emit_payload_metrics_returns_false_in_host_context() {
3820        let pm = PayloadMetrics {
3821            payload_index: 0,
3822            metrics: Vec::new(),
3823            exit_code: 0,
3824        };
3825        assert!(
3826            !emit_payload_metrics(&pm),
3827            "host-context emit must report no write (false) — the \
3828             bulk port is closed and assert_guest_context bails"
3829        );
3830    }
3831
3832    // -- PayloadHandle double-consume returns Err, not panic --
3833
3834    /// After `try_wait()` returns `Ok(Some(..))` (terminal branch
3835    /// that takes the child), a subsequent `try_wait()` on the same
3836    /// handle must return `Err` instead of panicking. Previously
3837    /// the implementation unwrapped `self.child.as_mut()` with a
3838    /// panicking `.expect(...)`.
3839    #[test]
3840    fn try_wait_after_terminal_returns_err() {
3841        with_ctx("/nonexistent", |ctx| {
3842            let mut handle = PayloadRun::new(ctx, &TRUE_BIN)
3843                .spawn()
3844                .expect("spawn /bin/true");
3845            // First terminal: /bin/true exits immediately; spin until
3846            // try_wait returns Some.
3847            for _ in 0..100 {
3848                if handle.try_wait().expect("try_wait").is_some() {
3849                    break;
3850                }
3851                std::thread::sleep(std::time::Duration::from_millis(10));
3852            }
3853            // Second call must not panic — expect Err describing the
3854            // consumed state.
3855            let err = handle
3856                .try_wait()
3857                .expect_err("second try_wait on consumed handle must be Err");
3858            let msg = format!("{err:#}");
3859            assert!(
3860                msg.contains("already consumed") && msg.contains("true_bin"),
3861                "error must name the handle + misuse, got: {msg}"
3862            );
3863        });
3864    }
3865
3866    /// Calling `wait()` after `try_wait()` has consumed the child
3867    /// must Err rather than panic. Test pairs with
3868    /// `try_wait_after_terminal_returns_err`: same state, different
3869    /// terminal method.
3870    #[test]
3871    fn wait_after_try_wait_consumed_returns_err() {
3872        with_ctx("/nonexistent", |ctx| {
3873            let mut handle = PayloadRun::new(ctx, &TRUE_BIN)
3874                .spawn()
3875                .expect("spawn /bin/true");
3876            for _ in 0..100 {
3877                if handle.try_wait().expect("try_wait").is_some() {
3878                    break;
3879                }
3880                std::thread::sleep(std::time::Duration::from_millis(10));
3881            }
3882            // Child is now taken; wait() must return Err, not panic.
3883            let err = handle
3884                .wait()
3885                .expect_err("wait() on consumed handle must return Err");
3886            let msg = format!("{err:#}");
3887            assert!(
3888                msg.contains("already consumed") && msg.contains("true_bin"),
3889                "error must name the handle + misuse, got: {msg}"
3890            );
3891        });
3892    }
3893
3894    /// Calling `kill()` after `try_wait()` has consumed the child
3895    /// must Err rather than panic.
3896    #[test]
3897    fn kill_after_try_wait_consumed_returns_err() {
3898        with_ctx("/nonexistent", |ctx| {
3899            let mut handle = PayloadRun::new(ctx, &TRUE_BIN)
3900                .spawn()
3901                .expect("spawn /bin/true");
3902            for _ in 0..100 {
3903                if handle.try_wait().expect("try_wait").is_some() {
3904                    break;
3905                }
3906                std::thread::sleep(std::time::Duration::from_millis(10));
3907            }
3908            let err = handle
3909                .kill()
3910                .expect_err("kill() on consumed handle must return Err");
3911            let msg = format!("{err:#}");
3912            assert!(
3913                msg.contains("already consumed") && msg.contains("true_bin"),
3914                "error must name the handle + misuse, got: {msg}"
3915            );
3916        });
3917    }
3918
3919    // -- stdout-primary / stderr-fallback evaluation --
3920
3921    const JSON_PAYLOAD: Payload = Payload {
3922        name: "json_payload",
3923        kind: PayloadKind::Binary("json_payload"),
3924        output: OutputFormat::Json,
3925        default_args: &[],
3926        default_checks: &[],
3927        metrics: &[],
3928        include_files: &[],
3929        uses_parent_pgrp: false,
3930        known_flags: None,
3931    };
3932
3933    /// Well-behaved case: stdout carries the JSON document; stderr
3934    /// carries banner noise the extractor must NOT see. Merging the
3935    /// streams would pull the banner into the metric blob; the
3936    /// fallback contract keeps stdout canonical.
3937    #[test]
3938    fn evaluate_prefers_stdout_when_stdout_yields_metrics() {
3939        let output = SpawnOutput {
3940            stdout: r#"{"iops": 500}"#.to_string(),
3941            stderr: "unrelated banner: open fd error (ignore)".to_string(),
3942            exit_code: 0,
3943        };
3944        let (_, pm) = evaluate(&JSON_PAYLOAD, &[], output);
3945        assert_eq!(pm.metrics.len(), 1, "stdout JSON must win");
3946        assert_eq!(pm.metrics[0].name, "iops");
3947        assert_eq!(pm.metrics[0].value, 500.0);
3948    }
3949
3950    /// schbench-style: the payload emits JSON percentiles on stderr,
3951    /// leaves stdout empty. Stdout-primary extraction returns an
3952    /// empty Vec, then the stderr fallback runs and produces the
3953    /// real metrics.
3954    #[test]
3955    fn evaluate_falls_back_to_stderr_when_stdout_empty() {
3956        let output = SpawnOutput {
3957            stdout: String::new(),
3958            stderr: r#"{"latency_ns": 1234}"#.to_string(),
3959            exit_code: 0,
3960        };
3961        let (_, pm) = evaluate(&JSON_PAYLOAD, &[], output);
3962        assert_eq!(pm.metrics.len(), 1, "stderr fallback must fire");
3963        assert_eq!(pm.metrics[0].name, "latency_ns");
3964        assert_eq!(pm.metrics[0].value, 1234.0);
3965    }
3966
3967    /// End-to-end stream-attribution pin for the stderr-fallback
3968    /// branch. When stdout carries no extractable metrics and the
3969    /// fallback pulls the real document from stderr, every emitted
3970    /// metric's `stream` field must tag `MetricStream::Stderr` —
3971    /// NOT `Stdout`. The attribution is what lets downstream review
3972    /// tools filter stderr-sourced metrics (well-behaved payloads
3973    /// keep stdout canonical; an all-stderr metric set is a review
3974    /// hint that the payload may be violating the channel
3975    /// convention). A regression that stamped `Stdout` on every
3976    /// Metric regardless of which stream it came from would silence that review
3977    /// signal without changing the metric values themselves — this
3978    /// test pins the attribution end-to-end so the regression
3979    /// cannot slip past the existing value-only asserts on the
3980    /// sibling fallback tests.
3981    ///
3982    /// Pairs three fallback shapes in one test: empty stdout, prose
3983    /// stdout, and valid-JSON-no-numeric-leaves stdout. The three
3984    /// share one fallback decision (`metrics.is_empty()` after
3985    /// stdout attempt), so their attribution invariant is identical;
3986    /// one test exercises all three to close the fallback-shape
3987    /// coverage gap for the stream field specifically.
3988    /// Positive control for the stream-attribution pin: when
3989    /// stdout carries valid JSON that extracts cleanly, every
3990    /// emitted metric's `stream` must tag `MetricStream::Stdout`
3991    /// — NOT `Stderr`. The sibling
3992    /// `stderr_fallback_tags_metrics_with_metric_stream_stderr`
3993    /// covers the fallback (negative) side; this test closes the
3994    /// symmetry gap. A regression that unconditionally stamped
3995    /// `Stderr` on every Metric (or swapped the two
3996    /// unconditionally) would trip the fallback test's value-
3997    /// agnostic `== Stderr` assertion OR this test's inverse
3998    /// `== Stdout` assertion — at least one of the two paths
3999    /// has to change its stream tag direction to hide the bug.
4000    ///
4001    /// Exercises the happy path with both a minimal JSON object
4002    /// and a multi-key JSON object, proving the attribution is
4003    /// per-metric rather than per-document. A regression that
4004    /// attributed based on document-level shape (e.g. "stream =
4005    /// Stderr if multi-key") would fail on the second fixture.
4006    #[test]
4007    fn stdout_primary_tags_metrics_with_metric_stream_stdout() {
4008        use crate::test_support::MetricStream;
4009
4010        for (label, stdout) in [
4011            ("single-key", r#"{"iops": 4242}"#.to_string()),
4012            (
4013                "multi-key",
4014                r#"{"iops": 1000, "latency_us": 42, "runs": 3}"#.to_string(),
4015            ),
4016        ] {
4017            let output = SpawnOutput {
4018                stdout,
4019                // stderr carries a distinct value so a regression
4020                // that merged the streams (or used stderr despite
4021                // stdout winning) would surface a wrong-valued
4022                // metric here alongside the wrong stream tag.
4023                stderr: r#"{"iops": 9999999}"#.to_string(),
4024                exit_code: 0,
4025            };
4026            let (_, pm) = evaluate(&JSON_PAYLOAD, &[], output);
4027            assert!(
4028                !pm.metrics.is_empty(),
4029                "[{label}] stdout-primary must produce metrics",
4030            );
4031            for m in &pm.metrics {
4032                assert_eq!(
4033                    m.stream,
4034                    MetricStream::Stdout,
4035                    "[{label}] stdout-extracted metric `{name}` must \
4036                     carry MetricStream::Stdout; got stream={stream:?}. \
4037                     A regression that mis-tagged stdout-sourced \
4038                     metrics as Stderr (or merged the streams) would \
4039                     trip here — the stderr-fallback sibling test \
4040                     covers the inverse direction.",
4041                    name = m.name,
4042                    stream = m.stream,
4043                );
4044            }
4045            // Stream-independence: the `iops` value MUST come from
4046            // stdout (4242 / 1000), not stderr (9999999). A
4047            // regression that pulled from the wrong stream would
4048            // both mis-tag AND mis-value, but the value check is
4049            // the ground-truth that the stream tag then describes.
4050            let iops = pm
4051                .metrics
4052                .iter()
4053                .find(|m| m.name == "iops")
4054                .expect("iops metric must be present");
4055            assert!(
4056                iops.value < 9_000_000.0,
4057                "[{label}] iops value {val} must come from stdout \
4058                 (< 9M) not stderr (9999999); a value from stderr \
4059                 would prove the test's stream tag is accidentally \
4060                 correct because the merge went the wrong way",
4061                val = iops.value,
4062            );
4063        }
4064    }
4065
4066    #[test]
4067    fn stderr_fallback_tags_metrics_with_metric_stream_stderr() {
4068        use crate::test_support::MetricStream;
4069
4070        for (label, stdout) in [
4071            ("empty-stdout", String::new()),
4072            (
4073                "prose-stdout",
4074                "no json here, just prose from a banner line\n".to_string(),
4075            ),
4076            (
4077                "valid-json-no-numeric-leaves-stdout",
4078                r#"{"status": "ok", "ready": true, "note": null}"#.to_string(),
4079            ),
4080        ] {
4081            let output = SpawnOutput {
4082                stdout,
4083                stderr: r#"{"iops": 4242}"#.to_string(),
4084                exit_code: 0,
4085            };
4086            let (_, pm) = evaluate(&JSON_PAYLOAD, &[], output);
4087            assert_eq!(
4088                pm.metrics.len(),
4089                1,
4090                "[{label}] stderr fallback must produce exactly one metric",
4091            );
4092            assert_eq!(
4093                pm.metrics[0].stream,
4094                MetricStream::Stderr,
4095                "[{label}] fallback-extracted metric must carry MetricStream::Stderr \
4096                 so downstream review tooling can distinguish stream origin; \
4097                 got stream={:?}",
4098                pm.metrics[0].stream,
4099            );
4100        }
4101    }
4102
4103    /// Stdout present but unparseable (not-JSON prose); stderr
4104    /// carries the real document. `extract_metrics` returns `Vec`
4105    /// empty for malformed stdout, so the fallback runs against
4106    /// stderr and recovers the metrics. Pins that "non-empty stdout
4107    /// that yields no metrics" still triggers the retry — the
4108    /// stdout-primary contract gates on the result, not on emptiness.
4109    #[test]
4110    fn evaluate_falls_back_to_stderr_when_stdout_yields_no_metrics() {
4111        let output = SpawnOutput {
4112            stdout: "no json here, just prose from a banner line\n".to_string(),
4113            stderr: r#"{"throughput": 42}"#.to_string(),
4114            exit_code: 0,
4115        };
4116        let (_, pm) = evaluate(&JSON_PAYLOAD, &[], output);
4117        assert_eq!(
4118            pm.metrics.len(),
4119            1,
4120            "stderr fallback must fire on empty result"
4121        );
4122        assert_eq!(pm.metrics[0].name, "throughput");
4123        assert_eq!(pm.metrics[0].value, 42.0);
4124    }
4125
4126    /// Stdout is valid JSON but contains only non-numeric leaves
4127    /// (strings, bools, nulls). `walk_json_leaves` at
4128    /// src/test_support/metrics.rs skips non-numeric leaves, so
4129    /// `extract_metrics` returns `Ok(vec![])` — a SUCCESSFUL parse
4130    /// with zero metrics. This is distinct from the
4131    /// "unparseable prose" case (`evaluate_falls_back_to_stderr_when_stdout_yields_no_metrics`
4132    /// above): that path fails to find any JSON document at all.
4133    /// The fallback condition (`evaluate`'s
4134    /// `if metrics.is_empty() && !output.stderr.is_empty()` gate)
4135    /// gates on `metrics.is_empty()`, not on parse success, so both
4136    /// paths must fall back to stderr. This test pins that: the
4137    /// fallback must not surface the empty stdout set as the
4138    /// result, and the string/bool/null leaves from stdout must
4139    /// not leak into the returned metrics (they can't — the walker
4140    /// never emitted them — but a future refactor that concatenated
4141    /// streams or merged results could regress this).
4142    #[test]
4143    fn evaluate_falls_back_when_stdout_json_has_no_numeric_leaves() {
4144        let output = SpawnOutput {
4145            stdout: r#"{"status": "ok", "ready": true, "note": null}"#.to_string(),
4146            stderr: r#"{"iops": 9001}"#.to_string(),
4147            exit_code: 0,
4148        };
4149        let (_, pm) = evaluate(&JSON_PAYLOAD, &[], output);
4150        assert_eq!(
4151            pm.metrics.len(),
4152            1,
4153            "stderr fallback must fire when stdout parses but has \
4154             no numeric leaves; got metrics: {:?}",
4155            pm.metrics,
4156        );
4157        assert_eq!(pm.metrics[0].name, "iops");
4158        assert_eq!(pm.metrics[0].value, 9001.0);
4159        // No stray string/bool/null names leaked in from stdout.
4160        for m in &pm.metrics {
4161            assert!(
4162                !matches!(m.name.as_str(), "status" | "ready" | "note"),
4163                "non-numeric stdout leaf {:?} leaked into metrics",
4164                m.name,
4165            );
4166        }
4167    }
4168
4169    /// Inverse of the above: both streams parse to JSON with no
4170    /// numeric leaves. Stdout extracts to `Ok(vec![])`, fallback
4171    /// fires, stderr also extracts to `Ok(vec![])`. Final metric
4172    /// set must be empty — not a synthetic pseudo-metric, not a
4173    /// silent merge of the two empty results with added string
4174    /// keys. Guards against a fallback refactor that might
4175    /// misinterpret "both empty" as "degenerate, emit a sentinel".
4176    #[test]
4177    fn evaluate_returns_empty_when_both_streams_have_no_numeric_leaves() {
4178        let output = SpawnOutput {
4179            stdout: r#"{"phase": "warmup"}"#.to_string(),
4180            stderr: r#"{"phase": "shutdown"}"#.to_string(),
4181            exit_code: 0,
4182        };
4183        let (_, pm) = evaluate(&JSON_PAYLOAD, &[], output);
4184        assert!(
4185            pm.metrics.is_empty(),
4186            "both-streams-non-numeric must produce no metrics; \
4187             got: {:?}",
4188            pm.metrics,
4189        );
4190    }
4191
4192    /// Both streams empty ⇒ no metrics; the fallback guard
4193    /// (`!output.stderr.is_empty()`) skips the second call and the
4194    /// extractor is invoked exactly once against empty stdout.
4195    #[test]
4196    fn evaluate_returns_empty_metrics_on_empty_stdout_and_stderr() {
4197        let output = SpawnOutput {
4198            stdout: String::new(),
4199            stderr: String::new(),
4200            exit_code: 0,
4201        };
4202        let (_, pm) = evaluate(&JSON_PAYLOAD, &[], output);
4203        assert!(pm.metrics.is_empty(), "both-empty must produce no metrics");
4204        assert_eq!(pm.exit_code, 0);
4205    }
4206
4207    /// Multi-process payloads (schbench worker mode, stress-ng, fio)
4208    /// fork descendants that keep stdout/stderr open past the head
4209    /// process. Without a process-group kill, `wait_and_capture`
4210    /// would block on a pipe that never EOFs and the test would
4211    /// either hang or time out without metrics.
4212    ///
4213    /// The payload `/bin/sh -c 'sleep 60 & exec sleep 60'` uses the
4214    /// shell's head process to exec into `sleep 60` (pid == pgid)
4215    /// while a background `sleep 60` descendant inherits the pgid.
4216    /// A single-process SIGKILL would leave the background sleeper
4217    /// alive; `killpg` must reach it.
4218    ///
4219    /// The existence probe reaps may lag the SIGKILL delivery — the
4220    /// loop waits up to 30s, which covers slow CI runners, a
4221    /// heavily-loaded host, and the `waitpid` race where the child
4222    /// is dying but not yet reaped.
4223    #[cfg(unix)]
4224    #[test]
4225    fn kill_reaps_fork_descendants_via_process_group() {
4226        with_ctx("/nonexistent", |ctx| {
4227            const MULTI_SLEEPER: Payload = Payload {
4228                name: "multi_sleeper",
4229                kind: PayloadKind::Binary("/bin/sh"),
4230                output: crate::test_support::OutputFormat::ExitCode,
4231                default_args: &["-c", "sleep 60 & exec sleep 60"],
4232                default_checks: &[],
4233                metrics: &[],
4234                include_files: &[],
4235                uses_parent_pgrp: false,
4236                known_flags: None,
4237            };
4238            let handle = PayloadRun::new(ctx, &MULTI_SLEEPER)
4239                .spawn()
4240                .expect("spawn multi-sleeper");
4241            // The pgid equals the head child's pid. Capture it via the
4242            // public `pid()` accessor so the test does not reach into the
4243            // private `child` field.
4244            let pgid = libc::pid_t::try_from(handle.pid().expect("child still present"))
4245                .expect("child pid fits in pid_t");
4246            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
4247            let (_, _) = handle.kill().expect("kill+reap");
4248            // After kill+reap the whole process group must be gone.
4249            // Poll `killpg(pgid, 0)` (existence probe) until ESRCH;
4250            // SIGKILL delivery + reap can lag the caller.
4251            loop {
4252                // SAFETY: killpg with signal 0 is a pure existence query
4253                // with no side effects beyond errno.
4254                let rc = unsafe { libc::killpg(pgid, 0) };
4255                if rc != 0 {
4256                    let err = std::io::Error::last_os_error();
4257                    assert_eq!(
4258                        err.raw_os_error(),
4259                        Some(libc::ESRCH),
4260                        "unexpected errno from killpg probe: {err}",
4261                    );
4262                    break;
4263                }
4264                if std::time::Instant::now() >= deadline {
4265                    panic!("process group {pgid} still alive after kill+reap");
4266                }
4267                std::thread::sleep(std::time::Duration::from_millis(20));
4268            }
4269        });
4270    }
4271
4272    /// Drop path of [`PayloadHandle`]: a handle that falls out of
4273    /// scope WITHOUT any consuming call (no `wait`, no `kill`, no
4274    /// `try_wait`) must still SIGKILL the whole process group via
4275    /// `kill_payload_process_group`. Without the Drop sweep,
4276    /// multi-process payloads whose head exits while descendants
4277    /// linger would leak their leader pid and keep descendants
4278    /// alive on init, polluting later tests with stray children
4279    /// holding file descriptors.
4280    ///
4281    /// Mirrors `kill_reaps_fork_descendants_via_process_group`
4282    /// (the explicit-`kill()` counterpart) but drops the handle
4283    /// instead of calling kill — pins the Drop implementation's
4284    /// killpg route against the same backgrounded-sleeper shape.
4285    #[cfg(unix)]
4286    #[test]
4287    fn drop_kills_fork_descendants_via_process_group() {
4288        with_ctx("/nonexistent", |ctx| {
4289            const MULTI_SLEEPER: Payload = Payload {
4290                name: "multi_sleeper_drop",
4291                kind: PayloadKind::Binary("/bin/sh"),
4292                output: crate::test_support::OutputFormat::ExitCode,
4293                default_args: &["-c", "sleep 60 & exec sleep 60"],
4294                default_checks: &[],
4295                metrics: &[],
4296                include_files: &[],
4297                uses_parent_pgrp: false,
4298                known_flags: None,
4299            };
4300            let handle = PayloadRun::new(ctx, &MULTI_SLEEPER)
4301                .spawn()
4302                .expect("spawn multi-sleeper");
4303            // Capture the pgid via the public `pid()` accessor before
4304            // dropping, so we can probe the group after the handle
4305            // goes out of scope.
4306            let pgid = libc::pid_t::try_from(handle.pid().expect("child still present"))
4307                .expect("child pid fits in pid_t");
4308            // Drop (no wait/kill/try_wait). The Drop impl at
4309            // src/scenario/payload_run.rs routes through
4310            // `kill_payload_process_group` + `child.wait()`.
4311            drop(handle);
4312            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
4313            loop {
4314                // SAFETY: killpg with signal 0 is a pure existence
4315                // query with no side effects beyond errno.
4316                let rc = unsafe { libc::killpg(pgid, 0) };
4317                if rc != 0 {
4318                    let err = std::io::Error::last_os_error();
4319                    assert_eq!(
4320                        err.raw_os_error(),
4321                        Some(libc::ESRCH),
4322                        "unexpected errno from killpg probe after drop: {err}",
4323                    );
4324                    break;
4325                }
4326                if std::time::Instant::now() >= deadline {
4327                    panic!(
4328                        "process group {pgid} still alive 30 s after \
4329                         PayloadHandle drop — Drop-path killpg sweep \
4330                         failed to reach every member",
4331                    );
4332                }
4333                std::thread::sleep(std::time::Duration::from_millis(20));
4334            }
4335        });
4336    }
4337
4338    /// `uses_parent_pgrp = true` SKIPS the `process_group(0)` call
4339    /// in `build_command`, so the child inherits the test
4340    /// process's pgid instead of becoming its own pgrp leader.
4341    /// Spawn a sleeping binary via a Payload with the flag set,
4342    /// `getpgid` the child's pid, and assert it equals the
4343    /// parent's pgid — that pairs the "opt-out" directive with
4344    /// the observable behaviour.
4345    #[cfg(unix)]
4346    #[test]
4347    fn payload_uses_parent_pgrp_opts_out_of_process_group() {
4348        with_ctx("/nonexistent", |ctx| {
4349            const PARENT_PGRP_SLEEPER: Payload = Payload {
4350                name: "parent_pgrp_sleeper",
4351                kind: PayloadKind::Binary("/bin/sleep"),
4352                output: crate::test_support::OutputFormat::ExitCode,
4353                default_args: &["60"],
4354                default_checks: &[],
4355                metrics: &[],
4356                include_files: &[],
4357                uses_parent_pgrp: true,
4358                known_flags: None,
4359            };
4360            let handle = PayloadRun::new(ctx, &PARENT_PGRP_SLEEPER)
4361                .spawn()
4362                .expect("spawn opt-out sleeper");
4363            let child_pid = libc::pid_t::try_from(handle.pid().expect("child alive"))
4364                .expect("child pid fits in pid_t");
4365            // SAFETY: getpgid(pid) is a pure lookup with no side
4366            // effects beyond returning the queried pid's pgid (or -1
4367            // + errno on failure).
4368            let child_pgid = unsafe { libc::getpgid(child_pid) };
4369            // SAFETY: getpgid(0) returns the CURRENT process's pgid
4370            // and cannot fail.
4371            let parent_pgid = unsafe { libc::getpgid(0) };
4372            assert!(child_pgid > 0, "getpgid(child) failed: {child_pgid}");
4373            assert_eq!(
4374                child_pgid, parent_pgid,
4375                "uses_parent_pgrp=true payload must inherit the \
4376                 parent's pgid (child_pgid={child_pgid}, \
4377                 parent_pgid={parent_pgid}); a mismatch means \
4378                 `build_command` still called `process_group(0)` \
4379                 despite the opt-out",
4380            );
4381            // kill() on a handle whose child is not a pgrp leader
4382            // still reaps normally — kill_payload_process_group
4383            // falls back to single-pid SIGKILL. Consume the handle
4384            // so the sleeper doesn't outlive the test; a silent
4385            // failure here would mask the test's own regression
4386            // (e.g. a broken kill path that leaks sleepers).
4387            let _ = handle.kill().expect("kill opt-out sleeper");
4388        });
4389    }
4390
4391    /// `wait_with_deadline` timeout kills the whole process group
4392    /// via killpg + single-pid SIGKILL. Spawn a multi-process
4393    /// shell, drive `wait_with_deadline` with a 500 ms budget
4394    /// (so the whole test fits inside the 30s-slack nextest
4395    /// budget without standing up a whole scenario) and probes the
4396    /// pgid with `killpg(pgid, 0)` after the deadline fires —
4397    /// ESRCH proves the sweep reached every member.
4398    #[cfg(unix)]
4399    #[test]
4400    fn wait_with_deadline_timeout_kills_process_group() {
4401        use std::os::unix::process::CommandExt;
4402        let mut child = std::process::Command::new("/bin/sh")
4403            .args(["-c", "sleep 60 & exec sleep 60"])
4404            .stdout(std::process::Stdio::piped())
4405            .stderr(std::process::Stdio::piped())
4406            .process_group(0)
4407            .spawn()
4408            .expect("spawn multi-sleeper");
4409        let pgid = libc::pid_t::try_from(child.id()).expect("child pid fits in pid_t");
4410        let start = std::time::Instant::now();
4411        let out = wait_with_deadline(
4412            &mut child,
4413            std::time::Duration::from_millis(500),
4414            "multi_sleeper_timeout",
4415            false,
4416        )
4417        .expect("wait_with_deadline returns Ok on timeout");
4418        let elapsed = start.elapsed();
4419        // Timeout must actually have elapsed — if the function
4420        // returns almost instantly, the pidfd/epoll loop is
4421        // short-circuiting on an unrelated signal rather than
4422        // waiting for the 500 ms deadline.
4423        assert!(
4424            elapsed >= std::time::Duration::from_millis(400),
4425            "wait_with_deadline returned after only {elapsed:?}; \
4426             deadline was 500 ms — check the epoll loop is honoring \
4427             the timeout rather than unblocking on an unrelated event",
4428        );
4429        // The drain result must be captured even on timeout.
4430        // After SIGKILL the child's std::process::ExitStatus has
4431        // no numeric code (killed by signal, `status.code()`
4432        // returns None), so `wait_and_capture` defaults to -1 at
4433        // src/scenario/payload_run.rs per the `unwrap_or(-1)`
4434        // fallback in its status-code read. Pin that contract —
4435        // a future refactor that surfaces the signal number as
4436        // the exit_code would regress this.
4437        assert_eq!(out.exit_code, -1);
4438        // After timeout-driven kill+reap, the whole process group
4439        // must be gone. Poll `killpg(pgid, 0)` (existence probe)
4440        // until ESRCH — SIGKILL delivery + reap of the backgrounded
4441        // sleeper can lag the caller, so allow up to 30 s (matches
4442        // kill_reaps_fork_descendants_via_process_group's budget).
4443        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
4444        loop {
4445            // SAFETY: killpg with signal 0 is a pure existence
4446            // query with no side effects beyond errno.
4447            let rc = unsafe { libc::killpg(pgid, 0) };
4448            if rc != 0 {
4449                let err = std::io::Error::last_os_error();
4450                assert_eq!(
4451                    err.raw_os_error(),
4452                    Some(libc::ESRCH),
4453                    "unexpected errno from killpg probe after \
4454                     timeout: {err}",
4455                );
4456                break;
4457            }
4458            if std::time::Instant::now() >= deadline {
4459                panic!(
4460                    "process group {pgid} still alive 30 s after \
4461                     wait_with_deadline timeout fired — killpg sweep \
4462                     in the timeout branch failed to reach every \
4463                     member",
4464                );
4465            }
4466            std::thread::sleep(std::time::Duration::from_millis(20));
4467        }
4468    }
4469
4470    /// [`spawn_error_context`] is the sole place the spawn-error
4471    /// surface is shaped. An `ErrorKind::NotFound` must grow the full
4472    /// remediation chain (include-files for CLI invocations,
4473    /// pre-install for `#[ktstr_test]` entries); every other errno
4474    /// MUST keep the minimal `"spawn '<binary>'"` context so the
4475    /// underlying `io::Error` chain surfaces unchanged. Pin both
4476    /// directions so a regression that (a) swallows the NotFound
4477    /// remediation or (b) sprays the remediation across unrelated
4478    /// errno paths surfaces here.
4479    #[test]
4480    fn spawn_error_context_enoent_attaches_remediation() {
4481        let err = std::io::Error::from_raw_os_error(libc::ENOENT);
4482        assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
4483        let wrapped = super::spawn_error_context(err, "fio");
4484        let rendered = format!("{wrapped:#}");
4485        // Binary name still appears so `grep fio` still finds the error.
4486        assert!(rendered.contains("spawn 'fio'"), "got: {rendered}");
4487        // Remediation text must surface both mitigation paths.
4488        assert!(
4489            rendered.contains("not found on guest PATH"),
4490            "ENOENT branch must name the PATH miss: {rendered}"
4491        );
4492        assert!(
4493            rendered.contains("-i fio") || rendered.contains("--include-files fio"),
4494            "ENOENT branch must name the `-i <binary>` remediation: {rendered}"
4495        );
4496        assert!(
4497            rendered.contains("#[ktstr_test]"),
4498            "ENOENT branch must name the ktstr_test pre-install remediation: {rendered}"
4499        );
4500    }
4501
4502    #[test]
4503    fn spawn_error_context_non_enoent_keeps_minimal_context() {
4504        // EACCES is a representative non-NotFound errno. Any
4505        // remediation text leaking onto this path would mislead
4506        // users who e.g. hit a permission problem — the remediation
4507        // paths above (include-files, pre-install) are orthogonal
4508        // to the failure mode. Pin the absence.
4509        let err = std::io::Error::from_raw_os_error(libc::EACCES);
4510        assert_ne!(err.kind(), std::io::ErrorKind::NotFound);
4511        let wrapped = super::spawn_error_context(err, "fio");
4512        let rendered = format!("{wrapped:#}");
4513        assert!(rendered.contains("spawn 'fio'"), "got: {rendered}");
4514        assert!(
4515            !rendered.contains("-i fio"),
4516            "non-ENOENT must not leak the `-i` remediation: {rendered}"
4517        );
4518        assert!(
4519            !rendered.contains("--include-files"),
4520            "non-ENOENT must not leak the --include-files remediation: {rendered}"
4521        );
4522        assert!(
4523            !rendered.contains("#[ktstr_test]"),
4524            "non-ENOENT must not leak the ktstr_test remediation: {rendered}"
4525        );
4526        assert!(
4527            !rendered.contains("not found on guest PATH"),
4528            "non-ENOENT must not claim 'not found on PATH': {rendered}"
4529        );
4530    }
4531
4532    // -- cgroup-sync placement protocol --
4533
4534    /// When `cgroup_path` is `None`, `build_command` must return a
4535    /// Command with NO cgroup-sync handles. Regression guard
4536    /// against accidentally wiring the sync for inherited-cgroup
4537    /// placements, where the handshake would produce spurious
4538    /// pipe allocations and a spawn-thread round-trip for every
4539    /// payload run.
4540    #[test]
4541    fn build_command_without_cgroup_returns_no_sync_handles() {
4542        let (_cmd, handles) = super::build_command("/bin/true", &[], None, false).unwrap();
4543        assert!(
4544            handles.is_none(),
4545            "no cgroup_path ⇒ no sync handles — got Some(_)",
4546        );
4547    }
4548
4549    /// When `cgroup` is `Some(_)`, `build_command` must
4550    /// allocate both pipes and populate the cgroup.procs path
4551    /// plus the cgroup_name passed through. The target directory
4552    /// does NOT need to exist at build time — the placement is
4553    /// deferred to `spawn_with_cgroup_sync`, where a failure
4554    /// surfaces through the [`crate::cgroup::CgroupOps`] trait
4555    /// implementation's error context.
4556    #[test]
4557    fn build_command_with_cgroup_returns_sync_handles() {
4558        let fake_cg = std::path::PathBuf::from("/nonexistent/fake-cgroup");
4559        let (_cmd, handles) = super::build_command(
4560            "/bin/true",
4561            &[],
4562            Some(("fake-cg", fake_cg.as_path())),
4563            false,
4564        )
4565        .expect("build_command must defer cgroup-path validation to sync");
4566        let handles = handles.expect("cgroup path ⇒ handles");
4567        assert_eq!(
4568            handles.cgroup_procs_path,
4569            fake_cg.join("cgroup.procs"),
4570            "handles must carry <cg>/cgroup.procs verbatim",
4571        );
4572        assert_eq!(
4573            handles.cgroup_name, "fake-cg",
4574            "handles must carry the user-facing cgroup name verbatim",
4575        );
4576        // Both pipes must have valid fds on both ends (pipe2
4577        // succeeded).
4578        assert!(handles.notify.r_fd() >= 0);
4579        assert!(handles.notify.w_fd() >= 0);
4580        assert!(handles.release.r_fd() >= 0);
4581        assert!(handles.release.w_fd() >= 0);
4582    }
4583
4584    /// `PipePair::new` allocates a fresh pipe on every call;
4585    /// pins the Drop path closes both fds so repeated calls
4586    /// don't leak fd-table entries under test iteration.
4587    #[test]
4588    fn pipe_pair_allocates_fresh_pipe_on_each_call() {
4589        use std::io::{Read, Write};
4590        use std::os::fd::{AsRawFd, FromRawFd};
4591        let a = super::PipePair::new().unwrap();
4592        let b = super::PipePair::new().unwrap();
4593        // Distinct fd pairs.
4594        assert_ne!(a.r_fd(), b.r_fd());
4595        assert_ne!(a.w_fd(), b.w_fd());
4596        // Each pipe is a plumbed byte channel: write one byte
4597        // into A's write end, read it from A's read end.
4598        //
4599        // Drive the roundtrip via std::fs::File so we don't hit
4600        // libc directly in the test.
4601        {
4602            let mut w = unsafe { std::fs::File::from_raw_fd(a.w_fd()) };
4603            w.write_all(&[42u8]).unwrap();
4604            // Detach — the File closes the fd when dropped, but
4605            // we want the OwnedFd on the PipePair to handle it.
4606            std::mem::forget(w);
4607        }
4608        let mut buf = [0u8; 1];
4609        let mut r = unsafe { std::fs::File::from_raw_fd(a.read_fd.as_raw_fd()) };
4610        r.read_exact(&mut buf).unwrap();
4611        assert_eq!(buf[0], 42);
4612        std::mem::forget(r);
4613        // Drop the second pipe explicitly to exercise the Drop path.
4614        drop(b.read_fd);
4615        drop(b.write_fd);
4616    }
4617
4618    /// End-to-end: `drive_cgroup_handshake` reads a pid sent via the
4619    /// notify pipe, writes it to a temp "cgroup.procs" file, and
4620    /// releases the "child" via the release pipe. Exercises the
4621    /// real protocol without requiring a real cgroup — the temp
4622    /// file stands in for `/sys/fs/cgroup/<cg>/cgroup.procs`,
4623    /// whose acceptable write format is `<pid>\n`.
4624    ///
4625    /// Uses a synthetic Command that can't actually reach spawn
4626    /// (`/nonexistent`), but the test only drives the
4627    /// handshake half via a fake `CgroupSyncHandles`; the spawn
4628    /// side is stubbed by running the handshake directly, not
4629    /// through `drive_cgroup_handshake`'s thread wrapper.
4630    #[test]
4631    fn spawn_with_cgroup_sync_writes_pid_and_releases_child() {
4632        use std::io::Read;
4633        use std::os::fd::FromRawFd;
4634
4635        // Stand-in for cgroup.procs in a temp dir. The CgroupManager
4636        // points at `parent_dir`; the cgroup_name is `cg_leaf`, so
4637        // the trait derives `parent_dir/cg_leaf/cgroup.procs` =
4638        // `procs_path` — matches the real production resolution.
4639        let parent_dir = std::env::temp_dir().join(format!("ktstr-cgroup-sync-test-{}", unsafe {
4640            libc::getpid()
4641        }));
4642        std::fs::create_dir_all(&parent_dir).unwrap();
4643        let leaf_dir = parent_dir.join("cg_leaf");
4644        std::fs::create_dir_all(&leaf_dir).unwrap();
4645        let procs_path = leaf_dir.join("cgroup.procs");
4646        std::fs::write(&procs_path, b"").unwrap();
4647
4648        // Allocate two pipe pairs — one notify, one release.
4649        let notify = super::PipePair::new().unwrap();
4650        let release = super::PipePair::new().unwrap();
4651
4652        // Simulate child pre_exec: write pid into notify,
4653        // block on release. Run on a thread so the main test
4654        // thread can drive the handshake without a deadlock.
4655        let child_pid: libc::pid_t = 99999;
4656        let notify_w_fd = notify.w_fd();
4657        let release_r_fd = release.r_fd();
4658        let child_thread = std::thread::spawn(move || {
4659            use std::io::Write;
4660            // Write pid as LE bytes, matching the real pre_exec.
4661            let mut w = unsafe { std::fs::File::from_raw_fd(notify_w_fd) };
4662            w.write_all(&child_pid.to_le_bytes()).unwrap();
4663            drop(w);
4664            // Block on release.
4665            let mut r = unsafe { std::fs::File::from_raw_fd(release_r_fd) };
4666            let mut buf = [0u8; 1];
4667            r.read_exact(&mut buf).unwrap();
4668            assert_eq!(buf[0], 1, "release byte must be 1");
4669            drop(r);
4670        });
4671
4672        // Prevent PipePair's Drop from closing the fds we
4673        // handed to the thread — the thread owns them now.
4674        std::mem::forget(notify.write_fd);
4675        std::mem::forget(release.read_fd);
4676
4677        // Reassemble the handles into the bundle
4678        // `spawn_with_cgroup_sync` consumes. We MUST rebuild the
4679        // PipePair with the remaining fds so its Drop closes
4680        // them on exit.
4681        let notify_r = notify.read_fd;
4682        let release_w = release.write_fd;
4683        let handles = super::CgroupSyncHandles {
4684            notify: super::PipePair {
4685                read_fd: notify_r,
4686                // Dummy fd the drop will close — we need
4687                // something valid. /dev/null satisfies that.
4688                write_fd: unsafe {
4689                    std::os::fd::OwnedFd::from_raw_fd(libc::open(
4690                        c"/dev/null".as_ptr(),
4691                        libc::O_WRONLY,
4692                    ))
4693                },
4694            },
4695            release: super::PipePair {
4696                read_fd: unsafe {
4697                    std::os::fd::OwnedFd::from_raw_fd(libc::open(
4698                        c"/dev/null".as_ptr(),
4699                        libc::O_RDONLY,
4700                    ))
4701                },
4702                write_fd: release_w,
4703            },
4704            cgroup_name: "cg_leaf".to_string(),
4705            cgroup_procs_path: procs_path.clone(),
4706        };
4707
4708        // CgroupManager.parent_path = parent_dir, so
4709        // place_task_during_handshake("cg_leaf", pid) targets
4710        // parent_dir/cg_leaf/cgroup.procs = procs_path.
4711        let mgr = crate::cgroup::CgroupManager::new(parent_dir.to_str().unwrap());
4712
4713        // Drive the handshake on the main thread.
4714        let returned_pid = super::spawn_with_cgroup_sync(handles, &mgr).unwrap();
4715        assert_eq!(
4716            returned_pid, child_pid,
4717            "spawn_with_cgroup_sync must return the pid it read \
4718             from the notify pipe",
4719        );
4720
4721        // The child thread must complete after the release byte
4722        // arrives — join here and capture any panic propagation.
4723        child_thread
4724            .join()
4725            .expect("child thread completes after release");
4726
4727        // The temp cgroup.procs file must now contain the pid
4728        // followed by a newline (`CgroupManager::place_task_during_handshake`
4729        // writes the same `<pid>\n` shape the bare-stdlib path used).
4730        let written = std::fs::read_to_string(&procs_path).unwrap();
4731        assert_eq!(
4732            written,
4733            format!("{child_pid}\n"),
4734            "spawn_with_cgroup_sync must write <pid>\\n to cgroup.procs; \
4735             got {written:?}",
4736        );
4737
4738        // Cleanup.
4739        let _ = std::fs::remove_file(&procs_path);
4740        let _ = std::fs::remove_dir(&leaf_dir);
4741        let _ = std::fs::remove_dir(&parent_dir);
4742    }
4743
4744    /// Failure shape: if the placement trait call cannot reach the
4745    /// target cgroup (parent dir missing), the handshake surfaces
4746    /// an error that names both the cgroup_name the trait received
4747    /// AND the absolute cgroup.procs path the implementation
4748    /// derived. The child thread must NOT hang — it receives EOF
4749    /// on its release read because the handles (carrying the
4750    /// release write end) are dropped on the error path.
4751    #[test]
4752    fn spawn_with_cgroup_sync_errors_on_missing_cgroup_procs_path() {
4753        use std::os::fd::FromRawFd;
4754        let missing_parent = std::path::PathBuf::from("/nonexistent/dir/that/does/not/exist");
4755        let missing_path = missing_parent.join("missing-cg").join("cgroup.procs");
4756
4757        let notify = super::PipePair::new().unwrap();
4758        let release = super::PipePair::new().unwrap();
4759
4760        let child_pid: libc::pid_t = 12345;
4761        let notify_w_fd = notify.w_fd();
4762        let release_r_fd = release.r_fd();
4763        let child_thread = std::thread::spawn(move || -> std::io::Error {
4764            use std::io::{Read, Write};
4765            let mut w = unsafe { std::fs::File::from_raw_fd(notify_w_fd) };
4766            let _ = w.write_all(&child_pid.to_le_bytes());
4767            drop(w);
4768            // Block on release. Expect EOF (read_exact → Err
4769            // when the parent drops its write end on the error
4770            // path).
4771            let mut r = unsafe { std::fs::File::from_raw_fd(release_r_fd) };
4772            let mut buf = [0u8; 1];
4773            let err = r.read_exact(&mut buf).unwrap_err();
4774            drop(r);
4775            err
4776        });
4777
4778        std::mem::forget(notify.write_fd);
4779        std::mem::forget(release.read_fd);
4780
4781        let notify_r = notify.read_fd;
4782        let release_w = release.write_fd;
4783        let handles = super::CgroupSyncHandles {
4784            notify: super::PipePair {
4785                read_fd: notify_r,
4786                write_fd: unsafe {
4787                    std::os::fd::OwnedFd::from_raw_fd(libc::open(
4788                        c"/dev/null".as_ptr(),
4789                        libc::O_WRONLY,
4790                    ))
4791                },
4792            },
4793            release: super::PipePair {
4794                read_fd: unsafe {
4795                    std::os::fd::OwnedFd::from_raw_fd(libc::open(
4796                        c"/dev/null".as_ptr(),
4797                        libc::O_RDONLY,
4798                    ))
4799                },
4800                write_fd: release_w,
4801            },
4802            cgroup_name: "missing-cg".to_string(),
4803            cgroup_procs_path: missing_path.clone(),
4804        };
4805
4806        let mgr = crate::cgroup::CgroupManager::new(missing_parent.to_str().unwrap());
4807        let err = super::spawn_with_cgroup_sync(handles, &mgr).unwrap_err();
4808        let rendered = format!("{err:#}");
4809        assert!(
4810            rendered.contains("missing-cg"),
4811            "error must name the user-facing cgroup name: {rendered}",
4812        );
4813        assert!(
4814            rendered.contains("/nonexistent/dir/that/does/not/exist"),
4815            "error must name the failing path: {rendered}",
4816        );
4817
4818        // Child thread sees EOF because the release write end
4819        // was dropped on the error path.
4820        let child_err = child_thread.join().expect("child thread returns");
4821        assert_eq!(
4822            child_err.kind(),
4823            std::io::ErrorKind::UnexpectedEof,
4824            "child's release read must hit EOF when parent abandons sync; got {child_err}",
4825        );
4826    }
4827
4828    /// **Regression guard for the cross-fork inherited-fd
4829    /// deadlock.** Exercises the REAL fork path: builds a
4830    /// cgroup-sync Command targeting `/bin/true` against a
4831    /// nonexistent cgroup path, then calls
4832    /// [`drive_cgroup_handshake`] (which runs `Command::spawn()`
4833    /// on a thread and drives the parent-side protocol on the
4834    /// main thread).
4835    ///
4836    /// On the error path the parent drops its owned
4837    /// `release.write_fd` when `drive_cgroup_handshake` bails
4838    /// on the missing cgroup.procs. **Without `cgroup_sync_pre_exec`
4839    /// closing the CHILD's inherited copy of `release_write_fd`
4840    /// (Step 0 of the pre_exec protocol)**, the kernel still
4841    /// sees the child's inherited writer alive — the pipe never
4842    /// EOFs — the child's `read(release_read_fd)` blocks forever
4843    /// — `drive_cgroup_handshake` returns the error but the
4844    /// spawn thread's `join()` blocks indefinitely.
4845    ///
4846    /// With the Step 0 close in place, the child's pre_exec
4847    /// read hits EOF (→ EPIPE), the stdlib spawn path writes
4848    /// the errno to its CLOEXEC error channel and tears down
4849    /// the child, the spawn thread's `cmd.spawn()` returns
4850    /// `Err`, and our `join()` completes within the test
4851    /// deadline. A 10s timeout wraps the whole handshake —
4852    /// a regression that re-introduces the inherited-fd leak
4853    /// surfaces as a timeout panic, not a hang.
4854    #[test]
4855    fn drive_cgroup_handshake_does_not_deadlock_on_failing_cgroup_write() {
4856        use std::sync::mpsc;
4857
4858        // Pick a path that cannot possibly open — including a
4859        // guaranteed-missing parent dir so the placement step
4860        // fails hard in `drive_cgroup_handshake`.
4861        let missing_parent =
4862            std::path::PathBuf::from("/nonexistent/ktstr-cgroup-sync-deadlock-parent");
4863        let missing_cgroup = missing_parent.join("deadlock-guard");
4864
4865        // Run the whole exercise in a worker thread so the test
4866        // driver can time-box it: if the child's release read
4867        // ever blocks past the 10s budget we PANIC the timer
4868        // thread rather than hang the test harness.
4869        let (tx, rx) = mpsc::channel::<anyhow::Result<()>>();
4870        let worker = std::thread::spawn(move || {
4871            let (cmd, handles) = super::build_command(
4872                "/bin/true",
4873                &[],
4874                Some(("deadlock-guard", missing_cgroup.as_path())),
4875                false,
4876            )
4877            .expect("build_command");
4878            let handles = handles.expect("handles present when cgroup is Some");
4879            // CgroupManager whose `parent` does not exist, so
4880            // `place_task_during_handshake` will fail on the
4881            // bare `<missing>/<cg>/cgroup.procs` write — the
4882            // same failure shape the deadlock guard targets.
4883            let mgr = crate::cgroup::CgroupManager::new(missing_parent.to_str().unwrap());
4884            let result = super::drive_cgroup_handshake(cmd, handles, "/bin/true", &mgr);
4885            // drive_cgroup_handshake must surface an error
4886            // (the placement write failed) — if it succeeds
4887            // that's also a correctness violation because the
4888            // target directory does not exist.
4889            let err = result.expect_err("handshake against nonexistent cgroup.procs must Err");
4890            let rendered = format!("{err:#}");
4891            assert!(
4892                rendered.contains("cgroup.procs") || rendered.contains("deadlock-guard"),
4893                "handshake error must name the failing step: {rendered}",
4894            );
4895            let _ = tx.send(Ok(()));
4896        });
4897
4898        // 10s deadline — well beyond any legitimate stdlib spawn
4899        // + fork + pre_exec + error-channel latency on a loaded
4900        // CI host, tight enough to flag a real deadlock quickly.
4901        let deadline = std::time::Duration::from_secs(10);
4902        match rx.recv_timeout(deadline) {
4903            Ok(Ok(())) => {
4904                // Worker thread finished cleanly within budget.
4905                worker
4906                    .join()
4907                    .expect("worker thread completes without panic");
4908            }
4909            Ok(Err(e)) => panic!("worker thread reported error: {e:#}"),
4910            Err(mpsc::RecvTimeoutError::Timeout) => panic!(
4911                "drive_cgroup_handshake did not return within \
4912                 {deadline:?} — cross-fork inherited-fd deadlock \
4913                 has regressed. The child's pre_exec is almost \
4914                 certainly blocking on `read(release_read_fd)` \
4915                 because it still holds its own inherited copy of \
4916                 `release_write_fd` open; Step 0 of \
4917                 `cgroup_sync_pre_exec` must `close()` both \
4918                 `notify_read_fd` and `release_write_fd` BEFORE \
4919                 the release-read block, otherwise the kernel \
4920                 never delivers EOF when the parent drops its \
4921                 write end.",
4922            ),
4923            Err(mpsc::RecvTimeoutError::Disconnected) => {
4924                panic!("worker thread disconnected without reporting",)
4925            }
4926        }
4927    }
4928
4929    /// LIFO-drop pin for [`SigchldScope`]'s save-and-restore chain.
4930    ///
4931    /// `SigchldScope::new()` installs `SIG_DFL` and captures the
4932    /// PREVIOUS disposition into `prev`. When two scopes are
4933    /// constructed back-to-back on the same thread, the second
4934    /// scope's `prev` is the FIRST scope's `SIG_DFL` install (not
4935    /// the original disposition). The original disposition lives in
4936    /// the FIRST scope's `prev`.
4937    ///
4938    /// Drop order therefore matters: LIFO (drop second-constructed
4939    /// first) unwinds correctly — second drop restores `SIG_DFL`,
4940    /// first drop restores the original. NON-LIFO (drop
4941    /// first-constructed first) corrupts the disposition: first
4942    /// drop restores the original, second drop overwrites it with
4943    /// `SIG_DFL`, and the rest of the process runs with the wrong
4944    /// SIGCHLD handler.
4945    ///
4946    /// `PayloadHandle` keeps `_sigchld` as the LAST struct field so
4947    /// per-handle drop is always LIFO with respect to the child it
4948    /// guards — the field-order rule documented at the
4949    /// `PayloadHandle` definition. This test pins that LIFO drop of
4950    /// nested scopes preserves the initial disposition by capturing
4951    /// the disposition before, during, and after the scope chain
4952    /// unwinds, then asserting only the LIFO ordering produces the
4953    /// "after == initial" outcome.
4954    ///
4955    /// Implementation note: `libc::signal` returns the previous
4956    /// handler on every call. We use that as the read mechanism —
4957    /// install `SIG_DFL` to read the current handler, then
4958    /// immediately restore it. The read itself is destructive, so
4959    /// the helper has to put the original back before returning.
4960    ///
4961    /// The thread-pin in `SigchldScope::new` requires every
4962    /// construction in the process to come from the SAME thread.
4963    /// Nextest runs each test in its own process (per
4964    /// `.config/nextest.toml`), so this test's thread-of-call wins
4965    /// the pin and other tests' threads are isolated.
4966    #[test]
4967    fn sigchld_scope_lifo_drop_restores_initial_disposition() {
4968        // Read the current SIGCHLD disposition non-destructively by
4969        // saving the value `signal()` returns, then immediately
4970        // re-installing it. SAFETY: same as SigchldScope::new — the
4971        // thread-pin guarantees no other thread is racing the
4972        // process-wide disposition install.
4973        fn read_sigchld() -> libc::sighandler_t {
4974            // Pin the construction thread BEFORE we install — the
4975            // SigchldScope thread-pin is initialized on its first
4976            // `new()` call, but this helper runs before any scope
4977            // is constructed. Without a separate pin step, the
4978            // first scope below would be the one that pins. That
4979            // is fine in practice; the helper just has to use the
4980            // same thread the scopes do, which is the test thread.
4981            let handler = unsafe { libc::signal(libc::SIGCHLD, libc::SIG_DFL) };
4982            // Restore immediately so we don't leak the read-side
4983            // SIG_DFL into the rest of the test. Note: this is
4984            // observably equivalent to "do nothing" if the
4985            // disposition WAS already SIG_DFL, which is the common
4986            // case on the host (the guest init's SIG_IGN flip is
4987            // not active in the unit-test process).
4988            unsafe {
4989                libc::signal(libc::SIGCHLD, handler);
4990            }
4991            handler
4992        }
4993
4994        let initial = read_sigchld();
4995        // Construct nested scopes:
4996        // - outer.prev = initial; outer's new() installs SIG_DFL.
4997        // - inner.prev = SIG_DFL; inner's new() installs SIG_DFL
4998        //   (a no-op since outer already did).
4999        // After both new() calls, the live disposition is SIG_DFL.
5000        let outer = SigchldScope::new();
5001        let inner = SigchldScope::new();
5002        // LIFO drop: inner (constructed second) first.
5003        // - inner.drop() re-installs inner.prev (= SIG_DFL).
5004        //   Live disposition: SIG_DFL.
5005        // - Read confirms SIG_DFL.
5006        // - outer.drop() re-installs outer.prev (= initial).
5007        //   Live disposition: initial.
5008        // - Read confirms initial.
5009        //
5010        // Non-LIFO drop (drop outer first, then inner) would
5011        // produce: outer.drop() restores initial, inner.drop()
5012        // overwrites with SIG_DFL. The final read would see
5013        // SIG_DFL != initial. This test pins LIFO by structuring
5014        // the drops in LIFO order and asserting initial is
5015        // restored at the end. A future refactor that reordered
5016        // struct fields or shuffled drops would trip the final
5017        // assert.
5018        drop(inner);
5019        assert_eq!(
5020            read_sigchld(),
5021            libc::SIG_DFL,
5022            "after inner drop, live disposition must be SIG_DFL — \
5023             inner.prev was outer's SIG_DFL install, not initial",
5024        );
5025        drop(outer);
5026        assert_eq!(
5027            read_sigchld(),
5028            initial,
5029            "after outer drop, live disposition must equal initial \
5030             ({initial:#x}); a non-LIFO drop would leave SIG_DFL \
5031             ({:#x}) leaking into the process",
5032            libc::SIG_DFL,
5033        );
5034    }
5035
5036    /// Drop-side pin for [`PayloadHandle`]: dropping a real handle
5037    /// restores the SIGCHLD disposition.
5038    ///
5039    /// Rust drops struct fields in declaration order. The
5040    /// `PayloadHandle` definition (see the `DROP-ORDER-CRITICAL`
5041    /// comment block above the struct) requires `_sigchld` to drop
5042    /// AFTER `child`: while the child is being reaped on the drop
5043    /// path (via `kill_payload_process_group` + `child.wait()` in
5044    /// `Drop for PayloadHandle`), the SIGCHLD disposition must still
5045    /// be `SIG_DFL` so `waitpid` returns the real exit status instead
5046    /// of failing with `ECHILD` under the guest init's `SIGCHLD =
5047    /// SIG_IGN`. If `_sigchld` were dropped before `child`, its
5048    /// `Drop` would re-install the original (potentially `SIG_IGN`)
5049    /// disposition while `child.wait()` was still in flight and the
5050    /// reap could fail.
5051    ///
5052    /// `PayloadHandle::child` is `Option<std::process::Child>`; the
5053    /// `Drop for PayloadHandle` body gates its kill-and-reap path on
5054    /// `if let Some(mut child) = self.child.take()`, so `child:
5055    /// None` is a valid in-test construction whose handle Drop
5056    /// path is a no-op for the child branch but still drops every
5057    /// field in declaration order. This test constructs a real
5058    /// `PayloadHandle` (not a mirror) with `child: None`, a static
5059    /// `Payload`, an empty `checks` vec, and a real `SigchldScope`
5060    /// and asserts that the SIGCHLD disposition is restored to its
5061    /// initial value after the handle drops. Building the real
5062    /// struct exercises the actual field declaration order — a
5063    /// future refactor that drops `_sigchld` from the field list,
5064    /// renames it, or replaces its type compiles ONLY if this test
5065    /// is updated in lock-step.
5066    ///
5067    /// The thread-pin in `SigchldScope::new` requires every
5068    /// construction in the process to come from the SAME thread.
5069    /// Nextest runs each test in its own process (per
5070    /// `.config/nextest.toml`), so this test's thread-of-call wins
5071    /// the pin and other tests' threads are isolated.
5072    #[test]
5073    fn payload_handle_drop_restores_sigchld_disposition() {
5074        // Read the current SIGCHLD disposition non-destructively by
5075        // saving the value `signal()` returns, then immediately
5076        // re-installing it. SAFETY: same as SigchldScope::new — the
5077        // thread-pin guarantees no other thread is racing the
5078        // process-wide disposition install.
5079        fn read_sigchld() -> libc::sighandler_t {
5080            let handler = unsafe { libc::signal(libc::SIGCHLD, libc::SIG_DFL) };
5081            unsafe {
5082                libc::signal(libc::SIGCHLD, handler);
5083            }
5084            handler
5085        }
5086
5087        let initial = read_sigchld();
5088
5089        // Build a real PayloadHandle. `child: None` means the
5090        // `Drop for PayloadHandle` body's `if let Some(...)` arm is
5091        // not taken, so no kill/reap runs — but every field still
5092        // drops in declaration order, so `_sigchld` (declared last)
5093        // restores the SIGCHLD disposition. TRUE_BIN is a `&'static
5094        // Payload` from this test module.
5095        let handle = PayloadHandle {
5096            child: None,
5097            payload: &TRUE_BIN,
5098            checks: Vec::new(),
5099            _sigchld: SigchldScope::new(),
5100        };
5101
5102        // After SigchldScope::new(), live disposition is SIG_DFL.
5103        assert_eq!(
5104            read_sigchld(),
5105            libc::SIG_DFL,
5106            "SigchldScope::new should have installed SIG_DFL while \
5107             the handle is alive",
5108        );
5109
5110        drop(handle);
5111
5112        // After Drop for PayloadHandle runs, every field has been
5113        // dropped — including _sigchld, whose Drop re-installs the
5114        // captured `prev` (= initial). If a future refactor removed
5115        // _sigchld from the struct (or changed its type to one
5116        // whose Drop does not restore the signal), this assertion
5117        // fails because the live disposition would still be
5118        // SIG_DFL, not initial.
5119        assert_eq!(
5120            read_sigchld(),
5121            initial,
5122            "after dropping a real PayloadHandle, live SIGCHLD \
5123             disposition must equal initial ({initial:#x}); if the \
5124             handle's `_sigchld` field was removed, renamed, or \
5125             retyped, the disposition stays at SIG_DFL ({:#x})",
5126            libc::SIG_DFL,
5127        );
5128    }
5129}