Closed
Labels: bug (Something isn't working)
Description
Version
1.3
Which installation method(s) does this occur on?
Source
Describe the bug.
Fix race: cleanup can remove an in-use session.
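_cleanup_inactive_sessions checks ref_count and then removes the session without coordinating with the code that increments ref_count. Because _session_usage_context increments ref_count outside the RWLock, the writer-side cleanup can run between the session lookup and the increment, see ref_count == 0, and close and remove a session that is about to be used.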
- Acquire the per‑session lock in cleanup before removal, then re‑check ref_count.
- In _session_usage_context, use the RWLock reader while fetching session_data and incrementing ref_count to block writer cleanup.
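The interleaving described above can be reproduced with a small, self-contained asyncio model. Everything in it is hypothetical: SimpleRWLock is a minimal stand-in for the RWLock (the issue does not show which implementation the code uses), and ToyManager only tracks a ref_count per session. An await between the lookup and the increment gives cleanup a chance to run. Without the reader lock, cleanup removes the session the caller is about to use. With the reader lock held across lookup and increment, cleanup has to wait and then sees ref_count == 1.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass, field


class SimpleRWLock:
    """Minimal asyncio reader/writer lock; hypothetical stand-in for the
    RWLock in the issue (any number of readers, or exactly one writer)."""

    def __init__(self) -> None:
        self._cond = asyncio.Condition()
        self._readers = 0
        self._writer = False

    @asynccontextmanager
    async def _hold(self, write: bool):
        async with self._cond:
            if write:
                await self._cond.wait_for(lambda: not self._writer and self._readers == 0)
                self._writer = True
            else:
                await self._cond.wait_for(lambda: not self._writer)
                self._readers += 1
        try:
            yield
        finally:
            async with self._cond:
                if write:
                    self._writer = False
                else:
                    self._readers -= 1
                self._cond.notify_all()

    @property
    def reader(self):
        return self._hold(write=False)

    @property
    def writer(self):
        return self._hold(write=True)


@dataclass
class SessionData:
    ref_count: int = 0
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)


@asynccontextmanager
async def no_lock():
    yield


class ToyManager:
    """Hypothetical toy model of the session manager in the issue."""

    def __init__(self, guarded: bool) -> None:
        self.rwlock = SimpleRWLock()
        self.sessions = {"s1": SessionData()}  # "s1" starts idle
        self.guarded = guarded

    async def cleanup(self) -> None:
        # Writer side: drop every session whose ref_count is 0
        async with self.rwlock.writer:
            for sid in [s for s, d in self.sessions.items() if d.ref_count == 0]:
                self.sessions.pop(sid, None)

    async def use(self, sid: str) -> bool:
        """Return whether sid is still tracked while this caller holds it."""
        guard = self.rwlock.reader if self.guarded else no_lock()
        async with guard:
            data = self.sessions[sid]
            await asyncio.sleep(0)  # yield point between lookup and increment
            async with data.lock:
                data.ref_count += 1
        await asyncio.sleep(0)  # session "in use"; cleanup may run now
        still_tracked = sid in self.sessions
        async with data.lock:
            data.ref_count -= 1
        return still_tracked


async def demo(guarded: bool) -> bool:
    mgr = ToyManager(guarded)
    still_tracked, _ = await asyncio.gather(mgr.use("s1"), mgr.cleanup())
    return still_tracked


print(asyncio.run(demo(guarded=False)))  # False: removed while in use
print(asyncio.run(demo(guarded=True)))   # True: cleanup saw ref_count == 1
```

The reader lock only needs to cover the lookup and the increment: once ref_count is positive, the ref_count check inside the writer-locked cleanup keeps the session alive, so the reader lock does not have to be held for the whole time the session is in use.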
Proposed changes:
```diff
 async def _cleanup_inactive_sessions(self, max_age: timedelta | None = None):
@@
     async with self._session_rwlock.writer:
         current_time = datetime.now()
         inactive_sessions = []
@@
         for session_id in inactive_sessions:
+            session_data = self._sessions[session_id]
+            # Coordinate with in-flight users. Keep this re-check outside the
+            # try/finally: a `continue` inside `try` still runs `finally`,
+            # which would pop the session anyway.
+            async with session_data.lock:
+                if session_data.ref_count > 0:
+                    # Became active; skip
+                    continue
             try:
                 logger.info("Cleaning up inactive session client: %s", truncate_session_id(session_id))
-                session_data = self._sessions[session_id]
                 # Close the client connection
                 await session_data.client.__aexit__(None, None, None)
```

And:
```diff
 @asynccontextmanager
 async def _session_usage_context(self, session_id: str):
@@
-    # Ensure session exists - create it if it doesn't
-    if session_id not in self._sessions:
-        # Create session client first
-        await self._get_session_client(session_id)
-        # Session should now exist in _sessions
-    # Get session data (session must exist at this point)
-    session_data = self._sessions[session_id]
-    async with session_data.lock:
-        session_data.ref_count += 1
+    # Fetch session data and bump ref_count under the reader lock so writer
+    # cleanup cannot remove the session in between. The existence check also
+    # runs under the lock, because cleanup may remove an idle session right
+    # after an unlocked check. If the session is missing, create it outside
+    # the reader lock (as before) and retry.
+    while True:
+        async with self._session_rwlock.reader:
+            session_data = self._sessions.get(session_id)
+            if session_data is not None:
+                async with session_data.lock:
+                    session_data.ref_count += 1
+                break
+        await self._get_session_client(session_id)
@@
     async with session_data.lock:
         session_data.ref_count -= 1
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
async def _cleanup_inactive_sessions(self, max_age: timedelta | None = None):
    """Remove clients for sessions inactive longer than max_age.

    This method uses the RWLock writer to ensure thread-safe cleanup.
    """
    if max_age is None:
        max_age = (
            self._client_config.session_idle_timeout
            if self._client_config
            else timedelta(hours=1)
        )

    async with self._session_rwlock.writer:
        current_time = datetime.now()
        inactive_sessions: list[str] = []

        for session_id, session_data in self._sessions.items():
            # Skip cleanup if session is actively being used
            if session_data.ref_count > 0:
                continue
            if current_time - session_data.last_activity > max_age:
                inactive_sessions.append(session_id)

        for session_id in inactive_sessions:
            session_data = self._sessions[session_id]
            # Coordinate with in-flight users. This re-check must stay outside
            # the try/finally below: `continue` inside `try` still runs
            # `finally`, which would drop the session from tracking anyway.
            async with session_data.lock:
                if session_data.ref_count > 0:
                    # Became active; skip closing and keep tracking it
                    continue
            try:
                logger.info(
                    "Cleaning up inactive session client: %s",
                    truncate_session_id(session_id),
                )
                # Close the client connection
                await session_data.client.__aexit__(None, None, None)
                logger.info(
                    "Cleaned up inactive session client: %s",
                    truncate_session_id(session_id),
                )
            except Exception as e:
                logger.warning(
                    "Error cleaning up session client %s: %s",
                    truncate_session_id(session_id),
                    e,
                )
            finally:
                # Always remove from tracking to prevent leaks, even if close failed
                self._sessions.pop(session_id, None)
                logger.info(
                    "Cleaned up session tracking for: %s",
                    truncate_session_id(session_id),
                )

        logger.info("Total sessions: %d", len(self._sessions))


@asynccontextmanager
async def _session_usage_context(self, session_id: str):
    """Context manager for using a session: bumps ref_count while in use."""
    # Fetch session data and bump ref_count under the reader lock so writer
    # cleanup cannot remove the session in between. If the session is missing
    # (never created, or just removed by cleanup), create it outside the
    # reader lock, as before, and retry.
    while True:
        async with self._session_rwlock.reader:
            session_data = self._sessions.get(session_id)
            if session_data is not None:
                async with session_data.lock:
                    session_data.ref_count += 1
                break
        await self._get_session_client(session_id)
    try:
        yield session_data.client
    finally:
        async with session_data.lock:
            session_data.ref_count -= 1
```
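A detail worth checking in any version of the cleanup loop: in Python, a continue executed inside a try block still runs the matching finally clause. If the "became active; skip" re-check sits inside the try whose finally pops the session, the skip does not actually keep the session tracked. The toy functions below (hypothetical names; a plain dict of ref_counts stands in for _sessions, and appending to closed stands in for closing the client) show the difference:

```python
def cleanup_buggy(sessions: dict[str, int], candidates: list[str], closed: list[str]) -> None:
    """sessions maps a hypothetical session id to its ref_count."""
    for sid in candidates:
        try:
            if sessions[sid] > 0:
                continue  # meant to skip an in-use session...
            closed.append(sid)  # stands in for closing the client
        finally:
            sessions.pop(sid, None)  # ...but finally also runs on `continue`


def cleanup_fixed(sessions: dict[str, int], candidates: list[str], closed: list[str]) -> None:
    for sid in candidates:
        if sessions[sid] > 0:
            continue  # outside the try, so the finally below is skipped
        try:
            closed.append(sid)  # stands in for closing the client
        finally:
            sessions.pop(sid, None)


buggy, fixed = {"idle": 0, "busy": 1}, {"idle": 0, "busy": 1}
cleanup_buggy(buggy, ["idle", "busy"], [])
cleanup_fixed(fixed, ["idle", "busy"], [])
print(buggy)  # {}
print(fixed)  # {'busy': 1}
```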
🧰 Tools
🪛 Ruff (0.13.3)
176-176: Do not catch blind exception: Exception (BLE001)
🤖 Prompt for AI Agents
In packages/nvidia_nat_mcp/src/nat/plugins/mcp/client_impl.py around lines 149-183, cleanup can remove a session that has just become in use: ref_count is checked and the session removed while _session_usage_context increments ref_count outside the RWLock. To fix this, in _cleanup_inactive_sessions acquire the session's per-session lock (or otherwise lock the session_data) before re-checking ref_count and performing the close/remove, so an actively used session is never removed. In _session_usage_context, take the RWLock reader (or otherwise hold a lock that blocks writer cleanup) while fetching session_data and incrementing ref_count, so writer cleanup cannot remove the session between the check and the increment.
Originally posted by @coderabbitai[bot] in #919 (comment)
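The same reasoning applies to the existence check that precedes the lookup: if that check runs outside the lock, cleanup can remove an idle session between the check and the lookup, and the lookup raises KeyError. Below is a minimal sketch of the failure and of a check-and-retry loop. It assumes a plain asyncio.Lock as a stand-in for the RWLock (stricter, since readers are serialized too, but with the same exclusion against cleanup); all names are hypothetical.

```python
import asyncio


class ToySessions:
    """Toy model: hypothetical session_id -> ref_count map guarded by one lock."""

    def __init__(self) -> None:
        self.lock = asyncio.Lock()  # stand-in for the RWLock reader/writer
        self.sessions: dict[str, int] = {"s1": 0}  # "s1" starts idle

    async def create(self, sid: str) -> None:
        # Stand-in for _get_session_client: register the session if missing
        async with self.lock:
            self.sessions.setdefault(sid, 0)

    async def cleanup(self) -> None:
        # Stand-in for _cleanup_inactive_sessions: drop every idle session
        async with self.lock:
            for sid in [s for s, refs in self.sessions.items() if refs == 0]:
                del self.sessions[sid]

    async def use_check_outside(self, sid: str) -> None:
        if sid not in self.sessions:  # unlocked existence check
            await self.create(sid)
        await asyncio.sleep(0)  # yield point: cleanup can run here
        async with self.lock:
            self.sessions[sid] += 1  # KeyError if cleanup removed sid

    async def use_with_retry(self, sid: str) -> None:
        await asyncio.sleep(0)  # same yield point: cleanup can run first
        while True:
            async with self.lock:
                if sid in self.sessions:  # check and increment under one lock
                    self.sessions[sid] += 1
                    return
            await self.create(sid)  # missing: create outside the lock, retry


async def run(retry: bool) -> dict[str, int]:
    toy = ToySessions()
    use = toy.use_with_retry if retry else toy.use_check_outside
    await asyncio.gather(use("s1"), toy.cleanup())
    return toy.sessions


print(asyncio.run(run(retry=True)))  # {'s1': 1}
try:
    asyncio.run(run(retry=False))
except KeyError as exc:
    print("KeyError:", exc)  # KeyError: 's1'
```

In the real manager the retry loop is expected to terminate because cleanup only removes sessions idle for longer than max_age, assuming a freshly created session records a recent last_activity.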
Minimum reproducible example
Relevant log output
Other/Misc.
No response
Code of Conduct
- I agree to follow the NeMo Agent toolkit Code of Conduct
- I have searched the open bugs and have found no duplicates for this bug report