Split the monolithic interpreter into cohesive modules: dedicated decoder (decoder.rs) producing 8-byte DecodedInstr; opcode tables (opcode.rs); explicit traps (trap.rs); FPSCR helpers (fpscr.rs); overflow/carry helpers (overflow.rs); a 4 KiB-page-versioned decode cache and basic-block cache (block_cache.rs); and a full VMX/VMX128 implementation (vmx.rs) covering AltiVec + Xenon's 128-bit extensions. Add the parallel-execution substrate behind --parallel: a 7-party phaser (phaser.rs) for round-based barrier sync, ReservationTable (reservation.rs) for guest LL/SC, and the per-HW-thread scheduler core (scheduler.rs) that owns ThreadRefs, runqueues, and pending IRQs. Disassembler is now the single source of truth: disasm.rs gains the full base + extended + VMX128 mnemonic set, with golden JSON fixtures and a disasm_goldens test suite. Add a criterion-style interpreter bench. context.rs grows the per-thread state the new modules need (reservation slot, FPSCR, vector regs). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
346 lines
13 KiB
Rust
346 lines
13 KiB
Rust
//! Quantum-boundary phaser for the M3 per-HW-thread parallel scheduler.
|
|
//!
|
|
//! Six [`super::HW_THREAD_COUNT`] host threads run their slots' interpreters
|
|
//! in parallel, then meet at a phaser to advance to the next quantum. This
|
|
//! is **not** [`std::sync::Barrier`]: a Barrier needs a fixed party count,
|
|
//! but our slots can become idle (no runnable thread) and shouldn't block
|
|
//! the phaser arrival.
|
|
//!
|
|
//! ## Semantics
|
|
//!
|
|
//! - Each slot at the end of its quantum either calls
|
|
//! [`Phaser::arrive_and_wait`] (it has a runnable thread to run next
|
|
//! quantum) or [`Phaser::skip`] (it's idle this round and will wake on
|
|
//! `slot_wake[i]`).
|
|
//! - The phase advances when **all 6 slots have either arrived or
|
|
//! skipped**. Arrived slots block until the advance; skipped slots
|
|
//! return immediately and re-poll their wake state.
|
|
//! - The phaser uses a generation counter so a slot that arrives "early"
|
|
//! in the next phase doesn't see the prior phase's "all arrived"
|
|
//! condition.
|
|
//! - Defensive timeout: [`Phaser::arrive_and_wait_timeout`] returns
|
|
//! [`PhaserOutcome::Timeout`] if a peer crashes / hangs. Callers
|
|
//! typically convert this into a graceful shutdown rather than
|
|
//! panicking, so the rest of the topology can tear down cleanly.
|
|
//!
|
|
//! ## Memory ordering
|
|
//!
|
|
//! - The participant counter (`arrived_or_skipped`) is a plain integer
//!   guarded by the `inner` mutex, so the last-to-arrive thread sees a
//!   consistent "everyone is here" snapshot.
|
|
//! - The generation `phase` is read with `Acquire` in arrivers' wait
|
|
//! loops; the advancing thread stores with `Release` after bumping.
|
|
//! - The condvar's broadcast publishes the phase; the wait loop
|
|
//! re-checks `phase` against its captured value to defend against
|
|
//! spurious wakeups.
|
|
|
|
use std::sync::atomic::{AtomicU32, Ordering};
|
|
use std::sync::{Condvar, Mutex};
|
|
use std::time::{Duration, Instant};
|
|
|
|
/// Outcome of a phaser arrival.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhaserOutcome {
    /// All participants arrived/skipped — phase advanced. Caller proceeds
    /// into the next quantum.
    Advanced,
    /// Defensive timeout fired before all peers arrived. Caller should
    /// log + initiate shutdown rather than retry.
    Timeout,
    /// Phaser was shut down via [`Phaser::shutdown`]; all waiters are
    /// woken and return this. Caller exits cleanly.
    Shutdown,
}

/// Custom barrier-with-skip primitive. Construct once with the number of
/// participating slots; share via `Arc` across host threads.
pub struct Phaser {
    /// Total participant count (constant after construction). For our
    /// scheduler this is `HW_THREAD_COUNT = 6`.
    party_count: u32,
    /// Monotonic phase counter, incremented every time the phase
    /// advances. Used as a generation marker so a slot that wakes "into"
    /// the next phase doesn't observe the old "everyone arrived" state.
    phase: AtomicU32,
    /// Inner state guarded by the condvar's mutex.
    inner: Mutex<Inner>,
    /// Notified when a phase advances or shutdown fires.
    cv: Condvar,
}

/// Mutable bookkeeping protected by [`Phaser::inner`].
#[derive(Debug)]
struct Inner {
    /// Number of contributions (arrivals + skips) recorded since the last
    /// phase advance. Reset to 0 whenever the phase advances.
    arrived_or_skipped: u32,
    /// Set once by [`Phaser::shutdown`]; never cleared.
    shutdown: bool,
}

impl Phaser {
    /// Timeout applied by [`Self::arrive_and_wait`].
    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Panic message for a poisoned state mutex. No code that can panic
    /// runs while the lock is held, so poisoning indicates a bug.
    const POISONED: &'static str = "phaser state mutex poisoned";

    /// Create a phaser with `party_count` participants.
    ///
    /// # Panics
    ///
    /// Panics if `party_count == 0`.
    pub fn new(party_count: u32) -> Self {
        assert!(party_count > 0, "phaser party_count must be > 0");
        Self {
            party_count,
            phase: AtomicU32::new(0),
            inner: Mutex::new(Inner {
                arrived_or_skipped: 0,
                shutdown: false,
            }),
            cv: Condvar::new(),
        }
    }

    /// Get the current phase number. Useful for tests and observability.
    pub fn current_phase(&self) -> u32 {
        self.phase.load(Ordering::Acquire)
    }

    /// Mark this slot as not participating in the current phase. Counts
    /// toward the advance threshold but does not block. Used when a slot
    /// has no runnable thread and is parked waiting on
    /// `slot_wake[i].unpark()`.
    ///
    /// `_slot_id` is informational (not stored); the parameter exists so
    /// call sites stay greppable.
    pub fn skip(&self, _slot_id: u8) {
        self.contribute_advance();
    }

    /// Block until the phase advances or the defensive 5-second timeout
    /// fires. Returns [`PhaserOutcome::Advanced`] on a clean phase
    /// transition; [`PhaserOutcome::Timeout`] if a peer hung;
    /// [`PhaserOutcome::Shutdown`] on tear-down.
    ///
    /// `_slot_id` is informational (see [`Self::skip`]).
    pub fn arrive_and_wait(&self, _slot_id: u8) -> PhaserOutcome {
        self.arrive_and_wait_timeout(Self::DEFAULT_TIMEOUT)
    }

    /// Same as [`Self::arrive_and_wait`] with a caller-supplied timeout.
    ///
    /// A `timeout` too large to be added to the current [`Instant`]
    /// (e.g. `Duration::MAX`) is treated as "no deadline": the call then
    /// returns only on phase advance or shutdown.
    ///
    /// Shutdown takes priority over every other outcome. Note that a
    /// caller that receives [`PhaserOutcome::Timeout`] has still been
    /// counted as arrived for the current phase; the contribution is not
    /// withdrawn.
    pub fn arrive_and_wait_timeout(&self, timeout: Duration) -> PhaserOutcome {
        // Capture the generation before contributing: if this call turns
        // out to be the last arrival, the bump performed inside
        // `contribute_advance` makes the predicate below true immediately.
        let pre_phase = self.phase.load(Ordering::Acquire);
        self.contribute_advance();

        // `Instant + Duration` panics on overflow; `checked_add` lets an
        // unrepresentable deadline degrade to an untimed wait instead.
        let deadline = Instant::now().checked_add(timeout);

        let mut guard = self.lock_inner();
        loop {
            // Every wakeup (notify, spurious, or timed out) is classified
            // here, in one fixed priority order.
            if guard.shutdown {
                return PhaserOutcome::Shutdown;
            }
            if self.phase.load(Ordering::Acquire) != pre_phase {
                return PhaserOutcome::Advanced;
            }
            guard = match deadline {
                Some(deadline) => {
                    let now = Instant::now();
                    if now >= deadline {
                        return PhaserOutcome::Timeout;
                    }
                    self.cv
                        .wait_timeout(guard, deadline - now)
                        .expect(Self::POISONED)
                        .0
                }
                None => self.cv.wait(guard).expect(Self::POISONED),
            };
        }
    }

    /// Wake every parked arriver and signal shutdown. After this, all
    /// future and outstanding `arrive_and_wait_*` calls return
    /// [`PhaserOutcome::Shutdown`].
    pub fn shutdown(&self) {
        let mut guard = self.lock_inner();
        guard.shutdown = true;
        self.cv.notify_all();
    }

    /// Common path for both arrive-and-wait and skip: bump the
    /// participant counter, and if we were the last one in, advance the
    /// phase + broadcast.
    fn contribute_advance(&self) {
        let mut guard = self.lock_inner();
        guard.arrived_or_skipped += 1;
        if guard.arrived_or_skipped >= self.party_count {
            // Last one in. Reset the counter, bump the phase, broadcast.
            guard.arrived_or_skipped = 0;
            // `Release` on the phase store pairs with `Acquire` reads in
            // arriving slots' wait-loop predicates.
            self.phase.fetch_add(1, Ordering::Release);
            self.cv.notify_all();
        }
    }

    /// Acquire the state mutex, panicking with a descriptive message if
    /// it has been poisoned.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, Inner> {
        self.inner.lock().expect(Self::POISONED)
    }
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU32;
    use std::sync::Arc;
    use std::thread;

    /// Spawn `count` threads named `{name_prefix}-{slot}`; each performs a
    /// single `arrive_and_wait` for its slot and yields the outcome.
    fn spawn_arrivers(
        phaser: &Arc<Phaser>,
        count: u32,
        name_prefix: &str,
    ) -> Vec<thread::JoinHandle<PhaserOutcome>> {
        (0..count)
            .map(|slot| {
                let phaser = Arc::clone(phaser);
                thread::Builder::new()
                    .name(format!("{name_prefix}-{slot}"))
                    .spawn(move || phaser.arrive_and_wait(slot as u8))
                    .unwrap()
            })
            .collect()
    }

    /// Join every handle and require that each thread reported `expected`.
    fn expect_all(workers: Vec<thread::JoinHandle<PhaserOutcome>>, expected: PhaserOutcome) {
        for worker in workers {
            assert_eq!(worker.join().unwrap(), expected);
        }
    }

    /// Every one of the N parties arrives, so the phase advances once and
    /// each arriver reports `Advanced`.
    #[test]
    fn n_arrivers_all_advance() {
        const PARTIES: u32 = 6;
        let phaser = Arc::new(Phaser::new(PARTIES));

        let workers = spawn_arrivers(&phaser, PARTIES, "phaser-test");

        expect_all(workers, PhaserOutcome::Advanced);
        assert_eq!(phaser.current_phase(), 1);
    }

    /// Five arrivals plus one skip complete the phase; the arrivers
    /// report `Advanced`.
    #[test]
    fn skip_counts_toward_advance() {
        const PARTIES: u32 = 6;
        let phaser = Arc::new(Phaser::new(PARTIES));

        let workers = spawn_arrivers(&phaser, PARTIES - 1, "phaser-arrive");

        // Give the arrivers a moment to park so the skip is what
        // releases them.
        thread::sleep(Duration::from_millis(20));
        phaser.skip((PARTIES - 1) as u8);

        expect_all(workers, PhaserOutcome::Advanced);
        assert_eq!(phaser.current_phase(), 1);
    }

    /// `shutdown` releases parked arrivers, which report `Shutdown`.
    #[test]
    fn shutdown_wakes_arrivers() {
        const PARTIES: u32 = 6;
        let phaser = Arc::new(Phaser::new(PARTIES));

        // One party short: the phase cannot advance on its own.
        let workers = spawn_arrivers(&phaser, PARTIES - 1, "phaser-arrive-shutdown");

        thread::sleep(Duration::from_millis(20));
        phaser.shutdown();

        expect_all(workers, PhaserOutcome::Shutdown);
    }

    /// When some peers never show up, the ones that did arrive report
    /// `Timeout` instead of blocking indefinitely.
    #[test]
    fn timeout_fires_when_peer_hangs() {
        const PARTIES: u32 = 4;
        let phaser = Arc::new(Phaser::new(PARTIES));

        // Half of the parties arrive; the rest "hang".
        let waiters: Vec<_> = (0..2)
            .map(|_| {
                let phaser = Arc::clone(&phaser);
                thread::spawn(move || phaser.arrive_and_wait_timeout(Duration::from_millis(50)))
            })
            .collect();

        expect_all(waiters, PhaserOutcome::Timeout);
    }

    /// Stress across many phases: each party calls `arrive_and_wait` in a
    /// tight loop, and afterwards the phase counter equals the number of
    /// rounds. Guards against generation/counter resync bugs.
    #[test]
    fn multi_phase_progress() {
        const PARTIES: u32 = 6;
        const ROUNDS: u32 = 1000;
        let phaser = Arc::new(Phaser::new(PARTIES));
        let finished = Arc::new(AtomicU32::new(0));

        let workers: Vec<_> = (0..PARTIES)
            .map(|slot| {
                let phaser = Arc::clone(&phaser);
                let finished = Arc::clone(&finished);
                thread::Builder::new()
                    .name(format!("phaser-multi-{slot}"))
                    .spawn(move || {
                        for _ in 0..ROUNDS {
                            let outcome = phaser.arrive_and_wait(slot as u8);
                            assert_eq!(outcome, PhaserOutcome::Advanced);
                        }
                        finished.fetch_add(1, Ordering::Relaxed);
                    })
                    .unwrap()
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
        assert_eq!(finished.load(Ordering::Relaxed), PARTIES);
        assert_eq!(phaser.current_phase(), ROUNDS);
    }

    /// Interleaves skips and arrivals across phases, mimicking slots that
    /// go idle for some quanta.
    #[test]
    fn mixed_skip_and_arrive_random() {
        const PARTIES: u32 = 6;
        const ROUNDS: u32 = 200;
        let phaser = Arc::new(Phaser::new(PARTIES));

        let workers: Vec<_> = (0..PARTIES)
            .map(|slot| {
                let phaser = Arc::clone(&phaser);
                thread::Builder::new()
                    .name(format!("phaser-mixed-{slot}"))
                    .spawn(move || {
                        // Deterministic pseudo-random skip pattern seeded
                        // by the slot and stirred by the round number.
                        let mut rng: u32 = 0x9E37_79B9u32.wrapping_add(slot);
                        for round in 0..ROUNDS {
                            rng = rng.wrapping_mul(0x6C8E_9CF7).wrapping_add(round);
                            let idle = rng & 0xF == 0;
                            if idle {
                                phaser.skip(slot as u8);
                            } else {
                                let _ = phaser.arrive_and_wait(slot as u8);
                            }
                        }
                    })
                    .unwrap()
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        // Every party contributes exactly once per round (as an arrival
        // or a skip), so PARTIES * ROUNDS contributions yield ROUNDS
        // phase advances.
        assert_eq!(phaser.current_phase(), ROUNDS);
    }
}
|