#!/usr/bin/env python3 """Phase D Stage 2 — contention-manifest builder. Reads a Phase A JSONL event log produced by canary with cvar `kernel_emit_contention=true` (Stage 1) and distills it to a replay-ready manifest for Stage 3 to consume. Output schema (`contention_manifest.json`): { "version": 1, "source_canary_jsonl": "", "source_canary_sha256": "", "built_at_host_unix": , "summary": { "total_input_events": , "total_contention_events_kept": , "per_tid_counts": { "": , ... } }, "entries": [ { "tid": 6, "tid_event_idx": 104664, "site_sid": "c26a128bf45411f7", "cs_ptr": "0xbc65c890", "contended": true }, ... ] } Entries are sorted by (tid asc, tid_event_idx asc). Stage 3's ours-side replay loader keys on `(tid, tid_event_idx)`; the canary tid is the *native* tid emitted by canary (no display-mapping is applied here — see investigation.md §"Tid mapping is per-engine native"). Only events with `kind == "contention.observed"` and `contended == true` are kept. Stage 1's emitter never emits `contended=false`, so this filter is paranoid-defensive. Schema events / handle events / wait events are dropped. Usage: python3 build_contention_manifest.py \\ --canary-jsonl path/to/canary-cvaron-trunc.jsonl \\ --out path/to/contention_manifest.json Exit 0 on success. Exit 1 on parse error or empty manifest (no contention events found — likely cvar wasn't enabled when the trace was captured). """ import argparse import hashlib import json import sys import time from pathlib import Path def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description=__doc__.splitlines()[0]) p.add_argument( "--canary-jsonl", required=True, help="Path to canary Phase A JSONL log (with cvar=true).", ) p.add_argument( "--out", required=True, help="Output path for contention_manifest.json.", ) p.add_argument( "--tid-map", default="", help=( "Optional canary→ours tid translation. Format " "'CANARY=OURS,CANARY=OURS,...' (e.g. '6=1,7=2,4=11'). When " "supplied, manifest entries are emitted with the ours-side tid " "so the Stage-3 consumer can key on its own native current_tid. " "Entries on a canary tid NOT in the map are dropped with a " "warning. Same format as diff_events.py." ), ) p.add_argument( "--quiet", action="store_true", help="Suppress the human-readable summary on stderr.", ) return p.parse_args() def parse_tid_map(s: str) -> dict[int, int] | None: """Parse 'a=b,c=d' into {a: b, c: d}. Empty/None → None.""" s = s.strip() if not s: return None out: dict[int, int] = {} for piece in s.split(","): piece = piece.strip() if not piece: continue if "=" not in piece: raise ValueError(f"bad tid-map fragment: {piece!r}") l, r = piece.split("=", 1) out[int(l.strip())] = int(r.strip()) return out def sha256_of(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1 << 20), b""): h.update(chunk) return h.hexdigest() def build_manifest( jsonl_path: Path, tid_map: dict[int, int] | None = None, ) -> dict: """Read `jsonl_path` and return a manifest dict. If `tid_map` (canary_tid → ours_tid) is provided, entries are written with the translated ours-side tid. Entries on a canary tid not in the map are dropped (counted in `summary.skipped_unmapped_tids`). When `tid_map` is None, manifest tids are canary's native values (back-compat with Stage 2's first iteration). Raises FileNotFoundError / json.JSONDecodeError on bad input. """ entries: list[dict] = [] total_input = 0 bad_lines = 0 unmapped = 0 with jsonl_path.open("r", encoding="utf-8") as f: for lineno, line in enumerate(f, start=1): line = line.rstrip("\n") if not line: continue total_input += 1 try: ev = json.loads(line) except json.JSONDecodeError: bad_lines += 1 continue if ev.get("kind") != "contention.observed": continue payload = ev.get("payload") or {} if payload.get("contended") is not True: continue canary_tid = int(ev["tid"]) if tid_map is not None: if canary_tid not in tid_map: unmapped += 1 continue tid = tid_map[canary_tid] else: tid = canary_tid entry = { "tid": tid, "tid_event_idx": int(ev["tid_event_idx"]), "site_sid": str(payload.get("site_sid", "")), "cs_ptr": str(payload.get("cs_ptr", "")), "contended": True, } # Defensive: every Stage 1 event carries cs_ptr + site_sid. # If either is missing, skip rather than emit a broken entry. if not entry["site_sid"] or not entry["cs_ptr"]: bad_lines += 1 continue entries.append(entry) # Stable sort by (tid, tid_event_idx). Same (tid, idx) pair is not # expected — the per-tid counter is monotone — but if duplicates # appear (e.g. mis-merged jsonls), keep the first; later phases would # otherwise see ambiguous manifest keys. entries.sort(key=lambda e: (e["tid"], e["tid_event_idx"])) deduped: list[dict] = [] seen: set[tuple[int, int]] = set() dup_count = 0 for e in entries: key = (e["tid"], e["tid_event_idx"]) if key in seen: dup_count += 1 continue seen.add(key) deduped.append(e) per_tid: dict[str, int] = {} for e in deduped: per_tid[str(e["tid"])] = per_tid.get(str(e["tid"]), 0) + 1 return { "version": 1, "source_canary_jsonl": str(jsonl_path.resolve()), "source_canary_sha256": sha256_of(jsonl_path), "built_at_host_unix": int(time.time()), "tid_map": tid_map, "summary": { "total_input_events": total_input, "total_contention_events_kept": len(deduped), "skipped_bad_lines": bad_lines, "skipped_unmapped_tids": unmapped, "skipped_duplicate_keys": dup_count, "per_tid_counts": per_tid, }, "entries": deduped, } def render_summary(manifest: dict) -> str: s = manifest["summary"] lines = [ f"contention manifest built from {manifest['source_canary_jsonl']}", f" source sha256: {manifest['source_canary_sha256']}", f" total input events scanned: {s['total_input_events']}", f" contention events kept: {s['total_contention_events_kept']}", f" bad/skipped lines: {s['skipped_bad_lines']}", f" duplicate (tid,idx) skipped: {s['skipped_duplicate_keys']}", " per-tid counts:", ] for tid, count in sorted(s["per_tid_counts"].items(), key=lambda kv: int(kv[0])): lines.append(f" tid={int(tid):4d} {count}") return "\n".join(lines) def main() -> int: args = parse_args() src = Path(args.canary_jsonl) if not src.is_file(): print(f"error: not a file: {src}", file=sys.stderr) return 1 try: tid_map = parse_tid_map(args.tid_map) except ValueError as e: print(f"error: --tid-map: {e}", file=sys.stderr) return 1 manifest = build_manifest(src, tid_map=tid_map) if manifest["summary"]["total_contention_events_kept"] == 0: print( "error: 0 contention.observed events found — was the trace " "captured with --kernel_emit_contention=true?", file=sys.stderr, ) return 1 out = Path(args.out) out.parent.mkdir(parents=True, exist_ok=True) with out.open("w", encoding="utf-8") as f: json.dump(manifest, f, indent=2) f.write("\n") if not args.quiet: print(render_summary(manifest), file=sys.stderr) return 0 if __name__ == "__main__": sys.exit(main())