ktstr/probe/
process.rs

1//! Probe skeleton lifecycle: load, attach, run, collect.
2//!
3//! When the pipeline is split into two phases (see [`PhaseBInput`]):
4//! - **Phase A** attaches kprobes + the trigger + kernel fexit before
5//!   the scheduler starts; it runs under the initial skeleton load.
6//! - **Phase B** attaches fentry/fexit to the scheduler's BPF
7//!   struct_ops callbacks after the scheduler is up and BPF programs
8//!   are discoverable.
9//!
10//! In the single-phase path, all probes attach after the scheduler is
11//! up.
12//!
13//! ## Two-phase sync mechanism
14//!
15//! Phase A runs on a probe worker thread. Caller and worker
16//! synchronize via two `Latch`es and one mpsc channel:
17//!
18//! 1. Caller spawns the probe worker, which loads the skeleton and
19//!    attaches kprobes + trigger + kernel fexit, then signals the
20//!    `probes_ready` latch (see `ready.set()` below). The worker
21//!    then enters the ringbuf poll loop.
22//! 2. Caller waits on `probes_ready`. After it fires, the caller
23//!    starts the scheduler — the scheduler launches with Phase A
24//!    probes already attached, so the trigger and any kprobes that
25//!    fire during scheduler init are observed.
26//! 3. After the scheduler is up, the caller discovers BPF programs
27//!    by scheduler pid (see `discover_bpf_symbols` /
28//!    `expand_bpf_to_kernel_callers`) and sends a [`PhaseBInput`]
29//!    over the channel. The Phase B input includes BPF program FDs
30//!    held open while the scheduler is alive.
31//! 4. The probe worker's poll loop calls `try_recv` on the channel
32//!    every 100 ms; on receipt it attaches BPF fentry/fexit + extra
33//!    kprobes for kernel callers, then signals the `done` latch on
34//!    the [`PhaseBInput`].
35//! 5. Caller waits on `done` and proceeds to the test scenario with
36//!    full instrumentation in place.
37
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
40use std::time::Duration;
41
42use super::btf::{BtfFunc, RenderHint, STRUCT_FIELDS};
43use super::stack::StackFunction;
44
45use crate::bpf_skel::types;
46use crate::sync::{Latch, RwLockExt};
47
48/// Cross-thread mirror of the BPF probe's `ktstr_err_exit_detected`
49/// BSS latch. The probe-poll thread reads the BSS slot via volatile
50/// load each iteration and `store`s the observed value here; scenario
51/// code calls [`sched_exit_kind`] from any thread to read the current
52/// classification synchronously without owning the skeleton.
53///
54/// Encoded as `u32` for `AtomicU32`-friendliness:
55/// - `0` (initial / probe-not-yet-armed / scheduler still alive):
56///   [`SchedExitKind::Unknown`].
57/// - `1` ([`SchedExitKind::Clean`]) — probe armed, BSS latch read as
58///   `0` after at least one poll iteration. Means the scheduler is
59///   either still alive (latch never set) or exited via the clean
60///   `SCX_EXIT_NONE` path.
61/// - `2` ([`SchedExitKind::Crashed`]) — probe armed, BSS latch
62///   observed at `!= 0`. Set by the BPF trace_sched_ext_exit handler
63///   under any non-clean kernel exit (`SCX_EXIT_ERROR`,
64///   `SCX_EXIT_ERROR_STALL`, watchdog kick, BPF-side error).
65///
66/// 0 → 1/2 transitions are monotonic for a single test run; once the
67/// kernel emits the exit trace, the latch never clears. Cross-thread
68/// observation uses Release / Acquire so the scenario-side load sees
69/// every prior probe-side store.
70pub(crate) static PROBE_SCHED_EXIT_STATE: AtomicU32 = AtomicU32::new(0);
71
72const PROBE_EXIT_STATE_CLEAN: u32 = 1;
73const PROBE_EXIT_STATE_CRASHED: u32 = 2;
74
75/// Classification of the scheduler's exit observed by the probe
76/// poll thread. Mirrors the BSS latch in
77/// [`PROBE_SCHED_EXIT_STATE`].
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub enum SchedExitKind {
80    /// Probe has not observed the scheduler's exit yet — either the
81    /// scheduler is still alive, the probe never armed for this run,
82    /// or the poll thread has not completed a single iteration since
83    /// the prior reset. Callers gating clean-vs-crash classification
84    /// on a known-dead scheduler treat `Unknown` as a probe-coverage
85    /// gap, not as evidence of either outcome.
86    Unknown,
87    /// Probe armed AND completed at least one observation with the
88    /// BSS latch unset. Scheduler is either still alive or exited
89    /// cleanly via the `SCX_EXIT_NONE` path that does not latch the
90    /// error flag.
91    Clean,
92    /// Probe armed AND observed the BSS latch set. Kernel emitted a
93    /// non-clean `trace_sched_ext_exit` event (error, stall,
94    /// watchdog kick, BPF-side error) before this read.
95    Crashed,
96}
97
98/// Read the current probe-observed scheduler exit classification.
99/// Synchronous, cross-thread-safe (Acquire load mirrors the probe's
100/// Release stores). Returns [`SchedExitKind::Unknown`] when the
101/// probe has not yet completed its first poll iteration.
102pub fn sched_exit_kind() -> SchedExitKind {
103    match PROBE_SCHED_EXIT_STATE.load(Ordering::Acquire) {
104        PROBE_EXIT_STATE_CLEAN => SchedExitKind::Clean,
105        PROBE_EXIT_STATE_CRASHED => SchedExitKind::Crashed,
106        _ => SchedExitKind::Unknown,
107    }
108}
109
110/// Override the cross-thread exit-state mirror. The probe poll thread
111/// is the production writer (it mirrors the BSS `ktstr_err_exit_detected`
112/// latch into [`PROBE_SCHED_EXIT_STATE`]); this lets host-side tests,
113/// where no BPF probe runs, simulate an observed crash latch. Pair a
114/// `Crashed` set with a reset to [`SchedExitKind::Unknown`] so neighbor
115/// tests start from a clean mirror.
116#[cfg(test)]
117pub(crate) fn set_probe_sched_exit_state(kind: SchedExitKind) {
118    let encoded = match kind {
119        SchedExitKind::Unknown => 0,
120        SchedExitKind::Clean => PROBE_EXIT_STATE_CLEAN,
121        SchedExitKind::Crashed => PROBE_EXIT_STATE_CRASHED,
122    };
123    PROBE_SCHED_EXIT_STATE.store(encoded, Ordering::Release);
124}
125
126/// Input for Phase B probe attachment (BPF fentry/fexit).
127///
128/// Sent via mpsc channel after the scheduler starts and BPF programs
129/// are discoverable. Phase A (kprobes + trigger + kernel fexit) runs
130/// before the scheduler; Phase B attaches fentry/fexit to the
131/// scheduler's BPF struct_ops callbacks.
132pub struct PhaseBInput {
133    /// BPF functions and kernel callers discovered from the running
134    /// scheduler. Includes both BPF callbacks (for fentry) and their
135    /// kernel-side callers from `expand_bpf_to_kernel_callers` (for
136    /// additional kprobes).
137    pub functions: Vec<StackFunction>,
138    /// Pre-opened BPF program FDs keyed by prog_id.
139    pub bpf_prog_fds: std::collections::HashMap<u32, i32>,
140    /// BTF-resolved function signatures for BPF callbacks and kernel callers.
141    pub btf_funcs: Vec<BtfFunc>,
142    /// Signaled by the probe worker thread once Phase B attachment
143    /// completes so the dispatch path can proceed past its wait.
144    pub done: Arc<Latch>,
145    /// Starting func_idx for Phase B functions. Must equal the number
146    /// of functions in Phase A to avoid index collisions in the shared
147    /// `func_meta_map` and `probe_data` maps.
148    pub func_idx_offset: u32,
149}
150
151/// Ring buffer event type for the trigger (matches `EVENT_TRIGGER`
152/// in `intf.h`). Currently the only record type emitted on the
153/// `ktstr_events` ringbuf — `EVENT_SCX_EVENT` was removed alongside
154/// the `tp_btf/sched_ext_event` BPF handler.
155const EVENT_TRIGGER: u32 = 2;
156
157/// Maximum string length carried in a probe_event entry (matches
158/// `MAX_STR_LEN` in `intf.h`). Used to size the `RbEvent.str_val`
159/// field for byte-level wire compatibility with `struct probe_event`
160/// in `intf.h`; the Rust dispatch path leaves it zeroed because
161/// the only producer that populated it (the removed
162/// `tp_btf/sched_ext_event` handler) is gone.
163const MAX_STR_LEN: usize = 64;
164
165/// Pipeline diagnostics from a probe run.
166///
167/// Tracks how many functions/events survived each stage so users can
168/// see WHERE data is being lost (filter, attach, capture, stitch).
169#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
170pub struct ProbeDiagnostics {
171    /// Kernel functions resolved to IPs.
172    pub kprobe_resolved: u32,
173    /// Kernel functions that failed IP resolution.
174    pub kprobe_resolve_failed: Vec<String>,
175    /// Kprobes successfully attached.
176    pub kprobe_attached: u32,
177    /// Kprobes that failed to attach (name, error).
178    pub kprobe_attach_failed: Vec<(String, String)>,
179    /// BPF functions with valid prog IDs for fentry.
180    pub fentry_candidates: u32,
181    /// Fentry probes successfully attached.
182    pub fentry_attached: u32,
183    /// Fentry probes that failed (name, error).
184    pub fentry_attach_failed: Vec<(String, String)>,
185    /// Total keys in probe_data map at readout.
186    pub probe_data_keys: u32,
187    /// Keys with unmatched IPs (no func_meta entry).
188    pub probe_data_unmatched_ips: u32,
189    /// Events read from probe_data before stitching.
190    pub events_before_stitch: u32,
191    /// Events surviving tptr+time stitching.
192    pub events_after_stitch: u32,
193    /// Whether the trigger fired.
194    pub trigger_fired: bool,
195    /// Which trigger mechanism attached ("tp_btf").
196    pub trigger_type: String,
197    /// Error from tp_btf/sched_ext_exit attach failure.
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub trigger_attach_error: Option<String>,
200    /// Panic payload from the guest-side probe-collection thread
201    /// when its `JoinHandle::join()` returned `Err`. `None` on a
202    /// clean run (thread exited normally — events may still be
203    /// empty if the trigger never fired). `Some(payload)`
204    /// distinguishes "the probe thread crashed before producing
205    /// events" from "the probe thread ran cleanly and observed no
206    /// trigger" — the COM2 payload's `events: []` is otherwise
207    /// indistinguishable between those two cases. Any consumer of
208    /// the payload (host harness, render layer, downstream test
209    /// verdict) MUST treat `Some(_)` as a failure even when
210    /// `trigger.fired == false` and `events` is empty.
211    #[serde(default, skip_serializing_if = "Option::is_none")]
212    pub host_thread_panic: Option<String>,
213    /// BPF-side kprobe fire count (cross-CPU sum of the
214    /// `KTSTR_PCPU_PROBE_COUNT` slot in `ktstr_pcpu_counters`).
215    pub bpf_kprobe_fires: u64,
216    /// BPF-side kprobe commit count (cross-CPU sum of the
217    /// `KTSTR_PCPU_KPROBE_RETURNS` slot).
218    /// `bpf_kprobe_fires - bpf_kprobe_returns` is the number of
219    /// kprobe fires that bailed before pushing into `probe_data`
220    /// (meta-map miss or scratch-slot miss).
221    pub bpf_kprobe_returns: u64,
222    /// BPF-side trigger fire count (cross-CPU sum of the
223    /// `KTSTR_PCPU_TRIGGER_COUNT` slot).
224    pub bpf_trigger_fires: u64,
225    /// BPF-side func_meta_map misses (cross-CPU sum of the
226    /// `KTSTR_PCPU_META_MISS` slot — IP not found in map).
227    pub bpf_meta_misses: u64,
228    /// IPs that missed func_meta_map lookup (from BSS ktstr_miss_log).
229    pub bpf_miss_ips: Vec<u64>,
230    /// BPF-side `bpf_ringbuf_reserve` failures inside the trigger
231    /// handler (cross-CPU sum of `KTSTR_PCPU_RINGBUF_DROPS`).
232    /// Non-zero means the userspace consumer fell behind on the
233    /// events ringbuf, so auto-repro will see a missing trigger
234    /// event even though the scheduler did fire.
235    pub bpf_ringbuf_drops: u64,
236    /// Nanosecond timestamp captured by the BPF trigger handler on
237    /// the first error-class `sched_ext_exit` (from BSS
238    /// ktstr_last_trigger_ts). 0 when no error-class exit fired.
239    pub bpf_first_trigger_ns: u64,
240    /// `kind` argument captured by the BPF trigger handler on the
241    /// first error-class `sched_ext_exit` (from BSS
242    /// `ktstr_exit_kind_snap`). 0 when no error-class exit fired,
243    /// otherwise one of the [`SCX_EXIT_*`](super::scx_defs) values.
244    /// Used by the host renderer to disambiguate "trigger fired with
245    /// kind=STALL/ERROR (no causal task; events suppressed)" from
246    /// "trigger never fired" when the post-stitch event count is 0.
247    pub bpf_exit_kind_snap: u32,
248    /// `true` when the readout phase reached the no-causal-tptr
249    /// branch and emitted events grouped by frequency rather than
250    /// stitched against a real trigger task pointer. Set in
251    /// [`run_probe_skeleton`] when the trigger fired with
252    /// `args[0] == 0` (kind=STALL or generic ERROR) or never fired
253    /// at all but at least one captured kprobe event had a non-zero
254    /// task pointer. Surfaced by the host renderer as
255    /// `events: ... — trigger absent, grouped by frequency` so the
256    /// operator does not misread the candidate chain as a verified
257    /// stitch.
258    pub stitch_fallback_used: bool,
259    /// Cumulative count of `tp_btf/sched_switch +
260    /// sched_migrate_task + sched_wakeup` records committed into
261    /// the dedicated `timeline_events` ringbuf by the timeline
262    /// handlers (cross-CPU sum of the
263    /// `KTSTR_PCPU_TIMELINE_COUNT` slot). Zero before any of
264    /// those tracepoints fire; otherwise grows continuously
265    /// while the probe runs. Combined with `bpf_timeline_drops`
266    /// it lets an operator tell whether a failure-time drain
267    /// saw the full window or only the tail.
268    pub bpf_timeline_count: u64,
269    /// `bpf_ringbuf_reserve` failures across the three timeline
270    /// tracepoint handlers (sched_switch / sched_migrate_task /
271    /// sched_wakeup), aggregated as cross-CPU sum of the
272    /// `KTSTR_PCPU_TIMELINE_DROPS` slot. Each drop is one new
273    /// event lost — the ring's existing contents are NOT evicted
274    /// on overflow, so the drain on failure recovers the OLDEST
275    /// captured events first.
276    pub bpf_timeline_drops: u64,
277}
278
279/// Structured probe event captured by the BPF skeleton.
280///
281/// One per (function, task_ptr) combination. `fields` contains BTF-resolved
282/// struct field values keyed as `"param:struct.field"` (from
283/// [`build_field_keys`]). Events are sorted by `ts` and stitched by
284/// `task_struct` pointer before output.
285#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
286pub struct ProbeEvent {
287    /// Index into the run's function list (matches the slot used in
288    /// the shared `func_meta_map`).
289    pub func_idx: u32,
290    /// `task_struct` pointer for the thread that triggered the probe;
291    /// used to stitch entry/exit pairs.
292    pub task_ptr: u64,
293    /// Nanosecond timestamp captured at entry.
294    pub ts: u64,
295    /// First six callee arguments captured verbatim from the call
296    /// site (after convention lowering).
297    pub args: [u64; 6],
298    /// BTF-resolved struct field values decoded post-hoc by the
299    /// caller; each entry is `(field_key, raw_value)`.
300    pub fields: Vec<(String, u64)>,
301    /// Kernel stack trace captured at entry, most recent frame first.
302    pub kstack: Vec<u64>,
303    /// Optional UTF-8 string associated with the event (e.g. a
304    /// scheduler-provided exit reason).
305    #[serde(default, skip_serializing_if = "Option::is_none")]
306    pub str_val: Option<String>,
307    /// Post-mutation field values captured by fexit.
308    /// Same field keys as `fields`, paired by index.
309    #[serde(default, skip_serializing_if = "Vec::is_empty")]
310    pub exit_fields: Vec<(String, u64)>,
311    /// Timestamp when fexit fired.
312    #[serde(default, skip_serializing_if = "Option::is_none")]
313    pub exit_ts: Option<u64>,
314}
315
316/// Parse `/proc/kallsyms` into a `name -> address` map. Returns `None`
317/// when the file is unreadable (expected outside a privileged context)
318/// OR when every parsed entry has a zero address (see
319/// [`accept_kallsyms_map`] for the kptr_restrict rationale).
320fn load_kallsyms() -> Option<std::collections::HashMap<String, u64>> {
321    let raw = std::fs::read_to_string("/proc/kallsyms").ok()?;
322    accept_kallsyms_map(parse_kallsyms(&raw))
323}
324
325/// Return `Some(map)` when at least one entry has a non-zero address,
326/// otherwise `None`. The all-zero case is what the kernel emits under
327/// `kernel.kptr_restrict=2` for non-CAP_SYSLOG callers — the file is
328/// readable, all symbol names are present, but every line carries
329/// `0000000000000000` for its address. Caching such a map would
330/// poison every later [`resolve_func_ip`] lookup with `Some(0)`,
331/// masking the unprivileged state from the retry-after-sudo path;
332/// treating it as a load failure instead lets the next caller (after
333/// [`RETRY_MIN_INTERVAL`]) try again under the new privilege level.
334fn accept_kallsyms_map(
335    map: std::collections::HashMap<String, u64>,
336) -> Option<std::collections::HashMap<String, u64>> {
337    if !map.values().any(|&a| a != 0) {
338        tracing::warn!(
339            entries = map.len(),
340            "/proc/kallsyms parsed with zero addresses only — kptr_restrict \
341             likely active; declining to cache",
342        );
343        return None;
344    }
345    Some(map)
346}
347
348/// Parse kallsyms-format text (one `HEX TYPE NAME ...` line per
349/// symbol) into a `name -> address` map. Extracted from
350/// [`load_kallsyms`] so unit tests can exercise the parser without
351/// touching `/proc/kallsyms`, which is usually unreadable in the
352/// unprivileged contexts the crate runs under.
353///
354/// Skipped lines (silently, without affecting other symbols):
355/// - lines with fewer than 3 whitespace-separated tokens (addr,
356///   type, name — all three are required; the type column is
357///   accepted but ignored)
358/// - lines whose first token is not a hex-parseable `u64`
359///
360/// A permanently-empty map is a valid return value — callers treat
361/// it as "no symbols found" rather than an error.
362fn parse_kallsyms(raw: &str) -> std::collections::HashMap<String, u64> {
363    // No pre-scan — HashMap grows from empty in a single pass over
364    // `raw`.
365    let mut map = std::collections::HashMap::new();
366    for line in raw.lines() {
367        let mut parts = line.split_whitespace();
368        let Some(addr) = parts.next() else { continue };
369        let _ty = parts.next();
370        let Some(sym) = parts.next() else { continue };
371        let Ok(addr) = u64::from_str_radix(addr, 16) else {
372            continue;
373        };
374        map.insert(sym.to_string(), addr);
375    }
376    map
377}
378
379/// Resolve a kernel function name to its address via /proc/kallsyms.
380///
381/// The parsed `name -> address` map is cached on first successful load
382/// so later lookups avoid re-reading and re-splitting ~200k lines.
383/// Callers that resolve many functions in a batch (auto-probe attach,
384/// probe-stack load) drop from O(N\*M) line scans to O(N) hash lookups.
385///
386/// A failed load (unreadable `/proc/kallsyms` — typical for
387/// unprivileged processes where the file is either missing or
388/// returns zeroed addresses) is rate-limited to one retry per
389/// `RETRY_MIN_INTERVAL` (1 s); calls within that window return
390/// `None` immediately. This matters both for performance — a caller
391/// resolving N symbols under a permanently unreadable
392/// `/proc/kallsyms` pays one load attempt, not N — and for
393/// privilege-escalation correctness, where a test harness that
394/// re-execs under `sudo` after the first miss still sees a retry
395/// within seconds.
396pub fn resolve_func_ip(name: &str) -> Option<u64> {
397    use std::sync::{OnceLock, RwLock};
398    use std::time::Instant;
399    static CACHE: OnceLock<RwLock<Option<std::collections::HashMap<String, u64>>>> =
400        OnceLock::new();
401    // Rate-limit the load retry on chronic failure. Without a
402    // floor, a caller that resolves N symbols under a permanently
403    // unreadable /proc/kallsyms triggers N * (read + parse) of a
404    // ~10 MB file. The floor turns that into one retry per window
405    // and returns `None` fast in between.
406    static LAST_LOAD_ATTEMPT: OnceLock<RwLock<Option<Instant>>> = OnceLock::new();
407
408    let slot = CACHE.get_or_init(|| RwLock::new(None));
409    // Fast path: take the read lock when the cache is populated.
410    // Post-load the lookup is read-only and batches resolving many
411    // symbols contend only on the shared read lock.
412    {
413        let read = slot.read_unpoisoned();
414        if let Some(map) = read.as_ref() {
415            return map.get(name).copied();
416        }
417    }
418    // Optional fast-decline: when the retry clock rules the caller
419    // out, avoid the write-lock acquire entirely. This is a
420    // performance hint only — correctness is enforced by the
421    // re-check below under the write lock, so a concurrent racer
422    // that slips past this gate still gets serialized.
423    let last_slot = LAST_LOAD_ATTEMPT.get_or_init(|| RwLock::new(None));
424    {
425        let last = last_slot.read_unpoisoned();
426        if let Some(t) = *last
427            && t.elapsed() < RETRY_MIN_INTERVAL
428        {
429            return None;
430        }
431    }
432    // Slow path: escalate to write lock to populate. Re-check both
433    // the cache and the retry clock under the write lock so N
434    // concurrent first-callers don't stampede into N serialized
435    // loads: only the winner gets past the timestamp gate, everyone
436    // else observes `*last = Some(now)` and bails.
437    let mut write = slot.write_unpoisoned();
438    if write.is_none() {
439        let mut last = last_slot.write_unpoisoned();
440        if last.is_none_or(|t| t.elapsed() >= RETRY_MIN_INTERVAL) {
441            *write = load_kallsyms();
442            *last = Some(Instant::now());
443        }
444    }
445    write.as_ref()?.get(name).copied()
446}
447
448/// Minimum interval between retry attempts when `/proc/kallsyms` is
449/// unreadable; see [`resolve_func_ip`] for the rationale.
450const RETRY_MIN_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
451
452/// Build a `func_idx -> task_struct_param_idx` map for stitching by
453/// task pointer. Resolves each function's task-struct argument
454/// position from [`super::stack::BPF_OP_CALLERS`] first, falling back
455/// to the BTF param list (Phase A `btf_funcs` chained with Phase B
456/// `phase_b_btf`).
457///
458/// Entries with `pidx >= 6` are dropped with a warn rather than
459/// stored: the stitch code reads `ProbeEvent::args[pidx]` against a
460/// fixed-size `[u64; 6]` (matching the BPF-side capture limit), so
461/// any larger index would panic. A function with its task_struct
462/// past arg-6 simply cannot be stitched here — the BPF probe never
463/// captured that arg.
464fn build_task_param_idx(
465    func_ips: &[(u32, u64, String)],
466    btf_funcs: &[BtfFunc],
467    phase_b_btf: &[BtfFunc],
468) -> std::collections::HashMap<u32, usize> {
469    func_ips
470        .iter()
471        .filter_map(|(idx, _, name)| {
472            // BPF_OP_CALLERS: (op_fragment, kernel_caller, task_arg_idx)
473            let pidx = if let Some((_, _, tidx)) = super::stack::BPF_OP_CALLERS
474                .iter()
475                .find(|(_, caller, _)| *caller == name.as_str())
476            {
477                *tidx as usize
478            } else {
479                // Fallback: BTF params with task_struct
480                let btf = btf_funcs
481                    .iter()
482                    .chain(phase_b_btf.iter())
483                    .find(|f| f.name == *name)?;
484                btf.params
485                    .iter()
486                    .position(|p| p.struct_name.as_deref() == Some("task_struct"))?
487            };
488            if pidx >= 6 {
489                tracing::warn!(
490                    func = %name,
491                    pidx,
492                    "task_struct param index out of args[6] bounds — \
493                     skipping stitch entry",
494                );
495                return None;
496            }
497            Some((*idx, pidx))
498        })
499        .collect()
500}
501
502/// Populate a `func_meta` with field specs from BTF-resolved offsets.
503/// Shared between kprobe and fentry paths.
504///
505/// Invariant: `meta.nr_field_specs = max(field_idx) + 1`, NOT the count
506/// of specs. The BPF side writes `entry.fields[field_idx]` positionally,
507/// and the Rust side reads `entry.fields[..nr_field_specs]` positionally
508/// against [`build_field_keys`] (which includes skipped fields). A
509/// reorder of either loop that turns this into a count would silently
510/// mismatch keys to values.
511fn populate_field_specs(meta: &mut types::func_meta, field_specs: &[super::btf::FieldSpec]) {
512    let n = field_specs.len().min(16);
513    let max_fidx = field_specs
514        .iter()
515        .take(n)
516        .map(|fs| fs.field_idx)
517        .max()
518        .map(|m| m + 1)
519        .unwrap_or(0);
520    meta.nr_field_specs = max_fidx.min(16);
521    for fs in field_specs.iter().take(n) {
522        let slot = fs.field_idx as usize;
523        if slot < 16 {
524            meta.specs[slot] = types::field_spec {
525                param_idx: fs.param_idx,
526                offset: fs.offset,
527                size: fs.size,
528                field_idx: fs.field_idx,
529                ptr_offset: fs.ptr_offset,
530            };
531        }
532    }
533}
534
535/// Build field key names for a function based on its BTF info.
536///
537/// Returns a vec mapping `field_idx` to an output key name. Format:
538/// - Known struct param: `"p:task_struct.pid"`
539/// - Auto-discovered BPF struct: `"ctx:task_ctx.field_a"`
540/// - Scalar param: `"flags:val.flags"`
541///
542/// Processes at most 6 params (fentry/kprobe register limit) and
543/// at most 16 fields total (matching `MAX_FIELDS` in intf.h).
544///
545/// Invariant: keys are emitted in the same order [`populate_field_specs`]
546/// consumes `field_specs`, so `entry.fields[i]` maps to the i-th key.
547fn build_field_keys(btf_func: &BtfFunc) -> Vec<(String, RenderHint)> {
548    let mut keys = Vec::new();
549    let mut field_idx: u32 = 0;
550
551    let max_params = btf_func.params.len().min(6);
552    for param in &btf_func.params[..max_params] {
553        if let Some(ref sname) = param.struct_name {
554            if let Some((_, fields)) = STRUCT_FIELDS.iter().find(|(s, _)| *s == sname) {
555                for (_, key) in *fields {
556                    // Known struct fields use dedicated decoders in
557                    // decode.rs — hint is irrelevant (Default/Hex).
558                    keys.push((format!("{}:{}.{}", param.name, sname, key), RenderHint::Hex));
559                    field_idx += 1;
560                    if field_idx >= 16 {
561                        break;
562                    }
563                }
564            }
565        } else if !param.auto_fields.is_empty() {
566            let tname = param.type_name.as_deref().unwrap_or("void");
567            for (fname, _, hint) in &param.auto_fields {
568                keys.push((format!("{}:{}.{}", param.name, tname, fname), *hint));
569                field_idx += 1;
570                if field_idx >= 16 {
571                    break;
572                }
573            }
574        } else if !param.is_ptr {
575            keys.push((
576                format!("{}:val.{}", param.name, param.name),
577                RenderHint::Hex,
578            ));
579            field_idx += 1;
580        }
581    }
582
583    keys
584}
585
586/// Detect which param (if any) is a char * string.
587/// Uses BTF type detection first, then name heuristic as fallback.
588/// Returns 0xff if none found.
589fn detect_str_param(btf_func: &BtfFunc) -> u8 {
590    let max = btf_func.params.len().min(6);
591    // BTF-based: check is_string_ptr flag set by parse_btf_functions.
592    for (i, p) in btf_func.params[..max].iter().enumerate() {
593        if p.is_string_ptr {
594            return i as u8;
595        }
596    }
597    // Name heuristic fallback.
598    for (i, p) in btf_func.params[..max].iter().enumerate() {
599        if !p.is_ptr || p.struct_name.is_some() {
600            continue;
601        }
602        let n = p.name.as_str();
603        if matches!(n, "fmt" | "msg" | "str" | "reason" | "buf" | "s")
604            || n.contains("str")
605            || n.contains("msg")
606            || n.contains("fmt")
607        {
608            return i as u8;
609        }
610    }
611    0xff
612}
613
614/// Pre-open BPF program FDs while the scheduler is alive.
615///
616/// Returns a map from `bpf_prog_id` to owned fd. Holding these FDs
617/// keeps the BPF programs alive via kernel refcounting even after the
618/// scheduler exits. Must be called before the test function runs
619/// (which may crash the scheduler).
620pub fn open_bpf_prog_fds(functions: &[StackFunction]) -> std::collections::HashMap<u32, i32> {
621    let mut fds = std::collections::HashMap::new();
622    for f in functions {
623        if let Some(prog_id) = f.bpf_prog_id {
624            let fd = unsafe { libbpf_rs::libbpf_sys::bpf_prog_get_fd_by_id(prog_id) };
625            if fd >= 0 {
626                fds.insert(prog_id, fd);
627            }
628        }
629    }
630    fds
631}
632
633/// `&ProgramMut<'_>` newtype that asserts thread-shared access is
634/// safe for [`libbpf_rs::ProgramMut::attach_kprobe`].
635///
636/// `ProgramMut` holds a `NonNull<bpf_program>` which keeps it
637/// `!Sync` at the Rust type-system level. Concurrent attach is
638/// nonetheless sound:
639/// - `bpf_program__attach_kprobe_opts` (the libbpf C path
640///   `attach_kprobe` calls) takes `const struct bpf_program *prog` —
641///   it does not mutate the program through that pointer.
642/// - Each attach call creates an independent `perf_event_open` fd
643///   plus an independent `BPF_LINK_CREATE` syscall; those resources
644///   are not shared with the program object.
645/// - The only program-adjacent state the path touches is the global
646///   feature-detection cache via `kernel_supports` →
647///   `feat_supported`, which uses `READ_ONCE` / `WRITE_ONCE` and is
648///   idempotent across racing readers (see
649///   `libbpf-sys/libbpf/src/features.c::feat_supported`).
650///   All threads call only `attach_kprobe` on the inner reference; no
651///   mutating method is invoked here.
652///
653/// `Send` is paired with `Sync` because `std::thread::scope` requires
654/// the captured reference's referent to be `Sync` for `&_` to be
655/// `Send`.
656struct AttachableProgRef<'a, 'obj> {
657    inner: &'a libbpf_rs::ProgramMut<'obj>,
658}
659// SAFETY: see the type-level doc.
660unsafe impl Send for AttachableProgRef<'_, '_> {}
661// SAFETY: see the type-level doc.
662unsafe impl Sync for AttachableProgRef<'_, '_> {}
663
664/// Attach `prog` as a kprobe to every name in `func_names`,
665/// parallelising across worker threads.
666///
667/// Each attach is an independent `perf_event_open` + `BPF_LINK_CREATE`
668/// syscall pair. Sequentialised, the per-attach syscall round-trip
669/// dominates the auto-repro setup phase when the crash backtrace
670/// names many kernel functions. Spawning a small worker pool lets
671/// the kernel run the attaches concurrently and shrinks the total
672/// wall-clock cost of the loop to roughly `total / parallelism`
673/// (bounded by kernel-side serialisation inside `perf_event_open` for
674/// kprobe registration).
675///
676/// Worker count is `min(func_names.len(), 8)` — a fixed cap that
677/// matches the typical small ktstr backtrace width while avoiding
678/// thread-spawn overhead when there are only a handful of probes.
679/// Returns one `(name, Result<Link, libbpf_rs::Error>)` entry per
680/// input in input order, so the caller can populate
681/// [`ProbeDiagnostics::kprobe_attach_failed`] / `links` with the
682/// same shape as the prior sequential loop.
683fn parallel_attach_kprobes<'a, 'obj>(
684    prog: &'a libbpf_rs::ProgramMut<'obj>,
685    func_names: &[String],
686) -> Vec<(String, libbpf_rs::Result<libbpf_rs::Link>)> {
687    if func_names.is_empty() {
688        return Vec::new();
689    }
690
691    let prog_ref = AttachableProgRef { inner: prog };
692    // `min(N, 8)` balances syscall parallelism against thread-spawn
693    // overhead. Empirically the kernel kprobe registration path
694    // serialises on `kprobe_mutex` so going wider yields diminishing
695    // returns; the cap keeps us comfortably under that wall.
696    const MAX_WORKERS: usize = 8;
697    let workers = func_names.len().min(MAX_WORKERS);
698
699    // Slot the function names into round-robin per-worker buckets
700    // so each thread owns a disjoint subset. Index-tagged so the
701    // output preserves input order regardless of which worker
702    // finishes first.
703    let mut buckets: Vec<Vec<(usize, String)>> = (0..workers).map(|_| Vec::new()).collect();
704    for (i, name) in func_names.iter().enumerate() {
705        buckets[i % workers].push((i, name.clone()));
706    }
707
708    let mut results: Vec<Option<(String, libbpf_rs::Result<libbpf_rs::Link>)>> =
709        (0..func_names.len()).map(|_| None).collect();
710
711    std::thread::scope(|s| {
712        let mut handles = Vec::with_capacity(workers);
713        for bucket in buckets {
714            if bucket.is_empty() {
715                continue;
716            }
717            let prog_ref = &prog_ref;
718            handles.push(s.spawn(move || {
719                let mut out: Vec<(usize, String, libbpf_rs::Result<libbpf_rs::Link>)> =
720                    Vec::with_capacity(bucket.len());
721                for (i, name) in bucket {
722                    let r = prog_ref.inner.attach_kprobe(false, &name);
723                    out.push((i, name, r));
724                }
725                out
726            }));
727        }
728        for h in handles {
729            // Worker panics propagate as the join Err — re-panic on
730            // the main thread to surface bugs rather than silently
731            // dropping attach results.
732            let out = h.join().expect("kprobe attach worker panicked");
733            for (i, name, r) in out {
734                results[i] = Some((name, r));
735            }
736        }
737    });
738
739    results
740        .into_iter()
741        .map(|o| o.expect("every input slot must be filled by exactly one worker"))
742        .collect()
743}
744
745/// Attach the fentry program in slot 0..=3 on the fentry skeleton.
746///
747/// The fentry skeleton exposes four indexed programs
748/// (`ktstr_fentry_0`..`ktstr_fentry_3`) matching the 4-slot batch
749/// model in `src/bpf/fentry_probe.bpf.c`. Call sites previously spelled the
750/// full 4-arm `match slot { ... }` inline; routing through this
751/// family of helpers keeps the dispatch in one place so a future
752/// slot addition is a one-line change per helper instead of
753/// scattered across every batch.
754///
755/// Returns `None` for slot indices outside 0..=3, matching the
756/// existing `continue;` behaviour at call sites.
757fn attach_fentry_by_slot(
758    skel: &crate::bpf_skel::fentry::FentryProbeSkel<'_>,
759    slot: usize,
760) -> Option<libbpf_rs::Result<libbpf_rs::Link>> {
761    Some(match slot {
762        0 => skel.progs.ktstr_fentry_0.attach_trace(),
763        1 => skel.progs.ktstr_fentry_1.attach_trace(),
764        2 => skel.progs.ktstr_fentry_2.attach_trace(),
765        3 => skel.progs.ktstr_fentry_3.attach_trace(),
766        _ => return None,
767    })
768}
769
770/// Attach the fexit program in slot 0..=3 on the fentry skeleton.
771/// Sibling of [`attach_fentry_by_slot`]; see its doc for the routing
772/// rationale.
773fn attach_fexit_by_slot(
774    skel: &crate::bpf_skel::fentry::FentryProbeSkel<'_>,
775    slot: usize,
776) -> Option<libbpf_rs::Result<libbpf_rs::Link>> {
777    Some(match slot {
778        0 => skel.progs.ktstr_fexit_0.attach_trace(),
779        1 => skel.progs.ktstr_fexit_1.attach_trace(),
780        2 => skel.progs.ktstr_fexit_2.attach_trace(),
781        3 => skel.progs.ktstr_fexit_3.attach_trace(),
782        _ => return None,
783    })
784}
785
786/// Borrow the open fentry program in slot 0..=3 for pre-load
787/// configuration (`set_attach_target`, `set_autoload`).
788///
789/// Pre-load sibling of [`attach_fentry_by_slot`]: operates on
790/// [`OpenFentryProbeSkel`] before [`OpenSkel::load`] consumes it.
791/// Returns `None` for slot indices outside 0..=3, matching the
792/// existing `continue;` behaviour at call sites.
793///
794/// [`OpenFentryProbeSkel`]: crate::bpf_skel::fentry::OpenFentryProbeSkel
795/// [`OpenSkel::load`]: libbpf_rs::skel::OpenSkel::load
796fn fentry_prog_mut_by_slot<'a, 'obj>(
797    open_skel: &'a mut crate::bpf_skel::fentry::OpenFentryProbeSkel<'obj>,
798    slot: usize,
799) -> Option<&'a mut libbpf_rs::OpenProgramMut<'obj>> {
800    Some(match slot {
801        0 => &mut open_skel.progs.ktstr_fentry_0,
802        1 => &mut open_skel.progs.ktstr_fentry_1,
803        2 => &mut open_skel.progs.ktstr_fentry_2,
804        3 => &mut open_skel.progs.ktstr_fentry_3,
805        _ => return None,
806    })
807}
808
809/// Borrow the open fexit program in slot 0..=3 for pre-load
810/// configuration. Pre-load sibling of [`attach_fexit_by_slot`];
811/// see [`fentry_prog_mut_by_slot`] for the routing rationale.
812fn fexit_prog_mut_by_slot<'a, 'obj>(
813    open_skel: &'a mut crate::bpf_skel::fentry::OpenFentryProbeSkel<'obj>,
814    slot: usize,
815) -> Option<&'a mut libbpf_rs::OpenProgramMut<'obj>> {
816    Some(match slot {
817        0 => &mut open_skel.progs.ktstr_fexit_0,
818        1 => &mut open_skel.progs.ktstr_fexit_1,
819        2 => &mut open_skel.progs.ktstr_fexit_2,
820        3 => &mut open_skel.progs.ktstr_fexit_3,
821        _ => return None,
822    })
823}
824
825/// Disable autoload on both the fentry and fexit programs for
826/// slot 0..=3 so the verifier skips them at
827/// [`OpenSkel::load`][libbpf_rs::skel::OpenSkel::load]. Used for
828/// unused batch slots and slots whose
829/// [`set_attach_target`][libbpf_rs::OpenProgramMut::set_attach_target]
830/// call failed — either leaves the skeleton with placeholder
831/// targets the verifier would reject.
832///
833/// No-op for slot indices outside 0..=3.
834fn disable_slot_programs(
835    open_skel: &mut crate::bpf_skel::fentry::OpenFentryProbeSkel<'_>,
836    slot: usize,
837) {
838    if let Some(p) = fentry_prog_mut_by_slot(open_skel, slot) {
839        p.set_autoload(false);
840    }
841    if let Some(p) = fexit_prog_mut_by_slot(open_skel, slot) {
842        p.set_autoload(false);
843    }
844}
845
846/// Write the per-slot rodata fields (`ktstr_fentry_func_idx_N`,
847/// `ktstr_fentry_is_kernel_N`) for slot 0..=3. Mirrors the BPF
848/// side's positional `rodata` layout in `src/bpf/fentry_probe.bpf.c`.
849///
850/// No-op for slot indices outside 0..=3.
851fn set_rodata_slot(
852    rodata: &mut crate::bpf_skel::fentry::types::rodata,
853    slot: usize,
854    idx: u32,
855    is_kernel: bool,
856) {
857    let k = is_kernel as u8;
858    match slot {
859        0 => {
860            rodata.ktstr_fentry_func_idx_0 = idx;
861            rodata.ktstr_fentry_is_kernel_0 = k;
862        }
863        1 => {
864            rodata.ktstr_fentry_func_idx_1 = idx;
865            rodata.ktstr_fentry_is_kernel_1 = k;
866        }
867        2 => {
868            rodata.ktstr_fentry_func_idx_2 = idx;
869            rodata.ktstr_fentry_is_kernel_2 = k;
870        }
871        3 => {
872            rodata.ktstr_fentry_func_idx_3 = idx;
873            rodata.ktstr_fentry_is_kernel_3 = k;
874        }
875        _ => {}
876    }
877}
878
879/// Causal-task filter for the trigger event's `task_ptr` (sourced
880/// from `args[0]`).
881///
882/// The BPF `ktstr_trigger_tp` handler sets `args[0]` to
883/// `bpf_get_current_task()` only for `SCX_EXIT_ERROR_BPF` (where
884/// `current` IS the causal task); for `SCX_EXIT_ERROR` (kworker /
885/// sysrq context) it emits `args[0] == 0`. A zero `task_ptr` means
886/// "no causal task" and must suppress probe stitching, so this
887/// returns `None` for 0 and `Some(p)` otherwise. `run_probe_skeleton`
888/// calls this on the last trigger event; the `args0_*` tests pin both
889/// sides of the contract against this exact predicate.
890fn causal_tptr(task_ptr: u64) -> Option<u64> {
891    Some(task_ptr).filter(|&p| p != 0)
892}
893
894/// Run the BPF probe skeleton for auto-repro.
895///
896/// Operates in two modes depending on `phase_b_rx`:
897///
898/// **Single-phase (`phase_b_rx = None`):** loads the kprobe skeleton
899/// and fentry/fexit skeleton together, attaches all probes, then
900/// polls until the trigger fires.
901///
902/// **Two-phase (`phase_b_rx = Some`):** Phase A attaches kprobes +
903/// kernel fexit + the `tp_btf/sched_ext_exit` trigger before the
904/// scheduler starts, signals `ready`, then polls the ring buffer
905/// while waiting for Phase B input via the channel. When Phase B
906/// input arrives, attaches fentry/fexit to BPF struct_ops callbacks
907/// and additional kprobes for kernel callers. Signals `done` on the
908/// `PhaseBInput` when Phase B attachment completes. If the trigger
909/// fires before Phase B input arrives, fentry is skipped — the
910/// crash happened before BPF programs could be probed.
911///
912/// The trigger fires on `sched_ext_exit` inside `scx_claim_exit()`
913/// — exactly once per scheduler lifetime. If the tracepoint is
914/// unavailable, auto-repro is skipped.
915///
916/// Returns accumulated func_names from both phases as the third
917/// tuple element.
918pub fn run_probe_skeleton(
919    functions: &[StackFunction],
920    btf_funcs: &[BtfFunc],
921    stop: &AtomicBool,
922    bpf_prog_fds: &std::collections::HashMap<u32, i32>,
923    ready: &Latch,
924    phase_b_rx: Option<std::sync::mpsc::Receiver<PhaseBInput>>,
925) -> (
926    Option<Vec<ProbeEvent>>,
927    ProbeDiagnostics,
928    Vec<(u32, String)>,
929) {
930    use crate::bpf_skel::*;
931    use libbpf_rs::skel::{OpenSkel, SkelBuilder};
932    use libbpf_rs::{Link, MapCore, MapFlags, RingBufferBuilder};
933
934    tracing::debug!(n = functions.len(), "run_probe_skeleton");
935
936    let mut diag = ProbeDiagnostics::default();
937
938    // Open skeleton. Two MaybeUninit slots: the first backs the
939    // initial load attempt; the second backs the fallback retry when
940    // optional programs cause ESRCH. Both must outlive `skel`.
941    let mut open_object = std::mem::MaybeUninit::uninit();
942    let mut open_object_fallback = std::mem::MaybeUninit::uninit();
943    let builder = ProbeSkelBuilder::default();
944    let mut open_skel = match builder.open(&mut open_object) {
945        Ok(s) => s,
946        Err(e) => {
947            tracing::error!(%e, "probe skeleton open failed");
948            diag.trigger_attach_error = Some(format!("skeleton open: {e}"));
949            ready.set();
950            return (None, diag, Vec::new());
951        }
952    };
953
954    // Enable probes (must set before load — rodata is immutable after)
955    if let Some(rodata) = open_skel.maps.rodata_data.as_mut() {
956        rodata.ktstr_enabled = true;
957    }
958
959    // Load skeleton. Try with all programs first; if a missing tp_btf
960    // target causes ESRCH, re-open with optional programs disabled.
961    // The fallback unconditionally fires on any load error, not
962    // strictly ESRCH — libbpf doesn't surface a stable errno through
963    // its Error type, so an exact ESRCH match is brittle. The retry
964    // is cheap (re-open + load with autoload disabled on the optional
965    // set), so the broader gate is acceptable. When the retry ALSO
966    // fails, we surface BOTH errors so the operator sees the original
967    // root cause alongside the retry failure — a verifier rejection
968    // on a non-optional program would otherwise be masked by the
969    // retry's unrelated error.
970    let (skel, optional_programs_loaded) = match open_skel.load() {
971        Ok(s) => (s, true),
972        Err(first_err) => {
973            tracing::debug!(
974                %first_err,
975                "probe skeleton load failed with all programs; \
976                 retrying with optional programs disabled"
977            );
978            let builder2 = ProbeSkelBuilder::default();
979            let mut open_skel2 = match builder2.open(&mut open_object_fallback) {
980                Ok(s) => s,
981                Err(e) => {
982                    tracing::error!(%e, "probe skeleton re-open failed");
983                    diag.trigger_attach_error = Some(format!(
984                        "skeleton open (retry): {e}; original load error: {first_err}"
985                    ));
986                    ready.set();
987                    return (None, diag, Vec::new());
988                }
989            };
990            if let Some(rodata) = open_skel2.maps.rodata_data.as_mut() {
991                rodata.ktstr_enabled = true;
992            }
993            open_skel2.progs.ktstr_pi_fentry.set_autoload(false);
994            open_skel2.progs.ktstr_pi_fexit.set_autoload(false);
995            open_skel2.progs.ktstr_lock_contend.set_autoload(false);
996            open_skel2
997                .progs
998                .ktstr_preempt_disable_tp
999                .set_autoload(false);
1000            open_skel2.progs.ktstr_preempt_enable_tp.set_autoload(false);
1001            match open_skel2.load() {
1002                Ok(s) => (s, false),
1003                Err(e) => {
1004                    tracing::error!(
1005                        %e, %first_err,
1006                        "probe skeleton load failed (retry); \
1007                         surfacing both original and retry errors"
1008                    );
1009                    diag.trigger_attach_error = Some(format!(
1010                        "skeleton load (retry): {e}; original error before retry: {first_err}"
1011                    ));
1012                    ready.set();
1013                    return (None, diag, Vec::new());
1014                }
1015            }
1016        }
1017    };
1018
1019    // Populate func_meta_map with function IPs and metadata
1020    let mut func_ips: Vec<(u32, u64, String)> = Vec::new(); // (idx, ip, display_name)
1021    let mut bpf_funcs: Vec<(u32, &StackFunction)> = Vec::new(); // BPF functions for fentry
1022
1023    // Load vmlinux BTF once and reuse across every kprobe meta
1024    // population in the loop below. The previous code called
1025    // `resolve_field_specs(_, None)` per function, which re-parsed
1026    // the multi-MB vmlinux BTF on every iteration (>1 s per kernel
1027    // with thousands of kprobed functions). Loading once turns the
1028    // hot path into pure type lookups against a borrowed handle.
1029    // `None` (load failure) leaves `cached_btf` empty and downstream
1030    // call sites fall back to the no-BTF path — same behaviour as
1031    // the previous per-call `Err(...) -> Vec::new()` branch.
1032    //
1033    // `cached_vmlinux_btf` memoises the first successful parse
1034    // process-wide so repeated auto-repro cycles in the same nextest
1035    // process share one `Arc<Btf>` instead of re-reading and
1036    // re-parsing the multi-MB blob each time.
1037    let cached_btf = crate::monitor::btf_offsets::cached_vmlinux_btf();
1038
1039    for (idx, func) in functions.iter().enumerate() {
1040        if func.is_bpf {
1041            bpf_funcs.push((idx as u32, func));
1042            continue;
1043        }
1044        let ip = match resolve_func_ip(&func.raw_name) {
1045            Some(ip) => ip,
1046            None => {
1047                tracing::warn!(func = %func.raw_name, "could not resolve function IP");
1048                diag.kprobe_resolve_failed.push(func.raw_name.clone());
1049                continue;
1050            }
1051        };
1052
1053        let mut meta = types::func_meta {
1054            func_idx: idx as u32,
1055            ..Default::default()
1056        };
1057
1058        // Populate field specs from BTF-resolved offsets.
1059        if let Some(btf_func) = btf_funcs.iter().find(|f| f.name == func.raw_name) {
1060            let field_specs = match cached_btf.as_ref() {
1061                Some(btf) => super::btf::resolve_field_specs_with_btf(btf_func, btf),
1062                None => Vec::new(),
1063            };
1064            populate_field_specs(&mut meta, &field_specs);
1065            // Detect char * params for string capture.
1066            meta.str_param_idx = detect_str_param(btf_func);
1067        }
1068
1069        let key_bytes = ip.to_ne_bytes();
1070        let meta_bytes = unsafe {
1071            std::slice::from_raw_parts(
1072                &meta as *const _ as *const u8,
1073                std::mem::size_of::<types::func_meta>(),
1074            )
1075        };
1076
1077        if let Err(e) = skel
1078            .maps
1079            .func_meta_map
1080            .update(&key_bytes, meta_bytes, MapFlags::ANY)
1081        {
1082            tracing::warn!(%e, func = %func.raw_name, "failed to update func_meta_map");
1083            continue;
1084        }
1085
1086        tracing::debug!(func = %func.raw_name, ip, nr = meta.nr_field_specs, "kprobe meta");
1087        diag.kprobe_resolved += 1;
1088        func_ips.push((idx as u32, ip, func.display_name.clone()));
1089    }
1090
1091    if func_ips.is_empty() && bpf_funcs.is_empty() && phase_b_rx.is_none() {
1092        tracing::warn!("no kprobe IPs resolved and no BPF functions for fentry");
1093        diag.trigger_attach_error =
1094            Some("no functions resolved — kprobes and trigger skipped".to_string());
1095        ready.set();
1096        return (None, diag, Vec::new());
1097    }
1098    if func_ips.is_empty() && (phase_b_rx.is_some() || !bpf_funcs.is_empty()) {
1099        tracing::debug!("no kernel functions resolved to IPs, proceeding with fentry only");
1100    }
1101
1102    // Attach kprobes to each function for entry capture. Exit capture
1103    // for kernel functions uses fexit via the fentry skeleton (batched
1104    // separately below with fd=0 for vmlinux BTF).
1105    //
1106    // Parallelised via [`parallel_attach_kprobes`]: each attach is an
1107    // independent `perf_event_open` + `BPF_LINK_CREATE` syscall pair,
1108    // and the sequential loop's round-trip cost dominated the auto-
1109    // repro Phase A setup time when the crash backtrace named many
1110    // functions. Worker pool runs them concurrently; results land
1111    // back in the original input order so the post-attach
1112    // `kprobe_attach_failed` / `links` shape matches the prior loop.
1113    let mut links: Vec<(Link, String)> = Vec::new();
1114    let attach_input: Vec<String> = func_ips
1115        .iter()
1116        .map(|(idx, _, _)| functions[*idx as usize].raw_name.clone())
1117        .collect();
1118    for (raw, result) in parallel_attach_kprobes(&skel.progs.ktstr_probe, &attach_input) {
1119        match result {
1120            Ok(link) => {
1121                links.push((link, raw));
1122            }
1123            Err(e) => {
1124                tracing::warn!(%e, func = %raw, "kprobe attach failed");
1125                diag.kprobe_attach_failed.push((raw, e.to_string()));
1126            }
1127        }
1128    }
1129    diag.kprobe_attached = links.len() as u32;
1130    tracing::debug!(attached = links.len(), total = func_ips.len(), "kprobes");
1131
1132    // Attach fentry+fexit for BPF callbacks and kernel functions.
1133    // Batched in groups of FENTRY_BATCH per skeleton load to reduce
1134    // verifier passes. BPF callbacks use prog FD + sentinel IP.
1135    // Kernel functions use fd=0 (vmlinux BTF) + real IP.
1136    const FENTRY_BATCH: usize = 4;
1137    let mut fentry_links: Vec<Link> = Vec::new();
1138    let mut fexit_links: Vec<Link> = Vec::new();
1139
1140    struct FentryTarget<'a> {
1141        slot: usize,
1142        fd: i32,
1143        idx: u32,
1144        name: &'a str,
1145        ok: bool,
1146        is_kernel: bool,
1147    }
1148
1149    // Build combined list of targets: BPF callbacks + kernel functions.
1150    let valid_bpf: Vec<_> = bpf_funcs
1151        .iter()
1152        .filter(|(_, f)| f.bpf_prog_id.is_some())
1153        .collect();
1154    diag.fentry_candidates = valid_bpf.len() as u32;
1155
1156    // Kernel functions that were attached via kprobe also get fentry+fexit
1157    // for exit capture. fd=0 targets vmlinux BTF.
1158    struct KernelFentryTarget {
1159        idx: u32,
1160        name: String,
1161    }
1162    let kernel_fexit_targets: Vec<KernelFentryTarget> = func_ips
1163        .iter()
1164        .map(|(idx, _, name)| KernelFentryTarget {
1165            idx: *idx,
1166            name: name.clone(),
1167        })
1168        .collect();
1169
1170    // Interleave: process BPF targets first, then kernel targets.
1171    // Each gets batched into the fentry skeleton in groups of 4.
1172    //
1173    // When phase_b_rx is Some (Phase A/B split), BPF callback fentry
1174    // attachment is deferred to Phase B after the scheduler starts.
1175    // Only kernel fexit (fd=0) and kprobes run in Phase A.
1176
1177    // --- BPF callback batches (skip in Phase A when split is active) ---
1178    if phase_b_rx.is_none() {
1179        for chunk in valid_bpf.chunks(FENTRY_BATCH) {
1180            let mut targets: Vec<FentryTarget<'_>> = Vec::new();
1181            for (slot, (idx, func)) in chunk.iter().enumerate() {
1182                let prog_id = func.bpf_prog_id.unwrap();
1183                let fd = if let Some(&pre_fd) = bpf_prog_fds.get(&prog_id) {
1184                    let dup_fd = unsafe { libc::dup(pre_fd) };
1185                    if dup_fd < 0 {
1186                        tracing::warn!(prog_id, func = %func.display_name, "fentry: dup failed");
1187                        diag.fentry_attach_failed.push((
1188                            func.display_name.clone(),
1189                            format!("dup(pre_fd={pre_fd}) failed"),
1190                        ));
1191                        continue;
1192                    }
1193                    dup_fd
1194                } else {
1195                    let fd = unsafe { libbpf_rs::libbpf_sys::bpf_prog_get_fd_by_id(prog_id) };
1196                    if fd < 0 {
1197                        tracing::warn!(prog_id, func = %func.display_name, "fentry: failed to get fd");
1198                        diag.fentry_attach_failed.push((
1199                            func.display_name.clone(),
1200                            format!("bpf_prog_get_fd_by_id({prog_id}) returned {fd}"),
1201                        ));
1202                        continue;
1203                    }
1204                    fd
1205                };
1206                targets.push(FentryTarget {
1207                    slot,
1208                    fd,
1209                    idx: *idx,
1210                    name: &func.display_name,
1211                    ok: false,
1212                    is_kernel: false,
1213                });
1214            }
1215            if targets.is_empty() {
1216                continue;
1217            }
1218
1219            use crate::bpf_skel::fentry::*;
1220            let mut fentry_open_obj = std::mem::MaybeUninit::uninit();
1221            let fentry_builder = FentryProbeSkelBuilder::default();
1222            let mut fentry_open = match fentry_builder.open(&mut fentry_open_obj) {
1223                Ok(s) => s,
1224                Err(e) => {
1225                    tracing::warn!(%e, "fentry skeleton open failed");
1226                    for t in &targets {
1227                        unsafe { libc::close(t.fd) };
1228                    }
1229                    continue;
1230                }
1231            };
1232
1233            // Set rodata: func_idx and is_kernel per slot.
1234            if let Some(rodata) = fentry_open.maps.rodata_data.as_mut() {
1235                rodata.ktstr_enabled = true;
1236                for t in &targets {
1237                    set_rodata_slot(rodata, t.slot, t.idx, t.is_kernel);
1238                }
1239            }
1240
1241            for t in targets.iter_mut() {
1242                // Set fentry attach target.
1243                let Some(fentry_prog) = fentry_prog_mut_by_slot(&mut fentry_open, t.slot) else {
1244                    continue;
1245                };
1246                match fentry_prog.set_attach_target(t.fd, Some(t.name.to_string())) {
1247                    Ok(()) => {
1248                        t.ok = true;
1249                        tracing::debug!(
1250                            slot = t.slot,
1251                            func = t.name,
1252                            "fentry: set_attach_target ok"
1253                        );
1254                    }
1255                    Err(e) => {
1256                        tracing::warn!(slot = t.slot, func = t.name, %e, "fentry: set_attach_target failed");
1257                        diag.fentry_attach_failed
1258                            .push((t.name.to_string(), format!("set_attach_target: {e}")));
1259                        continue;
1260                    }
1261                }
1262                // Set fexit attach target on the same function.
1263                let Some(fexit_prog) = fexit_prog_mut_by_slot(&mut fentry_open, t.slot) else {
1264                    continue;
1265                };
1266                if let Err(e) = fexit_prog.set_attach_target(t.fd, Some(t.name.to_string())) {
1267                    tracing::debug!(slot = t.slot, func = t.name, %e, "fexit: set_attach_target failed (entry-only)");
1268                    // Disable autoload so the verifier doesn't reject the
1269                    // skeleton due to a stale placeholder target.
1270                    fexit_prog.set_autoload(false);
1271                }
1272            }
1273
1274            if !targets.iter().any(|t| t.ok) {
1275                for t in &targets {
1276                    unsafe { libc::close(t.fd) };
1277                }
1278                continue;
1279            }
1280
1281            // Disable autoload on unused or failed fentry/fexit slots so the
1282            // verifier doesn't reject the placeholder target.
1283            let used_slots: std::collections::HashSet<usize> =
1284                targets.iter().filter(|t| t.ok).map(|t| t.slot).collect();
1285            for slot in 0..FENTRY_BATCH {
1286                if !used_slots.contains(&slot) {
1287                    disable_slot_programs(&mut fentry_open, slot);
1288                }
1289            }
1290            tracing::debug!(
1291                active = used_slots.len(),
1292                disabled = FENTRY_BATCH - used_slots.len(),
1293                "fentry: loading batch",
1294            );
1295            // Reuse the main skeleton's maps so fentry events land in the
1296            // same probe_data map that the Rust side reads.
1297            use std::os::unix::io::AsFd;
1298            if let Err(e) = fentry_open
1299                .maps
1300                .probe_data
1301                .reuse_fd(skel.maps.probe_data.as_fd())
1302            {
1303                tracing::warn!(%e, "fentry: probe_data reuse_fd failed");
1304            }
1305            if let Err(e) = fentry_open
1306                .maps
1307                .func_meta_map
1308                .reuse_fd(skel.maps.func_meta_map.as_fd())
1309            {
1310                tracing::warn!(%e, "fentry: func_meta_map reuse_fd failed");
1311            }
1312
1313            let fentry_skel = match fentry_open.load() {
1314                Ok(s) => {
1315                    tracing::debug!("fentry: batch load success");
1316                    for t in &targets {
1317                        unsafe { libc::close(t.fd) };
1318                    }
1319                    s
1320                }
1321                Err(e) => {
1322                    tracing::warn!(%e, "fentry: batch load failed");
1323                    for t in &targets {
1324                        if t.ok {
1325                            diag.fentry_attach_failed
1326                                .push((t.name.to_string(), format!("batch load: {e}")));
1327                        }
1328                        unsafe { libc::close(t.fd) };
1329                    }
1330                    continue;
1331                }
1332            };
1333
1334            // Populate func_meta and attach each slot.
1335            for t in &targets {
1336                if !t.ok {
1337                    continue;
1338                }
1339
1340                let sentinel_ip = (t.idx as u64) | (1u64 << 63);
1341                let mut meta = crate::bpf_skel::types::func_meta {
1342                    func_idx: t.idx,
1343                    ..Default::default()
1344                };
1345
1346                if let Some(btf_func) = btf_funcs.iter().find(|f| f.name == t.name) {
1347                    // Try vmlinux BTF first (for known struct params like
1348                    // task_struct and auto-discovered vmlinux fields),
1349                    // then BPF program BTF (for BPF-local types like task_ctx).
1350                    let mut field_specs = match cached_btf.as_ref() {
1351                        Some(btf) => super::btf::resolve_field_specs_with_btf(btf_func, btf),
1352                        None => Vec::new(),
1353                    };
1354                    if field_specs.is_empty()
1355                        && let Some(prog_id) = functions
1356                            .iter()
1357                            .find(|f| f.display_name == t.name)
1358                            .and_then(|f| f.bpf_prog_id)
1359                    {
1360                        field_specs = super::btf::resolve_bpf_field_specs(btf_func, prog_id);
1361                    }
1362                    populate_field_specs(&mut meta, &field_specs);
1363                    meta.str_param_idx = detect_str_param(btf_func);
1364                }
1365
1366                let Some(result) = attach_fentry_by_slot(&fentry_skel, t.slot) else {
1367                    continue;
1368                };
1369                let link = match result {
1370                    Ok(link) => {
1371                        tracing::debug!(func = t.name, "fentry attached");
1372                        link
1373                    }
1374                    Err(e) => {
1375                        tracing::warn!(%e, func = t.name, "fentry attach failed");
1376                        diag.fentry_attach_failed
1377                            .push((t.name.to_string(), e.to_string()));
1378                        continue;
1379                    }
1380                };
1381
1382                // func_meta_map.update + func_ips.push run AFTER the
1383                // fentry attach succeeds. Reversing the order would
1384                // orphan map entries and func_ip tuples on attach
1385                // failure — downstream reporting ("successfully
1386                // probed N funcs") would then show false-positive
1387                // successes for probes that never fired. If the
1388                // map update fails after the attach succeeded, drop
1389                // the Link (which detaches the program) so the
1390                // post-attach state matches what func_ips reports.
1391                let key_bytes = sentinel_ip.to_ne_bytes();
1392                let meta_bytes = unsafe {
1393                    std::slice::from_raw_parts(
1394                        &meta as *const _ as *const u8,
1395                        std::mem::size_of::<crate::bpf_skel::types::func_meta>(),
1396                    )
1397                };
1398                if let Err(e) =
1399                    skel.maps
1400                        .func_meta_map
1401                        .update(&key_bytes, meta_bytes, MapFlags::ANY)
1402                {
1403                    tracing::warn!(%e, func = t.name, "fentry: failed to update func_meta_map; dropping attached link");
1404                    drop(link);
1405                    continue;
1406                }
1407                fentry_links.push(link);
1408                func_ips.push((t.idx, sentinel_ip, t.name.to_string()));
1409                // Attach fexit for exit-side capture.
1410                let Some(fexit_result) = attach_fexit_by_slot(&fentry_skel, t.slot) else {
1411                    continue;
1412                };
1413                match fexit_result {
1414                    Ok(link) => {
1415                        tracing::debug!(func = t.name, "fexit attached");
1416                        fexit_links.push(link);
1417                    }
1418                    Err(e) => {
1419                        tracing::debug!(%e, func = t.name, "fexit attach failed (entry-only)");
1420                    }
1421                }
1422            }
1423
1424            drop(fentry_skel);
1425        }
1426        diag.fentry_attached = fentry_links.len() as u32;
1427        if !valid_bpf.is_empty() {
1428            tracing::debug!(
1429                fentry = fentry_links.len(),
1430                fexit = fexit_links.len(),
1431                total = valid_bpf.len(),
1432                "BPF probes",
1433            );
1434        }
1435    } // end if phase_b_rx.is_none() — BPF callback batches
1436
1437    // --- Kernel function fexit batches (fd=0 = vmlinux BTF) ---
1438    for chunk in kernel_fexit_targets.chunks(FENTRY_BATCH) {
1439        let mut targets: Vec<FentryTarget<'_>> = Vec::new();
1440        for (slot, kt) in chunk.iter().enumerate() {
1441            targets.push(FentryTarget {
1442                slot,
1443                fd: 0, // vmlinux BTF
1444                idx: kt.idx,
1445                name: &kt.name,
1446                ok: false,
1447                is_kernel: true,
1448            });
1449        }
1450
1451        use crate::bpf_skel::fentry::*;
1452        let mut fentry_open_obj = std::mem::MaybeUninit::uninit();
1453        let fentry_builder = FentryProbeSkelBuilder::default();
1454        let mut fentry_open = match fentry_builder.open(&mut fentry_open_obj) {
1455            Ok(s) => s,
1456            Err(e) => {
1457                tracing::warn!(%e, "kernel fexit skeleton open failed");
1458                continue;
1459            }
1460        };
1461
1462        if let Some(rodata) = fentry_open.maps.rodata_data.as_mut() {
1463            rodata.ktstr_enabled = true;
1464            for t in &targets {
1465                set_rodata_slot(rodata, t.slot, t.idx, t.is_kernel);
1466            }
1467        }
1468
1469        // For kernel fexit, we only need fexit programs — disable fentry
1470        // (entry capture is handled by the kprobe skeleton).
1471        for t in targets.iter_mut() {
1472            // Disable fentry for kernel functions (kprobe handles entry).
1473            let Some(fentry_prog) = fentry_prog_mut_by_slot(&mut fentry_open, t.slot) else {
1474                continue;
1475            };
1476            fentry_prog.set_autoload(false);
1477
1478            // Set fexit attach target with fd=0 (vmlinux BTF).
1479            let Some(fexit_prog) = fexit_prog_mut_by_slot(&mut fentry_open, t.slot) else {
1480                continue;
1481            };
1482            match fexit_prog.set_attach_target(0, Some(t.name.to_string())) {
1483                Ok(()) => {
1484                    t.ok = true;
1485                    tracing::debug!(
1486                        slot = t.slot,
1487                        func = t.name,
1488                        "kernel fexit: set_attach_target ok"
1489                    );
1490                }
1491                Err(e) => {
1492                    tracing::debug!(slot = t.slot, func = t.name, %e, "kernel fexit: set_attach_target failed");
1493                    fexit_prog.set_autoload(false);
1494                }
1495            }
1496        }
1497
1498        if !targets.iter().any(|t| t.ok) {
1499            continue;
1500        }
1501
1502        // Disable fexit for unused slots. Fentry for these slots was
1503        // left at its default by the `targets` loop above (which
1504        // disables fentry only for slots that have a target); no
1505        // attach_target was set for them either, so libbpf loads
1506        // them with a NULL target. Disabling fexit here keeps the
1507        // behaviour as it was before the slot helpers were
1508        // introduced.
1509        let used_slots: std::collections::HashSet<usize> =
1510            targets.iter().filter(|t| t.ok).map(|t| t.slot).collect();
1511        for slot in 0..FENTRY_BATCH {
1512            if !used_slots.contains(&slot)
1513                && let Some(p) = fexit_prog_mut_by_slot(&mut fentry_open, slot)
1514            {
1515                p.set_autoload(false);
1516            }
1517        }
1518
1519        // Reuse probe_data and func_meta_map from the main skeleton.
1520        use std::os::unix::io::AsFd;
1521        if let Err(e) = fentry_open
1522            .maps
1523            .probe_data
1524            .reuse_fd(skel.maps.probe_data.as_fd())
1525        {
1526            tracing::warn!(%e, "kernel fexit: probe_data reuse_fd failed");
1527        }
1528        if let Err(e) = fentry_open
1529            .maps
1530            .func_meta_map
1531            .reuse_fd(skel.maps.func_meta_map.as_fd())
1532        {
1533            tracing::warn!(%e, "kernel fexit: func_meta_map reuse_fd failed");
1534        }
1535
1536        let fentry_skel = match fentry_open.load() {
1537            Ok(s) => s,
1538            Err(e) => {
1539                tracing::warn!(%e, "kernel fexit: batch load failed");
1540                continue;
1541            }
1542        };
1543
1544        for t in &targets {
1545            if !t.ok {
1546                continue;
1547            }
1548            let Some(result) = attach_fexit_by_slot(&fentry_skel, t.slot) else {
1549                continue;
1550            };
1551            match result {
1552                Ok(link) => {
1553                    tracing::debug!(func = t.name, "kernel fexit attached");
1554                    fexit_links.push(link);
1555                }
1556                Err(e) => {
1557                    tracing::debug!(%e, func = t.name, "kernel fexit attach failed");
1558                }
1559            }
1560        }
1561
1562        drop(fentry_skel);
1563    }
1564    if !kernel_fexit_targets.is_empty() {
1565        tracing::debug!(
1566            fexit = fexit_links.len(),
1567            total = kernel_fexit_targets.len(),
1568            "kernel fexit probes",
1569        );
1570    }
1571
1572    // Attach trigger: tp_btf/sched_ext_exit fires inside
1573    // scx_claim_exit() in the context of the current task at exit time.
1574    match skel.progs.ktstr_trigger_tp.attach_trace() {
1575        Ok(link) => {
1576            tracing::debug!("trigger attached via tp_btf/sched_ext_exit");
1577            diag.trigger_type = "tp_btf".to_string();
1578            links.push((link, "tp_btf/sched_ext_exit".to_string()));
1579        }
1580        Err(e) => {
1581            let msg = format!("auto-repro requires kernel with sched_ext_exit tracepoint: {e}");
1582            tracing::error!(%msg, "trigger attach failed");
1583            diag.trigger_attach_error = Some(msg);
1584            ready.set();
1585            return (None, diag, Vec::new());
1586        }
1587    }
1588
1589    // Attach timeline programs (loaded by the skeleton but not
1590    // auto-attached — they need explicit attach_trace calls).
1591    match skel.progs.ktstr_tl_switch.attach_trace() {
1592        Ok(link) => links.push((link, "tp_btf/sched_switch".to_string())),
1593        Err(e) => tracing::warn!(%e, "timeline sched_switch attach failed"),
1594    }
1595    match skel.progs.ktstr_tl_migrate.attach_trace() {
1596        Ok(link) => links.push((link, "tp_btf/sched_migrate_task".to_string())),
1597        Err(e) => tracing::warn!(%e, "timeline sched_migrate_task attach failed"),
1598    }
1599    match skel.progs.ktstr_tl_wakeup.attach_trace() {
1600        Ok(link) => links.push((link, "tp_btf/sched_wakeup".to_string())),
1601        Err(e) => tracing::warn!(%e, "timeline sched_wakeup attach failed"),
1602    }
1603
1604    // Attach optional programs. When the first load succeeded (all
1605    // targets present), they were loaded and just need attachment.
1606    // When the fallback path ran, they were not loaded — attach
1607    // returns an error that we silently absorb.
1608    if optional_programs_loaded {
1609        if let Ok(link) = skel.progs.ktstr_pi_fentry.attach_trace() {
1610            links.push((link, "fentry/rt_mutex_setprio".to_string()));
1611        }
1612        if let Ok(link) = skel.progs.ktstr_pi_fexit.attach_trace() {
1613            links.push((link, "fexit/rt_mutex_setprio".to_string()));
1614        }
1615        if let Ok(link) = skel.progs.ktstr_lock_contend.attach_trace() {
1616            links.push((link, "tp_btf/contention_begin".to_string()));
1617        }
1618        if let Ok(link) = skel.progs.ktstr_preempt_disable_tp.attach_trace() {
1619            links.push((link, "tp_btf/preempt_disable".to_string()));
1620        }
1621        if let Ok(link) = skel.progs.ktstr_preempt_enable_tp.attach_trace() {
1622            links.push((link, "tp_btf/preempt_enable".to_string()));
1623        }
1624    }
1625
1626    // Set up ring buffer
1627    let events: std::sync::Arc<std::sync::Mutex<Vec<ProbeEvent>>> =
1628        std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
1629    let events_clone = events.clone();
1630    let triggered = std::sync::Arc::new(AtomicBool::new(false));
1631    let triggered_clone = triggered.clone();
1632
1633    // Ring buffer event layout matching probe_event in intf.h.
1634    // `str_val` + `has_str` + `str_param_idx` are kept in the wire
1635    // layout for ABI symmetry with `struct probe_entry` (the
1636    // kprobe-side hash map shares the field names) — the
1637    // EVENT_TRIGGER producer leaves them zeroed.
1638    #[repr(C)]
1639    struct RbEvent {
1640        type_: u32,
1641        tid: u32,
1642        func_idx: u32,
1643        ts: u64,
1644        args: [u64; 6],
1645        fields: [u64; 16],
1646        nr_fields: u32,
1647        kstack: [u64; 32],
1648        kstack_sz: u32,
1649        str_val: [u8; MAX_STR_LEN],
1650        has_str: u8,
1651        str_param_idx: u8,
1652    }
1653
1654    let mut rb_builder = RingBufferBuilder::new();
1655    if let Err(e) = rb_builder.add(&skel.maps.ktstr_events, move |data: &[u8]| {
1656        if data.len() < std::mem::size_of::<RbEvent>() {
1657            return 0;
1658        }
1659        let raw: &RbEvent = unsafe { &*(data.as_ptr() as *const RbEvent) };
1660
1661        if raw.type_ == EVENT_TRIGGER {
1662            triggered_clone.store(true, Ordering::Release);
1663
1664            let kstack_sz = (raw.kstack_sz as usize).min(32);
1665            let event = ProbeEvent {
1666                func_idx: 0,
1667                task_ptr: raw.args[0],
1668                ts: raw.ts,
1669                args: raw.args,
1670                fields: vec![],
1671                kstack: raw.kstack[..kstack_sz].to_vec(),
1672                str_val: None,
1673                exit_fields: vec![],
1674                exit_ts: None,
1675            };
1676
1677            events_clone.lock().unwrap().push(event);
1678        }
1679
1680        0
1681    }) {
1682        tracing::error!(%e, "failed to register ring buffer callback");
1683        ready.set();
1684        return (None, diag, Vec::new());
1685    }
1686
1687    let rb = match rb_builder.build() {
1688        Ok(rb) => rb,
1689        Err(e) => {
1690            tracing::error!(%e, "failed to build ring buffer");
1691            ready.set();
1692            return (None, diag, Vec::new());
1693        }
1694    };
1695
1696    // Enable is handled by the BPF program reading the volatile const.
1697    // Since we can't mutate rodata after load, the program starts enabled.
1698    // (ktstr_enabled defaults to false in BPF, but we always want probes
1699    // active once attached — remove the gate or set it before load.)
1700
1701    tracing::debug!(
1702        funcs = func_ips.len(),
1703        links = links.len(),
1704        trigger_type = %diag.trigger_type,
1705        "polling for probe data",
1706    );
1707
1708    // Signal Phase A probes attached (kprobes + kernel fexit +
1709    // trigger). When phase_b_rx is None, this means all probes.
1710    // When Some, BPF fentry is deferred to Phase B.
1711    ready.set();
1712
1713    // Phase B: receive BPF fentry targets and attach them while
1714    // polling the ring buffer. The channel is consumed once; after
1715    // that phase_b_done prevents re-checking.
1716    let mut phase_b_rx = phase_b_rx;
1717    let mut phase_b_done = false;
1718    // Accumulates BTF from Phase B functions so the readout phase
1719    // can resolve field keys for both Phase A and Phase B functions.
1720    let mut phase_b_btf: Vec<BtfFunc> = Vec::new();
1721
1722    // Poll until trigger fires or stop requested.  When stop is
1723    // signaled, iterate all probe_data entries instead of waiting
1724    // for the trigger.
1725    loop {
1726        let _ = rb.poll(Duration::from_millis(100));
1727
1728        // Check for Phase B input while polling.
1729        if !phase_b_done && let Some(ref rx) = phase_b_rx {
1730            match rx.try_recv() {
1731                Ok(pb) => {
1732                    tracing::debug!(
1733                        bpf_funcs = pb.functions.len(),
1734                        "Phase B: attaching BPF fentry/fexit"
1735                    );
1736                    // Save Phase B BTF for readout field key resolution.
1737                    phase_b_btf = pb.btf_funcs.clone();
1738                    // Attach Phase B probes: kernel callers (kprobes
1739                    // + kernel fexit) and BPF callbacks (fentry/fexit).
1740                    attach_phase_b_fentry(
1741                        &skel,
1742                        &pb,
1743                        &mut func_ips,
1744                        &mut fentry_links,
1745                        &mut fexit_links,
1746                        &mut links,
1747                        &mut diag,
1748                    );
1749                    pb.done.set();
1750                    phase_b_done = true;
1751                    // Drop the receiver to release the channel.
1752                    phase_b_rx = None;
1753                }
1754                Err(std::sync::mpsc::TryRecvError::Empty) => {
1755                    // No Phase B input yet; if trigger already fired,
1756                    // break immediately — the crash happened before
1757                    // Phase B could attach.
1758                    if triggered.load(Ordering::Acquire) {
1759                        tracing::debug!("trigger fired during Phase B wait, skipping fentry");
1760                        phase_b_done = true;
1761                        phase_b_rx = None;
1762                    }
1763                }
1764                Err(std::sync::mpsc::TryRecvError::Disconnected) => {
1765                    // Channel sender dropped without delivering Phase B
1766                    // input. Without a Phase B payload there is nothing
1767                    // left for this probe loop to attach; if `stop` is
1768                    // also set, return empty diagnostics rather than
1769                    // poll an empty ringbuf for the rest of the timeout.
1770                    tracing::debug!("Phase B channel disconnected");
1771                    phase_b_done = true;
1772                    phase_b_rx = None;
1773                    if stop.load(Ordering::Relaxed) {
1774                        return (None, diag, Vec::new());
1775                    }
1776                }
1777            }
1778        }
1779
1780        // Also check BSS err_exit_detected: stall exits skip the
1781        // ring buffer (bpf_get_current_task is unrelated to the
1782        // stall cause) but still latch the BSS flag. Volatile read
1783        // because the BPF program writes via the kernel-side mmap
1784        // and Rust's aliasing rules let the compiler hoist a normal
1785        // read out of the loop.
1786        let bss_triggered = skel.maps.bss_data.as_ref().is_some_and(|bss| unsafe {
1787            std::ptr::read_volatile(&bss.ktstr_err_exit_detected as *const u32) != 0
1788        });
1789        // Mirror the latch into the cross-thread atomic so scenario
1790        // code can synchronously classify SchedulerDied as Clean vs
1791        // Crashed without owning the skeleton. Monotonic 0→1→2 —
1792        // once a poll iteration completes we know the probe armed,
1793        // and the BSS latch only ever transitions 0→1 within a
1794        // single test run. Release pairs with the Acquire load in
1795        // `sched_exit_kind()`.
1796        PROBE_SCHED_EXIT_STATE.store(
1797            if bss_triggered {
1798                PROBE_EXIT_STATE_CRASHED
1799            } else {
1800                PROBE_EXIT_STATE_CLEAN
1801            },
1802            Ordering::Release,
1803        );
1804        // Snapshot `triggered` once: a second `triggered.load` for the
1805        // diag assignment would observe an unrelated state if a racing
1806        // trigger fires between the two reads, so the gate decision
1807        // and the recorded `trigger_fired` could disagree.
1808        let triggered_snapshot = triggered.load(Ordering::Acquire);
1809        if triggered_snapshot || bss_triggered || stop.load(Ordering::Acquire) {
1810            // Final ringbuf drain before breaking. BPF-side ordering:
1811            // the trigger handler does
1812            // `__sync_val_compare_and_swap(&ktstr_err_exit_detected, 0u, 1u)`
1813            // BEFORE `bpf_ringbuf_reserve` + `bpf_ringbuf_submit`
1814            // (see src/bpf/probe.bpf.c near line 687-731). A userspace
1815            // observer can therefore see `bss_triggered=true` (CAS
1816            // visible) while the ringbuf event is still in transit
1817            // (submit not yet visible to `rb.poll`). Without an
1818            // explicit drain here, breaking out of the loop on
1819            // `bss_triggered` would lose the trigger event — the
1820            // readout phase below would see `events.last() = None`
1821            // (or the prior probe's trailing event), drop into the
1822            // no-causal-tptr fallback path, and produce
1823            // grouped-by-frequency output instead of a verified
1824            // stitch even though the kernel did publish a real
1825            // causal task pointer.
1826            //
1827            // 100 ms is bounded by the same teardown budget the loop's
1828            // top-level `rb.poll(100ms)` uses; libbpf's
1829            // `RingBuffer::poll` returns as soon as events are
1830            // consumed (it uses epoll-edge under the hood), so the
1831            // worst case is one full timeout window when no event
1832            // arrives — acceptable since we already know the trigger
1833            // fired (bss_triggered is true).
1834            let _ = rb.poll(Duration::from_millis(100));
1835            diag.trigger_fired = triggered_snapshot || bss_triggered;
1836
1837            // Read BPF-side diagnostic counters from BSS. The hot
1838            // counters live in the per-CPU `ktstr_pcpu_counters`
1839            // 2D array (`[MAX_CPUS][KTSTR_PCPU_NR]`); each per-CPU
1840            // slot is a cacheline-aligned `pcpu_counter { long
1841            // value; }`. Sum across CPUs to recover the cumulative
1842            // count — see `enum ktstr_pcpu_idx` in
1843            // src/bpf/probe.bpf.c for the slot indices.
1844            //
1845            // Every read below uses [`std::ptr::read_volatile`].
1846            // The BSS struct is mapped to userspace via the BPF
1847            // map's mmap region; the BPF program writes through
1848            // its own kernel-side mapping concurrently with these
1849            // reads. Without `read_volatile`, the userspace
1850            // compiler is free to hoist the loads (Rust's
1851            // aliasing rules: the compiler does not know a
1852            // `&types::bss` reference is shared with a kernel
1853            // writer through an unrelated mapping), miss the
1854            // post-trigger updates, and leave the diagnostic
1855            // counters / miss log / first-trigger timestamp
1856            // showing pre-trigger zeroes. Mirrors the existing
1857            // `ktstr_err_exit_detected` `read_volatile` site
1858            // upstream — same hazard, same fix, applied to every
1859            // BPF-mutated field the diag block reads.
1860            if let Some(bss) = skel.maps.bss_data.as_ref() {
1861                // Slot indices must match `enum ktstr_pcpu_idx`. A
1862                // reorder in the BPF source breaks every reader; the
1863                // explicit constants here surface that drift at the
1864                // call site instead of silently aliasing two
1865                // counters.
1866                const PCPU_PROBE_COUNT: usize = 0;
1867                const PCPU_KPROBE_RETURNS: usize = 1;
1868                const PCPU_META_MISS: usize = 2;
1869                const PCPU_RINGBUF_DROPS: usize = 3;
1870                const PCPU_TIMELINE_COUNT: usize = 4;
1871                const PCPU_TIMELINE_DROPS: usize = 5;
1872                const PCPU_TRIGGER_COUNT: usize = 14;
1873                let counters = &bss.ktstr_pcpu_counters;
1874                // SAFETY: each `pcpu_counter::value` is a plain
1875                // 64-bit integer at a stable BSS offset; the BPF
1876                // side updates it via `__sync_add_and_fetch`
1877                // (atomic add). A volatile load reads whatever
1878                // the kernel-side mmap currently shows — torn
1879                // reads are impossible because aligned 64-bit
1880                // loads are atomic on every supported arch
1881                // (x86_64, aarch64). The volatile qualifier is
1882                // what prevents the compiler from hoisting the
1883                // load out of the `sum` reduction across the
1884                // outer poll loop.
1885                let sum_pcpu = |idx: usize| -> u64 {
1886                    counters
1887                        .iter()
1888                        .map(|cpu_slots| unsafe {
1889                            std::ptr::read_volatile(&cpu_slots[idx].value as *const _) as u64
1890                        })
1891                        .sum()
1892                };
1893                diag.bpf_kprobe_fires = sum_pcpu(PCPU_PROBE_COUNT);
1894                diag.bpf_kprobe_returns = sum_pcpu(PCPU_KPROBE_RETURNS);
1895                diag.bpf_trigger_fires = sum_pcpu(PCPU_TRIGGER_COUNT);
1896                diag.bpf_meta_misses = sum_pcpu(PCPU_META_MISS);
1897                // SAFETY: `ktstr_miss_log_idx` is a `u32` written
1898                // via `__sync_fetch_and_add` from the BPF side
1899                // (see `src/bpf/probe.bpf.c` near the
1900                // `ktstr_miss_log[idx] = ip;` line). Aligned u32
1901                // loads are atomic on x86_64/aarch64. The BPF
1902                // writer increments-then-stores; a volatile read
1903                // observes either the pre- or post-update value
1904                // — both are bounded by the array length, so the
1905                // subsequent `.min(ktstr_miss_log.len())` keeps
1906                // the slice safe even if the kernel-side write
1907                // races this read.
1908                let miss_idx = unsafe {
1909                    std::ptr::read_volatile(&bss.ktstr_miss_log_idx as *const u32) as usize
1910                };
1911                let n = miss_idx.min(bss.ktstr_miss_log.len());
1912                // Element-wise volatile reads of the miss-log
1913                // entries that fall within `n`. A bulk
1914                // `to_vec()` over the `bss.ktstr_miss_log[..n]`
1915                // slice would let the compiler vectorise the
1916                // copy and elide the volatile semantics; pulling
1917                // each `u64` through `read_volatile` keeps every
1918                // load ordered against the BPF-side write.
1919                //
1920                // SAFETY: each entry is a 64-bit IP value the
1921                // BPF writer stores after its CAS-like
1922                // increment of `ktstr_miss_log_idx`. Aligned
1923                // u64 loads are atomic on every supported
1924                // arch; the BPF write order
1925                // (increment of `ktstr_miss_log_idx` via
1926                // `__sync_fetch_and_add` BEFORE the store
1927                // `ktstr_miss_log[idx] = ip`) means a
1928                // volatile read of `[..miss_idx]` may cover a
1929                // slot whose index was already reserved but
1930                // whose store has not yet landed. We tolerate
1931                // that race: a stale-zero entry is harmless
1932                // diagnostic noise compared with the
1933                // alternative (compiler-hoisted loads of
1934                // pre-trigger zeroes).
1935                diag.bpf_miss_ips = (0..n)
1936                    .map(|i| unsafe {
1937                        std::ptr::read_volatile(&bss.ktstr_miss_log[i] as *const u64)
1938                    })
1939                    .collect();
1940                diag.bpf_ringbuf_drops = sum_pcpu(PCPU_RINGBUF_DROPS);
1941                // SAFETY: `ktstr_last_trigger_ts` is a `u64`
1942                // written by the BPF trigger handler via
1943                // `bpf_ktime_get_ns()` (see
1944                // `src/bpf/probe.bpf.c::ktstr_last_trigger_ts`).
1945                // Aligned u64 loads are atomic; the volatile
1946                // qualifier prevents hoisting across the outer
1947                // poll loop so the userspace reader observes the
1948                // post-trigger timestamp instead of a cached
1949                // pre-trigger zero.
1950                diag.bpf_first_trigger_ns =
1951                    unsafe { std::ptr::read_volatile(&bss.ktstr_last_trigger_ts as *const u64) };
1952                // SAFETY: `ktstr_exit_kind_snap` is a `u32` written
1953                // by the BPF trigger handler in the same publishing
1954                // CAS sequence as `ktstr_err_exit_detected` (see
1955                // `src/bpf/probe.bpf.c::ktstr_exit_kind_snap`).
1956                // Aligned u32 loads are atomic on every supported
1957                // arch; the volatile qualifier prevents the compiler
1958                // from hoisting the load across the outer poll loop
1959                // so userspace observes the post-trigger SCX_EXIT_*
1960                // value rather than the pre-trigger zero.
1961                diag.bpf_exit_kind_snap =
1962                    unsafe { std::ptr::read_volatile(&bss.ktstr_exit_kind_snap as *const u32) };
1963                diag.bpf_timeline_count = sum_pcpu(PCPU_TIMELINE_COUNT);
1964                diag.bpf_timeline_drops = sum_pcpu(PCPU_TIMELINE_DROPS);
1965            }
1966
1967            let key_size = std::mem::size_of::<types::probe_key>();
1968            let mut probe_events = Vec::new();
1969            let mut total_keys = 0u32;
1970            let mut unmatched_ips = 0u32;
1971
1972            // Build IP → (func_idx, display_name) lookup once. The
1973            // event-drain loop below would otherwise scan every entry
1974            // in `func_ips` per probe_data key — O(events × funcs).
1975            // With thousands of funcs and tens of thousands of events
1976            // on a normal run, the linear scan dominates dump time.
1977            // HashMap turns the per-event lookup into O(1).
1978            let func_ips_by_ip: std::collections::HashMap<u64, (u32, &str)> = func_ips
1979                .iter()
1980                .map(|(idx, ip, name)| (*ip, (*idx, name.as_str())))
1981                .collect();
1982
1983            // Pre-compute per-function `field_keys_hints` once. The
1984            // previous code recomputed `build_field_keys` for every
1985            // event, even when many events share the same function —
1986            // O(events × funcs) field-key construction. Building the
1987            // map keyed by function name turns the per-event work
1988            // into a single HashMap lookup.
1989            let field_keys_by_func: std::collections::HashMap<&str, Vec<(String, RenderHint)>> =
1990                btf_funcs
1991                    .iter()
1992                    .chain(phase_b_btf.iter())
1993                    .map(|f| (f.name.as_str(), build_field_keys(f)))
1994                    .collect();
1995
1996            for key_bytes in skel.maps.probe_data.keys() {
1997                if key_bytes.len() < key_size {
1998                    continue;
1999                }
2000                total_keys += 1;
2001                let key: &types::probe_key =
2002                    unsafe { &*(key_bytes.as_ptr() as *const types::probe_key) };
2003
2004                // Find which function this IP belongs to.
2005                let (func_idx, display_name) = match func_ips_by_ip.get(&key.func_ip) {
2006                    Some(&(idx, name)) => (idx, name),
2007                    None => {
2008                        unmatched_ips += 1;
2009                        continue;
2010                    }
2011                };
2012
2013                if let Ok(Some(val_bytes)) = skel.maps.probe_data.lookup(&key_bytes, MapFlags::ANY)
2014                {
2015                    let entry: &types::probe_entry =
2016                        unsafe { &*(val_bytes.as_ptr() as *const types::probe_entry) };
2017                    if entry.ts == 0 {
2018                        continue;
2019                    }
2020
2021                    // Borrow the pre-computed hints for this function;
2022                    // an empty slice for unknown funcs preserves the
2023                    // previous `unwrap_or_default()` behaviour.
2024                    let empty: Vec<(String, RenderHint)> = Vec::new();
2025                    let field_keys_hints: &Vec<(String, RenderHint)> =
2026                        field_keys_by_func.get(display_name).unwrap_or(&empty);
2027
2028                    let nr = (entry.nr_fields as usize).min(16);
2029                    let fields: Vec<(String, u64)> = entry.fields[..nr]
2030                        .iter()
2031                        .enumerate()
2032                        .filter_map(|(i, &val)| {
2033                            field_keys_hints.get(i).map(|(k, _)| (k.clone(), val))
2034                        })
2035                        .collect();
2036
2037                    let str_val = if entry.has_str != 0 {
2038                        let s = &entry.str_val;
2039                        let bytes: Vec<u8> = s.iter().map(|&b| b as u8).collect();
2040                        let len = bytes.iter().position(|&b| b == 0).unwrap_or(bytes.len());
2041                        let text = std::str::from_utf8(&bytes[..len]).unwrap_or("").to_string();
2042                        if text.is_empty() { None } else { Some(text) }
2043                    } else {
2044                        None
2045                    };
2046
2047                    // Extract exit-side fields if fexit fired.
2048                    let (exit_fields, exit_ts) = if entry.has_exit != 0 {
2049                        let nr_exit = (entry.nr_exit_fields as usize).min(16);
2050                        let ef: Vec<(String, u64)> = entry.exit_fields[..nr_exit]
2051                            .iter()
2052                            .enumerate()
2053                            .filter_map(|(i, &val)| {
2054                                field_keys_hints.get(i).map(|(k, _)| (k.clone(), val))
2055                            })
2056                            .collect();
2057                        (ef, Some(entry.exit_ts))
2058                    } else {
2059                        (Vec::new(), None)
2060                    };
2061
2062                    probe_events.push(ProbeEvent {
2063                        func_idx,
2064                        task_ptr: key.task_ptr,
2065                        ts: entry.ts,
2066                        args: entry.args,
2067                        fields,
2068                        kstack: vec![],
2069                        str_val,
2070                        exit_fields,
2071                        exit_ts,
2072                    });
2073                }
2074            }
2075
2076            probe_events.sort_by_key(|e| e.ts);
2077
2078            diag.probe_data_keys = total_keys;
2079            diag.probe_data_unmatched_ips = unmatched_ips;
2080            diag.events_before_stitch = probe_events.len() as u32;
2081
2082            tracing::debug!(
2083                events = probe_events.len(),
2084                total_keys,
2085                unmatched_ips,
2086                "probe_data readout",
2087            );
2088
2089            if probe_events.is_empty() {
2090                return (None, diag, Vec::new());
2091            }
2092
2093            // Stitch by task_struct pointer. Build a map of func_idx ->
2094            // task_struct param index from BPF_OP_CALLERS and BTF, then
2095            // filter events to those referencing the same task_struct
2096            // pointer as the causal task.
2097            //
2098            // The args[0] assignment in ktstr_trigger_tp (the BPF
2099            // trigger handler) sets args[0] to
2100            // bpf_get_current_task() ONLY for
2101            // SCX_EXIT_ERROR_BPF (1025), where a BPF scheduler
2102            // callback faulted in the running task's context — so
2103            // `current` IS the causal task. For SCX_EXIT_ERROR
2104            // (1024), args[0] is 0 because that exit can fire from
2105            // kworker context (async unregistration, sysrq), where
2106            // `current` is the worker thread rather than the task
2107            // that triggered the exit. For SCX_EXIT_ERROR_STALL
2108            // (1026) the trigger handler returns early without
2109            // submitting an event at all (watchdog/timer context).
2110            // The filter below therefore drops args[0] == 0 to
2111            // suppress non-causal probe output: no causal task
2112            // means no useful stitch chain.
2113            let task_param_idx = build_task_param_idx(&func_ips, btf_funcs, &phase_b_btf);
2114
2115            // Extract tptr and kstack from the trigger event in one
2116            // lock acquisition. When the trigger did not fire (stop-
2117            // signaled) or the exit kind lacks a causal task, probe
2118            // output is suppressed.
2119            let (target_tptr, trigger_kstack) = {
2120                let guard = events.lock().unwrap();
2121                let tptr = guard.last().and_then(|e| causal_tptr(e.task_ptr));
2122                let kstack = guard.last().map(|e| e.kstack.clone()).unwrap_or_default();
2123                (tptr, kstack)
2124            };
2125
2126            let Some(tptr) = target_tptr else {
2127                // No causal task identified — the trigger fired with
2128                // args[0]==0 (kind=STALL or generic ERROR from kworker
2129                // context), or never fired at all (probe lifecycle
2130                // race, scheduler clean-exited).
2131                //
2132                // Best-effort fallback: instead of returning empty,
2133                // group the captured events by task_struct pointer
2134                // (resolved via task_param_idx for events with a
2135                // task_struct param, otherwise key.task_ptr from the
2136                // probe map) and pick the top-N most-frequent
2137                // pointers. The events that fired during the actual
2138                // crash window ARE causal data — the host renderer
2139                // will mark the output as "trigger absent —
2140                // best-effort grouped by frequency" so the operator
2141                // doesn't mistake the candidate chain for a verified
2142                // stitch. Cap at 3 candidates to keep the output
2143                // readable; one task usually dominates a single
2144                // crash window.
2145                use std::collections::HashMap;
2146                let mut counts: HashMap<u64, u32> = HashMap::new();
2147                for ev in &probe_events {
2148                    let key = if let Some(&pidx) = task_param_idx.get(&ev.func_idx) {
2149                        ev.args[pidx]
2150                    } else {
2151                        ev.task_ptr
2152                    };
2153                    if key != 0 {
2154                        *counts.entry(key).or_default() += 1;
2155                    }
2156                }
2157                if counts.is_empty() {
2158                    tracing::debug!(
2159                        "no causal tptr and no candidate task_ptrs — suppressing probe output"
2160                    );
2161                    return (None, diag, Vec::new());
2162                }
2163                let mut sorted: Vec<(u64, u32)> = counts.into_iter().collect();
2164                sorted.sort_by(|a, b| b.1.cmp(&a.1));
2165                const MAX_CANDIDATES: usize = 3;
2166                sorted.truncate(MAX_CANDIDATES);
2167                let candidate_set: std::collections::HashSet<u64> =
2168                    sorted.iter().map(|(k, _)| *k).collect();
2169                let before = probe_events.len();
2170                probe_events.retain(|e| {
2171                    let key = if let Some(&pidx) = task_param_idx.get(&e.func_idx) {
2172                        e.args[pidx]
2173                    } else {
2174                        e.task_ptr
2175                    };
2176                    candidate_set.contains(&key)
2177                });
2178                tracing::debug!(
2179                    candidates = sorted.len(),
2180                    kept = probe_events.len(),
2181                    total = before,
2182                    "stitched by frequency (fallback — no causal tptr)"
2183                );
2184                diag.events_after_stitch = probe_events.len() as u32;
2185                diag.stitch_fallback_used = true;
2186                let fnames: Vec<(u32, String)> = func_ips
2187                    .iter()
2188                    .map(|(idx, _, name)| (*idx, name.clone()))
2189                    .collect();
2190                return (Some(probe_events), diag, fnames);
2191            };
2192
2193            let before = probe_events.len();
2194            probe_events.retain(|e| {
2195                if let Some(&pidx) = task_param_idx.get(&e.func_idx) {
2196                    e.args[pidx] == tptr
2197                } else {
2198                    e.task_ptr == tptr // no task_struct param — match on current
2199                }
2200            });
2201            tracing::debug!(
2202                tptr = format_args!("0x{tptr:x}"),
2203                kept = probe_events.len(),
2204                total = before,
2205                "stitched by task_struct arg",
2206            );
2207
2208            diag.events_after_stitch = probe_events.len() as u32;
2209
2210            // Attach trigger kstack if available.
2211            if let Some(last) = probe_events.last_mut() {
2212                last.kstack = trigger_kstack;
2213            }
2214
2215            let fnames: Vec<(u32, String)> = func_ips
2216                .iter()
2217                .map(|(idx, _, name)| (*idx, name.clone()))
2218                .collect();
2219            return (Some(probe_events), diag, fnames);
2220        }
2221    }
2222}
2223
2224/// Attach Phase B probes after the scheduler starts.
2225///
2226/// Handles both kernel callers (kprobes + kernel fexit) and BPF
2227/// callbacks (fentry/fexit). Uses `pb.func_idx_offset` for all
2228/// func_idx values to avoid collisions with Phase A indices.
2229///
2230/// Kprobes are attached via the Phase A kprobe skeleton (`skel`),
2231/// which stays alive throughout. BPF fentry/fexit use separate
2232/// skeleton loads that share `probe_data` and `func_meta_map` via
2233/// `reuse_fd`.
2234fn attach_phase_b_fentry(
2235    skel: &crate::bpf_skel::ProbeSkel<'_>,
2236    pb: &PhaseBInput,
2237    func_ips: &mut Vec<(u32, u64, String)>,
2238    fentry_links: &mut Vec<libbpf_rs::Link>,
2239    fexit_links: &mut Vec<libbpf_rs::Link>,
2240    links: &mut Vec<(libbpf_rs::Link, String)>,
2241    diag: &mut ProbeDiagnostics,
2242) {
2243    use libbpf_rs::MapCore;
2244    use libbpf_rs::MapFlags;
2245    use libbpf_rs::skel::{OpenSkel, SkelBuilder};
2246
2247    const FENTRY_BATCH: usize = 4;
2248
2249    struct FentryTarget<'a> {
2250        slot: usize,
2251        fd: i32,
2252        idx: u32,
2253        name: &'a str,
2254        ok: bool,
2255        is_kernel: bool,
2256    }
2257
2258    let offset = pb.func_idx_offset;
2259
2260    // Pre-load vmlinux BTF once for the Phase B kprobe + fentry
2261    // loops below. Same rationale as the Phase A path in
2262    // `run_probe_skeleton`: per-call `resolve_field_specs(_, None)`
2263    // would re-parse the multi-MB vmlinux BTF on every iteration,
2264    // dominating Phase B attach time on a kernel with thousands of
2265    // probed functions.
2266    //
2267    // `cached_vmlinux_btf` shares the process-global memo with Phase
2268    // A so a single auto-repro VM pays one parse for both phases, and
2269    // a multi-VM nextest run pays one parse total.
2270    let cached_btf = crate::monitor::btf_offsets::cached_vmlinux_btf();
2271
2272    // --- Phase B kernel functions: kprobes + func_meta ---
2273    //
2274    // Two-pass shape: first populate `func_meta_map` for every
2275    // resolvable target and stage the (raw_name, idx, ip,
2276    // display_name) tuples, then run all kprobe attaches in
2277    // parallel via [`parallel_attach_kprobes`]. Splitting the loop
2278    // matters because the attach is the slow syscall pair (one
2279    // `perf_event_open` + one `BPF_LINK_CREATE` each); meta
2280    // population is a quick map update. Doing them as a single
2281    // sequential loop forces the slow attach to gate the next
2282    // iteration's meta update for no good reason.
2283    struct PhaseBKprobeTarget {
2284        raw_name: String,
2285        display_name: String,
2286        idx: u32,
2287        ip: u64,
2288    }
2289    let mut kprobe_targets: Vec<PhaseBKprobeTarget> = Vec::new();
2290    for (i, func) in pb.functions.iter().enumerate() {
2291        if func.is_bpf {
2292            continue;
2293        }
2294        let idx = offset + i as u32;
2295        let ip = match resolve_func_ip(&func.raw_name) {
2296            Some(ip) => ip,
2297            None => {
2298                tracing::warn!(func = %func.raw_name, "phase_b: could not resolve function IP");
2299                diag.kprobe_resolve_failed.push(func.raw_name.clone());
2300                continue;
2301            }
2302        };
2303
2304        let mut meta = types::func_meta {
2305            func_idx: idx,
2306            ..Default::default()
2307        };
2308
2309        if let Some(btf_func) = pb.btf_funcs.iter().find(|f| f.name == func.raw_name) {
2310            let field_specs = match cached_btf.as_ref() {
2311                Some(btf) => super::btf::resolve_field_specs_with_btf(btf_func, btf),
2312                None => Vec::new(),
2313            };
2314            populate_field_specs(&mut meta, &field_specs);
2315            meta.str_param_idx = detect_str_param(btf_func);
2316        }
2317
2318        let key_bytes = ip.to_ne_bytes();
2319        let meta_bytes = unsafe {
2320            std::slice::from_raw_parts(
2321                &meta as *const _ as *const u8,
2322                std::mem::size_of::<types::func_meta>(),
2323            )
2324        };
2325
2326        if let Err(e) = skel
2327            .maps
2328            .func_meta_map
2329            .update(&key_bytes, meta_bytes, MapFlags::ANY)
2330        {
2331            tracing::warn!(%e, func = %func.raw_name, "phase_b: failed to update func_meta_map");
2332            continue;
2333        }
2334
2335        kprobe_targets.push(PhaseBKprobeTarget {
2336            raw_name: func.raw_name.clone(),
2337            display_name: func.display_name.clone(),
2338            idx,
2339            ip,
2340        });
2341    }
2342
2343    let attach_input: Vec<String> = kprobe_targets.iter().map(|t| t.raw_name.clone()).collect();
2344    let attach_results = parallel_attach_kprobes(&skel.progs.ktstr_probe, &attach_input);
2345    // Pair the attach result with its target metadata. `attach_results`
2346    // is in the same order as `attach_input` is in the same order as
2347    // `kprobe_targets`, so a zip-and-iterate replays the original
2348    // sequential post-attach bookkeeping (links push, func_ips push,
2349    // counter bumps) without reordering.
2350    for (target, (raw_name, result)) in kprobe_targets.into_iter().zip(attach_results.into_iter()) {
2351        debug_assert_eq!(target.raw_name, raw_name);
2352        match result {
2353            Ok(link) => {
2354                links.push((link, raw_name));
2355                diag.kprobe_attached += 1;
2356            }
2357            Err(e) => {
2358                tracing::warn!(%e, func = %raw_name, "phase_b kprobe attach failed");
2359                diag.kprobe_attach_failed.push((raw_name, e.to_string()));
2360            }
2361        }
2362        diag.kprobe_resolved += 1;
2363        func_ips.push((target.idx, target.ip, target.display_name));
2364    }
2365
2366    // --- Phase B kernel function fexit batches (fd=0 = vmlinux BTF) ---
2367    struct KernelFexitTarget {
2368        idx: u32,
2369        name: String,
2370    }
2371    let kernel_fexit_targets: Vec<KernelFexitTarget> = pb
2372        .functions
2373        .iter()
2374        .enumerate()
2375        .filter(|(_, f)| !f.is_bpf)
2376        .map(|(i, f)| KernelFexitTarget {
2377            idx: offset + i as u32,
2378            name: f.display_name.clone(),
2379        })
2380        .collect();
2381
2382    for chunk in kernel_fexit_targets.chunks(FENTRY_BATCH) {
2383        let mut targets: Vec<FentryTarget<'_>> = Vec::new();
2384        for (slot, kt) in chunk.iter().enumerate() {
2385            targets.push(FentryTarget {
2386                slot,
2387                fd: 0,
2388                idx: kt.idx,
2389                name: &kt.name,
2390                ok: false,
2391                is_kernel: true,
2392            });
2393        }
2394
2395        use crate::bpf_skel::fentry::*;
2396        let mut fentry_open_obj = std::mem::MaybeUninit::uninit();
2397        let fentry_builder = FentryProbeSkelBuilder::default();
2398        let mut fentry_open = match fentry_builder.open(&mut fentry_open_obj) {
2399            Ok(s) => s,
2400            Err(e) => {
2401                tracing::warn!(%e, "phase_b kernel fexit skeleton open failed");
2402                continue;
2403            }
2404        };
2405
2406        if let Some(rodata) = fentry_open.maps.rodata_data.as_mut() {
2407            rodata.ktstr_enabled = true;
2408            for t in &targets {
2409                set_rodata_slot(rodata, t.slot, t.idx, t.is_kernel);
2410            }
2411        }
2412
2413        for t in targets.iter_mut() {
2414            let Some(fentry_prog) = fentry_prog_mut_by_slot(&mut fentry_open, t.slot) else {
2415                continue;
2416            };
2417            fentry_prog.set_autoload(false);
2418
2419            let Some(fexit_prog) = fexit_prog_mut_by_slot(&mut fentry_open, t.slot) else {
2420                continue;
2421            };
2422            match fexit_prog.set_attach_target(0, Some(t.name.to_string())) {
2423                Ok(()) => {
2424                    t.ok = true;
2425                }
2426                Err(e) => {
2427                    tracing::debug!(func = t.name, %e, "phase_b kernel fexit: set_attach_target failed");
2428                    fexit_prog.set_autoload(false);
2429                }
2430            }
2431        }
2432
2433        if !targets.iter().any(|t| t.ok) {
2434            continue;
2435        }
2436
2437        // Disable fexit for unused slots (see the matching single-phase
2438        // kernel-fexit batch for the fentry-left-at-default rationale).
2439        let used_slots: std::collections::HashSet<usize> =
2440            targets.iter().filter(|t| t.ok).map(|t| t.slot).collect();
2441        for slot in 0..FENTRY_BATCH {
2442            if !used_slots.contains(&slot)
2443                && let Some(p) = fexit_prog_mut_by_slot(&mut fentry_open, slot)
2444            {
2445                p.set_autoload(false);
2446            }
2447        }
2448
2449        use std::os::unix::io::AsFd;
2450        if let Err(e) = fentry_open
2451            .maps
2452            .probe_data
2453            .reuse_fd(skel.maps.probe_data.as_fd())
2454        {
2455            tracing::warn!(%e, "phase_b kernel fexit: probe_data reuse_fd failed");
2456        }
2457        if let Err(e) = fentry_open
2458            .maps
2459            .func_meta_map
2460            .reuse_fd(skel.maps.func_meta_map.as_fd())
2461        {
2462            tracing::warn!(%e, "phase_b kernel fexit: func_meta_map reuse_fd failed");
2463        }
2464
2465        let fentry_skel = match fentry_open.load() {
2466            Ok(s) => s,
2467            Err(e) => {
2468                tracing::warn!(%e, "phase_b kernel fexit: batch load failed");
2469                continue;
2470            }
2471        };
2472
2473        for t in &targets {
2474            if !t.ok {
2475                continue;
2476            }
2477            let Some(result) = attach_fexit_by_slot(&fentry_skel, t.slot) else {
2478                continue;
2479            };
2480            match result {
2481                Ok(link) => {
2482                    tracing::debug!(func = t.name, "phase_b kernel fexit attached");
2483                    fexit_links.push(link);
2484                }
2485                Err(e) => {
2486                    tracing::debug!(%e, func = t.name, "phase_b kernel fexit attach failed");
2487                }
2488            }
2489        }
2490
2491        drop(fentry_skel);
2492    }
2493
2494    // --- Phase B BPF callback fentry/fexit batches ---
2495    let valid_bpf: Vec<(u32, &StackFunction)> = pb
2496        .functions
2497        .iter()
2498        .enumerate()
2499        .filter(|(_, f)| f.bpf_prog_id.is_some())
2500        .map(|(i, f)| (offset + i as u32, f))
2501        .collect();
2502    diag.fentry_candidates += valid_bpf.len() as u32;
2503
2504    for chunk in valid_bpf.chunks(FENTRY_BATCH) {
2505        let mut targets: Vec<FentryTarget<'_>> = Vec::new();
2506        for (slot, (idx, func)) in chunk.iter().enumerate() {
2507            let prog_id = func.bpf_prog_id.unwrap();
2508            let fd = if let Some(&pre_fd) = pb.bpf_prog_fds.get(&prog_id) {
2509                let dup_fd = unsafe { libc::dup(pre_fd) };
2510                if dup_fd < 0 {
2511                    tracing::warn!(prog_id, func = %func.display_name, "phase_b fentry: dup failed");
2512                    diag.fentry_attach_failed.push((
2513                        func.display_name.clone(),
2514                        format!("dup(pre_fd={pre_fd}) failed"),
2515                    ));
2516                    continue;
2517                }
2518                dup_fd
2519            } else {
2520                let fd = unsafe { libbpf_rs::libbpf_sys::bpf_prog_get_fd_by_id(prog_id) };
2521                if fd < 0 {
2522                    tracing::warn!(prog_id, func = %func.display_name, "phase_b fentry: failed to get fd");
2523                    diag.fentry_attach_failed.push((
2524                        func.display_name.clone(),
2525                        format!("bpf_prog_get_fd_by_id({prog_id}) returned {fd}"),
2526                    ));
2527                    continue;
2528                }
2529                fd
2530            };
2531            targets.push(FentryTarget {
2532                slot,
2533                fd,
2534                idx: *idx,
2535                name: &func.display_name,
2536                ok: false,
2537                is_kernel: false,
2538            });
2539        }
2540        if targets.is_empty() {
2541            continue;
2542        }
2543
2544        use crate::bpf_skel::fentry::*;
2545        let mut fentry_open_obj = std::mem::MaybeUninit::uninit();
2546        let fentry_builder = FentryProbeSkelBuilder::default();
2547        let mut fentry_open = match fentry_builder.open(&mut fentry_open_obj) {
2548            Ok(s) => s,
2549            Err(e) => {
2550                tracing::warn!(%e, "phase_b fentry skeleton open failed");
2551                for t in &targets {
2552                    unsafe { libc::close(t.fd) };
2553                }
2554                continue;
2555            }
2556        };
2557
2558        if let Some(rodata) = fentry_open.maps.rodata_data.as_mut() {
2559            rodata.ktstr_enabled = true;
2560            for t in &targets {
2561                set_rodata_slot(rodata, t.slot, t.idx, t.is_kernel);
2562            }
2563        }
2564
2565        for t in targets.iter_mut() {
2566            let Some(fentry_prog) = fentry_prog_mut_by_slot(&mut fentry_open, t.slot) else {
2567                continue;
2568            };
2569            match fentry_prog.set_attach_target(t.fd, Some(t.name.to_string())) {
2570                Ok(()) => {
2571                    t.ok = true;
2572                }
2573                Err(e) => {
2574                    tracing::warn!(slot = t.slot, func = t.name, %e, "phase_b fentry: set_attach_target failed");
2575                    diag.fentry_attach_failed
2576                        .push((t.name.to_string(), format!("set_attach_target: {e}")));
2577                    continue;
2578                }
2579            }
2580            let Some(fexit_prog) = fexit_prog_mut_by_slot(&mut fentry_open, t.slot) else {
2581                continue;
2582            };
2583            if let Err(e) = fexit_prog.set_attach_target(t.fd, Some(t.name.to_string())) {
2584                tracing::debug!(slot = t.slot, func = t.name, %e, "phase_b fexit: set_attach_target failed (entry-only)");
2585                fexit_prog.set_autoload(false);
2586            }
2587        }
2588
2589        if !targets.iter().any(|t| t.ok) {
2590            for t in &targets {
2591                unsafe { libc::close(t.fd) };
2592            }
2593            continue;
2594        }
2595
2596        let used_slots: std::collections::HashSet<usize> =
2597            targets.iter().filter(|t| t.ok).map(|t| t.slot).collect();
2598        for slot in 0..FENTRY_BATCH {
2599            if !used_slots.contains(&slot) {
2600                disable_slot_programs(&mut fentry_open, slot);
2601            }
2602        }
2603
2604        use std::os::unix::io::AsFd;
2605        if let Err(e) = fentry_open
2606            .maps
2607            .probe_data
2608            .reuse_fd(skel.maps.probe_data.as_fd())
2609        {
2610            tracing::warn!(%e, "phase_b fentry: probe_data reuse_fd failed");
2611        }
2612        if let Err(e) = fentry_open
2613            .maps
2614            .func_meta_map
2615            .reuse_fd(skel.maps.func_meta_map.as_fd())
2616        {
2617            tracing::warn!(%e, "phase_b fentry: func_meta_map reuse_fd failed");
2618        }
2619
2620        let fentry_skel = match fentry_open.load() {
2621            Ok(s) => {
2622                for t in &targets {
2623                    unsafe { libc::close(t.fd) };
2624                }
2625                s
2626            }
2627            Err(e) => {
2628                tracing::warn!(%e, "phase_b fentry: batch load failed");
2629                for t in &targets {
2630                    if t.ok {
2631                        diag.fentry_attach_failed
2632                            .push((t.name.to_string(), format!("batch load: {e}")));
2633                    }
2634                    unsafe { libc::close(t.fd) };
2635                }
2636                continue;
2637            }
2638        };
2639
2640        for t in &targets {
2641            if !t.ok {
2642                continue;
2643            }
2644
2645            let sentinel_ip = (t.idx as u64) | (1u64 << 63);
2646            let mut meta = crate::bpf_skel::types::func_meta {
2647                func_idx: t.idx,
2648                ..Default::default()
2649            };
2650
2651            if let Some(btf_func) = pb.btf_funcs.iter().find(|f| f.name == t.name) {
2652                let mut field_specs = match cached_btf.as_ref() {
2653                    Some(btf) => super::btf::resolve_field_specs_with_btf(btf_func, btf),
2654                    None => Vec::new(),
2655                };
2656                if field_specs.is_empty()
2657                    && let Some(prog_id) = pb
2658                        .functions
2659                        .iter()
2660                        .find(|f| f.display_name == t.name)
2661                        .and_then(|f| f.bpf_prog_id)
2662                {
2663                    field_specs = super::btf::resolve_bpf_field_specs(btf_func, prog_id);
2664                }
2665                populate_field_specs(&mut meta, &field_specs);
2666                meta.str_param_idx = detect_str_param(btf_func);
2667            }
2668
2669            let Some(result) = attach_fentry_by_slot(&fentry_skel, t.slot) else {
2670                continue;
2671            };
2672            let link = match result {
2673                Ok(link) => {
2674                    tracing::debug!(func = t.name, "phase_b fentry attached");
2675                    link
2676                }
2677                Err(e) => {
2678                    tracing::warn!(%e, func = t.name, "phase_b fentry attach failed");
2679                    diag.fentry_attach_failed
2680                        .push((t.name.to_string(), e.to_string()));
2681                    continue;
2682                }
2683            };
2684
2685            // func_meta_map.update + func_ips.push run AFTER the
2686            // fentry attach succeeds. See the matching ordering
2687            // rationale at the phase A site above: reversing the
2688            // order would orphan map entries and func_ip tuples on
2689            // attach failure. If the map update fails after the
2690            // attach succeeded, drop the Link so post-attach state
2691            // matches what func_ips reports.
2692            let key_bytes = sentinel_ip.to_ne_bytes();
2693            let meta_bytes = unsafe {
2694                std::slice::from_raw_parts(
2695                    &meta as *const _ as *const u8,
2696                    std::mem::size_of::<crate::bpf_skel::types::func_meta>(),
2697                )
2698            };
2699            if let Err(e) = skel
2700                .maps
2701                .func_meta_map
2702                .update(&key_bytes, meta_bytes, MapFlags::ANY)
2703            {
2704                tracing::warn!(%e, func = t.name, "phase_b fentry: failed to update func_meta_map; dropping attached link");
2705                drop(link);
2706                continue;
2707            }
2708            fentry_links.push(link);
2709            func_ips.push((t.idx, sentinel_ip, t.name.to_string()));
2710            let Some(fexit_result) = attach_fexit_by_slot(&fentry_skel, t.slot) else {
2711                continue;
2712            };
2713            match fexit_result {
2714                Ok(link) => {
2715                    tracing::debug!(func = t.name, "phase_b fexit attached");
2716                    fexit_links.push(link);
2717                }
2718                Err(e) => {
2719                    tracing::debug!(%e, func = t.name, "phase_b fexit attach failed (entry-only)");
2720                }
2721            }
2722        }
2723
2724        drop(fentry_skel);
2725    }
2726
2727    diag.fentry_attached = fentry_links.len() as u32;
2728    tracing::debug!(
2729        fentry = fentry_links.len(),
2730        fexit = fexit_links.len(),
2731        bpf_targets = valid_bpf.len(),
2732        kernel_targets = kernel_fexit_targets.len(),
2733        "Phase B probes attached",
2734    );
2735}
2736
2737#[cfg(test)]
2738#[path = "process_tests.rs"]
2739mod tests;