ktstr/scenario/host_stall.rs
1//! Host-mode worker stall detection via `/proc/<pid>/sched` polling.
2//!
3//! When a scenario runs in host-mode (no VM boot — `!is_guest()` AND
4//! `!cargo_test_mode_active()`), the freeze coordinator / KVM-side
5//! stall plumbing is unavailable. This module fills that gap by
6//! polling every worker pid's `/proc/<pid>/sched` file from a
7//! background thread and flagging a "task did not run" condition
8//! when both `nr_switches` and `sum_exec_runtime` are unchanged
9//! across a sliding window of W samples.
10//!
11//! # Signal
12//!
13//! `/proc/<pid>/sched` exposes both `nr_switches` (total context
14//! switches the task has been involved in) and `sum_exec_runtime`
15//! (cumulative on-CPU nanoseconds). Both are emitted unconditionally
16//! by `kernel/sched/debug.c` regardless of `CONFIG_SCHEDSTATS`, so
17//! the signal works on any production kernel.
18//!
19//! `se.statistics.wait_sum` would arguably be a STRONGER signal
20//! (cumulative time waiting in the runqueue — a starved task
21//! grows wait_sum monotonically while a sleeping task does not),
22//! but it IS gated on `CONFIG_SCHEDSTATS=y`. The monitor sticks
23//! with the unconditional `nr_switches + sum_exec_runtime` pair
24//! so it stays useful on minimum-config production kernels;
25//! schedstat-aware schedulers (sched_ext schedulers that read
26//! `wait_sum` via BPF) supplement this with their own latency
27//! probes via the `--ktstr-probe-stack` pipeline.
28//!
//! Stall heuristic: if `Δnr_switches == 0` AND
//! `Δsum_exec_runtime == 0` across W consecutive samples (W − 1 poll
//! intervals), the task has neither been picked nor preempted for at
//! least `(W - 1) * poll_interval` — a stronger signal than either
//! counter alone (a busy-loop on one CPU could leave `nr_switches` flat
//! while `sum_exec_runtime` climbs; a fully starved task pins both).
35//!
36//! # Cadence
37//!
//! Default poll interval is 500 ms; with window size W = 4 a stall is
//! reported 1.5–2 s after the task's last observed progress (between
//! W − 1 and W poll intervals). The interval is overridable via the
//! [`crate::KTSTR_STALL_POLL_MS_ENV`] env var (empty / unset / 0 /
//! unparseable falls back to the default).
42//!
43//! # Diagnostic capture
44//!
//! When a stall fires, the polling thread captures a one-shot
//! diagnostic snapshot from `/proc/<pid>/{wchan, syscall, status,
//! stack, cgroup}` plus the host's `/proc/loadavg`. Each file is read
//! independently; an unreadable text field degrades to
//! `"[unreadable: <reason>]"` instead of aborting the snapshot.
//! `/proc/<pid>/stack` requires `CAP_SYS_ADMIN` and is typically
//! unreadable for unprivileged callers, so it is recorded as `None`
//! rather than treated as a failure.
53
54use std::collections::VecDeque;
55use std::sync::atomic::{AtomicBool, Ordering};
56use std::sync::{Arc, Mutex};
57use std::thread::{self, JoinHandle};
58use std::time::{Duration, Instant};
59
60use anyhow::Result;
61
62use crate::KTSTR_STALL_POLL_MS_ENV;
63
/// Poll cadence used when [`KTSTR_STALL_POLL_MS_ENV`] is unset, empty,
/// `0`, or not a valid integer.
///
/// Together with a [`STALL_WINDOW`] of 4 samples, 500 ms puts the
/// worst-case detection latency at about 2 s after a worker's last
/// observed progress. That is quick enough to flag a wedged scheduler
/// inside a normal ktstr test. It is also slow enough that procfs
/// traffic stays at a couple of reads per worker per second.
pub const DEFAULT_POLL_INTERVAL_MS: u64 = 500;
70
/// Number of consecutive identical samples needed before a worker is
/// reported as stalled.
///
/// Four samples cover three poll intervals. With
/// [`DEFAULT_POLL_INTERVAL_MS`], a stall is therefore reported between
/// 1.5 s and 2 s after the worker's last observed progress.
///
/// The value is fixed rather than env-tunable. Operators already adjust
/// latency through the poll interval, and a shorter window would start
/// flagging ordinary short idle periods.
///
/// # False positives on slow-period workloads
///
/// Some workers only wake up less often than once per
/// `(STALL_WINDOW - 1) * poll_interval` (for instance one that sleeps
/// 10 s between iterations). Such a worker keeps both counters flat for
/// a whole window and trips the detector.
///
/// That report is accurate: no forward progress was observed. The
/// operator tells an intentional slow period from a genuine hang by
/// looking at [`StallDiagnostic::wchan`]. A healthy sleeper sits in
/// something like `do_nanosleep`, `pipe_read`, or `epoll_wait`, whereas
/// a real stall points at the offending kernel function.
///
/// In keeping with the no-silent-drops policy, the monitor reports
/// loudly instead of guessing each worker's intended cadence.
pub const STALL_WINDOW: usize = 4;
92
93/// Snapshot of the two scheduler counters this monitor watches.
94///
95/// Mirrors what `kernel/sched/debug.c::proc_sched_show_task`
96/// writes to `/proc/<pid>/sched`. Both fields are cumulative since task
97/// creation; the monitor tracks deltas between consecutive
98/// samples rather than absolute values.
99#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
100pub struct SchedSample {
101 /// `nr_switches` from `/proc/<pid>/sched` — total voluntary +
102 /// involuntary context switches involving this task.
103 pub nr_switches: u64,
104 /// `sum_exec_runtime` from `/proc/<pid>/sched` — cumulative
105 /// on-CPU runtime in nanoseconds.
106 pub sum_exec_runtime_ns: u64,
107 /// Instant the sample was captured. Stored alongside the
108 /// counters so a downstream consumer can compute wall-clock
109 /// deltas without re-deriving the poll cadence. Not serialized
110 /// — Instant is a monotonic-clock opaque value with no portable
111 /// wire form; serde consumers reading a sidecar dump will see
112 /// `Instant::now()` as the default. The wall-clock context for
113 /// the report is carried separately by [`StallReport::captured_at`]
114 /// (also `#[serde(skip)]` for the same reason; sidecar dump
115 /// consumers anchoring across runs should pair the report with
116 /// the run's start timestamp from elsewhere).
117 #[serde(skip, default = "Instant::now")]
118 pub captured_at: Instant,
119}
120
121/// One-shot diagnostic snapshot captured at stall-trip time.
122///
123/// Each field is the contents of a `/proc/<pid>/<field>` file
124/// (or a stand-in describing why the read failed). Field
125/// extraction is best-effort: a missing `/proc/<pid>/stack`
126/// (requires `CAP_SYS_ADMIN`) does NOT block the diagnostic;
127/// every field carries its own `"[unreadable: <reason>]"` stand-in
128/// so an operator triaging the stall can tell apart "kernel
129/// didn't expose it" from "monitor failed to read it".
130#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
131pub struct StallDiagnostic {
132 /// `/proc/<pid>/wchan` — kernel symbol the task is sleeping
133 /// in, or `0` when the task is runnable / wchan is unresolvable.
134 pub wchan: String,
135 /// `/proc/<pid>/syscall` — first field is the syscall number
136 /// the task is blocked in (or `running` when on-CPU).
137 pub syscall: String,
138 /// `/proc/<pid>/status` task-state line ("State: S
139 /// (sleeping)"). Parsed down to the single-letter state
140 /// code; the full status file is preserved in `status_full`.
141 pub state: String,
142 /// `/proc/<pid>/stack` — kernel-stack trace; commonly
143 /// unreadable without `CAP_SYS_ADMIN`. `None` when absent
144 /// (versus an unreadable string in the string fields).
145 pub stack: Option<String>,
146 /// `/proc/<pid>/status` raw — full text for fields beyond
147 /// `State:` the operator may want to inspect (Cpus_allowed,
148 /// Threads, ctxt counts).
149 pub status_full: String,
150 /// `/proc/<pid>/cgroup` — v2 path the task belongs to.
151 pub cgroup: String,
152 /// Host's `/proc/loadavg` at trip time — useful for ruling
153 /// out "stall" caused by extreme host load rather than
154 /// scheduler misbehavior.
155 pub host_loadavg: String,
156}
157
158/// One stall report: a worker pid plus the sample window that
159/// triggered the stall predicate plus the captured diagnostic.
160///
161/// Pushed onto `StallMonitor::reports` by the polling thread the
162/// moment the predicate fires. The thread continues running so
163/// subsequent stalls on the same pid (or other pids) also surface,
164/// but a pid is "re-armed" only after observing forward progress
165/// (any non-zero delta) so a permanently-stuck task fires once and
166/// then stays silent until it moves again — preventing a single
167/// stall from spamming the report list every poll cycle.
168#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
169pub struct StallReport {
170 /// The stalled worker's pid.
171 pub pid: libc::pid_t,
172 /// `comm` field from `/proc/<pid>/comm` at trip time — the
173 /// task's name (`SpinWait_0`, etc.) so an operator can map
174 /// the pid back to a scenario WorkSpec without inspecting
175 /// the diagnostic's full status file.
176 pub comm: String,
177 /// The sample window that triggered the predicate. Length
178 /// equals [`STALL_WINDOW`] at the moment of fire; later
179 /// samples are NOT appended (each report is a snapshot, not
180 /// a rolling view).
181 pub samples: Vec<SchedSample>,
182 /// Wall-clock instant the predicate fired. Not serialized —
183 /// see [`SchedSample::captured_at`] for the rationale.
184 #[serde(skip, default = "Instant::now")]
185 pub captured_at: Instant,
186 /// Diagnostic snapshot captured immediately after the trip.
187 pub diagnostic: StallDiagnostic,
188}
189
190/// Background polling thread + shared report buffer.
191///
192/// Framework-internal — the test author never constructs a
193/// `StallMonitor` directly; the scenario engine spawns one via
194/// [`spawn_monitor`] in `apply_setup` when running host-mode.
195/// Drop the [`StallMonitorHandle`] to stop polling and join the
196/// thread.
197pub(crate) struct StallMonitor {
198 /// Shutdown flag — flipped to `true` by [`StallMonitorHandle::drop`]
199 /// to terminate the polling loop at the next sleep boundary.
200 shutdown: Arc<AtomicBool>,
201 /// Accumulated stall reports. The polling thread appends; the
202 /// owner drains via [`StallMonitorHandle::drain`].
203 reports: Arc<Mutex<Vec<StallReport>>>,
204}
205
206/// Owned handle returned by [`spawn_monitor`]. Dropping joins the
207/// polling thread; calling [`Self::drain`] returns accumulated
208/// reports before the drop point.
209///
210/// Framework-internal — held by `StepState.stall_monitor` for the
211/// per-step lifetime; test authors don't construct or hand out
212/// handles.
213#[must_use = "StallMonitorHandle stops polling on Drop; bind it to a local for the scenario lifetime"]
214pub(crate) struct StallMonitorHandle {
215 monitor: StallMonitor,
216 thread: Option<JoinHandle<()>>,
217}
218
219impl StallMonitorHandle {
220 /// Take ownership of every report accumulated so far.
221 /// Returns an empty Vec when no stalls have been observed.
222 /// Safe to call any number of times.
223 pub(crate) fn drain(&self) -> Vec<StallReport> {
224 let mut guard = self
225 .monitor
226 .reports
227 .lock()
228 .expect("stall-monitor reports mutex poisoned");
229 std::mem::take(&mut *guard)
230 }
231}
232
233impl Drop for StallMonitorHandle {
234 fn drop(&mut self) {
235 // Flip the shutdown flag so the polling loop exits at the
236 // next sleep boundary, then join the thread. The thread
237 // sleeps at most `poll_interval_ms`, so the join latency is
238 // bounded.
239 self.monitor.shutdown.store(true, Ordering::SeqCst);
240 if let Some(handle) = self.thread.take() {
241 // Join errors only on thread panic; log and continue —
242 // a panicked thread already has the shutdown flag
243 // honored implicitly (it's no longer running).
244 if let Err(e) = handle.join() {
245 tracing::warn!(?e, "stall-monitor polling thread panicked");
246 }
247 }
248 }
249}
250
251/// Spawn a background polling thread that watches each pid in
252/// `pids` for stalls. Returns a [`StallMonitorHandle`] whose
253/// [`StallMonitorHandle::drain`] yields the accumulated reports
254/// and whose `Drop` stops the thread.
255///
256/// The poll cadence is read from [`crate::KTSTR_STALL_POLL_MS_ENV`]
257/// (empty / unset / 0 / unparseable → [`DEFAULT_POLL_INTERVAL_MS`]).
258///
259/// Empty `pids` is accepted: the polling thread starts but its
260/// per-iteration loop is a no-op. The caller is expected to gate
261/// the `spawn_monitor` call on `!pids.is_empty()` for the common
262/// case so no thread is spawned at all; the no-op branch exists so
263/// the constructor itself is total.
264pub(crate) fn spawn_monitor(pids: &[libc::pid_t]) -> Result<StallMonitorHandle> {
265 let interval = resolve_poll_interval();
266 let pids: Vec<libc::pid_t> = pids.to_vec();
267 let shutdown = Arc::new(AtomicBool::new(false));
268 let reports: Arc<Mutex<Vec<StallReport>>> = Arc::new(Mutex::new(Vec::new()));
269
270 let thread_shutdown = Arc::clone(&shutdown);
271 let thread_reports = Arc::clone(&reports);
272
273 let thread = thread::Builder::new()
274 .name("ktstr-stall-mon".to_string())
275 .spawn(move || {
276 poll_loop(pids, interval, thread_shutdown, thread_reports);
277 })
278 .map_err(|e| anyhow::anyhow!("failed to spawn stall-monitor thread: {e}"))?;
279
280 Ok(StallMonitorHandle {
281 monitor: StallMonitor { shutdown, reports },
282 thread: Some(thread),
283 })
284}
285
286/// Resolve the poll interval from [`KTSTR_STALL_POLL_MS_ENV`].
287///
288/// Contract: empty / unset / `0` / unparseable falls back to
289/// [`DEFAULT_POLL_INTERVAL_MS`]. Any positive `u64` value is honored
290/// verbatim. Mirrors the empty-string-as-unset contract documented
291/// on the env-var const.
292fn resolve_poll_interval() -> Duration {
293 let ms = std::env::var(KTSTR_STALL_POLL_MS_ENV)
294 .ok()
295 .filter(|v| !v.is_empty())
296 .and_then(|v| v.trim().parse::<u64>().ok())
297 .filter(|&n| n > 0)
298 .unwrap_or(DEFAULT_POLL_INTERVAL_MS);
299 Duration::from_millis(ms)
300}
301
302/// Polling loop body. Runs until `shutdown` flips. Each iteration
303/// samples every pid in `pids`, advances per-pid sliding windows,
304/// and appends a [`StallReport`] when the predicate trips.
305fn poll_loop(
306 pids: Vec<libc::pid_t>,
307 interval: Duration,
308 shutdown: Arc<AtomicBool>,
309 reports: Arc<Mutex<Vec<StallReport>>>,
310) {
311 // Per-pid state: the sliding sample window plus an "armed"
312 // flag that prevents a permanently-stuck pid from spamming the
313 // report list every iteration. The pid stays "disarmed" until
314 // forward progress is observed (any non-zero delta in either
315 // counter), at which point it re-arms and can fire again on
316 // the next stall window.
317 let mut windows: Vec<(libc::pid_t, VecDeque<SchedSample>, bool)> = pids
318 .iter()
319 .map(|&p| (p, VecDeque::with_capacity(STALL_WINDOW), true))
320 .collect();
321
322 while !shutdown.load(Ordering::SeqCst) {
323 for (pid, window, armed) in windows.iter_mut() {
324 let sample = match read_sched_sample(*pid) {
325 Some(s) => s,
326 // pid has exited or /proc/<pid>/sched is unreadable
327 // — clear the window so we don't false-fire on a
328 // pre-exit flat tail, and continue.
329 None => {
330 window.clear();
331 continue;
332 }
333 };
334 if process_iteration(sample, window, armed) {
335 let samples: Vec<SchedSample> = window.iter().copied().collect();
336 let comm =
337 read_comm(*pid).unwrap_or_else(|reason| format!("[unreadable: {reason}]"));
338 let diagnostic = capture_diagnostic(*pid);
339 let report = StallReport {
340 pid: *pid,
341 comm,
342 samples,
343 captured_at: Instant::now(),
344 diagnostic,
345 };
346 {
347 let mut guard = reports
348 .lock()
349 .expect("stall-monitor reports mutex poisoned");
350 guard.push(report);
351 }
352 }
353 }
354 // Sleep `interval` total but honor shutdown promptly: chunk
355 // the sleep into 50 ms slices so the worst-case shutdown
356 // latency is ~50 ms instead of `interval`. The cap exists
357 // because `thread::sleep` is uninterruptible from the
358 // outside — a sole `thread::sleep(interval)` for an
359 // interval of (say) 5 s would block shutdown for up to 5
360 // seconds; the 50 ms cap keeps shutdown latency bounded
361 // independently of the polling cadence.
362 let chunk = Duration::from_millis(50).min(interval);
363 let mut remaining = interval;
364 while remaining > Duration::ZERO && !shutdown.load(Ordering::SeqCst) {
365 let slice = chunk.min(remaining);
366 thread::sleep(slice);
367 remaining = remaining.saturating_sub(slice);
368 }
369 }
370}
371
372/// Per-pid polling-iteration state update.
373///
374/// Folds the four-step transition the poll loop performs for each
375/// pid each iteration into a single testable function:
376///
377/// 1. Push `sample` onto `window`.
378/// 2. Evict the oldest sample if the window now exceeds
379/// [`STALL_WINDOW`].
380/// 3. Re-arm `armed` when the two most recent samples show
381/// forward progress in either counter (`nr_switches` or
382/// `sum_exec_runtime_ns`).
383/// 4. When `armed` AND [`stall_predicate`] fires, return `true`
384/// (caller emits a [`StallReport`]) and disarm.
385///
386/// Returns `true` when this iteration produced a stall — the
387/// caller is expected to construct a [`StallReport`] from the
388/// window contents and the pid. Returns `false` otherwise.
389///
390/// Extracted from [`poll_loop`] so unit tests can exercise the
391/// re-arm + spawn-gate semantics without spinning up the polling
392/// thread (which would require real PIDs + readable
393/// `/proc/<pid>/sched`).
394pub(crate) fn process_iteration(
395 sample: SchedSample,
396 window: &mut VecDeque<SchedSample>,
397 armed: &mut bool,
398) -> bool {
399 window.push_back(sample);
400 while window.len() > STALL_WINDOW {
401 window.pop_front();
402 }
403 // Re-arm whenever forward progress is observed between the
404 // two most recent samples — even if armed already (cheap
405 // branch).
406 if window.len() >= 2 {
407 let last = window[window.len() - 1];
408 let prev = window[window.len() - 2];
409 if last.nr_switches != prev.nr_switches
410 || last.sum_exec_runtime_ns != prev.sum_exec_runtime_ns
411 {
412 *armed = true;
413 }
414 }
415 if *armed && stall_predicate(window.make_contiguous()) {
416 *armed = false;
417 true
418 } else {
419 false
420 }
421}
422
423/// Stall predicate: returns `true` when `samples.len() >=
424/// STALL_WINDOW` AND every consecutive pair has both `nr_switches`
425/// delta == 0 AND `sum_exec_runtime_ns` delta == 0. A window
426/// shorter than [`STALL_WINDOW`] never fires (insufficient signal).
427///
428/// Extracted as a free function so a unit test can exercise the
429/// predicate directly with synthetic [`SchedSample`] sequences
430/// without spinning up the polling thread.
431pub fn stall_predicate(samples: &[SchedSample]) -> bool {
432 if samples.len() < STALL_WINDOW {
433 return false;
434 }
435 for pair in samples.windows(2) {
436 if pair[0].nr_switches != pair[1].nr_switches
437 || pair[0].sum_exec_runtime_ns != pair[1].sum_exec_runtime_ns
438 {
439 return false;
440 }
441 }
442 true
443}
444
/// Parse `/proc/<pid>/sched` content into `(nr_switches, sum_exec_runtime_ns)`.
///
/// The kernel format (per `kernel/sched/debug.c::proc_sched_show_task`)
/// is:
///
/// ```text
/// <comm> (<pid>, #threads: N)
/// ---------------------------
/// se.exec_start : 12345.6789
/// ...
/// nr_switches : 42
/// nr_voluntary_switches : 30
/// ...
/// se.sum_exec_runtime : 1234.567890
/// ...
/// ```
///
/// Each "key : value" line may have arbitrary whitespace around the `:`.
/// The parser extracts the two named keys and ignores everything else.
/// It returns `None` when either key is missing or its value is
/// malformed.
///
/// `se.sum_exec_runtime` is printed through the kernel's `PN` macro as
/// `<whole ms>.<6-digit remainder>` (`SPLIT_NS` splits the ns value at
/// 1_000_000). The value is parsed with exact integer arithmetic rather
/// than through `f64`, for two reasons:
///
/// - An `f64` has a 53-bit mantissa, so a multiply-and-truncate round
///   trip can lose a nanosecond and cannot represent every count above
///   2^53 ns.
/// - The `f64` path would also accept `nan`, `inf` or negative strings
///   and saturate them into bogus counters.
pub fn parse_sched_file(content: &str) -> Option<(u64, u64)> {
    let mut nr_switches: Option<u64> = None;
    let mut sum_exec_runtime_ns: Option<u64> = None;
    for line in content.lines() {
        // Only `<key> : <value>` lines matter. The dashed separator has
        // no `:`. The header's `#threads:` produces a key that matches
        // nothing below.
        let Some((key, value)) = line.split_once(':') else {
            continue;
        };
        let value = value.trim();
        match key.trim() {
            "nr_switches" => nr_switches = value.parse::<u64>().ok(),
            // The key keeps its `se.` prefix because `PN(se.sum_exec_runtime)`
            // stringifies the whole expression. Matching a bare
            // `sum_exec_runtime` would silently drop every real-kernel
            // sample.
            "se.sum_exec_runtime" => sum_exec_runtime_ns = parse_ms_as_ns(value),
            _ => {}
        }
        if nr_switches.is_some() && sum_exec_runtime_ns.is_some() {
            break;
        }
    }
    nr_switches.zip(sum_exec_runtime_ns)
}

/// Convert a `SPLIT_NS`-style millisecond string (`"<ms>[.<frac>]"`) to
/// nanoseconds exactly.
///
/// The fraction is right-padded with zeros, or truncated, to six digits
/// (one per decimal place down to 1 ns). Signs, non-digit characters and
/// `u64` overflow all yield `None`.
fn parse_ms_as_ns(value: &str) -> Option<u64> {
    const NS_PER_MS: u64 = 1_000_000;
    const FRAC_DIGITS: usize = 6;

    let (whole, frac) = value.split_once('.').unwrap_or((value, ""));
    // `u64::from_str` would accept a leading `+`; require plain digits so
    // only the kernel's own format is accepted.
    let all_digits = |s: &str| s.bytes().all(|b| b.is_ascii_digit());
    if !all_digits(whole) || !all_digits(frac) {
        return None;
    }
    let whole_ms: u64 = whole.parse().ok()?;
    let frac_ns = frac
        .bytes()
        .chain(std::iter::repeat(b'0'))
        .take(FRAC_DIGITS)
        .fold(0u64, |acc, digit| acc * 10 + u64::from(digit - b'0'));
    whole_ms.checked_mul(NS_PER_MS)?.checked_add(frac_ns)
}
516
517/// Read `/proc/<pid>/sched` and return a [`SchedSample`]. Returns
518/// `None` when the file is unreadable (pid exited, EACCES, parse
519/// failure).
520fn read_sched_sample(pid: libc::pid_t) -> Option<SchedSample> {
521 let content = std::fs::read_to_string(format!("/proc/{pid}/sched")).ok()?;
522 let (nr_switches, sum_exec_runtime_ns) = parse_sched_file(&content)?;
523 Some(SchedSample {
524 nr_switches,
525 sum_exec_runtime_ns,
526 captured_at: Instant::now(),
527 })
528}
529
530/// Read `/proc/<pid>/comm` and return the task name with the
531/// trailing newline stripped. Returns the IO error's `to_string`
532/// when the read fails.
533fn read_comm(pid: libc::pid_t) -> std::result::Result<String, String> {
534 std::fs::read_to_string(format!("/proc/{pid}/comm"))
535 .map(|s| s.trim_end_matches('\n').to_string())
536 .map_err(|e| e.to_string())
537}
538
539/// Capture a [`StallDiagnostic`] for a stalled pid. Every field is
540/// read independently; an unreadable field becomes a
541/// `"[unreadable: <reason>]"` stand-in rather than aborting the
542/// snapshot — `/proc/<pid>/stack` in particular is privileged and
543/// commonly absent.
544fn capture_diagnostic(pid: libc::pid_t) -> StallDiagnostic {
545 let wchan = read_proc_field(pid, "wchan");
546 let syscall = read_proc_field(pid, "syscall");
547 let status_full = read_proc_field(pid, "status");
548 let state = extract_state_letter(&status_full);
549 let cgroup = read_proc_field(pid, "cgroup");
550 // `/proc/<pid>/stack` is privileged; treat absence as the
551 // expected case (None) rather than a read failure.
552 let stack = std::fs::read_to_string(format!("/proc/{pid}/stack")).ok();
553 let host_loadavg = std::fs::read_to_string("/proc/loadavg")
554 .map(|s| s.trim_end_matches('\n').to_string())
555 .unwrap_or_else(|e| format!("[unreadable: {e}]"));
556 StallDiagnostic {
557 wchan,
558 syscall,
559 state,
560 stack,
561 status_full,
562 cgroup,
563 host_loadavg,
564 }
565}
566
567/// Read `/proc/<pid>/<field>` and return its contents trimmed.
568/// Failures (EACCES, ENOENT) become a `"[unreadable: <reason>]"`
569/// stand-in so the diagnostic always carries a value per field —
570/// the operator triaging a stall can tell apart "kernel didn't
571/// expose it" from "monitor failed to read it".
572fn read_proc_field(pid: libc::pid_t, field: &str) -> String {
573 match std::fs::read_to_string(format!("/proc/{pid}/{field}")) {
574 Ok(s) => s.trim_end_matches('\n').to_string(),
575 Err(e) => format!("[unreadable: {e}]"),
576 }
577}
578
/// Extracts the single-letter task state (`"S"`, `"R"`, `"D"`, …) from
/// a `/proc/<pid>/status` body.
///
/// The state is the first whitespace-separated token after the first
/// `State:` line that has one. The format is `<letter> (<description>)`,
/// per `fs/proc/array.c::task_state_array`. Returns `"?"` when no such
/// line exists.
fn extract_state_letter(status: &str) -> String {
    status
        .lines()
        .filter_map(|line| line.strip_prefix("State:"))
        .find_map(|rest| rest.split_whitespace().next())
        .unwrap_or("?")
        .to_string()
}
597
598#[cfg(test)]
599mod tests {
600 use super::*;
601
602 /// `parse_sched_file` recovers both counters from a realistic
603 /// `/proc/<pid>/sched` fragment containing the headers and
604 /// extra fields the kernel emits. Pins the parser against the
605 /// arbitrary-whitespace + interleaved-other-lines reality of
606 /// kernel output, AND against the exact `se.sum_exec_runtime`
607 /// key the kernel emits (prefix included).
608 #[test]
609 fn parse_sched_file_extracts_signals() {
610 let content = "\
611worker_0 (12345, #threads: 1)
612-------------------------------------------------------------------
613se.exec_start : 123456789.123456
614se.vruntime : 789.012345
615se.sum_exec_runtime : 1234.567890
616nr_migrations : 7
617nr_switches : 42
618nr_voluntary_switches : 30
619nr_involuntary_switches : 12
620clock-delta : 0
621";
622 let parsed = parse_sched_file(content).expect("both fields present");
623 assert_eq!(parsed.0, 42, "nr_switches");
624 // 1234.567890 ms = 1_234_567_890 ns.
625 assert_eq!(parsed.1, 1_234_567_890, "sum_exec_runtime in ns");
626 }
627
628 /// Defense-in-depth against kernel-key-format regression: read
629 /// the LIVE `/proc/self/sched` for the test process and verify
630 /// the parser extracts plausible counters. A future
631 /// kernel-format change (e.g. dropping the `se.` prefix,
632 /// renaming `nr_switches`, switching to a non-SPLIT_NS time
633 /// format) would fail this test loudly instead of silently
634 /// disabling the monitor.
635 #[test]
636 fn parse_sched_file_handles_live_proc_self_sched() {
637 let Ok(content) = std::fs::read_to_string("/proc/self/sched") else {
638 // /proc not available (sandboxed test runner without
639 // procfs) — skip rather than fail. The fixture-based
640 // test above already covers the parser; this test's
641 // value is defense against real-kernel drift.
642 return;
643 };
644 let parsed = parse_sched_file(&content)
645 .expect("live /proc/self/sched MUST parse — kernel-format regression");
646 // nr_switches MUST be >= 1 by the time this test runs (the
647 // process has at least spawned + scheduled + read the file).
648 assert!(
649 parsed.0 >= 1,
650 "live nr_switches must be >= 1, got {}",
651 parsed.0
652 );
653 // sum_exec_runtime_ns MUST be > 0 — process has accumulated
654 // some CPU time to reach this assertion.
655 assert!(
656 parsed.1 > 0,
657 "live sum_exec_runtime_ns must be > 0, got {}",
658 parsed.1
659 );
660 }
661
662 fn s(nr: u64, ns: u64) -> SchedSample {
663 SchedSample {
664 nr_switches: nr,
665 sum_exec_runtime_ns: ns,
666 captured_at: Instant::now(),
667 }
668 }
669
670 /// Window with W consecutive identical samples → predicate
671 /// fires. Pins the core stall detection contract.
672 #[test]
673 fn stall_predicate_fires_after_w_samples_no_delta() {
674 let samples: Vec<SchedSample> = (0..STALL_WINDOW).map(|_| s(100, 5_000)).collect();
675 assert!(
676 stall_predicate(&samples),
677 "all-flat window of W samples must fire"
678 );
679 }
680
681 /// Window with ANY consecutive-pair delta > 0 → predicate
682 /// does NOT fire, even if every other pair is flat. Pins the
683 /// "any progress disqualifies" semantic.
684 #[test]
685 fn stall_predicate_skips_when_delta_present() {
686 // Three flat samples then one with a switch — the
687 // last-pair delta is non-zero so predicate stays false.
688 let samples = vec![s(100, 5_000), s(100, 5_000), s(100, 5_000), s(101, 5_000)];
689 assert!(
690 !stall_predicate(&samples),
691 "any non-zero delta in any consecutive pair must keep predicate false",
692 );
693 // Same for an exec_runtime move.
694 let samples = vec![s(100, 5_000), s(100, 5_000), s(100, 5_001), s(100, 5_001)];
695 assert!(
696 !stall_predicate(&samples),
697 "exec_runtime delta in any pair must keep predicate false",
698 );
699 // And a window shorter than W never fires regardless.
700 let short: Vec<SchedSample> = (0..STALL_WINDOW - 1).map(|_| s(100, 5_000)).collect();
701 assert!(
702 !stall_predicate(&short),
703 "window shorter than STALL_WINDOW must not fire (insufficient signal)",
704 );
705 }
706
707 /// `capture_diagnostic` against pid 0 (an invalid pid that
708 /// fails every read) returns a populated [`StallDiagnostic`]
709 /// where every string field carries the `"[unreadable: ...]"`
710 /// stand-in and `stack` is `None`. Pins the
711 /// graceful-degradation contract: a privileged or missing
712 /// field never aborts the snapshot.
713 #[test]
714 fn diagnostic_capture_skips_unreadable_fields() {
715 // pid 0 is reserved and /proc/0/* never exists.
716 let diag = capture_diagnostic(0);
717 assert!(
718 diag.wchan.starts_with("[unreadable:"),
719 "wchan must degrade: got {:?}",
720 diag.wchan,
721 );
722 assert!(
723 diag.syscall.starts_with("[unreadable:"),
724 "syscall must degrade: got {:?}",
725 diag.syscall,
726 );
727 assert!(
728 diag.status_full.starts_with("[unreadable:"),
729 "status must degrade: got {:?}",
730 diag.status_full,
731 );
732 assert!(
733 diag.cgroup.starts_with("[unreadable:"),
734 "cgroup must degrade: got {:?}",
735 diag.cgroup,
736 );
737 // status_full degraded → state extraction yields "?".
738 assert_eq!(diag.state, "?", "unreadable status → state = ?");
739 // /proc/0/stack is also absent.
740 assert!(diag.stack.is_none(), "missing stack must remain None");
741 // host_loadavg should succeed on any healthy Linux host
742 // even when the pid is bogus; either it reads or carries
743 // the unreadable stand-in.
744 assert!(
745 !diag.host_loadavg.is_empty(),
746 "host_loadavg must always populate (success OR stand-in)",
747 );
748 }
749
750 /// Sliding window honors [`STALL_WINDOW`]: once full, the
751 /// oldest sample is evicted as new ones arrive, so the
752 /// predicate always sees the most-recent W samples. Pins the
753 /// "predicate sees last W" guarantee.
754 #[test]
755 fn ring_buffer_sliding_window_correctness() {
756 // Simulate the poll loop's window management without
757 // spinning the actual thread: push samples through a
758 // VecDeque with the same eviction policy and verify the
759 // contents.
760 let mut window: VecDeque<SchedSample> = VecDeque::with_capacity(STALL_WINDOW);
761 // Push 6 samples (more than W=4): values 0..6.
762 for i in 0..(STALL_WINDOW + 2) {
763 window.push_back(s(i as u64, i as u64 * 10));
764 while window.len() > STALL_WINDOW {
765 window.pop_front();
766 }
767 }
768 assert_eq!(
769 window.len(),
770 STALL_WINDOW,
771 "window size must stay at STALL_WINDOW after overflow",
772 );
773 // The oldest two were evicted; the head should be sample
774 // index 2 (out of 0..6 pushed).
775 let head = window.front().expect("window non-empty");
776 assert_eq!(
777 head.nr_switches, 2,
778 "oldest sample must be index 2 after 2 evictions"
779 );
780 let tail = window.back().expect("window non-empty");
781 assert_eq!(
782 tail.nr_switches,
783 (STALL_WINDOW + 1) as u64,
784 "newest sample must be the last pushed (index W+1)",
785 );
786 // None of these consecutive pairs is flat (every pair
787 // differs by 1), so the predicate must NOT fire even
788 // though the window is full.
789 let snap: Vec<SchedSample> = window.iter().copied().collect();
790 assert!(
791 !stall_predicate(&snap),
792 "monotonic samples must not trip predicate"
793 );
794 }
795
// -- process_iteration tests --
//
// Pin the per-pid state machine extracted from poll_loop:
// re-arm after stall-then-resume, spawn-gate behavior
// when the initial window is empty, and the fire-disarm
// sequence. Together these ensure a permanently-stuck pid produces
// ONE report per stall episode (until forward progress re-arms
// it), not one per polling iteration.
803
/// Spawn-gate behavior: an initial window starts armed but
/// empty. The first STALL_WINDOW - 1 samples cannot fire,
/// because the predicate needs at least STALL_WINDOW samples.
/// Only the STALL_WINDOW'th flat sample, which fills the
/// window, can trip the stall.
#[test]
fn process_iteration_spawn_gate_short_window_never_fires() {
    let mut window: VecDeque<SchedSample> = VecDeque::with_capacity(STALL_WINDOW);
    let mut armed = true;
    // Feed STALL_WINDOW - 1 identical samples. The predicate sees
    // a short window on every iteration and stays false, so the
    // pid also stays armed throughout.
    for _ in 0..(STALL_WINDOW - 1) {
        assert!(
            !process_iteration(s(100, 5_000), &mut window, &mut armed),
            "short window must not fire (spawn-gate semantic)",
        );
        assert!(armed, "no fire on a short window → stays armed");
    }
    assert!(armed, "no resume seen → stays armed");
    // The STALL_WINDOW'th identical sample fills the window,
    // so the predicate fires.
    assert!(
        process_iteration(s(100, 5_000), &mut window, &mut armed),
        "Wth flat sample fills window AND trips predicate → fire",
    );
    assert!(!armed, "fire path disarms");
}
829
/// Re-arm after stall-then-resume: a fired stall disarms the
/// pid, resume (forward progress) re-arms it, and a subsequent
/// flat window fires AGAIN. A regression that dropped the
/// re-arm branch would never produce the second report,
/// silently hiding recurrent stalls.
#[test]
fn process_iteration_rearm_after_stall_then_resume() {
    let mut window: VecDeque<SchedSample> = VecDeque::with_capacity(STALL_WINDOW);
    let mut armed = true;
    // Fill the window with flat samples. The first stall must
    // fire, and exactly once. The return value is counted rather
    // than discarded, so a missing fire is reported as such
    // instead of being inferred from `armed` alone.
    let mut first_fires: usize = 0;
    for _ in 0..STALL_WINDOW {
        if process_iteration(s(100, 5_000), &mut window, &mut armed) {
            first_fires += 1;
        }
    }
    assert_eq!(first_fires, 1, "first flat window must fire exactly once");
    assert!(!armed, "after first fire, disarmed");
    // Resume: feed a sample with forward progress. The predicate
    // stays false (the most-recent pair has a delta), and the
    // re-arm branch flips armed back to true.
    assert!(
        !process_iteration(s(101, 5_001), &mut window, &mut armed),
        "resume sample must not fire (last pair has delta)",
    );
    assert!(armed, "resume sample must re-arm");
    // Re-stall: keep feeding the resume value. The resume sample
    // already equals the new flat value, so it counts as the
    // first sample of the second flat run. The older
    // (100, 5_000) samples still in the window differ from it and
    // must age out before the window is entirely flat. Allowing
    // up to STALL_WINDOW iterations covers that refill without
    // pinning the exact iteration on which the predicate trips.
    let mut second_fire_iter = None;
    for i in 0..STALL_WINDOW {
        if process_iteration(s(101, 5_001), &mut window, &mut armed) {
            second_fire_iter = Some(i);
            break;
        }
    }
    assert!(
        second_fire_iter.is_some(),
        "second stall window must fire after re-arm; got no fire across {} iters",
        STALL_WINDOW,
    );
    assert!(!armed, "second fire disarms");
}
869
/// Permanently-stuck pid produces EXACTLY ONE report per stall
/// episode, not one per polling iteration or one per elapsed
/// window. Pins the "stays disarmed until forward progress"
/// semantic, checked on both the fire count and the `armed`
/// flag, so a hung process doesn't spam the reports vec across
/// hundreds of iterations.
#[test]
fn process_iteration_permanent_stall_fires_only_once() {
    let mut window: VecDeque<SchedSample> = VecDeque::with_capacity(STALL_WINDOW);
    let mut armed = true;
    // Fill the window; this produces the single report.
    let mut fire_count: usize = 0;
    for _ in 0..STALL_WINDOW {
        if process_iteration(s(100, 5_000), &mut window, &mut armed) {
            fire_count += 1;
        }
    }
    assert_eq!(fire_count, 1, "first window MUST fire exactly once");
    // Feed 100 more flat samples, spanning many further windows.
    // The predicate keeps matching, but the fire path is gated on
    // `armed`, which only forward progress can restore.
    for _ in 0..100 {
        if process_iteration(s(100, 5_000), &mut window, &mut armed) {
            fire_count += 1;
        }
    }
    assert_eq!(
        fire_count, 1,
        "permanently-stuck pid must NOT spam reports — exactly one fire across many iters",
    );
    assert!(!armed, "no forward progress seen → pid must remain disarmed");
}
899}