-
Notifications
You must be signed in to change notification settings - Fork 1
fix: ingest pipeline — input contracts, payload normalization, freshness guards #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -119,7 +119,76 @@ class DetectDriftResponse(BaseModel): | |
| # (LinkCommitResponse defined above alongside SearchDecisionsResponse) | ||
|
|
||
|
|
||
| # ── Tool 5: /ingest ─────────────────────────────────────────────────── | ||
| # ── Tool 5: /ingest — INPUT contracts ──────────────────────────────── | ||
|
|
||
|
|
||
class IngestSpan(BaseModel):
    """Source excerpt from a meeting, document, or manual input.

    Every field carries a default, so an empty span is a valid span.
    """

    # The excerpt itself.
    text: str = ""
    # Where the excerpt came from: transcript | notion | document | manual.
    source_type: str = "manual"
    # Identifier inside that source: meeting ID, Notion page ID, etc.
    source_ref: str = ""
    speakers: list[str] = []
    meeting_date: str = ""
|
|
||
|
|
||
class IngestCodeRegion(BaseModel):
    """Pre-resolved code region for a mapping.

    ``symbol`` and ``file_path`` are required; every other field falls back
    to its default when omitted.
    """

    # Required fields.
    symbol: str
    file_path: str

    # Optional fields.
    start_line: int = 0
    end_line: int = 0
    type: str = "function"
    purpose: str = ""
|
|
||
|
|
||
class IngestMapping(BaseModel):
    """One decision-to-code mapping in the internal pipeline format.

    Only ``intent`` is required; the span defaults to an empty ``IngestSpan``
    and both lists default to empty.
    """

    intent: str
    span: IngestSpan = IngestSpan()
    symbols: list[str] = []
    code_regions: list[IngestCodeRegion] = []
|
|
||
|
|
||
class IngestDecision(BaseModel):
    """One decision in the natural LLM-generated format.

    All fields are optional and default to empty values.
    """

    id: str = ""
    title: str = ""
    description: str = ""
    status: str = ""
    participants: list[str] = []
|
|
||
|
|
||
class IngestActionItem(BaseModel):
    """One action item in the natural LLM-generated format."""

    # Defaults to the literal "unassigned" when no owner is given.
    owner: str = "unassigned"
    action: str = ""
    due: str = ""
|
|
||
|
|
||
class IngestPayload(BaseModel):
    """Ingest input: EITHER mappings (internal) or decisions (natural LLM).

    When ``mappings`` is present it is used directly (internal pipeline
    format). When ``decisions`` is present instead, they are normalized into
    mappings automatically.

    Every field has a default, so the model itself does not reject an empty
    payload.
    """

    # --- Fields common to both formats ---
    repo: str = ""
    commit_hash: str = ""
    query: str = ""

    # --- Internal pipeline format ---
    mappings: list[IngestMapping] = []

    # --- Natural LLM-generated format (normalized into mappings if present) ---
    source: str = "manual"
    title: str = ""
    date: str = ""
    participants: list[str] = []
    decisions: list[IngestDecision] = []
    action_items: list[IngestActionItem] = []
    open_questions: list[str] = []
|
Comment on lines
+167
to
+188
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: # Locate contracts.py and handlers/ingest.py
git ls-files | grep -E "(contracts\.py|ingest\.py)"Repository: BicameralAI/bicameral-mcp Length of output: 100 🏁 Script executed: # Check contracts.py to see if IngestPayload exists at lines 167-188
head -200 contracts.py | tail -50Repository: BicameralAI/bicameral-mcp Length of output: 1421 🏁 Script executed: # Get context around the IngestPayload class
rg "class IngestPayload" -A 25 contracts.pyRepository: BicameralAI/bicameral-mcp Length of output: 881 🏁 Script executed: # Check handlers/ingest.py for the model_validate and normalization logic
cat -n handlers/ingest.py | head -110Repository: BicameralAI/bicameral-mcp Length of output: 4438 🏁 Script executed: # Check imports in contracts.py to see current Pydantic usage
head -30 contracts.pyRepository: BicameralAI/bicameral-mcp Length of output: 911 🏁 Script executed: # Search for any existing validators on IngestPayload
rg "@.*validator|from pydantic" contracts.py -B 2 -A 10Repository: BicameralAI/bicameral-mcp Length of output: 396 🏁 Script executed: # Check pyproject.toml or requirements for Pydantic version
fd -e "pyproject.toml" -o -e "requirements.txt" -o -e "requirements*.txt" | head -5 | xargs cat 2>/dev/null | grep -i pydanticRepository: BicameralAI/bicameral-mcp Length of output: 185 🏁 Script executed: # Check Pydantic version in config files
find . -maxdepth 2 -type f \( -name "pyproject.toml" -o -name "requirements*.txt" -o -name "setup.py" \) -exec cat {} \; | grep -i pydanticRepository: BicameralAI/bicameral-mcp Length of output: 105 🏁 Script executed: # Verify that contracts.py currently has NO validators on IngestPayload
ast-grep --pattern $'class IngestPayload {
$$$
}'Repository: BicameralAI/bicameral-mcp Length of output: 51 🏁 Script executed: # Double-check the exact behavior: empty payload validation
python3 << 'EOF'
# Simulate what happens with empty dict and IngestPayload
# to confirm defaults allow empty input
from typing import List
from pydantic import BaseModel
class MockIngestDecision(BaseModel):
id: str = ""
title: str = ""
class MockIngestActionItem(BaseModel):
owner: str = "unassigned"
action: str = ""
class MockIngestPayload(BaseModel):
repo: str = ""
commit_hash: str = ""
query: str = ""
mappings: List[dict] = []
source: str = "manual"
title: str = ""
date: str = ""
participants: List[str] = []
decisions: List[MockIngestDecision] = []
action_items: List[MockIngestActionItem] = []
open_questions: List[str] = []
# Test 1: empty dict
try:
p1 = MockIngestPayload.model_validate({})
print("✓ Empty dict validates: OK")
print(f" mappings={p1.mappings}, decisions={p1.decisions}, action_items={p1.action_items}, open_questions={p1.open_questions}")
except Exception as e:
print(f"✗ Empty dict rejected: {e}")
# Test 2: metadata only
try:
p2 = MockIngestPayload.model_validate({"repo": "test", "commit_hash": "abc"})
print("✓ Metadata-only payload validates: OK")
except Exception as e:
print(f"✗ Metadata-only rejected: {e}")
# Test 3: mixed (mappings + decisions)
try:
p3 = MockIngestPayload.model_validate({"mappings": [{"x": 1}], "decisions": [{"id": "d1"}]})
print("✓ Mixed payload (mappings + decisions) validates: OK")
except Exception as e:
print(f"✗ Mixed payload rejected: {e}")
EOFRepository: BicameralAI/bicameral-mcp Length of output: 250 🏁 Script executed: # Search for any other validation logic that might catch these cases
rg "_normalize_payload|IngestPayload" handlers/ingest.py -B 3 -A 15 | head -80Repository: BicameralAI/bicameral-mcp Length of output: 2560 Enforce the ingest shape instead of defaulting to empty payloads.
Proposed fix
-from pydantic import BaseModel
+from pydantic import BaseModel, model_validator
...
class IngestPayload(BaseModel):
"""Ingest input — accepts EITHER mappings (internal) or decisions (natural LLM).
@@
decisions: list[IngestDecision] = []
action_items: list[IngestActionItem] = []
open_questions: list[str] = []
+
+ @model_validator(mode="after")
+ def validate_shape(self) -> "IngestPayload":
+ has_mappings = bool(self.mappings)
+ has_natural = bool(self.decisions or self.action_items or self.open_questions)
+ if has_mappings == has_natural:
+ raise ValueError(
+ "Provide exactly one ingest format: non-empty `mappings` or at least one "
+ "of `decisions`, `action_items`, `open_questions`."
+ )
+ return self
🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| # ── Tool 5: /ingest — RESPONSE contracts ───────────────────────────── | ||
|
|
||
|
|
||
| class IngestStats(BaseModel): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,25 +1,35 @@ | ||
| """Handler for /decision_status MCP tool. | ||
|
|
||
| Surfaces implementation status of all tracked decisions. | ||
| Read-only — does NOT auto-trigger link_commit. | ||
| Auto-syncs the ledger to HEAD before returning status. | ||
|
|
||
| Phase 0: backed by MockLedgerAdapter fixture data | ||
| Phase 2: backed by SurrealDBLedgerAdapter with real graph traversal | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from datetime import datetime, timezone | ||
|
|
||
| from adapters.ledger import get_ledger | ||
| from contracts import CodeRegionSummary, DecisionStatusEntry, DecisionStatusResponse | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| async def handle_decision_status( | ||
| filter: str = "all", | ||
| since: str | None = None, | ||
| ref: str = "HEAD", | ||
| ) -> DecisionStatusResponse: | ||
| # Auto-sync to HEAD so status reflects current code state | ||
| try: | ||
| from handlers.link_commit import handle_link_commit | ||
| await handle_link_commit(ref) | ||
| except Exception as exc: | ||
| logger.warning("[status] auto-sync failed: %s", exc) | ||
|
Comment on lines
+26
to
+31
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Don't return status after a failed auto-sync.
🧰 Tools
🪛 Ruff (0.15.9)
[warning] 30-30: Do not catch blind exception: `Exception` (BLE001)
🤖 Prompt for AI Agents |
||
|
|
||
| ledger = get_ledger() | ||
| decisions_raw = await ledger.get_all_decisions(filter=filter) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,7 @@ | |
| import re | ||
|
|
||
| from adapters.ledger import get_ledger | ||
| from contracts import IngestResponse, IngestStats, SourceCursorSummary | ||
| from contracts import IngestPayload, IngestResponse, IngestStats, SourceCursorSummary | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -34,6 +34,73 @@ | |
| }) | ||
|
|
||
|
|
||
def _normalize_payload(payload: dict) -> dict:
    """Validate an ingest payload and normalize it to the internal mappings format.

    1. Validates the raw dict against ``IngestPayload`` (fails fast on bad types).
    2. If ``mappings`` is non-empty, returns the validated payload as-is
       (internal format); natural-format fields are not converted in that case.
    3. Otherwise converts ``decisions`` / ``action_items`` / ``open_questions``
       into mappings. Entries without any non-blank text are skipped, so a
       blank action item or question never becomes an intent made only of its
       ``[Action: ...]`` / ``[Open Question]`` prefix.

    Args:
        payload: Raw ingest request body.

    Returns:
        The validated payload as a plain dict (``model_dump()``), with
        ``mappings`` filled in from the natural-format fields when the caller
        supplied none.

    Raises:
        pydantic.ValidationError: If ``payload`` does not match ``IngestPayload``.
    """
    validated = IngestPayload.model_validate(payload)
    result = validated.model_dump()

    # Caller already supplied mappings: return the validated dump untouched.
    if validated.mappings:
        return result

    def _mapping(text, source_ref="", speakers=None):
        # Payload-level metadata is the fallback for per-item values. The
        # speakers list is copied so mappings never share one list object.
        return {
            "intent": text,
            "span": {
                "source_type": validated.source,
                "source_ref": source_ref or validated.title,
                "speakers": list(speakers or validated.participants),
                "meeting_date": validated.date,
                "text": text,
            },
            "symbols": [],
            "code_regions": [],
        }

    mappings: list[dict] = []

    for d in validated.decisions:
        # Prefer the description, but fall through to the title when the
        # description is empty or whitespace-only.
        text = next((t for t in (d.description, d.title) if t.strip()), "")
        if text:
            mappings.append(_mapping(text, source_ref=d.id, speakers=d.participants))

    mappings.extend(
        _mapping(f"[Action: {a.owner}] {a.action}")
        for a in validated.action_items
        if a.action.strip()
    )

    mappings.extend(
        _mapping(f"[Open Question] {q}")
        for q in validated.open_questions
        if q.strip()
    )

    if not mappings:
        logger.warning(
            "[ingest] payload validated but produced 0 mappings: %s",
            list(payload.keys()),
        )
        return result

    result["mappings"] = mappings
    return result
|
|
||
|
|
||
| def _regions_from_symbol_ids(symbol_ids: list[int], db, description: str) -> list[dict]: | ||
| """Resolve a list of symbol IDs to code_region dicts.""" | ||
| regions = [] | ||
|
|
@@ -79,8 +146,9 @@ def _auto_ground_via_search(mappings: list[dict], repo: str) -> tuple[list[dict] | |
| db_path = str(os.path.join(repo, ".bicameral", "code-graph.db")) | ||
|
|
||
| try: | ||
| from adapters.code_locator import get_code_locator | ||
| from adapters.code_locator import get_code_locator, ensure_code_graph_fresh | ||
| from code_locator.indexing.sqlite_store import SymbolDB | ||
| ensure_code_graph_fresh(repo) | ||
| locator = get_code_locator() | ||
|
Comment on lines
+149
to
152
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Use the same repo for refresh and search.
This refreshes the code graph for `repo`, but `get_code_locator()` is then called without it, so the locator may search a different repository than the one that was just refreshed.
🔧 Proposed fix
- from adapters.code_locator import get_code_locator, ensure_code_graph_fresh
+ from adapters.code_locator import get_code_locator, ensure_code_graph_fresh
from code_locator.indexing.sqlite_store import SymbolDB
ensure_code_graph_fresh(repo)
- locator = get_code_locator()
+ locator = get_code_locator(repo_path=repo)
Companion change outside this hunk:
# adapters/code_locator.py
def get_code_locator(repo_path: str | None = None):
    repo = repo_path or os.getenv("REPO_PATH", ".")
    return RealCodeLocatorAdapter(repo_path=repo)
🤖 Prompt for AI Agents |
||
| db = SymbolDB(db_path) | ||
| except Exception as exc: | ||
|
|
@@ -196,7 +264,9 @@ def _resolve_symbols_to_regions(payload: dict, repo: str) -> dict: | |
| db_path = str(_os.path.join(repo, ".bicameral", "code-graph.db")) | ||
|
|
||
| try: | ||
| from adapters.code_locator import ensure_code_graph_fresh | ||
| from code_locator.indexing.sqlite_store import SymbolDB | ||
| ensure_code_graph_fresh(repo) | ||
| db = SymbolDB(db_path) | ||
| except Exception as exc: | ||
| logger.warning("[ingest] cannot open symbol DB at %s: %s", db_path, exc) | ||
|
|
@@ -249,12 +319,20 @@ async def handle_ingest( | |
| if hasattr(ledger, "connect"): | ||
| await ledger.connect() | ||
|
|
||
| payload = _normalize_payload(payload) | ||
| repo = str(payload.get("repo") or os.getenv("REPO_PATH", ".")) | ||
| payload = _resolve_symbols_to_regions(payload, repo) | ||
| mappings, grounding_deferred = _auto_ground_via_search(payload.get("mappings") or [], repo) | ||
| payload = {**payload, "mappings": mappings} | ||
| result = await ledger.ingest_payload(payload) | ||
|
|
||
| # Sync ledger to HEAD and re-ground any previously ungrounded intents | ||
| try: | ||
| from handlers.link_commit import handle_link_commit | ||
| await handle_link_commit("HEAD") | ||
| except Exception as exc: | ||
| logger.warning("[ingest] post-ingest link_commit failed: %s", exc) | ||
|
|
||
| cursor_summary = None | ||
| source_type = str(((payload.get("mappings") or [{}])[0].get("span") or {}).get("source_type", "manual")) | ||
| last_source_ref = _derive_last_source_ref(payload) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid running freshness rebuilds inline on the request path.
`ensure_index_matches_repo()` can cold-start or fully rebuild the sqlite/BM25 artifacts. Exposing that work through a synchronous helper is risky now that ingest/status flows can call it during normal tool execution: one stale repo can block the whole MCP server until indexing finishes. Please push this behind an async/off-thread wrapper, or require callers to invoke it via `asyncio.to_thread(...)`.
🤖 Prompt for AI Agents