ITERATE-2.V: scheduler priority aging closes 18-day AUDIT-049 wedge

Priority aging in xenia-cpu/scheduler.rs:pick_runnable
(effective_priority = base + age_bonus(now_round - last_run_round),
capped at +31, AGING_ROUNDS_PER_BONUS=1). Strict-priority was parking
priority=0 threads behind CPU-bound priority=15 audio mixer
(sub_824D1328 guest spinwait at PC=0x824d1404 on CPU5). Aging
eventually picks the starved thread, breaking the producer-consumer
cycle that caused 5-tid wedge at PC=0x824ac578 since AUDIT-049 (10 May).

Cascade observed: tid=13 clean exit; events 121K -> 13M (107x); last
host_ns 767ms -> 51,011ms (66x); 8 new threads spawn; VdSwap 1 -> 2.

Complete two-day iterate sequence (2026-05-27 -> 2026-05-28):
- 2.F: VdSwap drain timeout 900ms -> 1ms (xenia-gpu/handle.rs); 876x
       perf win on VdSwap kernel callback
- 2.H: vA0000000 physical heap bucket added (state.rs, exports.rs);
       ctx_ptrs now in 0xA0000000-0xBFFFFFFF range matching canary
- 2.L: Phase-A diff harness categorized [return_value mismatch],
       [status mismatch], [args_resolved.path mismatch] tags
       (tools/diff-events/diff_events.py); closes reading-error #41
       (silent test-harness state leak invalidating trace diffs)
- 2.M: always-on exit-thread-state.json sibling to Phase-A JSONL
       (event_log.rs + xenia-app/main.rs); closes reading-error #42
       (Phase-A blind to blocked-forever waits)
- 2.Q: signal.match kernel instrumentation in NtSetEvent /
       NtReleaseSemaphore / KeSetEvent / KeReleaseSemaphore
       (exports.rs); emits target_handle + waiter_count + waiter_tids
- 2.T: wake.requested kernel instrumentation in wake_eligible_waiters
       (exports.rs); emits target_tid + transition + new_state
- 2.V: scheduler priority aging (xenia-cpu/scheduler.rs) [keystone]

Plus accumulated WIP from earlier May (contention_manifest,
phase_b_snapshot, xam/xaudio enhancements, analysis db, xex loader,
xenia-app main loop, etc.). Audit-runs/ artifacts remain untracked
per project convention.

Tests: 300 xenia-cpu / 227 xenia-kernel / 5 xenia-app / 19 xenia-path
/ 30+ smaller suites -- all PASS, 0 regressions. Determinism preserved
(2x cold runs bit-identical at 13,003,881 events post-2.V).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-29 07:27:26 +02:00
parent e6d43a23ac
commit ad45873a1b
50 changed files with 14389 additions and 506 deletions

View File

@@ -0,0 +1,73 @@
# diff_events.py — Phase A event-log diff tool
A stdlib-only Python tool that diffs two schema-v1 JSONL event logs (one per engine) and reports the **first behavioral divergence per guest thread**. Built for the Phase A diff harness — see `audit-runs/phase-a-diff-harness/README.md` and `schema-v1.md`.
## What it does
1. Reads two JSONL files. Validates each begins with a `schema_version=1` header event.
2. Builds per-thread streams keyed by `tid_event_idx` (the schema's per-tid monotonic counter).
3. Maps canary-tid ↔ ours-tid (auto-pairs by first `kernel.call` name in each stream, or manual via `--tid-map`).
4. Walks each mapped pair in parallel, comparing events with rules from the schema (raw_handle_id skipped, host_ns skipped, wait_duration_cycles skipped, etc.).
5. On first divergence: prints 5-event pre-context + the divergent event + the next event from each. Stops that thread's walk.
6. Writes a markdown report.
## Usage
```bash
# Default — auto-map tids, write markdown to stdout
python3 diff_events.py --canary canary.jsonl --ours ours.jsonl
# Write report to a file
python3 diff_events.py --canary c.jsonl --ours o.jsonl --out report.md
# Manual tid map
python3 diff_events.py --canary c.jsonl --ours o.jsonl --tid-map 6=1,7=2
# Negative-test mode — exit non-zero on ANY divergence (gate-4)
python3 diff_events.py --canary c.jsonl --ours o.jsonl --validate-identical
```
## How it compares
These fields are **skipped** when comparing payloads:
- Top-level: `engine`, `host_ns`, `guest_cycle`, `deterministic`.
- `handle.create`/`handle.destroy`: `raw_handle_id`, `handle_semantic_id` (engine-local).
- `wait.begin`: `handles_semantic_ids` (engine-local SIDs).
- `wait.end`: `wait_duration_cycles` (depends on host scheduling), `woken_by_semantic_id`.
The `tid_event_idx` field is the **alignment key**. Two events at the same `tid_event_idx` on a mapped pair of tids are expected to be the same logical event. The `kind` must match; the `payload` must match field-by-field (except skipped fields).
## Phase C+18 — Cross-tid floating `handle.create` (shared-global dispatchers)
Process-global kernel dispatcher objects (`KEVENT`/`KSEMAPHORE` etc. that game code creates with `KeInitializeEvent` or static-allocs and shares across multiple guest threads) are lazy-wrapped on **first guest-thread touch** by canary's `XObject::GetNativeObject` and ours's `ensure_dispatcher_object`. Whichever thread happens to touch the dispatcher first synthesizes the wrapper and emits the `handle.create` event. Which thread wins is timing-dependent — canary and ours may disagree.
The SID for these synthesized handles is computed via a **scheduling-invariant recipe** keyed on `(pointer, object_type)` only (see schema-v1.md §"Shared-global SIDs"). The same dispatcher therefore yields the same SID in both engines regardless of the first-toucher thread.
The diff tool detects shared-global `handle.create` events by recomputing the deterministic SID from the event's `(raw_handle_id, object_type)` payload and matching against the emitted `handle_semantic_id`. When per-tid alignment finds one side has an "extra" `handle.create` event whose SID is in the global set, the tool **advances only that side's stream pointer past the floating event** and re-compares — preserving strict alignment for everything else.
The summary table shows per-pair `floating_skipped (c/o)` counts so you can see how many events were absorbed by this mechanism.
## Known limitations (v1)
- **Auto tid-map is naive**: pairs canary-tid with ours-tid by the first `kernel.call` name on each thread. Works for boot when the same initial call happens on each engine's primary thread; can mis-pair if two threads start with the same first-call name or if a thread spawns earlier on one engine. Use `--tid-map` to override.
- **No streaming**: loads both files fully into memory. Acceptable for boot-window runs; the canary log is ~370 MB for a 12 s run.
- **First-divergence only**: per-thread walk stops at first divergence. Subsequent divergences on the same thread are not reported (a sliding-window mode could be added later if needed).
- **Schema v1 only**: refuses to parse v2 inputs (forward-incompat is intentional).
## Files
- `diff_events.py` — single-file CLI, stdlib only (json, argparse, pathlib).
- `README.md` — this file.
## Test it
```bash
# Self-diff (compare a file against itself) should report 0 divergences.
python3 diff_events.py --canary x.jsonl --ours x.jsonl --validate-identical
echo "exit=$?" # expect 0
# Negative test: corrupt one event and confirm the tool reports it.
sed '50s/"kernel.call"/"kernel.CORRUPT"/' x.jsonl > /tmp/x-corrupt.jsonl
python3 diff_events.py --canary x.jsonl --ours /tmp/x-corrupt.jsonl --validate-identical
echo "exit=$?" # expect 1
```

View File

@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""Phase D Stage 2 — contention-manifest builder.
Reads a Phase A JSONL event log produced by canary with cvar
`kernel_emit_contention=true` (Stage 1) and distills it to a
replay-ready manifest for Stage 3 to consume.
Output schema (`contention_manifest.json`):
{
"version": 1,
"source_canary_jsonl": "<absolute path>",
"source_canary_sha256": "<hex>",
"built_at_host_unix": <int>,
"summary": {
"total_input_events": <int>,
"total_contention_events_kept": <int>,
"per_tid_counts": { "<tid>": <int>, ... }
},
"entries": [
{ "tid": 6, "tid_event_idx": 104664, "site_sid": "c26a128bf45411f7",
"cs_ptr": "0xbc65c890", "contended": true },
...
]
}
Entries are sorted by (tid asc, tid_event_idx asc). Stage 3's ours-side
replay loader keys on `(tid, tid_event_idx)`; the canary tid is the
*native* tid emitted by canary (no display-mapping is applied here —
see investigation.md §"Tid mapping is per-engine native").
Only events with `kind == "contention.observed"` and `contended == true`
are kept. Stage 1's emitter never emits `contended=false`, so this
filter is paranoid-defensive. Schema events / handle events / wait
events are dropped.
Usage:
python3 build_contention_manifest.py \\
--canary-jsonl path/to/canary-cvaron-trunc.jsonl \\
--out path/to/contention_manifest.json
Exit 0 on success. Exit 1 on parse error or empty manifest (no
contention events found — likely cvar wasn't enabled when the trace
was captured).
"""
import argparse
import hashlib
import json
import sys
import time
from pathlib import Path
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description=__doc__.splitlines()[0])
p.add_argument(
"--canary-jsonl",
required=True,
help="Path to canary Phase A JSONL log (with cvar=true).",
)
p.add_argument(
"--out",
required=True,
help="Output path for contention_manifest.json.",
)
p.add_argument(
"--tid-map",
default="",
help=(
"Optional canary→ours tid translation. Format "
"'CANARY=OURS,CANARY=OURS,...' (e.g. '6=1,7=2,4=11'). When "
"supplied, manifest entries are emitted with the ours-side tid "
"so the Stage-3 consumer can key on its own native current_tid. "
"Entries on a canary tid NOT in the map are dropped with a "
"warning. Same format as diff_events.py."
),
)
p.add_argument(
"--quiet",
action="store_true",
help="Suppress the human-readable summary on stderr.",
)
return p.parse_args()
def parse_tid_map(s: str) -> dict[int, int] | None:
"""Parse 'a=b,c=d' into {a: b, c: d}. Empty/None → None."""
s = s.strip()
if not s:
return None
out: dict[int, int] = {}
for piece in s.split(","):
piece = piece.strip()
if not piece:
continue
if "=" not in piece:
raise ValueError(f"bad tid-map fragment: {piece!r}")
l, r = piece.split("=", 1)
out[int(l.strip())] = int(r.strip())
return out
def sha256_of(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1 << 20), b""):
h.update(chunk)
return h.hexdigest()
def build_manifest(
jsonl_path: Path,
tid_map: dict[int, int] | None = None,
) -> dict:
"""Read `jsonl_path` and return a manifest dict.
If `tid_map` (canary_tid → ours_tid) is provided, entries are written
with the translated ours-side tid. Entries on a canary tid not in
the map are dropped (counted in `summary.skipped_unmapped_tids`).
When `tid_map` is None, manifest tids are canary's native values
(back-compat with Stage 2's first iteration).
Raises FileNotFoundError / json.JSONDecodeError on bad input.
"""
entries: list[dict] = []
total_input = 0
bad_lines = 0
unmapped = 0
with jsonl_path.open("r", encoding="utf-8") as f:
for lineno, line in enumerate(f, start=1):
line = line.rstrip("\n")
if not line:
continue
total_input += 1
try:
ev = json.loads(line)
except json.JSONDecodeError:
bad_lines += 1
continue
if ev.get("kind") != "contention.observed":
continue
payload = ev.get("payload") or {}
if payload.get("contended") is not True:
continue
canary_tid = int(ev["tid"])
if tid_map is not None:
if canary_tid not in tid_map:
unmapped += 1
continue
tid = tid_map[canary_tid]
else:
tid = canary_tid
entry = {
"tid": tid,
"tid_event_idx": int(ev["tid_event_idx"]),
"site_sid": str(payload.get("site_sid", "")),
"cs_ptr": str(payload.get("cs_ptr", "")),
"contended": True,
}
# Defensive: every Stage 1 event carries cs_ptr + site_sid.
# If either is missing, skip rather than emit a broken entry.
if not entry["site_sid"] or not entry["cs_ptr"]:
bad_lines += 1
continue
entries.append(entry)
# Stable sort by (tid, tid_event_idx). Same (tid, idx) pair is not
# expected — the per-tid counter is monotone — but if duplicates
# appear (e.g. mis-merged jsonls), keep the first; later phases would
# otherwise see ambiguous manifest keys.
entries.sort(key=lambda e: (e["tid"], e["tid_event_idx"]))
deduped: list[dict] = []
seen: set[tuple[int, int]] = set()
dup_count = 0
for e in entries:
key = (e["tid"], e["tid_event_idx"])
if key in seen:
dup_count += 1
continue
seen.add(key)
deduped.append(e)
per_tid: dict[str, int] = {}
for e in deduped:
per_tid[str(e["tid"])] = per_tid.get(str(e["tid"]), 0) + 1
return {
"version": 1,
"source_canary_jsonl": str(jsonl_path.resolve()),
"source_canary_sha256": sha256_of(jsonl_path),
"built_at_host_unix": int(time.time()),
"tid_map": tid_map,
"summary": {
"total_input_events": total_input,
"total_contention_events_kept": len(deduped),
"skipped_bad_lines": bad_lines,
"skipped_unmapped_tids": unmapped,
"skipped_duplicate_keys": dup_count,
"per_tid_counts": per_tid,
},
"entries": deduped,
}
def render_summary(manifest: dict) -> str:
s = manifest["summary"]
lines = [
f"contention manifest built from {manifest['source_canary_jsonl']}",
f" source sha256: {manifest['source_canary_sha256']}",
f" total input events scanned: {s['total_input_events']}",
f" contention events kept: {s['total_contention_events_kept']}",
f" bad/skipped lines: {s['skipped_bad_lines']}",
f" duplicate (tid,idx) skipped: {s['skipped_duplicate_keys']}",
" per-tid counts:",
]
for tid, count in sorted(s["per_tid_counts"].items(),
key=lambda kv: int(kv[0])):
lines.append(f" tid={int(tid):4d} {count}")
return "\n".join(lines)
def main() -> int:
args = parse_args()
src = Path(args.canary_jsonl)
if not src.is_file():
print(f"error: not a file: {src}", file=sys.stderr)
return 1
try:
tid_map = parse_tid_map(args.tid_map)
except ValueError as e:
print(f"error: --tid-map: {e}", file=sys.stderr)
return 1
manifest = build_manifest(src, tid_map=tid_map)
if manifest["summary"]["total_contention_events_kept"] == 0:
print(
"error: 0 contention.observed events found — was the trace "
"captured with --kernel_emit_contention=true?",
file=sys.stderr,
)
return 1
out = Path(args.out)
out.parent.mkdir(parents=True, exist_ok=True)
with out.open("w", encoding="utf-8") as f:
json.dump(manifest, f, indent=2)
f.write("\n")
if not args.quiet:
print(render_summary(manifest), file=sys.stderr)
return 0
if __name__ == "__main__":
sys.exit(main())

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,299 @@
#!/usr/bin/env python3
"""Unit tests for `build_contention_manifest.py`.
Run as `python3 test_build_manifest.py` — prints `PASS` per test.
"""
import json
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from build_contention_manifest import build_manifest, render_summary # noqa: E402
def write_jsonl(lines: list[str]) -> Path:
tmp = tempfile.NamedTemporaryFile(
mode="w", suffix=".jsonl", delete=False, encoding="utf-8"
)
for line in lines:
tmp.write(line + "\n")
tmp.close()
return Path(tmp.name)
def mk_event(
kind: str,
tid: int,
idx: int,
payload: dict,
engine: str = "canary",
) -> str:
return json.dumps(
{
"schema_version": 1,
"engine": engine,
"kind": kind,
"tid": tid,
"tid_event_idx": idx,
"guest_cycle": 0,
"host_ns": 0,
"deterministic": True,
"payload": payload,
}
)
def test_basic_extract() -> None:
src = write_jsonl([
mk_event("import.call", 6, 0, {"name": "Foo"}),
mk_event(
"contention.observed",
6,
104664,
{"cs_ptr": "0xbc65c890", "site_sid": "c26a128b", "contended": True},
),
mk_event("import.call", 6, 1, {"name": "Bar"}),
])
m = build_manifest(src)
assert m["version"] == 1
assert m["summary"]["total_input_events"] == 3
assert m["summary"]["total_contention_events_kept"] == 1
assert m["summary"]["per_tid_counts"] == {"6": 1}
e = m["entries"][0]
assert e["tid"] == 6 and e["tid_event_idx"] == 104664
assert e["site_sid"] == "c26a128b" and e["cs_ptr"] == "0xbc65c890"
assert e["contended"] is True
print("PASS test_basic_extract")
def test_filters_non_contention_kinds() -> None:
src = write_jsonl([
mk_event("handle.create", 6, 0, {"handle_semantic_id": "x"}),
mk_event("wait.begin", 6, 1, {"handles_semantic_ids": ["x"]}),
mk_event("kernel.call", 6, 2, {"name": "X"}),
mk_event(
"contention.observed",
7,
42,
{"cs_ptr": "0x1000", "site_sid": "deadbeef", "contended": True},
),
])
m = build_manifest(src)
assert m["summary"]["total_contention_events_kept"] == 1
assert m["entries"][0]["tid"] == 7
print("PASS test_filters_non_contention_kinds")
def test_filters_contended_false() -> None:
# Stage 1's emitter never emits contended=false today, but defensive
# filter must skip those if a future variant adds them.
src = write_jsonl([
mk_event(
"contention.observed",
6,
10,
{"cs_ptr": "0xa", "site_sid": "11", "contended": False},
),
mk_event(
"contention.observed",
6,
11,
{"cs_ptr": "0xa", "site_sid": "11", "contended": True},
),
])
m = build_manifest(src)
assert m["summary"]["total_contention_events_kept"] == 1
assert m["entries"][0]["tid_event_idx"] == 11
print("PASS test_filters_contended_false")
def test_sorts_by_tid_then_idx() -> None:
src = write_jsonl([
mk_event(
"contention.observed",
9,
5,
{"cs_ptr": "0x9", "site_sid": "99", "contended": True},
),
mk_event(
"contention.observed",
6,
200,
{"cs_ptr": "0xb", "site_sid": "bb", "contended": True},
),
mk_event(
"contention.observed",
6,
100,
{"cs_ptr": "0xa", "site_sid": "aa", "contended": True},
),
])
m = build_manifest(src)
keys = [(e["tid"], e["tid_event_idx"]) for e in m["entries"]]
assert keys == [(6, 100), (6, 200), (9, 5)], keys
print("PASS test_sorts_by_tid_then_idx")
def test_deduplicates_same_tid_idx() -> None:
src = write_jsonl([
mk_event(
"contention.observed",
6,
42,
{"cs_ptr": "0xa", "site_sid": "aa", "contended": True},
),
mk_event(
"contention.observed",
6,
42,
{"cs_ptr": "0xb", "site_sid": "bb", "contended": True},
),
])
m = build_manifest(src)
assert m["summary"]["total_contention_events_kept"] == 1
assert m["summary"]["skipped_duplicate_keys"] == 1
# Keeps the first occurrence.
assert m["entries"][0]["cs_ptr"] == "0xa"
print("PASS test_deduplicates_same_tid_idx")
def test_skips_missing_fields() -> None:
src = write_jsonl([
# Missing site_sid.
mk_event(
"contention.observed",
6,
1,
{"cs_ptr": "0xa", "contended": True},
),
# Missing cs_ptr.
mk_event(
"contention.observed",
6,
2,
{"site_sid": "aa", "contended": True},
),
# Both present — kept.
mk_event(
"contention.observed",
6,
3,
{"cs_ptr": "0xb", "site_sid": "bb", "contended": True},
),
])
m = build_manifest(src)
assert m["summary"]["total_contention_events_kept"] == 1
assert m["summary"]["skipped_bad_lines"] == 2
print("PASS test_skips_missing_fields")
def test_handles_bad_json_lines() -> None:
src = write_jsonl([
"not-json",
mk_event(
"contention.observed",
6,
1,
{"cs_ptr": "0xa", "site_sid": "aa", "contended": True},
),
"{\"truncated\":",
])
m = build_manifest(src)
assert m["summary"]["total_contention_events_kept"] == 1
assert m["summary"]["skipped_bad_lines"] == 2
print("PASS test_handles_bad_json_lines")
def test_render_summary_human_readable() -> None:
src = write_jsonl([
mk_event(
"contention.observed",
6,
1,
{"cs_ptr": "0xa", "site_sid": "aa", "contended": True},
),
mk_event(
"contention.observed",
14,
100,
{"cs_ptr": "0xb", "site_sid": "bb", "contended": True},
),
])
m = build_manifest(src)
out = render_summary(m)
assert "contention events kept: 2" in out
assert "tid= 6 1" in out
assert "tid= 14 1" in out
print("PASS test_render_summary_human_readable")
def test_empty_input_yields_zero_kept() -> None:
src = write_jsonl([mk_event("import.call", 0, 0, {"name": "X"})])
m = build_manifest(src)
assert m["summary"]["total_contention_events_kept"] == 0
assert m["entries"] == []
print("PASS test_empty_input_yields_zero_kept")
def test_tid_map_translates_canary_to_ours() -> None:
src = write_jsonl([
mk_event(
"contention.observed",
6,
104664,
{"cs_ptr": "0xbc65c890", "site_sid": "c26a128bf45411f7", "contended": True},
),
mk_event(
"contention.observed",
7,
10,
{"cs_ptr": "0xa", "site_sid": "aa", "contended": True},
),
])
m = build_manifest(src, tid_map={6: 1, 7: 2})
assert m["entries"][0]["tid"] == 1, m["entries"][0]
assert m["entries"][1]["tid"] == 2
print("PASS test_tid_map_translates_canary_to_ours")
def test_tid_map_drops_unmapped_canary_tids() -> None:
src = write_jsonl([
mk_event(
"contention.observed",
6,
100,
{"cs_ptr": "0xa", "site_sid": "aa", "contended": True},
),
mk_event(
"contention.observed",
99,
200,
{"cs_ptr": "0xb", "site_sid": "bb", "contended": True},
),
])
m = build_manifest(src, tid_map={6: 1})
assert m["summary"]["total_contention_events_kept"] == 1
assert m["summary"]["skipped_unmapped_tids"] == 1
assert m["entries"][0]["tid"] == 1
print("PASS test_tid_map_drops_unmapped_canary_tids")
if __name__ == "__main__":
tests = [
test_basic_extract,
test_filters_non_contention_kinds,
test_filters_contended_false,
test_sorts_by_tid_then_idx,
test_deduplicates_same_tid_idx,
test_skips_missing_fields,
test_handles_bad_json_lines,
test_render_summary_human_readable,
test_empty_input_yields_zero_kept,
test_tid_map_translates_canary_to_ours,
test_tid_map_drops_unmapped_canary_tids,
]
for t in tests:
t()
print(f"\nALL {len(tests)} TESTS PASS")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,75 @@
# diff-state
Phase B initial-state snapshot diff tool. Stdlib-only Python. Mirrors the
shape of `tools/diff-events/` but operates on the *static structural*
snapshots emitted by `phase_b_snapshot` at the moment immediately before
the first guest PPC instruction of the XEX entry_point executes.
## Usage
```bash
python3 tools/diff-state/diff_state.py \
--canary <snapshot_dir>/canary \
--ours <snapshot_dir>/ours \
--out <snapshot_dir>/report.md
```
Writes:
- `<snapshot_dir>/report.md` — human-readable divergence catalog
- `<snapshot_dir>/report.json` — machine-readable sibling (same content)
## Exit codes
| code | meaning |
|---|---|
| 0 | no divergence (or `--validate-identical` succeeded) |
| 1 | divergences found |
| 2 | STOP triggered (`image_loaded_sha256` / `xex_entry_point` / `iso_sha256` mismatch) |
## Field-comparison rules
Lives at the top of `diff_state.py` as Python constants — read those for
the authoritative spec. Summary:
- `engine`, `schema_version`, `deterministic_skip` are always skipped.
- `cpu_state.json`: skip `hw_id`.
- `kernel.json`: skip `raw_handle_id`, `exports_registered_count`.
- `config.json`: skip `build_id`, `iso_path`, `host_ns_at_snapshot`,
`wall_clock_iso8601`, `cli_argv`, `cvars.phase_b_snapshot_dir`.
- Each snapshot's `deterministic_skip` array is honored too.
## Set vs sequence semantics
- **Set** (sort by key, then positional compare):
- `kernel.json::objects` (key=`handle_semantic_id`)
- `kernel.json::handle_name_table` (key=`name`)
- `vfs.json::cache_root_listing` (key=`relpath`)
- `memory.json::heaps` (key=`base`)
- **Sequence** (positional compare): everything else, including
`memory.json::regions` (which both engines emit pre-sorted by
`(start, end)`).
## Classification
| class | trigger | priority |
|---|---|---|
| σ-structural | field missing/extra; sequence-length mismatch; set element only in one engine | 1 (always report) |
| δ-content-STOP | `image_loaded_sha256` / `xex_entry_point` / `iso_sha256` mismatch | STOP (exit 2) |
| δ-content | other `*_sha256` field differs | 2 |
| γ-kernel-content | `objects[].details` field differs | 2 — primary Phase C target |
| κ-cache | non-empty `cache_root_listing` either side | re-run after `rm -rf` of caches |
| ε-host-allocator | heap base/region start differs but sha256 agrees | catalog only |
| τ-host-timing | `deterministic_skip`-listed timing field | silent unless verbose |
## Negative-test recipe
To verify the tool catches a hand-mutation:
```bash
cp -r snap-001/ours snap-001/ours-mut
sed -i 's/"thread_id": 1/"thread_id": 999/' snap-001/ours-mut/kernel.json
python3 tools/diff-state/diff_state.py \
--canary snap-001/ours --ours snap-001/ours-mut --out /tmp/r.md
# exit code 1; report names objects[handle_semantic_id=...] details.thread_id
```

View File

@@ -0,0 +1,545 @@
#!/usr/bin/env python3
"""Phase B state-snapshot diff tool.
Reads two snapshot directories (one per engine, `<dir>/canary/` and
`<dir>/ours/`) emitted by `phase_b_snapshot` at the moment immediately
before the first guest PPC instruction of the XEX entry_point. Produces
a markdown report (`report.md`) plus a machine-readable JSON sibling
(`report.json`) classifying every observable divergence.
Field-comparison rules + classification table:
audit-runs/phase-b-state-equivalence/README.md
Both engines' emitter source + this tool read the same rules.
Usage:
diff_state.py --canary <dir>/canary --ours <dir>/ours [--out report.md]
diff_state.py --canary <a> --ours <b> --validate-identical
Exit codes:
0 — no divergence (or `--validate-identical` succeeded)
1 — divergences found
2 — STOP triggered (image_loaded_sha256 / xex_entry_point / iso_sha256
mismatch — interpretation of downstream files is not valid)
"""
from __future__ import annotations
import argparse
import hashlib
import json
import sys
from pathlib import Path
from typing import Any
SCHEMA_VERSION = 1
# ---------- field-comparison rules (declared up front) ----------
# Per-snapshot-file fields the diff tool always skips at the top level.
SKIP_TOP_FIELDS = {"schema_version", "engine", "deterministic_skip"}
# Per-file: extra fields skipped. JSON-pointer-style ("a.b.c") matched
# either at top-level keys or within array-of-objects members keyed by
# `handle_semantic_id` etc.
SKIP_BY_FILE: dict[str, set[str]] = {
"cpu_state.json": {"hw_id"},
"memory.json": set(),
"kernel.json": {"raw_handle_id", "exports_registered_count"},
"vfs.json": set(),
"config.json": {
"build_id",
"iso_path",
"host_ns_at_snapshot",
"wall_clock_iso8601",
"cli_argv",
"cvars.phase_b_snapshot_dir",
},
}
# `objects` etc. are sets (sort then compare); `regions`/`probes`/`gpr`/
# etc. are sequences (positional compare). Mismatches handled separately.
SET_FIELDS: dict[str, dict[str, str]] = {
# file -> field_name -> sort-key (used as dict key)
"kernel.json": {
"objects": "handle_semantic_id",
"handle_name_table": "name",
},
"vfs.json": {"cache_root_listing": "relpath"},
"memory.json": {"heaps": "base"},
}
# STOP-trigger fields (δ-content critical equivalence).
# Note: image_loaded_sha256 is reported but NOT a STOP trigger here. The
# raw hash mismatches when engines patch imports differently — see
# check_invariants() which evaluates `image_canonical_sha256` (computed
# from image.bin + xex.json) as the real semantic STOP key.
STOP_FIELDS = {
("config.json", "xex_entry_point"),
("config.json", "iso_sha256"),
}
# ---------- divergence record ----------
class Divergence:
__slots__ = ("file", "path", "kind", "canary", "ours", "klass")
def __init__(self, file: str, path: str, kind: str, canary: Any, ours: Any, klass: str):
self.file = file
self.path = path
self.kind = kind
self.canary = canary
self.ours = ours
self.klass = klass
def to_dict(self) -> dict:
return {
"file": self.file,
"path": self.path,
"kind": self.kind,
"canary": self.canary,
"ours": self.ours,
"class": self.klass,
}
# ---------- classification ----------
def classify(file: str, path: str, kind: str, canary: Any, ours: Any) -> str:
if (file, path) in STOP_FIELDS:
return "delta-content-STOP"
if kind in ("set-size-mismatch", "missing-field", "extra-field", "seq-length"):
return "sigma-structural"
if path.endswith(".sha256") or path.endswith("_sha256"):
return "delta-content"
if path.startswith("objects[") and ".details." in path:
return "gamma-kernel-content"
if file == "vfs.json" and path.startswith("cache_root_listing"):
return "kappa-cache"
if path in ("heaps[].base", "heaps[].name"):
return "epsilon-host-allocator"
if path in ("host_ns_at_snapshot", "wall_clock_iso8601"):
return "tau-host-timing"
return "gamma-kernel-content"
# ---------- generic walker ----------
def collect_skip_set(file: str, doc: dict) -> set[str]:
s = set(SKIP_TOP_FIELDS) | set(SKIP_BY_FILE.get(file, set()))
extra = doc.get("deterministic_skip")
if isinstance(extra, list):
for x in extra:
if isinstance(x, str):
s.add(x)
return s
def is_skipped(file: str, path: str, skip: set[str]) -> bool:
if path in skip:
return True
# Strip array indices for membership check, so "objects[].raw_handle_id"
# in the skip set matches "objects[3].raw_handle_id".
bracketed = []
parts = path.split(".")
for p in parts:
idx = p.find("[")
if idx >= 0:
bracketed.append(p[:idx] + "[]")
else:
bracketed.append(p)
norm = ".".join(bracketed)
if norm in skip:
return True
# Last-token (leaf field) match — e.g. "raw_handle_id" anywhere.
leaf = bracketed[-1]
if leaf in skip:
return True
return False
def diff_value(
file: str,
path: str,
a: Any,
b: Any,
out: list[Divergence],
skip: set[str],
set_keys: dict[str, str] | None = None,
) -> None:
if is_skipped(file, path, skip):
return
if type(a) != type(b):
out.append(Divergence(file, path, "type-mismatch", a, b,
classify(file, path, "type-mismatch", a, b)))
return
if isinstance(a, dict):
a_keys = set(a.keys())
b_keys = set(b.keys())
for k in sorted(a_keys - b_keys):
sub = f"{path}.{k}" if path else k
if is_skipped(file, sub, skip):
continue
out.append(Divergence(file, sub, "missing-field", a[k], None,
classify(file, sub, "missing-field", a[k], None)))
for k in sorted(b_keys - a_keys):
sub = f"{path}.{k}" if path else k
if is_skipped(file, sub, skip):
continue
out.append(Divergence(file, sub, "extra-field", None, b[k],
classify(file, sub, "extra-field", None, b[k])))
for k in sorted(a_keys & b_keys):
sub = f"{path}.{k}" if path else k
diff_value(file, sub, a[k], b[k], out, skip, set_keys)
return
if isinstance(a, list):
# Set-field handling: sort by configured key.
last_seg = path.rsplit(".", 1)[-1] if path else ""
bare = last_seg.split("[", 1)[0]
key = (set_keys or {}).get(bare)
if key is not None:
a_sorted = sorted(a, key=lambda x: x.get(key, "") if isinstance(x, dict) else "")
b_sorted = sorted(b, key=lambda x: x.get(key, "") if isinstance(x, dict) else "")
a_keys = {x.get(key) for x in a_sorted if isinstance(x, dict)}
b_keys = {x.get(key) for x in b_sorted if isinstance(x, dict)}
missing = sorted(a_keys - b_keys, key=str)
extra = sorted(b_keys - a_keys, key=str)
for m in missing:
out.append(Divergence(file, f"{path}[{key}={m}]",
"missing-from-ours", m, None,
classify(file, f"{path}[{key}={m}]",
"missing-from-ours", m, None)))
for e in extra:
out.append(Divergence(file, f"{path}[{key}={e}]",
"extra-in-ours", None, e,
classify(file, f"{path}[{key}={e}]",
"extra-in-ours", None, e)))
common = sorted(a_keys & b_keys, key=str)
a_by = {x.get(key): x for x in a_sorted if isinstance(x, dict)}
b_by = {x.get(key): x for x in b_sorted if isinstance(x, dict)}
for ck in common:
diff_value(file, f"{path}[{key}={ck}]", a_by[ck], b_by[ck],
out, skip, set_keys)
return
# Sequence-field: positional.
if len(a) != len(b):
out.append(Divergence(file, path, "seq-length", len(a), len(b),
classify(file, path, "seq-length", len(a), len(b))))
n = min(len(a), len(b))
else:
n = len(a)
for i in range(n):
diff_value(file, f"{path}[{i}]", a[i], b[i], out, skip, set_keys)
return
if a != b:
out.append(Divergence(file, path, "value", a, b,
classify(file, path, "value", a, b)))
# ---------- file-level orchestration ----------
def load_json(p: Path) -> dict:
with p.open("r", encoding="utf-8") as f:
return json.load(f)
def diff_directory(canary_dir: Path, ours_dir: Path) -> tuple[list[Divergence], dict]:
files = ["cpu_state.json", "memory.json", "kernel.json", "vfs.json", "config.json"]
divergences: list[Divergence] = []
manifest_canary = load_json(canary_dir / "manifest.json") if (canary_dir / "manifest.json").exists() else {}
manifest_ours = load_json(ours_dir / "manifest.json") if (ours_dir / "manifest.json").exists() else {}
file_status = {}
for name in files:
cp = canary_dir / name
op = ours_dir / name
if not cp.exists():
divergences.append(Divergence(name, "<file>", "missing-file",
"absent", "present", "sigma-structural"))
file_status[name] = "missing-in-canary"
continue
if not op.exists():
divergences.append(Divergence(name, "<file>", "missing-file",
"present", "absent", "sigma-structural"))
file_status[name] = "missing-in-ours"
continue
ch = manifest_canary.get("files", {}).get(name)
oh = manifest_ours.get("files", {}).get(name)
if ch is not None and ch == oh:
# Verify the manifest hashes against the actual file contents
# before trusting them — a tampered file with an intact manifest
# would otherwise be silently masked.
ch_actual = hashlib.sha256(cp.read_bytes()).hexdigest()
oh_actual = hashlib.sha256(op.read_bytes()).hexdigest()
if ch_actual == ch and oh_actual == oh:
file_status[name] = "identical"
continue
# Manifest claim does not match disk — fall through to full diff
# and surface the manifest mismatch as a structural divergence.
if ch_actual != ch:
divergences.append(Divergence(
name, "<manifest>", "manifest-hash-mismatch", ch, ch_actual,
"sigma-structural"))
if oh_actual != oh:
divergences.append(Divergence(
name, "<manifest>", "manifest-hash-mismatch", oh, oh_actual,
"sigma-structural"))
a = load_json(cp)
b = load_json(op)
skip = collect_skip_set(name, a) | collect_skip_set(name, b)
diff_value(name, "", a, b, divergences, skip,
set_keys=SET_FIELDS.get(name))
file_status[name] = "diverged"
return divergences, file_status
# ---------- invariants ----------
def _canonicalize_image(image: bytes, xex_meta: dict, image_base: int) -> bytes:
"""Mask XEX import slots to 0xCD. Import patches are legitimate
engine-specific runtime overlays (record_type=0 var slots = 4 bytes,
record_type=1 thunks = 16 bytes); they break a naive byte-equality
invariant even when both engines decoded the XEX identically."""
ranges = []
for lib in xex_meta.get("import_libraries", []):
for imp in lib.get("imports", []):
addr = imp["address"]
rt = imp["record_type"]
if rt == 0:
ranges.append((addr, addr + 4))
elif rt == 1:
ranges.append((addr, addr + 16))
buf = bytearray(image)
for sva, eva in ranges:
s = sva - image_base
e = eva - image_base
if s < 0 or e > len(buf):
continue
for i in range(s, e):
buf[i] = 0xCD
return bytes(buf)
def check_invariants(
canary_dir: Path, ours_dir: Path, xex_json: Path | None = None
) -> tuple[list[tuple[str, str, str, bool]], bool]:
"""Returns (rows, stop) where each row is (name, canary_val, ours_val, ok).
`stop` is True iff any STOP-class invariant failed.
When --xex-json is provided AND both snapshots contain `image.bin`,
the image-load invariant is computed over a canonicalized buffer
(XEX import slots masked). This relaxes the original raw-bytes STOP
to the only meaningful semantic check — both engines decoded the
XEX identically — and avoids tripping on legitimate runtime import
patches (canary's 0xDEADC0DE vs ours's 0x00000000 sentinels)."""
rows = []
stop = False
try:
c_cfg = load_json(canary_dir / "config.json")
o_cfg = load_json(ours_dir / "config.json")
c_cpu = load_json(canary_dir / "cpu_state.json")
o_cpu = load_json(ours_dir / "cpu_state.json")
except FileNotFoundError as e:
return [(f"file_present:{e.filename}", "", "", False)], True
c_entry = c_cfg.get("xex_entry_point")
o_entry = o_cfg.get("xex_entry_point")
rows.append(("xex_entry_point", str(c_entry), str(o_entry), c_entry == o_entry))
if c_entry != o_entry:
stop = True
c_pc = c_cpu.get("pc")
o_pc = o_cpu.get("pc")
pc_match = c_pc == c_entry and o_pc == o_entry
rows.append((
"cpu_state.pc == xex_entry_point",
f"{c_pc} == {c_entry}",
f"{o_pc} == {o_entry}",
pc_match,
))
if not pc_match:
stop = True
c_img = c_cfg.get("image_loaded_sha256")
o_img = o_cfg.get("image_loaded_sha256")
# Original raw hash — informational. Mismatch is expected when the
# engines patch imports differently. Reported but does NOT STOP.
rows.append((
"image_loaded_sha256 (raw)",
c_img or "",
o_img or "",
c_img == o_img,
))
# Canonical hash — the real equivalence check. Requires both engines
# to have dumped image.bin (--phase-b-dump-section-content) AND a
# caller-supplied --xex-json with the import table. When unavailable
# we fall back to the raw hash as the STOP key for backward compat.
c_img_bin = canary_dir / "image.bin"
o_img_bin = ours_dir / "image.bin"
canonical_available = (
xex_json is not None
and c_img_bin.exists()
and o_img_bin.exists()
)
if canonical_available:
xex_meta = json.loads(Path(xex_json).read_text())
image_base = xex_meta.get("image_base", 0x82000000)
cbytes = c_img_bin.read_bytes()
obytes = o_img_bin.read_bytes()
c_canon = _canonicalize_image(cbytes, xex_meta, image_base)
o_canon = _canonicalize_image(obytes, xex_meta, image_base)
import hashlib as _hl
c_canon_h = _hl.sha256(c_canon).hexdigest()
o_canon_h = _hl.sha256(o_canon).hexdigest()
canon_ok = c_canon_h == o_canon_h
rows.append((
"image_canonical_sha256",
c_canon_h,
o_canon_h,
canon_ok,
))
if not canon_ok:
stop = True
else:
# No canonicalization possible — fall back to raw bytes as the
# STOP key. This preserves the original Phase B semantics.
if c_img != o_img:
stop = True
return rows, stop
# ---------- report writing ----------
def write_report(out_path: Path, canary_dir: Path, ours_dir: Path,
divergences: list[Divergence], file_status: dict,
invariants: list, stop: bool):
lines = []
lines.append("# Phase B snapshot diff")
lines.append("")
lines.append(f"- canary snapshot: `{canary_dir}`")
lines.append(f"- ours snapshot: `{ours_dir}`")
lines.append("")
lines.append("## Invariants (HARD GATE)")
lines.append("")
lines.append("| invariant | canary | ours | ok? |")
lines.append("|---|---|---|---|")
for name, cval, oval, ok in invariants:
lines.append(f"| {name} | `{cval}` | `{oval}` | {'PASS' if ok else 'FAIL'} |")
lines.append("")
if stop:
lines.append("> **STOP**: a primary equivalence invariant failed. "
"Downstream divergences are not interpretable until this is "
"resolved. Re-run with `--phase-b-dump-section-content` on both "
"engines and binary-diff the regions to localize.")
lines.append("")
lines.append("## File-level summary")
lines.append("")
lines.append("| file | status | divergence count by class |")
lines.append("|---|---|---|")
by_file_class: dict[tuple[str, str], int] = {}
for d in divergences:
by_file_class[(d.file, d.klass)] = by_file_class.get((d.file, d.klass), 0) + 1
for fname, st in file_status.items():
counts = []
for klass in ["sigma-structural", "delta-content-STOP", "delta-content",
"gamma-kernel-content", "kappa-cache",
"epsilon-host-allocator", "tau-host-timing"]:
c = by_file_class.get((fname, klass), 0)
if c:
counts.append(f"{klass}={c}")
lines.append(f"| {fname} | {st} | {' '.join(counts) if counts else ''} |")
lines.append("")
# Per-class sections.
by_class: dict[str, list[Divergence]] = {}
for d in divergences:
by_class.setdefault(d.klass, []).append(d)
priority_order = [
("sigma-structural", "σ-structural divergences (priority 1)"),
("delta-content-STOP", "δ-content STOP divergences"),
("delta-content", "δ-content divergences (priority 2)"),
("gamma-kernel-content", "γ-kernel-content divergences (priority 2)"),
("kappa-cache", "κ-cache divergences (re-run after pre-clean)"),
("epsilon-host-allocator", "ε-host-allocator (informational)"),
("tau-host-timing", "τ-host-timing (informational)"),
]
for klass, title in priority_order:
items = by_class.get(klass, [])
if not items:
continue
lines.append(f"## {title}")
lines.append("")
for d in items[:200]: # cap each section
lines.append(f"- **{d.file}** `{d.path}`: kind=`{d.kind}` "
f"canary=`{d.canary!r}` ours=`{d.ours!r}`")
if len(items) > 200:
lines.append(f"- _… {len(items) - 200} more in this class (see report.json)_")
lines.append("")
lines.append("## Phase C handoff")
lines.append("")
lines.append("Suggested attack order: σ first (structural), then γ ranked by "
"object type (Thread > Event > Semaphore > Mutex > Timer > File > "
"Other), then δ. ε and τ are catalog-only.")
out_path.write_text("\n".join(lines), encoding="utf-8")
def write_report_json(out_path: Path, divergences: list[Divergence],
file_status: dict, invariants: list, stop: bool):
obj = {
"schema_version": SCHEMA_VERSION,
"invariants": [
{"name": n, "canary": c, "ours": o, "ok": ok}
for n, c, o, ok in invariants
],
"stop": stop,
"file_status": file_status,
"divergences": [d.to_dict() for d in divergences],
}
out_path.write_text(json.dumps(obj, indent=2, sort_keys=True), encoding="utf-8")
# ---------- CLI ----------
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--canary", required=True)
ap.add_argument("--ours", required=True)
ap.add_argument("--out", default=None)
ap.add_argument("--xex-json", default=None,
help="optional xex.json metadata for canonical image-load "
"invariant (requires image.bin in both snapshot dirs)")
ap.add_argument("--validate-identical", action="store_true")
ns = ap.parse_args()
canary_dir = Path(ns.canary)
ours_dir = Path(ns.ours)
if not canary_dir.is_dir() or not ours_dir.is_dir():
print(f"both snapshot dirs must exist: {canary_dir} {ours_dir}", file=sys.stderr)
sys.exit(2)
xex_json = Path(ns.xex_json) if ns.xex_json else None
invariants, stop = check_invariants(canary_dir, ours_dir, xex_json)
divergences, file_status = diff_directory(canary_dir, ours_dir)
if ns.validate_identical:
if divergences or not all(ok for _, _, _, ok in invariants):
print("validate-identical: differences found", file=sys.stderr)
sys.exit(1)
print("validate-identical: OK")
sys.exit(0)
out_md = Path(ns.out) if ns.out else (canary_dir.parent / "report.md")
out_json = out_md.with_suffix(".json")
write_report(out_md, canary_dir, ours_dir, divergences, file_status,
invariants, stop)
write_report_json(out_json, divergences, file_status, invariants, stop)
print(f"wrote {out_md} ({len(divergences)} divergences)")
print(f"wrote {out_json}")
if stop:
sys.exit(2)
if divergences:
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()