ktstr/ctprof_compare/
groups.rs

1//! Group construction, per-group aggregation, and the cgroup-path
2//! flatten helpers consumed by [`super::compare()`].
3//!
4//! Three layers:
5//!
6//! 1. [`build_groups`] / [`build_cgroup_key_map`] — partition a
7//!    snapshot's threads by [`super::GroupBy`] axis (pcomm, cgroup,
8//!    comm, comm-exact), apply pattern-aware skeleton clustering
9//!    where applicable, and produce a `BTreeMap<key, ThreadGroup>`
10//!    keyed by the post-tightening join key. The N:1 pattern-bucket
11//!    promotion ([`super::CompareOptions::no_thread_normalize`])
12//!    determines whether buckets share a token-normalized skeleton
13//!    or stay at the literal `pcomm` / `comm` value.
14//!
15//! 2. [`aggregate`] / [`mode_aggregate`] / [`build_row`] — apply
16//!    per-metric reduction rules ([`super::AggRule`]) across a
17//!    group's threads to produce [`super::Aggregated`] values, then
18//!    materialize one [`super::DiffRow`] per `(group, metric)` pair
19//!    with side-by-side baseline + candidate values and the
20//!    precomputed delta / pct cells. The Mode arm dispatches
21//!    through [`mode_aggregate`] so the categorical empty-bucket
22//!    contract lives in one place; the numeric arms delegate to
23//!    the typed traits in [`crate::metric_types`]
24//!    (`Summable::sum_across`, `Maxable::max_across`,
25//!    `Rangeable::range_across`).
26//!
27//! 3. [`collect_smaps_rollup`] / [`collect_smaps_rollup_hierarchical`]
28//!    — pull the per-process smaps_rollup map (from the leader
29//!    thread of each tgid) and key by the same pattern as the
30//!    primary-table buckets so byte-counts join across snapshots
31//!    when PIDs are ephemeral. The hierarchical variant honors
32//!    cgroup-flatten patterns and the auto-normalize key map so
33//!    smaps rows align with the primary table even after
34//!    cgroup-path tightening.
35//!
36//! [`flatten_cgroup_path`] / [`compile_flatten_patterns`] are
37//! the helpers the caller side ([`super::compare()`] +
38//! [`super::flatten_cgroup_stats`]) uses to apply user-supplied
39//! glob patterns against cgroup paths before bucket assignment.
40
41use std::collections::BTreeMap;
42
43use crate::ctprof::{CtprofSnapshot, ThreadState};
44
45use super::{
46    AffinitySummary, AggRule, Aggregated, CTPROF_METRICS, CtprofMetricDef, DiffRow, GroupBy,
47    ThreadGroup,
48    pattern::{cgroup_normalize_skeleton, pattern_key, tighten_group},
49};
50
51/// Walk a snapshot's threads and pull non-empty smaps_rollup
52/// maps off the leader threads (tid == tgid; non-leader threads
53/// land at empty map per the leader-dedup contract).
54///
55/// Keying:
56///
57/// - Default normalization (`no_thread_normalize: false`): key is
58///   `pattern_key(&t.pcomm)` — pcomm only, the `[tgid]` suffix is
59///   DROPPED. The tgid digits would always normalize to `{N}` and
60///   add no discriminating signal to the join key, so omitting
61///   them makes smaps keys match the primary-table Pcomm group
62///   keys exactly (`kworker/{N}:{N}`, `firefox`, `worker-{N}`,
63///   etc.).
64///
65///   No singleton revert. Unlike [`build_groups`], which reverts a
66///   pattern_key to the literal name when only one contributor
67///   shares the skeleton, `collect_smaps_rollup` always normalizes
68///   when normalization is enabled regardless of how many PIDs
69///   share the bucket. The reason is structural: smaps keys exist
70///   to JOIN baseline vs candidate across snapshots, and PIDs are
71///   per-snapshot ephemeral. A singleton-revert path would emit a
72///   literal `worker[7]` on baseline and a literal `worker[1234]`
73///   on candidate — two never-matching keys — orphaning every
74///   cross-snapshot row. The build_groups invariant ("don't
75///   advertise a pattern that no peer shares") doesn't apply on
76///   the smaps axis because the bucket's role isn't intra-
77///   snapshot fleet aggregation; it's cross-snapshot memory
78///   diffing.
79///
80/// - Literal mode (`no_thread_normalize: true`): key is
81///   `pcomm[tgid]` so each PID stays attributable to its
82///   specific instance. The tradeoff is that rows only join
83///   across snapshots when the same process instance ran on
84///   both sides — the `[tgid]` is preserved precisely so two
85///   distinct PIDs sharing a pcomm don't collide within a
86///   snapshot.
87///
88/// Aggregation: multiple leader threads mapping to the same
89/// key (default mode: a fleet of `worker-{N}` parents) SUM
90/// their per-field byte counts. `Rss`, `Pss`, `Private_*`,
91/// `Shared_*` etc. each accumulate via `saturating_add` —
92/// memory quantities are additive across the merged bucket.
93/// `saturating_add` mirrors the cumulative-counter merge policy
94/// elsewhere in ctprof_compare (`cgroup_merge.rs`: `usage_usec`,
95/// `throttled_usec`); a
96/// u64 byte-count overflow implies more than 16 EiB of resident
97/// memory across the bucket, well past any realistic host.
98///
99/// Caveat on `Shared_*` aggregation: when multiple PIDs in a
100/// merged bucket share physical pages (the COW case for forked
101/// children, mmap'd shared libraries, etc.), summing each PID's
102/// per-process `Shared_*` reading double-counts the overlapping
103/// physical residency. The same double-count exists in the
104/// un-aggregated display — the operator already sees `Shared_Clean
105/// = 500MiB` listed against two distinct PID rows that happen to
106/// share the same library mapping — so the merge introduces no
107/// new information loss, just preserves the pre-existing kernel-
108/// emission characteristic. `Pss` stays the precise read for a
109/// merged bucket's resident footprint because the kernel
110/// proportionally divides shared pages across mappers
111/// (`fs/proc/task_mmu.c::smaps_account`); operators tracking actual
112/// memory pressure should prefer `Pss` over `Rss + Shared_*`
113/// arithmetic on collapsed buckets.
114///
115/// Values are converted from kB to bytes via
116/// [`ThreadState::smaps_rollup_bytes`] up-front, so the
117/// downstream renderer can pass cell values directly into the
118/// auto_scale "B" ladder without further unit math.
119pub fn collect_smaps_rollup(
120    snap: &CtprofSnapshot,
121    no_thread_normalize: bool,
122) -> BTreeMap<String, BTreeMap<String, u64>> {
123    collect_smaps_rollup_inner(snap, no_thread_normalize, false, &[], None)
124}
125
126pub fn collect_smaps_rollup_hierarchical(
127    snap: &CtprofSnapshot,
128    no_thread_normalize: bool,
129    flatten: &[glob::Pattern],
130    cgroup_key_map: Option<&BTreeMap<String, String>>,
131) -> BTreeMap<String, BTreeMap<String, u64>> {
132    collect_smaps_rollup_inner(snap, no_thread_normalize, true, flatten, cgroup_key_map)
133}
134
135fn collect_smaps_rollup_inner(
136    snap: &CtprofSnapshot,
137    no_thread_normalize: bool,
138    compound_cgroup: bool,
139    flatten: &[glob::Pattern],
140    cgroup_key_map: Option<&BTreeMap<String, String>>,
141) -> BTreeMap<String, BTreeMap<String, u64>> {
142    let mut out: BTreeMap<String, BTreeMap<String, u64>> = BTreeMap::new();
143    for t in &snap.threads {
144        if t.smaps_rollup_kib.is_empty() {
145            continue;
146        }
147        let pcomm_key = if no_thread_normalize {
148            format!("{}[{}]", t.pcomm, t.tgid)
149        } else {
150            pattern_key(&t.pcomm)
151        };
152        let key = if compound_cgroup {
153            let cg = flatten_cgroup_path(&t.cgroup, flatten);
154            let cg_key = match cgroup_key_map.and_then(|m| m.get(&cg)) {
155                Some(k) => k.clone(),
156                None => cg,
157            };
158            format!("{cg_key}\x00{pcomm_key}")
159        } else {
160            pcomm_key
161        };
162        let entry = out.entry(key).or_default();
163        for (k, b) in t.smaps_rollup_bytes() {
164            entry
165                .entry(k.clone())
166                .and_modify(|v| *v = v.saturating_add(b.0))
167                .or_insert(b.0);
168        }
169    }
170    out
171}
172
173/// Build the post-flatten-path → final-tightened-key map for
174/// [`GroupBy::Cgroup`] under auto-normalization. Walks the union
175/// of paths from both snapshots' threads and `cgroup_stats` so
176/// that Layer 3 (tighten) sees every contributor to a given
177/// Layer-2 skeleton group. Returns the map keyed by post-flatten
178/// path; consumers ([`build_groups`], `flatten_cgroup_stats`)
179/// look up the final key for any path they see.
180pub fn build_cgroup_key_map(
181    baseline: &CtprofSnapshot,
182    candidate: &CtprofSnapshot,
183    flatten: &[glob::Pattern],
184) -> BTreeMap<String, String> {
185    use std::collections::BTreeSet;
186    let mut paths: BTreeSet<String> = BTreeSet::new();
187    for snap in [baseline, candidate] {
188        for t in &snap.threads {
189            paths.insert(flatten_cgroup_path(&t.cgroup, flatten));
190        }
191        for k in snap.cgroup_stats.keys() {
192            paths.insert(flatten_cgroup_path(k, flatten));
193        }
194    }
195    // Compute (skeleton, post_l1, tokens) for every path.
196    let entries: Vec<(String, String, String, Vec<String>)> = paths
197        .into_iter()
198        .map(|p| {
199            let (skeleton, post_l1, tokens) = cgroup_normalize_skeleton(&p);
200            (p, skeleton, post_l1, tokens)
201        })
202        .collect();
203    // Group entries by Layer-2 skeleton.
204    let mut groups: BTreeMap<String, Vec<usize>> = BTreeMap::new();
205    for (idx, (_, skel, _, _)) in entries.iter().enumerate() {
206        groups.entry(skel.clone()).or_default().push(idx);
207    }
208    // Tighten per group.
209    let mut tightened: Vec<String> = vec![String::new(); entries.len()];
210    for (skeleton, indices) in &groups {
211        if indices.len() < 2 {
212            // Singleton — Layer-2 skeleton stays as the key. No
213            // member set to compare against.
214            for &i in indices {
215                tightened[i] = skeleton.clone();
216            }
217        } else {
218            let post_l1_paths: Vec<String> =
219                indices.iter().map(|&i| entries[i].2.clone()).collect();
220            let member_tokens: Vec<Vec<String>> =
221                indices.iter().map(|&i| entries[i].3.clone()).collect();
222            let key = tighten_group(&post_l1_paths, &member_tokens);
223            for &i in indices {
224                tightened[i] = key.clone();
225            }
226        }
227    }
228    let mut out: BTreeMap<String, String> = BTreeMap::new();
229    for (i, (orig, _, _, _)) in entries.into_iter().enumerate() {
230        out.insert(orig, tightened[i].clone());
231    }
232    out
233}
234
235#[allow(clippy::too_many_arguments)]
236pub(super) fn build_row(
237    key: &str,
238    display_key: &str,
239    n_a: usize,
240    n_b: usize,
241    metric: &'static CtprofMetricDef,
242    a: Aggregated,
243    b: Aggregated,
244    uptime_pct: Option<f64>,
245) -> DiffRow {
246    let (delta, delta_pct) = match (a.numeric(), b.numeric()) {
247        (Some(va), Some(vb)) => {
248            let d = vb - va;
249            let pct = if va.abs() > f64::EPSILON {
250                Some(d / va)
251            } else {
252                None
253            };
254            (Some(d), pct)
255        }
256        _ => (None, None),
257    };
258    DiffRow {
259        group_key: key.to_string(),
260        thread_count_a: n_a,
261        thread_count_b: n_b,
262        uptime_pct,
263        metric_name: metric.name,
264        metric_ladder: metric.rule.ladder(),
265        baseline: a,
266        candidate: b,
267        delta,
268        delta_pct,
269        display_key: display_key.to_string(),
270        sort_by_cell: None,
271        sort_by_delta: None,
272    }
273}
274
275pub fn build_groups(
276    snap: &CtprofSnapshot,
277    group_by: GroupBy,
278    flatten: &[glob::Pattern],
279    pattern_counts: Option<&BTreeMap<String, usize>>,
280    cgroup_key_map: Option<&BTreeMap<String, String>>,
281    no_thread_normalize: bool,
282) -> BTreeMap<String, ThreadGroup> {
283    // Pattern-aware grouping (Comm, Pcomm) needs a frequency pass:
284    // pattern keys with only one matching thread revert to the
285    // literal name so a lone worker stays ungrouped instead of
286    // advertising a `worker-{N}` pattern that no other thread
287    // shares. Non-pattern groupings (CommExact, Cgroup) skip the
288    // pre-pass.
289    //
290    // When `pattern_counts` is supplied (production: `compare()`
291    // passes the union over baseline+candidate), it is used as
292    // the gate. When it is `None` (single-snapshot test
293    // ergonomics), this fn computes counts from `snap` alone.
294    // Suppressed when `no_thread_normalize` is set — the gate is
295    // meaningless once each thread groups by its literal name.
296    // Pattern_field selects the thread accessor used by the
297    // singleton-revert gate inside the GroupBy::Pcomm / Comm
298    // grouping arm. GroupBy::All has its own arm that normalizes
299    // both pcomm and comm unconditionally (no singleton revert),
300    // so it never reads `pattern_field`. CommExact and Cgroup
301    // don't normalize either.
302    let pattern_field: Option<fn(&ThreadState) -> &str> = match (group_by, no_thread_normalize) {
303        (GroupBy::Comm, false) => Some(|t: &ThreadState| t.comm.as_str()),
304        (GroupBy::Pcomm, false) => Some(|t: &ThreadState| t.pcomm.as_str()),
305        _ => None,
306    };
307    let local_counts: Option<BTreeMap<String, usize>> = match (pattern_field, pattern_counts) {
308        (Some(field), None) => {
309            let mut counts: BTreeMap<String, usize> = BTreeMap::new();
310            for t in &snap.threads {
311                *counts.entry(pattern_key(field(t))).or_insert(0) += 1;
312            }
313            Some(counts)
314        }
315        _ => None,
316    };
317    let counts_ref: Option<&BTreeMap<String, usize>> = pattern_counts.or(local_counts.as_ref());
318
319    let mut buckets: BTreeMap<String, Vec<&ThreadState>> = BTreeMap::new();
320    for t in &snap.threads {
321        let key = match group_by {
322            GroupBy::All => {
323                let cg = flatten_cgroup_path(&t.cgroup, flatten);
324                let cg_key = match cgroup_key_map.and_then(|m| m.get(&cg)) {
325                    Some(k) => k.clone(),
326                    None => cg,
327                };
328                let pcomm_key = if no_thread_normalize {
329                    t.pcomm.clone()
330                } else {
331                    pattern_key(&t.pcomm)
332                };
333                let comm_key = if no_thread_normalize {
334                    t.comm.clone()
335                } else {
336                    pattern_key(&t.comm)
337                };
338                format!("{cg_key}\x00{pcomm_key}\x00{comm_key}")
339            }
340            // Pcomm and Comm share the same shape: when
341            // normalization is enabled, route the chosen field
342            // through `pattern_key` and revert singletons to the
343            // literal name so a lone process / thread does not
344            // advertise a pattern that no other contributor
345            // shares. The `pattern_field` accessor (already
346            // computed for the local_counts pre-pass) selects
347            // pcomm vs comm; under `no_thread_normalize` it is
348            // `None` and we group by literal name directly.
349            GroupBy::Pcomm | GroupBy::Comm => match pattern_field {
350                Some(field) => {
351                    let name = field(t);
352                    let pk = pattern_key(name);
353                    let counts = counts_ref.expect("pattern_counts seeded for Pcomm/Comm");
354                    if counts.get(&pk).copied().unwrap_or(0) >= 2 {
355                        pk
356                    } else {
357                        name.to_string()
358                    }
359                }
360                None => {
361                    // `no_thread_normalize` set — literal grouping.
362                    if group_by == GroupBy::Pcomm {
363                        t.pcomm.clone()
364                    } else {
365                        t.comm.clone()
366                    }
367                }
368            },
369            GroupBy::CommExact => t.comm.clone(),
370            GroupBy::Cgroup => {
371                let post_flatten = flatten_cgroup_path(&t.cgroup, flatten);
372                // When auto-normalize is enabled, the cgroup key map
373                // (built by `compare()` over the union of paths from
374                // both snapshots) maps each post-flatten path to its
375                // final tightened key (Layer 1 + 2 + 3). Otherwise,
376                // group by post-flatten path verbatim.
377                match cgroup_key_map.and_then(|m| m.get(&post_flatten)) {
378                    Some(k) => k.clone(),
379                    None => post_flatten,
380                }
381            }
382        };
383        buckets.entry(key).or_default().push(t);
384    }
385
386    let mut out = BTreeMap::new();
387    for (key, threads) in buckets {
388        let mut metrics = BTreeMap::new();
389        for m in CTPROF_METRICS {
390            let agg = aggregate(m.rule, &threads);
391            // Capture-gated families: a non-empty bucket whose threads ALL went
392            // uncaptured for the family folds to Absent, not a sentinel Sum(0),
393            // so a derived metric short-circuits to "-" and the raw row renders
394            // "-" rather than a measured-looking 0. A measured zero (>=1 thread
395            // captured it) stays its real aggregate.
396            let agg = match measured_predicate(m) {
397                Some(pred) if !threads.is_empty() && !threads.iter().copied().any(pred) => {
398                    Aggregated::Absent
399                }
400                _ => agg,
401            };
402            metrics.insert(m.name.to_string(), agg);
403        }
404        let cgroup_stats = if group_by == GroupBy::Cgroup {
405            // Pick the first sampled thread's raw cgroup path and
406            // look up its enrichment. `snap.cgroup_stats` is keyed
407            // by raw cgroup paths, so the lookup uses `t.cgroup`
408            // verbatim (flattening produces a separate map and is
409            // not written back here). When a flatten/tighten
410            // pattern collapses several raw cgroups into one
411            // bucket, the first thread's raw cgroup is used as the
412            // representative enrichment.
413            threads
414                .first()
415                .and_then(|t| snap.cgroup_stats.get(&t.cgroup).cloned())
416        } else {
417            None
418        };
419        // `members` feeds the grex display-label path for
420        // normalized `GroupBy::Comm` (literal comms) and
421        // `GroupBy::Pcomm` (literal pcomms). Other groupings — and
422        // either pattern-aware grouping under
423        // `no_thread_normalize` — render the join key directly, so
424        // skip the per-bucket name collection (saves a
425        // clone-per-thread per-bucket on busy hosts).
426        let members: Vec<String> = match pattern_field {
427            Some(field) => {
428                let mut v: Vec<String> = threads.iter().map(|t| field(t).to_string()).collect();
429                v.sort();
430                v.dedup();
431                v
432            }
433            None => Vec::new(),
434        };
435        let valid_starts: Vec<u64> = threads
436            .iter()
437            .map(|t| t.start_time_clock_ticks)
438            .filter(|&t| t > 0)
439            .collect();
440        let avg_start_ticks = if valid_starts.is_empty() {
441            0
442        } else {
443            valid_starts.iter().sum::<u64>() / valid_starts.len() as u64
444        };
445        out.insert(
446            key.clone(),
447            ThreadGroup {
448                key,
449                thread_count: threads.len(),
450                metrics,
451                cgroup_stats,
452                members,
453                avg_start_ticks,
454            },
455        );
456    }
457    out
458}
459
460/// Aggregate one metric across a slice of threads per its rule.
461///
462/// Each `Sum*` / `Max*` / `Range*` arm dispatches
463/// through the trait method on the typed newtype defined in
464/// [`crate::metric_types`] — `sum_across` for [`Summable`],
465/// `max_across` for [`Maxable`], `range_across` for [`Rangeable`]
466/// — then unwraps to the
467/// untyped scalar that [`Aggregated`] carries today; the `Mode*`
468/// arms instead route through [`mode_aggregate`] (tally-based),
469/// not a trait method; the
470/// unit-aware format dispatch (which would read the registry's
471/// `unit` tag rather than the wrapper type) is not implemented
472/// yet, so `Aggregated` stays scalar-shaped after this phase.
473///
474/// # Empty-bucket contract
475///
476/// The trait-level shapes split empty handling differently
477/// from the dispatch-level shape:
478/// - `Summable::sum_across` returns the additive identity
479///   (zero) on an empty input — the trait surface itself
480///   collapses the empty case. The `Sum*` arms therefore feed
481///   straight into [`Aggregated::Sum`] without re-checking.
482/// - `Maxable::max_across` returns `Option<Self>` (`None`
483///   for empty) so callers can distinguish "no contributors"
484///   from "all contributors had zero." The dispatch in this
485///   function collapses `None` to `Aggregated::Max(0)` at the
486///   call boundary so the historical empty-bucket contract on
487///   this code path (zero rendered for empty groups) holds
488///   regardless of the trait's richer shape.
489/// - `Rangeable::range_across` returns
490///   `Option<Range<Self>>`; the dispatch collapses `None` to
491///   `Aggregated::OrdinalRange { min: 0, max: 0 }` at the call
492///   boundary.
493/// - The `Mode*` arms call [`mode_aggregate`], which builds a
494///   per-value tally map; an empty bucket yields
495///   `Aggregated::Mode { tallies: (empty), total }` where `total`
496///   is the bucket size (`aggregate` passes `threads.len()`
497///   directly). The mode/count are derived on demand via
498///   [`Aggregated::mode_value`] / [`Aggregated::mode_count`].
499///
500/// Downstream delta math therefore sees a well-defined value
501/// at every join boundary regardless of which side of a
502/// compare carried zero threads under the bucket key.
503///
504/// [`Summable`]: crate::metric_types::Summable
505/// [`Maxable`]: crate::metric_types::Maxable
506/// [`Rangeable`]: crate::metric_types::Rangeable
507/// [`Modeable`]: crate::metric_types::Modeable
508///
509/// Mode-arm dispatch helper used by `aggregate`. Builds a
510/// per-value tally map (`BTreeMap<String, usize>`) over a typed
511/// iterator of [`crate::metric_types::CategoricalString`] and
512/// returns [`Aggregated::Mode`] with the supplied `total` (the
513/// number of threads in the bucket). Empty buckets surface as
514/// `Aggregated::Mode { tallies: (empty), total }` — downstream
515/// delta math sees
516/// a well-defined value at the join boundary regardless of which
517/// side carried zero threads. Lifts the otherwise-identical
518/// match arms for [`AggRule::Mode`], [`AggRule::ModeChar`], and
519/// [`AggRule::ModeBool`] into one site so a future refactor that
520/// changes the empty-bucket contract or the tally shape only
521/// edits one place.
522fn mode_aggregate(
523    total: usize,
524    items: impl IntoIterator<Item = crate::metric_types::CategoricalString>,
525) -> Aggregated {
526    // Build the full tally map across the bucket — one entry
527    // per distinct category, with its occurrence count. The
528    // mode (most-frequent value) is derived on demand by
529    // [`Aggregated::mode_value`] / [`Aggregated::mode_count`];
530    // storing the full distribution (not just the mode) lets
531    // the N:1 fudge merge compose tallies correctly across
532    // buckets via [`merge_aggregated_into`].
533    let mut tallies: BTreeMap<String, usize> = BTreeMap::new();
534    for item in items {
535        *tallies.entry(item.0).or_insert(0) += 1;
536    }
537    Aggregated::Mode { tallies, total }
538}
539
/// Sub-family of a raw taskstats metric, as classified by
/// [`taskstats_family`] from the metric's name.
///
/// Sub-family is the granularity at which kernel delay / xacct
/// accounting is actually gated. The three families have DIFFERENT
/// enablement conditions (see [`measured_predicate`]):
/// - `cpu_delay_*` survives the runtime delayacct toggle.
/// - The resource-wait categories do not.
/// - The xacct watermarks depend on a separate CONFIG option that has no
///   runtime toggle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TaskstatsFamily {
    /// `cpu_delay_*`: sourced from sched_info and filled unconditionally
    /// by `delayacct_add_tsk`. Active unless CONFIG_TASK_DELAY_ACCT is
    /// compiled out entirely.
    CpuDelay,
    /// The seven delayacct resource-wait categories: `blkio`, `swapin`,
    /// `freepages`, `thrashing`, `compact`, `wpcopy`, `irq`. Gated by the
    /// runtime `task_delayacct` toggle, because `tsk->delays` is allocated
    /// at fork only when the toggle is on.
    DelayBlock,
    /// The xacct memory watermarks (`hiwater_*`). Gated by
    /// CONFIG_TASK_XACCT, with no runtime toggle.
    Xacct,
}
558
559/// Classify a RAW taskstats metric by name. `None` for non-taskstats metrics.
560/// Derived taskstats metrics (`avg_*_delay_ns`, `total_offcpu_delay_ns`) are NOT
561/// classified here — they inherit [`Aggregated::Absent`] from their raw inputs
562/// ([`Aggregated::numeric`] returns `None`, so the derived `compute` closure
563/// short-circuits). The
564/// `build_groups_all_subfamilies_inactive_makes_every_taskstats_metric_absent`
565/// test pins that every `Section::TaskstatsDelay` raw metric maps to `Some`.
566fn taskstats_family(name: &str) -> Option<TaskstatsFamily> {
567    if name.starts_with("cpu_delay_") {
568        return Some(TaskstatsFamily::CpuDelay);
569    }
570    if name.starts_with("hiwater_") {
571        return Some(TaskstatsFamily::Xacct);
572    }
573    const LOCK_DELAY_PREFIXES: [&str; 7] = [
574        "blkio_delay_",
575        "swapin_delay_",
576        "freepages_delay_",
577        "thrashing_delay_",
578        "compact_delay_",
579        "wpcopy_delay_",
580        "irq_delay_",
581    ];
582    if LOCK_DELAY_PREFIXES.iter().any(|p| name.starts_with(p)) {
583        return Some(TaskstatsFamily::DelayBlock);
584    }
585    None
586}
587
588/// The per-thread "was this metric's family genuinely measured" predicate for a
589/// capture-gated metric, or `None` for an always-measured metric. A taskstats
590/// metric is measured iff this thread's genetlink query succeeded
591/// (`taskstats_measured`) AND its sub-family is enabled host-wide — per
592/// [`TaskstatsFamily`], `cpu_delay_*` need CONFIG_TASK_DELAY_ACCT built in, the
593/// resource-wait categories additionally need the runtime delayacct toggle on, and
594/// the `hiwater_*` watermarks need CONFIG_TASK_XACCT (the active flags are baked
595/// per-thread at capture). jemalloc is the two byte counters, which live in the
596/// shared `Section::Primary` and so are name-matched. `build_groups` folds a
597/// non-empty bucket where the predicate holds but NO thread measured the family
598/// to [`Aggregated::Absent`], distinguishing a never-captured (or
599/// sub-family-disabled) family from a measured zero (the bug: a derived metric
600/// otherwise computes from a sentinel `0` and renders "0" instead of "-").
601fn measured_predicate(m: &CtprofMetricDef) -> Option<fn(&ThreadState) -> bool> {
602    if let Some(fam) = taskstats_family(m.name) {
603        return Some(match fam {
604            TaskstatsFamily::CpuDelay => |t| t.taskstats_measured && t.cpu_delay_active,
605            TaskstatsFamily::DelayBlock => |t| t.taskstats_measured && t.delay_block_active,
606            TaskstatsFamily::Xacct => |t| t.taskstats_measured && t.xacct_active,
607        });
608    }
609    if m.name == "allocated_bytes" || m.name == "deallocated_bytes" {
610        return Some(|t| t.jemalloc_measured);
611    }
612    None
613}
614
615pub fn aggregate(rule: AggRule, threads: &[&ThreadState]) -> Aggregated {
616    // `Modeable` is not used anywhere in this module: the Mode
617    // arms tally directly in `mode_aggregate` rather than calling
618    // `mode_across`. `CategoricalString` is still needed because
619    // the ModeChar / ModeBool arms construct one for the
620    // coercion path before passing the iterator to
621    // `mode_aggregate`.
622    use crate::metric_types::{CategoricalString, Maxable, Rangeable, Summable};
623    match rule {
624        AggRule::SumCount(f) => {
625            let s = crate::metric_types::MonotonicCount::sum_across(threads.iter().map(|t| f(t)));
626            Aggregated::Sum(s.0)
627        }
628        AggRule::SumNs(f) => {
629            let s = crate::metric_types::MonotonicNs::sum_across(threads.iter().map(|t| f(t)));
630            Aggregated::Sum(s.0)
631        }
632        AggRule::SumTicks(f) => {
633            let s = crate::metric_types::ClockTicks::sum_across(threads.iter().map(|t| f(t)));
634            Aggregated::Sum(s.0)
635        }
636        AggRule::SumBytes(f) => {
637            let s = crate::metric_types::Bytes::sum_across(threads.iter().map(|t| f(t)));
638            Aggregated::Sum(s.0)
639        }
640        AggRule::MaxPeak(f) => {
641            // `max_across` returns `Option<Self>` so callers can
642            // distinguish "empty thread bucket" from "all
643            // contributors had zero." The historical empty-bucket
644            // contract on this code path was `Aggregated::Max(0)`;
645            // preserve it by collapsing `None` to the additive
646            // identity at the call boundary. Non-empty buckets
647            // produce a concrete max regardless of value.
648            let m = crate::metric_types::PeakNs::max_across(threads.iter().map(|t| f(t)));
649            Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
650        }
651        AggRule::MaxPeakBytes(f) => {
652            // Same Option<Self> + None → Aggregated::Max(0)
653            // collapse as MaxPeak; the difference is only the
654            // typed accessor's unit family — Bytes vs Ns. The
655            // ladder() match maps this variant to
656            // ScaleLadder::Bytes so the renderer auto-scales
657            // with KiB/MiB/GiB/TiB suffixes.
658            let m = crate::metric_types::PeakBytes::max_across(threads.iter().map(|t| f(t)));
659            Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
660        }
661        AggRule::MaxGaugeNs(f) => {
662            let m = crate::metric_types::GaugeNs::max_across(threads.iter().map(|t| f(t)));
663            Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
664        }
665        AggRule::MaxGaugeCount(f) => {
666            let m = crate::metric_types::GaugeCount::max_across(threads.iter().map(|t| f(t)));
667            Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
668        }
669        AggRule::RangeI32(f) => {
670            match crate::metric_types::OrdinalI32::range_across(threads.iter().map(|t| f(t))) {
671                // `range_across` returns `None` only on an empty
672                // iterator — mirror the historical empty-group
673                // contract by collapsing to (0, 0) so the
674                // downstream midpoint and delta math sees a
675                // well-defined value at the join boundary. The
676                // `Some` arm carries a typed `Range<OrdinalI32>`
677                // wrapper that guarantees min ≤ max as a
678                // type-system invariant; `into_tuple()` extracts
679                // the pair without re-checking.
680                Some(r) => {
681                    let (min, max) = r.into_tuple();
682                    Aggregated::OrdinalRange {
683                        min: i64::from(min.0),
684                        max: i64::from(max.0),
685                    }
686                }
687                None => Aggregated::OrdinalRange { min: 0, max: 0 },
688            }
689        }
690        AggRule::RangeU32(f) => {
691            match crate::metric_types::OrdinalU32::range_across(threads.iter().map(|t| f(t))) {
692                Some(r) => {
693                    let (min, max) = r.into_tuple();
694                    Aggregated::OrdinalRange {
695                        min: i64::from(min.0),
696                        max: i64::from(max.0),
697                    }
698                }
699                None => Aggregated::OrdinalRange { min: 0, max: 0 },
700            }
701        }
702        AggRule::Mode(f) => mode_aggregate(threads.len(), threads.iter().map(|t| f(t))),
703        AggRule::ModeChar(f) => mode_aggregate(
704            threads.len(),
705            // `char` is not Modeable directly; coerce to the
706            // CategoricalString reduction so the lex-tiebreak
707            // contract is identical to other Mode variants.
708            threads.iter().map(|t| CategoricalString(f(t).to_string())),
709        ),
710        AggRule::ModeBool(f) => mode_aggregate(
711            threads.len(),
712            // Same coercion path as `ModeChar`. `to_string()`
713            // produces `"true"`/`"false"` per `bool::Display`.
714            threads.iter().map(|t| CategoricalString(f(t).to_string())),
715        ),
716        AggRule::Affinity(f) => {
717            let mut seen: Vec<Vec<u32>> = Vec::new();
718            let mut min_cpus = usize::MAX;
719            let mut max_cpus = 0usize;
720            for t in threads {
721                let cpus = f(t).0;
722                min_cpus = min_cpus.min(cpus.len());
723                max_cpus = max_cpus.max(cpus.len());
724                if !seen.iter().any(|s| s == &cpus) {
725                    seen.push(cpus);
726                }
727            }
728            if threads.is_empty() {
729                min_cpus = 0;
730            }
731            let uniform = if seen.len() == 1 {
732                seen.into_iter().next()
733            } else {
734                None
735            };
736            Aggregated::Affinity(AffinitySummary {
737                min_cpus,
738                max_cpus,
739                uniform,
740            })
741        }
742    }
743}
744
745/// Collapse dynamic segments of a cgroup path per every pattern
746/// in `patterns`. A pattern is a glob matched with glob's default
747/// `MatchOptions` (`require_literal_separator = false`), so `*` is
748/// NOT segment-bounded — it matches across `/` just like `**`. The
749/// literal portions are preserved
750/// and the wildcard portions are replaced with the wildcard token
751/// itself. Example: pattern `/kubepods/*/workload` applied to
752/// `/kubepods/pod-abc/workload` produces `/kubepods/*/workload`,
753/// so two runs with different pod IDs collapse onto the same key.
754///
755/// Patterns are tried in listed order; the first match wins and
756/// subsequent patterns are not applied. A path that matches no
757/// pattern is returned verbatim.
758pub fn flatten_cgroup_path(path: &str, patterns: &[glob::Pattern]) -> String {
759    for p in patterns {
760        if p.matches(path) {
761            // The pattern itself becomes the canonical key: every
762            // path matching `/kubepods/*/workload` collapses onto
763            // the literal pattern string.
764            return p.as_str().to_string();
765        }
766    }
767    path.to_string()
768}
769
770pub fn compile_flatten_patterns(raw: &[String]) -> Vec<glob::Pattern> {
771    raw.iter()
772        .filter_map(|s| glob::Pattern::new(s).ok())
773        .collect()
774}