Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions events/materializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,56 @@ async def replay_new_events(self, inner_adapter) -> int:
payload.get("commit_hash", ""), payload.get("repo_path", ""),
)
replayed += 1
elif etype == "decision_ratified.completed":
# Resolve canonical_id → local decision_id; the
# event was emitted by a peer whose local
# decision_id is meaningless in this DB.
from ledger.queries import find_decision_by_canonical_id

local_id = await find_decision_by_canonical_id(
inner_adapter._client,
payload.get("canonical_id", ""),
)
if local_id is None:
logger.warning(
"[materializer] skipping decision_ratified — "
"canonical_id %r not found locally (ingest event missing or out-of-order)",
payload.get("canonical_id"),
)
continue
await inner_adapter.apply_ratify(
local_id,
payload.get("signoff", {}),
)
replayed += 1
elif etype == "decision_superseded.completed":
from ledger.queries import find_decision_by_canonical_id

local_new = await find_decision_by_canonical_id(
inner_adapter._client,
payload.get("new_canonical_id", ""),
)
local_old = await find_decision_by_canonical_id(
inner_adapter._client,
payload.get("old_canonical_id", ""),
)
if local_new is None or local_old is None:
logger.warning(
"[materializer] skipping decision_superseded — "
"canonical_id resolution failed (new=%r old=%r)",
payload.get("new_canonical_id"),
payload.get("old_canonical_id"),
)
continue
await inner_adapter.apply_supersede(
new_id=local_new,
old_id=local_old,
signer=payload.get("signer", ""),
signoff_note=payload.get("signoff_note", ""),
superseded_at=payload.get("superseded_at", ""),
session_id=payload.get("session_id", ""),
)
replayed += 1
new_offsets[author] = f.tell()

if new_offsets != offsets:
Expand Down
59 changes: 59 additions & 0 deletions events/team_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import logging
from pathlib import Path

from ledger.queries import find_decision_by_canonical_id, get_canonical_id

from .materializer import EventMaterializer
from .writer import EventFileWriter

Expand Down Expand Up @@ -138,6 +140,63 @@ async def bind_decision(
purpose=purpose,
)

async def apply_ratify(self, decision_id: str, signoff: dict) -> str:
"""Emit decision_ratified event, then delegate to inner adapter.

The event payload carries ``canonical_id`` so cross-author replay
can resolve to the peer's local decision row.
"""
await self._ensure_ready()
canonical_id = await get_canonical_id(self._inner._client, decision_id)
self._writer.write(
"decision_ratified.completed",
{
"canonical_id": canonical_id,
"decision_id": decision_id,
"signoff": signoff,
},
)
return await self._inner.apply_ratify(decision_id, signoff)

async def apply_supersede(
self,
new_id: str,
old_id: str,
signer: str = "",
signoff_note: str = "",
superseded_at: str = "",
session_id: str = "",
) -> dict:
"""Emit decision_superseded event, then delegate to inner adapter.

The event payload carries canonical_ids for both decisions so
cross-author replay can resolve to the peer's local rows.
"""
await self._ensure_ready()
new_canonical = await get_canonical_id(self._inner._client, new_id)
old_canonical = await get_canonical_id(self._inner._client, old_id)
self._writer.write(
"decision_superseded.completed",
{
"new_canonical_id": new_canonical,
"old_canonical_id": old_canonical,
"new_id": new_id,
"old_id": old_id,
"signer": signer,
"signoff_note": signoff_note,
"superseded_at": superseded_at,
"session_id": session_id,
},
)
return await self._inner.apply_supersede(
new_id=new_id,
old_id=old_id,
signer=signer,
signoff_note=signoff_note,
superseded_at=superseded_at,
session_id=session_id,
)

async def wipe_all_rows(self, repo: str) -> None:
"""Wipe the DB then reset the event watermark.

Expand Down
16 changes: 8 additions & 8 deletions handlers/ratify.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
from datetime import datetime, timezone

from contracts import RatifyResponse
from ledger.queries import decision_exists, project_decision_status, update_decision_status
from ledger.queries import decision_exists, project_decision_status
# triage-adapt: dropped preflight_telemetry import from auto-merge — module
# is on dev (#65 preflight telemetry) but not on triage; the cherry-picked
# body doesn't actually reference it (intent of e6d4b8f for this file is
# routing through ledger.apply_ratify, not adding telemetry)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -90,13 +94,9 @@ async def handle_ratify(
"note": note,
}

await client.query(
f"UPDATE {decision_id} SET signoff = $signoff",
{"signoff": signoff},
)

projected = await project_decision_status(client, decision_id)
await update_decision_status(client, decision_id, projected)
# Routes through TeamWriteAdapter when in team mode so the signoff
# change is emitted as a decision_ratified.completed event.
projected = await ledger.apply_ratify(decision_id, signoff)

logger.info(
"[ratify] decision=%s action=%s signer=%s projected_status=%s",
Expand Down
42 changes: 13 additions & 29 deletions handlers/resolve_collision.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
decision_exists,
project_decision_status,
relate_context_for,
relate_supersedes,
update_decision_status,
)

Expand Down Expand Up @@ -71,35 +70,20 @@ async def handle_resolve_collision(
if not await decision_exists(client, old_id):
raise ValueError(f"No decision row for old_id={old_id}")

# Write supersedes edge (idempotent)
await relate_supersedes(
client, new_id, old_id,
confidence=1.0,
reason=f"human-confirmed supersession via resolve_collision session={_session_id}",
# Routes through TeamWriteAdapter when in team mode so the
# supersession is emitted as a decision_superseded.completed
# event. The adapter handles edge creation + frozen-signoff
# merge so the old decision's prior ratification record is
# preserved (drift sweeps skip signoff.state='superseded').
result = await ledger.apply_supersede(
new_id=new_id,
old_id=old_id,
signer=_session_id,
signoff_note="",
superseded_at=_now_iso,
session_id=_session_id,
)

# Mark old decision as superseded in signoff (not status).
# Supersession is a human editorial decision, not a code-compliance observation.
# The old decision's status field retains its last code-compliance value
# and is frozen — drift sweeps skip decisions where signoff.state='superseded'.
# Merge with existing signoff so a prior ratification record is preserved.
_existing_rows = await client.query(
f"SELECT signoff FROM {old_id} LIMIT 1"
)
_old_signoff: dict = {}
if _existing_rows and isinstance(_existing_rows[0], dict):
_old_signoff = _existing_rows[0].get("signoff") or {}
await client.execute(
f"UPDATE {old_id} SET signoff = $s",
{"s": {
**_old_signoff,
"state": "superseded",
"superseded_by": new_id,
"superseded_at": _now_iso,
"session_id": _session_id,
}},
)
old_status = "superseded"
old_status = result.get("old_status", "superseded")

logger.info(
"[resolve_collision] supersede: %s supersedes %s", new_id, old_id
Expand Down
64 changes: 64 additions & 0 deletions ledger/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
relate_binds_to,
relate_has_identity,
relate_locates,
relate_supersedes,
relate_yields,
search_by_bm25,
update_decision_status,
Expand Down Expand Up @@ -1115,3 +1116,66 @@ async def wipe_all_rows(self, repo: str) -> None:
if db_path:
shutil.rmtree(db_path, ignore_errors=True)
await self._ensure_connected()

# ── Decision signoff write path (#97 event vocabulary) ────────────
# Both methods are idempotent so the materializer can replay them
# safely. Handlers do their own pre-write idempotency / collision
# checks; the adapter just performs the write and re-projects status.

async def apply_ratify(self, decision_id: str, signoff: dict) -> str:
"""Write a ratify/reject signoff and re-project the decision's status.

Idempotent. Returns the projected decision status after the write.
"""
await self._ensure_connected()
await self._client.query(
f"UPDATE {decision_id} SET signoff = $signoff",
{"signoff": signoff},
)
projected = await project_decision_status(self._client, decision_id)
await update_decision_status(self._client, decision_id, projected)
return projected

async def apply_supersede(
self,
new_id: str,
old_id: str,
signer: str = "",
signoff_note: str = "",
superseded_at: str = "",
session_id: str = "",
) -> dict:
"""Write the supersedes edge and freeze the old decision's signoff.

Idempotent: ``relate_supersedes`` upserts the edge and the signoff
UPDATE is a full overwrite. Returns ``{"old_status": "superseded"}``.
"""
await self._ensure_connected()
await relate_supersedes(
self._client,
new_id,
old_id,
confidence=1.0,
reason=(
f"human-confirmed supersession via resolve_collision session={session_id}"
),
)
rows = await self._client.query(f"SELECT signoff FROM {old_id} LIMIT 1")
old_signoff: dict = {}
if rows and isinstance(rows[0], dict):
old_signoff = rows[0].get("signoff") or {}
await self._client.execute(
f"UPDATE {old_id} SET signoff = $s",
{
"s": {
**old_signoff,
"state": "superseded",
"superseded_by": new_id,
"superseded_at": superseded_at,
"session_id": session_id,
"signer": signer,
"note": signoff_note,
}
},
)
return {"old_status": "superseded"}
41 changes: 41 additions & 0 deletions ledger/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,47 @@ async def update_decision_status(
)


# ── canonical_id ↔ decision_id resolution (#97 event replay) ──────────
# Decision rows carry both a SurrealDB-generated ``id`` (e.g. ``decision:abc``)
# and a content-addressed ``canonical_id`` (UUIDv5 from description +
# source_type + source_ref). The local id is per-DB; canonical_id is
# stable across authors and machines, so it's the only id safe to ship
# across the JSONL event log.

async def get_canonical_id(
client: LedgerClient,
decision_id: str,
) -> str | None:
"""Return the canonical_id for a local decision row, or None."""
rows = await client.query(
f"SELECT canonical_id FROM {decision_id} LIMIT 1",
)
if rows and isinstance(rows[0], dict):
cid = rows[0].get("canonical_id")
return str(cid) if cid else None
return None


async def find_decision_by_canonical_id(
client: LedgerClient,
canonical_id: str,
) -> str | None:
"""Return the local decision id for a canonical_id, or None."""
if not canonical_id:
return None
rows = await client.query(
"SELECT id FROM decision WHERE canonical_id = $cid LIMIT 1",
{"cid": canonical_id},
)
if rows and isinstance(rows[0], dict):
did = rows[0].get("id")
return str(did) if did else None
return None


# triage-adapt: dropped #77 update_decision_level block from auto-merge —
# triage doesn't carry decision_level work (PR #107, v0.16.0); not part
# of e6d4b8f's actual +38-line diff for #97
async def update_region_hash(
client: LedgerClient,
region_id: str,
Expand Down
Loading
Loading