ktstr/ctprof_compare/groups.rs
1//! Group construction, per-group aggregation, and the cgroup-path
2//! flatten helpers consumed by [`super::compare()`].
3//!
4//! Three layers:
5//!
6//! 1. [`build_groups`] / [`build_cgroup_key_map`] — partition a
7//! snapshot's threads by [`super::GroupBy`] axis (pcomm, cgroup,
8//! comm, comm-exact), apply pattern-aware skeleton clustering
9//! where applicable, and produce a `BTreeMap<key, ThreadGroup>`
10//! keyed by the post-tightening join key. The N:1 pattern-bucket
11//! promotion ([`super::CompareOptions::no_thread_normalize`])
12//! determines whether buckets share a token-normalized skeleton
13//! or stay at the literal `pcomm` / `comm` value.
14//!
15//! 2. [`aggregate`] / [`mode_aggregate`] / [`build_row`] — apply
16//! per-metric reduction rules ([`super::AggRule`]) across a
17//! group's threads to produce [`super::Aggregated`] values, then
18//! materialize one [`super::DiffRow`] per `(group, metric)` pair
19//! with side-by-side baseline + candidate values and the
20//! precomputed delta / pct cells. The Mode arm dispatches
21//! through [`mode_aggregate`] so the categorical empty-bucket
22//! contract lives in one place; the numeric arms delegate to
23//! the typed traits in [`crate::metric_types`]
24//! (`Summable::sum_across`, `Maxable::max_across`,
25//! `Rangeable::range_across`).
26//!
27//! 3. [`collect_smaps_rollup`] / [`collect_smaps_rollup_hierarchical`]
28//! — pull the per-process smaps_rollup map (from the leader
29//! thread of each tgid) and key by the same pattern as the
30//! primary-table buckets so byte-counts join across snapshots
31//! when PIDs are ephemeral. The hierarchical variant honors
32//! cgroup-flatten patterns and the auto-normalize key map so
33//! smaps rows align with the primary table even after
34//! cgroup-path tightening.
35//!
36//! [`flatten_cgroup_path`] / [`compile_flatten_patterns`] are
37//! the helpers the caller side ([`super::compare()`] +
38//! [`super::flatten_cgroup_stats`]) uses to apply user-supplied
39//! glob patterns against cgroup paths before bucket assignment.
40
41use std::collections::BTreeMap;
42
43use crate::ctprof::{CtprofSnapshot, ThreadState};
44
45use super::{
46 AffinitySummary, AggRule, Aggregated, CTPROF_METRICS, CtprofMetricDef, DiffRow, GroupBy,
47 ThreadGroup,
48 pattern::{cgroup_normalize_skeleton, pattern_key, tighten_group},
49};
50
51/// Walk a snapshot's threads and pull non-empty smaps_rollup
52/// maps off the leader threads (tid == tgid; non-leader threads
53/// land at empty map per the leader-dedup contract).
54///
55/// Keying:
56///
57/// - Default normalization (`no_thread_normalize: false`): key is
58/// `pattern_key(&t.pcomm)` — pcomm only, the `[tgid]` suffix is
59/// DROPPED. The tgid digits would always normalize to `{N}` and
60/// add no discriminating signal to the join key, so omitting
61/// them makes smaps keys match the primary-table Pcomm group
62/// keys exactly (`kworker/{N}:{N}`, `firefox`, `worker-{N}`,
63/// etc.).
64///
65/// No singleton revert. Unlike [`build_groups`], which reverts a
66/// pattern_key to the literal name when only one contributor
67/// shares the skeleton, `collect_smaps_rollup` always normalizes
68/// when normalization is enabled regardless of how many PIDs
69/// share the bucket. The reason is structural: smaps keys exist
70/// to JOIN baseline vs candidate across snapshots, and PIDs are
71/// per-snapshot ephemeral. A singleton-revert path would emit a
72/// literal `worker[7]` on baseline and a literal `worker[1234]`
73/// on candidate — two never-matching keys — orphaning every
74/// cross-snapshot row. The build_groups invariant ("don't
75/// advertise a pattern that no peer shares") doesn't apply on
76/// the smaps axis because the bucket's role isn't intra-
77/// snapshot fleet aggregation; it's cross-snapshot memory
78/// diffing.
79///
80/// - Literal mode (`no_thread_normalize: true`): key is
81/// `pcomm[tgid]` so each PID stays attributable to its
82/// specific instance. The tradeoff is that rows only join
83/// across snapshots when the same process instance ran on
84/// both sides — the `[tgid]` is preserved precisely so two
85/// distinct PIDs sharing a pcomm don't collide within a
86/// snapshot.
87///
88/// Aggregation: multiple leader threads mapping to the same
89/// key (default mode: a fleet of `worker-{N}` parents) SUM
90/// their per-field byte counts. `Rss`, `Pss`, `Private_*`,
91/// `Shared_*` etc. each accumulate via `saturating_add` —
92/// memory quantities are additive across the merged bucket.
93/// `saturating_add` mirrors the cumulative-counter merge policy
94/// elsewhere in ctprof_compare (`cgroup_merge.rs`: `usage_usec`,
95/// `throttled_usec`); a
96/// u64 byte-count overflow implies more than 16 EiB of resident
97/// memory across the bucket, well past any realistic host.
98///
99/// Caveat on `Shared_*` aggregation: when multiple PIDs in a
100/// merged bucket share physical pages (the COW case for forked
101/// children, mmap'd shared libraries, etc.), summing each PID's
102/// per-process `Shared_*` reading double-counts the overlapping
103/// physical residency. The same double-count exists in the
104/// un-aggregated display — the operator already sees `Shared_Clean
105/// = 500MiB` listed against two distinct PID rows that happen to
106/// share the same library mapping — so the merge introduces no
107/// new information loss, just preserves the pre-existing kernel-
108/// emission characteristic. `Pss` stays the precise read for a
109/// merged bucket's resident footprint because the kernel
110/// proportionally divides shared pages across mappers
111/// (`fs/proc/task_mmu.c::smaps_account`); operators tracking actual
112/// memory pressure should prefer `Pss` over `Rss + Shared_*`
113/// arithmetic on collapsed buckets.
114///
115/// Values are converted from kB to bytes via
116/// [`ThreadState::smaps_rollup_bytes`] up-front, so the
117/// downstream renderer can pass cell values directly into the
118/// auto_scale "B" ladder without further unit math.
119pub fn collect_smaps_rollup(
120 snap: &CtprofSnapshot,
121 no_thread_normalize: bool,
122) -> BTreeMap<String, BTreeMap<String, u64>> {
123 collect_smaps_rollup_inner(snap, no_thread_normalize, false, &[], None)
124}
125
126pub fn collect_smaps_rollup_hierarchical(
127 snap: &CtprofSnapshot,
128 no_thread_normalize: bool,
129 flatten: &[glob::Pattern],
130 cgroup_key_map: Option<&BTreeMap<String, String>>,
131) -> BTreeMap<String, BTreeMap<String, u64>> {
132 collect_smaps_rollup_inner(snap, no_thread_normalize, true, flatten, cgroup_key_map)
133}
134
135fn collect_smaps_rollup_inner(
136 snap: &CtprofSnapshot,
137 no_thread_normalize: bool,
138 compound_cgroup: bool,
139 flatten: &[glob::Pattern],
140 cgroup_key_map: Option<&BTreeMap<String, String>>,
141) -> BTreeMap<String, BTreeMap<String, u64>> {
142 let mut out: BTreeMap<String, BTreeMap<String, u64>> = BTreeMap::new();
143 for t in &snap.threads {
144 if t.smaps_rollup_kib.is_empty() {
145 continue;
146 }
147 let pcomm_key = if no_thread_normalize {
148 format!("{}[{}]", t.pcomm, t.tgid)
149 } else {
150 pattern_key(&t.pcomm)
151 };
152 let key = if compound_cgroup {
153 let cg = flatten_cgroup_path(&t.cgroup, flatten);
154 let cg_key = match cgroup_key_map.and_then(|m| m.get(&cg)) {
155 Some(k) => k.clone(),
156 None => cg,
157 };
158 format!("{cg_key}\x00{pcomm_key}")
159 } else {
160 pcomm_key
161 };
162 let entry = out.entry(key).or_default();
163 for (k, b) in t.smaps_rollup_bytes() {
164 entry
165 .entry(k.clone())
166 .and_modify(|v| *v = v.saturating_add(b.0))
167 .or_insert(b.0);
168 }
169 }
170 out
171}
172
173/// Build the post-flatten-path → final-tightened-key map for
174/// [`GroupBy::Cgroup`] under auto-normalization. Walks the union
175/// of paths from both snapshots' threads and `cgroup_stats` so
176/// that Layer 3 (tighten) sees every contributor to a given
177/// Layer-2 skeleton group. Returns the map keyed by post-flatten
178/// path; consumers ([`build_groups`], `flatten_cgroup_stats`)
179/// look up the final key for any path they see.
180pub fn build_cgroup_key_map(
181 baseline: &CtprofSnapshot,
182 candidate: &CtprofSnapshot,
183 flatten: &[glob::Pattern],
184) -> BTreeMap<String, String> {
185 use std::collections::BTreeSet;
186 let mut paths: BTreeSet<String> = BTreeSet::new();
187 for snap in [baseline, candidate] {
188 for t in &snap.threads {
189 paths.insert(flatten_cgroup_path(&t.cgroup, flatten));
190 }
191 for k in snap.cgroup_stats.keys() {
192 paths.insert(flatten_cgroup_path(k, flatten));
193 }
194 }
195 // Compute (skeleton, post_l1, tokens) for every path.
196 let entries: Vec<(String, String, String, Vec<String>)> = paths
197 .into_iter()
198 .map(|p| {
199 let (skeleton, post_l1, tokens) = cgroup_normalize_skeleton(&p);
200 (p, skeleton, post_l1, tokens)
201 })
202 .collect();
203 // Group entries by Layer-2 skeleton.
204 let mut groups: BTreeMap<String, Vec<usize>> = BTreeMap::new();
205 for (idx, (_, skel, _, _)) in entries.iter().enumerate() {
206 groups.entry(skel.clone()).or_default().push(idx);
207 }
208 // Tighten per group.
209 let mut tightened: Vec<String> = vec![String::new(); entries.len()];
210 for (skeleton, indices) in &groups {
211 if indices.len() < 2 {
212 // Singleton — Layer-2 skeleton stays as the key. No
213 // member set to compare against.
214 for &i in indices {
215 tightened[i] = skeleton.clone();
216 }
217 } else {
218 let post_l1_paths: Vec<String> =
219 indices.iter().map(|&i| entries[i].2.clone()).collect();
220 let member_tokens: Vec<Vec<String>> =
221 indices.iter().map(|&i| entries[i].3.clone()).collect();
222 let key = tighten_group(&post_l1_paths, &member_tokens);
223 for &i in indices {
224 tightened[i] = key.clone();
225 }
226 }
227 }
228 let mut out: BTreeMap<String, String> = BTreeMap::new();
229 for (i, (orig, _, _, _)) in entries.into_iter().enumerate() {
230 out.insert(orig, tightened[i].clone());
231 }
232 out
233}
234
235#[allow(clippy::too_many_arguments)]
236pub(super) fn build_row(
237 key: &str,
238 display_key: &str,
239 n_a: usize,
240 n_b: usize,
241 metric: &'static CtprofMetricDef,
242 a: Aggregated,
243 b: Aggregated,
244 uptime_pct: Option<f64>,
245) -> DiffRow {
246 let (delta, delta_pct) = match (a.numeric(), b.numeric()) {
247 (Some(va), Some(vb)) => {
248 let d = vb - va;
249 let pct = if va.abs() > f64::EPSILON {
250 Some(d / va)
251 } else {
252 None
253 };
254 (Some(d), pct)
255 }
256 _ => (None, None),
257 };
258 DiffRow {
259 group_key: key.to_string(),
260 thread_count_a: n_a,
261 thread_count_b: n_b,
262 uptime_pct,
263 metric_name: metric.name,
264 metric_ladder: metric.rule.ladder(),
265 baseline: a,
266 candidate: b,
267 delta,
268 delta_pct,
269 display_key: display_key.to_string(),
270 sort_by_cell: None,
271 sort_by_delta: None,
272 }
273}
274
275pub fn build_groups(
276 snap: &CtprofSnapshot,
277 group_by: GroupBy,
278 flatten: &[glob::Pattern],
279 pattern_counts: Option<&BTreeMap<String, usize>>,
280 cgroup_key_map: Option<&BTreeMap<String, String>>,
281 no_thread_normalize: bool,
282) -> BTreeMap<String, ThreadGroup> {
283 // Pattern-aware grouping (Comm, Pcomm) needs a frequency pass:
284 // pattern keys with only one matching thread revert to the
285 // literal name so a lone worker stays ungrouped instead of
286 // advertising a `worker-{N}` pattern that no other thread
287 // shares. Non-pattern groupings (CommExact, Cgroup) skip the
288 // pre-pass.
289 //
290 // When `pattern_counts` is supplied (production: `compare()`
291 // passes the union over baseline+candidate), it is used as
292 // the gate. When it is `None` (single-snapshot test
293 // ergonomics), this fn computes counts from `snap` alone.
294 // Suppressed when `no_thread_normalize` is set — the gate is
295 // meaningless once each thread groups by its literal name.
296 // Pattern_field selects the thread accessor used by the
297 // singleton-revert gate inside the GroupBy::Pcomm / Comm
298 // grouping arm. GroupBy::All has its own arm that normalizes
299 // both pcomm and comm unconditionally (no singleton revert),
300 // so it never reads `pattern_field`. CommExact and Cgroup
301 // don't normalize either.
302 let pattern_field: Option<fn(&ThreadState) -> &str> = match (group_by, no_thread_normalize) {
303 (GroupBy::Comm, false) => Some(|t: &ThreadState| t.comm.as_str()),
304 (GroupBy::Pcomm, false) => Some(|t: &ThreadState| t.pcomm.as_str()),
305 _ => None,
306 };
307 let local_counts: Option<BTreeMap<String, usize>> = match (pattern_field, pattern_counts) {
308 (Some(field), None) => {
309 let mut counts: BTreeMap<String, usize> = BTreeMap::new();
310 for t in &snap.threads {
311 *counts.entry(pattern_key(field(t))).or_insert(0) += 1;
312 }
313 Some(counts)
314 }
315 _ => None,
316 };
317 let counts_ref: Option<&BTreeMap<String, usize>> = pattern_counts.or(local_counts.as_ref());
318
319 let mut buckets: BTreeMap<String, Vec<&ThreadState>> = BTreeMap::new();
320 for t in &snap.threads {
321 let key = match group_by {
322 GroupBy::All => {
323 let cg = flatten_cgroup_path(&t.cgroup, flatten);
324 let cg_key = match cgroup_key_map.and_then(|m| m.get(&cg)) {
325 Some(k) => k.clone(),
326 None => cg,
327 };
328 let pcomm_key = if no_thread_normalize {
329 t.pcomm.clone()
330 } else {
331 pattern_key(&t.pcomm)
332 };
333 let comm_key = if no_thread_normalize {
334 t.comm.clone()
335 } else {
336 pattern_key(&t.comm)
337 };
338 format!("{cg_key}\x00{pcomm_key}\x00{comm_key}")
339 }
340 // Pcomm and Comm share the same shape: when
341 // normalization is enabled, route the chosen field
342 // through `pattern_key` and revert singletons to the
343 // literal name so a lone process / thread does not
344 // advertise a pattern that no other contributor
345 // shares. The `pattern_field` accessor (already
346 // computed for the local_counts pre-pass) selects
347 // pcomm vs comm; under `no_thread_normalize` it is
348 // `None` and we group by literal name directly.
349 GroupBy::Pcomm | GroupBy::Comm => match pattern_field {
350 Some(field) => {
351 let name = field(t);
352 let pk = pattern_key(name);
353 let counts = counts_ref.expect("pattern_counts seeded for Pcomm/Comm");
354 if counts.get(&pk).copied().unwrap_or(0) >= 2 {
355 pk
356 } else {
357 name.to_string()
358 }
359 }
360 None => {
361 // `no_thread_normalize` set — literal grouping.
362 if group_by == GroupBy::Pcomm {
363 t.pcomm.clone()
364 } else {
365 t.comm.clone()
366 }
367 }
368 },
369 GroupBy::CommExact => t.comm.clone(),
370 GroupBy::Cgroup => {
371 let post_flatten = flatten_cgroup_path(&t.cgroup, flatten);
372 // When auto-normalize is enabled, the cgroup key map
373 // (built by `compare()` over the union of paths from
374 // both snapshots) maps each post-flatten path to its
375 // final tightened key (Layer 1 + 2 + 3). Otherwise,
376 // group by post-flatten path verbatim.
377 match cgroup_key_map.and_then(|m| m.get(&post_flatten)) {
378 Some(k) => k.clone(),
379 None => post_flatten,
380 }
381 }
382 };
383 buckets.entry(key).or_default().push(t);
384 }
385
386 let mut out = BTreeMap::new();
387 for (key, threads) in buckets {
388 let mut metrics = BTreeMap::new();
389 for m in CTPROF_METRICS {
390 let agg = aggregate(m.rule, &threads);
391 // Capture-gated families: a non-empty bucket whose threads ALL went
392 // uncaptured for the family folds to Absent, not a sentinel Sum(0),
393 // so a derived metric short-circuits to "-" and the raw row renders
394 // "-" rather than a measured-looking 0. A measured zero (>=1 thread
395 // captured it) stays its real aggregate.
396 let agg = match measured_predicate(m) {
397 Some(pred) if !threads.is_empty() && !threads.iter().copied().any(pred) => {
398 Aggregated::Absent
399 }
400 _ => agg,
401 };
402 metrics.insert(m.name.to_string(), agg);
403 }
404 let cgroup_stats = if group_by == GroupBy::Cgroup {
405 // Pick the first sampled thread's raw cgroup path and
406 // look up its enrichment. `snap.cgroup_stats` is keyed
407 // by raw cgroup paths, so the lookup uses `t.cgroup`
408 // verbatim (flattening produces a separate map and is
409 // not written back here). When a flatten/tighten
410 // pattern collapses several raw cgroups into one
411 // bucket, the first thread's raw cgroup is used as the
412 // representative enrichment.
413 threads
414 .first()
415 .and_then(|t| snap.cgroup_stats.get(&t.cgroup).cloned())
416 } else {
417 None
418 };
419 // `members` feeds the grex display-label path for
420 // normalized `GroupBy::Comm` (literal comms) and
421 // `GroupBy::Pcomm` (literal pcomms). Other groupings — and
422 // either pattern-aware grouping under
423 // `no_thread_normalize` — render the join key directly, so
424 // skip the per-bucket name collection (saves a
425 // clone-per-thread per-bucket on busy hosts).
426 let members: Vec<String> = match pattern_field {
427 Some(field) => {
428 let mut v: Vec<String> = threads.iter().map(|t| field(t).to_string()).collect();
429 v.sort();
430 v.dedup();
431 v
432 }
433 None => Vec::new(),
434 };
435 let valid_starts: Vec<u64> = threads
436 .iter()
437 .map(|t| t.start_time_clock_ticks)
438 .filter(|&t| t > 0)
439 .collect();
440 let avg_start_ticks = if valid_starts.is_empty() {
441 0
442 } else {
443 valid_starts.iter().sum::<u64>() / valid_starts.len() as u64
444 };
445 out.insert(
446 key.clone(),
447 ThreadGroup {
448 key,
449 thread_count: threads.len(),
450 metrics,
451 cgroup_stats,
452 members,
453 avg_start_ticks,
454 },
455 );
456 }
457 out
458}
459
/// Contract of `aggregate` (the caller of this helper): aggregate one metric
/// across a slice of threads per its rule.
461///
462/// Each `Sum*` / `Max*` / `Range*` arm dispatches
463/// through the trait method on the typed newtype defined in
464/// [`crate::metric_types`] — `sum_across` for [`Summable`],
465/// `max_across` for [`Maxable`], `range_across` for [`Rangeable`]
466/// — then unwraps to the
467/// untyped scalar that [`Aggregated`] carries today; the `Mode*`
468/// arms instead route through [`mode_aggregate`] (tally-based),
469/// not a trait method; the
470/// unit-aware format dispatch (which would read the registry's
471/// `unit` tag rather than the wrapper type) is not implemented
472/// yet, so `Aggregated` stays scalar-shaped after this phase.
473///
474/// # Empty-bucket contract
475///
476/// The trait-level shapes split empty handling differently
477/// from the dispatch-level shape:
478/// - `Summable::sum_across` returns the additive identity
479/// (zero) on an empty input — the trait surface itself
480/// collapses the empty case. The `Sum*` arms therefore feed
481/// straight into [`Aggregated::Sum`] without re-checking.
482/// - `Maxable::max_across` returns `Option<Self>` (`None`
483/// for empty) so callers can distinguish "no contributors"
484/// from "all contributors had zero." The dispatch in this
485/// function collapses `None` to `Aggregated::Max(0)` at the
486/// call boundary so the historical empty-bucket contract on
487/// this code path (zero rendered for empty groups) holds
488/// regardless of the trait's richer shape.
489/// - `Rangeable::range_across` returns
490/// `Option<Range<Self>>`; the dispatch collapses `None` to
491/// `Aggregated::OrdinalRange { min: 0, max: 0 }` at the call
492/// boundary.
493/// - The `Mode*` arms call [`mode_aggregate`], which builds a
494/// per-value tally map; an empty bucket yields
495/// `Aggregated::Mode { tallies: (empty), total }` where `total`
496/// is the bucket size (`aggregate` passes `threads.len()`
497/// directly). The mode/count are derived on demand via
498/// [`Aggregated::mode_value`] / [`Aggregated::mode_count`].
499///
500/// Downstream delta math therefore sees a well-defined value
501/// at every join boundary regardless of which side of a
502/// compare carried zero threads under the bucket key.
503///
504/// [`Summable`]: crate::metric_types::Summable
505/// [`Maxable`]: crate::metric_types::Maxable
506/// [`Rangeable`]: crate::metric_types::Rangeable
507/// [`Modeable`]: crate::metric_types::Modeable
508///
509/// Mode-arm dispatch helper used by `aggregate`. Builds a
510/// per-value tally map (`BTreeMap<String, usize>`) over a typed
511/// iterator of [`crate::metric_types::CategoricalString`] and
512/// returns [`Aggregated::Mode`] with the supplied `total` (the
513/// number of threads in the bucket). Empty buckets surface as
514/// `Aggregated::Mode { tallies: (empty), total }` — downstream
515/// delta math sees
516/// a well-defined value at the join boundary regardless of which
517/// side carried zero threads. Lifts the otherwise-identical
518/// match arms for [`AggRule::Mode`], [`AggRule::ModeChar`], and
519/// [`AggRule::ModeBool`] into one site so a future refactor that
520/// changes the empty-bucket contract or the tally shape only
521/// edits one place.
522fn mode_aggregate(
523 total: usize,
524 items: impl IntoIterator<Item = crate::metric_types::CategoricalString>,
525) -> Aggregated {
526 // Build the full tally map across the bucket — one entry
527 // per distinct category, with its occurrence count. The
528 // mode (most-frequent value) is derived on demand by
529 // [`Aggregated::mode_value`] / [`Aggregated::mode_count`];
530 // storing the full distribution (not just the mode) lets
531 // the N:1 fudge merge compose tallies correctly across
532 // buckets via [`merge_aggregated_into`].
533 let mut tallies: BTreeMap<String, usize> = BTreeMap::new();
534 for item in items {
535 *tallies.entry(item.0).or_insert(0) += 1;
536 }
537 Aggregated::Mode { tallies, total }
538}
539
/// Taskstats sub-family of a raw metric, classified by name. This is the
/// granularity at which kernel delay / xacct accounting is actually gated:
/// the three sub-families have DIFFERENT enablement conditions (see
/// [`measured_predicate`]). `cpu_delay_*` survive the runtime delayacct
/// toggle, the resource-wait categories do not, and the xacct watermarks
/// key off a separate CONFIG with no runtime toggle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskstatsFamily {
    /// `cpu_delay_*` — sched_info-sourced, filled unconditionally by
    /// `delayacct_add_tsk`; active unless CONFIG_TASK_DELAY_ACCT is off
    /// entirely.
    CpuDelay,
    /// The 7 delayacct resource-wait categories (`blkio` / `swapin` /
    /// `freepages` / `thrashing` / `compact` / `wpcopy` / `irq`) — gated by
    /// the runtime `task_delayacct` toggle (`tsk->delays` is allocated at
    /// fork only when on).
    DelayBlock,
    /// The xacct memory watermarks (`hiwater_*`) — gated by
    /// CONFIG_TASK_XACCT, no runtime toggle.
    Xacct,
}

/// Map a RAW taskstats metric name to its [`TaskstatsFamily`] by prefix;
/// `None` for any name that carries none of the known prefixes.
///
/// Derived taskstats metrics (`avg_*_delay_ns`, `total_offcpu_delay_ns`)
/// are NOT classified here — they inherit [`Aggregated::Absent`] from their
/// raw inputs ([`Aggregated::numeric`] returns `None`, so the derived
/// `compute` closure short-circuits). The
/// `build_groups_all_subfamilies_inactive_makes_every_taskstats_metric_absent`
/// test pins that every `Section::TaskstatsDelay` raw metric maps to `Some`.
fn taskstats_family(name: &str) -> Option<TaskstatsFamily> {
    // Prefix → family table. No prefix is a prefix of another, so at most
    // one row can match any given name.
    const PREFIX_TABLE: [(&str, TaskstatsFamily); 9] = [
        ("cpu_delay_", TaskstatsFamily::CpuDelay),
        ("hiwater_", TaskstatsFamily::Xacct),
        ("blkio_delay_", TaskstatsFamily::DelayBlock),
        ("swapin_delay_", TaskstatsFamily::DelayBlock),
        ("freepages_delay_", TaskstatsFamily::DelayBlock),
        ("thrashing_delay_", TaskstatsFamily::DelayBlock),
        ("compact_delay_", TaskstatsFamily::DelayBlock),
        ("wpcopy_delay_", TaskstatsFamily::DelayBlock),
        ("irq_delay_", TaskstatsFamily::DelayBlock),
    ];
    PREFIX_TABLE
        .iter()
        .find(|(prefix, _)| name.starts_with(*prefix))
        .map(|&(_, family)| family)
}
587
588/// The per-thread "was this metric's family genuinely measured" predicate for a
589/// capture-gated metric, or `None` for an always-measured metric. A taskstats
590/// metric is measured iff this thread's genetlink query succeeded
591/// (`taskstats_measured`) AND its sub-family is enabled host-wide — per
592/// [`TaskstatsFamily`], `cpu_delay_*` need CONFIG_TASK_DELAY_ACCT built in, the
593/// resource-wait categories additionally need the runtime delayacct toggle on, and
594/// the `hiwater_*` watermarks need CONFIG_TASK_XACCT (the active flags are baked
595/// per-thread at capture). jemalloc is the two byte counters, which live in the
596/// shared `Section::Primary` and so are name-matched. `build_groups` folds a
597/// non-empty bucket where the predicate holds but NO thread measured the family
598/// to [`Aggregated::Absent`], distinguishing a never-captured (or
599/// sub-family-disabled) family from a measured zero (the bug: a derived metric
600/// otherwise computes from a sentinel `0` and renders "0" instead of "-").
601fn measured_predicate(m: &CtprofMetricDef) -> Option<fn(&ThreadState) -> bool> {
602 if let Some(fam) = taskstats_family(m.name) {
603 return Some(match fam {
604 TaskstatsFamily::CpuDelay => |t| t.taskstats_measured && t.cpu_delay_active,
605 TaskstatsFamily::DelayBlock => |t| t.taskstats_measured && t.delay_block_active,
606 TaskstatsFamily::Xacct => |t| t.taskstats_measured && t.xacct_active,
607 });
608 }
609 if m.name == "allocated_bytes" || m.name == "deallocated_bytes" {
610 return Some(|t| t.jemalloc_measured);
611 }
612 None
613}
614
615pub fn aggregate(rule: AggRule, threads: &[&ThreadState]) -> Aggregated {
616 // `Modeable` is not used anywhere in this module: the Mode
617 // arms tally directly in `mode_aggregate` rather than calling
618 // `mode_across`. `CategoricalString` is still needed because
619 // the ModeChar / ModeBool arms construct one for the
620 // coercion path before passing the iterator to
621 // `mode_aggregate`.
622 use crate::metric_types::{CategoricalString, Maxable, Rangeable, Summable};
623 match rule {
624 AggRule::SumCount(f) => {
625 let s = crate::metric_types::MonotonicCount::sum_across(threads.iter().map(|t| f(t)));
626 Aggregated::Sum(s.0)
627 }
628 AggRule::SumNs(f) => {
629 let s = crate::metric_types::MonotonicNs::sum_across(threads.iter().map(|t| f(t)));
630 Aggregated::Sum(s.0)
631 }
632 AggRule::SumTicks(f) => {
633 let s = crate::metric_types::ClockTicks::sum_across(threads.iter().map(|t| f(t)));
634 Aggregated::Sum(s.0)
635 }
636 AggRule::SumBytes(f) => {
637 let s = crate::metric_types::Bytes::sum_across(threads.iter().map(|t| f(t)));
638 Aggregated::Sum(s.0)
639 }
640 AggRule::MaxPeak(f) => {
641 // `max_across` returns `Option<Self>` so callers can
642 // distinguish "empty thread bucket" from "all
643 // contributors had zero." The historical empty-bucket
644 // contract on this code path was `Aggregated::Max(0)`;
645 // preserve it by collapsing `None` to the additive
646 // identity at the call boundary. Non-empty buckets
647 // produce a concrete max regardless of value.
648 let m = crate::metric_types::PeakNs::max_across(threads.iter().map(|t| f(t)));
649 Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
650 }
651 AggRule::MaxPeakBytes(f) => {
652 // Same Option<Self> + None → Aggregated::Max(0)
653 // collapse as MaxPeak; the difference is only the
654 // typed accessor's unit family — Bytes vs Ns. The
655 // ladder() match maps this variant to
656 // ScaleLadder::Bytes so the renderer auto-scales
657 // with KiB/MiB/GiB/TiB suffixes.
658 let m = crate::metric_types::PeakBytes::max_across(threads.iter().map(|t| f(t)));
659 Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
660 }
661 AggRule::MaxGaugeNs(f) => {
662 let m = crate::metric_types::GaugeNs::max_across(threads.iter().map(|t| f(t)));
663 Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
664 }
665 AggRule::MaxGaugeCount(f) => {
666 let m = crate::metric_types::GaugeCount::max_across(threads.iter().map(|t| f(t)));
667 Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
668 }
669 AggRule::RangeI32(f) => {
670 match crate::metric_types::OrdinalI32::range_across(threads.iter().map(|t| f(t))) {
671 // `range_across` returns `None` only on an empty
672 // iterator — mirror the historical empty-group
673 // contract by collapsing to (0, 0) so the
674 // downstream midpoint and delta math sees a
675 // well-defined value at the join boundary. The
676 // `Some` arm carries a typed `Range<OrdinalI32>`
677 // wrapper that guarantees min ≤ max as a
678 // type-system invariant; `into_tuple()` extracts
679 // the pair without re-checking.
680 Some(r) => {
681 let (min, max) = r.into_tuple();
682 Aggregated::OrdinalRange {
683 min: i64::from(min.0),
684 max: i64::from(max.0),
685 }
686 }
687 None => Aggregated::OrdinalRange { min: 0, max: 0 },
688 }
689 }
690 AggRule::RangeU32(f) => {
691 match crate::metric_types::OrdinalU32::range_across(threads.iter().map(|t| f(t))) {
692 Some(r) => {
693 let (min, max) = r.into_tuple();
694 Aggregated::OrdinalRange {
695 min: i64::from(min.0),
696 max: i64::from(max.0),
697 }
698 }
699 None => Aggregated::OrdinalRange { min: 0, max: 0 },
700 }
701 }
702 AggRule::Mode(f) => mode_aggregate(threads.len(), threads.iter().map(|t| f(t))),
703 AggRule::ModeChar(f) => mode_aggregate(
704 threads.len(),
705 // `char` is not Modeable directly; coerce to the
706 // CategoricalString reduction so the lex-tiebreak
707 // contract is identical to other Mode variants.
708 threads.iter().map(|t| CategoricalString(f(t).to_string())),
709 ),
710 AggRule::ModeBool(f) => mode_aggregate(
711 threads.len(),
712 // Same coercion path as `ModeChar`. `to_string()`
713 // produces `"true"`/`"false"` per `bool::Display`.
714 threads.iter().map(|t| CategoricalString(f(t).to_string())),
715 ),
716 AggRule::Affinity(f) => {
717 let mut seen: Vec<Vec<u32>> = Vec::new();
718 let mut min_cpus = usize::MAX;
719 let mut max_cpus = 0usize;
720 for t in threads {
721 let cpus = f(t).0;
722 min_cpus = min_cpus.min(cpus.len());
723 max_cpus = max_cpus.max(cpus.len());
724 if !seen.iter().any(|s| s == &cpus) {
725 seen.push(cpus);
726 }
727 }
728 if threads.is_empty() {
729 min_cpus = 0;
730 }
731 let uniform = if seen.len() == 1 {
732 seen.into_iter().next()
733 } else {
734 None
735 };
736 Aggregated::Affinity(AffinitySummary {
737 min_cpus,
738 max_cpus,
739 uniform,
740 })
741 }
742 }
743}
744
745/// Collapse dynamic segments of a cgroup path per every pattern
746/// in `patterns`. A pattern is a glob matched with glob's default
747/// `MatchOptions` (`require_literal_separator = false`), so `*` is
748/// NOT segment-bounded — it matches across `/` just like `**`. The
749/// literal portions are preserved
750/// and the wildcard portions are replaced with the wildcard token
751/// itself. Example: pattern `/kubepods/*/workload` applied to
752/// `/kubepods/pod-abc/workload` produces `/kubepods/*/workload`,
753/// so two runs with different pod IDs collapse onto the same key.
754///
755/// Patterns are tried in listed order; the first match wins and
756/// subsequent patterns are not applied. A path that matches no
757/// pattern is returned verbatim.
758pub fn flatten_cgroup_path(path: &str, patterns: &[glob::Pattern]) -> String {
759 for p in patterns {
760 if p.matches(path) {
761 // The pattern itself becomes the canonical key: every
762 // path matching `/kubepods/*/workload` collapses onto
763 // the literal pattern string.
764 return p.as_str().to_string();
765 }
766 }
767 path.to_string()
768}
769
770pub fn compile_flatten_patterns(raw: &[String]) -> Vec<glob::Pattern> {
771 raw.iter()
772 .filter_map(|s| glob::Pattern::new(s).ok())
773 .collect()
774}