fix(mcp): Strip stale mcp-session-id to prevent 400 errors across proxy workers (#21417)
Conversation
Greptile Summary: This PR refactors the stale-session handling.
No functional changes to session routing logic — stale sessions are still stripped on non-DELETE and return 200 on DELETE, same as before. Confidence Score: 4/5
| Filename | Overview |
|---|---|
| litellm/proxy/_experimental/mcp_server/server.py | Refactors _handle_stale_mcp_session with case-insensitive header matching, defensive type checks, improved error handling around _server_instances access, fixed issue reference (#20292→#20992), and lazy log formatting. Adds a spurious blank line in stdlib imports. |
| tests/mcp_tests/test_mcp_server.py | Minor test comment update clarifying that send is passed directly without a wrapper. No behavioral changes. |
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Incoming MCP Request] --> B{Has mcp-session-id header?}
    B -- No --> G[Pass to session manager]
    B -- Yes --> C{Can inspect _server_instances?}
    C -- No --> G
    C -- Yes --> D{Session ID in worker memory?}
    D -- Yes --> G
    D -- Error --> G
    D -- No / Stale --> E{HTTP method?}
    E -- DELETE --> F[Return 200 OK immediately]
    E -- Non-DELETE --> H[Strip mcp-session-id header]
    H --> G
```
Last reviewed commit: 406c468
```diff
 # Check Redis first for multi-worker deployments
 session_exists_in_redis = False
 if _REDIS_SESSION_STORE:
     try:
         session_exists_in_redis = await _REDIS_SESSION_STORE.session_exists(_session_id)
         verbose_logger.debug(
             f"MCP session {_session_id} exists in Redis: {session_exists_in_redis}"
         )
     except Exception as e:
         verbose_logger.error(
             f"Error checking MCP session {_session_id} in Redis: {e}. "
             "Falling back to in-memory check."
         )

 # Fallback to in-memory check
 known_sessions = getattr(mgr, "_server_instances", None)
-if known_sessions is None or _session_id in known_sessions:
+session_exists_in_memory = known_sessions is not None and _session_id in known_sessions

 # Session exists if it's in either Redis or memory
 if session_exists_in_redis or session_exists_in_memory or known_sessions is None:
```
Sessions are never saved to Redis — Redis check is always false
The _handle_stale_mcp_session function calls _REDIS_SESSION_STORE.session_exists(), but save_session() is never called anywhere in the server code. No code path writes sessions to Redis when they are created (e.g., during /mcp/initialize). This means session_exists will always return False, and the multi-worker problem described in #20992 is not actually solved by this PR.
To fix this, you need to call _REDIS_SESSION_STORE.save_session() when a new MCP session is created (likely after the session manager processes an initialize request) and _REDIS_SESSION_STORE.delete_session() when sessions are terminated.
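A minimal sketch of the missing write path, using an in-memory stand-in for the store; the hook names `on_session_created` and `on_session_terminated` are hypothetical placeholders for wherever the session manager processes initialize and DELETE requests:

```python
import asyncio


class InMemorySessionStore:
    """Stand-in for MCPRedisSessionStore; same method names, dict-backed."""

    def __init__(self):
        self._sessions = {}

    async def save_session(self, session_id, metadata):
        self._sessions[session_id] = metadata

    async def delete_session(self, session_id):
        self._sessions.pop(session_id, None)

    async def session_exists(self, session_id):
        return session_id in self._sessions


async def on_session_created(store, session_id):
    # Hypothetical hook: invoke after the session manager handles initialize
    await store.save_session(session_id, {"created": True})


async def on_session_terminated(store, session_id):
    # Hypothetical hook: invoke when a DELETE terminates the session
    await store.delete_session(session_id)


async def demo():
    store = InMemorySessionStore()
    await on_session_created(store, "abc123")
    assert await store.session_exists("abc123") is True
    await on_session_terminated(store, "abc123")
    assert await store.session_exists("abc123") is False


asyncio.run(demo())
```

Without the `save_session` call at creation time, `session_exists` can never return True, which is the gap described above.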
```python
value = json.dumps(metadata_with_ts)

# Save to Redis with TTL
await self.redis_cache.async_set_cache(
    key=key,
    value=value,
    ttl=self.ttl
)
```
Double JSON encoding — value is serialized twice
save_session calls json.dumps(metadata_with_ts) to produce a JSON string, then passes this string to async_set_cache. However, RedisCache.async_set_cache internally calls json.dumps(value) again (see redis_cache.py:487), resulting in double-encoded JSON stored in Redis.
On read, _get_cache_logic does one json.loads, returning the still-JSON-encoded string, and then get_session does another json.loads — so it works by accident. But this is fragile: if the RedisCache internals change, or if anyone reads this key directly from Redis, they'll get a double-encoded string.
Remove the json.dumps here and pass the dict directly to async_set_cache.
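The double-encoding pitfall is easy to demonstrate in isolation; plain `json` here stands in for the serialization that `async_set_cache` performs internally:

```python
import json

metadata = {"worker_pid": "1234", "created_at": 1700000000}

# Caller serializes once...
value = json.dumps(metadata)
# ...and the cache layer serializes again (as async_set_cache does internally).
stored = json.dumps(value)

# A single json.loads does not recover the dict — it yields a string.
first_pass = json.loads(stored)
assert isinstance(first_pass, str)

# Only a second loads recovers the original data ("works by accident").
assert json.loads(first_pass) == metadata

# Passing the dict directly avoids the double encoding.
stored_correctly = json.dumps(metadata)
assert json.loads(stored_correctly) == metadata
```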
```python
try:
    import litellm
    from litellm.caching.redis_cache import RedisCache
```
Inline imports violate codebase convention
Per the project's CLAUDE.md, imports should be placed at module level, not inside methods. import litellm and from litellm.caching.redis_cache import RedisCache should be moved to the top of the file or within the existing if MCP_AVAILABLE: block where other imports live (around line 112-127).
Suggested change:

```diff
-try:
-    import litellm
-    from litellm.caching.redis_cache import RedisCache
+from litellm.caching.redis_cache import RedisCache
```
Context Used: Context from dashboard - CLAUDE.md (source)
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
```python
# Delete from Redis
result = await self.redis_cache.async_delete_cache(key=key)
```
Inconsistent call pattern for delete method
The async_delete_cache in the Redis cache class defines its parameter as a positional arg. Most callers in the codebase pass it positionally rather than as a named argument.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Additional Comments (1)
Both session managers are created with stateless=True.
Greptile Summary: This PR adds a Redis-backed session store. Additionally, the session managers are configured with stateless=True.
Confidence Score: 1/5
| Filename | Overview |
|---|---|
| litellm/proxy/_experimental/mcp_server/server.py | Adds Redis session store initialization and existence check in _handle_stale_mcp_session, but save_session() is never called so the Redis store is always empty. The stateless session manager config also means _server_instances isn't used for session persistence, making the Redis check ineffective. |
| litellm/proxy/_experimental/mcp_server/redis_session_store.py | New Redis-backed session store with save/get/delete/exists methods. Implementation is well-structured with error handling, but the class is never meaningfully used since save_session() is never called from the server code. |
| tests/mcp_tests/test_redis_session_store.py | Unit tests for the Redis session store class. Tests cover CRUD operations and error handling well but only test the isolated class — they don't verify integration with the actual MCP server flow. |
| tests/mcp_tests/test_multi_worker.py | Multi-worker integration tests simulate cross-worker session sharing, but only test the session store in isolation — they don't test the actual multi-worker MCP request flow where the session store is supposedly integrated. |
Flowchart

```mermaid
flowchart TD
    A[Client Request with mcp-session-id] --> B{_handle_stale_mcp_session}
    B --> C{_REDIS_SESSION_STORE exists?}
    C -->|Yes| D[session_exists in Redis]
    C -->|No| F[Check _server_instances in-memory]
    D -->|Always False - save_session never called| E[session_exists_in_redis = False]
    E --> F
    F --> G{session in memory OR known_sessions is None?}
    G -->|Yes| H[Return False - let session manager handle]
    G -->|No - stale session| I{DELETE request?}
    I -->|Yes| J[Return 200 success]
    I -->|No| K[Strip stale header, return False]
    style D fill:#f66,stroke:#333,color:#fff
    style E fill:#f66,stroke:#333,color:#fff
```
Last reviewed commit: e5f2fad
```diff
 # Check Redis first for multi-worker deployments
 session_exists_in_redis = False
 if _REDIS_SESSION_STORE:
     try:
         session_exists_in_redis = await _REDIS_SESSION_STORE.session_exists(_session_id)
         verbose_logger.debug(
             f"MCP session {_session_id} exists in Redis: {session_exists_in_redis}"
         )
     except Exception as e:
         verbose_logger.error(
             f"Error checking MCP session {_session_id} in Redis: {e}. "
             "Falling back to in-memory check."
         )

 # Fallback to in-memory check
 known_sessions = getattr(mgr, "_server_instances", None)
-if known_sessions is None or _session_id in known_sessions:
+session_exists_in_memory = known_sessions is not None and _session_id in known_sessions

 # Session exists if it's in either Redis or memory
 if session_exists_in_redis or session_exists_in_memory or known_sessions is None:
```
save_session() is never called — Redis store is always empty
The _REDIS_SESSION_STORE.session_exists() check will always return False because save_session() is never called anywhere in the codebase. The MCPRedisSessionStore class defines save_session(), get_session(), delete_session(), and session_exists(), but the only integration point with server.py is this session_exists() call in _handle_stale_mcp_session(). No code ever writes session data to Redis.
This means the Redis-backed session store is entirely inert — it will never find a session because no session was ever stored. The multi-worker fix described in the PR cannot work as implemented.
To actually fix the multi-worker issue, sessions would need to be saved to Redis when they're created (e.g., during /mcp/initialize) and the session manager would need to be able to reconstruct server instances from Redis-stored metadata on a different worker.
```python
try:
    import litellm
    from litellm.caching.redis_cache import RedisCache
```
Inline imports violate project style guide
The import litellm and from litellm.caching.redis_cache import RedisCache statements inside the function body violate the project's code style convention. Per CLAUDE.md: "Avoid imports within methods — place all imports at the top of the file (module-level)."
These imports should be moved to the top of the file, inside the if MCP_AVAILABLE: block alongside the other conditional imports.
Context Used: Context from dashboard - CLAUDE.md (source)
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
```python
async def session_exists(self, session_id: str) -> bool:
    """
    Check if a session exists in Redis.

    Args:
        session_id: The MCP session ID to check

    Returns:
        True if session exists, False otherwise
    """
    if not self.redis_cache:
        return False

    try:
        metadata = await self.get_session(session_id)
        return metadata is not None
    except Exception as e:
        verbose_logger.error(
            f"Failed to check MCP session {session_id} existence: {e}"
        )
        return False
```
session_exists performs full GET+deserialize instead of using Redis EXISTS
The session_exists() method calls get_session() which fetches the full value and deserializes it from JSON just to check existence. This is wasteful — Redis has a dedicated EXISTS command that returns a boolean without transferring the value. For a check that runs on every MCP request in the hot path, this adds unnecessary latency and bandwidth.
Consider using a direct Redis EXISTS check or the RedisCache equivalent if available.
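A sketch of the difference, using a stub client rather than the real `RedisCache` (the stub's method names mirror the Redis commands, not litellm's API):

```python
import asyncio
import json


class StubRedis:
    """Minimal stand-in for an async Redis client (illustration only)."""

    def __init__(self):
        self._kv = {}
        self.bytes_transferred = 0

    async def set(self, key, value):
        self._kv[key] = value

    async def get(self, key):
        value = self._kv.get(key)
        if value is not None:
            self.bytes_transferred += len(value)  # GET moves the payload
        return value

    async def exists(self, key):
        # EXISTS returns a count without transferring the value payload
        return 1 if key in self._kv else 0


async def exists_via_get(client, key):
    # Current approach: full GET + JSON deserialize just to test presence
    value = await client.get(key)
    return value is not None and json.loads(value) is not None


async def exists_via_exists(client, key):
    # Suggested approach: O(1) EXISTS, no payload transfer
    return bool(await client.exists(key))


async def demo():
    client = StubRedis()
    await client.set("mcp:session:abc", json.dumps({"meta": "x" * 1024}))
    assert await exists_via_get(client, "mcp:session:abc") is True
    moved = client.bytes_transferred          # payload was transferred
    assert await exists_via_exists(client, "mcp:session:abc") is True
    assert client.bytes_transferred == moved  # EXISTS moved no payload
    return moved


transferred = asyncio.run(demo())
assert transferred > 1024
```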
Additional Comments (1)
The session managers are configured with stateless=True. The core multi-worker issue (sessions stored in per-worker _server_instances) is therefore not in play the way the PR assumes. Adding a Redis existence check on top of a stateless session manager doesn't change the request handling behavior in a meaningful way.
Force-pushed from e5f2fad to ff42117
Force-pushed from ff42117 to 91881ee
@greptileai review
Greptile Summary: This PR adds a Redis-backed session registry (MCPRedisSessionStore).
Confidence Score: 2/5
| Filename | Overview |
|---|---|
| litellm/proxy/_experimental/mcp_server/redis_session_store.py | New Redis-backed session store class. Clean implementation with proper error handling and TTL support, but session_exists() uses a different Redis access path than write operations, and the store's value is questionable with stateless=True mode. |
| litellm/proxy/_experimental/mcp_server/server.py | Core integration of Redis session store into MCP request handling. The _wrap_send_for_session_capture pattern is clever, but the Redis awareness doesn't change session routing behavior in stateless=True mode — it only changes log levels. worker_pid metadata uses id(session_manager) instead of os.getpid(). |
| tests/mcp_tests/test_mcp_server.py | Minor test update to accommodate the send wrapper — correctly verifies that the third argument is callable rather than checking identity. Clean adaptation. |
| tests/mcp_tests/test_multi_worker.py | New integration tests for multi-worker session handling. Tests are well-structured but some re-implement the capture logic manually instead of testing the actual _wrap_send_for_session_capture function, reducing confidence in true end-to-end coverage. |
| tests/mcp_tests/test_redis_session_store.py | Comprehensive unit tests for the Redis session store covering CRUD, error handling, concurrent access, and cross-worker simulation. Good test coverage. |
Flowchart

```mermaid
flowchart TD
    A[Client sends mcp-session-id header] --> B{Header present?}
    B -- No --> Z[Pass to session_manager.handle_request]
    B -- Yes --> C{Session in worker memory?}
    C -- Yes --> Z
    C -- No --> D{Redis available?}
    D -- Yes --> E[Redis EXISTS check]
    D -- No --> G
    E -- Found --> F["Log info: 'exists in Redis'\nStrip header"]
    E -- Not found --> G["Log warning: 'not found'\nStrip header"]
    F --> Z
    G --> Z
    Z --> H[_wrap_send_for_session_capture intercepts response]
    H --> I{mcp-session-id in response?}
    I -- Yes --> J[_persist_captured_session saves to Redis]
    I -- No --> K[No-op]
    J --> L[Response sent to client]
    K --> L
    style F fill:#fff3cd
    style G fill:#fff3cd
    style J fill:#d4edda
```
Last reviewed commit: 91881ee
```python
try:
    await _REDIS_SESSION_STORE.save_session(
        captured_session_id,
        {"worker_pid": str(id(session_manager))},
    )
```
worker_pid stores Python object ID, not OS PID
id(session_manager) returns the memory address of the Python object, not the actual OS process ID. This is misleading metadata — different workers with forked processes will have different os.getpid() values, but id(session_manager) could coincidentally collide across workers since each fork has its own address space. Use os.getpid() instead for meaningful worker identification.
Suggested change:

```diff
-{"worker_pid": str(id(session_manager))},
+{"worker_pid": str(__import__('os').getpid())},
```
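The distinction is easy to verify; the `SessionManager` class below is just a stand-in object for illustration:

```python
import os


class SessionManager:
    """Stand-in object; only used to demonstrate id() vs os.getpid()."""
    pass


manager = SessionManager()

# id() is a CPython memory address — meaningless as a worker identifier,
# and two forked workers can coincidentally report the same value.
object_id = id(manager)

# os.getpid() is the actual OS process ID, unique per worker process.
pid = os.getpid()

assert isinstance(pid, int) and pid > 0
metadata = {"worker_pid": str(pid)}  # meaningful across workers
assert metadata["worker_pid"].isdigit()
```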
```diff
+if session_exists_in_redis:
+    verbose_logger.info(
+        "MCP session '%s' exists in Redis (created by another worker) "
+        "but not in this worker's memory. Stripping header so a "
+        "fresh session can be created on this worker.",
+        _session_id,
+    )
+else:
     # Non-DELETE: strip stale session ID to allow new session creation
     verbose_logger.warning(
-        "MCP session ID '%s' not found in active sessions. "
-        "Stripping stale header to force new session creation.",
+        "MCP session ID '%s' not found in active sessions (memory or "
+        "Redis). Stripping stale header to force new session creation.",
         _session_id,
     )
-scope["headers"] = [
-    (k, v) for k, v in scope["headers"]
-    if k != _mcp_session_header
-]
-return False
+scope["headers"] = [
+    (k, v)
+    for k, v in scope["headers"]
+    if k != _mcp_session_header
+]
+return False
```
Redis awareness has no practical effect in stateless mode
In stateless=True mode (line 158), _server_instances is never populated with session IDs by the session manager — each request creates a fresh transport with mcp_session_id=None. This means:

- When a client sends back a session ID it received from a previous response, that ID will never be in _server_instances on any worker.
- The Redis session_exists check will return True (because _persist_captured_session saved it), but the code still strips the header and creates a fresh session — identical behavior to when the session is not in Redis.
- The only difference between "session in Redis" and "session not in Redis" is whether an info or warning log message is emitted (lines 2045-2056).

The Redis store adds latency to every request (one EXISTS call on inbound + one SET on outbound) but doesn't change session routing behavior. Consider documenting this limitation clearly, or reconsidering whether Redis awareness provides value with stateless=True.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
```python
    key = self.redis_cache.check_and_fix_namespace(
        key=self._make_key(session_id)
    )

    # Use Redis EXISTS directly — O(1), no data transfer
    _client = self.redis_cache.init_async_client()
    result = await _client.exists(key)
    return bool(result)
except Exception as e:
    verbose_logger.error(
        "Failed to check MCP session %s existence: %s",
        session_id,
        e,
    )
```
session_exists bypasses namespace used by save_session
save_session writes via async_set_cache which internally calls check_and_fix_namespace on the key. But session_exists calls check_and_fix_namespace manually and then uses init_async_client().exists(key) directly — bypassing whatever key transformations async_set_cache might apply beyond namespacing. If async_set_cache applies any additional key mutations (e.g., encoding or prefixing beyond check_and_fix_namespace), the EXISTS check could miss keys that were written via save_session.
It would be safer to use the same access path (e.g., async_get_cache) or verify that async_set_cache doesn't transform the key beyond check_and_fix_namespace.
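The risk can be illustrated with a toy namespace transformation; the `check_and_fix_namespace` behavior below is an assumption for illustration, not litellm's actual implementation:

```python
def check_and_fix_namespace(key, namespace="litellm"):
    # Stand-in for RedisCache.check_and_fix_namespace (behavior assumed):
    # prefix the key with the namespace unless it is already prefixed.
    return key if key.startswith(namespace + ":") else f"{namespace}:{key}"


store = {}  # stand-in for Redis


def set_cache(key, value):
    # Write path applies the namespace transformation internally,
    # the way async_set_cache does.
    store[check_and_fix_namespace(key)] = value


def exists_raw(key):
    # Read path that skips the shared transformation misses the key.
    return key in store


def exists_same_path(key):
    # Read path that reuses the same transformation stays consistent.
    return check_and_fix_namespace(key) in store


set_cache("mcp_session:abc", "meta")
assert exists_raw("mcp_session:abc") is False       # misses the namespaced key
assert exists_same_path("mcp_session:abc") is True  # same path, consistent
```

Any divergence between the write-side and read-side key transformations produces exactly this silent miss, which is why reusing `async_get_cache` (or verifying the transformation) is the safer option.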
Force-pushed from 91881ee to f34ac3d
@greptileai review
Greptile Summary: This PR adds a Redis-backed session registry (MCPRedisSessionStore). Key concern: because the MCP session manager runs in stateless=True mode, the Redis check does not change session routing behavior.
Confidence Score: 2/5
| Filename | Overview |
|---|---|
| litellm/proxy/_experimental/mcp_server/redis_session_store.py | New Redis-backed session store class with CRUD operations. Well-structured with proper error handling and graceful fallback. No double JSON encoding. session_exists now uses async_get_cache for consistency. |
| litellm/proxy/_experimental/mcp_server/server.py | Core integration: Redis session store init, stale session handling with Redis awareness, ASGI send wrapper for session capture. Adds Redis GET on every inbound request and Redis SET on every outbound response, but in stateless mode this only changes log verbosity — no behavioral impact on session routing. |
| tests/mcp_tests/test_mcp_server.py | Minor test update to accommodate the new _wrap_send_for_session_capture wrapper — now checks callable instead of exact identity for the send argument. |
| tests/mcp_tests/test_multi_worker.py | 11 integration tests covering stale handler with/without Redis, capture pattern, full multi-worker flow. Tests are well-structured but some re-implement the capture logic inline instead of testing the actual _wrap_send_for_session_capture function. |
| tests/mcp_tests/test_redis_session_store.py | 23 unit tests for MCPRedisSessionStore covering CRUD, error handling, graceful fallback, serialization correctness, TTL, concurrent access, and cross-worker simulation. Thorough coverage. |
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant Worker as Any Worker (handle_streamable_http_mcp)
    participant StaleHandler as _handle_stale_mcp_session
    participant Redis as Redis Session Store
    participant SDK as MCP SDK (stateless=True)
    Client->>Worker: POST /mcp (mcp-session-id: abc123)
    Worker->>StaleHandler: Check session header
    StaleHandler->>Redis: session_exists("abc123")
    Redis-->>StaleHandler: true/false (observability only)
    Note over StaleHandler: _server_instances always empty<br/>in stateless mode
    StaleHandler->>StaleHandler: Strip mcp-session-id header
    StaleHandler-->>Worker: return False (continue)
    Worker->>SDK: handle_request(scope, receive, wrapped_send)
    SDK-->>Worker: Response with new mcp-session-id
    Worker->>Worker: _send_and_capture extracts session ID
    Worker->>Redis: save_session(new_id, {worker_pid})
    Worker-->>Client: Response
```
Last reviewed commit: f34ac3d
```python
wrapped_send, get_captured_id = _wrap_send_for_session_capture(send)

await session_manager.handle_request(scope, receive, wrapped_send)

# Persist the session ID in Redis so other workers know it exists
await _persist_captured_session(get_captured_id())
```
Redis calls on every MCP request add latency with no behavioral effect
In stateless=True mode, every request creates a fresh transport and _server_instances is always empty. This means:

- Inbound: session_exists() (a full Redis GET + deserialize) runs on every request with a session header, but the result only changes the log level (INFO vs WARNING) — the header is stripped either way.
- Outbound: _persist_captured_session() (a Redis SET) runs after every response, writing session metadata that will only ever be used for the log-level distinction above.

This adds ~2 Redis round-trips to every MCP request for observability-only benefit. Per project guidelines, avoid creating new requests in the critical request path (custom rule 0c2a17ad). Consider either:

- Making the Redis calls conditional on a config flag (e.g., MCP_SESSION_REGISTRY_ENABLED)
- Deferring _persist_captured_session to a background task (asyncio.create_task) so it doesn't block the response
- Documenting clearly that this latency is intentional and expected to be useful only when migrating to stateful mode later
Context Used: Rule from dashboard - What: Avoid creating new database requests or Router objects in the critical request path.
Why: Cre... (source)
tests/mcp_tests/test_multi_worker.py (Outdated)
```python
@pytest.mark.asyncio
async def test_send_and_capture_saves_session_to_redis(shared_redis):
    """
    Simulate the _send_and_capture wrapper: when the response includes
    a mcp-session-id header, save_session should be called.
    """
    redis_store = MCPRedisSessionStore(redis_cache=shared_redis, ttl=3600)

    # Simulate what handle_streamable_http_mcp does
    captured_session_id = None

    async def mock_send(message):
        pass

    async def send_and_capture(message):
        nonlocal captured_session_id
        if message.get("type") == "http.response.start":
            for name, value in message.get("headers", []):
                if isinstance(name, bytes) and name.lower() == b"mcp-session-id":
                    captured_session_id = value.decode("utf-8")
                    break
        await mock_send(message)

    # Simulate a response with mcp-session-id header
    await send_and_capture(
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [
                (b"content-type", b"application/json"),
                (b"mcp-session-id", b"new-session-abc123"),
            ],
        }
    )

    assert captured_session_id == "new-session-abc123"

    # Now simulate the post-request save
    if captured_session_id:
        await redis_store.save_session(captured_session_id, {"worker": "1"})

    # Verify the session is in Redis
    assert await redis_store.session_exists("new-session-abc123") is True
    meta = await redis_store.get_session("new-session-abc123")
    assert meta is not None
    assert meta["worker"] == "1"
```
Tests re-implement capture logic instead of testing the actual function
test_send_and_capture_saves_session_to_redis and test_send_and_capture_ignores_no_session_header manually re-implement the header-capture logic inline (lines 238-244, 277-283) rather than importing and testing the actual _wrap_send_for_session_capture function from server.py. This means the tests don't verify the real implementation — they only prove that a separately-written copy of the logic works.
Consider importing _wrap_send_for_session_capture and _persist_captured_session and testing them directly, similar to how _handle_stale_mcp_session is tested above.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
```diff
-        False if the request should continue to the session manager
+        True if the request was fully handled (e.g. DELETE on
+        non-existent session). False if the request should continue
+        to the session manager.

     Fixes https://github.com/BerriAI/litellm/issues/20292
```
Docstring references wrong issue number
The docstring says "Fixes #20292" but the actual issue this PR addresses is #20992. This appears to be a typo from the original code that was carried forward.
Suggested change:

```diff
-Fixes https://github.com/BerriAI/litellm/issues/20292
+Fixes https://github.com/BerriAI/litellm/issues/20992
```
Force-pushed from f34ac3d to cdc4316
@greptileai review
Greptile Summary: This PR adds a Redis-backed session registry (MCPRedisSessionStore).
Confidence Score: 2/5
| Filename | Overview |
|---|---|
| litellm/proxy/_experimental/mcp_server/redis_session_store.py | New Redis session store with CRUD operations. session_exists() uses full GET+deserialize instead of Redis EXISTS as the PR description claims. Overall well-structured with proper error handling and graceful degradation. |
| litellm/proxy/_experimental/mcp_server/server.py | Core integration: adds Redis session awareness, _handle_stale_mcp_session refactored with Redis check, _wrap_send_for_session_capture intercepts response headers, _persist_captured_session fires as untracked background task. Redis adds latency on every MCP request but only provides observability benefit in stateless mode. |
| tests/mcp_tests/test_mcp_server.py | Minor adjustment to existing test to account for the new send wrapper — asserts wrapped send is callable instead of identity check. Correct adaptation. |
| tests/mcp_tests/test_multi_worker.py | 11 integration tests covering stale session handling with Redis, send-and-capture wrapper, full multi-worker flow, and fallback behavior. Good coverage but tests use mocked Redis which may not catch namespace/key transformation mismatches with the real RedisCache. |
| tests/mcp_tests/test_redis_session_store.py | 23 unit tests for MCPRedisSessionStore covering CRUD, error handling, graceful fallback, serialization correctness, TTL, key namespace, concurrent access, and cross-worker simulation. Thorough coverage of the store class. |
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant ASGI as handle_streamable_http_mcp
    participant Stale as _handle_stale_mcp_session
    participant Redis as MCPRedisSessionStore
    participant SDK as session_manager.handle_request
    participant Persist as _persist_captured_session
    Client->>ASGI: POST /mcp (mcp-session-id: abc-123)
    ASGI->>Stale: Check stale session
    Stale->>Redis: session_exists("abc-123") [GET + deserialize]
    Redis-->>Stale: true/false
    Note over Stale: Check _server_instances (always empty in stateless mode)
    alt Session not in memory (always true in stateless)
        Stale->>Stale: Strip mcp-session-id header
        Note over Stale: Log INFO (if in Redis) or WARNING (if not)
    end
    Stale-->>ASGI: False (continue processing)
    ASGI->>ASGI: _wrap_send_for_session_capture(send)
    ASGI->>SDK: handle_request(scope, receive, wrapped_send)
    SDK-->>ASGI: Response with new mcp-session-id header
    Note over ASGI: Capture new session ID from response headers
    ASGI->>Persist: asyncio.create_task (fire-and-forget)
    Persist->>Redis: save_session(new-id, {worker_pid})
    Redis-->>Persist: OK
    ASGI-->>Client: Response (new mcp-session-id)
```
Last reviewed commit: cdc4316
```python
# Persist the session ID in Redis (background task, non-blocking)
captured_id = get_captured_id()
if captured_id and _REDIS_SESSION_STORE:
    asyncio.create_task(_persist_captured_session(captured_id))
```
Untracked asyncio.create_task may be garbage collected
The task returned by asyncio.create_task() is not stored in any variable or set. Per Python docs, the event loop only holds a weak reference to tasks, so an unreferenced task can be garbage collected before it completes — silently dropping the Redis write.
While this pattern exists elsewhere in the codebase, it's worth noting that for a best-effort background save, a silent failure due to GC is hard to debug. Consider storing the task reference (e.g., in a module-level set with a done-callback that removes it), which is the recommended practice:
```python
_background_tasks = set()

task = asyncio.create_task(_persist_captured_session(captured_id))
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
```
```python
async def session_exists(self, session_id: str) -> bool:
    """
    Check if a session exists in Redis.

    Uses the same ``async_get_cache`` access path as ``save_session``
    to guarantee key transformations (namespacing, prefixing) are
    consistent.

    Args:
        session_id: The MCP session ID to check

    Returns:
        True if session exists, False otherwise
    """
    if not self.redis_cache:
        return False

    try:
        key = self._make_key(session_id)
        value = await self.redis_cache.async_get_cache(key=key)
        return value is not None
    except Exception as e:
        verbose_logger.error(
            "Failed to check MCP session %s existence: %s",
            session_id,
            e,
        )
        return False
```
PR description claims O(1) EXISTS but code does full GET
The PR description states "O(1) existence check: session_exists() uses Redis EXISTS command directly (not GET+deserialize)" but the implementation calls async_get_cache which performs a full GET followed by json.loads deserialization. While the docstring here was updated to acknowledge this, the PR description is misleading.
Since session_exists is called on every inbound MCP request with a session header, the difference matters: Redis EXISTS returns a boolean without transferring the value payload, whereas GET transfers the full serialized JSON and then the client deserializes it. For a hot-path existence check, consider using a lighter-weight approach if the RedisCache API supports it, or at minimum update the PR description to reflect the actual implementation.
```python
# Check Redis for multi-worker session awareness
session_exists_in_redis = False
if _REDIS_SESSION_STORE:
    try:
        session_exists_in_redis = await _REDIS_SESSION_STORE.session_exists(
            _session_id
        )
        verbose_logger.debug(
            "MCP session %s exists in Redis: %s",
            _session_id,
            session_exists_in_redis,
        )
    except Exception as e:
        verbose_logger.error(
            "Error checking MCP session %s in Redis: %s. "
            "Falling back to in-memory check.",
            _session_id,
            e,
        )
```
Blocking Redis call on every inbound MCP request in the hot path
session_exists() (which performs a full Redis GET + JSON deserialize) is awaited synchronously on every MCP request that carries an mcp-session-id header. In stateless=True mode, this call only determines the log level (INFO vs WARNING) — it does not change any routing or session behavior (the header is stripped either way).
This adds non-trivial latency to every MCP request for observability-only benefit. Per project custom instruction 0c2a17ad: "Avoid creating new database requests or Router objects in the critical request path." While Redis isn't a database in the traditional sense, the spirit of that rule applies — adding a network round-trip to every request for logging purposes is a concern.
Consider making this conditional on a debug/verbose flag, or moving it to a background task so it doesn't block the request.
Context Used: Rule from dashboard - What: Avoid creating new database requests or Router objects in the critical request path.
Why: Cre... (source)
Force-pushed from cdc4316 to 2e03590
@greptileai review
Greptile Summary

This PR adds a Redis-backed session registry (`MCPRedisSessionStore`).

Confidence Score: 1/5
| Filename | Overview |
|---|---|
| litellm/proxy/_experimental/mcp_server/server.py | Adds Redis session capture/persist on outbound responses and DELETE cleanup, but the inbound `_handle_stale_mcp_session` never consults Redis for routing decisions. In stateless mode, Redis writes fire on every response with no behavioral impact, adding overhead without solving the multi-worker problem. |
| litellm/proxy/_experimental/mcp_server/redis_session_store.py | Well-structured Redis session store with proper error handling, graceful fallback, and TTL support. The implementation itself is clean, but the store is effectively unused for its stated purpose (cross-worker session awareness) since the server never reads from it on inbound requests. |
| tests/mcp_tests/test_redis_session_store.py | Comprehensive unit tests for the Redis session store CRUD operations, error handling, and cross-worker simulation. Tests are well-organized and thorough for the store itself. |
| tests/mcp_tests/test_multi_worker.py | Integration tests for the multi-worker flow, but test_full_multi_worker_flow inadvertently proves Redis has no effect: the outcome is identical with or without Redis. Tests don't verify any behavioral difference enabled by the Redis store. |
| tests/mcp_tests/test_mcp_server.py | Minor update to accommodate the new `_wrap_send_for_session_capture` wrapper; correctly verifies that the third argument to `handle_request` is now a wrapped callable rather than the original `send`. |
| PR_DESCRIPTION.md | PR description file committed to repo root; should be removed as it doesn't belong in the codebase. |
Flowchart

```mermaid
flowchart TD
    A[Client sends mcp-session-id header] --> B{Session in local _server_instances?}
    B -- Yes --> C[Let session manager handle]
    B -- No --> D{_server_instances is None?}
    D -- Yes --> C
    D -- No --> E{Method is DELETE?}
    E -- Yes --> F[Best-effort Redis delete_session]
    F --> G[Return 200 success]
    E -- No --> H[Strip mcp-session-id header]
    H --> I[Session manager creates fresh session]
    I --> J[_wrap_send_for_session_capture intercepts response]
    J --> K{Response has mcp-session-id?}
    K -- Yes --> L[Background task: save_session to Redis]
    K -- No --> M[No-op]
    L --> N["Redis SET (never read on inbound)"]
    style N fill:#f96,stroke:#333
    style L fill:#f96,stroke:#333
```
Last reviewed commit: 2e03590
```python
async def _persist_captured_session(
    captured_session_id: Optional[str],
) -> None:
    """Persist a newly-captured MCP session ID to Redis (best-effort)."""
    if captured_session_id and _REDIS_SESSION_STORE:
        try:
            await _REDIS_SESSION_STORE.save_session(
                captured_session_id,
                {"worker_pid": str(os.getpid())},
            )
        except Exception as exc:
            verbose_logger.error(
                "Failed to save MCP session %s to Redis: %s",
                captured_session_id,
                exc,
            )
```
Redis registry is write-only in stateless mode
_persist_captured_session saves session IDs to Redis on every outbound response, but _handle_stale_mcp_session never reads from Redis to influence routing decisions. The inbound path at lines 1989-2036 only checks _server_instances (local memory), and since stateless=True means _server_instances is never populated by the SDK, the session header is always stripped for any request carrying a session ID.
The Redis store is effectively write-only:

- Outbound: `save_session()` writes on every response (lines 2178-2183)
- DELETE: `delete_session()` cleans up (line 2010)
- Inbound: Redis is never consulted for routing
This means Worker B handling a request originally from Worker A will behave identically whether Redis is configured or not — the stale header is always stripped and a fresh session is created. The Redis writes add overhead (one SET per response) with no observable behavioral difference.
If cross-worker awareness is purely for observability/logging, consider documenting that clearly and making the Redis writes conditional on a verbose/debug flag to avoid unnecessary Redis traffic in production.
Context Used: Rule from dashboard - What: Avoid creating new database requests or Router objects in the critical request path.
Why: Cre... (source)
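One way to act on that suggestion is to gate the lookup on the logger's effective level, so production traffic skips the round-trip entirely. A sketch under assumed names (`verbose_logger` here is a plain stdlib logger, and `store` is any object with a `session_exists` coroutine):

```python
import logging

verbose_logger = logging.getLogger("mcp.session.debug")  # illustrative name

async def maybe_log_session_presence(store, session_id: str) -> bool:
    """Only pay the Redis round-trip when DEBUG logging is actually enabled.

    Returns True if the existence check was performed, False if skipped.
    """
    if store is None or not verbose_logger.isEnabledFor(logging.DEBUG):
        # Skipped: the result would only feed a debug log line anyway
        return False
    exists = await store.session_exists(session_id)
    verbose_logger.debug("MCP session %s exists in Redis: %s", session_id, exists)
    return True
```

The same shape works for a config flag instead of a log level; the point is that the network call happens only when its result is observable.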
PR_DESCRIPTION.md (outdated)

```diff
@@ -0,0 +1,146 @@
+## Relevant Issues
```
PR_DESCRIPTION.md should not be committed to the repo
This file duplicates the PR description and doesn't belong in the repository root. It should be removed from the commit — the PR description belongs in the GitHub PR body, not as a tracked file.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
```python
# Wrap `send` to capture the session ID from response headers
wrapped_send, get_captured_id = _wrap_send_for_session_capture(send)

await session_manager.handle_request(scope, receive, wrapped_send)

# Persist the session ID in Redis (background task, non-blocking)
captured_id = get_captured_id()
if captured_id and _REDIS_SESSION_STORE:
    task = asyncio.create_task(
        _persist_captured_session(captured_id)
    )
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
```
Background Redis write fires on every MCP response
_wrap_send_for_session_capture intercepts every outbound response, and if a mcp-session-id header is present, a background task is created to write to Redis. In stateless=True mode, the MCP SDK generates a new session ID on every request (since each creates a fresh StreamableHTTPServerTransport). This means every single MCP response triggers a Redis SET — writing a session ID that will never be looked up on inbound.
Per custom rule 0c2a17ad: "Avoid creating new database requests or Router objects in the critical request path." While this is technically a background task, it still creates a Redis round-trip per request. Consider gating this behind a configuration flag (e.g., MCP_SESSION_REGISTRY_ENABLED) or only writing when the session ID is new (not already seen).
Context Used: Rule from dashboard - What: Avoid creating new database requests or Router objects in the critical request path.
Why: Cre... (source)
tests/mcp_tests/test_multi_worker.py (outdated)
```python
@pytest.mark.asyncio
async def test_full_multi_worker_flow(shared_redis):
    """
    End-to-end scenario:
    1. Worker A handles initialize → captures session ID → saves to Redis
    2. Worker B receives follow-up with that session ID
    3. _handle_stale_mcp_session finds session in Redis → strips header (good)
    4. Worker B creates fresh session (stateless mode)
    """
    handle = _get_handle_stale()
    redis_store = MCPRedisSessionStore(redis_cache=shared_redis, ttl=3600)

    # Step 1: Worker A creates session and saves to Redis
    await redis_store.save_session(
        "abc123",
        {"origin": "worker_a", "worker_pid": "12345"},
    )

    # Step 2-3: Worker B receives request with session ID from Worker A
    scope = _make_scope(session_id="abc123")
    mgr_b = _mock_session_manager(session_ids=[])  # Worker B has no sessions

    with patch(
        "litellm.proxy._experimental.mcp_server.server._REDIS_SESSION_STORE",
        redis_store,
    ):
        result = await handle(scope, AsyncMock(), AsyncMock(), mgr_b)

    # Should return False (not handled) so request continues to session_manager
    assert result is False
    # Header should be stripped for fresh session creation
    header_names = [h[0] for h in scope["headers"]]
    assert b"mcp-session-id" not in header_names

    # Step 4: Session should still exist in Redis
    assert await redis_store.session_exists("abc123") is True
```
"Full multi-worker flow" test proves Redis has no effect
This test demonstrates the core architectural issue: even when Worker A's session is in Redis (line 311), Worker B still strips the header (line 330) and creates a fresh session. The test asserts result is False and b"mcp-session-id" not in header_names — which is exactly the same behavior as without Redis (see test_stale_handler_session_not_found_strips_header).
The test comment says "finds session in Redis → strips header (good)" but this is the same outcome as when the session is not in Redis. The Redis lookup isn't even happening in _handle_stale_mcp_session — the function doesn't call session_exists() on inbound requests. This confirms the Redis store has no behavioral impact in the current implementation.
Force-pushed 2e03590 to 5728c03 (Compare)

@greptileai review
tests/mcp_tests/test_mcp_server.py (outdated)
```python
# Note: send is wrapped by _wrap_send_for_session_capture, so we
# check scope/receive identity and that the third arg is callable
```
Comment references non-existent _wrap_send_for_session_capture function — this function doesn't exist in server.py. The actual code at line 2075 calls session_manager.handle_request(scope, receive, send) directly without any wrapper.
```diff
 # Check in-memory session tracking
 known_sessions = getattr(mgr, "_server_instances", None)
-if known_sessions is None or _session_id in known_sessions:
-    # Session exists or we can't check - let the session manager handle it
+session_exists_in_memory = (
+    known_sessions is not None and _session_id in known_sessions
+)
+
+# If session exists in this worker's memory, let the manager handle it
+if session_exists_in_memory:
+    return False
+
+# Session doesn't exist - handle based on request method
+# If we cannot inspect known_sessions, let the manager handle it
+if known_sessions is None:
+    return False
```
Early return on line 1960 makes the session_exists_in_memory variable on line 1951 redundant. Since you return False if known_sessions is None, you can simplify:
Suggested change. Current:

```python
# Check in-memory session tracking
known_sessions = getattr(mgr, "_server_instances", None)
session_exists_in_memory = (
    known_sessions is not None and _session_id in known_sessions
)

# If session exists in this worker's memory, let the manager handle it
if session_exists_in_memory:
    return False

# Session doesn't exist - handle based on request method
# If we cannot inspect known_sessions, let the manager handle it
if known_sessions is None:
    return False
```

Suggested:

```python
# Check in-memory session tracking
known_sessions = getattr(mgr, "_server_instances", None)
if known_sessions is None:
    # If we cannot inspect known_sessions, let the manager handle it
    return False

# If session exists in this worker's memory, let the manager handle it
if _session_id in known_sessions:
    return False
```
Force-pushed 5728c03 to 1bb2cad (Compare)

@greptileai review
```diff
 # Check in-memory session tracking
 known_sessions = getattr(mgr, "_server_instances", None)
-if known_sessions is None or _session_id in known_sessions:
-    # Session exists or we can't check - let the session manager handle it
+# If we cannot inspect known_sessions, let the manager handle it
+if known_sessions is None:
+    return False
```
Redundant early return makes the `session_exists_in_memory` variable unnecessary

Since you return early on line 1953 when `known_sessions` is None, the variable assigned on line 1951 is never needed.
```diff
 # Check in-memory session tracking
 known_sessions = getattr(mgr, "_server_instances", None)
-if known_sessions is None or _session_id in known_sessions:
-    # Session exists or we can't check - let the session manager handle it
+# If we cannot inspect known_sessions, let the manager handle it
+if known_sessions is None:
+    return False
```
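The suggested control flow can be checked in isolation with a tiny sketch. `should_defer_to_manager` is a hypothetical helper name (returning True means "let the session manager handle the request"); it mirrors the two early returns proposed above:

```python
def should_defer_to_manager(mgr: object, session_id: str) -> bool:
    """Defer to the session manager when the session map is
    uninspectable or the session is already known to this worker."""
    known_sessions = getattr(mgr, "_server_instances", None)
    if known_sessions is None:
        # Cannot inspect sessions on this manager - let it handle the request
        return True
    # A session known locally is also handled by the manager;
    # anything else is stale and the caller strips the header
    return session_id in known_sessions
```

This keeps one check per condition and drops the intermediate boolean entirely.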
CRITICAL: PR description completely misrepresents the changes

The PR description claims this PR adds:

- `MCPRedisSessionStore` class in `redis_session_store.py`
- 34 test cases (23 unit + 11 integration)
- Redis-backed session registry
- `save_session()`, `get_session()`, `session_exists()`, `delete_session()` methods
- Background task pattern with a `_background_tasks` set

None of these exist in the actual commit. The real changes are:

- Minor docstring/comment updates
- Fixed issue reference from #20292 ([Bug]: MCP Server (stdio transport) fails to connect from VSCode: "Session not found" error) to #20992 (MCP sessions are not shared across workers - multi-worker proxy drops MCP connections randomly)
- Refactored early-return logic in `_handle_stale_mcp_session`
- Updated log message wording
This is either the wrong commit for PR #21417, or the PR description was copy-pasted from a different PR entirely. This makes it impossible to properly review because all the previous review comments reference Redis code that doesn't exist here.
@greptileai review
PR description describes Redis-backed session store with MCPRedisSessionStore class, 34 tests, and multiple new files. The actual commit only contains minor refactoring: fixed issue reference (#20292→#20992), improved docstring clarity, split early return logic, and updated log messages. No Redis code, no new files, no 34 tests.
…orker deployments (BerriAI#20992) In a multi-worker Uvicorn setup, a client that reconnects to a different worker sends an mcp-session-id that the new worker has never seen. The MCP SDK returns 400 because the session is unknown. Fix: add _handle_stale_mcp_session() which inspects the inbound mcp-session-id header before the request reaches the SDK. If the session is not in this worker's _server_instances: - Non-DELETE: strip the header so the SDK creates a fresh session - DELETE: return 200 immediately (idempotent, session already gone) No new dependencies, no Redis, no latency added to the hot path. Fixes BerriAI#20992
Force-pushed 1bb2cad to 406c468 (Compare)

@greptileai review
Additional Comments (1)

@greptile re-review the entire PR
```python
import contextlib

```
Spurious blank line splits stdlib import group
This blank line separates contextlib from traceback, both of which are stdlib imports. PEP 8 and isort convention group all stdlib imports together without blank lines between them. This likely needs to be removed to keep the import block clean.
Suggested change:

```python
import contextlib
import traceback
```
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
@gavksingh the PR doesn't seem to do any Redis work - is this intentional? It seems in conflict with your PR title. @jquinter for monitoring
Hey @krrishdholakia - yes, it is fully intentional! Sorry for the confusing PR title history; I've just updated the title to properly reflect the fix. I initially went with a Redis implementation, but realized it wasn't the right approach here, since the proxy runs the MCP endpoint in stateless mode. The 400 error in #20992 happens simply because a reconnecting client hits a new worker, and that new worker rejects the unknown `mcp-session-id` header.
@greptileai review
@greptile re-review and give the score again
@gavksingh was this tested e2e in a multi-instance env?
@shivamrawat1 Yes, just validated it e2e. Spun up two independent LiteLLM proxy containers via docker-compose, different ports, same config, no shared state between them. Each container has its own process and its own in-memory `_server_instances` map.

This works because the endpoint runs in stateless mode (`stateless=True`). Happy to open a follow-up PR with the e2e test files if you want them in the test suite.
Relevant Issues

Fixes #20992

Root Cause

In a multi-worker Uvicorn deployment, a client that reconnects to a different worker sends an `mcp-session-id` that the new worker has never seen. The MCP SDK returns HTTP 400 because the session ID is unknown.

Fix

Added `_handle_stale_mcp_session()` in `litellm/proxy/_experimental/mcp_server/server.py`, which inspects the inbound `mcp-session-id` header before the request reaches the MCP SDK: if the session is unknown to this worker, non-DELETE requests have the header stripped so the SDK creates a fresh session, and DELETE requests return 200 immediately (idempotent; the session is already gone). No new dependencies. No Redis. No hot-path latency.
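The header-stripping half of the fix can be sketched as a standalone ASGI helper. This is an illustration of the technique, not the exact proxy code; `strip_stale_session_header` is an assumed name:

```python
def strip_stale_session_header(scope: dict) -> None:
    """Remove any mcp-session-id header from an ASGI scope in place,
    matching case-insensitively, so the SDK mints a fresh session.

    ASGI headers are (name, value) byte-string pairs; names are
    conventionally lowercase but the filter tolerates any casing.
    """
    scope["headers"] = [
        (name, value)
        for name, value in scope.get("headers", [])
        if name.lower() != b"mcp-session-id"
    ]
```

Because the scope is mutated before the session manager sees it, the SDK behaves exactly as if the client had never sent a session ID.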
Changes

Modified Files

- `litellm/proxy/_experimental/mcp_server/server.py`: adds `_handle_stale_mcp_session()`, which inspects and strips stale `mcp-session-id` headers; called from `handle_streamable_http_mcp()` before delegating to the session manager
- `tests/mcp_tests/test_mcp_server.py`: updates `test_streamable_http_mcp_handler_mock` and related tests covering this behavior

Testing