ktstr/workload/taobench/run.rs
1//! The taobench engine: a bounded, sharded, evicting key-value cache served by a
2//! client population (closed-loop by default, or open-loop fixed-rate arrival),
3//! with a fast in-cache path (hit) and a slow backing-store-miss path (a
4//! worker-defined sleep on a dispatcher thread). It is
5//! entirely in-process — no sockets, no TLS, no subprocess — and re-expresses the
6//! taobench ACCESS PATTERN in ktstr primitives so its qps / hit-ratio flow
7//! through the metric API. See `validation.md` for the per-aspect real-vs-port
8//! fidelity comparison.
9//!
10//! ## Model
11//!
12//! - A `Cache` of `SHARDS` independently-locked shards, each a FIFO-evicting
13//! map bounded to a per-shard object cap. Total capacity is `cache_capacity_mib`
14//! worth of objects; the key range is sized `capacity / target_hit` so a
15//! uniform-random key stream hits the resident set with probability
16//! ≈ `target_hit` at equilibrium. Eviction is the load-bearing mechanism: with
17//! no eviction a self-healing cache drifts to a 1.0 hit ratio; a bounded cache
18//! whose key range exceeds its capacity holds a steady-state miss stream.
19//! - `client_threads` CLIENT threads pick a key, look it up, and on a HIT touch
20//! the stored value bytes (the cache read-bandwidth cost) and count a fast op;
21//! on a MISS hand the key to a slow dispatcher and block until it is filled,
22//! then count a slow op.
23//! - Arrival: `arrival_rate == 0` (default) is CLOSED loop — each client issues
24//! its next request as soon as the prior completes. `arrival_rate > 0` is OPEN
25//! loop — each client has a fixed intended-arrival SCHEDULE
26//! (`arrival_rate / client_threads` per client) independent of completion, and
27//! serve latency is measured from that intended time. A client still holds at
28//! most one outstanding request (it blocks on a miss before issuing the next),
29//! so a slow completion delays the next issue; that backlog is folded into the
30//! late requests' serve latency (coordinated-omission correction) instead of
31//! being omitted. The serve-latency histogram is per-phase data (empty in
32//! closed loop).
33//! - `slow_threads` DISPATCHER threads serve misses: sleep the simulated
34//! backing-store fetch (a worker-defined cost, the same model schbench's
35//! think-sleep uses, not a synchronization wait), insert a freshly sized+touched
36//! value, and wake the waiting client. The fetch latency is a fixed
37//! `slow_path_sleep_us` by default, or — when `slow_path_p99_us` is set above it
38//! — a per-fetch heavy-tailed Pareto draw with median `slow_path_sleep_us` and
39//! 99th percentile `slow_path_p99_us` (most fetches near the median, a rare slow
40//! tail), so the open-loop serve-latency tail reflects realistic backing-store
41//! variance.
42//!
43//! ## Counters (request-time vs response-time)
44//!
45//! `get_cmds` / `get_misses` are counted at LOOKUP (request) time; `fast_ops` /
46//! `slow_ops` at COMPLETION (response) time. In a closed loop they are equal once
47//! every in-flight request drains, but a request that straddles a phase boundary
48//! lands its lookup in one phase and its completion in the next, so a per-phase
49//! command-time hit_rate (`1 - get_misses/get_cmds`) differs slightly from the
//! response-time hit_ratio (`fast_ops/(fast_ops+slow_ops)`) — the same skew the
//! reference taobench reports between its interval hit_rate and its final hit_ratio.
52
53use crate::workload::schbench::plat::PlatStats;
54use core::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
55use std::collections::HashMap;
56use std::collections::{BTreeMap, VecDeque};
57use std::sync::{Condvar, Mutex};
58
/// Number of independently-locked cache shards. Sized to keep per-shard lock
/// contention low for typical guest thread counts while bounding the lock-array
/// footprint; the cache capacity is split evenly across shards.
///
/// MUST be a power of two: `Cache::shard` selects a shard by masking the key with
/// `SHARDS - 1`. The assertion below turns a violation into a compile error
/// instead of a silently skewed key→shard mapping.
const SHARDS: usize = 256;
const _: () = assert!(
    SHARDS.is_power_of_two(),
    "SHARDS must be a power of two (shards are selected by masking with SHARDS - 1)"
);
63
/// Value-size histogram buckets, in bytes. Each entry is paired by index with
/// [`VALUE_WEIGHTS`]. Small objects dominate and the tail reaches 64 KiB; the
/// weighted mean is ≈ 332 B. This is ktstr's own approximation of the real
/// workload's object-size profile, not a copy of any external size table.
const VALUE_SIZES: [usize; 8] = [
    1 << 6,  // 64 B
    1 << 7,  // 128 B
    1 << 8,  // 256 B
    1 << 9,  // 512 B
    1 << 10, // 1 KiB
    1 << 12, // 4 KiB
    1 << 14, // 16 KiB
    1 << 16, // 64 KiB
];
/// Relative sampling weight of each [`VALUE_SIZES`] bucket (the weights sum to 996).
const VALUE_WEIGHTS: [u32; 8] = [450, 300, 130, 70, 30, 12, 3, 1];
70
/// Longest single open-loop pacing sleep, in nanoseconds (50 ms).
///
/// A client waiting for its next scheduled arrival wakes at least this often to
/// re-check `stop`. Shutdown is therefore noticed within this bound, not after a
/// whole inter-arrival interval. At a degenerate low `arrival_rate`, that interval
/// (`1e9 * n_clients / arrival_rate` ns) could outlast the scenario cleanup budget
/// and risk a watchdog kill. At realistic open-loop rates the interval is far
/// shorter than this quantum, so pacing is a single uncapped wait; the cap only
/// matters at pathologically low rates.
const STOP_POLL_QUANTUM_NS: u64 = 50 * 1_000_000;
80
81/// Read `CLOCK_MONOTONIC` as nanoseconds (monotonic, not wall-clock), matching
82/// the schbench engine's clock source.
83fn monotonic_nanos() -> u64 {
84 // SAFETY: `clock_gettime` writes a `timespec` through the out-pointer and
85 // reads nothing else; CLOCK_MONOTONIC is always available on Linux.
86 let mut ts: libc::timespec = unsafe { core::mem::zeroed() };
87 let rc = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
88 assert_eq!(rc, 0, "clock_gettime(CLOCK_MONOTONIC) failed");
89 (ts.tv_sec as u64) * 1_000_000_000 + ts.tv_nsec as u64
90}
91
/// Simulate the backing-store fetch on the slow path by sleeping `usec`
/// microseconds.
///
/// This is a worker-defined cost in the style of schbench's think-sleep, not a
/// synchronization wait; `std::thread::sleep` is implemented with `clock_nanosleep`
/// on Linux. With `usec == 0` nothing happens, but the dispatcher still performs
/// the fill and the wakeup, so the slow path remains a distinct thread hop. The
/// caller decides the duration: either the fixed `slow_path_sleep_us` or, with the
/// heavy tail enabled, a per-fetch [`sample_service_us`] Pareto draw.
fn backing_store_fetch(usec: u64) {
    if usec == 0 {
        return;
    }
    let fetch_time = std::time::Duration::from_micros(usec);
    std::thread::sleep(fetch_time);
}
103
/// Cap on a single heavy-tailed slow-path fetch, in microseconds (2 s).
///
/// A Pareto draw is unbounded, and a dispatcher cannot abandon a fetch on shutdown:
/// it must finish the fetch, fill the key, and wake the blocked client. This cap
/// keeps a pathological tail draw from parking a dispatcher off-CPU indefinitely.
/// 2 s is far above any realistic backing-store p99.9.
///
/// Teardown can still take longer than one capped fetch. At shutdown up to
/// `n_clients` misses may be outstanding, and each dispatcher serves them one at a
/// time. The last client can therefore wait for up to `ceil(n_clients / n_slow)`
/// back-to-back fetches. With the default staffing `n_slow = max(1, n_clients / 3)`
/// that is 3–5 fetches; the maximum of 5 occurs at five clients with one
/// dispatcher. Size the scenario watchdog / cleanup budget accordingly when
/// configuring a deep tail with many clients per dispatcher.
///
/// The cap bounds only per-FETCH off-CPU time. A serve-latency sample (fetch plus
/// open-loop queueing / coordinated-omission backlog) can exceed it; that sample is
/// kept within the `PlatStats` u32 range by `record_serve_lat`'s own
/// `(ns/1000).min(u32::MAX)` clamp. The fixed legacy path is not capped, because
/// the user sets that latency directly.
const MAX_SERVICE_US: u64 = 2 * 1_000_000;
121
122/// Draw one heavy-tailed slow-path service time (µs) from the Pareto resolved by
123/// [`TaobenchConfig::resolve_service_pareto`]: `scale · U^(−1/α)` with `U ∈ (0, 1]`
124/// ([`Rng::f64_open01`]). `U = 1` yields the scale (the floor, below the median);
125/// smaller `U` yields the tail. Clamped to [`MAX_SERVICE_US`]. `inv_neg_alpha`
126/// is the precomputed `−1/α` exponent.
127fn sample_service_us(rng: &mut Rng, scale: f64, inv_neg_alpha: f64) -> u64 {
128 let us = scale * rng.f64_open01().powf(inv_neg_alpha);
129 (us as u64).min(MAX_SERVICE_US)
130}
131
/// Read every byte of `bytes` and return their wrapping `u64` sum.
///
/// The sum goes through [`std::hint::black_box`] to discourage the optimizer from
/// eliding the reads. The reads are the per-hit memory-bandwidth cost that makes
/// this a cache workload rather than a control-flow micro-benchmark; schbench's
/// `black_box`-guarded memset is the precedent.
fn touch(bytes: &[u8]) -> u64 {
    let checksum = bytes
        .iter()
        .fold(0u64, |sum, &byte| sum.wrapping_add(u64::from(byte)));
    std::hint::black_box(checksum)
}
143
/// A small, fast xorshift64 PRNG, one per thread, used only for workload key and
/// size sampling (not cryptographic). Seed each thread distinctly so threads do
/// not march in lockstep.
struct Rng(u64);

impl Rng {
    /// Constant XORed into every caller seed (the 64-bit golden-ratio constant).
    const SEED_MIX: u64 = 0x9E37_79B9_7F4A_7C15;

    /// Build a generator from `seed`.
    ///
    /// xorshift has a fixed point at state 0: a zero state emits 0 forever. XOR
    /// mixing alone does not rule this out, because `seed == SEED_MIX` mixes to
    /// exactly 0. That single seed is remapped to a fixed non-zero state. XOR is a
    /// bijection, so the remap collides with exactly one other, arbitrary-looking
    /// seed. Every other seed produces the same state as before.
    fn new(seed: u64) -> Self {
        let state = seed ^ Self::SEED_MIX;
        if state == 0 {
            Rng(Self::SEED_MIX.rotate_left(32))
        } else {
            Rng(state)
        }
    }

    /// Advance the generator (xorshift64, shifts 13/7/17) and return the new state.
    /// A non-zero state never becomes zero.
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }

    /// Uniform in `[0, n)` by modulo reduction. The tiny modulo bias is irrelevant
    /// for a workload key stream.
    ///
    /// # Panics
    /// If `n == 0` (division by zero).
    fn below(&mut self, n: u64) -> u64 {
        self.next_u64() % n
    }

    /// Uniform `f64` in `(0, 1]` (open at 0, closed at 1), built from the top 53
    /// bits. This is the `OpenClosed01` form the Pareto inverse-CDF in
    /// [`sample_service_us`] needs: `U = 0` would make `U^(−1/α)` infinite, so 0
    /// is excluded.
    fn f64_open01(&mut self) -> f64 {
        // Top 53 bits + 1 gives an integer in 1..=2^53; scaling by 2^-53 maps it
        // onto (0, 1].
        let bits = (self.next_u64() >> 11) + 1;
        bits as f64 * (1.0 / ((1u64 << 53) as f64))
    }
}
177
178/// Sample a value size from [`VALUE_SIZES`] weighted by [`VALUE_WEIGHTS`].
179fn sample_value_size(rng: &mut Rng) -> usize {
180 let total: u32 = VALUE_WEIGHTS.iter().sum();
181 let mut pick = (rng.below(total as u64)) as u32;
182 for (i, &w) in VALUE_WEIGHTS.iter().enumerate() {
183 if pick < w {
184 return VALUE_SIZES[i];
185 }
186 pick -= w;
187 }
188 VALUE_SIZES[VALUE_SIZES.len() - 1]
189}
190
/// Allocate a fresh `size`-byte value with every byte set to `0xAB`.
///
/// Writing each byte is the write-side bandwidth cost of a fill. The fill byte is
/// deliberately non-zero: a zero fill can be satisfied by a pre-zeroed allocation
/// without touching the bytes.
fn make_value(size: usize) -> Box<[u8]> {
    const FILL_BYTE: u8 = 0xAB;
    let bytes: Vec<u8> = vec![FILL_BYTE; size];
    Box::from(bytes)
}
196
197// ---------------------------------------------------------------------------
198// Config
199// ---------------------------------------------------------------------------
200
/// User-facing config for the [`Taobench`](crate::workload::WorkType::Taobench)
/// workload: a bounded, evicting key-value cache with a fast hit path and a slow
/// miss path, driven toward a steady-state hit ratio.
///
/// All fields are integer/scalar so the type keeps `Eq + Hash`; fractional knobs
/// are expressed as integer percents. Every field has a chainable builder setter,
/// and [`Default`] is a useful working config.
///
/// Field order is significant: it fixes the serialized field order and the derived
/// `Hash`, so do not reorder fields.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct TaobenchConfig {
    /// CLIENT threads that issue lookups and serve hits. `0` resolves to the
    /// allocated guest cpuset CPU count (one client per CPU, at least one).
    pub client_threads: usize,
    /// SLOW dispatcher threads that serve misses (sleep + fill + wake). `0`
    /// resolves to `max(1, client_threads / 3)` using the resolved client count,
    /// mirroring the reference taobench's 3:1 fast:slow staffing ratio.
    pub slow_threads: usize,
    /// Resident cache budget in MiB. The cache FIFO-evicts to stay near this many
    /// bytes' worth of objects. A budget larger than the guest LLC makes the value
    /// touch a real memory-bandwidth cost.
    pub cache_capacity_mib: usize,
    /// Target steady-state hit ratio, in percent. The key range is sized
    /// `capacity / target_hit` so that a uniform key stream hits at this rate at
    /// equilibrium. Values outside `1..=99` are clamped into that range when
    /// consumed.
    pub target_hit_pct: usize,
    /// Simulated backing-store fetch latency on a miss, in microseconds; the slow
    /// dispatcher sleeps this long before filling.
    ///
    /// `0` keeps the slow path a pure thread hop with no sleep. It also disables
    /// the heavy tail: a zero median has no Pareto scale, so the fetch stays a
    /// no-op regardless of `slow_path_p99_us`. When `slow_path_sleep_us > 0` and
    /// `slow_path_p99_us > slow_path_sleep_us`, this value is the MEDIAN (p50) of a
    /// heavy-tailed service-time distribution rather than a fixed latency (see
    /// `slow_path_p99_us`).
    pub slow_path_sleep_us: u64,
    /// Heavy-tailed slow-path service time: the p99 of the simulated backing-store
    /// fetch, in microseconds.
    ///
    /// When `> slow_path_sleep_us`, each miss's fetch sleep is drawn from a Pareto
    /// distribution with median `slow_path_sleep_us` and 99th percentile equal to
    /// this value. Most fetches land near the median, with a heavy tail of rare
    /// slow fetches (GC pauses, cold cache, disk seeks), the power-law shape a
    /// backing store exhibits.
    ///
    /// Any of the following keeps the fixed-latency legacy behavior, byte-identical:
    /// `0` (the default), a value `<= slow_path_sleep_us`, or
    /// `slow_path_sleep_us == 0` (a zero median has no Pareto scale). In that mode
    /// every fetch sleeps exactly `slow_path_sleep_us`.
    ///
    /// Tail draws are clamped at a fixed maximum, so a pathological draw cannot
    /// park a dispatcher off-CPU unboundedly.
    ///
    /// This is a ktstr enhancement beyond the reference, whose per-request
    /// slow-path service time is a FIXED sleep with no tail. The reference's
    /// `[target/2, 2×target]` uniform jitter plus idle-poll backoff is a separate
    /// poll/idle-wait knob, not the per-request fetch. The heavy tail lets the
    /// open-loop serve-latency tail
    /// ([`crate::workload::TaobenchConfig::arrival_rate`]) reflect realistic
    /// backing-store variance.
    pub slow_path_p99_us: u64,
    /// Open-loop arrival rate, as AGGREGATE ops/sec across all client threads; the
    /// taobench analog of schbench's `-R`.
    ///
    /// `0` (the default) selects a CLOSED loop: each client blocks until its request
    /// completes before issuing the next. This is the legacy behavior,
    /// byte-identical.
    ///
    /// Non-zero selects an OPEN loop: each client follows a fixed intended-arrival
    /// SCHEDULE (`arrival_rate / client_threads` per client) independent of
    /// completion, and serve latency is measured from that intended time. The rate
    /// is divided evenly across the resolved clients. A client still holds at most
    /// one outstanding request, so a slow completion delays the next issue. That
    /// backlog is folded into the late requests' serve latency
    /// (coordinated-omission correction) rather than omitted.
    ///
    /// The name follows the standard queueing-theory term. The schbench analog,
    /// `requests_per_sec`, verbatim mirrors schbench's own `-R` CLI flag; this
    /// ktstr-added field has no reference `-R` to mirror, so it is not bound by
    /// that naming constraint.
    pub arrival_rate: usize,
}
267
268impl Default for TaobenchConfig {
269 fn default() -> Self {
270 Self {
271 client_threads: 0,
272 slow_threads: 0,
273 cache_capacity_mib: 64,
274 target_hit_pct: 90,
275 slow_path_sleep_us: 100,
276 slow_path_p99_us: 0,
277 arrival_rate: 0,
278 }
279 }
280}
281
282impl TaobenchConfig {
283 /// Set the client thread count (`0` = one per allocated CPU).
284 #[must_use = "builder methods consume self; bind the result"]
285 pub fn client_threads(mut self, n: usize) -> Self {
286 self.client_threads = n;
287 self
288 }
289 /// Set the slow dispatcher thread count (`0` = `max(1, client_threads/3)`).
290 #[must_use = "builder methods consume self; bind the result"]
291 pub fn slow_threads(mut self, n: usize) -> Self {
292 self.slow_threads = n;
293 self
294 }
295 /// Set the resident cache budget in MiB.
296 #[must_use = "builder methods consume self; bind the result"]
297 pub fn cache_capacity_mib(mut self, mib: usize) -> Self {
298 self.cache_capacity_mib = mib;
299 self
300 }
301 /// Set the target steady-state hit ratio in percent (`1..=99`).
302 #[must_use = "builder methods consume self; bind the result"]
303 pub fn target_hit_pct(mut self, pct: usize) -> Self {
304 self.target_hit_pct = pct;
305 self
306 }
307 /// Set the simulated backing-store fetch latency on a miss, microseconds (the
308 /// MEDIAN when `slow_path_p99_us` enables the heavy tail; `0` keeps the fetch a
309 /// no-op and also disables the tail — a zero median has no Pareto scale).
310 #[must_use = "builder methods consume self; bind the result"]
311 pub fn slow_path_sleep_us(mut self, us: u64) -> Self {
312 self.slow_path_sleep_us = us;
313 self
314 }
315 /// Set the heavy-tailed slow-path service-time p99 in microseconds (`0` or
316 /// `<= slow_path_sleep_us` = fixed latency, the legacy behavior; also fixed when
317 /// `slow_path_sleep_us == 0`, a zero median having no Pareto scale). When larger
318 /// than a non-zero `slow_path_sleep_us`, each miss's fetch is drawn from a Pareto
319 /// with median `slow_path_sleep_us` and this p99.
320 #[must_use = "builder methods consume self; bind the result"]
321 pub fn slow_path_p99_us(mut self, us: u64) -> Self {
322 self.slow_path_p99_us = us;
323 self
324 }
325 /// Set the open-loop AGGREGATE arrival rate in ops/sec across all clients
326 /// (`0` = closed loop). Divided evenly across resolved client threads; serve
327 /// latency is then measured from the intended arrival (coordinated-omission).
328 #[must_use = "builder methods consume self; bind the result"]
329 pub fn arrival_rate(mut self, ops_per_sec: usize) -> Self {
330 self.arrival_rate = ops_per_sec;
331 self
332 }
333
334 /// Resolve the client thread count: the configured value, or the allocated
335 /// CPU count when `0`.
336 fn resolve_client_threads(&self, allowed_cpus: usize) -> usize {
337 if self.client_threads == 0 {
338 allowed_cpus.max(1)
339 } else {
340 self.client_threads
341 }
342 }
343
344 /// Resolve the slow dispatcher count: the configured value, or
345 /// `max(1, clients/3)` when `0`.
346 fn resolve_slow_threads(&self, clients: usize) -> usize {
347 if self.slow_threads == 0 {
348 (clients / 3).max(1)
349 } else {
350 self.slow_threads
351 }
352 }
353
354 /// Clamp the target hit ratio to a usable open interval (a 0% or ≥100% target
355 /// has no finite key range / no miss stream).
356 fn target_hit_fraction(&self) -> f64 {
357 (self.target_hit_pct.clamp(1, 99) as f64) / 100.0
358 }
359
360 /// Resolve the heavy-tailed slow-path service-time model from the `(p50, p99)`
361 /// knob: `None` = fixed `slow_path_sleep_us` (the legacy path); `Some((scale,
362 /// inv_neg_alpha))` = a Pareto whose median is `slow_path_sleep_us` and whose
363 /// p99 is `slow_path_p99_us`, sampled by [`sample_service_us`].
364 ///
365 /// `slow_path_p99_us <= slow_path_sleep_us` (incl. `0`) resolves to `None` — a
366 /// p99 at or below the median is not a tail. `slow_path_sleep_us == 0` also
367 /// resolves to `None`: a zero median has no Pareto scale, and the legacy no-op
368 /// fetch is preserved.
369 ///
370 /// Mapping: the Pareto q-quantile is `scale·(1−q)^(−1/α)`, so from the two
371 /// quantiles `p99/p50 = (0.01/0.5)^(−1/α) = 50^(1/α)`, giving
372 /// `α = ln(50) / ln(p99/p50)` and `scale = p50·0.5^(1/α)`. `p99 > p50`
373 /// guarantees `ln(p99/p50) > 0 ⇒ α > 0`, and `α ≥ ln(50)/ln(spread)` keeps the
374 /// mean finite (`α > 1`) for any spread below 50x. The stored `inv_neg_alpha =
375 /// −1/α` is the exponent [`sample_service_us`] raises `U` to.
376 fn resolve_service_pareto(&self) -> Option<(f64, f64)> {
377 let p50 = self.slow_path_sleep_us;
378 let p99 = self.slow_path_p99_us;
379 if p50 == 0 || p99 <= p50 {
380 return None;
381 }
382 let (p50f, p99f) = (p50 as f64, p99 as f64);
383 let alpha = 50.0_f64.ln() / (p99f / p50f).ln();
384 let scale = p50f * 0.5_f64.powf(1.0 / alpha);
385 Some((scale, -1.0 / alpha))
386 }
387}
388
389// ---------------------------------------------------------------------------
390// Cache
391// ---------------------------------------------------------------------------
392
393/// One cache shard: a FIFO-evicting map bounded to `cap` objects. FIFO eviction
394/// over a uniform key stream yields the same equilibrium hit ratio as LRU
395/// (resident_fraction = cap / key_range) while being O(1) and lock-cheap.
396struct Shard {
397 map: HashMap<u64, Box<[u8]>>,
398 fifo: VecDeque<u64>,
399 cap: usize,
400}
401
402impl Shard {
403 fn with_cap(cap: usize) -> Self {
404 Shard {
405 map: HashMap::new(),
406 fifo: VecDeque::new(),
407 cap,
408 }
409 }
410 /// Look up `k`; on a hit, touch its bytes and return `true`.
411 fn get_touch(&self, k: u64) -> bool {
412 match self.map.get(&k) {
413 Some(v) => {
414 touch(v);
415 true
416 }
417 None => false,
418 }
419 }
420 /// Insert a freshly-built value for `k` — called at warmup and on a miss,
421 /// where `k` is absent; the `is_none()` guard keeps it correct even if `k`
422 /// is already present. FIFO-evicts down to `cap`.
423 fn insert(&mut self, k: u64, v: Box<[u8]>) {
424 if self.map.insert(k, v).is_none() {
425 self.fifo.push_back(k);
426 while self.map.len() > self.cap {
427 match self.fifo.pop_front() {
428 Some(old) => {
429 self.map.remove(&old);
430 }
431 None => break,
432 }
433 }
434 }
435 }
436}
437
438/// A bounded, sharded, FIFO-evicting cache. Keys map to shards by the low bits of
439/// the key (`SHARDS` is a power of two).
440struct Cache {
441 shards: Vec<Mutex<Shard>>,
442}
443
444impl Cache {
445 /// Build a cache holding ≈ `total_objects` across `SHARDS` shards.
446 fn new(total_objects: usize) -> Self {
447 let per_shard = (total_objects / SHARDS).max(1);
448 let shards = (0..SHARDS)
449 .map(|_| Mutex::new(Shard::with_cap(per_shard)))
450 .collect();
451 Cache { shards }
452 }
453 fn shard(&self, k: u64) -> &Mutex<Shard> {
454 &self.shards[(k as usize) & (SHARDS - 1)]
455 }
456 /// Look up + touch `k`; `true` on a hit.
457 fn get_touch(&self, k: u64) -> bool {
458 self.shard(k)
459 .lock()
460 .expect("cache shard poisoned")
461 .get_touch(k)
462 }
463 /// Fill `k` with a freshly sized + built value (the value bytes are touched
464 /// by `make_value`); FIFO-evicts to keep the shard bounded.
465 fn fill(&self, k: u64, size: usize) {
466 let v = make_value(size);
467 self.shard(k)
468 .lock()
469 .expect("cache shard poisoned")
470 .insert(k, v);
471 }
472}
473
474// ---------------------------------------------------------------------------
475// Slow-path handoff
476// ---------------------------------------------------------------------------
477
/// A miss handed from a client to a slow dispatcher: the key to fetch and fill,
/// plus `client`, which identifies the client to wake (presumably an index into
/// the per-client slots; confirm against `run`).
///
/// No timestamp is carried. The client measures serve latency from its own
/// intended-arrival time once `wait_filled` returns.
struct SlowReq {
    client: usize,
    key: u64,
}
485
486/// The slow-request queue: dispatchers block here when idle; clients push misses
487/// and wake one dispatcher.
488struct SlowQueue {
489 q: Mutex<VecDeque<SlowReq>>,
490 cv: Condvar,
491}
492
493impl SlowQueue {
494 fn new() -> Self {
495 SlowQueue {
496 q: Mutex::new(VecDeque::new()),
497 cv: Condvar::new(),
498 }
499 }
500 fn push(&self, req: SlowReq) {
501 self.q.lock().expect("slow queue poisoned").push_back(req);
502 self.cv.notify_one();
503 }
504}
505
/// Per-client completion slot for an outstanding miss.
///
/// A dispatcher calls [`Slot::signal`] after the fill, and the client blocks in
/// [`Slot::wait_filled`] until then. In BOTH arrival modes a client holds at most
/// one outstanding request, because it waits here before issuing the next one; a
/// single slot per client is therefore reused for every miss.
///
/// This serialization is also why open-loop serve latency must be measured from
/// the intended arrival (coordinated-omission correction). A slow completion
/// delays the next issue, and that backlog is charged to the late requests instead
/// of being omitted.
struct Slot {
    /// Set by `signal`; consumed (reset to `false`) by `wait_filled`.
    done: Mutex<bool>,
    cv: Condvar,
}

impl Slot {
    /// A slot with no completion pending.
    fn new() -> Self {
        Self {
            cv: Condvar::new(),
            done: Mutex::new(false),
        }
    }

    /// Block until the slot is marked filled, then clear the flag for reuse.
    ///
    /// The flag is checked before sleeping, so a `signal` that ran first is not
    /// lost, and spurious wakeups simply re-check it.
    ///
    /// # Panics
    /// If the slot mutex is poisoned.
    fn wait_filled(&self) {
        let guard = self.done.lock().expect("slot poisoned");
        let mut filled = self
            .cv
            .wait_while(guard, |is_done| !*is_done)
            .expect("slot poisoned");
        *filled = false;
    }

    /// Mark the slot filled and wake the waiting client.
    ///
    /// # Panics
    /// If the slot mutex is poisoned.
    fn signal(&self) {
        let mut done = self.done.lock().expect("slot poisoned");
        *done = true;
        drop(done);
        self.cv.notify_one();
    }
}
540
541// ---------------------------------------------------------------------------
542// Stats
543// ---------------------------------------------------------------------------
544
545/// Taobench engine counters for one accounting window — a single phase
546/// epoch (the per-phase `crate::workload::PhaseSlice::taobench` carrier) or a
547/// whole worker run (the [`crate::workload::WorkerReport::taobench_whole`] /
548/// [`crate::assert::CgroupStats::taobench_whole`] aggregate). Integer-only so the
549/// enclosing `PhaseSlice` keeps `Eq`. `get_cmds` / `get_misses` are request-time;
550/// `fast_ops` / `slow_ops` are response-time (see the module docs). `Self::merge`
551/// pools two windows (Σ ops, MAX wall) and [`Self::total_ops`] is the throughput
552/// numerator; the host derives the run-level `taobench_*` Rate metrics from the
553/// pooled aggregate.
554#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
555pub struct TaobenchStats {
556 /// Lookups issued (request time).
557 pub get_cmds: u64,
558 /// Lookups that missed (request time).
559 pub get_misses: u64,
560 /// Hits served (response time).
561 pub fast_ops: u64,
562 /// Misses served via the slow path (response time).
563 pub slow_ops: u64,
564 /// Wall-clock window this stat covers, ns — the qps denominator. Per-phase:
565 /// the phase segment; whole-run: the run window. Merged as MAX (the window is
566 /// shared by the concurrent threads/workers being pooled, not summed).
567 pub elapsed_ns: u64,
568}
569
570impl TaobenchStats {
571 pub(crate) fn merge(&mut self, o: &TaobenchStats) {
572 // saturating_add: these are guest-runtime monotonic op counters pooled
573 // across concurrent workers/cgroups. Overflow is unreachable for honest
574 // data (real counts ~1e7..1e14 << u64::MAX), but a corrupt/hostile guest
575 // value would otherwise debug-panic or release-wrap to a silently-wrong
576 // qps/hit Rate; saturating is exact for every in-range value.
577 self.get_cmds = self.get_cmds.saturating_add(o.get_cmds);
578 self.get_misses = self.get_misses.saturating_add(o.get_misses);
579 self.fast_ops = self.fast_ops.saturating_add(o.fast_ops);
580 self.slow_ops = self.slow_ops.saturating_add(o.slow_ops);
581 // The wall window is shared across the pooled concurrent threads/workers
582 // (they run the same phase at the same time), so MAX, not sum.
583 self.elapsed_ns = self.elapsed_ns.max(o.elapsed_ns);
584 }
585 /// Completed ops (fast + slow) — the throughput numerator.
586 pub fn total_ops(&self) -> u64 {
587 self.fast_ops.saturating_add(self.slow_ops)
588 }
589}
590
591/// One PER-PHASE accounting window's taobench carrier: the integer counters
592/// ([`TaobenchStats`]) plus the open-loop serve-latency histogram. Internal
593/// (`pub(crate)`) — the taobench analog of
594/// [`crate::workload::schbench::run::SchbenchPhaseStats`] (counters + `PlatStats`
595/// histograms together), and like it, never on a public field. It rides only the
596/// per-phase carriers (`PhaseSlice::taobench` / `PhaseCgroupStats::taobench`); the
597/// WHOLE-RUN carrier is the counters-only [`TaobenchStats`] (the serve histogram
598/// is per-phase data — the whole-run serve distribution is the union of these
599/// per-phase histograms, computed where needed). `serve_lat` is EMPTY in closed
600/// loop: serve latency is measured only under open-loop arrival, so its
601/// percentiles read absent there.
602#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
603pub(crate) struct TaobenchPhaseStats {
604 /// Throughput / hit counters (request- and response-time).
605 pub(crate) counters: TaobenchStats,
606 /// Coordinated-omission serve latency (µs), measured from intended arrival.
607 /// Exposed to test authors only via the derived `taobench_serve_*_us` metric
608 /// keys (the schbench-histogram model), never as a direct field.
609 pub(crate) serve_lat: PlatStats,
610}
611
612impl TaobenchPhaseStats {
613 /// Pool another window: Σ counters ([`TaobenchStats::merge`]) and union the
614 /// serve histograms ([`PlatStats::combine`]) — the per-cgroup / cross-thread
615 /// fold, mirroring the schbench carrier merge.
616 pub(crate) fn merge(&mut self, other: &TaobenchPhaseStats) {
617 self.counters.merge(&other.counters);
618 self.serve_lat.combine(&other.serve_lat);
619 }
620}
621
622/// The engine's return: the whole-run merged COUNTERS ([`TaobenchStats`], the
623/// qps/hit aggregate) and the per-phase-epoch carriers (each counters + the
624/// serve-latency histogram). The whole-run carrier is counters-only — mirroring
625/// the wire carriers (`WorkerReport::taobench_whole` / `CgroupStats::taobench_whole`);
626/// a whole-run serve distribution, when needed (the standalone driver), is the
627/// union of the per-phase `serve_lat` histograms.
628pub(crate) struct TaobenchOutcome {
629 pub whole_run: TaobenchStats,
630 pub phases: Vec<(u32, TaobenchPhaseStats)>,
631}
632
633/// Per-thread accumulation with phase-epoch bucketing: the current epoch's
634/// counters roll into `phases` when the epoch changes (the schbench drain-on-epoch
635/// pattern), and `whole` accumulates the un-bucketed run total. Wall windows are
636/// stamped from `CLOCK_MONOTONIC` at each roll (per-phase segment) and at finalize
637/// (whole-run).
638struct ThreadAccum {
639 cur_epoch: u32,
640 cur: TaobenchPhaseStats,
641 phases: BTreeMap<u32, TaobenchPhaseStats>,
642 /// Whole-run counters only (the qps/hit aggregate). The serve histogram is
643 /// per-phase data (in `cur`/`phases`); the whole-run serve distribution, when
644 /// needed, is the union of the per-phase histograms.
645 whole: TaobenchStats,
646 /// When the current phase segment started (ns).
647 phase_start_ns: u64,
648 /// When this thread started (ns) — the whole-run window start.
649 thread_start_ns: u64,
650}
651
652impl ThreadAccum {
653 fn new(epoch: u32) -> Self {
654 let now = monotonic_nanos();
655 ThreadAccum {
656 cur_epoch: epoch,
657 cur: TaobenchPhaseStats::default(),
658 phases: BTreeMap::new(),
659 whole: TaobenchStats::default(),
660 phase_start_ns: now,
661 thread_start_ns: now,
662 }
663 }
664 /// Roll the current bucket into `phases` (stamping its segment wall) and start
665 /// a fresh one for `epoch`.
666 fn roll_to(&mut self, epoch: u32) {
667 if epoch != self.cur_epoch {
668 let now = monotonic_nanos();
669 let mut cur = std::mem::take(&mut self.cur);
670 cur.counters.elapsed_ns = now.saturating_sub(self.phase_start_ns);
671 self.phases.entry(self.cur_epoch).or_default().merge(&cur);
672 self.cur_epoch = epoch;
673 self.phase_start_ns = now;
674 }
675 }
676 /// Record a lookup at request time in `epoch`.
677 fn record_cmd(&mut self, epoch: u32, hit: bool) {
678 self.roll_to(epoch);
679 self.cur.counters.get_cmds += 1;
680 self.whole.get_cmds += 1;
681 if !hit {
682 self.cur.counters.get_misses += 1;
683 self.whole.get_misses += 1;
684 }
685 }
686 /// Record a completion at response time in `epoch`.
687 fn record_complete(&mut self, epoch: u32, hit: bool) {
688 self.roll_to(epoch);
689 if hit {
690 self.cur.counters.fast_ops += 1;
691 self.whole.fast_ops += 1;
692 } else {
693 self.cur.counters.slow_ops += 1;
694 self.whole.slow_ops += 1;
695 }
696 }
697 /// Record an open-loop serve latency (`ns` from the intended arrival) in
698 /// `epoch`, into the current phase segment's histogram (µs buckets). Closed
699 /// loop never calls this, so the histograms stay empty and the serve-latency
700 /// keys read absent. The whole-run serve distribution is the union of the
701 /// per-phase histograms, so only the per-phase bucket is recorded here.
702 fn record_serve_lat(&mut self, epoch: u32, ns: u64) {
703 self.roll_to(epoch);
704 let us = (ns / 1000).min(u32::MAX as u64) as u32;
705 self.cur.serve_lat.add_lat(us);
706 }
707 /// Flush the last open bucket (stamping its segment wall) and stamp the
708 /// whole-run window.
709 fn finalize(mut self) -> Self {
710 let now = monotonic_nanos();
711 let mut cur = std::mem::take(&mut self.cur);
712 cur.counters.elapsed_ns = now.saturating_sub(self.phase_start_ns);
713 self.phases.entry(self.cur_epoch).or_default().merge(&cur);
714 self.whole.elapsed_ns = now.saturating_sub(self.thread_start_ns);
715 self
716 }
717}
718
719/// Read the current phase epoch (`0` when phases are not tracked).
720fn read_epoch(phase_epoch: Option<&AtomicU32>) -> u32 {
721 phase_epoch.map(|e| e.load(Ordering::Relaxed)).unwrap_or(0)
722}
723
724// ---------------------------------------------------------------------------
725// Engine
726// ---------------------------------------------------------------------------
727
728/// Run the taobench engine until `stop` is set. `progress` counts completed ops
729/// (the live work-unit counter); `phase_epoch` buckets per-phase stats.
730pub(crate) fn run(
731 config: &TaobenchConfig,
732 stop: &AtomicBool,
733 progress: &AtomicU64,
734 phase_epoch: Option<&AtomicU32>,
735) -> TaobenchOutcome {
736 let allowed_cpus = resolve_allowed_cpus();
737 let n_clients = config.resolve_client_threads(allowed_cpus);
738 let n_slow = config.resolve_slow_threads(n_clients);
739 // Open-loop per-client inter-arrival = 1s / (arrival_rate / n_clients) =
740 // 1e9 * n_clients / arrival_rate. `0` arrival_rate ⇒ `0` interval ⇒ closed
741 // loop (issue as fast as completions allow — the legacy behavior). `.max(1)`
742 // keeps a non-zero rate strictly open-loop even at an absurd rate (so the
743 // `interval_ns != 0` mode gate in client_loop never misfires to closed loop).
744 // saturating_mul: an absurd client count (physically unreachable — it would
745 // OOM on thread spawn first) cannot wrap the numerator to a tiny interval, the
746 // same overflow-safe discipline as `TaobenchStats::merge`.
747 let interval_ns: u64 = if config.arrival_rate == 0 {
748 0
749 } else {
750 (1_000_000_000u64.saturating_mul(n_clients as u64) / config.arrival_rate as u64).max(1)
751 };
752 // Resolve the slow-path service-time model once (None = fixed legacy latency;
753 // Some = a per-fetch Pareto draw). Cheap, but kept out of the dispatcher hot
754 // path.
755 let service_pareto = config.resolve_service_pareto();
756
757 // Size the resident object count and the key range. Mean value size pins the
758 // object count for a byte budget; key_range = capacity / target_hit makes a
759 // uniform key stream hit the resident set at ≈ target_hit at equilibrium.
760 let mean_value = mean_value_size();
761 let capacity_bytes = config.cache_capacity_mib.max(1) * 1024 * 1024;
762 let total_objects = (capacity_bytes / mean_value).max(SHARDS);
763 let key_range = ((total_objects as f64) / config.target_hit_fraction()).ceil() as u64;
764
765 let cache = Cache::new(total_objects);
766 // Warm the cache to capacity so the hit ratio starts at the target rather
767 // than climbing from empty (the real warms before measuring). Keys
768 // [0, total_objects) are the initial resident set; the client key stream over
769 // [0, key_range) then hits them at ≈ target_hit and the eviction<->refill
770 // equilibrium holds it there.
771 {
772 let mut warm = Rng::new(0xC0FFEE);
773 for k in 0..total_objects as u64 {
774 cache.fill(k, sample_value_size(&mut warm));
775 }
776 }
777
778 let slow_q = SlowQueue::new();
779 let slots: Vec<Slot> = (0..n_clients).map(|_| Slot::new()).collect();
780 let disp_stop = AtomicBool::new(false);
781
782 let client_accums: Vec<ThreadAccum> = std::thread::scope(|s| {
783 // Dispatchers first so a client miss is served immediately.
784 let dispatchers: Vec<_> = (0..n_slow)
785 .map(|i| {
786 let cache = &cache;
787 let slow_q = &slow_q;
788 let slots = &slots;
789 let disp_stop = &disp_stop;
790 s.spawn(move || {
791 dispatcher_loop(i, config, service_pareto, cache, slow_q, slots, disp_stop)
792 })
793 })
794 .collect();
795
796 let clients: Vec<_> = (0..n_clients)
797 .map(|i| {
798 let cache = &cache;
799 let slow_q = &slow_q;
800 let slot = &slots[i];
801 s.spawn(move || {
802 client_loop(
803 i,
804 stop,
805 progress,
806 phase_epoch,
807 cache,
808 slow_q,
809 slot,
810 key_range,
811 interval_ns,
812 )
813 })
814 })
815 .collect();
816
817 // Join the clients first: once every client has returned (each only
818 // exits at the top of its loop, after its outstanding request — if any —
819 // has been served), NO further misses can be enqueued. Only then signal
820 // the dispatchers, so a dispatcher can never exit while a client is still
821 // blocked on an unserved miss.
822 let accums: Vec<ThreadAccum> = clients
823 .into_iter()
824 .map(|c| c.join().expect("taobench client panicked"))
825 .collect();
826 disp_stop.store(true, Ordering::Release);
827 slow_q.cv.notify_all();
828 for d in dispatchers {
829 d.join().expect("taobench dispatcher panicked");
830 }
831 accums
832 });
833
834 // Reduce: merge per-epoch and whole-run across clients. Each ThreadAccum
835 // stamped its per-phase segment walls + its whole-run window in finalize().
836 let mut all_phases: BTreeMap<u32, TaobenchPhaseStats> = BTreeMap::new();
837 let mut whole = TaobenchStats::default();
838 for accum in client_accums {
839 for (e, s) in accum.phases {
840 all_phases.entry(e).or_default().merge(&s);
841 }
842 whole.merge(&accum.whole);
843 }
844
845 TaobenchOutcome {
846 whole_run: whole,
847 phases: all_phases.into_iter().collect(),
848 }
849}
850
851/// One client: pick a key, look it up, serve a hit inline (touch + fast op) or
852/// hand a miss to a dispatcher and block until filled (slow op). `interval_ns ==
853/// 0` is CLOSED loop (issue the next request as soon as the prior completes — the
854/// legacy behavior). `interval_ns > 0` is OPEN loop: requests are due on a fixed
855/// `interval_ns` cadence independent of completion (the first one interval after
856/// the loop start `t0`), and serve latency is measured from the INTENDED arrival
857/// (coordinated-omission) so a backlog from slow service is folded into the late
858/// requests' latency instead of omitted.
859#[allow(clippy::too_many_arguments)]
860fn client_loop(
861 id: usize,
862 stop: &AtomicBool,
863 progress: &AtomicU64,
864 phase_epoch: Option<&AtomicU32>,
865 cache: &Cache,
866 slow_q: &SlowQueue,
867 slot: &Slot,
868 key_range: u64,
869 interval_ns: u64,
870) -> ThreadAccum {
871 let mut rng = Rng::new(0x5EED_0000 ^ (id as u64).wrapping_mul(0x9E37_79B1));
872 let mut accum = ThreadAccum::new(read_epoch(phase_epoch));
873 let open_loop = interval_ns != 0;
874 // Open-loop schedule cursor, advanced one `interval` before each request: the
875 // first request is due at `t0 + interval`, then every `interval` thereafter.
876 let mut intended = monotonic_nanos();
877
878 'client: while !stop.load(Ordering::Acquire) {
879 if open_loop {
880 // Pace to the next scheduled arrival; if behind (overload), the wait
881 // collapses to zero and the gap is captured as serve latency below.
882 // Each sleep is capped at STOP_POLL_QUANTUM_NS so a shutdown set during
883 // a long inter-arrival gap is observed within the quantum (not after a
884 // full interval) — exiting promptly via `break 'client` to `finalize`,
885 // bounding client-join teardown regardless of `arrival_rate`.
886 intended = intended.saturating_add(interval_ns);
887 loop {
888 if stop.load(Ordering::Acquire) {
889 break 'client;
890 }
891 let now = monotonic_nanos();
892 if now >= intended {
893 break;
894 }
895 let nap = (intended - now).min(STOP_POLL_QUANTUM_NS);
896 std::thread::sleep(std::time::Duration::from_nanos(nap));
897 }
898 }
899 let epoch_cmd = read_epoch(phase_epoch);
900 let key = rng.below(key_range);
901 let hit = cache.get_touch(key);
902 accum.record_cmd(epoch_cmd, hit);
903
904 if hit {
905 // Hit completes inline; response epoch == request epoch.
906 accum.record_complete(epoch_cmd, true);
907 if open_loop {
908 let lat = monotonic_nanos().saturating_sub(intended);
909 accum.record_serve_lat(epoch_cmd, lat);
910 }
911 } else {
912 // Miss: hand to a dispatcher and block until it fills + wakes us.
913 // The dispatcher always serves us before `disp_stop` is set (which
914 // happens only after every client has joined), so this never blocks
915 // past shutdown.
916 slow_q.push(SlowReq { key, client: id });
917 slot.wait_filled();
918 let resp_epoch = read_epoch(phase_epoch);
919 accum.record_complete(resp_epoch, false);
920 if open_loop {
921 // Completion timestamp taken by the CLIENT after wait_filled, so
922 // serve latency covers the full slow_q queueing + dispatcher
923 // service + wakeup, all measured from the intended arrival.
924 let lat = monotonic_nanos().saturating_sub(intended);
925 accum.record_serve_lat(resp_epoch, lat);
926 }
927 }
928 progress.fetch_add(1, Ordering::Relaxed);
929 }
930 accum.finalize()
931}
932
933/// One slow dispatcher: pull a miss, perform the simulated backing-store fetch,
934/// fill the cache, and wake the waiting client. Drains any queued misses before
935/// exiting on `disp_stop`. `service_pareto` is the resolved heavy-tail model
936/// (`None` = fixed `slow_path_sleep_us`); the per-fetch draw is deterministic from
937/// this dispatcher's seeded `rng` (the same one that picks the fill value size).
938fn dispatcher_loop(
939 id: usize,
940 config: &TaobenchConfig,
941 service_pareto: Option<(f64, f64)>,
942 cache: &Cache,
943 slow_q: &SlowQueue,
944 slots: &[Slot],
945 disp_stop: &AtomicBool,
946) {
947 let mut rng = Rng::new(0xD15A_0000 ^ (id as u64).wrapping_mul(0x9E37_79B1));
948 loop {
949 let req = {
950 let mut q = slow_q.q.lock().expect("slow queue poisoned");
951 loop {
952 if let Some(r) = q.pop_front() {
953 break Some(r);
954 }
955 if disp_stop.load(Ordering::Acquire) {
956 break None;
957 }
958 q = slow_q.cv.wait(q).expect("slow queue poisoned");
959 }
960 };
961 let Some(req) = req else { break };
962
963 // Heavy-tailed Pareto draw when configured, else the fixed latency. The
964 // draw precedes the value-size draw, so both come off this dispatcher's
965 // deterministic seeded stream.
966 let usec = match service_pareto {
967 Some((scale, inv_neg_alpha)) => sample_service_us(&mut rng, scale, inv_neg_alpha),
968 None => config.slow_path_sleep_us,
969 };
970 backing_store_fetch(usec);
971 cache.fill(req.key, sample_value_size(&mut rng));
972 slots[req.client].signal();
973 }
974}
975
976// ---------------------------------------------------------------------------
977// Topology + size helpers
978// ---------------------------------------------------------------------------
979
980/// The mean of the value-size distribution (for sizing the object count).
981fn mean_value_size() -> usize {
982 let total_w: u64 = VALUE_WEIGHTS.iter().map(|&w| w as u64).sum();
983 let weighted: u64 = VALUE_SIZES
984 .iter()
985 .zip(VALUE_WEIGHTS.iter())
986 .map(|(&s, &w)| s as u64 * w as u64)
987 .sum();
988 (weighted / total_w).max(1) as usize
989}
990
991/// Resolve the number of CPUs the worker is allowed to run on (its affinity
992/// mask), falling back to the online CPU count. Mirrors the schbench engine's
993/// cpuset-scoped resolution.
994fn resolve_allowed_cpus() -> usize {
995 // SAFETY: `sched_getaffinity` writes the calling thread's CPU mask into the
996 // provided cpu_set; reads nothing else.
997 unsafe {
998 let mut set: libc::cpu_set_t = core::mem::zeroed();
999 if libc::sched_getaffinity(0, core::mem::size_of::<libc::cpu_set_t>(), &mut set) == 0 {
1000 let n = libc::CPU_COUNT(&set);
1001 if n > 0 {
1002 return n as usize;
1003 }
1004 }
1005 }
1006 let n = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) };
1007 if n > 0 { n as usize } else { 1 }
1008}
1009
1010// ---------------------------------------------------------------------------
1011// Host-side standalone runner (validation driver entry)
1012// ---------------------------------------------------------------------------
1013
1014/// Host-side standalone run of the taobench engine for `run_secs`, returning a
1015/// summary report — the analog of schbench's `run_standalone`, backing the
1016/// `ktstr-taobench-validate` driver for the side-by-side comparison against the
1017/// reference taobench. NOT used in-VM (the scenario engine drives `run` there).
1018///
1019/// The engine warms the cache synchronously before the client threads start; the
1020/// report's `elapsed_secs` is the engine-measured CLIENT window (post-warmup), so
1021/// the qps figures are steady-state, not diluted by warmup.
1022pub fn run_standalone(config: &TaobenchConfig, run_secs: u64) -> TaobenchStandaloneReport {
1023 let stop = AtomicBool::new(false);
1024 let progress = AtomicU64::new(0);
1025 let allowed_cpus = resolve_allowed_cpus();
1026 let nr_client_threads = config.resolve_client_threads(allowed_cpus);
1027 let nr_slow_threads = config.resolve_slow_threads(nr_client_threads);
1028 let outcome = std::thread::scope(|s| {
1029 let h = s.spawn(|| run(config, &stop, &progress, None));
1030 // The benchmark run window — a workload-defined duration (the same model
1031 // as the scenario engine's hold step), not a synchronization wait.
1032 std::thread::sleep(std::time::Duration::from_secs(run_secs));
1033 stop.store(true, Ordering::Release);
1034 h.join().expect("taobench standalone run panicked")
1035 });
1036 // Whole-run serve-latency distribution = union of the per-phase histograms
1037 // (the whole-run carrier is counters-only). Empty in closed loop, so the
1038 // report's serve percentiles read absent there.
1039 let mut serve = PlatStats::default();
1040 for (_epoch, p) in &outcome.phases {
1041 serve.combine(&p.serve_lat);
1042 }
1043 TaobenchStandaloneReport::from_run(
1044 &outcome.whole_run,
1045 &serve,
1046 nr_client_threads,
1047 nr_slow_threads,
1048 )
1049}
1050
1051/// Summary of a [`run_standalone`] run — the headline taobench metrics in the
1052/// shape the reference taobench server reports (`fast_qps` / `hit_rate` /
1053/// `slow_qps`) plus the derived `total_qps` (= fast + slow) and `hit_ratio`
1054/// (= fast / total). Under open-loop arrival (`arrival_rate > 0`) it also carries
1055/// the coordinated-omission serve-latency percentiles; these are `None` in closed
1056/// loop (no intended-arrival schedule, so no serve latency is measured).
1057#[derive(Debug, Clone, Copy, PartialEq)]
1058pub struct TaobenchStandaloneReport {
1059 /// (fast + slow) ops per second over the measured window.
1060 pub total_qps: f64,
1061 /// Hits served per second.
1062 pub fast_qps: f64,
1063 /// Misses served (slow path) per second.
1064 pub slow_qps: f64,
1065 /// Response-time hit ratio: fast / (fast + slow).
1066 pub hit_ratio: f64,
1067 /// Command-time hit rate: 1 - get_misses / get_cmds.
1068 pub hit_rate: f64,
1069 /// Completed ops (fast + slow).
1070 pub total_ops: u64,
1071 /// Hits served.
1072 pub fast_ops: u64,
1073 /// Misses served via the slow path.
1074 pub slow_ops: u64,
1075 /// The engine-measured client window, seconds.
1076 pub elapsed_secs: f64,
1077 /// Resolved client thread count.
1078 pub nr_client_threads: usize,
1079 /// Resolved slow dispatcher count.
1080 pub nr_slow_threads: usize,
1081 /// Open-loop coordinated-omission serve-latency percentiles (µs), measured
1082 /// from intended arrival. `None` in closed loop / when no serve samples were
1083 /// recorded (matching the metric-key ABSENT discipline).
1084 pub serve_p50_us: Option<u32>,
1085 /// Open-loop serve-latency p99 (µs) — the headline coordinated-omission tail.
1086 pub serve_p99_us: Option<u32>,
1087 /// Open-loop serve-latency p99.9 (µs).
1088 pub serve_p999_us: Option<u32>,
1089 /// Open-loop serve-latency minimum sample (µs).
1090 pub serve_min_us: Option<u32>,
1091 /// Open-loop serve-latency maximum sample (µs).
1092 pub serve_max_us: Option<u32>,
1093}
1094
1095impl TaobenchStandaloneReport {
1096 fn from_run(
1097 w: &TaobenchStats,
1098 serve: &PlatStats,
1099 nr_client_threads: usize,
1100 nr_slow_threads: usize,
1101 ) -> Self {
1102 use crate::workload::schbench::plat::Pct;
1103 let secs = (w.elapsed_ns as f64 / 1e9).max(f64::MIN_POSITIVE);
1104 let total = w.total_ops();
1105 // Serve-latency percentiles are present iff open-loop arrival recorded
1106 // samples; closed loop leaves the histogram empty, so all fields read
1107 // None (the metric-key ABSENT discipline).
1108 let serve_q = (serve.sample_count() > 0).then(|| serve.percentiles());
1109 TaobenchStandaloneReport {
1110 total_qps: total as f64 / secs,
1111 fast_qps: w.fast_ops as f64 / secs,
1112 slow_qps: w.slow_ops as f64 / secs,
1113 hit_ratio: if total > 0 {
1114 w.fast_ops as f64 / total as f64
1115 } else {
1116 0.0
1117 },
1118 hit_rate: if w.get_cmds > 0 {
1119 1.0 - (w.get_misses as f64 / w.get_cmds as f64)
1120 } else {
1121 0.0
1122 },
1123 total_ops: total,
1124 fast_ops: w.fast_ops,
1125 slow_ops: w.slow_ops,
1126 elapsed_secs: secs,
1127 nr_client_threads,
1128 nr_slow_threads,
1129 serve_p50_us: serve_q.as_ref().map(|q| q.value_at(Pct::P50)),
1130 serve_p99_us: serve_q.as_ref().map(|q| q.value_at(Pct::P99)),
1131 serve_p999_us: serve_q.as_ref().map(|q| q.value_at(Pct::P999)),
1132 serve_min_us: serve_q.as_ref().map(|q| q.min),
1133 serve_max_us: serve_q.as_ref().map(|q| q.max),
1134 }
1135 }
1136}
1137
1138#[cfg(test)]
1139mod tests {
1140 use super::*;
1141
1142 #[test]
1143 fn value_size_distribution_mean_is_small_object_heavy() {
1144 // The representative distribution is small-object-heavy (mean well under
1145 // 1 KiB) with a tail to 64 KiB.
1146 let m = mean_value_size();
1147 assert!(
1148 (200..=500).contains(&m),
1149 "mean value size {m} B is small-object-heavy"
1150 );
1151 assert_eq!(*VALUE_SIZES.last().unwrap(), 65536, "tail reaches 64 KiB");
1152 }
1153
1154 #[test]
1155 fn taobench_stats_merge_and_total_ops_saturate_on_overflow() {
1156 // Pooling guest-runtime op counters must saturate, not wrap: a corrupt
1157 // u64::MAX component must never produce a silently-wrong (wrapped-small)
1158 // qps/hit Rate. Saturating is exact for all in-range data.
1159 let mut a = TaobenchStats {
1160 get_cmds: u64::MAX,
1161 get_misses: u64::MAX,
1162 fast_ops: u64::MAX,
1163 slow_ops: 1,
1164 elapsed_ns: 1000,
1165 };
1166 let b = TaobenchStats {
1167 get_cmds: 5,
1168 get_misses: 5,
1169 fast_ops: 5,
1170 slow_ops: 5,
1171 elapsed_ns: 9000,
1172 };
1173 a.merge(&b);
1174 assert_eq!(a.get_cmds, u64::MAX, "saturates, not wraps to 4");
1175 assert_eq!(a.get_misses, u64::MAX);
1176 assert_eq!(a.fast_ops, u64::MAX);
1177 assert_eq!(a.slow_ops, 6);
1178 assert_eq!(a.elapsed_ns, 9000, "wall window is MAX, not summed");
1179 // total_ops = fast_ops + slow_ops saturates (u64::MAX + 6 -> u64::MAX).
1180 assert_eq!(a.total_ops(), u64::MAX, "total_ops saturates");
1181 }
1182
1183 #[test]
1184 fn engine_serves_ops_and_hit_ratio_settles_near_target_not_one() {
1185 // A short run on a small cache: the hit ratio must settle near the target
1186 // (the eviction<->refill equilibrium), NOT drift to 1.0 (which a
1187 // self-healing cache with no eviction would do).
1188 let cfg = TaobenchConfig::default()
1189 .client_threads(4)
1190 .slow_threads(2)
1191 .cache_capacity_mib(8)
1192 .target_hit_pct(90)
1193 .slow_path_sleep_us(10);
1194 let stop = AtomicBool::new(false);
1195 let progress = AtomicU64::new(0);
1196
1197 std::thread::scope(|s| {
1198 let h = s.spawn(|| run(&cfg, &stop, &progress, None));
1199 // Spin until the engine has served enough ops to reach equilibrium.
1200 while progress.load(Ordering::Relaxed) < 100_000 {
1201 std::hint::spin_loop();
1202 }
1203 stop.store(true, Ordering::Release);
1204 let out = h.join().expect("engine panicked");
1205
1206 let total = out.whole_run.total_ops();
1207 assert!(total > 0, "engine served ops");
1208 assert!(out.whole_run.elapsed_ns > 0, "the run window was measured");
1209
1210 let hit = out.whole_run.fast_ops as f64 / total as f64;
1211 assert!(
1212 (0.80..=0.97).contains(&hit),
1213 "hit ratio {hit} settles near target 0.90 (eviction equilibrium), not 1.0"
1214 );
1215 assert!(
1216 out.whole_run.slow_ops > 0,
1217 "the slow/miss path is exercised"
1218 );
1219 });
1220 }
1221
1222 #[test]
1223 fn taobench_config_serde_roundtrips() {
1224 // The new serialized type roundtrips unchanged.
1225 // Every field set to a non-default value to exercise the full serde surface.
1226 let cfg = TaobenchConfig::default()
1227 .client_threads(8)
1228 .slow_threads(3)
1229 .cache_capacity_mib(128)
1230 .target_hit_pct(85)
1231 .slow_path_sleep_us(250)
1232 .slow_path_p99_us(2500)
1233 .arrival_rate(50_000);
1234 let json = serde_json::to_string(&cfg).expect("TaobenchConfig must serialize");
1235 let back: TaobenchConfig =
1236 serde_json::from_str(&json).expect("TaobenchConfig must deserialize");
1237 assert_eq!(cfg, back, "config roundtrips unchanged");
1238 }
1239
1240 #[test]
1241 fn worktype_taobench_registration_and_serde() {
1242 use crate::workload::WorkType;
1243 let wt = WorkType::taobench(
1244 TaobenchConfig::default()
1245 .client_threads(4)
1246 .target_hit_pct(95),
1247 );
1248 assert_eq!(wt.name(), "Taobench");
1249 // from_name yields the default-config variant.
1250 assert_eq!(
1251 WorkType::from_name("Taobench"),
1252 Some(WorkType::Taobench {
1253 config: TaobenchConfig::default()
1254 })
1255 );
1256 // The variant serde-roundtrips, carrying its config.
1257 let json = serde_json::to_string(&wt).expect("WorkType::Taobench must serialize");
1258 let back: WorkType =
1259 serde_json::from_str(&json).expect("WorkType::Taobench must deserialize");
1260 assert_eq!(wt, back);
1261 }
1262
1263 #[test]
1264 fn taobench_config_reachable_via_prelude() {
1265 // Regression-pin the prelude placement: test authors construct the config
1266 // via `use ktstr::prelude::*`. Dropping TaobenchConfig from the prelude
1267 // would fail this compile. Also exercises the Eq derive.
1268 let cfg: crate::prelude::TaobenchConfig = crate::prelude::TaobenchConfig::default();
1269 assert_eq!(cfg, TaobenchConfig::default());
1270 }
1271
1272 #[test]
1273 fn taobench_stats_serde_roundtrips() {
1274 // TaobenchStats is a pub serialized type that rides real wires — a field
1275 // on WorkerReport (the worker→host postcard payload) and CgroupStats (the
1276 // sidecar JSON) — so a field-rename or serde-attr drift is a silent
1277 // data-loss risk this pins. Every field distinct +
1278 // non-zero to exercise the full serde surface.
1279 let s = TaobenchStats {
1280 get_cmds: 1000,
1281 get_misses: 150,
1282 fast_ops: 850,
1283 slow_ops: 150,
1284 elapsed_ns: 9_000_000_000,
1285 };
1286 let json = serde_json::to_string(&s).expect("TaobenchStats must serialize");
1287 let back: TaobenchStats =
1288 serde_json::from_str(&json).expect("TaobenchStats must deserialize");
1289 assert_eq!(s, back, "TaobenchStats roundtrips unchanged");
1290 }
1291
1292 #[test]
1293 fn taobench_stats_reachable_via_prelude() {
1294 // Regression-pin the prelude placement: TaobenchStats is read off the
1295 // preluded WorkerReport/CgroupStats `taobench_whole` fields, so it must be
1296 // nameable via `use ktstr::prelude::*`. Dropping it from the prelude would
1297 // fail this compile. Also exercises Default + Eq.
1298 let s: crate::prelude::TaobenchStats = crate::prelude::TaobenchStats::default();
1299 assert_eq!(s, TaobenchStats::default());
1300 }
1301
1302 #[test]
1303 fn taobench_config_arrival_rate_default_and_setter() {
1304 // Default is closed loop (no open-loop schedule); the setter sets the
1305 // aggregate ops/sec field.
1306 assert_eq!(
1307 TaobenchConfig::default().arrival_rate,
1308 0,
1309 "default arrival_rate is 0 (closed loop)"
1310 );
1311 let cfg = TaobenchConfig::default().arrival_rate(100_000);
1312 assert_eq!(cfg.arrival_rate, 100_000, "setter sets arrival_rate");
1313 }
1314
1315 #[test]
1316 fn taobench_phase_stats_merge_sums_counters_and_unions_serve_lat() {
1317 // The carrier merge pools counters (Σ ops, MAX wall — TaobenchStats::merge)
1318 // AND unions the serve histograms (PlatStats::combine) — the per-cgroup /
1319 // cross-thread fold.
1320 let mut a = TaobenchPhaseStats {
1321 counters: TaobenchStats {
1322 get_cmds: 10,
1323 get_misses: 2,
1324 fast_ops: 8,
1325 slow_ops: 2,
1326 elapsed_ns: 1000,
1327 },
1328 ..Default::default()
1329 };
1330 a.serve_lat.add_lat(5);
1331 a.serve_lat.add_lat(50);
1332 let mut b = TaobenchPhaseStats {
1333 counters: TaobenchStats {
1334 get_cmds: 6,
1335 get_misses: 1,
1336 fast_ops: 5,
1337 slow_ops: 1,
1338 elapsed_ns: 4000,
1339 },
1340 ..Default::default()
1341 };
1342 b.serve_lat.add_lat(500);
1343 a.merge(&b);
1344 assert_eq!(a.counters.get_cmds, 16, "Σ get_cmds");
1345 assert_eq!(a.counters.fast_ops, 13, "Σ fast_ops");
1346 assert_eq!(a.counters.slow_ops, 3, "Σ slow_ops");
1347 assert_eq!(a.counters.elapsed_ns, 4000, "wall is MAX, not summed");
1348 assert_eq!(
1349 a.serve_lat.sample_count(),
1350 3,
1351 "serve histograms union (2 + 1 samples)"
1352 );
1353 }
1354
1355 #[test]
1356 fn taobench_phase_stats_serde_roundtrips() {
1357 // TaobenchPhaseStats rides the WorkerReport postcard + CgroupStats sidecar
1358 // JSON wires; both the counters and the serve histogram must roundtrip.
1359 let mut s = TaobenchPhaseStats {
1360 counters: TaobenchStats {
1361 get_cmds: 1000,
1362 get_misses: 150,
1363 fast_ops: 850,
1364 slow_ops: 150,
1365 elapsed_ns: 9_000_000_000,
1366 },
1367 ..Default::default()
1368 };
1369 s.serve_lat.add_lat(12);
1370 s.serve_lat.add_lat(340);
1371 s.serve_lat.add_lat(99_999);
1372 let json = serde_json::to_string(&s).expect("TaobenchPhaseStats must serialize");
1373 let back: TaobenchPhaseStats =
1374 serde_json::from_str(&json).expect("TaobenchPhaseStats must deserialize");
1375 assert_eq!(
1376 s, back,
1377 "TaobenchPhaseStats roundtrips unchanged (counters + serve_lat)"
1378 );
1379 }
1380
1381 #[test]
1382 fn open_loop_stamps_serve_latency_on_every_completion() {
1383 // Open loop (non-zero arrival_rate) measures coordinated-omission serve
1384 // latency: every completion — a hit (served inline) AND a miss (served via
1385 // the slow path) — records exactly one sample from its intended arrival, so
1386 // the histogram count equals total_ops. An absurdly high rate keeps the
1387 // pacing in permanent overload (the per-arrival sleep collapses to zero), so
1388 // the test runs at full speed while still exercising stamping on both arms.
1389 let cfg = TaobenchConfig::default()
1390 .client_threads(4)
1391 .slow_threads(2)
1392 .cache_capacity_mib(8)
1393 .target_hit_pct(90)
1394 .slow_path_sleep_us(10)
1395 .arrival_rate(1_000_000_000);
1396 let stop = AtomicBool::new(false);
1397 let progress = AtomicU64::new(0);
1398 let out = std::thread::scope(|s| {
1399 let h = s.spawn(|| run(&cfg, &stop, &progress, None));
1400 while progress.load(Ordering::Relaxed) < 50_000 {
1401 std::hint::spin_loop();
1402 }
1403 stop.store(true, Ordering::Release);
1404 h.join().expect("engine panicked")
1405 });
1406 let total = out.whole_run.total_ops();
1407 assert!(total > 0, "engine served ops");
1408 assert!(out.whole_run.slow_ops > 0, "the miss arm is exercised");
1409 // The whole-run serve distribution is the union of the per-phase
1410 // histograms (the whole-run carrier is counters-only). Its count equals
1411 // total_ops: every completion (hit inline + miss after wait_filled)
1412 // records exactly one serve-latency sample.
1413 let mut serve = PlatStats::default();
1414 for (_epoch, p) in &out.phases {
1415 serve.combine(&p.serve_lat);
1416 }
1417 assert_eq!(
1418 serve.sample_count(),
1419 total,
1420 "every completion (hit + miss) records one serve-latency sample"
1421 );
1422 }
1423
1424 #[test]
1425 fn closed_loop_records_no_serve_latency() {
1426 // arrival_rate == 0 is closed loop: there is no intended-arrival schedule,
1427 // so serve latency is undefined and the histogram stays EMPTY (the
1428 // taobench_serve_*_us keys then read absent, distinct from a measured 0).
1429 // The heavy tail is ALSO enabled (slow_path_p99_us > slow_path_sleep_us) to
1430 // pin that enabling the tail never leaks a serve sample without an open-loop
1431 // schedule — the tail only affects fetch latency, not whether serve latency
1432 // is measured.
1433 let cfg = TaobenchConfig::default()
1434 .client_threads(4)
1435 .slow_threads(2)
1436 .cache_capacity_mib(8)
1437 .target_hit_pct(90)
1438 .slow_path_sleep_us(10)
1439 .slow_path_p99_us(1000); // tail enabled; arrival_rate defaults to 0 (closed loop)
1440 let stop = AtomicBool::new(false);
1441 let progress = AtomicU64::new(0);
1442 let out = std::thread::scope(|s| {
1443 let h = s.spawn(|| run(&cfg, &stop, &progress, None));
1444 while progress.load(Ordering::Relaxed) < 20_000 {
1445 std::hint::spin_loop();
1446 }
1447 stop.store(true, Ordering::Release);
1448 h.join().expect("engine panicked")
1449 });
1450 assert!(out.whole_run.total_ops() > 0, "ops completed");
1451 for (epoch, p) in &out.phases {
1452 assert_eq!(
1453 p.serve_lat.sample_count(),
1454 0,
1455 "closed loop records no per-phase serve latency (epoch {epoch})"
1456 );
1457 }
1458 }
1459
1460 #[test]
1461 fn open_loop_per_phase_carrier_holds_serve_latency() {
1462 // The per-phase carrier (PhaseSlice.taobench) carries the serve histogram
1463 // (the whole-run carrier is counters-only). With a single static epoch
1464 // every completion's sample lands in that epoch's bucket, so the per-phase
1465 // serve count equals the whole-run completion count (total_ops).
1466 let cfg = TaobenchConfig::default()
1467 .client_threads(4)
1468 .slow_threads(2)
1469 .cache_capacity_mib(8)
1470 .target_hit_pct(90)
1471 .slow_path_sleep_us(10)
1472 .arrival_rate(1_000_000_000);
1473 let stop = AtomicBool::new(false);
1474 let progress = AtomicU64::new(0);
1475 let epoch = AtomicU32::new(7);
1476 let out = std::thread::scope(|s| {
1477 let h = s.spawn(|| run(&cfg, &stop, &progress, Some(&epoch)));
1478 while progress.load(Ordering::Relaxed) < 30_000 {
1479 std::hint::spin_loop();
1480 }
1481 stop.store(true, Ordering::Release);
1482 h.join().expect("engine panicked")
1483 });
1484 let phase7 = out
1485 .phases
1486 .iter()
1487 .find(|(e, _)| *e == 7)
1488 .map(|(_, p)| p)
1489 .expect("epoch-7 phase carrier present");
1490 assert!(
1491 phase7.serve_lat.sample_count() > 0,
1492 "per-phase serve latency recorded"
1493 );
1494 assert_eq!(
1495 phase7.serve_lat.sample_count(),
1496 out.whole_run.total_ops(),
1497 "single-epoch run: every completion stamps one per-phase serve sample"
1498 );
1499 }
1500
#[test]
fn open_loop_pacing_sleep_is_bounded_for_prompt_shutdown() {
    // With arrival_rate = 1 and a single client, the next intended arrival is a
    // full second away. The client therefore spends nearly all of its time
    // parked in the open-loop pacing wait. That wait must notice `stop` within
    // STOP_POLL_QUANTUM_NS (50 ms), so the join completes far sooner than one
    // interval; an unbounded wait would hold the join for the whole ~1s.
    // The 500 ms ceiling is 10x the quantum, which tolerates scheduler jitter.
    // It is also half the interval, which cleanly separates bounded from
    // unbounded.
    use std::time::{Duration, Instant};

    let config = TaobenchConfig::default()
        .client_threads(1)
        .slow_threads(1)
        .cache_capacity_mib(4)
        .target_hit_pct(90)
        .slow_path_sleep_us(0)
        .arrival_rate(1); // per-client interval = 1e9 ns * client_threads / arrival_rate = 1s
    let stop_flag = AtomicBool::new(false);
    let ops_done = AtomicU64::new(0);

    let began = Instant::now();
    std::thread::scope(|scope| {
        let engine = scope.spawn(|| run(&config, &stop_flag, &ops_done, None));
        // Give the client time to enter its first (long) pacing wait, then stop.
        std::thread::sleep(Duration::from_millis(20));
        stop_flag.store(true, Ordering::Release);
        let _ = engine.join().expect("engine panicked");
    });
    let elapsed = began.elapsed();

    let ceiling = Duration::from_millis(500);
    assert!(
        elapsed < ceiling,
        "open-loop client joined in {elapsed:?}; the pacing sleep must be \
         bounded (interval is ~1s — an unbounded sleep would block the join)"
    );
}
1534
#[test]
fn taobench_config_slow_path_p99_default_and_setter() {
    // A default config carries no slow-path tail: slow_path_p99_us is 0, which
    // selects the legacy fixed-latency path. The builder setter installs a
    // p99 tail target.
    let default_p99 = TaobenchConfig::default().slow_path_p99_us;
    assert_eq!(
        default_p99, 0,
        "default slow_path_p99_us is 0 (fixed latency)"
    );

    let configured_p99 = TaobenchConfig::default()
        .slow_path_p99_us(5000)
        .slow_path_p99_us;
    assert_eq!(configured_p99, 5000, "setter sets slow_path_p99_us");
}
1547
#[test]
fn taobench_service_pareto_mapping_and_legacy_gate() {
    // A (p50, p99) pair resolves to a Pareto whose analytic quantiles give the
    // pair back:
    //   alpha = ln(50) / ln(p99 / p50)
    //   scale = p50 * 0.5^(1 / alpha)
    let (scale, inv_neg_alpha) = TaobenchConfig::default()
        .slow_path_sleep_us(100)
        .slow_path_p99_us(1000)
        .resolve_service_pareto()
        .expect("tail enabled");
    let alpha = -1.0 / inv_neg_alpha;
    let expected_alpha = 50.0_f64.ln() / 10.0_f64.ln();
    assert!(
        (alpha - expected_alpha).abs() < 1e-9,
        "alpha = ln(50)/ln(10), got {alpha}",
    );

    // The Pareto quantile at upper-tail mass `tail` (= 1 - q) is
    // scale * tail^(-1/alpha). The median (tail 0.5) must recover p50, and the
    // 99th percentile (tail 0.01) must recover p99.
    let quantile_at_tail = |tail: f64| scale * tail.powf(-1.0 / alpha);
    let median = quantile_at_tail(0.5);
    let p99 = quantile_at_tail(0.01);
    assert!(
        (median - 100.0).abs() < 1e-6,
        "median quantile = p50 (100), got {median}",
    );
    assert!(
        (p99 - 1000.0).abs() < 1e-6,
        "p99 quantile = configured (1000), got {p99}",
    );

    // Legacy gate: a p99 of zero, below p50, or equal to p50 keeps the fixed
    // path (None). So does a zero p50, whatever the p99.
    let base = TaobenchConfig::default().slow_path_sleep_us(100);
    for (p99_us, why) in [
        (0, "p99 == 0 is fixed"),
        (50, "p99 < p50 is fixed"),
        (100, "p99 == p50 is fixed"),
    ] {
        assert!(
            base.clone()
                .slow_path_p99_us(p99_us)
                .resolve_service_pareto()
                .is_none(),
            "{why}",
        );
    }

    let zero_median = TaobenchConfig::default()
        .slow_path_sleep_us(0)
        .slow_path_p99_us(1000);
    assert!(
        zero_median.resolve_service_pareto().is_none(),
        "p50 == 0 has no Pareto scale",
    );
}
1606
#[test]
fn taobench_service_sampler_matches_configured_quantiles() {
    // Feed a large fixed-seed sample of the resolved Pareto through PlatStats,
    // the same histogram used on the measurement path. The recovered p50/p99
    // must land near the configured (100, 1000) µs. The bands only need to
    // absorb PlatStats log-bucketing plus sampling variance.
    use crate::workload::schbench::plat::Pct;

    let (scale, inv_neg_alpha) = TaobenchConfig::default()
        .slow_path_sleep_us(100)
        .slow_path_p99_us(1000)
        .resolve_service_pareto()
        .unwrap();
    let mut rng = Rng::new(0x5A3C_1234);
    let mut plat = PlatStats::default();
    let u32_ceiling = u64::from(u32::MAX);
    (0..200_000).for_each(|_| {
        let drawn_us = sample_service_us(&mut rng, scale, inv_neg_alpha);
        // PlatStats records u32 latencies: saturate instead of truncating.
        plat.add_lat(drawn_us.min(u32_ceiling) as u32);
    });

    let pcts = plat.percentiles();
    let (p50, p99) = (pcts.value_at(Pct::P50), pcts.value_at(Pct::P99));
    assert!(
        (90..=115).contains(&p50),
        "empirical p50 {p50} ≈ configured 100µs (tight band: rejects a sampler \
         bit-shift / lost-scale bug, absorbs only log-bucketing + seed noise)",
    );
    assert!(
        (850..=1200).contains(&p99),
        "empirical p99 {p99} ≈ configured 1000µs (tight band: rejects a ~±50% \
         sampler error, absorbs only log-bucketing + seed noise)",
    );
}
1638
#[test]
fn taobench_service_sampler_clamps_pathological_tail() {
    // The draw uses scale = 1e12 µs with inv_neg_alpha = -0.5 (alpha = 2). The
    // Pareto floor alone is already far above the cap, so whatever U is drawn,
    // the sampler must return MAX_SERVICE_US. It must not park a dispatcher
    // for an unbounded time.
    let clamped = sample_service_us(&mut Rng::new(7), 1e12, -0.5);
    assert_eq!(clamped, MAX_SERVICE_US, "huge draw clamps to MAX_SERVICE_US");
}
1647
#[test]
fn taobench_service_sampler_is_deterministic_from_seed() {
    // Two generators seeded identically must produce the same sequence of
    // service-time draws. Reproducible engine runs rely on this.
    let (scale, inv_neg_alpha) = TaobenchConfig::default()
        .slow_path_sleep_us(100)
        .slow_path_p99_us(1000)
        .resolve_service_pareto()
        .unwrap();
    let draw_sequence = |seed| {
        let mut rng = Rng::new(seed);
        (0..1000)
            .map(|_| sample_service_us(&mut rng, scale, inv_neg_alpha))
            .collect::<Vec<_>>()
    };
    assert_eq!(draw_sequence(42), draw_sequence(42));
}
1665
#[test]
fn rng_f64_open01_is_in_open_closed_unit_interval() {
    // Contract of f64_open01: every U lies in (0, 1].
    // - 0 is excluded: the Pareto inverse-CDF raises U to a negative power,
    //   which diverges at 0.
    // - 1 is allowed: it maps to the distribution floor.
    // Both boundaries are pinned over a million fixed-seed draws.
    let mut rng = Rng::new(0xF00D_BEEF);
    for u in std::iter::repeat_with(|| rng.f64_open01()).take(1_000_000) {
        assert!(
            0.0 < u && u <= 1.0,
            "f64_open01 yielded {u}, outside (0, 1]"
        );
    }
}
1680
// The heavy-tail slow-path service model reproduces the configured quantiles
// plus a heavy tail that the fixed path lacks. The model is the Pareto that
// `resolve_service_pareto` derives from two knobs, the `slow_path_sleep_us`
// median and the `slow_path_p99_us` 99th percentile, and it is sampled by
// `sample_service_us`.
//
// The test is deterministic: a fixed-seed PRNG drives the pure sampler, with
// no threads and no wall-clock timing. It therefore pins the heavy-tail LOGIC
// without the host-scheduling jitter that a real-timing run carries. A prior
// version compared wall-clock serve MAX across host threads and flaked on
// loaded CI runners, where the control's jitter spike collapsed the ratio.
// The end-to-end engine path, where the slow path draws from this sampler
// under a live open-loop arrival process, is covered by the coordinated in-VM
// `taobench_open_loop_runs_in_vm` e2e (pinned vCPUs, no host jitter).
#[test]
fn heavy_tail_service_model_reproduces_configured_quantiles() {
    // A p99 above the median resolves to a Pareto. A p99 at or below the
    // median, or a zero median, stays on the fixed-latency path. Every one of
    // those fallback cases is listed below, so the checked gate matches this
    // claim (the below-median case was previously missing).
    let heavy = TaobenchConfig::default()
        .slow_path_sleep_us(100)
        .slow_path_p99_us(2000);
    let (scale, inv_neg_alpha) = heavy
        .resolve_service_pareto()
        .expect("p99 2000µs above median 100µs resolves to a Pareto tail");
    for (p50, p99, why) in [
        (100u64, 0u64, "p99 == 0 keeps the fixed path"),
        (100, 50, "p99 below median is not a tail"),
        (100, 100, "p99 == median is not a tail"),
        (0, 2000, "zero median has no Pareto scale"),
    ] {
        assert!(
            TaobenchConfig::default()
                .slow_path_sleep_us(p50)
                .slow_path_p99_us(p99)
                .resolve_service_pareto()
                .is_none(),
            "{why}",
        );
    }

    // Fixed-seed draws reproduce the configured quantiles plus a heavy tail.
    let mut rng = Rng::new(0x00C0_FFEE);
    let mut samples: Vec<u64> = (0..200_000)
        .map(|_| sample_service_us(&mut rng, scale, inv_neg_alpha))
        .collect();
    samples.sort_unstable();
    // Nearest-rank lookup on the sorted draws, clamped to the last index so
    // p = 1.0 cannot index out of bounds.
    let q = |p: f64| samples[((samples.len() as f64 * p) as usize).min(samples.len() - 1)];
    let (p50, p99, max) = (q(0.50), q(0.99), *samples.last().unwrap());
    // The median should be ~100µs and p99 ~2000µs (the configured knobs). The
    // bands are generous enough that only a broken Pareto mapping would miss.
    assert!(
        (50..=200).contains(&p50),
        "empirical median {p50}µs should track the configured 100µs",
    );
    assert!(
        (1000..=4000).contains(&p99),
        "empirical p99 {p99}µs should track the configured 2000µs",
    );
    assert!(
        max > p99.saturating_mul(2),
        "max {max}µs must dwarf p99 {p99}µs — the Pareto tail the fixed path lacks",
    );
}
1742}