xenia-rs/crates/xenia-gpu/src/handle.rs
MechaCat02 82f3d611e2 fix(gpu,kernel): KRNBUG-Vd-04 / GPUBUG-001 / XMODBUG-013 — VdSwap PM4 ring path
The pre-fix VdSwap zero-filled the guest's reserved buffer with NOPs and
called `state.gpu.notify_xe_swap` directly — bypassing the ring, leaving
the PM4_XE_SWAP handler at gpu_system.rs:1232 dead code, and skipping
the PM4_TYPE0(SHADER_CONSTANT_FETCH_00_0, 6) patch. Sylpheed's bloom/
blur "sample frame N for frame N+1" path samples fetch-constant slot 0
expecting the frontbuffer descriptor; without the patch, slot 0 stayed
stale and any shader sampling it read garbage.

This commit writes the canary VdSwap PM4 sequence directly into the
primary ring at the current write pointer (read via the shared MMIO
atomic), then advances WPTR over the injection. The natural CP drain
consumes PM4_XE_SWAP — bumping `swaps_seen` and patching fetch-constant
slot 0 — without going through any direct kernel→GPU bypass.
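The inject-then-advance pattern above can be sketched as follows. This assumes a power-of-two ring. `RingModel`, `inject` and `wptr_guest_addr` are hypothetical names, and a `Vec<u32>` stands in for guest memory at `ring_base`. The WPTR is stored unmodulo'd, matching the note on `extend_write_ptr_by` in handle.rs.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Toy model of the primary ring (hypothetical, not xenia-rs code): `words`
/// stands in for guest memory starting at `ring_base`, and `wptr` stands in
/// for the shared CP_RB_WPTR atomic, stored without reducing modulo size.
struct RingModel {
    ring_base: u32,
    words: Vec<u32>,
    wptr: AtomicU32,
}

impl RingModel {
    fn new(ring_base: u32, size_dwords: usize) -> Self {
        // A power-of-two size keeps `wptr % size` consistent across u32 wrap.
        assert!(size_dwords.is_power_of_two());
        Self {
            ring_base,
            words: vec![0; size_dwords],
            wptr: AtomicU32::new(0),
        }
    }

    /// Guest address of the slot the current WPTR points at.
    fn wptr_guest_addr(&self) -> u32 {
        let slot = self.wptr.load(Ordering::Acquire) as usize % self.words.len();
        self.ring_base + (slot as u32) * 4
    }

    /// Copy `packet` into the ring at WPTR, wrapping past the end, then
    /// advance WPTR over the injection with a `Release` store (the publish
    /// order a consumer doing an `Acquire` load of WPTR relies on).
    fn inject(&mut self, packet: &[u32]) {
        let size = self.words.len();
        let start = self.wptr.load(Ordering::Acquire);
        for (i, &dw) in packet.iter().enumerate() {
            let slot = start.wrapping_add(i as u32) as usize % size;
            self.words[slot] = dw;
        }
        self.wptr
            .store(start.wrapping_add(packet.len() as u32), Ordering::Release);
    }
}
```

In the real kernel path, the write target is `ring_base + (wptr % ring_size_dwords) * 4` in guest memory rather than a host `Vec`. The wrap and publish order are the parts this sketch is meant to show.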

Sequence per xenia-canary VdSwap_entry (xboxkrnl_video.cc:438-521):
  1) PM4_TYPE0(0x4800, count=6) + 6 fetch-header dwords (with
     base_address re-patched from virtual to physical >> 12).
  2) PM4_TYPE3(PM4_XE_SWAP, count=4) + signature + frontbuffer_phys
     + width + height.
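The two-packet sequence above totals 1 + 6 + 1 + 4 = 12 dwords. Below is a sketch of how it could be laid out. The header bit layout is assumed to follow canary's PM4 type-0/type-3 encoding (no `one_reg` or predicate bits). `build_vdswap_sequence` is a hypothetical helper. The caller supplies the PM4_XE_SWAP opcode and swap signature because their real values live in the codebase and are not restated here.

```rust
/// PM4 type-0 header: write `count` consecutive registers starting at
/// `base_index` (bits 31..30 = 0, 29..16 = count - 1, 14..0 = index).
const fn type0_header(base_index: u32, count: u32) -> u32 {
    (((count - 1) & 0x3FFF) << 16) | (base_index & 0x7FFF)
}

/// PM4 type-3 header: `opcode` with `count` payload dwords
/// (bits 31..30 = 3, 29..16 = count - 1, 14..8 = opcode).
const fn type3_header(opcode: u32, count: u32) -> u32 {
    (3u32 << 30) | (((count - 1) & 0x3FFF) << 16) | ((opcode & 0x7F) << 8)
}

/// Assemble the 12-dword VdSwap injection: the fetch-constant write, then
/// PM4_XE_SWAP. `fetch` must already carry the physical base address in
/// dword 1.
fn build_vdswap_sequence(
    fetch: [u32; 6],
    xe_swap_opcode: u32,
    signature: u32,
    frontbuffer_phys: u32,
    width: u32,
    height: u32,
) -> [u32; 12] {
    let mut seq = [0u32; 12];
    // 1) PM4_TYPE0(SHADER_CONSTANT_FETCH_00_0 = 0x4800, count = 6) + 6 dwords.
    seq[0] = type0_header(0x4800, 6);
    seq[1..7].copy_from_slice(&fetch);
    // 2) PM4_TYPE3(PM4_XE_SWAP, count = 4) + signature, frontbuffer, w, h.
    seq[7] = type3_header(xe_swap_opcode, 4);
    seq[8..12].copy_from_slice(&[signature, frontbuffer_phys, width, height]);
    seq
}
```

With these assumptions the type-0 header for `(0x4800, 6)` is `0x0005_4800`. That value is a quick sanity check when reading ring dumps.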

Mechanism notes:
- buffer_ptr in xenia-rs is in the system command buffer, NOT the
  primary ring (verified empirically: buffer_ptr=0x4acd4df8 vs
  ring_base=0x0accb000, size 4 KB). Canary's VdSwap writes to
  buffer_ptr because its ring layout maps the reserved slot inside
  the ring; xenia-rs's doesn't, so we have to write at the actual
  ring WPTR address (cached on KernelState.ring_base from
  VdInitializeRingBuffer).
- The original "buffer_ptr zero-fill + bump WPTR by 64" path is
  preserved before the injection — it exposes any game-batched PM4
  packets and keeps the buffer_ptr region skippable per existing
  game compat behavior.
- A safety-net fallback at the end calls `notify_xe_swap` directly if
  swaps_seen didn't advance during the drain (e.g. a ring-arithmetic
  edge case). Idempotent — only fires when the PM4 path didn't.
- KRNBUG-Mm-04 deferred: virt→phys uses the masked stub
  `virt & 0x1FFF_FFFF`, sufficient for the standard heap.
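Two of the notes above lend themselves to a short sketch: the masked virt→phys stub and the "fire only if the PM4 path didn't" fallback. The sketch assumes that `base_address` in fetch dword 1 occupies bits 31..12 and holds `address >> 12`. That layout is an assumption, not something stated in this commit. `patch_fetch_base` and `drain_then_fallback` are hypothetical names.

```rust
/// Masked virt→phys stub from the commit notes (KRNBUG-Mm-04 deferred):
/// adequate for the standard heap, not a real page-table walk.
const fn virt_to_phys_stub(virt: u32) -> u32 {
    virt & 0x1FFF_FFFF
}

/// Re-point fetch dword 1 at the physical address. Assumes `base_address`
/// occupies bits 31..12 and stores (address >> 12), so the field, read in
/// place, already equals the 4 KiB-aligned address.
fn patch_fetch_base(dword1: u32) -> u32 {
    let virt = dword1 & 0xFFFF_F000;
    let phys = virt_to_phys_stub(virt);
    // Keep the low 12 bits (format/endianness/etc.) untouched.
    (dword1 & 0x0000_0FFF) | (phys & 0xFFFF_F000)
}

/// Safety-net shape: run the PM4 drain, then call the direct notify only if
/// the drain did not advance the swap counter, so a swap the drain already
/// counted is not counted twice. Returns whether the fallback fired.
fn drain_then_fallback(
    swaps_seen: &mut u64,
    drain: impl FnOnce(&mut u64),
    notify: impl FnOnce(&mut u64),
) -> bool {
    let before = *swaps_seen;
    drain(&mut *swaps_seen);
    if *swaps_seen == before {
        notify(&mut *swaps_seen);
        true
    } else {
        false
    }
}
```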

Mechanical changes:
- crates/xenia-gpu/src/pm4.rs: add make_packet_type0 / type2 / type3
  helpers + round-trip unit test (mirrors canary xenos.h:1682-1709).
- crates/xenia-gpu/src/handle.rs: add mmio_cp_rb_wptr_load accessor
  (Acquire-load) so the kernel can compute ring offsets.
- crates/xenia-kernel/src/state.rs: cache ring_base / ring_size_dwords
  on KernelState (set by VdInitializeRingBuffer).
- crates/xenia-kernel/src/exports.rs: rewrite the vd_swap PM4-emit
  block; patch fetch_dwords[1] base_address virt→phys before injection.
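The commit names the `make_packet_type0 / type2 / type3` helpers and a round-trip test but not their signatures. Below is a sketch of what such a round trip could check. It assumes `u32` parameters and the plain header layout (no `one_reg` or predicate bits). `Pm4Header` and `decode_header` are hypothetical.

```rust
/// Decoded view of a PM4 packet header (hypothetical helper type).
#[derive(Debug, PartialEq)]
enum Pm4Header {
    Type0 { base_index: u32, count: u32 },
    Type2,
    Type3 { opcode: u32, count: u32 },
}

/// Type 0: register write. Layout: [31:30]=0, [29:16]=count-1, [14:0]=index.
fn make_packet_type0(base_index: u32, count: u32) -> u32 {
    assert!(base_index <= 0x7FFF && (1..=0x4000).contains(&count));
    ((count - 1) << 16) | base_index
}

/// Type 2: single-dword filler (no-op) packet.
fn make_packet_type2() -> u32 {
    2u32 << 30
}

/// Type 3: opcode packet. Layout: [31:30]=3, [29:16]=count-1, [14:8]=opcode.
fn make_packet_type3(opcode: u32, count: u32) -> u32 {
    assert!(opcode <= 0x7F && (1..=0x4000).contains(&count));
    (3u32 << 30) | ((count - 1) << 16) | (opcode << 8)
}

/// Inverse of the three builders; type-1 headers are not produced here.
fn decode_header(dw: u32) -> Option<Pm4Header> {
    let count = ((dw >> 16) & 0x3FFF) + 1;
    match dw >> 30 {
        0 => Some(Pm4Header::Type0 { base_index: dw & 0x7FFF, count }),
        2 => Some(Pm4Header::Type2),
        3 => Some(Pm4Header::Type3 { opcode: (dw >> 8) & 0x7F, count }),
        _ => None,
    }
}
```

Exercising the extremes of the count field (1 and 0x4000) is what catches an off-by-one in the `count - 1` encoding.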

Verification at -n 100M lockstep:
  swaps:                2 → 2     (game fires VdSwap exactly twice)
  draws:                0 → 0     (gated by Phases D+E)
  fallback warning:     0 occurrences (PM4 path consumed both swaps)
  instructions:         ~100M
Tests: 552 passing (553 with new pm4 round-trip test). Lockstep
stable-fields determinism: byte-identical across two 100M runs.

The "swaps > 2" prediction in the audit's plan assumed the game would
fire VdSwap more often once the path worked; empirically Sylpheed only
calls VdSwap twice within 100M instructions (this is the renderer
plateau the audit identified). The success criterion for Phase C is
that the PM4 path is now operational, which Phases D+E require for
visible draws.

Closes KRNBUG-Vd-04, GPUBUG-001, XMODBUG-013.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 14:00:23 +02:00


//! GPU host-thread plumbing: control-plane command types, the CPU-side
//! proxy, the worker run loop, and spawn / teardown helpers.
//!
//! Introduced in M1 step 1 of the concurrency rollout (see
//! `/home/fabi/.claude/plans/good-plese-implement-in-zesty-hickey.md`).
//! The module defines the shapes the GPU host thread uses:
//!
//! - [`GpuCommand`]: the control-plane RPC enum sent CPU→GPU.
//! - [`GpuHandle`]: the CPU-side proxy: command sender + cloned MMIO
//!   atomics + an interrupt receiver. Eventually it'll also carry the
//!   worker thread's `JoinHandle`.
//! - [`GpuWorker`]: the GPU-side owned state (the `GpuSystem` itself plus
//!   the receive end of the command channel and the sender for interrupts).
//!   Its [`GpuWorker::run`] loop (step 4) drives the ring on the host thread.
//!
//! The construction is done via [`GpuSystem::into_handle_with_shutdown`]
//! (or [`GpuSystem::into_handle`] in tests), which splits a freshly-built
//! `GpuSystem` into `(GpuWorker, GpuHandle)`. The worker keeps the actual
//! GPU state plus `cmd_rx`/`int_tx`; the handle carries `cmd_tx`, `int_rx`,
//! and clones of the `Arc<AtomicU32>` MMIO mailboxes so the CPU producer
//! side can write WPTR / read RPTR without going through the channel.
//!
//! [`spawn_gpu_worker`] moves the worker onto its host thread, and
//! [`GpuBackend::Threaded`] wraps the handle for the kernel's `gpu` field;
//! [`GpuBackend::Inline`] keeps the legacy synchronous path.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
use xenia_memory::GuestMemory;

use crate::gpu_system::{ExecOutcome, GpuMmio, GpuStats, GpuSystem, PendingInterrupt};

/// Reply channel for a [`GpuCommand::DrainFence`]. Single-shot
/// `bounded(1)`: the GPU sends `()` once it has drained the ring, and the
/// CPU thread blocks on `recv` until then. M1 step 5 is the first user of
/// this; step 1 only validates that the type fits.
pub type DrainReply = crossbeam_channel::Sender<()>;

/// Control-plane RPC the CPU thread sends to the GPU thread. Data-plane
/// signals (WPTR/RPTR/INT_STATUS) ride atomic mailboxes instead — see
/// [`GpuMmio`]. Channels are for events that need ordered delivery and
/// (sometimes) a reply.
#[derive(Debug)]
pub enum GpuCommand {
    /// `VdInitializeRingBuffer(base, size_log2)`. The kernel hands the GPU
    /// the guest-physical base address and log2 dword size of the primary
    /// ring. Inline mode calls [`GpuSystem::initialize_ring_buffer`]
    /// synchronously; in threaded mode (step 4 onward) the kernel sends
    /// this command instead.
    InitializeRing {
        base: u32,
        size_log2: u32,
    },
    /// `VdEnableRingBufferRPtrWriteBack(addr, block_size)`. The kernel
    /// supplies a guest-memory address into which the GPU should mirror its
    /// internal `read_offset_dwords` after each packet.
    EnableRptrWriteback {
        addr: u32,
        block_size_log2: u32,
    },
    /// Ask the worker to drain the ring so the GPU's read pointer catches
    /// up with the guest's write pointer. The reply is sent on `reply_tx`
    /// once the drain completes (or the worker hits its internal deadline).
    /// Used by `vd_swap` to preserve the synchronous "GPU has caught up to
    /// the guest's wptr at swap time" semantics. `target_wptr` (in dwords)
    /// is the WPTR the CPU observed when posting; the worker currently
    /// ignores it and drains to whatever WPTR the MMIO atomic exposes when
    /// it handles the fence.
    DrainFence {
        target_wptr: u32,
        reply_tx: DrainReply,
    },
    /// Bump the swap counter and post a swap interrupt. Sent by `vd_swap`
    /// when the guest commits a frame; the worker's [`GpuSystem::notify_xe_swap`]
    /// updates `swaps_seen`/`last_swap` and pushes an `InterruptSource::Swap`
    /// onto the (M1.6) `int_tx` channel. Fire-and-forget; no reply.
    NotifyXeSwap {
        frontbuffer_phys: u32,
        width: u32,
        height: u32,
    },
    /// Tear-down signal. The worker sets the shared shutdown flag and
    /// leaves its run loop; the host thread then returns and can be joined.
    Shutdown,
}

/// CPU-side proxy that the kernel and the interpreter loop hold. It owns:
///
/// - The send end of the GPU command channel (`cmd_tx`).
/// - The receive end of the GPU→CPU interrupt channel (`int_rx`), fed by
///   the worker's `int_tx` since M1.6.
/// - Cloned `Arc<AtomicU32>` MMIO mailboxes so MMIO write callbacks (which
///   already capture these `Arc`s) keep working unchanged after the
///   `GpuSystem` itself moves to the worker thread.
///
/// `GpuHandle` is `Send + Sync` because every field already is: crossbeam's
/// `Sender`/`Receiver` are `Send + Sync` for `Send` payloads, and the
/// `Arc`-wrapped atomics and mutex are `Send + Sync`. No explicit impl is
/// needed.
///
/// Use [`Self::send_cmd`] to post commands so the M1.7 parker wake
/// invariant holds: every channel send must be paired with a wake of the
/// worker thread. Otherwise a command that arrives after the worker's
/// `cmd_rx.is_empty()` check but before its `park_timeout` call goes
/// unnoticed until the park times out.
#[derive(Debug, Clone)]
pub struct GpuHandle {
    /// Control-plane sender. `clone()`-able so multiple call sites can post
    /// commands; crossbeam's `Sender` is `Send + Sync + Clone`.
    pub cmd_tx: Sender<GpuCommand>,
    /// Interrupt drain channel. The worker forwards `PM4_INTERRUPT` /
    /// `XE_SWAP` events onto the matching `int_tx` (M1.6).
    pub int_rx: Receiver<PendingInterrupt>,
    /// Direct access to the MMIO mailbox arcs. The CP_RB_WPTR write that
    /// the guest does inside the MMIO region callback already lands in
    /// `mmio.cp_rb_wptr` (an `Arc<AtomicU32>`); cloning these here lets
    /// CPU-side code do the same atomic read/write that the inline path
    /// did, without channel hops.
    pub mmio: GpuMmio,
    /// Read-side snapshot of GPU stats / cache sizes. Refreshed by the
    /// worker on each outer loop iteration that did work. Read by
    /// [`GpuBackend::digest_snapshot`] and the HUD; cheap copy-out under a
    /// brief lock acquisition.
    pub digest: Arc<std::sync::Mutex<GpuDigestSnapshot>>,
    /// Shared shutdown flag: set by the CPU side during teardown, read
    /// by the worker each loop iteration. Cloned here so callers without
    /// access to the worker side (e.g. drop guards) can still signal exit.
    pub shutdown: Arc<AtomicBool>,
}

impl GpuHandle {
    /// Post a command to the worker and wake it. Wraps the raw
    /// `cmd_tx.send` with the M1.7 parker discipline: set
    /// `wake_pending=true` (Release) and `unpark()` the worker thread.
    /// Without the wake, channel sends would not surface to a parked
    /// worker — `crossbeam_channel::Sender::send` doesn't unpark by
    /// itself.
    pub fn send_cmd(
        &self,
        cmd: GpuCommand,
    ) -> Result<(), crossbeam_channel::SendError<GpuCommand>> {
        let r = self.cmd_tx.send(cmd);
        if r.is_ok() {
            self.mmio.wake_pending.store(true, Ordering::Release);
            if let Ok(g) = self.mmio.worker_thread.lock() {
                if let Some(t) = g.as_ref() {
                    t.unpark();
                }
            }
        }
        r
    }
}

/// Periodically-refreshed read-side snapshot of GPU state. Updated by the
/// worker thread on each outer loop iteration that did work; consumed by
/// the CPU side for the run-digest at end-of-run, the HUD, etc. Held in an
/// `Arc<Mutex<GpuDigestSnapshot>>` shared between worker and handle.
///
/// The snapshot is intentionally restricted to plain counters (`GpuStats`
/// plus `u64` sizes): they're cheap to copy and survive past the worker's
/// lifetime, so the digest can be computed even after the worker has shut
/// down. For the inline backend [`GpuBackend::digest_snapshot`] computes
/// the same view directly from the live `GpuSystem` without any locking.
#[derive(Debug, Clone, Default)]
pub struct GpuDigestSnapshot {
    pub stats: GpuStats,
    pub shader_blobs_live: u64,
    pub texture_cache_entries: u64,
    pub texture_decodes: u64,
}

/// Other end of the command and interrupt channels. The GPU thread
/// (`GpuWorker::run`) reads from `cmd_rx`, pushes onto `int_tx` (M1.6+),
/// owns the actual `GpuSystem`, and refreshes the shared
/// `GpuDigestSnapshot`. Built by [`GpuSystem::into_handle_with_shutdown`]
/// alongside its matching [`GpuHandle`].
///
/// (No `#[derive(Debug)]` because `GpuSystem` itself isn't `Debug`; we
/// don't need it on the worker for any production purpose.)
pub struct GpuWorker {
    /// The GPU subsystem itself. The worker thread is its exclusive
    /// owner once spawned.
    pub system: GpuSystem,
    /// Receive end of the control channel.
    pub cmd_rx: Receiver<GpuCommand>,
    /// Send end of the interrupt channel. M1.6 wires this in.
    pub int_tx: Sender<PendingInterrupt>,
    /// Shared digest snapshot, refreshed on each outer loop iteration that
    /// did work.
    pub digest: Arc<std::sync::Mutex<GpuDigestSnapshot>>,
    /// Shutdown flag. Set by `shutdown_and_join_with_timeout` or by a
    /// `GpuCommand::Shutdown`; the worker loop checks it with `Acquire`
    /// each iteration.
    pub shutdown: Arc<AtomicBool>,
}

impl GpuSystem {
    /// Split a freshly-built `GpuSystem` into a `(GpuWorker, GpuHandle)`
    /// pair. The handle keeps cloned `Arc<AtomicU32>` MMIO mailboxes plus
    /// the channel sender; the worker keeps the system itself plus the
    /// channel receiver and the interrupt sender.
    ///
    /// Channels are unbounded (`crossbeam_channel::unbounded`) because the
    /// CPU side never blocks on a control-plane send — guest-driven export
    /// rates are bounded by the interpreter throughput, and interrupts are
    /// already coalesced upstream by the kernel.
    ///
    /// Caller supplies a shared `shutdown: Arc<AtomicBool>` so the worker
    /// and the CPU side can coordinate teardown. For unit tests that don't
    /// care about lifecycle, [`Self::into_handle`] supplies a fresh flag.
    pub fn into_handle_with_shutdown(
        self,
        shutdown: Arc<AtomicBool>,
    ) -> (GpuWorker, GpuHandle) {
        let mmio = self.mmio.clone();
        let (cmd_tx, cmd_rx) = unbounded::<GpuCommand>();
        let (int_tx, int_rx) = unbounded::<PendingInterrupt>();
        let digest = Arc::new(std::sync::Mutex::new(GpuDigestSnapshot::default()));
        let worker = GpuWorker {
            system: self,
            cmd_rx,
            int_tx,
            digest: digest.clone(),
            shutdown: shutdown.clone(),
        };
        let handle = GpuHandle {
            cmd_tx,
            int_rx,
            mmio,
            digest,
            shutdown,
        };
        (worker, handle)
    }

    /// Convenience for tests: allocate a fresh shutdown flag and split.
    pub fn into_handle(self) -> (GpuWorker, GpuHandle) {
        self.into_handle_with_shutdown(Arc::new(AtomicBool::new(false)))
    }
}

/// Polling interval for the no-op worker's shutdown check. A short sleep
/// avoids burning a host core while still keeping shutdown latency under
/// 10 ms, well below the 1 s defensive timeout in
/// [`shutdown_and_join_with_timeout`].
const NOOP_WORKER_POLL: Duration = Duration::from_millis(2);

/// Maximum time the worker waits in `park_timeout` before re-checking
/// shutdown / commands / ring state. With `unpark()` on every guest WPTR
/// write the typical wake latency is microseconds; this is the upper
/// bound for the shutdown / quiescent-state polling cadence. 16 ms is
/// roughly one vsync interval on a 60 Hz host (16.7 ms) and bounds
/// shutdown latency at the same value.
const WORKER_PARK_TIMEOUT: Duration = Duration::from_millis(16);

/// Cap on packets executed per outer-loop iteration before the worker
/// re-checks shutdown / commands / digest publish. Mirrors the inline-mode
/// `gpu_runs = max(1, min(64, executed_this_round / 6))` pacer ceiling.
const WORKER_PACKETS_PER_ITER: u32 = 64;

/// Backend for the kernel's `gpu` field. The two variants share a thin
/// dispatch layer (forwarding methods on this enum) so call sites in
/// `xenia-kernel` exports stay terse.
///
/// - [`GpuBackend::Inline`] keeps the legacy synchronous path: the CPU
///   thread calls `kernel.gpu.execute_one(mem)` directly each scheduler
///   round. Selected by `--gpu-inline` (rollback flag) or implied by
///   `--ui` until the UI worker is migrated.
/// - [`GpuBackend::Threaded`] (**default at M1.9**) hands `GpuSystem`
///   ownership to a dedicated host thread; the CPU thread holds a
///   [`GpuHandle`] proxy and talks to the worker via channels + the
///   shared MMIO atomics.
///
/// `GpuBackend` itself is `Send` (the inline variant carries `GpuSystem`,
/// which is Send-able as long as nothing inside it is `!Send` — it isn't);
/// the threaded variant carries a `Send + Sync` handle.
pub enum GpuBackend {
    Inline(GpuSystem),
    Threaded(GpuHandle),
}

impl GpuBackend {
    /// Borrow the MMIO mailbox struct. This is cheap: no clone is made, and
    /// `GpuMmio`'s fields are `Arc`-shared. The result is the same set of
    /// atomics on either backend, so MMIO region callbacks installed via
    /// [`crate::build_mmio_region`] route guest writes to the same atomics
    /// the worker reads.
    pub fn mmio(&self) -> &GpuMmio {
        match self {
            GpuBackend::Inline(s) => &s.mmio,
            GpuBackend::Threaded(h) => &h.mmio,
        }
    }

    /// Convenience: borrow the inline `GpuSystem` for code paths that
    /// haven't been generalized to the `Threaded` variant yet (vd_swap's
    /// drain, the various `state.gpu.X` reads in M1.5+ work). Returns
    /// `None` in threaded mode; the caller's responsibility is to handle
    /// that gracefully — typically by treating the operation as a no-op
    /// or routing it through a command (see `vd_swap` notes).
    pub fn as_inline(&self) -> Option<&GpuSystem> {
        match self {
            GpuBackend::Inline(s) => Some(s),
            GpuBackend::Threaded(_) => None,
        }
    }

    /// Mutable counterpart of [`Self::as_inline`].
    pub fn as_inline_mut(&mut self) -> Option<&mut GpuSystem> {
        match self {
            GpuBackend::Inline(s) => Some(s),
            GpuBackend::Threaded(_) => None,
        }
    }

    /// Forward `VdInitializeRingBuffer`. Inline mode applies it directly;
    /// threaded mode posts an `InitializeRing` command onto the worker
    /// channel.
    pub fn initialize_ring_buffer(&mut self, base: u32, size_log2: u32) {
        match self {
            GpuBackend::Inline(s) => s.initialize_ring_buffer(base, size_log2),
            GpuBackend::Threaded(h) => {
                let _ = h.send_cmd(GpuCommand::InitializeRing { base, size_log2 });
            }
        }
    }

    /// Forward `VdEnableRingBufferRPtrWriteBack`.
    pub fn enable_rptr_writeback(&mut self, addr: u32, block_log2: u32) {
        match self {
            GpuBackend::Inline(s) => s.enable_rptr_writeback(addr, block_log2),
            GpuBackend::Threaded(h) => {
                let _ = h.send_cmd(GpuCommand::EnableRptrWriteback {
                    addr,
                    block_size_log2: block_log2,
                });
            }
        }
    }

    /// Bump `CP_RB_WPTR` by `dwords`. Both backends route the bump through
    /// the shared MMIO atomic mailbox (`Relaxed` load → wrapping add →
    /// `Release` store). Inline mode then picks up the new value on its
    /// next `sync_with_mmio`; threaded mode's worker observes the same
    /// atomic and folds it into its ring view.
    ///
    /// Note: the stored value is not reduced modulo the ring size. The
    /// reading side's `sync_with_mmio` applies `% ring.size_dwords` before
    /// updating the local ring view, which is the only place a `size_dwords`
    /// reference exists. Practical wptr drift before u32 wraps is
    /// `2^32 / 64 ≈ 67M` VdSwap-style bumps, safely above any plausible
    /// single-run total.
    pub fn extend_write_ptr_by(&mut self, dwords: u32) {
        let mmio = self.mmio();
        // Relaxed is sufficient for the load — we re-store with Release
        // and the readers (worker `sync_with_mmio` / inline next round)
        // do their own Acquire. The load here is just a value source.
        let cur = mmio.cp_rb_wptr.load(Ordering::Relaxed);
        mmio.cp_rb_wptr
            .store(cur.wrapping_add(dwords), Ordering::Release);
    }

    /// Read the current ring write pointer from the shared MMIO atomic.
    /// The `Acquire` load pairs with a `Release` store of the WPTR (e.g. in
    /// [`Self::extend_write_ptr_by`]), so ring-memory writes made before
    /// that store are visible to the caller after this read. Used by
    /// `vd_swap` to compute the ring offset for direct PM4 injection.
    pub fn mmio_cp_rb_wptr_load(&self) -> u32 {
        self.mmio().cp_rb_wptr.load(Ordering::Acquire)
    }

    /// Drain any PM4 packets currently exposed by the ring (i.e., up to
    /// the current `CP_RB_WPTR`). Inline mode runs the synchronous
    /// drain. Threaded mode posts a [`GpuCommand::DrainFence`] and blocks
    /// on the reply channel up to a 1 s defensive timeout; the worker
    /// has its own ~900 ms internal deadline, so the reply is bounded.
    ///
    /// The CPU thread blocking here is sound: the only thread that
    /// satisfies the reply is the GPU worker, which never tries to
    /// acquire any CPU-side primitive (it talks back exclusively through
    /// channels and atomics). Per the lock-ordering argument from the M1.4
    /// plan, the wait-for chain is T_cpu → cmd_tx → T_gpu → reply_tx, and
    /// because T_gpu never waits on T_cpu the chain cannot close into a
    /// deadlock cycle.
    pub fn drain_to_current_wptr(&mut self, mem: &dyn xenia_memory::MemoryAccess) -> u32 {
        match self {
            GpuBackend::Inline(s) => {
                s.sync_with_mmio();
                s.drain(mem, 4096)
            }
            GpuBackend::Threaded(h) => {
                let target_wptr = h.mmio.cp_rb_wptr.load(Ordering::Acquire);
                let (reply_tx, reply_rx) = bounded::<()>(1);
                if h
                    .send_cmd(GpuCommand::DrainFence {
                        target_wptr,
                        reply_tx,
                    })
                    .is_err()
                {
                    // Worker disconnected; treat as drained.
                    return 0;
                }
                match reply_rx.recv_timeout(Duration::from_secs(1)) {
                    Ok(()) => {
                        // We don't currently track the exact packet count
                        // drained on the threaded path — the worker drains
                        // by `is_ready` predicate. Return 1 as a "drain
                        // happened" sentinel; the inline mode's exact
                        // count is a debug-trace nicety.
                        1
                    }
                    Err(_) => {
                        // `Err` is either `Timeout` or `Disconnected` (the
                        // reply sender was dropped without a reply).
                        tracing::warn!(
                            target: "gpu",
                            target_wptr,
                            "vd_swap drain fence timed out at 1s; continuing teardown",
                        );
                        0
                    }
                }
            }
        }
    }

    /// Bump `swaps_seen` + record `last_swap` + push a swap interrupt.
    /// Inline calls directly. Threaded sends `NotifyXeSwap` over the
    /// command channel — fire-and-forget; the worker handles it on its
    /// next loop iteration.
    pub fn notify_xe_swap(&mut self, frontbuffer_phys: u32, width: u32, height: u32) {
        match self {
            GpuBackend::Inline(s) => s.notify_xe_swap(frontbuffer_phys, width, height),
            GpuBackend::Threaded(h) => {
                let _ = h.send_cmd(GpuCommand::NotifyXeSwap {
                    frontbuffer_phys,
                    width,
                    height,
                });
            }
        }
    }

    /// Forward [`GpuSystem::has_pending_interrupts`] under inline mode;
    /// under threaded mode peek the `int_rx` channel.
    pub fn has_pending_interrupts(&self) -> bool {
        match self {
            GpuBackend::Inline(s) => s.has_pending_interrupts(),
            GpuBackend::Threaded(h) => !h.int_rx.is_empty(),
        }
    }

    /// Drain pending interrupts. Inline path forwards to
    /// [`GpuSystem::take_pending_interrupts`]; threaded path drains
    /// `int_rx` non-blockingly, collecting whatever the worker has
    /// forwarded from its own `pending_interrupts` since the last call.
    pub fn take_pending_interrupts(&mut self) -> Vec<PendingInterrupt> {
        match self {
            GpuBackend::Inline(s) => s.take_pending_interrupts(),
            GpuBackend::Threaded(h) => {
                let mut out = Vec::new();
                while let Ok(pi) = h.int_rx.try_recv() {
                    out.push(pi);
                }
                out
            }
        }
    }

    /// End-of-run snapshot used by the run-digest. Inline mode reads
    /// directly; threaded mode pulls the latest published mirror under a
    /// brief lock. Returns owned data — safe to use after the worker has
    /// shut down.
    pub fn digest_snapshot(&self) -> GpuDigestSnapshot {
        match self {
            GpuBackend::Inline(s) => GpuDigestSnapshot {
                stats: s.stats.clone(),
                shader_blobs_live: s.shader_blobs.len() as u64,
                texture_cache_entries: s.texture_cache.len() as u64,
                texture_decodes: s.texture_cache.decodes_total,
            },
            GpuBackend::Threaded(h) => h
                .digest
                .lock()
                .expect("GpuDigestSnapshot mutex poisoned")
                .clone(),
        }
    }
}

impl GpuWorker {
    /// Run loop body for the GPU host thread.
    ///
    /// Each iteration:
    /// 1. Check the `Acquire`-loaded shutdown flag; exit if set.
    /// 2. Drain any pending control-plane commands non-blockingly.
    /// 3. Sample MMIO (refreshes WPTR / RPTR mailboxes into the live ring).
    /// 4. Execute up to [`WORKER_PACKETS_PER_ITER`] PM4 packets while the
    ///    ring is non-empty / not blocked.
    /// 5. Forward pending interrupts onto `int_tx`, then refresh the shared
    ///    digest snapshot under a brief lock if any work was done.
    /// 6. If no work was done this iteration, park for up to
    ///    [`WORKER_PARK_TIMEOUT`] (M1.7 parker; see the lost-wakeup notes
    ///    at step (6) in the body).
    ///
    /// `memory: Arc<GuestMemory>` is shared with the CPU thread. The
    /// worker accesses guest memory only through `&*memory`, a
    /// `&GuestMemory` that coerces to `&dyn MemoryAccess`. All mutations on
    /// `MemoryAccess` take `&self` post-trait-flip, so concurrent CPU and
    /// GPU writes are sound under the trait's contract (callers must not
    /// concurrently read/write the same byte range from different
    /// threads; the RPTR writeback / EVENT_WRITE_SHD writes target
    /// guest-thread-private addresses by construction).
    pub fn run(mut self, memory: Arc<GuestMemory>) {
        // M1.7 parker registration: publish our `Thread` handle so the
        // MMIO `CP_RB_WPTR` write callback can `unpark()` us. Only one
        // worker thread per `GpuMmio`; we replace whatever was there.
        if let Ok(mut g) = self.system.mmio.worker_thread.lock() {
            *g = Some(thread::current());
        }
        'outer: loop {
            // (1) shutdown
            if self.shutdown.load(Ordering::Acquire) {
                break;
            }
            // (2) drain commands
            let mut did_work = false;
            while let Ok(cmd) = self.cmd_rx.try_recv() {
                did_work = true;
                match cmd {
                    GpuCommand::InitializeRing { base, size_log2 } => {
                        self.system.initialize_ring_buffer(base, size_log2);
                    }
                    GpuCommand::EnableRptrWriteback {
                        addr,
                        block_size_log2,
                    } => {
                        self.system.enable_rptr_writeback(addr, block_size_log2);
                    }
                    GpuCommand::DrainFence {
                        target_wptr: _,
                        reply_tx,
                    } => {
                        // Drain the ring up to whatever WPTR the MMIO
                        // atomic currently exposes (the CPU side bumped
                        // it before sending the fence). Bounded by an
                        // internal 900 ms deadline, 100 ms tighter than
                        // the CPU's `recv_timeout(1s)`, so the timeout
                        // surfaces on the CPU side as a clean
                        // `RecvTimeout` rather than a partial drain that
                        // looks complete.
                        //
                        // The drain loop polls `is_ready` after each
                        // packet; `sync_with_mmio` between packets is
                        // what folds late guest WPTR writes into the
                        // local ring view. Loop exits when the ring is
                        // empty (rptr == wptr after modulo) or a packet
                        // returns `Idle`/`Blocked`.
                        self.system.sync_with_mmio();
                        let deadline = Instant::now() + Duration::from_millis(900);
                        while self.system.is_ready(&*memory) {
                            if Instant::now() >= deadline {
                                break;
                            }
                            match self.system.execute_one(&*memory) {
                                ExecOutcome::Stepped { .. } => {
                                    self.system.sync_with_mmio();
                                }
                                ExecOutcome::Idle | ExecOutcome::Blocked => break,
                            }
                        }
                        let _ = reply_tx.send(());
                    }
                    GpuCommand::NotifyXeSwap {
                        frontbuffer_phys,
                        width,
                        height,
                    } => {
                        self.system
                            .notify_xe_swap(frontbuffer_phys, width, height);
                    }
                    GpuCommand::Shutdown => {
                        self.shutdown.store(true, Ordering::Release);
                        // Leave through the loop exit rather than `return`
                        // so the wake-target cleanup after the loop runs.
                        break 'outer;
                    }
                }
            }
            // (3,4) drive the GPU
            self.system.sync_with_mmio();
            let mut budget = WORKER_PACKETS_PER_ITER;
            while budget > 0 && self.system.is_ready(&*memory) {
                match self.system.execute_one(&*memory) {
                    ExecOutcome::Stepped { .. } => {
                        did_work = true;
                        self.system.sync_with_mmio();
                    }
                    ExecOutcome::Idle | ExecOutcome::Blocked => break,
                }
                budget -= 1;
            }
            // (5a) M1.6: forward `PM4_INTERRUPT` / `XE_SWAP` events from
            //      `system.pending_interrupts` onto `int_tx`. The Vec
            //      lives on this thread; the channel is the cross-thread
            //      delivery primitive. Send is non-blocking (unbounded)
            //      and the receive end is drained by the CPU thread's
            //      per-round queue at `main.rs::run_execution`.
            //      `int_tx.send` returns `Err` only if the receiver was
            //      dropped — which means the CPU side is gone, in which
            //      case we'd be torn down momentarily anyway.
            for pi in self.system.take_pending_interrupts() {
                if self.int_tx.send(pi).is_err() {
                    break;
                }
                did_work = true;
            }
            // (5b) publish digest snapshot
            if did_work {
                let snap = GpuDigestSnapshot {
                    stats: self.system.stats.clone(),
                    shader_blobs_live: self.system.shader_blobs.len() as u64,
                    texture_cache_entries: self.system.texture_cache.len() as u64,
                    texture_decodes: self.system.texture_cache.decodes_total,
                };
                if let Ok(mut g) = self.digest.lock() {
                    *g = snap;
                }
            }
            // (6) M1.7 parker — `park_timeout` replaces the polling
            //     sleep. The standard parker idiom defends against the
            //     producer-races-park lost-wakeup:
            //
            //     1. Swap `wake_pending` to false (claim "we're going to
            //        park"). If `was_pending` is true, a producer
            //        signaled us between the last work and now — skip
            //        the park, loop and re-process.
            //     2. Re-check side conditions (cmd channel, shutdown).
            //        These may have changed after step 1.
            //     3. `park_timeout`. If the producer's `unpark()` runs
            //        between our re-check and our park call, std's token
            //        records it and the next park returns immediately.
            //        If neither happens within `WORKER_PARK_TIMEOUT`, we
            //        wake on our own and re-evaluate.
            if !did_work {
                let was_pending =
                    self.system.mmio.wake_pending.swap(false, Ordering::AcqRel);
                if !was_pending
                    && self.cmd_rx.is_empty()
                    && !self.shutdown.load(Ordering::Acquire)
                {
                    thread::park_timeout(WORKER_PARK_TIMEOUT);
                }
            }
        }
        // Clear the wake target on exit so post-shutdown MMIO writes
        // don't try to unpark a dead thread (sound — `Thread::unpark`
        // on an exited thread is a no-op — but it keeps the invariant
        // tidy).
        if let Ok(mut g) = self.system.mmio.worker_thread.lock() {
            *g = None;
        }
    }
}

/// Spawn the real GPU worker thread. Returns its `JoinHandle`; the
/// matching `GpuHandle` (caller's existing one from
/// [`GpuSystem::into_handle_with_shutdown`]) is what the CPU thread keeps.
pub fn spawn_gpu_worker(
    worker: GpuWorker,
    memory: Arc<GuestMemory>,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("xenia-gpu".to_string())
        .spawn(move || worker.run(memory))
        .expect("spawn xenia-gpu worker thread")
}

/// M1 step 3: spawn a placeholder GPU worker thread that does nothing
/// except poll `shutdown` on a short cadence and exit cleanly when it sees
/// `true`. Verifies thread lifecycle, signal propagation, and clean
/// teardown. **Not used in production paths**: the real worker is spawned
/// by [`spawn_gpu_worker`] and runs [`GpuWorker::run`]. This placeholder
/// predates both and was gated behind `--gpu-thread` in the CLI.
///
/// The function returns the `JoinHandle` so the caller can block on
/// teardown via [`shutdown_and_join_with_timeout`]. Ownership of the
/// `Arc<AtomicBool>` is shared: the caller keeps a clone for signaling,
/// the thread takes another clone for polling.
///
/// Rationale for `Acquire`/`Release` ordering on the bool: the spawning
/// thread may set up shared state *before* the worker reads it once we
/// expand the worker in step 4. The `Release` store on shutdown then
/// pairs with the `Acquire` load here so any prior writes the spawner did
/// (e.g. populating channels with farewell messages) are visible to the
/// worker. For the no-op stage there's no shared state, but using the
/// stricter ordering now means step 4 inherits a correctly-fenced
/// shutdown protocol with no further changes.
pub fn spawn_noop_worker(shutdown: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::Builder::new()
        .name("xenia-gpu-noop".to_string())
        .spawn(move || {
            while !shutdown.load(Ordering::Acquire) {
                thread::sleep(NOOP_WORKER_POLL);
            }
        })
        .expect("spawn xenia-gpu-noop worker thread")
}

/// Signal `shutdown` to the worker and join its thread, with a defensive
/// timeout so a misbehaving worker can't wedge the entire process. Logs at
/// `error!` if the timeout fires (which would indicate either the worker
/// loop ignoring `shutdown` or being parked on a primitive that wasn't
/// woken — both are bugs the user should hear about).
///
/// Returns `Ok(())` on a clean join inside the timeout, `Err(())` on
/// timeout. The caller decides whether to continue process teardown anyway
/// (typically yes; a leaked worker thread is reclaimed when the process
/// exits).
pub fn shutdown_and_join_with_timeout(
    shutdown: &Arc<AtomicBool>,
    handle: JoinHandle<()>,
    timeout: Duration,
) -> Result<(), ()> {
    shutdown.store(true, Ordering::Release);
    // std has no `JoinHandle::join_timeout`, and a blocking `join` cannot
    // be preempted. Instead, hand the `JoinHandle` to a helper "joiner"
    // thread that performs the join and then signals on a channel; this
    // thread waits for that signal with `select!` against
    // `crossbeam_channel::after(timeout)`.
    let (tx, rx) = unbounded::<()>();
    let join_thread = thread::Builder::new()
        .name("xenia-gpu-joiner".to_string())
        .spawn(move || {
            let _ = handle.join();
            // `_ = tx.send(())` — receiver may already be dropped if
            // we timed out, in which case Err is fine.
            let _ = tx.send(());
        })
        .expect("spawn xenia-gpu-joiner thread");
    crossbeam_channel::select! {
        recv(rx) -> _ => {
            // Joiner finished within the budget. Reap it (no work — the
            // thread already returned). `join` here is fast.
            let _ = join_thread.join();
            Ok(())
        }
        recv(crossbeam_channel::after(timeout)) -> _ => {
            tracing::error!(
                target: "gpu",
                ?timeout,
                "GPU worker did not exit in time; leaking thread to avoid wedging shutdown",
            );
            // Detach the joiner; it leaks but at least we proceed. Will
            // get cleaned up when the process exits.
            std::mem::drop(join_thread);
            Err(())
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;

    /// Roundtrip an `InitializeRing` command: the CPU side sends, the worker
    /// side receives, and the payload fields match. This validates the
    /// channel plumbing without touching any GPU semantics.
    #[test]
    fn initialize_ring_roundtrips_through_handle() {
        let (worker, handle) = GpuSystem::new().into_handle();
        let GpuWorker { cmd_rx, .. } = worker;
        handle
            .cmd_tx
            .send(GpuCommand::InitializeRing {
                base: 0x1000_0000,
                size_log2: 18,
            })
            .expect("cmd_tx send");
        match cmd_rx.recv().expect("cmd_rx recv") {
            GpuCommand::InitializeRing { base, size_log2 } => {
                assert_eq!(base, 0x1000_0000);
                assert_eq!(size_log2, 18);
            }
            other => panic!("unexpected cmd: {other:?}"),
        }
    }

    /// MMIO atomics on the handle and the worker's GpuSystem must be the
    /// same Arc (clone). A guest write through the MMIO region callback
    /// stores into `handle.mmio.cp_rb_wptr`; the worker observes the same
    /// value via `worker.system.mmio.cp_rb_wptr`. If we accidentally
    /// allocated a fresh atomic for either side, the worker would never see
    /// guest writes.
    #[test]
    fn mmio_arcs_are_shared_between_handle_and_worker() {
        use std::sync::atomic::Ordering;
        let (worker, handle) = GpuSystem::new().into_handle();
        handle.mmio.cp_rb_wptr.store(0xC0FFEE, Ordering::Release);
        assert_eq!(
            worker.system.mmio.cp_rb_wptr.load(Ordering::Acquire),
            0xC0FFEE,
            "worker side did not observe handle-side atomic store",
        );
    }
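
    // Sketch (hypothetical types, not crate APIs): `Arc::ptr_eq` states the
    // sharing invariant directly, by checking that both sides point at the
    // same allocation, instead of inferring it from a store/load pair.
    // `SketchMmio` and `sketch_split_mmio` stand in for the real MMIO block
    // and `into_handle`. The real test could apply the same check to
    // `handle.mmio` and `worker.system.mmio`, assuming both are `Arc`s.
    struct SketchMmio {
        cp_rb_wptr: std::sync::atomic::AtomicU32,
    }

    /// Hypothetical split: the CPU half and the worker half each get a clone
    /// of one `Arc`, mirroring what `into_handle` is expected to do.
    fn sketch_split_mmio() -> (std::sync::Arc<SketchMmio>, std::sync::Arc<SketchMmio>) {
        let mmio = std::sync::Arc::new(SketchMmio {
            cp_rb_wptr: std::sync::atomic::AtomicU32::new(0),
        });
        (std::sync::Arc::clone(&mmio), mmio)
    }

    #[test]
    fn sketch_split_mmio_shares_one_allocation() {
        use std::sync::atomic::Ordering;
        let (cpu_side, worker_side) = sketch_split_mmio();
        // Same allocation, not merely equal contents.
        assert!(std::sync::Arc::ptr_eq(&cpu_side, &worker_side));
        cpu_side.cp_rb_wptr.store(0xC0FFEE, Ordering::Release);
        assert_eq!(worker_side.cp_rb_wptr.load(Ordering::Acquire), 0xC0FFEE);
    }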

    /// `GpuHandle` must be `Send + Sync`. This is a compile-time assertion
    /// via a generic bound: the test target fails to compile if the bound is
    /// violated.
    #[test]
    fn handle_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<GpuHandle>();
    }

    /// `GpuWorker` must be `Send` so we can move it onto a thread in step 3.
    /// (`Sync` is not required: only one thread ever owns the worker.)
    #[test]
    fn worker_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<GpuWorker>();
    }
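
    // Sketch: the same kind of bound check as a `const` item, so it is
    // enforced at compile time with nothing to run. Placed outside
    // `mod tests`, it would also be checked by a plain `cargo check`.
    // `SketchHandle` is a hypothetical stand-in; for the real types the
    // const would name `GpuHandle` or `GpuWorker` instead.
    struct SketchHandle {
        wptr: std::sync::Arc<std::sync::atomic::AtomicU32>,
    }

    fn sketch_assert_send_sync<T: Send + Sync>() {}

    // Coercing the instantiated fn item to a fn pointer forces the compiler
    // to check `SketchHandle: Send + Sync`; a violation is a compile error.
    const _: fn() = sketch_assert_send_sync::<SketchHandle>;

    #[test]
    fn sketch_handle_moves_across_threads() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let handle = SketchHandle {
            wptr: std::sync::Arc::new(AtomicU32::new(0)),
        };
        let observer = std::sync::Arc::clone(&handle.wptr);
        // Moving the handle into another thread requires `Send`.
        std::thread::spawn(move || handle.wptr.store(42, Ordering::Release))
            .join()
            .expect("sketch thread panicked");
        assert_eq!(observer.load(Ordering::Acquire), 42);
    }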

    /// Dropping the handle (CPU side) must not leave `recv` blocked on the
    /// worker side; instead the channel must report disconnection
    /// (`RecvError`). This is a standard crossbeam guarantee, but we pin it
    /// down with an explicit test so a future refactor (e.g. switching
    /// channel backends) can't silently change the semantics.
    #[test]
    fn dropping_handle_disconnects_command_channel() {
        let (worker, handle) = GpuSystem::new().into_handle();
        drop(handle);
        let result = worker.cmd_rx.recv();
        assert!(
            matches!(result, Err(crossbeam_channel::RecvError)),
            "expected Disconnected after handle drop, got {result:?}",
        );
    }
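
    // Sketch: the disconnect contract shown with `std::sync::mpsc`, used
    // only so the example has no dependencies. crossbeam documents the same
    // behaviour for its `recv`. Messages queued before the last sender is
    // dropped are still delivered. `RecvError` surfaces only once the queue
    // is empty, and from then on `recv` returns immediately instead of
    // blocking. `sketch_drain_after_sender_drop` is hypothetical.
    fn sketch_drain_after_sender_drop(
    ) -> (Vec<u32>, Result<u32, std::sync::mpsc::RecvError>) {
        let (tx, rx) = std::sync::mpsc::channel::<u32>();
        tx.send(1).expect("send 1");
        tx.send(2).expect("send 2");
        drop(tx); // last sender gone: the channel is now disconnected
        // `try_iter` yields whatever is already queued without blocking.
        let drained: Vec<u32> = rx.try_iter().collect();
        // Empty and disconnected: `recv` fails fast with `RecvError`.
        (drained, rx.recv())
    }

    #[test]
    fn sketch_queued_messages_survive_sender_drop() {
        let (drained, last) = sketch_drain_after_sender_drop();
        assert_eq!(drained, vec![1, 2]);
        assert_eq!(last, Err(std::sync::mpsc::RecvError));
    }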

    /// Spawn the no-op worker, signal shutdown after a short delay, and join.
    /// The join must complete within a generous timeout: the polling cadence
    /// is 2 ms, so 250 ms leaves plenty of headroom even on a loaded test
    /// runner.
    #[test]
    fn noop_worker_shuts_down_cleanly() {
        let shutdown = Arc::new(AtomicBool::new(false));
        let handle = spawn_noop_worker(shutdown.clone());
        // Brief gap so the worker most likely enters its loop at least once
        // before we signal. This exercises the in-loop exit path rather than
        // the never-entered-loop case.
        thread::sleep(Duration::from_millis(5));
        let outcome =
            shutdown_and_join_with_timeout(&shutdown, handle, Duration::from_millis(250));
        assert_eq!(outcome, Ok(()), "no-op worker did not join in budget");
    }

    /// A worker that outlives the join budget must surface as a timeout, not
    /// a hang. This validates the `crossbeam_channel::after` budget. The
    /// pseudo-worker below ignores `shutdown` and sleeps well past the 50 ms
    /// budget before exiting, which forces the timeout path.
    #[test]
    fn shutdown_join_timeouts_on_misbehaving_worker() {
        let shutdown = Arc::new(AtomicBool::new(false));
        let handle = thread::Builder::new()
            .name("test-misbehaving-worker".to_string())
            .spawn(|| {
                // Sleep longer than the test's join budget. The test
                // proves the join helper returns `Err(())` rather than
                // blocking the test process indefinitely.
                thread::sleep(Duration::from_millis(500));
            })
            .expect("spawn misbehaving worker");
        let outcome =
            shutdown_and_join_with_timeout(&shutdown, handle, Duration::from_millis(50));
        assert_eq!(outcome, Err(()), "expected timeout signal");
    }
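
    // Sketch: the hop-thread pattern behind `shutdown_and_join_with_timeout`,
    // rewritten with `std` only. A helper thread owns the `JoinHandle`,
    // calls `join`, and then signals over an `mpsc` channel. The caller waits
    // with `Receiver::recv_timeout` instead of crossbeam's `select!`/`after`.
    // `sketch_join_with_timeout` is hypothetical, not part of this crate,
    // and it leaves raising the shutdown flag to the caller.
    fn sketch_join_with_timeout(
        handle: std::thread::JoinHandle<()>,
        timeout: std::time::Duration,
    ) -> Result<(), ()> {
        let (tx, rx) = std::sync::mpsc::channel::<()>();
        // The helper's own `JoinHandle` is dropped immediately, detaching it.
        // On timeout it simply stays blocked in `join` until the worker exits.
        std::thread::spawn(move || {
            let _ = handle.join();
            // The receiver is gone if the caller already timed out; ignore.
            let _ = tx.send(());
        });
        rx.recv_timeout(timeout).map_err(|_| ())
    }

    #[test]
    fn sketch_join_with_timeout_covers_both_paths() {
        use std::time::Duration;
        let quick = std::thread::spawn(|| {});
        assert_eq!(sketch_join_with_timeout(quick, Duration::from_millis(250)), Ok(()));
        let slow = std::thread::spawn(|| std::thread::sleep(Duration::from_millis(300)));
        assert_eq!(sketch_join_with_timeout(slow, Duration::from_millis(20)), Err(()));
    }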

    /// M1.8: `write_u32_fence` / `read_u32_fence` ordering test. A producer
    /// thread writes a "data" value, then a "fence" value via
    /// `write_u32_fence`. A consumer thread spin-reads the fence via
    /// `read_u32_fence` and, on observing a new value, reads the data via
    /// plain `read_u32`. The data must never be older than the fence value
    /// just observed (the producer writes `data = i` before `fence = i`),
    /// and never a torn read.
    ///
    /// On x86_64 (TSO) this would pass even without the fences; on weaker
    /// architectures (e.g. AArch64) it could fail without them. We pin down
    /// the invariant here so future ports or refactors can't silently weaken
    /// it. Uses a small synthetic memory implementing `MemoryAccess`.
    #[test]
    fn write_u32_fence_publishes_prior_writes() {
        use std::sync::atomic::{AtomicBool, AtomicU32};
        use std::time::Instant;
        use xenia_memory::MemoryAccess;
        // The fixture's `MemoryAccess` impl stores each word in an
        // `AtomicU32` slot, so u32 reads and writes can never tear. The
        // fence helpers layer Release/Acquire ordering on top; with plain
        // (non-atomic) storage, a reader racing the writer could observe
        // torn values no matter which fences were used.
        const SLOT_COUNT: usize = 16;
        struct ScopedMem([AtomicU32; SLOT_COUNT]);
        impl ScopedMem {
            fn slot(&self, addr: u32) -> &AtomicU32 {
                &self.0[(addr / 4) as usize]
            }
        }
        impl MemoryAccess for ScopedMem {
            fn read_u8(&self, addr: u32) -> u8 {
                // Little-endian: byte lane `addr & 3` of the aligned word.
                let v = self.slot(addr & !3).load(Ordering::Relaxed);
                let shift = (addr & 3) * 8;
                (v >> shift) as u8
            }
            fn read_u16(&self, addr: u32) -> u16 {
                u16::from_le_bytes([self.read_u8(addr), self.read_u8(addr + 1)])
            }
            fn read_u32(&self, addr: u32) -> u32 {
                self.slot(addr).load(Ordering::Relaxed)
            }
            fn read_u64(&self, addr: u32) -> u64 {
                let lo = self.read_u32(addr) as u64;
                let hi = self.read_u32(addr + 4) as u64;
                lo | (hi << 32)
            }
            fn write_u8(&self, _addr: u32, _val: u8) {
                unimplemented!("test fixture only writes u32")
            }
            fn write_u16(&self, _addr: u32, _val: u16) {
                unimplemented!("test fixture only writes u32")
            }
            fn write_u32(&self, addr: u32, val: u32) {
                self.slot(addr).store(val, Ordering::Relaxed);
            }
            fn write_u64(&self, addr: u32, val: u64) {
                self.write_u32(addr, val as u32);
                self.write_u32(addr + 4, (val >> 32) as u32);
            }
            fn translate(&self, _addr: u32) -> Option<*const u8> {
                None
            }
            fn translate_mut(&self, _addr: u32) -> Option<*mut u8> {
                None
            }
        }
        let mem: Arc<ScopedMem> = Arc::new(ScopedMem(std::array::from_fn(|_| {
            AtomicU32::new(0)
        })));
        // The slots already start zeroed; these writes document the layout
        // (data at offset 0, fence at offset 16).
        mem.write_u32(0, 0); // data
        mem.write_u32(16, 0); // fence
        let stop = Arc::new(AtomicBool::new(false));
        let mem_p = mem.clone();
        let stop_p = stop.clone();
        let producer = thread::Builder::new()
            .name("fence-producer".into())
            .spawn(move || {
                for i in 1u32..=10_000 {
                    if stop_p.load(Ordering::Relaxed) {
                        break;
                    }
                    mem_p.write_u32(0, i); // data
                    mem_p.write_u32_fence(16, i); // fence (Release)
                    thread::yield_now();
                }
            })
            .expect("spawn producer");
        let mem_c = mem.clone();
        let consumer = thread::Builder::new()
            .name("fence-consumer".into())
            .spawn(move || {
                let deadline = Instant::now() + Duration::from_millis(500);
                let mut last_seen = 0u32;
                let mut iters = 0u32;
                while Instant::now() < deadline {
                    let f = mem_c.read_u32_fence(16); // Acquire
                    if f != last_seen {
                        let d = mem_c.read_u32(0);
                        // The data read after the fence must be at least
                        // as new as the fence value (the producer wrote
                        // `data = i` before `fence = i`).
                        assert!(
                            d >= f,
                            "fence ordering violated: data={d} fence={f}"
                        );
                        last_seen = f;
                        iters += 1;
                    }
                }
                iters
            })
            .expect("spawn consumer");
        let observed = consumer.join().expect("consumer join");
        stop.store(true, Ordering::Relaxed);
        let _ = producer.join();
        assert!(
            observed > 0,
            "consumer never observed a fence transition (starved by the scheduler?)",
        );
    }
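
    // Sketch of the Release/Acquire pairing the test above relies on, using
    // `std::sync::atomic::fence`. This is an assumption about the helpers,
    // whose bodies are not shown here: it models `write_u32_fence` as a
    // Release fence followed by a relaxed store, and `read_u32_fence` as a
    // relaxed load followed by an Acquire fence. If the reader's load sees
    // the writer's store, every write made before the Release fence is
    // visible after the Acquire fence. `SketchSlots` is hypothetical.
    struct SketchSlots {
        data: std::sync::atomic::AtomicU32,
        flag: std::sync::atomic::AtomicU32,
    }

    impl SketchSlots {
        fn new() -> Self {
            use std::sync::atomic::AtomicU32;
            Self {
                data: AtomicU32::new(0),
                flag: AtomicU32::new(0),
            }
        }

        fn publish(&self, v: u32) {
            use std::sync::atomic::{fence, Ordering};
            self.data.store(v, Ordering::Relaxed);
            fence(Ordering::Release); // orders the data store before the flag store
            self.flag.store(v, Ordering::Relaxed);
        }

        /// Returns `(flag, data)` as seen by a reader.
        fn observe(&self) -> (u32, u32) {
            use std::sync::atomic::{fence, Ordering};
            let flag = self.flag.load(Ordering::Relaxed);
            fence(Ordering::Acquire); // pairs with the writer's Release fence
            (flag, self.data.load(Ordering::Relaxed))
        }
    }

    #[test]
    fn sketch_fence_pairing_keeps_data_at_least_as_new_as_flag() {
        let slots = std::sync::Arc::new(SketchSlots::new());
        let writer = {
            let slots = std::sync::Arc::clone(&slots);
            std::thread::spawn(move || (1..=10_000u32).for_each(|v| slots.publish(v)))
        };
        for _ in 0..10_000 {
            let (flag, data) = slots.observe();
            assert!(data >= flag, "data={data} flag={flag}");
        }
        writer.join().expect("writer panicked");
    }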

    /// Spawning two no-op workers concurrently and shutting each down in turn
    /// must succeed for both, showing that one call's joiner thread and
    /// channel don't interfere with the other's. (The joins themselves run
    /// sequentially; this does not measure parallel teardown.)
    #[test]
    fn two_concurrent_noop_workers_both_shut_down() {
        let shutdown_a = Arc::new(AtomicBool::new(false));
        let handle_a = spawn_noop_worker(shutdown_a.clone());
        let shutdown_b = Arc::new(AtomicBool::new(false));
        let handle_b = spawn_noop_worker(shutdown_b.clone());
        let r_a = shutdown_and_join_with_timeout(
            &shutdown_a,
            handle_a,
            Duration::from_millis(250),
        );
        let r_b = shutdown_and_join_with_timeout(
            &shutdown_b,
            handle_b,
            Duration::from_millis(250),
        );
        assert_eq!(r_a, Ok(()));
        assert_eq!(r_b, Ok(()));
    }
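
    // Sketch: if teardown should overlap rather than run worker by worker,
    // raise every shutdown flag first and only then join. Each worker starts
    // exiting as soon as its flag is set, so the total wait is roughly the
    // slowest worker's exit latency instead of the sum. `sketch_spawn_poller`
    // and `sketch_shutdown_all` are hypothetical stand-ins for
    // `spawn_noop_worker` and the real teardown path, and the join timeout
    // is omitted for brevity.
    type SketchWorker = (
        std::sync::Arc<std::sync::atomic::AtomicBool>,
        std::thread::JoinHandle<()>,
    );

    fn sketch_spawn_poller() -> SketchWorker {
        let stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag = std::sync::Arc::clone(&stop);
        let handle = std::thread::spawn(move || {
            // Poll the flag every 2 ms until shutdown is requested.
            while !flag.load(std::sync::atomic::Ordering::Acquire) {
                std::thread::sleep(std::time::Duration::from_millis(2));
            }
        });
        (stop, handle)
    }

    /// Returns how many workers joined without panicking.
    fn sketch_shutdown_all(workers: Vec<SketchWorker>) -> usize {
        // Phase 1: signal every worker so all exits proceed concurrently.
        for (stop, _) in &workers {
            stop.store(true, std::sync::atomic::Ordering::Release);
        }
        // Phase 2: join; later joins mostly find their worker already gone.
        workers
            .into_iter()
            .map(|(_, handle)| handle.join())
            .filter(|result| result.is_ok())
            .count()
    }

    #[test]
    fn sketch_shutdown_all_joins_every_worker() {
        let workers: Vec<SketchWorker> = (0..3).map(|_| sketch_spawn_poller()).collect();
        assert_eq!(sketch_shutdown_all(workers), 3);
    }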
}