Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion ledger/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
# - edges: yields(input_span→decision), binds_to(decision→code_region),
# locates(symbol→code_region)
# - removed: maps_to, implements
SCHEMA_VERSION = 22
SCHEMA_VERSION = 23

# Maps schema version → minimum bicameral-mcp code version that understands it.
# Used to produce actionable "upgrade your binary" messages.
Expand All @@ -48,6 +48,7 @@
17: "0.14.x", # re-runnable yields integrity cleanup; placeholder, release-eng pins final value at PR merge
18: "0.14.x", # decision.updated_at + idx_decision_updated_at (#87 precondition — revision marker for preflight dedup); placeholder, release-eng pins final value at PR merge
19: "0.14.x", # bicameral_meta.decision_revision counter + DEFINE EVENT on decision (#87 Phase 6 — constant-time replacement for ORDER BY DESC); placeholder, release-eng pins final value at PR merge
23: "0.15.x", # decision_level backfill for legacy rows; placeholder, release-eng pins final value at PR merge
}

# SurrealDB error substrings that init_schema treats as recoverable: the row
Expand Down Expand Up @@ -1322,6 +1323,109 @@ async def _migrate_v21_to_v22(client: LedgerClient) -> None:
)


async def _migrate_v22_to_v23(client: LedgerClient) -> None:
"""v22 → v23: Backfill decision_level for legacy decisions.

The v8→v9 migration added the decision_level field (DEFAULT NONE) but
did not classify existing rows. The #340 auto-classify heuristic only
runs on newly ingested decisions, so all pre-#340 rows remain NONE
(unclassified). Per the tolerant policy, NONE is treated as L3 — this
silently excludes legacy decisions from the codegenome identity graph.

This migration applies the same deterministic heuristic used by
``ledger.adapter._classify_decision_level`` at ingest time:

1. Has binds_to edge → L2 (architecture, code-grounded).
2. source_type ∈ {transcript, notion, slack, document} → L1.
3. source_type ∈ {implementation_choice, agent_session} → L3.
4. Remaining → L2 (safe default — enters identity graph).

Idempotent: only touches rows WHERE decision_level IS NONE.

Legacy decision rows (from pre-v18 fixtures or ancient DBs) may
carry NONE values for required typed fields (``created_at``,
``feature_hint``, etc.) that were added by later schema versions.
SurrealDB v2 re-validates the entire record on any UPDATE, so a
bulk ``UPDATE decision SET decision_level = ...`` fails on these
rows even though the migration only touches ``decision_level``.
We therefore UPDATE per-row and skip (with a warning) any row
whose record is too broken for an in-place patch.
"""
# Step 1: decisions with code-region bindings → L2.
# Bound decision IDs come from the binds_to edge table (not
# ``->binds_to->code_region IS NOT EMPTY``, which returns True for
# all rows in SurrealDB v2 embedded — known quirk).
bound_ids = await client.query("SELECT type::string(`in`) AS id FROM binds_to")
bound_id_set = {r["id"] for r in (bound_ids or []) if r.get("id")}

# Fetch all unclassified decisions once for per-row processing.
unclassified = await client.query(
"SELECT type::string(id) AS id, source_type FROM decision WHERE decision_level IS NONE"
)
unclassified = [r for r in (unclassified or []) if r.get("id")]

product_sources = {"transcript", "notion", "slack", "document"}
impl_sources = {"implementation_choice", "agent_session"}

bound_count = 0
product_count = 0
impl_count = 0
fallback_count = 0
skip_count = 0

for row in unclassified:
did = row["id"]
src = row.get("source_type") or ""

if did in bound_id_set:
level = "L2"
counter = "bound"
elif src in product_sources:
level = "L1"
counter = "product"
elif src in impl_sources:
level = "L3"
counter = "impl"
else:
level = "L2"
counter = "fallback"

try:
await client.execute(
f"UPDATE {did} SET decision_level = '{level}', updated_at = time::now()"
)
except Exception:
# Row has NONE values in required typed fields from a pre-v18
# fixture or ancient DB. Skip it — NONE is already treated as
# L3 by the tolerant policy, so the row remains functional.
skip_count += 1
logger.debug(
"[migration] v22 → v23: skipping %s — record fails "
"re-validation (likely legacy fixture with missing fields)",
did,
)
continue

if counter == "bound":
bound_count += 1
elif counter == "product":
product_count += 1
elif counter == "impl":
impl_count += 1
else:
fallback_count += 1

logger.info(
"[migration] v22 → v23: decision_level backfill — "
"%d bound→L2, %d product→L1, %d impl→L3, %d fallback→L2, %d skipped",
bound_count,
product_count,
impl_count,
fallback_count,
skip_count,
)


async def _write_wire_format_sentinel(
client: LedgerClient,
) -> tuple[str | None, str | None, str]:
Expand Down Expand Up @@ -1405,6 +1509,7 @@ async def _write_wire_format_sentinel(
20: _migrate_v19_to_v20,
21: _migrate_v20_to_v21,
22: _migrate_v21_to_v22,
23: _migrate_v22_to_v23,
}


Expand Down
252 changes: 252 additions & 0 deletions tests/test_v23_decision_level_backfill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
"""v22 → v23 migration: backfill decision_level for legacy decisions.

Sociable tests — real SurrealDB adapter over ``memory://``, real schema
init + migrate. Seeds decisions with various source_type and binding
states, then runs ``_migrate_v22_to_v23`` directly to simulate the
backfill on legacy rows whose decision_level was cleared to NONE.
"""

from __future__ import annotations

import pytest

from ledger.client import LedgerClient
from ledger.schema import SCHEMA_VERSION, _migrate_v22_to_v23, init_schema, migrate

_NS_COUNTER = 0


async def _fresh_client() -> LedgerClient:
global _NS_COUNTER
_NS_COUNTER += 1
c = LedgerClient(url="memory://", ns=f"v23_test_{_NS_COUNTER}", db="ledger_v23_test")
await c.connect()
await init_schema(c)
await migrate(c, allow_destructive=True)
return c


async def _seed_decision(
c: LedgerClient,
*,
description: str,
source_type: str = "manual",
decision_level: str | None = None,
canonical_id: str = "",
) -> str:
"""Insert a decision and return its string id."""
params: dict = {
"d": description,
"st": source_type,
"cid": canonical_id or f"cid-{description}",
}
if decision_level is not None:
rows = await c.query(
"CREATE decision SET description = $d, source_type = $st, "
"canonical_id = $cid, status = 'ungrounded', decision_level = $lvl",
{**params, "lvl": decision_level},
)
else:
rows = await c.query(
"CREATE decision SET description = $d, source_type = $st, "
"canonical_id = $cid, status = 'ungrounded'",
params,
)
row = rows[0]
rid = row.get("id")
if isinstance(rid, dict):
return f"decision:{rid.get('id', rid)}"
return str(rid)


async def _seed_bound_decision(
c: LedgerClient,
*,
description: str,
source_type: str = "manual",
canonical_id: str = "",
) -> str:
"""Insert a decision with a binds_to edge to a code_region (no decision_level)."""
did = await _seed_decision(
c,
description=description,
source_type=source_type,
canonical_id=canonical_id,
)
await c.query(
"CREATE code_region SET file_path = $fp, symbol_name = $sn, "
"start_line = 1, end_line = 10, content_hash = 'abc123'",
{"fp": f"src/{description}.py", "sn": f"Sym_{description}"},
)
regions = await c.query(
"SELECT type::string(id) AS id FROM code_region WHERE file_path = $fp",
{"fp": f"src/{description}.py"},
)
rid = regions[0]["id"]
await c.execute(f"RELATE {did}->binds_to->{rid} SET confidence = 0.9, created_at = time::now()")
return did


async def _get_level(c: LedgerClient, did: str) -> str | None:
rows = await c.query(f"SELECT decision_level FROM {did} LIMIT 1")
if not rows:
return None
return rows[0].get("decision_level")


@pytest.mark.phase2
@pytest.mark.asyncio
async def test_v23_schema_version() -> None:
"""After migrate, schema version is >= 23."""
c = await _fresh_client()
try:
rows = await c.query("SELECT version FROM schema_meta LIMIT 1")
assert rows
assert rows[0]["version"] == SCHEMA_VERSION
assert SCHEMA_VERSION >= 23
finally:
await c.close()


@pytest.mark.phase2
@pytest.mark.asyncio
async def test_v23_bound_decisions_become_l2() -> None:
"""Decisions with binds_to edges are classified as L2."""
c = await _fresh_client()
try:
did = await _seed_bound_decision(c, description="bound-arch")
await c.execute(f"UPDATE {did} SET decision_level = NONE")
assert await _get_level(c, did) is None

await _migrate_v22_to_v23(c)

assert await _get_level(c, did) == "L2"
finally:
await c.close()


@pytest.mark.phase2
@pytest.mark.asyncio
async def test_v23_product_source_becomes_l1() -> None:
"""Unbound decisions from product sources are classified as L1."""
c = await _fresh_client()
try:
for st in ("transcript", "notion", "slack", "document"):
did = await _seed_decision(c, description=f"product-{st}", source_type=st)
await c.execute(f"UPDATE {did} SET decision_level = NONE")
assert await _get_level(c, did) is None

await _migrate_v22_to_v23(c)

for st in ("transcript", "notion", "slack", "document"):
rows = await c.query(
"SELECT type::string(id) AS id, decision_level FROM decision "
"WHERE description = $d",
{"d": f"product-{st}"},
)
assert rows, f"missing row for source_type={st}"
assert rows[0]["decision_level"] == "L1", (
f"expected L1 for source_type={st}, got {rows[0]['decision_level']}"
)
finally:
await c.close()


@pytest.mark.phase2
@pytest.mark.asyncio
async def test_v23_impl_source_becomes_l3() -> None:
"""Unbound decisions from implementation sources are classified as L3."""
c = await _fresh_client()
try:
for st in ("implementation_choice", "agent_session"):
did = await _seed_decision(c, description=f"impl-{st}", source_type=st)
await c.execute(f"UPDATE {did} SET decision_level = NONE")
assert await _get_level(c, did) is None

await _migrate_v22_to_v23(c)

for st in ("implementation_choice", "agent_session"):
rows = await c.query(
"SELECT type::string(id) AS id, decision_level FROM decision "
"WHERE description = $d",
{"d": f"impl-{st}"},
)
assert rows, f"missing row for source_type={st}"
assert rows[0]["decision_level"] == "L3", (
f"expected L3 for source_type={st}, got {rows[0]['decision_level']}"
)
finally:
await c.close()


@pytest.mark.phase2
@pytest.mark.asyncio
async def test_v23_unknown_source_becomes_l2() -> None:
"""Unbound decisions with unknown source_type default to L2."""
c = await _fresh_client()
try:
did = await _seed_decision(c, description="unknown-src", source_type="manual")
await c.execute(f"UPDATE {did} SET decision_level = NONE")
assert await _get_level(c, did) is None

await _migrate_v22_to_v23(c)

assert await _get_level(c, did) == "L2"
finally:
await c.close()


@pytest.mark.phase2
@pytest.mark.asyncio
async def test_v23_does_not_overwrite_existing_level() -> None:
"""Decisions that already have a decision_level are not touched."""
c = await _fresh_client()
try:
did = await _seed_decision(
c,
description="already-classified",
source_type="transcript",
decision_level="L2",
)
assert await _get_level(c, did) == "L2"

await _migrate_v22_to_v23(c)

assert await _get_level(c, did) == "L2"
finally:
await c.close()


@pytest.mark.phase2
@pytest.mark.asyncio
async def test_v23_idempotent() -> None:
"""Running the backfill twice is a no-op."""
c = await _fresh_client()
try:
did = await _seed_decision(c, description="idempotent-probe", source_type="notion")
await c.execute(f"UPDATE {did} SET decision_level = NONE")
await _migrate_v22_to_v23(c)
assert await _get_level(c, did) == "L1"

await _migrate_v22_to_v23(c)
assert await _get_level(c, did) == "L1"
finally:
await c.close()


@pytest.mark.phase2
@pytest.mark.asyncio
async def test_v23_bound_product_source_still_l2() -> None:
"""A product-source decision WITH bindings should be L2 (code-grounded
takes priority over source_type)."""
c = await _fresh_client()
try:
did = await _seed_bound_decision(c, description="bound-product", source_type="transcript")
await c.execute(f"UPDATE {did} SET decision_level = NONE")
assert await _get_level(c, did) is None

await _migrate_v22_to_v23(c)

assert await _get_level(c, did) == "L2"
finally:
await c.close()
Loading