diff --git a/tools/diff-events/diff_events.py b/tools/diff-events/diff_events.py new file mode 100644 index 0000000..ecc2c0b --- /dev/null +++ b/tools/diff-events/diff_events.py @@ -0,0 +1,434 @@ +#!/usr/bin/env python3 +"""Phase A event-log diff tool. + +Reads two schema-v1 JSONL event logs (one per engine) and reports the +first behavioral divergence per guest-thread. Aligns streams by +`tid_event_idx`. Field-comparison rules come straight from +`audit-runs/phase-a-diff-harness/schema-v1.md` — keep both in sync. + +Usage: + diff_events.py --canary canary.jsonl --ours ours.jsonl [--out report.md] + diff_events.py --canary a.jsonl --ours b.jsonl --validate-identical + diff_events.py --canary a.jsonl --ours b.jsonl --tid-map 6=1,7=2 +""" + +import argparse +import json +import sys +from pathlib import Path + +SCHEMA_VERSION = 1 + +# Fields the diff tool skips (engine-local or host-clock). +SKIP_TOP_FIELDS = {"engine", "host_ns", "guest_cycle", "deterministic"} +# Within a payload: skipped fields by kind (in addition to the global set). +SKIP_PAYLOAD_FIELDS_BY_KIND = { + # raw_handle_id is engine-local; the diff key is handle_semantic_id. + "handle.create": {"raw_handle_id"}, + "handle.destroy": {"raw_handle_id"}, + # wait_duration_cycles is non-deterministic (host scheduling). + "wait.end": {"wait_duration_cycles"}, +} + +# Allocator-returning kernel exports whose `kernel.return.payload.return_value` +# is a host-allocator-dependent guest VA. Canary and ours legitimately route +# allocations to different heap regions (e.g. canary `MmAllocatePhysicalMemoryEx` +# returns `0xBC220000` from `vC0000000` while ours returns `0x40105000` from +# its single user-heap region — see AUDIT-043 "ε host-allocator address-space +# divergence" and Phase B `report.md` ε-class). Comparing raw VAs would always +# diverge at the first allocator call. +# +# Canonicalization: per `(tid, export_name)` we assign a stable ordinal +# (0, 1, 2, …) to each successive `kernel.return.return_value`, replacing +# both sides' value with the sentinel string `_>` +# before payload comparison. As long as both engines call the same +# allocator the same number of times in the same order on a given thread, +# the comparison treats them as equivalent. +# +# Limitations (documented): +# * If one engine calls an allocator more times than the other, ordinals +# drift and subsequent allocator returns appear as divergences. That's +# the correct outcome — ordinal-count mismatch IS a behavioral +# divergence. +# * `payload.status` is left untouched: it's a copy of the raw VA in +# hex-string form, useful in diff context. +# * Other payload fields that happen to embed an allocator VA (e.g. a +# future `args_resolved.base_address` in a free-call) are NOT +# canonicalized — out of scope for this divergence. Extend the set +# below as new divergence classes surface. +ALLOCATOR_RETURN_FNS = frozenset( + [ + "MmAllocatePhysicalMemoryEx", + "MmAllocatePhysicalMemory", + "NtAllocateVirtualMemory", + "RtlAllocateHeap", + "MmCreateKernelStack", + ] +) + + +def canonicalize_allocator_returns(events_by_tid: dict) -> None: + """In-place: rewrite `payload.return_value` for every kernel.return whose + `payload.name` is in ALLOCATOR_RETURN_FNS, replacing the raw VA with + `_>`. Ordinals are per (tid, name) and assigned + in event order. + + Called on each engine's stream independently; because ordinals are + assigned deterministically by per-tid call order, equivalent streams + produce equivalent sentinels.""" + for tid, evs in events_by_tid.items(): + # name -> next ordinal to assign on this tid + counters: dict[str, int] = {} + for ev in evs: + if ev.get("kind") != "kernel.return": + continue + payload = ev.get("payload") or {} + name = payload.get("name") + if name not in ALLOCATOR_RETURN_FNS: + continue + ordinal = counters.get(name, 0) + counters[name] = ordinal + 1 + sentinel = f"" + payload["return_value"] = sentinel + # `payload.status` mirrors `return_value` as a hex string for + # allocator entries (xboxkrnl trampoline doesn't distinguish + # NTSTATUS from pointer-typed returns). Canonicalize together + # so they stay in lockstep. + if "status" in payload: + payload["status"] = sentinel + + +def load_events(path: Path) -> dict: + """Return {tid: [event, ...]} keyed by tid, ordered by tid_event_idx. + + Validates the schema header (first line must be schema_version=1). + """ + events_by_tid: dict[int, list[dict]] = {} + with path.open("r", encoding="utf-8") as f: + first = f.readline() + if not first: + raise SystemExit(f"{path}: empty file") + hdr = json.loads(first) + if hdr.get("kind") != "schema_version": + raise SystemExit( + f"{path}: first event is not schema_version (got {hdr.get('kind')!r})" + ) + if hdr.get("schema_version") != SCHEMA_VERSION: + raise SystemExit( + f"{path}: schema_version mismatch (expected {SCHEMA_VERSION}, got {hdr.get('schema_version')!r})" + ) + for lineno, line in enumerate(f, start=2): + line = line.rstrip("\n") + if not line: + continue + try: + ev = json.loads(line) + except json.JSONDecodeError as e: + raise SystemExit(f"{path}:{lineno}: invalid JSON ({e})") + tid = ev.get("tid") + if tid is None: + raise SystemExit(f"{path}:{lineno}: missing tid") + events_by_tid.setdefault(tid, []).append(ev) + # Ensure each per-tid list is already monotonic by tid_event_idx. + for tid, evs in events_by_tid.items(): + for i, ev in enumerate(evs): + if ev.get("tid_event_idx") != i: + # Note: the schema permits one engine to emit fewer events; we + # only validate the in-file ordering is strictly monotonic. + if i > 0 and ev["tid_event_idx"] <= evs[i - 1]["tid_event_idx"]: + raise SystemExit( + f"{path}: tid={tid} events out of order at index {i}" + ) + return events_by_tid + + +def auto_tid_map(canary_evs: dict, ours_evs: dict) -> dict[int, int]: + """Naive tid mapping: pair canary tids with ours tids by the first + kernel.call name in each stream. Documented limitation in README.""" + def first_call_name(evs: list[dict]) -> str | None: + for ev in evs: + if ev.get("kind") == "kernel.call": + return ev["payload"].get("name") + return None + + canary_by_first = {} + for tid, evs in canary_evs.items(): + name = first_call_name(evs) + if name is not None: + canary_by_first.setdefault(name, []).append(tid) + + ours_by_first = {} + for tid, evs in ours_evs.items(): + name = first_call_name(evs) + if name is not None: + ours_by_first.setdefault(name, []).append(tid) + + mapping: dict[int, int] = {} + for name, c_tids in canary_by_first.items(): + o_tids = ours_by_first.get(name, []) + for c, o in zip(sorted(c_tids), sorted(o_tids)): + mapping[c] = o + return mapping + + +def parse_tid_map_arg(s: str) -> dict[int, int]: + """Parse `--tid-map 6=1,7=2` into {6: 1, 7: 2}.""" + out: dict[int, int] = {} + for token in s.split(","): + token = token.strip() + if not token: + continue + if "=" not in token: + raise SystemExit(f"--tid-map: bad token {token!r} (expected canary=ours)") + a, b = token.split("=", 1) + out[int(a.strip(), 0)] = int(b.strip(), 0) + return out + + +def compare_payload(kind: str, p_canary: dict, p_ours: dict) -> str | None: + """Compare two payloads. Returns None if equivalent, else a short + human-readable description of the first differing field.""" + skip = SKIP_PAYLOAD_FIELDS_BY_KIND.get(kind, set()) + # Compare the union of keys excluding skipped ones, in canary's key order + # first (stable), then any ours-only fields. + keys_seen: set[str] = set() + for k in p_canary.keys(): + if k in skip: + continue + keys_seen.add(k) + vc = p_canary.get(k) + vo = p_ours.get(k) + if vc != vo: + return f"payload.{k}: canary={vc!r} ours={vo!r}" + for k in p_ours.keys(): + if k in skip or k in keys_seen: + continue + if p_ours[k] is not None: + return f"payload.{k}: canary= ours={p_ours[k]!r}" + return None + + +def compare_event(ev_canary: dict, ev_ours: dict) -> str | None: + """Compare two events. Returns None if equivalent, else a short description.""" + # Top-level comparison: kind must match. + if ev_canary.get("kind") != ev_ours.get("kind"): + return f"kind: canary={ev_canary.get('kind')!r} ours={ev_ours.get('kind')!r}" + # tid_event_idx must match (it's our diff key). + if ev_canary.get("tid_event_idx") != ev_ours.get("tid_event_idx"): + return ( + f"tid_event_idx: canary={ev_canary.get('tid_event_idx')!r} " + f"ours={ev_ours.get('tid_event_idx')!r}" + ) + # Payload comparison. + pc = ev_canary.get("payload", {}) + po = ev_ours.get("payload", {}) + diff = compare_payload(ev_canary["kind"], pc, po) + if diff: + return diff + return None + + +def render_event(ev: dict) -> str: + """One-line summary of an event for the diff report.""" + kind = ev.get("kind", "?") + idx = ev.get("tid_event_idx", "?") + payload = ev.get("payload", {}) + if kind in ("kernel.call", "kernel.return", "import.call"): + name = payload.get("name") or payload.get("ord") + return f"[{idx}] {kind} {name}" + if kind in ("handle.create", "handle.destroy"): + sid = payload.get("handle_semantic_id", "?") + return f"[{idx}] {kind} sid={sid}" + if kind in ("thread.create", "thread.exit"): + return f"[{idx}] {kind} {payload}" + if kind in ("wait.begin", "wait.end"): + return f"[{idx}] {kind} {payload}" + return f"[{idx}] {kind} {payload}" + + +def diff_one_tid( + canary_evs: list[dict], ours_evs: list[dict], canary_tid: int, ours_tid: int +) -> dict: + """Walk one mapped pair. Stop at the first divergence.""" + matched = 0 + n = min(len(canary_evs), len(ours_evs)) + pre_context: list[tuple[dict, dict]] = [] + diverged_at: int | None = None + diff_descr: str | None = None + for i in range(n): + ec = canary_evs[i] + eo = ours_evs[i] + d = compare_event(ec, eo) + if d is None: + matched += 1 + pre_context.append((ec, eo)) + if len(pre_context) > 5: + pre_context.pop(0) + continue + diverged_at = i + diff_descr = d + break + return { + "canary_tid": canary_tid, + "ours_tid": ours_tid, + "matched": matched, + "canary_total": len(canary_evs), + "ours_total": len(ours_evs), + "diverged_at": diverged_at, + "diff_descr": diff_descr, + "pre_context": pre_context, + "post_canary": canary_evs[diverged_at] if diverged_at is not None else None, + "post_ours": ours_evs[diverged_at] if diverged_at is not None else None, + "next_canary": ( + canary_evs[diverged_at + 1] + if diverged_at is not None and diverged_at + 1 < len(canary_evs) + else None + ), + "next_ours": ( + ours_evs[diverged_at + 1] + if diverged_at is not None and diverged_at + 1 < len(ours_evs) + else None + ), + } + + +def render_report(per_tid_results: list[dict]) -> str: + out: list[str] = [] + out.append("# Phase A diff report") + out.append("") + out.append("**This report is the output of Phase A's diff harness. Divergences") + out.append("shown here are INPUT for Phase B (first-divergence localization),") + out.append("not findings of Phase A.** Phase A's job is to make the harness") + out.append("itself correct, not to analyze what it surfaces.") + out.append("") + out.append("## Summary") + out.append("") + out.append("| canary_tid | ours_tid | matched | canary_total | ours_total | first_divergence_at |") + out.append("|---|---|---|---|---|---|") + for r in per_tid_results: + div = r["diverged_at"] if r["diverged_at"] is not None else "—" + out.append( + f"| {r['canary_tid']} | {r['ours_tid']} | {r['matched']} | " + f"{r['canary_total']} | {r['ours_total']} | {div} |" + ) + out.append("") + for r in per_tid_results: + out.append(f"## canary_tid={r['canary_tid']} → ours_tid={r['ours_tid']}") + out.append("") + if r["diverged_at"] is None: + out.append( + f"No divergence within the {r['matched']} compared events " + f"(canary has {r['canary_total']}, ours has {r['ours_total']})." + ) + out.append("") + continue + out.append(f"First divergence at `tid_event_idx={r['diverged_at']}`: {r['diff_descr']}") + out.append("") + out.append("**Pre-context (last 5 matching events):**") + out.append("```") + for ec, eo in r["pre_context"]: + out.append(f" canary: {render_event(ec)}") + out.append(f" ours: {render_event(eo)}") + out.append("```") + out.append("") + out.append("**Divergent event:**") + out.append("```") + out.append(f" canary: {render_event(r['post_canary'])}") + out.append(f" ours: {render_event(r['post_ours'])}") + out.append("```") + out.append("") + out.append("**Next event after the divergence (if any):**") + out.append("```") + if r["next_canary"]: + out.append(f" canary: {render_event(r['next_canary'])}") + else: + out.append(" canary: ") + if r["next_ours"]: + out.append(f" ours: {render_event(r['next_ours'])}") + else: + out.append(" ours: ") + out.append("```") + out.append("") + out.append("**Raw events (JSON):**") + out.append("```json") + out.append(json.dumps(r["post_canary"], sort_keys=True)) + out.append(json.dumps(r["post_ours"], sort_keys=True)) + out.append("```") + out.append("") + return "\n".join(out) + + +def main() -> int: + ap = argparse.ArgumentParser(description="Phase A event-log diff tool") + ap.add_argument("--canary", required=True, type=Path) + ap.add_argument("--ours", required=True, type=Path) + ap.add_argument("--out", type=Path, help="Write markdown report here (else stdout)") + ap.add_argument( + "--tid-map", + type=str, + help="Manual tid mapping like '6=1,7=2'. Overrides auto-mapping.", + ) + ap.add_argument( + "--validate-identical", + action="store_true", + help="Exit non-zero if any mapped tid pair has any divergence. " + "Used by gate-4 negative-test and by self-diff smoke tests.", + ) + ap.add_argument( + "--no-canonicalize-allocators", + action="store_true", + help="Disable per-tid ordinal canonicalization of allocator return " + "values (default: enabled). See ALLOCATOR_RETURN_FNS for the " + "covered set. Disabling reproduces the raw-VA comparison.", + ) + args = ap.parse_args() + + canary_evs = load_events(args.canary) + ours_evs = load_events(args.ours) + + if not args.no_canonicalize_allocators: + canonicalize_allocator_returns(canary_evs) + canonicalize_allocator_returns(ours_evs) + + if args.tid_map: + tid_map = parse_tid_map_arg(args.tid_map) + else: + tid_map = auto_tid_map(canary_evs, ours_evs) + + if not tid_map: + sys.stderr.write( + "no tid mapping (auto-mapping found no shared first-kernel-call). " + "Pass --tid-map manually.\n" + ) + return 2 + + per_tid: list[dict] = [] + for c_tid, o_tid in sorted(tid_map.items()): + if c_tid not in canary_evs: + sys.stderr.write(f"warn: canary tid {c_tid} not in stream; skipping\n") + continue + if o_tid not in ours_evs: + sys.stderr.write(f"warn: ours tid {o_tid} not in stream; skipping\n") + continue + per_tid.append(diff_one_tid(canary_evs[c_tid], ours_evs[o_tid], c_tid, o_tid)) + + report = render_report(per_tid) + if args.out: + args.out.write_text(report, encoding="utf-8") + sys.stderr.write(f"diff report written to {args.out}\n") + else: + sys.stdout.write(report) + + if args.validate_identical: + for r in per_tid: + if r["diverged_at"] is not None: + sys.stderr.write( + f"validate-identical: divergence in canary_tid={r['canary_tid']} " + f"at tid_event_idx={r['diverged_at']} ({r['diff_descr']})\n" + ) + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main())