xenia-app: observability subsystem, --parallel runtime, stress harness

observability.rs installs the tracing subscriber stack (env-filter +
JSON file appender + chrome trace + error layer) and the metrics
recorder shared by the workspace. main.rs grows the new CLI surface:
--parallel, --reservations-table, --trace-handles, --analyze=
{rust,sql,both}, xenia dis --json, --ui, plus the wiring that runs
the CPU through the new scheduler, drives the GPU's threaded backend,
and surfaces the framebuffer + HUD via xenia-ui.

Add tests/parallel_stress.rs (#[ignore]-gated long form, short form
runs 20×@5M) and tests/golden/sylpheed_n2m.json — the digest the
lockstep/parallel combos compare against.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-01 16:30:26 +02:00
parent b1285ba560
commit bae9305982
5 changed files with 3169 additions and 170 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,384 @@
//! Logging, tracing, and profiling wiring for the `xenia-rs` CLI.
//!
//! Owns the `tracing-subscriber` registry, optional file / Chrome-trace sinks,
//! the `metrics` debugging recorder, and (behind the `profiling` feature) the
//! `pprof-rs` sampling profiler. All drop-time cleanup (flushing appenders,
//! finalising Chrome output, writing flamegraphs, printing the metrics
//! summary) is carried by [`ObservabilityGuards`] so `main` just has to hold
//! the value until return.
use std::path::{Path, PathBuf};
use anyhow::{bail, Context, Result};
use tracing::Level;
use tracing_error::{ErrorLayer, SpanTrace};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter, Layer, Registry};
/// User-selectable observability settings parsed from CLI + environment.
///
/// Consumed by [`init`]. The effective filter directive is chosen in this
/// order: [`log_filter`](Self::log_filter), then a non-empty `RUST_LOG`
/// environment variable, then [`default_level`](Self::default_level).
#[derive(Debug, Clone, Default)]
pub struct ObservabilityConfig {
/// If `true`, render console logs as JSON instead of compact text.
/// Console output is written to stderr in both modes.
pub log_json: bool,
/// Additional log sink file. `.json` → JSON formatter; anything else → text.
/// The extension comparison ignores ASCII case, and missing parent
/// directories are created when the sink is built.
pub log_file: Option<PathBuf>,
/// Overrides `RUST_LOG` when set. Passed through `EnvFilter::try_new`.
pub log_filter: Option<String>,
/// Default filter directive used when neither `RUST_LOG` nor
/// [`log_filter`](Self::log_filter) are set.
///
/// The derived `Default` leaves this as the empty string; use
/// [`ObservabilityConfig::new`] to supply a real directive.
pub default_level: &'static str,
/// If set, emit a Chrome `about:tracing` JSON trace to this path.
/// This sink is installed without a filter, so it records events the
/// console and file sinks suppress.
pub trace_chrome: Option<PathBuf>,
/// If set, run the pprof sampling profiler and write output here on drop.
/// Extension `.svg` (or no extension) → flamegraph; `.pb`, `.proto` or
/// `.pprof` → protobuf. Setting this without the `profiling` feature makes
/// [`init`] return an error.
pub profile: Option<PathBuf>,
}
impl ObservabilityConfig {
    /// Create a configuration whose only non-default setting is the fallback
    /// filter directive `default_level`; every sink and override starts
    /// disabled / unset.
    #[allow(dead_code)]
    pub fn new(default_level: &'static str) -> Self {
        // Spelled out field by field (same values the derived `Default`
        // produces) so a newly added field forces a conscious choice here.
        Self {
            log_json: false,
            log_file: None,
            log_filter: None,
            default_level,
            trace_chrome: None,
            profile: None,
        }
    }
}
/// RAII handle returned by [`init`]. Drop flushes the appender, finalises
/// Chrome output, writes the pprof report, and prints the metrics summary.
///
/// The explicit `Drop` impl handles the pprof report and the metrics summary;
/// the appender and Chrome guards do their work when the fields themselves
/// are dropped afterwards.
#[must_use = "drop of ObservabilityGuards is what flushes logs, profiles, and metrics"]
pub struct ObservabilityGuards {
// Worker guard of the non-blocking file appender; `None` when no log file
// was configured. Only held, never read.
_appender: Option<tracing_appender::non_blocking::WorkerGuard>,
// Flush guard of the Chrome trace layer; `None` when no trace path was
// configured. Only held, never read.
_chrome: Option<tracing_chrome::FlushGuard>,
// Running sampling profiler plus the path its report is written to on drop.
#[cfg(feature = "profiling")]
pprof: Option<(pprof::ProfilerGuard<'static>, PathBuf)>,
// Handle used on drop to snapshot and print the recorded metrics.
metrics_snapshotter: Option<metrics_util::debugging::Snapshotter>,
}
impl Drop for ObservabilityGuards {
    /// Write the pprof report (when profiling is compiled in and was
    /// requested), then print the metrics summary. Both steps run before the
    /// struct's fields — including the appender and Chrome guards — are
    /// dropped.
    fn drop(&mut self) {
        #[cfg(feature = "profiling")]
        if let Some((profiler, out_path)) = self.pprof.take() {
            match write_pprof_report(&profiler, &out_path) {
                Ok(()) => {
                    tracing::info!(path = %out_path.display(), "pprof report written");
                }
                // A failed report must not panic inside `drop`; report it on
                // stderr and carry on with the metrics summary.
                Err(e) => eprintln!("profile write failed: {e:#}"),
            }
        }

        let Some(snapshotter) = self.metrics_snapshotter.take() else {
            return;
        };
        print_metrics_summary(&snapshotter);
    }
}
/// Build and install the global tracing subscriber + metrics recorder.
///
/// Installs, in order: a console `fmt` layer on stderr (compact text or JSON),
/// an optional file sink, an optional Chrome trace sink, and the
/// `tracing-error` layer. The console and file sinks each get their own
/// `EnvFilter` built from the resolved directive; the Chrome sink and the
/// error layer are left unfiltered.
///
/// The returned [`ObservabilityGuards`] must be kept alive for the duration of
/// the program; dropping it flushes the sinks and emits the profile / metrics
/// summary.
///
/// # Errors
///
/// Returns an error when
/// * `config.profile` is set but the binary was built without the `profiling`
///   feature (checked first, before any global state or file is touched),
/// * the resolved filter directive does not parse,
/// * the log file sink cannot be set up,
/// * a global tracing subscriber is already installed, or
/// * (with `profiling`) the sampling profiler fails to start.
pub fn init(config: &ObservabilityConfig) -> Result<ObservabilityGuards> {
    type BoxedLayer = Box<dyn Layer<Registry> + Send + Sync + 'static>;

    // Fail fast: an unusable `--profile` request is rejected before the global
    // subscriber / recorder are installed and before log or trace files are
    // created, so a doomed invocation leaves no side effects behind.
    #[cfg(not(feature = "profiling"))]
    if config.profile.is_some() {
        bail!("--profile requires building with --features profiling");
    }

    let span_events = parse_span_events();

    // Resolve the filter directive once and build a fresh `EnvFilter` per
    // sink via `.with_filter()`. A single filter pushed into the layer `Vec`
    // would not gate its sibling layers, so each filtered sink carries its own.
    let directive = resolve_filter_directive(config);
    let make_filter = || {
        EnvFilter::try_new(&directive)
            .with_context(|| format!("invalid log filter directive `{directive}`"))
    };

    let mut layers: Vec<BoxedLayer> = Vec::new();

    // Console fmt layer — compact text or JSON, always stderr.
    let console_layer: BoxedLayer = if config.log_json {
        fmt::layer()
            .json()
            .with_span_events(span_events.clone())
            .with_writer(std::io::stderr)
            .with_filter(make_filter()?)
            .boxed()
    } else {
        fmt::layer()
            .compact()
            .with_span_events(span_events.clone())
            .with_writer(std::io::stderr)
            .with_filter(make_filter()?)
            .boxed()
    };
    layers.push(console_layer);

    // Optional file sink — also filtered.
    let appender_guard = match &config.log_file {
        Some(path) => {
            let (layer, guard) = build_file_layer(path, span_events)?;
            layers.push(layer.with_filter(make_filter()?).boxed());
            Some(guard)
        }
        None => None,
    };

    // Optional Chrome `about:tracing` sink — intentionally UNFILTERED so
    // traces capture the full picture even when the console is quiet.
    let chrome_guard = match &config.trace_chrome {
        Some(path) => {
            let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
                .file(path.clone())
                .include_args(true)
                .build();
            layers.push(layer.boxed());
            Some(guard)
        }
        None => None,
    };

    // `tracing-error` layer enables SpanTrace capture in `with_span_trace`.
    layers.push(ErrorLayer::default().boxed());

    tracing_subscriber::registry()
        .with(layers)
        .try_init()
        .context("tracing subscriber already initialized")?;

    // `build_env_filter` is retained for compatibility with older callers;
    // `resolve_filter_directive` above is what actually drives the layer
    // filters. The result (including any error) is deliberately discarded.
    let _ = build_env_filter(config);

    // Install the metrics debugging recorder. `install` sets the global
    // recorder; its snapshotter is held in the guards struct. If another
    // recorder is already installed we only warn and keep going.
    let recorder = metrics_util::debugging::DebuggingRecorder::new();
    let snapshotter = recorder.snapshotter();
    if recorder.install().is_err() {
        tracing::warn!("a metrics recorder was already installed; skipping xenia-rs recorder");
    }

    #[cfg(feature = "profiling")]
    let pprof = match &config.profile {
        Some(path) => {
            let guard = pprof::ProfilerGuardBuilder::default()
                .frequency(100)
                .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                .build()
                .context("failed to start pprof sampling profiler")?;
            Some((guard, path.clone()))
        }
        None => None,
    };

    Ok(ObservabilityGuards {
        _appender: appender_guard,
        _chrome: chrome_guard,
        #[cfg(feature = "profiling")]
        pprof,
        metrics_snapshotter: Some(snapshotter),
    })
}
fn resolve_filter_directive(config: &ObservabilityConfig) -> String {
if let Some(ref f) = config.log_filter {
return f.clone();
}
if let Ok(f) = std::env::var("RUST_LOG")
&& !f.is_empty() {
return f;
}
config.default_level.to_string()
}
/// Build an `EnvFilter` directly from the configuration.
///
/// Precedence: explicit --log-filter > RUST_LOG > default_level. Unlike the
/// explicit directive, a `RUST_LOG` value that `EnvFilter` rejects does not
/// produce an error here; the default level is used instead.
fn build_env_filter(config: &ObservabilityConfig) -> Result<EnvFilter> {
    match config.log_filter.as_ref() {
        Some(explicit) => {
            EnvFilter::try_new(explicit).context("invalid --log-filter directive")
        }
        None => match EnvFilter::try_from_default_env() {
            Ok(from_env) => Ok(from_env),
            Err(_) => EnvFilter::try_new(config.default_level).with_context(|| {
                format!("invalid default filter `{}`", config.default_level)
            }),
        },
    }
}
/// Read `RUST_LOG_SPAN_EVENTS` and map it to the span lifecycle events the
/// fmt layers should log.
///
/// Recognised values (exact, lowercase): `full`, `close`, `active`, `enter`,
/// `exit`, `new`. Anything else — including an unset variable — yields
/// `FmtSpan::NONE`.
fn parse_span_events() -> FmtSpan {
    let Ok(requested) = std::env::var("RUST_LOG_SPAN_EVENTS") else {
        return FmtSpan::NONE;
    };
    match requested.as_str() {
        "full" => FmtSpan::FULL,
        "close" => FmtSpan::CLOSE,
        "active" => FmtSpan::ACTIVE,
        "enter" => FmtSpan::ENTER,
        "exit" => FmtSpan::EXIT,
        "new" => FmtSpan::NEW,
        _ => FmtSpan::NONE,
    }
}
type FileLayerBox = Box<dyn Layer<Registry> + Send + Sync + 'static>;
fn build_file_layer(
path: &Path,
span_events: FmtSpan,
) -> Result<(FileLayerBox, tracing_appender::non_blocking::WorkerGuard)> {
let parent = path.parent().unwrap_or_else(|| Path::new("."));
let file_name = path
.file_name()
.ok_or_else(|| anyhow::anyhow!("log file path has no file name: {}", path.display()))?;
std::fs::create_dir_all(parent)
.with_context(|| format!("failed to create {}", parent.display()))?;
let appender = tracing_appender::rolling::never(parent, file_name);
let (non_blocking, guard) = tracing_appender::non_blocking(appender);
let ext = path
.extension()
.and_then(|e| e.to_str())
.unwrap_or_default();
let layer: FileLayerBox = if ext.eq_ignore_ascii_case("json") {
fmt::layer()
.json()
.with_span_events(span_events)
.with_writer(non_blocking)
.with_ansi(false)
.boxed()
} else {
fmt::layer()
.with_span_events(span_events)
.with_writer(non_blocking)
.with_ansi(false)
.boxed()
};
Ok((layer, guard))
}
/// Wrap an error with a captured span-trace so the top-level `main` can
/// render "failed in `cmd_exec > load_image > …`" alongside the regular
/// anyhow context chain.
#[allow(dead_code)]
pub fn with_span_trace<E>(err: E) -> anyhow::Error
where
E: std::error::Error + Send + Sync + 'static,
{
anyhow::Error::new(err).context(SpanTraceDisplay(SpanTrace::capture()))
}
/// Add the span trace captured at the call site to an existing
/// `anyhow::Error` as one more context entry. Meant for command boundaries
/// where errors already travel as `anyhow::Error`.
pub fn attach_span_trace(err: anyhow::Error) -> anyhow::Error {
    let trace = SpanTraceDisplay(SpanTrace::capture());
    err.context(trace)
}
/// Newtype around a captured `SpanTrace` so it can be used as an anyhow
/// context value; renders as a `span trace:` header followed by the trace.
struct SpanTraceDisplay(SpanTrace);

impl std::fmt::Display for SpanTraceDisplay {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(trace) = self;
        write!(formatter, "span trace:\n{}", trace)
    }
}

/// `Debug` deliberately mirrors `Display` so the trace reads the same
/// whichever way the error chain is printed.
impl std::fmt::Debug for SpanTraceDisplay {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self, formatter)
    }
}
/// Build the profiler report and write it to `path`.
///
/// The output format follows the (lower-cased) file extension: `svg` or no
/// extension renders a flamegraph; `pb`, `proto` or `pprof` writes the
/// encoded protobuf profile. Any other extension is an error.
#[cfg(feature = "profiling")]
fn write_pprof_report(guard: &pprof::ProfilerGuard<'static>, path: &Path) -> Result<()> {
    let report = guard.report().build().context("pprof report build failed")?;

    let ext = path
        .extension()
        .and_then(|e| e.to_str())
        .map(str::to_ascii_lowercase)
        .unwrap_or_default();

    // Best effort only: if the directory cannot be created, the write below
    // fails and reports the problem.
    let out_dir = path.parent().unwrap_or_else(|| Path::new("."));
    let _ = std::fs::create_dir_all(out_dir);

    if matches!(ext.as_str(), "svg" | "") {
        let file =
            std::fs::File::create(path).with_context(|| format!("create {}", path.display()))?;
        report
            .flamegraph(file)
            .context("flamegraph render failed")?;
        return Ok(());
    }

    if matches!(ext.as_str(), "pb" | "proto" | "pprof") {
        use pprof::protos::Message;
        let profile = report.pprof().context("pprof protobuf build failed")?;
        let encoded = profile
            .write_to_bytes()
            .context("pprof protobuf encode failed")?;
        std::fs::write(path, &encoded).with_context(|| format!("write {}", path.display()))?;
        return Ok(());
    }

    bail!("unknown --profile extension `.{ext}` (use .svg or .pb)")
}
/// Snapshot the debugging recorder and emit one line per metric.
///
/// Does nothing when no metric was recorded. Counters and gauges print their
/// value; histograms print count / sum / min / max / mean of their samples
/// (or `empty`). The summary goes through `tracing` at INFO when that level
/// is enabled so it reaches every installed sink, and to stderr otherwise.
fn print_metrics_summary(snap: &metrics_util::debugging::Snapshotter) {
    use metrics_util::debugging::DebugValue;
    use metrics_util::MetricKind;

    let entries = snap.snapshot().into_vec();
    if entries.is_empty() {
        return;
    }

    let rendered: Vec<String> = entries
        .into_iter()
        .map(|(key, _unit, _desc, value)| {
            let kind = match key.kind() {
                MetricKind::Counter => "counter",
                MetricKind::Gauge => "gauge",
                MetricKind::Histogram => "histogram",
            };
            let name = key.key().name();

            // Labels render as `{k=v,k=v}`; nothing at all when there are none.
            let label_pairs: Vec<String> = key
                .key()
                .labels()
                .map(|l| format!("{}={}", l.key(), l.value()))
                .collect();
            let labels_str = match label_pairs.as_slice() {
                [] => String::new(),
                pairs => format!("{{{}}}", pairs.join(",")),
            };

            let value_str = match value {
                DebugValue::Counter(n) => n.to_string(),
                DebugValue::Gauge(g) => g.into_inner().to_string(),
                DebugValue::Histogram(samples) if samples.is_empty() => "empty".to_string(),
                DebugValue::Histogram(samples) => {
                    // Re-iterate the samples for each statistic instead of
                    // copying them into a scratch vector first.
                    let values = || samples.iter().map(|s| s.into_inner());
                    let count = samples.len();
                    let sum: f64 = values().sum();
                    let min = values().fold(f64::INFINITY, f64::min);
                    let max = values().fold(f64::NEG_INFINITY, f64::max);
                    let mean = sum / count as f64;
                    format!(
                        "count={} sum={:.3} min={:.3} max={:.3} mean={:.3}",
                        count, sum, min, max, mean
                    )
                }
            };

            format!(" {kind:<9} {name}{labels_str} = {value_str}")
        })
        .collect();

    let body = rendered.join("\n");
    // NOTE(review): the sinks in this file use per-layer filters alongside
    // unfiltered layers; confirm `enabled!` actually reports "disabled" in
    // that setup, otherwise the stderr fallback below may never fire.
    if tracing::enabled!(Level::INFO) {
        tracing::info!("metrics summary:\n{}", body);
    } else {
        eprintln!("metrics summary:\n{}", body);
    }
}