diff --git a/events/materializer.py b/events/materializer.py index 550415e0..cd0bbf24 100644 --- a/events/materializer.py +++ b/events/materializer.py @@ -94,6 +94,56 @@ async def replay_new_events(self, inner_adapter) -> int: payload.get("commit_hash", ""), payload.get("repo_path", ""), ) replayed += 1 + elif etype == "decision_ratified.completed": + # Resolve canonical_id → local decision_id; the + # event was emitted by a peer whose local + # decision_id is meaningless in this DB. + from ledger.queries import find_decision_by_canonical_id + + local_id = await find_decision_by_canonical_id( + inner_adapter._client, + payload.get("canonical_id", ""), + ) + if local_id is None: + logger.warning( + "[materializer] skipping decision_ratified — " + "canonical_id %r not found locally (ingest event missing or out-of-order)", + payload.get("canonical_id"), + ) + continue + await inner_adapter.apply_ratify( + local_id, + payload.get("signoff", {}), + ) + replayed += 1 + elif etype == "decision_superseded.completed": + from ledger.queries import find_decision_by_canonical_id + + local_new = await find_decision_by_canonical_id( + inner_adapter._client, + payload.get("new_canonical_id", ""), + ) + local_old = await find_decision_by_canonical_id( + inner_adapter._client, + payload.get("old_canonical_id", ""), + ) + if local_new is None or local_old is None: + logger.warning( + "[materializer] skipping decision_superseded — " + "canonical_id resolution failed (new=%r old=%r)", + payload.get("new_canonical_id"), + payload.get("old_canonical_id"), + ) + continue + await inner_adapter.apply_supersede( + new_id=local_new, + old_id=local_old, + signer=payload.get("signer", ""), + signoff_note=payload.get("signoff_note", ""), + superseded_at=payload.get("superseded_at", ""), + session_id=payload.get("session_id", ""), + ) + replayed += 1 new_offsets[author] = f.tell() if new_offsets != offsets: diff --git a/events/team_adapter.py b/events/team_adapter.py index 4583c9d3..a4ecfae0 100644 --- a/events/team_adapter.py +++ b/events/team_adapter.py @@ -10,6 +10,8 @@ import logging from pathlib import Path +from ledger.queries import find_decision_by_canonical_id, get_canonical_id + from .materializer import EventMaterializer from .writer import EventFileWriter @@ -138,6 +140,63 @@ async def bind_decision( purpose=purpose, ) + async def apply_ratify(self, decision_id: str, signoff: dict) -> str: + """Emit decision_ratified event, then delegate to inner adapter. + + The event payload carries ``canonical_id`` so cross-author replay + can resolve to the peer's local decision row. + """ + await self._ensure_ready() + canonical_id = await get_canonical_id(self._inner._client, decision_id) + self._writer.write( + "decision_ratified.completed", + { + "canonical_id": canonical_id, + "decision_id": decision_id, + "signoff": signoff, + }, + ) + return await self._inner.apply_ratify(decision_id, signoff) + + async def apply_supersede( + self, + new_id: str, + old_id: str, + signer: str = "", + signoff_note: str = "", + superseded_at: str = "", + session_id: str = "", + ) -> dict: + """Emit decision_superseded event, then delegate to inner adapter. + + The event payload carries canonical_ids for both decisions so + cross-author replay can resolve to the peer's local rows. + """ + await self._ensure_ready() + new_canonical = await get_canonical_id(self._inner._client, new_id) + old_canonical = await get_canonical_id(self._inner._client, old_id) + self._writer.write( + "decision_superseded.completed", + { + "new_canonical_id": new_canonical, + "old_canonical_id": old_canonical, + "new_id": new_id, + "old_id": old_id, + "signer": signer, + "signoff_note": signoff_note, + "superseded_at": superseded_at, + "session_id": session_id, + }, + ) + return await self._inner.apply_supersede( + new_id=new_id, + old_id=old_id, + signer=signer, + signoff_note=signoff_note, + superseded_at=superseded_at, + session_id=session_id, + ) + async def wipe_all_rows(self, repo: str) -> None: """Wipe the DB then reset the event watermark. diff --git a/handlers/ratify.py b/handlers/ratify.py index e6bd5249..cf8a7c4a 100644 --- a/handlers/ratify.py +++ b/handlers/ratify.py @@ -16,7 +16,11 @@ from datetime import datetime, timezone from contracts import RatifyResponse -from ledger.queries import decision_exists, project_decision_status, update_decision_status +from ledger.queries import decision_exists, project_decision_status +# triage-adapt: dropped preflight_telemetry import from auto-merge — module +# is on dev (#65 preflight telemetry) but not on triage; the cherry-picked +# body doesn't actually reference it (intent of e6d4b8f for this file is +# routing through ledger.apply_ratify, not adding telemetry) logger = logging.getLogger(__name__) @@ -90,13 +94,9 @@ async def handle_ratify( "note": note, } - await client.query( - f"UPDATE {decision_id} SET signoff = $signoff", - {"signoff": signoff}, - ) - - projected = await project_decision_status(client, decision_id) - await update_decision_status(client, decision_id, projected) + # Routes through TeamWriteAdapter when in team mode so the signoff + # change is emitted as a decision_ratified.completed event. + projected = await ledger.apply_ratify(decision_id, signoff) logger.info( "[ratify] decision=%s action=%s signer=%s projected_status=%s", diff --git a/handlers/resolve_collision.py b/handlers/resolve_collision.py index 4027258c..eb739b3f 100644 --- a/handlers/resolve_collision.py +++ b/handlers/resolve_collision.py @@ -28,7 +28,6 @@ decision_exists, project_decision_status, relate_context_for, - relate_supersedes, update_decision_status, ) @@ -71,35 +70,20 @@ async def handle_resolve_collision( if not await decision_exists(client, old_id): raise ValueError(f"No decision row for old_id={old_id}") - # Write supersedes edge (idempotent) - await relate_supersedes( - client, new_id, old_id, - confidence=1.0, - reason=f"human-confirmed supersession via resolve_collision session={_session_id}", + # Routes through TeamWriteAdapter when in team mode so the + # supersession is emitted as a decision_superseded.completed + # event. The adapter handles edge creation + frozen-signoff + # merge so the old decision's prior ratification record is + # preserved (drift sweeps skip signoff.state='superseded'). + result = await ledger.apply_supersede( + new_id=new_id, + old_id=old_id, + signer=_session_id, + signoff_note="", + superseded_at=_now_iso, + session_id=_session_id, ) - - # Mark old decision as superseded in signoff (not status). - # Supersession is a human editorial decision, not a code-compliance observation. - # The old decision's status field retains its last code-compliance value - # and is frozen — drift sweeps skip decisions where signoff.state='superseded'. - # Merge with existing signoff so a prior ratification record is preserved. - _existing_rows = await client.query( - f"SELECT signoff FROM {old_id} LIMIT 1" - ) - _old_signoff: dict = {} - if _existing_rows and isinstance(_existing_rows[0], dict): - _old_signoff = _existing_rows[0].get("signoff") or {} - await client.execute( - f"UPDATE {old_id} SET signoff = $s", - {"s": { - **_old_signoff, - "state": "superseded", - "superseded_by": new_id, - "superseded_at": _now_iso, - "session_id": _session_id, - }}, - ) - old_status = "superseded" + old_status = result.get("old_status", "superseded") logger.info( "[resolve_collision] supersede: %s supersedes %s", new_id, old_id diff --git a/ledger/adapter.py b/ledger/adapter.py index dbd27e12..bee2b755 100644 --- a/ledger/adapter.py +++ b/ledger/adapter.py @@ -39,6 +39,7 @@ relate_binds_to, relate_has_identity, relate_locates, + relate_supersedes, relate_yields, search_by_bm25, update_decision_status, @@ -1115,3 +1116,66 @@ async def wipe_all_rows(self, repo: str) -> None: if db_path: shutil.rmtree(db_path, ignore_errors=True) await self._ensure_connected() + + # ── Decision signoff write path (#97 event vocabulary) ──────────── + # Both methods are idempotent so the materializer can replay them + # safely. Handlers do their own pre-write idempotency / collision + # checks; the adapter just performs the write and re-projects status. + + async def apply_ratify(self, decision_id: str, signoff: dict) -> str: + """Write a ratify/reject signoff and re-project the decision's status. + + Idempotent. Returns the projected decision status after the write. + """ + await self._ensure_connected() + await self._client.query( + f"UPDATE {decision_id} SET signoff = $signoff", + {"signoff": signoff}, + ) + projected = await project_decision_status(self._client, decision_id) + await update_decision_status(self._client, decision_id, projected) + return projected + + async def apply_supersede( + self, + new_id: str, + old_id: str, + signer: str = "", + signoff_note: str = "", + superseded_at: str = "", + session_id: str = "", + ) -> dict: + """Write the supersedes edge and freeze the old decision's signoff. + + Idempotent: ``relate_supersedes`` upserts the edge and the signoff + UPDATE is a full overwrite. Returns ``{"old_status": "superseded"}``. + """ + await self._ensure_connected() + await relate_supersedes( + self._client, + new_id, + old_id, + confidence=1.0, + reason=( + f"human-confirmed supersession via resolve_collision session={session_id}" + ), + ) + rows = await self._client.query(f"SELECT signoff FROM {old_id} LIMIT 1") + old_signoff: dict = {} + if rows and isinstance(rows[0], dict): + old_signoff = rows[0].get("signoff") or {} + await self._client.execute( + f"UPDATE {old_id} SET signoff = $s", + { + "s": { + **old_signoff, + "state": "superseded", + "superseded_by": new_id, + "superseded_at": superseded_at, + "session_id": session_id, + "signer": signer, + "note": signoff_note, + } + }, + ) + return {"old_status": "superseded"} diff --git a/ledger/queries.py b/ledger/queries.py index 28bcb19a..42d02276 100644 --- a/ledger/queries.py +++ b/ledger/queries.py @@ -942,6 +942,47 @@ async def update_decision_status( ) +# ── canonical_id ↔ decision_id resolution (#97 event replay) ────────── +# Decision rows carry both a SurrealDB-generated ``id`` (e.g. ``decision:abc``) +# and a content-addressed ``canonical_id`` (UUIDv5 from description + +# source_type + source_ref). The local id is per-DB; canonical_id is +# stable across authors and machines, so it's the only id safe to ship +# across the JSONL event log. + +async def get_canonical_id( + client: LedgerClient, + decision_id: str, +) -> str | None: + """Return the canonical_id for a local decision row, or None.""" + rows = await client.query( + f"SELECT canonical_id FROM {decision_id} LIMIT 1", + ) + if rows and isinstance(rows[0], dict): + cid = rows[0].get("canonical_id") + return str(cid) if cid else None + return None + + +async def find_decision_by_canonical_id( + client: LedgerClient, + canonical_id: str, +) -> str | None: + """Return the local decision id for a canonical_id, or None.""" + if not canonical_id: + return None + rows = await client.query( + "SELECT id FROM decision WHERE canonical_id = $cid LIMIT 1", + {"cid": canonical_id}, + ) + if rows and isinstance(rows[0], dict): + did = rows[0].get("id") + return str(did) if did else None + return None + + +# triage-adapt: dropped #77 update_decision_level block from auto-merge — +# triage doesn't carry decision_level work (PR #107, v0.16.0); not part +# of e6d4b8f's actual +38-line diff for #97 async def update_region_hash( client: LedgerClient, region_id: str, diff --git a/tests/test_team_event_replay.py b/tests/test_team_event_replay.py new file mode 100644 index 00000000..78647190 --- /dev/null +++ b/tests/test_team_event_replay.py @@ -0,0 +1,195 @@ +"""Round-trip tests for the team event log replay path (#97). + +For each decision-status event type: + 1. Setup team mode: inner adapter (memory://) wrapped in TeamWriteAdapter + 2. Mutate state via the adapter (writes JSONL + DB) + 3. Spin up a fresh adapter pointing at the same JSONL log but a fresh + memory:// inner DB and a fresh watermark + 4. Connect — triggers materializer replay from offset 0 + 5. Assert the fresh DB ends up in the same end-state + +Covers the new event vocabulary added in this PR: + - decision_ratified.completed + - decision_superseded.completed + +Plus regression coverage for the pre-existing emit/replay surface: + - ingest.completed (decision row + signoff round-trip) +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from events.materializer import EventMaterializer +from events.team_adapter import TeamWriteAdapter +from events.writer import EventFileWriter +from ledger.adapter import SurrealDBLedgerAdapter +from ledger.queries import find_decision_by_canonical_id, get_canonical_id + + +def _build_team_adapter( + events_dir: Path, + local_dir: Path, + author: str = "tester@example.com", +) -> tuple[TeamWriteAdapter, SurrealDBLedgerAdapter]: + """Wire up an in-memory inner adapter + JSONL event log + materializer.""" + inner = SurrealDBLedgerAdapter(url="memory://") + writer = EventFileWriter(events_dir, author) + materializer = EventMaterializer(events_dir, local_dir) + return TeamWriteAdapter(inner, writer, materializer), inner + + +def _payload(intent: str, source_ref: str) -> dict: + """Minimal single-decision payload for ingest_payload.""" + return { + "query": intent, + "repo": "test-repo", + "commit_hash": "deadbeef00000000000000000000000000000000", + "analyzed_at": "2026-04-29T12:00:00Z", + "mappings": [ + { + "span": { + "span_id": f"span-{source_ref}", + "source_type": "transcript", + "text": intent, + "speaker": "Tester", + "source_ref": source_ref, + }, + "intent": intent, + "symbols": [], + "code_regions": [], + "dependency_edges": [], + } + ], + } + + +@pytest.mark.asyncio +async def test_ratify_event_roundtrip(tmp_path: Path) -> None: + """A ratify on the live adapter replays into a fresh adapter's DB. + + Cross-DB lookup goes through canonical_id since SurrealDB-generated + decision ids are per-DB. + """ + events_dir = tmp_path / "events" + local_dir_a = tmp_path / "local_a" + + team_a, inner_a = _build_team_adapter(events_dir, local_dir_a) + await team_a.connect() + + res = await team_a.ingest_payload(_payload("ratify-roundtrip", "rt-mtg")) + decision_id_a = res["created_decisions"][0]["decision_id"] + canonical = await get_canonical_id(inner_a._client, decision_id_a) + assert canonical, "canonical_id not stamped on decision row" + + signoff = { + "state": "ratified", + "signer": "tester", + "note": "round-trip", + "ratified_at": "2026-04-29T13:00:00Z", + } + await team_a.apply_ratify(decision_id_a, signoff) + + rows = await inner_a._client.query( + f"SELECT signoff FROM {decision_id_a} LIMIT 1" + ) + assert rows and rows[0]["signoff"]["state"] == "ratified" + + # Fresh adapter, same JSONL log, fresh watermark — replay from 0. + local_dir_b = tmp_path / "local_b" + team_b, inner_b = _build_team_adapter(events_dir, local_dir_b) + await team_b.connect() + + decision_id_b = await find_decision_by_canonical_id(inner_b._client, canonical) + assert decision_id_b, "ingest event did not replay (no row for canonical_id)" + rows_b = await inner_b._client.query( + f"SELECT signoff FROM {decision_id_b} LIMIT 1" + ) + replayed_signoff = rows_b[0].get("signoff") or {} + assert replayed_signoff.get("state") == "ratified", ( + "decision_ratified.completed event did not replay; " + f"got signoff={replayed_signoff!r}" + ) + + +@pytest.mark.asyncio +async def test_supersede_event_roundtrip(tmp_path: Path) -> None: + """A supersede on the live adapter replays edge + frozen signoff.""" + events_dir = tmp_path / "events" + local_dir_a = tmp_path / "local_a" + + team_a, inner_a = _build_team_adapter(events_dir, local_dir_a) + await team_a.connect() + + r_old = await team_a.ingest_payload(_payload("old-decision", "old-mtg")) + r_new = await team_a.ingest_payload(_payload("new-decision", "new-mtg")) + old_id_a = r_old["created_decisions"][0]["decision_id"] + new_id_a = r_new["created_decisions"][0]["decision_id"] + old_canonical = await get_canonical_id(inner_a._client, old_id_a) + new_canonical = await get_canonical_id(inner_a._client, new_id_a) + assert old_canonical and new_canonical + + await team_a.apply_supersede( + new_id=new_id_a, + old_id=old_id_a, + signer="tester", + signoff_note="superseding for round-trip", + superseded_at="2026-04-29T13:00:00Z", + session_id="test-session", + ) + + rows = await inner_a._client.query(f"SELECT signoff FROM {old_id_a} LIMIT 1") + assert rows and rows[0]["signoff"]["state"] == "superseded" + + local_dir_b = tmp_path / "local_b" + team_b, inner_b = _build_team_adapter(events_dir, local_dir_b) + await team_b.connect() + + old_id_b = await find_decision_by_canonical_id(inner_b._client, old_canonical) + new_id_b = await find_decision_by_canonical_id(inner_b._client, new_canonical) + assert old_id_b and new_id_b, "ingest events did not replay (canonical lookup failed)" + + rows_b = await inner_b._client.query(f"SELECT signoff FROM {old_id_b} LIMIT 1") + replayed = rows_b[0].get("signoff") or {} + assert replayed.get("state") == "superseded", ( + "decision_superseded.completed event did not replay; " + f"got signoff={replayed!r}" + ) + assert replayed.get("superseded_by") == new_id_b + + edge_rows = await inner_b._client.query( + f"SELECT id FROM supersedes WHERE in = {new_id_b} AND out = {old_id_b} LIMIT 1" + ) + assert edge_rows, "supersedes edge did not replay" + + +@pytest.mark.asyncio +async def test_ingest_event_roundtrip_regression(tmp_path: Path) -> None: + """Pre-existing ingest.completed emit/replay still works. + + This is the regression guard for the existing event vocabulary — + ensures the new emit calls did not perturb the established path. + """ + events_dir = tmp_path / "events" + local_dir_a = tmp_path / "local_a" + + team_a, _ = _build_team_adapter(events_dir, local_dir_a) + await team_a.connect() + + res = await team_a.ingest_payload(_payload("regression-intent", "reg-mtg")) + decision_id_a = res["created_decisions"][0]["decision_id"] + canonical = await get_canonical_id(team_a._inner._client, decision_id_a) + + local_dir_b = tmp_path / "local_b" + team_b, inner_b = _build_team_adapter(events_dir, local_dir_b) + await team_b.connect() + + decision_id_b = await find_decision_by_canonical_id(inner_b._client, canonical) + assert decision_id_b, "ingest.completed regression — canonical lookup failed" + rows = await inner_b._client.query( + f"SELECT description FROM {decision_id_b} LIMIT 1" + ) + assert rows, "ingest.completed regression — decision row missing after replay" + assert "regression-intent" in str(rows[0].get("description", ""))