ktstr/scenario/
host_stall.rs

1//! Host-mode worker stall detection via `/proc/<pid>/sched` polling.
2//!
3//! When a scenario runs in host-mode (no VM boot — `!is_guest()` AND
4//! `!cargo_test_mode_active()`), the freeze coordinator / KVM-side
5//! stall plumbing is unavailable. This module fills that gap by
6//! polling every worker pid's `/proc/<pid>/sched` file from a
7//! background thread and flagging a "task did not run" condition
8//! when both `nr_switches` and `sum_exec_runtime` are unchanged
9//! across a sliding window of W samples.
10//!
11//! # Signal
12//!
13//! `/proc/<pid>/sched` exposes both `nr_switches` (total context
14//! switches the task has been involved in) and `sum_exec_runtime`
15//! (cumulative on-CPU nanoseconds). Both are emitted unconditionally
16//! by `kernel/sched/debug.c` regardless of `CONFIG_SCHEDSTATS`, so
17//! the signal works on any production kernel.
18//!
19//! `se.statistics.wait_sum` would arguably be a STRONGER signal
20//! (cumulative time waiting in the runqueue — a starved task
21//! grows wait_sum monotonically while a sleeping task does not),
22//! but it IS gated on `CONFIG_SCHEDSTATS=y`. The monitor sticks
23//! with the unconditional `nr_switches + sum_exec_runtime` pair
24//! so it stays useful on minimum-config production kernels;
25//! schedstat-aware schedulers (sched_ext schedulers that read
26//! `wait_sum` via BPF) supplement this with their own latency
27//! probes via the `--ktstr-probe-stack` pipeline.
28//!
29//! Stall heuristic: if `Δnr_switches == 0` AND
30//! `Δsum_exec_runtime == 0` across W consecutive samples, the task
31//! has neither been picked nor preempted for `W * poll_interval` —
32//! a stronger signal than either counter alone (a busy-loop on one
33//! CPU could leave `nr_switches` flat while `sum_exec_runtime`
34//! climbs; a fully starved task pins both).
35//!
36//! # Cadence
37//!
38//! Default poll interval is 500 ms; window size W = 4 yields a 2 s
39//! detection latency. The interval is overridable via the
40//! [`crate::KTSTR_STALL_POLL_MS_ENV`] env var (empty / unset / 0 /
41//! unparseable falls back to the default).
42//!
43//! # Diagnostic capture
44//!
45//! When a stall fires, the polling thread captures a one-shot
46//! diagnostic snapshot from `/proc/<pid>/{wchan, syscall, status,
47//! stack, cgroup}` and `/proc/<pid>/task/<pid>/stat`, plus the
48//! host's `/proc/loadavg`. Each field is read independently and
49//! gracefully degraded to `"[unreadable: <reason>]"` on EACCES /
50//! ENOENT — `/proc/<pid>/stack` requires `CAP_SYS_ADMIN` and is
51//! typically absent for unprivileged callers, so its absence is
52//! not a failure.
53
54use std::collections::VecDeque;
55use std::sync::atomic::{AtomicBool, Ordering};
56use std::sync::{Arc, Mutex};
57use std::thread::{self, JoinHandle};
58use std::time::{Duration, Instant};
59
60use anyhow::Result;
61
62use crate::KTSTR_STALL_POLL_MS_ENV;
63
64/// Default poll cadence when [`KTSTR_STALL_POLL_MS_ENV`] is unset /
65/// empty / 0 / unparseable. 500 ms × W=4 yields a 2 s detection
66/// latency — short enough to catch a stuck scheduler within a
67/// typical ktstr test duration, long enough that procfs reads stay
68/// O(workers) per second rather than swamping the host.
69pub const DEFAULT_POLL_INTERVAL_MS: u64 = 500;
70
71/// Sliding-window size: number of consecutive flat samples that
72/// flip the stall predicate. W=4 with [`DEFAULT_POLL_INTERVAL_MS`]
73/// = 2 s detection latency. Constant rather than env-tunable —
74/// the operator already controls latency via the poll interval,
75/// and a smaller W would false-positive on transient idle.
76///
77/// # False-positive on slow-period workloads
78///
79/// A worker that legitimately runs once per `interval * W`
80/// (e.g. a periodic 10 s sleep tracker on a 2 s poll) will look
81/// "stuck" for the full W-sample window because both counters
82/// stay flat between the worker's wakeups. The fire is a true
83/// "no forward progress" observation — the operator distinguishes
84/// false-positive (intentional slow period) from true stall
85/// (kernel-side hang) via the [`StallDiagnostic::wchan`] field on
86/// the report: a healthy slow-period worker shows a `do_nanosleep`
87/// / `pipe_read` / `epoll_wait` wchan, while a true stall shows
88/// the offending kernel function. Per the no-silent-drops
89/// policy the framework opts for the loud-fire path rather than
90/// guessing the worker's intended cadence.
91pub const STALL_WINDOW: usize = 4;
92
93/// Snapshot of the two scheduler counters this monitor watches.
94///
95/// Mirrors what `kernel/sched/debug.c::proc_sched_show_task`
96/// writes to `/proc/<pid>/sched`. Both fields are cumulative since task
97/// creation; the monitor tracks deltas between consecutive
98/// samples rather than absolute values.
99#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
100pub struct SchedSample {
101    /// `nr_switches` from `/proc/<pid>/sched` — total voluntary +
102    /// involuntary context switches involving this task.
103    pub nr_switches: u64,
104    /// `sum_exec_runtime` from `/proc/<pid>/sched` — cumulative
105    /// on-CPU runtime in nanoseconds.
106    pub sum_exec_runtime_ns: u64,
107    /// Instant the sample was captured. Stored alongside the
108    /// counters so a downstream consumer can compute wall-clock
109    /// deltas without re-deriving the poll cadence. Not serialized
110    /// — Instant is a monotonic-clock opaque value with no portable
111    /// wire form; serde consumers reading a sidecar dump will see
112    /// `Instant::now()` as the default. The wall-clock context for
113    /// the report is carried separately by [`StallReport::captured_at`]
114    /// (also `#[serde(skip)]` for the same reason; sidecar dump
115    /// consumers anchoring across runs should pair the report with
116    /// the run's start timestamp from elsewhere).
117    #[serde(skip, default = "Instant::now")]
118    pub captured_at: Instant,
119}
120
121/// One-shot diagnostic snapshot captured at stall-trip time.
122///
123/// Each field is the contents of a `/proc/<pid>/<field>` file
124/// (or a stand-in describing why the read failed). Field
125/// extraction is best-effort: a missing `/proc/<pid>/stack`
126/// (requires `CAP_SYS_ADMIN`) does NOT block the diagnostic;
127/// every field carries its own `"[unreadable: <reason>]"` stand-in
128/// so an operator triaging the stall can tell apart "kernel
129/// didn't expose it" from "monitor failed to read it".
130#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
131pub struct StallDiagnostic {
132    /// `/proc/<pid>/wchan` — kernel symbol the task is sleeping
133    /// in, or `0` when the task is runnable / wchan is unresolvable.
134    pub wchan: String,
135    /// `/proc/<pid>/syscall` — first field is the syscall number
136    /// the task is blocked in (or `running` when on-CPU).
137    pub syscall: String,
138    /// `/proc/<pid>/status` task-state line ("State: S
139    /// (sleeping)"). Parsed down to the single-letter state
140    /// code; the full status file is preserved in `status_full`.
141    pub state: String,
142    /// `/proc/<pid>/stack` — kernel-stack trace; commonly
143    /// unreadable without `CAP_SYS_ADMIN`. `None` when absent
144    /// (versus an unreadable string in the string fields).
145    pub stack: Option<String>,
146    /// `/proc/<pid>/status` raw — full text for fields beyond
147    /// `State:` the operator may want to inspect (Cpus_allowed,
148    /// Threads, ctxt counts).
149    pub status_full: String,
150    /// `/proc/<pid>/cgroup` — v2 path the task belongs to.
151    pub cgroup: String,
152    /// Host's `/proc/loadavg` at trip time — useful for ruling
153    /// out "stall" caused by extreme host load rather than
154    /// scheduler misbehavior.
155    pub host_loadavg: String,
156}
157
158/// One stall report: a worker pid plus the sample window that
159/// triggered the stall predicate plus the captured diagnostic.
160///
161/// Pushed onto `StallMonitor::reports` by the polling thread the
162/// moment the predicate fires. The thread continues running so
163/// subsequent stalls on the same pid (or other pids) also surface,
164/// but a pid is "re-armed" only after observing forward progress
165/// (any non-zero delta) so a permanently-stuck task fires once and
166/// then stays silent until it moves again — preventing a single
167/// stall from spamming the report list every poll cycle.
168#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
169pub struct StallReport {
170    /// The stalled worker's pid.
171    pub pid: libc::pid_t,
172    /// `comm` field from `/proc/<pid>/comm` at trip time — the
173    /// task's name (`SpinWait_0`, etc.) so an operator can map
174    /// the pid back to a scenario WorkSpec without inspecting
175    /// the diagnostic's full status file.
176    pub comm: String,
177    /// The sample window that triggered the predicate. Length
178    /// equals [`STALL_WINDOW`] at the moment of fire; later
179    /// samples are NOT appended (each report is a snapshot, not
180    /// a rolling view).
181    pub samples: Vec<SchedSample>,
182    /// Wall-clock instant the predicate fired. Not serialized —
183    /// see [`SchedSample::captured_at`] for the rationale.
184    #[serde(skip, default = "Instant::now")]
185    pub captured_at: Instant,
186    /// Diagnostic snapshot captured immediately after the trip.
187    pub diagnostic: StallDiagnostic,
188}
189
190/// Background polling thread + shared report buffer.
191///
192/// Framework-internal — the test author never constructs a
193/// `StallMonitor` directly; the scenario engine spawns one via
194/// [`spawn_monitor`] in `apply_setup` when running host-mode.
195/// Drop the [`StallMonitorHandle`] to stop polling and join the
196/// thread.
197pub(crate) struct StallMonitor {
198    /// Shutdown flag — flipped to `true` by [`StallMonitorHandle::drop`]
199    /// to terminate the polling loop at the next sleep boundary.
200    shutdown: Arc<AtomicBool>,
201    /// Accumulated stall reports. The polling thread appends; the
202    /// owner drains via [`StallMonitorHandle::drain`].
203    reports: Arc<Mutex<Vec<StallReport>>>,
204}
205
206/// Owned handle returned by [`spawn_monitor`]. Dropping joins the
207/// polling thread; calling [`Self::drain`] returns accumulated
208/// reports before the drop point.
209///
210/// Framework-internal — held by `StepState.stall_monitor` for the
211/// per-step lifetime; test authors don't construct or hand out
212/// handles.
213#[must_use = "StallMonitorHandle stops polling on Drop; bind it to a local for the scenario lifetime"]
214pub(crate) struct StallMonitorHandle {
215    monitor: StallMonitor,
216    thread: Option<JoinHandle<()>>,
217}
218
219impl StallMonitorHandle {
220    /// Take ownership of every report accumulated so far.
221    /// Returns an empty Vec when no stalls have been observed.
222    /// Safe to call any number of times.
223    pub(crate) fn drain(&self) -> Vec<StallReport> {
224        let mut guard = self
225            .monitor
226            .reports
227            .lock()
228            .expect("stall-monitor reports mutex poisoned");
229        std::mem::take(&mut *guard)
230    }
231}
232
233impl Drop for StallMonitorHandle {
234    fn drop(&mut self) {
235        // Flip the shutdown flag so the polling loop exits at the
236        // next sleep boundary, then join the thread. The thread
237        // sleeps at most `poll_interval_ms`, so the join latency is
238        // bounded.
239        self.monitor.shutdown.store(true, Ordering::SeqCst);
240        if let Some(handle) = self.thread.take() {
241            // Join errors only on thread panic; log and continue —
242            // a panicked thread already has the shutdown flag
243            // honored implicitly (it's no longer running).
244            if let Err(e) = handle.join() {
245                tracing::warn!(?e, "stall-monitor polling thread panicked");
246            }
247        }
248    }
249}
250
251/// Spawn a background polling thread that watches each pid in
252/// `pids` for stalls. Returns a [`StallMonitorHandle`] whose
253/// [`StallMonitorHandle::drain`] yields the accumulated reports
254/// and whose `Drop` stops the thread.
255///
256/// The poll cadence is read from [`crate::KTSTR_STALL_POLL_MS_ENV`]
257/// (empty / unset / 0 / unparseable → [`DEFAULT_POLL_INTERVAL_MS`]).
258///
259/// Empty `pids` is accepted: the polling thread starts but its
260/// per-iteration loop is a no-op. The caller is expected to gate
261/// the `spawn_monitor` call on `!pids.is_empty()` for the common
262/// case so no thread is spawned at all; the no-op branch exists so
263/// the constructor itself is total.
264pub(crate) fn spawn_monitor(pids: &[libc::pid_t]) -> Result<StallMonitorHandle> {
265    let interval = resolve_poll_interval();
266    let pids: Vec<libc::pid_t> = pids.to_vec();
267    let shutdown = Arc::new(AtomicBool::new(false));
268    let reports: Arc<Mutex<Vec<StallReport>>> = Arc::new(Mutex::new(Vec::new()));
269
270    let thread_shutdown = Arc::clone(&shutdown);
271    let thread_reports = Arc::clone(&reports);
272
273    let thread = thread::Builder::new()
274        .name("ktstr-stall-mon".to_string())
275        .spawn(move || {
276            poll_loop(pids, interval, thread_shutdown, thread_reports);
277        })
278        .map_err(|e| anyhow::anyhow!("failed to spawn stall-monitor thread: {e}"))?;
279
280    Ok(StallMonitorHandle {
281        monitor: StallMonitor { shutdown, reports },
282        thread: Some(thread),
283    })
284}
285
286/// Resolve the poll interval from [`KTSTR_STALL_POLL_MS_ENV`].
287///
288/// Contract: empty / unset / `0` / unparseable falls back to
289/// [`DEFAULT_POLL_INTERVAL_MS`]. Any positive `u64` value is honored
290/// verbatim. Mirrors the empty-string-as-unset contract documented
291/// on the env-var const.
292fn resolve_poll_interval() -> Duration {
293    let ms = std::env::var(KTSTR_STALL_POLL_MS_ENV)
294        .ok()
295        .filter(|v| !v.is_empty())
296        .and_then(|v| v.trim().parse::<u64>().ok())
297        .filter(|&n| n > 0)
298        .unwrap_or(DEFAULT_POLL_INTERVAL_MS);
299    Duration::from_millis(ms)
300}
301
302/// Polling loop body. Runs until `shutdown` flips. Each iteration
303/// samples every pid in `pids`, advances per-pid sliding windows,
304/// and appends a [`StallReport`] when the predicate trips.
305fn poll_loop(
306    pids: Vec<libc::pid_t>,
307    interval: Duration,
308    shutdown: Arc<AtomicBool>,
309    reports: Arc<Mutex<Vec<StallReport>>>,
310) {
311    // Per-pid state: the sliding sample window plus an "armed"
312    // flag that prevents a permanently-stuck pid from spamming the
313    // report list every iteration. The pid stays "disarmed" until
314    // forward progress is observed (any non-zero delta in either
315    // counter), at which point it re-arms and can fire again on
316    // the next stall window.
317    let mut windows: Vec<(libc::pid_t, VecDeque<SchedSample>, bool)> = pids
318        .iter()
319        .map(|&p| (p, VecDeque::with_capacity(STALL_WINDOW), true))
320        .collect();
321
322    while !shutdown.load(Ordering::SeqCst) {
323        for (pid, window, armed) in windows.iter_mut() {
324            let sample = match read_sched_sample(*pid) {
325                Some(s) => s,
326                // pid has exited or /proc/<pid>/sched is unreadable
327                // — clear the window so we don't false-fire on a
328                // pre-exit flat tail, and continue.
329                None => {
330                    window.clear();
331                    continue;
332                }
333            };
334            if process_iteration(sample, window, armed) {
335                let samples: Vec<SchedSample> = window.iter().copied().collect();
336                let comm =
337                    read_comm(*pid).unwrap_or_else(|reason| format!("[unreadable: {reason}]"));
338                let diagnostic = capture_diagnostic(*pid);
339                let report = StallReport {
340                    pid: *pid,
341                    comm,
342                    samples,
343                    captured_at: Instant::now(),
344                    diagnostic,
345                };
346                {
347                    let mut guard = reports
348                        .lock()
349                        .expect("stall-monitor reports mutex poisoned");
350                    guard.push(report);
351                }
352            }
353        }
354        // Sleep `interval` total but honor shutdown promptly: chunk
355        // the sleep into 50 ms slices so the worst-case shutdown
356        // latency is ~50 ms instead of `interval`. The cap exists
357        // because `thread::sleep` is uninterruptible from the
358        // outside — a sole `thread::sleep(interval)` for an
359        // interval of (say) 5 s would block shutdown for up to 5
360        // seconds; the 50 ms cap keeps shutdown latency bounded
361        // independently of the polling cadence.
362        let chunk = Duration::from_millis(50).min(interval);
363        let mut remaining = interval;
364        while remaining > Duration::ZERO && !shutdown.load(Ordering::SeqCst) {
365            let slice = chunk.min(remaining);
366            thread::sleep(slice);
367            remaining = remaining.saturating_sub(slice);
368        }
369    }
370}
371
372/// Per-pid polling-iteration state update.
373///
374/// Folds the four-step transition the poll loop performs for each
375/// pid each iteration into a single testable function:
376///
377/// 1. Push `sample` onto `window`.
378/// 2. Evict the oldest sample if the window now exceeds
379///    [`STALL_WINDOW`].
380/// 3. Re-arm `armed` when the two most recent samples show
381///    forward progress in either counter (`nr_switches` or
382///    `sum_exec_runtime_ns`).
383/// 4. When `armed` AND [`stall_predicate`] fires, return `true`
384///    (caller emits a [`StallReport`]) and disarm.
385///
386/// Returns `true` when this iteration produced a stall — the
387/// caller is expected to construct a [`StallReport`] from the
388/// window contents and the pid. Returns `false` otherwise.
389///
390/// Extracted from [`poll_loop`] so unit tests can exercise the
391/// re-arm + spawn-gate semantics without spinning up the polling
392/// thread (which would require real PIDs + readable
393/// `/proc/<pid>/sched`).
394pub(crate) fn process_iteration(
395    sample: SchedSample,
396    window: &mut VecDeque<SchedSample>,
397    armed: &mut bool,
398) -> bool {
399    window.push_back(sample);
400    while window.len() > STALL_WINDOW {
401        window.pop_front();
402    }
403    // Re-arm whenever forward progress is observed between the
404    // two most recent samples — even if armed already (cheap
405    // branch).
406    if window.len() >= 2 {
407        let last = window[window.len() - 1];
408        let prev = window[window.len() - 2];
409        if last.nr_switches != prev.nr_switches
410            || last.sum_exec_runtime_ns != prev.sum_exec_runtime_ns
411        {
412            *armed = true;
413        }
414    }
415    if *armed && stall_predicate(window.make_contiguous()) {
416        *armed = false;
417        true
418    } else {
419        false
420    }
421}
422
423/// Stall predicate: returns `true` when `samples.len() >=
424/// STALL_WINDOW` AND every consecutive pair has both `nr_switches`
425/// delta == 0 AND `sum_exec_runtime_ns` delta == 0. A window
426/// shorter than [`STALL_WINDOW`] never fires (insufficient signal).
427///
428/// Extracted as a free function so a unit test can exercise the
429/// predicate directly with synthetic [`SchedSample`] sequences
430/// without spinning up the polling thread.
431pub fn stall_predicate(samples: &[SchedSample]) -> bool {
432    if samples.len() < STALL_WINDOW {
433        return false;
434    }
435    for pair in samples.windows(2) {
436        if pair[0].nr_switches != pair[1].nr_switches
437            || pair[0].sum_exec_runtime_ns != pair[1].sum_exec_runtime_ns
438        {
439            return false;
440        }
441    }
442    true
443}
444
445/// Parse `/proc/<pid>/sched` content into a [`SchedSample`].
446///
447/// The kernel format (per `kernel/sched/debug.c::proc_sched_show_task`)
448/// is:
449///
450/// ```text
451/// <comm> (<pid>, #threads: N)
452/// ---------------------------
453/// se.exec_start                  :       12345.6789
454/// ...
455/// nr_switches                    :              42
456/// nr_voluntary_switches          :              30
457/// ...
458/// se.sum_exec_runtime            :        1234.567890
459/// ...
460/// ```
461///
462/// Each "key : value" line uses arbitrary whitespace around the
463/// `:`. The parser extracts the two named keys and ignores
464/// everything else; missing keys yield `None`.
465///
466/// `se.sum_exec_runtime` is emitted as fractional milliseconds by
467/// `kernel/sched/debug.c::proc_sched_show_task` via the `PN` macro
468/// (`SPLIT_NS` divides the ns value by 1_000_000 and formats with 6
469/// decimal places). This parser reads it as an f64 in ms and
470/// multiplies by 1_000_000 to recover nanoseconds.
471pub fn parse_sched_file(content: &str) -> Option<(u64, u64)> {
472    let mut nr_switches: Option<u64> = None;
473    let mut sum_exec_runtime_ns: Option<u64> = None;
474    for line in content.lines() {
475        // Lines we care about are `<key> : <value>`. Skip the
476        // header and dashed separator.
477        let Some((key, value)) = line.split_once(':') else {
478            continue;
479        };
480        let key = key.trim();
481        let value = value.trim();
482        match key {
483            "nr_switches" => {
484                nr_switches = value.parse::<u64>().ok();
485            }
486            "se.sum_exec_runtime" => {
487                // Kernel emits as fractional milliseconds (e.g.
488                // `1234.567890`) per `kernel/sched/debug.c`'s
489                // `SPLIT_NS` macro — that's `value_ns / 1_000_000`
490                // formatted with 6 decimal places. Recover ns by
491                // parsing as f64 and multiplying back.
492                //
493                // Key carries the `se.` prefix because the kernel
494                // formats this field via the `PN(se.sum_exec_runtime)`
495                // macro at `kernel/sched/debug.c`'s
496                // `proc_sched_show_task` — `PN` stringifies the FULL
497                // expression including the `se.` qualifier. Matching
498                // the bare `sum_exec_runtime` (without prefix) would
499                // silently drop every real-kernel sample.
500                sum_exec_runtime_ns = value
501                    .parse::<f64>()
502                    .ok()
503                    .map(|ms| (ms * 1_000_000.0) as u64);
504            }
505            _ => {}
506        }
507        if nr_switches.is_some() && sum_exec_runtime_ns.is_some() {
508            break;
509        }
510    }
511    match (nr_switches, sum_exec_runtime_ns) {
512        (Some(n), Some(r)) => Some((n, r)),
513        _ => None,
514    }
515}
516
517/// Read `/proc/<pid>/sched` and return a [`SchedSample`]. Returns
518/// `None` when the file is unreadable (pid exited, EACCES, parse
519/// failure).
520fn read_sched_sample(pid: libc::pid_t) -> Option<SchedSample> {
521    let content = std::fs::read_to_string(format!("/proc/{pid}/sched")).ok()?;
522    let (nr_switches, sum_exec_runtime_ns) = parse_sched_file(&content)?;
523    Some(SchedSample {
524        nr_switches,
525        sum_exec_runtime_ns,
526        captured_at: Instant::now(),
527    })
528}
529
530/// Read `/proc/<pid>/comm` and return the task name with the
531/// trailing newline stripped. Returns the IO error's `to_string`
532/// when the read fails.
533fn read_comm(pid: libc::pid_t) -> std::result::Result<String, String> {
534    std::fs::read_to_string(format!("/proc/{pid}/comm"))
535        .map(|s| s.trim_end_matches('\n').to_string())
536        .map_err(|e| e.to_string())
537}
538
539/// Capture a [`StallDiagnostic`] for a stalled pid. Every field is
540/// read independently; an unreadable field becomes a
541/// `"[unreadable: <reason>]"` stand-in rather than aborting the
542/// snapshot — `/proc/<pid>/stack` in particular is privileged and
543/// commonly absent.
544fn capture_diagnostic(pid: libc::pid_t) -> StallDiagnostic {
545    let wchan = read_proc_field(pid, "wchan");
546    let syscall = read_proc_field(pid, "syscall");
547    let status_full = read_proc_field(pid, "status");
548    let state = extract_state_letter(&status_full);
549    let cgroup = read_proc_field(pid, "cgroup");
550    // `/proc/<pid>/stack` is privileged; treat absence as the
551    // expected case (None) rather than a read failure.
552    let stack = std::fs::read_to_string(format!("/proc/{pid}/stack")).ok();
553    let host_loadavg = std::fs::read_to_string("/proc/loadavg")
554        .map(|s| s.trim_end_matches('\n').to_string())
555        .unwrap_or_else(|e| format!("[unreadable: {e}]"));
556    StallDiagnostic {
557        wchan,
558        syscall,
559        state,
560        stack,
561        status_full,
562        cgroup,
563        host_loadavg,
564    }
565}
566
567/// Read `/proc/<pid>/<field>` and return its contents trimmed.
568/// Failures (EACCES, ENOENT) become a `"[unreadable: <reason>]"`
569/// stand-in so the diagnostic always carries a value per field —
570/// the operator triaging a stall can tell apart "kernel didn't
571/// expose it" from "monitor failed to read it".
572fn read_proc_field(pid: libc::pid_t, field: &str) -> String {
573    match std::fs::read_to_string(format!("/proc/{pid}/{field}")) {
574        Ok(s) => s.trim_end_matches('\n').to_string(),
575        Err(e) => format!("[unreadable: {e}]"),
576    }
577}
578
579/// Parse the `State:` line out of a `/proc/<pid>/status` body and
580/// return just the single-letter state code (e.g. `"S"` for
581/// sleeping, `"R"` for running, `"D"` for uninterruptible sleep).
582/// Falls back to `"?"` when the line is absent or malformed.
583fn extract_state_letter(status: &str) -> String {
584    for line in status.lines() {
585        if let Some(rest) = line.strip_prefix("State:") {
586            let rest = rest.trim();
587            // Format: `<letter> (<description>)` per
588            // `fs/proc/array.c::task_state_array`. Take the first
589            // whitespace-delimited token.
590            if let Some(letter) = rest.split_whitespace().next() {
591                return letter.to_string();
592            }
593        }
594    }
595    "?".to_string()
596}
597
598#[cfg(test)]
599mod tests {
600    use super::*;
601
602    /// `parse_sched_file` recovers both counters from a realistic
603    /// `/proc/<pid>/sched` fragment containing the headers and
604    /// extra fields the kernel emits. Pins the parser against the
605    /// arbitrary-whitespace + interleaved-other-lines reality of
606    /// kernel output, AND against the exact `se.sum_exec_runtime`
607    /// key the kernel emits (prefix included).
608    #[test]
609    fn parse_sched_file_extracts_signals() {
610        let content = "\
611worker_0 (12345, #threads: 1)
612-------------------------------------------------------------------
613se.exec_start                                :        123456789.123456
614se.vruntime                                  :              789.012345
615se.sum_exec_runtime                          :          1234.567890
616nr_migrations                                :                       7
617nr_switches                                  :                      42
618nr_voluntary_switches                        :                      30
619nr_involuntary_switches                      :                      12
620clock-delta                                  :                       0
621";
622        let parsed = parse_sched_file(content).expect("both fields present");
623        assert_eq!(parsed.0, 42, "nr_switches");
624        // 1234.567890 ms = 1_234_567_890 ns.
625        assert_eq!(parsed.1, 1_234_567_890, "sum_exec_runtime in ns");
626    }
627
628    /// Defense-in-depth against kernel-key-format regression: read
629    /// the LIVE `/proc/self/sched` for the test process and verify
630    /// the parser extracts plausible counters. A future
631    /// kernel-format change (e.g. dropping the `se.` prefix,
632    /// renaming `nr_switches`, switching to a non-SPLIT_NS time
633    /// format) would fail this test loudly instead of silently
634    /// disabling the monitor.
635    #[test]
636    fn parse_sched_file_handles_live_proc_self_sched() {
637        let Ok(content) = std::fs::read_to_string("/proc/self/sched") else {
638            // /proc not available (sandboxed test runner without
639            // procfs) — skip rather than fail. The fixture-based
640            // test above already covers the parser; this test's
641            // value is defense against real-kernel drift.
642            return;
643        };
644        let parsed = parse_sched_file(&content)
645            .expect("live /proc/self/sched MUST parse — kernel-format regression");
646        // nr_switches MUST be >= 1 by the time this test runs (the
647        // process has at least spawned + scheduled + read the file).
648        assert!(
649            parsed.0 >= 1,
650            "live nr_switches must be >= 1, got {}",
651            parsed.0
652        );
653        // sum_exec_runtime_ns MUST be > 0 — process has accumulated
654        // some CPU time to reach this assertion.
655        assert!(
656            parsed.1 > 0,
657            "live sum_exec_runtime_ns must be > 0, got {}",
658            parsed.1
659        );
660    }
661
662    fn s(nr: u64, ns: u64) -> SchedSample {
663        SchedSample {
664            nr_switches: nr,
665            sum_exec_runtime_ns: ns,
666            captured_at: Instant::now(),
667        }
668    }
669
670    /// Window with W consecutive identical samples → predicate
671    /// fires. Pins the core stall detection contract.
672    #[test]
673    fn stall_predicate_fires_after_w_samples_no_delta() {
674        let samples: Vec<SchedSample> = (0..STALL_WINDOW).map(|_| s(100, 5_000)).collect();
675        assert!(
676            stall_predicate(&samples),
677            "all-flat window of W samples must fire"
678        );
679    }
680
681    /// Window with ANY consecutive-pair delta > 0 → predicate
682    /// does NOT fire, even if every other pair is flat. Pins the
683    /// "any progress disqualifies" semantic.
684    #[test]
685    fn stall_predicate_skips_when_delta_present() {
686        // Three flat samples then one with a switch — the
687        // last-pair delta is non-zero so predicate stays false.
688        let samples = vec![s(100, 5_000), s(100, 5_000), s(100, 5_000), s(101, 5_000)];
689        assert!(
690            !stall_predicate(&samples),
691            "any non-zero delta in any consecutive pair must keep predicate false",
692        );
693        // Same for an exec_runtime move.
694        let samples = vec![s(100, 5_000), s(100, 5_000), s(100, 5_001), s(100, 5_001)];
695        assert!(
696            !stall_predicate(&samples),
697            "exec_runtime delta in any pair must keep predicate false",
698        );
699        // And a window shorter than W never fires regardless.
700        let short: Vec<SchedSample> = (0..STALL_WINDOW - 1).map(|_| s(100, 5_000)).collect();
701        assert!(
702            !stall_predicate(&short),
703            "window shorter than STALL_WINDOW must not fire (insufficient signal)",
704        );
705    }
706
707    /// `capture_diagnostic` against pid 0 (an invalid pid that
708    /// fails every read) returns a populated [`StallDiagnostic`]
709    /// where every string field carries the `"[unreadable: ...]"`
710    /// stand-in and `stack` is `None`. Pins the
711    /// graceful-degradation contract: a privileged or missing
712    /// field never aborts the snapshot.
713    #[test]
714    fn diagnostic_capture_skips_unreadable_fields() {
715        // pid 0 is reserved and /proc/0/* never exists.
716        let diag = capture_diagnostic(0);
717        assert!(
718            diag.wchan.starts_with("[unreadable:"),
719            "wchan must degrade: got {:?}",
720            diag.wchan,
721        );
722        assert!(
723            diag.syscall.starts_with("[unreadable:"),
724            "syscall must degrade: got {:?}",
725            diag.syscall,
726        );
727        assert!(
728            diag.status_full.starts_with("[unreadable:"),
729            "status must degrade: got {:?}",
730            diag.status_full,
731        );
732        assert!(
733            diag.cgroup.starts_with("[unreadable:"),
734            "cgroup must degrade: got {:?}",
735            diag.cgroup,
736        );
737        // status_full degraded → state extraction yields "?".
738        assert_eq!(diag.state, "?", "unreadable status → state = ?");
739        // /proc/0/stack is also absent.
740        assert!(diag.stack.is_none(), "missing stack must remain None");
741        // host_loadavg should succeed on any healthy Linux host
742        // even when the pid is bogus; either it reads or carries
743        // the unreadable stand-in.
744        assert!(
745            !diag.host_loadavg.is_empty(),
746            "host_loadavg must always populate (success OR stand-in)",
747        );
748    }
749
750    /// Sliding window honors [`STALL_WINDOW`]: once full, the
751    /// oldest sample is evicted as new ones arrive, so the
752    /// predicate always sees the most-recent W samples. Pins the
753    /// "predicate sees last W" guarantee.
754    #[test]
755    fn ring_buffer_sliding_window_correctness() {
756        // Simulate the poll loop's window management without
757        // spinning the actual thread: push samples through a
758        // VecDeque with the same eviction policy and verify the
759        // contents.
760        let mut window: VecDeque<SchedSample> = VecDeque::with_capacity(STALL_WINDOW);
761        // Push 6 samples (more than W=4): values 0..6.
762        for i in 0..(STALL_WINDOW + 2) {
763            window.push_back(s(i as u64, i as u64 * 10));
764            while window.len() > STALL_WINDOW {
765                window.pop_front();
766            }
767        }
768        assert_eq!(
769            window.len(),
770            STALL_WINDOW,
771            "window size must stay at STALL_WINDOW after overflow",
772        );
773        // The oldest two were evicted; the head should be sample
774        // index 2 (out of 0..6 pushed).
775        let head = window.front().expect("window non-empty");
776        assert_eq!(
777            head.nr_switches, 2,
778            "oldest sample must be index 2 after 2 evictions"
779        );
780        let tail = window.back().expect("window non-empty");
781        assert_eq!(
782            tail.nr_switches,
783            (STALL_WINDOW + 1) as u64,
784            "newest sample must be the last pushed (index W+1)",
785        );
786        // None of these consecutive pairs is flat (every pair
787        // differs by 1), so the predicate must NOT fire even
788        // though the window is full.
789        let snap: Vec<SchedSample> = window.iter().copied().collect();
790        assert!(
791            !stall_predicate(&snap),
792            "monotonic samples must not trip predicate"
793        );
794    }
795
796    // -- process_iteration tests --
797    //
798    // Pin the per-pid state machine extracted from poll_loop:
799    // re-arm after stall-then-resume, spawn-gate behavior
800    // when the initial window is empty, and the fire-disarm
801    // sequence so a permanently-stuck pid produces ONE report
802    // per stall window, not one per polling iteration.
803
804    /// Spawn-gate behavior: an initial window starts armed but
805    /// empty. The first STALL_WINDOW samples can't fire (predicate
806    /// requires >= STALL_WINDOW samples). Only the STALL_WINDOW'th
807    /// flat sample can trip the stall.
808    #[test]
809    fn process_iteration_spawn_gate_short_window_never_fires() {
810        let mut window: VecDeque<SchedSample> = VecDeque::with_capacity(STALL_WINDOW);
811        let mut armed = true;
812        // Feed STALL_WINDOW - 1 identical samples; predicate
813        // sees a short window every iteration and stays false.
814        for _ in 0..(STALL_WINDOW - 1) {
815            assert!(
816                !process_iteration(s(100, 5_000), &mut window, &mut armed),
817                "short window must not fire (spawn-gate semantic)",
818            );
819        }
820        assert!(armed, "no resume seen → stays armed");
821        // The STALL_WINDOW'th identical sample fills the window
822        // → predicate fires.
823        assert!(
824            process_iteration(s(100, 5_000), &mut window, &mut armed),
825            "Wth flat sample fills window AND trips predicate → fire",
826        );
827        assert!(!armed, "fire path disarms");
828    }
829
830    /// Re-arm after stall-then-resume: a fired stall disarms
831    /// the pid; resume (forward progress) re-arms; a subsequent
832    /// flat window fires AGAIN. A regression that
833    /// dropped the re-arm branch would never produce the second
834    /// report, silently hiding recurrent stalls.
835    #[test]
836    fn process_iteration_rearm_after_stall_then_resume() {
837        let mut window: VecDeque<SchedSample> = VecDeque::with_capacity(STALL_WINDOW);
838        let mut armed = true;
839        // Fill window with flat samples → first stall fires.
840        for _ in 0..STALL_WINDOW {
841            process_iteration(s(100, 5_000), &mut window, &mut armed);
842        }
843        assert!(!armed, "after first fire, disarmed");
844        // Resume: feed a sample with forward progress.
845        // Predicate stays false (most-recent pair has delta), and
846        // the re-arm branch flips armed back to true.
847        assert!(
848            !process_iteration(s(101, 5_001), &mut window, &mut armed),
849            "resume sample must not fire (last pair has delta)",
850        );
851        assert!(armed, "resume sample must re-arm");
852        // Re-stall: feed STALL_WINDOW identical samples. Need to
853        // overwrite the resume sample first, so STALL_WINDOW more
854        // flat samples are needed to refill the window.
855        let mut second_fire_iter = None;
856        for i in 0..STALL_WINDOW {
857            if process_iteration(s(101, 5_001), &mut window, &mut armed) {
858                second_fire_iter = Some(i);
859                break;
860            }
861        }
862        assert!(
863            second_fire_iter.is_some(),
864            "second stall window must fire after re-arm; got no fire across {} iters",
865            STALL_WINDOW,
866        );
867        assert!(!armed, "second fire disarms");
868    }
869
870    /// Permanently-stuck pid produces EXACTLY ONE report per
871    /// stall window, not one per polling iteration. Pins the
872    /// "stays disarmed until forward progress" semantic so a
873    /// hung process doesn't spam the reports vec across hundreds
874    /// of iterations.
875    #[test]
876    fn process_iteration_permanent_stall_fires_only_once() {
877        let mut window: VecDeque<SchedSample> = VecDeque::with_capacity(STALL_WINDOW);
878        let mut armed = true;
879        // Fill window + fire (one report).
880        let mut fire_count = 0;
881        for _ in 0..STALL_WINDOW {
882            if process_iteration(s(100, 5_000), &mut window, &mut armed) {
883                fire_count += 1;
884            }
885        }
886        assert_eq!(fire_count, 1, "first window MUST fire exactly once");
887        // 100 more flat samples — disarmed, predicate keeps
888        // matching but fire path is gated on `armed`.
889        for _ in 0..100 {
890            if process_iteration(s(100, 5_000), &mut window, &mut armed) {
891                fire_count += 1;
892            }
893        }
894        assert_eq!(
895            fire_count, 1,
896            "permanently-stuck pid must NOT spam reports — exactly one fire across many iters",
897        );
898    }
899}