Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 73 additions & 10 deletions src/fastmcp/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ class Client(Generic[ClientTransportT]):
handles connection establishment and management. Client provides methods for
working with resources, prompts, tools and other MCP capabilities.

This client supports reentrant context managers (multiple concurrent
`async with client:` blocks) using reference counting and background session
management. This allows efficient session reuse in any scenario with
nested or concurrent client usage.

MCP SDK 1.10 introduced automatic list_tools() calls during call_tool()
execution. This created a race condition where events could be reset while
other tasks were waiting on them, causing deadlocks. The issue was exposed
in proxy scenarios but affects any reentrant usage.

The solution uses reference counting to track active context managers,
a background task to manage the session lifecycle, and events to coordinate
between tasks; all session state changes happen while holding a lock.
Events are only created when needed and are never reset outside the lock.

See: https://github.com/jlowin/fastmcp/issues/1051
https://github.com/jlowin/fastmcp/pull/1054

Args:
transport: Connection source specification, which can be:
- ClientTransport: Direct transport instance
Expand Down Expand Up @@ -214,14 +232,15 @@ def __init__(
elicitation_handler
)

# session context management
self._session: ClientSession | None = None
self._exit_stack: AsyncExitStack | None = None
self._nesting_counter: int = 0
self._context_lock = anyio.Lock()
self._session_task: asyncio.Task | None = None
self._ready_event = anyio.Event()
self._stop_event = anyio.Event()
# Session context management - see class docstring for detailed explanation
self._session: ClientSession | None = None # Active MCP session
self._nesting_counter: int = 0 # Reference count for active context managers
self._context_lock = anyio.Lock() # Protects all session state changes
self._session_task: asyncio.Task | None = (
None # Background session manager task
)
self._ready_event = anyio.Event() # Signals when session is ready for use
self._stop_event = anyio.Event() # Signals when session should stop

@property
def session(self) -> ClientSession:
Expand Down Expand Up @@ -291,19 +310,37 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._disconnect()

async def _connect(self):
"""
Establish or reuse a session connection.

This method implements the reentrant context manager pattern:
- First call: Creates background session task and waits for it to be ready
- Subsequent calls: Increments reference counter and reuses existing session
- All operations protected by _context_lock to prevent race conditions

The critical fix: events are created only when starting a new session and
are never reset outside the lock. This prevents the deadlock scenario where
tasks wait on events that have since been replaced by other tasks.
"""
# ensure only one session is running at a time to avoid race conditions
async with self._context_lock:
need_to_start = self._session_task is None or self._session_task.done()
if need_to_start:
assert self._nesting_counter == 0
if self._nesting_counter != 0:
raise RuntimeError(
f"Internal error: nesting counter should be 0 when starting new session, got {self._nesting_counter}"
)
self._stop_event = anyio.Event()
self._ready_event = anyio.Event()
self._session_task = asyncio.create_task(self._session_runner())
await self._ready_event.wait()

if self._session_task.done():
exception = self._session_task.exception()
assert exception is not None
if exception is None:
raise RuntimeError(
"Session task completed without exception but connection failed"
)
if isinstance(exception, httpx.HTTPStatusError):
raise exception
raise RuntimeError(
Expand All @@ -314,6 +351,19 @@ async def _connect(self):
return self

async def _disconnect(self, force: bool = False):
"""
Disconnect from session using reference counting.

This method implements proper cleanup for reentrant context managers:
- Decrements reference counter for normal exits
- Only stops session when counter reaches 0 (no more active contexts)
- Force flag bypasses reference counting for immediate shutdown
- Session cleanup happens inside the lock to ensure atomicity

Key fix: Removed the problematic "Reset for future reconnects" logic
that was resetting events outside the lock, causing race conditions.
Event recreation now happens only in _connect() when actually needed.
"""
# ensure only one session is running at a time to avoid race conditions
async with self._context_lock:
# if we are forcing a disconnect, reset the nesting counter
Expand All @@ -337,6 +387,19 @@ async def _disconnect(self, force: bool = False):
self._session_task = None

async def _session_runner(self):
"""
Background task that manages the actual session lifecycle.

This task runs in the background and:
1. Establishes the transport connection via _context_manager()
2. Signals that the session is ready via _ready_event.set()
3. Waits for disconnect signal via _stop_event.wait()
4. Ensures _ready_event is always set, even on failures

The simplified error handling (compared to the original) removes
redundant exception re-raising while ensuring waiting tasks are
always unblocked via the finally block.
"""
try:
async with AsyncExitStack() as stack:
await stack.enter_async_context(self._context_manager())
Expand Down
Loading