Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ All notable changes to bicameral-mcp are tracked here. Format loosely follows

### Added

- **Preflight Step 3.5 queue-drain integration (#156 PR B).** Closes the cross-flow correction-capture path opened by PR A. Capture-corrections in-session mode (invoked by preflight Step 3.5) now drains `<repo>/.bicameral/pending-transcripts/` before scanning recent in-session turns. Drained ask-corrections share the existing preflight ≤4-question cap; remaining pending files stay queued for the next preflight. New telemetry fields `g11_queue_drained` / `g11_queue_remaining` / `g11_queue_cap_hit` quantify how often the cap binds. New e2e flow `Flow 4b` (`tests/e2e/prompts/flow-4b-queue-drain.md`) validates the full SessionEnd-write → next-preflight-drain → ingest pipeline end-to-end. New `--flow PATTERN` argparse filter on `tests/e2e/run_e2e_flows.py:main()` plus `_filter_flow_plan` helper (3 unit tests in `tests/test_run_e2e_flows_filter.py`) enables targeted CI invocations like `--flow "Flow 4"` (runs both Flow 4 and Flow 4b together — the canonical cross-flow validation command).
- **CI grounding lint for plan files and PR bodies (#114).** Two new
checkers ship together:
- `scripts/lint_plan_grounding.py` — walks `plan-*.md` files for
Expand Down
151 changes: 151 additions & 0 deletions plan-156b-preflight-queue-drain.md

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion skills/bicameral-capture-corrections/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ bicameral.skill_end(skill_name="bicameral-capture-corrections", session_id=<stor
g11_corrections_classified_not: N,
g11_corrections_dedup_removed: N,
g11_user_overrode: N, # ask corrections user declined — labeled precision signal
g11_queue_drained: N, # pending files fully processed and archived (#156 PR B)
g11_queue_remaining: N, # pending files left after drain (>0 when cap was hit OR partial processing left files for next preflight)
g11_queue_cap_hit: <bool>, # true if accumulated ask-corrections reached 4 mid-drain
})
```

Expand Down Expand Up @@ -153,6 +156,17 @@ re-examine the same turns repeatedly).

### Steps

**0. Drain the pending-transcripts queue (#156 PR B).**
Before scanning recent in-session turns, drain the pending-transcripts queue per the canonical "Step 0 — drain the pending-transcripts queue (#156)" rubric above. In in-session mode the drain is bounded:

- Process pending files in mtime-order (oldest first), applying Steps A/B/C to each file's user turns.
- Track accumulated ask-corrections across all processed files.
- When accumulated ask-corrections reach 4 (the preflight ≤4-question cap), stop processing further pending files and surface a final note: "N more pending transcript(s) — invoke `/bicameral-capture-corrections` directly to drain manually." Remaining files stay in `.bicameral/pending-transcripts/` for the next preflight.
- Archive each fully-processed file via `python3 scripts/hooks/transcript_archive.py <basename>.jsonl`. Do NOT archive partially-processed files (the cap was hit mid-scan); the file stays pending and the next preflight resumes from its first un-surfaced correction.
- If `<repo>/.bicameral/pending-transcripts/` doesn't exist or is empty, skip Step 0 silently — same shape as the canonical rubric's empty path.

The 4-cap is shared with the in-session turn-scan that runs in step 1 below: queue-drained ask-corrections + in-session ask-corrections ≤ 4 total. If the queue alone fills the cap, the in-session turn scan still runs (its mechanical corrections still auto-ingest silently) but its ask-corrections are dropped (not surfaced) to preserve the cap.

**1. Run the canonical rubric** (Steps A → B → C above) on the last ~10
user messages.

Expand All @@ -166,7 +180,7 @@ Preflight merges them into its stop-and-ask queue (one question max,
priority slot 3: after drift, before open questions).

**4. Silent empty path.**
If no corrections found, return nothing. Preflight continues without any
If no corrections found (across both the queue drain and the in-session scan), return nothing. Preflight continues without any
capture-corrections output.

---
Expand Down
2 changes: 2 additions & 0 deletions skills/bicameral-preflight/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ into the stop-and-ask queue below.
- Ask corrections → add as `uningested_corrections` category (priority
slot 3: after drift, before open questions). One question max.

**Queue drain (#156 PR B):** in-session mode also drains the pending-transcripts queue at `<repo>/.bicameral/pending-transcripts/` — transcripts from prior sessions whose corrections never surfaced (because that session ended without a follow-up preflight). Drained ask-corrections share the same ≤4-cap as in-session corrections; remaining pending files stay queued for the next preflight to pick up. The canonical drain rubric lives in `skills/bicameral-capture-corrections/SKILL.md` (Step 0 of the scan-and-classify rubric); preflight delegates to it via the in-session mode invocation.

### 4. Classify findings before surfacing

Before rendering anything, classify each finding as **mechanical** or
Expand Down
5 changes: 5 additions & 0 deletions tests/e2e/prompts/flow-4b-queue-drain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Flow 4b: Queue drain via preflight

You're continuing work in the same project as the prior session. Make a small change to a tracked file in this repo: pick any function in `events/writer.py` and add a one-line docstring to it (no behavior change). Use the standard write-op flow.

(This prompt deliberately does not mention bicameral, queues, or corrections. The queue drain happens automatically through the preflight hook on the user-prompt classification.)
119 changes: 106 additions & 13 deletions tests/e2e/run_e2e_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,63 @@ def assert_flow_5(calls: list[dict]) -> tuple[bool, str]:
)


def assert_flow_4b(calls: list[dict]) -> tuple[bool, str]:
"""Flow 4b: cross-flow ledger assertion via queue drain (#156 PR B).

Validates the SessionEnd-write -> next-preflight-drain -> ingest pipeline
end-to-end. The prior flow (Flow 4) plants a correction in its single
user prompt; the SessionEnd hook (post #156 PR A) writes the transcript
to ``.bicameral/pending-transcripts/<session_id>.jsonl``; Flow 4b's
auto-fired preflight invokes capture-corrections in-session mode, which
(post #156 PR B) drains the queue, surfaces the correction, and ingests
it once the user-confirmation flow accepts.

Three assertions:
1. ``<repo>/.bicameral/pending-transcripts/`` is empty after Flow 4b
(drain completed).
2. ``<repo>/.bicameral/processed-transcripts/`` contains the archived
transcript file (archive_processed via transcript_archive.py CLI).
3. Ledger contains >=1 ``source_type=agent_session`` decision.
"""
repo = pathlib.Path(DESKTOP_REPO_PATH)
pending = repo / ".bicameral" / "pending-transcripts"
processed = repo / ".bicameral" / "processed-transcripts"

if pending.is_dir() and any(pending.glob("*.jsonl")):
leftover = sorted(p.name for p in pending.glob("*.jsonl"))
return False, f"pending-transcripts not drained; leftover: {leftover}"

if not processed.is_dir() or not list(processed.glob("*.jsonl")):
return False, "processed-transcripts is empty; archive_processed never ran"

snapshot = _snapshot_ledger()
count = _count_agent_session_decisions(snapshot)
if count is None:
return False, f"ledger query failed: {snapshot.get('error')}"
if count == 0:
return False, "no source=agent_session decisions in ledger; ingest never landed"

archived = sorted(p.name for p in processed.glob("*.jsonl"))
return True, (
f"queue drain pipeline validated: pending=empty, processed={archived}, "
f"ledger has {count} agent_session decision(s)"
)


def _filter_flow_plan(plan: list[FlowSpec], pattern: str | None) -> list[FlowSpec]:
"""Filter FLOW_PLAN by substring match on ``flow_id``. Pure function.

When ``pattern`` is None, returns ``plan`` unchanged (CI default — run
every flow). When ``pattern`` is given, returns the FlowSpecs whose
``flow_id`` contains ``pattern`` as a case-sensitive substring; order
preserved from the source list. Empty result is the caller's signal
to exit non-zero (typo in --flow argument).
"""
if pattern is None:
return plan
return [s for s in plan if pattern in s.flow_id]


FLOW_PLAN: list[FlowSpec] = [
FlowSpec(
flow_id="Flow 1",
Expand Down Expand Up @@ -1167,15 +1224,28 @@ def assert_flow_5(calls: list[dict]) -> tuple[bool, str]:
"Flow 4 captures an emerging constraint via correction markers "
'("wait", "shouldn\'t") — no collision-detection involved. NOT '
"the same gap as #154 (which is Flow 2a / contradiction-with-"
"prior-decision specific). #156 (PR A) has landed: the "
"SessionEnd hook now writes the parent transcript into "
".bicameral/pending-transcripts/<session_id>.jsonl and the "
"capture-corrections SKILL.md adds Step 0 to drain the queue "
"in the next session. In-flow assertions in this asserter "
"remain valid; cross-flow ledger validation via the queue "
"drain (PR B) will add the next-session preflight Step 3.5 "
"integration that ingests drained corrections into the "
"harness's test ledger."
"prior-decision specific). #156 (PR A) shipped the queue-write; "
"#156 (PR B) shipped the preflight Step 3.5 queue-drain "
"integration — see Flow 4b for the cross-flow ledger assertion "
"that validates the full session_end → next-preflight → ingest "
"pipeline end-to-end."
),
),
FlowSpec(
flow_id="Flow 4b",
prompt_file="flow-4b-queue-drain.md",
asserter=assert_flow_4b,
category="agentic_layer",
session_group="dev_session",
advisory=(
"Flow 4b validates the cross-flow path closed by #156 PR B: "
"the prior flow's SessionEnd hook wrote a transcript into "
".bicameral/pending-transcripts/; Flow 4b's auto-fired "
"preflight (UserPromptSubmit hook) invokes capture-corrections "
"in-session mode, which now drains the queue per #156 PR B's "
"Step 0 integration. Asserter checks the test ledger contains "
"a source=agent_session decision describing the prior flow's "
"correction — proving the cross-flow capture path closed."
),
),
FlowSpec(
Expand All @@ -1191,12 +1261,35 @@ def assert_flow_5(calls: list[dict]) -> tuple[bool, str]:


def main() -> int:
import argparse

parser = argparse.ArgumentParser(description="v0 user flow e2e harness")
parser.add_argument(
"--flow",
metavar="PATTERN",
default=None,
help="run only flows whose flow_id contains PATTERN (substring, case-sensitive)",
)
args = parser.parse_args()

selected_plan = _filter_flow_plan(FLOW_PLAN, args.flow)
if args.flow is not None and not selected_plan:
sys.stderr.write(
f"ERROR: --flow={args.flow!r} matched zero flows. Available: "
f"{[s.flow_id for s in FLOW_PLAN]}\n"
)
return 2

print("=== v0 user flow e2e — Claude Code CLI sessions ===")
print(f"DESKTOP_REPO_PATH: {DESKTOP_REPO_PATH}")
print(f"MCP config: {MCP_CONFIG_PATH}")
print(f"Ledger (persisted): {LEDGER_DIR}")
print(f"Transcripts: {RESULTS_DIR}")
print(f"Flows: {len(FLOW_PLAN)}\n")
if args.flow is not None:
print(
f"Filter --flow={args.flow!r}: {len(selected_plan)} of {len(FLOW_PLAN)} flows selected"
)
print(f"Flows: {len(selected_plan)}\n")

_clean_ledger()
_clean_claude_memory()
Expand All @@ -1210,15 +1303,15 @@ def main() -> int:

group_session_ids: dict[str, str] = {}
group_seen: set[str] = set()
chained_groups = sorted({s.session_group for s in FLOW_PLAN if s.session_group})
chained_groups = sorted({s.session_group for s in selected_plan if s.session_group})
if chained_groups:
print("Chained session groups:")
for g in chained_groups:
sid = str(uuid.uuid4())
group_session_ids[g] = sid
members = [
s.flow_id
for s in FLOW_PLAN
for s in selected_plan
if s.session_group == g and not s.skip and not s.reuses_flow
]
print(f" {g}: {sid[:8]}… → {' → '.join(members)}")
Expand All @@ -1229,7 +1322,7 @@ def main() -> int:
# taken just before the first dev_session flow runs.
dev_session_baseline: dict | None = None

for spec in FLOW_PLAN:
for spec in selected_plan:
# Snapshot baseline once, immediately before the first dev_session
# flow. This means Flow 1's effects are baked in but Flow 2/3/4's
# effects (the ones we want to measure) are not.
Expand Down
62 changes: 62 additions & 0 deletions tests/test_run_e2e_flows_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Behavioral tests for the `--flow` substring filter helper in
``tests/e2e/run_e2e_flows.py`` (#156 PR B Phase 2).

The helper ``_filter_flow_plan(plan, pattern)`` is a pure function: given
the canonical FLOW_PLAN list and an optional substring pattern, returns
the subset of FlowSpecs whose ``flow_id`` contains the pattern. Used by
``main()``'s ``--flow PATTERN`` argparse arg so CI can validate one flow
(or one cross-flow pair like Flow 4 + Flow 4b) without running the full
e2e suite.

Mirrors the import pattern from ``tests/test_e2e_asserters.py:30-42``:
``run_e2e_flows`` performs ``shutil.which`` lookups for ``claude`` and
``bicameral-mcp`` at import time; this stub-and-import dance lets the
helper be tested without those binaries on PATH.
"""

from __future__ import annotations

import shutil
import sys
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parent.parent
E2E_DIR = REPO_ROOT / "tests" / "e2e"
if str(E2E_DIR) not in sys.path:
sys.path.insert(0, str(E2E_DIR))

_orig_which = shutil.which


def _which_stub(name: str, *args, **kwargs):
if name in ("claude", "bicameral-mcp"):
return f"/stub/{name}"
return _orig_which(name, *args, **kwargs)


shutil.which = _which_stub # type: ignore[assignment]
try:
import run_e2e_flows # noqa: E402
finally:
shutil.which = _orig_which # type: ignore[assignment]


def test_filter_flow_plan_returns_all_when_pattern_is_none() -> None:
result = run_e2e_flows._filter_flow_plan(run_e2e_flows.FLOW_PLAN, None)
assert result == run_e2e_flows.FLOW_PLAN
assert result is run_e2e_flows.FLOW_PLAN or len(result) == len(run_e2e_flows.FLOW_PLAN)


def test_filter_flow_plan_substring_matches_multiple() -> None:
result = run_e2e_flows._filter_flow_plan(run_e2e_flows.FLOW_PLAN, "Flow 4")
flow_ids = [s.flow_id for s in result]
assert "Flow 4" in flow_ids
assert "Flow 4b" in flow_ids
expected_order = [s.flow_id for s in run_e2e_flows.FLOW_PLAN if "Flow 4" in s.flow_id]
assert flow_ids == expected_order


def test_filter_flow_plan_exact_match_returns_single() -> None:
result = run_e2e_flows._filter_flow_plan(run_e2e_flows.FLOW_PLAN, "Flow 4b")
assert len(result) == 1
assert result[0].flow_id == "Flow 4b"
Loading