diff --git a/.gitignore b/.gitignore index f826704..dbd3812 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,34 @@ claude-code-audit.db .pr_body_*.md .issue_body_*.md .comment_body_*.md + +# Private scratch drafts (replies, research, proposals, BD) — never publish +.recruiter_* +.reply_* +.research_* +.proposal_* +.tier1_* +.brand_book.md + +# One-off ops/deploy scratch scripts and payloads — never publish +.apply_*.sh +.fix_*.sh +.deploy_*.sh +.restart_*.sh +.style_*.sh +.pr_create_*.sh +.pr_comment_*.md +.tag_payload.json +.gen_evidence_pair.py +.tmp_*.py + +# Stray shell/editor env dotfiles (not part of the repo) +.bashrc +.bash_profile +.zshrc +.zprofile +.profile +.gitconfig +.ripgreprc +.idea/ +.vscode/ diff --git a/CHANGELOG.md b/CHANGELOG.md index fc90de8..68bc16d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,57 @@ and this project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.ht ## [Unreleased] +## [0.45.1] - 2026-05-30 + +**Theme: audit-finding fixes on the remote HTTP connector, the HTTP transport, and the public numbers.** + +### Security +- SSRF egress floor on the `--upstream-url` connector. The remote HTTP connector + handed a user-supplied upstream URL straight to `urllib` and followed + redirects with the static `Authorization` header attached, so a hostile or + compromised upstream (or an attacker controlling a redirect target) could aim + the proxy at the cloud instance-metadata service or an internal host and have + it fetch the target with the operator's bearer token. The new `_egress_guard` + resolves the host and refuses loopback, link-local, RFC1918, IPv6 ULA, and the + cloud-metadata address (including its dotless and IPv4-mapped encodings) before + any socket opens; a guarded opener caps redirects, re-applies the floor to each + hop, and drops the auth header on a cross-origin redirect. Default is SAFE; a + trusted internal upstream is opted in via `--allow-private-upstream-hosts`, + the `allow_private_hosts` constructor arg, or the + `VAARA_MCP_ALLOW_PRIVATE_UPSTREAM` env flag. The metadata address stays refused + even with the opt-in. +- DNS-rebind closure on that egress floor. Resolving the host and then handing + the name back to `urllib` left a gap: `urllib` re-resolved at socket-connect, + so a name that answered with a public address at the check and a blocked one a + moment later (a time-split rebind) reached the blocked target with the auth + header attached. The connector now validates and pins the address at connect + time and dials the IP literal, so the address that passed the floor is the + exact address the socket reaches; HTTPS still verifies the certificate against + the original hostname. The pin is re-applied on every redirect hop. An absent + `--allow-private-upstream-hosts` flag now leaves the + `VAARA_MCP_ALLOW_PRIVATE_UPSTREAM` env opt-in live instead of silently + shadowing it with a `False`. + +### Fixed +- HTTP transport no longer serialises concurrent requests. The POST `/mcp` + endpoint ran the blocking `_handle_request` inline on the event loop, so one + slow upstream stalled every other POST, SSE drain, and `/health` (real + concurrency 1). It now runs on a worker thread via `asyncio.to_thread`, with + the per-request ContextVars preserved across the hop through + `contextvars.copy_context()`. +- SSE reconnect race that dropped notifications for the live session. On + reconnect under the same `Mcp-Session-Id`, the old stream's teardown + unregistered the NEW session. `unregister_session` is now identity-checked and + only removes the entry when it is still the tearing-down stream's own state. +- README mislabelled the rule-scorer latency as classifier latency. The + 140 µs / 210 µs figure is the hot-path rule scorer; the MiniLM classifier is + opt-in (`vaara[ml]`) and not in that path. Also surfaces the cross-model + held-out recall (66.8%) and its weakest sub-cell (38.9%) the bench docs + already disclose. +- `llms.txt` advertised a two-generations-stale classifier (5,955-entry corpus, + 97.1% at threshold 0.55). Regenerated from the current v9 numbers and switched + the lede to the tamper-evident runtime evidence framing. + ## [0.45.0] - 2026-05-30 **Theme: reach remote MCP upstreams over HTTP, and make the proxy's Streamable HTTP handling conform to the spec.** diff --git a/README.md b/README.md index 61afcfc..e6ab3a4 100644 --- a/README.md +++ b/README.md @@ -20,14 +20,15 @@ Vaara intercepts agent tool calls, scores each one with a conformal risk interva ## Numbers -Held-out TEST recall 84.7% (95% Wilson [82.4, 86.7]) at FPR 4.1% [2.9, 5.7]. Phase 1 PAIR scale-up to n=300 per attacker family lands at 88.1% [85.8, 90.1]. Under BIPIA-pressure context, false-positive rate on benign tool calls 1.2% [0.4, 3.6] across four agent backends (Claude Haiku 4.5, Llama-3.1-8B, Mistral-7B, Qwen-2.5-7B). Multi-attacker PAIR ASR 0/25 across three different attacker models with identical seeds. 140 µs mean / 210 µs p99 inference latency on commodity CPU (excluding one-time embedding model load). Every number reproducible end-to-end via `make bench`. +Held-out TEST recall 84.7% (95% Wilson [82.4, 86.7]) at FPR 4.1% [2.9, 5.7]. Phase 1 PAIR scale-up to n=300 per attacker family lands at 88.1% [85.8, 90.1]. Cross-model held-out recall, where no attacker model in the eval set was in TRAIN, is 66.8% [64.9, 68.7] over n=2,277; the weakest sub-cell is data_exfil against a closed-weight model at 38.9% [35.3, 42.5] (see [vaara-bench-v0.37](bench/vaara-bench-v0.37.md)). Under BIPIA-pressure context, false-positive rate on benign tool calls 1.2% [0.4, 3.6] across four agent backends (Claude Haiku 4.5, Llama-3.1-8B, Mistral-7B, Qwen-2.5-7B). Multi-attacker PAIR ASR 0/25 across three different attacker models with identical seeds. The rule scorer that runs in the hot path adds 140 µs mean / 210 µs p99 per call on commodity CPU; the MiniLM classifier is opt-in (`vaara[ml]`) and is not in that measured path. Every number reproducible end-to-end via `make bench`. - 12,155-entry adversarial corpus (250 hand-curated + 11,905 LLM-generated), 70/15/15 split stratified by (category, source) - Classifier v9 with 236 hand-features + 384-dim MiniLM embeddings at calibrated threshold 0.9150 on held-out TEST n=1,827: recall 84.7% [82.4, 86.7] at FPR 4.1% [2.9, 5.7] - Multi-attacker PAIR robustness: 0/25 successes per attacker across Qwen2.5-32B, Qwen2.5-72B, Llama-3.3-70B hitting identical seed indices, Wilson upper 13.3% - BIPIA-pressure FPR on benign tool calls 1.2% [0.4, 3.6] across four agent backends, n=244 benign tool calls under `context.source=injected_via_bipia_` +- Cross-model held-out recall 66.8% [64.9, 68.7] over n=2,277 with no eval-set attacker model in TRAIN; data_exfil generalises unevenly, with a closed-weight sub-cell at 38.9% [35.3, 42.5]. This is the honest worst case; the in-distribution TEST number above is the easier denominator - Chain of custody: corpus manifest SHA, split manifest SHA, training commit, bundle SHA, all locked and printed by every script -- 140 µs mean / 210 µs p99 inference latency, commodity CPU +- 140 µs mean / 210 µs p99 for the hot-path rule scorer on commodity CPU; the MiniLM classifier is opt-in (`vaara[ml]`) and not in that path - Distribution-free conformal coverage on the score - MWU regret bound O(sqrt(T log N)) - [vaara-bench-v0.39](bench/vaara-bench-v0.39.md): current methodology, chain of custody, ship-gate record. v9 retrain on BIPIA-augmented corpus with follows upweighted (`--follow-weight 8.0`), calibrated to T=0.9150 at a 5% FPR target on v035 VAL. BIPIA-pressure FPR collapses from 35.2% on v8 to 1.2% on v9. In-distribution recall flat within Wilson intervals. Found-and-fixed in tree: auto-labeller `example.com` placeholder false-positive rule (42 to 14 true follows across four backends). Historical bench docs live under `bench/` for chain-of-custody continuity. @@ -162,6 +163,8 @@ vaara-mcp-proxy \ Point your MCP client at the proxy instead of the upstream. The audit chain captures every tool call without changing client or upstream behavior. Distinct from `mcp_server`, which exposes Vaara itself as an MCP server for agents that consult Vaara as a tool. +Upstreams can be local or remote. `--upstream` launches a local stdio MCP server; `--upstream-url NAME=URL` connects to a remote MCP server over the Streamable HTTP transport, and a bare `--upstream-url URL` lands in the `default` slot. Each slot is one transport or the other, never both. +
Fleet shape (v0.40): one proxy, many upstreams, multi-tenant policy diff --git a/clients/ts/package.json b/clients/ts/package.json index bb3c259..de96f93 100644 --- a/clients/ts/package.json +++ b/clients/ts/package.json @@ -1,6 +1,6 @@ { "name": "@vaara/client", - "version": "0.45.0", + "version": "0.45.1", "mcpName": "io.github.vaaraio/vaara", "description": "TypeScript client for the Vaara HTTP API. Conformal risk scoring, hash-chained audit, policy reload, named detectors.", "main": "dist/index.js", diff --git a/llms.txt b/llms.txt index 128c834..ab0037c 100644 --- a/llms.txt +++ b/llms.txt @@ -1,10 +1,10 @@ # Vaara -> Runtime evidence layer for EU AI Act compliance. Open source, no SaaS, no telemetry. +> Tamper-evident runtime evidence layer for AI agents. Covers EU AI Act compliance and any case where you need to prove what an agent actually did. Open source, no SaaS, no telemetry. -Vaara intercepts agent tool calls, scores each one with a conformal risk interval, and writes a hash-chained audit record. Online learning across five expert signals via Multiplicative Weight Update. Distribution-free conformal coverage on the score. +Vaara intercepts agent tool calls, scores each one with a conformal risk interval, and writes a hash-chained audit record. Online learning across five expert signals via Multiplicative Weight Update. Distribution-free conformal coverage on the score. An external auditor can verify these properties without trusting your stack. -Position: runtime governance and enforcement layer. Implements OVERT 1.0 (Glacis Technologies, March 2026) as the Arbiter role at AAL-3 Phase 2. +Position: tamper-evident runtime evidence and enforcement layer. Signed attestation plus execution receipts pair each MCP tool call to the policy that allowed it. ## Repo and packages - [GitHub source](https://github.com/vaaraio/vaara): code, releases, issue tracker @@ -26,10 +26,12 @@ Position: runtime governance and enforcement layer. Implements OVERT 1.0 (Glacis - OVERT 1.0 emitter, verifier CLI, S3P (MEA-2) emitter with Clopper-Pearson intervals, experimental AMD SEV-SNP TEE attestation hook ## Numbers -- 5,955-entry adversarial corpus (3,422 attack across 8 categories, 2,533 benign) -- 97.1% attack recall on held-out distribution-shift split, threshold 0.55 -- PAIR adaptive-attacker calibration: ASR 0/25 against Qwen2.5-32B -- 140 µs / 210 µs p99 inference latency, commodity CPU +- 12,155-entry adversarial corpus (250 hand-curated + 11,905 LLM-generated), 70/15/15 split stratified by (category, source) +- Classifier v9 (236 hand-features + 384-dim MiniLM embeddings) at calibrated threshold 0.9150: held-out TEST recall 84.7% [82.4, 86.7] at FPR 4.1% [2.9, 5.7], n=1,827 +- Cross-model held-out recall 66.8% [64.9, 68.7] over n=2,277 with no eval-set attacker model in TRAIN; weakest sub-cell (data_exfil, closed-weight) 38.9% [35.3, 42.5] +- BIPIA-pressure FPR on benign tool calls 1.2% [0.4, 3.6] across four agent backends +- Multi-attacker PAIR ASR 0/25 per attacker across Qwen2.5-32B, Qwen2.5-72B, Llama-3.3-70B at identical seeds +- 140 µs mean / 210 µs p99 for the hot-path rule scorer, commodity CPU; the MiniLM classifier is opt-in (`vaara[ml]`) and not in that path ## Optional - [Article 14 runtime](https://futurium.ec.europa.eu/ga/apply-ai-alliance/community-content/article-14-runtime-why-oversight-agentic-ai-has-be-evidenced-action-not-model): position post on EU Apply AI Alliance Futurium diff --git a/pyproject.toml b/pyproject.toml index 8ddbd24..a837fbc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta" [project] name = "vaara" -version = "0.45.0" -description = "Tamper-evident runtime evidence layer for AI agents: risk scoring, audit trails, and regulatory compliance" +version = "0.45.1" +description = "Tamper-evident runtime evidence layer for AI agents: conformal risk scoring, hash-chained audit trails, and signed attestation plus execution receipts per MCP tool call" requires-python = ">=3.10" license = "Apache-2.0" readme = "README.md" diff --git a/server.json b/server.json index 48f27d4..4035854 100644 --- a/server.json +++ b/server.json @@ -8,13 +8,13 @@ "url": "https://github.com/vaaraio/vaara", "source": "github" }, - "version": "0.45.0", + "version": "0.45.1", "packages": [ { "registryType": "pypi", "registryBaseUrl": "https://pypi.org", "identifier": "vaara", - "version": "0.45.0", + "version": "0.45.1", "runtimeHint": "uvx", "transport": { "type": "stdio" diff --git a/src/vaara/__init__.py b/src/vaara/__init__.py index cff2d93..51f7f38 100644 --- a/src/vaara/__init__.py +++ b/src/vaara/__init__.py @@ -6,7 +6,7 @@ oversight. """ -__version__ = "0.45.0" +__version__ = "0.45.1" from vaara.pipeline import InterceptionPipeline, InterceptionResult diff --git a/src/vaara/cli.py b/src/vaara/cli.py index 47da9a7..6613c6a 100644 --- a/src/vaara/cli.py +++ b/src/vaara/cli.py @@ -710,19 +710,25 @@ def _cmd_trail_receipt(args: argparse.Namespace) -> int: def _cmd_compliance_dashboard(args: argparse.Namespace) -> int: - from vaara.audit.sqlite_backend import SQLiteAuditTrail + from vaara.audit.sqlite_backend import SQLiteAuditBackend from vaara.compliance.dashboard import render_html - from vaara.compliance.engine import ComplianceEngine + from vaara.compliance.engine import create_default_engine db_path = Path(args.db).expanduser() if not db_path.is_file(): print(f"vaara compliance dashboard: not a file: {db_path}", file=sys.stderr) return 2 - trail = SQLiteAuditTrail(str(db_path)) - engine = ComplianceEngine() + backend = SQLiteAuditBackend(str(db_path)) + try: + trail = backend.load_trail() + except Exception as exc: + print(f"failed to load audit trail: {exc}", file=sys.stderr) + return 2 + + engine = create_default_engine() report = engine.assess( - trail=trail, + trail, system_name=args.system_name, system_version=args.system_version, ) diff --git a/src/vaara/integrations/_egress_guard.py b/src/vaara/integrations/_egress_guard.py new file mode 100644 index 0000000..4412ffa --- /dev/null +++ b/src/vaara/integrations/_egress_guard.py @@ -0,0 +1,351 @@ +"""SSRF egress guard for the remote MCP HTTP connector. + +The ``--upstream-url`` connector hands a user-supplied URL to ``urllib`` and +follows redirects. Without a guard a hostile or compromised upstream (or an +attacker who controls a redirect target) can point the proxy at the cloud +instance-metadata service, a loopback admin port, or an internal RFC1918 host, +and have the proxy fetch it with the operator's static auth headers attached. +This module is the host-resolution floor that refuses those targets before any +socket is opened, plus the custom ``urllib`` opener that re-applies the floor +on every redirect hop and drops the ``Authorization`` header on a cross-origin +redirect. + +Default posture is SAFE: loopback, link-local (IPv4 169.254/16 and IPv6 +fe80::/10), RFC1918, IPv6 ULA (fc00::/7), the cloud-metadata addresses, and the +dotless decimal/hex encodings of 169.254.169.254 are all refused. An operator +who needs a trusted internal host opts in explicitly, per client +(``allow_private_hosts=True``) or process-wide +(``VAARA_MCP_ALLOW_PRIVATE_UPSTREAM=1``). The opt-in never disables the +cross-origin ``Authorization`` drop, never raises the redirect cap, and never +reopens the metadata address. + +Internal module. Public surface is :mod:`vaara.integrations._mcp_upstream_http`. +""" + +from __future__ import annotations + +import http.client +import ipaddress +import os +import socket +import urllib.error +import urllib.request +from typing import Any, Optional +from urllib.parse import urlsplit + +# urllib's default redirect cap is 10; a remote MCP endpoint that needs more +# than a couple of redirects to answer a JSON-RPC POST is broken or hostile. +_MAX_REDIRECTS = 3 + +# Process-wide opt-in to permit private/loopback targets. Read at call time so +# tests and embedders can set it per process. +_ALLOW_ENV = "VAARA_MCP_ALLOW_PRIVATE_UPSTREAM" + +_METADATA_V4 = ipaddress.IPv4Address("169.254.169.254") +_METADATA_V6 = ipaddress.IPv6Address("fe80::a9fe:a9fe") + + +class EgressBlocked(Exception): + """An upstream URL resolves to an address the egress floor refuses.""" + + +def _env_allows_private() -> bool: + return os.environ.get(_ALLOW_ENV, "").strip().lower() in ("1", "true", "yes", "on") + + +def _is_metadata(ip: ipaddress._BaseAddress) -> bool: + """True iff the address is a cloud instance-metadata endpoint. + + Refused unconditionally (even under the private-host opt-in): there is no + legitimate reason to dial instance-metadata through the proxy. + """ + mapped = getattr(ip, "ipv4_mapped", None) + if mapped is not None: # ::ffff:a.b.c.d judged on the embedded v4 address + ip = mapped + return ip == _METADATA_V4 or ip == _METADATA_V6 + + +def _ip_is_blocked(ip: ipaddress._BaseAddress) -> bool: + """True iff this resolved address must never be reached by default. + + Covers the metadata addresses plus loopback, link-local (IPv4 169.254/16 + and IPv6 fe80::/10), private (RFC1918 and ULA fc00::/7), unspecified, + reserved, and multicast. + """ + mapped = getattr(ip, "ipv4_mapped", None) + if mapped is not None: # ::ffff:a.b.c.d judged on the embedded v4 address + ip = mapped + return ( + _is_metadata(ip) + or ip.is_loopback + or ip.is_link_local + or ip.is_private + or ip.is_unspecified + or ip.is_reserved + or ip.is_multicast + ) + + +def _coerce_dotless_host(host: str) -> Optional[ipaddress._BaseAddress]: + """Parse a bare decimal or hex integer host (``2852039166``, ``0xa9fea9fe``). + + Browsers and ``inet_aton`` accept these as IPv4; ``ipaddress`` does not, so + we decode them ourselves. Returns the address when the host is such an + integer, else None. + """ + base = 16 if host.lower().startswith("0x") else 10 + try: + value = int(host, base) + except ValueError: + return None + if 0 <= value <= 0xFFFFFFFF: + return ipaddress.IPv4Address(value) + return None + + +def assert_url_egress_allowed(url: str, *, allow_private: bool = False) -> None: + """Refuse ``url`` if its host resolves to a blocked address. + + Resolves the hostname (every returned A/AAAA record is checked, so a + DNS-rebinding answer mixing a public and a private address is still refused) + and applies the floor. A literal IP host is checked directly. The dotless + encodings of the metadata address are refused even when ``allow_private`` is + set: there is no legitimate reason to dial instance-metadata through the + proxy. Raises :class:`EgressBlocked` on a refused or unresolvable target. + """ + parts = urlsplit(url) + if parts.scheme not in ("http", "https"): + raise EgressBlocked(f"upstream URL scheme must be http or https: {url!r}") + host = parts.hostname + if not host: + raise EgressBlocked(f"upstream URL has no host: {url!r}") + + dotless = _coerce_dotless_host(host) + if dotless is not None: + if dotless == _METADATA_V4: + raise EgressBlocked( + f"upstream URL targets the cloud-metadata address: {url!r}", + ) + if not allow_private and _ip_is_blocked(dotless): + raise EgressBlocked(f"upstream URL resolves to a blocked address: {url!r}") + return + + try: + literal = ipaddress.ip_address(host) + except ValueError: + literal = None + if literal is not None: + if _is_metadata(literal): + raise EgressBlocked( + f"upstream URL targets the cloud-metadata address: {url!r}", + ) + if not allow_private and _ip_is_blocked(literal): + raise EgressBlocked(f"upstream URL resolves to a blocked address: {url!r}") + return + + try: + infos = socket.getaddrinfo(host, parts.port, proto=socket.IPPROTO_TCP) + except socket.gaierror as exc: + raise EgressBlocked(f"upstream host does not resolve: {host!r} ({exc})") from exc + for info in infos: + ip = ipaddress.ip_address(info[4][0]) + if _is_metadata(ip): + raise EgressBlocked( + f"upstream host {host!r} resolves to the cloud-metadata address", + ) + if not allow_private and _ip_is_blocked(ip): + raise EgressBlocked( + f"upstream host {host!r} resolves to a blocked address {ip}", + ) + + +def pick_egress_ip(host: str, port: Optional[int], *, allow_private: bool = False) -> str: + """Resolve ``host`` and return the single IP the caller must dial. + + Mirrors :func:`assert_url_egress_allowed`'s floor but *returns the + address to connect to* instead of only validating. The caller dials + that IP literal, so the kernel never performs a second DNS lookup at + socket-connect time. That closes the rebind TOCTOU: the address that + passed the floor is the exact address the socket reaches, even if the + name re-resolves to a blocked target a millisecond later. + + Every resolved address is checked (a rebind answer mixing a public and + a blocked address is still refused); the first one is pinned. Literal + and dotless-integer hosts are returned directly after the same checks. + Raises :class:`EgressBlocked` on a refused or unresolvable target. + """ + dotless = _coerce_dotless_host(host) + if dotless is not None: + if dotless == _METADATA_V4: + raise EgressBlocked(f"upstream host targets the cloud-metadata address: {host!r}") + if not allow_private and _ip_is_blocked(dotless): + raise EgressBlocked(f"upstream host resolves to a blocked address: {host!r}") + return str(dotless) + + try: + literal = ipaddress.ip_address(host) + except ValueError: + literal = None + if literal is not None: + if _is_metadata(literal): + raise EgressBlocked(f"upstream host targets the cloud-metadata address: {host!r}") + if not allow_private and _ip_is_blocked(literal): + raise EgressBlocked(f"upstream host resolves to a blocked address: {host!r}") + return str(literal) + + try: + infos = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP) + except socket.gaierror as exc: + raise EgressBlocked(f"upstream host does not resolve: {host!r} ({exc})") from exc + chosen: Optional[str] = None + for info in infos: + addr = info[4][0] + ip = ipaddress.ip_address(addr) + if _is_metadata(ip): + raise EgressBlocked(f"upstream host {host!r} resolves to the cloud-metadata address") + if not allow_private and _ip_is_blocked(ip): + raise EgressBlocked(f"upstream host {host!r} resolves to a blocked address {ip}") + if chosen is None: + chosen = addr + if chosen is None: + raise EgressBlocked(f"upstream host does not resolve: {host!r}") + return chosen + + +class _PinnedHTTPConnection(http.client.HTTPConnection): + """HTTP connection that validates+pins the host's IP at connect time. + + ``self.host`` (the original name) stays the Host header; the socket is + opened to the validated IP literal so no re-resolution can occur + between the egress check and the connect. + """ + + def __init__(self, host: str, *, _allow_private: bool = False, **kwargs: Any) -> None: + super().__init__(host, **kwargs) + self._allow_private = _allow_private + + def connect(self) -> None: # noqa: D102 + ip = pick_egress_ip(self.host, self.port, allow_private=self._allow_private) + self.sock = socket.create_connection((ip, self.port), self.timeout, self.source_address) + if self._tunnel_host: + self._tunnel() + + +class _PinnedHTTPSConnection(http.client.HTTPSConnection): + """HTTPS counterpart to :class:`_PinnedHTTPConnection`. + + The TCP connect targets the pinned IP; the TLS handshake still uses the + original hostname for SNI and certificate verification, so a rebind to + an unvalidated address cannot also present a valid certificate. + """ + + def __init__(self, host: str, *, _allow_private: bool = False, **kwargs: Any) -> None: + super().__init__(host, **kwargs) + self._allow_private = _allow_private + + def connect(self) -> None: # noqa: D102 + ip = pick_egress_ip(self.host, self.port, allow_private=self._allow_private) + sock = socket.create_connection((ip, self.port), self.timeout, self.source_address) + if self._tunnel_host: + self.sock = sock + self._tunnel() + server_hostname = self._tunnel_host + else: + server_hostname = self.host + self.sock = self._context.wrap_socket(sock, server_hostname=server_hostname) + + +class _PinnedHTTPHandler(urllib.request.HTTPHandler): + """urllib handler that dials plain HTTP through a validated, pinned IP.""" + + def __init__(self, allow_private: bool) -> None: + super().__init__() + self._allow_private = allow_private + + def http_open(self, req: urllib.request.Request) -> Any: # noqa: D102 + allow_private = self._allow_private + + def factory(host: str, **kwargs: Any) -> _PinnedHTTPConnection: + return _PinnedHTTPConnection(host, _allow_private=allow_private, **kwargs) + + return self.do_open(factory, req) + + +class _PinnedHTTPSHandler(urllib.request.HTTPSHandler): + """urllib handler that dials HTTPS through a validated, pinned IP.""" + + def __init__(self, allow_private: bool) -> None: + super().__init__() + self._allow_private = allow_private + + def https_open(self, req: urllib.request.Request) -> Any: # noqa: D102 + allow_private = self._allow_private + + def factory(host: str, **kwargs: Any) -> _PinnedHTTPSConnection: + return _PinnedHTTPSConnection(host, _allow_private=allow_private, **kwargs) + + return self.do_open( + factory, req, context=self._context, check_hostname=self._check_hostname + ) + + +def _same_origin(a: str, b: str) -> bool: + """True iff two URLs share scheme, host, and effective port.""" + pa, pb = urlsplit(a), urlsplit(b) + if pa.scheme != pb.scheme: + return False + if (pa.hostname or "").lower() != (pb.hostname or "").lower(): + return False + default = {"http": 80, "https": 443} + port_a = pa.port if pa.port is not None else default.get(pa.scheme) + port_b = pb.port if pb.port is not None else default.get(pb.scheme) + return port_a == port_b + + +class _GuardedRedirectHandler(urllib.request.HTTPRedirectHandler): + """Redirect handler that re-applies the egress floor and strips auth. + + Each redirect (1) must not exceed :data:`_MAX_REDIRECTS`, (2) runs the new + target through :func:`assert_url_egress_allowed`, and (3) drops the auth + headers when the redirect crosses origin so the upstream bearer token never + leaks to a different host. + """ + + max_redirections = _MAX_REDIRECTS + + def __init__(self, allow_private: bool) -> None: + super().__init__() + self._allow_private = allow_private + + def redirect_request(self, req, fp, code, msg, headers, newurl): + try: + assert_url_egress_allowed(newurl, allow_private=self._allow_private) + except EgressBlocked as exc: + raise urllib.error.HTTPError( + newurl, code, f"blocked redirect target: {exc}", headers, fp, + ) from exc + new = super().redirect_request(req, fp, code, msg, headers, newurl) + if new is not None and not _same_origin(req.full_url, newurl): + for key in ("Authorization", "Proxy-Authorization", "Cookie"): + new.remove_header(key) + new.remove_header(key.lower()) + return new + + +def build_guarded_opener(allow_private: bool = False) -> urllib.request.OpenerDirector: + """An ``OpenerDirector`` whose redirects are guarded and auth-stripped. + + Use this opener's ``open`` instead of ``urllib.request.urlopen`` so every + redirect hop is re-checked and cross-origin hops drop the auth header. The + pinned HTTP/HTTPS handlers re-resolve, re-validate, and pin the target IP + at connect time on every hop, so a DNS name that re-resolves to a blocked + address between the floor check and the socket connect (DNS rebinding) is + still refused. The initial URL is also checked by the caller with + :func:`assert_url_egress_allowed` for a fail-fast error before the request + is built. + """ + return urllib.request.build_opener( + _GuardedRedirectHandler(allow_private=allow_private), + _PinnedHTTPHandler(allow_private), + _PinnedHTTPSHandler(allow_private), + ) diff --git a/src/vaara/integrations/_mcp_notify.py b/src/vaara/integrations/_mcp_notify.py index 204e877..d05e22f 100644 --- a/src/vaara/integrations/_mcp_notify.py +++ b/src/vaara/integrations/_mcp_notify.py @@ -155,11 +155,28 @@ def register_session( prior.close() return state - def unregister_session(self, session_id: str) -> None: + def unregister_session( + self, session_id: str, expected: Optional["_SessionState"] = None, + ) -> None: + """Remove a session's state. + + When ``expected`` is given, the entry is popped only if it is still the + same ``_SessionState`` object. On reconnect, ``register_session`` + replaces the map entry with a fresh state and closes the prior one; the + old stream's ``finally`` then runs ``unregister_session``. Without the + identity check that finally would pop the NEW state, silently dropping + notifications for the live reconnected session. The identity check makes + the stale teardown a no-op. + """ with self._lock: - state = self._sessions.pop(session_id, None) - if state is not None: - state.close() + current = self._sessions.get(session_id) + if current is None: + return + if expected is not None and current is not expected: + # A newer session took this id; leave it registered. + return + state = self._sessions.pop(session_id) + state.close() def deliver( self, diff --git a/src/vaara/integrations/_mcp_upstream_http.py b/src/vaara/integrations/_mcp_upstream_http.py index 308f129..0848caa 100644 --- a/src/vaara/integrations/_mcp_upstream_http.py +++ b/src/vaara/integrations/_mcp_upstream_http.py @@ -33,6 +33,11 @@ import urllib.request from typing import Any, Callable, Iterator, Optional +from vaara.integrations._egress_guard import ( + EgressBlocked, + assert_url_egress_allowed, + build_guarded_opener, +) from vaara.integrations._mcp_upstream import ProxyError, strict_json_dumps logger = logging.getLogger(__name__) @@ -77,7 +82,24 @@ def __init__( url: str, headers: Optional[dict[str, str]] = None, on_notification: Optional[Callable[[dict], None]] = None, + allow_private_hosts: Optional[bool] = None, ) -> None: + # SSRF egress floor. Defaults SAFE: loopback / link-local / RFC1918 / + # ULA / cloud-metadata targets are refused before any socket opens, and + # the guarded opener re-checks every redirect hop and drops the auth + # header on a cross-origin redirect. allow_private_hosts opts a trusted + # internal host in; None falls back to the VAARA_MCP_ALLOW_PRIVATE_ + # UPSTREAM env flag. The metadata address is refused even when opted in. + if allow_private_hosts is None: + from vaara.integrations._egress_guard import _env_allows_private + + allow_private_hosts = _env_allows_private() + self._allow_private_hosts = allow_private_hosts + try: + assert_url_egress_allowed(url, allow_private=allow_private_hosts) + except EgressBlocked as exc: + raise ProxyError(str(exc)) from exc + self._opener = build_guarded_opener(allow_private=allow_private_hosts) self._url = url # Caller-supplied static headers (auth). Applied first so the transport # control headers always win on the keys the protocol owns. @@ -156,7 +178,14 @@ def _post(self, payload: dict, timeout: float) -> Any: headers = self._headers(accept="application/json, text/event-stream") req = urllib.request.Request(self._url, data=body, headers=headers, method="POST") try: - return urllib.request.urlopen(req, timeout=timeout) # noqa: S310 + # Guarded opener: redirects are re-checked against the egress floor, + # the auth header is dropped on a cross-origin hop, and the target + # IP is validated and pinned at connect time (DNS-rebind safe). + return self._opener.open(req, timeout=timeout) + except EgressBlocked as e: + # A rebind that flipped the name to a blocked address after the + # constructor's fail-fast check is caught here at connect time. + raise ProxyError(f"Upstream MCP server blocked by egress floor: {e}") from e except urllib.error.HTTPError as e: raise ProxyError( f"Upstream MCP server returned HTTP {e.code}: {self._error_snippet(e)}", @@ -332,7 +361,7 @@ def _read_standing_stream(self) -> None: headers["Last-Event-ID"] = self._listener_last_event_id req = urllib.request.Request(self._url, headers=headers, method="GET") try: - resp = urllib.request.urlopen(req, timeout=_SSE_READ_TIMEOUT_SECONDS) # noqa: S310 + resp = self._opener.open(req, timeout=_SSE_READ_TIMEOUT_SECONDS) except urllib.error.HTTPError as e: if e.code in (404, 405, 501): raise _ServerPushUnsupported from e diff --git a/src/vaara/integrations/mcp_proxy.py b/src/vaara/integrations/mcp_proxy.py index a5a957b..2d7a820 100644 --- a/src/vaara/integrations/mcp_proxy.py +++ b/src/vaara/integrations/mcp_proxy.py @@ -193,6 +193,7 @@ def __init__( upstreams: Optional[dict[str, list[str]]] = None, upstream_urls: Optional[dict[str, str]] = None, upstream_headers: Optional[dict[str, dict[str, str]]] = None, + allow_private_upstream_hosts: Optional[bool] = None, router: Optional[NotificationRouter] = None, ) -> None: if pipeline is not None: @@ -309,12 +310,16 @@ def __init__( ), ) for name, url in url_map.items(): + # SSRF egress floor defaults SAFE; allow_private_upstream_hosts (or + # the VAARA_MCP_ALLOW_PRIVATE_UPSTREAM env flag) opts a trusted + # internal host in. Refused targets raise at construction here. self._upstreams[name] = HttpUpstreamClient( url=url, headers=header_map.get(name), on_notification=( lambda msg, n=name: self._on_upstream_notification(n, msg) ), + allow_private_hosts=allow_private_upstream_hosts, ) if default_alias_target is not None: self._upstreams["default"] = self._upstreams[default_alias_target] @@ -605,7 +610,18 @@ async def mcp_endpoint( except ProxyError: logger.exception("Failed to forward HTTP notification") return Response(status_code=202) - response = proxy._handle_request(payload) + # _handle_request is a blocking sync call that waits on the + # upstream (up to its request timeout). Running it inline would + # park the event loop for the whole call, serialising every + # other POST /mcp, GET /mcp drain, and /health to concurrency 1. + # Offload to a worker thread. The per-request ContextVars set + # just above live on this task's context, which a bare + # to_thread target would not inherit, so copy the current + # context and run the handler inside it on the worker thread. + ctx = contextvars.copy_context() + response = await asyncio.to_thread( + ctx.run, proxy._handle_request, payload, + ) return JSONResponse(content=response) finally: _REQUEST_UPSTREAM.reset(upstream_token) @@ -705,12 +721,18 @@ async def mcp_sse_endpoint( except ValueError: resume_after = 0 loop = asyncio.get_running_loop() - state = http_router.register_session( + # my_state is THIS stream's session state. On a reconnect with the + # same Mcp-Session-Id, register_session installs a fresh state and + # closes this one; the unregister in the finally below is then + # identity-checked against my_state so the tearing-down old stream + # never pops the NEW state out from under the live reconnection. + my_state = http_router.register_session( session_id=session_value, upstream=upstream_name, tenant=(x_vaara_tenant or "").strip(), loop=loop, ) + state = my_state async def event_stream(): # enqueue populates both the buffer (for replay) and the queue @@ -750,7 +772,10 @@ async def event_stream(): ).encode("utf-8") last_yielded = event_id finally: - http_router.unregister_session(session_value) + # Identity-checked: only tear down the map entry if it is + # still this stream's state. A reconnect that already + # replaced it leaves the live session registered. + http_router.unregister_session(session_value, expected=my_state) return StreamingResponse( event_stream(), @@ -1471,6 +1496,21 @@ def main(argv: Optional[list[str]] = None) -> None: "multiple headers or slots. The slot NAME must match an --upstream-url." ), ) + parser.add_argument( + "--allow-private-upstream-hosts", + action="store_true", + # default None (not False) so an absent flag leaves the env opt-in + # VAARA_MCP_ALLOW_PRIVATE_UPSTREAM live; passing False here would + # shadow it and silently break the documented process-wide opt-in. + default=None, + help=( + "Permit --upstream-url targets that resolve to loopback, " + "link-local, RFC1918, or ULA addresses. OFF by default: such " + "targets are refused to block SSRF. The cloud-metadata address " + "stays refused even with this flag. Only set it for a trusted " + "internal upstream you control." + ), + ) parser.add_argument( "--transport", choices=["stdio", "http"], @@ -1585,6 +1625,7 @@ def main(argv: Optional[list[str]] = None) -> None: upstreams=upstreams if (legacy_single is None and upstreams) else None, upstream_urls=upstream_urls or None, upstream_headers=upstream_headers or None, + allow_private_upstream_hosts=args.allow_private_upstream_hosts, db_path=args.db, agent_id_default=args.agent_id, allowlist=tool_allow, denylist=tool_deny if tool_deny else None, @@ -1595,7 +1636,9 @@ def main(argv: Optional[list[str]] = None) -> None: overt_emitter=overt_emitter, attest_emitter=attest_emitter, ) - except ValueError as e: + except (ValueError, ProxyError) as e: + # ProxyError here means a --upstream-url target was refused by the SSRF + # egress floor at client construction; surface it as a clean CLI error. parser.error(str(e)) try: if args.transport == "http": diff --git a/tests/test_compliance_dashboard.py b/tests/test_compliance_dashboard.py index 8a4a7ef..7d9f86a 100644 --- a/tests/test_compliance_dashboard.py +++ b/tests/test_compliance_dashboard.py @@ -4,7 +4,9 @@ import re +from vaara.audit.sqlite_backend import SQLiteAuditBackend from vaara.audit.trail import AuditTrail +from vaara.cli import main from vaara.compliance.dashboard import render_html from vaara.compliance.engine import create_default_engine from vaara.taxonomy.actions import ( @@ -142,3 +144,50 @@ def test_html_surfaces_verdict_inputs_and_contributing_events(): assert "Threshold" in out assert "Observed" in out assert "Evidence record count" in out + + +def test_cli_dashboard_db_to_renderer_wiring(tmp_path): + """End-to-end CLI smoke: `vaara compliance dashboard` must load an audit + trail from a real SQLite DB and write a self-contained HTML report. + + The other tests call render_html directly. This exercises the wiring the + CLI owns (DB open, load_trail, engine.assess, render_html, file write), + which is exactly what regressed when the command imported a class name + the backend module never exported. + """ + db_path = tmp_path / "audit.db" + backend = SQLiteAuditBackend(str(db_path)) + try: + # Persist a populated trail to disk via the on_record callback so the + # CLI's load_trail() reconstructs real evidence, not an empty trail. + trail = _populated_trail() + for record in trail._records: + backend.write_record(record) + finally: + backend.close() + + out_path = tmp_path / "dashboard.html" + rc = main([ + "compliance", "dashboard", + "--db", str(db_path), + "--out", str(out_path), + "--system-name", "WiredSys", + "--system-version", "9.9", + ]) + assert rc == 0 + assert out_path.is_file() + html = out_path.read_text(encoding="utf-8") + assert html.startswith("") + assert html.endswith("") + assert "WiredSys" in html + assert "Article-level evidence report" in html + + +def test_cli_dashboard_missing_db_returns_error(tmp_path): + """A non-existent --db path is a clean exit-2, not a traceback.""" + rc = main([ + "compliance", "dashboard", + "--db", str(tmp_path / "nope.db"), + "--out", str(tmp_path / "out.html"), + ]) + assert rc == 2 diff --git a/tests/test_http_concurrency.py b/tests/test_http_concurrency.py new file mode 100644 index 0000000..c37fcb9 --- /dev/null +++ b/tests/test_http_concurrency.py @@ -0,0 +1,96 @@ +"""HTTP transport must not serialise concurrent POSTs on the event loop. + +`_handle_request` is a blocking sync call that waits on the upstream. Before +the to_thread offload it ran inline in the async endpoint, so two in-flight +POSTs serialised behind one another (real concurrency 1) and a slow upstream +stalled every other endpoint. These tests assert two concurrent POSTs overlap +in wall-clock, and that the per-request ContextVars (which select the upstream +and tag the tenant) survive the thread hop so each request still routes to its +own slot. +""" + +from __future__ import annotations + +import asyncio +import time +from unittest.mock import MagicMock + +import pytest + +try: + import httpx + from httpx import ASGITransport +except ImportError: + pytest.skip( + "server extra not installed (pip install 'vaara[server]')", + allow_module_level=True, + ) + +from vaara.integrations import mcp_proxy +from vaara.integrations.mcp_proxy import VaaraMCPProxy + +_SLEEP = 0.30 + + +def _build_app(proxy): + import unittest.mock as um + + with um.patch("uvicorn.run") as run_mock: + captured: dict = {} + run_mock.side_effect = lambda app, **kw: captured.__setitem__("app", app) + proxy.run_http(host="127.0.0.1", port=0) + return captured["app"] + + +def _slow_upstream(reply_id): + def _request(payload, *a, **kw): + time.sleep(_SLEEP) + return {"jsonrpc": "2.0", "id": payload["id"], "result": {"slot": reply_id}} + + client = MagicMock() + client.request.side_effect = _request + return client + + +async def _drive(app): + transport = ASGITransport(app=app) + async with httpx.AsyncClient( + transport=transport, base_url="http://test" + ) as client: + async def call(slot, rid): + return await client.post( + "/mcp", + json={"jsonrpc": "2.0", "id": rid, "method": "tools/list"}, + headers={"X-Vaara-Upstream": slot}, + ) + + start = time.perf_counter() + r_alpha, r_beta = await asyncio.gather(call("alpha", 1), call("beta", 2)) + elapsed = time.perf_counter() - start + return r_alpha, r_beta, elapsed + + +def test_concurrent_posts_overlap_and_keep_context(monkeypatch): + monkeypatch.setattr(mcp_proxy, "UpstreamMCPClient", MagicMock()) + proxy = VaaraMCPProxy( + upstreams={"alpha": ["cmd-a"], "beta": ["cmd-b"]}, + pipeline=MagicMock(), + ) + # Each slot answers slowly and stamps its own name into the result, so an + # overlapping pair that still routes correctly proves both concurrency and + # that _REQUEST_UPSTREAM survived the to_thread hop. + proxy._upstreams["alpha"] = _slow_upstream("alpha") + proxy._upstreams["beta"] = _slow_upstream("beta") + app = _build_app(proxy) + + r_alpha, r_beta, elapsed = asyncio.run(_drive(app)) + + assert r_alpha.status_code == 200 + assert r_beta.status_code == 200 + # Context survived the hop: each request reached its own slot. + assert r_alpha.json()["result"]["slot"] == "alpha" + assert r_beta.json()["result"]["slot"] == "beta" + # Overlap, not serialisation: two _SLEEP calls in well under 2*_SLEEP. + assert elapsed < (2 * _SLEEP) - 0.05, ( + f"requests serialised: {elapsed:.3f}s for two {_SLEEP}s calls" + ) diff --git a/tests/test_mcp_egress_guard.py b/tests/test_mcp_egress_guard.py new file mode 100644 index 0000000..4771522 --- /dev/null +++ b/tests/test_mcp_egress_guard.py @@ -0,0 +1,201 @@ +"""SSRF egress floor for the remote MCP HTTP connector. + +Asserts the connector refuses loopback / link-local / RFC1918 / ULA / +cloud-metadata upstream targets by default, re-checks every redirect hop, and +never carries the Authorization header to a cross-origin redirect target. +""" + +from __future__ import annotations + +import json +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +import pytest + +from vaara.integrations._egress_guard import ( + EgressBlocked, + _same_origin, + assert_url_egress_allowed, +) +from vaara.integrations._mcp_upstream import ProxyError +from vaara.integrations._mcp_upstream_http import HttpUpstreamClient + + +# -- host-resolution floor -------------------------------------------------- + + +@pytest.mark.parametrize( + "url", + [ + "http://169.254.169.254/latest/meta-data/", + "http://[fe80::a9fe:a9fe]/latest/meta-data/", + "http://2852039166/latest/meta-data/", # dotless decimal + "http://0xa9fea9fe/latest/meta-data/", # dotless hex + ], +) +def test_metadata_address_refused(url): + with pytest.raises(EgressBlocked): + assert_url_egress_allowed(url) + + +def test_metadata_refused_even_when_private_allowed(): + with pytest.raises(EgressBlocked): + assert_url_egress_allowed( + "http://169.254.169.254/latest/meta-data/", allow_private=True, + ) + with pytest.raises(EgressBlocked): + assert_url_egress_allowed("http://2852039166/", allow_private=True) + + +@pytest.mark.parametrize( + "url", + [ + "http://127.0.0.1/mcp", + "http://localhost/mcp", + "http://10.0.0.5/mcp", + "http://192.168.1.10/mcp", + "http://172.16.0.1/mcp", + "http://[::1]/mcp", + "http://[fc00::1]/mcp", # IPv6 ULA + "http://[fe80::1]/mcp", # IPv6 link-local + "http://[::ffff:169.254.169.254]/mcp", # IPv4-mapped metadata + ], +) +def test_private_and_loopback_refused_by_default(url): + with pytest.raises(EgressBlocked): + assert_url_egress_allowed(url) + + +@pytest.mark.parametrize( + "url", + ["http://10.0.0.5/mcp", "http://127.0.0.1/mcp", "http://[fc00::1]/mcp"], +) +def test_private_allowed_with_opt_in(url): + assert_url_egress_allowed(url, allow_private=True) + + +def test_public_host_allowed(): + # Literal public IP so the floor passes without a live DNS lookup. + assert_url_egress_allowed("https://8.8.8.8/mcp") + assert_url_egress_allowed("http://[2001:4860:4860::8888]/mcp") + + +def test_non_http_scheme_refused(): + with pytest.raises(EgressBlocked): + assert_url_egress_allowed("file:///etc/passwd") + with pytest.raises(EgressBlocked): + assert_url_egress_allowed("gopher://10.0.0.1/") + + +def test_same_origin_logic(): + assert _same_origin("http://a.com/x", "http://a.com/y") + assert _same_origin("http://a.com:80/x", "http://a.com/y") + assert not _same_origin("http://a.com/x", "https://a.com/x") + assert not _same_origin("http://a.com/x", "http://b.com/x") + assert not _same_origin("http://a.com:80/x", "http://a.com:81/x") + + +# -- connector-level blocking ---------------------------------------------- + + +def test_client_refuses_metadata_upstream(): + with pytest.raises(ProxyError): + HttpUpstreamClient("http://169.254.169.254/mcp") + + +def test_client_refuses_private_upstream_by_default(): + with pytest.raises(ProxyError): + HttpUpstreamClient("http://10.0.0.5/mcp") + + +def test_client_allows_private_with_opt_in(): + client = HttpUpstreamClient("http://10.0.0.5/mcp", allow_private_hosts=True) + client.close() + + +# -- redirect handling ------------------------------------------------------ + + +class _RedirectHandler(BaseHTTPRequestHandler): + def log_message(self, *args): + pass + + def _do(self): + cfg = self.server.cfg + cfg["last_headers"] = dict(self.headers) + location = cfg.get("location") + if location is not None: + # 302 so urllib follows it (it refuses to auto-follow 307 on POST); + # the redirect target is reached as a GET, which is all these tests + # need to observe the egress re-check and the cross-origin auth drop. + self.send_response(302) + self.send_header("Location", location) + self.end_headers() + return + body = json.dumps( + {"jsonrpc": "2.0", "id": cfg.get("want_id", 1), "result": {"ok": True}} + ).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + do_POST = _do + do_GET = _do + + +@pytest.fixture +def redirect_server(): + httpd = ThreadingHTTPServer(("127.0.0.1", 0), _RedirectHandler) + httpd.daemon_threads = True + httpd.cfg = {} + threading.Thread(target=httpd.serve_forever, daemon=True).start() + url = f"http://127.0.0.1:{httpd.server_address[1]}/mcp" + try: + yield httpd, url + finally: + httpd.shutdown() + + +def test_redirect_to_private_ip_refused(redirect_server): + """A redirect to the metadata address is refused even when the original + host was opted in: the guard re-checks every hop.""" + httpd, url = redirect_server + httpd.cfg["location"] = "http://169.254.169.254/latest/meta-data/" + client = HttpUpstreamClient(url, allow_private_hosts=True) + try: + with pytest.raises(ProxyError): + client.request({"jsonrpc": "2.0", "id": 5, "method": "tools/list"}) + finally: + client.close() + + +def test_authorization_not_leaked_cross_origin(redirect_server): + """A cross-origin redirect must not carry the upstream Authorization header + to the new host.""" + target = ThreadingHTTPServer(("127.0.0.1", 0), _RedirectHandler) + target.daemon_threads = True + target.cfg = {"want_id": 9} + threading.Thread(target=target.serve_forever, daemon=True).start() + target_url = f"http://127.0.0.1:{target.server_address[1]}/mcp" + + httpd, url = redirect_server + httpd.cfg["location"] = target_url + + client = HttpUpstreamClient( + url, + headers={"Authorization": "Bearer secret-token"}, + allow_private_hosts=True, + ) + try: + reply = client.request({"jsonrpc": "2.0", "id": 9, "method": "tools/list"}) + assert reply["result"] == {"ok": True} + landed = target.cfg.get("last_headers", {}) + assert not any(k.lower() == "authorization" for k in landed) + first = httpd.cfg.get("last_headers", {}) + assert first.get("Authorization") == "Bearer secret-token" + finally: + client.close() + target.shutdown() diff --git a/tests/test_mcp_notify.py b/tests/test_mcp_notify.py index a73f777..c5871e6 100644 --- a/tests/test_mcp_notify.py +++ b/tests/test_mcp_notify.py @@ -182,3 +182,33 @@ def test_http_router_re_registration_replaces_session(): assert new.replay_since(0) == [(1, {"new": True})] finally: loop.close() + + +def test_stale_unregister_does_not_drop_reconnected_session(): + """The reconnect race: the old stream's teardown must not unregister the + NEW session that took its id. + + register_session("sess-a") returns old; a reconnect registers new under the + same id and closes old. When old's SSE finally block runs unregister with + its own state as the identity guard, the map entry (now new) is left intact + so delivery to sess-a still reaches the live reconnected session. + """ + router = HttpRouter(replay_buffer_size=10) + loop = _new_loop() + try: + old = router.register_session("sess-a", "alpha", "", loop) + new = router.register_session("sess-a", "alpha", "", loop) + # Old stream tears down and runs its identity-checked unregister. + router.unregister_session("sess-a", expected=old) + assert router.session_count() == 1, "stale teardown dropped the live session" + # The live session still receives a targeted notification. + router.deliver({"jsonrpc": "2.0", "method": "notifications/x"}, + session_id="sess-a", upstream="alpha") + assert new.replay_since(0) == [ + (1, {"jsonrpc": "2.0", "method": "notifications/x"}), + ] + # An unguarded unregister still works for the normal teardown path. + router.unregister_session("sess-a", expected=new) + assert router.session_count() == 0 + finally: + loop.close() diff --git a/tests/test_mcp_upstream_http.py b/tests/test_mcp_upstream_http.py index d5425b4..fbdf8d9 100644 --- a/tests/test_mcp_upstream_http.py +++ b/tests/test_mcp_upstream_http.py @@ -18,6 +18,16 @@ from vaara.integrations._mcp_upstream_http import HttpUpstreamClient +@pytest.fixture(autouse=True) +def _allow_loopback_upstream(monkeypatch): + """The fake MCP servers in this module bind loopback, which the SSRF egress + floor refuses by default. Opt in process-wide for the connector tests; the + dedicated egress tests below construct with allow_private_hosts=False to + assert the blocking path explicitly. + """ + monkeypatch.setenv("VAARA_MCP_ALLOW_PRIVATE_UPSTREAM", "1") + + class _Handler(BaseHTTPRequestHandler): def log_message(self, *args): # silence the test server pass diff --git a/tests/test_mcp_upstream_rebind.py b/tests/test_mcp_upstream_rebind.py new file mode 100644 index 0000000..3d61686 --- /dev/null +++ b/tests/test_mcp_upstream_rebind.py @@ -0,0 +1,112 @@ +"""DNS-rebind regression tests for the upstream egress floor. + +The host-resolution floor (`assert_url_egress_allowed`) validates the IPs a +name resolves to, but on its own it then hands the *name* back to urllib, +which re-resolves at socket-connect time. A time-split rebind (public at the +check, a blocked address at the connect) would slip through that gap. The fix +validates and pins the IP at connect time and dials the IP literal, so the +address that passed the floor is the exact address the socket reaches. + +These tests drive `socket.getaddrinfo`/`socket.create_connection` directly so +no real network is touched. +""" + +from __future__ import annotations + +import socket + +import pytest + +from vaara.integrations._egress_guard import EgressBlocked, pick_egress_ip +from vaara.integrations._mcp_upstream import ProxyError +from vaara.integrations._mcp_upstream_http import HttpUpstreamClient + +PUBLIC = "93.184.216.34" +METADATA = "169.254.169.254" + + +def _addrinfo(ip: str, port): + return [(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", (ip, port or 0))] + + +def _sequence_resolver(*ips: str): + """getaddrinfo stand-in yielding ips[i] on the i-th call, last repeats.""" + state = {"n": 0} + + def fake(host, port, *args, **kwargs): + idx = min(state["n"], len(ips) - 1) + state["n"] += 1 + return _addrinfo(ips[idx], port) + + return fake, state + + +def _recording_connect(targets: list[str]): + def fake(address, *args, **kwargs): + targets.append(address[0]) + raise ConnectionRefusedError("rebind-test: no real socket opened") + + return fake + + +# -- connector-level rebind defence ----------------------------------------- + + +def test_rebind_after_preflight_is_blocked_at_connect(monkeypatch): + """Public at the constructor check, metadata at connect: refused, no socket.""" + resolver, _ = _sequence_resolver(PUBLIC, METADATA) + targets: list[str] = [] + monkeypatch.setattr(socket, "getaddrinfo", resolver) + monkeypatch.setattr(socket, "create_connection", _recording_connect(targets)) + + client = HttpUpstreamClient("http://rebind.test:8765/mcp", allow_private_hosts=False) + with pytest.raises(ProxyError): + client.request({"jsonrpc": "2.0", "id": 1, "method": "ping"}) + # The floor refused the rebound address before any socket was opened. + assert METADATA not in targets + assert targets == [] + client.close() + + +def test_connect_dials_validated_ip_literal(monkeypatch): + """The socket is opened to the validated IP literal, never the hostname.""" + resolver, _ = _sequence_resolver(PUBLIC) # public on every resolution + targets: list[str] = [] + monkeypatch.setattr(socket, "getaddrinfo", resolver) + monkeypatch.setattr(socket, "create_connection", _recording_connect(targets)) + + client = HttpUpstreamClient("http://pin.test:8765/mcp", allow_private_hosts=False) + with pytest.raises(ProxyError): # ConnectionRefusedError from the recorder + client.request({"jsonrpc": "2.0", "id": 1, "method": "ping"}) + # Exactly one connect, to the literal we validated, not a re-resolved name. + assert targets == [PUBLIC] + client.close() + + +# -- pick_egress_ip unit behaviour ------------------------------------------ + + +def test_pick_egress_ip_pins_first_public(monkeypatch): + resolver, _ = _sequence_resolver(PUBLIC) + monkeypatch.setattr(socket, "getaddrinfo", resolver) + assert pick_egress_ip("example.test", 443, allow_private=False) == PUBLIC + + +def test_pick_egress_ip_refuses_mixed_public_and_metadata(monkeypatch): + """A single answer set containing a blocked address is refused outright.""" + + def resolver(host, port, *args, **kwargs): + return _addrinfo(PUBLIC, port) + _addrinfo(METADATA, port) + + monkeypatch.setattr(socket, "getaddrinfo", resolver) + with pytest.raises(EgressBlocked): + pick_egress_ip("mixed.test", 443, allow_private=False) + + +def test_pick_egress_ip_returns_literals_without_resolving(): + assert pick_egress_ip("8.8.8.8", 443, allow_private=False) == "8.8.8.8" + assert pick_egress_ip("127.0.0.1", 80, allow_private=True) == "127.0.0.1" + with pytest.raises(EgressBlocked): # loopback blocked without opt-in + pick_egress_ip("127.0.0.1", 80, allow_private=False) + with pytest.raises(EgressBlocked): # metadata blocked even under opt-in + pick_egress_ip("169.254.169.254", 80, allow_private=True)