diff --git a/.github/workflows/test-mcp-regression.yml b/.github/workflows/test-mcp-regression.yml index 3db4fea9..1904b2ca 100644 --- a/.github/workflows/test-mcp-regression.yml +++ b/.github/workflows/test-mcp-regression.yml @@ -156,6 +156,7 @@ jobs: tests/test_query_safety_562.py tests/test_preflight_intent.py tests/test_preflight_hook.py + tests/test_post_preflight_capture_reminder.py -v --tb=short --junitxml=test-results/results.xml --html=test-results/report.html --self-contained-html diff --git a/scripts/hooks/_prompt_intent_state.py b/scripts/hooks/_prompt_intent_state.py new file mode 100644 index 00000000..d1acdd71 --- /dev/null +++ b/scripts/hooks/_prompt_intent_state.py @@ -0,0 +1,83 @@ +"""#170 — session-scoped implementation-intent state shared between hooks. + +The PostToolUse capture reminder (``post_preflight_capture_reminder.py``) +should stay silent when the originating user prompt had no implementation +intent — a read-only prompt cannot be refining a surfaced decision, so the +disambiguation question is pure noise. The UserPromptSubmit hook +(``preflight_reminder.py``) persists the boolean returned by +``has_implementation_signal(prompt)`` here keyed by ``session_id``, and the +capture hook reads it. + +Single source of truth for the handoff file. Pure filesystem; no ledger, no +network. Files are swept on a TTL so they garbage-collect across session +boundaries without a SessionEnd hook. Absence/unreadability reads as ``None`` +(caller defaults to firing — missed capture is irreversible). +""" + +from __future__ import annotations + +import json +import re +import tempfile +import time +from pathlib import Path + +_TTL_SECONDS = 86400 # 24h — longer than any plausible session; GC backstop +_DIR_NAME = "bicameral_prompt_intent" +_SANITIZE_RE = re.compile(r"[^A-Za-z0-9_-]") + + +def _state_dir() -> Path: + return Path(tempfile.gettempdir()) / _DIR_NAME + + +def state_path(session_id: str) -> Path: + """Per-session state file path. ``session_id`` is sanitized to a safe + filename component (defends against path traversal via the harness-supplied + id).""" + safe = _SANITIZE_RE.sub("_", session_id)[:128] or "_" + return _state_dir() / f"{safe}.json" + + +def _sweep_stale(now: float) -> None: + directory = _state_dir() + if not directory.is_dir(): + return + for stale in directory.glob("*.json"): + try: + if now - stale.stat().st_mtime > _TTL_SECONDS: + stale.unlink() + except OSError: + pass # best-effort GC; never raise from a hook path + + +def write_intent(session_id: str, fire: bool) -> None: + """Persist ``fire`` for ``session_id`` and sweep stale files. Best-effort: + any filesystem error is swallowed (absence defaults to firing downstream).""" + if not session_id: + return + now = time.time() + try: + _state_dir().mkdir(parents=True, exist_ok=True) + _sweep_stale(now) + state_path(session_id).write_text( + json.dumps({"fire": bool(fire), "ts": now}), encoding="utf-8" + ) + except OSError: + pass + + +def read_intent(session_id: str) -> bool | None: + """Return the persisted ``fire`` boolean for ``session_id``, or ``None`` + when absent, stale, unreadable, or malformed.""" + if not session_id: + return None + try: + path = state_path(session_id) + if time.time() - path.stat().st_mtime > _TTL_SECONDS: + return None + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError, ValueError): + return None + fire = data.get("fire") if isinstance(data, dict) else None + return fire if isinstance(fire, bool) else None diff --git a/scripts/hooks/post_preflight_capture_reminder.py b/scripts/hooks/post_preflight_capture_reminder.py index c2669274..c9f60319 100644 --- a/scripts/hooks/post_preflight_capture_reminder.py +++ b/scripts/hooks/post_preflight_capture_reminder.py @@ -45,6 +45,10 @@ import json import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +from hooks._prompt_intent_state import read_intent # noqa: E402 PREFLIGHT_TOOL_NAME = "mcp__bicameral__bicameral_preflight" @@ -133,6 +137,12 @@ def main() -> int: decisions = response.get("decisions") or [] if not isinstance(decisions, list) or not decisions: return 0 + # #170: the originating prompt had no implementation intent (read-only) — + # nothing to refine, so suppress the disambiguation reminder. `is False` + # (not falsy): only a definitive non-impl classification suppresses; a + # missing/unreadable state file (None) defaults to firing. + if read_intent(payload.get("session_id", "")) is False: + return 0 json.dump( { "hookSpecificOutput": { diff --git a/scripts/hooks/preflight_intent.py b/scripts/hooks/preflight_intent.py index c9f888be..917ff0f1 100644 --- a/scripts/hooks/preflight_intent.py +++ b/scripts/hooks/preflight_intent.py @@ -177,16 +177,35 @@ def classify_prompt(prompt: str) -> ClassifyResult: if skip.search(prompt): return ClassifyResult(False, surface_form, command) - if _VERB_REGEX.search(prompt): - return ClassifyResult(True, surface_form, command) - - lowered = prompt.lower() - if any(phrase in lowered for phrase in INDIRECT_INTENT_PHRASES): + if has_implementation_signal(prompt): return ClassifyResult(True, surface_form, command) return ClassifyResult(False, surface_form, command) +def has_implementation_signal(prompt: str) -> bool: + """True iff the prompt carries any code-implementation signal — an + implementation verb or an indirect-intent phrase — INDEPENDENT of the + SKIP_PATTERNS. + + #170: the post-preflight capture gate suppresses its disambiguation + reminder only when this is False (a genuinely read-only prompt cannot be + refining a surfaced decision). It deliberately does NOT consult + SKIP_PATTERNS: a skip-listed-but-verb-bearing prompt like "add tests for X" + still carries implementation signal and must reach the user-disambiguation + per #175 — never lexically pre-judged "compatible." + """ + if not prompt or not prompt.strip(): + return False + _surface_form, command, _remainder = _detect_surface_form(prompt) + if command is not None and command in IMPL_INTENT_SLASH_COMMANDS: + return True + if _VERB_REGEX.search(prompt): + return True + lowered = prompt.lower() + return any(phrase in lowered for phrase in INDIRECT_INTENT_PHRASES) + + def should_fire_preflight(prompt: str) -> bool: """Return True iff prompt indicates code-implementation intent. diff --git a/scripts/hooks/preflight_reminder.py b/scripts/hooks/preflight_reminder.py index 15694950..089f521a 100644 --- a/scripts/hooks/preflight_reminder.py +++ b/scripts/hooks/preflight_reminder.py @@ -48,7 +48,12 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) -from hooks.preflight_intent import ClassifyResult, classify_prompt # noqa: E402 +from hooks._prompt_intent_state import write_intent # noqa: E402 +from hooks.preflight_intent import ( # noqa: E402 + ClassifyResult, + classify_prompt, + has_implementation_signal, +) REMINDER_TEXT = ( "\n" @@ -140,6 +145,15 @@ def main() -> int: prompt = payload.get("prompt", "") if isinstance(payload, dict) else "" result = classify_prompt(prompt) _record_trigger_evaluated(result) + # #170: persist whether the prompt carries ANY implementation signal so the + # PostToolUse capture hook can stay silent on genuinely read-only prompts + # (no refinement possible). Uses has_implementation_signal — NOT + # should_fire_preflight — so skip-listed-but-verb-bearing prompts like + # "add tests for X" still reach the user-disambiguation per #175. + write_intent( + payload.get("session_id", "") if isinstance(payload, dict) else "", + has_implementation_signal(prompt), + ) if result.fire: json.dump( { diff --git a/tests/test_post_preflight_capture_reminder.py b/tests/test_post_preflight_capture_reminder.py new file mode 100644 index 00000000..18d518a4 --- /dev/null +++ b/tests/test_post_preflight_capture_reminder.py @@ -0,0 +1,168 @@ +"""#170 — implementation-intent gate on the post-preflight capture reminder. + +The PostToolUse capture hook must suppress its disambiguation reminder when the +originating user prompt had no implementation intent (read-only), and must keep +firing for implementation-intent prompts — including compatible ones like +"add tests for X" (the #175 invariant: never lexically pre-judge contradiction). +""" + +from __future__ import annotations + +import json +import os +import subprocess +import sys +import time +import uuid +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(REPO_ROOT)) + +from scripts.hooks._prompt_intent_state import ( # noqa: E402 + read_intent, + state_path, + write_intent, +) +from scripts.hooks.preflight_intent import ( # noqa: E402 + has_implementation_signal, + should_fire_preflight, +) + +POST_HOOK = REPO_ROOT / "scripts" / "hooks" / "post_preflight_capture_reminder.py" +PRE_HOOK = REPO_ROOT / "scripts" / "hooks" / "preflight_reminder.py" + +_DECISION = {"decision_id": "decision:abc123", "description": "Drag-to-reorder commits"} + + +def _sid() -> str: + return f"test-170-{uuid.uuid4().hex}" + + +def _run(script: Path, payload: dict) -> tuple[int, str]: + proc = subprocess.run( + [sys.executable, str(script)], + input=json.dumps(payload), + capture_output=True, + text=True, + timeout=15, + ) + return proc.returncode, proc.stdout + + +def _fired(stdout: str) -> bool: + """True iff the hook emitted a capture-reminder envelope.""" + if not stdout.strip(): + return False + parsed = json.loads(stdout) + return "hookSpecificOutput" in parsed + + +def _post_payload(sid: str) -> dict: + return { + "tool_name": "mcp__bicameral__bicameral_preflight", + "tool_response": {"fired": True, "decisions": [_DECISION]}, + "session_id": sid, + } + + +# ── State module unit tests ──────────────────────────────────────────── + + +def test_state_round_trip(): + sid = _sid() + write_intent(sid, True) + assert read_intent(sid) is True + write_intent(sid, False) + assert read_intent(sid) is False + + +def test_read_absent_is_none(): + assert read_intent(_sid()) is None # never written + assert read_intent("") is None # empty session id + + +def test_ttl_sweep_removes_stale(): + stale_sid, fresh_sid = _sid(), _sid() + write_intent(stale_sid, True) + # backdate the stale file well past the TTL + os.utime(state_path(stale_sid), (time.time() - 90_000, time.time() - 90_000)) + write_intent(fresh_sid, True) # any write triggers the sweep + assert not state_path(stale_sid).exists(), "stale state file should be swept" + assert read_intent(fresh_sid) is True, "fresh state file must survive the sweep" + + +def test_read_stale_is_none(): + sid = _sid() + write_intent(sid, False) + os.utime(state_path(sid), (time.time() - 90_000, time.time() - 90_000)) + assert read_intent(sid) is None + + +# ── PostToolUse capture-hook behavioral tests (subprocess) ────────────── + + +def test_suppressed_when_read_only(): + """Read-only prompt (fire=False persisted) → reminder suppressed.""" + sid = _sid() + write_intent(sid, False) + rc, out = _run(POST_HOOK, _post_payload(sid)) + assert rc == 0 + assert not _fired(out), "capture reminder must be suppressed for a non-impl prompt" + + +def test_fires_when_impl_intent(): + """Implementation-intent prompt (fire=True persisted) → reminder fires.""" + sid = _sid() + write_intent(sid, True) + rc, out = _run(POST_HOOK, _post_payload(sid)) + assert rc == 0 + assert _fired(out), "capture reminder must fire for an impl-intent prompt" + assert "AskUserQuestion" in out # the #175 disambiguation path intact + + +def test_default_fires_when_state_absent(): + """No state file for the session → fire (safe default; missed capture is irreversible).""" + rc, out = _run(POST_HOOK, _post_payload(_sid())) + assert rc == 0 + assert _fired(out), "absent state must default to firing" + + +def test_fires_for_compatible_add_tests(): + """#175 guard (end-to-end): 'add tests for X' carries implementation signal + (verb 'add'), so it is NOT lexically suppressed and reaches the + user-disambiguation — even though should_fire_preflight skip-lists it.""" + prompt = "add tests for the existing drag-to-reorder behavior" + assert has_implementation_signal(prompt) is True + assert ( + should_fire_preflight(prompt) is False + ) # skip-listed: documents the deliberate divergence + sid = _sid() + rc, _ = _run(PRE_HOOK, {"prompt": prompt, "session_id": sid}) + assert rc == 0 + assert read_intent(sid) is True # write side persists has_implementation_signal + rc, out = _run(POST_HOOK, _post_payload(sid)) + assert rc == 0 and _fired(out), "add-tests must reach user-disambiguation (#175)" + + +# ── Write-side: UserPromptSubmit hook persists the classification ─────── + + +def test_write_side_persists_signal(): + sid = _sid() + prompt = "refactor the rate limiter to a sliding window" + rc, _ = _run(PRE_HOOK, {"prompt": prompt, "session_id": sid}) + assert rc == 0 + expected = has_implementation_signal(prompt) + assert expected is True + assert read_intent(sid) is True + + +def test_write_side_persists_false_for_read_only(): + sid = _sid() + prompt = "what does the rate limiter currently do?" + rc, _ = _run(PRE_HOOK, {"prompt": prompt, "session_id": sid}) + assert rc == 0 + expected = has_implementation_signal(prompt) + assert expected is False + assert read_intent(sid) is False