//! Round-robin scheduler over 6 HW threads with per-slot runqueues. //! //! Execution is serialized on a single host thread (the interpreter thread; //! `GuestMemory` is pinned and deliberately not thread-safe). The scheduler //! is a pure data container — kernel code parks, wakes, and mutates state //! through its public methods; it knows nothing about kernel objects. //! //! ## Model (post-Axis-1) //! //! - `HW_THREAD_COUNT = 6`, matching real Xenon hardware (3 cores × 2 SMT). //! - Each `HwSlot` carries a runqueue `Vec` — any state, //! `pick_runnable` filters Ready/ServicingIrq when choosing the live thread. //! - A `GuestThread` owns its own `PpcContext` inline. The live register //! file is always whichever thread the slot has pinned as running — no //! memcpy on context switch. //! - `ThreadRef { hw_id, idx }` is the stable identity used in waiter lists //! and anywhere a specific thread needs to be addressed across slot //! boundaries. Positional refs are cheap but **must** be fixed up after //! `swap_remove` (Axis 4 affinity migration does this explicitly). //! //! Every scheduler round: for each slot with a runnable thread, pick the //! highest-priority Ready thread and advance it one guest instruction (or //! one import-thunk dispatch). Blocked/Exited threads stay resident in the //! runqueue so their `ThreadRef` doesn't shift under kernel waiter lists. use crate::context::PpcContext; /// Number of emulated HW threads. Real Xbox 360 Xenon = 3 cores × 2 SMT = 6. pub const HW_THREAD_COUNT: usize = 6; /// Guest thread id assigned to the initial (module-entry) guest thread. pub const INITIAL_GUEST_TID: u32 = 1; /// Default per-thread instruction quantum. Consumed by Axis 3 (`decrement_quantum`); /// Axis 1 carries the field on every thread but doesn't decrement yet. pub const QUANTUM_DEFAULT: u32 = 50_000; /// Above this depth, `spawn` prunes `Exited` entries from a slot's runqueue /// before pushing the new thread. 
Keeps peer `ThreadRef`s stable on the /// common (low-depth) path — a game that spawns a handful of long-lived /// workers never triggers a compaction; a game that rapidly churns threads /// gets one when the slot fills up. const PRUNE_DEPTH_THRESHOLD: usize = 4; /// Stable identity for a guest thread across all scheduler tables. /// /// The positional `idx` is only valid while the source slot's runqueue /// has not been mutated by a `swap_remove`. All sites that do so (Axis 4 /// affinity migration, `prune_exited`) must fix up every `ThreadRef` they /// invalidate. /// /// **M2.3 generation packing.** Under M3's per-HW-thread parallelism, an /// `idx` reused after `swap_remove` could match a stale `ThreadRef` held /// in another thread's waiter list (the classic ABA hazard). The /// `generation` byte distinguishes such reuses. M2 introduces the field /// (set to `0` on fresh spawns) without yet bumping it — no concurrent /// remove paths exist before M3. The migration-fixup site at /// [`MigrationFixup::apply`] will bump generations once M3 lands. /// /// Layout: 1 + 1 + 2 = 4 bytes (no padding). 256 reuses per slot before /// wraparound; with `PRUNE_DEPTH_THRESHOLD = 4` keeping slots shallow, /// that is well above any realistic churn rate. #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Default)] pub struct ThreadRef { pub hw_id: u8, pub generation: u8, pub idx: u16, } impl ThreadRef { /// Construct a `ThreadRef` with `generation = 0`. Used by every /// fresh-spawn / re-bind site that doesn't track generations /// directly. Sites that DO track generations (the migration-fixup /// path under M3) construct via struct literals so they're greppable. pub const fn new(hw_id: u8, idx: u16) -> Self { Self { hw_id, idx, generation: 0, } } /// Construct a `ThreadRef` with an explicit generation. Used by the /// migration-fixup path at M3. 
pub const fn with_generation(hw_id: u8, idx: u16, generation: u8) -> Self { Self { hw_id, idx, generation, } } } /// A guest thread and everything needed to schedule, park, and wake it. pub struct GuestThread { pub ctx: PpcContext, pub state: HwState, pub tid: u32, pub thread_handle: Option, pub stack_base: u32, pub stack_size: u32, pub pcr_base: u32, pub tls_base: u32, /// Per-thread TLS slot values; `KeTlsGetValue/SetValue` route through /// `Scheduler::tls_{get,set}` which index this on the currently-running thread. pub tls_values: Vec, /// Suspend counter — `NtSuspendThread` increments, `NtResumeThread` /// decrements, only unblocks at zero. pub suspend_count: u32, /// NT-style priority, signed. Higher wins within a slot. Default 0. pub priority: i32, /// Set bit i = thread may run on slot i. 0 normalizes to 0xFF (any). pub affinity_mask: u8, /// Hint from `KeSetIdealProcessor`. Axis 5 honors on spawn; Axis 1 /// carries the field. pub ideal_processor: Option, /// Axis 3 instruction budget. Decremented per retired step on this /// thread; on zero, slot rotates within same-priority tier. pub quantum_remaining: u32, } impl GuestThread { fn default_fields() -> Self { Self { ctx: PpcContext::new(), state: HwState::Idle, tid: 0, thread_handle: None, stack_base: 0, stack_size: 0, pcr_base: 0, tls_base: 0, tls_values: Vec::new(), suspend_count: 0, priority: 0, affinity_mask: 0xFF, ideal_processor: None, quantum_remaining: QUANTUM_DEFAULT, } } } #[derive(Debug, Clone, PartialEq, Eq)] pub enum HwState { /// Slot slot has no running thread (used only for `HwSlot::idle_ctx`'s /// conceptual state — live threads never sit in `Idle`). Idle, Ready, Blocked(BlockReason), Exited(u32), /// Graphics-interrupt servicing state. The thread was /// `Blocked(reason)` when an IRQ was injected; we flipped it to /// `ServicingIrq(reason)` so the scheduler will run the callback, /// carrying the prior block reason for the IRQ-return path to consult. 
/// On return to `LR_HALT_SENTINEL` the main loop restores to /// `Blocked(reason)` — **unless** something during the callback /// (e.g. `KeSetEvent → wake`) flipped this to `Ready`, in which case /// the wait was resolved and we leave it runnable. ServicingIrq(BlockReason), } #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockReason { Suspended, WaitAny { handles: Vec, deadline: Option, }, WaitAll { handles: Vec, deadline: Option, }, DelayUntil(u64), CriticalSection(u32), } /// Sink for PCR+0x2C writes — the scheduler writes the guest-visible /// current-processor-id here at spawn and Axis 4 rewrites on affinity /// migration. Implemented by `xenia-kernel` for `GuestMemory`; keeping it /// an abstract trait avoids pulling `xenia_memory` into `xenia_cpu`. pub trait PcrWriter { fn write_pcr_id(&mut self, pcr_base: u32, hw_id: u8); } /// Per-slot runqueue + the index of the thread currently pinned-running. pub struct HwSlot { pub runqueue: Vec, pub running_idx: Option, /// Sentinel context returned by compat accessors when the slot has no /// running thread. Keeps the `ctx(hw_id)` API safe from diagnostic /// paths that run between scheduling passes. idle_ctx: PpcContext, /// Same-shape sentinel state. idle_state: HwState, } impl Default for HwSlot { fn default() -> Self { Self { runqueue: Vec::new(), running_idx: None, idle_ctx: PpcContext::new(), idle_state: HwState::Idle, } } } impl HwSlot { /// Index of the highest-priority Ready/ServicingIrq thread in this /// slot's runqueue. Tiebreak: prefer lower index (deterministic). pub fn pick_runnable(&self) -> Option { self.runqueue .iter() .enumerate() .filter(|(_, t)| matches!(t.state, HwState::Ready | HwState::ServicingIrq(_))) .max_by_key(|(i, t)| (t.priority, -(*i as i64))) .map(|(i, _)| i) } /// How many non-Exited threads currently live on this slot (used by /// placement policies). 
pub fn live_depth(&self) -> usize { self.runqueue .iter() .filter(|t| !matches!(t.state, HwState::Exited(_))) .count() } } #[derive(Debug, Clone, Copy)] pub enum OrderMode { Fixed, Seeded { seed: u64 }, } impl OrderMode { pub fn from_env() -> Self { match std::env::var("XENIA_SCHED_ORDER").ok().as_deref() { Some("random") | Some("Random") | Some("RANDOM") => { let seed = std::env::var("XENIA_SCHED_SEED") .ok() .and_then(|s| s.parse::().ok()) .unwrap_or(0xC0FFEE_C0FFEE); OrderMode::Seeded { seed } } _ => OrderMode::Fixed, } } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RoundOutcome { Progressed, Slept, Deadlock, MainExited, } /// Parameters for `Scheduler::spawn`. The caller allocates the stack/PCR/ /// TLS blocks in guest memory first, then hands these addresses here. #[derive(Debug)] pub struct SpawnParams { pub entry: u32, pub start_context: u32, pub stack_base: u32, pub stack_size: u32, pub pcr_base: u32, pub tls_base: u32, pub thread_handle: u32, pub guest_tid: u32, pub create_suspended: bool, pub is_initial: bool, pub tls_slot_count: u32, /// Set bit i = thread may land on slot i. 0 normalizes to 0xFF. pub affinity_mask: u8, /// NT-style signed priority; default 0. pub priority: i32, /// Preferred slot; Axis 5 spawn honors if allowed by affinity mask. pub ideal_processor: Option, } impl Default for SpawnParams { fn default() -> Self { Self { entry: 0, start_context: 0, stack_base: 0, stack_size: 0, pcr_base: 0, tls_base: 0, thread_handle: 0, guest_tid: 0, create_suspended: false, is_initial: false, tls_slot_count: 0, affinity_mask: 0xFF, priority: 0, ideal_processor: None, } } } #[derive(Debug)] pub enum SpawnError { NoFreeHwThread, } /// Side information returned by `set_affinity_ref` so the kernel layer /// can walk its waiter lists and retarget any `ThreadRef`s invalidated /// by the `swap_remove` on the source slot. 
#[derive(Debug, Copy, Clone)]
pub struct MigrationFixup {
    /// Slot the thread migrated away from.
    pub source_hw: u8,
    /// Old index (on `source_hw`) of the peer that `swap_remove` moved into
    /// the vacated position — the source runqueue's former last index.
    /// Equals the migrated thread's old index when the removal was a plain
    /// pop and no peer moved.
    pub promoted_old_idx: u16,
    /// Index the promoted peer occupies now (the migrated thread's old index).
    pub promoted_new_idx: u16,
    /// The migrated thread's ref before the move.
    pub migrated_old_ref: ThreadRef,
    /// The migrated thread's ref after the move.
    pub migrated_new_ref: ThreadRef,
}

impl MigrationFixup {
    /// Retarget one externally stored `ThreadRef` after a migration.
    ///
    /// - A ref equal to `migrated_old_ref` becomes `migrated_new_ref`.
    /// - A ref to the promoted peer (`source_hw`, `promoted_old_idx`) gets
    ///   its `idx` rewritten to `promoted_new_idx`; its generation is kept.
    /// - Any other ref is left untouched.
    ///
    /// **Not idempotent — apply exactly once per stored ref.** After one
    /// application a promoted peer's ref sits at the migrated thread's old
    /// position, so (with today's always-zero generations) a second
    /// application would mistake it for the migrated thread and retarget it
    /// to `migrated_new_ref`.
    pub fn apply(&self, r: &mut ThreadRef) {
        if *r == self.migrated_old_ref {
            *r = self.migrated_new_ref;
        } else if r.hw_id == self.source_hw && r.idx == self.promoted_old_idx {
            r.idx = self.promoted_new_idx;
        }
    }
}

pub struct Scheduler {
    pub slots: [HwSlot; HW_THREAD_COUNT],
    pub round_count: u64,
    /// Currently-stepping thread. Set by `begin_slot_visit`, cleared by
    /// `end_slot_visit`. Kernel exports reach through this to learn which
    /// thread they're running on.
    pub current: Option<ThreadRef>,
    order: OrderMode,
    rng_state: u64,
    /// Sorted by deadline ascending. The scheduler wakes the first entry
    /// via `advance_to_next_wake` when a round finds nothing runnable.
    timed_waits: Vec<(u64, ThreadRef)>,
    /// Global count of TLS slots allocated — `spawn` pre-sizes new threads'
    /// `tls_values` to this.
    tls_slot_count: usize,
    /// Axis 2: bit i set ⇒ slot i has at least one Ready/ServicingIrq
    /// thread. `round_schedule` uses this to skip empty slots cheaply;
    /// state-mutating methods keep it current via `recompute_slot_runnable`.
    non_empty_runnable: u8,
    /// Axis 2: rolling round-robin cursor. Each `round_schedule` call
    /// emits slot ids starting at `(rotation_cursor + i) % 6`, then
    /// advances the cursor by one so the next round begins from the
    /// following slot. Every non-empty slot gets an equal share of round
    /// leads over time.
    rotation_cursor: u8,
    /// M3.7 — optional reservation table installed by the kernel after
    /// scheduler construction. When present, `spawn` and
    /// `install_initial_thread` copy it into each `PpcContext`'s
    /// `reservation_table` field so the interpreter's `lwarx`/`stwcx.`
    /// arms can route through the table.
    // NOTE(review): the generic arguments of this type were lost in the
    // reviewed copy; the `ReservationTable` path below is a reconstruction.
    // Confirm it matches the type of `PpcContext::reservation_table`.
    reservation_table: Option<std::sync::Arc<crate::reservation::ReservationTable>>,
}

impl Scheduler {
    /// Build a scheduler with all slots empty. Callers (usually
    /// `KernelState::install_initial_thread`) push the initial guest
    /// thread onto slot 0 before stepping.
    pub fn new() -> Self {
        let order = OrderMode::from_env();
        let rng_state = match order {
            OrderMode::Fixed => 0,
            // xorshift needs a nonzero state.
            OrderMode::Seeded { seed } => seed.max(1),
        };
        Scheduler {
            slots: std::array::from_fn(|_| HwSlot::default()),
            round_count: 0,
            current: None,
            order,
            rng_state,
            timed_waits: Vec::new(),
            tls_slot_count: 0,
            non_empty_runnable: 0,
            rotation_cursor: 0,
            reservation_table: None,
        }
    }

    /// M3.7 — install a shared reservation table. Later
    /// `spawn`/`install_initial_thread` calls copy a clone into each new
    /// `PpcContext::reservation_table`. Passing `None` clears the binding;
    /// threads created earlier keep the clone they already received.
    pub fn set_reservation_table(
        &mut self,
        table: Option<std::sync::Arc<crate::reservation::ReservationTable>>,
    ) {
        self.reservation_table = table;
    }

    /// Recompute the runnable bit for one slot. Cheap — scans the slot's
    /// runqueue once. Call at the tail of any method that may change
    /// whether the slot has a Ready/ServicingIrq member.
    fn recompute_slot_runnable(&mut self, hw_id: u8) {
        let any = self.slots[hw_id as usize]
            .runqueue
            .iter()
            .any(|t| matches!(t.state, HwState::Ready | HwState::ServicingIrq(_)));
        if any {
            self.non_empty_runnable |= 1 << hw_id;
        } else {
            self.non_empty_runnable &= !(1 << hw_id);
        }
    }

    // ----- Compat accessors (preserve the pre-Axis-1 hw_threads[i].ctx pattern) -----

    /// Read-only context of the currently-running thread on `hw_id`, or
    /// the slot's idle placeholder when nothing (valid) is pinned.
    pub fn ctx(&self, hw_id: u8) -> &PpcContext {
        let slot = &self.slots[hw_id as usize];
        match slot.running_idx {
            Some(i) if i < slot.runqueue.len() => &slot.runqueue[i].ctx,
            _ => &slot.idle_ctx,
        }
    }

    /// Mutable context of the currently-running thread on `hw_id`.
pub fn ctx_mut(&mut self, hw_id: u8) -> &mut PpcContext {
    let slot = &mut self.slots[usize::from(hw_id)];
    // A pinned index that no longer fits the runqueue falls back to the
    // idle placeholder rather than panicking.
    match slot.running_idx.filter(|&i| i < slot.runqueue.len()) {
        Some(i) => &mut slot.runqueue[i].ctx,
        None => &mut slot.idle_ctx,
    }
}

/// Mutable context addressed by `ThreadRef`, ignoring `running_idx`, so
/// callers (deadlock recovery, `call_export` return, Axis 4 migration) can
/// reach a specific thread even when the slot has pinned a different one.
///
/// Panics if `r` does not address an existing runqueue entry.
pub fn ctx_mut_ref(&mut self, r: ThreadRef) -> &mut PpcContext {
    let slot = &mut self.slots[usize::from(r.hw_id)];
    &mut slot.runqueue[usize::from(r.idx)].ctx
}

/// State of the thread pinned on `hw_id`, or the idle placeholder.
pub fn state(&self, hw_id: u8) -> &HwState {
    let slot = &self.slots[usize::from(hw_id)];
    slot.running_idx
        .and_then(|i| slot.runqueue.get(i))
        .map_or(&slot.idle_state, |t| &t.state)
}

/// Mutable state of the thread pinned on `hw_id`, or the idle placeholder.
pub fn state_mut(&mut self, hw_id: u8) -> &mut HwState {
    let slot = &mut self.slots[usize::from(hw_id)];
    match slot.running_idx.filter(|&i| i < slot.runqueue.len()) {
        Some(i) => &mut slot.runqueue[i].state,
        None => &mut slot.idle_state,
    }
}

/// Guest tid of the thread pinned on `hw_id`, if any.
pub fn tid(&self, hw_id: u8) -> Option<u32> {
    let slot = &self.slots[usize::from(hw_id)];
    let idx = slot.running_idx?;
    slot.runqueue.get(idx).map(|t| t.tid)
}

/// Kernel handle of the thread pinned on `hw_id`, if any.
pub fn thread_handle(&self, hw_id: u8) -> Option<u32> {
    let slot = &self.slots[usize::from(hw_id)];
    let idx = slot.running_idx?;
    slot.runqueue.get(idx)?.thread_handle
}

/// TLS values of the thread pinned on `hw_id`, if any.
pub fn tls_values(&self, hw_id: u8) -> Option<&Vec<u64>> {
    let slot = &self.slots[usize::from(hw_id)];
    let idx = slot.running_idx?;
    slot.runqueue.get(idx).map(|t| &t.tls_values)
}

/// Mutable suspend counter of the thread pinned on `hw_id`, if any.
pub fn suspend_count_mut(&mut self, hw_id: u8) -> Option<&mut u32> {
    let slot = &mut self.slots[usize::from(hw_id)];
    let idx = slot.running_idx?;
    slot.runqueue.get_mut(idx).map(|t| &mut t.suspend_count)
}

/// Compat: pre-Axis-1 code expects `current_hw_id` as an `Option<u8>`, so
/// it is derived from `current` here.
#[inline]
pub fn current_hw_id(&self) -> Option<u8> {
    let r = self.current?;
    Some(r.hw_id)
}

/// Panics if called outside a step.
#[inline]
pub fn current(&self) -> u8 {
    self.current_ref().hw_id
}

/// Panics if called outside a step.
#[inline]
pub fn current_ref(&self) -> ThreadRef {
    self.current.expect("no current thread")
}

// ----- Guest-thread lookup -----

/// `ThreadRef` of the first non-Exited thread whose guest tid is `tid`,
/// scanning slots in id order and each runqueue front to back.
pub fn find_by_tid(&self, tid: u32) -> Option<ThreadRef> {
    self.slots.iter().enumerate().find_map(|(hw_id, slot)| {
        slot.runqueue
            .iter()
            .position(|t| t.tid == tid && !matches!(t.state, HwState::Exited(_)))
            .map(|idx| ThreadRef::new(hw_id as u8, idx as u16))
    })
}

/// `ThreadRef` of the first non-Exited thread whose kernel handle is
/// `handle`, scanning in the same order as `find_by_tid`.
pub fn find_by_handle(&self, handle: u32) -> Option<ThreadRef> {
    self.slots.iter().enumerate().find_map(|(hw_id, slot)| {
        slot.runqueue
            .iter()
            .position(|t| {
                t.thread_handle == Some(handle) && !matches!(t.state, HwState::Exited(_))
            })
            .map(|idx| ThreadRef::new(hw_id as u8, idx as u16))
    })
}

/// Thread addressed by `r`. Panics when `r` is out of bounds, so only use
/// refs freshly obtained from the scheduler (`find_by_*`, `current`).
pub fn thread(&self, r: ThreadRef) -> &GuestThread {
    let slot = &self.slots[usize::from(r.hw_id)];
    &slot.runqueue[usize::from(r.idx)]
}

/// Mutable counterpart of [`Self::thread`]; same panic contract.
pub fn thread_mut(&mut self, r: ThreadRef) -> &mut GuestThread {
    let slot = &mut self.slots[usize::from(r.hw_id)];
    &mut slot.runqueue[usize::from(r.idx)]
}

/// Bounds-checked lookup for refs that may be stale (waiter lists that
/// survived a slot compaction, test fixtures). `None` when either the
/// slot or the index is out of range.
pub fn try_thread_mut(&mut self, r: ThreadRef) -> Option<&mut GuestThread> {
    let slot = self.slots.get_mut(usize::from(r.hw_id))?;
    slot.runqueue.get_mut(usize::from(r.idx))
}

// ----- Spawn -----

/// Install a new guest thread on an affinity-permitted slot with the
/// lowest live depth. Writes `PCR+0x2C = hw_id` via `mem`. Returns
/// the assigned `hw_id`.
/// /// Initial threads land on slot 0 (hardware convention). pub fn spawn( &mut self, params: SpawnParams, mem: &mut W, ) -> Result { let mask = if params.affinity_mask == 0 { 0xFF } else { params.affinity_mask }; // Axis 5: placement order — initial always slot 0; explicit // ideal (if the mask allows it) wins; otherwise least-depth // among mask-allowed slots. let slot_id: u8 = if params.is_initial { 0 } else if let Some(ideal) = params.ideal_processor && (mask & (1u8 << ideal)) != 0 { ideal } else { self.pick_least_depth_slot(mask) .ok_or(SpawnError::NoFreeHwThread)? }; // Compact Exited entries if this slot is approaching saturation. // Only safe to do when no ThreadRef outside of the scheduler is // currently held to exited entries on this slot — kernel waiter // lists drop refs when wake fires, and Exited threads are never // picked for stepping, so compaction is a no-op for live peers. self.prune_exited_if_needed(slot_id); let mut t = GuestThread::default_fields(); t.ctx.pc = params.entry; let sp_top = (params.stack_base as u64).saturating_add(params.stack_size as u64); t.ctx.gpr[1] = sp_top.saturating_sub(0x100) & !0xFu64; t.ctx.gpr[2] = 0x2000_0000; t.ctx.gpr[3] = params.start_context as u64; t.ctx.gpr[13] = params.pcr_base as u64; t.ctx.msr = 0x9030; t.ctx.thread_id = params.guest_tid; t.tid = params.guest_tid; t.thread_handle = Some(params.thread_handle); t.state = if params.create_suspended { HwState::Blocked(BlockReason::Suspended) } else { HwState::Ready }; t.stack_base = params.stack_base; t.stack_size = params.stack_size; t.pcr_base = params.pcr_base; t.tls_base = params.tls_base; let tls_count = params.tls_slot_count as usize; let tls_count = tls_count.max(self.tls_slot_count); t.tls_values = vec![0; tls_count]; t.suspend_count = if params.create_suspended { 1 } else { 0 }; t.priority = params.priority; t.affinity_mask = mask; t.ideal_processor = params.ideal_processor; // M3.7 — populate the inter-thread reservation handle + slot id // so the 
interpreter can route lwarx/stwcx through the table. t.ctx.hw_id = slot_id; t.ctx.reservation_table = self.reservation_table.clone(); self.slots[slot_id as usize].runqueue.push(t); mem.write_pcr_id(params.pcr_base, slot_id); self.recompute_slot_runnable(slot_id); tracing::info!( "spawn: tid={} on hw={} entry={:#010x} start_ctx={:#010x} suspended={} pri={} mask={:#04x}", params.guest_tid, slot_id, params.entry, params.start_context, params.create_suspended, params.priority, mask, ); Ok(slot_id) } /// Install the initial (module-entry) guest thread on slot 0 with an /// externally-prepared register file. Unlike `spawn`, this does not /// reset ctx — the app has already set up MSR, r1/r13/etc. for the /// XEX bootstrap. pub fn install_initial_thread( &mut self, ctx: PpcContext, stack_base: u32, stack_size: u32, pcr_base: u32, tls_base: u32, thread_handle: u32, mem: &mut W, ) { let mut t = GuestThread::default_fields(); t.ctx = ctx; // M3.7 — initial thread on slot 0; same wiring as `spawn`. t.ctx.hw_id = 0; t.ctx.reservation_table = self.reservation_table.clone(); t.state = HwState::Ready; t.tid = INITIAL_GUEST_TID; t.thread_handle = Some(thread_handle); t.stack_base = stack_base; t.stack_size = stack_size; t.pcr_base = pcr_base; t.tls_base = tls_base; t.tls_values = vec![0; self.tls_slot_count]; self.slots[0].runqueue.push(t); mem.write_pcr_id(pcr_base, 0); self.recompute_slot_runnable(0); } /// Pick the slot with the smallest `live_depth` whose bit is set in /// `mask`. Returns `None` only when `mask == 0` (malformed). pub fn pick_least_depth_slot(&self, mask: u8) -> Option { if mask == 0 { return None; } (0..HW_THREAD_COUNT as u8) .filter(|i| mask & (1 << i) != 0) .min_by_key(|i| self.slots[*i as usize].live_depth()) } /// Remove `Exited` entries from `slot_id`'s runqueue, but only when the /// runqueue is deep enough that compaction is worthwhile. 
Because /// `swap_remove` shifts indices, this is the only legal way to drop /// entries — and it can invalidate outstanding `ThreadRef`s to the /// affected slot. Callers are responsible for ensuring no live waiter /// lists hold refs into exited entries (they don't, because waiter /// wakeup always removes the ref and sets state to Ready before the /// thread can exit again). fn prune_exited_if_needed(&mut self, slot_id: u8) { let slot = &mut self.slots[slot_id as usize]; if slot.runqueue.len() < PRUNE_DEPTH_THRESHOLD { return; } slot.runqueue .retain(|t| !matches!(t.state, HwState::Exited(_))); // running_idx may now be stale. Since we only prune at spawn time // (not mid-round), and round boundaries re-pick running_idx via // begin_slot_visit, clearing is safe. slot.running_idx = None; self.recompute_slot_runnable(slot_id); } // ----- Round scheduling ----- /// Axis 2: emit slot ids with at least one runnable thread, starting /// from `rotation_cursor` and cycling forward. `non_empty_runnable` is /// the fast path — zero bits mean no slot has work and the caller /// falls through to `advance_to_next_wake`. pub fn round_schedule(&mut self) -> Vec { if self.non_empty_runnable == 0 { return Vec::new(); } let start = self.rotation_cursor as usize; let mut out: Vec = Vec::with_capacity(HW_THREAD_COUNT); for off in 0..HW_THREAD_COUNT { let i = (start + off) % HW_THREAD_COUNT; if self.non_empty_runnable & (1 << i) != 0 { out.push(i as u8); } } // Seeded mode layers a deterministic shuffle on top of the // already-filtered list. Same spawn/wake sequence + same seed ⇒ // same schedule (invariant preserved from pre-Axis-1). if let OrderMode::Seeded { .. 
} = self.order { for i in (1..out.len()).rev() { self.rng_state ^= self.rng_state << 13; self.rng_state ^= self.rng_state >> 7; self.rng_state ^= self.rng_state << 17; let j = (self.rng_state as usize) % (i + 1); out.swap(i, j); } } self.rotation_cursor = ((start + 1) % HW_THREAD_COUNT) as u8; out } pub fn begin_round(&mut self) { self.round_count += 1; } /// Called by the step loop at the top of each per-slot visit. Picks the /// highest-priority Ready thread on the slot, sets `running_idx`, and /// stashes `self.current` so exports can reach it. pub fn begin_slot_visit(&mut self, hw_id: u8) { let slot = &mut self.slots[hw_id as usize]; slot.running_idx = slot.pick_runnable(); self.current = slot .running_idx .map(|idx| ThreadRef::new(hw_id, idx as u16)); } /// Clear `current` at the end of each per-slot visit. pub fn end_slot_visit(&mut self) { self.current = None; } /// Axis 3: decrement the currently-running thread's instruction /// quantum. On reach-zero, reload to `QUANTUM_DEFAULT` and rotate /// `running_idx` to the next Ready thread on this slot that sits in /// the same priority tier (hand-off preserves priority ordering). /// The flip is observed by the *next* round's `begin_slot_visit` — /// the step that just completed has already returned, so there's no /// mid-instruction preemption hazard. /// /// Returns `true` if a rotation occurred (purely informational; /// callers don't need to act on it). pub fn decrement_quantum(&mut self) -> bool { let Some(r) = self.current else { return false; }; let slot = &mut self.slots[r.hw_id as usize]; let Some(t) = slot.runqueue.get_mut(r.idx as usize) else { return false; }; if t.quantum_remaining > 0 { t.quantum_remaining -= 1; } if t.quantum_remaining != 0 { return false; } let my_pri = t.priority; t.quantum_remaining = QUANTUM_DEFAULT; // Scan the rest of the runqueue for a same-priority Ready peer. 
// Priority-higher peers are already going to win the next // `pick_runnable` on this slot, so we only need to find an *equal* // priority peer to enforce fair rotation within the tier. let len = slot.runqueue.len(); if len < 2 { return false; } let start = (r.idx as usize + 1) % len; for off in 0..len { let i = (start + off) % len; if i == r.idx as usize { continue; } let cand = &slot.runqueue[i]; if cand.priority == my_pri && matches!(cand.state, HwState::Ready) { slot.running_idx = Some(i); self.current = Some(ThreadRef::new(r.hw_id, i as u16)); return true; } } false } // ----- Park / wake / exit ----- pub fn park_current(&mut self, reason: BlockReason) { let r = self .current .expect("park_current called outside a step"); let deadline = match &reason { BlockReason::WaitAny { deadline, .. } | BlockReason::WaitAll { deadline, .. } => { *deadline } BlockReason::DelayUntil(d) => Some(*d), _ => None, }; if let Some(d) = deadline { self.timed_waits.push((d, r)); self.timed_waits.sort_by_key(|&(d, _)| d); } self.thread_mut(r).state = HwState::Blocked(reason); self.recompute_slot_runnable(r.hw_id); } /// Wake a specific thread (must be Blocked or ServicingIrq). Silently /// no-ops on out-of-bounds refs — waiter lists are positional and may /// outlive their target after a slot compaction; in debug builds we /// warn so regressions of this class surface during development. pub fn wake_ref(&mut self, r: ThreadRef) { let Some(slot) = self.slots.get_mut(r.hw_id as usize) else { debug_assert!(false, "wake_ref: hw_id out of bounds: {:?}", r); return; }; let Some(t) = slot.runqueue.get_mut(r.idx as usize) else { // Stale waiter ref — expected under normal operation when a // waiter was enqueued from a test fixture or survived a slot // compaction. Warn in debug builds. 
#[cfg(debug_assertions)] tracing::debug!("wake_ref: idx out of bounds: {:?}", r); return; }; match &t.state { HwState::Blocked(_) | HwState::ServicingIrq(_) => {} _ => return, } t.state = HwState::Ready; t.quantum_remaining = QUANTUM_DEFAULT; self.timed_waits.retain(|&(_, tr)| tr != r); self.recompute_slot_runnable(r.hw_id); } /// Axis-4-friendly variant: look up the thread holding `handle` and wake it. pub fn wake_by_handle(&mut self, handle: u32) -> Option { let r = self.find_by_handle(handle)?; self.wake_ref(r); Some(r) } /// Decrement suspend count on target; if it reaches 0, unblock. /// Returns previous count. pub fn resume_ref(&mut self, r: ThreadRef) -> u32 { let t = &mut self.slots[r.hw_id as usize].runqueue[r.idx as usize]; let prev = t.suspend_count; if t.suspend_count > 0 { t.suspend_count -= 1; } if t.suspend_count == 0 && matches!(t.state, HwState::Blocked(BlockReason::Suspended)) { t.state = HwState::Ready; t.quantum_remaining = QUANTUM_DEFAULT; } self.recompute_slot_runnable(r.hw_id); prev } pub fn suspend_ref(&mut self, r: ThreadRef) -> u32 { let t = &mut self.slots[r.hw_id as usize].runqueue[r.idx as usize]; let prev = t.suspend_count; t.suspend_count += 1; if matches!(t.state, HwState::Ready) { t.state = HwState::Blocked(BlockReason::Suspended); } self.recompute_slot_runnable(r.hw_id); prev } /// Set base priority; returns prior value. pub fn set_priority_ref(&mut self, r: ThreadRef, priority: i32) -> i32 { let t = self.thread_mut(r); let prev = t.priority; t.priority = priority; prev } pub fn priority_ref(&self, r: ThreadRef) -> i32 { self.thread(r).priority } /// Axis 5: `KeSetIdealProcessor` — store the hint (does NOT migrate /// a live thread; purely advisory for subsequent wake decisions, /// which our cooperative scheduler doesn't currently consult — and /// as spawn placement for any newly-created sibling threads). /// Returns previous ideal (or 0xFF if unset). 
pub fn set_ideal_ref(&mut self, r: ThreadRef, ideal: u8) -> u8 { let t = self.thread_mut(r); let prev = t.ideal_processor.unwrap_or(0xFF); t.ideal_processor = Some(ideal); prev } pub fn ideal_ref(&self, r: ThreadRef) -> Option { self.thread(r).ideal_processor } /// Axis 4: Set the affinity mask on `r` and migrate between slot /// runqueues if the current slot is no longer allowed by the mask. /// Returns `(old_mask, new_ref, migration_info)`. `migration_info` is /// `None` when no migration happened, `Some((src_promoted_old_idx, /// src_promoted_new_idx))` when `swap_remove` moved a peer into the /// migrated thread's slot — the caller must walk external waiter /// containers and retarget any ref matching the promoted-old slot. /// /// `mask == 0` normalizes to `0xFF` (Canary parity: early Xbox code /// sometimes passes 0 meaning "any"). pub fn set_affinity_ref( &mut self, r: ThreadRef, new_mask: u8, mem: &mut W, ) -> (u8, ThreadRef, Option) { let old_mask = self.thread(r).affinity_mask; let effective = if new_mask == 0 { 0xFF } else { new_mask }; self.thread_mut(r).affinity_mask = new_mask; // Current slot still allowed → no migration. if effective & (1 << r.hw_id) != 0 { return (old_mask, r, None); } // Pick target = least-depth allowed slot. let target = self .pick_least_depth_slot(effective) .expect("set_affinity_ref: effective mask must allow some slot"); // Physically move the GuestThread struct. let src_len_before = self.slots[r.hw_id as usize].runqueue.len(); let promoted_old_idx = (src_len_before - 1) as u16; let mut thread = self.slots[r.hw_id as usize] .runqueue .swap_remove(r.idx as usize); mem.write_pcr_id(thread.pcr_base, target); // M3.7 — keep ctx.hw_id in sync with the thread's new slot so // table-routed lwarx/stwcx use the correct discriminator. 
thread.ctx.hw_id = target; self.slots[target as usize].runqueue.push(thread); let new_idx = (self.slots[target as usize].runqueue.len() - 1) as u16; let new_ref = ThreadRef::new(target, new_idx); // Timed waits: rewrite r → new_ref if present. for entry in self.timed_waits.iter_mut() { if entry.1 == r { entry.1 = new_ref; } else if entry.1 == ThreadRef::new(r.hw_id, promoted_old_idx) { entry.1 = ThreadRef::new(r.hw_id, r.idx); } } // Running index defense: if src slot's running_idx pointed at the // migrated entry or the promoted peer, clear / retarget. let src_slot = &mut self.slots[r.hw_id as usize]; if src_slot.running_idx == Some(r.idx as usize) { src_slot.running_idx = None; } else if src_slot.running_idx == Some(promoted_old_idx as usize) { src_slot.running_idx = Some(r.idx as usize); } self.recompute_slot_runnable(r.hw_id); self.recompute_slot_runnable(target); // If the migrating thread was the currently-running one (self- // migrating export call), update `self.current` so `call_export`'s // stashed ThreadRef still resolves on its swap-back path. if self.current == Some(r) { self.current = Some(new_ref); } else if self.current == Some(ThreadRef::new(r.hw_id, promoted_old_idx)) { self.current = Some(ThreadRef::new(r.hw_id, r.idx)); } // Emit promotion info only if the last-index of the source // wasn't the migrating thread itself (otherwise swap_remove was // a plain pop and no peer got promoted). let fixup = if promoted_old_idx != r.idx { Some(MigrationFixup { source_hw: r.hw_id, promoted_old_idx, promoted_new_idx: r.idx, migrated_old_ref: r, migrated_new_ref: new_ref, }) } else { Some(MigrationFixup { source_hw: r.hw_id, promoted_old_idx: r.idx, // no-op promotion promoted_new_idx: r.idx, migrated_old_ref: r, migrated_new_ref: new_ref, }) }; (old_mask, new_ref, fixup) } /// Mark the current thread exited. Returns (hw_id, tid, handle) of /// the exiting thread so the caller can wake joiners. 
pub fn exit_current(&mut self, exit_code: u32) -> (u8, Option, Option) { let r = self.current.expect("exit_current outside step"); let t = &mut self.slots[r.hw_id as usize].runqueue[r.idx as usize]; let tid = Some(t.tid); let handle = t.thread_handle; t.state = HwState::Exited(exit_code); self.timed_waits.retain(|&(_, tr)| tr != r); self.recompute_slot_runnable(r.hw_id); (r.hw_id, tid, handle) } // ----- TLS ----- /// Allocate a new global TLS slot index. All live threads' `tls_values` /// vecs grow to match. pub fn tls_alloc(&mut self) -> u32 { let idx = self.tls_slot_count as u32; self.tls_slot_count += 1; for slot in self.slots.iter_mut() { for t in slot.runqueue.iter_mut() { if t.tls_values.len() < self.tls_slot_count { t.tls_values.resize(self.tls_slot_count, 0); } } } idx } /// Compat: caller asks for a specific capacity (e.g. spawn's /// `tls_slot_count`). Grows every thread's tls_values up to `count`. pub fn tls_grow_to(&mut self, count: usize) { if count > self.tls_slot_count { self.tls_slot_count = count; } for slot in self.slots.iter_mut() { for t in slot.runqueue.iter_mut() { if t.tls_values.len() < count { t.tls_values.resize(count, 0); } } } } pub fn tls_get(&self, slot_idx: u32) -> u64 { let r = match self.current { Some(r) => r, None => return 0, }; self.slots[r.hw_id as usize].runqueue[r.idx as usize] .tls_values .get(slot_idx as usize) .copied() .unwrap_or(0) } pub fn tls_set(&mut self, slot_idx: u32, value: u64) { let Some(r) = self.current else { return; }; let t = &mut self.slots[r.hw_id as usize].runqueue[r.idx as usize]; let i = slot_idx as usize; if t.tls_values.len() <= i { t.tls_values.resize(i + 1, 0); } t.tls_values[i] = value; } // ----- Time advance / deadlock ----- /// Peek the earliest pending timed-wait deadline without popping. The /// kernel uses this together with `KernelState::earliest_timer_deadline` /// to compute the next global time step in the scheduler round. 
pub fn earliest_wait_deadline(&self) -> Option { self.timed_waits.first().map(|&(d, _)| d) } /// Move every thread's timebase up to `deadline` (if already past, /// leave it alone). Extracted from the old `advance_to_next_wake` /// body so the kernel can drive time-advance for timer fires in /// addition to thread wakes. pub fn advance_all_timebases_to(&mut self, deadline: u64) { for slot in self.slots.iter_mut() { for t in slot.runqueue.iter_mut() { if t.ctx.timebase < deadline { t.ctx.timebase = deadline; } } } } /// Fast-forward the timebase to the earliest pending timed wait and /// wake that sleeper. Used when a round had no Ready threads and no /// timer fires closer than the earliest wait. Returns the woken /// thread's `ThreadRef` + the `BlockReason` it was parked with, so /// the caller can stamp `STATUS_TIMEOUT` and scrub stale waiter-list /// entries via `KernelState::handle_timeout_wake`. `None` means the /// timed-waits queue was empty. pub fn advance_to_next_wake(&mut self) -> Option<(ThreadRef, BlockReason)> { let (deadline, r) = *self.timed_waits.first()?; self.advance_all_timebases_to(deadline); self.timed_waits.remove(0); let t = &mut self.slots[r.hw_id as usize].runqueue[r.idx as usize]; let reason = match std::mem::replace(&mut t.state, HwState::Ready) { HwState::Blocked(reason) | HwState::ServicingIrq(reason) => reason, other => { // Defensive: the timed_waits entry should only ever track a // Blocked or ServicingIrq thread, but if some path already // woke this ref we keep going with a stand-in reason so the // caller can't miss a timeout-wake follow-up. 
tracing::debug!( hw_id = r.hw_id, idx = r.idx, state = ?other, "advance_to_next_wake: unexpected prior state (ignored)" ); BlockReason::Suspended } }; t.quantum_remaining = QUANTUM_DEFAULT; self.recompute_slot_runnable(r.hw_id); tracing::info!( "scheduler: advanced to deadline {} waking hw={} idx={}", deadline, r.hw_id, r.idx ); Some((r, reason)) } /// Pop the earliest timed wait only if its deadline is `<= target`. /// Used by the kernel-driven scheduler loop to consume a just-ripe /// thread wake after timers fired to that same `target`. If the /// earliest entry has a later deadline (some other event drove /// advance), returns `None` and leaves the entry in place. pub fn advance_to_next_wake_if_due( &mut self, target: u64, ) -> Option<(ThreadRef, BlockReason)> { let (d, _) = *self.timed_waits.first()?; if d > target { return None; } self.advance_to_next_wake() } /// Does any thread across any slot exist in a state other than /// Exited/Idle? pub fn has_live_thread(&self) -> bool { self.slots.iter().any(|slot| { slot.runqueue.iter().any(|t| { matches!( t.state, HwState::Ready | HwState::Blocked(_) | HwState::ServicingIrq(_) ) }) }) } /// Snapshot thread states for diagnostic logging. One entry per live /// guest thread (Exited are included so post-mortem can see exit codes). pub fn diagnostic_snapshot(&self) -> Vec<(ThreadRef, Option, HwState)> { let mut out = Vec::new(); for (hw_id, slot) in self.slots.iter().enumerate() { for (idx, t) in slot.runqueue.iter().enumerate() { out.push(( ThreadRef::new(hw_id as u8, idx as u16), Some(t.tid), t.state.clone(), )); } } out } /// Force-wake every Blocked waiter (WaitAny/WaitAll/CriticalSection) /// with STATUS_TIMEOUT. Caller writes the status code into /// `ctx_mut_ref(r).gpr[3]`. Returns the refs that were woken. 
pub fn unblock_on_deadlock(&mut self) -> Vec<ThreadRef> {
    let mut woken = Vec::new();
    for (hw_id, slot) in self.slots.iter_mut().enumerate() {
        for (idx, t) in slot.runqueue.iter_mut().enumerate() {
            if matches!(
                t.state,
                HwState::Blocked(BlockReason::WaitAny { .. })
                    | HwState::Blocked(BlockReason::WaitAll { .. })
                    | HwState::Blocked(BlockReason::CriticalSection(_))
            ) {
                t.state = HwState::Ready;
                t.quantum_remaining = QUANTUM_DEFAULT;
                woken.push(ThreadRef::new(hw_id as u8, idx as u16));
            }
        }
    }
    // Drop only the deadlines of threads we just forced Ready. Other timed
    // sleepers (e.g. DelayUntil) were not woken here; wiping their entries
    // would leave them Blocked with no remaining wake source.
    self.timed_waits.retain(|&(_, tr)| !woken.contains(&tr));
    for i in 0..HW_THREAD_COUNT as u8 {
        self.recompute_slot_runnable(i);
    }
    woken
}
}

impl Default for Scheduler {
    fn default() -> Self {
        Self::new()
    }
}

// ====== Tests ======

#[cfg(test)]
mod tests {
    use super::*;

    /// No-op PcrWriter for unit tests that don't exercise the guest memory write.
    #[derive(Default)]
    struct NullPcr;

    impl PcrWriter for NullPcr {
        fn write_pcr_id(&mut self, _pcr_base: u32, _hw_id: u8) {}
    }

    /// PcrWriter that records every write for assertion.
    #[derive(Default)]
    struct RecordingPcr {
        writes: Vec<(u32, u8)>,
    }

    impl PcrWriter for RecordingPcr {
        fn write_pcr_id(&mut self, pcr_base: u32, hw_id: u8) {
            self.writes.push((pcr_base, hw_id));
        }
    }

    fn mk_scheduler_with_initial() -> Scheduler {
        let mut s = Scheduler::new();
        let mut ctx = PpcContext::new();
        ctx.pc = 0x8200_0000;
        ctx.gpr[1] = 0x7000_0000;
        s.install_initial_thread(
            ctx,
            0x7000_0000,
            0x10_0000,
            0x7FFF_0000,
            0x7FFE_0000,
            0x1000,
            &mut NullPcr,
        );
        s
    }

    fn worker_spawn_params(tid: u32, handle: u32) -> SpawnParams {
        SpawnParams {
            entry: 0x8200_1000,
            start_context: 0xDEAD_BEEF,
            stack_base: 0x7100_0000 + tid * 0x10_0000,
            stack_size: 0x10_0000,
            pcr_base: 0x7FEF_0000 + tid * 0x2000,
            tls_base: 0x7FEE_0000 + tid * 0x2000,
            thread_handle: handle,
            guest_tid: tid,
            create_suspended: false,
            is_initial: false,
            tls_slot_count: 0,
            affinity_mask: 0xFF,
            priority: 0,
            ideal_processor: None,
        }
    }

    // ---- preserved from pre-Axis-1 (updated names and params) ----

    #[test]
    fn spawn_lands_on_least_depth_slot() {
        // Slot 0 holds the initial thread (depth 1); slots 1..5 are empty
        // (depth 0), so the next spawn must go to slot 1, the lowest-index
        // minimum.
        let mut s = mk_scheduler_with_initial();
        let slot = s
            .spawn(worker_spawn_params(2, 0x2000), &mut NullPcr)
            .unwrap();
        assert_eq!(slot, 1);
        let thread = &s.slots[1].runqueue[0];
        assert_eq!(thread.state, HwState::Ready);
        assert_eq!(thread.ctx.pc, 0x8200_1000);
        assert_eq!(thread.ctx.gpr[3], 0xDEAD_BEEF);
    }

    #[test]
    fn suspended_spawn_stays_blocked_until_resume() {
        let mut s = mk_scheduler_with_initial();
        let mut params = worker_spawn_params(2, 0x2000);
        params.create_suspended = true;
        let slot = s.spawn(params, &mut NullPcr).unwrap();
        let r = ThreadRef::new(slot, 0);
        assert_eq!(
            s.thread(r).state,
            HwState::Blocked(BlockReason::Suspended)
        );
        assert_eq!(s.thread(r).suspend_count, 1);
        let prev = s.resume_ref(r);
        assert_eq!(prev, 1);
        assert_eq!(s.thread(r).state, HwState::Ready);
    }

    #[test]
    fn round_schedule_skips_blocked() {
        let mut s = mk_scheduler_with_initial();
        let mut params = worker_spawn_params(2, 0x2000);
        params.create_suspended = true;
        s.spawn(params, &mut NullPcr).unwrap();
        // Initial thread (slot 0) is Ready. Spawned thread (slot 1) is
        // Suspended. round_schedule should only list slot 0.
        let order = s.round_schedule();
        assert_eq!(order, vec![0]);
    }

    #[test]
    fn seeded_order_is_deterministic() {
        let order = OrderMode::Seeded { seed: 42 };
        let mut s1 = mk_scheduler_with_initial();
        let mut s2 = mk_scheduler_with_initial();
        s1.order = order;
        s1.rng_state = 42;
        s2.order = order;
        s2.rng_state = 42;
        for i in 0..5 {
            let tid = 2 + i as u32;
            let _ = s1.spawn(worker_spawn_params(tid, 0x2000 + i * 4), &mut NullPcr);
            let _ = s2.spawn(worker_spawn_params(tid, 0x2000 + i * 4), &mut NullPcr);
        }
        let a = s1.round_schedule();
        let b = s2.round_schedule();
        assert_eq!(a, b);
    }

    #[test]
    fn tls_is_per_thread() {
        let mut s = mk_scheduler_with_initial();
        s.spawn(worker_spawn_params(2, 0x2000), &mut NullPcr).unwrap();
        s.tls_grow_to(4);
        // Simulate running on slot 0 (initial thread)
        s.begin_slot_visit(0);
        s.tls_set(0, 0xAAAA);
        s.end_slot_visit();
        // Simulate running on slot 1 (worker)
        s.begin_slot_visit(1);
        s.tls_set(0, 0xBBBB);
        s.end_slot_visit();
        s.begin_slot_visit(0);
        assert_eq!(s.tls_get(0), 0xAAAA);
        s.end_slot_visit();
        s.begin_slot_visit(1);
        assert_eq!(s.tls_get(0), 0xBBBB);
        s.end_slot_visit();
    }

    // ---- new Axis-1 tests ----

    #[test]
    fn test_two_threads_same_slot_higher_priority_runs_first() {
        let mut s = mk_scheduler_with_initial();
        // Force both workers onto slot 0 via affinity.
        let mut a = worker_spawn_params(2, 0x2000);
        a.affinity_mask = 0b0000_0001;
        a.priority = 0;
        let mut b = worker_spawn_params(3, 0x3000);
        b.affinity_mask = 0b0000_0001;
        b.priority = 5;
        s.spawn(a, &mut NullPcr).unwrap();
        s.spawn(b, &mut NullPcr).unwrap();
        // Slot 0 now holds: [main(pri 0), worker2(pri 0), worker3(pri 5)]
        s.begin_slot_visit(0);
        let r = s.current.expect("current set");
        let t = s.thread(r);
        assert_eq!(t.tid, 3, "worker3 (pri 5) wins the pick");
        assert_eq!(t.priority, 5);
        s.end_slot_visit();
    }

    #[test]
    fn test_slot_depth_accounting_least_depth_placement() {
        // The initial thread sits on slot 0 (depth 1, others 0). Six workers
        // with affinity 0xFF are spawned; each goes to the least-depth slot,
        // with ties broken by the lowest index.
        let mut s = mk_scheduler_with_initial();
        let mut placements = Vec::new();
        for i in 0..6 {
            let tid = 2 + i as u32;
            let slot = s
                .spawn(worker_spawn_params(tid, 0x2000 + i * 4), &mut NullPcr)
                .unwrap();
            placements.push(slot);
        }
        // Workers 1..5 each take the first still-empty slot (1, 2, 3, 4, 5).
        // Worker 6 then sees all six slots at depth 1 and the lowest-index
        // tiebreak sends it to slot 0.
        assert_eq!(placements, vec![1, 2, 3, 4, 5, 0]);
    }

    #[test]
    fn test_exited_threads_dont_block_spawn() {
        let mut s = mk_scheduler_with_initial();
        // Fill slot 1 to the prune threshold with exited threads.
        for i in 0..PRUNE_DEPTH_THRESHOLD {
            let tid = 10 + i as u32;
            let mut p = worker_spawn_params(tid, 0x4000 + i as u32 * 4);
            p.affinity_mask = 0b0000_0010; // only slot 1
            s.spawn(p, &mut NullPcr).unwrap();
        }
        assert_eq!(s.slots[1].runqueue.len(), PRUNE_DEPTH_THRESHOLD);
        // Mark them all Exited.
        for t in s.slots[1].runqueue.iter_mut() {
            t.state = HwState::Exited(0);
        }
        // Now spawn a fresh thread with affinity = slot 1 only. Should
        // land successfully (prune kicks in at PRUNE_DEPTH_THRESHOLD).
        let mut p = worker_spawn_params(99, 0x9000);
        p.affinity_mask = 0b0000_0010;
        let slot = s.spawn(p, &mut NullPcr).unwrap();
        assert_eq!(slot, 1);
        // Post-prune + push: all-Exited entries gone, fresh thread at idx 0.
        assert_eq!(s.slots[1].runqueue.len(), 1);
        assert_eq!(s.slots[1].runqueue[0].tid, 99);
    }

    #[test]
    fn test_threadref_survives_spawn() {
        // Peer spawned into the same slot must not shift an existing
        // ThreadRef (vec push appends, doesn't reorder).
        let mut s = mk_scheduler_with_initial();
        let mut a = worker_spawn_params(2, 0x2000);
        a.affinity_mask = 0b0000_0010; // slot 1
        s.spawn(a, &mut NullPcr).unwrap();
        let r_original = ThreadRef { hw_id: 1, idx: 0, generation: 0 };
        assert_eq!(s.thread(r_original).tid, 2);
        let mut b = worker_spawn_params(3, 0x3000);
        b.affinity_mask = 0b0000_0010;
        s.spawn(b, &mut NullPcr).unwrap();
        // Original ref still resolves to tid 2.
        assert_eq!(s.thread(r_original).tid, 2);
        assert_eq!(s.slots[1].runqueue[1].tid, 3);
    }

    #[test]
    fn test_priority_default_zero() {
        let mut s = mk_scheduler_with_initial();
        let slot = s
            .spawn(worker_spawn_params(2, 0x2000), &mut NullPcr)
            .unwrap();
        let r = ThreadRef::new(slot, 0);
        assert_eq!(s.priority_ref(r), 0);
        let prev = s.set_priority_ref(r, 5);
        assert_eq!(prev, 0);
        assert_eq!(s.priority_ref(r), 5);
    }

    #[test]
    fn test_spawn_records_pcr_write() {
        let mut s = mk_scheduler_with_initial();
        let mut rec = RecordingPcr::default();
        // install_initial_thread wrote through NullPcr, so `rec` only sees
        // the spawn's write: (pcr_base=0x7FEF_0000 + delta, hw=slot).
        let p = worker_spawn_params(2, 0x2000);
        let pcr_base = p.pcr_base;
        let slot = s.spawn(p, &mut rec).unwrap();
        assert_eq!(rec.writes, vec![(pcr_base, slot)]);
    }

    #[test]
    fn test_find_by_tid_returns_threadref() {
        let mut s = mk_scheduler_with_initial();
        s.spawn(worker_spawn_params(2, 0x2000), &mut NullPcr).unwrap();
        let r = s.find_by_tid(2).expect("spawned tid 2");
        assert_eq!(r, ThreadRef { hw_id: 1, idx: 0, generation: 0 });
        assert!(s.find_by_tid(99).is_none());
    }

    #[test]
    fn test_find_by_handle_returns_threadref() {
        let mut s = mk_scheduler_with_initial();
        s.spawn(worker_spawn_params(2, 0x2000), &mut NullPcr).unwrap();
        let r = s.find_by_handle(0x2000).expect("handle 0x2000");
        assert_eq!(r, ThreadRef { hw_id: 1, idx: 0, generation: 0 });
    }

    #[test]
    fn test_exit_current_marks_state_without_removal() {
        // Exit must NOT Vec::remove — that would invalidate peer
        // ThreadRefs. State flip + stable positions is the invariant.
        let mut s = mk_scheduler_with_initial();
        s.spawn(worker_spawn_params(2, 0x2000), &mut NullPcr).unwrap();
        s.begin_slot_visit(0);
        let r = s.current.expect("current set");
        let (hw_id, tid, _handle) = s.exit_current(0xABCD);
        s.end_slot_visit();
        assert_eq!(hw_id, 0);
        assert_eq!(tid, Some(INITIAL_GUEST_TID));
        // Thread still at slot 0 idx 0, now Exited.
        assert_eq!(s.slots[0].runqueue.len(), 1);
        assert_eq!(s.slots[0].runqueue[0].state, HwState::Exited(0xABCD));
        // The ref captured before exit still resolves to the exited thread.
        assert_eq!(s.thread(r).state, HwState::Exited(0xABCD));
        // worker on slot 1 idx 0 is unaffected.
        assert_eq!(s.slots[1].runqueue[0].tid, 2);
    }

    // ---- Axis 2: rotation + bitset tests ----

    fn mk_empty_scheduler() -> Scheduler {
        // For rotation tests we want NO initial thread on slot 0 —
        // every runnable bit comes from explicit spawns below.
        Scheduler::new()
    }

    #[test]
    fn test_rotation_cursor_advances_per_round() {
        let mut s = mk_empty_scheduler();
        // Populate all 6 slots with one Ready thread each.
        let mut next_tid = 1u32;
        for hw in 0..6u8 {
            let mut p = SpawnParams::default();
            p.guest_tid = next_tid;
            p.thread_handle = 0x1000 + (next_tid * 4);
            p.affinity_mask = 1 << hw;
            p.pcr_base = 0x40000000 + (hw as u32) * 0x1000;
            s.spawn(p, &mut NullPcr).unwrap();
            next_tid += 1;
        }
        assert_eq!(s.non_empty_runnable, 0b11_1111);
        let r1 = s.round_schedule();
        assert_eq!(r1, vec![0, 1, 2, 3, 4, 5]);
        let r2 = s.round_schedule();
        assert_eq!(r2, vec![1, 2, 3, 4, 5, 0]);
        let r3 = s.round_schedule();
        assert_eq!(r3, vec![2, 3, 4, 5, 0, 1]);
    }

    #[test]
    fn test_rotation_skips_empty_slots() {
        let mut s = mk_empty_scheduler();
        // Slots [Ready, Ready, empty, Ready, Ready, empty] ⇒ bitset 0b011011.
        for hw in [0u8, 1, 3, 4] {
            let mut p = SpawnParams::default();
            p.guest_tid = (hw + 1) as u32;
            p.thread_handle = 0x1000 + (hw as u32) * 4;
            p.affinity_mask = 1 << hw;
            p.pcr_base = 0x40000000 + (hw as u32) * 0x1000;
            s.spawn(p, &mut NullPcr).unwrap();
        }
        assert_eq!(s.non_empty_runnable, 0b01_1011);
        let r = s.round_schedule();
        assert_eq!(r, vec![0, 1, 3, 4], "emits only slots with bits set");
        let r = s.round_schedule();
        assert_eq!(r, vec![1, 3, 4, 0], "rotation cursor advances past empties");
    }

    #[test]
    fn test_park_toggles_bit_and_wake_restores() {
        let mut s = mk_empty_scheduler();
        let mut p = SpawnParams::default();
        p.guest_tid = 2;
        p.thread_handle = 0x2000;
        p.affinity_mask = 0b0010;
        p.pcr_base = 0x4000_1000;
        s.spawn(p, &mut NullPcr).unwrap();
        assert_eq!(s.non_empty_runnable, 0b0010);
        // Park the thread: bit 1 should clear.
        s.begin_slot_visit(1);
        s.park_current(BlockReason::DelayUntil(1_000_000));
        s.end_slot_visit();
        assert_eq!(s.non_empty_runnable, 0, "park clears slot 1's runnable bit");
        // Wake it: bit 1 restores.
        let r = ThreadRef { hw_id: 1, idx: 0, generation: 0 };
        s.wake_ref(r);
        assert_eq!(s.non_empty_runnable, 0b0010);
    }

    #[test]
    fn test_round_schedule_empty_fastpath() {
        let mut s = mk_empty_scheduler();
        // No spawns ⇒ bitset is 0 ⇒ fast return without allocating.
        assert_eq!(s.non_empty_runnable, 0);
        let r = s.round_schedule();
        assert!(r.is_empty());
        // Cursor must not advance on empty rounds (nothing happened).
        assert_eq!(s.rotation_cursor, 0);
    }

    #[test]
    fn test_rotation_fairness_three_slots_two_threads_each() {
        let mut s = mk_empty_scheduler();
        // Slots 0, 2, 4 each hold two Ready threads; 1/3/5 empty.
        let mut next_tid = 1u32;
        for hw in [0u8, 2, 4] {
            for _slot_peer in 0..2 {
                let mut p = SpawnParams::default();
                p.guest_tid = next_tid;
                p.thread_handle = 0x1000 + (next_tid * 4);
                p.affinity_mask = 1 << hw;
                p.pcr_base = 0x40000000 + (next_tid * 0x1000);
                s.spawn(p, &mut NullPcr).unwrap();
                next_tid += 1;
            }
        }
        assert_eq!(s.non_empty_runnable, 0b01_0101);
        let r = s.round_schedule();
        // Three entries per round (one per non-empty slot).
        assert_eq!(r.len(), 3);
        assert!(r.contains(&0) && r.contains(&2) && r.contains(&4));
    }

    // ---- Axis 5: ideal processor + initial placement tests ----

    #[test]
    fn test_spawn_with_ideal_processor_lands_on_ideal_slot() {
        let mut s = mk_empty_scheduler();
        let mut p = SpawnParams::default();
        p.guest_tid = 1;
        p.thread_handle = 0x1000;
        p.affinity_mask = 0xFF;
        p.ideal_processor = Some(3);
        p.pcr_base = 0x4000_0000;
        let slot = s.spawn(p, &mut NullPcr).unwrap();
        assert_eq!(slot, 3, "ideal=3 + mask=0xFF lands on slot 3");
    }

    #[test]
    fn test_spawn_with_ideal_outside_mask_falls_back_to_least_depth() {
        let mut s = mk_empty_scheduler();
        let mut p = SpawnParams::default();
        p.guest_tid = 1;
        p.thread_handle = 0x1000;
        p.affinity_mask = 0b0000_0011; // only slots 0, 1
        p.ideal_processor = Some(5); // outside mask
        p.pcr_base = 0x4000_0000;
        let slot = s.spawn(p, &mut NullPcr).unwrap();
        assert!(slot == 0 || slot == 1, "falls back to mask-allowed least-depth");
    }

    #[test]
    fn test_spawn_without_ideal_uses_least_depth() {
        let mut s = mk_empty_scheduler();
        // Pre-fill slots 0..3 with one thread each via explicit affinity.
        let mut next_tid = 1u32;
        for hw in 0..4u8 {
            let mut p = SpawnParams::default();
            p.guest_tid = next_tid;
            p.thread_handle = 0x1000 + next_tid * 4;
            p.affinity_mask = 1 << hw;
            p.pcr_base = 0x4000_0000 + next_tid * 0x1000;
            s.spawn(p, &mut NullPcr).unwrap();
            next_tid += 1;
        }
        // Slots 0..3 have depth 1; 4, 5 have depth 0.
        let mut p = SpawnParams::default();
        p.guest_tid = next_tid;
        p.thread_handle = 0x1000 + next_tid * 4;
        p.affinity_mask = 0xFF;
        p.ideal_processor = None;
        p.pcr_base = 0x4000_0000 + next_tid * 0x1000;
        let slot = s.spawn(p, &mut NullPcr).unwrap();
        assert!(slot == 4 || slot == 5, "least-depth wins; slot={}", slot);
    }

    #[test]
    fn test_set_ideal_ref_roundtrip() {
        let mut s = mk_empty_scheduler();
        let mut p = SpawnParams::default();
        p.guest_tid = 1;
        p.thread_handle = 0x1000;
        p.affinity_mask = 0b0000_0001;
        p.pcr_base = 0x4000_0000;
        s.spawn(p, &mut NullPcr).unwrap();
        let r = ThreadRef { hw_id: 0, idx: 0, generation: 0 };
        assert_eq!(s.ideal_ref(r), None, "no ideal at spawn");
        let prev = s.set_ideal_ref(r, 4);
        assert_eq!(prev, 0xFF, "unset previous returns 0xFF sentinel");
        assert_eq!(s.ideal_ref(r), Some(4));
        let prev = s.set_ideal_ref(r, 2);
        assert_eq!(prev, 4);
        assert_eq!(s.ideal_ref(r), Some(2));
    }

    // ---- Axis 4: affinity migration tests ----

    #[test]
    fn test_affinity_change_migrates_to_new_slot() {
        let mut s = mk_empty_scheduler();
        let mut p = SpawnParams::default();
        p.guest_tid = 1;
        p.thread_handle = 0x1000;
        p.affinity_mask = 0xFF;
        p.pcr_base = 0x4000_0000;
        s.spawn(p, &mut NullPcr).unwrap();
        // Landed on slot 0 (least-depth + lowest-index tiebreak).
        assert_eq!(s.slots[0].runqueue.len(), 1);
        let r = ThreadRef { hw_id: 0, idx: 0, generation: 0 };
        // Restrict to slot 2 only.
        let (old, new_ref, _fx) = s.set_affinity_ref(r, 0b0000_0100, &mut NullPcr);
        assert_eq!(old, 0xFF);
        assert_eq!(new_ref, ThreadRef { hw_id: 2, idx: 0, generation: 0 });
        assert!(s.slots[0].runqueue.is_empty());
        assert_eq!(s.slots[2].runqueue.len(), 1);
    }

    #[test]
    fn test_affinity_change_stays_put_when_current_allowed() {
        let mut s = mk_empty_scheduler();
        let mut p = SpawnParams::default();
        p.guest_tid = 1;
        p.thread_handle = 0x1000;
        p.affinity_mask = 0b0000_1000;
        p.pcr_base = 0x4000_0000;
        s.spawn(p, &mut NullPcr).unwrap();
        // Landed on slot 3 (only bit set).
        let r = ThreadRef { hw_id: 3, idx: 0, generation: 0 };
        assert_eq!(s.thread(r).tid, 1);
        // Expand mask to 0..3 — slot 3 still allowed, no migration.
        let (_old, new_ref, fx) = s.set_affinity_ref(r, 0b0000_1111, &mut NullPcr);
        assert_eq!(new_ref, r);
        assert!(fx.is_none(), "no migration, no fixup");
    }

    #[test]
    fn test_affinity_migration_rewrites_pcr() {
        let mut s = mk_empty_scheduler();
        let mut p = SpawnParams::default();
        p.guest_tid = 1;
        p.thread_handle = 0x1000;
        p.affinity_mask = 0b0000_0010;
        p.pcr_base = 0x4100_0000;
        s.spawn(p, &mut NullPcr).unwrap();
        let r = ThreadRef { hw_id: 1, idx: 0, generation: 0 };
        let mut rec = RecordingPcr::default();
        let (_old, _new, _fx) = s.set_affinity_ref(r, 0b0001_0000, &mut rec);
        // Migration target = slot 4 (the only bit set).
        assert_eq!(rec.writes, vec![(0x4100_0000, 4)]);
    }

    #[test]
    fn test_affinity_mask_zero_treated_as_any() {
        let mut s = mk_empty_scheduler();
        let mut p = SpawnParams::default();
        p.guest_tid = 1;
        p.thread_handle = 0x1000;
        p.affinity_mask = 0b0000_0100;
        p.pcr_base = 0x4000_0000;
        s.spawn(p, &mut NullPcr).unwrap();
        let r = ThreadRef { hw_id: 2, idx: 0, generation: 0 };
        // mask=0 normalizes to 0xFF; slot 2 is still allowed → no migration.
        let (old, new_ref, _fx) = s.set_affinity_ref(r, 0, &mut NullPcr);
        assert_eq!(old, 0b0000_0100);
        assert_eq!(new_ref, r);
        // Verify the stored mask is 0 (we save the raw value) even though
        // the effective is 0xFF.
        assert_eq!(s.thread(r).affinity_mask, 0);
    }

    #[test]
    fn test_affinity_migration_fixup_retargets_promoted_peer() {
        // Two threads on slot 0: A (idx 0), B (idx 1). Migrate A to
        // slot 3 — swap_remove moves B from idx 1 to idx 0. A ref that
        // previously pointed at B (idx 1) must be retargeted to idx 0.
        let mut s = mk_empty_scheduler();
        for tid in [1u32, 2] {
            let mut p = SpawnParams::default();
            p.guest_tid = tid;
            p.thread_handle = 0x1000 + tid * 4;
            p.affinity_mask = 0b0000_0001;
            p.pcr_base = 0x4000_0000 + tid * 0x1000;
            s.spawn(p, &mut NullPcr).unwrap();
        }
        let a_ref = ThreadRef { hw_id: 0, idx: 0, generation: 0 };
        let b_ref_before = ThreadRef { hw_id: 0, idx: 1, generation: 0 };
        assert_eq!(s.thread(b_ref_before).tid, 2);
        let (_old, a_new, fx) = s.set_affinity_ref(a_ref, 0b0000_1000, &mut NullPcr);
        let fx = fx.expect("migration emits fixup");
        // A now lives on slot 3 idx 0.
        assert_eq!(a_new, ThreadRef { hw_id: 3, idx: 0, generation: 0 });
        // Apply fixup to B's old ref → promoted into slot 0 idx 0.
        let mut stale = b_ref_before;
        fx.apply(&mut stale);
        assert_eq!(stale, ThreadRef { hw_id: 0, idx: 0, generation: 0 });
        assert_eq!(s.thread(stale).tid, 2);
    }

    // ---- Axis 3: quantum tests ----

    #[test]
    fn test_quantum_rotation_within_slot() {
        let mut s = mk_empty_scheduler();
        // A and B both on slot 0 at priority 0.
        for tid in [1u32, 2] {
            let mut p = SpawnParams::default();
            p.guest_tid = tid;
            p.thread_handle = 0x1000 + tid * 4;
            p.affinity_mask = 0b0001;
            p.pcr_base = 0x4000_0000 + tid * 0x1000;
            s.spawn(p, &mut NullPcr).unwrap();
        }
        s.begin_slot_visit(0);
        let first_tid = s.thread(s.current.unwrap()).tid;
        // Drain the quantum. Each call bar the last returns false.
        for _ in 0..(QUANTUM_DEFAULT - 1) {
            assert!(!s.decrement_quantum());
        }
        // The final decrement flips running_idx to the peer.
        assert!(s.decrement_quantum());
        let second_tid = s.thread(s.current.unwrap()).tid;
        assert_ne!(first_tid, second_tid, "quantum expiry rotates to peer");
    }

    #[test]
    fn test_quantum_does_not_rotate_without_same_priority_peer() {
        let mut s = mk_empty_scheduler();
        // A priority 0, B priority 5 — B wins pick_runnable outright, so
        // quantum expiry on A shouldn't flip to B (priority ordering
        // handles that next round instead).
        let mut pa = SpawnParams::default();
        pa.guest_tid = 1;
        pa.thread_handle = 0x1000;
        pa.affinity_mask = 0b0001;
        pa.pcr_base = 0x4000_0000;
        pa.priority = 0;
        s.spawn(pa, &mut NullPcr).unwrap();
        let mut pb = SpawnParams::default();
        pb.guest_tid = 2;
        pb.thread_handle = 0x1004;
        pb.affinity_mask = 0b0001;
        pb.pcr_base = 0x4000_1000;
        pb.priority = 5;
        s.spawn(pb, &mut NullPcr).unwrap();
        // begin_slot_visit picks B (tid 2, priority 5). Override the running
        // slot entry and `current` to A (tid 1, idx 0) so we can observe
        // what happens when A's quantum expires.
        s.begin_slot_visit(0);
        s.slots[0].running_idx = Some(0);
        s.current = Some(ThreadRef { hw_id: 0, idx: 0, generation: 0 });
        // Drain A's quantum; should reload to DEFAULT but not rotate
        // (B is higher priority, not equal).
        for _ in 0..QUANTUM_DEFAULT {
            let _ = s.decrement_quantum();
        }
        let t = s.thread(s.current.unwrap());
        assert_eq!(t.tid, 1, "stays on A; B has higher priority, not equal");
        assert_eq!(t.quantum_remaining, QUANTUM_DEFAULT, "quantum reloaded");
    }

    #[test]
    fn test_cooperative_yield_does_not_need_quantum() {
        let mut s = mk_empty_scheduler();
        for tid in [1u32, 2] {
            let mut p = SpawnParams::default();
            p.guest_tid = tid;
            p.thread_handle = 0x1000 + tid * 4;
            p.affinity_mask = 0b0001;
            p.pcr_base = 0x4000_0000 + tid * 0x1000;
            s.spawn(p, &mut NullPcr).unwrap();
        }
        s.begin_slot_visit(0);
        let first_tid = s.thread(s.current.unwrap()).tid;
        // Park via cooperative yield.
        s.park_current(BlockReason::DelayUntil(1_000_000));
        s.end_slot_visit();
        // Next round: pick_runnable skips the Blocked one, so the other
        // thread is selected.
        s.begin_slot_visit(0);
        let next_tid = s.thread(s.current.unwrap()).tid;
        assert_ne!(first_tid, next_tid, "cooperative park switches thread");
    }

    #[test]
    fn test_wake_ref_resets_quantum() {
        let mut s = mk_empty_scheduler();
        let mut p = SpawnParams::default();
        p.guest_tid = 2;
        p.thread_handle = 0x2000;
        p.affinity_mask = 0b0010;
        p.pcr_base = 0x4000_1000;
        s.spawn(p, &mut NullPcr).unwrap();
        let r = ThreadRef { hw_id: 1, idx: 0, generation: 0 };
        // Park, poke quantum to 1, wake ⇒ quantum back to DEFAULT.
        s.thread_mut(r).state = HwState::Blocked(BlockReason::WaitAny {
            handles: vec![0xDEAD],
            deadline: None,
        });
        s.thread_mut(r).quantum_remaining = 1;
        s.wake_ref(r);
        assert_eq!(s.thread(r).quantum_remaining, QUANTUM_DEFAULT);
    }

    #[test]
    fn test_wake_ref_restores_ready_and_quantum() {
        let mut s = mk_scheduler_with_initial();
        s.spawn(worker_spawn_params(2, 0x2000), &mut NullPcr).unwrap();
        let r = ThreadRef { hw_id: 1, idx: 0, generation: 0 };
        // Park then wake.
        s.thread_mut(r).state = HwState::Blocked(BlockReason::WaitAny {
            handles: vec![0x1234],
            deadline: None,
        });
        s.thread_mut(r).quantum_remaining = 1;
        s.wake_ref(r);
        assert_eq!(s.thread(r).state, HwState::Ready);
        assert_eq!(s.thread(r).quantum_remaining, QUANTUM_DEFAULT);
    }

    // ---- deadlock recovery ----

    #[test]
    fn test_unblock_on_deadlock_keeps_timed_waits_of_threads_it_did_not_wake() {
        let mut s = mk_empty_scheduler();
        for (tid, hw) in [(1u32, 0u8), (2, 1)] {
            let mut p = SpawnParams::default();
            p.guest_tid = tid;
            p.thread_handle = 0x1000 + tid * 4;
            p.affinity_mask = 1 << hw;
            p.pcr_base = 0x4000_0000 + tid * 0x1000;
            s.spawn(p, &mut NullPcr).unwrap();
        }
        let waiter = ThreadRef::new(0, 0);
        let sleeper = ThreadRef::new(1, 0);
        s.thread_mut(waiter).state = HwState::Blocked(BlockReason::WaitAny {
            handles: vec![0xDEAD],
            deadline: None,
        });
        s.thread_mut(sleeper).state = HwState::Blocked(BlockReason::DelayUntil(1_000_000));
        s.timed_waits.clear();
        s.timed_waits.push((500_000, waiter));
        s.timed_waits.push((1_000_000, sleeper));

        let woken = s.unblock_on_deadlock();
        assert_eq!(woken, vec![waiter]);
        assert_eq!(s.thread(waiter).state, HwState::Ready);
        // The DelayUntil sleeper was not force-woken, so its deadline must
        // stay armed or nothing would ever wake it.
        assert_eq!(s.timed_waits.len(), 1);
        assert_eq!(s.timed_waits[0].1, sleeper);
    }
}