//! Quantum-boundary phaser for the M3 per-HW-thread parallel scheduler.
//!
//! Six [`super::HW_THREAD_COUNT`] host threads run their slots' interpreters
//! in parallel, then meet at a phaser to advance to the next quantum. This
//! is **not** [`std::sync::Barrier`]: a Barrier needs a fixed party count,
//! but our slots can become idle (no runnable thread) and shouldn't block
//! the phaser arrival.
//!
//! ## Semantics
//!
//! - Each slot at the end of its quantum either calls
//!   [`Phaser::arrive_and_wait`] (it has a runnable thread to run next
//!   quantum) or [`Phaser::skip`] (it's idle this round and will wake on
//!   `slot_wake[i]`).
//! - The phase advances when **all 6 slots have either arrived or
//!   skipped**. Arrived slots block until the advance; skipped slots
//!   return immediately and re-poll their wake state.
//! - The phaser uses a generation counter so a slot that arrives "early"
//!   in the next phase doesn't see the prior phase's "all arrived"
//!   condition.
//! - Defensive timeout: [`Phaser::arrive_and_wait_timeout`] returns
//!   [`PhaserOutcome::Timeout`] if a peer crashes / hangs. Callers
//!   typically convert this into a graceful shutdown rather than
//!   panicking, so the rest of the topology can tear down cleanly.
//!
//! ## Memory ordering
//!
//! - The participant counter (`arrived_or_skipped`) is a plain integer
//!   guarded by the `inner` mutex, so the last-to-arrive thread sees a
//!   consistent "everyone is here" snapshot.
//! - The generation `phase` is read with `Acquire` in arrivers' wait
//!   loops; the advancing thread bumps it with a `Release` `fetch_add`
//!   while holding the mutex.
//! - The condvar's broadcast publishes the phase; the wait loop
//!   re-checks `phase` against its captured value to defend against
//!   spurious wakeups.

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Outcome of a phaser arrival.
/// Reports how an [`Phaser::arrive_and_wait`] /
/// [`Phaser::arrive_and_wait_timeout`] call ended, so the caller knows
/// whether to run the next quantum or to tear down.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhaserOutcome {
    /// All participants arrived/skipped — phase advanced. Caller proceeds
    /// into the next quantum.
    Advanced,
    /// Defensive timeout fired before all peers arrived. Caller should
    /// log + initiate shutdown rather than retry: the timed-out arrival
    /// stays counted, so arriving again in the same phase would count
    /// this slot twice.
    Timeout,
    /// Phaser was shut down via [`Phaser::shutdown`]; all waiters are
    /// woken and return this. Caller exits cleanly.
    Shutdown,
}

/// Custom barrier-with-skip primitive. Construct once with the number of
/// participating slots; share via `Arc` across host threads.
pub struct Phaser {
    /// Total participant count (constant after construction). For our
    /// scheduler this is `HW_THREAD_COUNT = 6`.
    party_count: u32,
    /// Monotonic (wrapping) phase counter, incremented every time the
    /// phase advances. Used as a generation marker so a slot that wakes
    /// "into" the next phase doesn't observe the old "everyone arrived"
    /// state.
    phase: AtomicU32,
    /// Inner state guarded by the condvar's mutex.
    inner: Mutex<Inner>,
    /// Notified when a phase advances or shutdown fires.
    cv: Condvar,
}

/// Mutex-protected bookkeeping for [`Phaser`].
#[derive(Debug)]
struct Inner {
    /// Contributions (arrivals + skips) recorded for the current phase.
    /// Reset to zero by whichever contribution completes the phase.
    arrived_or_skipped: u32,
    /// Set once by [`Phaser::shutdown`]; never cleared.
    shutdown: bool,
}

impl Phaser {
    /// Create a phaser with `party_count` participants.
    ///
    /// # Panics
    ///
    /// Panics if `party_count == 0`.
    pub fn new(party_count: u32) -> Self {
        assert!(party_count > 0, "phaser party_count must be > 0");
        Self {
            party_count,
            phase: AtomicU32::new(0),
            inner: Mutex::new(Inner {
                arrived_or_skipped: 0,
                shutdown: false,
            }),
            cv: Condvar::new(),
        }
    }

    /// Get the current phase number. Useful for tests and observability.
    pub fn current_phase(&self) -> u32 {
        self.phase.load(Ordering::Acquire)
    }

    /// Mark this slot as not participating in the current phase. Counts
    /// toward the advance threshold but does not block. Used when a slot
    /// has no runnable thread and is parked waiting on
    /// `slot_wake[i].unpark()`.
    ///
    /// `_slot_id` is informational (not stored); the parameter exists so
    /// call sites stay greppable. Because the slot is not recorded, the
    /// phaser cannot detect a slot contributing twice: the caller must not
    /// `skip` or arrive again until the phase it skipped has advanced.
    pub fn skip(&self, _slot_id: u8) {
        self.contribute_advance();
    }

    /// Block until the phase advances or the defensive 5-second timeout
    /// fires. Returns [`PhaserOutcome::Advanced`] on a clean phase
    /// transition; [`PhaserOutcome::Timeout`] if a peer hung;
    /// [`PhaserOutcome::Shutdown`] on tear-down.
    ///
    /// `_slot_id` is informational (see [`Self::skip`]).
    pub fn arrive_and_wait(&self, _slot_id: u8) -> PhaserOutcome {
        self.arrive_and_wait_timeout(Duration::from_secs(5))
    }

    /// Same as [`Self::arrive_and_wait`] with a caller-supplied timeout.
    ///
    /// A `timeout` too large to be added to the current [`Instant`]
    /// (e.g. [`Duration::MAX`]) is treated as "no deadline" instead of
    /// panicking on overflow.
    ///
    /// When several conditions hold at once, `Shutdown` takes precedence
    /// over `Advanced`, which takes precedence over `Timeout`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex was poisoned by a panicking thread.
    pub fn arrive_and_wait_timeout(&self, timeout: Duration) -> PhaserOutcome {
        // Capture the generation *before* contributing: our own
        // contribution may be the one that completes the phase.
        let pre_phase = self.phase.load(Ordering::Acquire);
        self.contribute_advance();

        // `None` means the deadline is not representable; wait unbounded.
        let deadline = Instant::now().checked_add(timeout);
        let mut guard = self.inner.lock().expect("phaser mutex poisoned");
        loop {
            // Re-checked after every wakeup, so spurious wakeups are harmless.
            if let Some(outcome) = self.settled_outcome(&guard, pre_phase) {
                return outcome;
            }
            guard = match deadline {
                None => self.cv.wait(guard).expect("phaser mutex poisoned"),
                Some(deadline) => {
                    let now = Instant::now();
                    if now >= deadline {
                        return PhaserOutcome::Timeout;
                    }
                    let (reacquired, wait) = self
                        .cv
                        .wait_timeout(guard, deadline - now)
                        .expect("phaser mutex poisoned");
                    if wait.timed_out() {
                        // An advance or shutdown may have landed just as
                        // the timer expired; prefer reporting that.
                        return self
                            .settled_outcome(&reacquired, pre_phase)
                            .unwrap_or(PhaserOutcome::Timeout);
                    }
                    reacquired
                }
            };
        }
    }

    /// Wake every parked arriver and signal shutdown. After this, all
    /// future and outstanding `arrive_and_wait_*` calls return
    /// [`PhaserOutcome::Shutdown`].
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex was poisoned by a panicking thread.
    pub fn shutdown(&self) {
        let mut guard = self.inner.lock().expect("phaser mutex poisoned");
        guard.shutdown = true;
        self.cv.notify_all();
    }

    /// Decide whether a waiter that captured `pre_phase` is done waiting.
    ///
    /// Returns `Some(Shutdown)` if shutdown was requested, otherwise
    /// `Some(Advanced)` if the phase moved past `pre_phase`, otherwise
    /// `None` (keep waiting). Must be called with the `inner` lock held;
    /// `inner` is the guarded state.
    fn settled_outcome(&self, inner: &Inner, pre_phase: u32) -> Option<PhaserOutcome> {
        if inner.shutdown {
            Some(PhaserOutcome::Shutdown)
        } else if self.phase.load(Ordering::Acquire) != pre_phase {
            Some(PhaserOutcome::Advanced)
        } else {
            None
        }
    }

    /// Common path for both arrive-and-wait and skip: bump the
    /// participant counter, and if we were the last one in, advance the
    /// phase + broadcast.
    fn contribute_advance(&self) {
        let mut guard = self.inner.lock().expect("phaser mutex poisoned");
        guard.arrived_or_skipped += 1;
        if guard.arrived_or_skipped >= self.party_count {
            // Last one in. Reset the counter, bump the phase, broadcast.
            guard.arrived_or_skipped = 0;
            // `Release` on the phase bump pairs with `Acquire` reads in
            // arriving slots' wait-loop predicates. The bump happens under
            // the lock, so a waiter cannot miss the notification between
            // its predicate check and parking on the condvar.
            self.phase.fetch_add(1, Ordering::Release);
            self.cv.notify_all();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU32;
    use std::sync::Arc;
    use std::thread;

    /// Spawn a named thread that performs one `arrive_and_wait` for `slot`.
    fn spawn_arriver(
        p: &Arc<Phaser>,
        name: String,
        slot: u32,
    ) -> thread::JoinHandle<PhaserOutcome> {
        let p = Arc::clone(p);
        thread::Builder::new()
            .name(name)
            .spawn(move || p.arrive_and_wait(slot as u8))
            .unwrap()
    }

    /// All N participants arrive — phase advances, every arriver returns
    /// `Advanced`.
    #[test]
    fn n_arrivers_all_advance() {
        const N: u32 = 6;
        let p = Arc::new(Phaser::new(N));
        let handles: Vec<_> = (0..N)
            .map(|i| spawn_arriver(&p, format!("phaser-test-{i}"), i))
            .collect();
        for h in handles {
            assert_eq!(h.join().unwrap(), PhaserOutcome::Advanced);
        }
        assert_eq!(p.current_phase(), 1);
    }

    /// 5 arrive + 1 skip → phase advances; arrivers see `Advanced`.
    #[test]
    fn skip_counts_toward_advance() {
        const N: u32 = 6;
        let p = Arc::new(Phaser::new(N));
        let handles: Vec<_> = (0..(N - 1))
            .map(|i| spawn_arriver(&p, format!("phaser-arrive-{i}"), i))
            .collect();
        // Brief pause to let arrivers park first (exercising the
        // skip-unblocks-arrivers path).
        thread::sleep(Duration::from_millis(20));
        p.skip((N - 1) as u8);
        for h in handles {
            assert_eq!(h.join().unwrap(), PhaserOutcome::Advanced);
        }
        assert_eq!(p.current_phase(), 1);
    }

    /// Shutdown wakes parked arrivers; they return `Shutdown`.
    #[test]
    fn shutdown_wakes_arrivers() {
        const N: u32 = 6;
        let p = Arc::new(Phaser::new(N));
        // Only N-1 arrive — phase will not advance.
        let handles: Vec<_> = (0..(N - 1))
            .map(|i| spawn_arriver(&p, format!("phaser-arrive-shutdown-{i}"), i))
            .collect();
        thread::sleep(Duration::from_millis(20));
        p.shutdown();
        for h in handles {
            assert_eq!(h.join().unwrap(), PhaserOutcome::Shutdown);
        }
    }

    /// Defensive timeout: if some peers never arrive, others surface
    /// `Timeout` rather than blocking forever.
    #[test]
    fn timeout_fires_when_peer_hangs() {
        const N: u32 = 4;
        let p = Arc::new(Phaser::new(N));
        // Only 2 of 4 arrive — others "hang".
        let p1 = Arc::clone(&p);
        let h1 = thread::spawn(move || p1.arrive_and_wait_timeout(Duration::from_millis(50)));
        let p2 = Arc::clone(&p);
        let h2 = thread::spawn(move || p2.arrive_and_wait_timeout(Duration::from_millis(50)));
        assert_eq!(h1.join().unwrap(), PhaserOutcome::Timeout);
        assert_eq!(h2.join().unwrap(), PhaserOutcome::Timeout);
    }

    /// A timeout too large to add to `Instant::now()` must not panic.
    #[test]
    fn huge_timeout_does_not_overflow() {
        let p = Phaser::new(1);
        assert_eq!(
            p.arrive_and_wait_timeout(Duration::MAX),
            PhaserOutcome::Advanced
        );
        assert_eq!(p.current_phase(), 1);
    }

    /// Multi-phase stress: all participants run a tight loop of
    /// arrive_and_wait calls; after K phases they all observe the same
    /// `current_phase()` value. Catches generation/counter resync bugs.
    #[test]
    fn multi_phase_progress() {
        const N: u32 = 6;
        const K: u32 = 1000;
        let p = Arc::new(Phaser::new(N));
        let counter = Arc::new(AtomicU32::new(0));
        let mut handles = Vec::new();
        for i in 0..N {
            let p = Arc::clone(&p);
            let c = Arc::clone(&counter);
            handles.push(
                thread::Builder::new()
                    .name(format!("phaser-multi-{i}"))
                    .spawn(move || {
                        for _ in 0..K {
                            assert_eq!(p.arrive_and_wait(i as u8), PhaserOutcome::Advanced);
                        }
                        c.fetch_add(1, Ordering::Relaxed);
                    })
                    .unwrap(),
            );
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(counter.load(Ordering::Relaxed), N);
        assert_eq!(p.current_phase(), K);
    }

    /// Mixed skip/arrive across phases — emulates the realistic scheduler
    /// pattern where slots become idle for some quanta.
    #[test]
    fn mixed_skip_and_arrive_random() {
        const N: u32 = 6;
        const K: u32 = 200;
        let p = Arc::new(Phaser::new(N));
        let mut handles = Vec::new();
        for i in 0..N {
            let p = Arc::clone(&p);
            handles.push(
                thread::Builder::new()
                    .name(format!("phaser-mixed-{i}"))
                    .spawn(move || {
                        // Pseudo-random skip pattern based on slot+phase
                        let mut state: u32 = 0x9E37_79B9u32.wrapping_add(i);
                        for phase in 0..K {
                            state = state.wrapping_mul(0x6C8E_9CF7).wrapping_add(phase);
                            if state & 0xF == 0 {
                                // A skipped slot stays idle until the next
                                // quantum. Without this wait the thread would
                                // contribute a second time to the same phase
                                // and the slots would drift out of lockstep.
                                let skipped_phase = p.current_phase();
                                p.skip(i as u8);
                                let give_up = Instant::now() + Duration::from_secs(10);
                                while p.current_phase() == skipped_phase {
                                    assert!(
                                        Instant::now() < give_up,
                                        "phase never advanced after skip"
                                    );
                                    thread::yield_now();
                                }
                            } else {
                                assert_eq!(p.arrive_and_wait(i as u8), PhaserOutcome::Advanced);
                            }
                        }
                    })
                    .unwrap(),
            );
        }
        for h in handles {
            h.join().unwrap();
        }
        // Every slot contributes exactly once per phase (arrive or skip),
        // so after K rounds the phase counter equals K.
        assert_eq!(p.current_phase(), K);
    }
}