From 8e927fbfed6457cf1e156e92bcfcc340814f5d21 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 9 Jan 2026 11:27:15 -0500 Subject: [PATCH 01/12] Add debug logging for task lifecycle --- src/fastmcp/server/server.py | 62 ++++++++++++++++++++++++++++ src/fastmcp/server/tasks/handlers.py | 18 +++++++- 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py index 295e2aa70a..a8e9a64779 100644 --- a/src/fastmcp/server/server.py +++ b/src/fastmcp/server/server.py @@ -385,6 +385,8 @@ def docket(self) -> Docket | None: @asynccontextmanager async def _docket_lifespan(self) -> AsyncIterator[None]: """Manage Docket instance and Worker for background task execution.""" + import socket + from fastmcp import settings # Set FastMCP server in ContextVar so CurrentFastMCP can access it (use weakref to avoid reference cycles) @@ -394,16 +396,26 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: _current_worker, ) + # Get instance identifier for debugging Lambda lifecycle issues + instance_id = f"{socket.gethostname()}#{id(self)}" + logger.info(f"[{instance_id}] _docket_lifespan ENTERING") + server_token = _current_server.set(weakref.ref(self)) + logger.info(f"[{instance_id}] _current_server SET (token={server_token})") try: # For directly mounted servers, the parent's Docket/Worker handles all # task execution. Skip creating our own to avoid race conditions with # multiple workers competing for tasks from the same queue. if self._is_mounted: + logger.info(f"[{instance_id}] Server is mounted, skipping Docket setup") yield return + logger.info( + f"[{instance_id}] Creating Docket with name={settings.docket.name}, " + f"url={settings.docket.url}" + ) # Create Docket instance using configured name and URL async with Docket( name=settings.docket.name, @@ -456,6 +468,9 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: # Set Docket in ContextVar so CurrentDocket can access it docket_token = _current_docket.set(docket) + logger.info( + f"[{instance_id}] _current_docket SET (token={docket_token})" + ) try: # Build worker kwargs from settings worker_kwargs: dict[str, Any] = { @@ -467,27 +482,52 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: worker_kwargs["name"] = settings.docket.worker_name # Create and start Worker + logger.info( + f"[{instance_id}] Creating Worker with kwargs={worker_kwargs}" + ) async with Worker(docket, **worker_kwargs) as worker: # type: ignore[arg-type] # Set Worker in ContextVar so CurrentWorker can access it worker_token = _current_worker.set(worker) + logger.info( + f"[{instance_id}] _current_worker SET (token={worker_token}), " + f"worker.name={worker.name}" + ) try: worker_task = asyncio.create_task(worker.run_forever()) + logger.info( + f"[{instance_id}] Worker task started, " + f"_docket_lifespan YIELDING (lifespan active)" + ) try: yield finally: + logger.info( + f"[{instance_id}] _docket_lifespan EXITING " + "(after yield, cancelling worker)" + ) worker_task.cancel() with suppress(asyncio.CancelledError): await worker_task + logger.info(f"[{instance_id}] Worker task cancelled") finally: + logger.info( + f"[{instance_id}] _current_worker RESET (token={worker_token})" + ) _current_worker.reset(worker_token) finally: # Reset ContextVar + logger.info( + f"[{instance_id}] _current_docket RESET (token={docket_token})" + ) _current_docket.reset(docket_token) # Clear instance attribute self._docket = None + logger.info(f"[{instance_id}] self._docket = None") finally: # Reset server ContextVar + logger.info(f"[{instance_id}] _current_server RESET (token={server_token})") _current_server.reset(server_token) + logger.info(f"[{instance_id}] _docket_lifespan EXITED") async def _register_mounted_server_functions( self, @@ -563,16 +603,30 @@ async def _register_mounted_server_functions( @asynccontextmanager async def _lifespan_manager(self) -> AsyncIterator[None]: + import socket + + instance_id = f"{socket.gethostname()}#{id(self)}" + logger.info(f"[{instance_id}] _lifespan_manager ENTERING") + if self._lifespan_result_set: + logger.info( + f"[{instance_id}] _lifespan_manager: already set, yielding without setup" + ) yield return + logger.info( + f"[{instance_id}] _lifespan_manager: entering user lifespan and docket lifespan" + ) async with ( self._lifespan(self) as user_lifespan_result, self._docket_lifespan(), ): self._lifespan_result = user_lifespan_result self._lifespan_result_set = True + logger.info( + f"[{instance_id}] _lifespan_manager: _lifespan_result_set = True" + ) async with AsyncExitStack[bool | None]() as stack: for server in self._mounted_servers: @@ -581,13 +635,21 @@ async def _lifespan_manager(self) -> AsyncIterator[None]: ) self._started.set() + logger.info( + f"[{instance_id}] _lifespan_manager YIELDING (server started, " + f"_started.is_set={self._started.is_set()})" + ) try: yield finally: + logger.info( + f"[{instance_id}] _lifespan_manager EXITING (after yield)" + ) self._started.clear() self._lifespan_result_set = False self._lifespan_result = None + logger.info(f"[{instance_id}] _lifespan_manager EXITED") async def run_async( self, diff --git a/src/fastmcp/server/tasks/handlers.py b/src/fastmcp/server/tasks/handlers.py index 3f528a0b59..6e46d78287 100644 --- a/src/fastmcp/server/tasks/handlers.py +++ b/src/fastmcp/server/tasks/handlers.py @@ -44,6 +44,13 @@ async def handle_tool_as_task( Returns: CallToolResult: Task stub with task metadata in _meta """ + import socket + + from fastmcp.utilities.logging import get_logger + + _logger = get_logger(__name__) + instance_id = f"{socket.gethostname()}#{id(server)}" + # Generate server-side task ID per SEP-1686 final spec (line 375-377) # Server MUST generate task IDs, clients no longer provide them server_task_id = str(uuid.uuid4()) @@ -57,11 +64,20 @@ async def handle_tool_as_task( session_id = ctx.session_id docket = _current_docket.get() + _logger.info( + f"[{instance_id}] handle_tool_as_task: tool={tool_name}, " + f"_current_docket.get()={docket}, server._docket={server._docket}" + ) if docket is None: + _logger.error( + f"[{instance_id}] handle_tool_as_task FAILED: _current_docket is None! " + f"server._started.is_set()={server._started.is_set()}" + ) raise McpError( ErrorData( code=INTERNAL_ERROR, - message="Background tasks require a running FastMCP server context", + message=f"Background tasks require a running FastMCP server context " + f"(instance={instance_id}, server._docket={server._docket})", ) ) From fc4d0114dbd3f08c709bed3497e4d21a4e66f026 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 9 Jan 2026 11:47:01 -0500 Subject: [PATCH 02/12] Fix ContextVar propagation for nested ASGI lifespan calls When FastMCP runs with uvicorn, the lifespan is entered twice: 1. FastMCP's outer context (during http_app setup) 2. Starlette's ASGI lifespan (which request handlers inherit from) The second call was skipping ContextVar setup because _lifespan_result_set was already True. This caused _current_docket.get() to return None in request handlers even though server._docket was correctly set. Fix: Always set ContextVars when entering _lifespan_manager, using the already-initialized values from self._docket and self._worker. Co-Authored-By: Claude Opus 4.5 --- src/fastmcp/server/server.py | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py index a8e9a64779..1a8cab4d3a 100644 --- a/src/fastmcp/server/server.py +++ b/src/fastmcp/server/server.py @@ -197,8 +197,9 @@ def __init__( # Resolve server default for background task support self._support_tasks_by_default: bool = tasks if tasks is not None else False - # Docket instance (set during lifespan for cross-task access) + # Docket and Worker instances (set during lifespan for cross-task access) self._docket = None + self._worker = None self._additional_http_routes: list[BaseRoute] = [] self._mounted_servers: list[MountedServer] = [] @@ -486,6 +487,8 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: f"[{instance_id}] Creating Worker with kwargs={worker_kwargs}" ) async with Worker(docket, **worker_kwargs) as worker: # type: ignore[arg-type] + # Store on server instance for cross-context access + self._worker = worker # Set Worker in ContextVar so CurrentWorker can access it worker_token = _current_worker.set(worker) logger.info( @@ -514,6 +517,7 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: f"[{instance_id}] _current_worker RESET (token={worker_token})" ) _current_worker.reset(worker_token) + self._worker = None finally: # Reset ContextVar logger.info( @@ -609,10 +613,32 @@ async def _lifespan_manager(self) -> AsyncIterator[None]: logger.info(f"[{instance_id}] _lifespan_manager ENTERING") if self._lifespan_result_set: + # Lifespan already ran in a parent async context. We need to set ContextVars + # again in THIS context because ContextVars don't propagate across different + # async contexts (e.g., Starlette's ASGI lifespan vs FastMCP's outer lifespan). + # Request handlers inherit from Starlette's context, so we must set here. + from fastmcp.server.dependencies import ( + _current_docket, + _current_server, + _current_worker, + ) + logger.info( - f"[{instance_id}] _lifespan_manager: already set, yielding without setup" + f"[{instance_id}] _lifespan_manager: already set, setting ContextVars " + f"for this context (docket={self._docket}, worker={self._worker})" ) - yield + + server_token = _current_server.set(weakref.ref(self)) + docket_token = _current_docket.set(self._docket) if self._docket else None + worker_token = _current_worker.set(self._worker) if self._worker else None + try: + yield + finally: + _current_server.reset(server_token) + if docket_token is not None: + _current_docket.reset(docket_token) + if worker_token is not None: + _current_worker.reset(worker_token) return logger.info( From 4c169a80de5f4d51b89d2a061e27c72c18eed4f6 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 9 Jan 2026 13:02:32 -0500 Subject: [PATCH 03/12] Add server._docket fallback for Lambda ContextVar issue ContextVars set during lifespan don't propagate to request handlers in Lambda (works fine locally). As a workaround, fall back to using server._docket when the ContextVar returns None. This is a Lambda-specific issue - possibly related to how Lambda Web Adapter or Lambda's asyncio runtime handles context propagation. Co-Authored-By: Claude Opus 4.5 --- src/fastmcp/server/tasks/handlers.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/fastmcp/server/tasks/handlers.py b/src/fastmcp/server/tasks/handlers.py index 6e46d78287..48356be79b 100644 --- a/src/fastmcp/server/tasks/handlers.py +++ b/src/fastmcp/server/tasks/handlers.py @@ -63,21 +63,27 @@ async def handle_tool_as_task( ctx = get_context() session_id = ctx.session_id + # Try ContextVar first, fall back to server instance attribute. + # ContextVars don't propagate to request handlers in ASGI servers because + # request tasks are created from the main event loop, not the lifespan context. docket = _current_docket.get() + if docket is None: + docket = server._docket _logger.info( f"[{instance_id}] handle_tool_as_task: tool={tool_name}, " - f"_current_docket.get()={docket}, server._docket={server._docket}" + f"_current_docket.get()={_current_docket.get()}, server._docket={server._docket}, " + f"using docket={docket}" ) if docket is None: _logger.error( - f"[{instance_id}] handle_tool_as_task FAILED: _current_docket is None! " + f"[{instance_id}] handle_tool_as_task FAILED: no Docket available! " f"server._started.is_set()={server._started.is_set()}" ) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"Background tasks require a running FastMCP server context " - f"(instance={instance_id}, server._docket={server._docket})", + f"(instance={instance_id})", ) ) @@ -183,7 +189,10 @@ async def handle_prompt_as_task( ctx = get_context() session_id = ctx.session_id + # Try ContextVar first, fall back to server instance attribute docket = _current_docket.get() + if docket is None: + docket = server._docket if docket is None: raise McpError( ErrorData( @@ -292,7 +301,10 @@ async def handle_resource_as_task( ctx = get_context() session_id = ctx.session_id + # Try ContextVar first, fall back to server instance attribute docket = _current_docket.get() + if docket is None: + docket = server._docket if docket is None: raise McpError( ErrorData( From b32f7f5f99c6839d05985886bdc0c52f87514560 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 9 Jan 2026 13:03:52 -0500 Subject: [PATCH 04/12] Set docket/worker ContextVars at request time in Context Instead of relying on ContextVar propagation from lifespan (which fails in Lambda), set _current_docket and _current_worker when entering a Context for each request. This ensures user dependencies like CurrentDocket() and CurrentWorker() work in all environments. The values come from server._docket and server._worker which are always available after lifespan initialization. Co-Authored-By: Claude Opus 4.5 --- src/fastmcp/server/context.py | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/src/fastmcp/server/context.py b/src/fastmcp/server/context.py index 8cd4f7307a..2abc9b939f 100644 --- a/src/fastmcp/server/context.py +++ b/src/fastmcp/server/context.py @@ -185,10 +185,23 @@ async def __aenter__(self) -> Context: self._tokens.append(token) # Set current server for dependency injection (use weakref to avoid reference cycles) - from fastmcp.server.dependencies import _current_server + from fastmcp.server.dependencies import ( + _current_docket, + _current_server, + _current_worker, + ) self._server_token = _current_server.set(weakref.ref(self.fastmcp)) + # Set docket/worker from server instance for this request's context. + # This ensures ContextVars work even in environments (like Lambda) where + # lifespan ContextVars don't propagate to request handlers. + server = self.fastmcp + if server._docket is not None: + self._docket_token = _current_docket.set(server._docket) + if server._worker is not None: + self._worker_token = _current_worker.set(server._worker) + return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: @@ -196,10 +209,20 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: # Flush any remaining notifications before exiting await self._flush_notifications() - # Reset server token - if hasattr(self, "_server_token"): - from fastmcp.server.dependencies import _current_server + # Reset server/docket/worker tokens + from fastmcp.server.dependencies import ( + _current_docket, + _current_server, + _current_worker, + ) + if hasattr(self, "_worker_token"): + _current_worker.reset(self._worker_token) + delattr(self, "_worker_token") + if hasattr(self, "_docket_token"): + _current_docket.reset(self._docket_token) + delattr(self, "_docket_token") + if hasattr(self, "_server_token"): _current_server.reset(self._server_token) delattr(self, "_server_token") From 6e8c60b65e449d516dde82cefb6ffdd228c25c8a Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 9 Jan 2026 13:05:57 -0500 Subject: [PATCH 05/12] Add verbose logging to Context for debugging Lambda ContextVar issue Adds detailed logging to Context.__aenter__ and __aexit__ to track: - When Context is entered/exited - Values of server._docket and server._worker - ContextVar values before and after setting - Token values for debugging reset issues This will help diagnose why ContextVars might not propagate in Lambda. Co-Authored-By: Claude Opus 4.5 --- src/fastmcp/server/context.py | 60 +++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/fastmcp/server/context.py b/src/fastmcp/server/context.py index 2abc9b939f..1e62a4699e 100644 --- a/src/fastmcp/server/context.py +++ b/src/fastmcp/server/context.py @@ -175,14 +175,25 @@ def fastmcp(self) -> FastMCP: async def __aenter__(self) -> Context: """Enter the context manager and set this context as the current context.""" + import socket + + from fastmcp.utilities.logging import get_logger + + logger = get_logger(__name__) + instance_id = f"{socket.gethostname()}#{id(self.fastmcp)}" + + logger.info(f"[{instance_id}] Context.__aenter__ ENTERING") + parent_context = _current_context.get(None) if parent_context is not None: # Inherit state from parent context self._state = copy.deepcopy(parent_context._state) + logger.info(f"[{instance_id}] Context: inherited state from parent context") # Always set this context and save the token token = _current_context.set(self) self._tokens.append(token) + logger.info(f"[{instance_id}] Context: _current_context SET (token={token})") # Set current server for dependency injection (use weakref to avoid reference cycles) from fastmcp.server.dependencies import ( @@ -192,20 +203,55 @@ async def __aenter__(self) -> Context: ) self._server_token = _current_server.set(weakref.ref(self.fastmcp)) + logger.info( + f"[{instance_id}] Context: _current_server SET (token={self._server_token})" + ) # Set docket/worker from server instance for this request's context. # This ensures ContextVars work even in environments (like Lambda) where # lifespan ContextVars don't propagate to request handlers. server = self.fastmcp + logger.info( + f"[{instance_id}] Context: server._docket={server._docket}, " + f"server._worker={server._worker}, " + f"_current_docket.get() BEFORE={_current_docket.get()}" + ) if server._docket is not None: self._docket_token = _current_docket.set(server._docket) + logger.info( + f"[{instance_id}] Context: _current_docket SET from server._docket " + f"(token={self._docket_token}), _current_docket.get() AFTER={_current_docket.get()}" + ) + else: + logger.warning( + f"[{instance_id}] Context: server._docket is None, skipping _current_docket.set()" + ) + if server._worker is not None: self._worker_token = _current_worker.set(server._worker) + logger.info( + f"[{instance_id}] Context: _current_worker SET from server._worker " + f"(token={self._worker_token})" + ) + else: + logger.warning( + f"[{instance_id}] Context: server._worker is None, skipping _current_worker.set()" + ) + logger.info(f"[{instance_id}] Context.__aenter__ COMPLETE") return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """Exit the context manager and reset the most recent token.""" + import socket + + from fastmcp.utilities.logging import get_logger + + logger = get_logger(__name__) + instance_id = f"{socket.gethostname()}#{id(self.fastmcp)}" + + logger.info(f"[{instance_id}] Context.__aexit__ ENTERING (exc_type={exc_type})") + # Flush any remaining notifications before exiting await self._flush_notifications() @@ -217,20 +263,34 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ) if hasattr(self, "_worker_token"): + logger.info( + f"[{instance_id}] Context: resetting _current_worker (token={self._worker_token})" + ) _current_worker.reset(self._worker_token) delattr(self, "_worker_token") if hasattr(self, "_docket_token"): + logger.info( + f"[{instance_id}] Context: resetting _current_docket (token={self._docket_token})" + ) _current_docket.reset(self._docket_token) delattr(self, "_docket_token") if hasattr(self, "_server_token"): + logger.info( + f"[{instance_id}] Context: resetting _current_server (token={self._server_token})" + ) _current_server.reset(self._server_token) delattr(self, "_server_token") # Reset context token if self._tokens: token = self._tokens.pop() + logger.info( + f"[{instance_id}] Context: resetting _current_context (token={token})" + ) _current_context.reset(token) + logger.info(f"[{instance_id}] Context.__aexit__ COMPLETE") + @property def request_context(self) -> RequestContext[ServerSession, Any, Request] | None: """Access to the underlying request context. From 726976a99f9047380adade068b0f62fff32db528 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 9 Jan 2026 13:28:52 -0500 Subject: [PATCH 06/12] Add verbose Redis error logging with full traceback When Redis operations fail, log the full traceback to help diagnose ACL and permission issues in production environments. Co-Authored-By: Claude Opus 4.5 --- src/fastmcp/server/tasks/handlers.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/fastmcp/server/tasks/handlers.py b/src/fastmcp/server/tasks/handlers.py index 48356be79b..00a65df278 100644 --- a/src/fastmcp/server/tasks/handlers.py +++ b/src/fastmcp/server/tasks/handlers.py @@ -101,9 +101,23 @@ async def handle_tool_as_task( ttl_seconds = int( docket.execution_ttl.total_seconds() + TASK_MAPPING_TTL_BUFFER_SECONDS ) - async with docket.redis() as redis: - await redis.set(task_meta_key, task_key, ex=ttl_seconds) - await redis.set(created_at_key, created_at, ex=ttl_seconds) + _logger.info( + f"[{instance_id}] About to write to Redis: task_meta_key={task_meta_key}, " + f"created_at_key={created_at_key}, ttl={ttl_seconds}" + ) + try: + async with docket.redis() as redis: + await redis.set(task_meta_key, task_key, ex=ttl_seconds) + await redis.set(created_at_key, created_at, ex=ttl_seconds) + _logger.info(f"[{instance_id}] Redis write successful") + except Exception as e: + import traceback + + _logger.error( + f"[{instance_id}] Redis write FAILED: {type(e).__name__}: {e}\n" + f"Traceback:\n{traceback.format_exc()}" + ) + raise # Send notifications/tasks/created per SEP-1686 (mandatory) # Send BEFORE queuing to avoid race where task completes before notification From c72b5dd7f0cdeef9cdf3f4bb1c3eca64deb0e973 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 9 Jan 2026 13:37:09 -0500 Subject: [PATCH 07/12] Add detailed logging around docket.add() and subscription tasks Tracing where the Redis ACL error occurs - the initial Redis writes succeed but error happens somewhere after. Co-Authored-By: Claude Opus 4.5 --- src/fastmcp/server/tasks/handlers.py | 30 ++++++++++++++++++++--- src/fastmcp/server/tasks/subscriptions.py | 17 ++++++++++++- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/fastmcp/server/tasks/handlers.py b/src/fastmcp/server/tasks/handlers.py index 00a65df278..4f850b9215 100644 --- a/src/fastmcp/server/tasks/handlers.py +++ b/src/fastmcp/server/tasks/handlers.py @@ -139,18 +139,38 @@ async def handle_tool_as_task( # Queue function to Docket by name (result storage via execution_ttl) # Use tool.key which matches what was registered - prefixed for mounted tools - await docket.add( - tool.key, - key=task_key, - )(**arguments) + _logger.info( + f"[{instance_id}] About to call docket.add: tool.key={tool.key}, " + f"task_key={task_key}, arguments={arguments}" + ) + try: + await docket.add( + tool.key, + key=task_key, + )(**arguments) + _logger.info(f"[{instance_id}] docket.add completed successfully") + except Exception as e: + import traceback + + _logger.error( + f"[{instance_id}] docket.add FAILED: {type(e).__name__}: {e}\n" + f"Traceback:\n{traceback.format_exc()}" + ) + raise # Spawn subscription task to send status notifications (SEP-1686 optional feature) from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates # Start subscription in session's task group (persists for connection lifetime) + _logger.info( + f"[{instance_id}] Checking for subscription task group: " + f"hasattr={hasattr(ctx.session, '_subscription_task_group')}" + ) if hasattr(ctx.session, "_subscription_task_group"): tg = ctx.session._subscription_task_group # type: ignore[attr-defined] + _logger.info(f"[{instance_id}] Task group: {tg}") if tg: + _logger.info(f"[{instance_id}] Starting subscription task") tg.start_soon( # type: ignore[union-attr] subscribe_to_task_updates, server_task_id, @@ -158,7 +178,9 @@ async def handle_tool_as_task( ctx.session, docket, ) + _logger.info(f"[{instance_id}] Subscription task started") + _logger.info(f"[{instance_id}] About to return task stub") # Return task stub # Tasks MUST begin in "working" status per SEP-1686 final spec (line 381) return mcp.types.CallToolResult( diff --git a/src/fastmcp/server/tasks/subscriptions.py b/src/fastmcp/server/tasks/subscriptions.py index 9bbf0fad16..5b42694710 100644 --- a/src/fastmcp/server/tasks/subscriptions.py +++ b/src/fastmcp/server/tasks/subscriptions.py @@ -42,13 +42,23 @@ async def subscribe_to_task_updates( session: MCP ServerSession for sending notifications docket: Docket instance for subscribing to execution events """ + logger.info( + f"subscribe_to_task_updates STARTING: task_id={task_id}, task_key={task_key}" + ) try: + logger.info( + f"subscribe_to_task_updates: About to call docket.get_execution({task_key})" + ) execution = await docket.get_execution(task_key) + logger.info( + f"subscribe_to_task_updates: docket.get_execution returned {execution}" + ) if execution is None: logger.warning(f"No execution found for task {task_id}") return # Subscribe to state and progress events from Docket + logger.info("subscribe_to_task_updates: About to subscribe to execution events") async for event in execution.subscribe(): if event["type"] == "state": # Send notifications/tasks/status when state changes @@ -70,7 +80,12 @@ async def subscribe_to_task_updates( ) except Exception as e: - logger.warning(f"Subscription task failed for {task_id}: {e}", exc_info=True) + import traceback + + logger.error( + f"subscribe_to_task_updates FAILED for {task_id}: {type(e).__name__}: {e}\n" + f"Traceback:\n{traceback.format_exc()}" + ) async def _send_status_notification( From 6824d56aec6c808e82c1beb9777a6f7982fccbe5 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 9 Jan 2026 15:06:02 -0500 Subject: [PATCH 08/12] Bump pydocket to >=0.16.5 for Redis ACL fix pydocket 0.16.5 fixes an issue where worker_group_name was passed as a KEY instead of ARGV in Lua scripts, causing ACL failures when Redis users are restricted to key patterns. Co-Authored-By: Claude Opus 4.5 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 3010b826eb..d0ad1daf65 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "mcp>=1.24.0,<2.0", "openapi-pydantic>=0.5.1", "platformdirs>=4.0.0", - "pydocket>=0.16.4", + "pydocket>=0.16.5", "rich>=13.9.4", "cyclopts>=4.0.0", "authlib>=1.6.5", From 5cea6ba575f054281eef4cdeaaf0ffd831020481 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 9 Jan 2026 17:10:53 -0500 Subject: [PATCH 09/12] Bump pydocket to >=0.16.6 for py-key-value Redis username workaround Co-Authored-By: Claude Opus 4.5 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d0ad1daf65..431db8fed5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "mcp>=1.24.0,<2.0", "openapi-pydantic>=0.5.1", "platformdirs>=4.0.0", - "pydocket>=0.16.5", + "pydocket>=0.16.6", "rich>=13.9.4", "cyclopts>=4.0.0", "authlib>=1.6.5", From 687889c598216fd0236d2118967994062a7fdff7 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Mon, 12 Jan 2026 09:59:11 -0500 Subject: [PATCH 10/12] Simplify ContextVar management - trust Context to set them at request time Remove redundant ContextVar handling: - _lifespan_manager no longer re-sets ContextVars in early-return branch - Handler fallback logic removed (no more `if docket is None: docket = server._docket`) The authoritative place for request-context ContextVars is now Context.__aenter__, which sets _current_docket and _current_worker from server instance attributes. Co-Authored-By: Claude Opus 4.5 --- src/fastmcp/server/server.py | 29 +++++----------------------- src/fastmcp/server/tasks/handlers.py | 17 ++++------------ 2 files changed, 9 insertions(+), 37 deletions(-) diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py index 1a8cab4d3a..0be9595182 100644 --- a/src/fastmcp/server/server.py +++ b/src/fastmcp/server/server.py @@ -613,32 +613,13 @@ async def _lifespan_manager(self) -> AsyncIterator[None]: logger.info(f"[{instance_id}] _lifespan_manager ENTERING") if self._lifespan_result_set: - # Lifespan already ran in a parent async context. We need to set ContextVars - # again in THIS context because ContextVars don't propagate across different - # async contexts (e.g., Starlette's ASGI lifespan vs FastMCP's outer lifespan). - # Request handlers inherit from Starlette's context, so we must set here. - from fastmcp.server.dependencies import ( - _current_docket, - _current_server, - _current_worker, - ) - + # Lifespan already ran - ContextVars will be set by Context.__aenter__ + # at request time, so we just yield here. logger.info( - f"[{instance_id}] _lifespan_manager: already set, setting ContextVars " - f"for this context (docket={self._docket}, worker={self._worker})" + f"[{instance_id}] _lifespan_manager: already set, yielding " + f"(ContextVars managed by Context at request time)" ) - - server_token = _current_server.set(weakref.ref(self)) - docket_token = _current_docket.set(self._docket) if self._docket else None - worker_token = _current_worker.set(self._worker) if self._worker else None - try: - yield - finally: - _current_server.reset(server_token) - if docket_token is not None: - _current_docket.reset(docket_token) - if worker_token is not None: - _current_worker.reset(worker_token) + yield return logger.info( diff --git a/src/fastmcp/server/tasks/handlers.py b/src/fastmcp/server/tasks/handlers.py index 4f850b9215..1a63cc1589 100644 --- a/src/fastmcp/server/tasks/handlers.py +++ b/src/fastmcp/server/tasks/handlers.py @@ -63,16 +63,11 @@ async def handle_tool_as_task( ctx = get_context() session_id = ctx.session_id - # Try ContextVar first, fall back to server instance attribute. - # ContextVars don't propagate to request handlers in ASGI servers because - # request tasks are created from the main event loop, not the lifespan context. + # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() - if docket is None: - docket = server._docket _logger.info( f"[{instance_id}] handle_tool_as_task: tool={tool_name}, " - f"_current_docket.get()={_current_docket.get()}, server._docket={server._docket}, " - f"using docket={docket}" + f"_current_docket.get()={docket}" ) if docket is None: _logger.error( @@ -225,10 +220,8 @@ async def handle_prompt_as_task( ctx = get_context() session_id = ctx.session_id - # Try ContextVar first, fall back to server instance attribute + # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() - if docket is None: - docket = server._docket if docket is None: raise McpError( ErrorData( @@ -337,10 +330,8 @@ async def handle_resource_as_task( ctx = get_context() session_id = ctx.session_id - # Try ContextVar first, fall back to server instance attribute + # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() - if docket is None: - docket = server._docket if docket is None: raise McpError( ErrorData( From 7b24bd58c81b7365148681cea04ddeda95b12952 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Mon, 12 Jan 2026 10:00:42 -0500 Subject: [PATCH 11/12] Make all task handlers consistent with debug logging All three handlers (tool, prompt, resource) now have identical patterns: - Debug logging for docket access, Redis writes, docket.add, subscriptions - Try/except with traceback logging around Redis and docket operations - Consistent error messages with instance_id Co-Authored-By: Claude Opus 4.5 --- src/fastmcp/server/tasks/handlers.py | 158 +++++++++++++++++++++++---- 1 file changed, 138 insertions(+), 20 deletions(-) diff --git a/src/fastmcp/server/tasks/handlers.py b/src/fastmcp/server/tasks/handlers.py index 1a63cc1589..34a2f9824b 100644 --- a/src/fastmcp/server/tasks/handlers.py +++ b/src/fastmcp/server/tasks/handlers.py @@ -208,6 +208,13 @@ async def handle_prompt_as_task( Returns: GetPromptResult: Task stub with task metadata in _meta """ + import socket + + from fastmcp.utilities.logging import get_logger + + _logger = get_logger(__name__) + instance_id = f"{socket.gethostname()}#{id(server)}" + # Generate server-side task ID per SEP-1686 final spec (line 375-377) # Server MUST generate task IDs, clients no longer provide them server_task_id = str(uuid.uuid4()) @@ -222,11 +229,20 @@ async def handle_prompt_as_task( # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() + _logger.info( + f"[{instance_id}] handle_prompt_as_task: prompt={prompt_name}, " + f"_current_docket.get()={docket}" + ) if docket is None: + _logger.error( + f"[{instance_id}] handle_prompt_as_task FAILED: no Docket available! " + f"server._started.is_set()={server._started.is_set()}" + ) raise McpError( ErrorData( code=INTERNAL_ERROR, - message="Background tasks require a running FastMCP server context", + message=f"Background tasks require a running FastMCP server context " + f"(instance={instance_id})", ) ) @@ -244,9 +260,23 @@ async def handle_prompt_as_task( ttl_seconds = int( docket.execution_ttl.total_seconds() + TASK_MAPPING_TTL_BUFFER_SECONDS ) - async with docket.redis() as redis: - await redis.set(task_meta_key, task_key, ex=ttl_seconds) - await redis.set(created_at_key, created_at, ex=ttl_seconds) + _logger.info( + f"[{instance_id}] About to write to Redis: task_meta_key={task_meta_key}, " + f"created_at_key={created_at_key}, ttl={ttl_seconds}" + ) + try: + async with docket.redis() as redis: + await redis.set(task_meta_key, task_key, ex=ttl_seconds) + await redis.set(created_at_key, created_at, ex=ttl_seconds) + _logger.info(f"[{instance_id}] Redis write successful") + except Exception as e: + import traceback + + _logger.error( + f"[{instance_id}] Redis write FAILED: {type(e).__name__}: {e}\n" + f"Traceback:\n{traceback.format_exc()}" + ) + raise # Send notifications/tasks/created per SEP-1686 (mandatory) # Send BEFORE queuing to avoid race where task completes before notification @@ -265,18 +295,38 @@ async def handle_prompt_as_task( # Queue function to Docket by name (result storage via execution_ttl) # Use prompt.key which matches what was registered - prefixed for mounted prompts - await docket.add( - prompt.key, - key=task_key, - )(**(arguments or {})) + _logger.info( + f"[{instance_id}] About to call docket.add: prompt.key={prompt.key}, " + f"task_key={task_key}, arguments={arguments}" + ) + try: + await docket.add( + prompt.key, + key=task_key, + )(**(arguments or {})) + _logger.info(f"[{instance_id}] docket.add completed successfully") + except Exception as e: + import traceback + + _logger.error( + f"[{instance_id}] docket.add FAILED: {type(e).__name__}: {e}\n" + f"Traceback:\n{traceback.format_exc()}" + ) + raise # Spawn subscription task to send status notifications (SEP-1686 optional feature) from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates # Start subscription in session's task group (persists for connection lifetime) + _logger.info( + f"[{instance_id}] Checking for subscription task group: " + f"hasattr={hasattr(ctx.session, '_subscription_task_group')}" + ) if hasattr(ctx.session, "_subscription_task_group"): tg = ctx.session._subscription_task_group # type: ignore[attr-defined] + _logger.info(f"[{instance_id}] Task group: {tg}") if tg: + _logger.info(f"[{instance_id}] Starting subscription task") tg.start_soon( # type: ignore[union-attr] subscribe_to_task_updates, server_task_id, @@ -284,7 +334,9 @@ async def handle_prompt_as_task( ctx.session, docket, ) + _logger.info(f"[{instance_id}] Subscription task started") + _logger.info(f"[{instance_id}] About to return task stub") # Return task stub # Tasks MUST begin in "working" status per SEP-1686 final spec (line 381) return mcp.types.GetPromptResult( @@ -318,6 +370,13 @@ async def handle_resource_as_task( Returns: ServerResult with ReadResourceResult stub """ + import socket + + from fastmcp.utilities.logging import get_logger + + _logger = get_logger(__name__) + instance_id = f"{socket.gethostname()}#{id(server)}" + # Generate server-side task ID per SEP-1686 final spec (line 375-377) # Server MUST generate task IDs, clients no longer provide them server_task_id = str(uuid.uuid4()) @@ -332,11 +391,20 @@ async def handle_resource_as_task( # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() + _logger.info( + f"[{instance_id}] handle_resource_as_task: uri={uri}, " + f"_current_docket.get()={docket}" + ) if docket is None: + _logger.error( + f"[{instance_id}] handle_resource_as_task FAILED: no Docket available! " + f"server._started.is_set()={server._started.is_set()}" + ) raise McpError( ErrorData( code=INTERNAL_ERROR, - message="Background tasks require Docket", + message=f"Background tasks require a running FastMCP server context " + f"(instance={instance_id})", ) ) @@ -351,9 +419,23 @@ async def handle_resource_as_task( ttl_seconds = int( docket.execution_ttl.total_seconds() + TASK_MAPPING_TTL_BUFFER_SECONDS ) - async with docket.redis() as redis: - await redis.set(task_meta_key, task_key, ex=ttl_seconds) - await redis.set(created_at_key, created_at, ex=ttl_seconds) + _logger.info( + f"[{instance_id}] About to write to Redis: task_meta_key={task_meta_key}, " + f"created_at_key={created_at_key}, ttl={ttl_seconds}" + ) + try: + async with docket.redis() as redis: + await redis.set(task_meta_key, task_key, ex=ttl_seconds) + await redis.set(created_at_key, created_at, ex=ttl_seconds) + _logger.info(f"[{instance_id}] Redis write successful") + except Exception as e: + import traceback + + _logger.error( + f"[{instance_id}] Redis write FAILED: {type(e).__name__}: {e}\n" + f"Traceback:\n{traceback.format_exc()}" + ) + raise # Send notifications/tasks/created per SEP-1686 (mandatory) # Send BEFORE queuing to avoid race where task completes before notification @@ -377,23 +459,57 @@ async def handle_resource_as_task( if isinstance(resource, FunctionResourceTemplate): params = match_uri_template(uri, resource.uri_template) or {} - await docket.add( - resource.name, - key=task_key, - )(**params) + _logger.info( + f"[{instance_id}] About to call docket.add: resource.name={resource.name}, " + f"task_key={task_key}, params={params} (template)" + ) + try: + await docket.add( + resource.name, + key=task_key, + )(**params) + _logger.info(f"[{instance_id}] docket.add completed successfully") + except Exception as e: + import traceback + + _logger.error( + f"[{instance_id}] docket.add FAILED: {type(e).__name__}: {e}\n" + f"Traceback:\n{traceback.format_exc()}" + ) + raise else: - await docket.add( - resource.name, - key=task_key, - )() + _logger.info( + f"[{instance_id}] About to call docket.add: resource.name={resource.name}, " + f"task_key={task_key} (static resource)" + ) + try: + await docket.add( + resource.name, + key=task_key, + )() + _logger.info(f"[{instance_id}] docket.add completed successfully") + except Exception as e: + import traceback + + _logger.error( + f"[{instance_id}] docket.add FAILED: {type(e).__name__}: {e}\n" + f"Traceback:\n{traceback.format_exc()}" + ) + raise # Spawn subscription task to send status notifications (SEP-1686 optional feature) from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates # Start subscription in session's task group (persists for connection lifetime) + _logger.info( + f"[{instance_id}] Checking for subscription task group: " + f"hasattr={hasattr(ctx.session, '_subscription_task_group')}" + ) if hasattr(ctx.session, "_subscription_task_group"): tg = ctx.session._subscription_task_group # type: ignore[attr-defined] + _logger.info(f"[{instance_id}] Task group: {tg}") if tg: + _logger.info(f"[{instance_id}] Starting subscription task") tg.start_soon( # type: ignore[union-attr] subscribe_to_task_updates, server_task_id, @@ -401,7 +517,9 @@ async def handle_resource_as_task( ctx.session, docket, ) + _logger.info(f"[{instance_id}] Subscription task started") + _logger.info(f"[{instance_id}] About to return task stub") # Return task stub # Tasks MUST begin in "working" status per SEP-1686 final spec (line 381) return mcp.types.ServerResult( From c127dd979fd3b0b901db73662cfe9e5a9d6f7e95 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Mon, 12 Jan 2026 10:21:04 -0500 Subject: [PATCH 12/12] Remove debug logging, keep minimal ContextVar fix Removes all the verbose debug logging added during diagnosis while preserving the essential fix: Context.__aenter__ sets _current_docket and _current_worker from server instance attributes. This ensures ContextVars work in ASGI environments where lifespan and request handlers run in sibling async contexts. Co-Authored-By: Claude Opus 4.5 --- src/fastmcp/server/context.py | 59 ------ src/fastmcp/server/server.py | 66 ------- src/fastmcp/server/tasks/handlers.py | 226 +++------------------- src/fastmcp/server/tasks/subscriptions.py | 17 +- uv.lock | 8 +- 5 files changed, 33 insertions(+), 343 deletions(-) diff --git a/src/fastmcp/server/context.py b/src/fastmcp/server/context.py index 1e62a4699e..5370f04690 100644 --- a/src/fastmcp/server/context.py +++ b/src/fastmcp/server/context.py @@ -175,25 +175,14 @@ def fastmcp(self) -> FastMCP: async def __aenter__(self) -> Context: """Enter the context manager and set this context as the current context.""" - import socket - - from fastmcp.utilities.logging import get_logger - - logger = get_logger(__name__) - instance_id = f"{socket.gethostname()}#{id(self.fastmcp)}" - - logger.info(f"[{instance_id}] Context.__aenter__ ENTERING") - parent_context = _current_context.get(None) if parent_context is not None: # Inherit state from parent context self._state = copy.deepcopy(parent_context._state) - logger.info(f"[{instance_id}] Context: inherited state from parent context") # Always set this context and save the token token = _current_context.set(self) self._tokens.append(token) - logger.info(f"[{instance_id}] Context: _current_context SET (token={token})") # Set current server for dependency injection (use weakref to avoid reference cycles) from fastmcp.server.dependencies import ( @@ -203,55 +192,21 @@ async def __aenter__(self) -> Context: ) self._server_token = _current_server.set(weakref.ref(self.fastmcp)) - logger.info( - f"[{instance_id}] Context: _current_server SET (token={self._server_token})" - ) # Set docket/worker from server instance for this request's context. # This ensures ContextVars work even in environments (like Lambda) where # lifespan ContextVars don't propagate to request handlers. server = self.fastmcp - logger.info( - f"[{instance_id}] Context: server._docket={server._docket}, " - f"server._worker={server._worker}, " - f"_current_docket.get() BEFORE={_current_docket.get()}" - ) if server._docket is not None: self._docket_token = _current_docket.set(server._docket) - logger.info( - f"[{instance_id}] Context: _current_docket SET from server._docket " - f"(token={self._docket_token}), _current_docket.get() AFTER={_current_docket.get()}" - ) - else: - logger.warning( - f"[{instance_id}] Context: server._docket is None, skipping _current_docket.set()" - ) if server._worker is not None: self._worker_token = _current_worker.set(server._worker) - logger.info( - f"[{instance_id}] Context: _current_worker SET from server._worker " - f"(token={self._worker_token})" - ) - else: - logger.warning( - f"[{instance_id}] Context: server._worker is None, skipping _current_worker.set()" - ) - logger.info(f"[{instance_id}] Context.__aenter__ COMPLETE") return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """Exit the context manager and reset the most recent token.""" - import socket - - from fastmcp.utilities.logging import get_logger - - logger = get_logger(__name__) - instance_id = f"{socket.gethostname()}#{id(self.fastmcp)}" - - logger.info(f"[{instance_id}] Context.__aexit__ ENTERING (exc_type={exc_type})") - # Flush any remaining notifications before exiting await self._flush_notifications() @@ -263,34 +218,20 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ) if hasattr(self, "_worker_token"): - logger.info( - f"[{instance_id}] Context: resetting _current_worker (token={self._worker_token})" - ) _current_worker.reset(self._worker_token) delattr(self, "_worker_token") if hasattr(self, "_docket_token"): - logger.info( - f"[{instance_id}] Context: resetting _current_docket (token={self._docket_token})" - ) _current_docket.reset(self._docket_token) delattr(self, "_docket_token") if hasattr(self, "_server_token"): - logger.info( - f"[{instance_id}] Context: resetting _current_server (token={self._server_token})" - ) _current_server.reset(self._server_token) delattr(self, "_server_token") # Reset context token if self._tokens: token = self._tokens.pop() - logger.info( - f"[{instance_id}] Context: resetting _current_context (token={token})" - ) _current_context.reset(token) - logger.info(f"[{instance_id}] Context.__aexit__ COMPLETE") - @property def request_context(self) -> RequestContext[ServerSession, Any, Request] | None: """Access to the underlying request context. diff --git a/src/fastmcp/server/server.py b/src/fastmcp/server/server.py index 0be9595182..44ac0aab20 100644 --- a/src/fastmcp/server/server.py +++ b/src/fastmcp/server/server.py @@ -386,8 +386,6 @@ def docket(self) -> Docket | None: @asynccontextmanager async def _docket_lifespan(self) -> AsyncIterator[None]: """Manage Docket instance and Worker for background task execution.""" - import socket - from fastmcp import settings # Set FastMCP server in ContextVar so CurrentFastMCP can access it (use weakref to avoid reference cycles) @@ -397,26 +395,16 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: _current_worker, ) - # Get instance identifier for debugging Lambda lifecycle issues - instance_id = f"{socket.gethostname()}#{id(self)}" - logger.info(f"[{instance_id}] _docket_lifespan ENTERING") - server_token = _current_server.set(weakref.ref(self)) - logger.info(f"[{instance_id}] _current_server SET (token={server_token})") try: # For directly mounted servers, the parent's Docket/Worker handles all # task execution. Skip creating our own to avoid race conditions with # multiple workers competing for tasks from the same queue. if self._is_mounted: - logger.info(f"[{instance_id}] Server is mounted, skipping Docket setup") yield return - logger.info( - f"[{instance_id}] Creating Docket with name={settings.docket.name}, " - f"url={settings.docket.url}" - ) # Create Docket instance using configured name and URL async with Docket( name=settings.docket.name, @@ -469,9 +457,6 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: # Set Docket in ContextVar so CurrentDocket can access it docket_token = _current_docket.set(docket) - logger.info( - f"[{instance_id}] _current_docket SET (token={docket_token})" - ) try: # Build worker kwargs from settings worker_kwargs: dict[str, Any] = { @@ -483,55 +468,27 @@ async def _docket_lifespan(self) -> AsyncIterator[None]: worker_kwargs["name"] = settings.docket.worker_name # Create and start Worker - logger.info( - f"[{instance_id}] Creating Worker with kwargs={worker_kwargs}" - ) async with Worker(docket, **worker_kwargs) as worker: # type: ignore[arg-type] # Store on server instance for cross-context access self._worker = worker # Set Worker in ContextVar so CurrentWorker can access it worker_token = _current_worker.set(worker) - logger.info( - f"[{instance_id}] _current_worker SET (token={worker_token}), " - f"worker.name={worker.name}" - ) try: worker_task = asyncio.create_task(worker.run_forever()) - logger.info( - f"[{instance_id}] Worker task started, " - f"_docket_lifespan YIELDING (lifespan active)" - ) try: yield finally: - logger.info( - f"[{instance_id}] _docket_lifespan EXITING " - "(after yield, cancelling worker)" - ) worker_task.cancel() with suppress(asyncio.CancelledError): await worker_task - logger.info(f"[{instance_id}] Worker task cancelled") finally: - logger.info( - f"[{instance_id}] _current_worker RESET (token={worker_token})" - ) _current_worker.reset(worker_token) self._worker = None finally: - # Reset ContextVar - logger.info( - f"[{instance_id}] _current_docket RESET (token={docket_token})" - ) _current_docket.reset(docket_token) - # Clear instance attribute self._docket = None - logger.info(f"[{instance_id}] self._docket = None") finally: - # Reset server ContextVar - logger.info(f"[{instance_id}] _current_server RESET (token={server_token})") _current_server.reset(server_token) - logger.info(f"[{instance_id}] _docket_lifespan EXITED") async def _register_mounted_server_functions( self, @@ -607,33 +564,18 @@ async def _register_mounted_server_functions( @asynccontextmanager async def _lifespan_manager(self) -> AsyncIterator[None]: - import socket - - instance_id = f"{socket.gethostname()}#{id(self)}" - logger.info(f"[{instance_id}] _lifespan_manager ENTERING") - if self._lifespan_result_set: # Lifespan already ran - ContextVars will be set by Context.__aenter__ # at request time, so we just yield here. - logger.info( - f"[{instance_id}] _lifespan_manager: already set, yielding " - f"(ContextVars managed by Context at request time)" - ) yield return - logger.info( - f"[{instance_id}] _lifespan_manager: entering user lifespan and docket lifespan" - ) async with ( self._lifespan(self) as user_lifespan_result, self._docket_lifespan(), ): self._lifespan_result = user_lifespan_result self._lifespan_result_set = True - logger.info( - f"[{instance_id}] _lifespan_manager: _lifespan_result_set = True" - ) async with AsyncExitStack[bool | None]() as stack: for server in self._mounted_servers: @@ -642,21 +584,13 @@ async def _lifespan_manager(self) -> AsyncIterator[None]: ) self._started.set() - logger.info( - f"[{instance_id}] _lifespan_manager YIELDING (server started, " - f"_started.is_set={self._started.is_set()})" - ) try: yield finally: - logger.info( - f"[{instance_id}] _lifespan_manager EXITING (after yield)" - ) self._started.clear() self._lifespan_result_set = False self._lifespan_result = None - logger.info(f"[{instance_id}] _lifespan_manager EXITED") async def run_async( self, diff --git a/src/fastmcp/server/tasks/handlers.py b/src/fastmcp/server/tasks/handlers.py index 34a2f9824b..ecfa0c8082 100644 --- a/src/fastmcp/server/tasks/handlers.py +++ b/src/fastmcp/server/tasks/handlers.py @@ -44,13 +44,6 @@ async def handle_tool_as_task( Returns: CallToolResult: Task stub with task metadata in _meta """ - import socket - - from fastmcp.utilities.logging import get_logger - - _logger = get_logger(__name__) - instance_id = f"{socket.gethostname()}#{id(server)}" - # Generate server-side task ID per SEP-1686 final spec (line 375-377) # Server MUST generate task IDs, clients no longer provide them server_task_id = str(uuid.uuid4()) @@ -65,20 +58,11 @@ async def handle_tool_as_task( # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() - _logger.info( - f"[{instance_id}] handle_tool_as_task: tool={tool_name}, " - f"_current_docket.get()={docket}" - ) if docket is None: - _logger.error( - f"[{instance_id}] handle_tool_as_task FAILED: no Docket available! " - f"server._started.is_set()={server._started.is_set()}" - ) raise McpError( ErrorData( code=INTERNAL_ERROR, - message=f"Background tasks require a running FastMCP server context " - f"(instance={instance_id})", + message="Background tasks require a running FastMCP server context", ) ) @@ -96,23 +80,9 @@ async def handle_tool_as_task( ttl_seconds = int( docket.execution_ttl.total_seconds() + TASK_MAPPING_TTL_BUFFER_SECONDS ) - _logger.info( - f"[{instance_id}] About to write to Redis: task_meta_key={task_meta_key}, " - f"created_at_key={created_at_key}, ttl={ttl_seconds}" - ) - try: - async with docket.redis() as redis: - await redis.set(task_meta_key, task_key, ex=ttl_seconds) - await redis.set(created_at_key, created_at, ex=ttl_seconds) - _logger.info(f"[{instance_id}] Redis write successful") - except Exception as e: - import traceback - - _logger.error( - f"[{instance_id}] Redis write FAILED: {type(e).__name__}: {e}\n" - f"Traceback:\n{traceback.format_exc()}" - ) - raise + async with docket.redis() as redis: + await redis.set(task_meta_key, task_key, ex=ttl_seconds) + await redis.set(created_at_key, created_at, ex=ttl_seconds) # Send notifications/tasks/created per SEP-1686 (mandatory) # Send BEFORE queuing to avoid race where task completes before notification @@ -134,38 +104,18 @@ async def handle_tool_as_task( # Queue function to Docket by name (result storage via execution_ttl) # Use tool.key which matches what was registered - prefixed for mounted tools - _logger.info( - f"[{instance_id}] About to call docket.add: tool.key={tool.key}, " - f"task_key={task_key}, arguments={arguments}" - ) - try: - await docket.add( - tool.key, - key=task_key, - )(**arguments) - _logger.info(f"[{instance_id}] docket.add completed successfully") - except Exception as e: - import traceback - - _logger.error( - f"[{instance_id}] docket.add FAILED: {type(e).__name__}: {e}\n" - f"Traceback:\n{traceback.format_exc()}" - ) - raise + await docket.add( + tool.key, + key=task_key, + )(**arguments) # Spawn subscription task to send status notifications (SEP-1686 optional feature) from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates # Start subscription in session's task group (persists for connection lifetime) - _logger.info( - f"[{instance_id}] Checking for subscription task group: " - f"hasattr={hasattr(ctx.session, '_subscription_task_group')}" - ) if hasattr(ctx.session, "_subscription_task_group"): tg = ctx.session._subscription_task_group # type: ignore[attr-defined] - _logger.info(f"[{instance_id}] Task group: {tg}") if tg: - _logger.info(f"[{instance_id}] Starting subscription task") tg.start_soon( # type: ignore[union-attr] subscribe_to_task_updates, server_task_id, @@ -173,9 +123,7 @@ async def handle_tool_as_task( ctx.session, docket, ) - _logger.info(f"[{instance_id}] Subscription task started") - _logger.info(f"[{instance_id}] About to return task stub") # Return task stub # Tasks MUST begin in "working" status per SEP-1686 final spec (line 381) return mcp.types.CallToolResult( @@ -208,13 +156,6 @@ async def handle_prompt_as_task( Returns: GetPromptResult: Task stub with task metadata in _meta """ - import socket - - from fastmcp.utilities.logging import get_logger - - _logger = get_logger(__name__) - instance_id = f"{socket.gethostname()}#{id(server)}" - # Generate server-side task ID per SEP-1686 final spec (line 375-377) # Server MUST generate task IDs, clients no longer provide them server_task_id = str(uuid.uuid4()) @@ -229,20 +170,11 @@ async def handle_prompt_as_task( # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() - _logger.info( - f"[{instance_id}] handle_prompt_as_task: prompt={prompt_name}, " - f"_current_docket.get()={docket}" - ) if docket is None: - _logger.error( - f"[{instance_id}] handle_prompt_as_task FAILED: no Docket available! " - f"server._started.is_set()={server._started.is_set()}" - ) raise McpError( ErrorData( code=INTERNAL_ERROR, - message=f"Background tasks require a running FastMCP server context " - f"(instance={instance_id})", + message="Background tasks require a running FastMCP server context", ) ) @@ -260,23 +192,9 @@ async def handle_prompt_as_task( ttl_seconds = int( docket.execution_ttl.total_seconds() + TASK_MAPPING_TTL_BUFFER_SECONDS ) - _logger.info( - f"[{instance_id}] About to write to Redis: task_meta_key={task_meta_key}, " - f"created_at_key={created_at_key}, ttl={ttl_seconds}" - ) - try: - async with docket.redis() as redis: - await redis.set(task_meta_key, task_key, ex=ttl_seconds) - await redis.set(created_at_key, created_at, ex=ttl_seconds) - _logger.info(f"[{instance_id}] Redis write successful") - except Exception as e: - import traceback - - _logger.error( - f"[{instance_id}] Redis write FAILED: {type(e).__name__}: {e}\n" - f"Traceback:\n{traceback.format_exc()}" - ) - raise + async with docket.redis() as redis: + await redis.set(task_meta_key, task_key, ex=ttl_seconds) + await redis.set(created_at_key, created_at, ex=ttl_seconds) # Send notifications/tasks/created per SEP-1686 (mandatory) # Send BEFORE queuing to avoid race where task completes before notification @@ -295,38 +213,18 @@ async def handle_prompt_as_task( # Queue function to Docket by name (result storage via execution_ttl) # Use prompt.key which matches what was registered - prefixed for mounted prompts - _logger.info( - f"[{instance_id}] About to call docket.add: prompt.key={prompt.key}, " - f"task_key={task_key}, arguments={arguments}" - ) - try: - await docket.add( - prompt.key, - key=task_key, - )(**(arguments or {})) - _logger.info(f"[{instance_id}] docket.add completed successfully") - except Exception as e: - import traceback - - _logger.error( - f"[{instance_id}] docket.add FAILED: {type(e).__name__}: {e}\n" - f"Traceback:\n{traceback.format_exc()}" - ) - raise + await docket.add( + prompt.key, + key=task_key, + )(**(arguments or {})) # Spawn subscription task to send status notifications (SEP-1686 optional feature) from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates # Start subscription in session's task group (persists for connection lifetime) - _logger.info( - f"[{instance_id}] Checking for subscription task group: " - f"hasattr={hasattr(ctx.session, '_subscription_task_group')}" - ) if hasattr(ctx.session, "_subscription_task_group"): tg = ctx.session._subscription_task_group # type: ignore[attr-defined] - _logger.info(f"[{instance_id}] Task group: {tg}") if tg: - _logger.info(f"[{instance_id}] Starting subscription task") tg.start_soon( # type: ignore[union-attr] subscribe_to_task_updates, server_task_id, @@ -334,9 +232,7 @@ async def handle_prompt_as_task( ctx.session, docket, ) - _logger.info(f"[{instance_id}] Subscription task started") - _logger.info(f"[{instance_id}] About to return task stub") # Return task stub # Tasks MUST begin in "working" status per SEP-1686 final spec (line 381) return mcp.types.GetPromptResult( @@ -370,13 +266,6 @@ async def handle_resource_as_task( Returns: ServerResult with ReadResourceResult stub """ - import socket - - from fastmcp.utilities.logging import get_logger - - _logger = get_logger(__name__) - instance_id = f"{socket.gethostname()}#{id(server)}" - # Generate server-side task ID per SEP-1686 final spec (line 375-377) # Server MUST generate task IDs, clients no longer provide them server_task_id = str(uuid.uuid4()) @@ -391,20 +280,11 @@ async def handle_resource_as_task( # Get Docket from ContextVar (set by Context.__aenter__ at request time) docket = _current_docket.get() - _logger.info( - f"[{instance_id}] handle_resource_as_task: uri={uri}, " - f"_current_docket.get()={docket}" - ) if docket is None: - _logger.error( - f"[{instance_id}] handle_resource_as_task FAILED: no Docket available! " - f"server._started.is_set()={server._started.is_set()}" - ) raise McpError( ErrorData( code=INTERNAL_ERROR, - message=f"Background tasks require a running FastMCP server context " - f"(instance={instance_id})", + message="Background tasks require a running FastMCP server context", ) ) @@ -419,23 +299,9 @@ async def handle_resource_as_task( ttl_seconds = int( docket.execution_ttl.total_seconds() + TASK_MAPPING_TTL_BUFFER_SECONDS ) - _logger.info( - f"[{instance_id}] About to write to Redis: task_meta_key={task_meta_key}, " - f"created_at_key={created_at_key}, ttl={ttl_seconds}" - ) - try: - async with docket.redis() as redis: - await redis.set(task_meta_key, task_key, ex=ttl_seconds) - await redis.set(created_at_key, created_at, ex=ttl_seconds) - _logger.info(f"[{instance_id}] Redis write successful") - except Exception as e: - import traceback - - _logger.error( - f"[{instance_id}] Redis write FAILED: {type(e).__name__}: {e}\n" - f"Traceback:\n{traceback.format_exc()}" - ) - raise + async with docket.redis() as redis: + await redis.set(task_meta_key, task_key, ex=ttl_seconds) + await redis.set(created_at_key, created_at, ex=ttl_seconds) # Send notifications/tasks/created per SEP-1686 (mandatory) # Send BEFORE queuing to avoid race where task completes before notification @@ -459,57 +325,23 @@ async def handle_resource_as_task( if isinstance(resource, FunctionResourceTemplate): params = match_uri_template(uri, resource.uri_template) or {} - _logger.info( - f"[{instance_id}] About to call docket.add: resource.name={resource.name}, " - f"task_key={task_key}, params={params} (template)" - ) - try: - await docket.add( - resource.name, - key=task_key, - )(**params) - _logger.info(f"[{instance_id}] docket.add completed successfully") - except Exception as e: - import traceback - - _logger.error( - f"[{instance_id}] docket.add FAILED: {type(e).__name__}: {e}\n" - f"Traceback:\n{traceback.format_exc()}" - ) - raise + await docket.add( + resource.name, + key=task_key, + )(**params) else: - _logger.info( - f"[{instance_id}] About to call docket.add: resource.name={resource.name}, " - f"task_key={task_key} (static resource)" - ) - try: - await docket.add( - resource.name, - key=task_key, - )() - _logger.info(f"[{instance_id}] docket.add completed successfully") - except Exception as e: - import traceback - - _logger.error( - f"[{instance_id}] docket.add FAILED: {type(e).__name__}: {e}\n" - f"Traceback:\n{traceback.format_exc()}" - ) - raise + await docket.add( + resource.name, + key=task_key, + )() # Spawn subscription task to send status notifications (SEP-1686 optional feature) from fastmcp.server.tasks.subscriptions import subscribe_to_task_updates # Start subscription in session's task group (persists for connection lifetime) - _logger.info( - f"[{instance_id}] Checking for subscription task group: " - f"hasattr={hasattr(ctx.session, '_subscription_task_group')}" - ) if hasattr(ctx.session, "_subscription_task_group"): tg = ctx.session._subscription_task_group # type: ignore[attr-defined] - _logger.info(f"[{instance_id}] Task group: {tg}") if tg: - _logger.info(f"[{instance_id}] Starting subscription task") tg.start_soon( # type: ignore[union-attr] subscribe_to_task_updates, server_task_id, @@ -517,9 +349,7 @@ async def handle_resource_as_task( ctx.session, docket, ) - _logger.info(f"[{instance_id}] Subscription task started") - _logger.info(f"[{instance_id}] About to return task stub") # Return task stub # Tasks MUST begin in "working" status per SEP-1686 final spec (line 381) return mcp.types.ServerResult( diff --git a/src/fastmcp/server/tasks/subscriptions.py b/src/fastmcp/server/tasks/subscriptions.py index 5b42694710..c22e88f041 100644 --- a/src/fastmcp/server/tasks/subscriptions.py +++ b/src/fastmcp/server/tasks/subscriptions.py @@ -42,23 +42,13 @@ async def subscribe_to_task_updates( session: MCP ServerSession for sending notifications docket: Docket instance for subscribing to execution events """ - logger.info( - f"subscribe_to_task_updates STARTING: task_id={task_id}, task_key={task_key}" - ) try: - logger.info( - f"subscribe_to_task_updates: About to call docket.get_execution({task_key})" - ) execution = await docket.get_execution(task_key) - logger.info( - f"subscribe_to_task_updates: docket.get_execution returned {execution}" - ) if execution is None: logger.warning(f"No execution found for task {task_id}") return # Subscribe to state and progress events from Docket - logger.info("subscribe_to_task_updates: About to subscribe to execution events") async for event in execution.subscribe(): if event["type"] == "state": # Send notifications/tasks/status when state changes @@ -80,12 +70,7 @@ async def subscribe_to_task_updates( ) except Exception as e: - import traceback - - logger.error( - f"subscribe_to_task_updates FAILED for {task_id}: {type(e).__name__}: {e}\n" - f"Traceback:\n{traceback.format_exc()}" - ) + logger.error(f"subscribe_to_task_updates failed for {task_id}: {e}") async def _send_status_notification( diff --git a/uv.lock b/uv.lock index afb7d2323d..b17a98f839 100644 --- a/uv.lock +++ b/uv.lock @@ -752,7 +752,7 @@ requires-dist = [ { name = "platformdirs", specifier = ">=4.0.0" }, { name = "py-key-value-aio", extras = ["disk", "keyring", "memory"], specifier = ">=0.3.0,<0.4.0" }, { name = "pydantic", extras = ["email"], specifier = ">=2.11.7" }, - { name = "pydocket", specifier = ">=0.16.4" }, + { name = "pydocket", specifier = ">=0.16.6" }, { name = "pyperclip", specifier = ">=1.9.0" }, { name = "python-dotenv", specifier = ">=1.1.0" }, { name = "rich", specifier = ">=13.9.4" }, @@ -1775,7 +1775,7 @@ wheels = [ [[package]] name = "pydocket" -version = "0.16.4" +version = "0.16.6" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "cloudpickle" }, @@ -1792,9 +1792,9 @@ dependencies = [ { name = "typer" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/4d/c6/eb7f3af72fa5c04b52a3f9390ff0c948987441987f9526dd992d2a6b3524/pydocket-0.16.4.tar.gz", hash = "sha256:d034d1ac75877560d86329fb3643e7b862fcbcdac407d876a62f5d9e386e8753", size = 297949, upload-time = "2026-01-08T21:58:31.637Z" } +sdist = { url = "https://files.pythonhosted.org/packages/72/00/26befe5f58df7cd1aeda4a8d10bc7d1908ffd86b80fd995e57a2a7b3f7bd/pydocket-0.16.6.tar.gz", hash = "sha256:b96c96ad7692827214ed4ff25fcf941ec38371314db5dcc1ae792b3e9d3a0294", size = 299054, upload-time = "2026-01-09T22:09:15.405Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/74/c5/e6ffed3902ead6cb906758749c42211f7f24ea6d8fdd772f531f5a81c9fa/pydocket-0.16.4-py3-none-any.whl", hash = "sha256:cdcdf74b987c2cd5d03c7353d15f8dd2ac9bd43f2a91f7441748d5a8ebd617c9", size = 67374, upload-time = "2026-01-08T21:58:30.01Z" }, + { url = "https://files.pythonhosted.org/packages/0a/3f/7483e5a6dc6326b6e0c640619b5c5bd1d6e3c20e54d58f5fb86267cef00e/pydocket-0.16.6-py3-none-any.whl", hash = "sha256:683d21e2e846aa5106274e7d59210331b242d7fb0dce5b08d3b82065663ed183", size = 67697, upload-time = "2026-01-09T22:09:13.436Z" }, ] [[package]]