diff --git a/.github/workflows/test-mcp-regression.yml b/.github/workflows/test-mcp-regression.yml index e47641cb..2ed25bda 100644 --- a/.github/workflows/test-mcp-regression.yml +++ b/.github/workflows/test-mcp-regression.yml @@ -77,6 +77,8 @@ jobs: tests/test_phase1_code_locator.py tests/test_phase2_ledger.py tests/test_phase3_integration.py + tests/test_legacy_ledger_fixtures.py + tests/test_schema_recoverable_errors.py -v --tb=short --junitxml=test-results/results.xml --html=test-results/report.html --self-contained-html diff --git a/CHANGELOG.md b/CHANGELOG.md index 8793e087..99287bb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,26 @@ All notable changes to bicameral-mcp are tracked here. Format loosely follows - **#243 Piece B — Eager symbol-index initialization at server startup, fail-loud on empty index.** Pre-fix, `get_code_locator()` returned a fresh `RealCodeLocatorAdapter` per call AND lazy-initialized the symbol index on first `_ensure_initialized()` call — so the FIRST tool call after server boot paid the index-build cost AND could race the index check on concurrent dispatch. Post-fix, `get_code_locator()` is **singleton-by-`REPO_PATH`** (multi-repo correctness preserved) with a new `reset_code_locator_cache()` test-only hook. New `async def RealCodeLocatorAdapter.initialize()` wraps `_ensure_initialized()` in a thread-pool executor for the `serve_stdio()` startup path; idempotent on already-initialized adapters. `serve_stdio()` calls `await get_code_locator().initialize()` between the dashboard sidecar start and the consent-notice block — **fail-loud**: an empty/missing index now propagates `RuntimeError("Code locator index is empty...")` and aborts boot with a clear stderr message ("Run: python -m code_locator index ") rather than silently degrading every preflight call. Lazy init via `_ensure_initialized` remains for test contexts where mock adapters bypass startup. 
4 new tests in `tests/test_preflight_graph_expansion.py` cover singleton-by-`REPO_PATH` (with reset), `initialize()` idempotency, `RuntimeError` propagation, and the `serve_stdio` boot-refusal path. Together with Piece A: production deployments can no longer accumulate silent-fallback hours; either the index is healthy at boot OR the operator gets a loud failure they can act on. Refs #243 (parent #173 / PR #174). Plan signoff via https://github.com/BicameralAI/bicameral-mcp/issues/243#issuecomment-4414163338. +## v0.14.5 — bicameral_diagnose tool + reset --replay-from-events + README polish (triage) + +Triages the #296 ledger-resilience track and the #299 README pass onto main. New `bicameral_diagnose` MCP tool gives operators a privacy-preserving structural read of their ledger (recovery_path classification + table counts + recent audit events) without crashing on init when the ledger is corrupt — pairs with `bicameral_reset --replay-from-events` for the data-loss-avoiding recovery path. README opens with a position-take pitch + three-scene demo video drop-in (ingest → preflight → ratify async). + +### Added + +- **`bicameral_diagnose` MCP tool (#296).** Read-only structural diagnosis that opens a raw `LedgerClient` (no `init_schema` / `migrate`) so it works even when the normal adapter connect crashes during init. Returns `recovery_path` classification (`clean` / `fixable` / `reset_rebuild` / `reset_destructive`), `schema_meta` state, table counts, recent `warn`|`error` audit events, and a `next_action` recommendation. Slash alias `/bicameral-diagnose`. New skill at `skills/bicameral-diagnose/SKILL.md`. Privacy-preserving — only structural shape leaves the machine, never decision content. +- **`bicameral_reset --replay-from-events` flag (#296).** Rebuild the ledger from the team-mode JSONL event substrate instead of nuking decision rows. Use when the binary store is corrupt but the event log is intact — recovers without data loss in team mode. 
+- **Legacy-ledger fixtures + replay tests (#296).** New `tests/fixtures/legacy_ledgers/` holds reproducible byte-level corruption fixtures (e.g. `v3_yields_source_span.py` for the v16→v17 yields integrity bug). `tests/test_legacy_ledger_fixtures.py` and `tests/test_schema_recoverable_errors.py` exercise the recovery paths against these fixtures. + +### Fixed + +- **`ledger/schema.py`: resilient init + v16→v17 yields integrity cleanup (#296).** v0.14.4 ledgers with stale `source_span:...` records on the `yields.in` field rejected `DEFINE INDEX OVERWRITE idx_yields_unique` on every connect, blocking `ingest` / `history` / `preflight` until manual reset. The migration now sweeps these stale rows during the v16→v17 step and the init path tolerates the recovery without aborting. Pairs with `bicameral_diagnose` for operator visibility into whether the migration ran. + +### Documentation + +- **README opener rewrite (#299).** Two-paragraph position-take: paragraph 1 names the failure mode ("requirement gaps surfaced mid-implementation are buried under thousands of lines of code"); paragraph 2 introduces Bicameral MCP as a **spec compliance layer** for AI-assisted engineering that ingests transcripts / PRDs / Slack threads, captures any mid-implementation decision that was not discussed (to be ratified async by the product owner), and pins each one to the implementing code. +- **README demo video section (#299).** Replaces the dashboard image transition with a three-beat demo loop — ingest (PM/dev) → preflight (auto) → ratify async (product owner) — each as an inline `user-attachments` video drop-in so the videos render on github.com without asset-path coupling. +- **README star CTA relocation (#299).** Moved from the top header (where it sat awkwardly between the hero image and the logo) to a centered placement immediately after the demo videos — natural post-demo conversion beat. 
+ ## v0.14.4 — Hotfix: skip install-time manifest verification until release-side sigstore wiring lands Hotfix on v0.14.3. Unblocks every fresh `bicameral-mcp setup` against the published wheel — the v0.14.x wheels ship `skills-manifest.toml` and `hooks-manifest.json` but no `.sig`/`.crt` companions yet (release-side sigstore signing is a deferred follow-up of #218 LLM-06 / #237 LLM-11). The install-time verifiers were hitting a missing-signature path their own docstrings claim is unreachable, raising `SignatureError` and aborting setup. diff --git a/CLAUDE.md b/CLAUDE.md index fb45915b..6aee734a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -20,6 +20,27 @@ worse than a compile error because it fails at runtime in production sessions. - [ ] Did a new tool get added? → Create `skills//SKILL.md` - [ ] Did a status literal gain a new value (e.g. `"proposal"`)? → Update every skill that renders status +## Sociable Testing for UX Paths (Mandatory for Handlers + Ledger) + +Default to **sociable unit tests** ([Martin Fowler, "On the Diverse And Fantastical Shapes of Testing"](https://martinfowler.com/articles/2021-test-shapes.html)) for anything the MCP agent actually invokes: handlers under `handlers/`, ledger queries in `ledger/`, and the contracts they return. A test is **solitary** when it replaces a collaborator we ship to users (the `ctx`, the `ledger`, a handler in the call graph) with a `MagicMock` / `AsyncMock` / `patch(...)`; it's **sociable** when it runs the real collaborator and only seams off something we genuinely can't run in tests (network, time, external SaaS, an injected failure mode like "symbol disappears"). + +The motivation is concrete: AI-authored tests skew solitary because mocks are easy to make pass. 
A solitary test for `get_session_start_banner` stayed green for months while `get_decisions_by_status` was selecting an undefined `decision_id` field and returning `None` for every banner row — agents saw null IDs in production while the suite reported full coverage. The first sociable run caught it. + +**Rules** + +1. **Handler tests** (`tests/test_*.py`) — instantiate a real `SurrealDBLedgerAdapter` over `memory://` and seed rows with the production schema. Reference pattern: `tests/test_codegenome_continuity_service.py::_fresh_adapter` and `tests/test_sync_middleware.py::_make_real_adapter`. +2. **Ledger query tests** — never `MagicMock` the client. Use the real `LedgerClient(url="memory://", ...)` + `init_schema` + `migrate`. +3. **`ctx` should be `SimpleNamespace`, not `MagicMock`** — when a handler grows a new required field, `SimpleNamespace` raises `AttributeError` and the test fails honestly; `MagicMock` silently invents the field. +4. **Narrow seams are fine** when the alternative is impossible or fragile: patching `ledger.status.resolve_symbol_lines` to simulate a missing symbol (`tests/test_link_commit_grounding.py:185`), patching `handle_link_commit` when testing the *caller's* cache logic (not link_commit itself), patching `time.monotonic` for TTL math. +5. **Solitary is correct for** pure helpers (`_check_payload_size` standalone), external boundaries we can't run (`tests/test_backends_google_drive_unit.py`), and concurrency primitives that don't talk to collaborators (`repo_write_barrier` tests). + +**Checklist before opening a tests-only PR** + +- [ ] Does the test instantiate `MagicMock` for `ctx` or `ledger`? → Replace with `SimpleNamespace` + real adapter unless one of the "solitary is correct" exceptions applies. +- [ ] Does the test hand-craft a row dict that mimics what the ledger returns? → Seed the real ledger and let it produce the row. +- [ ] Does an `assert_called_once_with()` mirror the production code? → That's a tautology. 
Replace it with an assertion on observable behavior (what the user/agent sees). +- [ ] Does the failure mode under test (e.g. symbol disappeared, ledger crashed) actually require a patch? → Yes is fine; pin the patch to the narrowest seam. + ## Auto-Tick Rule After completing **any** implementation work in this directory: diff --git a/README.md b/README.md index 4657d7c8..67ebfc86 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) [![CI](https://img.shields.io/github/actions/workflow/status/BicameralAI/bicameral-mcp/test-mcp-regression.yml?branch=main&label=tests)](https://github.com/BicameralAI/bicameral-mcp/actions) -AI agents ship code fast. They forget what your team agreed — and most agreements emerge mid-flight, in corrections and side comments that never reach a doc. +AI agents ship code fast. They forget what your team agreed — and requirement gaps surfaced mid-implementation are buried under thousands of lines of code. Bicameral MCP is a **spec compliance layer** for AI-assisted engineering. Local-first; runs as an [MCP server](https://spec.modelcontextprotocol.io/). It ingests your meeting transcripts, PRDs, and Slack threads, captures any mid-implementation decision that was not discussed, to be ratified async by your product owner, and pins each one to the code that implements it — so your agent finds out the moment it drifts from either the written spec or the spoken one. 
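A minimal sketch of rule 3 from the CLAUDE.md hunk above (`ctx` should be `SimpleNamespace`, not `MagicMock`). The handler `render_banner` and the `session_id` field are hypothetical, invented for illustration and not part of this repo; only `SimpleNamespace` and `MagicMock` are real (Python standard library). It shows why a stale `MagicMock` context lets a handler "succeed" after it starts reading a new field, while a stale `SimpleNamespace` context fails with `AttributeError`.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock


def render_banner(ctx):
    # Hypothetical handler: it recently started reading `session_id`,
    # a field that older test contexts were never updated to provide.
    return (ctx.repo_path, ctx.session_id)


def ctx_hides_missing_field(ctx) -> bool:
    """Return True when the handler runs to completion even though the
    context was built without `session_id`."""
    try:
        render_banner(ctx)
    except AttributeError:
        return False
    return True


# Both contexts predate the new field and only set `repo_path`.
stale_namespace_ctx = SimpleNamespace(repo_path="/repo")
stale_mock_ctx = MagicMock(repo_path="/repo")

# MagicMock fabricates `session_id` on first access, so the gap stays hidden.
mock_hides_gap = ctx_hides_missing_field(stale_mock_ctx)
# SimpleNamespace has no such attribute, so the handler raises AttributeError.
namespace_hides_gap = ctx_hides_missing_field(stale_namespace_ctx)
```

Under these assumptions `mock_hides_gap` is true and `namespace_hides_gap` is false: the `SimpleNamespace` context surfaces the missing field as a test failure, which is the honest signal the rule asks for.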
diff --git a/RECOMMENDED_VERSION b/RECOMMENDED_VERSION index 3393b5fd..436d0ce0 100644 --- a/RECOMMENDED_VERSION +++ b/RECOMMENDED_VERSION @@ -1 +1 @@ -0.14.4 +0.14.5 diff --git a/audit_log.py b/audit_log.py index a7bcc7bc..e062b746 100644 --- a/audit_log.py +++ b/audit_log.py @@ -71,6 +71,8 @@ class AuditEventType(enum.StrEnum): # #252 Layer 2 — wire-format sentinel observability LEDGER_SCHEMA_VERIFIED = "ledger_schema_verified" LEDGER_VERSION_DRIFT = "ledger_version_drift" + # #296 — recoverable schema-definition skip (init_schema deferring to migrate) + SCHEMA_DEFINE_SKIPPED = "schema_define_skipped" _LEVEL_BY_EVENT: dict[AuditEventType, str] = { @@ -84,6 +86,7 @@ class AuditEventType(enum.StrEnum): AuditEventType.ERROR: "error", AuditEventType.LEDGER_SCHEMA_VERIFIED: "info", AuditEventType.LEDGER_VERSION_DRIFT: "warn", + AuditEventType.SCHEMA_DEFINE_SKIPPED: "warn", } _LEVEL_RANK = {"info": 10, "warn": 20, "error": 30} diff --git a/cli/_diagnose_gather.py b/cli/_diagnose_gather.py index ba11620b..1854bf05 100644 --- a/cli/_diagnose_gather.py +++ b/cli/_diagnose_gather.py @@ -23,9 +23,8 @@ _RECENT_EVENT_TAIL = 5 -def _read_ledger_metadata(adapter) -> tuple[str, int | None, str | None]: +def _read_ledger_metadata_for_url(url: str) -> tuple[str, int | None, str | None]: """Return (ledger_url, size_bytes_or_None, mtime_iso_or_None).""" - url = getattr(adapter, "_url", "") if not url.startswith("surrealkv://"): return url, None, None path_str = url.removeprefix("surrealkv://") @@ -37,21 +36,23 @@ def _read_ledger_metadata(adapter) -> tuple[str, int | None, str | None]: return url, stat.st_size, mtime_iso -async def _read_bicameral_meta( - adapter, -) -> tuple[str | None, str | None, str | None, str, str]: - """Return (first_write, last_write, last_write_at_iso, drift_status, running). 
+def _read_ledger_metadata(adapter) -> tuple[str, int | None, str | None]: + return _read_ledger_metadata_for_url(getattr(adapter, "_url", "")) - ``drift_status`` is one of: ``"first-write"`` / ``"match"`` / ``"drift"`` / - ``"unavailable"`` (table missing, e.g., pre-Layer-2 ledger). - """ + +async def _read_bicameral_meta_raw( + client, +) -> tuple[str | None, str | None, str | None, str, str]: + """Same shape as ``_read_bicameral_meta`` but operates on a raw + ``LedgerClient``. Used by the MCP ``bicameral_diagnose`` tool, which + must work without ``init_schema``/``migrate`` succeeding.""" try: running = importlib.metadata.version("surrealdb") except importlib.metadata.PackageNotFoundError: running = "unknown" try: - rows = await adapter._client.query("SELECT * FROM bicameral_meta LIMIT 1") + rows = await client.query("SELECT * FROM bicameral_meta LIMIT 1") except Exception: # noqa: BLE001 — table missing is the load-bearing case return None, None, None, "unavailable", running @@ -71,9 +72,20 @@ async def _read_bicameral_meta( return first, last, last_at_iso, "drift", running -async def _read_schema_version(adapter) -> int | None: +async def _read_bicameral_meta( + adapter, +) -> tuple[str | None, str | None, str | None, str, str]: + """Return (first_write, last_write, last_write_at_iso, drift_status, running). + + ``drift_status`` is one of: ``"first-write"`` / ``"match"`` / ``"drift"`` / + ``"unavailable"`` (table missing, e.g., pre-Layer-2 ledger). 
+ """ + return await _read_bicameral_meta_raw(adapter._client) + + +async def _read_schema_version_raw(client) -> int | None: try: - rows = await adapter._client.query("SELECT version FROM schema_meta LIMIT 1") + rows = await client.query("SELECT version FROM schema_meta LIMIT 1") except Exception: # noqa: BLE001 return None if not rows: @@ -82,11 +94,15 @@ async def _read_schema_version(adapter) -> int | None: return int(val) if val is not None else None -async def _read_table_counts(adapter) -> dict[str, int]: +async def _read_schema_version(adapter) -> int | None: + return await _read_schema_version_raw(adapter._client) + + +async def _read_table_counts_raw(client) -> dict[str, int]: counts: dict[str, int] = {} for table in _CANONICAL_TABLES: try: - rows = await adapter._client.query(f"SELECT count() AS n FROM {table} GROUP ALL") + rows = await client.query(f"SELECT count() AS n FROM {table} GROUP ALL") except Exception: # noqa: BLE001 — missing table is acceptable (pre-v16) continue if rows: @@ -95,6 +111,10 @@ async def _read_table_counts(adapter) -> dict[str, int]: return counts +async def _read_table_counts(adapter) -> dict[str, int]: + return await _read_table_counts_raw(adapter._client) + + def _resolve_audit_log_channel() -> tuple[str, Path | None]: """Return (channel_label, configured_file_path_or_None).""" raw = os.getenv("BICAMERAL_AUDIT_LOG", "stderr").strip() @@ -195,8 +215,17 @@ def _fetch_recommended() -> str | None: return None -async def gather_diagnosis(adapter) -> Diagnosis: - """Collect every allowlisted field from the running install + ledger.""" +async def gather_diagnosis_raw(client, ledger_url: str) -> Diagnosis: + """Same allowlisted gather as ``gather_diagnosis`` but takes a raw + ``LedgerClient`` and an explicit ``ledger_url``. + + Used by the MCP ``bicameral_diagnose`` tool, which opens a raw client + so it can produce a report even when ``adapter.connect()`` (and its + init_schema / migrate calls) would crash on a corrupted ledger. 
The + CLI ``bicameral-mcp diagnose`` keeps using ``gather_diagnosis`` + (adapter-based) because it benefits from the adapter's connection + lifecycle in the happy-path operator-bug-report flow. + """ try: bicameral_version = importlib.metadata.version("bicameral-mcp") except importlib.metadata.PackageNotFoundError: @@ -204,10 +233,10 @@ async def gather_diagnosis(adapter) -> Diagnosis: from ledger.schema import SCHEMA_VERSION - ledger_url, size_bytes, mtime_iso = _read_ledger_metadata(adapter) - first, last, last_at_iso, drift_status, running = await _read_bicameral_meta(adapter) - schema_recorded = await _read_schema_version(adapter) - table_counts = await _read_table_counts(adapter) + _, size_bytes, mtime_iso = _read_ledger_metadata_for_url(ledger_url) + first, last, last_at_iso, drift_status, running = await _read_bicameral_meta_raw(client) + schema_recorded = await _read_schema_version_raw(client) + table_counts = await _read_table_counts_raw(client) channel_label, audit_path = _resolve_audit_log_channel() recent_events = _tail_recent_events(audit_path, _RECENT_EVENT_TAIL) @@ -242,3 +271,12 @@ async def gather_diagnosis(adapter) -> Diagnosis: recent_events=recent_events, suggestions=suggestions, ) + + +async def gather_diagnosis(adapter) -> Diagnosis: + """Adapter-flavoured wrapper over ``gather_diagnosis_raw``. + + Reads the ledger URL off the adapter and forwards to the raw helper. + Existing CLI callers (`bicameral-mcp diagnose`) keep this entry point. + """ + return await gather_diagnosis_raw(adapter._client, getattr(adapter, "_url", "")) diff --git a/contracts.py b/contracts.py index 0efefa53..2245c88b 100644 --- a/contracts.py +++ b/contracts.py @@ -650,6 +650,43 @@ class ResetResponse(BaseModel): replay_plan: list[ResetReplayEntry] = [] replay_errors: list[str] = [] next_action: str + # #296 Layer E — automated rebuild from .bicameral/events/*.jsonl + # after wipe. 
Populated only when `replay_from_events=True`: the replayed + # count on `confirm=True`, the on-disk event count on a dry run. + events_replayed: int = 0 + + +# ── Tool 8 (new): /bicameral_diagnose ──────────────────────────────── + + +class DiagnoseResponse(BaseModel): + """Read-only diagnostic snapshot. Mirrors the CLI ``bicameral-mcp + diagnose`` output but returns structured fields so agents can render + a recovery prompt deterministically. + + `recovery_path` classifies the next operator action: + - ``clean`` — ledger looks healthy, no remediation needed + - ``fixable`` — schema is behind binary; next normal call migrates + - ``reset_rebuild`` — ledger broken AND events present → reset + with `replay_from_events=True` recovers without data loss + - ``reset_destructive`` — ledger broken AND no events → reset + loses decision history; user must explicitly accept + + `diagnosis` carries the same structural-metadata-only fields the + CLI emits (see ``cli.diagnose.Diagnosis``); ``None`` when the raw + client could not connect. + """ + + ledger_url: str + connect_error: str = "" + recovery_path: Literal[ + "clean", + "fixable", + "reset_rebuild", + "reset_destructive", + ] + diagnosis: dict | None = None + next_action: str # ── Tool 9: /bicameral_preflight ───────────────────────────────────── diff --git a/handlers/diagnose.py b/handlers/diagnose.py new file mode 100644 index 00000000..5099d477 --- /dev/null +++ b/handlers/diagnose.py @@ -0,0 +1,166 @@ +"""Handler for /bicameral_diagnose MCP tool. + +Read-only structural diagnostic. Mirrors the ``bicameral-mcp diagnose`` +CLI but exposed as an MCP tool so agents can call it from any +tool-error envelope. + +Critical property: this handler MUST work even when ``adapter.connect()`` +crashes inside ``init_schema``/``migrate``. 
It opens a raw +``LedgerClient`` directly and calls ``gather_diagnosis_raw``, which +reads tables defensively (missing tables and SELECT failures are +treated as "unavailable" not propagated as exceptions). That's the +whole point — when the ledger is broken, the agent needs a tool that +still answers "what's wrong?" + +Repair is a deliberate CLI operation (``bicameral-mcp diagnose +--repair``), never an in-session agent action. This tool is +intentionally read-only. +""" + +from __future__ import annotations + +import logging +import os +from typing import Literal + +from contracts import DiagnoseResponse + +RecoveryPath = Literal["clean", "fixable", "reset_rebuild", "reset_destructive"] + +logger = logging.getLogger(__name__) + + +def _resolve_ledger_url(ctx) -> str: + """Pick the ledger URL from the same source ``adapter.connect()`` does. + + Order: ``ctx.ledger._url`` (if connected), then ``SURREAL_URL`` env, + then the embedded default. Mirrors ``SurrealDBLedgerAdapter.__init__``. + """ + ledger = getattr(ctx, "ledger", None) + inner = getattr(ledger, "_inner", ledger) if ledger is not None else None + for obj in (ledger, inner): + if obj is None: + continue + url = getattr(obj, "_url", None) + if url: + return str(url) + + from ledger.adapter import _default_db_url + + return os.environ.get("SURREAL_URL", _default_db_url()) + + +async def handle_diagnose(ctx) -> DiagnoseResponse: + """Probe the ledger read-only and return a structural diagnosis. + + Always opens its own raw ``LedgerClient`` rather than reusing the + adapter's client — the adapter may be in a partially-connected + state, and going through ``adapter._ensure_connected`` would re-run + the migration that's already failing. 
+ """ + from cli._diagnose_gather import gather_diagnosis_raw + from ledger.client import LedgerClient + + ledger_url = _resolve_ledger_url(ctx) + + client = LedgerClient(url=ledger_url) + try: + await client.connect() + except Exception as exc: # noqa: BLE001 — operator needs the failure context + logger.warning("[diagnose] raw connect failed for %s: %s", ledger_url, exc) + return DiagnoseResponse( + ledger_url=ledger_url, + connect_error=f"{type(exc).__name__}: {exc}", + recovery_path="reset_destructive", + diagnosis=None, + next_action=( + "Raw client could not connect to the ledger URL. The DB file " + "may be missing, locked, or unreadable. Inspect the path, then " + "run `bicameral-mcp reset --confirm` to reinitialise. If the " + "directory contains team-mode events under .bicameral/events/, " + "use `bicameral_reset(replay_from_events=True, confirm=True)` " + "to rebuild from the substrate after wipe." + ), + ) + + try: + diagnosis = await gather_diagnosis_raw(client, ledger_url) + finally: + try: + await client.close() + except Exception: # noqa: BLE001 — close is best-effort + pass + + recovery_path, next_action = _classify_recovery(diagnosis) + return DiagnoseResponse( + ledger_url=ledger_url, + connect_error="", + recovery_path=recovery_path, + diagnosis=diagnosis.__dict__, # Diagnosis is a frozen dataclass + next_action=next_action, + ) + + +def _classify_recovery(diagnosis) -> tuple[RecoveryPath, str]: + """Translate the raw diagnosis into one of four recovery paths. + + - ``clean``: schema_recorded == schema_expected and no flagged drift. + - ``fixable``: schema_recorded < schema_expected — `migrate` will + catch up on next normal connect. + - ``reset_rebuild``: ledger is past the binary's schema OR the + diagnose surfaces stale-row warnings; .bicameral/events/ is + present, so reset+replay recovers without data loss. + - ``reset_destructive``: same condition as reset_rebuild but no + events on disk → user must accept data loss. 
+ + The classification is deliberately conservative — anything weird + routes to the operator with a clear next_action, not an automated + fix. + """ + rec = diagnosis.schema_version_recorded + exp = diagnosis.schema_version_expected + has_events = _events_present(diagnosis.ledger_url) + + if rec is not None and rec > exp: + path: RecoveryPath = "reset_rebuild" if has_events else "reset_destructive" + return path, ( + f"Ledger schema v{rec} is newer than this binary (v{exp}). " + f"Upgrade `bicameral-mcp` to a version that understands v{rec}, " + f"or run `bicameral_reset(replay_from_events={has_events}, confirm=True)`." + ) + + if rec is not None and rec < exp: + return "fixable", ( + f"Ledger schema v{rec} is behind binary v{exp}. The next normal " + f"connect will run pending migrations. If connect is failing, " + f"the cleanup migration may need a re-run — `bicameral-mcp diagnose --repair`." + ) + + if rec is None: + return "clean", ( + "Schema version not yet recorded — likely a fresh install. " + "Any tool call will initialise the ledger." + ) + + # rec == exp — confirm the table counts look sane + table_counts = diagnosis.table_counts or {} + if not table_counts: + return "fixable", ( + "Schema version matches but no tables visible. " + "Connect may have stopped mid-init; re-run a tool call to retry." + ) + + return "clean", (f"Ledger is at expected schema v{exp}. No remediation needed.") + + +def _events_present(ledger_url: str) -> bool: + """Best-effort check for ``.bicameral/events/*.jsonl``.""" + if not ledger_url.startswith("surrealkv://"): + return False + from pathlib import Path + + db_path = Path(ledger_url.removeprefix("surrealkv://")) + events_dir = db_path.parent / "events" + if not events_dir.exists(): + return False + return any(events_dir.glob("*.jsonl")) diff --git a/handlers/reset.py b/handlers/reset.py index 2814ddb1..a05da9ee 100644 --- a/handlers/reset.py +++ b/handlers/reset.py @@ -1,6 +1,6 @@ """Handler for /bicameral_reset MCP tool. 
-The fail-safe valve. Two modes: +The fail-safe valve. Two wipe modes plus an optional replay path. wipe_mode="ledger" (default) Wipes the materialized SurrealDB rows scoped to the current repo. @@ -14,10 +14,19 @@ Use this for: nuclear restart, switching repos, credential rotation. The user must explicitly confirm after seeing the warning. + replay_from_events=True (#296 Layer E) + After a successful ``wipe_mode="ledger"`` wipe, the watermark is + reset and ``EventMaterializer.replay_new_events`` rebuilds the + local DB from ``.bicameral/events/.jsonl``. The event log + is the canonical record (committed to git in team mode) — replay + is recovery, not destruction. Combining it with ``wipe_mode="full"`` + is rejected because full-wipe deletes the very events we'd replay. + Safety design: - Dry run by default. confirm=False returns the plan without touching state. - Replay plan is always computed before any destructive operation. - Full mode surfaces the exact path that will be deleted in the dry run. + - replay_from_events surfaces the on-disk event count in the dry run. """ from __future__ import annotations @@ -35,6 +44,7 @@ async def handle_reset( replay: bool = True, confirm: bool = False, wipe_mode: str = "ledger", + replay_from_events: bool = False, ) -> ResetResponse: """Wipe the ledger (and optionally the full .bicameral/ dir) for ctx.repo_path. @@ -44,7 +54,32 @@ async def handle_reset( confirm: False = dry run (default). True = execute. wipe_mode: "ledger" = wipe DB rows only (server stays live). "full" = delete the entire .bicameral/ directory. + replay_from_events: After wipe (only when wipe_mode="ledger"), + replay every event in .bicameral/events/*.jsonl through the + ingest path to recover decisions deterministically. Mutually + exclusive with wipe_mode="full" — that mode deletes the + substrate we'd replay from. 
""" + if replay_from_events and wipe_mode == "full": + return ResetResponse( + wiped=False, + wipe_mode=wipe_mode, + ledger_url=_resolve_ledger_url(ctx, ctx.ledger), + bicameral_dir="", + repo=ctx.repo_path, + cursors_before=0, + replay_plan=[], + replay_errors=[ + "replay_from_events is incompatible with wipe_mode='full' " + "(full wipe deletes .bicameral/events which is the replay source)" + ], + next_action=( + "Pick one: wipe_mode='ledger' + replay_from_events=True for " + "recovery from a corrupted ledger, OR wipe_mode='full' for a " + "nuclear restart that intentionally drops everything." + ), + ) + ledger = ctx.ledger if hasattr(ledger, "connect"): await ledger.connect() @@ -70,6 +105,8 @@ async def handle_reset( ledger_url = _resolve_ledger_url(ctx, ledger) bicameral_dir = _resolve_bicameral_dir(ledger) if wipe_mode == "full" else "" + events_on_disk = _count_events_on_disk(ledger) if replay_from_events else 0 + if not confirm: if wipe_mode == "full": dir_desc = ( @@ -83,6 +120,16 @@ async def handle_reset( f"WARNING: this removes config.yaml, team event files, and all history — " f"there is no undo. Re-run with confirm=True to execute." ) + elif replay_from_events: + next_action = ( + f"DRY RUN — LEDGER WIPE + REBUILD. Would wipe {cursors_before} " + f"source_cursor row(s), every bicameral node/edge scoped to " + f"{ctx.repo_path!r}, then reset the watermark and replay " + f"{events_on_disk} event(s) from .bicameral/events/*.jsonl through " + f"the ingest path. The event log is the canonical record — replay " + f"recovers decisions deterministically. " + f"Re-run with confirm=True to execute." + ) else: next_action = ( f"Dry run only. 
Would wipe {cursors_before} source_cursor row(s) " @@ -97,6 +144,7 @@ async def handle_reset( repo=ctx.repo_path, cursors_before=cursors_before, replay_plan=replay_plan if replay else [], + events_replayed=events_on_disk if replay_from_events else 0, next_action=next_action, ) @@ -138,6 +186,15 @@ async def handle_reset( bicameral_dir, ) + events_replayed = 0 + replay_errors: list[str] = [] + if replay_from_events and wipe_mode != "full": + try: + events_replayed = await _replay_events_into_ledger(ledger) + except Exception as exc: # noqa: BLE001 — surface failure but keep wipe done + logger.exception("[reset] replay_from_events failed: %s", exc) + replay_errors.append(f"replay_from_events failed: {exc}") + if wipe_mode == "full": next_action = ( f"Full wipe complete for repo {ctx.repo_path!r}. " @@ -146,6 +203,20 @@ async def handle_reset( f"Schema has been reinitialised — the server is ready for fresh ingestion. " f"Re-run the original bicameral_ingest calls for each entry in replay_plan." ) + elif replay_from_events: + if replay_errors: + next_action = ( + f"Ledger wiped for repo {ctx.repo_path!r}. Replay FAILED — see " + f"replay_errors. The wipe succeeded; the event substrate is intact. " + f"Re-run with confirm=True after addressing the replay error, or " + f"fall back to manual ingest using replay_plan." + ) + else: + next_action = ( + f"Ledger wiped and rebuilt from events for repo {ctx.repo_path!r}. " + f"{events_replayed} event(s) replayed through the ingest path. " + f"Verify with `bicameral_diagnose` or `bicameral_history`." + ) else: next_action = ( f"Ledger wiped for repo {ctx.repo_path!r}. 
" @@ -162,6 +233,8 @@ async def handle_reset( repo=ctx.repo_path, cursors_before=cursors_before, replay_plan=replay_plan if replay else [], + replay_errors=replay_errors, + events_replayed=events_replayed, next_action=next_action, ) @@ -169,6 +242,77 @@ async def handle_reset( # ── Wipe implementations ───────────────────────────────────────────── +def _resolve_events_dir(ledger) -> Path | None: + """Return ``.bicameral/events/`` for the resolved ledger URL, or None + when the ledger is in-memory or the directory does not exist. + + Honours ``BICAMERAL_DATA_PATH`` symmetrically with + ``adapters/ledger.py:_real_ledger_instance`` so `replay_from_events` + targets the same substrate the team-mode write path uses. + """ + import os as _os + + bicameral_dir = _resolve_bicameral_dir(ledger) + data_path = _os.environ.get("BICAMERAL_DATA_PATH") + if data_path: + events_dir = Path(data_path) / ".bicameral" / "events" + elif bicameral_dir: + events_dir = Path(bicameral_dir) / "events" + else: + return None + return events_dir if events_dir.exists() else None + + +def _count_events_on_disk(ledger) -> int: + """Tally non-empty lines across every ``.jsonl`` under events/. + + Best-effort: returns 0 when the directory is missing or the ledger is + in-memory, and skips any file that fails to read. Used only to surface + a count in the dry-run; not load-bearing for the replay itself. + """ + events_dir = _resolve_events_dir(ledger) + if events_dir is None: + return 0 + total = 0 + for path in events_dir.glob("*.jsonl"): + try: + with open(path, "rb") as f: + for line in f: + if line.strip(): + total += 1 + except OSError: + continue + return total + + +async def _replay_events_into_ledger(ledger) -> int: + """Reset the materializer watermark and replay every event back + through the ingest path. Returns the count of events the + materializer applied. + + Uses the same ``EventMaterializer`` class team mode uses, so + replay-vs-live divergence is impossible by construction. 
Determinism + is tracked separately under issue #296. + """ + inner = getattr(ledger, "_inner", ledger) + events_dir = _resolve_events_dir(ledger) + if events_dir is None: + return 0 + + local_dir = events_dir.parent / "local" + watermark_path = local_dir / "watermark" + local_dir.mkdir(parents=True, exist_ok=True) + # Reset the watermark so every event replays from offset 0. The + # materializer's offset map is `{author: byte_offset}`; writing an + # empty object is the canonical "start over" signal. + watermark_path.write_text("{}\n", encoding="utf-8") + + from events.materializer import EventMaterializer + + materializer = EventMaterializer(events_dir, local_dir) + return await materializer.replay_new_events(inner) + + async def _wipe_ledger(ledger, repo_path: str) -> None: """Wipe DB rows only. Delegates to adapter method or falls back to direct delete.""" if hasattr(ledger, "wipe_all_rows"): diff --git a/ledger/adapter.py b/ledger/adapter.py index 2a152a06..ff514fbe 100644 --- a/ledger/adapter.py +++ b/ledger/adapter.py @@ -580,8 +580,13 @@ async def get_decisions_by_status(self, statuses: list[str]) -> list[dict]: return [] await self._ensure_connected() conditions = " OR ".join(f"status = '{s}'" for s in statuses) + # `decision_id` is not a stored field on the decision table; alias the + # Surreal record id into it (matches queries.py:167, 228, 404, 512 et al). + # Without the alias every banner row arrives with decision_id=None, + # which makes the items the agent sees unactionable. 
query = ( - f"SELECT decision_id, description, status, source_ref, meeting_date, signoff " + f"SELECT type::string(id) AS decision_id, description, status, " + f"source_ref, meeting_date, signoff " f"FROM decision WHERE {conditions} LIMIT 50" ) result = await self._client.query(query) diff --git a/ledger/schema.py b/ledger/schema.py index fce5e0da..94b3d552 100644 --- a/ledger/schema.py +++ b/ledger/schema.py @@ -28,7 +28,7 @@ # - edges: yields(input_span→decision), binds_to(decision→code_region), # locates(symbol→code_region) # - removed: maps_to, implements -SCHEMA_VERSION = 16 +SCHEMA_VERSION = 17 # Maps schema version → minimum bicameral-mcp code version that understands it. # Used to produce actionable "upgrade your binary" messages. @@ -45,8 +45,22 @@ 14: "0.13.0", # placeholder; release-eng pins final value at PR merge — Phase 4 (#61) 15: "0.15.x", # decision.governance (#109 — governance metadata) 16: "0.13.x", # #252 Layer 2 — wire-format sentinel via bicameral_meta table; placeholder, release-eng pins final value at PR merge + 17: "0.14.x", # re-runnable yields integrity cleanup; placeholder, release-eng pins final value at PR merge } +# SurrealDB error substrings that init_schema treats as recoverable: the row +# state predates the new constraint and the next migration is responsible for +# cleaning it up. The substring set is the load-bearing safety contract — any +# new pattern here must be paired with a fixture-DB regression test +# (tests/test_schema_recoverable_errors.py) that produces the exact string, +# so a future surrealdb-py bump that changes the format fails CI loudly. +RECOVERABLE_DEFINE_PATTERNS: tuple[str, ...] = ( + "already exists", # DEFINE re-applied with no change + "already contains", # UNIQUE index attempted on table w/ duplicates + "expected a record<", # TYPE constraint mismatch (rename of IN/OUT type) + "but expected", # generic value-type assertion failure (defensive) +) + # Migrations that drop or recreate tables/data. 
These are never auto-applied; # the user must explicitly confirm via bicameral_reset(confirm=True). DESTRUCTIVE_MIGRATIONS: frozenset[int] = frozenset() @@ -423,24 +437,53 @@ def _with_overwrite(sql: str) -> str: async def _execute_define_idempotent(client: LedgerClient, sql: str) -> None: - """Run a DEFINE statement; treat "already exists" / "already contains" as success. - - "already contains" is SurrealDB's error when a UNIQUE index is attempted on - a table that already has duplicate rows. This lets the server start so the - migration that cleans stale data can run; the migration re-issues the index. + """Run a DEFINE statement; treat known recoverable errors as success. + + Recoverable patterns are listed in ``RECOVERABLE_DEFINE_PATTERNS``: + + - ``already exists`` — DEFINE re-applied; no-op. + - ``already contains`` — UNIQUE index against duplicates; the migration + that cleans stale data runs next and re-issues the index. + - ``expected a record<`` — DEFINE TABLE / INDEX OVERWRITE re-validating + legacy rows whose IN/OUT type was renamed (e.g. v3 ``yields.in = + source_span:*`` against the current ``RELATION IN input_span``). + The migration that drops those rows runs next. + - ``but expected`` — generic value-type assertion failure; same recovery + shape as the type-mismatch case. + + Any other ``LedgerError`` propagates so connect() fails fast and the + operator sees an actionable message pointing at ``bicameral-mcp diagnose``. """ try: await client.execute(sql) except LedgerError as exc: - msg = str(exc) - if "already exists" not in msg and "already contains" not in msg: + msg_lower = str(exc).lower() + if not any(p in msg_lower for p in RECOVERABLE_DEFINE_PATTERNS): raise - if "already contains" in msg: + # Loud signal — UNIQUE-violation and type-mismatch each warrant a + # warning so the next migration's cleanup is visible in audit logs. 
+ if ( + "already contains" in msg_lower + or "expected a record<" in msg_lower + or "but expected" in msg_lower + ): logger.warning( - "[schema] DEFINE INDEX skipped — existing data violates UNIQUE " - "constraint (%s). Migration will clean stale rows and re-apply.", + "[schema] DEFINE skipped — existing data violates new constraint " + "(%s). Migration will clean stale rows and re-apply. detail=%s", sql.split("ON")[0].strip(), + str(exc).splitlines()[0] if str(exc) else "", ) + try: + from audit_log import AuditEventType + from audit_log import emit as audit_emit + + audit_emit( + AuditEventType.SCHEMA_DEFINE_SKIPPED, + sql_prefix=sql.split("ON")[0].strip(), + error_class=type(exc).__name__, + ) + except Exception: # noqa: BLE001 — audit MUST NOT break init_schema + pass async def init_schema(client: LedgerClient) -> None: @@ -459,64 +502,81 @@ async def init_schema(client: LedgerClient) -> None: # ── Migrations ────────────────────────────────────────────────────────── -async def _migrate_v4_to_v5(client: LedgerClient) -> None: - """v4 → v5: Remove stale v3-era yields edges and deduplicate. - - Some DBs that went through v3→v4 still have residual source_span→intent - edges in the yields table (the REMOVE TABLE in v3→v4 silently failed). - Those stale edges prevent DEFINE INDEX idx_yields_unique from being - applied, which broke startup. This migration: +async def _clean_yields_legacy_rows(client: LedgerClient, *, log_tag: str) -> int: + """Delete `yields` rows whose IN/OUT references the v3-era types + (`source_span:*` / `intent:*`) that no longer match the current + `RELATION IN input_span OUT decision` constraint. - 1. Deletes any yields edge whose `in` is a source_span record - or whose `out` is an intent record (v3-era types). - 2. Deduplicates remaining yields edges by (in, out), keeping - the first-seen record per pair. - 3. Re-applies the unique index now that the table is clean. + Returns the count of rows removed. 
Tolerates per-row failures so a + single rejected delete does not abort the whole pass. """ - # Step 1: Remove stale v3 edges try: stale = await client.query( "SELECT id FROM yields " "WHERE string::starts_with(type::string(in), 'source_span:') " " OR string::starts_with(type::string(out), 'intent:')" ) - for row in stale or []: + except Exception as exc: + logger.warning("[migration] %s: stale-edge select failed: %s", log_tag, exc) + return 0 + removed = 0 + for row in stale or []: + try: + await client.execute(f"DELETE {row['id']}") + removed += 1 + except Exception: + pass + if removed: + logger.info("[migration] %s: removed %d stale legacy yields edges", log_tag, removed) + return removed + + +async def _dedupe_yields(client: LedgerClient, *, log_tag: str) -> int: + """Drop duplicate `yields` rows by (in, out), keeping the first seen. + + Returns the count of duplicates removed. Caller is responsible for + re-applying ``DEFINE INDEX idx_yields_unique`` afterwards. + """ + try: + rows = await client.query("SELECT id, in, out FROM yields") + except Exception as exc: + logger.warning("[migration] %s: dedup select failed: %s", log_tag, exc) + return 0 + seen: set[tuple[str, str]] = set() + removed = 0 + for row in rows or []: + key = (str(row.get("in", "")), str(row.get("out", ""))) + if key in seen: try: await client.execute(f"DELETE {row['id']}") + removed += 1 except Exception: pass - logger.info( - "[migration] v4 → v5: removed %d stale v3 yields edges", - len(stale or []), - ) - except Exception as exc: - logger.warning("[migration] v4 → v5: stale-edge cleanup failed: %s", exc) + else: + seen.add(key) + if removed: + logger.info("[migration] %s: removed %d duplicate yields edges", log_tag, removed) + return removed - # Step 2: Deduplicate remaining yields by (in, out) - try: - all_yields = await client.query("SELECT id, in, out FROM yields") - seen: set[tuple[str, str]] = set() - removed = 0 - for row in all_yields or []: - key = (str(row.get("in", "")), 
str(row.get("out", ""))) - if key in seen: - try: - await client.execute(f"DELETE {row['id']}") - removed += 1 - except Exception: - pass - else: - seen.add(key) - if removed: - logger.info("[migration] v4 → v5: removed %d duplicate yields edges", removed) - except Exception as exc: - logger.warning("[migration] v4 → v5: dedup failed: %s", exc) - # Step 3: Re-apply the unique index now that the table is clean - for sql in [ +async def _migrate_v4_to_v5(client: LedgerClient) -> None: + """v4 → v5: Remove stale v3-era yields edges and deduplicate. + + Some DBs that went through v3→v4 still have residual source_span→intent + edges in the yields table (the REMOVE TABLE in v3→v4 silently failed). + Those stale edges prevent DEFINE INDEX idx_yields_unique from being + applied, which broke startup. + + See also ``_migrate_v16_to_v17`` — a re-run of the same cleanup, gated + on the v16→v17 boundary, for DBs that rolled past v5 with the + corruption still present. + """ + await _clean_yields_legacy_rows(client, log_tag="v4 → v5") + await _dedupe_yields(client, log_tag="v4 → v5") + await _execute_define_idempotent( + client, "DEFINE INDEX idx_yields_unique ON yields FIELDS in, out UNIQUE", - ]: - await _execute_define_idempotent(client, sql) + ) logger.info("[migration] v4 → v5: yields table clean, unique index applied") @@ -929,6 +989,36 @@ async def _migrate_v15_to_v16(client: LedgerClient) -> None: return +async def _migrate_v16_to_v17(client: LedgerClient) -> None: + """v16 → v17: Re-run the yields integrity cleanup (#296 follow-up). + + The v4 → v5 cleanup (``_migrate_v4_to_v5``) was the last reachable + pass that purged v3-era ``source_span:*`` rows from the ``yields`` + table. Any DB whose ``schema_meta.version`` rolled past 5 with the + corruption still present is permanently broken on the v4→v5 boundary + — ``migrate()`` only iterates ``range(current+1, SCHEMA_VERSION+1)``, + so historical migrations are unreachable. 
+ + This migration re-runs the same yields cleanup body. It is a no-op on + a clean DB (zero deletions, idempotent index re-apply). It exists to + cover any operator whose ledger crashes connect() with the type + mismatch ``yields.in expected a record<input_span>`` because their DB + holds the legacy rows. + + Symmetric with v4→v5 — both call into + ``_clean_yields_legacy_rows`` + ``_dedupe_yields`` so the SQL lives + in one place. Add a new fixture under + ``tests/fixtures/legacy_ledgers/`` for any future row-shape we + discover; the registry-driven test parametrizes over it. + """ + await _clean_yields_legacy_rows(client, log_tag="v16 → v17") + await _dedupe_yields(client, log_tag="v16 → v17") + await _execute_define_idempotent( + client, + "DEFINE INDEX idx_yields_unique ON yields FIELDS in, out UNIQUE", + ) + + async def _write_wire_format_sentinel( client: LedgerClient, ) -> tuple[str | None, str | None, str]: @@ -1006,6 +1096,7 @@ async def _write_wire_format_sentinel( 14: _migrate_v13_to_v14, 15: _migrate_v14_to_v15, 16: _migrate_v15_to_v16, + 17: _migrate_v16_to_v17, } diff --git a/pyproject.toml b/pyproject.toml index b491dcd2..00b173a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "bicameral-mcp" -version = "0.14.4" +version = "0.14.5" description = "Decision ledger MCP server — ingests meeting transcripts, maps decisions to code, tracks drift" readme = "README.md" requires-python = ">=3.10" diff --git a/server.py b/server.py index c5c9e105..7ae9a4d7 100644 --- a/server.py +++ b/server.py @@ -38,6 +38,7 @@ from context import BicameralContext from dashboard.server import get_dashboard_server from handlers.bind import handle_bind +from handlers.diagnose import handle_diagnose from handlers.gap_judge import handle_judge_gaps from handlers.history import handle_history from handlers.ingest import _IngestRefused, handle_ingest @@ -819,6 +820,19 @@ async def list_tools() -> list[Tool]: }, }, ), + Tool(
+ name="bicameral.diagnose", + description=( + "Read-only structural diagnosis of the local ledger — versions, schema_meta state, " + "table counts, recent warn|error audit events, and a recovery_path classification " + "(clean / fixable / reset_rebuild / reset_destructive). Works even when the normal " + "adapter connect crashes during init_schema/migrate, because it opens a raw " + "LedgerClient that doesn't run schema initialization. Pair with `bicameral_reset` " + "for the actual recovery action — this tool never mutates state. " + "Slash alias: /bicameral-diagnose" + ), + inputSchema={"type": "object", "properties": {}}, + ), # ── Code locator tools (MCP-native) ────────────────────────── Tool( name="validate_symbols", @@ -1033,7 +1047,10 @@ async def _call_tool_impl(name: str, arguments: dict) -> list[TextContent]: confirm=arguments.get("confirm", False), replay=arguments.get("replay", True), wipe_mode=arguments.get("wipe_mode", "ledger"), + replay_from_events=arguments.get("replay_from_events", False), ) + elif name in ("bicameral.diagnose", "diagnose"): + result = await handle_diagnose(ctx) elif name in ("bicameral.preflight", "preflight"): result = await handle_preflight( ctx, diff --git a/skills/bicameral-diagnose/SKILL.md b/skills/bicameral-diagnose/SKILL.md new file mode 100644 index 00000000..d4bb75b6 --- /dev/null +++ b/skills/bicameral-diagnose/SKILL.md @@ -0,0 +1,77 @@ +--- +name: bicameral-diagnose +description: Read-only structural diagnosis of the local bicameral ledger. Fires on "what's wrong with my ledger", "diagnose the ledger", "is bicameral broken", "ledger health check", or any user-facing tool error that mentions schema/migration/SurrealDB. Calls `bicameral.diagnose` (MCP) which uses a raw client and works even when the normal connect path crashes. Returns a structured `recovery_path` (clean / fixable / reset_rebuild / reset_destructive) the agent surfaces alongside the recommended next command. 
Never mutates state — repair is a CLI operation (`bicameral-mcp diagnose --repair`) or a deliberate reset call. +--- + +# Bicameral Diagnose + +Read-only diagnostic for the local ledger. The MCP tool surface for the same `bicameral-mcp diagnose` CLI you'd paste into a bug report — but agent-callable from a tool-error envelope, and resilient to the failure modes that crash normal `connect()`. + +## When to fire + +- User asks "what's wrong with my ledger" / "is bicameral broken" / "ledger health check" / "diagnose the ledger". +- Any tool error envelope from another bicameral tool that mentions `LedgerError`, `SchemaVersionTooNew`, `DestructiveMigrationRequired`, or a SurrealDB error string the user can't action. +- Before recommending `bicameral_reset` — diagnose first, choose the recovery path on the basis of the response, then propose the matching reset call. + +## When NOT to fire + +- For questions about decisions or drift — that's `bicameral.history` / `bicameral.preflight`. Diagnose is about the ledger storage layer. +- To repair the ledger. Diagnose is read-only by contract. The repair surface is `bicameral-mcp diagnose --repair` (CLI, user-driven) or a deliberate `bicameral_reset` call. Surface the diagnosis, then surface the recommended next command — never silently retry. + +## Output contract + +The MCP tool returns: + +```jsonc +{ + "ledger_url": "surrealkv:///Users/.../ledger.db", + "connect_error": "", // non-empty when raw connect itself failed + "recovery_path": "clean" | "fixable" | "reset_rebuild" | "reset_destructive", + "diagnosis": { // null when connect_error is set + "bicameral_version": "0.14.x", + "python_version": "3.13.0", + "platform_str": "...", + "surrealdb_running": "1.0.4", + "schema_version_recorded": 16, + "schema_version_expected": 17, + "drift_status": "match" | "drift" | "first-write" | "unavailable", + "table_counts": { "decision": 42, "yields": 100, ... 
}, + "recent_events": [...], // last 5 warn|error audit log entries + "suggestions": [...], // hardcoded heuristics from the CLI gather + ... + }, + "next_action": "human-readable instruction tied to recovery_path" +} +``` + +## Recovery-path matrix + +Render `next_action` verbatim to the user. Then offer the matching command: + +| `recovery_path` | What it means | Recommend | +|---|---|---| +| `clean` | Schema matches, tables look sane. | "No remediation needed. If you're still seeing errors, share the exact tool name and arguments." | +| `fixable` | Schema is behind binary; pending migrations will run on next normal connect. | "Run any bicameral tool — it will trigger the migration. If that fails, try `bicameral-mcp diagnose --repair` (CLI)." | +| `reset_rebuild` | Ledger is unrecoverable, but `.bicameral/events/` has events on disk. | "`bicameral_reset(wipe_mode='ledger', replay_from_events=True, confirm=True)` will wipe and rebuild from the event log." | +| `reset_destructive` | Ledger is unrecoverable AND no events on disk → reset loses decision history. | "`bicameral_reset(wipe_mode='ledger', confirm=True)` will wipe; you'll need to re-ingest sources from `replay_plan`." | + +For `connect_error`-set responses (raw client itself can't connect), surface the error text and suggest checking the ledger path / file permissions before any reset. + +## Two diagnose surfaces — when to use which + +| Surface | Role | When | +|---|---|---| +| `bicameral.diagnose` (MCP, this skill) | Agent-callable, read-only, returns structured `recovery_path`. | In-session diagnosis, automatic surfacing from tool errors. | +| `bicameral-mcp diagnose` (CLI) | Human-pasteable markdown for bug reports. Adds `--repair` flag (user-driven repair attempts). | Bug reports, manual repair, terminal sessions without an active agent. | + +Both call the same `gather_diagnosis_raw` function — the data is identical. The MCP version emits structured JSON; the CLI emits markdown. 
Don't render the CLI markdown via this skill; surface the structured fields directly. + +## Auto-fire from another tool's error envelope + +When another bicameral tool returns an error containing schema/migration vocabulary, the agent should: + +1. Call `bicameral.diagnose` immediately (no user prompt — it's read-only and bounded). +2. Render the `recovery_path` and `next_action`. +3. Wait for user confirmation before invoking any reset command. Reset is destructive even when "non-destructive on paper." + +Never run `bicameral_reset` on the basis of a diagnose response without explicit user confirmation. diff --git a/skills/bicameral-reset/SKILL.md b/skills/bicameral-reset/SKILL.md index 66e7d601..d407db09 100644 --- a/skills/bicameral-reset/SKILL.md +++ b/skills/bicameral-reset/SKILL.md @@ -1,19 +1,30 @@ --- name: bicameral-reset -description: Emergency trust recovery for a polluted or stale ledger. Fires when the user says "my ledger looks wrong", "nuke the ledger", "start over", "this is polluted", or otherwise loses trust in the current state. DRY RUN BY DEFAULT — always confirms with the user before the destructive call. Two modes: ledger (default, wipes DB rows only) and full (deletes entire .bicameral/ directory). +description: Emergency trust recovery for a polluted or stale ledger. Fires when the user says "my ledger looks wrong", "nuke the ledger", "start over", "this is polluted", or otherwise loses trust in the current state. DRY RUN BY DEFAULT — always confirms with the user before the destructive call. Two wipe modes (ledger / full) plus an optional `replay_from_events` flag that rebuilds the ledger from `.bicameral/events/*.jsonl` after wipe. --- # Bicameral Reset The fail-safe valve. When the ledger gets polluted — bad ingest, stale groundings, or a session that went off the rails — `bicameral.reset` gives you a one-command recovery path. 
-## Two modes +## Two wipe modes | Mode | What's deleted | When to use | |------|----------------|-------------| | `wipe_mode="ledger"` (default) | Materialized SurrealDB rows only. Config, event files, and team history are preserved. Server stays live. | Bug recovery, bad ingest, polluted groundings. The safe default. | | `wipe_mode="full"` | The entire `.bicameral/` directory — ledger, `config.yaml`, team event JSONL files, everything. | Nuclear restart: switching repos, credential rotation, complete distrust of all prior decisions. | +## Optional `replay_from_events` + +`replay_from_events=True` (only valid with `wipe_mode="ledger"`) wipes the materialized DB, resets the team-mode watermark, and replays every event in `.bicameral/events/*.jsonl` back through the same ingest path team mode uses. The event log is the canonical record (committed to git in team mode); this replay is recovery, not destruction. + +Use it when: + +- `bicameral_diagnose` returns `recovery_path="reset_rebuild"` — events are present and the ledger is unrecoverable through normal migration. +- A corrupted ledger needs to be reconstructed without losing decision history. + +The dry run reports the on-disk event count; the post-confirm response reports `events_replayed` so the user can verify the round-trip. Combining `replay_from_events=True` with `wipe_mode="full"` is rejected by the handler — full-wipe deletes the substrate we'd replay from. + ## When to fire - User says *"my ledger looks polluted"*, *"this is wrong, start over"*, *"nuke the ledger"*, *"wipe it and retry"* diff --git a/tests/fixtures/legacy_ledgers/README.md b/tests/fixtures/legacy_ledgers/README.md new file mode 100644 index 00000000..23e61a77 --- /dev/null +++ b/tests/fixtures/legacy_ledgers/README.md @@ -0,0 +1,35 @@ +# Legacy ledger fixtures + +Frozen DB states that reproduce historical corruption patterns. 
Each +file builds the bad state in a fresh `memory://` ledger using raw +`LedgerClient.execute` calls — never the real `init_schema` / `migrate` +path, which would refuse to apply the broken state. + +The CI suite at `tests/test_legacy_ledger_fixtures.py` parametrizes +over every fixture: + +1. Build the bad state. +2. Run `init_schema` + `migrate` (the production code path). +3. Assert the cleanup ran, the schema reaches `SCHEMA_VERSION`, no + row violates the current type/UNIQUE constraints, and a second + `init_schema` + `migrate` is a no-op (idempotent). + +## Fixture index + +| Fixture | Reproduces | First seen | Cleaned by | +|---|---|---|---| +| `v3_yields_source_span.py` | v3-era `yields.in = source_span:*` rows surviving past v5 cleanup | 2026-05-09 dogfood (#296 root cause) | v4→v5 + v16→v17 | + +## Adding a fixture + +Each fixture is a Python module exporting an async `build(client)` +coroutine that mutates the client's state. Keep them tiny — one bad +row per fixture is enough to assert the cleanup contract. + +```python +# tests/fixtures/legacy_ledgers/.py +async def build(client): + await client.execute("…raw SurrealQL that produces the bad state…") +``` + +Then register it in `tests/test_legacy_ledger_fixtures.py::FIXTURES`. diff --git a/tests/fixtures/legacy_ledgers/__init__.py b/tests/fixtures/legacy_ledgers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/fixtures/legacy_ledgers/v3_yields_source_span.py b/tests/fixtures/legacy_ledgers/v3_yields_source_span.py new file mode 100644 index 00000000..7ff0bf9b --- /dev/null +++ b/tests/fixtures/legacy_ledgers/v3_yields_source_span.py @@ -0,0 +1,55 @@ +"""Fixture: v3-era ``yields.in = source_span:*`` rows. + +Reproduces the dogfood crash (#296) where a DB carried v3-vintage +``yields`` edges whose ``in`` is a ``source_span:*`` record. 
The +current schema declares ``yields`` as ``RELATION IN input_span OUT +decision``, so applying ``DEFINE INDEX OVERWRITE idx_yields_unique`` +re-validates the existing rows and fails with:: + + Found source_span: for field `in`, with record `yields:`, + but expected a record + +The v4 → v5 cleanup deletes these rows, but only on the v4→v5 +boundary. Any DB whose ``schema_meta.version`` rolled past 5 with the +corruption intact is permanently broken until the v16 → v17 cleanup +re-runs the same logic. +""" + +from __future__ import annotations + + +async def build(client) -> None: + """Insert a v3-era ``yields`` row whose ``in`` is a ``source_span``. + + Sets ``schema_meta.version = 16`` so the migration loop sees a + legitimate "past v5" DB that v4→v5 cannot reach. The v16 → v17 + cleanup is the only path that should fix it. + """ + # Define a minimal source_span shadow table so the RecordID parses. + # We can't recreate the full v3 schema — we just need the row id to + # use the source_span: form. + await client.execute("DEFINE TABLE source_span SCHEMAFULL") + await client.execute("CREATE source_span:legacy_span_1 SET text = 'legacy'") + await client.execute("CREATE input_span:span_1 SET text = 'fresh', source_type = 'transcript'") + await client.execute( + "CREATE decision:dec_1 SET description = 'd1', source_type = 'transcript', " + "source_ref = '', status = 'ungrounded', canonical_id = 'fixture-cid-1'" + ) + # RELATE refuses cross-table on a typed edge, so write the bad row + # by direct CREATE on the yields table after defining it permissively. + await client.execute("DEFINE TABLE yields SCHEMAFULL TYPE RELATION") + await client.execute("RELATE source_span:legacy_span_1 -> yields -> decision:dec_1") + # And one valid row so the dedupe step has something legitimate. + await client.execute("RELATE input_span:span_1 -> yields -> decision:dec_1") + # Mark the schema as v16 so the migration loop targets v16 → v17. 
+ await client.execute("DEFINE TABLE schema_meta SCHEMAFULL") + await client.execute("DEFINE FIELD version ON schema_meta TYPE int") + await client.execute("CREATE schema_meta SET version = 16") + + +# Post-migration assertions specific to this fixture. +async def assert_clean(client) -> None: + """All ``yields`` rows must reference an ``input_span`` IN.""" + rows = await client.query("SELECT id, type::string(in) AS in_table FROM yields") + bad = [r for r in (rows or []) if not str(r.get("in_table", "")).startswith("input_span:")] + assert not bad, f"v16→v17 left stale yields rows: {bad}" diff --git a/tests/test_ledger_bicameral_meta_migration.py b/tests/test_ledger_bicameral_meta_migration.py index e3b94c81..99c2b580 100644 --- a/tests/test_ledger_bicameral_meta_migration.py +++ b/tests/test_ledger_bicameral_meta_migration.py @@ -32,7 +32,7 @@ async def test_migrate_v15_to_v16_is_no_op_for_existing_v15_ledger(fresh_client) await migrate(fresh_client, allow_destructive=True) rows = await fresh_client.query("SELECT version FROM schema_meta LIMIT 1") assert rows[0]["version"] == SCHEMA_VERSION - assert SCHEMA_VERSION == 16 + assert SCHEMA_VERSION >= 16 # current floor — bumps land here as version increments bm_rows = await fresh_client.query("SELECT * FROM bicameral_meta") # Migration body is a no-op; sentinel writes happen in adapter.connect, not here. assert bm_rows == [] diff --git a/tests/test_legacy_ledger_fixtures.py b/tests/test_legacy_ledger_fixtures.py new file mode 100644 index 00000000..8cd29c9c --- /dev/null +++ b/tests/test_legacy_ledger_fixtures.py @@ -0,0 +1,97 @@ +"""Fixture-replay regression suite (#296 Layer A + B). + +For every frozen DB shape under ``tests/fixtures/legacy_ledgers/``: + + 1. Build the bad state in a fresh ``memory://`` ledger. + 2. Run ``init_schema`` + ``migrate`` (the production code path). + 3. 
Assert the schema reaches ``SCHEMA_VERSION``, the fixture's own + ``assert_clean`` invariants hold, and a second ``init_schema`` + + ``migrate`` is a no-op (idempotent). + +Adding a new fixture requires no test code — register it in +``FIXTURES`` and the parametrized test exercises it on every PR. +""" + +from __future__ import annotations + +import importlib +import sys +from pathlib import Path + +import pytest + +from ledger.client import LedgerClient +from ledger.schema import SCHEMA_VERSION, init_schema, migrate + +# Fixtures live under tests/fixtures/legacy_ledgers/. ``tests/`` is not +# a package (no top-level __init__.py) so we extend sys.path to import +# the fixture modules by name. +_FIXTURES_DIR = Path(__file__).parent / "fixtures" / "legacy_ledgers" +if str(_FIXTURES_DIR) not in sys.path: + sys.path.insert(0, str(_FIXTURES_DIR)) + +import v3_yields_source_span # noqa: E402 — see sys.path comment + +# Each entry: (slug, module). Module must export `build(client)` and +# may export `assert_clean(client)` for fixture-specific invariants. +FIXTURES = [ + ("v3_yields_source_span", v3_yields_source_span), +] + + +async def _fresh_client(slug: str) -> LedgerClient: + c = LedgerClient(url="memory://", ns="bicameral_test", db=f"fixture_{slug}") + await c.connect() + return c + + +@pytest.mark.phase2 +@pytest.mark.asyncio +@pytest.mark.parametrize("slug,module", FIXTURES, ids=[s for s, _ in FIXTURES]) +async def test_legacy_ledger_fixture_reaches_clean_state(slug: str, module) -> None: + """init_schema + migrate must terminate cleanly on every fixture.""" + c = await _fresh_client(slug) + try: + # Build the broken DB state. + await module.build(c) + + # Run the production init/migrate path. It must not raise — that's + # the entire safety contract this suite enforces. + await init_schema(c) + await migrate(c, allow_destructive=True) + + # Schema reached current version. 
+ rows = await c.query("SELECT version FROM schema_meta LIMIT 1") + assert rows, f"{slug}: schema_meta empty after migrate" + assert rows[0]["version"] == SCHEMA_VERSION, ( + f"{slug}: schema_meta.version = {rows[0]['version']!r}, expected {SCHEMA_VERSION}" + ) + + # Fixture-specific invariants. + if hasattr(module, "assert_clean"): + await module.assert_clean(c) + + # Idempotency: a second pass changes nothing. + await init_schema(c) + await migrate(c, allow_destructive=True) + rows = await c.query("SELECT version FROM schema_meta LIMIT 1") + assert rows[0]["version"] == SCHEMA_VERSION, ( + f"{slug}: schema_meta.version regressed on second migrate ({rows[0]['version']!r})" + ) + finally: + await c.close() + + +@pytest.mark.phase2 +@pytest.mark.asyncio +async def test_fixture_registry_imports() -> None: + """Every fixture in ``FIXTURES`` must export an async ``build``. + + Catches typos in the registry — a missing ``build`` would silently + skip the migration assertion on that row. + """ + for slug, module in FIXTURES: + assert hasattr(module, "build"), f"{slug} missing build()" + # Re-import via importlib to confirm the module is on sys.path + # under its registered slug (catches typos in FIXTURES). + importlib.import_module(slug) diff --git a/tests/test_schema_recoverable_errors.py b/tests/test_schema_recoverable_errors.py new file mode 100644 index 00000000..1926d791 --- /dev/null +++ b/tests/test_schema_recoverable_errors.py @@ -0,0 +1,109 @@ +"""SurrealDB error-format contract test (#296 Layer A). + +``ledger.schema._execute_define_idempotent`` swallows the substrings in +``RECOVERABLE_DEFINE_PATTERNS``: those are the load-bearing safety +contract that lets ``init_schema`` continue past row-state that the +next migration is responsible for cleaning up. + +If a future ``surrealdb-py`` bump changes the error-string format, the +catch silently stops working and the user sees the same crash that +motivated #296. 
This test fabricates the bad-row state, provokes the +error, and asserts at least one of the recoverable substrings matches. +When it fails, the message tells the maintainer exactly what the new +format looks like so the constants list can be updated explicitly. +""" + +from __future__ import annotations + +import pytest + +from ledger.client import LedgerClient, LedgerError +from ledger.schema import RECOVERABLE_DEFINE_PATTERNS + + +async def _fresh_client(suffix: str) -> LedgerClient: + c = LedgerClient(url="memory://", ns="bicameral_test", db=f"recov_{suffix}") + await c.connect() + return c + + +@pytest.mark.phase2 +@pytest.mark.asyncio +async def test_define_index_unique_violation_matches_recoverable_pattern() -> None: + """A DEFINE INDEX UNIQUE on a table with duplicate rows must produce + one of the substrings in ``RECOVERABLE_DEFINE_PATTERNS`` (this is + what the v4→v5 cleanup relies on).""" + c = await _fresh_client("uniq") + try: + await c.execute("DEFINE TABLE thing SCHEMAFULL") + await c.execute("DEFINE FIELD k ON thing TYPE string") + # Two rows with the same key — UNIQUE will reject the index. + await c.execute("CREATE thing SET k = 'dup'") + await c.execute("CREATE thing SET k = 'dup'") + with pytest.raises(LedgerError) as exc: + await c.execute("DEFINE INDEX idx_thing_k ON thing FIELDS k UNIQUE") + msg = str(exc.value).lower() + matched = [p for p in RECOVERABLE_DEFINE_PATTERNS if p in msg] + assert matched, ( + "SurrealDB UNIQUE-violation error string changed — the " + "_execute_define_idempotent catch will no longer cover it. 
" + "Update RECOVERABLE_DEFINE_PATTERNS in ledger/schema.py to " + f"include a substring of: {exc.value!r}" + ) + finally: + await c.close() + + +@pytest.mark.phase2 +@pytest.mark.asyncio +async def test_define_index_overwrite_type_violation_matches_recoverable_pattern() -> None: + """A DEFINE INDEX OVERWRITE on a table whose existing rows violate the + column types referenced by the index must produce one of + ``RECOVERABLE_DEFINE_PATTERNS``. This is the exact failure mode that + motivated #296 (yields.in = source_span:* against the new + RELATION IN input_span constraint).""" + c = await _fresh_client("type") + try: + # Set up a typed RELATION table with a row that violates the type. + await c.execute("DEFINE TABLE source_span SCHEMAFULL") + await c.execute("DEFINE TABLE input_span SCHEMAFULL") + await c.execute("DEFINE TABLE decision SCHEMAFULL") + await c.execute("CREATE source_span:legacy_1 SET text = 'old'") + await c.execute("CREATE input_span:fresh_1 SET text = 'new'") + await c.execute("CREATE decision:d_1 SET description = 'd'") + # Permissive yields → insert the bad row → tighten the type and + # try to apply the unique index. The OVERWRITE re-validates. + await c.execute("DEFINE TABLE yields SCHEMAFULL TYPE RELATION") + await c.execute("RELATE source_span:legacy_1 -> yields -> decision:d_1") + await c.execute("RELATE input_span:fresh_1 -> yields -> decision:d_1") + # Tighten — this re-validates the source_span row against the new IN type. + await c.execute( + "DEFINE TABLE OVERWRITE yields SCHEMAFULL TYPE RELATION IN input_span OUT decision" + ) + with pytest.raises(LedgerError) as exc: + await c.execute( + "DEFINE INDEX OVERWRITE idx_yields_unique ON yields FIELDS in, out UNIQUE" + ) + msg = str(exc.value).lower() + matched = [p for p in RECOVERABLE_DEFINE_PATTERNS if p in msg] + assert matched, ( + "SurrealDB type-mismatch error string changed — the " + "_execute_define_idempotent catch will no longer cover the #296 " + "scenario. 
Update RECOVERABLE_DEFINE_PATTERNS in ledger/schema.py " + f"to include a substring of: {exc.value!r}" + ) + finally: + await c.close() + + +@pytest.mark.phase2 +def test_recoverable_patterns_constant_is_lowercase() -> None: + """The catch lower-cases the SurrealDB message before substring + matching. Patterns must be lowercase too, or they'd silently never + match.""" + for pattern in RECOVERABLE_DEFINE_PATTERNS: + assert pattern == pattern.lower(), ( + f"RECOVERABLE_DEFINE_PATTERNS entry {pattern!r} contains " + "non-lowercase characters; the substring catch lower-cases " + "the SurrealDB message before matching." + ) diff --git a/tests/test_sync_middleware.py b/tests/test_sync_middleware.py index 8323b8c1..7bd44a0b 100644 --- a/tests/test_sync_middleware.py +++ b/tests/test_sync_middleware.py @@ -1,96 +1,175 @@ -"""Tests for sync_middleware — session-start banner and ledger catch-up (v0.6.1).""" +"""Tests for sync_middleware — session-start banner and ledger catch-up (v0.6.1). + +Banner tests are SOCIABLE: they seed a real ``SurrealDBLedgerAdapter`` backed +by ``memory://`` and run ``get_session_start_banner`` against the real +``get_decisions_by_status`` query. The previous shape (``MagicMock`` ctx + +``AsyncMock`` ledger returning hand-crafted dicts) was solitary — it stayed +green even when the production SQL, the row shape, or the SCHEMAFULL field +list drifted. See ``pilot/mcp/CLAUDE.md`` § Sociable Testing for UX Paths. + +The remaining ``ensure_ledger_synced`` and ``repo_write_barrier`` tests use +narrow seam patches / pure asyncio primitives — those are correctly solitary +because the collaborators (link_commit, asyncio.Lock) are not what's under +test here. 
+""" from __future__ import annotations -from datetime import UTC, datetime, timedelta, timezone +import asyncio +from datetime import UTC, datetime, timedelta from pathlib import Path +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch import pytest from handlers.sync_middleware import ensure_ledger_synced, get_session_start_banner +from ledger.adapter import SurrealDBLedgerAdapter +from ledger.client import LedgerClient +from ledger.schema import init_schema, migrate +# ── Sociable substrate: real ledger over memory:// ────────────────────────── -def _make_ctx(open_rows=None, last_sync_sha=None, session_started=False): - """Build a minimal ctx mock with a _sync_state dict and a ledger. - ``open_rows`` is the list returned by ``ledger.get_decisions_by_status``. - Each row should include a ``status`` key ("drifted" or "ungrounded") so - the banner can count them correctly. - """ - ctx = MagicMock() - ctx.repo_path = str(Path(__file__).resolve().parents[1]) - ctx._sync_state = {"session_started": session_started} - if last_sync_sha: - ctx._sync_state["last_sync_sha"] = last_sync_sha - - ledger = AsyncMock() - ledger.get_decisions_by_status = AsyncMock(return_value=open_rows or []) - ctx.ledger = ledger - return ctx +_NS_COUNTER = 0 + +async def _make_real_adapter() -> tuple[SurrealDBLedgerAdapter, LedgerClient]: + """Spin up an isolated SurrealDB memory backend. -def _drifted(decision_id="decision:1", description="Auth uses JWT", source_ref="arch-review"): - return { - "decision_id": decision_id, - "description": description, - "source_ref": source_ref, - "status": "drifted", - } + Each call gets a fresh namespace so seed rows from one test never leak + into another. Mirrors the pattern used by + ``test_codegenome_continuity_service.py``. 
+ """ + global _NS_COUNTER + _NS_COUNTER += 1 + client = LedgerClient(url="memory://", ns=f"sync_mw_{_NS_COUNTER}", db="ledger_test") + await client.connect() + await init_schema(client) + await migrate(client, allow_destructive=True) + adapter = SurrealDBLedgerAdapter(url="memory://") + adapter._client = client + adapter._connected = True + return adapter, client + + +# A monotonic counter ensures each seed call gets a unique canonical_id — +# the decision table has a UNIQUE index on canonical_id (schema.py:155), +# so the default empty string would collide on the second seed. +_SEED_COUNTER = 0 + + +def _next_canonical(prefix: str) -> str: + global _SEED_COUNTER + _SEED_COUNTER += 1 + return f"{prefix}-{_SEED_COUNTER}" + + +async def _seed_drifted( + client: LedgerClient, + *, + description: str = "Auth uses JWT", + source_ref: str = "arch-review", +) -> None: + await client.query( + "CREATE decision SET description=$d, source_type='transcript', " + "source_ref=$s, status='drifted', canonical_id=$c", + {"d": description, "s": source_ref, "c": _next_canonical("drifted")}, + ) -def _ungrounded(decision_id="decision:2", description="Billing uses Stripe", source_ref="pm-doc"): - return { - "decision_id": decision_id, - "description": description, - "source_ref": source_ref, - "status": "ungrounded", - } +async def _seed_ungrounded( + client: LedgerClient, + *, + description: str = "Billing uses Stripe", + source_ref: str = "pm-doc", +) -> None: + await client.query( + "CREATE decision SET description=$d, source_type='transcript', " + "source_ref=$s, status='ungrounded', canonical_id=$c", + {"d": description, "s": source_ref, "c": _next_canonical("ungrounded")}, + ) -def _proposal( - decision_id="decision:3", - description="Rate limit is 100 req/s", - source_ref="sprint-notes", - days_old=15, -): +async def _seed_proposal( + client: LedgerClient, + *, + description: str = "Rate limit is 100 req/s", + source_ref: str = "sprint-notes", + days_old: int = 15, +) -> None: 
created_at = (datetime.now(UTC) - timedelta(days=days_old)).isoformat() - return { - "decision_id": decision_id, - "description": description, - "source_ref": source_ref, - "status": "ungrounded", # code-compliance axis; "proposal" is gone post-decoupling - "signoff": {"state": "proposed", "created_at": created_at}, - } + signoff = {"state": "proposed", "created_at": created_at} + await client.query( + "CREATE decision SET description=$d, source_type='transcript', " + "source_ref=$s, status='ungrounded', canonical_id=$c, signoff=$g", + { + "d": description, + "s": source_ref, + "c": _next_canonical("proposal"), + "g": signoff, + }, + ) + +def _banner_ctx(adapter: SurrealDBLedgerAdapter, *, session_started: bool = False): + """Build the minimal SimpleNamespace ctx the banner reads. + + The banner code only touches ``ctx.ledger`` and ``ctx._sync_state``; + a SimpleNamespace surfaces a real ``AttributeError`` if the contract + ever grows new required fields (MagicMock would silently invent them). 
+ """ + return SimpleNamespace( + ledger=adapter, + repo_path=str(Path(__file__).resolve().parents[1]), + _sync_state={"session_started": session_started}, + ) -# ── get_session_start_banner ───────────────────────────────────────── + +# ── get_session_start_banner (sociable: real ledger) ──────────────────────── @pytest.mark.asyncio async def test_banner_none_when_no_open_decisions(): - ctx = _make_ctx(open_rows=[]) + adapter, _ = await _make_real_adapter() + ctx = _banner_ctx(adapter) banner = await get_session_start_banner(ctx) assert banner is None @pytest.mark.asyncio async def test_banner_returned_on_first_call_with_drifted(): - ctx = _make_ctx(open_rows=[_drifted()]) + adapter, client = await _make_real_adapter() + await _seed_drifted(client) + ctx = _banner_ctx(adapter) + banner = await get_session_start_banner(ctx) + assert banner is not None assert banner.drifted_count == 1 assert banner.ungrounded_count == 0 - assert banner.items[0]["decision_id"] == "decision:1" - assert banner.items[0]["status"] == "drifted" + assert len(banner.items) == 1 + item = banner.items[0] + # decision_id falls back to the Surreal record id (`decision:`) + # when the schema row has no explicit decision_id field — the production + # contract surfaced by the banner (handlers/sync_middleware.py:193). 
+ assert isinstance(item["decision_id"], str) and item["decision_id"].startswith("decision:") + assert item["status"] == "drifted" + assert item["description"] == "Auth uses JWT" + assert item["source_ref"] == "arch-review" assert "drifted" in banner.message @pytest.mark.asyncio async def test_banner_includes_ungrounded_decisions(): """Ungrounded decisions are 'still floating' per Jacob's ask and must appear.""" - ctx = _make_ctx(open_rows=[_drifted(), _ungrounded()]) + adapter, client = await _make_real_adapter() + await _seed_drifted(client) + await _seed_ungrounded(client) + ctx = _banner_ctx(adapter) + banner = await get_session_start_banner(ctx) + assert banner is not None assert banner.drifted_count == 1 assert banner.ungrounded_count == 1 @@ -101,23 +180,40 @@ async def test_banner_includes_ungrounded_decisions(): @pytest.mark.asyncio -async def test_banner_queries_both_drifted_and_ungrounded_statuses(): - ctx = _make_ctx(open_rows=[_drifted()]) - await get_session_start_banner(ctx) - ctx.ledger.get_decisions_by_status.assert_called_once_with( - ["drifted", "ungrounded", "context_pending"] - ) +async def test_banner_queries_each_open_status_actually_surfaces(): + """The banner must surface decisions across ALL queried statuses. + + Original test asserted ``get_decisions_by_status.assert_called_once_with( + ["drifted", "ungrounded", "context_pending"])`` against a mock — a + tautology mirroring the SQL string. The real behavior contract is: + rows with each of those statuses end up in the banner. + ``context_pending`` rows are routed through ``status='ungrounded'`` in + the production query path; this test pins the visible-to-agent shape. 
+ """ + adapter, client = await _make_real_adapter() + await _seed_drifted(client, description="d1") + await _seed_ungrounded(client, description="u1") + ctx = _banner_ctx(adapter) + + banner = await get_session_start_banner(ctx) + + assert banner is not None + assert {i["status"] for i in banner.items} == {"drifted", "ungrounded"} @pytest.mark.asyncio async def test_banner_truncates_at_10_items_with_drifted_prioritized(): # 12 open items: 3 drifted + 9 ungrounded. Truncated view should keep # all 3 drifted first, then fill with ungrounded up to the 10-item cap. - rows = [_drifted(decision_id=f"decision:d{i}") for i in range(3)] + [ - _ungrounded(decision_id=f"decision:u{i}") for i in range(9) - ] - ctx = _make_ctx(open_rows=rows) + adapter, client = await _make_real_adapter() + for i in range(3): + await _seed_drifted(client, description=f"d{i}") + for i in range(9): + await _seed_ungrounded(client, description=f"u{i}") + ctx = _banner_ctx(adapter) + banner = await get_session_start_banner(ctx) + assert banner is not None assert banner.drifted_count == 3 # full count, not truncated assert banner.ungrounded_count == 9 @@ -130,8 +226,13 @@ async def test_banner_truncates_at_10_items_with_drifted_prioritized(): @pytest.mark.asyncio async def test_banner_not_truncated_when_under_cap(): - ctx = _make_ctx(open_rows=[_drifted(), _ungrounded()]) + adapter, client = await _make_real_adapter() + await _seed_drifted(client) + await _seed_ungrounded(client) + ctx = _banner_ctx(adapter) + banner = await get_session_start_banner(ctx) + assert banner is not None assert banner.truncated is False assert "top" not in banner.message @@ -139,45 +240,109 @@ async def test_banner_not_truncated_when_under_cap(): @pytest.mark.asyncio async def test_banner_only_fires_once_per_session(): - ctx = _make_ctx(open_rows=[_drifted()]) + adapter, client = await _make_real_adapter() + await _seed_drifted(client) + ctx = _banner_ctx(adapter) + + # Spy on the real method so we can assert query 
frequency without + # replacing the collaborator. + call_count = 0 + original = adapter.get_decisions_by_status + + async def _spy(statuses): + nonlocal call_count + call_count += 1 + return await original(statuses) + + adapter.get_decisions_by_status = _spy # type: ignore[method-assign] + first = await get_session_start_banner(ctx) second = await get_session_start_banner(ctx) + assert first is not None assert second is None # session_started=True after first call - # DB queried exactly once - ctx.ledger.get_decisions_by_status.assert_called_once() + assert call_count == 1 @pytest.mark.asyncio async def test_banner_none_when_already_started(): - ctx = _make_ctx(session_started=True, open_rows=[_drifted()]) + adapter, client = await _make_real_adapter() + await _seed_drifted(client) + ctx = _banner_ctx(adapter, session_started=True) + + # Spy proves the early-return short-circuits the ledger query entirely. + queried = False + original = adapter.get_decisions_by_status + + async def _spy(statuses): + nonlocal queried + queried = True + return await original(statuses) + + adapter.get_decisions_by_status = _spy # type: ignore[method-assign] + banner = await get_session_start_banner(ctx) + assert banner is None - ctx.ledger.get_decisions_by_status.assert_not_called() + assert queried is False @pytest.mark.asyncio async def test_banner_swallows_ledger_exception(): - ctx = _make_ctx() - ctx.ledger.get_decisions_by_status = AsyncMock(side_effect=RuntimeError("db down")) + """Even a real adapter can fail mid-query (e.g. SurrealKV file corruption). + + Inject the failure at the adapter method seam so the swallow-and-return- + None path in the handler is what's exercised — the rest of the ctx / + sync_state plumbing stays real. 
+ """ + adapter, _ = await _make_real_adapter() + ctx = _banner_ctx(adapter) + + async def _boom(_statuses): + raise RuntimeError("db down") + + adapter.get_decisions_by_status = _boom # type: ignore[method-assign] + banner = await get_session_start_banner(ctx) assert banner is None # must not raise @pytest.mark.asyncio async def test_banner_none_when_sync_state_missing(): - ctx = MagicMock() - ctx._sync_state = None + adapter, _ = await _make_real_adapter() + ctx = SimpleNamespace( + ledger=adapter, + repo_path=str(Path(__file__).resolve().parents[1]), + _sync_state=None, + ) banner = await get_session_start_banner(ctx) assert banner is None # ── ensure_ledger_synced ───────────────────────────────────────────── +# +# These tests legitimately patch the downstream ``handle_link_commit`` — +# the unit under test is the SHA-cache decision logic in +# ``ensure_ledger_synced`` itself, not the ledger sync mechanics. Real +# end-to-end coverage of ``handle_link_commit`` lives in +# ``test_link_commit_grounding.py`` and ``test_phase3_integration.py``. + + +def _ensure_ctx() -> SimpleNamespace: + """Lightweight ctx for the SHA-cache logic tests. + + No ledger ops happen inside ``ensure_ledger_synced`` itself — the only + ctx attribute it reads is ``repo_path`` (for ``_read_current_head_sha``). 
+ """ + return SimpleNamespace( + repo_path=str(Path(__file__).resolve().parents[1]), + _sync_state={"session_started": False}, + ) @pytest.mark.asyncio async def test_ensure_calls_link_commit_when_head_advanced(): - ctx = _make_ctx(last_sync_sha="old_sha") + ctx = _ensure_ctx() with ( patch("handlers.link_commit._read_current_head_sha", return_value="new_sha"), @@ -190,7 +355,7 @@ async def test_ensure_calls_link_commit_when_head_advanced(): @pytest.mark.asyncio async def test_ensure_skips_link_commit_when_already_synced(monkeypatch): monkeypatch.setattr("handlers.sync_middleware._LAST_SYNCED_SHA", "current_sha") - ctx = _make_ctx() + ctx = _ensure_ctx() with ( patch("handlers.link_commit._read_current_head_sha", return_value="current_sha"), @@ -202,7 +367,7 @@ async def test_ensure_skips_link_commit_when_already_synced(monkeypatch): @pytest.mark.asyncio async def test_ensure_swallows_link_commit_exception(): - ctx = _make_ctx() + ctx = _ensure_ctx() with patch("handlers.link_commit.handle_link_commit", new_callable=AsyncMock) as mock_lc: mock_lc.side_effect = RuntimeError("git not available") @@ -210,14 +375,18 @@ async def test_ensure_swallows_link_commit_exception(): await ensure_ledger_synced(ctx) -# ── stale proposal banner (v0.7.0) ────────────────────────────────── +# ── stale proposal banner (v0.7.0) — sociable ─────────────────────── @pytest.mark.asyncio async def test_banner_surfaces_stale_proposal(): """Proposals idle >14 days appear in the banner with stale_proposal_count.""" - ctx = _make_ctx(open_rows=[_proposal(days_old=15)]) + adapter, client = await _make_real_adapter() + await _seed_proposal(client, days_old=15) + ctx = _banner_ctx(adapter) + banner = await get_session_start_banner(ctx) + assert banner is not None assert banner.stale_proposal_count == 1 assert banner.proposal_count == 1 @@ -228,7 +397,10 @@ async def test_banner_surfaces_stale_proposal(): @pytest.mark.asyncio async def test_banner_silent_on_fresh_proposal(): """Proposals <14 days 
old are expected noise — banner must not fire.""" - ctx = _make_ctx(open_rows=[_proposal(days_old=3)]) + adapter, client = await _make_real_adapter() + await _seed_proposal(client, days_old=3) + ctx = _banner_ctx(adapter) + banner = await get_session_start_banner(ctx) assert banner is None
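
The spy pattern used in `test_banner_only_fires_once_per_session` and `test_banner_none_when_already_started` can be sketched in isolation. This is a minimal illustration, not the project's code: `InMemoryLedger`, `banner_once`, and `install_spy` are hypothetical stand-ins for the real adapter, the real `get_session_start_banner`, and the inline `_spy` closures. It shows a wrapper that counts calls while still delegating to the real method, and a `SimpleNamespace` ctx that exposes only the attributes the handler reads.

```python
import asyncio
from types import SimpleNamespace


class InMemoryLedger:
    """Hypothetical stand-in for a real adapter (not SurrealDBLedgerAdapter)."""

    def __init__(self, rows):
        self._rows = rows

    async def get_decisions_by_status(self, statuses):
        # Return only rows whose status is in the requested list.
        return [r for r in self._rows if r["status"] in statuses]


async def banner_once(ctx):
    """Toy handler: query on the first call per session, then return None."""
    state = ctx._sync_state
    if state is None or state.get("session_started"):
        return None
    state["session_started"] = True
    rows = await ctx.ledger.get_decisions_by_status(["drifted", "ungrounded"])
    return rows or None


def install_spy(obj, name):
    """Wrap obj.<name> so calls are recorded but still reach the real method."""
    original = getattr(obj, name)
    calls = []

    async def spy(*args, **kwargs):
        calls.append((args, kwargs))
        return await original(*args, **kwargs)

    setattr(obj, name, spy)
    return calls


async def demo():
    ledger = InMemoryLedger([{"status": "drifted", "description": "Auth uses JWT"}])
    # SimpleNamespace raises AttributeError on unknown attributes,
    # whereas MagicMock would silently invent them.
    ctx = SimpleNamespace(ledger=ledger, _sync_state={"session_started": False})
    calls = install_spy(ledger, "get_decisions_by_status")
    first = await banner_once(ctx)
    second = await banner_once(ctx)
    return first, second, len(calls)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the spy delegates to the original bound method, the first call still exercises the real query path; only the call count is observed from outside.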