diff --git a/.github/workflows/test-mcp-regression.yml b/.github/workflows/test-mcp-regression.yml
index dff352be..4b820cac 100644
--- a/.github/workflows/test-mcp-regression.yml
+++ b/.github/workflows/test-mcp-regression.yml
@@ -77,6 +77,8 @@ jobs:
tests/test_phase1_code_locator.py
tests/test_phase2_ledger.py
tests/test_phase3_integration.py
+ tests/test_legacy_ledger_fixtures.py
+ tests/test_schema_recoverable_errors.py
-v --tb=short
--junitxml=test-results/results.xml
--html=test-results/report.html --self-contained-html
diff --git a/README.md b/README.md
index 1a2775af..4657d7c8 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,5 @@

-
-
-
-
# Bicameral MCP
@@ -13,7 +9,9 @@
[](https://opensource.org/licenses/MIT)
[](https://github.com/BicameralAI/bicameral-mcp/actions)
-A local-first [MCP server](https://spec.modelcontextprotocol.io/) that ingests your meeting transcripts, PRDs, and Slack threads, maps every decision to the code that implements it, and surfaces alignment gaps to your AI agent — before they become bugs.
+AI agents ship code fast. They forget what your team agreed — and most agreements emerge mid-flight, in corrections and side comments that never reach a doc.
+
+Bicameral MCP is a **spec compliance layer** for AI-assisted engineering. It is local-first and runs as an [MCP server](https://spec.modelcontextprotocol.io/). It ingests your meeting transcripts, PRDs, and Slack threads, captures mid-implementation decisions that were never discussed so your product owner can ratify them asynchronously, and pins each one to the code that implements it — so your agent finds out the moment it drifts from either the written spec or the spoken one.
---
@@ -68,9 +66,25 @@ bicameral-mcp --smoke-test
Source: Slack #payments 2026-03-20
```
-**At any time**, the dashboard gives you the full picture:
+**See it in motion** — the loop in three beats:
+
+**1. Ingest (PM or dev).** A transcript, PRD, or Slack thread comes in; bicameral extracts decisions and writes them to the ledger.
+
+https://github.com/user-attachments/assets/e74ae39f-dd99-485b-8122-8c5211478eb1
+
+**2. Preflight (auto).** Before the agent edits code, bicameral surfaces prior decisions, drifted regions, and open questions for the file in scope.
+
+https://github.com/user-attachments/assets/8a0fdfb8-fc9a-49fc-9521-e5b5faf8646a
+
+**3. Ratify async (product owner).** The product owner reviews captured proposals and ratifies or rejects them on their own cadence. Drift tracking activates on ratification.
+
+https://github.com/user-attachments/assets/206e269e-49d6-407d-b338-ab3f2a2c70ec
-
+
+
+
+
+
---
diff --git a/audit_log.py b/audit_log.py
index a7bcc7bc..e062b746 100644
--- a/audit_log.py
+++ b/audit_log.py
@@ -71,6 +71,8 @@ class AuditEventType(enum.StrEnum):
# #252 Layer 2 — wire-format sentinel observability
LEDGER_SCHEMA_VERIFIED = "ledger_schema_verified"
LEDGER_VERSION_DRIFT = "ledger_version_drift"
+ # #296 — recoverable schema-definition skip (init_schema deferring to migrate)
+ SCHEMA_DEFINE_SKIPPED = "schema_define_skipped"
_LEVEL_BY_EVENT: dict[AuditEventType, str] = {
@@ -84,6 +86,7 @@ class AuditEventType(enum.StrEnum):
AuditEventType.ERROR: "error",
AuditEventType.LEDGER_SCHEMA_VERIFIED: "info",
AuditEventType.LEDGER_VERSION_DRIFT: "warn",
+ AuditEventType.SCHEMA_DEFINE_SKIPPED: "warn",
}
_LEVEL_RANK = {"info": 10, "warn": 20, "error": 30}
diff --git a/cli/_diagnose_gather.py b/cli/_diagnose_gather.py
index ba11620b..1854bf05 100644
--- a/cli/_diagnose_gather.py
+++ b/cli/_diagnose_gather.py
@@ -23,9 +23,8 @@
_RECENT_EVENT_TAIL = 5
-def _read_ledger_metadata(adapter) -> tuple[str, int | None, str | None]:
+def _read_ledger_metadata_for_url(url: str) -> tuple[str, int | None, str | None]:
"""Return (ledger_url, size_bytes_or_None, mtime_iso_or_None)."""
- url = getattr(adapter, "_url", "")
if not url.startswith("surrealkv://"):
return url, None, None
path_str = url.removeprefix("surrealkv://")
@@ -37,21 +36,23 @@ def _read_ledger_metadata(adapter) -> tuple[str, int | None, str | None]:
return url, stat.st_size, mtime_iso
-async def _read_bicameral_meta(
- adapter,
-) -> tuple[str | None, str | None, str | None, str, str]:
- """Return (first_write, last_write, last_write_at_iso, drift_status, running).
+def _read_ledger_metadata(adapter) -> tuple[str, int | None, str | None]:
+ return _read_ledger_metadata_for_url(getattr(adapter, "_url", ""))
- ``drift_status`` is one of: ``"first-write"`` / ``"match"`` / ``"drift"`` /
- ``"unavailable"`` (table missing, e.g., pre-Layer-2 ledger).
- """
+
+async def _read_bicameral_meta_raw(
+ client,
+) -> tuple[str | None, str | None, str | None, str, str]:
+ """Same shape as ``_read_bicameral_meta`` but operates on a raw
+ ``LedgerClient``. Used by the MCP ``bicameral_diagnose`` tool, which
+ must work without ``init_schema``/``migrate`` succeeding."""
try:
running = importlib.metadata.version("surrealdb")
except importlib.metadata.PackageNotFoundError:
running = "unknown"
try:
- rows = await adapter._client.query("SELECT * FROM bicameral_meta LIMIT 1")
+ rows = await client.query("SELECT * FROM bicameral_meta LIMIT 1")
except Exception: # noqa: BLE001 — table missing is the load-bearing case
return None, None, None, "unavailable", running
@@ -71,9 +72,20 @@ async def _read_bicameral_meta(
return first, last, last_at_iso, "drift", running
-async def _read_schema_version(adapter) -> int | None:
+async def _read_bicameral_meta(
+ adapter,
+) -> tuple[str | None, str | None, str | None, str, str]:
+ """Return (first_write, last_write, last_write_at_iso, drift_status, running).
+
+ ``drift_status`` is one of: ``"first-write"`` / ``"match"`` / ``"drift"`` /
+ ``"unavailable"`` (table missing, e.g., pre-Layer-2 ledger).
+ """
+ return await _read_bicameral_meta_raw(adapter._client)
+
+
+async def _read_schema_version_raw(client) -> int | None:
try:
- rows = await adapter._client.query("SELECT version FROM schema_meta LIMIT 1")
+ rows = await client.query("SELECT version FROM schema_meta LIMIT 1")
except Exception: # noqa: BLE001
return None
if not rows:
@@ -82,11 +94,15 @@ async def _read_schema_version(adapter) -> int | None:
return int(val) if val is not None else None
-async def _read_table_counts(adapter) -> dict[str, int]:
+async def _read_schema_version(adapter) -> int | None:
+ return await _read_schema_version_raw(adapter._client)
+
+
+async def _read_table_counts_raw(client) -> dict[str, int]:
counts: dict[str, int] = {}
for table in _CANONICAL_TABLES:
try:
- rows = await adapter._client.query(f"SELECT count() AS n FROM {table} GROUP ALL")
+ rows = await client.query(f"SELECT count() AS n FROM {table} GROUP ALL")
except Exception: # noqa: BLE001 — missing table is acceptable (pre-v16)
continue
if rows:
@@ -95,6 +111,10 @@ async def _read_table_counts(adapter) -> dict[str, int]:
return counts
+async def _read_table_counts(adapter) -> dict[str, int]:
+ return await _read_table_counts_raw(adapter._client)
+
+
def _resolve_audit_log_channel() -> tuple[str, Path | None]:
"""Return (channel_label, configured_file_path_or_None)."""
raw = os.getenv("BICAMERAL_AUDIT_LOG", "stderr").strip()
@@ -195,8 +215,17 @@ def _fetch_recommended() -> str | None:
return None
-async def gather_diagnosis(adapter) -> Diagnosis:
- """Collect every allowlisted field from the running install + ledger."""
+async def gather_diagnosis_raw(client, ledger_url: str) -> Diagnosis:
+ """Same allowlisted gather as ``gather_diagnosis`` but takes a raw
+ ``LedgerClient`` and an explicit ``ledger_url``.
+
+ Used by the MCP ``bicameral_diagnose`` tool, which opens a raw client
+ so it can produce a report even when ``adapter.connect()`` (and its
+ init_schema / migrate calls) would crash on a corrupted ledger. The
+ CLI ``bicameral-mcp diagnose`` keeps using ``gather_diagnosis``
+ (adapter-based) because it benefits from the adapter's connection
+ lifecycle in the happy-path operator-bug-report flow.
+ """
try:
bicameral_version = importlib.metadata.version("bicameral-mcp")
except importlib.metadata.PackageNotFoundError:
@@ -204,10 +233,10 @@ async def gather_diagnosis(adapter) -> Diagnosis:
from ledger.schema import SCHEMA_VERSION
- ledger_url, size_bytes, mtime_iso = _read_ledger_metadata(adapter)
- first, last, last_at_iso, drift_status, running = await _read_bicameral_meta(adapter)
- schema_recorded = await _read_schema_version(adapter)
- table_counts = await _read_table_counts(adapter)
+ _, size_bytes, mtime_iso = _read_ledger_metadata_for_url(ledger_url)
+ first, last, last_at_iso, drift_status, running = await _read_bicameral_meta_raw(client)
+ schema_recorded = await _read_schema_version_raw(client)
+ table_counts = await _read_table_counts_raw(client)
channel_label, audit_path = _resolve_audit_log_channel()
recent_events = _tail_recent_events(audit_path, _RECENT_EVENT_TAIL)
@@ -242,3 +271,12 @@ async def gather_diagnosis(adapter) -> Diagnosis:
recent_events=recent_events,
suggestions=suggestions,
)
+
+
+async def gather_diagnosis(adapter) -> Diagnosis:
+ """Adapter-flavoured wrapper over ``gather_diagnosis_raw``.
+
+ Reads the ledger URL off the adapter and forwards to the raw helper.
+ Existing CLI callers (`bicameral-mcp diagnose`) keep this entry point.
+ """
+ return await gather_diagnosis_raw(adapter._client, getattr(adapter, "_url", ""))
diff --git a/contracts.py b/contracts.py
index a6c3cd6a..ef3eabaa 100644
--- a/contracts.py
+++ b/contracts.py
@@ -650,6 +650,43 @@ class ResetResponse(BaseModel):
replay_plan: list[ResetReplayEntry] = []
replay_errors: list[str] = []
next_action: str
+ # #296 Layer E — automated rebuild from .bicameral/events/*.jsonl
+ # after wipe. Set only when `replay_from_events=True`: the replayed
+ # event count on `confirm=True`, or the on-disk event count on a dry run.
+ events_replayed: int = 0
+
+
+# ── Tool 8 (new): /bicameral_diagnose ────────────────────────────────
+
+
+class DiagnoseResponse(BaseModel):
+ """Read-only diagnostic snapshot. Mirrors the CLI ``bicameral-mcp
+ diagnose`` output but returns structured fields so agents can render
+ a recovery prompt deterministically.
+
+ `recovery_path` classifies the next operator action:
+ - ``clean`` — ledger looks healthy, no remediation needed
+ - ``fixable`` — schema is behind binary; next normal call migrates
+ - ``reset_rebuild`` — ledger broken AND events present → reset
+ with `replay_from_events=True` recovers without data loss
+ - ``reset_destructive`` — ledger broken AND no events → reset
+ loses decision history; user must explicitly accept
+
+ `diagnosis` carries the same structural-metadata-only fields the
+ CLI emits (see ``cli.diagnose.Diagnosis``); ``None`` when the raw
+ client could not connect.
+ """
+
+ ledger_url: str
+ connect_error: str = ""
+ recovery_path: Literal[
+ "clean",
+ "fixable",
+ "reset_rebuild",
+ "reset_destructive",
+ ]
+ diagnosis: dict | None = None
+ next_action: str
# ── Tool 9: /bicameral_preflight ─────────────────────────────────────
diff --git a/handlers/diagnose.py b/handlers/diagnose.py
new file mode 100644
index 00000000..5099d477
--- /dev/null
+++ b/handlers/diagnose.py
@@ -0,0 +1,166 @@
+"""Handler for /bicameral_diagnose MCP tool.
+
+Read-only structural diagnostic. Mirrors the ``bicameral-mcp diagnose``
+CLI but exposed as an MCP tool so agents can call it from any
+tool-error envelope.
+
+Critical property: this handler MUST work even when ``adapter.connect()``
+crashes inside ``init_schema``/``migrate``. It opens a raw
+``LedgerClient`` directly and calls ``gather_diagnosis_raw``, which
+reads tables defensively (missing tables and SELECT failures are
+treated as "unavailable" not propagated as exceptions). That's the
+whole point — when the ledger is broken, the agent needs a tool that
+still answers "what's wrong?"
+
+Repair is a deliberate CLI operation (``bicameral-mcp diagnose
+--repair``), never an in-session agent action. This tool is
+intentionally read-only.
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+from typing import Literal
+
+from contracts import DiagnoseResponse
+
+RecoveryPath = Literal["clean", "fixable", "reset_rebuild", "reset_destructive"]
+
+logger = logging.getLogger(__name__)
+
+
+def _resolve_ledger_url(ctx) -> str:
+ """Pick the ledger URL from the same source ``adapter.connect()`` does.
+
+ Order: ``_url`` on ``ctx.ledger`` or its ``_inner`` (if set), then ``SURREAL_URL``
+ env, then the embedded default. Mirrors ``SurrealDBLedgerAdapter.__init__``.
+ """
+ ledger = getattr(ctx, "ledger", None)
+ inner = getattr(ledger, "_inner", ledger) if ledger is not None else None
+ for obj in (ledger, inner):
+ if obj is None:
+ continue
+ url = getattr(obj, "_url", None)
+ if url:
+ return str(url)
+
+ from ledger.adapter import _default_db_url
+
+ return os.environ.get("SURREAL_URL", _default_db_url())
+
+
+async def handle_diagnose(ctx) -> DiagnoseResponse:
+ """Probe the ledger read-only and return a structural diagnosis.
+
+ Always opens its own raw ``LedgerClient`` rather than reusing the
+ adapter's client — the adapter may be in a partially-connected
+ state, and going through ``adapter._ensure_connected`` would re-run
+ the migration that's already failing.
+ """
+ from cli._diagnose_gather import gather_diagnosis_raw
+ from ledger.client import LedgerClient
+
+ ledger_url = _resolve_ledger_url(ctx)
+
+ client = LedgerClient(url=ledger_url)
+ try:
+ await client.connect()
+ except Exception as exc: # noqa: BLE001 — operator needs the failure context
+ logger.warning("[diagnose] raw connect failed for %s: %s", ledger_url, exc)
+ return DiagnoseResponse(
+ ledger_url=ledger_url,
+ connect_error=f"{type(exc).__name__}: {exc}",
+ recovery_path="reset_destructive",
+ diagnosis=None,
+ next_action=(
+ "Raw client could not connect to the ledger URL. The DB file "
+ "may be missing, locked, or unreadable. Inspect the path, then "
+ "run `bicameral-mcp reset --confirm` to reinitialise. If the "
+ "directory contains team-mode events under .bicameral/events/, "
+ "use `bicameral_reset(replay_from_events=True, confirm=True)` "
+ "to rebuild from the substrate after wipe."
+ ),
+ )
+
+ try:
+ diagnosis = await gather_diagnosis_raw(client, ledger_url)
+ finally:
+ try:
+ await client.close()
+ except Exception: # noqa: BLE001 — close is best-effort
+ pass
+
+ recovery_path, next_action = _classify_recovery(diagnosis)
+ return DiagnoseResponse(
+ ledger_url=ledger_url,
+ connect_error="",
+ recovery_path=recovery_path,
+ diagnosis=diagnosis.__dict__, # Diagnosis is a frozen dataclass
+ next_action=next_action,
+ )
+
+
+def _classify_recovery(diagnosis) -> tuple[RecoveryPath, str]:
+ """Translate the raw diagnosis into one of four recovery paths.
+
+ - ``clean``: versions match with tables visible, or no version recorded yet.
+ - ``fixable``: schema_recorded < schema_expected (`migrate` catches up
+ on next normal connect), or versions match but no tables are visible.
+ - ``reset_rebuild``: ledger schema is newer than the binary's and
+ ``.bicameral/events/*.jsonl`` exists, so reset+replay recovers
+ without data loss.
+ - ``reset_destructive``: same condition as reset_rebuild but no
+ events on disk → user must accept data loss.
+
+ The classification is deliberately conservative — anything weird
+ routes to the operator with a clear next_action, not an automated
+ fix.
+ """
+ rec = diagnosis.schema_version_recorded
+ exp = diagnosis.schema_version_expected
+ has_events = _events_present(diagnosis.ledger_url)
+
+ if rec is not None and rec > exp:
+ path: RecoveryPath = "reset_rebuild" if has_events else "reset_destructive"
+ return path, (
+ f"Ledger schema v{rec} is newer than this binary (v{exp}). "
+ f"Upgrade `bicameral-mcp` to a version that understands v{rec}, "
+ f"or run `bicameral_reset(replay_from_events={has_events}, confirm=True)`."
+ )
+
+ if rec is not None and rec < exp:
+ return "fixable", (
+ f"Ledger schema v{rec} is behind binary v{exp}. The next normal "
+ f"connect will run pending migrations. If connect is failing, "
+ f"the cleanup migration may need a re-run — `bicameral-mcp diagnose --repair`."
+ )
+
+ if rec is None:
+ return "clean", (
+ "Schema version not yet recorded — likely a fresh install. "
+ "Any tool call will initialise the ledger."
+ )
+
+ # rec == exp — confirm the table counts look sane
+ table_counts = diagnosis.table_counts or {}
+ if not table_counts:
+ return "fixable", (
+ "Schema version matches but no tables visible. "
+ "Connect may have stopped mid-init; re-run a tool call to retry."
+ )
+
+ return "clean", (f"Ledger is at expected schema v{exp}. No remediation needed.")
+
+
+def _events_present(ledger_url: str) -> bool:
+ """Best-effort check for ``.bicameral/events/*.jsonl``."""
+ if not ledger_url.startswith("surrealkv://"):
+ return False
+ from pathlib import Path
+
+ db_path = Path(ledger_url.removeprefix("surrealkv://"))
+ events_dir = db_path.parent / "events"
+ if not events_dir.exists():
+ return False
+ return any(events_dir.glob("*.jsonl"))
diff --git a/handlers/reset.py b/handlers/reset.py
index 2814ddb1..a05da9ee 100644
--- a/handlers/reset.py
+++ b/handlers/reset.py
@@ -1,6 +1,6 @@
"""Handler for /bicameral_reset MCP tool.
-The fail-safe valve. Two modes:
+The fail-safe valve. Two wipe modes plus an optional replay path.
wipe_mode="ledger" (default)
Wipes the materialized SurrealDB rows scoped to the current repo.
@@ -14,10 +14,19 @@
Use this for: nuclear restart, switching repos, credential rotation.
The user must explicitly confirm after seeing the warning.
+ replay_from_events=True (#296 Layer E)
+ After a successful ``wipe_mode="ledger"`` wipe, the watermark is
+ reset and ``EventMaterializer.replay_new_events`` rebuilds the
+ local DB from ``.bicameral/events/*.jsonl``. The event log
+ is the canonical record (committed to git in team mode) — replay
+ is recovery, not destruction. Combining this with ``wipe_mode="full"``
+ is rejected because a full wipe deletes the very events we'd replay.
+
Safety design:
- Dry run by default. confirm=False returns the plan without touching state.
- Replay plan is always computed before any destructive operation.
- Full mode surfaces the exact path that will be deleted in the dry run.
+ - replay_from_events surfaces the on-disk event count in the dry run.
"""
from __future__ import annotations
@@ -35,6 +44,7 @@ async def handle_reset(
replay: bool = True,
confirm: bool = False,
wipe_mode: str = "ledger",
+ replay_from_events: bool = False,
) -> ResetResponse:
"""Wipe the ledger (and optionally the full .bicameral/ dir) for ctx.repo_path.
@@ -44,7 +54,32 @@ async def handle_reset(
confirm: False = dry run (default). True = execute.
wipe_mode: "ledger" = wipe DB rows only (server stays live).
"full" = delete the entire .bicameral/ directory.
+ replay_from_events: After wipe (only when wipe_mode="ledger"),
+ replay every event in .bicameral/events/*.jsonl through the
+ ingest path to recover decisions deterministically. Mutually
+ exclusive with wipe_mode="full" — that mode deletes the
+ substrate we'd replay from.
"""
+ if replay_from_events and wipe_mode == "full":
+ return ResetResponse(
+ wiped=False,
+ wipe_mode=wipe_mode,
+ ledger_url=_resolve_ledger_url(ctx, ctx.ledger),
+ bicameral_dir="",
+ repo=ctx.repo_path,
+ cursors_before=0,
+ replay_plan=[],
+ replay_errors=[
+ "replay_from_events is incompatible with wipe_mode='full' "
+ "(full wipe deletes .bicameral/events which is the replay source)"
+ ],
+ next_action=(
+ "Pick one: wipe_mode='ledger' + replay_from_events=True for "
+ "recovery from a corrupted ledger, OR wipe_mode='full' for a "
+ "nuclear restart that intentionally drops everything."
+ ),
+ )
+
ledger = ctx.ledger
if hasattr(ledger, "connect"):
await ledger.connect()
@@ -70,6 +105,8 @@ async def handle_reset(
ledger_url = _resolve_ledger_url(ctx, ledger)
bicameral_dir = _resolve_bicameral_dir(ledger) if wipe_mode == "full" else ""
+ events_on_disk = _count_events_on_disk(ledger) if replay_from_events else 0
+
if not confirm:
if wipe_mode == "full":
dir_desc = (
@@ -83,6 +120,16 @@ async def handle_reset(
f"WARNING: this removes config.yaml, team event files, and all history — "
f"there is no undo. Re-run with confirm=True to execute."
)
+ elif replay_from_events:
+ next_action = (
+ f"DRY RUN — LEDGER WIPE + REBUILD. Would wipe {cursors_before} "
+ f"source_cursor row(s), every bicameral node/edge scoped to "
+ f"{ctx.repo_path!r}, then reset the watermark and replay "
+ f"{events_on_disk} event(s) from .bicameral/events/*.jsonl through "
+ f"the ingest path. The event log is the canonical record — replay "
+ f"recovers decisions deterministically. "
+ f"Re-run with confirm=True to execute."
+ )
else:
next_action = (
f"Dry run only. Would wipe {cursors_before} source_cursor row(s) "
@@ -97,6 +144,7 @@ async def handle_reset(
repo=ctx.repo_path,
cursors_before=cursors_before,
replay_plan=replay_plan if replay else [],
+ events_replayed=events_on_disk if replay_from_events else 0,
next_action=next_action,
)
@@ -138,6 +186,15 @@ async def handle_reset(
bicameral_dir,
)
+ events_replayed = 0
+ replay_errors: list[str] = []
+ if replay_from_events and wipe_mode != "full":
+ try:
+ events_replayed = await _replay_events_into_ledger(ledger)
+ except Exception as exc: # noqa: BLE001 — surface failure but keep wipe done
+ logger.exception("[reset] replay_from_events failed: %s", exc)
+ replay_errors.append(f"replay_from_events failed: {exc}")
+
if wipe_mode == "full":
next_action = (
f"Full wipe complete for repo {ctx.repo_path!r}. "
@@ -146,6 +203,20 @@ async def handle_reset(
f"Schema has been reinitialised — the server is ready for fresh ingestion. "
f"Re-run the original bicameral_ingest calls for each entry in replay_plan."
)
+ elif replay_from_events:
+ if replay_errors:
+ next_action = (
+ f"Ledger wiped for repo {ctx.repo_path!r}. Replay FAILED — see "
+ f"replay_errors. The wipe succeeded; the event substrate is intact. "
+ f"Re-run with confirm=True after addressing the replay error, or "
+ f"fall back to manual ingest using replay_plan."
+ )
+ else:
+ next_action = (
+ f"Ledger wiped and rebuilt from events for repo {ctx.repo_path!r}. "
+ f"{events_replayed} event(s) replayed through the ingest path. "
+ f"Verify with `bicameral_diagnose` or `bicameral_history`."
+ )
else:
next_action = (
f"Ledger wiped for repo {ctx.repo_path!r}. "
@@ -162,6 +233,8 @@ async def handle_reset(
repo=ctx.repo_path,
cursors_before=cursors_before,
replay_plan=replay_plan if replay else [],
+ replay_errors=replay_errors,
+ events_replayed=events_replayed,
next_action=next_action,
)
@@ -169,6 +242,77 @@ async def handle_reset(
# ── Wipe implementations ─────────────────────────────────────────────
+def _resolve_events_dir(ledger) -> Path | None:
+ """Return ``.bicameral/events/`` for the resolved ledger URL, or None
+ when the ledger is in-memory or the directory does not exist.
+
+ Honours ``BICAMERAL_DATA_PATH`` symmetrically with
+ ``adapters/ledger.py:_real_ledger_instance`` so `replay_from_events`
+ targets the same substrate the team-mode write path uses.
+ """
+ import os as _os
+
+ bicameral_dir = _resolve_bicameral_dir(ledger)
+ data_path = _os.environ.get("BICAMERAL_DATA_PATH")
+ if data_path:
+ events_dir = Path(data_path) / ".bicameral" / "events"
+ elif bicameral_dir:
+ events_dir = Path(bicameral_dir) / "events"
+ else:
+ return None
+ return events_dir if events_dir.exists() else None
+
+
+def _count_events_on_disk(ledger) -> int:
+ """Tally non-empty lines across every ``.jsonl`` under events/.
+
+ Best-effort: returns 0 when the directory is missing or the ledger is
+ in-memory; files that cannot be read are skipped. Used only to surface
+ a count in the dry-run; not load-bearing for the replay itself.
+ """
+ events_dir = _resolve_events_dir(ledger)
+ if events_dir is None:
+ return 0
+ total = 0
+ for path in events_dir.glob("*.jsonl"):
+ try:
+ with open(path, "rb") as f:
+ for line in f:
+ if line.strip():
+ total += 1
+ except OSError:
+ continue
+ return total
+
+
+async def _replay_events_into_ledger(ledger) -> int:
+ """Reset the materializer watermark and replay every event back
+ through the ingest path. Returns the count of events the
+ materializer applied.
+
+ Constructs a fresh ``EventMaterializer`` (the class team mode uses) over
+ the events/local directories, so replay goes through the materializer
+ rather than a separate path. Determinism is tracked under issue #296.
+ """
+ inner = getattr(ledger, "_inner", ledger)
+ events_dir = _resolve_events_dir(ledger)
+ if events_dir is None:
+ return 0
+
+ local_dir = events_dir.parent / "local"
+ watermark_path = local_dir / "watermark"
+ local_dir.mkdir(parents=True, exist_ok=True)
+ # Reset the watermark so every event replays from offset 0. The
+ # materializer's offset map is `{author: byte_offset}`; writing an
+ # empty object is the canonical "start over" signal.
+ watermark_path.write_text("{}\n", encoding="utf-8")
+
+ from events.materializer import EventMaterializer
+
+ materializer = EventMaterializer(events_dir, local_dir)
+ return await materializer.replay_new_events(inner)
+
+
async def _wipe_ledger(ledger, repo_path: str) -> None:
"""Wipe DB rows only. Delegates to adapter method or falls back to direct delete."""
if hasattr(ledger, "wipe_all_rows"):
diff --git a/ledger/schema.py b/ledger/schema.py
index fce5e0da..94b3d552 100644
--- a/ledger/schema.py
+++ b/ledger/schema.py
@@ -28,7 +28,7 @@
# - edges: yields(input_span→decision), binds_to(decision→code_region),
# locates(symbol→code_region)
# - removed: maps_to, implements
-SCHEMA_VERSION = 16
+SCHEMA_VERSION = 17
# Maps schema version → minimum bicameral-mcp code version that understands it.
# Used to produce actionable "upgrade your binary" messages.
@@ -45,8 +45,22 @@
14: "0.13.0", # placeholder; release-eng pins final value at PR merge — Phase 4 (#61)
15: "0.15.x", # decision.governance (#109 — governance metadata)
16: "0.13.x", # #252 Layer 2 — wire-format sentinel via bicameral_meta table; placeholder, release-eng pins final value at PR merge
+ 17: "0.14.x", # re-runnable yields integrity cleanup; placeholder, release-eng pins final value at PR merge
}
+# SurrealDB error substrings that init_schema treats as recoverable: the row
+# state predates the new constraint and the next migration is responsible for
+# cleaning it up. The substring set is the load-bearing safety contract — any
+# new pattern here must be paired with a fixture-DB regression test
+# (tests/test_schema_recoverable_errors.py) that produces the exact string,
+# so a future surrealdb-py bump that changes the format fails CI loudly.
+RECOVERABLE_DEFINE_PATTERNS: tuple[str, ...] = (
+ "already exists", # DEFINE re-applied with no change
+ "already contains", # UNIQUE index attempted on table w/ duplicates
+ "expected a record<", # TYPE constraint mismatch (rename of IN/OUT type)
+ "but expected", # generic value-type assertion failure (defensive)
+)
+
# Migrations that drop or recreate tables/data. These are never auto-applied;
# the user must explicitly confirm via bicameral_reset(confirm=True).
DESTRUCTIVE_MIGRATIONS: frozenset[int] = frozenset()
@@ -423,24 +437,53 @@ def _with_overwrite(sql: str) -> str:
async def _execute_define_idempotent(client: LedgerClient, sql: str) -> None:
- """Run a DEFINE statement; treat "already exists" / "already contains" as success.
-
- "already contains" is SurrealDB's error when a UNIQUE index is attempted on
- a table that already has duplicate rows. This lets the server start so the
- migration that cleans stale data can run; the migration re-issues the index.
+ """Run a DEFINE statement; treat known recoverable errors as success.
+
+ Recoverable patterns are listed in ``RECOVERABLE_DEFINE_PATTERNS``:
+
+ - ``already exists`` — DEFINE re-applied; no-op.
+ - ``already contains`` — UNIQUE index against duplicates; the migration
+ that cleans stale data runs next and re-issues the index.
+ - ``expected a record<`` — DEFINE TABLE / INDEX OVERWRITE re-validating
+ legacy rows whose IN/OUT type was renamed (e.g. v3 ``yields.in =
+ source_span:*`` against the current ``RELATION IN input_span``).
+ The migration that drops those rows runs next.
+ - ``but expected`` — generic value-type assertion failure; same recovery
+ shape as the type-mismatch case.
+
+ Any other ``LedgerError`` propagates so connect() fails fast and the
+ operator sees an actionable message pointing at ``bicameral-mcp diagnose``.
"""
try:
await client.execute(sql)
except LedgerError as exc:
- msg = str(exc)
- if "already exists" not in msg and "already contains" not in msg:
+ msg_lower = str(exc).lower()
+ if not any(p in msg_lower for p in RECOVERABLE_DEFINE_PATTERNS):
raise
- if "already contains" in msg:
+ # Loud signal — UNIQUE-violation and type-mismatch each warrant a
+ # warning so the next migration's cleanup is visible in audit logs.
+ if (
+ "already contains" in msg_lower
+ or "expected a record<" in msg_lower
+ or "but expected" in msg_lower
+ ):
logger.warning(
- "[schema] DEFINE INDEX skipped — existing data violates UNIQUE "
- "constraint (%s). Migration will clean stale rows and re-apply.",
+ "[schema] DEFINE skipped — existing data violates new constraint "
+ "(%s). Migration will clean stale rows and re-apply. detail=%s",
sql.split("ON")[0].strip(),
+ str(exc).splitlines()[0] if str(exc) else "",
)
+ try:
+ from audit_log import AuditEventType
+ from audit_log import emit as audit_emit
+
+ audit_emit(
+ AuditEventType.SCHEMA_DEFINE_SKIPPED,
+ sql_prefix=sql.split("ON")[0].strip(),
+ error_class=type(exc).__name__,
+ )
+ except Exception: # noqa: BLE001 — audit MUST NOT break init_schema
+ pass
async def init_schema(client: LedgerClient) -> None:
@@ -459,64 +502,81 @@ async def init_schema(client: LedgerClient) -> None:
# ── Migrations ──────────────────────────────────────────────────────────
-async def _migrate_v4_to_v5(client: LedgerClient) -> None:
- """v4 → v5: Remove stale v3-era yields edges and deduplicate.
-
- Some DBs that went through v3→v4 still have residual source_span→intent
- edges in the yields table (the REMOVE TABLE in v3→v4 silently failed).
- Those stale edges prevent DEFINE INDEX idx_yields_unique from being
- applied, which broke startup. This migration:
+async def _clean_yields_legacy_rows(client: LedgerClient, *, log_tag: str) -> int:
+ """Delete `yields` rows whose IN/OUT references the v3-era types
+ (`source_span:*` / `intent:*`) that no longer match the current
+ `RELATION IN input_span OUT decision` constraint.
- 1. Deletes any yields edge whose `in` is a source_span record
- or whose `out` is an intent record (v3-era types).
- 2. Deduplicates remaining yields edges by (in, out), keeping
- the first-seen record per pair.
- 3. Re-applies the unique index now that the table is clean.
+ Returns the count of rows removed. Tolerates per-row failures so a
+ single rejected delete does not abort the whole pass.
"""
- # Step 1: Remove stale v3 edges
try:
stale = await client.query(
"SELECT id FROM yields "
"WHERE string::starts_with(type::string(in), 'source_span:') "
" OR string::starts_with(type::string(out), 'intent:')"
)
- for row in stale or []:
+ except Exception as exc:
+ logger.warning("[migration] %s: stale-edge select failed: %s", log_tag, exc)
+ return 0
+ removed = 0
+ for row in stale or []:
+ try:
+ await client.execute(f"DELETE {row['id']}")
+ removed += 1
+ except Exception:
+ pass
+ if removed:
+ logger.info("[migration] %s: removed %d stale legacy yields edges", log_tag, removed)
+ return removed
+
+
+async def _dedupe_yields(client: LedgerClient, *, log_tag: str) -> int:
+ """Drop duplicate `yields` rows by (in, out), keeping the first seen.
+
+ Returns the count of duplicates removed. Caller is responsible for
+ re-applying ``DEFINE INDEX idx_yields_unique`` afterwards.
+ """
+ try:
+ rows = await client.query("SELECT id, in, out FROM yields")
+ except Exception as exc:
+ logger.warning("[migration] %s: dedup select failed: %s", log_tag, exc)
+ return 0
+ seen: set[tuple[str, str]] = set()
+ removed = 0
+ for row in rows or []:
+ key = (str(row.get("in", "")), str(row.get("out", "")))
+ if key in seen:
try:
await client.execute(f"DELETE {row['id']}")
+ removed += 1
except Exception:
pass
- logger.info(
- "[migration] v4 → v5: removed %d stale v3 yields edges",
- len(stale or []),
- )
- except Exception as exc:
- logger.warning("[migration] v4 → v5: stale-edge cleanup failed: %s", exc)
+ else:
+ seen.add(key)
+ if removed:
+ logger.info("[migration] %s: removed %d duplicate yields edges", log_tag, removed)
+ return removed
- # Step 2: Deduplicate remaining yields by (in, out)
- try:
- all_yields = await client.query("SELECT id, in, out FROM yields")
- seen: set[tuple[str, str]] = set()
- removed = 0
- for row in all_yields or []:
- key = (str(row.get("in", "")), str(row.get("out", "")))
- if key in seen:
- try:
- await client.execute(f"DELETE {row['id']}")
- removed += 1
- except Exception:
- pass
- else:
- seen.add(key)
- if removed:
- logger.info("[migration] v4 → v5: removed %d duplicate yields edges", removed)
- except Exception as exc:
- logger.warning("[migration] v4 → v5: dedup failed: %s", exc)
- # Step 3: Re-apply the unique index now that the table is clean
- for sql in [
+async def _migrate_v4_to_v5(client: LedgerClient) -> None:
+ """v4 → v5: Remove stale v3-era yields edges and deduplicate.
+
+ Some DBs that went through v3→v4 still have residual source_span→intent
+ edges in the yields table (the REMOVE TABLE in v3→v4 silently failed).
+ Those stale edges prevent DEFINE INDEX idx_yields_unique from being
+ applied, which broke startup.
+
+ See also ``_migrate_v16_to_v17`` — a re-run of the same cleanup, gated
+ on the v16→v17 boundary, for DBs that rolled past v5 with the
+ corruption still present.
+ """
+ await _clean_yields_legacy_rows(client, log_tag="v4 → v5")
+ await _dedupe_yields(client, log_tag="v4 → v5")
+ await _execute_define_idempotent(
+ client,
"DEFINE INDEX idx_yields_unique ON yields FIELDS in, out UNIQUE",
- ]:
- await _execute_define_idempotent(client, sql)
+ )
logger.info("[migration] v4 → v5: yields table clean, unique index applied")
@@ -929,6 +989,36 @@ async def _migrate_v15_to_v16(client: LedgerClient) -> None:
return
+async def _migrate_v16_to_v17(client: LedgerClient) -> None:
+ """v16 → v17: Re-run the yields integrity cleanup (#296 follow-up).
+
+ The v4 → v5 cleanup (``_migrate_v4_to_v5``) was the last reachable
+ pass that purged v3-era ``source_span:*`` rows from the ``yields``
+ table. Any DB whose ``schema_meta.version`` rolled past 5 with the
+ corruption still present is permanently broken on the v4→v5 boundary
+ — ``migrate()`` only iterates ``range(current+1, SCHEMA_VERSION+1)``,
+ so historical migrations are unreachable.
+
+ This migration re-runs the same yields cleanup body. It is a no-op on
+ a clean DB (zero deletions, idempotent index re-apply). It exists to
+ cover any operator whose ledger crashes connect() with the type
+ mismatch ``yields.in expected a record<input_span>`` because their DB
+ holds the legacy rows.
+
+ Symmetric with v4→v5 — both call into
+ ``_clean_yields_legacy_rows`` + ``_dedupe_yields`` so the SQL lives
+ in one place. Add a new fixture under
+ ``tests/fixtures/legacy_ledgers/`` for any future row-shape we
+ discover; the registry-driven test parametrizes over it.
+ """
+ await _clean_yields_legacy_rows(client, log_tag="v16 → v17")
+ await _dedupe_yields(client, log_tag="v16 → v17")
+ await _execute_define_idempotent(
+ client,
+ "DEFINE INDEX idx_yields_unique ON yields FIELDS in, out UNIQUE",
+ )
+
+
async def _write_wire_format_sentinel(
client: LedgerClient,
) -> tuple[str | None, str | None, str]:
@@ -1006,6 +1096,7 @@ async def _write_wire_format_sentinel(
14: _migrate_v13_to_v14,
15: _migrate_v14_to_v15,
16: _migrate_v15_to_v16,
+ 17: _migrate_v16_to_v17,
}
diff --git a/server.py b/server.py
index e10179a5..4d04104b 100644
--- a/server.py
+++ b/server.py
@@ -38,6 +38,7 @@
from context import BicameralContext
from dashboard.server import get_dashboard_server
from handlers.bind import handle_bind
+from handlers.diagnose import handle_diagnose
from handlers.gap_judge import handle_judge_gaps
from handlers.history import handle_history
from handlers.ingest import _IngestRefused, handle_ingest
@@ -818,6 +819,19 @@ async def list_tools() -> list[Tool]:
},
},
),
+ Tool(
+ name="bicameral.diagnose",
+ description=(
+ "Read-only structural diagnosis of the local ledger — versions, schema_meta state, "
+ "table counts, recent warn|error audit events, and a recovery_path classification "
+ "(clean / fixable / reset_rebuild / reset_destructive). Works even when the normal "
+ "adapter connect crashes during init_schema/migrate, because it opens a raw "
+ "LedgerClient that doesn't run schema initialization. Pair with `bicameral_reset` "
+ "for the actual recovery action — this tool never mutates state. "
+ "Slash alias: /bicameral-diagnose"
+ ),
+ inputSchema={"type": "object", "properties": {}},
+ ),
# ── Code locator tools (MCP-native) ──────────────────────────
Tool(
name="validate_symbols",
@@ -1032,7 +1046,10 @@ async def _call_tool_impl(name: str, arguments: dict) -> list[TextContent]:
confirm=arguments.get("confirm", False),
replay=arguments.get("replay", True),
wipe_mode=arguments.get("wipe_mode", "ledger"),
+ replay_from_events=arguments.get("replay_from_events", False),
)
+ elif name in ("bicameral.diagnose", "diagnose"):
+ result = await handle_diagnose(ctx)
elif name in ("bicameral.preflight", "preflight"):
result = await handle_preflight(
ctx,
diff --git a/skills/bicameral-diagnose/SKILL.md b/skills/bicameral-diagnose/SKILL.md
new file mode 100644
index 00000000..d4bb75b6
--- /dev/null
+++ b/skills/bicameral-diagnose/SKILL.md
@@ -0,0 +1,77 @@
+---
+name: bicameral-diagnose
+description: Read-only structural diagnosis of the local bicameral ledger. Fires on "what's wrong with my ledger", "diagnose the ledger", "is bicameral broken", "ledger health check", or any user-facing tool error that mentions schema/migration/SurrealDB. Calls `bicameral.diagnose` (MCP) which uses a raw client and works even when the normal connect path crashes. Returns a structured `recovery_path` (clean / fixable / reset_rebuild / reset_destructive) the agent surfaces alongside the recommended next command. Never mutates state — repair is a CLI operation (`bicameral-mcp diagnose --repair`) or a deliberate reset call.
+---
+
+# Bicameral Diagnose
+
+Read-only diagnostic for the local ledger. The MCP tool surface for the same `bicameral-mcp diagnose` CLI you'd paste into a bug report — but agent-callable from a tool-error envelope, and resilient to the failure modes that crash normal `connect()`.
+
+## When to fire
+
+- User asks "what's wrong with my ledger" / "is bicameral broken" / "ledger health check" / "diagnose the ledger".
+- Any tool error envelope from another bicameral tool that mentions `LedgerError`, `SchemaVersionTooNew`, `DestructiveMigrationRequired`, or a SurrealDB error string the user can't action.
+- Before recommending `bicameral_reset` — diagnose first, choose the recovery path on the basis of the response, then propose the matching reset call.
+
+## When NOT to fire
+
+- For questions about decisions or drift — that's `bicameral.history` / `bicameral.preflight`. Diagnose is about the ledger storage layer.
+- To repair the ledger. Diagnose is read-only by contract. The repair surface is `bicameral-mcp diagnose --repair` (CLI, user-driven) or a deliberate `bicameral_reset` call. Surface the diagnosis, then surface the recommended next command — never silently retry.
+
+## Output contract
+
+The MCP tool returns:
+
+```jsonc
+{
+ "ledger_url": "surrealkv:///Users/.../ledger.db",
+ "connect_error": "", // non-empty when raw connect itself failed
+ "recovery_path": "clean" | "fixable" | "reset_rebuild" | "reset_destructive",
+ "diagnosis": { // null when connect_error is set
+ "bicameral_version": "0.14.x",
+ "python_version": "3.13.0",
+ "platform_str": "...",
+ "surrealdb_running": "1.0.4",
+ "schema_version_recorded": 16,
+ "schema_version_expected": 17,
+ "drift_status": "match" | "drift" | "first-write" | "unavailable",
+ "table_counts": { "decision": 42, "yields": 100, ... },
+ "recent_events": [...], // last 5 warn|error audit log entries
+ "suggestions": [...], // hardcoded heuristics from the CLI gather
+ ...
+ },
+ "next_action": "human-readable instruction tied to recovery_path"
+}
+```
+
+## Recovery-path matrix
+
+Render `next_action` verbatim to the user. Then offer the matching command:
+
+| `recovery_path` | What it means | Recommend |
+|---|---|---|
+| `clean` | Schema matches, tables look sane. | "No remediation needed. If you're still seeing errors, share the exact tool name and arguments." |
+| `fixable` | Schema is behind binary; pending migrations will run on next normal connect. | "Run any bicameral tool — it will trigger the migration. If that fails, try `bicameral-mcp diagnose --repair` (CLI)." |
+| `reset_rebuild` | Ledger is unrecoverable, but `.bicameral/events/` has events on disk. | "`bicameral_reset(wipe_mode='ledger', replay_from_events=True, confirm=True)` will wipe and rebuild from the event log." |
+| `reset_destructive` | Ledger is unrecoverable AND no events on disk → reset loses decision history. | "`bicameral_reset(wipe_mode='ledger', confirm=True)` will wipe; you'll need to re-ingest sources from `replay_plan`." |
+
+For `connect_error`-set responses (raw client itself can't connect), surface the error text and suggest checking the ledger path / file permissions before any reset.
+
+## Two diagnose surfaces — when to use which
+
+| Surface | Role | When |
+|---|---|---|
+| `bicameral.diagnose` (MCP, this skill) | Agent-callable, read-only, returns structured `recovery_path`. | In-session diagnosis, automatic surfacing from tool errors. |
+| `bicameral-mcp diagnose` (CLI) | Human-pasteable markdown for bug reports. Adds `--repair` flag (user-driven repair attempts). | Bug reports, manual repair, terminal sessions without an active agent. |
+
+Both call the same `gather_diagnosis_raw` function — the data is identical. The MCP version emits structured JSON; the CLI emits markdown. Don't render the CLI markdown via this skill; surface the structured fields directly.
+
+## Auto-fire from another tool's error envelope
+
+When another bicameral tool returns an error containing schema/migration vocabulary, the agent should:
+
+1. Call `bicameral.diagnose` immediately (no user prompt — it's read-only and bounded).
+2. Render the `recovery_path` and `next_action`.
+3. Wait for user confirmation before invoking any reset command. Reset is destructive even when "non-destructive on paper."
+
+Never run `bicameral_reset` on the basis of a diagnose response without explicit user confirmation.
diff --git a/skills/bicameral-reset/SKILL.md b/skills/bicameral-reset/SKILL.md
index 66e7d601..d407db09 100644
--- a/skills/bicameral-reset/SKILL.md
+++ b/skills/bicameral-reset/SKILL.md
@@ -1,19 +1,30 @@
---
name: bicameral-reset
-description: Emergency trust recovery for a polluted or stale ledger. Fires when the user says "my ledger looks wrong", "nuke the ledger", "start over", "this is polluted", or otherwise loses trust in the current state. DRY RUN BY DEFAULT — always confirms with the user before the destructive call. Two modes: ledger (default, wipes DB rows only) and full (deletes entire .bicameral/ directory).
+description: Emergency trust recovery for a polluted or stale ledger. Fires when the user says "my ledger looks wrong", "nuke the ledger", "start over", "this is polluted", or otherwise loses trust in the current state. DRY RUN BY DEFAULT — always confirms with the user before the destructive call. Two wipe modes (ledger / full) plus an optional `replay_from_events` flag that rebuilds the ledger from `.bicameral/events/*.jsonl` after wipe.
---
# Bicameral Reset
The fail-safe valve. When the ledger gets polluted — bad ingest, stale groundings, or a session that went off the rails — `bicameral.reset` gives you a one-command recovery path.
-## Two modes
+## Two wipe modes
| Mode | What's deleted | When to use |
|------|----------------|-------------|
| `wipe_mode="ledger"` (default) | Materialized SurrealDB rows only. Config, event files, and team history are preserved. Server stays live. | Bug recovery, bad ingest, polluted groundings. The safe default. |
| `wipe_mode="full"` | The entire `.bicameral/` directory — ledger, `config.yaml`, team event JSONL files, everything. | Nuclear restart: switching repos, credential rotation, complete distrust of all prior decisions. |
+## Optional `replay_from_events`
+
+`replay_from_events=True` (only valid with `wipe_mode="ledger"`) wipes the materialized DB, resets the team-mode watermark, and replays every event in `.bicameral/events/*.jsonl` back through the same ingest path team mode uses. The event log is the canonical record (committed to git in team mode); this replay is recovery, not destruction.
+
+Use it when:
+
+- `bicameral.diagnose` returns `recovery_path="reset_rebuild"` — events are present and the ledger is unrecoverable through normal migration.
+- A corrupted ledger needs to be reconstructed without losing decision history.
+
+The dry run reports the on-disk event count; the post-confirm response reports `events_replayed` so the user can verify the round-trip. Combining `replay_from_events=True` with `wipe_mode="full"` is rejected by the handler — full-wipe deletes the substrate we'd replay from.
+
## When to fire
- User says *"my ledger looks polluted"*, *"this is wrong, start over"*, *"nuke the ledger"*, *"wipe it and retry"*
diff --git a/tests/fixtures/legacy_ledgers/README.md b/tests/fixtures/legacy_ledgers/README.md
new file mode 100644
index 00000000..23e61a77
--- /dev/null
+++ b/tests/fixtures/legacy_ledgers/README.md
@@ -0,0 +1,35 @@
+# Legacy ledger fixtures
+
+Frozen DB states that reproduce historical corruption patterns. Each
+file builds the bad state in a fresh `memory://` ledger using raw
+`LedgerClient.execute` calls — never the real `init_schema` / `migrate`
+path, which would refuse to apply the broken state.
+
+The CI suite at `tests/test_legacy_ledger_fixtures.py` parametrizes
+over every fixture:
+
+1. Build the bad state.
+2. Run `init_schema` + `migrate` (the production code path).
+3. Assert the cleanup ran, the schema reaches `SCHEMA_VERSION`, no
+ row violates the current type/UNIQUE constraints, and a second
+ `init_schema` + `migrate` is a no-op (idempotent).
+
+## Fixture index
+
+| Fixture | Reproduces | First seen | Cleaned by |
+|---|---|---|---|
+| `v3_yields_source_span.py` | v3-era `yields.in = source_span:*` rows surviving past v5 cleanup | 2026-05-09 dogfood (#296 root cause) | v4→v5 + v16→v17 |
+
+## Adding a fixture
+
+Each fixture is a Python module exporting an async `build(client)`
+coroutine that mutates the client's state. Keep them tiny — one bad
+row per fixture is enough to assert the cleanup contract.
+
+```python
+# tests/fixtures/legacy_ledgers/<fixture_name>.py
+async def build(client):
+ await client.execute("…raw SurrealQL that produces the bad state…")
+```
+
+Then register it in `tests/test_legacy_ledger_fixtures.py::FIXTURES`.
diff --git a/tests/fixtures/legacy_ledgers/__init__.py b/tests/fixtures/legacy_ledgers/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/fixtures/legacy_ledgers/v3_yields_source_span.py b/tests/fixtures/legacy_ledgers/v3_yields_source_span.py
new file mode 100644
index 00000000..7ff0bf9b
--- /dev/null
+++ b/tests/fixtures/legacy_ledgers/v3_yields_source_span.py
@@ -0,0 +1,55 @@
+"""Fixture: v3-era ``yields.in = source_span:*`` rows.
+
+Reproduces the dogfood crash (#296) where a DB carried v3-vintage
+``yields`` edges whose ``in`` is a ``source_span:*`` record. The
+current schema declares ``yields`` as ``RELATION IN input_span OUT
+decision``, so applying ``DEFINE INDEX OVERWRITE idx_yields_unique``
+re-validates the existing rows and fails with::
+
+ Found source_span:<id> for field `in`, with record `yields:<id>`,
+ but expected a record<input_span>
+
+The v4 → v5 cleanup deletes these rows, but only on the v4→v5
+boundary. Any DB whose ``schema_meta.version`` rolled past 5 with the
+corruption intact is permanently broken until the v16 → v17 cleanup
+re-runs the same logic.
+"""
+
+from __future__ import annotations
+
+
+async def build(client) -> None:
+ """Insert a v3-era ``yields`` row whose ``in`` is a ``source_span``.
+
+ Sets ``schema_meta.version = 16`` so the migration loop sees a
+ legitimate "past v5" DB that v4→v5 cannot reach. The v16 → v17
+ cleanup is the only path that should fix it.
+ """
+ # Define a minimal source_span shadow table so the RecordID parses.
+ # We can't recreate the full v3 schema — we just need the row id to
+ # use the source_span:<id> form.
+ await client.execute("DEFINE TABLE source_span SCHEMAFULL")
+ await client.execute("CREATE source_span:legacy_span_1 SET text = 'legacy'")
+ await client.execute("CREATE input_span:span_1 SET text = 'fresh', source_type = 'transcript'")
+ await client.execute(
+ "CREATE decision:dec_1 SET description = 'd1', source_type = 'transcript', "
+ "source_ref = '', status = 'ungrounded', canonical_id = 'fixture-cid-1'"
+ )
+ # RELATE refuses cross-table on a typed edge, so define yields as an
+ # untyped RELATION first and RELATE the bad row into that permissive table.
+ await client.execute("DEFINE TABLE yields SCHEMAFULL TYPE RELATION")
+ await client.execute("RELATE source_span:legacy_span_1 -> yields -> decision:dec_1")
+ # And one valid row so the dedupe step has something legitimate.
+ await client.execute("RELATE input_span:span_1 -> yields -> decision:dec_1")
+ # Mark the schema as v16 so the migration loop targets v16 → v17.
+ await client.execute("DEFINE TABLE schema_meta SCHEMAFULL")
+ await client.execute("DEFINE FIELD version ON schema_meta TYPE int")
+ await client.execute("CREATE schema_meta SET version = 16")
+
+
+# Post-migration assertions specific to this fixture.
+async def assert_clean(client) -> None:
+ """All ``yields`` rows must reference an ``input_span`` IN."""
+ rows = await client.query("SELECT id, type::string(in) AS in_table FROM yields")
+ bad = [r for r in (rows or []) if not str(r.get("in_table", "")).startswith("input_span:")]
+ assert not bad, f"v16→v17 left stale yields rows: {bad}"
diff --git a/tests/test_ledger_bicameral_meta_migration.py b/tests/test_ledger_bicameral_meta_migration.py
index e3b94c81..99c2b580 100644
--- a/tests/test_ledger_bicameral_meta_migration.py
+++ b/tests/test_ledger_bicameral_meta_migration.py
@@ -32,7 +32,7 @@ async def test_migrate_v15_to_v16_is_no_op_for_existing_v15_ledger(fresh_client)
await migrate(fresh_client, allow_destructive=True)
rows = await fresh_client.query("SELECT version FROM schema_meta LIMIT 1")
assert rows[0]["version"] == SCHEMA_VERSION
- assert SCHEMA_VERSION == 16
+ assert SCHEMA_VERSION >= 16 # current floor — bumps land here as version increments
bm_rows = await fresh_client.query("SELECT * FROM bicameral_meta")
# Migration body is a no-op; sentinel writes happen in adapter.connect, not here.
assert bm_rows == []
diff --git a/tests/test_legacy_ledger_fixtures.py b/tests/test_legacy_ledger_fixtures.py
new file mode 100644
index 00000000..8cd29c9c
--- /dev/null
+++ b/tests/test_legacy_ledger_fixtures.py
@@ -0,0 +1,97 @@
+"""Fixture-replay regression suite (#296 Layer A + B).
+
+For every frozen DB shape under ``tests/fixtures/legacy_ledgers/``:
+
+ 1. Build the bad state in a fresh ``memory://`` ledger.
+ 2. Run ``init_schema`` + ``migrate`` (the production code path).
+ 3. Assert the schema reaches ``SCHEMA_VERSION``, the fixture's own
+ ``assert_clean`` invariants hold, and a second ``init_schema`` +
+ ``migrate`` is a no-op (idempotent).
+
+Adding a new fixture requires no new test code — import it, register it in
+``FIXTURES``, and the parametrized test exercises it on every PR.
+"""
+
+from __future__ import annotations
+
+import importlib
+import sys
+from pathlib import Path
+
+import pytest
+
+from ledger.client import LedgerClient
+from ledger.schema import SCHEMA_VERSION, init_schema, migrate
+
+# Fixtures live under tests/fixtures/legacy_ledgers/. ``tests/`` is not
+# a package (no top-level __init__.py) so we extend sys.path to import
+# the fixture modules by name.
+_FIXTURES_DIR = Path(__file__).parent / "fixtures" / "legacy_ledgers"
+if str(_FIXTURES_DIR) not in sys.path:
+ sys.path.insert(0, str(_FIXTURES_DIR))
+
+import v3_yields_source_span # noqa: E402 — see sys.path comment
+
+# Each entry: (slug, module). Module must export `build(client)` and
+# may export `assert_clean(client)` for fixture-specific invariants.
+FIXTURES = [
+ ("v3_yields_source_span", v3_yields_source_span),
+]
+
+
+async def _fresh_client(slug: str) -> LedgerClient:
+ c = LedgerClient(url="memory://", ns="bicameral_test", db=f"fixture_{slug}")
+ await c.connect()
+ return c
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+@pytest.mark.parametrize("slug,module", FIXTURES, ids=[s for s, _ in FIXTURES])
+async def test_legacy_ledger_fixture_reaches_clean_state(slug: str, module) -> None:
+ """init_schema + migrate must terminate cleanly on every fixture."""
+ c = await _fresh_client(slug)
+ try:
+ # Build the broken DB state.
+ await module.build(c)
+
+ # Run the production init/migrate path. It must not raise — that's
+ # the entire safety contract this suite enforces.
+ await init_schema(c)
+ await migrate(c, allow_destructive=True)
+
+ # Schema reached current version.
+ rows = await c.query("SELECT version FROM schema_meta LIMIT 1")
+ assert rows, f"{slug}: schema_meta empty after migrate"
+ assert rows[0]["version"] == SCHEMA_VERSION, (
+ f"{slug}: schema_meta.version = {rows[0]['version']!r}, expected {SCHEMA_VERSION}"
+ )
+
+ # Fixture-specific invariants.
+ if hasattr(module, "assert_clean"):
+ await module.assert_clean(c)
+
+ # Idempotency: a second pass changes nothing.
+ await init_schema(c)
+ await migrate(c, allow_destructive=True)
+ rows = await c.query("SELECT version FROM schema_meta LIMIT 1")
+ assert rows[0]["version"] == SCHEMA_VERSION, (
+ f"{slug}: schema_meta.version regressed on second migrate ({rows[0]['version']!r})"
+ )
+ finally:
+ await c.close()
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_fixture_registry_imports() -> None:
+ """Every fixture in ``FIXTURES`` must export an async ``build``.
+
+ Catches typos in the registry — a missing ``build`` would otherwise only
+ surface as an ``AttributeError`` inside the parametrized migration test.
+ """
+ for slug, module in FIXTURES:
+ assert hasattr(module, "build"), f"{slug} missing build()"
+ # Re-import via importlib to confirm the module is on sys.path
+ # under its registered slug (catches typos in FIXTURES).
+ importlib.import_module(slug)
diff --git a/tests/test_schema_recoverable_errors.py b/tests/test_schema_recoverable_errors.py
new file mode 100644
index 00000000..1926d791
--- /dev/null
+++ b/tests/test_schema_recoverable_errors.py
@@ -0,0 +1,109 @@
+"""SurrealDB error-format contract test (#296 Layer A).
+
+``ledger.schema._execute_define_idempotent`` swallows the substrings in
+``RECOVERABLE_DEFINE_PATTERNS``: those are the load-bearing safety
+contract that lets ``init_schema`` continue past row-state that the
+next migration is responsible for cleaning up.
+
+If a future ``surrealdb-py`` bump changes the error-string format, the
+catch silently stops working and the user sees the same crash that
+motivated #296. This test fabricates the bad-row state, provokes the
+error, and asserts at least one of the recoverable substrings matches.
+When it fails, the message tells the maintainer exactly what the new
+format looks like so the constants list can be updated explicitly.
+"""
+
+from __future__ import annotations
+
+import pytest
+
+from ledger.client import LedgerClient, LedgerError
+from ledger.schema import RECOVERABLE_DEFINE_PATTERNS
+
+
+async def _fresh_client(suffix: str) -> LedgerClient:
+ c = LedgerClient(url="memory://", ns="bicameral_test", db=f"recov_{suffix}")
+ await c.connect()
+ return c
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_define_index_unique_violation_matches_recoverable_pattern() -> None:
+ """A DEFINE INDEX UNIQUE on a table with duplicate rows must produce
+ one of the substrings in ``RECOVERABLE_DEFINE_PATTERNS`` (this is
+ what the v4→v5 cleanup relies on)."""
+ c = await _fresh_client("uniq")
+ try:
+ await c.execute("DEFINE TABLE thing SCHEMAFULL")
+ await c.execute("DEFINE FIELD k ON thing TYPE string")
+ # Two rows with the same key — UNIQUE will reject the index.
+ await c.execute("CREATE thing SET k = 'dup'")
+ await c.execute("CREATE thing SET k = 'dup'")
+ with pytest.raises(LedgerError) as exc:
+ await c.execute("DEFINE INDEX idx_thing_k ON thing FIELDS k UNIQUE")
+ msg = str(exc.value).lower()
+ matched = [p for p in RECOVERABLE_DEFINE_PATTERNS if p in msg]
+ assert matched, (
+ "SurrealDB UNIQUE-violation error string changed — the "
+ "_execute_define_idempotent catch will no longer cover it. "
+ "Update RECOVERABLE_DEFINE_PATTERNS in ledger/schema.py to "
+ f"include a substring of: {exc.value!r}"
+ )
+ finally:
+ await c.close()
+
+
+@pytest.mark.phase2
+@pytest.mark.asyncio
+async def test_define_index_overwrite_type_violation_matches_recoverable_pattern() -> None:
+ """A DEFINE INDEX OVERWRITE on a table whose existing rows violate the
+ column types referenced by the index must produce one of
+ ``RECOVERABLE_DEFINE_PATTERNS``. This is the exact failure mode that
+ motivated #296 (yields.in = source_span:* against the new
+ RELATION IN input_span constraint)."""
+ c = await _fresh_client("type")
+ try:
+ # Set up a typed RELATION table with a row that violates the type.
+ await c.execute("DEFINE TABLE source_span SCHEMAFULL")
+ await c.execute("DEFINE TABLE input_span SCHEMAFULL")
+ await c.execute("DEFINE TABLE decision SCHEMAFULL")
+ await c.execute("CREATE source_span:legacy_1 SET text = 'old'")
+ await c.execute("CREATE input_span:fresh_1 SET text = 'new'")
+ await c.execute("CREATE decision:d_1 SET description = 'd'")
+ # Permissive yields → insert the bad row → tighten the type and
+ # try to apply the unique index. The OVERWRITE re-validates.
+ await c.execute("DEFINE TABLE yields SCHEMAFULL TYPE RELATION")
+ await c.execute("RELATE source_span:legacy_1 -> yields -> decision:d_1")
+ await c.execute("RELATE input_span:fresh_1 -> yields -> decision:d_1")
+ # Tighten — this re-validates the source_span row against the new IN type.
+ await c.execute(
+ "DEFINE TABLE OVERWRITE yields SCHEMAFULL TYPE RELATION IN input_span OUT decision"
+ )
+ with pytest.raises(LedgerError) as exc:
+ await c.execute(
+ "DEFINE INDEX OVERWRITE idx_yields_unique ON yields FIELDS in, out UNIQUE"
+ )
+ msg = str(exc.value).lower()
+ matched = [p for p in RECOVERABLE_DEFINE_PATTERNS if p in msg]
+ assert matched, (
+ "SurrealDB type-mismatch error string changed — the "
+ "_execute_define_idempotent catch will no longer cover the #296 "
+ "scenario. Update RECOVERABLE_DEFINE_PATTERNS in ledger/schema.py "
+ f"to include a substring of: {exc.value!r}"
+ )
+ finally:
+ await c.close()
+
+
+@pytest.mark.phase2
+def test_recoverable_patterns_constant_is_lowercase() -> None:
+ """The catch lower-cases the SurrealDB message before substring
+ matching. Patterns must be lowercase too, or they'd silently never
+ match."""
+ for pattern in RECOVERABLE_DEFINE_PATTERNS:
+ assert pattern == pattern.lower(), (
+ f"RECOVERABLE_DEFINE_PATTERNS entry {pattern!r} contains "
+ "non-lowercase characters; the substring catch lower-cases "
+ "the SurrealDB message before matching."
+ )