ktstr/workload/schbench/run.rs
1//! schbench's run engine, ported faithfully from `schbench.c`: the
2//! message-thread / worker-thread topology, the lockless wait-list, the
3//! handshake-driven wakeup-latency loop, and the matrix work under the
4//! per-CPU lock. The default (non-RPS) mode is the wakeup-latency +
5//! request-latency benchmark, with per-phase schedstat (run-delay) capture; the
6//! RPS-injector mode (`-R`) and its auto-RPS closed-loop rate control (`-A`)
7//! layer on the request queue + once-per-second control thread; pipe mode (`-p`)
8//! swaps the matrix work for a memory-transfer simulation over the same handshake.
9//!
10//! # Topology (`schbench.c` `message_thread` :1540, `worker_thread` :1419)
11//!
12//! `message_threads` message threads each spawn `worker_threads` worker
13//! threads in one process (schbench is single-process pthreads; schbench_rs
14//! re-expresses that with [`std::thread`], so the
15//! [`handshake`](super::handshake) futex is PRIVATE and the per-CPU locks +
16//! matrices live in one address space). A worker loops: block until its
17//! message thread wakes it (measuring wakeup latency), then think-sleep +
18//! matrix work under the per-CPU lock (measuring request latency). The message
19//! thread batch-wakes all waiting workers (`run_msg_thread` :1166 /
20//! `xlist_wake_all` :969).
21//!
22//! # Fidelity
23//!
24//! - Clock: `CLOCK_MONOTONIC` (ruling), not schbench's `gettimeofday`
25//! wall-clock. Monotonic is freeze-robust (a host-side VM pause cannot make
26//! a delta go negative) and is the correct source for a latency delta. The
27//! measured quantity -- elapsed nanoseconds between two reads -- is identical
28//! to what schbench measures; only the clock id differs.
29//! - The lockless wait-list is a Treiber stack ([`TreiberStack`]) over an
30//! intrusive next-pointer, matching schbench's `xlist_add`/`xlist_splice`
31//! cmpxchg list (`schbench.c:866-896`): userspace CAS only, no lock/syscall
32//! on the hot path, so the syscall profile stays futex-dominated like
33//! schbench's. (schbench hand-duplicates this for its thread list and its
34//! request list; schbench_rs shares one generic implementation.)
35//! - do_work timing: the matrix multiply compiles to the SAME scalar inner loop
36//! as schbench's `-O2` `do_some_math` -- a `u64` multiply-accumulate
37//! (`imul` with a fused memory operand, then `add`), NOT SIMD. It does not
38//! auto-vectorize: there is no vector `u64` multiply on the build target
39//! (`x86-64-v3`; `vpmullq` is AVX-512) and the serial accumulator reduction
40//! blocks SLP -- exactly as for schbench's `-O2` build (its Makefile
41//! `CFLAGS = -Wall -O2`, no `-march`, likewise scalar; verified by disassembly
42//! of both). A debug build (opt-level 0) emits far more overhead (no register
43//! allocation, unrolling, or memory-operand fusion), so request latency runs
44//! higher than reference schbench's. ABSOLUTE side-by-side fidelity vs the
45//! reference (the `ktstr-schbench-validate` comparison) therefore needs a
46//! release build. The in-ktstr A/B comparison (perf-delta / per-phase claims)
47//! is build-invariant -- both sides use the same build -- so it is unaffected.
48
49use core::cell::UnsafeCell;
50use core::ptr;
51use core::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicU64, AtomicUsize, Ordering};
52use std::fs::File;
53use std::io::{Read, Seek, SeekFrom};
54
55use super::percpu_lock::PerCpuLocks;
56use super::plat::{Percentiles, PlatStats};
57
58/// Read `CLOCK_MONOTONIC` as nanoseconds (ruling: monotonic, not wall-clock).
59/// Self-contained so the schbench modules build into the standalone validation
60/// binary without reaching into `worker`'s `pub(super)` clock wrapper.
61fn monotonic_nanos() -> u64 {
62 // SAFETY: `clock_gettime` writes a `timespec` through the out-pointer and
63 // reads nothing else; CLOCK_MONOTONIC is always available on Linux.
64 let mut ts: libc::timespec = unsafe { core::mem::zeroed() };
65 let rc = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
66 assert_eq!(rc, 0, "clock_gettime(CLOCK_MONOTONIC) failed");
67 (ts.tv_sec as u64) * 1_000_000_000 + ts.tv_nsec as u64
68}
69
/// A node that can be linked into a [`TreiberStack`] via an intrusive
/// next-pointer it owns.
///
/// The stack keeps no per-node storage of its own: nodes are chained together
/// solely through the [`AtomicPtr`] each implementor hands out here.
trait Linked: Sized {
    /// Borrow this node's intrusive link: the pointer to the next node in the
    /// chain the node currently belongs to.
    fn next_link(&self) -> &AtomicPtr<Self>;
}
75
76/// Lockless LIFO stack via an intrusive next-pointer, the shared form of
77/// schbench's `xlist_add`/`xlist_splice` (`schbench.c:866-896`). Nodes are
78/// referenced by raw pointer and must outlive every concurrent operation; the
79/// caller owns their storage and lifetimes.
80struct TreiberStack<T: Linked> {
81 head: AtomicPtr<T>,
82}
83
84impl<T: Linked> TreiberStack<T> {
85 fn new() -> Self {
86 Self {
87 head: AtomicPtr::new(ptr::null_mut()),
88 }
89 }
90
91 /// Push `node` onto the stack. Faithful port of `xlist_add`
92 /// (`schbench.c:866`): set the node's link to the current head, then
93 /// CAS-publish it, retrying on contention. The caller must not push a node
94 /// that is already on the stack (its link would be overwritten while
95 /// reachable) -- schbench upholds this (a worker re-queues only after being
96 /// spliced off and woken).
97 fn add(&self, node: *mut T) {
98 // SAFETY: `node` is a valid pointer to a `T` that outlives all list
99 // operations (caller contract).
100 let link = unsafe { (*node).next_link() };
101 loop {
102 let old = self.head.load(Ordering::Acquire);
103 link.store(old, Ordering::Relaxed);
104 if self
105 .head
106 .compare_exchange(old, node, Ordering::AcqRel, Ordering::Acquire)
107 .is_ok()
108 {
109 return;
110 }
111 }
112 }
113
114 /// Atomically take the whole stack, leaving it empty, returning the head
115 /// (null if empty). Faithful port of `xlist_splice` (`schbench.c:884`): an
116 /// atomic swap to null (the single-op equivalent of schbench's CAS loop).
117 /// The returned chain is walked via [`Linked::next_link`] in LIFO order
118 /// (schbench's thread-list splice does not reverse; wake order is
119 /// irrelevant).
120 fn splice(&self) -> *mut T {
121 self.head.swap(ptr::null_mut(), Ordering::AcqRel)
122 }
123
124 /// Atomically take the whole stack and return the chain in FIFO (insertion)
125 /// order -- the reverse of [`splice`](Self::splice)'s LIFO. Faithful port of
126 /// `request_splice` (`schbench.c:920-940`): swap to null, then reverse the
127 /// spliced chain so the consumer services the oldest queued request first
128 /// (request order matters, unlike the thread wait-list). Returns null if
129 /// empty. Single-consumer: only the owning worker drains its own queue.
130 fn splice_reversed(&self) -> *mut T {
131 let mut cur = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
132 let mut rev: *mut T = ptr::null_mut();
133 while !cur.is_null() {
134 // SAFETY: `cur` is a node that was published on the stack and whose
135 // storage the caller owns; the single consumer reverses the chain it
136 // just took exclusive ownership of, so no other thread touches these
137 // links concurrently.
138 let link = unsafe { (*cur).next_link() };
139 let next = link.load(Ordering::Acquire);
140 link.store(rev, Ordering::Relaxed);
141 rev = cur;
142 cur = next;
143 }
144 rev
145 }
146}
147
148/// A queued work request in RPS-injector mode (`schbench.c` `struct request`,
149/// `:757`). The RPS thread heap-allocates one (`Box`) per request and links it
150/// onto the target worker's queue; the worker drains its queue and frees each --
151/// matching schbench's per-request malloc/free (part of the syscall profile the
152/// standalone validation compares). schbench's `start_time` field is set
153/// (`:951`) but never read, so it is omitted: the wakeup latency is measured
154/// from the worker's `wake_time` (stamped by the RPS thread at enqueue, and by
155/// the worker itself before it blocks -- see [`rps_wait`]) and the request
156/// latency from a local work-start -- neither reads `start_time`.
157/// Omitting it also drops schbench's per-request `gettimeofday` for that dead
158/// field (`:951`) -- one fewer clock syscall per request, consistent with the
159/// engine's `CLOCK_MONOTONIC`-vs-`gettimeofday` clock divergence.
160struct Request {
161 /// Intrusive link for the per-worker request [`TreiberStack`].
162 next: AtomicPtr<Request>,
163}
164
165impl Request {
166 fn new() -> Self {
167 Self {
168 next: AtomicPtr::new(ptr::null_mut()),
169 }
170 }
171}
172
173impl Linked for Request {
174 fn next_link(&self) -> &AtomicPtr<Self> {
175 &self.next
176 }
177}
178
179/// Declarative config for the [`Schbench`](crate::workload::WorkType::Schbench)
180/// workload. Construct via [`SchbenchConfig::default`] (schbench's own
181/// defaults) plus the chainable setters, e.g.
182/// `SchbenchConfig::default().message_threads(2).worker_threads(4)`. Derives
183/// Clone/Debug/PartialEq/Eq/Hash/serde; the builder shape follows
184/// [`WorkloadConfig`](crate::workload::WorkloadConfig), but `Eq`+`Hash` (which
185/// `WorkloadConfig` and `WorkSpec` omit because of their transitive `f64`) are
186/// available here since every field is integer/bool -- the ktstr f64-free
187/// leaf-config convention.
188///
189/// # schbench(8) CLI parity
190///
191/// This port re-expresses schbench's default (matrix-work) mode natively, so
192/// its tunables are config fields and topology rather than CLI flags. The
193/// mapping to schbench's option table (`schbench.c:138-187`):
194///
195/// | schbench flag | ktstr |
196/// |---|---|
197/// | `-m` message-threads | `message_threads` |
198/// | `-t` threads | `worker_threads` (workers per message thread; `0` = `ceil(cpuset_cpus / message_threads)`, see below) |
199/// | `-F` cache_footprint | `cache_footprint_kib` |
200/// | `-n` operations | `operations` |
201/// | `-s` sleep_usec | `sleep_usec` |
202/// | `-L` no-locking | `skip_locking` |
203/// | `-R` rps | `requests_per_sec` |
204/// | `-A` auto-rps | `auto_rps` |
205/// | `--split` (long-only) | `split_percent` (`None` = no split, all-private) |
206/// | `-p` pipe (also `--pipe`) | `pipe_transfer_bytes` (`0` = off; memory-transfer mode, no matrix work) |
207/// | `-r` runtime | in-VM: the scenario engine's run window (the engine runs until `stop`); host-side: the `run_secs` argument to [`run_standalone`](crate::workload::run_standalone) |
208///
209/// ## Set by ktstr topology, not a flag
210///
211/// - `-t` default: with `worker_threads = 0`, ktstr matches schbench's `-t`
212/// 0-default -- it divides the CPU count across the message threads,
213/// `ceil(cpus / message_threads)` per thread (`schbench.c:1849-1852`), so the
214/// total worker count stays near the CPU count. ktstr scopes "cpus" to the
215/// allocated guest cpuset (the worker's `sched_getaffinity` mask, set by the
216/// scenario's topology / `CgroupDef`) rather than schbench's `get_nprocs`, so
217/// the total is ≈ the cpuset's CPU count. An explicit non-zero `worker_threads`
218/// is workers-per-message-thread in both.
219/// - `-M` (message-cpus) / `-W` (worker-cpus) thread pinning: ktstr places
220/// threads through its affinity / cpuset layer, so there is deliberately no
221/// per-thread-pin knob.
222///
223/// ## Observability flags -> the metric API
224///
225/// schbench's `-w` (warmuptime), `-i` (intervaltime), `-z` (zerotime), `-j`
226/// (json), and `-J` (jobname) shape its streaming stderr/JSON report. ktstr's
227/// numbers flow through the metric API instead -- per-phase attribution and the
228/// sidecar -- so these have no flag equivalent. `ktstr-schbench-validate`
229/// reproduces schbench's stderr-table shape for a side-by-side comparison.
230///
231/// ## Split mode (`--split`)
232///
233/// `Some(p)` partitions `cache_footprint_kib` into a per-thread private matrix
234/// (`p`%) and ONE process-global shared matrix (`100-p`%) that every worker
235/// multiplies into concurrently, reproducing schbench's cross-core
236/// shared-working-set cache contention (`schbench.c:1390-1404`, `:1858-1863`).
237/// ktstr models the shared matrix with `AtomicU64` `Relaxed` accesses. Like
238/// schbench's emitted code, the shared kernel keeps the running sum in a register
239/// and STORES it to each shared C cell on every inner (`k`) iteration -- C is
240/// write-only in the loop (A and B are loaded each `k`, C is never reloaded), and
241/// that per-k store is what generates the contention. Both gcc and clang keep the
242/// per-k store: `do_some_math` reads `m1`/`m2`/`m3` as offsets into one base
243/// pointer, so neither can prove the `m3` store doesn't alias the next `k`'s
244/// `m1`/`m2` loads. On x86-64 a `Relaxed` load/store lowers to a plain `MOV` (no
245/// `LOCK`), so the contention is identical to schbench's plain shared-memory race
246/// -- but sound (atomics, no data race), with zero unsafe. `None` (default) is
247/// the legacy all-private single matrix.
248///
249/// ## Pipe mode (`-p`)
250///
251/// `pipe_transfer_bytes > 0` REPLACES the matrix workload with schbench's
252/// memory-transfer simulation (`schbench.c:177`, `pipe_test`). It rides the
253/// message-handshake path: the message thread memsets each woken worker's
254/// per-thread page to `1` (`schbench.c:980-981`) and the worker memsets its own
255/// page to `2` before blocking (`schbench.c:1003-1004`), `pipe_transfer_bytes`
256/// bytes each per handshake cycle (clamped to 1 MiB, `PIPE_TRANSFER_BUFFER`).
257/// `do_work` and the think-sleep are skipped (`schbench.c:1448`), so the only
258/// per-cycle work is the wakeup handshake + the two memsets; the report is the
259/// PER-WORKER memory-transfer throughput (`avg worker transfer` = the aggregate
260/// rate divided by the worker count, `schbench.c:1697,1942-1943,1979`) alongside
261/// the wakeup-latency table, not request latency.
262///
263/// ktstr does NOT compose `-p` with `-R`: in pipe mode it always runs the
264/// message-handshake waker (so BOTH pipe memsets fire — a full transfer) and never
265/// starts the RPS injector. schbench instead COMPOSES them, half-broken: it has no
266/// precedence (`-R` alone picks the waker, `schbench.c:1594`), so `-p -R` runs the
267/// RPS injector while the worker-side memset still fires unconditionally
268/// (`schbench.c:1003-1004`) but the waker-side memset — which lives only in
269/// `xlist_wake_all` (`schbench.c:980-981`) — does not, yielding a degenerate
270/// half-pipe. ktstr's full pipe is the more faithful `-p` behavior; the realistic
271/// use is `-p` without `-R`. schbench also zeroes warmuptime in pipe mode
272/// (`schbench.c:296`); ktstr has no warmuptime concept, so that is a no-op here.
273///
274/// ## Modes not ported
275///
276/// - `-C` (calibrate): a tuning aid that times schbench's own work loop and
277/// forces `-L` (`schbench.c:166`, `:389`). Intentionally out of scope -- ktstr
278/// measures through the metric path.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SchbenchConfig {
    /// Number of message threads (`schbench.c` `-m`, default 1). The derived
    /// per-message-thread quantities (`rps_per_message_thread`,
    /// `resolve_worker_count`) floor this at 1 before dividing, so a 0 here
    /// never becomes a zero divisor.
    pub message_threads: usize,
    /// Worker threads per message thread (`schbench.c` `-t`). 0 resolves to
    /// `ceil(cpuset_cpus / message_threads)` -- the CPU count of the allocated
    /// guest cpuset (the worker's `sched_getaffinity` mask, per ruling) divided
    /// across the message threads, matching schbench's 0-default
    /// (`schbench.c:1849-1852`) scoped to the cpuset rather than `get_nprocs`.
    /// See `resolve_worker_count` and the CLI-parity section above.
    pub worker_threads: usize,
    /// Per-worker matrix cache footprint in KiB (`schbench.c` `-F`, default
    /// 256); sets the matrix dimension. 0 yields a 0 dimension (no matrix
    /// work), see `matrix_size`.
    pub cache_footprint_kib: usize,
    /// Matrix multiplications per work cycle (`schbench.c` `-n`, default 5).
    /// 0 yields a 0 matrix dimension (no matrix work), see `matrix_size`.
    pub operations: usize,
    /// Think-time sleep before the matrix work, microseconds (`schbench.c`
    /// `-s`, default 100); simulates networking. 0 disables.
    pub sleep_usec: u64,
    /// Skip the per-CPU lock around the matrix work (`schbench.c` `-L`,
    /// default false: locking on).
    pub skip_locking: bool,
    /// Fixed request rate, requests/second (`schbench.c` `-R`, default 0 = off).
    /// 0 selects the default message-handshake mode (each worker is woken by its
    /// message thread); non-zero switches to the RPS-injector mode, where a
    /// dedicated thread enqueues `requests_per_sec` requests/second round-robin
    /// across the workers (`schbench.c` `run_rps_thread`, `:1258`). The mode is
    /// gated on the PER-MESSAGE-THREAD quotient (`rps_per_message_thread`), so
    /// a total smaller than `message_threads` rounds to 0 and stays in the
    /// default mode.
    pub requests_per_sec: usize,
    /// Auto-RPS target CPU-busy percentage (`schbench.c` `-A`, default 0 = off).
    /// Non-zero turns on closed-loop rate control: a once-per-second control
    /// thread grows/shrinks the live request rate toward this host-busy% target
    /// (`schbench.c` `auto_scale_rps`, `:1180`). Setting it seeds the rate to 10
    /// when `requests_per_sec` is 0 (`schbench.c:286`), so auto-RPS starts low
    /// and climbs.
    pub auto_rps: usize,
    /// Percent of the cache footprint that is PRIVATE per worker thread
    /// (`schbench.c` `--split`, long-only, 0-100). `None` = no split:
    /// schbench's legacy all-private single matrix (`schbench.c:1405-1408`,
    /// `:1879-1880`). `Some(p)` partitions `cache_footprint_kib` into a
    /// per-thread private matrix (`p`%) and ONE process-global shared matrix
    /// (`100-p`%) that every worker multiplies into concurrently, reproducing
    /// schbench's cross-core shared-working-set cache contention
    /// (`schbench.c:1390-1404`, `:1858-1863`). `Some(0)` = all shared,
    /// `Some(100)` = all private (same matrix sizes as `None`, but routed
    /// through the split branch, matching schbench's `split_specified` path).
    /// Out-of-range `Some(p > 100)` panics when the engine consumes it
    /// (`schbench.c:362-365` exits on the same); the builder also debug-asserts
    /// the bound. Note the field is public and deserializable, so a value can
    /// reach the engine without ever passing through the builder's check.
    pub split_percent: Option<usize>,
    /// Pipe-mode transfer size in bytes (`schbench.c` `-p`/`--pipe`, default 0 =
    /// off, clamped to 1 MiB `PIPE_TRANSFER_BUFFER`). Non-zero REPLACES the
    /// matrix workload with schbench's memory-transfer simulation: the message
    /// thread memsets each woken worker's per-thread page to `1` and the worker
    /// memsets its own page to `2` (`schbench.c:980-981`/`:1003-1004`),
    /// `pipe_transfer_bytes` bytes each per cycle, while `do_work` + the
    /// think-sleep are skipped (`schbench.c:1448`). Reports PER-WORKER
    /// memory-transfer throughput (`avg worker transfer`) rather than request
    /// latency.
    pub pipe_transfer_bytes: usize,
}
340
/// Size of the per-thread pipe page: 1 MiB, schbench's `PIPE_TRANSFER_BUFFER`
/// (`schbench.c:41`). A `pipe_transfer_bytes` above this is clamped down to it
/// (schbench clamps and warns, `schbench.c:291-294`).
pub(crate) const PIPE_TRANSFER_BUFFER: usize = 1 << 20;
345
/// Upper bound, in nanoseconds, on one RPS-injector pacing sleep (50 ms).
///
/// `run_rps_thread` runs synchronously on the message thread that `run` joins
/// on shutdown. Without a cap, a paced sleep could hold teardown back by up to
/// one whole inter-arrival interval (1s / rps), which at a degenerate low
/// `requests_per_sec` can exceed the scenario cleanup/watchdog budget.
/// Limiting each sleep to this quantum means `stop` is re-checked at least
/// this often, which bounds shutdown latency. At realistic rates the interval
/// is far shorter than the quantum, so the wait is a single uncapped sleep --
/// the cap only engages at pathological low rates. (Schbench-local; the
/// sibling taobench engine has its own equivalent, both with self-contained
/// `monotonic_nanos`.)
const STOP_POLL_QUANTUM_NS: u64 = 50 * 1_000_000;
357
/// Pipe-mode (`-p`) throughput summary, i.e. schbench's `avg worker transfer`
/// line (`schbench.c:1979-1982`): the per-worker transfer rate expressed both
/// as ops/sec and as pretty-scaled bytes/sec. Produced by
/// [`pipe_transfer_report`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct PipeTransferReport {
    /// Transfer cycles completed per second, PER WORKER (`loop_count / Σ worker
    /// runtimes`, `schbench.c:1942-1943`).
    pub ops_per_sec: f64,
    /// Per-worker bytes transferred per second, already divided down (÷1024 per
    /// step, `pretty_size`) into the unit named by [`unit`](Self::unit).
    pub scaled: f64,
    /// Unit that `scaled` is expressed in: one of
    /// `B`/`KB`/`MB`/`GB`/`TB`/`PB`/`EB` (`schbench.c:1606`).
    pub unit: &'static str,
}
373
374/// Derive the pipe-mode `avg worker transfer` line from a run's aggregate
375/// `achieved_rps` (completed cycles/sec over the true elapsed window), the
376/// requested `pipe_transfer_bytes`, and the resolved `nr_workers`. The figure is
377/// PER WORKER: schbench divides by `loop_runtime` = Σ each worker's runtime
378/// (`schbench.c:1697` sums `worker->runtime`; `:1942-1943`/`:1979` divide by it),
379/// and `Σ worker runtimes ≈ nr_workers * elapsed`, so the per-worker rate is the
380/// aggregate `achieved_rps / nr_workers` — the label is literally "avg WORKER
381/// transfer". (Dividing the aggregate by wall-clock alone would over-report by
382/// `nr_workers`×.) The transfer size is CLAMPED to `PIPE_TRANSFER_BUFFER` first —
383/// the engine moves only the clamped size per cycle (`run` applies the same
384/// `.min()`), matching schbench's parse-time clamp (`schbench.c:291-294`) — so the
385/// throughput reflects the bytes ACTUALLY moved. Scaling is schbench's
386/// `pretty_size` (`schbench.c:1606`). `nr_workers` is floored at 1 (no division by
387/// zero).
388pub fn pipe_transfer_report(
389 achieved_rps: f64,
390 pipe_transfer_bytes: usize,
391 nr_workers: usize,
392) -> PipeTransferReport {
393 let n = nr_workers.max(1) as f64;
394 let bytes = pipe_transfer_bytes.min(PIPE_TRANSFER_BUFFER);
395 // PER-WORKER: schbench's loops_per_sec/mb_per_sec divide the aggregate by
396 // Σ worker runtimes (≈ nr_workers * elapsed), i.e. `achieved_rps / nr_workers`.
397 let ops_per_sec = achieved_rps / n;
398 let bytes_per_sec = achieved_rps * bytes as f64 / n;
399 let (scaled, unit) = pretty_size(bytes_per_sec);
400 PipeTransferReport {
401 ops_per_sec,
402 scaled,
403 unit,
404 }
405}
406
/// Port of schbench's `pretty_size` (`schbench.c:1606-1620`): repeatedly divide
/// a byte count by 1024, stepping through B/KB/MB/GB/TB/PB/EB, and return the
/// scaled value together with the unit it ended up in.
///
/// Scaling stops as soon as the value drops below 1024 or the last unit (`EB`)
/// is reached, so the unit table is never overrun (schbench's
/// `units[divs + 1] == NULL` break). A value that is not `>= 1024.0` to begin
/// with is returned unchanged in `B`.
fn pretty_size(mut number: f64) -> (f64, &'static str) {
    const UNITS: [&str; 7] = ["B", "KB", "MB", "GB", "TB", "PB", "EB"];
    const LAST: usize = UNITS.len() - 1;

    // Every unit except the last may be stepped past; the last one is terminal.
    for &unit in &UNITS[..LAST] {
        if number >= 1024.0 {
            number /= 1024.0;
        } else {
            return (number, unit);
        }
    }
    (number, UNITS[LAST])
}
419
420impl Default for SchbenchConfig {
421 fn default() -> Self {
422 // schbench defaults (schbench.c option table + globals).
423 Self {
424 message_threads: 1,
425 worker_threads: 0,
426 cache_footprint_kib: 256,
427 operations: 5,
428 sleep_usec: 100,
429 skip_locking: false,
430 requests_per_sec: 0,
431 auto_rps: 0,
432 split_percent: None,
433 pipe_transfer_bytes: 0,
434 }
435 }
436}
437
438impl SchbenchConfig {
439 /// Set the number of message threads (schbench `-m`).
440 #[must_use = "builder methods consume self; bind the result"]
441 pub fn message_threads(mut self, n: usize) -> Self {
442 self.message_threads = n;
443 self
444 }
445 /// Set worker threads per message thread (schbench `-t`); 0 = one per
446 /// allocated CPU.
447 #[must_use = "builder methods consume self; bind the result"]
448 pub fn worker_threads(mut self, n: usize) -> Self {
449 self.worker_threads = n;
450 self
451 }
452 /// Set the per-worker matrix cache footprint in KiB (schbench `-F`).
453 #[must_use = "builder methods consume self; bind the result"]
454 pub fn cache_footprint_kib(mut self, kib: usize) -> Self {
455 self.cache_footprint_kib = kib;
456 self
457 }
458 /// Set the matrix multiplications per work cycle (schbench `-n`).
459 #[must_use = "builder methods consume self; bind the result"]
460 pub fn operations(mut self, n: usize) -> Self {
461 self.operations = n;
462 self
463 }
464 /// Set the think-time sleep in microseconds (schbench `-s`); 0 disables.
465 #[must_use = "builder methods consume self; bind the result"]
466 pub fn sleep_usec(mut self, usec: u64) -> Self {
467 self.sleep_usec = usec;
468 self
469 }
470 /// Skip the per-CPU lock around the matrix work (schbench `-L`).
471 #[must_use = "builder methods consume self; bind the result"]
472 pub fn skip_locking(mut self, skip: bool) -> Self {
473 self.skip_locking = skip;
474 self
475 }
476 /// Set the fixed request rate in requests/second (schbench `-R`); 0 selects
477 /// the default message-handshake mode, non-zero the RPS-injector mode.
478 #[must_use = "builder methods consume self; bind the result"]
479 pub fn requests_per_sec(mut self, rps: usize) -> Self {
480 self.requests_per_sec = rps;
481 self
482 }
483 /// Set the auto-RPS target host-busy percentage (schbench `-A`); 0 disables
484 /// auto-scaling. Non-zero seeds the rate to 10 when `requests_per_sec` is 0.
485 #[must_use = "builder methods consume self; bind the result"]
486 pub fn auto_rps(mut self, target_pct: usize) -> Self {
487 self.auto_rps = target_pct;
488 self
489 }
490 /// Set the private/shared cache-footprint split percentage (schbench
491 /// `--split`); `None` (default) = no split, all-private single matrix.
492 /// `Some(p)` requires `p <= 100`: the builder debug-asserts it for early
493 /// feedback, and the engine hard-panics on an out-of-range value at the
494 /// consumption boundary in `run` -- the analog of schbench exiting on a bad
495 /// `--split` (`schbench.c:362-365`).
496 #[must_use = "builder methods consume self; bind the result"]
497 pub fn split_percent(mut self, percent: Option<usize>) -> Self {
498 debug_assert!(
499 percent.is_none_or(|p| p <= 100),
500 "split_percent must be 0..=100"
501 );
502 self.split_percent = percent;
503 self
504 }
505 /// Set the pipe-mode transfer size in bytes (schbench `-p`/`--pipe`); 0
506 /// (default) = off (the matrix workload). Non-zero switches to schbench's
507 /// memory-transfer simulation; values above 1 MiB (`PIPE_TRANSFER_BUFFER`)
508 /// are clamped when the engine consumes it (schbench clamps the same,
509 /// `schbench.c:291-294`).
510 #[must_use = "builder methods consume self; bind the result"]
511 pub fn pipe_transfer_bytes(mut self, bytes: usize) -> Self {
512 self.pipe_transfer_bytes = bytes;
513 self
514 }
515
516 /// Matrix dimension from the cache footprint, identical to schbench
517 /// (`schbench.c:1880`: `sqrt(cache_footprint_kb * 1024 / 3 /
518 /// sizeof(unsigned long))`) and to ktstr's `FanOutCompute` precompute. Zero
519 /// `operations` or `cache_footprint_kib` yields a 0 dimension (no matrix
520 /// work).
521 pub(crate) fn matrix_size(&self) -> usize {
522 if self.operations > 0 && self.cache_footprint_kib > 0 {
523 ((self.cache_footprint_kib * 1024 / 3 / core::mem::size_of::<u64>()) as f64).sqrt()
524 as usize
525 } else {
526 0
527 }
528 }
529
530 /// Shared-matrix dimension for `--split` (`schbench.c:1859,1862`:
531 /// `sqrt(cache_footprint_kb*(100-split)/100 * 1024/3/sizeof(ulong))`).
532 /// `None` split (or 0 `operations`/`cache_footprint_kib`) => 0 (no shared
533 /// matrix); `Some(100)` => 0 (all private, no shared work).
534 pub(crate) fn shared_matrix_size(&self) -> usize {
535 match self.split_percent {
536 Some(p) if self.operations > 0 && self.cache_footprint_kib > 0 => {
537 let shared_kb = self.cache_footprint_kib * (100 - p) / 100;
538 ((shared_kb * 1024 / 3 / core::mem::size_of::<u64>()) as f64).sqrt() as usize
539 }
540 _ => 0,
541 }
542 }
543
544 /// Private (per-worker) matrix dimension. `Some(p)` (`schbench.c:1860,1863`):
545 /// `sqrt(cache_footprint_kb*split/100 * 1024/3/sizeof(ulong))`. `None`
546 /// (legacy): the full-footprint [`matrix_size`](Self::matrix_size)
547 /// (`schbench.c:1880`, the all-private single matrix).
548 pub(crate) fn private_matrix_size(&self) -> usize {
549 match self.split_percent {
550 Some(p) if self.operations > 0 && self.cache_footprint_kib > 0 => {
551 let private_kb = self.cache_footprint_kib * p / 100;
552 ((private_kb * 1024 / 3 / core::mem::size_of::<u64>()) as f64).sqrt() as usize
553 }
554 _ => self.matrix_size(),
555 }
556 }
557
558 /// The effective TOTAL request rate after schbench's startup seed: the user's
559 /// `requests_per_sec`, or 10 when auto-RPS is on and no fixed rate was given
560 /// (`schbench.c:286`: auto-RPS starts at 10 and climbs toward its target).
561 /// Both the mode gate ([`rps_per_message_thread`](Self::rps_per_message_thread))
562 /// and the live auto-scaled injection rate seed from this.
563 pub(crate) fn normalized_total_rps(&self) -> usize {
564 if self.auto_rps != 0 && self.requests_per_sec == 0 {
565 10
566 } else {
567 self.requests_per_sec
568 }
569 }
570
571 /// Per-message-thread request rate driving the RPS-vs-default MODE GATE: the
572 /// normalized total ([`normalized_total_rps`](Self::normalized_total_rps))
573 /// divided across the message threads, each of which runs its own RPS thread.
574 /// schbench divides once at startup (`requests_per_sec /= message_threads`,
575 /// `schbench.c:1899`) BEFORE the per-thread mode gate (`if (requests_per_sec)`,
576 /// `schbench.c:1594`), so the gate sees the divided value: a total below the
577 /// message-thread count rounds to 0 and the thread runs the DEFAULT
578 /// message-handshake mode (not a zero-rate RPS). The gate is decided ONCE from
579 /// this seeded value; the actual injection rate is the live auto-scaled total
580 /// (which may climb above the seed). Integer division drops the remainder,
581 /// matching schbench -- so auto-RPS with more than 10 message threads (seed
582 /// 10 / m = 0) degenerates to default mode, exactly as schbench does.
583 pub(crate) fn rps_per_message_thread(&self) -> usize {
584 self.normalized_total_rps() / self.message_threads.max(1)
585 }
586
587 /// Effective per-message-thread worker count for [`run`]. A non-zero
588 /// `worker_threads` is workers-per-message-thread as-is; the `0` default
589 /// mirrors schbench's `-t` 0-default (`schbench.c:1849-1852`:
590 /// `worker_threads = (num_cpus + message_threads - 1) / message_threads`)
591 /// but over the allocated guest cpuset (`allowed_count` from
592 /// [`resolve_cpu_topology`], per the ktstr cpuset ruling) instead of
593 /// `get_nprocs`: it divides the cpuset CPU count across the message threads
594 /// (ceil), so the TOTAL worker count (`message_threads * this`) stays ≈ the
595 /// cpuset CPU count -- matching schbench's total ≈ `num_cpus`.
596 /// `message_threads` floors at 1 to avoid a zero divisor; the divide is a
597 /// no-op at the default `message_threads = 1`.
598 pub(crate) fn resolve_worker_count(&self, allowed_count: usize) -> usize {
599 if self.worker_threads != 0 {
600 return self.worker_threads;
601 }
602 allowed_count.div_ceil(self.message_threads.max(1))
603 }
604}
605
/// Per-thread state, the Rust counterpart of schbench's `struct thread_data`
/// (`schbench.c:766`). Used both for workers and for the message (or RPS
/// dispatcher) thread's own `msg_td`. Shared across threads by raw pointer for
/// the lockless wait-list, so cross-thread access is restricted to:
///
/// - the internally synchronized fields (`next`, `futex`, `wake_time`,
///   `requests`, `pending`);
/// - `pipe_bytes`, which is set at construction and only read afterwards; and
/// - `pipe_page` (pipe mode only), whose worker/waker writes are ordered by the
///   wait-list add/splice and the futex handshake (see the `Sync` impl SAFETY
///   note).
///
/// The remaining `UnsafeCell` fields (the histograms, `sched_delay_ns`,
/// `phase_snapshots`) are owned solely by the thread the `ThreadData` belongs
/// to.
pub(crate) struct ThreadData {
    /// Treiber-stack link for the message thread's wait-list
    /// (schbench `thread_data->next`, `:776`). Null when not queued.
    next: AtomicPtr<ThreadData>,
    /// Wake handshake futex (`schbench.c` `thread_data->futex`, `:791`).
    futex: super::handshake::Handshake,
    /// Monotonic-ns timestamp used to measure scheduler wakeup latency
    /// (schbench `wake_time`, `:788`). The waker stamps it just before posting
    /// (stamped in `xlist_wake_all` `:984`); the worker also stamps it itself
    /// before it enqueues or blocks, and reads it back once it is running again.
    wake_time: AtomicU64,
    /// Wakeup-latency histogram (`schbench.c` `wakeup_stats`, `:794`).
    /// Owner-thread-only; see the `Sync` impl SAFETY note.
    wakeup_stats: UnsafeCell<PlatStats>,
    /// Request (work-cycle) latency histogram (`schbench.c` `request_stats`,
    /// `:795`). Owner-thread-only.
    request_stats: UnsafeCell<PlatStats>,
    /// Mean per-schedule run-queue wait (ns), read from `/proc/<tid>/schedstat`
    /// at thread exit (schbench's `read_sched_delay`, `:1118`). Owner-only.
    /// Feeds the WHOLE-RUN `SchbenchResult` (mean-of-means, matching real
    /// schbench for the side-by-side validation); the per-phase delay comes from
    /// [`Self::phase_snapshots`] instead.
    sched_delay_ns: UnsafeCell<u64>,
    /// Per-phase snapshots this thread accumulated via drain-on-change against
    /// the shared `phase_epoch` (one [`PhaseSnapshot`] per phase the thread did
    /// work in, plus a final in-flight phase at exit). Owner-thread-only,
    /// drained by the main thread after join — same happens-before as the
    /// histogram cells. Empty when run non-phasic (`phase_epoch == None`) until
    /// the single end-of-run drain.
    phase_snapshots: UnsafeCell<Vec<PhaseSnapshot>>,
    /// RPS-injector mode: this worker's pending request queue. The RPS thread
    /// pushes heap-allocated [`Request`]s (round-robin across workers); the
    /// owning worker drains via `splice_reversed` (FIFO) and frees each. Unused
    /// in the default message-handshake mode (`requests_per_sec == 0`).
    requests: TreiberStack<Request>,
    /// RPS-injector backpressure counter (`schbench.c` `thread_data->pending`,
    /// `:779`): incremented by the RPS thread per enqueue, reset to 0 by the
    /// owning worker at each splice. The RPS thread stops enqueuing to a worker
    /// whose `pending` exceeds the batch cap (`:1284`).
    pending: AtomicU64,
    /// Pipe-mode (`-p`) transfer size in bytes (0 = not pipe mode); the length of
    /// [`Self::pipe_page`] and the per-cycle memset. Plain `Copy`, set once at
    /// construction, read-only thereafter -- which is what makes the waker's
    /// cross-thread read of it safe without synchronization.
    pipe_bytes: usize,
    /// Pipe-mode per-thread transfer page (`schbench.c` `thread_data->pipe_page`,
    /// `:801`). The worker memsets it to `2` before blocking and the waker memsets
    /// it to `1` when waking the worker; the bytes are never read back (the point
    /// is the memory-transfer cost). `UnsafeCell` because the WAKER writes the
    /// worker's page (cross-thread) -- see the `Sync` SAFETY note for the
    /// wait-list/futex ordering that keeps it race-free. Empty unless pipe mode.
    pipe_page: UnsafeCell<Box<[u8]>>,
}
662
/// One thread's latency + run-queue-delay accumulation over a single phase
/// (the window between two `phase_epoch` transitions). Built on the owning
/// thread at each drain-on-change boundary; the histograms are `take`n from the
/// live cells (snapshot-and-reset) and the run-delay is a RAW
/// `/proc/<tid>/schedstat` `(run_delay, pcount)` DELTA over the phase, so the
/// host re-derives the per-phase mean as `Σrun_delay / Σpcount` (the
/// sample-weighted pooled mean — a deliberate divergence from schbench's
/// whole-run mean-of-means, correct for a heterogeneous per-phase thread set).
/// Message threads leave `wakeup`/`request` empty and `loop_count` 0 (they
/// carry only run-delay); workers fill all fields.
struct PhaseSnapshot {
    /// The `phase_epoch` value active while this snapshot's samples were
    /// recorded (NOT the value at drain time). `0` = BASELINE, `u32::MAX` =
    /// inter-step gap; both are emitted as-is and discarded host-side.
    epoch: u32,
    /// Wakeup-latency histogram for this phase, taken from the owning thread's
    /// live cell at the drain (left empty by message threads).
    wakeup: PlatStats,
    /// Request (work-cycle) latency histogram for this phase, taken from the
    /// owning thread's live cell at the drain (left empty by message threads).
    request: PlatStats,
    /// `/proc/<tid>/schedstat` field 2 (run_delay ns) delta over this phase.
    run_delay_ns: u64,
    /// `/proc/<tid>/schedstat` field 3 (pcount = timeslices) delta over this
    /// phase. The host guards `pcount == 0` → metric absent (never a div-by-zero
    /// 0), matching [`mean_sched_delay`].
    pcount: u64,
    /// Completed work cycles this worker ran in this phase (0 for the message
    /// thread).
    loop_count: u64,
}
690
/// Per-phase, cross-thread aggregate for one `phase_epoch`: the wakeup + request
/// histograms merged across every worker that ran in the phase, the per-phase
/// achieved-RPS histogram, the run-delay raw pairs split by thread class
/// (message vs worker), and the summed worker loop count. The per-phase
/// wire carrier — rides inside [`crate::workload::PhaseSlice`] guest→host. All
/// fields are integer, so the host re-pools across workers/cgroups by
/// [`PlatStats::combine`] (histogram add) + integer sums; percentiles are
/// re-derived from the merged histogram, NEVER averaged.
///
/// ESTIMATOR NOTE: the per-phase run-delay mean the host derives from
/// `*_run_delay_ns / *_pcount` is SAMPLE-WEIGHTED (`Σrun_delay / Σpcount`), a
/// DIFFERENT estimator from the whole-run [`SchbenchResult`]'s `sched_delay_*`,
/// which keeps schbench's mean-of-per-thread-means (`collect_sched_delay`) for
/// schbench parity. They measure different things by design — never cross-compare a
/// per-phase value against a whole-run threshold (or vice-versa). `pcount == 0`
/// for a class means that class was never scheduled in the phase → the host
/// emits the metric as ABSENT, not `0`.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct SchbenchPhaseStats {
    /// Wakeup-latency histogram merged across the phase's workers.
    pub(crate) wakeup: PlatStats,
    /// Request-latency histogram merged across the phase's workers.
    pub(crate) request: PlatStats,
    /// Per-phase achieved-RPS distribution: the control thread's per-second
    /// samples attributed to this phase's epoch (the whole-run rps is unchanged;
    /// this is purely additive). A 1s sample straddling an epoch boundary lands
    /// wholly in the sample-time epoch (bounded fuzz, <=1 sample/transition, like
    /// the msg-thread late-drain). Empty for a phase shorter than the ~1s control
    /// cadence -- the host gates on `sample_count() > 0` so a sub-second phase
    /// reads ABSENT, never rps=0. A rate; re-derived from the merged histogram.
    pub(crate) rps: PlatStats,
    /// Σ message-thread `run_delay` (ns) over the phase.
    pub(crate) msg_run_delay_ns: u64,
    /// Σ message-thread `pcount` over the phase (the mean's denominator).
    pub(crate) msg_pcount: u64,
    /// Σ worker `run_delay` (ns) over the phase.
    pub(crate) worker_run_delay_ns: u64,
    /// Σ worker `pcount` over the phase (the mean's denominator).
    pub(crate) worker_pcount: u64,
    /// Σ completed work cycles across the phase's workers.
    pub(crate) loop_count: u64,
}
732
733impl SchbenchPhaseStats {
734 /// Merge `other` into `self`: combine the wakeup + request histograms
735 /// (bucket-count addition, [`PlatStats::combine`]) and integer-add the
736 /// run-delay raw pairs + loop_count. Associative AND commutative (combine +
737 /// `saturating_add` both are), so pooling is order-independent — the SAME
738 /// operation whether pooling per-engine across message threads (in [`run`])
739 /// or per-cgroup across workers host-side
740 /// (`crate::assert::PhaseCgroupStats::merge`). Percentiles are NEVER
741 /// merged; the merged histogram is re-derived to percentiles by the reader.
742 pub(crate) fn merge(&mut self, other: &SchbenchPhaseStats) {
743 self.wakeup.combine(&other.wakeup);
744 self.request.combine(&other.request);
745 self.rps.combine(&other.rps);
746 self.msg_run_delay_ns = self.msg_run_delay_ns.saturating_add(other.msg_run_delay_ns);
747 self.msg_pcount = self.msg_pcount.saturating_add(other.msg_pcount);
748 self.worker_run_delay_ns = self
749 .worker_run_delay_ns
750 .saturating_add(other.worker_run_delay_ns);
751 self.worker_pcount = self.worker_pcount.saturating_add(other.worker_pcount);
752 self.loop_count = self.loop_count.saturating_add(other.loop_count);
753 }
754}
755
756impl Linked for ThreadData {
757 fn next_link(&self) -> &AtomicPtr<Self> {
758 &self.next
759 }
760}
761
// SAFETY: `ThreadData` is shared across threads only via the lockless wait-list
// and (in RPS mode) the per-worker request queue, whose operations touch
// exclusively the atomic fields (`next`, `futex`, `wake_time`, `requests`,
// `pending`) -- all internally synchronized. `requests` is single-producer /
// single-consumer: the RPS thread is the only pusher to a given worker's queue
// (round-robin assignment) and the owning worker is the only drainer, with the
// push's Release CAS / drain's Acquire swap establishing happens-before;
// `pending` is plain atomic counting. `pipe_bytes` is a plain `usize` that is
// set in `ThreadData::new` and only read afterwards, so the waker's cross-thread
// read of it (`wake_all`) is a read of immutable data. The `UnsafeCell` fields
// (`wakeup_stats`, `request_stats`, `sched_delay_ns`, `phase_snapshots`) are
// written and read ONLY by the single thread that owns this `ThreadData` (a
// worker for its own instance, the message/dispatcher thread for `msg_td`), and
// only the main thread reads/drains them after all workers have joined (a
// happens-before via the join). The per-phase drain (`worker_loop` /
// `run_msg_thread`) `take`s and pushes into the owning thread's own cells, never
// another's. The `pipe_page` cell (pipe mode) is the one cross-thread
// `UnsafeCell`: the owning worker memsets it (to 2) and the message thread
// memsets it (to 1). They never overlap, via two happens-before edges:
// - SAME cycle (worker fill-2 vs waker fill-1): the worker's fill-2 is SEQUENCED
//   BEFORE its `wait_list.add` (Release CAS), and the waker's `splice` (Acquire
//   swap) observes that add, giving fill-2 -> add -> splice -> waker fill-1. The
//   load-bearing fact is "fill-2 precedes add", which holds UNCONDITIONALLY --
//   including the no-block shutdown path (stop set, worker skips the futex wait):
//   the worker still added itself after fill-2, so a waker that then splices and
//   fills it is ordered after the fill-2, and the worker's exit path never
//   touches the page again. Blocking is NOT what makes it safe.
// - NEXT cycle (worker's fill-2 vs the waker's prior fill-1): the futex
//   post/consume CAS (SeqCst, `Handshake`) orders waker fill-1 -> post -> the
//   worker's consume -> its next fill-2; the worker cannot reach its next fill-2
//   until it consumes the token the waker published after fill-1.
// So the two threads never touch `pipe_page` concurrently. No two threads ever
// touch a cell concurrently, so sharing `&ThreadData` across threads is sound.
unsafe impl Sync for ThreadData {}
793
794impl ThreadData {
795 fn new(pipe_bytes: usize) -> Self {
796 Self {
797 next: AtomicPtr::new(ptr::null_mut()),
798 futex: super::handshake::Handshake::new(),
799 wake_time: AtomicU64::new(0),
800 wakeup_stats: UnsafeCell::new(PlatStats::default()),
801 request_stats: UnsafeCell::new(PlatStats::default()),
802 sched_delay_ns: UnsafeCell::new(0),
803 phase_snapshots: UnsafeCell::new(Vec::new()),
804 requests: TreiberStack::new(),
805 pending: AtomicU64::new(0),
806 pipe_bytes,
807 pipe_page: UnsafeCell::new(vec![0u8; pipe_bytes].into_boxed_slice()),
808 }
809 }
810
811 /// Pipe-mode transfer: fill this thread's `pipe_page` (`pipe_bytes` bytes) with
812 /// `val`. The page is never read back, so the fill would be DCE-eligible. The
813 /// trailing `black_box(page.as_ptr())` ESCAPES THE ADDRESS (not a loaded value):
814 /// the optimizer must then assume the opaque call may read through the pointer,
815 /// so every byte of the fill must be in place at that point -- the fill is
816 /// retained. This is stronger than `black_box(value)`, which only keeps a value
817 /// live and (per the optimization-resistance note in `worker::worker_main`) does
818 /// NOT by itself stop a backing STORE from being elided -- which is why the
819 /// cache-pressure workloads there use `write_volatile`. Here the faithful
820 /// schbench op is a memset (`schbench.c:980-981`/`:1003-1004`) and `slice::fill`
821 /// lowers to one; a `write_volatile` byte loop would not. The memory traffic IS
822 /// the workload.
823 ///
824 /// SAFETY: the caller guarantees no concurrent access to `pipe_page`. The only
825 /// callers are the owning worker (before it adds itself to the wait-list) and
826 /// the message thread (after it splices the worker off the wait-list); the
827 /// load-bearing edge is fill-2-before-add (holding even on the no-block
828 /// shutdown path), so the add/splice + futex handshake order these with no
829 /// overlap (see the `Sync` SAFETY note).
830 unsafe fn pipe_fill(&self, val: u8) {
831 // SAFETY: exclusive access per the caller contract; the box lives for the
832 // `ThreadData`'s lifetime.
833 let page = unsafe { &mut *self.pipe_page.get() };
834 page.fill(val);
835 std::hint::black_box(page.as_ptr());
836 }
837}
838
/// The per-request think-time pause schbench models as "simulated networking"
/// (`schbench.c:1461`). Sleeping here is the workload's defined behavior, not
/// a synchronization wait: it stands in for schbench's `usleep`
/// (`clock_nanosleep`) so the syscall profile matches.
///
/// `usec` is the pause length in microseconds; the calling thread sleeps for
/// at least that long.
fn think_sleep(usec: u64) {
    let think_time = std::time::Duration::from_micros(usec);
    std::thread::sleep(think_time);
}
846
/// schbench's `--split` op partition (`schbench.c:1391-1392`): of `operations`
/// total, `p`% run on the private matrix and the rest on the shared matrix.
/// Returns `(ops_shared, ops_private)`; the two always sum to `operations`.
/// Extracted so the integer-division split is pinned by a unit test rather
/// than re-implemented inline in `do_work`.
///
/// `ops_private` is `floor(operations * p / 100)`. Two hardening points over
/// the naive expression, neither of which changes any result for `p <= 100`
/// with a non-overflowing product:
///
/// * `p` is clamped to 100. An out-of-range percentage would otherwise make
///   `ops_private > operations`, so `operations - ops_private` underflows: a
///   panic with overflow checks on, and a wrap to a near-`usize::MAX` shared
///   op count without them (a worker that effectively never finishes its
///   cycle while holding the per-CPU lock). Clamping means "everything
///   private".
/// * The product is computed in `u128`, so a very large `operations` cannot
///   overflow `usize` before the division.
fn ops_split(operations: usize, p: usize) -> (usize, usize) {
    let percent = p.min(100) as u128;
    // `percent <= 100`, so the quotient is `<= operations` and the narrowing
    // cast back to `usize` is lossless.
    let ops_private = (operations as u128 * percent / 100) as usize;
    let ops_shared = operations - ops_private;
    (ops_shared, ops_private)
}
856
857/// One work cycle's matrix multiplications under the per-CPU lock. Faithful to
858/// schbench's `do_work` (`schbench.c:1379-1413`): take the current CPU's mutex
859/// (unless `skip_locking`), run `operations` matrix multiplies, unlock. With
860/// `split_percent` set, the ops are partitioned (`schbench.c:1390-1404`):
861/// `ops_shared` run on the ONE process-global shared matrix (every worker
862/// contends), then `ops_private` on this worker's private buffer; `None` is the
863/// legacy all-private single matrix (`:1406-1408`). The guard drops at the end,
864/// holding the lock across all operations exactly as schbench does (`:1387`/
865/// `:1411`).
866#[allow(clippy::too_many_arguments)]
867fn do_work(
868 private_buf: &mut [u64],
869 private_matrix_size: usize,
870 shared_buf: &[core::sync::atomic::AtomicU64],
871 shared_matrix_size: usize,
872 split_percent: Option<usize>,
873 operations: usize,
874 locks: Option<&PerCpuLocks>,
875 work_units: &mut u64,
876) {
877 let _guard = locks.map(|l| l.lock_this_cpu());
878 match split_percent {
879 Some(p) => {
880 let (ops_shared, ops_private) = ops_split(operations, p);
881 // schbench.c:1395-1398: shared ops first, on the global matrix.
882 if shared_matrix_size > 0 && ops_shared > 0 {
883 for _ in 0..ops_shared {
884 crate::workload::worker::matrix_multiply_shared(
885 shared_buf,
886 shared_matrix_size,
887 work_units,
888 );
889 }
890 }
891 // schbench.c:1401-1404: private ops second, on this worker's buffer.
892 if private_matrix_size > 0 && ops_private > 0 {
893 for _ in 0..ops_private {
894 crate::workload::worker::matrix_multiply(
895 private_buf,
896 private_matrix_size,
897 work_units,
898 );
899 }
900 }
901 }
902 None => {
903 // schbench.c:1406-1408: legacy all-private single matrix.
904 for _ in 0..operations {
905 if private_matrix_size > 0 {
906 crate::workload::worker::matrix_multiply(
907 private_buf,
908 private_matrix_size,
909 work_units,
910 );
911 }
912 }
913 }
914 }
915}
916
917/// Worker side of one wakeup cycle. Faithful to schbench's `msg_and_wait`
918/// (`schbench.c:997`, default branch): stamp our `wake_time`, push ourselves
919/// onto the message thread's wait-list, wake the message thread, then block
920/// until it wakes us back; record the wakeup (scheduler) latency. The `!stop`
921/// guard mirrors schbench's `if (!stopping)` (`schbench.c:1030`) so we do not
922/// block during shutdown.
923fn msg_and_wait(
924 td: &ThreadData,
925 msg_td: &ThreadData,
926 wait_list: &TreiberStack<ThreadData>,
927 stop: &AtomicBool,
928) {
929 // Our futex is BLOCKED here (consumed by the prior wait, or fresh).
930 // Pipe mode: write our transfer page to 2 (schbench memsets td->pipe_page
931 // before blocking, schbench.c:1003-1004). Done before we enqueue on the
932 // wait-list, so the waker's fill-to-1 (after it splices us off) never races --
933 // see the Sync SAFETY note.
934 if td.pipe_bytes > 0 {
935 // SAFETY: we own this page and have not yet enqueued, so the waker cannot
936 // touch it concurrently.
937 unsafe { td.pipe_fill(2) };
938 }
939 td.wake_time.store(monotonic_nanos(), Ordering::Release);
940 wait_list.add(td as *const ThreadData as *mut ThreadData);
941 msg_td.futex.post();
942 if !stop.load(Ordering::Acquire) {
943 td.futex.wait_forever();
944 }
945 let now = monotonic_nanos();
946 let wake = td.wake_time.load(Ordering::Acquire);
947 // schbench buckets in microseconds (gettimeofday resolution); the monotonic
948 // clock gives ns, so divide. `if delta > 0` matches schbench (`:1036`).
949 let delta_us = now.saturating_sub(wake) / 1000;
950 if delta_us > 0 {
951 // SAFETY: only this worker thread accesses its own wakeup_stats cell.
952 unsafe { (*td.wakeup_stats.get()).add_lat(delta_us.min(u32::MAX as u64) as u32) };
953 }
954}
955
956/// Worker side of one RPS-injector cycle. Faithful to schbench's `msg_and_wait`
957/// RPS branch (`schbench.c:1007-1037`): stamp our own `wake_time`, reset
958/// `pending`, then splice this worker's request queue (FIFO). If non-empty,
959/// return the chain immediately (fast path, no wait, no wakeup sample --
960/// `:1014-1017`). If empty, block on this worker's own futex until the RPS thread
961/// posts it, record the wakeup (scheduler) latency, then splice whatever it
962/// queued (null on a spurious wake or the shutdown wake-all). Unlike
963/// [`msg_and_wait`], the worker does NOT enqueue on the message thread's
964/// wait-list or post a message thread -- the RPS thread is the sole waker.
965///
966/// `wake_time` is stamped HERE before the splice (schbench `:1008`) AND by the
967/// RPS thread at enqueue (`:1294`). On a TRUE block the RPS thread's stamp wins
968/// (it overwrites ours before posting), so `now - wake_time` is the enqueue->run
969/// scheduler latency. But on a FAST wake -- a leftover RUNNING futex token from a
970/// request the worker already drained via splice (the word is consumed by splice,
971/// not by `wait`, so it stays RUNNING) -- the RPS thread does NOT re-stamp, so our
972/// own stamp makes the delta measure THIS block decision (near-zero), not the
973/// stale enqueue time of the already-served request. Omitting it inflates the
974/// fast-wake samples to the full inter-request gap (the wakeup-p50 bug: 543ms
975/// vs schbench's 7µs).
976fn rps_wait(td: &ThreadData, stop: &AtomicBool) -> *mut Request {
977 // Stamp our own wake_time before splicing (schbench `:1008`); see the doc for
978 // why this is what keeps fast-wake samples honest.
979 td.wake_time.store(monotonic_nanos(), Ordering::Release);
980 // Reset the backpressure counter before draining (schbench `td->pending = 0`,
981 // `:1012`): the RPS thread stops enqueuing while pending exceeds the batch
982 // cap, so clearing it re-opens this worker for the next batch.
983 td.pending.store(0, Ordering::Release);
984 let chain = td.requests.splice_reversed();
985 if !chain.is_null() {
986 return chain; // work already queued: serve it without blocking
987 }
988 if stop.load(Ordering::Acquire) {
989 return ptr::null_mut();
990 }
991 // Empty queue: block until the RPS thread posts our futex, then `now -
992 // wake_time` is the scheduler wakeup latency (schbench `fwait` NULL then
993 // add_lat, `:1032`/`:1034-1037`).
994 td.futex.wait_forever();
995 // Shutdown wake: if stop is set, the wake came from run_rps_thread's stop-path
996 // wake-all (`:1308-1312`), which posts every worker WITHOUT restamping
997 // wake_time. A worker blocked between bursts when stop fires would otherwise
998 // record `now - (its own stale block-time stamp)` -- the whole inter-burst
999 // gap -- as a phantom wakeup, one garbage tail sample per worker. That is not
1000 // a request-driven wakeup, so skip the sample (schbench's `if (!stopping)
1001 // fwait` guard, `:1030`, encodes the same intent: shutdown is not a latency
1002 // event). Return any leftover requests for the caller to free.
1003 if stop.load(Ordering::Acquire) {
1004 return td.requests.splice_reversed();
1005 }
1006 let now = monotonic_nanos();
1007 let wake = td.wake_time.load(Ordering::Acquire);
1008 let delta_us = now.saturating_sub(wake) / 1000;
1009 if delta_us > 0 {
1010 // SAFETY: only this worker thread accesses its own wakeup_stats cell.
1011 unsafe { (*td.wakeup_stats.get()).add_lat(delta_us.min(u32::MAX as u64) as u32) };
1012 }
1013 // Splice whatever the RPS thread enqueued before posting us (null if the post
1014 // was a spurious wake with nothing queued).
1015 td.requests.splice_reversed()
1016}
1017
1018/// Free a chain of [`Request`]s spliced off a worker's queue but not processed
1019/// (the shutdown path, and the post-join cleanup of any requests the RPS thread
1020/// enqueued after a worker's last splice). Each node was `Box::into_raw`'d by the
1021/// RPS thread; the draining/cleaning thread owns them exclusively after the
1022/// splice, so each is freed exactly once.
1023fn free_request_chain(mut req: *mut Request) {
1024 while !req.is_null() {
1025 // SAFETY: `req` is a node taken via `splice_reversed` (exclusive
1026 // ownership); freed exactly once.
1027 let next = unsafe { (*req).next.load(Ordering::Acquire) };
1028 drop(unsafe { Box::from_raw(req) });
1029 req = next;
1030 }
1031}
1032
1033/// Wake every worker on the wait-list, stamping each one's `wake_time` so it
1034/// can measure scheduler latency. Faithful to schbench's `xlist_wake_all`
1035/// (`schbench.c:969`): splice the whole list, read the clock ONCE, stamp every
1036/// worker with that single time, then post each. The single clock read is what
1037/// detects the scheduler preempting the waker mid-batch (`schbench.c:961-964`).
1038/// Pipe mode (`-p`) diverges, matching schbench (`schbench.c:980-984`): it memsets
1039/// each worker's transfer page to `1` and re-reads the clock PER worker (so the
1040/// memset cost is not charged to that worker's wakeup latency).
1041fn wake_all(wait_list: &TreiberStack<ThreadData>) {
1042 let mut cur = wait_list.splice();
1043 let now = monotonic_nanos();
1044 while !cur.is_null() {
1045 // SAFETY: `cur` is a worker `ThreadData` alive for the whole run; only
1046 // its atomic fields (next / wake_time / futex) are touched here.
1047 let td = unsafe { &*cur };
1048 let next = td.next.load(Ordering::Acquire);
1049 td.next.store(ptr::null_mut(), Ordering::Relaxed);
1050 if td.pipe_bytes > 0 {
1051 // Pipe mode: write this woken worker's transfer page to 1 (schbench
1052 // memsets list->pipe_page in the wake loop, schbench.c:980-981). We
1053 // spliced it off the wait-list and it is still blocked (our `post`
1054 // below wakes it), so no concurrent access -- see the Sync SAFETY note.
1055 // SAFETY: `td` is off the wait-list and blocked here.
1056 unsafe { td.pipe_fill(1) };
1057 // schbench re-reads the clock per worker in pipe mode (after the
1058 // memset, schbench.c:982) so the fill is not charged to the worker's
1059 // wakeup latency; non-pipe uses the single batched `now`.
1060 td.wake_time.store(monotonic_nanos(), Ordering::Release);
1061 } else {
1062 td.wake_time.store(now, Ordering::Release);
1063 }
1064 td.futex.post();
1065 cur = next;
1066 }
1067}
1068
/// The message thread's loop. Faithful to schbench's `run_msg_thread`
/// (`schbench.c:1166`): batch-wake all waiting workers, then block until a
/// worker posts us back. On stop, drain once more (to wake any worker that
/// queued during the final wake) and exit.
///
/// `phase_epoch` drives the per-phase drain exactly as in [`worker_loop`], but
/// the message thread records only per-phase RUN-DELAY (its `wakeup`/`request`
/// histograms stay empty, taken empty by [`drain_phase`]). It polls the epoch
/// after each wake cycle; a phase in which no worker ever posts it leaves that
/// phase's run-delay drained late at the next wake (bounded boundary fuzz,
/// matching the worker drain).
///
/// Parameters:
/// - `msg_td`: this message thread's own `ThreadData`; its futex is what the
///   workers post, and its owner-only cells receive the phase snapshots and the
///   whole-run `sched_delay_ns`.
/// - `wait_list`: the wait-list the workers enqueue themselves on.
/// - `stop`: shutdown flag, checked once per loop iteration after the wake.
/// - `phase_epoch`: `None` when run non-phasic; the epoch is then treated as a
///   constant `0` and only the final drain produces a snapshot.
fn run_msg_thread(
    msg_td: &ThreadData,
    wait_list: &TreiberStack<ThreadData>,
    stop: &AtomicBool,
    phase_epoch: Option<&AtomicU32>,
) {
    let tid = gettid_self();
    // Epoch the currently open phase belongs to (0 when non-phasic).
    let mut cur_epoch = phase_epoch.map_or(0, |e| e.load(Ordering::Relaxed));
    // Schedstat reading at the start of the currently open phase; the phase's
    // run-delay is derived from this and the reading taken at the drain.
    let mut phase_ss_start = read_schedstat_raw(tid);
    loop {
        // msg_td.futex is BLOCKED here (consumed by the prior wait, or fresh).
        wake_all(wait_list);
        if stop.load(Ordering::Acquire) {
            // One more pass so a worker that queued itself while the wake above
            // was running is not left parked when we exit.
            wake_all(wait_list);
            break;
        }
        // Park until a worker posts us.
        msg_td.futex.wait_forever();
        if let Some(pe) = phase_epoch {
            let new_epoch = pe.load(Ordering::Relaxed);
            if new_epoch != cur_epoch {
                // Phase boundary observed: close the old phase with a fresh
                // schedstat reading, which also becomes the new phase's start.
                let ss_end = read_schedstat_raw(tid);
                // SAFETY: this is the thread that owns msg_td (the coordinator
                // thread of run_one_message_thread).
                unsafe { drain_phase(msg_td, cur_epoch, phase_ss_start, ss_end, 0) };
                cur_epoch = new_epoch;
                phase_ss_start = ss_end;
            }
        }
    }
    // Final drain: close the still-open phase (the sole snapshot when non-phasic).
    let ss_end = read_schedstat_raw(tid);
    // SAFETY: this is the thread that owns msg_td.
    unsafe { drain_phase(msg_td, cur_epoch, phase_ss_start, ss_end, 0) };
    // Record the message thread's whole-run mean run-queue wait (`schbench.c:1664`),
    // reusing the cumulative `ss_end` pair just read — one /proc read, so the mean
    // and the final-phase delta derive from one consistent snapshot.
    // SAFETY: owner-only access to msg_td's sched_delay_ns cell.
    unsafe { *msg_td.sched_delay_ns.get() = mean_sched_delay(ss_end) };
}
1119
1120/// The RPS-injector dispatcher loop, replacing [`run_msg_thread`] when
1121/// `requests_per_sec != 0`. Each second it enqueues `requests_per_sec` requests
1122/// round-robin across this group's `workers`, skipping a worker whose `pending`
1123/// exceeds the batch cap (backpressure, `schbench.c:1284-1290`); on stop, it
1124/// wakes every worker and exits. Runs on the message thread's calling thread, so
1125/// — like [`run_msg_thread`] — it records the dispatcher thread's per-phase +
1126/// whole-run run-queue wait into `msg_td` (the "message thread delay" in RPS
1127/// mode, `schbench.c:1664-1670`).
1128///
1129/// DIVERGENCE from schbench's `run_rps_thread` (`schbench.c:1258-1314`),
1130/// intentional: schbench FRONT-LOADS the second (bursts all N enqueues, then
1131/// `usleep`s to fill the rest of the second) and relies on its workers
1132/// interleaving with the burst to keep each queue under the cap. That interleave
1133/// is scheduling-dependent; ktstr's burst instead front-ran the workers (the
1134/// first splice grabbed a large batch the worker drained to completion before
1135/// re-splicing), so `pending` stayed pegged and the injector dropped the
1136/// remainder every second -- a ~BATCH/worker supply cap that under-delivered the
1137/// requested rate (e.g. ~257/s at a 400/s target). ktstr instead PACES the
1138/// enqueues evenly across the second (one every `1s/N`), which forces the
1139/// injector to interleave with the workers deterministically: `pending` drains
1140/// between enqueues and the full rate is delivered up to worker capacity. Above
1141/// capacity the pacing collapses to a flat-out loop and the `BATCH` backpressure
1142/// bounds the queue, matching schbench's bounded-backlog overload behavior. The
1143/// pacing sleep and the bounded backpressure usleep are workload-defined `-R`
1144/// cadence (like the per-request think-sleep), not synchronization waits. Uses
1145/// `CLOCK_MONOTONIC` (ruling), not schbench's `gettimeofday`.
1146///
1147/// Verified by `ktstr-schbench-validate` (release, standalone), not a nextest
1148/// unit test: the under-delivery signal only appears when worker capacity exceeds
1149/// the front-load cap (~2*BATCH/s), which requires a release build (debug matrix
1150/// work caps capacity well below that) on dedicated CPU (nextest's parallel
1151/// execution starves the workers, collapsing both the paced and a hypothetical
1152/// reverted injector to the same capacity-limited rate). The fixed `-R` axis in
1153/// the schbench validation comparison is the regression evidence.
1154fn run_rps_thread(
1155 workers: &[ThreadData],
1156 msg_td: &ThreadData,
1157 stop: &AtomicBool,
1158 phase_epoch: Option<&AtomicU32>,
1159 live_rate: &AtomicUsize,
1160) {
1161 let tid = gettid_self();
1162 let mut cur_epoch = phase_epoch.map_or(0, |e| e.load(Ordering::Relaxed));
1163 let mut phase_ss_start = read_schedstat_raw(tid);
1164 // schbench's per-worker queue-depth cap before the injector backs off
1165 // (`schbench.c:1267`).
1166 const BATCH: u64 = 128;
1167 const ONE_SEC_NS: u64 = 1_000_000_000;
1168 let worker_count = workers.len().max(1);
1169 let mut cur_tid: usize = 0;
1170 while !stop.load(Ordering::Acquire) {
1171 // Re-read the live per-message-thread rate each second. `live_rate` is
1172 // already the per-thread value (schbench's post-`/= message_threads`
1173 // global `requests_per_sec`, `:1899`), injected directly with no further
1174 // division (`schbench.c:1290`). Auto-RPS retargets it from the control
1175 // thread; a fixed `-R` leaves it constant.
1176 let requests_per_sec = live_rate.load(Ordering::Relaxed);
1177 let start = monotonic_nanos();
1178 // PACE the enqueues evenly across the second rather than front-loading
1179 // them. schbench bursts all N then fills the rest of the second
1180 // (`schbench.c:1300-1306`), relying on its workers interleaving with the
1181 // burst to keep each worker's queue under the `BATCH` cap. ktstr's burst
1182 // front-ran the workers: the first splice grabbed a large batch the worker
1183 // processed to completion before re-splicing, so `pending` stayed pegged
1184 // and the injector dropped the remainder every second (a steady-state
1185 // ~BATCH/worker supply cap -> only ~257/s at a 400/s target). Pacing one
1186 // enqueue every `1s/N` forces the injector to interleave with the workers,
1187 // so `pending` drains between enqueues and the full rate is delivered up
1188 // to worker capacity. Above capacity `due` falls behind real time, the
1189 // pacing sleep collapses to zero, and the `BATCH` backpressure bounds the
1190 // queue -- matching schbench's overload behavior (a bounded backlog).
1191 let interval_ns = if requests_per_sec > 0 {
1192 ONE_SEC_NS / requests_per_sec as u64
1193 } else {
1194 ONE_SEC_NS
1195 };
1196 let mut due = start;
1197 for _ in 0..requests_per_sec {
1198 if stop.load(Ordering::Acquire) {
1199 break;
1200 }
1201 // Wait until this enqueue's scheduled time. A no-op once `due` falls
1202 // behind real time (rate above what the workers can drain), so the
1203 // loop then runs flat-out and the backpressure below bounds the queue.
1204 // `due` is re-based to `start` each second and accrues at most
1205 // N*(ONE_SEC_NS/N) ~= 1s within a pass, so saturating_add never wraps
1206 // (the saturating form documents that the per-pass re-base is the
1207 // load-bearing invariant).
1208 due = due.saturating_add(interval_ns);
1209 // Pace to this enqueue's scheduled time, but cap each sleep at
1210 // STOP_POLL_QUANTUM_NS and re-check `stop`, so a shutdown during a long
1211 // inter-arrival gap (a degenerate low rate, where the interval can
1212 // exceed the cleanup/watchdog budget) is observed within the quantum
1213 // instead of after a full interval — mirroring the taobench open-loop
1214 // pacing. A stop observed here breaks the enqueue loop BEFORE issuing
1215 // this slot's request.
1216 let mut stop_during_pace = false;
1217 loop {
1218 if stop.load(Ordering::Acquire) {
1219 stop_during_pace = true;
1220 break;
1221 }
1222 let now = monotonic_nanos();
1223 if now >= due {
1224 break;
1225 }
1226 let nap = (due - now).min(STOP_POLL_QUANTUM_NS);
1227 std::thread::sleep(std::time::Duration::from_nanos(nap));
1228 }
1229 if stop_during_pace {
1230 break;
1231 }
1232 let worker = &workers[cur_tid % worker_count];
1233 cur_tid += 1;
1234 // Backpressure: don't queue more to a worker already `BATCH` deep
1235 // (`schbench.c:1284-1290`). The bounded usleep is schbench's injector
1236 // throttle (workload behavior), not a sync poll. The round-robin
1237 // cursor already advanced, so the skipped slot moves to the next
1238 // worker, matching schbench (the request for this slot is dropped).
1239 if worker.pending.load(Ordering::Acquire) > BATCH {
1240 std::thread::sleep(std::time::Duration::from_micros(100));
1241 continue;
1242 }
1243 worker.pending.fetch_add(1, Ordering::AcqRel);
1244 // schbench mallocs one request per enqueue; the worker frees it after
1245 // processing (`schbench.c:1292`/`:1476`). Box matches that profile.
1246 let req = Box::into_raw(Box::new(Request::new()));
1247 worker.requests.add(req);
1248 // Stamp the target's wake_time so it measures wakeup latency as
1249 // now - enqueue (`schbench.c:1294`), then post it.
1250 worker.wake_time.store(monotonic_nanos(), Ordering::Release);
1251 worker.futex.post();
1252 }
1253 // The pacing loop already spans ~1 second; top up only if it finished
1254 // early (rate 0, or `stop`), preserving schbench's per-second cadence. Cap
1255 // each sleep at STOP_POLL_QUANTUM_NS and re-check `stop` so a shutdown
1256 // during the top-up is observed within the quantum, not after a full
1257 // remaining second (the same teardown bound as the pacing sleep above).
1258 loop {
1259 if stop.load(Ordering::Acquire) {
1260 break;
1261 }
1262 let elapsed = monotonic_nanos().saturating_sub(start);
1263 if elapsed >= ONE_SEC_NS {
1264 break;
1265 }
1266 let nap = (ONE_SEC_NS - elapsed).min(STOP_POLL_QUANTUM_NS);
1267 std::thread::sleep(std::time::Duration::from_nanos(nap));
1268 }
1269 // Dispatcher per-phase run-delay drain (mirrors run_msg_thread): on epoch
1270 // change, finalize this thread's run-delay for the ended phase. Empty
1271 // histograms (the dispatcher records only run-delay).
1272 if let Some(pe) = phase_epoch {
1273 let new_epoch = pe.load(Ordering::Relaxed);
1274 if new_epoch != cur_epoch {
1275 let ss_end = read_schedstat_raw(tid);
1276 // SAFETY: this thread owns msg_td (the dispatcher's ThreadData).
1277 unsafe { drain_phase(msg_td, cur_epoch, phase_ss_start, ss_end, 0) };
1278 cur_epoch = new_epoch;
1279 phase_ss_start = ss_end;
1280 }
1281 }
1282 if stop.load(Ordering::Acquire) {
1283 // Wake every worker so a blocked one observes stop and exits
1284 // (`schbench.c:1308-1312`).
1285 for w in workers {
1286 w.futex.post();
1287 }
1288 break;
1289 }
1290 }
1291 // Final drain + whole-run mean run-delay for the dispatcher thread, matching
1292 // run_msg_thread's exit (`schbench.c:1664`).
1293 let ss_end = read_schedstat_raw(tid);
1294 // SAFETY: this thread owns msg_td.
1295 unsafe { drain_phase(msg_td, cur_epoch, phase_ss_start, ss_end, 0) };
1296 // SAFETY: owner-only access to msg_td's sched_delay_ns cell.
1297 unsafe { *msg_td.sched_delay_ns.get() = mean_sched_delay(ss_end) };
1298}
1299
1300/// The calling thread's kernel tid (`gettid`), for reading its own schedstat.
1301fn gettid_self() -> libc::pid_t {
1302 // SAFETY: gettid takes no arguments and only returns the caller's tid.
1303 unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t }
1304}
1305
1306/// Raw `(run_delay_ns, pcount)` pair from `/proc/<tid>/schedstat` (fields 2, 3),
1307/// UNDIVIDED — schbench's `read_sched_delay` source (`schbench.c:1118`) without
1308/// the division. Both consumers want the raw pair, not a pre-divided mean: the
1309/// per-phase path re-pools `Σrun_delay / Σpcount` host-side, and the whole-run
1310/// per-thread mean is [`mean_sched_delay`] of the thread's final cumulative
1311/// pair. An absent file (the thread exited) yields `(0, 0)`, like schbench's
1312/// fopen-failure path.
1313///
1314/// `CONFIG_SCHED_INFO` (selected by `CONFIG_SCHEDSTATS`, and also by
1315/// `CONFIG_TASK_DELAY_ACCT` -- ktstr.kconfig enables both) populates these
1316/// fields even under sched_ext and regardless of the `kernel.sched_schedstats`
1317/// sysctl (that sysctl gates the separate per-rq/domain counters, not
1318/// `sched_info`).
1319fn read_schedstat_raw(tid: libc::pid_t) -> (u64, u64) {
1320 match std::fs::read_to_string(format!("/proc/{tid}/schedstat")) {
1321 Ok(s) => parse_schedstat_raw(&s),
1322 // The thread may have exited; schbench's fopen-failure path also -> 0.
1323 Err(_) => (0, 0),
1324 }
1325}
1326
/// Extract the raw `(run_delay_ns, pcount)` pair from one
/// `/proc/<tid>/schedstat` line: whitespace-separated field 2 is the run-delay
/// in ns, field 3 is pcount (timeslices). Field 1 (`sum_exec_runtime`, on-CPU
/// ns) is ignored, as is anything after field 3.
///
/// # Panics
/// The kernel's `proc_pid_schedstat` always emits three integer fields, so a
/// malformed line that is *present* is a kernel/parse bug: this panics rather
/// than silently reporting 0 (matching the handshake's fail-loud stance and
/// the no-silent-wrong-answer rule). An absent file is the caller's concern.
fn parse_schedstat_raw(s: &str) -> (u64, u64) {
    // Step over field 1; the remaining iterator starts at run_delay.
    let mut remaining = s.split_whitespace().skip(1);
    // Pull the next token as a u64, panicking with `why` if it is missing or
    // not an integer.
    let mut take_u64 = |why: &str| -> u64 {
        remaining
            .next()
            .and_then(|token| token.parse::<u64>().ok())
            .expect(why)
    };
    let run_delay = take_u64("schedstat field 2 (run_delay) must be a present integer");
    let pcount = take_u64("schedstat field 3 (pcount) must be a present integer");
    (run_delay, pcount)
}
1346
/// Mean run-queue wait per schedule (ns) for a raw `(run_delay_ns, pcount)`
/// pair, i.e. `run_delay / pcount` -- the arithmetic of schbench's
/// `read_sched_delay` (`schbench.c:1146`). A zero `pcount` (a never-scheduled
/// thread, or the `!sched_info_on()` "0 0 0" line) yields 0, where schbench
/// would divide by zero. This is the WHOLE-RUN per-thread mean (the
/// mean-of-means component); the per-phase path keeps the raw pair and
/// re-pools `Σrd/Σpc` host-side instead.
fn mean_sched_delay((run_delay, pcount): (u64, u64)) -> u64 {
    // `checked_div` is `None` exactly when the divisor is zero.
    run_delay.checked_div(pcount).unwrap_or(0)
}
1356
1357/// Finalize the just-ended phase on the OWNING thread: `take` the live
1358/// histograms (snapshot-and-reset, so the next phase starts clean) and push a
1359/// [`PhaseSnapshot`] tagged `epoch`, carrying the schedstat run-delay/pcount
1360/// delta (`ss_end − ss_start`) and this thread's `loop_count` for the phase.
1361/// Mirrors the worker/mod.rs:3858-3894 backdrop drain-on-change. For the
1362/// message thread the histograms are unwritten (taken empty) and `loop_count`
1363/// is 0; an empty histogram folds harmlessly host-side ([`PlatStats::combine`]).
1364///
1365/// # Safety
1366/// Must be called by the thread that owns `td` — it is the sole writer of
1367/// `td`'s `wakeup_stats` / `request_stats` / `phase_snapshots` cells (the
1368/// [`ThreadData`] `Sync` contract).
1369unsafe fn drain_phase(
1370 td: &ThreadData,
1371 epoch: u32,
1372 ss_start: (u64, u64),
1373 ss_end: (u64, u64),
1374 loop_count: u64,
1375) {
1376 // SAFETY: owner-only cell access, per this function's contract.
1377 unsafe {
1378 let wakeup = (*td.wakeup_stats.get()).take();
1379 let request = (*td.request_stats.get()).take();
1380 (*td.phase_snapshots.get()).push(PhaseSnapshot {
1381 epoch,
1382 wakeup,
1383 request,
1384 run_delay_ns: ss_end.0.saturating_sub(ss_start.0),
1385 pcount: ss_end.1.saturating_sub(ss_start.1),
1386 loop_count,
1387 });
1388 }
1389}
1390
/// The per-message-thread shared context every worker borrows — bundled so
/// [`worker_loop`]'s signature stays within clippy's argument budget. All
/// fields are `Copy` shared references; every worker spawned by one
/// [`run_one_message_thread`] reads the same instance.
struct WorkerCtx<'a> {
    /// The message thread's [`ThreadData`]. Workers hand it to `msg_and_wait`
    /// and post its futex on exit so a parked message thread re-checks `stop`.
    msg_td: &'a ThreadData,
    /// The wait-list workers pass to `msg_and_wait` in the default
    /// (message-handshake) mode.
    wait_list: &'a TreiberStack<ThreadData>,
    /// Per-CPU locks forwarded to `do_work`; `None` is forwarded as-is.
    locks: Option<&'a PerCpuLocks>,
    /// Run configuration (think-sleep, matrix sizing, split/operations,
    /// pipe and RPS mode selection).
    config: &'a SchbenchConfig,
    /// Shutdown flag; workers poll it (Acquire) at loop top and after waking.
    stop: &'a AtomicBool,
    /// Completed-work-cycle counter, incremented once per cycle by every worker.
    progress: &'a AtomicU64,
    /// Shared phase generation polled once per outer loop iteration; `None`
    /// when the run is non-phasic.
    phase_epoch: Option<&'a AtomicU32>,
    /// The ONE process-global shared matrix (`--split` mode), shared across
    /// every worker of every message thread; empty slice when not splitting.
    shared: &'a [core::sync::atomic::AtomicU64],
    /// Dimension argument forwarded to `do_work` alongside `shared`.
    shared_matrix_size: usize,
}
1408
/// One worker thread's loop. Faithful to schbench's `worker_thread`
/// (`schbench.c:1419`, default branch): block until woken (recording wakeup
/// latency), then think-sleep + matrix work under the per-CPU lock (recording
/// request latency), until stop. `ctx.progress` counts completed work cycles
/// across all workers (the live cycle counter / achieved request rate).
///
/// `ctx.phase_epoch` (when `Some`) is the shared per-phase generation the
/// scenario engine bumps at each step boundary. The worker polls it once per
/// cycle and, on change, drains the just-ended phase ([`drain_phase`]) —
/// `take`ing its own histograms + a schedstat raw delta — so per-phase
/// percentiles are isolated across e.g. an scx→detached-EEVDF transition. A
/// final drain at exit closes the last phase (and is the SOLE snapshot when
/// non-phasic, `cur_epoch` 0).
///
/// # Parameters
/// - `td`: this worker's own [`ThreadData`]. The loop is the only code that
///   touches its `request_stats` / `phase_snapshots` / `sched_delay_ns` cells
///   while the worker runs (see the `SAFETY` notes below).
/// - `ctx`: the shared [`WorkerCtx`] every worker of one message thread reads.
///
/// # Exit sequence
/// On leaving the loop the worker (1) posts the message thread's futex,
/// (2) pushes a final phase snapshot, and (3) stores its whole-run mean
/// run-queue wait into `td.sched_delay_ns`.
fn worker_loop(td: &ThreadData, ctx: &WorkerCtx) {
    // Destructure the all-`Copy` context into the same bindings the body used
    // before the bundle, so the loop logic below reads unchanged.
    let WorkerCtx {
        msg_td,
        wait_list,
        locks,
        config,
        stop,
        progress,
        phase_epoch,
        shared,
        shared_matrix_size,
    } = *ctx;
    // This thread's kernel tid, used for every schedstat read below.
    let tid = gettid_self();
    // Per-worker PRIVATE matrix (schbench.c:1568-1574: private_matrix_size when
    // splitting, else the full matrix_size). The shared matrix (split mode) is
    // the process-global `shared` allocated once in run().
    let private_matrix_size = config.private_matrix_size();
    // Three square matrices of `private_matrix_size` packed into one buffer;
    // an empty Vec when the private size is zero.
    let mut private_buf = if private_matrix_size > 0 {
        vec![0u64; 3 * private_matrix_size * private_matrix_size]
    } else {
        Vec::new()
    };
    // Scratch accumulator threaded through `do_work`.
    let mut work_units = 0u64;
    // Per-phase drain-on-change state. `cur_epoch` is the epoch the current
    // phase's samples belong to; `phase_ss_start` baselines the schedstat raw
    // pair so each phase reports its own delta.
    let mut cur_epoch = phase_epoch.map_or(0, |e| e.load(Ordering::Relaxed));
    let mut phase_ss_start = read_schedstat_raw(tid);
    let mut phase_loop_count = 0u64;
    // RPS mode iff the per-thread rate is non-zero (the `-R` total covers at
    // least one request/sec per message thread). Below that it rounds to 0 and
    // the worker runs the default message-handshake mode -- matching schbench's
    // startup-divide-before-gate ([`SchbenchConfig::rps_per_message_thread`]).
    // Pipe mode (`-p`): the memory-transfer workload. It runs the message-handshake
    // path (where the pipe-page memsets live), NOT the RPS injector -- a ktstr
    // divergence; schbench composes -p+-R half-broken (see the `pipe_transfer_bytes`
    // field doc).
    let pipe_mode = config.pipe_transfer_bytes > 0;
    let rps_mode = !pipe_mode && config.rps_per_message_thread() != 0;
    while !stop.load(Ordering::Acquire) {
        // Acquire work. Default mode: the message-thread handshake (records
        // wakeup latency; no request chain -> a single work cycle). RPS mode:
        // splice this worker's request queue, blocking on its own futex until
        // the RPS thread posts; returns the spliced chain (FIFO) to drain.
        let mut req: *mut Request = if rps_mode {
            let chain = rps_wait(td, stop);
            if stop.load(Ordering::Acquire) {
                // Free any spliced-but-unprocessed requests, then exit.
                free_request_chain(chain);
                break;
            }
            if chain.is_null() {
                continue; // spurious wake with an empty queue
            }
            chain
        } else {
            msg_and_wait(td, msg_td, wait_list, stop);
            if stop.load(Ordering::Acquire) {
                break;
            }
            // Null marks "default mode": the work loop below runs exactly once.
            ptr::null_mut()
        };
        // Process the work: one cycle in default mode (`req` null), or one cycle
        // per queued request in RPS mode (freeing each). `work_start` is stamped
        // before the think-sleep so the request latency covers think-time +
        // matrix work (`schbench.c:1464-1481`).
        loop {
            let work_start = monotonic_nanos();
            // Pipe mode: the per-cycle transfer is the wake/block memsets
            // (msg_and_wait fills our page to 2; wake_all fills it to 1). The work
            // loop only counts cycles for the MB/s throughput; schbench likewise
            // skips do_work + the think-sleep in pipe mode (schbench.c:1448).
            if !pipe_mode {
                if config.sleep_usec > 0 {
                    think_sleep(config.sleep_usec);
                }
                do_work(
                    &mut private_buf,
                    private_matrix_size,
                    shared,
                    shared_matrix_size,
                    config.split_percent,
                    config.operations,
                    locks,
                    &mut work_units,
                );
            }
            let now = monotonic_nanos();
            // Request latency in whole microseconds; sub-microsecond cycles
            // (delta 0) are not recorded, and the value is clamped to u32.
            let delta_us = now.saturating_sub(work_start) / 1000;
            if delta_us > 0 {
                // SAFETY: only this worker thread accesses its own request_stats cell.
                unsafe { (*td.request_stats.get()).add_lat(delta_us.min(u32::MAX as u64) as u32) };
            }
            // Count the cycle both globally and for the current phase.
            progress.fetch_add(1, Ordering::Relaxed);
            phase_loop_count += 1;
            if req.is_null() {
                break; // default mode: a single work cycle
            }
            // RPS mode: advance to the next queued request and free this one.
            // SAFETY: `req` was `Box::into_raw`'d by the RPS thread and handed to
            // this worker exclusively via the queue splice; we own it and free it
            // exactly once here.
            let next = unsafe { (*req).next.load(Ordering::Acquire) };
            drop(unsafe { Box::from_raw(req) });
            req = next;
            if req.is_null() {
                break;
            }
        }
        // Drain-on-change: when the parent advances `phase_epoch`, finalize the
        // phase just ended (tagged with the OLD epoch the samples were recorded
        // under) and re-baseline. A worker blocked across a whole phase simply
        // contributes an empty histogram for it (no cycle ran), which folds
        // harmlessly.
        if let Some(pe) = phase_epoch {
            let new_epoch = pe.load(Ordering::Relaxed);
            if new_epoch != cur_epoch {
                let ss_end = read_schedstat_raw(tid);
                // SAFETY: this is the thread that owns `td`.
                unsafe { drain_phase(td, cur_epoch, phase_ss_start, ss_end, phase_loop_count) };
                cur_epoch = new_epoch;
                phase_ss_start = ss_end;
                phase_loop_count = 0;
            }
        }
    }
    // Shutdown wake: the loop exits on `stop` without a final `msg_and_wait`, so
    // the message thread may still be parked in `wait_forever` (its only waker is
    // a worker post). schbench's MAIN thread handles this -- it fposts each
    // message thread after `stopping = 1` (`schbench.c:1832`, `:1933`) -- but our
    // `run()` joins passively and `msg_td` is private to `run_one_message_thread`,
    // so the exiting worker wakes its own message thread instead. Without this, a
    // worker that exits mid-`do_work` leaves the message thread parked forever and
    // `run()` deadlocks. Idempotent across workers: the first post the message
    // thread observes makes it re-check `stop` and break; later posts are no-ops.
    // In RPS mode this is itself a harmless no-op: the dispatcher runs
    // `run_rps_thread` (which never parks on `msg_td`) and observes `stop` itself.
    msg_td.futex.post();
    // Final drain: close the still-open phase. When non-phasic this is the only
    // snapshot (`cur_epoch` 0), so run() builds the whole-run result uniformly
    // from snapshots.
    let ss_end = read_schedstat_raw(tid);
    // SAFETY: this is the thread that owns `td`.
    unsafe { drain_phase(td, cur_epoch, phase_ss_start, ss_end, phase_loop_count) };
    // Record this worker's whole-run mean run-queue wait at exit — the
    // mean-of-means component for the whole-run SchbenchResult (schbench reads
    // each thread's schedstat for the final aggregate, `schbench.c:1664-1670`).
    // Reuses the cumulative `ss_end` pair just read (one /proc read; mean and
    // final-phase delta share one consistent snapshot).
    // SAFETY: owner-only access to this thread's sched_delay_ns cell.
    unsafe { *td.sched_delay_ns.get() = mean_sched_delay(ss_end) };
}
1576
1577/// Resolve the worker-thread default and the per-CPU lock-array size from the
1578/// calling thread's CPU affinity (the allocated cpuset, per ruling). Returns
1579/// `(allowed_cpu_count, lock_array_size)` where the array size is the highest
1580/// allowed CPU id + 1, so `sched_getcpu` indexes it without clamping even on a
1581/// sparse cpuset.
1582fn resolve_cpu_topology() -> (usize, usize) {
1583 // SAFETY: a zeroed cpu_set_t filled by sched_getaffinity for the calling
1584 // thread (pid 0); CPU_ISSET only reads it.
1585 unsafe {
1586 let mut set: libc::cpu_set_t = core::mem::zeroed();
1587 let rc = libc::sched_getaffinity(0, core::mem::size_of::<libc::cpu_set_t>(), &mut set);
1588 if rc != 0 {
1589 return (1, 1);
1590 }
1591 let mut count = 0usize;
1592 let mut max_id = 0usize;
1593 for cpu in 0..libc::CPU_SETSIZE as usize {
1594 if libc::CPU_ISSET(cpu, &set) {
1595 count += 1;
1596 max_id = cpu;
1597 }
1598 }
1599 (count.max(1), (max_id + 1).max(1))
1600 }
1601}
1602
/// Combined WHOLE-RUN results of a schbench run: the merged wakeup + request
/// latency percentiles and the achieved request rate (completed work
/// cycles/second). The histograms are the union of every phase; `sched_delay_*`
/// keep schbench's mean-of-means (`collect_sched_delay`) so the side-by-side
/// validation matches real schbench's reported number.
pub(crate) struct SchbenchResult {
    /// Merged wakeup-latency percentiles over the whole run.
    pub(crate) wakeup: Percentiles,
    /// Merged request-latency percentiles over the whole run.
    pub(crate) request: Percentiles,
    /// Per-second achieved-RPS distribution (schbench's `rps_stats`), sampled
    /// once per second by the control thread. For auto-RPS, samples are gated
    /// until the target is hit (`schbench.c:1781`); for fixed/default mode every
    /// second is sampled.
    pub(crate) rps: Percentiles,
    /// NOTE(review): presumably the total completed work cycles across all
    /// workers (the numerator of schbench's pipe-mode rate) — confirm in `run`.
    pub(crate) loop_count: u64,
    /// Resolved total worker-thread count (`message_threads * worker_threads`).
    /// Divisor for schbench's PER-WORKER pipe-mode rate (`avg worker transfer`):
    /// schbench reports `loop_count / Σ worker runtimes` (`schbench.c:1697`,
    /// `:1942-1943`/`:1979`), and `Σ worker runtimes ≈ nr_workers * elapsed`, so
    /// the per-worker rate is `achieved_rps / nr_workers`.
    pub(crate) nr_workers: usize,
    /// Achieved request rate: completed work cycles per second.
    pub(crate) achieved_rps: f64,
    /// Auto-RPS final TOTAL target rate at run exit: the live per-message-thread
    /// rate * message_threads (schbench's `requests_per_sec * message_threads`,
    /// `schbench.c:1995`). For fixed `-R` and default mode this equals the seeded
    /// total; only auto-RPS makes it diverge as the control loop retargets.
    pub(crate) final_rps_goal: usize,
    /// Mean message-thread run-queue wait (ns), averaged across message threads
    /// (schbench's `message_thread_delay`, `schbench.c:1673`).
    pub(crate) sched_delay_msg_ns: u64,
    /// Mean worker-thread run-queue wait (ns), averaged across all workers
    /// (schbench's `worker_thread_delay`, `schbench.c:1674`).
    pub(crate) sched_delay_worker_ns: u64,
}
1636
/// The full result of [`run`]: the whole-run [`SchbenchResult`] plus the
/// per-phase aggregates keyed by `phase_epoch` (each the cross-thread merge for
/// one scenario step's HOLD window). When non-phasic (`phase_epoch == None`)
/// `phases` holds the single `(0, ..)` baseline entry the host discards; tests
/// read `whole_run`. The worker dispatch turns each `(epoch, SchbenchPhaseStats)`
/// into a `PhaseSlice`.
pub(crate) struct SchbenchOutcome {
    /// Aggregate over the entire run (the union of every phase).
    pub(crate) whole_run: SchbenchResult,
    /// `(epoch, stats)` pairs, one per phase epoch that produced a snapshot.
    pub(crate) phases: Vec<(u32, SchbenchPhaseStats)>,
}
1647
/// One message thread's pooled results, returned from [`run_one_message_thread`]
/// to [`run`]: the whole-run histograms (Σ over its workers' phase snapshots),
/// the run-delay components for the whole-run mean-of-means, and the per-epoch
/// aggregate keyed by `phase_epoch`.
struct MessageThreadResult {
    /// Wakeup-latency histogram combined over every worker's phase snapshots.
    whole_wakeup: PlatStats,
    /// Request-latency histogram combined over every worker's phase snapshots.
    whole_request: PlatStats,
    /// The dispatcher (message/RPS) thread's whole-run mean run-queue wait (ns).
    msg_sched_delay_ns: u64,
    /// Saturating SUM of each worker's whole-run mean run-queue wait (ns); the
    /// caller is expected to divide by the worker count.
    workers_sched_delay_sum: u64,
    /// Per-epoch aggregate: worker histograms, run-delay/pcount sums and loop
    /// counts, plus the dispatcher's run-delay/pcount for the same epoch.
    phases: std::collections::BTreeMap<u32, SchbenchPhaseStats>,
}
1659
/// Run one message thread plus its workers, returning the whole-run histograms
/// AND the per-epoch aggregate. The message thread runs on the calling thread;
/// workers are scoped so they are joined before their owner-only phase snapshots
/// are drained.
///
/// # Parameters
/// - `worker_threads`: number of workers to spawn for this message thread.
/// - `locks`, `config`, `stop`, `progress`, `phase_epoch`, `shared`,
///   `shared_matrix_size`: bundled unchanged into the [`WorkerCtx`] every
///   worker reads.
/// - `live_rate`: handed to `run_rps_thread` only (RPS-injector mode).
///
/// # Returns
/// A [`MessageThreadResult`] pooled from every worker's and the dispatcher's
/// phase snapshots, built strictly after the scoped join.
// The argument list mirrors the worker context plus the dispatcher inputs, so
// the clippy argument-count lint is silenced deliberately here.
#[allow(clippy::too_many_arguments)]
fn run_one_message_thread(
    worker_threads: usize,
    locks: Option<&PerCpuLocks>,
    config: &SchbenchConfig,
    stop: &AtomicBool,
    progress: &AtomicU64,
    phase_epoch: Option<&AtomicU32>,
    live_rate: &AtomicUsize,
    shared: &[core::sync::atomic::AtomicU64],
    shared_matrix_size: usize,
) -> MessageThreadResult {
    // schbench clamps -p to PIPE_TRANSFER_BUFFER (schbench.c:291-294). Workers get
    // the per-thread transfer page; the message thread (waker) needs none.
    let pipe_bytes = config.pipe_transfer_bytes.min(PIPE_TRANSFER_BUFFER);
    let workers: Vec<ThreadData> = (0..worker_threads)
        .map(|_| ThreadData::new(pipe_bytes))
        .collect();
    let msg_td = ThreadData::new(0);
    let wait_list = TreiberStack::new();
    // One shared context for all workers of this message thread.
    let ctx = WorkerCtx {
        msg_td: &msg_td,
        wait_list: &wait_list,
        locks,
        config,
        stop,
        progress,
        phase_epoch,
        shared,
        shared_matrix_size,
    };

    std::thread::scope(|inner| {
        for w in &workers {
            inner.spawn(|| worker_loop(w, &ctx));
        }
        // Dispatcher: RPS-injector mode runs the rate-driven request thread;
        // default mode runs the message-thread handshake. Both run on this
        // calling thread and record the dispatcher's run-delay into `msg_td`
        // (schbench `message_thread`, `:1594`).
        // Pipe mode (`-p`) uses the message-handshake waker (run_msg_thread does
        // the per-worker pipe-page fill in wake_all) and takes precedence over -R.
        // This DIVERGES from schbench, where -R alone picks the path
        // (`schbench.c:1594`): schbench's `-p -R` runs the RPS injector (no
        // xlist_wake_all -> no waker-side memset -> a half-pipe). ktstr runs the
        // full pipe; the realistic use is `-p` alone. Otherwise -R selects the
        // injector.
        if config.pipe_transfer_bytes == 0 && config.rps_per_message_thread() != 0 {
            run_rps_thread(&workers, &msg_td, stop, phase_epoch, live_rate);
        } else {
            run_msg_thread(&msg_td, &wait_list, stop, phase_epoch);
        }
        // Stop is set: wake every worker so a blocked one observes stop and
        // exits (schbench fposts each worker before joining, `:1599-1602`).
        for w in &workers {
            w.futex.post();
        }
        // The inner scope joins the workers here.
    });

    // RPS mode: free any requests the RPS thread enqueued after a worker's last
    // splice (the worker exited on stop without draining them). After the join,
    // this is the sole access to each worker's queue, so no request leaks across
    // run() calls. The gate does not exclude pipe mode, where the injector never
    // ran; the same splice-and-free is still applied to each queue.
    if config.rps_per_message_thread() != 0 {
        for w in &workers {
            free_request_chain(w.requests.splice_reversed());
        }
    }

    // After join (happens-before): drain each thread's owner-only phase
    // snapshots into the whole-run histograms + the per-epoch aggregate.
    let mut whole_wakeup = PlatStats::default();
    let mut whole_request = PlatStats::default();
    let mut workers_sched_delay_sum = 0u64;
    let mut phases: std::collections::BTreeMap<u32, SchbenchPhaseStats> =
        std::collections::BTreeMap::new();
    for w in &workers {
        // SAFETY: every worker has joined (inner scope ended), so this is the
        // sole access to their cells.
        unsafe {
            workers_sched_delay_sum =
                workers_sched_delay_sum.saturating_add(*w.sched_delay_ns.get());
            // Each snapshot feeds BOTH the whole-run histograms and exactly one
            // per-epoch bucket.
            for snap in (*w.phase_snapshots.get()).drain(..) {
                whole_wakeup.combine(&snap.wakeup);
                whole_request.combine(&snap.request);
                let e = phases.entry(snap.epoch).or_default();
                e.wakeup.combine(&snap.wakeup);
                e.request.combine(&snap.request);
                e.worker_run_delay_ns = e.worker_run_delay_ns.saturating_add(snap.run_delay_ns);
                e.worker_pcount = e.worker_pcount.saturating_add(snap.pcount);
                e.loop_count = e.loop_count.saturating_add(snap.loop_count);
            }
        }
    }
    // SAFETY: run_msg_thread ran on this thread inside the scope above (before
    // the join), so msg_td's cells are settled and this is sole access.
    let msg_sched_delay = unsafe { *msg_td.sched_delay_ns.get() };
    // SAFETY: same — sole access to msg_td's snapshot cell after the scope.
    unsafe {
        // Only the dispatcher's run-delay/pcount are folded in; its histograms
        // are not combined here.
        for snap in (*msg_td.phase_snapshots.get()).drain(..) {
            let e = phases.entry(snap.epoch).or_default();
            e.msg_run_delay_ns = e.msg_run_delay_ns.saturating_add(snap.run_delay_ns);
            e.msg_pcount = e.msg_pcount.saturating_add(snap.pcount);
        }
    }
    MessageThreadResult {
        whole_wakeup,
        whole_request,
        msg_sched_delay_ns: msg_sched_delay,
        workers_sched_delay_sum,
        phases,
    }
}
1778
/// Persistent /proc/stat reader state for auto-RPS, carrying the previous
/// cumulative totals so each call computes a delta over the interval (schbench
/// keeps one fd + the prior total/idle, `schbench.c` `read_busy`, `:1046`).
#[derive(Default)]
struct ReadBusyState {
    /// Handle to `/proc/stat`, opened lazily on the first [`read_busy`] call
    /// and rewound (not reopened) on later calls.
    fd: Option<File>,
    /// Sum of every field on the aggregate `cpu` line at the previous read.
    /// `0` doubles as the "baseline not seeded yet" marker.
    prev_total: u64,
    /// The idle field of the aggregate `cpu` line at the previous read.
    prev_idle: u64,
}
1788
1789/// Host-busy percentage over the interval since the last call, from the
1790/// aggregate `cpu` line of /proc/stat (schbench `read_busy`,
1791/// `schbench.c:1046-1112`). `None` means "no usable reading this tick": the first
1792/// call (baseline seed, schbench's `first_run`) OR a zero jiffy-delta (two reads
1793/// in the same tick -- schbench has no guard and divides by zero into a NaN that
1794/// spuriously trips its target-hit `else` branch; we skip the tick instead). A
1795/// real reading is `Some(0.0..=100.0)`, including a genuine fully-idle `Some(0.0)`
1796/// -- distinguished from the seed via the `prev_total` state, NOT via the value,
1797/// so an idle interval still drives a grow (the bug a `busy == 0.0` test had). In
1798/// the guest VM /proc/stat covers exactly the allocated cpuset (the guest has
1799/// only `cores=N`), so this system-wide read is correctly scoped to the
1800/// workload's CPUs. f32 to match schbench's `float` arithmetic.
1801fn read_busy(s: &mut ReadBusyState) -> Option<f32> {
1802 if s.fd.is_none() {
1803 s.fd = File::open("/proc/stat").ok();
1804 }
1805 let f = s.fd.as_mut()?;
1806 f.seek(SeekFrom::Start(0)).ok()?;
1807 let mut buf = [0u8; 512];
1808 let n = f.read(&mut buf).ok()?;
1809 let text = core::str::from_utf8(&buf[..n]).ok()?;
1810 // First line only; first token must be "cpu" (the aggregate line). Sum every
1811 // numeric field -> total; field index 3 (after "cpu") is idle -- kernel
1812 // show_stat order is user/nice/system/IDLE/iowait/irq/softirq/steal/guest/
1813 // guest_nice. `split_whitespace` collapses the "cpu " double space, matching
1814 // schbench's `strtok_r` (`schbench.c:1082-1096`).
1815 let line = text.lines().next().unwrap_or("");
1816 let mut fields = line.split_whitespace();
1817 if fields.next() != Some("cpu") {
1818 return None;
1819 }
1820 let mut total = 0u64;
1821 let mut idle = 0u64;
1822 for (i, tok) in fields.enumerate() {
1823 let v: u64 = tok.parse().unwrap_or(0);
1824 if i == 3 {
1825 idle = v;
1826 }
1827 total += v;
1828 }
1829 // First call: seed the baseline, no busy% yet (`schbench.c:1098-1101`).
1830 if s.prev_total == 0 {
1831 s.prev_total = total;
1832 s.prev_idle = idle;
1833 return None;
1834 }
1835 let dt = total.saturating_sub(s.prev_total);
1836 let di = idle.saturating_sub(s.prev_idle);
1837 s.prev_total = total;
1838 s.prev_idle = idle;
1839 if dt == 0 {
1840 // Zero jiffy-delta: skip rather than NaN (see the doc; schbench diverges).
1841 return None;
1842 }
1843 Some(100.0 - (di as f32 / dt as f32) * 100.0)
1844}
1845
/// Move the live per-message-thread request rate toward `target_pct` host-busy,
/// given an already-measured `busy` percentage (the scaling body of schbench's
/// `auto_scale_rps`, `schbench.c:1203-1250`). Pure: no I/O, so the decision can
/// be unit-tested with injected `busy` values.
///
/// - `busy < target`: grow by `target / busy`, capped at 3x and damped when
///   close (within 1.2x -> 1/8 of the excess, within 1.5x -> 1/4), rounded up
///   with `ceil`. A result at or above 2^31 means "not enough capacity", and
///   the current rate is held.
/// - `busy > target`: shrink by `target / busy`, floored at 0.3x and damped
///   when close (above 0.9x -> 1/8 of the gap, above 0.8x -> 1/4), rounded
///   down with `floor`.
/// - otherwise (exactly on target): hold the rate.
///
/// The new rate is stored into `live_rate` -- schbench scales its global
/// `requests_per_sec` (already per-thread after `:1899`) and writes it back at
/// `:1250`. Returns `true` only on the transition INTO the near-target band
/// (when `target_hit` flips false->true), so the caller can reset the rps
/// histogram (schbench memsets `rps_stats` on that transition). The band test
/// looks at the multiplier AFTER damping, exactly as schbench does
/// (`:1210`,`:1233`). f32 arithmetic matches schbench's float deltas.
fn scale_rps_for_busy(
    busy: f32,
    live_rate: &AtomicUsize,
    target_hit: &AtomicBool,
    target_pct: usize,
) -> bool {
    let goal = target_pct as f32;
    let current = live_rate.load(Ordering::Relaxed) as f32;
    let latched = target_hit.load(Ordering::Relaxed);

    // Each arm yields (next rate, whether this tick landed in the near band).
    let (next_rate, in_band): (usize, bool) = if busy < goal {
        // Under target: grow (`schbench.c:1203-1224`).
        let ratio = goal / busy;
        let (factor, near): (f32, bool) = if ratio > 3.0 {
            (3.0, false)
        } else if ratio < 1.2 {
            // Band check uses the damped multiplier (`schbench.c:1209-1213`).
            let damped = 1.0 + (ratio - 1.0) / 8.0;
            (damped, damped < 1.05)
        } else if ratio < 1.5 {
            (1.0 + (ratio - 1.0) / 4.0, false)
        } else {
            (ratio, false)
        };
        let scaled = (current * factor).ceil();
        // Not enough capacity to reach the target -> hold (`schbench.c:1219-1224`).
        let rate = if scaled >= (1u64 << 31) as f32 {
            current as usize
        } else {
            scaled as usize
        };
        (rate, near)
    } else if busy > goal {
        // Over target: shrink (`schbench.c:1226-1242`).
        let ratio = goal / busy;
        let (factor, near): (f32, bool) = if ratio < 0.3 {
            (0.3, false)
        } else if ratio > 0.9 {
            // Band check uses the damped multiplier (`schbench.c:1232-1236`).
            let damped = ratio + (1.0 - ratio) / 8.0;
            (damped, damped > 0.95)
        } else if ratio > 0.8 {
            (ratio + (1.0 - ratio) / 4.0, false)
        } else {
            (ratio, false)
        };
        ((current * factor).floor().max(0.0) as usize, near)
    } else {
        // Exactly on target (`schbench.c:1243-1248`): hold, and count as in-band.
        (current as usize, true)
    };

    // Only the first entry into the band is reported; later ones are silent.
    let just_hit = in_band && !latched;
    live_rate.store(next_rate, Ordering::Relaxed);
    if just_hit {
        target_hit.store(true, Ordering::Relaxed);
    }
    just_hit
}
1916
1917/// Read host-busy from /proc/stat and scale the live rate toward the target
1918/// (schbench `auto_scale_rps`, `schbench.c:1180-1251`). The first read only seeds
1919/// the baseline (schbench's `first_run` early return, `:1201`); a zero-delta tick
1920/// is skipped (see [`read_busy`]). Returns [`scale_rps_for_busy`]'s just-hit flag.
1921fn auto_scale_rps(
1922 busy_state: &mut ReadBusyState,
1923 live_rate: &AtomicUsize,
1924 target_hit: &AtomicBool,
1925 target_pct: usize,
1926) -> bool {
1927 let Some(busy) = read_busy(busy_state) else {
1928 return false;
1929 };
1930 scale_rps_for_busy(busy, live_rate, target_hit, target_pct)
1931}
1932
1933/// Reset BOTH per-phase rps accumulators in LOCKSTEP at an auto-RPS target-hit.
1934/// schbench memsets `rps_stats` on the target-hit transition to discard the
1935/// pre-target ramp; the per-epoch map MUST reset with it, or the per-phase view
1936/// keeps ramp samples the whole-run view dropped and `Σ per-epoch == whole-run`
1937/// breaks (asymmetric-counter-bookkeeping).
1938fn reset_rps_accumulators(
1939 whole: &mut PlatStats,
1940 per_epoch: &mut std::collections::BTreeMap<u32, PlatStats>,
1941) {
1942 *whole = PlatStats::default();
1943 per_epoch.clear();
1944}
1945
1946/// The once-per-second control thread, schbench's main control loop
1947/// (`schbench.c:1756-1827`): sample the achieved per-second RPS into `rps_stats`
1948/// and -- when auto-RPS is on -- adjust the live rate toward the target. Returns
1949/// BOTH the whole-run `rps_stats` AND a per-epoch RPS map (the same samples
1950/// bucketed by the phase epoch observed at sample time), moved out on join. The
1951/// whole-run `rps_stats` is byte-identical to the pre-per-phase behavior -- the
1952/// per-epoch map is PURELY ADDITIVE (each sample lands in the whole-run histogram
1953/// AND exactly one epoch bucket), so `Σ per-epoch == whole-run`. `phase_epoch` is
1954/// read Relaxed (telemetry, gates no coupled memory); a 1s sample straddling an
1955/// epoch change is attributed wholly to the sample-time epoch (bounded fuzz,
1956/// mirroring the worker drain-on-change). The 1-second cadence is the workload's
1957/// defined sampling/scaling interval, not a synchronization wait; it observes
1958/// `stop` after each interval.
1959fn control_loop(
1960 progress: &AtomicU64,
1961 stop: &AtomicBool,
1962 config: &SchbenchConfig,
1963 live_rate: &AtomicUsize,
1964 target_hit: &AtomicBool,
1965 phase_epoch: Option<&AtomicU32>,
1966) -> (PlatStats, std::collections::BTreeMap<u32, PlatStats>) {
1967 let mut rps_stats = PlatStats::default();
1968 let mut per_epoch_rps: std::collections::BTreeMap<u32, PlatStats> =
1969 std::collections::BTreeMap::new();
1970 let mut last_loop = 0u64;
1971 let mut last_t = monotonic_nanos();
1972 let mut busy_state = ReadBusyState::default();
1973 while !stop.load(Ordering::Acquire) {
1974 // Sleep ~1s between samples, but in STOP_POLL_QUANTUM_NS chunks that
1975 // re-check `stop`, so a shutdown is observed within a quantum instead of
1976 // blocking run()'s teardown for a full second (the control thread is joined
1977 // by run(), so an uncapped sleep here is the dominant teardown latency —
1978 // worse than, and unconditional unlike, the RPS injector's pacing sleep).
1979 let sleep_target = monotonic_nanos().saturating_add(1_000_000_000);
1980 loop {
1981 if stop.load(Ordering::Acquire) {
1982 break;
1983 }
1984 let now = monotonic_nanos();
1985 if now >= sleep_target {
1986 break;
1987 }
1988 std::thread::sleep(std::time::Duration::from_nanos(
1989 (sleep_target - now).min(STOP_POLL_QUANTUM_NS),
1990 ));
1991 }
1992 // Sample the just-elapsed window BEFORE re-checking stop at the loop top,
1993 // so the in-progress second is recorded rather than dropped. `dt` is the
1994 // ACTUAL elapsed time (now - last_t).
1995 //
1996 // schbench samples-first then sleeps the full second (`sleep(1)` only
1997 // `if (!done)`, schbench.c:1827), so every schbench sample -- including the
1998 // boundary one taken when `done` is detected -- spans a real ~1s window
1999 // (rps over a real `delta = tvdelta(..)`, schbench.c:1774,1777). ktstr
2000 // sleeps-first/samples-last with a BOUNDED sleep, which diverges two ways:
2001 // - On early `stop` this boundary sample spans a true PARTIAL window: for
2002 // a substantial window (>= one quantum) the real rate over the
2003 // sub-window the workers were active (more granular than schbench, which
2004 // would have waited the full second); a degenerate sub-quantum window is
2005 // dropped by `control_loop_rps_sample` (a count/dt rate is unbounded as
2006 // dt->0). The bounded sleep is a ktstr scenario-teardown requirement
2007 // schbench does not face.
2008 // - The sleep-first shape omits schbench's tiny FIRST-window sample
2009 // (schbench reads `now` at loop start before any work, schbench.c:1757,
2010 // so its first `delta` is ~us); ktstr's first sample follows a full ~1s
2011 // sleep.
2012 let now = monotonic_nanos();
2013 let lc = progress.load(Ordering::Relaxed);
2014 let dt = now.saturating_sub(last_t);
2015 // Achieved per-second rate = Δcompleted-cycles / Δt (schbench
2016 // `combine_message_thread_rps` over the shared loop count, schbench.c:1777),
2017 // floored at one quantum by `control_loop_rps_sample` (a ktstr teardown
2018 // guard, not in schbench — see its doc).
2019 if let Some(sample) = control_loop_rps_sample(lc.saturating_sub(last_loop), dt) {
2020 // Gate: for auto-RPS, sample only once the target is hit
2021 // (`schbench.c:1781`); fixed/default mode samples every second.
2022 if config.auto_rps == 0 || target_hit.load(Ordering::Relaxed) {
2023 rps_stats.add_lat(sample);
2024 // Per-phase parity: also bucket the sample by the epoch observed
2025 // NOW (the sample-time epoch; a straddling 1s sample lands wholly
2026 // here, bounded fuzz). 0/u32::MAX sentinel epochs route into the
2027 // same all_phases entries the host discards (the run() merge).
2028 let epoch = phase_epoch.map_or(0, |e| e.load(Ordering::Relaxed));
2029 per_epoch_rps.entry(epoch).or_default().add_lat(sample);
2030 }
2031 }
2032 last_loop = lc;
2033 last_t = now;
2034 if config.auto_rps != 0 {
2035 let just_hit = auto_scale_rps(&mut busy_state, live_rate, target_hit, config.auto_rps);
2036 if just_hit {
2037 // Discard ALL pre-target ramp samples (schbench memsets rps_stats
2038 // on the target-hit transition -- grow/shrink/on-target bands all
2039 // funnel through just_hit, `schbench.c:1212`/`:1235`/`:1247`). The
2040 // per-epoch map resets in LOCKSTEP so the per-phase view never
2041 // includes ramp samples the whole-run view discards (the
2042 // Σ-per-epoch == whole-run invariant survives the reset).
2043 reset_rps_accumulators(&mut rps_stats, &mut per_epoch_rps);
2044 }
2045 }
2046 }
2047 (rps_stats, per_epoch_rps)
2048}
2049
2050/// Achieved per-second rate sample for one control-loop window:
2051/// Δcompleted-cycles over `dt_ns`, in cycles/sec, clamped to `u32`. `None` when
2052/// the window is shorter than one `STOP_POLL_QUANTUM_NS` quantum.
2053///
2054/// The floor is a correctness guard, not a nicety. The bounded teardown sleep in
2055/// `control_loop` can return on `stop` with a sub-millisecond `dt` (the window
2056/// between a sample's `last_t = now` and the next iteration's first stop check),
2057/// and a count/dt RATE is unbounded as `dt -> 0` (a single completion in a 1us
2058/// window reads as ~1e6 cycles/sec), so an unfloored sub-quantum sample would
2059/// spike `rps_max` and the top rps percentiles -- assertable metrics. A real
2060/// partial window is always >= one quantum (the sleep loop only observes `stop`
2061/// at a quantum boundary), so the floor drops only the degenerate race window; an
2062/// admitted window bounds the sample to `<= delta_loops * 1e9 /
2063/// STOP_POLL_QUANTUM_NS` (~ the real rate, no inflation). This diverges from
2064/// schbench's `isfinite(rps)` guard (schbench.c:1782), which catches only an
2065/// exact-zero delta -- a finite-but-huge rate from a tiny window slips past it,
2066/// but schbench's full-second cadence (schbench.c:1827) never produces one.
2067fn control_loop_rps_sample(delta_loops: u64, dt_ns: u64) -> Option<u32> {
2068 if dt_ns < STOP_POLL_QUANTUM_NS {
2069 return None;
2070 }
2071 let rps = delta_loops as f64 * 1e9 / dt_ns as f64;
2072 Some((rps as u64).min(u32::MAX as u64) as u32)
2073}
2074
2075/// Run the schbench workload until `stop` is set, returning the whole-run
2076/// percentiles + achieved rate AND the per-phase aggregates. `progress` is the
2077/// live count of completed work cycles across all workers. `phase_epoch` (when
2078/// `Some`) is the scenario engine's shared per-phase generation; the engine
2079/// splits its histograms at each transition (see [`worker_loop`]).
2080pub(crate) fn run(
2081 config: &SchbenchConfig,
2082 stop: &AtomicBool,
2083 progress: &AtomicU64,
2084 phase_epoch: Option<&AtomicU32>,
2085) -> SchbenchOutcome {
2086 let (allowed_count, lock_array_size) = resolve_cpu_topology();
2087 let worker_threads = config.resolve_worker_count(allowed_count);
2088 let locks = if config.skip_locking {
2089 None
2090 } else {
2091 Some(PerCpuLocks::new(lock_array_size))
2092 };
2093 // schbench rejects a `--split` outside 0..=100 at parse and exits
2094 // (schbench.c:362-365). ktstr's analog is a loud panic here at the
2095 // consumption boundary -- before any matrix sizing or the shared Arc
2096 // allocation below -- so an out-of-range config (a `pub split_percent` set
2097 // via struct literal bypasses the builder's debug_assert) can never
2098 // underflow `100 - p` (a release `usize` wrap that would size a garbage
2099 // shared matrix and OOM the allocation) or silently mis-split.
2100 if let Some(p) = config.split_percent {
2101 assert!(
2102 p <= 100,
2103 "schbench split_percent must be in 0..=100, got {p} (schbench.c:362-365 rejects out of range)"
2104 );
2105 }
2106 // schbench.c:1871-1872: ONE process-global shared matrix for `--split`,
2107 // allocated once and shared (interior-mutable AtomicU64) by every worker
2108 // across all message threads. Empty when not splitting (`None`) or when the
2109 // shared portion rounds to 0 (split=100): the `shared_matrix_size > 0`
2110 // guard in do_work then skips the shared branch, so the empty slice is
2111 // never indexed.
2112 let shared_matrix_size = config.shared_matrix_size();
2113 let shared: std::sync::Arc<[core::sync::atomic::AtomicU64]> = if shared_matrix_size > 0 {
2114 (0..3 * shared_matrix_size * shared_matrix_size)
2115 .map(|_| core::sync::atomic::AtomicU64::new(0))
2116 .collect()
2117 } else {
2118 std::sync::Arc::from([] as [core::sync::atomic::AtomicU64; 0])
2119 };
2120
2121 let start = monotonic_nanos();
2122 let mut all_wakeup = PlatStats::default();
2123 let mut all_request = PlatStats::default();
2124 let mut total_msg_sched_delay = 0u64;
2125 let mut total_worker_sched_delay = 0u64;
2126 let mut all_phases: std::collections::BTreeMap<u32, SchbenchPhaseStats> =
2127 std::collections::BTreeMap::new();
2128
2129 // The live PER-MESSAGE-THREAD request rate the RPS injectors read each second
2130 // -- schbench's global `requests_per_sec` AFTER its startup `/= message_threads`
2131 // (`schbench.c:1899`), which is exactly the value `auto_scale_rps` scales
2132 // (`schbench.c:1250`) and each `run_rps_thread` injects directly with no
2133 // further division (`schbench.c:1290`). Seeded from `rps_per_message_thread()`
2134 // (the -R total / m, or the auto-RPS seed 10 / m). The control thread
2135 // auto-scales it when auto-RPS is on; otherwise it stays constant -- a fixed
2136 // -R then injects that seed rate unchanged. `target_hit` gates the rps_stats sampling
2137 // for auto-RPS (`schbench.c:1781`); `rps_stats` is moved out of the control
2138 // thread on join. `live_rate` is read back after the scope as the auto-RPS
2139 // "final rps goal" (`schbench.c:1995`, per-thread * m).
2140 //
2141 // Ordering: both atomics are lone scalars that gate NO coupled memory --
2142 // `live_rate` is a plain count the RPS thread loops on (the requests it
2143 // enqueues publish via the Treiber stack's own AcqRel), and `target_hit` is
2144 // touched only by the control thread. So Relaxed is correct (a 1-second-stale
2145 // read is benign within the auto-RPS cadence; schbench reads its global with
2146 // no sync at all). The final read after the scope is synchronized by the join.
2147 let live_rate = AtomicUsize::new(config.rps_per_message_thread());
2148 let target_hit = AtomicBool::new(false);
2149 let mut rps_stats = PlatStats::default();
2150
2151 std::thread::scope(|outer| {
2152 let handles: Vec<_> = (0..config.message_threads)
2153 .map(|_| {
2154 let locks = locks.as_ref();
2155 let live_rate = &live_rate;
2156 let shared = &shared;
2157 outer.spawn(move || {
2158 run_one_message_thread(
2159 worker_threads,
2160 locks,
2161 config,
2162 stop,
2163 progress,
2164 phase_epoch,
2165 live_rate,
2166 &shared[..],
2167 shared_matrix_size,
2168 )
2169 })
2170 })
2171 .collect();
2172 // Control thread: schbench's main control loop -- once/sec, sample the
2173 // achieved RPS into rps_stats and (for auto-RPS) auto-scale `live_rate`.
2174 let control = outer
2175 .spawn(|| control_loop(progress, stop, config, &live_rate, &target_hit, phase_epoch));
2176 for h in handles {
2177 let mtr = h.join().expect("schbench message thread panicked");
2178 all_wakeup.combine(&mtr.whole_wakeup);
2179 all_request.combine(&mtr.whole_request);
2180 total_msg_sched_delay = total_msg_sched_delay.saturating_add(mtr.msg_sched_delay_ns);
2181 total_worker_sched_delay =
2182 total_worker_sched_delay.saturating_add(mtr.workers_sched_delay_sum);
2183 for (epoch, sps) in mtr.phases {
2184 all_phases.entry(epoch).or_default().merge(&sps);
2185 }
2186 }
2187 let (whole_rps, control_per_epoch_rps) =
2188 control.join().expect("schbench control thread panicked");
2189 rps_stats = whole_rps;
2190 // Merge the control thread's per-epoch RPS into the per-phase aggregate
2191 // EXACTLY ONCE, post-join, OUTSIDE the per-message-thread loop above (else
2192 // it would double-count by message_threads). The control thread is the
2193 // SOLE rps source; the workers contribute empty rps via mtr.phases.
2194 for (epoch, hist) in control_per_epoch_rps {
2195 all_phases.entry(epoch).or_default().rps.combine(&hist);
2196 }
2197 });
2198
2199 // The auto-RPS "final rps goal" (`schbench.c:1995`): the live per-thread rate
2200 // at exit * message_threads. Read after the scope (all threads joined, so the
2201 // control loop's last store is visible).
2202 let final_rps_goal = live_rate.load(Ordering::Relaxed) * config.message_threads;
2203 let loop_count = progress.load(Ordering::Relaxed);
2204 let elapsed_ns = monotonic_nanos().saturating_sub(start);
2205 let achieved_rps = if elapsed_ns > 0 {
2206 loop_count as f64 / (elapsed_ns as f64 / 1e9)
2207 } else {
2208 0.0
2209 };
2210 // Average the per-thread run-queue waits, matching schbench's
2211 // collect_sched_delay (`schbench.c:1673-1674`): message delay over
2212 // message_threads, worker delay over all workers. (Whole-run mean-of-means;
2213 // the per-phase path uses sample-weighted Σrd/Σpc — see SchbenchPhaseStats.)
2214 let sched_delay_msg_ns = total_msg_sched_delay / (config.message_threads.max(1) as u64);
2215 let total_workers = (config.message_threads * worker_threads).max(1) as u64;
2216 let sched_delay_worker_ns = total_worker_sched_delay / total_workers;
2217
2218 SchbenchOutcome {
2219 whole_run: SchbenchResult {
2220 wakeup: all_wakeup.percentiles(),
2221 request: all_request.percentiles(),
2222 rps: rps_stats.percentiles(),
2223 loop_count,
2224 nr_workers: total_workers as usize,
2225 achieved_rps,
2226 final_rps_goal,
2227 sched_delay_msg_ns,
2228 sched_delay_worker_ns,
2229 },
2230 phases: all_phases.into_iter().collect(),
2231 }
2232}
2233
2234#[cfg(test)]
2235mod tests {
2236 use super::*;
2237
2238 /// Lightweight node for stack stress tests (ThreadData is far heavier —
2239 /// two boxed `PlatStats` histograms plus per-thread state — too heavy to
2240 /// allocate by the thousand).
2241 struct TestNode {
2242 next: AtomicPtr<TestNode>,
2243 }
2244 impl Linked for TestNode {
2245 fn next_link(&self) -> &AtomicPtr<Self> {
2246 &self.next
2247 }
2248 }
2249 impl TestNode {
2250 fn new() -> Self {
2251 Self {
2252 next: AtomicPtr::new(ptr::null_mut()),
2253 }
2254 }
2255 }
2256
2257 #[test]
2258 fn resolve_worker_count_divides_cpuset_across_message_threads() {
2259 // Explicit non-zero worker_threads is honored as-is (per message thread).
2260 let c = SchbenchConfig::default()
2261 .worker_threads(3)
2262 .message_threads(4);
2263 assert_eq!(c.resolve_worker_count(8), 3);
2264
2265 // 0-default mirrors schbench's ceil(cpuset_cpus / message_threads)
2266 // (schbench.c:1849-1852), scoped to the cpuset count:
2267 // m=1: 8/1 = 8 per thread, total 8 (== the cpuset count; no divide).
2268 let c = SchbenchConfig::default().message_threads(1);
2269 assert_eq!(c.resolve_worker_count(8), 8);
2270 // m=2: 8/2 = 4 per thread, total 8.
2271 let c = SchbenchConfig::default().message_threads(2);
2272 assert_eq!(c.resolve_worker_count(8), 4);
2273 // m=3: ceil(8/3) = 3 per thread, total 9 (≈ 8; schbench rounds up too).
2274 let c = SchbenchConfig::default().message_threads(3);
2275 assert_eq!(c.resolve_worker_count(8), 3);
2276 // message_threads floored at 1 — no div-by-zero.
2277 let c = SchbenchConfig::default().message_threads(0);
2278 assert_eq!(c.resolve_worker_count(8), 8);
2279 }
2280
2281 #[test]
2282 fn matrix_size_matches_schbench_formula() {
2283 // sqrt(256*1024/3/8) = sqrt(10922) = 104.
2284 assert_eq!(
2285 SchbenchConfig {
2286 cache_footprint_kib: 256,
2287 operations: 5,
2288 ..Default::default()
2289 }
2290 .matrix_size(),
2291 104
2292 );
2293 // Zero operations -> no matrix work.
2294 assert_eq!(
2295 SchbenchConfig {
2296 operations: 0,
2297 ..Default::default()
2298 }
2299 .matrix_size(),
2300 0
2301 );
2302 }
2303
2304 #[test]
2305 fn split_matrix_sizes_match_schbench_formula() {
2306 // schbench.c:1858-1863: with --split=p the footprint splits into a
2307 // private matrix (p%) and a shared matrix (100-p%); each dimension is
2308 // sqrt(kib*frac/100 * 1024/3/sizeof(ulong)). None => the legacy single
2309 // all-private matrix (full footprint) and no shared matrix.
2310 let cfg = |split| SchbenchConfig {
2311 cache_footprint_kib: 256,
2312 operations: 5,
2313 split_percent: split,
2314 ..Default::default()
2315 };
2316 // None: private == the full-footprint matrix_size (104), no shared.
2317 assert_eq!(cfg(None).private_matrix_size(), 104);
2318 assert_eq!(cfg(None).shared_matrix_size(), 0);
2319 // 0%: nothing private; the whole footprint is the shared matrix.
2320 assert_eq!(cfg(Some(0)).private_matrix_size(), 0);
2321 assert_eq!(cfg(Some(0)).shared_matrix_size(), 104);
2322 // 25%: 64 KiB private (sqrt(64*1024/3/8)=52), 192 KiB shared (=90).
2323 assert_eq!(cfg(Some(25)).private_matrix_size(), 52);
2324 assert_eq!(cfg(Some(25)).shared_matrix_size(), 90);
2325 // 100%: all private (full footprint), no shared work.
2326 assert_eq!(cfg(Some(100)).private_matrix_size(), 104);
2327 assert_eq!(cfg(Some(100)).shared_matrix_size(), 0);
2328 // Zero operations -> no matrix work either way.
2329 let none_ops = SchbenchConfig {
2330 operations: 0,
2331 split_percent: Some(50),
2332 ..Default::default()
2333 };
2334 assert_eq!(none_ops.private_matrix_size(), 0);
2335 assert_eq!(none_ops.shared_matrix_size(), 0);
2336 }
2337
2338 #[test]
2339 fn ops_split_matches_schbench_integer_division() {
2340 // schbench.c:1391-1392: ops_private = operations*split/100 (integer
2341 // truncation), ops_shared = operations - ops_private. Returns
2342 // (ops_shared, ops_private).
2343 assert_eq!(ops_split(5, 0), (5, 0)); // all shared
2344 assert_eq!(ops_split(5, 25), (4, 1)); // 5*25/100 = 1 private
2345 assert_eq!(ops_split(5, 50), (3, 2)); // 5*50/100 = 2 private
2346 assert_eq!(ops_split(5, 100), (0, 5)); // all private
2347 }
2348
2349 #[test]
2350 fn engine_split_runs_shared_matrix_concurrently() {
2351 // --split with two workers: both concurrently multiply into the ONE
2352 // process-global shared AtomicU64 matrix (per-k stores to the shared C
2353 // cells -- the deliberate cache contention schbench models). Reaching the
2354 // assertion proves the shared-matrix path
2355 // neither deadlocks nor trips a data-race trap -- the atomics make the
2356 // shared race sound (vs schbench's plain shared-memory race). operations=8
2357 // @ split=50 => 4 shared + 4 private ops/cycle, so both paths run. Stop is
2358 // event-driven (spin on the progress counter, not a sleep).
2359 let config = SchbenchConfig {
2360 message_threads: 1,
2361 worker_threads: 2,
2362 cache_footprint_kib: 16,
2363 operations: 8,
2364 sleep_usec: 0,
2365 skip_locking: false,
2366 requests_per_sec: 0,
2367 auto_rps: 0,
2368 split_percent: Some(50),
2369 pipe_transfer_bytes: 0,
2370 };
2371 // Sanity: both split matrices are non-empty at this footprint, so the
2372 // test exercises shared + private work, not a degenerate 0-dim path.
2373 assert!(config.shared_matrix_size() > 0, "shared matrix present");
2374 assert!(config.private_matrix_size() > 0, "private matrix present");
2375 let stop = AtomicBool::new(false);
2376 let progress = AtomicU64::new(0);
2377 let outcome = std::thread::scope(|s| {
2378 let runner = s.spawn(|| run(&config, &stop, &progress, None));
2379 while progress.load(Ordering::Relaxed) < 50 {
2380 core::hint::spin_loop();
2381 }
2382 stop.store(true, Ordering::Release);
2383 runner.join().expect("run panicked")
2384 });
2385 assert!(
2386 outcome.whole_run.loop_count >= 50,
2387 "engine did split work: {}",
2388 outcome.whole_run.loop_count
2389 );
2390 }
2391
2392 #[test]
2393 fn pipe_fill_writes_every_byte_and_sizes_the_page() {
2394 // pipe_fill is the per-cycle memory transfer (schbench's pipe_page memset,
2395 // schbench.c:980-981/:1003-1004): it must touch EVERY byte of a
2396 // pipe_bytes-sized page. The bytes are never read back, so a dead-store
2397 // elision would silently zero the workload -- the black_box in pipe_fill
2398 // prevents that; this pins the observable effect (every byte written).
2399 let td = ThreadData::new(64);
2400 assert_eq!(td.pipe_bytes, 64);
2401 // SAFETY: single-threaded test, exclusive access to this td's page.
2402 unsafe { td.pipe_fill(2) };
2403 // SAFETY: same thread; no concurrent access.
2404 assert_eq!(
2405 unsafe { &*td.pipe_page.get() }.len(),
2406 64,
2407 "page sized to pipe_bytes"
2408 );
2409 assert!(
2410 unsafe { &*td.pipe_page.get() }.iter().all(|&b| b == 2),
2411 "worker fill (2) touches every byte"
2412 );
2413 // SAFETY: exclusive.
2414 unsafe { td.pipe_fill(1) };
2415 assert!(
2416 unsafe { &*td.pipe_page.get() }.iter().all(|&b| b == 1),
2417 "waker fill (1) overwrites every byte"
2418 );
2419 // Non-pipe ThreadData (pipe_bytes == 0) allocates an empty page: no transfer.
2420 let none = ThreadData::new(0);
2421 assert_eq!(none.pipe_bytes, 0);
2422 // SAFETY: exclusive.
2423 assert_eq!(unsafe { &*none.pipe_page.get() }.len(), 0);
2424 }
2425
2426 #[test]
2427 fn pipe_transfer_bytes_clamps_to_one_mib() {
2428 // schbench clamps -p to PIPE_TRANSFER_BUFFER (schbench.c:41,291-294); run()
2429 // applies the same `.min()` before sizing each worker's page.
2430 assert_eq!(PIPE_TRANSFER_BUFFER, 1024 * 1024);
2431 assert_eq!(
2432 (2 * PIPE_TRANSFER_BUFFER).min(PIPE_TRANSFER_BUFFER),
2433 PIPE_TRANSFER_BUFFER,
2434 "over-cap pipe size clamps to the 1 MiB buffer"
2435 );
2436 // At/under-cap values pass through unchanged.
2437 assert_eq!(4096_usize.min(PIPE_TRANSFER_BUFFER), 4096);
2438 }
2439
2440 #[test]
2441 fn pipe_transfer_report_is_per_worker_scales_and_clamps() {
2442 // PER-WORKER: ops/sec = achieved_rps / nr_workers
2443 // (schbench.c:1942-1943 divide by Σ worker runtimes ≈ nr_workers*elapsed).
2444 // 2048 aggregate cycles/sec / 2 workers = 1024 per-worker ops/sec.
2445 let r = pipe_transfer_report(2048.0, 1, 2);
2446 assert_eq!(r.ops_per_sec, 1024.0);
2447 // pretty_size (schbench.c:1606-1620): 1024 per-worker B/s -> 1.00 KB/s.
2448 assert_eq!((r.scaled, r.unit), (1.0, "KB"));
2449 // 1 worker, 1 B/cycle @ 1 cycle/s -> 1.00 B/s.
2450 let one = pipe_transfer_report(1.0, 1, 1);
2451 assert_eq!((one.ops_per_sec, one.scaled, one.unit), (1.0, 1.0, "B"));
2452 // 1 MiB/cycle @ 1 cycle/s/worker -> 1.00 MB/s.
2453 assert_eq!(pipe_transfer_report(1.0, 1 << 20, 1).unit, "MB");
2454 // CLAMP: an over-cap pipe size reports the clamped 1 MiB, not
2455 // the requested 2 MiB -- the engine moves only the clamped size per cycle
2456 // (schbench clamps at parse, schbench.c:291-294).
2457 let over = pipe_transfer_report(1.0, 2 << 20, 1);
2458 let cap = pipe_transfer_report(1.0, 1 << 20, 1);
2459 assert_eq!(
2460 (over.scaled, over.unit),
2461 (cap.scaled, cap.unit),
2462 "over-cap pipe size clamps to 1 MiB"
2463 );
2464 // EB cap: a huge byte rate never scales past the last unit (no overflow).
2465 assert_eq!(pipe_transfer_report(f64::MAX, 1 << 20, 1).unit, "EB");
2466 // nr_workers 0 -> floored at 1, never a division by zero.
2467 assert_eq!(pipe_transfer_report(100.0, 4096, 0).ops_per_sec, 100.0);
2468 }
2469
2470 #[test]
2471 fn engine_pipe_mode_transfers_without_matrix_work() {
2472 // Pipe mode (-p): the worker loop skips do_work + the think-sleep, and the
2473 // per-cycle work is the cross-thread pipe_page handshake -- the worker
2474 // memsets its page to 2 before blocking, the waker memsets it to 1 on wake
2475 // (schbench.c:980-981/:1003-1004). Two workers exercise the concurrent
2476 // waker->worker page writes. Reaching the assertions proves the handshake
2477 // runs to completion: no deadlock and no data-race trap on the cross-thread
2478 // `pipe_page` UnsafeCell (the wait-list + futex ordering keeps it race-free,
2479 // see the Sync SAFETY note). `operations`/`cache_footprint_kib` are set
2480 // LARGE on purpose: if do_work erroneously ran, every cycle would log a
2481 // multi-us request-latency sample -- the second assertion would then fail.
2482 // Stop is event-driven (spin on the progress counter, not a sleep).
2483 let config = SchbenchConfig::default()
2484 .message_threads(1)
2485 .worker_threads(2)
2486 .sleep_usec(0)
2487 .operations(50)
2488 .cache_footprint_kib(256)
2489 .pipe_transfer_bytes(4096);
2490 assert!(config.pipe_transfer_bytes > 0, "pipe mode engaged");
2491 assert!(
2492 config.matrix_size() > 0,
2493 "matrix work would be non-trivial if do_work ran (regression guard)"
2494 );
2495 let stop = AtomicBool::new(false);
2496 let progress = AtomicU64::new(0);
2497 let outcome = std::thread::scope(|s| {
2498 let runner = s.spawn(|| run(&config, &stop, &progress, None));
2499 while progress.load(Ordering::Relaxed) < 50 {
2500 core::hint::spin_loop();
2501 }
2502 stop.store(true, Ordering::Release);
2503 runner.join().expect("run panicked")
2504 });
2505 // loop_count accrues one per completed transfer cycle (schbench.c:1479),
2506 // proving the pipe handshake + memsets ran end-to-end without deadlock.
2507 assert!(
2508 outcome.whole_run.loop_count >= 50,
2509 "pipe engine transferred {} cycles",
2510 outcome.whole_run.loop_count
2511 );
2512 // Discriminating assertion: do_work is SKIPPED, so the
2513 // back-to-back work_start->now span yields sub-us request deltas that the
2514 // `delta_us > 0` filter drops -- request stats stay ~empty. If matrix work
2515 // erroneously ran at operations=50/256KiB, nearly every cycle would record a
2516 // >0 request delta and request samples would approach loop_count, failing
2517 // this. (A rare us-straddle may record a handful; `* 4 < loop_count` tolerates
2518 // that while still catching a real do_work regression.)
2519 assert!(
2520 outcome.whole_run.request.nr_samples * 4 < outcome.whole_run.loop_count,
2521 "pipe mode skips matrix work: {} request samples vs {} cycles",
2522 outcome.whole_run.request.nr_samples,
2523 outcome.whole_run.loop_count
2524 );
2525 }
2526
2527 #[test]
2528 #[should_panic(expected = "split_percent must be in 0..=100")]
2529 fn engine_panics_on_out_of_range_split() {
2530 // Regression pin for the out-of-range finding: a `pub split_percent` set
2531 // past 100 via STRUCT LITERAL (the path that bypasses the builder's
2532 // debug_assert) must fail LOUDLY at the run() consumption boundary in
2533 // every build profile -- the analog of schbench exiting on a bad
2534 // `--split` -- never silently underflow `100 - p` into a garbage-huge
2535 // shared matrix. The assert fires before any thread spawn or allocation,
2536 // so this is cheap (`stop` is pre-set just in case).
2537 let config = SchbenchConfig {
2538 message_threads: 1,
2539 worker_threads: 1,
2540 cache_footprint_kib: 16,
2541 operations: 1,
2542 sleep_usec: 0,
2543 skip_locking: false,
2544 requests_per_sec: 0,
2545 auto_rps: 0,
2546 split_percent: Some(101),
2547 pipe_transfer_bytes: 0,
2548 };
2549 let stop = AtomicBool::new(true);
2550 let progress = AtomicU64::new(0);
2551 let _ = run(&config, &stop, &progress, None);
2552 }
2553
2554 #[test]
2555 fn stack_add_splice_is_lifo() {
2556 let a = TestNode::new();
2557 let b = TestNode::new();
2558 let stack: TreiberStack<TestNode> = TreiberStack::new();
2559 assert!(stack.splice().is_null(), "empty stack splices to null");
2560 stack.add(&a as *const _ as *mut _);
2561 stack.add(&b as *const _ as *mut _);
2562 // LIFO: b (pushed last) is the head; a follows.
2563 let head = stack.splice();
2564 assert_eq!(head.cast_const(), &b as *const TestNode);
2565 // SAFETY: head -> b (alive on stack); its link -> a.
2566 let second = unsafe { (*head).next.load(Ordering::Acquire) };
2567 assert_eq!(second.cast_const(), &a as *const TestNode);
2568 // SAFETY: second -> a; its link is null.
2569 assert!(unsafe { (*second).next.load(Ordering::Acquire) }.is_null());
2570 assert!(stack.splice().is_null(), "splice emptied the stack");
2571 }
2572
2573 #[test]
2574 fn stack_concurrent_add_loses_no_nodes() {
2575 // N threads each concurrently push K distinct nodes; after all join, a
2576 // single splice must return exactly N*K nodes. A broken CAS (lost
2577 // update / ABA) would drop nodes and the count would be short. Push is
2578 // deterministic (each node pushed once), so this never hangs.
2579 const THREADS: usize = 8;
2580 const PER_THREAD: usize = 2000;
2581 let nodes: Vec<TestNode> = (0..THREADS * PER_THREAD).map(|_| TestNode::new()).collect();
2582 let stack = TreiberStack::new();
2583
2584 std::thread::scope(|s| {
2585 for chunk in nodes.chunks(PER_THREAD) {
2586 let stack = &stack;
2587 s.spawn(move || {
2588 for n in chunk {
2589 stack.add(n as *const TestNode as *mut TestNode);
2590 }
2591 });
2592 }
2593 });
2594
2595 // All pushes complete (scope joined). Drain and count distinct nodes.
2596 let mut seen = std::collections::HashSet::new();
2597 let mut cur = stack.splice();
2598 while !cur.is_null() {
2599 assert!(seen.insert(cur), "node observed twice");
2600 // SAFETY: cur is one of the live `nodes`; next is its link.
2601 cur = unsafe { (*cur).next.load(Ordering::Acquire) };
2602 }
2603 assert_eq!(
2604 seen.len(),
2605 THREADS * PER_THREAD,
2606 "no node lost under contention"
2607 );
2608 }
2609
#[test]
fn engine_runs_and_produces_latency_samples() {
    // Default (non-RPS) handshake mode on a tiny topology: one message thread
    // driving two workers. The driver polls the shared progress counter rather
    // than sleeping, and raises `stop` once ~50 work cycles are done. Getting
    // past the join doubles as the shutdown check: a scope-join shutdown bug
    // would leave this test blocked until the nextest timeout.
    let stop = AtomicBool::new(false);
    let progress = AtomicU64::new(0);
    let config = SchbenchConfig {
        message_threads: 1,
        worker_threads: 2,
        cache_footprint_kib: 16,
        operations: 1,
        sleep_usec: 0,
        skip_locking: false,
        requests_per_sec: 0,
        auto_rps: 0,
        split_percent: None,
        pipe_transfer_bytes: 0,
    };

    let outcome = std::thread::scope(|scope| {
        let engine = scope.spawn(|| run(&config, &stop, &progress, None));
        loop {
            if progress.load(Ordering::Relaxed) >= 50 {
                break;
            }
            std::hint::spin_loop();
        }
        stop.store(true, Ordering::Release);
        engine.join().expect("run panicked")
    });

    // Whole-run accounting: work happened and both latency histograms plus
    // the achieved rate are populated.
    let whole = &outcome.whole_run;
    assert!(
        whole.loop_count >= 50,
        "engine did work: {}",
        whole.loop_count
    );
    assert!(whole.wakeup.nr_samples > 0, "wakeup samples recorded");
    assert!(whole.request.nr_samples > 0, "request samples recorded");
    assert!(whole.achieved_rps > 0.0, "positive achieved rps");

    // With phase_epoch None the run is non-phasic: the single snapshot is the
    // baseline epoch 0 (discarded by the host), so no per-phase metrics exist.
    let phase_count = outcome.phases.len();
    assert_eq!(phase_count, 1, "non-phasic => single baseline phase");
    assert_eq!(outcome.phases[0].0, 0, "the lone phase is BASELINE epoch 0");
}
2656
#[test]
fn engine_terminates_when_lone_worker_stops() {
    // Shutdown-deadlock regression. With exactly ONE worker nobody else can
    // post the message thread, so when that worker leaves on `stop` while the
    // message thread is parked in `wait_forever`, the worker itself has to wake
    // it (the unconditional post at worker_loop's exit) -- otherwise run()
    // never returns. Merely reaching the final assertion proves run() came
    // back. `sleep_usec` 0 keeps the lone worker almost always inside `do_work`
    // when stop lands, which is the deadlock-prone window. The two-worker
    // tests (engine_runs / engine_splits) cannot catch this: a second worker
    // posting the message thread masks the bug. A regression shows up as the
    // nextest timeout, exactly how the bug was first seen.
    let stop = AtomicBool::new(false);
    let progress = AtomicU64::new(0);
    let config = SchbenchConfig {
        message_threads: 1,
        worker_threads: 1,
        cache_footprint_kib: 256,
        operations: 5,
        sleep_usec: 0,
        skip_locking: false,
        requests_per_sec: 0,
        auto_rps: 0,
        split_percent: None,
        pipe_transfer_bytes: 0,
    };

    let outcome = std::thread::scope(|scope| {
        let engine = scope.spawn(|| run(&config, &stop, &progress, None));
        loop {
            if progress.load(Ordering::Relaxed) >= 10 {
                break;
            }
            std::hint::spin_loop();
        }
        stop.store(true, Ordering::Release);
        // On regression this join never returns: the lone worker exits
        // without waking the parked message thread.
        engine.join().expect("run panicked")
    });

    let completed = outcome.whole_run.loop_count;
    assert!(
        completed >= 10,
        "engine did work and returned: {}",
        completed
    );
}
2698
#[test]
fn engine_rps_mode_injects_drains_and_terminates() {
    // RPS-injector mode is selected by requests_per_sec != 0: a dedicated
    // thread enqueues requests round-robin across the workers, and the workers
    // splice + drain their queues instead of using the message-thread
    // handshake. This checks that the RPS path actually runs (loop_count and
    // request samples) and that run() returns cleanly. Shutdown relies on the
    // RPS thread being the waker and the worker the waitee, with workers woken
    // by run_one_message_thread's worker-wake plus the RPS thread's stop-time
    // fpost-all; a deadlock surfaces as the nextest timeout. The post-join
    // free covers the no-request-leak side.
    let stop = AtomicBool::new(false);
    let progress = AtomicU64::new(0);
    let config = SchbenchConfig {
        message_threads: 1,
        worker_threads: 2,
        cache_footprint_kib: 16,
        operations: 1,
        sleep_usec: 0,
        skip_locking: false,
        requests_per_sec: 10_000,
        auto_rps: 0,
        split_percent: None,
        pipe_transfer_bytes: 0,
    };

    let outcome = std::thread::scope(|scope| {
        let engine = scope.spawn(|| run(&config, &stop, &progress, None));
        loop {
            if progress.load(Ordering::Relaxed) >= 50 {
                break;
            }
            std::hint::spin_loop();
        }
        stop.store(true, Ordering::Release);
        // A regression hangs this join: either a worker parked in rps_wait is
        // never woken by the shutdown path, or the RPS thread misses `stop`.
        engine.join().expect("run panicked")
    });

    let whole = &outcome.whole_run;
    assert!(
        whole.loop_count >= 50,
        "RPS workers serviced requests: {}",
        whole.loop_count
    );
    let request_samples = whole.request.nr_samples;
    assert!(
        request_samples > 0,
        "RPS request-latency samples recorded"
    );
}
2745
#[test]
fn rps_injector_pacing_sleep_is_bounded_for_prompt_shutdown() {
    use std::time::{Duration, Instant};

    // requests_per_sec=1, message_threads=1 => per-thread inter-arrival
    // interval ~1s. A stop set while the injector is in that paced sleep must be
    // observed within STOP_POLL_QUANTUM_NS (50 ms), so run()'s message-thread
    // join completes well under one interval. An unbounded pacing sleep would
    // block the join ~1s. 500 ms is 10x the quantum (robust to scheduling
    // jitter) and half the interval (cleanly distinguishes bounded from
    // unbounded). The 20 ms warmup reaches the long paced sleep before stop (at
    // 1 req/s a progress-spin could outlast the test).
    //
    // The stopwatch starts at the moment `stop` is raised, so the budget covers
    // only the shutdown latency under test. Engine startup and the warmup sleep
    // are deliberately excluded: they are not what this test bounds, and on a
    // loaded host they would eat into the jitter margin.
    let config = SchbenchConfig {
        message_threads: 1,
        worker_threads: 1,
        cache_footprint_kib: 16,
        operations: 1,
        sleep_usec: 0,
        skip_locking: false,
        requests_per_sec: 1, // interval = 1s / 1 = 1s
        auto_rps: 0,
        split_percent: None,
        pipe_transfer_bytes: 0,
    };
    let stop = AtomicBool::new(false);
    let progress = AtomicU64::new(0);
    let elapsed = std::thread::scope(|s| {
        let runner = s.spawn(|| run(&config, &stop, &progress, None));
        std::thread::sleep(Duration::from_millis(20));
        // Time from the stop request to run() returning.
        let stop_requested = Instant::now();
        stop.store(true, Ordering::Release);
        let _ = runner.join().expect("run panicked");
        stop_requested.elapsed()
    });
    assert!(
        elapsed < Duration::from_millis(500),
        "RPS injector joined in {elapsed:?}; the paced sleep must be bounded \
         (interval ~1s — an unbounded sleep would block the join that long)",
    );
}
2784
#[test]
fn engine_rps_below_message_threads_falls_to_default() {
    // Regression: a -R total smaller than the message-thread count makes the
    // per-thread rate (rps_per_message_thread) round down to 0. The engine must
    // then run the DEFAULT message-handshake mode, with workers doing real
    // wake-cycle work, matching schbench's startup-divide-before-gate. It must
    // NOT enter a zero-rate RPS mode: that would park the workers with no
    // requests, progress would never advance, and the spin below would hang
    // until the nextest timeout.
    let stop = AtomicBool::new(false);
    let progress = AtomicU64::new(0);
    let config = SchbenchConfig {
        message_threads: 2,
        worker_threads: 1,
        cache_footprint_kib: 16,
        operations: 1,
        sleep_usec: 0,
        skip_locking: false,
        requests_per_sec: 1, // 1 / 2 message threads = 0 per thread -> default
        auto_rps: 0,
        split_percent: None,
        pipe_transfer_bytes: 0,
    };

    let outcome = std::thread::scope(|scope| {
        let engine = scope.spawn(|| run(&config, &stop, &progress, None));
        loop {
            if progress.load(Ordering::Relaxed) >= 50 {
                break;
            }
            std::hint::spin_loop();
        }
        stop.store(true, Ordering::Release);
        engine.join().expect("run panicked")
    });

    let completed = outcome.whole_run.loop_count;
    assert!(
        completed >= 50,
        "sub-1-per-thread RPS ran default-mode work: {}",
        completed
    );
}
2822
#[test]
fn auto_scale_grows_when_idle_and_shrinks_when_busy() {
    // Exercises only the scaling arithmetic, with busy levels injected directly
    // (no /proc/stat I/O), mirroring schbench's auto_scale_rps body
    // (`schbench.c:1203-1250`). Each case starts from a fresh rate and an unset
    // target_hit flag.

    // Case 1 -- far under target. delta = 80/10 = 8, capped to 3, so the rate
    // becomes ceil(100*3) = 300. This is not a near-target transition: the >3
    // cap branch never trips target_hit, so just_hit stays false.
    let (rate, hit) = (AtomicUsize::new(100), AtomicBool::new(false));
    let just_hit = scale_rps_for_busy(10.0, &rate, &hit, 80);
    assert!(!just_hit);
    assert_eq!(rate.load(Ordering::Acquire), 300);
    assert!(!hit.load(Ordering::Acquire));

    // Case 2 -- far over target. delta = 20/100 = 0.2, clamped to 0.3, so the
    // rate becomes floor(300*0.3) = 90. Again not a near-target transition.
    let (rate, hit) = (AtomicUsize::new(300), AtomicBool::new(false));
    let just_hit = scale_rps_for_busy(100.0, &rate, &hit, 20);
    assert!(!just_hit);
    assert_eq!(rate.load(Ordering::Acquire), 90);
    assert!(!hit.load(Ordering::Acquire));

    // Case 3 -- exactly on target. The rate is held and target_hit is set on
    // this first arrival (`schbench.c:1243-1248`).
    let (rate, hit) = (AtomicUsize::new(123), AtomicBool::new(false));
    let just_hit = scale_rps_for_busy(50.0, &rate, &hit, 50);
    assert!(just_hit);
    assert_eq!(rate.load(Ordering::Acquire), 123);
    assert!(hit.load(Ordering::Acquire));
}
2853
#[test]
fn auto_scale_target_hit_uses_post_damping_delta() {
    // The near-target check must look at the delta AFTER the damping
    // reassignment (`schbench.c:1209-1213` for grow, `:1232-1236` for shrink).
    // Checking before the reassignment was the original bug; the first two
    // cases below trip target_hit ONLY when the post-damping order is used.

    // Grow band: busy 90 against target 100 gives delta 1.111, which is NOT
    // < 1.05. Damping turns it into 1 + 0.111/8 = 1.0139, which IS < 1.05, so
    // target_hit fires. A pre-damping check on 1.111 would miss it.
    let (rate, hit) = (AtomicUsize::new(1000), AtomicBool::new(false));
    let just_hit = scale_rps_for_busy(90.0, &rate, &hit, 100);
    assert!(just_hit);
    assert!(hit.load(Ordering::Acquire));

    // Shrink band: busy 100 against target 95 gives delta 0.95, which is NOT
    // > 0.95. Damping turns it into 0.95 + 0.05/8 = 0.95625, which IS > 0.95,
    // so target_hit fires. A pre-damping check on 0.95 would miss it.
    let (rate, hit) = (AtomicUsize::new(1000), AtomicBool::new(false));
    let just_hit = scale_rps_for_busy(100.0, &rate, &hit, 95);
    assert!(just_hit);
    assert!(hit.load(Ordering::Acquire));

    // Flag already set: just_hit is guarded on !already_hit, so no second
    // transition is reported even for a tick that lands exactly on target.
    let (rate, hit) = (AtomicUsize::new(1000), AtomicBool::new(true));
    let just_hit = scale_rps_for_busy(50.0, &rate, &hit, 50);
    assert!(!just_hit);
}
2883
#[test]
fn reset_rps_accumulators_clears_both_in_lockstep() {
    use std::collections::BTreeMap;

    // On an auto-RPS target hit, the whole-run AND the per-epoch rps
    // accumulators MUST be cleared together. If a later edit drops either one,
    // Σ-per-epoch stops matching whole-run (asymmetric-counter-bookkeeping).
    // This pins the lockstep deterministically, because the engine's just_hit
    // path is timing-dependent (the controller has to actually reach target).

    // Seed both accumulators with one sample each.
    let mut whole = PlatStats::default();
    whole.add_lat(500);

    let mut seeded_epoch = PlatStats::default();
    seeded_epoch.add_lat(500);
    let mut per_epoch = BTreeMap::<u32, PlatStats>::new();
    per_epoch.insert(1, seeded_epoch);

    reset_rps_accumulators(&mut whole, &mut per_epoch);

    assert_eq!(whole.sample_count(), 0, "whole-run rps reset");
    assert!(per_epoch.is_empty(), "per-epoch rps reset in lockstep");
}
2899
#[test]
fn control_loop_rps_sample_floors_sub_quantum_window() {
    // The bounded teardown sleep may return on `stop` with a very small dt.
    // Because count/dt is unbounded as dt->0, a sub-quantum window is dropped
    // instead of spiking rps_max / the top rps percentiles (both assertable
    // metrics). The race producing the tiny dt is timing-dependent, but the
    // math and the floor it feeds are pure, so they are pinned here.

    // Windows shorter than one quantum: (delta_loops, dt_ns), all dropped.
    let dropped = [
        // dt==0 is dropped, not a divide-by-zero.
        (3, 0),
        // A 1us / one-completion window would read as ~1e6 cycles/sec.
        (1, 1_000),
        // Just under one quantum is still dropped.
        (5, STOP_POLL_QUANTUM_NS - 1),
    ];
    for (delta_loops, dt_ns) in dropped {
        assert_eq!(control_loop_rps_sample(delta_loops, dt_ns), None);
    }

    // Windows of at least one quantum: (delta_loops, dt_ns, expected rate).
    let admitted = [
        // Exactly one quantum is admitted; the rate is bounded to
        // delta_loops * 1e9 / quantum, no inflation: 1 cycle / 50ms = 20/s.
        (1, STOP_POLL_QUANTUM_NS, 20),
        // Even a burst of 1000 completions in one quantum caps at 1000*20 =
        // 20_000/s -- not the millions an unfloored 1us window would yield.
        (1_000, STOP_POLL_QUANTUM_NS, 20_000),
        // The common case -- a full ~1s window -- is unchanged: 5000/s.
        (5_000, 1_000_000_000, 5_000),
    ];
    for (delta_loops, dt_ns, rate) in admitted {
        assert_eq!(control_loop_rps_sample(delta_loops, dt_ns), Some(rate));
    }
}
2925
#[test]
fn engine_auto_rps_mode_runs_and_terminates() {
    // A non-zero auto_rps enables the once-per-second control thread that
    // auto-scales the live rate toward the busy target. The run is seeded with
    // an explicit -R so the injector services requests right away (progress
    // reaches 50 long before the first 1s control tick). That shows the
    // auto-RPS path boots, injects, and lets run() terminate cleanly: the
    // control thread observes stop, and a hang would be the nextest timeout.
    // The scaling MATH is pinned by the scale_rps_for_busy unit tests; host-busy
    // and target-hit timing depend on the environment, so no sample-count
    // assertion is made here.
    let stop = AtomicBool::new(false);
    let progress = AtomicU64::new(0);
    let config = SchbenchConfig {
        message_threads: 1,
        worker_threads: 2,
        cache_footprint_kib: 16,
        operations: 1,
        sleep_usec: 0,
        skip_locking: false,
        requests_per_sec: 10_000,
        auto_rps: 50,
        split_percent: None,
        pipe_transfer_bytes: 0,
    };

    let outcome = std::thread::scope(|scope| {
        let engine = scope.spawn(|| run(&config, &stop, &progress, None));
        loop {
            if progress.load(Ordering::Relaxed) >= 50 {
                break;
            }
            std::hint::spin_loop();
        }
        stop.store(true, Ordering::Release);
        engine.join().expect("run panicked")
    });

    let completed = outcome.whole_run.loop_count;
    assert!(
        completed >= 50,
        "auto-RPS injector serviced requests: {}",
        completed
    );
}
2964
#[test]
fn engine_splits_stats_across_phase_epochs() {
    use std::collections::BTreeMap;

    // Flip the shared phase_epoch from 1 to 2 while the engine is running and
    // verify that histograms + loop_count are partitioned into per-epoch
    // snapshots (the scx-phase vs detached-EEVDF-phase mechanism). Everything
    // is paced by the progress counter; there are no sleeps.
    let stop = AtomicBool::new(false);
    let progress = AtomicU64::new(0);
    // Begin in a real step epoch (phase 1), not the baseline.
    let epoch = AtomicU32::new(1);
    let config = SchbenchConfig {
        message_threads: 1,
        worker_threads: 2,
        cache_footprint_kib: 16,
        operations: 1,
        sleep_usec: 0,
        skip_locking: false,
        requests_per_sec: 0,
        auto_rps: 0,
        split_percent: None,
        pipe_transfer_bytes: 0,
    };

    let outcome = std::thread::scope(|scope| {
        let engine = scope.spawn(|| run(&config, &stop, &progress, Some(&epoch)));
        // Busy-wait until the shared progress counter reaches `goal`.
        let spin_until = |goal: u64| {
            while progress.load(Ordering::Relaxed) < goal {
                std::hint::spin_loop();
            }
        };
        spin_until(50);
        let after_phase1 = progress.load(Ordering::Relaxed);
        // Transition to phase 2, then let another ~50 cycles run in it.
        epoch.store(2, Ordering::Release);
        spin_until(after_phase1 + 50);
        stop.store(true, Ordering::Release);
        engine.join().expect("run panicked")
    });

    // Index the per-phase snapshots by epoch id.
    let mut by_epoch: BTreeMap<u32, &SchbenchPhaseStats> = BTreeMap::new();
    for (id, stats) in outcome.phases.iter() {
        by_epoch.insert(*id, stats);
    }
    let p1 = by_epoch.get(&1).expect("phase 1 present");
    let p2 = by_epoch.get(&2).expect("phase 2 present");

    // Work cycles ran in both phases (each window waited for >=50 cycles).
    // loop_count is the robust proof of the split: latency samples are gated on
    // delta_us>0 and a sub-µs cycle can miss that gate, whereas every cycle
    // increments loop_count.
    assert!(p1.loop_count > 0, "phase 1 ran cycles: {}", p1.loop_count);
    assert!(p2.loop_count > 0, "phase 2 ran cycles: {}", p2.loop_count);

    // The per-phase run-delay split is populated for each thread class. This
    // covers a DISTINCT path that the loop_count/histogram asserts never touch:
    // the schedstat raw-pair baseline/re-baseline (phase_ss_start) and the
    // per-class Σ fold (worker vs msg). pcount is the deterministic
    // denominator -- any thread scheduled during the phase accrues pcount>=1
    // (sched_info_arrive); both phases ran cycles (workers dispatched) and the
    // msg thread is scheduled on every wake. run_delay_ns is deliberately NOT
    // asserted: on an uncontended host it can be 0, so a value check would
    // flake, while pcount>0 pins that the class was measured and folded.
    assert!(
        p1.worker_pcount > 0,
        "phase 1 worker run-delay split populated"
    );
    assert!(
        p2.worker_pcount > 0,
        "phase 2 worker run-delay split populated"
    );
    assert!(p1.msg_pcount > 0, "phase 1 msg run-delay split populated");
    assert!(p2.msg_pcount > 0, "phase 2 msg run-delay split populated");

    // loop_count partitions across the phases and sums to the global count:
    // no cycle is dropped or counted twice at the boundary.
    let loop_sum = outcome
        .phases
        .iter()
        .map(|(_, stats)| stats.loop_count)
        .sum::<u64>();
    assert_eq!(
        loop_sum, outcome.whole_run.loop_count,
        "per-phase loop_count partitions the whole-run count"
    );

    // The whole-run histogram is EXACTLY the union of the per-phase ones (the
    // drained snapshots, recombined), so splitting by phase loses no samples
    // relative to the whole-run accounting.
    let phase_request_sum = outcome
        .phases
        .iter()
        .map(|(_, stats)| stats.request.percentiles().nr_samples)
        .sum::<u64>();
    assert_eq!(
        outcome.whole_run.request.nr_samples, phase_request_sum,
        "whole-run request count == Σ per-phase counts"
    );
    let phase_wakeup_sum = outcome
        .phases
        .iter()
        .map(|(_, stats)| stats.wakeup.percentiles().nr_samples)
        .sum::<u64>();
    assert_eq!(
        outcome.whole_run.wakeup.nr_samples, phase_wakeup_sum,
        "whole-run wakeup count == Σ per-phase counts"
    );

    // Per-phase RPS: a deterministic additive invariant plus a timing-tolerant
    // bound on the brief first phase. Both phases finish far inside the 1s
    // control cadence, so phase 1 normally receives ZERO rps ticks (the ~1s
    // tick lands in phase 2, after the epoch advanced). On a slow or contended
    // host phase 1 may catch one straddle tick, hence <= 1 rather than a
    // wall-clock-fragile == 0. The deterministic empty-rps -> ABSENT contract
    // is pinned by derive_rps_distribution_values_and_absent_guard; this bound
    // still catches gross mis-attribution of the bulk of rps to phase 1.
    let p1_rps_ticks = p1.rps.sample_count();
    assert!(
        p1_rps_ticks <= 1,
        "brief phase 1 gets at most one straddle rps tick, got {}",
        p1_rps_ticks
    );
    let phase_rps_sum = outcome
        .phases
        .iter()
        .map(|(_, stats)| stats.rps.sample_count())
        .sum::<u64>();
    assert_eq!(
        outcome.whole_run.rps.nr_samples, phase_rps_sum,
        "whole-run rps count == Σ per-phase rps counts"
    );
}
3080
#[test]
fn engine_per_phase_rps_populates_and_sums_to_whole_run() {
    use std::collections::BTreeMap;

    // POPULATED per-phase RPS. The control thread samples once per second, so a
    // phase window of >= ~2s collects several per-phase rps samples. The test is
    // time-gated because the 1s control cadence is the granularity floor; the
    // progress-gated phases in engine_splits are too short to tick. Epoch 1 runs
    // for ~2.2s, then epoch 2 for ~2.2s; afterwards BOTH epochs must carry rps
    // samples AND the per-epoch samples must sum to the whole-run count (the
    // additive invariant: each sample lands in one epoch bucket AND in the
    // whole-run histogram).
    // The ~4.4s wall-clock is the irreducible price of driving the real 1s
    // control cadence end-to-end (2 ticks per epoch, for robustness against a
    // delayed tick on a loaded host). The deterministic per-epoch invariants --
    // the lockstep reset and Σ-per-epoch==whole-run -- are pinned sleep-free by
    // reset_rps_accumulators_clears_both_in_lockstep and
    // engine_splits_stats_across_phase_epochs. What only this test confirms is
    // that the LIVE control thread POPULATES both epochs.
    let stop = AtomicBool::new(false);
    let progress = AtomicU64::new(0);
    let epoch = AtomicU32::new(1);
    let config = SchbenchConfig {
        message_threads: 1,
        worker_threads: 2,
        cache_footprint_kib: 16,
        operations: 1,
        sleep_usec: 0,
        skip_locking: false,
        requests_per_sec: 0,
        auto_rps: 0,
        split_percent: None,
        pipe_transfer_bytes: 0,
    };

    let outcome = std::thread::scope(|scope| {
        let engine = scope.spawn(|| run(&config, &stop, &progress, Some(&epoch)));
        // ~2.2s in each epoch => at least 2 control ticks apiece (the first
        // tick arrives at ~1s).
        let dwell = std::time::Duration::from_millis(2200);
        std::thread::sleep(dwell);
        epoch.store(2, Ordering::Release);
        std::thread::sleep(dwell);
        stop.store(true, Ordering::Release);
        engine.join().expect("run panicked")
    });

    // Index the per-phase snapshots by epoch id.
    let mut by_epoch: BTreeMap<u32, &SchbenchPhaseStats> = BTreeMap::new();
    for (id, stats) in outcome.phases.iter() {
        by_epoch.insert(*id, stats);
    }
    let p1 = by_epoch.get(&1).expect("phase 1 present");
    let p2 = by_epoch.get(&2).expect("phase 2 present");

    let p1_ticks = p1.rps.sample_count();
    assert!(
        p1_ticks > 0,
        "phase 1 per-phase rps populated: {}",
        p1_ticks
    );
    let p2_ticks = p2.rps.sample_count();
    assert!(
        p2_ticks > 0,
        "phase 2 per-phase rps populated: {}",
        p2_ticks
    );

    // Additive invariant. Default mode has no auto-RPS reset, so the equality
    // is EXACT: Σ per-epoch rps samples == whole-run rps samples.
    let phase_rps_sum = outcome
        .phases
        .iter()
        .map(|(_, stats)| stats.rps.sample_count())
        .sum::<u64>();
    assert_eq!(
        phase_rps_sum, outcome.whole_run.rps.nr_samples,
        "Σ per-epoch rps samples == whole-run rps samples"
    );
}
3146
#[test]
fn schbench_config_serde_roundtrips() {
    // Serialize a config with non-default values to JSON, parse it back, and
    // require equality with the value we started from.
    let original = SchbenchConfig::default()
        .message_threads(3)
        .worker_threads(7)
        .cache_footprint_kib(512)
        .operations(9)
        .sleep_usec(250)
        .skip_locking(true)
        // Cover the Some(p) serde surface of split_percent, not only its None
        // default (the API-review roundtrip requirement).
        .split_percent(Some(33))
        // Likewise cover a non-default value for the pipe field.
        .pipe_transfer_bytes(4096);

    let encoded = serde_json::to_string(&original).expect("SchbenchConfig must serialize");
    let decoded = serde_json::from_str::<SchbenchConfig>(&encoded)
        .expect("SchbenchConfig must deserialize");

    assert_eq!(original, decoded, "config roundtrips unchanged");
}
3167
#[test]
fn worktype_schbench_registration_and_serde() {
    use crate::workload::WorkType;

    // Build the variant through its constructor with a non-default config.
    let custom_config = SchbenchConfig::default()
        .message_threads(2)
        .worker_threads(4);
    let wt = WorkType::schbench(custom_config);
    assert_eq!(wt.name(), "Schbench");

    // Looking the variant up by name produces it with the default config.
    let by_name = WorkType::from_name("Schbench");
    assert_eq!(
        by_name,
        Some(WorkType::Schbench {
            config: SchbenchConfig::default()
        })
    );

    // A JSON roundtrip preserves the variant together with its config.
    let encoded = serde_json::to_string(&wt).expect("WorkType::Schbench must serialize");
    let decoded = serde_json::from_str::<WorkType>(&encoded)
        .expect("WorkType::Schbench must deserialize");
    assert_eq!(wt, decoded);
}
3190
#[test]
fn schbench_config_reachable_via_prelude() {
    use crate::prelude::SchbenchConfig as PreludeSchbenchConfig;

    // Pins the prelude placement as a compile-time regression check: test
    // authors build the config through `use ktstr::prelude::*`, so removing
    // SchbenchConfig from the prelude breaks this import. The comparison also
    // exercises the Eq derive.
    let via_prelude: PreludeSchbenchConfig = PreludeSchbenchConfig::default();
    assert_eq!(via_prelude, SchbenchConfig::default());
}
3199
#[test]
fn read_schedstat_raw_parses_own_and_handles_missing() {
    // This thread has already been scheduled, so reading its own
    // /proc/<tid>/schedstat must parse into a (run_delay, pcount) pair without
    // panicking. The value itself is not checked.
    let own_tid = gettid_self();
    let _own = read_schedstat_raw(own_tid);

    // A tid that does not exist takes the file-read-failure path (thread
    // exited) and yields (0,0), matching schbench's fopen-failure handling.
    // Parse boundaries, including the pcount==0 mean guard, are covered by
    // separate tests.
    let missing = read_schedstat_raw(-1);
    assert_eq!(missing, (0, 0), "absent schedstat yields (0,0), no panic");
}
3214
#[test]
fn schedstat_raw_parse_and_mean_pcount_guard() {
    // The raw parse returns run_delay and pcount undivided (the re-poolable
    // pair).
    let parsed = parse_schedstat_raw("123456 50 5");
    assert_eq!(parsed, (50, 5));

    // mean_sched_delay performs the division and guards pcount==0, so there is
    // no divide-by-zero.
    let plain_mean = mean_sched_delay((50, 5)); // run_delay/pcount
    assert_eq!(plain_mean, 10);
    let zero_pcount_mean = mean_sched_delay((50, 0)); // pcount==0 guard
    assert_eq!(zero_pcount_mean, 0);
    let all_zero_line = parse_schedstat_raw("0 0 0"); // !sched_info_on()
    assert_eq!(mean_sched_delay(all_zero_line), 0);
    let zero_delay_mean = mean_sched_delay((0, 5)); // 0 run_delay -> 0 mean
    assert_eq!(zero_delay_mean, 0);
}
3225
#[test]
#[should_panic(expected = "schedstat field 3")]
fn parse_schedstat_raw_short_line_panics() {
    // A line that is present but has only two fields indicates a kernel/parse
    // bug, so the parser must fail loudly instead of silently returning 0 (the
    // fail-loud ruling).
    let two_field_line = "100 50";
    parse_schedstat_raw(two_field_line);
}
3233
#[test]
#[should_panic(expected = "schedstat field 2")]
fn parse_schedstat_raw_nonnumeric_panics() {
    // Three fields, none numeric: the expected panic message names field 2.
    let nonnumeric_line = "alpha beta gamma";
    parse_schedstat_raw(nonnumeric_line);
}
3239}