ktstr/scenario/snapshot/bridge.rs
//! [`SnapshotBridge`] is the request/reply channel between the
//! scenario executor and the host capture pipeline. This module
//! defines the callback types the bridge invokes
//! ([`CaptureCallback`], [`WatchRegisterCallback`],
//! [`KernelOpCallback`]), the per-thread bridge installation guard
//! ([`BridgeGuard`]), the diagnostic event log
//! ([`SnapshotBridgeEvent`]), the `cgroup.procs` capture record
//! ([`CgroupProcsSnapshot`]), and the storage caps
//! ([`MAX_STORED_SNAPSHOTS`], [`MAX_STORED_EVENTS`],
//! [`MAX_WATCH_SNAPSHOTS`]).
8
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, Mutex};
11
12use crate::monitor::dump::FailureDumpReport;
13use crate::sync::MutexExt;
14
15// ---------------------------------------------------------------------------
16// Bridge: request/reply channel between executor and host capture
17// ---------------------------------------------------------------------------
18
19/// Closure type the bridge invokes to capture a snapshot.
20///
21/// Returns `None` when the capture pipeline could not produce a
22/// report (rendezvous timed out, capture prerequisites missing, no
23/// host-side wiring).
24///
25/// **Wire shape (locked: ioeventfd doorbell).** The production
26/// implementation writes the tag into a small per-call slot inside
27/// the SHM region, performs an `mmap`'d `u32` write to the
28/// doorbell GPA inside the MMIO gap (KVM dispatches via
29/// `KVM_IOEVENTFD` without a userspace exit), then blocks on a
30/// per-request reply completion (an eventfd / mpsc receiver paired
31/// with the doorbell registration). The freeze coordinator's
32/// epoll loop wakes on the doorbell eventfd, reads the tag, runs
33/// `freeze_and_dispatch`, and signals the reply completion with
34/// the resulting `Option<FailureDumpReport>`.
35///
36/// On-demand captures are orthogonal to the error-trigger
37/// `freeze_state` machine — the request handler in the coordinator
38/// must not transition `freeze_state` from Idle, and must service
39/// requests even when `freeze_state == Done`. The
40/// rendezvous-serialisation invariant is the only constraint: each
41/// request waits for `all parked == false` from the previous
42/// capture before issuing.
43pub type CaptureCallback = Arc<dyn Fn(&str) -> Option<FailureDumpReport> + Send + Sync + 'static>;
44
/// Callback the bridge calls to register a hardware-watchpoint
/// snapshot for a kernel symbol.
///
/// This type exists as the host-side unit-testing seam. In-process
/// executor tests install a closure that just records the symbol and
/// returns, without arming any hardware.
///
/// A booted VM never installs the bridge's `register_watch`. Instead:
///
/// - The in-guest `Op::WatchSnapshot` arm rings an SHM doorbell.
/// - The host freeze coordinator runs `arm_user_watchpoint`
///   (`src/vmm/freeze_coord/mod.rs`).
/// - That function resolves the symbol by verbatim lookup in the
///   vmlinux ELF symtab.
/// - It then claims a free user watchpoint slot. Three user slots
///   exist; slot 0 stays reserved for the built-in
///   `*scx_root->exit_kind` trigger.
/// - Finally it arms the watchpoint through `KVM_SET_GUEST_DEBUG`.
///
/// After arming, every guest write to the symbol fires the capture
/// tagged with that symbol with no further userspace round-trip. The
/// debug exit dispatches straight into the freeze coordinator, the
/// same way the reserved-slot error-class trigger already works.
///
/// The arming path reports `Err(reason)` when:
/// - no vmlinux ELF symtab entry matches the symbol (a typo, a symbol
///   stripped from the build, or a non-ELF kernel image);
/// - the resolved KVA is not 4-byte aligned (the framework arms a
///   4-byte watch length, which needs `addr & 0x3 == 0` on every
///   supported architecture);
/// - all three user watchpoint slots are already taken;
/// - `KVM_SET_GUEST_DEBUG` refused the arm (a host kernel limitation).
pub type WatchRegisterCallback =
    Arc<dyn Fn(&str) -> std::result::Result<(), String> + Send + Sync + 'static>;
79
80/// Closure type the bridge invokes for a host-side kernel-memory
81/// write or read (`Op::WriteKernel{Hot,Cold}` /
82/// `Op::ReadKernel{Hot,Cold}`).
83///
84/// The host dispatches the request (a sequence of
85/// `(KernelTarget, KernelValue)` entries, plus mode/direction/tag
86/// metadata in `crate::vmm::wire::KernelOpRequestPayload`) against
87/// the resolved guest-memory accessor. Returns
88/// `crate::vmm::wire::KernelOpReplyPayload` mirroring the request
89/// id, the success/error status, and (for reads) the read-back
90/// bytes per entry.
91///
92/// Test fixtures install a closure that records the request and
93/// returns a synthetic reply without touching real guest memory
94/// (the in-process bridge surface stays mockable). The in-VM
95/// production path goes through the wire layer
96/// (`crate::vmm::wire::MsgType::KernelOpRequest`) and the freeze
97/// coordinator / hot-path worker, NOT this callback — the bridge
98/// keeps it Option<…> so executor tests can install one while real
99/// VM runs leave it unset.
100pub type KernelOpCallback = Arc<
101 dyn Fn(&crate::vmm::wire::KernelOpRequestPayload) -> crate::vmm::wire::KernelOpReplyPayload
102 + Send
103 + Sync
104 + 'static,
105>;
106
/// Maximum number of [`Op::WatchSnapshot`](crate::scenario::ops::Op::WatchSnapshot)
/// ops a single scenario may register.
///
/// This is the framework's per-scenario cap on user watchpoint slots
/// across every supported host architecture, not a count of debug
/// registers on any specific arch. One additional slot (slot 0) is
/// always reserved internally for the `*scx_root->exit_kind`
/// watchpoint that drives the error-class freeze trigger. A host
/// therefore needs at least `MAX_WATCH_SNAPSHOTS + 1` (= 4) hardware
/// watchpoint slots through `KVM_SET_GUEST_DEBUG` for every user
/// `Op::WatchSnapshot` to arm. Common x86_64 and aarch64 hosts meet
/// that bar.
///
/// The actual host slot count is probed once during VM bring-up via
/// `KVM_CHECK_EXTENSION(KVM_CAP_GUEST_DEBUG_HW_WPS)` in
/// `crate::vmm::freeze_coord` (search for `Cap::DebugHwWps`). A host
/// reporting fewer than 4 slots — including a non-positive result —
/// logs a `tracing::warn!` at coordinator setup.
///
/// Per-arm failures surface as `tracing::warn!` from
/// `self_arm_watchpoint`. Per-vCPU retries are capped at
/// `WATCHPOINT_MAX_NON_EINTR_FAILURES`.
pub const MAX_WATCH_SNAPSHOTS: usize = 3;
135
/// Upper bound on how many [`FailureDumpReport`]s a single bridge
/// retains at once.
///
/// Without this bound, a Loop step that captures under a fresh tag on
/// every iteration would grow the storage map indefinitely. Each report
/// carries a fully rendered BTF tree (potentially hundreds of KB), so
/// runaway or hostile capture frequency could exhaust host memory.
///
/// The bridge evicts in FIFO order at this cap, keeping the newest
/// captures reachable. Every eviction logs a `tracing::warn!` naming
/// the dropped tag so the operator can see the truncation.
pub const MAX_STORED_SNAPSHOTS: usize = 64;
145
/// Upper bound on how many [`SnapshotBridgeEvent`] entries the bridge
/// keeps between two [`SnapshotBridge::drain_events`] calls.
///
/// A scenario that keeps triggering cap evictions could otherwise grow
/// the event log without limit. For example, a Loop step capturing a
/// unique tag every 30ms for 10 minutes yields ~20 000 events of ~100
/// bytes each.
///
/// The log is therefore FIFO-bounded: when a push would exceed the
/// cap, the oldest event is discarded and counted in
/// `SnapshotStore::events_dropped`. The next
/// [`SnapshotBridge::drain_events`] call then appends a synthetic
/// [`SnapshotBridgeEvent::EventLogTruncated`] at the tail, so event
/// loss is never silent.
///
/// At 1024 events × ~100 bytes ≈ 100 KiB the cap is generous enough
/// that well-behaved scenarios never reach it; only runaway capture
/// frequency does.
pub const MAX_STORED_EVENTS: usize = 1024;
160
161/// A structured event surfaced by the [`SnapshotBridge`] during its
162/// own operation (capture, storage, drain). Promotes the previous
163/// `tracing::warn!`-only diagnostic channel into an operator-
164/// drainable structured row so tests can assert on bridge-side
165/// conditions (eviction, missing capture, invariant violations)
166/// instead of grepping stderr.
167///
168/// Distinct from [`crate::assert::AssertDetail`]: an `AssertDetail`
169/// is a per-assertion outcome (Starved / Stuck / etc.); a
170/// `SnapshotBridgeEvent` is a per-bridge meta-event about the
171/// storage pipeline itself. Mixing them at the assertion level
172/// would conflate "scheduler behavior failed" with "bridge dropped
173/// an entry due to cap" — two orthogonal concerns. Test authors
174/// who want to fail their scenario on a bridge event compose the
175/// two streams themselves (drain events, convert to `AssertDetail`
176/// if needed) — see [`SnapshotBridge::drain_events`].
177///
178/// Every bridge site that previously emitted only `tracing::warn!`
179/// still emits the warn (preserved for stderr visibility) AND
180/// appends the structured variant here. "Promote, don't replace."
181#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
182#[non_exhaustive]
183pub enum SnapshotBridgeEvent {
184 /// Capture callback returned `None` for `tag` — the corresponding
185 /// `Op::CaptureSnapshot` was a no-op. Fires from
186 /// [`SnapshotBridge::capture`] when the host couldn't freeze /
187 /// build the report (scheduler died before the freeze, scan
188 /// accessor unavailable, etc.).
189 CaptureUnavailable {
190 /// Tag the failed capture was attempted under.
191 tag: String,
192 },
193 /// Storage of `tag` overwrote a prior entry. Fires from
194 /// [`SnapshotBridge::store`] / [`SnapshotBridge::store_with_stats`]
195 /// when `bridge.store(tag, ...)` is called with a tag that
196 /// already has a stored report. FIFO order is refreshed to back,
197 /// prior `(stats, elapsed_ms)` parallel slots are replaced.
198 Overwrite {
199 /// Tag whose prior entry was overwritten.
200 tag: String,
201 /// `schema` of the prior entry — included for diagnostic
202 /// context (a schema bump alongside an unintended overwrite
203 /// is the textbook double-tag bug).
204 prior_schema: String,
205 },
206 /// FIFO eviction of `evicted_tag` triggered by storing
207 /// `new_tag`. Fires from the cap-enforcement loop in
208 /// `store_internal` when `reports.len()` exceeds
209 /// [`MAX_STORED_SNAPSHOTS`] after insertion. `cap` is the limit
210 /// at the time of eviction.
211 Eviction {
212 /// Tag that was popped from the FIFO to make room.
213 evicted_tag: String,
214 /// Tag whose storage triggered the cap-overflow.
215 new_tag: String,
216 /// Cap value at the time — folded in so the operator
217 /// doesn't have to cross-reference [`MAX_STORED_SNAPSHOTS`].
218 cap: usize,
219 },
220 /// A drain found `tag` in `reports` but missing from `order` —
221 /// internal invariant violation. The report was surfaced at the
222 /// tail of the drain output rather than dropped silently; this
223 /// event flags the bug so test authors who care can fail their
224 /// scenario.
225 DrainOrderingInvariantViolation {
226 /// Tag whose desynchronised entry was surfaced at the tail.
227 tag: String,
228 /// Which drain variant fired the warning —
229 /// `"drain_ordered"` or `"drain_ordered_with_stats"`. Lets
230 /// post-mortem analysis disambiguate the two code paths.
231 drain_variant: &'static str,
232 },
233 /// The cap-enforcement loop in `store_internal` found
234 /// `reports.len() > cap` while `order` was empty — a worse
235 /// invariant violation than [`Self::DrainOrderingInvariantViolation`]
236 /// because the bulk-clear branch nukes ALL reports / stats /
237 /// elapsed_ms to restore the invariant. Unreachable through the
238 /// current public API (every insert site appends to `order`
239 /// alongside `reports`), but recorded for the same future-proofing
240 /// reason as the drain variant: a refactor that desynchronised
241 /// the two collections must not be allowed to silently drop the
242 /// entire bridge state.
243 CapInvariantViolation {
244 /// `reports.len()` at the moment the bulk-clear was triggered.
245 /// Folded in so the operator can see how much state was
246 /// nuked.
247 reports_len: usize,
248 /// Cap value at the time — same definition as
249 /// [`Self::Eviction::cap`].
250 cap: usize,
251 },
252 /// The events log itself hit [`MAX_STORED_EVENTS`] and dropped
253 /// `dropped_count` oldest events to keep memory bounded. The
254 /// bridge appends this variant at the tail of every
255 /// [`SnapshotBridge::drain_events`] result whenever
256 /// `events_dropped > 0` (resets to 0 after drain), so the
257 /// operator never silently loses events — they see a count of
258 /// how many were dropped between drains. Test authors who care
259 /// about exhaustive coverage should `assert!(!matches!(events
260 /// .last(), Some(SnapshotBridgeEvent::EventLogTruncated { .. })))`
261 /// to fail when the bridge truncated.
262 EventLogTruncated {
263 /// Number of events evicted from the front of the log since
264 /// the last [`SnapshotBridge::drain_events`] call. Resets to
265 /// 0 after drain.
266 dropped_count: u64,
267 },
268}
269
270/// Inner storage for [`SnapshotBridge::snapshots`]. Pairs the
271/// HashMap-keyed reports with a [`VecDeque`] tracking insertion
272/// order so the FIFO eviction in [`SnapshotBridge::store`] can pop
273/// the oldest tag in O(1) when the cap is reached. The optional
274/// `stats` map carries the scheduler-stats JSON captured at the
275/// same boundary as the snapshot — only periodic captures populate
276/// this; on-demand and watchpoint captures leave the slot empty
277/// because no stats request is issued.
278pub(super) struct SnapshotStore {
279 pub(super) reports: HashMap<String, FailureDumpReport>,
280 /// scx_stats JSON captured at the same wall-clock as the report
281 /// stored under the same tag in `reports`. Periodic captures
282 /// populate this when a stats client is wired and the request
283 /// succeeds; on-demand / watchpoint paths leave the entry
284 /// absent. `drain_ordered_with_stats` `remove`s the per-tag slot
285 /// and surfaces it as `Sample::stats`; an absent/`None` slot is
286 /// the expected shape for non-periodic tags or when the scheduler
287 /// stats request failed.
288 pub(super) stats: HashMap<String, Result<serde_json::Value, super::error::MissingStatsReason>>,
289 /// Elapsed milliseconds since `run_start` at the moment the
290 /// periodic capture fired. Same key set as `reports` for
291 /// periodic tags; absent for non-periodic captures. Read by
292 /// [`SnapshotBridge::drain_ordered_with_stats`] to populate
293 /// `Sample::elapsed_ms` without recomputing.
294 pub(super) elapsed_ms: HashMap<String, u64>,
295 /// Workload-relative boundary OFFSET (periodic `boundary_ns -
296 /// scenario_anchor_ns`, in ms) this periodic capture was
297 /// SCHEDULED for — distinct from `elapsed_ms`, which is the
298 /// run_start-relative FIRE time (~uniform across the deferred-fire
299 /// burst, so useless for per-phase attribution). Same key set as
300 /// `reports` for periodic tags; absent for non-periodic / on-demand
301 /// captures. Drained into
302 /// [`super::error::DrainedSnapshotEntry::boundary_offset_ms`] and
303 /// consumed by `build_phase_buckets[_with_stimulus]` to attribute
304 /// the capture to the guest step whose stimulus window contains
305 /// this offset, and as the workload-relative bucket start/end.
306 pub(super) boundary_offset_ms: HashMap<String, u64>,
307 /// Per-VM scenario step index stamped on the capture by the
308 /// step-aware entry points
309 /// ([`SnapshotBridge::capture_with_step`] /
310 /// [`SnapshotBridge::store_with_stats_and_step`]). Absent for
311 /// fixture-injected captures stored via the unstamped legacy
312 /// paths ([`SnapshotBridge::capture`] / [`SnapshotBridge::store`]
313 /// / [`SnapshotBridge::store_with_stats`]). Encoded per the
314 /// 1-indexed phase convention (`0` = BASELINE, `1..=N` = Step
315 /// ordinals); drained in lock-step with `reports` / `stats` /
316 /// `elapsed_ms` and surfaced as the
317 /// [`super::error::DrainedSnapshotEntry::step_index`] field so
318 /// the phase-aware aggregator can bucket each sample directly.
319 pub(super) step_index: HashMap<String, u16>,
320 /// Insertion order of currently-resident keys. An overwrite of
321 /// an existing key MUST remove the prior entry from this deque
322 /// before pushing the fresh occurrence so the `reports.len()`
323 /// and `order.len()` invariants stay in lock-step.
324 pub(super) order: VecDeque<String>,
325 /// Structured bridge-side meta-events appended in insertion
326 /// order. Every site that previously emitted only a
327 /// `tracing::warn!` also pushes the corresponding
328 /// [`SnapshotBridgeEvent`] variant here. Drained by
329 /// [`SnapshotBridge::drain_events`] so test authors can assert
330 /// on bridge meta-conditions (eviction, overwrite, missing
331 /// capture, invariant violation) without grepping stderr.
332 /// Capped at [`MAX_STORED_EVENTS`] via FIFO eviction in
333 /// `push_event`; dropped count is tracked in `events_dropped`
334 /// and surfaced as a synthetic
335 /// [`SnapshotBridgeEvent::EventLogTruncated`] appended at the
336 /// tail of the next `drain_events` result so no event loss is
337 /// silent.
338 events: Vec<SnapshotBridgeEvent>,
339 /// Number of events evicted from the front of `events` since
340 /// the last `drain_events` call. Reset to 0 on drain.
341 /// Drain appends [`SnapshotBridgeEvent::EventLogTruncated`] at
342 /// the tail when this is non-zero so the operator never silently
343 /// loses events — they always see a marker carrying the dropped
344 /// count.
345 events_dropped: u64,
346}
347
348impl SnapshotStore {
349 fn new() -> Self {
350 Self {
351 reports: HashMap::new(),
352 stats: HashMap::new(),
353 elapsed_ms: HashMap::new(),
354 boundary_offset_ms: HashMap::new(),
355 step_index: HashMap::new(),
356 order: VecDeque::new(),
357 events: Vec::new(),
358 events_dropped: 0,
359 }
360 }
361
362 /// Append `event` to `events`, enforcing [`MAX_STORED_EVENTS`]
363 /// via FIFO eviction. When push would exceed the cap, the
364 /// oldest entry is removed and `events_dropped` is incremented
365 /// so a subsequent [`SnapshotBridge::drain_events`] call can
366 /// surface a [`SnapshotBridgeEvent::EventLogTruncated`] marker
367 /// — the operator never silently loses events. The fast path
368 /// (cap not reached) is a single push with no extra allocation.
369 fn push_event(&mut self, event: SnapshotBridgeEvent) {
370 if self.events.len() >= MAX_STORED_EVENTS {
371 // Drop the oldest. Vec::remove(0) is O(n) but the cap
372 // is bounded and this branch only fires in pathological
373 // runaway-capture scenarios.
374 self.events.remove(0);
375 self.events_dropped = self.events_dropped.saturating_add(1);
376 }
377 self.events.push(event);
378 }
379}
380
/// Scope guard owning one reserved [`SnapshotBridge::watch_count`] slot;
/// dropping it hands the slot back.
///
/// [`SnapshotBridge::register_watch`] claims a slot with a CAS *before*
/// invoking the host's watch-register callback. This means concurrent
/// callers can never push the count past [`MAX_WATCH_SNAPSHOTS`], even
/// momentarily.
///
/// The old manual `fetch_sub` rollback only ran on an `Err` return. A
/// panicking callback would skip it and leak the slot forever, after
/// which every `register_watch` would hit the cap with no watchpoint
/// actually armed. This guard instead releases the reservation on every
/// exit path, both the `Err` return and unwinding. The success path
/// keeps the slot by calling `mem::forget` on the guard.
struct WatchSlotGuard<'a> {
    /// Counter holding the reservation; decremented once on drop.
    count: &'a std::sync::atomic::AtomicUsize,
}

impl Drop for WatchSlotGuard<'_> {
    fn drop(&mut self) {
        // Give the reserved slot back; the previous value is not needed.
        let _previous = self
            .count
            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
    }
}
402
403/// A single `cgroup.procs` snapshot captured by
404/// [`Op::CaptureCgroupProcs`](crate::scenario::ops::Op::CaptureCgroupProcs).
405///
406/// Tests drain the per-bridge log of these via
407/// [`SnapshotBridge::drain_cgroup_procs`] after the scenario completes,
408/// then assert on the captured `pids` (membership, count, identity)
409/// against the workload they spawned. Carries:
410///
411/// - `tag`: the snapshot key supplied in the Op (lets a scenario
412/// capture pre/post snapshots of the same cgroup at different
413/// points and disambiguate them on drain).
414/// - `cgroup`: the cgroup name the kernel read returned PIDs for.
415/// - `pids`: the thread-group leaders (PIDs / TGIDs) the kernel
416/// reported in `cgroup.procs` at capture time. An empty Vec means
417/// the cgroup existed but held no tasks. A read against a missing
418/// cgroup directory errors at the dispatch arm and never produces
419/// a snapshot — so the presence of a `CgroupProcsSnapshot` in the
420/// drain log implies the read succeeded.
421#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
422#[non_exhaustive]
423pub struct CgroupProcsSnapshot {
424 /// Snapshot key supplied to
425 /// [`Op::CaptureCgroupProcs`](crate::scenario::ops::Op::CaptureCgroupProcs).
426 /// Distinguishes multiple captures of the same cgroup.
427 pub tag: String,
428 /// Cgroup name the kernel read returned PIDs for.
429 pub cgroup: String,
430 /// Thread-group leaders in the cgroup at capture time.
431 /// `libc::pid_t` is `i32` on Linux; compare against other ktstr
432 /// pid surfaces (`WorkloadHandle::worker_pids()`, etc.) via the
433 /// matching integer type or `as i32` cast when types differ.
434 pub pids: Vec<libc::pid_t>,
435}
436
437impl CgroupProcsSnapshot {
438 /// Construct a `CgroupProcsSnapshot`. The struct is
439 /// `#[non_exhaustive]` so external callers cannot use
440 /// struct-literal syntax — use this constructor instead. The
441 /// framework's [`SnapshotBridge::record_cgroup_procs`] dispatch
442 /// site routes through here so test fixtures that synthesise
443 /// snapshots for drain-consumer testing share the same
444 /// construction path.
445 pub fn new(tag: impl Into<String>, cgroup: impl Into<String>, pids: Vec<libc::pid_t>) -> Self {
446 Self {
447 tag: tag.into(),
448 cgroup: cgroup.into(),
449 pids,
450 }
451 }
452}
453
454/// Host-side capture pipeline that the freeze coordinator routes
455/// [`Op::CaptureSnapshot`](crate::scenario::ops::Op::CaptureSnapshot) and
456/// [`Op::WatchSnapshot`](crate::scenario::ops::Op::WatchSnapshot)
457/// requests through.
458///
459/// Construct via [`SnapshotBridge::new`] (with an explicit capture
460/// callback) and optionally [`SnapshotBridge::with_watch_register`]
461/// to attach watch support. Install for the current thread via
462/// [`SnapshotBridge::set_thread_local`] — see [`BridgeGuard`] for
463/// the RAII teardown contract.
464#[derive(Clone)]
465#[must_use = "dropping a SnapshotBridge discards the capture pipeline"]
466pub struct SnapshotBridge {
467 capture: CaptureCallback,
468 register_watch: Option<WatchRegisterCallback>,
469 kernel_op: Option<KernelOpCallback>,
470 pub(super) snapshots: Arc<Mutex<SnapshotStore>>,
471 /// Per-tag drain log of kernel-op reply payloads. Test fixtures
472 /// inspect this via [`SnapshotBridge::drain_kernel_ops`] after
473 /// `execute_steps` returns to verify each request's outcome.
474 kernel_ops: Arc<Mutex<Vec<(String, crate::vmm::wire::KernelOpReplyPayload)>>>,
475 /// Per-tag drain log of cgroup.procs snapshots captured by
476 /// [`Op::CaptureCgroupProcs`](crate::scenario::ops::Op::CaptureCgroupProcs).
477 /// Stored in insertion order so a scenario capturing the same
478 /// cgroup at multiple points (with distinct tags) sees both
479 /// snapshots when draining. Test fixtures drain via
480 /// [`SnapshotBridge::drain_cgroup_procs`] after the scenario
481 /// completes; each entry carries the tag, the cgroup name read,
482 /// and the PIDs the kernel reported in `cgroup.procs`.
483 cgroup_procs: Arc<Mutex<Vec<CgroupProcsSnapshot>>>,
484 watch_count: Arc<std::sync::atomic::AtomicUsize>,
485 /// Monotonic counter bumped by the freeze-coordinator's accessor-
486 /// init worker on every successful slot publish (initial attach +
487 /// each subsequent re-init triggered by scheduler swap). Paired
488 /// with [`Self::accessor_worker_state`] so a scheduler-swap op can
489 /// wait for the new scheduler's BPF maps to land before returning
490 /// success. `None` for bridges built without an accessor (test
491 /// fixtures that never trigger reinit); see
492 /// [`Self::with_accessor_state`].
493 accessor_publish_seqno: Option<Arc<std::sync::atomic::AtomicU64>>,
494 /// Liveness sentinel for the accessor-init worker — set by the
495 /// worker as it transitions through its retry / publish / exit
496 /// states. Values match [`accessor_worker_state`] module constants
497 /// (Trying / Succeeded / FailedPermanently). The dispatcher in
498 /// `Op::ReplaceScheduler` / `Op::AttachScheduler` reads this on
499 /// timeout so the surfaced error distinguishes "worker still
500 /// trying — bump deadline" from "worker exited — retry will
501 /// hang." `None` mirrors `accessor_publish_seqno`.
502 accessor_worker_state: Option<Arc<std::sync::atomic::AtomicU8>>,
503 /// Dispatcher wake EventFd. The accessor-init worker pulses
504 /// this fd in lock-step with every seqno bump AND on every
505 /// terminal worker_state transition (FAILED_PERMANENTLY).
506 /// Wait paths in [`Self::wait_for_accessor_publish_advance`] /
507 /// [`Self::wait_for_worker_state_not_trying`] `poll(2)` on the
508 /// fd with the remaining deadline so the wake latency is one
509 /// kernel scheduling tick instead of the 50 ms sleep tail an
510 /// atomic-only loop would carry. Distinct from `accessor_ready_evt`
511 /// (which the coord epoll drains) so the dispatcher and the
512 /// coord don't race for the same wake count.
513 ///
514 /// **Single-consumer assumption.** The fd is read (drained) only
515 /// from the wait paths; the bridge is installed thread-local via
516 /// [`Self::set_thread_local`], and each freeze-coordinator
517 /// instance constructs one bridge → one wake fd. Multiple
518 /// concurrent dispatchers sharing the same fd could race for the
519 /// drain edge (the atomic re-check at the top of each wait loop
520 /// keeps that benign — atomic is source of truth — but a missed
521 /// wake adds latency). Per-VM bridge ownership today guarantees
522 /// no such sharing. `None` mirrors the other accessor-state fields.
523 accessor_dispatcher_wake_evt: Option<Arc<vmm_sys_util::eventfd::EventFd>>,
524}
525
/// Numeric state codes the accessor-init worker stores into the
/// `SnapshotBridge::accessor_worker_state` atomic. The values fit in
/// two bits, leaving the upper bits free for future sub-states (such as
/// "shutting down").
pub mod accessor_worker_state {
    /// The worker is inside its retry loop, still trying to initialise
    /// the BPF accessor pair. Both the initial attach and the post-swap
    /// re-init pass through this state before reaching Succeeded.
    pub const TRYING: u8 = 0;

    /// The worker has published at least one accessor pair. Later
    /// re-inits publish through this same state: it stays SUCCEEDED
    /// while the seqno keeps bumping.
    pub const SUCCEEDED: u8 = 1;

    /// The worker has exited and will never publish again. Causes
    /// include an ELF parse failure, the 60 s boot-budget deadline, or
    /// another terminal condition it detected. A dispatcher that sees
    /// this on timeout should report "worker exited" rather than
    /// "still trying."
    pub const FAILED_PERMANENTLY: u8 = 2;
}
546
547impl std::fmt::Debug for SnapshotBridge {
548 /// Debug print does NOT show captured reports (their full
549 /// rendering can be hundreds of KB) — only the count and the
550 /// presence of callbacks.
551 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
552 f.debug_struct("SnapshotBridge")
553 .field("snapshots", &self.len())
554 .field("watch_count", &self.watch_count())
555 .field("capture", &"<callback>")
556 .field(
557 "register_watch",
558 &if self.register_watch.is_some() {
559 "<callback>"
560 } else {
561 "<none>"
562 },
563 )
564 .finish()
565 }
566}
567
/// Return `true` exactly when `tag` is a string the periodic dispatch
/// loop can emit: `"periodic_"` followed by the capture index rendered
/// with `{:03}`.
///
/// The source of truth is the coordinator's
/// `format!("periodic_{:03}", idx)` emission at
/// `src/vmm/freeze_coord/state.rs`. `{:03}` is a *minimum* width:
/// - indices `0..=999` render as exactly three zero-padded digits
///   (`periodic_007`);
/// - larger indices render with more digits and no padding
///   (`periodic_1000`).
///
/// The matcher therefore accepts either exactly three ASCII digits, or
/// four or more ASCII digits without a leading zero. It rejects
/// everything else. This assumes `idx` is an unsigned integer —
/// confirm against the coordinator if that ever changes.
///
/// This is deliberately stricter than `tag.starts_with("periodic_")`.
/// That check would accept arbitrary user `Op::CaptureSnapshot` tags
/// that merely share the prefix, and those would pollute the
/// `periodic_real_count` floor.
fn is_periodic_tag(tag: &str) -> bool {
    let Some(digits) = tag.strip_prefix("periodic_") else {
        return false;
    };
    let bytes = digits.as_bytes();
    if bytes.len() < 3 || !bytes.iter().all(u8::is_ascii_digit) {
        return false;
    }
    // Beyond the three-digit pad width the formatter never emits a
    // leading zero, so e.g. `periodic_0123` cannot come from the loop.
    bytes.len() == 3 || bytes[0] != b'0'
}
581
582impl SnapshotBridge {
583 /// Build a bridge from a capture callback. The callback may
584 /// freeze the VM, build the report, or return `None` when
585 /// capture is unavailable. No watch-register callback —
586 /// `Op::WatchSnapshot` returns "not supported" when the host
587 /// did not wire one. Use [`Self::with_watch_register`] to
588 /// install one.
589 pub fn new(capture: CaptureCallback) -> Self {
590 Self {
591 capture,
592 register_watch: None,
593 kernel_op: None,
594 snapshots: Arc::new(Mutex::new(SnapshotStore::new())),
595 kernel_ops: Arc::new(Mutex::new(Vec::new())),
596 cgroup_procs: Arc::new(Mutex::new(Vec::new())),
597 watch_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
598 accessor_publish_seqno: None,
599 accessor_worker_state: None,
600 accessor_dispatcher_wake_evt: None,
601 }
602 }
603
604 /// Install the accessor-init worker's publish-seqno, worker-state,
605 /// and dispatcher-wake EventFd so
606 /// [`Self::wait_for_accessor_publish_advance`] /
607 /// [`Self::wait_for_worker_state_not_trying`] can block scheduler-
608 /// swap ops until the new scheduler's BPF maps land — with kernel-
609 /// scheduling-tick wake latency rather than the 50 ms sleep tail
610 /// an atomic-only loop would carry. Called by the freeze-coordinator
611 /// when it sets up the worker (`vmm-accessor-init` thread); test
612 /// bridges that don't drive a real worker can omit this call and
613 /// the wait becomes a no-op that returns `Ok(0)` immediately.
614 pub fn with_accessor_state(
615 mut self,
616 publish_seqno: Arc<std::sync::atomic::AtomicU64>,
617 worker_state: Arc<std::sync::atomic::AtomicU8>,
618 dispatcher_wake_evt: Arc<vmm_sys_util::eventfd::EventFd>,
619 ) -> Self {
620 self.accessor_publish_seqno = Some(publish_seqno);
621 self.accessor_worker_state = Some(worker_state);
622 self.accessor_dispatcher_wake_evt = Some(dispatcher_wake_evt);
623 self
624 }
625
626 /// Snapshot of the accessor-init worker's current publish seqno.
627 /// Returns 0 for bridges built without an accessor (test fixtures
628 /// — the dispatch wait sees no advance to gate on and the op
629 /// returns immediately). Read with `Acquire` so the seqno orders
630 /// against the worker's `Release` bump.
631 pub fn accessor_publish_seqno(&self) -> u64 {
632 self.accessor_publish_seqno
633 .as_ref()
634 .map(|s| s.load(std::sync::atomic::Ordering::Acquire))
635 .unwrap_or(0)
636 }
637
638 /// Block until the accessor-init worker exits the TRYING state
639 /// (transitions to SUCCEEDED on its first publish, or to
640 /// FAILED_PERMANENTLY on a terminal worker exit). Used by
641 /// `Op::AttachScheduler` to serialize against the worker's
642 /// concurrent boot-publish: capturing the seqno baseline AFTER
643 /// the boot publish completes is the only way to ensure the
644 /// next observed seqno advance belongs to the just-attached
645 /// scheduler rather than to a co-resident boot scheduler that
646 /// finished its 60 s init in parallel. Event-driven via
647 /// `accessor_dispatcher_wake_evt` — wake latency is one
648 /// kernel scheduling tick, not a 50 ms poll. No-op (Ok(())) for
649 /// bridges without an accessor.
650 pub fn wait_for_worker_state_not_trying(
651 &self,
652 deadline: std::time::Instant,
653 op_label: &str,
654 ) -> anyhow::Result<()> {
655 let Some(state) = self.accessor_worker_state.as_ref() else {
656 return Ok(());
657 };
658 loop {
659 let cur = state.load(std::sync::atomic::Ordering::Acquire);
660 if cur != accessor_worker_state::TRYING {
661 if cur == accessor_worker_state::FAILED_PERMANENTLY {
662 anyhow::bail!(
663 "{op_label}: accessor-init worker exited \
664 (FAILED_PERMANENTLY) before this op could run — \
665 check freeze-coord 'vmm-accessor-init' logs for ELF \
666 parse or boot-budget failures"
667 );
668 }
669 return Ok(());
670 }
671 let now = std::time::Instant::now();
672 if now >= deadline {
673 anyhow::bail!(
674 "{op_label}: accessor-init worker stayed in TRYING state \
675 past the deadline — the worker's first publish has not \
676 completed. Likely cause: kernel boot stalled before \
677 accessor's bootstrap symbols became readable. Check \
678 freeze-coord logs for `accessor-init:` lines"
679 );
680 }
681 let remaining = deadline.saturating_duration_since(now);
682 self.poll_dispatcher_wake(remaining);
683 // Loop back to re-check state. The atomic is the source
684 // of truth — the eventfd is just a fast wake; a spurious
685 // wake (e.g. a publish landing while we're already past
686 // TRYING) drops through to the next state check.
687 }
688 }
689
690 /// Block on the dispatcher-wake EventFd for up to `remaining`.
691 /// Caller's atomic re-check after return is the source of truth.
692 /// Panics if the with_accessor_state coupling invariant is
693 /// violated (see body's unreachable! arm).
694 ///
695 /// Uses raw libc::poll rather than nix::poll::poll because the
696 /// underlying [`vmm_sys_util::eventfd::EventFd`] doesn't impl
697 /// `AsFd`, only `AsRawFd`; the cleaner nix path would require
698 /// an unsafe `BorrowedFd::borrow_raw` wrapper, leaving the
699 /// unsafe surface identical to libc::poll. No net reduction —
700 /// keep the simpler libc call.
701 fn poll_dispatcher_wake(&self, remaining: std::time::Duration) {
702 if remaining.is_zero() {
703 return;
704 }
705 let Some(evt) = self.accessor_dispatcher_wake_evt.as_ref() else {
706 unreachable!(
707 "poll_dispatcher_wake reached without an installed wake fd — \
708 SnapshotBridge::with_accessor_state stores accessor_worker_state \
709 and accessor_dispatcher_wake_evt together, so any caller that \
710 passed the worker_state gate must also have the wake fd. A None \
711 here indicates a bridge constructor that violated the coupling \
712 invariant"
713 );
714 };
715 let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
716 let fd = {
717 use std::os::unix::io::AsRawFd;
718 evt.as_raw_fd()
719 };
720 let mut pfd = libc::pollfd {
721 fd,
722 events: libc::POLLIN,
723 revents: 0,
724 };
725 // SAFETY: pollfd is a POD struct; libc::poll reads/writes
726 // the slice in-bounds for nfds == 1.
727 unsafe {
728 libc::poll(&mut pfd, 1, ms);
729 }
730 // Drain the counter so a subsequent poll doesn't re-fire
731 // immediately on the same edge. EFD_NONBLOCK so an empty fd
732 // returns EAGAIN — ignore the result.
733 let _ = evt.read();
734 }
735
736 /// Block until the accessor-init worker's publish seqno advances
737 /// past `seqno_before`, or until `deadline` elapses. The dispatch
738 /// for `Op::ReplaceScheduler` / `Op::AttachScheduler` calls this
739 /// after spawning the new scheduler so the op only returns success
740 /// once the new scheduler's BPF accessor pair has been re-published
741 /// and a subsequent `Snapshot::active()` reflects the swap.
742 ///
743 /// On timeout the surfaced error reads the worker-state sentinel
744 /// to distinguish "still trying" (transient — operator can bump
745 /// the deadline) from "worker exited" (terminal — retry will
746 /// hang the same way). Bridges without an accessor (test fixtures
747 /// that omit [`Self::with_accessor_state`]) succeed immediately
748 /// with `Ok(0)` so unit-test scenarios don't synthesize a worker.
749 pub fn wait_for_accessor_publish_advance(
750 &self,
751 seqno_before: u64,
752 deadline: std::time::Instant,
753 op_label: &str,
754 ) -> anyhow::Result<u64> {
755 let (seqno, state) = match (
756 self.accessor_publish_seqno.as_ref(),
757 self.accessor_worker_state.as_ref(),
758 ) {
759 (Some(s), Some(w)) => (s, w),
760 _ => return Ok(0),
761 };
762 // Tight initial check so a publish that already landed
763 // between the dispatcher's pre-spawn `accessor_publish_seqno()`
764 // load and this call returns without sleeping.
765 let cur = seqno.load(std::sync::atomic::Ordering::Acquire);
766 if cur > seqno_before {
767 return Ok(cur);
768 }
769 loop {
770 let cur_state = state.load(std::sync::atomic::Ordering::Acquire);
771 if cur_state == accessor_worker_state::FAILED_PERMANENTLY {
772 anyhow::bail!(
773 "{op_label}: accessor-init worker exited (FAILED_PERMANENTLY) — \
774 check freeze-coord 'vmm-accessor-init' logs for ELF parse or \
775 boot-budget failures; retrying the op will hit the same wall"
776 );
777 }
778 let cur = seqno.load(std::sync::atomic::Ordering::Acquire);
779 if cur > seqno_before {
780 return Ok(cur);
781 }
782 let now = std::time::Instant::now();
783 if now >= deadline {
784 let remaining_state = state.load(std::sync::atomic::Ordering::Acquire);
785 anyhow::bail!(
786 "{op_label}: accessor reinit did not advance publish seqno \
787 from {seqno_before} within deadline (worker state = \
788 {remaining_state}; 0=Trying / 1=Succeeded / 2=FailedPermanently). \
789 A reinit that's stuck in Trying past the deadline indicates the \
790 coord's scan-tick hasn't observed the rebind or the worker's \
791 `from_elf_with_hint` retry is hitting a transient address-space \
792 window — check freeze-coord logs for `accessor-init:` lines"
793 );
794 }
795 // Event-driven via accessor_dispatcher_wake_evt: the
796 // worker writes the fd in lock-step with each seqno
797 // bump and on FAILED_PERMANENTLY exit, so poll wakes at
798 // kernel-scheduling-tick latency. Distinct from the
799 // accessor_ready_evt the coord drains (no second-
800 // consumer race). Reaching poll_dispatcher_wake without
801 // an installed wake fd panics — the `(Some, Some)` match
802 // / `return Ok(0)` at the top of this function gates this
803 // branch on both accessor_publish_seqno and
804 // accessor_worker_state being present, and
805 // with_accessor_state stores both together.
806 let remaining = deadline.saturating_duration_since(now);
807 self.poll_dispatcher_wake(remaining);
808 }
809 }
810
811 /// Install a kernel-op callback so
812 /// `Op::WriteKernel{Hot,Cold}` / `Op::ReadKernel{Hot,Cold}` ops
813 /// can dispatch host-side guest-memory writes/reads. Without
814 /// one installed, the in-process executor returns "no
815 /// SnapshotBridge installed" and the ops fall through to the
816 /// virtio-console wire path. Test fixtures use this seam to
817 /// record requests and assert on them without touching real
818 /// guest memory.
819 pub fn with_kernel_op(mut self, callback: KernelOpCallback) -> Self {
820 self.kernel_op = Some(callback);
821 self
822 }
823
824 /// Dispatch a kernel-op request through the installed callback.
825 /// Returns `None` when no callback is installed (the executor
826 /// then falls through to the wire path); returns
827 /// `Some(KernelOpReplyPayload)` otherwise and records the
828 /// reply in the per-tag drain log.
829 pub fn dispatch_kernel_op(
830 &self,
831 request: &crate::vmm::wire::KernelOpRequestPayload,
832 ) -> Option<crate::vmm::wire::KernelOpReplyPayload> {
833 let callback = self.kernel_op.as_ref()?;
834 let reply = callback(request);
835 self.kernel_ops
836 .lock_unpoisoned()
837 .push((request.tag.clone(), reply.clone()));
838 Some(reply)
839 }
840
841 /// Drain the per-tag kernel-op reply log. Returns the accumulated
842 /// `(tag, reply)` pairs in insertion order; leaves the bridge's
843 /// own copy empty so subsequent calls see only newer entries.
844 pub fn drain_kernel_ops(&self) -> Vec<(String, crate::vmm::wire::KernelOpReplyPayload)> {
845 std::mem::take(&mut *self.kernel_ops.lock_unpoisoned())
846 }
847
848 /// Record a `cgroup.procs` snapshot captured by
849 /// [`Op::CaptureCgroupProcs`](crate::scenario::ops::Op::CaptureCgroupProcs).
850 ///
851 /// Appends to the per-bridge log in insertion order. Multiple
852 /// captures of the same `cgroup` (distinguished by `tag`)
853 /// surface as separate entries — the bridge does NOT collapse
854 /// or overwrite on duplicate cgroup names, which lets a scenario
855 /// capture pre/post snapshots of the same cgroup under different
856 /// tags. Multiple captures with the same `(tag, cgroup)` also
857 /// append rather than overwrite; tag uniqueness is a caller
858 /// convention, not a framework-enforced contract.
859 pub fn record_cgroup_procs(&self, tag: String, cgroup: String, pids: Vec<libc::pid_t>) {
860 self.cgroup_procs
861 .lock_unpoisoned()
862 .push(CgroupProcsSnapshot::new(tag, cgroup, pids));
863 }
864
865 /// Drain the per-bridge `cgroup.procs` snapshot log. Returns the
866 /// accumulated [`CgroupProcsSnapshot`]s in insertion order; leaves
867 /// the bridge's own copy empty so subsequent calls observe only
868 /// newer entries. Mirrors [`Self::drain_kernel_ops`]' contract.
869 pub fn drain_cgroup_procs(&self) -> Vec<CgroupProcsSnapshot> {
870 std::mem::take(&mut *self.cgroup_procs.lock_unpoisoned())
871 }
872
873 /// Look up the first `cgroup.procs` snapshot recorded under
874 /// `tag` without consuming the drain log. Returns `None` when no
875 /// snapshot was recorded under that tag (the typical case is a
876 /// typo, a missing capture op, or capture-before-drain ordering
877 /// mistake). For the common single-capture-per-tag pattern this
878 /// collapses the `drain_cgroup_procs().iter().find(|s| s.tag ==
879 /// tag).expect("missing")` 4-combinator dance into a one-liner.
880 ///
881 /// **Clone cost.** Each `CgroupProcsSnapshot` carries `tag`,
882 /// `cgroup`, and `pids: Vec<libc::pid_t>` — the pids vec is
883 /// bounded by the number of tasks in the cgroup at capture
884 /// time (kilobytes at most for realistic test scenarios).
885 /// Mirrors [`Self::kernel_op_value`]'s non-draining-lookup
886 /// shape for sibling-API consistency.
887 ///
888 /// **Duplicate tag.** Returns the FIRST snapshot recorded under
889 /// `tag`. Multiple captures with the same `(tag, cgroup)` are
890 /// stored separately (see [`Self::record_cgroup_procs`]); callers
891 /// who care about the multiplicity should use
892 /// [`Self::drain_cgroup_procs`] and filter the Vec manually.
893 pub fn cgroup_procs_by_tag(&self, tag: &str) -> Option<CgroupProcsSnapshot> {
894 self.cgroup_procs
895 .lock_unpoisoned()
896 .iter()
897 .find(|s| s.tag == tag)
898 .cloned()
899 }
900
901 /// Record a kernel-op reply produced by the host-side wire-path
902 /// dispatcher into the same per-tag drain log that
903 /// [`Self::dispatch_kernel_op`] populates.
904 ///
905 /// The in-process bridge path stores its replies inside
906 /// `dispatch_kernel_op` (which both invokes the callback AND
907 /// pushes the reply into the log). The wire-path used by ops
908 /// running inside a guest VM produces its reply on the host
909 /// freeze-coordinator and frames it back over virtio-console
910 /// port 1 — there is no callback to drive the push, so the
911 /// host coordinator calls this record-only method directly
912 /// after framing each reply. Without this hook, `post_vm`
913 /// callbacks observing [`crate::vmm::VmResult::snapshot_bridge`]
914 /// would see an empty drain log for any guest-side
915 /// `Op::WriteKernel*` / `Op::ReadKernel*` invocation, defeating
916 /// the asserts-from-replies pattern the gated cold-path e2e
917 /// scaffolding pins.
918 pub fn record_kernel_op_reply(
919 &self,
920 tag: String,
921 reply: crate::vmm::wire::KernelOpReplyPayload,
922 ) {
923 self.kernel_ops.lock_unpoisoned().push((tag, reply));
924 }
925
926 /// Look up the first kernel-op reply value recorded under `tag`
927 /// in the kernel-op drain log without consuming the log.
928 ///
929 /// The bulk shape returned by [`Self::drain_kernel_ops`] is
930 /// `Vec<(tag, reply)>` with each reply carrying a
931 /// `Vec<crate::vmm::wire::KernelOpValue>`. For the common
932 /// single-tag single-value read-back lookup, this helper
933 /// collapses the 4-layer unwrap (find by tag → check success →
934 /// index into read_values → match the variant) into a single
935 /// call. Returns `None` when no reply was recorded under `tag`,
936 /// when the reply reported `success = false`, or when the
937 /// reply's `read_values` is empty (e.g. a write-op reply under
938 /// the same tag). Otherwise returns `Some(value)` with the
939 /// first `KernelOpValue` of the first matching reply.
940 ///
941 /// The log is NOT drained — the caller can still inspect via
942 /// [`Self::drain_kernel_ops`] to observe the full per-tag
943 /// history.
944 ///
945 /// **Clone cost.** For `U32` / `U64` the clone is 4 / 8 bytes.
946 /// For `crate::vmm::wire::KernelOpValue::Bytes` the clone
947 /// can be up to `crate::vmm::wire::KERNEL_OP_REPLY_MAX`
948 /// (1 MiB). Hot paths that repeatedly inspect the same tag
949 /// should prefer [`Self::drain_kernel_ops`] + index into the
950 /// returned Vec to avoid the per-call clone.
951 pub fn kernel_op_value(&self, tag: &str) -> Option<crate::vmm::wire::KernelOpValue> {
952 self.kernel_ops
953 .lock_unpoisoned()
954 .iter()
955 .find(|(t, reply)| t == tag && reply.success && !reply.read_values.is_empty())
956 .map(|(_, reply)| reply.read_values[0].clone())
957 }
958
959 /// Install a watch-register callback so [`Op::WatchSnapshot`](crate::scenario::ops::Op::WatchSnapshot)
960 /// ops can attach hardware-watchpoint snapshots. The callback
961 /// is responsible for symbol resolution, watchpoint slot allocation, and
962 /// `KVM_SET_GUEST_DEBUG` arming.
963 pub fn with_watch_register(mut self, register: WatchRegisterCallback) -> Self {
964 self.register_watch = Some(register);
965 self
966 }
967
968 /// Register a hardware-watchpoint snapshot for `symbol`.
969 ///
970 /// Enforces the per-scenario [`MAX_WATCH_SNAPSHOTS`] cap before
971 /// invoking the host's watch-register callback. Returns
972 /// `Err(reason)` when:
973 /// - The cap has been reached (slot 0 reserved + 3 user slots
974 /// allocated).
975 /// - No watch-register callback was installed via
976 /// [`Self::with_watch_register`].
977 /// - The host's callback rejected the request (symbol unresolved,
978 /// alignment violation, ioctl failure).
979 pub fn register_watch(&self, symbol: &str) -> std::result::Result<(), String> {
980 // Reserve a slot via compare_exchange so concurrent callers
981 // can never push the count past MAX_WATCH_SNAPSHOTS even
982 // transiently. The previous fetch_add+rollback path let two
983 // concurrent threads observe `prev < MAX` and increment past
984 // the cap before either rolled back, briefly violating the
985 // invariant `watch_count <= MAX_WATCH_SNAPSHOTS`.
986 loop {
987 let prev = self.watch_count.load(std::sync::atomic::Ordering::Relaxed);
988 if prev >= MAX_WATCH_SNAPSHOTS {
989 return Err(format!(
990 "Op::WatchSnapshot cap exceeded: scenario already registered \
991 {MAX_WATCH_SNAPSHOTS} watchpoints ({MAX_WATCH_SNAPSHOTS} user \
992 watchpoint slots occupied; slot 0 reserved for the error-class \
993 exit_kind trigger). Drop a watch or use Op::CaptureSnapshot for a \
994 time-driven capture instead."
995 ));
996 }
997 if self
998 .watch_count
999 .compare_exchange_weak(
1000 prev,
1001 prev + 1,
1002 std::sync::atomic::Ordering::Relaxed,
1003 std::sync::atomic::Ordering::Relaxed,
1004 )
1005 .is_ok()
1006 {
1007 break;
1008 }
1009 // Lost the CAS to a concurrent register/unregister; reload
1010 // and retry. spurious failures are also retried — that is
1011 // why this uses the _weak variant inside a loop.
1012 }
1013 // Slot reserved. Wrap it in a Drop guard so a panic inside
1014 // `register(symbol)` releases the reservation on unwind — the
1015 // previous manual-fetch_sub rollback only ran on the explicit
1016 // Err(reason) arm, leaking the slot permanently if the
1017 // callback panicked. The success path commits the slot with
1018 // mem::forget after register returns Ok.
1019 let guard = WatchSlotGuard {
1020 count: &self.watch_count,
1021 };
1022 let Some(register) = self.register_watch.as_ref() else {
1023 drop(guard);
1024 return Err(format!(
1025 "Op::WatchSnapshot('{symbol}'): no watch-register callback installed \
1026 on this SnapshotBridge — the host wires one via \
1027 SnapshotBridge::with_watch_register before execute_steps; \
1028 in-guest / no-VM scenarios cannot register hardware watchpoints"
1029 ));
1030 };
1031 register(symbol)?;
1032 std::mem::forget(guard);
1033 Ok(())
1034 }
1035
1036 /// Number of watchpoint snapshots currently registered.
1037 pub fn watch_count(&self) -> usize {
1038 self.watch_count.load(std::sync::atomic::Ordering::Relaxed)
1039 }
1040
1041 /// Drive the capture closure and store the result under `name`.
1042 /// Returns `true` when a report was captured and stored;
1043 /// `false` when the closure returned `None`.
1044 pub fn capture(&self, name: &str) -> bool {
1045 let Some(report) = (self.capture)(name) else {
1046 tracing::warn!(
1047 name,
1048 "SnapshotBridge::capture: capture callback returned None — snapshot unavailable"
1049 );
1050 self.snapshots
1051 .lock_unpoisoned()
1052 .push_event(SnapshotBridgeEvent::CaptureUnavailable {
1053 tag: name.to_string(),
1054 });
1055 return false;
1056 };
1057 self.store(name, report);
1058 true
1059 }
1060
1061 /// Step-aware variant of [`Self::capture`]: drives the capture
1062 /// closure and stores the result under `name`, stamping it with
1063 /// `step_index` so the drained entry's
1064 /// [`super::error::DrainedSnapshotEntry::step_index`] surfaces
1065 /// the scenario phase the capture belongs to.
1066 ///
1067 /// `step_index` is encoded per the 1-indexed phase convention:
1068 /// `0` is the BASELINE settle window, `1..=N` align with
1069 /// scenario Step ordinals. The host-side on-demand-capture
1070 /// dispatch reads
1071 /// [`crate::scenario::Ctx::current_step`] just before the
1072 /// freeze rendezvous and passes the loaded value through this
1073 /// entry point so the downstream phase aggregator can bucket
1074 /// the sample directly.
1075 ///
1076 /// Returns `true` when a report was captured and stored;
1077 /// `false` when the closure returned `None`.
1078 pub fn capture_with_step(&self, name: &str, step_index: u16) -> bool {
1079 let Some(report) = (self.capture)(name) else {
1080 tracing::warn!(
1081 name,
1082 step_index,
1083 "SnapshotBridge::capture_with_step: capture callback returned None — snapshot unavailable"
1084 );
1085 self.snapshots
1086 .lock_unpoisoned()
1087 .push_event(SnapshotBridgeEvent::CaptureUnavailable {
1088 tag: name.to_string(),
1089 });
1090 return false;
1091 };
1092 self.store_internal(name, report, None, None, None, Some(step_index));
1093 true
1094 }
1095
1096 /// Store a pre-built [`FailureDumpReport`] under `name`,
1097 /// bypassing the capture callback. Used by the host-side freeze
1098 /// coordinator after it runs `freeze_and_dispatch(FreezeMode::Capture { gate_on_exit_kind: false })` and
1099 /// wants to publish the resulting report on the bridge for the
1100 /// test author to drain post-VM-exit.
1101 ///
1102 /// Storage is capped at [`MAX_STORED_SNAPSHOTS`] entries to bound
1103 /// host memory under runaway capture cadence (e.g. a Loop step
1104 /// firing `Op::CaptureSnapshot` with a unique tag every iteration).
1105 /// When the cap is reached, the oldest stored entry is evicted
1106 /// with a `tracing::warn!` naming the dropped tag. An overwrite
1107 /// of an existing tag also warns and replaces the prior report
1108 /// in place without disturbing FIFO ordering of other entries.
1109 pub fn store(&self, name: &str, report: FailureDumpReport) {
1110 self.store_internal(name, report, None, None, None, None);
1111 }
1112
1113 /// Bundle a [`FailureDumpReport`] with the scx_stats JSON and
1114 /// elapsed-millisecond timestamp captured at the same periodic
1115 /// boundary. Used by the freeze coordinator's periodic-fire path
1116 /// so [`Sample`](crate::scenario::sample::Sample) can pair the
1117 /// frozen BPF state with the running-scheduler stats observed
1118 /// just before the freeze rendezvous.
1119 ///
1120 /// Stats / elapsed are stored in parallel HashMaps keyed by the
1121 /// same tag as the report. FIFO eviction sweeps all three in
1122 /// lock-step; an overwrite refreshes order and replaces every
1123 /// parallel value (or clears it when the new write passes
1124 /// `None`) so a stale stats / elapsed entry can never accompany
1125 /// a freshly stored report.
1126 pub fn store_with_stats(
1127 &self,
1128 name: &str,
1129 report: FailureDumpReport,
1130 stats: Option<Result<serde_json::Value, super::error::MissingStatsReason>>,
1131 elapsed_ms: Option<u64>,
1132 ) {
1133 self.store_internal(name, report, stats, elapsed_ms, None, None);
1134 }
1135
1136 /// Step-aware variant of [`Self::store_with_stats`]: bundles the
1137 /// scenario phase index alongside the report / stats / elapsed
1138 /// tuple so the drained entry's
1139 /// [`super::error::DrainedSnapshotEntry::step_index`] carries
1140 /// the phase the capture belongs to. The freeze coordinator's
1141 /// periodic-fire path reads
1142 /// [`crate::scenario::Ctx::current_step`] just before the
1143 /// rendezvous and routes the value through this method so each
1144 /// periodic sample is bucketable per phase without a second
1145 /// lookup.
1146 ///
1147 /// `step_index` is encoded per the 1-indexed phase convention
1148 /// — `0` is the BASELINE settle window, `1..=N` align with
1149 /// scenario Step ordinals. All other arguments match
1150 /// [`Self::store_with_stats`] verbatim.
1151 pub fn store_with_stats_and_step(
1152 &self,
1153 name: &str,
1154 report: FailureDumpReport,
1155 stats: Option<Result<serde_json::Value, super::error::MissingStatsReason>>,
1156 elapsed_ms: Option<u64>,
1157 boundary_offset_ms: Option<u64>,
1158 step_index: u16,
1159 ) {
1160 self.store_internal(
1161 name,
1162 report,
1163 stats,
1164 elapsed_ms,
1165 boundary_offset_ms,
1166 Some(step_index),
1167 );
1168 }
1169
1170 fn store_internal(
1171 &self,
1172 name: &str,
1173 report: FailureDumpReport,
1174 stats: Option<Result<serde_json::Value, super::error::MissingStatsReason>>,
1175 elapsed_ms: Option<u64>,
1176 boundary_offset_ms: Option<u64>,
1177 step_index: Option<u16>,
1178 ) {
1179 let mut store = self.snapshots.lock_unpoisoned();
1180 if let Some(existing) = store.reports.insert(name.to_string(), report) {
1181 tracing::warn!(
1182 name,
1183 schema = %existing.schema,
1184 "SnapshotBridge::store: name already had a stored report; overwriting prior capture"
1185 );
1186 store.push_event(SnapshotBridgeEvent::Overwrite {
1187 tag: name.to_string(),
1188 prior_schema: existing.schema.clone(),
1189 });
1190 // Move this tag to the back of the FIFO order so the
1191 // overwrite refreshes its position (newest insertion =
1192 // farthest from eviction). Without this, a hot-rewritten
1193 // tag would still be the oldest and risk eviction even
1194 // when actively updated.
1195 if let Some(pos) = store.order.iter().position(|k| k == name) {
1196 store.order.remove(pos);
1197 }
1198 store.order.push_back(name.to_string());
1199 // Refresh / clear parallel stats and elapsed entries so
1200 // the post-overwrite `(report, stats, elapsed)` tuple is
1201 // self-consistent — a None overwrite must clear the prior
1202 // value rather than carrying forward a stale match from
1203 // an earlier capture.
1204 match stats {
1205 Some(v) => {
1206 store.stats.insert(name.to_string(), v);
1207 }
1208 None => {
1209 store.stats.remove(name);
1210 }
1211 }
1212 match elapsed_ms {
1213 Some(v) => {
1214 store.elapsed_ms.insert(name.to_string(), v);
1215 }
1216 None => {
1217 store.elapsed_ms.remove(name);
1218 }
1219 }
1220 match boundary_offset_ms {
1221 Some(v) => {
1222 store.boundary_offset_ms.insert(name.to_string(), v);
1223 }
1224 None => {
1225 store.boundary_offset_ms.remove(name);
1226 }
1227 }
1228 match step_index {
1229 Some(v) => {
1230 store.step_index.insert(name.to_string(), v);
1231 }
1232 None => {
1233 store.step_index.remove(name);
1234 }
1235 }
1236 return;
1237 }
1238 store.order.push_back(name.to_string());
1239 if let Some(v) = stats {
1240 store.stats.insert(name.to_string(), v);
1241 }
1242 if let Some(v) = elapsed_ms {
1243 store.elapsed_ms.insert(name.to_string(), v);
1244 }
1245 if let Some(v) = boundary_offset_ms {
1246 store.boundary_offset_ms.insert(name.to_string(), v);
1247 }
1248 if let Some(v) = step_index {
1249 store.step_index.insert(name.to_string(), v);
1250 }
1251 while store.reports.len() > MAX_STORED_SNAPSHOTS {
1252 let Some(evicted) = store.order.pop_front() else {
1253 // Defensive: if order is empty while reports is over
1254 // cap something is desynchronised — clear reports to
1255 // restore the invariant rather than loop forever.
1256 let nuked = store.reports.len();
1257 tracing::warn!(
1258 reports_len = nuked,
1259 cap = MAX_STORED_SNAPSHOTS,
1260 "SnapshotBridge::store: order empty while reports over cap — bulk-clearing to restore invariant"
1261 );
1262 store.push_event(SnapshotBridgeEvent::CapInvariantViolation {
1263 reports_len: nuked,
1264 cap: MAX_STORED_SNAPSHOTS,
1265 });
1266 store.reports.clear();
1267 store.stats.clear();
1268 store.elapsed_ms.clear();
1269 store.boundary_offset_ms.clear();
1270 store.step_index.clear();
1271 break;
1272 };
1273 if store.reports.remove(&evicted).is_some() {
1274 tracing::warn!(
1275 evicted = %evicted,
1276 cap = MAX_STORED_SNAPSHOTS,
1277 "SnapshotBridge::store: cap reached, evicting oldest captured snapshot"
1278 );
1279 store.push_event(SnapshotBridgeEvent::Eviction {
1280 evicted_tag: evicted.clone(),
1281 new_tag: name.to_string(),
1282 cap: MAX_STORED_SNAPSHOTS,
1283 });
1284 }
1285 // Sweep the parallel maps in lock-step so a stranded
1286 // stats / elapsed / boundary_offset / step_index entry
1287 // cannot outlive its report.
1288 store.stats.remove(&evicted);
1289 store.elapsed_ms.remove(&evicted);
1290 store.boundary_offset_ms.remove(&evicted);
1291 store.step_index.remove(&evicted);
1292 }
1293 }
1294
1295 /// Snapshot count for diagnostic logging.
1296 pub fn len(&self) -> usize {
1297 self.snapshots.lock_unpoisoned().reports.len()
1298 }
1299
1300 /// True when no snapshots have been captured.
1301 pub fn is_empty(&self) -> bool {
1302 self.snapshots.lock_unpoisoned().reports.is_empty()
1303 }
1304
1305 /// Count of stored periodic-tagged reports that carry REAL BPF
1306 /// state (not placeholders synthesized by rendezvous timeouts /
1307 /// gate suppression). Distinct from
1308 /// [`crate::vmm::VmResult::periodic_fired`], which counts every
1309 /// periodic boundary the freeze coordinator attempted —
1310 /// including the ones that landed only a placeholder when the
1311 /// vCPU rendezvous timed out.
1312 ///
1313 /// This is the "useful data produced" floor: a scheduler that
1314 /// attached but produced nothing but placeholders surfaces as
1315 /// `periodic_real_count() == 0` here even though
1316 /// `periodic_fired` may be `target`. Tests that want a
1317 /// stricter smoke-floor than `periodic_fired >= 1` (which
1318 /// passes on placeholder-only fills) read this query.
1319 ///
1320 /// **Tag-format pin.** The coordinator emits periodic tags via
1321 /// `format!("periodic_{:03}", idx)` at
1322 /// `src/vmm/freeze_coord/state.rs` — always
1323 /// `"periodic_"` + exactly 3 ASCII digits. The match here
1324 /// enforces that exact shape (NOT a loose prefix) so a user
1325 /// `Op::CaptureSnapshot { name: "periodic_kaslr" }` cannot
1326 /// collide with the periodic dispatch namespace and pollute
1327 /// the count.
1328 ///
1329 /// **What "real" measures.** A placeholder report may still
1330 /// carry `vcpu_regs` from a degraded capture (see
1331 /// `src/vmm/freeze_coord/mod.rs` periodic-degraded path —
1332 /// `degraded.vcpu_regs` is preserved into the stored
1333 /// placeholder). The floor here treats those as "not real"
1334 /// because the contract is "the test produced BPF-state data"
1335 /// — vcpu_regs alone don't satisfy that. Tests that want the
1336 /// looser "any capture-attempt landed" floor read
1337 /// [`crate::vmm::VmResult::periodic_fired`] instead.
1338 pub fn periodic_real_count(&self) -> u32 {
1339 let store = self.snapshots.lock_unpoisoned();
1340 let mut n: u32 = 0;
1341 for (tag, report) in &store.reports {
1342 if is_periodic_tag(tag) && !report.is_placeholder {
1343 n = n.saturating_add(1);
1344 }
1345 }
1346 n
1347 }
1348
1349 /// True when a stored report already exists for `name`. Lets the
1350 /// freeze coordinator's final-drain placeholder path skip storing
1351 /// a degraded "coord exited before capture" report on top of a
1352 /// real capture that the in-loop dispatch landed earlier — without
1353 /// this gate, a vCPU thread that re-armed `hit=true` after the
1354 /// in-loop service successfully published the report would have
1355 /// its tag's stored capture overwritten by the placeholder at
1356 /// teardown, presenting tests with a hollow snapshot in place of
1357 /// the real one.
1358 pub fn has(&self, name: &str) -> bool {
1359 self.snapshots.lock_unpoisoned().reports.contains_key(name)
1360 }
1361
1362 /// Take ownership of the captured snapshots, leaving the bridge
1363 /// empty. Drops any periodic-capture stats / elapsed / boundary-
1364 /// offset metadata stored alongside reports — callers that need
1365 /// the stats JSON or per-sample timestamp must use
1366 /// [`Self::drain_ordered_with_stats`] instead.
1367 pub fn drain(&self) -> HashMap<String, FailureDumpReport> {
1368 let mut store = self.snapshots.lock_unpoisoned();
1369 store.order.clear();
1370 store.stats.clear();
1371 store.elapsed_ms.clear();
1372 store.boundary_offset_ms.clear();
1373 store.step_index.clear();
1374 std::mem::take(&mut store.reports)
1375 }
1376
1377 /// Take ownership of the captured snapshots in insertion order,
1378 /// leaving the bridge empty. The returned `Vec` walks
1379 /// `SnapshotStore::order` (the FIFO key list maintained by
1380 /// [`Self::store`]) so periodic captures — whose ordering IS the
1381 /// signal — are returned `periodic_000` first, `periodic_NNN`
1382 /// last. [`Self::drain`] returns a `HashMap` and loses ordering;
1383 /// use this method when ordering matters.
1384 ///
1385 /// An overwrite of an existing tag (the `if let Some(existing) =
1386 /// store.reports.insert(...)` branch in [`Self::store`]) moves
1387 /// the tag to the back of the FIFO — `drain_ordered` therefore
1388 /// returns the LATEST capture under each tag exactly once, in
1389 /// the order of its most-recent insertion.
1390 ///
1391 /// FIFO eviction at [`MAX_STORED_SNAPSHOTS`] drops the oldest
1392 /// tags from `order` AND `reports` together, so a hot run that
1393 /// fired more than the cap returns the most recent
1394 /// [`MAX_STORED_SNAPSHOTS`] captures in insertion order; older
1395 /// captures are gone and [`Self::store`] already logged the
1396 /// eviction.
1397 pub fn drain_ordered(&self) -> Vec<(String, FailureDumpReport)> {
1398 let mut store = self.snapshots.lock_unpoisoned();
1399 let order = std::mem::take(&mut store.order);
1400 let mut reports = std::mem::take(&mut store.reports);
1401 // Stats / elapsed / boundary_offset / step_index are dropped
1402 // with the bridge — callers that need the parallel data must
1403 // use `drain_ordered_with_stats` instead.
1404 store.stats.clear();
1405 store.elapsed_ms.clear();
1406 store.boundary_offset_ms.clear();
1407 store.step_index.clear();
1408 let mut out: Vec<(String, FailureDumpReport)> = Vec::with_capacity(order.len());
1409 for tag in order {
1410 if let Some(report) = reports.remove(&tag) {
1411 out.push((tag, report));
1412 }
1413 }
1414 // Defensive: if any reports remained outside the order Vec
1415 // (an invariant violation that would only fire if a future
1416 // refactor of `store()` desynchronised the two), surface
1417 // them at the tail rather than dropping silently. Their
1418 // relative order is HashMap-iteration-arbitrary but at
1419 // least nothing is lost.
1420 for (tag, report) in reports {
1421 tracing::warn!(
1422 tag,
1423 "SnapshotBridge::drain_ordered: report present in `reports` \
1424 but missing from `order` — surfacing at tail (FIFO \
1425 invariant violation; please file)"
1426 );
1427 store.push_event(SnapshotBridgeEvent::DrainOrderingInvariantViolation {
1428 tag: tag.clone(),
1429 drain_variant: "drain_ordered",
1430 });
1431 out.push((tag, report));
1432 }
1433 out
1434 }
1435
1436 /// Take ownership of the captured snapshots in insertion order
1437 /// along with the parallel scx_stats JSON and per-sample
1438 /// elapsed-ms timestamps (`None` per slot when the tag was
1439 /// captured outside the periodic-capture path or when the stats
1440 /// request failed). Empties the bridge — every parallel map is
1441 /// drained in lock-step so a follow-up call returns an empty
1442 /// vec.
1443 ///
1444 /// The returned tuple shape `(tag, report, stats, elapsed_ms)`
1445 /// is the input to
1446 /// [`SampleSeries::from_drained`](crate::scenario::sample::SampleSeries::from_drained):
1447 /// the bridge owns the raw drainable shape, the higher-level
1448 /// `SampleSeries` view consumes it. Insertion order is the
1449 /// signal — periodic captures land
1450 /// `periodic_000`/`periodic_001`/… in monotonic wall-clock
1451 /// order, and the temporal-assertion patterns walk the vec
1452 /// expecting that ordering.
1453 pub fn drain_ordered_with_stats(&self) -> Vec<super::error::DrainedSnapshotEntry> {
1454 let mut store = self.snapshots.lock_unpoisoned();
1455 let order = std::mem::take(&mut store.order);
1456 let mut reports = std::mem::take(&mut store.reports);
1457 let mut stats = std::mem::take(&mut store.stats);
1458 let mut elapsed = std::mem::take(&mut store.elapsed_ms);
1459 let mut boundary_offset = std::mem::take(&mut store.boundary_offset_ms);
1460 let mut step_index = std::mem::take(&mut store.step_index);
1461 let mut out: Vec<super::error::DrainedSnapshotEntry> = Vec::with_capacity(order.len());
1462 // Bridge-absent stats slot collapses to the typed
1463 // `NoSchedulerBinary` reason: the capture path that produced
1464 // this tag never bundled a stats Result (non-periodic Op
1465 // capture, or periodic without a stats client wired). The
1466 // periodic path always bundles a Some(Result), so a `None`
1467 // here is always the "no scheduler binary" case.
1468 let stats_fallback = || Err(super::error::MissingStatsReason::NoSchedulerBinary);
1469 for tag in order {
1470 if let Some(report) = reports.remove(&tag) {
1471 let s = stats.remove(&tag).unwrap_or_else(stats_fallback);
1472 let e = elapsed.remove(&tag);
1473 let bo = boundary_offset.remove(&tag);
1474 let phase = step_index.remove(&tag);
1475 out.push(super::error::DrainedSnapshotEntry {
1476 tag,
1477 report,
1478 stats: s,
1479 elapsed_ms: e,
1480 boundary_offset_ms: bo,
1481 step_index: phase,
1482 });
1483 }
1484 }
1485 // Defensive tail for desynchronised maps (matches
1486 // `drain_ordered`'s tail behaviour). Any stats / elapsed /
1487 // step_index entries that were not paired with a tag in
1488 // `order` are dropped because they have no anchoring report
1489 // — surfacing them as orphaned tuples would invent a structure
1490 // no consumer expects.
1491 for (tag, report) in reports {
1492 tracing::warn!(
1493 tag,
1494 "SnapshotBridge::drain_ordered_with_stats: report present in `reports` \
1495 but missing from `order` — surfacing at tail (FIFO \
1496 invariant violation; please file)"
1497 );
1498 store.push_event(SnapshotBridgeEvent::DrainOrderingInvariantViolation {
1499 tag: tag.clone(),
1500 drain_variant: "drain_ordered_with_stats",
1501 });
1502 let s = stats.remove(&tag).unwrap_or_else(stats_fallback);
1503 let e = elapsed.remove(&tag);
1504 let bo = boundary_offset.remove(&tag);
1505 let phase = step_index.remove(&tag);
1506 out.push(super::error::DrainedSnapshotEntry {
1507 tag,
1508 report,
1509 stats: s,
1510 elapsed_ms: e,
1511 boundary_offset_ms: bo,
1512 step_index: phase,
1513 });
1514 }
1515 out
1516 }
1517
1518 /// Take ownership of all queued [`SnapshotBridgeEvent`]s in
1519 /// insertion order. Empties the internal log; a follow-up call
1520 /// returns an empty vec. Test authors call this after
1521 /// [`Self::drain_ordered`] / [`Self::drain_ordered_with_stats`]
1522 /// to inspect bridge-side conditions that fired during the
1523 /// scenario (eviction, overwrite, missing capture, invariant
1524 /// violation).
1525 ///
1526 /// Independent of the report drain — events accumulate even on
1527 /// scenarios that never call `drain_ordered*`, and reports
1528 /// remain reachable even on scenarios that never call
1529 /// `drain_events`. Tests that want to fail on a bridge event
1530 /// compose the streams: drain events, inspect, fail with
1531 /// `AssertResult::fail(AssertDetail::new(Other, ...))` if any
1532 /// variant is unexpected.
1533 ///
1534 /// When the events log hit [`MAX_STORED_EVENTS`] and FIFO-evicted
1535 /// older entries since the previous drain, a synthetic
1536 /// [`SnapshotBridgeEvent::EventLogTruncated`] is appended at the
1537 /// tail of the returned vec carrying the dropped count — the
1538 /// operator never silently loses events. The internal dropped
1539 /// counter resets to 0 after every drain.
1540 pub fn drain_events(&self) -> Vec<SnapshotBridgeEvent> {
1541 let mut store = self.snapshots.lock_unpoisoned();
1542 let mut events = std::mem::take(&mut store.events);
1543 if store.events_dropped > 0 {
1544 events.push(SnapshotBridgeEvent::EventLogTruncated {
1545 dropped_count: store.events_dropped,
1546 });
1547 store.events_dropped = 0;
1548 }
1549 events
1550 }
1551
1552 /// Non-draining count of queued [`SnapshotBridgeEvent`]s. Useful
1553 /// for "no bridge events fired" assertions without consuming
1554 /// the log — `assert_eq!(bridge.event_count(), 0)`. Does NOT
1555 /// include the synthetic
1556 /// [`SnapshotBridgeEvent::EventLogTruncated`] marker that
1557 /// [`Self::drain_events`] would append; that marker is
1558 /// drain-time-only and `events_dropped > 0` is observable via
1559 /// the next drain rather than via this counter.
1560 pub fn event_count(&self) -> usize {
1561 self.snapshots.lock_unpoisoned().events.len()
1562 }
1563
1564 /// Install this bridge as the active bridge for the calling
1565 /// thread. The bridge stays installed for the lifetime of the
1566 /// returned [`BridgeGuard`]; on drop the prior bridge (or
1567 /// `None`) is restored.
1568 ///
1569 /// Thread-local because [`execute_steps`](crate::scenario::ops::execute_steps)
1570 /// runs on the calling thread and `Op::CaptureSnapshot` only makes
1571 /// sense in that exact thread's call stack — installing a
1572 /// bridge process-wide would race against parallel test
1573 /// threads.
1574 ///
1575 /// # Cloning for drain
1576 ///
1577 /// `set_thread_local` consumes `self`. The bridge is `Clone`
1578 /// (internal state lives behind `Arc<Mutex<_>>`), so callers
1579 /// that need to inspect / drain the bridge after the scenario
1580 /// runs MUST clone before installing:
1581 ///
1582 /// ```ignore
1583 /// let bridge = SnapshotBridge::new(capture_cb);
1584 /// let drain_handle = bridge.clone(); // share the Arc
1585 /// let _guard = bridge.set_thread_local(); // consume the original
1586 /// // ... execute_scenario runs and records on the thread-local ...
1587 /// let snaps = drain_handle.drain_cgroup_procs();
1588 /// ```
1589 ///
1590 /// Both handles share the same underlying snapshot store
1591 /// (reports, kernel_ops, cgroup_procs); ops recorded against
1592 /// the thread-local are observable via the cloned handle.
1593 /// Forgetting to clone is the single most common bridge-flow
1594 /// footgun for tests that consume capture results.
1595 pub fn set_thread_local(self) -> BridgeGuard {
1596 let prev = ACTIVE_BRIDGE.with(|c| c.borrow_mut().replace(self));
1597 BridgeGuard { prev }
1598 }
1599}
1600
1601thread_local! {
1602 static ACTIVE_BRIDGE: std::cell::RefCell<Option<SnapshotBridge>> =
1603 const { std::cell::RefCell::new(None) };
1604}
1605
1606/// RAII guard returned by [`SnapshotBridge::set_thread_local`].
1607/// Restores the prior thread-local bridge on drop so a nested
1608/// scenario inside an outer one cannot leak its bridge into the
1609/// outer scope.
1610#[must_use = "BridgeGuard restores the prior bridge on drop; bind it"]
1611pub struct BridgeGuard {
1612 prev: Option<SnapshotBridge>,
1613}
1614
1615impl Drop for BridgeGuard {
1616 fn drop(&mut self) {
1617 let prev = self.prev.take();
1618 ACTIVE_BRIDGE.with(|c| {
1619 *c.borrow_mut() = prev;
1620 });
1621 }
1622}
1623
1624/// Run `f` with the active bridge if one is installed. When no
1625/// bridge is installed, returns `None` without invoking `f` — the
1626/// caller's responsibility to fall through to its own no-bridge
1627/// path.
1628pub fn with_active_bridge<R>(f: impl FnOnce(&SnapshotBridge) -> R) -> Option<R> {
1629 ACTIVE_BRIDGE.with(|c| c.borrow().as_ref().map(f))
1630}
1631
#[cfg(test)]
mod accessor_wait_tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU8, AtomicU64};
    use std::time::{Duration, Instant};

    fn bridge_with_accessor_state() -> (
        SnapshotBridge,
        Arc<AtomicU64>,
        Arc<AtomicU8>,
        Arc<vmm_sys_util::eventfd::EventFd>,
    ) {
        let seqno = Arc::new(AtomicU64::new(0));
        let worker_state = Arc::new(AtomicU8::new(accessor_worker_state::TRYING));
        let wake_evt = Arc::new(
            vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK)
                .expect("eventfd for accessor_dispatcher_wake test fixture"),
        );
        let cb: CaptureCallback = Arc::new(|_| None);
        let bridge = SnapshotBridge::new(cb).with_accessor_state(
            seqno.clone(),
            worker_state.clone(),
            wake_evt.clone(),
        );
        (bridge, seqno, worker_state, wake_evt)
    }

    #[test]
    fn wait_no_accessor_state_returns_ok_zero_immediately() {
        // Bridge built without with_accessor_state — wait degenerates
        // to Ok(0) so unit-test scenarios that don't spawn a real
        // worker don't synthesize one.
        let cb: CaptureCallback = Arc::new(|_| None);
        let bridge = SnapshotBridge::new(cb);
        let deadline = Instant::now() + Duration::from_secs(60);
        assert_eq!(
            bridge
                .wait_for_accessor_publish_advance(0, deadline, "Op::Test")
                .unwrap(),
            0
        );
    }

    #[test]
    fn wait_seqno_already_advanced_returns_immediately() {
        // Pre-advance the seqno so the tight initial check returns
        // without polling. Validates the "publish landed between
        // dispatch's load and wait call" race window is handled.
        let (bridge, seqno, _, _) = bridge_with_accessor_state();
        seqno.store(5, std::sync::atomic::Ordering::Release);
        let deadline = Instant::now() + Duration::from_secs(60);
        assert_eq!(
            bridge
                .wait_for_accessor_publish_advance(3, deadline, "Op::Test")
                .unwrap(),
            5
        );
    }

    #[test]
    fn wait_observes_worker_publish() {
        // Pin: worker bumps seqno + pulses dispatcher wake fd on a
        // side thread; dispatch-style wait observes the advance
        // within the deadline. Event-driven path — the wake fd
        // unblocks poll within a kernel-scheduling tick, so the
        // test should complete well under 500 ms even though the
        // deadline is 2 s.
        //
        // The publisher thread needs only the seqno and the wake fd.
        // The bridge stays alive on this thread: `bridge` is used by
        // the wait below and is dropped only at the end of the test,
        // after `join`. So no bridge clone is moved into the closure.
        // A `let _ = clone;` statement inside a `move` closure does not
        // capture the variable under edition-2021 disjoint closure
        // captures, so it would never have kept anything alive.
        let (bridge, seqno, _, wake_evt) = bridge_with_accessor_state();
        let seqno_for_thread = seqno.clone();
        let wake_for_thread = wake_evt.clone();
        let publisher = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(100));
            seqno_for_thread.fetch_add(1, std::sync::atomic::Ordering::Release);
            // Pulse the wake fd in lock-step with the seqno bump,
            // matching the worker's contract at the freeze_coord
            // publish site.
            let _ = wake_for_thread.write(1);
        });
        let deadline = Instant::now() + Duration::from_secs(2);
        let t0 = Instant::now();
        let observed = bridge
            .wait_for_accessor_publish_advance(0, deadline, "Op::Test")
            .expect("publish observed within deadline");
        let elapsed = t0.elapsed();
        publisher.join().unwrap();
        assert_eq!(observed, 1);
        // Event-driven wake — the wait should return shortly after
        // the 100ms publisher sleep, NOT at the 2s deadline. A 500ms
        // ceiling leaves plenty of slack for slow CI hosts while
        // still catching a regression where wait falls back to a
        // pure-sleep loop.
        assert!(
            elapsed < Duration::from_millis(500),
            "wait did not wake event-driven; took {elapsed:?} \
             (expected close to 100ms)"
        );
    }

    #[test]
    fn wait_bails_on_worker_failed_permanently() {
        // Pin: when worker_state flips to FAILED_PERMANENTLY the
        // wait surfaces the terminal-worker diagnostic instead of
        // blocking the full deadline.
        let (bridge, _, worker_state, _) = bridge_with_accessor_state();
        worker_state.store(
            accessor_worker_state::FAILED_PERMANENTLY,
            std::sync::atomic::Ordering::Release,
        );
        let deadline = Instant::now() + Duration::from_secs(60);
        let t0 = Instant::now();
        let err = bridge
            .wait_for_accessor_publish_advance(0, deadline, "Op::Test")
            .expect_err("expected terminal-worker bail");
        let elapsed = t0.elapsed();
        assert!(
            elapsed < Duration::from_millis(100),
            "wait did not surface terminal state quickly; took {elapsed:?}"
        );
        let msg = err.to_string();
        assert!(
            msg.contains("FAILED_PERMANENTLY"),
            "bail message missing terminal sentinel: {msg}"
        );
        assert!(msg.contains("Op::Test"), "bail missing op label: {msg}");
    }

    #[test]
    fn wait_bails_on_deadline_with_worker_state_in_diagnostic() {
        // Pin: deadline-exceeded surfaces with the worker state
        // attached so the diagnostic distinguishes stuck-in-Trying
        // (transient) from FAILED_PERMANENTLY (terminal).
        let (bridge, _, _, _) = bridge_with_accessor_state();
        let deadline = Instant::now() + Duration::from_millis(120);
        let err = bridge
            .wait_for_accessor_publish_advance(0, deadline, "Op::Test")
            .expect_err("expected deadline bail");
        let msg = err.to_string();
        assert!(
            msg.contains("worker state = 0"),
            "deadline diagnostic missing worker_state: {msg}"
        );
        assert!(
            msg.contains("Trying"),
            "deadline diagnostic missing state name table: {msg}"
        );
    }

    #[test]
    fn accessor_publish_seqno_returns_zero_without_accessor_state() {
        let cb: CaptureCallback = Arc::new(|_| None);
        let bridge = SnapshotBridge::new(cb);
        assert_eq!(bridge.accessor_publish_seqno(), 0);
    }

    #[test]
    fn accessor_publish_seqno_reads_atomic() {
        let (bridge, seqno, _, _) = bridge_with_accessor_state();
        assert_eq!(bridge.accessor_publish_seqno(), 0);
        seqno.store(42, std::sync::atomic::Ordering::Release);
        assert_eq!(bridge.accessor_publish_seqno(), 42);
    }
}
1797
#[cfg(test)]
mod periodic_tag_tests {
    use super::*;

    /// Non-placeholder single-snapshot report, standing in for a real
    /// capture stored on the bridge.
    fn real_report() -> crate::monitor::dump::FailureDumpReport {
        crate::monitor::dump::FailureDumpReport {
            schema: crate::monitor::dump::SCHEMA_SINGLE.to_string(),
            is_placeholder: false,
            ..Default::default()
        }
    }

    /// The canonical coordinator shape, `"periodic_"` followed by
    /// exactly three ASCII digits, MUST be accepted. Together with the
    /// rejection table below, this pins the matcher to that exact shape.
    /// A regression that loosened it (for example back to
    /// `starts_with("periodic_")`) would fail the rejection cases.
    #[test]
    fn is_periodic_tag_accepts_canonical_three_digit_index() {
        let canonical = ["periodic_000", "periodic_007", "periodic_123", "periodic_999"];
        for tag in canonical {
            assert!(is_periodic_tag(tag), "canonical tag {tag:?} must be accepted");
        }
    }

    /// Tags that merely begin with `"periodic_"` (or resemble it) but
    /// are not exactly `periodic_NNN` MUST be rejected. This is what
    /// keeps user `Op::CaptureSnapshot` tags from colliding with
    /// coordinator tags and inflating `periodic_real_count`.
    #[test]
    fn is_periodic_tag_rejects_user_tag_collisions() {
        let non_canonical = [
            "periodic_kaslr",
            "periodic_user_baseline",
            "periodic_",
            "periodic_1",
            "periodic_12",
            "periodic_1234",
            "periodic_00a",
            "periodic_007 ",
            "PERIODIC_000",
            "capture_my_thing",
            "",
            "periodic",
        ];
        for tag in non_canonical {
            assert!(!is_periodic_tag(tag), "non-canonical tag {tag:?} must be rejected");
        }
    }

    /// `periodic_real_count` counts only real captures stored under
    /// canonical `periodic_NNN` tags. The bridge here holds a real
    /// `periodic_000`, a real user capture `periodic_kaslr`, and a
    /// placeholder `periodic_001`, so the count must be exactly 1.
    #[test]
    fn periodic_real_count_ignores_user_tag_with_periodic_prefix() {
        let cb: CaptureCallback = Arc::new(|_| None);
        let bridge = SnapshotBridge::new(cb);

        // Coordinator-issued periodic capture: counts.
        bridge.store("periodic_000", real_report());
        // User capture that happens to share the prefix: must not count.
        bridge.store("periodic_kaslr", real_report());
        // Canonical tag but a placeholder: fired, yet not real.
        bridge.store(
            "periodic_001",
            crate::monitor::dump::FailureDumpReport::placeholder("rendezvous timed out"),
        );

        assert_eq!(
            bridge.periodic_real_count(),
            1,
            "only the canonical periodic_000 real capture counts; \
             user tag periodic_kaslr (even though real) must be \
             excluded by the strict matcher",
        );
    }
}
1875
#[cfg(test)]
mod cgroup_procs_bridge_tests {
    use super::*;

    /// Clones share state through the inner `Arc`. A clone taken
    /// before `set_thread_local` consumes the original must see what
    /// the original records via `record_cgroup_procs`. That is the
    /// pattern used by tests/cgroup_capture_procs_e2e.rs and by the doc
    /// example on `Op::CaptureCgroupProcs`. If a refactor made the
    /// `cgroup_procs` storage per-handle instead of shared, the e2e
    /// drain pattern would break silently; this test catches that.
    #[test]
    fn snapshot_bridge_record_cgroup_procs_visible_via_arc_clone() {
        let bridge_a = SnapshotBridge::new(Arc::new(|_| None));
        let bridge_b = bridge_a.clone();
        bridge_a.record_cgroup_procs("tag".into(), "cg".into(), vec![1, 2]);

        let seen_by_b = bridge_b.drain_cgroup_procs();
        assert_eq!(seen_by_b.len(), 1);
        let snap = &seen_by_b[0];
        assert_eq!(snap.tag, "tag");
        assert_eq!(snap.cgroup, "cg");
        assert_eq!(snap.pids, vec![1, 2]);

        // The drain went through B, but the Vec behind the
        // Arc<Mutex> is shared, so A must now observe it empty too.
        assert!(
            bridge_a.drain_cgroup_procs().is_empty(),
            "Arc-shared field — B's drain empties A too",
        );
    }

    /// `drain_cgroup_procs` moves entries out (`std::mem::take`) and
    /// leaves the log empty. If the drain cloned instead of taking,
    /// snapshots would repeat on every drain; this test would catch it.
    #[test]
    fn snapshot_bridge_drain_cgroup_procs_consumes_via_mem_take() {
        let bridge = SnapshotBridge::new(Arc::new(|_| None));
        bridge.record_cgroup_procs("t1".into(), "cg".into(), vec![1]);
        bridge.record_cgroup_procs("t2".into(), "cg".into(), vec![2]);

        let first = bridge.drain_cgroup_procs();
        assert_eq!(first.len(), 2);

        // The first drain emptied the log, so an immediate re-drain
        // yields nothing.
        let second = bridge.drain_cgroup_procs();
        assert!(second.is_empty(), "drain must consume; got: {second:?}");

        // Recording after a drain starts a fresh log: only the new entry.
        bridge.record_cgroup_procs("t3".into(), "cg".into(), vec![3]);
        let third = bridge.drain_cgroup_procs();
        assert_eq!(third.len(), 1);
        assert_eq!(third[0].tag, "t3");
    }

    /// `cgroup_procs_by_tag` is a read-only lookup in the spirit of
    /// `kernel_op_value(tag)`. It finds a recorded snapshot by tag and
    /// leaves the drain log intact, so a later `drain_cgroup_procs`
    /// still returns every entry.
    #[test]
    fn snapshot_bridge_cgroup_procs_by_tag_is_non_draining() {
        let bridge = SnapshotBridge::new(Arc::new(|_| None));
        bridge.record_cgroup_procs("tag_a".into(), "cg_x".into(), vec![10, 20]);
        bridge.record_cgroup_procs("tag_b".into(), "cg_y".into(), vec![30]);

        let hit = bridge.cgroup_procs_by_tag("tag_a").expect("tag_a present");
        assert_eq!(hit.pids, vec![10, 20]);
        assert_eq!(hit.cgroup, "cg_x");

        // An unrecorded tag finds nothing.
        assert!(bridge.cgroup_procs_by_tag("tag_unknown").is_none());

        // The lookups above consumed nothing: both entries still drain.
        let all = bridge.drain_cgroup_procs();
        assert_eq!(all.len(), 2);

        // Once drained, the lookup comes back empty.
        assert!(bridge.cgroup_procs_by_tag("tag_a").is_none());
    }
}