ktstr/workload/taobench/
run.rs

1//! The taobench engine: a bounded, sharded, evicting key-value cache served by a
2//! client population (closed-loop by default, or open-loop fixed-rate arrival),
3//! with a fast in-cache path (hit) and a slow backing-store-miss path (a
4//! worker-defined sleep on a dispatcher thread). It is
5//! entirely in-process — no sockets, no TLS, no subprocess — and re-expresses the
6//! taobench ACCESS PATTERN in ktstr primitives so its qps / hit-ratio flow
7//! through the metric API. See `validation.md` for the per-aspect real-vs-port
8//! fidelity comparison.
9//!
10//! ## Model
11//!
12//! - A `Cache` of `SHARDS` independently-locked shards, each a FIFO-evicting
13//!   map bounded to a per-shard object cap. Total capacity is `cache_capacity_mib`
14//!   worth of objects; the key range is sized `capacity / target_hit` so a
15//!   uniform-random key stream hits the resident set with probability
16//!   ≈ `target_hit` at equilibrium. Eviction is the load-bearing mechanism: with
17//!   no eviction a self-healing cache drifts to a 1.0 hit ratio; a bounded cache
18//!   whose key range exceeds its capacity holds a steady-state miss stream.
19//! - `client_threads` CLIENT threads pick a key, look it up, and on a HIT touch
20//!   the stored value bytes (the cache read-bandwidth cost) and count a fast op;
21//!   on a MISS hand the key to a slow dispatcher and block until it is filled,
22//!   then count a slow op.
23//! - Arrival: `arrival_rate == 0` (default) is CLOSED loop — each client issues
24//!   its next request as soon as the prior completes. `arrival_rate > 0` is OPEN
25//!   loop — each client has a fixed intended-arrival SCHEDULE
26//!   (`arrival_rate / client_threads` per client) independent of completion, and
27//!   serve latency is measured from that intended time. A client still holds at
28//!   most one outstanding request (it blocks on a miss before issuing the next),
29//!   so a slow completion delays the next issue; that backlog is folded into the
30//!   late requests' serve latency (coordinated-omission correction) instead of
31//!   being omitted. The serve-latency histogram is per-phase data (empty in
32//!   closed loop).
33//! - `slow_threads` DISPATCHER threads serve misses: sleep the simulated
34//!   backing-store fetch (a worker-defined cost, the same model schbench's
35//!   think-sleep uses, not a synchronization wait), insert a freshly sized+touched
36//!   value, and wake the waiting client. The fetch latency is a fixed
37//!   `slow_path_sleep_us` by default, or — when `slow_path_p99_us` is set above it
38//!   — a per-fetch heavy-tailed Pareto draw with median `slow_path_sleep_us` and
39//!   99th percentile `slow_path_p99_us` (most fetches near the median, a rare slow
40//!   tail), so the open-loop serve-latency tail reflects realistic backing-store
41//!   variance.
42//!
43//! ## Counters (request-time vs response-time)
44//!
45//! `get_cmds` / `get_misses` are counted at LOOKUP (request) time; `fast_ops` /
46//! `slow_ops` at COMPLETION (response) time. In a closed loop they are equal once
47//! every in-flight request drains, but a request that straddles a phase boundary
48//! lands its lookup in one phase and its completion in the next, so a per-phase
49//! command-time hit_rate (`1 - get_misses/get_cmds`) differs slightly from the
50//! response-time hit_ratio (`fast_ops/(fast_ops+slow_ops)`) — the same skew the
51//! real reports between its interval hit_rate and its final hit_ratio.
52
53use crate::workload::schbench::plat::PlatStats;
54use core::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
55use std::collections::HashMap;
56use std::collections::{BTreeMap, VecDeque};
57use std::sync::{Condvar, Mutex};
58
/// How many independently-locked shards the cache is split into. The total
/// cache capacity is divided evenly across them, so a larger count lowers
/// per-shard lock contention at the price of a bigger lock array.
///
/// Must stay a power of two: shard selection masks the key's low bits with
/// `SHARDS - 1`.
const SHARDS: usize = 1 << 8;
63
/// Representative value-size distribution, stored as two parallel tables: entry
/// `i` of [`VALUE_SIZES`] (bytes) is drawn with relative weight
/// `VALUE_WEIGHTS[i]`. The shape is small-object-heavy with a long tail,
/// approximating the empirical object-size profile of the real workload
/// (weighted mean ≈ 332 B, tail to 64 KiB). This is ktstr's own approximation,
/// not a copy of any external size table.
const VALUE_SIZES: [usize; 8] = [
    1 << 6,
    1 << 7,
    1 << 8,
    1 << 9,
    1 << 10,
    1 << 12,
    1 << 14,
    1 << 16,
];
/// Relative weights paired index-for-index with [`VALUE_SIZES`].
const VALUE_WEIGHTS: [u32; 8] = [450, 300, 130, 70, 30, 12, 3, 1];
70
/// Longest single open-loop pacing sleep, in nanoseconds (50 ms). A client
/// waiting for its next scheduled arrival re-checks `stop` at least this often,
/// so a shutdown is observed within this bound instead of after a whole
/// inter-arrival interval. That interval is `1e9 * n_clients / arrival_rate` ns,
/// which at a degenerate low `arrival_rate` can exceed the scenario cleanup
/// budget and risk a watchdog kill. At realistic open-loop rates the interval is
/// well under this value, so the pacing sleep is one uncapped wait; the cap only
/// engages at pathological low rates.
const STOP_POLL_QUANTUM_NS: u64 = 50 * 1_000_000;
80
81/// Read `CLOCK_MONOTONIC` as nanoseconds (monotonic, not wall-clock), matching
82/// the schbench engine's clock source.
83fn monotonic_nanos() -> u64 {
84    // SAFETY: `clock_gettime` writes a `timespec` through the out-pointer and
85    // reads nothing else; CLOCK_MONOTONIC is always available on Linux.
86    let mut ts: libc::timespec = unsafe { core::mem::zeroed() };
87    let rc = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
88    assert_eq!(rc, 0, "clock_gettime(CLOCK_MONOTONIC) failed");
89    (ts.tv_sec as u64) * 1_000_000_000 + ts.tv_nsec as u64
90}
91
/// Simulate one backing-store fetch on the slow path by sleeping `usec`
/// microseconds. This is worker-defined cost (like schbench's think-sleep), not
/// a synchronization wait: `std::thread::sleep` maps to `clock_nanosleep` on
/// Linux.
///
/// `usec == 0` returns immediately without sleeping; the dispatcher still does
/// the fill + wakeup, so the slow path remains a distinct thread hop. The caller
/// decides the duration — either the fixed `slow_path_sleep_us` or, with the
/// heavy tail enabled, a per-fetch [`sample_service_us`] Pareto draw.
fn backing_store_fetch(usec: u64) {
    if usec == 0 {
        return;
    }
    let fetch_time = std::time::Duration::from_micros(usec);
    std::thread::sleep(fetch_time);
}
103
/// Ceiling on one heavy-tailed slow-path fetch, in microseconds (2 s — far above
/// any realistic backing-store p99.9).
///
/// Pareto is unbounded, and a dispatcher must finish fetch + fill + wake for the
/// blocked client (it cannot be interrupted on shutdown without breaking that
/// contract), so a pathological tail draw must not park it off-CPU indefinitely.
///
/// Teardown cost is NOT bounded by a single fetch: at shutdown up to `n_clients`
/// misses are outstanding and each dispatcher drains its queue share SERIALLY,
/// so the last client can wait up to `ceil(n_clients / n_slow)` fetches — about
/// `3 · MAX_SERVICE_US` with the default staffing (`n_slow ≈ n_clients/3`). Size
/// the scenario watchdog / cleanup budget accordingly when configuring a deep
/// tail with many clients per dispatcher.
///
/// The cap applies to per-FETCH off-CPU time only, not to a whole serve-latency
/// sample (fetch + open-loop queueing / coordinated-omission backlog, which can
/// exceed it); that sample is kept inside the `PlatStats` u32 range by
/// `record_serve_lat`'s own `(ns/1000).min(u32::MAX)` clamp, independent of this
/// cap. The fixed legacy path is the one uncapped case (the user sets that
/// latency directly).
const MAX_SERVICE_US: u64 = 2 * 1_000_000;
121
122/// Draw one heavy-tailed slow-path service time (µs) from the Pareto resolved by
123/// [`TaobenchConfig::resolve_service_pareto`]: `scale · U^(−1/α)` with `U ∈ (0, 1]`
124/// ([`Rng::f64_open01`]). `U = 1` yields the scale (the floor, below the median);
125/// smaller `U` yields the tail. Clamped to [`MAX_SERVICE_US`]. `inv_neg_alpha`
126/// is the precomputed `−1/α` exponent.
127fn sample_service_us(rng: &mut Rng, scale: f64, inv_neg_alpha: f64) -> u64 {
128    let us = scale * rng.f64_open01().powf(inv_neg_alpha);
129    (us as u64).min(MAX_SERVICE_US)
130}
131
/// Read every byte of a served/filled value and fold it into a wrapping
/// checksum, so the read cannot be elided. This is the cache memory-bandwidth
/// cost that makes the engine a cache workload rather than a control-flow
/// micro-benchmark (schbench's `black_box`-guarded memset is the precedent).
///
/// Returns the checksum, passed through `black_box`.
fn touch(bytes: &[u8]) -> u64 {
    let checksum = bytes
        .iter()
        .fold(0u64, |sum, &byte| sum.wrapping_add(u64::from(byte)));
    std::hint::black_box(checksum)
}
143
/// A small, fast, per-thread xorshift64 PRNG (workload key/size sampling only —
/// not cryptographic). Seeded distinctly per thread so threads do not march in
/// lockstep.
///
/// The single `u64` is the generator state. It is never zero: zero is the
/// xorshift absorbing state (every later output would also be zero).
struct Rng(u64);

impl Rng {
    /// Constant XORed into every seed so that small seeds (0, 1, 2, …) start
    /// from well-mixed states.
    const SEED_MIX: u64 = 0x9E37_79B9_7F4A_7C15;

    /// Build a generator from `seed`.
    ///
    /// The seed is XORed with [`Self::SEED_MIX`]. That moves the all-zero state
    /// away from `seed == 0`, but on its own it would just relocate the problem
    /// to `seed == SEED_MIX`; that one seed is remapped to a non-zero state so
    /// no seed can produce a stuck generator. Every other seed yields exactly
    /// the state `seed ^ SEED_MIX`.
    fn new(seed: u64) -> Self {
        let state = seed ^ Self::SEED_MIX;
        Rng(if state == 0 { Self::SEED_MIX } else { state })
    }

    /// Advance the state one xorshift64 step (shifts 13 / 7 / 17) and return
    /// the new state as the output.
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }

    /// Uniform in `[0, n)` (`n > 0`); a modulo reduction — the tiny bias is
    /// irrelevant for a workload key stream. Panics on `n == 0` (remainder by
    /// zero).
    fn below(&mut self, n: u64) -> u64 {
        self.next_u64() % n
    }

    /// Uniform `f64` in `(0, 1]` (open at 0, closed at 1) from the top 53 bits —
    /// the `OpenClosed01` form the Pareto inverse-CDF needs ([`sample_service_us`]):
    /// `U = 0` would make `U^(−1/α)` infinite, so 0 is excluded. Workload sampling
    /// only, not cryptographic.
    fn f64_open01(&mut self) -> f64 {
        // Top 53 bits + 1 -> integer in 1..=2^53; / 2^53 -> (0, 1].
        let bits = (self.next_u64() >> 11) + 1;
        bits as f64 * (1.0 / ((1u64 << 53) as f64))
    }
}
177
178/// Sample a value size from [`VALUE_SIZES`] weighted by [`VALUE_WEIGHTS`].
179fn sample_value_size(rng: &mut Rng) -> usize {
180    let total: u32 = VALUE_WEIGHTS.iter().sum();
181    let mut pick = (rng.below(total as u64)) as u32;
182    for (i, &w) in VALUE_WEIGHTS.iter().enumerate() {
183        if pick < w {
184            return VALUE_SIZES[i];
185        }
186        pick -= w;
187    }
188    VALUE_SIZES[VALUE_SIZES.len() - 1]
189}
190
/// Allocate a new value of exactly `size` bytes, every byte set to `0xAB`.
/// Writing the fill pattern touches each byte — the write-side bandwidth cost
/// of a cache fill.
fn make_value(size: usize) -> Box<[u8]> {
    let mut payload: Vec<u8> = Vec::with_capacity(size);
    payload.resize(size, 0xAB);
    payload.into_boxed_slice()
}
196
197// ---------------------------------------------------------------------------
198// Config
199// ---------------------------------------------------------------------------
200
/// User-facing config for the [`Taobench`](crate::workload::WorkType::Taobench)
/// workload — a bounded, evicting key-value cache with a fast hit path and a slow
/// miss path, driven to a steady-state hit ratio.
///
/// All fields are integer/scalar so the type keeps `Eq + Hash` (fractional knobs
/// are expressed as integer percents). Every field has a chainable builder
/// setter of the same name; [`Default`] is a useful working config. Fields are
/// stored exactly as given — `0`-means-auto resolution and range clamping
/// happen when the config is consumed, not when it is set.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct TaobenchConfig {
    /// CLIENT threads that issue lookups and serve hits. `0` (default) resolves
    /// to the allocated guest cpuset CPU count (one client per CPU, at least 1).
    pub client_threads: usize,
    /// SLOW dispatcher threads that serve misses (sleep + fill + wake). `0`
    /// (default) resolves to `max(1, client_threads / 3)`, the real's fast:slow
    /// staffing ratio.
    pub slow_threads: usize,
    /// Resident cache budget in MiB (default `64`). The cache FIFO-evicts to stay
    /// near this many bytes' worth of objects; larger than the guest LLC makes
    /// the value touch a real memory-bandwidth cost.
    pub cache_capacity_mib: usize,
    /// Target steady-state hit ratio, in percent (`1..=99`, default `90`). The key
    /// range is sized `capacity / target_hit` so a uniform key stream hits at this
    /// rate at equilibrium. Out-of-range values are clamped into `1..=99` at
    /// consumption, not rejected.
    pub target_hit_pct: usize,
    /// Simulated backing-store fetch latency on a miss, in microseconds (default
    /// `100`; the slow dispatcher sleeps this long before filling). `0` keeps the
    /// slow path as a pure thread hop with no sleep (and disables the heavy tail —
    /// a zero median has no Pareto scale, so the fetch stays a no-op regardless of
    /// `slow_path_p99_us`). When `slow_path_sleep_us > 0` and `slow_path_p99_us >
    /// slow_path_sleep_us` this is the MEDIAN (p50) of a heavy-tailed service-time
    /// distribution rather than a fixed latency (see `slow_path_p99_us`).
    pub slow_path_sleep_us: u64,
    /// Heavy-tailed slow-path service time: the p99 of the simulated backing-store
    /// fetch, in microseconds. When `> slow_path_sleep_us`, each miss's fetch sleep
    /// is drawn from a Pareto distribution whose median is `slow_path_sleep_us` and
    /// whose 99th percentile is this value — most fetches near the median, a heavy
    /// tail of rare slow fetches (GC pauses, cold cache, disk seeks), the realistic
    /// power-law shape a backing store exhibits. `0` (default), any value
    /// `<= slow_path_sleep_us`, OR `slow_path_sleep_us == 0` (a zero median has no
    /// Pareto scale) keeps the fixed-latency behavior (every fetch sleeps exactly
    /// `slow_path_sleep_us` — the legacy path, byte-identical). The tail is
    /// clamped at a fixed maximum so a pathological draw cannot park a dispatcher
    /// off-CPU unboundedly.
    ///
    /// This is a ktstr enhancement beyond the reference, whose per-request
    /// slow-path service time is a FIXED sleep with no tail (the reference's
    /// `[target/2, 2×target]` uniform jitter + idle-poll backoff is a separate
    /// poll/idle-wait knob, not the per-request fetch). It exists so the open-loop
    /// serve-latency tail ([`crate::workload::TaobenchConfig::arrival_rate`])
    /// reflects realistic backing-store variance.
    pub slow_path_p99_us: u64,
    /// Open-loop arrival rate, AGGREGATE ops/sec across all client threads (the
    /// taobench analog of schbench's `-R`). `0` (default) = CLOSED loop: each
    /// client blocks until its request completes before issuing the next (the
    /// legacy behavior, byte-identical). Non-zero = OPEN loop: each client has a
    /// fixed intended-arrival SCHEDULE (`arrival_rate / client_threads` per client)
    /// independent of completion, and serve latency is measured from that intended
    /// time. A client still holds at most one outstanding request, so a slow
    /// completion delays the next issue; that backlog is folded into the late
    /// requests' serve latency (coordinated-omission correction) rather than
    /// omitted. Divided evenly across resolved clients. (Named for the standard
    /// queueing-theory term; the schbench analog is `requests_per_sec`, which is a
    /// verbatim mirror of schbench's own `-R` CLI flag — a constraint this
    /// ktstr-added field, with no reference `-R` to mirror, does not share.)
    pub arrival_rate: usize,
}
267
268impl Default for TaobenchConfig {
269    fn default() -> Self {
270        Self {
271            client_threads: 0,
272            slow_threads: 0,
273            cache_capacity_mib: 64,
274            target_hit_pct: 90,
275            slow_path_sleep_us: 100,
276            slow_path_p99_us: 0,
277            arrival_rate: 0,
278        }
279    }
280}
281
282impl TaobenchConfig {
283    /// Set the client thread count (`0` = one per allocated CPU).
284    #[must_use = "builder methods consume self; bind the result"]
285    pub fn client_threads(mut self, n: usize) -> Self {
286        self.client_threads = n;
287        self
288    }
289    /// Set the slow dispatcher thread count (`0` = `max(1, client_threads/3)`).
290    #[must_use = "builder methods consume self; bind the result"]
291    pub fn slow_threads(mut self, n: usize) -> Self {
292        self.slow_threads = n;
293        self
294    }
295    /// Set the resident cache budget in MiB.
296    #[must_use = "builder methods consume self; bind the result"]
297    pub fn cache_capacity_mib(mut self, mib: usize) -> Self {
298        self.cache_capacity_mib = mib;
299        self
300    }
301    /// Set the target steady-state hit ratio in percent (`1..=99`).
302    #[must_use = "builder methods consume self; bind the result"]
303    pub fn target_hit_pct(mut self, pct: usize) -> Self {
304        self.target_hit_pct = pct;
305        self
306    }
307    /// Set the simulated backing-store fetch latency on a miss, microseconds (the
308    /// MEDIAN when `slow_path_p99_us` enables the heavy tail; `0` keeps the fetch a
309    /// no-op and also disables the tail — a zero median has no Pareto scale).
310    #[must_use = "builder methods consume self; bind the result"]
311    pub fn slow_path_sleep_us(mut self, us: u64) -> Self {
312        self.slow_path_sleep_us = us;
313        self
314    }
315    /// Set the heavy-tailed slow-path service-time p99 in microseconds (`0` or
316    /// `<= slow_path_sleep_us` = fixed latency, the legacy behavior; also fixed when
317    /// `slow_path_sleep_us == 0`, a zero median having no Pareto scale). When larger
318    /// than a non-zero `slow_path_sleep_us`, each miss's fetch is drawn from a Pareto
319    /// with median `slow_path_sleep_us` and this p99.
320    #[must_use = "builder methods consume self; bind the result"]
321    pub fn slow_path_p99_us(mut self, us: u64) -> Self {
322        self.slow_path_p99_us = us;
323        self
324    }
325    /// Set the open-loop AGGREGATE arrival rate in ops/sec across all clients
326    /// (`0` = closed loop). Divided evenly across resolved client threads; serve
327    /// latency is then measured from the intended arrival (coordinated-omission).
328    #[must_use = "builder methods consume self; bind the result"]
329    pub fn arrival_rate(mut self, ops_per_sec: usize) -> Self {
330        self.arrival_rate = ops_per_sec;
331        self
332    }
333
334    /// Resolve the client thread count: the configured value, or the allocated
335    /// CPU count when `0`.
336    fn resolve_client_threads(&self, allowed_cpus: usize) -> usize {
337        if self.client_threads == 0 {
338            allowed_cpus.max(1)
339        } else {
340            self.client_threads
341        }
342    }
343
344    /// Resolve the slow dispatcher count: the configured value, or
345    /// `max(1, clients/3)` when `0`.
346    fn resolve_slow_threads(&self, clients: usize) -> usize {
347        if self.slow_threads == 0 {
348            (clients / 3).max(1)
349        } else {
350            self.slow_threads
351        }
352    }
353
354    /// Clamp the target hit ratio to a usable open interval (a 0% or ≥100% target
355    /// has no finite key range / no miss stream).
356    fn target_hit_fraction(&self) -> f64 {
357        (self.target_hit_pct.clamp(1, 99) as f64) / 100.0
358    }
359
360    /// Resolve the heavy-tailed slow-path service-time model from the `(p50, p99)`
361    /// knob: `None` = fixed `slow_path_sleep_us` (the legacy path); `Some((scale,
362    /// inv_neg_alpha))` = a Pareto whose median is `slow_path_sleep_us` and whose
363    /// p99 is `slow_path_p99_us`, sampled by [`sample_service_us`].
364    ///
365    /// `slow_path_p99_us <= slow_path_sleep_us` (incl. `0`) resolves to `None` — a
366    /// p99 at or below the median is not a tail. `slow_path_sleep_us == 0` also
367    /// resolves to `None`: a zero median has no Pareto scale, and the legacy no-op
368    /// fetch is preserved.
369    ///
370    /// Mapping: the Pareto q-quantile is `scale·(1−q)^(−1/α)`, so from the two
371    /// quantiles `p99/p50 = (0.01/0.5)^(−1/α) = 50^(1/α)`, giving
372    /// `α = ln(50) / ln(p99/p50)` and `scale = p50·0.5^(1/α)`. `p99 > p50`
373    /// guarantees `ln(p99/p50) > 0 ⇒ α > 0`, and `α ≥ ln(50)/ln(spread)` keeps the
374    /// mean finite (`α > 1`) for any spread below 50x. The stored `inv_neg_alpha =
375    /// −1/α` is the exponent [`sample_service_us`] raises `U` to.
376    fn resolve_service_pareto(&self) -> Option<(f64, f64)> {
377        let p50 = self.slow_path_sleep_us;
378        let p99 = self.slow_path_p99_us;
379        if p50 == 0 || p99 <= p50 {
380            return None;
381        }
382        let (p50f, p99f) = (p50 as f64, p99 as f64);
383        let alpha = 50.0_f64.ln() / (p99f / p50f).ln();
384        let scale = p50f * 0.5_f64.powf(1.0 / alpha);
385        Some((scale, -1.0 / alpha))
386    }
387}
388
389// ---------------------------------------------------------------------------
390// Cache
391// ---------------------------------------------------------------------------
392
393/// One cache shard: a FIFO-evicting map bounded to `cap` objects. FIFO eviction
394/// over a uniform key stream yields the same equilibrium hit ratio as LRU
395/// (resident_fraction = cap / key_range) while being O(1) and lock-cheap.
396struct Shard {
397    map: HashMap<u64, Box<[u8]>>,
398    fifo: VecDeque<u64>,
399    cap: usize,
400}
401
402impl Shard {
403    fn with_cap(cap: usize) -> Self {
404        Shard {
405            map: HashMap::new(),
406            fifo: VecDeque::new(),
407            cap,
408        }
409    }
410    /// Look up `k`; on a hit, touch its bytes and return `true`.
411    fn get_touch(&self, k: u64) -> bool {
412        match self.map.get(&k) {
413            Some(v) => {
414                touch(v);
415                true
416            }
417            None => false,
418        }
419    }
420    /// Insert a freshly-built value for `k` — called at warmup and on a miss,
421    /// where `k` is absent; the `is_none()` guard keeps it correct even if `k`
422    /// is already present. FIFO-evicts down to `cap`.
423    fn insert(&mut self, k: u64, v: Box<[u8]>) {
424        if self.map.insert(k, v).is_none() {
425            self.fifo.push_back(k);
426            while self.map.len() > self.cap {
427                match self.fifo.pop_front() {
428                    Some(old) => {
429                        self.map.remove(&old);
430                    }
431                    None => break,
432                }
433            }
434        }
435    }
436}
437
438/// A bounded, sharded, FIFO-evicting cache. Keys map to shards by the low bits of
439/// the key (`SHARDS` is a power of two).
440struct Cache {
441    shards: Vec<Mutex<Shard>>,
442}
443
444impl Cache {
445    /// Build a cache holding ≈ `total_objects` across `SHARDS` shards.
446    fn new(total_objects: usize) -> Self {
447        let per_shard = (total_objects / SHARDS).max(1);
448        let shards = (0..SHARDS)
449            .map(|_| Mutex::new(Shard::with_cap(per_shard)))
450            .collect();
451        Cache { shards }
452    }
453    fn shard(&self, k: u64) -> &Mutex<Shard> {
454        &self.shards[(k as usize) & (SHARDS - 1)]
455    }
456    /// Look up + touch `k`; `true` on a hit.
457    fn get_touch(&self, k: u64) -> bool {
458        self.shard(k)
459            .lock()
460            .expect("cache shard poisoned")
461            .get_touch(k)
462    }
463    /// Fill `k` with a freshly sized + built value (the value bytes are touched
464    /// by `make_value`); FIFO-evicts to keep the shard bounded.
465    fn fill(&self, k: u64, size: usize) {
466        let v = make_value(size);
467        self.shard(k)
468            .lock()
469            .expect("cache shard poisoned")
470            .insert(k, v);
471    }
472}
473
474// ---------------------------------------------------------------------------
475// Slow-path handoff
476// ---------------------------------------------------------------------------
477
/// One miss, handed from a client to a slow dispatcher. It carries no timing:
/// the client measures serve latency itself, from its own local
/// intended-arrival timestamp, after `wait_filled` returns. So the request is
/// just the key to fetch and the client to wake.
struct SlowReq {
    /// Key that missed and must be filled.
    key: u64,
    /// Index of the client to wake once the fill is done.
    client: usize,
}

/// Queue of pending misses. Idle dispatchers block on `cv`; a client pushing a
/// miss wakes one of them.
struct SlowQueue {
    /// Pending requests, oldest at the front.
    q: Mutex<VecDeque<SlowReq>>,
    /// Signalled once per pushed request.
    cv: Condvar,
}

impl SlowQueue {
    /// An empty queue.
    fn new() -> Self {
        let q = Mutex::new(VecDeque::new());
        let cv = Condvar::new();
        SlowQueue { q, cv }
    }

    /// Append `req` to the back of the queue, then wake one dispatcher. The
    /// queue lock is released before the notification is sent.
    fn push(&self, req: SlowReq) {
        {
            let mut pending = self.q.lock().expect("slow queue poisoned");
            pending.push_back(req);
        }
        self.cv.notify_one();
    }
}
505
/// Per-client response slot. After a fill the dispatcher sets `done` and
/// notifies; the client blocks here until its outstanding miss has been served.
///
/// A client has at most one outstanding request in BOTH arrival modes (it blocks
/// in `wait_filled` on a miss before issuing its next request), so one slot per
/// client is enough and it is reused. That single-outstanding-request
/// serialization is exactly why open-loop serve latency must be measured from
/// the intended arrival (coordinated-omission): a slow completion delays the
/// next issue, and that backlog is folded into the late requests' latency
/// instead of being omitted.
struct Slot {
    /// `true` between the dispatcher's `signal` and the client consuming it.
    done: Mutex<bool>,
    /// Wakes the client blocked in `wait_filled`.
    cv: Condvar,
}

impl Slot {
    /// A slot with nothing outstanding.
    fn new() -> Self {
        let done = Mutex::new(false);
        let cv = Condvar::new();
        Slot { done, cv }
    }

    /// Block until the dispatcher has marked this slot done, then clear the flag
    /// so the slot can be reused. Returns at once if the flag is already set.
    fn wait_filled(&self) {
        let guard = self.done.lock().expect("slot poisoned");
        let mut filled = self
            .cv
            .wait_while(guard, |done| !*done)
            .expect("slot poisoned");
        *filled = false;
    }

    /// Mark this slot filled, then wake the waiting client. The flag's lock is
    /// released before the notification is sent.
    fn signal(&self) {
        {
            let mut filled = self.done.lock().expect("slot poisoned");
            *filled = true;
        }
        self.cv.notify_one();
    }
}
540
541// ---------------------------------------------------------------------------
542// Stats
543// ---------------------------------------------------------------------------
544
545/// Taobench engine counters for one accounting window — a single phase
546/// epoch (the per-phase `crate::workload::PhaseSlice::taobench` carrier) or a
547/// whole worker run (the [`crate::workload::WorkerReport::taobench_whole`] /
548/// [`crate::assert::CgroupStats::taobench_whole`] aggregate). Integer-only so the
549/// enclosing `PhaseSlice` keeps `Eq`. `get_cmds` / `get_misses` are request-time;
550/// `fast_ops` / `slow_ops` are response-time (see the module docs). `Self::merge`
551/// pools two windows (Σ ops, MAX wall) and [`Self::total_ops`] is the throughput
552/// numerator; the host derives the run-level `taobench_*` Rate metrics from the
553/// pooled aggregate.
554#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
555pub struct TaobenchStats {
556    /// Lookups issued (request time).
557    pub get_cmds: u64,
558    /// Lookups that missed (request time).
559    pub get_misses: u64,
560    /// Hits served (response time).
561    pub fast_ops: u64,
562    /// Misses served via the slow path (response time).
563    pub slow_ops: u64,
564    /// Wall-clock window this stat covers, ns — the qps denominator. Per-phase:
565    /// the phase segment; whole-run: the run window. Merged as MAX (the window is
566    /// shared by the concurrent threads/workers being pooled, not summed).
567    pub elapsed_ns: u64,
568}
569
570impl TaobenchStats {
571    pub(crate) fn merge(&mut self, o: &TaobenchStats) {
572        // saturating_add: these are guest-runtime monotonic op counters pooled
573        // across concurrent workers/cgroups. Overflow is unreachable for honest
574        // data (real counts ~1e7..1e14 << u64::MAX), but a corrupt/hostile guest
575        // value would otherwise debug-panic or release-wrap to a silently-wrong
576        // qps/hit Rate; saturating is exact for every in-range value.
577        self.get_cmds = self.get_cmds.saturating_add(o.get_cmds);
578        self.get_misses = self.get_misses.saturating_add(o.get_misses);
579        self.fast_ops = self.fast_ops.saturating_add(o.fast_ops);
580        self.slow_ops = self.slow_ops.saturating_add(o.slow_ops);
581        // The wall window is shared across the pooled concurrent threads/workers
582        // (they run the same phase at the same time), so MAX, not sum.
583        self.elapsed_ns = self.elapsed_ns.max(o.elapsed_ns);
584    }
585    /// Completed ops (fast + slow) — the throughput numerator.
586    pub fn total_ops(&self) -> u64 {
587        self.fast_ops.saturating_add(self.slow_ops)
588    }
589}
590
591/// One PER-PHASE accounting window's taobench carrier: the integer counters
592/// ([`TaobenchStats`]) plus the open-loop serve-latency histogram. Internal
593/// (`pub(crate)`) — the taobench analog of
594/// [`crate::workload::schbench::run::SchbenchPhaseStats`] (counters + `PlatStats`
595/// histograms together), and like it, never on a public field. It rides only the
596/// per-phase carriers (`PhaseSlice::taobench` / `PhaseCgroupStats::taobench`); the
597/// WHOLE-RUN carrier is the counters-only [`TaobenchStats`] (the serve histogram
598/// is per-phase data — the whole-run serve distribution is the union of these
599/// per-phase histograms, computed where needed). `serve_lat` is EMPTY in closed
600/// loop: serve latency is measured only under open-loop arrival, so its
601/// percentiles read absent there.
602#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
603pub(crate) struct TaobenchPhaseStats {
604    /// Throughput / hit counters (request- and response-time).
605    pub(crate) counters: TaobenchStats,
606    /// Coordinated-omission serve latency (µs), measured from intended arrival.
607    /// Exposed to test authors only via the derived `taobench_serve_*_us` metric
608    /// keys (the schbench-histogram model), never as a direct field.
609    pub(crate) serve_lat: PlatStats,
610}
611
612impl TaobenchPhaseStats {
613    /// Pool another window: Σ counters ([`TaobenchStats::merge`]) and union the
614    /// serve histograms ([`PlatStats::combine`]) — the per-cgroup / cross-thread
615    /// fold, mirroring the schbench carrier merge.
616    pub(crate) fn merge(&mut self, other: &TaobenchPhaseStats) {
617        self.counters.merge(&other.counters);
618        self.serve_lat.combine(&other.serve_lat);
619    }
620}
621
/// The engine's return: the whole-run merged COUNTERS ([`TaobenchStats`], the
/// qps/hit aggregate) and the per-phase-epoch carriers (each counters + the
/// serve-latency histogram). The whole-run carrier is counters-only — mirroring
/// the wire carriers (`WorkerReport::taobench_whole` / `CgroupStats::taobench_whole`);
/// a whole-run serve distribution, when needed (the standalone driver), is the
/// union of the per-phase `serve_lat` histograms.
pub(crate) struct TaobenchOutcome {
    /// Whole-run merged counters (no serve histogram).
    pub whole_run: TaobenchStats,
    /// One entry per phase epoch, paired with that epoch's carrier.
    /// NOTE(review): the `u32` is presumably the phase epoch, matching the
    /// per-thread `BTreeMap<u32, TaobenchPhaseStats>` key — confirm against the
    /// code that builds this value.
    pub phases: Vec<(u32, TaobenchPhaseStats)>,
}
632
/// Per-thread accumulation with phase-epoch bucketing: the current epoch's
/// counters roll into `phases` when the epoch changes (the schbench drain-on-epoch
/// pattern), and `whole` accumulates the un-bucketed run total. Wall windows are
/// stamped from `CLOCK_MONOTONIC` at each roll (per-phase segment) and at finalize
/// (whole-run).
struct ThreadAccum {
    /// Epoch that the `cur` bucket is currently accumulating for.
    cur_epoch: u32,
    /// In-progress bucket for `cur_epoch`; merged into `phases` and reset when
    /// a record arrives for a different epoch.
    cur: TaobenchPhaseStats,
    /// Rolled buckets keyed by epoch. Segments that share an epoch are merged
    /// into one entry.
    phases: BTreeMap<u32, TaobenchPhaseStats>,
    /// Whole-run counters only (the qps/hit aggregate). The serve histogram is
    /// per-phase data (in `cur`/`phases`); the whole-run serve distribution, when
    /// needed, is the union of the per-phase histograms.
    whole: TaobenchStats,
    /// When the current phase segment started (ns).
    phase_start_ns: u64,
    /// When this thread started (ns) — the whole-run window start.
    thread_start_ns: u64,
}
651
652impl ThreadAccum {
653    fn new(epoch: u32) -> Self {
654        let now = monotonic_nanos();
655        ThreadAccum {
656            cur_epoch: epoch,
657            cur: TaobenchPhaseStats::default(),
658            phases: BTreeMap::new(),
659            whole: TaobenchStats::default(),
660            phase_start_ns: now,
661            thread_start_ns: now,
662        }
663    }
664    /// Roll the current bucket into `phases` (stamping its segment wall) and start
665    /// a fresh one for `epoch`.
666    fn roll_to(&mut self, epoch: u32) {
667        if epoch != self.cur_epoch {
668            let now = monotonic_nanos();
669            let mut cur = std::mem::take(&mut self.cur);
670            cur.counters.elapsed_ns = now.saturating_sub(self.phase_start_ns);
671            self.phases.entry(self.cur_epoch).or_default().merge(&cur);
672            self.cur_epoch = epoch;
673            self.phase_start_ns = now;
674        }
675    }
676    /// Record a lookup at request time in `epoch`.
677    fn record_cmd(&mut self, epoch: u32, hit: bool) {
678        self.roll_to(epoch);
679        self.cur.counters.get_cmds += 1;
680        self.whole.get_cmds += 1;
681        if !hit {
682            self.cur.counters.get_misses += 1;
683            self.whole.get_misses += 1;
684        }
685    }
686    /// Record a completion at response time in `epoch`.
687    fn record_complete(&mut self, epoch: u32, hit: bool) {
688        self.roll_to(epoch);
689        if hit {
690            self.cur.counters.fast_ops += 1;
691            self.whole.fast_ops += 1;
692        } else {
693            self.cur.counters.slow_ops += 1;
694            self.whole.slow_ops += 1;
695        }
696    }
697    /// Record an open-loop serve latency (`ns` from the intended arrival) in
698    /// `epoch`, into the current phase segment's histogram (µs buckets). Closed
699    /// loop never calls this, so the histograms stay empty and the serve-latency
700    /// keys read absent. The whole-run serve distribution is the union of the
701    /// per-phase histograms, so only the per-phase bucket is recorded here.
702    fn record_serve_lat(&mut self, epoch: u32, ns: u64) {
703        self.roll_to(epoch);
704        let us = (ns / 1000).min(u32::MAX as u64) as u32;
705        self.cur.serve_lat.add_lat(us);
706    }
707    /// Flush the last open bucket (stamping its segment wall) and stamp the
708    /// whole-run window.
709    fn finalize(mut self) -> Self {
710        let now = monotonic_nanos();
711        let mut cur = std::mem::take(&mut self.cur);
712        cur.counters.elapsed_ns = now.saturating_sub(self.phase_start_ns);
713        self.phases.entry(self.cur_epoch).or_default().merge(&cur);
714        self.whole.elapsed_ns = now.saturating_sub(self.thread_start_ns);
715        self
716    }
717}
718
/// Load the current phase epoch with relaxed ordering, or `0` when no epoch
/// counter was supplied (phases are not tracked).
fn read_epoch(phase_epoch: Option<&AtomicU32>) -> u32 {
    match phase_epoch {
        Some(epoch) => epoch.load(Ordering::Relaxed),
        None => 0,
    }
}
723
724// ---------------------------------------------------------------------------
725// Engine
726// ---------------------------------------------------------------------------
727
728/// Run the taobench engine until `stop` is set. `progress` counts completed ops
729/// (the live work-unit counter); `phase_epoch` buckets per-phase stats.
730pub(crate) fn run(
731    config: &TaobenchConfig,
732    stop: &AtomicBool,
733    progress: &AtomicU64,
734    phase_epoch: Option<&AtomicU32>,
735) -> TaobenchOutcome {
736    let allowed_cpus = resolve_allowed_cpus();
737    let n_clients = config.resolve_client_threads(allowed_cpus);
738    let n_slow = config.resolve_slow_threads(n_clients);
739    // Open-loop per-client inter-arrival = 1s / (arrival_rate / n_clients) =
740    // 1e9 * n_clients / arrival_rate. `0` arrival_rate ⇒ `0` interval ⇒ closed
741    // loop (issue as fast as completions allow — the legacy behavior). `.max(1)`
742    // keeps a non-zero rate strictly open-loop even at an absurd rate (so the
743    // `interval_ns != 0` mode gate in client_loop never misfires to closed loop).
744    // saturating_mul: an absurd client count (physically unreachable — it would
745    // OOM on thread spawn first) cannot wrap the numerator to a tiny interval, the
746    // same overflow-safe discipline as `TaobenchStats::merge`.
747    let interval_ns: u64 = if config.arrival_rate == 0 {
748        0
749    } else {
750        (1_000_000_000u64.saturating_mul(n_clients as u64) / config.arrival_rate as u64).max(1)
751    };
752    // Resolve the slow-path service-time model once (None = fixed legacy latency;
753    // Some = a per-fetch Pareto draw). Cheap, but kept out of the dispatcher hot
754    // path.
755    let service_pareto = config.resolve_service_pareto();
756
757    // Size the resident object count and the key range. Mean value size pins the
758    // object count for a byte budget; key_range = capacity / target_hit makes a
759    // uniform key stream hit the resident set at ≈ target_hit at equilibrium.
760    let mean_value = mean_value_size();
761    let capacity_bytes = config.cache_capacity_mib.max(1) * 1024 * 1024;
762    let total_objects = (capacity_bytes / mean_value).max(SHARDS);
763    let key_range = ((total_objects as f64) / config.target_hit_fraction()).ceil() as u64;
764
765    let cache = Cache::new(total_objects);
766    // Warm the cache to capacity so the hit ratio starts at the target rather
767    // than climbing from empty (the real warms before measuring). Keys
768    // [0, total_objects) are the initial resident set; the client key stream over
769    // [0, key_range) then hits them at ≈ target_hit and the eviction<->refill
770    // equilibrium holds it there.
771    {
772        let mut warm = Rng::new(0xC0FFEE);
773        for k in 0..total_objects as u64 {
774            cache.fill(k, sample_value_size(&mut warm));
775        }
776    }
777
778    let slow_q = SlowQueue::new();
779    let slots: Vec<Slot> = (0..n_clients).map(|_| Slot::new()).collect();
780    let disp_stop = AtomicBool::new(false);
781
782    let client_accums: Vec<ThreadAccum> = std::thread::scope(|s| {
783        // Dispatchers first so a client miss is served immediately.
784        let dispatchers: Vec<_> = (0..n_slow)
785            .map(|i| {
786                let cache = &cache;
787                let slow_q = &slow_q;
788                let slots = &slots;
789                let disp_stop = &disp_stop;
790                s.spawn(move || {
791                    dispatcher_loop(i, config, service_pareto, cache, slow_q, slots, disp_stop)
792                })
793            })
794            .collect();
795
796        let clients: Vec<_> = (0..n_clients)
797            .map(|i| {
798                let cache = &cache;
799                let slow_q = &slow_q;
800                let slot = &slots[i];
801                s.spawn(move || {
802                    client_loop(
803                        i,
804                        stop,
805                        progress,
806                        phase_epoch,
807                        cache,
808                        slow_q,
809                        slot,
810                        key_range,
811                        interval_ns,
812                    )
813                })
814            })
815            .collect();
816
817        // Join the clients first: once every client has returned (each only
818        // exits at the top of its loop, after its outstanding request — if any —
819        // has been served), NO further misses can be enqueued. Only then signal
820        // the dispatchers, so a dispatcher can never exit while a client is still
821        // blocked on an unserved miss.
822        let accums: Vec<ThreadAccum> = clients
823            .into_iter()
824            .map(|c| c.join().expect("taobench client panicked"))
825            .collect();
826        disp_stop.store(true, Ordering::Release);
827        slow_q.cv.notify_all();
828        for d in dispatchers {
829            d.join().expect("taobench dispatcher panicked");
830        }
831        accums
832    });
833
834    // Reduce: merge per-epoch and whole-run across clients. Each ThreadAccum
835    // stamped its per-phase segment walls + its whole-run window in finalize().
836    let mut all_phases: BTreeMap<u32, TaobenchPhaseStats> = BTreeMap::new();
837    let mut whole = TaobenchStats::default();
838    for accum in client_accums {
839        for (e, s) in accum.phases {
840            all_phases.entry(e).or_default().merge(&s);
841        }
842        whole.merge(&accum.whole);
843    }
844
845    TaobenchOutcome {
846        whole_run: whole,
847        phases: all_phases.into_iter().collect(),
848    }
849}
850
851/// One client: pick a key, look it up, serve a hit inline (touch + fast op) or
852/// hand a miss to a dispatcher and block until filled (slow op). `interval_ns ==
853/// 0` is CLOSED loop (issue the next request as soon as the prior completes — the
854/// legacy behavior). `interval_ns > 0` is OPEN loop: requests are due on a fixed
855/// `interval_ns` cadence independent of completion (the first one interval after
856/// the loop start `t0`), and serve latency is measured from the INTENDED arrival
857/// (coordinated-omission) so a backlog from slow service is folded into the late
858/// requests' latency instead of omitted.
859#[allow(clippy::too_many_arguments)]
860fn client_loop(
861    id: usize,
862    stop: &AtomicBool,
863    progress: &AtomicU64,
864    phase_epoch: Option<&AtomicU32>,
865    cache: &Cache,
866    slow_q: &SlowQueue,
867    slot: &Slot,
868    key_range: u64,
869    interval_ns: u64,
870) -> ThreadAccum {
871    let mut rng = Rng::new(0x5EED_0000 ^ (id as u64).wrapping_mul(0x9E37_79B1));
872    let mut accum = ThreadAccum::new(read_epoch(phase_epoch));
873    let open_loop = interval_ns != 0;
874    // Open-loop schedule cursor, advanced one `interval` before each request: the
875    // first request is due at `t0 + interval`, then every `interval` thereafter.
876    let mut intended = monotonic_nanos();
877
878    'client: while !stop.load(Ordering::Acquire) {
879        if open_loop {
880            // Pace to the next scheduled arrival; if behind (overload), the wait
881            // collapses to zero and the gap is captured as serve latency below.
882            // Each sleep is capped at STOP_POLL_QUANTUM_NS so a shutdown set during
883            // a long inter-arrival gap is observed within the quantum (not after a
884            // full interval) — exiting promptly via `break 'client` to `finalize`,
885            // bounding client-join teardown regardless of `arrival_rate`.
886            intended = intended.saturating_add(interval_ns);
887            loop {
888                if stop.load(Ordering::Acquire) {
889                    break 'client;
890                }
891                let now = monotonic_nanos();
892                if now >= intended {
893                    break;
894                }
895                let nap = (intended - now).min(STOP_POLL_QUANTUM_NS);
896                std::thread::sleep(std::time::Duration::from_nanos(nap));
897            }
898        }
899        let epoch_cmd = read_epoch(phase_epoch);
900        let key = rng.below(key_range);
901        let hit = cache.get_touch(key);
902        accum.record_cmd(epoch_cmd, hit);
903
904        if hit {
905            // Hit completes inline; response epoch == request epoch.
906            accum.record_complete(epoch_cmd, true);
907            if open_loop {
908                let lat = monotonic_nanos().saturating_sub(intended);
909                accum.record_serve_lat(epoch_cmd, lat);
910            }
911        } else {
912            // Miss: hand to a dispatcher and block until it fills + wakes us.
913            // The dispatcher always serves us before `disp_stop` is set (which
914            // happens only after every client has joined), so this never blocks
915            // past shutdown.
916            slow_q.push(SlowReq { key, client: id });
917            slot.wait_filled();
918            let resp_epoch = read_epoch(phase_epoch);
919            accum.record_complete(resp_epoch, false);
920            if open_loop {
921                // Completion timestamp taken by the CLIENT after wait_filled, so
922                // serve latency covers the full slow_q queueing + dispatcher
923                // service + wakeup, all measured from the intended arrival.
924                let lat = monotonic_nanos().saturating_sub(intended);
925                accum.record_serve_lat(resp_epoch, lat);
926            }
927        }
928        progress.fetch_add(1, Ordering::Relaxed);
929    }
930    accum.finalize()
931}
932
933/// One slow dispatcher: pull a miss, perform the simulated backing-store fetch,
934/// fill the cache, and wake the waiting client. Drains any queued misses before
935/// exiting on `disp_stop`. `service_pareto` is the resolved heavy-tail model
936/// (`None` = fixed `slow_path_sleep_us`); the per-fetch draw is deterministic from
937/// this dispatcher's seeded `rng` (the same one that picks the fill value size).
938fn dispatcher_loop(
939    id: usize,
940    config: &TaobenchConfig,
941    service_pareto: Option<(f64, f64)>,
942    cache: &Cache,
943    slow_q: &SlowQueue,
944    slots: &[Slot],
945    disp_stop: &AtomicBool,
946) {
947    let mut rng = Rng::new(0xD15A_0000 ^ (id as u64).wrapping_mul(0x9E37_79B1));
948    loop {
949        let req = {
950            let mut q = slow_q.q.lock().expect("slow queue poisoned");
951            loop {
952                if let Some(r) = q.pop_front() {
953                    break Some(r);
954                }
955                if disp_stop.load(Ordering::Acquire) {
956                    break None;
957                }
958                q = slow_q.cv.wait(q).expect("slow queue poisoned");
959            }
960        };
961        let Some(req) = req else { break };
962
963        // Heavy-tailed Pareto draw when configured, else the fixed latency. The
964        // draw precedes the value-size draw, so both come off this dispatcher's
965        // deterministic seeded stream.
966        let usec = match service_pareto {
967            Some((scale, inv_neg_alpha)) => sample_service_us(&mut rng, scale, inv_neg_alpha),
968            None => config.slow_path_sleep_us,
969        };
970        backing_store_fetch(usec);
971        cache.fill(req.key, sample_value_size(&mut rng));
972        slots[req.client].signal();
973    }
974}
975
976// ---------------------------------------------------------------------------
977// Topology + size helpers
978// ---------------------------------------------------------------------------
979
980/// The mean of the value-size distribution (for sizing the object count).
981fn mean_value_size() -> usize {
982    let total_w: u64 = VALUE_WEIGHTS.iter().map(|&w| w as u64).sum();
983    let weighted: u64 = VALUE_SIZES
984        .iter()
985        .zip(VALUE_WEIGHTS.iter())
986        .map(|(&s, &w)| s as u64 * w as u64)
987        .sum();
988    (weighted / total_w).max(1) as usize
989}
990
991/// Resolve the number of CPUs the worker is allowed to run on (its affinity
992/// mask), falling back to the online CPU count. Mirrors the schbench engine's
993/// cpuset-scoped resolution.
994fn resolve_allowed_cpus() -> usize {
995    // SAFETY: `sched_getaffinity` writes the calling thread's CPU mask into the
996    // provided cpu_set; reads nothing else.
997    unsafe {
998        let mut set: libc::cpu_set_t = core::mem::zeroed();
999        if libc::sched_getaffinity(0, core::mem::size_of::<libc::cpu_set_t>(), &mut set) == 0 {
1000            let n = libc::CPU_COUNT(&set);
1001            if n > 0 {
1002                return n as usize;
1003            }
1004        }
1005    }
1006    let n = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) };
1007    if n > 0 { n as usize } else { 1 }
1008}
1009
1010// ---------------------------------------------------------------------------
1011// Host-side standalone runner (validation driver entry)
1012// ---------------------------------------------------------------------------
1013
1014/// Host-side standalone run of the taobench engine for `run_secs`, returning a
1015/// summary report — the analog of schbench's `run_standalone`, backing the
1016/// `ktstr-taobench-validate` driver for the side-by-side comparison against the
1017/// reference taobench. NOT used in-VM (the scenario engine drives `run` there).
1018///
1019/// The engine warms the cache synchronously before the client threads start; the
1020/// report's `elapsed_secs` is the engine-measured CLIENT window (post-warmup), so
1021/// the qps figures are steady-state, not diluted by warmup.
1022pub fn run_standalone(config: &TaobenchConfig, run_secs: u64) -> TaobenchStandaloneReport {
1023    let stop = AtomicBool::new(false);
1024    let progress = AtomicU64::new(0);
1025    let allowed_cpus = resolve_allowed_cpus();
1026    let nr_client_threads = config.resolve_client_threads(allowed_cpus);
1027    let nr_slow_threads = config.resolve_slow_threads(nr_client_threads);
1028    let outcome = std::thread::scope(|s| {
1029        let h = s.spawn(|| run(config, &stop, &progress, None));
1030        // The benchmark run window — a workload-defined duration (the same model
1031        // as the scenario engine's hold step), not a synchronization wait.
1032        std::thread::sleep(std::time::Duration::from_secs(run_secs));
1033        stop.store(true, Ordering::Release);
1034        h.join().expect("taobench standalone run panicked")
1035    });
1036    // Whole-run serve-latency distribution = union of the per-phase histograms
1037    // (the whole-run carrier is counters-only). Empty in closed loop, so the
1038    // report's serve percentiles read absent there.
1039    let mut serve = PlatStats::default();
1040    for (_epoch, p) in &outcome.phases {
1041        serve.combine(&p.serve_lat);
1042    }
1043    TaobenchStandaloneReport::from_run(
1044        &outcome.whole_run,
1045        &serve,
1046        nr_client_threads,
1047        nr_slow_threads,
1048    )
1049}
1050
1051/// Summary of a [`run_standalone`] run — the headline taobench metrics in the
1052/// shape the reference taobench server reports (`fast_qps` / `hit_rate` /
1053/// `slow_qps`) plus the derived `total_qps` (= fast + slow) and `hit_ratio`
1054/// (= fast / total). Under open-loop arrival (`arrival_rate > 0`) it also carries
1055/// the coordinated-omission serve-latency percentiles; these are `None` in closed
1056/// loop (no intended-arrival schedule, so no serve latency is measured).
1057#[derive(Debug, Clone, Copy, PartialEq)]
1058pub struct TaobenchStandaloneReport {
1059    /// (fast + slow) ops per second over the measured window.
1060    pub total_qps: f64,
1061    /// Hits served per second.
1062    pub fast_qps: f64,
1063    /// Misses served (slow path) per second.
1064    pub slow_qps: f64,
1065    /// Response-time hit ratio: fast / (fast + slow).
1066    pub hit_ratio: f64,
1067    /// Command-time hit rate: 1 - get_misses / get_cmds.
1068    pub hit_rate: f64,
1069    /// Completed ops (fast + slow).
1070    pub total_ops: u64,
1071    /// Hits served.
1072    pub fast_ops: u64,
1073    /// Misses served via the slow path.
1074    pub slow_ops: u64,
1075    /// The engine-measured client window, seconds.
1076    pub elapsed_secs: f64,
1077    /// Resolved client thread count.
1078    pub nr_client_threads: usize,
1079    /// Resolved slow dispatcher count.
1080    pub nr_slow_threads: usize,
1081    /// Open-loop coordinated-omission serve-latency percentiles (µs), measured
1082    /// from intended arrival. `None` in closed loop / when no serve samples were
1083    /// recorded (matching the metric-key ABSENT discipline).
1084    pub serve_p50_us: Option<u32>,
1085    /// Open-loop serve-latency p99 (µs) — the headline coordinated-omission tail.
1086    pub serve_p99_us: Option<u32>,
1087    /// Open-loop serve-latency p99.9 (µs).
1088    pub serve_p999_us: Option<u32>,
1089    /// Open-loop serve-latency minimum sample (µs).
1090    pub serve_min_us: Option<u32>,
1091    /// Open-loop serve-latency maximum sample (µs).
1092    pub serve_max_us: Option<u32>,
1093}
1094
1095impl TaobenchStandaloneReport {
1096    fn from_run(
1097        w: &TaobenchStats,
1098        serve: &PlatStats,
1099        nr_client_threads: usize,
1100        nr_slow_threads: usize,
1101    ) -> Self {
1102        use crate::workload::schbench::plat::Pct;
1103        let secs = (w.elapsed_ns as f64 / 1e9).max(f64::MIN_POSITIVE);
1104        let total = w.total_ops();
1105        // Serve-latency percentiles are present iff open-loop arrival recorded
1106        // samples; closed loop leaves the histogram empty, so all fields read
1107        // None (the metric-key ABSENT discipline).
1108        let serve_q = (serve.sample_count() > 0).then(|| serve.percentiles());
1109        TaobenchStandaloneReport {
1110            total_qps: total as f64 / secs,
1111            fast_qps: w.fast_ops as f64 / secs,
1112            slow_qps: w.slow_ops as f64 / secs,
1113            hit_ratio: if total > 0 {
1114                w.fast_ops as f64 / total as f64
1115            } else {
1116                0.0
1117            },
1118            hit_rate: if w.get_cmds > 0 {
1119                1.0 - (w.get_misses as f64 / w.get_cmds as f64)
1120            } else {
1121                0.0
1122            },
1123            total_ops: total,
1124            fast_ops: w.fast_ops,
1125            slow_ops: w.slow_ops,
1126            elapsed_secs: secs,
1127            nr_client_threads,
1128            nr_slow_threads,
1129            serve_p50_us: serve_q.as_ref().map(|q| q.value_at(Pct::P50)),
1130            serve_p99_us: serve_q.as_ref().map(|q| q.value_at(Pct::P99)),
1131            serve_p999_us: serve_q.as_ref().map(|q| q.value_at(Pct::P999)),
1132            serve_min_us: serve_q.as_ref().map(|q| q.min),
1133            serve_max_us: serve_q.as_ref().map(|q| q.max),
1134        }
1135    }
1136}
1137
1138#[cfg(test)]
1139mod tests {
1140    use super::*;
1141
1142    #[test]
1143    fn value_size_distribution_mean_is_small_object_heavy() {
1144        // The representative distribution is small-object-heavy (mean well under
1145        // 1 KiB) with a tail to 64 KiB.
1146        let m = mean_value_size();
1147        assert!(
1148            (200..=500).contains(&m),
1149            "mean value size {m} B is small-object-heavy"
1150        );
1151        assert_eq!(*VALUE_SIZES.last().unwrap(), 65536, "tail reaches 64 KiB");
1152    }
1153
1154    #[test]
1155    fn taobench_stats_merge_and_total_ops_saturate_on_overflow() {
1156        // Pooling guest-runtime op counters must saturate, not wrap: a corrupt
1157        // u64::MAX component must never produce a silently-wrong (wrapped-small)
1158        // qps/hit Rate. Saturating is exact for all in-range data.
1159        let mut a = TaobenchStats {
1160            get_cmds: u64::MAX,
1161            get_misses: u64::MAX,
1162            fast_ops: u64::MAX,
1163            slow_ops: 1,
1164            elapsed_ns: 1000,
1165        };
1166        let b = TaobenchStats {
1167            get_cmds: 5,
1168            get_misses: 5,
1169            fast_ops: 5,
1170            slow_ops: 5,
1171            elapsed_ns: 9000,
1172        };
1173        a.merge(&b);
1174        assert_eq!(a.get_cmds, u64::MAX, "saturates, not wraps to 4");
1175        assert_eq!(a.get_misses, u64::MAX);
1176        assert_eq!(a.fast_ops, u64::MAX);
1177        assert_eq!(a.slow_ops, 6);
1178        assert_eq!(a.elapsed_ns, 9000, "wall window is MAX, not summed");
1179        // total_ops = fast_ops + slow_ops saturates (u64::MAX + 6 -> u64::MAX).
1180        assert_eq!(a.total_ops(), u64::MAX, "total_ops saturates");
1181    }
1182
1183    #[test]
1184    fn engine_serves_ops_and_hit_ratio_settles_near_target_not_one() {
1185        // A short run on a small cache: the hit ratio must settle near the target
1186        // (the eviction<->refill equilibrium), NOT drift to 1.0 (which a
1187        // self-healing cache with no eviction would do).
1188        let cfg = TaobenchConfig::default()
1189            .client_threads(4)
1190            .slow_threads(2)
1191            .cache_capacity_mib(8)
1192            .target_hit_pct(90)
1193            .slow_path_sleep_us(10);
1194        let stop = AtomicBool::new(false);
1195        let progress = AtomicU64::new(0);
1196
1197        std::thread::scope(|s| {
1198            let h = s.spawn(|| run(&cfg, &stop, &progress, None));
1199            // Spin until the engine has served enough ops to reach equilibrium.
1200            while progress.load(Ordering::Relaxed) < 100_000 {
1201                std::hint::spin_loop();
1202            }
1203            stop.store(true, Ordering::Release);
1204            let out = h.join().expect("engine panicked");
1205
1206            let total = out.whole_run.total_ops();
1207            assert!(total > 0, "engine served ops");
1208            assert!(out.whole_run.elapsed_ns > 0, "the run window was measured");
1209
1210            let hit = out.whole_run.fast_ops as f64 / total as f64;
1211            assert!(
1212                (0.80..=0.97).contains(&hit),
1213                "hit ratio {hit} settles near target 0.90 (eviction equilibrium), not 1.0"
1214            );
1215            assert!(
1216                out.whole_run.slow_ops > 0,
1217                "the slow/miss path is exercised"
1218            );
1219        });
1220    }
1221
1222    #[test]
1223    fn taobench_config_serde_roundtrips() {
1224        // The new serialized type roundtrips unchanged.
1225        // Every field set to a non-default value to exercise the full serde surface.
1226        let cfg = TaobenchConfig::default()
1227            .client_threads(8)
1228            .slow_threads(3)
1229            .cache_capacity_mib(128)
1230            .target_hit_pct(85)
1231            .slow_path_sleep_us(250)
1232            .slow_path_p99_us(2500)
1233            .arrival_rate(50_000);
1234        let json = serde_json::to_string(&cfg).expect("TaobenchConfig must serialize");
1235        let back: TaobenchConfig =
1236            serde_json::from_str(&json).expect("TaobenchConfig must deserialize");
1237        assert_eq!(cfg, back, "config roundtrips unchanged");
1238    }
1239
1240    #[test]
1241    fn worktype_taobench_registration_and_serde() {
1242        use crate::workload::WorkType;
1243        let wt = WorkType::taobench(
1244            TaobenchConfig::default()
1245                .client_threads(4)
1246                .target_hit_pct(95),
1247        );
1248        assert_eq!(wt.name(), "Taobench");
1249        // from_name yields the default-config variant.
1250        assert_eq!(
1251            WorkType::from_name("Taobench"),
1252            Some(WorkType::Taobench {
1253                config: TaobenchConfig::default()
1254            })
1255        );
1256        // The variant serde-roundtrips, carrying its config.
1257        let json = serde_json::to_string(&wt).expect("WorkType::Taobench must serialize");
1258        let back: WorkType =
1259            serde_json::from_str(&json).expect("WorkType::Taobench must deserialize");
1260        assert_eq!(wt, back);
1261    }
1262
1263    #[test]
1264    fn taobench_config_reachable_via_prelude() {
1265        // Regression-pin the prelude placement: test authors construct the config
1266        // via `use ktstr::prelude::*`. Dropping TaobenchConfig from the prelude
1267        // would fail this compile. Also exercises the Eq derive.
1268        let cfg: crate::prelude::TaobenchConfig = crate::prelude::TaobenchConfig::default();
1269        assert_eq!(cfg, TaobenchConfig::default());
1270    }
1271
1272    #[test]
1273    fn taobench_stats_serde_roundtrips() {
1274        // TaobenchStats is a pub serialized type that rides real wires — a field
1275        // on WorkerReport (the worker→host postcard payload) and CgroupStats (the
1276        // sidecar JSON) — so a field-rename or serde-attr drift is a silent
1277        // data-loss risk this pins. Every field distinct +
1278        // non-zero to exercise the full serde surface.
1279        let s = TaobenchStats {
1280            get_cmds: 1000,
1281            get_misses: 150,
1282            fast_ops: 850,
1283            slow_ops: 150,
1284            elapsed_ns: 9_000_000_000,
1285        };
1286        let json = serde_json::to_string(&s).expect("TaobenchStats must serialize");
1287        let back: TaobenchStats =
1288            serde_json::from_str(&json).expect("TaobenchStats must deserialize");
1289        assert_eq!(s, back, "TaobenchStats roundtrips unchanged");
1290    }
1291
1292    #[test]
1293    fn taobench_stats_reachable_via_prelude() {
1294        // Regression-pin the prelude placement: TaobenchStats is read off the
1295        // preluded WorkerReport/CgroupStats `taobench_whole` fields, so it must be
1296        // nameable via `use ktstr::prelude::*`. Dropping it from the prelude would
1297        // fail this compile. Also exercises Default + Eq.
1298        let s: crate::prelude::TaobenchStats = crate::prelude::TaobenchStats::default();
1299        assert_eq!(s, TaobenchStats::default());
1300    }
1301
1302    #[test]
1303    fn taobench_config_arrival_rate_default_and_setter() {
1304        // Default is closed loop (no open-loop schedule); the setter sets the
1305        // aggregate ops/sec field.
1306        assert_eq!(
1307            TaobenchConfig::default().arrival_rate,
1308            0,
1309            "default arrival_rate is 0 (closed loop)"
1310        );
1311        let cfg = TaobenchConfig::default().arrival_rate(100_000);
1312        assert_eq!(cfg.arrival_rate, 100_000, "setter sets arrival_rate");
1313    }
1314
1315    #[test]
1316    fn taobench_phase_stats_merge_sums_counters_and_unions_serve_lat() {
1317        // The carrier merge pools counters (Σ ops, MAX wall — TaobenchStats::merge)
1318        // AND unions the serve histograms (PlatStats::combine) — the per-cgroup /
1319        // cross-thread fold.
1320        let mut a = TaobenchPhaseStats {
1321            counters: TaobenchStats {
1322                get_cmds: 10,
1323                get_misses: 2,
1324                fast_ops: 8,
1325                slow_ops: 2,
1326                elapsed_ns: 1000,
1327            },
1328            ..Default::default()
1329        };
1330        a.serve_lat.add_lat(5);
1331        a.serve_lat.add_lat(50);
1332        let mut b = TaobenchPhaseStats {
1333            counters: TaobenchStats {
1334                get_cmds: 6,
1335                get_misses: 1,
1336                fast_ops: 5,
1337                slow_ops: 1,
1338                elapsed_ns: 4000,
1339            },
1340            ..Default::default()
1341        };
1342        b.serve_lat.add_lat(500);
1343        a.merge(&b);
1344        assert_eq!(a.counters.get_cmds, 16, "Σ get_cmds");
1345        assert_eq!(a.counters.fast_ops, 13, "Σ fast_ops");
1346        assert_eq!(a.counters.slow_ops, 3, "Σ slow_ops");
1347        assert_eq!(a.counters.elapsed_ns, 4000, "wall is MAX, not summed");
1348        assert_eq!(
1349            a.serve_lat.sample_count(),
1350            3,
1351            "serve histograms union (2 + 1 samples)"
1352        );
1353    }
1354
1355    #[test]
1356    fn taobench_phase_stats_serde_roundtrips() {
1357        // TaobenchPhaseStats rides the WorkerReport postcard + CgroupStats sidecar
1358        // JSON wires; both the counters and the serve histogram must roundtrip.
1359        let mut s = TaobenchPhaseStats {
1360            counters: TaobenchStats {
1361                get_cmds: 1000,
1362                get_misses: 150,
1363                fast_ops: 850,
1364                slow_ops: 150,
1365                elapsed_ns: 9_000_000_000,
1366            },
1367            ..Default::default()
1368        };
1369        s.serve_lat.add_lat(12);
1370        s.serve_lat.add_lat(340);
1371        s.serve_lat.add_lat(99_999);
1372        let json = serde_json::to_string(&s).expect("TaobenchPhaseStats must serialize");
1373        let back: TaobenchPhaseStats =
1374            serde_json::from_str(&json).expect("TaobenchPhaseStats must deserialize");
1375        assert_eq!(
1376            s, back,
1377            "TaobenchPhaseStats roundtrips unchanged (counters + serve_lat)"
1378        );
1379    }
1380
1381    #[test]
1382    fn open_loop_stamps_serve_latency_on_every_completion() {
1383        // Open loop (non-zero arrival_rate) measures coordinated-omission serve
1384        // latency: every completion — a hit (served inline) AND a miss (served via
1385        // the slow path) — records exactly one sample from its intended arrival, so
1386        // the histogram count equals total_ops. An absurdly high rate keeps the
1387        // pacing in permanent overload (the per-arrival sleep collapses to zero), so
1388        // the test runs at full speed while still exercising stamping on both arms.
1389        let cfg = TaobenchConfig::default()
1390            .client_threads(4)
1391            .slow_threads(2)
1392            .cache_capacity_mib(8)
1393            .target_hit_pct(90)
1394            .slow_path_sleep_us(10)
1395            .arrival_rate(1_000_000_000);
1396        let stop = AtomicBool::new(false);
1397        let progress = AtomicU64::new(0);
1398        let out = std::thread::scope(|s| {
1399            let h = s.spawn(|| run(&cfg, &stop, &progress, None));
1400            while progress.load(Ordering::Relaxed) < 50_000 {
1401                std::hint::spin_loop();
1402            }
1403            stop.store(true, Ordering::Release);
1404            h.join().expect("engine panicked")
1405        });
1406        let total = out.whole_run.total_ops();
1407        assert!(total > 0, "engine served ops");
1408        assert!(out.whole_run.slow_ops > 0, "the miss arm is exercised");
1409        // The whole-run serve distribution is the union of the per-phase
1410        // histograms (the whole-run carrier is counters-only). Its count equals
1411        // total_ops: every completion (hit inline + miss after wait_filled)
1412        // records exactly one serve-latency sample.
1413        let mut serve = PlatStats::default();
1414        for (_epoch, p) in &out.phases {
1415            serve.combine(&p.serve_lat);
1416        }
1417        assert_eq!(
1418            serve.sample_count(),
1419            total,
1420            "every completion (hit + miss) records one serve-latency sample"
1421        );
1422    }
1423
1424    #[test]
1425    fn closed_loop_records_no_serve_latency() {
1426        // arrival_rate == 0 is closed loop: there is no intended-arrival schedule,
1427        // so serve latency is undefined and the histogram stays EMPTY (the
1428        // taobench_serve_*_us keys then read absent, distinct from a measured 0).
1429        // The heavy tail is ALSO enabled (slow_path_p99_us > slow_path_sleep_us) to
1430        // pin that enabling the tail never leaks a serve sample without an open-loop
1431        // schedule — the tail only affects fetch latency, not whether serve latency
1432        // is measured.
1433        let cfg = TaobenchConfig::default()
1434            .client_threads(4)
1435            .slow_threads(2)
1436            .cache_capacity_mib(8)
1437            .target_hit_pct(90)
1438            .slow_path_sleep_us(10)
1439            .slow_path_p99_us(1000); // tail enabled; arrival_rate defaults to 0 (closed loop)
1440        let stop = AtomicBool::new(false);
1441        let progress = AtomicU64::new(0);
1442        let out = std::thread::scope(|s| {
1443            let h = s.spawn(|| run(&cfg, &stop, &progress, None));
1444            while progress.load(Ordering::Relaxed) < 20_000 {
1445                std::hint::spin_loop();
1446            }
1447            stop.store(true, Ordering::Release);
1448            h.join().expect("engine panicked")
1449        });
1450        assert!(out.whole_run.total_ops() > 0, "ops completed");
1451        for (epoch, p) in &out.phases {
1452            assert_eq!(
1453                p.serve_lat.sample_count(),
1454                0,
1455                "closed loop records no per-phase serve latency (epoch {epoch})"
1456            );
1457        }
1458    }
1459
1460    #[test]
1461    fn open_loop_per_phase_carrier_holds_serve_latency() {
1462        // The per-phase carrier (PhaseSlice.taobench) carries the serve histogram
1463        // (the whole-run carrier is counters-only). With a single static epoch
1464        // every completion's sample lands in that epoch's bucket, so the per-phase
1465        // serve count equals the whole-run completion count (total_ops).
1466        let cfg = TaobenchConfig::default()
1467            .client_threads(4)
1468            .slow_threads(2)
1469            .cache_capacity_mib(8)
1470            .target_hit_pct(90)
1471            .slow_path_sleep_us(10)
1472            .arrival_rate(1_000_000_000);
1473        let stop = AtomicBool::new(false);
1474        let progress = AtomicU64::new(0);
1475        let epoch = AtomicU32::new(7);
1476        let out = std::thread::scope(|s| {
1477            let h = s.spawn(|| run(&cfg, &stop, &progress, Some(&epoch)));
1478            while progress.load(Ordering::Relaxed) < 30_000 {
1479                std::hint::spin_loop();
1480            }
1481            stop.store(true, Ordering::Release);
1482            h.join().expect("engine panicked")
1483        });
1484        let phase7 = out
1485            .phases
1486            .iter()
1487            .find(|(e, _)| *e == 7)
1488            .map(|(_, p)| p)
1489            .expect("epoch-7 phase carrier present");
1490        assert!(
1491            phase7.serve_lat.sample_count() > 0,
1492            "per-phase serve latency recorded"
1493        );
1494        assert_eq!(
1495            phase7.serve_lat.sample_count(),
1496            out.whole_run.total_ops(),
1497            "single-epoch run: every completion stamps one per-phase serve sample"
1498        );
1499    }
1500
1501    #[test]
1502    fn open_loop_pacing_sleep_is_bounded_for_prompt_shutdown() {
1503        // A degenerate low arrival_rate makes the inter-arrival interval ~1s, but a
1504        // client waiting for its next scheduled arrival must observe `stop` within
1505        // STOP_POLL_QUANTUM_NS (50 ms) and exit, so the join completes well under
1506        // one interval. An unbounded pacing sleep would block the join for the full
1507        // ~1s. The 500 ms bound is 10x the quantum (robust to scheduling jitter)
1508        // and half the interval (so it cleanly distinguishes bounded from
1509        // unbounded).
1510        let cfg = TaobenchConfig::default()
1511            .client_threads(1)
1512            .slow_threads(1)
1513            .cache_capacity_mib(4)
1514            .target_hit_pct(90)
1515            .slow_path_sleep_us(0)
1516            .arrival_rate(1); // interval_ns = 1e9 * 1 / 1 = 1s
1517        let stop = AtomicBool::new(false);
1518        let progress = AtomicU64::new(0);
1519        let start = std::time::Instant::now();
1520        std::thread::scope(|s| {
1521            let h = s.spawn(|| run(&cfg, &stop, &progress, None));
1522            // Let the client reach its first (long) pacing sleep, then shut down.
1523            std::thread::sleep(std::time::Duration::from_millis(20));
1524            stop.store(true, Ordering::Release);
1525            let _ = h.join().expect("engine panicked");
1526        });
1527        let elapsed = start.elapsed();
1528        assert!(
1529            elapsed < std::time::Duration::from_millis(500),
1530            "open-loop client joined in {elapsed:?}; the pacing sleep must be \
1531             bounded (interval is ~1s — an unbounded sleep would block the join)"
1532        );
1533    }
1534
1535    #[test]
1536    fn taobench_config_slow_path_p99_default_and_setter() {
1537        // Default keeps the legacy fixed-latency path (no tail); the setter sets the
1538        // p99 tail target.
1539        assert_eq!(
1540            TaobenchConfig::default().slow_path_p99_us,
1541            0,
1542            "default slow_path_p99_us is 0 (fixed latency)"
1543        );
1544        let cfg = TaobenchConfig::default().slow_path_p99_us(5000);
1545        assert_eq!(cfg.slow_path_p99_us, 5000, "setter sets slow_path_p99_us");
1546    }
1547
1548    #[test]
1549    fn taobench_service_pareto_mapping_and_legacy_gate() {
1550        // The (p50, p99) knob maps to a Pareto whose analytic quantiles round-trip:
1551        // alpha = ln(50)/ln(p99/p50); scale = p50 * 0.5^(1/alpha).
1552        let cfg = TaobenchConfig::default()
1553            .slow_path_sleep_us(100)
1554            .slow_path_p99_us(1000);
1555        let (scale, inv_neg_alpha) = cfg.resolve_service_pareto().expect("tail enabled");
1556        let alpha = -1.0 / inv_neg_alpha;
1557        assert!(
1558            (alpha - 50.0_f64.ln() / 10.0_f64.ln()).abs() < 1e-9,
1559            "alpha = ln(50)/ln(10), got {alpha}",
1560        );
1561        // The Pareto q-quantile scale*(1-q)^(-1/alpha) round-trips the config: the
1562        // median quantile is p50, the 99th percentile is p99.
1563        let median = scale * 0.5_f64.powf(-1.0 / alpha);
1564        let p99 = scale * 0.01_f64.powf(-1.0 / alpha);
1565        assert!(
1566            (median - 100.0).abs() < 1e-6,
1567            "median quantile = p50 (100), got {median}",
1568        );
1569        assert!(
1570            (p99 - 1000.0).abs() < 1e-6,
1571            "p99 quantile = configured (1000), got {p99}",
1572        );
1573        // Legacy gate: p99 == 0, p99 < p50, p99 == p50, and p50 == 0 all resolve to
1574        // the fixed path (None).
1575        let base = TaobenchConfig::default().slow_path_sleep_us(100);
1576        assert!(
1577            base.clone()
1578                .slow_path_p99_us(0)
1579                .resolve_service_pareto()
1580                .is_none(),
1581            "p99 == 0 is fixed",
1582        );
1583        assert!(
1584            base.clone()
1585                .slow_path_p99_us(50)
1586                .resolve_service_pareto()
1587                .is_none(),
1588            "p99 < p50 is fixed",
1589        );
1590        assert!(
1591            base.clone()
1592                .slow_path_p99_us(100)
1593                .resolve_service_pareto()
1594                .is_none(),
1595            "p99 == p50 is fixed",
1596        );
1597        assert!(
1598            TaobenchConfig::default()
1599                .slow_path_sleep_us(0)
1600                .slow_path_p99_us(1000)
1601                .resolve_service_pareto()
1602                .is_none(),
1603            "p50 == 0 has no Pareto scale",
1604        );
1605    }
1606
1607    #[test]
1608    fn taobench_service_sampler_matches_configured_quantiles() {
1609        // Drawing many samples from the resolved Pareto through the actual
1610        // measurement path (PlatStats) yields a histogram whose p50/p99 land near
1611        // the configured (100, 1000) µs. Deterministic (fixed seed); the bands
1612        // absorb PlatStats log-bucketing + sampling variance.
1613        use crate::workload::schbench::plat::Pct;
1614        let cfg = TaobenchConfig::default()
1615            .slow_path_sleep_us(100)
1616            .slow_path_p99_us(1000);
1617        let (scale, inv_neg_alpha) = cfg.resolve_service_pareto().unwrap();
1618        let mut rng = Rng::new(0x5A3C_1234);
1619        let mut plat = PlatStats::default();
1620        for _ in 0..200_000 {
1621            let us = sample_service_us(&mut rng, scale, inv_neg_alpha);
1622            plat.add_lat(us.min(u32::MAX as u64) as u32);
1623        }
1624        let q = plat.percentiles();
1625        let p50 = q.value_at(Pct::P50);
1626        let p99 = q.value_at(Pct::P99);
1627        assert!(
1628            (90..=115).contains(&p50),
1629            "empirical p50 {p50} ≈ configured 100µs (tight band: rejects a sampler \
1630             bit-shift / lost-scale bug, absorbs only log-bucketing + seed noise)",
1631        );
1632        assert!(
1633            (850..=1200).contains(&p99),
1634            "empirical p99 {p99} ≈ configured 1000µs (tight band: rejects a ~±50% \
1635             sampler error, absorbs only log-bucketing + seed noise)",
1636        );
1637    }
1638
1639    #[test]
1640    fn taobench_service_sampler_clamps_pathological_tail() {
1641        // A pathological draw (huge scale, so any U gives a value far above the cap)
1642        // is clamped to MAX_SERVICE_US rather than parking a dispatcher unboundedly.
1643        let mut rng = Rng::new(7);
1644        let v = sample_service_us(&mut rng, 1e12, -0.5);
1645        assert_eq!(v, MAX_SERVICE_US, "huge draw clamps to MAX_SERVICE_US");
1646    }
1647
1648    #[test]
1649    fn taobench_service_sampler_is_deterministic_from_seed() {
1650        // Same seed -> same service-time draw sequence (the determinism the engine
1651        // relies on for reproducible runs).
1652        let cfg = TaobenchConfig::default()
1653            .slow_path_sleep_us(100)
1654            .slow_path_p99_us(1000);
1655        let (scale, inv_neg_alpha) = cfg.resolve_service_pareto().unwrap();
1656        let mut a = Rng::new(42);
1657        let mut b = Rng::new(42);
1658        for _ in 0..1000 {
1659            assert_eq!(
1660                sample_service_us(&mut a, scale, inv_neg_alpha),
1661                sample_service_us(&mut b, scale, inv_neg_alpha),
1662            );
1663        }
1664    }
1665
1666    #[test]
1667    fn rng_f64_open01_is_in_open_closed_unit_interval() {
1668        // f64_open01 must yield U in (0, 1]: 0 is excluded (U^(−1/α) would be
1669        // infinite in the Pareto inverse-CDF), 1 is included (the distribution
1670        // floor). Pin the boundary contract directly over many fixed-seed draws.
1671        let mut rng = Rng::new(0xF00D_BEEF);
1672        for _ in 0..1_000_000 {
1673            let u = rng.f64_open01();
1674            assert!(
1675                u > 0.0 && u <= 1.0,
1676                "f64_open01 yielded {u}, outside (0, 1]"
1677            );
1678        }
1679    }
1680
1681    // The heavy-tail slow-path service model — the Pareto that
1682    // `resolve_service_pareto` derives from the `slow_path_sleep_us` median +
1683    // `slow_path_p99_us` 99th-percentile knobs, sampled by `sample_service_us` —
1684    // reproduces those quantiles and a heavy tail the fixed path lacks.
1685    //
1686    // Deterministic: a fixed-seed PRNG driving the pure sampler, no threads and
1687    // no wall-clock timing, so it pins the heavy-tail LOGIC without the
1688    // host-scheduling jitter a real-timing run carries (a prior version compared
1689    // wall-clock serve MAX across host threads and flaked on loaded CI runners,
1690    // where the control's jitter spike collapsed the ratio). The end-to-end
1691    // engine path — the slow path drawing from this sampler under a live
1692    // open-loop arrival process — is covered by the coordinated in-VM
1693    // `taobench_open_loop_runs_in_vm` e2e (pinned vCPUs, no host jitter).
1694    #[test]
1695    fn heavy_tail_service_model_reproduces_configured_quantiles() {
1696        // A p99 above the median resolves to a Pareto; at/below it (or a zero
1697        // median) stays on the fixed-latency path.
1698        let heavy = TaobenchConfig::default()
1699            .slow_path_sleep_us(100)
1700            .slow_path_p99_us(2000);
1701        let (scale, inv_neg_alpha) = heavy
1702            .resolve_service_pareto()
1703            .expect("p99 2000µs above median 100µs resolves to a Pareto tail");
1704        for (p50, p99, why) in [
1705            (100u64, 0u64, "p99 == 0 keeps the fixed path"),
1706            (100, 100, "p99 == median is not a tail"),
1707            (0, 2000, "zero median has no Pareto scale"),
1708        ] {
1709            assert!(
1710                TaobenchConfig::default()
1711                    .slow_path_sleep_us(p50)
1712                    .slow_path_p99_us(p99)
1713                    .resolve_service_pareto()
1714                    .is_none(),
1715                "{why}",
1716            );
1717        }
1718
1719        // Fixed-seed draws reproduce the configured quantiles + a heavy tail.
1720        let mut rng = Rng::new(0x00C0_FFEE);
1721        let mut samples: Vec<u64> = (0..200_000)
1722            .map(|_| sample_service_us(&mut rng, scale, inv_neg_alpha))
1723            .collect();
1724        samples.sort_unstable();
1725        let q = |p: f64| samples[((samples.len() as f64 * p) as usize).min(samples.len() - 1)];
1726        let (p50, p99, max) = (q(0.50), q(0.99), *samples.last().unwrap());
1727        // Median ~100µs and p99 ~2000µs (the configured knobs), within a
1728        // generous band only a broken Pareto mapping would miss.
1729        assert!(
1730            (50..=200).contains(&p50),
1731            "empirical median {p50}µs should track the configured 100µs",
1732        );
1733        assert!(
1734            (1000..=4000).contains(&p99),
1735            "empirical p99 {p99}µs should track the configured 2000µs",
1736        );
1737        assert!(
1738            max > p99.saturating_mul(2),
1739            "max {max}µs must dwarf p99 {p99}µs — the Pareto tail the fixed path lacks",
1740        );
1741    }
1742}