ktstr/scenario/snapshot/
bridge.rs

//! [`SnapshotBridge`] is the request/reply channel between the
//! scenario executor and the host capture pipeline. This module
//! defines the callback types ([`CaptureCallback`],
//! [`WatchRegisterCallback`], [`KernelOpCallback`]), the per-thread
//! bridge installation guard ([`BridgeGuard`]), the diagnostic event
//! log ([`SnapshotBridgeEvent`]), and the caps
//! ([`MAX_STORED_SNAPSHOTS`], [`MAX_STORED_EVENTS`],
//! [`MAX_WATCH_SNAPSHOTS`]).
8
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, Mutex};
11
12use crate::monitor::dump::FailureDumpReport;
13use crate::sync::MutexExt;
14
15// ---------------------------------------------------------------------------
16// Bridge: request/reply channel between executor and host capture
17// ---------------------------------------------------------------------------
18
/// Closure type the bridge invokes to capture a snapshot.
///
/// The `&str` argument is the tag the capture is requested under.
/// The alias is an `Arc` around a `Send + Sync + 'static` closure, so
/// one callback can be shared by every clone of the bridge.
///
/// Returns `None` when the capture pipeline could not produce a
/// report (rendezvous timed out, capture prerequisites missing, no
/// host-side wiring).
///
/// **Wire shape (locked: ioeventfd doorbell).** The production
/// implementation writes the tag into a small per-call slot inside
/// the SHM region, performs an `mmap`'d `u32` write to the
/// doorbell GPA inside the MMIO gap (KVM dispatches via
/// `KVM_IOEVENTFD` without a userspace exit), then blocks on a
/// per-request reply completion (an eventfd / mpsc receiver paired
/// with the doorbell registration). The freeze coordinator's
/// epoll loop wakes on the doorbell eventfd, reads the tag, runs
/// `freeze_and_dispatch`, and signals the reply completion with
/// the resulting `Option<FailureDumpReport>`.
///
/// On-demand captures are orthogonal to the error-trigger
/// `freeze_state` machine — the request handler in the coordinator
/// must not transition `freeze_state` from Idle, and must service
/// requests even when `freeze_state == Done`. The
/// rendezvous-serialisation invariant is the only constraint: each
/// request waits for `all parked == false` from the previous
/// capture before issuing.
pub type CaptureCallback = Arc<dyn Fn(&str) -> Option<FailureDumpReport> + Send + Sync + 'static>;
44
/// Closure type the bridge invokes to register a hardware-watchpoint
/// snapshot.
///
/// The `&str` argument is the symbol to watch. The closure returns
/// `Ok(())` when the registration was accepted and `Err(reason)`
/// (a human-readable `String`) otherwise — see the list below.
///
/// This callback is the host-side unit-testing seam — it lets
/// in-process executor tests record the symbol and return without
/// arming any hardware. In a booted VM the bridge's
/// `register_watch` is **not** installed; the in-guest
/// `Op::WatchSnapshot` arm rings an SHM doorbell and the host's
/// freeze coordinator runs `arm_user_watchpoint`
/// (`src/vmm/freeze_coord/mod.rs`), which resolves the symbol via a
/// verbatim match against the vmlinux ELF symtab, allocates a
/// free user watchpoint slot (3 user slots are available; slot 0
/// is reserved for the existing `*scx_root->exit_kind` trigger),
/// and arms the hardware watchpoint via `KVM_SET_GUEST_DEBUG`.
///
/// Once armed, the capture tagged with the symbol fires on every
/// guest write without any further userspace round-trip — the
/// debug exit dispatches into the freeze coordinator directly,
/// mirroring the existing reserved-slot path the error-class
/// trigger already uses.
///
/// Returns `Err(reason)` when:
///   - The symbol does not match any vmlinux ELF symtab entry
///     (typo, symbol stripped from the build, or a non-ELF kernel
///     image).
///   - The resolved KVA is not 4-byte aligned (the 4-byte watch
///     length the framework arms requires `addr & 0x3 == 0` on
///     every supported architecture).
///   - All three available user watchpoint slots are already
///     allocated.
///   - `KVM_SET_GUEST_DEBUG` rejected the arm (host kernel
///     limitation).
pub type WatchRegisterCallback =
    Arc<dyn Fn(&str) -> std::result::Result<(), String> + Send + Sync + 'static>;
79
/// Closure type the bridge invokes for a host-side kernel-memory
/// write or read (`Op::WriteKernel{Hot,Cold}` /
/// `Op::ReadKernel{Hot,Cold}`).
///
/// The host dispatches the request (a sequence of
/// `(KernelTarget, KernelValue)` entries, plus mode/direction/tag
/// metadata in `crate::vmm::wire::KernelOpRequestPayload`) against
/// the resolved guest-memory accessor. Returns
/// `crate::vmm::wire::KernelOpReplyPayload` mirroring the request
/// id, the success/error status, and (for reads) the read-back
/// bytes per entry.
///
/// Test fixtures install a closure that records the request and
/// returns a synthetic reply without touching real guest memory
/// (the in-process bridge surface stays mockable). The in-VM
/// production path goes through the wire layer
/// (`crate::vmm::wire::MsgType::KernelOpRequest`) and the freeze
/// coordinator / hot-path worker, NOT this callback — the bridge
/// keeps it as an `Option<…>` so executor tests can install one
/// while real VM runs leave it unset.
pub type KernelOpCallback = Arc<
    dyn Fn(&crate::vmm::wire::KernelOpRequestPayload) -> crate::vmm::wire::KernelOpReplyPayload
        + Send
        + Sync
        + 'static,
>;
106
// NOTE: the paragraph below describes `SnapshotBridge`, not the
// constant that follows. It is kept as a plain comment so it no
// longer renders as the opening of `MAX_WATCH_SNAPSHOTS`'s rustdoc
// (where `Self::…` has nothing to resolve against).
//
// Shared state owning the capture closure plus the captured-report
// map.
//
// Cloneable via the wrapped `Arc`s. The host installs an instance
// in the executor's thread-local via `Self::set_thread_local`
// before `execute_steps` (`crate::scenario::ops::execute_steps`)
// runs; the executor's `Op::CaptureSnapshot` arm calls
// `Self::capture` with the op's name.

/// Maximum number of [`Op::WatchSnapshot`](crate::scenario::ops::Op::WatchSnapshot)
/// ops a single scenario may register.
///
/// This is the framework's per-scenario cap on user watchpoint slots
/// across every supported host architecture, not a count of debug
/// registers on any specific arch. One additional slot (slot 0) is
/// always reserved internally for the `*scx_root->exit_kind`
/// watchpoint that drives the error-class freeze trigger, so a host
/// must expose at least 4 hardware watchpoint slots through
/// `KVM_SET_GUEST_DEBUG` for every user `Op::WatchSnapshot` to arm.
/// Common x86_64 and aarch64 hosts meet that bar.
///
/// The actual host slot count is probed once during VM bring-up via
/// `KVM_CHECK_EXTENSION(KVM_CAP_GUEST_DEBUG_HW_WPS)` in
/// `crate::vmm::freeze_coord` (search for `Cap::DebugHwWps`); a
/// host returning `<= 0` or fewer than 4 slots logs a `tracing::warn!`
/// at coordinator setup. Per-arm failures surface as `tracing::warn!`
/// from `self_arm_watchpoint` with per-vCPU retry capping at
/// `WATCHPOINT_MAX_NON_EINTR_FAILURES`.
pub const MAX_WATCH_SNAPSHOTS: usize = 3;
135
/// Maximum number of [`FailureDumpReport`]s the bridge keeps.
///
/// Captures driven by a Loop step with a unique tag per iteration
/// would otherwise grow the storage map without bound — every report
/// renders a full BTF tree (potentially hundreds of KB), so an
/// uncapped bridge under hostile/runaway capture frequency drains
/// host memory. The bridge enforces FIFO eviction at this cap so the
/// most recent captures stay reachable; eviction logs a `tracing::warn!`
/// naming the dropped tag so the operator sees the truncation.
pub const MAX_STORED_SNAPSHOTS: usize = 64;
145
/// Maximum number of [`SnapshotBridgeEvent`] entries the bridge
/// retains between [`SnapshotBridge::drain_events`] calls.
///
/// A scenario that triggers many cap-eviction events (a Loop step
/// that captures a unique tag every 30ms for 10 minutes produces
/// ~20 000 events, each ~100 bytes) would otherwise grow the events
/// log without bound. The bridge enforces FIFO eviction at this cap —
/// when push would exceed it, the oldest event is dropped, the
/// dropped count is tracked on `SnapshotStore::events_dropped`, and
/// the next [`SnapshotBridge::drain_events`] call appends a synthetic
/// [`SnapshotBridgeEvent::EventLogTruncated`] entry at the tail so
/// the operator never silently loses events. The cap is loose enough
/// (1024 events × ~100 bytes ≈ 100 KiB) that legitimate scenarios
/// never hit it; only runaway capture frequency does.
pub const MAX_STORED_EVENTS: usize = 1024;
160
/// A structured event surfaced by the [`SnapshotBridge`] during its
/// own operation (capture, storage, drain). Promotes the previous
/// `tracing::warn!`-only diagnostic channel into an operator-
/// drainable structured row so tests can assert on bridge-side
/// conditions (eviction, missing capture, invariant violations)
/// instead of grepping stderr.
///
/// Distinct from [`crate::assert::AssertDetail`]: an `AssertDetail`
/// is a per-assertion outcome (Starved / Stuck / etc.); a
/// `SnapshotBridgeEvent` is a per-bridge meta-event about the
/// storage pipeline itself. Mixing them at the assertion level
/// would conflate "scheduler behavior failed" with "bridge dropped
/// an entry due to cap" — two orthogonal concerns. Test authors
/// who want to fail their scenario on a bridge event compose the
/// two streams themselves (drain events, convert to `AssertDetail`
/// if needed) — see [`SnapshotBridge::drain_events`].
///
/// Every bridge site that previously emitted only `tracing::warn!`
/// still emits the warn (preserved for stderr visibility) AND
/// appends the structured variant here. "Promote, don't replace."
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate
/// need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum SnapshotBridgeEvent {
    /// Capture callback returned `None` for `tag` — the corresponding
    /// `Op::CaptureSnapshot` was a no-op. Fires from
    /// [`SnapshotBridge::capture`] when the host couldn't freeze /
    /// build the report (scheduler died before the freeze, scan
    /// accessor unavailable, etc.).
    CaptureUnavailable {
        /// Tag the failed capture was attempted under.
        tag: String,
    },
    /// Storage of `tag` overwrote a prior entry. Fires from
    /// [`SnapshotBridge::store`] / [`SnapshotBridge::store_with_stats`]
    /// when `bridge.store(tag, ...)` is called with a tag that
    /// already has a stored report. FIFO order is refreshed to back,
    /// prior `(stats, elapsed_ms)` parallel slots are replaced.
    Overwrite {
        /// Tag whose prior entry was overwritten.
        tag: String,
        /// `schema` of the prior entry — included for diagnostic
        /// context (a schema bump alongside an unintended overwrite
        /// is the textbook double-tag bug).
        prior_schema: String,
    },
    /// FIFO eviction of `evicted_tag` triggered by storing
    /// `new_tag`. Fires from the cap-enforcement loop in
    /// `store_internal` when `reports.len()` exceeds
    /// [`MAX_STORED_SNAPSHOTS`] after insertion. `cap` is the limit
    /// at the time of eviction.
    Eviction {
        /// Tag that was popped from the FIFO to make room.
        evicted_tag: String,
        /// Tag whose storage triggered the cap-overflow.
        new_tag: String,
        /// Cap value at the time — folded in so the operator
        /// doesn't have to cross-reference [`MAX_STORED_SNAPSHOTS`].
        cap: usize,
    },
    /// A drain found `tag` in `reports` but missing from `order` —
    /// internal invariant violation. The report was surfaced at the
    /// tail of the drain output rather than dropped silently; this
    /// event flags the bug so test authors who care can fail their
    /// scenario.
    DrainOrderingInvariantViolation {
        /// Tag whose desynchronised entry was surfaced at the tail.
        tag: String,
        /// Which drain variant fired the warning —
        /// `"drain_ordered"` or `"drain_ordered_with_stats"`. Lets
        /// post-mortem analysis disambiguate the two code paths.
        ///
        /// NOTE(review): because this field is `&'static str`, the
        /// derived `Deserialize` impl presumably can only borrow
        /// from `'static` input — confirm that restriction is
        /// acceptable for every consumer that deserializes events.
        drain_variant: &'static str,
    },
    /// The cap-enforcement loop in `store_internal` found
    /// `reports.len() > cap` while `order` was empty — a worse
    /// invariant violation than [`Self::DrainOrderingInvariantViolation`]
    /// because the bulk-clear branch nukes ALL reports / stats /
    /// elapsed_ms to restore the invariant. Unreachable through the
    /// current public API (every insert site appends to `order`
    /// alongside `reports`), but recorded for the same future-proofing
    /// reason as the drain variant: a refactor that desynchronised
    /// the two collections must not be allowed to silently drop the
    /// entire bridge state.
    CapInvariantViolation {
        /// `reports.len()` at the moment the bulk-clear was triggered.
        /// Folded in so the operator can see how much state was
        /// nuked.
        reports_len: usize,
        /// Cap value at the time — same definition as
        /// [`Self::Eviction::cap`].
        cap: usize,
    },
    /// The events log itself hit [`MAX_STORED_EVENTS`] and dropped
    /// `dropped_count` oldest events to keep memory bounded. The
    /// bridge appends this variant at the tail of every
    /// [`SnapshotBridge::drain_events`] result whenever
    /// `events_dropped > 0` (resets to 0 after drain), so the
    /// operator never silently loses events — they see a count of
    /// how many were dropped between drains. Test authors who care
    /// about exhaustive coverage should `assert!(!matches!(events
    /// .last(), Some(SnapshotBridgeEvent::EventLogTruncated { .. })))`
    /// to fail when the bridge truncated.
    EventLogTruncated {
        /// Number of events evicted from the front of the log since
        /// the last [`SnapshotBridge::drain_events`] call. Resets to
        /// 0 after drain.
        dropped_count: u64,
    },
}
269
/// Inner storage for [`SnapshotBridge::snapshots`]. Pairs the
/// HashMap-keyed reports with a [`VecDeque`] tracking insertion
/// order so the FIFO eviction in [`SnapshotBridge::store`] can pop
/// the oldest tag in O(1) when the cap is reached. The optional
/// `stats` map carries the scheduler-stats JSON captured at the
/// same boundary as the snapshot — only periodic captures populate
/// this; on-demand and watchpoint captures leave the slot empty
/// because no stats request is issued.
///
/// All per-tag maps are keyed by the capture tag. The event log
/// (`events` / `events_dropped`) is private to this module and is
/// only appended to through `push_event`.
pub(super) struct SnapshotStore {
    /// Captured reports, keyed by tag.
    pub(super) reports: HashMap<String, FailureDumpReport>,
    /// scx_stats JSON captured at the same wall-clock as the report
    /// stored under the same tag in `reports`. Periodic captures
    /// populate this when a stats client is wired and the request
    /// succeeds; on-demand / watchpoint paths leave the entry
    /// absent. `drain_ordered_with_stats` `remove`s the per-tag slot
    /// and surfaces it as `Sample::stats`; an absent/`None` slot is
    /// the expected shape for non-periodic tags or when the scheduler
    /// stats request failed.
    ///
    /// NOTE(review): the value type is
    /// `Result<_, MissingStatsReason>`, so a failed stats request may
    /// be stored as `Err(..)` rather than left absent — confirm
    /// against the store paths.
    pub(super) stats: HashMap<String, Result<serde_json::Value, super::error::MissingStatsReason>>,
    /// Elapsed milliseconds since `run_start` at the moment the
    /// periodic capture fired. Same key set as `reports` for
    /// periodic tags; absent for non-periodic captures. Read by
    /// [`SnapshotBridge::drain_ordered_with_stats`] to populate
    /// `Sample::elapsed_ms` without recomputing.
    pub(super) elapsed_ms: HashMap<String, u64>,
    /// Workload-relative boundary OFFSET (periodic `boundary_ns -
    /// scenario_anchor_ns`, in ms) this periodic capture was
    /// SCHEDULED for — distinct from `elapsed_ms`, which is the
    /// run_start-relative FIRE time (~uniform across the deferred-fire
    /// burst, so useless for per-phase attribution). Same key set as
    /// `reports` for periodic tags; absent for non-periodic / on-demand
    /// captures. Drained into
    /// [`super::error::DrainedSnapshotEntry::boundary_offset_ms`] and
    /// consumed by `build_phase_buckets[_with_stimulus]` to attribute
    /// the capture to the guest step whose stimulus window contains
    /// this offset, and as the workload-relative bucket start/end.
    pub(super) boundary_offset_ms: HashMap<String, u64>,
    /// Per-VM scenario step index stamped on the capture by the
    /// step-aware entry points
    /// ([`SnapshotBridge::capture_with_step`] /
    /// [`SnapshotBridge::store_with_stats_and_step`]). Absent for
    /// fixture-injected captures stored via the unstamped legacy
    /// paths ([`SnapshotBridge::capture`] / [`SnapshotBridge::store`]
    /// / [`SnapshotBridge::store_with_stats`]). Encoded per the
    /// 1-indexed phase convention (`0` = BASELINE, `1..=N` = Step
    /// ordinals); drained in lock-step with `reports` / `stats` /
    /// `elapsed_ms` and surfaced as the
    /// [`super::error::DrainedSnapshotEntry::step_index`] field so
    /// the phase-aware aggregator can bucket each sample directly.
    pub(super) step_index: HashMap<String, u16>,
    /// Insertion order of currently-resident keys. An overwrite of
    /// an existing key MUST remove the prior entry from this deque
    /// before pushing the fresh occurrence so the `reports.len()`
    /// and `order.len()` invariants stay in lock-step.
    pub(super) order: VecDeque<String>,
    /// Structured bridge-side meta-events appended in insertion
    /// order. Every site that previously emitted only a
    /// `tracing::warn!` also pushes the corresponding
    /// [`SnapshotBridgeEvent`] variant here. Drained by
    /// [`SnapshotBridge::drain_events`] so test authors can assert
    /// on bridge meta-conditions (eviction, overwrite, missing
    /// capture, invariant violation) without grepping stderr.
    /// Capped at [`MAX_STORED_EVENTS`] via FIFO eviction in
    /// `push_event`; dropped count is tracked in `events_dropped`
    /// and surfaced as a synthetic
    /// [`SnapshotBridgeEvent::EventLogTruncated`] appended at the
    /// tail of the next `drain_events` result so no event loss is
    /// silent.
    events: Vec<SnapshotBridgeEvent>,
    /// Number of events evicted from the front of `events` since
    /// the last `drain_events` call. Reset to 0 on drain.
    /// Drain appends [`SnapshotBridgeEvent::EventLogTruncated`] at
    /// the tail when this is non-zero so the operator never silently
    /// loses events — they always see a marker carrying the dropped
    /// count.
    events_dropped: u64,
}
347
348impl SnapshotStore {
349    fn new() -> Self {
350        Self {
351            reports: HashMap::new(),
352            stats: HashMap::new(),
353            elapsed_ms: HashMap::new(),
354            boundary_offset_ms: HashMap::new(),
355            step_index: HashMap::new(),
356            order: VecDeque::new(),
357            events: Vec::new(),
358            events_dropped: 0,
359        }
360    }
361
362    /// Append `event` to `events`, enforcing [`MAX_STORED_EVENTS`]
363    /// via FIFO eviction. When push would exceed the cap, the
364    /// oldest entry is removed and `events_dropped` is incremented
365    /// so a subsequent [`SnapshotBridge::drain_events`] call can
366    /// surface a [`SnapshotBridgeEvent::EventLogTruncated`] marker
367    /// — the operator never silently loses events. The fast path
368    /// (cap not reached) is a single push with no extra allocation.
369    fn push_event(&mut self, event: SnapshotBridgeEvent) {
370        if self.events.len() >= MAX_STORED_EVENTS {
371            // Drop the oldest. Vec::remove(0) is O(n) but the cap
372            // is bounded and this branch only fires in pathological
373            // runaway-capture scenarios.
374            self.events.remove(0);
375            self.events_dropped = self.events_dropped.saturating_add(1);
376        }
377        self.events.push(event);
378    }
379}
380
/// Drop-based release handle for one reserved
/// [`SnapshotBridge::watch_count`] slot.
///
/// [`SnapshotBridge::register_watch`] reserves its slot (via CAS)
/// BEFORE it invokes the host's watch-register callback, so that
/// concurrent callers can never push the count above
/// [`MAX_WATCH_SNAPSHOTS`], not even transiently. A hand-written
/// `fetch_sub` rollback only covers the `Err` return: if the callback
/// panics instead, the rollback is skipped, the slot leaks forever,
/// and every later `register_watch` call hits the cap with no real
/// watchpoint armed. Holding this guard across the callback gives the
/// reservation back on every exit path (both `Err` return and
/// unwind). On success the caller commits the slot by passing the
/// guard to `mem::forget`, which suppresses the decrement.
struct WatchSlotGuard<'a> {
    /// Counter holding the reservation; decremented by one on drop.
    count: &'a std::sync::atomic::AtomicUsize,
}

impl Drop for WatchSlotGuard<'_> {
    /// Give the reserved slot back by decrementing the counter.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        // `fetch_sub` wraps on underflow, so a guard must only ever
        // be created for a slot that was actually reserved.
        let _previous = self.count.fetch_sub(1, Ordering::Relaxed);
    }
}
402
/// A single `cgroup.procs` snapshot captured by
/// [`Op::CaptureCgroupProcs`](crate::scenario::ops::Op::CaptureCgroupProcs).
///
/// Tests drain the per-bridge log of these via
/// [`SnapshotBridge::drain_cgroup_procs`] after the scenario completes,
/// then assert on the captured `pids` (membership, count, identity)
/// against the workload they spawned. Carries:
///
/// - `tag`: the snapshot key supplied in the Op (lets a scenario
///   capture pre/post snapshots of the same cgroup at different
///   points and disambiguate them on drain).
/// - `cgroup`: the cgroup name the kernel read returned PIDs for.
/// - `pids`: the thread-group leaders (PIDs / TGIDs) the kernel
///   reported in `cgroup.procs` at capture time. An empty Vec means
///   the cgroup existed but held no tasks. A read against a missing
///   cgroup directory errors at the dispatch arm and never produces
///   a snapshot — so the presence of a `CgroupProcsSnapshot` in the
///   drain log implies the read succeeded.
///
/// The struct is `#[non_exhaustive]`; code outside this crate builds
/// values through [`CgroupProcsSnapshot::new`] rather than a struct
/// literal.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CgroupProcsSnapshot {
    /// Snapshot key supplied to
    /// [`Op::CaptureCgroupProcs`](crate::scenario::ops::Op::CaptureCgroupProcs).
    /// Distinguishes multiple captures of the same cgroup.
    pub tag: String,
    /// Cgroup name the kernel read returned PIDs for.
    pub cgroup: String,
    /// Thread-group leaders in the cgroup at capture time.
    /// `libc::pid_t` is `i32` on Linux; compare against other ktstr
    /// pid surfaces (`WorkloadHandle::worker_pids()`, etc.) via the
    /// matching integer type or `as i32` cast when types differ.
    pub pids: Vec<libc::pid_t>,
}
436
437impl CgroupProcsSnapshot {
438    /// Construct a `CgroupProcsSnapshot`. The struct is
439    /// `#[non_exhaustive]` so external callers cannot use
440    /// struct-literal syntax — use this constructor instead. The
441    /// framework's [`SnapshotBridge::record_cgroup_procs`] dispatch
442    /// site routes through here so test fixtures that synthesise
443    /// snapshots for drain-consumer testing share the same
444    /// construction path.
445    pub fn new(tag: impl Into<String>, cgroup: impl Into<String>, pids: Vec<libc::pid_t>) -> Self {
446        Self {
447            tag: tag.into(),
448            cgroup: cgroup.into(),
449            pids,
450        }
451    }
452}
453
/// Host-side capture pipeline that the freeze coordinator routes
/// [`Op::CaptureSnapshot`](crate::scenario::ops::Op::CaptureSnapshot) and
/// [`Op::WatchSnapshot`](crate::scenario::ops::Op::WatchSnapshot)
/// requests through.
///
/// Construct via [`SnapshotBridge::new`] (with an explicit capture
/// callback) and optionally [`SnapshotBridge::with_watch_register`]
/// to attach watch support. Install for the current thread via
/// [`SnapshotBridge::set_thread_local`] — see [`BridgeGuard`] for
/// the RAII teardown contract.
///
/// The callbacks, stores, logs and counters are all held behind
/// `Arc`s, so a clone (`#[derive(Clone)]`) shares the same underlying
/// state as the bridge it was cloned from.
#[derive(Clone)]
#[must_use = "dropping a SnapshotBridge discards the capture pipeline"]
pub struct SnapshotBridge {
    /// Callback invoked to capture a snapshot; always present.
    capture: CaptureCallback,
    /// Optional watch-register callback; `None` until one is
    /// installed (see [`SnapshotBridge::with_watch_register`]).
    register_watch: Option<WatchRegisterCallback>,
    /// Optional kernel-op callback; see [`KernelOpCallback`] for when
    /// it is expected to be set.
    kernel_op: Option<KernelOpCallback>,
    /// Shared report store plus bridge event log, guarded by a mutex.
    pub(super) snapshots: Arc<Mutex<SnapshotStore>>,
    /// Per-tag drain log of kernel-op reply payloads. Test fixtures
    /// inspect this via [`SnapshotBridge::drain_kernel_ops`] after
    /// `execute_steps` returns to verify each request's outcome.
    kernel_ops: Arc<Mutex<Vec<(String, crate::vmm::wire::KernelOpReplyPayload)>>>,
    /// Per-tag drain log of cgroup.procs snapshots captured by
    /// [`Op::CaptureCgroupProcs`](crate::scenario::ops::Op::CaptureCgroupProcs).
    /// Stored in insertion order so a scenario capturing the same
    /// cgroup at multiple points (with distinct tags) sees both
    /// snapshots when draining. Test fixtures drain via
    /// [`SnapshotBridge::drain_cgroup_procs`] after the scenario
    /// completes; each entry carries the tag, the cgroup name read,
    /// and the PIDs the kernel reported in `cgroup.procs`.
    cgroup_procs: Arc<Mutex<Vec<CgroupProcsSnapshot>>>,
    /// Number of watch slots currently reserved, shared across
    /// clones; bounded by [`MAX_WATCH_SNAPSHOTS`].
    watch_count: Arc<std::sync::atomic::AtomicUsize>,
    /// Monotonic counter bumped by the freeze-coordinator's accessor-
    /// init worker on every successful slot publish (initial attach +
    /// each subsequent re-init triggered by scheduler swap). Paired
    /// with [`Self::accessor_worker_state`] so a scheduler-swap op can
    /// wait for the new scheduler's BPF maps to land before returning
    /// success. `None` for bridges built without an accessor (test
    /// fixtures that never trigger reinit); see
    /// [`Self::with_accessor_state`].
    accessor_publish_seqno: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Liveness sentinel for the accessor-init worker — set by the
    /// worker as it transitions through its retry / publish / exit
    /// states. Values match [`accessor_worker_state`] module constants
    /// (Trying / Succeeded / FailedPermanently). The dispatcher in
    /// `Op::ReplaceScheduler` / `Op::AttachScheduler` reads this on
    /// timeout so the surfaced error distinguishes "worker still
    /// trying — bump deadline" from "worker exited — retry will
    /// hang." `None` mirrors `accessor_publish_seqno`.
    accessor_worker_state: Option<Arc<std::sync::atomic::AtomicU8>>,
    /// Dispatcher wake EventFd. The accessor-init worker pulses
    /// this fd in lock-step with every seqno bump AND on every
    /// terminal worker_state transition (FAILED_PERMANENTLY).
    /// Wait paths in [`Self::wait_for_accessor_publish_advance`] /
    /// [`Self::wait_for_worker_state_not_trying`] `poll(2)` on the
    /// fd with the remaining deadline so the wake latency is one
    /// kernel scheduling tick instead of the 50 ms sleep tail an
    /// atomic-only loop would carry. Distinct from `accessor_ready_evt`
    /// (which the coord epoll drains) so the dispatcher and the
    /// coord don't race for the same wake count.
    ///
    /// **Single-consumer assumption.** The fd is read (drained) only
    /// from the wait paths; the bridge is installed thread-local via
    /// [`Self::set_thread_local`], and each freeze-coordinator
    /// instance constructs one bridge → one wake fd. Multiple
    /// concurrent dispatchers sharing the same fd could race for the
    /// drain edge (the atomic re-check at the top of each wait loop
    /// keeps that benign — atomic is source of truth — but a missed
    /// wake adds latency). Per-VM bridge ownership today guarantees
    /// no such sharing. `None` mirrors the other accessor-state fields.
    accessor_dispatcher_wake_evt: Option<Arc<vmm_sys_util::eventfd::EventFd>>,
}
525
/// State codes for the accessor-init worker, written to the
/// `SnapshotBridge::accessor_worker_state` atomic. Two-bit
/// encoding leaves the high bits open for future sub-states (e.g.
/// "shutting down").
///
/// The three codes are distinct `u8` values (0, 1, 2), matching the
/// `AtomicU8` they are stored in.
pub mod accessor_worker_state {
    /// Worker is in its retry loop trying to initialize the BPF
    /// accessor pair. Both initial-attach and post-swap re-init
    /// path through this state before reaching Succeeded.
    pub const TRYING: u8 = 0;
    /// Worker has published at least one accessor pair successfully.
    /// Subsequent re-inits also publish via this transition (the
    /// state stays SUCCEEDED while the seqno bumps).
    pub const SUCCEEDED: u8 = 1;
    /// Worker has exited and will not publish again — ELF parse
    /// failure, the 60 s boot-budget deadline, or some other
    /// terminal condition the worker detected. A dispatcher seeing
    /// this on a timeout knows to surface "worker exited" rather
    /// than "still trying."
    pub const FAILED_PERMANENTLY: u8 = 2;
}
546
547impl std::fmt::Debug for SnapshotBridge {
548    /// Debug print does NOT show captured reports (their full
549    /// rendering can be hundreds of KB) — only the count and the
550    /// presence of callbacks.
551    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
552        f.debug_struct("SnapshotBridge")
553            .field("snapshots", &self.len())
554            .field("watch_count", &self.watch_count())
555            .field("capture", &"<callback>")
556            .field(
557                "register_watch",
558                &if self.register_watch.is_some() {
559                    "<callback>"
560                } else {
561                    "<none>"
562                },
563            )
564            .finish()
565    }
566}
567
/// Report whether `tag` has exactly the shape the periodic dispatch
/// loop emits: the literal prefix `"periodic_"` followed by exactly
/// three ASCII digits and nothing else.
///
/// The source of truth for that shape is the coordinator's
/// `format!("periodic_{:03}", idx)` emission at
/// `src/vmm/freeze_coord/state.rs`. A plain
/// `tag.starts_with("periodic_")` check is intentionally not used: it
/// would also accept arbitrary user `Op::CaptureSnapshot` tags that
/// merely share the prefix and would pollute the
/// `periodic_real_count` floor.
///
/// NOTE(review): `{:03}` pads to a *minimum* of three digits, so an
/// index of 1000 or more would format to a four-digit suffix that
/// this predicate rejects — confirm the coordinator keeps `idx`
/// below 1000.
fn is_periodic_tag(tag: &str) -> bool {
    let Some(suffix) = tag.strip_prefix("periodic_") else {
        return false;
    };
    // Byte length is checked first: any non-ASCII character in a
    // three-byte suffix fails the digit test below.
    suffix.len() == 3 && suffix.bytes().all(|digit| digit.is_ascii_digit())
}
581
582impl SnapshotBridge {
583    /// Build a bridge from a capture callback. The callback may
584    /// freeze the VM, build the report, or return `None` when
585    /// capture is unavailable. No watch-register callback —
586    /// `Op::WatchSnapshot` returns "not supported" when the host
587    /// did not wire one. Use [`Self::with_watch_register`] to
588    /// install one.
589    pub fn new(capture: CaptureCallback) -> Self {
590        Self {
591            capture,
592            register_watch: None,
593            kernel_op: None,
594            snapshots: Arc::new(Mutex::new(SnapshotStore::new())),
595            kernel_ops: Arc::new(Mutex::new(Vec::new())),
596            cgroup_procs: Arc::new(Mutex::new(Vec::new())),
597            watch_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
598            accessor_publish_seqno: None,
599            accessor_worker_state: None,
600            accessor_dispatcher_wake_evt: None,
601        }
602    }
603
604    /// Install the accessor-init worker's publish-seqno, worker-state,
605    /// and dispatcher-wake EventFd so
606    /// [`Self::wait_for_accessor_publish_advance`] /
607    /// [`Self::wait_for_worker_state_not_trying`] can block scheduler-
608    /// swap ops until the new scheduler's BPF maps land — with kernel-
609    /// scheduling-tick wake latency rather than the 50 ms sleep tail
610    /// an atomic-only loop would carry. Called by the freeze-coordinator
611    /// when it sets up the worker (`vmm-accessor-init` thread); test
612    /// bridges that don't drive a real worker can omit this call and
613    /// the wait becomes a no-op that returns `Ok(0)` immediately.
614    pub fn with_accessor_state(
615        mut self,
616        publish_seqno: Arc<std::sync::atomic::AtomicU64>,
617        worker_state: Arc<std::sync::atomic::AtomicU8>,
618        dispatcher_wake_evt: Arc<vmm_sys_util::eventfd::EventFd>,
619    ) -> Self {
620        self.accessor_publish_seqno = Some(publish_seqno);
621        self.accessor_worker_state = Some(worker_state);
622        self.accessor_dispatcher_wake_evt = Some(dispatcher_wake_evt);
623        self
624    }
625
626    /// Snapshot of the accessor-init worker's current publish seqno.
627    /// Returns 0 for bridges built without an accessor (test fixtures
628    /// — the dispatch wait sees no advance to gate on and the op
629    /// returns immediately). Read with `Acquire` so the seqno orders
630    /// against the worker's `Release` bump.
631    pub fn accessor_publish_seqno(&self) -> u64 {
632        self.accessor_publish_seqno
633            .as_ref()
634            .map(|s| s.load(std::sync::atomic::Ordering::Acquire))
635            .unwrap_or(0)
636    }
637
638    /// Block until the accessor-init worker exits the TRYING state
639    /// (transitions to SUCCEEDED on its first publish, or to
640    /// FAILED_PERMANENTLY on a terminal worker exit). Used by
641    /// `Op::AttachScheduler` to serialize against the worker's
642    /// concurrent boot-publish: capturing the seqno baseline AFTER
643    /// the boot publish completes is the only way to ensure the
644    /// next observed seqno advance belongs to the just-attached
645    /// scheduler rather than to a co-resident boot scheduler that
646    /// finished its 60 s init in parallel. Event-driven via
647    /// `accessor_dispatcher_wake_evt` — wake latency is one
648    /// kernel scheduling tick, not a 50 ms poll. No-op (Ok(())) for
649    /// bridges without an accessor.
650    pub fn wait_for_worker_state_not_trying(
651        &self,
652        deadline: std::time::Instant,
653        op_label: &str,
654    ) -> anyhow::Result<()> {
655        let Some(state) = self.accessor_worker_state.as_ref() else {
656            return Ok(());
657        };
658        loop {
659            let cur = state.load(std::sync::atomic::Ordering::Acquire);
660            if cur != accessor_worker_state::TRYING {
661                if cur == accessor_worker_state::FAILED_PERMANENTLY {
662                    anyhow::bail!(
663                        "{op_label}: accessor-init worker exited \
664                         (FAILED_PERMANENTLY) before this op could run — \
665                         check freeze-coord 'vmm-accessor-init' logs for ELF \
666                         parse or boot-budget failures"
667                    );
668                }
669                return Ok(());
670            }
671            let now = std::time::Instant::now();
672            if now >= deadline {
673                anyhow::bail!(
674                    "{op_label}: accessor-init worker stayed in TRYING state \
675                     past the deadline — the worker's first publish has not \
676                     completed. Likely cause: kernel boot stalled before \
677                     accessor's bootstrap symbols became readable. Check \
678                     freeze-coord logs for `accessor-init:` lines"
679                );
680            }
681            let remaining = deadline.saturating_duration_since(now);
682            self.poll_dispatcher_wake(remaining);
683            // Loop back to re-check state. The atomic is the source
684            // of truth — the eventfd is just a fast wake; a spurious
685            // wake (e.g. a publish landing while we're already past
686            // TRYING) drops through to the next state check.
687        }
688    }
689
690    /// Block on the dispatcher-wake EventFd for up to `remaining`.
691    /// Caller's atomic re-check after return is the source of truth.
692    /// Panics if the with_accessor_state coupling invariant is
693    /// violated (see body's unreachable! arm).
694    ///
695    /// Uses raw libc::poll rather than nix::poll::poll because the
696    /// underlying [`vmm_sys_util::eventfd::EventFd`] doesn't impl
697    /// `AsFd`, only `AsRawFd`; the cleaner nix path would require
698    /// an unsafe `BorrowedFd::borrow_raw` wrapper, leaving the
699    /// unsafe surface identical to libc::poll. No net reduction —
700    /// keep the simpler libc call.
701    fn poll_dispatcher_wake(&self, remaining: std::time::Duration) {
702        if remaining.is_zero() {
703            return;
704        }
705        let Some(evt) = self.accessor_dispatcher_wake_evt.as_ref() else {
706            unreachable!(
707                "poll_dispatcher_wake reached without an installed wake fd — \
708                 SnapshotBridge::with_accessor_state stores accessor_worker_state \
709                 and accessor_dispatcher_wake_evt together, so any caller that \
710                 passed the worker_state gate must also have the wake fd. A None \
711                 here indicates a bridge constructor that violated the coupling \
712                 invariant"
713            );
714        };
715        let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
716        let fd = {
717            use std::os::unix::io::AsRawFd;
718            evt.as_raw_fd()
719        };
720        let mut pfd = libc::pollfd {
721            fd,
722            events: libc::POLLIN,
723            revents: 0,
724        };
725        // SAFETY: pollfd is a POD struct; libc::poll reads/writes
726        // the slice in-bounds for nfds == 1.
727        unsafe {
728            libc::poll(&mut pfd, 1, ms);
729        }
730        // Drain the counter so a subsequent poll doesn't re-fire
731        // immediately on the same edge. EFD_NONBLOCK so an empty fd
732        // returns EAGAIN — ignore the result.
733        let _ = evt.read();
734    }
735
736    /// Block until the accessor-init worker's publish seqno advances
737    /// past `seqno_before`, or until `deadline` elapses. The dispatch
738    /// for `Op::ReplaceScheduler` / `Op::AttachScheduler` calls this
739    /// after spawning the new scheduler so the op only returns success
740    /// once the new scheduler's BPF accessor pair has been re-published
741    /// and a subsequent `Snapshot::active()` reflects the swap.
742    ///
743    /// On timeout the surfaced error reads the worker-state sentinel
744    /// to distinguish "still trying" (transient — operator can bump
745    /// the deadline) from "worker exited" (terminal — retry will
746    /// hang the same way). Bridges without an accessor (test fixtures
747    /// that omit [`Self::with_accessor_state`]) succeed immediately
748    /// with `Ok(0)` so unit-test scenarios don't synthesize a worker.
749    pub fn wait_for_accessor_publish_advance(
750        &self,
751        seqno_before: u64,
752        deadline: std::time::Instant,
753        op_label: &str,
754    ) -> anyhow::Result<u64> {
755        let (seqno, state) = match (
756            self.accessor_publish_seqno.as_ref(),
757            self.accessor_worker_state.as_ref(),
758        ) {
759            (Some(s), Some(w)) => (s, w),
760            _ => return Ok(0),
761        };
762        // Tight initial check so a publish that already landed
763        // between the dispatcher's pre-spawn `accessor_publish_seqno()`
764        // load and this call returns without sleeping.
765        let cur = seqno.load(std::sync::atomic::Ordering::Acquire);
766        if cur > seqno_before {
767            return Ok(cur);
768        }
769        loop {
770            let cur_state = state.load(std::sync::atomic::Ordering::Acquire);
771            if cur_state == accessor_worker_state::FAILED_PERMANENTLY {
772                anyhow::bail!(
773                    "{op_label}: accessor-init worker exited (FAILED_PERMANENTLY) — \
774                     check freeze-coord 'vmm-accessor-init' logs for ELF parse or \
775                     boot-budget failures; retrying the op will hit the same wall"
776                );
777            }
778            let cur = seqno.load(std::sync::atomic::Ordering::Acquire);
779            if cur > seqno_before {
780                return Ok(cur);
781            }
782            let now = std::time::Instant::now();
783            if now >= deadline {
784                let remaining_state = state.load(std::sync::atomic::Ordering::Acquire);
785                anyhow::bail!(
786                    "{op_label}: accessor reinit did not advance publish seqno \
787                     from {seqno_before} within deadline (worker state = \
788                     {remaining_state}; 0=Trying / 1=Succeeded / 2=FailedPermanently). \
789                     A reinit that's stuck in Trying past the deadline indicates the \
790                     coord's scan-tick hasn't observed the rebind or the worker's \
791                     `from_elf_with_hint` retry is hitting a transient address-space \
792                     window — check freeze-coord logs for `accessor-init:` lines"
793                );
794            }
795            // Event-driven via accessor_dispatcher_wake_evt: the
796            // worker writes the fd in lock-step with each seqno
797            // bump and on FAILED_PERMANENTLY exit, so poll wakes at
798            // kernel-scheduling-tick latency. Distinct from the
799            // accessor_ready_evt the coord drains (no second-
800            // consumer race). Reaching poll_dispatcher_wake without
801            // an installed wake fd panics — the `(Some, Some)` match
802            // / `return Ok(0)` at the top of this function gates this
803            // branch on both accessor_publish_seqno and
804            // accessor_worker_state being present, and
805            // with_accessor_state stores both together.
806            let remaining = deadline.saturating_duration_since(now);
807            self.poll_dispatcher_wake(remaining);
808        }
809    }
810
811    /// Install a kernel-op callback so
812    /// `Op::WriteKernel{Hot,Cold}` / `Op::ReadKernel{Hot,Cold}` ops
813    /// can dispatch host-side guest-memory writes/reads. Without
814    /// one installed, the in-process executor returns "no
815    /// SnapshotBridge installed" and the ops fall through to the
816    /// virtio-console wire path. Test fixtures use this seam to
817    /// record requests and assert on them without touching real
818    /// guest memory.
819    pub fn with_kernel_op(mut self, callback: KernelOpCallback) -> Self {
820        self.kernel_op = Some(callback);
821        self
822    }
823
824    /// Dispatch a kernel-op request through the installed callback.
825    /// Returns `None` when no callback is installed (the executor
826    /// then falls through to the wire path); returns
827    /// `Some(KernelOpReplyPayload)` otherwise and records the
828    /// reply in the per-tag drain log.
829    pub fn dispatch_kernel_op(
830        &self,
831        request: &crate::vmm::wire::KernelOpRequestPayload,
832    ) -> Option<crate::vmm::wire::KernelOpReplyPayload> {
833        let callback = self.kernel_op.as_ref()?;
834        let reply = callback(request);
835        self.kernel_ops
836            .lock_unpoisoned()
837            .push((request.tag.clone(), reply.clone()));
838        Some(reply)
839    }
840
841    /// Drain the per-tag kernel-op reply log. Returns the accumulated
842    /// `(tag, reply)` pairs in insertion order; leaves the bridge's
843    /// own copy empty so subsequent calls see only newer entries.
844    pub fn drain_kernel_ops(&self) -> Vec<(String, crate::vmm::wire::KernelOpReplyPayload)> {
845        std::mem::take(&mut *self.kernel_ops.lock_unpoisoned())
846    }
847
848    /// Record a `cgroup.procs` snapshot captured by
849    /// [`Op::CaptureCgroupProcs`](crate::scenario::ops::Op::CaptureCgroupProcs).
850    ///
851    /// Appends to the per-bridge log in insertion order. Multiple
852    /// captures of the same `cgroup` (distinguished by `tag`)
853    /// surface as separate entries — the bridge does NOT collapse
854    /// or overwrite on duplicate cgroup names, which lets a scenario
855    /// capture pre/post snapshots of the same cgroup under different
856    /// tags. Multiple captures with the same `(tag, cgroup)` also
857    /// append rather than overwrite; tag uniqueness is a caller
858    /// convention, not a framework-enforced contract.
859    pub fn record_cgroup_procs(&self, tag: String, cgroup: String, pids: Vec<libc::pid_t>) {
860        self.cgroup_procs
861            .lock_unpoisoned()
862            .push(CgroupProcsSnapshot::new(tag, cgroup, pids));
863    }
864
865    /// Drain the per-bridge `cgroup.procs` snapshot log. Returns the
866    /// accumulated [`CgroupProcsSnapshot`]s in insertion order; leaves
867    /// the bridge's own copy empty so subsequent calls observe only
868    /// newer entries. Mirrors [`Self::drain_kernel_ops`]' contract.
869    pub fn drain_cgroup_procs(&self) -> Vec<CgroupProcsSnapshot> {
870        std::mem::take(&mut *self.cgroup_procs.lock_unpoisoned())
871    }
872
873    /// Look up the first `cgroup.procs` snapshot recorded under
874    /// `tag` without consuming the drain log. Returns `None` when no
875    /// snapshot was recorded under that tag (the typical case is a
876    /// typo, a missing capture op, or capture-before-drain ordering
877    /// mistake). For the common single-capture-per-tag pattern this
878    /// collapses the `drain_cgroup_procs().iter().find(|s| s.tag ==
879    /// tag).expect("missing")` 4-combinator dance into a one-liner.
880    ///
881    /// **Clone cost.** Each `CgroupProcsSnapshot` carries `tag`,
882    /// `cgroup`, and `pids: Vec<libc::pid_t>` — the pids vec is
883    /// bounded by the number of tasks in the cgroup at capture
884    /// time (kilobytes at most for realistic test scenarios).
885    /// Mirrors [`Self::kernel_op_value`]'s non-draining-lookup
886    /// shape for sibling-API consistency.
887    ///
888    /// **Duplicate tag.** Returns the FIRST snapshot recorded under
889    /// `tag`. Multiple captures with the same `(tag, cgroup)` are
890    /// stored separately (see [`Self::record_cgroup_procs`]); callers
891    /// who care about the multiplicity should use
892    /// [`Self::drain_cgroup_procs`] and filter the Vec manually.
893    pub fn cgroup_procs_by_tag(&self, tag: &str) -> Option<CgroupProcsSnapshot> {
894        self.cgroup_procs
895            .lock_unpoisoned()
896            .iter()
897            .find(|s| s.tag == tag)
898            .cloned()
899    }
900
901    /// Record a kernel-op reply produced by the host-side wire-path
902    /// dispatcher into the same per-tag drain log that
903    /// [`Self::dispatch_kernel_op`] populates.
904    ///
905    /// The in-process bridge path stores its replies inside
906    /// `dispatch_kernel_op` (which both invokes the callback AND
907    /// pushes the reply into the log). The wire-path used by ops
908    /// running inside a guest VM produces its reply on the host
909    /// freeze-coordinator and frames it back over virtio-console
910    /// port 1 — there is no callback to drive the push, so the
911    /// host coordinator calls this record-only method directly
912    /// after framing each reply. Without this hook, `post_vm`
913    /// callbacks observing [`crate::vmm::VmResult::snapshot_bridge`]
914    /// would see an empty drain log for any guest-side
915    /// `Op::WriteKernel*` / `Op::ReadKernel*` invocation, defeating
916    /// the asserts-from-replies pattern the gated cold-path e2e
917    /// scaffolding pins.
918    pub fn record_kernel_op_reply(
919        &self,
920        tag: String,
921        reply: crate::vmm::wire::KernelOpReplyPayload,
922    ) {
923        self.kernel_ops.lock_unpoisoned().push((tag, reply));
924    }
925
926    /// Look up the first kernel-op reply value recorded under `tag`
927    /// in the kernel-op drain log without consuming the log.
928    ///
929    /// The bulk shape returned by [`Self::drain_kernel_ops`] is
930    /// `Vec<(tag, reply)>` with each reply carrying a
931    /// `Vec<crate::vmm::wire::KernelOpValue>`. For the common
932    /// single-tag single-value read-back lookup, this helper
933    /// collapses the 4-layer unwrap (find by tag → check success →
934    /// index into read_values → match the variant) into a single
935    /// call. Returns `None` when no reply was recorded under `tag`,
936    /// when the reply reported `success = false`, or when the
937    /// reply's `read_values` is empty (e.g. a write-op reply under
938    /// the same tag). Otherwise returns `Some(value)` with the
939    /// first `KernelOpValue` of the first matching reply.
940    ///
941    /// The log is NOT drained — the caller can still inspect via
942    /// [`Self::drain_kernel_ops`] to observe the full per-tag
943    /// history.
944    ///
945    /// **Clone cost.** For `U32` / `U64` the clone is 4 / 8 bytes.
946    /// For `crate::vmm::wire::KernelOpValue::Bytes` the clone
947    /// can be up to `crate::vmm::wire::KERNEL_OP_REPLY_MAX`
948    /// (1 MiB). Hot paths that repeatedly inspect the same tag
949    /// should prefer [`Self::drain_kernel_ops`] + index into the
950    /// returned Vec to avoid the per-call clone.
951    pub fn kernel_op_value(&self, tag: &str) -> Option<crate::vmm::wire::KernelOpValue> {
952        self.kernel_ops
953            .lock_unpoisoned()
954            .iter()
955            .find(|(t, reply)| t == tag && reply.success && !reply.read_values.is_empty())
956            .map(|(_, reply)| reply.read_values[0].clone())
957    }
958
959    /// Install a watch-register callback so [`Op::WatchSnapshot`](crate::scenario::ops::Op::WatchSnapshot)
960    /// ops can attach hardware-watchpoint snapshots. The callback
961    /// is responsible for symbol resolution, watchpoint slot allocation, and
962    /// `KVM_SET_GUEST_DEBUG` arming.
963    pub fn with_watch_register(mut self, register: WatchRegisterCallback) -> Self {
964        self.register_watch = Some(register);
965        self
966    }
967
968    /// Register a hardware-watchpoint snapshot for `symbol`.
969    ///
970    /// Enforces the per-scenario [`MAX_WATCH_SNAPSHOTS`] cap before
971    /// invoking the host's watch-register callback. Returns
972    /// `Err(reason)` when:
973    /// - The cap has been reached (slot 0 reserved + 3 user slots
974    ///   allocated).
975    /// - No watch-register callback was installed via
976    ///   [`Self::with_watch_register`].
977    /// - The host's callback rejected the request (symbol unresolved,
978    ///   alignment violation, ioctl failure).
979    pub fn register_watch(&self, symbol: &str) -> std::result::Result<(), String> {
980        // Reserve a slot via compare_exchange so concurrent callers
981        // can never push the count past MAX_WATCH_SNAPSHOTS even
982        // transiently. The previous fetch_add+rollback path let two
983        // concurrent threads observe `prev < MAX` and increment past
984        // the cap before either rolled back, briefly violating the
985        // invariant `watch_count <= MAX_WATCH_SNAPSHOTS`.
986        loop {
987            let prev = self.watch_count.load(std::sync::atomic::Ordering::Relaxed);
988            if prev >= MAX_WATCH_SNAPSHOTS {
989                return Err(format!(
990                    "Op::WatchSnapshot cap exceeded: scenario already registered \
991                     {MAX_WATCH_SNAPSHOTS} watchpoints ({MAX_WATCH_SNAPSHOTS} user \
992                     watchpoint slots occupied; slot 0 reserved for the error-class \
993                     exit_kind trigger). Drop a watch or use Op::CaptureSnapshot for a \
994                     time-driven capture instead."
995                ));
996            }
997            if self
998                .watch_count
999                .compare_exchange_weak(
1000                    prev,
1001                    prev + 1,
1002                    std::sync::atomic::Ordering::Relaxed,
1003                    std::sync::atomic::Ordering::Relaxed,
1004                )
1005                .is_ok()
1006            {
1007                break;
1008            }
1009            // Lost the CAS to a concurrent register/unregister; reload
1010            // and retry. spurious failures are also retried — that is
1011            // why this uses the _weak variant inside a loop.
1012        }
1013        // Slot reserved. Wrap it in a Drop guard so a panic inside
1014        // `register(symbol)` releases the reservation on unwind — the
1015        // previous manual-fetch_sub rollback only ran on the explicit
1016        // Err(reason) arm, leaking the slot permanently if the
1017        // callback panicked. The success path commits the slot with
1018        // mem::forget after register returns Ok.
1019        let guard = WatchSlotGuard {
1020            count: &self.watch_count,
1021        };
1022        let Some(register) = self.register_watch.as_ref() else {
1023            drop(guard);
1024            return Err(format!(
1025                "Op::WatchSnapshot('{symbol}'): no watch-register callback installed \
1026                 on this SnapshotBridge — the host wires one via \
1027                 SnapshotBridge::with_watch_register before execute_steps; \
1028                 in-guest / no-VM scenarios cannot register hardware watchpoints"
1029            ));
1030        };
1031        register(symbol)?;
1032        std::mem::forget(guard);
1033        Ok(())
1034    }
1035
1036    /// Number of watchpoint snapshots currently registered.
1037    pub fn watch_count(&self) -> usize {
1038        self.watch_count.load(std::sync::atomic::Ordering::Relaxed)
1039    }
1040
1041    /// Drive the capture closure and store the result under `name`.
1042    /// Returns `true` when a report was captured and stored;
1043    /// `false` when the closure returned `None`.
1044    pub fn capture(&self, name: &str) -> bool {
1045        let Some(report) = (self.capture)(name) else {
1046            tracing::warn!(
1047                name,
1048                "SnapshotBridge::capture: capture callback returned None — snapshot unavailable"
1049            );
1050            self.snapshots
1051                .lock_unpoisoned()
1052                .push_event(SnapshotBridgeEvent::CaptureUnavailable {
1053                    tag: name.to_string(),
1054                });
1055            return false;
1056        };
1057        self.store(name, report);
1058        true
1059    }
1060
1061    /// Step-aware variant of [`Self::capture`]: drives the capture
1062    /// closure and stores the result under `name`, stamping it with
1063    /// `step_index` so the drained entry's
1064    /// [`super::error::DrainedSnapshotEntry::step_index`] surfaces
1065    /// the scenario phase the capture belongs to.
1066    ///
1067    /// `step_index` is encoded per the 1-indexed phase convention:
1068    /// `0` is the BASELINE settle window, `1..=N` align with
1069    /// scenario Step ordinals. The host-side on-demand-capture
1070    /// dispatch reads
1071    /// [`crate::scenario::Ctx::current_step`] just before the
1072    /// freeze rendezvous and passes the loaded value through this
1073    /// entry point so the downstream phase aggregator can bucket
1074    /// the sample directly.
1075    ///
1076    /// Returns `true` when a report was captured and stored;
1077    /// `false` when the closure returned `None`.
1078    pub fn capture_with_step(&self, name: &str, step_index: u16) -> bool {
1079        let Some(report) = (self.capture)(name) else {
1080            tracing::warn!(
1081                name,
1082                step_index,
1083                "SnapshotBridge::capture_with_step: capture callback returned None — snapshot unavailable"
1084            );
1085            self.snapshots
1086                .lock_unpoisoned()
1087                .push_event(SnapshotBridgeEvent::CaptureUnavailable {
1088                    tag: name.to_string(),
1089                });
1090            return false;
1091        };
1092        self.store_internal(name, report, None, None, None, Some(step_index));
1093        true
1094    }
1095
1096    /// Store a pre-built [`FailureDumpReport`] under `name`,
1097    /// bypassing the capture callback. Used by the host-side freeze
1098    /// coordinator after it runs `freeze_and_dispatch(FreezeMode::Capture { gate_on_exit_kind: false })` and
1099    /// wants to publish the resulting report on the bridge for the
1100    /// test author to drain post-VM-exit.
1101    ///
1102    /// Storage is capped at [`MAX_STORED_SNAPSHOTS`] entries to bound
1103    /// host memory under runaway capture cadence (e.g. a Loop step
1104    /// firing `Op::CaptureSnapshot` with a unique tag every iteration).
1105    /// When the cap is reached, the oldest stored entry is evicted
1106    /// with a `tracing::warn!` naming the dropped tag. An overwrite
1107    /// of an existing tag also warns and replaces the prior report
1108    /// in place without disturbing FIFO ordering of other entries.
1109    pub fn store(&self, name: &str, report: FailureDumpReport) {
1110        self.store_internal(name, report, None, None, None, None);
1111    }
1112
1113    /// Bundle a [`FailureDumpReport`] with the scx_stats JSON and
1114    /// elapsed-millisecond timestamp captured at the same periodic
1115    /// boundary. Used by the freeze coordinator's periodic-fire path
1116    /// so [`Sample`](crate::scenario::sample::Sample) can pair the
1117    /// frozen BPF state with the running-scheduler stats observed
1118    /// just before the freeze rendezvous.
1119    ///
1120    /// Stats / elapsed are stored in parallel HashMaps keyed by the
1121    /// same tag as the report. FIFO eviction sweeps all three in
1122    /// lock-step; an overwrite refreshes order and replaces every
1123    /// parallel value (or clears it when the new write passes
1124    /// `None`) so a stale stats / elapsed entry can never accompany
1125    /// a freshly stored report.
1126    pub fn store_with_stats(
1127        &self,
1128        name: &str,
1129        report: FailureDumpReport,
1130        stats: Option<Result<serde_json::Value, super::error::MissingStatsReason>>,
1131        elapsed_ms: Option<u64>,
1132    ) {
1133        self.store_internal(name, report, stats, elapsed_ms, None, None);
1134    }
1135
1136    /// Step-aware variant of [`Self::store_with_stats`]: bundles the
1137    /// scenario phase index alongside the report / stats / elapsed
1138    /// tuple so the drained entry's
1139    /// [`super::error::DrainedSnapshotEntry::step_index`] carries
1140    /// the phase the capture belongs to. The freeze coordinator's
1141    /// periodic-fire path reads
1142    /// [`crate::scenario::Ctx::current_step`] just before the
1143    /// rendezvous and routes the value through this method so each
1144    /// periodic sample is bucketable per phase without a second
1145    /// lookup.
1146    ///
1147    /// `step_index` is encoded per the 1-indexed phase convention
1148    /// — `0` is the BASELINE settle window, `1..=N` align with
1149    /// scenario Step ordinals. All other arguments match
1150    /// [`Self::store_with_stats`] verbatim.
1151    pub fn store_with_stats_and_step(
1152        &self,
1153        name: &str,
1154        report: FailureDumpReport,
1155        stats: Option<Result<serde_json::Value, super::error::MissingStatsReason>>,
1156        elapsed_ms: Option<u64>,
1157        boundary_offset_ms: Option<u64>,
1158        step_index: u16,
1159    ) {
1160        self.store_internal(
1161            name,
1162            report,
1163            stats,
1164            elapsed_ms,
1165            boundary_offset_ms,
1166            Some(step_index),
1167        );
1168    }
1169
1170    fn store_internal(
1171        &self,
1172        name: &str,
1173        report: FailureDumpReport,
1174        stats: Option<Result<serde_json::Value, super::error::MissingStatsReason>>,
1175        elapsed_ms: Option<u64>,
1176        boundary_offset_ms: Option<u64>,
1177        step_index: Option<u16>,
1178    ) {
1179        let mut store = self.snapshots.lock_unpoisoned();
1180        if let Some(existing) = store.reports.insert(name.to_string(), report) {
1181            tracing::warn!(
1182                name,
1183                schema = %existing.schema,
1184                "SnapshotBridge::store: name already had a stored report; overwriting prior capture"
1185            );
1186            store.push_event(SnapshotBridgeEvent::Overwrite {
1187                tag: name.to_string(),
1188                prior_schema: existing.schema.clone(),
1189            });
1190            // Move this tag to the back of the FIFO order so the
1191            // overwrite refreshes its position (newest insertion =
1192            // farthest from eviction). Without this, a hot-rewritten
1193            // tag would still be the oldest and risk eviction even
1194            // when actively updated.
1195            if let Some(pos) = store.order.iter().position(|k| k == name) {
1196                store.order.remove(pos);
1197            }
1198            store.order.push_back(name.to_string());
1199            // Refresh / clear parallel stats and elapsed entries so
1200            // the post-overwrite `(report, stats, elapsed)` tuple is
1201            // self-consistent — a None overwrite must clear the prior
1202            // value rather than carrying forward a stale match from
1203            // an earlier capture.
1204            match stats {
1205                Some(v) => {
1206                    store.stats.insert(name.to_string(), v);
1207                }
1208                None => {
1209                    store.stats.remove(name);
1210                }
1211            }
1212            match elapsed_ms {
1213                Some(v) => {
1214                    store.elapsed_ms.insert(name.to_string(), v);
1215                }
1216                None => {
1217                    store.elapsed_ms.remove(name);
1218                }
1219            }
1220            match boundary_offset_ms {
1221                Some(v) => {
1222                    store.boundary_offset_ms.insert(name.to_string(), v);
1223                }
1224                None => {
1225                    store.boundary_offset_ms.remove(name);
1226                }
1227            }
1228            match step_index {
1229                Some(v) => {
1230                    store.step_index.insert(name.to_string(), v);
1231                }
1232                None => {
1233                    store.step_index.remove(name);
1234                }
1235            }
1236            return;
1237        }
1238        store.order.push_back(name.to_string());
1239        if let Some(v) = stats {
1240            store.stats.insert(name.to_string(), v);
1241        }
1242        if let Some(v) = elapsed_ms {
1243            store.elapsed_ms.insert(name.to_string(), v);
1244        }
1245        if let Some(v) = boundary_offset_ms {
1246            store.boundary_offset_ms.insert(name.to_string(), v);
1247        }
1248        if let Some(v) = step_index {
1249            store.step_index.insert(name.to_string(), v);
1250        }
1251        while store.reports.len() > MAX_STORED_SNAPSHOTS {
1252            let Some(evicted) = store.order.pop_front() else {
1253                // Defensive: if order is empty while reports is over
1254                // cap something is desynchronised — clear reports to
1255                // restore the invariant rather than loop forever.
1256                let nuked = store.reports.len();
1257                tracing::warn!(
1258                    reports_len = nuked,
1259                    cap = MAX_STORED_SNAPSHOTS,
1260                    "SnapshotBridge::store: order empty while reports over cap — bulk-clearing to restore invariant"
1261                );
1262                store.push_event(SnapshotBridgeEvent::CapInvariantViolation {
1263                    reports_len: nuked,
1264                    cap: MAX_STORED_SNAPSHOTS,
1265                });
1266                store.reports.clear();
1267                store.stats.clear();
1268                store.elapsed_ms.clear();
1269                store.boundary_offset_ms.clear();
1270                store.step_index.clear();
1271                break;
1272            };
1273            if store.reports.remove(&evicted).is_some() {
1274                tracing::warn!(
1275                    evicted = %evicted,
1276                    cap = MAX_STORED_SNAPSHOTS,
1277                    "SnapshotBridge::store: cap reached, evicting oldest captured snapshot"
1278                );
1279                store.push_event(SnapshotBridgeEvent::Eviction {
1280                    evicted_tag: evicted.clone(),
1281                    new_tag: name.to_string(),
1282                    cap: MAX_STORED_SNAPSHOTS,
1283                });
1284            }
1285            // Sweep the parallel maps in lock-step so a stranded
1286            // stats / elapsed / boundary_offset / step_index entry
1287            // cannot outlive its report.
1288            store.stats.remove(&evicted);
1289            store.elapsed_ms.remove(&evicted);
1290            store.boundary_offset_ms.remove(&evicted);
1291            store.step_index.remove(&evicted);
1292        }
1293    }
1294
1295    /// Snapshot count for diagnostic logging.
1296    pub fn len(&self) -> usize {
1297        self.snapshots.lock_unpoisoned().reports.len()
1298    }
1299
1300    /// True when no snapshots have been captured.
1301    pub fn is_empty(&self) -> bool {
1302        self.snapshots.lock_unpoisoned().reports.is_empty()
1303    }
1304
1305    /// Count of stored periodic-tagged reports that carry REAL BPF
1306    /// state (not placeholders synthesized by rendezvous timeouts /
1307    /// gate suppression). Distinct from
1308    /// [`crate::vmm::VmResult::periodic_fired`], which counts every
1309    /// periodic boundary the freeze coordinator attempted —
1310    /// including the ones that landed only a placeholder when the
1311    /// vCPU rendezvous timed out.
1312    ///
1313    /// This is the "useful data produced" floor: a scheduler that
1314    /// attached but produced nothing but placeholders surfaces as
1315    /// `periodic_real_count() == 0` here even though
1316    /// `periodic_fired` may be `target`. Tests that want a
1317    /// stricter smoke-floor than `periodic_fired >= 1` (which
1318    /// passes on placeholder-only fills) read this query.
1319    ///
1320    /// **Tag-format pin.** The coordinator emits periodic tags via
1321    /// `format!("periodic_{:03}", idx)` at
1322    /// `src/vmm/freeze_coord/state.rs` — always
1323    /// `"periodic_"` + exactly 3 ASCII digits. The match here
1324    /// enforces that exact shape (NOT a loose prefix) so a user
1325    /// `Op::CaptureSnapshot { name: "periodic_kaslr" }` cannot
1326    /// collide with the periodic dispatch namespace and pollute
1327    /// the count.
1328    ///
1329    /// **What "real" measures.** A placeholder report may still
1330    /// carry `vcpu_regs` from a degraded capture (see
1331    /// `src/vmm/freeze_coord/mod.rs` periodic-degraded path —
1332    /// `degraded.vcpu_regs` is preserved into the stored
1333    /// placeholder). The floor here treats those as "not real"
1334    /// because the contract is "the test produced BPF-state data"
1335    /// — vcpu_regs alone don't satisfy that. Tests that want the
1336    /// looser "any capture-attempt landed" floor read
1337    /// [`crate::vmm::VmResult::periodic_fired`] instead.
1338    pub fn periodic_real_count(&self) -> u32 {
1339        let store = self.snapshots.lock_unpoisoned();
1340        let mut n: u32 = 0;
1341        for (tag, report) in &store.reports {
1342            if is_periodic_tag(tag) && !report.is_placeholder {
1343                n = n.saturating_add(1);
1344            }
1345        }
1346        n
1347    }
1348
1349    /// True when a stored report already exists for `name`. Lets the
1350    /// freeze coordinator's final-drain placeholder path skip storing
1351    /// a degraded "coord exited before capture" report on top of a
1352    /// real capture that the in-loop dispatch landed earlier — without
1353    /// this gate, a vCPU thread that re-armed `hit=true` after the
1354    /// in-loop service successfully published the report would have
1355    /// its tag's stored capture overwritten by the placeholder at
1356    /// teardown, presenting tests with a hollow snapshot in place of
1357    /// the real one.
1358    pub fn has(&self, name: &str) -> bool {
1359        self.snapshots.lock_unpoisoned().reports.contains_key(name)
1360    }
1361
1362    /// Take ownership of the captured snapshots, leaving the bridge
1363    /// empty. Drops any periodic-capture stats / elapsed / boundary-
1364    /// offset metadata stored alongside reports — callers that need
1365    /// the stats JSON or per-sample timestamp must use
1366    /// [`Self::drain_ordered_with_stats`] instead.
1367    pub fn drain(&self) -> HashMap<String, FailureDumpReport> {
1368        let mut store = self.snapshots.lock_unpoisoned();
1369        store.order.clear();
1370        store.stats.clear();
1371        store.elapsed_ms.clear();
1372        store.boundary_offset_ms.clear();
1373        store.step_index.clear();
1374        std::mem::take(&mut store.reports)
1375    }
1376
1377    /// Take ownership of the captured snapshots in insertion order,
1378    /// leaving the bridge empty. The returned `Vec` walks
1379    /// `SnapshotStore::order` (the FIFO key list maintained by
1380    /// [`Self::store`]) so periodic captures — whose ordering IS the
1381    /// signal — are returned `periodic_000` first, `periodic_NNN`
1382    /// last. [`Self::drain`] returns a `HashMap` and loses ordering;
1383    /// use this method when ordering matters.
1384    ///
1385    /// An overwrite of an existing tag (the `if let Some(existing) =
1386    /// store.reports.insert(...)` branch in [`Self::store`]) moves
1387    /// the tag to the back of the FIFO — `drain_ordered` therefore
1388    /// returns the LATEST capture under each tag exactly once, in
1389    /// the order of its most-recent insertion.
1390    ///
1391    /// FIFO eviction at [`MAX_STORED_SNAPSHOTS`] drops the oldest
1392    /// tags from `order` AND `reports` together, so a hot run that
1393    /// fired more than the cap returns the most recent
1394    /// [`MAX_STORED_SNAPSHOTS`] captures in insertion order; older
1395    /// captures are gone and [`Self::store`] already logged the
1396    /// eviction.
1397    pub fn drain_ordered(&self) -> Vec<(String, FailureDumpReport)> {
1398        let mut store = self.snapshots.lock_unpoisoned();
1399        let order = std::mem::take(&mut store.order);
1400        let mut reports = std::mem::take(&mut store.reports);
1401        // Stats / elapsed / boundary_offset / step_index are dropped
1402        // with the bridge — callers that need the parallel data must
1403        // use `drain_ordered_with_stats` instead.
1404        store.stats.clear();
1405        store.elapsed_ms.clear();
1406        store.boundary_offset_ms.clear();
1407        store.step_index.clear();
1408        let mut out: Vec<(String, FailureDumpReport)> = Vec::with_capacity(order.len());
1409        for tag in order {
1410            if let Some(report) = reports.remove(&tag) {
1411                out.push((tag, report));
1412            }
1413        }
1414        // Defensive: if any reports remained outside the order Vec
1415        // (an invariant violation that would only fire if a future
1416        // refactor of `store()` desynchronised the two), surface
1417        // them at the tail rather than dropping silently. Their
1418        // relative order is HashMap-iteration-arbitrary but at
1419        // least nothing is lost.
1420        for (tag, report) in reports {
1421            tracing::warn!(
1422                tag,
1423                "SnapshotBridge::drain_ordered: report present in `reports` \
1424                 but missing from `order` — surfacing at tail (FIFO \
1425                 invariant violation; please file)"
1426            );
1427            store.push_event(SnapshotBridgeEvent::DrainOrderingInvariantViolation {
1428                tag: tag.clone(),
1429                drain_variant: "drain_ordered",
1430            });
1431            out.push((tag, report));
1432        }
1433        out
1434    }
1435
1436    /// Take ownership of the captured snapshots in insertion order
1437    /// along with the parallel scx_stats JSON and per-sample
1438    /// elapsed-ms timestamps (`None` per slot when the tag was
1439    /// captured outside the periodic-capture path or when the stats
1440    /// request failed). Empties the bridge — every parallel map is
1441    /// drained in lock-step so a follow-up call returns an empty
1442    /// vec.
1443    ///
1444    /// The returned tuple shape `(tag, report, stats, elapsed_ms)`
1445    /// is the input to
1446    /// [`SampleSeries::from_drained`](crate::scenario::sample::SampleSeries::from_drained):
1447    /// the bridge owns the raw drainable shape, the higher-level
1448    /// `SampleSeries` view consumes it. Insertion order is the
1449    /// signal — periodic captures land
1450    /// `periodic_000`/`periodic_001`/… in monotonic wall-clock
1451    /// order, and the temporal-assertion patterns walk the vec
1452    /// expecting that ordering.
1453    pub fn drain_ordered_with_stats(&self) -> Vec<super::error::DrainedSnapshotEntry> {
1454        let mut store = self.snapshots.lock_unpoisoned();
1455        let order = std::mem::take(&mut store.order);
1456        let mut reports = std::mem::take(&mut store.reports);
1457        let mut stats = std::mem::take(&mut store.stats);
1458        let mut elapsed = std::mem::take(&mut store.elapsed_ms);
1459        let mut boundary_offset = std::mem::take(&mut store.boundary_offset_ms);
1460        let mut step_index = std::mem::take(&mut store.step_index);
1461        let mut out: Vec<super::error::DrainedSnapshotEntry> = Vec::with_capacity(order.len());
1462        // Bridge-absent stats slot collapses to the typed
1463        // `NoSchedulerBinary` reason: the capture path that produced
1464        // this tag never bundled a stats Result (non-periodic Op
1465        // capture, or periodic without a stats client wired). The
1466        // periodic path always bundles a Some(Result), so a `None`
1467        // here is always the "no scheduler binary" case.
1468        let stats_fallback = || Err(super::error::MissingStatsReason::NoSchedulerBinary);
1469        for tag in order {
1470            if let Some(report) = reports.remove(&tag) {
1471                let s = stats.remove(&tag).unwrap_or_else(stats_fallback);
1472                let e = elapsed.remove(&tag);
1473                let bo = boundary_offset.remove(&tag);
1474                let phase = step_index.remove(&tag);
1475                out.push(super::error::DrainedSnapshotEntry {
1476                    tag,
1477                    report,
1478                    stats: s,
1479                    elapsed_ms: e,
1480                    boundary_offset_ms: bo,
1481                    step_index: phase,
1482                });
1483            }
1484        }
1485        // Defensive tail for desynchronised maps (matches
1486        // `drain_ordered`'s tail behaviour). Any stats / elapsed /
1487        // step_index entries that were not paired with a tag in
1488        // `order` are dropped because they have no anchoring report
1489        // — surfacing them as orphaned tuples would invent a structure
1490        // no consumer expects.
1491        for (tag, report) in reports {
1492            tracing::warn!(
1493                tag,
1494                "SnapshotBridge::drain_ordered_with_stats: report present in `reports` \
1495                 but missing from `order` — surfacing at tail (FIFO \
1496                 invariant violation; please file)"
1497            );
1498            store.push_event(SnapshotBridgeEvent::DrainOrderingInvariantViolation {
1499                tag: tag.clone(),
1500                drain_variant: "drain_ordered_with_stats",
1501            });
1502            let s = stats.remove(&tag).unwrap_or_else(stats_fallback);
1503            let e = elapsed.remove(&tag);
1504            let bo = boundary_offset.remove(&tag);
1505            let phase = step_index.remove(&tag);
1506            out.push(super::error::DrainedSnapshotEntry {
1507                tag,
1508                report,
1509                stats: s,
1510                elapsed_ms: e,
1511                boundary_offset_ms: bo,
1512                step_index: phase,
1513            });
1514        }
1515        out
1516    }
1517
1518    /// Take ownership of all queued [`SnapshotBridgeEvent`]s in
1519    /// insertion order. Empties the internal log; a follow-up call
1520    /// returns an empty vec. Test authors call this after
1521    /// [`Self::drain_ordered`] / [`Self::drain_ordered_with_stats`]
1522    /// to inspect bridge-side conditions that fired during the
1523    /// scenario (eviction, overwrite, missing capture, invariant
1524    /// violation).
1525    ///
1526    /// Independent of the report drain — events accumulate even on
1527    /// scenarios that never call `drain_ordered*`, and reports
1528    /// remain reachable even on scenarios that never call
1529    /// `drain_events`. Tests that want to fail on a bridge event
1530    /// compose the streams: drain events, inspect, fail with
1531    /// `AssertResult::fail(AssertDetail::new(Other, ...))` if any
1532    /// variant is unexpected.
1533    ///
1534    /// When the events log hit [`MAX_STORED_EVENTS`] and FIFO-evicted
1535    /// older entries since the previous drain, a synthetic
1536    /// [`SnapshotBridgeEvent::EventLogTruncated`] is appended at the
1537    /// tail of the returned vec carrying the dropped count — the
1538    /// operator never silently loses events. The internal dropped
1539    /// counter resets to 0 after every drain.
1540    pub fn drain_events(&self) -> Vec<SnapshotBridgeEvent> {
1541        let mut store = self.snapshots.lock_unpoisoned();
1542        let mut events = std::mem::take(&mut store.events);
1543        if store.events_dropped > 0 {
1544            events.push(SnapshotBridgeEvent::EventLogTruncated {
1545                dropped_count: store.events_dropped,
1546            });
1547            store.events_dropped = 0;
1548        }
1549        events
1550    }
1551
1552    /// Non-draining count of queued [`SnapshotBridgeEvent`]s. Useful
1553    /// for "no bridge events fired" assertions without consuming
1554    /// the log — `assert_eq!(bridge.event_count(), 0)`. Does NOT
1555    /// include the synthetic
1556    /// [`SnapshotBridgeEvent::EventLogTruncated`] marker that
1557    /// [`Self::drain_events`] would append; that marker is
1558    /// drain-time-only and `events_dropped > 0` is observable via
1559    /// the next drain rather than via this counter.
1560    pub fn event_count(&self) -> usize {
1561        self.snapshots.lock_unpoisoned().events.len()
1562    }
1563
1564    /// Install this bridge as the active bridge for the calling
1565    /// thread. The bridge stays installed for the lifetime of the
1566    /// returned [`BridgeGuard`]; on drop the prior bridge (or
1567    /// `None`) is restored.
1568    ///
1569    /// Thread-local because [`execute_steps`](crate::scenario::ops::execute_steps)
1570    /// runs on the calling thread and `Op::CaptureSnapshot` only makes
1571    /// sense in that exact thread's call stack — installing a
1572    /// bridge process-wide would race against parallel test
1573    /// threads.
1574    ///
1575    /// # Cloning for drain
1576    ///
1577    /// `set_thread_local` consumes `self`. The bridge is `Clone`
1578    /// (internal state lives behind `Arc<Mutex<_>>`), so callers
1579    /// that need to inspect / drain the bridge after the scenario
1580    /// runs MUST clone before installing:
1581    ///
1582    /// ```ignore
1583    /// let bridge = SnapshotBridge::new(capture_cb);
1584    /// let drain_handle = bridge.clone();      // share the Arc
1585    /// let _guard = bridge.set_thread_local(); // consume the original
1586    /// // ... execute_scenario runs and records on the thread-local ...
1587    /// let snaps = drain_handle.drain_cgroup_procs();
1588    /// ```
1589    ///
1590    /// Both handles share the same underlying snapshot store
1591    /// (reports, kernel_ops, cgroup_procs); ops recorded against
1592    /// the thread-local are observable via the cloned handle.
1593    /// Forgetting to clone is the single most common bridge-flow
1594    /// footgun for tests that consume capture results.
1595    pub fn set_thread_local(self) -> BridgeGuard {
1596        let prev = ACTIVE_BRIDGE.with(|c| c.borrow_mut().replace(self));
1597        BridgeGuard { prev }
1598    }
1599}
1600
thread_local! {
    /// Per-thread slot holding the currently installed bridge, if
    /// any. Starts out as `None`; written by
    /// `SnapshotBridge::set_thread_local` and by `BridgeGuard`'s
    /// `Drop`, read by `with_active_bridge`.
    static ACTIVE_BRIDGE: std::cell::RefCell<Option<SnapshotBridge>> =
        const { std::cell::RefCell::new(None) };
}
1605
/// RAII guard returned by [`SnapshotBridge::set_thread_local`].
/// Restores the prior thread-local bridge on drop so a nested
/// scenario inside an outer one cannot leak its bridge into the
/// outer scope.
#[must_use = "BridgeGuard restores the prior bridge on drop; bind it"]
pub struct BridgeGuard {
    /// Bridge that occupied the thread-local slot before this guard's
    /// bridge was installed (`None` when the slot was empty). Written
    /// back into the slot when the guard is dropped.
    prev: Option<SnapshotBridge>,
}
1614
1615impl Drop for BridgeGuard {
1616    fn drop(&mut self) {
1617        let prev = self.prev.take();
1618        ACTIVE_BRIDGE.with(|c| {
1619            *c.borrow_mut() = prev;
1620        });
1621    }
1622}
1623
1624/// Run `f` with the active bridge if one is installed. When no
1625/// bridge is installed, returns `None` without invoking `f` — the
1626/// caller's responsibility to fall through to its own no-bridge
1627/// path.
1628pub fn with_active_bridge<R>(f: impl FnOnce(&SnapshotBridge) -> R) -> Option<R> {
1629    ACTIVE_BRIDGE.with(|c| c.borrow().as_ref().map(f))
1630}
1631
/// Tests for the accessor-publish wait path:
/// `wait_for_accessor_publish_advance` and `accessor_publish_seqno`,
/// with and without accessor state wired into the bridge.
#[cfg(test)]
mod accessor_wait_tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU8, AtomicU64};
    use std::time::{Duration, Instant};

    /// Builds a bridge wired with fresh accessor state and returns the
    /// bridge together with the shared seqno, worker-state and wake
    /// eventfd handles so a test can drive them directly. The worker
    /// state starts at `TRYING` and the capture callback always
    /// returns `None`.
    fn bridge_with_accessor_state() -> (
        SnapshotBridge,
        Arc<AtomicU64>,
        Arc<AtomicU8>,
        Arc<vmm_sys_util::eventfd::EventFd>,
    ) {
        let seqno = Arc::new(AtomicU64::new(0));
        let worker_state = Arc::new(AtomicU8::new(accessor_worker_state::TRYING));
        let wake_evt = Arc::new(
            vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK)
                .expect("eventfd for accessor_dispatcher_wake test fixture"),
        );
        let cb: CaptureCallback = Arc::new(|_| None);
        let bridge = SnapshotBridge::new(cb).with_accessor_state(
            seqno.clone(),
            worker_state.clone(),
            wake_evt.clone(),
        );
        (bridge, seqno, worker_state, wake_evt)
    }

    #[test]
    fn wait_no_accessor_state_returns_ok_zero_immediately() {
        // Bridge built without with_accessor_state — wait degenerates
        // to Ok(0) so unit-test scenarios that don't spawn a real
        // worker don't synthesize one.
        let cb: CaptureCallback = Arc::new(|_| None);
        let bridge = SnapshotBridge::new(cb);
        let deadline = Instant::now() + Duration::from_secs(60);
        assert_eq!(
            bridge
                .wait_for_accessor_publish_advance(0, deadline, "Op::Test")
                .unwrap(),
            0
        );
    }

    #[test]
    fn wait_seqno_already_advanced_returns_immediately() {
        // Pre-advance the seqno so the tight initial check returns
        // without polling. Validates the "publish landed between
        // dispatch's load and wait call" race window is handled.
        let (bridge, seqno, _, _) = bridge_with_accessor_state();
        seqno.store(5, std::sync::atomic::Ordering::Release);
        let deadline = Instant::now() + Duration::from_secs(60);
        assert_eq!(
            bridge
                .wait_for_accessor_publish_advance(3, deadline, "Op::Test")
                .unwrap(),
            5
        );
    }

    #[test]
    fn wait_observes_worker_publish() {
        // Pin: worker bumps seqno + pulses dispatcher wake fd on a
        // side thread; dispatch-style wait observes the advance
        // within the deadline. Event-driven path — the wake fd
        // unblocks poll within a kernel-scheduling tick, so the
        // test should complete well under 500 ms even though the
        // deadline is 2 s.
        let (bridge, seqno, _, wake_evt) = bridge_with_accessor_state();
        let bridge_for_thread = bridge.clone();
        let seqno_for_thread = seqno.clone();
        let wake_for_thread = wake_evt.clone();
        let publisher = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(100));
            seqno_for_thread.fetch_add(1, std::sync::atomic::Ordering::Release);
            // Pulse the wake fd in lock-step with the seqno bump,
            // matching the worker's contract at the freeze_coord
            // publish site.
            let _ = wake_for_thread.write(1);
            // Keep a bridge handle alive for the thread duration.
            // `drop` moves the clone into this closure and releases
            // it here; a bare `let _ = bridge_for_thread;` is a no-op
            // that does not move the value (and, under edition-2021
            // closure capture rules, does not even capture it).
            drop(bridge_for_thread);
        });
        let deadline = Instant::now() + Duration::from_secs(2);
        let t0 = Instant::now();
        let observed = bridge
            .wait_for_accessor_publish_advance(0, deadline, "Op::Test")
            .expect("publish observed within deadline");
        let elapsed = t0.elapsed();
        publisher.join().unwrap();
        assert_eq!(observed, 1);
        // Event-driven wake — the wait should return shortly after
        // the 100ms publisher sleep, NOT at the 2s deadline. A 500ms
        // ceiling leaves plenty of slack for slow CI hosts while
        // still catching a regression where wait falls back to a
        // pure-sleep loop.
        assert!(
            elapsed < Duration::from_millis(500),
            "wait did not wake event-driven; took {elapsed:?} \
             (expected close to 100ms)"
        );
    }

    #[test]
    fn wait_bails_on_worker_failed_permanently() {
        // Pin: when worker_state flips to FAILED_PERMANENTLY the
        // wait surfaces the terminal-worker diagnostic instead of
        // blocking the full deadline.
        let (bridge, _, worker_state, _) = bridge_with_accessor_state();
        worker_state.store(
            accessor_worker_state::FAILED_PERMANENTLY,
            std::sync::atomic::Ordering::Release,
        );
        let deadline = Instant::now() + Duration::from_secs(60);
        let t0 = Instant::now();
        let err = bridge
            .wait_for_accessor_publish_advance(0, deadline, "Op::Test")
            .expect_err("expected terminal-worker bail");
        let elapsed = t0.elapsed();
        assert!(
            elapsed < Duration::from_millis(100),
            "wait did not surface terminal state quickly; took {elapsed:?}"
        );
        let msg = err.to_string();
        assert!(
            msg.contains("FAILED_PERMANENTLY"),
            "bail message missing terminal sentinel: {msg}"
        );
        assert!(msg.contains("Op::Test"), "bail missing op label: {msg}");
    }

    #[test]
    fn wait_bails_on_deadline_with_worker_state_in_diagnostic() {
        // Pin: deadline-exceeded surfaces with the worker state
        // attached so the diagnostic distinguishes stuck-in-Trying
        // (transient) from FAILED_PERMANENTLY (terminal).
        let (bridge, _, _, _) = bridge_with_accessor_state();
        let deadline = Instant::now() + Duration::from_millis(120);
        let err = bridge
            .wait_for_accessor_publish_advance(0, deadline, "Op::Test")
            .expect_err("expected deadline bail");
        let msg = err.to_string();
        assert!(
            msg.contains("worker state = 0"),
            "deadline diagnostic missing worker_state: {msg}"
        );
        assert!(
            msg.contains("Trying"),
            "deadline diagnostic missing state name table: {msg}"
        );
    }

    #[test]
    fn accessor_publish_seqno_returns_zero_without_accessor_state() {
        // No accessor state wired: the query reports 0.
        let cb: CaptureCallback = Arc::new(|_| None);
        let bridge = SnapshotBridge::new(cb);
        assert_eq!(bridge.accessor_publish_seqno(), 0);
    }

    #[test]
    fn accessor_publish_seqno_reads_atomic() {
        // The query reflects stores made through the shared seqno.
        let (bridge, seqno, _, _) = bridge_with_accessor_state();
        assert_eq!(bridge.accessor_publish_seqno(), 0);
        seqno.store(42, std::sync::atomic::Ordering::Release);
        assert_eq!(bridge.accessor_publish_seqno(), 42);
    }
}
1797
#[cfg(test)]
mod periodic_tag_tests {
    use super::*;

    /// A non-placeholder report in the single-report schema.
    fn real_report() -> crate::monitor::dump::FailureDumpReport {
        crate::monitor::dump::FailureDumpReport {
            schema: crate::monitor::dump::SCHEMA_SINGLE.to_string(),
            is_placeholder: false,
            ..Default::default()
        }
    }

    /// The matcher MUST accept exactly what the coordinator emits:
    /// `"periodic_"` followed by 3 ASCII digits. Together with the
    /// rejected-tag cases below, this pins the canonical shape so a
    /// regression that loosened the matcher (e.g., back to
    /// `starts_with("periodic_")`) surfaces immediately.
    #[test]
    fn is_periodic_tag_accepts_canonical_three_digit_index() {
        let canonical = [
            "periodic_000",
            "periodic_007",
            "periodic_123",
            "periodic_999",
        ];
        for tag in canonical {
            assert!(is_periodic_tag(tag), "canonical tag rejected: {tag:?}");
        }
    }

    /// User-supplied `Op::CaptureSnapshot` names that begin with
    /// `"periodic_"` but are NOT the strict `periodic_NNN` shape MUST
    /// be rejected. This is the load-bearing defense against tag
    /// collisions polluting `periodic_real_count`.
    #[test]
    fn is_periodic_tag_rejects_user_tag_collisions() {
        let non_canonical = [
            "periodic_kaslr",
            "periodic_user_baseline",
            "periodic_",
            "periodic_1",
            "periodic_12",
            "periodic_1234",
            "periodic_00a",
            "periodic_007 ",
            "PERIODIC_000",
            "capture_my_thing",
            "",
            "periodic",
        ];
        for tag in non_canonical {
            assert!(!is_periodic_tag(tag), "non-canonical tag accepted: {tag:?}");
        }
    }

    /// `periodic_real_count` MUST only count canonical `periodic_NNN`
    /// tags. With a real `periodic_000` stored next to a real user
    /// capture named `periodic_kaslr`, the count MUST be 1 — the user
    /// tag does NOT inflate the floor.
    #[test]
    fn periodic_real_count_ignores_user_tag_with_periodic_prefix() {
        let cb: CaptureCallback = Arc::new(|_| None);
        let bridge = SnapshotBridge::new(cb);
        // Real periodic capture, as the coordinator would store it.
        bridge.store("periodic_000", real_report());
        // User-supplied CaptureSnapshot tag that merely starts with
        // "periodic_" — MUST NOT count.
        bridge.store("periodic_kaslr", real_report());
        // Placeholder under a canonical tag — counted as fired but
        // NOT as real.
        let placeholder =
            crate::monitor::dump::FailureDumpReport::placeholder("rendezvous timed out");
        bridge.store("periodic_001", placeholder);
        assert_eq!(
            bridge.periodic_real_count(),
            1,
            "only the canonical periodic_000 real capture counts; \
             user tag periodic_kaslr (even though real) must be \
             excluded by the strict matcher",
        );
    }
}
1875
#[cfg(test)]
mod cgroup_procs_bridge_tests {
    use super::*;

    /// Bridge whose capture callback never yields a report; these
    /// tests only exercise the cgroup-procs log.
    fn inert_bridge() -> SnapshotBridge {
        SnapshotBridge::new(Arc::new(|_| None))
    }

    /// Pins the Arc-shared-state invariant of `SnapshotBridge.clone()`:
    /// a clone taken BEFORE `set_thread_local` consumes the original
    /// observes what the original wrote via `record_cgroup_procs`.
    /// This is the user-facing pattern in
    /// tests/cgroup_capture_procs_e2e.rs and in the doc example on
    /// `Op::CaptureCgroupProcs`. A refactor that accidentally turned
    /// `cgroup_procs: Arc<Mutex<_>>` into a non-Arc field would
    /// silently break the e2e drain pattern; this test catches it.
    #[test]
    fn snapshot_bridge_record_cgroup_procs_visible_via_arc_clone() {
        let writer = inert_bridge();
        let reader = writer.clone();
        writer.record_cgroup_procs("tag".into(), "cg".into(), vec![1, 2]);
        let seen = reader.drain_cgroup_procs();
        assert_eq!(seen.len(), 1);
        let only = &seen[0];
        assert_eq!(only.tag, "tag");
        assert_eq!(only.cgroup, "cg");
        assert_eq!(only.pids, vec![1, 2]);
        // The drain went THROUGH the clone — the writer must see an
        // empty log afterwards too (one shared Vec under the
        // Arc<Mutex>; draining via either handle empties both).
        let leftover = writer.drain_cgroup_procs();
        assert!(
            leftover.is_empty(),
            "Arc-shared field — B's drain empties A too",
        );
    }

    /// Pins the consume-on-drain semantic of `drain_cgroup_procs`. The
    /// impl uses `std::mem::take`, which leaves the source empty once
    /// the call returns. A refactor swapping `mem::take` for `.clone()`
    /// would silently duplicate snapshots across drains; this test
    /// catches it.
    #[test]
    fn snapshot_bridge_drain_cgroup_procs_consumes_via_mem_take() {
        let bridge = inert_bridge();
        for (tag, pid) in [("t1", 1), ("t2", 2)] {
            bridge.record_cgroup_procs(tag.into(), "cg".into(), vec![pid]);
        }
        let first = bridge.drain_cgroup_procs();
        assert_eq!(first.len(), 2);
        // A second drain MUST come back empty — mem::take emptied the
        // source during the first one.
        let second = bridge.drain_cgroup_procs();
        assert!(second.is_empty(), "drain must consume; got: {second:?}");
        // Recording after a drain: only the new entry shows up.
        bridge.record_cgroup_procs("t3".into(), "cg".into(), vec![3]);
        let third = bridge.drain_cgroup_procs();
        assert_eq!(third.len(), 1);
        assert_eq!(third[0].tag, "t3");
    }

    /// `cgroup_procs_by_tag` returns the FIRST matching snapshot and
    /// leaves the drain log untouched, mirroring `kernel_op_value(tag)`
    /// semantics. Because the log persists past the lookup, a later
    /// `drain_cgroup_procs` call still observes the same entries.
    #[test]
    fn snapshot_bridge_cgroup_procs_by_tag_is_non_draining() {
        let bridge = inert_bridge();
        bridge.record_cgroup_procs("tag_a".into(), "cg_x".into(), vec![10, 20]);
        bridge.record_cgroup_procs("tag_b".into(), "cg_y".into(), vec![30]);
        // A known tag resolves to the snapshot recorded under it.
        let found = bridge.cgroup_procs_by_tag("tag_a").expect("tag_a present");
        assert_eq!(found.pids, vec![10, 20]);
        assert_eq!(found.cgroup, "cg_x");
        // Unknown tag → None.
        assert!(bridge.cgroup_procs_by_tag("tag_unknown").is_none());
        // The lookups consumed nothing: drain still yields both.
        let everything = bridge.drain_cgroup_procs();
        assert_eq!(everything.len(), 2);
        // Once drained, the lookup comes up empty.
        assert!(bridge.cgroup_procs_by_tag("tag_a").is_none());
    }
}