xenia-rs/crates/xenia-gpu/src/handle.rs
MechaCat02 7a1b6b3306 fix(gpu): GPUBUG-DRAIN-001 — silence VdSwap PM4 fallback under --parallel
The Phase-C VdSwap PM4 ring path (commit 82f3d61) emits two
"PM4_XE_SWAP not consumed by drain" warnings when running:

  exec sylpheed.iso --ui --quiet --halt-on-deadlock \
    --parallel --reservations-table

Lockstep -n 100M never trips it. Two distinct race windows:

(a) Inline backend (--ui forces it): drain(mem, 4096) hit its
    fixed packet cap before reaching the PM4_XE_SWAP we'd just
    injected at the WPTR tail. With 6 CPU threads, the ring
    accumulates >4096 packets between vd_swap callbacks.

(b) Threaded backend (--parallel without --ui): the worker's
    DrainFence handler has a 900 ms deadline and game-batched
    IBs (8-10 M packets observed) keep it from reaching the
    tail in any reasonable budget. If the worker later drained
    past the injected packet anyway, the swap would be counted
    twice: once via PM4_XE_SWAP and once via the safety-net
    direct notify.
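Race window (a) can be reproduced with a toy ring model. The sketch below is hypothetical: `ToyRing`, `drain_capped`, `drain_until`, `ring_with_backlog` and the `XE_SWAP` marker are illustrative names, not xenia-rs APIs, and it assumes one ring slot per packet. With 5000 queued packets plus the injected swap at the tail, a 4096-packet cap stops short of the swap, while a drain that runs until the read index reaches the sampled WPTR consumes it.

```rust
/// Hypothetical marker standing in for the PM4_XE_SWAP packet.
const XE_SWAP: u32 = 0xFFFF_FFFF;

/// Toy ring (not xenia-rs code): one `u32` per packet, `read` is the
/// read index, and `packets.len()` plays the role of WPTR.
struct ToyRing {
    packets: Vec<u32>,
    read: usize,
}

impl ToyRing {
    /// Old behavior: execute at most `cap` packets, regardless of WPTR.
    /// Returns whether the swap marker was consumed.
    fn drain_capped(&mut self, cap: usize) -> bool {
        let mut saw_swap = false;
        for _ in 0..cap {
            if self.read == self.packets.len() {
                break; // ring empty
            }
            saw_swap |= self.packets[self.read] == XE_SWAP;
            self.read += 1;
        }
        saw_swap
    }

    /// New behavior: execute until the read index reaches `target`
    /// (the WPTR sampled when the drain started).
    fn drain_until(&mut self, target: usize) -> bool {
        debug_assert!(target <= self.packets.len());
        let mut saw_swap = false;
        while self.read != target {
            saw_swap |= self.packets[self.read] == XE_SWAP;
            self.read += 1;
        }
        saw_swap
    }
}

/// Build a ring holding `backlog` ordinary packets followed by the swap.
fn ring_with_backlog(backlog: usize) -> ToyRing {
    let mut packets = vec![0u32; backlog];
    packets.push(XE_SWAP); // injected at the WPTR tail
    ToyRing { packets, read: 0 }
}
```

With a backlog below 4096 packets both drains consume the swap, which is consistent with lockstep runs never tripping the warning.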

Three changes:

* gpu_system.rs: new `drain_until_wptr(target, time_budget)`
  that drains using canary's `WorkerThreadMain` predicate
  (read_offset != target) instead of a fixed packet count.
  Its 900 ms deadline mirrors the threaded DrainFence handler.

* handle.rs: inline `drain_to_current_wptr` switches to
  `drain_until_wptr`. DrainFence handler publishes the digest
  mirror BEFORE replying so the CPU's post-drain
  `digest_snapshot` sees fresh stats.

* exports.rs (vd_swap): skip the PM4 ring injection
  unconditionally and route swap notification through
  `notify_xe_swap` directly. Tail-injection is unreliable
  under --parallel for both backends. The slot-0
  fetch-constant patch is deferred (GPUBUG-FETCH-PATCH-001);
  draws=0 today so a stale slot 0 has no observable effect.
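A minimal sketch of the first two changes, under stated assumptions: `RingView` is a hypothetical stand-in for `GpuSystem` with fixed-size packets, and the real `drain_until_wptr` also takes guest memory and executes actual PM4 packets. It shows the predicate-plus-deadline loop (the target is reduced modulo the ring size, since the stored WPTR is not modulo'd) and the publish-then-reply ordering of the fence. std `mpsc::sync_channel(1)` stands in for crossbeam's `bounded(1)`, and `answer_fence` / `cpu_fence_then_snapshot` are illustrative names.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical ring view; every packet is `packet_dwords` long.
struct RingView {
    size_dwords: u32,
    read_offset: u32,
    packet_dwords: u32,
    executed: u32,
}

impl RingView {
    /// Toy "execute one packet": advance the read offset with wraparound.
    fn execute_one(&mut self) {
        self.read_offset = (self.read_offset + self.packet_dwords) % self.size_dwords;
        self.executed += 1;
    }

    /// Drain until `read_offset == target % size_dwords` or `budget`
    /// elapses. Returns (packets executed, target reached?).
    fn drain_until_wptr(&mut self, target: u32, budget: Duration) -> (u32, bool) {
        let target = target % self.size_dwords;
        let deadline = Instant::now() + budget;
        let start = self.executed;
        while self.read_offset != target {
            if Instant::now() >= deadline {
                return (self.executed - start, false);
            }
            self.execute_one();
        }
        (self.executed - start, true)
    }
}

/// Worker side of a drain fence: publish the stats mirror first, then reply.
fn answer_fence(mirror: &Mutex<u64>, swaps_seen: u64, reply: &SyncSender<()>) {
    *mirror.lock().unwrap() = swaps_seen; // guard dropped at end of statement
    let _ = reply.send(());
}

/// CPU side: post a fence, wait up to 1 s, then read the mirror.
fn cpu_fence_then_snapshot(mirror: Arc<Mutex<u64>>) -> u64 {
    let (tx, rx) = sync_channel::<()>(1);
    let worker_mirror = Arc::clone(&mirror);
    // Pretend the worker's drain consumed one PM4_XE_SWAP.
    let worker = thread::spawn(move || answer_fence(&worker_mirror, 1, &tx));
    rx.recv_timeout(Duration::from_secs(1)).expect("drain fence timed out");
    worker.join().unwrap();
    // Bind before returning so the guard is dropped before `mirror`.
    let swaps = *mirror.lock().unwrap();
    swaps
}
```

Swapping the two statements in `answer_fence` reintroduces the race: the CPU can wake on the reply and read the mirror before the store lands.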

Verification:

* cargo test --workspace --release: 556 passing (unchanged).

* Lockstep -n 100M --stable-digest: bit-identical to
  pre-fix master HEAD aa3f1d3.
  {instructions:100000002, imports:987685, unimpl:0, draws:0,
   swaps:2, ...}

* check --parallel --reservations-table -n 30M: 0 warnings
  (was 2). swaps=2.

* exec --gpu-inline --parallel --reservations-table -n 30M:
  0 warnings (was 2 with drained=8M-10M observed). swaps=2.

Audit IDs: GPUBUG-DRAIN-001 (closed),
GPUBUG-FETCH-PATCH-001 (filed, deferred).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 17:12:15 +02:00

//! GPU host-thread plumbing: control-plane types, the CPU-side handle,
//! and the GPU-side worker with its run loop.
//!
//! Introduced in M1 step 1 of the concurrency rollout (see
//! `/home/fabi/.claude/plans/good-plese-implement-in-zesty-hickey.md`)
//! as types only; later steps added [`GpuWorker::run`],
//! [`spawn_gpu_worker`] and the [`GpuBackend`] dispatch enum. The core
//! shapes are:
//!
//! - [`GpuCommand`]: the control-plane RPC enum sent CPU→GPU.
//! - [`GpuHandle`]: the CPU-side proxy: command sender, cloned MMIO
//!   atomics, an interrupt receiver, plus the shared digest snapshot and
//!   shutdown flag. The worker thread's `JoinHandle` is returned
//!   separately by [`spawn_gpu_worker`].
//! - [`GpuWorker`]: the GPU-side owned state (the `GpuSystem` itself plus
//!   the receive end of the command channel and the sender for
//!   interrupts).
//!
//! Construction goes through [`GpuSystem::into_handle_with_shutdown`]
//! (or the test convenience [`GpuSystem::into_handle`]), which splits a
//! freshly-built `GpuSystem` into `(GpuWorker, GpuHandle)`. The worker
//! keeps the actual GPU state plus `cmd_rx`/`int_tx`; the handle carries
//! `cmd_tx`, `int_rx`, and clones of the `Arc<AtomicU32>` MMIO mailboxes
//! so the CPU producer side can write WPTR / read RPTR without going
//! through the channel.
//!
//! Before step 4 wired the worker into a real thread, no caller invoked
//! `into_handle` on the live `KernelState.gpu`; the constructor existed
//! for the unit tests below and for the synthetic-test path.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
use xenia_memory::GuestMemory;

use crate::gpu_system::{ExecOutcome, GpuMmio, GpuStats, GpuSystem, PendingInterrupt};
/// Reply channel for a [`GpuCommand::DrainFence`]. Single-shot
/// `bounded(1)`: the GPU sends `()` once it has drained the ring up to the
/// requested wptr, and the CPU thread blocks on `recv` until then. M1 step 5
/// is the first user of this; step 1 only validates the type fits.
pub type DrainReply = crossbeam_channel::Sender<()>;

/// Control-plane RPC the CPU thread sends to the GPU thread. Data-plane
/// signals (WPTR/RPTR/INT_STATUS) ride atomic mailboxes instead — see
/// [`GpuMmio`]. Channels are for events that need ordered delivery and
/// (sometimes) a reply.
#[derive(Debug)]
pub enum GpuCommand {
    /// `VdInitializeRingBuffer(base, size_log2)`. The kernel hands the GPU
    /// the guest-physical base address and dword-size of the primary ring.
    /// Inline mode still calls [`GpuSystem::initialize_ring_buffer`]
    /// synchronously; threaded mode (step 4 onward) sends this command
    /// instead.
    InitializeRing {
        base: u32,
        size_log2: u32,
    },
    /// `VdEnableRingBufferRPtrWriteBack(addr, block_size)`. The kernel
    /// supplies a guest-memory address into which the GPU should mirror its
    /// internal `read_offset_dwords` after each packet.
    EnableRptrWriteback {
        addr: u32,
        block_size_log2: u32,
    },
    /// Block until the GPU's read pointer has caught up with the supplied
    /// `target_wptr` (in dwords). The reply is sent on `reply_tx` once the
    /// drain completes (or the worker hits its internal deadline). Used by
    /// `vd_swap` to preserve the synchronous "GPU has caught up to the
    /// guest's wptr at swap time" semantics. The current worker handler
    /// does not consult `target_wptr`; it drains to whatever WPTR the MMIO
    /// atomic exposes, which the CPU side published before sending.
    DrainFence {
        target_wptr: u32,
        reply_tx: DrainReply,
    },
    /// Bump the swap counter and post a swap interrupt. Sent by `vd_swap`
    /// when the guest commits a frame; the worker's [`GpuSystem::notify_xe_swap`]
    /// updates `swaps_seen`/`last_swap` and pushes an `InterruptSource::Swap`
    /// onto the (M1.6) `int_tx` channel. Fire-and-forget — no reply.
    NotifyXeSwap {
        frontbuffer_phys: u32,
        width: u32,
        height: u32,
    },
    /// Tear-down signal. The worker sets the shared shutdown flag and
    /// leaves its run loop; its `GpuSystem` is dropped when `run` returns,
    /// and the host thread can then be joined.
    Shutdown,
}
/// CPU-side proxy that the kernel and the interpreter loop hold. It owns:
///
/// - The send end of the GPU command channel (`cmd_tx`).
/// - The receive end of the GPU→CPU interrupt channel (`int_rx`). Step 6
///   migrates `kernel.gpu.pending_interrupts` onto this channel.
/// - Cloned `Arc<AtomicU32>` MMIO mailboxes so MMIO write callbacks (which
///   already capture these `Arc`s) can keep working unchanged after the
///   `GpuSystem` itself moves to the worker thread.
///
/// `GpuHandle` is `Send + Sync` by virtue of every field being so already
/// (`Sender`/`Receiver` from crossbeam are `Send + Sync` for `Send` payloads;
/// `Arc<AtomicU32>` and `Arc<AtomicBool>` are `Send + Sync`, as is
/// `Arc<Mutex<T>>` for `T: Send`). No explicit impl is needed; the
/// `handle_is_send_sync` test pins this down.
///
/// Use [`Self::send_cmd`] to post commands so the parker wake invariant is
/// preserved (see M1.7): every channel send must be paired with a wake of
/// the worker thread. Otherwise a worker that has already passed its
/// `cmd_rx.is_empty()` check can park through the new command for up to
/// [`WORKER_PARK_TIMEOUT`].
#[derive(Debug, Clone)]
pub struct GpuHandle {
    /// Control-plane sender. `clone()`-able so multiple call sites can post
    /// commands; crossbeam's `Sender` is `Send + Sync + Clone`.
    pub cmd_tx: Sender<GpuCommand>,
    /// Interrupt drain channel. Since M1.6 the worker forwards its
    /// `GpuSystem`'s pending interrupts onto the matching `int_tx`.
    pub int_rx: Receiver<PendingInterrupt>,
    /// Direct access to the MMIO mailbox arcs. The CP_RB_WPTR write that
    /// the guest does inside the MMIO region callback already lands in
    /// `mmio.cp_rb_wptr` (an `Arc<AtomicU32>`); cloning these here lets
    /// CPU-side code do the same atomic read/write that the inline path
    /// did, without channel hops.
    pub mmio: GpuMmio,
    /// Read-side snapshot of GPU stats / cache sizes. Refreshed by the
    /// worker after each outer loop iteration that did work and before
    /// each `DrainFence` reply. Read by [`GpuBackend::digest_snapshot`]
    /// and the HUD; cheap copy-out under a brief lock acquisition.
    pub digest: Arc<std::sync::Mutex<GpuDigestSnapshot>>,
    /// Shared shutdown flag — set by the CPU side during teardown, read
    /// by the worker each loop iteration. Cloned here so callers without
    /// access to the worker side (e.g. drop guards) can still signal exit.
    pub shutdown: Arc<AtomicBool>,
}

impl GpuHandle {
    /// Post a command to the worker and wake it. Wraps the raw
    /// `cmd_tx.send` with the M1.7 parker discipline: set
    /// `wake_pending=true` (Release) and `unpark()` the worker thread.
    /// Without the wake, channel sends would not surface to a parked
    /// worker — `crossbeam_channel::Sender::send` doesn't unpark by
    /// itself.
    pub fn send_cmd(
        &self,
        cmd: GpuCommand,
    ) -> Result<(), crossbeam_channel::SendError<GpuCommand>> {
        let r = self.cmd_tx.send(cmd);
        if r.is_ok() {
            self.mmio.wake_pending.store(true, Ordering::Release);
            if let Ok(g) = self.mmio.worker_thread.lock() {
                if let Some(t) = g.as_ref() {
                    t.unpark();
                }
            }
        }
        r
    }
}
/// Periodically-refreshed read-side snapshot of GPU state. Updated by the
/// worker thread after each outer loop iteration that did work and before
/// each `DrainFence` reply; consumed by the CPU side for the run-digest at
/// end-of-run, the HUD, etc. Held in an `Arc<Mutex<GpuDigestSnapshot>>`
/// shared between worker and handle.
///
/// Snapshot is intentionally restricted to `u64` counters — they're
/// cheap to copy and survive past the worker's lifetime (so the digest
/// can be computed even after the worker has shut down). For the
/// inline backend [`GpuBackend::digest_snapshot`] computes the same view
/// directly from the live `GpuSystem` without any locking.
#[derive(Debug, Clone, Default)]
pub struct GpuDigestSnapshot {
    pub stats: GpuStats,
    pub shader_blobs_live: u64,
    pub texture_cache_entries: u64,
    pub texture_decodes: u64,
}

/// Reverse end of the command and interrupt channels. The GPU thread
/// (`GpuWorker::run`) reads from `cmd_rx`, pushes onto `int_tx` (M1.6+),
/// owns the actual `GpuSystem`, and refreshes the shared
/// `GpuDigestSnapshot`. Built by [`GpuSystem::into_handle`] alongside its
/// matching [`GpuHandle`].
///
/// (No `#[derive(Debug)]` because `GpuSystem` itself isn't `Debug`; we
/// don't need it on the worker for any production purpose.)
pub struct GpuWorker {
    /// The GPU subsystem itself. The worker thread is its exclusive
    /// owner once spawned.
    pub system: GpuSystem,
    /// Receive end of the control channel.
    pub cmd_rx: Receiver<GpuCommand>,
    /// Send end of the interrupt channel. M1.6 wires this in.
    pub int_tx: Sender<PendingInterrupt>,
    /// Shared digest snapshot, refreshed by the run loop.
    pub digest: Arc<std::sync::Mutex<GpuDigestSnapshot>>,
    /// Shutdown flag. Set by `shutdown_and_join_with_timeout`; the worker
    /// loop checks `Acquire` each iteration.
    pub shutdown: Arc<AtomicBool>,
}
impl GpuSystem {
    /// Split a freshly-built `GpuSystem` into a `(GpuWorker, GpuHandle)`
    /// pair. The handle keeps cloned `Arc<AtomicU32>` MMIO mailboxes plus
    /// the channel sender; the worker keeps the system itself plus the
    /// channel receiver and the interrupt sender.
    ///
    /// Channels are unbounded (`crossbeam_channel::unbounded`) because the
    /// CPU side never blocks on a control-plane send — guest-driven export
    /// rates are bounded by the interpreter throughput, and interrupts are
    /// already coalesced upstream by the kernel.
    ///
    /// Caller supplies a shared `shutdown: Arc<AtomicBool>` so the worker
    /// and the CPU side can coordinate teardown. For unit tests that don't
    /// care about lifecycle, [`Self::into_handle`] supplies a fresh flag.
    pub fn into_handle_with_shutdown(
        self,
        shutdown: Arc<AtomicBool>,
    ) -> (GpuWorker, GpuHandle) {
        let mmio = self.mmio.clone();
        let (cmd_tx, cmd_rx) = unbounded::<GpuCommand>();
        let (int_tx, int_rx) = unbounded::<PendingInterrupt>();
        let digest = Arc::new(std::sync::Mutex::new(GpuDigestSnapshot::default()));
        let worker = GpuWorker {
            system: self,
            cmd_rx,
            int_tx,
            digest: digest.clone(),
            shutdown: shutdown.clone(),
        };
        let handle = GpuHandle {
            cmd_tx,
            int_rx,
            mmio,
            digest,
            shutdown,
        };
        (worker, handle)
    }

    /// Convenience for tests: allocate a fresh shutdown flag and split.
    pub fn into_handle(self) -> (GpuWorker, GpuHandle) {
        self.into_handle_with_shutdown(Arc::new(AtomicBool::new(false)))
    }
}
/// Polling interval for the no-op worker's shutdown check. A short sleep
/// avoids burning a host core while still keeping shutdown latency under
/// 10 ms, well below the 1 s defensive timeout in
/// [`shutdown_and_join_with_timeout`].
const NOOP_WORKER_POLL: Duration = Duration::from_millis(2);

/// Maximum time the worker waits in `park_timeout` before re-checking
/// shutdown / commands / ring state. With `unpark()` on every guest WPTR
/// write the typical wake latency is microseconds; this is the upper
/// bound for the shutdown / quiescent-state polling cadence. 16 ms aligns
/// with vsync cadence on a 60 Hz host and bounds shutdown latency at the
/// same value.
const WORKER_PARK_TIMEOUT: Duration = Duration::from_millis(16);

/// Cap on packets executed per outer-loop iteration before the worker
/// re-checks shutdown / commands / digest publish. Mirrors the inline-mode
/// `gpu_runs = max(1, min(64, executed_this_round / 6))` pacer ceiling.
const WORKER_PACKETS_PER_ITER: u32 = 64;

/// Backend for the kernel's `gpu` field. The two variants share a thin
/// dispatch layer (forwarding methods on this enum) so call sites in
/// `xenia-kernel` exports stay terse.
///
/// - [`GpuBackend::Inline`] keeps the legacy synchronous path: the CPU
///   thread calls `kernel.gpu.execute_one(mem)` directly each scheduler
///   round. Selected by `--gpu-inline` (rollback flag) or implied by
///   `--ui` until the UI worker is migrated.
/// - [`GpuBackend::Threaded`] (**default at M1.9**) hands `GpuSystem`
///   ownership to a dedicated host thread; the CPU thread holds a
///   [`GpuHandle`] proxy and talks to the worker via channels + the
///   shared MMIO atomics.
///
/// `GpuBackend` itself is `Send` (the inline variant carries `GpuSystem`,
/// which is Send-able as long as nothing inside it is `!Send` — it isn't);
/// the threaded variant carries a `Send + Sync` handle.
pub enum GpuBackend {
    Inline(GpuSystem),
    Threaded(GpuHandle),
}
impl GpuBackend {
    /// Borrow the MMIO mailbox struct (no clone needed, though cloning a
    /// `GpuMmio` is cheap since it only clones `Arc`s). Both backends hand
    /// back the same shared atomics, so MMIO region callbacks installed via
    /// [`crate::build_mmio_region`] route guest writes to the same atomics
    /// the worker reads.
    pub fn mmio(&self) -> &GpuMmio {
        match self {
            GpuBackend::Inline(s) => &s.mmio,
            GpuBackend::Threaded(h) => &h.mmio,
        }
    }

    /// Convenience: borrow the inline `GpuSystem` for code paths that
    /// haven't been generalized to the `Threaded` variant yet (vd_swap's
    /// drain, the various `state.gpu.X` reads in M1.5+ work). Returns
    /// `None` in threaded mode; the caller's responsibility is to handle
    /// that gracefully — typically by treating the operation as a no-op
    /// or routing it through a command (see `vd_swap` notes).
    pub fn as_inline(&self) -> Option<&GpuSystem> {
        match self {
            GpuBackend::Inline(s) => Some(s),
            GpuBackend::Threaded(_) => None,
        }
    }

    /// Mutable counterpart of [`Self::as_inline`].
    pub fn as_inline_mut(&mut self) -> Option<&mut GpuSystem> {
        match self {
            GpuBackend::Inline(s) => Some(s),
            GpuBackend::Threaded(_) => None,
        }
    }

    /// Forward `VdInitializeRingBuffer`. Inline mode applies it directly;
    /// threaded mode posts an `InitializeRing` command onto the worker
    /// channel.
    pub fn initialize_ring_buffer(&mut self, base: u32, size_log2: u32) {
        match self {
            GpuBackend::Inline(s) => s.initialize_ring_buffer(base, size_log2),
            GpuBackend::Threaded(h) => {
                let _ = h.send_cmd(GpuCommand::InitializeRing { base, size_log2 });
            }
        }
    }

    /// Forward `VdEnableRingBufferRPtrWriteBack`.
    pub fn enable_rptr_writeback(&mut self, addr: u32, block_log2: u32) {
        match self {
            GpuBackend::Inline(s) => s.enable_rptr_writeback(addr, block_log2),
            GpuBackend::Threaded(h) => {
                let _ = h.send_cmd(GpuCommand::EnableRptrWriteback {
                    addr,
                    block_size_log2: block_log2,
                });
            }
        }
    }

    /// Bump `CP_RB_WPTR` by `dwords`. Both backends route the bump through
    /// the shared MMIO atomic mailbox with a single wrapping `fetch_add`
    /// (`Release`). Inline mode then picks up the new value on its next
    /// `sync_with_mmio`; threaded mode's worker observes the same atomic
    /// and folds it into its ring view.
    ///
    /// Note: the stored value is not reduced modulo the ring size. The
    /// reading side's `sync_with_mmio` does the `% ring.size_dwords` step
    /// before updating the local ring view, which is the only place a
    /// `size_dwords` reference exists. Because the ring size is configured
    /// as a power of two (`size_log2`), that `%` step stays correct even
    /// across a u32 wrap. Practical wptr drift before the u32 wraps is
    /// `2^32 / 64 ≈ 67M` 64-dword VdSwap-style bumps, safely above any
    /// plausible single-run total.
    pub fn extend_write_ptr_by(&mut self, dwords: u32) {
        // A single atomic read-modify-write: a separate load + store could
        // lose a concurrent guest MMIO write to `CP_RB_WPTR` that lands
        // between the two. `fetch_add` wraps on overflow, like
        // `wrapping_add`. `Release` pairs with the readers' `Acquire`
        // (worker `sync_with_mmio` / inline next round).
        self.mmio()
            .cp_rb_wptr
            .fetch_add(dwords, Ordering::Release);
    }

    /// Read the current ring write pointer from the shared MMIO atomic.
    /// The `Acquire` load pairs with the producer's `Release` store, so
    /// ring-memory writes made before that WPTR value was published are
    /// visible to the caller after this read. Used by `vd_swap` to compute
    /// the ring offset for direct PM4 injection.
    pub fn mmio_cp_rb_wptr_load(&self) -> u32 {
        self.mmio().cp_rb_wptr.load(Ordering::Acquire)
    }
    /// Drain any PM4 packets currently exposed by the ring (i.e., up to
    /// the current `CP_RB_WPTR`). Inline mode runs the synchronous
    /// drain. Threaded mode posts a [`GpuCommand::DrainFence`] and blocks
    /// on the reply channel up to a 1 s defensive timeout — the worker
    /// has its own ~900 ms internal deadline so the reply is bounded.
    ///
    /// Returns the number of packets executed on the inline path. The
    /// threaded path returns `1` once the fence is acknowledged, and `0`
    /// if the worker is disconnected or the reply times out.
    ///
    /// The CPU thread blocking here is sound: the only thread that
    /// satisfies the reply is the GPU worker, which never tries to
    /// acquire any CPU-side primitive (it talks back exclusively through
    /// channels and atomics). The lock-ordering argument from the M1.4
    /// plan holds: T_cpu → cmd_tx → T_gpu → reply_tx → T_cpu, no cycle.
    pub fn drain_to_current_wptr(&mut self, mem: &dyn xenia_memory::MemoryAccess) -> u32 {
        match self {
            GpuBackend::Inline(s) => {
                s.sync_with_mmio();
                // GPUBUG-DRAIN-001: drain until target WPTR is reached (or
                // 900 ms deadline), mirroring canary's `WorkerThreadMain` and
                // the threaded `DrainFence` semantics. The previous fixed
                // 4096-packet budget hit the cap under `--parallel`, where
                // CPU runs many more PPC blocks per kernel-callback boundary
                // and the ring backs up past 4096 packets before vd_swap
                // fires; the safety-net fallback warning fired twice for
                // each Sylpheed run.
                let target = s.mmio.cp_rb_wptr.load(Ordering::Acquire);
                s.drain_until_wptr(mem, target, Duration::from_millis(900))
            }
            GpuBackend::Threaded(h) => {
                let target_wptr = h.mmio.cp_rb_wptr.load(Ordering::Acquire);
                let (reply_tx, reply_rx) = bounded::<()>(1);
                if h
                    .send_cmd(GpuCommand::DrainFence {
                        target_wptr,
                        reply_tx,
                    })
                    .is_err()
                {
                    // Worker disconnected; treat as drained.
                    return 0;
                }
                match reply_rx.recv_timeout(Duration::from_secs(1)) {
                    Ok(()) => {
                        // We don't currently track the exact packet count
                        // drained on the threaded path — the worker drains
                        // by `is_ready` predicate. Return 1 as a "drain
                        // happened" sentinel; the inline mode's exact
                        // count is a debug-trace nicety.
                        1
                    }
                    Err(_) => {
                        tracing::warn!(
                            target: "gpu",
                            target_wptr,
                            "vd_swap drain fence timed out at 1s; continuing teardown",
                        );
                        0
                    }
                }
            }
        }
    }
    /// Bump `swaps_seen` + record `last_swap` + push a swap interrupt.
    /// Inline calls directly. Threaded sends `NotifyXeSwap` over the
    /// command channel — fire-and-forget; the worker handles it on its
    /// next loop iteration.
    pub fn notify_xe_swap(&mut self, frontbuffer_phys: u32, width: u32, height: u32) {
        match self {
            GpuBackend::Inline(s) => s.notify_xe_swap(frontbuffer_phys, width, height),
            GpuBackend::Threaded(h) => {
                let _ = h.send_cmd(GpuCommand::NotifyXeSwap {
                    frontbuffer_phys,
                    width,
                    height,
                });
            }
        }
    }

    /// Forward [`GpuSystem::has_pending_interrupts`] under inline mode;
    /// under threaded mode peek the `int_rx` channel.
    pub fn has_pending_interrupts(&self) -> bool {
        match self {
            GpuBackend::Inline(s) => s.has_pending_interrupts(),
            GpuBackend::Threaded(h) => !h.int_rx.is_empty(),
        }
    }

    /// Drain pending interrupts. Inline path forwards to
    /// [`GpuSystem::take_pending_interrupts`]; threaded path drains the
    /// `int_rx` channel non-blockingly. Since M1.6 the worker fills that
    /// channel from its own `pending_interrupts` on every loop iteration
    /// (step 5a of [`GpuWorker::run`]).
    pub fn take_pending_interrupts(&mut self) -> Vec<PendingInterrupt> {
        match self {
            GpuBackend::Inline(s) => s.take_pending_interrupts(),
            GpuBackend::Threaded(h) => {
                let mut out = Vec::new();
                while let Ok(pi) = h.int_rx.try_recv() {
                    out.push(pi);
                }
                out
            }
        }
    }

    /// End-of-run snapshot used by the run-digest. Inline mode reads
    /// directly; threaded mode pulls the latest published mirror under a
    /// brief lock. Returns owned data — safe to use after the worker has
    /// shut down.
    pub fn digest_snapshot(&self) -> GpuDigestSnapshot {
        match self {
            GpuBackend::Inline(s) => GpuDigestSnapshot {
                stats: s.stats.clone(),
                shader_blobs_live: s.shader_blobs.len() as u64,
                texture_cache_entries: s.texture_cache.len() as u64,
                texture_decodes: s.texture_cache.decodes_total,
            },
            GpuBackend::Threaded(h) => h
                .digest
                .lock()
                .expect("GpuDigestSnapshot mutex poisoned")
                .clone(),
        }
    }
}
impl GpuWorker {
    /// Run loop body for the GPU host thread.
    ///
    /// Each iteration:
    /// 1. Check the `Acquire`-loaded shutdown flag; exit if set.
    /// 2. Drain any pending control-plane commands non-blockingly.
    /// 3. Sample MMIO (refreshes WPTR / RPTR mailboxes into the live ring).
    /// 4. Execute up to [`WORKER_PACKETS_PER_ITER`] PM4 packets while the
    ///    ring is non-empty / not blocked.
    /// 5. Forward pending interrupts onto `int_tx` (5a), then refresh the
    ///    shared digest snapshot under a brief lock if any work was done
    ///    (5b).
    /// 6. If no work was done this iteration, park for up to
    ///    [`WORKER_PARK_TIMEOUT`] using the M1.7 parker protocol.
    ///
    /// `memory: Arc<GuestMemory>` is shared with the CPU thread. The worker
    /// accesses it only through `&*memory`, which deref-coerces to
    /// `&GuestMemory` and then to `&dyn MemoryAccess`. All mutations on
    /// `MemoryAccess` take `&self` post-trait-flip, so the worker's writes
    /// can run concurrently with CPU-side writes under the trait's
    /// contract: callers must not concurrently read/write the same byte
    /// range from different threads. vd_swap's RPTR writeback /
    /// EVENT_WRITE_SHD writes target guest-thread-private addresses by
    /// construction.
    pub fn run(mut self, memory: Arc<GuestMemory>) {
        // M1.7 parker registration: publish our `Thread` handle so the
        // MMIO `CP_RB_WPTR` write callback can `unpark()` us. Only one
        // worker thread per `GpuMmio`; we replace whatever was there.
        if let Ok(mut g) = self.system.mmio.worker_thread.lock() {
            *g = Some(thread::current());
        }
        'outer: loop {
            // (1) shutdown
            if self.shutdown.load(Ordering::Acquire) {
                break;
            }
            // (2) drain commands
            let mut did_work = false;
            while let Ok(cmd) = self.cmd_rx.try_recv() {
                did_work = true;
                match cmd {
                    GpuCommand::InitializeRing { base, size_log2 } => {
                        self.system.initialize_ring_buffer(base, size_log2);
                    }
                    GpuCommand::EnableRptrWriteback {
                        addr,
                        block_size_log2,
                    } => {
                        self.system.enable_rptr_writeback(addr, block_size_log2);
                    }
                    GpuCommand::DrainFence {
                        target_wptr: _,
                        reply_tx,
                    } => {
                        // Drain the ring up to whatever WPTR the MMIO
                        // atomic currently exposes (the CPU side published
                        // it before sending the fence), so `target_wptr`
                        // is not consulted. Bounded by an internal 900 ms
                        // deadline, 100 ms tighter than the CPU's
                        // `recv_timeout(1s)`, so that normally the worker
                        // replies before the CPU gives up. The reply
                        // carries no status: a deadline-truncated drain
                        // looks complete to the CPU. A `RecvTimeout` on
                        // the CPU side therefore means the fence was picked
                        // up late (more than ~100 ms after it was sent) or
                        // the worker is stuck.
                        //
                        // The drain loop polls `is_ready` after each
                        // packet; `sync_with_mmio` between packets is
                        // what folds late guest WPTR writes into the
                        // local ring view. Loop exits when the ring is
                        // empty (rptr == wptr after modulo), a packet
                        // returns `Idle`/`Blocked`, or the deadline passes.
                        self.system.sync_with_mmio();
                        let deadline = Instant::now() + Duration::from_millis(900);
                        while self.system.is_ready(&*memory) {
                            if Instant::now() >= deadline {
                                break;
                            }
                            match self.system.execute_one(&*memory) {
                                ExecOutcome::Stepped { .. } => {
                                    self.system.sync_with_mmio();
                                }
                                ExecOutcome::Idle | ExecOutcome::Blocked => break,
                            }
                        }
                        // GPUBUG-DRAIN-001: publish the digest mirror BEFORE
                        // replying so the CPU's post-drain `digest_snapshot`
                        // observes the `swaps_seen` bump from any
                        // PM4_XE_SWAP we just consumed. Without this the
                        // outer-loop publish at step 5b races the CPU's
                        // post_swap_counter check and the kernel-side
                        // `vd_swap` fires the "PM4_XE_SWAP not consumed"
                        // safety-net warning even when the swap landed.
                        let snap = GpuDigestSnapshot {
                            stats: self.system.stats.clone(),
                            shader_blobs_live: self.system.shader_blobs.len() as u64,
                            texture_cache_entries: self.system.texture_cache.len() as u64,
                            texture_decodes: self.system.texture_cache.decodes_total,
                        };
                        if let Ok(mut g) = self.digest.lock() {
                            *g = snap;
                        }
                        let _ = reply_tx.send(());
                    }
                    GpuCommand::NotifyXeSwap {
                        frontbuffer_phys,
                        width,
                        height,
                    } => {
                        self.system
                            .notify_xe_swap(frontbuffer_phys, width, height);
                    }
                    GpuCommand::Shutdown => {
                        self.shutdown.store(true, Ordering::Release);
                        // `break 'outer` rather than `return`, so the
                        // wake-target cleanup after the loop still runs.
                        break 'outer;
                    }
                }
            }
            // (3,4) drive the GPU
            self.system.sync_with_mmio();
            let mut budget = WORKER_PACKETS_PER_ITER;
            while budget > 0 && self.system.is_ready(&*memory) {
                match self.system.execute_one(&*memory) {
                    ExecOutcome::Stepped { .. } => {
                        did_work = true;
                        self.system.sync_with_mmio();
                    }
                    ExecOutcome::Idle | ExecOutcome::Blocked => break,
                }
                budget -= 1;
            }
            // (5a) M1.6: forward `PM4_INTERRUPT` / `XE_SWAP` events from
            //      `system.pending_interrupts` onto `int_tx`. The Vec
            //      lives on this thread; the channel is the cross-thread
            //      delivery primitive. Send is non-blocking (unbounded)
            //      and the receive end is drained by the CPU thread's
            //      per-round queue at `main.rs::run_execution`.
            //      `int_tx.send` returns `Err` only if the receiver was
            //      dropped — which means the CPU side is gone, in which
            //      case we'd be torn down momentarily anyway.
            for pi in self.system.take_pending_interrupts() {
                if self.int_tx.send(pi).is_err() {
                    break;
                }
                did_work = true;
            }
            // (5b) publish digest snapshot
            if did_work {
                let snap = GpuDigestSnapshot {
                    stats: self.system.stats.clone(),
                    shader_blobs_live: self.system.shader_blobs.len() as u64,
                    texture_cache_entries: self.system.texture_cache.len() as u64,
                    texture_decodes: self.system.texture_cache.decodes_total,
                };
                if let Ok(mut g) = self.digest.lock() {
                    *g = snap;
                }
            }
            // (6) M1.7 parker — `park_timeout` replaces the polling
            //     sleep. The standard parker idiom defends against the
            //     producer-races-park lost-wakeup:
            //
            //     1. Swap `wake_pending` to false (claim "we're going to
            //        park"). If `was_pending` is true, a producer
            //        signaled us between the last work and now — skip
            //        the park, loop and re-process.
            //     2. Re-check side conditions (cmd channel, shutdown).
            //        These may have changed after step 1.
            //     3. `park_timeout`. If the producer's `unpark()` runs
            //        between our re-check and our park call, std's token
            //        records it and the next park returns immediately.
            //        If neither happens within `WORKER_PARK_TIMEOUT`, we
            //        wake on our own and re-evaluate.
            if !did_work {
                let was_pending =
                    self.system.mmio.wake_pending.swap(false, Ordering::AcqRel);
                if !was_pending
                    && self.cmd_rx.is_empty()
                    && !self.shutdown.load(Ordering::Acquire)
                {
                    thread::park_timeout(WORKER_PARK_TIMEOUT);
                }
            }
        }
        // Clear the wake target on exit so post-shutdown MMIO writes
        // don't try to unpark a dead thread (sound — `Thread::unpark`
        // on an exited thread is a no-op — but it keeps the invariant
        // tidy).
        if let Ok(mut g) = self.system.mmio.worker_thread.lock() {
            *g = None;
        }
    }
}
/// Spawn the real GPU worker thread. Returns its `JoinHandle`; the
/// matching `GpuHandle` (caller's existing one from
/// [`GpuSystem::into_handle_with_shutdown`]) is what the CPU thread keeps.
pub fn spawn_gpu_worker(
    worker: GpuWorker,
    memory: Arc<GuestMemory>,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("xenia-gpu".to_string())
        .spawn(move || worker.run(memory))
        .expect("spawn xenia-gpu worker thread")
}

/// M1 step 3: spawn a placeholder GPU worker thread that does nothing
/// except poll `shutdown` on a short cadence and exit cleanly when it sees
/// `true`. Verifies thread lifecycle, signal propagation, and clean
/// teardown. **Not used in production paths**: the real worker is
/// [`GpuWorker::run`], spawned by [`spawn_gpu_worker`]. Before step 4 this
/// function was the only spawn site, gated behind `--gpu-thread` in the CLI.
///
/// The function returns the `JoinHandle` so the caller can block on
/// teardown via [`shutdown_and_join_with_timeout`]. Ownership of the
/// `Arc<AtomicBool>` is shared: the caller keeps a clone for signaling,
/// the thread takes another clone for polling.
///
/// Rationale for `Acquire`/`Release` ordering on the bool: the spawning
/// thread may set up shared state *before* the worker reads it once we
/// expand the worker in step 4 — the `Release` store on shutdown then
/// pairs with the `Acquire` load here so any prior writes the spawner did
/// (e.g. populating channels with farewell messages) are visible to the
/// worker. For the no-op stage there's no shared state, but using the
/// stricter ordering now means step 4 inherits a correctly-fenced
/// shutdown protocol with no further changes.
pub fn spawn_noop_worker(shutdown: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::Builder::new()
        .name("xenia-gpu-noop".to_string())
        .spawn(move || {
            while !shutdown.load(Ordering::Acquire) {
                thread::sleep(NOOP_WORKER_POLL);
            }
        })
        .expect("spawn xenia-gpu-noop worker thread")
}
/// Signal `shutdown` to the worker and join its thread, with a defensive
/// timeout so a misbehaving worker can't wedge the entire process. Logs at
/// `error!` if the timeout fires (which would indicate either the worker
/// loop ignoring `shutdown` or being parked on a primitive that wasn't
/// woken — both are bugs the user should hear about).
///
/// Returns `Ok(())` on a clean join inside the timeout, `Err(())` on
/// timeout. The caller decides whether to continue process teardown anyway
/// (typically yes — the worker's only state is its own stack).
pub fn shutdown_and_join_with_timeout(
    shutdown: &Arc<AtomicBool>,
    handle: JoinHandle<()>,
    timeout: Duration,
) -> Result<(), ()> {
    shutdown.store(true, Ordering::Release);
    // std has no `JoinHandle::join_timeout`, and `join` blocks with no way
    // to preempt it. So hand the `JoinHandle` to a short-lived "joiner"
    // thread that calls `join` and then signals on a channel, and
    // `select!` on that channel against `crossbeam_channel::after(timeout)`.
    let (tx, rx) = unbounded::<()>();
    let join_thread = thread::Builder::new()
        .name("xenia-gpu-joiner".to_string())
        .spawn(move || {
            let _ = handle.join();
            // The receiver may already be dropped if we timed out, in
            // which case the `Err` from `send` is fine to ignore.
            let _ = tx.send(());
        })
        .expect("spawn xenia-gpu-joiner thread");
    crossbeam_channel::select! {
        recv(rx) -> _ => {
            // Joiner finished within the budget. Reap it (no work — the
            // thread already returned). `join` here is fast.
            let _ = join_thread.join();
            Ok(())
        }
        recv(crossbeam_channel::after(timeout)) -> _ => {
            tracing::error!(
                target: "gpu",
                ?timeout,
                "GPU worker did not exit in time; leaking thread to avoid wedging shutdown",
            );
            // Detach the joiner; it leaks but at least we proceed. Will
            // get cleaned up when the process exits.
            std::mem::drop(join_thread);
            Err(())
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Roundtrip an `InitializeRing` command: CPU side sends, worker side
/// receives, payload bytes match. This validates the channel plumbing
/// without touching any GPU semantics.
#[test]
fn initialize_ring_roundtrips_through_handle() {
let (worker, handle) = GpuSystem::new().into_handle();
let GpuWorker { cmd_rx, .. } = worker;
handle
.cmd_tx
.send(GpuCommand::InitializeRing {
base: 0x1000_0000,
size_log2: 18,
})
.expect("cmd_tx send");
match cmd_rx.recv().expect("cmd_rx recv") {
GpuCommand::InitializeRing { base, size_log2 } => {
assert_eq!(base, 0x1000_0000);
assert_eq!(size_log2, 18);
}
other => panic!("unexpected cmd: {other:?}"),
}
}
/// MMIO atomics on the handle and the worker's GpuSystem must be the
/// same Arc (clone). A guest write through the MMIO region callback
/// stores into `handle.mmio.cp_rb_wptr`; the worker observes the same
/// value via `worker.system.mmio.cp_rb_wptr`. If we accidentally
/// allocated a fresh atomic for either side, the worker would never see
/// guest writes.
#[test]
fn mmio_arcs_are_shared_between_handle_and_worker() {
use std::sync::atomic::Ordering;
let (worker, handle) = GpuSystem::new().into_handle();
handle.mmio.cp_rb_wptr.store(0xC0FFEE, Ordering::Release);
assert_eq!(
worker.system.mmio.cp_rb_wptr.load(Ordering::Acquire),
0xC0FFEE,
"worker side did not observe handle-side atomic store",
);
}
/// `GpuHandle` must be `Send + Sync`. Checked at compile time via a trait
/// bound: the test fails to compile if either bound is violated.
#[test]
fn handle_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<GpuHandle>();
}
/// `GpuWorker` must be `Send` so we can move it onto a thread in step 3.
/// (`Sync` is not required — only one thread ever owns the worker.)
#[test]
fn worker_is_send() {
fn assert_send<T: Send>() {}
assert_send::<GpuWorker>();
}
/// Dropping the handle (CPU side) must not block recv on the worker
/// side; instead it must surface as `Disconnected`. This is the
/// standard crossbeam guarantee but we pin it down with an explicit
/// test so a future refactor (e.g. switching channel backends) can't
/// silently change semantics.
#[test]
fn dropping_handle_disconnects_command_channel() {
let (worker, handle) = GpuSystem::new().into_handle();
drop(handle);
let result = worker.cmd_rx.recv();
assert!(
matches!(result, Err(crossbeam_channel::RecvError)),
"expected Disconnected after handle drop, got {result:?}",
);
}
/// Spawn the no-op worker, signal shutdown immediately, and join. Must
/// complete within a generous timeout (the polling cadence is 2 ms,
/// so 250 ms gives plenty of headroom even on a loaded test runner).
#[test]
fn noop_worker_shuts_down_cleanly() {
let shutdown = Arc::new(AtomicBool::new(false));
let handle = spawn_noop_worker(shutdown.clone());
// Sleep briefly so the worker very likely enters its loop at least
// once before we signal. This exercises the in-loop exit path rather
// than the never-entered-loop case.
thread::sleep(Duration::from_millis(5));
let outcome =
shutdown_and_join_with_timeout(&shutdown, handle, Duration::from_millis(250));
assert_eq!(outcome, Ok(()), "no-op worker did not join in budget");
}
/// A worker that does not exit within the join budget must surface as a
/// timeout, not a hang. Validates the `crossbeam_channel::after` budget:
/// the pseudo-worker below ignores `shutdown` and sleeps past the 50 ms
/// budget before exiting, which forces the timeout path.
#[test]
fn shutdown_join_timeouts_on_misbehaving_worker() {
let shutdown = Arc::new(AtomicBool::new(false));
let handle = thread::Builder::new()
.name("test-misbehaving-worker".to_string())
.spawn(|| {
// Sleep longer than the test's join budget. The test
// proves the join helper returns `Err(())` rather than
// blocking the test process indefinitely.
thread::sleep(Duration::from_millis(500));
})
.expect("spawn misbehaving worker");
let outcome =
shutdown_and_join_with_timeout(&shutdown, handle, Duration::from_millis(50));
assert_eq!(outcome, Err(()), "expected timeout signal");
}
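// Sketch for comparison (hypothetical helper, not used by the crate): the
// same hop-thread timed-join technique as `shutdown_and_join_with_timeout`,
// but std-only. Assumes a single waiter, so `crossbeam_channel::select!`
// plus `after` can be swapped for `std::sync::mpsc::Receiver::recv_timeout`.
// It does not set a shutdown flag; the caller is assumed to have signalled
// the worker already.
fn std_timed_join_sketch(
handle: std::thread::JoinHandle<()>,
timeout: std::time::Duration,
) -> Result<(), ()> {
let (tx, rx) = std::sync::mpsc::channel::<()>();
// Hop thread: performs the blocking `join`, then signals completion.
let joiner = std::thread::spawn(move || {
let _ = handle.join();
// Fails harmlessly if the receiver is gone after a timeout.
let _ = tx.send(());
});
match rx.recv_timeout(timeout) {
Ok(()) => {
// The joiner is about to return; reaping it is effectively instant.
let _ = joiner.join();
Ok(())
}
// Timed out (or the joiner vanished without signalling): `joiner` is
// dropped at the end of this function, which detaches it.
Err(_) => Err(()),
}
}
#[test]
fn std_timed_join_sketch_joins_or_times_out() {
// A thread that exits immediately joins inside the budget.
let quick = std::thread::spawn(|| {});
assert_eq!(
std_timed_join_sketch(quick, std::time::Duration::from_millis(250)),
Ok(()),
);
// A thread that outlives the budget surfaces as a timeout.
let slow = std::thread::spawn(|| {
std::thread::sleep(std::time::Duration::from_millis(500));
});
assert_eq!(
std_timed_join_sketch(slow, std::time::Duration::from_millis(50)),
Err(()),
);
}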
/// M1.8: `write_u32_fence` / `read_u32_fence` ordering test. A
/// producer thread writes a "data" value, then a "fence" value via
/// `write_u32_fence`. A consumer thread spin-reads the fence via
/// `read_u32_fence` and, on observing a new value, reads the data via
/// plain `read_u32`. The data must be at least as new as the value the
/// producer wrote before that fence (the producer may have moved on
/// since), never an older value or a torn read.
///
/// On x86_64 (TSO) the hardware preserves this store order even without
/// the fences; on weakly ordered architectures (e.g. AArch64) the test
/// could fail without them. We pin down the invariant here so future
/// ports / refactors can't silently weaken it. Uses a small synthetic
/// memory implementing `MemoryAccess`.
#[test]
fn write_u32_fence_publishes_prior_writes() {
use std::sync::atomic::{AtomicBool, AtomicU32};
use std::time::Instant;
use xenia_memory::MemoryAccess;
// The test's MemoryAccess impl stores each word in an `AtomicU32`
// slot, so aligned u32 reads and writes can never tear. The fence
// helpers layer Release/Acquire ordering on top; without atomic
// storage, byte-by-byte reads of a buffer the writer is racing on
// would see torn values regardless of the fence.
const SLOT_COUNT: usize = 16;
struct ScopedMem([AtomicU32; SLOT_COUNT]);
impl ScopedMem {
fn slot(&self, addr: u32) -> &AtomicU32 {
&self.0[(addr / 4) as usize]
}
}
impl MemoryAccess for ScopedMem {
fn read_u8(&self, addr: u32) -> u8 {
let v = self.slot(addr & !3).load(Ordering::Relaxed);
let shift = (addr & 3) * 8;
(v >> shift) as u8
}
fn read_u16(&self, addr: u32) -> u16 {
u16::from_le_bytes([self.read_u8(addr), self.read_u8(addr + 1)])
}
fn read_u32(&self, addr: u32) -> u32 {
self.slot(addr).load(Ordering::Relaxed)
}
fn read_u64(&self, addr: u32) -> u64 {
let lo = self.read_u32(addr) as u64;
let hi = self.read_u32(addr + 4) as u64;
lo | (hi << 32)
}
fn write_u8(&self, _addr: u32, _val: u8) {
unimplemented!("test fixture only writes u32")
}
fn write_u16(&self, _addr: u32, _val: u16) {
unimplemented!("test fixture only writes u32")
}
fn write_u32(&self, addr: u32, val: u32) {
self.slot(addr).store(val, Ordering::Relaxed);
}
fn write_u64(&self, addr: u32, val: u64) {
self.write_u32(addr, val as u32);
self.write_u32(addr + 4, (val >> 32) as u32);
}
fn translate(&self, _addr: u32) -> Option<*const u8> {
None
}
fn translate_mut(&self, _addr: u32) -> Option<*mut u8> {
None
}
}
let mem: Arc<ScopedMem> = Arc::new(ScopedMem(std::array::from_fn(|_| {
AtomicU32::new(0)
})));
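// Zero the data word (byte offset 0) and fence word (byte offset 16)
// explicitly; `from_fn` already zeroed them, so this documents the layout.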
mem.write_u32(0, 0); // data
mem.write_u32(16, 0); // fence
let stop = Arc::new(AtomicBool::new(false));
let mem_p = mem.clone();
let stop_p = stop.clone();
let producer = thread::Builder::new()
.name("fence-producer".into())
.spawn(move || {
for i in 1u32..=10_000 {
if stop_p.load(Ordering::Relaxed) {
break;
}
mem_p.write_u32(0, i); // data
mem_p.write_u32_fence(16, i); // fence (Release)
thread::yield_now();
}
})
.expect("spawn producer");
let mem_c = mem.clone();
let consumer = thread::Builder::new()
.name("fence-consumer".into())
.spawn(move || {
let deadline = Instant::now() + Duration::from_millis(500);
let mut last_seen = 0u32;
let mut iters = 0u32;
while Instant::now() < deadline {
let f = mem_c.read_u32_fence(16); // Acquire
if f != last_seen {
let d = mem_c.read_u32(0);
// The data we read after the fence must be at
// least as new as the fence value (producer
// wrote `data = i; fence(i)` in that order).
assert!(
d >= f,
"fence ordering violated: data={d} fence={f}"
);
last_seen = f;
iters += 1;
}
}
iters
})
.expect("spawn consumer");
let observed = consumer.join().expect("consumer join");
stop.store(true, Ordering::Relaxed);
let _ = producer.join();
assert!(
observed > 0,
"consumer never observed a fence transition (scheduler too unfair?)",
);
}
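// Sketch (hypothetical helper, std-only): the Release/Acquire
// "message passing" contract the test above pins down, stripped of the
// `MemoryAccess` layer. It is not a claim about how `write_u32_fence` /
// `read_u32_fence` are implemented; it only shows the ordering guarantee:
// a Relaxed data store followed by a Release store of the flag, paired
// with an Acquire load that observes the flag, makes the data visible.
fn release_acquire_sketch() -> u32 {
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
let data = Arc::new(AtomicU32::new(0));
let flag = Arc::new(AtomicU32::new(0));
let (d, f) = (data.clone(), flag.clone());
let producer = std::thread::spawn(move || {
d.store(42, Ordering::Relaxed); // payload
f.store(1, Ordering::Release); // publish
});
// Spin until the Release store is visible to this Acquire load.
while flag.load(Ordering::Acquire) == 0 {
std::hint::spin_loop();
}
// The Release store happens-before this read, so it cannot see the old 0.
let seen = data.load(Ordering::Relaxed);
producer.join().expect("producer join");
seen
}
#[test]
fn release_acquire_sketch_sees_published_data() {
assert_eq!(release_acquire_sketch(), 42);
}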
/// Two no-op workers spawned concurrently must both shut down and join
/// within budget, showing that one instance of the joiner side-thread
/// pattern does not interfere with another.
#[test]
fn two_concurrent_noop_workers_both_shut_down() {
let shutdown_a = Arc::new(AtomicBool::new(false));
let handle_a = spawn_noop_worker(shutdown_a.clone());
let shutdown_b = Arc::new(AtomicBool::new(false));
let handle_b = spawn_noop_worker(shutdown_b.clone());
let r_a = shutdown_and_join_with_timeout(
&shutdown_a,
handle_a,
Duration::from_millis(250),
);
let r_b = shutdown_and_join_with_timeout(
&shutdown_b,
handle_b,
Duration::from_millis(250),
);
assert_eq!(r_a, Ok(()));
assert_eq!(r_b, Ok(()));
}
}