diff --git a/crates/xenia-app/Cargo.toml b/crates/xenia-app/Cargo.toml index 001a2d9..4bdb984 100644 --- a/crates/xenia-app/Cargo.toml +++ b/crates/xenia-app/Cargo.toml @@ -20,9 +20,21 @@ xenia-apu = { workspace = true } xenia-hid = { workspace = true } xenia-debugger = { workspace = true } xenia-analysis = { workspace = true } +xenia-ui = { workspace = true } +winit = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } +tracing-appender = { workspace = true } +tracing-chrome = { workspace = true } +tracing-error = { workspace = true } +metrics = { workspace = true } +metrics-util = { workspace = true } +pprof = { workspace = true, optional = true } anyhow = { workspace = true } clap = { version = "4", features = ["derive"] } serde = { workspace = true } serde_json = { workspace = true } + +[features] +default = ["profiling"] +profiling = ["dep:pprof"] diff --git a/crates/xenia-app/src/main.rs b/crates/xenia-app/src/main.rs index 6aa3694..74f5c5c 100644 --- a/crates/xenia-app/src/main.rs +++ b/crates/xenia-app/src/main.rs @@ -1,8 +1,12 @@ +mod observability; + use std::collections::HashMap; +use std::path::PathBuf; +use std::time::Instant; use anyhow::Result; -use clap::{Parser, Subcommand}; -use tracing_subscriber::EnvFilter; +use clap::{Parser, Subcommand, ValueEnum}; +use tracing::{debug, info, instrument, warn}; use xenia_kernel::ModuleId; use xenia_memory::MemoryAccess; @@ -11,19 +15,63 @@ use xenia_memory::MemoryAccess; #[command(name = "xenia-rs")] #[command(about = "Xbox 360 emulator for reverse engineering and preservation")] struct Cli { + /// Force JSON formatting on the console log layer (default is compact text) + #[arg(long, global = true)] + log_json: bool, + + /// Additionally write logs to this file. `.json` extension selects JSON; + /// anything else uses text. Uses a non-blocking appender. + #[arg(long, global = true, value_name = "PATH")] + log_file: Option, + + /// Override tracing filter (same syntax as `RUST_LOG`). Also settable via env. + #[arg(long, global = true, value_name = "DIRECTIVES")] + log_filter: Option, + + /// Emit a Chrome `about:tracing` JSON trace of all spans to this path. + #[arg(long, global = true, value_name = "PATH")] + trace_chrome: Option, + + /// Start the pprof sampling profiler at 100 Hz. Extension `.svg` writes a + /// flamegraph; `.pb` writes a pprof protobuf. Requires the `profiling` + /// Cargo feature (enabled by default). + #[arg(long, global = true, value_name = "PATH")] + profile: Option, + #[command(subcommand)] command: Commands, } +/// `dis --analyze` modes. Rust passes are the default and always functional; +/// SQL views are an additive surface for relational queries. +#[derive(Copy, Clone, Debug, PartialEq, Eq, ValueEnum)] +enum AnalyzeMode { + /// Rust passes only (`func.rs`, `xref.rs`). No SQL views. + Rust, + /// Rust passes + additive SQL views (`v_branch_xrefs`, `v_call_graph`, + /// `v_reachability_from_entry`, `v_function_first_instruction`, + /// `v_imports_called`). + Sql, + /// Same as `sql`, plus a Rust-vs-SQL cross-check on branch xrefs at + /// the end. Disagreement is logged as a warning (non-fatal). + Both, +} + #[derive(Subcommand)] enum Commands { - /// Disassemble a XEX file from its entry point + /// Disassemble a XEX file from its entry point (or an arbitrary address via `--at`) Disasm { /// Path to XEX file path: String, /// Number of instructions to disassemble #[arg(short = 'n', default_value = "64")] count: usize, + /// Start address (hex with or without `0x` prefix). Defaults to + /// the XEX entry point. Must fall inside the loaded image range. + /// + /// Example: `--at 0x824be9a0` to inspect a graphics-interrupt callback. + #[arg(long, value_parser = parse_hex_u32)] + at: Option, }, /// Load and execute a XEX file with tracing Exec { @@ -32,6 +80,9 @@ enum Commands { /// Maximum instructions to execute before stopping (unlimited if omitted) #[arg(short = 'n')] max_instructions: Option, + /// Throttle execution to at most N instructions per second (unlimited if omitted) + #[arg(long)] + ips_limit: Option, /// SQLite database to write to. Includes the full static analysis /// that `dis --db` would produce, plus any opt-in trace tables. #[arg(long)] @@ -49,6 +100,63 @@ enum Commands { /// (only errors, faults, halts, and the summary line are printed) #[arg(long)] quiet: bool, + /// Open a window (winit + wgpu) for interactive dynamic analysis: + /// presents the guest frontbuffer on each VdSwap, polls a host + /// gamepad (gilrs) into XamInputGetState, and renders a live HUD + /// with swap/input stats. Runs the CPU interpreter on a worker + /// thread; the window hosts the event loop on the main thread. + #[arg(long)] + ui: bool, + /// Halt immediately when every live HW thread is blocked on a handle + /// wait and no timer is pending, dumping a per-thread diagnostic + /// (state / PC / LR / waiter handles) instead of force-waking the + /// waiters with STATUS_TIMEOUT. Default is to force-wake so probe + /// runs survive early deadlocks; flip this when investigating the + /// deadlock itself. Also settable via `XENIA_HALT_ON_DEADLOCK=1`. + #[arg(long)] + halt_on_deadlock: bool, + /// Enable per-handle signal/wait/wake audit trail. Diagnostic only: + /// at end-of-run, `dump_thread_diagnostic` prints, for every kernel + /// handle, the bounded ring of recent create/wait/signal/wake events + /// (who fired what, when, with what status). Used to identify + /// missing-signal deadlocks (a handle with waiters but zero signals + /// is the smoking gun for a missing kernel API). Also settable via + /// `XENIA_TRACE_HANDLES=1`. + #[arg(long)] + trace_handles: bool, + /// Force the threaded GPU backend (default at M1.9). Kept for + /// explicit scripted runs; equivalent to omitting both + /// `--gpu-inline` and `--ui`. Settable via `XENIA_GPU_THREAD=1`. + #[arg(long)] + gpu_thread: bool, + /// **M1.9 rollback flag.** Forces the legacy inline GPU backend + /// (synchronous `kernel.gpu.execute_one` on the CPU thread). + /// Use this if the threaded backend regresses; it preserves + /// pre-M1 behavior bit-for-bit. Settable via + /// `XENIA_GPU_INLINE=1`. `--ui` implies `--gpu-inline` until the + /// UI worker is migrated to the Arc-shared mem model. + #[arg(long)] + gpu_inline: bool, + /// **M2.8 — enable the inter-thread reservation table for + /// `lwarx`/`stwcx.`.** Under M3's per-HW-thread parallelism this + /// is the substrate that mediates concurrent `lwarx` claims; in + /// M2's single-threaded mode it's a no-op (the legacy + /// per-`PpcContext` reservation fields drive observable + /// behavior). Settable via `XENIA_RESERVATIONS_TABLE=1`. Default + /// off; M3 flips to default-on for the parallel host-thread + /// path. Verifies the golden digest is unchanged under lockstep + /// mode with the table activated. + #[arg(long)] + reservations_table: bool, + /// **M3 — spawn HW-thread host workers.** When set, wraps + /// `KernelState` in `Arc>` and runs the interpreter + /// loop on a dedicated worker thread (the coarse-locked first + /// step toward per-HW-thread parallelism). Lockstep behavior + /// preserved: the worker holds the lock for the whole run, so + /// observable execution is identical. Settable via + /// `XENIA_PARALLEL=1`. + #[arg(long)] + parallel: bool, }, /// Browse XISO disc image contents Browse { @@ -81,55 +189,170 @@ enum Commands { /// Output SQLite database (also includes the base extract tables) #[arg(long)] db: Option, + /// Output JSON Lines file: one structured row per instruction with + /// section/function/label/branch_target columns. Suitable for + /// `jq`, pandas, or DuckDB's `read_json_auto`. + #[arg(long)] + json: Option, + /// Choose how analysis tables are produced when `--db` is set. + /// + /// - `rust` (default): only the Rust passes (`func.rs`, `xref.rs`) + /// populate `functions`/`labels`/`xrefs`. No SQL views. + /// - `sql`: Rust passes still run (function detection and data-ref + /// resolution are Rust-only by design); additive SQL views + /// (`v_branch_xrefs`, `v_call_graph`, `v_reachability_from_entry`, + /// `v_function_first_instruction`, `v_imports_called`) are + /// created on top of the same tables. + /// - `both`: same as `sql`, plus a Rust-vs-SQL cross-check on + /// branch xrefs. Disagreement is logged as a warning (non-fatal). + #[arg(long, value_enum, default_value_t = AnalyzeMode::Rust)] + analyze: AnalyzeMode, /// Suppress assembly text output (DB-only mode) #[arg(long)] quiet: bool, }, + /// P8 — deterministic run digest: execute `n` instructions and print a + /// JSON summary of stats (instructions, draws, swaps, resolves, + /// unique shaders/RTs, interrupts). Drop-in regression detector. + Check { + /// Path to XEX or ISO file + path: String, + /// Instructions to execute before computing the digest + #[arg(short = 'n', default_value = "2000000")] + max_instructions: u64, + /// Write the digest as JSON to this path (stdout if omitted) + #[arg(long)] + out: Option, + /// Optional golden digest JSON; `check` exits non-zero on mismatch + #[arg(long)] + expect: Option, + /// Force the threaded GPU backend (default at M1.9). + #[arg(long)] + gpu_thread: bool, + /// **M1.9 rollback flag.** Forces the legacy inline GPU backend. + /// Use this if the threaded backend regresses. + #[arg(long)] + gpu_inline: bool, + /// M2.8 — enable the inter-thread reservation table. + #[arg(long)] + reservations_table: bool, + /// M3 — spawn HW-thread host workers under coarse Arc>. + #[arg(long)] + parallel: bool, + }, } fn main() -> Result<()> { let cli = Cli::parse(); // Bump default log level to `warn` for quiet exec runs so kernel-call - // tracing::info! spam is filtered out. RUST_LOG still wins if set. + // tracing::info! spam is filtered out. RUST_LOG / --log-filter still win. let exec_quiet = matches!(&cli.command, Commands::Exec { quiet: true, .. }); - let default_level = if exec_quiet { "warn" } else { "info" }; - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env().add_directive(default_level.parse()?)) - .init(); + // Default filter: keep xenia-rs logs, silence noisy third-party crates. + // `wgpu*` / `naga` emit many harmless WARNs (unrecognized present modes, + // GLES-backend config probes, etc.) — we push them to ERROR by default. + // Override via `--log-filter` or `RUST_LOG`, e.g. + // `--log-filter='info,wgpu_core=trace'` during bring-up. + let default_level: &'static str = if exec_quiet { + "warn,\ + wgpu=off,wgpu_core=off,wgpu_hal=off,naga=off,\ + winit=off,gilrs=off,gilrs_core=off,\ + mio=off,polling=off,sctk=off,smithay_client_toolkit=off,\ + calloop=off" + } else { + "info,\ + wgpu=error,wgpu_core=error,wgpu_hal=error,naga=error,\ + winit=warn,gilrs=warn,gilrs_core=warn,\ + mio=warn,polling=warn,sctk=warn,smithay_client_toolkit=warn,\ + calloop=warn" + }; - match cli.command { - Commands::Disasm { path, count } => cmd_disasm(&path, count), + let config = observability::ObservabilityConfig { + log_json: cli.log_json, + log_file: cli.log_file.clone(), + log_filter: cli.log_filter.clone(), + default_level, + trace_chrome: cli.trace_chrome.clone(), + profile: cli.profile.clone(), + }; + // Hold the guards for the whole program; their Drop flushes appenders, + // finalises the Chrome trace, writes the pprof report, and prints the + // metrics summary. + let _obs = observability::init(&config)?; + + let result = match cli.command { + Commands::Disasm { path, count, at } => cmd_disasm(&path, count, at), Commands::Exec { path, max_instructions, + ips_limit, db, trace_instructions, trace_imports, trace_branches, quiet, + ui, + halt_on_deadlock, + trace_handles, + gpu_thread, + gpu_inline, + reservations_table, + parallel, } => cmd_exec( &path, max_instructions, + ips_limit, db.as_deref(), trace_instructions, trace_imports, trace_branches, quiet, + ui, + halt_on_deadlock, + trace_handles, + gpu_thread, + gpu_inline, + reservations_table, + parallel, ), Commands::Browse { path } => cmd_browse(&path), Commands::Info { path } => cmd_info(&path), Commands::Extract { path, output, db } => cmd_extract(&path, output.as_deref(), db.as_deref()), - Commands::Dis { path, output, db, quiet } => cmd_dis(&path, output.as_deref(), db.as_deref(), quiet), + Commands::Dis { path, output, db, json, analyze, quiet } => cmd_dis(&path, output.as_deref(), db.as_deref(), json.as_deref(), analyze, quiet), + Commands::Check { + path, + max_instructions, + out, + expect, + gpu_thread, + gpu_inline, + reservations_table, + parallel, + } => cmd_check( + &path, + max_instructions, + out.as_deref(), + expect.as_deref(), + gpu_thread, + gpu_inline, + reservations_table, + parallel, + ), + }; + + match result { + Ok(()) => Ok(()), + Err(e) => Err(observability::attach_span_trace(e)), } } /// Load XEX data from a path. If the path is an ISO, extract default.xex from it. +#[instrument(skip_all, fields(path = %path))] fn load_xex_data(path: &str) -> Result> { let lower = path.to_lowercase(); if lower.ends_with(".iso") || lower.ends_with(".xiso") { use xenia_vfs::VfsDevice; - println!("Detected disc image, extracting default.xex..."); + info!("detected disc image, extracting default.xex"); let disc = xenia_vfs::disc_image::DiscImageDevice::open("disc", std::path::Path::new(path)) .map_err(|e| anyhow::anyhow!("Failed to open disc image: {}", e))?; disc.read_file("default.xex") @@ -139,7 +362,9 @@ fn load_xex_data(path: &str) -> Result> { } } +#[instrument(skip_all, fields(path = %path))] fn cmd_info(path: &str) -> Result<()> { + let started = Instant::now(); let data = load_xex_data(path)?; let header = xenia_xex::loader::parse_xex2_header(&data)?; @@ -203,10 +428,27 @@ fn cmd_info(path: &str) -> Result<()> { } } + info!(wall_ms = started.elapsed().as_millis() as u64, "info complete"); Ok(()) } -fn cmd_disasm(path: &str, count: usize) -> Result<()> { +/// Clap parser for `--at` — accepts decimal, 0x-prefixed hex, or bare hex. +fn parse_hex_u32(s: &str) -> Result { + let t = s.trim(); + let (digits, radix) = if let Some(rest) = t.strip_prefix("0x").or_else(|| t.strip_prefix("0X")) { + (rest, 16) + } else if t.chars().all(|c| c.is_ascii_digit()) { + (t, 10) + } else { + (t, 16) + }; + u32::from_str_radix(digits, radix) + .map_err(|e| format!("invalid u32 {:?}: {e} (try `0x824be9a0`)", t)) +} + +#[instrument(skip_all, fields(path = %path, count))] +fn cmd_disasm(path: &str, count: usize, at: Option) -> Result<()> { + let started = Instant::now(); let data = load_xex_data(path)?; let header = xenia_xex::loader::parse_xex2_header(&data)?; @@ -215,35 +457,133 @@ fn cmd_disasm(path: &str, count: usize) -> Result<()> { let base = xenia_xex::loader::get_image_base(&header) .ok_or_else(|| anyhow::anyhow!("No image base found in XEX2 header"))?; - println!("Entry point: {:#010x}, Image base: {:#010x}", entry, base); + info!(entry = format_args!("{:#010x}", entry), base = format_args!("{:#010x}", base), "XEX entry/base"); - // Load and decompress the image let image_data = xenia_xex::loader::load_image(&data, &header)?; - println!("Image loaded: {} bytes decompressed", image_data.len()); - println!("Disassembly from entry point ({} instructions):\n", count); + info!(bytes = image_data.len(), "image decompressed"); - let entry_offset = (entry - base) as usize; - if entry_offset + count * 4 <= image_data.len() { - let block = xenia_cpu::disasm::disassemble_block(&image_data[entry_offset..], entry, count); - for (addr, text) in block { - println!(" {:#010x}: {}", addr, text); - } - } else { - println!(" (entry point offset {:#x} is outside image bounds, image is {:#x} bytes)", entry_offset, image_data.len()); + let start = at.unwrap_or(entry); + let label = if at.is_some() { "requested address" } else { "entry point" }; + println!("Disassembly from {} {:#010x} ({} instructions):\n", label, start, count); + + if start < base { + return Err(anyhow::anyhow!( + "address {:#x} is below image base {:#x}", + start, + base + )); + } + let offset = (start - base) as usize; + if offset + count * 4 > image_data.len() { + return Err(anyhow::anyhow!( + "address {:#x} (offset {:#x}) + {} instructions extends past image end ({:#x} bytes)", + start, + offset, + count, + image_data.len() + )); + } + let block = xenia_cpu::disasm::disassemble_block(&image_data[offset..], start, count); + for (addr, text) in block { + println!(" {:#010x}: {}", addr, text); } + info!(wall_ms = started.elapsed().as_millis() as u64, "disasm complete"); Ok(()) } +#[instrument(skip_all, fields(path = %path, ui))] fn cmd_exec( path: &str, max_instructions: Option, + ips_limit: Option, db_path: Option<&str>, trace_instructions: bool, trace_imports: bool, trace_branches: bool, quiet: bool, + ui: bool, + halt_on_deadlock: bool, + trace_handles: bool, + gpu_thread: bool, + gpu_inline: bool, + reservations_table: bool, + parallel: bool, ) -> Result<()> { + cmd_exec_inner( + path, + max_instructions, + ips_limit, + db_path, + trace_instructions, + trace_imports, + trace_branches, + quiet, + ui, + halt_on_deadlock, + trace_handles, + gpu_thread, + gpu_inline, + reservations_table, + parallel, + None, + None, + ) +} + +/// P8 — drive a deterministic run and emit a JSON digest of key counters. +/// Shares `cmd_exec`'s setup path so the digest reflects a real execution. +fn cmd_check( + path: &str, + max_instructions: u64, + out: Option<&str>, + expect: Option<&str>, + gpu_thread: bool, + gpu_inline: bool, + reservations_table: bool, + parallel: bool, +) -> Result<()> { + cmd_exec_inner( + path, + Some(max_instructions), + None, + None, + false, + false, + false, + true, // quiet — suppress banners + false, // ui — headless + false, // halt_on_deadlock — keep the force-wake default for probe/check runs + false, // trace_handles — diagnostic only, never wanted on golden runs + gpu_thread, + gpu_inline, + reservations_table, + parallel, + out, + expect, + ) +} + +fn cmd_exec_inner( + path: &str, + max_instructions: Option, + ips_limit: Option, + db_path: Option<&str>, + trace_instructions: bool, + trace_imports: bool, + trace_branches: bool, + quiet: bool, + ui: bool, + halt_on_deadlock: bool, + trace_handles: bool, + gpu_thread: bool, + gpu_inline: bool, + reservations_table: bool, + parallel: bool, + digest_out: Option<&str>, + digest_expect: Option<&str>, +) -> Result<()> { + let started = Instant::now(); let data = load_xex_data(path)?; let mut header = xenia_xex::loader::parse_xex2_header(&data)?; @@ -252,24 +592,25 @@ fn cmd_exec( let base = xenia_xex::loader::get_image_base(&header) .ok_or_else(|| anyhow::anyhow!("No image base found"))?; - if !quiet { - if let Some(ref ffi) = header.file_format_info { - println!("Compression: {} (encryption: {})", - match ffi.compression_type { - 0 => "none", 1 => "basic", 2 => "normal (LZX)", _ => "unknown" - }, - match ffi.encryption_type { - 0 => "none", 1 => "normal (AES)", _ => "unknown" - }); - } - if !header.import_libraries.is_empty() { - println!("Import libraries:"); - for lib in &header.import_libraries { - println!(" {} ({} imports)", lib.name, lib.imports.len()); - } - } - println!("Loading XEX: entry={:#010x} base={:#010x}", entry, base); + if let Some(ref ffi) = header.file_format_info { + info!( + compression = match ffi.compression_type { + 0 => "none", 1 => "basic", 2 => "normal (LZX)", _ => "unknown" + }, + encryption = match ffi.encryption_type { + 0 => "none", 1 => "normal (AES)", _ => "unknown" + }, + "XEX file format" + ); } + for lib in &header.import_libraries { + debug!(name = %lib.name, imports = lib.imports.len(), "import library"); + } + info!( + entry = format_args!("{:#010x}", entry), + base = format_args!("{:#010x}", base), + "loading XEX" + ); // Allocate guest memory let mut mem = xenia_memory::GuestMemory::new() @@ -290,6 +631,8 @@ fn cmd_exec( // ── Phase 1: Build import thunk map ────────────────────────────────── let mut thunk_map: HashMap = HashMap::new(); + // Reverse map for `XexGetProcedureAddress` (drained into kernel below). + let mut thunk_addr_map: Vec<(ModuleId, u16, u32)> = Vec::new(); for lib in &header.import_libraries { let module = match lib.name.as_str() { "xboxkrnl.exe" => ModuleId::Xboxkrnl, @@ -302,12 +645,11 @@ fn cmd_exec( .map(|s| s.to_string()) .unwrap_or_else(|| format!("ordinal_{:#06X}", imp.ordinal)); thunk_map.insert(imp.address, (module, imp.ordinal, name)); + thunk_addr_map.push((module, imp.ordinal, imp.address)); } } } - if !quiet { - println!("Import thunks mapped: {}", thunk_map.len()); - } + info!(thunks = thunk_map.len(), "import thunks mapped"); // ── Phase 2: CPU initialization per xenia-canary ───────────────────── // Allocate stack (1MB at 0x70000000) @@ -327,20 +669,192 @@ fn cmd_exec( mem.write_u32(pcr_addr + 0x100, 0x1000); // PCR->current_thread (fake) mem.write_u32(pcr_addr + 0x150, 0); // PCR->dpc_active - // Set up CPU context per xenia-canary/cpu/thread_state.cc + // Set up CPU context per xenia-canary/cpu/thread_state.cc. + // + // `PpcContext::new` already initializes LR = 0xBCBC_BCBC (halt sentinel), + // VSCR NJ bit, and VRSAVE = 0xFFFFFFFF to match canary. let mut ctx = xenia_cpu::PpcContext::new(); ctx.pc = entry; - ctx.gpr[1] = (stack_base + stack_size - 0x80) as u64; // Stack pointer - ctx.gpr[2] = 0x2000_0000; // RTOC - ctx.gpr[13] = pcr_addr as u64; // PCR/TLS pointer - ctx.msr = 0x9030; // Hardware-dumped MSR + // Stack pointer: 16-byte aligned, leaving 256 bytes of reservation above + // the top so the entry's prolog has room for its linkage area without + // wandering off the allocated region. + let sp_top = (stack_base + stack_size) as u64; + ctx.gpr[1] = (sp_top - 0x100) & !0xFu64; + ctx.gpr[2] = 0x2000_0000; // RTOC (TOC base) + // r3..r7 carry the XEX "start_context" in canary; for a bare entry (no + // launch data from the kernel), canary passes 0. Explicitly zero them so + // we don't inherit whatever PpcContext::new happens to leave. + for r in 3..=7 { ctx.gpr[r] = 0; } + ctx.gpr[13] = pcr_addr as u64; // PCR/TLS pointer (r13) + ctx.msr = 0x9030; // Hardware-dumped MSR + + // ── Phase 4: Set up kernel ─────────────────────────────────────────── + // + // M1.9 — default GPU backend is now **threaded**. Rollback rules: + // 1. `--gpu-inline` / `XENIA_GPU_INLINE=1` → force inline. + // 2. `--ui` → force inline (the UI + // worker thread doesn't yet share `Arc` with the + // GPU worker; cohabitation panics in `cmd_exec_inner` below). + // 3. `--gpu-thread` / `XENIA_GPU_THREAD=1` → force threaded + // (explicit override). + // 4. Otherwise: threaded. + let env_inline = std::env::var("XENIA_GPU_INLINE") + .ok() + .is_some_and(|v| { + let v = v.trim().to_ascii_lowercase(); + v == "1" || v == "true" || v == "yes" + }); + let env_thread = std::env::var("XENIA_GPU_THREAD") + .ok() + .is_some_and(|v| { + let v = v.trim().to_ascii_lowercase(); + v == "1" || v == "true" || v == "yes" + }); + let force_inline = gpu_inline || env_inline || ui; + let force_thread = gpu_thread || env_thread; + let use_threaded = if force_inline { + false + } else if force_thread { + true + } else { + true // M1.9 default + }; + let shutdown_arc = + std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let (mut maybe_gpu_worker, gpu_backend): ( + Option, + xenia_gpu::GpuBackend, + ) = if use_threaded { + let (worker, handle) = xenia_gpu::GpuSystem::new() + .into_handle_with_shutdown(shutdown_arc.clone()); + (Some(worker), xenia_gpu::GpuBackend::Threaded(handle)) + } else { + ( + None, + xenia_gpu::GpuBackend::Inline(xenia_gpu::GpuSystem::new()), + ) + }; + let mut kernel = xenia_kernel::KernelState::with_gpu(gpu_backend); + kernel.image_base = base; + // Drain the reverse thunk map into the kernel so `XexGetProcedureAddress` + // can resolve ordinals back to callable thunk addresses. + for (module, ordinal, addr) in thunk_addr_map.drain(..) { + kernel.register_thunk(module, ordinal, addr); + } + // M2.8 — flip the reservation-table flag if requested. Either + // `--reservations-table` or `XENIA_RESERVATIONS_TABLE=1`. The flag + // is observable via `kernel.reservations_enabled.load(Acquire)`; + // M3 will hook it into the interpreter's `lwarx`/`stwcx.` arms. + let reservations_via_env = std::env::var("XENIA_RESERVATIONS_TABLE") + .ok() + .is_some_and(|v| { + let v = v.trim().to_ascii_lowercase(); + v == "1" || v == "true" || v == "yes" + }); + let parallel_via_env = std::env::var("XENIA_PARALLEL") + .ok() + .is_some_and(|v| { + let v = v.trim().to_ascii_lowercase(); + v == "1" || v == "true" || v == "yes" + }); + let parallel_active = parallel || parallel_via_env; + if reservations_table || reservations_via_env || parallel_active { + kernel.reservations.enable(); + if !quiet { + tracing::info!("reservation table enabled (lwarx/stwcx route through it)"); + } + } + // Diagnostic: flip audit on when --trace-handles or XENIA_TRACE_HANDLES. + // Behaviour-neutral when disabled (every record_* method is a no-op). + let audit_via_env = std::env::var("XENIA_TRACE_HANDLES") + .map(|v| matches!(v.as_str(), "1" | "true" | "TRUE" | "True")) + .unwrap_or(false); + if trace_handles || audit_via_env { + kernel.audit.enabled = true; + if !quiet { + tracing::info!("handle audit enabled (--trace-handles)"); + } + } + + // Install the GPU register aperture MMIO region on the guest memory so + // any `0x7FC8xxxx` access routes to our atomic mailbox. Matches canary's + // `graphics_system.cc:141-144`. The callbacks capture Arc clones of the + // atoms that live inside `kernel.gpu.mmio`. + mem.add_mmio_region(xenia_gpu::build_mmio_region(kernel.gpu.mmio())); + + // Install the initial guest thread on HW slot 0. The thread handle we + // hand the scheduler isn't visible to any guest API yet, but joiners + // (XThreadWait-style) will see it via `find_by_tid`. + let main_handle = kernel.alloc_handle_for(xenia_kernel::objects::KernelObject::Thread { + id: xenia_cpu::scheduler::INITIAL_GUEST_TID, + hw_id: Some(0), + exit_code: None, + waiters: Vec::new(), + }); + kernel.next_thread_id.store( + xenia_cpu::scheduler::INITIAL_GUEST_TID + 1, + std::sync::atomic::Ordering::Relaxed, + ); + // PCR[+0x2C] = processor number 0 for the main HW thread. + mem.write_u32(pcr_addr + 0x2C, 0); + ctx.thread_id = xenia_cpu::scheduler::INITIAL_GUEST_TID; + kernel.install_initial_thread( + ctx, + stack_base, + stack_size, + pcr_addr, + tls_addr, + main_handle, + &mut mem, + ); + + // If the input was a disc image, mount it so the kernel's file I/O + // handlers can serve the game's own assets via VFS. + if path.to_lowercase().ends_with(".iso") || path.to_lowercase().ends_with(".xiso") { + match xenia_vfs::disc_image::DiscImageDevice::open("d", std::path::Path::new(path)) { + Ok(disc) => kernel.vfs = Some(Box::new(disc)), + Err(e) => tracing::warn!("Could not mount disc image for VFS: {}", e), + } + } // ── Phase 3: Data export patching (variable imports) ───────────────── + // + // Helper: lazily allocate a zero-filled block on the heap. Shared for all + // diagnostic data exports the game reads but never writes — a single + // block keeps allocator noise low. + let alloc_zero = |size: u32, mem: &xenia_memory::GuestMemory, kernel: &mut xenia_kernel::KernelState| -> u32 { + kernel.heap_alloc(size, mem).unwrap_or(0) + }; + for lib in &header.import_libraries { for imp in &lib.imports { if imp.record_type != 0 { continue; } // Only variable entries let addr = imp.address; match (lib.name.as_str(), imp.ordinal) { + ("xboxkrnl.exe", 0x001B) => { + // ExThreadObjectType — ptr to OBJECT_TYPE descriptor (0x40 bytes) + let block = alloc_zero(0x40, &mut mem, &mut kernel); + mem.write_u32(addr, block); + } + ("xboxkrnl.exe", 0x0059) => { + // KeDebugMonitorData — ptr to diagnostic block + let block = alloc_zero(0x40, &mut mem, &mut kernel); + mem.write_u32(addr, block); + } + ("xboxkrnl.exe", 0x00AD) => { + // KeTimeStampBundle — 0x18 block with FILETIME at +0 and + // interrupt-time u64 at +0x10. Mirrors the clock used by + // KeQuerySystemTime so fast-path readers see consistent values. + let block = alloc_zero(0x18, &mut mem, &mut kernel); + if block != 0 { + let fake_time: u64 = 132_500_000_000_000_000; // ~2021 FILETIME + mem.write_u32(block, (fake_time >> 32) as u32); + mem.write_u32(block + 4, fake_time as u32); + mem.write_u32(block + 0x10, (fake_time >> 32) as u32); + mem.write_u32(block + 0x14, fake_time as u32); + } + mem.write_u32(addr, block); + } ("xboxkrnl.exe", 0x0158) => { // XboxKrnlVersion: {major=2, minor=0, build=20000, qfe=0} mem.write_u16(addr, 2); @@ -352,10 +866,29 @@ fn cmd_exec( // XexExecutableModuleHandle -> image base mem.write_u32(addr, base); } + ("xboxkrnl.exe", 0x01AE) => { + // ExLoadedCommandLine — ANSI empty string + let block = alloc_zero(0x10, &mut mem, &mut kernel); + // Block is already zero-initialized by heap_alloc -> empty string. + mem.write_u32(addr, block); + } + ("xboxkrnl.exe", 0x01BE) => { + // VdGlobalDevice — passed through to Vd* shims. Write 0. + mem.write_u32(addr, 0); + } ("xboxkrnl.exe", 0x01C0) => { // VdGpuClockInMHz mem.write_u32(addr, 500); } + ("xboxkrnl.exe", 0x01C1) => { + // VdHSIOCalibrationLock — lock slot + mem.write_u32(addr, 0); + } + ("xboxkrnl.exe", 0x0266) => { + // KeCertMonitorData — ptr to diagnostic block + let block = alloc_zero(0x100, &mut mem, &mut kernel); + mem.write_u32(addr, block); + } _ => { // All other variable exports: write 0 mem.write_u32(addr, 0); @@ -364,10 +897,6 @@ fn cmd_exec( } } - // ── Phase 4: Set up kernel ─────────────────────────────────────────── - let mut kernel = xenia_kernel::KernelState::new(); - kernel.image_base = base; - // ── Phase 5: Set up SQLite DB with full static analysis + opt-in traces ── let mut db_writer: Option = None; if let Some(db) = db_path { @@ -394,13 +923,17 @@ fn cmd_exec( .map(|s| (s.virtual_address, s.virtual_size, s.flags)) .collect(); let func_analysis = xenia_analysis::func::analyze(&image_data, base, entry, &code_sections); - eprintln!("Functions detected: {}", func_analysis.functions.len()); + info!(functions = func_analysis.functions.len(), "function detection complete"); let xref_result = xenia_analysis::xref::analyze_xrefs( &image_data, base, entry, §ions, &func_analysis, &import_map, ); let total_xrefs: usize = xref_result.xrefs.values().map(|v| v.len()).sum(); - eprintln!("Labels: {}, Cross-references: {}", xref_result.labels.len(), total_xrefs); + info!( + labels = xref_result.labels.len(), + xrefs = total_xrefs, + "xref analysis complete" + ); let disasm_info = xenia_analysis::formatter::DisasmInfo { image_base: base, @@ -412,7 +945,7 @@ fn cmd_exec( import_libraries: &header.import_libraries, }; - eprintln!("Writing database to {db}..."); + info!(db = %db, "writing database"); let mut w = xenia_analysis::DbWriter::open_fresh(std::path::Path::new(db))?; w.write_base(&disasm_info)?; w.write_disasm(&image_data, &disasm_info, &func_analysis, &xref_result.labels, &xref_result.xrefs)?; @@ -420,101 +953,862 @@ fn cmd_exec( db_writer = Some(w); } - // Set up debugger (in-memory trace disabled when any file-based trace is on) + // Set up debugger. + // + // Tier-3 perf: `trace_enabled = false` by default. The in-memory + // `trace_log: Vec` was previously populated on every + // instruction; once it filled to 100k entries each subsequent step + // performed a `Vec::remove(0)` — O(n) shifts per instruction. At + // 2M+ instructions this pathological behavior dominated run time + // (~300s of the ~600s we observed for 5M instructions, consistent + // with ~10 GB/s memory bandwidth × n²/2 shift cost). + // + // Nothing outside the debugger REPL (not shipped) reads the log, so + // the default is safe to flip. `--trace-instructions` routes full + // instruction traces to the DB writer instead, which is the correct + // path for analysis; we keep that behaviour unchanged. let mut debugger = xenia_debugger::Debugger::new(); debugger.paused = false; debugger.step_mode = xenia_debugger::StepMode::Run; - debugger.trace_enabled = !trace_instructions; + debugger.trace_enabled = false; + // DB-based trace still needs the in-memory trace off — we only want + // one authoritative stream. `trace_instructions` implies the DB + // writer pipeline is the recorder; leave `trace_enabled` false. + let _ = trace_instructions; - if !quiet { - match max_instructions { - Some(n) => println!("Starting execution (max {n} instructions)...\n"), - None => println!("Starting execution (unlimited)...\n"), + match max_instructions { + Some(n) => info!(limit = n, "starting execution"), + None => info!("starting execution (unlimited)"), + } + if let Some(ips) = ips_limit { + info!(ips_limit = ips, "throttling execution to instructions/second"); + } + + // ── Phase 6: Execution ─────────────────────────────────────────────── + // Either run on the main thread (headless), or spawn a worker and drive + // the winit event loop on the main thread (--ui). + + // M1.4 — wrap `mem` in an `Arc` after all init mutations + // are complete. The worker thread (if spawned below) holds its own + // Arc clone for the duration of the run; the CPU side passes + // `&*mem_arc` (= `&GuestMemory`) into `run_execution`. The + // trait-level invariant carrying this is correctness: writes are + // `&self` post the M1.4(b) trait flip, and `bump_page_version` is + // atomic, so concurrent CPU/GPU accesses to disjoint byte ranges are + // sound. (Same-byte concurrent access remains the caller's + // responsibility per the trait contract.) + let mem_arc = std::sync::Arc::new(mem); + + // Spawn the real GPU worker if the threaded backend was chosen at + // kernel-construction time. The handle the kernel already holds + // (`GpuBackend::Threaded`) is the CPU-side proxy; the worker owns + // the actual `GpuSystem` and runs its `run()` loop — see + // `xenia_gpu::handle::GpuWorker::run` for the concurrency model. + // M1.3's `spawn_noop_worker` is now superseded for the threaded + // path; the no-op helper is retained for unit tests. + let gpu_thread_resources = if let Some(worker) = maybe_gpu_worker.take() { + info!("gpu: threaded backend — spawning worker thread"); + let join = xenia_gpu::spawn_gpu_worker(worker, mem_arc.clone()); + Some((shutdown_arc.clone(), join)) + } else { + None + }; + + // M1.7: capture the worker-thread handle before the dispatch so the + // shutdown path can unpark it without holding `kernel` (the UI path + // moves `kernel` into `run_with_ui`). In headless+threaded mode the + // kernel survives the dispatch, but extracting here also covers the + // hypothetical UI+threaded mix once that ergonomic landmine is + // resolved. + let worker_thread_handle = if gpu_thread_resources.is_some() { + Some(kernel.gpu.mmio().worker_thread.clone()) + } else { + None + }; + + let result = if ui { + run_with_ui( + path, + // `run_with_ui` consumes `GuestMemory` by value today; M1.4 + // keeps that path on the inline backend until the UI worker + // is migrated to the Arc-shared model. Recover ownership via + // `Arc::try_unwrap` — succeeds because the GPU worker is not + // spawned in inline mode (`maybe_gpu_worker` is `None`). + std::sync::Arc::try_unwrap(mem_arc).unwrap_or_else(|_| { + panic!( + "M1.4: --ui + --gpu-thread cohabitation not yet wired; \ + choose one" + ) + }), + kernel, + debugger, + thunk_map, + db_writer, + max_instructions, + ips_limit, + quiet, + halt_on_deadlock, + started, + ) + } else { + // Wrap the headless body in a closure so any early returns + // (digest mismatch, db finalize error) flow through the GPU + // thread shutdown below rather than leaking the worker. + (|| -> Result<()> { + // M3 — opt-in `--parallel` flag spawns the interpreter on a + // dedicated worker thread under `Arc>`. + // The first M3 milestone delivers the spawn substrate (N=1 + // worker, kernel wrapped in Arc>); the actual + // per-HW-thread parallel spawn is the follow-up. With N=1 + // and the worker holding the lock for the run's duration, + // observable execution is bit-identical to lockstep mode — + // golden must still match. + let env_parallel = std::env::var("XENIA_PARALLEL") + .ok() + .is_some_and(|v| { + let v = v.trim().to_ascii_lowercase(); + v == "1" || v == "true" || v == "yes" + }); + let do_parallel = parallel || env_parallel; + // Step 04 gate: --parallel runs N=6 workers that release + // the kernel mutex around step_block, so the per-instruction + // observation path (debugger hooks, DB writer, force-per-instr) + // would either contend severely or deadlock. Reject the + // combination at the boundary with a clean error. + if do_parallel { + if debugger.wants_hooks() { + anyhow::bail!( + "--parallel is incompatible with debugger hooks; use lockstep mode" + ); + } + if db_writer.is_some() { + anyhow::bail!( + "--parallel is incompatible with --db / DB writer; use lockstep mode" + ); + } + if std::env::var("XENIA_FORCE_PER_INSTR") + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(false) + { + anyhow::bail!( + "--parallel is incompatible with XENIA_FORCE_PER_INSTR=1; use lockstep mode" + ); + } + } + let stats = if do_parallel { + if !quiet { + info!( + "M3 — spawning N=6 worker threads + main coordinator (7-party phaser)", + ); + } + let kernel_arc = std::sync::Arc::new(std::sync::Mutex::new(kernel)); + let mem_for_worker = mem_arc.clone(); + let kernel_for_worker = kernel_arc.clone(); + // Move debugger + db_writer + thunk_map into the worker; + // the worker returns them so the caller can resume + // post-run analysis (digest, summary, diagnostic dump). + let max_instructions_w = max_instructions; + let ips_limit_w = ips_limit; + let quiet_w = quiet; + let halt_w = halt_on_deadlock; + let mut debugger_w = debugger; + let mut db_writer_w = db_writer; + let thunk_map_w = thunk_map; + let join = std::thread::Builder::new() + .name("xenia-cpu-host".into()) + .spawn(move || -> ( + ExecStats, + xenia_debugger::Debugger, + Option, + std::collections::HashMap, + ) { + // M3 real-par Step 03: per-round drop-and-reacquire + // around step_block. Worker no longer holds the + // mutex for the run's duration; `run_execution_parallel` + // owns the locking dance internally. + let stats = run_execution_parallel( + &*mem_for_worker, + &kernel_for_worker, + &mut debugger_w, + &thunk_map_w, + db_writer_w.as_mut(), + max_instructions_w, + ips_limit_w, + quiet_w, + halt_w, + None, + ); + (stats, debugger_w, db_writer_w, thunk_map_w) + }) + .expect("spawn xenia-cpu-host"); + let (stats, debugger_back, db_writer_back, thunk_map_back) = + join.join().expect("xenia-cpu-host worker panicked"); + debugger = debugger_back; + db_writer = db_writer_back; + let _ = thunk_map_back; // keep alive via shadowing already handled + // Recover the kernel from the Arc>. The worker + // dropped its clone on return; only one strong ref + // remains, so `try_unwrap` succeeds. + kernel = std::sync::Arc::try_unwrap(kernel_arc) + .map_err(|_| anyhow::anyhow!("M3: kernel Arc still has clones"))? + .into_inner() + .map_err(|_| anyhow::anyhow!("M3: kernel mutex poisoned"))?; + stats + } else { + run_execution( + &*mem_arc, + &mut kernel, + &mut debugger, + &thunk_map, + db_writer.as_mut(), + max_instructions, + ips_limit, + quiet, + halt_on_deadlock, + None, + ) + }; + // After the conditional bound `stats`, `kernel`, + // `debugger`, `db_writer` above, post-run analysis continues + // unchanged: db finalize, print_summary, dump_thread_diagnostic, + // run digest emit + golden compare. + if let Some(ref mut db) = db_writer { + db.finalize_traces()?; + } + print_summary(kernel.scheduler.ctx(0), &debugger, &db_writer, quiet); + dump_thread_diagnostic(&kernel, quiet); + info!( + wall_ms = started.elapsed().as_millis() as u64, + instructions = stats.instruction_count, + import_calls = stats.import_count, + unimplemented = stats.unimpl_count, + "exec complete" + ); + // P8 — optional run-digest emission. Built from the live + // `kernel.gpu.stats` + `ExecStats` after the run completes; + // stable ordering + stable field names so byte-level JSON diffs + // catch any drift between runs. + if digest_out.is_some() || digest_expect.is_some() { + let digest = RunDigest::capture(path, &kernel, &stats); + let json = digest.to_json(); + if let Some(out_path) = digest_out { + std::fs::write(out_path, &json)?; + info!(out = out_path, "run digest written"); + } else { + println!("{json}"); + } + if let Some(expect_path) = digest_expect { + let expected = std::fs::read_to_string(expect_path) + .map_err(|e| anyhow::anyhow!("failed to read expected digest: {e}"))?; + if json.trim() != expected.trim() { + eprintln!("DIGEST MISMATCH vs {expect_path}"); + eprintln!("--- expected ---"); + eprintln!("{}", expected.trim()); + eprintln!("--- actual ---"); + eprintln!("{}", json.trim()); + return Err(anyhow::anyhow!( + "run digest differs from golden; see diff above" + )); + } + info!("run digest matches golden"); + } + } + Ok(()) + })() + }; + + // M1.7 — signal the GPU worker thread to exit and join it with a + // 1 s defensive timeout. We also `unpark()` it via the MMIO + // worker-thread handle so it observes shutdown immediately rather + // than waiting up to `WORKER_PARK_TIMEOUT` (16 ms) on its own. The + // helper logs at `error!` if the join blows the budget; we emit a + // `warn!` here as a second-level breadcrumb tied to this specific + // run. We do NOT propagate the timeout as an error: process + // teardown is more important than blocking on a stuck thread. + if let Some((shutdown, join)) = gpu_thread_resources { + // Wake the parker so the worker observes `shutdown` on its next + // loop iteration without waiting for `park_timeout`. + if let Some(handle_arc) = worker_thread_handle { + if let Ok(g) = handle_arc.lock() { + if let Some(t) = g.as_ref() { + t.unpark(); + } + } + } + if xenia_gpu::shutdown_and_join_with_timeout( + &shutdown, + join, + std::time::Duration::from_secs(1), + ) + .is_err() + { + warn!("GPU worker thread did not exit in 1s; continuing teardown"); } } - // ── Phase 6: Execution loop with thunk interception ────────────────── - use xenia_cpu::interpreter::{step, StepResult}; - use xenia_cpu::PpcOpcode; + result +} - let mut instruction_count: u64 = 0; - let mut unimpl_count: u64 = 0; - let mut import_count: u64 = 0; +/// Result of one execution pass — used by both the headless path and the +/// worker thread behind `--ui`. +#[derive(Default)] +struct ExecStats { + instruction_count: u64, + import_count: u64, + unimpl_count: u64, +} - loop { - if let Some(limit) = max_instructions { - if instruction_count >= limit { - println!("\nReached max instruction count ({limit})"); +/// P8 — deterministic run digest. Emitted as stable-ordered JSON by +/// `xenia-rs check`. Every field is a u64 counter; the file is trivially +/// diffable via `diff` / `cmp`. New counters are appended at the bottom +/// so golden files authored today still match tomorrow's schema minus +/// any new lines. Semver-wise this is a "pre-1.0" API — regressions bail +/// on first mismatch, no field-wise tolerance. +struct RunDigest { + path: String, + instructions: u64, + imports: u64, + unimpl: u64, + packets: u64, + draws: u64, + swaps: u64, + resolves: u64, + unique_render_targets: u64, + shader_blobs_live: u64, + interrupts_delivered: u64, + interrupts_dropped: u64, + texture_cache_entries: u64, + texture_decodes: u64, +} + +impl RunDigest { + fn capture(path: &str, kernel: &xenia_kernel::KernelState, stats: &ExecStats) -> Self { + // M1.4: route stats through the backend snapshot accessor. Inline + // mode reads directly from the live `GpuSystem`; threaded mode + // reads from the worker's published `Arc>` + // (refreshed each outer loop iteration on the worker thread). + let snap = kernel.gpu.digest_snapshot(); + Self { + path: path.to_string(), + instructions: stats.instruction_count, + imports: stats.import_count, + unimpl: stats.unimpl_count, + packets: snap.stats.packets_executed, + draws: snap.stats.draws_seen, + swaps: snap.stats.swaps_seen, + resolves: snap.stats.resolves_total, + unique_render_targets: snap.stats.unique_render_targets, + shader_blobs_live: snap.shader_blobs_live, + interrupts_delivered: kernel.interrupts.delivered, + interrupts_dropped: kernel.interrupts.dropped, + texture_cache_entries: snap.texture_cache_entries, + texture_decodes: snap.texture_decodes, + } + } + + fn to_json(&self) -> String { + // Hand-rolled deterministic ordering — alphabetical-ish + keys + // quoted. No external dep, no field ambiguity. + format!( + "{{\n \"path\": \"{}\",\n \ + \"instructions\": {},\n \ + \"imports\": {},\n \ + \"unimpl\": {},\n \ + \"packets\": {},\n \ + \"draws\": {},\n \ + \"swaps\": {},\n \ + \"resolves\": {},\n \ + \"unique_render_targets\": {},\n \ + \"shader_blobs_live\": {},\n \ + \"interrupts_delivered\": {},\n \ + \"interrupts_dropped\": {},\n \ + \"texture_cache_entries\": {},\n \ + \"texture_decodes\": {}\n}}\n", + escape_json_string(&self.path), + self.instructions, + self.imports, + self.unimpl, + self.packets, + self.draws, + self.swaps, + self.resolves, + self.unique_render_targets, + self.shader_blobs_live, + self.interrupts_delivered, + self.interrupts_dropped, + self.texture_cache_entries, + self.texture_decodes, + ) + } +} + +fn escape_json_string(s: &str) -> String { + s.replace('\\', r"\\").replace('"', r#"\""#) +} + +/// Round-control signal returned by the per-round coordinator helpers. +/// `BreakOuter` halts the scheduler loop; `Continue` resumes the outer +/// loop (either to step the round or to retry after an idle-advance). +#[derive(Debug)] +enum RoundCtl { + BreakOuter, + Continue, +} + +/// Per-round prologue: max-instruction budget check, IPS throttle, UI +/// shutdown poll, heartbeat log, vsync ticker (with d1mode bit set when +/// the vsync fires), pending-timer fire, graphics-interrupt injection. +/// Returns `BreakOuter` only when the run hits its budget or the UI +/// asks for shutdown. +fn coord_pre_round( + kernel: &mut xenia_kernel::KernelState, + stats: &ExecStats, + max_instructions: Option, + ips_limit: Option, + throttle_start: Instant, + shutdown: &Option>, +) -> RoundCtl { + const SHUTDOWN_CHECK_MASK: u64 = 0x1FFF; + const HEARTBEAT_MASK: u64 = 0xF_FFFF; // ~1M instructions + + if let Some(limit) = max_instructions + && stats.instruction_count >= limit + { + info!(limit, "reached max instruction count"); + return RoundCtl::BreakOuter; + } + + if let Some(ips) = ips_limit + && ips > 0 + && stats.instruction_count > 0 + { + let expected_ns = (stats.instruction_count as u128) + .saturating_mul(1_000_000_000u128) + / ips as u128; + let actual_ns = throttle_start.elapsed().as_nanos(); + if actual_ns < expected_ns { + let sleep_ns = (expected_ns - actual_ns).min(u64::MAX as u128) as u64; + std::thread::sleep(std::time::Duration::from_nanos(sleep_ns)); + } + } + + if (stats.instruction_count & SHUTDOWN_CHECK_MASK) == 0 { + if let Some(flag) = shutdown + && flag.load(std::sync::atomic::Ordering::Relaxed) + { + info!(instructions = stats.instruction_count, "UI requested shutdown"); + return RoundCtl::BreakOuter; + } + if let Some(ref ui) = kernel.ui { + ui.instructions_counter + .store(stats.instruction_count, std::sync::atomic::Ordering::Relaxed); + } + } + + if (stats.instruction_count & HEARTBEAT_MASK) == 0 + && stats.instruction_count > 0 + && tracing::enabled!(tracing::Level::DEBUG) + { + let t = kernel.scheduler.ctx(0); + debug!( + cycle = stats.instruction_count, + pc = format_args!("{:#010x}", t.pc), + lr = format_args!("{:#010x}", t.lr as u32), + sp = format_args!("{:#010x}", t.gpr[1] as u32), + "cpu heartbeat" + ); + } + + if kernel.interrupts.tick_vsync(stats.instruction_count) { + use std::sync::atomic::Ordering; + let mmio = kernel.gpu.mmio(); + let prev = mmio.d1mode_vblank_vline_status.load(Ordering::Relaxed); + mmio.d1mode_vblank_vline_status + .store(prev | 0x1, Ordering::Relaxed); + } + + kernel.fire_due_timers(); + try_inject_graphics_interrupt(kernel); + + RoundCtl::Continue +} + +/// Per-round idle-advance: invoked when `round_schedule()` returns no +/// runnable threads. Advances time to the earliest pending timer or +/// timed-wait deadline, fires due timers, and handles deadline wakes. +/// On hard deadlock either halts (with diagnostics) or force-wakes +/// blocked waiters with `STATUS_TIMEOUT`. `Continue` restarts the +/// outer loop; `BreakOuter` halts the scheduler. +fn coord_idle_advance( + kernel: &mut xenia_kernel::KernelState, + halt_on_deadlock: bool, + shutdown: &Option>, + stats: &ExecStats, +) -> RoundCtl { + let next_timer = kernel.earliest_timer_deadline(); + let next_wait = kernel.scheduler.earliest_wait_deadline(); + let target = match (next_timer, next_wait) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + }; + if let Some(target) = target { + kernel.scheduler.advance_all_timebases_to(target); + kernel.fire_due_timers(); + if let Some((r, reason)) = kernel.scheduler.advance_to_next_wake_if_due(target) { + kernel.handle_timeout_wake(r, reason); + } + return RoundCtl::Continue; + } + if kernel.scheduler.has_live_thread() { + if halt_on_deadlock { + tracing::warn!( + instructions = stats.instruction_count, + "scheduler deadlock: halting (--halt-on-deadlock); dumping per-thread diagnostic" + ); + for (hw_id, slot) in kernel.scheduler.slots.iter().enumerate() { + if slot.runqueue.is_empty() { + tracing::warn!(hw = hw_id, state = "empty", "deadlock slot"); + continue; + } + for (idx, t) in slot.runqueue.iter().enumerate() { + tracing::warn!( + hw = hw_id, + idx = idx, + tid = t.tid, + state = ?t.state, + pc = format_args!("{:#010x}", t.ctx.pc), + lr = format_args!("{:#010x}", t.ctx.lr as u32), + sp = format_args!("{:#010x}", t.ctx.gpr[1] as u32), + "deadlock thread" + ); + } + } + metrics::counter!("scheduler.deadlock_halts").increment(1); + if let Some(flag) = shutdown { + flag.store(true, std::sync::atomic::Ordering::Relaxed); + } + return RoundCtl::BreakOuter; + } + tracing::warn!( + snapshot = ?kernel.scheduler.diagnostic_snapshot(), + "scheduler deadlock: force-waking blocked waiters with STATUS_TIMEOUT" + ); + let woken = kernel.scheduler.unblock_on_deadlock(); + // STATUS_TIMEOUT = 0x00000102 per NT conventions. + const STATUS_TIMEOUT: u64 = 0x0000_0102; + for r in &woken { + kernel.scheduler.ctx_mut_ref(*r).gpr[3] = STATUS_TIMEOUT; + } + metrics::counter!("scheduler.deadlock_recoveries").increment(1); + if woken.is_empty() { + info!("scheduler: deadlock recovery woke zero threads — halting"); + return RoundCtl::BreakOuter; + } + return RoundCtl::Continue; + } + info!("scheduler: no live HW threads remain — halting"); + RoundCtl::BreakOuter +} + +/// Per-round epilogue: clear the slot visit, drive the inline GPU +/// proportional to instructions executed in this round, drain any +/// GPU-side pending interrupts, and check liveness. Returns +/// `BreakOuter` when no live HW threads remain. +fn coord_post_round( + kernel: &mut xenia_kernel::KernelState, + mem: &xenia_memory::GuestMemory, + stats: &ExecStats, + instrs_at_round_start: u64, +) -> RoundCtl { + kernel.scheduler.end_slot_visit(); + + let executed_this_round = stats + .instruction_count + .saturating_sub(instrs_at_round_start); + let mut gpu_runs = (executed_this_round + / xenia_cpu::scheduler::HW_THREAD_COUNT as u64) + .max(1); + if gpu_runs > 64 { + gpu_runs = 64; + } + if let Some(gpu) = kernel.gpu.as_inline_mut() { + gpu.sync_with_mmio(); + for _ in 0..gpu_runs { + if gpu.is_ready(mem) { + let _ = gpu.execute_one(mem); + gpu.sync_with_mmio(); + } else { break; } } + } else { + let _ = gpu_runs; + } - // ── Import thunk interception ── - if let Some((module, ordinal, name)) = thunk_map.get(&ctx.pc) { - let module = *module; - let ordinal_u32 = *ordinal as u32; + if kernel.gpu.has_pending_interrupts() { + for _pi in kernel.gpu.take_pending_interrupts() { + kernel + .interrupts + .queue_interrupt(xenia_kernel::INTERRUPT_SOURCE_CP); + } + } - let thunk_pc = ctx.pc; - let args = [ctx.gpr[3], ctx.gpr[4], ctx.gpr[5], ctx.gpr[6]]; + if !kernel.scheduler.has_live_thread() { + info!("scheduler: all threads exited — halting"); + return RoundCtl::BreakOuter; + } + RoundCtl::Continue +} - kernel.call_export(module, ordinal_u32, &mut ctx, &mut mem); +/// Per-worker state for one HW slot. Owns its block cache + decode +/// cache so that under per-HW-thread parallelism (Step 04) the worker +/// thread can step its slot without sharing mutable cache state with +/// peers. In lockstep mode this is just six independent caches on the +/// same host thread — observable behavior identical, slightly higher +/// per-slot fill churn. +struct WorkerCtx { + hw_id: u8, + block_cache: xenia_cpu::block_cache::BlockCache, + decode_cache: xenia_cpu::decoder::DecodeCache, + force_per_instr: bool, +} - if let Some(ref mut db) = db_writer { - db.log_import_call(xenia_analysis::ImportCallEntry { - address: thunk_pc, - cycle: ctx.cycle_count, - module: match module { - ModuleId::Xboxkrnl => "xboxkrnl.exe".to_string(), - ModuleId::Xam => "xam.xex".to_string(), - ModuleId::Xbdm => "xbdm.xex".to_string(), - }, - ordinal: *ordinal, - name: name.clone(), - arg_r3: args[0], - arg_r4: args[1], - arg_r5: args[2], - arg_r6: args[3], - return_value: ctx.gpr[3], - }); +impl WorkerCtx { + fn new(hw_id: u8, force_per_instr: bool) -> Self { + Self { + hw_id, + block_cache: xenia_cpu::block_cache::BlockCache::new(), + decode_cache: xenia_cpu::decoder::DecodeCache::new(), + force_per_instr, + } + } +} + +/// Outcome of `worker_prologue`. Either the slot was fully handled +/// inline (halt-sentinel, thunk dispatch, per-instruction step), or +/// the caller must run a block-cache `step_block` and then call +/// `worker_epilogue`, or the outer scheduler loop must break (fault, +/// debugger break, main-thread halt). +enum PrologueOutcome { + Continue, + BreakOuter, + StepBlock { + tid: Option, + thread_ref: xenia_cpu::ThreadRef, + block_ptr: *const xenia_cpu::block_cache::DecodedBlock, + pc_before: u32, + }, +} + +enum SlotOutcome { + Continue, + BreakOuter, +} + +/// Per-slot prologue. Performs `begin_slot_visit`, halt-sentinel +/// detection + restore, import-thunk dispatch (under the kernel lock), +/// unmapped-PC fault check, and either kicks off a block-cache step +/// (returning `StepBlock`) or runs the per-instruction observation +/// path inline (debugger / DB writer). When this returns +/// `PrologueOutcome::Continue` the caller skips to the next slot; +/// `BreakOuter` exits the scheduler loop; `StepBlock` hands off to a +/// `step_block(ctx, mem, &*block_ptr)` call followed by +/// `worker_epilogue`. +fn worker_prologue( + wc: &mut WorkerCtx, + kernel: &mut xenia_kernel::KernelState, + mem: &xenia_memory::GuestMemory, + debugger: &mut xenia_debugger::Debugger, + db_writer: &mut Option<&mut xenia_analysis::DbWriter>, + thunk_map: &HashMap, + stats: &mut ExecStats, +) -> PrologueOutcome { + use xenia_cpu::interpreter::{step_cached, StepResult}; + use xenia_cpu::scheduler::{HwState, INITIAL_GUEST_TID}; + use xenia_cpu::PpcOpcode; + const LR_HALT: u32 = xenia_cpu::context::LR_HALT_SENTINEL as u32; + + let hw_id = wc.hw_id; + kernel.scheduler.begin_slot_visit(hw_id); + + // Under per-HW-thread parallelism (run_execution_parallel) the + // coordinator's runnable-mask snapshot may be stale by the time + // this worker picks up the lock — a peer worker's call_export + // could have blocked the slot's last Ready thread. begin_slot_visit + // then sets running_idx = None and ctx(hw_id) returns the idle + // sentinel (pc = 0). Without this short-circuit the prologue would + // fall through to the unmapped-PC fault and break the outer loop. + if kernel.scheduler.current.is_none() { + return PrologueOutcome::Continue; + } + + let pc = kernel.scheduler.ctx(hw_id).pc; + + // 1) Halt-sentinel check (per HW thread). + if pc == LR_HALT { + let injected_here = kernel.interrupts.saved.is_some() + && kernel + .interrupts + .injected_ref + .map(|r| r.hw_id == hw_id) + == Some(true); + if injected_here + && let Some(saved) = kernel.interrupts.saved.take() + { + let target_ref = kernel + .interrupts + .injected_ref + .expect("injected_here guard"); + saved.restore(kernel.scheduler.ctx_mut_ref(target_ref)); + kernel.interrupts.delivered += 1; + let source = saved.source; + let mut restore_outcome = "ready"; + let current = kernel.scheduler.thread(target_ref).state.clone(); + if let HwState::ServicingIrq(reason) = current { + kernel.scheduler.thread_mut(target_ref).state = + HwState::Blocked(reason); + restore_outcome = "reblocked"; } - - // Simulate blr (return to caller) - ctx.pc = ctx.lr as u32; - ctx.cycle_count += 1; - instruction_count += 1; - import_count += 1; - continue; + tracing::debug!( + source, + hw_id, + outcome = restore_outcome, + "graphics interrupt: callback returned" + ); + return PrologueOutcome::Continue; } - - // Check if PC is in mapped memory - if !mem.is_mapped(ctx.pc) { - println!("[{:>8}] FAULT: PC {:#010x} is in unmapped memory", instruction_count, ctx.pc); - break; + let tid = kernel.scheduler.tid(hw_id); + tracing::info!( + hw_id, + ?tid, + is_main = tid == Some(INITIAL_GUEST_TID), + cycle = stats.instruction_count, + "HW thread returned to LR sentinel — marking exited" + ); + let (_, _exited_tid, handle_opt) = kernel.scheduler.exit_current(0); + if let Some(h) = handle_opt + && let Some(xenia_kernel::objects::KernelObject::Thread { + exit_code, + waiters, + .. + }) = kernel.objects.get_mut(&h) + { + *exit_code = Some(0); + let to_wake: Vec = std::mem::take(waiters); + for w in to_wake { + kernel.scheduler.wake_ref(w); + } } + return PrologueOutcome::Continue; + } - // Pre-step debugger - debugger.pre_step(&ctx, &mem); + // 2) Import thunk intercept. + if let Some((module, ordinal, name)) = thunk_map.get(&pc) { + let module = *module; + let ordinal_u32 = *ordinal as u32; + let thunk_pc = pc; + let args = { + let c = kernel.scheduler.ctx(hw_id); + [c.gpr[3], c.gpr[4], c.gpr[5], c.gpr[6]] + }; + kernel.call_export(module, ordinal_u32, mem); + let post_ref = kernel.scheduler.current; + let c = match post_ref { + Some(r) => kernel.scheduler.ctx_mut_ref(r), + None => kernel.scheduler.ctx_mut(hw_id), + }; + let return_value = c.gpr[3]; + c.pc = c.lr as u32; + c.cycle_count += 1; + c.timebase += 1; + if let Some(db) = db_writer.as_deref_mut() { + db.log_import_call(xenia_analysis::ImportCallEntry { + address: thunk_pc, + cycle: c.cycle_count, + module: match module { + ModuleId::Xboxkrnl => "xboxkrnl.exe".to_string(), + ModuleId::Xam => "xam.xex".to_string(), + ModuleId::Xbdm => "xbdm.xex".to_string(), + }, + ordinal: *ordinal, + name: name.clone(), + arg_r3: args[0], + arg_r4: args[1], + arg_r5: args[2], + arg_r6: args[3], + return_value, + }); + } + stats.instruction_count += 1; + stats.import_count += 1; + kernel.scheduler.decrement_quantum(); + return PrologueOutcome::Continue; + } - let pc_before = ctx.pc; - // Decode the instruction word before step() so we can classify branches - let raw_before = mem.read_u32(pc_before); - let opcode_before = xenia_cpu::decode(raw_before, pc_before).opcode; + // 3) Unmapped PC. + if !mem.is_mapped(pc) { + tracing::error!( + cycle = stats.instruction_count, + pc = format_args!("{:#010x}", pc), + hw_id, + "FAULT: PC in unmapped memory" + ); + return PrologueOutcome::BreakOuter; + } - let result = step(&mut ctx, &mut mem); - instruction_count += 1; + // 4) Decide block-cache fast path vs per-instruction observation. + let hooks = debugger.wants_hooks(); + let observe_per_instruction = hooks || db_writer.is_some() || wc.force_per_instr; - if let Some(ref mut db) = db_writer { + if !observe_per_instruction { + // Block-cache fast path: look up the block under the kernel + // lock and hand it to the caller via raw pointer. The pointer + // is sound for the duration of this slot's step_block call + // because `wc.block_cache` is owned by this worker (no peer + // mutation possible) and the cache slot is not touched again + // until the next prologue iteration. + let tid = kernel.scheduler.tid(hw_id); + let thread_ref = kernel + .scheduler + .current + .expect("begin_slot_visit set scheduler.current to Some when slot has runnable thread"); + let block_ptr: *const xenia_cpu::block_cache::DecodedBlock = { + let pc_for_lookup = kernel.scheduler.ctx(hw_id).pc; + let b: &xenia_cpu::block_cache::DecodedBlock = + wc.block_cache.lookup_or_build(pc_for_lookup, mem); + b + }; + return PrologueOutcome::StepBlock { + tid, + thread_ref, + block_ptr, + pc_before: pc, + }; + } + + // 5) Per-instruction observation path (debugger / DB). + let result = { + let ctx = kernel.scheduler.ctx_mut(hw_id); + if hooks { + debugger.pre_step(ctx, mem); + } + let page_ver = mem.page_version(ctx.pc); + let r = step_cached(ctx, mem, &mut wc.decode_cache, page_ver); + stats.instruction_count += 1; + if let Some(db) = db_writer.as_deref_mut() { + let raw_before = mem.read_u32(pc); + let opcode_before = xenia_cpu::decode(raw_before, pc).opcode; db.log_instruction(xenia_analysis::ExecTraceEntry { - address: pc_before, + address: pc, cycle: ctx.cycle_count, r3: ctx.gpr[3], r4: ctx.gpr[4], lr: ctx.lr, sp: ctx.gpr[1], }); - - // Branch detection — fallthrough (pc_before + 4) means untaken conditional - if opcode_before.is_branch() && ctx.pc != pc_before.wrapping_add(4) { + if opcode_before.is_branch() && ctx.pc != pc.wrapping_add(4) { let lk = (raw_before & 1) == 1; let kind: &'static str = if lk { "call" @@ -527,50 +1821,877 @@ fn cmd_exec( }; db.log_branch(xenia_analysis::BranchTraceEntry { cycle: ctx.cycle_count, - source: pc_before, + source: pc, target: ctx.pc, kind, lr: ctx.lr, }); } } + if hooks { + debugger.post_step(ctx, mem); + } + r + }; - // Post-step debugger - debugger.post_step(&ctx, &mem); - - match result { - StepResult::Continue => {} - StepResult::SystemCall => { - tracing::warn!("SYSCALL at {:#010x}", pc_before); + match result { + StepResult::Continue => {} + StepResult::SystemCall => { + tracing::warn!("SYSCALL at {:#010x} (hw={})", pc, hw_id); + } + StepResult::Unimplemented(op) => { + stats.unimpl_count += 1; + metrics::counter!("cpu.unimplemented").increment(1); + if stats.unimpl_count <= 50 { + warn!( + cycle = stats.instruction_count, + ?op, + pc = format_args!("{:#010x}", pc), + hw_id, + "UNIMPL" + ); + } else if stats.unimpl_count == 51 { + info!("suppressing further UNIMPL messages"); } - StepResult::Unimplemented(op) => { - unimpl_count += 1; - if unimpl_count <= 50 { - println!("[{:>8}] UNIMPL: {:?} at {:#010x}", instruction_count, op, pc_before); - } else if unimpl_count == 51 { - println!(" (suppressing further UNIMPL messages)"); + } + StepResult::Trap => { + tracing::warn!(pc = format_args!("{:#010x}", pc), hw_id, "TRAP"); + } + StepResult::Halted => { + info!(cycle = stats.instruction_count, hw_id, "HALTED"); + let tid = kernel.scheduler.tid(hw_id); + if tid == Some(INITIAL_GUEST_TID) { + return PrologueOutcome::BreakOuter; + } + kernel.scheduler.exit_current(0); + } + } + + kernel.scheduler.decrement_quantum(); + + if debugger.should_break() { + let pc_now = kernel.scheduler.ctx(hw_id).pc; + info!( + cycle = stats.instruction_count, + pc = format_args!("{:#010x}", pc_now), + hw_id, + "BREAK" + ); + return PrologueOutcome::BreakOuter; + } + + PrologueOutcome::Continue +} + +/// Per-slot epilogue for the block-cache fast path. Charges +/// quantum decrements, applies the StepResult, and returns whether +/// the outer scheduler loop should continue or break. SAFETY: caller +/// must guarantee `block_ptr` still points at a live `DecodedBlock` +/// (it does, because the worker owns `wc.block_cache` and didn't +/// re-enter `lookup_or_build` between prologue and epilogue). +fn worker_epilogue( + wc: &mut WorkerCtx, + kernel: &mut xenia_kernel::KernelState, + debugger: &mut xenia_debugger::Debugger, + stats: &mut ExecStats, + _tid: Option, + _thread_ref: xenia_cpu::ThreadRef, + block_ptr: *const xenia_cpu::block_cache::DecodedBlock, + pc_before: u32, + result: xenia_cpu::interpreter::StepResult, + executed: u64, +) -> SlotOutcome { + use xenia_cpu::interpreter::StepResult; + use xenia_cpu::scheduler::INITIAL_GUEST_TID; + + let hw_id = wc.hw_id; + let block = unsafe { &*block_ptr }; + + stats.instruction_count = stats.instruction_count.wrapping_add(executed); + + for _ in 0..executed { + kernel.scheduler.decrement_quantum(); + } + + match result { + StepResult::Continue => {} + StepResult::SystemCall => { + let last_pc = block.instrs.last().map(|i| i.addr).unwrap_or(pc_before); + tracing::warn!("SYSCALL at {:#010x} (hw={})", last_pc, hw_id); + } + StepResult::Unimplemented(op) => { + stats.unimpl_count += 1; + metrics::counter!("cpu.unimplemented").increment(1); + let unimpl_pc = kernel.scheduler.ctx(hw_id).pc.wrapping_sub(4); + if stats.unimpl_count <= 50 { + warn!( + cycle = stats.instruction_count, + ?op, + pc = format_args!("{:#010x}", unimpl_pc), + hw_id, + "UNIMPL" + ); + } else if stats.unimpl_count == 51 { + info!("suppressing further UNIMPL messages"); + } + } + StepResult::Trap => { + let last_pc = block.instrs.last().map(|i| i.addr).unwrap_or(pc_before); + tracing::warn!(pc = format_args!("{:#010x}", last_pc), hw_id, "TRAP"); + } + StepResult::Halted => { + info!(cycle = stats.instruction_count, hw_id, "HALTED"); + let tid = kernel.scheduler.tid(hw_id); + if tid == Some(INITIAL_GUEST_TID) { + return SlotOutcome::BreakOuter; + } + kernel.scheduler.exit_current(0); + } + } + + if debugger.should_break() { + let pc_now = kernel.scheduler.ctx(hw_id).pc; + info!( + cycle = stats.instruction_count, + pc = format_args!("{:#010x}", pc_now), + hw_id, + "BREAK" + ); + return SlotOutcome::BreakOuter; + } + + SlotOutcome::Continue +} + +#[instrument(skip_all, fields(max = ?max_instructions, ips = ?ips_limit))] +fn run_execution( + mem: &xenia_memory::GuestMemory, + kernel: &mut xenia_kernel::KernelState, + debugger: &mut xenia_debugger::Debugger, + thunk_map: &HashMap, + mut db_writer: Option<&mut xenia_analysis::DbWriter>, + max_instructions: Option, + ips_limit: Option, + quiet: bool, + halt_on_deadlock: bool, + shutdown: Option>, +) -> ExecStats { + use xenia_cpu::interpreter::step_block; + + let mut stats = ExecStats::default(); + let _ = quiet; // retained for future per-kind suppression + + // `--halt-on-deadlock` CLI flag OR `XENIA_HALT_ON_DEADLOCK=1|true` env var: + // when the scheduler next hits a hard deadlock (every live HW thread + // blocked on a handle wait with no pending timer) we bail out with a + // per-thread diagnostic instead of force-waking waiters with + // STATUS_TIMEOUT. Env-var fallback is cheap and lets `just` recipes / + // one-off shells flip the behaviour without rewiring the subcommand. + let halt_on_deadlock = halt_on_deadlock + || std::env::var("XENIA_HALT_ON_DEADLOCK") + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + + // Throttle anchor: wall-clock at which instruction 0 was retired. Used to + // pace execution when `ips_limit` is set — each round we compute the + // expected elapsed time for the current retired-count and sleep the + // difference if we're running ahead of schedule. + let throttle_start = Instant::now(); + + // Escape hatch for A/B testing: when set, every dispatch goes + // through `step_cached` even outside the debugger/DB trace modes. + // Useful for pinpointing whether a behaviour regression is the + // block-cache fast path's fault or something else. + let force_per_instr = std::env::var("XENIA_FORCE_PER_INSTR") + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + + // M3.2 / M3-realpar Step 02 — six per-HW-slot worker contexts, each + // owning its own block cache + decode cache. Under per-HW-thread + // parallelism (Step 04) each WorkerCtx will be moved into its own + // worker thread; in lockstep it's six independent caches on the + // same host thread. The block cache (1.5 MiB-ish per slot) lives + // here so the prologue/epilogue split can borrow it directly. + let mut workers: [WorkerCtx; xenia_cpu::scheduler::HW_THREAD_COUNT] = + std::array::from_fn(|i| WorkerCtx::new(i as u8, force_per_instr)); + + 'outer: loop { + // Per-round prologue: budget / shutdown / heartbeat / vsync / + // timers / graphics-interrupt injection. Carved into + // `coord_pre_round` so the parallel scheduler (Step 03+) can + // call the same coordination logic between phaser barriers + // without duplicating it from the lockstep path. + match coord_pre_round( + kernel, + &stats, + max_instructions, + ips_limit, + throttle_start, + &shutdown, + ) { + RoundCtl::BreakOuter => break, + RoundCtl::Continue => {} + } + + // Snapshot round schedule. `round_schedule` also advances rng state + // when seeded; mutation is intentional. + kernel.scheduler.begin_round(); + let order = kernel.scheduler.round_schedule(); + + if order.is_empty() { + // No Ready threads — advance time to the earliest pending + // deadline, fire timers, handle deadline wakes, and on hard + // deadlock either halt (with diagnostics) or force-wake. + // Logic carved into `coord_idle_advance`. + match coord_idle_advance(kernel, halt_on_deadlock, &shutdown, &stats) { + RoundCtl::BreakOuter => break, + RoundCtl::Continue => continue, + } + } + + // Tier-4 perf: track guest-instruction progress within this + // outer round so the post-round GPU pacer can run the GPU at a + // rate proportional to executed instructions. With the + // block-cache fast path executing up to MAX_BLOCK_INSTRS guest + // instructions per inner iteration vs the per-instruction + // path's 1, a fixed "once per round" pace would starve the + // GPU when block dispatch engages. + let instrs_at_round_start = stats.instruction_count; + + for hw_id in order { + let wc = &mut workers[hw_id as usize]; + match worker_prologue( + wc, + kernel, + mem, + debugger, + &mut db_writer, + thunk_map, + &mut stats, + ) { + PrologueOutcome::Continue => continue, + PrologueOutcome::BreakOuter => break 'outer, + PrologueOutcome::StepBlock { + tid, + thread_ref, + block_ptr, + pc_before, + } => { + // Block-cache step. The lockstep path keeps the + // kernel state borrowed straight through (single + // host thread, no contention). Step 03 of the + // M3 real-parallelism plan introduces a + // drop-and-reacquire window around `step_block` + // for the parallel branch. + let cycle_before = kernel.scheduler.ctx_mut_ref(thread_ref).cycle_count; + let block = unsafe { &*block_ptr }; + let result = { + let ctx = kernel.scheduler.ctx_mut_ref(thread_ref); + step_block(ctx, mem, block) + }; + let executed = kernel + .scheduler + .ctx_mut_ref(thread_ref) + .cycle_count + .saturating_sub(cycle_before); + match worker_epilogue( + wc, + kernel, + debugger, + &mut stats, + tid, + thread_ref, + block_ptr, + pc_before, + result, + executed, + ) { + SlotOutcome::Continue => continue, + SlotOutcome::BreakOuter => break 'outer, + } } } - StepResult::Trap => { - tracing::warn!("TRAP at {:#010x}", pc_before); - } - StepResult::Halted => { - println!("[{:>8}] HALTED", instruction_count); - break; - } } - if debugger.should_break() { - println!("[{:>8}] BREAK at {:#010x}", instruction_count, ctx.pc); - break; + // Per-round epilogue: end the slot visit, drive the inline + // GPU proportional to instructions executed, drain GPU-side + // pending interrupts, and break the outer loop if no live HW + // threads remain. Carved into `coord_post_round`. + match coord_post_round(kernel, mem, &stats, instrs_at_round_start) { + RoundCtl::BreakOuter => break, + RoundCtl::Continue => {} } } + stats +} - // Finalize trace DB (flush buffers + create indices) - if let Some(ref mut db) = db_writer { - db.finalize_traces()?; +/// M3 real-parallelism (Step 04) — N=6 worker threads + main-thread +/// coordinator + 7-party phaser. Workers each own a `WorkerCtx` +/// (per-slot block + decode caches), call `worker_prologue` / +/// `step_block` / `worker_epilogue` on their slot, releasing the +/// kernel mutex around the lock-free `step_block`. Coordinator runs +/// `coord_pre_round` / `coord_idle_advance` / `coord_post_round` +/// between two phaser barriers per round. +/// +/// Phaser (party_count = 7): +/// - B1 (round-start gate): coordinator publishes runnable mask + drops +/// kernel lock; workers wait. After B1 releases, workers see the +/// mask and either (a) acquire the lock and process their slot, +/// or (b) `phaser.skip(hw_id)` to skip B2. +/// - B2 (round-end gate): workers signal completion. Coordinator +/// reacquires the kernel lock and runs `coord_post_round`. +/// +/// Per-worker locking dance (when runnable): +/// 1. Acquire kernel lock (stats lock briefly nested). +/// 2. `worker_prologue` → `Continue` | `BreakOuter` | `StepBlock`. +/// 3. On `StepBlock`: `mem::replace` ctx out; clear `scheduler.current` +/// via `end_slot_visit`; drop kernel lock. +/// 4. `step_block` on the local ctx (no lock held — actual parallelism). +/// 5. Reacquire lock; `find_by_tid(tid)` to resolve post-migration +/// ThreadRef; write ctx back; set `scheduler.current = Some(target_ref)` +/// for the epilogue's `exit_current` path. +/// 6. `worker_epilogue` (under lock + stats lock briefly). +/// 7. Clear `scheduler.current = None`; drop kernel lock. +/// 8. `phaser.arrive_and_wait(hw_id)` for B2. +/// +/// Lock order: kernel lock first, stats mutex second. No inversion. +/// Workers never take stats without kernel; coordinator follows the +/// same order. +/// +/// The parallel path is incompatible with debugger hooks, the DB +/// writer, and `XENIA_FORCE_PER_INSTR=1` (these all force the +/// per-instruction path which serializes the per-instruction +/// `step_cached` under the kernel lock — no parallelism gain). We +/// panic at entry rather than degrade silently. +fn run_execution_parallel( + mem: &xenia_memory::GuestMemory, + kernel_arc: &std::sync::Arc>, + debugger: &mut xenia_debugger::Debugger, + thunk_map: &HashMap, + mut db_writer: Option<&mut xenia_analysis::DbWriter>, + max_instructions: Option, + ips_limit: Option, + quiet: bool, + halt_on_deadlock: bool, + shutdown_outer: Option>, +) -> ExecStats { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, Mutex}; + use xenia_cpu::interpreter::step_block; + use xenia_cpu::{Phaser, PhaserOutcome}; + + let _ = quiet; + let _ = debugger; // Verified incompatible just below; kept in signature for symmetry with run_execution. + + // Defense-in-depth: cmd_exec_inner should already gate these out + // with a friendly error, but if we got here with hooks attached, + // the parallel path can't honor them. + assert!( + !debugger.wants_hooks(), + "--parallel mode is incompatible with debugger hooks (per-instruction path required)" + ); + assert!( + db_writer.is_none(), + "--parallel mode is incompatible with --db / DB writer" + ); + assert!( + !std::env::var("XENIA_FORCE_PER_INSTR") + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(false), + "--parallel mode is incompatible with XENIA_FORCE_PER_INSTR=1" + ); + let _ = (debugger, db_writer.take()); // suppress 'unused mut' / 'unused' on the bound param + + let halt_on_deadlock = halt_on_deadlock + || std::env::var("XENIA_HALT_ON_DEADLOCK") + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + + let throttle_start = Instant::now(); + + const COORD_ID: u8 = xenia_cpu::scheduler::HW_THREAD_COUNT as u8; // = 6 + const PARTY_COUNT: u32 = xenia_cpu::scheduler::HW_THREAD_COUNT as u32 + 1; + + let phaser: Arc = Arc::new(Phaser::new(PARTY_COUNT)); + let internal_shutdown: Arc = Arc::new(AtomicBool::new(false)); + let runnable_mask: [Arc; xenia_cpu::scheduler::HW_THREAD_COUNT] = + std::array::from_fn(|_| Arc::new(AtomicBool::new(false))); + + // (Reserved for the race-free parking work deferred from Step 05; + // see the Step 05 memo for the analysis. For now the per-worker + // handle table is unused — workers always arrive at B1 and the + // coordinator never calls unpark.) + + // Stats lives behind a Mutex shared between coordinator and workers. + // Lock order: kernel mutex first, stats mutex second. Stats is + // accessed only briefly (the duration of a helper invocation). + let stats_mtx: Mutex = Mutex::new(ExecStats::default()); + + std::thread::scope(|scope| { + for hw_id in 0..xenia_cpu::scheduler::HW_THREAD_COUNT as u8 { + let phaser_w: Arc = phaser.clone(); + let kernel_w = kernel_arc.clone(); + let shutdown_w: Arc = internal_shutdown.clone(); + let runnable_w: Arc = runnable_mask[hw_id as usize].clone(); + let stats_ref: &Mutex = &stats_mtx; + let mem_ref: &xenia_memory::GuestMemory = mem; + let thunk_map_ref: &HashMap = thunk_map; + + scope + .spawn(move || { + // Workers run with no debugger hooks and no DB + // writer (asserted in run_execution_parallel + // entry). The local debugger is constructed in + // the same "cold-run" shape as the main-thread + // one (paused=false, step_mode=Run, + // trace_enabled=false) so wants_hooks() is false + // and worker_prologue takes the block-cache fast + // path. Without this `Debugger::new()` defaults + // to paused/trace-enabled and worker_prologue's + // per-instruction path returns BreakOuter on the + // first should_break check. + let mut wc = WorkerCtx::new(hw_id, /*force_per_instr=*/false); + let mut local_debugger = xenia_debugger::Debugger::new(); + local_debugger.paused = false; + local_debugger.step_mode = xenia_debugger::StepMode::Run; + local_debugger.trace_enabled = false; + let mut local_db_writer: Option<&mut xenia_analysis::DbWriter> = None; + + 'worker: loop { + // B1: round-start gate. Workers ALWAYS arrive + // at B1 — the phaser advance is the only race- + // free synchronization with the coordinator's + // mask publish. Reading runnable_w only after + // B1's Release-Acquire pair guarantees we see + // the current round's mask, not a stale one. + match phaser_w.arrive_and_wait(hw_id) { + PhaserOutcome::Advanced => {} + PhaserOutcome::Shutdown => break 'worker, + PhaserOutcome::Timeout => { + tracing::warn!( + hw_id, + "worker: phaser B1 timeout — peer hung; shutting down" + ); + shutdown_w.store(true, Ordering::Release); + phaser_w.shutdown(); + break 'worker; + } + } + if shutdown_w.load(Ordering::Acquire) { + break 'worker; + } + + // Read mask AFTER B1 advance (Acquire pair). + if !runnable_w.load(Ordering::Acquire) { + phaser_w.skip(hw_id); + continue; + } + + // We have work. Acquire the kernel lock. + let prologue_outcome = { + let mut guard = kernel_w.lock().expect("kernel mutex poisoned"); + let mut s = stats_ref.lock().expect("stats mutex poisoned"); + let r = worker_prologue( + &mut wc, + &mut *guard, + mem_ref, + &mut local_debugger, + &mut local_db_writer, + thunk_map_ref, + &mut *s, + ); + // Drop stats lock first (reverse acquisition order). + drop(s); + // Don't drop the kernel guard yet for the + // StepBlock arm — we need to mem::replace + // before unlock. + (r, guard) + }; + match prologue_outcome.0 { + PrologueOutcome::Continue => { + // Halt-sentinel restore, thunk + // dispatch, or per-instruction step + // ran inline. Drop the lock and arrive + // at B2. + drop(prologue_outcome.1); + } + PrologueOutcome::BreakOuter => { + drop(prologue_outcome.1); + shutdown_w.store(true, Ordering::Release); + phaser_w.shutdown(); + break 'worker; + } + PrologueOutcome::StepBlock { + tid, + thread_ref, + block_ptr, + pc_before, + } => { + let mut guard = prologue_outcome.1; + // Snapshot ctx into a local; replace + // the in-scheduler ctx with a fresh + // (zeroed) PpcContext so peers can't + // observe stale state. + let mut ctx_taken = std::mem::replace( + guard.scheduler.ctx_mut_ref(thread_ref), + xenia_cpu::PpcContext::new(), + ); + let cycle_before = ctx_taken.cycle_count; + // Clear scheduler.current so peers + // don't see this slot as "running" + // while the lock is unheld. + guard.scheduler.end_slot_visit(); + drop(guard); + + // ── unlocked window ─────────────── + let block = unsafe { &*block_ptr }; + let result = step_block(&mut ctx_taken, mem_ref, block); + let executed = ctx_taken + .cycle_count + .saturating_sub(cycle_before); + // ────────────────────────────────── + + let mut guard = kernel_w.lock().expect("kernel mutex poisoned"); + // find_by_tid handles cross-worker + // migration (a peer's call_export may + // have moved this thread between slots). + let target_ref = tid + .and_then(|t| guard.scheduler.find_by_tid(t)) + .unwrap_or(thread_ref); + *guard.scheduler.ctx_mut_ref(target_ref) = ctx_taken; + // worker_epilogue's exit_current path + // expects scheduler.current to be set + // to the running thread. + guard.scheduler.current = Some(target_ref); + + let epilogue_outcome = { + let mut s = + stats_ref.lock().expect("stats mutex poisoned"); + let r = worker_epilogue( + &mut wc, + &mut *guard, + &mut local_debugger, + &mut *s, + tid, + target_ref, + block_ptr, + pc_before, + result, + executed, + ); + drop(s); + r + }; + // Clear current after epilogue so the + // next worker that picks up the lock + // doesn't observe it. + guard.scheduler.current = None; + drop(guard); + + if matches!(epilogue_outcome, SlotOutcome::BreakOuter) { + shutdown_w.store(true, Ordering::Release); + phaser_w.shutdown(); + break 'worker; + } + } + } + + // B2: round-end gate. Coordinator wakes when + // all 6 workers + coordinator have arrived + // (or skipped B2). + match phaser_w.arrive_and_wait(hw_id) { + PhaserOutcome::Advanced => {} + PhaserOutcome::Shutdown => break 'worker, + PhaserOutcome::Timeout => { + tracing::warn!( + hw_id, + "worker: phaser B2 timeout — peer hung; shutting down" + ); + shutdown_w.store(true, Ordering::Release); + phaser_w.shutdown(); + break 'worker; + } + } + } + }); + } + + // Coordinator runs on the main (calling) thread. + 'outer: loop { + // Pre-round under kernel + stats locks. + let pre_outcome = { + let mut guard = kernel_arc.lock().expect("kernel mutex poisoned"); + let r = { + let s = stats_mtx.lock().expect("stats mutex poisoned"); + coord_pre_round( + &mut *guard, + &*s, + max_instructions, + ips_limit, + throttle_start, + &shutdown_outer, + ) + }; + (r, guard) + }; + match pre_outcome.0 { + RoundCtl::BreakOuter => { + drop(pre_outcome.1); + internal_shutdown.store(true, Ordering::Release); + phaser.shutdown(); + break 'outer; + } + RoundCtl::Continue => {} + } + let mut guard = pre_outcome.1; + + guard.scheduler.begin_round(); + let order = guard.scheduler.round_schedule(); + + // Publish runnable mask for workers. All 6 workers arrive + // at B1 unconditionally; the mask is published before + // coord arrives so the phaser's Release on advance pairs + // with workers' Acquire-after-B1 read of the mask. + for id in 0..xenia_cpu::scheduler::HW_THREAD_COUNT as u8 { + let runnable = order.contains(&id); + runnable_mask[id as usize].store(runnable, Ordering::Release); + } + + let instrs_at_round_start = + stats_mtx.lock().expect("stats mutex poisoned").instruction_count; + + if order.is_empty() { + let r = { + let s = stats_mtx.lock().expect("stats mutex poisoned"); + coord_idle_advance(&mut *guard, halt_on_deadlock, &shutdown_outer, &*s) + }; + drop(guard); + match r { + RoundCtl::BreakOuter => { + internal_shutdown.store(true, Ordering::Release); + phaser.shutdown(); + break 'outer; + } + RoundCtl::Continue => { + // Workers see runnable_mask all-false at B1 and + // skip B2. Coordinator participates in both + // barriers so the phase advances normally. + match phaser.arrive_and_wait(COORD_ID) { + PhaserOutcome::Advanced => {} + PhaserOutcome::Shutdown => break 'outer, + PhaserOutcome::Timeout => { + tracing::warn!( + "coordinator: phaser B1 (idle) timeout; shutting down" + ); + internal_shutdown.store(true, Ordering::Release); + phaser.shutdown(); + break 'outer; + } + } + match phaser.arrive_and_wait(COORD_ID) { + PhaserOutcome::Advanced => {} + PhaserOutcome::Shutdown => break 'outer, + PhaserOutcome::Timeout => { + tracing::warn!( + "coordinator: phaser B2 (idle) timeout; shutting down" + ); + internal_shutdown.store(true, Ordering::Release); + phaser.shutdown(); + break 'outer; + } + } + continue; + } + } + } + + drop(guard); // Release lock so workers can acquire for prologue/epilogue. + + // B1: signal workers to start. All 6 workers arrive at + // B1 unconditionally; coord is the 7th party. + match phaser.arrive_and_wait(COORD_ID) { + PhaserOutcome::Advanced => {} + PhaserOutcome::Shutdown => break 'outer, + PhaserOutcome::Timeout => { + tracing::warn!("coordinator: phaser B1 timeout; shutting down"); + internal_shutdown.store(true, Ordering::Release); + phaser.shutdown(); + break 'outer; + } + } + + // Workers process slots concurrently. Idle-slot workers + // skip B2 (the coordinator does NOT skip on their behalf — + // see the race analysis in the Step 05 memo). + + // B2: wait for all workers to finish. + match phaser.arrive_and_wait(COORD_ID) { + PhaserOutcome::Advanced => {} + PhaserOutcome::Shutdown => break 'outer, + PhaserOutcome::Timeout => { + tracing::warn!("coordinator: phaser B2 timeout; shutting down"); + internal_shutdown.store(true, Ordering::Release); + phaser.shutdown(); + break 'outer; + } + } + + // All workers quiescent. Reacquire lock for post-round. + let post_outcome = { + let mut guard = kernel_arc.lock().expect("kernel mutex poisoned"); + let r = { + let s = stats_mtx.lock().expect("stats mutex poisoned"); + coord_post_round(&mut *guard, mem, &*s, instrs_at_round_start) + }; + drop(guard); + r + }; + match post_outcome { + RoundCtl::BreakOuter => { + internal_shutdown.store(true, Ordering::Release); + phaser.shutdown(); + break 'outer; + } + RoundCtl::Continue => {} + } + } + }); // <- thread::scope joins all workers here. + + stats_mtx.into_inner().expect("stats mutex poisoned") +} + +/// First-Pixels M2 — inject a queued graphics interrupt into HW thread 0 +/// when it's safe to do so (callback registered, no interrupt already +/// running). Called at the top of each scheduler round. +/// +/// Unlike the earlier P6 version which only delivered when HW 0 was +/// `Ready`, this one also delivers when HW 0 is `Blocked`: the injector +/// stashes the block reason into the new `HwState::ServicingIrq(reason)` +/// variant, flips the thread to that state so `round_schedule` runs it, +/// and — on callback return to `LR_HALT_SENTINEL` — the restore path +/// re-creates `Blocked(reason)`, unless a `wake()` during the callback +/// (e.g. `KeSetEvent` → `wake_eligible_waiters`) flipped it to `Ready`, +/// in which case the wait was resolved and we leave it. +/// +/// This is the fix that unblocks games (like Sylpheed) which gate their +/// main loop on a v-sync callback signaling an event the main thread +/// waits on. The earlier "only-when-Ready" policy dropped 397 of 399 +/// observed v-syncs on a 1 B-instruction Sylpheed probe; now they +/// actually get delivered. +fn try_inject_graphics_interrupt(kernel: &mut xenia_kernel::KernelState) { + use xenia_cpu::scheduler::HwState; + + if kernel.interrupts.is_in_callback() { + return; + } + let Some(cb) = kernel.interrupts.callback else { + // No callback registered; drain any pending entries (they + // wouldn't have made it into the queue per `queue_interrupt`'s + // own `callback.is_none()` guard, but be defensive). + kernel.interrupts.pending.clear(); + return; + }; + + let Some(source) = kernel.interrupts.peek_next() else { + return; + }; + + // Canary's `EmulateCPInterruptDPC` (kernel_state.cc:1373) dispatches on + // whatever the current thread happens to be — real hardware fires the + // interrupt on CPU 2 and the kernel impersonates a DPC on top of + // whichever thread is active. Hard-anchoring to HW 0 breaks the moment + // `main()` returns: Sylpheed's main thread exits right after init, the + // render worker spins on a `PKEVENT` inside the interrupt callback's + // user_data struct (`user_data + 0x5C`), and because HW 0 is now + // `Exited(_)` our injector drops every subsequent vsync — the PKEVENT + // is never signaled and the worker polls forever. + // + // Pick the first HW thread we can plausibly run the callback on: + // 1. Prefer `Ready` (no state-mangling needed) + // 2. Else take a `Blocked(reason)` thread and swap to + // `ServicingIrq(reason)` so the round scheduler runs it; the + // LR-sentinel restore path reinstates the block on callback return + // 3. Skip `Idle`, `Exited`, or already-`ServicingIrq` slots + // + // The callback itself just signals a game-side event and returns — it + // doesn't care which HW thread it ran on. + // Pass 1: find any Ready thread across all slots. + let mut victim: Option = None; + 'outer_ready: for (hw_id, slot) in kernel.scheduler.slots.iter().enumerate() { + for (idx, t) in slot.runqueue.iter().enumerate() { + if matches!(t.state, HwState::Ready) { + victim = Some(xenia_cpu::ThreadRef::new(hw_id as u8, idx as u16)); + break 'outer_ready; + } + } + } + // Pass 2: any Blocked thread (we'll flip it to ServicingIrq). + if victim.is_none() { + 'outer_blocked: for (hw_id, slot) in kernel.scheduler.slots.iter().enumerate() { + for (idx, t) in slot.runqueue.iter().enumerate() { + if matches!(t.state, HwState::Blocked(_)) { + victim = Some(xenia_cpu::ThreadRef::new(hw_id as u8, idx as u16)); + break 'outer_blocked; + } + } + } + } + let Some(target_ref) = victim else { + // All threads Idle/Exited/already servicing — nothing to inject on. + kernel.interrupts.take_next(); + kernel.interrupts.dropped += 1; + return; + }; + + let t = kernel.scheduler.thread_mut(target_ref); + let prev_state = t.state.clone(); + match prev_state { + HwState::Ready => {} + HwState::Blocked(reason) => { + t.state = HwState::ServicingIrq(reason); + } + _ => unreachable!("victim selection above filtered out other variants"), } + let _ = kernel.interrupts.take_next(); + let t = kernel.scheduler.thread_mut(target_ref); + let saved = xenia_kernel::SavedCallbackCtx::capture(&t.ctx, source); + kernel.interrupts.injected_ref = Some(target_ref); + t.ctx.pc = cb.callback_pc; + t.ctx.lr = xenia_cpu::context::LR_HALT_SENTINEL; + // Canary `Processor::Execute` decrements the guest SP by 176 before + // running the callback and restores on return (see Canary + // processor.cc:383). Without this pad the callback's + // `__savegprlr_N` prologue stomps the interrupted function's + // already-saved LR at [r1-8], so when the interrupted function + // later returns via `__restgprlr_N -> bclr` it jumps to + // `LR_HALT_SENTINEL` and the thread exits prematurely. Matching + // restore lives in `SavedCallbackCtx::restore` (which now also + // restores r1). + t.ctx.gpr[1] = t + .ctx + .gpr[1] + .wrapping_sub(xenia_kernel::interrupts::CALLBACK_STACK_PAD as u64); + t.ctx.gpr[3] = source as u64; + t.ctx.gpr[4] = cb.user_data as u64; + kernel.interrupts.saved = Some(saved); + metrics::counter!("gpu.interrupt.delivered", "source" => format!("{source}")) + .increment(1); + tracing::debug!( + source, + hw_id = target_ref.hw_id, + idx = target_ref.idx, + callback = format_args!("{:#010x}", cb.callback_pc), + "graphics interrupt: injecting" + ); +} + +fn print_summary( + ctx: &xenia_cpu::PpcContext, + debugger: &xenia_debugger::Debugger, + db_writer: &Option, + quiet: bool, +) { if !quiet { println!("\n=== Final State ==="); println!("PC: {:#010x}", ctx.pc); @@ -584,15 +2705,273 @@ fn cmd_exec( } } } - println!("Executed {} instructions ({} import calls, {} unimplemented)", - instruction_count, import_count, unimpl_count); - if !quiet && db_writer.is_none() { - println!("Trace log: {} entries", debugger.trace_log.len()); + if db_writer.is_none() { + info!(entries = debugger.trace_log.len(), "in-memory trace log"); + } +} + +/// Dump a per-thread + per-handle diagnostic at end-of-run. Lets the next +/// session see *which* thread is stuck *where* without having to re-derive +/// it from ring-buffer / event-count math. Mirrors the halt-on-deadlock +/// dump in `run_execution` but runs on the normal `-n` exit path too. +fn dump_thread_diagnostic(kernel: &xenia_kernel::KernelState, quiet: bool) { + if quiet { + return; + } + use xenia_kernel::objects::KernelObject; + println!("\n=== Thread diagnostics ==="); + for (hw_id, slot) in kernel.scheduler.slots.iter().enumerate() { + if slot.runqueue.is_empty() { + println!(" hw={} (empty)", hw_id); + continue; + } + for (idx, t) in slot.runqueue.iter().enumerate() { + println!( + " hw={} idx={} tid={} state={:?} pc={:#010x} lr={:#010x} sp={:#010x}", + hw_id, + idx, + t.tid, + t.state, + t.ctx.pc, + t.ctx.lr as u32, + t.ctx.gpr[1] as u32, + ); + println!( + " r0={:#010x} r3={:#010x} r4={:#010x} r5={:#010x} r6={:#010x} r7={:#010x}", + t.ctx.gpr[0] as u32, + t.ctx.gpr[3] as u32, + t.ctx.gpr[4] as u32, + t.ctx.gpr[5] as u32, + t.ctx.gpr[6] as u32, + t.ctx.gpr[7] as u32, + ); + println!( + " r8={:#010x} r9={:#010x} r10={:#010x} r11={:#010x} r12={:#010x} r13={:#010x}", + t.ctx.gpr[8] as u32, + t.ctx.gpr[9] as u32, + t.ctx.gpr[10] as u32, + t.ctx.gpr[11] as u32, + t.ctx.gpr[12] as u32, + t.ctx.gpr[13] as u32, + ); + } } + let tid_of = |r: &xenia_cpu::ThreadRef| -> u32 { + kernel + .scheduler + .slots + .get(r.hw_id as usize) + .and_then(|s| s.runqueue.get(r.idx as usize)) + .map(|t| t.tid) + .unwrap_or(0) + }; + + let mut any_obj = false; + for (h, obj) in kernel.objects.iter() { + let (kind, waiters) = match obj { + KernelObject::Event { signaled, manual_reset, waiters } => { + (format!("Event(sig={}, mr={})", signaled, manual_reset), waiters) + } + KernelObject::Semaphore { count, max, waiters } => { + (format!("Semaphore({}/{})", count, max), waiters) + } + KernelObject::Thread { id, exit_code, waiters, .. } => { + (format!("Thread(id={}, exit={:?})", id, exit_code), waiters) + } + KernelObject::Timer { signaled, deadline, waiters, .. } => { + (format!("Timer(sig={}, deadline={:?})", signaled, deadline), waiters) + } + KernelObject::Mutex { owner, recursion, waiters } => { + (format!("Mutex(owner={:?}, rec={})", owner, recursion), waiters) + } + KernelObject::File { .. } => continue, + }; + if waiters.is_empty() { + continue; + } + if !any_obj { + println!("\n -- Handle waiter lists --"); + any_obj = true; + } + let tids: Vec = waiters.iter().map(&tid_of).collect(); + println!(" handle={:#010x} {} waiters(tid)={:?}", h, kind, tids); + } + + if !kernel.cs_waiters.is_empty() { + let mut any_cs = false; + for (cs_ptr, waiters) in kernel.cs_waiters.iter() { + if waiters.is_empty() { + continue; + } + if !any_cs { + println!("\n -- Critical section waiters --"); + any_cs = true; + } + let tids: Vec = waiters.iter().map(&tid_of).collect(); + println!(" cs={:#010x} waiters(tid)={:?}", cs_ptr, tids); + } + } + + // Audit trails (only when --trace-handles flipped the flag). For each + // tracked handle, emit a compact block: kind, creator, and the bounded + // ring of signal/wait/wake events. Highlight handles with waiters and + // zero signals — the smoking gun for a missing kernel API. + if kernel.audit.enabled { + println!("\n=== Handle audit trails ==="); + // Suspect set from project memory: handles stuck on Sylpheed boot. + const SUSPECT: [u32; 5] = [0x10FC, 0x1014, 0x1104, 0x10DC, 0x10F0]; + + // Sort by handle for stable output. + let mut handles: Vec = kernel.audit.trails.keys().copied().collect(); + handles.sort(); + + for h in handles { + let trail = &kernel.audit.trails[&h]; + let waiter_count = kernel + .objects + .get(&h) + .and_then(|o| match o { + KernelObject::Event { waiters, .. } + | KernelObject::Semaphore { waiters, .. } + | KernelObject::Thread { waiters, .. } + | KernelObject::Timer { waiters, .. } + | KernelObject::Mutex { waiters, .. } => Some(waiters.len()), + KernelObject::File { .. } => None, + }) + .unwrap_or(0); + let suspect = SUSPECT.contains(&h); + let interesting = waiter_count > 0 + || trail.signals.is_empty() && !trail.waits.is_empty() + || suspect; + if !interesting { + continue; + } + let marker = if suspect { " " } else { "" }; + let smoking = if !trail.waits.is_empty() && trail.signals.is_empty() { + " " + } else { + "" + }; + println!( + " handle={:#010x} kind={} waiters={} signals={} waits={} wakes={}{}{}", + h, + trail.kind, + waiter_count, + trail.signals.len(), + trail.waits.len(), + trail.wakes.len(), + marker, + smoking, + ); + println!( + " created cycle={} tid={} lr={:#010x} src={}", + trail.created.cycle, trail.created.tid, trail.created.lr, trail.created.source, + ); + // Print the first few of each ring — full dump can be re-run with + // a larger AUDIT_RING_CAPACITY if needed. + let head = 4usize; + for (label, ring) in [ + ("waits", &trail.waits), + ("signals", &trail.signals), + ("wakes", &trail.wakes), + ] { + if ring.is_empty() { + continue; + } + println!(" {} ({}):", label, ring.len()); + for (i, e) in ring.iter().take(head).enumerate() { + println!( + " [{:>2}] cycle={} tid={} lr={:#010x} src={} aux={:#x}", + i, e.cycle, e.tid, e.lr, e.source, e.aux, + ); + } + if ring.len() > head { + println!(" ... ({} more)", ring.len() - head); + } + } + } + } +} + +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all, fields(title))] +fn run_with_ui( + title: &str, + mut mem: xenia_memory::GuestMemory, + mut kernel: xenia_kernel::KernelState, + mut debugger: xenia_debugger::Debugger, + thunk_map: HashMap, + mut db_writer: Option, + max_instructions: Option, + ips_limit: Option, + quiet: bool, + halt_on_deadlock: bool, + started: Instant, +) -> Result<()> { + use winit::event_loop::EventLoop; + + let event_loop = EventLoop::::with_user_event() + .build() + .map_err(|e| anyhow::anyhow!("winit event loop build failed: {e}"))?; + let (ui_handles, kernel_bridge) = xenia_ui::build(event_loop.create_proxy()); + kernel.ui = Some(kernel_bridge); + + let shutdown = std::sync::Arc::clone(&ui_handles.shutdown); + let title_owned = std::path::Path::new(title) + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or(title) + .to_string(); + + let worker_span = tracing::info_span!("cpu_worker"); + let worker = std::thread::Builder::new() + .name("xenia-cpu".into()) + .spawn(move || -> Result<(ExecStats, xenia_kernel::KernelState, xenia_debugger::Debugger, Option)> { + let _guard = worker_span.enter(); + let stats = run_execution( + &mut mem, + &mut kernel, + &mut debugger, + &thunk_map, + db_writer.as_mut(), + max_instructions, + ips_limit, + quiet, + halt_on_deadlock, + Some(shutdown), + ); + if let Some(ref mut db) = db_writer { + db.finalize_traces()?; + } + Ok((stats, kernel, debugger, db_writer)) + }) + .map_err(|e| anyhow::anyhow!("spawn CPU worker: {e}"))?; + + // Run the UI — blocks until the user closes the window or the worker + // flips the shutdown flag itself (e.g. after max_instructions). + xenia_ui::run(event_loop, ui_handles, &title_owned)?; + + let (stats, kernel, debugger, db_writer) = match worker.join() { + Ok(res) => res?, + Err(_) => { + return Err(anyhow::anyhow!("CPU worker thread panicked")); + } + }; + + print_summary(kernel.scheduler.ctx(0), &debugger, &db_writer, quiet); + dump_thread_diagnostic(&kernel, quiet); + info!( + wall_ms = started.elapsed().as_millis() as u64, + instructions = stats.instruction_count, + import_calls = stats.import_count, + unimplemented = stats.unimpl_count, + "exec --ui complete" + ); Ok(()) } +#[instrument(skip_all, fields(path = %path))] fn cmd_browse(path: &str) -> Result<()> { use xenia_vfs::VfsDevice; @@ -607,13 +2986,14 @@ fn cmd_browse(path: &str) -> Result<()> { println!(" {} {:>10} {}", kind, entry.size, entry.name); } } - Err(e) => println!(" Error listing contents: {}", e), + Err(e) => tracing::error!(%e, "error listing contents"), } Ok(()) } /// Helper: load XEX, parse header, decompress PE, resolve imports, parse sections. +#[instrument(skip_all, fields(path = %path))] fn load_and_prepare(path: &str) -> Result<(xenia_xex::Xex2Header, Vec, Vec)> { let data = load_xex_data(path)?; let mut header = xenia_xex::loader::parse_xex2_header(&data)?; @@ -623,21 +3003,26 @@ fn load_and_prepare(path: &str) -> Result<(xenia_xex::Xex2Header, Vec, Vec, db_path: Option<&str>) -> Result<()> { use serde::Serialize; @@ -686,13 +3071,17 @@ fn cmd_extract(path: &str, output_dir: Option<&str>, db_path: Option<&str>) -> R // Write PE image let pe_path = out_dir.join(format!("{stem}.pe")); std::fs::write(&pe_path, &pe_image)?; - eprintln!("Wrote PE image: {} ({} bytes)", pe_path.display(), pe_image.len()); + info!( + path = %pe_path.display(), + bytes = pe_image.len(), + "wrote PE image" + ); // Write JSON metadata let json_path = out_dir.join(format!("{stem}.xex.json")); let json = serde_json::to_string_pretty(&info)?; std::fs::write(&json_path, &json)?; - eprintln!("Wrote metadata: {}", json_path.display()); + info!(path = %json_path.display(), "wrote metadata JSON"); // Print summary let total_imports: usize = header.import_libraries.iter().map(|l| l.imports.len()).sum(); @@ -713,18 +3102,27 @@ fn cmd_extract(path: &str, output_dir: Option<&str>, db_path: Option<&str>) -> R sections: §ions, import_libraries: &header.import_libraries, }; - eprintln!("Writing base tables to {db}..."); + info!(db = %db, "writing base tables"); let mut w = xenia_analysis::DbWriter::open_fresh(std::path::Path::new(db))?; w.write_base(&disasm_info)?; - eprintln!("Database written: {db}"); + info!(db = %db, "database written"); } Ok(()) } -fn cmd_dis(path: &str, output: Option<&str>, db_path: Option<&str>, quiet: bool) -> Result<()> { +#[instrument(skip_all, fields(path = %path))] +fn cmd_dis( + path: &str, + output: Option<&str>, + db_path: Option<&str>, + json_path: Option<&str>, + analyze: AnalyzeMode, + quiet: bool, +) -> Result<()> { use std::collections::HashMap; + let started = Instant::now(); let (header, pe_image, sections) = load_and_prepare(path)?; let entry = xenia_xex::loader::get_entry_point(&header).unwrap(); @@ -742,7 +3140,7 @@ fn cmd_dis(path: &str, output: Option<&str>, db_path: Option<&str>, quiet: bool) import_map.insert(imp.address, name); } } - eprintln!("Resolved {} import thunks", import_map.len()); + info!(thunks = import_map.len(), "resolved import thunks"); // Function analysis let code_sections: Vec<(u32, u32, u32)> = sections.iter() @@ -750,14 +3148,18 @@ fn cmd_dis(path: &str, output: Option<&str>, db_path: Option<&str>, quiet: bool) .map(|s| (s.virtual_address, s.virtual_size, s.flags)) .collect(); let func_analysis = xenia_analysis::func::analyze(&pe_image, base, entry, &code_sections); - eprintln!("Functions detected: {}", func_analysis.functions.len()); + info!(functions = func_analysis.functions.len(), "function detection complete"); // Cross-reference analysis let xref_result = xenia_analysis::xref::analyze_xrefs( &pe_image, base, entry, §ions, &func_analysis, &import_map, ); let total_xrefs: usize = xref_result.xrefs.values().map(|v| v.len()).sum(); - eprintln!("Labels: {}, Cross-references: {}", xref_result.labels.len(), total_xrefs); + info!( + labels = xref_result.labels.len(), + xrefs = total_xrefs, + "xref analysis complete" + ); // Build DisasmInfo let disasm_info = xenia_analysis::formatter::DisasmInfo { @@ -770,19 +3172,55 @@ fn cmd_dis(path: &str, output: Option<&str>, db_path: Option<&str>, quiet: bool) import_libraries: &header.import_libraries, }; - // SQLite database output (base + disasm layers) + // SQLite database output (base + ingest + analyze layers) if let Some(db) = db_path { - eprintln!("Writing database to {db}..."); + info!(db = %db, analyze = ?analyze, "writing database"); let mut w = xenia_analysis::DbWriter::open_fresh(std::path::Path::new(db))?; w.write_base(&disasm_info)?; - w.write_disasm( + w.ingest_instructions(&pe_image, &disasm_info, &func_analysis, &xref_result.labels)?; + w.write_analysis_results( &pe_image, &disasm_info, &func_analysis, &xref_result.labels, &xref_result.xrefs, )?; - eprintln!("Database written: {db}"); + if matches!(analyze, AnalyzeMode::Sql | AnalyzeMode::Both) { + w.create_sql_views()?; + info!(db = %db, "SQL views created"); + } + if matches!(analyze, AnalyzeMode::Both) { + let (sql_only, rust_only) = w.cross_check_branch_xrefs()?; + if sql_only == 0 && rust_only == 0 { + info!(db = %db, "Rust/SQL branch xrefs agree"); + } else { + tracing::warn!( + db = %db, + sql_only, + rust_only, + "Rust/SQL branch xref disagreement — investigate formatter mnemonic vs xref.rs kind classification" + ); + } + } + info!(db = %db, "database written"); + } + + // JSON Lines output: one row per instruction, structured columns. + if let Some(json) = json_path { + info!(json = %json, "writing JSON Lines"); + let mut out = std::io::BufWriter::new(std::fs::File::create(json)?); + let mut total: u64 = 0; + for section in §ions { + if !section.is_code() { continue; } + let abs_start = base + section.virtual_address; + let abs_end = abs_start + section.virtual_size; + let items = xenia_analysis::enrich_section( + &pe_image, base, §ion.name, abs_start, abs_end, + &func_analysis, &xref_result.labels, + ); + total += xenia_analysis::sinks::json::write_jsonl(&mut out, items)?; + } + info!(json = %json, rows = total, "JSON Lines written"); } // Assembly output (skipped when --quiet and no --output specified) @@ -804,9 +3242,47 @@ fn cmd_dis(path: &str, output: Option<&str>, db_path: Option<&str>, quiet: bool) )?; if let Some(path) = output { - eprintln!("Wrote disassembly: {path}"); + info!(path, "wrote disassembly"); } } + info!(wall_ms = started.elapsed().as_millis() as u64, "dis complete"); Ok(()) } + + +#[cfg(test)] +mod tests { + use super::parse_hex_u32; + + #[test] + fn parse_hex_u32_accepts_0x_prefix() { + assert_eq!(parse_hex_u32("0x824be9a0").unwrap(), 0x824be9a0); + assert_eq!(parse_hex_u32("0X82000000").unwrap(), 0x82000000); + } + + #[test] + fn parse_hex_u32_accepts_bare_hex() { + // No 0x prefix, contains hex letters — treated as hex. + assert_eq!(parse_hex_u32("824be9a0").unwrap(), 0x824be9a0); + } + + #[test] + fn parse_hex_u32_accepts_decimal() { + // All digits, no 0x — treated as decimal. + assert_eq!(parse_hex_u32("1000").unwrap(), 1000); + assert_eq!(parse_hex_u32("0").unwrap(), 0); + } + + #[test] + fn parse_hex_u32_rejects_garbage() { + assert!(parse_hex_u32("not a number").is_err()); + assert!(parse_hex_u32("0xZZZ").is_err()); + } + + #[test] + fn parse_hex_u32_tolerates_whitespace() { + assert_eq!(parse_hex_u32(" 0x82000000 ").unwrap(), 0x82000000); + } +} + diff --git a/crates/xenia-app/src/observability.rs b/crates/xenia-app/src/observability.rs new file mode 100644 index 0000000..265d248 --- /dev/null +++ b/crates/xenia-app/src/observability.rs @@ -0,0 +1,384 @@ +//! Logging, tracing, and profiling wiring for the `xenia-rs` CLI. +//! +//! Owns the `tracing-subscriber` registry, optional file / Chrome-trace sinks, +//! the `metrics` debugging recorder, and (behind the `profiling` feature) the +//! `pprof-rs` sampling profiler. All drop-time cleanup (flushing appenders, +//! finalising Chrome output, writing flamegraphs, printing the metrics +//! summary) is carried by [`ObservabilityGuards`] so `main` just has to hold +//! the value until return. + +use std::path::{Path, PathBuf}; + +use anyhow::{bail, Context, Result}; +use tracing::Level; +use tracing_error::{ErrorLayer, SpanTrace}; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{fmt, EnvFilter, Layer, Registry}; + +/// User-selectable observability settings parsed from CLI + environment. +#[derive(Debug, Clone, Default)] +pub struct ObservabilityConfig { + /// If `true`, render console logs as JSON instead of compact text. + pub log_json: bool, + /// Additional log sink file. `.json` → JSON formatter; anything else → text. + pub log_file: Option, + /// Overrides `RUST_LOG` when set. Passed through `EnvFilter::try_new`. + pub log_filter: Option, + /// Default filter directive used when neither `RUST_LOG` nor + /// [`log_filter`](Self::log_filter) are set. + pub default_level: &'static str, + /// If set, emit a Chrome `about:tracing` JSON trace to this path. + pub trace_chrome: Option, + /// If set, run the pprof sampling profiler and write output here on drop. + /// Extension `.svg` → flamegraph, `.pb` → protobuf. + pub profile: Option, +} + +impl ObservabilityConfig { + #[allow(dead_code)] + pub fn new(default_level: &'static str) -> Self { + Self { + default_level, + ..Self::default() + } + } +} + +/// RAII handle returned by [`init`]. Drop flushes the appender, finalises +/// Chrome output, writes the pprof report, and prints the metrics summary. +#[must_use = "drop of ObservabilityGuards is what flushes logs, profiles, and metrics"] +pub struct ObservabilityGuards { + _appender: Option, + _chrome: Option, + #[cfg(feature = "profiling")] + pprof: Option<(pprof::ProfilerGuard<'static>, PathBuf)>, + metrics_snapshotter: Option, +} + +impl Drop for ObservabilityGuards { + fn drop(&mut self) { + #[cfg(feature = "profiling")] + if let Some((guard, path)) = self.pprof.take() { + if let Err(e) = write_pprof_report(&guard, &path) { + eprintln!("profile write failed: {e:#}"); + } else { + tracing::info!(path = %path.display(), "pprof report written"); + } + } + + if let Some(snap) = self.metrics_snapshotter.take() { + print_metrics_summary(&snap); + } + } +} + +/// Build and install the global tracing subscriber + metrics recorder. +pub fn init(config: &ObservabilityConfig) -> Result { + let span_events = parse_span_events(); + + // Resolve the filter directive once; attach a freshly-built `EnvFilter` + // per sink layer via `.with_filter()`. Previously the filter was pushed + // into the layer-`Vec` but that only gates what *itself* sees in a + // boxed-Vec setup; sibling fmt / chrome / file layers kept emitting + // filtered-out events. Per-layer filtering is the idiomatic tracing- + // subscriber pattern and works cleanly with boxed layer dispatch. + let directive = resolve_filter_directive(config); + + let mut layers: Vec + Send + Sync + 'static>> = Vec::new(); + + // Console fmt layer — compact text or JSON, always stderr. + let console_layer: Box + Send + Sync + 'static> = if config.log_json { + fmt::layer() + .json() + .with_span_events(span_events.clone()) + .with_writer(std::io::stderr) + .with_filter(EnvFilter::try_new(&directive).context("invalid filter")?) + .boxed() + } else { + fmt::layer() + .compact() + .with_span_events(span_events.clone()) + .with_writer(std::io::stderr) + .with_filter(EnvFilter::try_new(&directive).context("invalid filter")?) + .boxed() + }; + layers.push(console_layer); + + // Optional file sink — also filtered. + let appender_guard = match &config.log_file { + Some(path) => { + let (layer, guard) = build_file_layer(path, span_events)?; + layers.push( + layer + .with_filter(EnvFilter::try_new(&directive).context("invalid filter")?) + .boxed(), + ); + Some(guard) + } + None => None, + }; + + // Optional Chrome `about:tracing` sink — intentionally UNFILTERED so + // traces capture the full picture even when the console is quiet. + let chrome_guard = match &config.trace_chrome { + Some(path) => { + let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new() + .file(path.clone()) + .include_args(true) + .build(); + layers.push(layer.boxed()); + Some(guard) + } + None => None, + }; + + // `tracing-error` layer enables SpanTrace capture in `with_span_trace`. + layers.push(ErrorLayer::default().boxed()); + + tracing_subscriber::registry() + .with(layers) + .try_init() + .context("tracing subscriber already initialized")?; + // `build_env_filter` is retained for compatibility with older callers; + // `resolve_filter_directive` above is what actually drives the layer + // filters. + let _ = build_env_filter(config); + + // Install the metrics debugging recorder. `install` sets the global + // recorder; its snapshotter is held in the guards struct. + let recorder = metrics_util::debugging::DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + if recorder.install().is_err() { + tracing::warn!("a metrics recorder was already installed; skipping xenia-rs recorder"); + } + + #[cfg(feature = "profiling")] + let pprof = match &config.profile { + Some(path) => { + let guard = pprof::ProfilerGuardBuilder::default() + .frequency(100) + .blocklist(&["libc", "libgcc", "pthread", "vdso"]) + .build() + .context("failed to start pprof sampling profiler")?; + Some((guard, path.clone())) + } + None => None, + }; + + #[cfg(not(feature = "profiling"))] + if config.profile.is_some() { + bail!("--profile requires building with --features profiling"); + } + + Ok(ObservabilityGuards { + _appender: appender_guard, + _chrome: chrome_guard, + #[cfg(feature = "profiling")] + pprof, + metrics_snapshotter: Some(snapshotter), + }) +} + +fn resolve_filter_directive(config: &ObservabilityConfig) -> String { + if let Some(ref f) = config.log_filter { + return f.clone(); + } + if let Ok(f) = std::env::var("RUST_LOG") + && !f.is_empty() { + return f; + } + config.default_level.to_string() +} + +fn build_env_filter(config: &ObservabilityConfig) -> Result { + // Precedence: explicit --log-filter > RUST_LOG > default_level. + if let Some(ref f) = config.log_filter { + return EnvFilter::try_new(f).context("invalid --log-filter directive"); + } + if let Ok(f) = EnvFilter::try_from_default_env() { + return Ok(f); + } + EnvFilter::try_new(config.default_level) + .with_context(|| format!("invalid default filter `{}`", config.default_level)) +} + +fn parse_span_events() -> FmtSpan { + match std::env::var("RUST_LOG_SPAN_EVENTS").as_deref() { + Ok("full") => FmtSpan::FULL, + Ok("close") => FmtSpan::CLOSE, + Ok("active") => FmtSpan::ACTIVE, + Ok("enter") => FmtSpan::ENTER, + Ok("exit") => FmtSpan::EXIT, + Ok("new") => FmtSpan::NEW, + _ => FmtSpan::NONE, + } +} + +type FileLayerBox = Box + Send + Sync + 'static>; + +fn build_file_layer( + path: &Path, + span_events: FmtSpan, +) -> Result<(FileLayerBox, tracing_appender::non_blocking::WorkerGuard)> { + let parent = path.parent().unwrap_or_else(|| Path::new(".")); + let file_name = path + .file_name() + .ok_or_else(|| anyhow::anyhow!("log file path has no file name: {}", path.display()))?; + std::fs::create_dir_all(parent) + .with_context(|| format!("failed to create {}", parent.display()))?; + + let appender = tracing_appender::rolling::never(parent, file_name); + let (non_blocking, guard) = tracing_appender::non_blocking(appender); + + let ext = path + .extension() + .and_then(|e| e.to_str()) + .unwrap_or_default(); + let layer: FileLayerBox = if ext.eq_ignore_ascii_case("json") { + fmt::layer() + .json() + .with_span_events(span_events) + .with_writer(non_blocking) + .with_ansi(false) + .boxed() + } else { + fmt::layer() + .with_span_events(span_events) + .with_writer(non_blocking) + .with_ansi(false) + .boxed() + }; + + Ok((layer, guard)) +} + +/// Wrap an error with a captured span-trace so the top-level `main` can +/// render "failed in `cmd_exec > load_image > …`" alongside the regular +/// anyhow context chain. +#[allow(dead_code)] +pub fn with_span_trace(err: E) -> anyhow::Error +where + E: std::error::Error + Send + Sync + 'static, +{ + anyhow::Error::new(err).context(SpanTraceDisplay(SpanTrace::capture())) +} + +/// Attach a captured span-trace to an existing `anyhow::Error` as extra +/// context. Used at command boundaries where errors already bubble as +/// `anyhow::Error`. +pub fn attach_span_trace(err: anyhow::Error) -> anyhow::Error { + err.context(SpanTraceDisplay(SpanTrace::capture())) +} + +struct SpanTraceDisplay(SpanTrace); + +impl std::fmt::Display for SpanTraceDisplay { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "span trace:\n{}", self.0) + } +} + +impl std::fmt::Debug for SpanTraceDisplay { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + ::fmt(self, f) + } +} + +#[cfg(feature = "profiling")] +fn write_pprof_report(guard: &pprof::ProfilerGuard<'static>, path: &Path) -> Result<()> { + let report = guard.report().build().context("pprof report build failed")?; + let ext = path + .extension() + .and_then(|e| e.to_str()) + .unwrap_or("") + .to_ascii_lowercase(); + + let parent = path.parent().unwrap_or_else(|| Path::new(".")); + std::fs::create_dir_all(parent).ok(); + + match ext.as_str() { + "svg" | "" => { + let file = std::fs::File::create(path) + .with_context(|| format!("create {}", path.display()))?; + report + .flamegraph(file) + .context("flamegraph render failed")?; + } + "pb" | "proto" | "pprof" => { + use pprof::protos::Message; + let profile = report.pprof().context("pprof protobuf build failed")?; + let buf = profile + .write_to_bytes() + .context("pprof protobuf encode failed")?; + std::fs::write(path, &buf).with_context(|| format!("write {}", path.display()))?; + } + other => bail!("unknown --profile extension `.{other}` (use .svg or .pb)"), + } + Ok(()) +} + +fn print_metrics_summary(snap: &metrics_util::debugging::Snapshotter) { + use metrics_util::debugging::DebugValue; + use metrics_util::MetricKind; + + let snapshot = snap.snapshot(); + let rows = snapshot.into_vec(); + if rows.is_empty() { + return; + } + + // Group counters, gauges, histograms into simple lines. Use tracing so + // the summary honours the installed subscriber (can land in file + JSON + // sinks and not just stderr). + let mut lines: Vec = Vec::with_capacity(rows.len()); + for (key, _unit, _desc, value) in rows { + let kind = match key.kind() { + MetricKind::Counter => "counter", + MetricKind::Gauge => "gauge", + MetricKind::Histogram => "histogram", + }; + let name = key.key().name(); + let labels: Vec = key + .key() + .labels() + .map(|l| format!("{}={}", l.key(), l.value())) + .collect(); + let labels_str = if labels.is_empty() { + String::new() + } else { + format!("{{{}}}", labels.join(",")) + }; + let value_str = match value { + DebugValue::Counter(n) => n.to_string(), + DebugValue::Gauge(g) => format!("{}", g.into_inner()), + DebugValue::Histogram(samples) => { + if samples.is_empty() { + "empty".to_string() + } else { + let floats: Vec = samples.iter().map(|s| s.into_inner()).collect(); + let count = floats.len(); + let sum: f64 = floats.iter().copied().sum(); + let min = floats.iter().copied().fold(f64::INFINITY, f64::min); + let max = floats.iter().copied().fold(f64::NEG_INFINITY, f64::max); + format!( + "count={} sum={:.3} min={:.3} max={:.3} mean={:.3}", + count, + sum, + min, + max, + sum / count as f64 + ) + } + } + }; + lines.push(format!(" {kind:<9} {name}{labels_str} = {value_str}")); + } + + if tracing::enabled!(Level::INFO) { + tracing::info!("metrics summary:\n{}", lines.join("\n")); + } else { + eprintln!("metrics summary:\n{}", lines.join("\n")); + } +} diff --git a/crates/xenia-app/tests/golden/sylpheed_n2m.json b/crates/xenia-app/tests/golden/sylpheed_n2m.json new file mode 100644 index 0000000..50a2276 --- /dev/null +++ b/crates/xenia-app/tests/golden/sylpheed_n2m.json @@ -0,0 +1,16 @@ +{ + "path": "/home/fabi/RE Project Sylpheed/Project Sylpheed - Arc of Deception (USA, Europe) (En,Ja).iso", + "instructions": 2000000, + "imports": 5634, + "unimpl": 0, + "packets": 0, + "draws": 0, + "swaps": 0, + "resolves": 0, + "unique_render_targets": 0, + "shader_blobs_live": 0, + "interrupts_delivered": 0, + "interrupts_dropped": 13, + "texture_cache_entries": 0, + "texture_decodes": 0 +} diff --git a/crates/xenia-app/tests/parallel_stress.rs b/crates/xenia-app/tests/parallel_stress.rs new file mode 100644 index 0000000..a365111 --- /dev/null +++ b/crates/xenia-app/tests/parallel_stress.rs @@ -0,0 +1,111 @@ +//! M3 real-parallelism stress harness. +//! +//! Runs `xenia-rs check sylpheed.iso --parallel --halt-on-deadlock` +//! many times back-to-back to surface lost-wakeups, lock-order +//! inversions, and ABA hazards that a single run wouldn't reliably +//! reproduce. Failures dump per-run stdout/stderr to +//! `target/parallel-stress-NNN.{stdout,stderr}` for post-mortem. +//! +//! Two configurations: +//! - `parallel_stress_short`: 20 runs at -n 5_000_000. Quick smoke +//! check — runs in a few minutes on the current substrate. +//! - `parallel_stress_long` (ignored, opt-in): 100 runs at +//! -n 50_000_000. The full gate from the master plan; expected +//! runtime is hours until the perf gap (Step 05's deferred parking +//! fix) closes. +//! +//! Run with `cargo test --release -p xenia-app --test parallel_stress +//! -- --ignored --nocapture` for the full 100x; otherwise the short +//! variant runs as part of the normal test suite when explicitly +//! invoked: `cargo test --release -p xenia-app --test parallel_stress +//! -- --nocapture parallel_stress_short`. + +use std::process::Command; +use std::time::Instant; + +const ISO_DEFAULT: &str = "/home/fabi/RE Project Sylpheed/Project Sylpheed - Arc of Deception (USA, Europe) (En,Ja).iso"; + +fn iso_path() -> String { + std::env::var("SYLPHEED_ISO").unwrap_or_else(|_| ISO_DEFAULT.to_string()) +} + +fn run_stress(label: &str, runs: u32, max_instr: u64) { + let bin = env!("CARGO_BIN_EXE_xenia-rs"); + let iso = iso_path(); + if !std::path::Path::new(&iso).exists() { + eprintln!("{label}: iso not found at {iso}; set SYLPHEED_ISO to override. SKIPPING."); + return; + } + std::fs::create_dir_all("target").ok(); + let mut failures: u32 = 0; + let mut wall_ms: Vec = Vec::with_capacity(runs as usize); + let max_instr_str = max_instr.to_string(); + for run in 1..=runs { + let t0 = Instant::now(); + let out = Command::new(bin) + .args([ + "exec", + &iso, + "-n", + &max_instr_str, + "--parallel", + "--halt-on-deadlock", + "--quiet", + ]) + .output() + .expect("failed to spawn xenia-rs"); + let dt = t0.elapsed().as_millis(); + wall_ms.push(dt); + let exit_ok = out.status.success(); + let vdswap2 = String::from_utf8_lossy(&out.stderr).contains("VdSwap") + || String::from_utf8_lossy(&out.stdout).contains("VdSwap"); + let _ = vdswap2; // VdSwap=2 not required at -n 5M; tracked for diagnostic only. + if !exit_ok { + failures += 1; + std::fs::write( + format!("target/parallel-stress-{label}-{run:03}.stdout"), + &out.stdout, + ) + .ok(); + std::fs::write( + format!("target/parallel-stress-{label}-{run:03}.stderr"), + &out.stderr, + ) + .ok(); + eprintln!( + "{label}: run {run}/{runs} FAILED (wall={}ms, exit={:?})", + dt, + out.status.code() + ); + } else { + eprintln!("{label}: run {run}/{runs} ok (wall={dt}ms)"); + } + } + wall_ms.sort(); + let p50 = wall_ms[wall_ms.len() / 2]; + let p95_idx = ((wall_ms.len() - 1) * 95) / 100; + let p95 = wall_ms[p95_idx]; + let max = *wall_ms.last().unwrap(); + eprintln!( + "{label} summary: runs={runs} ok={} failed={failures} p50={p50}ms p95={p95}ms max={max}ms", + runs - failures, + ); + assert_eq!(failures, 0, "{label}: {failures} of {runs} stress runs failed"); +} + +/// 20 runs at -n 5M. Session-feasible (~10 minutes at the current +/// perf level). Surfaces lost-wakeup / lock-order / phaser-timeout +/// bugs that a single run wouldn't reproduce. +#[test] +#[ignore = "stress test; run via `cargo test ... -- --ignored parallel_stress_short`"] +fn parallel_stress_short() { + run_stress("short", 20, 5_000_000); +} + +/// 100 runs at -n 50M. The full M3 follow-up gate per the master +/// plan. Expected runtime is hours until the perf gap closes. +#[test] +#[ignore = "full stress test; run via `cargo test ... -- --ignored parallel_stress_long`"] +fn parallel_stress_long() { + run_stress("long", 100, 50_000_000); +}