diff --git a/ledger/schema.py b/ledger/schema.py
index b59f93f2..c80ed0b4 100644
--- a/ledger/schema.py
+++ b/ledger/schema.py
@@ -28,7 +28,7 @@
 # - edges: yields(input_span→decision), binds_to(decision→code_region),
 #          locates(symbol→code_region)
 # - removed: maps_to, implements
-SCHEMA_VERSION = 22
+SCHEMA_VERSION = 23
 
 # Maps schema version → minimum bicameral-mcp code version that understands it.
 # Used to produce actionable "upgrade your binary" messages.
@@ -48,6 +48,7 @@
     17: "0.14.x",  # re-runnable yields integrity cleanup; placeholder, release-eng pins final value at PR merge
     18: "0.14.x",  # decision.updated_at + idx_decision_updated_at (#87 precondition — revision marker for preflight dedup); placeholder, release-eng pins final value at PR merge
     19: "0.14.x",  # bicameral_meta.decision_revision counter + DEFINE EVENT on decision (#87 Phase 6 — constant-time replacement for ORDER BY DESC); placeholder, release-eng pins final value at PR merge
+    23: "0.15.x",  # decision_level backfill for legacy rows; placeholder, release-eng pins final value at PR merge
 }
 
 # SurrealDB error substrings that init_schema treats as recoverable: the row
@@ -1322,6 +1323,110 @@ async def _migrate_v21_to_v22(client: LedgerClient) -> None:
     )
 
 
+async def _migrate_v22_to_v23(client: LedgerClient) -> None:
+    """v22 → v23: Backfill decision_level for legacy decisions.
+
+    The v8→v9 migration added the decision_level field (DEFAULT NONE) but
+    did not classify existing rows. The #340 auto-classify heuristic only
+    runs on newly ingested decisions, so all pre-#340 rows remain NONE
+    (unclassified). Per the tolerant policy, NONE is treated as L3 — this
+    silently excludes legacy decisions from the codegenome identity graph.
+
+    This migration applies the same deterministic heuristic used by
+    ``ledger.adapter._classify_decision_level`` at ingest time:
+
+    1. Has binds_to edge → L2 (architecture, code-grounded).
+    2. source_type ∈ {transcript, notion, slack, document} → L1.
+    3. source_type ∈ {implementation_choice, agent_session} → L3.
+    4. Remaining → L2 (safe default — enters identity graph).
+
+    Idempotent: only touches rows WHERE decision_level IS NONE.
+
+    Legacy decision rows (from pre-v18 fixtures or ancient DBs) may
+    carry NONE values for required typed fields (``created_at``,
+    ``feature_hint``, etc.) that were added by later schema versions.
+    SurrealDB v2 re-validates the entire record on any UPDATE, so a
+    bulk ``UPDATE decision SET decision_level = ...`` fails on these
+    rows even though the migration only touches ``decision_level``.
+    We therefore UPDATE per-row and skip (with a warning) any row
+    whose record is too broken for an in-place patch.
+    """
+    # Step 1: decisions with code-region bindings → L2.
+    # Bound decision IDs come from the binds_to edge table (not
+    # ``->binds_to->code_region IS NOT EMPTY``, which returns True for
+    # all rows in SurrealDB v2 embedded — known quirk).
+    bound_ids = await client.query("SELECT type::string(`in`) AS id FROM binds_to")
+    bound_id_set = {r["id"] for r in (bound_ids or []) if r.get("id")}
+
+    # Fetch all unclassified decisions once for per-row processing.
+    unclassified = await client.query(
+        "SELECT type::string(id) AS id, source_type FROM decision WHERE decision_level IS NONE"
+    )
+    unclassified = [r for r in (unclassified or []) if r.get("id")]
+
+    product_sources = {"transcript", "notion", "slack", "document"}
+    impl_sources = {"implementation_choice", "agent_session"}
+
+    bound_count = 0
+    product_count = 0
+    impl_count = 0
+    fallback_count = 0
+    skip_count = 0
+
+    for row in unclassified:
+        did = row["id"]
+        src = row.get("source_type") or ""
+
+        if did in bound_id_set:
+            level = "L2"
+            counter = "bound"
+        elif src in product_sources:
+            level = "L1"
+            counter = "product"
+        elif src in impl_sources:
+            level = "L3"
+            counter = "impl"
+        else:
+            level = "L2"
+            counter = "fallback"
+
+        try:
+            await client.execute(
+                f"UPDATE {did} SET decision_level = '{level}', updated_at = time::now()"
+            )
+        except Exception as exc:
+            # Most likely a row with NONE in required typed fields (pre-v18
+            # fixture or ancient DB). Skip it — NONE is already treated as L3
+            # by the tolerant policy — but warn so real failures stay visible.
+            skip_count += 1
+            logger.warning(
+                "[migration] v22 → v23: skipping %s — record fails "
+                "re-validation (likely legacy fixture with missing fields): %s",
+                did,
+                exc,
+            )
+            continue
+
+        if counter == "bound":
+            bound_count += 1
+        elif counter == "product":
+            product_count += 1
+        elif counter == "impl":
+            impl_count += 1
+        else:
+            fallback_count += 1
+
+    logger.info(
+        "[migration] v22 → v23: decision_level backfill — "
+        "%d bound→L2, %d product→L1, %d impl→L3, %d fallback→L2, %d skipped",
+        bound_count,
+        product_count,
+        impl_count,
+        fallback_count,
+        skip_count,
+    )
+
+
 async def _write_wire_format_sentinel(
     client: LedgerClient,
 ) -> tuple[str | None, str | None, str]:
@@ -1405,6 +1510,7 @@ async def _write_wire_format_sentinel(
     20: _migrate_v19_to_v20,
     21: _migrate_v20_to_v21,
     22: _migrate_v21_to_v22,
+    23: _migrate_v22_to_v23,
 }
 
 
diff --git a/tests/test_v23_decision_level_backfill.py b/tests/test_v23_decision_level_backfill.py
new file mode 100644
index 00000000..360d2f55
--- /dev/null
+++ b/tests/test_v23_decision_level_backfill.py
@@ -0,0 +1,252 @@
+"""v22 → v23 migration: backfill decision_level for legacy decisions.
+
+Sociable tests — real SurrealDB adapter over ``memory://``, real schema
+init + migrate. Seeds decisions with various source_type and binding
+states, then runs ``_migrate_v22_to_v23`` directly to simulate the
+backfill on legacy rows whose decision_level was cleared to NONE.
+"""
+
+from __future__ import annotations
+
+import pytest
+
+from ledger.client import LedgerClient
+from ledger.schema import SCHEMA_VERSION, _migrate_v22_to_v23, init_schema, migrate
+
+_NS_COUNTER = 0
+
+
+async def _fresh_client() -> LedgerClient:
+    global _NS_COUNTER
+    _NS_COUNTER += 1
+    c = LedgerClient(url="memory://", ns=f"v23_test_{_NS_COUNTER}", db="ledger_v23_test")
+    await c.connect()
+    await init_schema(c)
+    await migrate(c, allow_destructive=True)
+    return c
+
+
+async def _seed_decision(
+    c: LedgerClient,
+    *,
+    description: str,
+    source_type: str = "manual",
+    decision_level: str | None = None,
+    canonical_id: str = "",
+) -> str:
+    """Insert a decision and return its string id."""
+    params: dict = {
+        "d": description,
+        "st": source_type,
+        "cid": canonical_id or f"cid-{description}",
+    }
+    if decision_level is not None:
+        rows = await c.query(
+            "CREATE decision SET description = $d, source_type = $st, "
+            "canonical_id = $cid, status = 'ungrounded', decision_level = $lvl",
+            {**params, "lvl": decision_level},
+        )
+    else:
+        rows = await c.query(
+            "CREATE decision SET description = $d, source_type = $st, "
+            "canonical_id = $cid, status = 'ungrounded'",
+            params,
+        )
+    row = rows[0]
+    rid = row.get("id")
+    if isinstance(rid, dict):
+        return f"decision:{rid.get('id', rid)}"
+    return str(rid)
+
+
+async def _seed_bound_decision(
+    c: LedgerClient,
+    *,
+    description: str,
+    source_type: str = "manual",
+    canonical_id: str = "",
+) -> str:
+    """Insert a decision with a binds_to edge to a code_region (no decision_level)."""
+    did = await _seed_decision(
+        c,
+        description=description,
+        source_type=source_type,
+        canonical_id=canonical_id,
+    )
+    await c.query(
+        "CREATE code_region SET file_path = $fp, symbol_name = $sn, "
+        "start_line = 1, end_line = 10, content_hash = 'abc123'",
+        {"fp": f"src/{description}.py", "sn": f"Sym_{description}"},
+    )
+    regions = await c.query(
+        "SELECT type::string(id) AS id FROM code_region WHERE file_path = $fp",
+        {"fp": f"src/{description}.py"},
+    )
+    rid = regions[0]["id"]
+    await c.execute(f"RELATE {did}->binds_to->{rid} SET confidence = 0.9, created_at = time::now()")
+    return did
+
+
+async def _get_level(c: LedgerClient, did: str) -> str | None:
+    rows = await c.query(f"SELECT decision_level FROM {did} LIMIT 1")
+    if not rows:
+        return None
+    return rows[0].get("decision_level")
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_v23_schema_version() -> None:
+    """After migrate, schema version is >= 23."""
+    c = await _fresh_client()
+    try:
+        rows = await c.query("SELECT version FROM schema_meta LIMIT 1")
+        assert rows
+        assert rows[0]["version"] == SCHEMA_VERSION
+        assert SCHEMA_VERSION >= 23
+    finally:
+        await c.close()
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_v23_bound_decisions_become_l2() -> None:
+    """Decisions with binds_to edges are classified as L2."""
+    c = await _fresh_client()
+    try:
+        did = await _seed_bound_decision(c, description="bound-arch")
+        await c.execute(f"UPDATE {did} SET decision_level = NONE")
+        assert await _get_level(c, did) is None
+
+        await _migrate_v22_to_v23(c)
+
+        assert await _get_level(c, did) == "L2"
+    finally:
+        await c.close()
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_v23_product_source_becomes_l1() -> None:
+    """Unbound decisions from product sources are classified as L1."""
+    c = await _fresh_client()
+    try:
+        for st in ("transcript", "notion", "slack", "document"):
+            did = await _seed_decision(c, description=f"product-{st}", source_type=st)
+            await c.execute(f"UPDATE {did} SET decision_level = NONE")
+            assert await _get_level(c, did) is None
+
+        await _migrate_v22_to_v23(c)
+
+        for st in ("transcript", "notion", "slack", "document"):
+            rows = await c.query(
+                "SELECT type::string(id) AS id, decision_level FROM decision "
+                "WHERE description = $d",
+                {"d": f"product-{st}"},
+            )
+            assert rows, f"missing row for source_type={st}"
+            assert rows[0]["decision_level"] == "L1", (
+                f"expected L1 for source_type={st}, got {rows[0]['decision_level']}"
+            )
+    finally:
+        await c.close()
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_v23_impl_source_becomes_l3() -> None:
+    """Unbound decisions from implementation sources are classified as L3."""
+    c = await _fresh_client()
+    try:
+        for st in ("implementation_choice", "agent_session"):
+            did = await _seed_decision(c, description=f"impl-{st}", source_type=st)
+            await c.execute(f"UPDATE {did} SET decision_level = NONE")
+            assert await _get_level(c, did) is None
+
+        await _migrate_v22_to_v23(c)
+
+        for st in ("implementation_choice", "agent_session"):
+            rows = await c.query(
+                "SELECT type::string(id) AS id, decision_level FROM decision "
+                "WHERE description = $d",
+                {"d": f"impl-{st}"},
+            )
+            assert rows, f"missing row for source_type={st}"
+            assert rows[0]["decision_level"] == "L3", (
+                f"expected L3 for source_type={st}, got {rows[0]['decision_level']}"
+            )
+    finally:
+        await c.close()
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_v23_unknown_source_becomes_l2() -> None:
+    """Unbound decisions with unknown source_type default to L2."""
+    c = await _fresh_client()
+    try:
+        did = await _seed_decision(c, description="unknown-src", source_type="manual")
+        await c.execute(f"UPDATE {did} SET decision_level = NONE")
+        assert await _get_level(c, did) is None
+
+        await _migrate_v22_to_v23(c)
+
+        assert await _get_level(c, did) == "L2"
+    finally:
+        await c.close()
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_v23_does_not_overwrite_existing_level() -> None:
+    """Decisions that already have a decision_level are not touched."""
+    c = await _fresh_client()
+    try:
+        did = await _seed_decision(
+            c,
+            description="already-classified",
+            source_type="transcript",
+            decision_level="L2",
+        )
+        assert await _get_level(c, did) == "L2"
+
+        await _migrate_v22_to_v23(c)
+
+        assert await _get_level(c, did) == "L2"
+    finally:
+        await c.close()
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_v23_idempotent() -> None:
+    """Running the backfill twice is a no-op."""
+    c = await _fresh_client()
+    try:
+        did = await _seed_decision(c, description="idempotent-probe", source_type="notion")
+        await c.execute(f"UPDATE {did} SET decision_level = NONE")
+        await _migrate_v22_to_v23(c)
+        assert await _get_level(c, did) == "L1"
+
+        await _migrate_v22_to_v23(c)
+        assert await _get_level(c, did) == "L1"
+    finally:
+        await c.close()
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_v23_bound_product_source_still_l2() -> None:
+    """A product-source decision WITH bindings should be L2 (code-grounded
+    takes priority over source_type)."""
+    c = await _fresh_client()
+    try:
+        did = await _seed_bound_decision(c, description="bound-product", source_type="transcript")
+        await c.execute(f"UPDATE {did} SET decision_level = NONE")
+        assert await _get_level(c, did) is None
+
+        await _migrate_v22_to_v23(c)
+
+        assert await _get_level(c, did) == "L2"
+    finally:
+        await c.close()