# Post-patch state of python/src/mcp_server/mcp_server.py, limited to the regions
# touched by this change. A "# ..." line marks unchanged file content that the
# patch does not show.

# ...

import asyncio
import json
import logging
import os
import sys
import time
import traceback
from collections.abc import AsyncIterator

# ...

from src.server.services.mcp_session_manager import get_session_manager

# Global initialization lock and flag
# NOTE: asyncio.Lock coordinates coroutines running on a single event loop; it is
# not thread-safe and must not be shared across threads or event loops. Separate
# worker processes each import this module and therefore get their own lock and
# their own shared context. If cross-thread access is ever needed, use
# threading.Lock and acquire it via run_in_executor so the event loop is not blocked.
_initialization_lock = asyncio.Lock()
_initialization_complete = False
_shared_context = None

# ...


async def lifespan(server: FastMCP) -> AsyncIterator[ArchonContext]:
    """
    Lifecycle manager - no heavy dependencies.

    The first entry builds an ArchonContext (session manager, service client,
    initial health check) and stores it in the module-level ``_shared_context``.
    Every later entry yields that same instance. Setup is serialized with an
    asyncio.Lock so that coroutines waiting for it do not block the event loop.

    Args:
        server: The FastMCP server instance. Not used by this function.

    Yields:
        The shared ArchonContext.

    Raises:
        Exception: Any error raised during first-time setup is logged together
            with its traceback and re-raised. The initialization flag stays
            unset in that case, so the next entry runs setup again.
    """
    global _initialization_complete, _shared_context

    # Fast path: setup already finished, so no lock is needed.
    if _initialization_complete and _shared_context is not None:
        logger.info("♻️ Reusing existing context for new SSE connection")
        yield _shared_context
        return

    async with _initialization_lock:
        # Re-check under the lock: another coroutine may have completed setup
        # while this one was waiting to acquire it.
        if _initialization_complete and _shared_context is not None:
            logger.info("♻️ Reusing existing context for new SSE connection (after lock)")
            ctx = _shared_context
        else:
            logger.info("🚀 Starting MCP server...")

            try:
                # NOTE: The two getter calls below are synchronous. An asyncio.Lock
                # only suspends at `await` points, so while they run the whole
                # event loop is occupied, not just coroutines waiting on the lock.
                # Move them to run_in_executor if they ever become slow.
                logger.info("🔐 Initializing session manager...")
                # Called for its initialization effect; the returned manager is
                # not needed here.
                get_session_manager()
                logger.info("✓ Session manager initialized")

                logger.info("🌐 Initializing service client...")
                service_client = get_mcp_service_client()
                logger.info("✓ Service client initialized")

                ctx = ArchonContext(service_client=service_client)

                # Perform initial health check
                await perform_health_checks(ctx)

                logger.info("✓ MCP server ready")

                # Publish only after setup fully succeeded; the flag is set last
                # so the lock-free fast path never sees a half-built context.
                _shared_context = ctx
                _initialization_complete = True

            except Exception as e:
                logger.error(f"💥 Critical error in lifespan setup: {e}")
                logger.error(traceback.format_exc())
                raise

    # Yield outside the lock so an open connection does not hold it. `ctx` is a
    # local that is always bound here: both branches above assign it, and a
    # failed setup has already propagated its exception.
    try:
        yield ctx
    finally:
        # Only log: the shared context outlives individual connections.
        logger.info("🔄 Connection closed (shared context remains)")


# ...

# Define MCP instructions for Claude Code and other clients