Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-mcp-regression.yml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ jobs:
tests/test_query_safety_562.py
tests/test_preflight_intent.py
tests/test_preflight_hook.py
tests/test_post_preflight_capture_reminder.py
-v --tb=short
--junitxml=test-results/results.xml
--html=test-results/report.html --self-contained-html
Expand Down
83 changes: 83 additions & 0 deletions scripts/hooks/_prompt_intent_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""#170 — session-scoped implementation-intent state shared between hooks.

The PostToolUse capture reminder (``post_preflight_capture_reminder.py``)
should stay silent when the originating user prompt had no implementation
intent — a read-only prompt cannot be refining a surfaced decision, so the
disambiguation question is pure noise. The UserPromptSubmit hook
(``preflight_reminder.py``) persists the boolean returned by
``has_implementation_signal(prompt)`` here keyed by ``session_id``, and the
capture hook reads it.

Single source of truth for the handoff file. Pure filesystem; no ledger, no
network. Files are swept on a TTL so they garbage-collect across session
boundaries without a SessionEnd hook. Absence/unreadability reads as ``None``
(caller defaults to firing — missed capture is irreversible).
"""

from __future__ import annotations

import json
import re
import tempfile
import time
from pathlib import Path

_TTL_SECONDS = 86400 # 24h — longer than any plausible session; GC backstop
_DIR_NAME = "bicameral_prompt_intent"
_SANITIZE_RE = re.compile(r"[^A-Za-z0-9_-]")


def _state_dir() -> Path:
return Path(tempfile.gettempdir()) / _DIR_NAME


def state_path(session_id: str) -> Path:
"""Per-session state file path. ``session_id`` is sanitized to a safe
filename component (defends against path traversal via the harness-supplied
id)."""
safe = _SANITIZE_RE.sub("_", session_id)[:128] or "_"
return _state_dir() / f"{safe}.json"


def _sweep_stale(now: float) -> None:
directory = _state_dir()
if not directory.is_dir():
return
for stale in directory.glob("*.json"):
try:
if now - stale.stat().st_mtime > _TTL_SECONDS:
stale.unlink()
except OSError:
pass # best-effort GC; never raise from a hook path


def write_intent(session_id: str, fire: bool) -> None:
"""Persist ``fire`` for ``session_id`` and sweep stale files. Best-effort:
any filesystem error is swallowed (absence defaults to firing downstream)."""
if not session_id:
return
now = time.time()
try:
_state_dir().mkdir(parents=True, exist_ok=True)
_sweep_stale(now)
state_path(session_id).write_text(
json.dumps({"fire": bool(fire), "ts": now}), encoding="utf-8"
)
except OSError:
pass


def read_intent(session_id: str) -> bool | None:
"""Return the persisted ``fire`` boolean for ``session_id``, or ``None``
when absent, stale, unreadable, or malformed."""
if not session_id:
return None
try:
path = state_path(session_id)
if time.time() - path.stat().st_mtime > _TTL_SECONDS:
return None
data = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError, ValueError):
return None
fire = data.get("fire") if isinstance(data, dict) else None
return fire if isinstance(fire, bool) else None
Comment thread
coderabbitai[bot] marked this conversation as resolved.
10 changes: 10 additions & 0 deletions scripts/hooks/post_preflight_capture_reminder.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@

import json
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from hooks._prompt_intent_state import read_intent # noqa: E402

PREFLIGHT_TOOL_NAME = "mcp__bicameral__bicameral_preflight"

Expand Down Expand Up @@ -133,6 +137,12 @@ def main() -> int:
decisions = response.get("decisions") or []
if not isinstance(decisions, list) or not decisions:
return 0
# #170: the originating prompt had no implementation intent (read-only) —
# nothing to refine, so suppress the disambiguation reminder. `is False`
# (not falsy): only a definitive non-impl classification suppresses; a
# missing/unreadable state file (None) defaults to firing.
if read_intent(payload.get("session_id", "")) is False:
return 0
json.dump(
{
"hookSpecificOutput": {
Expand Down
29 changes: 24 additions & 5 deletions scripts/hooks/preflight_intent.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,35 @@ def classify_prompt(prompt: str) -> ClassifyResult:
if skip.search(prompt):
return ClassifyResult(False, surface_form, command)

if _VERB_REGEX.search(prompt):
return ClassifyResult(True, surface_form, command)

lowered = prompt.lower()
if any(phrase in lowered for phrase in INDIRECT_INTENT_PHRASES):
if has_implementation_signal(prompt):
return ClassifyResult(True, surface_form, command)

return ClassifyResult(False, surface_form, command)


def has_implementation_signal(prompt: str) -> bool:
"""True iff the prompt carries any code-implementation signal — an
implementation verb or an indirect-intent phrase — INDEPENDENT of the
SKIP_PATTERNS.

#170: the post-preflight capture gate suppresses its disambiguation
reminder only when this is False (a genuinely read-only prompt cannot be
refining a surfaced decision). It deliberately does NOT consult
SKIP_PATTERNS: a skip-listed-but-verb-bearing prompt like "add tests for X"
still carries implementation signal and must reach the user-disambiguation
per #175 — never lexically pre-judged "compatible."
"""
if not prompt or not prompt.strip():
return False
_surface_form, command, _remainder = _detect_surface_form(prompt)
if command is not None and command in IMPL_INTENT_SLASH_COMMANDS:
return True
if _VERB_REGEX.search(prompt):
return True
lowered = prompt.lower()
return any(phrase in lowered for phrase in INDIRECT_INTENT_PHRASES)


def should_fire_preflight(prompt: str) -> bool:
"""Return True iff prompt indicates code-implementation intent.

Expand Down
16 changes: 15 additions & 1 deletion scripts/hooks/preflight_reminder.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from hooks.preflight_intent import ClassifyResult, classify_prompt # noqa: E402
from hooks._prompt_intent_state import write_intent # noqa: E402
from hooks.preflight_intent import ( # noqa: E402
ClassifyResult,
classify_prompt,
has_implementation_signal,
)

REMINDER_TEXT = (
"<system-reminder>\n"
Expand Down Expand Up @@ -140,6 +145,15 @@ def main() -> int:
prompt = payload.get("prompt", "") if isinstance(payload, dict) else ""
result = classify_prompt(prompt)
_record_trigger_evaluated(result)
# #170: persist whether the prompt carries ANY implementation signal so the
# PostToolUse capture hook can stay silent on genuinely read-only prompts
# (no refinement possible). Uses has_implementation_signal — NOT
# should_fire_preflight — so skip-listed-but-verb-bearing prompts like
# "add tests for X" still reach the user-disambiguation per #175.
write_intent(
payload.get("session_id", "") if isinstance(payload, dict) else "",
has_implementation_signal(prompt),
)
if result.fire:
json.dump(
{
Expand Down
168 changes: 168 additions & 0 deletions tests/test_post_preflight_capture_reminder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""#170 — implementation-intent gate on the post-preflight capture reminder.

The PostToolUse capture hook must suppress its disambiguation reminder when the
originating user prompt had no implementation intent (read-only), and must keep
firing for implementation-intent prompts — including compatible ones like
"add tests for X" (the #175 invariant: never lexically pre-judge contradiction).
"""

from __future__ import annotations

import json
import os
import subprocess
import sys
import time
import uuid
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))

from scripts.hooks._prompt_intent_state import ( # noqa: E402
read_intent,
state_path,
write_intent,
)
from scripts.hooks.preflight_intent import ( # noqa: E402
has_implementation_signal,
should_fire_preflight,
)

POST_HOOK = REPO_ROOT / "scripts" / "hooks" / "post_preflight_capture_reminder.py"
PRE_HOOK = REPO_ROOT / "scripts" / "hooks" / "preflight_reminder.py"

_DECISION = {"decision_id": "decision:abc123", "description": "Drag-to-reorder commits"}


def _sid() -> str:
return f"test-170-{uuid.uuid4().hex}"


def _run(script: Path, payload: dict) -> tuple[int, str]:
proc = subprocess.run(
[sys.executable, str(script)],
input=json.dumps(payload),
capture_output=True,
text=True,
timeout=15,
)
return proc.returncode, proc.stdout


def _fired(stdout: str) -> bool:
"""True iff the hook emitted a capture-reminder envelope."""
if not stdout.strip():
return False
parsed = json.loads(stdout)
return "hookSpecificOutput" in parsed


def _post_payload(sid: str) -> dict:
return {
"tool_name": "mcp__bicameral__bicameral_preflight",
"tool_response": {"fired": True, "decisions": [_DECISION]},
"session_id": sid,
}


# ── State module unit tests ────────────────────────────────────────────


def test_state_round_trip():
sid = _sid()
write_intent(sid, True)
assert read_intent(sid) is True
write_intent(sid, False)
assert read_intent(sid) is False


def test_read_absent_is_none():
assert read_intent(_sid()) is None # never written
assert read_intent("") is None # empty session id


def test_ttl_sweep_removes_stale():
stale_sid, fresh_sid = _sid(), _sid()
write_intent(stale_sid, True)
# backdate the stale file well past the TTL
os.utime(state_path(stale_sid), (time.time() - 90_000, time.time() - 90_000))
write_intent(fresh_sid, True) # any write triggers the sweep
assert not state_path(stale_sid).exists(), "stale state file should be swept"
assert read_intent(fresh_sid) is True, "fresh state file must survive the sweep"


def test_read_stale_is_none():
sid = _sid()
write_intent(sid, False)
os.utime(state_path(sid), (time.time() - 90_000, time.time() - 90_000))
assert read_intent(sid) is None


# ── PostToolUse capture-hook behavioral tests (subprocess) ──────────────


def test_suppressed_when_read_only():
"""Read-only prompt (fire=False persisted) → reminder suppressed."""
sid = _sid()
write_intent(sid, False)
rc, out = _run(POST_HOOK, _post_payload(sid))
assert rc == 0
assert not _fired(out), "capture reminder must be suppressed for a non-impl prompt"


def test_fires_when_impl_intent():
"""Implementation-intent prompt (fire=True persisted) → reminder fires."""
sid = _sid()
write_intent(sid, True)
rc, out = _run(POST_HOOK, _post_payload(sid))
assert rc == 0
assert _fired(out), "capture reminder must fire for an impl-intent prompt"
assert "AskUserQuestion" in out # the #175 disambiguation path intact


def test_default_fires_when_state_absent():
"""No state file for the session → fire (safe default; missed capture is irreversible)."""
rc, out = _run(POST_HOOK, _post_payload(_sid()))
assert rc == 0
assert _fired(out), "absent state must default to firing"


def test_fires_for_compatible_add_tests():
"""#175 guard (end-to-end): 'add tests for X' carries implementation signal
(verb 'add'), so it is NOT lexically suppressed and reaches the
user-disambiguation — even though should_fire_preflight skip-lists it."""
prompt = "add tests for the existing drag-to-reorder behavior"
assert has_implementation_signal(prompt) is True
assert (
should_fire_preflight(prompt) is False
) # skip-listed: documents the deliberate divergence
sid = _sid()
rc, _ = _run(PRE_HOOK, {"prompt": prompt, "session_id": sid})
assert rc == 0
assert read_intent(sid) is True # write side persists has_implementation_signal
rc, out = _run(POST_HOOK, _post_payload(sid))
assert rc == 0 and _fired(out), "add-tests must reach user-disambiguation (#175)"


# ── Write-side: UserPromptSubmit hook persists the classification ───────


def test_write_side_persists_signal():
sid = _sid()
prompt = "refactor the rate limiter to a sliding window"
rc, _ = _run(PRE_HOOK, {"prompt": prompt, "session_id": sid})
assert rc == 0
expected = has_implementation_signal(prompt)
assert expected is True
assert read_intent(sid) is True


def test_write_side_persists_false_for_read_only():
sid = _sid()
prompt = "what does the rate limiter currently do?"
rc, _ = _run(PRE_HOOK, {"prompt": prompt, "session_id": sid})
assert rc == 0
expected = has_implementation_signal(prompt)
assert expected is False
assert read_intent(sid) is False
Loading