diff --git a/src/fastmcp/client/client.py b/src/fastmcp/client/client.py index 4d773e36e..7478eb894 100644 --- a/src/fastmcp/client/client.py +++ b/src/fastmcp/client/client.py @@ -74,6 +74,24 @@ class Client(Generic[ClientTransportT]): handles connection establishment and management. Client provides methods for working with resources, prompts, tools and other MCP capabilities. + This client supports reentrant context managers (multiple concurrent + `async with client:` blocks) using reference counting and background session + management. This allows efficient session reuse in any scenario with + nested or concurrent client usage. + + MCP SDK 1.10 introduced automatic list_tools() calls during call_tool() + execution. This created a race condition where events could be reset while + other tasks were waiting on them, causing deadlocks. The issue was exposed + in proxy scenarios but affects any reentrant usage. + + The solution uses reference counting to track active context managers, + a background task to manage the session lifecycle, and events to coordinate + between tasks. All session state changes happen within a lock. + Events are only created when needed and are never reset outside the lock. 
+ + See: https://github.com/jlowin/fastmcp/issues/1051 + https://github.com/jlowin/fastmcp/pull/1054 + Args: transport: Connection source specification, which can be: - ClientTransport: Direct transport instance @@ -214,14 +232,15 @@ def __init__( elicitation_handler ) - # session context management - self._session: ClientSession | None = None - self._exit_stack: AsyncExitStack | None = None - self._nesting_counter: int = 0 - self._context_lock = anyio.Lock() - self._session_task: asyncio.Task | None = None - self._ready_event = anyio.Event() - self._stop_event = anyio.Event() + # Session context management - see class docstring for detailed explanation + self._session: ClientSession | None = None # Active MCP session + self._nesting_counter: int = 0 # Reference count for active context managers + self._context_lock = anyio.Lock() # Protects all session state changes + self._session_task: asyncio.Task | None = ( + None # Background session manager task + ) + self._ready_event = anyio.Event() # Signals when session is ready for use + self._stop_event = anyio.Event() # Signals when session should stop @property def session(self) -> ClientSession: @@ -291,11 +310,26 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): await self._disconnect() async def _connect(self): + """ + Establish or reuse a session connection. + + This method implements the reentrant context manager pattern: + - First call: Creates background session task and waits for it to be ready + - Subsequent calls: Increments reference counter and reuses existing session + - All operations protected by _context_lock to prevent race conditions + + The critical fix: Events are only created when starting a new session, + never reset outside the lock, preventing the deadlock scenario where + tasks wait on events that get replaced by other tasks. 
+ """ # ensure only one session is running at a time to avoid race conditions async with self._context_lock: need_to_start = self._session_task is None or self._session_task.done() if need_to_start: - assert self._nesting_counter == 0 + if self._nesting_counter != 0: + raise RuntimeError( + f"Internal error: nesting counter should be 0 when starting new session, got {self._nesting_counter}" + ) self._stop_event = anyio.Event() self._ready_event = anyio.Event() self._session_task = asyncio.create_task(self._session_runner()) @@ -303,7 +337,10 @@ async def _connect(self): if self._session_task.done(): exception = self._session_task.exception() - assert exception is not None + if exception is None: + raise RuntimeError( + "Session task completed without exception but connection failed" + ) if isinstance(exception, httpx.HTTPStatusError): raise exception raise RuntimeError( @@ -314,6 +351,19 @@ async def _connect(self): return self async def _disconnect(self, force: bool = False): + """ + Disconnect from session using reference counting. + + This method implements proper cleanup for reentrant context managers: + - Decrements reference counter for normal exits + - Only stops session when counter reaches 0 (no more active contexts) + - Force flag bypasses reference counting for immediate shutdown + - Session cleanup happens inside the lock to ensure atomicity + + Key fix: Removed the problematic "Reset for future reconnects" logic + that was resetting events outside the lock, causing race conditions. + Event recreation now happens only in _connect() when actually needed. + """ # ensure only one session is running at a time to avoid race conditions async with self._context_lock: # if we are forcing a disconnect, reset the nesting counter @@ -337,6 +387,19 @@ async def _disconnect(self, force: bool = False): self._session_task = None async def _session_runner(self): + """ + Background task that manages the actual session lifecycle. 
+ + This task runs in the background and: + 1. Establishes the transport connection via _context_manager() + 2. Signals that the session is ready via _ready_event.set() + 3. Waits for disconnect signal via _stop_event.wait() + 4. Ensures _ready_event is always set, even on failures + + The simplified error handling (compared to the original) removes + redundant exception re-raising while ensuring waiting tasks are + always unblocked via the finally block. + """ try: async with AsyncExitStack() as stack: await stack.enter_async_context(self._context_manager())