-
Notifications
You must be signed in to change notification settings - Fork 1
feat: v0.21.0 MCP proxy (transparent governance for upstream MCP servers) #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
e281135
docs: drop Python versions badge from README header
vaaraio 235054d
feat: v0.21.0 MCP proxy (transparent governance for upstream MCP serv…
vaaraio f4fa961
fix: drop unused typing.Any import in mcp_proxy tests
vaaraio 8d479cd
fix(mcp_proxy): raise ProxyError on upstream shutdown instead of assert
vaaraio File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,187 @@ | ||
| """Upstream MCP subprocess client for the proxy. | ||
|
|
||
| Owns the subprocess lifecycle of an upstream MCP server, demuxes responses | ||
| by JSON-RPC id, and routes notifications to a callback for the proxy to | ||
| forward downstream. | ||
|
|
||
| Internal module. Public surface is :class:`vaara.integrations.mcp_proxy.VaaraMCPProxy`. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import logging | ||
| import os | ||
| import subprocess | ||
| import sys | ||
| import threading | ||
| from dataclasses import dataclass | ||
| from typing import Any, Callable, Optional | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class ProxyError(Exception): | ||
| """The proxy itself cannot serve a request. | ||
|
|
||
| Distinct from upstream-emitted JSON-RPC errors, which are forwarded | ||
| verbatim. ProxyError is raised when the proxy-side machinery fails | ||
| (upstream subprocess crashed, stdin write failed, response timeout) | ||
| and the caller should surface JSON-RPC -32603 Internal error downstream. | ||
| """ | ||
|
|
||
|
|
||
| def strict_json_dumps(obj: Any, **kwargs: Any) -> str: | ||
| """JSON dump that fails on NaN/Infinity (RFC 8259 strict). | ||
|
|
||
| Python's default ``json.dumps`` emits ``NaN``/``Infinity``/``-Infinity`` | ||
| literals that strict JSON parsers (Go, Rust, browsers, many MCP clients) | ||
| reject. Forcing strict output surfaces escaped non-finite values loudly | ||
| in tests rather than silently emitting invalid wire format. | ||
| """ | ||
| return json.dumps(obj, allow_nan=False, **kwargs) | ||
|
|
||
|
|
||
| @dataclass | ||
| class _UpstreamRequest: | ||
| id: Any | ||
| event: threading.Event | ||
| response: Optional[dict] = None | ||
|
|
||
|
|
||
| class UpstreamMCPClient: | ||
| """Spawn an upstream MCP server and communicate over its stdio. | ||
|
|
||
| The reader runs on a background thread that parks responses keyed by | ||
| JSON-RPC id and routes notifications to ``on_notification``. The main | ||
| thread synchronously calls :meth:`request` and waits. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| command: list[str], | ||
| env: Optional[dict[str, str]] = None, | ||
| on_notification: Optional[Callable[[dict], None]] = None, | ||
| ) -> None: | ||
| self._on_notification = on_notification | ||
| self._pending: dict[Any, _UpstreamRequest] = {} | ||
| self._lock = threading.Lock() | ||
| self._closed = False | ||
|
|
||
| # stderr passes through so upstream logs surface in the proxy's | ||
| # stderr without contaminating the JSON-RPC channel on stdout. | ||
| self._proc = subprocess.Popen( | ||
| command, | ||
| stdin=subprocess.PIPE, | ||
| stdout=subprocess.PIPE, | ||
| stderr=sys.stderr, | ||
| env=env or os.environ.copy(), | ||
| bufsize=1, | ||
| text=True, | ||
| ) | ||
|
|
||
| self._reader_thread = threading.Thread( | ||
| target=self._read_loop, daemon=True, name="upstream-reader", | ||
| ) | ||
| self._reader_thread.start() | ||
|
|
||
| def request(self, payload: dict, timeout: float = 30.0) -> dict: | ||
| """Send a request, wait for the matching response by id. | ||
|
|
||
| Raises :class:`ProxyError` if the upstream has died or the response | ||
| does not arrive within ``timeout``. | ||
| """ | ||
| if self._closed: | ||
| raise ProxyError("Upstream MCP server is closed") | ||
| if "id" not in payload: | ||
| raise ValueError("request() requires a JSON-RPC id; use notify() for notifications") | ||
|
|
||
| pending = _UpstreamRequest(id=payload["id"], event=threading.Event()) | ||
| with self._lock: | ||
| self._pending[payload["id"]] = pending | ||
| try: | ||
| self._write(payload) | ||
| if not pending.event.wait(timeout=timeout): | ||
| raise ProxyError(f"Upstream MCP server did not respond within {timeout}s") | ||
| # event was set but response stays None when the reader thread | ||
| # exited (upstream closed stdout) and woke us as a shutdown signal. | ||
| # An assert would either raise AssertionError (escapes the caller's | ||
| # ProxyError handler) or be optimized out under -O (return None, | ||
| # silently breaking the contract). Raise ProxyError explicitly. | ||
| if pending.response is None: | ||
| raise ProxyError("Upstream MCP server closed before responding") | ||
| if not isinstance(pending.response, dict): | ||
| raise ProxyError("Upstream MCP server returned non-object JSON-RPC") | ||
| return pending.response | ||
| finally: | ||
| with self._lock: | ||
| self._pending.pop(payload["id"], None) | ||
|
|
||
| def notify(self, payload: dict) -> None: | ||
| """Send a JSON-RPC notification (no response expected).""" | ||
| if self._closed: | ||
| return | ||
| self._write(payload) | ||
|
|
||
| def _write(self, payload: dict) -> None: | ||
| if self._proc.stdin is None: | ||
| raise ProxyError("Upstream MCP server stdin is closed") | ||
| try: | ||
| self._proc.stdin.write(strict_json_dumps(payload) + "\n") | ||
| self._proc.stdin.flush() | ||
| except (BrokenPipeError, OSError) as e: | ||
| raise ProxyError(f"Upstream MCP server stdin write failed: {e}") from e | ||
|
|
||
| def _read_loop(self) -> None: | ||
| if self._proc.stdout is None: | ||
| return | ||
| for line in self._proc.stdout: | ||
| line = line.strip() | ||
| if not line: | ||
| continue | ||
| try: | ||
| message = json.loads(line) | ||
| except json.JSONDecodeError: | ||
| logger.warning("Upstream emitted non-JSON line: %r", line[:200]) | ||
| continue | ||
|
|
||
| # Notifications (no id) route to the callback for downstream forward. | ||
| if isinstance(message, dict) and "id" not in message: | ||
| if self._on_notification is not None: | ||
| try: | ||
| self._on_notification(message) | ||
| except Exception: | ||
| logger.exception("Notification handler raised") | ||
| continue | ||
|
|
||
| # Responses demux by id. | ||
| response_id = message.get("id") if isinstance(message, dict) else None | ||
| with self._lock: | ||
| pending = self._pending.get(response_id) | ||
| if pending is None: | ||
| logger.warning("Upstream response for unknown id %r", response_id) | ||
| continue | ||
| pending.response = message | ||
| pending.event.set() | ||
|
|
||
| # Reader exited: upstream closed stdout. Wake all waiters so they | ||
| # fail with ProxyError rather than hanging forever. | ||
| self._closed = True | ||
| with self._lock: | ||
| for pending in self._pending.values(): | ||
| pending.event.set() | ||
|
|
||
| def close(self) -> None: | ||
| self._closed = True | ||
| try: | ||
| if self._proc.stdin is not None: | ||
| self._proc.stdin.close() | ||
| except Exception: | ||
Check noticeCode scanning / CodeQL Empty except Note
'except' clause does nothing but pass and there is no explanatory comment.
|
||
|
|
||
| pass | ||
| try: | ||
| self._proc.terminate() | ||
| self._proc.wait(timeout=5) | ||
| except subprocess.TimeoutExpired: | ||
| self._proc.kill() | ||
| except Exception: | ||
Check noticeCode scanning / CodeQL Empty except Note
'except' clause does nothing but pass and there is no explanatory comment.
|
||
|
|
||
| pass | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return
ProxyErroron upstream shutdown instead of asserting.If the reader thread exits, it wakes pending requests without a response. This path hits the
assertat Line 106, which can raiseAssertionError(or be removed with-O) and escape caller error handling. Convert this to an explicitProxyErrorwhen no response is available.💡 Proposed fix
try: self._write(payload) if not pending.event.wait(timeout=timeout): raise ProxyError(f"Upstream MCP server did not respond within {timeout}s") - assert pending.response is not None - return pending.response + if pending.response is None: + raise ProxyError("Upstream MCP server closed before responding") + if not isinstance(pending.response, dict): + raise ProxyError("Upstream MCP server returned non-object JSON-RPC") + return pending.response finally: with self._lock: self._pending.pop(payload["id"], None)Also applies to: 159-164
🤖 Prompt for AI Agents