Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 83 additions & 3 deletions handlers/preflight.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from preflight_telemetry import (
new_preflight_id,
telemetry_enabled,
write_dedup_event,
write_preflight_event,
)

Expand Down Expand Up @@ -325,6 +326,61 @@ def _check_dedup(
return False


def _dedup_miss_was_revision_bump(
ctx,
topic: str,
file_paths: list[str] | None,
ledger_revision: str | None,
) -> bool:
"""Classify a dedup miss: did it miss because ``ledger_revision``
advanced since the prior same-(topic, file_paths) call?

Returns True when:
- the current (topic, file_paths) prefix has been seen before within
``_DEDUP_TTL_SECONDS``,
- but the prior entry's revision component differs from the current
``ledger_revision``.

This is the M7a/M7c signal — a decision landed (M7a) or HITL state
cleared (M7c) between two same-topic/same-paths calls, and the new
`ledger_revision` invalidated the cache as intended. Phase 5
telemetry (#87) emits a ``preflight_dedup_decision`` event with
``reason=invalidated_by_revision_bump`` on True.

False for: first-call misses (no prior prefix entry), expired
entries (older than TTL), and file_paths-shift misses (the prefix
itself differs, not the revision suffix).
"""
sync_state = getattr(ctx, "_sync_state", None)
if not isinstance(sync_state, dict):
return False
topics: dict[str, float] = sync_state.get("preflight_topics") or {}
if not topics:
return False
current_key = _dedup_key_for(topic, file_paths, ledger_revision)
parts = current_key.split("||")
if len(parts) != 3:
return False
topic_norm, paths_norm, current_rev = parts
if not topic_norm:
return False
prefix = f"{topic_norm}||{paths_norm}||"
now = time.time()
for stored_key, ts in topics.items():
if not stored_key.startswith(prefix):
continue
if stored_key == current_key:
# Identical key — would have been a cache hit, not a miss.
continue
if now - ts >= _DEDUP_TTL_SECONDS:
continue
# Same prefix, different rev, within TTL → revision bump.
stored_rev = stored_key[len(prefix) :]
if stored_rev != current_rev:
return True
return False


async def _region_anchored_preflight(
ctx,
file_paths: list[str],
Expand Down Expand Up @@ -549,14 +605,20 @@ async def handle_preflight(

if ledger_revision is None:
# BYPASS: revision is unknown → cannot safely dedup. Loud warning
# for ops visibility; telemetry counter wiring lands in Phase 5
# (#87) so dashboards can quantify how often this happens in
# production.
# for ops visibility; #87 Phase 5 telemetry counter quantifies
# how often this happens in production. A sustained spike is the
# signal to look at ledger health (transient SurrealDB faults,
# schema mismatch, etc.).
logger.warning(
"[preflight] dedup bypassed — ledger_revision lookup failed for "
"topic %r; the next same-topic call will re-evaluate fully",
topic[:60],
)
write_dedup_event(
reason="bypassed_revision_unknown",
session_id=session_id,
preflight_id=pid,
)
elif _check_dedup(ctx, topic, file_paths, ledger_revision):
logger.debug(
"[preflight] dedup hit for topic=%r file_paths=%s rev=%s",
Expand All @@ -582,6 +644,24 @@ async def handle_preflight(
preflight_id=pid,
)

# Cache-miss classification (#87 Phase 5): if the miss was caused by
# a ledger_revision bump (same topic + file_paths seen before but
# with a different revision still within TTL), emit the M7a/c
# signal. This is the counter Kevin asked for — "so we can tell the
# new key is doing useful work in production". File-paths-shift
# misses (M7b) are intentionally NOT counted here; the file_paths
# component of the key is observable from preflight_events.jsonl
# via the existing ``file_paths_hash`` field if a follow-up wants
# to backfill that metric.
if ledger_revision is not None and _dedup_miss_was_revision_bump(
ctx, topic, file_paths, ledger_revision
):
write_dedup_event(
reason="invalidated_by_revision_bump",
session_id=session_id,
preflight_id=pid,
)

# V1 A3: time the call locally so the metric reflects THIS handler's catch-up.
import time as _time

Expand Down
51 changes: 51 additions & 0 deletions preflight_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,57 @@ def write_fallback_event(reason: str, session_id: str) -> None:
_append(_EVENTS_FILE, record)


# ── #87 Phase 5: preflight dedup-cache decision counters ─────────────


def write_dedup_event(
reason: str,
session_id: str,
preflight_id: str | None = None,
) -> None:
"""Append a preflight-dedup decision event to
``~/.bicameral/preflight_events.jsonl``.

Fires on the two dedup outcomes that matter for #87 Phase 5
instrumentation:

- ``invalidated_by_revision_bump`` — a same-(topic, file_paths) call
missed the cache because ``ledger_revision`` advanced since the
prior call. This is the M7a/M7c signal — proves the new key shape
is doing useful work in production (the metric Kevin asked for at
signoff: *"so we can tell the new key is doing useful work in
production"*).

- ``bypassed_revision_unknown`` — ``get_ledger_revision()`` returned
None and the handler short-circuited the dedup check per Kevin's
amendment (correctness over saving a preflight call). Watching
this counter lets ops detect transient SurrealDB faults or
schema-mismatch incidents — a sustained spike is a "look at the
ledger" signal.

Other dedup outcomes (cache hit, first-call miss, topic-changed,
file_paths-shift) are intentionally NOT emitted. Phase 5's scope is
the *change-detection signal*; hit/miss baselines are derivable
from ``write_preflight_event`` rows with ``reason="recently_checked"``
if needed later.

No-op when telemetry is disabled. Written into the same JSONL file
as other preflight events so operator triage joins on a single
substrate.
"""
if not telemetry_enabled():
return
record: dict = {
"ts": datetime.now(UTC).isoformat(),
"event_type": "preflight_dedup_decision",
"reason": reason,
"session_id": session_id,
}
if preflight_id:
record["preflight_id"] = preflight_id
_append(_EVENTS_FILE, record)


# ── Phase 4: #112 HITL bypass flow ───────────────────────────────────


Expand Down
Loading
Loading