Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 34 additions & 16 deletions src/fastmcp/client/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client
from mcp.server.fastmcp import FastMCP as FastMCP1Server
from mcp.shared._httpx_utils import McpHttpClientFactory
from mcp.shared.memory import create_client_server_memory_streams
Expand Down Expand Up @@ -252,6 +252,16 @@ def __init__(
self.httpx_client_factory = httpx_client_factory
self._set_auth(auth)

if sse_read_timeout is not None:
if fastmcp.settings.deprecation_warnings:
warnings.warn(
"The `sse_read_timeout` parameter is deprecated and no longer used. "
"The new streamable_http_client API does not support this parameter. "
"Use `read_timeout_seconds` in session_kwargs or configure timeout on "
"the httpx client via `httpx_client_factory` instead.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(sse_read_timeout, int | float):
sse_read_timeout = datetime.timedelta(seconds=float(sse_read_timeout))
self.sse_read_timeout = sse_read_timeout
Expand All @@ -269,28 +279,36 @@ def _set_auth(self, auth: httpx.Auth | Literal["oauth"] | str | None):
async def connect_session(
self, **session_kwargs: Unpack[SessionKwargs]
) -> AsyncIterator[ClientSession]:
client_kwargs: dict[str, Any] = {}

# load headers from an active HTTP request, if available. This will only be true
# Load headers from an active HTTP request, if available. This will only be true
# if the client is used in a FastMCP Proxy, in which case the MCP client headers
# need to be forwarded to the remote server.
client_kwargs["headers"] = get_http_headers() | self.headers
headers = get_http_headers() | self.headers

# sse_read_timeout has a default value set, so we can't pass None without overriding it
# instead we simply leave the kwarg out if it's not provided
if self.sse_read_timeout is not None:
client_kwargs["sse_read_timeout"] = self.sse_read_timeout
# Build httpx client configuration
httpx_client_kwargs: dict[str, Any] = {
"headers": headers,
"auth": self.auth,
"follow_redirects": True,
}

# Configure timeout if provided (convert timedelta to seconds for httpx)
if session_kwargs.get("read_timeout_seconds") is not None:
client_kwargs["timeout"] = session_kwargs.get("read_timeout_seconds")
read_timeout_seconds = cast(
datetime.timedelta, session_kwargs.get("read_timeout_seconds")
)
httpx_client_kwargs["timeout"] = read_timeout_seconds.total_seconds()

# Create httpx client from factory or use default
if self.httpx_client_factory is not None:
client_kwargs["httpx_client_factory"] = self.httpx_client_factory
http_client = self.httpx_client_factory(**httpx_client_kwargs)
else:
http_client = httpx.AsyncClient(**httpx_client_kwargs)

async with streamablehttp_client(
self.url,
auth=self.auth,
**client_kwargs,
) as transport:
# Ensure httpx client is closed after use
async with (
http_client,
streamable_http_client(self.url, http_client=http_client) as transport,
):
read_stream, write_stream, get_session_id = transport
self._get_session_id_cb = get_session_id
async with ClientSession(
Expand Down