diff --git a/CHANGELOG.md b/CHANGELOG.md index a64c111..50a71e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,40 @@ and this project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.ht ## [Unreleased] +## [0.21.0] - 2026-05-19 + +**Theme: MCP-aware proxy.** Adds +`vaara.integrations.mcp_proxy.VaaraMCPProxy`, a transparent MCP proxy +that sits between an MCP client (Claude Code, Cursor, any MCP-capable +host) and an upstream MCP server (SAP ADT MCP, SAP Graph API MCP, SAP +Cloud ALM MCP, any community-built MCP server). Every `tools/call` +request from the client routes through Vaara's interception pipeline +before reaching the upstream. Allowed calls forward transparently and +report the upstream outcome back to the scorer. Blocked calls return +an MCP `isError: true` response with the block reason. Other MCP +methods (initialize, tools/list, resources, notifications) forward +unchanged. + +### Added +- `src/vaara/integrations/mcp_proxy.py`: `VaaraMCPProxy` and CLI entry + point. Invoke as `python -m vaara.integrations.mcp_proxy --upstream + [--upstream-arg ...]`. +- `src/vaara/integrations/_mcp_upstream.py`: `UpstreamMCPClient`, + subprocess lifecycle plus JSON-RPC id demultiplexing on a background + reader thread. Internal module, not part of the public surface. +- `tests/test_integrations_mcp_proxy.py`: six smoke tests covering + blocked tool calls, allowed forward, severity mapping, the + `_vaara_agent_id` strip, non-tools/call passthrough, and invalid + request handling. + +### Strategic frame +The community SAP MCP servers shipped at SAP Sapphire 2026 plus the +Anthropic-SAP partnership announcement put SAP ABAP / Graph / Cloud +ALM behind Claude Code in enterprise developer workflows. None of the +parties (SAP, Anthropic, the community MCP server authors) ships the +runtime governance layer the EU AI Act high-risk obligations require +for those tool calls. The proxy is that layer in OSS today. + ## [0.20.0] - 2026-05-18 **Theme: OSS guardrail adapters.** Adds four adapters that take findings diff --git a/README.md b/README.md index 9100fdd..2533fc2 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,6 @@

PyPI - Python License CI OpenSSF Scorecard @@ -113,6 +112,16 @@ Four adapters route findings from NVIDIA NeMo Guardrails, Guardrails AI, LLM Gua Each adapter returns a `ContentSafetyFinding` the deployer routes into `pipeline.intercept(context=finding.to_audit_context())`. OSS SDKs are optional extras: `pip install 'vaara[nemo-guardrails]'`, `pip install 'vaara[guardrails-ai]'`, `pip install 'vaara[llm-guard]'`, `pip install 'vaara[rebuff]'`. The 41 new mapping rows extend the same table at `src/vaara/integrations/_content_safety_articles.py`. Article-level rationale is in [COMPLIANCE.md](COMPLIANCE.md#oss-guardrail-adapter-pattern). +### MCP proxy (Vaara as a transparent governance layer) + +`vaara.integrations.mcp_proxy.VaaraMCPProxy` sits between an MCP client (Claude Code, Cursor, any MCP-capable host) and an upstream MCP server (SAP ADT MCP, SAP Graph API MCP, SAP Cloud ALM MCP, any community-built MCP server). Every `tools/call` request from the client routes through Vaara's interception pipeline before reaching the upstream. Allowed calls forward transparently and report the upstream outcome back to the scorer. Blocked calls return an MCP `isError: true` response with the block reason. Other MCP methods (initialize, tools/list, resources, notifications) forward unchanged. + +```bash +python -m vaara.integrations.mcp_proxy --upstream npx --upstream-arg @sap/adt-mcp-server +``` + +Point your MCP client at the proxy instead of the upstream. The audit chain captures every tool call without changing client or upstream behavior. Distinct from `mcp_server`, which exposes Vaara itself as an MCP server for agents that consult Vaara as a tool. + ## HTTP API The same scorer and audit trail are available over HTTP for non-Python agents and for control planes that prefer a network boundary. Install with the `server` extra: diff --git a/clients/ts/package.json b/clients/ts/package.json index 5e25103..fae1c02 100644 --- a/clients/ts/package.json +++ b/clients/ts/package.json @@ -1,6 +1,6 @@ { "name": "@vaara/client", - "version": "0.20.0", + "version": "0.21.0", "description": "TypeScript client for the Vaara HTTP API. Conformal risk scoring, hash-chained audit, policy reload, named detectors.", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/pyproject.toml b/pyproject.toml index 37e749a..9de9544 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "vaara" -version = "0.20.0" +version = "0.21.0" description = "Adaptive AI Agent Execution Layer for risk scoring, audit trails, and regulatory compliance" requires-python = ">=3.10" license = "Apache-2.0" diff --git a/src/vaara/__init__.py b/src/vaara/__init__.py index 8da441b..8577808 100644 --- a/src/vaara/__init__.py +++ b/src/vaara/__init__.py @@ -6,7 +6,7 @@ oversight. """ -__version__ = "0.20.0" +__version__ = "0.21.0" from vaara.pipeline import InterceptionPipeline, InterceptionResult diff --git a/src/vaara/integrations/_mcp_upstream.py b/src/vaara/integrations/_mcp_upstream.py new file mode 100644 index 0000000..da44d87 --- /dev/null +++ b/src/vaara/integrations/_mcp_upstream.py @@ -0,0 +1,187 @@ +"""Upstream MCP subprocess client for the proxy. + +Owns the subprocess lifecycle of an upstream MCP server, demuxes responses +by JSON-RPC id, and routes notifications to a callback for the proxy to +forward downstream. + +Internal module. Public surface is :class:`vaara.integrations.mcp_proxy.VaaraMCPProxy`. +""" + +from __future__ import annotations + +import json +import logging +import os +import subprocess +import sys +import threading +from dataclasses import dataclass +from typing import Any, Callable, Optional + +logger = logging.getLogger(__name__) + + +class ProxyError(Exception): + """The proxy itself cannot serve a request. + + Distinct from upstream-emitted JSON-RPC errors, which are forwarded + verbatim. ProxyError is raised when the proxy-side machinery fails + (upstream subprocess crashed, stdin write failed, response timeout) + and the caller should surface JSON-RPC -32603 Internal error downstream. + """ + + +def strict_json_dumps(obj: Any, **kwargs: Any) -> str: + """JSON dump that fails on NaN/Infinity (RFC 8259 strict). + + Python's default ``json.dumps`` emits ``NaN``/``Infinity``/``-Infinity`` + literals that strict JSON parsers (Go, Rust, browsers, many MCP clients) + reject. Forcing strict output surfaces escaped non-finite values loudly + in tests rather than silently emitting invalid wire format. + """ + return json.dumps(obj, allow_nan=False, **kwargs) + + +@dataclass +class _UpstreamRequest: + id: Any + event: threading.Event + response: Optional[dict] = None + + +class UpstreamMCPClient: + """Spawn an upstream MCP server and communicate over its stdio. + + The reader runs on a background thread that parks responses keyed by + JSON-RPC id and routes notifications to ``on_notification``. The main + thread synchronously calls :meth:`request` and waits. + """ + + def __init__( + self, + command: list[str], + env: Optional[dict[str, str]] = None, + on_notification: Optional[Callable[[dict], None]] = None, + ) -> None: + self._on_notification = on_notification + self._pending: dict[Any, _UpstreamRequest] = {} + self._lock = threading.Lock() + self._closed = False + + # stderr passes through so upstream logs surface in the proxy's + # stderr without contaminating the JSON-RPC channel on stdout. + self._proc = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=sys.stderr, + env=env or os.environ.copy(), + bufsize=1, + text=True, + ) + + self._reader_thread = threading.Thread( + target=self._read_loop, daemon=True, name="upstream-reader", + ) + self._reader_thread.start() + + def request(self, payload: dict, timeout: float = 30.0) -> dict: + """Send a request, wait for the matching response by id. + + Raises :class:`ProxyError` if the upstream has died or the response + does not arrive within ``timeout``. + """ + if self._closed: + raise ProxyError("Upstream MCP server is closed") + if "id" not in payload: + raise ValueError("request() requires a JSON-RPC id; use notify() for notifications") + + pending = _UpstreamRequest(id=payload["id"], event=threading.Event()) + with self._lock: + self._pending[payload["id"]] = pending + try: + self._write(payload) + if not pending.event.wait(timeout=timeout): + raise ProxyError(f"Upstream MCP server did not respond within {timeout}s") + # event was set but response stays None when the reader thread + # exited (upstream closed stdout) and woke us as a shutdown signal. + # An assert would either raise AssertionError (escapes the caller's + # ProxyError handler) or be optimized out under -O (return None, + # silently breaking the contract). Raise ProxyError explicitly. + if pending.response is None: + raise ProxyError("Upstream MCP server closed before responding") + if not isinstance(pending.response, dict): + raise ProxyError("Upstream MCP server returned non-object JSON-RPC") + return pending.response + finally: + with self._lock: + self._pending.pop(payload["id"], None) + + def notify(self, payload: dict) -> None: + """Send a JSON-RPC notification (no response expected).""" + if self._closed: + return + self._write(payload) + + def _write(self, payload: dict) -> None: + if self._proc.stdin is None: + raise ProxyError("Upstream MCP server stdin is closed") + try: + self._proc.stdin.write(strict_json_dumps(payload) + "\n") + self._proc.stdin.flush() + except (BrokenPipeError, OSError) as e: + raise ProxyError(f"Upstream MCP server stdin write failed: {e}") from e + + def _read_loop(self) -> None: + if self._proc.stdout is None: + return + for line in self._proc.stdout: + line = line.strip() + if not line: + continue + try: + message = json.loads(line) + except json.JSONDecodeError: + logger.warning("Upstream emitted non-JSON line: %r", line[:200]) + continue + + # Notifications (no id) route to the callback for downstream forward. + if isinstance(message, dict) and "id" not in message: + if self._on_notification is not None: + try: + self._on_notification(message) + except Exception: + logger.exception("Notification handler raised") + continue + + # Responses demux by id. + response_id = message.get("id") if isinstance(message, dict) else None + with self._lock: + pending = self._pending.get(response_id) + if pending is None: + logger.warning("Upstream response for unknown id %r", response_id) + continue + pending.response = message + pending.event.set() + + # Reader exited: upstream closed stdout. Wake all waiters so they + # fail with ProxyError rather than hanging forever. + self._closed = True + with self._lock: + for pending in self._pending.values(): + pending.event.set() + + def close(self) -> None: + self._closed = True + try: + if self._proc.stdin is not None: + self._proc.stdin.close() + except Exception: + pass + try: + self._proc.terminate() + self._proc.wait(timeout=5) + except subprocess.TimeoutExpired: + self._proc.kill() + except Exception: + pass diff --git a/src/vaara/integrations/mcp_proxy.py b/src/vaara/integrations/mcp_proxy.py new file mode 100644 index 0000000..a511abf --- /dev/null +++ b/src/vaara/integrations/mcp_proxy.py @@ -0,0 +1,196 @@ +"""MCP proxy: Vaara as a transparent runtime governance layer for MCP tool calls. + +Sits between an MCP client (Claude Code, Cursor, any MCP-capable host) and an +upstream MCP server (SAP ADT MCP, SAP Graph API MCP, SAP Cloud ALM MCP, any +community-built MCP server). Forwards every request to the upstream, but +routes ``tools/call`` through Vaara's interception pipeline first. Allowed +calls flow through transparently. Blocked calls return an MCP tool error. +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import sys +import threading +from pathlib import Path +from typing import Any, Optional + +from vaara import __version__ as _VAARA_VERSION +from vaara.audit.sqlite_backend import SQLiteAuditBackend +from vaara.audit.trail import AuditTrail +from vaara.integrations._mcp_upstream import ( + ProxyError, UpstreamMCPClient, strict_json_dumps, +) +from vaara.pipeline import InterceptionPipeline + +logger = logging.getLogger(__name__) + + +class VaaraMCPProxy: + """Transparent MCP proxy with Vaara interception on tool calls.""" + + PROXY_NAME = f"vaara-mcp-proxy/{_VAARA_VERSION}" + + def __init__( + self, + upstream_command: list[str], + pipeline: Optional[InterceptionPipeline] = None, + db_path: Optional[Path] = None, + agent_id_default: str = "mcp-proxy-client", + ) -> None: + if pipeline is not None: + self._pipeline = pipeline + self._backend = None + else: + db = db_path or Path(os.environ.get("VAARA_DB", "vaara_audit.db")) + self._backend = SQLiteAuditBackend(db) + trail = AuditTrail(on_record=self._backend.write_record) + self._pipeline = InterceptionPipeline(trail=trail) + self._agent_id_default = agent_id_default + self._stdout_lock = threading.Lock() + self._upstream = UpstreamMCPClient( + command=upstream_command, + on_notification=self._forward_notification_to_client, + ) + + def run(self) -> None: + """Read JSON-RPC from stdin, write to stdout, route through upstream.""" + logger.info("Vaara MCP proxy starting on stdio (%s)", self.PROXY_NAME) + for line in sys.stdin: + line = line.strip() + if not line: + continue + try: + request = json.loads(line) + except json.JSONDecodeError: + self._write_to_client(self._error_response(None, -32700, "Parse error")) + continue + # Notifications (no id) forward silently per JSON-RPC 2.0 §4.1. + if isinstance(request, dict) and "id" not in request: + try: + self._upstream.notify(request) + except ProxyError: + logger.exception("Failed to forward notification") + continue + self._write_to_client(self._handle_request(request)) + + def _handle_request(self, request: Any) -> dict: + if not isinstance(request, dict): + return self._error_response(None, -32600, "Invalid Request: not a JSON object") + method = request.get("method", "") + req_id = request.get("id") + if method == "tools/call": + try: + return self._handle_tools_call(request) + except ProxyError as e: + return self._error_response(req_id, -32603, str(e)) + except Exception: + logger.exception("Error in tools/call interception") + return self._error_response(req_id, -32603, "Internal proxy error") + try: + return self._upstream.request(request) + except ProxyError as e: + return self._error_response(req_id, -32603, f"Upstream unavailable: {e}") + + def _handle_tools_call(self, request: dict) -> dict: + params = request.get("params") or {} + if not isinstance(params, dict): + params = {} + tool_name = params.get("name", "") + arguments = params.get("arguments", {}) or {} + if not isinstance(arguments, dict): + arguments = {} + # _vaara_agent_id is a proxy-side override for audit attribution; + # strip before forwarding so the upstream never sees Vaara metadata. + agent_id = arguments.pop("_vaara_agent_id", self._agent_id_default) + if not isinstance(agent_id, str): + agent_id = self._agent_id_default + # Unknown upstream tool names classify as generic high-risk in the + # registry (fail-closed). Correct default for runtime governance. + result = self._pipeline.intercept( + agent_id=agent_id, tool_name=tool_name, parameters=arguments, + ) + if not result.allowed: + block_payload = { + "vaara_blocked": True, + "reason": getattr(result, "reason", None) or "Blocked by Vaara policy", + "decision": getattr(result, "decision", None), + "action_id": getattr(result, "action_id", None), + } + return { + "jsonrpc": "2.0", "id": request.get("id"), + "result": { + "content": [{"type": "text", "text": strict_json_dumps(block_payload, indent=2)}], + "isError": True, + }, + } + upstream_response = self._upstream.request(request) + outcome_severity = self._severity_from_response(upstream_response) + try: + self._pipeline.report_outcome( + action_id=result.action_id, outcome_severity=outcome_severity, + ) + except Exception: + logger.exception("report_outcome failed for action_id=%s", result.action_id) + return upstream_response + + @staticmethod + def _severity_from_response(response: dict) -> float: + # Protocol/tool errors → 1.0 (failure signal). Clean success → 0.0. + if not isinstance(response, dict) or "error" in response: + return 1.0 + result = response.get("result") + if isinstance(result, dict) and result.get("isError"): + return 1.0 + return 0.0 + + def _forward_notification_to_client(self, message: dict) -> None: + self._write_to_client(message) + + def _write_to_client(self, payload: dict) -> None: + # Serialize stdout writes between main loop and upstream reader thread. + with self._stdout_lock: + sys.stdout.write(strict_json_dumps(payload) + "\n") + sys.stdout.flush() + + @staticmethod + def _error_response(req_id: Any, code: int, message: str) -> dict: + return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}} + + def close(self) -> None: + self._upstream.close() + if self._backend is not None: + self._backend.close() + + +def main(argv: Optional[list[str]] = None) -> None: + parser = argparse.ArgumentParser( + prog="vaara-mcp-proxy", + description="Vaara runtime governance proxy in front of an upstream MCP server.", + ) + parser.add_argument("--upstream", required=True, help="Upstream MCP server command") + parser.add_argument("--upstream-arg", action="append", default=[], dest="upstream_args", + help="Argument to pass to the upstream command (repeatable)") + parser.add_argument("--db", type=Path, default=None, + help="Audit database path (default: $VAARA_DB or ./vaara_audit.db)") + parser.add_argument("--agent-id", default="mcp-proxy-client", + help="Default agent_id for the audit trail") + args = parser.parse_args(argv) + logging.basicConfig(level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s", + stream=sys.stderr) + proxy = VaaraMCPProxy( + upstream_command=[args.upstream, *args.upstream_args], + db_path=args.db, agent_id_default=args.agent_id, + ) + try: + proxy.run() + finally: + proxy.close() + + +if __name__ == "__main__": + main() diff --git a/tests/test_integrations_mcp_proxy.py b/tests/test_integrations_mcp_proxy.py new file mode 100644 index 0000000..54e9f89 --- /dev/null +++ b/tests/test_integrations_mcp_proxy.py @@ -0,0 +1,169 @@ +"""Tests for vaara.integrations.mcp_proxy. + +Smoke tests covering the tools/call interception path with a mocked upstream +and a controlled pipeline. The subprocess machinery in ``_mcp_upstream`` has +its own integration shape and is covered separately when a real upstream +command is available locally. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from unittest.mock import MagicMock + +import pytest + + +@dataclass +class _StubInterceptResult: + allowed: bool + action_id: str = "stub-action-id" + reason: str = "" + decision: str = "ALLOW" + + +@pytest.fixture +def proxy(monkeypatch): + """A VaaraMCPProxy with both UpstreamMCPClient and pipeline mocked. + + The proxy normally spawns a subprocess in __init__. We patch that out so + tests can exercise the request-dispatch and interception logic in + isolation, without any real MCP server on disk. + """ + from vaara.integrations import mcp_proxy + + monkeypatch.setattr(mcp_proxy, "UpstreamMCPClient", MagicMock()) + pipeline = MagicMock() + p = mcp_proxy.VaaraMCPProxy( + upstream_command=["echo"], + pipeline=pipeline, + ) + p._upstream = MagicMock() + return p, pipeline + + +def test_blocked_tool_call_returns_mcp_tool_error(proxy): + p, pipeline = proxy + pipeline.intercept.return_value = _StubInterceptResult( + allowed=False, reason="risk too high", decision="DENY", action_id="abc-123", + ) + request = { + "jsonrpc": "2.0", "id": 7, "method": "tools/call", + "params": {"name": "sap.abap.write", "arguments": {"path": "/etc/something"}}, + } + response = p._handle_tools_call(request) + assert response["jsonrpc"] == "2.0" + assert response["id"] == 7 + assert response["result"]["isError"] is True + text = response["result"]["content"][0]["text"] + assert "risk too high" in text + assert "vaara_blocked" in text + p._upstream.request.assert_not_called() + pipeline.report_outcome.assert_not_called() + + +def test_allowed_tool_call_forwards_and_reports_outcome(proxy): + p, pipeline = proxy + pipeline.intercept.return_value = _StubInterceptResult(allowed=True, action_id="xyz-999") + upstream_response = { + "jsonrpc": "2.0", "id": 9, + "result": {"content": [{"type": "text", "text": "ok"}]}, + } + p._upstream.request.return_value = upstream_response + + request = { + "jsonrpc": "2.0", "id": 9, "method": "tools/call", + "params": {"name": "sap.adt.read", "arguments": {"object": "ZCL_X"}}, + } + response = p._handle_tools_call(request) + assert response is upstream_response + p._upstream.request.assert_called_once_with(request) + pipeline.report_outcome.assert_called_once() + kwargs = pipeline.report_outcome.call_args.kwargs + assert kwargs["action_id"] == "xyz-999" + assert kwargs["outcome_severity"] == 0.0 + + +def test_upstream_error_response_yields_high_severity(proxy): + p, pipeline = proxy + pipeline.intercept.return_value = _StubInterceptResult(allowed=True, action_id="err-1") + p._upstream.request.return_value = { + "jsonrpc": "2.0", "id": 1, "error": {"code": -32000, "message": "upstream tool blew up"}, + } + request = { + "jsonrpc": "2.0", "id": 1, "method": "tools/call", + "params": {"name": "sap.adt.write", "arguments": {}}, + } + p._handle_tools_call(request) + assert pipeline.report_outcome.call_args.kwargs["outcome_severity"] == 1.0 + + +def test_vaara_agent_id_override_is_stripped_before_forward(proxy): + p, pipeline = proxy + pipeline.intercept.return_value = _StubInterceptResult(allowed=True, action_id="strip-1") + p._upstream.request.return_value = {"jsonrpc": "2.0", "id": 2, "result": {}} + + request = { + "jsonrpc": "2.0", "id": 2, "method": "tools/call", + "params": { + "name": "sap.adt.read", + "arguments": {"_vaara_agent_id": "agent-007", "object": "ZCL_X"}, + }, + } + p._handle_tools_call(request) + intercept_kwargs = pipeline.intercept.call_args.kwargs + assert intercept_kwargs["agent_id"] == "agent-007" + # Forwarded arguments must not leak the Vaara-internal key. + assert "_vaara_agent_id" not in intercept_kwargs["parameters"] + forwarded = p._upstream.request.call_args.args[0] + assert "_vaara_agent_id" not in forwarded["params"]["arguments"] + + +def test_non_tools_call_forwards_verbatim(proxy): + p, pipeline = proxy + p._upstream.request.return_value = { + "jsonrpc": "2.0", "id": 3, "result": {"tools": [{"name": "sap.adt.read"}]}, + } + request = {"jsonrpc": "2.0", "id": 3, "method": "tools/list"} + response = p._handle_request(request) + p._upstream.request.assert_called_once_with(request) + pipeline.intercept.assert_not_called() + assert response["result"]["tools"][0]["name"] == "sap.adt.read" + + +def test_invalid_request_returns_minus_32600(proxy): + p, _ = proxy + response = p._handle_request("not a dict") + assert response["error"]["code"] == -32600 + + +def test_upstream_request_raises_proxy_error_when_reader_exits_without_response(monkeypatch): + """Regression: reader-thread exit during request must raise ProxyError, not AssertionError. + + The reader sets pending.event on exit (so waiters do not hang) but does + not populate pending.response. Without an explicit ProxyError the + request() path's assertion either raises AssertionError (escapes the + caller's ProxyError handler) or is optimized out under python -O + (returns None silently). Flagged by CodeRabbit on PR #100. + """ + from vaara.integrations import _mcp_upstream as up + + fake_proc = MagicMock() + fake_proc.stdin = MagicMock() + fake_proc.stdout = None + monkeypatch.setattr(up.subprocess, "Popen", lambda *a, **k: fake_proc) + + client = up.UpstreamMCPClient(command=["dummy"]) + + real_request_cls = up._UpstreamRequest + + def pre_signaled(*args, **kwargs): + r = real_request_cls(*args, **kwargs) + r.event.set() # waiter unblocks immediately with response still None + return r + + monkeypatch.setattr(up, "_UpstreamRequest", pre_signaled) + + with pytest.raises(up.ProxyError, match="closed before responding"): + client.request({"jsonrpc": "2.0", "id": 1, "method": "ping"}) + client.close()