#!/usr/bin/env python3 """Phase B state-snapshot diff tool. Reads two snapshot directories (one per engine, `/canary/` and `/ours/`) emitted by `phase_b_snapshot` at the moment immediately before the first guest PPC instruction of the XEX entry_point. Produces a markdown report (`report.md`) plus a machine-readable JSON sibling (`report.json`) classifying every observable divergence. Field-comparison rules + classification table: audit-runs/phase-b-state-equivalence/README.md Both engines' emitter source + this tool read the same rules. Usage: diff_state.py --canary /canary --ours /ours [--out report.md] diff_state.py --canary --ours --validate-identical Exit codes: 0 — no divergence (or `--validate-identical` succeeded) 1 — divergences found 2 — STOP triggered (image_loaded_sha256 / xex_entry_point / iso_sha256 mismatch — interpretation of downstream files is not valid) """ from __future__ import annotations import argparse import hashlib import json import sys from pathlib import Path from typing import Any SCHEMA_VERSION = 1 # ---------- field-comparison rules (declared up front) ---------- # Per-snapshot-file fields the diff tool always skips at the top level. SKIP_TOP_FIELDS = {"schema_version", "engine", "deterministic_skip"} # Per-file: extra fields skipped. JSON-pointer-style ("a.b.c") matched # either at top-level keys or within array-of-objects members keyed by # `handle_semantic_id` etc. SKIP_BY_FILE: dict[str, set[str]] = { "cpu_state.json": {"hw_id"}, "memory.json": set(), "kernel.json": {"raw_handle_id", "exports_registered_count"}, "vfs.json": set(), "config.json": { "build_id", "iso_path", "host_ns_at_snapshot", "wall_clock_iso8601", "cli_argv", "cvars.phase_b_snapshot_dir", }, } # `objects` etc. are sets (sort then compare); `regions`/`probes`/`gpr`/ # etc. are sequences (positional compare). Mismatches handled separately. 
SET_FIELDS: dict[str, dict[str, str]] = {
    # file -> field_name -> member key used to pair up set members
    "kernel.json": {
        "objects": "handle_semantic_id",
        "handle_name_table": "name",
    },
    "vfs.json": {"cache_root_listing": "relpath"},
    "memory.json": {"heaps": "base"},
}

# STOP-trigger fields (δ-content critical equivalence).
# Note: image_loaded_sha256 is reported but NOT a STOP trigger here. The
# raw hash mismatches when engines patch imports differently — see
# check_invariants() which evaluates `image_canonical_sha256` (computed
# from image.bin + xex.json) as the real semantic STOP key.
STOP_FIELDS = {
    ("config.json", "xex_entry_point"),
    ("config.json", "iso_sha256"),
}

# Divergence classes in report priority order, with their section titles.
_CLASS_SECTIONS = (
    ("sigma-structural", "σ-structural divergences (priority 1)"),
    ("delta-content-STOP", "δ-content STOP divergences"),
    ("delta-content", "δ-content divergences (priority 2)"),
    ("gamma-kernel-content", "γ-kernel-content divergences (priority 2)"),
    ("kappa-cache", "κ-cache divergences (re-run after pre-clean)"),
    ("epsilon-host-allocator", "ε-host-allocator (informational)"),
    ("tau-host-timing", "τ-host-timing (informational)"),
)

# Maximum number of divergences listed per class section in report.md.
_SECTION_CAP = 200


# ---------- divergence record ----------

class Divergence:
    """One observed difference between the canary and ours snapshots.

    Attributes:
        file:   snapshot file name the difference was found in.
        path:   dotted path of the differing field ("" for the whole file).
        kind:   how the two sides differ ("value", "type-mismatch", ...).
        canary: value on the canary side (None when absent).
        ours:   value on our side (None when absent).
        klass:  classification label; serialized under the key "class".
    """

    __slots__ = ("file", "path", "kind", "canary", "ours", "klass")

    def __init__(self, file: str, path: str, kind: str,
                 canary: Any, ours: Any, klass: str):
        self.file = file
        self.path = path
        self.kind = kind
        self.canary = canary
        self.ours = ours
        self.klass = klass

    def __repr__(self) -> str:
        return (f"{type(self).__name__}({self.file!r}, {self.path!r}, "
                f"{self.kind!r}, {self.canary!r}, {self.ours!r}, "
                f"{self.klass!r})")

    def to_dict(self) -> dict:
        """Return the JSON-serializable form used in report.json."""
        return {
            "file": self.file,
            "path": self.path,
            "kind": self.kind,
            "canary": self.canary,
            "ours": self.ours,
            "class": self.klass,
        }


# ---------- classification ----------

def _normalize_path(path: str) -> str:
    """Collapse every ``[...]`` selector in *path* to ``[]``.

    ``objects[3].raw_handle_id`` and ``heaps[base=4096].name`` become
    ``objects[].raw_handle_id`` and ``heaps[].name``. Selector contents are
    dropped as a unit, so a key value containing ``.`` (for example
    ``cache_root_listing[relpath=a.b].size``) cannot corrupt the result.
    Adjacent selectors (``a[1][2]``) collapse to a single ``[]``.
    """
    pieces: list[str] = []
    depth = 0
    for ch in path:
        if ch == "[":
            if depth == 0 and (not pieces or pieces[-1] != "[]"):
                pieces.append("[]")
            depth += 1
        elif ch == "]" and depth:
            depth -= 1
        elif depth == 0:
            pieces.append(ch)
    return "".join(pieces)


def classify(file: str, path: str, kind: str, canary: Any, ours: Any) -> str:
    """Return the divergence class label for a difference.

    Rules are evaluated in order and the first match wins; anything
    unmatched is "gamma-kernel-content". `canary` and `ours` are accepted
    for interface symmetry but do not influence the result.
    """
    if (file, path) in STOP_FIELDS:
        return "delta-content-STOP"
    if kind in ("set-size-mismatch", "missing-field", "extra-field",
                "seq-length"):
        return "sigma-structural"
    if path.endswith((".sha256", "_sha256")):
        return "delta-content"
    if path.startswith("objects[") and ".details." in path:
        return "gamma-kernel-content"
    if file == "vfs.json" and path.startswith("cache_root_listing"):
        return "kappa-cache"
    # Real paths carry a selector (e.g. "heaps[base=4096].name"), so the
    # rule has to be matched against the selector-free form.
    if _normalize_path(path) in ("heaps[].base", "heaps[].name"):
        return "epsilon-host-allocator"
    if path in ("host_ns_at_snapshot", "wall_clock_iso8601"):
        return "tau-host-timing"
    return "gamma-kernel-content"


# ---------- generic walker ----------

def collect_skip_set(file: str, doc: dict) -> set[str]:
    """Return the skip rules for *file*.

    Combines the global and per-file skip tables with any string entries
    of the document's own "deterministic_skip" list.
    """
    skip = set(SKIP_TOP_FIELDS) | set(SKIP_BY_FILE.get(file, set()))
    extra = doc.get("deterministic_skip")
    if isinstance(extra, list):
        skip.update(x for x in extra if isinstance(x, str))
    return skip


def is_skipped(file: str, path: str, skip: set[str]) -> bool:
    """Return True when *path* is covered by a rule in *skip*.

    A path is skipped when the rule set contains the exact path, the path
    with selectors collapsed to ``[]`` (so "objects[].raw_handle_id"
    matches "objects[3].raw_handle_id"), or just its leaf field name
    (e.g. "raw_handle_id" anywhere). `file` is unused; it is kept for
    interface compatibility.
    """
    if path in skip:
        return True
    norm = _normalize_path(path)
    if norm in skip:
        return True
    return norm.rsplit(".", 1)[-1] in skip


def _record(out: list[Divergence], file: str, path: str, kind: str,
            canary: Any, ours: Any) -> None:
    """Append a classified Divergence to *out*."""
    out.append(Divergence(file, path, kind, canary, ours,
                          classify(file, path, kind, canary, ours)))


def _diff_dicts(file: str, path: str, a: dict, b: dict,
                out: list[Divergence], skip: set[str],
                set_keys: dict[str, str] | None) -> None:
    """Compare two objects: report one-sided keys, recurse into shared ones."""
    def child(k: str) -> str:
        return f"{path}.{k}" if path else k

    a_keys = set(a)
    b_keys = set(b)
    for k in sorted(a_keys - b_keys):
        sub = child(k)
        if not is_skipped(file, sub, skip):
            _record(out, file, sub, "missing-field", a[k], None)
    for k in sorted(b_keys - a_keys):
        sub = child(k)
        if not is_skipped(file, sub, skip):
            _record(out, file, sub, "extra-field", None, b[k])
    for k in sorted(a_keys & b_keys):
        diff_value(file, child(k), a[k], b[k], out, skip, set_keys)


def _diff_keyed_lists(file: str, path: str, key: str, a: list, b: list,
                      out: list[Divergence], skip: set[str],
                      set_keys: dict[str, str] | None) -> None:
    """Compare two lists of objects as sets keyed by member field *key*.

    Members lacking *key* are keyed by None. When several members share a
    key, the last one in list order is the one compared.
    """
    a_by = {x.get(key): x for x in a}
    b_by = {x.get(key): x for x in b}
    # Keys may be of mixed types, so order them by their string form.
    for m in sorted(a_by.keys() - b_by.keys(), key=str):
        _record(out, file, f"{path}[{key}={m}]", "missing-from-ours", m, None)
    for e in sorted(b_by.keys() - a_by.keys(), key=str):
        _record(out, file, f"{path}[{key}={e}]", "extra-in-ours", None, e)
    for ck in sorted(a_by.keys() & b_by.keys(), key=str):
        diff_value(file, f"{path}[{key}={ck}]", a_by[ck], b_by[ck],
                   out, skip, set_keys)


def _diff_lists(file: str, path: str, a: list, b: list,
                out: list[Divergence], skip: set[str],
                set_keys: dict[str, str] | None) -> None:
    """Compare two arrays, as a keyed set when configured, else by position."""
    field = _normalize_path(path).rsplit(".", 1)[-1].split("[", 1)[0]
    key = (set_keys or {}).get(field)
    # Keyed comparison needs every member to be an object. Otherwise fall
    # back to positional comparison so that no member is silently ignored.
    if (key is not None
            and all(isinstance(x, dict) for x in a)
            and all(isinstance(x, dict) for x in b)):
        _diff_keyed_lists(file, path, key, a, b, out, skip, set_keys)
        return
    if len(a) != len(b):
        _record(out, file, path, "seq-length", len(a), len(b))
    # zip() stops at the shorter list; the length difference is reported
    # above.
    for i, (x, y) in enumerate(zip(a, b)):
        diff_value(file, f"{path}[{i}]", x, y, out, skip, set_keys)


def diff_value(
    file: str,
    path: str,
    a: Any,
    b: Any,
    out: list[Divergence],
    skip: set[str],
    set_keys: dict[str, str] | None = None,
) -> None:
    """Recursively compare *a* (canary) with *b* (ours), appending to *out*.

    Args:
        file: snapshot file name, used for classification.
        path: dotted path of the value being compared ("" for the root).
        a, b: decoded JSON values from the canary / ours snapshot.
        out: list that receives the Divergence records.
        skip: skip rules, see is_skipped().
        set_keys: maps a list field name to the member key by which its
            elements are paired up instead of being compared by position.
    """
    if is_skipped(file, path, skip):
        return
    # Exact type comparison on purpose: 1 vs 1.0 or True vs 1 is reported.
    if type(a) is not type(b):
        _record(out, file, path, "type-mismatch", a, b)
    elif isinstance(a, dict):
        _diff_dicts(file, path, a, b, out, skip, set_keys)
    elif isinstance(a, list):
        _diff_lists(file, path, a, b, out, skip, set_keys)
    elif a != b:
        _record(out, file, path, "value", a, b)


# ---------- file-level orchestration ----------

def load_json(p: Path) -> dict:
    """Decode the UTF-8 JSON document stored at *p*."""
    with p.open("r", encoding="utf-8") as f:
        return json.load(f)


def _sha256_file(p: Path) -> str:
    """Return the hex SHA-256 digest of the file at *p*."""
    return hashlib.sha256(p.read_bytes()).hexdigest()


def _load_manifest(snapshot_dir: Path) -> dict:
    """Load manifest.json from *snapshot_dir*, or {} when it is absent."""
    manifest = snapshot_dir / "manifest.json"
    return load_json(manifest) if manifest.exists() else {}


def diff_directory(canary_dir: Path, ours_dir: Path) -> tuple[list[Divergence], dict]:
    """Diff every known snapshot file of the two directories.

    Returns:
        (divergences, file_status) where file_status maps each file name to
        "identical" (manifest hashes equal and verified against the bytes on
        disk), "equivalent" (the full diff found no divergence),
        "diverged", "missing-in-canary" or "missing-in-ours".
    """
    files = ["cpu_state.json", "memory.json", "kernel.json", "vfs.json",
             "config.json"]
    divergences: list[Divergence] = []
    manifest_canary = _load_manifest(canary_dir)
    manifest_ours = _load_manifest(ours_dir)
    file_status: dict[str, str] = {}
    for name in files:
        cp = canary_dir / name
        op = ours_dir / name
        if not cp.exists():
            divergences.append(Divergence(
                name, "", "missing-file", "absent",
                "present" if op.exists() else "absent", "sigma-structural"))
            file_status[name] = "missing-in-canary"
            continue
        if not op.exists():
            divergences.append(Divergence(
                name, "", "missing-file", "present", "absent",
                "sigma-structural"))
            file_status[name] = "missing-in-ours"
            continue
        before = len(divergences)
        ch = manifest_canary.get("files", {}).get(name)
        oh = manifest_ours.get("files", {}).get(name)
        if ch is not None and ch == oh:
            # Verify the manifest hashes against the actual file contents
            # before trusting them — a tampered file with an intact manifest
            # would otherwise be silently masked.
            ch_actual = _sha256_file(cp)
            oh_actual = _sha256_file(op)
            if ch_actual == ch and oh_actual == oh:
                file_status[name] = "identical"
                continue
            # Manifest claim does not match disk — fall through to full diff
            # and surface the manifest mismatch as a structural divergence.
            if ch_actual != ch:
                divergences.append(Divergence(
                    name, "", "manifest-hash-mismatch", ch, ch_actual,
                    "sigma-structural"))
            if oh_actual != oh:
                divergences.append(Divergence(
                    name, "", "manifest-hash-mismatch", oh, oh_actual,
                    "sigma-structural"))
        a = load_json(cp)
        b = load_json(op)
        skip = collect_skip_set(name, a) | collect_skip_set(name, b)
        diff_value(name, "", a, b, divergences, skip,
                   set_keys=SET_FIELDS.get(name))
        # Only call the file diverged when this iteration found something.
        file_status[name] = (
            "diverged" if len(divergences) > before else "equivalent")
    return divergences, file_status


# ---------- invariants ----------

def _canonicalize_image(image: bytes, xex_meta: dict, image_base: int) -> bytes:
    """Mask XEX import slots to 0xCD.

    Import patches are legitimate engine-specific runtime overlays
    (record_type=0 var slots = 4 bytes, record_type=1 thunks = 16 bytes);
    they break a naive byte-equality invariant even when both engines
    decoded the XEX identically. Slots that do not lie entirely inside
    *image* are left untouched; other record types are ignored.
    """
    slot_size = {0: 4, 1: 16}
    buf = bytearray(image)
    for lib in xex_meta.get("import_libraries", []):
        for imp in lib.get("imports", []):
            size = slot_size.get(imp["record_type"])
            if size is None:
                continue
            start = imp["address"] - image_base
            end = start + size
            if start < 0 or end > len(buf):
                continue
            buf[start:end] = b"\xCD" * size
    return bytes(buf)


def check_invariants(
    canary_dir: Path, ours_dir: Path, xex_json: Path | None = None
) -> tuple[list[tuple[str, str, str, bool]], bool]:
    """Returns (rows, stop) where each row is (name, canary_val, ours_val, ok).

    `stop` is True iff any STOP-class invariant failed.

    When --xex-json is provided AND both snapshots contain `image.bin`,
    the image-load invariant is computed over a canonicalized buffer (XEX
    import slots masked). This relaxes the original raw-bytes STOP to the
    only meaningful semantic check — both engines decoded the XEX
    identically — and avoids tripping on legitimate runtime import patches
    (canary's 0xDEADC0DE vs ours's 0x00000000 sentinels).

    A missing config.json / cpu_state.json yields a single failing
    "file_present:<path>" row with stop=True.
    """
    rows: list[tuple[str, str, str, bool]] = []
    stop = False
    try:
        c_cfg = load_json(canary_dir / "config.json")
        o_cfg = load_json(ours_dir / "config.json")
        c_cpu = load_json(canary_dir / "cpu_state.json")
        o_cpu = load_json(ours_dir / "cpu_state.json")
    except FileNotFoundError as e:
        return [(f"file_present:{e.filename}", "", "", False)], True

    c_entry = c_cfg.get("xex_entry_point")
    o_entry = o_cfg.get("xex_entry_point")
    entry_ok = c_entry == o_entry
    rows.append(("xex_entry_point", str(c_entry), str(o_entry), entry_ok))
    if not entry_ok:
        stop = True

    c_pc = c_cpu.get("pc")
    o_pc = o_cpu.get("pc")
    pc_match = c_pc == c_entry and o_pc == o_entry
    rows.append((
        "cpu_state.pc == xex_entry_point",
        f"{c_pc} == {c_entry}",
        f"{o_pc} == {o_entry}",
        pc_match,
    ))
    if not pc_match:
        stop = True

    c_img = c_cfg.get("image_loaded_sha256")
    o_img = o_cfg.get("image_loaded_sha256")
    # Original raw hash — informational. Mismatch is expected when the
    # engines patch imports differently. Reported but does NOT STOP.
    rows.append((
        "image_loaded_sha256 (raw)",
        c_img or "",
        o_img or "",
        c_img == o_img,
    ))

    # Canonical hash — the real equivalence check. Requires both engines
    # to have dumped image.bin (--phase-b-dump-section-content) AND a
    # caller-supplied --xex-json with the import table. When unavailable
    # we fall back to the raw hash as the STOP key for backward compat.
    c_img_bin = canary_dir / "image.bin"
    o_img_bin = ours_dir / "image.bin"
    canonical_available = (
        xex_json is not None and c_img_bin.exists() and o_img_bin.exists()
    )
    if canonical_available:
        xex_meta = load_json(Path(xex_json))
        image_base = xex_meta.get("image_base", 0x82000000)
        c_canon_h = hashlib.sha256(_canonicalize_image(
            c_img_bin.read_bytes(), xex_meta, image_base)).hexdigest()
        o_canon_h = hashlib.sha256(_canonicalize_image(
            o_img_bin.read_bytes(), xex_meta, image_base)).hexdigest()
        canon_ok = c_canon_h == o_canon_h
        rows.append((
            "image_canonical_sha256",
            c_canon_h,
            o_canon_h,
            canon_ok,
        ))
        if not canon_ok:
            stop = True
    elif c_img != o_img:
        # No canonicalization possible — fall back to raw bytes as the
        # STOP key. This preserves the original Phase B semantics.
        stop = True

    return rows, stop


# ---------- report writing ----------

def write_report(out_path: Path, canary_dir: Path, ours_dir: Path,
                 divergences: list[Divergence], file_status: dict,
                 invariants: list, stop: bool) -> None:
    """Write the markdown report to *out_path* (UTF-8).

    Sections: invariant table, optional STOP banner, per-file summary,
    one section per divergence class (each capped at 200 entries), and the
    Phase C handoff note.
    """
    lines = [
        "# Phase B snapshot diff",
        "",
        f"- canary snapshot: `{canary_dir}`",
        f"- ours snapshot: `{ours_dir}`",
        "",
        "## Invariants (HARD GATE)",
        "",
        "| invariant | canary | ours | ok? |",
        "|---|---|---|---|",
    ]
    for name, cval, oval, ok in invariants:
        lines.append(
            f"| {name} | `{cval}` | `{oval}` | {'PASS' if ok else 'FAIL'} |")
    lines.append("")
    if stop:
        lines.append("> **STOP**: a primary equivalence invariant failed. "
                     "Downstream divergences are not interpretable until this is "
                     "resolved. Re-run with `--phase-b-dump-section-content` on both "
                     "engines and binary-diff the regions to localize.")
        lines.append("")

    lines.append("## File-level summary")
    lines.append("")
    lines.append("| file | status | divergence count by class |")
    lines.append("|---|---|---|")
    by_file_class: dict[tuple[str, str], int] = {}
    by_class: dict[str, list[Divergence]] = {}
    for d in divergences:
        slot = (d.file, d.klass)
        by_file_class[slot] = by_file_class.get(slot, 0) + 1
        by_class.setdefault(d.klass, []).append(d)
    for fname, st in file_status.items():
        counts = [
            f"{klass}={by_file_class[(fname, klass)]}"
            for klass, _ in _CLASS_SECTIONS
            if by_file_class.get((fname, klass))
        ]
        lines.append(
            f"| {fname} | {st} | {' '.join(counts) if counts else '—'} |")
    lines.append("")

    # Per-class sections.
    for klass, title in _CLASS_SECTIONS:
        items = by_class.get(klass, [])
        if not items:
            continue
        lines.append(f"## {title}")
        lines.append("")
        for d in items[:_SECTION_CAP]:  # cap each section
            lines.append(f"- **{d.file}** `{d.path}`: kind=`{d.kind}` "
                         f"canary=`{d.canary!r}` ours=`{d.ours!r}`")
        if len(items) > _SECTION_CAP:
            lines.append(f"- _… {len(items) - _SECTION_CAP} more in this class (see report.json)_")
        lines.append("")

    lines.append("## Phase C handoff")
    lines.append("")
    lines.append("Suggested attack order: σ first (structural), then γ ranked by "
                 "object type (Thread > Event > Semaphore > Mutex > Timer > File > "
                 "Other), then δ. ε and τ are catalog-only.")
    out_path.write_text("\n".join(lines), encoding="utf-8")


def write_report_json(out_path: Path, divergences: list[Divergence],
                      file_status: dict, invariants: list, stop: bool) -> None:
    """Write the machine-readable report (uncapped) to *out_path* as JSON."""
    obj = {
        "schema_version": SCHEMA_VERSION,
        "invariants": [
            {"name": n, "canary": c, "ours": o, "ok": ok}
            for n, c, o, ok in invariants
        ],
        "stop": stop,
        "file_status": file_status,
        "divergences": [d.to_dict() for d in divergences],
    }
    out_path.write_text(json.dumps(obj, indent=2, sort_keys=True),
                        encoding="utf-8")


# ---------- CLI ----------

def main() -> None:
    """Command-line entry point; terminates the process via sys.exit().

    Exit status: 0 no divergence, 1 divergences found, 2 STOP triggered or
    a snapshot directory is missing.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--canary", required=True)
    ap.add_argument("--ours", required=True)
    ap.add_argument("--out", default=None)
    ap.add_argument("--xex-json", default=None,
                    help="optional xex.json metadata for canonical image-load "
                         "invariant (requires image.bin in both snapshot dirs)")
    ap.add_argument("--validate-identical", action="store_true")
    ns = ap.parse_args()

    canary_dir = Path(ns.canary)
    ours_dir = Path(ns.ours)
    if not canary_dir.is_dir() or not ours_dir.is_dir():
        print(f"both snapshot dirs must exist: {canary_dir} {ours_dir}",
              file=sys.stderr)
        sys.exit(2)

    xex_json = Path(ns.xex_json) if ns.xex_json else None
    invariants, stop = check_invariants(canary_dir, ours_dir, xex_json)
    divergences, file_status = diff_directory(canary_dir, ours_dir)
    # STOP_FIELDS divergences (e.g. iso_sha256) are found by the diff, not
    # by check_invariants(); they must raise STOP as well.
    stop = stop or any(d.klass == "delta-content-STOP" for d in divergences)

    if ns.validate_identical:
        if divergences or not all(ok for _, _, _, ok in invariants):
            print("validate-identical: differences found", file=sys.stderr)
            sys.exit(1)
        print("validate-identical: OK")
        sys.exit(0)

    out_md = Path(ns.out) if ns.out else (canary_dir.parent / "report.md")
    out_json = out_md.with_suffix(".json")
    write_report(out_md, canary_dir, ours_dir, divergences, file_status,
                 invariants, stop)
    write_report_json(out_json, divergences, file_status, invariants, stop)
    print(f"wrote {out_md} ({len(divergences)} divergences)")
    print(f"wrote {out_json}")

    if stop:
        sys.exit(2)
    if divergences:
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()