ktstr/workload/affinity.rs
1//! CPU affinity intent + resolution for worker tasks.
2//!
3//! [`AffinityIntent`] expresses the test author's request before
4//! topology resolution; [`ResolvedAffinity`] is the post-resolution
5//! shape the spawn pipeline consumes. [`resolve_affinity`] walks the
6//! `ResolvedAffinity` to a concrete CPU set; [`set_thread_affinity`]
//! issues the `sched_setaffinity` syscall; [`sched_getcpu`] reports
//! which CPU the calling task is running on right now.
9//!
10//! Re-exported from the parent module via `pub use affinity::*` so
11//! `crate::workload::AffinityIntent` etc. stay valid.
12
13use std::collections::BTreeSet;
14
15use anyhow::{Context, Result};
16
/// Scenario-level affinity intent for a group of workers.
///
/// Resolved to a concrete [`ResolvedAffinity`] at runtime based on the
/// cgroup's effective cpuset and the VM's topology. When attached to a
/// [`WorkSpec`](crate::workload::WorkSpec), it determines each worker's
/// `sched_setaffinity` mask. The [`Default`] is [`AffinityIntent::Inherit`].
///
/// Resolution uses [`resolve_affinity_for_cgroup()`](crate::scenario::resolve_affinity_for_cgroup).
///
/// # Naming pattern (Intent vs Resolved)
///
/// [`AffinityIntent`] and [`ResolvedAffinity`] form a pre/post-resolution
/// pair. Variant names line up where the same shape exists on both
/// sides; payload differences encode the intent → concrete-CPU-set
/// distinction:
///
/// | [`AffinityIntent`]             | [`ResolvedAffinity`]            |
/// |--------------------------------|---------------------------------|
/// | `Inherit` (no payload)         | `None`                          |
/// | `Exact(BTreeSet<usize>)`       | `Fixed(BTreeSet<usize>)`        |
/// | `RandomSubset { from, count }` | `Random { from, count }`        |
/// | `SingleCpu` (no payload)       | `SingleCpu(usize)`              |
/// | `LlcAligned` / `CrossCgroup`   | `Fixed(...)` (resolver expands) |
/// | `SmtSiblingPair` (no payload)  | `Fixed({sibling_a, sibling_b})` |
///
/// Constructor helpers: [`AffinityIntent::exact`] takes any
/// `IntoIterator<Item = usize>` for the `Exact` set;
/// [`AffinityIntent::random_subset`] takes the same iterator shape
/// for the `RandomSubset` pool plus a sample-count argument.
///
/// The `SingleCpu` pair specifically: [`AffinityIntent::SingleCpu`]
/// expresses "pin to one CPU; resolver picks which based on cgroup
/// state and worker index", and [`ResolvedAffinity::SingleCpu`]
/// records the concrete CPU id chosen. Reusing the variant name keeps
/// the pre/post mapping lexically obvious — payload presence
/// distinguishes intent from resolution without renaming the variant.
///
/// [`AffinityIntent::RandomSubset`] carries a pool (`from`) and a
/// per-worker sample size (`count`). `from` is already a concrete set of
/// CPU ids when the variant is built: any topology-aware "pick from
/// cgroup state" logic runs beforehand, not in the spawn path. Only the
/// sampling is deferred: `resolve_affinity_for_cgroup` turns the intent
/// into [`ResolvedAffinity::Random`] (bailing on an empty intersection of
/// the pool with the cgroup cpuset, or `count == 0`), and spawn-time
/// `resolve_affinity` then draws an independent sample for each worker.
/// Construct directly via [`AffinityIntent::random_subset`].
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AffinityIntent {
    /// No affinity constraint -- inherit from parent cgroup. This is the
    /// default intent.
    #[default]
    Inherit,
    /// Pin each worker to a random subset of `from`, sampling `count`
    /// CPUs per worker. Sampling is deferred to spawn time so each
    /// worker gets an independent draw — mirrors
    /// [`ResolvedAffinity::Random`] semantics. `from` must already be a
    /// concrete pool of CPU ids when this variant is built; the scenario
    /// engine pre-resolves topology-aware "pick from cgroup state" intent
    /// before building this variant.
    RandomSubset { from: BTreeSet<usize>, count: usize },
    /// Pin to the CPUs in the worker's LLC.
    LlcAligned,
    /// Pin to all CPUs (crosses cgroup boundaries).
    CrossCgroup,
    /// Pin to a single CPU; the resolver chooses which one.
    SingleCpu,
    /// Pin to an exact set of CPUs.
    Exact(BTreeSet<usize>),
    /// Pin all workers in the group to the two SMT siblings of one
    /// physical core. Tests how the scheduler handles two
    /// compute-bound tasks placed on SMT siblings — both threads
    /// contend for the core's shared front-end / execution
    /// resources, exposing scheduler decisions about co-running
    /// vs. spreading compute load across cores.
    ///
    /// Designed for `WorkType::SmtSiblingSpin` and other
    /// `worker_group_size = 2` variants
    /// (`WorkType::FutexPingPong`, `WorkType::AsymmetricWaker`,
    /// `WorkType::SignalStorm`, etc.) where both workers in a
    /// group are intended to run on a sibling pair. The variant
    /// has no payload — the resolver picks an SMT-sibling pair
    /// from the cgroup's effective cpuset (or the full topology
    /// when no cpuset is active).
    ///
    /// Resolution is performed by the scenario engine's
    /// `resolve_affinity_for_cgroup` (topology-aware, not
    /// available at the bare `WorkloadHandle::spawn` gate). The
    /// resolver searches the cpuset for a physical core with at
    /// least two thread siblings present and resolves to
    /// [`ResolvedAffinity::Fixed`] containing those two CPU IDs.
    /// All workers in the group get pinned to that 2-CPU set;
    /// when `num_workers == 2` the kernel runs one worker on each
    /// sibling, which is the contention pattern this intent
    /// targets.
    ///
    /// Returns an error from the resolver — NOT a silent
    /// fallback — when no SMT-sibling pair is available: either
    /// `threads_per_core == 1`, or the effective cpuset does not
    /// contain both siblings of any physical core. Callers must
    /// handle the error; running `WorkType::SmtSiblingSpin` without
    /// SMT siblings would produce a misleading result.
    SmtSiblingPair,
}
117
118impl AffinityIntent {
119 /// Construct an `Exact` affinity from any iterator of CPU indices.
120 ///
121 /// Accepts arrays, ranges, `Vec`, `BTreeSet`, or any `IntoIterator<Item = usize>`.
122 pub fn exact(cpus: impl IntoIterator<Item = usize>) -> Self {
123 AffinityIntent::Exact(cpus.into_iter().collect())
124 }
125
126 /// Construct a `RandomSubset` from a pool iterator and a sample
127 /// size. Mirrors the [`Self::exact`] constructor's iterator
128 /// flexibility — accepts arrays, `Vec`, `BTreeSet`, ranges, or
129 /// any `IntoIterator<Item = usize>` for the pool.
130 ///
131 /// Sampling is deferred to spawn time; each worker gets an
132 /// independent `count`-sized draw from `from`. `count > from.len()`
133 /// is clamped to `from.len()` at sample time (topology fact, not
134 /// caller error). `count == 0` and empty `from` are rejected at
135 /// the spawn-time affinity gate with an actionable diagnostic —
136 /// use [`AffinityIntent::Inherit`] for no affinity constraint.
137 pub fn random_subset(from: impl IntoIterator<Item = usize>, count: usize) -> Self {
138 AffinityIntent::RandomSubset {
139 from: from.into_iter().collect(),
140 count,
141 }
142 }
143}
144
/// Resolved CPU affinity for a worker process.
///
/// Created from [`AffinityIntent`] at runtime based on topology and
/// cpuset assignments. `resolve_affinity` consumes it and produces the
/// concrete CPU set for the worker's `sched_setaffinity` mask. Variant
/// names track [`AffinityIntent`] where the same shape exists
/// pre/post-resolution; payload presence distinguishes intent from
/// concrete CPU id(s). See the [`AffinityIntent`] type doc for the full
/// pre/post mapping table.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResolvedAffinity {
    /// No affinity constraint. `resolve_affinity` yields `Ok(None)`, i.e.
    /// there is no mask to apply. This is the default.
    #[default]
    None,
    /// Pin to a specific set of CPUs. `resolve_affinity` passes the set
    /// through unchanged and does not reject an empty set here; an empty
    /// mask is refused by the kernel (`EINVAL`) once it reaches
    /// `sched_setaffinity`.
    Fixed(BTreeSet<usize>),
    /// Pin to `count` randomly-chosen CPUs from `from`. Every
    /// `resolve_affinity` call draws a fresh sample, so each worker gets
    /// an independent draw.
    ///
    /// - `count` must be `> 0`; zero is rejected at resolve time
    ///   (previously it coerced silently to 1 and masked caller bugs).
    /// - `count > from.len()` is clamped to `from.len()` — asking for
    ///   more CPUs than the pool contains is a topology fact, not a
    ///   caller error.
    /// - `from` empty with `count > 0` is a caller bug: `resolve_affinity`
    ///   bails (an unsatisfiable sample request would otherwise produce an
    ///   empty `sched_setaffinity` mask that the kernel rejects with
    ///   `EINVAL`). The resolution step that produces this variant — see
    ///   [`crate::scenario::resolve_affinity_for_cgroup`] — bails on
    ///   empty pools before construction; no silent fallback. Direct
    ///   constructor callers (e.g. test fixtures) must do the same: use
    ///   [`ResolvedAffinity::None`] for "no affinity constraint", never
    ///   `Random { from: empty, count: > 0 }`.
    Random { from: BTreeSet<usize>, count: usize },
    /// Pin to a single CPU; `resolve_affinity` yields the one-element
    /// set `{cpu}`.
    SingleCpu(usize),
}
180
181impl ResolvedAffinity {
182 /// Construct a [`ResolvedAffinity::Fixed`] from any iterator over
183 /// CPU ids. Mirrors [`AffinityIntent::exact`].
184 pub fn fixed(cpus: impl IntoIterator<Item = usize>) -> Self {
185 ResolvedAffinity::Fixed(cpus.into_iter().collect())
186 }
187
188 /// Construct a [`ResolvedAffinity::Random`] from a pool iterator
189 /// and a sample count. Mirrors [`AffinityIntent::random_subset`].
190 pub fn random(from: impl IntoIterator<Item = usize>, count: usize) -> Self {
191 ResolvedAffinity::Random {
192 from: from.into_iter().collect(),
193 count,
194 }
195 }
196
197 /// Construct a [`ResolvedAffinity::SingleCpu`].
198 pub const fn single_cpu(cpu: usize) -> Self {
199 ResolvedAffinity::SingleCpu(cpu)
200 }
201}
202
203/// Resolve a [`ResolvedAffinity`] into the concrete CPU set the
204/// spawn pipeline writes into the worker's `sched_setaffinity` mask.
205///
206/// `Random` samples `count` CPUs from `from` per call (each worker
207/// gets an independent draw at spawn time). Empty `from` is a caller
208/// bug and bails — the upstream resolver
209/// [`crate::scenario::resolve_affinity_for_cgroup`] itself bails
210/// (rather than degrading to [`ResolvedAffinity::None`]) on every
211/// path that would produce an empty pool — empty cpuset intersection
212/// against `RandomSubset.from`, `count == 0`, or any other
213/// unsatisfiable shape. Reaching this fn with an empty `Random.from`
214/// therefore indicates a caller that bypassed the resolver. Invalid
215/// input must fail loudly, never silently degrade to "no affinity
216/// applied". `count == 0` likewise bails.
217pub(crate) fn resolve_affinity(mode: &ResolvedAffinity) -> Result<Option<BTreeSet<usize>>> {
218 match mode {
219 ResolvedAffinity::None => Ok(None),
220 ResolvedAffinity::Fixed(cpus) => Ok(Some(cpus.clone())),
221 ResolvedAffinity::SingleCpu(cpu) => Ok(Some([*cpu].into_iter().collect())),
222 ResolvedAffinity::Random { from, count } => {
223 use rand::seq::IndexedRandom;
224 if *count == 0 {
225 anyhow::bail!(
226 "ResolvedAffinity::Random.count must be > 0; a zero count \
227 previously silently coerced to 1, masking caller bugs"
228 );
229 }
230 if from.is_empty() {
231 anyhow::bail!(
232 "ResolvedAffinity::Random.from is empty with count={count}; \
233 a worker cannot be pinned to an empty CPU pool. The \
234 resolution step that produced this Random must reject \
235 the empty set up-front (e.g. via the bail paths in \
236 `crate::scenario::resolve_affinity_for_cgroup`) — \
237 forwarding an unsatisfiable sample request would \
238 silently drop the affinity constraint",
239 count = count,
240 );
241 }
242 let pool: Vec<usize> = from.iter().copied().collect();
243 // Clamp count down to the pool size (user asked for more
244 // CPUs than exist). Silent clamp is fine here: the pool
245 // upper bound is a topology fact, not a caller bug.
246 let count = (*count).min(pool.len());
247 Ok(Some(
248 pool.sample(&mut rand::rng(), count).copied().collect(),
249 ))
250 }
251 }
252}
253
254/// Return the CPU the calling task is currently running on.
255///
256/// Falls back to `0` on syscall failure (rare; would mean
257/// `getcpu(2)` is unavailable, which is not the case on any
258/// supported kernel). Wraps [`nix::sched::sched_getcpu`].
259pub(crate) fn sched_getcpu() -> usize {
260 nix::sched::sched_getcpu().unwrap_or(0)
261}
262
263/// Set per-thread CPU affinity via `sched_setaffinity(2)`.
264///
265/// `pid` must be `> 0` — `pid <= 0` has broadcast semantics at the
266/// syscall level (target the calling task or every task in a tgid
267/// depending on layer) and is rejected up-front so no caller passes
268/// an unchecked `0` through.
269pub fn set_thread_affinity(pid: libc::pid_t, cpus: &BTreeSet<usize>) -> Result<()> {
270 use nix::sched::{CpuSet, sched_setaffinity};
271 use nix::unistd::{Pid, SysconfVar, sysconf};
272 // See `set_sched_policy` for the rationale — pid <= 0 has
273 // broadcast semantics at the syscall and must not be passed
274 // through unchecked.
275 if pid <= 0 {
276 anyhow::bail!("sched_setaffinity: invalid pid {pid} (must be > 0)");
277 }
278 // Snapshot the host's online-CPU count and the cpu_set bitmap
279 // width before the loop so the diagnostic on overflow carries
280 // both numbers without re-syscalling per offending CPU. Render
281 // sysconf failure as "unavailable" rather than 0 to disambiguate
282 // a degenerate sysconf result from a legitimately zero count.
283 let online_cpus_str: std::borrow::Cow<'static, str> =
284 match sysconf(SysconfVar::_NPROCESSORS_ONLN).ok().flatten() {
285 Some(n) => format!("{n}").into(),
286 None => "unavailable".into(),
287 };
288 let cpuset_bitmap_width: usize = libc::CPU_SETSIZE as usize;
289 let mut cpu_set = CpuSet::new();
290 for &cpu in cpus {
291 cpu_set.set(cpu).with_context(|| {
292 format!(
293 "CPU {cpu} out of range: cpu_set bitmap holds CPU IDs \
294 0..{cpuset_bitmap_width} (libc CPU_SETSIZE) and host \
295 reports {online_cpus_str} online CPUs (sysconf \
296 _SC_NPROCESSORS_ONLN). Either the cpuset spec was \
297 resolved against a stale topology or the bitmap cap \
298 needs raising on this build."
299 )
300 })?;
301 }
302 sched_setaffinity(Pid::from_raw(pid), &cpu_set)
303 .with_context(|| format!("sched_setaffinity pid={pid}"))?;
304 Ok(())
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;

    /// Kernel thread id (`gettid(2)`) of the calling thread.
    ///
    /// Affinity tests target their own thread rather than `getpid()`.
    /// Under the libtest harness, `getpid()` is the main thread, which
    /// spawns every test thread, and new threads inherit their creator's
    /// affinity mask. Narrowing the main thread would therefore silently
    /// narrow every test that starts afterwards.
    fn current_tid() -> libc::pid_t {
        // SAFETY: gettid(2) takes no arguments, touches no user memory and
        // cannot fail. The raw syscall avoids depending on the libc version
        // exporting a `gettid` wrapper.
        let tid = unsafe { libc::syscall(libc::SYS_gettid) };
        // Thread ids are bounded by the kernel's pid_max (at most 2^22), so
        // narrowing from c_long cannot truncate.
        tid as libc::pid_t
    }

    /// Ascending list of the CPU ids set in `mask`.
    fn cpus_in(mask: &nix::sched::CpuSet) -> Vec<usize> {
        (0..nix::sched::CpuSet::count())
            .filter(|&cpu| mask.is_set(cpu).unwrap_or(false))
            .collect()
    }

    /// Current affinity mask of the calling thread.
    fn own_affinity() -> nix::sched::CpuSet {
        nix::sched::sched_getaffinity(nix::unistd::Pid::from_raw(0))
            .expect("sched_getaffinity on the calling thread")
    }

    #[test]
    fn resolve_affinity_none() {
        let r = resolve_affinity(&ResolvedAffinity::None).unwrap();
        assert!(r.is_none());
    }
    #[test]
    fn resolve_affinity_fixed() {
        let cpus: BTreeSet<usize> = [0, 1, 2].into_iter().collect();
        let r = resolve_affinity(&ResolvedAffinity::Fixed(cpus.clone())).unwrap();
        assert_eq!(r, Some(cpus));
    }
    #[test]
    fn resolve_affinity_single_cpu() {
        let r = resolve_affinity(&ResolvedAffinity::SingleCpu(5)).unwrap();
        assert_eq!(r, Some([5].into_iter().collect()));
    }
    /// `ResolvedAffinity` derives `Debug`; the `SingleCpu` variant
    /// must render its variant name and the embedded CPU id so
    /// failure-dump and tracing output are diagnosable. Pins the
    /// derive against accidental removal.
    #[test]
    fn resolved_affinity_single_cpu_debug_format() {
        let dbg = format!("{:?}", ResolvedAffinity::SingleCpu(7));
        assert!(
            dbg.contains("SingleCpu"),
            "Debug output must name the variant, got: {dbg}"
        );
        assert!(
            dbg.contains('7'),
            "Debug output must include the CPU id payload, got: {dbg}"
        );
    }
    #[test]
    fn resolve_affinity_random() {
        let from: BTreeSet<usize> = (0..8).collect();
        let r = resolve_affinity(&ResolvedAffinity::Random { from, count: 3 }).unwrap();
        let cpus = r.unwrap();
        assert_eq!(cpus.len(), 3);
        assert!(cpus.iter().all(|c| *c < 8));
    }
    #[test]
    fn resolve_affinity_random_clamps_count() {
        let from: BTreeSet<usize> = [0, 1].into_iter().collect();
        let r = resolve_affinity(&ResolvedAffinity::Random { from, count: 10 }).unwrap();
        assert_eq!(r.unwrap().len(), 2);
    }
    #[test]
    fn resolve_affinity_random_single_cpu_pool() {
        let from: BTreeSet<usize> = [7].into_iter().collect();
        let r = resolve_affinity(&ResolvedAffinity::Random { from, count: 1 }).unwrap();
        assert_eq!(r.unwrap(), [7].into_iter().collect());
    }
    #[test]
    fn affinity_mode_debug_shows_cpus() {
        let a = ResolvedAffinity::Fixed([0, 1, 7].into_iter().collect());
        let s = format!("{:?}", a);
        assert!(s.contains("0"), "must show CPU 0");
        assert!(s.contains("1"), "must show CPU 1");
        assert!(s.contains("7"), "must show CPU 7");
        // Different CPU sets produce different output.
        let b = ResolvedAffinity::Fixed([3, 4].into_iter().collect());
        let s2 = format!("{:?}", b);
        assert!(s2.contains("3"), "must show CPU 3");
        assert_ne!(
            s, s2,
            "different CPU sets must produce different debug output"
        );
    }
    #[test]
    fn affinity_mode_clone_preserves_cpus() {
        let cpus: BTreeSet<usize> = [2, 5, 7].into_iter().collect();
        let a = ResolvedAffinity::Random {
            from: cpus.clone(),
            count: 2,
        };
        let b = a.clone();
        match b {
            ResolvedAffinity::Random { from, count } => {
                assert_eq!(from, cpus, "cloned from set must match original");
                assert_eq!(count, 2, "cloned count must match original");
            }
            _ => panic!("clone must preserve variant"),
        }
    }
    // -- resolve_affinity edge cases --

    #[test]
    fn resolve_affinity_random_zero_count_rejected() {
        // Regression: count=0 previously coerced silently to 1, masking
        // caller bugs. Now it must return an Err.
        let from: BTreeSet<usize> = (0..4).collect();
        let err = resolve_affinity(&ResolvedAffinity::Random { from, count: 0 }).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("count") && msg.contains("> 0"),
            "error must name the field: {msg}"
        );
    }
    #[test]
    fn resolve_affinity_random_empty_pool_bails() {
        // Empty Random.from with count > 0 is unsatisfiable: a worker
        // cannot be pinned to an empty CPU pool, and the prior
        // silent-degrade-to-Ok(None) was a silent-drop bug. The bail
        // forces the bug to surface and points the caller at the
        // upstream resolver as the place where empty pools should be
        // rejected up-front.
        let from: BTreeSet<usize> = BTreeSet::new();
        let err = resolve_affinity(&ResolvedAffinity::Random { from, count: 1 }).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("empty") && msg.contains("count=1"),
            "diagnostic must name the empty pool and the count: got {msg}",
        );
        // Also pin the actionable suggestion — names the upstream
        // resolver as the place callers should reject the empty set
        // so a caller hitting this error learns where to plug the hole.
        assert!(
            msg.contains("resolve_affinity_for_cgroup"),
            "diagnostic must point to the upstream resolver so callers \
             learn where the empty pool should have been rejected: got {msg}",
        );
    }

    /// `sched_getcpu` must report a CPU the calling thread is actually
    /// allowed to run on.
    ///
    /// Checks membership in the thread's affinity mask rather than
    /// comparing against `available_parallelism()`. The latter is a
    /// *count* of usable CPUs (affinity mask and cgroup quota), not an
    /// upper bound on CPU ids. A `cpu < available_parallelism()` check
    /// therefore fails spuriously under `taskset -c 4-7` or a container
    /// CPU quota.
    #[test]
    fn sched_getcpu_valid() {
        let allowed = own_affinity();
        let cpu = sched_getcpu();
        assert!(
            allowed.is_set(cpu).unwrap_or(false),
            "sched_getcpu returned CPU {cpu}, outside the calling thread's affinity mask {:?}",
            cpus_in(&allowed)
        );
    }

    /// Pinning the calling thread to one CPU it is allowed on succeeds
    /// and narrows its mask to exactly that CPU.
    ///
    /// Uses the first CPU in the current mask instead of hard-coding
    /// CPU 0, which may lie outside the test's cpuset. Targets this
    /// thread rather than `getpid()` (see `current_tid`). Restores the
    /// original mask before asserting.
    #[test]
    fn set_thread_affinity_pins_calling_thread() {
        let original = own_affinity();
        let target = *cpus_in(&original)
            .first()
            .expect("calling thread must be allowed on at least one CPU");
        let cpus: BTreeSet<usize> = [target].into_iter().collect();

        let result = set_thread_affinity(current_tid(), &cpus);
        let pinned = cpus_in(&own_affinity());
        nix::sched::sched_setaffinity(nix::unistd::Pid::from_raw(0), &original)
            .expect("restore the calling thread's original affinity");

        assert!(
            result.is_ok(),
            "pinning to allowed CPU {target} should succeed: {result:?}"
        );
        assert_eq!(
            pinned,
            vec![target],
            "mask must be narrowed to exactly CPU {target}"
        );
    }

    /// `pid <= 0` must be rejected before the syscall: `0` would target
    /// the calling thread instead of the intended worker.
    #[test]
    fn set_thread_affinity_rejects_non_positive_pid() {
        let cpus: BTreeSet<usize> = [0].into_iter().collect();
        for pid in [0, -1] {
            let err = set_thread_affinity(pid, &cpus).unwrap_err();
            let msg = format!("{err}");
            assert!(
                msg.contains("invalid pid"),
                "pid {pid} must be rejected up-front: {msg}"
            );
        }
    }

    /// A CPU id beyond the `cpu_set_t` bitmap must produce the
    /// out-of-range diagnostic, not a panic or a truncated mask.
    #[test]
    fn set_thread_affinity_rejects_out_of_range_cpu() {
        let cpus: BTreeSet<usize> = [usize::MAX].into_iter().collect();
        let err = set_thread_affinity(current_tid(), &cpus).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("out of range"),
            "diagnostic must flag the CPU id: {msg}"
        );
    }

    /// An empty mask is unsatisfiable and must surface as an error,
    /// never as a silent success that leaves the affinity unchanged.
    #[test]
    fn set_thread_affinity_rejects_empty_set() {
        let result = set_thread_affinity(current_tid(), &BTreeSet::new());
        assert!(result.is_err(), "empty CPU set must be rejected: {result:?}");
    }

    /// GAP 7: pin that the three constructors produce the same
    /// values a direct variant construction yields. A regression
    /// where `fixed(iter)` started normalising the input would
    /// silently shift downstream semantics.
    #[test]
    fn resolved_affinity_constructors_match_direct_variants() {
        let from_ctor = ResolvedAffinity::fixed([0_usize, 1, 2]);
        let from_variant = ResolvedAffinity::Fixed([0_usize, 1, 2].into_iter().collect());
        assert_eq!(from_ctor, from_variant);

        let from_ctor = ResolvedAffinity::random([0_usize, 1, 2, 3], 2);
        let from_variant = ResolvedAffinity::Random {
            from: [0_usize, 1, 2, 3].into_iter().collect(),
            count: 2,
        };
        assert_eq!(from_ctor, from_variant);

        let from_ctor = ResolvedAffinity::single_cpu(5);
        let from_variant = ResolvedAffinity::SingleCpu(5);
        assert_eq!(from_ctor, from_variant);
    }

    // Compile-time pin: `ResolvedAffinity::single_cpu` is `pub const
    // fn`. A regression that drops `const` (e.g. switches to a body
    // requiring runtime allocation) would silently break the
    // const-context use case. The `const _` binding fails to
    // type-check if `single_cpu` is no longer const-evaluable.
    const _: ResolvedAffinity = ResolvedAffinity::single_cpu(7);

    /// GAP 8: pin that `ResolvedAffinity::default()` is `None`
    /// and that every variant roundtrips through serde unchanged,
    /// including empty-payload edge cases for `Fixed` and `Random`.
    /// Serde drift on any variant breaks failure-dump replay and
    /// captured-workload reproduction — the persisted JSON would
    /// deserialize into a different variant or lose payload data.
    /// A regression that landed `#[serde(skip_serializing_if =
    /// "BTreeSet::is_empty")]` on the inner field would silently
    /// drop the variant tag on the empty case; pinning empty
    /// payloads catches that.
    #[test]
    fn resolved_affinity_default_is_none_and_serde_roundtrip_per_variant() {
        let d: ResolvedAffinity = Default::default();
        assert_eq!(d, ResolvedAffinity::None);

        let variants = [
            ResolvedAffinity::None,
            ResolvedAffinity::Fixed([0_usize, 1, 5].into_iter().collect()),
            ResolvedAffinity::Fixed(BTreeSet::new()),
            ResolvedAffinity::Random {
                from: [0_usize, 1, 2, 3, 4].into_iter().collect(),
                count: 2,
            },
            ResolvedAffinity::Random {
                from: BTreeSet::new(),
                count: 0,
            },
            ResolvedAffinity::SingleCpu(7),
        ];
        for original in &variants {
            let bytes = serde_json::to_vec(original).expect("serialize");
            let restored: ResolvedAffinity = serde_json::from_slice(&bytes).expect("deserialize");
            assert_eq!(restored, *original, "roundtrip drift for {original:?}");
        }
    }
}