ktstr/workload/
affinity.rs

1//! CPU affinity intent + resolution for worker tasks.
2//!
3//! [`AffinityIntent`] expresses the test author's request before
4//! topology resolution; [`ResolvedAffinity`] is the post-resolution
//! shape the spawn pipeline consumes. [`resolve_affinity`] turns a
//! `ResolvedAffinity` into a concrete CPU set; [`set_thread_affinity`]
//! issues the `sched_setaffinity` syscall; [`sched_getcpu`] reports
//! which CPU the calling task is running on right now.
9//!
10//! Re-exported from the parent module via `pub use affinity::*` so
11//! `crate::workload::AffinityIntent` etc. stay valid.
12
13use std::collections::BTreeSet;
14
15use anyhow::{Context, Result};
16
17/// Scenario-level affinity intent for a group of workers.
18///
19/// Resolved to a concrete [`ResolvedAffinity`] at runtime based on the
20/// cgroup's effective cpuset and the VM's topology. When attached to
21/// a [`WorkSpec`](crate::workload::WorkSpec), determines per-worker `sched_setaffinity` masks.
22///
23/// Resolution uses [`resolve_affinity_for_cgroup()`](crate::scenario::resolve_affinity_for_cgroup).
24///
25/// # Naming pattern (Intent vs Resolved)
26///
27/// [`AffinityIntent`] and [`ResolvedAffinity`] form a pre/post-resolution
28/// pair. Variant names line up where the same shape exists on both
29/// sides; payload differences encode the intent → concrete-CPU-set
30/// distinction:
31///
32/// | [`AffinityIntent`]                       | [`ResolvedAffinity`]              |
33/// |------------------------------------------|-----------------------------------|
34/// | `Inherit` (no payload)                   | `None`                            |
35/// | `Exact(BTreeSet<usize>)`                 | `Fixed(BTreeSet<usize>)`          |
36/// | `RandomSubset { from, count }`           | `Random { from, count }`          |
37/// | `SingleCpu` (no payload)                 | `SingleCpu(usize)`                |
38/// | `LlcAligned` / `CrossCgroup`             | `Fixed(...)` (resolver expands)   |
39/// | `SmtSiblingPair` (no payload)            | `Fixed({sibling_a, sibling_b})`   |
40///
41/// Constructor helpers: [`AffinityIntent::exact`] takes any
42/// `IntoIterator<Item = usize>` for the `Exact` set;
43/// [`AffinityIntent::random_subset`] takes the same iterator shape
44/// for the `RandomSubset` pool plus a sample-count argument.
45///
46/// The `SingleCpu` pair specifically: [`AffinityIntent::SingleCpu`]
47/// expresses "pin to one CPU; resolver picks which based on cgroup
48/// state and worker index", and [`ResolvedAffinity::SingleCpu`]
49/// records the concrete CPU id chosen. Reusing the variant name keeps
50/// the pre/post mapping lexically obvious — payload presence
51/// distinguishes intent from resolution without renaming the variant.
52///
53/// [`AffinityIntent::RandomSubset`] carries the resolved pool
54/// (`from`) and sample size (`count`) — sampling itself is deferred
55/// to spawn time so each worker gets an independent draw. The
56/// scenario engine's `resolve_affinity_for_cgroup` materialises the
57/// pool from cgroup cpuset / topology before constructing this
58/// variant; spawn-time `resolve_affinity` samples per-worker.
59/// Construct directly via [`AffinityIntent::random_subset`].
60#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
61#[serde(rename_all = "snake_case")]
62pub enum AffinityIntent {
63    /// No affinity constraint -- inherit from parent cgroup.
64    #[default]
65    Inherit,
66    /// Pin each worker to a random subset of `from`, sampling `count`
67    /// CPUs per worker. Sampling is deferred to spawn time so each
68    /// worker gets an independent draw — mirrors
69    /// [`ResolvedAffinity::Random`] semantics. Construct with the
70    /// resolved pool already materialised; the scenario engine pre-
71    /// resolves topology-aware "pick from cgroup state" intent
72    /// before building this variant.
73    RandomSubset { from: BTreeSet<usize>, count: usize },
74    /// Pin to the CPUs in the worker's LLC.
75    LlcAligned,
76    /// Pin to all CPUs (crosses cgroup boundaries).
77    CrossCgroup,
78    /// Pin to a single CPU.
79    SingleCpu,
80    /// Pin to an exact set of CPUs.
81    Exact(BTreeSet<usize>),
82    /// Pin all workers in the group to the two SMT siblings of one
83    /// physical core. Tests how the scheduler handles two
84    /// compute-bound tasks placed on SMT siblings — both threads
85    /// contend for the core's shared front-end / execution
86    /// resources, exposing scheduler decisions about co-running
87    /// vs. spreading compute load across cores.
88    ///
89    /// Designed for `WorkType::SmtSiblingSpin` and other
90    /// `worker_group_size = 2` variants
91    /// (`WorkType::FutexPingPong`, `WorkType::AsymmetricWaker`,
92    /// `WorkType::SignalStorm`, etc.) where both workers in a
93    /// group are intended to run on a sibling pair. The variant
94    /// has no payload — the resolver picks an SMT-sibling pair
95    /// from the cgroup's effective cpuset (or the full topology
96    /// when no cpuset is active).
97    ///
98    /// Resolution is performed by the scenario engine's
99    /// `resolve_affinity_for_cgroup` (topology-aware, not
100    /// available at the bare `WorkloadHandle::spawn` gate). The
101    /// resolver searches the cpuset for a physical core with at
102    /// least two thread siblings present and resolves to
103    /// [`ResolvedAffinity::Fixed`] containing those two CPU IDs.
104    /// All workers in the group get pinned to that 2-CPU set;
105    /// when `num_workers == 2` the kernel runs one worker on each
106    /// sibling, which is the contention pattern this intent
107    /// targets.
108    ///
109    /// Returns an error from the resolver — NOT a silent
110    /// fallback — when no SMT-sibling pair is available
111    /// (`threads_per_core == 1`, or the cpuset isolates each
112    /// sibling onto a different CPU set). Callers must handle
113    /// the error; running `WorkType::SmtSiblingSpin` without
114    /// SMT siblings would produce a misleading result.
115    SmtSiblingPair,
116}
117
118impl AffinityIntent {
119    /// Construct an `Exact` affinity from any iterator of CPU indices.
120    ///
121    /// Accepts arrays, ranges, `Vec`, `BTreeSet`, or any `IntoIterator<Item = usize>`.
122    pub fn exact(cpus: impl IntoIterator<Item = usize>) -> Self {
123        AffinityIntent::Exact(cpus.into_iter().collect())
124    }
125
126    /// Construct a `RandomSubset` from a pool iterator and a sample
127    /// size. Mirrors the [`Self::exact`] constructor's iterator
128    /// flexibility — accepts arrays, `Vec`, `BTreeSet`, ranges, or
129    /// any `IntoIterator<Item = usize>` for the pool.
130    ///
131    /// Sampling is deferred to spawn time; each worker gets an
132    /// independent `count`-sized draw from `from`. `count > from.len()`
133    /// is clamped to `from.len()` at sample time (topology fact, not
134    /// caller error). `count == 0` and empty `from` are rejected at
135    /// the spawn-time affinity gate with an actionable diagnostic —
136    /// use [`AffinityIntent::Inherit`] for no affinity constraint.
137    pub fn random_subset(from: impl IntoIterator<Item = usize>, count: usize) -> Self {
138        AffinityIntent::RandomSubset {
139            from: from.into_iter().collect(),
140            count,
141        }
142    }
143}
144
145/// Resolved CPU affinity for a worker process.
146///
147/// Created from [`AffinityIntent`] at runtime based on topology and
148/// cpuset assignments. Variant names track [`AffinityIntent`] where the
149/// same shape exists pre/post-resolution; payload presence
150/// distinguishes intent from concrete CPU id(s). See the
151/// [`AffinityIntent`] type doc for the full pre/post mapping table.
152#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
153#[serde(rename_all = "snake_case")]
154pub enum ResolvedAffinity {
155    /// No affinity constraint.
156    #[default]
157    None,
158    /// Pin to a specific set of CPUs.
159    Fixed(BTreeSet<usize>),
160    /// Pin to `count` randomly-chosen CPUs from `from`.
161    ///
162    /// - `count` must be `> 0`; zero is rejected at resolve time
163    ///   (previously it coerced silently to 1 and masked caller bugs).
164    /// - `count > from.len()` is clamped to `from.len()` — asking for
165    ///   more CPUs than the pool contains is a topology fact, not a
166    ///   caller error.
167    /// - `from` empty with `count > 0` is a caller bug: `resolve_affinity`
168    ///   bails (an unsatisfiable sample request would otherwise produce an
169    ///   empty `sched_setaffinity` mask that the kernel rejects with
170    ///   `EINVAL`). The resolution step that produces this variant — see
171    ///   [`crate::scenario::resolve_affinity_for_cgroup`] — bails on
172    ///   empty pools before construction; no silent fallback. Direct
173    ///   constructor callers (e.g. test fixtures) must do the same: use
174    ///   [`ResolvedAffinity::None`] for "no affinity constraint", never
175    ///   `Random { from: empty, count: > 0 }`.
176    Random { from: BTreeSet<usize>, count: usize },
177    /// Pin to a single CPU.
178    SingleCpu(usize),
179}
180
181impl ResolvedAffinity {
182    /// Construct a [`ResolvedAffinity::Fixed`] from any iterator over
183    /// CPU ids. Mirrors [`AffinityIntent::exact`].
184    pub fn fixed(cpus: impl IntoIterator<Item = usize>) -> Self {
185        ResolvedAffinity::Fixed(cpus.into_iter().collect())
186    }
187
188    /// Construct a [`ResolvedAffinity::Random`] from a pool iterator
189    /// and a sample count. Mirrors [`AffinityIntent::random_subset`].
190    pub fn random(from: impl IntoIterator<Item = usize>, count: usize) -> Self {
191        ResolvedAffinity::Random {
192            from: from.into_iter().collect(),
193            count,
194        }
195    }
196
197    /// Construct a [`ResolvedAffinity::SingleCpu`].
198    pub const fn single_cpu(cpu: usize) -> Self {
199        ResolvedAffinity::SingleCpu(cpu)
200    }
201}
202
203/// Resolve a [`ResolvedAffinity`] into the concrete CPU set the
204/// spawn pipeline writes into the worker's `sched_setaffinity` mask.
205///
206/// `Random` samples `count` CPUs from `from` per call (each worker
207/// gets an independent draw at spawn time). Empty `from` is a caller
208/// bug and bails — the upstream resolver
209/// [`crate::scenario::resolve_affinity_for_cgroup`] itself bails
210/// (rather than degrading to [`ResolvedAffinity::None`]) on every
211/// path that would produce an empty pool — empty cpuset intersection
212/// against `RandomSubset.from`, `count == 0`, or any other
213/// unsatisfiable shape. Reaching this fn with an empty `Random.from`
214/// therefore indicates a caller that bypassed the resolver. Invalid
215/// input must fail loudly, never silently degrade to "no affinity
216/// applied". `count == 0` likewise bails.
217pub(crate) fn resolve_affinity(mode: &ResolvedAffinity) -> Result<Option<BTreeSet<usize>>> {
218    match mode {
219        ResolvedAffinity::None => Ok(None),
220        ResolvedAffinity::Fixed(cpus) => Ok(Some(cpus.clone())),
221        ResolvedAffinity::SingleCpu(cpu) => Ok(Some([*cpu].into_iter().collect())),
222        ResolvedAffinity::Random { from, count } => {
223            use rand::seq::IndexedRandom;
224            if *count == 0 {
225                anyhow::bail!(
226                    "ResolvedAffinity::Random.count must be > 0; a zero count \
227                     previously silently coerced to 1, masking caller bugs"
228                );
229            }
230            if from.is_empty() {
231                anyhow::bail!(
232                    "ResolvedAffinity::Random.from is empty with count={count}; \
233                     a worker cannot be pinned to an empty CPU pool. The \
234                     resolution step that produced this Random must reject \
235                     the empty set up-front (e.g. via the bail paths in \
236                     `crate::scenario::resolve_affinity_for_cgroup`) — \
237                     forwarding an unsatisfiable sample request would \
238                     silently drop the affinity constraint",
239                    count = count,
240                );
241            }
242            let pool: Vec<usize> = from.iter().copied().collect();
243            // Clamp count down to the pool size (user asked for more
244            // CPUs than exist). Silent clamp is fine here: the pool
245            // upper bound is a topology fact, not a caller bug.
246            let count = (*count).min(pool.len());
247            Ok(Some(
248                pool.sample(&mut rand::rng(), count).copied().collect(),
249            ))
250        }
251    }
252}
253
254/// Return the CPU the calling task is currently running on.
255///
256/// Falls back to `0` on syscall failure (rare; would mean
257/// `getcpu(2)` is unavailable, which is not the case on any
258/// supported kernel). Wraps [`nix::sched::sched_getcpu`].
259pub(crate) fn sched_getcpu() -> usize {
260    nix::sched::sched_getcpu().unwrap_or(0)
261}
262
263/// Set per-thread CPU affinity via `sched_setaffinity(2)`.
264///
265/// `pid` must be `> 0` — `pid <= 0` has broadcast semantics at the
266/// syscall level (target the calling task or every task in a tgid
267/// depending on layer) and is rejected up-front so no caller passes
268/// an unchecked `0` through.
269pub fn set_thread_affinity(pid: libc::pid_t, cpus: &BTreeSet<usize>) -> Result<()> {
270    use nix::sched::{CpuSet, sched_setaffinity};
271    use nix::unistd::{Pid, SysconfVar, sysconf};
272    // See `set_sched_policy` for the rationale — pid <= 0 has
273    // broadcast semantics at the syscall and must not be passed
274    // through unchecked.
275    if pid <= 0 {
276        anyhow::bail!("sched_setaffinity: invalid pid {pid} (must be > 0)");
277    }
278    // Snapshot the host's online-CPU count and the cpu_set bitmap
279    // width before the loop so the diagnostic on overflow carries
280    // both numbers without re-syscalling per offending CPU. Render
281    // sysconf failure as "unavailable" rather than 0 to disambiguate
282    // a degenerate sysconf result from a legitimately zero count.
283    let online_cpus_str: std::borrow::Cow<'static, str> =
284        match sysconf(SysconfVar::_NPROCESSORS_ONLN).ok().flatten() {
285            Some(n) => format!("{n}").into(),
286            None => "unavailable".into(),
287        };
288    let cpuset_bitmap_width: usize = libc::CPU_SETSIZE as usize;
289    let mut cpu_set = CpuSet::new();
290    for &cpu in cpus {
291        cpu_set.set(cpu).with_context(|| {
292            format!(
293                "CPU {cpu} out of range: cpu_set bitmap holds CPU IDs \
294                 0..{cpuset_bitmap_width} (libc CPU_SETSIZE) and host \
295                 reports {online_cpus_str} online CPUs (sysconf \
296                 _SC_NPROCESSORS_ONLN). Either the cpuset spec was \
297                 resolved against a stale topology or the bitmap cap \
298                 needs raising on this build."
299            )
300        })?;
301    }
302    sched_setaffinity(Pid::from_raw(pid), &cpu_set)
303        .with_context(|| format!("sched_setaffinity pid={pid}"))?;
304    Ok(())
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;

    /// Kernel thread id of the calling thread (the test's own thread,
    /// not the harness main thread that `getpid()` would name).
    fn current_tid() -> libc::pid_t {
        // SAFETY: SYS_gettid takes no arguments, has no side effects and
        // cannot fail; it only returns the caller's thread id.
        (unsafe { libc::syscall(libc::SYS_gettid) }) as libc::pid_t
    }

    #[test]
    fn resolve_affinity_none() {
        let r = resolve_affinity(&ResolvedAffinity::None).unwrap();
        assert!(r.is_none());
    }
    #[test]
    fn resolve_affinity_fixed() {
        let cpus: BTreeSet<usize> = [0, 1, 2].into_iter().collect();
        let r = resolve_affinity(&ResolvedAffinity::Fixed(cpus.clone())).unwrap();
        assert_eq!(r, Some(cpus));
    }
    #[test]
    fn resolve_affinity_single_cpu() {
        let r = resolve_affinity(&ResolvedAffinity::SingleCpu(5)).unwrap();
        assert_eq!(r, Some([5].into_iter().collect()));
    }
    /// `ResolvedAffinity` derives `Debug`; the `SingleCpu` variant
    /// must render its variant name and the embedded CPU id so
    /// failure-dump and tracing output are diagnosable. Pins the
    /// derive against accidental removal.
    #[test]
    fn resolved_affinity_single_cpu_debug_format() {
        let dbg = format!("{:?}", ResolvedAffinity::SingleCpu(7));
        assert!(
            dbg.contains("SingleCpu"),
            "Debug output must name the variant, got: {dbg}"
        );
        assert!(
            dbg.contains('7'),
            "Debug output must include the CPU id payload, got: {dbg}"
        );
    }
    #[test]
    fn resolve_affinity_random() {
        let from: BTreeSet<usize> = (0..8).collect();
        let r = resolve_affinity(&ResolvedAffinity::Random { from, count: 3 }).unwrap();
        let cpus = r.unwrap();
        assert_eq!(cpus.len(), 3);
        assert!(cpus.iter().all(|c| *c < 8));
    }
    #[test]
    fn resolve_affinity_random_clamps_count() {
        let from: BTreeSet<usize> = [0, 1].into_iter().collect();
        let r = resolve_affinity(&ResolvedAffinity::Random { from, count: 10 }).unwrap();
        assert_eq!(r.unwrap().len(), 2);
    }
    #[test]
    fn resolve_affinity_random_single_cpu_pool() {
        let from: BTreeSet<usize> = [7].into_iter().collect();
        let r = resolve_affinity(&ResolvedAffinity::Random { from, count: 1 }).unwrap();
        assert_eq!(r.unwrap(), [7].into_iter().collect());
    }
    #[test]
    fn affinity_mode_debug_shows_cpus() {
        let a = ResolvedAffinity::Fixed([0, 1, 7].into_iter().collect());
        let s = format!("{:?}", a);
        assert!(s.contains("0"), "must show CPU 0");
        assert!(s.contains("1"), "must show CPU 1");
        assert!(s.contains("7"), "must show CPU 7");
        // Different CPU sets produce different output.
        let b = ResolvedAffinity::Fixed([3, 4].into_iter().collect());
        let s2 = format!("{:?}", b);
        assert!(s2.contains("3"), "must show CPU 3");
        assert_ne!(
            s, s2,
            "different CPU sets must produce different debug output"
        );
    }
    #[test]
    fn affinity_mode_clone_preserves_cpus() {
        let cpus: BTreeSet<usize> = [2, 5, 7].into_iter().collect();
        let a = ResolvedAffinity::Random {
            from: cpus.clone(),
            count: 2,
        };
        let b = a.clone();
        match b {
            ResolvedAffinity::Random { from, count } => {
                assert_eq!(from, cpus, "cloned from set must match original");
                assert_eq!(count, 2, "cloned count must match original");
            }
            _ => panic!("clone must preserve variant"),
        }
    }
    // -- resolve_affinity edge cases --

    #[test]
    fn resolve_affinity_random_zero_count_rejected() {
        // Regression: count=0 previously coerced silently to 1, masking
        // caller bugs. Now it must return an Err.
        let from: BTreeSet<usize> = (0..4).collect();
        let err = resolve_affinity(&ResolvedAffinity::Random { from, count: 0 }).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("count") && msg.contains("> 0"),
            "error must name the field: {msg}"
        );
    }
    #[test]
    fn resolve_affinity_random_empty_pool_bails() {
        // Empty Random.from with count > 0 is unsatisfiable: a worker
        // cannot be pinned to an empty CPU pool, and the prior
        // silent-degrade-to-Ok(None) was a silent-drop bug. The bail
        // forces the bug to surface and points the caller at the
        // upstream resolver as the place where empty pools should be
        // rejected up-front.
        let from: BTreeSet<usize> = BTreeSet::new();
        let err = resolve_affinity(&ResolvedAffinity::Random { from, count: 1 }).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("empty") && msg.contains("count=1"),
            "diagnostic must name the empty pool and the count: got {msg}",
        );
        // Also pin the actionable suggestion — names the upstream
        // resolver as the place callers should reject the empty set
        // so a caller hitting this error learns where to plug the hole.
        assert!(
            msg.contains("resolve_affinity_for_cgroup"),
            "diagnostic must point to the upstream resolver so callers \
             learn where the empty pool should have been rejected: got {msg}",
        );
    }

    /// `sched_getcpu` must report a CPU that the calling thread is
    /// actually allowed to run on.
    ///
    /// The check uses the thread's own affinity mask rather than
    /// `available_parallelism()`. That function counts *allowed* CPUs
    /// and honours cgroup quotas, so under e.g. `taskset -c 4-7` a valid
    /// CPU id of 5 would exceed a parallelism of 4 and fail spuriously.
    #[test]
    fn sched_getcpu_valid() {
        use nix::sched::sched_getaffinity;
        use nix::unistd::Pid;
        let cpu = sched_getcpu();
        // Pid 0 selects the calling thread.
        let allowed = sched_getaffinity(Pid::from_raw(0)).expect("sched_getaffinity(self)");
        assert!(
            allowed.is_set(cpu).unwrap_or(false),
            "cpu {cpu} is not in the calling thread's affinity mask"
        );
    }

    /// Pin the current *test thread* to one CPU it is already allowed
    /// on, check that the mask took effect, then restore the original
    /// mask.
    ///
    /// Targeting `getpid()` would pin the harness's main thread instead.
    /// Every test thread spawned after that would inherit the narrowed
    /// mask. Hard-coding CPU 0 would also fail on hosts whose cpuset
    /// excludes CPU 0.
    #[test]
    fn set_thread_affinity_pins_current_thread() {
        use nix::sched::{CpuSet, sched_getaffinity, sched_setaffinity};
        use nix::unistd::Pid;
        let self_pid = Pid::from_raw(0);
        let original = sched_getaffinity(self_pid).expect("sched_getaffinity(self)");
        let target = (0..CpuSet::count())
            .find(|&c| original.is_set(c).unwrap_or(false))
            .expect("calling thread must be allowed on at least one CPU");
        let cpus: BTreeSet<usize> = [target].into_iter().collect();

        let result = set_thread_affinity(current_tid(), &cpus);
        let applied = sched_getaffinity(self_pid);
        // Restore before asserting so a failure cannot leave this
        // harness thread (which may run later tests) narrowed.
        sched_setaffinity(self_pid, &original).expect("restore original affinity mask");

        result.expect("pinning to an allowed CPU should succeed");
        let applied = applied.expect("sched_getaffinity after pin");
        let pinned: Vec<usize> = (0..CpuSet::count())
            .filter(|&c| applied.is_set(c).unwrap_or(false))
            .collect();
        assert_eq!(pinned, vec![target], "mask must contain exactly the requested CPU");
    }

    /// `pid <= 0` is rejected before any syscall, so no task's mask is
    /// touched.
    #[test]
    fn set_thread_affinity_rejects_non_positive_pid() {
        let cpus: BTreeSet<usize> = [0].into_iter().collect();
        for pid in [0, -1] {
            let err = set_thread_affinity(pid, &cpus).unwrap_err();
            let msg = format!("{err}");
            assert!(msg.contains("invalid pid"), "pid {pid}: got {msg}");
        }
    }

    /// An empty CPU set can never be applied. The call must fail and
    /// leave the calling thread's mask unchanged.
    #[test]
    fn set_thread_affinity_rejects_empty_set() {
        assert!(set_thread_affinity(current_tid(), &BTreeSet::new()).is_err());
    }

    /// GAP 7: pin that the three constructors produce the same
    /// values a direct variant construction yields. A regression
    /// where `fixed(iter)` started normalising the input would
    /// silently shift downstream semantics.
    #[test]
    fn resolved_affinity_constructors_match_direct_variants() {
        let from_ctor = ResolvedAffinity::fixed([0_usize, 1, 2]);
        let from_variant = ResolvedAffinity::Fixed([0_usize, 1, 2].into_iter().collect());
        assert_eq!(from_ctor, from_variant);

        let from_ctor = ResolvedAffinity::random([0_usize, 1, 2, 3], 2);
        let from_variant = ResolvedAffinity::Random {
            from: [0_usize, 1, 2, 3].into_iter().collect(),
            count: 2,
        };
        assert_eq!(from_ctor, from_variant);

        let from_ctor = ResolvedAffinity::single_cpu(5);
        let from_variant = ResolvedAffinity::SingleCpu(5);
        assert_eq!(from_ctor, from_variant);
    }

    // Compile-time pin: `ResolvedAffinity::single_cpu` is `pub const
    // fn`. A regression that drops `const` (e.g. switches to a body
    // requiring runtime allocation) would silently break the
    // const-context use case. The `const _` binding fails to
    // type-check if `single_cpu` is no longer const-evaluable.
    const _: ResolvedAffinity = ResolvedAffinity::single_cpu(7);

    /// GAP 8: pin that `ResolvedAffinity::default()` is `None`
    /// and that every variant roundtrips through serde unchanged,
    /// including empty-payload edge cases for `Fixed` and `Random`.
    /// Serde drift on any variant breaks failure-dump replay and
    /// captured-workload reproduction — the persisted JSON would
    /// deserialize into a different variant or lose payload data.
    /// A regression that landed `#[serde(skip_serializing_if =
    /// "BTreeSet::is_empty")]` on the inner field would silently
    /// drop the variant tag on the empty case; pinning empty
    /// payloads catches that.
    #[test]
    fn resolved_affinity_default_is_none_and_serde_roundtrip_per_variant() {
        let d: ResolvedAffinity = Default::default();
        assert_eq!(d, ResolvedAffinity::None);

        let variants = [
            ResolvedAffinity::None,
            ResolvedAffinity::Fixed([0_usize, 1, 5].into_iter().collect()),
            ResolvedAffinity::Fixed(BTreeSet::new()),
            ResolvedAffinity::Random {
                from: [0_usize, 1, 2, 3, 4].into_iter().collect(),
                count: 2,
            },
            ResolvedAffinity::Random {
                from: BTreeSet::new(),
                count: 0,
            },
            ResolvedAffinity::SingleCpu(7),
        ];
        for original in &variants {
            let bytes = serde_json::to_vec(original).expect("serialize");
            let restored: ResolvedAffinity = serde_json::from_slice(&bytes).expect("deserialize");
            assert_eq!(restored, *original, "roundtrip drift for {original:?}");
        }
    }
}