diff --git a/src/fastmcp/client/transports.py b/src/fastmcp/client/transports.py index cd565bbf59..2eaede2a1b 100644 --- a/src/fastmcp/client/transports.py +++ b/src/fastmcp/client/transports.py @@ -23,7 +23,7 @@ ) from mcp.client.sse import sse_client from mcp.client.stdio import stdio_client -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.server.fastmcp import FastMCP as FastMCP1Server from mcp.shared._httpx_utils import McpHttpClientFactory from mcp.shared.memory import create_client_server_memory_streams @@ -252,6 +252,16 @@ def __init__( self.httpx_client_factory = httpx_client_factory self._set_auth(auth) + if sse_read_timeout is not None: + if fastmcp.settings.deprecation_warnings: + warnings.warn( + "The `sse_read_timeout` parameter is deprecated and no longer used. " + "The new streamable_http_client API does not support this parameter. " + "Use `read_timeout_seconds` in session_kwargs or configure timeout on " + "the httpx client via `httpx_client_factory` instead.", + DeprecationWarning, + stacklevel=2, + ) if isinstance(sse_read_timeout, int | float): sse_read_timeout = datetime.timedelta(seconds=float(sse_read_timeout)) self.sse_read_timeout = sse_read_timeout @@ -269,28 +279,36 @@ def _set_auth(self, auth: httpx.Auth | Literal["oauth"] | str | None): async def connect_session( self, **session_kwargs: Unpack[SessionKwargs] ) -> AsyncIterator[ClientSession]: - client_kwargs: dict[str, Any] = {} - - # load headers from an active HTTP request, if available. This will only be true + # Load headers from an active HTTP request, if available. This will only be true # if the client is used in a FastMCP Proxy, in which case the MCP client headers # need to be forwarded to the remote server. - client_kwargs["headers"] = get_http_headers() | self.headers + headers = get_http_headers() | self.headers - # sse_read_timeout has a default value set, so we can't pass None without overriding it - # instead we simply leave the kwarg out if it's not provided - if self.sse_read_timeout is not None: - client_kwargs["sse_read_timeout"] = self.sse_read_timeout + # Build httpx client configuration + httpx_client_kwargs: dict[str, Any] = { + "headers": headers, + "auth": self.auth, + "follow_redirects": True, + } + + # Configure timeout if provided (convert timedelta to seconds for httpx) if session_kwargs.get("read_timeout_seconds") is not None: - client_kwargs["timeout"] = session_kwargs.get("read_timeout_seconds") + read_timeout_seconds = cast( + datetime.timedelta, session_kwargs.get("read_timeout_seconds") + ) + httpx_client_kwargs["timeout"] = read_timeout_seconds.total_seconds() + # Create httpx client from factory or use default if self.httpx_client_factory is not None: - client_kwargs["httpx_client_factory"] = self.httpx_client_factory + http_client = self.httpx_client_factory(**httpx_client_kwargs) + else: + http_client = httpx.AsyncClient(**httpx_client_kwargs) - async with streamablehttp_client( - self.url, - auth=self.auth, - **client_kwargs, - ) as transport: + # Ensure httpx client is closed after use + async with ( + http_client, + streamable_http_client(self.url, http_client=http_client) as transport, + ): read_stream, write_stream, get_session_id = transport self._get_session_id_cb = get_session_id async with ClientSession(