ktstr/workload/schbench/
run.rs

1//! schbench's run engine, ported faithfully from `schbench.c`: the
2//! message-thread / worker-thread topology, the lockless wait-list, the
3//! handshake-driven wakeup-latency loop, and the matrix work under the
4//! per-CPU lock. The default (non-RPS) mode is the wakeup-latency +
5//! request-latency benchmark, with per-phase schedstat (run-delay) capture; the
6//! RPS-injector mode (`-R`) and its auto-RPS closed-loop rate control (`-A`)
7//! layer on the request queue + once-per-second control thread; pipe mode (`-p`)
8//! swaps the matrix work for a memory-transfer simulation over the same handshake.
9//!
10//! # Topology (`schbench.c` `message_thread` :1540, `worker_thread` :1419)
11//!
12//! `message_threads` message threads each spawn `worker_threads` worker
13//! threads in one process (schbench is single-process pthreads; schbench_rs
14//! re-expresses that with [`std::thread`], so the
15//! [`handshake`](super::handshake) futex is PRIVATE and the per-CPU locks +
16//! matrices live in one address space). A worker loops: block until its
17//! message thread wakes it (measuring wakeup latency), then think-sleep +
18//! matrix work under the per-CPU lock (measuring request latency). The message
19//! thread batch-wakes all waiting workers (`run_msg_thread` :1166 /
20//! `xlist_wake_all` :969).
21//!
22//! # Fidelity
23//!
24//! - Clock: `CLOCK_MONOTONIC` (ruling), not schbench's `gettimeofday`
25//!   wall-clock. Monotonic is freeze-robust (a host-side VM pause cannot make
26//!   a delta go negative) and is the correct source for a latency delta. The
27//!   measured quantity -- elapsed nanoseconds between two reads -- is identical
28//!   to what schbench measures; only the clock id differs.
29//! - The lockless wait-list is a Treiber stack ([`TreiberStack`]) over an
30//!   intrusive next-pointer, matching schbench's `xlist_add`/`xlist_splice`
31//!   cmpxchg list (`schbench.c:866-896`): userspace CAS only, no lock/syscall
32//!   on the hot path, so the syscall profile stays futex-dominated like
33//!   schbench's. (schbench hand-duplicates this for its thread list and its
34//!   request list; schbench_rs shares one generic implementation.)
35//! - do_work timing: the matrix multiply compiles to the SAME scalar inner loop
36//!   as schbench's `-O2` `do_some_math` -- a `u64` multiply-accumulate
37//!   (`imul` with a fused memory operand, then `add`), NOT SIMD. It does not
38//!   auto-vectorize: there is no vector `u64` multiply on the build target
39//!   (`x86-64-v3`; `vpmullq` is AVX-512) and the serial accumulator reduction
40//!   blocks SLP -- exactly as for schbench's `-O2` build (its Makefile
41//!   `CFLAGS = -Wall -O2`, no `-march`, likewise scalar; verified by disassembly
42//!   of both). A debug build (opt-level 0) emits far more overhead (no register
43//!   allocation, unrolling, or memory-operand fusion), so request latency runs
44//!   higher than reference schbench's. ABSOLUTE side-by-side fidelity vs the
45//!   reference (the `ktstr-schbench-validate` comparison) therefore needs a
46//!   release build. The in-ktstr A/B comparison (perf-delta / per-phase claims)
47//!   is build-invariant -- both sides use the same build -- so it is unaffected.
48
49use core::cell::UnsafeCell;
50use core::ptr;
51use core::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicU64, AtomicUsize, Ordering};
52use std::fs::File;
53use std::io::{Read, Seek, SeekFrom};
54
55use super::percpu_lock::PerCpuLocks;
56use super::plat::{Percentiles, PlatStats};
57
58/// Read `CLOCK_MONOTONIC` as nanoseconds (ruling: monotonic, not wall-clock).
59/// Self-contained so the schbench modules build into the standalone validation
60/// binary without reaching into `worker`'s `pub(super)` clock wrapper.
61fn monotonic_nanos() -> u64 {
62    // SAFETY: `clock_gettime` writes a `timespec` through the out-pointer and
63    // reads nothing else; CLOCK_MONOTONIC is always available on Linux.
64    let mut ts: libc::timespec = unsafe { core::mem::zeroed() };
65    let rc = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
66    assert_eq!(rc, 0, "clock_gettime(CLOCK_MONOTONIC) failed");
67    (ts.tv_sec as u64) * 1_000_000_000 + ts.tv_nsec as u64
68}
69
/// Implemented by node types that embed the next-pointer a [`TreiberStack`]
/// threads through them (an intrusive link owned by the node itself).
trait Linked: Sized {
    /// The node's embedded link to the next node in the chain.
    fn next_link(&self) -> &AtomicPtr<Self>;
}

/// Lock-free LIFO stack threaded through each node's intrusive next-pointer --
/// the shared form of schbench's `xlist_add`/`xlist_splice`
/// (`schbench.c:866-896`).
///
/// The stack never owns its nodes: it only holds raw pointers to them. The
/// caller owns node storage and must keep every node alive for as long as any
/// thread may still operate on the stack or on a chain taken from it.
struct TreiberStack<T: Linked> {
    /// Most recently pushed node, or null when the stack is empty.
    head: AtomicPtr<T>,
}

impl<T: Linked> TreiberStack<T> {
    /// Create an empty stack (null head).
    fn new() -> Self {
        let head = AtomicPtr::new(ptr::null_mut());
        Self { head }
    }

    /// Push `node`. Faithful port of `xlist_add` (`schbench.c:866`): point the
    /// node's link at the head we observed, then CAS-publish the node as the
    /// new head, retrying whenever another thread moved the head first.
    ///
    /// The caller must not push a node that is already on the stack: its link
    /// would be overwritten while still reachable. schbench upholds this (a
    /// worker re-queues only after being spliced off and woken).
    fn add(&self, node: *mut T) {
        // SAFETY: `node` is a valid pointer to a `T` that outlives all list
        // operations (caller contract).
        let link = unsafe { (*node).next_link() };
        let mut observed = self.head.load(Ordering::Acquire);
        loop {
            // The link is not reachable by others until the CAS below
            // succeeds; the CAS's release half is what publishes it.
            link.store(observed, Ordering::Relaxed);
            match self.head.compare_exchange(
                observed,
                node,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                // Lost the race: retry against the head the failed CAS saw.
                Err(current) => observed = current,
            }
        }
    }

    /// Detach the entire chain in one atomic step, leaving the stack empty.
    /// Returns the old head (null if the stack was empty). Faithful port of
    /// `xlist_splice` (`schbench.c:884`), using a single swap-to-null in place
    /// of schbench's CAS loop.
    ///
    /// Walk the result through [`Linked::next_link`]; nodes come out in LIFO
    /// order (schbench's thread-list splice does not reverse either; wake
    /// order is irrelevant).
    fn splice(&self) -> *mut T {
        let taken = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
        taken
    }

    /// Detach the entire chain and hand it back in FIFO (insertion) order --
    /// the reverse of what [`splice`](Self::splice) yields. Faithful port of
    /// `request_splice` (`schbench.c:920-940`): take the chain, then reverse
    /// its links in place so the oldest queued request is serviced first
    /// (request order matters, unlike the thread wait-list).
    ///
    /// Returns null when the stack was empty. Single-consumer: only the owning
    /// worker drains its own queue.
    fn splice_reversed(&self) -> *mut T {
        let mut remaining = self.splice();
        let mut fifo_head: *mut T = ptr::null_mut();
        while !remaining.is_null() {
            // SAFETY: `remaining` is a node that was published on the stack
            // and whose storage the caller owns; the single consumer reverses
            // the chain it just took exclusive ownership of, so no other
            // thread touches these links concurrently.
            let link = unsafe { (*remaining).next_link() };
            let older = link.load(Ordering::Acquire);
            // Re-point this node at the already-reversed prefix ...
            link.store(fifo_head, Ordering::Relaxed);
            // ... make it the new front, and step to the next-older node.
            fifo_head = core::mem::replace(&mut remaining, older);
        }
        fifo_head
    }
}
147
/// One queued unit of work in RPS-injector mode (`schbench.c`
/// `struct request`, `:757`).
///
/// The RPS thread heap-allocates a `Request` (`Box`) per request and links it
/// onto the target worker's queue; the worker drains the queue and frees each
/// one -- mirroring schbench's per-request malloc/free, which is part of the
/// syscall profile the standalone validation compares.
///
/// schbench's `start_time` field is written (`:951`) but never read, so it is
/// left out here: wakeup latency comes from the worker's `wake_time` (stamped
/// by the RPS thread at enqueue, and by the worker itself before it blocks --
/// see [`rps_wait`]) and request latency from a local work-start timestamp.
/// Dropping the dead field also drops schbench's per-request `gettimeofday`
/// for it (`:951`) -- one fewer clock syscall per request, in line with the
/// engine's `CLOCK_MONOTONIC`-vs-`gettimeofday` clock divergence.
struct Request {
    /// Intrusive link used by the per-worker request [`TreiberStack`].
    next: AtomicPtr<Request>,
}

impl Request {
    /// A fresh, unlinked request (its `next` pointer is null).
    fn new() -> Self {
        Request {
            // `AtomicPtr::default()` is the null pointer.
            next: AtomicPtr::default(),
        }
    }
}
172
173impl Linked for Request {
174    fn next_link(&self) -> &AtomicPtr<Self> {
175        &self.next
176    }
177}
178
179/// Declarative config for the [`Schbench`](crate::workload::WorkType::Schbench)
180/// workload. Construct via [`SchbenchConfig::default`] (schbench's own
181/// defaults) plus the chainable setters, e.g.
182/// `SchbenchConfig::default().message_threads(2).worker_threads(4)`. Derives
183/// Clone/Debug/PartialEq/Eq/Hash/serde; the builder shape follows
184/// [`WorkloadConfig`](crate::workload::WorkloadConfig), but `Eq`+`Hash` (which
185/// `WorkloadConfig` and `WorkSpec` omit because of their transitive `f64`) are
186/// available here since every field is integer/bool -- the ktstr f64-free
187/// leaf-config convention.
188///
189/// # schbench(8) CLI parity
190///
191/// This port re-expresses schbench's default (matrix-work) mode natively, so
192/// its tunables are config fields and topology rather than CLI flags. The
193/// mapping to schbench's option table (`schbench.c:138-187`):
194///
195/// | schbench flag | ktstr |
196/// |---|---|
197/// | `-m` message-threads | `message_threads` |
198/// | `-t` threads | `worker_threads` (workers per message thread; `0` = `ceil(cpuset_cpus / message_threads)`, see below) |
199/// | `-F` cache_footprint | `cache_footprint_kib` |
200/// | `-n` operations | `operations` |
201/// | `-s` sleep_usec | `sleep_usec` |
202/// | `-L` no-locking | `skip_locking` |
203/// | `-R` rps | `requests_per_sec` |
204/// | `-A` auto-rps | `auto_rps` |
205/// | `--split` (long-only) | `split_percent` (`None` = no split, all-private) |
206/// | `-p` pipe (also `--pipe`) | `pipe_transfer_bytes` (`0` = off; memory-transfer mode, no matrix work) |
207/// | `-r` runtime | in-VM: the scenario engine's run window (the engine runs until `stop`); host-side: the `run_secs` argument to [`run_standalone`](crate::workload::run_standalone) |
208///
209/// ## Set by ktstr topology, not a flag
210///
211/// - `-t` default: with `worker_threads = 0`, ktstr matches schbench's `-t`
212///   0-default -- it divides the CPU count across the message threads,
213///   `ceil(cpus / message_threads)` per thread (`schbench.c:1849-1852`), so the
214///   total worker count stays near the CPU count. ktstr scopes "cpus" to the
215///   allocated guest cpuset (the worker's `sched_getaffinity` mask, set by the
216///   scenario's topology / `CgroupDef`) rather than schbench's `get_nprocs`, so
217///   the total is ≈ the cpuset's CPU count. An explicit non-zero `worker_threads`
218///   is workers-per-message-thread in both.
219/// - `-M` (message-cpus) / `-W` (worker-cpus) thread pinning: ktstr places
220///   threads through its affinity / cpuset layer, so there is deliberately no
221///   per-thread-pin knob.
222///
223/// ## Observability flags -> the metric API
224///
225/// schbench's `-w` (warmuptime), `-i` (intervaltime), `-z` (zerotime), `-j`
226/// (json), and `-J` (jobname) shape its streaming stderr/JSON report. ktstr's
227/// numbers flow through the metric API instead -- per-phase attribution and the
228/// sidecar -- so these have no flag equivalent. `ktstr-schbench-validate`
229/// reproduces schbench's stderr-table shape for a side-by-side comparison.
230///
231/// ## Split mode (`--split`)
232///
233/// `Some(p)` partitions `cache_footprint_kib` into a per-thread private matrix
234/// (`p`%) and ONE process-global shared matrix (`100-p`%) that every worker
235/// multiplies into concurrently, reproducing schbench's cross-core
236/// shared-working-set cache contention (`schbench.c:1390-1404`, `:1858-1863`).
237/// ktstr models the shared matrix with `AtomicU64` `Relaxed` accesses. Like
238/// schbench's emitted code, the shared kernel keeps the running sum in a register
239/// and STORES it to each shared C cell on every inner (`k`) iteration -- C is
240/// write-only in the loop (A and B are loaded each `k`, C is never reloaded), and
241/// that per-k store is what generates the contention. Both gcc and clang keep the
242/// per-k store: `do_some_math` reads `m1`/`m2`/`m3` as offsets into one base
243/// pointer, so neither can prove the `m3` store doesn't alias the next `k`'s
244/// `m1`/`m2` loads. On x86-64 a `Relaxed` load/store lowers to a plain `MOV` (no
245/// `LOCK`), so the contention is identical to schbench's plain shared-memory race
246/// -- but sound (atomics, no data race), with zero unsafe. `None` (default) is
247/// the legacy all-private single matrix.
248///
249/// ## Pipe mode (`-p`)
250///
251/// `pipe_transfer_bytes > 0` REPLACES the matrix workload with schbench's
252/// memory-transfer simulation (`schbench.c:177`, `pipe_test`). It rides the
253/// message-handshake path: the message thread memsets each woken worker's
254/// per-thread page to `1` (`schbench.c:980-981`) and the worker memsets its own
255/// page to `2` before blocking (`schbench.c:1003-1004`), `pipe_transfer_bytes`
256/// bytes each per handshake cycle (clamped to 1 MiB, `PIPE_TRANSFER_BUFFER`).
257/// `do_work` and the think-sleep are skipped (`schbench.c:1448`), so the only
258/// per-cycle work is the wakeup handshake + the two memsets; the report is the
259/// PER-WORKER memory-transfer throughput (`avg worker transfer` = the aggregate
260/// rate divided by the worker count, `schbench.c:1697,1942-1943,1979`) alongside
261/// the wakeup-latency table, not request latency.
262///
263/// ktstr does NOT compose `-p` with `-R`: in pipe mode it always runs the
264/// message-handshake waker (so BOTH pipe memsets fire — a full transfer) and never
265/// starts the RPS injector. schbench instead COMPOSES them, half-broken: it has no
266/// precedence (`-R` alone picks the waker, `schbench.c:1594`), so `-p -R` runs the
267/// RPS injector while the worker-side memset still fires unconditionally
268/// (`schbench.c:1003-1004`) but the waker-side memset — which lives only in
269/// `xlist_wake_all` (`schbench.c:980-981`) — does not, yielding a degenerate
270/// half-pipe. ktstr's full pipe is the more faithful `-p` behavior; the realistic
271/// use is `-p` without `-R`. schbench also zeroes warmuptime in pipe mode
272/// (`schbench.c:296`); ktstr has no warmuptime concept, so that is a no-op here.
273///
274/// ## Modes not ported
275///
276/// - `-C` (calibrate): a tuning aid that times schbench's own work loop and
277///   forces `-L` (`schbench.c:166`, `:389`). Intentionally out of scope -- ktstr
278///   measures through the metric path.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SchbenchConfig {
    /// Number of message threads (`schbench.c` `-m`, default 1). Where this is
    /// used as a divisor (`rps_per_message_thread`, `resolve_worker_count`) a
    /// value of 0 is floored to 1.
    pub message_threads: usize,
    /// Worker threads per message thread (`schbench.c` `-t`). 0 resolves to
    /// `ceil(cpuset_cpus / message_threads)` -- the CPU count of the allocated
    /// guest cpuset (the worker's `sched_getaffinity` mask, per ruling) divided
    /// across the message threads, matching schbench's 0-default
    /// (`schbench.c:1849-1852`) scoped to the cpuset rather than `get_nprocs`.
    /// A non-zero value is used as-is. See `resolve_worker_count` and the
    /// CLI-parity section above.
    pub worker_threads: usize,
    /// Per-worker matrix cache footprint in KiB (`schbench.c` `-F`, default
    /// 256); sets the matrix dimension (see `matrix_size`). 0 yields a
    /// 0 dimension, i.e. no matrix work.
    pub cache_footprint_kib: usize,
    /// Matrix multiplications per work cycle (`schbench.c` `-n`, default 5).
    /// 0 also forces the matrix dimension to 0 (see `matrix_size`).
    pub operations: usize,
    /// Think-time sleep before the matrix work, microseconds (`schbench.c`
    /// `-s`, default 100); simulates networking. 0 disables.
    pub sleep_usec: u64,
    /// Skip the per-CPU lock around the matrix work (`schbench.c` `-L`,
    /// default false: locking on).
    pub skip_locking: bool,
    /// Fixed request rate, requests/second (`schbench.c` `-R`, default 0 = off).
    /// 0 selects the default message-handshake mode (each worker is woken by its
    /// message thread); non-zero switches to the RPS-injector mode, where a
    /// dedicated thread enqueues `requests_per_sec` requests/second round-robin
    /// across the workers (`schbench.c` `run_rps_thread`, `:1258`). The mode
    /// gate is evaluated on the per-message-thread quotient
    /// (`rps_per_message_thread`), so a total smaller than `message_threads`
    /// rounds to 0 and falls back to the default mode.
    pub requests_per_sec: usize,
    /// Auto-RPS target CPU-busy percentage (`schbench.c` `-A`, default 0 = off).
    /// Non-zero turns on closed-loop rate control: a once-per-second control
    /// thread grows/shrinks the live request rate toward this host-busy% target
    /// (`schbench.c` `auto_scale_rps`, `:1180`). Setting it seeds the rate to 10
    /// when `requests_per_sec` is 0 (`schbench.c:286`; applied by
    /// `normalized_total_rps`), so auto-RPS starts low and climbs.
    pub auto_rps: usize,
    /// Percent of the cache footprint that is PRIVATE per worker thread
    /// (`schbench.c` `--split`, long-only, 0-100). `None` = no split:
    /// schbench's legacy all-private single matrix (`schbench.c:1405-1408`,
    /// `:1879-1880`). `Some(p)` partitions `cache_footprint_kib` into a
    /// per-thread private matrix (`p`%) and ONE process-global shared matrix
    /// (`100-p`%) that every worker multiplies into concurrently, reproducing
    /// schbench's cross-core shared-working-set cache contention
    /// (`schbench.c:1390-1404`, `:1858-1863`). `Some(0)` = all shared,
    /// `Some(100)` = all private (same matrix sizes as `None`, but routed
    /// through the split branch, matching schbench's `split_specified` path).
    /// Out-of-range `Some(p > 100)` panics when the engine consumes it
    /// (`schbench.c:362-365` exits on the same); the builder also debug-asserts
    /// the bound. Note the field itself is `pub`, so a struct literal or a
    /// deserialized value bypasses the builder's check.
    pub split_percent: Option<usize>,
    /// Pipe-mode transfer size in bytes (`schbench.c` `-p`/`--pipe`, default 0 =
    /// off, clamped to 1 MiB `PIPE_TRANSFER_BUFFER`). Non-zero REPLACES the
    /// matrix workload with schbench's memory-transfer simulation: the message
    /// thread memsets each woken worker's per-thread page to `1` and the worker
    /// memsets its own page to `2` (`schbench.c:980-981`/`:1003-1004`),
    /// `pipe_transfer_bytes` bytes each per cycle, while `do_work` + the
    /// think-sleep are skipped (`schbench.c:1448`). Reports PER-WORKER
    /// memory-transfer throughput (`avg worker transfer`) rather than request
    /// latency; `pipe_transfer_report` applies the same 1 MiB clamp when it
    /// derives that figure.
    pub pipe_transfer_bytes: usize,
}
340
/// Size of the per-thread pipe page: 1 MiB, schbench's `PIPE_TRANSFER_BUFFER`
/// (`schbench.c:41`). A `pipe_transfer_bytes` above this is clamped down to it
/// (schbench clamps and warns, `schbench.c:291-294`).
pub(crate) const PIPE_TRANSFER_BUFFER: usize = 1 << 20;
345
/// Upper bound, in nanoseconds (50 ms), on any single RPS-injector pacing
/// sleep.
///
/// `run_rps_thread` runs synchronously on the message thread that `run` joins
/// on shutdown, so an uncapped paced sleep would hold teardown for up to one
/// full inter-arrival interval — and at a degenerate low `requests_per_sec`
/// (interval = 1s / rps) that can exceed the scenario cleanup/watchdog budget.
/// Capping each sleep at this quantum means `stop` is re-checked at least this
/// often, which bounds shutdown latency. At realistic rates the interval is
/// already well under the cap, so the wait is a single uncapped sleep — the cap
/// only engages at pathological low rates. (Schbench-local; the sibling
/// taobench engine has its own equivalent, both with self-contained
/// `monotonic_nanos`.)
const STOP_POLL_QUANTUM_NS: u64 = 50 * 1_000_000;
357
/// The pipe-mode (`-p`) throughput summary, i.e. schbench's
/// `avg worker transfer` line (`schbench.c:1979-1982`): a per-worker transfer
/// rate expressed both as ops/sec and as pretty-scaled bytes/sec. Produced by
/// [`pipe_transfer_report`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct PipeTransferReport {
    /// Transfer cycles completed per second, PER WORKER (`loop_count / Σ worker
    /// runtimes`, `schbench.c:1942-1943`).
    pub ops_per_sec: f64,
    /// Per-worker bytes/sec after `pretty_size` scaling (÷1024 per step); the
    /// magnitude is relative to [`unit`](Self::unit).
    pub scaled: f64,
    /// Unit that `scaled` is expressed in -- one of
    /// `B`/`KB`/`MB`/`GB`/`TB`/`PB`/`EB` (`schbench.c:1606`).
    pub unit: &'static str,
}
373
374/// Derive the pipe-mode `avg worker transfer` line from a run's aggregate
375/// `achieved_rps` (completed cycles/sec over the true elapsed window), the
376/// requested `pipe_transfer_bytes`, and the resolved `nr_workers`. The figure is
377/// PER WORKER: schbench divides by `loop_runtime` = Σ each worker's runtime
378/// (`schbench.c:1697` sums `worker->runtime`; `:1942-1943`/`:1979` divide by it),
379/// and `Σ worker runtimes ≈ nr_workers * elapsed`, so the per-worker rate is the
380/// aggregate `achieved_rps / nr_workers` — the label is literally "avg WORKER
381/// transfer". (Dividing the aggregate by wall-clock alone would over-report by
382/// `nr_workers`×.) The transfer size is CLAMPED to `PIPE_TRANSFER_BUFFER` first —
383/// the engine moves only the clamped size per cycle (`run` applies the same
384/// `.min()`), matching schbench's parse-time clamp (`schbench.c:291-294`) — so the
385/// throughput reflects the bytes ACTUALLY moved. Scaling is schbench's
386/// `pretty_size` (`schbench.c:1606`). `nr_workers` is floored at 1 (no division by
387/// zero).
388pub fn pipe_transfer_report(
389    achieved_rps: f64,
390    pipe_transfer_bytes: usize,
391    nr_workers: usize,
392) -> PipeTransferReport {
393    let n = nr_workers.max(1) as f64;
394    let bytes = pipe_transfer_bytes.min(PIPE_TRANSFER_BUFFER);
395    // PER-WORKER: schbench's loops_per_sec/mb_per_sec divide the aggregate by
396    // Σ worker runtimes (≈ nr_workers * elapsed), i.e. `achieved_rps / nr_workers`.
397    let ops_per_sec = achieved_rps / n;
398    let bytes_per_sec = achieved_rps * bytes as f64 / n;
399    let (scaled, unit) = pretty_size(bytes_per_sec);
400    PipeTransferReport {
401        ops_per_sec,
402        scaled,
403        unit,
404    }
405}
406
/// Port of schbench's `pretty_size` (`schbench.c:1606-1620`): repeatedly divide
/// a byte count by 1024, stepping through B/KB/MB/GB/TB/PB/EB, and return the
/// scaled value with its unit. Scaling stops at the last unit (schbench's
/// `units[divs + 1] == NULL` break), so the table is never overrun; a value
/// that is not `>= 1024.0` (including NaN) is returned unchanged as `B`.
fn pretty_size(mut number: f64) -> (f64, &'static str) {
    const UNITS: [&str; 7] = ["B", "KB", "MB", "GB", "TB", "PB", "EB"];

    let mut unit = UNITS[0];
    // Each remaining table entry is one more ÷1024 step we are allowed to take.
    for &larger in UNITS.iter().skip(1) {
        if number >= 1024.0 {
            number /= 1024.0;
            unit = larger;
        } else {
            break;
        }
    }
    (number, unit)
}
419
420impl Default for SchbenchConfig {
421    fn default() -> Self {
422        // schbench defaults (schbench.c option table + globals).
423        Self {
424            message_threads: 1,
425            worker_threads: 0,
426            cache_footprint_kib: 256,
427            operations: 5,
428            sleep_usec: 100,
429            skip_locking: false,
430            requests_per_sec: 0,
431            auto_rps: 0,
432            split_percent: None,
433            pipe_transfer_bytes: 0,
434        }
435    }
436}
437
438impl SchbenchConfig {
439    /// Set the number of message threads (schbench `-m`).
440    #[must_use = "builder methods consume self; bind the result"]
441    pub fn message_threads(mut self, n: usize) -> Self {
442        self.message_threads = n;
443        self
444    }
445    /// Set worker threads per message thread (schbench `-t`); 0 = one per
446    /// allocated CPU.
447    #[must_use = "builder methods consume self; bind the result"]
448    pub fn worker_threads(mut self, n: usize) -> Self {
449        self.worker_threads = n;
450        self
451    }
452    /// Set the per-worker matrix cache footprint in KiB (schbench `-F`).
453    #[must_use = "builder methods consume self; bind the result"]
454    pub fn cache_footprint_kib(mut self, kib: usize) -> Self {
455        self.cache_footprint_kib = kib;
456        self
457    }
458    /// Set the matrix multiplications per work cycle (schbench `-n`).
459    #[must_use = "builder methods consume self; bind the result"]
460    pub fn operations(mut self, n: usize) -> Self {
461        self.operations = n;
462        self
463    }
464    /// Set the think-time sleep in microseconds (schbench `-s`); 0 disables.
465    #[must_use = "builder methods consume self; bind the result"]
466    pub fn sleep_usec(mut self, usec: u64) -> Self {
467        self.sleep_usec = usec;
468        self
469    }
470    /// Skip the per-CPU lock around the matrix work (schbench `-L`).
471    #[must_use = "builder methods consume self; bind the result"]
472    pub fn skip_locking(mut self, skip: bool) -> Self {
473        self.skip_locking = skip;
474        self
475    }
476    /// Set the fixed request rate in requests/second (schbench `-R`); 0 selects
477    /// the default message-handshake mode, non-zero the RPS-injector mode.
478    #[must_use = "builder methods consume self; bind the result"]
479    pub fn requests_per_sec(mut self, rps: usize) -> Self {
480        self.requests_per_sec = rps;
481        self
482    }
483    /// Set the auto-RPS target host-busy percentage (schbench `-A`); 0 disables
484    /// auto-scaling. Non-zero seeds the rate to 10 when `requests_per_sec` is 0.
485    #[must_use = "builder methods consume self; bind the result"]
486    pub fn auto_rps(mut self, target_pct: usize) -> Self {
487        self.auto_rps = target_pct;
488        self
489    }
490    /// Set the private/shared cache-footprint split percentage (schbench
491    /// `--split`); `None` (default) = no split, all-private single matrix.
492    /// `Some(p)` requires `p <= 100`: the builder debug-asserts it for early
493    /// feedback, and the engine hard-panics on an out-of-range value at the
494    /// consumption boundary in `run` -- the analog of schbench exiting on a bad
495    /// `--split` (`schbench.c:362-365`).
496    #[must_use = "builder methods consume self; bind the result"]
497    pub fn split_percent(mut self, percent: Option<usize>) -> Self {
498        debug_assert!(
499            percent.is_none_or(|p| p <= 100),
500            "split_percent must be 0..=100"
501        );
502        self.split_percent = percent;
503        self
504    }
505    /// Set the pipe-mode transfer size in bytes (schbench `-p`/`--pipe`); 0
506    /// (default) = off (the matrix workload). Non-zero switches to schbench's
507    /// memory-transfer simulation; values above 1 MiB (`PIPE_TRANSFER_BUFFER`)
508    /// are clamped when the engine consumes it (schbench clamps the same,
509    /// `schbench.c:291-294`).
510    #[must_use = "builder methods consume self; bind the result"]
511    pub fn pipe_transfer_bytes(mut self, bytes: usize) -> Self {
512        self.pipe_transfer_bytes = bytes;
513        self
514    }
515
516    /// Matrix dimension from the cache footprint, identical to schbench
517    /// (`schbench.c:1880`: `sqrt(cache_footprint_kb * 1024 / 3 /
518    /// sizeof(unsigned long))`) and to ktstr's `FanOutCompute` precompute. Zero
519    /// `operations` or `cache_footprint_kib` yields a 0 dimension (no matrix
520    /// work).
521    pub(crate) fn matrix_size(&self) -> usize {
522        if self.operations > 0 && self.cache_footprint_kib > 0 {
523            ((self.cache_footprint_kib * 1024 / 3 / core::mem::size_of::<u64>()) as f64).sqrt()
524                as usize
525        } else {
526            0
527        }
528    }
529
530    /// Shared-matrix dimension for `--split` (`schbench.c:1859,1862`:
531    /// `sqrt(cache_footprint_kb*(100-split)/100 * 1024/3/sizeof(ulong))`).
532    /// `None` split (or 0 `operations`/`cache_footprint_kib`) => 0 (no shared
533    /// matrix); `Some(100)` => 0 (all private, no shared work).
534    pub(crate) fn shared_matrix_size(&self) -> usize {
535        match self.split_percent {
536            Some(p) if self.operations > 0 && self.cache_footprint_kib > 0 => {
537                let shared_kb = self.cache_footprint_kib * (100 - p) / 100;
538                ((shared_kb * 1024 / 3 / core::mem::size_of::<u64>()) as f64).sqrt() as usize
539            }
540            _ => 0,
541        }
542    }
543
544    /// Private (per-worker) matrix dimension. `Some(p)` (`schbench.c:1860,1863`):
545    /// `sqrt(cache_footprint_kb*split/100 * 1024/3/sizeof(ulong))`. `None`
546    /// (legacy): the full-footprint [`matrix_size`](Self::matrix_size)
547    /// (`schbench.c:1880`, the all-private single matrix).
548    pub(crate) fn private_matrix_size(&self) -> usize {
549        match self.split_percent {
550            Some(p) if self.operations > 0 && self.cache_footprint_kib > 0 => {
551                let private_kb = self.cache_footprint_kib * p / 100;
552                ((private_kb * 1024 / 3 / core::mem::size_of::<u64>()) as f64).sqrt() as usize
553            }
554            _ => self.matrix_size(),
555        }
556    }
557
558    /// The effective TOTAL request rate after schbench's startup seed: the user's
559    /// `requests_per_sec`, or 10 when auto-RPS is on and no fixed rate was given
560    /// (`schbench.c:286`: auto-RPS starts at 10 and climbs toward its target).
561    /// Both the mode gate ([`rps_per_message_thread`](Self::rps_per_message_thread))
562    /// and the live auto-scaled injection rate seed from this.
563    pub(crate) fn normalized_total_rps(&self) -> usize {
564        if self.auto_rps != 0 && self.requests_per_sec == 0 {
565            10
566        } else {
567            self.requests_per_sec
568        }
569    }
570
571    /// Per-message-thread request rate driving the RPS-vs-default MODE GATE: the
572    /// normalized total ([`normalized_total_rps`](Self::normalized_total_rps))
573    /// divided across the message threads, each of which runs its own RPS thread.
574    /// schbench divides once at startup (`requests_per_sec /= message_threads`,
575    /// `schbench.c:1899`) BEFORE the per-thread mode gate (`if (requests_per_sec)`,
576    /// `schbench.c:1594`), so the gate sees the divided value: a total below the
577    /// message-thread count rounds to 0 and the thread runs the DEFAULT
578    /// message-handshake mode (not a zero-rate RPS). The gate is decided ONCE from
579    /// this seeded value; the actual injection rate is the live auto-scaled total
580    /// (which may climb above the seed). Integer division drops the remainder,
581    /// matching schbench -- so auto-RPS with more than 10 message threads (seed
582    /// 10 / m = 0) degenerates to default mode, exactly as schbench does.
583    pub(crate) fn rps_per_message_thread(&self) -> usize {
584        self.normalized_total_rps() / self.message_threads.max(1)
585    }
586
587    /// Effective per-message-thread worker count for [`run`]. A non-zero
588    /// `worker_threads` is workers-per-message-thread as-is; the `0` default
589    /// mirrors schbench's `-t` 0-default (`schbench.c:1849-1852`:
590    /// `worker_threads = (num_cpus + message_threads - 1) / message_threads`)
591    /// but over the allocated guest cpuset (`allowed_count` from
592    /// [`resolve_cpu_topology`], per the ktstr cpuset ruling) instead of
593    /// `get_nprocs`: it divides the cpuset CPU count across the message threads
594    /// (ceil), so the TOTAL worker count (`message_threads * this`) stays ≈ the
595    /// cpuset CPU count -- matching schbench's total ≈ `num_cpus`.
596    /// `message_threads` floors at 1 to avoid a zero divisor; the divide is a
597    /// no-op at the default `message_threads = 1`.
598    pub(crate) fn resolve_worker_count(&self, allowed_count: usize) -> usize {
599        if self.worker_threads != 0 {
600            return self.worker_threads;
601        }
602        allowed_count.div_ceil(self.message_threads.max(1))
603    }
604}
605
606/// Per-thread state, the Rust counterpart of schbench's `struct thread_data`
607/// (`schbench.c:766`). Shared across threads by raw pointer for the lockless
608/// wait-list, so cross-thread access is restricted to the atomic fields
609/// (`next`, `futex`, `wake_time`); the histogram fields are owned solely by the
610/// worker thread the `ThreadData` belongs to.
611pub(crate) struct ThreadData {
612    /// Treiber-stack link for the message thread's wait-list
613    /// (schbench `thread_data->next`, `:776`). Null when not queued.
614    next: AtomicPtr<ThreadData>,
615    /// Wake handshake futex (`schbench.c` `thread_data->futex`, `:791`).
616    futex: super::handshake::Handshake,
617    /// Monotonic-ns timestamp the waker stamps just before posting, so the
618    /// woken thread can measure scheduler wakeup latency (schbench `wake_time`,
619    /// `:788`, stamped in `xlist_wake_all` `:984`).
620    wake_time: AtomicU64,
621    /// Wakeup-latency histogram (`schbench.c` `wakeup_stats`, `:794`).
622    /// Owner-thread-only; see the `Sync` impl SAFETY note.
623    wakeup_stats: UnsafeCell<PlatStats>,
624    /// Request (work-cycle) latency histogram (`schbench.c` `request_stats`,
625    /// `:795`). Owner-thread-only.
626    request_stats: UnsafeCell<PlatStats>,
627    /// Mean per-schedule run-queue wait (ns), read from `/proc/<tid>/schedstat`
628    /// at thread exit (schbench's `read_sched_delay`, `:1118`). Owner-only.
629    /// Feeds the WHOLE-RUN `SchbenchResult` (mean-of-means, matching real
630    /// schbench for the side-by-side validation); the per-phase delay comes from
631    /// [`Self::phase_snapshots`] instead.
632    sched_delay_ns: UnsafeCell<u64>,
633    /// Per-phase snapshots this thread accumulated via drain-on-change against
634    /// the shared `phase_epoch` (one [`PhaseSnapshot`] per phase the thread did
635    /// work in, plus a final in-flight phase at exit). Owner-thread-only,
636    /// drained by the main thread after join — same happens-before as the
637    /// histogram cells. Empty when run non-phasic (`phase_epoch == None`) until
638    /// the single end-of-run drain.
639    phase_snapshots: UnsafeCell<Vec<PhaseSnapshot>>,
640    /// RPS-injector mode: this worker's pending request queue. The RPS thread
641    /// pushes heap-allocated [`Request`]s (round-robin across workers); the
642    /// owning worker drains via `splice_reversed` (FIFO) and frees each. Unused
643    /// in the default message-handshake mode (`requests_per_sec == 0`).
644    requests: TreiberStack<Request>,
645    /// RPS-injector backpressure counter (`schbench.c` `thread_data->pending`,
646    /// `:779`): incremented by the RPS thread per enqueue, reset to 0 by the
647    /// owning worker at each splice. The RPS thread stops enqueuing to a worker
648    /// whose `pending` exceeds the batch cap (`:1284`).
649    pending: AtomicU64,
650    /// Pipe-mode (`-p`) transfer size in bytes (0 = not pipe mode); the length of
651    /// [`Self::pipe_page`] and the per-cycle memset. Plain `Copy`, set once at
652    /// construction, read-only thereafter.
653    pipe_bytes: usize,
654    /// Pipe-mode per-thread transfer page (`schbench.c` `thread_data->pipe_page`,
655    /// `:801`). The worker memsets it to `2` before blocking and the waker memsets
656    /// it to `1` when waking the worker; the bytes are never read back (the point
657    /// is the memory-transfer cost). `UnsafeCell` because the WAKER writes the
658    /// worker's page (cross-thread) -- see the `Sync` SAFETY note for the
659    /// wait-list/futex ordering that keeps it race-free. Empty unless pipe mode.
660    pipe_page: UnsafeCell<Box<[u8]>>,
661}
662
663/// One thread's latency + run-queue-delay accumulation over a single phase
664/// (the window between two `phase_epoch` transitions). Built on the owning
665/// thread at each drain-on-change boundary; the histograms are `take`n from the
666/// live cells (snapshot-and-reset) and the run-delay is a RAW
667/// `/proc/<tid>/schedstat` `(run_delay, pcount)` DELTA over the phase, so the
668/// host re-derives the per-phase mean as `Σrun_delay / Σpcount` (the
669/// sample-weighted pooled mean — a deliberate divergence from schbench's
670/// whole-run mean-of-means, correct for a heterogeneous per-phase thread set).
671/// Message threads leave `wakeup`/`request` empty and `loop_count` 0 (they
672/// carry only run-delay); workers fill all fields.
673struct PhaseSnapshot {
674    /// The `phase_epoch` value active while this snapshot's samples were
675    /// recorded (NOT the value at drain time). `0` = BASELINE, `u32::MAX` =
676    /// inter-step gap; both are emitted as-is and discarded host-side.
677    epoch: u32,
678    wakeup: PlatStats,
679    request: PlatStats,
680    /// `/proc/<tid>/schedstat` field 2 (run_delay ns) delta over this phase.
681    run_delay_ns: u64,
682    /// `/proc/<tid>/schedstat` field 3 (pcount = timeslices) delta over this
683    /// phase. The host guards `pcount == 0` → metric absent (never a div-by-zero
684    /// 0), matching [`mean_sched_delay`].
685    pcount: u64,
686    /// Completed work cycles this worker ran in this phase (0 for the message
687    /// thread).
688    loop_count: u64,
689}
690
691/// Per-phase, cross-thread aggregate for one `phase_epoch`: the wakeup + request
692/// histograms merged across every worker that ran in the phase, plus the
693/// run-delay raw pairs split by thread class (message vs worker). The per-phase
694/// wire carrier — rides inside [`crate::workload::PhaseSlice`] guest→host. All
695/// fields are integer, so the host re-pools across workers/cgroups by
696/// [`PlatStats::combine`] (histogram add) + integer sums; percentiles are
697/// re-derived from the merged histogram, NEVER averaged.
698///
699/// ESTIMATOR NOTE: the per-phase run-delay mean the host derives from
700/// `*_run_delay_ns / *_pcount` is SAMPLE-WEIGHTED (`Σrun_delay / Σpcount`), a
701/// DIFFERENT estimator from the whole-run [`SchbenchResult`]'s `sched_delay_*`,
702/// which keeps schbench's mean-of-per-thread-means (`collect_sched_delay`) for
703/// schbench parity. They measure different things by design — never cross-compare a
704/// per-phase value against a whole-run threshold (or vice-versa). `pcount == 0`
705/// for a class means that class was never scheduled in the phase → the host
706/// emits the metric as ABSENT, not `0`.
707#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
708pub(crate) struct SchbenchPhaseStats {
709    /// Wakeup-latency histogram merged across the phase's workers.
710    pub(crate) wakeup: PlatStats,
711    /// Request-latency histogram merged across the phase's workers.
712    pub(crate) request: PlatStats,
713    /// Per-phase achieved-RPS distribution: the control thread's per-second
714    /// samples attributed to this phase's epoch (the whole-run rps is unchanged;
715    /// this is purely additive). A 1s sample straddling an epoch boundary lands
716    /// wholly in the sample-time epoch (bounded fuzz, <=1 sample/transition, like
717    /// the msg-thread late-drain). Empty for a phase shorter than the ~1s control
718    /// cadence -- the host gates on `sample_count() > 0` so a sub-second phase
719    /// reads ABSENT, never rps=0. A rate; re-derived from the merged histogram.
720    pub(crate) rps: PlatStats,
721    /// Σ message-thread `run_delay` (ns) over the phase.
722    pub(crate) msg_run_delay_ns: u64,
723    /// Σ message-thread `pcount` over the phase (the mean's denominator).
724    pub(crate) msg_pcount: u64,
725    /// Σ worker `run_delay` (ns) over the phase.
726    pub(crate) worker_run_delay_ns: u64,
727    /// Σ worker `pcount` over the phase (the mean's denominator).
728    pub(crate) worker_pcount: u64,
729    /// Σ completed work cycles across the phase's workers.
730    pub(crate) loop_count: u64,
731}
732
733impl SchbenchPhaseStats {
734    /// Merge `other` into `self`: combine the wakeup + request histograms
735    /// (bucket-count addition, [`PlatStats::combine`]) and integer-add the
736    /// run-delay raw pairs + loop_count. Associative AND commutative (combine +
737    /// `saturating_add` both are), so pooling is order-independent — the SAME
738    /// operation whether pooling per-engine across message threads (in [`run`])
739    /// or per-cgroup across workers host-side
740    /// (`crate::assert::PhaseCgroupStats::merge`). Percentiles are NEVER
741    /// merged; the merged histogram is re-derived to percentiles by the reader.
742    pub(crate) fn merge(&mut self, other: &SchbenchPhaseStats) {
743        self.wakeup.combine(&other.wakeup);
744        self.request.combine(&other.request);
745        self.rps.combine(&other.rps);
746        self.msg_run_delay_ns = self.msg_run_delay_ns.saturating_add(other.msg_run_delay_ns);
747        self.msg_pcount = self.msg_pcount.saturating_add(other.msg_pcount);
748        self.worker_run_delay_ns = self
749            .worker_run_delay_ns
750            .saturating_add(other.worker_run_delay_ns);
751        self.worker_pcount = self.worker_pcount.saturating_add(other.worker_pcount);
752        self.loop_count = self.loop_count.saturating_add(other.loop_count);
753    }
754}
755
756impl Linked for ThreadData {
757    fn next_link(&self) -> &AtomicPtr<Self> {
758        &self.next
759    }
760}
761
762// SAFETY: `ThreadData` is shared across threads only via the lockless wait-list
763// and (in RPS mode) the per-worker request queue, whose operations touch
764// exclusively the atomic fields (`next`, `futex`, `wake_time`, `requests`,
765// `pending`) -- all internally synchronized. `requests` is single-producer /
766// single-consumer: the RPS thread is the only pusher to a given worker's queue
767// (round-robin assignment) and the owning worker is the only drainer, with the
768// push's Release CAS / drain's Acquire swap establishing happens-before;
769// `pending` is plain atomic counting. The `UnsafeCell` fields (`wakeup_stats`,
770// `request_stats`, `sched_delay_ns`, `phase_snapshots`) are written and read
771// ONLY by the single worker thread that owns this `ThreadData`, and only the
772// main thread reads/drains them after all workers have joined (a happens-before
773// via the join). The per-phase drain (`worker_loop` / `run_msg_thread`) `take`s
774// and pushes into the owning thread's own cells, never another's. The `pipe_page`
775// cell (pipe mode) is the one cross-thread `UnsafeCell`: the owning worker memsets
776// it (to 2) and the message thread memsets it (to 1). They never overlap, via two
777// happens-before edges:
778//   - SAME cycle (worker fill-2 vs waker fill-1): the worker's fill-2 is SEQUENCED
779//     BEFORE its `wait_list.add` (Release CAS), and the waker's `splice` (Acquire
780//     swap) observes that add, giving fill-2 -> add -> splice -> waker fill-1. The
781//     load-bearing fact is "fill-2 precedes add", which holds UNCONDITIONALLY --
782//     including the no-block shutdown path (stop set, worker skips the futex wait):
783//     the worker still added itself after fill-2, so a waker that then splices and
784//     fills it is ordered after the fill-2, and the worker's exit path never
785//     touches the page again. Blocking is NOT what makes it safe.
786//   - NEXT cycle (worker's fill-2 vs the waker's prior fill-1): the futex
787//     post/consume CAS (SeqCst, `Handshake`) orders waker fill-1 -> post -> the
788//     worker's consume -> its next fill-2; the worker cannot reach its next fill-2
789//     until it consumes the token the waker published after fill-1.
790// So the two threads never touch `pipe_page` concurrently. No two threads ever
791// touch a cell concurrently, so sharing `&ThreadData` across threads is sound.
792unsafe impl Sync for ThreadData {}
793
794impl ThreadData {
795    fn new(pipe_bytes: usize) -> Self {
796        Self {
797            next: AtomicPtr::new(ptr::null_mut()),
798            futex: super::handshake::Handshake::new(),
799            wake_time: AtomicU64::new(0),
800            wakeup_stats: UnsafeCell::new(PlatStats::default()),
801            request_stats: UnsafeCell::new(PlatStats::default()),
802            sched_delay_ns: UnsafeCell::new(0),
803            phase_snapshots: UnsafeCell::new(Vec::new()),
804            requests: TreiberStack::new(),
805            pending: AtomicU64::new(0),
806            pipe_bytes,
807            pipe_page: UnsafeCell::new(vec![0u8; pipe_bytes].into_boxed_slice()),
808        }
809    }
810
811    /// Pipe-mode transfer: fill this thread's `pipe_page` (`pipe_bytes` bytes) with
812    /// `val`. The page is never read back, so the fill would be DCE-eligible. The
813    /// trailing `black_box(page.as_ptr())` ESCAPES THE ADDRESS (not a loaded value):
814    /// the optimizer must then assume the opaque call may read through the pointer,
815    /// so every byte of the fill must be in place at that point -- the fill is
816    /// retained. This is stronger than `black_box(value)`, which only keeps a value
817    /// live and (per the optimization-resistance note in `worker::worker_main`) does
818    /// NOT by itself stop a backing STORE from being elided -- which is why the
819    /// cache-pressure workloads there use `write_volatile`. Here the faithful
820    /// schbench op is a memset (`schbench.c:980-981`/`:1003-1004`) and `slice::fill`
821    /// lowers to one; a `write_volatile` byte loop would not. The memory traffic IS
822    /// the workload.
823    ///
824    /// SAFETY: the caller guarantees no concurrent access to `pipe_page`. The only
825    /// callers are the owning worker (before it adds itself to the wait-list) and
826    /// the message thread (after it splices the worker off the wait-list); the
827    /// load-bearing edge is fill-2-before-add (holding even on the no-block
828    /// shutdown path), so the add/splice + futex handshake order these with no
829    /// overlap (see the `Sync` SAFETY note).
830    unsafe fn pipe_fill(&self, val: u8) {
831        // SAFETY: exclusive access per the caller contract; the box lives for the
832        // `ThreadData`'s lifetime.
833        let page = unsafe { &mut *self.pipe_page.get() };
834        page.fill(val);
835        std::hint::black_box(page.as_ptr());
836    }
837}
838
/// Sleep for `usec` microseconds: schbench's per-request think time
/// ("simulated networking", `schbench.c:1461`).
///
/// The sleep is part of the workload's defined behavior rather than a
/// synchronization wait; schbench's `usleep` maps to `clock_nanosleep`, and
/// this matches that syscall profile.
fn think_sleep(usec: u64) {
    use std::{thread, time::Duration};

    let think_time = Duration::from_micros(usec);
    thread::sleep(think_time);
}
846
/// schbench's `--split` op partition (`schbench.c:1391-1392`): of `operations`
/// total, `p`% run on the private matrix and the rest on the shared matrix.
///
/// Returns `(ops_shared, ops_private)`, which always sum to `operations`. The
/// private share is `operations * p / 100` rounded DOWN, so any remainder goes
/// to the shared side. Extracted so the integer-division split is pinned by a
/// unit test rather than re-implemented inline in `do_work`.
///
/// Robustness:
/// - `p` above 100 is clamped to 100 ("everything private"). Unclamped, the
///   private share would exceed `operations` and the subtraction would
///   underflow (a panic in debug builds, a near-`usize::MAX` shared count in
///   release builds).
/// - The product is computed in `u128`, so a very large `operations` cannot
///   overflow the multiplication.
fn ops_split(operations: usize, p: usize) -> (usize, usize) {
    let percent = p.min(100);
    // Lossless widening: `usize` is at most 64 bits, so the product fits u128.
    let scaled = (operations as u128) * (percent as u128) / 100;
    // `percent <= 100` guarantees `scaled <= operations`, so this narrowing
    // cannot truncate and the subtraction below cannot underflow.
    let ops_private = scaled as usize;
    let ops_shared = operations - ops_private;
    (ops_shared, ops_private)
}
856
857/// One work cycle's matrix multiplications under the per-CPU lock. Faithful to
858/// schbench's `do_work` (`schbench.c:1379-1413`): take the current CPU's mutex
859/// (unless `skip_locking`), run `operations` matrix multiplies, unlock. With
860/// `split_percent` set, the ops are partitioned (`schbench.c:1390-1404`):
861/// `ops_shared` run on the ONE process-global shared matrix (every worker
862/// contends), then `ops_private` on this worker's private buffer; `None` is the
863/// legacy all-private single matrix (`:1406-1408`). The guard drops at the end,
864/// holding the lock across all operations exactly as schbench does (`:1387`/
865/// `:1411`).
866#[allow(clippy::too_many_arguments)]
867fn do_work(
868    private_buf: &mut [u64],
869    private_matrix_size: usize,
870    shared_buf: &[core::sync::atomic::AtomicU64],
871    shared_matrix_size: usize,
872    split_percent: Option<usize>,
873    operations: usize,
874    locks: Option<&PerCpuLocks>,
875    work_units: &mut u64,
876) {
877    let _guard = locks.map(|l| l.lock_this_cpu());
878    match split_percent {
879        Some(p) => {
880            let (ops_shared, ops_private) = ops_split(operations, p);
881            // schbench.c:1395-1398: shared ops first, on the global matrix.
882            if shared_matrix_size > 0 && ops_shared > 0 {
883                for _ in 0..ops_shared {
884                    crate::workload::worker::matrix_multiply_shared(
885                        shared_buf,
886                        shared_matrix_size,
887                        work_units,
888                    );
889                }
890            }
891            // schbench.c:1401-1404: private ops second, on this worker's buffer.
892            if private_matrix_size > 0 && ops_private > 0 {
893                for _ in 0..ops_private {
894                    crate::workload::worker::matrix_multiply(
895                        private_buf,
896                        private_matrix_size,
897                        work_units,
898                    );
899                }
900            }
901        }
902        None => {
903            // schbench.c:1406-1408: legacy all-private single matrix.
904            for _ in 0..operations {
905                if private_matrix_size > 0 {
906                    crate::workload::worker::matrix_multiply(
907                        private_buf,
908                        private_matrix_size,
909                        work_units,
910                    );
911                }
912            }
913        }
914    }
915}
916
917/// Worker side of one wakeup cycle. Faithful to schbench's `msg_and_wait`
918/// (`schbench.c:997`, default branch): stamp our `wake_time`, push ourselves
919/// onto the message thread's wait-list, wake the message thread, then block
920/// until it wakes us back; record the wakeup (scheduler) latency. The `!stop`
921/// guard mirrors schbench's `if (!stopping)` (`schbench.c:1030`) so we do not
922/// block during shutdown.
923fn msg_and_wait(
924    td: &ThreadData,
925    msg_td: &ThreadData,
926    wait_list: &TreiberStack<ThreadData>,
927    stop: &AtomicBool,
928) {
929    // Our futex is BLOCKED here (consumed by the prior wait, or fresh).
930    // Pipe mode: write our transfer page to 2 (schbench memsets td->pipe_page
931    // before blocking, schbench.c:1003-1004). Done before we enqueue on the
932    // wait-list, so the waker's fill-to-1 (after it splices us off) never races --
933    // see the Sync SAFETY note.
934    if td.pipe_bytes > 0 {
935        // SAFETY: we own this page and have not yet enqueued, so the waker cannot
936        // touch it concurrently.
937        unsafe { td.pipe_fill(2) };
938    }
939    td.wake_time.store(monotonic_nanos(), Ordering::Release);
940    wait_list.add(td as *const ThreadData as *mut ThreadData);
941    msg_td.futex.post();
942    if !stop.load(Ordering::Acquire) {
943        td.futex.wait_forever();
944    }
945    let now = monotonic_nanos();
946    let wake = td.wake_time.load(Ordering::Acquire);
947    // schbench buckets in microseconds (gettimeofday resolution); the monotonic
948    // clock gives ns, so divide. `if delta > 0` matches schbench (`:1036`).
949    let delta_us = now.saturating_sub(wake) / 1000;
950    if delta_us > 0 {
951        // SAFETY: only this worker thread accesses its own wakeup_stats cell.
952        unsafe { (*td.wakeup_stats.get()).add_lat(delta_us.min(u32::MAX as u64) as u32) };
953    }
954}
955
956/// Worker side of one RPS-injector cycle. Faithful to schbench's `msg_and_wait`
957/// RPS branch (`schbench.c:1007-1037`): stamp our own `wake_time`, reset
958/// `pending`, then splice this worker's request queue (FIFO). If non-empty,
959/// return the chain immediately (fast path, no wait, no wakeup sample --
960/// `:1014-1017`). If empty, block on this worker's own futex until the RPS thread
961/// posts it, record the wakeup (scheduler) latency, then splice whatever it
962/// queued (null on a spurious wake or the shutdown wake-all). Unlike
963/// [`msg_and_wait`], the worker does NOT enqueue on the message thread's
964/// wait-list or post a message thread -- the RPS thread is the sole waker.
965///
966/// `wake_time` is stamped HERE before the splice (schbench `:1008`) AND by the
967/// RPS thread at enqueue (`:1294`). On a TRUE block the RPS thread's stamp wins
968/// (it overwrites ours before posting), so `now - wake_time` is the enqueue->run
969/// scheduler latency. But on a FAST wake -- a leftover RUNNING futex token from a
970/// request the worker already drained via splice (the word is consumed by splice,
971/// not by `wait`, so it stays RUNNING) -- the RPS thread does NOT re-stamp, so our
972/// own stamp makes the delta measure THIS block decision (near-zero), not the
973/// stale enqueue time of the already-served request. Omitting it inflates the
974/// fast-wake samples to the full inter-request gap (the wakeup-p50 bug: 543ms
975/// vs schbench's 7µs).
976fn rps_wait(td: &ThreadData, stop: &AtomicBool) -> *mut Request {
977    // Stamp our own wake_time before splicing (schbench `:1008`); see the doc for
978    // why this is what keeps fast-wake samples honest.
979    td.wake_time.store(monotonic_nanos(), Ordering::Release);
980    // Reset the backpressure counter before draining (schbench `td->pending = 0`,
981    // `:1012`): the RPS thread stops enqueuing while pending exceeds the batch
982    // cap, so clearing it re-opens this worker for the next batch.
983    td.pending.store(0, Ordering::Release);
984    let chain = td.requests.splice_reversed();
985    if !chain.is_null() {
986        return chain; // work already queued: serve it without blocking
987    }
988    if stop.load(Ordering::Acquire) {
989        return ptr::null_mut();
990    }
991    // Empty queue: block until the RPS thread posts our futex, then `now -
992    // wake_time` is the scheduler wakeup latency (schbench `fwait` NULL then
993    // add_lat, `:1032`/`:1034-1037`).
994    td.futex.wait_forever();
995    // Shutdown wake: if stop is set, the wake came from run_rps_thread's stop-path
996    // wake-all (`:1308-1312`), which posts every worker WITHOUT restamping
997    // wake_time. A worker blocked between bursts when stop fires would otherwise
998    // record `now - (its own stale block-time stamp)` -- the whole inter-burst
999    // gap -- as a phantom wakeup, one garbage tail sample per worker. That is not
1000    // a request-driven wakeup, so skip the sample (schbench's `if (!stopping)
1001    // fwait` guard, `:1030`, encodes the same intent: shutdown is not a latency
1002    // event). Return any leftover requests for the caller to free.
1003    if stop.load(Ordering::Acquire) {
1004        return td.requests.splice_reversed();
1005    }
1006    let now = monotonic_nanos();
1007    let wake = td.wake_time.load(Ordering::Acquire);
1008    let delta_us = now.saturating_sub(wake) / 1000;
1009    if delta_us > 0 {
1010        // SAFETY: only this worker thread accesses its own wakeup_stats cell.
1011        unsafe { (*td.wakeup_stats.get()).add_lat(delta_us.min(u32::MAX as u64) as u32) };
1012    }
1013    // Splice whatever the RPS thread enqueued before posting us (null if the post
1014    // was a spurious wake with nothing queued).
1015    td.requests.splice_reversed()
1016}
1017
1018/// Free a chain of [`Request`]s spliced off a worker's queue but not processed
1019/// (the shutdown path, and the post-join cleanup of any requests the RPS thread
1020/// enqueued after a worker's last splice). Each node was `Box::into_raw`'d by the
1021/// RPS thread; the draining/cleaning thread owns them exclusively after the
1022/// splice, so each is freed exactly once.
1023fn free_request_chain(mut req: *mut Request) {
1024    while !req.is_null() {
1025        // SAFETY: `req` is a node taken via `splice_reversed` (exclusive
1026        // ownership); freed exactly once.
1027        let next = unsafe { (*req).next.load(Ordering::Acquire) };
1028        drop(unsafe { Box::from_raw(req) });
1029        req = next;
1030    }
1031}
1032
1033/// Wake every worker on the wait-list, stamping each one's `wake_time` so it
1034/// can measure scheduler latency. Faithful to schbench's `xlist_wake_all`
1035/// (`schbench.c:969`): splice the whole list, read the clock ONCE, stamp every
1036/// worker with that single time, then post each. The single clock read is what
1037/// detects the scheduler preempting the waker mid-batch (`schbench.c:961-964`).
1038/// Pipe mode (`-p`) diverges, matching schbench (`schbench.c:980-984`): it memsets
1039/// each worker's transfer page to `1` and re-reads the clock PER worker (so the
1040/// memset cost is not charged to that worker's wakeup latency).
1041fn wake_all(wait_list: &TreiberStack<ThreadData>) {
1042    let mut cur = wait_list.splice();
1043    let now = monotonic_nanos();
1044    while !cur.is_null() {
1045        // SAFETY: `cur` is a worker `ThreadData` alive for the whole run; only
1046        // its atomic fields (next / wake_time / futex) are touched here.
1047        let td = unsafe { &*cur };
1048        let next = td.next.load(Ordering::Acquire);
1049        td.next.store(ptr::null_mut(), Ordering::Relaxed);
1050        if td.pipe_bytes > 0 {
1051            // Pipe mode: write this woken worker's transfer page to 1 (schbench
1052            // memsets list->pipe_page in the wake loop, schbench.c:980-981). We
1053            // spliced it off the wait-list and it is still blocked (our `post`
1054            // below wakes it), so no concurrent access -- see the Sync SAFETY note.
1055            // SAFETY: `td` is off the wait-list and blocked here.
1056            unsafe { td.pipe_fill(1) };
1057            // schbench re-reads the clock per worker in pipe mode (after the
1058            // memset, schbench.c:982) so the fill is not charged to the worker's
1059            // wakeup latency; non-pipe uses the single batched `now`.
1060            td.wake_time.store(monotonic_nanos(), Ordering::Release);
1061        } else {
1062            td.wake_time.store(now, Ordering::Release);
1063        }
1064        td.futex.post();
1065        cur = next;
1066    }
1067}
1068
1069/// The message thread's loop. Faithful to schbench's `run_msg_thread`
1070/// (`schbench.c:1166`): batch-wake all waiting workers, then block until a
1071/// worker posts us back. On stop, drain once more (to wake any worker that
1072/// queued during the final wake) and exit.
1073///
1074/// `phase_epoch` drives the per-phase drain exactly as in [`worker_loop`], but
1075/// the message thread records only per-phase RUN-DELAY (its `wakeup`/`request`
1076/// histograms stay empty, taken empty by [`drain_phase`]). It polls the epoch
1077/// after each wake cycle; a phase in which no worker ever posts it leaves that
1078/// phase's run-delay drained late at the next wake (bounded boundary fuzz,
1079/// matching the worker drain).
1080fn run_msg_thread(
1081    msg_td: &ThreadData,
1082    wait_list: &TreiberStack<ThreadData>,
1083    stop: &AtomicBool,
1084    phase_epoch: Option<&AtomicU32>,
1085) {
1086    let tid = gettid_self();
1087    let mut cur_epoch = phase_epoch.map_or(0, |e| e.load(Ordering::Relaxed));
1088    let mut phase_ss_start = read_schedstat_raw(tid);
1089    loop {
1090        // msg_td.futex is BLOCKED here (consumed by the prior wait, or fresh).
1091        wake_all(wait_list);
1092        if stop.load(Ordering::Acquire) {
1093            wake_all(wait_list);
1094            break;
1095        }
1096        msg_td.futex.wait_forever();
1097        if let Some(pe) = phase_epoch {
1098            let new_epoch = pe.load(Ordering::Relaxed);
1099            if new_epoch != cur_epoch {
1100                let ss_end = read_schedstat_raw(tid);
1101                // SAFETY: this is the thread that owns msg_td (the coordinator
1102                // thread of run_one_message_thread).
1103                unsafe { drain_phase(msg_td, cur_epoch, phase_ss_start, ss_end, 0) };
1104                cur_epoch = new_epoch;
1105                phase_ss_start = ss_end;
1106            }
1107        }
1108    }
1109    // Final drain: close the still-open phase (the sole snapshot when non-phasic).
1110    let ss_end = read_schedstat_raw(tid);
1111    // SAFETY: this is the thread that owns msg_td.
1112    unsafe { drain_phase(msg_td, cur_epoch, phase_ss_start, ss_end, 0) };
1113    // Record the message thread's whole-run mean run-queue wait (`schbench.c:1664`),
1114    // reusing the cumulative `ss_end` pair just read — one /proc read, so the mean
1115    // and the final-phase delta derive from one consistent snapshot.
1116    // SAFETY: owner-only access to msg_td's sched_delay_ns cell.
1117    unsafe { *msg_td.sched_delay_ns.get() = mean_sched_delay(ss_end) };
1118}
1119
1120/// The RPS-injector dispatcher loop, replacing [`run_msg_thread`] when
1121/// `requests_per_sec != 0`. Each second it enqueues `requests_per_sec` requests
1122/// round-robin across this group's `workers`, skipping a worker whose `pending`
1123/// exceeds the batch cap (backpressure, `schbench.c:1284-1290`); on stop, it
1124/// wakes every worker and exits. Runs on the message thread's calling thread, so
1125/// — like [`run_msg_thread`] — it records the dispatcher thread's per-phase +
1126/// whole-run run-queue wait into `msg_td` (the "message thread delay" in RPS
1127/// mode, `schbench.c:1664-1670`).
1128///
1129/// DIVERGENCE from schbench's `run_rps_thread` (`schbench.c:1258-1314`),
1130/// intentional: schbench FRONT-LOADS the second (bursts all N enqueues, then
1131/// `usleep`s to fill the rest of the second) and relies on its workers
1132/// interleaving with the burst to keep each queue under the cap. That interleave
1133/// is scheduling-dependent; ktstr's burst instead front-ran the workers (the
1134/// first splice grabbed a large batch the worker drained to completion before
1135/// re-splicing), so `pending` stayed pegged and the injector dropped the
1136/// remainder every second -- a ~BATCH/worker supply cap that under-delivered the
1137/// requested rate (e.g. ~257/s at a 400/s target). ktstr instead PACES the
1138/// enqueues evenly across the second (one every `1s/N`), which forces the
1139/// injector to interleave with the workers deterministically: `pending` drains
1140/// between enqueues and the full rate is delivered up to worker capacity. Above
1141/// capacity the pacing collapses to a flat-out loop and the `BATCH` backpressure
1142/// bounds the queue, matching schbench's bounded-backlog overload behavior. The
1143/// pacing sleep and the bounded backpressure usleep are workload-defined `-R`
1144/// cadence (like the per-request think-sleep), not synchronization waits. Uses
1145/// `CLOCK_MONOTONIC` (ruling), not schbench's `gettimeofday`.
1146///
1147/// Verified by `ktstr-schbench-validate` (release, standalone), not a nextest
1148/// unit test: the under-delivery signal only appears when worker capacity exceeds
1149/// the front-load cap (~2*BATCH/s), which requires a release build (debug matrix
1150/// work caps capacity well below that) on dedicated CPU (nextest's parallel
1151/// execution starves the workers, collapsing both the paced and a hypothetical
1152/// reverted injector to the same capacity-limited rate). The fixed `-R` axis in
1153/// the schbench validation comparison is the regression evidence.
1154fn run_rps_thread(
1155    workers: &[ThreadData],
1156    msg_td: &ThreadData,
1157    stop: &AtomicBool,
1158    phase_epoch: Option<&AtomicU32>,
1159    live_rate: &AtomicUsize,
1160) {
1161    let tid = gettid_self();
1162    let mut cur_epoch = phase_epoch.map_or(0, |e| e.load(Ordering::Relaxed));
1163    let mut phase_ss_start = read_schedstat_raw(tid);
1164    // schbench's per-worker queue-depth cap before the injector backs off
1165    // (`schbench.c:1267`).
1166    const BATCH: u64 = 128;
1167    const ONE_SEC_NS: u64 = 1_000_000_000;
1168    let worker_count = workers.len().max(1);
1169    let mut cur_tid: usize = 0;
1170    while !stop.load(Ordering::Acquire) {
1171        // Re-read the live per-message-thread rate each second. `live_rate` is
1172        // already the per-thread value (schbench's post-`/= message_threads`
1173        // global `requests_per_sec`, `:1899`), injected directly with no further
1174        // division (`schbench.c:1290`). Auto-RPS retargets it from the control
1175        // thread; a fixed `-R` leaves it constant.
1176        let requests_per_sec = live_rate.load(Ordering::Relaxed);
1177        let start = monotonic_nanos();
1178        // PACE the enqueues evenly across the second rather than front-loading
1179        // them. schbench bursts all N then fills the rest of the second
1180        // (`schbench.c:1300-1306`), relying on its workers interleaving with the
1181        // burst to keep each worker's queue under the `BATCH` cap. ktstr's burst
1182        // front-ran the workers: the first splice grabbed a large batch the worker
1183        // processed to completion before re-splicing, so `pending` stayed pegged
1184        // and the injector dropped the remainder every second (a steady-state
1185        // ~BATCH/worker supply cap -> only ~257/s at a 400/s target). Pacing one
1186        // enqueue every `1s/N` forces the injector to interleave with the workers,
1187        // so `pending` drains between enqueues and the full rate is delivered up
1188        // to worker capacity. Above capacity `due` falls behind real time, the
1189        // pacing sleep collapses to zero, and the `BATCH` backpressure bounds the
1190        // queue -- matching schbench's overload behavior (a bounded backlog).
1191        let interval_ns = if requests_per_sec > 0 {
1192            ONE_SEC_NS / requests_per_sec as u64
1193        } else {
1194            ONE_SEC_NS
1195        };
1196        let mut due = start;
1197        for _ in 0..requests_per_sec {
1198            if stop.load(Ordering::Acquire) {
1199                break;
1200            }
1201            // Wait until this enqueue's scheduled time. A no-op once `due` falls
1202            // behind real time (rate above what the workers can drain), so the
1203            // loop then runs flat-out and the backpressure below bounds the queue.
1204            // `due` is re-based to `start` each second and accrues at most
1205            // N*(ONE_SEC_NS/N) ~= 1s within a pass, so saturating_add never wraps
1206            // (the saturating form documents that the per-pass re-base is the
1207            // load-bearing invariant).
1208            due = due.saturating_add(interval_ns);
1209            // Pace to this enqueue's scheduled time, but cap each sleep at
1210            // STOP_POLL_QUANTUM_NS and re-check `stop`, so a shutdown during a long
1211            // inter-arrival gap (a degenerate low rate, where the interval can
1212            // exceed the cleanup/watchdog budget) is observed within the quantum
1213            // instead of after a full interval — mirroring the taobench open-loop
1214            // pacing. A stop observed here breaks the enqueue loop BEFORE issuing
1215            // this slot's request.
1216            let mut stop_during_pace = false;
1217            loop {
1218                if stop.load(Ordering::Acquire) {
1219                    stop_during_pace = true;
1220                    break;
1221                }
1222                let now = monotonic_nanos();
1223                if now >= due {
1224                    break;
1225                }
1226                let nap = (due - now).min(STOP_POLL_QUANTUM_NS);
1227                std::thread::sleep(std::time::Duration::from_nanos(nap));
1228            }
1229            if stop_during_pace {
1230                break;
1231            }
1232            let worker = &workers[cur_tid % worker_count];
1233            cur_tid += 1;
1234            // Backpressure: don't queue more to a worker already `BATCH` deep
1235            // (`schbench.c:1284-1290`). The bounded usleep is schbench's injector
1236            // throttle (workload behavior), not a sync poll. The round-robin
1237            // cursor already advanced, so the skipped slot moves to the next
1238            // worker, matching schbench (the request for this slot is dropped).
1239            if worker.pending.load(Ordering::Acquire) > BATCH {
1240                std::thread::sleep(std::time::Duration::from_micros(100));
1241                continue;
1242            }
1243            worker.pending.fetch_add(1, Ordering::AcqRel);
1244            // schbench mallocs one request per enqueue; the worker frees it after
1245            // processing (`schbench.c:1292`/`:1476`). Box matches that profile.
1246            let req = Box::into_raw(Box::new(Request::new()));
1247            worker.requests.add(req);
1248            // Stamp the target's wake_time so it measures wakeup latency as
1249            // now - enqueue (`schbench.c:1294`), then post it.
1250            worker.wake_time.store(monotonic_nanos(), Ordering::Release);
1251            worker.futex.post();
1252        }
1253        // The pacing loop already spans ~1 second; top up only if it finished
1254        // early (rate 0, or `stop`), preserving schbench's per-second cadence. Cap
1255        // each sleep at STOP_POLL_QUANTUM_NS and re-check `stop` so a shutdown
1256        // during the top-up is observed within the quantum, not after a full
1257        // remaining second (the same teardown bound as the pacing sleep above).
1258        loop {
1259            if stop.load(Ordering::Acquire) {
1260                break;
1261            }
1262            let elapsed = monotonic_nanos().saturating_sub(start);
1263            if elapsed >= ONE_SEC_NS {
1264                break;
1265            }
1266            let nap = (ONE_SEC_NS - elapsed).min(STOP_POLL_QUANTUM_NS);
1267            std::thread::sleep(std::time::Duration::from_nanos(nap));
1268        }
1269        // Dispatcher per-phase run-delay drain (mirrors run_msg_thread): on epoch
1270        // change, finalize this thread's run-delay for the ended phase. Empty
1271        // histograms (the dispatcher records only run-delay).
1272        if let Some(pe) = phase_epoch {
1273            let new_epoch = pe.load(Ordering::Relaxed);
1274            if new_epoch != cur_epoch {
1275                let ss_end = read_schedstat_raw(tid);
1276                // SAFETY: this thread owns msg_td (the dispatcher's ThreadData).
1277                unsafe { drain_phase(msg_td, cur_epoch, phase_ss_start, ss_end, 0) };
1278                cur_epoch = new_epoch;
1279                phase_ss_start = ss_end;
1280            }
1281        }
1282        if stop.load(Ordering::Acquire) {
1283            // Wake every worker so a blocked one observes stop and exits
1284            // (`schbench.c:1308-1312`).
1285            for w in workers {
1286                w.futex.post();
1287            }
1288            break;
1289        }
1290    }
1291    // Final drain + whole-run mean run-delay for the dispatcher thread, matching
1292    // run_msg_thread's exit (`schbench.c:1664`).
1293    let ss_end = read_schedstat_raw(tid);
1294    // SAFETY: this thread owns msg_td.
1295    unsafe { drain_phase(msg_td, cur_epoch, phase_ss_start, ss_end, 0) };
1296    // SAFETY: owner-only access to msg_td's sched_delay_ns cell.
1297    unsafe { *msg_td.sched_delay_ns.get() = mean_sched_delay(ss_end) };
1298}
1299
1300/// The calling thread's kernel tid (`gettid`), for reading its own schedstat.
1301fn gettid_self() -> libc::pid_t {
1302    // SAFETY: gettid takes no arguments and only returns the caller's tid.
1303    unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t }
1304}
1305
1306/// Raw `(run_delay_ns, pcount)` pair from `/proc/<tid>/schedstat` (fields 2, 3),
1307/// UNDIVIDED — schbench's `read_sched_delay` source (`schbench.c:1118`) without
1308/// the division. Both consumers want the raw pair, not a pre-divided mean: the
1309/// per-phase path re-pools `Σrun_delay / Σpcount` host-side, and the whole-run
1310/// per-thread mean is [`mean_sched_delay`] of the thread's final cumulative
1311/// pair. An absent file (the thread exited) yields `(0, 0)`, like schbench's
1312/// fopen-failure path.
1313///
1314/// `CONFIG_SCHED_INFO` (selected by `CONFIG_SCHEDSTATS`, and also by
1315/// `CONFIG_TASK_DELAY_ACCT` -- ktstr.kconfig enables both) populates these
1316/// fields even under sched_ext and regardless of the `kernel.sched_schedstats`
1317/// sysctl (that sysctl gates the separate per-rq/domain counters, not
1318/// `sched_info`).
1319fn read_schedstat_raw(tid: libc::pid_t) -> (u64, u64) {
1320    match std::fs::read_to_string(format!("/proc/{tid}/schedstat")) {
1321        Ok(s) => parse_schedstat_raw(&s),
1322        // The thread may have exited; schbench's fopen-failure path also -> 0.
1323        Err(_) => (0, 0),
1324    }
1325}
1326
/// Extract the raw `(run_delay_ns, pcount)` pair from one
/// `/proc/<tid>/schedstat` line: whitespace-separated field 2 is the run-delay
/// in ns and field 3 is pcount (timeslices). Field 1 (`sum_exec_runtime`) and
/// anything after field 3 are ignored.
///
/// # Panics
/// The kernel's `proc_pid_schedstat` always emits three integer fields, so a
/// *present* line with a missing or non-integer field 2 or 3 is a kernel/parse
/// bug: this panics rather than silently reporting 0 (the handshake's fail-loud
/// stance + the no-silent-wrong-answer rule). An absent file is the caller's
/// concern, handled there as `(0, 0)`.
fn parse_schedstat_raw(s: &str) -> (u64, u64) {
    // Field 1 (sum_exec_runtime, on-CPU ns) is unused; step over it.
    let mut rest = s.split_whitespace().skip(1);
    // Pull the next field as a u64, panicking with `what` if it is absent or
    // not an integer.
    let mut next_u64 = |what: &str| -> u64 {
        rest.next()
            .and_then(|tok| tok.parse::<u64>().ok())
            .expect(what)
    };
    let run_delay = next_u64("schedstat field 2 (run_delay) must be a present integer");
    let pcount = next_u64("schedstat field 3 (pcount) must be a present integer");
    (run_delay, pcount)
}
1346
/// Mean run-queue wait per schedule (ns) for a raw `(run_delay_ns, pcount)`
/// pair, i.e. `run_delay / pcount` -- schbench's `read_sched_delay` arithmetic
/// (`schbench.c:1146`). A zero `pcount` (a never-scheduled thread, or the
/// `!sched_info_on()` "0 0 0" line) yields 0 here, where schbench would divide
/// by zero. This is the WHOLE-RUN per-thread mean (the mean-of-means
/// component); the per-phase path keeps the raw pair and re-pools `Σrd/Σpc`
/// host-side instead.
fn mean_sched_delay((run_delay, pcount): (u64, u64)) -> u64 {
    // checked_div is None exactly when pcount == 0.
    run_delay.checked_div(pcount).unwrap_or(0)
}
1356
1357/// Finalize the just-ended phase on the OWNING thread: `take` the live
1358/// histograms (snapshot-and-reset, so the next phase starts clean) and push a
1359/// [`PhaseSnapshot`] tagged `epoch`, carrying the schedstat run-delay/pcount
1360/// delta (`ss_end − ss_start`) and this thread's `loop_count` for the phase.
1361/// Mirrors the worker/mod.rs:3858-3894 backdrop drain-on-change. For the
1362/// message thread the histograms are unwritten (taken empty) and `loop_count`
1363/// is 0; an empty histogram folds harmlessly host-side ([`PlatStats::combine`]).
1364///
1365/// # Safety
1366/// Must be called by the thread that owns `td` — it is the sole writer of
1367/// `td`'s `wakeup_stats` / `request_stats` / `phase_snapshots` cells (the
1368/// [`ThreadData`] `Sync` contract).
1369unsafe fn drain_phase(
1370    td: &ThreadData,
1371    epoch: u32,
1372    ss_start: (u64, u64),
1373    ss_end: (u64, u64),
1374    loop_count: u64,
1375) {
1376    // SAFETY: owner-only cell access, per this function's contract.
1377    unsafe {
1378        let wakeup = (*td.wakeup_stats.get()).take();
1379        let request = (*td.request_stats.get()).take();
1380        (*td.phase_snapshots.get()).push(PhaseSnapshot {
1381            epoch,
1382            wakeup,
1383            request,
1384            run_delay_ns: ss_end.0.saturating_sub(ss_start.0),
1385            pcount: ss_end.1.saturating_sub(ss_start.1),
1386            loop_count,
1387        });
1388    }
1389}
1390
/// The per-message-thread shared context every worker borrows — bundled so
/// [`worker_loop`]'s signature stays within clippy's argument budget. Every
/// field is `Copy` (shared references, an `Option` of one, and one `usize`),
/// which is what lets [`worker_loop`] destructure `*ctx` by value; every worker
/// spawned by one [`run_one_message_thread`] reads the same instance.
struct WorkerCtx<'a> {
    /// The message thread's own [`ThreadData`]: handed to `msg_and_wait` in the
    /// default mode, and its futex is posted by each worker on exit.
    msg_td: &'a ThreadData,
    /// The wait-list workers pass to `msg_and_wait`; the same list is given to
    /// `run_msg_thread` by [`run_one_message_thread`].
    wait_list: &'a TreiberStack<ThreadData>,
    /// Per-CPU locks forwarded to `do_work`.
    /// NOTE(review): the meaning of `None` is decided by the caller of
    /// [`run_one_message_thread`] — confirm there.
    locks: Option<&'a PerCpuLocks>,
    /// Run configuration (think-sleep, operations, split percent, pipe/RPS mode
    /// selection are all read from here by the worker).
    config: &'a SchbenchConfig,
    /// Shutdown flag; workers poll it with `Acquire` loads.
    stop: &'a AtomicBool,
    /// Completed-work-cycle counter; each worker adds 1 per finished cycle.
    progress: &'a AtomicU64,
    /// Shared per-phase generation, polled once per worker cycle; `None` when
    /// the run is non-phasic.
    phase_epoch: Option<&'a AtomicU32>,
    /// The ONE process-global shared matrix (`--split` mode), shared across
    /// every worker of every message thread; empty slice when not splitting.
    shared: &'a [core::sync::atomic::AtomicU64],
    /// Size parameter forwarded to `do_work` together with `shared`.
    /// NOTE(review): presumably the side length of the shared matrix — confirm
    /// against `do_work`.
    shared_matrix_size: usize,
}
1408
1409/// One worker thread's loop. Faithful to schbench's `worker_thread`
1410/// (`schbench.c:1419`, default branch): block until woken (recording wakeup
1411/// latency), then think-sleep + matrix work under the per-CPU lock (recording
1412/// request latency), until stop. `ctx.progress` counts completed work cycles
1413/// across all workers (the live cycle counter / achieved request rate).
1414///
1415/// `ctx.phase_epoch` (when `Some`) is the shared per-phase generation the
1416/// scenario engine bumps at each step boundary. The worker polls it once per
1417/// cycle and, on change, drains the just-ended phase ([`drain_phase`]) —
1418/// `take`ing its own histograms + a schedstat raw delta — so per-phase
1419/// percentiles are isolated across e.g. an scx→detached-EEVDF transition. A
1420/// final drain at exit closes the last phase (and is the SOLE snapshot when
1421/// non-phasic, `cur_epoch` 0).
1422fn worker_loop(td: &ThreadData, ctx: &WorkerCtx) {
1423    // Destructure the all-`Copy` context into the same bindings the body used
1424    // before the bundle, so the loop logic below reads unchanged.
1425    let WorkerCtx {
1426        msg_td,
1427        wait_list,
1428        locks,
1429        config,
1430        stop,
1431        progress,
1432        phase_epoch,
1433        shared,
1434        shared_matrix_size,
1435    } = *ctx;
1436    let tid = gettid_self();
1437    // Per-worker PRIVATE matrix (schbench.c:1568-1574: private_matrix_size when
1438    // splitting, else the full matrix_size). The shared matrix (split mode) is
1439    // the process-global `shared` allocated once in run().
1440    let private_matrix_size = config.private_matrix_size();
1441    let mut private_buf = if private_matrix_size > 0 {
1442        vec![0u64; 3 * private_matrix_size * private_matrix_size]
1443    } else {
1444        Vec::new()
1445    };
1446    let mut work_units = 0u64;
1447    // Per-phase drain-on-change state. `cur_epoch` is the epoch the current
1448    // phase's samples belong to; `phase_ss_start` baselines the schedstat raw
1449    // pair so each phase reports its own delta.
1450    let mut cur_epoch = phase_epoch.map_or(0, |e| e.load(Ordering::Relaxed));
1451    let mut phase_ss_start = read_schedstat_raw(tid);
1452    let mut phase_loop_count = 0u64;
1453    // RPS mode iff the per-thread rate is non-zero (the `-R` total covers at
1454    // least one request/sec per message thread). Below that it rounds to 0 and
1455    // the worker runs the default message-handshake mode -- matching schbench's
1456    // startup-divide-before-gate ([`SchbenchConfig::rps_per_message_thread`]).
1457    // Pipe mode (`-p`): the memory-transfer workload. It runs the message-handshake
1458    // path (where the pipe-page memsets live), NOT the RPS injector -- a ktstr
1459    // divergence; schbench composes -p+-R half-broken (see the `pipe_transfer_bytes`
1460    // field doc).
1461    let pipe_mode = config.pipe_transfer_bytes > 0;
1462    let rps_mode = !pipe_mode && config.rps_per_message_thread() != 0;
1463    while !stop.load(Ordering::Acquire) {
1464        // Acquire work. Default mode: the message-thread handshake (records
1465        // wakeup latency; no request chain -> a single work cycle). RPS mode:
1466        // splice this worker's request queue, blocking on its own futex until
1467        // the RPS thread posts; returns the spliced chain (FIFO) to drain.
1468        let mut req: *mut Request = if rps_mode {
1469            let chain = rps_wait(td, stop);
1470            if stop.load(Ordering::Acquire) {
1471                // Free any spliced-but-unprocessed requests, then exit.
1472                free_request_chain(chain);
1473                break;
1474            }
1475            if chain.is_null() {
1476                continue; // spurious wake with an empty queue
1477            }
1478            chain
1479        } else {
1480            msg_and_wait(td, msg_td, wait_list, stop);
1481            if stop.load(Ordering::Acquire) {
1482                break;
1483            }
1484            ptr::null_mut()
1485        };
1486        // Process the work: one cycle in default mode (`req` null), or one cycle
1487        // per queued request in RPS mode (freeing each). `work_start` is stamped
1488        // before the think-sleep so the request latency covers think-time +
1489        // matrix work (`schbench.c:1464-1481`).
1490        loop {
1491            let work_start = monotonic_nanos();
1492            // Pipe mode: the per-cycle transfer is the wake/block memsets
1493            // (msg_and_wait fills our page to 2; wake_all fills it to 1). The work
1494            // loop only counts cycles for the MB/s throughput; schbench likewise
1495            // skips do_work + the think-sleep in pipe mode (schbench.c:1448).
1496            if !pipe_mode {
1497                if config.sleep_usec > 0 {
1498                    think_sleep(config.sleep_usec);
1499                }
1500                do_work(
1501                    &mut private_buf,
1502                    private_matrix_size,
1503                    shared,
1504                    shared_matrix_size,
1505                    config.split_percent,
1506                    config.operations,
1507                    locks,
1508                    &mut work_units,
1509                );
1510            }
1511            let now = monotonic_nanos();
1512            let delta_us = now.saturating_sub(work_start) / 1000;
1513            if delta_us > 0 {
1514                // SAFETY: only this worker thread accesses its own request_stats cell.
1515                unsafe { (*td.request_stats.get()).add_lat(delta_us.min(u32::MAX as u64) as u32) };
1516            }
1517            progress.fetch_add(1, Ordering::Relaxed);
1518            phase_loop_count += 1;
1519            if req.is_null() {
1520                break; // default mode: a single work cycle
1521            }
1522            // RPS mode: advance to the next queued request and free this one.
1523            // SAFETY: `req` was `Box::into_raw`'d by the RPS thread and handed to
1524            // this worker exclusively via the queue splice; we own it and free it
1525            // exactly once here.
1526            let next = unsafe { (*req).next.load(Ordering::Acquire) };
1527            drop(unsafe { Box::from_raw(req) });
1528            req = next;
1529            if req.is_null() {
1530                break;
1531            }
1532        }
1533        // Drain-on-change: when the parent advances `phase_epoch`, finalize the
1534        // phase just ended (tagged with the OLD epoch the samples were recorded
1535        // under) and re-baseline. A worker blocked across a whole phase simply
1536        // contributes an empty histogram for it (no cycle ran), which folds
1537        // harmlessly.
1538        if let Some(pe) = phase_epoch {
1539            let new_epoch = pe.load(Ordering::Relaxed);
1540            if new_epoch != cur_epoch {
1541                let ss_end = read_schedstat_raw(tid);
1542                // SAFETY: this is the thread that owns `td`.
1543                unsafe { drain_phase(td, cur_epoch, phase_ss_start, ss_end, phase_loop_count) };
1544                cur_epoch = new_epoch;
1545                phase_ss_start = ss_end;
1546                phase_loop_count = 0;
1547            }
1548        }
1549    }
1550    // Shutdown wake: the loop exits on `stop` without a final `msg_and_wait`, so
1551    // the message thread may still be parked in `wait_forever` (its only waker is
1552    // a worker post). schbench's MAIN thread handles this -- it fposts each
1553    // message thread after `stopping = 1` (`schbench.c:1832`, `:1933`) -- but our
1554    // `run()` joins passively and `msg_td` is private to `run_one_message_thread`,
1555    // so the exiting worker wakes its own message thread instead. Without this, a
1556    // worker that exits mid-`do_work` leaves the message thread parked forever and
1557    // `run()` deadlocks. Idempotent across workers: the first post the message
1558    // thread observes makes it re-check `stop` and break; later posts are no-ops.
1559    // In RPS mode this is itself a harmless no-op: the dispatcher runs
1560    // `run_rps_thread` (which never parks on `msg_td`) and observes `stop` itself.
1561    msg_td.futex.post();
1562    // Final drain: close the still-open phase. When non-phasic this is the only
1563    // snapshot (`cur_epoch` 0), so run() builds the whole-run result uniformly
1564    // from snapshots.
1565    let ss_end = read_schedstat_raw(tid);
1566    // SAFETY: this is the thread that owns `td`.
1567    unsafe { drain_phase(td, cur_epoch, phase_ss_start, ss_end, phase_loop_count) };
1568    // Record this worker's whole-run mean run-queue wait at exit — the
1569    // mean-of-means component for the whole-run SchbenchResult (schbench reads
1570    // each thread's schedstat for the final aggregate, `schbench.c:1664-1670`).
1571    // Reuses the cumulative `ss_end` pair just read (one /proc read; mean and
1572    // final-phase delta share one consistent snapshot).
1573    // SAFETY: owner-only access to this thread's sched_delay_ns cell.
1574    unsafe { *td.sched_delay_ns.get() = mean_sched_delay(ss_end) };
1575}
1576
1577/// Resolve the worker-thread default and the per-CPU lock-array size from the
1578/// calling thread's CPU affinity (the allocated cpuset, per ruling). Returns
1579/// `(allowed_cpu_count, lock_array_size)` where the array size is the highest
1580/// allowed CPU id + 1, so `sched_getcpu` indexes it without clamping even on a
1581/// sparse cpuset.
1582fn resolve_cpu_topology() -> (usize, usize) {
1583    // SAFETY: a zeroed cpu_set_t filled by sched_getaffinity for the calling
1584    // thread (pid 0); CPU_ISSET only reads it.
1585    unsafe {
1586        let mut set: libc::cpu_set_t = core::mem::zeroed();
1587        let rc = libc::sched_getaffinity(0, core::mem::size_of::<libc::cpu_set_t>(), &mut set);
1588        if rc != 0 {
1589            return (1, 1);
1590        }
1591        let mut count = 0usize;
1592        let mut max_id = 0usize;
1593        for cpu in 0..libc::CPU_SETSIZE as usize {
1594            if libc::CPU_ISSET(cpu, &set) {
1595                count += 1;
1596                max_id = cpu;
1597            }
1598        }
1599        (count.max(1), (max_id + 1).max(1))
1600    }
1601}
1602
/// Combined WHOLE-RUN results of a schbench run: the merged wakeup + request
/// latency percentiles and the achieved request rate (completed work
/// cycles/second). The histograms are the union of every phase; `sched_delay_*`
/// keep schbench's mean-of-means (`collect_sched_delay`) so the side-by-side
/// validation matches real schbench's reported number.
pub(crate) struct SchbenchResult {
    /// Wakeup-latency percentiles, merged over the whole run.
    pub(crate) wakeup: Percentiles,
    /// Request-latency percentiles, merged over the whole run.
    pub(crate) request: Percentiles,
    /// Per-second achieved-RPS distribution (schbench's `rps_stats`), sampled
    /// once per second by the control thread. For auto-RPS, samples are gated
    /// until the target is hit (`schbench.c:1781`); for fixed/default mode every
    /// second is sampled.
    pub(crate) rps: Percentiles,
    /// Loop count for the run.
    /// NOTE(review): presumably the total number of completed work cycles
    /// across all workers — confirm where `run()` fills it in.
    pub(crate) loop_count: u64,
    /// Resolved total worker-thread count (`message_threads * worker_threads`).
    /// Divisor for schbench's PER-WORKER pipe-mode rate (`avg worker transfer`):
    /// schbench reports `loop_count / Σ worker runtimes` (`schbench.c:1697`,
    /// `:1942-1943`/`:1979`), and `Σ worker runtimes ≈ nr_workers * elapsed`, so
    /// the per-worker rate is `achieved_rps / nr_workers`.
    pub(crate) nr_workers: usize,
    /// Achieved request rate: completed work cycles per second.
    pub(crate) achieved_rps: f64,
    /// Auto-RPS final TOTAL target rate at run exit: the live per-message-thread
    /// rate * message_threads (schbench's `requests_per_sec * message_threads`,
    /// `schbench.c:1995`). For fixed `-R` and default mode this equals the seeded
    /// total; only auto-RPS makes it diverge as the control loop retargets.
    pub(crate) final_rps_goal: usize,
    /// Mean message-thread run-queue wait (ns), averaged across message threads
    /// (schbench's `message_thread_delay`, `schbench.c:1673`).
    pub(crate) sched_delay_msg_ns: u64,
    /// Mean worker-thread run-queue wait (ns), averaged across all workers
    /// (schbench's `worker_thread_delay`, `schbench.c:1674`).
    pub(crate) sched_delay_worker_ns: u64,
}
1636
/// The full result of [`run`]: the whole-run [`SchbenchResult`] plus the
/// per-phase aggregates keyed by `phase_epoch` (each the cross-thread merge for
/// one scenario step's HOLD window). When non-phasic (`phase_epoch == None`)
/// `phases` holds the single `(0, ..)` baseline entry the host discards; tests
/// read `whole_run`. The worker dispatch turns each `(epoch, SchbenchPhaseStats)`
/// into a `PhaseSlice`.
pub(crate) struct SchbenchOutcome {
    /// Aggregate over the entire run (the union of every phase).
    pub(crate) whole_run: SchbenchResult,
    /// `(epoch, stats)` pairs, one per phase epoch observed during the run.
    pub(crate) phases: Vec<(u32, SchbenchPhaseStats)>,
}
1647
/// One message thread's pooled results, returned from [`run_one_message_thread`]
/// to [`run`]: the whole-run histograms (Σ over its workers' phase snapshots),
/// the run-delay components for the whole-run mean-of-means, and the per-epoch
/// aggregate keyed by `phase_epoch`.
struct MessageThreadResult {
    /// Wakeup histogram combined over every phase snapshot of every worker.
    whole_wakeup: PlatStats,
    /// Request histogram combined over every phase snapshot of every worker.
    whole_request: PlatStats,
    /// The dispatcher (message/RPS) thread's whole-run mean run-queue wait (ns),
    /// as it recorded into its own `sched_delay_ns` cell at exit.
    msg_sched_delay_ns: u64,
    /// Saturating SUM (not mean) of each worker's whole-run mean run-queue wait
    /// (ns); the caller divides to get the mean-of-means.
    workers_sched_delay_sum: u64,
    /// Per-epoch aggregate: worker histograms, worker run-delay/pcount and loop
    /// counts, plus the dispatcher's run-delay/pcount, keyed by phase epoch.
    phases: std::collections::BTreeMap<u32, SchbenchPhaseStats>,
}
1659
1660/// Run one message thread plus its workers, returning the whole-run histograms
1661/// AND the per-epoch aggregate. The message thread runs on the calling thread;
1662/// workers are scoped so they are joined before their owner-only phase snapshots
1663/// are drained.
1664#[allow(clippy::too_many_arguments)]
1665fn run_one_message_thread(
1666    worker_threads: usize,
1667    locks: Option<&PerCpuLocks>,
1668    config: &SchbenchConfig,
1669    stop: &AtomicBool,
1670    progress: &AtomicU64,
1671    phase_epoch: Option<&AtomicU32>,
1672    live_rate: &AtomicUsize,
1673    shared: &[core::sync::atomic::AtomicU64],
1674    shared_matrix_size: usize,
1675) -> MessageThreadResult {
1676    // schbench clamps -p to PIPE_TRANSFER_BUFFER (schbench.c:291-294). Workers get
1677    // the per-thread transfer page; the message thread (waker) needs none.
1678    let pipe_bytes = config.pipe_transfer_bytes.min(PIPE_TRANSFER_BUFFER);
1679    let workers: Vec<ThreadData> = (0..worker_threads)
1680        .map(|_| ThreadData::new(pipe_bytes))
1681        .collect();
1682    let msg_td = ThreadData::new(0);
1683    let wait_list = TreiberStack::new();
1684    let ctx = WorkerCtx {
1685        msg_td: &msg_td,
1686        wait_list: &wait_list,
1687        locks,
1688        config,
1689        stop,
1690        progress,
1691        phase_epoch,
1692        shared,
1693        shared_matrix_size,
1694    };
1695
1696    std::thread::scope(|inner| {
1697        for w in &workers {
1698            inner.spawn(|| worker_loop(w, &ctx));
1699        }
1700        // Dispatcher: RPS-injector mode runs the rate-driven request thread;
1701        // default mode runs the message-thread handshake. Both run on this
1702        // calling thread and record the dispatcher's run-delay into `msg_td`
1703        // (schbench `message_thread`, `:1594`).
1704        // Pipe mode (`-p`) uses the message-handshake waker (run_msg_thread does
1705        // the per-worker pipe-page fill in wake_all) and takes precedence over -R.
1706        // This DIVERGES from schbench, where -R alone picks the path
1707        // (`schbench.c:1594`): schbench's `-p -R` runs the RPS injector (no
1708        // xlist_wake_all -> no waker-side memset -> a half-pipe). ktstr runs the
1709        // full pipe; the realistic use is `-p` alone. Otherwise -R selects the
1710        // injector.
1711        if config.pipe_transfer_bytes == 0 && config.rps_per_message_thread() != 0 {
1712            run_rps_thread(&workers, &msg_td, stop, phase_epoch, live_rate);
1713        } else {
1714            run_msg_thread(&msg_td, &wait_list, stop, phase_epoch);
1715        }
1716        // Stop is set: wake every worker so a blocked one observes stop and
1717        // exits (schbench fposts each worker before joining, `:1599-1602`).
1718        for w in &workers {
1719            w.futex.post();
1720        }
1721        // The inner scope joins the workers here.
1722    });
1723
1724    // RPS mode: free any requests the RPS thread enqueued after a worker's last
1725    // splice (the worker exited on stop without draining them). After the join,
1726    // this is the sole access to each worker's queue, so no request leaks across
1727    // run() calls.
1728    if config.rps_per_message_thread() != 0 {
1729        for w in &workers {
1730            free_request_chain(w.requests.splice_reversed());
1731        }
1732    }
1733
1734    // After join (happens-before): drain each thread's owner-only phase
1735    // snapshots into the whole-run histograms + the per-epoch aggregate.
1736    let mut whole_wakeup = PlatStats::default();
1737    let mut whole_request = PlatStats::default();
1738    let mut workers_sched_delay_sum = 0u64;
1739    let mut phases: std::collections::BTreeMap<u32, SchbenchPhaseStats> =
1740        std::collections::BTreeMap::new();
1741    for w in &workers {
1742        // SAFETY: every worker has joined (inner scope ended), so this is the
1743        // sole access to their cells.
1744        unsafe {
1745            workers_sched_delay_sum =
1746                workers_sched_delay_sum.saturating_add(*w.sched_delay_ns.get());
1747            for snap in (*w.phase_snapshots.get()).drain(..) {
1748                whole_wakeup.combine(&snap.wakeup);
1749                whole_request.combine(&snap.request);
1750                let e = phases.entry(snap.epoch).or_default();
1751                e.wakeup.combine(&snap.wakeup);
1752                e.request.combine(&snap.request);
1753                e.worker_run_delay_ns = e.worker_run_delay_ns.saturating_add(snap.run_delay_ns);
1754                e.worker_pcount = e.worker_pcount.saturating_add(snap.pcount);
1755                e.loop_count = e.loop_count.saturating_add(snap.loop_count);
1756            }
1757        }
1758    }
1759    // SAFETY: run_msg_thread ran on this thread inside the scope above (before
1760    // the join), so msg_td's cells are settled and this is sole access.
1761    let msg_sched_delay = unsafe { *msg_td.sched_delay_ns.get() };
1762    // SAFETY: same — sole access to msg_td's snapshot cell after the scope.
1763    unsafe {
1764        for snap in (*msg_td.phase_snapshots.get()).drain(..) {
1765            let e = phases.entry(snap.epoch).or_default();
1766            e.msg_run_delay_ns = e.msg_run_delay_ns.saturating_add(snap.run_delay_ns);
1767            e.msg_pcount = e.msg_pcount.saturating_add(snap.pcount);
1768        }
1769    }
1770    MessageThreadResult {
1771        whole_wakeup,
1772        whole_request,
1773        msg_sched_delay_ns: msg_sched_delay,
1774        workers_sched_delay_sum,
1775        phases,
1776    }
1777}
1778
/// Persistent /proc/stat reader state for auto-RPS, carrying the previous
/// cumulative totals so each call computes a delta over the interval (schbench
/// keeps one fd + the prior total/idle, `schbench.c` `read_busy`, `:1046`).
///
/// `Default` yields the "never read" state: no handle and zeroed totals.
#[derive(Default)]
struct ReadBusyState {
    /// Handle to /proc/stat, opened lazily by `read_busy` on the first call and
    /// then rewound and re-read on every later call. `None` until an open
    /// succeeds.
    fd: Option<File>,
    /// Sum of every numeric field on the aggregate `cpu` line at the previous
    /// read. `0` doubles as the "baseline not seeded yet" marker that
    /// `read_busy` tests.
    prev_total: u64,
    /// The idle field of the aggregate `cpu` line at the previous read.
    prev_idle: u64,
}
1788
1789/// Host-busy percentage over the interval since the last call, from the
1790/// aggregate `cpu` line of /proc/stat (schbench `read_busy`,
1791/// `schbench.c:1046-1112`). `None` means "no usable reading this tick": the first
1792/// call (baseline seed, schbench's `first_run`) OR a zero jiffy-delta (two reads
1793/// in the same tick -- schbench has no guard and divides by zero into a NaN that
1794/// spuriously trips its target-hit `else` branch; we skip the tick instead). A
1795/// real reading is `Some(0.0..=100.0)`, including a genuine fully-idle `Some(0.0)`
1796/// -- distinguished from the seed via the `prev_total` state, NOT via the value,
1797/// so an idle interval still drives a grow (the bug a `busy == 0.0` test had). In
1798/// the guest VM /proc/stat covers exactly the allocated cpuset (the guest has
1799/// only `cores=N`), so this system-wide read is correctly scoped to the
1800/// workload's CPUs. f32 to match schbench's `float` arithmetic.
1801fn read_busy(s: &mut ReadBusyState) -> Option<f32> {
1802    if s.fd.is_none() {
1803        s.fd = File::open("/proc/stat").ok();
1804    }
1805    let f = s.fd.as_mut()?;
1806    f.seek(SeekFrom::Start(0)).ok()?;
1807    let mut buf = [0u8; 512];
1808    let n = f.read(&mut buf).ok()?;
1809    let text = core::str::from_utf8(&buf[..n]).ok()?;
1810    // First line only; first token must be "cpu" (the aggregate line). Sum every
1811    // numeric field -> total; field index 3 (after "cpu") is idle -- kernel
1812    // show_stat order is user/nice/system/IDLE/iowait/irq/softirq/steal/guest/
1813    // guest_nice. `split_whitespace` collapses the "cpu  " double space, matching
1814    // schbench's `strtok_r` (`schbench.c:1082-1096`).
1815    let line = text.lines().next().unwrap_or("");
1816    let mut fields = line.split_whitespace();
1817    if fields.next() != Some("cpu") {
1818        return None;
1819    }
1820    let mut total = 0u64;
1821    let mut idle = 0u64;
1822    for (i, tok) in fields.enumerate() {
1823        let v: u64 = tok.parse().unwrap_or(0);
1824        if i == 3 {
1825            idle = v;
1826        }
1827        total += v;
1828    }
1829    // First call: seed the baseline, no busy% yet (`schbench.c:1098-1101`).
1830    if s.prev_total == 0 {
1831        s.prev_total = total;
1832        s.prev_idle = idle;
1833        return None;
1834    }
1835    let dt = total.saturating_sub(s.prev_total);
1836    let di = idle.saturating_sub(s.prev_idle);
1837    s.prev_total = total;
1838    s.prev_idle = idle;
1839    if dt == 0 {
1840        // Zero jiffy-delta: skip rather than NaN (see the doc; schbench diverges).
1841        return None;
1842    }
1843    Some(100.0 - (di as f32 / dt as f32) * 100.0)
1844}
1845
/// One auto-RPS scaling step: move the live per-message-thread request rate
/// toward `target_pct` host-busy for an already-measured `busy` percentage
/// (the scaling body of schbench `auto_scale_rps`, `schbench.c:1203-1250`).
///
/// Pure -- no I/O -- so the decision is unit-testable with injected `busy`
/// values. The new per-thread rate is written to `live_rate`, mirroring
/// schbench scaling its global `requests_per_sec` (already per-thread after
/// `:1899`) and writing it back at `:1250`.
///
/// Returns `true` only on the transition INTO the near-target band, i.e. when
/// `target_hit` flips false->true; the caller then resets the rps histogram
/// (schbench memsets `rps_stats` on that transition). The band test looks at
/// the factor AFTER damping, exactly as schbench does (`:1210`, `:1233`). All
/// math is f32 with `ceil` on grow and `floor` on shrink to match schbench's
/// float deltas and rounding.
fn scale_rps_for_busy(
    busy: f32,
    live_rate: &AtomicUsize,
    target_hit: &AtomicBool,
    target_pct: usize,
) -> bool {
    let goal = target_pct as f32;
    let current = live_rate.load(Ordering::Relaxed) as f32;
    let hit_before = target_hit.load(Ordering::Relaxed);

    // (next rate, whether this tick landed inside the near-target band).
    let (next_rate, in_band): (usize, bool) = match busy.partial_cmp(&goal) {
        // Under target: grow (`schbench.c:1203-1224`).
        Some(core::cmp::Ordering::Less) => {
            let ratio = goal / busy;
            let (factor, near) = if ratio > 3.0 {
                (3.0_f32, false)
            } else if ratio < 1.2 {
                // Band check on the damped value (`schbench.c:1209-1213`).
                let damped = 1.0 + (ratio - 1.0) / 8.0;
                (damped, damped < 1.05)
            } else if ratio < 1.5 {
                (1.0 + (ratio - 1.0) / 4.0, false)
            } else {
                (ratio, false)
            };
            let grown = (current * factor).ceil();
            // Not enough capacity to reach the target -> hold
            // (`schbench.c:1219-1224`).
            let rate = if grown >= (1u64 << 31) as f32 {
                current as usize
            } else {
                grown as usize
            };
            (rate, near)
        }
        // Over target: shrink (`schbench.c:1226-1242`).
        Some(core::cmp::Ordering::Greater) => {
            let ratio = goal / busy;
            let (factor, near) = if ratio < 0.3 {
                (0.3_f32, false)
            } else if ratio > 0.9 {
                // Band check on the damped value (`schbench.c:1232-1236`).
                let damped = ratio + (1.0 - ratio) / 8.0;
                (damped, damped > 0.95)
            } else if ratio > 0.8 {
                (ratio + (1.0 - ratio) / 4.0, false)
            } else {
                (ratio, false)
            };
            ((current * factor).floor().max(0.0) as usize, near)
        }
        // Neither below nor above: exactly on target (`schbench.c:1243-1248`).
        // An unordered (NaN) `busy` also lands here, since both comparisons fail.
        _ => (current as usize, true),
    };

    // Only the FIRST entry into the band counts as a hit.
    let just_hit = in_band && !hit_before;
    live_rate.store(next_rate, Ordering::Relaxed);
    if just_hit {
        target_hit.store(true, Ordering::Relaxed);
    }
    just_hit
}
1916
1917/// Read host-busy from /proc/stat and scale the live rate toward the target
1918/// (schbench `auto_scale_rps`, `schbench.c:1180-1251`). The first read only seeds
1919/// the baseline (schbench's `first_run` early return, `:1201`); a zero-delta tick
1920/// is skipped (see [`read_busy`]). Returns [`scale_rps_for_busy`]'s just-hit flag.
1921fn auto_scale_rps(
1922    busy_state: &mut ReadBusyState,
1923    live_rate: &AtomicUsize,
1924    target_hit: &AtomicBool,
1925    target_pct: usize,
1926) -> bool {
1927    let Some(busy) = read_busy(busy_state) else {
1928        return false;
1929    };
1930    scale_rps_for_busy(busy, live_rate, target_hit, target_pct)
1931}
1932
1933/// Reset BOTH per-phase rps accumulators in LOCKSTEP at an auto-RPS target-hit.
1934/// schbench memsets `rps_stats` on the target-hit transition to discard the
1935/// pre-target ramp; the per-epoch map MUST reset with it, or the per-phase view
1936/// keeps ramp samples the whole-run view dropped and `Σ per-epoch == whole-run`
1937/// breaks (asymmetric-counter-bookkeeping).
1938fn reset_rps_accumulators(
1939    whole: &mut PlatStats,
1940    per_epoch: &mut std::collections::BTreeMap<u32, PlatStats>,
1941) {
1942    *whole = PlatStats::default();
1943    per_epoch.clear();
1944}
1945
1946/// The once-per-second control thread, schbench's main control loop
1947/// (`schbench.c:1756-1827`): sample the achieved per-second RPS into `rps_stats`
1948/// and -- when auto-RPS is on -- adjust the live rate toward the target. Returns
1949/// BOTH the whole-run `rps_stats` AND a per-epoch RPS map (the same samples
1950/// bucketed by the phase epoch observed at sample time), moved out on join. The
1951/// whole-run `rps_stats` is byte-identical to the pre-per-phase behavior -- the
1952/// per-epoch map is PURELY ADDITIVE (each sample lands in the whole-run histogram
1953/// AND exactly one epoch bucket), so `Σ per-epoch == whole-run`. `phase_epoch` is
1954/// read Relaxed (telemetry, gates no coupled memory); a 1s sample straddling an
1955/// epoch change is attributed wholly to the sample-time epoch (bounded fuzz,
1956/// mirroring the worker drain-on-change). The 1-second cadence is the workload's
1957/// defined sampling/scaling interval, not a synchronization wait; it observes
1958/// `stop` after each interval.
1959fn control_loop(
1960    progress: &AtomicU64,
1961    stop: &AtomicBool,
1962    config: &SchbenchConfig,
1963    live_rate: &AtomicUsize,
1964    target_hit: &AtomicBool,
1965    phase_epoch: Option<&AtomicU32>,
1966) -> (PlatStats, std::collections::BTreeMap<u32, PlatStats>) {
1967    let mut rps_stats = PlatStats::default();
1968    let mut per_epoch_rps: std::collections::BTreeMap<u32, PlatStats> =
1969        std::collections::BTreeMap::new();
1970    let mut last_loop = 0u64;
1971    let mut last_t = monotonic_nanos();
1972    let mut busy_state = ReadBusyState::default();
1973    while !stop.load(Ordering::Acquire) {
1974        // Sleep ~1s between samples, but in STOP_POLL_QUANTUM_NS chunks that
1975        // re-check `stop`, so a shutdown is observed within a quantum instead of
1976        // blocking run()'s teardown for a full second (the control thread is joined
1977        // by run(), so an uncapped sleep here is the dominant teardown latency —
1978        // worse than, and unconditional unlike, the RPS injector's pacing sleep).
1979        let sleep_target = monotonic_nanos().saturating_add(1_000_000_000);
1980        loop {
1981            if stop.load(Ordering::Acquire) {
1982                break;
1983            }
1984            let now = monotonic_nanos();
1985            if now >= sleep_target {
1986                break;
1987            }
1988            std::thread::sleep(std::time::Duration::from_nanos(
1989                (sleep_target - now).min(STOP_POLL_QUANTUM_NS),
1990            ));
1991        }
1992        // Sample the just-elapsed window BEFORE re-checking stop at the loop top,
1993        // so the in-progress second is recorded rather than dropped. `dt` is the
1994        // ACTUAL elapsed time (now - last_t).
1995        //
1996        // schbench samples-first then sleeps the full second (`sleep(1)` only
1997        // `if (!done)`, schbench.c:1827), so every schbench sample -- including the
1998        // boundary one taken when `done` is detected -- spans a real ~1s window
1999        // (rps over a real `delta = tvdelta(..)`, schbench.c:1774,1777). ktstr
2000        // sleeps-first/samples-last with a BOUNDED sleep, which diverges two ways:
2001        //   - On early `stop` this boundary sample spans a true PARTIAL window: for
2002        //     a substantial window (>= one quantum) the real rate over the
2003        //     sub-window the workers were active (more granular than schbench, which
2004        //     would have waited the full second); a degenerate sub-quantum window is
2005        //     dropped by `control_loop_rps_sample` (a count/dt rate is unbounded as
2006        //     dt->0). The bounded sleep is a ktstr scenario-teardown requirement
2007        //     schbench does not face.
2008        //   - The sleep-first shape omits schbench's tiny FIRST-window sample
2009        //     (schbench reads `now` at loop start before any work, schbench.c:1757,
2010        //     so its first `delta` is ~us); ktstr's first sample follows a full ~1s
2011        //     sleep.
2012        let now = monotonic_nanos();
2013        let lc = progress.load(Ordering::Relaxed);
2014        let dt = now.saturating_sub(last_t);
2015        // Achieved per-second rate = Δcompleted-cycles / Δt (schbench
2016        // `combine_message_thread_rps` over the shared loop count, schbench.c:1777),
2017        // floored at one quantum by `control_loop_rps_sample` (a ktstr teardown
2018        // guard, not in schbench — see its doc).
2019        if let Some(sample) = control_loop_rps_sample(lc.saturating_sub(last_loop), dt) {
2020            // Gate: for auto-RPS, sample only once the target is hit
2021            // (`schbench.c:1781`); fixed/default mode samples every second.
2022            if config.auto_rps == 0 || target_hit.load(Ordering::Relaxed) {
2023                rps_stats.add_lat(sample);
2024                // Per-phase parity: also bucket the sample by the epoch observed
2025                // NOW (the sample-time epoch; a straddling 1s sample lands wholly
2026                // here, bounded fuzz). 0/u32::MAX sentinel epochs route into the
2027                // same all_phases entries the host discards (the run() merge).
2028                let epoch = phase_epoch.map_or(0, |e| e.load(Ordering::Relaxed));
2029                per_epoch_rps.entry(epoch).or_default().add_lat(sample);
2030            }
2031        }
2032        last_loop = lc;
2033        last_t = now;
2034        if config.auto_rps != 0 {
2035            let just_hit = auto_scale_rps(&mut busy_state, live_rate, target_hit, config.auto_rps);
2036            if just_hit {
2037                // Discard ALL pre-target ramp samples (schbench memsets rps_stats
2038                // on the target-hit transition -- grow/shrink/on-target bands all
2039                // funnel through just_hit, `schbench.c:1212`/`:1235`/`:1247`). The
2040                // per-epoch map resets in LOCKSTEP so the per-phase view never
2041                // includes ramp samples the whole-run view discards (the
2042                // Σ-per-epoch == whole-run invariant survives the reset).
2043                reset_rps_accumulators(&mut rps_stats, &mut per_epoch_rps);
2044            }
2045        }
2046    }
2047    (rps_stats, per_epoch_rps)
2048}
2049
2050/// Achieved per-second rate sample for one control-loop window:
2051/// Δcompleted-cycles over `dt_ns`, in cycles/sec, clamped to `u32`. `None` when
2052/// the window is shorter than one `STOP_POLL_QUANTUM_NS` quantum.
2053///
2054/// The floor is a correctness guard, not a nicety. The bounded teardown sleep in
2055/// `control_loop` can return on `stop` with a sub-millisecond `dt` (the window
2056/// between a sample's `last_t = now` and the next iteration's first stop check),
2057/// and a count/dt RATE is unbounded as `dt -> 0` (a single completion in a 1us
2058/// window reads as ~1e6 cycles/sec), so an unfloored sub-quantum sample would
2059/// spike `rps_max` and the top rps percentiles -- assertable metrics. A real
2060/// partial window is always >= one quantum (the sleep loop only observes `stop`
2061/// at a quantum boundary), so the floor drops only the degenerate race window; an
2062/// admitted window bounds the sample to `<= delta_loops * 1e9 /
2063/// STOP_POLL_QUANTUM_NS` (~ the real rate, no inflation). This diverges from
2064/// schbench's `isfinite(rps)` guard (schbench.c:1782), which catches only an
2065/// exact-zero delta -- a finite-but-huge rate from a tiny window slips past it,
2066/// but schbench's full-second cadence (schbench.c:1827) never produces one.
2067fn control_loop_rps_sample(delta_loops: u64, dt_ns: u64) -> Option<u32> {
2068    if dt_ns < STOP_POLL_QUANTUM_NS {
2069        return None;
2070    }
2071    let rps = delta_loops as f64 * 1e9 / dt_ns as f64;
2072    Some((rps as u64).min(u32::MAX as u64) as u32)
2073}
2074
/// Run the schbench workload until `stop` is set, returning the whole-run
/// percentiles + achieved rate AND the per-phase aggregates. `progress` is the
/// live count of completed work cycles across all workers. `phase_epoch` (when
/// `Some`) is the scenario engine's shared per-phase generation; the engine
/// splits its histograms at each transition (see [`worker_loop`]).
///
/// Spawns `config.message_threads` message threads plus one control thread in
/// a single [`std::thread::scope`] and does not return until all of them have
/// been joined.
///
/// # Returns
///
/// A [`SchbenchOutcome`] whose `whole_run` carries the merged wakeup / request
/// / rps percentiles, the final `progress` count, the worker total, the mean
/// achieved rate over the whole call, the final auto-RPS goal, and the averaged
/// per-thread scheduling delays; `phases` is the per-epoch aggregate in
/// ascending epoch order (it is collected from a `BTreeMap`).
///
/// # Panics
///
/// - if `config.split_percent` is `Some(p)` with `p > 100`;
/// - if a message thread or the control thread panicked (the join `expect`s).
pub(crate) fn run(
    config: &SchbenchConfig,
    stop: &AtomicBool,
    progress: &AtomicU64,
    phase_epoch: Option<&AtomicU32>,
) -> SchbenchOutcome {
    let (allowed_count, lock_array_size) = resolve_cpu_topology();
    let worker_threads = config.resolve_worker_count(allowed_count);
    // No per-CPU lock table at all when locking is skipped.
    let locks = if config.skip_locking {
        None
    } else {
        Some(PerCpuLocks::new(lock_array_size))
    };
    // schbench rejects a `--split` outside 0..=100 at parse and exits
    // (schbench.c:362-365). ktstr's analog is a loud panic here at the
    // consumption boundary -- before any matrix sizing or the shared Arc
    // allocation below -- so an out-of-range config (a `pub split_percent` set
    // via struct literal bypasses the builder's debug_assert) can never
    // underflow `100 - p` (a release `usize` wrap that would size a garbage
    // shared matrix and OOM the allocation) or silently mis-split.
    if let Some(p) = config.split_percent {
        assert!(
            p <= 100,
            "schbench split_percent must be in 0..=100, got {p} (schbench.c:362-365 rejects out of range)"
        );
    }
    // schbench.c:1871-1872: ONE process-global shared matrix for `--split`,
    // allocated once and shared (interior-mutable AtomicU64) by every worker
    // across all message threads. Empty when not splitting (`None`) or when the
    // shared portion rounds to 0 (split=100): the `shared_matrix_size > 0`
    // guard in do_work then skips the shared branch, so the empty slice is
    // never indexed.
    let shared_matrix_size = config.shared_matrix_size();
    // Three square matrices of dimension `shared_matrix_size`, zero-initialized.
    let shared: std::sync::Arc<[core::sync::atomic::AtomicU64]> = if shared_matrix_size > 0 {
        (0..3 * shared_matrix_size * shared_matrix_size)
            .map(|_| core::sync::atomic::AtomicU64::new(0))
            .collect()
    } else {
        std::sync::Arc::from([] as [core::sync::atomic::AtomicU64; 0])
    };

    // Start of the measured interval for `achieved_rps`; the accumulators
    // below are filled as each message thread is joined.
    let start = monotonic_nanos();
    let mut all_wakeup = PlatStats::default();
    let mut all_request = PlatStats::default();
    let mut total_msg_sched_delay = 0u64;
    let mut total_worker_sched_delay = 0u64;
    let mut all_phases: std::collections::BTreeMap<u32, SchbenchPhaseStats> =
        std::collections::BTreeMap::new();

    // The live PER-MESSAGE-THREAD request rate the RPS injectors read each second
    // -- schbench's global `requests_per_sec` AFTER its startup `/= message_threads`
    // (`schbench.c:1899`), which is exactly the value `auto_scale_rps` scales
    // (`schbench.c:1250`) and each `run_rps_thread` injects directly with no
    // further division (`schbench.c:1290`). Seeded from `rps_per_message_thread()`
    // (the -R total / m, or the auto-RPS seed 10 / m). The control thread
    // auto-scales it when auto-RPS is on; otherwise it stays constant -- a fixed
    // -R then injects that seed rate unchanged. `target_hit` gates the rps_stats sampling
    // for auto-RPS (`schbench.c:1781`); `rps_stats` is moved out of the control
    // thread on join. `live_rate` is read back after the scope as the auto-RPS
    // "final rps goal" (`schbench.c:1995`, per-thread * m).
    //
    // Ordering: both atomics are lone scalars that gate NO coupled memory --
    // `live_rate` is a plain count the RPS thread loops on (the requests it
    // enqueues publish via the Treiber stack's own AcqRel), and `target_hit` is
    // touched only by the control thread. So Relaxed is correct (a 1-second-stale
    // read is benign within the auto-RPS cadence; schbench reads its global with
    // no sync at all). The final read after the scope is synchronized by the join.
    let live_rate = AtomicUsize::new(config.rps_per_message_thread());
    let target_hit = AtomicBool::new(false);
    let mut rps_stats = PlatStats::default();

    std::thread::scope(|outer| {
        // One scoped thread per message thread; each borrows the shared lock
        // table, live rate and shared matrix for the duration of the scope.
        let handles: Vec<_> = (0..config.message_threads)
            .map(|_| {
                let locks = locks.as_ref();
                let live_rate = &live_rate;
                let shared = &shared;
                outer.spawn(move || {
                    run_one_message_thread(
                        worker_threads,
                        locks,
                        config,
                        stop,
                        progress,
                        phase_epoch,
                        live_rate,
                        &shared[..],
                        shared_matrix_size,
                    )
                })
            })
            .collect();
        // Control thread: schbench's main control loop -- once/sec, sample the
        // achieved RPS into rps_stats and (for auto-RPS) auto-scale `live_rate`.
        let control = outer
            .spawn(|| control_loop(progress, stop, config, &live_rate, &target_hit, phase_epoch));
        // Join the message threads first, folding each result into the run-wide
        // histograms, delay sums and per-epoch aggregate.
        for h in handles {
            let mtr = h.join().expect("schbench message thread panicked");
            all_wakeup.combine(&mtr.whole_wakeup);
            all_request.combine(&mtr.whole_request);
            total_msg_sched_delay = total_msg_sched_delay.saturating_add(mtr.msg_sched_delay_ns);
            total_worker_sched_delay =
                total_worker_sched_delay.saturating_add(mtr.workers_sched_delay_sum);
            for (epoch, sps) in mtr.phases {
                all_phases.entry(epoch).or_default().merge(&sps);
            }
        }
        let (whole_rps, control_per_epoch_rps) =
            control.join().expect("schbench control thread panicked");
        rps_stats = whole_rps;
        // Merge the control thread's per-epoch RPS into the per-phase aggregate
        // EXACTLY ONCE, post-join, OUTSIDE the per-message-thread loop above (else
        // it would double-count by message_threads). The control thread is the
        // SOLE rps source; the workers contribute empty rps via mtr.phases.
        for (epoch, hist) in control_per_epoch_rps {
            all_phases.entry(epoch).or_default().rps.combine(&hist);
        }
    });

    // The auto-RPS "final rps goal" (`schbench.c:1995`): the live per-thread rate
    // at exit * message_threads. Read after the scope (all threads joined, so the
    // control loop's last store is visible).
    let final_rps_goal = live_rate.load(Ordering::Relaxed) * config.message_threads;
    let loop_count = progress.load(Ordering::Relaxed);
    let elapsed_ns = monotonic_nanos().saturating_sub(start);
    // Mean rate over the whole call; 0.0 if no time elapsed (avoids a
    // divide-by-zero).
    let achieved_rps = if elapsed_ns > 0 {
        loop_count as f64 / (elapsed_ns as f64 / 1e9)
    } else {
        0.0
    };
    // Average the per-thread run-queue waits, matching schbench's
    // collect_sched_delay (`schbench.c:1673-1674`): message delay over
    // message_threads, worker delay over all workers. (Whole-run mean-of-means;
    // the per-phase path uses sample-weighted Σrd/Σpc — see SchbenchPhaseStats.)
    // Both divisors are floored at 1 so a zero thread count cannot divide by zero.
    let sched_delay_msg_ns = total_msg_sched_delay / (config.message_threads.max(1) as u64);
    let total_workers = (config.message_threads * worker_threads).max(1) as u64;
    let sched_delay_worker_ns = total_worker_sched_delay / total_workers;

    SchbenchOutcome {
        whole_run: SchbenchResult {
            wakeup: all_wakeup.percentiles(),
            request: all_request.percentiles(),
            rps: rps_stats.percentiles(),
            loop_count,
            nr_workers: total_workers as usize,
            achieved_rps,
            final_rps_goal,
            sched_delay_msg_ns,
            sched_delay_worker_ns,
        },
        phases: all_phases.into_iter().collect(),
    }
}
2233
2234#[cfg(test)]
2235mod tests {
2236    use super::*;
2237
2238    /// Lightweight node for stack stress tests (ThreadData is far heavier —
2239    /// two boxed `PlatStats` histograms plus per-thread state — too heavy to
2240    /// allocate by the thousand).
2241    struct TestNode {
2242        next: AtomicPtr<TestNode>,
2243    }
2244    impl Linked for TestNode {
2245        fn next_link(&self) -> &AtomicPtr<Self> {
2246            &self.next
2247        }
2248    }
2249    impl TestNode {
2250        fn new() -> Self {
2251            Self {
2252                next: AtomicPtr::new(ptr::null_mut()),
2253            }
2254        }
2255    }
2256
2257    #[test]
2258    fn resolve_worker_count_divides_cpuset_across_message_threads() {
2259        // Explicit non-zero worker_threads is honored as-is (per message thread).
2260        let c = SchbenchConfig::default()
2261            .worker_threads(3)
2262            .message_threads(4);
2263        assert_eq!(c.resolve_worker_count(8), 3);
2264
2265        // 0-default mirrors schbench's ceil(cpuset_cpus / message_threads)
2266        // (schbench.c:1849-1852), scoped to the cpuset count:
2267        //   m=1: 8/1 = 8 per thread, total 8 (== the cpuset count; no divide).
2268        let c = SchbenchConfig::default().message_threads(1);
2269        assert_eq!(c.resolve_worker_count(8), 8);
2270        //   m=2: 8/2 = 4 per thread, total 8.
2271        let c = SchbenchConfig::default().message_threads(2);
2272        assert_eq!(c.resolve_worker_count(8), 4);
2273        //   m=3: ceil(8/3) = 3 per thread, total 9 (≈ 8; schbench rounds up too).
2274        let c = SchbenchConfig::default().message_threads(3);
2275        assert_eq!(c.resolve_worker_count(8), 3);
2276        //   message_threads floored at 1 — no div-by-zero.
2277        let c = SchbenchConfig::default().message_threads(0);
2278        assert_eq!(c.resolve_worker_count(8), 8);
2279    }
2280
2281    #[test]
2282    fn matrix_size_matches_schbench_formula() {
2283        // sqrt(256*1024/3/8) = sqrt(10922) = 104.
2284        assert_eq!(
2285            SchbenchConfig {
2286                cache_footprint_kib: 256,
2287                operations: 5,
2288                ..Default::default()
2289            }
2290            .matrix_size(),
2291            104
2292        );
2293        // Zero operations -> no matrix work.
2294        assert_eq!(
2295            SchbenchConfig {
2296                operations: 0,
2297                ..Default::default()
2298            }
2299            .matrix_size(),
2300            0
2301        );
2302    }
2303
2304    #[test]
2305    fn split_matrix_sizes_match_schbench_formula() {
2306        // schbench.c:1858-1863: with --split=p the footprint splits into a
2307        // private matrix (p%) and a shared matrix (100-p%); each dimension is
2308        // sqrt(kib*frac/100 * 1024/3/sizeof(ulong)). None => the legacy single
2309        // all-private matrix (full footprint) and no shared matrix.
2310        let cfg = |split| SchbenchConfig {
2311            cache_footprint_kib: 256,
2312            operations: 5,
2313            split_percent: split,
2314            ..Default::default()
2315        };
2316        // None: private == the full-footprint matrix_size (104), no shared.
2317        assert_eq!(cfg(None).private_matrix_size(), 104);
2318        assert_eq!(cfg(None).shared_matrix_size(), 0);
2319        // 0%: nothing private; the whole footprint is the shared matrix.
2320        assert_eq!(cfg(Some(0)).private_matrix_size(), 0);
2321        assert_eq!(cfg(Some(0)).shared_matrix_size(), 104);
2322        // 25%: 64 KiB private (sqrt(64*1024/3/8)=52), 192 KiB shared (=90).
2323        assert_eq!(cfg(Some(25)).private_matrix_size(), 52);
2324        assert_eq!(cfg(Some(25)).shared_matrix_size(), 90);
2325        // 100%: all private (full footprint), no shared work.
2326        assert_eq!(cfg(Some(100)).private_matrix_size(), 104);
2327        assert_eq!(cfg(Some(100)).shared_matrix_size(), 0);
2328        // Zero operations -> no matrix work either way.
2329        let none_ops = SchbenchConfig {
2330            operations: 0,
2331            split_percent: Some(50),
2332            ..Default::default()
2333        };
2334        assert_eq!(none_ops.private_matrix_size(), 0);
2335        assert_eq!(none_ops.shared_matrix_size(), 0);
2336    }
2337
2338    #[test]
2339    fn ops_split_matches_schbench_integer_division() {
2340        // schbench.c:1391-1392: ops_private = operations*split/100 (integer
2341        // truncation), ops_shared = operations - ops_private. Returns
2342        // (ops_shared, ops_private).
2343        assert_eq!(ops_split(5, 0), (5, 0)); // all shared
2344        assert_eq!(ops_split(5, 25), (4, 1)); // 5*25/100 = 1 private
2345        assert_eq!(ops_split(5, 50), (3, 2)); // 5*50/100 = 2 private
2346        assert_eq!(ops_split(5, 100), (0, 5)); // all private
2347    }
2348
2349    #[test]
2350    fn engine_split_runs_shared_matrix_concurrently() {
2351        // --split with two workers: both concurrently multiply into the ONE
2352        // process-global shared AtomicU64 matrix (per-k stores to the shared C
2353        // cells -- the deliberate cache contention schbench models). Reaching the
2354        // assertion proves the shared-matrix path
2355        // neither deadlocks nor trips a data-race trap -- the atomics make the
2356        // shared race sound (vs schbench's plain shared-memory race). operations=8
2357        // @ split=50 => 4 shared + 4 private ops/cycle, so both paths run. Stop is
2358        // event-driven (spin on the progress counter, not a sleep).
2359        let config = SchbenchConfig {
2360            message_threads: 1,
2361            worker_threads: 2,
2362            cache_footprint_kib: 16,
2363            operations: 8,
2364            sleep_usec: 0,
2365            skip_locking: false,
2366            requests_per_sec: 0,
2367            auto_rps: 0,
2368            split_percent: Some(50),
2369            pipe_transfer_bytes: 0,
2370        };
2371        // Sanity: both split matrices are non-empty at this footprint, so the
2372        // test exercises shared + private work, not a degenerate 0-dim path.
2373        assert!(config.shared_matrix_size() > 0, "shared matrix present");
2374        assert!(config.private_matrix_size() > 0, "private matrix present");
2375        let stop = AtomicBool::new(false);
2376        let progress = AtomicU64::new(0);
2377        let outcome = std::thread::scope(|s| {
2378            let runner = s.spawn(|| run(&config, &stop, &progress, None));
2379            while progress.load(Ordering::Relaxed) < 50 {
2380                core::hint::spin_loop();
2381            }
2382            stop.store(true, Ordering::Release);
2383            runner.join().expect("run panicked")
2384        });
2385        assert!(
2386            outcome.whole_run.loop_count >= 50,
2387            "engine did split work: {}",
2388            outcome.whole_run.loop_count
2389        );
2390    }
2391
2392    #[test]
2393    fn pipe_fill_writes_every_byte_and_sizes_the_page() {
2394        // pipe_fill is the per-cycle memory transfer (schbench's pipe_page memset,
2395        // schbench.c:980-981/:1003-1004): it must touch EVERY byte of a
2396        // pipe_bytes-sized page. The bytes are never read back, so a dead-store
2397        // elision would silently zero the workload -- the black_box in pipe_fill
2398        // prevents that; this pins the observable effect (every byte written).
2399        let td = ThreadData::new(64);
2400        assert_eq!(td.pipe_bytes, 64);
2401        // SAFETY: single-threaded test, exclusive access to this td's page.
2402        unsafe { td.pipe_fill(2) };
2403        // SAFETY: same thread; no concurrent access.
2404        assert_eq!(
2405            unsafe { &*td.pipe_page.get() }.len(),
2406            64,
2407            "page sized to pipe_bytes"
2408        );
2409        assert!(
2410            unsafe { &*td.pipe_page.get() }.iter().all(|&b| b == 2),
2411            "worker fill (2) touches every byte"
2412        );
2413        // SAFETY: exclusive.
2414        unsafe { td.pipe_fill(1) };
2415        assert!(
2416            unsafe { &*td.pipe_page.get() }.iter().all(|&b| b == 1),
2417            "waker fill (1) overwrites every byte"
2418        );
2419        // Non-pipe ThreadData (pipe_bytes == 0) allocates an empty page: no transfer.
2420        let none = ThreadData::new(0);
2421        assert_eq!(none.pipe_bytes, 0);
2422        // SAFETY: exclusive.
2423        assert_eq!(unsafe { &*none.pipe_page.get() }.len(), 0);
2424    }
2425
2426    #[test]
2427    fn pipe_transfer_bytes_clamps_to_one_mib() {
2428        // schbench clamps -p to PIPE_TRANSFER_BUFFER (schbench.c:41,291-294); run()
2429        // applies the same `.min()` before sizing each worker's page.
2430        assert_eq!(PIPE_TRANSFER_BUFFER, 1024 * 1024);
2431        assert_eq!(
2432            (2 * PIPE_TRANSFER_BUFFER).min(PIPE_TRANSFER_BUFFER),
2433            PIPE_TRANSFER_BUFFER,
2434            "over-cap pipe size clamps to the 1 MiB buffer"
2435        );
2436        // At/under-cap values pass through unchanged.
2437        assert_eq!(4096_usize.min(PIPE_TRANSFER_BUFFER), 4096);
2438    }
2439
2440    #[test]
2441    fn pipe_transfer_report_is_per_worker_scales_and_clamps() {
2442        // PER-WORKER: ops/sec = achieved_rps / nr_workers
2443        // (schbench.c:1942-1943 divide by Σ worker runtimes ≈ nr_workers*elapsed).
2444        // 2048 aggregate cycles/sec / 2 workers = 1024 per-worker ops/sec.
2445        let r = pipe_transfer_report(2048.0, 1, 2);
2446        assert_eq!(r.ops_per_sec, 1024.0);
2447        // pretty_size (schbench.c:1606-1620): 1024 per-worker B/s -> 1.00 KB/s.
2448        assert_eq!((r.scaled, r.unit), (1.0, "KB"));
2449        // 1 worker, 1 B/cycle @ 1 cycle/s -> 1.00 B/s.
2450        let one = pipe_transfer_report(1.0, 1, 1);
2451        assert_eq!((one.ops_per_sec, one.scaled, one.unit), (1.0, 1.0, "B"));
2452        // 1 MiB/cycle @ 1 cycle/s/worker -> 1.00 MB/s.
2453        assert_eq!(pipe_transfer_report(1.0, 1 << 20, 1).unit, "MB");
2454        // CLAMP: an over-cap pipe size reports the clamped 1 MiB, not
2455        // the requested 2 MiB -- the engine moves only the clamped size per cycle
2456        // (schbench clamps at parse, schbench.c:291-294).
2457        let over = pipe_transfer_report(1.0, 2 << 20, 1);
2458        let cap = pipe_transfer_report(1.0, 1 << 20, 1);
2459        assert_eq!(
2460            (over.scaled, over.unit),
2461            (cap.scaled, cap.unit),
2462            "over-cap pipe size clamps to 1 MiB"
2463        );
2464        // EB cap: a huge byte rate never scales past the last unit (no overflow).
2465        assert_eq!(pipe_transfer_report(f64::MAX, 1 << 20, 1).unit, "EB");
2466        // nr_workers 0 -> floored at 1, never a division by zero.
2467        assert_eq!(pipe_transfer_report(100.0, 4096, 0).ops_per_sec, 100.0);
2468    }
2469
2470    #[test]
2471    fn engine_pipe_mode_transfers_without_matrix_work() {
2472        // Pipe mode (-p): the worker loop skips do_work + the think-sleep, and the
2473        // per-cycle work is the cross-thread pipe_page handshake -- the worker
2474        // memsets its page to 2 before blocking, the waker memsets it to 1 on wake
2475        // (schbench.c:980-981/:1003-1004). Two workers exercise the concurrent
2476        // waker->worker page writes. Reaching the assertions proves the handshake
2477        // runs to completion: no deadlock and no data-race trap on the cross-thread
2478        // `pipe_page` UnsafeCell (the wait-list + futex ordering keeps it race-free,
2479        // see the Sync SAFETY note). `operations`/`cache_footprint_kib` are set
2480        // LARGE on purpose: if do_work erroneously ran, every cycle would log a
2481        // multi-us request-latency sample -- the second assertion would then fail.
2482        // Stop is event-driven (spin on the progress counter, not a sleep).
2483        let config = SchbenchConfig::default()
2484            .message_threads(1)
2485            .worker_threads(2)
2486            .sleep_usec(0)
2487            .operations(50)
2488            .cache_footprint_kib(256)
2489            .pipe_transfer_bytes(4096);
2490        assert!(config.pipe_transfer_bytes > 0, "pipe mode engaged");
2491        assert!(
2492            config.matrix_size() > 0,
2493            "matrix work would be non-trivial if do_work ran (regression guard)"
2494        );
2495        let stop = AtomicBool::new(false);
2496        let progress = AtomicU64::new(0);
2497        let outcome = std::thread::scope(|s| {
2498            let runner = s.spawn(|| run(&config, &stop, &progress, None));
2499            while progress.load(Ordering::Relaxed) < 50 {
2500                core::hint::spin_loop();
2501            }
2502            stop.store(true, Ordering::Release);
2503            runner.join().expect("run panicked")
2504        });
2505        // loop_count accrues one per completed transfer cycle (schbench.c:1479),
2506        // proving the pipe handshake + memsets ran end-to-end without deadlock.
2507        assert!(
2508            outcome.whole_run.loop_count >= 50,
2509            "pipe engine transferred {} cycles",
2510            outcome.whole_run.loop_count
2511        );
2512        // Discriminating assertion: do_work is SKIPPED, so the
2513        // back-to-back work_start->now span yields sub-us request deltas that the
2514        // `delta_us > 0` filter drops -- request stats stay ~empty. If matrix work
2515        // erroneously ran at operations=50/256KiB, nearly every cycle would record a
2516        // >0 request delta and request samples would approach loop_count, failing
2517        // this. (A rare us-straddle may record a handful; `* 4 < loop_count` tolerates
2518        // that while still catching a real do_work regression.)
2519        assert!(
2520            outcome.whole_run.request.nr_samples * 4 < outcome.whole_run.loop_count,
2521            "pipe mode skips matrix work: {} request samples vs {} cycles",
2522            outcome.whole_run.request.nr_samples,
2523            outcome.whole_run.loop_count
2524        );
2525    }
2526
2527    #[test]
2528    #[should_panic(expected = "split_percent must be in 0..=100")]
2529    fn engine_panics_on_out_of_range_split() {
2530        // Regression pin for the out-of-range finding: a `pub split_percent` set
2531        // past 100 via STRUCT LITERAL (the path that bypasses the builder's
2532        // debug_assert) must fail LOUDLY at the run() consumption boundary in
2533        // every build profile -- the analog of schbench exiting on a bad
2534        // `--split` -- never silently underflow `100 - p` into a garbage-huge
2535        // shared matrix. The assert fires before any thread spawn or allocation,
2536        // so this is cheap (`stop` is pre-set just in case).
2537        let config = SchbenchConfig {
2538            message_threads: 1,
2539            worker_threads: 1,
2540            cache_footprint_kib: 16,
2541            operations: 1,
2542            sleep_usec: 0,
2543            skip_locking: false,
2544            requests_per_sec: 0,
2545            auto_rps: 0,
2546            split_percent: Some(101),
2547            pipe_transfer_bytes: 0,
2548        };
2549        let stop = AtomicBool::new(true);
2550        let progress = AtomicU64::new(0);
2551        let _ = run(&config, &stop, &progress, None);
2552    }
2553
2554    #[test]
2555    fn stack_add_splice_is_lifo() {
2556        let a = TestNode::new();
2557        let b = TestNode::new();
2558        let stack: TreiberStack<TestNode> = TreiberStack::new();
2559        assert!(stack.splice().is_null(), "empty stack splices to null");
2560        stack.add(&a as *const _ as *mut _);
2561        stack.add(&b as *const _ as *mut _);
2562        // LIFO: b (pushed last) is the head; a follows.
2563        let head = stack.splice();
2564        assert_eq!(head.cast_const(), &b as *const TestNode);
2565        // SAFETY: head -> b (alive on stack); its link -> a.
2566        let second = unsafe { (*head).next.load(Ordering::Acquire) };
2567        assert_eq!(second.cast_const(), &a as *const TestNode);
2568        // SAFETY: second -> a; its link is null.
2569        assert!(unsafe { (*second).next.load(Ordering::Acquire) }.is_null());
2570        assert!(stack.splice().is_null(), "splice emptied the stack");
2571    }
2572
2573    #[test]
2574    fn stack_concurrent_add_loses_no_nodes() {
2575        // N threads each concurrently push K distinct nodes; after all join, a
2576        // single splice must return exactly N*K nodes. A broken CAS (lost
2577        // update / ABA) would drop nodes and the count would be short. Push is
2578        // deterministic (each node pushed once), so this never hangs.
2579        const THREADS: usize = 8;
2580        const PER_THREAD: usize = 2000;
2581        let nodes: Vec<TestNode> = (0..THREADS * PER_THREAD).map(|_| TestNode::new()).collect();
2582        let stack = TreiberStack::new();
2583
2584        std::thread::scope(|s| {
2585            for chunk in nodes.chunks(PER_THREAD) {
2586                let stack = &stack;
2587                s.spawn(move || {
2588                    for n in chunk {
2589                        stack.add(n as *const TestNode as *mut TestNode);
2590                    }
2591                });
2592            }
2593        });
2594
2595        // All pushes complete (scope joined). Drain and count distinct nodes.
2596        let mut seen = std::collections::HashSet::new();
2597        let mut cur = stack.splice();
2598        while !cur.is_null() {
2599            assert!(seen.insert(cur), "node observed twice");
2600            // SAFETY: cur is one of the live `nodes`; next is its link.
2601            cur = unsafe { (*cur).next.load(Ordering::Acquire) };
2602        }
2603        assert_eq!(
2604            seen.len(),
2605            THREADS * PER_THREAD,
2606            "no node lost under contention"
2607        );
2608    }
2609
2610    #[test]
2611    fn engine_runs_and_produces_latency_samples() {
2612        // Small topology; run until ~50 work cycles complete, then stop. The
2613        // test completing proves the shutdown does not hang (a scope-join
2614        // shutdown bug would block here until the nextest timeout). Stop is
2615        // event-driven: spin on the shared progress counter, not a sleep.
2616        let config = SchbenchConfig {
2617            message_threads: 1,
2618            worker_threads: 2,
2619            cache_footprint_kib: 16,
2620            operations: 1,
2621            sleep_usec: 0,
2622            skip_locking: false,
2623            requests_per_sec: 0,
2624            auto_rps: 0,
2625            split_percent: None,
2626            pipe_transfer_bytes: 0,
2627        };
2628        let stop = AtomicBool::new(false);
2629        let progress = AtomicU64::new(0);
2630        let outcome = std::thread::scope(|s| {
2631            let runner = s.spawn(|| run(&config, &stop, &progress, None));
2632            while progress.load(Ordering::Relaxed) < 50 {
2633                core::hint::spin_loop();
2634            }
2635            stop.store(true, Ordering::Release);
2636            runner.join().expect("run panicked")
2637        });
2638        let result = &outcome.whole_run;
2639        assert!(
2640            result.loop_count >= 50,
2641            "engine did work: {}",
2642            result.loop_count
2643        );
2644        assert!(result.wakeup.nr_samples > 0, "wakeup samples recorded");
2645        assert!(result.request.nr_samples > 0, "request samples recorded");
2646        assert!(result.achieved_rps > 0.0, "positive achieved rps");
2647        // Non-phasic (phase_epoch None): the only snapshot is the baseline
2648        // epoch 0, which the host discards. No per-phase metrics are emitted.
2649        assert_eq!(
2650            outcome.phases.len(),
2651            1,
2652            "non-phasic => single baseline phase"
2653        );
2654        assert_eq!(outcome.phases[0].0, 0, "the lone phase is BASELINE epoch 0");
2655    }
2656
2657    #[test]
2658    fn engine_terminates_when_lone_worker_stops() {
2659        // Regression for the shutdown deadlock: with a SINGLE worker there is no
2660        // second worker to post the message thread, so a worker that exits on
2661        // `stop` while the message thread is parked in `wait_forever` must wake it
2662        // (the unconditional post at worker_loop's exit), or run() hangs forever.
2663        // Reaching the assertion at all proves run() returned. `sleep_usec` 0
2664        // keeps the lone worker almost always mid-`do_work` when stop fires (the
2665        // deadlock-prone window). engine_runs / engine_splits use 2 workers and
2666        // missed this -- a second worker posting the message thread masks the bug;
2667        // a hang here is the nextest timeout, exactly how the bug first surfaced.
2668        let config = SchbenchConfig {
2669            message_threads: 1,
2670            worker_threads: 1,
2671            cache_footprint_kib: 256,
2672            operations: 5,
2673            sleep_usec: 0,
2674            skip_locking: false,
2675            requests_per_sec: 0,
2676            auto_rps: 0,
2677            split_percent: None,
2678            pipe_transfer_bytes: 0,
2679        };
2680        let stop = AtomicBool::new(false);
2681        let progress = AtomicU64::new(0);
2682        let outcome = std::thread::scope(|s| {
2683            let runner = s.spawn(|| run(&config, &stop, &progress, None));
2684            while progress.load(Ordering::Relaxed) < 10 {
2685                core::hint::spin_loop();
2686            }
2687            stop.store(true, Ordering::Release);
2688            // Deadlocks here on regression: the lone worker exits without waking
2689            // the parked message thread, so this join never returns.
2690            runner.join().expect("run panicked")
2691        });
2692        assert!(
2693            outcome.whole_run.loop_count >= 10,
2694            "engine did work and returned: {}",
2695            outcome.whole_run.loop_count
2696        );
2697    }
2698
2699    #[test]
2700    fn engine_rps_mode_injects_drains_and_terminates() {
2701        // RPS-injector mode (requests_per_sec != 0): a dedicated thread enqueues
2702        // requests round-robin across the workers, which splice + drain their
2703        // queues instead of the message-thread handshake. Verify the RPS path
2704        // runs (loop_count + request samples) and terminates cleanly -- run()
2705        // returns (no deadlock: the RPS thread is the waker, the worker the
2706        // waitee, and shutdown wakes the workers via run_one_message_thread's
2707        // worker-wake + the RPS thread's stop-time fpost-all; a hang is the
2708        // nextest timeout) with no request leak (the post-join free).
2709        let config = SchbenchConfig {
2710            message_threads: 1,
2711            worker_threads: 2,
2712            cache_footprint_kib: 16,
2713            operations: 1,
2714            sleep_usec: 0,
2715            skip_locking: false,
2716            requests_per_sec: 10_000,
2717            auto_rps: 0,
2718            split_percent: None,
2719            pipe_transfer_bytes: 0,
2720        };
2721        let stop = AtomicBool::new(false);
2722        let progress = AtomicU64::new(0);
2723        let outcome = std::thread::scope(|s| {
2724            let runner = s.spawn(|| run(&config, &stop, &progress, None));
2725            while progress.load(Ordering::Relaxed) < 50 {
2726                core::hint::spin_loop();
2727            }
2728            stop.store(true, Ordering::Release);
2729            // Deadlocks here on regression: a worker parked in rps_wait that the
2730            // shutdown path fails to wake, or the RPS thread failing to observe
2731            // stop, would hang this join.
2732            runner.join().expect("run panicked")
2733        });
2734        let result = &outcome.whole_run;
2735        assert!(
2736            result.loop_count >= 50,
2737            "RPS workers serviced requests: {}",
2738            result.loop_count
2739        );
2740        assert!(
2741            result.request.nr_samples > 0,
2742            "RPS request-latency samples recorded"
2743        );
2744    }
2745
2746    #[test]
2747    fn rps_injector_pacing_sleep_is_bounded_for_prompt_shutdown() {
2748        // requests_per_sec=1, message_threads=1 => per-thread inter-arrival
2749        // interval ~1s. A stop set while the injector is in that paced sleep must be
2750        // observed within STOP_POLL_QUANTUM_NS (50 ms), so run()'s message-thread
2751        // join completes well under one interval. An unbounded pacing sleep would
2752        // block the join ~1s. 500 ms is 10x the quantum (robust to scheduling
2753        // jitter) and half the interval (cleanly distinguishes bounded from
2754        // unbounded). The 20 ms warmup reaches the long paced sleep before stop (at
2755        // 1 req/s a progress-spin could outlast the test).
2756        let config = SchbenchConfig {
2757            message_threads: 1,
2758            worker_threads: 1,
2759            cache_footprint_kib: 16,
2760            operations: 1,
2761            sleep_usec: 0,
2762            skip_locking: false,
2763            requests_per_sec: 1, // interval = 1s / 1 = 1s
2764            auto_rps: 0,
2765            split_percent: None,
2766            pipe_transfer_bytes: 0,
2767        };
2768        let stop = AtomicBool::new(false);
2769        let progress = AtomicU64::new(0);
2770        let start = std::time::Instant::now();
2771        std::thread::scope(|s| {
2772            let runner = s.spawn(|| run(&config, &stop, &progress, None));
2773            std::thread::sleep(std::time::Duration::from_millis(20));
2774            stop.store(true, Ordering::Release);
2775            let _ = runner.join().expect("run panicked");
2776        });
2777        let elapsed = start.elapsed();
2778        assert!(
2779            elapsed < std::time::Duration::from_millis(500),
2780            "RPS injector joined in {elapsed:?}; the paced sleep must be bounded \
2781             (interval ~1s — an unbounded sleep would block the join that long)",
2782        );
2783    }
2784
2785    #[test]
2786    fn engine_rps_below_message_threads_falls_to_default() {
2787        // Regression: when the -R total is below the message-thread count,
2788        // the per-thread rate (rps_per_message_thread) rounds to 0, and the
2789        // engine must run the DEFAULT message-handshake mode -- workers do real
2790        // wake-cycle work -- matching schbench's startup-divide-before-gate. NOT
2791        // a zero-rate RPS mode (which would leave the workers parked with no
2792        // requests, never advancing progress -- the spin below would then hang,
2793        // caught by the nextest timeout).
2794        let config = SchbenchConfig {
2795            message_threads: 2,
2796            worker_threads: 1,
2797            cache_footprint_kib: 16,
2798            operations: 1,
2799            sleep_usec: 0,
2800            skip_locking: false,
2801            requests_per_sec: 1, // 1 / 2 message threads = 0 per thread -> default
2802            auto_rps: 0,
2803            split_percent: None,
2804            pipe_transfer_bytes: 0,
2805        };
2806        let stop = AtomicBool::new(false);
2807        let progress = AtomicU64::new(0);
2808        let outcome = std::thread::scope(|s| {
2809            let runner = s.spawn(|| run(&config, &stop, &progress, None));
2810            while progress.load(Ordering::Relaxed) < 50 {
2811                core::hint::spin_loop();
2812            }
2813            stop.store(true, Ordering::Release);
2814            runner.join().expect("run panicked")
2815        });
2816        assert!(
2817            outcome.whole_run.loop_count >= 50,
2818            "sub-1-per-thread RPS ran default-mode work: {}",
2819            outcome.whole_run.loop_count
2820        );
2821    }
2822
2823    #[test]
2824    fn auto_scale_grows_when_idle_and_shrinks_when_busy() {
2825        // Pure scaling math (no /proc/stat I/O) at injected busy levels, matching
2826        // schbench's auto_scale_rps body (`schbench.c:1203-1250`).
2827
2828        // Far under target: delta = 80/10 = 8, capped to 3 -> ceil(100*3) = 300.
2829        // Not a near-target transition, so just_hit is false (the >3 cap branch
2830        // never trips target_hit).
2831        let rate = AtomicUsize::new(100);
2832        let hit = AtomicBool::new(false);
2833        assert!(!scale_rps_for_busy(10.0, &rate, &hit, 80));
2834        assert_eq!(rate.load(Ordering::Acquire), 300);
2835        assert!(!hit.load(Ordering::Acquire));
2836
2837        // Far over target: delta = 20/100 = 0.2, clamped to 0.3 -> floor(300*0.3)
2838        // = 90. Also not a near-target transition.
2839        let rate = AtomicUsize::new(300);
2840        let hit = AtomicBool::new(false);
2841        assert!(!scale_rps_for_busy(100.0, &rate, &hit, 20));
2842        assert_eq!(rate.load(Ordering::Acquire), 90);
2843        assert!(!hit.load(Ordering::Acquire));
2844
2845        // Exactly on target: rate held, target_hit set on the first arrival
2846        // (`schbench.c:1243-1248`).
2847        let rate = AtomicUsize::new(123);
2848        let hit = AtomicBool::new(false);
2849        assert!(scale_rps_for_busy(50.0, &rate, &hit, 50));
2850        assert_eq!(rate.load(Ordering::Acquire), 123);
2851        assert!(hit.load(Ordering::Acquire));
2852    }
2853
2854    #[test]
2855    fn auto_scale_target_hit_uses_post_damping_delta() {
2856        // The near-target test reads the delta AFTER its damping reassignment
2857        // (`schbench.c:1209-1213` grow, `:1232-1236` shrink) -- the bug the
2858        // pre-reassignment check had. These cases trip target_hit ONLY under the
2859        // correct post-damping order.
2860
2861        // Grow band: busy 90, target 100 -> delta 1.111 (NOT < 1.05), damped to
2862        // 1 + 0.111/8 = 1.0139 (< 1.05) -> target_hit. A pre-damping check on
2863        // 1.111 would miss it.
2864        let rate = AtomicUsize::new(1000);
2865        let hit = AtomicBool::new(false);
2866        assert!(scale_rps_for_busy(90.0, &rate, &hit, 100));
2867        assert!(hit.load(Ordering::Acquire));
2868
2869        // Shrink band: busy 100, target 95 -> delta 0.95 (NOT > 0.95), damped to
2870        // 0.95 + 0.05/8 = 0.95625 (> 0.95) -> target_hit. A pre-damping check on
2871        // 0.95 would miss it.
2872        let rate = AtomicUsize::new(1000);
2873        let hit = AtomicBool::new(false);
2874        assert!(scale_rps_for_busy(100.0, &rate, &hit, 95));
2875        assert!(hit.load(Ordering::Acquire));
2876
2877        // Already hit: no second transition is reported (just_hit guards on
2878        // !already_hit), even on an exact-target tick.
2879        let rate = AtomicUsize::new(1000);
2880        let hit = AtomicBool::new(true);
2881        assert!(!scale_rps_for_busy(50.0, &rate, &hit, 50));
2882    }
2883
2884    #[test]
2885    fn reset_rps_accumulators_clears_both_in_lockstep() {
2886        // The auto-RPS target-hit reset MUST clear the whole-run AND per-epoch rps
2887        // accumulators together; if a future edit drops one, Σ-per-epoch != whole-run
2888        // (asymmetric-counter-bookkeeping). Pins the lockstep deterministically — the
2889        // engine just_hit path is timing-dependent (the controller must reach target).
2890        let mut whole = PlatStats::default();
2891        whole.add_lat(500);
2892        let mut per_epoch: std::collections::BTreeMap<u32, PlatStats> =
2893            std::collections::BTreeMap::new();
2894        per_epoch.entry(1).or_default().add_lat(500);
2895        reset_rps_accumulators(&mut whole, &mut per_epoch);
2896        assert_eq!(whole.sample_count(), 0, "whole-run rps reset");
2897        assert!(per_epoch.is_empty(), "per-epoch rps reset in lockstep");
2898    }
2899
2900    #[test]
2901    fn control_loop_rps_sample_floors_sub_quantum_window() {
2902        // The bounded teardown sleep can return on `stop` with a tiny dt; a
2903        // count/dt rate is unbounded as dt->0, so a sub-quantum window is dropped
2904        // rather than spiking rps_max / the top rps percentiles (assertable
2905        // metrics). Pins the floor deterministically -- the race that produces the
2906        // tiny dt is timing-dependent, but the math + floor it feeds is pure.
2907        // dt==0 is dropped, not a divide-by-zero:
2908        assert_eq!(control_loop_rps_sample(3, 0), None);
2909        // A 1us / one-completion window would read as ~1e6 cycles/sec -- dropped:
2910        assert_eq!(control_loop_rps_sample(1, 1_000), None);
2911        // Just under one quantum is still dropped:
2912        assert_eq!(control_loop_rps_sample(5, STOP_POLL_QUANTUM_NS - 1), None);
2913        // Exactly one quantum is admitted; the rate is bounded to
2914        // delta_loops * 1e9 / quantum, no inflation: 1 cycle / 50ms = 20/s.
2915        assert_eq!(control_loop_rps_sample(1, STOP_POLL_QUANTUM_NS), Some(20));
2916        // Even a burst of 1000 completions in one quantum caps at 1000*20 =
2917        // 20_000/s -- not the millions an unfloored 1us window would yield.
2918        assert_eq!(
2919            control_loop_rps_sample(1_000, STOP_POLL_QUANTUM_NS),
2920            Some(20_000)
2921        );
2922        // The common case -- a full ~1s window -- is unchanged: 5000/s.
2923        assert_eq!(control_loop_rps_sample(5_000, 1_000_000_000), Some(5_000));
2924    }
2925
2926    #[test]
2927    fn engine_auto_rps_mode_runs_and_terminates() {
2928        // auto_rps != 0 turns on the once-per-second control thread that
2929        // auto-scales the live rate toward the busy target. Seeded with an
2930        // explicit -R so the injector services requests immediately (progress
2931        // hits 50 well before the first 1s control tick), this proves the
2932        // auto-RPS path boots, injects, and run() terminates cleanly (the control
2933        // thread observes stop; a hang is the nextest timeout). The scaling MATH
2934        // is pinned by the scale_rps_for_busy unit tests; host-busy + target-hit
2935        // timing is environment-dependent, so there is no sample-count assertion.
2936        let config = SchbenchConfig {
2937            message_threads: 1,
2938            worker_threads: 2,
2939            cache_footprint_kib: 16,
2940            operations: 1,
2941            sleep_usec: 0,
2942            skip_locking: false,
2943            requests_per_sec: 10_000,
2944            auto_rps: 50,
2945            split_percent: None,
2946            pipe_transfer_bytes: 0,
2947        };
2948        let stop = AtomicBool::new(false);
2949        let progress = AtomicU64::new(0);
2950        let outcome = std::thread::scope(|s| {
2951            let runner = s.spawn(|| run(&config, &stop, &progress, None));
2952            while progress.load(Ordering::Relaxed) < 50 {
2953                core::hint::spin_loop();
2954            }
2955            stop.store(true, Ordering::Release);
2956            runner.join().expect("run panicked")
2957        });
2958        assert!(
2959            outcome.whole_run.loop_count >= 50,
2960            "auto-RPS injector serviced requests: {}",
2961            outcome.whole_run.loop_count
2962        );
2963    }
2964
2965    #[test]
2966    fn engine_splits_stats_across_phase_epochs() {
2967        // Drive the shared phase_epoch 1 -> 2 mid-run and confirm the engine
2968        // partitions its histograms + loop_count into per-epoch snapshots
2969        // (the scx-phase vs detached-EEVDF-phase mechanism). Event-driven via
2970        // the progress counter, no sleeps.
2971        let config = SchbenchConfig {
2972            message_threads: 1,
2973            worker_threads: 2,
2974            cache_footprint_kib: 16,
2975            operations: 1,
2976            sleep_usec: 0,
2977            skip_locking: false,
2978            requests_per_sec: 0,
2979            auto_rps: 0,
2980            split_percent: None,
2981            pipe_transfer_bytes: 0,
2982        };
2983        let stop = AtomicBool::new(false);
2984        let progress = AtomicU64::new(0);
2985        let epoch = AtomicU32::new(1); // start in a real step epoch (phase 1)
2986        let outcome = std::thread::scope(|s| {
2987            let runner = s.spawn(|| run(&config, &stop, &progress, Some(&epoch)));
2988            while progress.load(Ordering::Relaxed) < 50 {
2989                core::hint::spin_loop();
2990            }
2991            let after_phase1 = progress.load(Ordering::Relaxed);
2992            epoch.store(2, Ordering::Release); // transition to phase 2
2993            while progress.load(Ordering::Relaxed) < after_phase1 + 50 {
2994                core::hint::spin_loop();
2995            }
2996            stop.store(true, Ordering::Release);
2997            runner.join().expect("run panicked")
2998        });
2999
3000        let by_epoch: std::collections::BTreeMap<u32, &SchbenchPhaseStats> =
3001            outcome.phases.iter().map(|(e, s)| (*e, s)).collect();
3002        let p1 = by_epoch.get(&1).expect("phase 1 present");
3003        let p2 = by_epoch.get(&2).expect("phase 2 present");
3004        // Both phases ran work cycles (we waited for >=50 cycles in each
3005        // window). loop_count is the robust split proof: latency samples gate
3006        // on delta_us>0, which a sub-µs cycle can miss, but a cycle always
3007        // increments loop_count.
3008        assert!(p1.loop_count > 0, "phase 1 ran cycles: {}", p1.loop_count);
3009        assert!(p2.loop_count > 0, "phase 2 ran cycles: {}", p2.loop_count);
3010        // Per-phase run-delay split is populated per thread class. This guards a
3011        // DISTINCT path the loop_count/histogram asserts don't touch: the
3012        // schedstat raw-pair baseline/re-baseline (phase_ss_start) + the
3013        // per-class Σ fold (worker vs msg). pcount is the deterministic
3014        // denominator — any thread scheduled in the phase accrues pcount>=1
3015        // (sched_info_arrive), and both phases ran cycles (workers dispatched)
3016        // while the msg thread is scheduled on every wake. run_delay_ns itself
3017        // is NOT asserted: it can be 0 on an uncontended host, so a value check
3018        // would flake; pcount>0 pins that the class was measured and folded.
3019        assert!(
3020            p1.worker_pcount > 0,
3021            "phase 1 worker run-delay split populated"
3022        );
3023        assert!(
3024            p2.worker_pcount > 0,
3025            "phase 2 worker run-delay split populated"
3026        );
3027        assert!(p1.msg_pcount > 0, "phase 1 msg run-delay split populated");
3028        assert!(p2.msg_pcount > 0, "phase 2 msg run-delay split populated");
3029        // loop_count partitions across phases, summing to the global progress —
3030        // no cycle is lost or double-counted at the boundary.
3031        let loop_sum: u64 = outcome.phases.iter().map(|(_, s)| s.loop_count).sum();
3032        assert_eq!(
3033            loop_sum, outcome.whole_run.loop_count,
3034            "per-phase loop_count partitions the whole-run count"
3035        );
3036        // The whole-run histogram is EXACTLY the union of the per-phase ones
3037        // (the drained snapshots, recombined), so the per-phase split loses no
3038        // samples vs the whole-run accounting.
3039        let phase_request_sum: u64 = outcome
3040            .phases
3041            .iter()
3042            .map(|(_, s)| s.request.percentiles().nr_samples)
3043            .sum();
3044        assert_eq!(
3045            outcome.whole_run.request.nr_samples, phase_request_sum,
3046            "whole-run request count == Σ per-phase counts"
3047        );
3048        let phase_wakeup_sum: u64 = outcome
3049            .phases
3050            .iter()
3051            .map(|(_, s)| s.wakeup.percentiles().nr_samples)
3052            .sum();
3053        assert_eq!(
3054            outcome.whole_run.wakeup.nr_samples, phase_wakeup_sum,
3055            "whole-run wakeup count == Σ per-phase counts"
3056        );
3057        // Per-phase RPS: the additive invariant (deterministic) + a timing-tolerant
3058        // bound on the brief first phase. These phases run far under the 1s control
3059        // cadence, so phase 1 normally gets ZERO rps ticks (the ~1s tick lands in
3060        // phase 2, epoch already advanced); on a slow/contended host phase 1 may catch
3061        // a single straddle tick, so we tolerate <= 1 rather than a wall-clock-fragile
3062        // == 0. The deterministic empty-rps -> ABSENT contract is pinned by
3063        // derive_rps_distribution_values_and_absent_guard; this bound still catches
3064        // gross mis-attribution of the bulk of rps to the brief first phase.
3065        assert!(
3066            p1.rps.sample_count() <= 1,
3067            "brief phase 1 gets at most one straddle rps tick, got {}",
3068            p1.rps.sample_count()
3069        );
3070        let phase_rps_sum: u64 = outcome
3071            .phases
3072            .iter()
3073            .map(|(_, s)| s.rps.sample_count())
3074            .sum();
3075        assert_eq!(
3076            outcome.whole_run.rps.nr_samples, phase_rps_sum,
3077            "whole-run rps count == Σ per-phase rps counts"
3078        );
3079    }
3080
3081    #[test]
3082    fn engine_per_phase_rps_populates_and_sums_to_whole_run() {
3083        // POPULATED per-phase RPS: the control thread samples once/sec, so a phase
3084        // window >= ~2s gets multiple per-phase rps samples. Time-gated (the 1s
3085        // control cadence is the granularity floor) -- the progress-gated
3086        // engine_splits phases are too short to tick. Drive epoch 1 for ~2.2s then
3087        // epoch 2 for ~2.2s and confirm BOTH epochs carry rps samples AND the
3088        // per-epoch samples sum to the whole-run count (the additive invariant:
3089        // every sample lands in one epoch bucket AND the whole-run histogram).
3090        // The ~4.4s wall-clock is the irreducible cost of exercising the real 1s control
3091        // cadence end-to-end (2 ticks/epoch for robustness against a delayed tick on a
3092        // loaded host); the deterministic per-epoch invariants — the lockstep reset and
3093        // Σ-per-epoch==whole-run — are pinned sleep-free by
3094        // reset_rps_accumulators_clears_both_in_lockstep + engine_splits_stats_across_phase_epochs.
3095        // This test uniquely confirms the LIVE control thread POPULATES both epochs.
3096        let config = SchbenchConfig {
3097            message_threads: 1,
3098            worker_threads: 2,
3099            cache_footprint_kib: 16,
3100            operations: 1,
3101            sleep_usec: 0,
3102            skip_locking: false,
3103            requests_per_sec: 0,
3104            auto_rps: 0,
3105            split_percent: None,
3106            pipe_transfer_bytes: 0,
3107        };
3108        let stop = AtomicBool::new(false);
3109        let progress = AtomicU64::new(0);
3110        let epoch = AtomicU32::new(1);
3111        let outcome = std::thread::scope(|s| {
3112            let runner = s.spawn(|| run(&config, &stop, &progress, Some(&epoch)));
3113            // ~2.2s per epoch => >= 2 control ticks each (first tick at ~1s).
3114            std::thread::sleep(std::time::Duration::from_millis(2200));
3115            epoch.store(2, Ordering::Release);
3116            std::thread::sleep(std::time::Duration::from_millis(2200));
3117            stop.store(true, Ordering::Release);
3118            runner.join().expect("run panicked")
3119        });
3120        let by_epoch: std::collections::BTreeMap<u32, &SchbenchPhaseStats> =
3121            outcome.phases.iter().map(|(e, s)| (*e, s)).collect();
3122        let p1 = by_epoch.get(&1).expect("phase 1 present");
3123        let p2 = by_epoch.get(&2).expect("phase 2 present");
3124        assert!(
3125            p1.rps.sample_count() > 0,
3126            "phase 1 per-phase rps populated: {}",
3127            p1.rps.sample_count()
3128        );
3129        assert!(
3130            p2.rps.sample_count() > 0,
3131            "phase 2 per-phase rps populated: {}",
3132            p2.rps.sample_count()
3133        );
3134        // Additive invariant (no auto-RPS reset in default mode, so EXACT): Σ
3135        // per-epoch rps samples == whole-run rps samples.
3136        let phase_rps_sum: u64 = outcome
3137            .phases
3138            .iter()
3139            .map(|(_, s)| s.rps.sample_count())
3140            .sum();
3141        assert_eq!(
3142            phase_rps_sum, outcome.whole_run.rps.nr_samples,
3143            "Σ per-epoch rps samples == whole-run rps samples"
3144        );
3145    }
3146
3147    #[test]
3148    fn schbench_config_serde_roundtrips() {
3149        // The new serialized type roundtrips unchanged.
3150        let cfg = SchbenchConfig::default()
3151            .message_threads(3)
3152            .worker_threads(7)
3153            .cache_footprint_kib(512)
3154            .operations(9)
3155            .sleep_usec(250)
3156            .skip_locking(true)
3157            // Exercise the new field's Some(p) serde surface, not just the None
3158            // default (the API-review roundtrip requirement).
3159            .split_percent(Some(33))
3160            // Exercise the pipe field's non-default serde surface too.
3161            .pipe_transfer_bytes(4096);
3162        let json = serde_json::to_string(&cfg).expect("SchbenchConfig must serialize");
3163        let back: SchbenchConfig =
3164            serde_json::from_str(&json).expect("SchbenchConfig must deserialize");
3165        assert_eq!(cfg, back, "config roundtrips unchanged");
3166    }
3167
3168    #[test]
3169    fn worktype_schbench_registration_and_serde() {
3170        use crate::workload::WorkType;
3171        let wt = WorkType::schbench(
3172            SchbenchConfig::default()
3173                .message_threads(2)
3174                .worker_threads(4),
3175        );
3176        assert_eq!(wt.name(), "Schbench");
3177        // from_name yields the default-config variant.
3178        assert_eq!(
3179            WorkType::from_name("Schbench"),
3180            Some(WorkType::Schbench {
3181                config: SchbenchConfig::default()
3182            })
3183        );
3184        // The variant serde-roundtrips, carrying its config.
3185        let json = serde_json::to_string(&wt).expect("WorkType::Schbench must serialize");
3186        let back: WorkType =
3187            serde_json::from_str(&json).expect("WorkType::Schbench must deserialize");
3188        assert_eq!(wt, back);
3189    }
3190
3191    #[test]
3192    fn schbench_config_reachable_via_prelude() {
3193        // Regression-pin the prelude placement: test authors construct the
3194        // config via `use ktstr::prelude::*`. Dropping SchbenchConfig from the
3195        // prelude would fail this compile. Also exercises the Eq derive.
3196        let cfg: crate::prelude::SchbenchConfig = crate::prelude::SchbenchConfig::default();
3197        assert_eq!(cfg, SchbenchConfig::default());
3198    }
3199
3200    #[test]
3201    fn read_schedstat_raw_parses_own_and_handles_missing() {
3202        // The current thread has been scheduled, so its /proc/<tid>/schedstat
3203        // parses without panic into a (run_delay, pcount) pair.
3204        let _own = read_schedstat_raw(gettid_self());
3205        // A non-existent tid -> (0,0) via the file-read-failure path (thread
3206        // exited), matching schbench's fopen-failure handling. The parse
3207        // boundaries (incl. the pcount==0 mean guard) are covered below.
3208        assert_eq!(
3209            read_schedstat_raw(-1),
3210            (0, 0),
3211            "absent schedstat yields (0,0), no panic"
3212        );
3213    }
3214
3215    #[test]
3216    fn schedstat_raw_parse_and_mean_pcount_guard() {
3217        // Raw parse keeps run_delay + pcount undivided (the re-poolable pair).
3218        assert_eq!(parse_schedstat_raw("123456 50 5"), (50, 5));
3219        // mean_sched_delay divides with the pcount==0 guard (no div-by-zero).
3220        assert_eq!(mean_sched_delay((50, 5)), 10); // run_delay/pcount
3221        assert_eq!(mean_sched_delay((50, 0)), 0); // pcount==0 guard
3222        assert_eq!(mean_sched_delay(parse_schedstat_raw("0 0 0")), 0); // !sched_info_on()
3223        assert_eq!(mean_sched_delay((0, 5)), 0); // 0 run_delay -> 0 mean
3224    }
3225
3226    #[test]
3227    #[should_panic(expected = "schedstat field 3")]
3228    fn parse_schedstat_raw_short_line_panics() {
3229        // A present-but-short line is a kernel/parse bug: fail loud, not a
3230        // silent 0 (the fail-loud ruling).
3231        parse_schedstat_raw("100 50");
3232    }
3233
3234    #[test]
3235    #[should_panic(expected = "schedstat field 2")]
3236    fn parse_schedstat_raw_nonnumeric_panics() {
3237        parse_schedstat_raw("alpha beta gamma");
3238    }
3239}