Files
xenia-rs/tools/diff-events/diff_events.py
MechaCat02 ad45873a1b ITERATE-2.V: scheduler priority aging closes 18-day AUDIT-049 wedge
Priority aging in xenia-cpu/scheduler.rs:pick_runnable
(effective_priority = base + age_bonus(now_round - last_run_round),
capped at +31, AGING_ROUNDS_PER_BONUS=1). Strict-priority was parking
priority=0 threads behind CPU-bound priority=15 audio mixer
(sub_824D1328 guest spinwait at PC=0x824d1404 on CPU5). Aging
eventually picks the starved thread, breaking the producer-consumer
cycle that caused 5-tid wedge at PC=0x824ac578 since AUDIT-049 (10 May).

Cascade observed: tid=13 clean exit; events 121K -> 13M (107x); last
host_ns 767ms -> 51,011ms (66x); 8 new threads spawn; VdSwap 1 -> 2.

Complete two-day iterate sequence (2026-05-27 -> 2026-05-28):
- 2.F: VdSwap drain timeout 900ms -> 1ms (xenia-gpu/handle.rs); 876x
       perf win on VdSwap kernel callback
- 2.H: vA0000000 physical heap bucket added (state.rs, exports.rs);
       ctx_ptrs now in 0xA0000000-0xBFFFFFFF range matching canary
- 2.L: Phase-A diff harness categorized [return_value mismatch],
       [status mismatch], [args_resolved.path mismatch] tags
       (tools/diff-events/diff_events.py); closes reading-error #41
       (silent test-harness state leak invalidating trace diffs)
- 2.M: always-on exit-thread-state.json sibling to Phase-A JSONL
       (event_log.rs + xenia-app/main.rs); closes reading-error #42
       (Phase-A blind to blocked-forever waits)
- 2.Q: signal.match kernel instrumentation in NtSetEvent /
       NtReleaseSemaphore / KeSetEvent / KeReleaseSemaphore
       (exports.rs); emits target_handle + waiter_count + waiter_tids
- 2.T: wake.requested kernel instrumentation in wake_eligible_waiters
       (exports.rs); emits target_tid + transition + new_state
- 2.V: scheduler priority aging (xenia-cpu/scheduler.rs) [keystone]

Plus accumulated WIP from earlier May (contention_manifest,
phase_b_snapshot, xam/xaudio enhancements, analysis db, xex loader,
xenia-app main loop, etc.). Audit-runs/ artifacts remain untracked
per project convention.

Tests: 300 xenia-cpu / 227 xenia-kernel / 5 xenia-app / 19 xenia-path
/ 30+ smaller suites -- all PASS, 0 regressions. Determinism preserved
(2x cold runs bit-identical at 13,003,881 events post-2.V).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 07:27:26 +02:00

1377 lines
60 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Phase A event-log diff tool.
Reads two schema-v1 JSONL event logs (one per engine) and reports the
first behavioral divergence per guest-thread. Aligns streams by
`tid_event_idx`. Field-comparison rules come straight from
`audit-runs/phase-a-diff-harness/schema-v1.md` — keep both in sync.
Usage:
diff_events.py --canary canary.jsonl --ours ours.jsonl [--out report.md]
diff_events.py --canary a.jsonl --ours b.jsonl --validate-identical
diff_events.py --canary a.jsonl --ours b.jsonl --tid-map 6=1,7=2
"""
import argparse
import json
import sys
from pathlib import Path
SCHEMA_VERSION = 1
# Phase C+18 — Shared-global SID marker. Process-global dispatcher
# objects (canary `XObject::GetNativeObject` lazy-wrap / ours
# `ensure_dispatcher_object` first-touch synthesis) use this constant as
# the `create_site_pc` input to the FNV-1a SID computation so the SID is
# scheduling-invariant — keyed on `(marker, 0, pointer, object_type)`.
# See `event_log.rs::SHARED_GLOBAL_SID_MARKER` / `event_log.h::kSharedGlobalSidMarker`
# and schema-v1.md §"Shared-global SIDs". Both engines must use this
# exact value.
SHARED_GLOBAL_SID_MARKER = 0xC01AB005
def _fnv1a_64(data: bytes) -> int:
"""FNV-1a 64-bit. Identical to the engines' `semantic_id`/`ComputeSemanticId`
inner loops. Inlined here so the diff tool has no external deps."""
h = 0xCBF29CE484222325
for b in data:
h ^= b
h = (h * 0x100000001B3) & 0xFFFFFFFFFFFFFFFF
return h
def shared_global_sid(pointer: int, object_type: int) -> str:
"""Compute the deterministic shared-global SID for a process-global
dispatcher (engine-agnostic). Inputs:
create_site_pc = SHARED_GLOBAL_SID_MARKER (4 bytes LE)
creating_tid = 0 (4 bytes LE)
tid_event_idx = pointer as u64 (8 bytes LE)
object_type = object_type (4 bytes LE)
Returns the lowercase 16-hex-char SID string (schema-v1 format)."""
buf = bytearray(4 + 4 + 8 + 4)
buf[0:4] = SHARED_GLOBAL_SID_MARKER.to_bytes(4, "little")
buf[4:8] = (0).to_bytes(4, "little")
buf[8:16] = (pointer & 0xFFFFFFFFFFFFFFFF).to_bytes(8, "little")
buf[16:20] = (object_type & 0xFFFFFFFF).to_bytes(4, "little")
return f"{_fnv1a_64(bytes(buf)):016x}"
def is_shared_global_handle_create(ev: dict) -> bool:
"""Return True if `ev` is a `handle.create` whose SID matches the
deterministic shared-global recipe over the event's own
`(raw_handle_id, object_type)`. Self-consistent — independent of
cross-engine context.
Phase C+18 emits these via `ensure_dispatcher_object` (ours) and
`XObject::GetNativeObject` (canary). Regular per-thread
`handle.create` events (file/thread/etc., allocated via
`alloc_handle_for`/`AddHandle`) use the per-(tid, idx) SID recipe
and will NOT match this check — they keep their strict per-tid
sequence alignment in the diff.
Asymmetry note (Phase C+21): in **ours** the `raw_handle_id` is the
guest dispatcher pointer itself (so the recipe recomputes from the
payload directly). In **canary**, `EmitHandleCreateSharedGlobal`
hashes the dispatcher's guest VA but stashes
`object->handle()` (the handle-table slot, e.g. `0xf8000044`) as
`raw_handle_id`. So canary's shared-global handle.create events are
NOT self-recognizable by this recipe check. The diff tool covers
canary's side via the cross-tid usage heuristic in
`collect_shared_global_sids` — any SID that appears across multiple
tids in either engine is also treated as shared-global.
"""
if ev.get("kind") != "handle.create":
return False
p = ev.get("payload") or {}
sid = p.get("handle_semantic_id")
if not isinstance(sid, str):
return False
raw = p.get("raw_handle_id")
if not isinstance(raw, str):
return False
try:
pointer = int(raw, 16)
except ValueError:
return False
obj_type = p.get("object_type")
if not isinstance(obj_type, int):
return False
return sid == shared_global_sid(pointer, obj_type)
def collect_shared_global_sids(
canary_by_tid: dict, ours_by_tid: dict
) -> set[str]:
"""Collect the set of SIDs that are scheduling-invariant
"shared-global" — i.e. process-global dispatchers whose creation
order and per-tid attribution are timing-dependent. The diff tool
treats these SIDs as floating across tids (cross-engine match by
SID alone, regardless of which tid happens to be the first
toucher) — see Phase C+18 (`handle.create`) and Phase C+21
(`wait.begin`).
The set is the UNION of:
1. Recipe-matching `handle.create` events: any `handle.create`
whose payload SID equals `shared_global_sid(raw_handle_id,
object_type)`. This catches ours's `ensure_dispatcher_object`
output directly (where `raw_handle_id == ptr`). It does NOT
catch canary's `EmitHandleCreateSharedGlobal` output because
canary stashes the handle-table slot id (`0xf8xxxxxx`) as
`raw_handle_id` rather than the dispatcher VA that was hashed.
2. Cross-tid usage heuristic: any SID that is referenced by
`handle.create` or `wait.begin` events on **two or more
distinct guest tids** in EITHER engine. Process-global
dispatchers are touched by multiple guest threads during boot
(XAudio voice-volume semaphores, shared CSes, shared KEVENTs);
per-thread SIDs by construction stay on the single creating
tid (their hash inputs include `creating_tid`). So multi-tid
SID usage is a strong shared-global signal that survives
canary's raw_handle_id asymmetry.
Heuristic risk note: a per-thread SID referenced by another tid
via a wait would also appear cross-tid; this happens
legitimately (one thread creates, another waits) and would be
flagged as "shared-global" here. That's acceptable for the diff
tool's purpose — the floating-absorb only kicks in on KIND
MISMATCH, so true per-thread chains that match strictly on both
sides will still align correctly. The heuristic only loosens
things when one side is missing a `handle.create` or
`wait.begin` event for a cross-tid-used SID — which is exactly
the scheduling-jitter window the C+21 fix targets.
"""
sids: set[str] = set()
# Pass 1: recipe-matching handle.create events.
for evs_by_tid in (canary_by_tid, ours_by_tid):
for evs in evs_by_tid.values():
for ev in evs:
if is_shared_global_handle_create(ev):
s = _ev_handle_create_sid(ev)
if s:
sids.add(s)
# Pass 2: cross-tid usage heuristic.
for evs_by_tid in (canary_by_tid, ours_by_tid):
sid_to_tids: dict[str, set[int]] = {}
for tid, evs in evs_by_tid.items():
for ev in evs:
k = ev.get("kind")
p = ev.get("payload") or {}
if k == "handle.create":
s = p.get("handle_semantic_id")
if isinstance(s, str):
sid_to_tids.setdefault(s, set()).add(tid)
elif k == "wait.begin":
handles = p.get("handles_semantic_ids") or []
for s in handles:
if isinstance(s, str):
sid_to_tids.setdefault(s, set()).add(tid)
for s, tids in sid_to_tids.items():
if len(tids) >= 2:
sids.add(s)
return sids
def is_shared_global_wait_begin(ev: dict, shared_sids: set[str]) -> bool:
"""Return True if `ev` is a `wait.begin` referencing AT LEAST ONE
shared-global SID (per Phase C+21). For `wait_type=all` events,
any single shared-global handle in the set is enough to classify
the wait.begin as floating — the wait itself is a process-global
dispatcher contention point that may or may not actually block
depending on host scheduling.
See `collect_shared_global_sids` for what populates
`shared_sids`.
"""
if ev.get("kind") != "wait.begin":
return False
if not shared_sids:
return False
p = ev.get("payload") or {}
handles = p.get("handles_semantic_ids") or []
for sid in handles:
if isinstance(sid, str) and sid in shared_sids:
return True
return False
# Phase D Stage 4 — event kinds that are emitted by only one engine
# under default config or carry engine-local payloads that can't be
# cross-engine compared. The diff tool advances its per-tid pointer
# past these events on EITHER side without alignment, so they
# never participate in matched-prefix.
#
# `contention.observed` (v1.4): canary emits when its
# RtlEnterCriticalSection spin loop is exhausted (cvar-gated,
# `kernel_emit_contention=true`). Ours emits from
# `rtl_enter_critical_section` when the Stage-3 manifest fires (env
# var `XENIA_CONTENTION_MANIFEST_PATH`). The payload's `cs_ptr` and
# `site_sid` use each engine's native guest VA, which differ under
# AUDIT-043 allocator ε; comparing payloads would always diverge.
# The KIND itself (one event consuming one per-tid idx slot at the
# same logical call site) is what matters, so we drop the entire
# event from the diff.
#
# `sema.release` (v1.6 — added in AUDIT-069 Session 6, the
# AUDIT-070 bridge): symmetric semaphore-release event. Both
# engines MAY emit (cvar-gated default-off in canary; runtime-flag
# gated default-off in ours). Cadence is host-scheduler-driven on
# the work-semaphore (per AUDIT-069 H3: ours under-produces by ~80%
# on the cache-thread); strict positional alignment would always
# trip on AUDIT-069's known divergence. The kind is `ENGINE_LOCAL`
# at the alignment layer — but we surface per-engine counts and
# per-handle-SID counts in the summary so future regressions are
# diff-visible. See schema-v1.md §"sema.release (v1.6)".
ENGINE_LOCAL_KINDS = {"contention.observed", "sema.release", "signal.match", "wake.requested"}
# Kinds whose total counts are surfaced in the report summary even
# though they don't participate in matched-prefix alignment.
COUNTED_ENGINE_LOCAL_KINDS = {"sema.release"}
# Fields the diff tool skips (engine-local or host-clock).
SKIP_TOP_FIELDS = {"engine", "host_ns", "guest_cycle", "deterministic"}
# Within a payload: skipped fields by kind (in addition to the global set).
SKIP_PAYLOAD_FIELDS_BY_KIND = {
# raw_handle_id is engine-local; the diff key is handle_semantic_id.
# Phase C+15-α: handle_semantic_id is computed via FNV-1a over
# `(create_site_pc, creating_tid, tid_event_idx_at_creation, object_type)`.
# `creating_tid` differs cross-engine (canary tid=6 maps to ours tid=1
# etc.), so the SID is engine-local for cross-engine comparison.
# Skip the SID field at the diff layer; rely on tid_event_idx +
# object_type + payload fields to align. Same rationale as raw_handle_id.
# `parent_tid` likewise differs cross-engine.
"handle.create": {"raw_handle_id", "handle_semantic_id"},
"handle.destroy": {"raw_handle_id", "handle_semantic_id"},
"thread.create": {"handle_semantic_id", "parent_tid"},
"wait.begin": {"handles_semantic_ids"},
# wait_duration_cycles is non-deterministic (host scheduling).
"wait.end": {"wait_duration_cycles", "woken_by_semantic_id"},
}
# Allocator-returning kernel exports whose `kernel.return.payload.return_value`
# is a host-allocator-dependent guest VA. Canary and ours legitimately route
# allocations to different heap regions (e.g. canary `MmAllocatePhysicalMemoryEx`
# returns `0xBC220000` from `vC0000000` while ours returns `0x40105000` from
# its single user-heap region — see AUDIT-043 "ε host-allocator address-space
# divergence" and Phase B `report.md` ε-class). Comparing raw VAs would always
# diverge at the first allocator call.
#
# Canonicalization: per `(tid, export_name)` we assign a stable ordinal
# (0, 1, 2, …) to each successive `kernel.return.return_value`, replacing
# both sides' value with the sentinel string `<ALLOC_<NAME>_<ORDINAL>>`
# before payload comparison. As long as both engines call the same
# allocator the same number of times in the same order on a given thread,
# the comparison treats them as equivalent.
#
# Limitations (documented):
# * If one engine calls an allocator more times than the other, ordinals
# drift and subsequent allocator returns appear as divergences. That's
# the correct outcome — ordinal-count mismatch IS a behavioral
# divergence.
# * `payload.status` is left untouched: it's a copy of the raw VA in
# hex-string form, useful in diff context.
# * Other payload fields that happen to embed an allocator VA (e.g. a
# future `args_resolved.base_address` in a free-call) are NOT
# canonicalized — out of scope for this divergence. Extend the set
# below as new divergence classes surface.
ALLOCATOR_RETURN_FNS = frozenset(
[
"MmAllocatePhysicalMemoryEx",
"MmAllocatePhysicalMemory",
"NtAllocateVirtualMemory",
"RtlAllocateHeap",
"MmCreateKernelStack",
# Phase C+3: `RtlImageXexHeaderField` returns either a plain
# inline value (key low byte = 0x00) OR a guest VA inside the
# in-guest XEX header copy (key low byte = 0x01 or "else"). The
# latter is host-allocator-dependent (canary's `guest_xex_header_`
# via `Memory::SystemHeapAlloc` lands in the `0x30xxxxxx` virtual-
# heap region; ours's `KernelState::heap_alloc` cursor lands in
# `0x4xxxxxxx`). Canonicalize the whole class — for inline-value
# keys the sentinel will still match per (tid, name) ordinal
# provided both engines emit the same call sequence in the same
# order. See Phase C+3 fix for the implementation parity.
"RtlImageXexHeaderField",
# Phase D D-extension follow-up: `XamNotifyCreateListener`
# returns a 64-bit identity that differs across engines —
# canary returns the sign-extended host kernel-space pointer
# of the `XamNotifyListener` object (e.g. 0xFFFFFFFFFE8E110C),
# ours returns the allocated guest handle id (e.g. 0x10E0).
# Both are stable per-(tid, name) ordinal so canonicalization
# via `<ALLOC_XamNotifyCreateListener_N>` lines them up. Surface
# the underlying semantic divergence as the listener handle's
# subsequent use (`XamNotifyGetNext` etc.) rather than at the
# creation call.
"XamNotifyCreateListener",
# Phase C+25: `MmGetPhysicalAddress` is a VA→PA translator whose
# return depends on which heap region the input VA lives in. This
# is the downstream consequence of C+2's deferred Path β (canary
# has three physical heaps at vA0/vC0/vE0 routed by page size,
# ours has a single unified heap_cursor starting at 0x40000000).
# Concretely: at C+25 idx 105,112 canary returned 0x150B0000
# (input 0xF50AF000 in `vE0000000` heap: addr - 0xE0000000 + 0x1000
# per `PhysicalHeap::GetPhysicalAddress`, see `memory.cc:2317`),
# while ours returned 0x0ADCF000 (input ~0x4ADCF000 in unified heap,
# masked via `& 0x1FFF_FFFF` per `exports.rs:985`). Both engines'
# translations are SELF-CONSISTENT — game code passes the PA
# opaquely to GPU (`VdInitializeRingBuffer` is the very next call)
# and the GPU translates it back to a host pointer using the same
# engine's heap map. Per-(tid,name) ordinal sentinel preserves the
# opaque-pass-through semantics while exposing actual divergences
# (e.g. game-side arithmetic on the PA, or a translation-count
# mismatch). Lifting the engine-side three-physical-heaps memory
# model is the C+2 Path β deferral, out of scope for C+25 (see
# `project_phase_c2_MmAllocatePhysicalMemoryEx_2026_05_13.md`).
"MmGetPhysicalAddress",
]
)
def canonicalize_allocator_returns(events_by_tid: dict) -> None:
"""In-place: rewrite `payload.return_value` for every kernel.return whose
`payload.name` is in ALLOCATOR_RETURN_FNS, replacing the raw VA with
`<ALLOC_<NAME>_<ORDINAL>>`. Ordinals are per (tid, name) and assigned
in event order.
Called on each engine's stream independently; because ordinals are
assigned deterministically by per-tid call order, equivalent streams
produce equivalent sentinels."""
for tid, evs in events_by_tid.items():
# name -> next ordinal to assign on this tid
counters: dict[str, int] = {}
for ev in evs:
if ev.get("kind") != "kernel.return":
continue
payload = ev.get("payload") or {}
name = payload.get("name")
if name not in ALLOCATOR_RETURN_FNS:
continue
ordinal = counters.get(name, 0)
counters[name] = ordinal + 1
sentinel = f"<ALLOC_{name}_{ordinal}>"
payload["return_value"] = sentinel
# `payload.status` mirrors `return_value` as a hex string for
# allocator entries (xboxkrnl trampoline doesn't distinguish
# NTSTATUS from pointer-typed returns). Canonicalize together
# so they stay in lockstep.
if "status" in payload:
payload["status"] = sentinel
# Phase C+22 (v1.7) — payload-field canonicalization for host-heap-derived
# guest addresses that appear NOT as `kernel.return.return_value` but inside
# typed event payloads. These are the "second-class" allocator returns: a
# kernel call (e.g. ExCreateThread) allocates a TLS/context block via the
# host allocator, then the block's guest VA shows up in a *typed* downstream
# event (e.g. `thread.create.ctx_ptr`). The VA lives in different host-heap
# regions across engines (canary's `0xBCxxxxxx` BC physical heap vs ours's
# `0x4xxxxxxx` unified user heap — same AUDIT-043 ε class as C+2's
# `MmAllocatePhysicalMemoryEx`), so strict comparison always diverges.
#
# Canonicalization (mirrors `canonicalize_allocator_returns`): per
# `(tid, kind, field)` we assign a stable ordinal in per-tid event order and
# rewrite the field's hex-string value with `<HOSTHEAP_<KIND>_<FIELD>_<ORDINAL>>`.
# As long as both engines emit the same number of typed events on a given
# tid in the same order, the comparison treats them as equivalent.
#
# Map shape: kind -> tuple of payload-field names. The fields MUST hold a
# `0x`-prefixed hex string (guest VA); other types are left alone.
#
# Limitations (documented):
# * Ordinal-count mismatch IS a behavioral divergence (per-tid call-order
# skew → drifting sentinels → divergence reported at the first skewed
# event). Same contract as `ALLOCATOR_RETURN_FNS`.
# * The field is rewritten in-place. The pre-canonicalization raw VA is
# still preserved in the on-disk JSONL (we operate on the in-memory
# per-engine event dicts loaded by `load_events`).
# * Strictly compared fields next to the canonicalized one (e.g.
# `entry_pc`, `priority`, `affinity`, `stack_size`, `suspended` on a
# `thread.create`) are UNTOUCHED — they are game-visible attributes
# that must match bit-identically across engines.
#
# Empirical first surface (C+22, cold-vs-cold idx 105,128 — Sylpheed
# ExCreateThread for the audio-stack worker entry `0x824cd458`):
# canary: ctx_ptr = "0xbe56bb3c" (BC physical heap)
# ours: ctx_ptr = "0x42453b3c" (unified user heap)
# Both engines correctly allocate; both pass the resulting VA opaquely
# into the new guest thread's r3 register; the diff tool's only sensible
# behavior is to canonicalize and align by ordinal.
HOST_HEAP_PAYLOAD_FIELDS_BY_KIND: dict[str, tuple[str, ...]] = {
"thread.create": ("ctx_ptr",),
}
def canonicalize_host_heap_payload_fields(events_by_tid: dict) -> None:
"""In-place: rewrite host-heap-derived guest VA fields per
HOST_HEAP_PAYLOAD_FIELDS_BY_KIND.
For each event whose `kind` is a key in the map, replace each
listed payload field's value (expected `0x`-prefixed hex string)
with the per-(tid, kind, field) ordinal sentinel
`<HOSTHEAP_<KIND>_<FIELD>_<ORDINAL>>`. Non-string values and
missing fields are left untouched (defensive — pre-C+22 logs that
happen to omit the field for some reason still parse).
Per-tid ordinals are independent across (kind, field) pairs, so
e.g. `thread.create.ctx_ptr` ordinals do not interfere with a
(future) `thread.create.other_ptr` canonicalization.
Called once per engine's stream independently; equivalent
per-tid call sequences produce equivalent sentinels."""
for _tid, evs in events_by_tid.items():
# (kind, field) -> next ordinal on this tid
counters: dict[tuple[str, str], int] = {}
for ev in evs:
kind = ev.get("kind")
fields = HOST_HEAP_PAYLOAD_FIELDS_BY_KIND.get(kind)
if not fields:
continue
payload = ev.get("payload") or {}
for field in fields:
if field not in payload:
continue
value = payload[field]
# Defensive: only canonicalize string-typed VAs. A field
# that's already a sentinel (e.g. from a previous call
# in test-mode) or a non-string is left alone.
if not isinstance(value, str):
continue
key = (kind, field)
ordinal = counters.get(key, 0)
counters[key] = ordinal + 1
sentinel = (
f"<HOSTHEAP_{kind}_{field}_{ordinal}>"
)
payload[field] = sentinel
def load_events(path: Path) -> dict:
"""Return {tid: [event, ...]} keyed by tid, ordered by tid_event_idx.
Validates the schema header (first line must be schema_version=1).
"""
events_by_tid: dict[int, list[dict]] = {}
with path.open("r", encoding="utf-8") as f:
first = f.readline()
if not first:
raise SystemExit(f"{path}: empty file")
hdr = json.loads(first)
if hdr.get("kind") != "schema_version":
raise SystemExit(
f"{path}: first event is not schema_version (got {hdr.get('kind')!r})"
)
if hdr.get("schema_version") != SCHEMA_VERSION:
raise SystemExit(
f"{path}: schema_version mismatch (expected {SCHEMA_VERSION}, got {hdr.get('schema_version')!r})"
)
for lineno, line in enumerate(f, start=2):
line = line.rstrip("\n")
if not line:
continue
try:
ev = json.loads(line)
except json.JSONDecodeError as e:
raise SystemExit(f"{path}:{lineno}: invalid JSON ({e})")
tid = ev.get("tid")
if tid is None:
raise SystemExit(f"{path}:{lineno}: missing tid")
events_by_tid.setdefault(tid, []).append(ev)
# Ensure each per-tid list is already monotonic by tid_event_idx.
for tid, evs in events_by_tid.items():
for i, ev in enumerate(evs):
if ev.get("tid_event_idx") != i:
# Note: the schema permits one engine to emit fewer events; we
# only validate the in-file ordering is strictly monotonic.
if i > 0 and ev["tid_event_idx"] <= evs[i - 1]["tid_event_idx"]:
raise SystemExit(
f"{path}: tid={tid} events out of order at index {i}"
)
return events_by_tid
def auto_tid_map(canary_evs: dict, ours_evs: dict) -> dict[int, int]:
"""Naive tid mapping: pair canary tids with ours tids by the first
kernel.call name in each stream. Documented limitation in README."""
def first_call_name(evs: list[dict]) -> str | None:
for ev in evs:
if ev.get("kind") == "kernel.call":
return ev["payload"].get("name")
return None
canary_by_first = {}
for tid, evs in canary_evs.items():
name = first_call_name(evs)
if name is not None:
canary_by_first.setdefault(name, []).append(tid)
ours_by_first = {}
for tid, evs in ours_evs.items():
name = first_call_name(evs)
if name is not None:
ours_by_first.setdefault(name, []).append(tid)
mapping: dict[int, int] = {}
for name, c_tids in canary_by_first.items():
o_tids = ours_by_first.get(name, [])
for c, o in zip(sorted(c_tids), sorted(o_tids)):
mapping[c] = o
return mapping
def parse_tid_map_arg(s: str) -> dict[int, int]:
"""Parse `--tid-map 6=1,7=2` into {6: 1, 7: 2}."""
out: dict[int, int] = {}
for token in s.split(","):
token = token.strip()
if not token:
continue
if "=" not in token:
raise SystemExit(f"--tid-map: bad token {token!r} (expected canary=ours)")
a, b = token.split("=", 1)
out[int(a.strip(), 0)] = int(b.strip(), 0)
return out
# Iterate 2.L (2026-05-28) — payload fields whose mismatch is given
# diagnostic PRIORITY on `kernel.return` events. The generic per-field
# walk would still surface these eventually, but reading-error #41
# showed that mixing them in among allocator/SID/etc. noise risked
# burying the most actionable signal (return-value inversion =
# state-parity bug). Surfacing them first + tagging the category
# explicitly makes future iterates spot the class at a glance.
#
# Skip rule honored: if `return_value` is absent on either side we
# don't synthesize a divergence — the generic walk's missing-key
# logic still applies. Allocator returns are already canonicalized
# upstream via `ALLOCATOR_RETURN_FNS`, so they cannot trip this
# check (the sentinels match on both sides by construction).
_KERNEL_RETURN_PRIORITY_FIELDS = ("return_value", "status")
def _format_return_value_diff(
name: str | None, field: str, vc, vo
) -> str:
"""One-line, category-tagged diff for a kernel.return payload
field. Includes the function name and both raw values. Used by
`compare_payload` for the kernel.return priority fields. The
`[return_value mismatch]` / `[status mismatch]` tag is intended
to be greppable across diff reports."""
name_part = f" name={name}" if name else ""
return (
f"[{field} mismatch] kernel.return{name_part}: "
f"canary={vc!r} ours={vo!r}"
)
def _format_kernel_call_arg_diff(
name: str | None, sub: str, key: str, vc, vo
) -> str:
"""Category-tagged diff for a kernel.call payload sub-dict mismatch
(`args` or `args_resolved`). Surfaces the function name + the
nested key so a diff like `args_resolved.path` mismatch is
trivially greppable. Iterate 2.L extension."""
name_part = f" name={name}" if name else ""
return (
f"[{sub}.{key} mismatch] kernel.call{name_part}: "
f"canary={vc!r} ours={vo!r}"
)
def compare_payload(kind: str, p_canary: dict, p_ours: dict) -> str | None:
"""Compare two payloads. Returns None if equivalent, else a short
human-readable description of the first differing field.
Iterate 2.L (2026-05-28): on `kernel.return` events, the
`return_value` and `status` fields are checked FIRST and emit a
category-tagged diff string. Closes reading-error #41 (silent
test-harness state leak invalidating trace diffs) by surfacing
state-parity inversions (e.g. cache-probe SUCCESS vs NO_SUCH_FILE)
with a greppable `[return_value mismatch]` prefix instead of
burying them in a generic `payload.X` walk. Same for kernel.call
`args` / `args_resolved` sub-dicts: nested mismatches get
`[args_resolved.path mismatch]` etc. so the class is visible at a
glance.
Skip rule: a priority field is checked only when present on BOTH
sides; one-sided absence falls through to the generic walk's
missing-key path (which preserves the pre-2.L behavior)."""
skip = SKIP_PAYLOAD_FIELDS_BY_KIND.get(kind, set())
# Iterate 2.L priority pass: kernel.return return_value/status first.
if kind == "kernel.return":
name = p_canary.get("name") or p_ours.get("name")
for field in _KERNEL_RETURN_PRIORITY_FIELDS:
if field in skip:
continue
if field not in p_canary or field not in p_ours:
# Schema gap — defer to the generic walk's missing-key path.
continue
vc = p_canary[field]
vo = p_ours[field]
if vc != vo:
return _format_return_value_diff(name, field, vc, vo)
# Iterate 2.L priority pass: kernel.call args / args_resolved sub-dict
# mismatches surface category-tagged so an `args_resolved.path`
# divergence (e.g. canonical-path drift) doesn't read as a generic
# `payload.args_resolved: canary={...} ours={...}` blob.
if kind == "kernel.call":
name = p_canary.get("name") or p_ours.get("name")
for sub in ("args", "args_resolved"):
if sub in skip:
continue
sc = p_canary.get(sub)
so = p_ours.get(sub)
if not isinstance(sc, dict) or not isinstance(so, dict):
continue
if sc == so:
continue
# Walk sub-dict in canary key order; first differing key wins.
for k in sc.keys():
if k not in so or sc[k] != so[k]:
return _format_kernel_call_arg_diff(
name, sub, k, sc.get(k), so.get(k)
)
for k in so.keys():
if k not in sc:
return _format_kernel_call_arg_diff(
name, sub, k, None, so[k]
)
# Compare the union of keys excluding skipped ones, in canary's key order
# first (stable), then any ours-only fields.
keys_seen: set[str] = set()
for k in p_canary.keys():
if k in skip:
continue
keys_seen.add(k)
vc = p_canary.get(k)
vo = p_ours.get(k)
if vc != vo:
return f"payload.{k}: canary={vc!r} ours={vo!r}"
for k in p_ours.keys():
if k in skip or k in keys_seen:
continue
if p_ours[k] is not None:
return f"payload.{k}: canary=<missing> ours={p_ours[k]!r}"
return None
def compare_event(ev_canary: dict, ev_ours: dict) -> str | None:
"""Compare two events. Returns None if equivalent, else a short description.
Phase C+18: the per-tid `tid_event_idx` field is NOT compared field-to-
field. Both engines emit monotonic indices, but a floating shared-global
`handle.create` absorbed on one side will leave the running indices
offset by 1 — preserving the canonical pre/post alignment is what the
diff tool needs, and that's enforced by the stream-pointer walk in
`diff_one_tid`. The legacy "tid_event_idx must match" check was a
stricter form of the same invariant; relaxing it permits the floating-
create absorb without weakening the per-position comparison. (The
raw indices are still preserved in the events themselves and shown in
the diff report.)
"""
# Top-level comparison: kind must match.
if ev_canary.get("kind") != ev_ours.get("kind"):
return f"kind: canary={ev_canary.get('kind')!r} ours={ev_ours.get('kind')!r}"
# Payload comparison.
pc = ev_canary.get("payload", {})
po = ev_ours.get("payload", {})
diff = compare_payload(ev_canary["kind"], pc, po)
if diff:
return diff
return None
def render_event(ev: dict) -> str:
"""One-line summary of an event for the diff report."""
kind = ev.get("kind", "?")
idx = ev.get("tid_event_idx", "?")
payload = ev.get("payload", {})
if kind in ("kernel.call", "kernel.return", "import.call"):
name = payload.get("name") or payload.get("ord")
return f"[{idx}] {kind} {name}"
if kind in ("handle.create", "handle.destroy"):
sid = payload.get("handle_semantic_id", "?")
return f"[{idx}] {kind} sid={sid}"
if kind in ("thread.create", "thread.exit"):
return f"[{idx}] {kind} {payload}"
if kind in ("wait.begin", "wait.end"):
return f"[{idx}] {kind} {payload}"
return f"[{idx}] {kind} {payload}"
def _is_import_call_named(ev: dict, name: str) -> bool:
return (
ev.get("kind") == "import.call"
and (ev.get("payload") or {}).get("name") == name
)
def _is_kernel_call_named(ev: dict, name: str) -> bool:
return (
ev.get("kind") == "kernel.call"
and (ev.get("payload") or {}).get("name") == name
)
def _is_kernel_return_named(ev: dict, name: str) -> bool:
return (
ev.get("kind") == "kernel.return"
and (ev.get("payload") or {}).get("name") == name
)
def _looks_like_enter_block(canary: list[dict], i: int) -> bool:
"""True iff canary[i..i+3] is a fast-path RtlEnterCriticalSection
(import.call → kernel.call → kernel.return on the same name)."""
if i + 3 > len(canary):
return False
return (
_is_import_call_named(canary[i], "RtlEnterCriticalSection")
and _is_kernel_call_named(canary[i + 1], "RtlEnterCriticalSection")
and _is_kernel_return_named(canary[i + 2], "RtlEnterCriticalSection")
)
def _looks_like_leave_block(canary: list[dict], i: int) -> bool:
if i + 3 > len(canary):
return False
return (
_is_import_call_named(canary[i], "RtlLeaveCriticalSection")
and _is_kernel_call_named(canary[i + 1], "RtlLeaveCriticalSection")
and _is_kernel_return_named(canary[i + 2], "RtlLeaveCriticalSection")
)
# Phase D D-extension (v1.5): cap on nested-cleanup pairs to absorb in a
# single fold. Sylpheed's empirical max at the 104,607 cap is ~30 pairs
# (one per work item in the canary registry/tree). Anything beyond this
# is suspicious and likely a real divergence; refuse to absorb.
_NESTED_CS_PAIR_CAP = 32
def _try_absorb_nested_cs_cleanup(
canary: list[dict],
ours: list[dict],
ic: int,
io: int,
) -> int | None:
"""Phase D D-extension (v1.5): when the diff is at a kind mismatch
of `import.call RtlEnterCriticalSection` (canary) vs
`import.call RtlLeaveCriticalSection` (ours), look ahead in canary
for one or more balanced `[Enter-block, Leave-block]` pairs (each
pair = 6 events) followed by an event with the SAME kind as
`ours[io]`. If found, return the number of canary events to skip
(a multiple of 6); else None.
This is the band-aid absorber documented in plan.md's backstop §2
and forensics.md. It crosses reading-error #23 in spirit (folding
real guest behavior at the diff layer) but with a narrow trigger
that only fires for the exact E-vs-L kind mismatch shape.
"""
# Both sides must be at an import.call; the kinds must match the
# exact E-vs-L pattern. (Other kind mismatches are real
# divergences; do not touch them.)
if not _is_import_call_named(canary[ic], "RtlEnterCriticalSection"):
return None
if not _is_import_call_named(ours[io], "RtlLeaveCriticalSection"):
return None
# Walk canary's stream consuming balanced [Enter, Leave] pairs.
pos = ic
pairs = 0
while pairs < _NESTED_CS_PAIR_CAP:
if not _looks_like_enter_block(canary, pos):
break
if not _looks_like_leave_block(canary, pos + 3):
break
pairs += 1
pos += 6
# Convergence check: after consuming this pair, canary's next
# event should look like ours's current event. Greedy match —
# the first convergence wins.
if pos < len(canary) and canary[pos].get("kind") == ours[io].get("kind"):
cp = canary[pos].get("payload") or {}
op = ours[io].get("payload") or {}
if cp.get("name") == op.get("name"):
return pos - ic
return None
def diff_one_tid(
canary_evs: list[dict],
ours_evs: list[dict],
canary_tid: int,
ours_tid: int,
cross_tid_floating_sids: set[str] | None = None,
disabled_absorbers: frozenset[str] | None = None,
absorbed_sink: list[dict] | None = None,
) -> dict:
"""Walk one mapped pair. Stop at the first behavioral divergence.
Phase C+18: when a kind mismatch is found at the current position and
one side has a `handle.create` whose SID is a "floating" cross-tid
shared-global SID (present in `cross_tid_floating_sids`), advance
only that side's pointer (treating the event as not-belonging-to-
this-tid's-sequence) and re-compare. This handles the
process-global-dispatcher first-toucher race — see schema-v1.md
§"Shared-global SIDs" and the C+18 memory entry.
Phase C+21: extend the floating-absorb to `wait.begin` events whose
`handles_semantic_ids` reference shared-global SIDs. The contention
pattern on these dispatchers is host-scheduler-driven; one cold run
may emit `wait.begin` (slow path) while another fast-paths and emits
only `kernel.return` — see reading-error #32 and the C+20
`cold-vs-cold-result.md` jitter table. Absorbing these wait.begin
events makes the diff transparent to scheduling-jitter windows
on shared dispatchers without weakening per-thread wait alignment.
`cross_tid_floating_sids` is the set of shared-global SIDs that
appear in EITHER engine's stream on ANY tid (built by
`collect_shared_global_sids`); passing `None` falls back to strict
per-tid alignment (legacy behavior).
Phase absorber-review (2026-05-19, investigation-only):
* `disabled_absorbers` — frozenset subset of
{"shared-global", "wait-begin", "nested-cs"}. When an absorber's
name is present, that absorber is skipped (treated as if not
present in the diff tool). Default `None` preserves the
production absorber-on-everywhere behavior. Used by the
`--disable-absorber=` CLI flag for selective-disable re-runs.
* `absorbed_sink` — optional list that, when provided, receives a
dict per absorption event, with keys:
- `absorber` ∈ {"shared-global", "wait-begin", "nested-cs"}
- `side` ∈ {"canary", "ours"}
- `canary_tid`, `ours_tid` (mapped pair under test)
- `matched_at` (position in matched-prefix when absorber fired)
- `event` (the absorbed event verbatim — JSON-serializable)
- For nested-cs: `pairs_consumed` (count of [E,L] pairs folded)
Used by `--emit-absorbed-events` to write a JSONL log of every
silenced event so we can cross-reference against the wedge.
"""
floating = cross_tid_floating_sids if cross_tid_floating_sids else set()
disabled = disabled_absorbers if disabled_absorbers else frozenset()
matched = 0
pre_context: list[tuple[dict, dict]] = []
diverged_at: int | None = None
diff_descr: str | None = None
skipped_canary: list[dict] = []
skipped_ours: list[dict] = []
skipped_canary_wait: list[dict] = []
skipped_ours_wait: list[dict] = []
ic = 0
io = 0
skipped_canary_engine_local: list[dict] = []
skipped_ours_engine_local: list[dict] = []
while ic < len(canary_evs) and io < len(ours_evs):
ec = canary_evs[ic]
eo = ours_evs[io]
# Phase D Stage 4: engine-local event kinds (e.g.
# `contention.observed`) are emitted by only one engine under
# default config, or carry engine-local payloads (per-engine
# cs_ptr / site_sid that diverge under AUDIT-043 allocator ε).
# Advance the per-tid pointer past these events on EITHER side
# without alignment so they never participate in matched-prefix.
if ec.get("kind") in ENGINE_LOCAL_KINDS:
skipped_canary_engine_local.append(ec)
ic += 1
continue
if eo.get("kind") in ENGINE_LOCAL_KINDS:
skipped_ours_engine_local.append(eo)
io += 1
continue
d = compare_event(ec, eo)
if d is None:
matched += 1
pre_context.append((ec, eo))
if len(pre_context) > 5:
pre_context.pop(0)
ic += 1
io += 1
continue
# Phase C+18: cross-tid floating handle.create.
# One side may have a `handle.create` for a process-global
# dispatcher whose SID was emitted on a DIFFERENT tid in the
# other engine. The SID is engine-agnostic (deterministic over
# `(pointer, object_type)`), so we KNOW the event is observation-
# side. Advance only that side's pointer and re-compare.
if (
"shared-global" not in disabled
and ec.get("kind") == "handle.create"
and eo.get("kind") != "handle.create"
and _ev_handle_create_sid(ec) in floating
):
skipped_canary.append(ec)
if absorbed_sink is not None:
absorbed_sink.append({
"absorber": "shared-global",
"side": "canary",
"canary_tid": canary_tid,
"ours_tid": ours_tid,
"matched_at": matched,
"event": ec,
})
ic += 1
continue
if (
"shared-global" not in disabled
and eo.get("kind") == "handle.create"
and ec.get("kind") != "handle.create"
and _ev_handle_create_sid(eo) in floating
):
skipped_ours.append(eo)
if absorbed_sink is not None:
absorbed_sink.append({
"absorber": "shared-global",
"side": "ours",
"canary_tid": canary_tid,
"ours_tid": ours_tid,
"matched_at": matched,
"event": eo,
})
io += 1
continue
# Phase C+21: cross-tid floating wait.begin.
# One side may have a `wait.begin` on a process-global dispatcher
# that the OTHER side fast-paths past (no wait — uncontended in
# that cold run). The wait.begin's handles_semantic_ids reference
# shared-global SIDs whose creation order and contention pattern
# are host-scheduler-driven. Absorb the wait.begin on whichever
# side has it. Strict per-tid alignment still holds for
# `wait.begin` events whose handles are all per-thread SIDs.
if (
"wait-begin" not in disabled
and ec.get("kind") == "wait.begin"
and eo.get("kind") != "wait.begin"
and is_shared_global_wait_begin(ec, floating)
):
skipped_canary_wait.append(ec)
if absorbed_sink is not None:
absorbed_sink.append({
"absorber": "wait-begin",
"side": "canary",
"canary_tid": canary_tid,
"ours_tid": ours_tid,
"matched_at": matched,
"event": ec,
})
ic += 1
continue
if (
"wait-begin" not in disabled
and eo.get("kind") == "wait.begin"
and ec.get("kind") != "wait.begin"
and is_shared_global_wait_begin(eo, floating)
):
skipped_ours_wait.append(eo)
if absorbed_sink is not None:
absorbed_sink.append({
"absorber": "wait-begin",
"side": "ours",
"canary_tid": canary_tid,
"ours_tid": ours_tid,
"matched_at": matched,
"event": eo,
})
io += 1
continue
# Phase D D-extension (v1.5): nested-CS-cleanup absorber.
#
# CAVEAT (reading-error #23 boundary): this absorber folds REAL
# guest control-flow divergence at the diff-tool layer. It exists
# because the 104,607 Sylpheed cap (Phase D forensics) is a
# producer-throughput divergence — canary's preemptive
# host-OS scheduling lets a peer tid insert more work items
# into a CS-protected registry/tree during a notification-event
# wait window than ours's cooperative scheduler does. The
# consumer thread then takes an `[E L]`-nested-cleanup branch
# in canary but a fast-Leave branch in ours. Fixing this in
# ours's engine would require preempting the cooperative
# scheduler (which invalidates 23 phases of digest stability;
# explicitly out of scope per the H' plan).
#
# The absorber is narrow: it only fires at the specific kind
# mismatch `import.call RtlEnterCriticalSection` (canary) vs
# `import.call RtlLeaveCriticalSection` (ours), looks ahead in
# canary for balanced `[Enter, Leave]` pairs (6 events each
# consuming idx N..N+5), and only absorbs when canary's
# post-absorption stream re-aligns with ours's current event
# via a matching kind. Other kind mismatches fall through to
# the existing divergence reporting unchanged.
if "nested-cs" not in disabled:
absorbed_d_ext = _try_absorb_nested_cs_cleanup(
canary_evs, ours_evs, ic, io
)
if absorbed_d_ext is not None:
skipped_canary_d_ext = absorbed_d_ext
if absorbed_sink is not None:
# Record every event in the absorbed window verbatim.
pairs_consumed = skipped_canary_d_ext // 6
for j in range(skipped_canary_d_ext):
absorbed_sink.append({
"absorber": "nested-cs",
"side": "canary",
"canary_tid": canary_tid,
"ours_tid": ours_tid,
"matched_at": matched,
"event": canary_evs[ic + j],
"pairs_consumed": pairs_consumed,
"window_offset": j,
})
ic += skipped_canary_d_ext
continue
diverged_at = matched # report position in the matched-prefix space
diff_descr = d
break
return {
"canary_tid": canary_tid,
"ours_tid": ours_tid,
"matched": matched,
"canary_total": len(canary_evs),
"ours_total": len(ours_evs),
"diverged_at": diverged_at,
"diff_descr": diff_descr,
"pre_context": pre_context,
"post_canary": (
canary_evs[ic] if diverged_at is not None and ic < len(canary_evs) else None
),
"post_ours": (
ours_evs[io] if diverged_at is not None and io < len(ours_evs) else None
),
"next_canary": (
canary_evs[ic + 1]
if diverged_at is not None and ic + 1 < len(canary_evs)
else None
),
"next_ours": (
ours_evs[io + 1]
if diverged_at is not None and io + 1 < len(ours_evs)
else None
),
"skipped_canary": skipped_canary,
"skipped_ours": skipped_ours,
"skipped_canary_wait": skipped_canary_wait,
"skipped_ours_wait": skipped_ours_wait,
}
def _ev_handle_create_sid(ev: dict) -> str:
"""Return the lowercased SID string of a `handle.create` event, or ''."""
p = ev.get("payload") or {}
sid = p.get("handle_semantic_id")
return sid if isinstance(sid, str) else ""
def count_engine_local_kinds(events_by_tid: dict[int, list[dict]]) -> dict[str, dict[int, int]]:
"""v1.6 (AUDIT-070 bridge): tally `COUNTED_ENGINE_LOCAL_KINDS` events
per-tid for surfacing in the report. `events_by_tid` is the
per-tid event list as loaded by `load_events` (whose return shape
is dict[tid] -> list[event_dict])."""
out: dict[str, dict[int, int]] = {k: {} for k in COUNTED_ENGINE_LOCAL_KINDS}
for tid, evs in events_by_tid.items():
for ev in evs:
k = ev.get("kind")
if k in COUNTED_ENGINE_LOCAL_KINDS:
out[k][tid] = out[k].get(tid, 0) + 1
return out
def render_report(per_tid_results: list[dict],
counted_canary: dict[str, dict[int, int]] | None = None,
counted_ours: dict[str, dict[int, int]] | None = None) -> str:
out: list[str] = []
out.append("# Phase A diff report")
out.append("")
out.append("**This report is the output of Phase A's diff harness. Divergences")
out.append("shown here are INPUT for Phase B (first-divergence localization),")
out.append("not findings of Phase A.** Phase A's job is to make the harness")
out.append("itself correct, not to analyze what it surfaces.")
out.append("")
out.append("## Summary")
out.append("")
out.append(
"| canary_tid | ours_tid | matched | canary_total | ours_total | "
"first_divergence_at | floating_create (c/o) | floating_wait (c/o) |"
)
out.append("|---|---|---|---|---|---|---|---|")
for r in per_tid_results:
div = r["diverged_at"] if r["diverged_at"] is not None else ""
sc = len(r.get("skipped_canary") or [])
so = len(r.get("skipped_ours") or [])
scw = len(r.get("skipped_canary_wait") or [])
sow = len(r.get("skipped_ours_wait") or [])
out.append(
f"| {r['canary_tid']} | {r['ours_tid']} | {r['matched']} | "
f"{r['canary_total']} | {r['ours_total']} | {div} | "
f"{sc}/{so} | {scw}/{sow} |"
)
out.append("")
out.append(
"*`floating_create (c/o)` counts shared-global `handle.create` events "
"absorbed by Phase C+18 cross-tid SID matching. "
"`floating_wait (c/o)` counts `wait.begin` events on shared-global "
"dispatchers absorbed by Phase C+21 (scheduling-jitter window — "
"canary's contention slow path may fire while ours fast-paths or "
"vice versa). See schema-v1.md §\"Shared-global SIDs\" and §\"Wait-begin "
"floating absorb\".*"
)
out.append("")
# v1.6 (AUDIT-070 bridge): surface counted-engine-local kinds.
# `sema.release` cadence is the primary diff-visible metric for
# AUDIT-069 H3 (worker under-production); we count totals per
# engine but do NOT align positionally — see ENGINE_LOCAL_KINDS.
if (counted_canary or counted_ours) and any(
counted_canary.get(k) or counted_ours.get(k)
for k in COUNTED_ENGINE_LOCAL_KINDS
):
out.append("## Counted engine-local kinds (v1.6)")
out.append("")
out.append("| kind | canary total | ours total | per-tid (canary) | per-tid (ours) |")
out.append("|---|---:|---:|---|---|")
for kind in sorted(COUNTED_ENGINE_LOCAL_KINDS):
cc = (counted_canary or {}).get(kind, {})
co = (counted_ours or {}).get(kind, {})
cc_total = sum(cc.values())
co_total = sum(co.values())
cc_pertid = ", ".join(f"tid{t}={n}" for t, n in sorted(cc.items()))
co_pertid = ", ".join(f"tid{t}={n}" for t, n in sorted(co.items()))
out.append(
f"| `{kind}` | {cc_total} | {co_total} | "
f"{cc_pertid or ''} | {co_pertid or ''} |"
)
out.append("")
out.append(
"*These kinds are positionally engine-local (see "
"`ENGINE_LOCAL_KINDS`) — the diff tool does NOT align them. "
"Per-engine total cadence is surfaced here so regressions "
"are visible at-a-glance.*"
)
out.append("")
for r in per_tid_results:
out.append(f"## canary_tid={r['canary_tid']} → ours_tid={r['ours_tid']}")
out.append("")
if r["diverged_at"] is None:
out.append(
f"No divergence within the {r['matched']} compared events "
f"(canary has {r['canary_total']}, ours has {r['ours_total']})."
)
out.append("")
continue
# Iterate 2.L: surface the RAW per-tid idx on each side of the
# divergence in addition to the matched-prefix position. The
# `diverged_at` value above is the matched-prefix offset (the
# historical "tid_event_idx" label is a misnomer — it equals the
# raw idx only when absorbers haven't fired upstream). Reading-
# error #41 conflated the two. We keep the legacy field for
# backward compatibility of report consumers and add the raw
# idxs explicitly.
post_c = r.get("post_canary") or {}
post_o = r.get("post_ours") or {}
raw_c = post_c.get("tid_event_idx", "?")
raw_o = post_o.get("tid_event_idx", "?")
out.append(
f"First divergence at matched-prefix position {r['diverged_at']} "
f"(canary raw tid_event_idx={raw_c}, ours raw tid_event_idx={raw_o}): "
f"{r['diff_descr']}"
)
out.append("")
out.append("**Pre-context (last 5 matching events):**")
out.append("```")
for ec, eo in r["pre_context"]:
out.append(f" canary: {render_event(ec)}")
out.append(f" ours: {render_event(eo)}")
out.append("```")
out.append("")
out.append("**Divergent event:**")
out.append("```")
out.append(f" canary: {render_event(r['post_canary'])}")
out.append(f" ours: {render_event(r['post_ours'])}")
out.append("```")
out.append("")
out.append("**Next event after the divergence (if any):**")
out.append("```")
if r["next_canary"]:
out.append(f" canary: {render_event(r['next_canary'])}")
else:
out.append(" canary: <end of stream>")
if r["next_ours"]:
out.append(f" ours: {render_event(r['next_ours'])}")
else:
out.append(" ours: <end of stream>")
out.append("```")
out.append("")
out.append("**Raw events (JSON):**")
out.append("```json")
out.append(json.dumps(r["post_canary"], sort_keys=True))
out.append(json.dumps(r["post_ours"], sort_keys=True))
out.append("```")
out.append("")
return "\n".join(out)
def main() -> int:
ap = argparse.ArgumentParser(description="Phase A event-log diff tool")
ap.add_argument("--canary", required=True, type=Path)
ap.add_argument("--ours", required=True, type=Path)
ap.add_argument("--out", type=Path, help="Write markdown report here (else stdout)")
ap.add_argument(
"--tid-map",
type=str,
help="Manual tid mapping like '6=1,7=2'. Overrides auto-mapping.",
)
ap.add_argument(
"--validate-identical",
action="store_true",
help="Exit non-zero if any mapped tid pair has any divergence. "
"Used by gate-4 negative-test and by self-diff smoke tests.",
)
ap.add_argument(
"--no-canonicalize-allocators",
action="store_true",
help="Disable per-tid ordinal canonicalization of allocator return "
"values (default: enabled). See ALLOCATOR_RETURN_FNS for the "
"covered set. Disabling reproduces the raw-VA comparison.",
)
ap.add_argument(
"--no-canonicalize-host-heap-fields",
action="store_true",
help="Disable per-tid ordinal canonicalization of host-heap-derived "
"guest VA payload fields (default: enabled). See "
"HOST_HEAP_PAYLOAD_FIELDS_BY_KIND for the covered set (Phase C+22 "
"v1.7: `thread.create.ctx_ptr`). Disabling reproduces the raw-VA "
"comparison and re-surfaces the AUDIT-043 ε allocator-drift class.",
)
ap.add_argument(
"--disable-absorber",
type=str,
default="",
help="Phase absorber-review (investigation-only): comma-separated "
"list of absorbers to disable. Valid names: "
"shared-global (C+18 handle.create), wait-begin (C+21 wait.begin), "
"nested-cs (D-extension RtlEnter/Leave fold). Empty default keeps "
"all absorbers ON (production behavior). Use to isolate which "
"absorber suppresses which divergence.",
)
ap.add_argument(
"--emit-absorbed-events",
type=Path,
default=None,
help="Phase absorber-review (investigation-only): write every "
"absorbed event to a JSONL file at this path. Each line is a "
"JSON object with keys: absorber, side, canary_tid, ours_tid, "
"matched_at, event (verbatim), and for nested-cs also "
"pairs_consumed and window_offset.",
)
args = ap.parse_args()
VALID_ABSORBERS = {"shared-global", "wait-begin", "nested-cs"}
disabled_absorbers: frozenset[str]
if args.disable_absorber.strip():
names = {
tok.strip()
for tok in args.disable_absorber.split(",")
if tok.strip()
}
unknown = names - VALID_ABSORBERS
if unknown:
sys.stderr.write(
f"--disable-absorber: unknown name(s) {sorted(unknown)!r}. "
f"Valid: {sorted(VALID_ABSORBERS)!r}\n"
)
return 2
disabled_absorbers = frozenset(names)
else:
disabled_absorbers = frozenset()
absorbed_sink: list[dict] | None = (
[] if args.emit_absorbed_events else None
)
canary_evs = load_events(args.canary)
ours_evs = load_events(args.ours)
if not args.no_canonicalize_allocators:
canonicalize_allocator_returns(canary_evs)
canonicalize_allocator_returns(ours_evs)
if not args.no_canonicalize_host_heap_fields:
canonicalize_host_heap_payload_fields(canary_evs)
canonicalize_host_heap_payload_fields(ours_evs)
if args.tid_map:
tid_map = parse_tid_map_arg(args.tid_map)
else:
tid_map = auto_tid_map(canary_evs, ours_evs)
if not tid_map:
sys.stderr.write(
"no tid mapping (auto-mapping found no shared first-kernel-call). "
"Pass --tid-map manually.\n"
)
return 2
# Phase C+18 + C+21: pre-pass to collect all shared-global SIDs across
# both engines and all tids. Used by `diff_one_tid` to recognize
# "floating" `handle.create` events (C+18) and `wait.begin` events
# (C+21) whose presence on one side but not the other is
# observation-side — a different first-toucher thread (C+18) or a
# contention-jitter-driven slow-path entry (C+21). See schema-v1.md
# §"Shared-global SIDs" and §"Wait-begin floating absorb".
floating_sids = collect_shared_global_sids(canary_evs, ours_evs)
per_tid: list[dict] = []
for c_tid, o_tid in sorted(tid_map.items()):
if c_tid not in canary_evs:
sys.stderr.write(f"warn: canary tid {c_tid} not in stream; skipping\n")
continue
if o_tid not in ours_evs:
sys.stderr.write(f"warn: ours tid {o_tid} not in stream; skipping\n")
continue
per_tid.append(
diff_one_tid(
canary_evs[c_tid],
ours_evs[o_tid],
c_tid,
o_tid,
cross_tid_floating_sids=floating_sids,
disabled_absorbers=disabled_absorbers,
absorbed_sink=absorbed_sink,
)
)
if absorbed_sink is not None and args.emit_absorbed_events is not None:
with args.emit_absorbed_events.open("w", encoding="utf-8") as f:
for rec in absorbed_sink:
f.write(json.dumps(rec, sort_keys=True))
f.write("\n")
sys.stderr.write(
f"emitted {len(absorbed_sink)} absorbed events to "
f"{args.emit_absorbed_events}\n"
)
# v1.6 (AUDIT-070 bridge): count `sema.release` (and any future
# counted engine-local kinds) per-engine for surfacing in the
# report. These do not participate in matched-prefix.
counted_canary = count_engine_local_kinds(canary_evs)
counted_ours = count_engine_local_kinds(ours_evs)
report = render_report(per_tid, counted_canary, counted_ours)
if args.out:
args.out.write_text(report, encoding="utf-8")
sys.stderr.write(f"diff report written to {args.out}\n")
else:
sys.stdout.write(report)
if args.validate_identical:
for r in per_tid:
if r["diverged_at"] is not None:
sys.stderr.write(
f"validate-identical: divergence in canary_tid={r['canary_tid']} "
f"at tid_event_idx={r['diverged_at']} ({r['diff_descr']})\n"
)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())