xenia-cpu: VMX128, FPSCR, decoder split, scheduler, decode/block caches
Split the monolithic interpreter into cohesive modules: dedicated decoder (decoder.rs) producing 8-byte DecodedInstr; opcode tables (opcode.rs); explicit traps (trap.rs); FPSCR helpers (fpscr.rs); overflow/carry helpers (overflow.rs); a 4 KiB-page-versioned decode cache and basic-block cache (block_cache.rs); and a full VMX/VMX128 implementation (vmx.rs) covering AltiVec + Xenon's 128-bit extensions. Add the parallel-execution substrate behind --parallel: a 7-party phaser (phaser.rs) for round-based barrier sync, ReservationTable (reservation.rs) for guest LL/SC, and the per-HW-thread scheduler core (scheduler.rs) that owns ThreadRefs, runqueues, and pending IRQs. Disassembler is now the single source of truth: disasm.rs gains the full base + extended + VMX128 mnemonic set, with golden JSON fixtures and a disasm_goldens test suite. Add a criterion-style interpreter bench. context.rs grows the per-thread state the new modules need (reservation slot, FPSCR, vector regs). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
345
crates/xenia-cpu/src/phaser.rs
Normal file
345
crates/xenia-cpu/src/phaser.rs
Normal file
@@ -0,0 +1,345 @@
|
||||
//! Quantum-boundary phaser for the M3 per-HW-thread parallel scheduler.
|
||||
//!
|
||||
//! Six [`super::HW_THREAD_COUNT`] host threads run their slots' interpreters
|
||||
//! in parallel, then meet at a phaser to advance to the next quantum. This
|
||||
//! is **not** [`std::sync::Barrier`]: a Barrier needs a fixed party count,
|
||||
//! but our slots can become idle (no runnable thread) and shouldn't block
|
||||
//! the phaser arrival.
|
||||
//!
|
||||
//! ## Semantics
|
||||
//!
|
||||
//! - Each slot at the end of its quantum either calls
|
||||
//! [`Phaser::arrive_and_wait`] (it has a runnable thread to run next
|
||||
//! quantum) or [`Phaser::skip`] (it's idle this round and will wake on
|
||||
//! `slot_wake[i]`).
|
||||
//! - The phase advances when **all 6 slots have either arrived or
|
||||
//! skipped**. Arrived slots block until the advance; skipped slots
|
||||
//! return immediately and re-poll their wake state.
|
||||
//! - The phaser uses a generation counter so a slot that arrives "early"
|
||||
//! in the next phase doesn't see the prior phase's "all arrived"
|
||||
//! condition.
|
||||
//! - Defensive timeout: [`Phaser::arrive_and_wait_timeout`] returns
|
||||
//! [`PhaserOutcome::Timeout`] if a peer crashes / hangs. Callers
|
||||
//! typically convert this into a graceful shutdown rather than
|
||||
//! panicking, so the rest of the topology can tear down cleanly.
|
||||
//!
|
||||
//! ## Memory ordering
|
||||
//!
|
||||
//! - The participant counter (`arrived` + `skipped`) uses `AcqRel` on
|
||||
//! the increment so the last-to-arrive thread sees a consistent
|
||||
//! "everyone is here" snapshot.
|
||||
//! - The generation `phase` is read with `Acquire` in arrivers' wait
|
||||
//! loops; the advancing thread stores with `Release` after bumping.
|
||||
//! - The condvar's broadcast publishes the phase; the wait loop
|
||||
//! re-checks `phase` against its captured value to defend against
|
||||
//! spurious wakeups.
|
||||
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Condvar, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// Outcome of a phaser arrival.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PhaserOutcome {
|
||||
/// All participants arrived/skipped — phase advanced. Caller proceeds
|
||||
/// into the next quantum.
|
||||
Advanced,
|
||||
/// Defensive timeout fired before all peers arrived. Caller should
|
||||
/// log + initiate shutdown rather than retry.
|
||||
Timeout,
|
||||
/// Phaser was shut down via [`Phaser::shutdown`]; all waiters are
|
||||
/// woken and return this. Caller exits cleanly.
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
/// Custom barrier-with-skip primitive. Construct once with the number of
|
||||
/// participating slots; share via `Arc` across host threads.
|
||||
pub struct Phaser {
|
||||
/// Total participant count (constant after construction). For our
|
||||
/// scheduler this is `HW_THREAD_COUNT = 6`.
|
||||
party_count: u32,
|
||||
/// Monotonic phase counter, incremented every time the phase
|
||||
/// advances. Used as a generation marker so a slot that wakes "into"
|
||||
/// the next phase doesn't observe the old "everyone arrived" state.
|
||||
phase: AtomicU32,
|
||||
/// Inner state guarded by the condvar's mutex.
|
||||
inner: Mutex<Inner>,
|
||||
/// Notified when a phase advances or shutdown fires.
|
||||
cv: Condvar,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Inner {
|
||||
arrived_or_skipped: u32,
|
||||
shutdown: bool,
|
||||
}
|
||||
|
||||
impl Phaser {
|
||||
/// Create a phaser with `party_count` participants. Panics if
|
||||
/// `party_count == 0`.
|
||||
pub fn new(party_count: u32) -> Self {
|
||||
assert!(party_count > 0, "phaser party_count must be > 0");
|
||||
Self {
|
||||
party_count,
|
||||
phase: AtomicU32::new(0),
|
||||
inner: Mutex::new(Inner {
|
||||
arrived_or_skipped: 0,
|
||||
shutdown: false,
|
||||
}),
|
||||
cv: Condvar::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the current phase number. Useful for tests and observability.
|
||||
pub fn current_phase(&self) -> u32 {
|
||||
self.phase.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
/// Mark this slot as not participating in the current phase. Counts
|
||||
/// toward the advance threshold but does not block. Used when a slot
|
||||
/// has no runnable thread and is parked waiting on
|
||||
/// `slot_wake[i].unpark()`.
|
||||
///
|
||||
/// `_slot_id` is informational (not stored); the parameter exists so
|
||||
/// call sites stay greppable.
|
||||
pub fn skip(&self, _slot_id: u8) {
|
||||
self.contribute_advance();
|
||||
}
|
||||
|
||||
/// Block until the phase advances or the defensive 5-second timeout
|
||||
/// fires. Returns [`PhaserOutcome::Advanced`] on a clean phase
|
||||
/// transition; [`Timeout`] if a peer hung; [`Shutdown`] on tear-down.
|
||||
///
|
||||
/// `_slot_id` is informational (see [`Self::skip`]).
|
||||
pub fn arrive_and_wait(&self, _slot_id: u8) -> PhaserOutcome {
|
||||
self.arrive_and_wait_timeout(Duration::from_secs(5))
|
||||
}
|
||||
|
||||
/// Same as [`Self::arrive_and_wait`] with a caller-supplied timeout.
|
||||
pub fn arrive_and_wait_timeout(&self, timeout: Duration) -> PhaserOutcome {
|
||||
let pre_phase = self.phase.load(Ordering::Acquire);
|
||||
self.contribute_advance();
|
||||
let deadline = Instant::now() + timeout;
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
loop {
|
||||
if guard.shutdown {
|
||||
return PhaserOutcome::Shutdown;
|
||||
}
|
||||
if self.phase.load(Ordering::Acquire) != pre_phase {
|
||||
return PhaserOutcome::Advanced;
|
||||
}
|
||||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
return PhaserOutcome::Timeout;
|
||||
}
|
||||
let remaining = deadline - now;
|
||||
let result = self.cv.wait_timeout(guard, remaining).unwrap();
|
||||
guard = result.0;
|
||||
if result.1.timed_out() {
|
||||
// Loop once more to disambiguate "real timeout" vs
|
||||
// "spurious wakeup just before the deadline".
|
||||
if self.phase.load(Ordering::Acquire) != pre_phase {
|
||||
return PhaserOutcome::Advanced;
|
||||
}
|
||||
if guard.shutdown {
|
||||
return PhaserOutcome::Shutdown;
|
||||
}
|
||||
return PhaserOutcome::Timeout;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Wake every parked arriver and signal shutdown. After this, all
|
||||
/// future and outstanding `arrive_and_wait_*` calls return
|
||||
/// [`PhaserOutcome::Shutdown`].
|
||||
pub fn shutdown(&self) {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.shutdown = true;
|
||||
self.cv.notify_all();
|
||||
}
|
||||
|
||||
/// Common path for both arrive-and-wait and skip: bump the
|
||||
/// participant counter, and if we were the last one in, advance the
|
||||
/// phase + broadcast.
|
||||
fn contribute_advance(&self) {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.arrived_or_skipped += 1;
|
||||
if guard.arrived_or_skipped >= self.party_count {
|
||||
// Last one in. Reset the counter, bump the phase, broadcast.
|
||||
guard.arrived_or_skipped = 0;
|
||||
// `Release` on the phase store pairs with `Acquire` reads in
|
||||
// arriving slots' wait-loop predicates.
|
||||
self.phase.fetch_add(1, Ordering::Release);
|
||||
self.cv.notify_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
use std::thread;
|
||||
|
||||
/// All N participants arrive — phase advances, every arriver returns
|
||||
/// `Advanced`.
|
||||
#[test]
|
||||
fn n_arrivers_all_advance() {
|
||||
const N: u32 = 6;
|
||||
let p = Arc::new(Phaser::new(N));
|
||||
let mut handles = Vec::new();
|
||||
for i in 0..N {
|
||||
let p = p.clone();
|
||||
handles.push(
|
||||
thread::Builder::new()
|
||||
.name(format!("phaser-test-{i}"))
|
||||
.spawn(move || p.arrive_and_wait(i as u8))
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
for h in handles {
|
||||
assert_eq!(h.join().unwrap(), PhaserOutcome::Advanced);
|
||||
}
|
||||
assert_eq!(p.current_phase(), 1);
|
||||
}
|
||||
|
||||
/// 5 arrive + 1 skip → phase advances; arrivers see `Advanced`.
|
||||
#[test]
|
||||
fn skip_counts_toward_advance() {
|
||||
const N: u32 = 6;
|
||||
let p = Arc::new(Phaser::new(N));
|
||||
let mut handles = Vec::new();
|
||||
for i in 0..(N - 1) {
|
||||
let p = p.clone();
|
||||
handles.push(
|
||||
thread::Builder::new()
|
||||
.name(format!("phaser-arrive-{i}"))
|
||||
.spawn(move || p.arrive_and_wait(i as u8))
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
// Brief pause to let arrivers park first (exercising the
|
||||
// skip-unblocks-arrivers path).
|
||||
thread::sleep(Duration::from_millis(20));
|
||||
p.skip((N - 1) as u8);
|
||||
for h in handles {
|
||||
assert_eq!(h.join().unwrap(), PhaserOutcome::Advanced);
|
||||
}
|
||||
assert_eq!(p.current_phase(), 1);
|
||||
}
|
||||
|
||||
/// Shutdown wakes parked arrivers; they return `Shutdown`.
|
||||
#[test]
|
||||
fn shutdown_wakes_arrivers() {
|
||||
const N: u32 = 6;
|
||||
let p = Arc::new(Phaser::new(N));
|
||||
let mut handles = Vec::new();
|
||||
// Only N-1 arrive — phase will not advance.
|
||||
for i in 0..(N - 1) {
|
||||
let p = p.clone();
|
||||
handles.push(
|
||||
thread::Builder::new()
|
||||
.name(format!("phaser-arrive-shutdown-{i}"))
|
||||
.spawn(move || p.arrive_and_wait(i as u8))
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
thread::sleep(Duration::from_millis(20));
|
||||
p.shutdown();
|
||||
for h in handles {
|
||||
assert_eq!(h.join().unwrap(), PhaserOutcome::Shutdown);
|
||||
}
|
||||
}
|
||||
|
||||
/// Defensive timeout: if some peers never arrive, others surface
|
||||
/// `Timeout` rather than blocking forever.
|
||||
#[test]
|
||||
fn timeout_fires_when_peer_hangs() {
|
||||
const N: u32 = 4;
|
||||
let p = Arc::new(Phaser::new(N));
|
||||
// Only 2 of 4 arrive — others "hang".
|
||||
let p1 = p.clone();
|
||||
let h1 = thread::spawn(move || {
|
||||
p1.arrive_and_wait_timeout(Duration::from_millis(50))
|
||||
});
|
||||
let p2 = p.clone();
|
||||
let h2 = thread::spawn(move || {
|
||||
p2.arrive_and_wait_timeout(Duration::from_millis(50))
|
||||
});
|
||||
assert_eq!(h1.join().unwrap(), PhaserOutcome::Timeout);
|
||||
assert_eq!(h2.join().unwrap(), PhaserOutcome::Timeout);
|
||||
}
|
||||
|
||||
/// Multi-phase stress: all participants run a tight loop of
|
||||
/// arrive_and_wait calls; after K phases they all observe the same
|
||||
/// `current_phase()` value. Catches generation/counter resync bugs.
|
||||
#[test]
|
||||
fn multi_phase_progress() {
|
||||
const N: u32 = 6;
|
||||
const K: u32 = 1000;
|
||||
let p = Arc::new(Phaser::new(N));
|
||||
let counter = Arc::new(AtomicU32::new(0));
|
||||
let mut handles = Vec::new();
|
||||
for i in 0..N {
|
||||
let p = p.clone();
|
||||
let c = counter.clone();
|
||||
handles.push(
|
||||
thread::Builder::new()
|
||||
.name(format!("phaser-multi-{i}"))
|
||||
.spawn(move || {
|
||||
for _ in 0..K {
|
||||
assert_eq!(
|
||||
p.arrive_and_wait(i as u8),
|
||||
PhaserOutcome::Advanced
|
||||
);
|
||||
}
|
||||
c.fetch_add(1, Ordering::Relaxed);
|
||||
})
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
for h in handles {
|
||||
h.join().unwrap();
|
||||
}
|
||||
assert_eq!(counter.load(Ordering::Relaxed), N);
|
||||
assert_eq!(p.current_phase(), K);
|
||||
}
|
||||
|
||||
/// Mixed skip/arrive across phases — emulates the realistic scheduler
|
||||
/// pattern where slots become idle for some quanta.
|
||||
#[test]
|
||||
fn mixed_skip_and_arrive_random() {
|
||||
const N: u32 = 6;
|
||||
const K: u32 = 200;
|
||||
let p = Arc::new(Phaser::new(N));
|
||||
let mut handles = Vec::new();
|
||||
for i in 0..N {
|
||||
let p = p.clone();
|
||||
handles.push(
|
||||
thread::Builder::new()
|
||||
.name(format!("phaser-mixed-{i}"))
|
||||
.spawn(move || {
|
||||
// Pseudo-random skip pattern based on slot+phase
|
||||
let mut state: u32 = 0x9E37_79B9u32.wrapping_add(i);
|
||||
for phase in 0..K {
|
||||
state = state.wrapping_mul(0x6C8E_9CF7).wrapping_add(phase);
|
||||
if state & 0xF == 0 {
|
||||
p.skip(i as u8);
|
||||
} else {
|
||||
let _ = p.arrive_and_wait(i as u8);
|
||||
}
|
||||
}
|
||||
})
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
for h in handles {
|
||||
h.join().unwrap();
|
||||
}
|
||||
// After K rounds with all-N participation each phase, the phase
|
||||
// counter equals K. Each iteration contributes exactly N to the
|
||||
// counter (split between arrive and skip).
|
||||
assert_eq!(p.current_phase(), K);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user