Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Per-owner concurrency limit now configurable via `AWARENESS_MAX_CONCURRENT_PER_OWNER` (default raised from 3 to 10) — Claude.ai sends parallel MCP requests that exceeded the old limit, causing 429 errors surfaced as "authorization failed"
- OAuth proxy rate limits now configurable via `AWARENESS_OAUTH_PROXY_RATE_{AUTHORIZE,TOKEN,REGISTER}` (defaults raised from 20/10/5 to 60/60/30 req/min) and `AWARENESS_OAUTH_PROXY_RATE_WINDOW` (sliding window, default 60s)
- SessionRegistryMiddleware now compatible with MCP SDK 1.27.0 SSE responses — `_buffer_body` forwards to the real `receive` once the buffered body has been replayed, `_handle_subsequent` streams 2xx responses immediately, and the receive callables in `_reinitialize` block until task group cancellation so SSE disconnect detection never sees a premature `http.disconnect`

## [0.16.0] - 2026-04-08

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
> **Your AI's memory shouldn't be locked to one app. It should follow you everywhere.**

> [!NOTE]
> Early-stage but actively deployed — 721 tests, 15 releases, in daily use across Claude.ai, Claude Code, and Claude Desktop. See [Current status](#current-status) for what's working and what's planned.
> Early-stage but actively deployed — 725 tests, 15 releases, in daily use across Claude.ai, Claude Code, and Claude Desktop. See [Current status](#current-status) for what's working and what's planned.

## What this is

Expand Down Expand Up @@ -396,7 +396,7 @@ For single-user deployments, secret path + WAF is sufficient. For multi-user, en
- Secret path auth + Cloudflare WAF for edge-level access control
- Docker Compose with Postgres, optional Ollama, named Cloudflare Tunnel, or ephemeral quick tunnel
- Request timing instrumentation and `/health` endpoint
- 721 tests (all against real Postgres + Ollama in CI), strict type checking, CI pipeline with coverage, QA gate
- 725 tests (all against real Postgres + Ollama in CI), strict type checking, CI pipeline with coverage, QA gate

### Not yet implemented
- Layer 2 (baseline) detection — rolling averages and deviation calculation
Expand Down
71 changes: 57 additions & 14 deletions src/mcp_awareness/session_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import time
from typing import Any

import anyio
import psycopg
from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool
Expand Down Expand Up @@ -391,18 +392,30 @@ async def _handle_subsequent(
# Buffer body for potential re-init replay
body, replay_receive = await self._buffer_body(receive)

# Buffer response to inspect status before sending to client.
# This adds latency equal to response size — acceptable for JSON-RPC
# responses but would be problematic for SSE streams. POST /mcp
# responses are always JSON-RPC, so this is safe.
# Intercept http.response.start to inspect the status. For 2xx (success)
# responses — which may be SSE streams in MCP SDK >=1.27.0 — forward
# every message immediately. Any non-2xx response is buffered instead, so
# that for 400/404 (session not found) we can attempt cross-node re-init
# without the client seeing the error. Error responses are always small
# JSON, not SSE.
captured_status = 0
captured_response_parts: list[Message] = []
buffered_parts: list[Message] = []
streaming = False # True once we've committed to forwarding to client

async def capturing_send(message: Message) -> None:
nonlocal captured_status
nonlocal captured_status, streaming
if message["type"] == "http.response.start":
captured_status = message["status"]
captured_response_parts.append(message)
if 200 <= captured_status < 300:
# Success — stream through immediately (may be SSE)
streaming = True
await send(message)
else:
# Error — buffer for potential re-init interception
buffered_parts.append(message)
elif streaming:
await send(message)
else:
buffered_parts.append(message)

await self.app(scope, replay_receive, capturing_send)

Expand All @@ -419,9 +432,10 @@ async def capturing_send(message: Message) -> None:
return # Re-init handled the response
logger.warning("Re-initialization failed for session %s", session_id)

# Send captured response to client (original or error)
for part in captured_response_parts:
await send(part)
# Send buffered error response to client (only if not already streaming)
if not streaming:
for part in buffered_parts:
await send(part)

# Touch on success (debounced)
if 200 <= captured_status < 300:
Expand Down Expand Up @@ -467,8 +481,20 @@ async def _reinitialize(
init_status = 0
init_headers: list[tuple[bytes, bytes]] = []

init_body_sent = False

async def init_receive() -> Message:
return {"type": "http.request", "body": init_body}
nonlocal init_body_sent
if not init_body_sent:
init_body_sent = True
return {"type": "http.request", "body": init_body}
# Block until the task group cancels us. SSE disconnect detection
# calls receive() and aborts if it sees http.disconnect, so we must
# keep a live receive instead of returning a fake disconnect. The
# enclosing task group in _reinitialize cancels this coroutine once
# the SSE response stream completes.
while True:
await anyio.sleep(3600)

async def init_send(message: Message) -> None:
nonlocal init_status, init_headers
Expand Down Expand Up @@ -506,8 +532,16 @@ async def init_send(message: Message) -> None:
# Step 3: Replay original request with new session_id
replay_scope = self._rewrite_session_header(original_scope, new_session_id)

replay_body_sent = False

async def replay_receive() -> Message:
return {"type": "http.request", "body": original_body}
nonlocal replay_body_sent
if not replay_body_sent:
replay_body_sent = True
return {"type": "http.request", "body": original_body}
# Block until cancelled — see init_receive for explanation.
while True:
await anyio.sleep(3600)

async def replay_send(message: Message) -> None:
if message["type"] == "http.response.start":
Expand Down Expand Up @@ -576,7 +610,14 @@ async def _send_error(send: Send, status: int, message: str) -> None:

@staticmethod
async def _buffer_body(receive: Receive) -> tuple[bytes, Receive]:
"""Buffer the full request body and return (body_bytes, replay_receive)."""
"""Buffer the full request body and return (body_bytes, replay_receive).

After buffered chunks are replayed, subsequent calls forward to the
original ``receive``. This is critical for SSE responses (MCP SDK
>=1.27.0) where ``EventSourceResponse`` calls ``receive()`` to detect
client disconnect — returning a synthetic ``http.disconnect`` would
abort the SSE stream before sending any response.
"""
chunks: list[Message] = []
body_parts: list[bytes] = []
while True:
Expand All @@ -596,7 +637,9 @@ async def replay_receive() -> Message:
try:
return next(chunk_iter)
except StopIteration:
return {"type": "http.disconnect"}
# Forward to the real receive so SSE disconnect detection
# works — do NOT return a synthetic http.disconnect.
return await receive()

return body, replay_receive

Expand Down
Loading